1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/// Logger for differential events: timely's generic `Logger` specialized to
/// carry [`DifferentialEvent`] values.
pub type Logger = ::timely::logging::Logger<DifferentialEvent>;
/// Enables logging of [`DifferentialEvent`]s on `worker`, sending them to `writer`.
///
/// `writer` is wrapped in a timely `EventWriter`, which in turn backs a
/// `BatchLogger`. A callback that forwards every `(time, data)` batch to that
/// logger is then inserted into the worker's log registry under the name
/// `"differential/arrange"`.
///
/// The return value is whatever the registry's `insert` returns.
/// NOTE(review): presumably this is the logger previously registered under the
/// same name, if any — confirm against the timely `Registry::insert` docs.
pub fn enable<A, W>(worker: &mut timely::worker::Worker<A>, writer: W) -> Option<Box<dyn std::any::Any+'static>>
where
    A: timely::communication::Allocate,
    W: std::io::Write+'static,
{
    // Build the sink bottom-up: raw writer -> event writer -> batching logger.
    let event_sink = ::timely::dataflow::operators::capture::EventWriter::new(writer);
    let mut batch_logger = ::timely::logging::BatchLogger::new(event_sink);

    // The closure takes ownership of the logger (`move`), so the logger lives
    // for as long as the registry keeps the callback around.
    worker.log_register().insert::<DifferentialEvent, _>(
        "differential/arrange",
        move |batch_time, batch_data| {
            batch_logger.publish_batch(batch_time, batch_data)
        },
    )
}
/// The set of events that can be logged through the differential [`Logger`].
///
/// Each variant wraps a dedicated payload struct. The variant order matters:
/// it determines the derived `Ord`/`PartialOrd` ordering, so do not reorder
/// variants casually.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
pub enum DifferentialEvent {
    /// Carries a [`BatchEvent`].
    /// NOTE(review): presumably logged when a batch is created — confirm at the emit site.
    Batch(BatchEvent),
    /// Carries a [`MergeEvent`] (its `complete` field is an `Option`).
    /// NOTE(review): presumably covers both merge start and merge completion — confirm.
    Merge(MergeEvent),
    /// Carries a [`DropEvent`].
    /// NOTE(review): presumably logged when a batch is dropped — confirm at the emit site.
    Drop(DropEvent),
    /// Carries a [`MergeShortfall`].
    /// NOTE(review): presumably logged when a merge did not finish in its allotted effort — confirm.
    MergeShortfall(MergeShortfall),
    /// Carries a [`TraceShare`] (its `diff` field is signed).
    /// NOTE(review): presumably a change in how many handles share a trace — confirm.
    TraceShare(TraceShare),
}
/// Payload of [`DifferentialEvent::Batch`].
///
/// Field order determines the derived `Ord`/`PartialOrd` comparison
/// (`operator` first, then `length`).
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
pub struct BatchEvent {
    /// NOTE(review): presumably the identifier of the operator the batch belongs to — confirm.
    pub operator: usize,
    /// NOTE(review): presumably the number of updates in the batch — confirm at the emit site.
    pub length: usize,
}
/// Lets a [`BatchEvent`] be used wherever a [`DifferentialEvent`] is expected.
impl From<BatchEvent> for DifferentialEvent {
    /// Wraps `event` in the `Batch` variant.
    fn from(event: BatchEvent) -> Self {
        DifferentialEvent::Batch(event)
    }
}
/// Payload of [`DifferentialEvent::Drop`].
///
/// Field order determines the derived `Ord`/`PartialOrd` comparison
/// (`operator` first, then `length`).
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
pub struct DropEvent {
    /// NOTE(review): presumably the identifier of the operator that owned the batch — confirm.
    pub operator: usize,
    /// NOTE(review): presumably the number of updates in the dropped batch — confirm at the emit site.
    pub length: usize,
}
/// Lets a [`DropEvent`] be used wherever a [`DifferentialEvent`] is expected.
impl From<DropEvent> for DifferentialEvent {
    /// Wraps `event` in the `Drop` variant.
    fn from(event: DropEvent) -> Self {
        DifferentialEvent::Drop(event)
    }
}
/// Payload of [`DifferentialEvent::Merge`].
///
/// Field order determines the derived `Ord`/`PartialOrd` comparison, so the
/// fields are compared in declaration order.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
pub struct MergeEvent {
    /// NOTE(review): presumably the identifier of the operator performing the merge — confirm.
    pub operator: usize,
    /// NOTE(review): presumably the size class (order of magnitude) of the merge — confirm.
    pub scale: usize,
    /// NOTE(review): presumably the length of the first merge input — confirm.
    pub length1: usize,
    /// NOTE(review): presumably the length of the second merge input — confirm.
    pub length2: usize,
    /// Optional completion value.
    /// NOTE(review): presumably `None` marks the start of a merge and `Some(len)`
    /// its completion with the resulting length — confirm at the emit site.
    pub complete: Option<usize>,
}
/// Lets a [`MergeEvent`] be used wherever a [`DifferentialEvent`] is expected.
impl From<MergeEvent> for DifferentialEvent {
    /// Wraps `event` in the `Merge` variant.
    fn from(event: MergeEvent) -> Self {
        DifferentialEvent::Merge(event)
    }
}
/// Payload of [`DifferentialEvent::MergeShortfall`].
///
/// Field order determines the derived `Ord`/`PartialOrd` comparison, so the
/// fields are compared in declaration order.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
pub struct MergeShortfall {
    /// NOTE(review): presumably the identifier of the operator performing the merge — confirm.
    pub operator: usize,
    /// NOTE(review): presumably the size class (order of magnitude) of the merge — confirm.
    pub scale: usize,
    /// NOTE(review): presumably the amount of work by which the merge fell short — confirm.
    pub shortfall: usize,
}
/// Lets a [`MergeShortfall`] be used wherever a [`DifferentialEvent`] is expected.
impl From<MergeShortfall> for DifferentialEvent {
    /// Wraps `event` in the `MergeShortfall` variant.
    fn from(event: MergeShortfall) -> Self {
        DifferentialEvent::MergeShortfall(event)
    }
}
/// Payload of [`DifferentialEvent::TraceShare`].
///
/// Field order determines the derived `Ord`/`PartialOrd` comparison
/// (`operator` first, then `diff`).
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
pub struct TraceShare {
    /// NOTE(review): presumably the identifier of the operator whose trace is shared — confirm.
    pub operator: usize,
    /// Signed change value.
    /// NOTE(review): presumably the change in the number of shares of the trace
    /// (positive when a share is added, negative when one is released) — confirm.
    pub diff: isize,
}
/// Lets a [`TraceShare`] be used wherever a [`DifferentialEvent`] is expected.
impl From<TraceShare> for DifferentialEvent {
    /// Wraps `event` in the `TraceShare` variant.
    fn from(event: TraceShare) -> Self {
        DifferentialEvent::TraceShare(event)
    }
}