timely/dataflow/operators/
flow_controlled.rs

1//! Methods to construct flow-controlled sources.
2
3use crate::Data;
4use crate::order::{PartialOrder, TotalOrder};
5use crate::progress::timestamp::Timestamp;
6use crate::dataflow::operators::generic::operator::source;
7use crate::dataflow::operators::probe::Handle;
8use crate::dataflow::{Stream, Scope};
9
10/// Output of the input reading function for iterator_source.
11pub struct IteratorSourceInput<T: Clone, D: Data, DI: IntoIterator<Item=D>, I: IntoIterator<Item=(T, DI)>> {
12    /// Lower bound on timestamps that can be emitted by this input in the future.
13    pub lower_bound: T,
14    /// Any `T: IntoIterator` of new input data in the form (time, data): time must be
15    /// monotonically increasing.
16    pub data: I,
17    /// A timestamp that represents the frontier that the probe should have
18    /// reached before the function is invoked again to ingest additional input.
19    pub target: T,
20}
21
22/// Construct a source that repeatedly calls the provided function to ingest input.
23/// - The function can return None to signal the end of the input;
24/// - otherwise, it should return a `IteratorSourceInput`, where:
25///   * `lower_bound` is a lower bound on timestamps that can be emitted by this input in the future,
26///   `Default::default()` can be used if this isn't needed (the source will assume that
27///   the timestamps in `data` are monotonically increasing and will release capabilities
28///   accordingly);
29///   * `data` is any `T: IntoIterator` of new input data in the form (time, data): time must be
30///   monotonically increasing;
31///   * `target` is a timestamp that represents the frontier that the probe should have
32///   reached before the function is invoked again to ingest additional input.
33/// The function will receive the current lower bound of timestamps that can be inserted,
34/// `lower_bound`.
35///
36/// # Example
37/// ```rust
38/// extern crate timely;
39///
40/// use timely::dataflow::operators::flow_controlled::{iterator_source, IteratorSourceInput};
41/// use timely::dataflow::operators::{probe, Probe, Inspect};
42///
43/// fn main() {
44///     timely::execute_from_args(std::env::args(), |worker| {
45///         let mut input = (0u64..100000).peekable();
46///         worker.dataflow(|scope| {
47///             let mut probe_handle = probe::Handle::new();
48///             let probe_handle_2 = probe_handle.clone();
49///
50///             let mut next_t: u64 = 0;
51///             iterator_source(
52///                 scope,
53///                 "Source",
54///                 move |prev_t| {
55///                     if let Some(first_x) = input.peek().cloned() {
56///                         next_t = first_x / 100 * 100;
57///                         Some(IteratorSourceInput {
58///                             lower_bound: Default::default(),
59///                             data: vec![
60///                                 (next_t,
61///                                  input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::<Vec<_>>())],
62///                             target: *prev_t,
63///                         })
64///                     } else {
65///                         None
66///                     }
67///                 },
68///                 probe_handle_2)
69///             .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
70///             .probe_with(&mut probe_handle);
71///         });
72///     }).unwrap();
73/// }
74/// ```
75pub fn iterator_source<
76    G: Scope,
77    D: Data,
78    DI: IntoIterator<Item=D>,
79    I: IntoIterator<Item=(G::Timestamp, DI)>,
80    F: FnMut(&G::Timestamp)->Option<IteratorSourceInput<G::Timestamp, D, DI, I>>+'static>(
81        scope: &G,
82        name: &str,
83        mut input_f: F,
84        probe: Handle<G::Timestamp>,
85        ) -> Stream<G, D> where G::Timestamp: TotalOrder {
86
87    let mut target = G::Timestamp::minimum();
88    source(scope, name, |cap, info| {
89        let mut cap = Some(cap);
90        let activator = scope.activator_for(&info.address[..]);
91        move |output| {
92            cap = cap.take().and_then(|mut cap| {
93                loop {
94                    if !probe.less_than(&target) {
95                        if let Some(IteratorSourceInput {
96                             lower_bound,
97                             data,
98                             target: new_target,
99                         }) = input_f(cap.time()) {
100                            target = new_target;
101                            let mut has_data = false;
102                            for (t, ds) in data.into_iter() {
103                                cap = if cap.time() != &t { cap.delayed(&t) } else { cap };
104                                let mut session = output.session(&cap);
105                                session.give_iterator(ds.into_iter());
106                                has_data = true;
107                            }
108
109                            cap = if cap.time().less_than(&lower_bound) { cap.delayed(&lower_bound) } else { cap };
110                            if !has_data {
111                                break Some(cap);
112                            }
113                        } else {
114                            break None;
115                        }
116                    } else {
117                        break Some(cap);
118                    }
119                }
120            });
121
122            if cap.is_some() {
123                activator.activate();
124            }
125        }
126    })
127}