Skip to main content

differential_dataflow/
consolidation.rs

1//! Common logic for the consolidation of vectors of Semigroups.
2//!
3//! Often we find ourselves with collections of records with associated weights (often
4//! integers) where we want to reduce the collection to the point that each record occurs
5//! at most once, with the accumulated weights. These methods supply that functionality.
6//!
7//! Importantly, these methods are used internally by differential dataflow, but are made
8//! public for the convenience of others. Their precise behavior is driven by the needs of
9//! differential dataflow (chiefly: canonicalizing sequences of non-zero updates); should
10//! you need specific behavior, it may be best to defensively copy, paste, and maintain the
11//! specific behavior you require.
12
13use std::collections::VecDeque;
14use timely::container::{ContainerBuilder, PushInto};
15use crate::Data;
16use crate::difference::Semigroup;
17
18/// Sorts and consolidates `vec`.
19///
20/// This method will sort `vec` and then consolidate runs of more than one entry with
21/// identical first elements by accumulating the second elements of the pairs. Should the final
22/// accumulation be zero, the element is discarded.
23#[inline]
24pub fn consolidate<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>) {
25    consolidate_from(vec, 0);
26}
27
28/// Sorts and consolidate `vec[offset..]`.
29///
30/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
31/// identical first elements by accumulating the second elements of the pairs. Should the final
32/// accumulation be zero, the element is discarded.
33#[inline]
34pub fn consolidate_from<T: Ord, R: Semigroup>(vec: &mut Vec<(T, R)>, offset: usize) {
35    let length = consolidate_slice(&mut vec[offset..]);
36    vec.truncate(offset + length);
37}
38
39/// Sorts and consolidates a slice, returning the valid prefix length.
40#[inline]
41pub fn consolidate_slice<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
42    if slice.len() > 1 {
43        consolidate_slice_slow(slice)
44    }
45    else {
46        slice.iter().filter(|x| !x.1.is_zero()).count()
47    }
48}
49
50/// Part of `consolidate_slice` that handles slices of length greater than 1.
51fn consolidate_slice_slow<T: Ord, R: Semigroup>(slice: &mut [(T, R)]) -> usize {
52    // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
53    // In a world where there are not many results, we may never even need to call in to merge sort.
54    slice.sort_by(|x,y| x.0.cmp(&y.0));
55
56    // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
57    let mut offset = 0;
58    let mut accum = slice[offset].1.clone();
59
60    for index in 1 .. slice.len() {
61        if slice[index].0 == slice[index-1].0 {
62            accum.plus_equals(&slice[index].1);
63        }
64        else {
65            if !accum.is_zero() {
66                slice.swap(offset, index-1);
67                slice[offset].1.clone_from(&accum);
68                offset += 1;
69            }
70            accum.clone_from(&slice[index].1);
71        }
72    }
73    if !accum.is_zero() {
74        slice.swap(offset, slice.len()-1);
75        slice[offset].1 = accum;
76        offset += 1;
77    }
78
79    offset
80}
81
82/// Sorts and consolidates `vec`.
83///
84/// This method will sort `vec` and then consolidate runs of more than one entry with
85/// identical first two elements by accumulating the third elements of the triples. Should the final
86/// accumulation be zero, the element is discarded.
87#[inline]
88pub fn consolidate_updates<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>) {
89    consolidate_updates_from(vec, 0);
90}
91
92/// Sorts and consolidate `vec[offset..]`.
93///
94/// This method will sort `vec[offset..]` and then consolidate runs of more than one entry with
95/// identical first two elements by accumulating the third elements of the triples. Should the final
96/// accumulation be zero, the element is discarded.
97#[inline]
98pub fn consolidate_updates_from<D: Ord, T: Ord, R: Semigroup>(vec: &mut Vec<(D, T, R)>, offset: usize) {
99    let length = consolidate_updates_slice(&mut vec[offset..]);
100    vec.truncate(offset + length);
101}
102
103/// Sorts and consolidates a slice, returning the valid prefix length.
104#[inline]
105pub fn consolidate_updates_slice<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
106
107    if slice.len() > 1 {
108        consolidate_updates_slice_slow(slice)
109    }
110    else {
111        slice.iter().filter(|x| !x.2.is_zero()).count()
112    }
113}
114
115/// Part of `consolidate_updates_slice` that handles slices of length greater than 1.
116fn consolidate_updates_slice_slow<D: Ord, T: Ord, R: Semigroup>(slice: &mut [(D, T, R)]) -> usize {
117    // We could do an insertion-sort like initial scan which builds up sorted, consolidated runs.
118    // In a world where there are not many results, we may never even need to call in to merge sort.
119    slice.sort_unstable_by(|x,y| (&x.0, &x.1).cmp(&(&y.0, &y.1)));
120
121    // Counts the number of distinct known-non-zero accumulations. Indexes the write location.
122    let mut offset = 0;
123    let mut accum = slice[offset].2.clone();
124
125    for index in 1 .. slice.len() {
126        if (slice[index].0 == slice[index-1].0) && (slice[index].1 == slice[index-1].1) {
127            accum.plus_equals(&slice[index].2);
128        }
129        else {
130            if !accum.is_zero() {
131                slice.swap(offset, index-1);
132                slice[offset].2.clone_from(&accum);
133                offset += 1;
134            }
135            accum.clone_from(&slice[index].2);
136        }
137    }
138    if !accum.is_zero() {
139        slice.swap(offset, slice.len()-1);
140        slice[offset].2 = accum;
141        offset += 1;
142    }
143
144    offset
145}
146
147
/// A container builder that consolidates data in place and emits it in fixed-size containers.
///
/// Output order is not FIFO with respect to the order in which items were pushed.
#[derive(Default)]
pub struct ConsolidatingContainerBuilder<C> {
    /// Buffer that receives pushed items until it is consolidated and flushed.
    current: C,
    /// Stash of containers kept around so their allocations can be reused.
    empty: Vec<C>,
    /// Filled containers waiting to be handed out, oldest first.
    outbound: VecDeque<C>,
}
156
157impl<D,T,R> ConsolidatingContainerBuilder<Vec<(D, T, R)>>
158where
159    D: Data,
160    T: Data,
161    R: Semigroup+'static,
162{
163    /// Flush `self.current` up to the biggest `multiple` of elements. Pass 1 to flush all elements.
164    // TODO: Can we replace `multiple` by a bool?
165    #[cold]
166    fn consolidate_and_flush_through(&mut self, multiple: usize) {
167        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
168        consolidate_updates(&mut self.current);
169        let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
170        while drain.peek().is_some() {
171            let mut container = self.empty.pop().unwrap_or_else(|| Vec::with_capacity(preferred_capacity));
172            container.clear();
173            container.extend((&mut drain).take(preferred_capacity));
174            self.outbound.push_back(container);
175        }
176    }
177}
178
179impl<D, T, R, P> PushInto<P> for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
180where
181    D: Data,
182    T: Data,
183    R: Semigroup+'static,
184    Vec<(D, T, R)>: PushInto<P>,
185{
186    /// Push an element.
187    ///
188    /// Precondition: `current` is not allocated or has space for at least one element.
189    #[inline]
190    fn push_into(&mut self, item: P) {
191        let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
192        if self.current.capacity() < preferred_capacity * 2 {
193            self.current.reserve(preferred_capacity * 2 - self.current.capacity());
194        }
195        self.current.push_into(item);
196        if self.current.len() == self.current.capacity() {
197            // Flush complete containers.
198            self.consolidate_and_flush_through(preferred_capacity);
199        }
200    }
201}
202
203impl<D,T,R> ContainerBuilder for ConsolidatingContainerBuilder<Vec<(D, T, R)>>
204where
205    D: Data,
206    T: Data,
207    R: Semigroup+'static,
208{
209    type Container = Vec<(D,T,R)>;
210
211    #[inline]
212    fn extract(&mut self) -> Option<&mut Vec<(D,T,R)>> {
213        if let Some(container) = self.outbound.pop_front() {
214            self.empty.push(container);
215            self.empty.last_mut()
216        } else {
217            None
218        }
219    }
220
221    #[inline]
222    fn finish(&mut self) -> Option<&mut Vec<(D,T,R)>> {
223        if !self.current.is_empty() {
224            // Flush all
225            self.consolidate_and_flush_through(1);
226            // Remove all but two elements from the stash of empty to avoid memory leaks. We retain
227            // two to match `current` capacity.
228            self.empty.truncate(2);
229        }
230        self.extract()
231    }
232}
233
/// A container that can sort and consolidate its contents internally.
///
/// The container knows its own layout — how to sort its elements, how to
/// compare adjacent entries, and how to merge diffs. The caller provides
/// a `target` container to receive the consolidated output, allowing
/// reuse of allocations across calls.
///
/// After the call, `target` contains the sorted, consolidated data and
/// `self` may be empty or in an unspecified state (implementations should
/// document this).
pub trait Consolidate {
    /// The number of elements in the container.
    fn len(&self) -> usize;
    /// Returns `true` if the container holds no elements.
    ///
    /// Provided in terms of [`Consolidate::len`], so existing implementors get it for free;
    /// an implementation may override it when emptiness is cheaper to test than length.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Clear the container.
    fn clear(&mut self);
    /// Sort and consolidate `self` into `target`.
    ///
    /// On return `target` holds the sorted, consolidated data. What `self` holds afterwards
    /// is up to the implementation.
    fn consolidate_into(&mut self, target: &mut Self);
}
252
253impl<D: Ord, T: Ord, R: Semigroup> Consolidate for Vec<(D, T, R)> {
254    fn len(&self) -> usize { Vec::len(self) }
255    fn clear(&mut self) { Vec::clear(self) }
256    fn consolidate_into(&mut self, target: &mut Self) {
257        consolidate_updates(self);
258        std::mem::swap(self, target);
259    }
260}
261
#[cfg(test)]
mod tests {
    use super::*;

