use std::hash::Hash;
use std::collections::HashMap;
use ::{Data, ExchangeData};
use order::PartialOrder;
use dataflow::{Stream, Scope};
use dataflow::operators::unary::Unary;
use dataflow::channels::pact::Exchange;
pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
fn state_machine<
R: Data, D: Default+'static, I: IntoIterator<Item=R>, F: Fn(&K, V, &mut D)->(bool, I)+'static, H: Fn(&K)->u64+'static, >(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq ;
}
impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)> {
fn state_machine<
R: Data, D: Default+'static, I: IntoIterator<Item=R>, F: Fn(&K, V, &mut D)->(bool, I)+'static, H: Fn(&K)->u64+'static, >(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq {
let mut pending = HashMap::new(); let mut states = HashMap::new();
self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {
input.for_each(|time, data| {
if notificator.frontier(0).iter().any(|x| x.less_than(&time.time())) {
pending.entry(time.time().clone()).or_insert_with(Vec::new).extend(data.drain(..));
notificator.notify_at(time);
}
else {
let mut session = output.session(&time);
for (key, val) in data.drain(..) {
let (remove, output) = {
let state = states.entry(key.clone()).or_insert_with(Default::default);
fold(&key, val, state)
};
if remove { states.remove(&key); }
session.give_iterator(output.into_iter());
}
}
});
notificator.for_each(|time,_,_| {
if let Some(pend) = pending.remove(time.time()) {
let mut session = output.session(&time);
for (key, val) in pend {
let (remove, output) = {
let state = states.entry(key.clone()).or_insert_with(Default::default);
fold(&key, val, state)
};
if remove { states.remove(&key); }
session.give_iterator(output.into_iter());
}
}
});
})
}
}