differential_dataflow/
logging.rs

//! Loggers and logging events for differential dataflow.
2
3use abomonation_derive::Abomonation;
4
5/// Logger for differential dataflow events.
6pub type Logger = ::timely::logging::Logger<DifferentialEvent>;
7
8/// Enables logging of differential dataflow events.
9pub fn enable<A, W>(worker: &mut timely::worker::Worker<A>, writer: W) -> Option<Box<dyn std::any::Any+'static>>
10where
11    A: timely::communication::Allocate,
12    W: std::io::Write+'static,
13{
14    let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
15    let mut logger = ::timely::logging::BatchLogger::new(writer);
16    worker
17        .log_register()
18        .insert::<DifferentialEvent,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
19}
20
21/// Possible different differential events.
22#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
23pub enum DifferentialEvent {
24    /// Batch creation.
25    Batch(BatchEvent),
26    /// Merge start and stop events.
27    Merge(MergeEvent),
28    /// Batch dropped when trace dropped.
29    Drop(DropEvent),
30    /// A merge failed to complete in time.
31    MergeShortfall(MergeShortfall),
32    /// Trace sharing event.
33    TraceShare(TraceShare),
34    /// Batcher size event
35    Batcher(BatcherEvent),
36}
37
38/// Either the start or end of a merge event.
39#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
40pub struct BatchEvent {
41    /// Operator identifier.
42    pub operator: usize,
43    /// Which order of magnitude.
44    pub length: usize,
45}
46
47impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { DifferentialEvent::Batch(e) } }
48
49
50/// Either the start or end of a merge event.
51#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
52pub struct BatcherEvent {
53    /// Operator identifier.
54    pub operator: usize,
55    /// Change in records.
56    pub records_diff: isize,
57    /// Change in used size.
58    pub size_diff: isize,
59    /// Change in capacity.
60    pub capacity_diff: isize,
61    /// Change in number of allocations.
62    pub allocations_diff: isize,
63}
64
65impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }
66
67/// Either the start or end of a merge event.
68#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
69pub struct DropEvent {
70    /// Operator identifier.
71    pub operator: usize,
72    /// Which order of magnitude.
73    pub length: usize,
74}
75
76impl From<DropEvent> for DifferentialEvent { fn from(e: DropEvent) -> Self { DifferentialEvent::Drop(e) } }
77
78/// Either the start or end of a merge event.
79#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
80pub struct MergeEvent {
81    /// Operator identifier.
82    pub operator: usize,
83    /// Which order of magnitude.
84    pub scale: usize,
85    /// Length of first trace.
86    pub length1: usize,
87    /// Length of second trace.
88    pub length2: usize,
89    /// None implies a start.
90    pub complete: Option<usize>,
91}
92
93impl From<MergeEvent> for DifferentialEvent { fn from(e: MergeEvent) -> Self { DifferentialEvent::Merge(e) } }
94
95/// A merge failed to complete in time.
96#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
97pub struct MergeShortfall {
98    /// Operator identifer.
99    pub operator: usize,
100    /// Which order of magnitude.
101    pub scale: usize,
102    /// By how much were we short.
103    pub shortfall: usize,
104}
105
106impl From<MergeShortfall> for DifferentialEvent { fn from(e: MergeShortfall) -> Self { DifferentialEvent::MergeShortfall(e) } }
107
108/// Either the start or end of a merge event.
109#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
110pub struct TraceShare {
111    /// Operator identifier.
112    pub operator: usize,
113    /// Change in number of shares.
114    pub diff: isize,
115}
116
117impl From<TraceShare> for DifferentialEvent { fn from(e: TraceShare) -> Self { DifferentialEvent::TraceShare(e) } }