use std::collections::HashMap;
use vortex_core::NodeId;
/// Simulated clock with one global timeline and per-node deviations.
///
/// All times are expressed in microseconds. Each node observes the global
/// time shifted by its own signed skew, unless the node is frozen, in which
/// case it keeps reporting the time captured when it was frozen.
#[derive(Debug, Clone)]
pub struct SimClock {
    /// Global simulated time, in microseconds.
    global_time_us: u64,
    /// Signed per-node offset (microseconds) applied on top of the global
    /// time. A node without an entry has a skew of zero.
    node_skews: HashMap<NodeId, i64>,
    /// Frozen nodes, mapped to the local time (microseconds) each one
    /// reports for as long as it stays frozen.
    frozen_nodes: HashMap<NodeId, u64>,
}
impl SimClock {
    /// Microseconds in one second; also the size of a leap-second step.
    const MICROS_PER_SEC: i64 = 1_000_000;
    /// Microseconds in one millisecond.
    const MICROS_PER_MS: u64 = 1_000;

    /// Creates a clock at time zero with no skewed and no frozen nodes.
    pub fn new() -> Self {
        Self {
            global_time_us: 0,
            node_skews: HashMap::new(),
            frozen_nodes: HashMap::new(),
        }
    }

    /// Returns the global simulated time in microseconds.
    pub fn now_us(&self) -> u64 {
        self.global_time_us
    }

    /// Returns the time observed by `node_id`, in microseconds.
    ///
    /// A frozen node reports the time captured when it was frozen. Any other
    /// node reports the global time plus its skew (zero when none is
    /// recorded). The sum is saturated to the `u64` range, so it is never
    /// negative and never wraps around.
    pub fn node_now_us(&self, node_id: NodeId) -> u64 {
        if let Some(&frozen_time) = self.frozen_nodes.get(&node_id) {
            return frozen_time;
        }
        Self::apply_skew(self.global_time_us, self.get_node_skew(node_id))
    }

    /// Advances the global time by `delta_us` microseconds, saturating at
    /// `u64::MAX` instead of overflowing.
    pub fn advance_us(&mut self, delta_us: u64) {
        self.global_time_us = self.global_time_us.saturating_add(delta_us);
    }

    /// Advances the global time by `delta_ms` milliseconds, saturating at
    /// `u64::MAX` instead of overflowing.
    pub fn advance_ms(&mut self, delta_ms: u64) {
        self.advance_us(delta_ms.saturating_mul(Self::MICROS_PER_MS));
    }

    /// Replaces the skew of `node_id` with `skew_us` microseconds.
    pub fn set_node_skew(&mut self, node_id: NodeId, skew_us: i64) {
        self.node_skews.insert(node_id, skew_us);
    }

    /// Returns the skew of `node_id` in microseconds, or zero when the node
    /// has no recorded skew.
    pub fn get_node_skew(&self, node_id: NodeId) -> i64 {
        self.node_skews.get(&node_id).copied().unwrap_or(0)
    }

    /// Sets the global time to `time_us` microseconds. Node skews and frozen
    /// nodes are left untouched.
    pub fn set_us(&mut self, time_us: u64) {
        self.global_time_us = time_us;
    }

    /// Adds `drift_us_per_sec * elapsed_secs` microseconds to the skew of
    /// `node_id`.
    ///
    /// The product and the resulting skew saturate at the `i64` bounds rather
    /// than overflowing.
    pub fn drift(&mut self, node_id: NodeId, drift_us_per_sec: i64, elapsed_secs: u64) {
        // Cap before converting so that huge durations cannot wrap negative.
        let elapsed = elapsed_secs.min(i64::MAX as u64) as i64;
        self.step_jump(node_id, drift_us_per_sec.saturating_mul(elapsed));
    }

    /// Adds `delta_us` microseconds (which may be negative) to the skew of
    /// `node_id`, saturating at the `i64` bounds.
    pub fn step_jump(&mut self, node_id: NodeId, delta_us: i64) {
        let skew = self.node_skews.entry(node_id).or_insert(0);
        *skew = skew.saturating_add(delta_us);
    }

