use timely::dataflow::Scope;
use ::{Collection, Data, Hashable};
use ::difference::Monoid;
use operators::arrange::ArrangeBySelf;
/// Consolidates the representation of a collection.
///
/// The implementation below merges the updates for each distinct record so that
/// each record appears at most once per time, with its weights accumulated
/// (records whose accumulated weight is zero are dropped by the arrangement).
pub trait Consolidate<D: Data+Hashable> {
/// Returns a collection with the same contents as `self`, but with updates
/// for identical records merged together.
fn consolidate(&self) -> Self;
}
impl<G: Scope, D, R> Consolidate<D> for Collection<G, D, R>
where
D: Data+Hashable,
R: Monoid,
G::Timestamp: ::lattice::Lattice+Ord,
{
/// Consolidates the collection by arranging it keyed by the records
/// themselves; the arrangement accumulates the weights of identical
/// records, and reading it back out yields the consolidated updates.
fn consolidate(&self) -> Self {
let arranged = self.arrange_by_self();
arranged.as_collection(|data, _| data.clone())
}
}
/// Consolidates the representation of a collection, one timely-dataflow
/// message at a time.
///
/// Unlike a full consolidation, this only merges updates that happen to
/// arrive in the same batch of data, so duplicates across batches may
/// remain; in exchange it needs no arrangement state.
pub trait ConsolidateStream<D: Data+Hashable> {
/// Returns a collection in which updates within each received batch have
/// been sorted, merged, and purged of zero-weight entries.
fn consolidate_stream(&self) -> Self;
}
impl<G: Scope, D, R> ConsolidateStream<D> for Collection<G, D, R>
where
D: Data+Hashable,
R: Monoid,
G::Timestamp: ::lattice::Lattice+Ord,
{
/// Consolidates each batch of updates independently, as it flows past.
///
/// Each received batch is sorted by `(data, time)`, weights of identical
/// `(data, time)` pairs are accumulated, and entries whose accumulated
/// weight is zero are discarded before the batch is forwarded.
fn consolidate_stream(&self) -> Self {
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Operator;
use collection::AsCollection;

self.inner
.unary(Pipeline, "ConsolidateStream", |_cap, _info| {
// Re-used staging buffer; each incoming message is swapped into it.
let mut buffer = Vec::new();
move |input, output| {
input.for_each(|time, data| {
data.swap(&mut buffer);
// Order by (data, time) so that mergeable updates are adjacent.
// Weights (the third field) need not be comparable, so they do
// not participate in the sort.
buffer.sort_unstable_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)));
// Sweep forward: whenever two adjacent entries agree on data and
// time, drain the earlier entry's weight into the later one,
// leaving a zero weight behind. A run of equal entries thus
// accumulates entirely into its final element.
for idx in 1 .. buffer.len() {
let mergeable = buffer[idx].0 == buffer[idx - 1].0 && buffer[idx].1 == buffer[idx - 1].1;
if mergeable {
let drained = ::std::mem::replace(&mut buffer[idx - 1].2, R::zero());
buffer[idx].2 += &drained;
}
}
// Drop the zeroed-out entries (and any updates that cancelled).
buffer.retain(|update| !update.2.is_zero());
output.session(&time).give_vec(&mut buffer);
})
}
})
.as_collection()
}
}