use std::default::Default;
use timely::order::TotalOrder;
use timely::dataflow::*;
use timely::dataflow::operators::Unary;
use timely::dataflow::channels::pact::Pipeline;
use timely_sort::Unsigned;
use lattice::Lattice;
use ::{Data, Collection, Diff};
use hashable::{Hashable, UnsignedWrapper};
use collection::AsCollection;
use operators::arrange::{Arrange, Arranged, ArrangeBySelf};
use trace::{BatchReader, Cursor, Trace, TraceReader};
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
pub trait CountTotal<G: Scope, K: Data, R: Diff> where G::Timestamp: TotalOrder+Lattice+Ord {
fn count_total(&self) -> Collection<G, (K, R), isize>;
fn count_total_u(&self) -> Collection<G, (K, R), isize> where K: Unsigned+Copy;
}
impl<G: Scope, K: Data+Default+Hashable, R: Diff> CountTotal<G, K, R> for Collection<G, K, R>
where G::Timestamp: TotalOrder+Lattice+Ord {
fn count_total(&self) -> Collection<G, (K, R), isize> {
self.arrange_by_self()
.count_total_core()
.map(|(k,c)| (k.item, c))
}
fn count_total_u(&self) -> Collection<G, (K, R), isize> where K: Unsigned+Copy {
self.map(|k| (UnsignedWrapper::from(k), ()))
.arrange(DefaultKeyTrace::new())
.count_total_core()
.map(|(k,c)| (k.item, c))
}
}
pub trait CountTotalCore<G: Scope, K: Data, R: Diff> where G::Timestamp: TotalOrder+Lattice+Ord {
fn count_total_core(&self) -> Collection<G, (K, R), isize>;
}
impl<G: Scope, K: Data, R: Diff, T1> CountTotalCore<G, K, R> for Arranged<G, K, (), R, T1>
where
G::Timestamp: TotalOrder+Lattice+Ord,
T1: TraceReader<K, (), G::Timestamp, R>+Clone+'static,
T1::Batch: BatchReader<K, (), G::Timestamp, R> {
fn count_total_core(&self) -> Collection<G, (K, R), isize> {
let mut trace = self.trace.clone();
self.stream.unary_stream(Pipeline, "CountTotal", move |input, output| {
input.for_each(|capability, batches| {
let mut session = output.session(&capability);
for batch in batches.drain(..).map(|x| x.item) {
let (mut batch_cursor, batch_storage) = batch.cursor();
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower()).unwrap();
while batch_cursor.key_valid(&batch_storage) {
let key = batch_cursor.key(&batch_storage);
let mut count = R::zero();
trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.key_valid(&trace_storage) && trace_cursor.key(&trace_storage) == key {
trace_cursor.map_times(&trace_storage, |_, diff| count = count + diff);
}
batch_cursor.map_times(&batch_storage, |time, diff| {
if !count.is_zero() {
session.give(((key.clone(), count), time.clone(), -1));
}
count = count + diff;
if !count.is_zero() {
session.give(((key.clone(), count), time.clone(), 1));
}
});
batch_cursor.step_key(&batch_storage);
}
trace.advance_by(batch.upper());
trace.distinguish_since(batch.upper());
}
});
})
.as_collection()
}
}