use timely::dataflow::Scope;
use crate::consolidation::ConsolidatingContainerBuilder;
use crate::difference::Semigroup;
use crate::{ExchangeData, Hashable, VecCollection};
use crate::lattice::Lattice;
use crate::trace::{Batcher, Builder};
use crate::Data;
impl<G, D, R> VecCollection<G, D, R>
where
G: Scope<Timestamp: Data + Lattice>,
D: ExchangeData + Hashable,
R: Semigroup + ExchangeData,
{
pub fn consolidate(&self) -> Self {
use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine};
self.consolidate_named::<KeyBatcher<_, _, _>, KeyBuilder<_, _, _>, KeySpine<_, _, _>, _>(
"Consolidate",
|key, &()| key.clone(),
)
}
pub fn consolidate_named<Ba, Bu, Tr, F>(&self, name: &str, reify: F) -> Self
where
Ba: Batcher<Input = Vec<((D, ()), G::Timestamp, R)>, Time = G::Timestamp> + 'static,
Tr: for<'a> crate::trace::Trace<Time = G::Timestamp, Diff = R> + 'static,
Bu: Builder<Time = Tr::Time, Input = Ba::Output, Output = Tr::Batch>,
F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
{
use crate::operators::arrange::arrangement::Arrange;
self.map(|k| (k, ()))
.arrange_named::<Ba, Bu, Tr>(name)
.as_collection(reify)
}
pub fn consolidate_stream(&self) -> Self {
use crate::collection::AsCollection;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
self.inner
.unary::<ConsolidatingContainerBuilder<_>, _, _, _>(
Pipeline,
"ConsolidateStream",
|_cap, _info| {
move |input, output| {
input.for_each(|time, data| {
output
.session_with_builder(&time)
.give_iterator(data.drain(..));
})
}
},
)
.as_collection()
}
}