1use std::collections::HashMap;
7use vortex_core::NodeId;
8
9pub struct SimClock {
14 global_time_us: u64,
16 node_skews: HashMap<NodeId, i64>,
18 frozen_nodes: HashMap<NodeId, u64>,
21}
22
23impl SimClock {
24 pub fn new() -> Self {
26 Self {
27 global_time_us: 0,
28 node_skews: HashMap::new(),
29 frozen_nodes: HashMap::new(),
30 }
31 }
32
33 pub fn now_us(&self) -> u64 {
35 self.global_time_us
36 }
37
38 pub fn node_now_us(&self, node_id: NodeId) -> u64 {
41 if let Some(&frozen_time) = self.frozen_nodes.get(&node_id) {
42 return frozen_time;
43 }
44 let global = self.global_time_us as i64;
45 let skew = self.node_skews.get(&node_id).copied().unwrap_or(0);
46 (global + skew).max(0) as u64
47 }
48
49 pub fn advance_us(&mut self, delta_us: u64) {
51 self.global_time_us += delta_us;
52 }
53
54 pub fn advance_ms(&mut self, delta_ms: u64) {
56 self.advance_us(delta_ms * 1000);
57 }
58
59 pub fn set_node_skew(&mut self, node_id: NodeId, skew_us: i64) {
61 self.node_skews.insert(node_id, skew_us);
62 }
63
64 pub fn get_node_skew(&self, node_id: NodeId) -> i64 {
66 self.node_skews.get(&node_id).copied().unwrap_or(0)
67 }
68
69 pub fn set_us(&mut self, time_us: u64) {
71 self.global_time_us = time_us;
72 }
73
74 pub fn drift(&mut self, node_id: NodeId, drift_us_per_sec: i64, elapsed_secs: u64) {
76 let skew = self.node_skews.entry(node_id).or_insert(0);
77 *skew += drift_us_per_sec * elapsed_secs as i64;
78 }
79
80 pub fn step_jump(&mut self, node_id: NodeId, delta_us: i64) {
82 let skew = self.node_skews.entry(node_id).or_insert(0);
83 *skew += delta_us;
84 }
85
86 pub fn freeze(&mut self, node_id: NodeId) {
89 let frozen_time = self.node_now_us(node_id);
90 self.frozen_nodes.insert(node_id, frozen_time);
91 }
92
93 pub fn unfreeze(&mut self, node_id: NodeId) {
96 if let Some(frozen_time) = self.frozen_nodes.remove(&node_id) {
97 let skew = frozen_time as i64 - self.global_time_us as i64;
99 self.node_skews.insert(node_id, skew);
100 }
101 }
102
103 pub fn is_frozen(&self, node_id: NodeId) -> bool {
105 self.frozen_nodes.contains_key(&node_id)
106 }
107
108 pub fn warp(&mut self, node_id: NodeId, target_us: u64) {
111 self.frozen_nodes.remove(&node_id);
113 let skew = target_us as i64 - self.global_time_us as i64;
114 self.node_skews.insert(node_id, skew);
115 }
116
117 pub fn inject_leap_second(&mut self, node_id: NodeId) {
120 let skew = self.node_skews.entry(node_id).or_insert(0);
121 *skew -= 1_000_000; }
123
124 pub fn inject_negative_leap_second(&mut self, node_id: NodeId) {
127 let skew = self.node_skews.entry(node_id).or_insert(0);
128 *skew += 1_000_000; }
130
131 pub fn reset(&mut self) {
133 self.global_time_us = 0;
134 self.node_skews.clear();
135 self.frozen_nodes.clear();
136 }
137}
138
139impl Default for SimClock {
140 fn default() -> Self {
141 Self::new()
142 }
143}
144
#[cfg(test)]
mod tests {
    use super::*;

    /// Global time accumulates across millisecond and microsecond advances.
    #[test]
    fn test_basic_advance() {
        let mut sim = SimClock::new();
        assert_eq!(sim.now_us(), 0);

        sim.advance_ms(100);
        assert_eq!(sim.now_us(), 100_000);

        sim.advance_us(500);
        assert_eq!(sim.now_us(), 100_500);
    }

