Trait timely::dataflow::operators::reclock::Reclock [] [src]

pub trait Reclock<S: Scope, D: Data> {
    fn reclock(&self, clock: &Stream<S, ()>) -> Stream<S, D>;
}

Extension trait for reclocking a stream.

Required Methods

Delays records until an input is observed on the clock input.

The source stream is buffered until a record is seen on the clock input, at which point a notification is requested and all data with time less or equal to the clock time are sent. This method does not ensure that all workers receive the same clock records, which can be accomplished with broadcast.

Examples

use timely::dataflow::operators::{ToStream, Delay, Map, Reclock, Capture};
use timely::dataflow::operators::capture::Extract;
use timely::progress::timestamp::RootTimestamp;

let captured = timely::example(|scope| {
 
    // produce data 0..10 at times 0..10.
    let data = (0..10).to_stream(scope)
                      .delay(|x,t| RootTimestamp::new(*x));
 
    // product clock ticks at three times.
    let clock = vec![3, 5, 8].into_iter()
                             .to_stream(scope)
                             .delay(|x,t| RootTimestamp::new(*x))
                             .map(|_| ());

    // reclock the data.
    data.reclock(&clock)
        .capture()
});

let extracted = captured.extract();
assert_eq!(extracted.len(), 3);
assert_eq!(extracted[0], (RootTimestamp::new(3), vec![0,1,2,3]));
assert_eq!(extracted[1], (RootTimestamp::new(5), vec![4,5]));
assert_eq!(extracted[2], (RootTimestamp::new(8), vec![6,7,8]));

Implementors