timely/dataflow/operators/
concat.rs1use crate::Container;
5use crate::dataflow::channels::pact::Pipeline;
6use crate::dataflow::{StreamCore, Scope};
7
8pub trait Concat<G: Scope, D: Container> {
10 fn concat(&self, _: &StreamCore<G, D>) -> StreamCore<G, D>;
24}
25
26impl<G: Scope, D: Container> Concat<G, D> for StreamCore<G, D> {
27 fn concat(&self, other: &StreamCore<G, D>) -> StreamCore<G, D> {
28 self.scope().concatenate([self.clone(), other.clone()])
29 }
30}
31
32pub trait Concatenate<G: Scope, D: Container> {
34 fn concatenate<I>(&self, sources: I) -> StreamCore<G, D>
51 where
52 I: IntoIterator<Item=StreamCore<G, D>>;
53}
54
55impl<G: Scope, D: Container> Concatenate<G, D> for StreamCore<G, D> {
56 fn concatenate<I>(&self, sources: I) -> StreamCore<G, D>
57 where
58 I: IntoIterator<Item=StreamCore<G, D>>
59 {
60 let clone = self.clone();
61 self.scope().concatenate(Some(clone).into_iter().chain(sources))
62 }
63}
64
65impl<G: Scope, D: Container> Concatenate<G, D> for G {
66 fn concatenate<I>(&self, sources: I) -> StreamCore<G, D>
67 where
68 I: IntoIterator<Item=StreamCore<G, D>>
69 {
70
71 use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
73 let mut builder = OperatorBuilder::new("Concatenate".to_string(), self.clone());
74
75 let mut handles = sources.into_iter().map(|s| builder.new_input(&s, Pipeline)).collect::<Vec<_>>();
77
78 let (mut output, result) = builder.new_output();
80
81 builder.build(move |_capability| {
83
84 let mut vector = Default::default();
85 move |_frontier| {
86 let mut output = output.activate();
87 for handle in handles.iter_mut() {
88 handle.for_each(|time, data| {
89 data.swap(&mut vector);
90 output.session(&time).give_container(&mut vector);
91 })
92 }
93 }
94 });
95
96 result
97 }
98}