// ubiquisync_core/hlc/service.rs

//! Shared, persistent hybrid logical clock service.
//!
//! [`HlcService`] is the clock handle subsystems actually hold: it wraps the
//! pure [`Hlc`](super::Hlc) in a lock so every log domain draws from one
//! causal clock, and persists the clock state through [`HlcStorage`] so
//! monotonicity survives restarts — a peer must never reissue a timestamp it
//! already wrote, even after a crash.
//!
//! It is not `Clone`; construct it once per database, wrap it in an `Arc`,
//! and hand a clone of that `Arc` to each subsystem.

use std::sync::Mutex;

use super::{Hlc, SkewError, Timestamp, wall_ms};

/// Persistence for the clock: one register holding the packed-`u64` state.
///
/// Storage backend crates implement this (for example as a single row in a
/// metadata table). The contract is deliberately a plain register: `load`
/// yields the value most recently handed to `save`, or `None` when `save` has
/// never been called. The register must be at least as durable as the data
/// it timestamps — if a crash cannot lose a log entry, it must not lose the
/// clock state that covered that entry either.
pub trait HlcStorage {
    /// Backend error type. The service returns it directly from `open`/`now`
    /// and wraps it in `HlcError::Storage` from `observe`.
    type Error;

    /// The transaction or batch that `save` writes into (e.g. a `DbBatch`).
    /// Left `?Sized` so a backend may use a trait object such as `dyn DbBatch`.
    type Sink: ?Sized;

    /// Read the last persisted clock state; `None` for a store never written.
    fn load(&self) -> Result<Option<u64>, Self::Error>;

    /// Enqueue a write of the clock state `raw` into `sink`. Nothing becomes
    /// durable until the caller commits the sink.
    fn save(&self, sink: &mut Self::Sink, raw: u64) -> Result<(), Self::Error>;
}

