differential_dataflow/trace/implementations/
merge_batcher.rs

1//! A `Batcher` implementation based on merge sort.
2//!
3//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger".
4//! The chunker receives input batches and consolidates them, producing sorted output
5//! "chunks" that are fully consolidated (no adjacent updates can be accumulated).
6//! The merger implements the [`Merger`] trait, and provides hooks for manipulating
7//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also
8//! splitting them apart based on time.
9//!
10//! Implementations of `MergeBatcher` can be instantiated through the choice of both
11//! the chunker and the merger, provided their respective output and input types align.
12
13use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::Container;
18use timely::container::{ContainerBuilder, PushInto};
19
20use crate::logging::{BatcherEvent, Logger};
21use crate::trace::{Batcher, Builder, Description};
22
23/// Creates batches from containers of unordered tuples.
24///
25/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs,
26/// and must produce outputs of type `M::Chunk`.
27pub struct MergeBatcher<Input, C, M: Merger> {
28    /// Transforms input streams to chunks of sorted, consolidated data.
29    chunker: C,
30    /// A sequence of power-of-two length lists of sorted, consolidated containers.
31    ///
32    /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]).
33    chains: Vec<Vec<M::Chunk>>,
34    /// Stash of empty chunks, recycled through the merging process.
35    stash: Vec<M::Chunk>,
36    /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
37    merger: M,
38    /// Current lower frontier, we sealed up to here.
39    lower: Antichain<M::Time>,
40    /// The lower-bound frontier of the data, after the last call to seal.
41    frontier: Antichain<M::Time>,
42    /// Logger for size accounting.
43    logger: Option<Logger>,
44    /// Timely operator ID.
45    operator_id: usize,
46    /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
47    _marker: PhantomData<Input>,
48}
49
50impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
51where
52    C: ContainerBuilder<Container=M::Chunk> + Default + for<'a> PushInto<&'a mut Input>,
53    M: Merger<Time: Timestamp>,
54{
55    type Input = Input;
56    type Time = M::Time;
57    type Output = M::Chunk;
58
59    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
60        Self {
61            logger,
62            operator_id,
63            chunker: C::default(),
64            merger: M::default(),
65            chains: Vec::new(),
66            stash: Vec::new(),
67            frontier: Antichain::new(),
68            lower: Antichain::from_elem(M::Time::minimum()),
69            _marker: PhantomData,
70        }
71    }
72
73    /// Push a container of data into this merge batcher. Updates the internal chain structure if
74    /// needed.
75    fn push_container(&mut self, container: &mut Input) {
76        self.chunker.push_into(container);
77        while let Some(chunk) = self.chunker.extract() {
78            let chunk = std::mem::take(chunk);
79            self.insert_chain(vec![chunk]);
80        }
81    }
82
83    // Sealing a batch means finding those updates with times not greater or equal to any time
84    // in `upper`. All updates must have time greater or equal to the previously used `upper`,
85    // which we call `lower`, by assumption that after sealing a batcher we receive no more
86    // updates with times not greater or equal to `upper`.
87    fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
88        // Finish
89        while let Some(chunk) = self.chunker.finish() {
90            let chunk = std::mem::take(chunk);
91            self.insert_chain(vec![chunk]);
92        }
93
94        // Merge all remaining chains into a single chain.
95        while self.chains.len() > 1 {
96            let list1 = self.chain_pop().unwrap();
97            let list2 = self.chain_pop().unwrap();
98            let merged = self.merge_by(list1, list2);
99            self.chain_push(merged);
100        }
101        let merged = self.chain_pop().unwrap_or_default();
102
103        // Extract readied data.
104        let mut kept = Vec::new();
105        let mut readied = Vec::new();
106        self.frontier.clear();
107
108        self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
109
110        if !kept.is_empty() {
111            self.chain_push(kept);
112        }
113
114        self.stash.clear();
115
116        let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
117        let seal = B::seal(&mut readied, description);
118        self.lower = upper;
119        seal
120    }
121
122    /// The frontier of elements remaining after the most recent call to `self.seal`.
123    #[inline]
124    fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
125        self.frontier.borrow()
126    }
127}
128
129impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
130    /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
131    /// by decreasing length.
132    fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
133        if !chain.is_empty() {
134            self.chain_push(chain);
135            while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
136                let list1 = self.chain_pop().unwrap();
137                let list2 = self.chain_pop().unwrap();
138                let merged = self.merge_by(list1, list2);
139                self.chain_push(merged);
140            }
141        }
142    }
143
144    // merges two sorted input lists into one sorted output list.
145    fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
146        // TODO: `list1` and `list2` get dropped; would be better to reuse?
147        let mut output = Vec::with_capacity(list1.len() + list2.len());
148        self.merger.merge(list1, list2, &mut output, &mut self.stash);
149
150        output
151    }
152
153    /// Pop a chain and account size changes.
154    #[inline]
155    fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
156        let chain = self.chains.pop();
157        self.account(chain.iter().flatten().map(M::account), -1);
158        chain
159    }
160
161    /// Push a chain and account size changes.
162    #[inline]
163    fn chain_push(&mut self, chain: Vec<M::Chunk>) {
164        self.account(chain.iter().map(M::account), 1);
165        self.chains.push(chain);
166    }
167
168    /// Account size changes. Only performs work if a logger exists.
169    ///
170    /// Calculate the size based on the iterator passed along, with each attribute
171    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
172    #[inline]
173    fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
174        if let Some(logger) = &self.logger {
175            let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
176            for (records_, size_, capacity_, allocations_) in items {
177                records = records.saturating_add_unsigned(records_);
178                size = size.saturating_add_unsigned(size_);
179                capacity = capacity.saturating_add_unsigned(capacity_);
180                allocations = allocations.saturating_add_unsigned(allocations_);
181            }
182            logger.log(BatcherEvent {
183                operator: self.operator_id,
184                records_diff: records * diff,
185                size_diff: size * diff,
186                capacity_diff: capacity * diff,
187                allocations_diff: allocations * diff,
188            })
189        }
190    }
191}
192
193impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
194    fn drop(&mut self) {
195        // Cleanup chain to retract accounting information.
196        while self.chain_pop().is_some() {}
197    }
198}
199
200/// A trait to describe interesting moments in a merge batcher.
201pub trait Merger: Default {
202    /// The internal representation of chunks of data.
203    type Chunk: Container;
204    /// The type of time in frontiers to extract updates.
205    type Time;
206    /// Merge chains into an output chain.
207    fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
208    /// Extract ready updates based on the `upper` frontier.
209    fn extract(
210        &mut self,
211        merged: Vec<Self::Chunk>,
212        upper: AntichainRef<Self::Time>,
213        frontier: &mut Antichain<Self::Time>,
214        readied: &mut Vec<Self::Chunk>,
215        kept: &mut Vec<Self::Chunk>,
216        stash: &mut Vec<Self::Chunk>,
217    );
218
219    /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
220    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
221}
222
223pub use container::{VecMerger, ColMerger};
224
225pub mod container {
226
227    //! A general purpose `Merger` implementation for arbitrary containers.
228    //!
229    //! The implementation requires implementations of two traits, `ContainerQueue` and `MergerChunk`.
230    //! The `ContainerQueue` trait is meant to wrap a container and provide iterable access to it, as
231    //! well as the ability to return the container when iteration is complete.
232    //! The `MergerChunk` trait is meant to be implemented by containers, and it explains how container
233    //! items should be interpreted with respect to times, and with respect to differences.
234    //! These two traits exist instead of a stack of constraints on the structure of the associated items
235    //! of the containers, allowing them to perform their functions without destructuring their guts.
236    //!
    //! Standard implementations exist in the `vec` and `columnation` modules.
238
239    use std::cmp::Ordering;
240    use std::marker::PhantomData;
241    use timely::{Container, container::{PushInto, SizableContainer}};
242    use timely::progress::frontier::{Antichain, AntichainRef};
243    use timely::{Data, PartialOrder};
244
245    use crate::trace::implementations::merge_batcher::Merger;
246
247    /// An abstraction for a container that can be iterated over, and conclude by returning itself.
248    pub trait ContainerQueue<C: Container> {
249        /// Returns either the next item in the container, or the container itself.
250        fn next_or_alloc(&mut self) -> Result<C::Item<'_>, C>;
251        /// Indicates whether `next_or_alloc` will return `Ok`, and whether `peek` will return `Some`.
252        fn is_empty(&self) -> bool;
253        /// Compare the heads of two queues, where empty queues come last.
254        fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering;
255        /// Create a new queue from an existing container.
256        fn from(container: C) -> Self;
257    }
258
259    /// Behavior to dissect items of chunks in the merge batcher
260    pub trait MergerChunk : SizableContainer {
261        /// An owned time type.
262        ///
263        /// This type is provided so that users can maintain antichains of something, in order to track
264        /// the forward movement of time and extract intervals from chains of updates.
265        type TimeOwned;
266        /// The owned diff type.
267        ///
268        /// This type is provided so that users can provide an owned instance to the `push_and_add` method,
269        /// to act as a scratch space when the type is substantial and could otherwise require allocations.
270        type DiffOwned: Default;
271
272        /// Relates a borrowed time to antichains of owned times.
273        ///
274        /// If `upper` is less or equal to `time`, the method returns `true` and ensures that `frontier` reflects `time`.
275        fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool;
276
277        /// Push an entry that adds together two diffs.
278        ///
279        /// This is only called when two items are deemed mergeable by the container queue.
280        /// If the two diffs added together is zero do not push anything.
281        fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned);
282
283        /// Account the allocations behind the chunk.
284        // TODO: Find a more universal home for this: `Container`?
285        fn account(&self) -> (usize, usize, usize, usize) {
286            let (size, capacity, allocations) = (0, 0, 0);
287            (self.len(), size, capacity, allocations)
288        }
289    }
290
291    /// A merger for arbitrary containers.
292    ///
293    /// `MC` is a [`Container`] that implements [`MergerChunk`].
294    /// `CQ` is a [`ContainerQueue`] supporting `MC`.
295    pub struct ContainerMerger<MC, CQ> {
296        _marker: PhantomData<(MC, CQ)>,
297    }
298
299    impl<MC, CQ> Default for ContainerMerger<MC, CQ> {
300        fn default() -> Self {
301            Self { _marker: PhantomData, }
302        }
303    }
304
305    impl<MC: MergerChunk, CQ> ContainerMerger<MC, CQ> {
306        /// Helper to get pre-sized vector from the stash.
307        #[inline]
308        fn empty(&self, stash: &mut Vec<MC>) -> MC {
309            stash.pop().unwrap_or_else(|| {
310                let mut container = MC::default();
311                container.ensure_capacity(&mut None);
312                container
313            })
314        }
315        /// Helper to return a chunk to the stash.
316        #[inline]
317        fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
318            // TODO: Should we only retain correctly sized containers?
319            chunk.clear();
320            stash.push(chunk);
321        }
322    }
323
324    impl<MC, CQ> Merger for ContainerMerger<MC, CQ>
325    where
326        for<'a> MC: MergerChunk<TimeOwned: Ord + PartialOrder + Data> + Clone + PushInto<<MC as Container>::Item<'a>> + 'static,
327        CQ: ContainerQueue<MC>,
328    {
329        type Time = MC::TimeOwned;
330        type Chunk = MC;
331
332        // TODO: Consider integrating with `ConsolidateLayout`.
333        fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
334            let mut list1 = list1.into_iter();
335            let mut list2 = list2.into_iter();
336
337            let mut head1 = CQ::from(list1.next().unwrap_or_default());
338            let mut head2 = CQ::from(list2.next().unwrap_or_default());
339
340            let mut result = self.empty(stash);
341
342            let mut diff_owned = Default::default();
343
344            // while we have valid data in each input, merge.
345            while !head1.is_empty() && !head2.is_empty() {
346                while !result.at_capacity() && !head1.is_empty() && !head2.is_empty() {
347                    let cmp = head1.cmp_heads(&head2);
348                    // TODO: The following less/greater branches could plausibly be a good moment for
349                    // `copy_range`, on account of runs of records that might benefit more from a
350                    // `memcpy`.
351                    match cmp {
352                        Ordering::Less => {
353                            result.push_into(head1.next_or_alloc().ok().unwrap());
354                        }
355                        Ordering::Greater => {
356                            result.push_into(head2.next_or_alloc().ok().unwrap());
357                        }
358                        Ordering::Equal => {
359                            let item1 = head1.next_or_alloc().ok().unwrap();
360                            let item2 = head2.next_or_alloc().ok().unwrap();
361                            result.push_and_add(item1, item2, &mut diff_owned);
362                       }
363                    }
364                }
365
366                if result.at_capacity() {
367                    output.push_into(result);
368                    result = self.empty(stash);
369                }
370
371                if head1.is_empty() {
372                    self.recycle(head1.next_or_alloc().err().unwrap(), stash);
373                    head1 = CQ::from(list1.next().unwrap_or_default());
374                }
375                if head2.is_empty() {
376                    self.recycle(head2.next_or_alloc().err().unwrap(), stash);
377                    head2 = CQ::from(list2.next().unwrap_or_default());
378                }
379            }
380
381            // TODO: recycle `head1` rather than discarding.
382            while let Ok(next) = head1.next_or_alloc() {
383                result.push_into(next);
384                if result.at_capacity() {
385                    output.push_into(result);
386                    result = self.empty(stash);
387                }
388            }
389            if !result.is_empty() {
390                output.push_into(result);
391                result = self.empty(stash);
392            }
393            output.extend(list1);
394
395            // TODO: recycle `head2` rather than discarding.
396            while let Ok(next) = head2.next_or_alloc() {
397                result.push_into(next);
398                if result.at_capacity() {
399                    output.push(result);
400                    result = self.empty(stash);
401                }
402            }
403            if !result.is_empty() {
404                output.push_into(result);
405                // result = self.empty(stash);
406            }
407            output.extend(list2);
408        }
409
410        fn extract(
411            &mut self,
412            merged: Vec<Self::Chunk>,
413            upper: AntichainRef<Self::Time>,
414            frontier: &mut Antichain<Self::Time>,
415            readied: &mut Vec<Self::Chunk>,
416            kept: &mut Vec<Self::Chunk>,
417            stash: &mut Vec<Self::Chunk>,
418        ) {
419            let mut keep = self.empty(stash);
420            let mut ready = self.empty(stash);
421
422            for mut buffer in merged {
423                for item in buffer.drain() {
424                    if MC::time_kept(&item, &upper, frontier) {
425                        if keep.at_capacity() && !keep.is_empty() {
426                            kept.push(keep);
427                            keep = self.empty(stash);
428                        }
429                        keep.push_into(item);
430                    } else {
431                        if ready.at_capacity() && !ready.is_empty() {
432                            readied.push(ready);
433                            ready = self.empty(stash);
434                        }
435                        ready.push_into(item);
436                    }
437                }
438                // Recycling buffer.
439                self.recycle(buffer, stash);
440            }
441            // Finish the kept data.
442            if !keep.is_empty() {
443                kept.push(keep);
444            }
445            if !ready.is_empty() {
446                readied.push(ready);
447            }
448        }
449
450        /// Account the allocations behind the chunk.
451        fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
452            chunk.account()
453        }
454    }
455
456    pub use vec::VecMerger;
457    /// Implementations of `ContainerQueue` and `MergerChunk` for `Vec` containers.
458    pub mod vec {
459
460        use std::collections::VecDeque;
461        use timely::progress::{Antichain, frontier::AntichainRef};
462        use crate::difference::Semigroup;
463        use super::{ContainerQueue, MergerChunk};
464
465        /// A `Merger` implementation backed by vector containers.
466        pub type VecMerger<D, T, R> = super::ContainerMerger<Vec<(D, T, R)>, std::collections::VecDeque<(D, T, R)>>;
467
468        impl<D: Ord, T: Ord, R> ContainerQueue<Vec<(D, T, R)>> for VecDeque<(D, T, R)> {
469            fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> {
470                if self.is_empty() {
471                    Err(Vec::from(std::mem::take(self)))
472                }
473                else {
474                    Ok(self.pop_front().unwrap())
475                }
476            }
477            fn is_empty(&self) -> bool {
478                self.is_empty()
479            }
480            fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
481                let (data1, time1, _) = self.front().unwrap();
482                let (data2, time2, _) = other.front().unwrap();
483                (data1, time1).cmp(&(data2, time2))
484            }
485            fn from(list: Vec<(D, T, R)>) -> Self {
486                <Self as From<_>>::from(list)
487            }
488        }
489
490        impl<D: Ord + 'static, T: Ord + timely::PartialOrder + Clone + 'static, R: Semigroup + 'static> MergerChunk for Vec<(D, T, R)> {
491            type TimeOwned = T;
492            type DiffOwned = ();
493
494            fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool {
495                if upper.less_equal(time) {
496                    frontier.insert_with(&time, |time| time.clone());
497                    true
498                }
499                else { false }
500            }
501            fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, _stash: &mut Self::DiffOwned) {
502                let (data, time, mut diff1) = item1;
503                let (_data, _time, diff2) = item2;
504                diff1.plus_equals(&diff2);
505                if !diff1.is_zero() {
506                    self.push((data, time, diff1));
507                }
508            }
509            fn account(&self) -> (usize, usize, usize, usize) {
510                let (size, capacity, allocations) = (0, 0, 0);
511                (self.len(), size, capacity, allocations)
512            }
513        }
514    }
515
516    pub use columnation::ColMerger;
517    /// Implementations of `ContainerQueue` and `MergerChunk` for `TimelyStack` containers (columnation).
518    pub mod columnation {
519
520        use timely::progress::{Antichain, frontier::AntichainRef};
521        use columnation::Columnation;
522
523        use crate::containers::TimelyStack;
524        use crate::difference::Semigroup;
525
526        use super::{ContainerQueue, MergerChunk};
527
528        /// A `Merger` implementation backed by `TimelyStack` containers (columnation).
529        pub type ColMerger<D, T, R> = super::ContainerMerger<TimelyStack<(D,T,R)>,TimelyStackQueue<(D, T, R)>>;
530
531        /// TODO
532        pub struct TimelyStackQueue<T: Columnation> {
533            list: TimelyStack<T>,
534            head: usize,
535        }
536
537        impl<D: Ord + Columnation, T: Ord + Columnation, R: Columnation> ContainerQueue<TimelyStack<(D, T, R)>> for TimelyStackQueue<(D, T, R)> {
538            fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> {
539                if self.is_empty() {
540                    Err(std::mem::take(&mut self.list))
541                }
542                else {
543                    Ok(self.pop())
544                }
545            }
546            fn is_empty(&self) -> bool {
547                self.head == self.list[..].len()
548            }
549            fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
550                let (data1, time1, _) = self.peek();
551                let (data2, time2, _) = other.peek();
552                (data1, time1).cmp(&(data2, time2))
553            }
554            fn from(list: TimelyStack<(D, T, R)>) -> Self {
555                TimelyStackQueue { list, head: 0 }
556            }
557        }
558
559        impl<T: Columnation> TimelyStackQueue<T> {
560            fn pop(&mut self) -> &T {
561                self.head += 1;
562                &self.list[self.head - 1]
563            }
564
565            fn peek(&self) -> &T {
566                &self.list[self.head]
567            }
568        }
569
570        impl<D: Ord + Columnation + 'static, T: Ord + timely::PartialOrder + Clone + Columnation + 'static, R: Default + Semigroup + Columnation + 'static> MergerChunk for TimelyStack<(D, T, R)> {
571            type TimeOwned = T;
572            type DiffOwned = R;
573
574            fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool {
575                if upper.less_equal(time) {
576                    frontier.insert_with(&time, |time| time.clone());
577                    true
578                }
579                else { false }
580            }
581            fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) {
582                let (data, time, diff1) = item1;
583                let (_data, _time, diff2) = item2;
584                stash.clone_from(diff1);
585                stash.plus_equals(&diff2);
586                if !stash.is_zero() {
587                    self.copy_destructured(data, time, stash);
588                }
589            }
590            fn account(&self) -> (usize, usize, usize, usize) {
591                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
592                let cb = |siz, cap| {
593                    size += siz;
594                    capacity += cap;
595                    allocations += 1;
596                };
597                self.heap_size(cb);
598                (self.len(), size, capacity, allocations)
599            }
600        }
601    }
602}