Trait timely::dataflow::operators::unordered_input::UnorderedInput [] [src]

/// Create a new `Stream` and `Handle` through which to supply input.
pub trait UnorderedInput<G: Scope> {
    /// Create a new capability-based `Stream` and `Handle` through which to supply input.
    ///
    /// This input supports multiple open epochs (timestamps) at the same time.
    ///
    /// Returns `((Handle, Capability), Stream)`. The `Stream` can be used immediately for
    /// timely dataflow construction; the `Handle` and `Capability` are used later to
    /// introduce data into the computation.
    ///
    /// The returned `Capability` is for the default value of the timestamp type in use.
    /// Dropping it informs the system that the input has advanced beyond its timestamp;
    /// to keep sending, first obtain a capability at a later timestamp via `delayed`.
    /// Dropping all available capabilities communicates the end of input.
    fn new_unordered_input<D: Data>(
        &mut self
    ) -> ((UnorderedHandle<G::Timestamp, D>, Capability<G::Timestamp>), Stream<G, D>); }

Create a new Stream and Handle through which to supply input.

Required Methods

Create a new capability-based Stream and Handle through which to supply input. This input supports multiple open epochs (timestamps) at the same time.

The new_unordered_input method returns ((Handle, Capability), Stream), where the Stream can be used immediately for timely dataflow construction, while the Handle and Capability are used later to introduce data into the timely dataflow computation.

The Capability returned is for the default value of the timestamp type in use. The capability can be dropped to inform the system that the input has advanced beyond the capability's timestamp. To retain the ability to send, a new capability at a later timestamp should be obtained first, via the delayed function for Capability.

To communicate the end of input, drop all available capabilities.

Examples

use std::sync::{Arc, Mutex};

use timely::*;
use timely::dataflow::operators::*;
use timely::dataflow::operators::capture::Extract;
use timely::dataflow::Stream;
use timely::progress::timestamp::RootTimestamp;

// Build a channel for the captured output; the sender is wrapped so the
// worker closure can obtain its own copy.
let (tx, rx) = ::std::sync::mpsc::channel();
let shared_tx = Arc::new(Mutex::new(tx));

timely::execute(Configuration::Thread, move |worker| {

    // Worker-local sender, used only to validate the output afterwards.
    let tx = shared_tx.lock().unwrap().clone();

    // Build the dataflow: an unordered input whose stream is captured.
    let (mut handle, mut capability) = worker.dataflow(|scope| {
        let ((handle, capability), stream) = scope.new_unordered_input();
        stream.capture_into(tx);
        (handle, capability)
    });

    // Send value `round` at time `round`, for rounds 0..10.
    for round in 0..10 {
        // The session is a temporary so it is released before the worker steps.
        handle.session(capability.clone()).give(round);
        // Advance to the next timestamp; the previous capability is dropped here.
        let next_time = RootTimestamp::new(round + 1);
        capability = capability.delayed(&next_time);
        worker.step();
    }
}).unwrap();

// Each of the ten timestamps should carry exactly its own value.
let captured = rx.extract();
for idx in 0..10 {
    assert_eq!(captured[idx], (RootTimestamp::new(idx), vec![idx]));
}

Implementors