timely/dataflow/operators/vec/branch.rs
//! Operators that split one stream into two streams based on a condition.
2
3use crate::dataflow::channels::pact::Pipeline;
4use crate::progress::Timestamp;
5use crate::dataflow::operators::generic::OutputBuilder;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::{StreamVec, Stream};
8use crate::Container;
9
10/// Extension trait for `StreamVec`.
11pub trait Branch<T: Timestamp, D> : Sized {
12 /// Takes one input stream and splits it into two output streams.
13 /// For each record, the supplied closure is called with a reference to
14 /// the data and its time. If it returns `true`, the record will be sent
15 /// to the second returned stream, otherwise it will be sent to the first.
16 ///
17 /// If the result of the closure only depends on the time, not the data,
18 /// `branch_when` should be used instead.
19 ///
20 /// # Examples
21 /// ```
22 /// use timely::dataflow::operators::{ToStream, Inspect, vec::Branch};
23 ///
24 /// timely::example(|scope| {
25 /// let (odd, even) = (0..10)
26 /// .to_stream(scope)
27 /// .branch(|_time, x| *x % 2 == 0);
28 ///
29 /// even.inspect(|x| println!("even numbers: {:?}", x));
30 /// odd.inspect(|x| println!("odd numbers: {:?}", x));
31 /// });
32 /// ```
33 fn branch(self, condition: impl Fn(&T, &D) -> bool + 'static) -> (Self, Self);
34}
35
36impl<'scope, T: Timestamp, D: 'static> Branch<T, D> for StreamVec<'scope, T, D> {
37 fn branch(self, condition: impl Fn(&T, &D) -> bool + 'static) -> (Self, Self) {
38 let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
39
40 let mut input = builder.new_input(self, Pipeline);
41 builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
42 let (output1, stream1) = builder.new_output();
43 let (output2, stream2) = builder.new_output();
44
45 let mut output1 = OutputBuilder::from(output1);
46 let mut output2 = OutputBuilder::from(output2);
47
48 builder.build(move |_| {
49 move |_frontiers| {
50 let mut output1_handle = output1.activate();
51 let mut output2_handle = output2.activate();
52
53 input.for_each_time(|time, data| {
54 let mut out1 = output1_handle.session(&time);
55 let mut out2 = output2_handle.session(&time);
56 for datum in data.flat_map(|d| d.drain(..)) {
57 if condition(time.time(), &datum) {
58 out2.give(datum);
59 } else {
60 out1.give(datum);
61 }
62 }
63 });
64 }
65 });
66
67 (stream1, stream2)
68 }
69}
70
/// Extension trait for [`Stream`] that splits a stream using only timestamps.
pub trait BranchWhen<T>: Sized {
    /// Takes one input stream and splits it into two output streams.
    ///
    /// For each time, the supplied closure is called with that time. If it returns
    /// `true`, the records for that time are sent to the second returned stream;
    /// otherwise they are sent to the first.
    ///
    /// The closure never sees the data, so use this instead of a per-record branch
    /// whenever the routing decision depends on the time alone.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Inspect};
    /// use timely::dataflow::operators::vec::{BranchWhen, Delay};
    ///
    /// timely::example(|scope| {
    ///     let (before_five, after_five) = (0..10)
    ///         .to_stream(scope)
    ///         .container::<Vec<_>>()
    ///         .delay(|x, _t| *x) // data 0..10 at time 0..10
    ///         .branch_when(|time| time >= &5);
    ///
    ///     before_five.inspect(|x| println!("Times 0-4: {:?}", x));
    ///     after_five.inspect(|x| println!("Times 5 and later: {:?}", x));
    /// });
    /// ```
    fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
}
96
97impl<'scope, T: Timestamp, C: Container> BranchWhen<T> for Stream<'scope, T, C> {
98 fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self) {
99 let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
100
101 let mut input = builder.new_input(self, Pipeline);
102 builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
103 let (output1, stream1) = builder.new_output();
104 let (output2, stream2) = builder.new_output();
105
106 let mut output1 = OutputBuilder::from(output1);
107 let mut output2 = OutputBuilder::from(output2);
108
109 builder.build(move |_| {
110
111 move |_frontiers| {
112 let mut output1_handle = output1.activate();
113 let mut output2_handle = output2.activate();
114
115 input.for_each_time(|time, data| {
116 let mut out = if condition(time.time()) {
117 output2_handle.session(&time)
118 } else {
119 output1_handle.session(&time)
120 };
121 out.give_containers(data);
122 });
123 }
124 });
125
126 (stream1, stream2)
127 }
128}