1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
use std::hash::Hash;
use std::collections::HashMap;
use crate::{Data, ExchangeData};
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;
/// Extension trait providing a keyed state-machine operator over `(K, V)` records.
///
/// `S` is the dataflow scope, `K` the key type and `V` the value type of the
/// input records. This file implements the trait for `Stream<S, (K, V)>`.
pub trait StateMachine<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> {
/// Produces a stream of `R` by applying `fold` to every `(key, value)` record
/// together with a mutable state of type `D` associated with the key.
///
/// * `fold` is called with the key, the value, and the key's state. It returns
///   a `bool` and a collection of output records. In the `Stream`
///   implementation in this file, a key without state first gets a
///   `Default` state, a returned `true` removes the key's state after the
///   call, and the returned records are sent to the output.
/// * `hash` maps a key to a `u64`. The `Stream` implementation passes it to
///   the `Exchange` pact on the operator's input (presumably so that records
///   with equal keys are handled together; confirm against `Exchange`).
///
/// The `Hash+Eq` bound on `S::Timestamp` lets the `Stream` implementation use
/// timestamps as `HashMap` keys.
fn state_machine<
// Type of the records in the returned stream.
R: Data,
// Per-key state handed to `fold` as `&mut D`.
D: Default+'static,
// Collection of outputs returned by a single call to `fold`.
I: IntoIterator<Item=R>,
// State-transition function.
F: Fn(&K, V, &mut D)->(bool, I)+'static,
// Key hashing function.
H: Fn(&K)->u64+'static,
>(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq ;
}
impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> StateMachine<S, K, V> for Stream<S, (K, V)> {
    /// Folds every `(key, value)` record into a per-key state of type `D` and
    /// emits whatever `fold` returns for that record.
    ///
    /// * `fold` receives the key, the value, and the key's state (created with
    ///   `D::default()` when the key has none). If the returned `bool` is
    ///   `true`, the key's state is removed after the call. The returned
    ///   collection is sent to the output.
    /// * `hash` is passed to the `Exchange` pact on this operator's input.
    ///
    /// Records are buffered, and a notification is requested for their time,
    /// while the input frontier reports `less_than` for that time. Otherwise
    /// they are folded as soon as they are received. Buffered records are
    /// folded when the notification for their time is delivered.
    fn state_machine<
        R: Data,
        D: Default+'static,
        I: IntoIterator<Item=R>,
        F: Fn(&K, V, &mut D)->(bool, I)+'static,
        H: Fn(&K)->u64+'static,
    >(&self, fold: F, hash: H) -> Stream<S, R> where S::Timestamp : Hash+Eq {
        // Buffered records, grouped by the timestamp they are waiting on.
        let mut stash: HashMap<S::Timestamp, Vec<(K, V)>> = HashMap::new();
        // Current state for every key whose state has not been removed.
        let mut state_by_key: HashMap<K, D> = HashMap::new();
        // Reusable buffer that each incoming batch is swapped into.
        let mut buffer: Vec<(K, V)> = Vec::new();

        self.unary_notify(
            Exchange::new(move |&(ref k, _)| hash(k)),
            "StateMachine",
            vec![],
            move |input, output, notificator| {
                // Runs one state transition: fetch or create the key's state,
                // apply `fold`, drop the state if requested, and hand back the
                // outputs so the caller can emit them in its own session.
                let mut transition = |key: K, val: V| {
                    let state = state_by_key.entry(key.clone()).or_insert_with(D::default);
                    let (finished, produced) = fold(&key, val, state);
                    if finished {
                        state_by_key.remove(&key);
                    }
                    produced
                };

                // Delivered notifications: fold everything buffered for that time.
                notificator.for_each(|cap, _, _| {
                    if let Some(ready) = stash.remove(cap.time()) {
                        let mut session = output.session(&cap);
                        for (key, val) in ready {
                            let produced = transition(key, val);
                            session.give_iterator(produced.into_iter());
                        }
                    }
                });

                // Incoming batches: either buffer them or fold them right away.
                input.for_each(|cap, data| {
                    data.swap(&mut buffer);

                    let must_wait = notificator.frontier(0).less_than(cap.time());
                    if must_wait {
                        // Buffer the batch and ask to be notified for its time.
                        stash.entry(cap.time().clone()).or_default().extend(buffer.drain(..));
                        notificator.notify_at(cap.retain());
                    } else {
                        let mut session = output.session(&cap);
                        for (key, val) in buffer.drain(..) {
                            let produced = transition(key, val);
                            session.give_iterator(produced.into_iter());
                        }
                    }
                });
            },
        )
    }
}