1use timely::dataflow::operators::{ToStream, Exchange, Feedback, Concat, ConnectLoop, vec::{Map, BranchWhen}};
2
3fn main() {
4
5 let iterations = std::env::args().nth(1).unwrap().parse::<u64>().unwrap();
6 let elements = std::env::args().nth(2).unwrap().parse::<u64>().unwrap();
7
8 timely::execute_from_args(std::env::args().skip(3), move |worker| {
10 let index = worker.index();
11 let peers = worker.peers();
12 worker.dataflow::<u64,_,_>(move |scope| {
13 let (helper, cycle) = scope.feedback(1);
14 (0 .. elements)
15 .filter(move |&x| (x as usize) % peers == index)
16 .to_stream(scope)
17 .concat(cycle)
18 .exchange(|&x| x)
19 .map_in_place(|x| *x += 1)
20 .branch_when(move |t| t < &iterations).1
21 .connect_loop(helper);
22 });
23 }).unwrap();
24}