1use std::marker::PhantomData;
14
15use timely::progress::frontier::AntichainRef;
16use timely::progress::{frontier::Antichain, Timestamp};
17use timely::Container;
18use timely::container::{ContainerBuilder, PushInto};
19
20use crate::logging::{BatcherEvent, Logger};
21use crate::trace::{Batcher, Builder, Description};
22
23pub struct MergeBatcher<Input, C, M: Merger> {
28 chunker: C,
30 chains: Vec<Vec<M::Chunk>>,
34 stash: Vec<M::Chunk>,
36 merger: M,
38 lower: Antichain<M::Time>,
40 frontier: Antichain<M::Time>,
42 logger: Option<Logger>,
44 operator_id: usize,
46 _marker: PhantomData<Input>,
48}
49
50impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
51where
52 C: ContainerBuilder<Container=M::Chunk> + Default + for<'a> PushInto<&'a mut Input>,
53 M: Merger<Time: Timestamp>,
54{
55 type Input = Input;
56 type Time = M::Time;
57 type Output = M::Chunk;
58
59 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
60 Self {
61 logger,
62 operator_id,
63 chunker: C::default(),
64 merger: M::default(),
65 chains: Vec::new(),
66 stash: Vec::new(),
67 frontier: Antichain::new(),
68 lower: Antichain::from_elem(M::Time::minimum()),
69 _marker: PhantomData,
70 }
71 }
72
73 fn push_container(&mut self, container: &mut Input) {
76 self.chunker.push_into(container);
77 while let Some(chunk) = self.chunker.extract() {
78 let chunk = std::mem::take(chunk);
79 self.insert_chain(vec![chunk]);
80 }
81 }
82
83 fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
88 while let Some(chunk) = self.chunker.finish() {
90 let chunk = std::mem::take(chunk);
91 self.insert_chain(vec![chunk]);
92 }
93
94 while self.chains.len() > 1 {
96 let list1 = self.chain_pop().unwrap();
97 let list2 = self.chain_pop().unwrap();
98 let merged = self.merge_by(list1, list2);
99 self.chain_push(merged);
100 }
101 let merged = self.chain_pop().unwrap_or_default();
102
103 let mut kept = Vec::new();
105 let mut readied = Vec::new();
106 self.frontier.clear();
107
108 self.merger.extract(merged, upper.borrow(), &mut self.frontier, &mut readied, &mut kept, &mut self.stash);
109
110 if !kept.is_empty() {
111 self.chain_push(kept);
112 }
113
114 self.stash.clear();
115
116 let description = Description::new(self.lower.clone(), upper.clone(), Antichain::from_elem(M::Time::minimum()));
117 let seal = B::seal(&mut readied, description);
118 self.lower = upper;
119 seal
120 }
121
122 #[inline]
124 fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
125 self.frontier.borrow()
126 }
127}
128
129impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
130 fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
133 if !chain.is_empty() {
134 self.chain_push(chain);
135 while self.chains.len() > 1 && (self.chains[self.chains.len() - 1].len() >= self.chains[self.chains.len() - 2].len() / 2) {
136 let list1 = self.chain_pop().unwrap();
137 let list2 = self.chain_pop().unwrap();
138 let merged = self.merge_by(list1, list2);
139 self.chain_push(merged);
140 }
141 }
142 }
143
144 fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
146 let mut output = Vec::with_capacity(list1.len() + list2.len());
148 self.merger.merge(list1, list2, &mut output, &mut self.stash);
149
150 output
151 }
152
153 #[inline]
155 fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
156 let chain = self.chains.pop();
157 self.account(chain.iter().flatten().map(M::account), -1);
158 chain
159 }
160
161 #[inline]
163 fn chain_push(&mut self, chain: Vec<M::Chunk>) {
164 self.account(chain.iter().map(M::account), 1);
165 self.chains.push(chain);
166 }
167
168 #[inline]
173 fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
174 if let Some(logger) = &self.logger {
175 let (mut records, mut size, mut capacity, mut allocations) = (0isize, 0isize, 0isize, 0isize);
176 for (records_, size_, capacity_, allocations_) in items {
177 records = records.saturating_add_unsigned(records_);
178 size = size.saturating_add_unsigned(size_);
179 capacity = capacity.saturating_add_unsigned(capacity_);
180 allocations = allocations.saturating_add_unsigned(allocations_);
181 }
182 logger.log(BatcherEvent {
183 operator: self.operator_id,
184 records_diff: records * diff,
185 size_diff: size * diff,
186 capacity_diff: capacity * diff,
187 allocations_diff: allocations * diff,
188 })
189 }
190 }
191}
192
193impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
194 fn drop(&mut self) {
195 while self.chain_pop().is_some() {}
197 }
198}
199
200pub trait Merger: Default {
202 type Chunk: Container;
204 type Time;
206 fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>);
208 fn extract(
210 &mut self,
211 merged: Vec<Self::Chunk>,
212 upper: AntichainRef<Self::Time>,
213 frontier: &mut Antichain<Self::Time>,
214 readied: &mut Vec<Self::Chunk>,
215 kept: &mut Vec<Self::Chunk>,
216 stash: &mut Vec<Self::Chunk>,
217 );
218
219 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
221}
222
223pub use container::{VecMerger, ColMerger};
224
225pub mod container {
226
227 use std::cmp::Ordering;
240 use std::marker::PhantomData;
241 use timely::{Container, container::{PushInto, SizableContainer}};
242 use timely::progress::frontier::{Antichain, AntichainRef};
243 use timely::{Data, PartialOrder};
244
245 use crate::trace::implementations::merge_batcher::Merger;
246
247 pub trait ContainerQueue<C: Container> {
249 fn next_or_alloc(&mut self) -> Result<C::Item<'_>, C>;
251 fn is_empty(&self) -> bool;
253 fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering;
255 fn from(container: C) -> Self;
257 }
258
259 pub trait MergerChunk : SizableContainer {
261 type TimeOwned;
266 type DiffOwned: Default;
271
272 fn time_kept(time1: &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool;
276
277 fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned);
282
283 fn account(&self) -> (usize, usize, usize, usize) {
286 let (size, capacity, allocations) = (0, 0, 0);
287 (self.len(), size, capacity, allocations)
288 }
289 }
290
/// A stateless `Merger` over chunks of type `MC`, read through queues of type `CQ`.
pub struct ContainerMerger<MC, CQ> {
    /// Ties the otherwise unused type parameters to the struct.
    _marker: PhantomData<(MC, CQ)>,
}
298
299 impl<MC, CQ> Default for ContainerMerger<MC, CQ> {
300 fn default() -> Self {
301 Self { _marker: PhantomData, }
302 }
303 }
304
305 impl<MC: MergerChunk, CQ> ContainerMerger<MC, CQ> {
306 #[inline]
308 fn empty(&self, stash: &mut Vec<MC>) -> MC {
309 stash.pop().unwrap_or_else(|| {
310 let mut container = MC::default();
311 container.ensure_capacity(&mut None);
312 container
313 })
314 }
315 #[inline]
317 fn recycle(&self, mut chunk: MC, stash: &mut Vec<MC>) {
318 chunk.clear();
320 stash.push(chunk);
321 }
322 }
323
324 impl<MC, CQ> Merger for ContainerMerger<MC, CQ>
325 where
326 for<'a> MC: MergerChunk<TimeOwned: Ord + PartialOrder + Data> + Clone + PushInto<<MC as Container>::Item<'a>> + 'static,
327 CQ: ContainerQueue<MC>,
328 {
329 type Time = MC::TimeOwned;
330 type Chunk = MC;
331
332 fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
334 let mut list1 = list1.into_iter();
335 let mut list2 = list2.into_iter();
336
337 let mut head1 = CQ::from(list1.next().unwrap_or_default());
338 let mut head2 = CQ::from(list2.next().unwrap_or_default());
339
340 let mut result = self.empty(stash);
341
342 let mut diff_owned = Default::default();
343
344 while !head1.is_empty() && !head2.is_empty() {
346 while !result.at_capacity() && !head1.is_empty() && !head2.is_empty() {
347 let cmp = head1.cmp_heads(&head2);
348 match cmp {
352 Ordering::Less => {
353 result.push_into(head1.next_or_alloc().ok().unwrap());
354 }
355 Ordering::Greater => {
356 result.push_into(head2.next_or_alloc().ok().unwrap());
357 }
358 Ordering::Equal => {
359 let item1 = head1.next_or_alloc().ok().unwrap();
360 let item2 = head2.next_or_alloc().ok().unwrap();
361 result.push_and_add(item1, item2, &mut diff_owned);
362 }
363 }
364 }
365
366 if result.at_capacity() {
367 output.push_into(result);
368 result = self.empty(stash);
369 }
370
371 if head1.is_empty() {
372 self.recycle(head1.next_or_alloc().err().unwrap(), stash);
373 head1 = CQ::from(list1.next().unwrap_or_default());
374 }
375 if head2.is_empty() {
376 self.recycle(head2.next_or_alloc().err().unwrap(), stash);
377 head2 = CQ::from(list2.next().unwrap_or_default());
378 }
379 }
380
381 while let Ok(next) = head1.next_or_alloc() {
383 result.push_into(next);
384 if result.at_capacity() {
385 output.push_into(result);
386 result = self.empty(stash);
387 }
388 }
389 if !result.is_empty() {
390 output.push_into(result);
391 result = self.empty(stash);
392 }
393 output.extend(list1);
394
395 while let Ok(next) = head2.next_or_alloc() {
397 result.push_into(next);
398 if result.at_capacity() {
399 output.push(result);
400 result = self.empty(stash);
401 }
402 }
403 if !result.is_empty() {
404 output.push_into(result);
405 }
407 output.extend(list2);
408 }
409
410 fn extract(
411 &mut self,
412 merged: Vec<Self::Chunk>,
413 upper: AntichainRef<Self::Time>,
414 frontier: &mut Antichain<Self::Time>,
415 readied: &mut Vec<Self::Chunk>,
416 kept: &mut Vec<Self::Chunk>,
417 stash: &mut Vec<Self::Chunk>,
418 ) {
419 let mut keep = self.empty(stash);
420 let mut ready = self.empty(stash);
421
422 for mut buffer in merged {
423 for item in buffer.drain() {
424 if MC::time_kept(&item, &upper, frontier) {
425 if keep.at_capacity() && !keep.is_empty() {
426 kept.push(keep);
427 keep = self.empty(stash);
428 }
429 keep.push_into(item);
430 } else {
431 if ready.at_capacity() && !ready.is_empty() {
432 readied.push(ready);
433 ready = self.empty(stash);
434 }
435 ready.push_into(item);
436 }
437 }
438 self.recycle(buffer, stash);
440 }
441 if !keep.is_empty() {
443 kept.push(keep);
444 }
445 if !ready.is_empty() {
446 readied.push(ready);
447 }
448 }
449
450 fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
452 chunk.account()
453 }
454 }
455
456 pub use vec::VecMerger;
457 pub mod vec {
459
460 use std::collections::VecDeque;
461 use timely::progress::{Antichain, frontier::AntichainRef};
462 use crate::difference::Semigroup;
463 use super::{ContainerQueue, MergerChunk};
464
465 pub type VecMerger<D, T, R> = super::ContainerMerger<Vec<(D, T, R)>, std::collections::VecDeque<(D, T, R)>>;
467
468 impl<D: Ord, T: Ord, R> ContainerQueue<Vec<(D, T, R)>> for VecDeque<(D, T, R)> {
469 fn next_or_alloc(&mut self) -> Result<(D, T, R), Vec<(D, T, R)>> {
470 if self.is_empty() {
471 Err(Vec::from(std::mem::take(self)))
472 }
473 else {
474 Ok(self.pop_front().unwrap())
475 }
476 }
477 fn is_empty(&self) -> bool {
478 self.is_empty()
479 }
480 fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
481 let (data1, time1, _) = self.front().unwrap();
482 let (data2, time2, _) = other.front().unwrap();
483 (data1, time1).cmp(&(data2, time2))
484 }
485 fn from(list: Vec<(D, T, R)>) -> Self {
486 <Self as From<_>>::from(list)
487 }
488 }
489
490 impl<D: Ord + 'static, T: Ord + timely::PartialOrder + Clone + 'static, R: Semigroup + 'static> MergerChunk for Vec<(D, T, R)> {
491 type TimeOwned = T;
492 type DiffOwned = ();
493
494 fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool {
495 if upper.less_equal(time) {
496 frontier.insert_with(&time, |time| time.clone());
497 true
498 }
499 else { false }
500 }
501 fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, _stash: &mut Self::DiffOwned) {
502 let (data, time, mut diff1) = item1;
503 let (_data, _time, diff2) = item2;
504 diff1.plus_equals(&diff2);
505 if !diff1.is_zero() {
506 self.push((data, time, diff1));
507 }
508 }
509 fn account(&self) -> (usize, usize, usize, usize) {
510 let (size, capacity, allocations) = (0, 0, 0);
511 (self.len(), size, capacity, allocations)
512 }
513 }
514 }
515
516 pub use columnation::ColMerger;
517 pub mod columnation {
519
520 use timely::progress::{Antichain, frontier::AntichainRef};
521 use columnation::Columnation;
522
523 use crate::containers::TimelyStack;
524 use crate::difference::Semigroup;
525
526 use super::{ContainerQueue, MergerChunk};
527
528 pub type ColMerger<D, T, R> = super::ContainerMerger<TimelyStack<(D,T,R)>,TimelyStackQueue<(D, T, R)>>;
530
531 pub struct TimelyStackQueue<T: Columnation> {
533 list: TimelyStack<T>,
534 head: usize,
535 }
536
537 impl<D: Ord + Columnation, T: Ord + Columnation, R: Columnation> ContainerQueue<TimelyStack<(D, T, R)>> for TimelyStackQueue<(D, T, R)> {
538 fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> {
539 if self.is_empty() {
540 Err(std::mem::take(&mut self.list))
541 }
542 else {
543 Ok(self.pop())
544 }
545 }
546 fn is_empty(&self) -> bool {
547 self.head == self.list[..].len()
548 }
549 fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
550 let (data1, time1, _) = self.peek();
551 let (data2, time2, _) = other.peek();
552 (data1, time1).cmp(&(data2, time2))
553 }
554 fn from(list: TimelyStack<(D, T, R)>) -> Self {
555 TimelyStackQueue { list, head: 0 }
556 }
557 }
558
559 impl<T: Columnation> TimelyStackQueue<T> {
560 fn pop(&mut self) -> &T {
561 self.head += 1;
562 &self.list[self.head - 1]
563 }
564
565 fn peek(&self) -> &T {
566 &self.list[self.head]
567 }
568 }
569
570 impl<D: Ord + Columnation + 'static, T: Ord + timely::PartialOrder + Clone + Columnation + 'static, R: Default + Semigroup + Columnation + 'static> MergerChunk for TimelyStack<(D, T, R)> {
571 type TimeOwned = T;
572 type DiffOwned = R;
573
574 fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef<Self::TimeOwned>, frontier: &mut Antichain<Self::TimeOwned>) -> bool {
575 if upper.less_equal(time) {
576 frontier.insert_with(&time, |time| time.clone());
577 true
578 }
579 else { false }
580 }
581 fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) {
582 let (data, time, diff1) = item1;
583 let (_data, _time, diff2) = item2;
584 stash.clone_from(diff1);
585 stash.plus_equals(&diff2);
586 if !stash.is_zero() {
587 self.copy_destructured(data, time, stash);
588 }
589 }
590 fn account(&self) -> (usize, usize, usize, usize) {
591 let (mut size, mut capacity, mut allocations) = (0, 0, 0);
592 let cb = |siz, cap| {
593 size += siz;
594 capacity += cap;
595 allocations += 1;
596 };
597 self.heap_size(cb);
598 (self.len(), size, capacity, allocations)
599 }
600 }
601 }
602}