    /// `consolidate` merges equal records, drops zero totals, and leaves the rest sorted.
    #[test]
    fn test_consolidate() {
        // Each entry pairs an input with the contents expected after consolidation.
        let scenarios: [(Vec<(&str, i32)>, Vec<(&str, i32)>); 5] = [
            (vec![("a", -1), ("b", -2), ("a", 1)], vec![("b", -2)]),
            (vec![("a", -1), ("b", 0), ("a", 1)], vec![]),
            (vec![("a", 0)], vec![]),
            (vec![("a", 0), ("b", 0)], vec![]),
            (vec![("a", 1), ("b", 1)], vec![("a", 1), ("b", 1)]),
        ];

        for (mut updates, expected) in scenarios {
            consolidate(&mut updates);
            assert_eq!(updates, expected);
        }
    }

    /// `consolidate_updates` does the same, keyed on the first two components.
    #[test]
    fn test_consolidate_updates() {
        // Each entry pairs an input with the contents expected after consolidation.
        let scenarios: [(Vec<(&str, i32, i32)>, Vec<(&str, i32, i32)>); 5] = [
            (vec![("a", 1, -1), ("b", 1, -2), ("a", 1, 1)], vec![("b", 1, -2)]),
            (vec![("a", 1, -1), ("b", 1, 0), ("a", 1, 1)], vec![]),
            (vec![("a", 1, 0)], vec![]),
            (vec![("a", 1, 0), ("b", 1, 0)], vec![]),
            (vec![("a", 1, 1), ("b", 2, 1)], vec![("a", 1, 1), ("b", 2, 1)]),
        ];

        for (mut updates, expected) in scenarios {
            consolidate_updates(&mut updates);
            assert_eq!(updates, expected);
        }
    }

