Skip to main content

vortex_sim/
clock.rs

//! Deterministic clock with manual advance and per-node skew injection.
//!
//! All timers and timestamps use `SimClock` instead of `SystemTime::now()`,
//! which enables deterministic replay and clock-anomaly testing.
5
6use std::collections::HashMap;
7use vortex_core::NodeId;
8
9/// A deterministic clock with virtual time and per-node skew.
10///
11/// Time is in microseconds and only advances when explicitly told to.
12/// No wall-clock access — all time is virtual.
13pub struct SimClock {
14    /// Global simulation time in microseconds.
15    global_time_us: u64,
16    /// Per-node clock skew in microseconds (positive = ahead, negative = behind).
17    node_skews: HashMap<NodeId, i64>,
18    /// Frozen nodes: maps node to the time it was frozen at.
19    /// While frozen, `node_now_us` always returns this value regardless of global advances.
20    frozen_nodes: HashMap<NodeId, u64>,
21}
22
23impl SimClock {
24    /// Create a new simulation clock starting at 0.
25    pub fn new() -> Self {
26        Self {
27            global_time_us: 0,
28            node_skews: HashMap::new(),
29            frozen_nodes: HashMap::new(),
30        }
31    }
32
33    /// Get the global simulation time in microseconds.
34    pub fn now_us(&self) -> u64 {
35        self.global_time_us
36    }
37
38    /// Get the time as seen by a specific node (global time + skew).
39    /// If the node is frozen, returns the frozen time.
40    pub fn node_now_us(&self, node_id: NodeId) -> u64 {
41        if let Some(&frozen_time) = self.frozen_nodes.get(&node_id) {
42            return frozen_time;
43        }
44        let global = self.global_time_us as i64;
45        let skew = self.node_skews.get(&node_id).copied().unwrap_or(0);
46        (global + skew).max(0) as u64
47    }
48
49    /// Advance the global clock by the given number of microseconds.
50    pub fn advance_us(&mut self, delta_us: u64) {
51        self.global_time_us += delta_us;
52    }
53
54    /// Advance by milliseconds.
55    pub fn advance_ms(&mut self, delta_ms: u64) {
56        self.advance_us(delta_ms * 1000);
57    }
58
59    /// Set the clock skew for a specific node.
60    pub fn set_node_skew(&mut self, node_id: NodeId, skew_us: i64) {
61        self.node_skews.insert(node_id, skew_us);
62    }
63
64    /// Get the current skew for a node.
65    pub fn get_node_skew(&self, node_id: NodeId) -> i64 {
66        self.node_skews.get(&node_id).copied().unwrap_or(0)
67    }
68
69    /// Set the global clock to an absolute value.
70    pub fn set_us(&mut self, time_us: u64) {
71        self.global_time_us = time_us;
72    }
73
74    /// Apply clock drift: accumulate skew as if the node drifts at a given rate.
75    pub fn drift(&mut self, node_id: NodeId, drift_us_per_sec: i64, elapsed_secs: u64) {
76        let skew = self.node_skews.entry(node_id).or_insert(0);
77        *skew += drift_us_per_sec * elapsed_secs as i64;
78    }
79
80    /// Apply an instantaneous step jump (NTP correction, VM clock correction).
81    pub fn step_jump(&mut self, node_id: NodeId, delta_us: i64) {
82        let skew = self.node_skews.entry(node_id).or_insert(0);
83        *skew += delta_us;
84    }
85
86    /// Freeze a node's clock at its current time.
87    /// While frozen, the node's time does not advance when global time advances.
88    pub fn freeze(&mut self, node_id: NodeId) {
89        let frozen_time = self.node_now_us(node_id);
90        self.frozen_nodes.insert(node_id, frozen_time);
91    }
92
93    /// Unfreeze a node's clock. The node resumes from where it was frozen,
94    /// adjusting skew so `node_now_us` continues from the frozen time.
95    pub fn unfreeze(&mut self, node_id: NodeId) {
96        if let Some(frozen_time) = self.frozen_nodes.remove(&node_id) {
97            // Set skew so that global_time + skew = frozen_time
98            let skew = frozen_time as i64 - self.global_time_us as i64;
99            self.node_skews.insert(node_id, skew);
100        }
101    }
102
103    /// Check if a node's clock is frozen.
104    pub fn is_frozen(&self, node_id: NodeId) -> bool {
105        self.frozen_nodes.contains_key(&node_id)
106    }
107
108    /// Warp a node's clock to an arbitrary absolute time.
109    /// This sets the node's skew so that `node_now_us` returns the target time.
110    pub fn warp(&mut self, node_id: NodeId, target_us: u64) {
111        // Remove from frozen if it was frozen
112        self.frozen_nodes.remove(&node_id);
113        let skew = target_us as i64 - self.global_time_us as i64;
114        self.node_skews.insert(node_id, skew);
115    }
116
117    /// Inject a positive leap second on a node. The node's clock repeats 1 second
118    /// (jumps backward by 1_000_000 us), simulating a UTC leap second insertion.
119    pub fn inject_leap_second(&mut self, node_id: NodeId) {
120        let skew = self.node_skews.entry(node_id).or_insert(0);
121        *skew -= 1_000_000; // -1 second
122    }
123
124    /// Inject a negative leap second on a node. The node's clock skips 1 second
125    /// forward (jumps forward by 1_000_000 us), simulating a UTC leap second deletion.
126    pub fn inject_negative_leap_second(&mut self, node_id: NodeId) {
127        let skew = self.node_skews.entry(node_id).or_insert(0);
128        *skew += 1_000_000; // +1 second
129    }
130
131    /// Reset the clock to zero and clear all skews.
132    pub fn reset(&mut self) {
133        self.global_time_us = 0;
134        self.node_skews.clear();
135        self.frozen_nodes.clear();
136    }
137}
138
139impl Default for SimClock {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a clock whose global time has been advanced by `ms` milliseconds.
    fn clock_at_ms(ms: u64) -> SimClock {
        let mut clock = SimClock::new();
        clock.advance_ms(ms);
        clock
    }

    /// Build a clock whose global time has been advanced by `us` microseconds.
    fn clock_at_us(us: u64) -> SimClock {
        let mut clock = SimClock::new();
        clock.advance_us(us);
        clock
    }

    /// Global time starts at zero and accumulates ms and us advances.
    #[test]
    fn test_basic_advance() {
        let mut sim = SimClock::new();
        assert_eq!(sim.now_us(), 0);

        sim.advance_ms(100);
        assert_eq!(sim.now_us(), 100_000);

        sim.advance_us(500);
        assert_eq!(sim.now_us(), 100_500);
    }

    /// Positive, negative and absent skews are reflected in node-local time.
    #[test]
    fn test_node_skew() {
        let mut sim = clock_at_ms(1000);

        // Node 1 runs 50ms ahead of global time.
        sim.set_node_skew(1, 50_000);
        assert_eq!(sim.node_now_us(1), 1_050_000);

        // Node 2 runs 30ms behind global time.
        sim.set_node_skew(2, -30_000);
        assert_eq!(sim.node_now_us(2), 970_000);

        // Node 3 was never skewed and tracks global time.
        assert_eq!(sim.node_now_us(3), 1_000_000);
    }

    /// Drift accumulates `rate * seconds` of skew.
    #[test]
    fn test_drift() {
        let mut sim = clock_at_ms(1000);

        // 100us/sec over 10 seconds adds 1000us of skew.
        sim.drift(1, 100, 10);
        assert_eq!(sim.get_node_skew(1), 1000);
        assert_eq!(sim.node_now_us(1), 1_001_000);
    }

    /// Step jumps are cumulative and may go in either direction.
    #[test]
    fn test_step_jump() {
        let mut sim = clock_at_ms(1000);

        // Jump forward by 50ms.
        sim.step_jump(1, 50_000);
        assert_eq!(sim.node_now_us(1), 1_050_000);

        // Jump back by 100ms.
        sim.step_jump(1, -100_000);
        assert_eq!(sim.node_now_us(1), 950_000);
    }

    /// Reset returns global time to zero and forgets node skews.
    #[test]
    fn test_reset() {
        let mut sim = clock_at_ms(500);
        sim.set_node_skew(1, 100);

        sim.reset();

        assert_eq!(sim.now_us(), 0);
        assert_eq!(sim.get_node_skew(1), 0);
    }

    /// A frozen node ignores global advances and resumes from its frozen time.
    #[test]
    fn test_freeze_and_unfreeze() {
        let mut sim = clock_at_ms(100);
        sim.set_node_skew(1, 5_000); // +5ms

        let at_freeze = sim.node_now_us(1); // 105_000
        sim.freeze(1);
        assert!(sim.is_frozen(1));

        // Global time moves on; the frozen node does not.
        sim.advance_ms(200);
        assert_eq!(sim.node_now_us(1), at_freeze);
        assert_eq!(sim.now_us(), 300_000);

        // Immediately after unfreezing the node still reads the frozen time.
        sim.unfreeze(1);
        assert!(!sim.is_frozen(1));
        assert_eq!(sim.node_now_us(1), at_freeze);

        // From here on the node follows global advances again.
        sim.advance_ms(50);
        assert_eq!(sim.node_now_us(1), at_freeze + 50_000);
    }

    /// Warping sets an absolute node time that then tracks global advances.
    #[test]
    fn test_warp() {
        let mut sim = clock_at_ms(100);

        // Node 1 is warped far into the future.
        sim.warp(1, 999_000_000);
        assert_eq!(sim.node_now_us(1), 999_000_000);

        // Node 2 is warped into the past.
        sim.warp(2, 10_000);
        assert_eq!(sim.node_now_us(2), 10_000);

        // Both warped nodes move with the global clock.
        sim.advance_ms(10);
        assert_eq!(sim.node_now_us(1), 999_010_000);
        assert_eq!(sim.node_now_us(2), 20_000);
    }

    /// Warping a frozen node unfreezes it.
    #[test]
    fn test_warp_unfreezes() {
        let mut sim = clock_at_ms(100);

        sim.freeze(1);
        assert!(sim.is_frozen(1));

        sim.warp(1, 500_000);
        assert!(!sim.is_frozen(1));
        assert_eq!(sim.node_now_us(1), 500_000);
    }

    /// A skew larger than the elapsed time clamps node time to zero.
    #[test]
    fn test_node_time_never_negative() {
        let mut sim = clock_at_ms(10);
        sim.set_node_skew(1, -1_000_000); // far more than the 10ms elapsed
        assert_eq!(sim.node_now_us(1), 0);
    }

    /// A positive leap second moves the node's clock back by one second.
    #[test]
    fn test_leap_second_positive() {
        let mut sim = clock_at_us(10_000_000); // 10 seconds

        let prior = sim.node_now_us(1);
        sim.inject_leap_second(1);
        let current = sim.node_now_us(1);

        assert_eq!(current, prior - 1_000_000);
    }

    /// A negative leap second moves the node's clock forward by one second.
    #[test]
    fn test_leap_second_negative() {
        let mut sim = clock_at_us(10_000_000); // 10 seconds

        let prior = sim.node_now_us(1);
        sim.inject_negative_leap_second(1);
        let current = sim.node_now_us(1);

        assert_eq!(current, prior + 1_000_000);
    }
}