pub type WorkerIdentifier = usize;
pub type Logger<Event> = crate::logging_core::Logger<Event, WorkerIdentifier>;
pub type TimelyLogger = Logger<TimelyEvent>;
pub type TimelyProgressLogger = Logger<TimelyProgressEvent>;
use std::time::Duration;
use crate::dataflow::operators::capture::{Event, EventPusher};
pub struct BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
time: Duration,
event_pusher: P,
_phantom: ::std::marker::PhantomData<(E, T)>,
}
impl<T, E, P> BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
pub fn new(event_pusher: P) -> Self {
BatchLogger {
time: Default::default(),
event_pusher,
_phantom: ::std::marker::PhantomData,
}
}
pub fn publish_batch(&mut self, time: &Duration, data: &mut Vec<(Duration, E, T)>) {
if !data.is_empty() {
self.event_pusher.push(Event::Messages(self.time, data.drain(..).collect()));
}
if &self.time < time {
let new_frontier = time.clone();
let old_frontier = self.time.clone();
self.event_pusher.push(Event::Progress(vec![(new_frontier, 1), (old_frontier, -1)]));
}
self.time = time.clone();
}
}
impl<T, E, P> Drop for BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
fn drop(&mut self) {
self.event_pusher.push(Event::Progress(vec![(self.time, -1)]));
}
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct OperatesEvent {
pub id: usize,
pub addr: Vec<usize>,
pub name: String,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct ChannelsEvent {
pub id: usize,
pub scope_addr: Vec<usize>,
pub source: (usize, usize),
pub target: (usize, usize),
}
pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any {
fn as_any(&self) -> &dyn std::any::Any;
fn type_name(&self) -> &'static str;
}
impl<T: crate::Data + std::fmt::Debug + std::any::Any> ProgressEventTimestamp for T {
fn as_any(&self) -> &dyn std::any::Any { self }
fn type_name(&self) -> &'static str { std::any::type_name::<T>() }
}
pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any {
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a>;
}
impl<T: ProgressEventTimestamp> ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> {
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a> {
Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| {
let t: &dyn ProgressEventTimestamp = t;
(n, p, t, d)
}))
}
}
#[derive(Debug)]
pub struct TimelyProgressEvent {
pub is_send: bool,
pub source: usize,
pub channel: usize,
pub seq_no: usize,
pub addr: Vec<usize>,
pub messages: Box<dyn ProgressEventTimestampVec>,
pub internal: Box<dyn ProgressEventTimestampVec>,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct PushProgressEvent {
pub op_id: usize,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct MessagesEvent {
pub is_send: bool,
pub channel: usize,
pub source: usize,
pub target: usize,
pub seq_no: usize,
pub length: usize,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
pub enum StartStop {
Start,
Stop,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct ScheduleEvent {
pub id: usize,
pub start_stop: StartStop,
}
impl ScheduleEvent {
pub fn start(id: usize) -> Self { ScheduleEvent { id, start_stop: StartStop::Start } }
pub fn stop(id: usize) -> Self { ScheduleEvent { id, start_stop: StartStop::Stop } }
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct ShutdownEvent {
pub id: usize,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct ApplicationEvent {
pub id: usize,
pub is_start: bool,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct GuardedMessageEvent {
pub is_start: bool,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct GuardedProgressEvent {
pub is_start: bool,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct TimelySetup {
pub index: usize,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub enum CommChannelKind {
Progress,
Data,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct CommChannelsEvent {
pub identifier: usize,
pub kind: CommChannelKind,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub struct InputEvent {
pub start_stop: StartStop,
}
#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
pub enum ParkEvent {
Park(Option<Duration>),
Unpark,
}
impl ParkEvent {
pub fn park(duration: Option<Duration>) -> Self { ParkEvent::Park(duration) }
pub fn unpark() -> Self { ParkEvent::Unpark }
}
#[derive(Serialize, Deserialize, Debug, Clone, Abomonation, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub enum TimelyEvent {
Operates(OperatesEvent),
Channels(ChannelsEvent),
PushProgress(PushProgressEvent),
Messages(MessagesEvent),
Schedule(ScheduleEvent),
Shutdown(ShutdownEvent),
Application(ApplicationEvent),
GuardedMessage(GuardedMessageEvent),
GuardedProgress(GuardedProgressEvent),
CommChannels(CommChannelsEvent),
Input(InputEvent),
Park(ParkEvent),
Text(String),
}
impl From<OperatesEvent> for TimelyEvent {
fn from(v: OperatesEvent) -> TimelyEvent { TimelyEvent::Operates(v) }
}
impl From<ChannelsEvent> for TimelyEvent {
fn from(v: ChannelsEvent) -> TimelyEvent { TimelyEvent::Channels(v) }
}
impl From<PushProgressEvent> for TimelyEvent {
fn from(v: PushProgressEvent) -> TimelyEvent { TimelyEvent::PushProgress(v) }
}
impl From<MessagesEvent> for TimelyEvent {
fn from(v: MessagesEvent) -> TimelyEvent { TimelyEvent::Messages(v) }
}
impl From<ScheduleEvent> for TimelyEvent {
fn from(v: ScheduleEvent) -> TimelyEvent { TimelyEvent::Schedule(v) }
}
impl From<ShutdownEvent> for TimelyEvent {
fn from(v: ShutdownEvent) -> TimelyEvent { TimelyEvent::Shutdown(v) }
}
impl From<ApplicationEvent> for TimelyEvent {
fn from(v: ApplicationEvent) -> TimelyEvent { TimelyEvent::Application(v) }
}
impl From<GuardedMessageEvent> for TimelyEvent {
fn from(v: GuardedMessageEvent) -> TimelyEvent { TimelyEvent::GuardedMessage(v) }
}
impl From<GuardedProgressEvent> for TimelyEvent {
fn from(v: GuardedProgressEvent) -> TimelyEvent { TimelyEvent::GuardedProgress(v) }
}
impl From<CommChannelsEvent> for TimelyEvent {
fn from(v: CommChannelsEvent) -> TimelyEvent { TimelyEvent::CommChannels(v) }
}
impl From<InputEvent> for TimelyEvent {
fn from(v: InputEvent) -> TimelyEvent { TimelyEvent::Input(v) }
}
impl From<ParkEvent> for TimelyEvent {
fn from(v: ParkEvent) -> TimelyEvent { TimelyEvent::Park(v) }
}