erdos/node/
lattice.rs

1use std::{
2    cmp::Ordering,
3    collections::{BinaryHeap, HashSet},
4    fmt,
5    sync::Arc,
6};
7
8use futures::lock::Mutex;
9use petgraph::{
10    dot::{self, Dot},
11    stable_graph::{EdgeIndex, NodeIndex, StableGraph},
12    visit::{DfsPostOrder, Reversed},
13    Direction,
14};
15
16use crate::{dataflow::Timestamp, node::operator_event::OperatorEvent};
17
/// `RunnableEvent` is a data structure that is used to represent an event that is ready to be
/// executed.
///
/// A `RunnableEvent` is essentially an index into the lattice, with additional metadata to
/// prioritize events that are ready to run. It does not own the event itself; the event is
/// looked up in the lattice through `node_index`.
#[derive(Clone)]
pub struct RunnableEvent {
    /// The `node_index` is the index of the runnable event in the lattice. It is the only field
    /// taken into account when two `RunnableEvent`s are compared for equality.
    node_index: NodeIndex<u32>,
    /// The `timestamp` is the timestamp of the event indexed by `node_index`. It is `None` until
    /// one is attached with `with_timestamp`, in which case ordering falls back to `node_index`.
    timestamp: Option<Timestamp>,
}
30
31impl RunnableEvent {
32    /// Creates a new instance of `RunnableEvent`.
33    pub fn new(node_index: NodeIndex<u32>) -> Self {
34        RunnableEvent {
35            node_index,
36            timestamp: None,
37        }
38    }
39
40    /// Adds a [`Timestamp`] to the event.
41    pub fn with_timestamp(mut self, timestamp: Timestamp) -> Self {
42        self.timestamp = Some(timestamp);
43        self
44    }
45}
46
47// Implement the `Display` and `Debug` traits so that we can visualize the event.
48impl fmt::Display for RunnableEvent {
49    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
50        write!(
51            f,
52            "RunnableEvent(index: {}, Timestamp: {:?}",
53            self.node_index.index(),
54            self.timestamp
55        )
56    }
57}
58
59impl fmt::Debug for RunnableEvent {
60    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
61        write!(
62            f,
63            "RunnableEvent(index: {}, Timestamp: {:?}",
64            self.node_index.index(),
65            self.timestamp
66        )
67    }
68}
69
70// Implement the equality criteria for a RunnableEvent.
71impl Eq for RunnableEvent {}
72
73impl PartialEq for RunnableEvent {
74    // Two events are equal iff they are the same i.e. same index into the lattice.
75    fn eq(&self, other: &RunnableEvent) -> bool {
76        self.node_index == other.node_index
77    }
78}
79
80// Implement the Ordering for a RunnableEvent.
81impl Ord for RunnableEvent {
82    fn cmp(&self, other: &RunnableEvent) -> Ordering {
83        match (self.timestamp.as_ref(), other.timestamp.as_ref()) {
84            (Some(ts1), Some(ts2)) => match ts1.cmp(ts2) {
85                Ordering::Less => Ordering::Greater,
86                Ordering::Greater => Ordering::Less,
87                Ordering::Equal => {
88                    // Break ties with the order of insertion into the lattice.
89                    self.node_index
90                        .index()
91                        .cmp(&other.node_index.index())
92                        .reverse()
93                }
94            },
95            _ => {
96                // We don't have enough information about the timestamps.
97                // Order based on the order of insertion into the lattice.
98                self.node_index
99                    .index()
100                    .cmp(&other.node_index.index())
101                    .reverse()
102            }
103        }
104    }
105}
106
107impl PartialOrd for RunnableEvent {
108    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
109        Some(self.cmp(other))
110    }
111}
112
/// `ExecutionLattice` is a data structure that maintains [`OperatorEvent`]s in a
/// [dependency graph](https://en.wikipedia.org/wiki/Dependency_graph) according to the partial
/// order defined.
///
/// Events can be added to the lattice using the `add_events` function, and retrieved using the
/// `get_event` function. The lattice requires a notification of the completion of the event using
/// the `mark_as_completed` function in order to unblock dependent events, and make them runnable.
///
/// # Example
/// The below example shows how to insert events into the Lattice and retrieve runnable events from
/// the lattice.
/// ```ignore
/// use std::collections::HashSet;
///
/// use erdos::node::{operator_event::OperatorEvent, lattice::ExecutionLattice};
/// use erdos::dataflow::Timestamp;
/// use futures::executor::block_on;
///
/// async fn async_main() {
///     let lattice: ExecutionLattice = ExecutionLattice::new();
///
///     // Add two events of timestamp 1 and 2 to the lattice with empty callbacks.
///     let events = vec![
///         OperatorEvent::new(Timestamp::Time(vec![1]),
///             true, 0, HashSet::new(), HashSet::new(), || ()),
///         OperatorEvent::new(Timestamp::Time(vec![2]),
///             true, 0, HashSet::new(), HashSet::new(), || ()),
///     ];
///     lattice.add_events(events).await;
///
///     // Retrieve the first event from the lattice.
///     let (event_1, event_id_1) = lattice.get_event().await.unwrap();
///
///     // If we try to retrieve another event, we get None since we haven't marked the
///     // completion of the event with timestamp 1.
///     assert!(lattice.get_event().await.is_none());
///
///     // Mark the first event as completed.
///     lattice.mark_as_completed(event_id_1).await;
///
///     // Now, get the second event from the lattice.
///     let (event_2, event_id_2) = lattice.get_event().await.unwrap();
/// }
///
/// fn main() {
///     block_on(async_main());
/// }
/// ```
pub struct ExecutionLattice {
    /// The `forest` is the directed acyclic graph that maintains the dependency graph of the
    /// events. The relation A -> B means that A *depends on* B. This dependency also indicates
    /// that B precedes A (B < A) in the ordering. An event can be executed if it has no outbound
    /// edges. A node's weight is `None` once its event has been handed out by `get_event`; the
    /// node itself stays in the graph until `mark_as_completed` removes it.
    forest: Arc<Mutex<StableGraph<Option<OperatorEvent>, ()>>>,
    /// The `leaves` are the leaves of the forest of graphs, have no dependencies and can be run by
    /// the event executors.
    leaves: Arc<Mutex<Vec<RunnableEvent>>>,
    /// The `run_queue` is the queue that maintains the events to be executed next. Note that this
    /// is different from the `leaves` because a leaf is only removed once it is marked as
    /// complete, whereas an entry leaves the `run_queue` as soon as `get_event` returns it.
    run_queue: Arc<Mutex<BinaryHeap<RunnableEvent>>>,
}
171
172impl ExecutionLattice {
173    /// Creates a new instance of `ExecutionLattice`.
174    pub fn new() -> Self {
175        ExecutionLattice {
176            forest: Arc::new(Mutex::new(StableGraph::new())),
177            leaves: Arc::new(Mutex::new(Vec::new())),
178            run_queue: Arc::new(Mutex::new(BinaryHeap::new())),
179        }
180    }
181
    /// Add a batch of events to the lattice.
    ///
    /// This function moves the passed events into the lattice, and inserts the appropriate edges
    /// to existing events in the graph based on the partial order defined in [`OperatorEvent`].
    ///
    /// The `forest`, `leaves` and `run_queue` locks are all held for the whole call. Events are
    /// inserted one at a time, each with its own traversal of the graph starting from the
    /// current leaves. An event for which no preceding event is found becomes a new leaf and is
    /// pushed onto the run queue right away.
    ///
    /// A warning is logged when more than 100 events are in the lattice after the insertion.
    ///
    /// # Panics
    /// Panics if a dependent of a visited node no longer holds its event (its weight in the
    /// forest is `None`), since the weights of dependents are unwrapped during the traversal.
    pub async fn add_events(&self, events: Vec<OperatorEvent>) {
        // Take locks over everything.
        let mut forest = self.forest.lock().await;
        let mut leaves = self.leaves.lock().await;
        let mut run_queue = self.run_queue.lock().await;

        // If add_events becomes a bottleneck, look into changing the insertion algorithm to
        // perform only 1 DFS instead of 1 per event. This could lead to more complex code to deal
        // with dependency interactions among the batch of inserted events.
        for added_event in events {
            // Begins a DFS from each leaf, traversing the graph in the opposite direction of the
            // edges. The purpose of this search is to find where new edges representing
            // dependencies must be added for the new event.
            // If the DFS becomes a performance bottleneck, consider searching from the roots of
            // the forest as an optimization under the assumption that newly inserted events will
            // likely depend on blocked already-inserted events.
            // The same DFS object is reused for every leaf (see `move_to` below).
            let mut dfs_from_leaf = DfsPostOrder::empty(Reversed(&*forest));
            // Caches preceding events to avoid adding redundant dependencies.
            // For example, A -> C is redundant if A -> B -> C.
            let mut preceding_events: HashSet<NodeIndex<u32>> = HashSet::new();
            // The added event depends on these nodes.
            let mut children: HashSet<NodeIndex<u32>> = HashSet::new();
            // Other nodes depend on the added event.
            let mut parents: HashSet<NodeIndex<u32>> = HashSet::new();
            // These nodes are no longer leaves after the added event is inserted into the graph.
            let mut demoted_leaves: Vec<NodeIndex> = Vec::new();
            // These edges are redundant and must be removed.
            let mut redundant_edges: Vec<EdgeIndex> = Vec::new();
            // Iterate through the forest with a DFS from each leaf to figure out where to add
            // dependency edges.
            'dfs_leaves: for leaf in leaves.iter() {
                // Begin a reverse postorder DFS from the specified leaf.
                dfs_from_leaf.move_to(leaf.node_index);
                while let Some(visited_node_idx) = dfs_from_leaf.next(Reversed(&*forest)) {
                    // If any of the current node's parents already precede the added event, then
                    // the current node also precedes the added event. Due to the reverse
                    // post-order DFS from the leaves, the added event must already depend on an
                    // ancestor of the current node so we can skip this node because the
                    // dependency is already established.
                    // NOTE(review): `continue 'dfs_leaves` abandons the remainder of the DFS
                    // from this leaf, not only the current node -- confirm this is intended.
                    for parent in forest.neighbors_directed(visited_node_idx, Direction::Incoming) {
                        if preceding_events.contains(&parent) {
                            preceding_events.insert(visited_node_idx);
                            continue 'dfs_leaves;
                        }
                    }

                    // A `None` weight means the event was already handed out by `get_event`.
                    if let Some(visited_event) =
                        forest.node_weight(visited_node_idx).unwrap().as_ref()
                    {
                        match added_event.cmp(visited_event) {
                            Ordering::Less => {
                                // The visited event depends on the added event.
                                // Add a dependency to the added event if the current node is a
                                // leaf.  Otherwise, the dependency is resolved in the current
                                // node's descendants.
                                if forest
                                    .neighbors_directed(visited_node_idx, Direction::Outgoing)
                                    .count()
                                    == 0
                                {
                                    parents.insert(visited_node_idx);
                                    // Only a leaf that is still waiting in the run queue is
                                    // recorded as demoted.
                                    for n in run_queue.iter() {
                                        if n.node_index.index() == visited_node_idx.index() {
                                            demoted_leaves.push(n.node_index);
                                        }
                                    }
                                }
                            }
                            Ordering::Equal => {
                                // There are no dependencies between current event and added event.
                                // Add dependencies from the parents of the visited node to the
                                // added event.
                                for parent_idx in
                                    forest.neighbors_directed(visited_node_idx, Direction::Incoming)
                                {
                                    let parent_event =
                                        forest.node_weight(parent_idx).unwrap().as_ref().unwrap();
                                    if parent_event > &added_event {
                                        // The added event precedes the parent, so the parent
                                        // depends on the added event.
                                        parents.insert(parent_idx);
                                    }
                                }
                            }
                            Ordering::Greater => {
                                // The added event depends on the visited event.
                                children.insert(visited_node_idx);
                                preceding_events.insert(visited_node_idx);
                                // Add dependencies from the parents of the visited node to the
                                // added event.  Also, note edges that become redundant for
                                // removal.
                                for parent_idx in
                                    forest.neighbors_directed(visited_node_idx, Direction::Incoming)
                                {
                                    let parent_event =
                                        forest.node_weight(parent_idx).unwrap().as_ref().unwrap();
                                    if parent_event > &added_event {
                                        // The added event precedes the parent, so the parent
                                        // depends on the added event.
                                        parents.insert(parent_idx);
                                        // Edge from parent to visited node becomes redundant.
                                        let redundant_edge =
                                            forest.find_edge(parent_idx, visited_node_idx).unwrap();
                                        redundant_edges.push(redundant_edge);
                                    }
                                }
                            }
                        };
                    } else {
                        // Reached a node that is already executing, but hasn't been completed.
                        // The current node will probably get added as a leaf. Add dependencies
                        // between parents and current event.
                        for parent in
                            forest.neighbors_directed(visited_node_idx, Direction::Incoming)
                        {
                            let parent_node = forest.node_weight(parent).unwrap().as_ref().unwrap();
                            if parent_node > &added_event {
                                parents.insert(parent);
                            }
                        }
                    }
                }
            }

            // Add the node into the forest.
            // The timestamp is cloned first because the event is moved into the graph.
            let event_timestamp: Timestamp = added_event.timestamp.clone();
            let event_idx: NodeIndex<u32> = forest.add_node(Some(added_event));

            // Add edges indicating dependencies.
            for child in children {
                forest.add_edge(event_idx, child, ());
            }
            for parent in parents {
                forest.add_edge(parent, event_idx, ());
            }

            // Remove redundant edges
            for redundant_edge in redundant_edges {
                forest.remove_edge(redundant_edge).unwrap();
            }

            // Clean up the leaves and the run queue, if any.
            // TODO (Sukrit) :: BinaryHeap does not provide a way to remove an element that is not
            // at the top of the heap. So, this particularly costly implementation clones the
            // elements out of the earlier run_queue, clears the run_queue and initializes it
            // afresh with the set difference of the old run_queue and the nodes to remove.
            // Since the invocation of this code is hopefully rare, we can optimize it later.
            if !demoted_leaves.is_empty() {
                leaves.retain(|event| !demoted_leaves.contains(&event.node_index));
                // Reconstruct the run queue.
                let old_run_queue: Vec<RunnableEvent> = run_queue.drain().collect();
                for event in old_run_queue {
                    if !demoted_leaves.contains(&event.node_index) {
                        run_queue.push(event);
                    }
                }
            }

            // If the added event depends on no others, then we can safely create a new leaf in the
            // forest and add the event to the run queue.
            if preceding_events.is_empty() {
                leaves.push(RunnableEvent::new(event_idx).with_timestamp(event_timestamp.clone()));
                run_queue.push(RunnableEvent::new(event_idx).with_timestamp(event_timestamp));
            }
        }

        // Warn about a growing backlog; nothing is rejected or dropped here.
        if forest.node_count() > 100 {
            tracing::warn!(
                "{} operator events queued in lattice. Increase number of operator executors or \
                decrease incoming message frequency to reduce load.",
                forest.node_count()
            )
        }
    }
