1use std::{cmp::Ordering, collections::BinaryHeap, time::Duration};
7
8#[derive(Debug, Clone, PartialEq, Eq)]
10pub enum Event {
11 Timer {
13 task_id: u64,
15 },
16
17 Network {
19 connection_id: u64,
21 operation: NetworkOperation,
23 },
24
25 Connection {
27 id: u64,
29 state: ConnectionStateChange,
31 },
32
33 Shutdown,
35}
36
37impl Event {
38 pub fn is_infrastructure_event(&self) -> bool {
44 matches!(
45 self,
46 Event::Connection {
47 state: ConnectionStateChange::PartitionRestore
48 | ConnectionStateChange::SendPartitionClear
49 | ConnectionStateChange::RecvPartitionClear
50 | ConnectionStateChange::CutRestore,
51 ..
52 } )
55 }
56}
57
58#[derive(Debug, Clone, PartialEq, Eq)]
60pub enum NetworkOperation {
61 DataDelivery {
63 data: Vec<u8>,
65 },
66 ProcessSendBuffer,
68}
69
70#[derive(Debug, Clone, PartialEq, Eq)]
72pub enum ConnectionStateChange {
73 BindComplete,
75 ConnectionReady,
77 ClogClear,
79 ReadClogClear,
81 CutRestore,
83 PartitionRestore,
85 SendPartitionClear,
87 RecvPartitionClear,
89 HalfOpenError,
91}
92
93#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct ScheduledEvent {
96 time: Duration,
97 event: Event,
98 pub sequence: u64,
100}
101
102impl ScheduledEvent {
103 pub fn new(time: Duration, event: Event, sequence: u64) -> Self {
105 Self {
106 time,
107 event,
108 sequence,
109 }
110 }
111
112 pub fn time(&self) -> Duration {
114 self.time
115 }
116
117 pub fn event(&self) -> &Event {
119 &self.event
120 }
121
122 pub fn into_event(self) -> Event {
124 self.event
125 }
126}
127
128impl PartialOrd for ScheduledEvent {
129 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
130 Some(self.cmp(other))
131 }
132}
133
134impl Ord for ScheduledEvent {
135 fn cmp(&self, other: &Self) -> Ordering {
136 match other.time.cmp(&self.time) {
139 Ordering::Equal => {
140 other.sequence.cmp(&self.sequence)
143 }
144 other => other,
145 }
146 }
147}
148
149#[derive(Debug)]
154pub struct EventQueue {
155 heap: BinaryHeap<ScheduledEvent>,
156}
157
158impl EventQueue {
159 pub fn new() -> Self {
161 Self {
162 heap: BinaryHeap::new(),
163 }
164 }
165
166 pub fn schedule(&mut self, event: ScheduledEvent) {
168 self.heap.push(event);
169 }
170
171 pub fn pop_earliest(&mut self) -> Option<ScheduledEvent> {
173 self.heap.pop()
174 }
175
176 #[allow(dead_code)]
178 pub fn peek_earliest(&self) -> Option<&ScheduledEvent> {
179 self.heap.peek()
180 }
181
182 pub fn is_empty(&self) -> bool {
184 self.heap.is_empty()
185 }
186
187 pub fn len(&self) -> usize {
189 self.heap.len()
190 }
191
192 pub fn has_only_infrastructure_events(&self) -> bool {
198 self.heap
199 .iter()
200 .all(|scheduled_event| scheduled_event.event().is_infrastructure_event())
201 }
202}
203
204impl Default for EventQueue {
205 fn default() -> Self {
206 Self::new()
207 }
208}
209
210#[cfg(test)]
211mod tests {
212 use super::*;
213
214 #[test]
215 fn test_infrastructure_event_detection() {
216 let restore_event = Event::Connection {
218 id: 1,
219 state: ConnectionStateChange::PartitionRestore,
220 };
221 assert!(restore_event.is_infrastructure_event());
222
223 let timer_event = Event::Timer { task_id: 1 };
224 assert!(!timer_event.is_infrastructure_event());
225
226 let network_event = Event::Network {
227 connection_id: 1,
228 operation: NetworkOperation::DataDelivery {
229 data: vec![1, 2, 3],
230 },
231 };
232 assert!(!network_event.is_infrastructure_event());
233
234 let shutdown_event = Event::Shutdown;
235 assert!(!shutdown_event.is_infrastructure_event());
236
237 let mut queue = EventQueue::new();
239
240 assert!(queue.has_only_infrastructure_events());
242
243 queue.schedule(ScheduledEvent::new(
245 Duration::from_secs(1),
246 restore_event,
247 1,
248 ));
249 assert!(queue.has_only_infrastructure_events());
250
251 queue.schedule(ScheduledEvent::new(Duration::from_secs(2), timer_event, 2));
253 assert!(!queue.has_only_infrastructure_events());
254
255 let mut queue2 = EventQueue::new();
257 queue2.schedule(ScheduledEvent::new(
258 Duration::from_secs(1),
259 network_event,
260 1,
261 ));
262 assert!(!queue2.has_only_infrastructure_events());
263 }
264
265 #[test]
266 fn event_queue_ordering() {
267 let mut queue = EventQueue::new();
268
269 queue.schedule(ScheduledEvent::new(
271 Duration::from_millis(300),
272 Event::Timer { task_id: 3 },
273 2,
274 ));
275 queue.schedule(ScheduledEvent::new(
276 Duration::from_millis(100),
277 Event::Timer { task_id: 1 },
278 0,
279 ));
280 queue.schedule(ScheduledEvent::new(
281 Duration::from_millis(200),
282 Event::Timer { task_id: 2 },
283 1,
284 ));
285
286 let event1 = queue.pop_earliest().expect("should have event");
288 assert_eq!(event1.time(), Duration::from_millis(100));
289 assert_eq!(event1.event(), &Event::Timer { task_id: 1 });
290
291 let event2 = queue.pop_earliest().expect("should have event");
292 assert_eq!(event2.time(), Duration::from_millis(200));
293 assert_eq!(event2.event(), &Event::Timer { task_id: 2 });
294
295 let event3 = queue.pop_earliest().expect("should have event");
296 assert_eq!(event3.time(), Duration::from_millis(300));
297 assert_eq!(event3.event(), &Event::Timer { task_id: 3 });
298
299 assert!(queue.is_empty());
300 }
301
302 #[test]
303 fn same_time_deterministic_ordering() {
304 let mut queue = EventQueue::new();
305 let same_time = Duration::from_millis(100);
306
307 queue.schedule(ScheduledEvent::new(
309 same_time,
310 Event::Timer { task_id: 3 },
311 2, ));
313 queue.schedule(ScheduledEvent::new(
314 same_time,
315 Event::Timer { task_id: 1 },
316 0, ));
318 queue.schedule(ScheduledEvent::new(
319 same_time,
320 Event::Timer { task_id: 2 },
321 1, ));
323
324 let event1 = queue.pop_earliest().expect("should have event");
326 assert_eq!(event1.event(), &Event::Timer { task_id: 1 });
327 assert_eq!(event1.sequence, 0);
328
329 let event2 = queue.pop_earliest().expect("should have event");
330 assert_eq!(event2.event(), &Event::Timer { task_id: 2 });
331 assert_eq!(event2.sequence, 1);
332
333 let event3 = queue.pop_earliest().expect("should have event");
334 assert_eq!(event3.event(), &Event::Timer { task_id: 3 });
335 assert_eq!(event3.sequence, 2);
336
337 assert!(queue.is_empty());
338 }
339}