Skip to main content

event_driven/
event_driven.rs

1use timely::dataflow::Scope;
2use timely::dataflow::operators::{Input, Probe, Enter, Leave};
3use timely::dataflow::operators::vec::Map;
4
5fn main() {
6    // initializes and runs a timely dataflow.
7    timely::execute_from_args(std::env::args(), |worker| {
8
9        let timer = std::time::Instant::now();
10
11        let mut args = std::env::args();
12        args.next();
13
14        let dataflows = args.next().unwrap().parse::<usize>().unwrap();
15        let length = args.next().unwrap().parse::<usize>().unwrap();
16        let record = args.next() == Some("record".to_string());
17
18        let mut inputs = Vec::new();
19        let mut probes = Vec::new();
20
21        // create a new input, exchange data, and inspect its output
22        for _dataflow in 0 .. dataflows {
23            worker.dataflow(|scope| {
24                let (input, stream) = scope.new_input();
25                let stream = scope.region(|inner| {
26                    let mut stream = stream.enter(inner);
27                    for _step in 0 .. length {
28                        stream = stream.map(|x: ()| x);
29                    }
30                    stream.leave()
31                });
32                let (probe, _stream) = stream.probe();
33                inputs.push(input);
34                probes.push(probe);
35            });
36        }
37
38        println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
39
40        for round in 0 .. {
41            let dataflow = round % dataflows;
42            if record {
43                inputs[dataflow].send(());
44            }
45            inputs[dataflow].advance_to(round);
46            let mut steps = 0;
47            while probes[dataflow].less_than(&round) {
48                worker.step();
49                steps += 1;
50            }
51            if round % 1000 == 0 { println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); }
52        }
53
54    }).unwrap();
55}