use crate::communication::Allocate;
use crate::dataflow::{InputHandle, ProbeHandle};
use crate::worker::Worker;
/// A barrier built from a dedicated dataflow input and a probe on its stream.
///
/// The input's `usize` timestamp serves as the round counter: each call to
/// `advance` moves it forward by one, and `reached` compares the probe
/// against that timestamp.
pub struct Barrier<A: Allocate> {
// Input handle (`usize` timestamps, `()` data) whose time is bumped once per round.
input: InputHandle<usize, ()>,
// Probe attached to the input's stream; queried to decide whether the round is complete.
probe: ProbeHandle<usize>,
// Clone of the worker passed to `new`; stepped inside `wait` while the round is incomplete.
worker: Worker<A>,
}
impl<A: Allocate> Barrier<A> {
    /// Creates a barrier backed by a fresh dataflow on `worker`.
    ///
    /// The dataflow consists of a single input carrying `()` records and a
    /// probe attached to that input's stream. A clone of `worker` is kept so
    /// that [`Barrier::wait`] can step it.
    pub fn new(worker: &mut Worker<A>) -> Self {
        use crate::dataflow::operators::{Input, Probe};

        let (input, probe) = worker.dataflow(|scope| {
            let (input, stream) = scope.new_input::<()>();
            let probe = stream.probe();
            (input, probe)
        });

        Self {
            input,
            probe,
            worker: worker.clone(),
        }
    }

    /// Advances to the next round and steps the worker until that round is reached.
    ///
    /// Equivalent to calling [`Barrier::advance`] once and then calling
    /// `step()` on the stored worker for as long as [`Barrier::reached`]
    /// returns `false`.
    pub fn wait(&mut self) {
        self.advance();
        loop {
            if self.reached() {
                break;
            }
            self.worker.step();
        }
    }

    /// Moves the barrier's input forward to the next round.
    ///
    /// The input's current time is read and the input is advanced to that
    /// time plus one. The worker is not stepped here.
    #[inline]
    pub fn advance(&mut self) {
        let next_round = *self.input.time() + 1;
        self.input.advance_to(next_round);
    }

    /// Reports whether the current round has been reached.
    ///
    /// Returns `true` when the probe does not report being less than the
    /// input's current time. The worker is not stepped here.
    #[inline]
    pub fn reached(&mut self) -> bool {
        let still_behind = self.probe.less_than(self.input.time());
        !still_behind
    }
}