1use std::{cmp::Ordering, collections::BinaryHeap, net::SocketAddr, time::Duration};
10
11use super::{
12 rng::SimulationRng,
13 time::{TimeSource, VirtualTime},
14};
15
/// Identifier attached to a scheduled event.
///
/// The wrapped integer is private; use [`EventId::as_u64`] to read it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EventId(u64);

impl EventId {
    /// Returns the raw integer wrapped by this identifier.
    pub fn as_u64(&self) -> u64 {
        let Self(raw) = *self;
        raw
    }
}
27
/// What a scheduled event represents.
///
/// The derived `Ord` compares variants in declaration order and then fields in
/// declaration order, so reordering variants or fields changes how events are
/// sorted relative to each other.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum EventType {
    /// A message travelling from one peer to another.
    MessageDelivery {
        /// Sending peer.
        from: SocketAddr,
        /// Receiving peer.
        to: SocketAddr,
        /// Raw message bytes.
        payload: Vec<u8>,
    },
    /// A timer belonging to a single peer.
    Timer {
        /// Peer the timer belongs to.
        peer: SocketAddr,
        /// Caller-chosen timer identifier.
        timer_id: u64,
    },
    /// A fault involving zero or more peers.
    Fault {
        /// Peers involved in the fault; may be empty.
        peers: Vec<SocketAddr>,
        /// Free-form description of the fault.
        description: String,
    },
    /// A caller-defined event for a single peer.
    Custom {
        /// Peer the event is associated with.
        peer: SocketAddr,
        /// Caller-chosen label for the event.
        kind: String,
        /// Opaque payload bytes.
        data: Vec<u8>,
    },
}

