1pub type WorkerIdentifier = usize;
5pub type Logger<Event> = crate::logging_core::Logger<Event, WorkerIdentifier>;
7pub type TimelyLogger = Logger<TimelyEvent>;
9pub type TimelyProgressLogger = Logger<TimelyProgressEvent>;
11
12use std::time::Duration;
13use crate::dataflow::operators::capture::{Event, EventPusher};
14
15pub struct BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
17 time: Duration,
19 event_pusher: P,
20 _phantom: ::std::marker::PhantomData<(E, T)>,
21}
22
23impl<T, E, P> BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
24 pub fn new(event_pusher: P) -> Self {
26 BatchLogger {
27 time: Default::default(),
28 event_pusher,
29 _phantom: ::std::marker::PhantomData,
30 }
31 }
32 pub fn publish_batch(&mut self, &time: &Duration, data: &mut Vec<(Duration, E, T)>) {
34 if !data.is_empty() {
35 self.event_pusher.push(Event::Messages(self.time, data.drain(..).collect()));
36 }
37 if self.time < time {
38 let new_frontier = time;
39 let old_frontier = self.time;
40 self.event_pusher.push(Event::Progress(vec![(new_frontier, 1), (old_frontier, -1)]));
41 }
42 self.time = time;
43 }
44}
45impl<T, E, P> Drop for BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
46 fn drop(&mut self) {
47 self.event_pusher.push(Event::Progress(vec![(self.time, -1)]));
48 }
49}
50
51#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
52pub struct OperatesEvent {
54 pub id: usize,
56 pub addr: Vec<usize>,
58 pub name: String,
60}
61
62#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
63pub struct ChannelsEvent {
65 pub id: usize,
67 pub scope_addr: Vec<usize>,
69 pub source: (usize, usize),
71 pub target: (usize, usize),
73}
74
75pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any {
77 fn as_any(&self) -> &dyn std::any::Any;
91
92 fn type_name(&self) -> &'static str;
102}
103impl<T: crate::Data + std::fmt::Debug + std::any::Any> ProgressEventTimestamp for T {
104 fn as_any(&self) -> &dyn std::any::Any { self }
105
106 fn type_name(&self) -> &'static str { std::any::type_name::<T>() }
107}
108
109pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any {
116 fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a>;
118}
119
120impl<T: ProgressEventTimestamp> ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> {
121 fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a> {
122 Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| {
123 let t: &dyn ProgressEventTimestamp = t;
124 (n, p, t, d)
125 }))
126 }
127}
128
129#[derive(Debug)]
130pub struct TimelyProgressEvent {
132 pub is_send: bool,
134 pub source: usize,
136 pub channel: usize,
138 pub seq_no: usize,
140 pub addr: Vec<usize>,
142 pub messages: Box<dyn ProgressEventTimestampVec>,
144 pub internal: Box<dyn ProgressEventTimestampVec>,
146}
147
148#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
149pub struct PushProgressEvent {
151 pub op_id: usize,
153}
154
155#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
156pub struct MessagesEvent {
158 pub is_send: bool,
160 pub channel: usize,
162 pub source: usize,
164 pub target: usize,
166 pub seq_no: usize,
168 pub length: usize,
170}
171
172#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
174pub enum StartStop {
175 Start,
177 Stop,
179}
180
181#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
182pub struct ScheduleEvent {
184 pub id: usize,
186 pub start_stop: StartStop,
190}
191
192impl ScheduleEvent {
193 pub fn start(id: usize) -> Self { ScheduleEvent { id, start_stop: StartStop::Start } }
195 pub fn stop(id: usize) -> Self { ScheduleEvent { id, start_stop: StartStop::Stop } }
197}
198
199#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
200pub struct ShutdownEvent {
202 pub id: usize,
204}
205
206#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
207pub struct ApplicationEvent {
209 pub id: usize,
211 pub is_start: bool,
213}
214
215#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
216pub struct GuardedMessageEvent {
218 pub is_start: bool,
220}
221
222#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
223pub struct GuardedProgressEvent {
225 pub is_start: bool,
227}
228
229#[derive(Serialize, Deserialize, Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)]
230pub struct TimelySetup {
232 pub index: usize,
234}
235
236#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
237pub enum CommChannelKind {
239 Progress,
241 Data,
243}
244
245#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
246pub struct CommChannelsEvent {
248 pub identifier: usize,
250 pub kind: CommChannelKind,
252}
253
254#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
255pub struct InputEvent {
257 pub start_stop: StartStop,
259}
260
261#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
263pub enum ParkEvent {
264 Park(Option<Duration>),
266 Unpark,
268}
269
270impl ParkEvent {
271 pub fn park(duration: Option<Duration>) -> Self { ParkEvent::Park(duration) }
273 pub fn unpark() -> Self { ParkEvent::Unpark }
275}
276
277#[derive(Serialize, Deserialize, Debug, Clone, Abomonation, Hash, Eq, PartialEq, Ord, PartialOrd)]
278pub enum TimelyEvent {
280 Operates(OperatesEvent),
282 Channels(ChannelsEvent),
284 PushProgress(PushProgressEvent),
286 Messages(MessagesEvent),
288 Schedule(ScheduleEvent),
290 Shutdown(ShutdownEvent),
292 Application(ApplicationEvent),
294 GuardedMessage(GuardedMessageEvent),
296 GuardedProgress(GuardedProgressEvent),
298 CommChannels(CommChannelsEvent),
300 Input(InputEvent),
302 Park(ParkEvent),
304 Text(String),
306}
307
308impl From<OperatesEvent> for TimelyEvent {
309 fn from(v: OperatesEvent) -> TimelyEvent { TimelyEvent::Operates(v) }
310}
311
312impl From<ChannelsEvent> for TimelyEvent {
313 fn from(v: ChannelsEvent) -> TimelyEvent { TimelyEvent::Channels(v) }
314}
315
316impl From<PushProgressEvent> for TimelyEvent {
317 fn from(v: PushProgressEvent) -> TimelyEvent { TimelyEvent::PushProgress(v) }
318}
319
320impl From<MessagesEvent> for TimelyEvent {
321 fn from(v: MessagesEvent) -> TimelyEvent { TimelyEvent::Messages(v) }
322}
323
324impl From<ScheduleEvent> for TimelyEvent {
325 fn from(v: ScheduleEvent) -> TimelyEvent { TimelyEvent::Schedule(v) }
326}
327
328impl From<ShutdownEvent> for TimelyEvent {
329 fn from(v: ShutdownEvent) -> TimelyEvent { TimelyEvent::Shutdown(v) }
330}
331
332impl From<ApplicationEvent> for TimelyEvent {
333 fn from(v: ApplicationEvent) -> TimelyEvent { TimelyEvent::Application(v) }
334}
335
336impl From<GuardedMessageEvent> for TimelyEvent {
337 fn from(v: GuardedMessageEvent) -> TimelyEvent { TimelyEvent::GuardedMessage(v) }
338}
339
340impl From<GuardedProgressEvent> for TimelyEvent {
341 fn from(v: GuardedProgressEvent) -> TimelyEvent { TimelyEvent::GuardedProgress(v) }
342}
343
344impl From<CommChannelsEvent> for TimelyEvent {
345 fn from(v: CommChannelsEvent) -> TimelyEvent { TimelyEvent::CommChannels(v) }
346}
347
348impl From<InputEvent> for TimelyEvent {
349 fn from(v: InputEvent) -> TimelyEvent { TimelyEvent::Input(v) }
350}
351
352impl From<ParkEvent> for TimelyEvent {
353 fn from(v: ParkEvent) -> TimelyEvent { TimelyEvent::Park(v) }
354}