    /// Zero-weight updates never leave the builder; distinct updates all come out.
    #[test]
    fn test_consolidating_container_builder() {
        let mut builder = <ConsolidatingContainerBuilder<Vec<(usize, usize, usize)>>>::default();

        // Only zero-weight updates: nothing should ever be produced.
        for _ in 0..1024 {
            builder.push_into((0, 0, 0));
        }
        assert_eq!(builder.extract(), None);
        assert_eq!(builder.finish(), None);

        // Distinct updates with weight one: every one of them must be produced.
        for key in 0..1024 {
            builder.push_into((key, 0, 1));
        }

        let mut collected = Vec::default();
        while let Some(chunk) = builder.finish() {
            collected.extend(chunk.drain(..));
        }
        // The builder does not promise any order, so sort before comparing.
        collected.sort();
        for i in 0..1024 {
            assert_eq!((i, 0, 1), collected[i]);
        }
    }

    /// `consolidate_into` leaves the consolidated updates in the target.
    #[test]
    fn test_consolidate_into() {
        let mut updates = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
        let mut target = Vec::default();
        updates.sort();
        updates.consolidate_into(&mut target);
        assert_eq!(target, [(2, 1, 1)]);
    }

    // Workload sizes for the timing tests: large in optimized builds, small in debug builds.
    #[cfg(not(debug_assertions))]
    const LEN: usize = 256 << 10;
    #[cfg(not(debug_assertions))]
    const REPS: usize = 10 << 10;

    #[cfg(debug_assertions)]
    const LEN: usize = 256 << 1;
    #[cfg(debug_assertions)]
    const REPS: usize = 10 << 1;

    /// Yields `LEN` updates in groups of four sharing a key, with weights -2, -1, 0, 1.
    fn workload() -> impl Iterator<Item = (usize, i32, isize)> {
        (0..LEN).map(|i| (i / 4, 1, -2isize + ((i % 4) as isize)))
    }

    /// Times `consolidate_into` and checks its output against `consolidate_updates`.
    #[test]
    fn test_consolidator_duration() {
        let mut input = Vec::with_capacity(LEN);
        let mut reference = Vec::with_capacity(LEN);
        let mut target = Vec::new();
        let mut duration = std::time::Duration::default();

        for _ in 0..REPS {
            input.clear();
            reference.clear();
            target.clear();
            input.extend(workload());
            reference.extend(workload());
            input.sort_by(|x, y| x.0.cmp(&y.0));

            // Only the consolidation itself is timed.
            let start = std::time::Instant::now();
            input.consolidate_into(&mut target);
            duration += start.elapsed();

            consolidate_updates(&mut reference);
            assert_eq!(target, reference);
        }
        println!("elapsed consolidator {duration:?}");
    }

    /// Times `consolidate_updates` on the same workload.
    #[test]
    fn test_consolidator_duration_vec() {
        let mut input = Vec::with_capacity(LEN);
        let mut duration = std::time::Duration::default();

        for _ in 0..REPS {
            input.clear();
            input.extend(workload());
            input.sort_by(|x, y| x.0.cmp(&y.0));

            // Only the consolidation itself is timed.
            let start = std::time::Instant::now();
            consolidate_updates(&mut input);
            duration += start.elapsed();
        }
        println!("elapsed vec {duration:?}");
    }
}