// differential_dataflow/trace/implementations/merge_batcher.rs

1//! A `Batcher` implementation based on merge sort.
2//!
3//! The `MergeBatcher` requires support from two types, a "chunker" and a "merger".
4//! The chunker receives input batches and consolidates them, producing sorted output
5//! "chunks" that are fully consolidated (no adjacent updates can be accumulated).
6//! The merger implements the [`Merger`] trait, and provides hooks for manipulating
7//! sorted "chains" of chunks as needed by the merge batcher: merging chunks and also
8//! splitting them apart based on time.
9//!
10//! Implementations of `MergeBatcher` can be instantiated through the choice of both
11//! the chunker and the merger, provided their respective output and input types align.
12
13use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::container::{ContainerBuilder, PushInto};
18
19use crate::logging::{BatcherEvent, Logger};
20use crate::trace::{Batcher, Builder, Description};
21
/// Creates batches from containers of unordered tuples.
///
/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs,
/// and must produce outputs of type `M::Chunk`.
pub struct MergeBatcher<Input, C, M: Merger> {
    /// Transforms input streams to chunks of sorted, consolidated data.
    chunker: C,
    /// A sequence of power-of-two length lists of sorted, consolidated containers.
    ///
    /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]),
    /// which also maintain the logged size accounting.
    chains: Vec<Vec<M::Chunk>>,
    /// Stash of empty chunks, recycled through the merging process.
    stash: Vec<M::Chunk>,
    /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
    merger: M,
    /// Current lower frontier, we sealed up to here.
    lower: Antichain<M::Time>,
    /// The lower-bound frontier of the data, after the last call to seal.
    frontier: Antichain<M::Time>,
    /// Logger for size accounting.
    logger: Option<Logger>,
    /// Timely operator ID, used to attribute logged `BatcherEvent`s.
    operator_id: usize,
    /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
    _marker: PhantomData<Input>,
}
48
49impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
50where
51    C: ContainerBuilder<Container=M::Chunk> + for<'a> PushInto<&'a mut Input>,
52    M: Merger<Time: Timestamp>,
53{
54    type Input = Input;
55    type Time = M::Time;
56    type Output = M::Chunk;
57
58    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
59        Self {
60            logger,
61            operator_id,
62            chunker: C::default(),
63            merger: M::default(),
64            chains: Vec::new(),
65            stash: Vec::new(),
66            frontier: Antichain::new(),
67            lower: Antichain::from_elem(M::Time::minimum()),
68            _marker: PhantomData,
69        }
70    }
71
72    /// Push a container of data into this merge batcher. Updates the internal chain structure if
73    /// needed.
74    fn push_container(&mut self, container: &mut Input) {
75        self.chunker.push_into(container);
76        while let Some(chunk) = self.chunker.extract() {
77            let chunk = std::mem::take(chunk);
78            self.insert_chain(vec![chunk]);
79        }
80    }
81
82    // Sealing a batch means finding those updates with times not greater or equal to any time
83    // in `upper`. All updates must have time greater or equal to the previously used `upper`,
84    // which we call `lower`, by assumption that after sealing a batcher we receive no more
85    // updates with times not greater or equal to `upper`.
86    fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
87        // Finish
88        while let Some(chunk) = self.chunker.finish() {
89            let chunk = std::mem::take(chunk);
90            self.insert_chain(vec![chunk]);
91        }
92
93        // Merge all remaining chains into a single chain.
94        while self.chains.len() > 1 {
95            let list1 = self.chain_pop().unwrap();
96            let list2 = self.chain_pop().unwrap();
97            let merged = self.merge_by(list1, list2);
98            self.chain_push(merged);
99        }
100        let merged = self.chain_pop().unwrap_or_default();
101
102        // Extract readied data.
103        let mut kept = Vec::new();
104        let mut readied = Vec::new();
105        self.frontier.clear();
106
107        self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
108
109        if !kept.is_empty() {
110            self.chain_push(kept);
111        }
112
113        self.stash.clear();
114
115        let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
116        let seal = B::seal(&mut readied, description);
117        self.lower = upper;
118        seal
119    }
120
121    /// The frontier of elements remaining after the most recent call to `self.seal`.
122    #[inline]
123    fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
124        self.frontier.borrow()
125    }
126}
127
128impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
129    /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
130    /// by decreasing length.
131    fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
132        if !chain.is_empty() {
133            self.chain_push(chain);
134            while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
135                let list1 = self.chain_pop().unwrap();
136                let list2 = self.chain_pop().unwrap();
137                let merged = self.merge_by(list1, list2);
138                self.chain_push(merged);
139            }
140        }
141    }
142
143    // merges two sorted input lists into one sorted output list.
144    fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
145        // TODO: `list1` and `list2` get dropped; would be better to reuse?
146        let mut output = Vec::with_capacity(list1.len() + list2.len());
147        self.merger.merge(list1, list2, &mut output, &mut self.stash);
148
149        output
150    }
151
152    /// Pop a chain and account size changes.
153    #[inline]
154    fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
155        let chain = self.chains.pop();
156        self.account(chain.iter().flatten().map(M::account), -1);
157        chain
158    }
159
160    /// Push a chain and account size changes.
161    #[inline]
162    fn chain_push(&mut self, chain: Vec<M::Chunk>) {
163        self.account(chain.iter().map(M::account), 1);
164        self.chains.push(chain);
165    }
166
167    /// Account size changes. Only performs work if a logger exists.
168    ///
169    /// Calculate the size based on the iterator passed along, with each attribute
170    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
171    #[inline]
172    fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
173        if let Some(logger) = &self.logger {
174            let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
175            for (records_, size_, capacity_, allocations_) in items {
176                records = records.saturating_add_unsigned(records_);
177                size = size.saturating_add_unsigned(size_);
178                capacity = capacity.saturating_add_unsigned(capacity_);
179                allocations = allocations.saturating_add_unsigned(allocations_);
180            }
181            logger.log(BatcherEvent {
182                operator: self.operator_id,
183                records_diff: records * diff,
184                size_diff: size * diff,
185                capacity_diff: capacity * diff,
186                allocations_diff: allocations * diff,
187            })
188        }
189    }
190}
191
192impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
193    fn drop(&mut self) {
194        // Cleanup chain to retract accounting information.
195        while self.chain_pop().is_some() {}
196    }
197}
198
/// A trait to describe interesting moments in a merge batcher.
pub trait Merger: Default {
    /// The internal representation of chunks of data.
    type Chunk: Default;
    /// The type of time in frontiers to extract updates.
    type Time;
    /// Merge chains into an output chain.
    ///
    /// `list1` and `list2` are chains of sorted, consolidated chunks; the merged result is
    /// appended to `output`. `stash` supplies spare chunks that implementations may take
    /// from and return to.
    fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
    /// Extract ready updates based on the `upper` frontier.
    ///
    /// Updates whose times are not greater or equal to `upper` are appended to `readied`;
    /// all other updates are appended to `kept`, and their times are folded into
    /// `frontier`. `stash` supplies spare chunks, as in [`Self::merge`].
    fn extract(
        &mut self,
        merged: Vec<Self::Chunk>,
        upper: AntichainRef<Self::Time>,
        frontier: &mut Antichain<Self::Time>,
        readied: &mut Vec<Self::Chunk>,
        kept: &mut Vec<Self::Chunk>,
        stash: &mut Vec<Self::Chunk>,
    );

