use std::collections::VecDeque;
use timely::progress::frontier::AntichainRef;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::container::PushInto;
use crate::logging::Logger;
use crate::trace::{Batcher, Description};
use super::layout::ColumnarUpdate as Update;
use super::updates::UpdatesTyped;
use super::arrangement::trie_merger;
use super::spill::{Entry, SpillPolicy};
/// A batcher that accumulates chunks of updates in a stack of chains and merges them on demand.
///
/// Each chain is a queue of [`Entry`] values wrapping `UpdatesTyped<U>` chunks. An optional
/// [`SpillPolicy`] is given a chance to act on a chain whenever a chunk or chain is added.
pub struct MergeBatcher<U: Update> {
    /// Stack of chains; new chains are pushed on the end and the two topmost ones are merged
    /// when the topmost grows to at least half the length of the one below it.
    chains: Vec<VecDeque<Entry<UpdatesTyped<U>>>>,
    /// Lower bound used for the next sealed description; replaced by `upper` on every `seal`.
    lower: Antichain<U::Time>,
    /// Frontier that is cleared and then handed to `trie_merger::extract` during `seal`.
    frontier: Antichain<U::Time>,
    /// Optional spill policy applied to chains as they are built; `None` until one is installed.
    policy: Option<Box<dyn SpillPolicy<UpdatesTyped<U>>>>,
}
/// [`Batcher`] implementation: sealing merges every chain into one and splits the result into
/// chunks that are handed back to the caller and chunks that stay inside the batcher.
impl<U: Update<Time: Timestamp>> Batcher for MergeBatcher<U> {
    type Time = U::Time;
    type Output = UpdatesTyped<U>;

    /// Builds an empty batcher: no chains, an empty frontier, no spill policy, and a lower
    /// bound containing only the minimum timestamp. Both arguments are currently unused.
    fn new(_logger: Option<Logger>, _operator_id: usize) -> Self {
        let lower = Antichain::from_elem(U::Time::minimum());
        Self {
            chains: Vec::new(),
            lower,
            frontier: Antichain::new(),
            policy: None,
        }
    }

    /// Seals the batcher at `upper`.
    ///
    /// All chains are merged into one, which is then passed through `trie_merger::extract`
    /// together with `upper` and the (freshly cleared) frontier. Chunks delivered to the first
    /// callback are returned to the caller; chunks delivered to the second callback are kept
    /// as a new chain. Presumably `extract` ships updates not beyond `upper` and records the
    /// times of the kept ones in the frontier — confirm against `trie_merger::extract`.
    ///
    /// Returns the shipped chunks and a [`Description`] built from the previous lower bound,
    /// `upper`, and an antichain holding the minimum timestamp. Afterwards `upper` becomes the
    /// batcher's lower bound.
    fn seal(&mut self, upper: Antichain<U::Time>) -> (Vec<Self::Output>, Description<U::Time>) {
        // Collapse the whole stack into at most one chain, merging from the top down.
        while self.chains.len() >= 2 {
            let top = self.chains.pop().unwrap();
            let below = self.chains.pop().unwrap();
            let combined = self.merge_by(top, below);
            self.push_chain(combined);
        }
        let everything = self.chains.pop().unwrap_or_default();

        let mut shipped: Vec<UpdatesTyped<U>> = Vec::new();
        let mut retained: VecDeque<Entry<UpdatesTyped<U>>> = VecDeque::new();
        self.frontier.clear();

        // Borrow the two fields separately so the closure can hold the policy while the
        // frontier is handed to `extract`.
        let spill = &mut self.policy;
        let frontier = &mut self.frontier;
        let retain = |chunk: UpdatesTyped<U>| {
            retained.push_back(Entry::Typed(chunk));
            // Let the policy act on the kept chain after every appended chunk.
            if let Some(policy) = spill.as_mut() {
                policy.apply(&mut retained);
            }
        };
        trie_merger::extract(
            FetchIter::new(everything),
            upper.borrow(),
            frontier,
            |chunk: UpdatesTyped<U>| shipped.push(chunk),
            retain,
        );

        if !retained.is_empty() {
            self.push_chain(retained);
        }

        // Swap in the new lower bound and describe the sealed interval with the old one.
        let since = Antichain::from_elem(U::Time::minimum());
        let lower = std::mem::replace(&mut self.lower, upper.clone());
        (shipped, Description::new(lower, upper, since))
    }

    /// Borrows the frontier that was populated by the most recent `seal`.
    #[inline]
    fn frontier(&mut self) -> AntichainRef<'_, U::Time> {
        self.frontier.borrow()
    }
}
impl<U: Update> PushInto<UpdatesTyped<U>> for MergeBatcher<U> {
fn push_into(&mut self, chunk: UpdatesTyped<U>) {
self.insert_chain(VecDeque::from([Entry::Typed(chunk)]));
}
}
impl<U: Update> MergeBatcher<U> {
    /// Installs `policy` as the spill policy, replacing any previously installed one.
    pub fn set_spill_policy(&mut self, policy: Box<dyn SpillPolicy<UpdatesTyped<U>>>) {
        self.policy = Some(policy);
    }

