moonpool_sim/sim/events.rs
1//! Event scheduling and processing for the simulation engine.
2//!
3//! This module provides the core event types and queue for scheduling
4//! events in chronological order with deterministic ordering.
5
6use std::{cmp::Ordering, collections::BinaryHeap, time::Duration};
7
8pub use crate::storage::StorageOperation;
9
10/// Events that can be scheduled in the simulation.
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub enum Event {
13 /// Timer event for waking sleeping tasks
14 Timer {
15 /// The unique identifier for the task to wake.
16 task_id: u64,
17 },
18
19 /// Network data operations
20 Network {
21 /// The connection involved
22 connection_id: u64,
23 /// The operation type
24 operation: NetworkOperation,
25 },
26
27 /// Connection state changes
28 Connection {
29 /// The connection or listener ID
30 id: u64,
31 /// The state change type
32 state: ConnectionStateChange,
33 },
34
35 /// Storage I/O operations
36 Storage {
37 /// The file involved
38 file_id: u64,
39 /// The operation type
40 operation: StorageOperation,
41 },
42
43 /// Shutdown event to wake all tasks for graceful termination
44 Shutdown,
45
46 /// Process restart event: a rebooted process is ready to boot again.
47 ///
48 /// Scheduled after a process is killed, at `now + recovery_delay`.
49 /// The orchestrator handles this by calling the process factory
50 /// and spawning a new `run()` task.
51 ProcessRestart {
52 /// The IP address of the process to restart.
53 ip: std::net::IpAddr,
54 },
55
56 /// Graceful shutdown initiated for a process.
57 ///
58 /// Cancels the per-process shutdown token so the process can observe
59 /// `ctx.shutdown().is_cancelled()` and perform cleanup. A
60 /// [`ProcessForceKill`](Event::ProcessForceKill) is scheduled after the
61 /// grace period expires.
62 ProcessGracefulShutdown {
63 /// The IP address of the process being gracefully shut down.
64 ip: std::net::IpAddr,
65 /// Grace period in milliseconds before force-kill.
66 grace_period_ms: u64,
67 /// Recovery delay in milliseconds after force-kill before restart.
68 recovery_delay_ms: u64,
69 },
70
71 /// Force-kill a process after a graceful shutdown grace period.
72 ///
73 /// Aborts the process task and all its connections, then schedules a
74 /// [`ProcessRestart`](Event::ProcessRestart) after a recovery delay.
75 ProcessForceKill {
76 /// The IP address of the process to force-kill.
77 ip: std::net::IpAddr,
78 /// Recovery delay in milliseconds before restart.
79 recovery_delay_ms: u64,
80 },
81}
82
83impl Event {
84 /// Determines if this event is purely infrastructural (not workload-related).
85 ///
86 /// Infrastructure events maintain simulation state but don't represent actual
87 /// application work. These events can be safely ignored when determining if
88 /// a simulation should terminate after workloads complete.
89 pub fn is_infrastructure_event(&self) -> bool {
90 matches!(
91 self,
92 Event::Connection {
93 state: ConnectionStateChange::PartitionRestore
94 | ConnectionStateChange::SendPartitionClear
95 | ConnectionStateChange::RecvPartitionClear
96 | ConnectionStateChange::CutRestore,
97 ..
98 } | Event::ProcessRestart { .. }
99 )
100 }
101}
102
/// Operations on the data path of a simulated connection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkOperation {
    /// Places bytes into the connection's receive buffer.
    DataDelivery {
        /// Payload to be delivered.
        data: Vec<u8>,
    },
    /// Handles the next pending message in the connection's send buffer.
    ProcessSendBuffer,
    /// Delivers a FIN (graceful close) to the receive side of a connection.
    ///
    /// It is scheduled after the final `DataDelivery` so that every byte
    /// arrives before the close is observed.
    FinDelivery,
}
117
/// Lifecycle and fault-related transitions for connections and listeners.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStateChange {
    /// A listener has finished binding.
    BindComplete,
    /// A connection has finished being established.
    ConnectionReady,
    /// Lifts the write-side clog on a connection.
    ClogClear,
    /// Lifts the read-side clog on a connection.
    ReadClogClear,
    /// Brings back a connection that was temporarily cut.
    CutRestore,
    /// Lifts a network partition between IP addresses.
    PartitionRestore,
    /// Lifts the send-side partition for an IP address.
    SendPartitionClear,
    /// Lifts the receive-side partition for an IP address.
    RecvPartitionClear,
    /// A half-open connection begins reporting errors.
    HalfOpenError,
}
140
141/// An event scheduled for execution at a specific simulation time.
142#[derive(Debug, Clone, PartialEq, Eq)]
143pub struct ScheduledEvent {
144 time: Duration,
145 event: Event,
146 /// Sequence number for deterministic ordering
147 pub sequence: u64,
148}
149
150impl ScheduledEvent {
151 /// Creates a new scheduled event.
152 pub fn new(time: Duration, event: Event, sequence: u64) -> Self {
153 Self {
154 time,
155 event,
156 sequence,
157 }
158 }
159
160 /// Returns the scheduled execution time.
161 pub fn time(&self) -> Duration {
162 self.time
163 }
164
165 /// Returns a reference to the event.
166 pub fn event(&self) -> &Event {
167 &self.event
168 }
169
170 /// Consumes the scheduled event and returns the event.
171 pub fn into_event(self) -> Event {
172 self.event
173 }
174}
175
176impl PartialOrd for ScheduledEvent {
177 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
178 Some(self.cmp(other))
179 }
180}
181
182impl Ord for ScheduledEvent {
183 fn cmp(&self, other: &Self) -> Ordering {
184 // BinaryHeap is a max heap, but we want earliest time first
185 // So we reverse the time comparison
186 match other.time.cmp(&self.time) {
187 Ordering::Equal => {
188 // For events at the same time, use sequence number for deterministic ordering
189 // Earlier sequence numbers should be processed first (also reversed for max heap)
190 other.sequence.cmp(&self.sequence)
191 }
192 other => other,
193 }
194 }
195}
196
197/// A priority queue for scheduling events in chronological order.
198///
199/// Events are processed in time order, with deterministic ordering for events
200/// scheduled at the same time using sequence numbers.
201#[derive(Debug)]
202pub struct EventQueue {
203 heap: BinaryHeap<ScheduledEvent>,
204}
205
206impl EventQueue {
207 /// Creates a new empty event queue.
208 pub fn new() -> Self {
209 Self {
210 heap: BinaryHeap::new(),
211 }
212 }
213
214 /// Schedules an event for execution.
215 pub fn schedule(&mut self, event: ScheduledEvent) {
216 self.heap.push(event);
217 }
218
219 /// Removes and returns the earliest scheduled event.
220 pub fn pop_earliest(&mut self) -> Option<ScheduledEvent> {
221 self.heap.pop()
222 }
223
224 /// Returns a reference to the earliest scheduled event without removing it.
225 #[allow(dead_code)]
226 pub fn peek_earliest(&self) -> Option<&ScheduledEvent> {
227 self.heap.peek()
228 }
229
230 /// Returns `true` if the queue is empty.
231 pub fn is_empty(&self) -> bool {
232 self.heap.is_empty()
233 }
234
235 /// Returns the number of events in the queue.
236 pub fn len(&self) -> usize {
237 self.heap.len()
238 }
239
240 /// Checks if the queue contains only infrastructure events (no workload events).
241 ///
242 /// Infrastructure events are those that maintain simulation state but don't
243 /// represent actual application work (like connection restoration).
244 /// Returns true if empty or contains only infrastructure events.
245 pub fn has_only_infrastructure_events(&self) -> bool {
246 self.heap
247 .iter()
248 .all(|scheduled_event| scheduled_event.event().is_infrastructure_event())
249 }
250}
251
252impl Default for EventQueue {
253 fn default() -> Self {
254 Self::new()
255 }
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    #[test]
    fn test_infrastructure_event_detection() {
        // Test Event::is_infrastructure_event() method
        let restore_event = Event::Connection {
            id: 1,
            state: ConnectionStateChange::PartitionRestore,
        };
        assert!(restore_event.is_infrastructure_event());

        let timer_event = Event::Timer { task_id: 1 };
        assert!(!timer_event.is_infrastructure_event());

        let network_event = Event::Network {
            connection_id: 1,
            operation: NetworkOperation::DataDelivery {
                data: vec![1, 2, 3],
            },
        };
        assert!(!network_event.is_infrastructure_event());

        let shutdown_event = Event::Shutdown;
        assert!(!shutdown_event.is_infrastructure_event());

        // Test EventQueue::has_only_infrastructure_events() method
        let mut queue = EventQueue::new();

        // Empty queue should be considered "only infrastructure"
        assert!(queue.has_only_infrastructure_events());

        // Queue with only PartitionRestore events
        queue.schedule(ScheduledEvent::new(
            Duration::from_secs(1),
            restore_event,
            1,
        ));
        assert!(queue.has_only_infrastructure_events());

        // Queue with workload events should return false
        queue.schedule(ScheduledEvent::new(Duration::from_secs(2), timer_event, 2));
        assert!(!queue.has_only_infrastructure_events());

        // Queue with network events should return false
        let mut queue2 = EventQueue::new();
        queue2.schedule(ScheduledEvent::new(
            Duration::from_secs(1),
            network_event,
            1,
        ));
        assert!(!queue2.has_only_infrastructure_events());
    }

    #[test]
    fn infrastructure_classification_of_remaining_variants() {
        let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));

        // Every restore/clear flavour of partition or cut is infrastructure.
        for state in [
            ConnectionStateChange::PartitionRestore,
            ConnectionStateChange::SendPartitionClear,
            ConnectionStateChange::RecvPartitionClear,
            ConnectionStateChange::CutRestore,
        ] {
            let event = Event::Connection { id: 1, state };
            assert!(
                event.is_infrastructure_event(),
                "{:?} should be infrastructure",
                event
            );
        }

        // The remaining connection state changes are not infrastructure.
        for state in [
            ConnectionStateChange::BindComplete,
            ConnectionStateChange::ConnectionReady,
            ConnectionStateChange::ClogClear,
            ConnectionStateChange::ReadClogClear,
            ConnectionStateChange::HalfOpenError,
        ] {
            let event = Event::Connection { id: 1, state };
            assert!(
                !event.is_infrastructure_event(),
                "{:?} should not be infrastructure",
                event
            );
        }

        // Restarts are infrastructure; kills and graceful shutdowns are not.
        assert!(Event::ProcessRestart { ip }.is_infrastructure_event());
        assert!(!Event::ProcessForceKill {
            ip,
            recovery_delay_ms: 10,
        }
        .is_infrastructure_event());
        assert!(!Event::ProcessGracefulShutdown {
            ip,
            grace_period_ms: 5,
            recovery_delay_ms: 10,
        }
        .is_infrastructure_event());
        assert!(!Event::Network {
            connection_id: 1,
            operation: NetworkOperation::FinDelivery,
        }
        .is_infrastructure_event());
    }

    #[test]
    fn peek_and_len_track_queue_contents() {
        let mut queue = EventQueue::new();
        assert_eq!(queue.len(), 0);
        assert!(queue.peek_earliest().is_none());

        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(50),
            Event::Timer { task_id: 2 },
            1,
        ));
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(10),
            Event::Timer { task_id: 1 },
            0,
        ));
        assert_eq!(queue.len(), 2);

        // Peeking must expose the earliest event without consuming it.
        let head = queue.peek_earliest().expect("should have event");
        assert_eq!(head.time(), Duration::from_millis(10));
        assert_eq!(queue.len(), 2);

        let popped = queue.pop_earliest().expect("should have event");
        assert_eq!(popped.event(), &Event::Timer { task_id: 1 });
        assert_eq!(queue.len(), 1);
    }

    #[test]
    fn event_queue_ordering() {
        let mut queue = EventQueue::new();

        // Schedule events in random order
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(300),
            Event::Timer { task_id: 3 },
            2,
        ));
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(100),
            Event::Timer { task_id: 1 },
            0,
        ));
        queue.schedule(ScheduledEvent::new(
            Duration::from_millis(200),
            Event::Timer { task_id: 2 },
            1,
        ));

        // Should pop in time order
        let event1 = queue.pop_earliest().expect("should have event");
        assert_eq!(event1.time(), Duration::from_millis(100));
        assert_eq!(event1.event(), &Event::Timer { task_id: 1 });

        let event2 = queue.pop_earliest().expect("should have event");
        assert_eq!(event2.time(), Duration::from_millis(200));
        assert_eq!(event2.event(), &Event::Timer { task_id: 2 });

        let event3 = queue.pop_earliest().expect("should have event");
        assert_eq!(event3.time(), Duration::from_millis(300));
        assert_eq!(event3.event(), &Event::Timer { task_id: 3 });

        assert!(queue.is_empty());
    }

    #[test]
    fn same_time_deterministic_ordering() {
        let mut queue = EventQueue::new();
        let same_time = Duration::from_millis(100);

        // Schedule multiple events at the same time with different sequence numbers
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 3 },
            2, // Later sequence
        ));
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 1 },
            0, // Earlier sequence
        ));
        queue.schedule(ScheduledEvent::new(
            same_time,
            Event::Timer { task_id: 2 },
            1, // Middle sequence
        ));

        // Should pop in sequence order when times are equal
        let event1 = queue.pop_earliest().expect("should have event");
        assert_eq!(event1.event(), &Event::Timer { task_id: 1 });
        assert_eq!(event1.sequence, 0);

        let event2 = queue.pop_earliest().expect("should have event");
        assert_eq!(event2.event(), &Event::Timer { task_id: 2 });
        assert_eq!(event2.sequence, 1);

        let event3 = queue.pop_earliest().expect("should have event");
        assert_eq!(event3.event(), &Event::Timer { task_id: 3 });
        assert_eq!(event3.sequence, 2);

        assert!(queue.is_empty());
    }
}