Skip to main content

exchange/
exchange.rs

1use timely::dataflow::InputHandle;
2use timely::dataflow::operators::{Input, Exchange, Probe};
3
4fn main() {
5    // initializes and runs a timely dataflow.
6    timely::execute_from_args(std::env::args(), |worker| {
7
8        let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9        let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10        let mut input = InputHandle::new();
11
12        // create a new input, exchange data, and inspect its output
13        let probe = worker.dataflow(|scope|
14            scope
15                .input_from(&mut input)
16                .container::<Vec<_>>()
17                .exchange(|&x| x as u64)
18                .probe()
19                .0
20        );
21
22
23        let timer = std::time::Instant::now();
24
25        for round in 0 .. rounds {
26
27            for i in 0 .. batch {
28                input.send(i);
29            }
30            input.advance_to(round);
31
32            while probe.less_than(input.time()) {
33                worker.step();
34            }
35
36        }
37
38        let volume = (rounds * batch) as f64;
39        let elapsed = timer.elapsed();
40        let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42        println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44    }).unwrap();
45}