Skip to main content

sochdb_storage/
hlc.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Hybrid Logical Clock (HLC) for Monotonic Commit Timestamps
19//!
20//! From mm.md Task 1.3: HLC-Based Transaction Ordering
21//!
22//! ## Problem
23//!
24//! Using wall-clock timestamps can violate `commit_ts >= start_ts` due to:
25//! - NTP time regression
26//! - Clock skew across threads
27//! - Ambiguous GC boundaries
28//!
29//! ## Solution
30//!
31//! Hybrid Logical Clock provides monotonic timestamps even if physical time regresses.
32//!
33//! ## Algorithm
34//!
35//! ```text
36//! HLC timestamp: ts = (physical_time << k) | logical_counter
37//!
38//! On event:
39//!   physical = now_micros()
40//!   if physical > last_physical:
41//!     logical = 0
42//!   else:
43//!     logical = last_logical + 1
44//!   ts = max(last_ts + 1, (physical << 16) | logical)
45//!   last_ts = ts
46//!
47//! Properties:
48//! - Monotonic: ts_i < ts_{i+1} always
49//! - Causally consistent: if A → B then ts_A < ts_B
50//! - Bounded drift: ts - real_time < max_clock_drift
51//! ```
52//!
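//! Cost: O(1) per timestamp allocation
//!
//! ## Limitations
//!
//! NOTE(review): with a 16-bit logical counter only 48 bits of a `u64` remain
//! for the physical component. 2^48 microseconds is about 8.9 years, and epoch
//! microseconds already exceed that range, so `physical << 16` silently drops
//! the high bits: the physical component of a timestamp is the epoch time in
//! microseconds modulo 2^48, not the full epoch time. Ordering within one
//! running clock is unaffected, but the component wraps roughly every 8.9
//! years and cannot be converted directly into a calendar time.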
54
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::time::{SystemTime, UNIX_EPOCH};
57
/// Bits reserved for logical counter (16 bits = 65K events per microsecond)
const LOGICAL_BITS: u32 = 16;
/// Mask selecting the logical counter (the low `LOGICAL_BITS` bits) of a timestamp
const LOGICAL_MASK: u64 = (1 << LOGICAL_BITS) - 1;

/// Maximum clock drift we tolerate (1 second in microseconds)
const MAX_DRIFT_US: u64 = 1_000_000;

/// Hybrid Logical Clock for monotonic timestamps
///
/// Thread-safe implementation using atomic operations: every method takes
/// `&self` and all state lives in two `AtomicU64`s.
///
/// ## Timestamp layout
///
/// `ts = (physical << LOGICAL_BITS) | logical`. Only `64 - LOGICAL_BITS` (48)
/// bits are available for the physical component, so it holds the wall-clock
/// reading in microseconds *modulo 2^48*.
///
/// NOTE(review): 2^48 microseconds is about 8.9 years, so the physical
/// component wraps periodically and is not a full epoch time. When the wall
/// reading wraps, the clock keeps issuing `last + 1` and therefore stays
/// monotonic, but it no longer tracks wall time. Widening the physical field
/// changes the persisted encoding and is left to the maintainers.
///
/// ## Performance
///
/// - Allocation: one successful CAS on `last_ts` plus one `fetch_max`;
///   the CAS is retried only when another thread wins the race
/// - Memory: 16 bytes (two atomic u64s)
#[derive(Debug)]
pub struct HybridLogicalClock {
    /// Last allocated timestamp (physical << 16 | logical)
    last_ts: AtomicU64,
    /// Largest physical component issued or accepted so far, in the same
    /// 48-bit domain as the timestamps. Informational only: no ordering
    /// decision reads it.
    last_physical: AtomicU64,
}

impl Default for HybridLogicalClock {
    /// Equivalent to [`HybridLogicalClock::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl HybridLogicalClock {
    /// Mask selecting the bits of a physical reading that survive
    /// `<< LOGICAL_BITS` in a `u64`.
    const PHYSICAL_MASK: u64 = u64::MAX >> LOGICAL_BITS;

    /// Create a new HLC initialized to current time
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the UNIX epoch.
    pub fn new() -> Self {
        Self::with_timestamp(Self::now_physical() << LOGICAL_BITS)
    }

    /// Create HLC with a specific starting timestamp (for recovery)
    ///
    /// The next allocated timestamp is strictly greater than `ts`.
    pub fn with_timestamp(ts: u64) -> Self {
        let physical = ts >> LOGICAL_BITS;
        Self {
            last_ts: AtomicU64::new(ts),
            last_physical: AtomicU64::new(physical),
        }
    }

    /// Get current physical time in microseconds, reduced to the bits that
    /// fit into a timestamp
    ///
    /// Masking here yields the same timestamps as shifting the unmasked
    /// reading (the shift discards those bits anyway), but it puts this value
    /// in the same domain as `ts >> LOGICAL_BITS`, so the two can be compared.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the UNIX epoch.
    #[inline]
    fn now_physical() -> u64 {
        let micros = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("System time before UNIX epoch")
            .as_micros();
        // Lossless: the masked value always fits in 48 bits.
        (micros & u128::from(Self::PHYSICAL_MASK)) as u64
    }

    /// Allocate the next timestamp (monotonically increasing)
    ///
    /// This is the main API for transaction commit timestamps.
    ///
    /// ## Guarantees
    ///
    /// - Strictly monotonic: the result is greater than every timestamp this
    ///   clock has previously returned or accepted
    /// - Follows the wall clock: whenever the wall-clock timestamp is ahead of
    ///   the last issued one, the result is the wall-clock timestamp with a
    ///   logical counter of 0
    /// - Never waits for the wall clock: if the wall clock is behind (clock
    ///   regression, recovered or received timestamp ahead of local time) the
    ///   result is `last + 1`, and a full logical counter carries into the
    ///   physical component
    ///
    /// # Panics
    ///
    /// Panics if the last timestamp is `u64::MAX` (timestamp space exhausted)
    /// or if the system clock reports a time before the UNIX epoch.
    #[inline]
    pub fn next(&self) -> u64 {
        loop {
            let wall_ts = Self::now_physical() << LOGICAL_BITS;
            let last = self.last_ts.load(Ordering::Acquire);

            let new_ts = if wall_ts > last {
                // Physical time advanced - logical counter restarts at 0
                wall_ts
            } else {
                // Physical time same or regressed - tick past the last value.
                // Plain increment: a full logical counter carries into the
                // physical bits rather than blocking the caller.
                last.checked_add(1).expect("HLC timestamp space exhausted")
            };

            // CAS to publish; a failure means another thread advanced the
            // clock (or the weak CAS failed spuriously), so recompute.
            if self
                .last_ts
                .compare_exchange_weak(last, new_ts, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                // fetch_max so a slower thread cannot move the value backwards
                self.last_physical
                    .fetch_max(new_ts >> LOGICAL_BITS, Ordering::Release);
                return new_ts;
            }
        }
    }

    /// Receive a timestamp from another node (for distributed scenarios)
    ///
    /// If `remote_ts` is not newer than the local clock, nothing changes.
    /// Otherwise the local clock advances to `remote_ts`, unless the remote
    /// physical component is more than `MAX_DRIFT_US` ahead of the local wall
    /// clock. In that case the local clock advances only to the drift horizon
    /// (`now + MAX_DRIFT_US`, logical 0), or by a single tick if it is already
    /// past the horizon, so a capped clock may remain behind `remote_ts`.
    ///
    /// # Panics
    ///
    /// Panics if the system clock reports a time before the UNIX epoch.
    pub fn receive(&self, remote_ts: u64) {
        let remote_physical = remote_ts >> LOGICAL_BITS;

        loop {
            let last = self.last_ts.load(Ordering::Acquire);
            if remote_ts <= last {
                return; // Already ahead
            }

            // Furthest physical time we accept; clamped so that shifting it
            // into timestamp position cannot drop bits.
            let horizon = Self::now_physical()
                .saturating_add(MAX_DRIFT_US)
                .min(Self::PHYSICAL_MASK);

            let target = if remote_physical > horizon {
                // Remote clock too far ahead - could indicate attack or
                // misconfiguration. Cap at the horizon but still move forward.
                // `last + 1` cannot overflow because `last < remote_ts`.
                (horizon << LOGICAL_BITS).max(last + 1)
            } else {
                // Accept remote timestamp
                remote_ts
            };

            if self
                .last_ts
                .compare_exchange_weak(last, target, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                self.last_physical
                    .fetch_max(target >> LOGICAL_BITS, Ordering::Release);
                return;
            }
            // CAS failed, retry against the fresh value
        }
    }

    /// Get the current timestamp without advancing
    #[inline]
    pub fn current(&self) -> u64 {
        self.last_ts.load(Ordering::Acquire)
    }

    /// Extract physical time component from a timestamp
    ///
    /// Returns the upper `64 - LOGICAL_BITS` bits of `ts`.
    #[inline]
    pub fn physical_time(ts: u64) -> u64 {
        ts >> LOGICAL_BITS
    }

    /// Extract logical counter from a timestamp
    ///
    /// Returns the lower `LOGICAL_BITS` bits of `ts`.
    #[inline]
    pub fn logical_counter(ts: u64) -> u64 {
        ts & LOGICAL_MASK
    }

    /// Compare two timestamps
    ///
    /// Timestamps order as plain integers.
    #[inline]
    pub fn compare(a: u64, b: u64) -> std::cmp::Ordering {
        a.cmp(&b)
    }

    /// Check if timestamp a happened before timestamp b
    ///
    /// True exactly when `a < b`.
    #[inline]
    pub fn happened_before(a: u64, b: u64) -> bool {
        a < b
    }
}
241
242/// HLC timestamp with named components for debugging
243#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
244pub struct HlcTimestamp {
245    /// Raw timestamp value
246    pub raw: u64,
247}
248
249impl HlcTimestamp {
250    /// Create from raw value
251    pub const fn from_raw(raw: u64) -> Self {
252        Self { raw }
253    }
254
255    /// Create from components
256    pub const fn new(physical_us: u64, logical: u16) -> Self {
257        Self {
258            raw: (physical_us << LOGICAL_BITS) | (logical as u64),
259        }
260    }
261
262    /// Get physical time in microseconds
263    pub const fn physical_us(&self) -> u64 {
264        self.raw >> LOGICAL_BITS
265    }
266
267    /// Get logical counter
268    pub const fn logical(&self) -> u16 {
269        (self.raw & LOGICAL_MASK) as u16
270    }
271
272    /// Convert to raw u64
273    pub const fn as_u64(&self) -> u64 {
274        self.raw
275    }
276}
277
278impl std::fmt::Display for HlcTimestamp {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        write!(f, "{}:{}", self.physical_us(), self.logical())
281    }
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Sequential calls to `next()` return strictly increasing values.
    #[test]
    fn test_monotonicity() {
        let hlc = HybridLogicalClock::new();
        let mut prev = 0u64;

        for _ in 0..10000 {
            let ts = hlc.next();
            assert!(ts > prev, "Timestamp {} should be > {}", ts, prev);
            prev = ts;
        }
    }

    /// Under contention every thread sees its own timestamps strictly
    /// increase, and no value is handed out twice across threads.
    #[test]
    fn test_concurrent_monotonicity() {
        let hlc = Arc::new(HybridLogicalClock::new());
        let num_threads = 8;
        let ops_per_thread = 10000;

        let handles: Vec<_> = (0..num_threads)
            .map(|_| {
                let hlc = Arc::clone(&hlc);
                thread::spawn(move || {
                    let mut timestamps = Vec::with_capacity(ops_per_thread);
                    for _ in 0..ops_per_thread {
                        timestamps.push(hlc.next());
                    }
                    timestamps
                })
            })
            .collect();

        let mut all_timestamps = Vec::with_capacity(num_threads * ops_per_thread);
        for handle in handles {
            let timestamps = handle.join().unwrap();

            // Uniqueness alone does not imply ordering, so check it per thread.
            assert!(
                timestamps.windows(2).all(|pair| pair[0] < pair[1]),
                "Timestamps observed by one thread should be strictly increasing"
            );
            all_timestamps.extend(timestamps);
        }

        // All timestamps should be unique
        let total = all_timestamps.len();
        all_timestamps.sort_unstable();
        all_timestamps.dedup();

        assert_eq!(
            all_timestamps.len(),
            total,
            "All timestamps should be unique"
        );
    }

    /// A remote timestamp slightly ahead of the local clock is adopted.
    #[test]
    fn test_receive_advances_clock() {
        let hlc = HybridLogicalClock::new();
        let current = hlc.current();

        // Simulate receiving a future timestamp
        let future_ts = current + 1000;
        hlc.receive(future_ts);

        assert!(hlc.current() >= future_ts);
    }

    /// A remote timestamp that is not newer than the local clock is ignored.
    #[test]
    fn test_receive_ignores_stale_timestamp() {
        let hlc = HybridLogicalClock::new();
        let before = hlc.next();

        hlc.receive(before - 1);
        assert_eq!(hlc.current(), before);

        hlc.receive(before);
        assert_eq!(hlc.current(), before);
    }

    /// Components round-trip through `HlcTimestamp` and format as
    /// `physical:logical`.
    #[test]
    fn test_hlc_timestamp_display() {
        let ts = HlcTimestamp::new(1000000, 42);
        assert_eq!(ts.physical_us(), 1000000);
        assert_eq!(ts.logical(), 42);
        assert_eq!(format!("{}", ts), "1000000:42");
    }
}