use std::{cmp::Ordering, collections::BinaryHeap, time::Duration};
pub use crate::storage::StorageOperation;
/// An event that can be wrapped in a [`ScheduledEvent`] and placed on an [`EventQueue`].
///
/// [`Event::is_infrastructure_event`] singles out `ProcessRestart` and a subset of
/// `Connection` state changes as infrastructure events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
/// A timer event.
Timer {
/// Identifier of the task this timer belongs to (presumably the task to wake; confirm
/// against the consumer).
task_id: u64,
},
/// A network operation targeting a single connection.
Network {
/// Identifier of the connection the operation applies to.
connection_id: u64,
/// The operation itself; see [`NetworkOperation`].
operation: NetworkOperation,
},
/// A state change for a single connection.
Connection {
/// Identifier of the connection whose state changes.
id: u64,
/// The state change itself; see [`ConnectionStateChange`].
state: ConnectionStateChange,
},
/// A storage operation targeting a single file.
Storage {
/// Identifier of the file the operation applies to.
file_id: u64,
/// The operation itself; `StorageOperation` is re-exported from `crate::storage`.
operation: StorageOperation,
},
/// A shutdown event; carries no payload.
Shutdown,
/// A restart of the process at `ip`. Classified as an infrastructure event.
ProcessRestart {
/// Address identifying the process.
ip: std::net::IpAddr,
},
/// A graceful shutdown of the process at `ip`.
ProcessGracefulShutdown {
/// Address identifying the process.
ip: std::net::IpAddr,
/// Grace period; milliseconds going by the `_ms` suffix (exact semantics are defined by
/// the consumer).
grace_period_ms: u64,
/// Recovery delay; milliseconds going by the `_ms` suffix (presumably the wait before the
/// process comes back; confirm against the consumer).
recovery_delay_ms: u64,
},
/// A forced kill of the process at `ip`.
ProcessForceKill {
/// Address identifying the process.
ip: std::net::IpAddr,
/// Recovery delay; milliseconds going by the `_ms` suffix (presumably the wait before the
/// process comes back; confirm against the consumer).
recovery_delay_ms: u64,
},
}
impl Event {
    /// Returns `true` when this event is classified as an infrastructure event.
    ///
    /// The infrastructure events are:
    /// - [`Event::ProcessRestart`], and
    /// - [`Event::Connection`] events whose state change is `PartitionRestore`,
    ///   `SendPartitionClear`, `RecvPartitionClear` or `CutRestore`.
    ///
    /// Every other event, including all remaining connection state changes, yields `false`.
    pub fn is_infrastructure_event(&self) -> bool {
        match self {
            Event::ProcessRestart { .. } => true,
            // Only the restore/clear style state changes count; the connection id is irrelevant.
            Event::Connection { state, .. } => matches!(
                state,
                ConnectionStateChange::PartitionRestore
                    | ConnectionStateChange::SendPartitionClear
                    | ConnectionStateChange::RecvPartitionClear
                    | ConnectionStateChange::CutRestore
            ),
            _ => false,
        }
    }
}
/// The operation carried by an [`Event::Network`] event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkOperation {
/// Delivery of a chunk of bytes.
DataDelivery {
/// The bytes being delivered.
data: Vec<u8>,
},
/// Request to process a send buffer (presumably the connection's outgoing buffer; confirm
/// against the consumer).
ProcessSendBuffer,
/// Delivery of a FIN (presumably the connection-close marker; confirm against the consumer).
FinDelivery,
}
/// The state change carried by an [`Event::Connection`] event.
///
/// [`Event::is_infrastructure_event`] treats `CutRestore`, `PartitionRestore`,
/// `SendPartitionClear` and `RecvPartitionClear` as infrastructure events; the remaining
/// variants are not.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStateChange {
// Not classified as infrastructure events.
BindComplete,
ConnectionReady,
ClogClear,
ReadClogClear,
/// Classified as an infrastructure event.
CutRestore,
/// Classified as an infrastructure event.
PartitionRestore,
/// Classified as an infrastructure event.
SendPartitionClear,
/// Classified as an infrastructure event.
RecvPartitionClear,
// Not classified as an infrastructure event.
HalfOpenError,
}
/// An [`Event`] paired with the time it is scheduled for and a sequence number.
///
/// Note that the derived `PartialEq`/`Eq` compare all three fields, whereas the `Ord`
/// implementation for this type looks only at `time` and `sequence`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduledEvent {
/// Time the event is scheduled for; the primary ordering key.
time: Duration,
/// The event payload; ignored by the ordering.
event: Event,
/// Tie-breaker between events that share the same `time`.
pub sequence: u64,
}
impl ScheduledEvent {
pub fn new(time: Duration, event: Event, sequence: u64) -> Self {
Self {
time,
event,
sequence,
}
}
pub fn time(&self) -> Duration {
self.time
}
pub fn event(&self) -> &Event {
&self.event
}
pub fn into_event(self) -> Event {
self.event
}
}
/// Partial ordering that delegates to the `Ord` implementation, so the two always agree.
impl PartialOrd for ScheduledEvent {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
// Always `Some`: `cmp` yields a definite ordering for every pair of events.
Some(self.cmp(other))
}
}
/// Reversed ordering on `(time, sequence)`; the wrapped event does not take part.
///
/// An event compares as *greater* when its time is *smaller*, and, at equal times, when its
/// sequence number is *smaller*. Inside a max-heap such as `BinaryHeap` this puts the
/// earliest event (lowest sequence first among ties) at the top.
impl Ord for ScheduledEvent {
    fn cmp(&self, other: &Self) -> Ordering {
        // Operands are deliberately swapped (`other` vs `self`) to invert the natural order.
        let by_time = other.time.cmp(&self.time);
        by_time.then_with(|| other.sequence.cmp(&self.sequence))
    }
}
/// A queue of [`ScheduledEvent`]s backed by a [`BinaryHeap`].
///
/// `BinaryHeap` is a max-heap and `ScheduledEvent` orders itself in reverse, so the top of
/// the heap is the event with the smallest time (smallest sequence among equal times).
#[derive(Debug)]
pub struct EventQueue {
/// Pending events, kept in heap order.
heap: BinaryHeap<ScheduledEvent>,
}
impl EventQueue {
pub fn new() -> Self {
Self {
heap: BinaryHeap::new(),
}
}
pub fn schedule(&mut self, event: ScheduledEvent) {
self.heap.push(event);
}
pub fn pop_earliest(&mut self) -> Option<ScheduledEvent> {
self.heap.pop()
}
#[allow(dead_code)]
pub fn peek_earliest(&self) -> Option<&ScheduledEvent> {
self.heap.peek()
}
pub fn is_empty(&self) -> bool {
self.heap.is_empty()
}
pub fn len(&self) -> usize {
self.heap.len()
}
pub fn has_only_infrastructure_events(&self) -> bool {
self.heap
.iter()
.all(|scheduled_event| scheduled_event.event().is_infrastructure_event())
}
}
impl Default for EventQueue {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a timer event with the given task id.
    fn timer(task_id: u64) -> Event {
        Event::Timer { task_id }
    }

