// sochdb_storage/hlc.rs

// Copyright 2025 Sushanth (https://github.com/sushanthpy)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Hybrid Logical Clock (HLC) for Monotonic Commit Timestamps
//!
//! From mm.md Task 1.3: HLC-Based Transaction Ordering
//!
//! ## Problem
//!
//! Using wall-clock timestamps can violate `commit_ts >= start_ts` due to:
//! - NTP time regression
//! - Clock skew across threads
//! - Ambiguous GC boundaries
//!
//! ## Solution
//!
//! A Hybrid Logical Clock provides monotonic timestamps even if physical time regresses.
//!
//! ## Algorithm
//!
//! ```text
//! HLC timestamp: ts = (physical_time << k) | logical_counter    (k = 16)
//!
//! On event:
//!   physical = now_micros()
//!   if physical > last_physical:
//!     logical = 0
//!   else:
//!     logical = last_logical + 1
//!   ts = max(last_ts + 1, (physical << 16) | logical)
//!   last_ts = ts
//!
//! Properties:
//! - Monotonic: ts_i < ts_{i+1} always
//! - Causally consistent: if A → B then ts_A < ts_B
//! - Bounded drift: ts - real_time < max_clock_drift
//! ```
//!
//! Cost: O(1) per timestamp allocation (plus CAS retries under contention)

use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Width, in bits, of the logical counter kept in the low end of every
/// encoded timestamp. 16 bits allow 65,536 distinct timestamps per physical
/// microsecond.
///
/// NOTE(review): this leaves 64 - 16 = 48 bits for the physical component,
/// i.e. about 8.9 years of microseconds (2^48 us). Microseconds since the
/// UNIX epoch already exceed 2^48, so `physical << LOGICAL_BITS` discards the
/// high bits of the wall-clock reading. Fixing that requires changing the
/// timestamp encoding (and any persisted timestamps) — TODO decide.
const LOGICAL_BITS: u32 = 16;

/// Mask selecting the logical-counter bits of an encoded timestamp (`0xFFFF`).
const LOGICAL_MASK: u64 = u64::MAX >> (64 - LOGICAL_BITS);

/// How far, in microseconds, a received remote timestamp may run ahead of the
/// local wall clock before `receive` caps it (one second).
const MAX_DRIFT_US: u64 = 1_000 * 1_000;

/// Thread-safe Hybrid Logical Clock handing out strictly increasing `u64`
/// timestamps encoded as `(physical_us << 16) | logical`.
///
/// All state lives in atomics, so a shared reference is enough to allocate
/// timestamps from many threads at once.
///
/// ## Performance
///
/// - Allocation: O(1) amortized; the common path is one successful CAS
/// - Memory: 16 bytes (two `AtomicU64`s)
/// - Contention: low when physical time keeps advancing between calls
#[derive(Debug)]
pub struct HybridLogicalClock {
    /// Most recently allocated or received encoded timestamp.
    last_ts: AtomicU64,
    /// Last physical time observed by the clock, in microseconds since the
    /// UNIX epoch; decides whether a new timestamp restarts from wall time
    /// or bumps the logical counter.
    last_physical: AtomicU64,
}

