Struct timely::dataflow::operators::generic::FrontierNotificator [−][src]
Tracks requests for notification and delivers available notifications.
FrontierNotificator
is meant to manage the delivery of requested notifications in the
presence of inputs that may have outstanding messages to deliver.
The notificator inspects the frontiers, as presented from the outside, for each input.
Requested notifications can be served only once there are no frontier elements less-or-equal
to them, and there are no other pending notification requests less than them. Each request is
less-or-equal to itself, so the comparison against pending requests is strict (less than) to dodge that corner case.
Examples
use std::collections::HashMap; use timely::dataflow::operators::{Input, Inspect, FrontierNotificator}; use timely::dataflow::operators::generic::operator::Operator; use timely::dataflow::channels::pact::Pipeline; timely::execute(timely::Config::thread(), |worker| { let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| { let (in1_handle, in1) = scope.new_input(); let (in2_handle, in2) = scope.new_input(); in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| { let mut notificator = FrontierNotificator::new(); let mut stash = HashMap::new(); let mut vector1 = Vec::new(); let mut vector2 = Vec::new(); move |input1, input2, output| { while let Some((time, data)) = input1.next() { data.swap(&mut vector1); stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector1.drain(..)); notificator.notify_at(time.retain()); } while let Some((time, data)) = input2.next() { data.swap(&mut vector2); stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector2.drain(..)); notificator.notify_at(time.retain()); } notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _| { if let Some(mut vec) = stash.remove(time.time()) { output.session(&time).give_iterator(vec.drain(..)); } }); } }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x)); (in1_handle, in2_handle) }); for i in 1..10 { in1.send(i - 1); in1.advance_to(i); in2.send(i - 1); in2.advance_to(i); } in1.close(); in2.close(); }).unwrap();
Implementations
impl<T: Timestamp> FrontierNotificator<T>
[src]
pub fn new() -> Self
[src]
Allocates a new FrontierNotificator
.
pub fn from<I: IntoIterator<Item = Capability<T>>>(iter: I) -> Self
[src]
Allocates a new FrontierNotificator
with initial capabilities.
pub fn notify_at<'a>(&mut self, cap: Capability<T>)
[src]
Requests a notification at the time associated with capability cap
. Takes ownership of
the capability.
In order to request a notification at a future timestamp, obtain a capability for the new timestamp first, as shown in the example.
Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator}; use timely::dataflow::operators::generic::operator::Operator; use timely::dataflow::channels::pact::Pipeline; timely::example(|scope| { (0..10).to_stream(scope) .unary_frontier(Pipeline, "example", |_, _| { let mut notificator = FrontierNotificator::new(); move |input, output| { input.for_each(|cap, data| { output.session(&cap).give_vec(&mut data.replace(Vec::new())); let time = cap.time().clone() + 1; notificator.notify_at(cap.delayed(&time)); }); notificator.for_each(&[input.frontier()], |cap, _| { println!("done with time: {:?}", cap.time()); }); } }); });
pub fn notify_at_frontiered<'a>(
&mut self,
cap: Capability<T>,
frontiers: &'a [&'a MutableAntichain<T>]
)
[src]
&mut self,
cap: Capability<T>,
frontiers: &'a [&'a MutableAntichain<T>]
)
Requests a notification at the time associated with capability cap
.
The method takes a list of frontiers, from which it determines whether the capability is immediately available.
When used with the same frontier as make_available
, this method can ensure that notifications are
non-decreasing. Simply using notify_at
will only insert new notifications into the list of pending
notifications, which are only re-examined with calls to make_available
.
pub fn make_available<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>])
[src]
Enables pending notifications not in advance of any element of frontiers
.
pub fn next<'a>(
&mut self,
frontiers: &'a [&'a MutableAntichain<T>]
) -> Option<Capability<T>>
[src]
&mut self,
frontiers: &'a [&'a MutableAntichain<T>]
) -> Option<Capability<T>>
Returns the next available capability with respect to the supplied frontiers, if one exists.
In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
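use for_each, or (ii) use monotonic, which creates a notificator session whose delivered notifications are non-decreasing.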
pub fn for_each<'a, F: FnMut(Capability<T>, &mut FrontierNotificator<T>)>(
&mut self,
frontiers: &'a [&'a MutableAntichain<T>],
logic: F
)
[src]
&mut self,
frontiers: &'a [&'a MutableAntichain<T>],
logic: F
)
Repeatedly calls logic
till exhaustion of the notifications made available by inspecting
the frontiers.
logic
receives a capability for t
, the timestamp being notified.
pub fn monotonic<'a>(
&'a mut self,
frontiers: &'a [&'a MutableAntichain<T>],
logging: &'a Option<Logger>
) -> Notificator<'a, T>ⓘNotable traits for Notificator<'a, T>
impl<'a, T: Timestamp> Iterator for Notificator<'a, T> type Item = (Capability<T>, u64);
[src]
&'a mut self,
frontiers: &'a [&'a MutableAntichain<T>],
logging: &'a Option<Logger>
) -> Notificator<'a, T>ⓘ
Notable traits for Notificator<'a, T>
impl<'a, T: Timestamp> Iterator for Notificator<'a, T> type Item = (Capability<T>, u64);
Creates a notificator session in which delivered notifications will be non-decreasing.
This implementation can be emulated with judicious use of make_available
and notify_at_frontiered
,
in the event that Notificator
provides too restrictive an interface.
pub fn pending<'a>(&'a self) -> Iter<'a, (Capability<T>, u64)>
[src]
Iterates over pending capabilities and their count. The count represents how often a capability has been requested.
To make sure all pending capabilities are above the frontier, use for_each
or exhaust
next
to consume all available capabilities.
Examples
use timely::dataflow::operators::{ToStream, FrontierNotificator}; use timely::dataflow::operators::generic::operator::Operator; use timely::dataflow::channels::pact::Pipeline; timely::example(|scope| { (0..10).to_stream(scope) .unary_frontier(Pipeline, "example", |_, _| { let mut notificator = FrontierNotificator::new(); move |input, output| { input.for_each(|cap, data| { output.session(&cap).give_vec(&mut data.replace(Vec::new())); let time = cap.time().clone() + 1; notificator.notify_at(cap.delayed(&time)); assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1); }); notificator.for_each(&[input.frontier()], |cap, _| { println!("done with time: {:?}", cap.time()); }); } }); });
Auto Trait Implementations
impl<T> !RefUnwindSafe for FrontierNotificator<T>
impl<T> !Send for FrontierNotificator<T>
impl<T> !Sync for FrontierNotificator<T>
impl<T> Unpin for FrontierNotificator<T> where
T: Unpin,
T: Unpin,
impl<T> !UnwindSafe for FrontierNotificator<T>
Blanket Implementations
impl<T> Any for T where
T: 'static + ?Sized,
[src]
T: 'static + ?Sized,
impl<T> Borrow<T> for T where
T: ?Sized,
[src]
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
[src]
T: ?Sized,
pub fn borrow_mut(&mut self) -> &mut T
[src]
impl<T> From<T> for T
[src]
impl<T, U> Into<U> for T where
U: From<T>,
[src]
U: From<T>,
impl<T, U> TryFrom<U> for T where
U: Into<T>,
[src]
U: Into<T>,
type Error = Infallible
The type returned in the event of a conversion error.
pub fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>
[src]
impl<T, U> TryInto<U> for T where
U: TryFrom<T>,
[src]
U: TryFrom<T>,