// vortex-sim 0.1.0
//
// Simulated I/O implementations (network, storage, clock) for Vortex.
// Documentation
//! Deterministic clock with manual advance and per-node skew injection.
//!
//! All timers and timestamps use `SimClock` instead of `SystemTime::now()`.
//! This enables deterministic replay and clock anomaly testing.

use std::collections::HashMap;
use vortex_core::NodeId;

/// A deterministic clock with virtual time and per-node skew.
///
/// Time is measured in microseconds and only changes when a method on this
/// type is called; nothing here reads the wall clock.
///
/// The type is `Clone` so a simulation can snapshot the clock state, and
/// `Debug` so that state can be printed in test failures and logs.
#[derive(Debug, Clone)]
pub struct SimClock {
    /// Global simulation time in microseconds.
    global_time_us: u64,
    /// Per-node clock skew in microseconds (positive = ahead, negative = behind).
    /// A node without an entry has a skew of 0.
    node_skews: HashMap<NodeId, i64>,
    /// Frozen nodes: maps a node to the node-local time it was frozen at.
    /// While frozen, `node_now_us` always returns this value regardless of
    /// global advances.
    frozen_nodes: HashMap<NodeId, u64>,
}

impl SimClock {
    /// Size of a leap-second adjustment: one second in microseconds.
    const LEAP_SECOND_US: i64 = 1_000_000;

    /// Narrow a wide intermediate value to `i64`, saturating at the `i64` bounds.
    fn saturate_to_i64(value: i128) -> i64 {
        value.max(i128::from(i64::MIN)).min(i128::from(i64::MAX)) as i64
    }

    /// Skew that makes `base_us + skew == target_us`, saturated to the `i64` range.
    fn skew_between(target_us: u64, base_us: u64) -> i64 {
        Self::saturate_to_i64(i128::from(target_us) - i128::from(base_us))
    }

    /// Add `delta_us` to a node's skew (starting from 0 if it has none),
    /// saturating at the `i64` bounds instead of overflowing.
    fn shift_skew(&mut self, node_id: NodeId, delta_us: i64) {
        let skew = self.node_skews.entry(node_id).or_insert(0);
        *skew = skew.saturating_add(delta_us);
    }

    /// Create a new simulation clock starting at 0, with no skews and no
    /// frozen nodes.
    pub fn new() -> Self {
        Self {
            global_time_us: 0,
            node_skews: HashMap::new(),
            frozen_nodes: HashMap::new(),
        }
    }

    /// Get the global simulation time in microseconds.
    pub fn now_us(&self) -> u64 {
        self.global_time_us
    }

    /// Get the time as seen by a specific node (global time + skew).
    ///
    /// If the node is frozen, returns the frozen time. Otherwise the result is
    /// clamped to `0..=u64::MAX`: a skew that would push the node before time
    /// zero yields 0, and one that would push it past `u64::MAX` yields
    /// `u64::MAX`.
    pub fn node_now_us(&self, node_id: NodeId) -> u64 {
        if let Some(&frozen_time) = self.frozen_nodes.get(&node_id) {
            return frozen_time;
        }
        // Widen to i128 so neither the u64 -> signed conversion nor the
        // addition can wrap, whatever the global time and skew are.
        let skewed = i128::from(self.global_time_us) + i128::from(self.get_node_skew(node_id));
        skewed.max(0).min(i128::from(u64::MAX)) as u64
    }

    /// Advance the global clock by the given number of microseconds.
    ///
    /// Saturates at `u64::MAX` so virtual time can never wrap backwards.
    pub fn advance_us(&mut self, delta_us: u64) {
        self.global_time_us = self.global_time_us.saturating_add(delta_us);
    }

    /// Advance by milliseconds.
    ///
    /// The conversion to microseconds saturates, as does the advance itself.
    pub fn advance_ms(&mut self, delta_ms: u64) {
        self.advance_us(delta_ms.saturating_mul(1000));
    }

    /// Set the clock skew for a specific node, replacing any previous skew.
    pub fn set_node_skew(&mut self, node_id: NodeId, skew_us: i64) {
        self.node_skews.insert(node_id, skew_us);
    }

    /// Get the current skew for a node (0 if none has been set).
    pub fn get_node_skew(&self, node_id: NodeId) -> i64 {
        self.node_skews.get(&node_id).copied().unwrap_or(0)
    }

    /// Set the global clock to an absolute value.
    ///
    /// The value may be lower than the current time. Skews are untouched, and
    /// frozen nodes keep reporting their frozen time.
    pub fn set_us(&mut self, time_us: u64) {
        self.global_time_us = time_us;
    }

