1use columnar::Columnar;
4use serde::{Deserialize, Serialize};
5
6pub type DifferentialEventBuilder = timely::container::CapacityContainerBuilder<Vec<(std::time::Duration, DifferentialEvent)>>;
8
9pub type Logger = ::timely::logging_core::TypedLogger<DifferentialEventBuilder, DifferentialEvent>;
11
12pub fn enable<A, W>(worker: &mut timely::worker::Worker<A>, writer: W) -> Option<Box<dyn std::any::Any+'static>>
14where
15    A: timely::communication::Allocate,
16    W: std::io::Write + 'static,
17{
18    worker.log_register().and_then(|mut log_register| {
19        let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
20        let mut logger = ::timely::logging::BatchLogger::new(writer);
21        log_register.insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
22    })
23}
24
25#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
27pub enum DifferentialEvent {
28    Batch(BatchEvent),
30    Merge(MergeEvent),
32    Drop(DropEvent),
34    MergeShortfall(MergeShortfall),
36    TraceShare(TraceShare),
38    Batcher(BatcherEvent),
40}
41
42#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
44pub struct BatchEvent {
45    pub operator: usize,
47    pub length: usize,
49}
50
51impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { DifferentialEvent::Batch(e) } }
52
53
54#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
56pub struct BatcherEvent {
57    pub operator: usize,
59    pub records_diff: isize,
61    pub size_diff: isize,
63    pub capacity_diff: isize,
65    pub allocations_diff: isize,
67}
68
69impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }
70
71#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
73pub struct DropEvent {
74    pub operator: usize,
76    pub length: usize,
78}
79
80impl From<DropEvent> for DifferentialEvent { fn from(e: DropEvent) -> Self { DifferentialEvent::Drop(e) } }
81
82#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
84pub struct MergeEvent {
85    pub operator: usize,
87    pub scale: usize,
89    pub length1: usize,
91    pub length2: usize,
93    pub complete: Option<usize>,
95}
96
97impl From<MergeEvent> for DifferentialEvent { fn from(e: MergeEvent) -> Self { DifferentialEvent::Merge(e) } }
98
99#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
101pub struct MergeShortfall {
102    pub operator: usize,
104    pub scale: usize,
106    pub shortfall: usize,
108}
109
110impl From<MergeShortfall> for DifferentialEvent { fn from(e: MergeShortfall) -> Self { DifferentialEvent::MergeShortfall(e) } }
111
112#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
114pub struct TraceShare {
115    pub operator: usize,
117    pub diff: isize,
119}
120
121impl From<TraceShare> for DifferentialEvent { fn from(e: TraceShare) -> Self { DifferentialEvent::TraceShare(e) } }