timely/dataflow/operators/generic/
notificator.rs

1use crate::progress::frontier::{AntichainRef, MutableAntichain};
2use crate::progress::Timestamp;
3use crate::dataflow::operators::Capability;
4use crate::logging::TimelyLogger as Logger;
5
6/// Tracks requests for notification and delivers available notifications.
7///
8/// A `Notificator` represents a dynamic set of notifications and a fixed notification frontier.
9/// One can interact with one by requesting notification with `notify_at`, and retrieving notifications
10/// with `for_each` and `next`. The next notification to be delivered will be the available notification
11/// with the least timestamp, with the implication that the notifications will be non-decreasing as long
12/// as you do not request notifications at times prior to those that have already been delivered.
13///
14/// Notification requests persist across uses of `Notificator`, and it may help to think of `Notificator`
15/// as a notification *session*. However, idiomatically it seems you mostly want to restrict your usage
16/// to such sessions, which is why this is the main notificator type.
17#[derive(Debug)]
18pub struct Notificator<'a, T: Timestamp> {
19    frontiers: &'a [&'a MutableAntichain<T>],
20    inner: &'a mut FrontierNotificator<T>,
21    logging: &'a Option<Logger>,
22}
23
24impl<'a, T: Timestamp> Notificator<'a, T> {
25    /// Allocates a new `Notificator`.
26    ///
27    /// This is more commonly accomplished using `input.monotonic(frontiers)`.
28    pub fn new(
29        frontiers: &'a [&'a MutableAntichain<T>],
30        inner: &'a mut FrontierNotificator<T>,
31        logging: &'a Option<Logger>) -> Self {
32
33        inner.make_available(frontiers);
34
35        Notificator {
36            frontiers,
37            inner,
38            logging,
39        }
40    }
41
42    /// Reveals the elements in the frontier of the indicated input.
43    pub fn frontier(&self, input: usize) -> AntichainRef<'_, T> {
44        self.frontiers[input].frontier()
45    }
46
47    /// Requests a notification at the time associated with capability `cap`.
48    ///
49    /// In order to request a notification at future timestamp, obtain a capability for the new
50    /// timestamp first, as show in the example.
51    ///
52    /// # Examples
53    /// ```
54    /// use timely::dataflow::operators::ToStream;
55    /// use timely::dataflow::operators::generic::Operator;
56    /// use timely::dataflow::channels::pact::Pipeline;
57    ///
58    /// timely::example(|scope| {
59    ///     (0..10).to_stream(scope)
60    ///            .unary_notify(Pipeline, "example", Some(0), |input, output, notificator| {
61    ///                input.for_each(|cap, data| {
62    ///                    output.session(&cap).give_container(data);
63    ///                    let time = cap.time().clone() + 1;
64    ///                    notificator.notify_at(cap.delayed(&time));
65    ///                });
66    ///                notificator.for_each(|cap, count, _| {
67    ///                    println!("done with time: {:?}, requested {} times", cap.time(), count);
68    ///                    assert!(*cap.time() == 0 && count == 2 || count == 1);
69    ///                });
70    ///            });
71    /// });
72    /// ```
73    #[inline]
74    pub fn notify_at(&mut self, cap: Capability<T>) {
75        self.inner.notify_at_frontiered(cap, self.frontiers);
76    }
77
78    /// Repeatedly calls `logic` until exhaustion of the available notifications.
79    ///
80    /// `logic` receives a capability for `t`, the timestamp being notified and a `count`
81    /// representing how many capabilities were requested for that specific timestamp.
82    #[inline]
83    pub fn for_each<F: FnMut(Capability<T>, u64, &mut Notificator<T>)>(&mut self, mut logic: F) {
84        while let Some((cap, count)) = self.next() {
85            self.logging.as_ref().map(|l| l.log(crate::logging::GuardedProgressEvent { is_start: true }));
86            logic(cap, count, self);
87            self.logging.as_ref().map(|l| l.log(crate::logging::GuardedProgressEvent { is_start: false }));
88        }
89    }
90}
91
92impl<T: Timestamp> Iterator for Notificator<'_, T> {
93    type Item = (Capability<T>, u64);
94
95    /// Retrieve the next available notification.
96    ///
97    /// Returns `None` if no notification is available. Returns `Some(cap, count)` otherwise:
98    /// `cap` is a capability for `t`, the timestamp being notified and, `count` represents
99    /// how many notifications (out of those requested) are being delivered for that specific
100    /// timestamp.
101    #[inline]
102    fn next(&mut self) -> Option<(Capability<T>, u64)> {
103        self.inner.next_count(self.frontiers)
104    }
105}
106
#[test]
fn notificator_delivers_notifications_in_topo_order() {
    use std::rc::Rc;
    use std::cell::RefCell;
    use crate::progress::ChangeBatch;
    use crate::progress::frontier::MutableAntichain;
    use crate::order::Product;
    use crate::dataflow::operators::capability::Capability;

    let mut frontier = MutableAntichain::new_bottom(Product::new(0, 0));

    let root_capability = Capability::new(Product::new(0, 0), Rc::new(RefCell::new(ChangeBatch::new())));

    // No logger is attached to the sessions below.
    let logging = None;

    let times = [
        Product::new(3, 5),
        Product::new(5, 4),
        Product::new(1, 2),
        Product::new(1, 1),
        Product::new(1, 1),
        Product::new(5, 4),
        Product::new(6, 0),
        Product::new(6, 2),
        Product::new(5, 8),
    ];

    // create a raw notificator with pending notifications at the times above.
    let mut frontier_notificator = FrontierNotificator::from(times.iter().map(|t| root_capability.delayed(t)));

    // the frontier is initially (0,0), and so we should deliver no notifications.
    assert!(frontier_notificator.monotonic(&[&frontier], &logging).next().is_none());

    // advance the frontier to [(5,7), (6,1)], opening up some notifications.
    frontier.update_iter(vec![(Product::new(0, 0), -1), (Product::new(5, 7), 1), (Product::new(6, 1), 1)]);

    {
        let frontiers = [&frontier];
        let mut notificator = frontier_notificator.monotonic(&frontiers, &logging);

        // we should deliver the following available notifications, in this order.
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(1, 1));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(1, 2));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(3, 5));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5, 4));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(6, 0));
        assert_eq!(notificator.next(), None);
    }

    // advance the frontier to [(6,10)] opening up all remaining notifications.
    frontier.update_iter(vec![(Product::new(5, 7), -1), (Product::new(6, 1), -1), (Product::new(6, 10), 1)]);

    {
        let frontiers = [&frontier];
        let mut notificator = frontier_notificator.monotonic(&frontiers, &logging);

        // the first available notification should be (5,8). It is delivered after (6,0) even
        // though it precedes (6,0) in the total order; the two are incomparable in the partial
        // order, and we only promise to respect the partial order.
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5, 8));

        // add a new notification, mid notification session.
        notificator.notify_at(root_capability.delayed(&Product::new(5, 9)));

        // we expect to see (5,9) before we see (6,2) before we see None.
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(5, 9));
        assert_eq!(notificator.next().unwrap().0.time(), &Product::new(6, 2));
        assert_eq!(notificator.next(), None);
    }
}
177
178/// Tracks requests for notification and delivers available notifications.
179///
180/// `FrontierNotificator` is meant to manage the delivery of requested notifications in the
181/// presence of inputs that may have outstanding messages to deliver.
182/// The notificator inspects the frontiers, as presented from the outside, for each input.
183/// Requested notifications can be served only once there are no frontier elements less-or-equal
184/// to them, and there are no other pending notification requests less than them. Each will be
185/// less-or-equal to itself, so we want to dodge that corner case.
186///
187/// # Examples
188/// ```
189/// use std::collections::HashMap;
190/// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
191/// use timely::dataflow::operators::generic::operator::Operator;
192/// use timely::dataflow::channels::pact::Pipeline;
193///
194/// timely::execute(timely::Config::thread(), |worker| {
195///     let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
196///         let (in1_handle, in1) = scope.new_input();
197///         let (in2_handle, in2) = scope.new_input();
198///         in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
199///             let mut notificator = FrontierNotificator::default();
200///             let mut stash = HashMap::new();
201///             move |input1, input2, output| {
202///                 while let Some((time, data)) = input1.next() {
203///                     stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
204///                     notificator.notify_at(time.retain());
205///                 }
206///                 while let Some((time, data)) = input2.next() {
207///                     stash.entry(time.time().clone()).or_insert(Vec::new()).extend(data.drain(..));
208///                     notificator.notify_at(time.retain());
209///                 }
210///                 notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _| {
211///                     if let Some(mut vec) = stash.remove(time.time()) {
212///                         output.session(&time).give_iterator(vec.drain(..));
213///                     }
214///                 });
215///             }
216///         })
217///         .container::<Vec<_>>()
218///         .inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
219///
220///         (in1_handle, in2_handle)
221///     });
222///
223///     for i in 1..10 {
224///         in1.send(i - 1);
225///         in1.advance_to(i);
226///         in2.send(i - 1);
227///         in2.advance_to(i);
228///     }
229///     in1.close();
230///     in2.close();
231/// }).unwrap();
232/// ```
233#[derive(Debug)]
234pub struct FrontierNotificator<T: Timestamp> {
235    pending: Vec<(Capability<T>, u64)>,
236    available: ::std::collections::BinaryHeap<OrderReversed<T>>,
237}
238
239impl<T: Timestamp> Default for FrontierNotificator<T> {
240    fn default() -> Self {
241        FrontierNotificator {
242            pending: Vec::new(),
243            available: ::std::collections::BinaryHeap::new(),
244        }
245    }
246}
247
248impl<T: Timestamp> FrontierNotificator<T> {
249    /// Allocates a new `FrontierNotificator` with initial capabilities.
250    pub fn from<I: IntoIterator<Item=Capability<T>>>(iter: I) -> Self {
251        FrontierNotificator {
252            pending: iter.into_iter().map(|x| (x,1)).collect(),
253            available: ::std::collections::BinaryHeap::new(),
254        }
255    }
256
257    /// Requests a notification at the time associated with capability `cap`. Takes ownership of
258    /// the capability.
259    ///
260    /// In order to request a notification at future timestamp, obtain a capability for the new
261    /// timestamp first, as shown in the example.
262    ///
263    /// # Examples
264    /// ```
265    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
266    /// use timely::dataflow::operators::generic::operator::Operator;
267    /// use timely::dataflow::channels::pact::Pipeline;
268    ///
269    /// timely::example(|scope| {
270    ///     (0..10).to_stream(scope)
271    ///            .unary_frontier(Pipeline, "example", |_, _| {
272    ///                let mut notificator = FrontierNotificator::default();
273    ///                move |input, output| {
274    ///                    input.for_each(|cap, data| {
275    ///                        output.session(&cap).give_container(data);
276    ///                        let time = cap.time().clone() + 1;
277    ///                        notificator.notify_at(cap.delayed(&time));
278    ///                    });
279    ///                    notificator.for_each(&[input.frontier()], |cap, _| {
280    ///                        println!("done with time: {:?}", cap.time());
281    ///                    });
282    ///                }
283    ///            });
284    /// });
285    /// ```
286    #[inline]
287    pub fn notify_at(&mut self, cap: Capability<T>) {
288        self.pending.push((cap,1));
289    }
290
291    /// Requests a notification at the time associated with capability `cap`.
292    ///
293    /// The method takes list of frontiers from which it determines if the capability is immediately available.
294    /// When used with the same frontier as `make_available`, this method can ensure that notifications are
295    /// non-decreasing. Simply using `notify_at` will only insert new notifications into the list of pending
296    /// notifications, which are only re-examine with calls to `make_available`.
297    #[inline]
298    pub fn notify_at_frontiered<'a>(&mut self, cap: Capability<T>, frontiers: &'a [&'a MutableAntichain<T>]) {
299        if frontiers.iter().all(|f| !f.less_equal(cap.time())) {
300            self.available.push(OrderReversed::new(cap, 1));
301        }
302        else {
303            self.pending.push((cap,1));
304        }
305    }
306
307    /// Enables pending notifications not in advance of any element of `frontiers`.
308    pub fn make_available<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) {
309
310        // By invariant, nothing in self.available is greater_equal anything in self.pending.
311        // It should be safe to append any ordered subset of self.pending to self.available,
312        // in that the sequence of capabilities in self.available will remain non-decreasing.
313
314        if !self.pending.is_empty() {
315
316            self.pending.sort_by(|x,y| x.0.time().cmp(y.0.time()));
317            for i in 0 .. self.pending.len() - 1 {
318                if self.pending[i].0.time() == self.pending[i+1].0.time() {
319                    self.pending[i+1].1 += self.pending[i].1;
320                    self.pending[i].1 = 0;
321                }
322            }
323            self.pending.retain(|x| x.1 > 0);
324
325            for i in 0 .. self.pending.len() {
326                if frontiers.iter().all(|f| !f.less_equal(&self.pending[i].0)) {
327                    // TODO : This clones a capability, whereas we could move it instead.
328                    self.available.push(OrderReversed::new(self.pending[i].0.clone(), self.pending[i].1));
329                    self.pending[i].1 = 0;
330                }
331            }
332            self.pending.retain(|x| x.1 > 0);
333        }
334    }
335
336    /// Returns the next available capability with respect to the supplied frontiers, if one exists,
337    /// and the count of how many instances are found.
338    ///
339    /// In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
340    /// circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
341    /// use `for_each`, or (ii) call `make_available` first.
342    #[inline]
343    pub fn next_count<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<(Capability<T>, u64)> {
344        if self.available.is_empty() {
345            self.make_available(frontiers);
346        }
347        self.available.pop().map(|front| {
348            let mut count = front.value;
349            while self.available.peek() == Some(&front) {
350                count += self.available.pop().unwrap().value;
351            }
352            (front.element, count)
353        })
354    }
355
356    /// Returns the next available capability with respect to the supplied frontiers, if one exists.
357    ///
358    /// In the interest of efficiency, this method may yield capabilities in decreasing order, in certain
359    /// circumstances. If you want to iterate through capabilities with an in-order guarantee, either (i)
360    /// use `for_each`, or (ii) call `make_available` first.
361    #[inline]
362    pub fn next<'a>(&mut self, frontiers: &'a [&'a MutableAntichain<T>]) -> Option<Capability<T>> {
363        self.next_count(frontiers).map(|(cap, _)| cap)
364    }
365
366    /// Repeatedly calls `logic` till exhaustion of the notifications made available by inspecting
367    /// the frontiers.
368    ///
369    /// `logic` receives a capability for `t`, the timestamp being notified.
370    #[inline]
371    pub fn for_each<'a, F: FnMut(Capability<T>, &mut FrontierNotificator<T>)>(&mut self, frontiers: &'a [&'a MutableAntichain<T>], mut logic: F) {
372        self.make_available(frontiers);
373        while let Some(cap) = self.next(frontiers) {
374            logic(cap, self);
375        }
376    }
377
378    /// Creates a notificator session in which delivered notification will be non-decreasing.
379    ///
380    /// This implementation can be emulated with judicious use of `make_available` and `notify_at_frontiered`,
381    /// in the event that `Notificator` provides too restrictive an interface.
382    #[inline]
383    pub fn monotonic<'a>(&'a mut self, frontiers: &'a [&'a MutableAntichain<T>], logging: &'a Option<Logger>) -> Notificator<'a, T> {
384        Notificator::new(frontiers, self, logging)
385    }
386
387    /// Iterates over pending capabilities and their count. The count represents how often a
388    /// capability has been requested.
389    ///
390    /// To make sure all pending capabilities are above the frontier, use `for_each` or exhaust
391    /// `next` to consume all available capabilities.
392    ///
393    /// # Examples
394    /// ```
395    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
396    /// use timely::dataflow::operators::generic::operator::Operator;
397    /// use timely::dataflow::channels::pact::Pipeline;
398    ///
399    /// timely::example(|scope| {
400    ///     (0..10).to_stream(scope)
401    ///            .unary_frontier(Pipeline, "example", |_, _| {
402    ///                let mut notificator = FrontierNotificator::default();
403    ///                move |input, output| {
404    ///                    input.for_each(|cap, data| {
405    ///                        output.session(&cap).give_container(data);
406    ///                        let time = cap.time().clone() + 1;
407    ///                        notificator.notify_at(cap.delayed(&time));
408    ///                        assert_eq!(notificator.pending().filter(|t| t.0.time() == &time).count(), 1);
409    ///                    });
410    ///                    notificator.for_each(&[input.frontier()], |cap, _| {
411    ///                        println!("done with time: {:?}", cap.time());
412    ///                    });
413    ///                }
414    ///            });
415    /// });
416    /// ```
417    pub fn pending(&self) -> ::std::slice::Iter<'_, (Capability<T>, u64)> {
418        self.pending.iter()
419    }
420}
421
422#[derive(Debug, PartialEq, Eq)]
423struct OrderReversed<T: Timestamp> {
424    element: Capability<T>,
425    value: u64,
426}
427
428impl<T: Timestamp> OrderReversed<T> {
429    fn new(element: Capability<T>, value: u64) -> Self { OrderReversed { element, value} }
430}
431
432impl<T: Timestamp> PartialOrd for OrderReversed<T> {
433    fn partial_cmp(&self, other: &Self) -> Option<::std::cmp::Ordering> {
434        Some(self.cmp(other))
435    }
436}
437impl<T: Timestamp> Ord for OrderReversed<T> {
438    fn cmp(&self, other: &Self) -> ::std::cmp::Ordering {
439        other.element.time().cmp(self.element.time())
440    }
441}