1use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::container::{ContainerBuilder, PushInto};
18
19use crate::logging::{BatcherEvent, Logger};
20use crate::trace::{Batcher, Builder, Description};
21
22pub struct MergeBatcher<Input, C, M: Merger> {
27 chunker: C,
29 chains: Vec<Vec<M::Chunk>>,
33 stash: Vec<M::Chunk>,
35 merger: M,
37 lower: Antichain<M::Time>,
39 frontier: Antichain<M::Time>,
41 logger: Option<Logger>,
43 operator_id: usize,
45 _marker: PhantomData<Input>,
47}
48
49impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
50where
51 C: ContainerBuilder<Container=M::Chunk> + for<'a> PushInto<&'a mut Input>,
52 M: Merger<Time: Timestamp>,
53{
54 type Input = Input;
55 type Time = M::Time;
56 type Output = M::Chunk;
57
58 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
59 Self {
60 logger,
61 operator_id,
62 chunker: C::default(),
63 merger: M::default(),
64 chains: Vec::new(),
65 stash: Vec::new(),
66 frontier: Antichain::new(),
67 lower: Antichain::from_elem(M::Time::minimum()),
68 _marker: PhantomData,
69 }
70 }
71
72 fn push_container(&mut self, container: &mut Input) {
75 self.chunker.push_into(container);
76 while let Some(chunk) = self.chunker.extract() {
77 let chunk = std::mem::take(chunk);
78 self.insert_chain(vec![chunk]);
79 }
80 }
81
82 fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
87 while let Some(chunk) = self.chunker.finish() {
89 let chunk = std::mem::take(chunk);
90 self.insert_chain(vec![chunk]);
91 }
92
93 while self.chains.len() > 1 {
95 let list1 = self.chain_pop().unwrap();
96 let list2 = self.chain_pop().unwrap();
97 let merged = self.merge_by(list1, list2);
98 self.chain_push(merged);
99 }
100 let merged = self.chain_pop().unwrap_or_default();
101
102 let mut kept = Vec::new();
104 let mut readied = Vec::new();
105 self.frontier.clear();
106
107 self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
108
109 if !kept.is_empty() {
110 self.chain_push(kept);
111 }
112
113 self.stash.clear();
114
115 let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
116 let seal = B::seal(&mut readied, description);
117 self.lower = upper;
118 seal
119 }
120
121 #[inline]
123 fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
124 self.frontier.borrow()
125 }
126}
127
128impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
129 fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
132 if !chain.is_empty() {
133 self.chain_push(chain);
134 while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
135 let list1 = self.chain_pop().unwrap();
136 let list2 = self.chain_pop().unwrap();
137 let merged = self.merge_by(list1, list2);
138 self.chain_push(merged);
139 }
140 }
141 }
142
143 fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
145 let mut output = Vec::with_capacity(list1.len() + list2.len());
147 self.merger.merge(list1, list2, &mut output, &mut self.stash);
148
149 output
150 }
151
152 #[inline]
154 fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
155 let chain = self.chains.pop();
156 self.account(chain.iter().flatten().map(M::account), -1);
157 chain
158 }
159
160 #[inline]
162 fn chain_push(&mut self, chain: Vec<M::Chunk>) {
163 self.account(chain.iter().map(M::account), 1);
164 self.chains.push(chain);
165 }
166
167 #[inline]
172 fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
173 if let Some(logger) = &self.logger {
174 let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
175 for (records_, size_, capacity_, allocations_) in items {
176 records = records.saturating_add_unsigned(records_);
177 size = size.saturating_add_unsigned(size_);
178 capacity = capacity.saturating_add_unsigned(capacity_);
179 allocations = allocations.saturating_add_unsigned(allocations_);
180 }
181 logger.log(BatcherEvent {
182 operator: self.operator_id,
183 records_diff: records * diff,
184 size_diff: size * diff,
185 capacity_diff: capacity * diff,
186 allocations_diff: allocations * diff,
187 })
188 }
189 }
190}
191
192impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
193 fn drop(&mut self) {
194 while self.chain_pop().is_some() {}
196 }
197}
198
199pub trait Merger: Default {
201 type Chunk: Default;
203 type Time;
205 fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
207 fn extract(
209 &mut self,
210 merged: Vec<Self::Chunk>,
211 upper: AntichainRef<Self::Time>,
212 frontier: &mut Antichain<Self::Time>,
213 readied: &mut Vec<Self::Chunk>,
214 kept: &mut Vec<Self::Chunk>,
215 stash: &mut Vec<Self::Chunk>,
216 );
217
218 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
220}
221
222pub use container::InternalMerger;
223
224pub mod container {
225
226 use std::marker::PhantomData;
234 use timely::container::SizableContainer;
235 use timely::progress::frontier::{Antichain, AntichainRef};
236 use timely::{Accountable, PartialOrder};
237 use crate::trace::implementations::merge_batcher::Merger;
238
239 pub trait InternalMerge: Accountable + SizableContainer + Default {
242 type TimeOwned;
244
245 fn len(&self) -> usize;
247
248 fn clear(&mut self);
250
251 fn account(&self) -> (usize, usize, usize, usize) {
253 let (size, capacity, allocations) = (0, 0, 0);
254 (usize::try_from(self.record_count()).unwrap(), size, capacity, allocations)
255 }
256
257 fn merge_from(
265 &mut self,
266 others: &mut [Self],
267 positions: &mut [usize],
268 );
269
270 fn extract(
273 &mut self,
274 upper: AntichainRef<Self::TimeOwned>,
275 frontier: &mut Antichain<Self::TimeOwned>,
276 keep: &mut Self,
277 ship: &mut Self,
278 );
279 }
280
281 pub type VecInternalMerger<D, T, R> = InternalMerger<Vec<(D, T, R)>>;
283 pub type ColInternalMerger<D, T, R> = InternalMerger<crate::containers::TimelyStack<(D, T, R)>>;
285
286 pub struct InternalMerger<MC> {
288 _marker: PhantomData<MC>,
289 }
290
291 impl<MC> Default for InternalMerger<MC> {
292 fn default() -> Self { Self { _marker: PhantomData } }
293 }
294
295 impl<MC> InternalMerger<MC> where MC: InternalMerge {
296 #[inline]
297 fn empty(&self, stash: &mut Vec<MC>) -> MC {
298 stash.pop().unwrap_or_else(|| {
299 let mut container = MC::default();
300 container.ensure_capacity(&mut None);
301 container
302 })
303 }
304 #[inline]
305 fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
306 chunk.clear();
307 stash.push(chunk);
308 }
309 fn drain_side(
311 &self,
312 head: &mut MC,
313 pos: &mut usize,
314 list: &mut std::vec::IntoIter<MC>,
315 result: &mut MC,
316 output: &mut Vec<MC>,
317 stash: &mut Vec<MC>,
318 ) {
319 while *pos < head.len() {
320 result.merge_from(
321 std::slice::from_mut(head),
322 std::slice::from_mut(pos),
323 );
324 if *pos >= head.len() {
325 let old = std::mem::replace(head, list.next().unwrap_or_default());
326 self.recycle(old, stash);
327 *pos = 0;
328 }
329 if result.at_capacity() {
330 output.push(std::mem::take(result));
331 *result = self.empty(stash);
332 }
333 }
334 }
335 }
336
337 impl<MC> Merger for InternalMerger<MC>
338 where
339 MC: InternalMerge<TimeOwned: Ord + PartialOrder + Clone + 'static> + 'static,
340 {
341 type Time = MC::TimeOwned;
342 type Chunk = MC;
343
344 fn merge(&mut self, list1: Vec<MC>, list2: Vec<MC>, output: &mut Vec<MC>, stash: &mut Vec<MC>) {
345 let mut list1 = list1.into_iter();
346 let mut list2 = list2.into_iter();
347
348 let mut heads = [list1.next().unwrap_or_default(), list2.next().unwrap_or_default()];
349 let mut positions = [0usize, 0usize];
350
351 let mut result = self.empty(stash);
352
353 while positions[0] < heads[0].len() && positions[1] < heads[1].len() {
355 result.merge_from(&mut heads, &mut positions);
356
357 if positions[0] >= heads[0].len() {
358 let old = std::mem::replace(&mut heads[0], list1.next().unwrap_or_default());
359 self.recycle(old, stash);
360 positions[0] = 0;
361 }
362 if positions[1] >= heads[1].len() {
363 let old = std::mem::replace(&mut heads[1], list2.next().unwrap_or_default());
364 self.recycle(old, stash);
365 positions[1] = 0;
366 }
367 if result.at_capacity() {
368 output.push(std::mem::take(&mut result));
369 result = self.empty(stash);
370 }
371 }
372
373 self.drain_side(&mut heads[0], &mut positions[0], &mut list1, &mut result, output, stash);
375 if !result.is_empty() {
376 output.push(std::mem::take(&mut result));
377 result = self.empty(stash);
378 }
379 output.extend(list1);
380
381 self.drain_side(&mut heads[1], &mut positions[1], &mut list2, &mut result, output, stash);
383 if !result.is_empty() {
384 output.push(std::mem::take(&mut result));
385 }
386 output.extend(list2);
387 }
388
389 fn extract(
390 &mut self,
391 merged: Vec<Self::Chunk>,
392 upper: AntichainRef<Self::Time>,
393 frontier: &mut Antichain<Self::Time>,
394 ship: &mut Vec<Self::Chunk>,
395 kept: &mut Vec<Self::Chunk>,
396 stash: &mut Vec<Self::Chunk>,
397 ) {
398 let mut keep = self.empty(stash);
399 let mut ready = self.empty(stash);
400
401 for mut buffer in merged {
402 buffer.extract(upper, frontier, &mut keep, &mut ready);
403 self.recycle(buffer, stash);
404 if keep.at_capacity() {
405 kept.push(std::mem::take(&mut keep));
406 keep = self.empty(stash);
407 }
408 if ready.at_capacity() {
409 ship.push(std::mem::take(&mut ready));
410 ready = self.empty(stash);
411 }
412 }
413 if !keep.is_empty() {
414 kept.push(keep);
415 }
416 if !ready.is_empty() {
417 ship.push(ready);
418 }
419 }
420
421 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
422 chunk.account()
423 }
424 }
425
426 pub mod vec_internal {
428 use std::cmp::Ordering;
429 use timely::PartialOrder;
430 use timely::container::SizableContainer;
431 use timely::progress::frontier::{Antichain, AntichainRef};
432 use crate::difference::Semigroup;
433 use super::InternalMerge;
434
435 impl<D: Ord + Clone + 'static, T: Ord + Clone + PartialOrder + 'static, R: Semigroup + Clone + 'static> InternalMerge for Vec<(D, T, R)> {
436 type TimeOwned = T;
437
438 fn len(&self) -> usize { Vec::len(self) }
439 fn clear(&mut self) { Vec::clear(self) }
440
441 fn merge_from(
442 &mut self,
443 others: &mut [Self],
444 positions: &mut [usize],
445 ) {
446 match others.len() {
447 0 => {},
448 1 => {
449 let other = &mut others[0];
450 let pos = &mut positions[0];
451 if self.is_empty() && *pos == 0 {
452 std::mem::swap(self, other);
453 return;
454 }
455 self.extend_from_slice(&other[*pos ..]);
456 *pos = other.len();
457 },
458 2 => {
459 let (left, right) = others.split_at_mut(1);
460 let other1 = &mut left[0];
461 let other2 = &mut right[0];
462
463 while positions[0] < other1.len() && positions[1] < other2.len() && !self.at_capacity() {
464 let (d1, t1, _) = &other1[positions[0]];
465 let (d2, t2, _) = &other2[positions[1]];
466 match (d1, t1).cmp(&(d2, t2)) {
467 Ordering::Less => {
468 self.push(other1[positions[0]].clone());
469 positions[0] += 1;
470 }
471 Ordering::Greater => {
472 self.push(other2[positions[1]].clone());
473 positions[1] += 1;
474 }
475 Ordering::Equal => {
476 let (d, t, mut r1) = other1[positions[0]].clone();
477 let (_, _, ref r2) = other2[positions[1]];
478 r1.plus_equals(r2);
479 if !r1.is_zero() {
480 self.push((d, t, r1));
481 }
482 positions[0] += 1;
483 positions[1] += 1;
484 }
485 }
486 }
487 },
488 n => unimplemented!("{n}-way merge not yet supported"),
489 }
490 }
491
492 fn extract(
493 &mut self,
494 upper: AntichainRef<T>,
495 frontier: &mut Antichain<T>,
496 keep: &mut Self,
497 ship: &mut Self,
498 ) {
499 for (data, time, diff) in self.drain(..) {
500 if upper.less_equal(&time) {
501 frontier.insert_with(&time, |time| time.clone());
502 keep.push((data, time, diff));
503 } else {
504 ship.push((data, time, diff));
505 }
506 }
507 }
508 }
509 }
510
511 pub mod columnation_internal {
513 use std::cmp::Ordering;
514 use columnation::Columnation;
515 use timely::PartialOrder;
516 use timely::container::SizableContainer;
517 use timely::progress::frontier::{Antichain, AntichainRef};
518 use crate::containers::TimelyStack;
519 use crate::difference::Semigroup;
520 use super::InternalMerge;
521
522 impl<D, T, R> InternalMerge for TimelyStack<(D, T, R)>
523 where
524 D: Ord + Columnation + Clone + 'static,
525 T: Ord + Columnation + Clone + PartialOrder + 'static,
526 R: Default + Semigroup + Columnation + Clone + 'static,
527 {
528 type TimeOwned = T;
529
530 fn len(&self) -> usize { self[..].len() }
531 fn clear(&mut self) { TimelyStack::clear(self) }
532
533 fn account(&self) -> (usize, usize, usize, usize) {
534 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
535 let cb = |siz, cap| {
536 size += siz;
537 capacity += cap;
538 allocations += 1;
539 };
540 self.heap_size(cb);
541 (self.len(), size, capacity, allocations)
542 }
543
544 fn merge_from(
545 &mut self,
546 others: &mut [Self],
547 positions: &mut [usize],
548 ) {
549 match others.len() {
550 0 => {},
551 1 => {
552 let other = &mut others[0];
553 let pos = &mut positions[0];
554 if self[..].is_empty() && *pos == 0 {
555 std::mem::swap(self, other);
556 return;
557 }
558 for i in *pos .. other[..].len() {
559 self.copy(&other[i]);
560 }
561 *pos = other[..].len();
562 },
563 2 => {
564 let (left, right) = others.split_at_mut(1);
565 let other1 = &left[0];
566 let other2 = &right[0];
567
568 let mut stash = R::default();
569
570 while positions[0] < other1[..].len() && positions[1] < other2[..].len() && !self.at_capacity() {
571 let (d1, t1, _) = &other1[positions[0]];
572 let (d2, t2, _) = &other2[positions[1]];
573 match (d1, t1).cmp(&(d2, t2)) {
574 Ordering::Less => {
575 self.copy(&other1[positions[0]]);
576 positions[0] += 1;
577 }
578 Ordering::Greater => {
579 self.copy(&other2[positions[1]]);
580 positions[1] += 1;
581 }
582 Ordering::Equal => {
583 let (_, _, r1) = &other1[positions[0]];
584 let (_, _, r2) = &other2[positions[1]];
585 stash.clone_from(r1);
586 stash.plus_equals(r2);
587 if !stash.is_zero() {
588 let (d, t, _) = &other1[positions[0]];
589 self.copy_destructured(d, t, &stash);
590 }
591 positions[0] += 1;
592 positions[1] += 1;
593 }
594 }
595 }
596 },
597 n => unimplemented!("{n}-way merge not yet supported"),
598 }
599 }
600
601 fn extract(
602 &mut self,
603 upper: AntichainRef<T>,
604 frontier: &mut Antichain<T>,
605 keep: &mut Self,
606 ship: &mut Self,
607 ) {
608 for (data, time, diff) in self.iter() {
609 if upper.less_equal(time) {
610 frontier.insert_with(time, |time| time.clone());
611 keep.copy_destructured(data, time, diff);
612 } else {
613 ship.copy_destructured(data, time, diff);
614 }
615 }
616 }
617 }
618 }
619}