use timely::progress::frontier::AntichainRef;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::container::PushInto;
use crate::logging::{BatcherEvent, Logger};
use crate::trace::{Batcher, Description};
/// A batcher that accumulates chains of chunks and merges them using the strategy `M`.
///
/// `insert_chain` merges the two newest chains while the newest holds at least half as many
/// chunks (integer division) as the one beneath it. `seal` merges all remaining chains and
/// hands the result to `M::extract`.
pub struct MergeBatcher<M: Merger> {
/// Pending chains of chunks; the last entry is the most recently pushed chain.
chains: Vec<Vec<M::Chunk>>,
/// Spare chunks passed to the merger for reuse; cleared at the end of every `seal`.
stash: Vec<M::Chunk>,
/// Strategy that merges chains and splits them at `seal` time.
merger: M,
/// Lower bound of the next sealed batch; starts at the minimum time and becomes each sealed `upper`.
lower: Antichain<M::Time>,
/// Filled by the merger's `extract` during `seal` (cleared just before); exposed by `frontier()`.
frontier: Antichain<M::Time>,
/// Optional logger receiving `BatcherEvent` size-accounting events.
logger: Option<Logger>,
/// Reported as the `operator` field of every logged `BatcherEvent`.
operator_id: usize,
}
impl<M> Batcher for MergeBatcher<M>
where
    M: Merger<Time: Timestamp>,
{
    type Time = M::Time;
    type Output = M::Chunk;

    /// Builds an empty batcher; the first sealed batch will begin at `M::Time::minimum()`.
    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
        Self {
            chains: Vec::new(),
            stash: Vec::new(),
            merger: M::default(),
            lower: Antichain::from_elem(M::Time::minimum()),
            frontier: Antichain::new(),
            logger,
            operator_id,
        }
    }

    /// Merges every pending chain into one and lets the merger split it at `upper`.
    ///
    /// Returns the chunks the merger placed in its ready output, together with the description
    /// `[previous upper, upper)` whose `since` is the minimum time. Chunks the merger held back
    /// become the only chain carried into the next call.
    fn seal(&mut self, upper: Antichain<M::Time>) -> (Vec<Self::Output>, Description<M::Time>) {
        // Collapse all pending chains, always combining the two most recent ones.
        while self.chains.len() >= 2 {
            let newest = self.chain_pop().unwrap();
            let previous = self.chain_pop().unwrap();
            let combined = self.merge_by(newest, previous);
            self.chain_push(combined);
        }
        // No chain at all behaves like an empty one.
        let merged = self.chain_pop().unwrap_or_default();

        let (mut readied, mut kept) = (Vec::new(), Vec::new());
        self.frontier.clear();
        self.merger.extract(
            merged,
            upper.borrow(),
            &mut self.frontier,
            &mut readied,
            &mut kept,
            &mut self.stash,
        );
        if !kept.is_empty() {
            self.chain_push(kept);
        }
        self.stash.clear();

        // The old lower bound opens this batch; `upper` opens the next one.
        let since = Antichain::from_elem(M::Time::minimum());
        let lower = std::mem::replace(&mut self.lower, upper.clone());
        (readied, Description::new(lower, upper, since))
    }

    /// Returns the frontier filled in by the merger during the most recent `seal`
    /// (empty before the first seal).
    #[inline]
    fn frontier(&mut self) -> AntichainRef<'_, M::Time> {
        self.frontier.borrow()
    }
}
impl<M: Merger> PushInto<M::Chunk> for MergeBatcher<M> {
    /// Accepts one chunk as a new single-chunk chain; this may trigger chain merges.
    fn push_into(&mut self, chunk: M::Chunk) {
        let chain = std::iter::once(chunk).collect();
        self.insert_chain(chain);
    }
}
impl<M: Merger> MergeBatcher<M> {
    /// Adds `chain` as the newest chain, then merges the two newest chains for as long as the
    /// newest holds at least half as many chunks (integer division) as the one beneath it.
    ///
    /// An empty `chain` is ignored.
    fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
        if chain.is_empty() {
            return;
        }
        self.chain_push(chain);
        while self.top_chains_unbalanced() {
            let newest = self.chain_pop().unwrap();
            let beneath = self.chain_pop().unwrap();
            let combined = self.merge_by(newest, beneath);
            self.chain_push(combined);
        }
    }

    /// True when at least two chains exist and the newest has at least half as many chunks
    /// (integer division) as the chain beneath it.
    fn top_chains_unbalanced(&self) -> bool {
        match self.chains.as_slice() {
            [.., beneath, newest] => newest.len() >= beneath.len() / 2,
            _ => false,
        }
    }

    /// Merges two chains with the configured merger, sharing `self.stash` for chunk reuse.
    fn merge_by(&mut self, list1: Vec<M::Chunk>, list2: Vec<M::Chunk>) -> Vec<M::Chunk> {
        let total_chunks = list1.len() + list2.len();
        let mut merged = Vec::with_capacity(total_chunks);
        self.merger.merge(list1, list2, &mut merged, &mut self.stash);
        merged
    }

    /// Removes and returns the newest chain, subtracting its size from the logged accounting.
    ///
    /// When a logger is configured, an event is emitted even if there is no chain to pop
    /// (all of its diffs are then zero).
    #[inline]
    fn chain_pop(&mut self) -> Option<Vec<M::Chunk>> {
        let popped = self.chains.pop();
        let chunks: &[M::Chunk] = popped.as_deref().unwrap_or_default();
        self.account(chunks.iter().map(M::account), -1);
        popped
    }

    /// Pushes `chain` as the newest chain, adding its size to the logged accounting.
    #[inline]
    fn chain_push(&mut self, chain: Vec<M::Chunk>) {
        let sizes = chain.iter().map(M::account);
        self.account(sizes, 1);
        self.chains.push(chain);
    }

    /// Sums the `(records, size, capacity, allocations)` tuples in `items`, saturating at
    /// `isize::MAX`, scales the totals by `diff`, and logs one `BatcherEvent`.
    ///
    /// Does nothing (and does not iterate `items`) when no logger is configured.
    #[inline]
    fn account<I: IntoIterator<Item = (usize, usize, usize, usize)>>(&self, items: I, diff: isize) {
        let Some(logger) = &self.logger else {
            return;
        };
        let (records, size, capacity, allocations) = items.into_iter().fold(
            (0isize, 0isize, 0isize, 0isize),
            |(records, size, capacity, allocations), (r, s, c, a)| {
                (
                    records.saturating_add_unsigned(r),
                    size.saturating_add_unsigned(s),
                    capacity.saturating_add_unsigned(c),
                    allocations.saturating_add_unsigned(a),
                )
            },
        );
        logger.log(BatcherEvent {
            operator: self.operator_id,
            records_diff: records * diff,
            size_diff: size * diff,
            capacity_diff: capacity * diff,
            allocations_diff: allocations * diff,
        });
    }
}
impl<M: Merger> Drop for MergeBatcher<M> {
    /// Pops every remaining chain through `chain_pop`, so each one is subtracted from the
    /// logged accounting before the batcher goes away.
    fn drop(&mut self) {
        std::iter::from_fn(|| self.chain_pop()).for_each(std::mem::drop);
    }
}
/// Strategy used by [`MergeBatcher`] to merge chains of chunks and to split them when sealing.
pub trait Merger: Default {
    /// Time type of the updates held in chunks; `extract` compares against an antichain of it.
    type Time;
    /// Unit of storage that chains are built from.
    type Chunk: Default;