    /// Apply clock drift: accumulate skew as if the node drifts at a given rate.
    ///
    /// Adds `drift_us_per_sec * elapsed_secs` to the node's skew. The product
    /// and the accumulated skew saturate at the `i64` bounds.
    pub fn drift(&mut self, node_id: NodeId, drift_us_per_sec: i64, elapsed_secs: u64) {
        // i64 * u64 always fits in i128, so the product itself cannot overflow.
        let drift_total = i128::from(drift_us_per_sec) * i128::from(elapsed_secs);
        self.shift_skew(node_id, Self::saturate_to_i64(drift_total));
    }

    /// Apply an instantaneous step jump (NTP correction, VM clock correction).
    ///
    /// Adds `delta_us` to the node's skew, saturating at the `i64` bounds.
    pub fn step_jump(&mut self, node_id: NodeId, delta_us: i64) {
        self.shift_skew(node_id, delta_us);
    }

    /// Freeze a node's clock at its current time.
    ///
    /// While frozen, the node's time does not advance when global time
    /// advances. Freezing an already-frozen node keeps its frozen time.
    pub fn freeze(&mut self, node_id: NodeId) {
        let frozen_time = self.node_now_us(node_id);
        self.frozen_nodes.insert(node_id, frozen_time);
    }

    /// Unfreeze a node's clock. The node resumes from where it was frozen,
    /// adjusting skew so `node_now_us` continues from the frozen time.
    ///
    /// The node's skew is overwritten, so any skew changes made while it was
    /// frozen are discarded. Does nothing if the node is not frozen.
    pub fn unfreeze(&mut self, node_id: NodeId) {
        if let Some(frozen_time) = self.frozen_nodes.remove(&node_id) {
            // Set skew so that global_time + skew = frozen_time
            let skew = Self::skew_between(frozen_time, self.global_time_us);
            self.node_skews.insert(node_id, skew);
        }
    }

    /// Check if a node's clock is frozen.
    pub fn is_frozen(&self, node_id: NodeId) -> bool {
        self.frozen_nodes.contains_key(&node_id)
    }

    /// Warp a node's clock to an arbitrary absolute time.
    ///
    /// This unfreezes the node if it was frozen and sets its skew so that
    /// `node_now_us` returns the target time. The skew is stored as `i64`, so
    /// a target further than `i64::MAX` microseconds from the global time is
    /// reached only as closely as that range allows.
    pub fn warp(&mut self, node_id: NodeId, target_us: u64) {
        // Remove from frozen if it was frozen
        self.frozen_nodes.remove(&node_id);
        let skew = Self::skew_between(target_us, self.global_time_us);
        self.node_skews.insert(node_id, skew);
    }

    /// Inject a positive leap second on a node. The node's clock repeats 1 second
    /// (jumps backward by 1_000_000 us), simulating a UTC leap second insertion.
    pub fn inject_leap_second(&mut self, node_id: NodeId) {
        self.shift_skew(node_id, -Self::LEAP_SECOND_US);
    }

    /// Inject a negative leap second on a node. The node's clock skips 1 second
    /// forward (jumps forward by 1_000_000 us), simulating a UTC leap second deletion.
    pub fn inject_negative_leap_second(&mut self, node_id: NodeId) {
        self.shift_skew(node_id, Self::LEAP_SECOND_US);
    }

    /// Reset the clock to zero, clear all skews, and unfreeze every node.
    pub fn reset(&mut self) {
        self.global_time_us = 0;
        self.node_skews.clear();
        self.frozen_nodes.clear();
    }
}

impl Default for SimClock {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Number of microseconds in one second.
    const SECOND_US: u64 = 1_000_000;

    /// Build a clock whose global time has already been advanced by `ms` milliseconds.
    fn clock_at_ms(ms: u64) -> SimClock {
        let mut sim = SimClock::new();
        sim.advance_ms(ms);
        sim
    }

    /// Global time starts at zero and accumulates millisecond and microsecond advances.
    #[test]
    fn test_basic_advance() {
        let mut sim = SimClock::new();
        assert_eq!(sim.now_us(), 0);

        sim.advance_ms(100);
        assert_eq!(sim.now_us(), 100_000);

        sim.advance_us(500);
        assert_eq!(sim.now_us(), 100_500);
    }

