palimpsest_dataflow/trace/implementations/
chunker.rs1use std::collections::VecDeque;
4
5use columnation::Columnation;
6use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
7use timely::Container;
8
9use crate::consolidation::{consolidate_updates, ConsolidateLayout};
10use crate::containers::TimelyStack;
11use crate::difference::Semigroup;
12
/// Chunks a stream of `Vec`s of updates into consolidated `Vec` chunks of bounded size.
pub struct VecChunker<T> {
    /// Updates received but not yet moved into a chunk.
    pending: Vec<T>,
    /// Completed chunks waiting to be handed out by `extract` or `finish`.
    ready: VecDeque<Vec<T>>,
    /// Holds the chunk most recently handed out, so a `&mut` to it can be returned.
    empty: Option<Vec<T>>,
}

impl<T> Default for VecChunker<T> {
    /// Creates a chunker with no buffered updates and no ready chunks.
    fn default() -> Self {
        let pending = Vec::new();
        let ready = VecDeque::new();
        VecChunker {
            pending,
            ready,
            empty: None,
        }
    }
}
29
30impl<K, V, T, R> VecChunker<((K, V), T, R)>
31where
32 K: Ord,
33 V: Ord,
34 T: Ord,
35 R: Semigroup,
36{
37 const BUFFER_SIZE_BYTES: usize = 8 << 10;
38 fn chunk_capacity() -> usize {
39 let size = ::std::mem::size_of::<((K, V), T, R)>();
40 if size == 0 {
41 Self::BUFFER_SIZE_BYTES
42 } else if size <= Self::BUFFER_SIZE_BYTES {
43 Self::BUFFER_SIZE_BYTES / size
44 } else {
45 1
46 }
47 }
48
49 fn form_chunk(&mut self) {
58 consolidate_updates(&mut self.pending);
59 if self.pending.len() >= Self::chunk_capacity() {
60 while self.pending.len() > Self::chunk_capacity() {
61 let mut chunk = Vec::with_capacity(Self::chunk_capacity());
62 chunk.extend(self.pending.drain(..chunk.capacity()));
63 self.ready.push_back(chunk);
64 }
65 }
66 }
67}
68
69impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
70where
71 K: Ord + Clone,
72 V: Ord + Clone,
73 T: Ord + Clone,
74 R: Semigroup + Clone,
75{
76 fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
77 if self.pending.capacity() < Self::chunk_capacity() * 2 {
82 self.pending
83 .reserve(Self::chunk_capacity() * 2 - self.pending.len());
84 }
85
86 let mut drain = container.drain(..).peekable();
87 while drain.peek().is_some() {
88 self.pending
89 .extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
90 if self.pending.len() == self.pending.capacity() {
91 self.form_chunk();
92 }
93 }
94 }
95}
96
97impl<K, V, T, R> ContainerBuilder for VecChunker<((K, V), T, R)>
98where
99 K: Ord + Clone + 'static,
100 V: Ord + Clone + 'static,
101 T: Ord + Clone + 'static,
102 R: Semigroup + Clone + 'static,
103{
104 type Container = Vec<((K, V), T, R)>;
105
106 fn extract(&mut self) -> Option<&mut Self::Container> {
107 if let Some(ready) = self.ready.pop_front() {
108 self.empty = Some(ready);
109 self.empty.as_mut()
110 } else {
111 None
112 }
113 }
114
115 fn finish(&mut self) -> Option<&mut Self::Container> {
116 if !self.pending.is_empty() {
117 consolidate_updates(&mut self.pending);
118 while !self.pending.is_empty() {
119 let mut chunk = Vec::with_capacity(Self::chunk_capacity());
120 chunk.extend(
121 self.pending
122 .drain(..std::cmp::min(self.pending.len(), chunk.capacity())),
123 );
124 self.ready.push_back(chunk);
125 }
126 }
127 self.empty = self.ready.pop_front();
128 self.empty.as_mut()
129 }
130}
131
132pub struct ColumnationChunker<T: Columnation> {
134 pending: Vec<T>,
135 ready: VecDeque<TimelyStack<T>>,
136 empty: Option<TimelyStack<T>>,
137}
138
139impl<T: Columnation> Default for ColumnationChunker<T> {
140 fn default() -> Self {
141 Self {
142 pending: Vec::default(),
143 ready: VecDeque::default(),
144 empty: None,
145 }
146 }
147}
148
149impl<D, T, R> ColumnationChunker<(D, T, R)>
150where
151 D: Columnation + Ord,
152 T: Columnation + Ord,
153 R: Columnation + Semigroup,
154{
155 const BUFFER_SIZE_BYTES: usize = 64 << 10;
156 fn chunk_capacity() -> usize {
157 let size = ::std::mem::size_of::<(D, T, R)>();
158 if size == 0 {
159 Self::BUFFER_SIZE_BYTES
160 } else if size <= Self::BUFFER_SIZE_BYTES {
161 Self::BUFFER_SIZE_BYTES / size
162 } else {
163 1
164 }
165 }
166
167 fn form_chunk(&mut self) {
176 consolidate_updates(&mut self.pending);
177 if self.pending.len() >= Self::chunk_capacity() {
178 while self.pending.len() > Self::chunk_capacity() {
179 let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
180 for item in self.pending.drain(..chunk.capacity()) {
181 chunk.copy(&item);
182 }
183 self.ready.push_back(chunk);
184 }
185 }
186 }
187}
188
189impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)>
190where
191 D: Columnation + Ord + Clone,
192 T: Columnation + Ord + Clone,
193 R: Columnation + Semigroup + Clone,
194{
195 fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
196 if self.pending.capacity() < Self::chunk_capacity() * 2 {
199 self.pending
200 .reserve(Self::chunk_capacity() * 2 - self.pending.len());
201 }
202
203 let mut drain = container.drain(..).peekable();
204 while drain.peek().is_some() {
205 self.pending
206 .extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
207 if self.pending.len() == self.pending.capacity() {
208 self.form_chunk();
209 }
210 }
211 }
212}
213
214impl<D, T, R> ContainerBuilder for ColumnationChunker<(D, T, R)>
215where
216 D: Columnation + Ord + Clone + 'static,
217 T: Columnation + Ord + Clone + 'static,
218 R: Columnation + Semigroup + Clone + 'static,
219{
220 type Container = TimelyStack<(D, T, R)>;
221
222 fn extract(&mut self) -> Option<&mut Self::Container> {
223 if let Some(ready) = self.ready.pop_front() {
224 self.empty = Some(ready);
225 self.empty.as_mut()
226 } else {
227 None
228 }
229 }
230
231 fn finish(&mut self) -> Option<&mut Self::Container> {
232 consolidate_updates(&mut self.pending);
233 while !self.pending.is_empty() {
234 let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
235 for item in self
236 .pending
237 .drain(..std::cmp::min(self.pending.len(), chunk.capacity()))
238 {
239 chunk.copy(&item);
240 }
241 self.ready.push_back(chunk);
242 }
243 self.empty = self.ready.pop_front();
244 self.empty.as_mut()
245 }
246}
247
/// Chunks a stream of input containers into consolidated `Output` containers.
pub struct ContainerChunker<Output> {
    /// Container currently accumulating incoming data.
    pending: Output,
    /// Containers ready to be handed out by `extract` or `finish`.
    ready: VecDeque<Output>,
    /// Scratch target for consolidation. Also holds the container most recently handed out,
    /// so a `&mut` to it can be returned.
    empty: Output,
}

