use timely::dataflow::Scope;
use ::{Collection, ExchangeData, Hashable};
use ::difference::Semigroup;
use operators::arrange::arrangement::Arrange;
pub trait Consolidate<D: ExchangeData+Hashable> : Sized {
fn consolidate(&self) -> Self {
self.consolidate_named("Consolidate")
}
fn consolidate_named(&self, name: &str) -> Self;
}
impl<G: Scope, D, R> Consolidate<D> for Collection<G, D, R>
where
D: ExchangeData+Hashable,
R: ExchangeData+Semigroup,
G::Timestamp: ::lattice::Lattice+Ord,
{
fn consolidate_named(&self, name: &str) -> Self {
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
self.map(|k| (k, ()))
.arrange_named::<DefaultKeyTrace<_,_,_>>(name)
.as_collection(|d: &D, _| d.clone())
}
}
pub trait ConsolidateStream<D: ExchangeData+Hashable> {
fn consolidate_stream(&self) -> Self;
}
impl<G: Scope, D, R> ConsolidateStream<D> for Collection<G, D, R>
where
D: ExchangeData+Hashable,
R: ExchangeData+Semigroup,
G::Timestamp: ::lattice::Lattice+Ord,
{
fn consolidate_stream(&self) -> Self {
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use collection::AsCollection;
self.inner
.unary(Pipeline, "ConsolidateStream", |_cap, _info| {
let mut vector = Vec::new();
move |input, output| {
input.for_each(|time, data| {
data.swap(&mut vector);
crate::consolidation::consolidate_updates(&mut vector);
output.session(&time).give_vec(&mut vector);
})
}
})
.as_collection()
}
}