st2_logformat/
lib.rs

//! Data structures for log records (`LogRecord`) and the two-dimensional
//! `Pair` timestamp type.
//! A `LogRecord` is the unified `struct` representation of
//! log messages from various stream processors.
//! It is the underlying structure from which PAG construction starts.
5
6#![deny(missing_docs)]
7
8#[macro_use]
9extern crate abomonation_derive;
10
11use std::cmp::Ordering;
12
13/// The various types of activity that can happen in a dataflow.
14/// `Unknown` et al. shouldn't be emitted by instrumentation. Instead,
15/// they might be inserted as helpers during PAG construction.
16#[derive(Abomonation, PartialEq, Debug, Clone, Copy, Hash, Eq, PartialOrd, Ord)]
17pub enum ActivityType {
18    /// Operator scheduled. Used as temporary state for `LogRecord`s
19    /// where it's still unclear whether they were only spinning or
20    /// also did some work
21    Scheduling = 0,
22    /// Operator actually doing work
23    Processing = 2,
24    /// Operator scheduled, but not doing any work
25    Spinning = 1,
26    /// Data serialization
27    Serialization = 3,
28    /// Data deserialization
29    Deserialization = 4,
30    /// remote control messages, e.g. about progress
31    ControlMessage = 5,
32    /// remote data messages, e.g. moving tuples around
33    DataMessage = 6,
34    /// Waiting for unblocking.
35    /// In particular, operator might wait for external input.
36    /// (not emitted by profiling)
37    Waiting = 8,
38    /// Waiting where next activity is actively prepared,
39    /// e.g. in-between a ScheduleEnd and consecutive ScheduleStart.
40    /// In particular, operator doesn't depend on external input.
41    /// (not emitted by profiling)
42    Busy = 9,
43}
44
45/// What "side" of the event did we log? E.g., for
46/// scheduling events, it might be the start or end of the event;
47/// for messages, we might log the sender or receiver.
48#[derive(Abomonation, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone, Copy)]
49pub enum EventType {
50    /// Start of an event (e.g. ScheduleStart, Sending a Message)
51    Start = 1,
52    /// End of an event (e.g. ScheduleEnd, Receiving a Message)
53    End = 2,
54    /// Sender end of an event (e.g. a data message)
55    Sent = 3,
56    /// Receiver end of an event (e.g. a data message)
57    Received = 4,
58}
59
/// Identifier of a worker.
pub type Worker = u64;
/// Point in time at which an event was recorded.
pub type Timestamp = std::time::Duration;
/// Identifier used to map the two sides of an event onto each other.
pub type CorrelatorId = u64;
/// Operator identifier, local to a worker.
pub type OperatorId = u64;
/// Channel identifier, local to a worker.
pub type ChannelId = u64;
70
71
72/// A `LogRecord` constitutes the unified `struct` representation of
73/// log messages from various stream processors.
74///
75/// It is the underlying structure from which the PAG construction starts.
76/// If necessary, it can also be serialized e.g. into a `msgpack` representation.
77#[derive(Abomonation, PartialEq, Eq, Hash, Clone, Debug)]
78pub struct LogRecord {
79    /// worker-unique identifier of a message, given in order the events are logged
80    /// in the computation.
81    pub seq_no: u64,
82    /// epoch of the computation this record belongs to
83    pub epoch: u64,
84    /// Event time in nanoseconds since the Epoch (midnight, January 1, 1970 UTC).
85    pub timestamp: Timestamp,
86    /// Context this event occured in; denotes which of the parallel timelines it belongs to.
87    pub local_worker: Worker,
88    /// Describes the instrumentation point which triggered this event.
89    pub activity_type: ActivityType,
90    /// Identifies which end of an edge this program event belongs to.
91    pub event_type: EventType,
92    /// Similar to `local_worker` but specifies the worker ID for the other end of a sent/received message.
93    pub remote_worker: Option<Worker>,
94    /// Unique id for the operator in the dataflow. This only applies for some event types, e.g. scheduling or processing.
95    pub operator_id: Option<OperatorId>,
96    /// Unique id for the channel in the dataflow. This only applies for some event types, e.g. data / control messages.
97    pub channel_id: Option<ChannelId>,
98    /// correlates remote events belonging together
99    pub correlator_id: Option<u64>,
100    /// Number of records to detect skew
101    pub length: Option<usize>,
102}
103
104impl Ord for LogRecord {
105    fn cmp(&self, other: &LogRecord) -> Ordering {
106        self.timestamp.cmp(&other.timestamp)
107    }
108}
109
110impl PartialOrd for LogRecord {
111    fn partial_cmp(&self, other: &LogRecord) -> Option<Ordering> {
112        Some(self.cmp(other))
113    }
114}
115
116
117/// This module contains a definition of a new timestamp time, a "pair" or product.
118///
119/// Note: Its partial order trait is modified so that it follows a lexicographical order;
120/// It is not truly partially ordered (cf. the `compare_pairs` test)!
121pub mod pair {
122    use differential_dataflow::lattice::Lattice;
123
124    use timely::{
125        order::{PartialOrder, TotalOrder},
126        progress::{timestamp::Refines, PathSummary, Timestamp}
127    };
128
129    use std::fmt::{Formatter, Error, Debug};
130
131    /// A pair of timestamps, partially ordered by the product order.
132    #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation)]
133    pub struct Pair<S, T> {
134        /// first part of timestamp
135        pub first: S,
136        /// second part of timestampe
137        pub second: T,
138    }
139
140    impl<S, T> Pair<S, T> {
141        /// Create a new pair.
142        pub fn new(first: S, second: T) -> Self {
143            Pair { first, second }
144        }
145    }
146
147    /// Implement timely dataflow's `PartialOrder` trait.
148    /// Note: This is in fact a total order implementation!
149    impl<S: PartialOrder, T: PartialOrder> PartialOrder for Pair<S, T> {
150        fn less_equal(&self, other: &Self) -> bool {
151            self.first.less_than(&other.first) ||
152               self.first.less_equal(&other.first) && self.second.less_equal(&other.second)
153        }
154    }
155
156    impl<S: TotalOrder, T: TotalOrder> TotalOrder for Pair<S, T> {}
157
158    #[test]
159    fn compare_pairs() {
160        assert!(Pair::new(0, 0).less_equal(&Pair::new(0,0)));
161        assert!(Pair::new(0, 0).less_equal(&Pair::new(0,1)));
162        assert!(Pair::new(0, 0).less_equal(&Pair::new(1,0)));
163        assert!(Pair::new(0, 1).less_equal(&Pair::new(1,0)));
164        assert!(Pair::new(1, 0).less_equal(&Pair::new(1,0)));
165        assert!(Pair::new(1, 0).less_equal(&Pair::new(1,1)));
166        assert!(!Pair::new(1, 0).less_equal(&Pair::new(0,1)));
167
168        assert!(!Pair::new(1, 1).less_equal(&Pair::new(0,1000)));
169        assert!(Pair::new(0, 1000).less_equal(&Pair::new(1, 1)));
170    }
171
172    impl<S: Timestamp, T: Timestamp> Refines<()> for Pair<S, T> {
173        fn to_inner(_outer: ()) -> Self { Default::default() }
174        fn to_outer(self) -> () { () }
175        fn summarize(_summary: <Self>::Summary) -> () { () }
176    }
177
178    /// Implement timely dataflow's `PathSummary` trait.
179    /// This is preparation for the `Timestamp` implementation below.
180    impl<S: Timestamp, T: Timestamp> PathSummary<Pair<S,T>> for () {
181        fn results_in(&self, timestamp: &Pair<S, T>) -> Option<Pair<S,T>> {
182            Some(timestamp.clone())
183        }
184        fn followed_by(&self, other: &Self) -> Option<Self> {
185            Some(other.clone())
186        }
187    }
188
189    /// Implement timely dataflow's `Timestamp` trait.
190    impl<S: Timestamp, T: Timestamp> Timestamp for Pair<S, T> {
191        type Summary = ();
192    }
193
194    /// Implement differential dataflow's `Lattice` trait.
195    /// This extends the `PartialOrder` implementation with additional structure.
196    impl<S: Lattice, T: Lattice> Lattice for Pair<S, T> {
197        fn minimum() -> Self { Pair { first: S::minimum(), second: T::minimum() }}
198        fn join(&self, other: &Self) -> Self {
199            Pair {
200                first: self.first.join(&other.first),
201                second: self.second.join(&other.second),
202            }
203        }
204        fn meet(&self, other: &Self) -> Self {
205            Pair {
206                first: self.first.meet(&other.first),
207                second: self.second.meet(&other.second),
208            }
209        }
210    }
211
212
213    /// Debug implementation to avoid seeing fully qualified path names.
214    impl<TOuter: Debug, TInner: Debug> Debug for Pair<TOuter, TInner> {
215        fn fmt(&self, f: &mut Formatter) -> Result<(), Error> {
216            f.write_str(&format!("({:?}, {:?})", self.first, self.second))
217        }
218    }
219}