    /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
}
221
222pub use container::InternalMerger;
223
224pub mod container {
225
226    //! Merger implementations for the merge batcher.
227    //!
228    //! The `InternalMerge` trait allows containers to merge sorted, consolidated
229    //! data using internal iteration. The `InternalMerger` type implements the
230    //! `Merger` trait using `InternalMerge`, and is the standard merger for all
231    //! container types.
232
233    use std::marker::PhantomData;
234    use timely::container::SizableContainer;
235    use timely::progress::frontier::{Antichain, AntichainRef};
236    use timely::{Accountable, PartialOrder};
237    use crate::trace::implementations::merge_batcher::Merger;
238
    /// A container that supports the operations needed by the merge batcher:
    /// merging sorted chains and extracting updates by time.
    pub trait InternalMerge: Accountable + SizableContainer + Default {
        /// The owned time type, for maintaining antichains.
        type TimeOwned;

        /// The number of items in this container.
        fn len(&self) -> usize;

        /// Clear the container for reuse.
        fn clear(&mut self);

        /// Account the allocations behind the chunk.
        ///
        /// The default implementation reports only the record count; size,
        /// capacity, and allocation figures are left at zero.
        fn account(&self) -> (usize, usize, usize, usize) {
            let (size, capacity, allocations) = (0, 0, 0);
            (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations)
        }

        /// Merge items from sorted inputs into `self`, advancing positions.
        /// Merges until `self` is at capacity or all inputs are exhausted.
        ///
        /// `positions[i]` is the index of the next unconsumed item in `others[i]`;
        /// implementations advance these entries as they consume input.
        ///
        /// Dispatches based on the number of inputs:
        /// - **0**: no-op
        /// - **1**: bulk copy (may swap the input into `self`)
        /// - **2**: merge two sorted streams
        fn merge_from(
            &mut self,
            others: &mut [Self],
            positions: &mut [usize],
        );

        /// Extract updates from `self` into `ship` (times not beyond `upper`)
        /// and `keep` (times beyond `upper`), updating `frontier` with kept times.
        ///
        /// Iteration starts at `*position` and advances `*position` as updates
        /// are consumed. The implementation must yield (return early) when
        /// either `keep.at_capacity()` or `ship.at_capacity()` becomes true,
        /// so the caller can swap out a full output buffer and resume by
        /// calling `extract` again. The caller invokes `extract` repeatedly
        /// until `*position >= self.len()`.
        ///
        /// This shape exists because `at_capacity()` for `Vec` is
        /// `len() == capacity()`, which silently becomes false again the
        /// moment a push past capacity grows the backing allocation.
        /// Without per-element yielding, a single `extract` call can
        /// quietly produce oversized output chunks.
        fn extract(
            &mut self,
            position: &mut usize,
            upper: AntichainRef<Self::TimeOwned>,
            frontier: &mut Antichain<Self::TimeOwned>,
            keep: &mut Self,
            ship: &mut Self,
        );
    }
294
295    /// A `Merger` for `Vec` containers, which contain owned data and need special treatment.
296    pub type VecInternalMerger<D, T, R> = VecMerger<D, T, R>;
297    /// A `Merger` implementation for `Vec<(D, T, R)>` that drains owned inputs.
298    pub struct VecMerger<D, T, R> {
299        _marker: PhantomData<(D, T, R)>,
300    }
301
302    impl<D, T, R> Default for VecMerger<D, T, R> {
303        fn default() -> Self { Self { _marker: PhantomData } }
304    }
305
306    impl<D, T, R> VecMerger<D, T, R> {
307        /// The target capacity for output buffers, as a power of two.
308        ///
309        /// This amount is used to size vectors, where vectors not exactly this capacity are dropped.
310        /// If this is mis-set, there is the potential for more memory churn than anticipated.
311        fn target_capacity() -> usize {
312            timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
313        }
314        /// Acquire a buffer with the target capacity.
315        fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
316            let target = Self::target_capacity();
317            let mut container = stash.pop().unwrap_or_default();
318            container.clear();
319            // Reuse if at target; otherwise allocate fresh.
320            if container.capacity() != target {
321                container = Vec::with_capacity(target);
322            }
323            container
324        }
325        /// Refill `queue` from `iter` if empty. Recycles drained queues into `stash`.
326        fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
327            if queue.is_empty() {
328                let target = Self::target_capacity();
329                if stash.len() < 2 {
330                    let mut recycled = Vec::from(std::mem::take(queue));
331                    recycled.clear();
332                    if recycled.capacity() == target {
333                        stash.push(recycled);
334                    }
335                }
336                if let Some(chunk) = iter.next() {
337                    *queue = std::collections::VecDeque::from(chunk);
338                }
339            }
340        }
341    }
342
    impl<D, T, R> Merger for VecMerger<D, T, R>
    where
        D: Ord + Clone + 'static,
        T: Ord + Clone + PartialOrder + 'static,
        R: crate::difference::Semigroup + Clone + 'static,
    {
        type Chunk = Vec<(D, T, R)>;
        type Time = T;

        /// Merge two sorted, consolidated chains by popping owned items from the fronts
        /// of two queues, accumulating diffs for equal `(data, time)` keys.
        fn merge(
            &mut self,
            list1: Vec<Vec<(D, T, R)>>,
            list2: Vec<Vec<(D, T, R)>>,
            output: &mut Vec<Vec<(D, T, R)>>,
            stash: &mut Vec<Vec<(D, T, R)>>,
        ) {
            use std::cmp::Ordering;
            use std::collections::VecDeque;

            let mut iter1 = list1.into_iter();
            let mut iter2 = list2.into_iter();
            // Head chunks of each chain, as queues we can pop owned items from.
            let mut q1 = VecDeque::<(D,T,R)>::from(iter1.next().unwrap_or_default());
            let mut q2 = VecDeque::<(D,T,R)>::from(iter2.next().unwrap_or_default());

            let mut result = self.empty(stash);

            // Merge while both queues are non-empty.
            while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
                // Order by `(data, time)`; equal keys have their diffs accumulated.
                match (d1, t1).cmp(&(d2, t2)) {
                    Ordering::Less => {
                        result.push(q1.pop_front().unwrap());
                    }
                    Ordering::Greater => {
                        result.push(q2.pop_front().unwrap());
                    }
                    Ordering::Equal => {
                        let (d, t, mut r1) = q1.pop_front().unwrap();
                        let (_, _, r2) = q2.pop_front().unwrap();
                        r1.plus_equals(&r2);
                        // Consolidation: drop updates whose accumulated diff is zero.
                        if !r1.is_zero() {
                            result.push((d, t, r1));
                        }
                    }
                }

                // Ship a full output buffer and continue with a fresh one.
                if result.at_capacity() {
                    output.push(std::mem::take(&mut result));
                    result = self.empty(stash);
                }

                // Refill emptied queues from their chains.
                if q1.is_empty() { Self::refill(&mut q1, &mut iter1, stash); }
                if q2.is_empty() { Self::refill(&mut q2, &mut iter2, stash); }
            }

            // Push partial result and remaining data from both sides. The loop exits
            // only once one side is fully consumed, so at most one of `q1`/`q2` (and
            // of `iter1`/`iter2`) still holds data, preserving sorted order here.
            if !result.is_empty() { output.push(result); }
            for q in [q1, q2] {
                if !q.is_empty() { output.push(Vec::from(q)); }
            }
            output.extend(iter1);
            output.extend(iter2);
        }

        /// Partition `merged` into `ship` (times not greater or equal to `upper`) and
        /// `kept` (all other times), folding the kept times into `frontier`.
        fn extract(
            &mut self,
            merged: Vec<Vec<(D, T, R)>>,
            upper: AntichainRef<T>,
            frontier: &mut Antichain<T>,
            ship: &mut Vec<Vec<(D, T, R)>>,
            kept: &mut Vec<Vec<(D, T, R)>>,
            stash: &mut Vec<Vec<(D, T, R)>>,
        ) {
            let mut keep = self.empty(stash);
            let mut ready = self.empty(stash);

            for mut chunk in merged {
                // Go update-by-update to swap out full containers.
                for (data, time, diff) in chunk.drain(..) {
                    if upper.less_equal(&time) {
                        // Not ready yet: record the time in the frontier and keep the update.
                        frontier.insert_with(&time, |time| time.clone());
                        keep.push((data, time, diff));
                    } else {
                        ready.push((data, time, diff));
                    }
                    if keep.at_capacity() {
                        kept.push(std::mem::take(&mut keep));
                        keep = self.empty(stash);
                    }
                    if ready.at_capacity() {
                        ship.push(std::mem::take(&mut ready));
                        ready = self.empty(stash);
                    }
                }
                // Recycle the now-empty chunk if it has the right capacity.
                if chunk.capacity() == Self::target_capacity() {
                    stash.push(chunk);
                }
            }
            // Flush the partially-filled tails.
            if !keep.is_empty() { kept.push(keep); }
            if !ready.is_empty() { ship.push(ready); }
        }

        /// Report only record counts; byte sizes are not tracked for `Vec` chunks.
        fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
            (chunk.len(), 0, 0, 0)
        }
    }