    /// Sums `record_count()` over every `Entry::Typed` chunk in all chains.
    ///
    /// `Entry::Paged` entries contribute zero. Each count is converted with `as usize`.
    pub fn resident_records(&self) -> usize {
        use timely::Accountable;

        let mut total: usize = 0;
        for chain in &self.chains {
            for entry in chain {
                total += match entry {
                    Entry::Typed(chunk) => chunk.record_count() as usize,
                    Entry::Paged(_) => 0,
                };
            }
        }
        total
    }

    /// Adds a non-empty `chain` to the stack and restores the size balance between chains.
    ///
    /// While the topmost chain is at least half as long as the one beneath it, the two are
    /// merged and the result is pushed back. Empty chains are ignored.
    fn insert_chain(&mut self, chain: VecDeque<Entry<UpdatesTyped<U>>>) {
        if chain.is_empty() {
            return;
        }
        self.push_chain(chain);

        loop {
            let depth = self.chains.len();
            if depth < 2 {
                break;
            }
            let top_len = self.chains[depth - 1].len();
            let below_len = self.chains[depth - 2].len();
            if top_len < below_len / 2 {
                break;
            }
            let top = self.chains.pop().unwrap();
            let below = self.chains.pop().unwrap();
            let combined = self.merge_by(top, below);
            self.push_chain(combined);
        }
    }

    /// Pushes `chain` onto the stack and, if a spill policy is installed, applies it to the
    /// chain that was just pushed.
    fn push_chain(&mut self, chain: VecDeque<Entry<UpdatesTyped<U>>>) {
        self.chains.push(chain);
        if let (Some(policy), Some(top)) = (self.policy.as_mut(), self.chains.last_mut()) {
            policy.apply(top);
        }
    }

    /// Merges two chains into a new chain via `trie_merger::merge_batches`.
    ///
    /// Both inputs are consumed through [`FetchIter`]. Every chunk emitted by the merger is
    /// appended to the output as `Entry::Typed`, after which the spill policy (if any) is
    /// applied to the output built so far.
    fn merge_by(
        &mut self,
        list1: VecDeque<Entry<UpdatesTyped<U>>>,
        list2: VecDeque<Entry<UpdatesTyped<U>>>,
    ) -> VecDeque<Entry<UpdatesTyped<U>>> {
        let mut merged: VecDeque<Entry<UpdatesTyped<U>>> = VecDeque::new();
        let spill = &mut self.policy;
        trie_merger::merge_batches(
            FetchIter::new(list1),
            FetchIter::new(list2),
            |chunk: UpdatesTyped<U>| {
                merged.push_back(Entry::Typed(chunk));
                if let Some(policy) = spill.as_mut() {
                    policy.apply(&mut merged);
                }
            },
        );
        merged
    }
}
/// Iterator adapter that turns a chain of [`Entry`] values into a stream of `UpdatesTyped<U>`
/// chunks, calling `fetch()` on `Entry::Paged` entries as they are reached.
struct FetchIter<U: Update> {
    /// Entries of the chain that have not been consumed yet, in chain order.
    queue: VecDeque<Entry<UpdatesTyped<U>>>,
    /// Chunks produced by the most recent `fetch()` that still have to be yielded.
    pending: VecDeque<UpdatesTyped<U>>,
}
impl<U: Update> FetchIter<U> {
fn new(queue: VecDeque<Entry<UpdatesTyped<U>>>) -> Self {
Self { queue, pending: VecDeque::new() }
}
}
impl<U: Update> Iterator for FetchIter<U> {
    type Item = UpdatesTyped<U>;

    /// Yields the next chunk of the chain.
    ///
    /// Chunks left over from an earlier `fetch()` are yielded first. Otherwise the next entry
    /// is taken from the queue: an `Entry::Typed` chunk is returned directly, while an
    /// `Entry::Paged` handle is fetched and its chunks are queued as pending. Returns `None`
    /// once both the pending chunks and the queue are exhausted.
    ///
    /// # Panics
    ///
    /// Panics if `fetch()` returns an error.
    fn next(&mut self) -> Option<UpdatesTyped<U>> {
        // Refill `pending` until it has something to yield; a fetch may produce no chunks,
        // in which case the following entry is tried.
        while self.pending.is_empty() {
            let handle = match self.queue.pop_front()? {
                Entry::Typed(chunk) => return Some(chunk),
                Entry::Paged(handle) => handle,
            };
            match handle.fetch() {
                Ok(chunks) => self.pending.extend(chunks),
                Err(_) => panic!("Fetch::fetch failed; retry path not yet wired"),
            }
        }
        self.pending.pop_front()
    }
}