// moonpool-sim 0.6.0
//
// Simulation engine for the moonpool framework
// Documentation
//! Event scheduling and processing for the simulation engine.
//!
//! This module provides the core event types and queue for scheduling
//! events in chronological order with deterministic ordering.

use std::{cmp::Ordering, collections::BinaryHeap, time::Duration};

pub use crate::storage::StorageOperation;

/// Events that can be scheduled in the simulation.
///
/// Most variants carry the identifier of the task, connection, file, or
/// process they refer to; [`Event::Shutdown`] carries no payload. Use
/// [`Event::is_infrastructure_event`] to tell bookkeeping events apart from
/// workload events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Event {
    /// Timer event for waking sleeping tasks
    Timer {
        /// The unique identifier for the task to wake.
        task_id: u64,
    },

    /// Network data operation on a single connection
    Network {
        /// The connection involved
        connection_id: u64,
        /// The operation type
        operation: NetworkOperation,
    },

    /// Connection state changes
    Connection {
        /// The connection or listener ID
        id: u64,
        /// The state change type
        state: ConnectionStateChange,
    },

    /// Storage I/O operations
    Storage {
        /// The file involved
        file_id: u64,
        /// The operation type
        operation: StorageOperation,
    },

    /// Shutdown event to wake all tasks for graceful termination
    Shutdown,

    /// Process restart event: a rebooted process is ready to boot again.
    ///
    /// Scheduled after a process is killed, at `now + recovery_delay`.
    /// The orchestrator handles this by calling the process factory
    /// and spawning a new `run()` task.
    ProcessRestart {
        /// The IP address of the process to restart.
        ip: std::net::IpAddr,
    },

    /// Graceful shutdown initiated for a process.
    ///
    /// Cancels the per-process shutdown token so the process can observe
    /// `ctx.shutdown().is_cancelled()` and perform cleanup. A
    /// [`ProcessForceKill`](Event::ProcessForceKill) is scheduled after the
    /// grace period expires.
    ProcessGracefulShutdown {
        /// The IP address of the process being gracefully shut down.
        ip: std::net::IpAddr,
        /// Grace period in milliseconds before force-kill.
        grace_period_ms: u64,
        /// Recovery delay in milliseconds after force-kill before restart.
        recovery_delay_ms: u64,
    },

    /// Force-kill a process after a graceful shutdown grace period.
    ///
    /// Aborts the process task and all its connections, then schedules a
    /// [`ProcessRestart`](Event::ProcessRestart) after a recovery delay.
    ProcessForceKill {
        /// The IP address of the process to force-kill.
        ip: std::net::IpAddr,
        /// Recovery delay in milliseconds before restart.
        recovery_delay_ms: u64,
    },
}

impl Event {
    /// Reports whether this event is simulation bookkeeping rather than workload activity.
    ///
    /// The following are classified as infrastructure:
    ///
    /// - [`Event::Connection`] whose state is `PartitionRestore`,
    ///   `SendPartitionClear`, `RecvPartitionClear`, or `CutRestore`
    /// - [`Event::ProcessRestart`]
    ///
    /// Every other event returns `false`. Infrastructure events can be safely
    /// ignored when deciding whether a simulation should terminate after its
    /// workloads complete.
    pub fn is_infrastructure_event(&self) -> bool {
        match self {
            Event::ProcessRestart { .. } => true,
            // Only the "restore / clear" transitions count; other connection
            // state changes are treated as workload activity.
            Event::Connection { state, .. } => matches!(
                state,
                ConnectionStateChange::PartitionRestore
                    | ConnectionStateChange::SendPartitionClear
                    | ConnectionStateChange::RecvPartitionClear
                    | ConnectionStateChange::CutRestore
            ),
            // Listed explicitly so that adding a variant forces a decision here.
            Event::Timer { .. }
            | Event::Network { .. }
            | Event::Storage { .. }
            | Event::Shutdown
            | Event::ProcessGracefulShutdown { .. }
            | Event::ProcessForceKill { .. } => false,
        }
    }
}

/// Network data operations.
///
/// Carried in the `operation` field of [`Event::Network`], alongside the
/// `connection_id` the operation applies to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkOperation {
    /// Deliver data to connection's receive buffer
    DataDelivery {
        /// The data bytes to deliver
        data: Vec<u8>,
    },
    /// Process next message from connection's send buffer
    ProcessSendBuffer,
    /// Deliver FIN (graceful close) to a connection's receive side.
    /// Scheduled after the last DataDelivery to ensure all data arrives first.
    FinDelivery,
}

