ddshow_types/
differential_logging.rs

1//! Differential dataflow logging events
2
3use crate::ids::OperatorId;
4use differential_dataflow::logging::{
5    BatchEvent as RawBatchEvent, DifferentialEvent as RawDifferentialEvent,
6    DropEvent as RawDropEvent, MergeEvent as RawMergeEvent, MergeShortfall as RawMergeShortfall,
7    TraceShare as RawTraceShare,
8};
9
10#[cfg(feature = "enable_abomonation")]
11use abomonation_derive::Abomonation;
12
13#[cfg(feature = "rkyv")]
14use rkyv_dep::{Archive, Deserialize as RkyvDeserialize, Serialize as RkyvSerialize};
15
16#[cfg(feature = "serde")]
17use serde_dep::{Deserialize as SerdeDeserialize, Serialize as SerdeSerialize};
18
19/// Differential dataflow events
20#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
21#[cfg_attr(
22    feature = "serde",
23    derive(SerdeSerialize, SerdeDeserialize),
24    serde(crate = "serde_dep")
25)]
26#[cfg_attr(
27    feature = "rkyv",
28    derive(Archive, RkyvSerialize, RkyvDeserialize),
29    archive(crate = "rkyv_dep"),
30    archive_attr(derive(bytecheck::CheckBytes))
31)]
32#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
33pub enum DifferentialEvent {
34    /// Batch creation.
35    Batch(BatchEvent),
36    /// Merge start and stop events.
37    Merge(MergeEvent),
38    /// Batch dropped when trace dropped.
39    Drop(DropEvent),
40    /// A merge failed to complete in time.
41    MergeShortfall(MergeShortfall),
42    /// Trace sharing event.
43    TraceShare(TraceShare),
44}
45
46impl DifferentialEvent {
47    /// Returns `true` if the differential_event is [`Batch`].
48    #[inline]
49    pub const fn is_batch(&self) -> bool {
50        matches!(self, Self::Batch(..))
51    }
52
53    /// Returns `true` if the differential_event is [`Merge`].
54    #[inline]
55    pub const fn is_merge(&self) -> bool {
56        matches!(self, Self::Merge(..))
57    }
58
59    /// Returns `true` if the differential_event is [`Drop`].
60    #[inline]
61    pub const fn is_drop(&self) -> bool {
62        matches!(self, Self::Drop(..))
63    }
64
65    /// Returns `true` if the differential_event is [`MergeShortfall`].
66    #[inline]
67    pub const fn is_merge_shortfall(&self) -> bool {
68        matches!(self, Self::MergeShortfall(..))
69    }
70
71    /// Returns `true` if the differential_event is [`TraceShare`].
72    #[inline]
73    pub const fn is_trace_share(&self) -> bool {
74        matches!(self, Self::TraceShare(..))
75    }
76}
77
78impl From<RawDifferentialEvent> for DifferentialEvent {
79    #[inline]
80    fn from(event: RawDifferentialEvent) -> Self {
81        match event {
82            RawDifferentialEvent::Batch(batch) => Self::Batch(batch.into()),
83            RawDifferentialEvent::Merge(merge) => Self::Merge(merge.into()),
84            RawDifferentialEvent::Drop(drop) => Self::Drop(drop.into()),
85            RawDifferentialEvent::MergeShortfall(shortfall) => {
86                Self::MergeShortfall(shortfall.into())
87            }
88            RawDifferentialEvent::TraceShare(share) => Self::TraceShare(share.into()),
89        }
90    }
91}
92
93impl From<DifferentialEvent> for RawDifferentialEvent {
94    #[inline]
95    fn from(event: DifferentialEvent) -> Self {
96        match event {
97            DifferentialEvent::Batch(batch) => Self::Batch(batch.into()),
98            DifferentialEvent::Merge(merge) => Self::Merge(merge.into()),
99            DifferentialEvent::Drop(drop) => Self::Drop(drop.into()),
100            DifferentialEvent::MergeShortfall(shortfall) => Self::MergeShortfall(shortfall.into()),
101            DifferentialEvent::TraceShare(share) => Self::TraceShare(share.into()),
102        }
103    }
104}
105
106impl From<TraceShare> for DifferentialEvent {
107    #[inline]
108    fn from(share: TraceShare) -> Self {
109        Self::TraceShare(share)
110    }
111}
112
113impl From<MergeShortfall> for DifferentialEvent {
114    #[inline]
115    fn from(shortfall: MergeShortfall) -> Self {
116        Self::MergeShortfall(shortfall)
117    }
118}
119
120impl From<MergeEvent> for DifferentialEvent {
121    #[inline]
122    fn from(merge: MergeEvent) -> Self {
123        Self::Merge(merge)
124    }
125}
126
127impl From<BatchEvent> for DifferentialEvent {
128    #[inline]
129    fn from(batch: BatchEvent) -> Self {
130        Self::Batch(batch)
131    }
132}
133
134/// A batch of data sent to an arrangement
135#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
136#[cfg_attr(
137    feature = "serde",
138    derive(SerdeSerialize, SerdeDeserialize),
139    serde(crate = "serde_dep")
140)]
141#[cfg_attr(
142    feature = "rkyv",
143    derive(Archive, RkyvSerialize, RkyvDeserialize),
144    archive(crate = "rkyv_dep"),
145    archive_attr(derive(bytecheck::CheckBytes))
146)]
147#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
148pub struct BatchEvent {
149    /// Operator identifier.
150    pub operator: OperatorId,
151    /// Which order of magnitude.
152    pub length: usize,
153}
154
155impl BatchEvent {
156    #[inline]
157    pub const fn new(operator: OperatorId, length: usize) -> Self {
158        Self { operator, length }
159    }
160}
161
162impl From<RawBatchEvent> for BatchEvent {
163    #[inline]
164    fn from(event: RawBatchEvent) -> Self {
165        Self {
166            operator: OperatorId::new(event.operator),
167            length: event.length,
168        }
169    }
170}
171
172impl From<BatchEvent> for RawBatchEvent {
173    #[inline]
174    fn from(event: BatchEvent) -> Self {
175        Self {
176            operator: event.operator.into_inner(),
177            length: event.length,
178        }
179    }
180}
181
182/// The destruction of an arrangement
183#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
184#[cfg_attr(
185    feature = "serde",
186    derive(SerdeSerialize, SerdeDeserialize),
187    serde(crate = "serde_dep")
188)]
189#[cfg_attr(
190    feature = "rkyv",
191    derive(Archive, RkyvSerialize, RkyvDeserialize),
192    archive(crate = "rkyv_dep"),
193    archive_attr(derive(bytecheck::CheckBytes))
194)]
195#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
196pub struct DropEvent {
197    /// Operator identifier.
198    pub operator: OperatorId,
199    /// Which order of magnitude.
200    pub length: usize,
201}
202
203impl DropEvent {
204    #[inline]
205    pub const fn new(operator: OperatorId, length: usize) -> Self {
206        Self { operator, length }
207    }
208}
209
210impl From<RawDropEvent> for DropEvent {
211    #[inline]
212    fn from(event: RawDropEvent) -> Self {
213        Self {
214            operator: OperatorId::new(event.operator),
215            length: event.length,
216        }
217    }
218}
219
220impl From<DropEvent> for RawDropEvent {
221    #[inline]
222    fn from(event: DropEvent) -> Self {
223        Self {
224            operator: event.operator.into_inner(),
225            length: event.length,
226        }
227    }
228}
229
230/// Either the start or end of a merge event.
231#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
232#[cfg_attr(
233    feature = "serde",
234    derive(SerdeSerialize, SerdeDeserialize),
235    serde(crate = "serde_dep")
236)]
237#[cfg_attr(
238    feature = "rkyv",
239    derive(Archive, RkyvSerialize, RkyvDeserialize),
240    archive(crate = "rkyv_dep"),
241    archive_attr(derive(bytecheck::CheckBytes))
242)]
243#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
244pub struct MergeEvent {
245    /// Operator identifier.
246    pub operator: OperatorId,
247    /// Which order of magnitude.
248    pub scale: usize,
249    /// Length of first trace.
250    pub length1: usize,
251    /// Length of second trace.
252    pub length2: usize,
253    /// None implies a start.
254    pub complete: Option<usize>,
255}
256
257impl MergeEvent {
258    #[inline]
259    pub const fn new(
260        operator: OperatorId,
261        scale: usize,
262        length1: usize,
263        length2: usize,
264        complete: Option<usize>,
265    ) -> Self {
266        Self {
267            operator,
268            scale,
269            length1,
270            length2,
271            complete,
272        }
273    }
274}
275
276impl From<RawMergeEvent> for MergeEvent {
277    #[inline]
278    fn from(event: RawMergeEvent) -> Self {
279        Self {
280            operator: OperatorId::new(event.operator),
281            scale: event.scale,
282            length1: event.length1,
283            length2: event.length2,
284            complete: event.complete,
285        }
286    }
287}
288
289impl From<MergeEvent> for RawMergeEvent {
290    #[inline]
291    fn from(event: MergeEvent) -> Self {
292        Self {
293            operator: event.operator.into_inner(),
294            scale: event.scale,
295            length1: event.length1,
296            length2: event.length2,
297            complete: event.complete,
298        }
299    }
300}
301
302/// A merge failed to complete in time.
303#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
304#[cfg_attr(
305    feature = "serde",
306    derive(SerdeSerialize, SerdeDeserialize),
307    serde(crate = "serde_dep")
308)]
309#[cfg_attr(
310    feature = "rkyv",
311    derive(Archive, RkyvSerialize, RkyvDeserialize),
312    archive(crate = "rkyv_dep"),
313    archive_attr(derive(bytecheck::CheckBytes))
314)]
315#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
316pub struct MergeShortfall {
317    /// Operator identifer.
318    pub operator: OperatorId,
319    /// Which order of magnitude.
320    pub scale: usize,
321    /// By how much were we short.
322    pub shortfall: usize,
323}
324
325impl MergeShortfall {
326    #[inline]
327    pub const fn new(operator: OperatorId, scale: usize, shortfall: usize) -> Self {
328        Self {
329            operator,
330            scale,
331            shortfall,
332        }
333    }
334}
335
336impl From<RawMergeShortfall> for MergeShortfall {
337    #[inline]
338    fn from(event: RawMergeShortfall) -> Self {
339        Self {
340            operator: OperatorId::new(event.operator),
341            scale: event.scale,
342            shortfall: event.shortfall,
343        }
344    }
345}
346
347impl From<MergeShortfall> for RawMergeShortfall {
348    #[inline]
349    fn from(event: MergeShortfall) -> Self {
350        Self {
351            operator: event.operator.into_inner(),
352            scale: event.scale,
353            shortfall: event.shortfall,
354        }
355    }
356}
357
358/// The sharing of an arrangement trace
359#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
360#[cfg_attr(
361    feature = "serde",
362    derive(SerdeSerialize, SerdeDeserialize),
363    serde(crate = "serde_dep")
364)]
365#[cfg_attr(
366    feature = "rkyv",
367    derive(Archive, RkyvSerialize, RkyvDeserialize),
368    archive(crate = "rkyv_dep"),
369    archive_attr(derive(bytecheck::CheckBytes))
370)]
371#[cfg_attr(feature = "enable_abomonation", derive(Abomonation))]
372pub struct TraceShare {
373    /// Operator identifier.
374    pub operator: OperatorId,
375    /// Change in number of shares.
376    pub diff: isize,
377}
378
379impl TraceShare {
380    #[inline]
381    pub const fn new(operator: OperatorId, diff: isize) -> Self {
382        Self { operator, diff }
383    }
384}
385
386impl From<RawTraceShare> for TraceShare {
387    #[inline]
388    fn from(event: RawTraceShare) -> Self {
389        Self {
390            operator: OperatorId::new(event.operator),
391            diff: event.diff,
392        }
393    }
394}
395
396impl From<TraceShare> for RawTraceShare {
397    #[inline]
398    fn from(event: TraceShare) -> Self {
399        Self {
400            operator: event.operator.into_inner(),
401            diff: event.diff,
402        }
403    }
404}