timely/dataflow/operators/input.rs
1//! Create new `Streams` connected to external inputs.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5
6use crate::scheduling::{Schedule, Activator};
7
8use crate::progress::frontier::Antichain;
9use crate::progress::{Operate, operate::SharedProgress, Timestamp, ChangeBatch};
10use crate::progress::Source;
11
12use crate::{Container, Data};
13use crate::communication::Push;
14use crate::dataflow::{Stream, ScopeParent, Scope, StreamCore};
15use crate::dataflow::channels::pushers::{TeeCore, CounterCore};
16use crate::dataflow::channels::Message;
17
18
19// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
20// TODO : more like a harness, with direct access to its inputs.
21
22// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
23// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
24// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
25
26/// Create a new `Stream` and `Handle` through which to supply input.
27pub trait Input : Scope {
28 /// Create a new `Stream` and `Handle` through which to supply input.
29 ///
30 /// The `new_input` method returns a pair `(Handle, Stream)` where the `Stream` can be used
31 /// immediately for timely dataflow construction, and the `Handle` is later used to introduce
32 /// data into the timely dataflow computation.
33 ///
34 /// The `Handle` also provides a means to indicate
35 /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
36 /// to issue progress notifications.
37 ///
38 /// # Examples
39 /// ```
40 /// use timely::*;
41 /// use timely::dataflow::operators::{Input, Inspect};
42 ///
43 /// // construct and execute a timely dataflow
44 /// timely::execute(Config::thread(), |worker| {
45 ///
46 /// // add an input and base computation off of it
47 /// let mut input = worker.dataflow(|scope| {
48 /// let (input, stream) = scope.new_input();
49 /// stream.inspect(|x| println!("hello {:?}", x));
50 /// input
51 /// });
52 ///
53 /// // introduce input, advance computation
54 /// for round in 0..10 {
55 /// input.send(round);
56 /// input.advance_to(round + 1);
57 /// worker.step();
58 /// }
59 /// });
60 /// ```
61 fn new_input<D: Data>(&mut self) -> (Handle<<Self as ScopeParent>::Timestamp, D>, Stream<Self, D>);
62
63 /// Create a new [StreamCore] and [HandleCore] through which to supply input.
64 ///
65 /// The `new_input_core` method returns a pair `(HandleCore, StreamCore)` where the [StreamCore] can be used
66 /// immediately for timely dataflow construction, and the `HandleCore` is later used to introduce
67 /// data into the timely dataflow computation.
68 ///
69 /// The `HandleCore` also provides a means to indicate
70 /// to timely dataflow that the input has advanced beyond certain timestamps, allowing timely
71 /// to issue progress notifications.
72 ///
73 /// # Examples
74 /// ```
75 /// use timely::*;
76 /// use timely::dataflow::operators::{Input, Inspect};
77 ///
78 /// // construct and execute a timely dataflow
79 /// timely::execute(Config::thread(), |worker| {
80 ///
81 /// // add an input and base computation off of it
82 /// let mut input = worker.dataflow(|scope| {
83 /// let (input, stream) = scope.new_input_core::<Vec<_>>();
84 /// stream.inspect(|x| println!("hello {:?}", x));
85 /// input
86 /// });
87 ///
88 /// // introduce input, advance computation
89 /// for round in 0..10 {
90 /// input.send(round);
91 /// input.advance_to(round + 1);
92 /// worker.step();
93 /// }
94 /// });
95 /// ```
96 fn new_input_core<D: Container>(&mut self) -> (HandleCore<<Self as ScopeParent>::Timestamp, D>, StreamCore<Self, D>);
97
98 /// Create a new stream from a supplied interactive handle.
99 ///
100 /// This method creates a new timely stream whose data are supplied interactively through the `handle`
101 /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
102 /// if it as attached to more than one stream.
103 ///
104 /// # Examples
105 /// ```
106 /// use timely::*;
107 /// use timely::dataflow::operators::{Input, Inspect};
108 /// use timely::dataflow::operators::input::Handle;
109 ///
110 /// // construct and execute a timely dataflow
111 /// timely::execute(Config::thread(), |worker| {
112 ///
113 /// // add an input and base computation off of it
114 /// let mut input = Handle::new();
115 /// worker.dataflow(|scope| {
116 /// scope.input_from(&mut input)
117 /// .inspect(|x| println!("hello {:?}", x));
118 /// });
119 ///
120 /// // introduce input, advance computation
121 /// for round in 0..10 {
122 /// input.send(round);
123 /// input.advance_to(round + 1);
124 /// worker.step();
125 /// }
126 /// });
127 /// ```
128 fn input_from<D: Data>(&mut self, handle: &mut Handle<<Self as ScopeParent>::Timestamp, D>) -> Stream<Self, D>;
129
130 /// Create a new stream from a supplied interactive handle.
131 ///
132 /// This method creates a new timely stream whose data are supplied interactively through the `handle`
133 /// argument. Each handle may be used multiple times (or not at all), and will clone data as appropriate
134 /// if it as attached to more than one stream.
135 ///
136 /// # Examples
137 /// ```
138 /// use timely::*;
139 /// use timely::dataflow::operators::{Input, Inspect};
140 /// use timely::dataflow::operators::input::Handle;
141 ///
142 /// // construct and execute a timely dataflow
143 /// timely::execute(Config::thread(), |worker| {
144 ///
145 /// // add an input and base computation off of it
146 /// let mut input = Handle::new();
147 /// worker.dataflow(|scope| {
148 /// scope.input_from_core(&mut input)
149 /// .inspect(|x| println!("hello {:?}", x));
150 /// });
151 ///
152 /// // introduce input, advance computation
153 /// for round in 0..10 {
154 /// input.send(round);
155 /// input.advance_to(round + 1);
156 /// worker.step();
157 /// }
158 /// });
159 /// ```
160 fn input_from_core<D: Container>(&mut self, handle: &mut HandleCore<<Self as ScopeParent>::Timestamp, D>) -> StreamCore<Self, D>;
161}
162
163use crate::order::TotalOrder;
164impl<G: Scope> Input for G where <G as ScopeParent>::Timestamp: TotalOrder {
165 fn new_input<D: Data>(&mut self) -> (Handle<<G as ScopeParent>::Timestamp, D>, Stream<G, D>) {
166 self.new_input_core()
167 }
168
169 fn input_from<D: Data>(&mut self, handle: &mut Handle<<G as ScopeParent>::Timestamp, D>) -> Stream<G, D> {
170 self.input_from_core(handle)
171 }
172
173 fn new_input_core<D: Container>(&mut self) -> (HandleCore<<G as ScopeParent>::Timestamp, D>, StreamCore<G, D>) {
174 let mut handle = HandleCore::new();
175 let stream = self.input_from_core(&mut handle);
176 (handle, stream)
177 }
178
179 fn input_from_core<D: Container>(&mut self, handle: &mut HandleCore<<G as ScopeParent>::Timestamp, D>) -> StreamCore<G, D> {
180 let (output, registrar) = TeeCore::<<G as ScopeParent>::Timestamp, D>::new();
181 let counter = CounterCore::new(output);
182 let produced = counter.produced().clone();
183
184 let index = self.allocate_operator_index();
185 let mut address = self.addr();
186 address.push(index);
187
188 handle.activate.push(self.activator_for(&address[..]));
189
190 let progress = Rc::new(RefCell::new(ChangeBatch::new()));
191
192 handle.register(counter, progress.clone());
193
194 let copies = self.peers();
195
196 self.add_operator_with_index(Box::new(Operator {
197 name: "Input".to_owned(),
198 address,
199 shared_progress: Rc::new(RefCell::new(SharedProgress::new(0, 1))),
200 progress,
201 messages: produced,
202 copies,
203 }), index);
204
205 StreamCore::new(Source::new(index, 0), registrar, self.clone())
206 }
207}
208
209#[derive(Debug)]
210struct Operator<T:Timestamp> {
211 name: String,
212 address: Vec<usize>,
213 shared_progress: Rc<RefCell<SharedProgress<T>>>,
214 progress: Rc<RefCell<ChangeBatch<T>>>, // times closed since last asked
215 messages: Rc<RefCell<ChangeBatch<T>>>, // messages sent since last asked
216 copies: usize,
217}
218
219impl<T:Timestamp> Schedule for Operator<T> {
220
221 fn name(&self) -> &str { &self.name }
222
223 fn path(&self) -> &[usize] { &self.address[..] }
224
225 fn schedule(&mut self) -> bool {
226 let shared_progress = &mut *self.shared_progress.borrow_mut();
227 self.progress.borrow_mut().drain_into(&mut shared_progress.internals[0]);
228 self.messages.borrow_mut().drain_into(&mut shared_progress.produceds[0]);
229 false
230 }
231}
232
233impl<T:Timestamp> Operate<T> for Operator<T> {
234
235 fn inputs(&self) -> usize { 0 }
236 fn outputs(&self) -> usize { 1 }
237
238 fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<T as Timestamp>::Summary>>>, Rc<RefCell<SharedProgress<T>>>) {
239 self.shared_progress.borrow_mut().internals[0].update(T::minimum(), self.copies as i64);
240 (Vec::new(), self.shared_progress.clone())
241 }
242
243 fn notify_me(&self) -> bool { false }
244}
245
246
247/// A handle to an input `Stream`, used to introduce data to a timely dataflow computation.
248#[derive(Debug)]
249pub struct HandleCore<T: Timestamp, C: Container> {
250 activate: Vec<Activator>,
251 progress: Vec<Rc<RefCell<ChangeBatch<T>>>>,
252 pushers: Vec<CounterCore<T, C, TeeCore<T, C>>>,
253 buffer1: C,
254 buffer2: C,
255 now_at: T,
256}
257
258/// A handle specialized to vector-based containers.
259pub type Handle<T, D> = HandleCore<T, Vec<D>>;
260
261impl<T: Timestamp, D: Container> HandleCore<T, D> {
262 /// Allocates a new input handle, from which one can create timely streams.
263 ///
264 /// # Examples
265 /// ```
266 /// use timely::*;
267 /// use timely::dataflow::operators::{Input, Inspect};
268 /// use timely::dataflow::operators::input::Handle;
269 ///
270 /// // construct and execute a timely dataflow
271 /// timely::execute(Config::thread(), |worker| {
272 ///
273 /// // add an input and base computation off of it
274 /// let mut input = Handle::new();
275 /// worker.dataflow(|scope| {
276 /// scope.input_from(&mut input)
277 /// .inspect(|x| println!("hello {:?}", x));
278 /// });
279 ///
280 /// // introduce input, advance computation
281 /// for round in 0..10 {
282 /// input.send(round);
283 /// input.advance_to(round + 1);
284 /// worker.step();
285 /// }
286 /// });
287 /// ```
288 pub fn new() -> Self {
289 Self {
290 activate: Vec::new(),
291 progress: Vec::new(),
292 pushers: Vec::new(),
293 buffer1: Default::default(),
294 buffer2: Default::default(),
295 now_at: T::minimum(),
296 }
297 }
298
299 /// Creates an input stream from the handle in the supplied scope.
300 ///
301 /// # Examples
302 /// ```
303 /// use timely::*;
304 /// use timely::dataflow::operators::{Input, Inspect};
305 /// use timely::dataflow::operators::input::Handle;
306 ///
307 /// // construct and execute a timely dataflow
308 /// timely::execute(Config::thread(), |worker| {
309 ///
310 /// // add an input and base computation off of it
311 /// let mut input = Handle::new();
312 /// worker.dataflow(|scope| {
313 /// input.to_stream(scope)
314 /// .inspect(|x| println!("hello {:?}", x));
315 /// });
316 ///
317 /// // introduce input, advance computation
318 /// for round in 0..10 {
319 /// input.send(round);
320 /// input.advance_to(round + 1);
321 /// worker.step();
322 /// }
323 /// });
324 /// ```
325 pub fn to_stream<G: Scope>(&mut self, scope: &mut G) -> StreamCore<G, D>
326 where
327 T: TotalOrder,
328 G: ScopeParent<Timestamp=T>,
329 {
330 scope.input_from_core(self)
331 }
332
333 fn register(
334 &mut self,
335 pusher: CounterCore<T, D, TeeCore<T, D>>,
336 progress: Rc<RefCell<ChangeBatch<T>>>,
337 ) {
338 // flush current contents, so new registrant does not see existing data.
339 if !self.buffer1.is_empty() { self.flush(); }
340
341 // we need to produce an appropriate update to the capabilities for `progress`, in case a
342 // user has decided to drive the handle around a bit before registering it.
343 progress.borrow_mut().update(T::minimum(), -1);
344 progress.borrow_mut().update(self.now_at.clone(), 1);
345
346 self.progress.push(progress);
347 self.pushers.push(pusher);
348 }
349
350 // flushes our buffer at each of the destinations. there can be more than one; clone if needed.
351 #[inline(never)]
352 fn flush(&mut self) {
353 for index in 0 .. self.pushers.len() {
354 if index < self.pushers.len() - 1 {
355 self.buffer2.clone_from(&self.buffer1);
356 Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]);
357 debug_assert!(self.buffer2.is_empty());
358 }
359 else {
360 Message::push_at(&mut self.buffer1, self.now_at.clone(), &mut self.pushers[index]);
361 debug_assert!(self.buffer1.is_empty());
362 }
363 }
364 self.buffer1.clear();
365 }
366
367 // closes the current epoch, flushing if needed, shutting if needed, and updating the frontier.
368 fn close_epoch(&mut self) {
369 if !self.buffer1.is_empty() { self.flush(); }
370 for pusher in self.pushers.iter_mut() {
371 pusher.done();
372 }
373 for progress in self.progress.iter() {
374 progress.borrow_mut().update(self.now_at.clone(), -1);
375 }
376 // Alert worker of each active input operator.
377 for activate in self.activate.iter() {
378 activate.activate();
379 }
380 }
381
382 /// Sends a batch of records into the corresponding timely dataflow [StreamCore], at the current epoch.
383 ///
384 /// This method flushes single elements previously sent with `send`, to keep the insertion order.
385 ///
386 /// # Examples
387 /// ```
388 /// use timely::*;
389 /// use timely::dataflow::operators::{Input, InspectCore};
390 /// use timely::dataflow::operators::input::HandleCore;
391 ///
392 /// // construct and execute a timely dataflow
393 /// timely::execute(Config::thread(), |worker| {
394 ///
395 /// // add an input and base computation off of it
396 /// let mut input = HandleCore::new();
397 /// worker.dataflow(|scope| {
398 /// scope.input_from_core(&mut input)
399 /// .inspect_container(|x| println!("hello {:?}", x));
400 /// });
401 ///
402 /// // introduce input, advance computation
403 /// for round in 0..10 {
404 /// input.send_batch(&mut vec![format!("{}", round)]);
405 /// input.advance_to(round + 1);
406 /// worker.step();
407 /// }
408 /// });
409 /// ```
410 pub fn send_batch(&mut self, buffer: &mut D) {
411
412 if !buffer.is_empty() {
413 // flush buffered elements to ensure local fifo.
414 if !self.buffer1.is_empty() { self.flush(); }
415
416 // push buffer (or clone of buffer) at each destination.
417 for index in 0 .. self.pushers.len() {
418 if index < self.pushers.len() - 1 {
419 self.buffer2.clone_from(&buffer);
420 Message::push_at(&mut self.buffer2, self.now_at.clone(), &mut self.pushers[index]);
421 assert!(self.buffer2.is_empty());
422 }
423 else {
424 Message::push_at(buffer, self.now_at.clone(), &mut self.pushers[index]);
425 assert!(buffer.is_empty());
426 }
427 }
428 buffer.clear();
429 }
430 }
431
432 /// Advances the current epoch to `next`.
433 ///
434 /// This method allows timely dataflow to issue progress notifications as it can now determine
435 /// that this input can no longer produce data at earlier timestamps.
436 pub fn advance_to(&mut self, next: T) {
437 // Assert that we do not rewind time.
438 assert!(self.now_at.less_equal(&next));
439 // Flush buffers if time has actually changed.
440 if !self.now_at.eq(&next) {
441 self.close_epoch();
442 self.now_at = next;
443 for progress in self.progress.iter() {
444 progress.borrow_mut().update(self.now_at.clone(), 1);
445 }
446 }
447 }
448
449 /// Closes the input.
450 ///
451 /// This method allows timely dataflow to issue all progress notifications blocked by this input
452 /// and to begin to shut down operators, as this input can no longer produce data.
453 pub fn close(self) { }
454
455 /// Reports the current epoch.
456 pub fn epoch(&self) -> &T {
457 &self.now_at
458 }
459
460 /// Reports the current timestamp.
461 pub fn time(&self) -> &T {
462 &self.now_at
463 }
464}
465
466impl<T: Timestamp, D: Data> Handle<T, D> {
467 #[inline]
468 /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch.
469 ///
470 /// # Examples
471 /// ```
472 /// use timely::*;
473 /// use timely::dataflow::operators::{Input, Inspect};
474 /// use timely::dataflow::operators::input::Handle;
475 ///
476 /// // construct and execute a timely dataflow
477 /// timely::execute(Config::thread(), |worker| {
478 ///
479 /// // add an input and base computation off of it
480 /// let mut input = Handle::new();
481 /// worker.dataflow(|scope| {
482 /// scope.input_from(&mut input)
483 /// .inspect(|x| println!("hello {:?}", x));
484 /// });
485 ///
486 /// // introduce input, advance computation
487 /// for round in 0..10 {
488 /// input.send(round);
489 /// input.advance_to(round + 1);
490 /// worker.step();
491 /// }
492 /// });
493 /// ```
494 pub fn send(&mut self, data: D) {
495 // assert!(self.buffer1.capacity() == Message::<T, D>::default_length());
496 self.buffer1.push(data);
497 if self.buffer1.len() == self.buffer1.capacity() {
498 self.flush();
499 }
500 }
501}
502
503impl<T: Timestamp, D: Data> Default for Handle<T, D> {
504 fn default() -> Self {
505 Self::new()
506 }
507}
508
509impl<T:Timestamp, C: Container> Drop for HandleCore<T, C> {
510 fn drop(&mut self) {
511 self.close_epoch();
512 }
513}