use crate::{
dynamic::{DynWeightedPairs, pair::DynPair},
trace::{Batch, BatchFactories, Batcher, Builder, TupleBuilder},
};
use size_of::SizeOf;
use std::marker::PhantomData;
mod merge_sorter;
mod tests;
pub use merge_sorter::MergeSorter;
/// Accumulates weighted `(key, value)` tuples in a [`MergeSorter`] and turns
/// them into a batch of type `B` when sealed.
///
/// All tuples of the resulting batch are pushed with the single `time` value
/// supplied at construction.
#[derive(SizeOf)]
pub struct MergeBatcher<B: Batch> {
/// Factories for the batch type `B`; cloned from the reference given at
/// construction and used to create the builders when sealing.
/// Skipped by the derived `SizeOf` implementation.
#[size_of(skip)]
batch_factories: B::Factories,
/// Holds every pushed chunk of `((key, value), weight)` tuples until the
/// batcher is sealed.
sorter: MergeSorter<DynPair<B::Key, B::Val>, B::R>,
/// Timestamp cloned for each tuple handed to the batch builder on seal.
time: B::Time,
/// Zero-sized marker for the batch type `B`.
phantom: PhantomData<B>,
}
/// [`Batcher`] implementation that delegates tuple accumulation to the inner
/// [`MergeSorter`] and assembles the final batch in [`Batcher::seal`].
impl<B> Batcher<B> for MergeBatcher<B>
where
    Self: SizeOf,
    B: Batch,
{
    /// Creates an empty batcher.
    ///
    /// `batch_factories` is cloned and kept for building the batch later; its
    /// weighted-item factories are used to set up the sorter. `time` is the
    /// timestamp applied to every tuple when the batcher is sealed.
    fn new_batcher(batch_factories: &B::Factories, time: B::Time) -> Self {
        let factories = batch_factories.clone();
        let item_factory = batch_factories.weighted_item_factory();
        let items_factory = batch_factories.weighted_items_factory();

        Self {
            batch_factories: factories,
            sorter: MergeSorter::new(item_factory, items_factory),
            time,
            phantom: PhantomData,
        }
    }

    /// Hands a chunk of weighted tuples to the sorter.
    ///
    /// The sorter receives the mutable reference itself, so it may modify or
    /// take the contents of `batch` (see `MergeSorter::push_batch`).
    fn push_batch(&mut self, batch: &mut Box<DynWeightedPairs<DynPair<B::Key, B::Val>, B::R>>) {
        self.sorter.push_batch(batch);
    }

    /// Hands a chunk of weighted tuples to the sorter through its
    /// `push_consolidated_batch` entry point.
    ///
    /// NOTE(review): presumably `batch` must already be sorted and
    /// consolidated by the caller; confirm against `MergeSorter`.
    fn push_consolidated_batch(
        &mut self,
        batch: &mut Box<DynWeightedPairs<DynPair<B::Key, B::Val>, B::R>>,
    ) {
        self.sorter.push_consolidated_batch(batch);
    }

    /// Returns the tuple count reported by the sorter's `len`.
    fn tuples(&self) -> usize {
        self.sorter.len()
    }

    /// Consumes the batcher and builds the output batch.
    ///
    /// The sorter is drained into a list of chunks, a builder is pre-sized
    /// with the total number of tuples across those chunks, and every tuple is
    /// pushed into it together with a clone of the batcher's timestamp.
    fn seal(mut self) -> B {
        let mut sorted_chunks = Vec::new();
        self.sorter.finish_into(&mut sorted_chunks);

        // Total tuple count across all chunks; used for both capacity arguments.
        let tuple_count = sorted_chunks.iter().map(|chunk| chunk.len()).sum();
        let mut tuple_builder = TupleBuilder::new(
            &self.batch_factories,
            B::Builder::with_capacity(&self.batch_factories, tuple_count, tuple_count),
        );

        for mut chunk in sorted_chunks {
            for entry in chunk.dyn_iter_mut() {
                let (pair, weight) = entry.split_mut();
                let (key, val) = pair.split_mut();
                // A fresh clone per tuple: the builder takes the time by
                // mutable reference and may consume its value.
                let mut time = self.time.clone();
                tuple_builder.push_vals(key, val, &mut time, weight);
            }
        }

        tuple_builder.done()
    }
}