timely/
logging.rs

1//! Traits, implementations, and macros related to logging timely events.
2
3/// Type alias for logging timely events.
4pub type WorkerIdentifier = usize;
5/// Logger type for worker-local logging.
6pub type Logger<Event> = crate::logging_core::Logger<Event, WorkerIdentifier>;
7/// Logger for timely dataflow system events.
8pub type TimelyLogger = Logger<TimelyEvent>;
9/// Logger for timely dataflow progress events (the "timely/progress" log stream).
10pub type TimelyProgressLogger = Logger<TimelyProgressEvent>;
11
12use std::time::Duration;
13use crate::dataflow::operators::capture::{Event, EventPusher};
14
15/// Logs events as a timely stream, with progress statements.
16pub struct BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
17    // None when the logging stream is closed
18    time: Duration,
19    event_pusher: P,
20    _phantom: ::std::marker::PhantomData<(E, T)>,
21}
22
23impl<T, E, P> BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
24    /// Creates a new batch logger.
25    pub fn new(event_pusher: P) -> Self {
26        BatchLogger {
27            time: Default::default(),
28            event_pusher,
29            _phantom: ::std::marker::PhantomData,
30        }
31    }
32    /// Publishes a batch of logged events and advances the capability.
33    pub fn publish_batch(&mut self, &time: &Duration, data: &mut Vec<(Duration, E, T)>) {
34        if !data.is_empty() {
35            self.event_pusher.push(Event::Messages(self.time, data.drain(..).collect()));
36        }
37        if self.time < time {
38            let new_frontier = time;
39            let old_frontier = self.time;
40            self.event_pusher.push(Event::Progress(vec![(new_frontier, 1), (old_frontier, -1)]));
41        }
42        self.time = time;
43    }
44}
45impl<T, E, P> Drop for BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
46    fn drop(&mut self) {
47        self.event_pusher.push(Event::Progress(vec![(self.time, -1)]));
48    }
49}
50
51#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
52/// The creation of an `Operate` implementor.
53pub struct OperatesEvent {
54    /// Worker-unique identifier for the operator.
55    pub id: usize,
56    /// Sequence of nested scope identifiers indicating the path from the root to this instance.
57    pub addr: Vec<usize>,
58    /// A helpful name.
59    pub name: String,
60}
61
62#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
63/// The creation of a channel between operators.
64pub struct ChannelsEvent {
65    /// Worker-unique identifier for the channel
66    pub id: usize,
67    /// Sequence of nested scope identifiers indicating the path from the root to this instance.
68    pub scope_addr: Vec<usize>,
69    /// Source descriptor, indicating operator index and output port.
70    pub source: (usize, usize),
71    /// Target descriptor, indicating operator index and input port.
72    pub target: (usize, usize),
73}
74
75/// Encapsulates Any and Debug for dynamically typed timestamps in logs
76pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any {
77    /// Upcasts this `ProgressEventTimestamp` to `Any`.
78    ///
79    /// NOTE: This is required until <https://github.com/rust-lang/rfcs/issues/2765> is fixed
80    ///
81    /// # Example
82    /// ```rust
83    /// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)];
84    /// let ts: &timely::logging::ProgressEventTimestampVec = &ts;
85    /// for (n, p, t, d) in ts.iter() {
86    ///     print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d));
87    /// }
88    /// println!();
89    /// ```
90    fn as_any(&self) -> &dyn std::any::Any;
91
92    /// Returns the name of the concrete type of this object.
93    ///
94    /// # Note
95    ///
96    /// This is intended for diagnostic use. The exact contents and format of the
97    /// string returned are not specified, other than being a best-effort
98    /// description of the type. For example, amongst the strings
99    /// that `type_name::<Option<String>>()` might return are `"Option<String>"` and
100    /// `"std::option::Option<std::string::String>"`.
101    fn type_name(&self) -> &'static str;
102}
103impl<T: crate::Data + std::fmt::Debug + std::any::Any> ProgressEventTimestamp for T {
104    fn as_any(&self) -> &dyn std::any::Any { self }
105
106    fn type_name(&self) -> &'static str { std::any::type_name::<T>() }
107}
108
109/// A vector of progress updates in logs
110///
111/// This exists to support upcasting of the concrecte progress update vectors to
112/// `dyn ProgressEventTimestamp`. Doing so at the vector granularity allows us to
113/// use a single allocation for the entire vector (as opposed to a `Box` allocation
114/// for each dynamically typed element).
115pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any {
116    /// Iterate over the contents of the vector
117    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a>;
118}
119
120impl<T: ProgressEventTimestamp> ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> {
121    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a> {
122        Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| {
123            let t: &dyn ProgressEventTimestamp = t;
124            (n, p, t, d)
125        }))
126    }
127}
128
129#[derive(Debug)]
130/// Send or receive of progress information.
131pub struct TimelyProgressEvent {
132    /// `true` if the event is a send, and `false` if it is a receive.
133    pub is_send: bool,
134    /// Source worker index.
135    pub source: usize,
136    /// Communication channel identifier
137    pub channel: usize,
138    /// Message sequence number.
139    pub seq_no: usize,
140    /// Sequence of nested scope identifiers indicating the path from the root to this instance.
141    pub addr: Vec<usize>,
142    /// List of message updates, containing Target descriptor, timestamp as string, and delta.
143    pub messages: Box<dyn ProgressEventTimestampVec>,
144    /// List of capability updates, containing Source descriptor, timestamp as string, and delta.
145    pub internal: Box<dyn ProgressEventTimestampVec>,
146}
147
148#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
149/// External progress pushed onto an operator
150pub struct PushProgressEvent {
151    /// Worker-unique operator identifier
152    pub op_id: usize,
153}
154
155#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
156/// Message send or receive event
157pub struct MessagesEvent {
158    /// `true` if send event, `false` if receive event.
159    pub is_send: bool,
160    /// Channel identifier
161    pub channel: usize,
162    /// Source worker index.
163    pub source: usize,
164    /// Target worker index.
165    pub target: usize,
166    /// Message sequence number.
167    pub seq_no: usize,
168    /// Number of typed records in the message.
169    pub length: usize,
170}
171
172/// Records the starting and stopping of an operator.
173#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
174pub enum StartStop {
175    /// Operator starts.
176    Start,
177    /// Operator stops.
178    Stop,
179}
180
181#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
182/// Operator start or stop.
183pub struct ScheduleEvent {
184    /// Worker-unique identifier for the operator, linkable to the identifiers in `OperatesEvent`.
185    pub id: usize,
186    /// `Start` if the operator is starting, `Stop` if it is stopping.
187    /// activity is true if it looks like some useful work was performed during this call (data was
188    /// read or written, notifications were requested / delivered)
189    pub start_stop: StartStop,
190}
191
192impl ScheduleEvent {
193    /// Creates a new start scheduling event.
194    pub fn start(id: usize) -> Self { ScheduleEvent { id, start_stop: StartStop::Start } }
195    /// Creates a new stop scheduling event and reports whether work occurred.
196    pub fn stop(id: usize) -> Self { ScheduleEvent { id, start_stop: StartStop::Stop } }
197}
198
199#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
200/// Operator shutdown.
201pub struct ShutdownEvent {
202    /// Worker-unique identifier for the operator, linkable to the identifiers in `OperatesEvent`.
203    pub id: usize,
204}
205
206#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
207/// Application-defined code start or stop
208pub struct ApplicationEvent {
209    /// Unique event type identifier
210    pub id: usize,
211    /// True when activity begins, false when it stops
212    pub is_start: bool,
213}
214
215#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
216/// Application-defined code start or stop
217pub struct GuardedMessageEvent {
218    /// True when activity begins, false when it stops
219    pub is_start: bool,
220}
221
222#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
223/// Application-defined code start or stop
224pub struct GuardedProgressEvent {
225    /// True when activity begins, false when it stops
226    pub is_start: bool,
227}
228
229#[derive(Serialize, Deserialize, Abomonation, Debug, PartialEq, Eq, Hash, Clone, Copy)]
230/// Identifier of the worker that generated a log line
231pub struct TimelySetup {
232    /// Worker index
233    pub index: usize,
234}
235
236#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
237/// Kind of communication channel
238pub enum CommChannelKind {
239    /// Communication channel carrying progress information
240    Progress,
241    /// Communication channel carrying data
242    Data,
243}
244
245#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
246/// Event on a communication channel
247pub struct CommChannelsEvent {
248    /// Communication channel identifier
249    pub identifier: usize,
250    /// Kind of communication channel (progress / data)
251    pub kind: CommChannelKind,
252}
253
254#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
255/// Input logic start/stop
256pub struct InputEvent {
257    /// True when activity begins, false when it stops
258    pub start_stop: StartStop,
259}
260
261/// Records the starting and stopping of an operator.
262#[derive(Serialize, Deserialize, Abomonation, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
263pub enum ParkEvent {
264    /// Worker parks.
265    Park(Option<Duration>),
266    /// Worker unparks.
267    Unpark,
268}
269
270impl ParkEvent {
271    /// Creates a new park event from the supplied duration.
272    pub fn park(duration: Option<Duration>) -> Self { ParkEvent::Park(duration) }
273    /// Creates a new unpark event.
274    pub fn unpark() -> Self { ParkEvent::Unpark }
275}
276
277#[derive(Serialize, Deserialize, Debug, Clone, Abomonation, Hash, Eq, PartialEq, Ord, PartialOrd)]
278/// An event in a timely worker
279pub enum TimelyEvent {
280    /// Operator creation.
281    Operates(OperatesEvent),
282    /// Channel creation.
283    Channels(ChannelsEvent),
284    /// Progress propagation (reasoning).
285    PushProgress(PushProgressEvent),
286    /// Message send or receive.
287    Messages(MessagesEvent),
288    /// Operator start or stop.
289    Schedule(ScheduleEvent),
290    /// Operator shutdown.
291    Shutdown(ShutdownEvent),
292    /// No clue.
293    Application(ApplicationEvent),
294    /// Per-message computation.
295    GuardedMessage(GuardedMessageEvent),
296    /// Per-notification computation.
297    GuardedProgress(GuardedProgressEvent),
298    /// Communication channel event.
299    CommChannels(CommChannelsEvent),
300    /// Input event.
301    Input(InputEvent),
302    /// Park event.
303    Park(ParkEvent),
304    /// Unstructured event.
305    Text(String),
306}
307
308impl From<OperatesEvent> for TimelyEvent {
309    fn from(v: OperatesEvent) -> TimelyEvent { TimelyEvent::Operates(v) }
310}
311
312impl From<ChannelsEvent> for TimelyEvent {
313    fn from(v: ChannelsEvent) -> TimelyEvent { TimelyEvent::Channels(v) }
314}
315
316impl From<PushProgressEvent> for TimelyEvent {
317    fn from(v: PushProgressEvent) -> TimelyEvent { TimelyEvent::PushProgress(v) }
318}
319
320impl From<MessagesEvent> for TimelyEvent {
321    fn from(v: MessagesEvent) -> TimelyEvent { TimelyEvent::Messages(v) }
322}
323
324impl From<ScheduleEvent> for TimelyEvent {
325    fn from(v: ScheduleEvent) -> TimelyEvent { TimelyEvent::Schedule(v) }
326}
327
328impl From<ShutdownEvent> for TimelyEvent {
329    fn from(v: ShutdownEvent) -> TimelyEvent { TimelyEvent::Shutdown(v) }
330}
331
332impl From<ApplicationEvent> for TimelyEvent {
333    fn from(v: ApplicationEvent) -> TimelyEvent { TimelyEvent::Application(v) }
334}
335
336impl From<GuardedMessageEvent> for TimelyEvent {
337    fn from(v: GuardedMessageEvent) -> TimelyEvent { TimelyEvent::GuardedMessage(v) }
338}
339
340impl From<GuardedProgressEvent> for TimelyEvent {
341    fn from(v: GuardedProgressEvent) -> TimelyEvent { TimelyEvent::GuardedProgress(v) }
342}
343
344impl From<CommChannelsEvent> for TimelyEvent {
345    fn from(v: CommChannelsEvent) -> TimelyEvent { TimelyEvent::CommChannels(v) }
346}
347
348impl From<InputEvent> for TimelyEvent {
349    fn from(v: InputEvent) -> TimelyEvent { TimelyEvent::Input(v) }
350}
351
352impl From<ParkEvent> for TimelyEvent {
353    fn from(v: ParkEvent) -> TimelyEvent { TimelyEvent::Park(v) }
354}