use std::hash::Hash;
use std::collections::HashMap;
use crate::{Data, ExchangeData};
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;
/// Extension trait that groups a stream of `(key, value)` records by key and by
/// timestamp, folding each group into a single aggregate.
///
/// `K` is the grouping key and `V` the per-record value; `S` is the scope the
/// stream lives in.
pub trait Aggregate<S: Scope, K: ExchangeData + Hash, V: ExchangeData> {
    /// Folds all values sharing a key at a given timestamp into one aggregate of
    /// type `D`, and emits one record of type `R` per key for that timestamp.
    ///
    /// * `fold` — called as `fold(&key, value, &mut aggregate)` for each input
    ///   record; the aggregate starts out as `D::default()`.
    /// * `emit` — turns a finished `(key, aggregate)` pair into an output record.
    /// * `hash` — maps a key to a `u64` used to route records between workers, so
    ///   that all records with equal keys can meet at the same worker.
    ///
    /// Returns the stream of emitted records.
    fn aggregate<R, D, F, E, H>(&self, fold: F, emit: E, hash: H) -> Stream<S, R>
    where
        R: Data,
        D: Default + 'static,
        F: Fn(&K, V, &mut D) + 'static,
        E: Fn(K, D) -> R + 'static,
        H: Fn(&K) -> u64 + 'static,
        S::Timestamp: Eq;
}
impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)> {
    /// Aggregates `(key, value)` records per key and per timestamp.
    ///
    /// Records are exchanged between workers by `hash(&key)`. For each timestamp,
    /// every key's values are folded with `fold` into a `D` that starts as
    /// `D::default()`. When the operator is notified for a timestamp, each
    /// `(key, aggregate)` collected at that time is passed to `emit`, and the
    /// result is sent at that timestamp. Records within one timestamp are emitted
    /// in `HashMap` iteration order, which is unspecified.
    fn aggregate<R: Data, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
        &self,
        fold: F,
        emit: E,
        hash: H) -> Stream<S, R> where S::Timestamp: Eq {

        // Partial aggregates, keyed by timestamp and then by record key. Each one
        // waits here until its timestamp is notified.
        let mut aggregates: HashMap<S::Timestamp, HashMap<K, D>> = HashMap::new();
        // Reusable buffer: input batches are swapped into it so its capacity is retained.
        let mut vector = Vec::new();

        self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {

            // Fold every incoming record into its (timestamp, key) aggregate.
            input.for_each(|time, data| {
                data.swap(&mut vector);
                let agg_time = aggregates.entry(time.time().clone()).or_insert_with(HashMap::new);
                for (key, val) in vector.drain(..) {
                    // Look the key up first so it is moved into the map only the
                    // first time it is seen. This avoids cloning `K` for every record.
                    if let Some(agg) = agg_time.get_mut(&key) {
                        fold(&key, val, agg);
                    } else {
                        let mut agg = D::default();
                        fold(&key, val, &mut agg);
                        agg_time.insert(key, agg);
                    }
                }
                // Ask to be notified for this timestamp; emission is deferred until then.
                notificator.notify_at(time.retain());
            });

            // For each delivered notification, emit and discard that timestamp's aggregates.
            notificator.for_each(|time, _, _| {
                if let Some(aggs) = aggregates.remove(time.time()) {
                    let mut session = output.session(&time);
                    for (key, agg) in aggs {
                        session.give(emit(key, agg));
                    }
                }
            });
        })
    }
}