Skip to main content

palimpsest_dataflow/trace/implementations/
chunker.rs

//! Organize streams of data into sorted chunks.
2
3use std::collections::VecDeque;
4
5use columnation::Columnation;
6use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
7use timely::Container;
8
9use crate::consolidation::{consolidate_updates, ConsolidateLayout};
10use crate::containers::TimelyStack;
11use crate::difference::Semigroup;
12
/// Chunk a stream of vectors into chains of vectors.
///
/// Incoming updates are buffered in `pending`; completed chunks are queued in `ready`
/// until they are handed out one at a time.
pub struct VecChunker<T> {
    /// Updates received so far that have not yet been cut into chunks.
    pending: Vec<T>,
    /// Finished chunks waiting to be handed out, oldest first.
    ready: VecDeque<Vec<T>>,
    /// Storage for the chunk most recently handed out, so a reference to it can be returned.
    empty: Option<Vec<T>>,
}

impl<T> Default for VecChunker<T> {
    /// Creates a chunker with no buffered updates and no finished chunks.
    fn default() -> Self {
        let pending = Vec::new();
        let ready = VecDeque::new();
        Self {
            pending,
            ready,
            empty: None,
        }
    }
}
29
30impl<K, V, T, R> VecChunker<((K, V), T, R)>
31where
32    K: Ord,
33    V: Ord,
34    T: Ord,
35    R: Semigroup,
36{
37    const BUFFER_SIZE_BYTES: usize = 8 << 10;
38    fn chunk_capacity() -> usize {
39        let size = ::std::mem::size_of::<((K, V), T, R)>();
40        if size == 0 {
41            Self::BUFFER_SIZE_BYTES
42        } else if size <= Self::BUFFER_SIZE_BYTES {
43            Self::BUFFER_SIZE_BYTES / size
44        } else {
45            1
46        }
47    }
48
49    /// Form chunks out of pending data, if needed. This function is meant to be applied to
50    /// potentially full buffers, and ensures that if the buffer was full when called it is at most
51    /// half full when the function returns.
52    ///
53    /// `form_chunk` does the following:
54    /// * If pending is full, consolidate.
55    /// * If after consolidation it's more than half full, peel off chunks,
56    ///   leaving behind any partial chunk in pending.
57    fn form_chunk(&mut self) {
58        consolidate_updates(&mut self.pending);
59        if self.pending.len() >= Self::chunk_capacity() {
60            while self.pending.len() > Self::chunk_capacity() {
61                let mut chunk = Vec::with_capacity(Self::chunk_capacity());
62                chunk.extend(self.pending.drain(..chunk.capacity()));
63                self.ready.push_back(chunk);
64            }
65        }
66    }
67}
68
69impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
70where
71    K: Ord + Clone,
72    V: Ord + Clone,
73    T: Ord + Clone,
74    R: Semigroup + Clone,
75{
76    fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
77        // Ensure `self.pending` has the desired capacity. We should never have a larger capacity
78        // because we don't write more than capacity elements into the buffer.
79        // Important: Consolidation requires `pending` to have twice the chunk capacity to
80        // amortize its cost. Otherwise, it risks to do quadratic work.
81        if self.pending.capacity() < Self::chunk_capacity() * 2 {
82            self.pending
83                .reserve(Self::chunk_capacity() * 2 - self.pending.len());
84        }
85
86        let mut drain = container.drain(..).peekable();
87        while drain.peek().is_some() {
88            self.pending
89                .extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
90            if self.pending.len() == self.pending.capacity() {
91                self.form_chunk();
92            }
93        }
94    }
95}
96
97impl<K, V, T, R> ContainerBuilder for VecChunker<((K, V), T, R)>
98where
99    K: Ord + Clone + 'static,
100    V: Ord + Clone + 'static,
101    T: Ord + Clone + 'static,
102    R: Semigroup + Clone + 'static,
103{
104    type Container = Vec<((K, V), T, R)>;
105
106    fn extract(&mut self) -> Option<&mut Self::Container> {
107        if let Some(ready) = self.ready.pop_front() {
108            self.empty = Some(ready);
109            self.empty.as_mut()
110        } else {
111            None
112        }
113    }
114
115    fn finish(&mut self) -> Option<&mut Self::Container> {
116        if !self.pending.is_empty() {
117            consolidate_updates(&mut self.pending);
118            while !self.pending.is_empty() {
119                let mut chunk = Vec::with_capacity(Self::chunk_capacity());
120                chunk.extend(
121                    self.pending
122                        .drain(..std::cmp::min(self.pending.len(), chunk.capacity())),
123                );
124                self.ready.push_back(chunk);
125            }
126        }
127        self.empty = self.ready.pop_front();
128        self.empty.as_mut()
129    }
130}
131
132/// Chunk a stream of vectors into chains of vectors.
133pub struct ColumnationChunker<T: Columnation> {
134    pending: Vec<T>,
135    ready: VecDeque<TimelyStack<T>>,
136    empty: Option<TimelyStack<T>>,
137}
138
139impl<T: Columnation> Default for ColumnationChunker<T> {
140    fn default() -> Self {
141        Self {
142            pending: Vec::default(),
143            ready: VecDeque::default(),
144            empty: None,
145        }
146    }
147}
148
149impl<D, T, R> ColumnationChunker<(D, T, R)>
150where
151    D: Columnation + Ord,
152    T: Columnation + Ord,
153    R: Columnation + Semigroup,
154{
155    const BUFFER_SIZE_BYTES: usize = 64 << 10;
156    fn chunk_capacity() -> usize {
157        let size = ::std::mem::size_of::<(D, T, R)>();
158        if size == 0 {
159            Self::BUFFER_SIZE_BYTES
160        } else if size <= Self::BUFFER_SIZE_BYTES {
161            Self::BUFFER_SIZE_BYTES / size
162        } else {
163            1
164        }
165    }
166
167    /// Form chunks out of pending data, if needed. This function is meant to be applied to
168    /// potentially full buffers, and ensures that if the buffer was full when called it is at most
169    /// half full when the function returns.
170    ///
171    /// `form_chunk` does the following:
172    /// * If pending is full, consolidate.
173    /// * If after consolidation it's more than half full, peel off chunks,
174    ///   leaving behind any partial chunk in pending.
175    fn form_chunk(&mut self) {
176        consolidate_updates(&mut self.pending);
177        if self.pending.len() >= Self::chunk_capacity() {
178            while self.pending.len() > Self::chunk_capacity() {
179                let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
180                for item in self.pending.drain(..chunk.capacity()) {
181                    chunk.copy(&item);
182                }
183                self.ready.push_back(chunk);
184            }
185        }
186    }
187}
188
189impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)>
190where
191    D: Columnation + Ord + Clone,
192    T: Columnation + Ord + Clone,
193    R: Columnation + Semigroup + Clone,
194{
195    fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
196        // Ensure `self.pending` has the desired capacity. We should never have a larger capacity
197        // because we don't write more than capacity elements into the buffer.
198        if self.pending.capacity() < Self::chunk_capacity() * 2 {
199            self.pending
200                .reserve(Self::chunk_capacity() * 2 - self.pending.len());
201        }
202
203        let mut drain = container.drain(..).peekable();
204        while drain.peek().is_some() {
205            self.pending
206                .extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
207            if self.pending.len() == self.pending.capacity() {
208                self.form_chunk();
209            }
210        }
211    }
212}
213
214impl<D, T, R> ContainerBuilder for ColumnationChunker<(D, T, R)>
215where
216    D: Columnation + Ord + Clone + 'static,
217    T: Columnation + Ord + Clone + 'static,
218    R: Columnation + Semigroup + Clone + 'static,
219{
220    type Container = TimelyStack<(D, T, R)>;
221
222    fn extract(&mut self) -> Option<&mut Self::Container> {
223        if let Some(ready) = self.ready.pop_front() {
224            self.empty = Some(ready);
225            self.empty.as_mut()
226        } else {
227            None
228        }
229    }
230
231    fn finish(&mut self) -> Option<&mut Self::Container> {
232        consolidate_updates(&mut self.pending);
233        while !self.pending.is_empty() {
234            let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
235            for item in self
236                .pending
237                .drain(..std::cmp::min(self.pending.len(), chunk.capacity()))
238            {
239                chunk.copy(&item);
240            }
241            self.ready.push_back(chunk);
242        }
243        self.empty = self.ready.pop_front();
244        self.empty.as_mut()
245    }
246}
247
/// Chunk a stream of containers into chains of `Output` containers.
///
/// Incoming items are buffered in `pending`; completed containers are queued in `ready`
/// until they are handed out one at a time.
pub struct ContainerChunker<Output> {
    /// Container currently being filled.
    pending: Output,
    /// Finished containers waiting to be handed out, oldest first.
    ready: VecDeque<Output>,
    /// Spare container: consolidation target, and storage for the container most recently
    /// handed out.
    empty: Output,
}

