timely/dataflow/operators/
probe.rs

1//! Monitor progress at a `Stream`.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::progress::Timestamp;
7use crate::progress::frontier::{AntichainRef, MutableAntichain};
8use crate::dataflow::channels::pushers::CounterCore as PushCounter;
9use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer;
10use crate::dataflow::channels::pact::Pipeline;
11use crate::dataflow::channels::pullers::Counter as PullCounter;
12use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
13
14
15use crate::dataflow::{StreamCore, Scope};
16use crate::Container;
17
18/// Monitors progress at a `Stream`.
19pub trait Probe<G: Scope, D: Container> {
20    /// Constructs a progress probe which indicates which timestamps have elapsed at the operator.
21    ///
22    /// # Examples
23    /// ```
24    /// use timely::*;
25    /// use timely::dataflow::Scope;
26    /// use timely::dataflow::operators::{Input, Probe, Inspect};
27    ///
28    /// // construct and execute a timely dataflow
29    /// timely::execute(Config::thread(), |worker| {
30    ///
31    ///     // add an input and base computation off of it
32    ///     let (mut input, probe) = worker.dataflow(|scope| {
33    ///         let (input, stream) = scope.new_input();
34    ///         let probe = stream.inspect(|x| println!("hello {:?}", x))
35    ///                           .probe();
36    ///         (input, probe)
37    ///     });
38    ///
39    ///     // introduce input, advance computation
40    ///     for round in 0..10 {
41    ///         input.send(round);
42    ///         input.advance_to(round + 1);
43    ///         worker.step_while(|| probe.less_than(input.time()));
44    ///     }
45    /// }).unwrap();
46    /// ```
47    fn probe(&self) -> Handle<G::Timestamp>;
48
49    /// Inserts a progress probe in a stream.
50    ///
51    /// # Examples
52    /// ```
53    /// use timely::*;
54    /// use timely::dataflow::Scope;
55    /// use timely::dataflow::operators::{Input, Probe, Inspect};
56    /// use timely::dataflow::operators::probe::Handle;
57    ///
58    /// // construct and execute a timely dataflow
59    /// timely::execute(Config::thread(), |worker| {
60    ///
61    ///     // add an input and base computation off of it
62    ///     let mut probe = Handle::new();
63    ///     let mut input = worker.dataflow(|scope| {
64    ///         let (input, stream) = scope.new_input();
65    ///         stream.probe_with(&mut probe)
66    ///               .inspect(|x| println!("hello {:?}", x));
67    ///
68    ///         input
69    ///     });
70    ///
71    ///     // introduce input, advance computation
72    ///     for round in 0..10 {
73    ///         input.send(round);
74    ///         input.advance_to(round + 1);
75    ///         worker.step_while(|| probe.less_than(input.time()));
76    ///     }
77    /// }).unwrap();
78    /// ```
79    fn probe_with(&self, handle: &mut Handle<G::Timestamp>) -> StreamCore<G, D>;
80}
81
82impl<G: Scope, D: Container> Probe<G, D> for StreamCore<G, D> {
83    fn probe(&self) -> Handle<G::Timestamp> {
84
85        // the frontier is shared state; scope updates, handle reads.
86        let mut handle = Handle::<G::Timestamp>::new();
87        self.probe_with(&mut handle);
88        handle
89    }
90    fn probe_with(&self, handle: &mut Handle<G::Timestamp>) -> StreamCore<G, D> {
91
92        let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope());
93        let mut input = PullCounter::new(builder.new_input(self, Pipeline));
94        let (tee, stream) = builder.new_output();
95        let mut output = PushBuffer::new(PushCounter::new(tee));
96
97        let shared_frontier = handle.frontier.clone();
98        let mut started = false;
99
100        let mut vector = Default::default();
101
102        builder.build(
103            move |progress| {
104
105                // surface all frontier changes to the shared frontier.
106                let mut borrow = shared_frontier.borrow_mut();
107                borrow.update_iter(progress.frontiers[0].drain());
108
109                if !started {
110                    // discard initial capability.
111                    progress.internals[0].update(G::Timestamp::minimum(), -1);
112                    started = true;
113                }
114
115                use crate::communication::message::RefOrMut;
116
117                while let Some(message) = input.next() {
118                    let (time, data) = match message.as_ref_or_mut() {
119                        RefOrMut::Ref(reference) => (&reference.time, RefOrMut::Ref(&reference.data)),
120                        RefOrMut::Mut(reference) => (&reference.time, RefOrMut::Mut(&mut reference.data)),
121                    };
122                    data.swap(&mut vector);
123                    output.session(time).give_container(&mut vector);
124                }
125                output.cease();
126
127                // extract what we know about progress from the input and output adapters.
128                input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]);
129                output.inner().produced().borrow_mut().drain_into(&mut progress.produceds[0]);
130
131                false
132            },
133        );
134
135        stream
136    }
137}
138
139/// Reports information about progress at the probe.
140#[derive(Debug)]
141pub struct Handle<T:Timestamp> {
142    frontier: Rc<RefCell<MutableAntichain<T>>>
143}
144
145impl<T: Timestamp> Handle<T> {
146    /// returns true iff the frontier is strictly less than `time`.
147    #[inline] pub fn less_than(&self, time: &T) -> bool { self.frontier.borrow().less_than(time) }
148    /// returns true iff the frontier is less than or equal to `time`.
149    #[inline] pub fn less_equal(&self, time: &T) -> bool { self.frontier.borrow().less_equal(time) }
150    /// returns true iff the frontier is empty.
151    #[inline] pub fn done(&self) -> bool { self.frontier.borrow().is_empty() }
152    /// Allocates a new handle.
153    #[inline] pub fn new() -> Self { Handle { frontier: Rc::new(RefCell::new(MutableAntichain::new())) } }
154
155    /// Invokes a method on the frontier, returning its result.
156    ///
157    /// This method allows inspection of the frontier, which cannot be returned by reference as
158    /// it is on the other side of a `RefCell`.
159    ///
160    /// # Examples
161    ///
162    /// ```
163    /// use timely::dataflow::operators::probe::Handle;
164    ///
165    /// let handle = Handle::<usize>::new();
166    /// let frontier = handle.with_frontier(|frontier| frontier.to_vec());
167    /// ```
168    #[inline]
169    pub fn with_frontier<R, F: FnMut(AntichainRef<T>)->R>(&self, mut function: F) -> R {
170        function(self.frontier.borrow().frontier())
171    }
172}
173
174impl<T: Timestamp> Clone for Handle<T> {
175    fn clone(&self) -> Self {
176        Handle {
177            frontier: self.frontier.clone()
178        }
179    }
180}
181
182impl<T> Default for Handle<T>
183where
184    T: Timestamp,
185{
186    fn default() -> Self {
187        Self::new()
188    }
189}
190
191#[cfg(test)]
192mod tests {
193
194    use crate::Config;
195    use crate::dataflow::operators::{Input, Probe};
196
197    #[test]
198    fn probe() {
199
200        // initializes and runs a timely dataflow computation
201        crate::execute(Config::thread(), |worker| {
202
203            // create a new input, and inspect its output
204            let (mut input, probe) = worker.dataflow(move |scope| {
205                let (input, stream) = scope.new_input::<String>();
206                (input, stream.probe())
207            });
208
209            // introduce data and watch!
210            for round in 0..10 {
211                assert!(!probe.done());
212                assert!(probe.less_equal(&round));
213                assert!(probe.less_than(&(round + 1)));
214                input.advance_to(round + 1);
215                worker.step();
216            }
217
218            // seal the input
219            input.close();
220
221            // finish off any remaining work
222            worker.step();
223            worker.step();
224            worker.step();
225            worker.step();
226            assert!(probe.done());
227        }).unwrap();
228    }
229
230}