Skip to main content

differential_dataflow/
logging.rs

1//! Loggers and logging events for differential dataflow.
2
3use columnar::Columnar;
4use serde::{Deserialize, Serialize};
5
/// Container builder for differential log events.
///
/// This is a `CapacityContainerBuilder` whose containers are `Vec`s of
/// `(Duration, DifferentialEvent)` pairs. The `Duration` is presumably the time
/// at which the event was logged — confirm against timely's logging documentation.
pub type DifferentialEventBuilder = timely::container::CapacityContainerBuilder<Vec<(std::time::Duration, DifferentialEvent)>>;
8
/// Logger for differential dataflow events.
///
/// A `TypedLogger` specialized to [`DifferentialEventBuilder`] as its container
/// builder and [`DifferentialEvent`] as its event type.
pub type Logger = ::timely::logging_core::TypedLogger<DifferentialEventBuilder, DifferentialEvent>;
11
12/// Enables logging of differential dataflow events.
13pub fn enable<W>(worker: &mut timely::worker::Worker, writer: W) -> Option<Box<dyn std::any::Any+'static>>
14where
15    W: std::io::Write + 'static,
16{
17    worker.log_register().and_then(|mut log_register| {
18        let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
19        let mut logger = ::timely::logging::BatchLogger::new(writer);
20        log_register.insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
21    })
22}
23
/// The different kinds of differential dataflow log events.
///
/// Each variant wraps the payload struct that carries the details of that event.
/// Every payload type converts into this enum through a `From` implementation.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub enum DifferentialEvent {
    /// Batch creation.
    Batch(BatchEvent),
    /// Merge start and stop events.
    Merge(MergeEvent),
    /// Batch dropped when trace dropped.
    Drop(DropEvent),
    /// A merge failed to complete in time.
    MergeShortfall(MergeShortfall),
    /// Trace sharing event.
    TraceShare(TraceShare),
    /// Batcher size event.
    Batcher(BatcherEvent),
}
40
41/// Either the start or end of a merge event.
42#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
43pub struct BatchEvent {
44    /// Operator identifier.
45    pub operator: usize,
46    /// Which order of magnitude.
47    pub length: usize,
48}
49
50impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { DifferentialEvent::Batch(e) } }
51
52
53/// Either the start or end of a merge event.
54#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
55pub struct BatcherEvent {
56    /// Operator identifier.
57    pub operator: usize,
58    /// Change in records.
59    pub records_diff: isize,
60    /// Change in used size.
61    pub size_diff: isize,
62    /// Change in capacity.
63    pub capacity_diff: isize,
64    /// Change in number of allocations.
65    pub allocations_diff: isize,
66}
67
68impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }
69
70/// Either the start or end of a merge event.
71#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
72pub struct DropEvent {
73    /// Operator identifier.
74    pub operator: usize,
75    /// Which order of magnitude.
76    pub length: usize,
77}
78
79impl From<DropEvent> for DifferentialEvent { fn from(e: DropEvent) -> Self { DifferentialEvent::Drop(e) } }
80
81/// Either the start or end of a merge event.
82#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
83pub struct MergeEvent {
84    /// Operator identifier.
85    pub operator: usize,
86    /// Which order of magnitude.
87    pub scale: usize,
88    /// Length of first trace.
89    pub length1: usize,
90    /// Length of second trace.
91    pub length2: usize,
92    /// None implies a start.
93    pub complete: Option<usize>,
94}
95
96impl From<MergeEvent> for DifferentialEvent { fn from(e: MergeEvent) -> Self { DifferentialEvent::Merge(e) } }
97
98/// A merge failed to complete in time.
99#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
100pub struct MergeShortfall {
101    /// Operator identifier.
102    pub operator: usize,
103    /// Which order of magnitude.
104    pub scale: usize,
105    /// By how much were we short.
106    pub shortfall: usize,
107}
108
109impl From<MergeShortfall> for DifferentialEvent { fn from(e: MergeShortfall) -> Self { DifferentialEvent::MergeShortfall(e) } }
110
111/// Either the start or end of a merge event.
112#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
113pub struct TraceShare {
114    /// Operator identifier.
115    pub operator: usize,
116    /// Change in number of shares.
117    pub diff: isize,
118}
119
120impl From<TraceShare> for DifferentialEvent { fn from(e: TraceShare) -> Self { DifferentialEvent::TraceShare(e) } }