1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
//! Methods to construct flow-controlled sources.
use crateData;
use crate;
use cratesource;
use crateHandle;
use crate;
/// Output of the input reading function for `iterator_source`.
/// Construct a source that repeatedly calls the provided function to ingest input.
/// - The function can return None to signal the end of the input;
/// - otherwise, it should return an `IteratorSourceInput`, where:
/// * `lower_bound` is a lower bound on timestamps that can be emitted by this input in the future;
/// `Default::default()` can be used if this isn't needed (the source will assume that
/// the timestamps in `data` are monotonically increasing and will release capabilities
/// accordingly);
/// * `data` is any `T: IntoIterator` of new input data in the form (time, data): time must be
/// monotonically increasing;
/// * `target` is a timestamp that represents the frontier that the probe should have
/// reached before the function is invoked again to ingest additional input.
/// The function will receive the current lower bound of timestamps that can be inserted,
/// `lower_bound`.
///
/// # Example
/// ```rust
/// extern crate timely;
///
/// use timely::dataflow::operators::flow_controlled::{iterator_source, IteratorSourceInput};
/// use timely::dataflow::operators::{probe, Probe, Inspect};
///
/// fn main() {
/// timely::execute_from_args(std::env::args(), |worker| {
/// let mut input = (0u64..100000).peekable();
/// worker.dataflow(|scope| {
/// let mut probe_handle = probe::Handle::new();
/// let probe_handle_2 = probe_handle.clone();
///
/// let mut next_t: u64 = 0;
/// iterator_source(
/// scope,
/// "Source",
/// move |prev_t| {
/// if let Some(first_x) = input.peek().cloned() {
/// next_t = first_x / 100 * 100;
/// Some(IteratorSourceInput {
/// lower_bound: Default::default(),
/// data: vec![
/// (next_t,
/// input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::<Vec<_>>())],
/// target: *prev_t,
/// })
/// } else {
/// None
/// }
/// },
/// probe_handle_2)
/// .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
/// .probe_with(&mut probe_handle);
/// });
/// }).unwrap();
/// }
/// ```