1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::{Scope, Stream};
use crate::Data;
/// Extension trait for splitting a stream of `D` records into two streams
/// based on a `Result`-returning closure.
pub trait OkErr<S: Scope, D: Data> {
    /// Applies `logic` to records of type `D` and returns a pair of streams:
    /// the first carries `D1` values and the second carries `D2` values,
    /// corresponding to the `Ok` and `Err` sides of the `Result` that
    /// `logic` produces.
    ///
    /// `logic` is taken by value and must be `'static`; it may keep mutable
    /// state between calls because it is only required to be `FnMut`.
    fn ok_err<D1, D2, L>(&self, logic: L) -> (Stream<S, D1>, Stream<S, D2>)
    where
        D1: Data,
        D2: Data,
        L: 'static + FnMut(D) -> Result<D1, D2>;
}
impl<S: Scope, D: Data> OkErr<S, D> for Stream<S, D> {
    /// Builds an operator named `"OkErr"` with one input (this stream) and
    /// two outputs, and returns the two output streams.
    ///
    /// Every incoming record is passed to `logic`. An `Ok(x)` result sends
    /// `x` to the first returned stream, an `Err(e)` result sends `e` to the
    /// second. Each output record is emitted in a session opened with the
    /// same `time` the input batch arrived with.
    fn ok_err<D1, D2, L>(&self, mut logic: L) -> (Stream<S, D1>, Stream<S, D2>)
    where
        D1: Data,
        D2: Data,
        L: FnMut(D) -> Result<D1, D2> + 'static,
    {
        let mut operator = OperatorBuilder::new(String::from("OkErr"), self.scope());
        let mut source = operator.new_input(self, Pipeline);
        let (mut ok_port, ok_stream) = operator.new_output();
        let (mut err_port, err_stream) = operator.new_output();

        operator.build(move |_| {
            // Scratch buffer created once and reused on every invocation;
            // each batch is swapped into it and then drained.
            let mut batch = Vec::new();

            move |_frontiers| {
                let mut ok_handle = ok_port.activate();
                let mut err_handle = err_port.activate();

                source.for_each(|time, data| {
                    data.swap(&mut batch);

                    // One session per output, both opened at the input's time.
                    let mut ok_session = ok_handle.session(&time);
                    let mut err_session = err_handle.session(&time);

                    batch.drain(..).for_each(|record| match logic(record) {
                        Ok(value) => ok_session.give(value),
                        Err(value) => err_session.give(value),
                    });
                });
            }
        });

        (ok_stream, err_stream)
    }
}