450
451    /// A merger that uses internal iteration via [`InternalMerge`].
452    pub struct InternalMerger<MC> {
453        _marker: PhantomData<MC>,
454    }
455
456    impl<MC> Default for InternalMerger<MC> {
457        fn default() -> Self { Self { _marker: PhantomData } }
458    }
459
460    impl<MC> InternalMerger<MC> where MC: InternalMerge {
461        #[inline]
462        fn empty(&self, stash: &mut Vec<MC>) -> MC {
463            stash.pop().unwrap_or_else(|| {
464                let mut container = MC::default();
465                container.ensure_capacity(&mut None);
466                container
467            })
468        }
469        #[inline]
470        fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
471            chunk.clear();
472            stash.push(chunk);
473        }
474        /// Drain remaining items from one side into `result`/`output`.
475        ///
476        /// Copies the partially-consumed head into `result`, then appends
477        /// remaining full chunks directly to `output` without copying.
478        fn drain_side(
479            &self,
480            head: &mut MC,
481            pos: &mut usize,
482            list: &mut std::vec::IntoIter<MC>,
483            result: &mut MC,
484            output: &mut Vec<MC>,
485            stash: &mut Vec<MC>,
486        ) {
487            // Copy the partially-consumed head into result.
488            if *pos < head.len() {
489                result.merge_from(
490                    std::slice::from_mut(head),
491                    std::slice::from_mut(pos),
492                );
493            }
494            // Flush result before appending full chunks.
495            if !result.is_empty() {
496                output.push(std::mem::take(result));
497                *result = self.empty(stash);
498            }
499            // Remaining full chunks go directly to output.
500            output.extend(list);
501        }
502    }
503
    impl<MC> Merger for InternalMerger<MC>
    where
        MC: InternalMerge<TimeOwned: Ord + PartialOrder + Clone + 'static> + 'static,
    {
        type Time = MC::TimeOwned;
        type Chunk = MC;

        /// Merge two chains by repeatedly calling [`InternalMerge::merge_from`] on the
        /// two head chunks, recycling exhausted inputs and flushing full outputs.
        fn merge(&mut self, list1: Vec<MC>, list2: Vec<MC>, output: &mut Vec<MC>, stash: &mut Vec<MC>) {
            let mut list1 = list1.into_iter();
            let mut list2 = list2.into_iter();

            // Head chunk of each side, plus the read position within it.
            let mut heads = [list1.next().unwrap_or_default(), list2.next().unwrap_or_default()];
            let mut positions = [0usize, 0usize];

            let mut result = self.empty(stash);

            // Main merge loop: both sides have data.
            while positions[0] < heads[0].len() && positions[1] < heads[1].len() {
                // Merges until `result` is at capacity or one head is exhausted.
                result.merge_from(&mut heads, &mut positions);

                // Advance a side to its next chunk once its head is exhausted.
                if positions[0] >= heads[0].len() {
                    let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
                    self.recycle(old, stash);
                    positions[0] = 0;
                }
                if positions[1] >= heads[1].len() {
                    let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
                    self.recycle(old, stash);
                    positions[1] = 0;
                }
                // Ship a full output chunk and continue with a fresh one.
                if result.at_capacity() {
                    output.push(std::mem::take(&mut result));
                    result = self.empty(stash);
                }
            }

            // Drain remaining from each side: copy partial head, then append full chunks.
            // The loop exits only once one side is fully consumed, so at most one side
            // still has data and order is preserved.
            self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash);
            self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash);
            if !result.is_empty() {
                output.push(result);
            }
        }

        /// Partition `merged` into `ship` and `kept` around `upper` using each chunk's
        /// [`InternalMerge::extract`], swapping out output chunks as they fill.
        fn extract(
            &mut self,
            merged: Vec<Self::Chunk>,
            upper: AntichainRef<Self::Time>,
            frontier: &mut Antichain<Self::Time>,
            ship: &mut Vec<Self::Chunk>,
            kept: &mut Vec<Self::Chunk>,
            stash: &mut Vec<Self::Chunk>,
        ) {
            let mut keep = self.empty(stash);
            let mut ready = self.empty(stash);

            for mut buffer in merged {
                let mut position = 0;
                let len = buffer.len();
                // `extract` yields whenever an output reaches capacity; resume until done.
                while position < len {
                    buffer.extract(&mut position, upper, frontier, &mut keep, &mut ready);
                    if keep.at_capacity() {
                        kept.push(std::mem::take(&mut keep));
                        keep = self.empty(stash);
                    }
                    if ready.at_capacity() {
                        ship.push(std::mem::take(&mut ready));
                        ready = self.empty(stash);
                    }
                }
                self.recycle(buffer, stash);
            }
            // Flush the partially-filled tails.
            if !keep.is_empty() {
                kept.push(keep);
            }
            if !ready.is_empty() {
                ship.push(ready);
            }
        }

        /// Delegate accounting to the chunk's own [`InternalMerge::account`].
        fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
            chunk.account()
        }
    }
