Skip to main content

barrier/
barrier.rs

1use timely::dataflow::channels::pact::Pipeline;
2use timely::dataflow::operators::{Feedback, ConnectLoop};
3use timely::dataflow::operators::generic::operator::Operator;
4use timely::container::CapacityContainerBuilder;
5
6fn main() {
7
8    let iterations = std::env::args().nth(1).unwrap().parse::<usize>().unwrap_or(1_000_000);
9
10    timely::execute_from_args(std::env::args().skip(2), move |worker| {
11
12        worker.dataflow(move |scope| {
13            let (handle, stream) = scope.feedback::<Vec<usize>>(1);
14            stream.unary_notify::<CapacityContainerBuilder<_>, _, _>(
15                Pipeline,
16                "Barrier",
17                vec![0],
18                move |_, _, notificator| {
19                    while let Some((cap, _count)) = notificator.next() {
20                        let time = *cap.time() + 1;
21                        if time < iterations {
22                            notificator.notify_at(cap.delayed(&time));
23                        }
24                    }
25                }
26            )
27            .connect_loop(handle);
28        });
29    }).unwrap(); // asserts error-free execution;
30}