differential_dataflow/trace/implementations/
chainless_batcher.rs1use timely::progress::frontier::AntichainRef;
4use timely::progress::{frontier::Antichain, Timestamp};
5
6use crate::logging::Logger;
7use crate::trace;
8
9pub trait BatcherStorage<T: Timestamp> : Default + Sized {
11 fn len(&self) -> usize;
13 fn merge(self, other: Self) -> Self;
17 fn split(&mut self, frontier: AntichainRef<T>) -> Self;
19 fn lower(&self, frontier: &mut Antichain<T>);
23}
24
25pub struct Batcher<T: Timestamp, S: BatcherStorage<T>> {
27 storages: Vec<S>,
29 lower: Antichain<T>,
31 prior: Antichain<T>,
33
34 _logger: Option<Logger>,
36 _operator_id: usize,
38}
39
40impl<T: Timestamp, S: BatcherStorage<T>> Batcher<T, S> {
41 fn tidy(&mut self) {
43 self.storages.retain(|x| x.len() > 0);
44 self.storages.sort_by_key(|x| x.len());
45 self.storages.reverse();
46 while let Some(pos) = (1..self.storages.len()).position(|i| self.storages[i-1].len() < 2 * self.storages[i].len()) {
47 while self.storages.len() > pos + 1 {
48 let x = self.storages.pop().unwrap();
49 let y = self.storages.pop().unwrap();
50 self.storages.push(x.merge(y));
51 self.storages.sort_by_key(|x| x.len());
52 self.storages.reverse();
53 }
54 }
55 }
56}
57
58impl<T: Timestamp, S: BatcherStorage<T>> trace::Batcher for Batcher<T, S> {
59 type Time = T;
60 type Input = S;
61 type Output = S;
62
63 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
64 Self {
65 storages: Vec::default(),
66 lower: Default::default(),
67 prior: Antichain::from_elem(T::minimum()),
68 _logger: logger,
69 _operator_id: operator_id,
70 }
71 }
72
73 fn push_container(&mut self, batch: &mut Self::Input) {
74 if batch.len() > 0 {
75 batch.lower(&mut self.lower);
78 self.storages.push(std::mem::take(batch));
79 self.tidy();
80 }
81 }
82
83 fn seal<B: trace::Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output {
84 let description = trace::Description::new(self.prior.clone(), upper.clone(), Antichain::new());
85 self.prior = upper.clone();
86 if let Some(mut store) = self.storages.pop() {
87 self.lower.clear();
88 let mut ship = store.split(upper.borrow());
89 let mut keep = store;
90 while let Some(mut store) = self.storages.pop() {
91 let split = store.split(upper.borrow());
92 ship = ship.merge(split);
93 keep = keep.merge(store);
94 }
95 keep.lower(&mut self.lower);
96 self.storages.push(keep);
97 B::seal(&mut vec![ship], description)
98 }
99 else {
100 B::seal(&mut vec![], description)
101 }
102 }
103
104 fn frontier(&mut self) -> AntichainRef<'_, Self::Time> { self.lower.borrow() }
105}