Skip to main content

differential_dataflow/trace/implementations/
chunker.rs

//! Organize streams of data into sorted chunks.
2
3use std::collections::VecDeque;
4
5use timely::Container;
6use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
7
8use crate::consolidation::Consolidate;
9
/// Chunk a stream of containers into a queue of consolidated `Output` containers.
///
/// Incoming items accumulate in `pending`. Whenever it reaches capacity it is consolidated, and
/// if enough data survives consolidation it is moved to `ready`, from where `extract`/`finish`
/// hand chunks out.
pub struct ContainerChunker<Output> {
    /// Container currently being filled with incoming items.
    pending: Output,
    /// Finished chunks waiting to be handed out, in the order they were produced.
    ready: VecDeque<Output>,
    /// Scratch container. It is passed as the consolidation target and is also the slot
    /// whose `&mut` reference `extract`/`finish` return.
    empty: Output,
}
16
17impl<Output: Default> Default for ContainerChunker<Output> {
18    fn default() -> Self {
19        Self {
20            pending: Output::default(),
21            ready: VecDeque::default(),
22            empty: Output::default(),
23        }
24    }
25}
26
27impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
28where
29    Input: DrainContainer,
30    Output: Default
31        + SizableContainer
32        + Consolidate
33        + PushInto<Input::Item<'a>>,
34{
35    fn push_into(&mut self, container: &'a mut Input) {
36        self.pending.ensure_capacity(&mut None);
37
38        for item in container.drain() {
39            self.pending.push_into(item);
40            if self.pending.at_capacity() {
41                let starting_len = self.pending.len();
42                self.pending.consolidate_into(&mut self.empty);
43                std::mem::swap(&mut self.pending, &mut self.empty);
44                self.empty.clear();
45                if self.pending.len() > starting_len / 2 {
46                    // Note that we're pushing non-full containers, which is a deviation from
47                    // other implementation. The reason for this is that we cannot extract
48                    // partial data from `this.pending`. We should revisit this in the future.
49                    self.ready.push_back(std::mem::take(&mut self.pending));
50                }
51            }
52        }
53    }
54}
55
56impl<Output> ContainerBuilder for ContainerChunker<Output>
57where
58    Output: SizableContainer + Consolidate + Container,
59{
60    type Container = Output;
61
62    fn extract(&mut self) -> Option<&mut Self::Container> {
63        if let Some(ready) = self.ready.pop_front() {
64            self.empty = ready;
65            Some(&mut self.empty)
66        } else {
67            None
68        }
69    }
70
71    fn finish(&mut self) -> Option<&mut Self::Container> {
72        if !self.pending.is_empty() {
73            self.pending.consolidate_into(&mut self.empty);
74            std::mem::swap(&mut self.pending, &mut self.empty);
75            self.empty.clear();
76            if !self.pending.is_empty() {
77                self.ready.push_back(std::mem::take(&mut self.pending));
78            }
79        }
80        if let Some(ready) = self.ready.pop_front() {
81            self.empty = ready;
82            Some(&mut self.empty)
83        } else {
84            None
85        }
86    }
87}