st2_logformat/lib.rs
//! Data structures for the `LogRecord` log format and the `Pair` two-dimensional time type.
//!
//! A `LogRecord` constitutes the unified `struct` representation of
//! log messages from various stream processors.
//! It is the underlying structure from which the PAG construction starts.
5
6#![deny(missing_docs)]
7
8#[macro_use]
9extern crate abomonation_derive;
10
11use std::cmp::Ordering;
12
13/// The various types of activity that can happen in a dataflow.
14/// `Unknown` et al. shouldn't be emitted by instrumentation. Instead,
15/// they might be inserted as helpers during PAG construction.
16#[derive(Abomonation, PartialEq, Debug, Clone, Copy, Hash, Eq, PartialOrd, Ord)]
17pub enum ActivityType {
18 /// Operator scheduled. Used as temporary state for `LogRecord`s
19 /// where it's still unclear whether they were only spinning or
20 /// also did some work
21 Scheduling = 0,
22 /// Operator actually doing work
23 Processing = 2,
24 /// Operator scheduled, but not doing any work
25 Spinning = 1,
26 /// Data serialization
27 Serialization = 3,
28 /// Data deserialization
29 Deserialization = 4,
30 /// remote control messages, e.g. about progress
31 ControlMessage = 5,
32 /// remote data messages, e.g. moving tuples around
33 DataMessage = 6,
34 /// Waiting for unblocking.
35 /// In particular, operator might wait for external input.
36 /// (not emitted by profiling)
37 Waiting = 8,
38 /// Waiting where next activity is actively prepared,
39 /// e.g. in-between a ScheduleEnd and consecutive ScheduleStart.
40 /// In particular, operator doesn't depend on external input.
41 /// (not emitted by profiling)
42 Busy = 9,
43}
44
45/// What "side" of the event did we log? E.g., for
46/// scheduling events, it might be the start or end of the event;
47/// for messages, we might log the sender or receiver.
48#[derive(Abomonation, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone, Copy)]
49pub enum EventType {
50 /// Start of an event (e.g. ScheduleStart, Sending a Message)
51 Start = 1,
52 /// End of an event (e.g. ScheduleEnd, Receiving a Message)
53 End = 2,
54 /// Sender end of an event (e.g. a data message)
55 Sent = 3,
56 /// Receiver end of an event (e.g. a data message)
57 Received = 4,
58}
59
/// Identifier of a worker.
pub type Worker = u64;
/// Timestamp of an event, represented as a `std::time::Duration`.
pub type Timestamp = std::time::Duration;
/// Identifier used to map between (correlate) the two sides of an event.
pub type CorrelatorId = u64;
/// Identifier of an operator, local to a worker.
pub type OperatorId = u64;
/// Identifier of a channel, local to a worker.
pub type ChannelId = u64;
70
71
72/// A `LogRecord` constitutes the unified `struct` representation of
73/// log messages from various stream processors.
74///
75/// It is the underlying structure from which the PAG construction starts.
76/// If necessary, it can also be serialized e.g. into a `msgpack` representation.
77#[derive(Abomonation, PartialEq, Eq, Hash, Clone, Debug)]
78pub struct LogRecord {
79 /// worker-unique identifier of a message, given in order the events are logged
80 /// in the computation.
81 pub seq_no: u64,
82 /// epoch of the computation this record belongs to
83 pub epoch: u64,
84 /// Event time in nanoseconds since the Epoch (midnight, January 1, 1970 UTC).
85 pub timestamp: Timestamp,
86 /// Context this event occured in; denotes which of the parallel timelines it belongs to.
87 pub local_worker: Worker,
88 /// Describes the instrumentation point which triggered this event.
89 pub activity_type: ActivityType,
90 /// Identifies which end of an edge this program event belongs to.
91 pub event_type: EventType,
92 /// Similar to `local_worker` but specifies the worker ID for the other end of a sent/received message.
93 pub remote_worker: Option<Worker>,
94 /// Unique id for the operator in the dataflow. This only applies for some event types, e.g. scheduling or processing.
95 pub operator_id: Option<OperatorId>,
96 /// Unique id for the channel in the dataflow. This only applies for some event types, e.g. data / control messages.
97 pub channel_id: Option<ChannelId>,
98 /// correlates remote events belonging together
99 pub correlator_id: Option<u64>,
100 /// Number of records to detect skew
101 pub length: Option<usize>,
102}
103
104impl Ord for LogRecord {
105 fn cmp(&self, other: &LogRecord) -> Ordering {
106 self.timestamp.cmp(&other.timestamp)
107 }
108}
109
110impl PartialOrd for LogRecord {
111 fn partial_cmp(&self, other: &LogRecord) -> Option<Ordering> {
112 Some(self.cmp(other))
113 }
114}
115
116
/// This module contains the definition of a new timestamp type, a "pair" or product.
///
/// Note: Its partial order trait is modified so that it follows a lexicographical order;
/// it is not truly partially ordered (cf. the `compare_pairs` test)!
121pub mod pair {
122 use differential_dataflow::lattice::Lattice;
123
124 use timely::{
125 order::{PartialOrder, TotalOrder},
126 progress::{timestamp::Refines, PathSummary, Timestamp}
127 };
128
129 use std::fmt::{Formatter, Error, Debug};
130
131 /// A pair of timestamps, partially ordered by the product order.
132 #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation)]
133 pub struct Pair<S, T> {
134 /// first part of timestamp
135 pub first: S,
136 /// second part of timestampe
137 pub second: T,
138 }
139
140 impl<S, T> Pair<S, T> {
141 /// Create a new pair.
142 pub fn new(first: S, second: T) -> Self {
143 Pair { first, second }
144 }
145 }
146
147 /// Implement timely dataflow's `PartialOrder` trait.
148 /// Note: This is in fact a total order implementation!
149 impl<S: PartialOrder, T: PartialOrder> PartialOrder for Pair<S, T> {
150 fn less_equal(&self, other: &Self) -> bool {
151 self.first.less_than(&other.first) ||
152 self.first.less_equal(&other.first) && self.second.less_equal(&other.second)
153 }
154 }
155
156 impl<S: TotalOrder, T: TotalOrder> TotalOrder for Pair<S, T> {}
157
/// Checks that `less_equal` on `Pair` follows the lexicographic order.
#[test]
fn compare_pairs() {
    // (lhs, rhs, expected result of `lhs.less_equal(&rhs)`)
    let cases = [
        ((0, 0), (0, 0), true),
        ((0, 0), (0, 1), true),
        ((0, 0), (1, 0), true),
        ((0, 1), (1, 0), true),
        ((1, 0), (1, 0), true),
        ((1, 0), (1, 1), true),
        ((1, 0), (0, 1), false),
        ((1, 1), (0, 1000), false),
        ((0, 1000), (1, 1), true),
    ];

    for &(lhs, rhs, expected) in cases.iter() {
        let lhs = Pair::new(lhs.0, lhs.1);
        let rhs = Pair::new(rhs.0, rhs.1);
        assert_eq!(lhs.less_equal(&rhs), expected, "{:?} <= {:?}", lhs, rhs);
    }
}
171
172 impl<S: Timestamp, T: Timestamp> Refines<()> for Pair<S, T> {
173 fn to_inner(_outer: ()) -> Self { Default::default() }
174 fn to_outer(self) -> () { () }
175 fn summarize(_summary: <Self>::Summary) -> () { () }
176 }
177
178 /// Implement timely dataflow's `PathSummary` trait.
179 /// This is preparation for the `Timestamp` implementation below.
180 impl<S: Timestamp, T: Timestamp> PathSummary<Pair<S,T>> for () {
181 fn results_in(&self, timestamp: &Pair<S, T>) -> Option<Pair<S,T>> {
182 Some(timestamp.clone())
183 }
184 fn followed_by(&self, other: &Self) -> Option<Self> {
185 Some(other.clone())
186 }
187 }
188
189 /// Implement timely dataflow's `Timestamp` trait.
190 impl<S: Timestamp, T: Timestamp> Timestamp for Pair<S, T> {
191 type Summary = ();
192 }
193
194 /// Implement differential dataflow's `Lattice` trait.
195 /// This extends the `PartialOrder` implementation with additional structure.
196 impl<S: Lattice, T: Lattice> Lattice for Pair<S, T> {
197 fn minimum() -> Self { Pair { first: S::minimum(), second: T::minimum() }}
198 fn join(&self, other: &Self) -> Self {
199 Pair {
200 first: self.first.join(&other.first),
201 second: self.second.join(&other.second),
202 }
203 }
204 fn meet(&self, other: &Self) -> Self {
205 Pair {
206 first: self.first.meet(&other.first),
207 second: self.second.meet(&other.second),
208 }
209 }
210 }
211
212
213 /// Debug implementation to avoid seeing fully qualified path names.
214 impl<TOuter: Debug, TInner: Debug> Debug for Pair<TOuter, TInner> {
215 fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
216 f.write_str(&format!("({:?}, {:?})", self.first, self.second))
217 }
218 }
219}