Skip to main content

palimpsest_dataflow/trace/implementations/chainless_batcher.rs

1//! A `Batcher` implementation based on merge sort.
2
3use timely::progress::frontier::AntichainRef;
4use timely::progress::{frontier::Antichain, Timestamp};
5
6use crate::logging::Logger;
7use crate::trace;
8
9/// A type that can be used as storage within a merge batcher.
10pub trait BatcherStorage<T: Timestamp>: Default + Sized {
11    /// Number of contained updates.
12    fn len(&self) -> usize;
13    /// Merges two storage containers into one.
14    ///
15    /// This is expected to consolidate updates as it goes.
16    fn merge(self, other: Self) -> Self;
17    /// Extracts elements not greater or equal to the frontier.
18    fn split(&mut self, frontier: AntichainRef<T>) -> Self;
19    /// Ensures `frontier` is less or equal to all contained times.
20    ///
21    /// Consider merging with `split`, but needed for new stores as well.
22    fn lower(&self, frontier: &mut Antichain<T>);
23}
24
25/// A batcher that simple merges `BatcherStorage` implementors.
26pub struct Batcher<T: Timestamp, S: BatcherStorage<T>> {
27    /// Each store is at least twice the size of the next.
28    storages: Vec<S>,
29    /// The lower bound of timestamps of the maintained updates.
30    lower: Antichain<T>,
31    /// The previosly minted frontier.
32    prior: Antichain<T>,
33
34    /// Logger for size accounting.
35    _logger: Option<Logger>,
36    /// Timely operator ID.
37    _operator_id: usize,
38}
39
40impl<T: Timestamp, S: BatcherStorage<T>> Batcher<T, S> {
41    /// Ensures lists decrease in size geometrically.
42    fn tidy(&mut self) {
43        self.storages.retain(|x| x.len() > 0);
44        self.storages.sort_by_key(|x| x.len());
45        self.storages.reverse();
46        while let Some(pos) = (1..self.storages.len())
47            .position(|i| self.storages[i - 1].len() < 2 * self.storages[i].len())
48        {
49            while self.storages.len() > pos + 1 {
50                let x = self.storages.pop().unwrap();
51                let y = self.storages.pop().unwrap();
52                self.storages.push(x.merge(y));
53                self.storages.sort_by_key(|x| x.len());
54                self.storages.reverse();
55            }
56        }
57    }
58}
59
60impl<T: Timestamp, S: BatcherStorage<T>> trace::Batcher for Batcher<T, S> {
61    type Time = T;
62    type Input = S;
63    type Output = S;
64
65    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
66        Self {
67            storages: Vec::default(),
68            lower: Default::default(),
69            prior: Antichain::from_elem(T::minimum()),
70            _logger: logger,
71            _operator_id: operator_id,
72        }
73    }
74
75    fn push_container(&mut self, batch: &mut Self::Input) {
76        if batch.len() > 0 {
77            batch.lower(&mut self.lower);
78            self.storages.push(std::mem::take(batch));
79            self.tidy();
80        }
81    }
82
83    fn seal<B: trace::Builder<Input = Self::Output, Time = Self::Time>>(
84        &mut self,
85        upper: Antichain<Self::Time>,
86    ) -> B::Output {
87        let description =
88            trace::Description::new(self.prior.clone(), upper.clone(), Antichain::new());
89        self.prior = upper.clone();
90        let mut stores = self.storages.iter_mut().rev();
91        if let Some(store) = stores.next() {
92            self.lower.clear();
93            let mut ship = store.split(upper.borrow());
94            store.lower(&mut self.lower);
95            for store in stores {
96                let split = store.split(upper.borrow());
97                ship = ship.merge(split);
98                store.lower(&mut self.lower);
99            }
100            self.tidy();
101            B::seal(&mut vec![ship], description)
102        } else {
103            B::seal(&mut vec![], description)
104        }
105    }
106
107    fn frontier(&mut self) -> AntichainRef<'_, Self::Time> {
108        self.lower.borrow()
109    }
110}