1use columnar::Columnar;
4use serde::{Deserialize, Serialize};
5
/// Container builder used for differential logging: a timely `CapacityContainerBuilder`
/// over a `Vec` of `(std::time::Duration, DifferentialEvent)` pairs.
pub type DifferentialEventBuilder = timely::container::CapacityContainerBuilder<Vec<(std::time::Duration, DifferentialEvent)>>;
8
/// Logger for differential events: timely's `TypedLogger` specialised to
/// [`DifferentialEventBuilder`] as its container builder and [`DifferentialEvent`] as its event type.
pub type Logger = ::timely::logging_core::TypedLogger<DifferentialEventBuilder, DifferentialEvent>;
11
12pub fn enable<W>(worker: &mut timely::worker::Worker, writer: W) -> Option<Box<dyn std::any::Any+'static>>
14where
15 W: std::io::Write + 'static,
16{
17 worker.log_register().and_then(|mut log_register| {
18 let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
19 let mut logger = ::timely::logging::BatchLogger::new(writer);
20 log_register.insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
21 })
22}
23
/// The set of events that can be logged for differential dataflow.
///
/// Each variant wraps a dedicated payload struct, and each payload struct converts
/// into this enum through a `From` implementation.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub enum DifferentialEvent {
    /// A batch-related event; see [`BatchEvent`].
    Batch(BatchEvent),
    /// A merge-related event; see [`MergeEvent`].
    Merge(MergeEvent),
    /// A drop-related event; see [`DropEvent`].
    Drop(DropEvent),
    /// A merge shortfall report; see [`MergeShortfall`].
    MergeShortfall(MergeShortfall),
    /// A trace sharing event; see [`TraceShare`].
    TraceShare(TraceShare),
    /// A batcher-related event; see [`BatcherEvent`].
    Batcher(BatcherEvent),
}
40
/// Payload of [`DifferentialEvent::Batch`].
///
/// NOTE(review): the field descriptions below are inferred from the field names only;
/// confirm them against the code that emits this event.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct BatchEvent {
    /// Presumably the identifier of the operator the event belongs to.
    pub operator: usize,
    /// Presumably the length (number of updates) of the batch — TODO confirm.
    pub length: usize,
}
49
50impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { DifferentialEvent::Batch(e) } }
51
52
/// Payload of [`DifferentialEvent::Batcher`].
///
/// All `*_diff` fields are signed (`isize`), so they can express increases and decreases.
///
/// NOTE(review): the field descriptions below are inferred from the field names only;
/// confirm them against the code that emits this event.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct BatcherEvent {
    /// Presumably the identifier of the operator the event belongs to.
    pub operator: usize,
    /// Presumably the change in the number of records held by the batcher.
    pub records_diff: isize,
    /// Presumably the change in used size — units not visible here, TODO confirm.
    pub size_diff: isize,
    /// Presumably the change in allocated capacity — units not visible here, TODO confirm.
    pub capacity_diff: isize,
    /// Presumably the change in the number of allocations.
    pub allocations_diff: isize,
}
67
68impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }
69
/// Payload of [`DifferentialEvent::Drop`].
///
/// NOTE(review): the field descriptions below are inferred from the field names only;
/// confirm them against the code that emits this event.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct DropEvent {
    /// Presumably the identifier of the operator the event belongs to.
    pub operator: usize,
    /// Presumably the length of the batch being dropped — TODO confirm.
    pub length: usize,
}
78
79impl From<DropEvent> for DifferentialEvent { fn from(e: DropEvent) -> Self { DifferentialEvent::Drop(e) } }
80
/// Payload of [`DifferentialEvent::Merge`].
///
/// NOTE(review): the field descriptions below are inferred from the field names only;
/// confirm them against the code that emits this event.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct MergeEvent {
    /// Presumably the identifier of the operator the event belongs to.
    pub operator: usize,
    /// Presumably the size class (order of magnitude) at which the merge happens — TODO confirm.
    pub scale: usize,
    /// Presumably the length of the first merge input.
    pub length1: usize,
    /// Presumably the length of the second merge input.
    pub length2: usize,
    /// Optional value; presumably `None` marks the start of a merge and `Some(n)` its
    /// completion with a resulting length of `n` — TODO confirm.
    pub complete: Option<usize>,
}
95
96impl From<MergeEvent> for DifferentialEvent { fn from(e: MergeEvent) -> Self { DifferentialEvent::Merge(e) } }
97
/// Payload of [`DifferentialEvent::MergeShortfall`].
///
/// NOTE(review): the field descriptions below are inferred from the field names only;
/// confirm them against the code that emits this event.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct MergeShortfall {
    /// Presumably the identifier of the operator the event belongs to.
    pub operator: usize,
    /// Presumably the size class (order of magnitude) of the affected merge — TODO confirm.
    pub scale: usize,
    /// Presumably the amount of merge work that was still outstanding — TODO confirm.
    pub shortfall: usize,
}
108
109impl From<MergeShortfall> for DifferentialEvent { fn from(e: MergeShortfall) -> Self { DifferentialEvent::MergeShortfall(e) } }
110
/// Payload of [`DifferentialEvent::TraceShare`].
///
/// NOTE(review): the field descriptions below are inferred from the field names only;
/// confirm them against the code that emits this event.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
pub struct TraceShare {
    /// Presumably the identifier of the operator the event belongs to.
    pub operator: usize,
    /// Signed change; presumably in the number of shares of the trace — TODO confirm.
    pub diff: isize,
}
119
120impl From<TraceShare> for DifferentialEvent { fn from(e: TraceShare) -> Self { DifferentialEvent::TraceShare(e) } }