1use timely::dataflow::InputHandle;
2use timely::dataflow::operators::{Input, Exchange, Probe};
3
4fn main() {
5 timely::execute_from_args(std::env::args(), |worker| {
7
8 let batch = std::env::args().nth(1).unwrap().parse::<usize>().unwrap();
9 let rounds = std::env::args().nth(2).unwrap().parse::<usize>().unwrap();
10 let mut input = InputHandle::new();
11
12 let probe = worker.dataflow(|scope|
14 scope
15 .input_from(&mut input)
16 .container::<Vec<_>>()
17 .exchange(|&x| x as u64)
18 .probe()
19 .0
20 );
21
22
23 let timer = std::time::Instant::now();
24
25 for round in 0 .. rounds {
26
27 for i in 0 .. batch {
28 input.send(i);
29 }
30 input.advance_to(round);
31
32 while probe.less_than(input.time()) {
33 worker.step();
34 }
35
36 }
37
38 let volume = (rounds * batch) as f64;
39 let elapsed = timer.elapsed();
40 let seconds = elapsed.as_secs() as f64 + (f64::from(elapsed.subsec_nanos())/1000000000.0);
41
42 println!("{:?}\tworker {} complete; rate: {:?}", timer.elapsed(), worker.index(), volume / seconds);
43
44 }).unwrap();
45}