Trait timely::dataflow::operators::delay::Delay

pub trait Delay<G: Scope, D: Data> {
    fn delay<F: Fn(&D, &G::Timestamp) -> G::Timestamp + 'static>(
        &self,
        _: F
    ) -> Self;
    fn delay_batch<F: Fn(&G::Timestamp) -> G::Timestamp + 'static>(
        &self,
        _: F
    ) -> Self;
}

Methods to advance the timestamps of records or batches of records.

Required Methods

fn delay<F: Fn(&D, &G::Timestamp) -> G::Timestamp + 'static>(&self, _: F) -> Self

Advances the timestamp of records using a supplied function.

The function must advance the timestamp: the operator checks that the new timestamp is greater than or equal to the old timestamp, and an assertion fails (panicking) if it is not.
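The contract above can be modeled without timely at all. The following is a minimal sketch, not timely's implementation: `delay_records` is a hypothetical helper, records are plain `(timestamp, datum)` pairs, and `PartialOrd` on `u64` stands in for timely's own timestamp ordering.

```rust
use std::fmt::Debug;

/// Hypothetical model of the documented contract (not timely's code).
/// `func` proposes a new timestamp for each record, and the new timestamp
/// must not be earlier than the record's current one.
fn delay_records<D, T, F>(records: Vec<(T, D)>, func: F) -> Vec<(T, D)>
where
    T: PartialOrd + Debug,
    F: Fn(&D, &T) -> T,
{
    records
        .into_iter()
        .map(|(time, datum)| {
            let new_time = func(&datum, &time);
            // Mirrors the documented check: timestamps may only advance.
            assert!(
                new_time >= time,
                "timestamp moved backwards: {:?} -> {:?}",
                time,
                new_time
            );
            (new_time, datum)
        })
        .collect()
}

fn main() {
    // Ten records, all at time 0, each delayed to the time equal to its value.
    let input: Vec<(u64, u64)> = (0..10).map(|i| (0, i)).collect();
    let output = delay_records(input, |datum, _time| *datum);
    for (time, datum) in &output {
        println!("datum {} now at time {}", datum, time);
    }
}
```

In this model, a function that returns an earlier timestamp than the one it was given trips the assertion, which is the failure mode the description warns about.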

Examples

The following example takes the sequence 0..10 at time RootTimestamp(0) and delays each element i to time RootTimestamp(i).

use timely::dataflow::operators::{ToStream, Delay};
use timely::dataflow::operators::generic::unary::Unary;
use timely::dataflow::channels::pact::Pipeline;
use timely::progress::timestamp::RootTimestamp;

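timely::example(|scope| {
    // All ten records enter the dataflow at time RootTimestamp(0).
    (0..10).to_stream(scope)
           // Move each record to the timestamp equal to its own value.
           .delay(|data, _time| RootTimestamp::new(*data))
           .unary_stream(Pipeline, "example", |input, output| {
               input.for_each(|time, data| {
                   // Report the delayed time of each batch, then forward it unchanged.
                   println!("data at time: {:?}", time);
                   output.session(&time).give_content(data);
               });
           });
});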

fn delay_batch<F: Fn(&G::Timestamp) -> G::Timestamp + 'static>(&self, _: F) -> Self

Advances the timestamp of batches of records using a supplied function.

The function must advance the timestamp: the operator checks that the new timestamp is greater than or equal to the old timestamp, and an assertion fails (panicking) if it is not. The batch version does not consult the data; the function sees only the timestamp of each batch.
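The difference from the per-record version can be sketched in plain Rust. This is an illustrative model under stated assumptions, not timely's implementation: `delay_batches` is a hypothetical helper, a batch is a `(timestamp, Vec<D>)` pair, and merging batches that land on the same new timestamp is a choice made for this sketch only.

```rust
use std::collections::BTreeMap;
use std::fmt::Debug;

/// Hypothetical model of batch-wise delay (not timely's code).
/// `func` is called once per batch and only ever sees the batch timestamp.
fn delay_batches<D, T, F>(batches: Vec<(T, Vec<D>)>, func: F) -> BTreeMap<T, Vec<D>>
where
    T: Ord + Debug,
    F: Fn(&T) -> T,
{
    let mut delayed: BTreeMap<T, Vec<D>> = BTreeMap::new();
    for (time, mut batch) in batches {
        let new_time = func(&time);
        // Same documented check as the per-record version.
        assert!(
            new_time >= time,
            "timestamp moved backwards: {:?} -> {:?}",
            time,
            new_time
        );
        // Assumption of this sketch: batches with equal new timestamps are merged.
        delayed
            .entry(new_time)
            .or_insert_with(Vec::new)
            .append(&mut batch);
    }
    delayed
}

fn main() {
    // A single batch holding 0..10 at time 0, delayed as a whole to time 1.
    let batches = vec![(0u64, (0..10).collect::<Vec<u64>>())];
    let delayed = delay_batches(batches, |time| time + 1);
    for (time, batch) in &delayed {
        println!("time {}: {:?}", time, batch);
    }
}
```

Because the function never sees individual records, every record in a batch ends up at the same new timestamp, which is what separates this method from `delay`.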

Examples

The following example takes the sequence 0..10 at time RootTimestamp(0) and delays each batch (there is just one) to time RootTimestamp(1).

use timely::dataflow::operators::{ToStream, Delay};
use timely::dataflow::operators::generic::unary::Unary;
use timely::dataflow::channels::pact::Pipeline;
use timely::progress::timestamp::RootTimestamp;

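timely::example(|scope| {
    // All ten records enter the dataflow at time RootTimestamp(0).
    (0..10).to_stream(scope)
           // Move each batch one step forward; the closure sees only the timestamp.
           .delay_batch(|time| RootTimestamp::new(time.inner + 1))
           .unary_stream(Pipeline, "example", |input, output| {
               input.for_each(|time, data| {
                   // Report the delayed time of each batch, then forward it unchanged.
                   println!("data at time: {:?}", time);
                   output.session(&time).give_content(data);
               });
           });
});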

Implementors