    /// Freezes `node_id` at the time it currently observes.
    ///
    /// Freezing an already frozen node keeps the original frozen time,
    /// because a frozen node observes its frozen time.
    pub fn freeze(&mut self, node_id: NodeId) {
        let frozen_time = self.node_now_us(node_id);
        self.frozen_nodes.insert(node_id, frozen_time);
    }

    /// Unfreezes `node_id` so that its clock resumes from the frozen time.
    ///
    /// The node's skew is overwritten with the difference between the frozen
    /// time and the current global time, which means any skew change made
    /// while the node was frozen is discarded. Does nothing if the node is
    /// not frozen.
    pub fn unfreeze(&mut self, node_id: NodeId) {
        if let Some(frozen_time) = self.frozen_nodes.remove(&node_id) {
            let skew = Self::skew_between(frozen_time, self.global_time_us);
            self.node_skews.insert(node_id, skew);
        }
    }

    /// Returns `true` while `node_id` is frozen.
    pub fn is_frozen(&self, node_id: NodeId) -> bool {
        self.frozen_nodes.contains_key(&node_id)
    }

    /// Makes `node_id` observe `target_us` at the current global time.
    ///
    /// The node is unfrozen if necessary and its skew is replaced. If the
    /// required skew does not fit in an `i64`, it is saturated.
    pub fn warp(&mut self, node_id: NodeId, target_us: u64) {
        self.frozen_nodes.remove(&node_id);
        let skew = Self::skew_between(target_us, self.global_time_us);
        self.node_skews.insert(node_id, skew);
    }

    /// Steps the clock of `node_id` back by one second by subtracting
    /// 1,000,000 microseconds from its skew (saturating).
    pub fn inject_leap_second(&mut self, node_id: NodeId) {
        let skew = self.node_skews.entry(node_id).or_insert(0);
        *skew = skew.saturating_sub(Self::MICROS_PER_SEC);
    }

    /// Steps the clock of `node_id` forward by one second by adding
    /// 1,000,000 microseconds to its skew (saturating).
    pub fn inject_negative_leap_second(&mut self, node_id: NodeId) {
        let skew = self.node_skews.entry(node_id).or_insert(0);
        *skew = skew.saturating_add(Self::MICROS_PER_SEC);
    }

    /// Returns the clock to its initial state: time zero, no skews, no
    /// frozen nodes.
    pub fn reset(&mut self) {
        self.global_time_us = 0;
        self.node_skews.clear();
        self.frozen_nodes.clear();
    }

    /// Shifts `base_us` by `skew_us`, saturating to the `u64` range.
    fn apply_skew(base_us: u64, skew_us: i64) -> u64 {
        // i128 holds every u64 + i64 sum exactly, so no step can wrap.
        let shifted = i128::from(base_us) + i128::from(skew_us);
        shifted.clamp(0, i128::from(u64::MAX)) as u64
    }

    /// Returns the skew that maps `base_us` onto `target_us`, saturating to
    /// the `i64` range.
    fn skew_between(target_us: u64, base_us: u64) -> i64 {
        let diff = i128::from(target_us) - i128::from(base_us);
        diff.clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64
    }
}
impl Default for SimClock {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Millisecond and microsecond advances accumulate on the global time.
    #[test]
    fn test_basic_advance() {
        let mut sim = SimClock::new();
        assert_eq!(sim.now_us(), 0);

        sim.advance_ms(100);
        assert_eq!(sim.now_us(), 100_000);

        sim.advance_us(500);
        assert_eq!(sim.now_us(), 100_500);
    }

    /// A node sees global time shifted by its skew; unknown nodes see
    /// global time as-is.
    #[test]
    fn test_node_skew() {
        let mut sim = SimClock::new();
        sim.advance_ms(1000);

        // (node, skew applied, local time expected afterwards)
        let cases = [(1, 50_000, 1_050_000), (2, -30_000, 970_000)];
        for &(node, skew, expected) in cases.iter() {
            sim.set_node_skew(node, skew);
            assert_eq!(sim.node_now_us(node), expected);
        }

        assert_eq!(sim.node_now_us(3), 1_000_000);
    }

