use timely::dataflow::Scope;
use ::{Collection, Data, Diff, Hashable};
use operators::arrange::ArrangeBySelf;
/// Consolidation of the updates held in a collection.
///
/// `D` is the record type of the collection; it must be `Data + Hashable`.
pub trait Consolidate<D>
where
    D: Data + Hashable,
{
    /// Returns a collection of the same type as `self` containing the
    /// consolidated form of its updates.
    fn consolidate(&self) -> Self;
}
impl<G, D, R> Consolidate<D> for Collection<G, D, R>
where
    G: Scope,
    G::Timestamp: ::lattice::Lattice + Ord,
    D: Data + Hashable,
    R: Diff,
{
    /// Consolidates by routing the collection through an arrangement.
    ///
    /// The collection is arranged with each record as its own key
    /// (`arrange_by_self`), and every arranged key is then read back out as a
    /// record by cloning it (`as_collection`). NOTE(review): the actual
    /// accumulation of differences is presumably performed by the arrangement
    /// machinery defined elsewhere — confirm against `arrange_by_self`.
    fn consolidate(&self) -> Self {
        let arranged = self.arrange_by_self();
        arranged.as_collection(|datum, _unit| datum.clone())
    }
}
/// Lightweight, batch-local consolidation of a collection's update stream.
///
/// `D` is the record type of the collection; it must be `Data + Hashable`.
pub trait ConsolidateStream<D>
where
    D: Data + Hashable,
{
    /// Returns a collection of the same type as `self` whose updates have
    /// been consolidated within each batch of the underlying stream.
    fn consolidate_stream(&self) -> Self;
}
impl<G, D, R> ConsolidateStream<D> for Collection<G, D, R>
where
    G: Scope,
    G::Timestamp: ::lattice::Lattice + Ord,
    D: Data + Hashable,
    R: Diff,
{
    /// Consolidates updates inside each individual batch of the stream.
    ///
    /// Each batch is handled as follows:
    /// - its `(data, time, diff)` updates are sorted by `(data, time)`;
    /// - the diffs of adjacent updates with equal `(data, time)` are summed;
    /// - updates whose total diff is zero are dropped;
    /// - the survivors are emitted at the batch's time.
    ///
    /// Updates arriving in different batches are never combined, so the
    /// output may still hold several updates for the same `(data, time)`.
    /// The operator uses the `Pipeline` pact, so no data is arranged, and
    /// (per timely's pact semantics) no data is exchanged between workers.
    fn consolidate_stream(&self) -> Self {
        use timely::dataflow::channels::pact::Pipeline;
        use timely::dataflow::operators::Operator;
        use collection::AsCollection;

        let stream = self.inner.unary(Pipeline, "ConsolidateStream", |_capability, _operator_info| {
            // Scratch buffer reused for every batch this operator processes.
            let mut buffer = Vec::new();
            move |input, output| {
                input.for_each(|time, data| {
                    data.swap(&mut buffer);

                    // Lexicographic order on (data, time) makes updates to the
                    // same (data, time) adjacent to one another.
                    buffer.sort_unstable_by(|a, b| a.0.cmp(&b.0).then_with(|| a.1.cmp(&b.1)));

                    // Carry each run's running total forward into the run's last
                    // element and zero out the elements it absorbed.
                    for idx in 1 .. buffer.len() {
                        let (head, tail) = buffer.split_at_mut(idx);
                        let prev = &mut head[idx - 1];
                        let curr = &mut tail[0];
                        if curr.0 == prev.0 && curr.1 == prev.1 {
                            curr.2 = curr.2 + prev.2;
                            prev.2 = R::zero();
                        }
                    }

                    buffer.retain(|update| !update.2.is_zero());
                    output.session(&time).give_vec(&mut buffer);
                })
            }
        });

        stream.as_collection()
    }
}