Skip to main content

exo_core/
hlc.rs

1// Copyright 2026 Exochain Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at:
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15// SPDX-License-Identifier: Apache-2.0
16
17//! Hybrid Logical Clock (HLC) for causal ordering.
18//!
19//! The HLC combines a caller-supplied physical component with a logical
20//! counter so that:
21//!
22//! 1. Timestamps are **monotonically increasing** even when the physical source
23//!    is stale or drifts backward.
24//! 2. Causally-related events are always ordered correctly.
25//! 3. No floating-point arithmetic is involved.
26
27use crate::{
28    error::{ExoError, Result},
29    types::Timestamp,
30};
31
32/// Default maximum tolerable forward drift in milliseconds.
33/// If a remote timestamp is more than this far ahead of our local physical
34/// source, we reject it as drift.
35const MAX_DRIFT_MS: u64 = 5_000; // 5 seconds
36/// Deterministic non-zero epoch for default clocks. This intentionally keeps
37/// zero-epoch records stale without reading host wall-clock time.
38const DEFAULT_DETERMINISTIC_PHYSICAL_MS: u64 = 1_000_000;
39
40/// A Hybrid Logical Clock instance.
41///
42/// Each node in the EXOCHAIN network maintains its own `HybridClock`.
43/// The clock is driven by a deterministic physical source and a logical
44/// counter. The default constructor does not read host time; runtime adapters
45/// that need deployment-specific physical metadata must inject an explicit HLC
46/// source.
47pub struct HybridClock {
48    /// Last-known HLC physical milliseconds.
49    physical: u64,
50    /// Logical counter within the same physical millisecond.
51    logical: u32,
52    /// Maximum tolerated remote forward drift for this clock.
53    max_drift_ms: u64,
54    /// Physical source — returns current HLC physical milliseconds.
55    physical_source: Box<dyn Fn() -> Result<u64> + Send>,
56}
57
58impl HybridClock {
59    /// Create a new clock driven by EXOCHAIN's deterministic default source.
60    #[must_use]
61    pub fn new() -> Self {
62        Self {
63            physical: 0,
64            logical: 0,
65            max_drift_ms: MAX_DRIFT_MS,
66            physical_source: Box::new(|| Ok(DEFAULT_DETERMINISTIC_PHYSICAL_MS)),
67        }
68    }
69
70    /// Create a clock with a custom physical source.
71    #[must_use]
72    pub fn with_wall_clock(physical_source: impl Fn() -> u64 + Send + 'static) -> Self {
73        Self::with_wall_clock_and_max_drift(physical_source, MAX_DRIFT_MS)
74    }
75
76    /// Create a clock with a custom physical source and drift tolerance.
77    #[must_use]
78    pub fn with_wall_clock_and_max_drift(
79        physical_source: impl Fn() -> u64 + Send + 'static,
80        max_drift_ms: u64,
81    ) -> Self {
82        Self::with_fallible_wall_clock_and_max_drift(move || Ok(physical_source()), max_drift_ms)
83    }
84
85    /// Create a clock with a fallible physical source.
86    #[must_use]
87    pub fn with_fallible_wall_clock(
88        physical_source: impl Fn() -> Result<u64> + Send + 'static,
89    ) -> Self {
90        Self::with_fallible_wall_clock_and_max_drift(physical_source, MAX_DRIFT_MS)
91    }
92
93    /// Create a clock with a fallible physical source and drift tolerance.
94    #[must_use]
95    pub fn with_fallible_wall_clock_and_max_drift(
96        physical_source: impl Fn() -> Result<u64> + Send + 'static,
97        max_drift_ms: u64,
98    ) -> Self {
99        Self {
100            physical: 0,
101            logical: 0,
102            max_drift_ms,
103            physical_source: Box::new(physical_source),
104        }
105    }
106
107    /// Return this clock's configured maximum forward drift in milliseconds.
108    #[must_use]
109    pub fn max_drift_ms(&self) -> u64 {
110        self.max_drift_ms
111    }
112
113    /// Generate the next timestamp.
114    ///
115    /// Guarantees: the returned timestamp is strictly greater than any
116    /// previously returned by this clock.
117    pub fn now(&mut self) -> Result<Timestamp> {
118        let physical_now = (self.physical_source)()?;
119        if physical_now > self.physical {
120            self.physical = physical_now;
121            self.logical = 0;
122        } else {
123            advance_logical_or_carry_physical(&mut self.physical, &mut self.logical)?;
124        }
125        Ok(Timestamp::new(self.physical, self.logical))
126    }
127
128    /// Merge a remote timestamp and advance the local clock.
129    ///
130    /// The returned timestamp is guaranteed to be greater than both the
131    /// local state and the remote timestamp.
132    ///
133    /// # Errors
134    ///
135    /// Returns `ExoError::ClockDrift` if the remote timestamp is
136    /// unreasonably far ahead of the local physical source.
137    pub fn update(&mut self, remote: &Timestamp) -> Result<Timestamp> {
138        let physical_now = (self.physical_source)()?;
139
140        // Drift guard
141        if remote.physical_ms > physical_now.saturating_add(self.max_drift_ms) {
142            return Err(ExoError::ClockDrift {
143                physical_ms: remote.physical_ms,
144                tolerance_ms: self.max_drift_ms,
145            });
146        }
147
148        if physical_now > self.physical && physical_now > remote.physical_ms {
149            // Local physical source is ahead of both — reset logical
150            self.physical = physical_now;
151            self.logical = 0;
152        } else if self.physical == remote.physical_ms {
153            // Same physical — advance logical past both
154            self.logical = self.logical.max(remote.logical);
155            advance_logical_or_carry_physical(&mut self.physical, &mut self.logical)?;
156        } else if remote.physical_ms > self.physical {
157            // Remote is ahead — adopt remote physical, advance logical
158            self.physical = remote.physical_ms;
159            self.logical = remote.logical;
160            advance_logical_or_carry_physical(&mut self.physical, &mut self.logical)?;
161        } else {
162            // Local is ahead — advance own logical
163            advance_logical_or_carry_physical(&mut self.physical, &mut self.logical)?;
164        }
165
166        Ok(Timestamp::new(self.physical, self.logical))
167    }
168
169    /// Causal ordering check: returns `true` if `a` happened-before `b`.
170    #[must_use]
171    pub fn is_before(a: &Timestamp, b: &Timestamp) -> bool {
172        a < b
173    }
174
175    /// Return the current state as a `Timestamp` without advancing.
176    #[must_use]
177    pub fn current(&self) -> Timestamp {
178        Timestamp::new(self.physical, self.logical)
179    }
180}
181
182fn advance_logical_or_carry_physical(physical: &mut u64, logical: &mut u32) -> Result<()> {
183    if *logical == u32::MAX {
184        if let Some(next_physical) = physical.checked_add(1) {
185            *physical = next_physical;
186            *logical = 0;
187            Ok(())
188        } else {
189            Err(ExoError::ClockOverflow {
190                physical_ms: *physical,
191                logical: *logical,
192            })
193        }
194    } else {
195        *logical += 1;
196        Ok(())
197    }
198}
199
200impl Default for HybridClock {
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
206impl core::fmt::Debug for HybridClock {
207    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
208        f.debug_struct("HybridClock")
209            .field("physical", &self.physical)
210            .field("logical", &self.logical)
211            .field("max_drift_ms", &self.max_drift_ms)
212            .finish()
213    }
214}
215
216// ===========================================================================
217// Tests
218// ===========================================================================
219
220#[cfg(test)]
221mod tests {
222    use std::sync::{
223        Arc,
224        atomic::{AtomicU64, Ordering},
225    };
226
227    use super::*;
228
229    /// Helper: create a clock with a controllable wall time.
230    fn test_clock(initial: u64) -> (HybridClock, Arc<AtomicU64>) {
231        let time = Arc::new(AtomicU64::new(initial));
232        let t = Arc::clone(&time);
233        let clock = HybridClock::with_wall_clock(move || t.load(Ordering::Relaxed));
234        (clock, time)
235    }
236
237    #[test]
238    fn now_monotonic_same_wall_time() {
239        let (mut clock, _wall) = test_clock(1000);
240        let t1 = clock.now().expect("HLC timestamp");
241        let t2 = clock.now().expect("HLC timestamp");
242        let t3 = clock.now().expect("HLC timestamp");
243        assert!(t1 < t2);
244        assert!(t2 < t3);
245        // All share the same physical
246        assert_eq!(t1.physical_ms, 1000);
247        assert_eq!(t2.physical_ms, 1000);
248        assert_eq!(t3.physical_ms, 1000);
249        // Logical increments
250        assert_eq!(t1.logical, 0);
251        assert_eq!(t2.logical, 1);
252        assert_eq!(t3.logical, 2);
253    }
254
255    #[test]
256    fn now_advances_with_wall_clock() {
257        let (mut clock, wall) = test_clock(1000);
258        let t1 = clock.now().expect("HLC timestamp");
259        wall.store(2000, Ordering::Relaxed);
260        let t2 = clock.now().expect("HLC timestamp");
261        assert!(t1 < t2);
262        assert_eq!(t2.physical_ms, 2000);
263        assert_eq!(t2.logical, 0);
264    }
265
266    #[test]
267    fn now_handles_backward_wall_clock() {
268        let (mut clock, wall) = test_clock(2000);
269        let t1 = clock.now().expect("HLC timestamp");
270        wall.store(1000, Ordering::Relaxed); // wall goes backward
271        let t2 = clock.now().expect("HLC timestamp");
272        assert!(t1 < t2);
273        // Physical stays at 2000, logical increments
274        assert_eq!(t2.physical_ms, 2000);
275        assert_eq!(t2.logical, 1);
276    }
277
278    #[test]
279    fn update_wall_ahead_of_both() {
280        let (mut clock, wall) = test_clock(1000);
281        let _ = clock.now().expect("HLC timestamp");
282        wall.store(5000, Ordering::Relaxed);
283        let remote = Timestamp::new(3000, 5);
284        let result = clock.update(&remote).expect("ok");
285        assert_eq!(result.physical_ms, 5000);
286        assert_eq!(result.logical, 0);
287    }
288
289    #[test]
290    fn update_remote_ahead() {
291        let (mut clock, _wall) = test_clock(1000);
292        let _ = clock.now().expect("HLC timestamp");
293        let remote = Timestamp::new(2000, 10);
294        let result = clock.update(&remote).expect("ok");
295        assert_eq!(result.physical_ms, 2000);
296        assert_eq!(result.logical, 11);
297    }
298
299    #[test]
300    fn update_same_physical() {
301        let (mut clock, _wall) = test_clock(1000);
302        let _ = clock.now().expect("HLC timestamp"); // physical=1000, logical=0
303        let remote = Timestamp::new(1000, 5);
304        let result = clock.update(&remote).expect("ok");
305        assert_eq!(result.physical_ms, 1000);
306        // max(local_logical=0, remote_logical=5) + 1 = 6
307        assert_eq!(result.logical, 6);
308    }
309
310    #[test]
311    fn update_local_ahead() {
312        let (mut clock, wall) = test_clock(3000);
313        let _ = clock.now().expect("HLC timestamp"); // physical=3000, logical=0
314        wall.store(1000, Ordering::Relaxed); // wall backward
315        let remote = Timestamp::new(2000, 0);
316        let result = clock.update(&remote).expect("ok");
317        // Local physical (3000) > remote (2000), local advances logical
318        assert_eq!(result.physical_ms, 3000);
319        assert_eq!(result.logical, 1);
320    }
321
322    #[test]
323    fn update_rejects_excessive_drift() {
324        let (mut clock, _wall) = test_clock(1000);
325        let remote = Timestamp::new(1000 + MAX_DRIFT_MS + 1, 0);
326        let err = clock.update(&remote).unwrap_err();
327        assert!(matches!(err, ExoError::ClockDrift { .. }));
328    }
329
330    #[test]
331    fn update_rejects_remote_more_than_default_five_seconds_ahead() {
332        let (mut clock, _wall) = test_clock(1000);
333        let remote = Timestamp::new(1000 + 5_001, 0);
334
335        let err = clock
336            .update(&remote)
337            .expect_err("default HLC drift tolerance must be no more than five seconds");
338
339        assert!(matches!(
340            err,
341            ExoError::ClockDrift {
342                physical_ms: 6001,
343                tolerance_ms: 5000
344            }
345        ));
346    }
347
348    #[test]
349    fn update_accepts_at_drift_boundary() {
350        let (mut clock, _wall) = test_clock(1000);
351        let remote = Timestamp::new(1000 + MAX_DRIFT_MS, 0);
352        let result = clock.update(&remote);
353        assert!(result.is_ok());
354    }
355
356    #[test]
357    fn update_uses_deployment_configured_drift_tolerance() {
358        let mut boundary_clock = HybridClock::with_wall_clock_and_max_drift(|| 1000, 12_000);
359        let boundary = boundary_clock
360            .update(&Timestamp::new(13_000, 0))
361            .expect("configured drift boundary should be accepted");
362        assert_eq!(boundary, Timestamp::new(13_000, 1));
363
364        let mut over_boundary_clock = HybridClock::with_wall_clock_and_max_drift(|| 1000, 12_000);
365        let err = over_boundary_clock
366            .update(&Timestamp::new(13_001, 0))
367            .expect_err("remote timestamp beyond configured drift must be rejected");
368
369        assert!(matches!(
370            err,
371            ExoError::ClockDrift {
372                physical_ms: 13001,
373                tolerance_ms: 12000
374            }
375        ));
376    }
377
378    #[test]
379    fn is_before_ordering() {
380        let a = Timestamp::new(1, 0);
381        let b = Timestamp::new(1, 1);
382        let c = Timestamp::new(2, 0);
383        assert!(HybridClock::is_before(&a, &b));
384        assert!(HybridClock::is_before(&b, &c));
385        assert!(HybridClock::is_before(&a, &c));
386        assert!(!HybridClock::is_before(&b, &a));
387        assert!(!HybridClock::is_before(&a, &a));
388    }
389
390    #[test]
391    fn current_does_not_advance() {
392        let (mut clock, _wall) = test_clock(1000);
393        let _ = clock.now().expect("HLC timestamp");
394        let c1 = clock.current();
395        let c2 = clock.current();
396        assert_eq!(c1, c2);
397    }
398
399    #[test]
400    fn debug_format() {
401        let (clock, _wall) = test_clock(42);
402        let dbg = format!("{clock:?}");
403        assert!(dbg.contains("HybridClock"));
404    }
405
406    #[test]
407    fn default_clock() {
408        let mut clock = HybridClock::default();
409        let t = clock.now().expect("HLC timestamp");
410        assert_eq!(t, Timestamp::new(DEFAULT_DETERMINISTIC_PHYSICAL_MS, 0));
411    }
412
413    #[test]
414    fn default_clock_advances_logical_time_at_fixed_physical_epoch() {
415        let mut clock = HybridClock::default();
416
417        let first = clock.now().expect("first HLC timestamp");
418        let second = clock.now().expect("second HLC timestamp");
419        let third = clock.now().expect("third HLC timestamp");
420
421        assert_eq!(first, Timestamp::new(DEFAULT_DETERMINISTIC_PHYSICAL_MS, 0));
422        assert_eq!(second, Timestamp::new(DEFAULT_DETERMINISTIC_PHYSICAL_MS, 1));
423        assert_eq!(third, Timestamp::new(DEFAULT_DETERMINISTIC_PHYSICAL_MS, 2));
424    }
425
426    #[test]
427    fn production_hlc_source_does_not_read_host_wall_clock() {
428        let production = include_str!("hlc.rs")
429            .split("// ===========================================================================")
430            .next()
431            .expect("production section");
432        let system_time_now = format!("{}{}", "SystemTime::", "now()");
433        let date_now = format!("{}{}", "Date::", "now()");
434
435        assert!(
436            !production.contains(&system_time_now),
437            "production HLC must not read host SystemTime; callers must use deterministic HLC sources"
438        );
439        assert!(
440            !production.contains(&date_now),
441            "production HLC must not read browser Date.now; callers must use deterministic HLC sources"
442        );
443        assert!(
444            !production.contains("std::time"),
445            "production HLC must not import host wall-clock APIs"
446        );
447        assert!(
448            !production.contains("js_sys::Date"),
449            "production HLC must not import browser wall-clock APIs"
450        );
451        assert!(
452            !production.contains("fetch_update"),
453            "default HLC must not fabricate elapsed physical milliseconds from call count"
454        );
455    }
456
457    #[test]
458    fn concurrent_updates_maintain_monotonicity() {
459        let (mut clock, wall) = test_clock(100);
460        let _ = clock.now().expect("HLC timestamp");
461
462        // Simulate multiple rapid remote updates
463        let remotes = [
464            Timestamp::new(100, 3),
465            Timestamp::new(100, 1),
466            Timestamp::new(100, 7),
467            Timestamp::new(100, 2),
468        ];
469
470        let mut last = clock.current();
471        for r in &remotes {
472            let ts = clock.update(r).expect("ok");
473            assert!(ts > last, "monotonicity violated: {ts:?} <= {last:?}");
474            last = ts;
475        }
476
477        // Then advance wall clock
478        wall.store(200, Ordering::Relaxed);
479        let ts = clock.now().expect("HLC timestamp");
480        assert!(ts > last);
481        assert_eq!(ts.physical_ms, 200);
482        assert_eq!(ts.logical, 0);
483    }
484
485    #[test]
486    fn now_remains_monotonic_when_logical_counter_is_exhausted() {
487        let (mut clock, _wall) = test_clock(1000);
488        clock.physical = 1000;
489        clock.logical = u32::MAX;
490
491        let ts = clock.now().expect("HLC timestamp");
492
493        assert!(ts > Timestamp::new(1000, u32::MAX));
494        assert_eq!(ts.physical_ms, 1001);
495        assert_eq!(ts.logical, 0);
496    }
497
498    #[test]
499    fn update_remains_monotonic_when_logical_counter_is_exhausted() {
500        let (mut clock, _wall) = test_clock(1000);
501        clock.physical = 1000;
502        clock.logical = u32::MAX;
503
504        let ts = clock.update(&Timestamp::new(1000, u32::MAX)).expect("ok");
505
506        assert!(ts > Timestamp::new(1000, u32::MAX));
507        assert_eq!(ts.physical_ms, 1001);
508        assert_eq!(ts.logical, 0);
509    }
510
511    #[test]
512    fn now_rejects_terminal_clock_exhaustion_without_reusing_timestamp() {
513        let (mut clock, _wall) = test_clock(u64::MAX);
514        clock.physical = u64::MAX;
515        clock.logical = u32::MAX;
516
517        let err = clock
518            .now()
519            .expect_err("terminal HLC state must fail closed");
520
521        assert!(matches!(
522            err,
523            ExoError::ClockOverflow {
524                physical_ms: u64::MAX,
525                logical: u32::MAX
526            }
527        ));
528        assert_eq!(clock.current(), Timestamp::new(u64::MAX, u32::MAX));
529    }
530
531    #[test]
532    fn update_rejects_terminal_clock_exhaustion_without_reusing_timestamp() {
533        let (mut clock, _wall) = test_clock(u64::MAX);
534        clock.physical = u64::MAX;
535        clock.logical = u32::MAX;
536
537        let err = clock
538            .update(&Timestamp::new(u64::MAX, u32::MAX))
539            .expect_err("terminal HLC update must fail closed");
540
541        assert!(matches!(
542            err,
543            ExoError::ClockOverflow {
544                physical_ms: u64::MAX,
545                logical: u32::MAX
546            }
547        ));
548        assert_eq!(clock.current(), Timestamp::new(u64::MAX, u32::MAX));
549    }
550
551    #[test]
552    fn now_propagates_wall_clock_error_without_mutating_state() {
553        let mut clock = HybridClock::with_fallible_wall_clock(|| {
554            Err(ExoError::ClockUnavailable {
555                reason: "injected wall-clock failure".into(),
556            })
557        });
558
559        let err = clock
560            .now()
561            .expect_err("wall-clock failures must fail closed");
562
563        assert!(matches!(err, ExoError::ClockUnavailable { .. }));
564        assert_eq!(clock.current(), Timestamp::new(0, 0));
565    }
566
567    #[test]
568    fn update_propagates_wall_clock_error_without_mutating_state() {
569        let calls = Arc::new(AtomicU64::new(0));
570        let calls_for_clock = Arc::clone(&calls);
571        let mut clock = HybridClock::with_fallible_wall_clock(move || {
572            if calls_for_clock.fetch_add(1, Ordering::Relaxed) == 0 {
573                Ok(1000)
574            } else {
575                Err(ExoError::ClockUnavailable {
576                    reason: "injected wall-clock failure".into(),
577                })
578            }
579        });
580        let first = clock.now().expect("first timestamp");
581
582        let err = clock
583            .update(&Timestamp::new(1000, 0))
584            .expect_err("wall-clock failures must fail closed");
585
586        assert!(matches!(err, ExoError::ClockUnavailable { .. }));
587        assert_eq!(clock.current(), first);
588    }
589
590    #[test]
591    fn default_source_has_no_epoch_zero_fallback() {
592        let production = include_str!("hlc.rs")
593            .split("// ===========================================================================")
594            .next()
595            .expect("production section");
596
597        assert!(
598            !production.contains(".unwrap_or(0)"),
599            "HLC wall-clock failures must propagate instead of silently using epoch zero"
600        );
601    }
602}