event_driven/
event_driven.rs1use timely::dataflow::operators::{Input, Probe};
2use timely::dataflow::operators::vec::Map;
3
4fn main() {
5 timely::execute_from_args(std::env::args(), |worker| {
7
8 let timer = std::time::Instant::now();
9
10 let mut args = std::env::args();
11 args.next();
12
13 let dataflows = args.next().unwrap().parse::<usize>().unwrap();
14 let length = args.next().unwrap().parse::<usize>().unwrap();
15 let record = args.next() == Some("record".to_string());
16
17 let mut inputs = Vec::new();
18 let mut probes = Vec::new();
19
20 for _dataflow in 0 .. dataflows {
22 worker.dataflow(|scope| {
23 let (input, mut stream) = scope.new_input();
24 for _step in 0 .. length {
25 stream = stream.map(|x: ()| x);
26 }
27 let (probe, _stream) = stream.probe();
28 inputs.push(input);
29 probes.push(probe);
30 });
31 }
32
33 println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
34
35 for round in 0 .. {
36 let dataflow = round % dataflows;
37 if record {
38 inputs[dataflow].send(());
39 }
40 inputs[dataflow].advance_to(round);
41 let mut steps = 0;
42 while probes[dataflow].less_than(&round) {
43 worker.step();
44 steps += 1;
45 }
46 println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps);
47 }
48
49 }).unwrap();
50}