timely 0.29.0

A low-latency data-parallel dataflow system in Rust
Documentation
//! Create new `Streams` connected to external inputs.

use crate::container::CapacityContainerBuilder;
use crate::progress::Timestamp;
use crate::dataflow::operators::{ActivateCapability};
use crate::dataflow::operators::core::{UnorderedInput as UnorderedInputCore, UnorderedHandle as UnorderedHandleCore};
use crate::dataflow::{StreamVec, Scope};

/// Create a new `StreamVec` and `Handle` through which to supply input.
pub trait UnorderedInput<'scope, T: Timestamp> {
    /// Create a new capability-based `StreamVec` and `Handle` through which to supply input. This
    /// input supports multiple open epochs (timestamps) at the same time.
    ///
    /// The `new_unordered_input` method returns `((Handle, Capability), Stream)` where the `StreamVec` can be used
    /// immediately for timely dataflow construction, `Handle` and `Capability` are later used to introduce
    /// data into the timely dataflow computation.
    ///
    /// The `Capability` returned is for the default value of the timestamp type in use. The
    /// capability can be dropped to inform the system that the input has advanced beyond the
    /// capability's timestamp. To retain the ability to send, a new capability at a later timestamp
    /// should be obtained first, via the `delayed` function for `Capability`.
    ///
    /// To communicate the end-of-input drop all available capabilities.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::{Arc, Mutex};
    ///
    /// use timely::*;
    /// use timely::dataflow::operators::*;
    /// use timely::dataflow::operators::vec::UnorderedInput;
    /// use timely::dataflow::operators::capture::Extract;
    ///
    /// // get send and recv endpoints, wrap send to share
    /// let (send, recv) = ::std::sync::mpsc::channel();
    /// let send = Arc::new(Mutex::new(send));
    ///
    /// timely::execute(Config::thread(), move |worker| {
    ///
    ///     // this is only to validate the output.
    ///     let send = send.lock().unwrap().clone();
    ///
    ///     // create and capture the unordered input.
    ///     let (mut input, mut cap) = worker.dataflow::<usize,_,_>(|scope| {
    ///         let (input, stream) = scope.new_unordered_input();
    ///         stream.capture_into(send);
    ///         input
    ///     });
    ///
    ///     // feed values 0..10 at times 0..10.
    ///     for round in 0..10 {
    ///         input.activate().session(&cap).give(round);
    ///         cap = cap.delayed(&(round + 1));
    ///         worker.step();
    ///     }
    /// }).unwrap();
    ///
    /// let extract = recv.extract();
    /// for i in 0..10 {
    ///     assert_eq!(extract[i], (i, vec![i]));
    /// }
    /// ```
    fn new_unordered_input<D: 'static>(&self) -> ((UnorderedHandle<T, D>, ActivateCapability<T>), StreamVec<'scope, T, D>);
}


impl<'scope, T: Timestamp> UnorderedInput<'scope, T> for Scope<'scope, T> {
    fn new_unordered_input<D: 'static>(&self) -> ((UnorderedHandle<T, D>, ActivateCapability<T>), StreamVec<'scope, T, D>) {
        UnorderedInputCore::new_unordered_input(self)
    }
}

/// An unordered handle specialized to vectors.
pub type UnorderedHandle<T, D> = UnorderedHandleCore<T, CapacityContainerBuilder<Vec<D>>>;