moonpool_sim/sim/
events.rs

1//! Event scheduling and processing for the simulation engine.
2//!
3//! This module provides the core event types and queue for scheduling
4//! events in chronological order with deterministic ordering.
5
6use std::{cmp::Ordering, collections::BinaryHeap, time::Duration};
7
8/// Events that can be scheduled in the simulation.
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub enum Event {
11    /// Timer event for waking sleeping tasks
12    Timer {
13        /// The unique identifier for the task to wake.
14        task_id: u64,
15    },
16
17    /// Network data operations
18    Network {
19        /// The connection involved
20        connection_id: u64,
21        /// The operation type
22        operation: NetworkOperation,
23    },
24
25    /// Connection state changes
26    Connection {
27        /// The connection or listener ID
28        id: u64,
29        /// The state change type
30        state: ConnectionStateChange,
31    },
32
33    /// Shutdown event to wake all tasks for graceful termination
34    Shutdown,
35}
36
37impl Event {
38    /// Determines if this event is purely infrastructural (not workload-related).
39    ///
40    /// Infrastructure events maintain simulation state but don't represent actual
41    /// application work. These events can be safely ignored when determining if
42    /// a simulation should terminate after workloads complete.
43    pub fn is_infrastructure_event(&self) -> bool {
44        matches!(
45            self,
46            Event::Connection {
47                state: ConnectionStateChange::PartitionRestore
48                    | ConnectionStateChange::SendPartitionClear
49                    | ConnectionStateChange::RecvPartitionClear
50                    | ConnectionStateChange::CutRestore,
51                ..
52            } // Could add other infrastructure events here if needed:
53              // | Event::Connection { state: ConnectionStateChange::ClogClear, .. }
54        )
55    }
56}
57
/// The data operations that a [`Event::Network`] event can carry.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum NetworkOperation {
    /// Delivers bytes into the connection's receive buffer.
    DataDelivery {
        /// The bytes to deliver.
        data: Vec<u8>,
    },
    /// Processes the next message in the connection's send buffer.
    ProcessSendBuffer,
}
69
/// The state changes that a [`Event::Connection`] event can carry.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConnectionStateChange {
    /// A listener's bind operation has completed.
    BindComplete,
    /// Connection establishment has completed.
    ConnectionReady,
    /// Clears the write clog on a connection.
    ClogClear,
    /// Clears the read clog on a connection.
    ReadClogClear,
    /// Restores a connection that was temporarily cut.
    CutRestore,
    /// Restores a network partition between IPs.
    PartitionRestore,
    /// Clears the send partition for an IP.
    SendPartitionClear,
    /// Clears the receive partition for an IP.
    RecvPartitionClear,
    /// A half-open connection starts returning errors.
    HalfOpenError,
}
92
93/// An event scheduled for execution at a specific simulation time.
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct ScheduledEvent {
96    time: Duration,
97    event: Event,
98    /// Sequence number for deterministic ordering
99    pub sequence: u64,
100}
101
102impl ScheduledEvent {
103    /// Creates a new scheduled event.
104    pub fn new(time: Duration, event: Event, sequence: u64) -> Self {
105        Self {
106            time,
107            event,
108            sequence,
109        }
110    }
111
112    /// Returns the scheduled execution time.
113    pub fn time(&self) -> Duration {
114        self.time
115    }
116
117    /// Returns a reference to the event.
118    pub fn event(&self) -> &Event {
119        &self.event
120    }
121
122    /// Consumes the scheduled event and returns the event.
123    pub fn into_event(self) -> Event {
124        self.event
125    }
126}
127
128impl PartialOrd for ScheduledEvent {
129    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
130        Some(self.cmp(other))
131    }
132}
133
134impl Ord for ScheduledEvent {
135    fn cmp(&self, other: &Self) -> Ordering {
136        // BinaryHeap is a max heap, but we want earliest time first
137        // So we reverse the time comparison
138        match other.time.cmp(&self.time) {
139            Ordering::Equal => {
140                // For events at the same time, use sequence number for deterministic ordering
141                // Earlier sequence numbers should be processed first (also reversed for max heap)
142                other.sequence.cmp(&self.sequence)
143            }
144            other => other,
145        }
146    }
147}
148
149/// A priority queue for scheduling events in chronological order.
150///
151/// Events are processed in time order, with deterministic ordering for events
152/// scheduled at the same time using sequence numbers.
153#[derive(Debug)]
154pub struct EventQueue {
155    heap: BinaryHeap<ScheduledEvent>,
156}
157
158impl EventQueue {
159    /// Creates a new empty event queue.
160    pub fn new() -> Self {
161        Self {
162            heap: BinaryHeap::new(),
163        }
164    }
165
166    /// Schedules an event for execution.
167    pub fn schedule(&mut self, event: ScheduledEvent) {
168        self.heap.push(event);
169    }
170
171    /// Removes and returns the earliest scheduled event.
172    pub fn pop_earliest(&mut self) -> Option<ScheduledEvent> {
173        self.heap.pop()
174    }
175
176    /// Returns a reference to the earliest scheduled event without removing it.
177    #[allow(dead_code)]
178    pub fn peek_earliest(&self) -> Option<&ScheduledEvent> {
179        self.heap.peek()
180    }
181
182    /// Returns `true` if the queue is empty.
183    pub fn is_empty(&self) -> bool {
184        self.heap.is_empty()
185    }
186
187    /// Returns the number of events in the queue.
188    pub fn len(&self) -> usize {
189        self.heap.len()
190    }
191
192    /// Checks if the queue contains only infrastructure events (no workload events).
193    ///
194    /// Infrastructure events are those that maintain simulation state but don't
195    /// represent actual application work (like connection restoration).
196    /// Returns true if empty or contains only infrastructure events.
197    pub fn has_only_infrastructure_events(&self) -> bool {
198        self.heap
199            .iter()
200            .all(|scheduled_event| scheduled_event.event().is_infrastructure_event())
201    }
202}
203
204impl Default for EventQueue {
205    fn default() -> Self {
206        Self::new()
207    }
208}
209
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps `state` in a connection event with an arbitrary id.
    fn connection_event(state: ConnectionStateChange) -> Event {
        Event::Connection { id: 1, state }
    }

    #[test]
    fn test_infrastructure_event_detection() {
        // Each state listed in `Event::is_infrastructure_event` must be detected.
        let infrastructure_states = [
            ConnectionStateChange::PartitionRestore,
            ConnectionStateChange::SendPartitionClear,
            ConnectionStateChange::RecvPartitionClear,
            ConnectionStateChange::CutRestore,
        ];
        for state in infrastructure_states.iter() {
            assert!(
                connection_event(state.clone()).is_infrastructure_event(),
                "{:?} should be an infrastructure event",
                state
            );
        }

        // All remaining connection state changes count as workload events.
        let workload_states = [
            ConnectionStateChange::BindComplete,
            ConnectionStateChange::ConnectionReady,
            ConnectionStateChange::ClogClear,
            ConnectionStateChange::ReadClogClear,
            ConnectionStateChange::HalfOpenError,
        ];
        for state in workload_states.iter() {
            assert!(
                !connection_event(state.clone()).is_infrastructure_event(),
                "{:?} should not be an infrastructure event",
                state
            );
        }

        let restore_event = connection_event(ConnectionStateChange::PartitionRestore);

        let timer_event = Event::Timer { task_id: 1 };
        assert!(!timer_event.is_infrastructure_event());

        let network_event = Event::Network {
            connection_id: 1,
            operation: NetworkOperation::DataDelivery {
                data: vec![1, 2, 3],
            },
        };
        assert!(!network_event.is_infrastructure_event());

        let send_buffer_event = Event::Network {
            connection_id: 1,
            operation: NetworkOperation::ProcessSendBuffer,
        };
        assert!(!send_buffer_event.is_infrastructure_event());

        let shutdown_event = Event::Shutdown;
        assert!(!shutdown_event.is_infrastructure_event());

        // Test EventQueue::has_only_infrastructure_events() method
        let mut queue = EventQueue::new();

        // Empty queue should be considered "only infrastructure"
        assert!(queue.has_only_infrastructure_events());

        // Queue holding only a PartitionRestore event
        queue.schedule(ScheduledEvent::new(
            Duration::from_secs(1),
            restore_event,
            1,
        ));
        assert!(queue.has_only_infrastructure_events());

        // Adding a workload (timer) event should flip the result to false
        queue.schedule(ScheduledEvent::new(Duration::from_secs(2), timer_event, 2));
        assert!(!queue.has_only_infrastructure_events());

        // Queue with only a network event should return false
        let mut queue2 = EventQueue::new();
        queue2.schedule(ScheduledEvent::new(
            Duration::from_secs(1),
            network_event,
            1,
        ));
        assert!(!queue2.has_only_infrastructure_events());
    }

    #[test]
    fn event_queue_ordering() {
        let mut queue = EventQueue::new();

        // Schedule events out of chronological order
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(300),
            Event::Timer { task_id: 3 },
            2,
        ));
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(100),
            Event::Timer { task_id: 1 },
            0,
        ));
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(200),
            Event::Timer { task_id: 2 },
            1,
        ));

        // Should pop in time order
        let event1 = queue.pop_earliest().expect("should have event");
        assert_eq!(event1.time(), Duration::from_millis(100));
        assert_eq!(event1.event(), &Event::Timer { task_id: 1 });

        let event2 = queue.pop_earliest().expect("should have event");
        assert_eq!(event2.time(), Duration::from_millis(200));
        assert_eq!(event2.event(), &Event::Timer { task_id: 2 });

        let event3 = queue.pop_earliest().expect("should have event");
        assert_eq!(event3.time(), Duration::from_millis(300));
        assert_eq!(event3.event(), &Event::Timer { task_id: 3 });

        assert!(queue.is_empty());
    }

    #[test]
    fn same_time_deterministic_ordering() {
        let mut queue = EventQueue::new();
        let same_time = Duration::from_millis(100);

        // Schedule multiple events at the same time with different sequence numbers
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 3 },
            2, // Later sequence
        ));
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 1 },
            0, // Earlier sequence
        ));
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 2 },
            1, // Middle sequence
        ));

        // Should pop in sequence order when times are equal
        let event1 = queue.pop_earliest().expect("should have event");
        assert_eq!(event1.event(), &Event::Timer { task_id: 1 });
        assert_eq!(event1.sequence, 0);

        let event2 = queue.pop_earliest().expect("should have event");
        assert_eq!(event2.event(), &Event::Timer { task_id: 2 });
        assert_eq!(event2.sequence, 1);

        let event3 = queue.pop_earliest().expect("should have event");
        assert_eq!(event3.event(), &Event::Timer { task_id: 3 });
        assert_eq!(event3.sequence, 2);

        assert!(queue.is_empty());
    }

    #[test]
    fn peek_and_len_reflect_queue_contents() {
        let mut queue = EventQueue::default();
        assert_eq!(queue.len(), 0);
        assert!(queue.peek_earliest().is_none());
        assert!(queue.pop_earliest().is_none());

        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(20),
            Event::Shutdown,
            0,
        ));
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(10),
            Event::Timer { task_id: 1 },
            1,
        ));
        assert_eq!(queue.len(), 2);

        // Peeking exposes the earliest event without removing it
        let earliest = queue.peek_earliest().expect("should have event");
        assert_eq!(earliest.time(), Duration::from_millis(10));
        assert_eq!(queue.len(), 2);

        let popped = queue.pop_earliest().expect("should have event");
        assert_eq!(popped.into_event(), Event::Timer { task_id: 1 });
        assert_eq!(queue.len(), 1);
    }
}