Skip to main content

sochdb_storage/
hlc.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Hybrid Logical Clock (HLC) for Monotonic Commit Timestamps
19//!
20//! From mm.md Task 1.3: HLC-Based Transaction Ordering
21//!
22//! ## Problem
23//!
24//! Using wall-clock timestamps can violate `commit_ts >= start_ts` due to:
25//! - NTP time regression
26//! - Clock skew across threads
27//! - Ambiguous GC boundaries
28//!
29//! ## Solution
30//!
31//! Hybrid Logical Clock provides monotonic timestamps even if physical time regresses.
32//!
33//! ## Algorithm
34//!
35//! ```text
//! HLC timestamp: ts = (physical_time << 16) | logical_counter
//!   (only the low 48 bits of physical_time fit; higher bits are shifted out)
37//!
38//! On event:
39//!   physical = now_micros()
40//!   if physical > last_physical:
41//!     logical = 0
42//!   else:
43//!     logical = last_logical + 1
44//!   ts = max(last_ts + 1, (physical << 16) | logical)
45//!   last_ts = ts
46//!
47//! Properties:
48//! - Monotonic: ts_i < ts_{i+1} always
49//! - Causally consistent: if A → B then ts_A < ts_B
50//! - Bounded drift: ts - real_time < max_clock_drift
51//! ```
52//!
53//! Cost: O(1) per timestamp allocation
54
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::time::{SystemTime, UNIX_EPOCH};
57
/// Width of the logical counter inside a packed timestamp: 16 bits, i.e. up to
/// 65 536 distinct logical values per microsecond. Tied to `u16::BITS` because
/// `HlcTimestamp::logical` hands the counter back as a `u16`.
const LOGICAL_BITS: u32 = u16::BITS;

/// Low-bit mask that selects the logical counter (`0xFFFF`).
const LOGICAL_MASK: u64 = !(u64::MAX << LOGICAL_BITS);

