Skip to main content

pingpong/
pingpong.rs

1use timely::dataflow::operators::{ToStream, Exchange, Feedback, Concat, ConnectLoop, vec::{Map, BranchWhen}};
2
3fn main() {
4
5    let iterations = std::env::args().nth(1).unwrap().parse::<u64>().unwrap();
6    let elements = std::env::args().nth(2).unwrap().parse::<u64>().unwrap();
7
8    // initializes and runs a timely dataflow
9    timely::execute_from_args(std::env::args().skip(3), move |worker| {
10        let index = worker.index();
11        let peers = worker.peers();
12        worker.dataflow::<u64,_,_>(move |scope| {
13            let (helper, cycle) = scope.feedback(1);
14            (0 .. elements)
15                  .filter(move |&x| (x as usize) % peers == index)
16                  .to_stream(scope)
17                  .concat(cycle)
18                  .exchange(|&x| x)
19                  .map_in_place(|x| *x += 1)
20                  .branch_when(move |t| t < &iterations).1
21                  .connect_loop(helper);
22        });
23    }).unwrap();
24}