Struct timely::dataflow::operators::FrontierNotificator
[−]
[src]
pub struct FrontierNotificator<T: Timestamp> { /* fields omitted */ }
Tracks requests for notification and delivers available notifications.
`FrontierNotificator` is meant to manage the delivery of requested notifications in the
presence of inputs that may have outstanding messages to deliver.
The notificator inspects the frontiers, as presented from the outside, for each input.
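Requested notifications can be served only once there are no frontier elements less-or-equal
to them, and there are no other pending notification requests less than them. The comparison
against other pending requests is strict (less than, not less-or-equal) because each request
is trivially less-or-equal to itself, and that corner case must not block its own delivery.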
Examples
use std::collections::HashMap; use timely::dataflow::operators::{Input, Inspect, FrontierNotificator}; use timely::dataflow::operators::generic::operator::Operator; use timely::dataflow::channels::pact::Pipeline; timely::execute(timely::Configuration::Thread, |worker| { let (mut in1, mut in2) = worker.dataflow(|scope| { let (in1_handle, in1) = scope.new_input(); let (in2_handle, in2) = scope.new_input(); in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap| { let mut notificator = FrontierNotificator::new(); let mut stash = HashMap::new(); move |input1, input2, output| { while let Some((time, data)) = input1.next() { stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..)); notificator.notify_at(time); } while let Some((time, data)) = input2.next() { stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..)); notificator.notify_at(time); } notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _| { if let Some(mut vec) = stash.remove(time.time()) { output.session(&time).give_iterator(vec.drain(..)); } }); } }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x)); (in1_handle, in2_handle) }); for i in 1..10 { in1.send(i - 1); in1.advance_to(i); in2.send(i - 1); in2.advance_to(i); } in1.close(); in2.close(); }).unwrap();
Methods
impl<T: Timestamp> FrontierNotificator<T>
[src]
fn new() -> FrontierNotificator<T>
[src]
Allocates a new `FrontierNotificator`.
fn notify_at(&mut self, cap: Capability<T>)
[src]
Requests a notification at the time associated with capability `cap`. Takes ownership of
the capability.
In order to request a notification at a future timestamp, obtain a capability for the new timestamp first, as shown in the example.
Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator}; use timely::dataflow::operators::generic::operator::Operator; use timely::dataflow::channels::pact::Pipeline; timely::example(|scope| { (0..10).to_stream(scope) .unary_frontier(Pipeline, "example", |_| { let mut notificator = FrontierNotificator::new(); move |input, output| { input.for_each(|cap, data| { output.session(&cap).give_content(data); let mut time = cap.time().clone(); time.inner += 1; notificator.notify_at(cap.delayed(&time)); }); notificator.for_each(&[input.frontier()], |cap, _| { println!("done with time: {:?}", cap.time()); }); } }); });
fn drain<'a>(
&'a mut self,
frontiers: &'a [&'a MutableAntichain<T>]
) -> Drain<'a, Capability<T>>
[src]
&'a mut self,
frontiers: &'a [&'a MutableAntichain<T>]
) -> Drain<'a, Capability<T>>
Iterate over the notifications made available by inspecting the frontiers.
fn for_each<'a, F: FnMut(Capability<T>, &mut FrontierNotificator<T>)>(
&mut self,
frontiers: &'a [&'a MutableAntichain<T>],
logic: F
)
[src]
&mut self,
frontiers: &'a [&'a MutableAntichain<T>],
logic: F
)
Repeatedly calls `logic` until the notifications made available by inspecting
the frontiers are exhausted.
`logic` receives a capability for `t`, the timestamp being notified, together with a mutable
reference to the notificator itself.