use progress::Timestamp;
use progress::count_map::CountMap;
use timely_communication::Allocate;
use {Push, Pull};
pub type ProgressVec<T> = Vec<((usize, usize, T), i64)>;
pub struct Progcaster<T:Timestamp> {
pushers: Vec<Box<Push<(ProgressVec<T>, ProgressVec<T>)>>>,
puller: Box<Pull<(ProgressVec<T>, ProgressVec<T>)>>,
}
impl<T:Timestamp+Send> Progcaster<T> {
pub fn new<A: Allocate>(allocator: &mut A) -> Progcaster<T> {
let (pushers, puller) = allocator.allocate();
Progcaster { pushers: pushers, puller: puller }
}
pub fn send_and_recv(
&mut self,
messages: &mut CountMap<(usize, usize, T)>,
internal: &mut CountMap<(usize, usize, T)>)
{
if self.pushers.len() > 1 { if messages.len() > 0 || internal.len() > 0 {
for pusher in self.pushers.iter_mut() {
pusher.push(&mut Some((messages.clone().into_inner(), internal.clone().into_inner())));
}
messages.clear();
internal.clear();
}
while let Some((ref recv_messages, ref recv_internal)) = *self.puller.pull() {
for &(ref update, delta) in recv_messages {
messages.update(update, delta);
}
for &(ref update, delta) in recv_internal {
internal.update(update, delta);
}
}
}
}
}