differential_dataflow/trace/implementations/
merge_batcher.rs

1//! A general purpose `Batcher` implementation based on radix sort.
2
3use std::collections::VecDeque;
4
5use timely::communication::message::RefOrMut;
6use timely::logging::WorkerIdentifier;
7use timely::logging_core::Logger;
8use timely::progress::{frontier::Antichain, Timestamp};
9
10use crate::difference::Semigroup;
11use crate::logging::{BatcherEvent, DifferentialEvent};
12use crate::trace::{Batcher, Builder};
13
14/// Creates batches from unordered tuples.
15pub struct MergeBatcher<K, V, T, D> {
16    sorter: MergeSorter<(K, V), T, D>,
17    lower: Antichain<T>,
18    frontier: Antichain<T>,
19}
20
21impl<K, V, T, D> Batcher for MergeBatcher<K, V, T, D>
22where
23    K: Ord + Clone,
24    V: Ord + Clone,
25    T: Timestamp,
26    D: Semigroup,
27{
28    type Item = ((K,V),T,D);
29    type Time = T;
30
31    fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
32        MergeBatcher {
33            sorter: MergeSorter::new(logger, operator_id),
34            frontier: Antichain::new(),
35            lower: Antichain::from_elem(T::minimum()),
36        }
37    }
38
39    #[inline(never)]
40    fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>) {
41        // `batch` is either a shared reference or an owned allocations.
42        match batch {
43            RefOrMut::Ref(reference) => {
44                // This is a moment at which we could capture the allocations backing
45                // `batch` into a different form of region, rather than just  cloning.
46                let mut owned: Vec<_> = self.sorter.empty();
47                owned.clone_from(reference);
48                self.sorter.push(&mut owned);
49            },
50            RefOrMut::Mut(reference) => {
51                self.sorter.push(reference);
52            }
53        }
54    }
55
56    // Sealing a batch means finding those updates with times not greater or equal to any time
57    // in `upper`. All updates must have time greater or equal to the previously used `upper`,
58    // which we call `lower`, by assumption that after sealing a batcher we receive no more
59    // updates with times not greater or equal to `upper`.
60    #[inline(never)]
61    fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
62
63        let mut merged = Vec::new();
64        self.sorter.finish_into(&mut merged);
65
66        // Determine the number of distinct keys, values, and updates,
67        // and form a builder pre-sized for these numbers.
68        let mut builder = {
69            let mut keys = 0;
70            let mut vals = 0;
71            let mut upds = 0;
72            let mut prev_keyval = None;
73            for buffer in merged.iter() {
74                for ((key, val), time, _) in buffer.iter() {
75                    if !upper.less_equal(time) {
76                        if let Some((p_key, p_val)) = prev_keyval {
77                            if p_key != key {
78                                keys += 1;
79                                vals += 1;
80                            }
81                            else if p_val != val {
82                                vals += 1;
83                            }
84                            upds += 1;
85                        }
86                        prev_keyval = Some((key, val));
87                    }
88                }
89            }
90            B::with_capacity(keys, vals, upds)
91        };
92
93        let mut kept = Vec::new();
94        let mut keep = Vec::new();
95
96        self.frontier.clear();
97
98        // TODO: Re-use buffer, rather than dropping.
99        for mut buffer in merged.drain(..) {
100            for ((key, val), time, diff) in buffer.drain(..) {
101                if upper.less_equal(&time) {
102                    self.frontier.insert(time.clone());
103                    if keep.len() == keep.capacity() && !keep.is_empty() {
104                        kept.push(keep);
105                        keep = self.sorter.empty();
106                    }
107                    keep.push(((key, val), time, diff));
108                }
109                else {
110                    builder.push(((key, val), time, diff));
111                }
112            }
113            // Recycling buffer.
114            self.sorter.push(&mut buffer);
115        }
116
117        // Finish the kept data.
118        if !keep.is_empty() {
119            kept.push(keep);
120        }
121        if !kept.is_empty() {
122            self.sorter.push_list(kept);
123        }
124
125        // Drain buffers (fast reclaimation).
126        // TODO : This isn't obviously the best policy, but "safe" wrt footprint.
127        //        In particular, if we are reading serialized input data, we may
128        //        prefer to keep these buffers around to re-fill, if possible.
129        let mut buffer = Vec::new();
130        self.sorter.push(&mut buffer);
131        // We recycle buffers with allocations (capacity, and not zero-sized).
132        while buffer.capacity() > 0 && std::mem::size_of::<((K,V),T,D)>() > 0 {
133            buffer = Vec::new();
134            self.sorter.push(&mut buffer);
135        }
136
137        let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum()));
138        self.lower = upper;
139        seal
140    }
141
142    /// The frontier of elements remaining after the most recent call to `self.seal`.
143    fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
144        self.frontier.borrow()
145    }
146}
147
148struct MergeSorter<D, T, R> {
149    /// each power-of-two length list of allocations. Do not push/pop directly but use the corresponding functions.
150    queue: Vec<Vec<Vec<(D, T, R)>>>,
151    stash: Vec<Vec<(D, T, R)>>,
152    logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
153    operator_id: usize,
154}
155
156impl<D: Ord, T: Ord, R: Semigroup> MergeSorter<D, T, R> {
157
158    const BUFFER_SIZE_BYTES: usize = 1 << 13;
159
160    fn buffer_size() -> usize {
161        let size = ::std::mem::size_of::<(D, T, R)>();
162        if size == 0 {
163            Self::BUFFER_SIZE_BYTES
164        } else if size <= Self::BUFFER_SIZE_BYTES {
165            Self::BUFFER_SIZE_BYTES / size
166        } else {
167            1
168        }
169    }
170
171    #[inline]
172    fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
173        Self {
174            logger,
175            operator_id,
176            queue: Vec::new(),
177            stash: Vec::new(),
178        }
179    }
180
181    #[inline]
182    pub fn empty(&mut self) -> Vec<(D, T, R)> {
183        self.stash.pop().unwrap_or_else(|| Vec::with_capacity(Self::buffer_size()))
184    }
185
186    #[inline]
187    pub fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
188        // TODO: Reason about possible unbounded stash growth. How to / should we return them?
189        // TODO: Reason about mis-sized vectors, from deserialized data; should probably drop.
190        let mut batch = if self.stash.len() > 2 {
191            ::std::mem::replace(batch, self.stash.pop().unwrap())
192        }
193        else {
194            ::std::mem::take(batch)
195        };
196
197        if !batch.is_empty() {
198            crate::consolidation::consolidate_updates(&mut batch);
199            self.account([batch.len()], 1);
200            self.queue_push(vec![batch]);
201            while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
202                let list1 = self.queue_pop().unwrap();
203                let list2 = self.queue_pop().unwrap();
204                let merged = self.merge_by(list1, list2);
205                self.queue_push(merged);
206            }
207        }
208    }
209
210    // This is awkward, because it isn't a power-of-two length any more, and we don't want
211    // to break it down to be so.
212    pub fn push_list(&mut self, list: Vec<Vec<(D, T, R)>>) {
213        while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() {
214            let list1 = self.queue_pop().unwrap();
215            let list2 = self.queue_pop().unwrap();
216            let merged = self.merge_by(list1, list2);
217            self.queue_push(merged);
218        }
219        self.queue_push(list);
220    }
221
222    #[inline(never)]
223    pub fn finish_into(&mut self, target: &mut Vec<Vec<(D, T, R)>>) {
224        while self.queue.len() > 1 {
225            let list1 = self.queue_pop().unwrap();
226            let list2 = self.queue_pop().unwrap();
227            let merged = self.merge_by(list1, list2);
228            self.queue_push(merged);
229        }
230
231        if let Some(mut last) = self.queue_pop() {
232            ::std::mem::swap(&mut last, target);
233        }
234    }
235
236    // merges two sorted input lists into one sorted output list.
237    #[inline(never)]
238    fn merge_by(&mut self, list1: Vec<Vec<(D, T, R)>>, list2: Vec<Vec<(D, T, R)>>) -> Vec<Vec<(D, T, R)>> {
239        self.account(list1.iter().chain(list2.iter()).map(Vec::len), -1);
240
241        use std::cmp::Ordering;
242
243        // TODO: `list1` and `list2` get dropped; would be better to reuse?
244        let mut output = Vec::with_capacity(list1.len() + list2.len());
245        let mut result = self.empty();
246
247        let mut list1 = list1.into_iter();
248        let mut list2 = list2.into_iter();
249
250        let mut head1 = VecDeque::from(list1.next().unwrap_or_default());
251        let mut head2 = VecDeque::from(list2.next().unwrap_or_default());
252
253        // while we have valid data in each input, merge.
254        while !head1.is_empty() && !head2.is_empty() {
255
256            while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {
257
258                let cmp = {
259                    let x = head1.front().unwrap();
260                    let y = head2.front().unwrap();
261                    (&x.0, &x.1).cmp(&(&y.0, &y.1))
262                };
263                match cmp {
264                    Ordering::Less    => result.push(head1.pop_front().unwrap()),
265                    Ordering::Greater => result.push(head2.pop_front().unwrap()),
266                    Ordering::Equal   => {
267                        let (data1, time1, mut diff1) = head1.pop_front().unwrap();
268                        let (_data2, _time2, diff2) = head2.pop_front().unwrap();
269                        diff1.plus_equals(&diff2);
270                        if !diff1.is_zero() {
271                            result.push((data1, time1, diff1));
272                        }
273                    }
274                }
275            }
276
277            if result.capacity() == result.len() {
278                output.push(result);
279                result = self.empty();
280            }
281
282            if head1.is_empty() {
283                let done1 = Vec::from(head1);
284                if done1.capacity() == Self::buffer_size() { self.stash.push(done1); }
285                head1 = VecDeque::from(list1.next().unwrap_or_default());
286            }
287            if head2.is_empty() {
288                let done2 = Vec::from(head2);
289                if done2.capacity() == Self::buffer_size() { self.stash.push(done2); }
290                head2 = VecDeque::from(list2.next().unwrap_or_default());
291            }
292        }
293
294        if !result.is_empty() { output.push(result); }
295        else if result.capacity() > 0 { self.stash.push(result); }
296
297        if !head1.is_empty() {
298            let mut result = self.empty();
299            for item1 in head1 { result.push(item1); }
300            output.push(result);
301        }
302        output.extend(list1);
303
304        if !head2.is_empty() {
305            let mut result = self.empty();
306            for item2 in head2 { result.push(item2); }
307            output.push(result);
308        }
309        output.extend(list2);
310
311        output
312    }
313}
314
315impl<D, T, R> MergeSorter<D, T, R> {
316    /// Pop a batch from `self.queue` and account size changes.
317    #[inline]
318    fn queue_pop(&mut self) -> Option<Vec<Vec<(D, T, R)>>> {
319        let batch = self.queue.pop();
320        self.account(batch.iter().flatten().map(Vec::len), -1);
321        batch
322    }
323
324    /// Push a batch to `self.queue` and account size changes.
325    #[inline]
326    fn queue_push(&mut self, batch: Vec<Vec<(D, T, R)>>) {
327        self.account(batch.iter().map(Vec::len), 1);
328        self.queue.push(batch);
329    }
330
331    /// Account size changes. Only performs work if a logger exists.
332    ///
333    /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute
334    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
335    fn account<I: IntoIterator<Item=usize>>(&self, items: I, diff: isize) {
336        if let Some(logger) = &self.logger {
337            let mut records= 0isize;
338            for len in items {
339                records = records.saturating_add_unsigned(len);
340            }
341            logger.log(BatcherEvent {
342                operator: self.operator_id,
343                records_diff: records * diff,
344                size_diff: 0,
345                capacity_diff: 0,
346                allocations_diff: 0,
347            })
348        }
349    }
350}
351
352impl<D, T, R> Drop for MergeSorter<D, T, R> {
353    fn drop(&mut self) {
354        while self.queue_pop().is_some() { }
355    }
356}