Struct timely::dataflow::operators::FrontierNotificator

pub struct FrontierNotificator<T: Timestamp> { /* fields omitted */ }

Tracks requests for notification and delivers available notifications.

FrontierNotificator manages the delivery of requested notifications in the presence of inputs that may still have outstanding messages to deliver. The notificator inspects the frontier of each input, as presented from the outside. A requested notification can be served only once no frontier element is less-or-equal to its time and no other pending notification request is strictly less than it. The second comparison is strict because every request is less-or-equal to itself, a corner case that would otherwise block every delivery.
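
The delivery rule can be sketched outside of timely, with plain integers standing in for timestamps. This is a minimal model under stated assumptions, not the library's implementation: `deliverable` is a hypothetical helper, each frontier is a slice of `u64`, and because integers are totally ordered, the "no lesser pending request" condition reduces to returning the results in ascending order.

```rust
/// Hypothetical stand-in for the notificator's delivery rule, using `u64`
/// timestamps instead of timely's `Timestamp` types.
///
/// Returns the pending times that may be delivered, in ascending order.
fn deliverable(pending: &[u64], frontiers: &[&[u64]]) -> Vec<u64> {
    let mut ready: Vec<u64> = pending
        .iter()
        .copied()
        // A time is blocked while any frontier element is less-or-equal to it.
        .filter(|&t| !frontiers.iter().any(|f| f.iter().any(|&e| e <= t)))
        .collect();
    // Smaller times come first, so no request overtakes a lesser pending one.
    ready.sort_unstable();
    ready.dedup();
    ready
}
```

With pending times 1, 3 and 5 and input frontiers {4} and {6}, this model reports 1 and 3 as deliverable; 5 stays blocked because the frontier element 4 is less-or-equal to it.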

Examples

use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

timely::execute(timely::Configuration::Thread, |worker| {
    let (mut in1, mut in2) = worker.dataflow(|scope| {
        let (in1_handle, in1) = scope.new_input();
        let (in2_handle, in2) = scope.new_input();
        in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap| {
            let mut notificator = FrontierNotificator::new();
            let mut stash = HashMap::new();
            move |input1, input2, output| {
                // Stash incoming records by timestamp and request a
                // notification for every time seen on either input.
                while let Some((time, data)) = input1.next() {
                    stash.entry(time.time().clone()).or_insert_with(Vec::new).extend(data.drain(..));
                    notificator.notify_at(time);
                }
                while let Some((time, data)) = input2.next() {
                    stash.entry(time.time().clone()).or_insert_with(Vec::new).extend(data.drain(..));
                    notificator.notify_at(time);
                }
                // Once both input frontiers have passed a time, emit
                // everything that was stashed for it.
                notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _| {
                    if let Some(mut vec) = stash.remove(time.time()) {
                        output.session(&time).give_iterator(vec.drain(..));
                    }
                });
            }
        }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));

        (in1_handle, in2_handle)
    });

    for i in 1..10 {
        in1.send(i - 1);
        in1.advance_to(i);
        in2.send(i - 1);
        in2.advance_to(i);
    }
    in1.close();
    in2.close();
}).unwrap();

Methods

impl<T: Timestamp> FrontierNotificator<T>

Allocates a new FrontierNotificator.

Requests a notification at the time associated with capability cap. Takes ownership of the capability.

To request a notification at a future timestamp, first obtain a capability for that timestamp, as shown in the example.

Examples

use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

timely::example(|scope| {
    (0..10).to_stream(scope)
           .unary_frontier(Pipeline, "example", |_| {
               let mut notificator = FrontierNotificator::new();
               move |input, output| {
                   input.for_each(|cap, data| {
                       output.session(&cap).give_content(data);
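                       // Build the next timestamp, delay the capability to
                       // it, and request a notification at that time.
                       let mut time = cap.time().clone();
                       time.inner += 1;
                       notificator.notify_at(cap.delayed(&time));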
                   });
                   notificator.for_each(&[input.frontier()], |cap, _| {
                       println!("done with time: {:?}", cap.time());
                   });
               }
           });
});

Iterate over the notifications made available by inspecting the frontiers.

Repeatedly calls logic until the notifications made available by inspecting the frontiers are exhausted.

logic receives a capability for t, the timestamp being notified.
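
The draining behaviour can be modelled without timely. The sketch below is a toy under stated assumptions, not the library's code: `ToyNotificator` is a hypothetical type, times are plain `u64` values rather than capabilities, and frontiers are slices of `u64`. The second closure argument, which the examples above ignore, is modelled here as the notificator itself so that `logic` can request further notifications; that is an assumption of the sketch.

```rust
/// Toy model of a notificator over `u64` times (hypothetical, for illustration).
struct ToyNotificator {
    pending: Vec<u64>,
}

impl ToyNotificator {
    fn new() -> Self {
        ToyNotificator { pending: Vec::new() }
    }

    /// Records a request for notification at `time`.
    fn notify_at(&mut self, time: u64) {
        self.pending.push(time);
    }

    /// Calls `logic` for each pending time that no frontier element is
    /// less-or-equal to, smallest first, until none remain deliverable.
    fn for_each<F: FnMut(u64, &mut ToyNotificator)>(&mut self, frontiers: &[&[u64]], mut logic: F) {
        loop {
            // Pick the smallest pending time that is not blocked by a frontier.
            let next = self
                .pending
                .iter()
                .copied()
                .filter(|&t| !frontiers.iter().any(|f| f.iter().any(|&e| e <= t)))
                .min();
            match next {
                Some(t) => {
                    // Remove the request (and any duplicates) before delivering it.
                    self.pending.retain(|&p| p != t);
                    logic(t, self);
                }
                None => break,
            }
        }
    }
}
```

Requests made from inside `logic` are picked up by the same call if the frontiers already allow them, which is why the loop recomputes the smallest deliverable time after every delivery instead of iterating over a snapshot.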