1use crate::{
4 ids::{ChannelId, PortId, WorkerId},
5 OperatorAddr,
6};
7use timely::logging::TimelyProgressEvent as RawTimelyProgressEvent;
8
9#[cfg(feature = "enable_abomonation")]
10use abomonation_derive::Abomonation;
11
12#[cfg(feature = "rkyv")]
13use rkyv_dep::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
14
15#[cfg(feature = "serde")]
16use serde_dep::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
17
18#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
19#[cfg_attr(
20 feature = "serde",
21 derive(SerdeSerialize, SerdeDeserialize),
22 serde(crate = "serde_dep")
23)]
24#[cfg_attr(
25 feature = "rkyv",
26 derive(Archive, RkyvSerialize, RkyvDeserialize),
27 archive(crate = "rkyv_dep"),
28 archive_attr(derive(bytecheck::CheckBytes))
29)]
30#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
31pub struct TimelyProgressEvent {
32 pub is_send: bool,
34 pub worker: WorkerId,
36 pub channel: ChannelId,
38 pub seq_no: usize,
40 pub addr: OperatorAddr,
42 pub messages: Vec<MessageUpdate>,
44 pub internal: Vec<CapabilityUpdate>,
46}
47
48impl TimelyProgressEvent {
49 pub fn new(
50 is_send: bool,
51 worker: WorkerId,
52 channel: ChannelId,
53 seq_no: usize,
54 addr: OperatorAddr,
55 messages: Vec<MessageUpdate>,
56 internal: Vec<CapabilityUpdate>,
57 ) -> Self {
58 Self {
59 is_send,
60 worker,
61 channel,
62 seq_no,
63 addr,
64 messages,
65 internal,
66 }
67 }
68}
69
70impl From<RawTimelyProgressEvent> for TimelyProgressEvent {
71 fn from(event: RawTimelyProgressEvent) -> Self {
72 let messages = event
73 .messages
74 .iter()
75 .map(|(&node, &port, time, &diff)| {
76 MessageUpdate::new(
77 PortId::new(node),
78 PortId::new(port),
79 format!("{:?}", time),
80 time.type_name().to_owned(),
81 diff,
82 )
83 })
84 .collect();
85
86 let internal = event
87 .internal
88 .iter()
89 .map(|(&node, &port, time, &diff)| {
90 CapabilityUpdate::new(
91 PortId::new(node),
92 PortId::new(port),
93 format!("{:?}", time),
94 time.type_name().to_owned(),
95 diff,
96 )
97 })
98 .collect();
99
100 Self {
101 is_send: event.is_send,
102 worker: WorkerId::new(event.source),
103 channel: ChannelId::new(event.channel),
104 seq_no: event.seq_no,
105 addr: OperatorAddr::from(event.addr),
106 messages,
107 internal,
108 }
109 }
110}
111
112#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
113#[cfg_attr(
114 feature = "serde",
115 derive(SerdeSerialize, SerdeDeserialize),
116 serde(crate = "serde_dep")
117)]
118#[cfg_attr(
119 feature = "rkyv",
120 derive(Archive, RkyvSerialize, RkyvDeserialize),
121 archive(crate = "rkyv_dep"),
122 archive_attr(derive(bytecheck::CheckBytes))
123)]
124#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
125pub struct MessageUpdate {
126 pub node: PortId,
127 pub port: PortId,
128 pub timestamp: String,
130 pub timestamp_type: String,
132 pub diff: i64,
134}
135
136impl MessageUpdate {
137 pub const fn new(
138 node: PortId,
139 port: PortId,
140 timestamp: String,
141 timestamp_type: String,
142 diff: i64,
143 ) -> Self {
144 Self {
145 node,
146 port,
147 timestamp,
148 timestamp_type,
149 diff,
150 }
151 }
152}
153
154#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
155#[cfg_attr(
156 feature = "serde",
157 derive(SerdeSerialize, SerdeDeserialize),
158 serde(crate = "serde_dep")
159)]
160#[cfg_attr(
161 feature = "rkyv",
162 derive(Archive, RkyvSerialize, RkyvDeserialize),
163 archive(crate = "rkyv_dep"),
164 archive_attr(derive(bytecheck::CheckBytes))
165)]
166#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
167pub struct CapabilityUpdate {
168 pub node: PortId,
169 pub port: PortId,
170 pub timestamp: String,
172 pub timestamp_type: String,
174 pub diff: i64,
176}
177
178impl CapabilityUpdate {
179 pub const fn new(
180 node: PortId,
181 port: PortId,
182 timestamp: String,
183 timestamp_type: String,
184 diff: i64,
185 ) -> Self {
186 Self {
187 node,
188 port,
189 timestamp,
190 timestamp_type,
191 diff,
192 }
193 }
194}