timely/dataflow/operators/core/probe.rs
1//! Monitor progress at a `Stream`.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::progress::Timestamp;
7use crate::progress::frontier::{AntichainRef, MutableAntichain};
8use crate::dataflow::channels::pushers::Counter as PushCounter;
9use crate::dataflow::channels::pact::Pipeline;
10use crate::dataflow::channels::pullers::Counter as PullCounter;
11use crate::dataflow::operators::generic::builder_raw::OperatorBuilder;
12
13
14use crate::dataflow::{Stream, Scope};
15use crate::Container;
16use crate::dataflow::channels::Message;
17
18/// Monitors progress at a `Stream`.
19pub trait Probe<G: Scope, C: Container> {
20 /// Constructs a progress probe which indicates which timestamps have elapsed at the operator.
21 ///
22 /// Returns a tuple of a probe handle and the input stream.
23 ///
24 /// # Examples
25 /// ```
26 /// use timely::*;
27 /// use timely::dataflow::Scope;
28 /// use timely::dataflow::operators::{Input, Probe, Inspect};
29 ///
30 /// // construct and execute a timely dataflow
31 /// timely::execute(Config::thread(), |worker| {
32 ///
33 /// // add an input and base computation off of it
34 /// let (mut input, probe) = worker.dataflow(|scope| {
35 /// let (input, stream) = scope.new_input::<Vec<_>>();
36 /// let (probe, _) = stream.inspect(|x| println!("hello {:?}", x))
37 /// .probe();
38 /// (input, probe)
39 /// });
40 ///
41 /// // introduce input, advance computation
42 /// for round in 0..10 {
43 /// input.send(round);
44 /// input.advance_to(round + 1);
45 /// worker.step_while(|| probe.less_than(input.time()));
46 /// }
47 /// }).unwrap();
48 /// ```
49 fn probe(self) -> (Handle<G::Timestamp>, Self);
50
51 /// Inserts a progress probe in a stream.
52 ///
53 /// # Examples
54 /// ```
55 /// use timely::*;
56 /// use timely::dataflow::Scope;
57 /// use timely::dataflow::operators::{Input, Probe, Inspect};
58 /// use timely::dataflow::operators::probe::Handle;
59 ///
60 /// // construct and execute a timely dataflow
61 /// timely::execute(Config::thread(), |worker| {
62 ///
63 /// // add an input and base computation off of it
64 /// let mut probe = Handle::new();
65 /// let mut input = worker.dataflow(|scope| {
66 /// let (input, stream) = scope.new_input::<Vec<_>>();
67 /// stream.probe_with(&mut probe)
68 /// .inspect(|x| println!("hello {:?}", x));
69 ///
70 /// input
71 /// });
72 ///
73 /// // introduce input, advance computation
74 /// for round in 0..10 {
75 /// input.send(round);
76 /// input.advance_to(round + 1);
77 /// worker.step_while(|| probe.less_than(input.time()));
78 /// }
79 /// }).unwrap();
80 /// ```
81 fn probe_with(self, handle: &Handle<G::Timestamp>) -> Stream<G, C>;
82}
83
84impl<G: Scope, C: Container> Probe<G, C> for Stream<G, C> {
85 fn probe(self) -> (Handle<G::Timestamp>, Self) {
86
87 // the frontier is shared state; scope updates, handle reads.
88 let handle = Handle::<G::Timestamp>::new();
89 let stream = self.probe_with(&handle);
90 (handle, stream)
91 }
92 fn probe_with(self, handle: &Handle<G::Timestamp>) -> Stream<G, C> {
93
94 let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope());
95 let mut input = PullCounter::new(builder.new_input(self, Pipeline));
96 let (tee, stream) = builder.new_output();
97 let mut output = PushCounter::new(tee);
98
99 let shared_frontier = Rc::downgrade(&handle.frontier);
100 let mut started = false;
101
102 builder.build(
103 move |progress| {
104
105 // surface all frontier changes to the shared frontier.
106 if let Some(shared_frontier) = shared_frontier.upgrade() {
107 let mut borrow = shared_frontier.borrow_mut();
108 borrow.update_iter(progress.frontiers[0].drain());
109 }
110
111 if !started {
112 // discard initial capability.
113 progress.internals[0].update(G::Timestamp::minimum(), -1);
114 started = true;
115 }
116
117 while let Some(message) = input.next() {
118 Message::push_at(&mut message.data, message.time.clone(), &mut output);
119 }
120 use timely_communication::Push;
121 output.done();
122
123 // extract what we know about progress from the input and output adapters.
124 input.consumed().borrow_mut().drain_into(&mut progress.consumeds[0]);
125 output.produced().borrow_mut().drain_into(&mut progress.produceds[0]);
126
127 false
128 },
129 );
130
131 stream
132 }
133}
134
135/// Reports information about progress at the probe.
136#[derive(Debug)]
137pub struct Handle<T:Timestamp> {
138 frontier: Rc<RefCell<MutableAntichain<T>>>
139}
140
141impl<T: Timestamp> Handle<T> {
142 /// Returns `true` iff the frontier is strictly less than `time`.
143 #[inline] pub fn less_than(&self, time: &T) -> bool { self.frontier.borrow().less_than(time) }
144 /// Returns `true` iff the frontier is less than or equal to `time`.
145 #[inline] pub fn less_equal(&self, time: &T) -> bool { self.frontier.borrow().less_equal(time) }
146 /// Returns `true` iff the frontier is empty.
147 #[inline] pub fn done(&self) -> bool { self.frontier.borrow().is_empty() }
148 /// Allocates a new handle.
149 #[inline] pub fn new() -> Self { Handle { frontier: Rc::new(RefCell::new(MutableAntichain::new())) } }
150
151 /// Invokes a method on the frontier, returning its result.
152 ///
153 /// This method allows inspection of the frontier, which cannot be returned by reference as
154 /// it is on the other side of a `RefCell`.
155 ///
156 /// # Examples
157 ///
158 /// ```
159 /// use timely::dataflow::operators::probe::Handle;
160 ///
161 /// let handle = Handle::<usize>::new();
162 /// let frontier = handle.with_frontier(|frontier| frontier.to_vec());
163 /// ```
164 #[inline]
165 pub fn with_frontier<R, F: FnMut(AntichainRef<T>)->R>(&self, mut function: F) -> R {
166 function(self.frontier.borrow().frontier())
167 }
168}
169
170impl<T: Timestamp> Clone for Handle<T> {
171 fn clone(&self) -> Self {
172 Handle {
173 frontier: Rc::clone(&self.frontier)
174 }
175 }
176}
177
178impl<T> Default for Handle<T>
179where
180 T: Timestamp,
181{
182 fn default() -> Self {
183 Self::new()
184 }
185}
186
#[cfg(test)]
mod tests {

    use crate::Config;
    use crate::dataflow::operators::{Input, Probe};

    /// Drives an input through ten rounds and checks the probe's view of the
    /// frontier before each advance, then checks it reports completion after close.
    #[test]
    fn probe() {

        // set up and run a single-threaded timely computation
        crate::execute(Config::thread(), |worker| {

            // build a dataflow with one input, keeping only the probe handle
            let (mut input, probe) = worker.dataflow(move |scope| {
                let (input, stream) = scope.new_input::<Vec<String>>();
                let (handle, _) = stream.probe();
                (input, handle)
            });

            // before each advance the frontier should sit exactly at `round`
            for round in 0..10 {
                assert!(!probe.done());
                assert!(probe.less_equal(&round));
                assert!(probe.less_than(&(round + 1)));
                input.advance_to(round + 1);
                worker.step();
            }

            // seal the input
            input.close();

            // give the worker a few steps to drain remaining progress updates
            for _ in 0..4 {
                worker.step();
            }
            assert!(probe.done());
        }).unwrap();
    }

}
226}