differential_dataflow/trace/implementations/
chunker.rs1use std::collections::VecDeque;
4
5use columnation::Columnation;
6use timely::Container;
7use timely::container::{ContainerBuilder, PushInto, SizableContainer};
8
9use crate::containers::TimelyStack;
10use crate::consolidation::{consolidate_updates, ConsolidateLayout};
11use crate::difference::Semigroup;
12
/// Chunker that accumulates `Vec` updates and hands them out as bounded-size `Vec` chunks.
pub struct VecChunker<T> {
    /// Updates received but not yet moved into a chunk.
    pending: Vec<T>,
    /// Chunks that are formed and waiting to be handed out.
    ready: VecDeque<Vec<T>>,
    /// Holds the chunk most recently handed out by `extract`/`finish`.
    empty: Option<Vec<T>>,
}

impl<T> Default for VecChunker<T> {
    fn default() -> Self {
        let pending = Vec::new();
        let ready = VecDeque::new();
        VecChunker {
            pending,
            ready,
            empty: None,
        }
    }
}
29
30impl<K, V, T, R> VecChunker<((K, V), T, R)>
31where
32 K: Ord,
33 V: Ord,
34 T: Ord,
35 R: Semigroup,
36{
37 const BUFFER_SIZE_BYTES: usize = 8 << 10;
38 fn chunk_capacity() -> usize {
39 let size = ::std::mem::size_of::<((K, V), T, R)>();
40 if size == 0 {
41 Self::BUFFER_SIZE_BYTES
42 } else if size <= Self::BUFFER_SIZE_BYTES {
43 Self::BUFFER_SIZE_BYTES / size
44 } else {
45 1
46 }
47 }
48
49 fn form_chunk(&mut self) {
58 consolidate_updates(&mut self.pending);
59 if self.pending.len() >= Self::chunk_capacity() {
60 while self.pending.len() > Self::chunk_capacity() {
61 let mut chunk = Vec::with_capacity(Self::chunk_capacity());
62 chunk.extend(self.pending.drain(..chunk.capacity()));
63 self.ready.push_back(chunk);
64 }
65 }
66 }
67}
68
69impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
70where
71 K: Ord + Clone,
72 V: Ord + Clone,
73 T: Ord + Clone,
74 R: Semigroup + Clone,
75{
76 fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
77 if self.pending.capacity() < Self::chunk_capacity() * 2 {
82 self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
83 }
84
85 let mut drain = container.drain(..).peekable();
86 while drain.peek().is_some() {
87 self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
88 if self.pending.len() == self.pending.capacity() {
89 self.form_chunk();
90 }
91 }
92 }
93}
94
95impl<K, V, T, R> ContainerBuilder for VecChunker<((K, V), T, R)>
96where
97 K: Ord + Clone + 'static,
98 V: Ord + Clone + 'static,
99 T: Ord + Clone + 'static,
100 R: Semigroup + Clone + 'static,
101{
102 type Container = Vec<((K, V), T, R)>;
103
104 fn extract(&mut self) -> Option<&mut Self::Container> {
105 if let Some(ready) = self.ready.pop_front() {
106 self.empty = Some(ready);
107 self.empty.as_mut()
108 } else {
109 None
110 }
111 }
112
113 fn finish(&mut self) -> Option<&mut Self::Container> {
114 if !self.pending.is_empty() {
115 consolidate_updates(&mut self.pending);
116 while !self.pending.is_empty() {
117 let mut chunk = Vec::with_capacity(Self::chunk_capacity());
118 chunk.extend(self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())));
119 self.ready.push_back(chunk);
120 }
121 }
122 self.empty = self.ready.pop_front();
123 self.empty.as_mut()
124 }
125}
126
127pub struct ColumnationChunker<T: Columnation> {
129 pending: Vec<T>,
130 ready: VecDeque<TimelyStack<T>>,
131 empty: Option<TimelyStack<T>>,
132}
133
134impl<T: Columnation> Default for ColumnationChunker<T> {
135 fn default() -> Self {
136 Self {
137 pending: Vec::default(),
138 ready: VecDeque::default(),
139 empty: None,
140 }
141 }
142}
143
144impl<D,T,R> ColumnationChunker<(D, T, R)>
145where
146 D: Columnation + Ord,
147 T: Columnation + Ord,
148 R: Columnation + Semigroup,
149{
150 const BUFFER_SIZE_BYTES: usize = 64 << 10;
151 fn chunk_capacity() -> usize {
152 let size = ::std::mem::size_of::<(D, T, R)>();
153 if size == 0 {
154 Self::BUFFER_SIZE_BYTES
155 } else if size <= Self::BUFFER_SIZE_BYTES {
156 Self::BUFFER_SIZE_BYTES / size
157 } else {
158 1
159 }
160 }
161
162 fn form_chunk(&mut self) {
171 consolidate_updates(&mut self.pending);
172 if self.pending.len() >= Self::chunk_capacity() {
173 while self.pending.len() > Self::chunk_capacity() {
174 let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
175 for item in self.pending.drain(..chunk.capacity()) {
176 chunk.copy(&item);
177 }
178 self.ready.push_back(chunk);
179 }
180 }
181 }
182}
183
184impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)>
185where
186 D: Columnation + Ord + Clone,
187 T: Columnation + Ord + Clone,
188 R: Columnation + Semigroup + Clone,
189{
190 fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
191 if self.pending.capacity() < Self::chunk_capacity() * 2 {
194 self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
195 }
196
197 let mut drain = container.drain(..).peekable();
198 while drain.peek().is_some() {
199 self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
200 if self.pending.len() == self.pending.capacity() {
201 self.form_chunk();
202 }
203 }
204 }
205}
206
207impl<D, T, R> ContainerBuilder for ColumnationChunker<(D, T, R)>
208where
209 D: Columnation + Ord + Clone + 'static,
210 T: Columnation + Ord + Clone + 'static,
211 R: Columnation + Semigroup + Clone + 'static,
212{
213 type Container = TimelyStack<(D,T,R)>;
214
215 fn extract(&mut self) -> Option<&mut Self::Container> {
216 if let Some(ready) = self.ready.pop_front() {
217 self.empty = Some(ready);
218 self.empty.as_mut()
219 } else {
220 None
221 }
222 }
223
224 fn finish(&mut self) -> Option<&mut Self::Container> {
225 consolidate_updates(&mut self.pending);
226 while !self.pending.is_empty() {
227 let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
228 for item in self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())) {
229 chunk.copy(&item);
230 }
231 self.ready.push_back(chunk);
232 }
233 self.empty = self.ready.pop_front();
234 self.empty.as_mut()
235 }
236}
237
/// Chunker generic over the output container type.
pub struct ContainerChunker<Output> {
    /// Container currently accumulating incoming items.
    pending: Output,
    /// Containers that are formed and waiting to be handed out.
    ready: VecDeque<Output>,
    /// Scratch container: consolidation target, and holder of the last handed-out container.
    empty: Output,
}

