use ::communication::Allocate;
use dataflow::{InputHandle, ProbeHandle};
use worker::Worker;
pub struct Barrier<A: Allocate> {
round: usize,
input: InputHandle<usize, ()>,
probe: ProbeHandle<usize>,
worker: Worker<A>,
}
impl<A: Allocate> Barrier<A> {
pub fn new(worker: &mut Worker<A>) -> Self {
use dataflow::operators::{Input, Probe};
let (input, probe) = worker.dataflow(|scope| {
let (handle, stream) = scope.new_input::<()>();
(handle, stream.probe())
});
Barrier { round: 0, input, probe, worker: worker.clone() }
}
pub fn wait(&mut self) {
self.round += 1;
self.input.advance_to(self.round);
while self.probe.less_than(self.input.time()) {
self.worker.step();
}
}
}