impl<Output: Default> Default for ContainerChunker<Output> {
    /// Creates a chunker whose containers are all default-constructed and whose queue of
    /// finished containers is empty.
    fn default() -> Self {
        let pending = Default::default();
        let empty = Default::default();
        Self {
            pending,
            ready: VecDeque::new(),
            empty,
        }
    }
}
264
265impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
266where
267    Input: DrainContainer,
268    Output: Default + SizableContainer + ConsolidateLayout + PushInto<Input::Item<'a>>,
269{
270    fn push_into(&mut self, container: &'a mut Input) {
271        self.pending.ensure_capacity(&mut None);
272
273        for item in container.drain() {
274            self.pending.push_into(item);
275            if self.pending.at_capacity() {
276                let starting_len = self.pending.len();
277                self.pending.consolidate_into(&mut self.empty);
278                std::mem::swap(&mut self.pending, &mut self.empty);
279                self.empty.clear();
280                if self.pending.len() > starting_len / 2 {
281                    // Note that we're pushing non-full containers, which is a deviation from
282                    // other implementation. The reason for this is that we cannot extract
283                    // partial data from `this.pending`. We should revisit this in the future.
284                    self.ready.push_back(std::mem::take(&mut self.pending));
285                }
286            }
287        }
288    }
289}
290
291impl<Output> ContainerBuilder for ContainerChunker<Output>
292where
293    Output: SizableContainer + ConsolidateLayout + Container,
294{
295    type Container = Output;
296
297    fn extract(&mut self) -> Option<&mut Self::Container> {
298        if let Some(ready) = self.ready.pop_front() {
299            self.empty = ready;
300            Some(&mut self.empty)
301        } else {
302            None
303        }
304    }
305
306    fn finish(&mut self) -> Option<&mut Self::Container> {
307        if !self.pending.is_empty() {
308            self.pending.consolidate_into(&mut self.empty);
309            std::mem::swap(&mut self.pending, &mut self.empty);
310            self.empty.clear();
311            if !self.pending.is_empty() {
312                self.ready.push_back(std::mem::take(&mut self.pending));
313            }
314        }
315        if let Some(ready) = self.ready.pop_front() {
316            self.empty = ready;
317            Some(&mut self.empty)
318        } else {
319            None
320        }
321    }
322}