Struct timely::dataflow::operators::FrontierNotificator

pub struct FrontierNotificator<T: Timestamp> { /* fields omitted */ }

Tracks requests for notification and delivers available notifications.

FrontierNotificator manages the delivery of requested notifications in the presence of inputs that may have outstanding messages to deliver. The notificator inspects the frontier of each input, as presented from the outside. A requested notification can be served only once no frontier element is less-or-equal to its time and no other pending notification request is strictly less than it. The second condition uses strict less-than because each request is less-or-equal to itself, and would otherwise block its own delivery.
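
The delivery rule above can be sketched with a toy model. This is not timely's implementation: it assumes timestamps are plain `u64` values (a total order) and represents each frontier as a slice of timestamps. The names `beyond_frontiers` and `can_serve` are hypothetical.

```rust
/// Toy model: timestamps are `u64` and a frontier is a slice of timestamps.
/// Returns true when no frontier element is less-or-equal to `time`.
/// For a total order, "not (f <= time)" is the same as "f > time".
fn beyond_frontiers(time: u64, frontiers: &[&[u64]]) -> bool {
    frontiers
        .iter()
        .all(|frontier| frontier.iter().all(|&f| f > time))
}

/// A request at `time` can be served once it is beyond every frontier and
/// no pending request is strictly less than it. `pending` may contain
/// `time` itself, which is why the comparison must be strict.
fn can_serve(time: u64, frontiers: &[&[u64]], pending: &[u64]) -> bool {
    beyond_frontiers(time, frontiers) && pending.iter().all(|&p| p >= time)
}
```

With frontiers `[5]` and `[4]`, a request at time 3 can be served even though 3 appears in the pending list, while a request at time 4 cannot, because the second frontier still contains 4.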

Examples

use std::collections::HashMap;
use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

timely::execute(timely::Config::thread(), |worker| {
    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
        let (in1_handle, in1) = scope.new_input();
        let (in2_handle, in2) = scope.new_input();
        in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
            let mut notificator = FrontierNotificator::new();
            let mut stash = HashMap::new();
            let mut vector1 = Vec::new();
            let mut vector2 = Vec::new();
            move |input1, input2, output| {
                // Stash records from the first input by time and request a notification at that time.
                while let Some((time, data)) = input1.next() {
                    data.swap(&mut vector1);
                    stash.entry(time.time().clone()).or_insert_with(Vec::new).extend(vector1.drain(..));
                    notificator.notify_at(time.retain());
                }
                // Do the same for the second input, sharing the stash and the notificator.
                while let Some((time, data)) = input2.next() {
                    data.swap(&mut vector2);
                    stash.entry(time.time().clone()).or_insert_with(Vec::new).extend(vector2.drain(..));
                    notificator.notify_at(time.retain());
                }
                // Once both input frontiers have passed a time, emit everything stashed for it.
                notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _| {
                    if let Some(mut vec) = stash.remove(time.time()) {
                        output.session(&time).give_iterator(vec.drain(..));
                    }
                });
            }
        }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));

        (in1_handle, in2_handle)
    });

    for i in 1..10 {
        in1.send(i - 1);
        in1.advance_to(i);
        in2.send(i - 1);
        in2.advance_to(i);
    }
    in1.close();
    in2.close();
}).unwrap();

Implementations

impl<T: Timestamp> FrontierNotificator<T>

pub fn new() -> Self

Allocates a new FrontierNotificator.

pub fn from<I: IntoIterator<Item = Capability<T>>>(iter: I) -> Self

Allocates a new FrontierNotificator with initial capabilities.

pub fn notify_at<'a>(&mut self, cap: Capability<T>)

Requests a notification at the time associated with capability cap. Takes ownership of the capability.

To request a notification at a future timestamp, first obtain a capability for the new timestamp, as shown in the example.

Examples

use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

timely::example(|scope| {
    (0..10).to_stream(scope)
           .unary_frontier(Pipeline, "example", |_, _| {
               let mut notificator = FrontierNotificator::new();
               move |input, output| {
                   input.for_each(|cap, data| {
                       output.session(&cap).give_vec(&mut data.replace(Vec::new()));
                       // Request a notification one step ahead, using a capability delayed to that time.
                       let time = cap.time().clone() + 1;
                       notificator.notify_at(cap.delayed(&time));
                   });
                   notificator.for_each(&[input.frontier()], |cap, _| {
                       println!("done with time: {:?}", cap.time());
                   });
               }
           });
});

pub fn notify_at_frontiered<'a>(
    &mut self,
    cap: Capability<T>,
    frontiers: &'a [&'a MutableAntichain<T>]
)

Requests a notification at the time associated with capability cap.

The method takes a list of frontiers from which it determines whether the capability is immediately available. When used with the same frontiers as make_available, this method can ensure that notifications are non-decreasing. Simply using notify_at only inserts new notifications into the list of pending notifications, which are re-examined only by calls to make_available.
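
The difference between the two request methods can be pictured with a toy model. It is a sketch under stated assumptions, not timely's code: timestamps are `u64`, frontiers are slices, capabilities are not tracked, and the `ToyNotificator` type with its `pending` and `available` fields is hypothetical.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Toy stand-in for a notificator over `u64` timestamps.
#[derive(Default)]
struct ToyNotificator {
    /// Requests not yet compared against any frontier.
    pending: Vec<u64>,
    /// Requests known to be deliverable, smallest time first.
    available: BinaryHeap<Reverse<u64>>,
}

/// True when no frontier element is less-or-equal to `time`.
fn beyond(time: u64, frontiers: &[&[u64]]) -> bool {
    frontiers.iter().all(|f| f.iter().all(|&x| x > time))
}

impl ToyNotificator {
    /// Records the request without looking at any frontier.
    fn notify_at(&mut self, time: u64) {
        self.pending.push(time);
    }

    /// Consults the frontiers first, so a deliverable request skips `pending`.
    fn notify_at_frontiered(&mut self, time: u64, frontiers: &[&[u64]]) {
        if beyond(time, frontiers) {
            self.available.push(Reverse(time));
        } else {
            self.pending.push(time);
        }
    }

    /// Re-examines `pending` and promotes whatever the frontiers now allow.
    fn make_available(&mut self, frontiers: &[&[u64]]) {
        let (ready, waiting): (Vec<u64>, Vec<u64>) = self
            .pending
            .drain(..)
            .partition(|&t| beyond(t, frontiers));
        self.available.extend(ready.into_iter().map(Reverse));
        self.pending = waiting;
    }
}
```

In this model, with a frontier of `[5]`, `notify_at(3)` leaves 3 in `pending` even though it is already deliverable, whereas `notify_at_frontiered(4, ..)` places 4 directly in `available`. A later `make_available` call is what promotes the 3.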

pub fn make_available<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>])

Enables pending notifications not in advance of any element of frontiers.

pub fn next<'a>(
    &mut self,
    frontiers: &'a [&'a MutableAntichain<T>]
) -> Option<Capability<T>>

Returns the next available capability with respect to the supplied frontiers, if one exists.

In the interest of efficiency, this method may yield capabilities in decreasing order in certain circumstances. If you want to iterate through capabilities with an in-order guarantee, use for_each, or open a session with monotonic, whose delivered notifications are non-decreasing.
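
One way a decreasing order could arise is sketched below with a toy model. The key assumption, which is not stated in the text above, is that pending requests are re-examined only when nothing is currently available. Timestamps are `u64`, a single frontier is a slice, and `ToyNotificator` and `delivery_order` are hypothetical names.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Toy stand-in for a notificator over `u64` timestamps.
#[derive(Default)]
struct ToyNotificator {
    pending: Vec<u64>,
    available: BinaryHeap<Reverse<u64>>,
}

impl ToyNotificator {
    fn notify_at(&mut self, time: u64) {
        self.pending.push(time);
    }

    /// Moves every pending time that is beyond the frontier into `available`.
    fn make_available(&mut self, frontier: &[u64]) {
        let (ready, waiting): (Vec<u64>, Vec<u64>) = self
            .pending
            .drain(..)
            .partition(|&t| frontier.iter().all(|&f| f > t));
        self.available.extend(ready.into_iter().map(Reverse));
        self.pending = waiting;
    }

    /// Assumption: pending requests are re-examined only when nothing is
    /// currently available.
    fn next(&mut self, frontier: &[u64]) -> Option<u64> {
        if self.available.is_empty() {
            self.make_available(frontier);
        }
        self.available.pop().map(|Reverse(t)| t)
    }
}

/// Requests 6 and 8, then requests 7 while handling the notification for 6.
fn delivery_order() -> Vec<u64> {
    let frontier = [10u64];
    let mut n = ToyNotificator::default();
    n.notify_at(6);
    n.notify_at(8);
    let mut order = Vec::new();
    while let Some(t) = n.next(&frontier) {
        if t == 6 {
            // Follow-up request made while 8 is already available.
            n.notify_at(7);
        }
        order.push(t);
    }
    order
}
```

Under that assumption the follow-up request at 7 waits in the pending list until 8 has been handed out, so `delivery_order()` returns `[6, 8, 7]`.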

pub fn for_each<'a, F: FnMut(Capability<T>, &mut FrontierNotificator<T>)>(
    &mut self,
    frontiers: &'a [&'a MutableAntichain<T>],
    logic: F
)

Repeatedly calls logic until the notifications made available by inspecting the frontiers are exhausted.

logic receives a capability for t, the timestamp being notified.

pub fn monotonic<'a>(
    &'a mut self,
    frontiers: &'a [&'a MutableAntichain<T>],
    logging: &'a Option<Logger>
) -> Notificator<'a, T>

Notable traits for Notificator<'a, T>

impl<'a, T: Timestamp> Iterator for Notificator<'a, T>
    type Item = (Capability<T>, u64);

Creates a notificator session in which delivered notifications will be non-decreasing.

This implementation can be emulated with judicious use of make_available and notify_at_frontiered, in the event that Notificator provides too restrictive an interface.

pub fn pending<'a>(&'a self) -> Iter<'a, (Capability<T>, u64)>

Iterates over pending capabilities and their count. The count represents how often a capability has been requested.

To make sure all pending capabilities are above the frontier, use for_each or exhaust next to consume all available capabilities.
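
The pairing of a capability with a request count can be pictured with a small toy model. It assumes `u64` timestamps in place of capabilities and merges repeated requests eagerly; timely's own bookkeeping may differ, for example in when duplicate requests are merged. `PendingCounts` is a hypothetical name.

```rust
use std::collections::BTreeMap;

/// Toy model of pending requests: each `u64` timestamp maps to the number
/// of times a notification was requested for it.
#[derive(Default)]
struct PendingCounts {
    counts: BTreeMap<u64, u64>,
}

impl PendingCounts {
    /// Records one more request for `time`.
    fn notify_at(&mut self, time: u64) {
        *self.counts.entry(time).or_insert(0) += 1;
    }

    /// Yields `(time, count)` pairs in increasing time order.
    fn pending(&self) -> impl Iterator<Item = (u64, u64)> + '_ {
        self.counts.iter().map(|(&t, &c)| (t, c))
    }
}
```

Requesting time 3 twice and time 5 once yields the pairs `(3, 2)` and `(5, 1)` in this model.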

Examples

use timely::dataflow::operators::{ToStream, FrontierNotificator};
use timely::dataflow::operators::generic::operator::Operator;
use timely::dataflow::channels::pact::Pipeline;

timely::example(|scope| {
    (0..10).to_stream(scope)
           .unary_frontier(Pipeline, "example", |_, _| {
               let mut notificator = FrontierNotificator::new();
               move |input, output| {
                   input.for_each(|cap, data| {
                       output.session(&cap).give_vec(&mut data.replace(Vec::new()));
                       let time = cap.time().clone() + 1;
                       notificator.notify_at(cap.delayed(&time));
                       // The request just made appears exactly once among the pending capabilities.
                       assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1);
                   });
                   notificator.for_each(&[input.frontier()], |cap, _| {
                       println!("done with time: {:?}", cap.time());
                   });
               }
           });
});

Auto Trait Implementations

impl<T> !RefUnwindSafe for FrontierNotificator<T>

impl<T> !Send for FrontierNotificator<T>

impl<T> !Sync for FrontierNotificator<T>

impl<T> Unpin for FrontierNotificator<T> where
    T: Unpin

impl<T> !UnwindSafe for FrontierNotificator<T>

Blanket Implementations

impl<T> Any for T where
    T: 'static + ?Sized

impl<T> Borrow<T> for T where
    T: ?Sized

impl<T> BorrowMut<T> for T where
    T: ?Sized

impl<T> From<T> for T

impl<T, U> Into<U> for T where
    U: From<T>, 

impl<T, U> TryFrom<U> for T where
    U: Into<T>, 

type Error = Infallible

The type returned in the event of a conversion error.

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>, 

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.