timely 0.29.0

A low-latency data-parallel dataflow system in Rust
Documentation
//! Types to build operators with general shapes.

use std::rc::Rc;
use std::cell::RefCell;
use std::default::Default;

use crate::progress::{ChangeBatch, Timestamp};
use crate::progress::operate::SharedProgress;
use crate::progress::frontier::{Antichain, MutableAntichain};

use crate::Container;
use crate::dataflow::{Scope, Stream};
use crate::dataflow::channels::pushers::Counter as PushCounter;
use crate::dataflow::channels::pushers;
use crate::dataflow::channels::pact::ParallelizationContract;
use crate::dataflow::channels::pullers::Counter as PullCounter;
use crate::dataflow::operators::capability::Capability;
use crate::dataflow::operators::generic::handles::{InputHandleCore, new_input_handle};
use crate::dataflow::operators::generic::operator_info::OperatorInfo;
use crate::dataflow::operators::generic::builder_raw::OperatorShape;
use crate::progress::operate::{FrontierInterest, PortConnectivity};

use super::builder_raw::OperatorBuilder as OperatorBuilderRaw;

/// Builds operators with generic shape.
///
/// Wraps the raw operator builder and, alongside it, collects the per-port
/// progress bookkeeping (input frontiers, consumed and produced batches, and
/// per-output internal batches) that is moved into the operator logic when
/// one of the `build*` methods is called.
#[derive(Debug)]
pub struct OperatorBuilder<'scope, T: Timestamp> {
    /// The underlying raw builder; port registration and operator metadata are delegated to it.
    builder: OperatorBuilderRaw<'scope, T>,
    /// For each input, the frontier that is presented to the operator logic.
    frontier: Vec<MutableAntichain<T>>,
    /// For each input, the `consumed` batch shared with that input's `PullCounter`.
    consumed: Vec<Rc<RefCell<ChangeBatch<T>>>>,
    /// For each output, a batch handed to the capabilities for that output; the list itself is
    /// shared with every input handle, and the initial capabilities are minted from it.
    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
    /// For each input, a shared list of summaries to each output.
    /// Extended by `new_output_connection` as outputs are added.
    summaries: Vec<Rc<RefCell<PortConnectivity<<T as Timestamp>::Summary>>>>,
    /// For each output, the `produced` batch shared with that output's `PushCounter`.
    produced: Vec<Rc<RefCell<ChangeBatch<T>>>>,
}

impl<'scope, T: Timestamp> OperatorBuilder<'scope, T> {

    /// Allocates a new generic operator builder from its containing scope.
    ///
    /// All per-port bookkeeping (frontiers, consumed/produced batches, summaries)
    /// starts empty and grows as inputs and outputs are added.
    pub fn new(name: String, scope: Scope<'scope, T>) -> Self {
        OperatorBuilder {
            builder: OperatorBuilderRaw::new(name, scope),
            frontier: Vec::new(),
            consumed: Vec::new(),
            internal: Rc::new(RefCell::new(Vec::new())),
            summaries: Vec::new(),
            produced: Vec::new(),
        }
    }

    /// Sets frontier interest for a specific input.
    ///
    /// This is forwarded unchanged to the underlying raw builder.
    pub fn set_notify_for(&mut self, input: usize, notify: FrontierInterest) {
        self.builder.set_notify_for(input, notify);
    }