    /// Merges the chains `list1` and `list2` into `output`.
    ///
    /// `stash` is a pool of spare chunks that implementations may draw from and return to.
    fn merge(
        &mut self,
        list1: Vec<Self::Chunk>,
        list2: Vec<Self::Chunk>,
        output: &mut Vec<Self::Chunk>,
        stash: &mut Vec<Self::Chunk>,
    );

    /// Splits the chain `merged` relative to `upper`.
    ///
    /// `MergeBatcher::seal` clears `frontier` before the call, returns `readied` as the sealed
    /// output, and keeps a non-empty `kept` as the only chain carried into the next seal.
    /// `stash` is a pool of spare chunks, as in [`Merger::merge`].
    fn extract(
        &mut self,
        merged: Vec<Self::Chunk>,
        upper: AntichainRef<Self::Time>,
        frontier: &mut Antichain<Self::Time>,
        readied: &mut Vec<Self::Chunk>,
        kept: &mut Vec<Self::Chunk>,
        stash: &mut Vec<Self::Chunk>,
    );

    /// Reports `(records, size, capacity, allocations)` for `chunk`; `MergeBatcher` sums these
    /// tuples to log size changes.
    fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
}
pub mod vec {
use std::marker::PhantomData;
use timely::container::SizableContainer;
use timely::progress::frontier::{Antichain, AntichainRef};
use timely::PartialOrder;
use crate::trace::implementations::merge_batcher::Merger;
/// A `Merger` whose chunks are `Vec<(D, T, R)>` holding `(data, time, diff)` updates.
pub struct VecMerger<D, T, R> {
/// Zero-sized marker tying the merger to its `D`, `T`, `R` type parameters; no data is stored.
_marker: PhantomData<(D, T, R)>,
}
impl<D, T, R> Default for VecMerger<D, T, R> {
fn default() -> Self { Self { _marker: PhantomData } }
}
impl<D, T, R> VecMerger<D, T, R> {
    /// Capacity, in tuples, of every chunk this merger produces or recycles: timely's default
    /// buffer capacity for `(D, T, R)`, rounded up to the next power of two.
    fn target_capacity() -> usize {
        timely::container::buffer::default_capacity::<(D, T, R)>().next_power_of_two()
    }