impl EventType {
    /// Returns the single peer this event is keyed on, if there is one.
    ///
    /// * `MessageDelivery` yields the recipient (`to`).
    /// * `Timer` and `Custom` yield their `peer`.
    /// * `Fault` yields the first entry of `peers`, or `None` when the list is
    ///   empty.
    pub fn primary_peer(&self) -> Option<SocketAddr> {
        match self {
            Self::MessageDelivery { to: peer, .. }
            | Self::Timer { peer, .. }
            | Self::Custom { peer, .. } => Some(*peer),
            Self::Fault { peers, .. } => peers.first().copied(),
        }
    }
}
76
77#[derive(Debug, Clone)]
79pub struct Event {
80 pub timestamp: u64,
82 pub id: EventId,
84 pub event_type: EventType,
86}
87
88impl Event {
89 fn new(timestamp: u64, id: EventId, event_type: EventType) -> Self {
90 Self {
91 timestamp,
92 id,
93 event_type,
94 }
95 }
96}
97
98impl PartialEq for Event {
99 fn eq(&self, other: &Self) -> bool {
100 self.id == other.id
101 }
102}
103
104impl Eq for Event {}
105
106impl PartialOrd for Event {
107 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
108 Some(self.cmp(other))
109 }
110}
111
112impl Ord for Event {
113 fn cmp(&self, other: &Self) -> Ordering {
114 match other.timestamp.cmp(&self.timestamp) {
121 Ordering::Equal => {
122 let self_peer = self.event_type.primary_peer();
123 let other_peer = other.event_type.primary_peer();
124 match other_peer.cmp(&self_peer) {
125 Ordering::Equal => match other.event_type.cmp(&self.event_type) {
126 Ordering::Equal => other.id.0.cmp(&self.id.0),
127 ord => ord,
128 },
129 ord => ord,
130 }
131 }
132 ord => ord,
133 }
134 }
135}
136
/// Tunables for a `Scheduler`. The `Default` value is no event limit and
/// tracing off.
#[derive(Clone, Debug, Default)]
pub struct SchedulerConfig {
    /// Maximum number of events a single `Scheduler::run_until` call will
    /// process. `0` disables the limit.
    pub max_events_per_step: usize,
    /// When `true`, `Scheduler::step` emits a `tracing` trace record for every
    /// event it processes.
    pub trace_events: bool,
}
145
146pub struct Scheduler {
151 time: VirtualTime,
153 rng: SimulationRng,
155 pending_events: BinaryHeap<Event>,
157 next_event_id: u64,
159 event_log: Vec<Event>,
161 config: SchedulerConfig,
163}
164
165impl Scheduler {
166 pub fn new(seed: u64) -> Self {
168 Self::with_config(seed, SchedulerConfig::default())
169 }
170
171 pub fn with_config(seed: u64, config: SchedulerConfig) -> Self {
173 Self {
174 time: VirtualTime::new(),
175 rng: SimulationRng::new(seed),
176 pending_events: BinaryHeap::new(),
177 next_event_id: 0,
178 event_log: Vec::new(),
179 config,
180 }
181 }
182
183 pub fn time(&self) -> &VirtualTime {
185 &self.time
186 }
187
188 pub fn time_mut(&mut self) -> &mut VirtualTime {
190 &mut self.time
191 }
192
193 pub fn now(&self) -> u64 {
195 self.time.now_nanos()
196 }
197
198 pub fn rng(&self) -> &SimulationRng {
200 &self.rng
201 }
202
203 pub fn seed(&self) -> u64 {
205 self.rng.seed()
206 }
207
208 pub fn pending_count(&self) -> usize {
210 self.pending_events.len()
211 }
212
213 pub fn event_log(&self) -> &[Event] {
215 &self.event_log
216 }
217
218 pub fn clear_event_log(&mut self) {
220 self.event_log.clear();
221 }
222
223 pub fn schedule_at(&mut self, timestamp: u64, event_type: EventType) -> EventId {
225 let id = EventId(self.next_event_id);
226 self.next_event_id += 1;
227
228 let event = Event::new(timestamp, id, event_type);
229 self.pending_events.push(event);
230 id
231 }
232
233 pub fn schedule_after(&mut self, delay: Duration, event_type: EventType) -> EventId {
235 let timestamp = self.now().saturating_add(delay.as_nanos() as u64);
236 self.schedule_at(timestamp, event_type)
237 }
238
239 pub fn schedule_now(&mut self, event_type: EventType) -> EventId {
241 self.schedule_at(self.now(), event_type)
242 }
243
244 pub fn cancel(&mut self, id: EventId) -> bool {
248 let mut events: Vec<_> = std::mem::take(&mut self.pending_events).into_vec();
249 let original_len = events.len();
250 events.retain(|e| e.id != id);
251 let cancelled = events.len() < original_len;
252 self.pending_events = BinaryHeap::from(events);
253 cancelled
254 }
255
256 pub fn next_event_time(&self) -> Option<u64> {
258 self.pending_events.peek().map(|e| e.timestamp)
259 }
260
261 pub fn step(&mut self) -> Option<Event> {
265 let event = self.pending_events.pop()?;
266
267 if event.timestamp > self.now() {
269 self.time.advance_to(event.timestamp);
270 }
271
272 if self.config.trace_events {
273 tracing::trace!(
274 timestamp = event.timestamp,
275 id = event.id.0,
276 ?event.event_type,
277 "Processing event"
278 );
279 }
280
281 self.event_log.push(event.clone());
282 Some(event)
283 }
284
285 pub fn run_until<F>(&mut self, mut condition: F) -> usize
289 where
290 F: FnMut(&Scheduler) -> bool,
291 {
292 let mut processed = 0;
293 while !condition(self) {
294 if self.step().is_none() {
295 break;
296 }
297 processed += 1;
298
299 if self.config.max_events_per_step > 0 && processed >= self.config.max_events_per_step {
300 break;
301 }
302 }
303 processed
304 }
305
306 pub fn run_until_time(&mut self, target_time: u64) -> usize {
310 let mut processed = 0;
311 while let Some(next_time) = self.next_event_time() {
312 if next_time > target_time {
313 break;
314 }
315 if self.step().is_none() {
316 break;
317 }
318 processed += 1;
319 }
320
321 if self.now() < target_time {
323 self.time.advance_to(target_time);
324 }
325
326 processed
327 }
328
329 pub fn run_all(&mut self) -> usize {
333 let mut processed = 0;
334 while self.step().is_some() {
335 processed += 1;
336 }
337 processed
338 }
339
340 pub fn drain_pending(&mut self) -> Vec<Event> {
342 std::mem::take(&mut self.pending_events).into_vec()
343 }
344}
345
346impl std::fmt::Debug for Scheduler {
347 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348 f.debug_struct("Scheduler")
349 .field("now", &self.now())
350 .field("seed", &self.rng.seed())
351 .field("pending_count", &self.pending_count())
352 .field("event_log_len", &self.event_log.len())
353 .finish()
354 }
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback address on `port`, standing in for a simulated peer.
    fn addr(port: u16) -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
    }

    /// Timer event for the peer at `addr(port)`.
    fn timer(port: u16, timer_id: u64) -> EventType {
        EventType::Timer {
            peer: addr(port),
            timer_id,
        }
    }

    /// Events are returned in timestamp order regardless of insertion order.
    #[test]
    fn test_scheduler_basic() {
        let mut scheduler = Scheduler::new(42);

        for &(timestamp, timer_id) in &[(100, 1), (50, 2), (200, 3)] {
            scheduler.schedule_at(timestamp, timer(1000, timer_id));
        }

        for &expected in &[50, 100, 200] {
            let event = scheduler.step().unwrap();
            assert_eq!(event.timestamp, expected);
        }

        assert!(scheduler.step().is_none());
    }

    /// Processing an event moves the clock to that event's timestamp.
    #[test]
    fn test_scheduler_time_advancement() {
        let mut scheduler = Scheduler::new(42);

        scheduler.schedule_at(1000, timer(1000, 1));

        assert_eq!(scheduler.now(), 0);
        scheduler.step();
        assert_eq!(scheduler.now(), 1000);
    }

    /// Events sharing a timestamp come out ordered by peer address.
    #[test]
    fn test_scheduler_same_time_ordering() {
        let mut scheduler = Scheduler::new(42);

        for &(port, timer_id) in &[(3000, 1), (1000, 2), (2000, 3)] {
            scheduler.schedule_at(100, timer(port, timer_id));
        }

        let fired: Vec<Event> = (0..3).map(|_| scheduler.step().unwrap()).collect();

        for event in &fired {
            assert_eq!(event.timestamp, 100);
        }

        let peers: Vec<SocketAddr> = fired
            .iter()
            .map(|event| match &event.event_type {
                EventType::Timer { peer, .. } => *peer,
                _ => panic!("unexpected event type"),
            })
            .collect();

        let mut sorted_peers = peers.clone();
        sorted_peers.sort();
        assert_eq!(peers, sorted_peers);
    }

    /// A cancelled event never fires, and cancelling twice reports `false`.
    #[test]
    fn test_scheduler_cancel() {
        let mut scheduler = Scheduler::new(42);

        let first = scheduler.schedule_at(100, timer(1000, 1));
        scheduler.schedule_at(200, timer(1000, 2));

        assert!(scheduler.cancel(first));
        assert!(!scheduler.cancel(first));

        let survivor = scheduler.step().unwrap();
        assert_eq!(survivor.timestamp, 200);
    }

    /// Only events at or before the target fire; the clock lands on the target.
    #[test]
    fn test_scheduler_run_until_time() {
        let mut scheduler = Scheduler::new(42);

        for i in 1..=5 {
            scheduler.schedule_at(i * 100, timer(1000, i));
        }

        let processed = scheduler.run_until_time(250);
        assert_eq!(processed, 2);
        assert_eq!(scheduler.now(), 250);
        assert_eq!(scheduler.pending_count(), 3);
    }

    /// Processing stops as soon as the predicate holds.
    #[test]
    fn test_scheduler_run_until() {
        let mut scheduler = Scheduler::new(42);

        for i in 1..=10 {
            scheduler.schedule_at(i * 100, timer(1000, i));
        }

        let processed = scheduler.run_until(|s| s.now() >= 500);
        assert_eq!(processed, 5);
        assert_eq!(scheduler.now(), 500);
    }

    /// Every processed event is recorded in the log, in processing order.
    #[test]
    fn test_scheduler_event_log() {
        let mut scheduler = Scheduler::new(42);

        scheduler.schedule_at(100, timer(1000, 1));
        scheduler.schedule_at(200, timer(1000, 2));

        scheduler.run_all();

        let log = scheduler.event_log();
        assert_eq!(log.len(), 2);
        assert_eq!(log[0].timestamp, 100);
        assert_eq!(log[1].timestamp, 200);
    }

    /// The same seed reproduces the same run; a different seed does not.
    #[test]
    fn test_scheduler_determinism() {
        fn run_simulation(seed: u64) -> Vec<(u64, EventId)> {
            let mut scheduler = Scheduler::new(seed);

            for i in 0..20 {
                // Timestamps come from the seeded RNG, so they depend only on `seed`.
                let delay = scheduler.rng().gen_range(0..1000) as u64;
                scheduler.schedule_at(delay, timer((1000 + i) as u16, i as u64));
            }

            scheduler.run_all();
            scheduler
                .event_log()
                .iter()
                .map(|event| (event.timestamp, event.id))
                .collect()
        }

        let result1 = run_simulation(42);
        let result2 = run_simulation(42);
        assert_eq!(result1, result2);

        let result3 = run_simulation(43);
        assert_ne!(result1, result3);
    }

    /// A message-delivery event comes back with all of its fields intact.
    #[test]
    fn test_message_delivery_event() {
        let mut scheduler = Scheduler::new(42);

        scheduler.schedule_at(
            100,
            EventType::MessageDelivery {
                from: addr(1000),
                to: addr(2000),
                payload: vec![1, 2, 3],
            },
        );

        let event = scheduler.step().unwrap();
        match event.event_type {
            EventType::MessageDelivery { from, to, payload } => {
                assert_eq!(from, addr(1000));
                assert_eq!(to, addr(2000));
                assert_eq!(payload, vec![1, 2, 3]);
            }
            _ => panic!("unexpected event type"),
        }
    }
}