ddshow_types/timely_logging/
mod.rs

1mod timely_event;
2
3pub use timely_event::{ArchivedTimelyEvent, DistinguishingId, TimelyEvent, TimelyEventResolver};
4
5use crate::{
6    ids::{ChannelId, OperatorId, PortId},
7    OperatorAddr,
8};
9use core::{fmt::Debug, time::Duration};
10use timely::logging::{
11    ApplicationEvent as TimelyApplicationEvent, ChannelsEvent as TimelyChannelsEvent,
12    CommChannelKind as TimelyCommChannelKind, CommChannelsEvent as TimelyCommChannelsEvent,
13    GuardedMessageEvent as TimelyGuardedMessageEvent,
14    GuardedProgressEvent as TimelyGuardedProgressEvent, InputEvent as TimelyInputEvent,
15    MessagesEvent as TimelyMessagesEvent, OperatesEvent as TimelyOperatesEvent,
16    ParkEvent as TimelyParkEvent, PushProgressEvent as TimelyPushProgressEvent,
17    ScheduleEvent as TimelyScheduleEvent, ShutdownEvent as TimelyShutdownEvent,
18    StartStop as TimelyStartStop,
19};
20
21#[cfg(feature = "rkyv")]
22use rkyv_dep::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
23
24#[cfg(feature = "serde")]
25use serde_dep::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
26
27#[cfg(feature = "enable_abomonation")]
28use abomonation_derive::Abomonation;
29
30#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
31#[cfg_attr(
32    feature = "serde",
33    derive(SerdeSerialize, SerdeDeserialize),
34    serde(crate = "serde_dep")
35)]
36#[cfg_attr(
37    feature = "rkyv",
38    derive(Archive, RkyvSerialize, RkyvDeserialize),
39    archive(crate = "rkyv_dep"),
40    archive_attr(derive(bytecheck::CheckBytes))
41)]
42#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
43pub struct OperatesEvent {
44    pub id: OperatorId,
45    pub addr: OperatorAddr,
46    pub name: String,
47}
48
49impl OperatesEvent {
50    #[inline]
51    pub const fn new(id: OperatorId, addr: OperatorAddr, name: String) -> Self {
52        Self { id, addr, name }
53    }
54}
55
56impl From<TimelyOperatesEvent> for OperatesEvent {
57    #[inline]
58    fn from(event: TimelyOperatesEvent) -> Self {
59        Self {
60            id: OperatorId::new(event.id),
61            addr: OperatorAddr::from(event.addr),
62            name: event.name,
63        }
64    }
65}
66
67/// The creation of a channel between two operators
68#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
69#[cfg_attr(
70    feature = "serde",
71    derive(SerdeSerialize, SerdeDeserialize),
72    serde(crate = "serde_dep")
73)]
74#[cfg_attr(
75    feature = "rkyv",
76    derive(Archive, RkyvSerialize, RkyvDeserialize),
77    archive(crate = "rkyv_dep"),
78    archive_attr(derive(bytecheck::CheckBytes))
79)]
80#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
81pub struct ChannelsEvent {
82    /// The id of the channel
83    pub id: ChannelId,
84    /// The address of the enclosing scope the channel is in
85    pub scope_addr: OperatorAddr,
86    /// The operator index and output port of the channel's source
87    // TODO: Make this a named struct
88    pub source: [PortId; 2],
89    /// The operator index and input port of the channel's target
90    // TODO: Make this a named struct
91    pub target: [PortId; 2],
92}
93
94impl ChannelsEvent {
95    /// Create a new [`ChannelsEvent`]
96    #[inline]
97    pub const fn new(
98        id: ChannelId,
99        scope_addr: OperatorAddr,
100        source: (PortId, PortId),
101        target: (PortId, PortId),
102    ) -> Self {
103        Self {
104            id,
105            scope_addr,
106            source: [source.0, source.1],
107            target: [target.0, target.1],
108        }
109    }
110}
111
112impl From<TimelyChannelsEvent> for ChannelsEvent {
113    #[inline]
114    fn from(event: TimelyChannelsEvent) -> Self {
115        Self {
116            id: ChannelId::new(event.id),
117            scope_addr: OperatorAddr::from(event.scope_addr),
118            source: [PortId::new(event.source.0), PortId::new(event.source.1)],
119            target: [PortId::new(event.target.0), PortId::new(event.target.1)],
120        }
121    }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
125#[cfg_attr(
126    feature = "serde",
127    derive(SerdeSerialize, SerdeDeserialize),
128    serde(crate = "serde_dep")
129)]
130#[cfg_attr(
131    feature = "rkyv",
132    derive(Archive, RkyvSerialize, RkyvDeserialize),
133    archive(crate = "rkyv_dep"),
134    archive_attr(derive(bytecheck::CheckBytes))
135)]
136#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
137pub struct PushProgressEvent {
138    pub op_id: OperatorId,
139}
140
141impl From<TimelyPushProgressEvent> for PushProgressEvent {
142    #[inline]
143    fn from(event: TimelyPushProgressEvent) -> Self {
144        Self {
145            op_id: OperatorId::new(event.op_id),
146        }
147    }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
151#[cfg_attr(
152    feature = "serde",
153    derive(SerdeSerialize, SerdeDeserialize),
154    serde(crate = "serde_dep")
155)]
156#[cfg_attr(
157    feature = "rkyv",
158    derive(Archive, RkyvSerialize, RkyvDeserialize),
159    archive(crate = "rkyv_dep"),
160    archive_attr(derive(bytecheck::CheckBytes))
161)]
162#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
163pub struct MessagesEvent {
164    /// `true` if send event, `false` if receive event.
165    pub is_send: bool,
166    /// Channel identifier
167    pub channel: ChannelId,
168    /// Source worker index.
169    pub source: OperatorId,
170    /// Target worker index.
171    pub target: OperatorId,
172    /// Message sequence number.
173    pub seq_no: usize,
174    /// Number of typed records in the message.
175    pub length: usize,
176}
177
178impl From<TimelyMessagesEvent> for MessagesEvent {
179    #[inline]
180    fn from(event: TimelyMessagesEvent) -> Self {
181        Self {
182            is_send: event.is_send,
183            channel: ChannelId::new(event.channel),
184            source: OperatorId::new(event.source),
185            target: OperatorId::new(event.target),
186            seq_no: event.seq_no,
187            length: event.length,
188        }
189    }
190}
191
/// Marks whether an event is the start or the stop of some activity.
///
/// The enum carries no data, so it is `Copy`: values can be passed around
/// and reused freely without explicit clones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)]
#[cfg_attr(
    feature = "serde",
    derive(SerdeSerialize, SerdeDeserialize),
    serde(crate = "serde_dep")
)]
#[cfg_attr(
    feature = "rkyv",
    derive(Archive, RkyvSerialize, RkyvDeserialize),
    archive(crate = "rkyv_dep"),
    archive_attr(derive(bytecheck::CheckBytes))
)]
#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
pub enum StartStop {
    /// Operator starts
    Start,
    /// Operator stops
    Stop,
}

