use timely::order::TotalOrder;
use timely::dataflow::*;
use timely::dataflow::operators::Operator;
use timely::dataflow::channels::pact::Pipeline;
use lattice::Lattice;
use ::{Data, Collection};
use ::difference::{Monoid, Abelian};
use hashable::Hashable;
use collection::AsCollection;
use operators::arrange::{Arranged, ArrangeBySelf};
use trace::{BatchReader, Cursor, TraceReader};
/// Threshold-style operators for collections whose timestamps are totally ordered.
///
/// The `TotalOrder` bound on `G::Timestamp` is presumably what lets an implementation
/// keep a single running count per key rather than reasoning about partially ordered times.
pub trait ThresholdTotal<G: Scope, K: Data, R: Monoid>
where
    G::Timestamp: TotalOrder+Lattice+Ord,
{
    /// Maps each key's accumulated count to a new output weight using `thresh`.
    ///
    /// `thresh` receives the key and its accumulated count and returns the weight
    /// that the key should carry in the resulting collection.
    fn threshold_total<R2: Abelian, F: Fn(&K,&R)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2>;

    /// Reports each key with weight `1` while its accumulated count is non-zero,
    /// and with weight `0` (i.e. absent) otherwise.
    fn distinct_total(&self) -> Collection<G, K, isize> {
        self.threshold_total(|_key, count| (!count.is_zero()) as isize)
    }
}
/// Thresholding for an unarranged collection.
///
/// The collection is first arranged by its own records with `arrange_by_self`.
/// The arranged form then does the actual work. The extra `K: Hashable` bound
/// (absent from the trait) is presumably needed by that arrangement step.
impl<G: Scope, K: Data+Hashable, R: Monoid> ThresholdTotal<G, K, R> for Collection<G, K, R>
where
    G::Timestamp: TotalOrder+Lattice+Ord,
{
    fn threshold_total<R2: Abelian, F: Fn(&K,&R)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2> {
        let arranged = self.arrange_by_self();
        arranged.threshold_total(thresh)
    }
}
/// Thresholding for a collection already arranged by key (with `()` values).
///
/// For every batch that arrives on the arrangement's stream, and for every key in
/// that batch, the operator:
///
/// 1. accumulates the key's count from `trace` via `cursor_through(batch.lower())`;
/// 2. walks the batch's updates for the key, folding each `diff` into the running count;
/// 3. emits, at each update's time, the change in output weight that the update causes.
///
/// A zero count always maps to a zero output weight, so `thresh` is only invoked
/// for non-zero counts.
impl<G: Scope, K: Data, R: Monoid, T1> ThresholdTotal<G, K, R> for Arranged<G, K, (), R, T1>
where
    G::Timestamp: TotalOrder+Lattice+Ord,
    T1: TraceReader<K, (), G::Timestamp, R>+Clone+'static,
    T1::Batch: BatchReader<K, (), G::Timestamp, R>,
{
    fn threshold_total<R2: Abelian, F:Fn(&K,&R)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2> {
        // The operator holds its own handle to the trace, which it reads for history
        // and whose frontiers it advances after each batch.
        let mut trace = self.trace.clone();
        // Swapped with the incoming batch list and drained, so its allocation is reused.
        let mut buffer = Vec::new();

        self.stream.unary(Pipeline, "ThresholdTotal", move |_,_| move |input, output| {
            let thresh = &thresh;
            input.for_each(|capability, batches| {
                batches.swap(&mut buffer);
                let mut session = output.session(&capability);
                for batch in buffer.drain(..) {
                    let mut batch_cursor = batch.cursor();
                    // Trace contents up to (presumably not including) this batch's lower bound.
                    let (mut trace_cursor, trace_storage) = trace
                        .cursor_through(batch.lower())
                        .expect("ThresholdTotal: cursor_through(batch.lower()) returned None");

                    while batch_cursor.key_valid(&batch) {
                        let key = batch_cursor.key(&batch);

                        // Accumulate the key's count from the trace history. Batch keys are
                        // visited in order, so `seek_key` only ever moves forward.
                        let mut count = R::zero();
                        trace_cursor.seek_key(&trace_storage, key);
                        if trace_cursor.key_valid(&trace_storage) && trace_cursor.key(&trace_storage) == key {
                            trace_cursor.map_times(&trace_storage, |_, diff| count += diff);
                        }

                        // Output weight implied by the count so far. It is carried forward so
                        // that each update calls `thresh` once (for the new count) rather than
                        // recomputing the previous weight as well.
                        let mut weight = if count.is_zero() { R2::zero() } else { thresh(key, &count) };

                        // Assumes `map_times` visits updates in increasing time order, so the
                        // running count is the true count at each emitted time — TODO confirm.
                        batch_cursor.map_times(&batch, |time, diff| {
                            count += diff;
                            let new_weight = if count.is_zero() { R2::zero() } else { thresh(key, &count) };
                            let old_weight = ::std::mem::replace(&mut weight, new_weight);
                            let mut difference = -old_weight;
                            difference += &weight;
                            if !difference.is_zero() {
                                session.give((key.clone(), time.clone(), difference));
                            }
                        });
                        batch_cursor.step_key(&batch);
                    }

                    // Move this handle's frontiers up to the batch's upper bound. This presumably
                    // lets the trace compact history the operator no longer needs to tell apart.
                    trace.advance_by(batch.upper());
                    trace.distinguish_since(batch.upper());
                }
            });
        })
        .as_collection()
    }
}