timely/dataflow/operators/
input.rs

1//! Create new `Streams` connected to external inputs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::scheduling::{Schedule, Activator};
7
8use crate::progress::frontier::Antichain;
9use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
10use crate::progress::Source;
11
12use crate::{Container, Data};
13use crate::communication::Push;
14use crate::dataflow::{Stream, ScopeParent, Scope, StreamCore};
15use crate::dataflow::channels::pushers::{TeeCore, CounterCore};
16use crate::dataflow::channels::Message;
17
18
19// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
20// TODO : more like a harness, with direct access to its inputs.
21
22// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
23// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
24// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
25
/// Create a new `Stream` and `Handle` through which to supply input.
///
/// Each method comes in two flavors: a vector-based one (`new_input`, `input_from`) working with
/// `Handle` and `Stream`, and a container-generic one (`new_input_core`, `input_from_core`)
/// working with `HandleCore` and `StreamCore`.
pub trait Input : Scope {
    /// Create a new `Stream` and `Handle` through which to supply input.
    ///
    /// The `new_input` method returns a pair `(Handle, Stream)` where the `Stream` can be used
    /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
    /// data into the timely dataflow computation.
    ///
    /// The `Handle` also provides a means to indicate
    /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
    /// to issue progress notifications.
    ///
    /// # Examples
    /// ```
    /// use timely::*;
    /// use timely::dataflow::operators::{Input, Inspect};
    ///
    /// // construct and execute a timely dataflow
    /// timely::execute(Config::thread(), |worker| {
    ///
    ///     // add an input and base computation off of it
    ///     let mut input = worker.dataflow(|scope| {
    ///         let (input, stream) = scope.new_input();
    ///         stream.inspect(|x| println!("hello {:?}", x));
    ///         input
    ///     });
    ///
    ///     // introduce input, advance computation
    ///     for round in 0..10 {
    ///         input.send(round);
    ///         input.advance_to(round + 1);
    ///         worker.step();
    ///     }
    /// });
    /// ```
    fn new_input<D: Data>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, D>, Stream<Self, D>);

    /// Create a new [StreamCore] and [HandleCore] through which to supply input.
    ///
    /// The `new_input_core` method returns a pair `(HandleCore, StreamCore)` where the [StreamCore] can be used
    /// immediately for timely dataflow construction, and the `HandleCore` is later used to introduce
    /// data into the timely dataflow computation.
    ///
    /// The `HandleCore` also provides a means to indicate
    /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
    /// to issue progress notifications.
    ///
    /// # Examples
    /// ```
    /// use timely::*;
    /// use timely::dataflow::operators::{Input, Inspect};
    ///
    /// // construct and execute a timely dataflow
    /// timely::execute(Config::thread(), |worker| {
    ///
    ///     // add an input and base computation off of it
    ///     let mut input = worker.dataflow(|scope| {
    ///         let (input, stream) = scope.new_input_core::<Vec<_>>();
    ///         stream.inspect(|x| println!("hello {:?}", x));
    ///         input
    ///     });
    ///
    ///     // introduce input, advance computation
    ///     for round in 0..10 {
    ///         input.send(round);
    ///         input.advance_to(round + 1);
    ///         worker.step();
    ///     }
    /// });
    /// ```
    fn new_input_core<D: Container>(&mut self) -> (HandleCore<<Self as ScopeParent>::Timestamp, D>, StreamCore<Self, D>);

    /// Create a new stream from a supplied interactive handle.
    ///
    /// This method creates a new timely stream whose data are supplied interactively through the `handle`
    /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
    /// if it is attached to more than one stream.
    ///
    /// # Examples
    /// ```
    /// use timely::*;
    /// use timely::dataflow::operators::{Input, Inspect};
    /// use timely::dataflow::operators::input::Handle;
    ///
    /// // construct and execute a timely dataflow
    /// timely::execute(Config::thread(), |worker| {
    ///
    ///     // add an input and base computation off of it
    ///     let mut input = Handle::new();
    ///     worker.dataflow(|scope| {
    ///         scope.input_from(&mut input)
    ///              .inspect(|x| println!("hello {:?}", x));
    ///     });
    ///
    ///     // introduce input, advance computation
    ///     for round in 0..10 {
    ///         input.send(round);
    ///         input.advance_to(round + 1);
    ///         worker.step();
    ///     }
    /// });
    /// ```
    fn input_from<D: Data>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, D>) -> Stream<Self, D>;

    /// Create a new [StreamCore] from a supplied interactive [HandleCore].
    ///
    /// This method creates a new timely stream whose data are supplied interactively through the `handle`
    /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
    /// if it is attached to more than one stream.
    ///
    /// This is the container-generic counterpart of `input_from`; the example below uses the
    /// vector-based `Handle` alias, which is itself a `HandleCore`.
    ///
    /// # Examples
    /// ```
    /// use timely::*;
    /// use timely::dataflow::operators::{Input, Inspect};
    /// use timely::dataflow::operators::input::Handle;
    ///
    /// // construct and execute a timely dataflow
    /// timely::execute(Config::thread(), |worker| {
    ///
    ///     // add an input and base computation off of it
    ///     let mut input = Handle::new();
    ///     worker.dataflow(|scope| {
    ///         scope.input_from_core(&mut input)
    ///              .inspect(|x| println!("hello {:?}", x));
    ///     });
    ///
    ///     // introduce input, advance computation
    ///     for round in 0..10 {
    ///         input.send(round);
    ///         input.advance_to(round + 1);
    ///         worker.step();
    ///     }
    /// });
    /// ```
    fn input_from_core<D: Container>(&mut self, handle: &mut HandleCore<<Self as ScopeParent>::Timestamp, D>) -> StreamCore<Self, D>;
}
162
163use crate::order::TotalOrder;
164impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
165    fn new_input<D: Data>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, D>, Stream<G, D>) {
166        self.new_input_core()
167    }
168
169    fn input_from<D: Data>(&mut self, handle: &mut Handle<<G as ScopeParent>::Timestamp, D>) -> Stream<G, D> {
170        self.input_from_core(handle)
171    }
172
173    fn new_input_core<D: Container>(&mut self) -> (HandleCore<<G as ScopeParent>::Timestamp, D>, StreamCore<G, D>) {
174        let mut handle = HandleCore::new();
175        let stream = self.input_from_core(&mut handle);
176        (handle, stream)
177    }
178
179    fn input_from_core<D: Container>(&mut self, handle: &mut HandleCore<<G as ScopeParent>::Timestamp, D>) -> StreamCore<G, D> {
180        let (output, registrar) = TeeCore::<<G as ScopeParent>::Timestamp, D>::new();
181        let counter = CounterCore::new(output);
182        let produced = counter.produced().clone();
183
184        let index = self.allocate_operator_index();
185        let mut address = self.addr();
186        address.push(index);
187
188        handle.activate.push(self.activator_for(&address[..]));
189
190        let progress = Rc::new(RefCell::new(ChangeBatch::new()));
191
192        handle.register(counter, progress.clone());
193
194        let copies = self.peers();
195
196        self.add_operator_with_index(Box::new(Operator {
197            name: "Input".to_owned(),
198            address,
199            shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
200            progress,
201            messages: produced,
202            copies,
203        }), index);
204
205        StreamCore::new(Source::new(index, 0), registrar, self.clone())
206    }
207}
208
// The operator installed into the dataflow for each stream created from an input handle.
// It has no inputs and one output; it only relays progress information that the handle
// and the output counter have accumulated.
#[derive(Debug)]
struct Operator<T:Timestamp> {
    // Operator name, reported by `Schedule::name`.
    name: String,
    // Operator address (scope address followed by the operator index), reported by `Schedule::path`.
    address: Vec<usize>,
    // Progress state handed out by `get_internal_summary`; `schedule` drains into its
    // `internals[0]` and `produceds[0]`.
    shared_progress: Rc<RefCell<SharedProgress<T>>>,
    // Capability changes written by the handle (in `register`, `advance_to`, `close_epoch`).
    progress:   Rc<RefCell<ChangeBatch<T>>>,           // times closed since last asked
    // Shared with the output `CounterCore` (its `produced()` batch).
    messages:   Rc<RefCell<ChangeBatch<T>>>,           // messages sent since last asked
    // Value of `peers()` at construction; used as the initial count at `T::minimum()`.
    copies:     usize,
}
218
219impl<T:Timestamp> Schedule for Operator<T> {
220
221    fn name(&self) -> &str { &self.name }
222
223    fn path(&self) -> &[usize] { &self.address[..] }
224
225    fn schedule(&mut self) -> bool {
226        let shared_progress = &mut *self.shared_progress.borrow_mut();
227        self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]);
228        self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
229        false
230    }
231}
232
233impl<T:Timestamp> Operate<T> for Operator<T> {
234
235    fn inputs(&self) -> usize { 0 }
236    fn outputs(&self) -> usize { 1 }
237
238    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
239        self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
240        (Vec::new(), self.shared_progress.clone())
241    }
242
243    fn notify_me(&self) -> bool { false }
244}
245
246
/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation.
///
/// One handle can feed several streams: each call that attaches the handle to a scope adds
/// one entry to each of `activate`, `progress` and `pushers`.
#[derive(Debug)]
pub struct HandleCore<T: Timestamp, C: Container> {
    // Activators for the attached input operators; fired when an epoch is closed.
    activate: Vec<Activator>,
    // Per-stream capability change batches, shared with the corresponding operator.
    progress: Vec<Rc<RefCell<ChangeBatch<T>>>>,
    // Per-stream output endpoints that data are pushed into.
    pushers: Vec<CounterCore<T, C, TeeCore<T, C>>>,
    // Staging buffer for data awaiting a flush.
    buffer1: C,
    // Scratch buffer holding a clone of the data for every pusher except the last.
    buffer2: C,
    // The current time of the handle; starts at `T::minimum()`.
    now_at: T,
}
257
/// A handle specialized to vector-based containers.
///
/// `Handle<T, D>` is `HandleCore<T, Vec<D>>`, so every `HandleCore` method is available on it.
pub type Handle<T, D> = HandleCore<T, Vec<D>>;
260
261impl<T: Timestamp, D: Container> HandleCore<T, D> {
262    /// Allocates a new input handle, from which one can create timely streams.
263    ///
264    /// # Examples
265    /// ```
266    /// use timely::*;
267    /// use timely::dataflow::operators::{Input, Inspect};
268    /// use timely::dataflow::operators::input::Handle;
269    ///
270    /// // construct and execute a timely dataflow
271    /// timely::execute(Config::thread(), |worker| {
272    ///
273    ///     // add an input and base computation off of it
274    ///     let mut input = Handle::new();
275    ///     worker.dataflow(|scope| {
276    ///         scope.input_from(&mut input)
277    ///              .inspect(|x| println!("hello {:?}", x));
278    ///     });
279    ///
280    ///     // introduce input, advance computation
281    ///     for round in 0..10 {
282    ///         input.send(round);
283    ///         input.advance_to(round + 1);
284    ///         worker.step();
285    ///     }
286    /// });
287    /// ```
288    pub fn new() -> Self {
289        Self {
290            activate: Vec::new(),
291            progress: Vec::new(),
292            pushers: Vec::new(),
293            buffer1: Default::default(),
294            buffer2: Default::default(),
295            now_at: T::minimum(),
296        }
297    }
298
299    /// Creates an input stream from the handle in the supplied scope.
300    ///
301    /// # Examples
302    /// ```
303    /// use timely::*;
304    /// use timely::dataflow::operators::{Input, Inspect};
305    /// use timely::dataflow::operators::input::Handle;
306    ///
307    /// // construct and execute a timely dataflow
308    /// timely::execute(Config::thread(), |worker| {
309    ///
310    ///     // add an input and base computation off of it
311    ///     let mut input = Handle::new();
312    ///     worker.dataflow(|scope| {
313    ///         input.to_stream(scope)
314    ///              .inspect(|x| println!("hello {:?}", x));
315    ///     });
316    ///
317    ///     // introduce input, advance computation
318    ///     for round in 0..10 {
319    ///         input.send(round);
320    ///         input.advance_to(round + 1);
321    ///         worker.step();
322    ///     }
323    /// });
324    /// ```
325    pub fn to_stream<G: Scope>(&mut self, scope: &mut G) -> StreamCore<G, D>
326    where
327        T: TotalOrder,
328        G: ScopeParent<Timestamp=T>,
329    {
330        scope.input_from_core(self)
331    }
332
333    fn register(
334        &mut self,
335        pusher: CounterCore<T, D, TeeCore<T, D>>,
336        progress: Rc<RefCell<ChangeBatch<T>>>,
337    ) {
338        // flush current contents, so new registrant does not see existing data.
339        if !self.buffer1.is_empty() { self.flush(); }
340
341        // we need to produce an appropriate update to the capabilities for `progress`, in case a
342        // user has decided to drive the handle around a bit before registering it.
343        progress.borrow_mut().update(T::minimum(), -1);
344        progress.borrow_mut().update(self.now_at.clone(), 1);
345
346        self.progress.push(progress);
347        self.pushers.push(pusher);
348    }
349
350    // flushes our buffer at each of the destinations. there can be more than one; clone if needed.
351    #[inline(never)]
352    fn flush(&mut self) {
353        for index in 0 .. self.pushers.len() {
354            if index < self.pushers.len() - 1 {
355                self.buffer2.clone_from(&self.buffer1);
356                Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]);
357                debug_assert!(self.buffer2.is_empty());
358            }
359            else {
360                Message::push_at(&mut self.buffer1, self.now_at.clone(), &mut self.pushers[index]);
361                debug_assert!(self.buffer1.is_empty());
362            }
363        }
364        self.buffer1.clear();
365    }
366
367    // closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
368    fn close_epoch(&mut self) {
369        if !self.buffer1.is_empty() { self.flush(); }
370        for pusher in self.pushers.iter_mut() {
371            pusher.done();
372        }
373        for progress in self.progress.iter() {
374            progress.borrow_mut().update(self.now_at.clone(), -1);
375        }
376        // Alert worker of each active input operator.
377        for activate in self.activate.iter() {
378            activate.activate();
379        }
380    }
381
382    /// Sends a batch of records into the corresponding timely dataflow [StreamCore], at the current epoch.
383    ///
384    /// This method flushes single elements previously sent with `send`, to keep the insertion order.
385    ///
386    /// # Examples
387    /// ```
388    /// use timely::*;
389    /// use timely::dataflow::operators::{Input, InspectCore};
390    /// use timely::dataflow::operators::input::HandleCore;
391    ///
392    /// // construct and execute a timely dataflow
393    /// timely::execute(Config::thread(), |worker| {
394    ///
395    ///     // add an input and base computation off of it
396    ///     let mut input = HandleCore::new();
397    ///     worker.dataflow(|scope| {
398    ///         scope.input_from_core(&mut input)
399    ///              .inspect_container(|x| println!("hello {:?}", x));
400    ///     });
401    ///
402    ///     // introduce input, advance computation
403    ///     for round in 0..10 {
404    ///         input.send_batch(&mut vec![format!("{}", round)]);
405    ///         input.advance_to(round + 1);
406    ///         worker.step();
407    ///     }
408    /// });
409    /// ```
410    pub fn send_batch(&mut self, buffer: &mut D) {
411
412        if !buffer.is_empty() {
413            // flush buffered elements to ensure local fifo.
414            if !self.buffer1.is_empty() { self.flush(); }
415
416            // push buffer (or clone of buffer) at each destination.
417            for index in 0 .. self.pushers.len() {
418                if index < self.pushers.len() - 1 {
419                    self.buffer2.clone_from(&buffer);
420                    Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]);
421                    assert!(self.buffer2.is_empty());
422                }
423                else {
424                    Message::push_at(buffer, self.now_at.clone(), &mut self.pushers[index]);
425                    assert!(buffer.is_empty());
426                }
427            }
428            buffer.clear();
429        }
430    }
431
432    /// Advances the current epoch to `next`.
433    ///
434    /// This method allows timely dataflow to issue progress notifications as it can now determine
435    /// that this input can no longer produce data at earlier timestamps.
436    pub fn advance_to(&mut self, next: T) {
437        // Assert that we do not rewind time.
438        assert!(self.now_at.less_equal(&next));
439        // Flush buffers if time has actually changed.
440        if !self.now_at.eq(&next) {
441            self.close_epoch();
442            self.now_at = next;
443            for progress in self.progress.iter() {
444                progress.borrow_mut().update(self.now_at.clone(), 1);
445            }
446        }
447    }
448
449    /// Closes the input.
450    ///
451    /// This method allows timely dataflow to issue all progress notifications blocked by this input
452    /// and to begin to shut down operators, as this input can no longer produce data.
453    pub fn close(self) { }
454
455    /// Reports the current epoch.
456    pub fn epoch(&self) -> &T {
457        &self.now_at
458    }
459
460    /// Reports the current timestamp.
461    pub fn time(&self) -> &T {
462        &self.now_at
463    }
464}
465
466impl<T: Timestamp, D: Data> Handle<T, D> {
467    #[inline]
468    /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch.
469    ///
470    /// # Examples
471    /// ```
472    /// use timely::*;
473    /// use timely::dataflow::operators::{Input, Inspect};
474    /// use timely::dataflow::operators::input::Handle;
475    ///
476    /// // construct and execute a timely dataflow
477    /// timely::execute(Config::thread(), |worker| {
478    ///
479    ///     // add an input and base computation off of it
480    ///     let mut input = Handle::new();
481    ///     worker.dataflow(|scope| {
482    ///         scope.input_from(&mut input)
483    ///              .inspect(|x| println!("hello {:?}", x));
484    ///     });
485    ///
486    ///     // introduce input, advance computation
487    ///     for round in 0..10 {
488    ///         input.send(round);
489    ///         input.advance_to(round + 1);
490    ///         worker.step();
491    ///     }
492    /// });
493    /// ```
494    pub fn send(&mut self, data: D) {
495        // assert!(self.buffer1.capacity() == Message::<T, D>::default_length());
496        self.buffer1.push(data);
497        if self.buffer1.len() == self.buffer1.capacity() {
498            self.flush();
499        }
500    }
501}
502
503impl<T: Timestamp, D: Data> Default for Handle<T, D> {
504    fn default() -> Self {
505        Self::new()
506    }
507}
508
509impl<T:Timestamp, C: Container> Drop for HandleCore<T, C> {
510    fn drop(&mut self) {
511        self.close_epoch();
512    }
513}