sochdb_storage/hlc.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Hybrid Logical Clock (HLC) for Monotonic Commit Timestamps
19//!
20//! From mm.md Task 1.3: HLC-Based Transaction Ordering
21//!
22//! ## Problem
23//!
24//! Using wall-clock timestamps can violate `commit_ts >= start_ts` due to:
25//! - NTP time regression
26//! - Clock skew across threads
27//! - Ambiguous GC boundaries
28//!
29//! ## Solution
30//!
31//! Hybrid Logical Clock provides monotonic timestamps even if physical time regresses.
32//!
33//! ## Algorithm
34//!
//! ```text
//! HLC timestamp: ts = (physical_time << k) | logical_counter   (k = 16)
//!
//! On event:
//!   physical = now_micros()
//!   if physical > last_physical:
//!       logical = 0
//!   else:
//!       logical = last_logical + 1
//!   ts = max(last_ts + 1, (physical << 16) | logical)
//!   last_ts = ts
//!
//! Properties:
//!   - Monotonic: ts_i < ts_{i+1} always
//!   - Causally consistent: if A → B then ts_A < ts_B
//!   - Bounded drift: ts - real_time < max_clock_drift
//! ```
52//!
53//! Cost: O(1) per timestamp allocation
54
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::time::{SystemTime, UNIX_EPOCH};
57
/// Bits reserved for the logical counter (16 bits = 65,536 values per microsecond)
const LOGICAL_BITS: u32 = 16;
/// Mask selecting the logical counter (the low `LOGICAL_BITS` bits) of a timestamp
const LOGICAL_MASK: u64 = (1 << LOGICAL_BITS) - 1;
/// Mask selecting the part of a microsecond reading that survives
/// `physical << LOGICAL_BITS` in a `u64` (the low 48 bits)
const PHYSICAL_MASK: u64 = u64::MAX >> LOGICAL_BITS;

/// Maximum clock drift we tolerate (1 second in microseconds)
const MAX_DRIFT_US: u64 = 1_000_000;

/// Hybrid Logical Clock for monotonic, causally consistent timestamps
///
/// Thread-safe implementation using atomic operations. Every ordering decision
/// is made from the single `last_ts` atomic, so a successful compare-and-swap
/// is the only thing that publishes a new timestamp.
///
/// ## Encoding
///
/// A timestamp is `(physical << 16) | logical` packed into one `u64`. Only the
/// low 48 bits of the microsecond wall clock fit in front of the 16 logical
/// bits, so the physical component is "microseconds since the UNIX epoch modulo
/// 2^48" and wraps roughly every 8.9 years. Wall-clock readings are always
/// reduced to that 48-bit form before being compared with timestamps.
///
/// ## Performance
///
/// - Allocation: one compare-and-swap per attempt, retried on contention
/// - Memory: 16 bytes (two atomic u64s)
#[derive(Debug)]
pub struct HybridLogicalClock {
    /// Last allocated timestamp (physical << 16 | logical)
    last_ts: AtomicU64,
    /// Highest physical component (48-bit, as encoded in timestamps) this clock
    /// has adopted. Kept as a high-water mark that shows up in `Debug` output;
    /// it is not consulted when allocating, because two separately updated
    /// atomics cannot be read as one consistent snapshot.
    last_physical: AtomicU64,
}

impl Default for HybridLogicalClock {
    fn default() -> Self {
        Self::new()
    }
}

impl HybridLogicalClock {
    /// Create a new HLC initialized to current time
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the UNIX epoch.
    pub fn new() -> Self {
        Self::with_timestamp(Self::wall_clock_ts())
    }

    /// Create HLC with a specific starting timestamp (for recovery)
    ///
    /// The first call to [`next`](Self::next) returns a value strictly greater
    /// than `ts`.
    pub fn with_timestamp(ts: u64) -> Self {
        Self {
            last_ts: AtomicU64::new(ts),
            last_physical: AtomicU64::new(ts >> LOGICAL_BITS),
        }
    }

