// moonpool_sim/sim/events.rs

//! Event scheduling and processing for the simulation engine.
//!
//! This module provides the core event types and the queue used to schedule
//! events in chronological order with deterministic ordering.
5
use std::{cmp::Ordering, collections::BinaryHeap, time::Duration};

pub use crate::storage::StorageOperation;
9
10/// Events that can be scheduled in the simulation.
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub enum Event {
13    /// Timer event for waking sleeping tasks
14    Timer {
15        /// The unique identifier for the task to wake.
16        task_id: u64,
17    },
18
19    /// Network data operations
20    Network {
21        /// The connection involved
22        connection_id: u64,
23        /// The operation type
24        operation: NetworkOperation,
25    },
26
27    /// Connection state changes
28    Connection {
29        /// The connection or listener ID
30        id: u64,
31        /// The state change type
32        state: ConnectionStateChange,
33    },
34
35    /// Storage I/O operations
36    Storage {
37        /// The file involved
38        file_id: u64,
39        /// The operation type
40        operation: StorageOperation,
41    },
42
43    /// Shutdown event to wake all tasks for graceful termination
44    Shutdown,
45}
46
47impl Event {
48    /// Determines if this event is purely infrastructural (not workload-related).
49    ///
50    /// Infrastructure events maintain simulation state but don't represent actual
51    /// application work. These events can be safely ignored when determining if
52    /// a simulation should terminate after workloads complete.
53    pub fn is_infrastructure_event(&self) -> bool {
54        matches!(
55            self,
56            Event::Connection {
57                state: ConnectionStateChange::PartitionRestore
58                    | ConnectionStateChange::SendPartitionClear
59                    | ConnectionStateChange::RecvPartitionClear
60                    | ConnectionStateChange::CutRestore,
61                ..
62            } // Could add other infrastructure events here if needed:
63              // | Event::Connection { state: ConnectionStateChange::ClogClear, .. }
64        )
65    }
66}
67
/// Network data operations carried by [`Event::Network`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkOperation {
    /// Deliver data to the connection's receive buffer.
    DataDelivery {
        /// The data bytes to deliver.
        data: Vec<u8>,
    },
    /// Process the next message from the connection's send buffer.
    ProcessSendBuffer,
    /// Deliver FIN (graceful close) to a connection's receive side.
    ///
    /// Scheduled after the last `DataDelivery` to ensure all data arrives first.
    FinDelivery,
}
82
/// Connection state changes carried by [`Event::Connection`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStateChange {
    /// Listener bind operation completed.
    BindComplete,
    /// Connection establishment completed.
    ConnectionReady,
    /// Clear write clog for a connection.
    ClogClear,
    /// Clear read clog for a connection.
    ReadClogClear,
    /// Restore a temporarily cut connection.
    CutRestore,
    /// Restore network partition between IPs.
    PartitionRestore,
    /// Clear send partition for an IP.
    SendPartitionClear,
    /// Clear receive partition for an IP.
    RecvPartitionClear,
    /// Half-open connection starts returning errors.
    HalfOpenError,
}
105
106/// An event scheduled for execution at a specific simulation time.
107#[derive(Debug, Clone, PartialEq, Eq)]
108pub struct ScheduledEvent {
109    time: Duration,
110    event: Event,
111    /// Sequence number for deterministic ordering
112    pub sequence: u64,
113}
114
115impl ScheduledEvent {
116    /// Creates a new scheduled event.
117    pub fn new(time: Duration, event: Event, sequence: u64) -> Self {
118        Self {
119            time,
120            event,
121            sequence,
122        }
123    }
124
125    /// Returns the scheduled execution time.
126    pub fn time(&self) -> Duration {
127        self.time
128    }
129
130    /// Returns a reference to the event.
131    pub fn event(&self) -> &Event {
132        &self.event
133    }
134
135    /// Consumes the scheduled event and returns the event.
136    pub fn into_event(self) -> Event {
137        self.event
138    }
139}
140
141impl PartialOrd for ScheduledEvent {
142    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
143        Some(self.cmp(other))
144    }
145}
146
147impl Ord for ScheduledEvent {
148    fn cmp(&self, other: &Self) -> Ordering {
149        // BinaryHeap is a max heap, but we want earliest time first
150        // So we reverse the time comparison
151        match other.time.cmp(&self.time) {
152            Ordering::Equal => {
153                // For events at the same time, use sequence number for deterministic ordering
154                // Earlier sequence numbers should be processed first (also reversed for max heap)
155                other.sequence.cmp(&self.sequence)
156            }
157            other => other,
158        }
159    }
160}
161
162/// A priority queue for scheduling events in chronological order.
163///
164/// Events are processed in time order, with deterministic ordering for events
165/// scheduled at the same time using sequence numbers.
166#[derive(Debug)]
167pub struct EventQueue {
168    heap: BinaryHeap<ScheduledEvent>,
169}
170
171impl EventQueue {
172    /// Creates a new empty event queue.
173    pub fn new() -> Self {
174        Self {
175            heap: BinaryHeap::new(),
176        }
177    }
178
179    /// Schedules an event for execution.
180    pub fn schedule(&mut self, event: ScheduledEvent) {
181        self.heap.push(event);
182    }
183
184    /// Removes and returns the earliest scheduled event.
185    pub fn pop_earliest(&mut self) -> Option<ScheduledEvent> {
186        self.heap.pop()
187    }
188
189    /// Returns a reference to the earliest scheduled event without removing it.
190    #[allow(dead_code)]
191    pub fn peek_earliest(&self) -> Option<&ScheduledEvent> {
192        self.heap.peek()
193    }
194
195    /// Returns `true` if the queue is empty.
196    pub fn is_empty(&self) -> bool {
197        self.heap.is_empty()
198    }
199
200    /// Returns the number of events in the queue.
201    pub fn len(&self) -> usize {
202        self.heap.len()
203    }
204
205    /// Checks if the queue contains only infrastructure events (no workload events).
206    ///
207    /// Infrastructure events are those that maintain simulation state but don't
208    /// represent actual application work (like connection restoration).
209    /// Returns true if empty or contains only infrastructure events.
210    pub fn has_only_infrastructure_events(&self) -> bool {
211        self.heap
212            .iter()
213            .all(|scheduled_event| scheduled_event.event().is_infrastructure_event())
214    }
215}
216
217impl Default for EventQueue {
218    fn default() -> Self {
219        Self::new()
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers both `Event::is_infrastructure_event` and
    /// `EventQueue::has_only_infrastructure_events`.
    #[test]
    fn test_infrastructure_event_detection() {
        // Test Event::is_infrastructure_event() method
        let restore_event = Event::Connection {
            id: 1,
            state: ConnectionStateChange::PartitionRestore,
        };
        assert!(restore_event.is_infrastructure_event());

        // The remaining restore/clear states listed in the method also qualify.
        let connection = |state| Event::Connection { id: 1, state };
        assert!(connection(ConnectionStateChange::SendPartitionClear).is_infrastructure_event());
        assert!(connection(ConnectionStateChange::RecvPartitionClear).is_infrastructure_event());
        assert!(connection(ConnectionStateChange::CutRestore).is_infrastructure_event());

        // All other connection state changes count as workload activity.
        assert!(!connection(ConnectionStateChange::BindComplete).is_infrastructure_event());
        assert!(!connection(ConnectionStateChange::ConnectionReady).is_infrastructure_event());
        assert!(!connection(ConnectionStateChange::ClogClear).is_infrastructure_event());
        assert!(!connection(ConnectionStateChange::ReadClogClear).is_infrastructure_event());
        assert!(!connection(ConnectionStateChange::HalfOpenError).is_infrastructure_event());

        let timer_event = Event::Timer { task_id: 1 };
        assert!(!timer_event.is_infrastructure_event());

        let network_event = Event::Network {
            connection_id: 1,
            operation: NetworkOperation::DataDelivery {
                data: vec![1, 2, 3],
            },
        };
        assert!(!network_event.is_infrastructure_event());

        let shutdown_event = Event::Shutdown;
        assert!(!shutdown_event.is_infrastructure_event());

        // Test EventQueue::has_only_infrastructure_events() method
        let mut queue = EventQueue::new();

        // Empty queue should be considered "only infrastructure"
        assert!(queue.has_only_infrastructure_events());

        // Queue with only a PartitionRestore event
        queue.schedule(ScheduledEvent::new(
            Duration::from_secs(1),
            restore_event,
            1,
        ));
        assert!(queue.has_only_infrastructure_events());

        // Queue with workload events should return false
        queue.schedule(ScheduledEvent::new(Duration::from_secs(2), timer_event, 2));
        assert!(!queue.has_only_infrastructure_events());

        // Queue with network events should return false
        let mut queue2 = EventQueue::new();
        queue2.schedule(ScheduledEvent::new(
            Duration::from_secs(1),
            network_event,
            1,
        ));
        assert!(!queue2.has_only_infrastructure_events());
    }

    /// Events scheduled out of order must pop in ascending time order.
    #[test]
    fn event_queue_ordering() {
        let mut queue = EventQueue::new();
        assert!(queue.is_empty());
        assert!(queue.peek_earliest().is_none());

        // Schedule events in random order
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(300),
            Event::Timer { task_id: 3 },
            2,
        ));
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(100),
            Event::Timer { task_id: 1 },
            0,
        ));
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(200),
            Event::Timer { task_id: 2 },
            1,
        ));

        assert_eq!(queue.len(), 3);
        // Peeking reports the earliest event without removing it.
        assert_eq!(
            queue.peek_earliest().map(ScheduledEvent::time),
            Some(Duration::from_millis(100))
        );
        assert_eq!(queue.len(), 3);

        // Should pop in time order
        let event1 = queue.pop_earliest().expect("should have event");
        assert_eq!(event1.time(), Duration::from_millis(100));
        assert_eq!(event1.event(), &Event::Timer { task_id: 1 });

        let event2 = queue.pop_earliest().expect("should have event");
        assert_eq!(event2.time(), Duration::from_millis(200));
        assert_eq!(event2.event(), &Event::Timer { task_id: 2 });

        let event3 = queue.pop_earliest().expect("should have event");
        assert_eq!(event3.time(), Duration::from_millis(300));
        assert_eq!(event3.event(), &Event::Timer { task_id: 3 });

        assert!(queue.is_empty());
    }

    /// Events sharing a time must pop in ascending sequence order.
    #[test]
    fn same_time_deterministic_ordering() {
        let mut queue = EventQueue::new();
        let same_time = Duration::from_millis(100);

        // Schedule multiple events at the same time with different sequence numbers
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 3 },
            2, // Later sequence
        ));
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 1 },
            0, // Earlier sequence
        ));
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 2 },
            1, // Middle sequence
        ));

        // Should pop in sequence order when times are equal
        let event1 = queue.pop_earliest().expect("should have event");
        assert_eq!(event1.event(), &Event::Timer { task_id: 1 });
        assert_eq!(event1.sequence, 0);

        let event2 = queue.pop_earliest().expect("should have event");
        assert_eq!(event2.event(), &Event::Timer { task_id: 2 });
        assert_eq!(event2.sequence, 1);

        let event3 = queue.pop_earliest().expect("should have event");
        assert_eq!(event3.event(), &Event::Timer { task_id: 3 });
        assert_eq!(event3.sequence, 2);

        assert!(queue.is_empty());
    }
}