/// Connection state changes.
///
/// Carried in the `state` field of [`Event::Connection`], alongside the
/// connection or listener `id` the change applies to.
///
/// `CutRestore`, `PartitionRestore`, `SendPartitionClear`, and
/// `RecvPartitionClear` are the states that
/// [`Event::is_infrastructure_event`] classifies as infrastructure.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStateChange {
    /// Listener bind operation completed
    BindComplete,
    /// Connection establishment completed
    ConnectionReady,
    /// Clear write clog for a connection
    ClogClear,
    /// Clear read clog for a connection
    ReadClogClear,
    /// Restore a temporarily cut connection
    CutRestore,
    /// Restore network partition between IPs
    PartitionRestore,
    /// Clear send partition for an IP
    SendPartitionClear,
    /// Clear receive partition for an IP
    RecvPartitionClear,
    /// Half-open connection starts returning errors
    HalfOpenError,
}

/// An event scheduled for execution at a specific simulation time.
///
/// Ordering (see the `Ord` impl) is by `time`, then by `sequence`; the `event`
/// payload does not take part in ordering.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScheduledEvent {
    /// Simulation time at which the event is scheduled to execute.
    time: Duration,
    /// The event payload.
    event: Event,
    /// Sequence number for deterministic ordering.
    ///
    /// Breaks ties between events scheduled at the same `time`: the lower
    /// sequence number compares as higher priority.
    pub sequence: u64,
}

impl ScheduledEvent {
    /// Creates a new scheduled event.
    pub fn new(time: Duration, event: Event, sequence: u64) -> Self {
        Self {
            time,
            event,
            sequence,
        }
    }

    /// Returns the scheduled execution time.
    pub fn time(&self) -> Duration {
        self.time
    }

    /// Returns a reference to the event.
    pub fn event(&self) -> &Event {
        &self.event
    }

    /// Consumes the scheduled event and returns the event.
    pub fn into_event(self) -> Event {
        self.event
    }
}

impl PartialOrd for ScheduledEvent {
    /// Delegates to the total order defined by [`Ord::cmp`]; never returns `None`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for ScheduledEvent {
    /// Orders events so that the *earliest* one compares as the *greatest*.
    ///
    /// `BinaryHeap` is a max-heap, so both comparisons are reversed
    /// (`other` vs `self`): an earlier `time` ranks higher, and among equal
    /// times a lower `sequence` ranks higher.
    ///
    /// NOTE(review): the `event` payload is not compared here, while the
    /// derived `PartialEq` does compare it. This presumably relies on
    /// sequence numbers being unique per queue; confirm against the caller
    /// that assigns them.
    fn cmp(&self, other: &Self) -> Ordering {
        other
            .time
            .cmp(&self.time)
            .then_with(|| other.sequence.cmp(&self.sequence))
    }
}

/// A priority queue for scheduling events in chronological order.
///
/// Events are processed in time order, with deterministic ordering for events
/// scheduled at the same time using sequence numbers.
#[derive(Debug)]
pub struct EventQueue {
    /// Pending events. `BinaryHeap` is a max-heap; the reversed `Ord` on
    /// `ScheduledEvent` makes the earliest event the heap's maximum.
    heap: BinaryHeap<ScheduledEvent>,
}

impl EventQueue {
    /// Creates a new empty event queue.
    pub fn new() -> Self {
        Self {
            heap: BinaryHeap::new(),
        }
    }

    /// Schedules an event for execution.
    pub fn schedule(&mut self, event: ScheduledEvent) {
        self.heap.push(event);
    }

    /// Removes and returns the earliest scheduled event.
    pub fn pop_earliest(&mut self) -> Option<ScheduledEvent> {
        self.heap.pop()
    }

    /// Returns a reference to the earliest scheduled event without removing it.
    #[allow(dead_code)]
    pub fn peek_earliest(&self) -> Option<&ScheduledEvent> {
        self.heap.peek()
    }

    /// Returns `true` if the queue is empty.
    pub fn is_empty(&self) -> bool {
        self.heap.is_empty()
    }

    /// Returns the number of events in the queue.
    pub fn len(&self) -> usize {
        self.heap.len()
    }

