1use timely::dataflow::channels::pact::Pipeline;
2use timely::dataflow::operators::{Feedback, ConnectLoop};
3use timely::dataflow::operators::generic::operator::Operator;
4use timely::container::CapacityContainerBuilder;
5
6fn main() {
7
8 let iterations = std::env::args().nth(1).unwrap().parse::<usize>().unwrap_or(1_000_000);
9
10 timely::execute_from_args(std::env::args().skip(2), move |worker| {
11
12 worker.dataflow(move |scope| {
13 let (handle, stream) = scope.feedback::<Vec<usize>>(1);
14 stream.unary_notify::<CapacityContainerBuilder<_>, _, _>(
15 Pipeline,
16 "Barrier",
17 vec![0],
18 move |_, _, notificator| {
19 while let Some((cap, _count)) = notificator.next() {
20 let time = *cap.time() + 1;
21 if time < iterations {
22 notificator.notify_at(cap.delayed(&time));
23 }
24 }
25 }
26 )
27 .connect_loop(handle);
28 });
29 }).unwrap(); }