differential_dataflow/trace/implementations/
chunker.rs

1//! Organize streams of data into sorted chunks.
2
3use std::collections::VecDeque;
4
5use columnation::Columnation;
6use timely::Container;
7use timely::container::{ContainerBuilder, PushInto, SizableContainer};
8
9use crate::containers::TimelyStack;
10use crate::consolidation::{consolidate_updates, ConsolidateLayout};
11use crate::difference::Semigroup;
12
/// Buffers incoming update vectors and cuts them into a chain of `Vec` chunks.
pub struct VecChunker<T> {
    /// Updates received so far that have not yet been cut into chunks.
    pending: Vec<T>,
    /// Finished chunks, queued in the order in which they were formed.
    ready: VecDeque<Vec<T>>,
    /// Slot owning the chunk most recently handed out, so a `&mut` to it can be returned.
    empty: Option<Vec<T>>,
}
19
20impl<T> Default for VecChunker<T> {
21    fn default() -> Self {
22        Self {
23            pending: Vec::default(),
24            ready: VecDeque::default(),
25            empty: None,
26        }
27    }
28}
29
30impl<K, V, T, R> VecChunker<((K, V), T, R)>
31where
32    K: Ord,
33    V: Ord,
34    T: Ord,
35    R: Semigroup,
36{
37    const BUFFER_SIZE_BYTES: usize = 8 << 10;
38    fn chunk_capacity() -> usize {
39        let size = ::std::mem::size_of::<((K, V), T, R)>();
40        if size == 0 {
41            Self::BUFFER_SIZE_BYTES
42        } else if size <= Self::BUFFER_SIZE_BYTES {
43            Self::BUFFER_SIZE_BYTES / size
44        } else {
45            1
46        }
47    }
48
49    /// Form chunks out of pending data, if needed. This function is meant to be applied to
50    /// potentially full buffers, and ensures that if the buffer was full when called it is at most
51    /// half full when the function returns.
52    ///
53    /// `form_chunk` does the following:
54    /// * If pending is full, consolidate.
55    /// * If after consolidation it's more than half full, peel off chunks,
56    ///   leaving behind any partial chunk in pending.
57    fn form_chunk(&mut self) {
58        consolidate_updates(&mut self.pending);
59        if self.pending.len() >= Self::chunk_capacity() {
60            while self.pending.len() > Self::chunk_capacity() {
61                let mut chunk = Vec::with_capacity(Self::chunk_capacity());
62                chunk.extend(self.pending.drain(..chunk.capacity()));
63                self.ready.push_back(chunk);
64            }
65        }
66    }
67}
68
69impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
70where
71    K: Ord + Clone,
72    V: Ord + Clone,
73    T: Ord + Clone,
74    R: Semigroup + Clone,
75{
76    fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
77        // Ensure `self.pending` has the desired capacity. We should never have a larger capacity
78        // because we don't write more than capacity elements into the buffer.
79        // Important: Consolidation requires `pending` to have twice the chunk capacity to
80        // amortize its cost. Otherwise, it risks to do quadratic work.
81        if self.pending.capacity() < Self::chunk_capacity() * 2 {
82            self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
83        }
84
85        let mut drain = container.drain(..).peekable();
86        while drain.peek().is_some() {
87            self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
88            if self.pending.len() == self.pending.capacity() {
89                self.form_chunk();
90            }
91        }
92    }
93}
94
95impl<K, V, T, R> ContainerBuilder for VecChunker<((K, V), T, R)>
96where
97    K: Ord + Clone + 'static,
98    V: Ord + Clone + 'static,
99    T: Ord + Clone + 'static,
100    R: Semigroup + Clone + 'static,
101{
102    type Container = Vec<((K, V), T, R)>;
103
104    fn extract(&mut self) -> Option<&mut Self::Container> {
105        if let Some(ready) = self.ready.pop_front() {
106            self.empty = Some(ready);
107            self.empty.as_mut()
108        } else {
109            None
110        }
111    }
112
113    fn finish(&mut self) -> Option<&mut Self::Container> {
114        if !self.pending.is_empty() {
115            consolidate_updates(&mut self.pending);
116            while !self.pending.is_empty() {
117                let mut chunk = Vec::with_capacity(Self::chunk_capacity());
118                chunk.extend(self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())));
119                self.ready.push_back(chunk);
120            }
121        }
122        self.empty = self.ready.pop_front();
123        self.empty.as_mut()
124    }
125}
126
127/// Chunk a stream of vectors into chains of vectors.
128pub struct ColumnationChunker<T: Columnation> {
129    pending: Vec<T>,
130    ready: VecDeque<TimelyStack<T>>,
131    empty: Option<TimelyStack<T>>,
132}
133
134impl<T: Columnation> Default for ColumnationChunker<T> {
135    fn default() -> Self {
136        Self {
137            pending: Vec::default(),
138            ready: VecDeque::default(),
139            empty: None,
140        }
141    }
142}
143
144impl<D,T,R> ColumnationChunker<(D, T, R)>
145where
146    D: Columnation + Ord,
147    T: Columnation + Ord,
148    R: Columnation + Semigroup,
149{
150    const BUFFER_SIZE_BYTES: usize = 64 << 10;
151    fn chunk_capacity() -> usize {
152        let size = ::std::mem::size_of::<(D, T, R)>();
153        if size == 0 {
154            Self::BUFFER_SIZE_BYTES
155        } else if size <= Self::BUFFER_SIZE_BYTES {
156            Self::BUFFER_SIZE_BYTES / size
157        } else {
158            1
159        }
160    }
161
162    /// Form chunks out of pending data, if needed. This function is meant to be applied to
163    /// potentially full buffers, and ensures that if the buffer was full when called it is at most
164    /// half full when the function returns.
165    ///
166    /// `form_chunk` does the following:
167    /// * If pending is full, consolidate.
168    /// * If after consolidation it's more than half full, peel off chunks,
169    ///   leaving behind any partial chunk in pending.
170    fn form_chunk(&mut self) {
171        consolidate_updates(&mut self.pending);
172        if self.pending.len() >= Self::chunk_capacity() {
173            while self.pending.len() > Self::chunk_capacity() {
174                let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
175                for item in self.pending.drain(..chunk.capacity()) {
176                    chunk.copy(&item);
177                }
178                self.ready.push_back(chunk);
179            }
180        }
181    }
182}
183
184impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)>
185where
186    D: Columnation + Ord + Clone,
187    T: Columnation + Ord + Clone,
188    R: Columnation + Semigroup + Clone,
189{
190    fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
191        // Ensure `self.pending` has the desired capacity. We should never have a larger capacity
192        // because we don't write more than capacity elements into the buffer.
193        if self.pending.capacity() < Self::chunk_capacity() * 2 {
194            self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
195        }
196
197        let mut drain = container.drain(..).peekable();
198        while drain.peek().is_some() {
199            self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
200            if self.pending.len() == self.pending.capacity() {
201                self.form_chunk();
202            }
203        }
204    }
205}
206
207impl<D, T, R> ContainerBuilder for ColumnationChunker<(D, T, R)>
208where
209    D: Columnation + Ord + Clone + 'static,
210    T: Columnation + Ord + Clone + 'static,
211    R: Columnation + Semigroup + Clone + 'static,
212{
213    type Container = TimelyStack<(D,T,R)>;
214
215    fn extract(&mut self) -> Option<&mut Self::Container> {
216        if let Some(ready) = self.ready.pop_front() {
217            self.empty = Some(ready);
218            self.empty.as_mut()
219        } else {
220            None
221        }
222    }
223
224    fn finish(&mut self) -> Option<&mut Self::Container> {
225        consolidate_updates(&mut self.pending);
226        while !self.pending.is_empty() {
227            let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
228            for item in self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())) {
229                chunk.copy(&item);
230            }
231            self.ready.push_back(chunk);
232        }
233        self.empty = self.ready.pop_front();
234        self.empty.as_mut()
235    }
236}
237
/// Buffers a stream of containers and cuts it into a chain of `Output` containers.
pub struct ContainerChunker<Output> {
    /// Container currently being filled with incoming items.
    pending: Output,
    /// Finished containers, queued in the order in which they were formed.
    ready: VecDeque<Output>,
    /// Spare container: consolidation target, and owner of the container last handed out.
    empty: Output,
}
244
245impl<Output: Default> Default for ContainerChunker<Output> {
246    fn default() -> Self {
247        Self {
248            pending: Output::default(),
249            ready: VecDeque::default(),
250            empty: Output::default(),
251        }
252    }
253}
254
255impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
256where
257    Input: Container,
258    Output: SizableContainer
259        + ConsolidateLayout
260        + PushInto<Input::Item<'a>>,
261{
262    fn push_into(&mut self, container: &'a mut Input) {
263        self.pending.ensure_capacity(&mut None);
264
265        for item in container.drain() {
266            self.pending.push(item);
267            if self.pending.at_capacity() {
268                let starting_len = self.pending.len();
269                self.pending.consolidate_into(&mut self.empty);
270                std::mem::swap(&mut self.pending, &mut self.empty);
271                self.empty.clear();
272                if self.pending.len() > starting_len / 2 {
273                    // Note that we're pushing non-full containers, which is a deviation from
274                    // other implementation. The reason for this is that we cannot extract
275                    // partial data from `this.pending`. We should revisit this in the future.
276                    self.ready.push_back(std::mem::take(&mut self.pending));
277                }
278            }
279        }
280    }
281}
282
283impl<Output> ContainerBuilder for ContainerChunker<Output>
284where
285    Output: SizableContainer + ConsolidateLayout + Clone + 'static,
286{
287    type Container = Output;
288
289    fn extract(&mut self) -> Option<&mut Self::Container> {
290        if let Some(ready) = self.ready.pop_front() {
291            self.empty = ready;
292            Some(&mut self.empty)
293        } else {
294            None
295        }
296    }
297
298    fn finish(&mut self) -> Option<&mut Self::Container> {
299        if !self.pending.is_empty() {
300            self.pending.consolidate_into(&mut self.empty);
301            std::mem::swap(&mut self.pending, &mut self.empty);
302            self.empty.clear();
303            if !self.pending.is_empty() {
304                self.ready.push_back(std::mem::take(&mut self.pending));
305            }
306        }
307        if let Some(ready) = self.ready.pop_front() {
308            self.empty = ready;
309            Some(&mut self.empty)
310        } else {
311            None
312        }
313    }
314}