timely/dataflow/operators/branch.rs
1//! Operators that separate one stream into two streams based on some condition
2
3use crate::dataflow::channels::pact::Pipeline;
4use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
5use crate::dataflow::{Scope, Stream, StreamCore};
6use crate::{Container, Data};
7
8/// Extension trait for `Stream`.
9pub trait Branch<S: Scope, D: Data> {
10 /// Takes one input stream and splits it into two output streams.
11 /// For each record, the supplied closure is called with a reference to
12 /// the data and its time. If it returns true, the record will be sent
13 /// to the second returned stream, otherwise it will be sent to the first.
14 ///
15 /// If the result of the closure only depends on the time, not the data,
16 /// `branch_when` should be used instead.
17 ///
18 /// # Examples
19 /// ```
20 /// use timely::dataflow::operators::{ToStream, Branch, Inspect};
21 ///
22 /// timely::example(|scope| {
23 /// let (odd, even) = (0..10)
24 /// .to_stream(scope)
25 /// .branch(|_time, x| *x % 2 == 0);
26 ///
27 /// even.inspect(|x| println!("even numbers: {:?}", x));
28 /// odd.inspect(|x| println!("odd numbers: {:?}", x));
29 /// });
30 /// ```
31 fn branch(
32 &self,
33 condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
34 ) -> (Stream<S, D>, Stream<S, D>);
35}
36
37impl<S: Scope, D: Data> Branch<S, D> for Stream<S, D> {
38 fn branch(
39 &self,
40 condition: impl Fn(&S::Timestamp, &D) -> bool + 'static,
41 ) -> (Stream<S, D>, Stream<S, D>) {
42 let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
43
44 let mut input = builder.new_input(self, Pipeline);
45 let (mut output1, stream1) = builder.new_output();
46 let (mut output2, stream2) = builder.new_output();
47
48 builder.build(move |_| {
49 let mut vector = Vec::new();
50 move |_frontiers| {
51 let mut output1_handle = output1.activate();
52 let mut output2_handle = output2.activate();
53
54 input.for_each(|time, data| {
55 data.swap(&mut vector);
56 let mut out1 = output1_handle.session(&time);
57 let mut out2 = output2_handle.session(&time);
58 for datum in vector.drain(..) {
59 if condition(&time.time(), &datum) {
60 out2.give(datum);
61 } else {
62 out1.give(datum);
63 }
64 }
65 });
66 }
67 });
68
69 (stream1, stream2)
70 }
71}
72
/// Extension trait for `StreamCore`.
pub trait BranchWhen<T>: Sized {
    /// Takes one input stream and splits it into two output streams.
    /// For each time, the supplied closure is called. If it returns true,
    /// the records for that time will be sent to the second returned stream,
    /// otherwise they will be sent to the first.
    ///
    /// The closure only sees the time, never the records. When the split
    /// must depend on the data, use `Branch::branch` instead.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, BranchWhen, Inspect, Delay};
    ///
    /// timely::example(|scope| {
    ///     let (before_five, after_five) = (0..10)
    ///         .to_stream(scope)
    ///         .delay(|x, _t| *x) // data 0..10 at time 0..10
    ///         .branch_when(|time| time >= &5);
    ///
    ///     before_five.inspect(|x| println!("Times 0-4: {:?}", x));
    ///     after_five.inspect(|x| println!("Times 5 and later: {:?}", x));
    /// });
    /// ```
    fn branch_when(&self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
}
96
97impl<S: Scope, C: Container> BranchWhen<S::Timestamp> for StreamCore<S, C> {
98 fn branch_when(&self, condition: impl Fn(&S::Timestamp) -> bool + 'static) -> (Self, Self) {
99 let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
100
101 let mut input = builder.new_input(self, Pipeline);
102 let (mut output1, stream1) = builder.new_output();
103 let (mut output2, stream2) = builder.new_output();
104
105 builder.build(move |_| {
106
107 let mut container = Default::default();
108 move |_frontiers| {
109 let mut output1_handle = output1.activate();
110 let mut output2_handle = output2.activate();
111
112 input.for_each(|time, data| {
113 data.swap(&mut container);
114 let mut out = if condition(&time.time()) {
115 output2_handle.session(&time)
116 } else {
117 output1_handle.session(&time)
118 };
119 out.give_container(&mut container);
120 });
121 }
122 });
123
124 (stream1, stream2)
125 }
126}