freenet/simulation/
scheduler.rs

1//! Deterministic event scheduler for simulation.
2//!
3//! The scheduler processes events in a deterministic order based on:
4//! 1. Event timestamp (earlier first)
5//! 2. Peer ID (for same timestamp)
6//! 3. Event type (for same timestamp and peer)
7//! 4. Event ID (for complete tie-breaking)
8
9use std::{cmp::Ordering, collections::BinaryHeap, net::SocketAddr, time::Duration};
10
11use super::{
12    rng::SimulationRng,
13    time::{TimeSource, VirtualTime},
14};
15
16/// Unique identifier for an event.
17///
18/// Derives `Ord` to support use as BTreeMap key for deterministic iteration order.
19#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
20pub struct EventId(u64);
21
22impl EventId {
23    pub fn as_u64(&self) -> u64 {
24        self.0
25    }
26}
27
28/// Types of events that can be scheduled.
29#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
30pub enum EventType {
31    /// A network message delivery
32    MessageDelivery {
33        /// Source peer address
34        from: SocketAddr,
35        /// Target peer address
36        to: SocketAddr,
37        /// Message payload (serialized)
38        payload: Vec<u8>,
39    },
40    /// A timer/wakeup event
41    Timer {
42        /// Peer that scheduled the timer
43        peer: SocketAddr,
44        /// Timer identifier
45        timer_id: u64,
46    },
47    /// A fault injection event (partition start/end, node crash)
48    Fault {
49        /// Affected peers
50        peers: Vec<SocketAddr>,
51        /// Description of the fault
52        description: String,
53    },
54    /// Custom event for extensibility
55    Custom {
56        /// Peer associated with this event
57        peer: SocketAddr,
58        /// Event kind identifier
59        kind: String,
60        /// Event data
61        data: Vec<u8>,
62    },
63}
64
65impl EventType {
66    /// Returns the primary peer associated with this event (for ordering).
67    pub fn primary_peer(&self) -> Option<SocketAddr> {
68        match self {
69            EventType::MessageDelivery { to, .. } => Some(*to),
70            EventType::Timer { peer, .. } => Some(*peer),
71            EventType::Fault { peers, .. } => peers.first().copied(),
72            EventType::Custom { peer, .. } => Some(*peer),
73        }
74    }
75}
76
77/// A scheduled event in the simulation.
78#[derive(Debug, Clone)]
79pub struct Event {
80    /// When this event should be processed (virtual nanos)
81    pub timestamp: u64,
82    /// Unique identifier for ordering ties
83    pub id: EventId,
84    /// The event type and payload
85    pub event_type: EventType,
86}
87
88impl Event {
89    fn new(timestamp: u64, id: EventId, event_type: EventType) -> Self {
90        Self {
91            timestamp,
92            id,
93            event_type,
94        }
95    }
96}
97
98impl PartialEq for Event {
99    fn eq(&self, other: &Self) -> bool {
100        self.id == other.id
101    }
102}
103
104impl Eq for Event {}
105
106impl PartialOrd for Event {
107    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
108        Some(self.cmp(other))
109    }
110}
111
112impl Ord for Event {
113    fn cmp(&self, other: &Self) -> Ordering {
114        // Min-heap: reverse ordering so smallest timestamp comes first
115        // Tie-breaking order:
116        // 1. Timestamp (earlier first)
117        // 2. Primary peer address (for determinism)
118        // 3. Event type (via derived Ord)
119        // 4. Event ID (registration order)
120        match other.timestamp.cmp(&self.timestamp) {
121            Ordering::Equal => {
122                let self_peer = self.event_type.primary_peer();
123                let other_peer = other.event_type.primary_peer();
124                match other_peer.cmp(&self_peer) {
125                    Ordering::Equal => match other.event_type.cmp(&self.event_type) {
126                        Ordering::Equal => other.id.0.cmp(&self.id.0),
127                        ord => ord,
128                    },
129                    ord => ord,
130                }
131            }
132            ord => ord,
133        }
134    }
135}
136
137/// Configuration for the scheduler.
138#[derive(Debug, Clone, Default)]
139pub struct SchedulerConfig {
140    /// Maximum events to process per step (0 = unlimited)
141    pub max_events_per_step: usize,
142    /// Whether to log events as they're processed
143    pub trace_events: bool,
144}
145
146/// Deterministic event scheduler for simulation.
147///
148/// Processes events in timestamp order with deterministic tie-breaking.
149/// All randomness goes through the seeded RNG.
150pub struct Scheduler {
151    /// Virtual time for the simulation
152    time: VirtualTime,
153    /// Seeded RNG for deterministic decisions
154    rng: SimulationRng,
155    /// Priority queue of pending events
156    pending_events: BinaryHeap<Event>,
157    /// Counter for generating unique event IDs
158    next_event_id: u64,
159    /// Log of processed events (for replay verification)
160    event_log: Vec<Event>,
161    /// Configuration
162    config: SchedulerConfig,
163}
164
165impl Scheduler {
166    /// Creates a new scheduler with the given seed.
167    pub fn new(seed: u64) -> Self {
168        Self::with_config(seed, SchedulerConfig::default())
169    }
170
171    /// Creates a new scheduler with the given seed and configuration.
172    pub fn with_config(seed: u64, config: SchedulerConfig) -> Self {
173        Self {
174            time: VirtualTime::new(),
175            rng: SimulationRng::new(seed),
176            pending_events: BinaryHeap::new(),
177            next_event_id: 0,
178            event_log: Vec::new(),
179            config,
180        }
181    }
182
183    /// Returns a reference to the virtual time.
184    pub fn time(&self) -> &VirtualTime {
185        &self.time
186    }
187
188    /// Returns a mutable reference to the virtual time.
189    pub fn time_mut(&mut self) -> &mut VirtualTime {
190        &mut self.time
191    }
192
193    /// Returns the current virtual time in nanoseconds.
194    pub fn now(&self) -> u64 {
195        self.time.now_nanos()
196    }
197
198    /// Returns a reference to the RNG.
199    pub fn rng(&self) -> &SimulationRng {
200        &self.rng
201    }
202
203    /// Returns the seed used for this scheduler.
204    pub fn seed(&self) -> u64 {
205        self.rng.seed()
206    }
207
208    /// Returns the number of pending events.
209    pub fn pending_count(&self) -> usize {
210        self.pending_events.len()
211    }
212
213    /// Returns the event log for replay verification.
214    pub fn event_log(&self) -> &[Event] {
215        &self.event_log
216    }
217
218    /// Clears the event log.
219    pub fn clear_event_log(&mut self) {
220        self.event_log.clear();
221    }
222
223    /// Schedules an event at the given absolute timestamp.
224    pub fn schedule_at(&mut self, timestamp: u64, event_type: EventType) -> EventId {
225        let id = EventId(self.next_event_id);
226        self.next_event_id += 1;
227
228        let event = Event::new(timestamp, id, event_type);
229        self.pending_events.push(event);
230        id
231    }
232
233    /// Schedules an event after the given delay from now.
234    pub fn schedule_after(&mut self, delay: Duration, event_type: EventType) -> EventId {
235        let timestamp = self.now().saturating_add(delay.as_nanos() as u64);
236        self.schedule_at(timestamp, event_type)
237    }
238
239    /// Schedules an event at the current time.
240    pub fn schedule_now(&mut self, event_type: EventType) -> EventId {
241        self.schedule_at(self.now(), event_type)
242    }
243
244    /// Cancels a pending event by ID.
245    ///
246    /// Returns true if the event was found and cancelled.
247    pub fn cancel(&mut self, id: EventId) -> bool {
248        let mut events: Vec<_> = std::mem::take(&mut self.pending_events).into_vec();
249        let original_len = events.len();
250        events.retain(|e| e.id != id);
251        let cancelled = events.len() < original_len;
252        self.pending_events = BinaryHeap::from(events);
253        cancelled
254    }
255
256    /// Returns the timestamp of the next pending event, if any.
257    pub fn next_event_time(&self) -> Option<u64> {
258        self.pending_events.peek().map(|e| e.timestamp)
259    }
260
261    /// Processes the next pending event, advancing time if necessary.
262    ///
263    /// Returns the processed event, or None if no events are pending.
264    pub fn step(&mut self) -> Option<Event> {
265        let event = self.pending_events.pop()?;
266
267        // Advance time to the event's timestamp if needed
268        if event.timestamp > self.now() {
269            self.time.advance_to(event.timestamp);
270        }
271
272        if self.config.trace_events {
273            tracing::trace!(
274                timestamp = event.timestamp,
275                id = event.id.0,
276                ?event.event_type,
277                "Processing event"
278            );
279        }
280
281        self.event_log.push(event.clone());
282        Some(event)
283    }
284
285    /// Processes events until the given condition is true or no events remain.
286    ///
287    /// Returns the number of events processed.
288    pub fn run_until<F>(&mut self, mut condition: F) -> usize
289    where
290        F: FnMut(&Scheduler) -> bool,
291    {
292        let mut processed = 0;
293        while !condition(self) {
294            if self.step().is_none() {
295                break;
296            }
297            processed += 1;
298
299            if self.config.max_events_per_step > 0 && processed >= self.config.max_events_per_step {
300                break;
301            }
302        }
303        processed
304    }
305
306    /// Processes events until the given time is reached.
307    ///
308    /// Returns the number of events processed.
309    pub fn run_until_time(&mut self, target_time: u64) -> usize {
310        let mut processed = 0;
311        while let Some(next_time) = self.next_event_time() {
312            if next_time > target_time {
313                break;
314            }
315            if self.step().is_none() {
316                break;
317            }
318            processed += 1;
319        }
320
321        // Advance time to target even if no events
322        if self.now() < target_time {
323            self.time.advance_to(target_time);
324        }
325
326        processed
327    }
328
329    /// Processes all pending events.
330    ///
331    /// Returns the number of events processed.
332    pub fn run_all(&mut self) -> usize {
333        let mut processed = 0;
334        while self.step().is_some() {
335            processed += 1;
336        }
337        processed
338    }
339
340    /// Drains all pending events without processing them.
341    pub fn drain_pending(&mut self) -> Vec<Event> {
342        std::mem::take(&mut self.pending_events).into_vec()
343    }
344}
345
346impl std::fmt::Debug for Scheduler {
347    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
348        f.debug_struct("Scheduler")
349            .field("now", &self.now())
350            .field("seed", &self.rng.seed())
351            .field("pending_count", &self.pending_count())
352            .field("event_log_len", &self.event_log.len())
353            .finish()
354    }
355}
356
357#[cfg(test)]
358mod tests {
359    use super::*;
360    use std::net::{IpAddr, Ipv4Addr};
361
362    fn addr(port: u16) -> SocketAddr {
363        SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), port)
364    }
365
366    #[test]
367    fn test_scheduler_basic() {
368        let mut scheduler = Scheduler::new(42);
369
370        scheduler.schedule_at(
371            100,
372            EventType::Timer {
373                peer: addr(1000),
374                timer_id: 1,
375            },
376        );
377        scheduler.schedule_at(
378            50,
379            EventType::Timer {
380                peer: addr(1000),
381                timer_id: 2,
382            },
383        );
384        scheduler.schedule_at(
385            200,
386            EventType::Timer {
387                peer: addr(1000),
388                timer_id: 3,
389            },
390        );
391
392        // Should process in timestamp order
393        let e1 = scheduler.step().unwrap();
394        assert_eq!(e1.timestamp, 50);
395
396        let e2 = scheduler.step().unwrap();
397        assert_eq!(e2.timestamp, 100);
398
399        let e3 = scheduler.step().unwrap();
400        assert_eq!(e3.timestamp, 200);
401
402        assert!(scheduler.step().is_none());
403    }
404
405    #[test]
406    fn test_scheduler_time_advancement() {
407        let mut scheduler = Scheduler::new(42);
408
409        scheduler.schedule_at(
410            1000,
411            EventType::Timer {
412                peer: addr(1000),
413                timer_id: 1,
414            },
415        );
416
417        assert_eq!(scheduler.now(), 0);
418        scheduler.step();
419        assert_eq!(scheduler.now(), 1000);
420    }
421
422    #[test]
423    fn test_scheduler_same_time_ordering() {
424        let mut scheduler = Scheduler::new(42);
425
426        // Schedule events at same time but different peers
427        scheduler.schedule_at(
428            100,
429            EventType::Timer {
430                peer: addr(3000),
431                timer_id: 1,
432            },
433        );
434        scheduler.schedule_at(
435            100,
436            EventType::Timer {
437                peer: addr(1000),
438                timer_id: 2,
439            },
440        );
441        scheduler.schedule_at(
442            100,
443            EventType::Timer {
444                peer: addr(2000),
445                timer_id: 3,
446            },
447        );
448
449        // Should be ordered by peer address for determinism
450        let e1 = scheduler.step().unwrap();
451        let e2 = scheduler.step().unwrap();
452        let e3 = scheduler.step().unwrap();
453
454        // All at same timestamp
455        assert_eq!(e1.timestamp, 100);
456        assert_eq!(e2.timestamp, 100);
457        assert_eq!(e3.timestamp, 100);
458
459        // Verify deterministic order (by peer address)
460        let get_peer = |e: &Event| match &e.event_type {
461            EventType::Timer { peer, .. } => *peer,
462            _ => panic!("unexpected event type"),
463        };
464
465        let peers: Vec<_> = [&e1, &e2, &e3].iter().map(|e| get_peer(e)).collect();
466        // Should be sorted by peer address
467        let mut sorted_peers = peers.clone();
468        sorted_peers.sort();
469        assert_eq!(peers, sorted_peers);
470    }
471
472    #[test]
473    fn test_scheduler_cancel() {
474        let mut scheduler = Scheduler::new(42);
475
476        let id1 = scheduler.schedule_at(
477            100,
478            EventType::Timer {
479                peer: addr(1000),
480                timer_id: 1,
481            },
482        );
483        scheduler.schedule_at(
484            200,
485            EventType::Timer {
486                peer: addr(1000),
487                timer_id: 2,
488            },
489        );
490
491        assert!(scheduler.cancel(id1));
492        assert!(!scheduler.cancel(id1)); // Already cancelled
493
494        let e = scheduler.step().unwrap();
495        assert_eq!(e.timestamp, 200); // Should skip to second event
496    }
497
498    #[test]
499    fn test_scheduler_run_until_time() {
500        let mut scheduler = Scheduler::new(42);
501
502        for i in 1..=5 {
503            scheduler.schedule_at(
504                i * 100,
505                EventType::Timer {
506                    peer: addr(1000),
507                    timer_id: i,
508                },
509            );
510        }
511
512        let processed = scheduler.run_until_time(250);
513        assert_eq!(processed, 2); // Events at 100 and 200
514        assert_eq!(scheduler.now(), 250);
515        assert_eq!(scheduler.pending_count(), 3); // Events at 300, 400, 500
516    }
517
518    #[test]
519    fn test_scheduler_run_until() {
520        let mut scheduler = Scheduler::new(42);
521
522        for i in 1..=10 {
523            scheduler.schedule_at(
524                i * 100,
525                EventType::Timer {
526                    peer: addr(1000),
527                    timer_id: i,
528                },
529            );
530        }
531
532        let processed = scheduler.run_until(|s| s.now() >= 500);
533        assert_eq!(processed, 5);
534        assert_eq!(scheduler.now(), 500);
535    }
536
537    #[test]
538    fn test_scheduler_event_log() {
539        let mut scheduler = Scheduler::new(42);
540
541        scheduler.schedule_at(
542            100,
543            EventType::Timer {
544                peer: addr(1000),
545                timer_id: 1,
546            },
547        );
548        scheduler.schedule_at(
549            200,
550            EventType::Timer {
551                peer: addr(1000),
552                timer_id: 2,
553            },
554        );
555
556        scheduler.run_all();
557
558        let log = scheduler.event_log();
559        assert_eq!(log.len(), 2);
560        assert_eq!(log[0].timestamp, 100);
561        assert_eq!(log[1].timestamp, 200);
562    }
563
564    #[test]
565    fn test_scheduler_determinism() {
566        fn run_simulation(seed: u64) -> Vec<(u64, EventId)> {
567            let mut scheduler = Scheduler::new(seed);
568
569            // Schedule events using RNG for timestamps
570            for i in 0..20 {
571                let delay = scheduler.rng().gen_range(0..1000) as u64;
572                scheduler.schedule_at(
573                    delay,
574                    EventType::Timer {
575                        peer: addr((1000 + i) as u16),
576                        timer_id: i as u64,
577                    },
578                );
579            }
580
581            scheduler.run_all();
582            scheduler
583                .event_log()
584                .iter()
585                .map(|e| (e.timestamp, e.id))
586                .collect()
587        }
588
589        // Same seed should produce identical results
590        let result1 = run_simulation(42);
591        let result2 = run_simulation(42);
592        assert_eq!(result1, result2);
593
594        // Different seed should produce different results
595        let result3 = run_simulation(43);
596        assert_ne!(result1, result3);
597    }
598
599    #[test]
600    fn test_message_delivery_event() {
601        let mut scheduler = Scheduler::new(42);
602
603        scheduler.schedule_at(
604            100,
605            EventType::MessageDelivery {
606                from: addr(1000),
607                to: addr(2000),
608                payload: vec![1, 2, 3],
609            },
610        );
611
612        let event = scheduler.step().unwrap();
613        match event.event_type {
614            EventType::MessageDelivery { from, to, payload } => {
615                assert_eq!(from, addr(1000));
616                assert_eq!(to, addr(2000));
617                assert_eq!(payload, vec![1, 2, 3]);
618            }
619            _ => panic!("unexpected event type"),
620        }
621    }
622}