impl StartStop {
    /// Returns [`StartStop::Start`].
    #[inline]
    pub const fn start() -> Self {
        Self::Start
    }

    /// Returns [`StartStop::Stop`].
    #[inline]
    pub const fn stop() -> Self {
        Self::Stop
    }

    /// Returns `true` if the start_stop is [`StartStop::Start`].
    #[inline]
    pub const fn is_start(&self) -> bool {
        matches!(self, Self::Start)
    }

    /// Returns `true` if the start_stop is [`StartStop::Stop`].
    #[inline]
    pub const fn is_stop(&self) -> bool {
        matches!(self, Self::Stop)
    }
}
235
236impl From<TimelyStartStop> for StartStop {
237    #[inline]
238    fn from(start_stop: TimelyStartStop) -> Self {
239        match start_stop {
240            TimelyStartStop::Start => Self::Start,
241            TimelyStartStop::Stop => Self::Stop,
242        }
243    }
244}
245
246#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
247#[cfg_attr(
248    feature = "serde",
249    derive(SerdeSerialize, SerdeDeserialize),
250    serde(crate = "serde_dep")
251)]
252#[cfg_attr(
253    feature = "rkyv",
254    derive(Archive, RkyvSerialize, RkyvDeserialize),
255    archive(crate = "rkyv_dep"),
256    archive_attr(derive(bytecheck::CheckBytes))
257)]
258#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
259pub struct ScheduleEvent {
260    pub id: OperatorId,
261    pub start_stop: StartStop,
262}
263
264impl From<TimelyScheduleEvent> for ScheduleEvent {
265    #[inline]
266    fn from(event: TimelyScheduleEvent) -> Self {
267        Self {
268            id: OperatorId::new(event.id),
269            start_stop: event.start_stop.into(),
270        }
271    }
272}
273
274#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
275#[cfg_attr(
276    feature = "serde",
277    derive(SerdeSerialize, SerdeDeserialize),
278    serde(crate = "serde_dep")
279)]
280#[cfg_attr(
281    feature = "rkyv",
282    derive(Archive, RkyvSerialize, RkyvDeserialize),
283    archive(crate = "rkyv_dep"),
284    archive_attr(derive(bytecheck::CheckBytes))
285)]
286#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
287pub struct ShutdownEvent {
288    pub id: OperatorId,
289}
290
291impl From<TimelyShutdownEvent> for ShutdownEvent {
292    #[inline]
293    fn from(event: TimelyShutdownEvent) -> Self {
294        Self {
295            id: OperatorId::new(event.id),
296        }
297    }
298}
299
300#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
301#[cfg_attr(
302    feature = "serde",
303    derive(SerdeSerialize, SerdeDeserialize),
304    serde(crate = "serde_dep")
305)]
306#[cfg_attr(
307    feature = "rkyv",
308    derive(Archive, RkyvSerialize, RkyvDeserialize),
309    archive(crate = "rkyv_dep"),
310    archive_attr(derive(bytecheck::CheckBytes))
311)]
312#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
313pub struct ApplicationEvent {
314    pub id: usize,
315    // TODO: Make this a `RkyvStartStop`?
316    pub is_start: bool,
317}
318
319impl From<TimelyApplicationEvent> for ApplicationEvent {
320    #[inline]
321    fn from(event: TimelyApplicationEvent) -> Self {
322        Self {
323            id: event.id,
324            is_start: event.is_start,
325        }
326    }
327}
328
329#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
330#[cfg_attr(
331    feature = "serde",
332    derive(SerdeSerialize, SerdeDeserialize),
333    serde(crate = "serde_dep")
334)]
335#[cfg_attr(
336    feature = "rkyv",
337    derive(Archive, RkyvSerialize, RkyvDeserialize),
338    archive(crate = "rkyv_dep"),
339    archive_attr(derive(bytecheck::CheckBytes))
340)]
341#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
342pub struct GuardedMessageEvent {
343    // TODO: Make this a `RkyvStartStop`?
344    pub is_start: bool,
345}
346
347impl From<TimelyGuardedMessageEvent> for GuardedMessageEvent {
348    #[inline]
349    fn from(event: TimelyGuardedMessageEvent) -> Self {
350        Self {
351            is_start: event.is_start,
352        }
353    }
354}
355
356#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
357#[cfg_attr(
358    feature = "serde",
359    derive(SerdeSerialize, SerdeDeserialize),
360    serde(crate = "serde_dep")
361)]
362#[cfg_attr(
363    feature = "rkyv",
364    derive(Archive, RkyvSerialize, RkyvDeserialize),
365    archive(crate = "rkyv_dep"),
366    archive_attr(derive(bytecheck::CheckBytes))
367)]
368#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
369pub struct GuardedProgressEvent {
370    // TODO: Make this a `RkyvStartStop`?
371    pub is_start: bool,
372}
373
374impl From<TimelyGuardedProgressEvent> for GuardedProgressEvent {
375    #[inline]
376    fn from(event: TimelyGuardedProgressEvent) -> Self {
377        Self {
378            is_start: event.is_start,
379        }
380    }
381}
382
383#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
384#[cfg_attr(
385    feature = "serde",
386    derive(SerdeSerialize, SerdeDeserialize),
387    serde(crate = "serde_dep")
388)]
389#[cfg_attr(
390    feature = "rkyv",
391    derive(Archive, RkyvSerialize, RkyvDeserialize),
392    archive(crate = "rkyv_dep"),
393    archive_attr(derive(bytecheck::CheckBytes))
394)]
395#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
396pub struct CommChannelsEvent {
397    pub identifier: usize,
398    pub kind: CommChannelKind,
399}
400
401impl From<TimelyCommChannelsEvent> for CommChannelsEvent {
402    #[inline]
403    fn from(event: TimelyCommChannelsEvent) -> Self {
404        Self {
405            identifier: event.identifier,
406            kind: event.kind.into(),
407        }
408    }
409}
410
/// The kind of a communication channel: progress or data.
///
/// The enum carries no data, so it is `Copy`: values can be passed around
/// and reused freely without explicit clones.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)]
#[cfg_attr(
    feature = "serde",
    derive(SerdeSerialize, SerdeDeserialize),
    serde(crate = "serde_dep")
)]
#[cfg_attr(
    feature = "rkyv",
    derive(Archive, RkyvSerialize, RkyvDeserialize),
    archive(crate = "rkyv_dep"),
    archive_attr(derive(bytecheck::CheckBytes))
)]
#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
pub enum CommChannelKind {
    /// A progress channel
    Progress,
    /// A data channel
    Data,
}