    /// Adds a new input to a generic operator builder, returning a handle for reading from it.
    ///
    /// The input is connected to every output that exists at the time of the call, each with
    /// the default summary (`Default::default()`). Outputs added afterwards record their own
    /// connection to this input; see [`Self::new_output_connection`].
    pub fn new_input<C: Container, P>(&mut self, stream: Stream<'scope, T, C>, pact: P) -> InputHandleCore<T, C, P::Puller>
    where
        P: ParallelizationContract<T, C>,
    {
        let connection = (0..self.builder.shape().outputs()).map(|o| (o, Antichain::from_elem(Default::default())));
        self.new_input_connection(stream, pact, connection)
    }

    /// Adds a new input with connection information to a generic operator builder, returning a
    /// handle for reading from it.
    ///
    /// The `connection` parameter contains promises made by the operator for each of the existing
    /// *outputs*: for any timestamp appearing at this input, every timestamp produced at that
    /// output will be greater than or equal to the input timestamp advanced by some `Summary` that
    /// is itself greater than or equal to an element of the corresponding antichain in `connection`.
    ///
    /// Commonly the connections are either the unit summary, indicating the same timestamp might be
    /// produced as output, or an empty antichain indicating that there is no connection from the
    /// input to the output.
    pub fn new_input_connection<C: Container, P, I>(&mut self, stream: Stream<'scope, T, C>, pact: P, connection: I) -> InputHandleCore<T, C, P::Puller>
    where
        P: ParallelizationContract<T, C>,
        I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)> + Clone,
    {
        // `connection` is cloned because it is also needed below for the input handle's summary.
        let puller = self.builder.new_input_connection(stream, pact, connection.clone());

        // The counter's `consumed` batch is shared with the progress bookkeeping.
        let input = PullCounter::new(puller);
        self.frontier.push(MutableAntichain::new());
        self.consumed.push(Rc::clone(input.consumed()));

        // Shared so that outputs added later can extend it (see `new_output_connection`).
        let shared_summary = Rc::new(RefCell::new(connection.into_iter().collect()));
        self.summaries.push(Rc::clone(&shared_summary));

        new_input_handle(input, Rc::clone(&self.internal), shared_summary)
    }

    /// Adds a new output to a generic operator builder, returning its output handle and stream.
    ///
    /// The output is connected from every input that exists at the time of the call, each with
    /// the default summary (`Default::default()`).
    pub fn new_output<C: Container>(&mut self) -> (pushers::Output<T, C>, Stream<'scope, T, C>) {
        let connection = (0..self.builder.shape().inputs()).map(|i| (i, Antichain::from_elem(Default::default())));
        self.new_output_connection(connection)
    }

    /// Adds a new output with connection information to a generic operator builder, returning its
    /// output handle and stream.
    ///
    /// The `connection` parameter contains promises made by the operator for each of the existing
    /// *inputs*: for any timestamp appearing at that input, every timestamp produced at this output
    /// will be greater than or equal to the input timestamp advanced by some `Summary` that is
    /// itself greater than or equal to an element of the corresponding antichain in `connection`.
    ///
    /// Commonly the connections are either the unit summary, indicating the same timestamp might be
    /// produced as output, or an empty antichain indicating that there is no connection from the
    /// input to the output.
    ///
    /// # Panics
    ///
    /// Panics if `connection` names an input index that has not been added to this builder.
    pub fn new_output_connection<C: Container, I>(&mut self, connection: I) -> (pushers::Output<T, C>, Stream<'scope, T, C>)
    where
        I: IntoIterator<Item = (usize, Antichain<<T as Timestamp>::Summary>)> + Clone,
    {
        // Index the output will have once registered.
        let new_output = self.shape().outputs();
        let (tee, stream) = self.builder.new_output_connection(connection.clone());

        // Per-output internal batch, visible to input handles through the shared `self.internal`.
        let internal = Rc::new(RefCell::new(ChangeBatch::new()));
        self.internal.borrow_mut().push(Rc::clone(&internal));

        let counter = PushCounter::new(tee);
        self.produced.push(Rc::clone(counter.produced()));

        // Record the new output in each connected input's shared summary.
        for (input, entry) in connection {
            self.summaries[input].borrow_mut().add_port(new_output, entry);
        }

        (pushers::Output::new(counter, internal, new_output), stream)
    }

    /// Creates an operator implementation from supplied logic constructor.
    ///
    /// `constructor` receives the initial capabilities, one per output in output order, and
    /// returns the logic, which is called with the current input frontiers. The logic never
    /// reports itself as incomplete; use [`Self::build_reschedule`] when it needs to.
    pub fn build<B, L>(self, constructor: B)
    where
        B: FnOnce(Vec<Capability<T>>) -> L,
        L: FnMut(&[MutableAntichain<T>])+'static
    {
        self.build_reschedule(|caps| {
            let mut logic = constructor(caps);
            move |frontier| { logic(frontier); false }
        })
    }

    /// Creates an operator implementation from supplied logic constructor.
    ///
    /// Unlike `build`, the supplied closure can indicate if the operator
    /// should be considered incomplete. An operator that does not report itself
    /// incomplete will be shut down if it has empty input frontiers and holds no
    /// capabilities. Flagging oneself as incomplete is most commonly used by operators
    /// that manage external resources like file writes or transactions that
    /// must complete before the operator should be shut down.
    ///
    /// This method boxes `B` and `L` and delegates to [`Self::build_reschedule_boxed`].
    /// For the fully generic (non-boxing) path, see [`Self::build_reschedule_typed`].
    pub fn build_reschedule<B, L>(self, constructor: B)
    where
        B: FnOnce(Vec<Capability<T>>) -> L,
        L: FnMut(&[MutableAntichain<T>])->bool+'static
    {
        self.build_reschedule_boxed(Box::new(|caps| -> Box<dyn FnMut(&[MutableAntichain<T>])->bool> { Box::new(constructor(caps)) }));
    }

    /// Like `build_reschedule`, but with a pre-boxed constructor.
    ///
    /// This method exists primarily to force the `Box<dyn ...>` coercions, which
    /// can otherwise easily be `Box<B>` or `Box<L>` for specialized `B` and `L` instead.
    pub fn build_reschedule_boxed<'a>(self, constructor: Box<dyn FnOnce(Vec<Capability<T>>) -> Box<dyn FnMut(&[MutableAntichain<T>])->bool> + 'a>) {
        self.build_reschedule_typed(constructor);
    }

    /// Like `build_reschedule`, but specialized to the closure types `B` and `L`.
    ///
    /// This method is instantiated once per distinct `(B, L)` pair, and one
    /// should be mindful of monomorphization bloat. Callers with many closures
    /// should consider erasing their variation, for example via `Box<dyn ...>`.
    ///
    /// This method calls `build_typed` directly using a new closure, mirroring
    /// the variation in `L`, rather than forcing it to be reboxed via `build`.
    pub fn build_reschedule_typed<B, L>(self, constructor: B)
    where
        B: FnOnce(Vec<Capability<T>>) -> L,
        L: FnMut(&[MutableAntichain<T>])->bool+'static
    {
        let mut logic = constructor(self.mint_capabilities());

        // Move the closure-independent state out, so its methods are monomorphized per `T` only.
        let mut bookkeeping = ProgressBookkeeping {
            frontier: self.frontier,
            consumed: self.consumed,
            internal: self.internal,
            produced: self.produced,
        };

        // Each invocation: absorb frontier changes, run the logic, then publish progress.
        let raw_logic =
        move |progress: &mut SharedProgress<T>| {
            bookkeeping.drain_frontiers(progress);
            let result = logic(bookkeeping.frontiers());
            bookkeeping.publish_progress(progress);
            result
        };

        self.builder.build_typed(raw_logic);
    }

    /// Create initial capabilities, one per output, and clear their creation evidence.
    ///
    /// This method is specifically outlined from `Self::build_reschedule_typed` to avoid
    /// monomorphization bloat, as it depends only on `T`, not on the closures.
    fn mint_capabilities(&self) -> Vec<Capability<T>> {
        self.internal
            .borrow()
            .iter()
            .map(|batch| {
                let capability = Capability::new(T::minimum(), Rc::clone(batch));
                // Discard evidence of creation, as we are assumed to start with one.
                batch.borrow_mut().clear();
                capability
            })
            .collect()
    }

    /// Get the identifier assigned to the operator being constructed
    pub fn index(&self) -> usize { self.builder.index() }

    /// The operator's worker-unique identifier.
    pub fn global(&self) -> usize { self.builder.global() }

    /// Return a reference to the operator's shape
    pub fn shape(&self) -> &OperatorShape { self.builder.shape() }

    /// Creates operator info for the operator.
    pub fn operator_info(&self) -> OperatorInfo { self.builder.operator_info() }
}


/// Progress-tracking state that is independent of operator logic.
///
/// Extracted so that `drain_frontiers` and `publish_progress` are monomorphized
/// once per timestamp type `T`, rather than once per closure type passed to
/// `build_reschedule`.
///
/// Its fields are moved out of the `OperatorBuilder` when the operator is built.
struct ProgressBookkeeping<T: Timestamp> {
    /// For each input, the local frontier handed to the operator logic.
    frontier: Vec<MutableAntichain<T>>,
    /// For each input, the consumed batch shared with that input's counter.
    consumed: Vec<Rc<RefCell<ChangeBatch<T>>>>,
    /// For each output, the internal batch shared with that output's capabilities.
    internal: Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>,
    /// For each output, the produced batch shared with that output's counter.
    produced: Vec<Rc<RefCell<ChangeBatch<T>>>>,
}

impl<T: Timestamp> ProgressBookkeeping<T> {
    /// The current input frontiers, one per input, for passing to operator logic.
    #[inline(always)] fn frontiers(&self) -> &[MutableAntichain<T>] { &self.frontier[..] }