    /// Returns an empty chunk whose capacity is exactly `target_capacity()`.
    ///
    /// A chunk popped from `stash` is reused when its capacity matches; otherwise it is
    /// discarded and a fresh one is allocated, so output chunks stay uniformly sized.
    fn empty(&self, stash: &mut Vec<Vec<(D, T, R)>>) -> Vec<(D, T, R)> {
        let target = Self::target_capacity();
        let mut container = stash.pop().unwrap_or_default();
        container.clear();
        if container.capacity() != target {
            container = Vec::with_capacity(target);
        }
        container
    }

    /// Refills an exhausted `queue` with the next non-empty chunk from `iter`.
    ///
    /// Does nothing when `queue` still holds data. Otherwise the drained queue's buffer is
    /// recycled into `stash` when `stash` has fewer than two entries and the buffer's capacity
    /// equals `target_capacity()`. If `iter` has no further non-empty chunk, `queue` stays empty.
    fn refill(
        queue: &mut std::collections::VecDeque<(D, T, R)>,
        iter: &mut impl Iterator<Item = Vec<(D, T, R)>>,
        stash: &mut Vec<Vec<(D, T, R)>>,
    ) {
        if !queue.is_empty() {
            return;
        }
        if stash.len() < 2 {
            let mut recycled = Vec::from(std::mem::take(queue));
            recycled.clear();
            if recycled.capacity() == Self::target_capacity() {
                stash.push(recycled);
            }
        }
        // Skip empty chunks: loading one would leave `queue` empty, which makes the caller's
        // merge loop stop while `iter` still has data and emit the remainder out of order.
        if let Some(chunk) = iter.find(|chunk| !chunk.is_empty()) {
            *queue = std::collections::VecDeque::from(chunk);
        }
    }
}
impl<D, T, R> Merger for VecMerger<D, T, R>
where
    D: Ord + Clone + 'static,
    T: Ord + Clone + PartialOrder + 'static,
    R: crate::difference::Semigroup + 'static,
{
    type Chunk = Vec<(D, T, R)>;
    type Time = T;

    /// Merges two chains of `(data, time, diff)` chunks into `output`, ordered by `(data, time)`.
    ///
    /// Assumes each input chain is already sorted by `(data, time)` across its chunks; the
    /// merge only compares queue fronts (TODO confirm this at every producer). Updates with
    /// equal `(data, time)` have their diffs combined with `plus_equals` and are dropped when
    /// the sum is zero. Full output chunks come from `stash` via `empty`. Empty input chunks
    /// are skipped, so they cannot cut the merge short, and they never appear in `output`.
    fn merge(
        &mut self,
        list1: Vec<Vec<(D, T, R)>>,
        list2: Vec<Vec<(D, T, R)>>,
        output: &mut Vec<Vec<(D, T, R)>>,
        stash: &mut Vec<Vec<(D, T, R)>>,
    ) {
        use std::cmp::Ordering;
        use std::collections::VecDeque;

        let mut iter1 = list1.into_iter();
        let mut iter2 = list2.into_iter();
        // Seed each queue with the first non-empty chunk. An empty seed would end the merge
        // loop immediately and append the rest of the other chain out of order.
        let mut q1 = VecDeque::<(D, T, R)>::from(iter1.find(|chunk| !chunk.is_empty()).unwrap_or_default());
        let mut q2 = VecDeque::<(D, T, R)>::from(iter2.find(|chunk| !chunk.is_empty()).unwrap_or_default());

        let mut result = self.empty(stash);

        // Merge while both sides still have a front element.
        while let (Some((d1, t1, _)), Some((d2, t2, _))) = (q1.front(), q2.front()) {
            match (d1, t1).cmp(&(d2, t2)) {
                Ordering::Less => {
                    result.push(q1.pop_front().unwrap());
                }
                Ordering::Greater => {
                    result.push(q2.pop_front().unwrap());
                }
                Ordering::Equal => {
                    let (d, t, mut r1) = q1.pop_front().unwrap();
                    let (_, _, r2) = q2.pop_front().unwrap();
                    r1.plus_equals(&r2);
                    if !r1.is_zero() {
                        result.push((d, t, r1));
                    }
                }
            }

            if result.at_capacity() {
                output.push(std::mem::take(&mut result));
                result = self.empty(stash);
            }

            if q1.is_empty() {
                Self::refill(&mut q1, &mut iter1, stash);
            }
            if q2.is_empty() {
                Self::refill(&mut q2, &mut iter2, stash);
            }
        }

        // Flush the partial result, then append the leftovers of both sides in order,
        // dropping empty chunks so the output chain holds only non-empty chunks.
        if !result.is_empty() {
            output.push(result);
        }
        for q in [q1, q2] {
            if !q.is_empty() {
                output.push(Vec::from(q));
            }
        }
        output.extend(iter1.filter(|chunk| !chunk.is_empty()));
        output.extend(iter2.filter(|chunk| !chunk.is_empty()));
    }

    /// Splits the chunks of `merged` at `upper`, preserving their order.
    ///
    /// An update is held back in `kept` (and its time added to `frontier`) when some element
    /// of `upper` is less than or equal to its time; every other update goes to `readied`.
    /// Drained input chunks whose capacity equals `target_capacity()` are returned to `stash`.
    fn extract(
        &mut self,
        merged: Vec<Vec<(D, T, R)>>,
        upper: AntichainRef<T>,
        frontier: &mut Antichain<T>,
        readied: &mut Vec<Vec<(D, T, R)>>,
        kept: &mut Vec<Vec<(D, T, R)>>,
        stash: &mut Vec<Vec<(D, T, R)>>,
    ) {
        let mut keep = self.empty(stash);
        let mut ready = self.empty(stash);

        for mut chunk in merged {
            for (data, time, diff) in chunk.drain(..) {
                if upper.less_equal(&time) {
                    frontier.insert_with(&time, |time| time.clone());
                    keep.push((data, time, diff));
                } else {
                    ready.push((data, time, diff));
                }
                if keep.at_capacity() {
                    kept.push(std::mem::take(&mut keep));
                    keep = self.empty(stash);
                }
                if ready.at_capacity() {
                    readied.push(std::mem::take(&mut ready));
                    ready = self.empty(stash);
                }
            }
            // Recycle the drained chunk only if it matches the uniform chunk size.
            if chunk.capacity() == Self::target_capacity() {
                stash.push(chunk);
            }
        }

        if !keep.is_empty() {
            kept.push(keep);
        }
        if !ready.is_empty() {
            readied.push(ready);
        }
    }

    /// Reports only the record count (`chunk.len()`); size, capacity and allocations are zero.
    fn account(chunk: &Vec<(D, T, R)>) -> (usize, usize, usize, usize) {
        (chunk.len(), 0, 0, 0)
    }
}
}