Skip to main content

differential_dataflow/trace/implementations/
merge_batcher.rs

//! A `Batcher` implementation based on merge sort.
//!
//! The `MergeBatcher` requires a "merger" that implements the [`Merger`] trait, which provides
//! hooks for manipulating sorted "chains" of chunks as needed by the merge batcher: merging
//! chunks, and splitting them apart based on time.
//!
//! Callers feed already-chunked, sorted, and consolidated input into the batcher via [`PushInto`].
//! Forming such chunks from raw data is the responsibility of the caller (typically a chunker
//! living in the surrounding dataflow operator).
10
11use timely::progress::frontier::AntichainRef;
12use timely::progress::{frontier::Antichain, Timestamp};
13use timely::container::PushInto;
14
15use crate::logging::{BatcherEvent, Logger};
16use crate::trace::{Batcher, Description};
17
18/// Creates batches from chunks of sorted, consolidated tuples.
19pub struct MergeBatcher<M: Merger> {
20    /// A sequence of power-of-two length lists of sorted, consolidated containers.
21    ///
22    /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]).
23    chains: Vec<Vec<M::Chunk>>,
24    /// Stash of empty chunks, recycled through the merging process.
25    stash: Vec<M::Chunk>,
26    /// Merges consolidated chunks, and extracts the subset of an update chain that lies in an interval of time.
27    merger: M,
28    /// Current lower frontier, we sealed up to here.
29    lower: Antichain<M::Time>,
30    /// The lower-bound frontier of the data, after the last call to seal.
31    frontier: Antichain<M::Time>,
32    /// Logger for size accounting.
33    logger: Option<Logger>,
34    /// Timely operator ID.
35    operator_id: usize,
36}
37
38impl<M> Batcher for MergeBatcher<M>
39where
40    M: Merger<Time: Timestamp>,
41{
42    type Time = M::Time;
43    type Output = M::Chunk;
44
45    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
46        Self {
47            logger,
48            operator_id,
49            merger: M::default(),
50            chains: Vec::new(),
51            stash: Vec::new(),
52            frontier: Antichain::new(),
53            lower: Antichain::from_elem(M::Time::minimum()),
54        }
55    }
56
57    // Sealing a batch means finding those updates with times not greater or equal to any time
58    // in `upper`. All updates must have time greater or equal to the previously used `upper`,
59    // which we call `lower`, by assumption that after sealing a batcher we receive no more
60    // updates with times not greater or equal to `upper`.
61    fn seal(&mut self, upper: Antichain<M::Time>) -> (Vec<Self::Output>, Description<M::Time>) {
62        // Merge all remaining chains into a single chain.
63        while self.chains.len() > 1 {
64            let list1 = self.chain_pop().unwrap();
65            let list2 = self.chain_pop().unwrap();
66            let merged = self.merge_by(list1, list2);
67            self.chain_push(merged);
68        }
69        let merged = self.chain_pop().unwrap_or_default();
70
71        // Extract readied data.
72        let mut kept = Vec::new();
73        let mut readied = Vec::new();
74        self.frontier.clear();
75
76        self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
77
78        if !kept.is_empty() {
79            self.chain_push(kept);
80        }
81
82        self.stash.clear();
83
84        let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
85        self.lower = upper;
86        (readied, description)
87    }
88
89    /// The frontier of elements remaining after the most recent call to `self.seal`.
90    #[inline]
91    fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
92        self.frontier.borrow()
93    }
94}
95
96impl<M: Merger> PushInto<M::Chunk> for MergeBatcher<M> {
97    fn push_into(&mut self, chunk: M::Chunk) {
98        self.insert_chain(vec![chunk]);
99    }
100}
101
102impl<M: Merger> MergeBatcher<M> {
103    /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
104    /// by decreasing length.
105    fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
106        if !chain.is_empty() {
107            self.chain_push(chain);
108            while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
109                let list1 = self.chain_pop().unwrap();
110                let list2 = self.chain_pop().unwrap();
111                let merged = self.merge_by(list1, list2);
112                self.chain_push(merged);
113            }
114        }
115    }
116
117    // merges two sorted input lists into one sorted output list.
118    fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
119        // TODO: `list1` and `list2` get dropped; would be better to reuse?
120        let mut output = Vec::with_capacity(list1.len() + list2.len());
121        self.merger.merge(list1, list2, &mut output, &mut self.stash);
122
123        output
124    }
125
126    /// Pop a chain and account size changes.
127    #[inline]
128    fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
129        let chain = self.chains.pop();
130        self.account(chain.iter().flatten().map(M::account), -1);
131        chain
132    }
133
134    /// Push a chain and account size changes.
135    #[inline]
136    fn chain_push(&mut self, chain: Vec<M::Chunk>) {
137        self.account(chain.iter().map(M::account), 1);
138        self.chains.push(chain);
139    }
140
141    /// Account size changes. Only performs work if a logger exists.
142    ///
143    /// Calculate the size based on the iterator passed along, with each attribute
144    /// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
145    #[inline]
146    fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
147        if let Some(logger) = &self.logger {
148            let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
149            for (records_, size_, capacity_, allocations_) in items {
150                records = records.saturating_add_unsigned(records_);
151                size = size.saturating_add_unsigned(size_);
152                capacity = capacity.saturating_add_unsigned(capacity_);
153                allocations = allocations.saturating_add_unsigned(allocations_);
154            }
155            logger.log(BatcherEvent {
156                operator: self.operator_id,
157                records_diff: records * diff,
158                size_diff: size * diff,
159                capacity_diff: capacity * diff,
160                allocations_diff: allocations * diff,
161            })
162        }
163    }
164}
165
166impl<M: Merger> Drop for MergeBatcher<M> {
167    fn drop(&mut self) {
168        // Cleanup chain to retract accounting information.
169        while self.chain_pop().is_some() {}
170    }
171}
172
173/// A trait to describe interesting moments in a merge batcher.
174pub trait Merger: Default {
175    /// The internal representation of chunks of data.
176    type Chunk: Default;
177    /// The type of time in frontiers to extract updates.
178    type Time;
179    /// Merge chains into an output chain.
180    fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
181    /// Extract ready updates based on the `upper` frontier.
182    fn extract(
183        &mut self,
184        merged: Vec<Self::Chunk>,
185        upper: AntichainRef<Self::Time>,
186        frontier: &mut Antichain<Self::Time>,
187        readied: &mut Vec<Self::Chunk>,
188        kept: &mut Vec<Self::Chunk>,
189        stash: &mut Vec<Self::Chunk>,
190    );
191
192    /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
193    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
194}
195
196/// A `Merger` implementation for vector update containers.
197pub mod vec {
198
199    use std::marker::PhantomData;
200    use timely::container::SizableContainer;
201    use timely::progress::frontier::{Antichain, AntichainRef};
202    use timely::PartialOrder;
203    use crate::trace::implementations::merge_batcher::Merger;
204
205    /// A `Merger` implementation for `Vec<(D, T, R)>` that drains owned inputs.
206    pub struct VecMerger<D, T, R> {
207        _marker: PhantomData<(D, T, R)>,
208    }
209
210    impl<D, T, R> Default for VecMerger<D, T, R> {
211        fn default() -> Self { Self { _marker: PhantomData } }
212    }
213
214    impl<D, T, R> VecMerger<D, T, R> {
215        /// The target capacity for output buffers, as a power of two.
216        ///
217        /// This amount is used to size vectors, where vectors not exactly this capacity are dropped.
218        /// If this is mis-set, there is the potential for more memory churn than anticipated.
219        fn target_capacity() -> usize {
220            timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
221        }
222        /// Acquire a buffer with the target capacity.
223        fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
224            let target = Self::target_capacity();
225            let mut container = stash.pop().unwrap_or_default();
226            container.clear();
227            // Reuse if at target; otherwise allocate fresh.
228            if container.capacity() != target {
229                container = Vec::with_capacity(target);
230            }
231            container
232        }
233        /// Refill `queue` from `iter` if empty. Recycles drained queues into `stash`.
234        fn refill(queue: &mut std::collections::VecDeque<(D, T, R)>, iter: &mut impl Iterator<Item = Vec<(D, T, R)>>, stash: &mut Vec<Vec<(D, T, R)>>) {
235            if queue.is_empty() {
236                let target = Self::target_capacity();
237                if stash.len() < 2 {
238                    let mut recycled = Vec::from(std::mem::take(queue));
239                    recycled.clear();
240                    if recycled.capacity() == target {
241                        stash.push(recycled);
242                    }
243                }
244                if let Some(chunk) = iter.next() {
245                    *queue = std::collections::VecDeque::from(chunk);
246                }
247            }
248        }
249    }
250
251    impl<D, T, R> Merger for VecMerger<D, T, R>
252    where
253        D: Ord + Clone + 'static,
254        T: Ord + Clone + PartialOrder + 'static,
255        R: crate::difference::Semigroup + 'static,
256    {
257        type Chunk = Vec<(D, T, R)>;
258        type Time = T;
259
260        fn merge(
261            &mut self,
262            list1: Vec<Vec<(D, T, R)>>,
263            list2: Vec<Vec<(D, T, R)>>,
264            output: &mut Vec<Vec<(D, T, R)>>,
265            stash: &mut Vec<Vec<(D, T, R)>>,
266        ) {
267            use std::cmp::Ordering;
268            use std::collections::VecDeque;
269
270            let mut iter1 = list1.into_iter();
271            let mut iter2 = list2.into_iter();
272            let mut q1 = VecDeque::<(D,T,R)>::from(iter1.next().unwrap_or_default());
273            let mut q2 = VecDeque::<(D,T,R)>::from(iter2.next().unwrap_or_default());
274
275            let mut result = self.empty(stash);
276
277            // Merge while both queues are non-empty.
278            while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
279                match (d1, t1).cmp(&(d2, t2)) {
280                    Ordering::Less => {
281                        result.push(q1.pop_front().unwrap());
282                    }
283                    Ordering::Greater => {
284                        result.push(q2.pop_front().unwrap());
285                    }
286                    Ordering::Equal => {
287                        let (d, t, mut r1) = q1.pop_front().unwrap();
288                        let (_, _, r2) = q2.pop_front().unwrap();
289                        r1.plus_equals(&r2);
290                        if !r1.is_zero() {
291                            result.push((d, t, r1));
292                        }
293                    }
294                }
295
296                if result.at_capacity() {
297                    output.push(std::mem::take(&mut result));
298                    result = self.empty(stash);
299                }
300
301                // Refill emptied queues from their chains.
302                if q1.is_empty() { Self::refill(&mut q1, &mut iter1, stash); }
303                if q2.is_empty() { Self::refill(&mut q2, &mut iter2, stash); }
304            }
305
306            // Push partial result and remaining data from both sides.
307            if !result.is_empty() { output.push(result); }
308            for q in [q1, q2] {
309                if !q.is_empty() { output.push(Vec::from(q)); }
310            }
311            output.extend(iter1);
312            output.extend(iter2);
313        }
314
315        fn extract(
316            &mut self,
317            merged: Vec<Vec<(D, T, R)>>,
318            upper: AntichainRef<T>,
319            frontier: &mut Antichain<T>,
320            ship: &mut Vec<Vec<(D, T, R)>>,
321            kept: &mut Vec<Vec<(D, T, R)>>,
322            stash: &mut Vec<Vec<(D, T, R)>>,
323        ) {
324            let mut keep = self.empty(stash);
325            let mut ready = self.empty(stash);
326
327            for mut chunk in merged {
328                // Go update-by-update to swap out full containers.
329                for (data, time, diff) in chunk.drain(..) {
330                    if upper.less_equal(&time) {
331                        frontier.insert_with(&time, |time| time.clone());
332                        keep.push((data, time, diff));
333                    } else {
334                        ready.push((data, time, diff));
335                    }
336                    if keep.at_capacity() {
337                        kept.push(std::mem::take(&mut keep));
338                        keep = self.empty(stash);
339                    }
340                    if ready.at_capacity() {
341                        ship.push(std::mem::take(&mut ready));
342                        ready = self.empty(stash);
343                    }
344                }
345                // Recycle the now-empty chunk if it has the right capacity.
346                if chunk.capacity() == Self::target_capacity() {
347                    stash.push(chunk);
348                }
349            }
350            if !keep.is_empty() { kept.push(keep); }
351            if !ready.is_empty() { ship.push(ready); }
352        }
353
354        fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
355            (chunk.len(), 0, 0, 0)
356        }
357    }
358}