sochdb_storage/hlc.rs
1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Hybrid Logical Clock (HLC) for Monotonic Commit Timestamps
16//!
17//! From mm.md Task 1.3: HLC-Based Transaction Ordering
18//!
19//! ## Problem
20//!
21//! Using wall-clock timestamps can violate `commit_ts >= start_ts` due to:
22//! - NTP time regression
23//! - Clock skew across threads
24//! - Ambiguous GC boundaries
25//!
26//! ## Solution
27//!
28//! Hybrid Logical Clock provides monotonic timestamps even if physical time regresses.
29//!
30//! ## Algorithm
31//!
32//! ```text
33//! HLC timestamp: ts = (physical_time << k) | logical_counter
34//!
//! On event:
//!   physical = now_micros()
//!   if physical > last_physical:
//!     logical = 0
//!   else:
//!     logical = last_logical + 1
//!   ts = max(last_ts + 1, (physical << 16) | logical)
//!   last_ts = ts
43//!
44//! Properties:
45//! - Monotonic: ts_i < ts_{i+1} always
46//! - Causally consistent: if A → B then ts_A < ts_B
47//! - Bounded drift: ts - real_time < max_clock_drift
48//! ```
49//!
50//! Cost: O(1) per timestamp allocation
51
52use std::sync::atomic::{AtomicU64, Ordering};
53use std::time::{SystemTime, UNIX_EPOCH};
54
55/// Bits reserved for logical counter (16 bits = 65K events per microsecond)
56const LOGICAL_BITS: u32 = 16;
57const LOGICAL_MASK: u64 = (1 << LOGICAL_BITS) - 1;
58
59/// Maximum clock drift we tolerate (1 second in microseconds)
60const MAX_DRIFT_US: u64 = 1_000_000;
61
62/// Hybrid Logical Clock for monotonic, causally consistent timestamps
63///
64/// Thread-safe implementation using atomic operations.
65///
66/// ## Performance
67///
68/// - Allocation: O(1) amortized, single CAS operation
69/// - Memory: 16 bytes (two atomic u64s)
70/// - Contention: Low under typical workloads (physical time advances)
71#[derive(Debug)]
72pub struct HybridLogicalClock {
73 /// Last allocated timestamp (physical << 16 | logical)
74 last_ts: AtomicU64,
75 /// Last physical time seen (microseconds since epoch)
76 last_physical: AtomicU64,
77}
78
79impl Default for HybridLogicalClock {
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
85impl HybridLogicalClock {
86 /// Create a new HLC initialized to current time
87 pub fn new() -> Self {
88 let physical = Self::now_physical();
89 let initial_ts = physical << LOGICAL_BITS;
90 Self {
91 last_ts: AtomicU64::new(initial_ts),
92 last_physical: AtomicU64::new(physical),
93 }
94 }
95
96 /// Create HLC with a specific starting timestamp (for recovery)
97 pub fn with_timestamp(ts: u64) -> Self {
98 let physical = ts >> LOGICAL_BITS;
99 Self {
100 last_ts: AtomicU64::new(ts),
101 last_physical: AtomicU64::new(physical),
102 }
103 }
104
105 /// Get current physical time in microseconds
106 #[inline]
107 fn now_physical() -> u64 {
108 SystemTime::now()
109 .duration_since(UNIX_EPOCH)
110 .expect("System time before UNIX epoch")
111 .as_micros() as u64
112 }
113
114 /// Allocate the next timestamp (monotonically increasing)
115 ///
116 /// This is the main API for transaction commit timestamps.
117 ///
118 /// ## Guarantees
119 ///
120 /// - Strictly monotonic: result > any previous result
121 /// - Causally consistent: happens-before relationships preserved
122 /// - Bounded drift: timestamp within MAX_DRIFT_US of real time
123 #[inline]
124 pub fn next(&self) -> u64 {
125 loop {
126 let physical = Self::now_physical();
127 let last = self.last_ts.load(Ordering::Acquire);
128 let last_physical = self.last_physical.load(Ordering::Acquire);
129
130 let new_ts = if physical > last_physical {
131 // Physical time advanced - reset logical counter
132 physical << LOGICAL_BITS
133 } else {
134 // Physical time same or regressed - increment logical
135 let logical = (last & LOGICAL_MASK) + 1;
136 if logical > LOGICAL_MASK {
137 // Logical overflow - wait for physical time to advance
138 std::thread::yield_now();
139 continue;
140 }
141 (last & !LOGICAL_MASK) | logical
142 };
143
144 // Ensure monotonicity
145 let new_ts = new_ts.max(last + 1);
146
147 // CAS to update
148 if self
149 .last_ts
150 .compare_exchange_weak(last, new_ts, Ordering::AcqRel, Ordering::Acquire)
151 .is_ok()
152 {
153 // Update last physical if we advanced
154 if physical > last_physical {
155 self.last_physical.store(physical, Ordering::Release);
156 }
157 return new_ts;
158 }
159 // CAS failed, retry
160 }
161 }
162
163 /// Receive a timestamp from another node (for distributed scenarios)
164 ///
165 /// Updates local clock to be at least as recent as the received timestamp.
166 pub fn receive(&self, remote_ts: u64) {
167 loop {
168 let last = self.last_ts.load(Ordering::Acquire);
169 if remote_ts <= last {
170 return; // Already ahead
171 }
172
173 let physical = Self::now_physical();
174 let remote_physical = remote_ts >> LOGICAL_BITS;
175
176 // Check drift
177 if remote_physical > physical + MAX_DRIFT_US {
178 // Remote clock too far ahead - could indicate attack or misconfiguration
179 // We cap at our physical time + reasonable drift
180 let capped = (physical + MAX_DRIFT_US) << LOGICAL_BITS;
181 if self
182 .last_ts
183 .compare_exchange_weak(last, capped.max(last + 1), Ordering::AcqRel, Ordering::Acquire)
184 .is_ok()
185 {
186 return;
187 }
188 } else {
189 // Accept remote timestamp
190 if self
191 .last_ts
192 .compare_exchange_weak(last, remote_ts, Ordering::AcqRel, Ordering::Acquire)
193 .is_ok()
194 {
195 self.last_physical
196 .fetch_max(remote_physical, Ordering::Release);
197 return;
198 }
199 }
200 }
201 }
202
203 /// Get the current timestamp without advancing
204 #[inline]
205 pub fn current(&self) -> u64 {
206 self.last_ts.load(Ordering::Acquire)
207 }
208
209 /// Extract physical time component from a timestamp
210 #[inline]
211 pub fn physical_time(ts: u64) -> u64 {
212 ts >> LOGICAL_BITS
213 }
214
215 /// Extract logical counter from a timestamp
216 #[inline]
217 pub fn logical_counter(ts: u64) -> u64 {
218 ts & LOGICAL_MASK
219 }
220
221 /// Compare two timestamps
222 #[inline]
223 pub fn compare(a: u64, b: u64) -> std::cmp::Ordering {
224 a.cmp(&b)
225 }
226
227 /// Check if timestamp a happened before timestamp b
228 #[inline]
229 pub fn happened_before(a: u64, b: u64) -> bool {
230 a < b
231 }
232}
233
234/// HLC timestamp with named components for debugging
235#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
236pub struct HlcTimestamp {
237 /// Raw timestamp value
238 pub raw: u64,
239}
240
241impl HlcTimestamp {
242 /// Create from raw value
243 pub const fn from_raw(raw: u64) -> Self {
244 Self { raw }
245 }
246
247 /// Create from components
248 pub const fn new(physical_us: u64, logical: u16) -> Self {
249 Self {
250 raw: (physical_us << LOGICAL_BITS) | (logical as u64),
251 }
252 }
253
254 /// Get physical time in microseconds
255 pub const fn physical_us(&self) -> u64 {
256 self.raw >> LOGICAL_BITS
257 }
258
259 /// Get logical counter
260 pub const fn logical(&self) -> u16 {
261 (self.raw & LOGICAL_MASK) as u16
262 }
263
264 /// Convert to raw u64
265 pub const fn as_u64(&self) -> u64 {
266 self.raw
267 }
268}
269
270impl std::fmt::Display for HlcTimestamp {
271 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
272 write!(f, "{}:{}", self.physical_us(), self.logical())
273 }
274}
275
276#[cfg(test)]
277mod tests {
278 use super::*;
279 use std::sync::Arc;
280 use std::thread;
281
282 #[test]
283 fn test_monotonicity() {
284 let hlc = HybridLogicalClock::new();
285 let mut prev = 0u64;
286
287 for _ in 0..10000 {
288 let ts = hlc.next();
289 assert!(ts > prev, "Timestamp {} should be > {}", ts, prev);
290 prev = ts;
291 }
292 }
293
294 #[test]
295 fn test_concurrent_monotonicity() {
296 let hlc = Arc::new(HybridLogicalClock::new());
297 let num_threads = 8;
298 let ops_per_thread = 10000;
299
300 let handles: Vec<_> = (0..num_threads)
301 .map(|_| {
302 let hlc = Arc::clone(&hlc);
303 thread::spawn(move || {
304 let mut timestamps = Vec::with_capacity(ops_per_thread);
305 for _ in 0..ops_per_thread {
306 timestamps.push(hlc.next());
307 }
308 timestamps
309 })
310 })
311 .collect();
312
313 let mut all_timestamps = Vec::new();
314 for handle in handles {
315 all_timestamps.extend(handle.join().unwrap());
316 }
317
318 // All timestamps should be unique
319 let unique_count = {
320 let mut sorted = all_timestamps.clone();
321 sorted.sort();
322 sorted.dedup();
323 sorted.len()
324 };
325
326 assert_eq!(
327 unique_count,
328 all_timestamps.len(),
329 "All timestamps should be unique"
330 );
331 }
332
333 #[test]
334 fn test_receive_advances_clock() {
335 let hlc = HybridLogicalClock::new();
336 let current = hlc.current();
337
338 // Simulate receiving a future timestamp
339 let future_ts = current + 1000;
340 hlc.receive(future_ts);
341
342 assert!(hlc.current() >= future_ts);
343 }
344
345 #[test]
346 fn test_hlc_timestamp_display() {
347 let ts = HlcTimestamp::new(1000000, 42);
348 assert_eq!(ts.physical_us(), 1000000);
349 assert_eq!(ts.logical(), 42);
350 assert_eq!(format!("{}", ts), "1000000:42");
351 }
352}