use ;
use crate::;
/// `OperatorType` is an enum that enumerates the types of operators in Rust.
/// The different operator types have different execution semantics for the message and watermark
/// callbacks dictated by the `Ord` trait on the `OperatorEvent`.
/// `OperatorEvent` is a structure that encapsulates a particular invocation of the
/// callback in response to a message or watermark. These events are processed according to the
/// partial order defined by the `PartialOrd` trait, where `x < y` implies `x` *precedes* `y`.
///
/// The event is passed to an instance of
/// [`OperatorExecutor`](../operator_executor/struct.OperatorExecutor.html)
/// which is in charge of inserting the event into a
/// [`ExecutionLattice`](../lattice/struct.ExecutionLattice.html). The `ExecutionLattice` ensures
/// that the events are processed in the partial order defined by the executor.
unsafe
// Implement the `Display` and `Debug` traits so that we can visualize the event.
// Implement traits to define the order in which the events should be executed.
// Make changes to the `cmp` function of the `Ord` trait to change the partial order of the events.
/// Ordering used in the lattice where `self < other` implies `self` *precedes* `other`.
/*
#[cfg(test)]
mod test {
use super::*;
/// This test ensures that two watermark messages are partially ordered based on their
/// timestamps, and the watermark with the lower timestamp is executed first.
#[test]
fn test_watermark_event_orderings() {
{
let watermark_event_a: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![1]),
true,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
let watermark_event_b: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![2]),
true,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
assert!(
watermark_event_a < watermark_event_b,
"A has a lower timestamp and should precede B."
);
}
// Test that priorities should break ties only for otherwise equal watermark callbacks.
{
let watermark_event_a: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![1]),
true,
-1,
HashSet::new(),
HashSet::new(),
|| (),
);
let watermark_event_b: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![1]),
true,
1,
HashSet::new(),
HashSet::new(),
|| (),
);
assert!(
watermark_event_a < watermark_event_b,
"A is higher priority and should precede B."
);
let watermark_event_c: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![0]),
true,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
assert!(
watermark_event_a > watermark_event_c,
"C has a smaller timestamp and should precede A."
);
assert!(
watermark_event_b > watermark_event_c,
"C has a smaller timestamp and should precede B."
);
let watermark_event_d: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![2]),
true,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
assert!(
watermark_event_a < watermark_event_d,
"D has a larger timestamp and should follow A."
);
assert!(
watermark_event_b < watermark_event_d,
"D has a larger timestamp and should follow B."
);
// Priority should not affect message events
let message_event_a: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![1]),
false,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
assert!(
message_event_a < watermark_event_a,
"Message A should precede Watermark A independent of priority."
);
assert!(
message_event_a < watermark_event_b,
"Message A should precede Watermark B independent of priority."
);
let message_event_b: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![2]),
false,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
assert_eq!(
watermark_event_a, message_event_b,
"Watermark A and Message B can execute concurrently."
);
assert_eq!(
watermark_event_b, message_event_b,
"Watermark B and Message B can execute concurrently."
);
}
}
/// This test ensures that two non-watermark messages are rendered equal in their partial order
/// and thus can be run concurrently by the executor.
#[test]
fn test_message_event_orderings() {
let message_event_a: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![1]),
false,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
let message_event_b: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![2]),
false,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
assert!(
message_event_a == message_event_b,
"Message A and Message B can run concurrently."
);
}
#[test]
fn test_message_watermark_event_orderings() {
// Test that a message with a timestamp less than the watermark ensures that the watermark
// is dependent on the message.
{
let message_event_a: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![1]),
false,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
let watermark_event_b: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![2]),
true,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
assert!(
message_event_a < watermark_event_b,
"Message A with timestamp 1 should precede Watermark B with timestamp 2."
);
}
// Test that a message with a timestamp equivalent to the watermark is run before the
// watermark.
{
let message_event_a: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![1]),
false,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
let watermark_event_b: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![1]),
true,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
assert!(
message_event_a < watermark_event_b,
"Message A with timestamp 1 should precede Watermark B with timestamp 1."
);
}
// Test that a message with a timestamp greater than a watermark can be run concurrently
// with a watermark of lesser timestamp.
{
let message_event_a: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![2]),
false,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
let watermark_event_b: OperatorEvent = OperatorEvent::new(
Timestamp::Time(vec![1]),
true,
0,
HashSet::new(),
HashSet::new(),
|| (),
);
assert!(
message_event_a == watermark_event_b,
"Message A with timestamp 2 and Watermark B with timestamp 1 can run concurrently."
);
}
}
#[test]
fn test_resolve_access_conflicts() {
let mut write_ids = HashSet::new();
write_ids.insert(Uuid::new_deterministic());
let event_a = OperatorEvent::new(
Timestamp::Time(vec![0]),
true,
0,
HashSet::new(),
write_ids.clone(),
|| {},
);
let event_b = OperatorEvent::new(
Timestamp::Time(vec![0]),
true,
1,
HashSet::new(),
write_ids.clone(),
|| {},
);
assert!(event_a < event_b, "A should precede B due to priority.");
let mut read_ids = HashSet::new();
read_ids.insert(Uuid::new_deterministic());
let event_c = OperatorEvent::new(
Timestamp::Time(vec![0]),
true,
0,
read_ids,
write_ids.clone(),
|| {},
);
assert!(
event_a < event_c,
"A should precede C because A has fewer dependencies."
);
let read_ids = write_ids.clone();
let event_d = OperatorEvent::new(
Timestamp::Time(vec![0]),
true,
0,
read_ids,
HashSet::new(),
|| {},
);
assert!(
event_a < event_d,
"A should precede D due to a WR conflict."
);
}
}
*/