Skip to main content

moonpool_sim/sim/
events.rs

//! Event scheduling and processing for the simulation engine.
//!
//! This module provides the core event types and queue for scheduling
//! events in chronological order with deterministic ordering.
5
6use std::{cmp::Ordering, collections::BinaryHeap, time::Duration};
7
8pub use crate::storage::StorageOperation;
9
10/// Events that can be scheduled in the simulation.
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub enum Event {
13    /// Timer event for waking sleeping tasks
14    Timer {
15        /// The unique identifier for the task to wake.
16        task_id: u64,
17    },
18
19    /// Network data operations
20    Network {
21        /// The connection involved
22        connection_id: u64,
23        /// The operation type
24        operation: NetworkOperation,
25    },
26
27    /// Connection state changes
28    Connection {
29        /// The connection or listener ID
30        id: u64,
31        /// The state change type
32        state: ConnectionStateChange,
33    },
34
35    /// Storage I/O operations
36    Storage {
37        /// The file involved
38        file_id: u64,
39        /// The operation type
40        operation: StorageOperation,
41    },
42
43    /// Shutdown event to wake all tasks for graceful termination
44    Shutdown,
45
46    /// Process restart event: a rebooted process is ready to boot again.
47    ///
48    /// Scheduled after a process is killed, at `now + recovery_delay`.
49    /// The orchestrator handles this by calling the process factory
50    /// and spawning a new `run()` task.
51    ProcessRestart {
52        /// The IP address of the process to restart.
53        ip: std::net::IpAddr,
54    },
55
56    /// Graceful shutdown initiated for a process.
57    ///
58    /// Cancels the per-process shutdown token so the process can observe
59    /// `ctx.shutdown().is_cancelled()` and perform cleanup. A
60    /// [`ProcessForceKill`](Event::ProcessForceKill) is scheduled after the
61    /// grace period expires.
62    ProcessGracefulShutdown {
63        /// The IP address of the process being gracefully shut down.
64        ip: std::net::IpAddr,
65        /// Grace period in milliseconds before force-kill.
66        grace_period_ms: u64,
67        /// Recovery delay in milliseconds after force-kill before restart.
68        recovery_delay_ms: u64,
69    },
70
71    /// Force-kill a process after a graceful shutdown grace period.
72    ///
73    /// Aborts the process task and all its connections, then schedules a
74    /// [`ProcessRestart`](Event::ProcessRestart) after a recovery delay.
75    ProcessForceKill {
76        /// The IP address of the process to force-kill.
77        ip: std::net::IpAddr,
78        /// Recovery delay in milliseconds before restart.
79        recovery_delay_ms: u64,
80    },
81}
82
83impl Event {
84    /// Determines if this event is purely infrastructural (not workload-related).
85    ///
86    /// Infrastructure events maintain simulation state but don't represent actual
87    /// application work. These events can be safely ignored when determining if
88    /// a simulation should terminate after workloads complete.
89    pub fn is_infrastructure_event(&self) -> bool {
90        matches!(
91            self,
92            Event::Connection {
93                state: ConnectionStateChange::PartitionRestore
94                    | ConnectionStateChange::SendPartitionClear
95                    | ConnectionStateChange::RecvPartitionClear
96                    | ConnectionStateChange::CutRestore,
97                ..
98            } | Event::ProcessRestart { .. }
99        )
100    }
101}
102
/// Network data operations carried by a network event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkOperation {
    /// Deliver data to connection's receive buffer.
    DataDelivery {
        /// The data bytes to deliver.
        data: Vec<u8>,
    },
    /// Process next message from connection's send buffer.
    ProcessSendBuffer,
    /// Deliver FIN (graceful close) to a connection's receive side.
    ///
    /// Scheduled after the last `DataDelivery` to ensure all data arrives first.
    FinDelivery,
}
117
/// Connection state changes carried by a connection event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStateChange {
    /// Listener bind operation completed.
    BindComplete,
    /// Connection establishment completed.
    ConnectionReady,
    /// Clear write clog for a connection.
    ClogClear,
    /// Clear read clog for a connection.
    ReadClogClear,
    /// Restore a temporarily cut connection.
    CutRestore,
    /// Restore network partition between IPs.
    PartitionRestore,
    /// Clear send partition for an IP.
    SendPartitionClear,
    /// Clear receive partition for an IP.
    RecvPartitionClear,
    /// Half-open connection starts returning errors.
    HalfOpenError,
}
140
141/// An event scheduled for execution at a specific simulation time.
142#[derive(Debug, Clone, PartialEq, Eq)]
143pub struct ScheduledEvent {
144    time: Duration,
145    event: Event,
146    /// Sequence number for deterministic ordering
147    pub sequence: u64,
148}
149
150impl ScheduledEvent {
151    /// Creates a new scheduled event.
152    pub fn new(time: Duration, event: Event, sequence: u64) -> Self {
153        Self {
154            time,
155            event,
156            sequence,
157        }
158    }
159
160    /// Returns the scheduled execution time.
161    pub fn time(&self) -> Duration {
162        self.time
163    }
164
165    /// Returns a reference to the event.
166    pub fn event(&self) -> &Event {
167        &self.event
168    }
169
170    /// Consumes the scheduled event and returns the event.
171    pub fn into_event(self) -> Event {
172        self.event
173    }
174}
175
176impl PartialOrd for ScheduledEvent {
177    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
178        Some(self.cmp(other))
179    }
180}
181
182impl Ord for ScheduledEvent {
183    fn cmp(&self, other: &Self) -> Ordering {
184        // BinaryHeap is a max heap, but we want earliest time first
185        // So we reverse the time comparison
186        match other.time.cmp(&self.time) {
187            Ordering::Equal => {
188                // For events at the same time, use sequence number for deterministic ordering
189                // Earlier sequence numbers should be processed first (also reversed for max heap)
190                other.sequence.cmp(&self.sequence)
191            }
192            other => other,
193        }
194    }
195}
196
197/// A priority queue for scheduling events in chronological order.
198///
199/// Events are processed in time order, with deterministic ordering for events
200/// scheduled at the same time using sequence numbers.
201#[derive(Debug)]
202pub struct EventQueue {
203    heap: BinaryHeap<ScheduledEvent>,
204}
205
206impl EventQueue {
207    /// Creates a new empty event queue.
208    pub fn new() -> Self {
209        Self {
210            heap: BinaryHeap::new(),
211        }
212    }
213
214    /// Schedules an event for execution.
215    pub fn schedule(&mut self, event: ScheduledEvent) {
216        self.heap.push(event);
217    }
218
219    /// Removes and returns the earliest scheduled event.
220    pub fn pop_earliest(&mut self) -> Option<ScheduledEvent> {
221        self.heap.pop()
222    }
223
224    /// Returns a reference to the earliest scheduled event without removing it.
225    #[allow(dead_code)]
226    pub fn peek_earliest(&self) -> Option<&ScheduledEvent> {
227        self.heap.peek()
228    }
229
230    /// Returns `true` if the queue is empty.
231    pub fn is_empty(&self) -> bool {
232        self.heap.is_empty()
233    }
234
235    /// Returns the number of events in the queue.
236    pub fn len(&self) -> usize {
237        self.heap.len()
238    }
239
240    /// Checks if the queue contains only infrastructure events (no workload events).
241    ///
242    /// Infrastructure events are those that maintain simulation state but don't
243    /// represent actual application work (like connection restoration).
244    /// Returns true if empty or contains only infrastructure events.
245    pub fn has_only_infrastructure_events(&self) -> bool {
246        self.heap
247            .iter()
248            .all(|scheduled_event| scheduled_event.event().is_infrastructure_event())
249    }
250}
251
252impl Default for EventQueue {
253    fn default() -> Self {
254        Self::new()
255    }
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers both `Event::is_infrastructure_event` and
    /// `EventQueue::has_only_infrastructure_events`.
    #[test]
    fn test_infrastructure_event_detection() {
        // Event::is_infrastructure_event()
        let restore_event = Event::Connection {
            id: 1,
            state: ConnectionStateChange::PartitionRestore,
        };
        assert!(restore_event.is_infrastructure_event());

        let restart_event = Event::ProcessRestart {
            ip: std::net::IpAddr::from([10, 0, 0, 1]),
        };
        assert!(restart_event.is_infrastructure_event());

        let timer_event = Event::Timer { task_id: 1 };
        assert!(!timer_event.is_infrastructure_event());

        let network_event = Event::Network {
            connection_id: 1,
            operation: NetworkOperation::DataDelivery {
                data: vec![1, 2, 3],
            },
        };
        assert!(!network_event.is_infrastructure_event());

        let shutdown_event = Event::Shutdown;
        assert!(!shutdown_event.is_infrastructure_event());

        // EventQueue::has_only_infrastructure_events()
        let mut queue = EventQueue::new();

        // Empty queue should be considered "only infrastructure"
        assert!(queue.has_only_infrastructure_events());

        // Queue with only a PartitionRestore connection event
        queue.schedule(ScheduledEvent::new(
            Duration::from_secs(1),
            restore_event,
            1,
        ));
        assert!(queue.has_only_infrastructure_events());

        // Queue with workload events should return false
        queue.schedule(ScheduledEvent::new(Duration::from_secs(2), timer_event, 2));
        assert!(!queue.has_only_infrastructure_events());

        // Queue with network events should return false
        let mut queue2 = EventQueue::new();
        queue2.schedule(ScheduledEvent::new(
            Duration::from_secs(1),
            network_event,
            1,
        ));
        assert!(!queue2.has_only_infrastructure_events());
    }

    /// Events scheduled out of order must come back in ascending time order.
    #[test]
    fn event_queue_ordering() {
        let mut queue = EventQueue::new();

        // Schedule events out of chronological order
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(300),
            Event::Timer { task_id: 3 },
            2,
        ));
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(100),
            Event::Timer { task_id: 1 },
            0,
        ));
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(200),
            Event::Timer { task_id: 2 },
            1,
        ));

        // Should pop in time order
        let event1 = queue.pop_earliest().expect("should have event");
        assert_eq!(event1.time(), Duration::from_millis(100));
        assert_eq!(event1.event(), &Event::Timer { task_id: 1 });

        let event2 = queue.pop_earliest().expect("should have event");
        assert_eq!(event2.time(), Duration::from_millis(200));
        assert_eq!(event2.event(), &Event::Timer { task_id: 2 });

        let event3 = queue.pop_earliest().expect("should have event");
        assert_eq!(event3.time(), Duration::from_millis(300));
        assert_eq!(event3.event(), &Event::Timer { task_id: 3 });

        assert!(queue.is_empty());
    }

    /// Events sharing a timestamp must come back in ascending sequence order.
    #[test]
    fn same_time_deterministic_ordering() {
        let mut queue = EventQueue::new();
        let same_time = Duration::from_millis(100);

        // Schedule multiple events at the same time with different sequence numbers
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 3 },
            2, // Later sequence
        ));
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 1 },
            0, // Earlier sequence
        ));
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 2 },
            1, // Middle sequence
        ));

        // Should pop in sequence order when times are equal
        let event1 = queue.pop_earliest().expect("should have event");
        assert_eq!(event1.event(), &Event::Timer { task_id: 1 });
        assert_eq!(event1.sequence, 0);

        let event2 = queue.pop_earliest().expect("should have event");
        assert_eq!(event2.event(), &Event::Timer { task_id: 2 });
        assert_eq!(event2.sequence, 1);

        let event3 = queue.pop_earliest().expect("should have event");
        assert_eq!(event3.event(), &Event::Timer { task_id: 3 });
        assert_eq!(event3.sequence, 2);

        assert!(queue.is_empty());
    }
}