use std::rc::Rc;
use crate::progress::{ChangeBatch, Timestamp};
use crate::progress::{Location, Port};
use crate::communication::{Push, Pull};
use crate::logging::TimelyLogger as Logger;
use crate::logging::TimelyProgressLogger as ProgressLogger;
use crate::Bincode;
pub type ProgressMsg<T> = Bincode<(usize, usize, ChangeBatch<(Location, T)>)>;
pub struct Progcaster<T:Timestamp> {
pusher: Box<dyn Push<ProgressMsg<T>>>,
puller: Box<dyn Pull<ProgressMsg<T>>>,
source: usize,
counter: usize,
identifier: usize,
channel_identifier: usize,
progress_logging: Option<ProgressLogger<T>>,
}
impl<T:Timestamp+Send> Progcaster<T> {
pub fn new(worker: &crate::worker::Worker, addr: Rc<[usize]>, identifier: usize, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger<T>>) -> Progcaster<T> {
let channel_identifier = worker.new_identifier();
let (pusher, puller) = worker.broadcast(channel_identifier, addr);
logging.as_mut().map(|l| l.log(crate::logging::CommChannelsEvent {
identifier: channel_identifier,
kind: crate::logging::CommChannelKind::Progress,
}));
let worker_index = worker.index();
Progcaster {
pusher,
puller,
source: worker_index,
counter: 0,
identifier,
channel_identifier,
progress_logging,
}
}
pub fn send(&mut self, changes: &mut ChangeBatch<(Location, T)>) {
changes.compact();
if !changes.is_empty() {
self.progress_logging.as_ref().map(|l| {
let mut messages = Vec::with_capacity(changes.len());
let mut internal = Vec::with_capacity(changes.len());
for ((location, time), diff) in changes.iter() {
match location.port {
Port::Target(port) => {
messages.push((location.node, port, time.clone(), *diff))
},
Port::Source(port) => {
internal.push((location.node, port, time.clone(), *diff))
}
}
}
l.log(crate::logging::TimelyProgressEvent {
is_send: true,
source: self.source,
channel: self.channel_identifier,
seq_no: self.counter,
identifier: self.identifier,
messages,
internal,
});
});
let payload = (self.source, self.counter, std::mem::take(changes));
let mut to_push = Some(Bincode { payload });
self.pusher.push(&mut to_push);
self.pusher.done();
if let Some(pushed) = to_push {
*changes = pushed.payload.2;
changes.clear();
}
self.counter += 1;
}
}
pub fn recv(&mut self, changes: &mut ChangeBatch<(Location, T)>) {
while let Some(message) = self.puller.pull() {
let source = message.0;
let counter = message.1;
let recv_changes = &mut message.2;
let channel = self.channel_identifier;
self.progress_logging.as_ref().map(|l| {
let mut messages = Vec::with_capacity(recv_changes.len());
let mut internal = Vec::with_capacity(recv_changes.len());
for ((location, time), diff) in recv_changes.iter() {
match location.port {
Port::Target(port) => {
messages.push((location.node, port, time.clone(), *diff))
},
Port::Source(port) => {
internal.push((location.node, port, time.clone(), *diff))
}
}
}
l.log(crate::logging::TimelyProgressEvent {
is_send: false,
source,
seq_no: counter,
channel,
identifier: self.identifier,
messages,
internal,
});
});
changes.extend(recv_changes.iter().map(|(u,d)| (u.clone(), *d)));
}
}
}