/// Largest lead, in microseconds, that a received timestamp's physical
/// component may have over the local wall clock (one second).
const MAX_DRIFT_US: u64 = 1_000 * 1_000;
64
65/// Hybrid Logical Clock for monotonic, causally consistent timestamps
66///
67/// Thread-safe implementation using atomic operations.
68///
69/// ## Performance
70///
71/// - Allocation: O(1) amortized, single CAS operation
72/// - Memory: 16 bytes (two atomic u64s)
73/// - Contention: Low under typical workloads (physical time advances)
74#[derive(Debug)]
75pub struct HybridLogicalClock {
76    /// Last allocated timestamp (physical << 16 | logical)
77    last_ts: AtomicU64,
78    /// Last physical time seen (microseconds since epoch)
79    last_physical: AtomicU64,
80}
81
82impl Default for HybridLogicalClock {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88impl HybridLogicalClock {
89    /// Create a new HLC initialized to current time
90    pub fn new() -> Self {
91        let physical = Self::now_physical();
92        let initial_ts = physical << LOGICAL_BITS;
93        Self {
94            last_ts: AtomicU64::new(initial_ts),
95            last_physical: AtomicU64::new(physical),
96        }
97    }
98
99    /// Create HLC with a specific starting timestamp (for recovery)
100    pub fn with_timestamp(ts: u64) -> Self {
101        let physical = ts >> LOGICAL_BITS;
102        Self {
103            last_ts: AtomicU64::new(ts),
104            last_physical: AtomicU64::new(physical),
105        }
106    }
107
108    /// Get current physical time in microseconds
109    #[inline]
110    fn now_physical() -> u64 {
111        SystemTime::now()
112            .duration_since(UNIX_EPOCH)
113            .expect("System time before UNIX epoch")
114            .as_micros() as u64
115    }
116
117    /// Allocate the next timestamp (monotonically increasing)
118    ///
119    /// This is the main API for transaction commit timestamps.
120    ///
121    /// ## Guarantees
122    ///
123    /// - Strictly monotonic: result > any previous result
124    /// - Causally consistent: happens-before relationships preserved
125    /// - Bounded drift: timestamp within MAX_DRIFT_US of real time
126    #[inline]
127    pub fn next(&self) -> u64 {
128        loop {
129            let physical = Self::now_physical();
130            let last = self.last_ts.load(Ordering::Acquire);
131            let last_physical = self.last_physical.load(Ordering::Acquire);
132
133            let new_ts = if physical > last_physical {
134                // Physical time advanced - reset logical counter
135                physical << LOGICAL_BITS
136            } else {
137                // Physical time same or regressed - increment logical
138                let logical = (last & LOGICAL_MASK) + 1;
139                if logical > LOGICAL_MASK {
140                    // Logical overflow - wait for physical time to advance
141                    std::thread::yield_now();
142                    continue;
143                }
144                (last & !LOGICAL_MASK) | logical
145            };
146
147            // Ensure monotonicity
148            let new_ts = new_ts.max(last + 1);
149
150            // CAS to update
151            if self
152                .last_ts
153                .compare_exchange_weak(last, new_ts, Ordering::AcqRel, Ordering::Acquire)
154                .is_ok()
155            {
156                // Update last physical if we advanced
157                if physical > last_physical {
158                    self.last_physical.store(physical, Ordering::Release);
159                }
160                return new_ts;
161            }
162            // CAS failed, retry
163        }
164    }
165
166    /// Receive a timestamp from another node (for distributed scenarios)
167    ///
168    /// Updates local clock to be at least as recent as the received timestamp.
169    pub fn receive(&self, remote_ts: u64) {
170        loop {
171            let last = self.last_ts.load(Ordering::Acquire);
172            if remote_ts <= last {
173                return; // Already ahead
174            }
175
176            let physical = Self::now_physical();
177            let remote_physical = remote_ts >> LOGICAL_BITS;
178
179            // Check drift
180            if remote_physical > physical + MAX_DRIFT_US {
181                // Remote clock too far ahead - could indicate attack or misconfiguration
182                // We cap at our physical time + reasonable drift
183                let capped = (physical + MAX_DRIFT_US) << LOGICAL_BITS;
184                if self
185                    .last_ts
186                    .compare_exchange_weak(last, capped.max(last + 1), Ordering::AcqRel, Ordering::Acquire)
187                    .is_ok()
188                {
189                    return;
190                }
191            } else {
192                // Accept remote timestamp
193                if self
194                    .last_ts
195                    .compare_exchange_weak(last, remote_ts, Ordering::AcqRel, Ordering::Acquire)
196                    .is_ok()
197                {
198                    self.last_physical
199                        .fetch_max(remote_physical, Ordering::Release);
200                    return;
201                }
202            }
203        }
204    }
205
206    /// Get the current timestamp without advancing
207    #[inline]
208    pub fn current(&self) -> u64 {
209        self.last_ts.load(Ordering::Acquire)
210    }
211
212    /// Extract physical time component from a timestamp
213    #[inline]
214    pub fn physical_time(ts: u64) -> u64 {
215        ts >> LOGICAL_BITS
216    }
217
218    /// Extract logical counter from a timestamp
219    #[inline]
220    pub fn logical_counter(ts: u64) -> u64 {
221        ts & LOGICAL_MASK
222    }
223
224    /// Compare two timestamps
225    #[inline]
226    pub fn compare(a: u64, b: u64) -> std::cmp::Ordering {
227        a.cmp(&b)
228    }
229
230    /// Check if timestamp a happened before timestamp b
231    #[inline]
232    pub fn happened_before(a: u64, b: u64) -> bool {
233        a < b
234    }
235}
236
237/// HLC timestamp with named components for debugging
238#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
239pub struct HlcTimestamp {
240    /// Raw timestamp value
241    pub raw: u64,
242}
243
244impl HlcTimestamp {
245    /// Create from raw value
246    pub const fn from_raw(raw: u64) -> Self {
247        Self { raw }
248    }
249
250    /// Create from components
251    pub const fn new(physical_us: u64, logical: u16) -> Self {
252        Self {
253            raw: (physical_us << LOGICAL_BITS) | (logical as u64),
254        }
255    }
256
257    /// Get physical time in microseconds
258    pub const fn physical_us(&self) -> u64 {
259        self.raw >> LOGICAL_BITS
260    }
261
262    /// Get logical counter
263    pub const fn logical(&self) -> u16 {
264        (self.raw & LOGICAL_MASK) as u16
265    }
266
267    /// Convert to raw u64
268    pub const fn as_u64(&self) -> u64 {
269        self.raw
270    }
271}
272
273impl std::fmt::Display for HlcTimestamp {
274    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
275        write!(f, "{}:{}", self.physical_us(), self.logical())
276    }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    #[test]
    fn test_monotonicity() {
        let hlc = HybridLogicalClock::new();
        let mut prev = 0u64;

        for _ in 0..10000 {
            let ts = hlc.next();
            assert!(ts > prev, "Timestamp {} should be > {}", ts, prev);
            prev = ts;
        }
    }

    #[test]
    fn test_concurrent_monotonicity() {
        let hlc = Arc::new(HybridLogicalClock::new());
        let num_threads = 8;
        let ops_per_thread: usize = 10000;

        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let hlc = Arc::clone(&hlc);
                thread::spawn(move || {
                    let timestamps: Vec<u64> =
                        (0..ops_per_thread).map(|_| hlc.next()).collect();
                    // Each thread must see its own timestamps strictly increasing,
                    // not merely unique across threads.
                    assert!(
                        timestamps.windows(2).all(|w| w[0] < w[1]),
                        "Per-thread timestamps should be strictly increasing"
                    );
                    timestamps
                })
            })
            .collect();

        let mut all_timestamps = Vec::with_capacity(num_threads * ops_per_thread);
        for handle in handles {
            all_timestamps.extend(handle.join().unwrap());
        }

        // All timestamps should be unique across threads.
        let total = all_timestamps.len();
        all_timestamps.sort_unstable();
        all_timestamps.dedup();

        assert_eq!(
            all_timestamps.len(),
            total,
            "All timestamps should be unique"
        );
    }

    #[test]
    fn test_receive_advances_clock() {
        let hlc = HybridLogicalClock::new();
        let current = hlc.current();

        // Simulate receiving a future timestamp
        let future_ts = current + 1000;
        hlc.receive(future_ts);

        assert!(hlc.current() >= future_ts);
    }

    #[test]
    fn test_receive_ignores_past_timestamp() {
        let hlc = HybridLogicalClock::new();
        let current = hlc.current();

        hlc.receive(current.saturating_sub(1));

        assert_eq!(hlc.current(), current);
    }

    #[test]
    fn test_with_timestamp_resumes_after_recovered_value() {
        let recovered = HybridLogicalClock::new().next();
        let hlc = HybridLogicalClock::with_timestamp(recovered);

        assert_eq!(hlc.current(), recovered);
        assert!(hlc.next() > recovered);
    }

    #[test]
    fn test_component_accessors_agree() {
        let ts = HlcTimestamp::new(123_456, 7).as_u64();

        assert_eq!(HybridLogicalClock::physical_time(ts), 123_456);
        assert_eq!(HybridLogicalClock::logical_counter(ts), 7);
        assert!(HybridLogicalClock::happened_before(ts, ts + 1));
        assert_eq!(
            HybridLogicalClock::compare(ts, ts),
            std::cmp::Ordering::Equal
        );
    }

    #[test]
    fn test_hlc_timestamp_display() {
        let ts = HlcTimestamp::new(1000000, 42);
        assert_eq!(ts.physical_us(), 1000000);
        assert_eq!(ts.logical(), 42);
        assert_eq!(format!("{}", ts), "1000000:42");
    }
}