    /// Positive and negative skews shift a node's view; an unknown node
    /// reads the global time.
    #[test]
    fn test_node_skew() {
        let mut sim = SimClock::new();
        sim.advance_ms(1000);

        // Node 1 runs ahead of the global clock.
        sim.set_node_skew(1, 50_000);
        assert_eq!(sim.node_now_us(1), 1_050_000);

        // Node 2 runs behind it.
        sim.set_node_skew(2, -30_000);
        assert_eq!(sim.node_now_us(2), 970_000);

        // Node 3 never had a skew set.
        assert_eq!(sim.node_now_us(3), 1_000_000);
    }

    /// Drift accumulates as rate multiplied by elapsed seconds.
    #[test]
    fn test_drift() {
        let mut sim = SimClock::new();
        sim.advance_ms(1000);

        sim.drift(1, 100, 10);

        assert_eq!(sim.get_node_skew(1), 1000);
        assert_eq!(sim.node_now_us(1), 1_001_000);
    }

    /// Step jumps are cumulative and may move the node backwards.
    #[test]
    fn test_step_jump() {
        let mut sim = SimClock::new();
        sim.advance_ms(1000);

        sim.step_jump(1, 50_000);
        assert_eq!(sim.node_now_us(1), 1_050_000);

        sim.step_jump(1, -100_000);
        assert_eq!(sim.node_now_us(1), 950_000);
    }

    /// Reset clears both the global time and the stored skews.
    #[test]
    fn test_reset() {
        let mut sim = SimClock::new();
        sim.advance_ms(500);
        sim.set_node_skew(1, 100);

        sim.reset();

        assert_eq!(sim.now_us(), 0);
        assert_eq!(sim.get_node_skew(1), 0);
    }

    /// A frozen node ignores global advances and resumes from its frozen
    /// time once unfrozen.
    #[test]
    fn test_freeze_and_unfreeze() {
        let mut sim = SimClock::new();
        sim.advance_ms(100);
        sim.set_node_skew(1, 5_000);

        let held = sim.node_now_us(1);
        sim.freeze(1);
        assert!(sim.is_frozen(1));

        // The global clock moves on while the node stays put.
        sim.advance_ms(200);
        assert_eq!(sim.node_now_us(1), held);
        assert_eq!(sim.now_us(), 300_000);

        // Unfreezing must not make the node jump.
        sim.unfreeze(1);
        assert!(!sim.is_frozen(1));
        assert_eq!(sim.node_now_us(1), held);

        // From here on the node follows global advances again.
        sim.advance_ms(50);
        assert_eq!(sim.node_now_us(1), held + 50_000);
    }

    /// Warping sets a node to an absolute time, after which it keeps ticking
    /// with the global clock.
    #[test]
    fn test_warp() {
        let mut sim = SimClock::new();
        sim.advance_ms(100);

        // Far into the future.
        sim.warp(1, 999_000_000);
        assert_eq!(sim.node_now_us(1), 999_000_000);

        // Back into the past.
        sim.warp(2, 10_000);
        assert_eq!(sim.node_now_us(2), 10_000);

        sim.advance_ms(10);
        assert_eq!(sim.node_now_us(1), 999_010_000);
        assert_eq!(sim.node_now_us(2), 20_000);
    }

    /// Warping a frozen node unfreezes it.
    #[test]
    fn test_warp_unfreezes() {
        let mut sim = SimClock::new();
        sim.advance_ms(100);

        sim.freeze(1);
        assert!(sim.is_frozen(1));

        sim.warp(1, 500_000);

        assert!(!sim.is_frozen(1));
        assert_eq!(sim.node_now_us(1), 500_000);
    }

    /// A skew larger than the global time clamps the node time at zero.
    #[test]
    fn test_node_time_never_negative() {
        let mut sim = SimClock::new();
        sim.advance_ms(10);

        sim.set_node_skew(1, -1_000_000);

        assert_eq!(sim.node_now_us(1), 0);
    }

    /// A leap second moves the node back by one second.
    #[test]
    fn test_leap_second_positive() {
        let mut sim = SimClock::new();
        sim.advance_us(10_000_000);

        let before = sim.node_now_us(1);
        sim.inject_leap_second(1);
        let after = sim.node_now_us(1);

        assert_eq!(after, before - 1_000_000);
    }

    /// A negative leap second moves the node forward by one second.
    #[test]
    fn test_leap_second_negative() {
        let mut sim = SimClock::new();
        sim.advance_us(10_000_000);

        let before = sim.node_now_us(1);
        sim.inject_negative_leap_second(1);
        let after = sim.node_now_us(1);

        assert_eq!(after, before + 1_000_000);
    }
}