use timely::progress::frontier::AntichainRef;
use timely::progress::{frontier::Antichain, Timestamp};
use crate::logging::Logger;
use crate::trace;
pub trait BatcherStorage<T: Timestamp> : Default + Sized {
fn len(&self) -> usize;
fn merge(self, other: Self) -> Self;
fn split(&mut self, frontier: AntichainRef<T>) -> Self;
fn lower(&self, frontier: &mut Antichain<T>);
}
pub struct Batcher<T: Timestamp, S: BatcherStorage<T>> {
storages: Vec<S>,
lower: Antichain<T>,
prior: Antichain<T>,
_logger: Option<Logger>,
_operator_id: usize,
}
impl<T: Timestamp, S: BatcherStorage<T>> Batcher<T, S> {
fn tidy(&mut self) {
self.storages.retain(|x| x.len() > 0);
self.storages.sort_by_key(|x| x.len());
self.storages.reverse();
while let Some(pos) = (1..self.storages.len()).position(|i| self.storages[i-1].len() < 2 * self.storages[i].len()) {
while self.storages.len() > pos + 1 {
let x = self.storages.pop().unwrap();
let y = self.storages.pop().unwrap();
self.storages.push(x.merge(y));
self.storages.sort_by_key(|x| x.len());
self.storages.reverse();
}
}
}
}
impl<T: Timestamp, S: BatcherStorage<T>> trace::Batcher for Batcher<T, S> {
type Time = T;
type Input = S;
type Output = S;
fn new(logger: Option<Logger>, operator_id: usize) -> Self {
Self {
storages: Vec::default(),
lower: Default::default(),
prior: Antichain::from_elem(T::minimum()),
_logger: logger,
_operator_id: operator_id,
}
}
fn push_container(&mut self, batch: &mut Self::Input) {
if batch.len() > 0 {
batch.lower(&mut self.lower);
self.storages.push(std::mem::take(batch));
self.tidy();
}
}
fn seal<B: trace::Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output {
let description = trace::Description::new(self.prior.clone(), upper.clone(), Antichain::new());
self.prior = upper.clone();
if let Some(mut store) = self.storages.pop() {
self.lower.clear();
let mut ship = store.split(upper.borrow());
let mut keep = store;
while let Some(mut store) = self.storages.pop() {
let split = store.split(upper.borrow());
ship = ship.merge(split);
keep = keep.merge(store);
}
keep.lower(&mut self.lower);
self.storages.push(keep);
B::seal(&mut vec![ship], description)
}
else {
B::seal(&mut vec![], description)
}
}
fn frontier(&mut self) -> AntichainRef<'_, Self::Time> { self.lower.borrow() }
}