Struct timely::dataflow::operators::FrontierNotificator

pub struct FrontierNotificator<T: Timestamp> { /* fields omitted */ }

Tracks requests for notification and delivers available notifications.

FrontierNotificator manages the delivery of requested notifications when inputs may still have outstanding messages to deliver. The notificator inspects the frontier of each input, as presented from the outside. A requested notification can be served only once no frontier element is less-or-equal to its time and no other pending notification request is less than it. The second comparison is strict on purpose: each request is less-or-equal to itself, so a non-strict test would let every request block itself.
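The delivery rule can be pictured with a small stand-alone model. The sketch below is not timely's implementation: it assumes totally ordered `u64` timestamps, represents each input frontier as a plain slice of times that input may still produce, and `can_deliver` is a hypothetical helper introduced only for illustration.

```rust
/// Toy model of the delivery rule (hypothetical helper, not part of timely).
/// `frontiers` holds one slice per input; `pending` holds every requested
/// time, including the one being tested.
fn can_deliver(time: u64, frontiers: &[&[u64]], pending: &[u64]) -> bool {
    // No frontier element may be less-or-equal to the requested time.
    let frontier_clear = frontiers
        .iter()
        .all(|frontier| frontier.iter().all(|&f| f > time));
    // No pending request may be strictly less than the requested time.
    // The strict comparison keeps a request from blocking itself.
    let no_earlier_request = pending.iter().all(|&p| p >= time);
    frontier_clear && no_earlier_request
}
```

With input frontiers at 3 and 5 and pending requests at 2 and 4, this model delivers the request at 2 and keeps holding the request at 4, because the first input may still produce data at time 3.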

Examples

use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

timely::execute(timely::Configuration::Thread, |worker| {
    let (mut in1, mut in2) = worker.dataflow(|scope| {
        let (in1_handle, in1) = scope.new_input();
        let (in2_handle, in2) = scope.new_input();
        in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
            let mut notificator = FrontierNotificator::new();
            let mut stash = HashMap::new();
            move |input1, input2, output| {
                // Stash records from the first input by timestamp and request a notification for that time.
                while let Some((time, data)) = input1.next() {
                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
                    notificator.notify_at(time.retain());
                }
                // Do the same for the second input, sharing the stash and the notificator.
                while let Some((time, data)) = input2.next() {
                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
                    notificator.notify_at(time.retain());
                }
                // Once both input frontiers have passed a time, emit everything stashed for it.
                notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _| {
                    if let Some(mut vec) = stash.remove(time.time()) {
                        output.session(&time).give_iterator(vec.drain(..));
                    }
                });
            }
        }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));

        (in1_handle, in2_handle)
    });

    for i in 1..10 {
        in1.send(i - 1);
        in1.advance_to(i);
        in2.send(i - 1);
        in2.advance_to(i);
    }
    in1.close();
    in2.close();
}).unwrap();

Methods

impl<T: Timestamp> FrontierNotificator<T>

Allocates a new FrontierNotificator.

Allocates a new FrontierNotificator with initial capabilities.

Requests a notification at the time associated with capability cap. Takes ownership of the capability.

To request a notification at a future timestamp, first obtain a capability for the new timestamp, as shown in the example.

Examples

use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

timely::example(|scope| {
    (0..10).to_stream(scope)
           .unary_frontier(Pipeline, "example", |_, _| {
               let mut notificator = FrontierNotificator::new();
               move |input, output| {
                   input.for_each(|cap, data| {
                       output.session(&cap).give_content(data);
                       let mut time = cap.time().clone();
                       time.inner += 1;
                       notificator.notify_at(cap.delayed(&time));
                   });
                   notificator.for_each(&[input.frontier()], |cap, _| {
                       println!("done with time: {:?}", cap.time());
                   });
               }
           });
});

Requests a notification at the time associated with capability cap.

The method takes a list of frontiers, from which it determines whether the capability is immediately available. When used with the same frontiers as make_available, this method can ensure that notifications are non-decreasing. Using notify_at alone only inserts new notifications into the list of pending notifications, which are re-examined only by calls to make_available.
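The difference between the two request methods can be sketched with a toy model. Everything below is an assumption made for illustration: `ToyNotificator`, `beyond`, and `pop_available` are hypothetical names, timestamps are plain `u64` values, and a frontier is a slice of times an input may still produce. It is not timely's implementation.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Toy stand-in for a notificator (hypothetical, not timely's type).
#[derive(Default)]
struct ToyNotificator {
    /// Requests not yet known to be deliverable.
    pending: Vec<u64>,
    /// Requests known to be deliverable, smallest time first.
    available: BinaryHeap<Reverse<u64>>,
}

/// True when no element of any frontier is less-or-equal to `time`.
fn beyond(time: u64, frontiers: &[&[u64]]) -> bool {
    frontiers.iter().all(|f| f.iter().all(|&x| x > time))
}

impl ToyNotificator {
    /// Modelled on `notify_at`: always parks the request in `pending`.
    fn notify_at(&mut self, time: u64) {
        self.pending.push(time);
    }

    /// Modelled on `notify_at_frontiered`: consults the frontiers right away.
    fn notify_at_frontiered(&mut self, time: u64, frontiers: &[&[u64]]) {
        if beyond(time, frontiers) {
            self.available.push(Reverse(time));
        } else {
            self.pending.push(time);
        }
    }

    /// Modelled on `make_available`: re-examines every pending request.
    fn make_available(&mut self, frontiers: &[&[u64]]) {
        let (ready, waiting): (Vec<u64>, Vec<u64>) = self
            .pending
            .drain(..)
            .partition(|&t| beyond(t, frontiers));
        self.available.extend(ready.into_iter().map(Reverse));
        self.pending = waiting;
    }

    /// Hands out the smallest deliverable request, if any.
    fn pop_available(&mut self) -> Option<u64> {
        self.available.pop().map(|Reverse(t)| t)
    }
}
```

In this model, with a frontier at 5, a request at 3 made through `notify_at` stays parked even though it is already deliverable, while a request at 4 made through `notify_at_frontiered` becomes available at once. The request at 3 only surfaces after the next `make_available` call.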

Enables pending notifications not in advance of any element of frontiers.

Returns the next available capability with respect to the supplied frontiers, if one exists.

In the interest of efficiency, this method may yield capabilities in decreasing order in certain circumstances. If you want to iterate through capabilities with an in-order guarantee, one option is to use for_each.
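One way such an out-of-order result could arise is sketched below. The sketch assumes a `next` that refreshes its ready queue only when that queue is empty; this is an assumption chosen for illustration, not a statement about timely's internals. `ToyNotificator`, `drain_in_order`, and `setup` are hypothetical names, and timestamps are plain `u64` values.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Toy stand-in for a notificator (hypothetical, not timely's type).
#[derive(Default)]
struct ToyNotificator {
    pending: Vec<u64>,
    available: BinaryHeap<Reverse<u64>>,
}

impl ToyNotificator {
    fn notify_at(&mut self, time: u64) {
        self.pending.push(time);
    }

    /// Moves every pending time that the frontier has passed into `available`.
    fn make_available(&mut self, frontier: &[u64]) {
        let (ready, waiting): (Vec<u64>, Vec<u64>) = self
            .pending
            .drain(..)
            .partition(|&t| frontier.iter().all(|&f| f > t));
        self.available.extend(ready.into_iter().map(Reverse));
        self.pending = waiting;
    }

    /// ASSUMPTION: refresh lazily, only when nothing is ready.
    fn next(&mut self, frontier: &[u64]) -> Option<u64> {
        if self.available.is_empty() {
            self.make_available(frontier);
        }
        self.available.pop().map(|Reverse(t)| t)
    }

    /// Refresh first, then drain, so ready times come out smallest first.
    fn drain_in_order(&mut self, frontier: &[u64]) -> Vec<u64> {
        self.make_available(frontier);
        let mut out = Vec::new();
        while let Some(Reverse(t)) = self.available.pop() {
            out.push(t);
        }
        out
    }
}

/// Time 7 is already in the ready queue; time 3 is still only pending.
fn setup() -> ToyNotificator {
    let mut n = ToyNotificator::default();
    n.notify_at(7);
    n.make_available(&[10]);
    n.notify_at(3);
    n
}
```

Starting from `setup()`, repeated calls to the lazy `next` with a frontier at 10 yield 7 and then 3, because the ready queue is not refreshed while it still holds 7. Calling `drain_in_order` on the same state yields 3 and then 7.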

Repeatedly calls logic till exhaustion of the notifications made available by inspecting the frontiers.

logic receives a capability for t, the timestamp being notified.


Creates a notificator session in which delivered notifications will be non-decreasing.

This implementation can be emulated with judicious use of make_available and notify_at_frontiered, in the event that Notificator provides too restrictive an interface.


Iterates over pending capabilities and their count. The count represents how often a capability has been requested.

To make sure that all remaining pending capabilities are above the frontier, first consume all available capabilities, either with for_each or by calling next until it is exhausted.

Examples

use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

timely::example(|scope| {
    (0..10).to_stream(scope)
           .unary_frontier(Pipeline, "example", |_, _| {
               let mut notificator = FrontierNotificator::new();
               move |input, output| {
                   input.for_each(|cap, data| {
                       output.session(&cap).give_content(data);
                       let mut time = cap.time().clone();
                       time.inner += 1;
                       notificator.notify_at(cap.delayed(&time));
                       assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1);
                   });
                   notificator.for_each(&[input.frontier()], |cap, _| {
                       println!("done with time: {:?}", cap.time());
                   });
               }
           });
});
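The idea of a count per requested time can be illustrated without timely. The sketch below is a simplified picture, not the library's bookkeeping: `request_counts` is a hypothetical helper, and it assumes plain `u64` timestamps in place of capabilities.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper: collapses repeated requests into (time, count)
/// pairs, ordered by time.
fn request_counts(requests: &[u64]) -> Vec<(u64, u64)> {
    let mut counts = BTreeMap::new();
    for &time in requests {
        // Each request for the same time bumps that time's count by one.
        *counts.entry(time).or_insert(0u64) += 1;
    }
    counts.into_iter().collect()
}
```

For requests at times 4, 2, 4, and 4, this helper reports one request at time 2 and three at time 4.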

Auto Trait Implementations

impl<T> !Send for FrontierNotificator<T>

impl<T> !Sync for FrontierNotificator<T>