1use crate::WorkerId;
4use core::time::Duration;
5use timely::dataflow::operators::capture::event::Event as TimelyEvent;
6
7#[cfg(feature = "enable_abomonation")]
8use abomonation_derive::Abomonation;
9
10#[cfg(feature = "rkyv")]
11use rkyv_dep::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
12
13#[cfg(feature = "serde")]
14use serde_dep::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
15
16#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
17#[cfg_attr(
18 feature = "serde",
19 derive(SerdeSerialize, SerdeDeserialize),
20 serde(crate = "serde_dep")
21)]
22#[cfg_attr(
23 feature = "rkyv",
24 derive(Archive, RkyvSerialize, RkyvDeserialize),
25 archive(crate = "rkyv_dep"),
26 archive_attr(derive(bytecheck::CheckBytes))
27)]
28#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
29pub struct Bundle<D, Id = WorkerId> {
30 pub time: Duration,
31 pub worker: Id,
32 pub event: D,
33}
34
35impl<D, Id> Bundle<D, Id> {
36 #[inline]
37 pub fn new(time: Duration, worker: Id, event: D) -> Self {
38 Self {
39 time,
40 worker,
41 event,
42 }
43 }
44}
45
46impl<D, Id> From<(Duration, Id, D)> for Bundle<D, Id> {
47 #[inline]
48 fn from((time, worker, event): (Duration, Id, D)) -> Self {
49 Self {
50 time,
51 worker,
52 event,
53 }
54 }
55}
56
57impl<D, Id> From<Bundle<D, Id>> for (Duration, Id, D) {
58 #[inline]
59 fn from(
60 Bundle {
61 time,
62 worker,
63 event,
64 }: Bundle<D, Id>,
65 ) -> Self {
66 (time, worker, event)
67 }
68}
69
/// A `time` of type `T` paired with a signed `diff`.
///
/// This is the named-field form of a `(T, i64)` pair, the same shape as the
/// entries carried by [`Event::Progress`]. The derived comparison traits
/// look at `time` first and `diff` second.
// NOTE(review): presumably `diff` is the change in capability count held at
// `time`; confirm against the code that produces these values.
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(
    feature = "serde",
    derive(SerdeSerialize, SerdeDeserialize),
    serde(crate = "serde_dep")
)]
#[cfg_attr(
    feature = "rkyv",
    derive(Archive, RkyvSerialize, RkyvDeserialize),
    archive(crate = "rkyv_dep"),
    archive_attr(derive(bytecheck::CheckBytes))
)]
#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
pub struct CapabilityBundle<T> {
    /// The time this entry refers to.
    pub time: T,
    /// The signed difference recorded for `time`.
    pub diff: i64,
}
87
88impl<T> CapabilityBundle<T> {
89 #[inline]
90 pub fn new(time: T, diff: i64) -> Self {
91 Self { time, diff }
92 }
93}
94
95impl<T> From<(T, i64)> for CapabilityBundle<T> {
96 #[inline]
97 fn from((time, diff): (T, i64)) -> Self {
98 Self { time, diff }
99 }
100}
101
102impl<T> From<CapabilityBundle<T>> for (T, i64) {
103 #[inline]
104 fn from(CapabilityBundle { time, diff }: CapabilityBundle<T>) -> Self {
105 (time, diff)
106 }
107}
108
/// A captured event, generic over a time type `T` and a data type `D`.
///
/// It has the same two variants as [`TimelyEvent`] and converts to and from
/// it variant by variant; unlike that type it carries this crate's optional
/// serialization derives. The derived ordering places every `Progress` value
/// before every `Messages` value (variant declaration order).
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(
    feature = "serde",
    derive(SerdeSerialize, SerdeDeserialize),
    serde(crate = "serde_dep")
)]
#[cfg_attr(
    feature = "rkyv",
    derive(Archive, RkyvSerialize, RkyvDeserialize),
    archive(crate = "rkyv_dep"),
    archive_attr(derive(bytecheck::CheckBytes))
)]
#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
pub enum Event<T, D> {
    /// A batch of `(time, diff)` pairs.
    // NOTE(review): presumably progress updates as defined by timely's
    // capture events; confirm against the timely documentation.
    Progress(Vec<(T, i64)>),
    /// A batch of data items together with a single time.
    Messages(T, Vec<D>),
}
129
130impl<T, D> Event<T, D> {
131 #[inline]
133 pub const fn is_progress(&self) -> bool {
134 matches!(self, Self::Progress(..))
135 }
136
137 #[inline]
139 pub const fn is_messages(&self) -> bool {
140 matches!(self, Self::Messages(..))
141 }
142
143 #[inline]
145 pub const fn as_progress(&self) -> Option<&Vec<(T, i64)>> {
146 if let Self::Progress(progress) = self {
147 Some(progress)
148 } else {
149 None
150 }
151 }
152
153 #[inline]
155 pub const fn as_messages(&self) -> Option<(&T, &Vec<D>)> {
156 if let Self::Messages(time, messages) = self {
157 Some((time, messages))
158 } else {
159 None
160 }
161 }
162
163 #[inline]
165 pub fn into_progress(self) -> Result<Vec<(T, i64)>, Self> {
166 if let Self::Progress(progress) = self {
167 Ok(progress)
168 } else {
169 Err(self)
170 }
171 }
172
173 #[inline]
175 pub fn into_messages(self) -> Result<(T, Vec<D>), Self> {
176 if let Self::Messages(time, messages) = self {
177 Ok((time, messages))
178 } else {
179 Err(self)
180 }
181 }
182}
183
184impl<T, D> From<TimelyEvent<T, D>> for Event<T, D> {
185 #[inline]
186 fn from(event: TimelyEvent<T, D>) -> Self {
187 match event {
188 TimelyEvent::Progress(progress) => Self::Progress(progress),
189 TimelyEvent::Messages(time, messages) => Self::Messages(time, messages),
190 }
191 }
192}
193
194impl<T, D> From<Event<T, D>> for TimelyEvent<T, D> {
195 #[inline]
196 fn from(val: Event<T, D>) -> Self {
197 match val {
198 Event::Progress(progress) => Self::Progress(progress),
199 Event::Messages(time, messages) => Self::Messages(time, messages),
200 }
201 }
202}