360
361    /// Retrieve an event to be executed from the lattice.
362    ///
363    /// This function retrieves an event that is not being executed by any other executor, along
364    /// with a unique identifier for the event. This unique identifier needs to be passed to the
365    /// [`ExecutionLattice::mark_as_completed`] function to remove the event from the lattice, and
366    /// ensure that its dependencies are runnable.
367    pub async fn get_event(&self) -> Option<(OperatorEvent, usize)> {
368        // Take locks over everything.
369        let mut forest = self.forest.lock().await;
370        let _leaves = self.leaves.lock().await;
371        let mut run_queue = self.run_queue.lock().await;
372
373        // Retrieve the event
374        match run_queue.pop() {
375            Some(runnable_event) => {
376                let event = forest[runnable_event.node_index].take();
377                Some((event.unwrap(), runnable_event.node_index.index()))
378            }
379            None => None,
380        }
381    }
382
383    /// Mark an event as completed, and break the dependency from this event to its parents.
384    ///
385    /// `event_id` is the unique identifer returned by the [`ExecutionLattice::get_event`]
386    /// invocation.
387    pub async fn mark_as_completed(&self, event_id: usize) {
388        // Take locks over everything.
389        let mut forest = self.forest.lock().await;
390        let mut leaves = self.leaves.lock().await;
391        let mut run_queue = self.run_queue.lock().await;
392
393        let node_idx: NodeIndex<u32> = NodeIndex::new(event_id);
394
395        // Throw an error if the item was not in the leaves.
396        let event = RunnableEvent::new(node_idx);
397        let event_idx = leaves
398            .iter()
399            .position(|e| e == &event)
400            .expect("Item must be in the leaves of the lattice.");
401        leaves.remove(event_idx);
402
403        // Capture the parents of the node.
404        let parent_ids: Vec<NodeIndex> = forest
405            .neighbors_directed(node_idx, Direction::Incoming)
406            .collect();
407
408        // Remove the node from the graph. This will also remove edges from the parents.
409        forest.remove_node(node_idx);
410
411        // Promote parents to leaves if they have no dependencies, and add their corresponding
412        // events to the run queue.
413        for parent_id in parent_ids {
414            if forest
415                .neighbors_directed(parent_id, Direction::Outgoing)
416                .count()
417                == 0
418            {
419                let timestamp: Timestamp = forest[parent_id].as_ref().unwrap().timestamp.clone();
420                let parent = RunnableEvent::new(parent_id).with_timestamp(timestamp);
421                leaves.push(parent.clone());
422                run_queue.push(parent);
423            }
424        }
425    }
426
427    /// Convert graph to string in DOT format.
428    #[allow(dead_code)]
429    pub async fn to_dot(&self) -> String {
430        // Lock the graph.
431        let forest = self.forest.lock().await;
432        let leaves = self.leaves.lock().await;
433        let graph = forest.map(
434            |nx, n| {
435                n.as_ref().map_or_else(
436                    || {
437                        leaves
438                            .iter()
439                            .find(|r| r.node_index == nx)
440                            .map_or_else(|| "Executing".to_string(), |r| format!("Executing {}", r))
441                    },
442                    |x| x.to_string(),
443                )
444            },
445            |_, e| *e,
446        );
447        format!(
448            "{:?}",
449            Dot::with_config(&graph, &[dot::Config::EdgeNoLabel])
450        )
451    }
452}
453
454impl fmt::Debug for ExecutionLattice {
455    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
456        write!(f, "ExecutionLattice")
457    }
458}
459
460/*
461#[cfg(test)]
462mod test {
463    use super::*;
464    use crate::dataflow::Timestamp;
465    use futures::executor::block_on;
466
467    /// Test that a leaf gets added correctly to an empty lattice and that we can retrieve it from
468    /// the lattice.
469    #[test]
470    fn test_leaf_addition() {
471        let lattice: ExecutionLattice = ExecutionLattice::new();
472        let events = vec![OperatorEvent::new(
473            Timestamp::Time(vec![1]),
474            false,
475            0,
476            HashSet::new(),
477            HashSet::new(),
478            || (),
479        )];
480        block_on(lattice.add_events(events));
481
482        // Ensure that the correct event is returned by the lattice.
483        let (event, _event_id) = block_on(lattice.get_event()).unwrap();
484        assert_eq!(
485            event.timestamp,
486            Timestamp::Time(vec![1 as u64]),
487            "The wrong event was returned by the lattice."
488        );
489
490        // Ensure that only one event is returned by the lattice.
491        let next_event = block_on(lattice.get_event());
492        assert!(next_event.is_none(), "Expected no event from the lattice.");
493    }
494
495    /// Test that the addition of two messages of the same timestamp leads to no dependencies.
496    #[test]
497    fn test_concurrent_messages() {
498        let lattice: ExecutionLattice = ExecutionLattice::new();
499        let events = vec![
500            OperatorEvent::new(
501                Timestamp::Time(vec![1]),
502                false,
503                0,
504                HashSet::new(),
505                HashSet::new(),
506                || (),
507            ),
508            OperatorEvent::new(
509                Timestamp::Time(vec![1]),
510                false,
511                0,
512                HashSet::new(),
513                HashSet::new(),
514                || (),
515            ),
516        ];
517        block_on(lattice.add_events(events));
518
519        // Check the first event is returned correctly by the lattice.
520        let (event, _event_id) = block_on(lattice.get_event()).unwrap();
521        assert_eq!(
522            event.timestamp,
523            Timestamp::Time(vec![1 as u64]),
524            "The wrong event was returned by the lattice."
525        );
526
527        // Check that the other event is returned without marking the first one as completed.
528        // This shows that they can be executed concurrently.
529        let (event_2, _event_id_2) = block_on(lattice.get_event()).unwrap();
530        assert_eq!(
531            event_2.timestamp,
532            Timestamp::Time(vec![1 as u64]),
533            "The wrong event was returned by the lattice."
534        );
535    }
536
537    /// Test that the addition of two messages of same timestamp, with their watermark ensures that
538    /// the watermark runs after both of the messages are marked as finished executing.
539    #[test]
540    fn test_watermark_post_concurrent_messages() {
541        let lattice: ExecutionLattice = ExecutionLattice::new();
542        let events = vec![
543            OperatorEvent::new(
544                Timestamp::Time(vec![1]),
545                false,
546                0,
547                HashSet::new(),
548                HashSet::new(),
549                || (),
550            ),
551            OperatorEvent::new(
552                Timestamp::Time(vec![1]),
553                false,
554                0,
555                HashSet::new(),
556                HashSet::new(),
557                || (),
558            ),
559            OperatorEvent::new(
560                Timestamp::Time(vec![1]),
561                true,
562                0,
563                HashSet::new(),
564                HashSet::new(),
565                || (),
566            ),
567        ];
568        block_on(lattice.add_events(events));
569        // Check that the first event is returned correctly by the lattice.
570        let (event, event_id) = block_on(lattice.get_event()).unwrap();
571        assert!(
572            event.timestamp == Timestamp::Time(vec![1 as u64]) && !event.is_watermark_callback,
573            "The wrong event was returned by the lattice."
574        );
575
576        // Check that the first event is returned correctly by the lattice.
577        let (event_2, event_id_2) = block_on(lattice.get_event()).unwrap();
578        assert!(
579            event_2.timestamp == Timestamp::Time(vec![1 as u64]) && !event.is_watermark_callback,
580            "The wrong event was returned by the lattice."
581        );
582        let no_event = block_on(lattice.get_event());
583        assert!(no_event.is_none(), "Expected no event from the lattice.");
584
585        // Mark one of the event as completed, and still don't expect an event.
586        block_on(lattice.mark_as_completed(event_id));
587
588        let no_event_2 = block_on(lattice.get_event());
589        assert!(no_event_2.is_none(), "Expected no event from the lattice.");
590
591        // Mark the other as completed and expect a Watermark.
592        block_on(lattice.mark_as_completed(event_id_2));
593
594        let (event_3, _event_id_3) = block_on(lattice.get_event()).unwrap();
595        assert!(
596            event_3.timestamp == Timestamp::Time(vec![1 as u64]) && event_3.is_watermark_callback,
597            "The wrong event was returned by the lattice."
598        );
599    }
600
601    /// Test that the addition of three watermark messages in reverse order, leads to them being
602    /// executed in the correct order.
603    #[test]
604    fn test_unordered_watermark() {
605        let lattice: ExecutionLattice = ExecutionLattice::new();
606        let events = vec![
607            OperatorEvent::new(
608                Timestamp::Time(vec![3]),
609                true,
610                0,
611                HashSet::new(),
612                HashSet::new(),
613                || (),
614            ),
615            OperatorEvent::new(
616                Timestamp::Time(vec![2]),
617                true,
618                0,
619                HashSet::new(),
620                HashSet::new(),
621                || (),
622            ),
623            OperatorEvent::new(
624                Timestamp::Time(vec![1]),
625                true,
626                0,
627                HashSet::new(),
628                HashSet::new(),
629                || (),
630            ),
631        ];
632        block_on(lattice.add_events(events));
633
634        let (event, event_id) = block_on(lattice.get_event()).unwrap();
635        assert_eq!(
636            event.timestamp,
637            Timestamp::Time(vec![1 as u64]),
638            "The wrong event was returned by the lattice."
639        );
640        assert!(
641            block_on(lattice.get_event()).is_none(),
642            "The wrong event was returned by the lattice."
643        );
644        block_on(lattice.mark_as_completed(event_id));
645        let (event_2, event_id_2) = block_on(lattice.get_event()).unwrap();
646        assert_eq!(
647            event_2.timestamp,
648            Timestamp::Time(vec![2 as u64]),
649            "The wrong event was returned by the lattice."
650        );
651        assert!(
652            block_on(lattice.get_event()).is_none(),
653            "The wrong event was returned by the lattice."
654        );
655        block_on(lattice.mark_as_completed(event_id_2));
656        let (event_3, _event_id_3) = block_on(lattice.get_event()).unwrap();
657        assert_eq!(
658            event_3.timestamp,
659            Timestamp::Time(vec![3 as u64]),
660            "The wrong event was returned by the lattice."
661        );
662        assert!(
663            block_on(lattice.get_event()).is_none(),
664            "The wrong event was returned by the lattice."
665        );
666    }
667
668    /// Test that the addition of messages of different timestamps leads to concurrent execution.
669    #[test]
670    fn test_concurrent_messages_diff_timestamps() {
671        let lattice: ExecutionLattice = ExecutionLattice::new();
672        let events = vec![
673            OperatorEvent::new(
674                Timestamp::Time(vec![3]),
675                false,
676                0,
677                HashSet::new(),
678                HashSet::new(),
679                || (),
680            ),
681            OperatorEvent::new(
682                Timestamp::Time(vec![2]),
683                false,
684                0,
685                HashSet::new(),
686                HashSet::new(),
687                || (),
688            ),
689            OperatorEvent::new(
690                Timestamp::Time(vec![1]),
691                false,
692                0,
693                HashSet::new(),
694                HashSet::new(),
695                || (),
696            ),
697        ];
698        block_on(lattice.add_events(events));
699
700        let (event, _event_id) = block_on(lattice.get_event()).unwrap();
701        assert_eq!(
702            event.timestamp,
703            Timestamp::Time(vec![1 as u64]),
704            "The wrong event was returned by the lattice."
705        );
706        let (event_2, _event_id_2) = block_on(lattice.get_event()).unwrap();
707        assert_eq!(
708            event_2.timestamp,
709            Timestamp::Time(vec![2 as u64]),
710            "The wrong event was returned by the lattice."
711        );
712        let (event_3, _event_id_3) = block_on(lattice.get_event()).unwrap();
713        assert_eq!(
714            event_3.timestamp,
715            Timestamp::Time(vec![3 as u64]),
716            "The wrong event was returned by the lattice."
717        );
718    }
719
720    /// Test that concurrent messages are followed by their watermarks.
721    #[test]
722    fn test_concurrent_messages_watermarks_diff_timestamps() {
723        let lattice: ExecutionLattice = ExecutionLattice::new();
724        let events = vec![
725            OperatorEvent::new(
726                Timestamp::Time(vec![3]),
727                true,
728                0,
729                HashSet::new(),
730                HashSet::new(),
731                || (),
732            ),
733            OperatorEvent::new(
734                Timestamp::Time(vec![2]),
735                true,
736                0,
737                HashSet::new(),
738                HashSet::new(),
739                || (),
740            ),
741            OperatorEvent::new(
742                Timestamp::Time(vec![1]),
743                true,
744                0,
745                HashSet::new(),
746                HashSet::new(),
747                || (),
748            ),
749            OperatorEvent::new(
750                Timestamp::Time(vec![1]),
751                false,
752                0,
753                HashSet::new(),
754                HashSet::new(),
755                || (),
756            ),
757            OperatorEvent::new(
758                Timestamp::Time(vec![2]),
759                false,
760                0,
761                HashSet::new(),
762                HashSet::new(),
763                || (),
764            ),
765            OperatorEvent::new(
766                Timestamp::Time(vec![3]),
767                false,
768                0,
769                HashSet::new(),
770                HashSet::new(),
771                || (),
772            ),
773        ];
774        block_on(lattice.add_events(events));
775        let (event, event_id) = block_on(lattice.get_event()).unwrap();
776        assert!(
777            event.timestamp == Timestamp::Time(vec![1 as u64]) && !event.is_watermark_callback,
778            "The wrong event was returned by the lattice."
779        );
780        let (event_2, event_id_2) = block_on(lattice.get_event()).unwrap();
781        assert!(
782            event_2.timestamp == Timestamp::Time(vec![2 as u64]) && !event_2.is_watermark_callback,
783            "The wrong event was returned by the lattice."
784        );
785        let (event_3, event_id_3) = block_on(lattice.get_event()).unwrap();
786        assert!(
787            event_3.timestamp == Timestamp::Time(vec![3 as u64]) && !event_3.is_watermark_callback,
788            "The wrong event was returned by the lattice."
789        );
790        assert!(
791            block_on(lattice.get_event()).is_none(),
792            "The wrong event was returned by the lattice."
793        );
794        block_on(lattice.mark_as_completed(event_id));
795        let (event_4, event_id_4) = block_on(lattice.get_event()).unwrap();
796        assert!(
797            event_4.timestamp == Timestamp::Time(vec![1 as u64]) && event_4.is_watermark_callback,
798            "The wrong event was returned by the lattice."
799        );
800        assert!(
801            block_on(lattice.get_event()).is_none(),
802            "The wrong event was returned by the lattice."
803        );
804        block_on(lattice.mark_as_completed(event_id_4));
805        assert!(
806            block_on(lattice.get_event()).is_none(),
807            "The wrong event was returned by the lattice."
808        );
809        block_on(lattice.mark_as_completed(event_id_2));
810        let (event_5, event_id_5) = block_on(lattice.get_event()).unwrap();
811        assert!(
812            event_5.timestamp == Timestamp::Time(vec![2 as u64]) && event_5.is_watermark_callback,
813            "The wrong event was returned by the lattice."
814        );
815        block_on(lattice.mark_as_completed(event_id_3));
816        assert!(
817            block_on(lattice.get_event()).is_none(),
818            "The wrong event was returned by the lattice."
819        );
820        block_on(lattice.mark_as_completed(event_id_5));
821        let (event_6, event_id_6) = block_on(lattice.get_event()).unwrap();
822        assert!(
823            event_6.timestamp == Timestamp::Time(vec![3 as u64]) && event_6.is_watermark_callback,
824            "The wrong event was returned by the lattice."
825        );
826        block_on(lattice.mark_as_completed(event_id_6));
827        assert!(
828            block_on(lattice.get_event()).is_none(),
829            "The wrong event was returned by the lattice."
830        );
831    }
832
833    /// Tests that duplicate events do not end up in the lattice's leaves or
834    /// run queue. This can happen if duplicate edges exist in the dependency
835    /// graph.
836    #[test]
837    fn test_no_duplicates() {
838        let lattice = ExecutionLattice::new();
839        // Add 2 operators that can run concurrently.
840        let initial_events = vec![
841            OperatorEvent::new(
842                Timestamp::Time(vec![0]),
843                false,
844                0,
845                HashSet::new(),
846                HashSet::new(),
847                || {},
848            ),
849            OperatorEvent::new(
850                Timestamp::Time(vec![0]),
851                false,
852                0,
853                HashSet::new(),
854                HashSet::new(),
855                || {},
856            ),
857        ];
858        block_on(lattice.add_events(initial_events));
859
860        // Generate events A and B where B precedes A.
861        let event_a = OperatorEvent::new(
862            Timestamp::Time(vec![0]),
863            true,
864            20,
865            HashSet::new(),
866            HashSet::new(),
867            || {},
868        );
869        let event_b = OperatorEvent::new(
870            Timestamp::Time(vec![0]),
871            true,
872            0,
873            HashSet::new(),
874            HashSet::new(),
875            || {},
876        );
877        assert!(event_a > event_b, "Event B must precede event A.");
878
879        // Insert events in reverse order. Due to how the traversal of the
880        // dependency graph is performed, this can result in duplicate edges
881        // when using vectors instead of sets to store an inserted event's
882        // parents and children. Duplicate edges may result in duplicate
883        // attempts to run the same event.
884        block_on(lattice.add_events(vec![event_a]));
885        block_on(lattice.add_events(vec![event_b]));
886        // Dependency graph should be:
887        //        -> C
888        // A -> B
889        //        -> D
890
891        // Run events C and D
892        let (event_1, event_1_id) = block_on(lattice.get_event()).unwrap();
893        let (event_2, event_2_id) = block_on(lattice.get_event()).unwrap();
894        assert!(
895            !event_1.is_watermark_callback,
896            "Should process events C and D before watermark callbacks."
897        );
898        assert!(
899            !event_2.is_watermark_callback,
900            "Should process events C and D before watermark callbacks."
901        );
902        assert!(
903            block_on(lattice.get_event()).is_none(),
904            "No other events should run until C and D complete."
905        );
906        block_on(lattice.mark_as_completed(event_1_id));
907        assert!(
908            block_on(lattice.get_event()).is_none(),
909            "No other events should run until C and D complete."
910        );
911        block_on(lattice.mark_as_completed(event_2_id));
912
913        // Run event B.
914        let (event_b, event_b_id) = block_on(lattice.get_event()).unwrap();
915        assert_eq!(
916            event_b.priority, 0,
917            "Event B should run after events C and D."
918        );
919        assert!(
920            block_on(lattice.get_event()).is_none(),
921            "A should not run until B completes."
922        );
923        block_on(lattice.mark_as_completed(event_b_id));
924
925        // Run event A.
926        let (_event_a, event_a_id) = block_on(lattice.get_event()).unwrap();
927        block_on(lattice.mark_as_completed(event_a_id));
928
929        // No more events should be in the lattice.
930        assert!(
931            block_on(lattice.get_event()).is_none(),
932            "There should be no more events in the lattice."
933        );
934    }
935}
936*/