1mod timely_event;
2
3pub use timely_event::{ArchivedTimelyEvent, DistinguishingId, TimelyEvent, TimelyEventResolver};
4
5use crate::{
6 ids::{ChannelId, OperatorId, PortId},
7 OperatorAddr,
8};
9use core::{fmt::Debug, time::Duration};
10use timely::logging::{
11 ApplicationEvent as TimelyApplicationEvent, ChannelsEvent as TimelyChannelsEvent,
12 CommChannelKind as TimelyCommChannelKind, CommChannelsEvent as TimelyCommChannelsEvent,
13 GuardedMessageEvent as TimelyGuardedMessageEvent,
14 GuardedProgressEvent as TimelyGuardedProgressEvent, InputEvent as TimelyInputEvent,
15 MessagesEvent as TimelyMessagesEvent, OperatesEvent as TimelyOperatesEvent,
16 ParkEvent as TimelyParkEvent, PushProgressEvent as TimelyPushProgressEvent,
17 ScheduleEvent as TimelyScheduleEvent, ShutdownEvent as TimelyShutdownEvent,
18 StartStop as TimelyStartStop,
19};
20
21#[cfg(feature = "rkyv")]
22use rkyv_dep::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
23
24#[cfg(feature = "serde")]
25use serde_dep::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
26
27#[cfg(feature = "enable_abomonation")]
28use abomonation_derive::Abomonation;
29
30#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
31#[cfg_attr(
32 feature = "serde",
33 derive(SerdeSerialize, SerdeDeserialize),
34 serde(crate = "serde_dep")
35)]
36#[cfg_attr(
37 feature = "rkyv",
38 derive(Archive, RkyvSerialize, RkyvDeserialize),
39 archive(crate = "rkyv_dep"),
40 archive_attr(derive(bytecheck::CheckBytes))
41)]
42#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
43pub struct OperatesEvent {
44 pub id: OperatorId,
45 pub addr: OperatorAddr,
46 pub name: String,
47}
48
49impl OperatesEvent {
50 #[inline]
51 pub const fn new(id: OperatorId, addr: OperatorAddr, name: String) -> Self {
52 Self { id, addr, name }
53 }
54}
55
56impl From<TimelyOperatesEvent> for OperatesEvent {
57 #[inline]
58 fn from(event: TimelyOperatesEvent) -> Self {
59 Self {
60 id: OperatorId::new(event.id),
61 addr: OperatorAddr::from(event.addr),
62 name: event.name,
63 }
64 }
65}
66
67#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
69#[cfg_attr(
70 feature = "serde",
71 derive(SerdeSerialize, SerdeDeserialize),
72 serde(crate = "serde_dep")
73)]
74#[cfg_attr(
75 feature = "rkyv",
76 derive(Archive, RkyvSerialize, RkyvDeserialize),
77 archive(crate = "rkyv_dep"),
78 archive_attr(derive(bytecheck::CheckBytes))
79)]
80#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
81pub struct ChannelsEvent {
82 pub id: ChannelId,
84 pub scope_addr: OperatorAddr,
86 pub source: [PortId; 2],
89 pub target: [PortId; 2],
92}
93
94impl ChannelsEvent {
95 #[inline]
97 pub const fn new(
98 id: ChannelId,
99 scope_addr: OperatorAddr,
100 source: (PortId, PortId),
101 target: (PortId, PortId),
102 ) -> Self {
103 Self {
104 id,
105 scope_addr,
106 source: [source.0, source.1],
107 target: [target.0, target.1],
108 }
109 }
110}
111
112impl From<TimelyChannelsEvent> for ChannelsEvent {
113 #[inline]
114 fn from(event: TimelyChannelsEvent) -> Self {
115 Self {
116 id: ChannelId::new(event.id),
117 scope_addr: OperatorAddr::from(event.scope_addr),
118 source: [PortId::new(event.source.0), PortId::new(event.source.1)],
119 target: [PortId::new(event.target.0), PortId::new(event.target.1)],
120 }
121 }
122}
123
124#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
125#[cfg_attr(
126 feature = "serde",
127 derive(SerdeSerialize, SerdeDeserialize),
128 serde(crate = "serde_dep")
129)]
130#[cfg_attr(
131 feature = "rkyv",
132 derive(Archive, RkyvSerialize, RkyvDeserialize),
133 archive(crate = "rkyv_dep"),
134 archive_attr(derive(bytecheck::CheckBytes))
135)]
136#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
137pub struct PushProgressEvent {
138 pub op_id: OperatorId,
139}
140
141impl From<TimelyPushProgressEvent> for PushProgressEvent {
142 #[inline]
143 fn from(event: TimelyPushProgressEvent) -> Self {
144 Self {
145 op_id: OperatorId::new(event.op_id),
146 }
147 }
148}
149
150#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
151#[cfg_attr(
152 feature = "serde",
153 derive(SerdeSerialize, SerdeDeserialize),
154 serde(crate = "serde_dep")
155)]
156#[cfg_attr(
157 feature = "rkyv",
158 derive(Archive, RkyvSerialize, RkyvDeserialize),
159 archive(crate = "rkyv_dep"),
160 archive_attr(derive(bytecheck::CheckBytes))
161)]
162#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
163pub struct MessagesEvent {
164 pub is_send: bool,
166 pub channel: ChannelId,
168 pub source: OperatorId,
170 pub target: OperatorId,
172 pub seq_no: usize,
174 pub length: usize,
176}
177
178impl From<TimelyMessagesEvent> for MessagesEvent {
179 #[inline]
180 fn from(event: TimelyMessagesEvent) -> Self {
181 Self {
182 is_send: event.is_send,
183 channel: ChannelId::new(event.channel),
184 source: OperatorId::new(event.source),
185 target: OperatorId::new(event.target),
186 seq_no: event.seq_no,
187 length: event.length,
188 }
189 }
190}
191
192#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
193#[cfg_attr(
194 feature = "serde",
195 derive(SerdeSerialize, SerdeDeserialize),
196 serde(crate = "serde_dep")
197)]
198#[cfg_attr(
199 feature = "rkyv",
200 derive(Archive, RkyvSerialize, RkyvDeserialize),
201 archive(crate = "rkyv_dep"),
202 archive_attr(derive(bytecheck::CheckBytes))
203)]
204#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
205pub enum StartStop {
206 Start,
208 Stop,
210}
211
212impl StartStop {
213 #[inline]
214 pub const fn start() -> Self {
215 Self::Start
216 }
217
218 #[inline]
219 pub const fn stop() -> Self {
220 Self::Stop
221 }
222
223 #[inline]
225 pub const fn is_start(&self) -> bool {
226 matches!(self, Self::Start)
227 }
228
229 #[inline]
231 pub const fn is_stop(&self) -> bool {
232 matches!(self, Self::Stop)
233 }
234}
235
236impl From<TimelyStartStop> for StartStop {
237 #[inline]
238 fn from(start_stop: TimelyStartStop) -> Self {
239 match start_stop {
240 TimelyStartStop::Start => Self::Start,
241 TimelyStartStop::Stop => Self::Stop,
242 }
243 }
244}
245
246#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
247#[cfg_attr(
248 feature = "serde",
249 derive(SerdeSerialize, SerdeDeserialize),
250 serde(crate = "serde_dep")
251)]
252#[cfg_attr(
253 feature = "rkyv",
254 derive(Archive, RkyvSerialize, RkyvDeserialize),
255 archive(crate = "rkyv_dep"),
256 archive_attr(derive(bytecheck::CheckBytes))
257)]
258#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
259pub struct ScheduleEvent {
260 pub id: OperatorId,
261 pub start_stop: StartStop,
262}
263
264impl From<TimelyScheduleEvent> for ScheduleEvent {
265 #[inline]
266 fn from(event: TimelyScheduleEvent) -> Self {
267 Self {
268 id: OperatorId::new(event.id),
269 start_stop: event.start_stop.into(),
270 }
271 }
272}
273
274#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
275#[cfg_attr(
276 feature = "serde",
277 derive(SerdeSerialize, SerdeDeserialize),
278 serde(crate = "serde_dep")
279)]
280#[cfg_attr(
281 feature = "rkyv",
282 derive(Archive, RkyvSerialize, RkyvDeserialize),
283 archive(crate = "rkyv_dep"),
284 archive_attr(derive(bytecheck::CheckBytes))
285)]
286#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
287pub struct ShutdownEvent {
288 pub id: OperatorId,
289}
290
291impl From<TimelyShutdownEvent> for ShutdownEvent {
292 #[inline]
293 fn from(event: TimelyShutdownEvent) -> Self {
294 Self {
295 id: OperatorId::new(event.id),
296 }
297 }
298}
299
300#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
301#[cfg_attr(
302 feature = "serde",
303 derive(SerdeSerialize, SerdeDeserialize),
304 serde(crate = "serde_dep")
305)]
306#[cfg_attr(
307 feature = "rkyv",
308 derive(Archive, RkyvSerialize, RkyvDeserialize),
309 archive(crate = "rkyv_dep"),
310 archive_attr(derive(bytecheck::CheckBytes))
311)]
312#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
313pub struct ApplicationEvent {
314 pub id: usize,
315 pub is_start: bool,
317}
318
319impl From<TimelyApplicationEvent> for ApplicationEvent {
320 #[inline]
321 fn from(event: TimelyApplicationEvent) -> Self {
322 Self {
323 id: event.id,
324 is_start: event.is_start,
325 }
326 }
327}
328
329#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
330#[cfg_attr(
331 feature = "serde",
332 derive(SerdeSerialize, SerdeDeserialize),
333 serde(crate = "serde_dep")
334)]
335#[cfg_attr(
336 feature = "rkyv",
337 derive(Archive, RkyvSerialize, RkyvDeserialize),
338 archive(crate = "rkyv_dep"),
339 archive_attr(derive(bytecheck::CheckBytes))
340)]
341#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
342pub struct GuardedMessageEvent {
343 pub is_start: bool,
345}
346
347impl From<TimelyGuardedMessageEvent> for GuardedMessageEvent {
348 #[inline]
349 fn from(event: TimelyGuardedMessageEvent) -> Self {
350 Self {
351 is_start: event.is_start,
352 }
353 }
354}
355
356#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
357#[cfg_attr(
358 feature = "serde",
359 derive(SerdeSerialize, SerdeDeserialize),
360 serde(crate = "serde_dep")
361)]
362#[cfg_attr(
363 feature = "rkyv",
364 derive(Archive, RkyvSerialize, RkyvDeserialize),
365 archive(crate = "rkyv_dep"),
366 archive_attr(derive(bytecheck::CheckBytes))
367)]
368#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
369pub struct GuardedProgressEvent {
370 pub is_start: bool,
372}
373
374impl From<TimelyGuardedProgressEvent> for GuardedProgressEvent {
375 #[inline]
376 fn from(event: TimelyGuardedProgressEvent) -> Self {
377 Self {
378 is_start: event.is_start,
379 }
380 }
381}
382
383#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
384#[cfg_attr(
385 feature = "serde",
386 derive(SerdeSerialize, SerdeDeserialize),
387 serde(crate = "serde_dep")
388)]
389#[cfg_attr(
390 feature = "rkyv",
391 derive(Archive, RkyvSerialize, RkyvDeserialize),
392 archive(crate = "rkyv_dep"),
393 archive_attr(derive(bytecheck::CheckBytes))
394)]
395#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
396pub struct CommChannelsEvent {
397 pub identifier: usize,
398 pub kind: CommChannelKind,
399}
400
401impl From<TimelyCommChannelsEvent> for CommChannelsEvent {
402 #[inline]
403 fn from(event: TimelyCommChannelsEvent) -> Self {
404 Self {
405 identifier: event.identifier,
406 kind: event.kind.into(),
407 }
408 }
409}
410
411#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
412#[cfg_attr(
413 feature = "serde",
414 derive(SerdeSerialize, SerdeDeserialize),
415 serde(crate = "serde_dep")
416)]
417#[cfg_attr(
418 feature = "rkyv",
419 derive(Archive, RkyvSerialize, RkyvDeserialize),
420 archive(crate = "rkyv_dep"),
421 archive_attr(derive(bytecheck::CheckBytes))
422)]
423#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
424pub enum CommChannelKind {
425 Progress,
426 Data,
427}
428
429impl CommChannelKind {
430 #[inline]
432 pub const fn is_progress(&self) -> bool {
433 matches!(self, Self::Progress)
434 }
435
436 #[inline]
438 pub const fn is_data(&self) -> bool {
439 matches!(self, Self::Data)
440 }
441}
442
443impl From<TimelyCommChannelKind> for CommChannelKind {
444 #[inline]
445 fn from(channel_kind: TimelyCommChannelKind) -> Self {
446 match channel_kind {
447 TimelyCommChannelKind::Progress => Self::Progress,
448 TimelyCommChannelKind::Data => Self::Data,
449 }
450 }
451}
452
453#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
454#[cfg_attr(
455 feature = "serde",
456 derive(SerdeSerialize, SerdeDeserialize),
457 serde(crate = "serde_dep")
458)]
459#[cfg_attr(
460 feature = "rkyv",
461 derive(Archive, RkyvSerialize, RkyvDeserialize),
462 archive(crate = "rkyv_dep"),
463 archive_attr(derive(bytecheck::CheckBytes))
464)]
465#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
466pub struct InputEvent {
467 pub start_stop: StartStop,
468}
469
470impl InputEvent {
471 #[inline]
472 pub const fn new(start_stop: StartStop) -> Self {
473 Self { start_stop }
474 }
475}
476
477impl From<TimelyInputEvent> for InputEvent {
478 #[inline]
479 fn from(event: TimelyInputEvent) -> Self {
480 Self {
481 start_stop: event.start_stop.into(),
482 }
483 }
484}
485
486#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
487#[cfg_attr(
488 feature = "serde",
489 derive(SerdeSerialize, SerdeDeserialize),
490 serde(crate = "serde_dep")
491)]
492#[cfg_attr(
493 feature = "rkyv",
494 derive(Archive, RkyvSerialize, RkyvDeserialize),
495 archive(crate = "rkyv_dep"),
496 archive_attr(derive(bytecheck::CheckBytes))
497)]
498#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
499pub enum ParkEvent {
500 Park(Option<Duration>),
501 Unpark,
502}
503
504impl ParkEvent {
505 #[inline]
507 pub const fn is_park(&self) -> bool {
508 matches!(self, Self::Park(..))
509 }
510
511 #[inline]
513 pub const fn is_unpark(&self) -> bool {
514 matches!(self, Self::Unpark)
515 }
516
517 #[inline]
519 pub const fn as_park(&self) -> Option<&Option<Duration>> {
520 if let Self::Park(duration) = self {
521 Some(duration)
522 } else {
523 None
524 }
525 }
526}
527
528impl From<TimelyParkEvent> for ParkEvent {
529 #[inline]
530 fn from(park: TimelyParkEvent) -> Self {
531 match park {
532 TimelyParkEvent::Park(duration) => Self::Park(duration),
533 TimelyParkEvent::Unpark => Self::Unpark,
534 }
535 }
536}