79impl Default for HybridLogicalClock {
80    fn default() -> Self {
81        Self::new()
82    }
83}
84
85impl HybridLogicalClock {
86    /// Create a new HLC initialized to current time
87    pub fn new() -> Self {
88        let physical = Self::now_physical();
89        let initial_ts = physical << LOGICAL_BITS;
90        Self {
91            last_ts: AtomicU64::new(initial_ts),
92            last_physical: AtomicU64::new(physical),
93        }
94    }
95
96    /// Create HLC with a specific starting timestamp (for recovery)
97    pub fn with_timestamp(ts: u64) -> Self {
98        let physical = ts >> LOGICAL_BITS;
99        Self {
100            last_ts: AtomicU64::new(ts),
101            last_physical: AtomicU64::new(physical),
102        }
103    }
104
105    /// Get current physical time in microseconds
106    #[inline]
107    fn now_physical() -> u64 {
108        SystemTime::now()
109            .duration_since(UNIX_EPOCH)
110            .expect("System time before UNIX epoch")
111            .as_micros() as u64
112    }
113
114    /// Allocate the next timestamp (monotonically increasing)
115    ///
116    /// This is the main API for transaction commit timestamps.
117    ///
118    /// ## Guarantees
119    ///
120    /// - Strictly monotonic: result > any previous result
121    /// - Causally consistent: happens-before relationships preserved
122    /// - Bounded drift: timestamp within MAX_DRIFT_US of real time
123    #[inline]
124    pub fn next(&self) -> u64 {
125        loop {
126            let physical = Self::now_physical();
127            let last = self.last_ts.load(Ordering::Acquire);
128            let last_physical = self.last_physical.load(Ordering::Acquire);
129
130            let new_ts = if physical > last_physical {
131                // Physical time advanced - reset logical counter
132                physical << LOGICAL_BITS
133            } else {
134                // Physical time same or regressed - increment logical
135                let logical = (last & LOGICAL_MASK) + 1;
136                if logical > LOGICAL_MASK {
137                    // Logical overflow - wait for physical time to advance
138                    std::thread::yield_now();
139                    continue;
140                }
141                (last & !LOGICAL_MASK) | logical
142            };
143
144            // Ensure monotonicity
145            let new_ts = new_ts.max(last + 1);
146
147            // CAS to update
148            if self
149                .last_ts
150                .compare_exchange_weak(last, new_ts, Ordering::AcqRel, Ordering::Acquire)
151                .is_ok()
152            {
153                // Update last physical if we advanced
154                if physical > last_physical {
155                    self.last_physical.store(physical, Ordering::Release);
156                }
157                return new_ts;
158            }
159            // CAS failed, retry
160        }
161    }
162
163    /// Receive a timestamp from another node (for distributed scenarios)
164    ///
165    /// Updates local clock to be at least as recent as the received timestamp.
166    pub fn receive(&self, remote_ts: u64) {
167        loop {
168            let last = self.last_ts.load(Ordering::Acquire);
169            if remote_ts <= last {
170                return; // Already ahead
171            }
172
173            let physical = Self::now_physical();
174            let remote_physical = remote_ts >> LOGICAL_BITS;
175
176            // Check drift
177            if remote_physical > physical + MAX_DRIFT_US {
178                // Remote clock too far ahead - could indicate attack or misconfiguration
179                // We cap at our physical time + reasonable drift
180                let capped = (physical + MAX_DRIFT_US) << LOGICAL_BITS;
181                if self
182                    .last_ts
183                    .compare_exchange_weak(last, capped.max(last + 1), Ordering::AcqRel, Ordering::Acquire)
184                    .is_ok()
185                {
186                    return;
187                }
188            } else {
189                // Accept remote timestamp
190                if self
191                    .last_ts
192                    .compare_exchange_weak(last, remote_ts, Ordering::AcqRel, Ordering::Acquire)
193                    .is_ok()
194                {
195                    self.last_physical
196                        .fetch_max(remote_physical, Ordering::Release);
197                    return;
198                }
199            }
200        }
201    }
202
203    /// Get the current timestamp without advancing
204    #[inline]
205    pub fn current(&self) -> u64 {
206        self.last_ts.load(Ordering::Acquire)
207    }
208
209    /// Extract physical time component from a timestamp
210    #[inline]
211    pub fn physical_time(ts: u64) -> u64 {
212        ts >> LOGICAL_BITS
213    }
214
215    /// Extract logical counter from a timestamp
216    #[inline]
217    pub fn logical_counter(ts: u64) -> u64 {
218        ts & LOGICAL_MASK
219    }
220
221    /// Compare two timestamps
222    #[inline]
223    pub fn compare(a: u64, b: u64) -> std::cmp::Ordering {
224        a.cmp(&b)
225    }
226
227    /// Check if timestamp a happened before timestamp b
228    #[inline]
229    pub fn happened_before(a: u64, b: u64) -> bool {
230        a < b
231    }
232}
233
/// Encoded HLC timestamp wrapped in a type that exposes its physical and
/// logical components, mainly for debugging and display.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HlcTimestamp {
    /// Encoded value: `(physical_us << 16) | logical`.
    pub raw: u64,
}

241impl HlcTimestamp {
242    /// Create from raw value
243    pub const fn from_raw(raw: u64) -> Self {
244        Self { raw }
245    }
246
247    /// Create from components
248    pub const fn new(physical_us: u64, logical: u16) -> Self {
249        Self {
250            raw: (physical_us << LOGICAL_BITS) | (logical as u64),
251        }
252    }
253
254    /// Get physical time in microseconds
255    pub const fn physical_us(&self) -> u64 {
256        self.raw >> LOGICAL_BITS
257    }
258
259    /// Get logical counter
260    pub const fn logical(&self) -> u16 {
261        (self.raw & LOGICAL_MASK) as u16
262    }
263
264    /// Convert to raw u64
265    pub const fn as_u64(&self) -> u64 {
266        self.raw
267    }
268}
269
270impl std::fmt::Display for HlcTimestamp {
271    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
272        write!(f, "{}:{}", self.physical_us(), self.logical())
273    }
274}
275
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use std::sync::Arc;
    use std::thread;

    /// Back-to-back calls to `next` on one thread must strictly increase.
    #[test]
    fn test_monotonicity() {
        let hlc = HybridLogicalClock::new();
        let mut prev = 0u64;

        for ts in std::iter::repeat_with(|| hlc.next()).take(10000) {
            assert!(ts > prev, "Timestamp {} should be > {}", ts, prev);
            prev = ts;
        }
    }

    /// Timestamps handed out concurrently to several threads must never collide.
    #[test]
    fn test_concurrent_monotonicity() {
        const NUM_THREADS: usize = 8;
        const OPS_PER_THREAD: usize = 10000;

        let hlc = Arc::new(HybridLogicalClock::new());

        // Spawn every worker before joining any, so they really run concurrently.
        let workers: Vec<_> = (0..NUM_THREADS)
            .map(|_| {
                let clock = Arc::clone(&hlc);
                thread::spawn(move || {
                    (0..OPS_PER_THREAD)
                        .map(|_| clock.next())
                        .collect::<Vec<u64>>()
                })
            })
            .collect();

        let all_timestamps: Vec<u64> = workers
            .into_iter()
            .flat_map(|worker| worker.join().unwrap())
            .collect();
        let distinct: HashSet<u64> = all_timestamps.iter().copied().collect();

        assert_eq!(
            distinct.len(),
            all_timestamps.len(),
            "All timestamps should be unique"
        );
    }

    /// Receiving a slightly-future timestamp pulls the local clock forward.
    #[test]
    fn test_receive_advances_clock() {
        let hlc = HybridLogicalClock::new();
        let future_ts = hlc.current() + 1000;

        hlc.receive(future_ts);

        assert!(hlc.current() >= future_ts);
    }

    /// Component accessors and `Display` agree with the constructor inputs.
    #[test]
    fn test_hlc_timestamp_display() {
        let ts = HlcTimestamp::new(1000000, 42);
        assert_eq!((ts.physical_us(), ts.logical()), (1000000, 42));
        assert_eq!(ts.to_string(), "1000000:42");
    }
}