erdos/node/lattice.rs
1use std::{
2 cmp::Ordering,
3 collections::{BinaryHeap, HashSet},
4 fmt,
5 sync::Arc,
6};
7
8use futures::lock::Mutex;
9use petgraph::{
10 dot::{self, Dot},
11 stable_graph::{EdgeIndex, NodeIndex, StableGraph},
12 visit::{DfsPostOrder, Reversed},
13 Direction,
14};
15
16use crate::{dataflow::Timestamp, node::operator_event::OperatorEvent};
17
/// `RunnableEvent` is a data structure that is used to represent an event that is ready to be
/// executed.
///
/// A `RunnableEvent` is essentially an index into the lattice, with additional metadata to
/// prioritize events that are ready to run.
#[derive(Clone)]
pub struct RunnableEvent {
    /// The `node_index` is the index of the runnable event in the lattice.
    /// Equality of `RunnableEvent`s is defined solely on this index.
    node_index: NodeIndex<u32>,
    /// The `timestamp` is the timestamp of the event indexed by the id. `None` when constructed
    /// via `RunnableEvent::new` without `with_timestamp`; ordering then falls back to the node
    /// index (insertion order).
    timestamp: Option<Timestamp>,
}
30
31impl RunnableEvent {
32 /// Creates a new instance of `RunnableEvent`.
33 pub fn new(node_index: NodeIndex<u32>) -> Self {
34 RunnableEvent {
35 node_index,
36 timestamp: None,
37 }
38 }
39
40 /// Adds a [`Timestamp`] to the event.
41 pub fn with_timestamp(mut self, timestamp: Timestamp) -> Self {
42 self.timestamp = Some(timestamp);
43 self
44 }
45}
46
47// Implement the `Display` and `Debug` traits so that we can visualize the event.
48impl fmt::Display for RunnableEvent {
49 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
50 write!(
51 f,
52 "RunnableEvent(index: {}, Timestamp: {:?}",
53 self.node_index.index(),
54 self.timestamp
55 )
56 }
57}
58
59impl fmt::Debug for RunnableEvent {
60 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
61 write!(
62 f,
63 "RunnableEvent(index: {}, Timestamp: {:?}",
64 self.node_index.index(),
65 self.timestamp
66 )
67 }
68}
69
70// Implement the equality criteria for a RunnableEvent.
71impl Eq for RunnableEvent {}
72
73impl PartialEq for RunnableEvent {
74 // Two events are equal iff they are the same i.e. same index into the lattice.
75 fn eq(&self, other: &RunnableEvent) -> bool {
76 self.node_index == other.node_index
77 }
78}
79
80// Implement the Ordering for a RunnableEvent.
81impl Ord for RunnableEvent {
82 fn cmp(&self, other: &RunnableEvent) -> Ordering {
83 match (self.timestamp.as_ref(), other.timestamp.as_ref()) {
84 (Some(ts1), Some(ts2)) => match ts1.cmp(ts2) {
85 Ordering::Less => Ordering::Greater,
86 Ordering::Greater => Ordering::Less,
87 Ordering::Equal => {
88 // Break ties with the order of insertion into the lattice.
89 self.node_index
90 .index()
91 .cmp(&other.node_index.index())
92 .reverse()
93 }
94 },
95 _ => {
96 // We don't have enough information about the timestamps.
97 // Order based on the order of insertion into the lattice.
98 self.node_index
99 .index()
100 .cmp(&other.node_index.index())
101 .reverse()
102 }
103 }
104 }
105}
106
impl PartialOrd for RunnableEvent {
    // Delegate to the total order defined by `Ord`; the two impls must agree.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
112
/// `ExecutionLattice` is a data structure that maintains [`OperatorEvent`]s in a
/// [dependency graph](https://en.wikipedia.org/wiki/Dependency_graph) according to the partial
/// order defined.
///
/// Events can be added to the lattice using the `add_events` function, and retrieved using the
/// `get_event` function. The lattice requires a notification of the completion of the event using
/// the `mark_as_completed` function in order to unblock dependent events, and make them runnable.
///
/// # Example
/// The below example shows how to insert events into the Lattice and retrieve runnable events from
/// the lattice.
/// ```ignore
/// use erdos::node::{operator_event::OperatorEvent, lattice::ExecutionLattice};
/// use erdos::dataflow::Timestamp;
/// use futures::executor::block_on;
///
/// async fn async_main() {
///     let mut lattice: ExecutionLattice = ExecutionLattice::new();
///
///     // Add two events of timestamp 1 and 2 to the lattice with empty callbacks.
///     let events = vec![
///         OperatorEvent::new(Timestamp::Time(vec![1]),
///             true, 0, HashSet::new(), HashSet::new(), || ()),
///         OperatorEvent::new(Timestamp::Time(vec![2]),
///             true, 0, HashSet::new(), HashSet::new(), || ())
///     ];
///     lattice.add_events(events).await;
///
///     // Retrieve the first event from the lattice.
///     let (event_1, event_id_1) = lattice.get_event().await.unwrap();
///
///     // If we try to retrieve another event, we get None since we haven't marked the
///     // completion of the event with timestamp 1.
///     assert_eq!(lattice.get_event().await.is_none(), true);
///
///     // Mark the first event as completed.
///     lattice.mark_as_completed(event_id_1).await;
///
///     // Now, get the second event from the lattice.
///     let (event_2, event_id_2) = lattice.get_event().await.unwrap();
/// }
///
/// fn main() {
///     block_on(async_main());
/// }
/// ```
pub struct ExecutionLattice {
    /// The `forest` is the directed acyclic graph that maintains the dependency graph of the
    /// events. The relation A -> B means that A *depends on* B. This dependency also indicates
    /// that B precedes A (B < A) in the ordering. An event can be executed if it has no outbound
    /// edges. A node's weight is `None` while its event is out executing (taken by `get_event`)
    /// and `Some` otherwise.
    forest: Arc<Mutex<StableGraph<Option<OperatorEvent>, ()>>>,
    /// The `leaves` are the leaves of the forest of graphs; they have no dependencies and can be
    /// run by the event executors.
    leaves: Arc<Mutex<Vec<RunnableEvent>>>,
    /// The `run_queue` is the queue that maintains the events to be executed next. Note that this
    /// is different from the `leaves` because a leaf is only removed once it's marked as complete.
    run_queue: Arc<Mutex<BinaryHeap<RunnableEvent>>>,
}
171
172impl ExecutionLattice {
173 /// Creates a new instance of `ExecutionLattice`.
174 pub fn new() -> Self {
175 ExecutionLattice {
176 forest: Arc::new(Mutex::new(StableGraph::new())),
177 leaves: Arc::new(Mutex::new(Vec::new())),
178 run_queue: Arc::new(Mutex::new(BinaryHeap::new())),
179 }
180 }
181
    /// Add a batch of events to the lattice.
    ///
    /// This function moves the passed events into the lattice, and inserts the appropriate edges
    /// to existing events in the graph based on the partial order defined in [`OperatorEvent`].
    pub async fn add_events(&self, events: Vec<OperatorEvent>) {
        // Take locks over everything so the three structures stay mutually consistent.
        let mut forest = self.forest.lock().await;
        let mut leaves = self.leaves.lock().await;
        let mut run_queue = self.run_queue.lock().await;

        // If add_events becomes a bottleneck, look into changing the insertion algorithm to
        // perform only 1 DFS instead of 1 per event. This could lead to more complex code to deal
        // with dependency interactions among the batch of inserted events.
        for added_event in events {
            // Begins a DFS from each leaf, traversing the graph in the opposite direction of the
            // edges. The purpose of this search is to find where new edges representing
            // dependencies must be added for the new event.
            // If the DFS becomes a performance bottleneck, consider searching from the roots of
            // the forest as an optimization under the assumption that newly inserted events will
            // likely depend on blocked already-inserted events.
            let mut dfs_from_leaf = DfsPostOrder::empty(Reversed(&*forest));
            // Caches preceding events to avoid adding redundant dependencies.
            // For example, A -> C is redundant if A -> B -> C.
            let mut preceding_events: HashSet<NodeIndex<u32>> = HashSet::new();
            // The added event depends on these nodes (edges added_event -> child are created).
            let mut children: HashSet<NodeIndex<u32>> = HashSet::new();
            // Other nodes depend on the added event (edges parent -> added_event are created).
            let mut parents: HashSet<NodeIndex<u32>> = HashSet::new();
            // These nodes are no longer leaves after the added event is inserted into the graph.
            let mut demoted_leaves: Vec<NodeIndex> = Vec::new();
            // These edges are redundant and must be removed.
            let mut redundant_edges: Vec<EdgeIndex> = Vec::new();
            // Iterate through the forest with a DFS from each leaf to figure out where to add
            // dependency edges.
            'dfs_leaves: for leaf in leaves.iter() {
                // Begin a reverse postorder DFS from the specified leaf. The visitor's visited
                // set carries over between leaves, so shared ancestors are not re-visited.
                dfs_from_leaf.move_to(leaf.node_index);
                while let Some(visited_node_idx) = dfs_from_leaf.next(Reversed(&*forest)) {
                    // If any of the current node's parents already precede the added event, then
                    // the current node also precedes the added event. Due to the reverse
                    // post-order DFS from the leaves, the added event must already depend on an
                    // ancestor of the current node so we can skip this node because the
                    // dependency is already established. The labeled continue abandons the rest
                    // of this leaf's DFS and moves on to the next leaf.
                    for parent in forest.neighbors_directed(visited_node_idx, Direction::Incoming) {
                        if preceding_events.contains(&parent) {
                            preceding_events.insert(visited_node_idx);
                            continue 'dfs_leaves;
                        }
                    }

                    if let Some(visited_event) =
                        forest.node_weight(visited_node_idx).unwrap().as_ref()
                    {
                        match added_event.cmp(visited_event) {
                            Ordering::Less => {
                                // The visited event depends on the added event.
                                // Add a dependency to the added event if the current node is a
                                // leaf. Otherwise, the dependency is resolved in the current
                                // node's descendants.
                                if forest
                                    .neighbors_directed(visited_node_idx, Direction::Outgoing)
                                    .count()
                                    == 0
                                {
                                    parents.insert(visited_node_idx);
                                    // The visited leaf now depends on the added event, so it must
                                    // be removed from the leaves/run queue further below.
                                    // NOTE(review): only entries still in the run queue are
                                    // demoted here; a leaf already popped by `get_event` is
                                    // mid-execution and stays in `leaves` — confirm intended.
                                    for n in run_queue.iter() {
                                        if n.node_index.index() == visited_node_idx.index() {
                                            demoted_leaves.push(n.node_index);
                                        }
                                    }
                                }
                            }
                            Ordering::Equal => {
                                // There are no dependencies between current event and added event.
                                // Add dependencies from the parents of the visited node to the
                                // added event.
                                for parent_idx in
                                    forest.neighbors_directed(visited_node_idx, Direction::Incoming)
                                {
                                    let parent_event =
                                        forest.node_weight(parent_idx).unwrap().as_ref().unwrap();
                                    if parent_event > &added_event {
                                        // The added event precedes the parent, so the parent
                                        // depends on the added event.
                                        parents.insert(parent_idx);
                                    }
                                }
                            }
                            Ordering::Greater => {
                                // The added event depends on the visited event.
                                children.insert(visited_node_idx);
                                preceding_events.insert(visited_node_idx);
                                // Add dependencies from the parents of the visited node to the
                                // added event. Also, note edges that become redundant for
                                // removal (parent -> visited is implied by
                                // parent -> added -> visited).
                                for parent_idx in
                                    forest.neighbors_directed(visited_node_idx, Direction::Incoming)
                                {
                                    let parent_event =
                                        forest.node_weight(parent_idx).unwrap().as_ref().unwrap();
                                    if parent_event > &added_event {
                                        // The added event precedes the parent, so the parent
                                        // depends on the added event.
                                        parents.insert(parent_idx);
                                        // Edge from parent to visited node becomes redundant.
                                        let redundant_edge =
                                            forest.find_edge(parent_idx, visited_node_idx).unwrap();
                                        redundant_edges.push(redundant_edge);
                                    }
                                }
                            }
                        };
                    } else {
                        // Reached a node whose weight is `None`: its event is already executing
                        // (taken by `get_event`), but hasn't been completed. The current node
                        // will probably get added as a leaf. Add dependencies between parents
                        // and current event.
                        for parent in
                            forest.neighbors_directed(visited_node_idx, Direction::Incoming)
                        {
                            let parent_node = forest.node_weight(parent).unwrap().as_ref().unwrap();
                            if parent_node > &added_event {
                                parents.insert(parent);
                            }
                        }
                    }
                }
            }

            // Add the node into the forest. The timestamp is cloned first because the event is
            // moved into the graph.
            let event_timestamp: Timestamp = added_event.timestamp.clone();
            let event_idx: NodeIndex<u32> = forest.add_node(Some(added_event));

            // Add edges indicating dependencies.
            for child in children {
                forest.add_edge(event_idx, child, ());
            }
            for parent in parents {
                forest.add_edge(parent, event_idx, ());
            }

            // Remove redundant edges.
            for redundant_edge in redundant_edges {
                forest.remove_edge(redundant_edge).unwrap();
            }

            // Clean up the leaves and the run queue, if any.
            // TODO (Sukrit) :: BinaryHeap does not provide a way to remove an element that is not
            // at the top of the heap. So, this particularly costly implementation clones the
            // elements out of the earlier run_queue, clears the run_queue and initializes it
            // afresh with the set difference of the old run_queue and the nodes to remove.
            // Since the invocation of this code is hopefully rare, we can optimize it later.
            if !demoted_leaves.is_empty() {
                leaves.retain(|event| !demoted_leaves.contains(&event.node_index));
                // Reconstruct the run queue without the demoted entries.
                let old_run_queue: Vec<RunnableEvent> = run_queue.drain().collect();
                for event in old_run_queue {
                    if !demoted_leaves.contains(&event.node_index) {
                        run_queue.push(event);
                    }
                }
            }

            // If the added event depends on no others, then we can safely create a new leaf in the
            // forest and add the event to the run queue.
            if preceding_events.is_empty() {
                leaves.push(RunnableEvent::new(event_idx).with_timestamp(event_timestamp.clone()));
                run_queue.push(RunnableEvent::new(event_idx).with_timestamp(event_timestamp));
            }
        }

        // Soft backpressure signal: the lattice never blocks insertion, it only warns.
        if forest.node_count() > 100 {
            tracing::warn!(
                "{} operator events queued in lattice. Increase number of operator executors or \
                decrease incoming message frequency to reduce load.",
                forest.node_count()
            )
        }
    }
360
361 /// Retrieve an event to be executed from the lattice.
362 ///
363 /// This function retrieves an event that is not being executed by any other executor, along
364 /// with a unique identifier for the event. This unique identifier needs to be passed to the
365 /// [`ExecutionLattice::mark_as_completed`] function to remove the event from the lattice, and
366 /// ensure that its dependencies are runnable.
367 pub async fn get_event(&self) -> Option<(OperatorEvent, usize)> {
368 // Take locks over everything.
369 let mut forest = self.forest.lock().await;
370 let _leaves = self.leaves.lock().await;
371 let mut run_queue = self.run_queue.lock().await;
372
373 // Retrieve the event
374 match run_queue.pop() {
375 Some(runnable_event) => {
376 let event = forest[runnable_event.node_index].take();
377 Some((event.unwrap(), runnable_event.node_index.index()))
378 }
379 None => None,
380 }
381 }
382
383 /// Mark an event as completed, and break the dependency from this event to its parents.
384 ///
385 /// `event_id` is the unique identifer returned by the [`ExecutionLattice::get_event`]
386 /// invocation.
387 pub async fn mark_as_completed(&self, event_id: usize) {
388 // Take locks over everything.
389 let mut forest = self.forest.lock().await;
390 let mut leaves = self.leaves.lock().await;
391 let mut run_queue = self.run_queue.lock().await;
392
393 let node_idx: NodeIndex<u32> = NodeIndex::new(event_id);
394
395 // Throw an error if the item was not in the leaves.
396 let event = RunnableEvent::new(node_idx);
397 let event_idx = leaves
398 .iter()
399 .position(|e| e == &event)
400 .expect("Item must be in the leaves of the lattice.");
401 leaves.remove(event_idx);
402
403 // Capture the parents of the node.
404 let parent_ids: Vec<NodeIndex> = forest
405 .neighbors_directed(node_idx, Direction::Incoming)
406 .collect();
407
408 // Remove the node from the graph. This will also remove edges from the parents.
409 forest.remove_node(node_idx);
410
411 // Promote parents to leaves if they have no dependencies, and add their corresponding
412 // events to the run queue.
413 for parent_id in parent_ids {
414 if forest
415 .neighbors_directed(parent_id, Direction::Outgoing)
416 .count()
417 == 0
418 {
419 let timestamp: Timestamp = forest[parent_id].as_ref().unwrap().timestamp.clone();
420 let parent = RunnableEvent::new(parent_id).with_timestamp(timestamp);
421 leaves.push(parent.clone());
422 run_queue.push(parent);
423 }
424 }
425 }
426
427 /// Convert graph to string in DOT format.
428 #[allow(dead_code)]
429 pub async fn to_dot(&self) -> String {
430 // Lock the graph.
431 let forest = self.forest.lock().await;
432 let leaves = self.leaves.lock().await;
433 let graph = forest.map(
434 |nx, n| {
435 n.as_ref().map_or_else(
436 || {
437 leaves
438 .iter()
439 .find(|r| r.node_index == nx)
440 .map_or_else(|| "Executing".to_string(), |r| format!("Executing {}", r))
441 },
442 |x| x.to_string(),
443 )
444 },
445 |_, e| *e,
446 );
447 format!(
448 "{:?}",
449 Dot::with_config(&graph, &[dot::Config::EdgeNoLabel])
450 )
451 }
452}
453
454impl fmt::Debug for ExecutionLattice {
455 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
456 write!(f, "ExecutionLattice")
457 }
458}
459
460/*
461#[cfg(test)]
462mod test {
463 use super::*;
464 use crate::dataflow::Timestamp;
465 use futures::executor::block_on;
466
467 /// Test that a leaf gets added correctly to an empty lattice and that we can retrieve it from
468 /// the lattice.
469 #[test]
470 fn test_leaf_addition() {
471 let lattice: ExecutionLattice = ExecutionLattice::new();
472 let events = vec![OperatorEvent::new(
473 Timestamp::Time(vec![1]),
474 false,
475 0,
476 HashSet::new(),
477 HashSet::new(),
478 || (),
479 )];
480 block_on(lattice.add_events(events));
481
482 // Ensure that the correct event is returned by the lattice.
483 let (event, _event_id) = block_on(lattice.get_event()).unwrap();
484 assert_eq!(
485 event.timestamp,
486 Timestamp::Time(vec![1 as u64]),
487 "The wrong event was returned by the lattice."
488 );
489
490 // Ensure that only one event is returned by the lattice.
491 let next_event = block_on(lattice.get_event());
492 assert!(next_event.is_none(), "Expected no event from the lattice.");
493 }
494
495 /// Test that the addition of two messages of the same timestamp leads to no dependencies.
496 #[test]
497 fn test_concurrent_messages() {
498 let lattice: ExecutionLattice = ExecutionLattice::new();
499 let events = vec![
500 OperatorEvent::new(
501 Timestamp::Time(vec![1]),
502 false,
503 0,
504 HashSet::new(),
505 HashSet::new(),
506 || (),
507 ),
508 OperatorEvent::new(
509 Timestamp::Time(vec![1]),
510 false,
511 0,
512 HashSet::new(),
513 HashSet::new(),
514 || (),
515 ),
516 ];
517 block_on(lattice.add_events(events));
518
519 // Check the first event is returned correctly by the lattice.
520 let (event, _event_id) = block_on(lattice.get_event()).unwrap();
521 assert_eq!(
522 event.timestamp,
523 Timestamp::Time(vec![1 as u64]),
524 "The wrong event was returned by the lattice."
525 );
526
527 // Check that the other event is returned without marking the first one as completed.
528 // This shows that they can be executed concurrently.
529 let (event_2, _event_id_2) = block_on(lattice.get_event()).unwrap();
530 assert_eq!(
531 event_2.timestamp,
532 Timestamp::Time(vec![1 as u64]),
533 "The wrong event was returned by the lattice."
534 );
535 }
536
537 /// Test that the addition of two messages of same timestamp, with their watermark ensures that
538 /// the watermark runs after both of the messages are marked as finished executing.
539 #[test]
540 fn test_watermark_post_concurrent_messages() {
541 let lattice: ExecutionLattice = ExecutionLattice::new();
542 let events = vec![
543 OperatorEvent::new(
544 Timestamp::Time(vec![1]),
545 false,
546 0,
547 HashSet::new(),
548 HashSet::new(),
549 || (),
550 ),
551 OperatorEvent::new(
552 Timestamp::Time(vec![1]),
553 false,
554 0,
555 HashSet::new(),
556 HashSet::new(),
557 || (),
558 ),
559 OperatorEvent::new(
560 Timestamp::Time(vec![1]),
561 true,
562 0,
563 HashSet::new(),
564 HashSet::new(),
565 || (),
566 ),
567 ];
568 block_on(lattice.add_events(events));
569 // Check that the first event is returned correctly by the lattice.
570 let (event, event_id) = block_on(lattice.get_event()).unwrap();
571 assert!(
572 event.timestamp == Timestamp::Time(vec![1 as u64]) && !event.is_watermark_callback,
573 "The wrong event was returned by the lattice."
574 );
575
576 // Check that the first event is returned correctly by the lattice.
577 let (event_2, event_id_2) = block_on(lattice.get_event()).unwrap();
578 assert!(
579 event_2.timestamp == Timestamp::Time(vec![1 as u64]) && !event.is_watermark_callback,
580 "The wrong event was returned by the lattice."
581 );
582 let no_event = block_on(lattice.get_event());
583 assert!(no_event.is_none(), "Expected no event from the lattice.");
584
585 // Mark one of the event as completed, and still don't expect an event.
586 block_on(lattice.mark_as_completed(event_id));
587
588 let no_event_2 = block_on(lattice.get_event());
589 assert!(no_event_2.is_none(), "Expected no event from the lattice.");
590
591 // Mark the other as completed and expect a Watermark.
592 block_on(lattice.mark_as_completed(event_id_2));
593
594 let (event_3, _event_id_3) = block_on(lattice.get_event()).unwrap();
595 assert!(
596 event_3.timestamp == Timestamp::Time(vec![1 as u64]) && event_3.is_watermark_callback,
597 "The wrong event was returned by the lattice."
598 );
599 }
600
601 /// Test that the addition of three watermark messages in reverse order, leads to them being
602 /// executed in the correct order.
603 #[test]
604 fn test_unordered_watermark() {
605 let lattice: ExecutionLattice = ExecutionLattice::new();
606 let events = vec![
607 OperatorEvent::new(
608 Timestamp::Time(vec![3]),
609 true,
610 0,
611 HashSet::new(),
612 HashSet::new(),
613 || (),
614 ),
615 OperatorEvent::new(
616 Timestamp::Time(vec![2]),
617 true,
618 0,
619 HashSet::new(),
620 HashSet::new(),
621 || (),
622 ),
623 OperatorEvent::new(
624 Timestamp::Time(vec![1]),
625 true,
626 0,
627 HashSet::new(),
628 HashSet::new(),
629 || (),
630 ),
631 ];
632 block_on(lattice.add_events(events));
633
634 let (event, event_id) = block_on(lattice.get_event()).unwrap();
635 assert_eq!(
636 event.timestamp,
637 Timestamp::Time(vec![1 as u64]),
638 "The wrong event was returned by the lattice."
639 );
640 assert!(
641 block_on(lattice.get_event()).is_none(),
642 "The wrong event was returned by the lattice."
643 );
644 block_on(lattice.mark_as_completed(event_id));
645 let (event_2, event_id_2) = block_on(lattice.get_event()).unwrap();
646 assert_eq!(
647 event_2.timestamp,
648 Timestamp::Time(vec![2 as u64]),
649 "The wrong event was returned by the lattice."
650 );
651 assert!(
652 block_on(lattice.get_event()).is_none(),
653 "The wrong event was returned by the lattice."
654 );
655 block_on(lattice.mark_as_completed(event_id_2));
656 let (event_3, _event_id_3) = block_on(lattice.get_event()).unwrap();
657 assert_eq!(
658 event_3.timestamp,
659 Timestamp::Time(vec![3 as u64]),
660 "The wrong event was returned by the lattice."
661 );
662 assert!(
663 block_on(lattice.get_event()).is_none(),
664 "The wrong event was returned by the lattice."
665 );
666 }
667
668 /// Test that the addition of messages of different timestamps leads to concurrent execution.
669 #[test]
670 fn test_concurrent_messages_diff_timestamps() {
671 let lattice: ExecutionLattice = ExecutionLattice::new();
672 let events = vec![
673 OperatorEvent::new(
674 Timestamp::Time(vec![3]),
675 false,
676 0,
677 HashSet::new(),
678 HashSet::new(),
679 || (),
680 ),
681 OperatorEvent::new(
682 Timestamp::Time(vec![2]),
683 false,
684 0,
685 HashSet::new(),
686 HashSet::new(),
687 || (),
688 ),
689 OperatorEvent::new(
690 Timestamp::Time(vec![1]),
691 false,
692 0,
693 HashSet::new(),
694 HashSet::new(),
695 || (),
696 ),
697 ];
698 block_on(lattice.add_events(events));
699
700 let (event, _event_id) = block_on(lattice.get_event()).unwrap();
701 assert_eq!(
702 event.timestamp,
703 Timestamp::Time(vec![1 as u64]),
704 "The wrong event was returned by the lattice."
705 );
706 let (event_2, _event_id_2) = block_on(lattice.get_event()).unwrap();
707 assert_eq!(
708 event_2.timestamp,
709 Timestamp::Time(vec![2 as u64]),
710 "The wrong event was returned by the lattice."
711 );
712 let (event_3, _event_id_3) = block_on(lattice.get_event()).unwrap();
713 assert_eq!(
714 event_3.timestamp,
715 Timestamp::Time(vec![3 as u64]),
716 "The wrong event was returned by the lattice."
717 );
718 }
719
720 /// Test that concurrent messages are followed by their watermarks.
721 #[test]
722 fn test_concurrent_messages_watermarks_diff_timestamps() {
723 let lattice: ExecutionLattice = ExecutionLattice::new();
724 let events = vec![
725 OperatorEvent::new(
726 Timestamp::Time(vec![3]),
727 true,
728 0,
729 HashSet::new(),
730 HashSet::new(),
731 || (),
732 ),
733 OperatorEvent::new(
734 Timestamp::Time(vec![2]),
735 true,
736 0,
737 HashSet::new(),
738 HashSet::new(),
739 || (),
740 ),
741 OperatorEvent::new(
742 Timestamp::Time(vec![1]),
743 true,
744 0,
745 HashSet::new(),
746 HashSet::new(),
747 || (),
748 ),
749 OperatorEvent::new(
750 Timestamp::Time(vec![1]),
751 false,
752 0,
753 HashSet::new(),
754 HashSet::new(),
755 || (),
756 ),
757 OperatorEvent::new(
758 Timestamp::Time(vec![2]),
759 false,
760 0,
761 HashSet::new(),
762 HashSet::new(),
763 || (),
764 ),
765 OperatorEvent::new(
766 Timestamp::Time(vec![3]),
767 false,
768 0,
769 HashSet::new(),
770 HashSet::new(),
771 || (),
772 ),
773 ];
774 block_on(lattice.add_events(events));
775 let (event, event_id) = block_on(lattice.get_event()).unwrap();
776 assert!(
777 event.timestamp == Timestamp::Time(vec![1 as u64]) && !event.is_watermark_callback,
778 "The wrong event was returned by the lattice."
779 );
780 let (event_2, event_id_2) = block_on(lattice.get_event()).unwrap();
781 assert!(
782 event_2.timestamp == Timestamp::Time(vec![2 as u64]) && !event_2.is_watermark_callback,
783 "The wrong event was returned by the lattice."
784 );
785 let (event_3, event_id_3) = block_on(lattice.get_event()).unwrap();
786 assert!(
787 event_3.timestamp == Timestamp::Time(vec![3 as u64]) && !event_3.is_watermark_callback,
788 "The wrong event was returned by the lattice."
789 );
790 assert!(
791 block_on(lattice.get_event()).is_none(),
792 "The wrong event was returned by the lattice."
793 );
794 block_on(lattice.mark_as_completed(event_id));
795 let (event_4, event_id_4) = block_on(lattice.get_event()).unwrap();
796 assert!(
797 event_4.timestamp == Timestamp::Time(vec![1 as u64]) && event_4.is_watermark_callback,
798 "The wrong event was returned by the lattice."
799 );
800 assert!(
801 block_on(lattice.get_event()).is_none(),
802 "The wrong event was returned by the lattice."
803 );
804 block_on(lattice.mark_as_completed(event_id_4));
805 assert!(
806 block_on(lattice.get_event()).is_none(),
807 "The wrong event was returned by the lattice."
808 );
809 block_on(lattice.mark_as_completed(event_id_2));
810 let (event_5, event_id_5) = block_on(lattice.get_event()).unwrap();
811 assert!(
812 event_5.timestamp == Timestamp::Time(vec![2 as u64]) && event_5.is_watermark_callback,
813 "The wrong event was returned by the lattice."
814 );
815 block_on(lattice.mark_as_completed(event_id_3));
816 assert!(
817 block_on(lattice.get_event()).is_none(),
818 "The wrong event was returned by the lattice."
819 );
820 block_on(lattice.mark_as_completed(event_id_5));
821 let (event_6, event_id_6) = block_on(lattice.get_event()).unwrap();
822 assert!(
823 event_6.timestamp == Timestamp::Time(vec![3 as u64]) && event_6.is_watermark_callback,
824 "The wrong event was returned by the lattice."
825 );
826 block_on(lattice.mark_as_completed(event_id_6));
827 assert!(
828 block_on(lattice.get_event()).is_none(),
829 "The wrong event was returned by the lattice."
830 );
831 }
832
833 /// Tests that duplicate events do not end up in the lattice's leaves or
834 /// run queue. This can happen if duplicate edges exist in the dependency
835 /// graph.
836 #[test]
837 fn test_no_duplicates() {
838 let lattice = ExecutionLattice::new();
839 // Add 2 operators that can run concurrently.
840 let initial_events = vec![
841 OperatorEvent::new(
842 Timestamp::Time(vec![0]),
843 false,
844 0,
845 HashSet::new(),
846 HashSet::new(),
847 || {},
848 ),
849 OperatorEvent::new(
850 Timestamp::Time(vec![0]),
851 false,
852 0,
853 HashSet::new(),
854 HashSet::new(),
855 || {},
856 ),
857 ];
858 block_on(lattice.add_events(initial_events));
859
860 // Generate events A and B where B precedes A.
861 let event_a = OperatorEvent::new(
862 Timestamp::Time(vec![0]),
863 true,
864 20,
865 HashSet::new(),
866 HashSet::new(),
867 || {},
868 );
869 let event_b = OperatorEvent::new(
870 Timestamp::Time(vec![0]),
871 true,
872 0,
873 HashSet::new(),
874 HashSet::new(),
875 || {},
876 );
877 assert!(event_a > event_b, "Event B must precede event A.");
878
879 // Insert events in reverse order. Due to how the traversal of the
880 // dependency graph is performed, this can result in duplicate edges
881 // when using vectors instead of sets to store an inserted event's
882 // parents and children. Duplicate edges may result in duplicate
883 // attempts to run the same event.
884 block_on(lattice.add_events(vec![event_a]));
885 block_on(lattice.add_events(vec![event_b]));
886 // Dependency graph should be:
887 // -> C
888 // A -> B
889 // -> D
890
891 // Run events C and D
892 let (event_1, event_1_id) = block_on(lattice.get_event()).unwrap();
893 let (event_2, event_2_id) = block_on(lattice.get_event()).unwrap();
894 assert!(
895 !event_1.is_watermark_callback,
896 "Should process events C and D before watermark callbacks."
897 );
898 assert!(
899 !event_2.is_watermark_callback,
900 "Should process events C and D before watermark callbacks."
901 );
902 assert!(
903 block_on(lattice.get_event()).is_none(),
904 "No other events should run until C and D complete."
905 );
906 block_on(lattice.mark_as_completed(event_1_id));
907 assert!(
908 block_on(lattice.get_event()).is_none(),
909 "No other events should run until C and D complete."
910 );
911 block_on(lattice.mark_as_completed(event_2_id));
912
913 // Run event B.
914 let (event_b, event_b_id) = block_on(lattice.get_event()).unwrap();
915 assert_eq!(
916 event_b.priority, 0,
917 "Event B should run after events C and D."
918 );
919 assert!(
920 block_on(lattice.get_event()).is_none(),
921 "A should not run until B completes."
922 );
923 block_on(lattice.mark_as_completed(event_b_id));
924
925 // Run event A.
926 let (_event_a, event_a_id) = block_on(lattice.get_event()).unwrap();
927 block_on(lattice.mark_as_completed(event_a_id));
928
929 // No more events should be in the lattice.
930 assert!(
931 block_on(lattice.get_event()).is_none(),
932 "There should be no more events in the lattice."
933 );
934 }
935}
936*/