1use std::marker::PhantomData;
14
15use timely::container::{ContainerBuilder, PushInto};
16use timely::progress::frontier::AntichainRef;
17use timely::progress::{frontier::Antichain, Timestamp};
18
19use crate::logging::{BatcherEvent, Logger};
20use crate::trace::{Batcher, Builder, Description};
21
22pub struct MergeBatcher<Input, C, M: Merger> {
27 chunker: C,
29 chains: Vec<Vec<M::Chunk>>,
33 stash: Vec<M::Chunk>,
35 merger: M,
37 lower: Antichain<M::Time>,
39 frontier: Antichain<M::Time>,
41 logger: Option<Logger>,
43 operator_id: usize,
45 _marker: PhantomData<Input>,
47}
48
49impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
50where
51 C: ContainerBuilder<Container = M::Chunk> + for<'a> PushInto<&'a mut Input>,
52 M: Merger<Time: Timestamp>,
53{
54 type Input = Input;
55 type Time = M::Time;
56 type Output = M::Chunk;
57
58 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
59 Self {
60 logger,
61 operator_id,
62 chunker: C::default(),
63 merger: M::default(),
64 chains: Vec::new(),
65 stash: Vec::new(),
66 frontier: Antichain::new(),
67 lower: Antichain::from_elem(M::Time::minimum()),
68 _marker: PhantomData,
69 }
70 }
71
72 fn push_container(&mut self, container: &mut Input) {
75 self.chunker.push_into(container);
76 while let Some(chunk) = self.chunker.extract() {
77 let chunk = std::mem::take(chunk);
78 self.insert_chain(vec![chunk]);
79 }
80 }
81
82 fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
87 &mut self,
88 upper: Antichain<M::Time>,
89 ) -> B::Output {
90 while let Some(chunk) = self.chunker.finish() {
92 let chunk = std::mem::take(chunk);
93 self.insert_chain(vec![chunk]);
94 }
95
96 while self.chains.len() > 1 {
98 let list1 = self.chain_pop().unwrap();
99 let list2 = self.chain_pop().unwrap();
100 let merged = self.merge_by(list1, list2);
101 self.chain_push(merged);
102 }
103 let merged = self.chain_pop().unwrap_or_default();
104
105 let mut kept = Vec::new();
107 let mut readied = Vec::new();
108 self.frontier.clear();
109
110 self.merger.extract(
111 merged,
112 upper.borrow(),
113 &mut self.frontier,
114 &mut readied,
115 &mut kept,
116 &mut self.stash,
117 );
118
119 if !kept.is_empty() {
120 self.chain_push(kept);
121 }
122
123 self.stash.clear();
124
125 let description = Description::new(
126 self.lower.clone(),
127 upper.clone(),
128 Antichain::from_elem(M::Time::minimum()),
129 );
130 let seal = B::seal(&mut readied, description);
131 self.lower = upper;
132 seal
133 }
134
135 #[inline]
137 fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
138 self.frontier.borrow()
139 }
140}
141
142impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
143 fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
146 if !chain.is_empty() {
147 self.chain_push(chain);
148 while self.chains.len() > 1
149 && (self.chains[self.chains.len() - 1].len()
150 >= self.chains[self.chains.len() - 2].len() / 2)
151 {
152 let list1 = self.chain_pop().unwrap();
153 let list2 = self.chain_pop().unwrap();
154 let merged = self.merge_by(list1, list2);
155 self.chain_push(merged);
156 }
157 }
158 }
159
160 fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
162 let mut output = Vec::with_capacity(list1.len() + list2.len());
164 self.merger
165 .merge(list1, list2, &mut output, &mut self.stash);
166
167 output
168 }
169
170 #[inline]
172 fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
173 let chain = self.chains.pop();
174 self.account(chain.iter().flatten().map(M::account), -1);
175 chain
176 }
177
178 #[inline]
180 fn chain_push(&mut self, chain: Vec<M::Chunk>) {
181 self.account(chain.iter().map(M::account), 1);
182 self.chains.push(chain);
183 }
184
185 #[inline]
190 fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
191 if let Some(logger) = &self.logger {
192 let (mut records, mut size, mut capacity, mut allocations) =
193 (0isize, 0isize, 0isize, 0isize);
194 for (records_, size_, capacity_, allocations_) in items {
195 records = records.saturating_add_unsigned(records_);
196 size = size.saturating_add_unsigned(size_);
197 capacity = capacity.saturating_add_unsigned(capacity_);
198 allocations = allocations.saturating_add_unsigned(allocations_);
199 }
200 logger.log(BatcherEvent {
201 operator: self.operator_id,
202 records_diff: records * diff,
203 size_diff: size * diff,
204 capacity_diff: capacity * diff,
205 allocations_diff: allocations * diff,
206 })
207 }
208 }
209}
210
211impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
212 fn drop(&mut self) {
213 while self.chain_pop().is_some() {}
215 }
216}
217
218pub trait Merger: Default {
220 type Chunk: Default;
222 type Time;
224 fn merge(
226 &mut self,
227 list1: Vec<Self::Chunk>,
228 list2: Vec<Self::Chunk>,
229 output: &mut Vec<Self::Chunk>,
230 stash: &mut Vec<Self::Chunk>,
231 );
232 fn extract(
234 &mut self,
235 merged: Vec<Self::Chunk>,
236 upper: AntichainRef<Self::Time>,
237 frontier: &mut Antichain<Self::Time>,
238 readied: &mut Vec<Self::Chunk>,
239 kept: &mut Vec<Self::Chunk>,
240 stash: &mut Vec<Self::Chunk>,
241 );
242
243 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
245}
246
247pub use container::{ColMerger, VecMerger};
248
249pub mod container {
250
251 use crate::trace::implementations::merge_batcher::Merger;
264 use std::cmp::Ordering;
265 use std::marker::PhantomData;
266 use timely::container::DrainContainer;
267 use timely::container::{PushInto, SizableContainer};
268 use timely::progress::frontier::{Antichain, AntichainRef};
269 use timely::{Accountable, Data, PartialOrder};
270
271 pub trait ContainerQueue<C: DrainContainer> {
273 fn next_or_alloc(&mut self) -> Result<C::Item<'_>, C>;
275 fn is_empty(&self) -> bool;
277 fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering;
279 fn from(container: C) -> Self;
281 }
282
283 pub trait MergerChunk: Accountable + DrainContainer + SizableContainer + Default {
285 type TimeOwned;
290 type DiffOwned: Default;
295
296 fn time_kept(
300 time1: &Self::Item<'_>,
301 upper: &AntichainRef<Self::TimeOwned>,
302 frontier: &mut Antichain<Self::TimeOwned>,
303 ) -> bool;
304
305 fn push_and_add<'a>(
310 &mut self,
311 item1: Self::Item<'a>,
312 item2: Self::Item<'a>,
313 stash: &mut Self::DiffOwned,
314 );
315
316 fn account(&self) -> (usize, usize, usize, usize) {
319 let (size, capacity, allocations) = (0, 0, 0);
320 (
321 usize::try_from(self.record_count()).unwrap(),
322 size,
323 capacity,
324 allocations,
325 )
326 }
327
328 fn clear(&mut self);
330 }
331
/// A [`Merger`] over chunks of type `MC`, consuming its inputs through queues of type `CQ`.
///
/// The merger holds no state; it only carries its type parameters.
pub struct ContainerMerger<MC, CQ> {
    _marker: PhantomData<(MC, CQ)>,
}

