1use std::{cmp::Ordering, collections::BinaryHeap, time::Duration};
7
8pub use crate::storage::StorageOperation;
9
10#[derive(Debug, Clone, PartialEq, Eq)]
12pub enum Event {
13 Timer {
15 task_id: u64,
17 },
18
19 Network {
21 connection_id: u64,
23 operation: NetworkOperation,
25 },
26
27 Connection {
29 id: u64,
31 state: ConnectionStateChange,
33 },
34
35 Storage {
37 file_id: u64,
39 operation: StorageOperation,
41 },
42
43 Shutdown,
45}
46
47impl Event {
48 pub fn is_infrastructure_event(&self) -> bool {
54 matches!(
55 self,
56 Event::Connection {
57 state: ConnectionStateChange::PartitionRestore
58 | ConnectionStateChange::SendPartitionClear
59 | ConnectionStateChange::RecvPartitionClear
60 | ConnectionStateChange::CutRestore,
61 ..
62 } )
65 }
66}
67
/// Payload of an [`Event::Network`] event.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum NetworkOperation {
    /// Carries a byte payload for the connection.
    DataDelivery {
        /// The raw bytes attached to this delivery.
        data: Vec<u8>,
    },
    /// Data-less variant. NOTE(review): by its name, asks for the
    /// connection's send buffer to be processed - confirm against the handler.
    ProcessSendBuffer,
    /// Data-less variant. NOTE(review): by its name, delivers a FIN
    /// (end-of-stream) notification - confirm against the handler.
    FinDelivery,
}
82
/// Payload of an [`Event::Connection`] event.
///
/// All variants are data-less. The per-variant notes below are inferred from
/// the variant names only; the code that reacts to them is not part of this
/// file, so treat them as assumptions to confirm.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStateChange {
    /// Presumably: a bind operation has finished.
    BindComplete,
    /// Presumably: the connection is ready for use.
    ConnectionReady,
    /// Presumably: a clog on the connection is cleared.
    ClogClear,
    /// Presumably: a clog on the read side is cleared.
    ReadClogClear,
    /// Presumably: a cut connection is restored.
    /// Counted by [`Event::is_infrastructure_event`].
    CutRestore,
    /// Presumably: a partition is restored.
    /// Counted by [`Event::is_infrastructure_event`].
    PartitionRestore,
    /// Presumably: a send-side partition is cleared.
    /// Counted by [`Event::is_infrastructure_event`].
    SendPartitionClear,
    /// Presumably: a receive-side partition is cleared.
    /// Counted by [`Event::is_infrastructure_event`].
    RecvPartitionClear,
    /// Presumably: a half-open connection reports an error.
    HalfOpenError,
}
105
106#[derive(Debug, Clone, PartialEq, Eq)]
108pub struct ScheduledEvent {
109 time: Duration,
110 event: Event,
111 pub sequence: u64,
113}
114
115impl ScheduledEvent {
116 pub fn new(time: Duration, event: Event, sequence: u64) -> Self {
118 Self {
119 time,
120 event,
121 sequence,
122 }
123 }
124
125 pub fn time(&self) -> Duration {
127 self.time
128 }
129
130 pub fn event(&self) -> &Event {
132 &self.event
133 }
134
135 pub fn into_event(self) -> Event {
137 self.event
138 }
139}
140
141impl PartialOrd for ScheduledEvent {
142 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
143 Some(self.cmp(other))
144 }
145}
146
147impl Ord for ScheduledEvent {
148 fn cmp(&self, other: &Self) -> Ordering {
149 match other.time.cmp(&self.time) {
152 Ordering::Equal => {
153 other.sequence.cmp(&self.sequence)
156 }
157 other => other,
158 }
159 }
160}
161
162#[derive(Debug)]
167pub struct EventQueue {
168 heap: BinaryHeap<ScheduledEvent>,
169}
170
171impl EventQueue {
172 pub fn new() -> Self {
174 Self {
175 heap: BinaryHeap::new(),
176 }
177 }
178
179 pub fn schedule(&mut self, event: ScheduledEvent) {
181 self.heap.push(event);
182 }
183
184 pub fn pop_earliest(&mut self) -> Option<ScheduledEvent> {
186 self.heap.pop()
187 }
188
189 #[allow(dead_code)]
191 pub fn peek_earliest(&self) -> Option<&ScheduledEvent> {
192 self.heap.peek()
193 }
194
195 pub fn is_empty(&self) -> bool {
197 self.heap.is_empty()
198 }
199
200 pub fn len(&self) -> usize {
202 self.heap.len()
203 }
204
205 pub fn has_only_infrastructure_events(&self) -> bool {
211 self.heap
212 .iter()
213 .all(|scheduled_event| scheduled_event.event().is_infrastructure_event())
214 }
215}
216
217impl Default for EventQueue {
218 fn default() -> Self {
219 Self::new()
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// Pops the next event, failing the test if the queue is empty.
    fn pop(queue: &mut EventQueue) -> ScheduledEvent {
        queue.pop_earliest().expect("should have event")
    }

    /// Builds a scheduled timer event at `millis` milliseconds.
    fn timer(millis: u64, task_id: u64, sequence: u64) -> ScheduledEvent {
        ScheduledEvent::new(
            Duration::from_millis(millis),
            Event::Timer { task_id },
            sequence,
        )
    }

    #[test]
    fn test_infrastructure_event_detection() {
        let restore = Event::Connection {
            id: 1,
            state: ConnectionStateChange::PartitionRestore,
        };
        let timer_event = Event::Timer { task_id: 1 };
        let network = Event::Network {
            connection_id: 1,
            operation: NetworkOperation::DataDelivery {
                data: vec![1, 2, 3],
            },
        };

        // Classification of individual events.
        assert!(restore.is_infrastructure_event());
        assert!(!timer_event.is_infrastructure_event());
        assert!(!network.is_infrastructure_event());
        assert!(!Event::Shutdown.is_infrastructure_event());

        // An empty queue counts as "only infrastructure events".
        let mut queue = EventQueue::new();
        assert!(queue.has_only_infrastructure_events());

        // Still true with a single infrastructure event queued.
        queue.schedule(ScheduledEvent::new(Duration::from_secs(1), restore, 1));
        assert!(queue.has_only_infrastructure_events());

        // Adding any other event flips the answer.
        queue.schedule(ScheduledEvent::new(Duration::from_secs(2), timer_event, 2));
        assert!(!queue.has_only_infrastructure_events());

        // A queue holding only a network event is not infrastructure-only.
        let mut network_only = EventQueue::new();
        network_only.schedule(ScheduledEvent::new(Duration::from_secs(1), network, 1));
        assert!(!network_only.has_only_infrastructure_events());
    }

    #[test]
    fn event_queue_ordering() {
        let mut queue = EventQueue::new();

        // Inserted out of time order on purpose: (millis, task_id, sequence).
        for &(millis, task_id, sequence) in &[(300, 3, 2), (100, 1, 0), (200, 2, 1)] {
            queue.schedule(timer(millis, task_id, sequence));
        }

        // Events must come back in ascending time order.
        for &(millis, task_id) in &[(100, 1), (200, 2), (300, 3)] {
            let popped = pop(&mut queue);
            assert_eq!(popped.time(), Duration::from_millis(millis));
            assert_eq!(popped.event(), &Event::Timer { task_id });
        }

        assert!(queue.is_empty());
    }

    #[test]
    fn same_time_deterministic_ordering() {
        let mut queue = EventQueue::new();
        let same_time = Duration::from_millis(100);

        // Same timestamp, sequence numbers inserted out of order:
        // (task_id, sequence).
        for &(task_id, sequence) in &[(3, 2), (1, 0), (2, 1)] {
            queue.schedule(ScheduledEvent::new(
                same_time,
                Event::Timer { task_id },
                sequence,
            ));
        }

        // Ties on time are broken by ascending sequence number.
        for &(task_id, sequence) in &[(1, 0), (2, 1), (3, 2)] {
            let popped = pop(&mut queue);
            assert_eq!(popped.event(), &Event::Timer { task_id });
            assert_eq!(popped.sequence, sequence);
        }

        assert!(queue.is_empty());
    }
}