impl CommChannelKind {
    /// Returns `true` if the comm_channel_kind is [`CommChannelKind::Progress`].
    #[inline]
    pub const fn is_progress(&self) -> bool {
        matches!(self, Self::Progress)
    }

    /// Returns `true` if the comm_channel_kind is [`CommChannelKind::Data`].
    #[inline]
    pub const fn is_data(&self) -> bool {
        matches!(self, Self::Data)
    }
}
442
443impl From<TimelyCommChannelKind> for CommChannelKind {
444    #[inline]
445    fn from(channel_kind: TimelyCommChannelKind) -> Self {
446        match channel_kind {
447            TimelyCommChannelKind::Progress => Self::Progress,
448            TimelyCommChannelKind::Data => Self::Data,
449        }
450    }
451}
452
453#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
454#[cfg_attr(
455    feature = "serde",
456    derive(SerdeSerialize, SerdeDeserialize),
457    serde(crate = "serde_dep")
458)]
459#[cfg_attr(
460    feature = "rkyv",
461    derive(Archive, RkyvSerialize, RkyvDeserialize),
462    archive(crate = "rkyv_dep"),
463    archive_attr(derive(bytecheck::CheckBytes))
464)]
465#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
466pub struct InputEvent {
467    pub start_stop: StartStop,
468}
469
470impl InputEvent {
471    #[inline]
472    pub const fn new(start_stop: StartStop) -> Self {
473        Self { start_stop }
474    }
475}
476
477impl From<TimelyInputEvent> for InputEvent {
478    #[inline]
479    fn from(event: TimelyInputEvent) -> Self {
480        Self {
481            start_stop: event.start_stop.into(),
482        }
483    }
484}
485
/// A park or unpark event, obtainable from timely's `ParkEvent` via [`From`].
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
#[cfg_attr(
    feature = "serde",
    derive(SerdeSerialize, SerdeDeserialize),
    serde(crate = "serde_dep")
)]
#[cfg_attr(
    feature = "rkyv",
    derive(Archive, RkyvSerialize, RkyvDeserialize),
    archive(crate = "rkyv_dep"),
    archive_attr(derive(bytecheck::CheckBytes))
)]
#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
pub enum ParkEvent {
    /// A park, optionally carrying the maximum duration it will last for
    Park(Option<Duration>),
    /// An unpark
    Unpark,
}

impl ParkEvent {
    /// Returns `true` if the park_event is [`ParkEvent::Park`].
    #[inline]
    pub const fn is_park(&self) -> bool {
        match self {
            Self::Park(_) => true,
            Self::Unpark => false,
        }
    }

    /// Returns `true` if the park_event is [`ParkEvent::Unpark`].
    #[inline]
    pub const fn is_unpark(&self) -> bool {
        match self {
            Self::Unpark => true,
            Self::Park(_) => false,
        }
    }

    /// Returns the maximum duration the park event will last for
    ///
    /// Yields `None` for [`ParkEvent::Unpark`]; for [`ParkEvent::Park`] the
    /// inner option is returned by reference, whether or not it holds a
    /// duration.
    #[inline]
    pub const fn as_park(&self) -> Option<&Option<Duration>> {
        match self {
            Self::Park(timeout) => Some(timeout),
            Self::Unpark => None,
        }
    }
}
527
528impl From<TimelyParkEvent> for ParkEvent {
529    #[inline]
530    fn from(park: TimelyParkEvent) -> Self {
531        match park {
532            TimelyParkEvent::Park(duration) => Self::Park(duration),
533            TimelyParkEvent::Unpark => Self::Unpark,
534        }
535    }
536}