588
    /// Implementation of `InternalMerge` for `Vec<(D, T, R)>`.
    ///
    /// Note: The `VecMerger` type implements `Merger` directly and avoids
    /// cloning by draining inputs. This `InternalMerge` impl is retained
    /// because `reduce` requires `Builder::Input: InternalMerge`.
    pub mod vec_internal {
        use std::cmp::Ordering;
        use timely::PartialOrder;
        use timely::container::SizableContainer;
        use timely::progress::frontier::{Antichain, AntichainRef};
        use crate::difference::Semigroup;
        use super::InternalMerge;

        impl<D: Ord + Clone + 'static, T: Ord + Clone + PartialOrder + 'static, R: Semigroup + Clone + 'static> InternalMerge for Vec<(D, T, R)> {
            type TimeOwned = T;

            fn len(&self) -> usize { Vec::len(self) }
            fn clear(&mut self) { Vec::clear(self) }

            /// Merge sorted updates from `others` into `self`.
            ///
            /// With one input, bulk-copies (or swaps) the remainder; with two inputs,
            /// merges by `(data, time)` order and accumulates diffs for equal keys,
            /// stopping when `self` reaches capacity.
            fn merge_from(
                &mut self,
                others: &mut [Self],
                positions: &mut [usize],
            ) {
                match others.len() {
                    0 => {},
                    1 => {
                        let other = &mut others[0];
                        let pos = &mut positions[0];
                        // Untouched input into an empty output: swap instead of copying.
                        // (`*pos` stays 0, which equals the now-empty `other`'s length.)
                        if self.is_empty() && *pos == 0 {
                            std::mem::swap(self, other);
                            return;
                        }
                        self.extend_from_slice(&other[*pos ..]);
                        *pos = other.len();
                    },
                    2 => {
                        let (left, right) = others.split_at_mut(1);
                        let other1 = &mut left[0];
                        let other2 = &mut right[0];

                        while positions[0] < other1.len() && positions[1] < other2.len() && !self.at_capacity() {
                            let (d1, t1, _) = &other1[positions[0]];
                            let (d2, t2, _) = &other2[positions[1]];
                            // NOTE: The .clone() calls here are not great, but this dead code to be removed in the next release.
                            match (d1, t1).cmp(&(d2, t2)) {
                                Ordering::Less => {
                                    self.push(other1[positions[0]].clone());
                                    positions[0] += 1;
                                }
                                Ordering::Greater => {
                                    self.push(other2[positions[1]].clone());
                                    positions[1] += 1;
                                }
                                Ordering::Equal => {
                                    // Equal `(data, time)` keys: accumulate the diffs and
                                    // drop the update if the sum is zero (consolidation).
                                    let (d, t, mut r1) = other1[positions[0]].clone();
                                    let (_, _, ref r2) = other2[positions[1]];
                                    r1.plus_equals(r2);
                                    if !r1.is_zero() {
                                        self.push((d, t, r1));
                                    }
                                    positions[0] += 1;
                                    positions[1] += 1;
                                }
                            }
                        }
                    },
                    n => unimplemented!("{n}-way merge not yet supported"),
                }
            }

            /// Copy updates starting at `*position` into `keep` (times greater or equal
            /// to `upper`, folded into `frontier`) or `ship` (all other times), yielding
            /// as soon as either output reaches capacity.
            fn extract(
                &mut self,
                position: &mut usize,
                upper: AntichainRef<T>,
                frontier: &mut Antichain<T>,
                keep: &mut Self,
                ship: &mut Self,
            ) {
                let len = self.len();
                while *position < len && !keep.at_capacity() && !ship.at_capacity() {
                    let (data, time, diff) = self[*position].clone();
                    if upper.less_equal(&time) {
                        frontier.insert_with(&time, |time| time.clone());
                        keep.push((data, time, diff));
                    } else {
                        ship.push((data, time, diff));
                    }
                    *position += 1;
                }
            }
        }
    }
682
683}