Skip to main content

flow_controlled/
flow_controlled.rs

1use timely::dataflow::operators::vec::flow_controlled::{iterator_source, IteratorSourceInput};
2use timely::dataflow::operators::{probe, Probe, Inspect};
3
4fn main() {
5    timely::execute_from_args(std::env::args(), |worker| {
6        let mut input = (0u64..100000).peekable();
7        worker.dataflow(|scope| {
8            let probe_handle = probe::Handle::new();
9            let probe_handle_2 = probe_handle.clone();
10
11            iterator_source(
12                scope,
13                "Source",
14                move |prev_t| {
15                    if let Some(first_x) = input.peek().cloned() {
16                        let next_t = first_x / 100 * 100;
17                        Some(IteratorSourceInput {
18                            lower_bound: Default::default(),
19                            data: vec![
20                                (next_t,
21                                 input.by_ref().take(10).map(|x| (/* "timestamp" */ x, x)).collect::<Vec<_>>())],
22                            target: *prev_t,
23                        })
24                    } else {
25                        None
26                    }
27                },
28                probe_handle_2)
29            .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
30            .probe_with(&probe_handle);
31        });
32    }).unwrap();
33}