palimpsest_dataflow/trace/implementations/
chainless_batcher.rs1use timely::progress::frontier::AntichainRef;
4use timely::progress::{frontier::Antichain, Timestamp};
5
6use crate::logging::Logger;
7use crate::trace;
8
9pub trait BatcherStorage<T: Timestamp>: Default + Sized {
11 fn len(&self) -> usize;
13 fn merge(self, other: Self) -> Self;
17 fn split(&mut self, frontier: AntichainRef<T>) -> Self;
19 fn lower(&self, frontier: &mut Antichain<T>);
23}
24
25pub struct Batcher<T: Timestamp, S: BatcherStorage<T>> {
27 storages: Vec<S>,
29 lower: Antichain<T>,
31 prior: Antichain<T>,
33
34 _logger: Option<Logger>,
36 _operator_id: usize,
38}
39
40impl<T: Timestamp, S: BatcherStorage<T>> Batcher<T, S> {
41 fn tidy(&mut self) {
43 self.storages.retain(|x| x.len() > 0);
44 self.storages.sort_by_key(|x| x.len());
45 self.storages.reverse();
46 while let Some(pos) = (1..self.storages.len())
47 .position(|i| self.storages[i - 1].len() < 2 * self.storages[i].len())
48 {
49 while self.storages.len() > pos + 1 {
50 let x = self.storages.pop().unwrap();
51 let y = self.storages.pop().unwrap();
52 self.storages.push(x.merge(y));
53 self.storages.sort_by_key(|x| x.len());
54 self.storages.reverse();
55 }
56 }
57 }
58}
59
60impl<T: Timestamp, S: BatcherStorage<T>> trace::Batcher for Batcher<T, S> {
61 type Time = T;
62 type Input = S;
63 type Output = S;
64
65 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
66 Self {
67 storages: Vec::default(),
68 lower: Default::default(),
69 prior: Antichain::from_elem(T::minimum()),
70 _logger: logger,
71 _operator_id: operator_id,
72 }
73 }
74
75 fn push_container(&mut self, batch: &mut Self::Input) {
76 if batch.len() > 0 {
77 batch.lower(&mut self.lower);
78 self.storages.push(std::mem::take(batch));
79 self.tidy();
80 }
81 }
82
83 fn seal<B: trace::Builder<Input = Self::Output, Time = Self::Time>>(
84 &mut self,
85 upper: Antichain<Self::Time>,
86 ) -> B::Output {
87 let description =
88 trace::Description::new(self.prior.clone(), upper.clone(), Antichain::new());
89 self.prior = upper.clone();
90 let mut stores = self.storages.iter_mut().rev();
91 if let Some(store) = stores.next() {
92 self.lower.clear();
93 let mut ship = store.split(upper.borrow());
94 store.lower(&mut self.lower);
95 for store in stores {
96 let split = store.split(upper.borrow());
97 ship = ship.merge(split);
98 store.lower(&mut self.lower);
99 }
100 self.tidy();
101 B::seal(&mut vec![ship], description)
102 } else {
103 B::seal(&mut vec![], description)
104 }
105 }
106
107 fn frontier(&mut self) -> AntichainRef<'_, Self::Time> {
108 self.lower.borrow()
109 }
110}