timely 0.29.0

A low-latency data-parallel dataflow system in Rust
Documentation
//! Extension traits and implementations for observing and acting on streamed data.

use crate::Container;
use crate::progress::Timestamp;
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::Stream;
use crate::dataflow::operators::generic::Operator;

/// Methods to inspect records and batches of records on a stream.
///
/// Each method returns the stream (as `Self`), so inspections can be chained. The supplied
/// closures only ever receive shared references, so they can observe but not modify the data.
/// The per-record methods require that a shared reference to the container `C` can be
/// iterated (`for<'a> &'a C: IntoIterator`).
pub trait Inspect<T: Timestamp, C>: InspectCore<T, C> + Sized
where
    for<'a> &'a C: IntoIterator,
{
    /// Runs a supplied closure on each observed data element.
    ///
    /// Elements are the items produced by iterating `&C`; for a `Vec<D>` container the
    /// closure receives `&D`.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Inspect};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .container::<Vec<_>>()
    ///            .inspect(|x| println!("seen: {:?}", x));
    /// });
    /// ```
    fn inspect<F>(self, mut func: F) -> Self
    where
        F: for<'a> FnMut(<&'a C as IntoIterator>::Item) + 'static,
    {
        self.inspect_batch(move |_, data| {
            for datum in data.into_iter() { func(datum); }
        })
    }

    /// Runs a supplied closure on each observed data element and associated time.
    ///
    /// The time passed alongside each element is the timestamp of the batch that contains it.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Inspect};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .container::<Vec<_>>()
    ///            .inspect_time(|t, x| println!("seen at: {:?}\t{:?}", t, x));
    /// });
    /// ```
    fn inspect_time<F>(self, mut func: F) -> Self
    where
        F: for<'a> FnMut(&T, <&'a C as IntoIterator>::Item) + 'static,
    {
        self.inspect_batch(move |time, data| {
            for datum in data.into_iter() {
                func(time, datum);
            }
        })
    }

    /// Runs a supplied closure on each observed data batch (time and container).
    ///
    /// Frontier advancements are not reported to the closure; use
    /// [`Inspect::inspect_core`] to observe those as well.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Inspect};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .container::<Vec<_>>()
    ///            .inspect_batch(|t,xs| println!("seen at: {:?}\t{:?} records", t, xs.len()));
    /// });
    /// ```
    fn inspect_batch(self, mut func: impl FnMut(&T, &C)+'static) -> Self {
        self.inspect_core(move |event| {
            // Only data events are forwarded; `Err(frontier)` events are dropped here.
            if let Ok((time, data)) = event {
                func(time, data);
            }
        })
    }

    /// Runs a supplied closure on each observed data batch, and each frontier advancement.
    ///
    /// Rust's `Result` type is used to distinguish the events, with `Ok` for time and data,
    /// and `Err` for frontiers. Frontiers are only presented when they change.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Inspect};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .container::<Vec<_>>()
    ///            .inspect_core(|event| {
    ///                match event {
    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
    ///                }
    ///            });
    /// });
    /// ```
    fn inspect_core<F>(self, func: F) -> Self where F: FnMut(Result<(&T, &C), &[T]>)+'static;
}

impl<T: Timestamp, C: Container> Inspect<T, C> for Stream<'_, T, C>
where
    for<'a> &'a C: IntoIterator,
{
    fn inspect_core<F>(self, func: F) -> Self where F: FnMut(Result<(&T, &C), &[T]>) + 'static {
        self.inspect_container(func)
    }
}

/// Inspect a stream at the granularity of whole containers.
pub trait InspectCore<T: Timestamp, C> {
    /// Invokes `func` once per observed container and once per frontier change.
    ///
    /// Events are delivered as a `Result`: `Ok((time, container))` carries a batch of data
    /// together with its timestamp, while `Err(frontier)` carries the elements of the new
    /// frontier. A frontier is only reported when it differs from the previous one.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, InspectCore};
    ///
    /// timely::example(|scope| {
    ///     (0..10).to_stream(scope)
    ///            .container::<Vec<_>>()
    ///            .inspect_container(|event| {
    ///                match event {
    ///                    Ok((time, data)) => println!("seen at: {:?}\t{:?} records", time, data.len()),
    ///                    Err(frontier) => println!("frontier advanced to {:?}", frontier),
    ///                }
    ///            });
    /// });
    /// ```
    fn inspect_container<F>(self, func: F) -> Self
    where
        F: FnMut(Result<(&T, &C), &[T]>) + 'static;
}

impl<T: Timestamp, C: Container> InspectCore<T, C> for Stream<'_, T, C> {

    /// Installs an `"InspectBatch"` operator (pipeline-connected) that reports frontier
    /// changes and containers to `func`, handing every container on to its output.
    ///
    /// Within one activation the frontier check runs first, so any `Err` event for that
    /// activation is delivered before the activation's `Ok` events.
    fn inspect_container<F>(self, mut func: F) -> Self
    where
        F: FnMut(Result<(&T, &C), &[T]>) + 'static,
    {
        // The frontier most recently shown to `func`. Seeding it with `T::minimum()` means an
        // input frontier that is still at the minimum produces no event.
        let mut reported = crate::progress::Antichain::from_elem(T::minimum());
        self.unary_frontier(Pipeline, "InspectBatch", move |_, _| {
            move |(input, input_frontier), output| {
                let current = input_frontier.frontier();
                if current != reported.borrow() {
                    // Refresh the remembered frontier in place, then report it.
                    reported.clear();
                    reported.extend(current.iter().cloned());
                    func(Err(reported.elements()));
                }
                input.for_each_time(|time, batch| {
                    let mut session = output.session(&time);
                    for container in batch {
                        // Show the container to `func` before handing it to the session.
                        func(Ok((&time, &*container)));
                        session.give_container(container);
                    }
                });
            }
        })
    }
}