Skip to main content

timely/dataflow/operators/vec/
branch.rs

//! Operators that split one stream into two streams based on a user-supplied condition.
2
3use crate::dataflow::channels::pact::Pipeline;
4use crate::progress::Timestamp;
5use crate::dataflow::operators::generic::OutputBuilder;
6use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
7use crate::dataflow::{StreamVec, Stream};
8use crate::Container;
9
10/// Extension trait for `StreamVec`.
11pub trait Branch<T: Timestamp, D> : Sized {
12    /// Takes one input stream and splits it into two output streams.
13    /// For each record, the supplied closure is called with a reference to
14    /// the data and its time. If it returns `true`, the record will be sent
15    /// to the second returned stream, otherwise it will be sent to the first.
16    ///
17    /// If the result of the closure only depends on the time, not the data,
18    /// `branch_when` should be used instead.
19    ///
20    /// # Examples
21    /// ```
22    /// use timely::dataflow::operators::{ToStream, Inspect, vec::Branch};
23    ///
24    /// timely::example(|scope| {
25    ///     let (odd, even) = (0..10)
26    ///         .to_stream(scope)
27    ///         .branch(|_time, x| *x % 2 == 0);
28    ///
29    ///     even.inspect(|x| println!("even numbers: {:?}", x));
30    ///     odd.inspect(|x| println!("odd numbers: {:?}", x));
31    /// });
32    /// ```
33    fn branch(self, condition: impl Fn(&T, &D) -> bool + 'static) -> (Self, Self);
34}
35
36impl<'scope, T: Timestamp, D: 'static> Branch<T, D> for StreamVec<'scope, T, D> {
37    fn branch(self, condition: impl Fn(&T, &D) -> bool + 'static) -> (Self, Self) {
38        let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
39
40        let mut input = builder.new_input(self, Pipeline);
41        builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
42        let (output1, stream1) = builder.new_output();
43        let (output2, stream2) = builder.new_output();
44
45        let mut output1 = OutputBuilder::from(output1);
46        let mut output2 = OutputBuilder::from(output2);
47
48        builder.build(move |_| {
49            move |_frontiers| {
50                let mut output1_handle = output1.activate();
51                let mut output2_handle = output2.activate();
52
53                input.for_each_time(|time, data| {
54                    let mut out1 = output1_handle.session(&time);
55                    let mut out2 = output2_handle.session(&time);
56                    for datum in data.flat_map(|d| d.drain(..)) {
57                        if condition(time.time(), &datum) {
58                            out2.give(datum);
59                        } else {
60                            out1.give(datum);
61                        }
62                    }
63                });
64            }
65        });
66
67        (stream1, stream2)
68    }
69}
70
/// Extension trait for `Stream`.
pub trait BranchWhen<T>: Sized {
    /// Takes one input stream and splits it into two output streams.
    ///
    /// For each time, the supplied closure is called with that time. If it returns
    /// `true`, the records for that time are sent to the second returned stream;
    /// otherwise they are sent to the first.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Inspect};
    /// use timely::dataflow::operators::vec::{BranchWhen, Delay};
    ///
    /// timely::example(|scope| {
    ///     let (before_five, after_five) = (0..10)
    ///         .to_stream(scope)
    ///         .container::<Vec<_>>()
    ///         .delay(|x, _t| *x) // data 0..10 at time 0..10
    ///         .branch_when(|time| time >= &5);
    ///
    ///     before_five.inspect(|x| println!("Times 0-4: {:?}", x));
    ///     after_five.inspect(|x| println!("Times 5 and later: {:?}", x));
    /// });
    /// ```
    fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
}
96
97impl<'scope, T: Timestamp, C: Container> BranchWhen<T> for Stream<'scope, T, C> {
98    fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self) {
99        let mut builder = OperatorBuilder::new("Branch".to_owned(), self.scope());
100
101        let mut input = builder.new_input(self, Pipeline);
102        builder.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);
103        let (output1, stream1) = builder.new_output();
104        let (output2, stream2) = builder.new_output();
105
106        let mut output1 = OutputBuilder::from(output1);
107        let mut output2 = OutputBuilder::from(output2);
108
109        builder.build(move |_| {
110
111            move |_frontiers| {
112                let mut output1_handle = output1.activate();
113                let mut output2_handle = output2.activate();
114
115                input.for_each_time(|time, data| {
116                    let mut out = if condition(time.time()) {
117                        output2_handle.session(&time)
118                    } else {
119                        output1_handle.session(&time)
120                    };
121                    out.give_containers(data);
122                });
123            }
124        });
125
126        (stream1, stream2)
127    }
128}