1use crate::{
28 error::{ExoError, Result},
29 types::Timestamp,
30};
31
32const MAX_DRIFT_MS: u64 = 5_000; const DEFAULT_DETERMINISTIC_PHYSICAL_MS: u64 = 1_000_000;
39
40pub struct HybridClock {
48 physical: u64,
50 logical: u32,
52 max_drift_ms: u64,
54 physical_source: Box<dyn Fn() -> Result<u64> + Send>,
56}
57
58impl HybridClock {
59 #[must_use]
61 pub fn new() -> Self {
62 Self {
63 physical: 0,
64 logical: 0,
65 max_drift_ms: MAX_DRIFT_MS,
66 physical_source: Box::new(|| Ok(DEFAULT_DETERMINISTIC_PHYSICAL_MS)),
67 }
68 }
69
70 #[must_use]
72 pub fn with_wall_clock(physical_source: impl Fn() -> u64 + Send + 'static) -> Self {
73 Self::with_wall_clock_and_max_drift(physical_source, MAX_DRIFT_MS)
74 }
75
76 #[must_use]
78 pub fn with_wall_clock_and_max_drift(
79 physical_source: impl Fn() -> u64 + Send + 'static,
80 max_drift_ms: u64,
81 ) -> Self {
82 Self::with_fallible_wall_clock_and_max_drift(move || Ok(physical_source()), max_drift_ms)
83 }
84
85 #[must_use]
87 pub fn with_fallible_wall_clock(
88 physical_source: impl Fn() -> Result<u64> + Send + 'static,
89 ) -> Self {
90 Self::with_fallible_wall_clock_and_max_drift(physical_source, MAX_DRIFT_MS)
91 }
92
93 #[must_use]
95 pub fn with_fallible_wall_clock_and_max_drift(
96 physical_source: impl Fn() -> Result<u64> + Send + 'static,
97 max_drift_ms: u64,
98 ) -> Self {
99 Self {
100 physical: 0,
101 logical: 0,
102 max_drift_ms,
103 physical_source: Box::new(physical_source),
104 }
105 }
106
107 #[must_use]
109 pub fn max_drift_ms(&self) -> u64 {
110 self.max_drift_ms
111 }
112
113 pub fn now(&mut self) -> Result<Timestamp> {
118 let physical_now = (self.physical_source)()?;
119 if physical_now > self.physical {
120 self.physical = physical_now;
121 self.logical = 0;
122 } else {
123 advance_logical_or_carry_physical(&mut self.physical, &mut self.logical)?;
124 }
125 Ok(Timestamp::new(self.physical, self.logical))
126 }
127
128 pub fn update(&mut self, remote: &Timestamp) -> Result<Timestamp> {
138 let physical_now = (self.physical_source)()?;
139
140 if remote.physical_ms > physical_now.saturating_add(self.max_drift_ms) {
142 return Err(ExoError::ClockDrift {
143 physical_ms: remote.physical_ms,
144 tolerance_ms: self.max_drift_ms,
145 });
146 }
147
148 if physical_now > self.physical && physical_now > remote.physical_ms {
149 self.physical = physical_now;
151 self.logical = 0;
152 } else if self.physical == remote.physical_ms {
153 self.logical = self.logical.max(remote.logical);
155 advance_logical_or_carry_physical(&mut self.physical, &mut self.logical)?;
156 } else if remote.physical_ms > self.physical {
157 self.physical = remote.physical_ms;
159 self.logical = remote.logical;
160 advance_logical_or_carry_physical(&mut self.physical, &mut self.logical)?;
161 } else {
162 advance_logical_or_carry_physical(&mut self.physical, &mut self.logical)?;
164 }
165
166 Ok(Timestamp::new(self.physical, self.logical))
167 }
168
169 #[must_use]
171 pub fn is_before(a: &Timestamp, b: &Timestamp) -> bool {
172 a < b
173 }
174
175 #[must_use]
177 pub fn current(&self) -> Timestamp {
178 Timestamp::new(self.physical, self.logical)
179 }
180}
181
182fn advance_logical_or_carry_physical(physical: &mut u64, logical: &mut u32) -> Result<()> {
183 if *logical == u32::MAX {
184 if let Some(next_physical) = physical.checked_add(1) {
185 *physical = next_physical;
186 *logical = 0;
187 Ok(())
188 } else {
189 Err(ExoError::ClockOverflow {
190 physical_ms: *physical,
191 logical: *logical,
192 })
193 }
194 } else {
195 *logical += 1;
196 Ok(())
197 }
198}
199
200impl Default for HybridClock {
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
206impl core::fmt::Debug for HybridClock {
207 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
208 f.debug_struct("HybridClock")
209 .field("physical", &self.physical)
210 .field("logical", &self.logical)
211 .field("max_drift_ms", &self.max_drift_ms)
212 .finish()
213 }
214}
215
216#[cfg(test)]
221mod tests {
222 use std::sync::{
223 Arc,
224 atomic::{AtomicU64, Ordering},
225 };
226
227 use super::*;
228
229 fn test_clock(initial: u64) -> (HybridClock, Arc<AtomicU64>) {
231 let time = Arc::new(AtomicU64::new(initial));
232 let t = Arc::clone(&time);
233 let clock = HybridClock::with_wall_clock(move || t.load(Ordering::Relaxed));
234 (clock, time)
235 }
236
237 #[test]
238 fn now_monotonic_same_wall_time() {
239 let (mut clock, _wall) = test_clock(1000);
240 let t1 = clock.now().expect("HLC timestamp");
241 let t2 = clock.now().expect("HLC timestamp");
242 let t3 = clock.now().expect("HLC timestamp");
243 assert!(t1 < t2);
244 assert!(t2 < t3);
245 assert_eq!(t1.physical_ms, 1000);
247 assert_eq!(t2.physical_ms, 1000);
248 assert_eq!(t3.physical_ms, 1000);
249 assert_eq!(t1.logical, 0);
251 assert_eq!(t2.logical, 1);
252 assert_eq!(t3.logical, 2);
253 }
254
255 #[test]
256 fn now_advances_with_wall_clock() {
257 let (mut clock, wall) = test_clock(1000);
258 let t1 = clock.now().expect("HLC timestamp");
259 wall.store(2000, Ordering::Relaxed);
260 let t2 = clock.now().expect("HLC timestamp");
261 assert!(t1 < t2);
262 assert_eq!(t2.physical_ms, 2000);
263 assert_eq!(t2.logical, 0);
264 }
265
266 #[test]
267 fn now_handles_backward_wall_clock() {
268 let (mut clock, wall) = test_clock(2000);
269 let t1 = clock.now().expect("HLC timestamp");
270 wall.store(1000, Ordering::Relaxed); let t2 = clock.now().expect("HLC timestamp");
272 assert!(t1 < t2);
273 assert_eq!(t2.physical_ms, 2000);
275 assert_eq!(t2.logical, 1);
276 }
277
278 #[test]
279 fn update_wall_ahead_of_both() {
280 let (mut clock, wall) = test_clock(1000);
281 let _ = clock.now().expect("HLC timestamp");
282 wall.store(5000, Ordering::Relaxed);
283 let remote = Timestamp::new(3000, 5);
284 let result = clock.update(&remote).expect("ok");
285 assert_eq!(result.physical_ms, 5000);
286 assert_eq!(result.logical, 0);
287 }
288
289 #[test]
290 fn update_remote_ahead() {
291 let (mut clock, _wall) = test_clock(1000);
292 let _ = clock.now().expect("HLC timestamp");
293 let remote = Timestamp::new(2000, 10);
294 let result = clock.update(&remote).expect("ok");
295 assert_eq!(result.physical_ms, 2000);
296 assert_eq!(result.logical, 11);
297 }
298
299 #[test]
300 fn update_same_physical() {
301 let (mut clock, _wall) = test_clock(1000);
302 let _ = clock.now().expect("HLC timestamp"); let remote = Timestamp::new(1000, 5);
304 let result = clock.update(&remote).expect("ok");
305 assert_eq!(result.physical_ms, 1000);
306 assert_eq!(result.logical, 6);
308 }
309
310 #[test]
311 fn update_local_ahead() {
312 let (mut clock, wall) = test_clock(3000);
313 let _ = clock.now().expect("HLC timestamp"); wall.store(1000, Ordering::Relaxed); let remote = Timestamp::new(2000, 0);
316 let result = clock.update(&remote).expect("ok");
317 assert_eq!(result.physical_ms, 3000);
319 assert_eq!(result.logical, 1);
320 }
321
322 #[test]
323 fn update_rejects_excessive_drift() {
324 let (mut clock, _wall) = test_clock(1000);
325 let remote = Timestamp::new(1000 + MAX_DRIFT_MS + 1, 0);
326 let err = clock.update(&remote).unwrap_err();
327 assert!(matches!(err, ExoError::ClockDrift { .. }));
328 }
329
330 #[test]
331 fn update_rejects_remote_more_than_default_five_seconds_ahead() {
332 let (mut clock, _wall) = test_clock(1000);
333 let remote = Timestamp::new(1000 + 5_001, 0);
334
335 let err = clock
336 .update(&remote)
337 .expect_err("default HLC drift tolerance must be no more than five seconds");
338
339 assert!(matches!(
340 err,
341 ExoError::ClockDrift {
342 physical_ms: 6001,
343 tolerance_ms: 5000
344 }
345 ));
346 }
347
348 #[test]
349 fn update_accepts_at_drift_boundary() {
350 let (mut clock, _wall) = test_clock(1000);
351 let remote = Timestamp::new(1000 + MAX_DRIFT_MS, 0);
352 let result = clock.update(&remote);
353 assert!(result.is_ok());
354 }
355
356 #[test]
357 fn update_uses_deployment_configured_drift_tolerance() {
358 let mut boundary_clock = HybridClock::with_wall_clock_and_max_drift(|| 1000, 12_000);
359 let boundary = boundary_clock
360 .update(&Timestamp::new(13_000, 0))
361 .expect("configured drift boundary should be accepted");
362 assert_eq!(boundary, Timestamp::new(13_000, 1));
363
364 let mut over_boundary_clock = HybridClock::with_wall_clock_and_max_drift(|| 1000, 12_000);
365 let err = over_boundary_clock
366 .update(&Timestamp::new(13_001, 0))
367 .expect_err("remote timestamp beyond configured drift must be rejected");
368
369 assert!(matches!(
370 err,
371 ExoError::ClockDrift {
372 physical_ms: 13001,
373 tolerance_ms: 12000
374 }
375 ));
376 }
377
378 #[test]
379 fn is_before_ordering() {
380 let a = Timestamp::new(1, 0);
381 let b = Timestamp::new(1, 1);
382 let c = Timestamp::new(2, 0);
383 assert!(HybridClock::is_before(&a, &b));
384 assert!(HybridClock::is_before(&b, &c));
385 assert!(HybridClock::is_before(&a, &c));
386 assert!(!HybridClock::is_before(&b, &a));
387 assert!(!HybridClock::is_before(&a, &a));
388 }
389
390 #[test]
391 fn current_does_not_advance() {
392 let (mut clock, _wall) = test_clock(1000);
393 let _ = clock.now().expect("HLC timestamp");
394 let c1 = clock.current();
395 let c2 = clock.current();
396 assert_eq!(c1, c2);
397 }
398
399 #[test]
400 fn debug_format() {
401 let (clock, _wall) = test_clock(42);
402 let dbg = format!("{clock:?}");
403 assert!(dbg.contains("HybridClock"));
404 }
405
406 #[test]
407 fn default_clock() {
408 let mut clock = HybridClock::default();
409 let t = clock.now().expect("HLC timestamp");
410 assert_eq!(t, Timestamp::new(DEFAULT_DETERMINISTIC_PHYSICAL_MS, 0));
411 }
412
413 #[test]
414 fn default_clock_advances_logical_time_at_fixed_physical_epoch() {
415 let mut clock = HybridClock::default();
416
417 let first = clock.now().expect("first HLC timestamp");
418 let second = clock.now().expect("second HLC timestamp");
419 let third = clock.now().expect("third HLC timestamp");
420
421 assert_eq!(first, Timestamp::new(DEFAULT_DETERMINISTIC_PHYSICAL_MS, 0));
422 assert_eq!(second, Timestamp::new(DEFAULT_DETERMINISTIC_PHYSICAL_MS, 1));
423 assert_eq!(third, Timestamp::new(DEFAULT_DETERMINISTIC_PHYSICAL_MS, 2));
424 }
425
426 #[test]
427 fn production_hlc_source_does_not_read_host_wall_clock() {
428 let production = include_str!("hlc.rs")
429 .split("// ===========================================================================")
430 .next()
431 .expect("production section");
432 let system_time_now = format!("{}{}", "SystemTime::", "now()");
433 let date_now = format!("{}{}", "Date::", "now()");
434
435 assert!(
436 !production.contains(&system_time_now),
437 "production HLC must not read host SystemTime; callers must use deterministic HLC sources"
438 );
439 assert!(
440 !production.contains(&date_now),
441 "production HLC must not read browser Date.now; callers must use deterministic HLC sources"
442 );
443 assert!(
444 !production.contains("std::time"),
445 "production HLC must not import host wall-clock APIs"
446 );
447 assert!(
448 !production.contains("js_sys::Date"),
449 "production HLC must not import browser wall-clock APIs"
450 );
451 assert!(
452 !production.contains("fetch_update"),
453 "default HLC must not fabricate elapsed physical milliseconds from call count"
454 );
455 }
456
457 #[test]
458 fn concurrent_updates_maintain_monotonicity() {
459 let (mut clock, wall) = test_clock(100);
460 let _ = clock.now().expect("HLC timestamp");
461
462 let remotes = [
464 Timestamp::new(100, 3),
465 Timestamp::new(100, 1),
466 Timestamp::new(100, 7),
467 Timestamp::new(100, 2),
468 ];
469
470 let mut last = clock.current();
471 for r in &remotes {
472 let ts = clock.update(r).expect("ok");
473 assert!(ts > last, "monotonicity violated: {ts:?} <= {last:?}");
474 last = ts;
475 }
476
477 wall.store(200, Ordering::Relaxed);
479 let ts = clock.now().expect("HLC timestamp");
480 assert!(ts > last);
481 assert_eq!(ts.physical_ms, 200);
482 assert_eq!(ts.logical, 0);
483 }
484
485 #[test]
486 fn now_remains_monotonic_when_logical_counter_is_exhausted() {
487 let (mut clock, _wall) = test_clock(1000);
488 clock.physical = 1000;
489 clock.logical = u32::MAX;
490
491 let ts = clock.now().expect("HLC timestamp");
492
493 assert!(ts > Timestamp::new(1000, u32::MAX));
494 assert_eq!(ts.physical_ms, 1001);
495 assert_eq!(ts.logical, 0);
496 }
497
498 #[test]
499 fn update_remains_monotonic_when_logical_counter_is_exhausted() {
500 let (mut clock, _wall) = test_clock(1000);
501 clock.physical = 1000;
502 clock.logical = u32::MAX;
503
504 let ts = clock.update(&Timestamp::new(1000, u32::MAX)).expect("ok");
505
506 assert!(ts > Timestamp::new(1000, u32::MAX));
507 assert_eq!(ts.physical_ms, 1001);
508 assert_eq!(ts.logical, 0);
509 }
510
511 #[test]
512 fn now_rejects_terminal_clock_exhaustion_without_reusing_timestamp() {
513 let (mut clock, _wall) = test_clock(u64::MAX);
514 clock.physical = u64::MAX;
515 clock.logical = u32::MAX;
516
517 let err = clock
518 .now()
519 .expect_err("terminal HLC state must fail closed");
520
521 assert!(matches!(
522 err,
523 ExoError::ClockOverflow {
524 physical_ms: u64::MAX,
525 logical: u32::MAX
526 }
527 ));
528 assert_eq!(clock.current(), Timestamp::new(u64::MAX, u32::MAX));
529 }
530
531 #[test]
532 fn update_rejects_terminal_clock_exhaustion_without_reusing_timestamp() {
533 let (mut clock, _wall) = test_clock(u64::MAX);
534 clock.physical = u64::MAX;
535 clock.logical = u32::MAX;
536
537 let err = clock
538 .update(&Timestamp::new(u64::MAX, u32::MAX))
539 .expect_err("terminal HLC update must fail closed");
540
541 assert!(matches!(
542 err,
543 ExoError::ClockOverflow {
544 physical_ms: u64::MAX,
545 logical: u32::MAX
546 }
547 ));
548 assert_eq!(clock.current(), Timestamp::new(u64::MAX, u32::MAX));
549 }
550
551 #[test]
552 fn now_propagates_wall_clock_error_without_mutating_state() {
553 let mut clock = HybridClock::with_fallible_wall_clock(|| {
554 Err(ExoError::ClockUnavailable {
555 reason: "injected wall-clock failure".into(),
556 })
557 });
558
559 let err = clock
560 .now()
561 .expect_err("wall-clock failures must fail closed");
562
563 assert!(matches!(err, ExoError::ClockUnavailable { .. }));
564 assert_eq!(clock.current(), Timestamp::new(0, 0));
565 }
566
567 #[test]
568 fn update_propagates_wall_clock_error_without_mutating_state() {
569 let calls = Arc::new(AtomicU64::new(0));
570 let calls_for_clock = Arc::clone(&calls);
571 let mut clock = HybridClock::with_fallible_wall_clock(move || {
572 if calls_for_clock.fetch_add(1, Ordering::Relaxed) == 0 {
573 Ok(1000)
574 } else {
575 Err(ExoError::ClockUnavailable {
576 reason: "injected wall-clock failure".into(),
577 })
578 }
579 });
580 let first = clock.now().expect("first timestamp");
581
582 let err = clock
583 .update(&Timestamp::new(1000, 0))
584 .expect_err("wall-clock failures must fail closed");
585
586 assert!(matches!(err, ExoError::ClockUnavailable { .. }));
587 assert_eq!(clock.current(), first);
588 }
589
590 #[test]
591 fn default_source_has_no_epoch_zero_fallback() {
592 let production = include_str!("hlc.rs")
593 .split("// ===========================================================================")
594 .next()
595 .expect("production section");
596
597 assert!(
598 !production.contains(".unwrap_or(0)"),
599 "HLC wall-clock failures must propagate instead of silently using epoch zero"
600 );
601 }
602}