    /// Drift adds rate * elapsed seconds to the node's skew.
    #[test]
    fn test_drift() {
        let mut sim = SimClock::new();
        sim.advance_ms(1000);

        sim.drift(1, 100, 10);

        assert_eq!(sim.get_node_skew(1), 1000);
        assert_eq!(sim.node_now_us(1), 1_001_000);
    }

    /// Step jumps are cumulative and may move the clock either way.
    #[test]
    fn test_step_jump() {
        let mut sim = SimClock::new();
        sim.advance_ms(1000);

        // (jump applied to node 1, local time expected afterwards)
        let jumps = [(50_000, 1_050_000), (-100_000, 950_000)];
        for &(delta, expected) in jumps.iter() {
            sim.step_jump(1, delta);
            assert_eq!(sim.node_now_us(1), expected);
        }
    }

    /// Reset clears both the global time and recorded skews.
    #[test]
    fn test_reset() {
        let mut sim = SimClock::new();
        sim.advance_ms(500);
        sim.set_node_skew(1, 100);

        sim.reset();

        assert_eq!(sim.now_us(), 0);
        assert_eq!(sim.get_node_skew(1), 0);
    }

    /// A frozen node stops following global time and resumes from the
    /// frozen instant once unfrozen.
    #[test]
    fn test_freeze_and_unfreeze() {
        let mut sim = SimClock::new();
        sim.advance_ms(100);
        sim.set_node_skew(1, 5_000);

        let held = sim.node_now_us(1);
        sim.freeze(1);
        assert!(sim.is_frozen(1));

        // Global time moves on; the frozen node does not.
        sim.advance_ms(200);
        assert_eq!(sim.node_now_us(1), held);
        assert_eq!(sim.now_us(), 300_000);

        // Unfreezing keeps the local time continuous.
        sim.unfreeze(1);
        assert!(!sim.is_frozen(1));
        assert_eq!(sim.node_now_us(1), held);

        sim.advance_ms(50);
        assert_eq!(sim.node_now_us(1), held + 50_000);
    }

    /// Warping pins a node to a target time, after which it tracks global
    /// advances again.
    #[test]
    fn test_warp() {
        let mut sim = SimClock::new();
        sim.advance_ms(100);

        sim.warp(1, 999_000_000);
        assert_eq!(sim.node_now_us(1), 999_000_000);

        sim.warp(2, 10_000);
        assert_eq!(sim.node_now_us(2), 10_000);

        sim.advance_ms(10);
        assert_eq!(sim.node_now_us(1), 999_010_000);
        assert_eq!(sim.node_now_us(2), 20_000);
    }

    /// Warping a frozen node also unfreezes it.
    #[test]
    fn test_warp_unfreezes() {
        let mut sim = SimClock::new();
        sim.advance_ms(100);

        sim.freeze(1);
        assert!(sim.is_frozen(1));

        sim.warp(1, 500_000);
        assert!(!sim.is_frozen(1));
        assert_eq!(sim.node_now_us(1), 500_000);
    }

    /// A skew larger than the elapsed time clamps the node time to zero.
    #[test]
    fn test_node_time_never_negative() {
        let mut sim = SimClock::new();
        sim.advance_ms(10);

        sim.set_node_skew(1, -1_000_000);

        assert_eq!(sim.node_now_us(1), 0);
    }

    /// Injecting a leap second moves the node's clock back one second.
    #[test]
    fn test_leap_second_positive() {
        let mut sim = SimClock::new();
        sim.advance_us(10_000_000);

        let t_before = sim.node_now_us(1);
        sim.inject_leap_second(1);
        let t_after = sim.node_now_us(1);

        assert_eq!(t_after, t_before - 1_000_000);
    }

    /// Injecting a negative leap second moves the node's clock forward one
    /// second.
    #[test]
    fn test_leap_second_negative() {
        let mut sim = SimClock::new();
        sim.advance_us(10_000_000);

        let t_before = sim.node_now_us(1);
        sim.inject_negative_leap_second(1);
        let t_after = sim.node_now_us(1);

        assert_eq!(t_after, t_before + 1_000_000);
    }
}