    /// Drain incoming frontier changes from `SharedProgress` into our local antichains.
    ///
    /// Inputs are paired positionally; surplus entries on either side are ignored.
    fn drain_frontiers(&mut self, progress: &mut SharedProgress<T>) {
        for (changes, frontier) in progress.frontiers.iter_mut().zip(self.frontier.iter_mut()) {
            frontier.update_iter(changes.drain());
        }
    }

    /// Publish consumed, internal, and produced changes back to `SharedProgress`.
    ///
    /// # Panics
    ///
    /// Panics if `progress.internals` has fewer entries than there are outputs.
    fn publish_progress(&self, progress: &mut SharedProgress<T>) {
        // Move batches of consumed changes, one per input.
        for (target, consumed) in progress.consumeds.iter_mut().zip(self.consumed.iter()) {
            consumed.borrow_mut().drain_into(target);
        }

        // Move batches of internal changes, one per output. Only the inner batches are
        // mutated (each behind its own `RefCell`), so a shared borrow of the list suffices.
        let internal = self.internal.borrow();
        for (index, batch) in internal.iter().enumerate() {
            progress.internals[index].extend(batch.borrow_mut().drain());
        }

        // Move batches of produced changes, one per output.
        for (target, produced) in progress.produceds.iter_mut().zip(self.produced.iter()) {
            produced.borrow_mut().drain_into(target);
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::dataflow::operators::generic::OutputBuilder;

    /// Opening a session on one output with a capability minted for a different
    /// output is expected to panic.
    #[test]
    #[should_panic]
    fn incorrect_capabilities() {

        // This tests that if we attempt to use a capability associated with the
        // wrong output, there is a run-time assertion.

        use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

        crate::example(|scope| {

            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            // Two outputs; `build` passes one initial capability per output, in this order.
            let (output1, _stream1) = builder.new_output::<Vec<()>>();
            let (output2, _stream2) = builder.new_output::<Vec<()>>();
            let mut output1 = OutputBuilder::from(output1);
            let mut output2 = OutputBuilder::from(output2);

            builder.build(move |capabilities| {
                move |_frontiers| {

                    let mut output_handle1 = output1.activate();
                    let mut output_handle2 = output2.activate();

                    // NOTE: Using incorrect capabilities here.
                    // capabilities[0] belongs to output1 and capabilities[1] to output2,
                    // so each session below pairs a handle with the other output's capability.
                    output_handle2.session(&capabilities[0]);
                    output_handle1.session(&capabilities[1]);
                }
            });
        })
    }

    /// Opening sessions with capabilities that match their outputs must not panic.
    #[test]
    fn correct_capabilities() {

        // This tests that if we attempt to use capabilities with the correct outputs
        // there is no runtime assertion

        use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;

        crate::example(|scope| {

            // NOTE(review): the name "Failure" appears copied from `incorrect_capabilities`;
            // it does not describe this test's expected outcome.
            let mut builder = OperatorBuilder::new("Failure".to_owned(), scope.clone());

            let (output1, _stream1) = builder.new_output::<Vec<()>>();
            let (output2, _stream2) = builder.new_output::<Vec<()>>();
            let mut output1 = OutputBuilder::from(output1);
            let mut output2 = OutputBuilder::from(output2);

            builder.build(move |mut capabilities| {
                move |_frontiers| {

                    let mut output_handle1 = output1.activate();
                    let mut output_handle2 = output2.activate();

                    // Avoid second call.
                    // Sessions are only opened while capabilities are still held; clearing the
                    // vector below drops them, so later invocations skip this branch.
                    if !capabilities.is_empty() {

                        // NOTE: Using correct capabilities here.
                        output_handle1.session(&capabilities[0]);
                        output_handle2.session(&capabilities[1]);

                        capabilities.clear();
                    }
                }
            });

            // Arbitrary return value for the `example` closure; the caller discards it.
            "Hello".to_owned()
        });
    }
}