    /// Wraps `event` in a `ScheduledEvent` scheduled at `millis` milliseconds.
    fn at_millis(millis: u64, event: Event, sequence: u64) -> ScheduledEvent {
        ScheduledEvent::new(Duration::from_millis(millis), event, sequence)
    }

    /// Individual events are classified correctly and the queue-level check follows suit.
    #[test]
    fn test_infrastructure_event_detection() {
        let restore_event = Event::Connection {
            id: 1,
            state: ConnectionStateChange::PartitionRestore,
        };
        let timer_event = timer(1);
        let network_event = Event::Network {
            connection_id: 1,
            operation: NetworkOperation::DataDelivery {
                data: vec![1, 2, 3],
            },
        };

        // Per-event classification.
        assert!(restore_event.is_infrastructure_event());
        assert!(!timer_event.is_infrastructure_event());
        assert!(!network_event.is_infrastructure_event());
        assert!(!Event::Shutdown.is_infrastructure_event());

        // An empty queue counts as "only infrastructure events".
        let mut queue = EventQueue::new();
        assert!(queue.has_only_infrastructure_events());

        // Still true with a single infrastructure event queued...
        queue.schedule(at_millis(1_000, restore_event, 1));
        assert!(queue.has_only_infrastructure_events());

        // ...and false as soon as a non-infrastructure event joins it.
        queue.schedule(at_millis(2_000, timer_event, 2));
        assert!(!queue.has_only_infrastructure_events());

        // A queue holding just a network event is not infrastructure-only either.
        let mut network_only = EventQueue::new();
        network_only.schedule(at_millis(1_000, network_event, 1));
        assert!(!network_only.has_only_infrastructure_events());
    }

    /// Events come back in ascending time order regardless of insertion order.
    #[test]
    fn event_queue_ordering() {
        let mut queue = EventQueue::new();

        // (time in ms, task id, sequence), inserted out of time order.
        for (millis, task_id, sequence) in [(300, 3, 2), (100, 1, 0), (200, 2, 1)] {
            queue.schedule(at_millis(millis, timer(task_id), sequence));
        }

        for (millis, task_id) in [(100, 1), (200, 2), (300, 3)] {
            let popped = queue.pop_earliest().expect("should have event");
            assert_eq!(popped.time(), Duration::from_millis(millis));
            assert_eq!(popped.event(), &timer(task_id));
        }

        assert!(queue.is_empty());
    }

    /// Events sharing a timestamp come back in ascending sequence order.
    #[test]
    fn same_time_deterministic_ordering() {
        let mut queue = EventQueue::new();

        // (task id, sequence) pairs at one shared time, inserted out of sequence order.
        for (task_id, sequence) in [(3, 2), (1, 0), (2, 1)] {
            queue.schedule(at_millis(100, timer(task_id), sequence));
        }

        for (task_id, sequence) in [(1, 0), (2, 1), (3, 2)] {
            let popped = queue.pop_earliest().expect("should have event");
            assert_eq!(popped.event(), &timer(task_id));
            assert_eq!(popped.sequence, sequence);
        }

        assert!(queue.is_empty());
    }
}