ubiquisync_core/hlc/service.rs
1//! Shared, persistent hybrid logical clock service.
2//!
3//! [`HlcService`] is the clock handle subsystems actually hold: it wraps the
4//! pure [`Hlc`](super::Hlc) in a lock so every log domain draws from one causal clock domain,
5//! and persists the clock state through [`HlcStorage`] so monotonicity survives restarts — a peer
6//! must never reissue a timestamp it already wrote, even after a crash.
7//!
8//! It is not `Clone`; construct it once per database, wrap it in an `Arc`,
9//! and hand a clone of that `Arc` to each subsystem.
10
11use std::sync::Mutex;
12
13use super::{Hlc, SkewError, Timestamp, wall_ms};
14
/// Persistence contract for the clock: one durable register holding the
/// clock state packed into a `u64`.
///
/// Storage backend crates provide the implementation (for example, a single
/// row in a metadata table). The register semantics are deliberately plain:
/// `load` yields the most recently `save`d value, or `None` when nothing has
/// been saved yet. The register must be at least as durable as the data it
/// timestamps — if log entries survive a crash, so must the saved clock
/// state that covered them.
pub trait HlcStorage {
    /// Error type of the backend; the service passes it through unchanged in
    /// its own results.
    type Error;

    /// The transaction or batch that `save` writes into (e.g. a `DbBatch`).
    /// Declared `?Sized` so that a backend may name a trait object here
    /// (`dyn DbBatch`).
    type Sink: ?Sized;

    /// Read back the last persisted clock state.
    ///
    /// Returns `Ok(None)` when the store is fresh and nothing has ever been
    /// saved.
    fn load(&self) -> Result<Option<u64>, Self::Error>;

