timely/dataflow/operators/generic/operator.rs
1
2//! Methods to construct generic streaming and blocking unary operators.
3
4use crate::dataflow::channels::pushers::TeeCore;
5use crate::dataflow::channels::pact::ParallelizationContractCore;
6
7use crate::dataflow::operators::generic::handles::{InputHandleCore, FrontieredInputHandleCore, OutputHandleCore};
8use crate::dataflow::operators::capability::Capability;
9
10use crate::dataflow::{Scope, StreamCore};
11
12use super::builder_rc::OperatorBuilder;
13use crate::dataflow::operators::generic::OperatorInfo;
14use crate::dataflow::operators::generic::notificator::{Notificator, FrontierNotificator};
15use crate::Container;
16
17/// Methods to construct generic streaming and blocking operators.
18pub trait Operator<G: Scope, D1: Container> {
19 /// Creates a new dataflow operator that partitions its input stream by a parallelization
20 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
21 /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
22 ///
23 /// # Examples
24 /// ```
25 /// use std::collections::HashMap;
26 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
27 /// use timely::dataflow::operators::generic::Operator;
28 /// use timely::dataflow::channels::pact::Pipeline;
29 ///
30 /// fn main() {
31 /// timely::example(|scope| {
32 /// (0u64..10).to_stream(scope)
33 /// .unary_frontier(Pipeline, "example", |default_cap, _info| {
34 /// let mut cap = Some(default_cap.delayed(&12));
35 /// let mut notificator = FrontierNotificator::new();
36 /// let mut stash = HashMap::new();
37 /// let mut vector = Vec::new();
38 /// move |input, output| {
39 /// if let Some(ref c) = cap.take() {
40 /// output.session(&c).give(12);
41 /// }
42 /// while let Some((time, data)) = input.next() {
43 /// data.swap(&mut vector);
44 /// stash.entry(time.time().clone())
45 /// .or_insert(Vec::new())
46 /// .extend(vector.drain(..));
47 /// }
48 /// notificator.for_each(&[input.frontier()], |time, _not| {
49 /// if let Some(mut vec) = stash.remove(time.time()) {
50 /// output.session(&time).give_iterator(vec.drain(..));
51 /// }
52 /// });
53 /// }
54 /// });
55 /// });
56 /// }
57 /// ```
58 fn unary_frontier<D2, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, D2>
59 where
60 D2: Container,
61 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
62 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, D1, P::Puller>,
63 &mut OutputHandleCore<G::Timestamp, D2, TeeCore<G::Timestamp, D2>>)+'static,
64 P: ParallelizationContractCore<G::Timestamp, D1>;
65
66 /// Creates a new dataflow operator that partitions its input stream by a parallelization
67 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
68 /// `logic` can read from the input stream, write to the output stream, and inspect the frontier at the input.
69 ///
70 /// # Examples
71 /// ```
72 /// use std::collections::HashMap;
73 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
74 /// use timely::dataflow::operators::generic::Operator;
75 /// use timely::dataflow::channels::pact::Pipeline;
76 ///
77 /// fn main() {
78 /// timely::example(|scope| {
79 /// let mut vector = Vec::new();
80 /// (0u64..10)
81 /// .to_stream(scope)
82 /// .unary_notify(Pipeline, "example", None, move |input, output, notificator| {
83 /// input.for_each(|time, data| {
84 /// data.swap(&mut vector);
85 /// output.session(&time).give_vec(&mut vector);
86 /// notificator.notify_at(time.retain());
87 /// });
88 /// notificator.for_each(|time, _cnt, _not| {
89 /// println!("notified at {:?}", time);
90 /// });
91 /// });
92 /// });
93 /// }
94 /// ```
95 fn unary_notify<D2: Container,
96 L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>,
97 &mut OutputHandleCore<G::Timestamp, D2, TeeCore<G::Timestamp, D2>>,
98 &mut Notificator<G::Timestamp>)+'static,
99 P: ParallelizationContractCore<G::Timestamp, D1>>
100 (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, D2>;
101
102 /// Creates a new dataflow operator that partitions its input stream by a parallelization
103 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
104 /// `logic` can read from the input stream, and write to the output stream.
105 ///
106 /// # Examples
107 /// ```
108 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
109 /// use timely::dataflow::operators::generic::operator::Operator;
110 /// use timely::dataflow::channels::pact::Pipeline;
111 /// use timely::dataflow::Scope;
112 ///
113 /// timely::example(|scope| {
114 /// (0u64..10).to_stream(scope)
115 /// .unary(Pipeline, "example", |default_cap, _info| {
116 /// let mut cap = Some(default_cap.delayed(&12));
117 /// let mut vector = Vec::new();
118 /// move |input, output| {
119 /// if let Some(ref c) = cap.take() {
120 /// output.session(&c).give(100);
121 /// }
122 /// while let Some((time, data)) = input.next() {
123 /// data.swap(&mut vector);
124 /// output.session(&time).give_vec(&mut vector);
125 /// }
126 /// }
127 /// });
128 /// });
129 /// ```
130 fn unary<D2, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, D2>
131 where
132 D2: Container,
133 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
134 L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>,
135 &mut OutputHandleCore<G::Timestamp, D2, TeeCore<G::Timestamp, D2>>)+'static,
136 P: ParallelizationContractCore<G::Timestamp, D1>;
137
138 /// Creates a new dataflow operator that partitions its input streams by a parallelization
139 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
140 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
141 ///
142 /// # Examples
143 /// ```
144 /// use std::collections::HashMap;
145 /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
146 /// use timely::dataflow::operators::generic::operator::Operator;
147 /// use timely::dataflow::channels::pact::Pipeline;
148 ///
149 /// timely::execute(timely::Config::thread(), |worker| {
150 /// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
151 /// let (in1_handle, in1) = scope.new_input();
152 /// let (in2_handle, in2) = scope.new_input();
153 /// in1.binary_frontier(&in2, Pipeline, Pipeline, "example", |mut _default_cap, _info| {
154 /// let mut notificator = FrontierNotificator::new();
155 /// let mut stash = HashMap::new();
156 /// let mut vector1 = Vec::new();
157 /// let mut vector2 = Vec::new();
158 /// move |input1, input2, output| {
159 /// while let Some((time, data)) = input1.next() {
160 /// data.swap(&mut vector1);
161 /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector1.drain(..));
162 /// notificator.notify_at(time.retain());
163 /// }
164 /// while let Some((time, data)) = input2.next() {
165 /// data.swap(&mut vector2);
166 /// stash.entry(time.time().clone()).or_insert(Vec::new()).extend(vector2.drain(..));
167 /// notificator.notify_at(time.retain());
168 /// }
169 /// notificator.for_each(&[input1.frontier(), input2.frontier()], |time, _not| {
170 /// if let Some(mut vec) = stash.remove(time.time()) {
171 /// output.session(&time).give_iterator(vec.drain(..));
172 /// }
173 /// });
174 /// }
175 /// }).inspect_batch(|t, x| println!("{:?} -> {:?}", t, x));
176 ///
177 /// (in1_handle, in2_handle)
178 /// });
179 ///
180 /// for i in 1..10 {
181 /// in1.send(i - 1);
182 /// in1.advance_to(i);
183 /// in2.send(i - 1);
184 /// in2.advance_to(i);
185 /// }
186 /// }).unwrap();
187 /// ```
188 fn binary_frontier<D2, D3, B, L, P1, P2>(&self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, D3>
189 where
190 D2: Container,
191 D3: Container,
192 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
193 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, D1, P1::Puller>,
194 &mut FrontieredInputHandleCore<G::Timestamp, D2, P2::Puller>,
195 &mut OutputHandleCore<G::Timestamp, D3, TeeCore<G::Timestamp, D3>>)+'static,
196 P1: ParallelizationContractCore<G::Timestamp, D1>,
197 P2: ParallelizationContractCore<G::Timestamp, D2>;
198
199 /// Creates a new dataflow operator that partitions its input streams by a parallelization
200 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
201 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
202 ///
203 /// # Examples
204 /// ```
205 /// use std::collections::HashMap;
206 /// use timely::dataflow::operators::{Input, Inspect, FrontierNotificator};
207 /// use timely::dataflow::operators::generic::operator::Operator;
208 /// use timely::dataflow::channels::pact::Pipeline;
209 ///
210 /// timely::execute(timely::Config::thread(), |worker| {
211 /// let (mut in1, mut in2) = worker.dataflow::<usize,_,_>(|scope| {
212 /// let (in1_handle, in1) = scope.new_input();
213 /// let (in2_handle, in2) = scope.new_input();
214 ///
215 /// let mut vector1 = Vec::new();
216 /// let mut vector2 = Vec::new();
217 /// in1.binary_notify(&in2, Pipeline, Pipeline, "example", None, move |input1, input2, output, notificator| {
218 /// input1.for_each(|time, data| {
219 /// data.swap(&mut vector1);
220 /// output.session(&time).give_vec(&mut vector1);
221 /// notificator.notify_at(time.retain());
222 /// });
223 /// input2.for_each(|time, data| {
224 /// data.swap(&mut vector2);
225 /// output.session(&time).give_vec(&mut vector2);
226 /// notificator.notify_at(time.retain());
227 /// });
228 /// notificator.for_each(|time, _cnt, _not| {
229 /// println!("notified at {:?}", time);
230 /// });
231 /// });
232 ///
233 /// (in1_handle, in2_handle)
234 /// });
235 ///
236 /// for i in 1..10 {
237 /// in1.send(i - 1);
238 /// in1.advance_to(i);
239 /// in2.send(i - 1);
240 /// in2.advance_to(i);
241 /// }
242 /// }).unwrap();
243 /// ```
244 fn binary_notify<D2: Container,
245 D3: Container,
246 L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>,
247 &mut InputHandleCore<G::Timestamp, D2, P2::Puller>,
248 &mut OutputHandleCore<G::Timestamp, D3, TeeCore<G::Timestamp, D3>>,
249 &mut Notificator<G::Timestamp>)+'static,
250 P1: ParallelizationContractCore<G::Timestamp, D1>,
251 P2: ParallelizationContractCore<G::Timestamp, D2>>
252 (&self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, logic: L) -> StreamCore<G, D3>;
253
254 /// Creates a new dataflow operator that partitions its input streams by a parallelization
255 /// strategy `pact`, and repeatedly invokes `logic`, the function returned by the function passed as `constructor`.
256 /// `logic` can read from the input streams, write to the output stream, and inspect the frontier at the inputs.
257 ///
258 /// # Examples
259 /// ```
260 /// use timely::dataflow::operators::{ToStream, Inspect, FrontierNotificator};
261 /// use timely::dataflow::operators::generic::operator::Operator;
262 /// use timely::dataflow::channels::pact::Pipeline;
263 /// use timely::dataflow::Scope;
264 ///
265 /// timely::example(|scope| {
266 /// let stream2 = (0u64..10).to_stream(scope);
267 /// (0u64..10).to_stream(scope)
268 /// .binary(&stream2, Pipeline, Pipeline, "example", |default_cap, _info| {
269 /// let mut cap = Some(default_cap.delayed(&12));
270 /// let mut vector1 = Vec::new();
271 /// let mut vector2 = Vec::new();
272 /// move |input1, input2, output| {
273 /// if let Some(ref c) = cap.take() {
274 /// output.session(&c).give(100);
275 /// }
276 /// while let Some((time, data)) = input1.next() {
277 /// data.swap(&mut vector1);
278 /// output.session(&time).give_vec(&mut vector1);
279 /// }
280 /// while let Some((time, data)) = input2.next() {
281 /// data.swap(&mut vector2);
282 /// output.session(&time).give_vec(&mut vector2);
283 /// }
284 /// }
285 /// }).inspect(|x| println!("{:?}", x));
286 /// });
287 /// ```
288 fn binary<D2, D3, B, L, P1, P2>(&self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, D3>
289 where
290 D2: Container,
291 D3: Container,
292 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
293 L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>,
294 &mut InputHandleCore<G::Timestamp, D2, P2::Puller>,
295 &mut OutputHandleCore<G::Timestamp, D3, TeeCore<G::Timestamp, D3>>)+'static,
296 P1: ParallelizationContractCore<G::Timestamp, D1>,
297 P2: ParallelizationContractCore<G::Timestamp, D2>;
298
299 /// Creates a new dataflow operator that partitions its input stream by a parallelization
300 /// strategy `pact`, and repeatedly invokes the function `logic` which can read from the input stream
301 /// and inspect the frontier at the input.
302 ///
303 /// # Examples
304 /// ```
305 /// use timely::dataflow::operators::{ToStream, FrontierNotificator};
306 /// use timely::dataflow::operators::generic::operator::Operator;
307 /// use timely::dataflow::channels::pact::Pipeline;
308 /// use timely::dataflow::Scope;
309 ///
310 /// timely::example(|scope| {
311 /// (0u64..10)
312 /// .to_stream(scope)
313 /// .sink(Pipeline, "example", |input| {
314 /// while let Some((time, data)) = input.next() {
315 /// for datum in data.iter() {
316 /// println!("{:?}:\t{:?}", time, datum);
317 /// }
318 /// }
319 /// });
320 /// });
321 /// ```
322 fn sink<L, P>(&self, pact: P, name: &str, logic: L)
323 where
324 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, D1, P::Puller>)+'static,
325 P: ParallelizationContractCore<G::Timestamp, D1>;
326}
327
328impl<G: Scope, D1: Container> Operator<G, D1> for StreamCore<G, D1> {
329
330 fn unary_frontier<D2, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, D2>
331 where
332 D2: Container,
333 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
334 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, D1, P::Puller>,
335 &mut OutputHandleCore<G::Timestamp, D2, TeeCore<G::Timestamp, D2>>)+'static,
336 P: ParallelizationContractCore<G::Timestamp, D1> {
337
338 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
339 let operator_info = builder.operator_info();
340
341 let mut input = builder.new_input(self, pact);
342 let (mut output, stream) = builder.new_output();
343
344 builder.build(move |mut capabilities| {
345 // `capabilities` should be a single-element vector.
346 let capability = capabilities.pop().unwrap();
347 let mut logic = constructor(capability, operator_info);
348 move |frontiers| {
349 let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]);
350 let mut output_handle = output.activate();
351 logic(&mut input_handle, &mut output_handle);
352 }
353 });
354
355 stream
356 }
357
358 fn unary_notify<D2: Container,
359 L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>,
360 &mut OutputHandleCore<G::Timestamp, D2, TeeCore<G::Timestamp, D2>>,
361 &mut Notificator<G::Timestamp>)+'static,
362 P: ParallelizationContractCore<G::Timestamp, D1>>
363 (&self, pact: P, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, D2> {
364
365 self.unary_frontier(pact, name, move |capability, _info| {
366 let mut notificator = FrontierNotificator::new();
367 for time in init {
368 notificator.notify_at(capability.delayed(&time));
369 }
370
371 let logging = self.scope().logging();
372 move |input, output| {
373 let frontier = &[input.frontier()];
374 let notificator = &mut Notificator::new(frontier, &mut notificator, &logging);
375 logic(&mut input.handle, output, notificator);
376 }
377 })
378 }
379
380 fn unary<D2, B, L, P>(&self, pact: P, name: &str, constructor: B) -> StreamCore<G, D2>
381 where
382 D2: Container,
383 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
384 L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P::Puller>,
385 &mut OutputHandleCore<G::Timestamp, D2, TeeCore<G::Timestamp, D2>>)+'static,
386 P: ParallelizationContractCore<G::Timestamp, D1> {
387
388 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
389 let operator_info = builder.operator_info();
390
391 let mut input = builder.new_input(self, pact);
392 let (mut output, stream) = builder.new_output();
393 builder.set_notify(false);
394
395 builder.build(move |mut capabilities| {
396 // `capabilities` should be a single-element vector.
397 let capability = capabilities.pop().unwrap();
398 let mut logic = constructor(capability, operator_info);
399 move |_frontiers| {
400 let mut output_handle = output.activate();
401 logic(&mut input, &mut output_handle);
402 }
403 });
404
405 stream
406 }
407
408 fn binary_frontier<D2, D3, B, L, P1, P2>(&self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, D3>
409 where
410 D2: Container,
411 D3: Container,
412 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
413 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, D1, P1::Puller>,
414 &mut FrontieredInputHandleCore<G::Timestamp, D2, P2::Puller>,
415 &mut OutputHandleCore<G::Timestamp, D3, TeeCore<G::Timestamp, D3>>)+'static,
416 P1: ParallelizationContractCore<G::Timestamp, D1>,
417 P2: ParallelizationContractCore<G::Timestamp, D2> {
418
419 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
420 let operator_info = builder.operator_info();
421
422 let mut input1 = builder.new_input(self, pact1);
423 let mut input2 = builder.new_input(other, pact2);
424 let (mut output, stream) = builder.new_output();
425
426 builder.build(move |mut capabilities| {
427 // `capabilities` should be a single-element vector.
428 let capability = capabilities.pop().unwrap();
429 let mut logic = constructor(capability, operator_info);
430 move |frontiers| {
431 let mut input1_handle = FrontieredInputHandleCore::new(&mut input1, &frontiers[0]);
432 let mut input2_handle = FrontieredInputHandleCore::new(&mut input2, &frontiers[1]);
433 let mut output_handle = output.activate();
434 logic(&mut input1_handle, &mut input2_handle, &mut output_handle);
435 }
436 });
437
438 stream
439 }
440
441 fn binary_notify<D2: Container,
442 D3: Container,
443 L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>,
444 &mut InputHandleCore<G::Timestamp, D2, P2::Puller>,
445 &mut OutputHandleCore<G::Timestamp, D3, TeeCore<G::Timestamp, D3>>,
446 &mut Notificator<G::Timestamp>)+'static,
447 P1: ParallelizationContractCore<G::Timestamp, D1>,
448 P2: ParallelizationContractCore<G::Timestamp, D2>>
449 (&self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, init: impl IntoIterator<Item=G::Timestamp>, mut logic: L) -> StreamCore<G, D3> {
450
451 self.binary_frontier(other, pact1, pact2, name, |capability, _info| {
452 let mut notificator = FrontierNotificator::new();
453 for time in init {
454 notificator.notify_at(capability.delayed(&time));
455 }
456
457 let logging = self.scope().logging();
458 move |input1, input2, output| {
459 let frontiers = &[input1.frontier(), input2.frontier()];
460 let notificator = &mut Notificator::new(frontiers, &mut notificator, &logging);
461 logic(&mut input1.handle, &mut input2.handle, output, notificator);
462 }
463 })
464
465 }
466
467
468 fn binary<D2, D3, B, L, P1, P2>(&self, other: &StreamCore<G, D2>, pact1: P1, pact2: P2, name: &str, constructor: B) -> StreamCore<G, D3>
469 where
470 D2: Container,
471 D3: Container,
472 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
473 L: FnMut(&mut InputHandleCore<G::Timestamp, D1, P1::Puller>,
474 &mut InputHandleCore<G::Timestamp, D2, P2::Puller>,
475 &mut OutputHandleCore<G::Timestamp, D3, TeeCore<G::Timestamp, D3>>)+'static,
476 P1: ParallelizationContractCore<G::Timestamp, D1>,
477 P2: ParallelizationContractCore<G::Timestamp, D2> {
478
479 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
480 let operator_info = builder.operator_info();
481
482 let mut input1 = builder.new_input(self, pact1);
483 let mut input2 = builder.new_input(other, pact2);
484 let (mut output, stream) = builder.new_output();
485 builder.set_notify(false);
486
487 builder.build(move |mut capabilities| {
488 // `capabilities` should be a single-element vector.
489 let capability = capabilities.pop().unwrap();
490 let mut logic = constructor(capability, operator_info);
491 move |_frontiers| {
492 let mut output_handle = output.activate();
493 logic(&mut input1, &mut input2, &mut output_handle);
494 }
495 });
496
497 stream
498 }
499
500 fn sink<L, P>(&self, pact: P, name: &str, mut logic: L)
501 where
502 L: FnMut(&mut FrontieredInputHandleCore<G::Timestamp, D1, P::Puller>)+'static,
503 P: ParallelizationContractCore<G::Timestamp, D1> {
504
505 let mut builder = OperatorBuilder::new(name.to_owned(), self.scope());
506 let mut input = builder.new_input(self, pact);
507
508 builder.build(|_capabilities| {
509 move |frontiers| {
510 let mut input_handle = FrontieredInputHandleCore::new(&mut input, &frontiers[0]);
511 logic(&mut input_handle);
512 }
513 });
514 }
515}
516
517/// Creates a new data stream source for a scope.
518///
519/// The source is defined by a name, and a constructor which takes a default capability to
520/// a method that can be repeatedly called on a output handle. The method is then repeatedly
521/// invoked, and is expected to eventually send data and downgrade and release capabilities.
522///
523/// # Examples
524/// ```
525/// use timely::scheduling::Scheduler;
526/// use timely::dataflow::operators::Inspect;
527/// use timely::dataflow::operators::generic::operator::source;
528/// use timely::dataflow::Scope;
529///
530/// timely::example(|scope| {
531///
532/// source(scope, "Source", |capability, info| {
533///
534/// let activator = scope.activator_for(&info.address[..]);
535///
536/// let mut cap = Some(capability);
537/// move |output| {
538///
539/// let mut done = false;
540/// if let Some(cap) = cap.as_mut() {
541/// // get some data and send it.
542/// let time = cap.time().clone();
543/// output.session(&cap)
544/// .give(*cap.time());
545///
546/// // downgrade capability.
547/// cap.downgrade(&(time + 1));
548/// done = time > 20;
549/// }
550///
551/// if done { cap = None; }
552/// else { activator.activate(); }
553/// }
554/// })
555/// .inspect(|x| println!("number: {:?}", x));
556/// });
557/// ```
558pub fn source<G: Scope, D, B, L>(scope: &G, name: &str, constructor: B) -> StreamCore<G, D>
559where
560 D: Container,
561 B: FnOnce(Capability<G::Timestamp>, OperatorInfo) -> L,
562 L: FnMut(&mut OutputHandleCore<G::Timestamp, D, TeeCore<G::Timestamp, D>>)+'static {
563
564 let mut builder = OperatorBuilder::new(name.to_owned(), scope.clone());
565 let operator_info = builder.operator_info();
566
567 let (mut output, stream) = builder.new_output();
568 builder.set_notify(false);
569
570 builder.build(move |mut capabilities| {
571 // `capabilities` should be a single-element vector.
572 let capability = capabilities.pop().unwrap();
573 let mut logic = constructor(capability, operator_info);
574 move |_frontier| {
575 logic(&mut output.activate());
576 }
577 });
578
579 stream
580}
581
582/// Constructs an empty stream.
583///
584/// This method is useful in patterns where an input is required, but there is no
585/// meaningful data to provide. The replaces patterns like `stream.filter(|_| false)`
586/// which are just silly.
587///
588/// # Examples
589/// ```
590/// use timely::dataflow::operators::Inspect;
591/// use timely::dataflow::operators::generic::operator::empty;
592/// use timely::dataflow::Scope;
593///
594/// timely::example(|scope| {
595///
596///
597/// empty::<_, Vec<_>>(scope) // type required in this example
598/// .inspect(|()| panic!("never called"));
599///
600/// });
601/// ```
602pub fn empty<G: Scope, D: Container>(scope: &G) -> StreamCore<G, D> {
603 source(scope, "Empty", |_capability, _info| |_output| {
604 // drop capability, do nothing
605 })
606}