sochdb_storage/hlc.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Hybrid Logical Clock (HLC) for Monotonic Commit Timestamps
19//!
20//! From mm.md Task 1.3: HLC-Based Transaction Ordering
21//!
22//! ## Problem
23//!
24//! Using wall-clock timestamps can violate `commit_ts >= start_ts` due to:
25//! - NTP time regression
26//! - Clock skew across threads
27//! - Ambiguous GC boundaries
28//!
29//! ## Solution
30//!
31//! Hybrid Logical Clock provides monotonic timestamps even if physical time regresses.
32//!
33//! ## Algorithm
34//!
35//! ```text
36//! HLC timestamp: ts = (physical_time << k) | logical_counter
37//!
38//! On event:
39//! physical = now_micros()
40//! if physical > last_physical:
41//! logical = 0
42//! else:
43//! logical = last_logical + 1
44//! ts = max(last_ts + 1, (physical << 16) | logical)
45//! last_ts = ts
46//!
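//! Properties:
//! - Monotonic: ts_i < ts_{i+1} always
//! - Causally consistent: if A → B then ts_A < ts_B (ts_A < ts_B alone does not imply A → B)
//! - Drift: ts can run ahead of real time (e.g. after the physical clock steps
//!   backwards, or after `receive` merges a remote timestamp); `next()` itself
//!   does not enforce a drift bound
//! - Range: with 16 logical bits only the low 48 bits of the microsecond
//!   physical time fit in a u64, so the physical component wraps roughly
//!   every 8.9 years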
51//! ```
52//!
53//! Cost: O(1) per timestamp allocation
54
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::time::{SystemTime, UNIX_EPOCH};
57
58/// Bits reserved for logical counter (16 bits = 65K events per microsecond)
59const LOGICAL_BITS: u32 = 16;
60const LOGICAL_MASK: u64 = (1 << LOGICAL_BITS) - 1;
61
62/// Maximum clock drift we tolerate (1 second in microseconds)
63const MAX_DRIFT_US: u64 = 1_000_000;
64
65/// Hybrid Logical Clock for monotonic, causally consistent timestamps
66///
67/// Thread-safe implementation using atomic operations.
68///
69/// ## Performance
70///
71/// - Allocation: O(1) amortized, single CAS operation
72/// - Memory: 16 bytes (two atomic u64s)
73/// - Contention: Low under typical workloads (physical time advances)
74#[derive(Debug)]
75pub struct HybridLogicalClock {
76 /// Last allocated timestamp (physical << 16 | logical)
77 last_ts: AtomicU64,
78 /// Last physical time seen (microseconds since epoch)
79 last_physical: AtomicU64,
80}
81
82impl Default for HybridLogicalClock {
83 fn default() -> Self {
84 Self::new()
85 }
86}
87
88impl HybridLogicalClock {
89 /// Create a new HLC initialized to current time
90 pub fn new() -> Self {
91 let physical = Self::now_physical();
92 let initial_ts = physical << LOGICAL_BITS;
93 Self {
94 last_ts: AtomicU64::new(initial_ts),
95 last_physical: AtomicU64::new(physical),
96 }
97 }
98
99 /// Create HLC with a specific starting timestamp (for recovery)
100 pub fn with_timestamp(ts: u64) -> Self {
101 let physical = ts >> LOGICAL_BITS;
102 Self {
103 last_ts: AtomicU64::new(ts),
104 last_physical: AtomicU64::new(physical),
105 }
106 }
107
108 /// Get current physical time in microseconds
109 #[inline]
110 fn now_physical() -> u64 {
111 SystemTime::now()
112 .duration_since(UNIX_EPOCH)
113 .expect("System time before UNIX epoch")
114 .as_micros() as u64
115 }
116
117 /// Allocate the next timestamp (monotonically increasing)
118 ///
119 /// This is the main API for transaction commit timestamps.
120 ///
121 /// ## Guarantees
122 ///
123 /// - Strictly monotonic: result > any previous result
124 /// - Causally consistent: happens-before relationships preserved
125 /// - Bounded drift: timestamp within MAX_DRIFT_US of real time
126 #[inline]
127 pub fn next(&self) -> u64 {
128 loop {
129 let physical = Self::now_physical();
130 let last = self.last_ts.load(Ordering::Acquire);
131 let last_physical = self.last_physical.load(Ordering::Acquire);
132
133 let new_ts = if physical > last_physical {
134 // Physical time advanced - reset logical counter
135 physical << LOGICAL_BITS
136 } else {
137 // Physical time same or regressed - increment logical
138 let logical = (last & LOGICAL_MASK) + 1;
139 if logical > LOGICAL_MASK {
140 // Logical overflow - wait for physical time to advance
141 std::thread::yield_now();
142 continue;
143 }
144 (last & !LOGICAL_MASK) | logical
145 };
146
147 // Ensure monotonicity
148 let new_ts = new_ts.max(last + 1);
149
150 // CAS to update
151 if self
152 .last_ts
153 .compare_exchange_weak(last, new_ts, Ordering::AcqRel, Ordering::Acquire)
154 .is_ok()
155 {
156 // Update last physical if we advanced
157 if physical > last_physical {
158 self.last_physical.store(physical, Ordering::Release);
159 }
160 return new_ts;
161 }
162 // CAS failed, retry
163 }
164 }
165
166 /// Receive a timestamp from another node (for distributed scenarios)
167 ///
168 /// Updates local clock to be at least as recent as the received timestamp.
169 pub fn receive(&self, remote_ts: u64) {
170 loop {
171 let last = self.last_ts.load(Ordering::Acquire);
172 if remote_ts <= last {
173 return; // Already ahead
174 }
175
176 let physical = Self::now_physical();
177 let remote_physical = remote_ts >> LOGICAL_BITS;
178
179 // Check drift
180 if remote_physical > physical + MAX_DRIFT_US {
181 // Remote clock too far ahead - could indicate attack or misconfiguration
182 // We cap at our physical time + reasonable drift
183 let capped = (physical + MAX_DRIFT_US) << LOGICAL_BITS;
184 if self
185 .last_ts
186 .compare_exchange_weak(
187 last,
188 capped.max(last + 1),
189 Ordering::AcqRel,
190 Ordering::Acquire,
191 )
192 .is_ok()
193 {
194 return;
195 }
196 } else {
197 // Accept remote timestamp
198 if self
199 .last_ts
200 .compare_exchange_weak(last, remote_ts, Ordering::AcqRel, Ordering::Acquire)
201 .is_ok()
202 {
203 self.last_physical
204 .fetch_max(remote_physical, Ordering::Release);
205 return;
206 }
207 }
208 }
209 }
210
211 /// Get the current timestamp without advancing
212 #[inline]
213 pub fn current(&self) -> u64 {
214 self.last_ts.load(Ordering::Acquire)
215 }
216
217 /// Extract physical time component from a timestamp
218 #[inline]
219 pub fn physical_time(ts: u64) -> u64 {
220 ts >> LOGICAL_BITS
221 }
222
223 /// Extract logical counter from a timestamp
224 #[inline]
225 pub fn logical_counter(ts: u64) -> u64 {
226 ts & LOGICAL_MASK
227 }
228
229 /// Compare two timestamps
230 #[inline]
231 pub fn compare(a: u64, b: u64) -> std::cmp::Ordering {
232 a.cmp(&b)
233 }
234
235 /// Check if timestamp a happened before timestamp b
236 #[inline]
237 pub fn happened_before(a: u64, b: u64) -> bool {
238 a < b
239 }
240}
241
242/// HLC timestamp with named components for debugging
243#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
244pub struct HlcTimestamp {
245 /// Raw timestamp value
246 pub raw: u64,
247}
248
249impl HlcTimestamp {
250 /// Create from raw value
251 pub const fn from_raw(raw: u64) -> Self {
252 Self { raw }
253 }
254
255 /// Create from components
256 pub const fn new(physical_us: u64, logical: u16) -> Self {
257 Self {
258 raw: (physical_us << LOGICAL_BITS) | (logical as u64),
259 }
260 }
261
262 /// Get physical time in microseconds
263 pub const fn physical_us(&self) -> u64 {
264 self.raw >> LOGICAL_BITS
265 }
266
267 /// Get logical counter
268 pub const fn logical(&self) -> u16 {
269 (self.raw & LOGICAL_MASK) as u16
270 }
271
272 /// Convert to raw u64
273 pub const fn as_u64(&self) -> u64 {
274 self.raw
275 }
276}
277
278impl std::fmt::Display for HlcTimestamp {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 write!(f, "{}:{}", self.physical_us(), self.logical())
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use super::*;
287 use std::sync::Arc;
288 use std::thread;
289
290 #[test]
291 fn test_monotonicity() {
292 let hlc = HybridLogicalClock::new();
293 let mut prev = 0u64;
294
295 for _ in 0..10000 {
296 let ts = hlc.next();
297 assert!(ts > prev, "Timestamp {} should be > {}", ts, prev);
298 prev = ts;
299 }
300 }
301
302 #[test]
303 fn test_concurrent_monotonicity() {
304 let hlc = Arc::new(HybridLogicalClock::new());
305 let num_threads = 8;
306 let ops_per_thread = 10000;
307
308 let handles: Vec<_> = (0..num_threads)
309 .map(|_| {
310 let hlc = Arc::clone(&hlc);
311 thread::spawn(move || {
312 let mut timestamps = Vec::with_capacity(ops_per_thread);
313 for _ in 0..ops_per_thread {
314 timestamps.push(hlc.next());
315 }
316 timestamps
317 })
318 })
319 .collect();
320
321 let mut all_timestamps = Vec::new();
322 for handle in handles {
323 all_timestamps.extend(handle.join().unwrap());
324 }
325
326 // All timestamps should be unique
327 let unique_count = {
328 let mut sorted = all_timestamps.clone();
329 sorted.sort();
330 sorted.dedup();
331 sorted.len()
332 };
333
334 assert_eq!(
335 unique_count,
336 all_timestamps.len(),
337 "All timestamps should be unique"
338 );
339 }
340
341 #[test]
342 fn test_receive_advances_clock() {
343 let hlc = HybridLogicalClock::new();
344 let current = hlc.current();
345
346 // Simulate receiving a future timestamp
347 let future_ts = current + 1000;
348 hlc.receive(future_ts);
349
350 assert!(hlc.current() >= future_ts);
351 }
352
353 #[test]
354 fn test_hlc_timestamp_display() {
355 let ts = HlcTimestamp::new(1000000, 42);
356 assert_eq!(ts.physical_us(), 1000000);
357 assert_eq!(ts.logical(), 42);
358 assert_eq!(format!("{}", ts), "1000000:42");
359 }
360}