    /// A node's time is the global time shifted by its skew; unknown nodes have no skew.
    #[test]
    fn test_node_skew() {
        let mut sim = clock_at_ms(1000);

        // (node, skew in us, expected node time in us)
        let cases = [
            (1, 50_000, 1_050_000),  // 50ms ahead
            (2, -30_000, 970_000),   // 30ms behind
        ];
        for &(node, skew_us, expected_us) in cases.iter() {
            sim.set_node_skew(node, skew_us);
            assert_eq!(sim.node_now_us(node), expected_us);
        }

        // A node that never had a skew set sees plain global time.
        assert_eq!(sim.node_now_us(3), 1_000_000);
    }

    /// Drift accumulates `rate * elapsed` into the node's skew.
    #[test]
    fn test_drift() {
        let mut sim = clock_at_ms(1000);

        // 100us/sec over 10 seconds => 1000us of skew.
        sim.drift(1, 100, 10);

        assert_eq!(sim.get_node_skew(1), 1000);
        assert_eq!(sim.node_now_us(1), 1_001_000);
    }

    /// Step jumps are cumulative and may go in either direction.
    #[test]
    fn test_step_jump() {
        let mut sim = clock_at_ms(1000);

        // Forward by 50ms.
        sim.step_jump(1, 50_000);
        assert_eq!(sim.node_now_us(1), 1_050_000);

        // Then back by 100ms: net -50ms.
        sim.step_jump(1, -100_000);
        assert_eq!(sim.node_now_us(1), 950_000);
    }

    /// Reset returns global time to zero and drops configured skews.
    #[test]
    fn test_reset() {
        let mut sim = clock_at_ms(500);
        sim.set_node_skew(1, 100);

        sim.reset();

        assert_eq!(sim.now_us(), 0);
        assert_eq!(sim.get_node_skew(1), 0);
    }

    /// A frozen node ignores global advances and resumes from its frozen time.
    #[test]
    fn test_freeze_and_unfreeze() {
        let mut sim = clock_at_ms(100);
        sim.set_node_skew(1, 5_000); // +5ms

        // Node time at the moment of freezing: 105_000.
        let held_at = sim.node_now_us(1);
        sim.freeze(1);
        assert!(sim.is_frozen(1));

        // Global time moves on, the frozen node does not.
        sim.advance_ms(200);
        assert_eq!(sim.node_now_us(1), held_at);
        assert_eq!(sim.now_us(), 300_000);

        // Immediately after unfreezing the node still reads the frozen time.
        sim.unfreeze(1);
        assert!(!sim.is_frozen(1));
        assert_eq!(sim.node_now_us(1), held_at);

        // From here on the node follows global advances again.
        sim.advance_ms(50);
        assert_eq!(sim.node_now_us(1), held_at + 50_000);
    }

    /// Warping pins a node to an absolute time, after which it tracks global advances.
    #[test]
    fn test_warp() {
        let mut sim = clock_at_ms(100);

        // Node 1 goes far into the future.
        sim.warp(1, 999_000_000);
        assert_eq!(sim.node_now_us(1), 999_000_000);

        // Node 2 goes into the past.
        sim.warp(2, 10_000);
        assert_eq!(sim.node_now_us(2), 10_000);

        // Both warped nodes move with the global clock.
        sim.advance_ms(10);
        assert_eq!(sim.node_now_us(1), 999_010_000);
        assert_eq!(sim.node_now_us(2), 20_000);
    }

    /// Warping a frozen node also unfreezes it.
    #[test]
    fn test_warp_unfreezes() {
        let mut sim = clock_at_ms(100);

        sim.freeze(1);
        assert!(sim.is_frozen(1));

        sim.warp(1, 500_000);

        assert!(!sim.is_frozen(1));
        assert_eq!(sim.node_now_us(1), 500_000);
    }

    /// A skew larger than the elapsed time clamps the node's time at zero.
    #[test]
    fn test_node_time_never_negative() {
        let mut sim = clock_at_ms(10);

        // Skew of -1s against only 10ms of elapsed time.
        sim.set_node_skew(1, -1_000_000);

        assert_eq!(sim.node_now_us(1), 0);
    }

    /// A positive leap second moves the node's clock back by one second.
    #[test]
    fn test_leap_second_positive() {
        let mut sim = SimClock::new();
        sim.advance_us(10 * SECOND_US);

        let before = sim.node_now_us(1);
        sim.inject_leap_second(1);
        let after = sim.node_now_us(1);

        assert_eq!(after + SECOND_US, before);
    }

    /// A negative leap second moves the node's clock forward by one second.
    #[test]
    fn test_leap_second_negative() {
        let mut sim = SimClock::new();
        sim.advance_us(10 * SECOND_US);

        let before = sim.node_now_us(1);
        sim.inject_negative_leap_second(1);
        let after = sim.node_now_us(1);

        assert_eq!(after, before + SECOND_US);
    }
}