Skip to main content

event_driven/
event_driven.rs

1use timely::dataflow::operators::{Input, Probe, Enter, Leave};
2use timely::dataflow::operators::vec::Map;
3
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let timer = std::time::Instant::now();
9
10        let mut args = std::env::args();
11        args.next();
12
13        let dataflows = args.next().unwrap().parse::<usize>().unwrap();
14        let length = args.next().unwrap().parse::<usize>().unwrap();
15        let record = args.next() == Some("record".to_string());
16
17        let mut inputs = Vec::new();
18        let mut probes = Vec::new();
19
20        // create a new input, exchange data, and inspect its output
21        for _dataflow in 0 .. dataflows {
22            worker.dataflow(|scope| {
23                let (input, stream) = scope.new_input();
24                let stream = scope.region(|inner| {
25                    let mut stream = stream.enter(inner);
26                    for _step in 0 .. length {
27                        stream = stream.map(|x: ()| x);
28                    }
29                    stream.leave(scope)
30                });
31                let (probe, _stream) = stream.probe();
32                inputs.push(input);
33                probes.push(probe);
34            });
35        }
36
37        println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);
38
39        for round in 0 .. {
40            let dataflow = round % dataflows;
41            if record {
42                inputs[dataflow].send(());
43            }
44            inputs[dataflow].advance_to(round);
45            let mut steps = 0;
46            while probes[dataflow].less_than(&round) {
47                worker.step();
48                steps += 1;
49            }
50            if round % 1000 == 0 { println!("{:?}\tround {} complete in {} steps", timer.elapsed(), round, steps); }
51        }
52
53    }).unwrap();
54}