use columnar::Columnar;
use serde::{Deserialize, Serialize};
5
6pub type DifferentialEventBuilder =
8 timely::container::CapacityContainerBuilder<Vec<(std::time::Duration, DifferentialEvent)>>;
9
10pub type Logger = ::timely::logging_core::TypedLogger<DifferentialEventBuilder, DifferentialEvent>;
12
13pub fn enable<A, W>(
15 worker: &mut timely::worker::Worker<A>,
16 writer: W,
17) -> Option<Box<dyn std::any::Any + 'static>>
18where
19 A: timely::communication::Allocate,
20 W: std::io::Write + 'static,
21{
22 worker.log_register().and_then(|mut log_register| {
23 let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
24 let mut logger = ::timely::logging::BatchLogger::new(writer);
25 log_register
26 .insert::<DifferentialEventBuilder, _>("differential/arrange", move |time, data| {
27 logger.publish_batch(time, data)
28 })
29 })
30}
31
32#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
34pub enum DifferentialEvent {
35 Batch(BatchEvent),
37 Merge(MergeEvent),
39 Drop(DropEvent),
41 MergeShortfall(MergeShortfall),
43 TraceShare(TraceShare),
45 Batcher(BatcherEvent),
47}
48
49#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
51pub struct BatchEvent {
52 pub operator: usize,
54 pub length: usize,
56}
57
58impl From<BatchEvent> for DifferentialEvent {
59 fn from(e: BatchEvent) -> Self {
60 DifferentialEvent::Batch(e)
61 }
62}
63
64#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
66pub struct BatcherEvent {
67 pub operator: usize,
69 pub records_diff: isize,
71 pub size_diff: isize,
73 pub capacity_diff: isize,
75 pub allocations_diff: isize,
77}
78
79impl From<BatcherEvent> for DifferentialEvent {
80 fn from(e: BatcherEvent) -> Self {
81 DifferentialEvent::Batcher(e)
82 }
83}
84
85#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
87pub struct DropEvent {
88 pub operator: usize,
90 pub length: usize,
92}
93
94impl From<DropEvent> for DifferentialEvent {
95 fn from(e: DropEvent) -> Self {
96 DifferentialEvent::Drop(e)
97 }
98}
99
100#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
102pub struct MergeEvent {
103 pub operator: usize,
105 pub scale: usize,
107 pub length1: usize,
109 pub length2: usize,
111 pub complete: Option<usize>,
113}
114
115impl From<MergeEvent> for DifferentialEvent {
116 fn from(e: MergeEvent) -> Self {
117 DifferentialEvent::Merge(e)
118 }
119}
120
121#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
123pub struct MergeShortfall {
124 pub operator: usize,
126 pub scale: usize,
128 pub shortfall: usize,
130}
131
132impl From<MergeShortfall> for DifferentialEvent {
133 fn from(e: MergeShortfall) -> Self {
134 DifferentialEvent::MergeShortfall(e)
135 }
136}
137
138#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
140pub struct TraceShare {
141 pub operator: usize,
143 pub diff: isize,
145}
146
147impl From<TraceShare> for DifferentialEvent {
148 fn from(e: TraceShare) -> Self {
149 DifferentialEvent::TraceShare(e)
150 }
151}