use std::rc::Rc;
use std::cell::RefCell;
use std::sync::Arc;
use ::progress::timestamp::RootTimestamp;
use ::progress::nested::product::Product;
use ::progress::frontier::MutableAntichain;
use dataflow::operators::capture::{Event, EventPusher};
use timely_communication::logging::{BufferingLogger, LoggerBatch, CommsEvent, CommsSetup};
/// A logged timely record: a `u64` (the component `BatchLogger::publish_batch` reads as the
/// timestamp), a `TimelySetup`, and a `TimelyEvent`.
type LogMessage = (u64, TimelySetup, TimelyEvent);
/// A logged communication record with the same layout as `LogMessage`, but carrying a
/// `CommsSetup` and a `CommsEvent`.
type CommsMessage = (u64, CommsSetup, CommsEvent);
/// Reference-counted handle to the buffering logger for timely events.
pub type Logger = Rc<BufferingLogger<TimelySetup, TimelyEvent>>;
/// Returns the timely logger produced by `BufferingLogger::new_inactive`.
///
/// NOTE(review): presumably such a logger discards every event it is handed; confirm against
/// `timely_communication::logging::BufferingLogger`.
pub fn new_inactive_logger() -> Logger {
    let inactive: Logger = BufferingLogger::new_inactive();
    inactive
}
/// Factories that build the loggers for timely events and for communication events.
///
/// Each field is a shareable (`Arc`, `Send + Sync`) closure that takes a setup value and returns
/// a reference-counted `BufferingLogger` for the matching setup/event types.
pub struct LoggerConfig {
/// Builds the logger for `TimelyEvent`s from a `TimelySetup`.
pub timely_logging: Arc<Fn(TimelySetup)->Rc<BufferingLogger<TimelySetup, TimelyEvent>>+Send+Sync>,
/// Builds the logger for `CommsEvent`s from a `CommsSetup`.
pub communication_logging: Arc<Fn(CommsSetup)->Rc<BufferingLogger<CommsSetup, CommsEvent>>+Send+Sync>,
}
impl LoggerConfig {
    /// Builds a configuration whose loggers forward their batches to caller-supplied pushers.
    ///
    /// `timely_subscription` and `communication_subscription` are called once per invocation of
    /// the corresponding factory closure: each receives the setup value and returns the
    /// `EventPusher` that will receive that logger's output. The pusher is wrapped in a
    /// `BatchLogger`, and the `BufferingLogger` that is returned hands every batch to
    /// `BatchLogger::publish_batch`.
    pub fn new<P1: 'static, P2: 'static, F1: 'static, F2: 'static>(
        timely_subscription: F1,
        communication_subscription: F2,
    ) -> Self
    where
        P1: EventPusher<Product<RootTimestamp, u64>, LogMessage> + Send,
        P2: EventPusher<Product<RootTimestamp, u64>, CommsMessage> + Send,
        F1: Fn(TimelySetup)->P1+Send+Sync,
        F2: Fn(CommsSetup)->P2+Send+Sync,
    {
        LoggerConfig {
            timely_logging: Arc::new(move |setup: TimelySetup| {
                let pusher = timely_subscription(setup);
                // `RefCell` gives the flush callback mutable access to the batch logger.
                let batcher = RefCell::new(BatchLogger::new(pusher));
                Rc::new(BufferingLogger::new(
                    setup,
                    Box::new(move |batch| batcher.borrow_mut().publish_batch(batch)),
                ))
            }),
            communication_logging: Arc::new(move |setup: CommsSetup| {
                let pusher = communication_subscription(setup);
                let batcher = RefCell::new(BatchLogger::new(pusher));
                Rc::new(BufferingLogger::new(
                    setup,
                    Box::new(move |batch| batcher.borrow_mut().publish_batch(batch)),
                ))
            }),
        }
    }
}
impl Default for LoggerConfig {
    /// The default configuration: both factories ignore their setup argument and return the
    /// logger produced by `BufferingLogger::new_inactive`.
    fn default() -> Self {
        Self {
            timely_logging: Arc::new(|_| BufferingLogger::new_inactive()),
            communication_logging: Arc::new(|_| BufferingLogger::new_inactive()),
        }
    }
}
/// Adapter that turns logger batches into `Event`s pushed at an `EventPusher`.
///
/// `S` and `E` are the setup and event types of the `(u64, S, E)` records being forwarded.
struct BatchLogger<S, E, P> where P: EventPusher<Product<RootTimestamp, u64>, (u64, S, E)> {
// Timestamp currently held open towards the pusher; `None` once `LoggerBatch::End` was seen.
frontier: Option<Product<RootTimestamp, u64>>,
// Destination for the generated `Event::Messages` / `Event::Progress` events.
event_pusher: P,
// Markers only: `S` and `E` appear in the bound on `P` but are not stored.
_s: ::std::marker::PhantomData<S>,
_e: ::std::marker::PhantomData<E>,
}
impl<S, E, P> BatchLogger<S, E, P> where P: EventPusher<Product<RootTimestamp, u64>, (u64, S, E)> {
    /// Wraps `event_pusher`; the frontier starts out open at the default timestamp.
    fn new(event_pusher: P) -> Self {
        let initial_frontier: Product<RootTimestamp, u64> = Default::default();
        Self {
            frontier: Some(initial_frontier),
            event_pusher: event_pusher,
            _s: ::std::marker::PhantomData,
            _e: ::std::marker::PhantomData,
        }
    }
}
impl<S: Clone, E: Clone, P> BatchLogger<S, E, P> where P: EventPusher<Product<RootTimestamp, u64>, (u64, S, E)> {
    /// Forwards one batch from the buffering logger to the event pusher.
    ///
    /// * `LoggerBatch::Logs(evs)`: the events are pushed as a single `Event::Messages` at the
    ///   current frontier; the frontier is then moved to the timestamp (first tuple component)
    ///   of the last event in the batch through an `Event::Progress` update. An empty batch is
    ///   ignored, since it carries neither events nor a timestamp to advance to.
    /// * `LoggerBatch::End`: the current frontier is retracted and the logger is closed.
    ///
    /// Once closed (`frontier` is `None`) every later batch is silently dropped.
    pub fn publish_batch(&mut self, logger_batch: LoggerBatch<S, E>) -> () {
        match logger_batch {
            LoggerBatch::Logs(evs) => {
                // Copy the last timestamp out before `evs` is handed to the message event.
                // `None` means the batch is empty; the original `unwrap()` panicked here.
                let last_ts = evs.last().map(|&(ts, _, _)| ts);
                if let (Some(frontier), Some(last_ts)) = (self.frontier, last_ts) {
                    self.event_pusher.push(Event::Messages(frontier, evs));
                    let new_frontier = RootTimestamp::new(last_ts);
                    // Acquire the new timestamp and release the old one in a single update.
                    self.event_pusher.push(Event::Progress(vec![(new_frontier, 1), (frontier, -1)]));
                    self.frontier = Some(new_frontier);
                }
            },
            LoggerBatch::End => {
                if let Some(frontier) = self.frontier {
                    self.event_pusher.push(Event::Progress(vec![(frontier, -1)]));
                    self.frontier = None;
                }
            },
        }
    }
}
/// An `EventPusher` that forwards a clone of every event to each subscribed listener, and
/// tracks the frontier implied by the progress events it has seen.
pub struct EventPusherTee<T: ::order::PartialOrder+Ord+Default+Clone+'static, D: Clone> {
// Accumulates every `Event::Progress` update pushed through the tee.
frontier: MutableAntichain<T>,
// Subscribers, in subscription order.
listeners: Vec<Box<EventPusher<T, D>+Send>>,
}
impl<T: ::order::PartialOrder+Ord+Default+Clone+'static, D: Clone> EventPusherTee<T, D> {
pub fn new() -> Self {
Self {
frontier: Default::default(),
listeners: Vec::new(),
}
}
pub fn subscribe(&mut self, mut listener: Box<EventPusher<T, D>+Send>) {
let mut changes = vec![(Default::default(), -1)];
changes.extend(self.frontier.frontier().iter().map(|x| (x.clone(), 1)));
listener.push(Event::Progress(changes));
self.listeners.push(listener);
}
}
impl<T: ::order::PartialOrder+Ord+Default+Clone, D: Clone> EventPusher<T, D> for EventPusherTee<T, D> {
    /// Records progress updates in the tee's own frontier, then hands a clone of `event` to
    /// every listener in subscription order.
    fn push(&mut self, event: Event<T, D>) {
        // Only progress events affect the frontier; all events are still forwarded below.
        if let Event::Progress(ref updates) = event {
            self.frontier.update_iter(updates.iter().cloned());
        }
        for sink in &mut self.listeners {
            sink.push(event.clone());
        }
    }
}
/// Payload of `TimelyEvent::Operates`.
#[derive(Abomonation, Debug, Clone)]
pub struct OperatesEvent {
/// Identifier (presumably of the operator being described; confirm at the logging call site).
pub id: usize,
/// Address as a sequence of indices (presumably the path through nested scopes; confirm).
pub addr: Vec<usize>,
/// Name (presumably the operator's human-readable name; confirm).
pub name: String,
}
/// Payload of `TimelyEvent::Channels`.
#[derive(Abomonation, Debug, Clone)]
pub struct ChannelsEvent {
/// Identifier (presumably of the channel; confirm at the logging call site).
pub id: usize,
/// Address of the containing scope as a sequence of indices (TODO confirm exact meaning).
pub scope_addr: Vec<usize>,
/// Source endpoint as a pair of indices (presumably (operator, output port); confirm).
pub source: (usize, usize),
/// Target endpoint as a pair of indices (presumably (operator, input port); confirm).
pub target: (usize, usize),
}
/// Payload of `TimelyEvent::Progress`.
#[derive(Abomonation, Debug, Clone)]
pub struct ProgressEvent {
/// Direction flag (presumably `true` for a send and `false` for a receive; confirm).
pub is_send: bool,
/// Source index (presumably the sending worker; confirm).
pub source: usize,
/// Communication channel, if any (TODO confirm when this is `None`).
pub comm_channel: Option<usize>,
/// Sequence number of the message.
pub seq_no: usize,
/// Address as a sequence of indices (presumably of the scope exchanging progress; confirm).
pub addr: Vec<usize>,
/// Message-related updates as `(usize, usize, String, i64)` tuples
/// (NOTE(review): tuple component meanings are not visible in this file).
pub messages: Vec<(usize, usize, String, i64)>,
/// Internal updates, in the same tuple layout as `messages`.
pub internal: Vec<(usize, usize, String, i64)>,
}
/// Payload of `TimelyEvent::PushProgress`.
#[derive(Abomonation, Debug, Clone)]
pub struct PushProgressEvent {
/// Operator identifier (presumably the operator whose progress is being pushed; confirm).
pub op_id: usize,
}
/// Payload of `TimelyEvent::Messages`.
#[derive(Abomonation, Debug, Clone)]
pub struct MessagesEvent {
/// Direction flag (presumably `true` for a send and `false` for a receive; confirm).
pub is_send: bool,
/// Channel identifier.
pub channel: usize,
/// Communication channel, if any (TODO confirm when this is `None`).
pub comm_channel: Option<usize>,
/// Source index (presumably the sending worker; confirm).
pub source: usize,
/// Target index (presumably the receiving worker; confirm).
pub target: usize,
/// Sequence number of the message.
pub seq_no: usize,
/// Length (presumably the number of records in the message; confirm).
pub length: usize,
}
/// Marks whether an event records the start or the stop of something; used by `ScheduleEvent`
/// and `InputEvent`.
#[derive(Abomonation, Debug, Clone, PartialEq, Eq)]
pub enum StartStop {
/// The start of the recorded span.
Start,
/// The end of the recorded span.
Stop {
/// Activity flag (presumably whether any work was performed during the span; confirm).
activity: bool
},
}
/// Payload of `TimelyEvent::Schedule`.
#[derive(Abomonation, Debug, Clone)]
pub struct ScheduleEvent {
/// Identifier (presumably of the scheduled operator; confirm at the logging call site).
pub id: usize,
/// Whether this event marks the start or the stop.
pub start_stop: StartStop,
}
/// Payload of `TimelyEvent::Application`.
#[derive(Abomonation, Debug, Clone)]
pub struct ApplicationEvent {
/// Identifier (presumably application-defined; confirm).
pub id: usize,
/// `true` when the event marks a start (presumably `false` marks the matching end; confirm).
pub is_start: bool,
}
/// Payload of `TimelyEvent::GuardedMessage`.
#[derive(Abomonation, Debug, Clone)]
pub struct GuardedMessageEvent {
/// `true` when the event marks a start (presumably `false` marks the matching end; confirm).
pub is_start: bool,
}
/// Payload of `TimelyEvent::GuardedProgress`.
#[derive(Abomonation, Debug, Clone)]
pub struct GuardedProgressEvent {
/// `true` when the event marks a start (presumably `false` marks the matching end; confirm).
pub is_start: bool,
}
/// Setup value passed to the timely logging factory (`LoggerConfig::timely_logging`) and
/// attached to every logged timely record.
#[derive(Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct TimelySetup {
/// Index (presumably of the worker doing the logging; confirm).
pub index: usize,
}
/// Kind of a communication channel, as recorded in `CommChannelsEvent`.
#[derive(Abomonation, Debug, Clone)]
pub enum CommChannelKind {
/// Progress channel (presumably carries progress updates; confirm).
Progress,
/// Data channel (presumably carries dataflow records; confirm).
Data,
}
/// Payload of `TimelyEvent::CommChannels`.
#[derive(Abomonation, Debug, Clone)]
pub struct CommChannelsEvent {
/// Communication channel, if any (TODO confirm when this is `None`).
pub comm_channel: Option<usize>,
/// Kind of the channel.
pub comm_channel_kind: CommChannelKind,
}
/// Payload of `TimelyEvent::Input`.
#[derive(Abomonation, Debug, Clone)]
pub struct InputEvent {
/// Whether this event marks the start or the stop.
pub start_stop: StartStop,
}
/// The union of all timely events that can be logged; each variant wraps its payload struct,
/// and each payload converts into this enum through a `From` impl.
#[derive(Debug, Clone, Abomonation)]
#[allow(missing_docs)]
pub enum TimelyEvent {
/// Wraps an `OperatesEvent`.
Operates(OperatesEvent),
/// Wraps a `ChannelsEvent`.
Channels(ChannelsEvent),
/// Wraps a `ProgressEvent`.
Progress(ProgressEvent),
/// Wraps a `PushProgressEvent`.
PushProgress(PushProgressEvent),
/// Wraps a `MessagesEvent`.
Messages(MessagesEvent),
/// Wraps a `ScheduleEvent`.
Schedule(ScheduleEvent),
/// Wraps an `ApplicationEvent`.
Application(ApplicationEvent),
/// Wraps a `GuardedMessageEvent`.
GuardedMessage(GuardedMessageEvent),
/// Wraps a `GuardedProgressEvent`.
GuardedProgress(GuardedProgressEvent),
/// Wraps a `CommChannelsEvent`.
CommChannels(CommChannelsEvent),
/// Wraps an `InputEvent`.
Input(InputEvent),
}
impl From<OperatesEvent> for TimelyEvent {
    /// Wraps the payload in `TimelyEvent::Operates`.
    fn from(event: OperatesEvent) -> Self {
        TimelyEvent::Operates(event)
    }
}
impl From<ChannelsEvent> for TimelyEvent {
    /// Wraps the payload in `TimelyEvent::Channels`.
    fn from(event: ChannelsEvent) -> Self {
        TimelyEvent::Channels(event)
    }
}
impl From<ProgressEvent> for TimelyEvent {
    /// Wraps the payload in `TimelyEvent::Progress`.
    fn from(event: ProgressEvent) -> Self {
        TimelyEvent::Progress(event)
    }
}
impl From<PushProgressEvent> for TimelyEvent {
    /// Wraps the payload in `TimelyEvent::PushProgress`.
    fn from(event: PushProgressEvent) -> Self {
        TimelyEvent::PushProgress(event)
    }
}
impl From<MessagesEvent> for TimelyEvent {
    /// Wraps the payload in `TimelyEvent::Messages`.
    fn from(event: MessagesEvent) -> Self {
        TimelyEvent::Messages(event)
    }
}
impl From<ScheduleEvent> for TimelyEvent {
    /// Wraps the payload in `TimelyEvent::Schedule`.
    fn from(event: ScheduleEvent) -> Self {
        TimelyEvent::Schedule(event)
    }
}
impl From<ApplicationEvent> for TimelyEvent {
    /// Wraps the payload in `TimelyEvent::Application`.
    fn from(event: ApplicationEvent) -> Self {
        TimelyEvent::Application(event)
    }
}
impl From<GuardedMessageEvent> for TimelyEvent {
    /// Wraps the payload in `TimelyEvent::GuardedMessage`.
    fn from(event: GuardedMessageEvent) -> Self {
        TimelyEvent::GuardedMessage(event)
    }
}
impl From<GuardedProgressEvent> for TimelyEvent {
    /// Wraps the payload in `TimelyEvent::GuardedProgress`.
    fn from(event: GuardedProgressEvent) -> Self {
        TimelyEvent::GuardedProgress(event)
    }
}
impl From<CommChannelsEvent> for TimelyEvent {
    /// Wraps the payload in `TimelyEvent::CommChannels`.
    fn from(event: CommChannelsEvent) -> Self {
        TimelyEvent::CommChannels(event)
    }
}
impl From<InputEvent> for TimelyEvent {
    /// Wraps the payload in `TimelyEvent::Input`.
    fn from(event: InputEvent) -> Self {
        TimelyEvent::Input(event)
    }
}