Skip to main content

palimpsest_dataflow/
logging.rs

1//! Loggers and logging events for differential dataflow.
2
3use columnar::Columnar;
4use serde::{Deserialize, Serialize};
5
/// Container builder for differential log events.
///
/// Accumulates `(Duration, DifferentialEvent)` pairs into `Vec` containers via
/// timely's `CapacityContainerBuilder`. The `Duration` is presumably the time
/// at which the event was logged — confirm against the logger that feeds it.
pub type DifferentialEventBuilder =
    timely::container::CapacityContainerBuilder<Vec<(std::time::Duration, DifferentialEvent)>>;
9
/// Logger for differential dataflow events.
///
/// A timely `TypedLogger` parameterized by [`DifferentialEventBuilder`] as its
/// container builder and [`DifferentialEvent`] as its event type.
pub type Logger = ::timely::logging_core::TypedLogger<DifferentialEventBuilder, DifferentialEvent>;
12
13/// Enables logging of differential dataflow events.
14pub fn enable<A, W>(
15    worker: &mut timely::worker::Worker<A>,
16    writer: W,
17) -> Option<Box<dyn std::any::Any + 'static>>
18where
19    A: timely::communication::Allocate,
20    W: std::io::Write + 'static,
21{
22    worker.log_register().and_then(|mut log_register| {
23        let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
24        let mut logger = ::timely::logging::BatchLogger::new(writer);
25        log_register
26            .insert::<DifferentialEventBuilder, _>("differential/arrange", move |time, data| {
27                logger.publish_batch(time, data)
28            })
29    })
30}
31
/// The kinds of events that differential dataflow logs.
///
/// Each variant wraps a dedicated payload struct, and each payload type has a
/// `From` impl into this enum, so a payload can be converted with `.into()`.
///
/// Variant order is significant: the derived `Ord`/`PartialOrd` compare
/// variants by declaration order.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub enum DifferentialEvent {
    /// Batch creation.
    Batch(BatchEvent),
    /// Merge start and stop events.
    Merge(MergeEvent),
    /// Batch dropped when trace dropped.
    Drop(DropEvent),
    /// A merge failed to complete in time.
    MergeShortfall(MergeShortfall),
    /// Trace sharing event.
    TraceShare(TraceShare),
    /// Batcher size event.
    Batcher(BatcherEvent),
}
48
/// Records the creation of a batch (payload of [`DifferentialEvent::Batch`]).
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct BatchEvent {
    /// Operator identifier.
    pub operator: usize,
    /// Length of the batch.
    ///
    /// NOTE(review): this was previously documented as "Which order of
    /// magnitude", which looks copied from `MergeEvent::scale`; confirm what
    /// the emitting code actually reports here.
    pub length: usize,
}
57
58impl From<BatchEvent> for DifferentialEvent {
59    fn from(e: BatchEvent) -> Self {
60        DifferentialEvent::Batch(e)
61    }
62}
63
/// Reports a change in a batcher's size (payload of [`DifferentialEvent::Batcher`]).
///
/// All `*_diff` fields are signed deltas, so a single event can describe
/// either growth or shrinkage.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct BatcherEvent {
    /// Operator identifier.
    pub operator: usize,
    /// Change in records.
    pub records_diff: isize,
    /// Change in used size.
    pub size_diff: isize,
    /// Change in capacity.
    pub capacity_diff: isize,
    /// Change in number of allocations.
    pub allocations_diff: isize,
}
78
79impl From<BatcherEvent> for DifferentialEvent {
80    fn from(e: BatcherEvent) -> Self {
81        DifferentialEvent::Batcher(e)
82    }
83}
84
/// Records a batch being dropped when its trace is dropped (payload of
/// [`DifferentialEvent::Drop`]).
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct DropEvent {
    /// Operator identifier.
    pub operator: usize,
    /// Length of the dropped batch.
    ///
    /// NOTE(review): this was previously documented as "Which order of
    /// magnitude", which looks copied from `MergeEvent::scale`; confirm what
    /// the emitting code actually reports here.
    pub length: usize,
}
93
94impl From<DropEvent> for DifferentialEvent {
95    fn from(e: DropEvent) -> Self {
96        DifferentialEvent::Drop(e)
97    }
98}
99
/// Either the start or end of a merge event (payload of
/// [`DifferentialEvent::Merge`]).
///
/// The same struct is used for both ends of a merge; `complete` tells them apart.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct MergeEvent {
    /// Operator identifier.
    pub operator: usize,
    /// Which order of magnitude.
    pub scale: usize,
    /// Length of first trace.
    pub length1: usize,
    /// Length of second trace.
    pub length2: usize,
    /// `None` implies a start; `Some(_)` marks the end of the merge.
    ///
    /// NOTE(review): the `usize` is presumably the length of the merged
    /// result — confirm against the code that emits this event.
    pub complete: Option<usize>,
}
114
115impl From<MergeEvent> for DifferentialEvent {
116    fn from(e: MergeEvent) -> Self {
117        DifferentialEvent::Merge(e)
118    }
119}
120
/// A merge failed to complete in time (payload of
/// [`DifferentialEvent::MergeShortfall`]).
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct MergeShortfall {
    /// Operator identifier.
    pub operator: usize,
    /// Which order of magnitude.
    pub scale: usize,
    /// By how much were we short.
    pub shortfall: usize,
}
131
132impl From<MergeShortfall> for DifferentialEvent {
133    fn from(e: MergeShortfall) -> Self {
134        DifferentialEvent::MergeShortfall(e)
135    }
136}
137
/// Trace sharing event (payload of [`DifferentialEvent::TraceShare`]).
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct TraceShare {
    /// Operator identifier.
    pub operator: usize,
    /// Change in number of shares (signed, so it can report gains and losses).
    pub diff: isize,
}
146
147impl From<TraceShare> for DifferentialEvent {
148    fn from(e: TraceShare) -> Self {
149        DifferentialEvent::TraceShare(e)
150    }
151}