vortex-core 0.1.0

Core types and deterministic scheduler for Vortex simulation engine
Documentation
//! Deterministic single-threaded event scheduler.
//!
//! The scheduler is the heart of the simulation engine. All I/O operations
//! (message deliveries, disk completions, timer fires) are modeled as events
//! in a priority queue ordered by `(tick, event_id)`.
//!
//! Single-threaded execution eliminates OS scheduling non-determinism.
//! Same seed + same events = identical execution order.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// A schedulable event in the simulation.
#[derive(Debug, Clone)]
pub struct SimEvent {
    /// When this event should fire (simulation tick).
    pub tick: u64,
    /// Unique event ID for tie-breaking (FIFO for same-tick events).
    pub id: u64,
    /// The event payload.
    pub kind: SimEventKind,
}

/// Types of simulation events.
#[derive(Debug, Clone)]
pub enum SimEventKind {
    /// Deliver a network message.
    DeliverMessage {
        from: u64,
        to: u64,
        payload: Vec<u8>,
    },
    /// Fire a timer.
    TimerFire { node_id: u64, timer_name: String },
    /// Inject a fault.
    InjectFault { description: String },
    /// Heal a fault.
    HealFault { description: String },
    /// Custom event for extensibility.
    Custom { tag: String, data: Vec<u8> },
}

impl PartialEq for SimEvent {
    fn eq(&self, other: &Self) -> bool {
        self.tick == other.tick && self.id == other.id
    }
}

impl Eq for SimEvent {}

impl PartialOrd for SimEvent {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for SimEvent {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Min-heap: earliest tick first, then lowest ID (FIFO)
        (self.tick, self.id).cmp(&(other.tick, other.id))
    }
}

/// Single-threaded deterministic event scheduler.
///
/// Events are processed in `(tick, id)` order. Within the same tick, events
/// are delivered in FIFO insertion order. No wall-clock access, no OS threads.
pub struct SimScheduler {
    queue: BinaryHeap<Reverse<SimEvent>>,
    current_tick: u64,
    next_id: u64,
    events_processed: u64,
}

impl SimScheduler {
    /// Create a new scheduler starting at tick 0.
    pub fn new() -> Self {
        Self {
            queue: BinaryHeap::new(),
            current_tick: 0,
            next_id: 0,
            events_processed: 0,
        }
    }

    /// Schedule an event at a specific tick. Returns the event ID.
    pub fn schedule_at(&mut self, tick: u64, kind: SimEventKind) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.queue.push(Reverse(SimEvent { tick, id, kind }));
        id
    }

    /// Schedule an event relative to the current tick.
    pub fn schedule_after(&mut self, delay: u64, kind: SimEventKind) -> u64 {
        self.schedule_at(self.current_tick + delay, kind)
    }

    /// Process the next event. Returns it if available.
    pub fn step(&mut self) -> Option<SimEvent> {
        if let Some(Reverse(event)) = self.queue.pop() {
            self.current_tick = event.tick;
            self.events_processed += 1;
            Some(event)
        } else {
            None
        }
    }

    /// Process all events up to and including the given tick.
    pub fn run_until(&mut self, max_tick: u64) -> Vec<SimEvent> {
        let mut events = Vec::new();
        while let Some(Reverse(event)) = self.queue.peek() {
            if event.tick > max_tick {
                break;
            }
            let event = self.step().unwrap();
            events.push(event);
        }
        events
    }

    /// Current simulation tick.
    pub fn current_tick(&self) -> u64 {
        self.current_tick
    }

    /// Advance the current tick without processing events.
    pub fn advance_to(&mut self, tick: u64) {
        if tick > self.current_tick {
            self.current_tick = tick;
        }
    }

    /// Number of pending events.
    pub fn pending_count(&self) -> usize {
        self.queue.len()
    }

    /// Total events processed so far.
    pub fn events_processed(&self) -> u64 {
        self.events_processed
    }

    /// Is the scheduler empty?
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }

    /// Peek at the next event's tick without consuming it.
    pub fn peek_tick(&self) -> Option<u64> {
        self.queue.peek().map(|Reverse(e)| e.tick)
    }
}

impl Default for SimScheduler {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_ordering() {
        let mut sched = SimScheduler::new();
        sched.schedule_at(
            5,
            SimEventKind::TimerFire {
                node_id: 1,
                timer_name: "election".into(),
            },
        );
        sched.schedule_at(
            2,
            SimEventKind::TimerFire {
                node_id: 2,
                timer_name: "heartbeat".into(),
            },
        );
        sched.schedule_at(
            8,
            SimEventKind::TimerFire {
                node_id: 3,
                timer_name: "cleanup".into(),
            },
        );

        assert_eq!(sched.step().unwrap().tick, 2);
        assert_eq!(sched.step().unwrap().tick, 5);
        assert_eq!(sched.step().unwrap().tick, 8);
        assert!(sched.is_empty());
    }

    #[test]
    fn test_fifo_same_tick() {
        let mut sched = SimScheduler::new();
        let id_a = sched.schedule_at(
            10,
            SimEventKind::Custom {
                tag: "A".into(),
                data: vec![],
            },
        );
        let id_b = sched.schedule_at(
            10,
            SimEventKind::Custom {
                tag: "B".into(),
                data: vec![],
            },
        );

        assert_eq!(sched.step().unwrap().id, id_a);
        assert_eq!(sched.step().unwrap().id, id_b);
    }

    #[test]
    fn test_run_until() {
        let mut sched = SimScheduler::new();
        sched.schedule_at(
            1,
            SimEventKind::Custom {
                tag: "t1".into(),
                data: vec![],
            },
        );
        sched.schedule_at(
            3,
            SimEventKind::Custom {
                tag: "t3".into(),
                data: vec![],
            },
        );
        sched.schedule_at(
            5,
            SimEventKind::Custom {
                tag: "t5".into(),
                data: vec![],
            },
        );
        sched.schedule_at(
            7,
            SimEventKind::Custom {
                tag: "t7".into(),
                data: vec![],
            },
        );

        let events = sched.run_until(4);
        assert_eq!(events.len(), 2);
        assert_eq!(sched.pending_count(), 2);
    }

    #[test]
    fn test_schedule_after() {
        let mut sched = SimScheduler::new();
        sched.schedule_after(
            10,
            SimEventKind::Custom {
                tag: "delayed".into(),
                data: vec![],
            },
        );

        let e = sched.step().unwrap();
        assert_eq!(e.tick, 10);
        assert_eq!(sched.current_tick(), 10);
    }

    #[test]
    fn test_peek_tick() {
        let mut sched = SimScheduler::new();
        assert_eq!(sched.peek_tick(), None);
        sched.schedule_at(
            5,
            SimEventKind::Custom {
                tag: "x".into(),
                data: vec![],
            },
        );
        assert_eq!(sched.peek_tick(), Some(5));
    }

    #[test]
    fn test_advance_to() {
        let mut sched = SimScheduler::new();
        sched.advance_to(100);
        assert_eq!(sched.current_tick(), 100);
        // Can't go backwards
        sched.advance_to(50);
        assert_eq!(sched.current_tick(), 100);
    }
}