1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
//! Merges the contents of multiple streams.
use crate::Container;
use crate::progress::Timestamp;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{Stream, Scope};
/// Extension trait for combining one stream with another of the same type.
pub trait Concat {
    /// Combines `self` and `other` into a single stream.
    ///
    /// Both streams are taken by value; clone a stream first if it is
    /// still needed afterwards (as the example does).
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Concat, Inspect};
    ///
    /// timely::example(|scope| {
    ///
    ///     let stream = (0..10).to_stream(scope);
    ///     stream.clone()
    ///           .concat(stream)
    ///           .container::<Vec<_>>()
    ///           .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn concat(self, other: Self) -> Self;
}
impl<'scope, T: Timestamp, C: Container> Concat for Stream<'scope, T, C> {
    /// Merges the two streams by handing both to [`Concatenate::concatenate`]
    /// on the scope obtained from `self`.
    fn concat(self, other: Self) -> Self {
        // Fetch the scope before `self` is moved into the array of sources.
        let scope = self.scope();
        let sources = [self, other];
        scope.concatenate(sources)
    }
}
/// Extension trait for combining any number of streams into one.
pub trait Concatenate<'scope, T: Timestamp> {
    /// Combines every stream yielded by `sources` into a single stream.
    ///
    /// `sources` may be any `IntoIterator` whose items are streams with the
    /// same timestamp type `T` and container type `C`.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Concatenate, Inspect};
    ///
    /// timely::example(|scope| {
    ///
    ///     let streams = vec![(0..10).to_stream(scope),
    ///                        (0..10).to_stream(scope),
    ///                        (0..10).to_stream(scope)];
    ///
    ///     scope.concatenate(streams)
    ///          .container::<Vec<_>>()
    ///          .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn concatenate<I, C>(&self, sources: I) -> Stream<'scope, T, C>
    where
        C: Container,
        I: IntoIterator<Item = Stream<'scope, T, C>>;
}
impl<'scope, T: Timestamp> Concatenate<'scope, T> for Scope<'scope, T> {
    /// Builds a single "Concatenate" operator with one input per source
    /// stream and one output, and returns the stream attached to that output.
    fn concatenate<I, C>(&self, sources: I) -> Stream<'scope, T, C>
    where
        C: Container,
        I: IntoIterator<Item = Stream<'scope, T, C>>,
    {
        use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
        use crate::progress::operate::FrontierInterest;

        // The operator is assembled by hand through the builder.
        let mut builder = OperatorBuilder::new(String::from("Concatenate"), *self);

        // Attach each source stream as its own input, using the `Pipeline` pact.
        let mut inputs = Vec::new();
        for source in sources {
            inputs.push(builder.new_input(source, Pipeline));
        }

        // Mark every input with `FrontierInterest::Never`; the operator logic
        // below ignores its frontier argument.
        for index in 0..inputs.len() {
            builder.set_notify_for(index, FrontierInterest::Never);
        }

        // A single output carries the merged data.
        let (mut output_handle, merged) = builder.new_output();

        // Operator logic: forward whatever each input currently holds.
        builder.build(move |_capability| {
            move |_frontier| {
                let mut session = output_handle.activate();
                for input in &mut inputs {
                    input.for_each(|time, data| {
                        session.give(&time, data);
                    })
                }
            }
        });

        merged
    }
}