    /// Get current physical time in microseconds since the UNIX epoch
    #[inline]
    fn now_physical() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("System time before UNIX epoch")
            // u128 -> u64: microseconds since 1970 fit in a u64 for ~584k years.
            .as_micros() as u64
    }

    /// Current wall clock encoded as a timestamp with a zero logical counter
    ///
    /// The reading is masked to 48 bits first so the dropped high bits are
    /// explicit rather than a side effect of the shift.
    #[inline]
    fn wall_clock_ts() -> u64 {
        (Self::now_physical() & PHYSICAL_MASK) << LOGICAL_BITS
    }

    /// Allocate the next timestamp (monotonically increasing)
    ///
    /// This is the main API for transaction commit timestamps.
    ///
    /// ## Guarantees
    ///
    /// - Strictly monotonic: the result is greater than every value previously
    ///   returned by, or adopted into, this clock
    /// - Never behind the encoded wall clock read during the call
    /// - Never blocks on the wall clock: if the clock is ahead of wall time and
    ///   the logical counter is full, the increment carries into the physical
    ///   component instead of spinning until wall time catches up
    ///
    /// # Panics
    ///
    /// Panics if the previous timestamp is `u64::MAX` (no larger value exists)
    /// or if the system clock reports a time before the UNIX epoch.
    #[inline]
    pub fn next(&self) -> u64 {
        let mut last = self.last_ts.load(Ordering::Acquire);
        loop {
            let wall = Self::wall_clock_ts();
            // Wrapping here would hand out a timestamp smaller than `last`.
            let successor = last
                .checked_add(1)
                .expect("HLC timestamp space exhausted");

            // Wall time wins when it has moved past the last timestamp (logical
            // counter restarts at 0); otherwise take the immediate successor.
            let new_ts = wall.max(successor);

            match self.last_ts.compare_exchange_weak(
                last,
                new_ts,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    if wall > last {
                        // fetch_max, not store: a slower thread must not move
                        // the high-water mark backwards.
                        self.last_physical
                            .fetch_max(new_ts >> LOGICAL_BITS, Ordering::Release);
                    }
                    return new_ts;
                }
                // Lost the race (or spurious failure): retry from the value seen.
                Err(observed) => last = observed,
            }
        }
    }

    /// Receive a timestamp from another node (for distributed scenarios)
    ///
    /// Updates local clock to be at least as recent as the received timestamp,
    /// unless its physical component is more than `MAX_DRIFT_US` ahead of the
    /// local wall clock. In that case the clock is advanced only to the drift
    /// limit (or by one tick if it is already past the limit).
    ///
    /// The drift comparison uses the 48-bit encoded form of the wall clock, the
    /// same form `remote_ts >> 16` is in. Within `MAX_DRIFT_US` of a 48-bit
    /// wrap the limit itself wraps and a legitimate remote value may be capped.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the UNIX epoch.
    pub fn receive(&self, remote_ts: u64) {
        let remote_physical = remote_ts >> LOGICAL_BITS;
        let mut last = self.last_ts.load(Ordering::Acquire);
        loop {
            if remote_ts <= last {
                return; // Already ahead
            }

            let limit_physical =
                Self::now_physical().saturating_add(MAX_DRIFT_US) & PHYSICAL_MASK;

            let target = if remote_physical > limit_physical {
                // Remote clock too far ahead - could indicate attack or
                // misconfiguration. `last + 1` cannot overflow here because
                // `remote_ts > last`.
                (limit_physical << LOGICAL_BITS).max(last + 1)
            } else {
                // Accept remote timestamp
                remote_ts
            };

            match self.last_ts.compare_exchange_weak(
                last,
                target,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    self.last_physical
                        .fetch_max(target >> LOGICAL_BITS, Ordering::Release);
                    return;
                }
                Err(observed) => last = observed,
            }
        }
    }

    /// Get the current timestamp without advancing
    #[inline]
    pub fn current(&self) -> u64 {
        self.last_ts.load(Ordering::Acquire)
    }

    /// Extract physical time component from a timestamp
    ///
    /// This is the upper 48 bits of `ts`, i.e. microseconds modulo 2^48.
    #[inline]
    pub fn physical_time(ts: u64) -> u64 {
        ts >> LOGICAL_BITS
    }

    /// Extract logical counter from a timestamp
    #[inline]
    pub fn logical_counter(ts: u64) -> u64 {
        ts & LOGICAL_MASK
    }

    /// Compare two timestamps
    #[inline]
    pub fn compare(a: u64, b: u64) -> std::cmp::Ordering {
        a.cmp(&b)
    }

    /// Check if timestamp a happened before timestamp b
    #[inline]
    pub fn happened_before(a: u64, b: u64) -> bool {
        a < b
    }
}
236
237/// HLC timestamp with named components for debugging
238#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
239pub struct HlcTimestamp {
240 /// Raw timestamp value
241 pub raw: u64,
242}
243
244impl HlcTimestamp {
245 /// Create from raw value
246 pub const fn from_raw(raw: u64) -> Self {
247 Self { raw }
248 }
249
250 /// Create from components
251 pub const fn new(physical_us: u64, logical: u16) -> Self {
252 Self {
253 raw: (physical_us << LOGICAL_BITS) | (logical as u64),
254 }
255 }
256
257 /// Get physical time in microseconds
258 pub const fn physical_us(&self) -> u64 {
259 self.raw >> LOGICAL_BITS
260 }
261
262 /// Get logical counter
263 pub const fn logical(&self) -> u16 {
264 (self.raw & LOGICAL_MASK) as u16
265 }
266
267 /// Convert to raw u64
268 pub const fn as_u64(&self) -> u64 {
269 self.raw
270 }
271}
272
273impl std::fmt::Display for HlcTimestamp {
274 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275 write!(f, "{}:{}", self.physical_us(), self.logical())
276 }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Sequential allocations from one clock must be strictly increasing.
    #[test]
    fn test_monotonicity() {
        let hlc = HybridLogicalClock::new();
        let mut prev = 0u64;

        for _ in 0..10000 {
            let ts = hlc.next();
            assert!(ts > prev, "Timestamp {} should be > {}", ts, prev);
            prev = ts;
        }
    }

    /// Under contention every thread must see its own allocations strictly
    /// increase, and no timestamp may be handed out twice across threads.
    #[test]
    fn test_concurrent_monotonicity() {
        let hlc = Arc::new(HybridLogicalClock::new());
        let num_threads = 8;
        let ops_per_thread = 10000;

        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let hlc = Arc::clone(&hlc);
                thread::spawn(move || {
                    let mut timestamps = Vec::with_capacity(ops_per_thread);
                    for _ in 0..ops_per_thread {
                        timestamps.push(hlc.next());
                    }
                    timestamps
                })
            })
            .collect();

        let mut all_timestamps = Vec::with_capacity(num_threads * ops_per_thread);
        for handle in handles {
            let timestamps = handle.join().unwrap();
            // Each thread calls next() sequentially, so its own view must be
            // strictly increasing regardless of what other threads do.
            assert!(
                timestamps.windows(2).all(|pair| pair[0] < pair[1]),
                "Timestamps observed by one thread should be strictly increasing"
            );
            all_timestamps.extend(timestamps);
        }

        // All timestamps should be unique
        let total = all_timestamps.len();
        all_timestamps.sort_unstable();
        all_timestamps.dedup();

        assert_eq!(
            all_timestamps.len(),
            total,
            "All timestamps should be unique"
        );
    }

    /// Receiving a slightly newer timestamp moves the clock up to at least it.
    #[test]
    fn test_receive_advances_clock() {
        let hlc = HybridLogicalClock::new();
        let current = hlc.current();

        // Simulate receiving a future timestamp
        let future_ts = current + 1000;
        hlc.receive(future_ts);

        assert!(hlc.current() >= future_ts);
    }

    /// Components survive packing and render as `physical:logical`.
    #[test]
    fn test_hlc_timestamp_display() {
        let ts = HlcTimestamp::new(1000000, 42);
        assert_eq!(ts.physical_us(), 1000000);
        assert_eq!(ts.logical(), 42);
        assert_eq!(format!("{}", ts), "1000000:42");
    }
}
355}