38/// Error from a clock operation: either the storage backend failed, or a
39/// remote timestamp was rejected by the skew bound.
40#[derive(Debug)]
41pub enum HlcError<E> {
42    /// The remote timestamp is too far ahead of the local wall clock — the
43    /// entry carrying it must be rejected. See [`SkewError`](super::SkewError).
44    Skew(SkewError),
45    /// The storage backend failed to load or save clock state.
46    Storage(E),
47}
48
49impl<E: std::fmt::Display> std::fmt::Display for HlcError<E> {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        match self {
52            HlcError::Skew(e) => write!(f, "{e}"),
53            HlcError::Storage(e) => write!(f, "hlc storage: {e}"),
54        }
55    }
56}
57
58impl<E: std::error::Error + 'static> std::error::Error for HlcError<E> {
59    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
60        match self {
61            HlcError::Skew(e) => Some(e),
62            HlcError::Storage(e) => Some(e),
63        }
64    }
65}
66
67/// Shared HLC service: one lock-protected clock plus its persistence.
68/// Serializes `now()`/`observe()` across subsystems so they share a single
69/// causal clock domain in memory and on disk.
70pub struct HlcService<S: HlcStorage> {
71    state: Mutex<Hlc>,
72    storage: S,
73}
74
75impl<S: HlcStorage> HlcService<S> {
76    /// Seed the in-memory clock from the persisted state (0 if none) and
77    /// return the service. The persisted state is the last-observed clock
78    /// position, so causal monotonicity survives crashes.
79    pub fn open(storage: S) -> Result<Self, S::Error> {
80        let seed = storage.load()?.unwrap_or(0);
81        Ok(Self {
82            state: Mutex::new(Hlc::new(seed)),
83            storage,
84        })
85    }
86
87    /// Lock the clock, recovering the guard if a previous holder panicked.
88    /// Recovery is safe because the protected [`Hlc`] state can never be left
89    /// logically corrupt by a panic: it is only ever *replaced* with a fully
90    /// validated, strictly larger [`Timestamp`], so whether the panic came
91    /// from a `storage` call or from `tick`'s wall-ceiling assert, the
92    /// recovered state is an intact, monotonic prior value. A recovered clock
93    /// can only tick forward, so it can't hand back an out-of-order or
94    /// duplicate timestamp, and the underlying failure resurfaces on the next
95    /// operation. This beats `unwrap()`, which would let one subsystem's
96    /// panic crash every subsystem sharing the clock.
97    fn lock(&self) -> std::sync::MutexGuard<'_, Hlc> {
98        self.state.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
99    }
100
101    /// Generate a fresh timestamp for a local write and enqueue the new state
102    /// into `sink`. Always advances; always writes — the state must reach the
103    /// committing batch, or a crash could reissue it.
104    pub fn now(&self, sink: &mut S::Sink) -> Result<Timestamp, S::Error> {
105        let mut hlc = self.lock();
106        let ts = hlc.tick(wall_ms());
107        self.storage.save(sink, hlc.state().raw())?;
108        Ok(ts)
109    }
110
111    /// Absorb a remote timestamp, enforcing the skew bound (see
112    /// [`Hlc::observe`]). Saves into `sink` only when the in-memory state
113    /// actually advances — observe is a no-op for stale receipts and we don't
114    /// want to spam the register.
115    ///
116    /// `local_wall_ms` is supplied by the caller — typically a single
117    /// [`wall_ms`](super::wall_ms) reading taken once when a received
118    /// batch starts replaying — rather than read here per call. This judges a
119    /// whole batch against one reference instant, so entries don't drift in
120    /// and out of the skew window depending on where they fall in the loop,
121    /// and it keeps the skew tests deterministic. The wall clock only moves
122    /// forward across a batch, so a shared start-of-batch reading is at worst
123    /// conservative (it may reject a borderline entry that a fresher reading
124    /// would admit) — never unsound, since it can't widen the window.
125    pub fn observe(
126        &self,
127        received: Timestamp,
128        local_wall_ms: u64,
129        sink: &mut S::Sink,
130    ) -> Result<(), HlcError<S::Error>> {
131        let mut hlc = self.lock();
132        let before = hlc.state();
133        hlc.observe(received, local_wall_ms)
134            .map_err(HlcError::Skew)?;
135        let after = hlc.state();
136        if after != before {
137            self.storage.save(sink, after.raw()).map_err(HlcError::Storage)?;
138        }
139        Ok(())
140    }
141
142    /// Current clock state without advancing it. Cheap snapshot for callers
143    /// that only need to peek (e.g. tests, diagnostics).
144    pub fn state(&self) -> Timestamp {
145        self.lock().state()
146    }
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use super::super::{MAX_SKEW_MS, wall_ms};
    use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};

    /// In-memory register standing in for a backend metadata row.
    #[derive(Default)]
    struct MemStorage {
        /// Last saved clock state; meaningful only when `present` is set.
        value: AtomicU64,
        /// Count of `save` calls, so tests can assert persistence cadence.
        saves: AtomicUsize,
        /// Whether anything has been saved yet — drives `load` returning
        /// `None` for a fresh store rather than relying on a value sentinel.
        present: AtomicBool,
    }

    impl HlcStorage for &MemStorage {
        type Error = std::convert::Infallible;
        type Sink = ();

        fn load(&self) -> Result<Option<u64>, Self::Error> {
            Ok(self
                .present
                .load(Ordering::SeqCst)
                .then(|| self.value.load(Ordering::SeqCst)))
        }

        fn save(&self, _sink: &mut (), raw: u64) -> Result<(), Self::Error> {
            self.value.store(raw, Ordering::SeqCst);
            self.present.store(true, Ordering::SeqCst);
            self.saves.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    #[test]
    fn now_persists_every_tick() {
        // Goal: a timestamp is never handed out without its covering state
        // being saved — a crash must not reissue one.
        // Given: a fresh service over empty storage.
        let mem = MemStorage::default();
        let svc = HlcService::open(&mem).unwrap();
        // When: ticking twice. Then: storage holds the latest tick and was
        // written once per tick.
        let t1 = svc.now(&mut ()).unwrap();
        let t2 = svc.now(&mut ()).unwrap();
        assert!(t2 > t1);
        assert_eq!(mem.value.load(Ordering::SeqCst), t2.raw());
        assert_eq!(mem.saves.load(Ordering::SeqCst), 2);
    }

    #[test]
    fn reopen_resumes_past_persisted_state() {
        // Goal: monotonicity survives a restart.
        // Given: a service that ticked, then was dropped.
        let mem = MemStorage::default();
        let last = {
            let svc = HlcService::open(&mem).unwrap();
            svc.now(&mut ()).unwrap()
        };
        // When: reopening over the same storage and ticking.
        let svc2 = HlcService::open(&mem).unwrap();
        let t = svc2.now(&mut ()).unwrap();
        // Then: the new tick beats everything from the previous life.
        assert!(t > last);
    }

    #[test]
    fn reopen_resumes_past_observed_state() {
        // Goal: a remote timestamp absorbed via `observe` is covered across
        // a restart too, not only locally issued ticks.
        // Given: a service that observed a timestamp ahead of the local wall
        // clock, then was dropped.
        let mem = MemStorage::default();
        let local = wall_ms();
        let ahead = Timestamp::from_parts(local + MAX_SKEW_MS, 3);
        {
            let svc = HlcService::open(&mem).unwrap();
            svc.observe(ahead, local, &mut ()).unwrap();
        }
        // When: reopening over the same storage and ticking.
        let svc2 = HlcService::open(&mem).unwrap();
        let t = svc2.now(&mut ()).unwrap();
        // Then: the fresh tick is ordered after the observed remote stamp,
        // even though the wall clock has not caught up to it.
        assert!(t > ahead);
    }

    #[test]
    fn observe_persists_only_on_advance() {
        // Goal: stale receipts don't spam the storage register.
        // Given: a service whose clock has advanced past some remote ts.
        let mem = MemStorage::default();
        let svc = HlcService::open(&mem).unwrap();
        let local = wall_ms();
        let ahead = Timestamp::from_parts(local + MAX_SKEW_MS, 7);
        svc.observe(ahead, local, &mut ()).unwrap();
        let saves_after_advance = mem.saves.load(Ordering::SeqCst);
        assert_eq!(saves_after_advance, 1, "advancing observe persists");
        // When: observing something older. Then: no new save.
        svc.observe(Timestamp::from_parts(local, 0), local, &mut ()).unwrap();
        assert_eq!(mem.saves.load(Ordering::SeqCst), saves_after_advance);
        assert_eq!(svc.state(), ahead);
    }

    #[test]
    fn observe_beyond_skew_errors_and_persists_nothing() {
        // Goal: the service surfaces the clock's skew rejection and leaves
        // both memory and storage untouched.
        // Given: a remote timestamp beyond the skew window.
        let mem = MemStorage::default();
        let svc = HlcService::open(&mem).unwrap();
        let local = wall_ms();
        let too_far = Timestamp::from_parts(local + MAX_SKEW_MS + 1, 0);
        // When: observing it. Then: HlcError::Skew, state still 0, no save.
        let err = svc.observe(too_far, local, &mut ()).unwrap_err();
        assert!(matches!(err, HlcError::Skew(_)));
        assert_eq!(svc.state(), Timestamp::from_raw(0));
        assert_eq!(mem.saves.load(Ordering::SeqCst), 0);
    }

    /// Storage whose `save` always fails — exercises the fallible path that
    /// the `Infallible` `MemStorage` can't reach.
    struct FailingStorage;

    impl HlcStorage for FailingStorage {
        type Error = &'static str;
        type Sink = ();

        fn load(&self) -> Result<Option<u64>, Self::Error> {
            Ok(None)
        }

        fn save(&self, _sink: &mut (), _raw: u64) -> Result<(), Self::Error> {
            Err("save failed")
        }
    }

    #[test]
    fn now_propagates_save_failure() {
        // A timestamp must never be handed back when its covering state could
        // not be persisted — the storage error propagates in its place.
        let svc = HlcService::open(FailingStorage).unwrap();
        assert!(svc.now(&mut ()).is_err());
    }

    #[test]
    fn advancing_observe_surfaces_save_failure_as_storage_error() {
        // An advancing observe that can't persist must report Storage, not
        // Skew — the timestamp was within the window; only the save failed.
        let svc = HlcService::open(FailingStorage).unwrap();
        let local = wall_ms();
        let ahead = Timestamp::from_parts(local + MAX_SKEW_MS, 1);
        assert!(matches!(
            svc.observe(ahead, local, &mut ()),
            Err(HlcError::Storage(_))
        ));
    }

    #[test]
    fn storage_error_display_names_the_clock() {
        // The Storage variant prefixes the backend message so a log line
        // shows which subsystem's persistence failed.
        let svc = HlcService::open(FailingStorage).unwrap();
        let local = wall_ms();
        let ahead = Timestamp::from_parts(local + MAX_SKEW_MS, 1);
        let err = svc.observe(ahead, local, &mut ()).unwrap_err();
        assert_eq!(err.to_string(), "hlc storage: save failed");
    }

    #[test]
    fn lock_recovers_from_a_poisoned_clock() {
        use std::panic::{catch_unwind, AssertUnwindSafe};
        // Goal: a panic that poisons the clock mutex (as a panicking
        // `storage.save` would) does not brick the clock for everyone else.
        let mem = MemStorage::default();
        let svc = HlcService::open(&mem).unwrap();
        let t1 = svc.now(&mut ()).unwrap();
        // Poison the mutex by panicking while holding the guard.
        let _ = catch_unwind(AssertUnwindSafe(|| {
            let _guard = svc.lock();
            panic!("poison the clock");
        }));
        // A bare `.lock().unwrap()` would panic here; `lock()` recovers the
        // guard, so the clock stays usable and still strictly monotone.
        let t2 = svc.now(&mut ()).unwrap();
        assert!(t2 > t1);
    }

    #[test]
    fn concurrent_ticks_are_all_distinct() {
        // Goal: the mutex actually serializes ticks — under contention every
        // issued timestamp is unique (hence strictly ordered), none reused.
        const THREADS: usize = 8;
        const TICKS_PER_THREAD: usize = 100;
        let mem = MemStorage::default();
        let svc = HlcService::open(&mem).unwrap();
        let mut all: Vec<Timestamp> = std::thread::scope(|s| {
            let handles: Vec<_> = (0..THREADS)
                .map(|_| {
                    s.spawn(|| {
                        (0..TICKS_PER_THREAD)
                            .map(|_| svc.now(&mut ()).unwrap())
                            .collect::<Vec<_>>()
                    })
                })
                .collect();
            handles
                .into_iter()
                .flat_map(|h| h.join().unwrap())
                .collect()
        });
        let total = all.len();
        assert_eq!(total, THREADS * TICKS_PER_THREAD);
        all.sort_unstable();
        all.dedup();
        assert_eq!(all.len(), total, "every concurrent tick must be unique");
    }
}