timely/dataflow/operators/generic/notificator.rs
1use crate::progress::frontier::{AntichainRef, MutableAntichain};
2use crate::progress::Timestamp;
3use crate::dataflow::operators::Capability;
4use crate::logging::TimelyLogger as Logger;
5
/// Tracks requests for notification and delivers available notifications.
///
/// A `Notificator` represents a dynamic set of notifications and a fixed notification frontier.
/// One can interact with one by requesting notification with `notify_at`, and retrieving notifications
/// with `for_each` and `next`. The next notification to be delivered will be the available notification
/// with the least timestamp, with the implication that the notifications will be non-decreasing as long
/// as you do not request notifications at times prior to those that have already been delivered.
///
/// Notification requests persist across uses of `Notificator`, and it may help to think of `Notificator`
/// as a notification *session*. However, idiomatically it seems you mostly want to restrict your usage
/// to such sessions, which is why this is the main notificator type.
#[derive(Debug)]
pub struct Notificator<'a, T: Timestamp> {
    // Input frontiers consulted when deciding whether a requested notification can be delivered.
    frontiers: &'a [&'a MutableAntichain<T>],
    // Borrowed store of pending and available notifications; it is only borrowed here, which is
    // why requests persist beyond a single session.
    inner: &'a mut FrontierNotificator<T>,
    // Optional logger; when present, `for_each` logs an event before and after each call to the
    // user-supplied logic.
    logging: &'a Option<Logger>,
}
23
24impl<'a, T: Timestamp> Notificator<'a, T> {
25 /// Allocates a new `Notificator`.
26 ///
27 /// This is more commonly accomplished using `input.monotonic(frontiers)`.
28 pub fn new(
29 frontiers: &'a [&'a MutableAntichain<T>],
30 inner: &'a mut FrontierNotificator<T>,
31 logging: &'a Option<Logger>) -> Self {
32
33 inner.make_available(frontiers);
34
35 Notificator {
36 frontiers,
37 inner,
38 logging,
39 }
40 }
41
42 /// Reveals the elements in the frontier of the indicated input.
43 pub fn frontier(&self, input: usize) -> AntichainRef<T> {
44 self.frontiers[input].frontier()
45 }
46
47 /// Requests a notification at the time associated with capability `cap`.
48 ///
49 /// In order to request a notification at future timestamp, obtain a capability for the new
50 /// timestamp first, as show in the example.
51 ///
52 /// # Examples
53 /// ```
54 /// use timely::dataflow::operators::ToStream;
55 /// use timely::dataflow::operators::generic::Operator;
56 /// use timely::dataflow::channels::pact::Pipeline;
57 ///
58 /// timely::example(|scope| {
59 /// (0..10).to_stream(scope)
60 /// .unary_notify(Pipeline, "example", Some(0), |input, output, notificator| {
61 /// input.for_each(|cap, data| {
62 /// output.session(&cap).give_vec(&mut data.replace(Vec::new()));
63 /// let time = cap.time().clone() + 1;
64 /// notificator.notify_at(cap.delayed(&time));
65 /// });
66 /// notificator.for_each(|cap, count, _| {
67 /// println!("done with time: {:?}, requested {} times", cap.time(), count);
68 /// assert!(*cap.time() == 0 && count == 2 || count == 1);
69 /// });
70 /// });
71 /// });
72 /// ```
73 #[inline]
74 pub fn notify_at(&mut self, cap: Capability<T>) {
75 self.inner.notify_at_frontiered(cap, self.frontiers);
76 }
77
78 /// Repeatedly calls `logic` until exhaustion of the available notifications.
79 ///
80 /// `logic` receives a capability for `t`, the timestamp being notified and a `count`
81 /// representing how many capabilities were requested for that specific timestamp.
82 #[inline]
83 pub fn for_each<F: FnMut(Capability<T>, u64, &mut Notificator<T>)>(&mut self, mut logic: F) {
84 while let Some((cap, count)) = self.next() {
85 self.logging.as_ref().map(|l| l.log(crate::logging::GuardedProgressEvent { is_start: true }));
86 logic(cap, count, self);
87 self.logging.as_ref().map(|l| l.log(crate::logging::GuardedProgressEvent { is_start: false }));
88 }
89 }
90}
91
92impl<'a, T: Timestamp> Iterator for Notificator<'a, T> {
93 type Item = (Capability<T>, u64);
94
95 /// Retrieve the next available notification.
96 ///
97 /// Returns `None` if no notification is available. Returns `Some(cap, count)` otherwise:
98 /// `cap` is a capability for `t`, the timestamp being notified and, `count` represents
99 /// how many notifications (out of those requested) are being delivered for that specific
100 /// timestamp.
101 #[inline]
102 fn next(&mut self) -> Option<(Capability<T>, u64)> {
103 self.inner.next_count(self.frontiers)
104 }
105}
106
// Checks that a `Notificator` session only delivers notifications that no frontier element is
// less-or-equal to, and that it delivers them in an order compatible with the partial order.
#[test]
fn notificator_delivers_notifications_in_topo_order() {
    use std::rc::Rc;
    use std::cell::RefCell;
    use crate::progress::ChangeBatch;
    use crate::progress::frontier::MutableAntichain;
    use crate::order::Product;
    use crate::dataflow::operators::capability::Capability;

    let mut frontier = MutableAntichain::new_bottom(Product::new(0, 0));

    let root_capability = Capability::new(Product::new(0,0), Rc::new(RefCell::new(ChangeBatch::new())));

    // run the sessions without a logger.
    let logging = None;

    // the times at which notifications are requested, deliberately unsorted and with duplicates.
    let times = vec![
        Product::new(3, 5),
        Product::new(5, 4),
        Product::new(1, 2),
        Product::new(1, 1),
        Product::new(1, 1),
        Product::new(5, 4),
        Product::new(6, 0),
        Product::new(6, 2),
        Product::new(5, 8),
    ];

    // create a raw notificator with pending notifications at the times above.
    let mut frontier_notificator = FrontierNotificator::from(times.iter().map(|t| root_capability.delayed(t)));

    // the frontier is initially (0,0), and so we should deliver no notifications.
    assert!(frontier_notificator.monotonic(&[&frontier], &logging).next().is_none());

    // advance the frontier to [(5,7), (6,1)], opening up some notifications.
    frontier.update_iter(vec![(Product::new(0,0),-1), (Product::new(5,7), 1), (Product::new(6,1), 1)]);

    {
        let frontiers = [&frontier];
        let mut notificator = frontier_notificator.monotonic(&frontiers, &logging);

        // we should deliver the following available notifications, in this order.
        // (5,8) and (6,2) stay pending: (5,7) <= (5,8) and (6,1) <= (6,2).
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(1,1));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(1,2));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(3,5));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5,4));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(6,0));
        assert_eq!(notificator.next(), None);
    }

    // advance the frontier to [(6,10)] opening up all remaining notifications.
    frontier.update_iter(vec![(Product::new(5,7), -1), (Product::new(6,1), -1), (Product::new(6,10), 1)]);

    {
        let frontiers = [&frontier];
        let mut notificator = frontier_notificator.monotonic(&frontiers, &logging);

        // the first available notification should be (5,8). Note: before (6,0) in the total order, but not
        // in the partial order. We don't make the promise that we respect the total order.
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5, 8));

        // add a new notification, mid notification session.
        notificator.notify_at(root_capability.delayed(&Product::new(5,9)));

        // we expect to see (5,9) before we see (6,2) before we see None.
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5,9));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(6,2));
        assert_eq!(notificator.next(), None);
    }
}
177
/// Tracks requests for notification and delivers available notifications.
///
/// `FrontierNotificator` is meant to manage the delivery of requested notifications in the
/// presence of inputs that may have outstanding messages to deliver.
/// The notificator inspects the frontiers, as presented from the outside, for each input.
/// Requested notifications can be served only once there are no frontier elements less-or-equal
/// to them, and there are no other pending notification requests less than them. Each will be
/// less-or-equal to itself, so we want to dodge that corner case.
///
/// # Examples
/// ```
/// use std::collections::HashMap;
/// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
/// use timely::dataflow::operators::generic::operator::Operator;
/// use timely::dataflow::channels::pact::Pipeline;
///
/// timely::execute(timely::Config::thread(), |worker| {
///     let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
///         let (in1_handle, in1) = scope.new_input();
///         let (in2_handle, in2) = scope.new_input();
///         in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
///             let mut notificator = FrontierNotificator::new();
///             let mut stash = HashMap::new();
///             let mut vector1 = Vec::new();
///             let mut vector2 = Vec::new();
///             move |input1, input2, output| {
///                 while let Some((time, data)) = input1.next() {
///                     data.swap(&mut vector1);
///                     stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector1.drain(..));
///                     notificator.notify_at(time.retain());
///                 }
///                 while let Some((time, data)) = input2.next() {
///                     data.swap(&mut vector2);
///                     stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector2.drain(..));
///                     notificator.notify_at(time.retain());
///                 }
///                 notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _| {
///                     if let Some(mut vec) = stash.remove(time.time()) {
///                         output.session(&time).give_iterator(vec.drain(..));
///                     }
///                 });
///             }
///         }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
///
///         (in1_handle, in2_handle)
///     });
///
///     for i in 1..10 {
///         in1.send(i - 1);
///         in1.advance_to(i);
///         in2.send(i - 1);
///         in2.advance_to(i);
///     }
///     in1.close();
///     in2.close();
/// }).unwrap();
/// ```
#[derive(Debug)]
pub struct FrontierNotificator<T: Timestamp> {
    // Requested notifications not yet moved to `available`, each paired with the number of
    // times it was requested. Re-examined by `make_available`.
    pending: Vec<(Capability<T>, u64)>,
    // Notifications ready for delivery. `BinaryHeap` is a max-heap and `OrderReversed`
    // reverses the comparison of times, so the entry with the least time is popped first.
    available: ::std::collections::BinaryHeap<OrderReversed<T>>,
}
240
241impl<T: Timestamp> FrontierNotificator<T> {
242 /// Allocates a new `FrontierNotificator`.
243 pub fn new() -> Self {
244 FrontierNotificator {
245 pending: Vec::new(),
246 available: ::std::collections::BinaryHeap::new(),
247 }
248 }
249
250 /// Allocates a new `FrontierNotificator` with initial capabilities.
251 pub fn from<I: IntoIterator<Item=Capability<T>>>(iter: I) -> Self {
252 FrontierNotificator {
253 pending: iter.into_iter().map(|x| (x,1)).collect(),
254 available: ::std::collections::BinaryHeap::new(),
255 }
256 }
257
258 /// Requests a notification at the time associated with capability `cap`. Takes ownership of
259 /// the capability.
260 ///
261 /// In order to request a notification at future timestamp, obtain a capability for the new
262 /// timestamp first, as shown in the example.
263 ///
264 /// # Examples
265 /// ```
266 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
267 /// use timely::dataflow::operators::generic::operator::Operator;
268 /// use timely::dataflow::channels::pact::Pipeline;
269 ///
270 /// timely::example(|scope| {
271 /// (0..10).to_stream(scope)
272 /// .unary_frontier(Pipeline, "example", |_, _| {
273 /// let mut notificator = FrontierNotificator::new();
274 /// move |input, output| {
275 /// input.for_each(|cap, data| {
276 /// output.session(&cap).give_vec(&mut data.replace(Vec::new()));
277 /// let time = cap.time().clone() + 1;
278 /// notificator.notify_at(cap.delayed(&time));
279 /// });
280 /// notificator.for_each(&[input.frontier()], |cap, _| {
281 /// println!("done with time: {:?}", cap.time());
282 /// });
283 /// }
284 /// });
285 /// });
286 /// ```
287 #[inline]
288 pub fn notify_at(&mut self, cap: Capability<T>) {
289 self.pending.push((cap,1));
290 }
291
292 /// Requests a notification at the time associated with capability `cap`.
293 ///
294 /// The method takes list of frontiers from which it determines if the capability is immediately available.
295 /// When used with the same frontier as `make_available`, this method can ensure that notifications are
296 /// non-decreasing. Simply using `notify_at` will only insert new notifications into the list of pending
297 /// notifications, which are only re-examine with calls to `make_available`.
298 #[inline]
299 pub fn notify_at_frontiered<'a>(&mut self, cap: Capability<T>, frontiers: &'a [&'a MutableAntichain<T>]) {
300 if frontiers.iter().all(|f| !f.less_equal(cap.time())) {
301 self.available.push(OrderReversed::new(cap, 1));
302 }
303 else {
304 self.pending.push((cap,1));
305 }
306 }
307
308 /// Enables pending notifications not in advance of any element of `frontiers`.
309 pub fn make_available<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) {
310
311 // By invariant, nothing in self.available is greater_equal anything in self.pending.
312 // It should be safe to append any ordered subset of self.pending to self.available,
313 // in that the sequence of capabilities in self.available will remain non-decreasing.
314
315 if !self.pending.is_empty() {
316
317 self.pending.sort_by(|x,y| x.0.time().cmp(y.0.time()));
318 for i in 0 .. self.pending.len() - 1 {
319 if self.pending[i].0.time() == self.pending[i+1].0.time() {
320 self.pending[i+1].1 += self.pending[i].1;
321 self.pending[i].1 = 0;
322 }
323 }
324 self.pending.retain(|x| x.1 > 0);
325
326 for i in 0 .. self.pending.len() {
327 if frontiers.iter().all(|f| !f.less_equal(&self.pending[i].0)) {
328 // TODO : This clones a capability, whereas we could move it instead.
329 self.available.push(OrderReversed::new(self.pending[i].0.clone(), self.pending[i].1));
330 self.pending[i].1 = 0;
331 }
332 }
333 self.pending.retain(|x| x.1 > 0);
334 }
335 }
336
337 /// Returns the next available capability with respect to the supplied frontiers, if one exists,
338 /// and the count of how many instances are found.
339 ///
340 /// In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
341 /// circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
342 /// use `for_each`, or (ii) call `make_available` first.
343 #[inline]
344 pub fn next_count<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<(Capability<T>, u64)> {
345 if self.available.is_empty() {
346 self.make_available(frontiers);
347 }
348 self.available.pop().map(|front| {
349 let mut count = front.value;
350 while self.available.peek() == Some(&front) {
351 count += self.available.pop().unwrap().value;
352 }
353 (front.element, count)
354 })
355 }
356
357 /// Returns the next available capability with respect to the supplied frontiers, if one exists.
358 ///
359 /// In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
360 /// circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
361 /// use `for_each`, or (ii) call `make_available` first.
362 #[inline]
363 pub fn next<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<Capability<T>> {
364 self.next_count(frontiers).map(|(cap, _)| cap)
365 }
366
367 /// Repeatedly calls `logic` till exhaustion of the notifications made available by inspecting
368 /// the frontiers.
369 ///
370 /// `logic` receives a capability for `t`, the timestamp being notified.
371 #[inline]
372 pub fn for_each<'a, F: FnMut(Capability<T>, &mut FrontierNotificator<T>)>(&mut self, frontiers: &'a [&'a MutableAntichain<T>], mut logic: F) {
373 self.make_available(frontiers);
374 while let Some(cap) = self.next(frontiers) {
375 logic(cap, self);
376 }
377 }
378
379 /// Creates a notificator session in which delivered notification will be non-decreasing.
380 ///
381 /// This implementation can be emulated with judicious use of `make_available` and `notify_at_frontiered`,
382 /// in the event that `Notificator` provides too restrictive an interface.
383 #[inline]
384 pub fn monotonic<'a>(&'a mut self, frontiers: &'a [&'a MutableAntichain<T>], logging: &'a Option<Logger>) -> Notificator<'a, T> {
385 Notificator::new(frontiers, self, logging)
386 }
387
388 /// Iterates over pending capabilities and their count. The count represents how often a
389 /// capability has been requested.
390 ///
391 /// To make sure all pending capabilities are above the frontier, use `for_each` or exhaust
392 /// `next` to consume all available capabilities.
393 ///
394 /// # Examples
395 /// ```
396 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
397 /// use timely::dataflow::operators::generic::operator::Operator;
398 /// use timely::dataflow::channels::pact::Pipeline;
399 ///
400 /// timely::example(|scope| {
401 /// (0..10).to_stream(scope)
402 /// .unary_frontier(Pipeline, "example", |_, _| {
403 /// let mut notificator = FrontierNotificator::new();
404 /// move |input, output| {
405 /// input.for_each(|cap, data| {
406 /// output.session(&cap).give_vec(&mut data.replace(Vec::new()));
407 /// let time = cap.time().clone() + 1;
408 /// notificator.notify_at(cap.delayed(&time));
409 /// assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1);
410 /// });
411 /// notificator.for_each(&[input.frontier()], |cap, _| {
412 /// println!("done with time: {:?}", cap.time());
413 /// });
414 /// }
415 /// });
416 /// });
417 /// ```
418 pub fn pending(&self) -> ::std::slice::Iter<'_, (Capability<T>, u64)> {
419 self.pending.iter()
420 }
421}
422
// Entry of the heap of available notifications: a capability together with the number of
// requests it stands for.
//
// The `PartialOrd`/`Ord` impls in this file compare the capabilities' times with the operands
// swapped, so a max-heap of these entries yields the least time first.
//
// NOTE(review): the derived `PartialEq`/`Eq` compare both `element` and `value`, whereas the
// ordering looks at the time only, so two entries can compare `Equal` without being `==`.
#[derive(Debug, PartialEq, Eq)]
struct OrderReversed<T: Timestamp> {
    // The capability to hand out when the notification is delivered.
    element: Capability<T>,
    // How many requests this entry represents.
    value: u64,
}
428
429impl<T: Timestamp> OrderReversed<T> {
430 fn new(element: Capability<T>, value: u64) -> Self { OrderReversed { element, value} }
431}
432
433impl<T: Timestamp> PartialOrd for OrderReversed<T> {
434 fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
435 other.element.time().partial_cmp(self.element.time())
436 }
437}
438impl<T: Timestamp> Ord for OrderReversed<T> {
439 fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
440 other.element.time().cmp(self.element.time())
441 }
442}