Skip to main content

vortex_core/
scheduler.rs

1//! Deterministic single-threaded event scheduler.
2//!
3//! The scheduler is the heart of the simulation engine. All I/O operations
4//! (message deliveries, disk completions, timer fires) are modeled as events
5//! in a priority queue ordered by `(tick, event_id)`.
6//!
7//! Single-threaded execution eliminates OS scheduling non-determinism.
8//! Same seed + same events = identical execution order.
9
10use std::cmp::Reverse;
11use std::collections::BinaryHeap;
12
13/// A schedulable event in the simulation.
14#[derive(Debug, Clone)]
15pub struct SimEvent {
16    /// When this event should fire (simulation tick).
17    pub tick: u64,
18    /// Unique event ID for tie-breaking (FIFO for same-tick events).
19    pub id: u64,
20    /// The event payload.
21    pub kind: SimEventKind,
22}
23
/// The payload of a simulation event.
///
/// `PartialEq`/`Eq` are derived so payloads can be compared by value, for
/// example with `assert_eq!` when checking which event a scheduler produced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SimEventKind {
    /// Deliver a network message.
    DeliverMessage {
        /// Identifier of the sender (presumably a node ID).
        from: u64,
        /// Identifier of the recipient (presumably a node ID).
        to: u64,
        /// Raw message bytes.
        payload: Vec<u8>,
    },
    /// Fire a timer.
    TimerFire {
        /// Node the timer belongs to.
        node_id: u64,
        /// Name identifying the timer.
        timer_name: String,
    },
    /// Inject a fault.
    InjectFault {
        /// Free-form description of the fault.
        description: String,
    },
    /// Heal a fault.
    HealFault {
        /// Free-form description of the fault.
        description: String,
    },
    /// Custom event for extensibility.
    Custom {
        /// Caller-chosen label for the event.
        tag: String,
        /// Opaque bytes attached to the event.
        data: Vec<u8>,
    },
}
42
43impl PartialEq for SimEvent {
44    fn eq(&self, other: &Self) -> bool {
45        self.tick == other.tick && self.id == other.id
46    }
47}
48
49impl Eq for SimEvent {}
50
51impl PartialOrd for SimEvent {
52    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
53        Some(self.cmp(other))
54    }
55}
56
57impl Ord for SimEvent {
58    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
59        // Min-heap: earliest tick first, then lowest ID (FIFO)
60        (self.tick, self.id).cmp(&(other.tick, other.id))
61    }
62}
63
64/// Single-threaded deterministic event scheduler.
65///
66/// Events are processed in `(tick, id)` order. Within the same tick, events
67/// are delivered in FIFO insertion order. No wall-clock access, no OS threads.
68pub struct SimScheduler {
69    queue: BinaryHeap<Reverse<SimEvent>>,
70    current_tick: u64,
71    next_id: u64,
72    events_processed: u64,
73}
74
75impl SimScheduler {
76    /// Create a new scheduler starting at tick 0.
77    pub fn new() -> Self {
78        Self {
79            queue: BinaryHeap::new(),
80            current_tick: 0,
81            next_id: 0,
82            events_processed: 0,
83        }
84    }
85
86    /// Schedule an event at a specific tick. Returns the event ID.
87    pub fn schedule_at(&mut self, tick: u64, kind: SimEventKind) -> u64 {
88        let id = self.next_id;
89        self.next_id += 1;
90        self.queue.push(Reverse(SimEvent { tick, id, kind }));
91        id
92    }
93
94    /// Schedule an event relative to the current tick.
95    pub fn schedule_after(&mut self, delay: u64, kind: SimEventKind) -> u64 {
96        self.schedule_at(self.current_tick + delay, kind)
97    }
98
99    /// Process the next event. Returns it if available.
100    pub fn step(&mut self) -> Option<SimEvent> {
101        if let Some(Reverse(event)) = self.queue.pop() {
102            self.current_tick = event.tick;
103            self.events_processed += 1;
104            Some(event)
105        } else {
106            None
107        }
108    }
109
110    /// Process all events up to and including the given tick.
111    pub fn run_until(&mut self, max_tick: u64) -> Vec<SimEvent> {
112        let mut events = Vec::new();
113        while let Some(Reverse(event)) = self.queue.peek() {
114            if event.tick > max_tick {
115                break;
116            }
117            let event = self.step().unwrap();
118            events.push(event);
119        }
120        events
121    }
122
123    /// Current simulation tick.
124    pub fn current_tick(&self) -> u64 {
125        self.current_tick
126    }
127
128    /// Advance the current tick without processing events.
129    pub fn advance_to(&mut self, tick: u64) {
130        if tick > self.current_tick {
131            self.current_tick = tick;
132        }
133    }
134
135    /// Number of pending events.
136    pub fn pending_count(&self) -> usize {
137        self.queue.len()
138    }
139
140    /// Total events processed so far.
141    pub fn events_processed(&self) -> u64 {
142        self.events_processed
143    }
144
145    /// Is the scheduler empty?
146    pub fn is_empty(&self) -> bool {
147        self.queue.is_empty()
148    }
149
150    /// Peek at the next event's tick without consuming it.
151    pub fn peek_tick(&self) -> Option<u64> {
152        self.queue.peek().map(|Reverse(e)| e.tick)
153    }
154}
155
156impl Default for SimScheduler {
157    fn default() -> Self {
158        Self::new()
159    }
160}
161
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `Custom` payload with the given tag and no data.
    fn custom(tag: &str) -> SimEventKind {
        SimEventKind::Custom {
            tag: tag.into(),
            data: vec![],
        }
    }

    /// Build a `TimerFire` payload for the given node and timer name.
    fn timer(node_id: u64, timer_name: &str) -> SimEventKind {
        SimEventKind::TimerFire {
            node_id,
            timer_name: timer_name.into(),
        }
    }

    /// Events come out ordered by tick regardless of insertion order.
    #[test]
    fn test_ordering() {
        let mut sched = SimScheduler::new();
        sched.schedule_at(5, timer(1, "election"));
        sched.schedule_at(2, timer(2, "heartbeat"));
        sched.schedule_at(8, timer(3, "cleanup"));

        for expected_tick in [2u64, 5, 8] {
            assert_eq!(sched.step().unwrap().tick, expected_tick);
        }
        assert!(sched.is_empty());
    }

    /// Events sharing a tick fire in the order they were scheduled.
    #[test]
    fn test_fifo_same_tick() {
        let mut sched = SimScheduler::new();
        let id_a = sched.schedule_at(10, custom("A"));
        let id_b = sched.schedule_at(10, custom("B"));

        for expected_id in [id_a, id_b] {
            assert_eq!(sched.step().unwrap().id, expected_id);
        }
    }

    /// `run_until` drains events up to the bound and leaves the rest queued.
    #[test]
    fn test_run_until() {
        let mut sched = SimScheduler::new();
        for tick in [1u64, 3, 5, 7] {
            sched.schedule_at(tick, custom(&format!("t{}", tick)));
        }

        let events = sched.run_until(4);
        assert_eq!(events.len(), 2);
        assert_eq!(sched.pending_count(), 2);
    }

    /// A relative delay is measured from the current tick, and stepping moves
    /// the clock to the fired event's tick.
    #[test]
    fn test_schedule_after() {
        let mut sched = SimScheduler::new();
        sched.schedule_after(10, custom("delayed"));

        let e = sched.step().unwrap();
        assert_eq!(e.tick, 10);
        assert_eq!(sched.current_tick(), 10);
    }

    /// `peek_tick` reports the next due tick, or `None` on an empty queue.
    #[test]
    fn test_peek_tick() {
        let mut sched = SimScheduler::new();
        assert_eq!(sched.peek_tick(), None);
        sched.schedule_at(5, custom("x"));
        assert_eq!(sched.peek_tick(), Some(5));
    }

    /// `advance_to` moves the clock forward and ignores earlier ticks.
    #[test]
    fn test_advance_to() {
        let mut sched = SimScheduler::new();
        sched.advance_to(100);
        assert_eq!(sched.current_tick(), 100);
        // Can't go backwards
        sched.advance_to(50);
        assert_eq!(sched.current_tick(), 100);
    }
}