impl<MC, CQ> Default for ContainerMerger<MC, CQ> {
    /// Written by hand so that neither `MC` nor `CQ` has to implement `Default`.
    fn default() -> Self {
        ContainerMerger { _marker: PhantomData }
    }
}
347
348 impl<MC: MergerChunk, CQ> ContainerMerger<MC, CQ> {
349 #[inline]
351 fn empty(&self, stash: &mut Vec<MC>) -> MC {
352 stash.pop().unwrap_or_else(|| {
353 let mut container = MC::default();
354 container.ensure_capacity(&mut None);
355 container
356 })
357 }
358 #[inline]
360 fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
361 chunk.clear();
363 stash.push(chunk);
364 }
365 }
366
367 impl<MC, CQ> Merger for ContainerMerger<MC, CQ>
368 where
369 for<'a> MC: MergerChunk<TimeOwned: Ord + PartialOrder + Data>
370 + Clone
371 + PushInto<<MC as DrainContainer>::Item<'a>>
372 + 'static,
373 CQ: ContainerQueue<MC>,
374 {
375 type Time = MC::TimeOwned;
376 type Chunk = MC;
377
378 fn merge(
380 &mut self,
381 list1: Vec<Self::Chunk>,
382 list2: Vec<Self::Chunk>,
383 output: &mut Vec<Self::Chunk>,
384 stash: &mut Vec<Self::Chunk>,
385 ) {
386 let mut list1 = list1.into_iter();
387 let mut list2 = list2.into_iter();
388
389 let mut head1 = CQ::from(list1.next().unwrap_or_default());
390 let mut head2 = CQ::from(list2.next().unwrap_or_default());
391
392 let mut result = self.empty(stash);
393
394 let mut diff_owned = Default::default();
395
396 while !head1.is_empty() && !head2.is_empty() {
398 while !result.at_capacity() && !head1.is_empty() && !head2.is_empty() {
399 let cmp = head1.cmp_heads(&head2);
400 match cmp {
404 Ordering::Less => {
405 result.push_into(head1.next_or_alloc().ok().unwrap());
406 }
407 Ordering::Greater => {
408 result.push_into(head2.next_or_alloc().ok().unwrap());
409 }
410 Ordering::Equal => {
411 let item1 = head1.next_or_alloc().ok().unwrap();
412 let item2 = head2.next_or_alloc().ok().unwrap();
413 result.push_and_add(item1, item2, &mut diff_owned);
414 }
415 }
416 }
417
418 if result.at_capacity() {
419 output.push_into(result);
420 result = self.empty(stash);
421 }
422
423 if head1.is_empty() {
424 self.recycle(head1.next_or_alloc().err().unwrap(), stash);
425 head1 = CQ::from(list1.next().unwrap_or_default());
426 }
427 if head2.is_empty() {
428 self.recycle(head2.next_or_alloc().err().unwrap(), stash);
429 head2 = CQ::from(list2.next().unwrap_or_default());
430 }
431 }
432
433 while let Ok(next) = head1.next_or_alloc() {
435 result.push_into(next);
436 if result.at_capacity() {
437 output.push_into(result);
438 result = self.empty(stash);
439 }
440 }
441 if !result.is_empty() {
442 output.push_into(result);
443 result = self.empty(stash);
444 }
445 output.extend(list1);
446
447 while let Ok(next) = head2.next_or_alloc() {
449 result.push_into(next);
450 if result.at_capacity() {
451 output.push(result);
452 result = self.empty(stash);
453 }
454 }
455 if !result.is_empty() {
456 output.push_into(result);
457 }
459 output.extend(list2);
460 }
461
462 fn extract(
463 &mut self,
464 merged: Vec<Self::Chunk>,
465 upper: AntichainRef<Self::Time>,
466 frontier: &mut Antichain<Self::Time>,
467 readied: &mut Vec<Self::Chunk>,
468 kept: &mut Vec<Self::Chunk>,
469 stash: &mut Vec<Self::Chunk>,
470 ) {
471 let mut keep = self.empty(stash);
472 let mut ready = self.empty(stash);
473
474 for mut buffer in merged {
475 for item in buffer.drain() {
476 if MC::time_kept(&item, &upper, frontier) {
477 if keep.at_capacity() && !keep.is_empty() {
478 kept.push(keep);
479 keep = self.empty(stash);
480 }
481 keep.push_into(item);
482 } else {
483 if ready.at_capacity() && !ready.is_empty() {
484 readied.push(ready);
485 ready = self.empty(stash);
486 }
487 ready.push_into(item);
488 }
489 }
490 self.recycle(buffer, stash);
492 }
493 if !keep.is_empty() {
495 kept.push(keep);
496 }
497 if !ready.is_empty() {
498 readied.push(ready);
499 }
500 }
501
502 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
504 chunk.account()
505 }
506 }
507
508 pub use vec::VecMerger;
509 pub mod vec {
511
512 use super::{ContainerQueue, MergerChunk};
513 use crate::difference::Semigroup;
514 use std::collections::VecDeque;
515 use timely::progress::{frontier::AntichainRef, Antichain};
516
517 pub type VecMerger<D, T, R> =
519 super::ContainerMerger<Vec<(D, T, R)>, std::collections::VecDeque<(D, T, R)>>;
520
521 impl<D: Ord, T: Ord, R> ContainerQueue<Vec<(D, T, R)>> for VecDeque<(D, T, R)> {
522 fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> {
523 if self.is_empty() {
524 Err(Vec::from(std::mem::take(self)))
525 } else {
526 Ok(self.pop_front().unwrap())
527 }
528 }
529 fn is_empty(&self) -> bool {
530 self.is_empty()
531 }
532 fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
533 let (data1, time1, _) = self.front().unwrap();
534 let (data2, time2, _) = other.front().unwrap();
535 (data1, time1).cmp(&(data2, time2))
536 }
537 fn from(list: Vec<(D, T, R)>) -> Self {
538 <Self as From<_>>::from(list)
539 }
540 }
541
542 impl<
543 D: Ord + 'static,
544 T: Ord + timely::PartialOrder + Clone + 'static,
545 R: Semigroup + 'static,
546 > MergerChunk for Vec<(D, T, R)>
547 {
548 type TimeOwned = T;
549 type DiffOwned = ();
550
551 fn time_kept(
552 (_, time, _): &Self::Item<'_>,
553 upper: &AntichainRef<Self::TimeOwned>,
554 frontier: &mut Antichain<Self::TimeOwned>,
555 ) -> bool {
556 if upper.less_equal(time) {
557 frontier.insert_with(&time, |time| time.clone());
558 true
559 } else {
560 false
561 }
562 }
563 fn push_and_add<'a>(
564 &mut self,
565 item1: Self::Item<'a>,
566 item2: Self::Item<'a>,
567 _stash: &mut Self::DiffOwned,
568 ) {
569 let (data, time, mut diff1) = item1;
570 let (_data, _time, diff2) = item2;
571 diff1.plus_equals(&diff2);
572 if !diff1.is_zero() {
573 self.push((data, time, diff1));
574 }
575 }
576 fn account(&self) -> (usize, usize, usize, usize) {
577 let (size, capacity, allocations) = (0, 0, 0);
578 (self.len(), size, capacity, allocations)
579 }
580 #[inline]
581 fn clear(&mut self) {
582 Vec::clear(self)
583 }
584 }
585 }
586
587 pub use columnation::ColMerger;
588 pub mod columnation {
590
591 use columnation::Columnation;
592 use timely::progress::{frontier::AntichainRef, Antichain};
593
594 use crate::containers::TimelyStack;
595 use crate::difference::Semigroup;
596
597 use super::{ContainerQueue, MergerChunk};
598
599 pub type ColMerger<D, T, R> =
601 super::ContainerMerger<TimelyStack<(D, T, R)>, TimelyStackQueue<(D, T, R)>>;
602
603 pub struct TimelyStackQueue<T: Columnation> {
605 list: TimelyStack<T>,
606 head: usize,
607 }
608
609 impl<D: Ord + Columnation, T: Ord + Columnation, R: Columnation>
610 ContainerQueue<TimelyStack<(D, T, R)>> for TimelyStackQueue<(D, T, R)>
611 {
612 fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> {
613 if self.is_empty() {
614 Err(std::mem::take(&mut self.list))
615 } else {
616 Ok(self.pop())
617 }
618 }
619 fn is_empty(&self) -> bool {
620 self.head == self.list[..].len()
621 }
622 fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
623 let (data1, time1, _) = self.peek();
624 let (data2, time2, _) = other.peek();
625 (data1, time1).cmp(&(data2, time2))
626 }
627 fn from(list: TimelyStack<(D, T, R)>) -> Self {
628 TimelyStackQueue { list, head: 0 }
629 }
630 }
631
632 impl<T: Columnation> TimelyStackQueue<T> {
633 fn pop(&mut self) -> &T {
634 self.head += 1;
635 &self.list[self.head - 1]
636 }
637
638 fn peek(&self) -> &T {
639 &self.list[self.head]
640 }
641 }
642
643 impl<
644 D: Ord + Columnation + 'static,
645 T: Ord + timely::PartialOrder + Clone + Columnation + 'static,
646 R: Default + Semigroup + Columnation + 'static,
647 > MergerChunk for TimelyStack<(D, T, R)>
648 {
649 type TimeOwned = T;
650 type DiffOwned = R;
651
652 fn time_kept(
653 (_, time, _): &Self::Item<'_>,
654 upper: &AntichainRef<Self::TimeOwned>,
655 frontier: &mut Antichain<Self::TimeOwned>,
656 ) -> bool {
657 if upper.less_equal(time) {
658 frontier.insert_with(&time, |time| time.clone());
659 true
660 } else {
661 false
662 }
663 }
664 fn push_and_add<'a>(
665 &mut self,
666 item1: Self::Item<'a>,
667 item2: Self::Item<'a>,
668 stash: &mut Self::DiffOwned,
669 ) {
670 let (data, time, diff1) = item1;
671 let (_data, _time, diff2) = item2;
672 stash.clone_from(diff1);
673 stash.plus_equals(&diff2);
674 if !stash.is_zero() {
675 self.copy_destructured(data, time, stash);
676 }
677 }
678 fn account(&self) -> (usize, usize, usize, usize) {
679 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
680 let cb = |siz, cap| {
681 size += siz;
682 capacity += cap;
683 allocations += 1;
684 };
685 self.heap_size(cb);
686 (self.len(), size, capacity, allocations)
687 }
688 #[inline]
689 fn clear(&mut self) {
690 TimelyStack::clear(self)
691 }
692 }
693 }
694}