1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use crate::Data;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{Stream, Scope};
/// Extension trait for combining a stream with one other stream that has the
/// same scope type `G` and the same data type `D`.
pub trait Concat<G: Scope, D: Data> {
/// Produces a new stream from `self` and the stream passed as the argument.
///
/// Both inputs are taken by reference and the result is a stream of the same
/// scope and data type.
fn concat(&self, _: &Stream<G, D>) -> Stream<G, D>;
}
/// Two-stream concatenation, expressed through the scope-level `concatenate`.
impl<G: Scope, D: Data> Concat<G, D> for Stream<G, D> {
    /// Returns a stream built by concatenating `self` and `other`.
    ///
    /// Both streams are cloned and handed, in that order, to `concatenate`
    /// on this stream's scope.
    fn concat(&self, other: &Stream<G, D>) -> Stream<G, D> {
        let pair = vec![self.clone(), other.clone()];
        let scope = self.scope();
        scope.concatenate(pair)
    }
}
/// Extension trait for combining an arbitrary number of streams that share
/// the scope type `G` and the data type `D` into a single stream.
pub trait Concatenate<G: Scope, D: Data> {
/// Produces a single stream from the streams yielded by `sources`.
///
/// `sources` may be any value that can be turned into an iterator of owned
/// `Stream<G, D>` items.
fn concatenate<I>(&self, sources: I) -> Stream<G, D>
where
I: IntoIterator<Item=Stream<G, D>>;
}
/// Stream-level concatenation: `self` is merged together with `sources`.
impl<G: Scope, D: Data> Concatenate<G, D> for Stream<G, D> {
    /// Returns a stream built by concatenating `self` with every stream in
    /// `sources`.
    ///
    /// A clone of `self` is placed in front of `sources`, and the combined
    /// sequence is handed to `concatenate` on this stream's scope.
    fn concatenate<I>(&self, sources: I) -> Stream<G, D>
    where
        I: IntoIterator<Item = Stream<G, D>>,
    {
        let with_self = std::iter::once(self.clone()).chain(sources);
        self.scope().concatenate(with_self)
    }
}
/// Scope-level concatenation: builds one operator that merges all `sources`.
impl<G: Scope, D: Data> Concatenate<G, D> for G {
    /// Creates an operator named "Concatenate" in this scope with one input
    /// per stream in `sources` and a single output, and returns that output
    /// stream.
    ///
    /// Each time the operator logic runs, every batch available on any input
    /// is forwarded to the output in a session opened with the `time` that
    /// accompanied the batch.
    fn concatenate<I>(&self, sources: I) -> Stream<G, D>
    where
        I: IntoIterator<Item = Stream<G, D>>,
    {
        use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

        let mut builder = OperatorBuilder::new(String::from("Concatenate"), self.clone());

        // Register one input per source stream, each connected with the
        // `Pipeline` pact.
        let mut inputs = Vec::new();
        for source in sources {
            inputs.push(builder.new_input(&source, Pipeline));
        }

        let (mut output_handle, merged) = builder.new_output();

        builder.build(move |_capability| {
            // Staging buffer owned by the operator closure and reused for
            // every batch that passes through.
            let mut buffer = Vec::new();
            move |_frontier| {
                let mut active = output_handle.activate();
                for input in &mut inputs {
                    input.for_each(|time, data| {
                        // Move the batch into the staging buffer, then hand
                        // the buffer's contents to the output.
                        data.swap(&mut buffer);
                        active.session(&time).give_vec(&mut buffer);
                    });
                }
            }
        });

        merged
    }
}