differential_dataflow/trace/implementations/merge_batcher_col.rs

//! A general purpose `Batcher` implementation based on merge sort for `TimelyStack`.

use timely::Container;
use timely::communication::message::RefOrMut;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::logging::WorkerIdentifier;
use timely::logging_core::Logger;
use timely::progress::{frontier::Antichain, Timestamp};

use crate::difference::Semigroup;
use crate::logging::{BatcherEvent, DifferentialEvent};
use crate::trace::{Batcher, Builder};

/// Creates batches from unordered tuples.
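///
/// A minimal usage sketch (hypothetical; `MyBuilder` stands in for any `Builder`
/// with matching `Item` and `Time` types, and `updates` is a `Vec` of
/// `((key, val), time, diff)` tuples):
///
/// ```ignore
/// let mut batcher = ColumnatedMergeBatcher::<K, V, T, D>::new(None, operator_id);
/// batcher.push_batch(RefOrMut::Mut(&mut updates));
/// let batch = batcher.seal::<MyBuilder>(Antichain::from_elem(next_time));
/// ```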
pub struct ColumnatedMergeBatcher<K, V, T, D>
where
    K: Columnation + 'static,
    V: Columnation + 'static,
    T: Columnation + 'static,
    D: Columnation + 'static,
{
    sorter: MergeSorterColumnation<(K, V), T, D>,
    lower: Antichain<T>,
    frontier: Antichain<T>,
}

impl<K, V, T, D> Batcher for ColumnatedMergeBatcher<K, V, T, D>
where
    K: Columnation + Ord + Clone + 'static,
    V: Columnation + Ord + Clone + 'static,
    T: Columnation + Timestamp + 'static,
    D: Columnation + Semigroup + 'static,
{
    type Item = ((K,V),T,D);
    type Time = T;

    fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
        ColumnatedMergeBatcher {
            sorter: MergeSorterColumnation::new(logger, operator_id),
            frontier: Antichain::new(),
            lower: Antichain::from_elem(<T as Timestamp>::minimum()),
        }
    }

    #[inline]
    fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>) {
        // `batch` is either a shared reference or an owned allocation.
        match batch {
            RefOrMut::Ref(reference) => {
                // This is a moment at which we could capture the allocations backing
                // `batch` into a different form of region, rather than just cloning.
                self.sorter.push(&mut reference.clone());
            },
            RefOrMut::Mut(reference) => {
                self.sorter.push(reference);
            }
        }
    }

    // Sealing a batch means extracting those updates with times not greater than
    // or equal to any time in `upper`. All updates must have times greater than or
    // equal to the previously used `upper`, which we call `lower`, by the assumption
    // that after sealing a batcher we receive no more updates with times not greater
    // than or equal to `upper`.
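    // For example, with integer times, `lower = {0}`, and `upper = {3}`, updates at
    // times 0 through 2 are sealed into the batch, while updates at time 3 or later
    // remain in the batcher for a future call to `seal`.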
    #[inline]
    fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {

        let mut merged = Default::default();
        self.sorter.finish_into(&mut merged);

        // Determine the number of distinct keys, values, and updates,
        // and form a builder pre-sized for these numbers.
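        // For example, sealed updates ((a,1),t,+1), ((a,2),t,+1), ((b,1),t,+1)
        // count as two keys, three values, and three updates.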
        let mut builder = {
            let mut keys = 0;
            let mut vals = 0;
            let mut upds = 0;
            let mut prev_keyval = None;
            for buffer in merged.iter() {
                for ((key, val), time, _) in buffer.iter() {
                    if !upper.less_equal(time) {
                        if let Some((p_key, p_val)) = prev_keyval {
                            if p_key != key {
                                keys += 1;
                                vals += 1;
                            }
                            else if p_val != val {
                                vals += 1;
                            }
                        }
                        else {
                            // The first sealed update introduces its key and value.
                            keys += 1;
                            vals += 1;
                        }
                        upds += 1;
                        prev_keyval = Some((key, val));
                    }
                }
            }
            B::with_capacity(keys, vals, upds)
        };

        let mut kept = Vec::new();
        let mut keep = TimelyStack::default();

        self.frontier.clear();

        for buffer in merged.drain(..) {
            for datum @ ((_key, _val), time, _diff) in &buffer[..] {
                if upper.less_equal(time) {
                    self.frontier.insert(time.clone());
                    if keep.is_empty() {
                        if keep.capacity() != MergeSorterColumnation::<(K, V), T, D>::buffer_size() {
                            keep = self.sorter.empty();
                        }
                    } else if keep.len() == keep.capacity() {
                        kept.push(keep);
                        keep = self.sorter.empty();
                    }
                    keep.copy(datum);
                }
                else {
                    builder.copy(datum);
                }
            }
            // Recycling buffer.
            self.sorter.recycle(buffer);
        }

        // Finish the kept data.
        if !keep.is_empty() {
            kept.push(keep);
        }
        if !kept.is_empty() {
            self.sorter.push_list(kept);
        }

        // Drain buffers (fast reclamation).
        self.sorter.clear_stash();

        let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum()));
        self.lower = upper;
        seal
    }

    /// The frontier of elements remaining after the most recent call to `self.seal`.
    fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
        self.frontier.borrow()
    }
}

/// A FIFO queue over the contents of a `TimelyStack`, consumed from the front by
/// advancing a `head` index rather than by moving data.
struct TimelyStackQueue<T: Columnation> {
    list: TimelyStack<T>,
    head: usize,
}

impl<T: Columnation> Default for TimelyStackQueue<T> {
    fn default() -> Self {
        Self::from(Default::default())
    }
}

impl<T: Columnation> TimelyStackQueue<T> {

    /// Pop a reference to the next element, advancing past it. Panics if the queue is empty.
    fn pop(&mut self) -> &T {
        self.head += 1;
        &self.list[self.head - 1]
    }

    /// Peek at the next element without advancing. Panics if the queue is empty.
    fn peek(&self) -> &T {
        &self.list[self.head]
    }

    fn from(list: TimelyStack<T>) -> Self {
        TimelyStackQueue {
            list,
            head: 0,
        }
    }

    /// Recover the underlying allocation, discarding the queue position.
    fn done(self) -> TimelyStack<T> {
        self.list
    }

    fn is_empty(&self) -> bool { self.head == self.list[..].len() }

    /// Return an iterator over the remaining elements.
    fn iter(&self) -> impl Iterator<Item=&T> + Clone + ExactSizeIterator {
        self.list[self.head..].iter()
    }
}

struct MergeSorterColumnation<D: Columnation + 'static, T: Columnation + 'static, R: Columnation + 'static> {
    /// Queue of sorted lists of allocations, in roughly geometrically decreasing
    /// lengths. Do not push/pop directly; use `queue_push`/`queue_pop` so that
    /// size accounting stays correct.
    queue: Vec<Vec<TimelyStack<(D, T, R)>>>,
    stash: Vec<TimelyStack<(D, T, R)>>,
    pending: Vec<(D, T, R)>,
    logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
    operator_id: usize,
}

impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {

    const BUFFER_SIZE_BYTES: usize = 64 << 10;

    /// Buffer size (number of elements) to use for new/empty buffers.
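    ///
    /// A worked example, assuming a 32-byte `(D, T, R)` tuple: `(64 << 10) / 32`
    /// gives 2048 elements per buffer. Zero-sized tuples fall back to
    /// `BUFFER_SIZE_BYTES` elements, and oversized tuples to a single element.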
    const fn buffer_size() -> usize {
        let size = std::mem::size_of::<(D, T, R)>();
        if size == 0 {
            Self::BUFFER_SIZE_BYTES
        } else if size <= Self::BUFFER_SIZE_BYTES {
            Self::BUFFER_SIZE_BYTES / size
        } else {
            1
        }
    }

    /// Buffer size for pending updates, currently 2 * [`Self::buffer_size`].
    const fn pending_buffer_size() -> usize {
        Self::buffer_size() * 2
    }

    fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
        Self {
            logger,
            operator_id,
            queue: Vec::new(),
            stash: Vec::new(),
            pending: Vec::new(),
        }
    }

    /// Fetch a spare buffer from the stash, or allocate a new one with the standard capacity.
    fn empty(&mut self) -> TimelyStack<(D, T, R)> {
        self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size()))
    }

    /// Remove all buffers from the stash, dropping their allocations.
    fn clear_stash(&mut self) {
        self.stash.clear();
    }

    /// Clear a buffer and return it to the stash, provided it has the standard
    /// capacity and the stash is not already full; otherwise drop it.
    fn recycle(&mut self, mut buffer: TimelyStack<(D, T, R)>) {
        if buffer.capacity() == Self::buffer_size() && self.stash.len() < 2 {
            buffer.clear();
            self.stash.push(buffer);
        }
    }
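
    /// Absorb `batch` into `pending`, consolidating whenever `pending` fills and
    /// flushing to the queue if consolidation leaves it more than half full.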
    fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
        // Ensure `self.pending` has a capacity of `Self::pending_buffer_size`.
        if self.pending.capacity() < Self::pending_buffer_size() {
            self.pending.reserve(Self::pending_buffer_size() - self.pending.capacity());
        }

        while !batch.is_empty() {
            self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len())));
            if self.pending.len() == self.pending.capacity() {
                crate::consolidation::consolidate_updates(&mut self.pending);
                if self.pending.len() > self.pending.capacity() / 2 {
                    // Flush if `self.pending` is more than half full after consolidation.
                    self.flush_pending();
                }
            }
        }
    }

    /// Move all elements in `pending` into `queue`. The data in `pending` must be
    /// consolidated and sorted. After this function returns, `self.pending` is empty.
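    ///
    /// Afterwards, merges the two most recent lists in `queue` while the newer list
    /// is at least half the length of the list beneath it, keeping list lengths
    /// roughly geometrically decreasing.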
    fn flush_pending(&mut self) {
        if !self.pending.is_empty() {
            let mut stack = self.empty();
            stack.reserve_items(self.pending.iter());
            for tuple in self.pending.drain(..) {
                stack.copy(&tuple);
            }
            self.queue_push(vec![stack]);
            while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
                let list1 = self.queue_pop().unwrap();
                let list2 = self.queue_pop().unwrap();
                let merged = self.merge_by(list1, list2);
                self.queue_push(merged);
            }
        }
    }

    // This is awkward, because `list` need not have a power-of-two length, and we
    // don't want to break it apart to restore that structure.
    fn push_list(&mut self, list: Vec<TimelyStack<(D, T, R)>>) {
        while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() {
            let list1 = self.queue_pop().unwrap();
            let list2 = self.queue_pop().unwrap();
            let merged = self.merge_by(list1, list2);
            self.queue_push(merged);
        }
        self.queue_push(list);
    }
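
    /// Consolidate and flush any pending data, merge `queue` down to a single
    /// sorted list, and swap that list into `target`.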
    fn finish_into(&mut self, target: &mut Vec<TimelyStack<(D, T, R)>>) {
        crate::consolidation::consolidate_updates(&mut self.pending);
        self.flush_pending();
        while self.queue.len() > 1 {
            let list1 = self.queue_pop().unwrap();
            let list2 = self.queue_pop().unwrap();
            let merged = self.merge_by(list1, list2);
            self.queue_push(merged);
        }

        if let Some(mut last) = self.queue_pop() {
            std::mem::swap(&mut last, target);
        }
    }

    // Merges two sorted input lists into one sorted output list.
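    // Updates with equal data and time have their differences added; updates whose
    // accumulated difference is zero are dropped from the output.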
    fn merge_by(&mut self, list1: Vec<TimelyStack<(D, T, R)>>, list2: Vec<TimelyStack<(D, T, R)>>) -> Vec<TimelyStack<(D, T, R)>> {
        use std::cmp::Ordering;

        // TODO: `list1` and `list2` get dropped; would be better to reuse?
        let mut output = Vec::with_capacity(list1.len() + list2.len());
        let mut result = self.empty();

        let mut list1 = list1.into_iter();
        let mut list2 = list2.into_iter();

        let mut head1 = TimelyStackQueue::from(list1.next().unwrap_or_default());
        let mut head2 = TimelyStackQueue::from(list2.next().unwrap_or_default());

        // while we have valid data in each input, merge.
        while !head1.is_empty() && !head2.is_empty() {

            while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {

                let cmp = {
                    let x = head1.peek();
                    let y = head2.peek();
                    (&x.0, &x.1).cmp(&(&y.0, &y.1))
                };
                match cmp {
                    Ordering::Less    => { result.copy(head1.pop()); }
                    Ordering::Greater => { result.copy(head2.pop()); }
                    Ordering::Equal   => {
                        let (data1, time1, diff1) = head1.pop();
                        let (_data2, _time2, diff2) = head2.pop();
                        let mut diff1 = diff1.clone();
                        diff1.plus_equals(diff2);
                        if !diff1.is_zero() {
                            result.copy_destructured(data1, time1, &diff1);
                        }
                    }
                }
            }

            if result.capacity() == result.len() {
                output.push(result);
                result = self.empty();
            }

            if head1.is_empty() {
                self.recycle(head1.done());
                head1 = TimelyStackQueue::from(list1.next().unwrap_or_default());
            }
            if head2.is_empty() {
                self.recycle(head2.done());
                head2 = TimelyStackQueue::from(list2.next().unwrap_or_default());
            }
        }

        if result.len() > 0 {
            output.push(result);
        } else {
            self.recycle(result);
        }

        if !head1.is_empty() {
            let mut result = self.empty();
            result.reserve_items(head1.iter());
            for item in head1.iter() { result.copy(item); }
            output.push(result);
        }
        output.extend(list1);

        if !head2.is_empty() {
            let mut result = self.empty();
            result.reserve_items(head2.iter());
            for item in head2.iter() { result.copy(item); }
            output.push(result);
        }
        output.extend(list2);

        output
    }
}

impl<D: Columnation + 'static, T: Columnation + 'static, R: Columnation + 'static> MergeSorterColumnation<D, T, R> {
    /// Pop a batch from `self.queue` and account size changes.
    #[inline]
    fn queue_pop(&mut self) -> Option<Vec<TimelyStack<(D, T, R)>>> {
        let batch = self.queue.pop();
        self.account(batch.iter().flatten(), -1);
        batch
    }

    /// Push a batch to `self.queue` and account size changes.
    #[inline]
    fn queue_push(&mut self, batch: Vec<TimelyStack<(D, T, R)>>) {
        self.account(&batch, 1);
        self.queue.push(batch);
    }

    /// Account size changes. Only performs work if a logger exists.
    ///
    /// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute
    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
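    ///
    /// For example, stacks holding 100 records in 4096 bytes of heap size within
    /// 8192 bytes of allocated capacity, accounted with `diff = -1`, log
    /// `records_diff = -100`, `size_diff = -4096`, and `capacity_diff = -8192`.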
    fn account<'a, I: IntoIterator<Item=&'a TimelyStack<(D, T, R)>>>(&self, items: I, diff: isize) {
        if let Some(logger) = &self.logger {
            let (mut records, mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
            for stack in items {
                records = records.saturating_add_unsigned(stack.len());
                stack.heap_size(|s, c| {
                    siz = siz.saturating_add_unsigned(s);
                    capacity = capacity.saturating_add_unsigned(c);
                    allocations += isize::from(c > 0);
                });
            }
            logger.log(BatcherEvent {
                operator: self.operator_id,
                records_diff: records * diff,
                size_diff: siz * diff,
                capacity_diff: capacity * diff,
                allocations_diff: allocations * diff,
            })
        }
    }
}

impl<D: Columnation + 'static, T: Columnation + 'static, R: Columnation + 'static> Drop for MergeSorterColumnation<D, T, R> {
    fn drop(&mut self) {
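        // Pop batches through `queue_pop` so each deallocation is reported to the
        // size accounting, rather than clearing `queue` directly.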
        while self.queue_pop().is_some() { }
    }
}