timely 0.29.0

A low-latency data-parallel dataflow system in Rust
Documentation
//! Operators that separate one stream into two streams based on some condition

use crate::dataflow::channels::pact::Pipeline;
use crate::progress::Timestamp;
use crate::dataflow::operators::generic::OutputBuilder;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::{StreamVec, Stream};
use crate::Container;

/// Extension trait for `StreamVec` providing the `branch` operator.
pub trait Branch<T: Timestamp, D> : Sized {
    /// Takes one input stream and splits it into two output streams.
    /// For each record, the supplied closure is called with a reference to
    /// the record's time and a reference to the data, in that order. If it
    /// returns `true`, the record will be sent to the second returned stream,
    /// otherwise it will be sent to the first.
    ///
    /// If the result of the closure only depends on the time, not the data,
    /// `branch_when` should be used instead.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Inspect, vec::Branch};
    ///
    /// timely::example(|scope| {
    ///     let (odd, even) = (0..10)
    ///         .to_stream(scope)
    ///         .branch(|_time, x| *x % 2 == 0);
    ///
    ///     even.inspect(|x| println!("even numbers: {:?}", x));
    ///     odd.inspect(|x| println!("odd numbers: {:?}", x));
    /// });
    /// ```
    fn branch(self, condition: impl Fn(&T, &D) -> bool + 'static) -> (Self, Self);
}

impl<'scope, T: Timestamp, D: 'static> Branch<T, D> for StreamVec<'scope, T, D> {
    /// Builds a two-output operator named "Branch" that routes each record
    /// according to `condition`.
    ///
    /// Records for which `condition` returns `false` are given to the first
    /// returned stream; records for which it returns `true` are given to the
    /// second. Every record keeps the time it arrived with.
    fn branch(self, condition: impl Fn(&T, &D) -> bool + 'static) -> (Self, Self) {
        use crate::progress::operate::FrontierInterest;

        let scope = self.scope();
        let mut operator = OperatorBuilder::new(String::from("Branch"), scope);

        let mut records = operator.new_input(self, Pipeline);
        // The operator logic below ignores its frontier argument, and input 0
        // is configured with `FrontierInterest::Never`.
        operator.set_notify_for(0, FrontierInterest::Never);

        // The order of these two calls decides which stream is returned first
        // (condition false) and which second (condition true).
        let (unmatched_port, unmatched_stream) = operator.new_output();
        let (matched_port, matched_stream) = operator.new_output();

        let mut unmatched = OutputBuilder::from(unmatched_port);
        let mut matched = OutputBuilder::from(matched_port);

        operator.build(move |_| {
            move |_frontiers| {
                let mut unmatched_handle = unmatched.activate();
                let mut matched_handle = matched.activate();

                records.for_each_time(|time, batches| {
                    // A session is opened on both outputs for every time,
                    // whether or not any record ends up on that side.
                    let mut to_first = unmatched_handle.session(&time);
                    let mut to_second = matched_handle.session(&time);

                    for batch in batches {
                        for datum in batch.drain(..) {
                            if condition(time.time(), &datum) {
                                to_second.give(datum);
                            } else {
                                to_first.give(datum);
                            }
                        }
                    }
                });
            }
        });

        (unmatched_stream, matched_stream)
    }
}

/// Extension trait for `Stream` providing the `branch_when` operator.
pub trait BranchWhen<T>: Sized {
    /// Takes one input stream and splits it into two output streams.
    /// For each time, the supplied closure is called with a reference to that
    /// time. If it returns `true`, the records for that time will be sent to
    /// the second returned stream, otherwise they will be sent to the first.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Inspect};
    /// use timely::dataflow::operators::vec::{BranchWhen, Delay};
    ///
    /// timely::example(|scope| {
    ///     let (before_five, after_five) = (0..10)
    ///         .to_stream(scope)
    ///         .container::<Vec<_>>()
    ///         .delay(|x,t| *x) // data 0..10 at time 0..10
    ///         .branch_when(|time| time >= &5);
    ///
    ///     before_five.inspect(|x| println!("Times 0-4: {:?}", x));
    ///     after_five.inspect(|x| println!("Times 5 and later: {:?}", x));
    /// });
    /// ```
    fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self);
}

impl<'scope, T: Timestamp, C: Container> BranchWhen<T> for Stream<'scope, T, C> {
    /// Builds a two-output operator named "Branch" that routes whole
    /// containers according to their time.
    ///
    /// `condition` is evaluated on the time only. When it returns `false` the
    /// containers for that time go to the first returned stream; when it
    /// returns `true` they go to the second.
    fn branch_when(self, condition: impl Fn(&T) -> bool + 'static) -> (Self, Self) {
        use crate::progress::operate::FrontierInterest;

        let scope = self.scope();
        let mut operator = OperatorBuilder::new(String::from("Branch"), scope);

        let mut containers = operator.new_input(self, Pipeline);
        // The operator logic below ignores its frontier argument, and input 0
        // is configured with `FrontierInterest::Never`.
        operator.set_notify_for(0, FrontierInterest::Never);

        // The order of these two calls decides which stream is returned first
        // (condition false) and which second (condition true).
        let (unmatched_port, unmatched_stream) = operator.new_output();
        let (matched_port, matched_stream) = operator.new_output();

        let mut unmatched = OutputBuilder::from(unmatched_port);
        let mut matched = OutputBuilder::from(matched_port);

        operator.build(move |_| {
            move |_frontiers| {
                let mut unmatched_handle = unmatched.activate();
                let mut matched_handle = matched.activate();

                containers.for_each_time(|time, data| {
                    // Only the chosen output gets a session for this time.
                    let send_to_second = condition(time.time());
                    let mut session = if send_to_second {
                        matched_handle.session(&time)
                    } else {
                        unmatched_handle.session(&time)
                    };
                    // Containers are handed over as-is, without per-record work.
                    session.give_containers(data);
                });
            }
        });

        (unmatched_stream, matched_stream)
    }
}