impl<Output: Default> Default for ContainerChunker<Output> {
    /// Creates a chunker whose pending and scratch containers are `Output::default()`.
    fn default() -> Self {
        ContainerChunker {
            pending: Default::default(),
            ready: VecDeque::new(),
            empty: Default::default(),
        }
    }
}
264
265impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
266where
267 Input: DrainContainer,
268 Output: Default + SizableContainer + ConsolidateLayout + PushInto<Input::Item<'a>>,
269{
270 fn push_into(&mut self, container: &'a mut Input) {
271 self.pending.ensure_capacity(&mut None);
272
273 for item in container.drain() {
274 self.pending.push_into(item);
275 if self.pending.at_capacity() {
276 let starting_len = self.pending.len();
277 self.pending.consolidate_into(&mut self.empty);
278 std::mem::swap(&mut self.pending, &mut self.empty);
279 self.empty.clear();
280 if self.pending.len() > starting_len / 2 {
281 self.ready.push_back(std::mem::take(&mut self.pending));
285 }
286 }
287 }
288 }
289}
290
291impl<Output> ContainerBuilder for ContainerChunker<Output>
292where
293 Output: SizableContainer + ConsolidateLayout + Container,
294{
295 type Container = Output;
296
297 fn extract(&mut self) -> Option<&mut Self::Container> {
298 if let Some(ready) = self.ready.pop_front() {
299 self.empty = ready;
300 Some(&mut self.empty)
301 } else {
302 None
303 }
304 }
305
306 fn finish(&mut self) -> Option<&mut Self::Container> {
307 if !self.pending.is_empty() {
308 self.pending.consolidate_into(&mut self.empty);
309 std::mem::swap(&mut self.pending, &mut self.empty);
310 self.empty.clear();
311 if !self.pending.is_empty() {
312 self.ready.push_back(std::mem::take(&mut self.pending));
313 }
314 }
315 if let Some(ready) = self.ready.pop_front() {
316 self.empty = ready;
317 Some(&mut self.empty)
318 } else {
319 None
320 }
321 }
322}