ddshow_types/
progress_logging.rs

1//! Timely progress events
2
3use crate::{
4    ids::{ChannelId, PortId, WorkerId},
5    OperatorAddr,
6};
7use timely::logging::TimelyProgressEvent as RawTimelyProgressEvent;
8
9#[cfg(feature = "enable_abomonation")]
10use abomonation_derive::Abomonation;
11
12#[cfg(feature = "rkyv")]
13use rkyv_dep::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
14
15#[cfg(feature = "serde")]
16use serde_dep::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
17
18#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
19#[cfg_attr(
20    feature = "serde",
21    derive(SerdeSerialize, SerdeDeserialize),
22    serde(crate = "serde_dep")
23)]
24#[cfg_attr(
25    feature = "rkyv",
26    derive(Archive, RkyvSerialize, RkyvDeserialize),
27    archive(crate = "rkyv_dep"),
28    archive_attr(derive(bytecheck::CheckBytes))
29)]
30#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
31pub struct TimelyProgressEvent {
32    /// `true` if the event is a send, and `false` if it is a receive.
33    pub is_send: bool,
34    /// Source worker index.
35    pub worker: WorkerId,
36    /// Communication channel identifier
37    pub channel: ChannelId,
38    /// Message sequence number.
39    pub seq_no: usize,
40    /// Sequence of nested scope identifiers indicating the path from the root to this instance.
41    pub addr: OperatorAddr,
42    /// List of message updates containing Target descriptor, timestamp as string, and delta.
43    pub messages: Vec<MessageUpdate>,
44    /// List of capability updates containing Source descriptor, timestamp as string, and delta.
45    pub internal: Vec<CapabilityUpdate>,
46}
47
48impl TimelyProgressEvent {
49    pub fn new(
50        is_send: bool,
51        worker: WorkerId,
52        channel: ChannelId,
53        seq_no: usize,
54        addr: OperatorAddr,
55        messages: Vec<MessageUpdate>,
56        internal: Vec<CapabilityUpdate>,
57    ) -> Self {
58        Self {
59            is_send,
60            worker,
61            channel,
62            seq_no,
63            addr,
64            messages,
65            internal,
66        }
67    }
68}
69
70impl From<RawTimelyProgressEvent> for TimelyProgressEvent {
71    fn from(event: RawTimelyProgressEvent) -> Self {
72        let messages = event
73            .messages
74            .iter()
75            .map(|(&node, &port, time, &diff)| {
76                MessageUpdate::new(
77                    PortId::new(node),
78                    PortId::new(port),
79                    format!("{:?}", time),
80                    time.type_name().to_owned(),
81                    diff,
82                )
83            })
84            .collect();
85
86        let internal = event
87            .internal
88            .iter()
89            .map(|(&node, &port, time, &diff)| {
90                CapabilityUpdate::new(
91                    PortId::new(node),
92                    PortId::new(port),
93                    format!("{:?}", time),
94                    time.type_name().to_owned(),
95                    diff,
96                )
97            })
98            .collect();
99
100        Self {
101            is_send: event.is_send,
102            worker: WorkerId::new(event.source),
103            channel: ChannelId::new(event.channel),
104            seq_no: event.seq_no,
105            addr: OperatorAddr::from(event.addr),
106            messages,
107            internal,
108        }
109    }
110}
111
112#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
113#[cfg_attr(
114    feature = "serde",
115    derive(SerdeSerialize, SerdeDeserialize),
116    serde(crate = "serde_dep")
117)]
118#[cfg_attr(
119    feature = "rkyv",
120    derive(Archive, RkyvSerialize, RkyvDeserialize),
121    archive(crate = "rkyv_dep"),
122    archive_attr(derive(bytecheck::CheckBytes))
123)]
124#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
125pub struct MessageUpdate {
126    pub node: PortId,
127    pub port: PortId,
128    /// The update's timestamp, printed via its [`Debug`](`std::fmt::Debug`) implementation
129    pub timestamp: String,
130    /// The type of the update's timestamp, printed via its [`Any`](`std::any::Any`) implementation
131    pub timestamp_type: String,
132    /// The number of message updates
133    pub diff: i64,
134}
135
136impl MessageUpdate {
137    pub const fn new(
138        node: PortId,
139        port: PortId,
140        timestamp: String,
141        timestamp_type: String,
142        diff: i64,
143    ) -> Self {
144        Self {
145            node,
146            port,
147            timestamp,
148            timestamp_type,
149            diff,
150        }
151    }
152}
153
154#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
155#[cfg_attr(
156    feature = "serde",
157    derive(SerdeSerialize, SerdeDeserialize),
158    serde(crate = "serde_dep")
159)]
160#[cfg_attr(
161    feature = "rkyv",
162    derive(Archive, RkyvSerialize, RkyvDeserialize),
163    archive(crate = "rkyv_dep"),
164    archive_attr(derive(bytecheck::CheckBytes))
165)]
166#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
167pub struct CapabilityUpdate {
168    pub node: PortId,
169    pub port: PortId,
170    /// The update's timestamp, printed via its [`Debug`](`std::fmt::Debug`) implementation
171    pub timestamp: String,
172    /// The type of the update's timestamp, printed via its [`Any`](`std::any::Any`) implementation
173    pub timestamp_type: String,
174    /// The number of capability updates
175    pub diff: i64,
176}
177
178impl CapabilityUpdate {
179    pub const fn new(
180        node: PortId,
181        port: PortId,
182        timestamp: String,
183        timestamp_type: String,
184        diff: i64,
185    ) -> Self {
186        Self {
187            node,
188            port,
189            timestamp,
190            timestamp_type,
191            diff,
192        }
193    }
194}