    /// Queue a write of the clock state `raw` into `sink`.
    ///
    /// The write is only enqueued here; it becomes durable when the caller
    /// commits `sink`, and not earlier.
    fn save(&self, sink: &mut Self::Sink, raw: u64) -> Result<(), Self::Error>;
}
37
38/// Error from a clock operation: either the storage backend failed, or a
39/// remote timestamp was rejected by the skew bound.
40#[derive(Debug)]
41pub enum HlcError<E> {
42 /// The remote timestamp is too far ahead of the local wall clock — the
43 /// entry carrying it must be rejected. See [`SkewError`](super::SkewError).
44 Skew(SkewError),
45 /// The storage backend failed to load or save clock state.
46 Storage(E),
47}
48
49impl<E: std::fmt::Display> std::fmt::Display for HlcError<E> {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 match self {
52 HlcError::Skew(e) => write!(f, "{e}"),
53 HlcError::Storage(e) => write!(f, "hlc storage: {e}"),
54 }
55 }
56}
57
58impl<E: std::error::Error + 'static> std::error::Error for HlcError<E> {
59 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
60 match self {
61 HlcError::Skew(e) => Some(e),
62 HlcError::Storage(e) => Some(e),
63 }
64 }
65}
66
67/// Shared HLC service: one lock-protected clock plus its persistence.
68/// Serializes `now()`/`observe()` across subsystems so they share a single
69/// causal clock domain in memory and on disk.
70pub struct HlcService<S: HlcStorage> {
71 state: Mutex<Hlc>,
72 storage: S,
73}
74
75impl<S: HlcStorage> HlcService<S> {
76 /// Seed the in-memory clock from the persisted state (0 if none) and
77 /// return the service. The persisted state is the last-observed clock
78 /// position, so causal monotonicity survives crashes.
79 pub fn open(storage: S) -> Result<Self, S::Error> {
80 let seed = storage.load()?.unwrap_or(0);
81 Ok(Self {
82 state: Mutex::new(Hlc::new(seed)),
83 storage,
84 })
85 }
86
87 /// Lock the clock, recovering the guard if a previous holder panicked.
88 /// Recovery is safe because the protected [`Hlc`] state can never be left
89 /// logically corrupt by a panic: it is only ever *replaced* with a fully
90 /// validated, strictly larger [`Timestamp`], so whether the panic came
91 /// from a `storage` call or from `tick`'s wall-ceiling assert, the
92 /// recovered state is an intact, monotonic prior value. A recovered clock
93 /// can only tick forward, so it can't hand back an out-of-order or
94 /// duplicate timestamp, and the underlying failure resurfaces on the next
95 /// operation. This beats `unwrap()`, which would let one subsystem's
96 /// panic crash every subsystem sharing the clock.
97 fn lock(&self) -> std::sync::MutexGuard<'_, Hlc> {
98 self.state.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
99 }
100
101 /// Generate a fresh timestamp for a local write and enqueue the new state
102 /// into `sink`. Always advances; always writes — the state must reach the
103 /// committing batch, or a crash could reissue it.
104 pub fn now(&self, sink: &mut S::Sink) -> Result<Timestamp, S::Error> {
105 let mut hlc = self.lock();
106 let ts = hlc.tick(wall_ms());
107 self.storage.save(sink, hlc.state().raw())?;
108 Ok(ts)
109 }
110
111 /// Absorb a remote timestamp, enforcing the skew bound (see
112 /// [`Hlc::observe`]). Saves into `sink` only when the in-memory state
113 /// actually advances — observe is a no-op for stale receipts and we don't
114 /// want to spam the register.
115 ///
116 /// `local_wall_ms` is supplied by the caller — typically a single
117 /// [`wall_ms`](super::wall_ms) reading taken once when a received
118 /// batch starts replaying — rather than read here per call. This judges a
119 /// whole batch against one reference instant, so entries don't drift in
120 /// and out of the skew window depending on where they fall in the loop,
121 /// and it keeps the skew tests deterministic. The wall clock only moves
122 /// forward across a batch, so a shared start-of-batch reading is at worst
123 /// conservative (it may reject a borderline entry that a fresher reading
124 /// would admit) — never unsound, since it can't widen the window.
125 pub fn observe(
126 &self,
127 received: Timestamp,
128 local_wall_ms: u64,
129 sink: &mut S::Sink,
130 ) -> Result<(), HlcError<S::Error>> {
131 let mut hlc = self.lock();
132 let before = hlc.state();
133 hlc.observe(received, local_wall_ms)
134 .map_err(HlcError::Skew)?;
135 let after = hlc.state();
136 if after != before {
137 self.storage.save(sink, after.raw()).map_err(HlcError::Storage)?;
138 }
139 Ok(())
140 }
141
142 /// Current clock state without advancing it. Cheap snapshot for callers
143 /// that only need to peek (e.g. tests, diagnostics).
144 pub fn state(&self) -> Timestamp {
145 self.lock().state()
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::super::{MAX_SKEW_MS, wall_ms};
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};

    /// Test double for a backend metadata row: a single in-memory register.
    #[derive(Default)]
    struct MemStorage {
        /// Most recently saved clock state; only meaningful once `present`
        /// has been set.
        value: AtomicU64,
        /// Number of `save` calls so far, used to assert how often the
        /// service persists.
        saves: AtomicUsize,
        /// Set by the first `save`. Lets `load` report `None` for a fresh
        /// store without reserving a sentinel in `value`.
        present: AtomicBool,
    }

    impl MemStorage {
        /// How many times `save` has been called on this register.
        fn save_count(&self) -> usize {
            self.saves.load(Ordering::SeqCst)
        }
    }

    impl HlcStorage for &MemStorage {
        type Error = std::convert::Infallible;
        type Sink = ();

        fn load(&self) -> Result<Option<u64>, Self::Error> {
            if self.present.load(Ordering::SeqCst) {
                Ok(Some(self.value.load(Ordering::SeqCst)))
            } else {
                Ok(None)
            }
        }

        fn save(&self, _sink: &mut (), raw: u64) -> Result<(), Self::Error> {
            self.value.store(raw, Ordering::SeqCst);
            self.present.store(true, Ordering::SeqCst);
            self.saves.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    #[test]
    fn now_persists_every_tick() {
        // Purpose: no timestamp leaves the service unless the state covering
        // it was saved, so a crash cannot cause a reissue.
        // Setup: a brand-new service on an empty register.
        let register = MemStorage::default();
        let clock = HlcService::open(&register).unwrap();
        // Act: two ticks.
        let first = clock.now(&mut ()).unwrap();
        let second = clock.now(&mut ()).unwrap();
        // Expect: strictly increasing, the register holds the latest tick,
        // and there was exactly one save per tick.
        assert!(second > first);
        assert_eq!(register.value.load(Ordering::SeqCst), second.raw());
        assert_eq!(register.save_count(), 2);
    }

    #[test]
    fn reopen_resumes_past_persisted_state() {
        // Purpose: monotonicity carries across a restart.
        // Setup: tick once on a service that is dropped right afterwards.
        let register = MemStorage::default();
        let before_restart = HlcService::open(&register).unwrap().now(&mut ()).unwrap();
        // Act: open a second service on the same register and tick it.
        let reopened = HlcService::open(&register).unwrap();
        let after_restart = reopened.now(&mut ()).unwrap();
        // Expect: the new tick is later than anything from the first life.
        assert!(after_restart > before_restart);
    }

    #[test]
    fn observe_persists_only_on_advance() {
        // Purpose: receipts that do not move the clock leave the register
        // alone.
        // Setup: push the clock forward with a remote timestamp at the very
        // edge of the skew window.
        let register = MemStorage::default();
        let clock = HlcService::open(&register).unwrap();
        let local = wall_ms();
        let ahead = Timestamp::from_parts(local + MAX_SKEW_MS, 7);
        clock.observe(ahead, local, &mut ()).unwrap();
        let saves_after_advance = register.save_count();
        assert_eq!(saves_after_advance, 1, "advancing observe persists");
        // Act: observe a timestamp that is behind the clock.
        let stale = Timestamp::from_parts(local, 0);
        clock.observe(stale, local, &mut ()).unwrap();
        // Expect: no additional save, and the clock did not move.
        assert_eq!(register.save_count(), saves_after_advance);
        assert_eq!(clock.state(), ahead);
    }

    #[test]
    fn observe_beyond_skew_errors_and_persists_nothing() {
        // Purpose: a skew rejection is reported as such and changes neither
        // the in-memory clock nor the register.
        // Setup: a remote timestamp one millisecond past the skew window.
        let register = MemStorage::default();
        let clock = HlcService::open(&register).unwrap();
        let local = wall_ms();
        let too_far = Timestamp::from_parts(local + MAX_SKEW_MS + 1, 0);
        // Act + expect: the call fails with `Skew`, the state is still the
        // zero seed, and nothing was saved.
        let outcome = clock.observe(too_far, local, &mut ());
        assert!(matches!(outcome, Err(HlcError::Skew(_))));
        assert_eq!(clock.state(), Timestamp::from_raw(0));
        assert_eq!(register.save_count(), 0);
    }

    /// Backend whose `save` never succeeds. Covers the error path that the
    /// `Infallible` `MemStorage` cannot produce.
    struct FailingStorage;

    impl HlcStorage for FailingStorage {
        type Error = &'static str;
        type Sink = ();

        fn load(&self) -> Result<Option<u64>, Self::Error> {
            Ok(None)
        }

        fn save(&self, _sink: &mut (), _raw: u64) -> Result<(), Self::Error> {
            Err("save failed")
        }
    }

    #[test]
    fn now_propagates_save_failure() {
        // If the covering state cannot be persisted, the caller must get the
        // storage error rather than a timestamp.
        let clock = HlcService::open(FailingStorage).unwrap();
        let outcome = clock.now(&mut ());
        assert!(outcome.is_err());
    }

    #[test]
    fn advancing_observe_surfaces_save_failure_as_storage_error() {
        // The timestamp is inside the skew window, so the only thing that can
        // go wrong is the save — and that must surface as `Storage`, not
        // `Skew`.
        let clock = HlcService::open(FailingStorage).unwrap();
        let local = wall_ms();
        let ahead = Timestamp::from_parts(local + MAX_SKEW_MS, 1);
        let outcome = clock.observe(ahead, local, &mut ());
        assert!(matches!(outcome, Err(HlcError::Storage(_))));
    }

    #[test]
    fn lock_recovers_from_a_poisoned_clock() {
        use std::panic::{AssertUnwindSafe, catch_unwind};
        // Purpose: one panic under the clock lock (which is what a panicking
        // `storage.save` would amount to) must not make the clock unusable
        // for every other holder.
        let register = MemStorage::default();
        let clock = HlcService::open(&register).unwrap();
        let before_poison = clock.now(&mut ()).unwrap();
        // Panic while the guard is alive so the mutex ends up poisoned.
        let _ = catch_unwind(AssertUnwindSafe(|| {
            let _guard = clock.lock();
            panic!("poison the clock");
        }));
        // `.lock().unwrap()` would panic at this point. The service's
        // `lock()` takes the guard out of the poison error instead, so the
        // clock keeps working and stays strictly monotone.
        let after_poison = clock.now(&mut ()).unwrap();
        assert!(after_poison > before_poison);
    }

    #[test]
    fn concurrent_ticks_are_all_distinct() {
        // Purpose: ticks are really serialized by the mutex — with several
        // threads hammering the clock, no timestamp is issued twice, which
        // also means they are strictly ordered.
        const THREADS: usize = 8;
        const TICKS_PER_THREAD: usize = 100;

        let register = MemStorage::default();
        let clock = HlcService::open(&register).unwrap();
        let mut issued: Vec<Timestamp> = std::thread::scope(|scope| {
            // Start every worker before joining any of them, so the ticks
            // genuinely contend for the lock.
            let mut workers = Vec::with_capacity(THREADS);
            for _ in 0..THREADS {
                workers.push(scope.spawn(|| {
                    let mut mine = Vec::with_capacity(TICKS_PER_THREAD);
                    for _ in 0..TICKS_PER_THREAD {
                        mine.push(clock.now(&mut ()).unwrap());
                    }
                    mine
                }));
            }
            let mut merged = Vec::new();
            for worker in workers {
                merged.extend(worker.join().unwrap());
            }
            merged
        });
        let issued_count = issued.len();
        issued.sort();
        issued.dedup();
        assert_eq!(issued.len(), issued_count, "every concurrent tick must be unique");
    }
}
328}