differential_dataflow/trace/implementations/merge_batcher_col.rs

use timely::Container;

use timely::communication::message::RefOrMut;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::logging::WorkerIdentifier;
use timely::logging_core::Logger;
use timely::progress::{frontier::Antichain, Timestamp};

use crate::difference::Semigroup;
use crate::logging::{BatcherEvent, DifferentialEvent};
use crate::trace::{Batcher, Builder};

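/// Creates batches from unordered tuples, sorting and consolidating them in columnar (`TimelyStack`) storage.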
pub struct ColumnatedMergeBatcher<K, V, T, D>
where
    K: Columnation + 'static,
    V: Columnation + 'static,
    T: Columnation + 'static,
    D: Columnation + 'static,
{
    sorter: MergeSorterColumnation<(K, V), T, D>,
    lower: Antichain<T>,
    frontier: Antichain<T>,
}

impl<K, V, T, D> Batcher for ColumnatedMergeBatcher<K, V, T, D>
where
    K: Columnation + Ord + Clone + 'static,
    V: Columnation + Ord + Clone + 'static,
    T: Columnation + Timestamp + 'static,
    D: Columnation + Semigroup + 'static,
{
    type Item = ((K, V), T, D);
    type Time = T;

    fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
        ColumnatedMergeBatcher {
            sorter: MergeSorterColumnation::new(logger, operator_id),
            frontier: Antichain::new(),
            lower: Antichain::from_elem(<T as Timestamp>::minimum()),
        }
    }

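    /// Buffers a batch of updates in the sorter; a shared batch is cloned, an owned batch is drained in place.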
    #[inline]
    fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>) {
        match batch {
            RefOrMut::Ref(reference) => {
                self.sorter.push(&mut reference.clone());
            },
            RefOrMut::Mut(reference) => {
                self.sorter.push(reference);
            }
        }
    }

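    /// Sorts and consolidates all buffered updates, moves those with times not in advance of `upper` into a
    /// new batch, and retains the remainder (recording their times in `self.frontier`) for later calls.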
    #[inline]
    fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {

        let mut merged = Default::default();
        self.sorter.finish_into(&mut merged);

        // Estimate the numbers of distinct keys, values, and updates that will be extracted,
        // so the builder can be allocated with appropriate capacity.
        let mut builder = {
            let mut keys = 0;
            let mut vals = 0;
            let mut upds = 0;
            let mut prev_keyval = None;
            for buffer in merged.iter() {
                for ((key, val), time, _) in buffer.iter() {
                    if !upper.less_equal(time) {
                        if let Some((p_key, p_val)) = prev_keyval {
                            if p_key != key {
                                keys += 1;
                                vals += 1;
                            }
                            else if p_val != val {
                                vals += 1;
                            }
                            upds += 1;
                        }
                        prev_keyval = Some((key, val));
                    }
                }
            }
            B::with_capacity(keys, vals, upds)
        };

        let mut kept = Vec::new();
        let mut keep = TimelyStack::default();

        self.frontier.clear();

        // Move updates not in advance of `upper` into the builder; retain the rest, recording their times
        // in `self.frontier`.
        for buffer in merged.drain(..) {
            for datum @ ((_key, _val), time, _diff) in &buffer[..] {
                if upper.less_equal(time) {
                    self.frontier.insert(time.clone());
                    if keep.is_empty() {
                        if keep.capacity() != MergeSorterColumnation::<(K, V), T, D>::buffer_size() {
                            keep = self.sorter.empty();
                        }
                    } else if keep.len() == keep.capacity() {
                        kept.push(keep);
                        keep = self.sorter.empty();
                    }
                    keep.copy(datum);
                }
                else {
                    builder.copy(datum);
                }
            }
            self.sorter.recycle(buffer);
        }

        if !keep.is_empty() {
            kept.push(keep);
        }
        if !kept.is_empty() {
            self.sorter.push_list(kept);
        }

        self.sorter.clear_stash();

        let seal = builder.done(self.lower.clone(), upper.clone(), Antichain::from_elem(T::minimum()));
        self.lower = upper;
        seal
    }

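    /// The frontier of timestamps among the updates retained by the most recent call to `seal`.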
    fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
        self.frontier.borrow()
    }
}

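/// A FIFO view over a `TimelyStack`, popping items by reference by advancing a head index.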
struct TimelyStackQueue<T: Columnation> {
    list: TimelyStack<T>,
    head: usize,
}

impl<T: Columnation> Default for TimelyStackQueue<T> {
    fn default() -> Self {
        Self::from(Default::default())
    }
}

impl<T: Columnation> TimelyStackQueue<T> {

    /// Pops the next item, returning a reference into the underlying stack.
    fn pop(&mut self) -> &T {
        self.head += 1;
        &self.list[self.head - 1]
    }

    /// Peeks at the next item without advancing.
    fn peek(&self) -> &T {
        &self.list[self.head]
    }

    fn from(list: TimelyStack<T>) -> Self {
        TimelyStackQueue {
            list,
            head: 0,
        }
    }

    /// Returns the underlying stack.
    fn done(self) -> TimelyStack<T> {
        self.list
    }

    fn is_empty(&self) -> bool { self.head == self.list[..].len() }

    /// Iterates over the remaining (un-popped) items.
    fn iter(&self) -> impl Iterator<Item=&T> + Clone + ExactSizeIterator {
        self.list[self.head..].iter()
    }
}

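/// A merge sorter for update triples held in `TimelyStack`s: `pending` collects recent unsorted updates,
/// `queue` holds sorted runs awaiting merging, and `stash` keeps cleared buffers for reuse.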
struct MergeSorterColumnation<D: Columnation + 'static, T: Columnation + 'static, R: Columnation + 'static> {
    /// Sorted runs of buffers, merged progressively as new runs arrive.
    queue: Vec<Vec<TimelyStack<(D, T, R)>>>,
    /// Cleared buffers kept around for reuse.
    stash: Vec<TimelyStack<(D, T, R)>>,
    /// Recently pushed updates, awaiting consolidation and sorting.
    pending: Vec<(D, T, R)>,
    logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
    operator_id: usize,
}

impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {

    /// The target size, in bytes, of each buffer.
    const BUFFER_SIZE_BYTES: usize = 64 << 10;

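    /// The number of `(D, T, R)` elements that fit in `BUFFER_SIZE_BYTES`; at least one, and
    /// `BUFFER_SIZE_BYTES` itself for zero-sized elements.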
    const fn buffer_size() -> usize {
        let size = std::mem::size_of::<(D, T, R)>();
        if size == 0 {
            Self::BUFFER_SIZE_BYTES
        } else if size <= Self::BUFFER_SIZE_BYTES {
            Self::BUFFER_SIZE_BYTES / size
        } else {
            1
        }
    }

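    /// The capacity to which the `pending` buffer is grown: twice `buffer_size()`.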
    const fn pending_buffer_size() -> usize {
        Self::buffer_size() * 2
    }

    fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
        Self {
            logger,
            operator_id,
            queue: Vec::new(),
            stash: Vec::new(),
            pending: Vec::new(),
        }
    }

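    /// Provides an empty buffer, reusing a stashed allocation when available and allocating one of
    /// `buffer_size()` capacity otherwise.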
    fn empty(&mut self) -> TimelyStack<(D, T, R)> {
        self.stash.pop().unwrap_or_else(|| TimelyStack::with_capacity(Self::buffer_size()))
    }

    /// Drops all stashed buffers.
    fn clear_stash(&mut self) {
        self.stash.clear();
    }

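    /// Clears `buffer` and stashes it for reuse, provided it has the standard capacity and at most one
    /// buffer is already stashed.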
    fn recycle(&mut self, mut buffer: TimelyStack<(D, T, R)>) {
        if buffer.capacity() == Self::buffer_size() && self.stash.len() < 2 {
            buffer.clear();
            self.stash.push(buffer);
        }
    }

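    /// Appends `batch` to the pending buffer; whenever the buffer fills it is consolidated, and flushed
    /// into the merge queue if consolidation leaves it more than half full.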
    fn push(&mut self, batch: &mut Vec<(D, T, R)>) {
        if self.pending.capacity() < Self::pending_buffer_size() {
            self.pending.reserve(Self::pending_buffer_size() - self.pending.capacity());
        }

        while !batch.is_empty() {
            self.pending.extend(batch.drain(..std::cmp::min(batch.len(), self.pending.capacity() - self.pending.len())));
            if self.pending.len() == self.pending.capacity() {
                crate::consolidation::consolidate_updates(&mut self.pending);
                if self.pending.len() > self.pending.capacity() / 2 {
                    self.flush_pending();
                }
            }
        }
    }

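    /// Copies the pending updates into a fresh run on the merge queue, then repeatedly merges the two most
    /// recent runs while the newer one holds at least half as many buffers as the older.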
    fn flush_pending(&mut self) {
        if !self.pending.is_empty() {
            let mut stack = self.empty();
            stack.reserve_items(self.pending.iter());
            for tuple in self.pending.drain(..) {
                stack.copy(&tuple);
            }
            self.queue_push(vec![stack]);
            while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
                let list1 = self.queue_pop().unwrap();
                let list2 = self.queue_pop().unwrap();
                let merged = self.merge_by(list1, list2);
                self.queue_push(merged);
            }
        }
    }

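    /// Adds a sorted run to the merge queue, first merging existing runs that hold fewer buffers than `list`.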
    fn push_list(&mut self, list: Vec<TimelyStack<(D, T, R)>>) {
        while self.queue.len() > 1 && self.queue[self.queue.len()-1].len() < list.len() {
            let list1 = self.queue_pop().unwrap();
            let list2 = self.queue_pop().unwrap();
            let merged = self.merge_by(list1, list2);
            self.queue_push(merged);
        }
        self.queue_push(list);
    }

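    /// Consolidates and flushes any pending updates, merges all queued runs into one, and moves that run
    /// into `target`.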
    fn finish_into(&mut self, target: &mut Vec<TimelyStack<(D, T, R)>>) {
        crate::consolidation::consolidate_updates(&mut self.pending);
        self.flush_pending();
        while self.queue.len() > 1 {
            let list1 = self.queue_pop().unwrap();
            let list2 = self.queue_pop().unwrap();
            let merged = self.merge_by(list1, list2);
            self.queue_push(merged);
        }

        if let Some(mut last) = self.queue_pop() {
            std::mem::swap(&mut last, target);
        }
    }

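    /// Merges two sorted runs into one, ordering by data and time; diffs of equal (data, time) pairs are
    /// added, and updates whose accumulated diff is zero are dropped. Exhausted buffers are recycled.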
    fn merge_by(&mut self, list1: Vec<TimelyStack<(D, T, R)>>, list2: Vec<TimelyStack<(D, T, R)>>) -> Vec<TimelyStack<(D, T, R)>> {
        use std::cmp::Ordering;

        let mut output = Vec::with_capacity(list1.len() + list2.len());
        let mut result = self.empty();

        let mut list1 = list1.into_iter();
        let mut list2 = list2.into_iter();

        let mut head1 = TimelyStackQueue::from(list1.next().unwrap_or_default());
        let mut head2 = TimelyStackQueue::from(list2.next().unwrap_or_default());

        // While both lists have data, merge into `result`, pushing completed buffers into `output`.
        while !head1.is_empty() && !head2.is_empty() {

            while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {

                let cmp = {
                    let x = head1.peek();
                    let y = head2.peek();
                    (&x.0, &x.1).cmp(&(&y.0, &y.1))
                };
                match cmp {
                    Ordering::Less => { result.copy(head1.pop()); }
                    Ordering::Greater => { result.copy(head2.pop()); }
                    Ordering::Equal => {
                        // Equal (data, time): add the diffs and drop the update if they cancel.
                        let (data1, time1, diff1) = head1.pop();
                        let (_data2, _time2, diff2) = head2.pop();
                        let mut diff1 = diff1.clone();
                        diff1.plus_equals(diff2);
                        if !diff1.is_zero() {
                            result.copy_destructured(data1, time1, &diff1);
                        }
                    }
                }
            }

            if result.capacity() == result.len() {
                output.push(result);
                result = self.empty();
            }

            if head1.is_empty() {
                self.recycle(head1.done());
                head1 = TimelyStackQueue::from(list1.next().unwrap_or_default());
            }
            if head2.is_empty() {
                self.recycle(head2.done());
                head2 = TimelyStackQueue::from(list2.next().unwrap_or_default());
            }
        }

        if result.len() > 0 {
            output.push(result);
        } else {
            self.recycle(result);
        }

        if !head1.is_empty() {
            let mut result = self.empty();
            result.reserve_items(head1.iter());
            for item in head1.iter() { result.copy(item); }
            output.push(result);
        }
        output.extend(list1);

        if !head2.is_empty() {
            let mut result = self.empty();
            result.reserve_items(head2.iter());
            for item in head2.iter() { result.copy(item); }
            output.push(result);
        }
        output.extend(list2);

        output
    }
}

impl<D: Columnation + 'static, T: Columnation + 'static, R: Columnation + 'static> MergeSorterColumnation<D, T, R> {
    #[inline]
    fn queue_pop(&mut self) -> Option<Vec<TimelyStack<(D, T, R)>>> {
        let batch = self.queue.pop();
        self.account(batch.iter().flatten(), -1);
        batch
    }

    #[inline]
    fn queue_push(&mut self, batch: Vec<TimelyStack<(D, T, R)>>) {
        self.account(&batch, 1);
        self.queue.push(batch);
    }

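    /// Reports the records, heap size, capacity, and allocation count of `items` to the logger, scaled by
    /// `diff` (positive when batches enter the queue, negative when they leave).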
    fn account<'a, I: IntoIterator<Item=&'a TimelyStack<(D, T, R)>>>(&self, items: I, diff: isize) {
        if let Some(logger) = &self.logger {
            let (mut records, mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
            for stack in items {
                records = records.saturating_add_unsigned(stack.len());
                stack.heap_size(|s, c| {
                    siz = siz.saturating_add_unsigned(s);
                    capacity = capacity.saturating_add_unsigned(c);
                    allocations += isize::from(c > 0);
                });
            }
            logger.log(BatcherEvent {
                operator: self.operator_id,
                records_diff: records * diff,
                size_diff: siz * diff,
                capacity_diff: capacity * diff,
                allocations_diff: allocations * diff,
            })
        }
    }

}

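/// Pops any queued batches on drop, which reports their release through `account`.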
impl<D: Columnation + 'static, T: Columnation + 'static, R: Columnation + 'static> Drop for MergeSorterColumnation<D, T, R> {
    fn drop(&mut self) {
        while self.queue_pop().is_some() { }
    }
}