    /// Checks if the queue contains only infrastructure events (no workload events).
    ///
    /// Infrastructure events are those that maintain simulation state but don't
    /// represent actual application work (like connection restoration).
    /// Returns true if empty or contains only infrastructure events.
    pub fn has_only_infrastructure_events(&self) -> bool {
        self.heap
            .iter()
            .all(|scheduled_event| scheduled_event.event().is_infrastructure_event())
    }
}

impl Default for EventQueue {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `Event::is_infrastructure_event` on individual events and
    /// `EventQueue::has_only_infrastructure_events` on empty, infrastructure-only,
    /// and mixed queues.
    #[test]
    fn test_infrastructure_event_detection() {
        // Test Event::is_infrastructure_event() method
        let restore_event = Event::Connection {
            id: 1,
            state: ConnectionStateChange::PartitionRestore,
        };
        assert!(restore_event.is_infrastructure_event());

        let timer_event = Event::Timer { task_id: 1 };
        assert!(!timer_event.is_infrastructure_event());

        let network_event = Event::Network {
            connection_id: 1,
            operation: NetworkOperation::DataDelivery {
                data: vec![1, 2, 3],
            },
        };
        assert!(!network_event.is_infrastructure_event());

        let shutdown_event = Event::Shutdown;
        assert!(!shutdown_event.is_infrastructure_event());

        // Test EventQueue::has_only_infrastructure_events() method
        let mut queue = EventQueue::new();

        // Empty queue should be considered "only infrastructure"
        assert!(queue.has_only_infrastructure_events());

        // Queue with only a PartitionRestore connection event
        queue.schedule(ScheduledEvent::new(
            Duration::from_secs(1),
            restore_event,
            1,
        ));
        assert!(queue.has_only_infrastructure_events());

        // Queue with workload events should return false
        queue.schedule(ScheduledEvent::new(Duration::from_secs(2), timer_event, 2));
        assert!(!queue.has_only_infrastructure_events());

        // Queue with network events should return false
        let mut queue2 = EventQueue::new();
        queue2.schedule(ScheduledEvent::new(
            Duration::from_secs(1),
            network_event,
            1,
        ));
        assert!(!queue2.has_only_infrastructure_events());
    }

    /// Events scheduled out of order must be popped in ascending time order.
    #[test]
    fn event_queue_ordering() {
        let mut queue = EventQueue::new();

        // Schedule events out of chronological order (300ms, 100ms, 200ms)
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(300),
            Event::Timer { task_id: 3 },
            2,
        ));
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(100),
            Event::Timer { task_id: 1 },
            0,
        ));
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(200),
            Event::Timer { task_id: 2 },
            1,
        ));

        // Should pop in time order
        let event1 = queue.pop_earliest().expect("should have event");
        assert_eq!(event1.time(), Duration::from_millis(100));
        assert_eq!(event1.event(), &Event::Timer { task_id: 1 });

        let event2 = queue.pop_earliest().expect("should have event");
        assert_eq!(event2.time(), Duration::from_millis(200));
        assert_eq!(event2.event(), &Event::Timer { task_id: 2 });

        let event3 = queue.pop_earliest().expect("should have event");
        assert_eq!(event3.time(), Duration::from_millis(300));
        assert_eq!(event3.event(), &Event::Timer { task_id: 3 });

        assert!(queue.is_empty());
    }

    /// Events scheduled at the same time must be popped in ascending
    /// sequence-number order, regardless of insertion order.
    #[test]
    fn same_time_deterministic_ordering() {
        let mut queue = EventQueue::new();
        let same_time = Duration::from_millis(100);

        // Schedule multiple events at the same time with different sequence numbers
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 3 },
            2, // Later sequence
        ));
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 1 },
            0, // Earlier sequence
        ));
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 2 },
            1, // Middle sequence
        ));

        // Should pop in sequence order when times are equal
        let event1 = queue.pop_earliest().expect("should have event");
        assert_eq!(event1.event(), &Event::Timer { task_id: 1 });
        assert_eq!(event1.sequence, 0);

        let event2 = queue.pop_earliest().expect("should have event");
        assert_eq!(event2.event(), &Event::Timer { task_id: 2 });
        assert_eq!(event2.sequence, 1);

        let event3 = queue.pop_earliest().expect("should have event");
        assert_eq!(event3.event(), &Event::Timer { task_id: 3 });
        assert_eq!(event3.sequence, 2);

        assert!(queue.is_empty());
    }
}