use std::hash::Hash;
use std::collections::HashMap;
use crate::ExchangeData;
use crate::progress::Timestamp;
use crate::dataflow::StreamVec;
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;
/// Adds the `state_machine` operator to streams of `(key, value)` records.
pub trait StateMachine<'scope, T: Timestamp, K: ExchangeData+Hash+Eq, V: ExchangeData> {
    /// Keeps one state of type `D` per key and folds every `(key, value)` record into it.
    ///
    /// * `fold` receives the key, the value, and a mutable reference to that key's state.
    ///   It returns a `bool` together with an iterable of output records of type `R`.
    ///   In the implementation in this file, a `true` flag causes the key's state to be
    ///   dropped, and the returned records are sent downstream.
    /// * `hash` maps a key to a `u64`. The implementation in this file hands it to the
    ///   `Exchange` pact (presumably so that all records for a key reach the same worker).
    ///
    /// A key with no stored state starts from `D::default()`.
    fn state_machine<R, D, I, F, H>(self, fold: F, hash: H) -> StreamVec<'scope, T, R>
    where
        T: Hash + Eq,
        R: 'static,
        D: Default + 'static,
        I: IntoIterator<Item = R>,
        F: Fn(&K, V, &mut D) -> (bool, I) + 'static,
        H: Fn(&K) -> u64 + 'static;
}
impl<'scope, T: Timestamp, K: ExchangeData+Hash+Eq+Clone, V: ExchangeData> StateMachine<'scope, T, K, V> for StreamVec<'scope, T, (K, V)> {
    /// Folds each `(key, value)` record into a per-key state and emits what `fold` returns.
    ///
    /// Records are exchanged using `hash(key)`. A record whose timestamp the input frontier
    /// is still `less_than` is parked until a notification for that timestamp is delivered;
    /// any other record is applied as soon as it is read. Whenever `fold` returns `true`
    /// as its first component, the state for that key is removed.
    fn state_machine<R, D, I, F, H>(self, fold: F, hash: H) -> StreamVec<'scope, T, R>
    where
        T: Hash + Eq,
        R: 'static,
        D: Default + 'static,
        I: IntoIterator<Item = R>,
        F: Fn(&K, V, &mut D) -> (bool, I) + 'static,
        H: Fn(&K) -> u64 + 'static,
    {
        // Records parked per timestamp until the matching notification fires.
        let mut pending: HashMap<_, Vec<(K, V)>> = HashMap::new();
        // Live per-key states; an absent key starts from `D::default()`.
        let mut states = HashMap::new();

        self.unary_notify(Exchange::new(move |(k, _)| hash(k)), "StateMachine", vec![], move |input, output, notificator| {
            // One state transition: run `fold` against the key's state, drop the state
            // if `fold` asks for it, and hand back the records to emit.
            let mut transition = |key: K, val: V| {
                let (retire, produced) = fold(&key, val, states.entry(key.clone()).or_default());
                if retire {
                    states.remove(&key);
                }
                produced
            };

            // Replay the records that were parked for each delivered notification.
            notificator.for_each(|time, _, _| {
                if let Some(queued) = pending.remove(time.time()) {
                    let mut session = output.session(&time);
                    for (key, val) in queued {
                        session.give_iterator(transition(key, val).into_iter());
                    }
                }
            });

            input.for_each_time(|time, data| {
                let must_wait = notificator.frontier(0).less_than(time.time());
                if must_wait {
                    // Park the whole batch under its timestamp and ask to be notified.
                    for batch in data {
                        pending.entry(time.time().clone()).or_default().append(batch);
                    }
                    notificator.notify_at(time.retain(output.output_index()));
                } else {
                    // Apply the records right away.
                    let mut session = output.session(&time);
                    for (key, val) in data.flat_map(|batch| batch.drain(..)) {
                        session.give_iterator(transition(key, val).into_iter());
                    }
                }
            });
        })
    }
}