impl<Output: Default> Default for ContainerChunker<Output> {
    fn default() -> Self {
        let pending = Output::default();
        let ready = VecDeque::new();
        let empty = Output::default();
        ContainerChunker {
            pending,
            ready,
            empty,
        }
    }
}
254
255impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
256where
257 Input: Container,
258 Output: SizableContainer
259 + ConsolidateLayout
260 + PushInto<Input::Item<'a>>,
261{
262 fn push_into(&mut self, container: &'a mut Input) {
263 self.pending.ensure_capacity(&mut None);
264
265 for item in container.drain() {
266 self.pending.push(item);
267 if self.pending.at_capacity() {
268 let starting_len = self.pending.len();
269 self.pending.consolidate_into(&mut self.empty);
270 std::mem::swap(&mut self.pending, &mut self.empty);
271 self.empty.clear();
272 if self.pending.len() > starting_len / 2 {
273 self.ready.push_back(std::mem::take(&mut self.pending));
277 }
278 }
279 }
280 }
281}
282
283impl<Output> ContainerBuilder for ContainerChunker<Output>
284where
285 Output: SizableContainer + ConsolidateLayout + Clone + 'static,
286{
287 type Container = Output;
288
289 fn extract(&mut self) -> Option<&mut Self::Container> {
290 if let Some(ready) = self.ready.pop_front() {
291 self.empty = ready;
292 Some(&mut self.empty)
293 } else {
294 None
295 }
296 }
297
298 fn finish(&mut self) -> Option<&mut Self::Container> {
299 if !self.pending.is_empty() {
300 self.pending.consolidate_into(&mut self.empty);
301 std::mem::swap(&mut self.pending, &mut self.empty);
302 self.empty.clear();
303 if !self.pending.is_empty() {
304 self.ready.push_back(std::mem::take(&mut self.pending));
305 }
306 }
307 if let Some(ready) = self.ready.pop_front() {
308 self.empty = ready;
309 Some(&mut self.empty)
310 } else {
311 None
312 }
313 }
314}