timely/dataflow/operators/generic/
operator.rs

1
//! Methods to construct generic streaming and blocking operators.
3
4use crate::dataflow::channels::pushers::TeeCore;
5use crate::dataflow::channels::pact::ParallelizationContractCore;
6
7use crate::dataflow::operators::generic::handles::{InputHandleCore, FrontieredInputHandleCore, OutputHandleCore};
8use crate::dataflow::operators::capability::Capability;
9
10use crate::dataflow::{Scope, StreamCore};
11
12use super::builder_rc::OperatorBuilder;
13use crate::dataflow::operators::generic::OperatorInfo;
14use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
15use crate::Container;
16
17/// Methods to construct generic streaming and blocking operators.
18pub trait Operator<G: Scope, D1: Container> {
19    /// Creates a new dataflow operator that partitions its input stream by a parallelization
20    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
21    /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
22    ///
23    /// # Examples
24    /// ```
25    /// use std::collections::HashMap;
26    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
27    /// use timely::dataflow::operators::generic::Operator;
28    /// use timely::dataflow::channels::pact::Pipeline;
29    ///
30    /// fn main() {
31    ///     timely::example(|scope| {
32    ///         (0u64..10).to_stream(scope)
33    ///             .unary_frontier(Pipeline, "example", |default_cap, _info| {
34    ///                 let mut cap = Some(default_cap.delayed(&12));
35    ///                 let mut notificator = FrontierNotificator::new();
36    ///                 let mut stash = HashMap::new();
37    ///                 let mut vector = Vec::new();
38    ///                 move |input, output| {
39    ///                     if let Some(ref c) = cap.take() {
40    ///                         output.session(&c).give(12);
41    ///                     }
42    ///                     while let Some((time, data)) = input.next() {
43    ///                         data.swap(&mut vector);
44    ///                         stash.entry(time.time().clone())
45    ///                              .or_insert(Vec::new())
46    ///                              .extend(vector.drain(..));
47    ///                     }
48    ///                     notificator.for_each(&[input.frontier()], |time, _not| {
49    ///                         if let Some(mut vec) = stash.remove(time.time()) {
50    ///                             output.session(&time).give_iterator(vec.drain(..));
51    ///                         }
52    ///                     });
53    ///                 }
54    ///             });
55    ///     });
56    /// }
57    /// ```
58    fn unary_frontier<D2, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, D2>
59    where
60        D2: Container,
61        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
62        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, D1, P::Puller>,
63                 &mut OutputHandleCore<G::Timestamp, D2, TeeCore<G::Timestamp, D2>>)+'static,
64        P: ParallelizationContractCore<G::Timestamp, D1>;
65
66    /// Creates a new dataflow operator that partitions its input stream by a parallelization
67    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
68    /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
69    ///
70    /// # Examples
71    /// ```
72    /// use std::collections::HashMap;
73    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
74    /// use timely::dataflow::operators::generic::Operator;
75    /// use timely::dataflow::channels::pact::Pipeline;
76    ///
77    /// fn main() {
78    ///     timely::example(|scope| {
79    ///         let mut vector = Vec::new();
80    ///         (0u64..10)
81    ///             .to_stream(scope)
82    ///             .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
83    ///                 input.for_each(|time, data| {
84    ///                     data.swap(&mut vector);
85    ///                     output.session(&time).give_vec(&mut vector);
86    ///                     notificator.notify_at(time.retain());
87    ///                 });
88    ///                 notificator.for_each(|time, _cnt, _not| {
89    ///                     println!("notified at {:?}", time);
90    ///                 });
91    ///             });
92    ///     });
93    /// }
94    /// ```
95    fn unary_notify<D2: Container,
96            L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>,
97                     &mut OutputHandleCore<G::Timestamp, D2, TeeCore<G::Timestamp, D2>>,
98                     &mut Notificator<G::Timestamp>)+'static,
99             P: ParallelizationContractCore<G::Timestamp, D1>>
100             (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, D2>;
101
102    /// Creates a new dataflow operator that partitions its input stream by a parallelization
103    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
104    /// `logic` can read from the input stream, and write to the output stream.
105    ///
106    /// # Examples
107    /// ```
108    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
109    /// use timely::dataflow::operators::generic::operator::Operator;
110    /// use timely::dataflow::channels::pact::Pipeline;
111    /// use timely::dataflow::Scope;
112    ///
113    /// timely::example(|scope| {
114    ///     (0u64..10).to_stream(scope)
115    ///         .unary(Pipeline, "example", |default_cap, _info| {
116    ///             let mut cap = Some(default_cap.delayed(&12));
117    ///             let mut vector = Vec::new();
118    ///             move |input, output| {
119    ///                 if let Some(ref c) = cap.take() {
120    ///                     output.session(&c).give(100);
121    ///                 }
122    ///                 while let Some((time, data)) = input.next() {
123    ///                     data.swap(&mut vector);
124    ///                     output.session(&time).give_vec(&mut vector);
125    ///                 }
126    ///             }
127    ///         });
128    /// });
129    /// ```
130    fn unary<D2, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, D2>
131    where
132        D2: Container,
133        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
134        L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>,
135                 &mut OutputHandleCore<G::Timestamp, D2, TeeCore<G::Timestamp, D2>>)+'static,
136        P: ParallelizationContractCore<G::Timestamp, D1>;
137
138    /// Creates a new dataflow operator that partitions its input streams by a parallelization
139    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
140    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
141    ///
142    /// # Examples
143    /// ```
144    /// use std::collections::HashMap;
145    /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
146    /// use timely::dataflow::operators::generic::operator::Operator;
147    /// use timely::dataflow::channels::pact::Pipeline;
148    ///
149    /// timely::execute(timely::Config::thread(), |worker| {
150    ///    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
151    ///        let (in1_handle, in1) = scope.new_input();
152    ///        let (in2_handle, in2) = scope.new_input();
153    ///        in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
154    ///            let mut notificator = FrontierNotificator::new();
155    ///            let mut stash = HashMap::new();
156    ///            let mut vector1 = Vec::new();
157    ///            let mut vector2 = Vec::new();
158    ///            move |input1, input2, output| {
159    ///                while let Some((time, data)) = input1.next() {
160    ///                    data.swap(&mut vector1);
161    ///                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector1.drain(..));
162    ///                    notificator.notify_at(time.retain());
163    ///                }
164    ///                while let Some((time, data)) = input2.next() {
165    ///                    data.swap(&mut vector2);
166    ///                    stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector2.drain(..));
167    ///                    notificator.notify_at(time.retain());
168    ///                }
169    ///                notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _not| {
170    ///                    if let Some(mut vec) = stash.remove(time.time()) {
171    ///                        output.session(&time).give_iterator(vec.drain(..));
172    ///                    }
173    ///                });
174    ///            }
175    ///        }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
176    ///
177    ///        (in1_handle, in2_handle)
178    ///    });
179    ///
180    ///    for i in 1..10 {
181    ///        in1.send(i - 1);
182    ///        in1.advance_to(i);
183    ///        in2.send(i - 1);
184    ///        in2.advance_to(i);
185    ///    }
186    /// }).unwrap();
187    /// ```
188    fn binary_frontier<D2, D3, B, L, P1, P2>(&self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, D3>
189    where
190        D2: Container,
191        D3: Container,
192        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
193        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, D1, P1::Puller>,
194                 &mut FrontieredInputHandleCore<G::Timestamp, D2, P2::Puller>,
195                 &mut OutputHandleCore<G::Timestamp, D3, TeeCore<G::Timestamp, D3>>)+'static,
196        P1: ParallelizationContractCore<G::Timestamp, D1>,
197        P2: ParallelizationContractCore<G::Timestamp, D2>;
198
199    /// Creates a new dataflow operator that partitions its input streams by a parallelization
200    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
201    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
202    ///
203    /// # Examples
204    /// ```
205    /// use std::collections::HashMap;
206    /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
207    /// use timely::dataflow::operators::generic::operator::Operator;
208    /// use timely::dataflow::channels::pact::Pipeline;
209    ///
210    /// timely::execute(timely::Config::thread(), |worker| {
211    ///    let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
212    ///        let (in1_handle, in1) = scope.new_input();
213    ///        let (in2_handle, in2) = scope.new_input();
214    ///
215    ///        let mut vector1 = Vec::new();
216    ///        let mut vector2 = Vec::new();
217    ///        in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
218    ///            input1.for_each(|time, data| {
219    ///                data.swap(&mut vector1);
220    ///                output.session(&time).give_vec(&mut vector1);
221    ///                notificator.notify_at(time.retain());
222    ///            });
223    ///            input2.for_each(|time, data| {
224    ///                data.swap(&mut vector2);
225    ///                output.session(&time).give_vec(&mut vector2);
226    ///                notificator.notify_at(time.retain());
227    ///            });
228    ///            notificator.for_each(|time, _cnt, _not| {
229    ///                println!("notified at {:?}", time);
230    ///            });
231    ///        });
232    ///
233    ///        (in1_handle, in2_handle)
234    ///    });
235    ///
236    ///    for i in 1..10 {
237    ///        in1.send(i - 1);
238    ///        in1.advance_to(i);
239    ///        in2.send(i - 1);
240    ///        in2.advance_to(i);
241    ///    }
242    /// }).unwrap();
243    /// ```
244    fn binary_notify<D2: Container,
245              D3: Container,
246              L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>,
247                       &mut InputHandleCore<G::Timestamp, D2, P2::Puller>,
248                       &mut OutputHandleCore<G::Timestamp, D3, TeeCore<G::Timestamp, D3>>,
249                       &mut Notificator<G::Timestamp>)+'static,
250              P1: ParallelizationContractCore<G::Timestamp, D1>,
251              P2: ParallelizationContractCore<G::Timestamp, D2>>
252            (&self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, D3>;
253
254    /// Creates a new dataflow operator that partitions its input streams by a parallelization
255    /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
256    /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
257    ///
258    /// # Examples
259    /// ```
260    /// use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
261    /// use timely::dataflow::operators::generic::operator::Operator;
262    /// use timely::dataflow::channels::pact::Pipeline;
263    /// use timely::dataflow::Scope;
264    ///
265    /// timely::example(|scope| {
266    ///     let stream2 = (0u64..10).to_stream(scope);
267    ///     (0u64..10).to_stream(scope)
268    ///         .binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
269    ///             let mut cap = Some(default_cap.delayed(&12));
270    ///             let mut vector1 = Vec::new();
271    ///             let mut vector2 = Vec::new();
272    ///             move |input1, input2, output| {
273    ///                 if let Some(ref c) = cap.take() {
274    ///                     output.session(&c).give(100);
275    ///                 }
276    ///                 while let Some((time, data)) = input1.next() {
277    ///                     data.swap(&mut vector1);
278    ///                     output.session(&time).give_vec(&mut vector1);
279    ///                 }
280    ///                 while let Some((time, data)) = input2.next() {
281    ///                     data.swap(&mut vector2);
282    ///                     output.session(&time).give_vec(&mut vector2);
283    ///                 }
284    ///             }
285    ///         }).inspect(|x| println!("{:?}", x));
286    /// });
287    /// ```
288    fn binary<D2, D3, B, L, P1, P2>(&self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, D3>
289    where
290        D2: Container,
291        D3: Container,
292        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
293        L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>,
294                 &mut InputHandleCore<G::Timestamp, D2, P2::Puller>,
295                 &mut OutputHandleCore<G::Timestamp, D3, TeeCore<G::Timestamp, D3>>)+'static,
296        P1: ParallelizationContractCore<G::Timestamp, D1>,
297        P2: ParallelizationContractCore<G::Timestamp, D2>;
298
299    /// Creates a new dataflow operator that partitions its input stream by a parallelization
300    /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream
301    /// and inspect the frontier at the input.
302    ///
303    /// # Examples
304    /// ```
305    /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
306    /// use timely::dataflow::operators::generic::operator::Operator;
307    /// use timely::dataflow::channels::pact::Pipeline;
308    /// use timely::dataflow::Scope;
309    ///
310    /// timely::example(|scope| {
311    ///     (0u64..10)
312    ///         .to_stream(scope)
313    ///         .sink(Pipeline, "example", |input| {
314    ///             while let Some((time, data)) = input.next() {
315    ///                 for datum in data.iter() {
316    ///                     println!("{:?}:\t{:?}", time, datum);
317    ///                 }
318    ///             }
319    ///         });
320    /// });
321    /// ```
322    fn sink<L, P>(&self, pact: P, name: &str, logic: L)
323    where
324        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, D1, P::Puller>)+'static,
325        P: ParallelizationContractCore<G::Timestamp, D1>;
326}
327
328impl<G: Scope, D1: Container> Operator<G, D1> for StreamCore<G, D1> {
329
330    fn unary_frontier<D2, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, D2>
331    where
332        D2: Container,
333        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
334        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, D1, P::Puller>,
335                 &mut OutputHandleCore<G::Timestamp, D2, TeeCore<G::Timestamp, D2>>)+'static,
336        P: ParallelizationContractCore<G::Timestamp, D1> {
337
338        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
339        let operator_info = builder.operator_info();
340
341        let mut input = builder.new_input(self, pact);
342        let (mut output, stream) = builder.new_output();
343
344        builder.build(move |mut capabilities| {
345            // `capabilities` should be a single-element vector.
346            let capability = capabilities.pop().unwrap();
347            let mut logic = constructor(capability, operator_info);
348            move |frontiers| {
349                let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]);
350                let mut output_handle = output.activate();
351                logic(&mut input_handle, &mut output_handle);
352            }
353        });
354
355        stream
356    }
357
358    fn unary_notify<D2: Container,
359            L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>,
360                     &mut OutputHandleCore<G::Timestamp, D2, TeeCore<G::Timestamp, D2>>,
361                     &mut Notificator<G::Timestamp>)+'static,
362             P: ParallelizationContractCore<G::Timestamp, D1>>
363             (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, D2> {
364
365        self.unary_frontier(pact, name, move |capability, _info| {
366            let mut notificator = FrontierNotificator::new();
367            for time in init {
368                notificator.notify_at(capability.delayed(&time));
369            }
370
371            let logging = self.scope().logging();
372            move |input, output| {
373                let frontier = &[input.frontier()];
374                let notificator = &mut Notificator::new(frontier, &mut notificator, &logging);
375                logic(&mut input.handle, output, notificator);
376            }
377        })
378    }
379
380    fn unary<D2, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, D2>
381    where
382        D2: Container,
383        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
384        L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>,
385                 &mut OutputHandleCore<G::Timestamp, D2, TeeCore<G::Timestamp, D2>>)+'static,
386        P: ParallelizationContractCore<G::Timestamp, D1> {
387
388        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
389        let operator_info = builder.operator_info();
390
391        let mut input = builder.new_input(self, pact);
392        let (mut output, stream) = builder.new_output();
393        builder.set_notify(false);
394
395        builder.build(move |mut capabilities| {
396            // `capabilities` should be a single-element vector.
397            let capability = capabilities.pop().unwrap();
398            let mut logic = constructor(capability, operator_info);
399            move |_frontiers| {
400                let mut output_handle = output.activate();
401                logic(&mut input, &mut output_handle);
402            }
403        });
404
405        stream
406    }
407
408    fn binary_frontier<D2, D3, B, L, P1, P2>(&self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, D3>
409    where
410        D2: Container,
411        D3: Container,
412        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
413        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, D1, P1::Puller>,
414                 &mut FrontieredInputHandleCore<G::Timestamp, D2, P2::Puller>,
415                 &mut OutputHandleCore<G::Timestamp, D3, TeeCore<G::Timestamp, D3>>)+'static,
416        P1: ParallelizationContractCore<G::Timestamp, D1>,
417        P2: ParallelizationContractCore<G::Timestamp, D2> {
418
419        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
420        let operator_info = builder.operator_info();
421
422        let mut input1 = builder.new_input(self, pact1);
423        let mut input2 = builder.new_input(other, pact2);
424        let (mut output, stream) = builder.new_output();
425
426        builder.build(move |mut capabilities| {
427            // `capabilities` should be a single-element vector.
428            let capability = capabilities.pop().unwrap();
429            let mut logic = constructor(capability, operator_info);
430            move |frontiers| {
431                let mut input1_handle = FrontieredInputHandleCore::new(&mut input1, &frontiers[0]);
432                let mut input2_handle = FrontieredInputHandleCore::new(&mut input2, &frontiers[1]);
433                let mut output_handle = output.activate();
434                logic(&mut input1_handle, &mut input2_handle, &mut output_handle);
435            }
436        });
437
438        stream
439    }
440
441    fn binary_notify<D2: Container,
442              D3: Container,
443              L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>,
444                       &mut InputHandleCore<G::Timestamp, D2, P2::Puller>,
445                       &mut OutputHandleCore<G::Timestamp, D3, TeeCore<G::Timestamp, D3>>,
446                       &mut Notificator<G::Timestamp>)+'static,
447              P1: ParallelizationContractCore<G::Timestamp, D1>,
448              P2: ParallelizationContractCore<G::Timestamp, D2>>
449            (&self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, D3> {
450
451        self.binary_frontier(other, pact1, pact2, name, |capability, _info| {
452            let mut notificator = FrontierNotificator::new();
453            for time in init {
454                notificator.notify_at(capability.delayed(&time));
455            }
456
457            let logging = self.scope().logging();
458            move |input1, input2, output| {
459                let frontiers = &[input1.frontier(), input2.frontier()];
460                let notificator = &mut Notificator::new(frontiers, &mut notificator, &logging);
461                logic(&mut input1.handle, &mut input2.handle, output, notificator);
462            }
463        })
464
465    }
466
467
468    fn binary<D2, D3, B, L, P1, P2>(&self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, D3>
469    where
470        D2: Container,
471        D3: Container,
472        B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
473        L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>,
474                 &mut InputHandleCore<G::Timestamp, D2, P2::Puller>,
475                 &mut OutputHandleCore<G::Timestamp, D3, TeeCore<G::Timestamp, D3>>)+'static,
476        P1: ParallelizationContractCore<G::Timestamp, D1>,
477        P2: ParallelizationContractCore<G::Timestamp, D2> {
478
479        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
480        let operator_info = builder.operator_info();
481
482        let mut input1 = builder.new_input(self, pact1);
483        let mut input2 = builder.new_input(other, pact2);
484        let (mut output, stream) = builder.new_output();
485        builder.set_notify(false);
486
487        builder.build(move |mut capabilities| {
488            // `capabilities` should be a single-element vector.
489            let capability = capabilities.pop().unwrap();
490            let mut logic = constructor(capability, operator_info);
491            move |_frontiers| {
492                let mut output_handle = output.activate();
493                logic(&mut input1, &mut input2, &mut output_handle);
494            }
495        });
496
497        stream
498    }
499
500    fn sink<L, P>(&self, pact: P, name: &str, mut logic: L)
501    where
502        L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, D1, P::Puller>)+'static,
503        P: ParallelizationContractCore<G::Timestamp, D1> {
504
505        let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
506        let mut input = builder.new_input(self, pact);
507
508        builder.build(|_capabilities| {
509            move |frontiers| {
510                let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]);
511                logic(&mut input_handle);
512            }
513        });
514    }
515}
516
517/// Creates a new data stream source for a scope.
518///
519/// The source is defined by a name, and a constructor which takes a default capability to
520/// a method that can be repeatedly called on a output handle. The method is then repeatedly
521/// invoked, and is expected to eventually send data and downgrade and release capabilities.
522///
523/// # Examples
524/// ```
525/// use timely::scheduling::Scheduler;
526/// use timely::dataflow::operators::Inspect;
527/// use timely::dataflow::operators::generic::operator::source;
528/// use timely::dataflow::Scope;
529///
530/// timely::example(|scope| {
531///
532///     source(scope, "Source", |capability, info| {
533///
534///         let activator = scope.activator_for(&info.address[..]);
535///
536///         let mut cap = Some(capability);
537///         move |output| {
538///
539///             let mut done = false;
540///             if let Some(cap) = cap.as_mut() {
541///                 // get some data and send it.
542///                 let time = cap.time().clone();
543///                 output.session(&cap)
544///                       .give(*cap.time());
545///
546///                 // downgrade capability.
547///                 cap.downgrade(&(time + 1));
548///                 done = time > 20;
549///             }
550///
551///             if done { cap = None; }
552///             else    { activator.activate(); }
553///         }
554///     })
555///     .inspect(|x| println!("number: {:?}", x));
556/// });
557/// ```
558pub fn source<G: Scope, D, B, L>(scope: &G, name: &str, constructor: B) -> StreamCore<G, D>
559where
560    D: Container,
561    B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
562    L: FnMut(&mut OutputHandleCore<G::Timestamp, D, TeeCore<G::Timestamp, D>>)+'static {
563
564    let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone());
565    let operator_info = builder.operator_info();
566
567    let (mut output, stream) = builder.new_output();
568    builder.set_notify(false);
569
570    builder.build(move |mut capabilities| {
571        // `capabilities` should be a single-element vector.
572        let capability = capabilities.pop().unwrap();
573        let mut logic = constructor(capability, operator_info);
574        move |_frontier| {
575            logic(&mut output.activate());
576        }
577    });
578
579    stream
580}
581
582/// Constructs an empty stream.
583///
584/// This method is useful in patterns where an input is required, but there is no
585/// meaningful data to provide. The replaces patterns like `stream.filter(|_| false)`
586/// which are just silly.
587///
588/// # Examples
589/// ```
590/// use timely::dataflow::operators::Inspect;
591/// use timely::dataflow::operators::generic::operator::empty;
592/// use timely::dataflow::Scope;
593///
594/// timely::example(|scope| {
595///
596///
597///     empty::<_, Vec<_>>(scope)     // type required in this example
598///         .inspect(|()| panic!("never called"));
599///
600/// });
601/// ```
602pub fn empty<G: Scope, D: Container>(scope: &G) -> StreamCore<G, D> {
603    source(scope, "Empty", |_capability, _info| |_output| {
604        // drop capability, do nothing
605    })
606}