flow_controlled/
flow_controlled.rs1use timely::dataflow::operators::vec::flow_controlled::{iterator_source, IteratorSourceInput};
2use timely::dataflow::operators::{probe, Probe, Inspect};
3
4fn main() {
5 timely::execute_from_args(std::env::args(), |worker| {
6 let mut input = (0u64..100000).peekable();
7 worker.dataflow(|scope| {
8 let probe_handle = probe::Handle::new();
9 let probe_handle_2 = probe_handle.clone();
10
11 iterator_source(
12 scope,
13 "Source",
14 move |prev_t| {
15 if let Some(first_x) = input.peek().cloned() {
16 let next_t = first_x / 100 * 100;
17 Some(IteratorSourceInput {
18 lower_bound: Default::default(),
19 data: vec![
20 (next_t,
21 input.by_ref().take(10).map(|x| (x, x)).collect::<Vec<_>>())],
22 target: *prev_t,
23 })
24 } else {
25 None
26 }
27 },
28 probe_handle_2)
29 .inspect_time(|t, d| eprintln!("@ {:?}: {:?}", t, d))
30 .probe_with(&probe_handle);
31 });
32 }).unwrap();
33}