use progress::Timestamp;
use progress::ChangeBatch;
use timely_communication::Allocate;
use {Push, Pull};
use logging::Logger;
/// A list of progress updates: each entry pairs a `(usize, usize, T)` key with a signed `i64` change.
///
/// NOTE(review): the two `usize` components presumably identify a location in the dataflow
/// (e.g. an operator and one of its ports) — confirm against the code that produces these updates.
pub type ProgressVec<T> = Vec<((usize, usize, T), i64)>;
/// A progress message as exchanged by `Progcaster`: `(source, sequence number, messages, internal)`.
///
/// The first component is the sender's index, the second is the sender's broadcast counter at the
/// time of sending, and the last two are the update lists taken from the sender's `messages` and
/// `internal` change batches.
pub type ProgressMsg<T> = (usize, usize, ProgressVec<T>, ProgressVec<T>);
/// Exchanges progress updates with peers over an allocated channel.
///
/// Outgoing updates are pushed as `ProgressMsg<T>` values into every pusher, and incoming
/// `ProgressMsg<T>` values are read from the puller; see `send_and_recv`.
pub struct Progcaster<T>
where
    T: Timestamp,
{
    /// Outgoing endpoints obtained from the allocator; a broadcast pushes one message into each.
    pushers: Vec<Box<Push<ProgressMsg<T>>>>,
    /// Incoming endpoint obtained from the allocator; drained on every `send_and_recv` call
    /// that gets past the single-pusher check.
    puller: Box<Pull<ProgressMsg<T>>>,
    /// Index reported by the allocator at construction; stamped on every outgoing message.
    source: usize,
    /// Sequence number attached to the next outgoing message; incremented after each broadcast.
    counter: usize,
    /// Copy of the `path` handed to `new`; reported as `addr` in progress log events.
    addr: Vec<usize>,
    /// Channel identifier returned by the allocator, if any; reported in log events.
    comm_channel: Option<usize>,
    /// Logger that receives channel-creation and progress send/receive events.
    logging: Logger,
}
impl<T> Progcaster<T>
where
    T: Timestamp + Send,
{
    /// Creates a `Progcaster` backed by a channel freshly allocated from `allocator`.
    ///
    /// * `allocator` - supplies the pushers, the puller, the channel identifier and the local index.
    /// * `path` - cloned and kept as the address reported in progress log events.
    /// * `logging` - logger that is told about the new channel (when enabled) and then stored.
    ///
    /// The sequence counter of the returned value starts at zero.
    pub fn new<A: Allocate>(allocator: &mut A, path: &Vec<usize>, logging: Logger) -> Progcaster<T> {
        let (pushers, puller, comm_channel) = allocator.allocate();

        // Record the newly allocated channel as a progress channel.
        logging.when_enabled(|logger| {
            logger.log(::logging::TimelyEvent::CommChannels(::logging::CommChannelsEvent {
                comm_channel,
                comm_channel_kind: ::logging::CommChannelKind::Progress,
            }))
        });

        Progcaster {
            pushers,
            puller,
            source: allocator.index(),
            counter: 0,
            addr: path.clone(),
            comm_channel,
            logging,
        }
    }

    /// Sends the pending updates to every pusher, then folds all received updates back in.
    ///
    /// With at most one pusher this is a no-op: nothing is sent, nothing is pulled, and both
    /// batches are left exactly as they were.
    ///
    /// Otherwise:
    /// 1. If either batch is non-empty, a copy of both batches is pushed into every pusher
    ///    (tagged with this instance's `source` and current `counter`), the counter is
    ///    incremented, and both batches are cleared.
    /// 2. The puller is drained, and every received update is applied to `messages` or
    ///    `internal` respectively.
    ///
    /// NOTE(review): since the local batches are cleared after sending, the pushers presumably
    /// include a route back to this instance's own puller — confirm against the allocator.
    ///
    /// When logging is enabled, one progress event is logged per send and per received message;
    /// the events carry empty `messages` / `internal` vectors.
    pub fn send_and_recv(
        &mut self,
        messages: &mut ChangeBatch<(usize, usize, T)>,
        internal: &mut ChangeBatch<(usize, usize, T)>,
    ) {
        // Nothing to exchange unless there is more than one pusher.
        if self.pushers.len() <= 1 {
            return;
        }

        // Only broadcast when at least one of the batches has content.
        if !(messages.is_empty() && internal.is_empty()) {
            let sender = self.source;
            let seq_no = self.counter;
            let comm_channel = self.comm_channel;
            let addr = &self.addr;
            self.logging.when_enabled(|logger| {
                logger.log(::logging::TimelyEvent::Progress(::logging::ProgressEvent {
                    is_send: true,
                    source: sender,
                    comm_channel,
                    seq_no,
                    addr: addr.clone(),
                    messages: Vec::new(),
                    internal: Vec::new(),
                }))
            });

            // Every pusher gets its own copy of both update lists.
            for pusher in self.pushers.iter_mut() {
                let mut outgoing = Some((
                    sender,
                    seq_no,
                    messages.clone().into_inner(),
                    internal.clone().into_inner(),
                ));
                pusher.push(&mut outgoing);
            }

            self.counter += 1;
            messages.clear();
            internal.clear();
        }

        // Drain everything that is currently available on the puller.
        while let Some((ref sender, ref seq_no, ref recv_messages, ref recv_internal)) = *self.puller.pull() {
            // Borrow individual fields: `self.puller` is still borrowed by the pattern above,
            // so the logging closure must not capture `self` as a whole.
            let comm_channel = self.comm_channel;
            let addr = &self.addr;
            self.logging.when_enabled(|logger| {
                logger.log(::logging::TimelyEvent::Progress(::logging::ProgressEvent {
                    is_send: false,
                    source: *sender,
                    seq_no: *seq_no,
                    comm_channel,
                    addr: addr.clone(),
                    messages: Vec::new(),
                    internal: Vec::new(),
                }))
            });

            // The received lists are only borrowed, so each entry is cloned before being applied.
            for (update, delta) in recv_messages.iter().cloned() {
                messages.update(update, delta);
            }
            for (update, delta) in recv_internal.iter().cloned() {
                internal.update(update, delta);
            }
        }
    }
}