ddshow_types/
event.rs

1//! Timely progress events
2
3use crate::WorkerId;
4use core::time::Duration;
5use timely::dataflow::operators::capture::event::Event as TimelyEvent;
6
7#[cfg(feature = "enable_abomonation")]
8use abomonation_derive::Abomonation;
9
10#[cfg(feature = "rkyv")]
11use rkyv_dep::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
12
13#[cfg(feature = "serde")]
14use serde_dep::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
15
16#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
17#[cfg_attr(
18    feature = "serde",
19    derive(SerdeSerialize, SerdeDeserialize),
20    serde(crate = "serde_dep")
21)]
22#[cfg_attr(
23    feature = "rkyv",
24    derive(Archive, RkyvSerialize, RkyvDeserialize),
25    archive(crate = "rkyv_dep"),
26    archive_attr(derive(bytecheck::CheckBytes))
27)]
28#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
29pub struct Bundle<D, Id = WorkerId> {
30    pub time: Duration,
31    pub worker: Id,
32    pub event: D,
33}
34
35impl<D, Id> Bundle<D, Id> {
36    #[inline]
37    pub fn new(time: Duration, worker: Id, event: D) -> Self {
38        Self {
39            time,
40            worker,
41            event,
42        }
43    }
44}
45
46impl<D, Id> From<(Duration, Id, D)> for Bundle<D, Id> {
47    #[inline]
48    fn from((time, worker, event): (Duration, Id, D)) -> Self {
49        Self {
50            time,
51            worker,
52            event,
53        }
54    }
55}
56
57impl<D, Id> From<Bundle<D, Id>> for (Duration, Id, D) {
58    #[inline]
59    fn from(
60        Bundle {
61            time,
62            worker,
63            event,
64        }: Bundle<D, Id>,
65    ) -> Self {
66        (time, worker, event)
67    }
68}
69
70#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
71#[cfg_attr(
72    feature = "serde",
73    derive(SerdeSerialize, SerdeDeserialize),
74    serde(crate = "serde_dep")
75)]
76#[cfg_attr(
77    feature = "rkyv",
78    derive(Archive, RkyvSerialize, RkyvDeserialize),
79    archive(crate = "rkyv_dep"),
80    archive_attr(derive(bytecheck::CheckBytes))
81)]
82#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
83pub struct CapabilityBundle<T> {
84    pub time: T,
85    pub diff: i64,
86}
87
88impl<T> CapabilityBundle<T> {
89    #[inline]
90    pub fn new(time: T, diff: i64) -> Self {
91        Self { time, diff }
92    }
93}
94
95impl<T> From<(T, i64)> for CapabilityBundle<T> {
96    #[inline]
97    fn from((time, diff): (T, i64)) -> Self {
98        Self { time, diff }
99    }
100}
101
102impl<T> From<CapabilityBundle<T>> for (T, i64) {
103    #[inline]
104    fn from(CapabilityBundle { time, diff }: CapabilityBundle<T>) -> Self {
105        (time, diff)
106    }
107}
108
109/// Data and progress events of a captured stream.
110#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
111#[cfg_attr(
112    feature = "serde",
113    derive(SerdeSerialize, SerdeDeserialize),
114    serde(crate = "serde_dep")
115)]
116#[cfg_attr(
117    feature = "rkyv",
118    derive(Archive, RkyvSerialize, RkyvDeserialize),
119    archive(crate = "rkyv_dep"),
120    archive_attr(derive(bytecheck::CheckBytes))
121)]
122#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
123pub enum Event<T, D> {
124    /// Progress received via `push_external_progress`
125    Progress(Vec<(T, i64)>),
126    /// Messages received via the data stream
127    Messages(T, Vec<D>),
128}
129
130impl<T, D> Event<T, D> {
131    /// Returns `true` if the event is [`Event::Progress`].
132    #[inline]
133    pub const fn is_progress(&self) -> bool {
134        matches!(self, Self::Progress(..))
135    }
136
137    /// Returns `true` if the event is [`Event::Messages`]
138    #[inline]
139    pub const fn is_messages(&self) -> bool {
140        matches!(self, Self::Messages(..))
141    }
142
143    /// Returns progress data if the event is [`Event::Progress`]
144    #[inline]
145    pub const fn as_progress(&self) -> Option<&Vec<(T, i64)>> {
146        if let Self::Progress(progress) = self {
147            Some(progress)
148        } else {
149            None
150        }
151    }
152
153    /// Returns message data if the event is [`Event::Messages`]
154    #[inline]
155    pub const fn as_messages(&self) -> Option<(&T, &Vec<D>)> {
156        if let Self::Messages(time, messages) = self {
157            Some((time, messages))
158        } else {
159            None
160        }
161    }
162
163    /// Returns progress data if the event is [`Event::Progress`]
164    #[inline]
165    pub fn into_progress(self) -> Result<Vec<(T, i64)>, Self> {
166        if let Self::Progress(progress) = self {
167            Ok(progress)
168        } else {
169            Err(self)
170        }
171    }
172
173    /// Returns message data if the event is [`Event::Messages`]
174    #[inline]
175    pub fn into_messages(self) -> Result<(T, Vec<D>), Self> {
176        if let Self::Messages(time, messages) = self {
177            Ok((time, messages))
178        } else {
179            Err(self)
180        }
181    }
182}
183
184impl<T, D> From<TimelyEvent<T, D>> for Event<T, D> {
185    #[inline]
186    fn from(event: TimelyEvent<T, D>) -> Self {
187        match event {
188            TimelyEvent::Progress(progress) => Self::Progress(progress),
189            TimelyEvent::Messages(time, messages) => Self::Messages(time, messages),
190        }
191    }
192}
193
194impl<T, D> From<Event<T, D>> for TimelyEvent<T, D> {
195    #[inline]
196    fn from(val: Event<T, D>) -> Self {
197        match val {
198            Event::Progress(progress) => Self::Progress(progress),
199            Event::Messages(time, messages) => Self::Messages(time, messages),
200        }
201    }
202}