Skip to main content

differential_dataflow/trace/implementations/
merge_batcher.rs

1//! A `Batcher` implementation based on merge sort.
2//!
3//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger".
4//! The chunker receives input batches and consolidates them, producing sorted output
5//! "chunks" that are fully consolidated (no adjacent updates can be accumulated).
6//! The merger implements the [`Merger`] trait, and provides hooks for manipulating
7//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also
8//! splitting them apart based on time.
9//!
10//! Implementations of `MergeBatcher` can be instantiated through the choice of both
11//! the chunker and the merger, provided their respective output and input types align.
12
13use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::container::{ContainerBuilder, PushInto};
18
19use crate::logging::{BatcherEvent, Logger};
20use crate::trace::{Batcher, Builder, Description};
21
/// Creates batches from containers of unordered tuples.
///
/// Incoming containers are handed to the chunker `C`, whose sorted, consolidated output chunks are
/// stored as chains and merged pairwise as chains accumulate. On `seal`, all chains are merged into
/// one, which the merger `M` splits into updates handed to the batch builder and updates kept back.
///
/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs,
/// and must produce outputs of type `M::Chunk`.
pub struct MergeBatcher<Input, C, M: Merger> {
    /// Transforms input streams to chunks of sorted, consolidated data.
    chunker: C,
    /// A sequence of chains, each a list of sorted, consolidated containers.
    ///
    /// `insert_chain` keeps the chains ordered by decreasing length (in chunks): after each
    /// insertion, the last chain holds fewer than half as many chunks as the one before it.
    ///
    /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]),
    /// which also report size changes to the logger.
    chains: Vec<Vec<M::Chunk>>,
    /// Stash of empty chunks, recycled through the merging process.
    /// It is cleared at the end of every `seal`.
    stash: Vec<M::Chunk>,
    /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
    merger: M,
    /// The `upper` of the most recent `seal` (initially the minimum time); it becomes the lower
    /// bound of the next sealed batch's description.
    lower: Antichain<M::Time>,
    /// Frontier of the times of the updates kept back (not shipped) by the most recent `seal`.
    frontier: Antichain<M::Time>,
    /// Logger for size accounting; `None` disables accounting entirely.
    logger: Option<Logger>,
    /// Timely operator ID, reported with every accounting event.
    operator_id: usize,
    /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
    _marker: PhantomData<Input>,
}
48
49impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
50where
51    C: ContainerBuilder<Container=M::Chunk> + for<'a> PushInto<&'a mut Input>,
52    M: Merger<Time: Timestamp>,
53{
54    type Input = Input;
55    type Time = M::Time;
56    type Output = M::Chunk;
57
58    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
59        Self {
60            logger,
61            operator_id,
62            chunker: C::default(),
63            merger: M::default(),
64            chains: Vec::new(),
65            stash: Vec::new(),
66            frontier: Antichain::new(),
67            lower: Antichain::from_elem(M::Time::minimum()),
68            _marker: PhantomData,
69        }
70    }
71
72    /// Push a container of data into this merge batcher. Updates the internal chain structure if
73    /// needed.
74    fn push_container(&mut self, container: &mut Input) {
75        self.chunker.push_into(container);
76        while let Some(chunk) = self.chunker.extract() {
77            let chunk = std::mem::take(chunk);
78            self.insert_chain(vec![chunk]);
79        }
80    }
81
82    // Sealing a batch means finding those updates with times not greater or equal to any time
83    // in `upper`. All updates must have time greater or equal to the previously used `upper`,
84    // which we call `lower`, by assumption that after sealing a batcher we receive no more
85    // updates with times not greater or equal to `upper`.
86    fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
87        // Finish
88        while let Some(chunk) = self.chunker.finish() {
89            let chunk = std::mem::take(chunk);
90            self.insert_chain(vec![chunk]);
91        }
92
93        // Merge all remaining chains into a single chain.
94        while self.chains.len() > 1 {
95            let list1 = self.chain_pop().unwrap();
96            let list2 = self.chain_pop().unwrap();
97            let merged = self.merge_by(list1, list2);
98            self.chain_push(merged);
99        }
100        let merged = self.chain_pop().unwrap_or_default();
101
102        // Extract readied data.
103        let mut kept = Vec::new();
104        let mut readied = Vec::new();
105        self.frontier.clear();
106
107        self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
108
109        if !kept.is_empty() {
110            self.chain_push(kept);
111        }
112
113        self.stash.clear();
114
115        let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
116        let seal = B::seal(&mut readied, description);
117        self.lower = upper;
118        seal
119    }
120
121    /// The frontier of elements remaining after the most recent call to `self.seal`.
122    #[inline]
123    fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
124        self.frontier.borrow()
125    }
126}
127
128impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
129    /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
130    /// by decreasing length.
131    fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
132        if !chain.is_empty() {
133            self.chain_push(chain);
134            while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
135                let list1 = self.chain_pop().unwrap();
136                let list2 = self.chain_pop().unwrap();
137                let merged = self.merge_by(list1, list2);
138                self.chain_push(merged);
139            }
140        }
141    }
142
143    // merges two sorted input lists into one sorted output list.
144    fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
145        // TODO: `list1` and `list2` get dropped; would be better to reuse?
146        let mut output = Vec::with_capacity(list1.len() + list2.len());
147        self.merger.merge(list1, list2, &mut output, &mut self.stash);
148
149        output
150    }
151
152    /// Pop a chain and account size changes.
153    #[inline]
154    fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
155        let chain = self.chains.pop();
156        self.account(chain.iter().flatten().map(M::account), -1);
157        chain
158    }
159
160    /// Push a chain and account size changes.
161    #[inline]
162    fn chain_push(&mut self, chain: Vec<M::Chunk>) {
163        self.account(chain.iter().map(M::account), 1);
164        self.chains.push(chain);
165    }
166
167    /// Account size changes. Only performs work if a logger exists.
168    ///
169    /// Calculate the size based on the iterator passed along, with each attribute
170    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
171    #[inline]
172    fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
173        if let Some(logger) = &self.logger {
174            let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
175            for (records_, size_, capacity_, allocations_) in items {
176                records = records.saturating_add_unsigned(records_);
177                size = size.saturating_add_unsigned(size_);
178                capacity = capacity.saturating_add_unsigned(capacity_);
179                allocations = allocations.saturating_add_unsigned(allocations_);
180            }
181            logger.log(BatcherEvent {
182                operator: self.operator_id,
183                records_diff: records * diff,
184                size_diff: size * diff,
185                capacity_diff: capacity * diff,
186                allocations_diff: allocations * diff,
187            })
188        }
189    }
190}
191
192impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
193    fn drop(&mut self) {
194        // Cleanup chain to retract accounting information.
195        while self.chain_pop().is_some() {}
196    }
197}
198
199/// A trait to describe interesting moments in a merge batcher.
200pub trait Merger: Default {
201    /// The internal representation of chunks of data.
202    type Chunk: Default;
203    /// The type of time in frontiers to extract updates.
204    type Time;
205    /// Merge chains into an output chain.
206    fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
207    /// Extract ready updates based on the `upper` frontier.
208    fn extract(
209        &mut self,
210        merged: Vec<Self::Chunk>,
211        upper: AntichainRef<Self::Time>,
212        frontier: &mut Antichain<Self::Time>,
213        readied: &mut Vec<Self::Chunk>,
214        kept: &mut Vec<Self::Chunk>,
215        stash: &mut Vec<Self::Chunk>,
216    );
217
218    /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
219    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
220}
221
222pub use container::InternalMerger;
223
224pub mod container {
225
226    //! Merger implementations for the merge batcher.
227    //!
228    //! The `InternalMerge` trait allows containers to merge sorted, consolidated
229    //! data using internal iteration. The `InternalMerger` type implements the
230    //! `Merger` trait using `InternalMerge`, and is the standard merger for all
231    //! container types.
232
233    use std::marker::PhantomData;
234    use timely::container::SizableContainer;
235    use timely::progress::frontier::{Antichain, AntichainRef};
236    use timely::{Accountable, PartialOrder};
237    use crate::trace::implementations::merge_batcher::Merger;
238
239    /// A container that supports the operations needed by the merge batcher:
240    /// merging sorted chains and extracting updates by time.
241    pub trait InternalMerge: Accountable + SizableContainer + Default {
242        /// The owned time type, for maintaining antichains.
243        type TimeOwned;
244
245        /// The number of items in this container.
246        fn len(&self) -> usize;
247
248        /// Clear the container for reuse.
249        fn clear(&mut self);
250
251        /// Account the allocations behind the chunk.
252        fn account(&self) -> (usize, usize, usize, usize) {
253            let (size, capacity, allocations) = (0, 0, 0);
254            (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations)
255        }
256
257        /// Merge items from sorted inputs into `self`, advancing positions.
258        /// Merges until `self` is at capacity or all inputs are exhausted.
259        ///
260        /// Dispatches based on the number of inputs:
261        /// - **0**: no-op
262        /// - **1**: bulk copy (may swap the input into `self`)
263        /// - **2**: merge two sorted streams
264        fn merge_from(
265            &mut self,
266            others: &mut [Self],
267            positions: &mut [usize],
268        );
269
270        /// Extract updates from `self` into `ship` (times not beyond `upper`)
271        /// and `keep` (times beyond `upper`), updating `frontier` with kept times.
272        fn extract(
273            &mut self,
274            upper: AntichainRef<Self::TimeOwned>,
275            frontier: &mut Antichain<Self::TimeOwned>,
276            keep: &mut Self,
277            ship: &mut Self,
278        );
279    }
280
    /// A `Merger` using internal iteration for `Vec` containers of `(data, time, diff)` updates.
    pub type VecInternalMerger<D, T, R> = InternalMerger<Vec<(D, T, R)>>;
    /// A `Merger` using internal iteration for `TimelyStack` (columnation-backed) containers of
    /// `(data, time, diff)` updates.
    pub type ColInternalMerger<D, T, R> = InternalMerger<crate::containers::TimelyStack<(D, T, R)>>;
285
286    /// A merger that uses internal iteration via [`InternalMerge`].
287    pub struct InternalMerger<MC> {
288        _marker: PhantomData<MC>,
289    }
290
291    impl<MC> Default for InternalMerger<MC> {
292        fn default() -> Self { Self { _marker: PhantomData } }
293    }
294
295    impl<MC> InternalMerger<MC> where MC: InternalMerge {
296        #[inline]
297        fn empty(&self, stash: &mut Vec<MC>) -> MC {
298            stash.pop().unwrap_or_else(|| {
299                let mut container = MC::default();
300                container.ensure_capacity(&mut None);
301                container
302            })
303        }
304        #[inline]
305        fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
306            chunk.clear();
307            stash.push(chunk);
308        }
309        /// Drain remaining items from one side into `result`/`output`.
310        fn drain_side(
311            &self,
312            head: &mut MC,
313            pos: &mut usize,
314            list: &mut std::vec::IntoIter<MC>,
315            result: &mut MC,
316            output: &mut Vec<MC>,
317            stash: &mut Vec<MC>,
318        ) {
319            while *pos < head.len() {
320                result.merge_from(
321                    std::slice::from_mut(head),
322                    std::slice::from_mut(pos),
323                );
324                if *pos >= head.len() {
325                    let old = std::mem::replace(head, list.next().unwrap_or_default());
326                    self.recycle(old, stash);
327                    *pos = 0;
328                }
329                if result.at_capacity() {
330                    output.push(std::mem::take(result));
331                    *result = self.empty(stash);
332                }
333            }
334        }
335    }
336
337    impl<MC> Merger for InternalMerger<MC>
338    where
339        MC: InternalMerge<TimeOwned: Ord + PartialOrder + Clone + 'static> + 'static,
340    {
341        type Time = MC::TimeOwned;
342        type Chunk = MC;
343
344        fn merge(&mut self, list1: Vec<MC>, list2: Vec<MC>, output: &mut Vec<MC>, stash: &mut Vec<MC>) {
345            let mut list1 = list1.into_iter();
346            let mut list2 = list2.into_iter();
347
348            let mut heads = [list1.next().unwrap_or_default(), list2.next().unwrap_or_default()];
349            let mut positions = [0usize, 0usize];
350
351            let mut result = self.empty(stash);
352
353            // Main merge loop: both sides have data.
354            while positions[0] < heads[0].len() && positions[1] < heads[1].len() {
355                result.merge_from(&mut heads, &mut positions);
356
357                if positions[0] >= heads[0].len() {
358                    let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
359                    self.recycle(old, stash);
360                    positions[0] = 0;
361                }
362                if positions[1] >= heads[1].len() {
363                    let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
364                    self.recycle(old, stash);
365                    positions[1] = 0;
366                }
367                if result.at_capacity() {
368                    output.push(std::mem::take(&mut result));
369                    result = self.empty(stash);
370                }
371            }
372
373            // Drain remaining from side 0.
374            self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash);
375            if !result.is_empty() {
376                output.push(std::mem::take(&mut result));
377                result = self.empty(stash);
378            }
379            output.extend(list1);
380
381            // Drain remaining from side 1.
382            self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash);
383            if !result.is_empty() {
384                output.push(std::mem::take(&mut result));
385            }
386            output.extend(list2);
387        }
388
389        fn extract(
390            &mut self,
391            merged: Vec<Self::Chunk>,
392            upper: AntichainRef<Self::Time>,
393            frontier: &mut Antichain<Self::Time>,
394            ship: &mut Vec<Self::Chunk>,
395            kept: &mut Vec<Self::Chunk>,
396            stash: &mut Vec<Self::Chunk>,
397        ) {
398            let mut keep = self.empty(stash);
399            let mut ready = self.empty(stash);
400
401            for mut buffer in merged {
402                buffer.extract(upper, frontier, &mut keep, &mut ready);
403                self.recycle(buffer, stash);
404                if keep.at_capacity() {
405                    kept.push(std::mem::take(&mut keep));
406                    keep = self.empty(stash);
407                }
408                if ready.at_capacity() {
409                    ship.push(std::mem::take(&mut ready));
410                    ready = self.empty(stash);
411                }
412            }
413            if !keep.is_empty() {
414                kept.push(keep);
415            }
416            if !ready.is_empty() {
417                ship.push(ready);
418            }
419        }
420
421        fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
422            chunk.account()
423        }
424    }
425
426    /// Implementation of `InternalMerge` for `Vec<(D, T, R)>`.
427    pub mod vec_internal {
428        use std::cmp::Ordering;
429        use timely::PartialOrder;
430        use timely::container::SizableContainer;
431        use timely::progress::frontier::{Antichain, AntichainRef};
432        use crate::difference::Semigroup;
433        use super::InternalMerge;
434
435        impl<D: Ord + Clone + 'static, T: Ord + Clone + PartialOrder + 'static, R: Semigroup + Clone + 'static> InternalMerge for Vec<(D, T, R)> {
436            type TimeOwned = T;
437
438            fn len(&self) -> usize { Vec::len(self) }
439            fn clear(&mut self) { Vec::clear(self) }
440
441            fn merge_from(
442                &mut self,
443                others: &mut [Self],
444                positions: &mut [usize],
445            ) {
446                match others.len() {
447                    0 => {},
448                    1 => {
449                        let other = &mut others[0];
450                        let pos = &mut positions[0];
451                        if self.is_empty() && *pos == 0 {
452                            std::mem::swap(self, other);
453                            return;
454                        }
455                        self.extend_from_slice(&other[*pos ..]);
456                        *pos = other.len();
457                    },
458                    2 => {
459                        let (left, right) = others.split_at_mut(1);
460                        let other1 = &mut left[0];
461                        let other2 = &mut right[0];
462
463                        while positions[0] < other1.len() && positions[1] < other2.len() && !self.at_capacity() {
464                            let (d1, t1, _) = &other1[positions[0]];
465                            let (d2, t2, _) = &other2[positions[1]];
466                            match (d1, t1).cmp(&(d2, t2)) {
467                                Ordering::Less => {
468                                    self.push(other1[positions[0]].clone());
469                                    positions[0] += 1;
470                                }
471                                Ordering::Greater => {
472                                    self.push(other2[positions[1]].clone());
473                                    positions[1] += 1;
474                                }
475                                Ordering::Equal => {
476                                    let (d, t, mut r1) = other1[positions[0]].clone();
477                                    let (_, _, ref r2) = other2[positions[1]];
478                                    r1.plus_equals(r2);
479                                    if !r1.is_zero() {
480                                        self.push((d, t, r1));
481                                    }
482                                    positions[0] += 1;
483                                    positions[1] += 1;
484                                }
485                            }
486                        }
487                    },
488                    n => unimplemented!("{n}-way merge not yet supported"),
489                }
490            }
491
492            fn extract(
493                &mut self,
494                upper: AntichainRef<T>,
495                frontier: &mut Antichain<T>,
496                keep: &mut Self,
497                ship: &mut Self,
498            ) {
499                for (data, time, diff) in self.drain(..) {
500                    if upper.less_equal(&time) {
501                        frontier.insert_with(&time, |time| time.clone());
502                        keep.push((data, time, diff));
503                    } else {
504                        ship.push((data, time, diff));
505                    }
506                }
507            }
508        }
509    }
510
511    /// Implementation of `InternalMerge` for `TimelyStack<(D, T, R)>`.
512    pub mod columnation_internal {
513        use std::cmp::Ordering;
514        use columnation::Columnation;
515        use timely::PartialOrder;
516        use timely::container::SizableContainer;
517        use timely::progress::frontier::{Antichain, AntichainRef};
518        use crate::containers::TimelyStack;
519        use crate::difference::Semigroup;
520        use super::InternalMerge;
521
522        impl<D, T, R> InternalMerge for TimelyStack<(D, T, R)>
523        where
524            D: Ord + Columnation + Clone + 'static,
525            T: Ord + Columnation + Clone + PartialOrder + 'static,
526            R: Default + Semigroup + Columnation + Clone + 'static,
527        {
528            type TimeOwned = T;
529
530            fn len(&self) -> usize { self[..].len() }
531            fn clear(&mut self) { TimelyStack::clear(self) }
532
533            fn account(&self) -> (usize, usize, usize, usize) {
534                let (mut size, mut capacity, mut allocations) = (0, 0, 0);
535                let cb = |siz, cap| {
536                    size += siz;
537                    capacity += cap;
538                    allocations += 1;
539                };
540                self.heap_size(cb);
541                (self.len(), size, capacity, allocations)
542            }
543
544            fn merge_from(
545                &mut self,
546                others: &mut [Self],
547                positions: &mut [usize],
548            ) {
549                match others.len() {
550                    0 => {},
551                    1 => {
552                        let other = &mut others[0];
553                        let pos = &mut positions[0];
554                        if self[..].is_empty() && *pos == 0 {
555                            std::mem::swap(self, other);
556                            return;
557                        }
558                        for i in *pos .. other[..].len() {
559                            self.copy(&other[i]);
560                        }
561                        *pos = other[..].len();
562                    },
563                    2 => {
564                        let (left, right) = others.split_at_mut(1);
565                        let other1 = &left[0];
566                        let other2 = &right[0];
567
568                        let mut stash = R::default();
569
570                        while positions[0] < other1[..].len() && positions[1] < other2[..].len() && !self.at_capacity() {
571                            let (d1, t1, _) = &other1[positions[0]];
572                            let (d2, t2, _) = &other2[positions[1]];
573                            match (d1, t1).cmp(&(d2, t2)) {
574                                Ordering::Less => {
575                                    self.copy(&other1[positions[0]]);
576                                    positions[0] += 1;
577                                }
578                                Ordering::Greater => {
579                                    self.copy(&other2[positions[1]]);
580                                    positions[1] += 1;
581                                }
582                                Ordering::Equal => {
583                                    let (_, _, r1) = &other1[positions[0]];
584                                    let (_, _, r2) = &other2[positions[1]];
585                                    stash.clone_from(r1);
586                                    stash.plus_equals(r2);
587                                    if !stash.is_zero() {
588                                        let (d, t, _) = &other1[positions[0]];
589                                        self.copy_destructured(d, t, &stash);
590                                    }
591                                    positions[0] += 1;
592                                    positions[1] += 1;
593                                }
594                            }
595                        }
596                    },
597                    n => unimplemented!("{n}-way merge not yet supported"),
598                }
599            }
600
601            fn extract(
602                &mut self,
603                upper: AntichainRef<T>,
604                frontier: &mut Antichain<T>,
605                keep: &mut Self,
606                ship: &mut Self,
607            ) {
608                for (data, time, diff) in self.iter() {
609                    if upper.less_equal(time) {
610                        frontier.insert_with(time, |time| time.clone());
611                        keep.copy_destructured(data, time, diff);
612                    } else {
613                        ship.copy_destructured(data, time, diff);
614                    }
615                }
616            }
617        }
618    }
619}