// event_driven/event_driven.rs
use timely::dataflow::operators::{Input, Probe, Enter, Leave};
use timely::dataflow::operators::vec::Map;

4fn main() {
5 timely::execute_from_args(std::env::args(), |worker| {
7
8 let timer = std::time::Instant::now();
9
10 let mut args = std::env::args();
11 args.next();
12
13 let dataflows = args.next().unwrap().parse::<usize>().unwrap();
14 let length = args.next().unwrap().parse::<usize>().unwrap();
15 let record = args.next() == Some("record".to_string());
16
17 let mut inputs = Vec::new();
18 let mut probes = Vec::new();
19
20 for _dataflow in 0 .. dataflows {
22 worker.dataflow(|scope| {
23 let (input, stream) = scope.new_input();
24 let stream = scope.region(|inner| {
25 let mut stream = stream.enter(inner);
26 for _step in 0 .. length {
27 stream = stream.map(|x: ()| x);
28 }
29 stream.leave(scope)
30 });
31 let (probe, _stream) = stream.probe();
32 inputs.push(input);
33 probes.push(probe);
34 });
35 }
36
37 println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
38
39 for round in 0 .. {
40 let dataflow = round % dataflows;
41 if record {
42 inputs[dataflow].send(());
43 }
44 inputs[dataflow].advance_to(round);
45 let mut steps = 0;
46 while probes[dataflow].less_than(&round) {
47 worker.step();
48 steps += 1;
49 }
50 if round % 1000 == 0 { println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); }
51 }
52
53 }).unwrap();
54}