1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219
//! Data structure for a `LogRecord`, `Pair` two-dimensional time type. //! A `LogRecord` constitutes the unified `struct` representation of //! log messages from various stream processors. //! It is the underlying structure from which the PAG construction starts. #![deny(missing_docs)] #[macro_use] extern crate abomonation_derive; use std::cmp::Ordering; /// The various types of activity that can happen in a dataflow. /// `Unknown` et al. shouldn't be emitted by instrumentation. Instead, /// they might be inserted as helpers during PAG construction. #[derive(Abomonation, PartialEq, Debug, Clone, Copy, Hash, Eq, PartialOrd, Ord)] pub enum ActivityType { /// Operator scheduled. Used as temporary state for `LogRecord`s /// where it's still unclear whether they were only spinning or /// also did some work Scheduling = 0, /// Operator actually doing work Processing = 2, /// Operator scheduled, but not doing any work Spinning = 1, /// Data serialization Serialization = 3, /// Data deserialization Deserialization = 4, /// remote control messages, e.g. about progress ControlMessage = 5, /// remote data messages, e.g. moving tuples around DataMessage = 6, /// Waiting for unblocking. /// In particular, operator might wait for external input. /// (not emitted by profiling) Waiting = 8, /// Waiting where next activity is actively prepared, /// e.g. in-between a ScheduleEnd and consecutive ScheduleStart. /// In particular, operator doesn't depend on external input. /// (not emitted by profiling) Busy = 9, } /// What "side" of the event did we log? E.g., for /// scheduling events, it might be the start or end of the event; /// for messages, we might log the sender or receiver. #[derive(Abomonation, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Clone, Copy)] pub enum EventType { /// Start of an event (e.g. ScheduleStart, Sending a Message) Start = 1, /// End of an event (e.g. ScheduleEnd, Receiving a Message) End = 2, /// Sender end of an event (e.g. a data message) Sent = 3, /// Receiver end of an event (e.g. a data message) Received = 4, } /// A worker ID pub type Worker = u64; /// An event timestamp pub type Timestamp = std::time::Duration; /// Type used as identifiers for (mapping between) two event sides pub type CorrelatorId = u64; /// A worker-local operator ID pub type OperatorId = u64; /// A worker-local channel ID pub type ChannelId = u64; /// A `LogRecord` constitutes the unified `struct` representation of /// log messages from various stream processors. /// /// It is the underlying structure from which the PAG construction starts. /// If necessary, it can also be serialized e.g. into a `msgpack` representation. #[derive(Abomonation, PartialEq, Eq, Hash, Clone, Debug)] pub struct LogRecord { /// worker-unique identifier of a message, given in order the events are logged /// in the computation. pub seq_no: u64, /// epoch of the computation this record belongs to pub epoch: u64, /// Event time in nanoseconds since the Epoch (midnight, January 1, 1970 UTC). pub timestamp: Timestamp, /// Context this event occured in; denotes which of the parallel timelines it belongs to. pub local_worker: Worker, /// Describes the instrumentation point which triggered this event. pub activity_type: ActivityType, /// Identifies which end of an edge this program event belongs to. pub event_type: EventType, /// Similar to `local_worker` but specifies the worker ID for the other end of a sent/received message. pub remote_worker: Option<Worker>, /// Unique id for the operator in the dataflow. This only applies for some event types, e.g. scheduling or processing. pub operator_id: Option<OperatorId>, /// Unique id for the channel in the dataflow. This only applies for some event types, e.g. data / control messages. pub channel_id: Option<ChannelId>, /// correlates remote events belonging together pub correlator_id: Option<u64>, /// Number of records to detect skew pub length: Option<usize>, } impl Ord for LogRecord { fn cmp(&self, other: &LogRecord) -> Ordering { self.timestamp.cmp(&other.timestamp) } } impl PartialOrd for LogRecord { fn partial_cmp(&self, other: &LogRecord) -> Option<Ordering> { Some(self.cmp(other)) } } /// This module contains a definition of a new timestamp time, a "pair" or product. /// /// Note: Its partial order trait is modified so that it follows a lexicographical order; /// It is not truly partially ordered (cf. the `compare_pairs` test)! pub mod pair { use differential_dataflow::lattice::Lattice; use timely::{ order::{PartialOrder, TotalOrder}, progress::{timestamp::Refines, PathSummary, Timestamp} }; use std::fmt::{Formatter, Error, Debug}; /// A pair of timestamps, partially ordered by the product order. #[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Abomonation)] pub struct Pair<S, T> { /// first part of timestamp pub first: S, /// second part of timestampe pub second: T, } impl<S, T> Pair<S, T> { /// Create a new pair. pub fn new(first: S, second: T) -> Self { Pair { first, second } } } /// Implement timely dataflow's `PartialOrder` trait. /// Note: This is in fact a total order implementation! impl<S: PartialOrder, T: PartialOrder> PartialOrder for Pair<S, T> { fn less_equal(&self, other: &Self) -> bool { self.first.less_than(&other.first) || self.first.less_equal(&other.first) && self.second.less_equal(&other.second) } } impl<S: TotalOrder, T: TotalOrder> TotalOrder for Pair<S, T> {} #[test] fn compare_pairs() { assert!(Pair::new(0, 0).less_equal(&Pair::new(0,0))); assert!(Pair::new(0, 0).less_equal(&Pair::new(0,1))); assert!(Pair::new(0, 0).less_equal(&Pair::new(1,0))); assert!(Pair::new(0, 1).less_equal(&Pair::new(1,0))); assert!(Pair::new(1, 0).less_equal(&Pair::new(1,0))); assert!(Pair::new(1, 0).less_equal(&Pair::new(1,1))); assert!(!Pair::new(1, 0).less_equal(&Pair::new(0,1))); assert!(!Pair::new(1, 1).less_equal(&Pair::new(0,1000))); assert!(Pair::new(0, 1000).less_equal(&Pair::new(1, 1))); } impl<S: Timestamp, T: Timestamp> Refines<()> for Pair<S, T> { fn to_inner(_outer: ()) -> Self { Default::default() } fn to_outer(self) -> () { () } fn summarize(_summary: <Self>::Summary) -> () { () } } /// Implement timely dataflow's `PathSummary` trait. /// This is preparation for the `Timestamp` implementation below. impl<S: Timestamp, T: Timestamp> PathSummary<Pair<S,T>> for () { fn results_in(&self, timestamp: &Pair<S, T>) -> Option<Pair<S,T>> { Some(timestamp.clone()) } fn followed_by(&self, other: &Self) -> Option<Self> { Some(other.clone()) } } /// Implement timely dataflow's `Timestamp` trait. impl<S: Timestamp, T: Timestamp> Timestamp for Pair<S, T> { type Summary = (); } /// Implement differential dataflow's `Lattice` trait. /// This extends the `PartialOrder` implementation with additional structure. impl<S: Lattice, T: Lattice> Lattice for Pair<S, T> { fn minimum() -> Self { Pair { first: S::minimum(), second: T::minimum() }} fn join(&self, other: &Self) -> Self { Pair { first: self.first.join(&other.first), second: self.second.join(&other.second), } } fn meet(&self, other: &Self) -> Self { Pair { first: self.first.meet(&other.first), second: self.second.meet(&other.second), } } } /// Debug implementation to avoid seeing fully qualified path names. impl<TOuter: Debug, TInner: Debug> Debug for Pair<TOuter, TInner> { fn fmt(&self, f: &mut Formatter) -> Result<(), Error> { f.write_str(&format!("({:?}, {:?})", self.first, self.second)) } } }