Skip to main content

differential_dataflow/
consolidation.rs

1//! Common logic for the consolidation of vectors of Semigroups.
2//!
3//! Often we find ourselves with collections of records with associated weights (often
4//! integers) where we want to reduce the collection to the point that each record occurs
5//! at most once, with the accumulated weights. These methods supply that functionality.
6//!
7//! Importantly, these methods are used internally by differential dataflow, but are made
8//! public for the convenience of others. Their precise behavior is driven by the needs of
9//! differential dataflow (chiefly: canonicalizing sequences of non-zero updates); should
10//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
11//! specific behavior you require.
12
13use std::collections::VecDeque;
14use columnation::Columnation;
15use timely::container::{ContainerBuilder, PushInto};
16use crate::Data;
17use crate::difference::Semigroup;
18
19/// Sorts and consolidates `vec`.
20///
21/// This method will sort `vec` and then consolidate runs of more than one entry with
22/// identical first elements by accumulating the second elements of the pairs. Should the final
23/// accumulation be zero, the element is discarded.
24#[inline]
25pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
26    consolidate_from(vec, 0);
27}
28
29/// Sorts and consolidate `vec[offset..]`.
30///
31/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
32/// identical first elements by accumulating the second elements of the pairs. Should the final
33/// accumulation be zero, the element is discarded.
34#[inline]
35pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
36    let length = consolidate_slice(&mut vec[offset..]);
37    vec.truncate(offset + length);
38}
39
40/// Sorts and consolidates a slice, returning the valid prefix length.
41#[inline]
42pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
43    if slice.len() > 1 {
44        consolidate_slice_slow(slice)
45    }
46    else {
47        slice.iter().filter(|x| !x.1.is_zero()).count()
48    }
49}
50
51/// Part of `consolidate_slice` that handles slices of length greater than 1.
52fn consolidate_slice_slow<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
53    // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
54    // In a world where there are not many results, we may never even need to call in to merge sort.
55    slice.sort_by(|x,y| x.0.cmp(&y.0));
56
57    // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
58    let mut offset = 0;
59    let mut accum = slice[offset].1.clone();
60
61    for index in 1 .. slice.len() {
62        if slice[index].0 == slice[index-1].0 {
63            accum.plus_equals(&slice[index].1);
64        }
65        else {
66            if !accum.is_zero() {
67                slice.swap(offset, index-1);
68                slice[offset].1.clone_from(&accum);
69                offset += 1;
70            }
71            accum.clone_from(&slice[index].1);
72        }
73    }
74    if !accum.is_zero() {
75        slice.swap(offset, slice.len()-1);
76        slice[offset].1 = accum;
77        offset += 1;
78    }
79
80    offset
81}
82
83/// Sorts and consolidates `vec`.
84///
85/// This method will sort `vec` and then consolidate runs of more than one entry with
86/// identical first two elements by accumulating the third elements of the triples. Should the final
87/// accumulation be zero, the element is discarded.
88#[inline]
89pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
90    consolidate_updates_from(vec, 0);
91}
92
93/// Sorts and consolidate `vec[offset..]`.
94///
95/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
96/// identical first two elements by accumulating the third elements of the triples. Should the final
97/// accumulation be zero, the element is discarded.
98#[inline]
99pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
100    let length = consolidate_updates_slice(&mut vec[offset..]);
101    vec.truncate(offset + length);
102}
103
104/// Sorts and consolidates a slice, returning the valid prefix length.
105#[inline]
106pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
107
108    if slice.len() > 1 {
109        consolidate_updates_slice_slow(slice)
110    }
111    else {
112        slice.iter().filter(|x| !x.2.is_zero()).count()
113    }
114}
115
116/// Part of `consolidate_updates_slice` that handles slices of length greater than 1.
117fn consolidate_updates_slice_slow<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
118    // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
119    // In a world where there are not many results, we may never even need to call in to merge sort.
120    slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
121
122    // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
123    let mut offset = 0;
124    let mut accum = slice[offset].2.clone();
125
126    for index in 1 .. slice.len() {
127        if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
128            accum.plus_equals(&slice[index].2);
129        }
130        else {
131            if !accum.is_zero() {
132                slice.swap(offset, index-1);
133                slice[offset].2.clone_from(&accum);
134                offset += 1;
135            }
136            accum.clone_from(&slice[index].2);
137        }
138    }
139    if !accum.is_zero() {
140        slice.swap(offset, slice.len()-1);
141        slice[offset].2 = accum;
142        offset += 1;
143    }
144
145    offset
146}
147
148
/// A container builder that consolidates data in place and emits it in fixed-size containers.
///
/// Pushed updates accumulate in a staging buffer. Whenever that buffer fills, its contents are
/// consolidated and every complete container is queued for extraction. Does not maintain FIFO
/// ordering of the pushed updates.
#[derive(Default)]
pub struct ConsolidatingContainerBuilder<C>{
    /// Staging buffer that receives pushed items and is consolidated when full.
    current: C,
    /// Containers previously handed out by `extract`, retained so their allocations can be reused.
    empty: Vec<C>,
    /// Consolidated containers awaiting extraction, oldest first.
    outbound: VecDeque<C>,
}
157
158impl<D,T,R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
159where
160    D: Data,
161    T: Data,
162    R: Semigroup+'static,
163{
164    /// Flush `self.current` up to the biggest `multiple` of elements. Pass 1 to flush all elements.
165    // TODO: Can we replace `multiple` by a bool?
166    #[cold]
167    fn consolidate_and_flush_through(&mut self, multiple: usize) {
168        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
169        consolidate_updates(&mut self.current);
170        let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
171        while drain.peek().is_some() {
172            let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
173            container.clear();
174            container.extend((&mut drain).take(preferred_capacity));
175            self.outbound.push_back(container);
176        }
177    }
178}
179
180impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
181where
182    D: Data,
183    T: Data,
184    R: Semigroup+'static,
185    Vec<(D, T, R)>: PushInto<P>,
186{
187    /// Push an element.
188    ///
189    /// Precondition: `current` is not allocated or has space for at least one element.
190    #[inline]
191    fn push_into(&mut self, item: P) {
192        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
193        if self.current.capacity() < preferred_capacity * 2 {
194            self.current.reserve(preferred_capacity * 2 - self.current.capacity());
195        }
196        self.current.push_into(item);
197        if self.current.len() == self.current.capacity() {
198            // Flush complete containers.
199            self.consolidate_and_flush_through(preferred_capacity);
200        }
201    }
202}
203
204impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
205where
206    D: Data,
207    T: Data,
208    R: Semigroup+'static,
209{
210    type Container = Vec<(D,T,R)>;
211
212    #[inline]
213    fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
214        if let Some(container) = self.outbound.pop_front() {
215            self.empty.push(container);
216            self.empty.last_mut()
217        } else {
218            None
219        }
220    }
221
222    #[inline]
223    fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
224        if !self.current.is_empty() {
225            // Flush all
226            self.consolidate_and_flush_through(1);
227            // Remove all but two elements from the stash of empty to avoid memory leaks. We retain
228            // two to match `current` capacity.
229            self.empty.truncate(2);
230        }
231        self.extract()
232    }
233}
234
/// A container able to sort and consolidate its own contents.
///
/// Each implementation knows its element layout: how to order elements, how to recognize
/// adjacent entries that should be merged, and how to combine their diffs. Output goes into a
/// caller-supplied `target` container so that allocations can be reused across calls.
///
/// Once `consolidate_into` returns, `target` holds the sorted, consolidated data. `self` is
/// left either empty or in an unspecified state; implementations should document which.
pub trait Consolidate {
    /// Sorts and consolidates the contents of `self`, writing the result into `target`.
    fn consolidate_into(&mut self, target: &mut Self);
    /// Returns the number of elements currently held by the container.
    fn len(&self) -> usize;
    /// Removes every element from the container.
    fn clear(&mut self);
}
253
254impl<D: Ord, T: Ord, R: Semigroup> Consolidate for Vec<(D, T, R)> {
255    fn len(&self) -> usize { Vec::len(self) }
256    fn clear(&mut self) { Vec::clear(self) }
257    fn consolidate_into(&mut self, target: &mut Self) {
258        consolidate_updates(self);
259        std::mem::swap(self, target);
260    }
261}
262
263impl<D: Ord + Columnation, T: Ord + Columnation, R: Semigroup + Columnation> Consolidate for crate::containers::TimelyStack<(D, T, R)> {
264    fn len(&self) -> usize { self[..].len() }
265    fn clear(&mut self) { crate::containers::TimelyStack::clear(self) }
266    fn consolidate_into(&mut self, target: &mut Self) {
267        let len = self[..].len();
268        let mut indices: Vec<usize> = (0..len).collect();
269        indices.sort_unstable_by(|&i, &j| {
270            let (d1, t1, _) = &self[i];
271            let (d2, t2, _) = &self[j];
272            (d1, t1).cmp(&(d2, t2))
273        });
274        target.clear();
275        let mut idx = 0;
276        while idx < indices.len() {
277            let (d, t, r) = &self[indices[idx]];
278            let mut r_owned = r.clone();
279            idx += 1;
280            while idx < indices.len() {
281                let (d2, t2, r2) = &self[indices[idx]];
282                if d == d2 && t == t2 {
283                    r_owned.plus_equals(r2);
284                    idx += 1;
285                } else { break; }
286            }
287            if !r_owned.is_zero() {
288                target.copy_destructured(d, t, &r_owned);
289            }
290        }
291        self.clear();
292    }
293}
294
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_consolidate() {
        let test_cases = vec![
            (
                vec![("a", -1), ("b", -2), ("a", 1)],
                vec![("b", -2)],
            ),
            (
                vec![("a", -1), ("b", 0), ("a", 1)],
                vec![],
            ),
            (
                vec![("a", 0)],
                vec![],
            ),
            (
                vec![("a", 0), ("b", 0)],
                vec![],
            ),
            (
                vec![("a", 1), ("b", 1)],
                vec![("a", 1), ("b", 1)],
            ),
        ];

        for (mut input, output) in test_cases {
            consolidate(&mut input);
            assert_eq!(input, output);
        }
    }

    #[test]
    fn test_consolidate_from() {
        // Only the suffix starting at `offset` is consolidated. The prefix is left as-is and
        // is not merged with matching keys in the suffix.
        let mut data = vec![("a", 5), ("b", 1), ("a", 1), ("b", -1)];
        consolidate_from(&mut data, 1);
        assert_eq!(data, vec![("a", 5), ("a", 1)]);
    }

    #[test]
    fn test_consolidate_slice() {
        // The return value is the length of the consolidated prefix.
        let mut data = [("b", 1), ("a", 2), ("b", 2)];
        let valid = consolidate_slice(&mut data);
        assert_eq!(&data[..valid], &[("a", 2), ("b", 3)]);

        // Single-element fast path: a zero diff yields an empty prefix.
        let mut single = [("a", 0)];
        assert_eq!(consolidate_slice(&mut single), 0);
    }

    #[test]
    fn test_consolidate_updates() {
        let test_cases = vec![
            (
                vec![("a", 1, -1), ("b", 1, -2), ("a", 1, 1)],
                vec![("b", 1, -2)],
            ),
            (
                vec![("a", 1, -1), ("b", 1, 0), ("a", 1, 1)],
                vec![],
            ),
            (
                vec![("a", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 0), ("b", 1, 0)],
                vec![],
            ),
            (
                vec![("a", 1, 1), ("b", 2, 1)],
                vec![("a", 1, 1), ("b", 2, 1)],
            ),
        ];

        for (mut input, output) in test_cases {
            consolidate_updates(&mut input);
            assert_eq!(input, output);
        }
    }

    #[test]
    fn test_consolidate_updates_from() {
        // As with `consolidate_from`, the prefix before `offset` is left untouched.
        let mut data = vec![("a", 1, 5), ("b", 2, 1), ("a", 1, 1), ("b", 2, -1)];
        consolidate_updates_from(&mut data, 1);
        assert_eq!(data, vec![("a", 1, 5), ("a", 1, 1)]);
    }

    #[test]
    fn test_consolidating_container_builder() {
        let mut ccb = <ConsolidatingContainerBuilder<Vec<(usize, usize, usize)>>>::default();
        for _ in 0..1024 {
            ccb.push_into((0, 0, 0));
        }
        assert_eq!(ccb.extract(), None);
        assert_eq!(ccb.finish(), None);

        for i in 0..1024 {
            ccb.push_into((i, 0, 1));
        }

        let mut collected = Vec::default();
        while let Some(container) = ccb.finish() {
            collected.append(container);
        }
        // The output happens to be sorted, but it's not guaranteed.
        collected.sort();
        for i in 0..1024 {
            assert_eq!((i, 0, 1), collected[i]);
        }
    }

    #[test]
    fn test_consolidate_into() {
        // Deliberately unsorted: `consolidate_into` must sort on its own.
        let mut data = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
        let mut target = Vec::default();
        data.consolidate_into(&mut target);
        assert_eq!(target, [(2, 1, 1)]);
        // The vector implementation swaps buffers, so `data` now holds `target`'s old (empty)
        // contents.
        assert!(data.is_empty());
    }

    #[test]
    fn test_consolidate_into_timely_stack() {
        let mut data = crate::containers::TimelyStack::<(u64, u64, i64)>::default();
        for (d, t, r) in [(1u64, 1u64, 1i64), (2, 1, 1), (1, 1, -1)].iter() {
            data.copy_destructured(d, t, r);
        }
        let mut target = crate::containers::TimelyStack::default();
        data.consolidate_into(&mut target);
        assert_eq!(&target[..], &[(2, 1, 1)]);
        // The columnar implementation clears its input.
        assert_eq!(data[..].len(), 0);
    }

    #[cfg(not(debug_assertions))]
    const LEN: usize = 256 << 10;
    #[cfg(not(debug_assertions))]
    const REPS: usize = 10 << 10;

    #[cfg(debug_assertions)]
    const LEN: usize = 256 << 1;
    #[cfg(debug_assertions)]
    const REPS: usize = 10 << 1;

    #[test]
    fn test_consolidator_duration() {
        let mut data = Vec::with_capacity(LEN);
        let mut data2 = Vec::with_capacity(LEN);
        let mut target = Vec::new();
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data2.clear();
            target.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data2.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            let start = std::time::Instant::now();
            data.consolidate_into(&mut target);
            duration += start.elapsed();

            consolidate_updates(&mut data2);
            assert_eq!(target, data2);
        }
        println!("elapsed consolidator {duration:?}");
    }

    #[test]
    fn test_consolidator_duration_vec() {
        let mut data = Vec::with_capacity(LEN);
        let mut duration = std::time::Duration::default();
        for _ in 0..REPS {
            data.clear();
            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
            data.sort_by(|x,y| x.0.cmp(&y.0));
            let start = std::time::Instant::now();
            consolidate_updates(&mut data);
            duration += start.elapsed();
        }
        println!("elapsed vec {duration:?}");
    }
}