1use crate::ids::OperatorId;
4use differential_dataflow::logging::{
5 BatchEvent as RawBatchEvent, DifferentialEvent as RawDifferentialEvent,
6 DropEvent as RawDropEvent, MergeEvent as RawMergeEvent, MergeShortfall as RawMergeShortfall,
7 TraceShare as RawTraceShare,
8};
9
10#[cfg(feature = "enable_abomonation")]
11use abomonation_derive::Abomonation;
12
13#[cfg(feature = "rkyv")]
14use rkyv_dep::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
15
16#[cfg(feature = "serde")]
17use serde_dep::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
18
19#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
21#[cfg_attr(
22 feature = "serde",
23 derive(SerdeSerialize, SerdeDeserialize),
24 serde(crate = "serde_dep")
25)]
26#[cfg_attr(
27 feature = "rkyv",
28 derive(Archive, RkyvSerialize, RkyvDeserialize),
29 archive(crate = "rkyv_dep"),
30 archive_attr(derive(bytecheck::CheckBytes))
31)]
32#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
33pub enum DifferentialEvent {
34 Batch(BatchEvent),
36 Merge(MergeEvent),
38 Drop(DropEvent),
40 MergeShortfall(MergeShortfall),
42 TraceShare(TraceShare),
44}
45
46impl DifferentialEvent {
47 #[inline]
49 pub const fn is_batch(&self) -> bool {
50 matches!(self, Self::Batch(..))
51 }
52
53 #[inline]
55 pub const fn is_merge(&self) -> bool {
56 matches!(self, Self::Merge(..))
57 }
58
59 #[inline]
61 pub const fn is_drop(&self) -> bool {
62 matches!(self, Self::Drop(..))
63 }
64
65 #[inline]
67 pub const fn is_merge_shortfall(&self) -> bool {
68 matches!(self, Self::MergeShortfall(..))
69 }
70
71 #[inline]
73 pub const fn is_trace_share(&self) -> bool {
74 matches!(self, Self::TraceShare(..))
75 }
76}
77
78impl From<RawDifferentialEvent> for DifferentialEvent {
79 #[inline]
80 fn from(event: RawDifferentialEvent) -> Self {
81 match event {
82 RawDifferentialEvent::Batch(batch) => Self::Batch(batch.into()),
83 RawDifferentialEvent::Merge(merge) => Self::Merge(merge.into()),
84 RawDifferentialEvent::Drop(drop) => Self::Drop(drop.into()),
85 RawDifferentialEvent::MergeShortfall(shortfall) => {
86 Self::MergeShortfall(shortfall.into())
87 }
88 RawDifferentialEvent::TraceShare(share) => Self::TraceShare(share.into()),
89 }
90 }
91}
92
93impl From<DifferentialEvent> for RawDifferentialEvent {
94 #[inline]
95 fn from(event: DifferentialEvent) -> Self {
96 match event {
97 DifferentialEvent::Batch(batch) => Self::Batch(batch.into()),
98 DifferentialEvent::Merge(merge) => Self::Merge(merge.into()),
99 DifferentialEvent::Drop(drop) => Self::Drop(drop.into()),
100 DifferentialEvent::MergeShortfall(shortfall) => Self::MergeShortfall(shortfall.into()),
101 DifferentialEvent::TraceShare(share) => Self::TraceShare(share.into()),
102 }
103 }
104}
105
106impl From<TraceShare> for DifferentialEvent {
107 #[inline]
108 fn from(share: TraceShare) -> Self {
109 Self::TraceShare(share)
110 }
111}
112
113impl From<MergeShortfall> for DifferentialEvent {
114 #[inline]
115 fn from(shortfall: MergeShortfall) -> Self {
116 Self::MergeShortfall(shortfall)
117 }
118}
119
120impl From<MergeEvent> for DifferentialEvent {
121 #[inline]
122 fn from(merge: MergeEvent) -> Self {
123 Self::Merge(merge)
124 }
125}
126
127impl From<BatchEvent> for DifferentialEvent {
128 #[inline]
129 fn from(batch: BatchEvent) -> Self {
130 Self::Batch(batch)
131 }
132}
133
134#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
136#[cfg_attr(
137 feature = "serde",
138 derive(SerdeSerialize, SerdeDeserialize),
139 serde(crate = "serde_dep")
140)]
141#[cfg_attr(
142 feature = "rkyv",
143 derive(Archive, RkyvSerialize, RkyvDeserialize),
144 archive(crate = "rkyv_dep"),
145 archive_attr(derive(bytecheck::CheckBytes))
146)]
147#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
148pub struct BatchEvent {
149 pub operator: OperatorId,
151 pub length: usize,
153}
154
155impl BatchEvent {
156 #[inline]
157 pub const fn new(operator: OperatorId, length: usize) -> Self {
158 Self { operator, length }
159 }
160}
161
162impl From<RawBatchEvent> for BatchEvent {
163 #[inline]
164 fn from(event: RawBatchEvent) -> Self {
165 Self {
166 operator: OperatorId::new(event.operator),
167 length: event.length,
168 }
169 }
170}
171
172impl From<BatchEvent> for RawBatchEvent {
173 #[inline]
174 fn from(event: BatchEvent) -> Self {
175 Self {
176 operator: event.operator.into_inner(),
177 length: event.length,
178 }
179 }
180}
181
182#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
184#[cfg_attr(
185 feature = "serde",
186 derive(SerdeSerialize, SerdeDeserialize),
187 serde(crate = "serde_dep")
188)]
189#[cfg_attr(
190 feature = "rkyv",
191 derive(Archive, RkyvSerialize, RkyvDeserialize),
192 archive(crate = "rkyv_dep"),
193 archive_attr(derive(bytecheck::CheckBytes))
194)]
195#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
196pub struct DropEvent {
197 pub operator: OperatorId,
199 pub length: usize,
201}
202
203impl DropEvent {
204 #[inline]
205 pub const fn new(operator: OperatorId, length: usize) -> Self {
206 Self { operator, length }
207 }
208}
209
210impl From<RawDropEvent> for DropEvent {
211 #[inline]
212 fn from(event: RawDropEvent) -> Self {
213 Self {
214 operator: OperatorId::new(event.operator),
215 length: event.length,
216 }
217 }
218}
219
220impl From<DropEvent> for RawDropEvent {
221 #[inline]
222 fn from(event: DropEvent) -> Self {
223 Self {
224 operator: event.operator.into_inner(),
225 length: event.length,
226 }
227 }
228}
229
230#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
232#[cfg_attr(
233 feature = "serde",
234 derive(SerdeSerialize, SerdeDeserialize),
235 serde(crate = "serde_dep")
236)]
237#[cfg_attr(
238 feature = "rkyv",
239 derive(Archive, RkyvSerialize, RkyvDeserialize),
240 archive(crate = "rkyv_dep"),
241 archive_attr(derive(bytecheck::CheckBytes))
242)]
243#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
244pub struct MergeEvent {
245 pub operator: OperatorId,
247 pub scale: usize,
249 pub length1: usize,
251 pub length2: usize,
253 pub complete: Option<usize>,
255}
256
257impl MergeEvent {
258 #[inline]
259 pub const fn new(
260 operator: OperatorId,
261 scale: usize,
262 length1: usize,
263 length2: usize,
264 complete: Option<usize>,
265 ) -> Self {
266 Self {
267 operator,
268 scale,
269 length1,
270 length2,
271 complete,
272 }
273 }
274}
275
276impl From<RawMergeEvent> for MergeEvent {
277 #[inline]
278 fn from(event: RawMergeEvent) -> Self {
279 Self {
280 operator: OperatorId::new(event.operator),
281 scale: event.scale,
282 length1: event.length1,
283 length2: event.length2,
284 complete: event.complete,
285 }
286 }
287}
288
289impl From<MergeEvent> for RawMergeEvent {
290 #[inline]
291 fn from(event: MergeEvent) -> Self {
292 Self {
293 operator: event.operator.into_inner(),
294 scale: event.scale,
295 length1: event.length1,
296 length2: event.length2,
297 complete: event.complete,
298 }
299 }
300}
301
302#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
304#[cfg_attr(
305 feature = "serde",
306 derive(SerdeSerialize, SerdeDeserialize),
307 serde(crate = "serde_dep")
308)]
309#[cfg_attr(
310 feature = "rkyv",
311 derive(Archive, RkyvSerialize, RkyvDeserialize),
312 archive(crate = "rkyv_dep"),
313 archive_attr(derive(bytecheck::CheckBytes))
314)]
315#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
316pub struct MergeShortfall {
317 pub operator: OperatorId,
319 pub scale: usize,
321 pub shortfall: usize,
323}
324
325impl MergeShortfall {
326 #[inline]
327 pub const fn new(operator: OperatorId, scale: usize, shortfall: usize) -> Self {
328 Self {
329 operator,
330 scale,
331 shortfall,
332 }
333 }
334}
335
336impl From<RawMergeShortfall> for MergeShortfall {
337 #[inline]
338 fn from(event: RawMergeShortfall) -> Self {
339 Self {
340 operator: OperatorId::new(event.operator),
341 scale: event.scale,
342 shortfall: event.shortfall,
343 }
344 }
345}
346
347impl From<MergeShortfall> for RawMergeShortfall {
348 #[inline]
349 fn from(event: MergeShortfall) -> Self {
350 Self {
351 operator: event.operator.into_inner(),
352 scale: event.scale,
353 shortfall: event.shortfall,
354 }
355 }
356}
357
358#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
360#[cfg_attr(
361 feature = "serde",
362 derive(SerdeSerialize, SerdeDeserialize),
363 serde(crate = "serde_dep")
364)]
365#[cfg_attr(
366 feature = "rkyv",
367 derive(Archive, RkyvSerialize, RkyvDeserialize),
368 archive(crate = "rkyv_dep"),
369 archive_attr(derive(bytecheck::CheckBytes))
370)]
371#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
372pub struct TraceShare {
373 pub operator: OperatorId,
375 pub diff: isize,
377}
378
379impl TraceShare {
380 #[inline]
381 pub const fn new(operator: OperatorId, diff: isize) -> Self {
382 Self { operator, diff }
383 }
384}
385
386impl From<RawTraceShare> for TraceShare {
387 #[inline]
388 fn from(event: RawTraceShare) -> Self {
389 Self {
390 operator: OperatorId::new(event.operator),
391 diff: event.diff,
392 }
393 }
394}
395
396impl From<TraceShare> for RawTraceShare {
397 #[inline]
398 fn from(event: TraceShare) -> Self {
399 Self {
400 operator: event.operator.into_inner(),
401 diff: event.diff,
402 }
403 }
404}