differential_dataflow/
logging.rs1use abomonation_derive::Abomonation;
4
5pub type Logger = ::timely::logging::Logger<DifferentialEvent>;
7
8pub fn enable<A, W>(worker: &mut timely::worker::Worker<A>, writer: W) -> Option<Box<dyn std::any::Any+'static>>
10where
11 A: timely::communication::Allocate,
12 W: std::io::Write+'static,
13{
14 let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
15 let mut logger = ::timely::logging::BatchLogger::new(writer);
16 worker
17 .log_register()
18 .insert::<DifferentialEvent,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
19}
20
21#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
23pub enum DifferentialEvent {
24 Batch(BatchEvent),
26 Merge(MergeEvent),
28 Drop(DropEvent),
30 MergeShortfall(MergeShortfall),
32 TraceShare(TraceShare),
34 Batcher(BatcherEvent),
36}
37
38#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
40pub struct BatchEvent {
41 pub operator: usize,
43 pub length: usize,
45}
46
47impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { DifferentialEvent::Batch(e) } }
48
49
50#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
52pub struct BatcherEvent {
53 pub operator: usize,
55 pub records_diff: isize,
57 pub size_diff: isize,
59 pub capacity_diff: isize,
61 pub allocations_diff: isize,
63}
64
65impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }
66
67#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
69pub struct DropEvent {
70 pub operator: usize,
72 pub length: usize,
74}
75
76impl From<DropEvent> for DifferentialEvent { fn from(e: DropEvent) -> Self { DifferentialEvent::Drop(e) } }
77
78#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
80pub struct MergeEvent {
81 pub operator: usize,
83 pub scale: usize,
85 pub length1: usize,
87 pub length2: usize,
89 pub complete: Option<usize>,
91}
92
93impl From<MergeEvent> for DifferentialEvent { fn from(e: MergeEvent) -> Self { DifferentialEvent::Merge(e) } }
94
95#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
97pub struct MergeShortfall {
98 pub operator: usize,
100 pub scale: usize,
102 pub shortfall: usize,
104}
105
106impl From<MergeShortfall> for DifferentialEvent { fn from(e: MergeShortfall) -> Self { DifferentialEvent::MergeShortfall(e) } }
107
108#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
110pub struct TraceShare {
111 pub operator: usize,
113 pub diff: isize,
115}
116
117impl From<TraceShare> for DifferentialEvent { fn from(e: TraceShare) -> Self { DifferentialEvent::TraceShare(e) } }