erdos/node/operator_event.rs
1use std::{cmp::Ordering, collections::HashSet, fmt};
2
3use crate::{dataflow::Timestamp, Uuid};
4
/// `OperatorType` enumerates the kinds of operators an `OperatorEvent` can belong to.
///
/// The operator type selects the execution semantics of the message and watermark callbacks, as
/// dictated by the `Ord` implementation of `OperatorEvent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperatorType {
    /// Message callbacks are never ordered with respect to each other, and a watermark callback
    /// is not ordered with respect to message callbacks that carry a greater timestamp.
    Parallel,
    /// Message callbacks are ordered by timestamp (ties are resolved in favor of the event on
    /// which the comparison is invoked), and a watermark callback precedes message callbacks
    /// that carry a greater timestamp.
    Sequential,
}
12
/// `OperatorEvent` is a structure that encapsulates a particular invocation of a callback in
/// response to a message or a watermark. These events are processed according to the partial
/// order defined by the `PartialOrd` trait, where `x < y` implies that `x` *precedes* `y`.
///
/// The event is passed to an instance of
/// [`OperatorExecutor`](../operator_executor/struct.OperatorExecutor.html)
/// which is in charge of inserting the event into an
/// [`ExecutionLattice`](../lattice/struct.ExecutionLattice.html). The `ExecutionLattice` ensures
/// that the events are processed in the partial order defined by the executor.
pub struct OperatorEvent {
    /// The timestamp of the event: the timestamp of the message for regular callbacks, and the
    /// timestamp of the watermark for watermark callbacks.
    pub timestamp: Timestamp,
    /// True if the callback is a watermark callback. Used to ensure that watermark callbacks are
    /// invoked after regular callbacks.
    pub is_watermark_callback: bool,
    /// The priority of the event. Smaller numbers imply higher priority.
    /// Priority is intended to affect only the ordering and concurrency of watermark callbacks:
    /// for two otherwise equal watermark callbacks, the lattice creates a dependency from the
    /// lower priority event to the higher priority event. Thus, these events cannot run
    /// concurrently, with the high-priority event running first. An effect is that only
    /// watermark callbacks with the same priority can run concurrently.
    ///
    /// NOTE(review): the `Ord` implementation in this file does not read `priority`; confirm
    /// whether the behavior described above is implemented elsewhere or is currently disabled.
    pub priority: i8,
    /// The callback invoked when the event is processed. It is consumed by the invocation, so it
    /// can run at most once.
    pub callback: Box<dyn FnOnce()>,
    /// IDs of items the event requires read access to.
    pub read_ids: HashSet<Uuid>,
    /// IDs of items the event requires write access to.
    pub write_ids: HashSet<Uuid>,
    /// The type of the operator that this event belongs to. It selects which ordering rules
    /// `Ord::cmp` applies when the comparison is invoked on this event.
    operator_type: OperatorType,
}
45
46impl OperatorEvent {
47    pub fn new(
48        t: Timestamp,
49        is_watermark_callback: bool,
50        priority: i8,
51        read_ids: HashSet<Uuid>,
52        write_ids: HashSet<Uuid>,
53        callback: impl FnOnce() + 'static,
54        operator_type: OperatorType,
55    ) -> Self {
56        Self {
57            priority,
58            timestamp: t,
59            is_watermark_callback,
60            read_ids,
61            write_ids,
62            callback: Box::new(callback),
63            operator_type,
64        }
65    }
66}
67
// SAFETY: NOTE(review) - not verifiable from this file. `callback` is a `Box<dyn FnOnce()>`
// without a `Send` bound, so the compiler cannot prove that an event may be moved to another
// thread; this impl asserts it manually. Every closure stored in an event must therefore be safe
// to send across threads. TODO: confirm that all callers of `OperatorEvent::new` uphold this.
unsafe impl Send for OperatorEvent {}
69
70// Implement the `Display` and `Debug` traits so that we can visualize the event.
71impl fmt::Display for OperatorEvent {
72    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
73        write!(
74            f,
75            "Timestamp: {:?}, Watermark: {}",
76            self.timestamp, self.is_watermark_callback
77        )
78    }
79}
80
81impl fmt::Debug for OperatorEvent {
82    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83        write!(
84            f,
85            "Timestamp: {:?}, Watermark: {}",
86            self.timestamp, self.is_watermark_callback
87        )
88    }
89}
90
91// Implement traits to define the order in which the events should be executed.
92// Make changes to the `cmp` function of the `Ord` trait to change the partial order of the events.
93impl Eq for OperatorEvent {}
94
95impl PartialEq for OperatorEvent {
96    fn eq(&self, other: &OperatorEvent) -> bool {
97        self.cmp(other).is_eq()
98    }
99}
100
101/// Ordering used in the lattice where `self < other` implies `self` *precedes* other.
102impl Ord for OperatorEvent {
103    fn cmp(&self, other: &OperatorEvent) -> Ordering {
104        match self.operator_type {
105            OperatorType::Parallel => {
106                match (self.is_watermark_callback, other.is_watermark_callback) {
107                    (true, true) => {
108                        // Both of the events are watermarks, so the watermark with the lower
109                        // timestamp should run first.
110                        self.timestamp.cmp(&other.timestamp)
111                    }
112                    (true, false) => {
113                        // `self` is a watermark, and `other` is a normal message callback.
114                        // TODO: can compare dependencies to increase parallelism and order on a
115                        // more fine-grained level.
116                        match self.timestamp.cmp(&other.timestamp) {
117                            Ordering::Greater => {
118                                // `self` timestamp is greater than `other`, execute `other` first.
119                                Ordering::Greater
120                            }
121                            Ordering::Equal => {
122                                // `self` timestamp is equal to `other`, execute `other` first.
123                                Ordering::Greater
124                            }
125                            Ordering::Less => {
126                                // `self` timestamp is less than `other`, run them in any order.
127                                // Assume state is time-versioned, so dependency issues should not
128                                // arise.
129                                Ordering::Equal
130                            }
131                        }
132                    }
133                    (false, true) => other.cmp(self).reverse(),
134                    // Neither of the callbacks are watermark callbacks, run in parallel.
135                    (false, false) => Ordering::Equal,
136                }
137            }
138            OperatorType::Sequential => {
139                match (self.is_watermark_callback, other.is_watermark_callback) {
140                    (true, true) => {
141                        // Both of the events are watermarks, so the watermark with the lower
142                        // timestamp should run first.
143                        self.timestamp.cmp(&other.timestamp)
144                    }
145                    (true, false) => {
146                        // `self` is a watermark, and `other` is a normal message callback.
147                        match self.timestamp.cmp(&other.timestamp) {
148                            Ordering::Greater => {
149                                // `self` timestamp is greater than `other`, execute `other` first.
150                                Ordering::Greater
151                            }
152                            Ordering::Equal => {
153                                // `self` timestamp is equal to `other`, execute `other` first.
154                                Ordering::Greater
155                            }
156                            Ordering::Less => {
157                                // `self` timestamp is less than `other`, execute `self` first.
158                                Ordering::Less
159                            }
160                        }
161                    }
162                    (false, true) => other.cmp(self).reverse(),
163                    (false, false) => {
164                        // Both `self` and `other` are message callbacks, execute the one with a
165                        // lower timestamp first, and break ties by prioritizing `self`.
166                        match self.timestamp.cmp(&other.timestamp) {
167                            Ordering::Greater => {
168                                // `self` timestamp is greater than `other`, execute `other` first.
169                                Ordering::Greater
170                            }
171                            Ordering::Equal => {
172                                // The timestamps are equal, prioritize `self`.
173                                Ordering::Less
174                            }
175                            Ordering::Less => {
176                                // `self` timestamp is less than `other`, execute `self` first.
177                                Ordering::Less
178                            }
179                        }
180                    }
181                }
182            }
183        }
184    }
185}
186
187impl PartialOrd for OperatorEvent {
188    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
189        Some(self.cmp(other))
190    }
191}
192
193/*
194#[cfg(test)]
195mod test {
196    use super::*;
197    /// This test ensures that two watermark messages are partially ordered based on their
198    /// timestamps, and the watermark with the lower timestamp is executed first.
199    #[test]
200    fn test_watermark_event_orderings() {
201        {
202            let watermark_event_a: OperatorEvent = OperatorEvent::new(
203                Timestamp::Time(vec![1]),
204                true,
205                0,
206                HashSet::new(),
207                HashSet::new(),
208                || (),
209            );
210            let watermark_event_b: OperatorEvent = OperatorEvent::new(
211                Timestamp::Time(vec![2]),
212                true,
213                0,
214                HashSet::new(),
215                HashSet::new(),
216                || (),
217            );
218            assert!(
219                watermark_event_a < watermark_event_b,
220                "A has a lower timestamp and should precede B."
221            );
222        }
223        // Test that priorities should break ties only for otherwise equal watermark callbacks.
224        {
225            let watermark_event_a: OperatorEvent = OperatorEvent::new(
226                Timestamp::Time(vec![1]),
227                true,
228                -1,
229                HashSet::new(),
230                HashSet::new(),
231                || (),
232            );
233            let watermark_event_b: OperatorEvent = OperatorEvent::new(
234                Timestamp::Time(vec![1]),
235                true,
236                1,
237                HashSet::new(),
238                HashSet::new(),
239                || (),
240            );
241            assert!(
242                watermark_event_a < watermark_event_b,
243                "A is higher priority and should precede B."
244            );
245
246            let watermark_event_c: OperatorEvent = OperatorEvent::new(
247                Timestamp::Time(vec![0]),
248                true,
249                0,
250                HashSet::new(),
251                HashSet::new(),
252                || (),
253            );
254            assert!(
255                watermark_event_a > watermark_event_c,
256                "C has a smaller timestamp and should precede A."
257            );
258            assert!(
259                watermark_event_b > watermark_event_c,
260                "C has a smaller timestamp and should precede B."
261            );
262
263            let watermark_event_d: OperatorEvent = OperatorEvent::new(
264                Timestamp::Time(vec![2]),
265                true,
266                0,
267                HashSet::new(),
268                HashSet::new(),
269                || (),
270            );
271            assert!(
272                watermark_event_a < watermark_event_d,
273                "D has a larger timestamp and should follow A."
274            );
275            assert!(
276                watermark_event_b < watermark_event_d,
277                "D has a larger timestamp and should follow B."
278            );
279
280            // Priority should not affect message events
281            let message_event_a: OperatorEvent = OperatorEvent::new(
282                Timestamp::Time(vec![1]),
283                false,
284                0,
285                HashSet::new(),
286                HashSet::new(),
287                || (),
288            );
289            assert!(
290                message_event_a < watermark_event_a,
291                "Message A should precede Watermark A independent of priority."
292            );
293            assert!(
294                message_event_a < watermark_event_b,
295                "Message A should precede Watermark B independent of priority."
296            );
297
298            let message_event_b: OperatorEvent = OperatorEvent::new(
299                Timestamp::Time(vec![2]),
300                false,
301                0,
302                HashSet::new(),
303                HashSet::new(),
304                || (),
305            );
306            assert_eq!(
307                watermark_event_a, message_event_b,
308                "Watermark A and Message B can execute concurrently."
309            );
310            assert_eq!(
311                watermark_event_b, message_event_b,
312                "Watermark B and Message B can execute concurrently."
313            );
314        }
315    }
316
317    /// This test ensures that two non-watermark messages are rendered equal in their partial order
318    /// and thus can be run concurrently by the executor.
319    #[test]
320    fn test_message_event_orderings() {
321        let message_event_a: OperatorEvent = OperatorEvent::new(
322            Timestamp::Time(vec![1]),
323            false,
324            0,
325            HashSet::new(),
326            HashSet::new(),
327            || (),
328        );
329        let message_event_b: OperatorEvent = OperatorEvent::new(
330            Timestamp::Time(vec![2]),
331            false,
332            0,
333            HashSet::new(),
334            HashSet::new(),
335            || (),
336        );
337        assert!(
338            message_event_a == message_event_b,
339            "Message A and Message B can run concurrently."
340        );
341    }
342
343    #[test]
344    fn test_message_watermark_event_orderings() {
345        // Test that a message with a timestamp less than the watermark ensures that the watermark
346        // is dependent on the message.
347        {
348            let message_event_a: OperatorEvent = OperatorEvent::new(
349                Timestamp::Time(vec![1]),
350                false,
351                0,
352                HashSet::new(),
353                HashSet::new(),
354                || (),
355            );
356            let watermark_event_b: OperatorEvent = OperatorEvent::new(
357                Timestamp::Time(vec![2]),
358                true,
359                0,
360                HashSet::new(),
361                HashSet::new(),
362                || (),
363            );
364            assert!(
365                message_event_a < watermark_event_b,
366                "Message A with timestamp 1 should precede Watermark B with timestamp 2."
367            );
368        }
369
370        // Test that a message with a timestamp equivalent to the watermark is run before the
371        // watermark.
372        {
373            let message_event_a: OperatorEvent = OperatorEvent::new(
374                Timestamp::Time(vec![1]),
375                false,
376                0,
377                HashSet::new(),
378                HashSet::new(),
379                || (),
380            );
381            let watermark_event_b: OperatorEvent = OperatorEvent::new(
382                Timestamp::Time(vec![1]),
383                true,
384                0,
385                HashSet::new(),
386                HashSet::new(),
387                || (),
388            );
389            assert!(
390                message_event_a < watermark_event_b,
391                "Message A with timestamp 1 should precede Watermark B with timestamp 1."
392            );
393        }
394
395        // Test that a message with a timestamp greater than a watermark can be run concurrently
396        // with a watermark of lesser timestamp.
397        {
398            let message_event_a: OperatorEvent = OperatorEvent::new(
399                Timestamp::Time(vec![2]),
400                false,
401                0,
402                HashSet::new(),
403                HashSet::new(),
404                || (),
405            );
406            let watermark_event_b: OperatorEvent = OperatorEvent::new(
407                Timestamp::Time(vec![1]),
408                true,
409                0,
410                HashSet::new(),
411                HashSet::new(),
412                || (),
413            );
414            assert!(
415                message_event_a == watermark_event_b,
416                "Message A with timestamp 2 and Watermark B with timestamp 1 can run concurrently."
417            );
418        }
419    }
420
421    #[test]
422    fn test_resolve_access_conflicts() {
423        let mut write_ids = HashSet::new();
424        write_ids.insert(Uuid::new_deterministic());
425        let event_a = OperatorEvent::new(
426            Timestamp::Time(vec![0]),
427            true,
428            0,
429            HashSet::new(),
430            write_ids.clone(),
431            || {},
432        );
433
434        let event_b = OperatorEvent::new(
435            Timestamp::Time(vec![0]),
436            true,
437            1,
438            HashSet::new(),
439            write_ids.clone(),
440            || {},
441        );
442        assert!(event_a < event_b, "A should precede B due to priority.");
443
444        let mut read_ids = HashSet::new();
445        read_ids.insert(Uuid::new_deterministic());
446        let event_c = OperatorEvent::new(
447            Timestamp::Time(vec![0]),
448            true,
449            0,
450            read_ids,
451            write_ids.clone(),
452            || {},
453        );
454        assert!(
455            event_a < event_c,
456            "A should precede C because A has fewer dependencies."
457        );
458
459        let read_ids = write_ids.clone();
460        let event_d = OperatorEvent::new(
461            Timestamp::Time(vec![0]),
462            true,
463            0,
464            read_ids,
465            HashSet::new(),
466            || {},
467        );
468        assert!(
469            event_a < event_d,
470            "A should precede D due to a WR conflict."
471        );
472    }
473}
474*/