Skip to main content

allsource_core/infrastructure/cluster/
hlc.rs

//! Hybrid Logical Clock (HLC) for geo-replication.
//!
//! Combines physical wall-clock time with a logical counter to provide
//! causally consistent timestamps across distributed nodes. Based on the
//! HLC algorithm from Kulkarni et al. (2014).
//!
//! Properties:
//! - Monotonically increasing on each node
//! - Causally consistent across nodes (send/receive semantics)
//! - Close to physical time (bounded drift)
//! - No coordination required between nodes
//!
//! # Timestamp Format
//!
//! An HLC timestamp is a 128-bit value: `(physical_ms: u64, logical: u32, node_id: u32)`
//! - `physical_ms`: milliseconds since Unix epoch
//! - `logical`: counter that breaks ties when physical time hasn't advanced
//! - `node_id`: originating node (for total ordering)
19use serde::{Deserialize, Serialize};
20use std::{
21    cmp::Ordering,
22    time::{SystemTime, UNIX_EPOCH},
23};
24
25/// A hybrid logical clock timestamp.
26#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash)]
27pub struct HlcTimestamp {
28    /// Physical time in milliseconds since Unix epoch.
29    pub physical_ms: u64,
30    /// Logical counter (monotonically increasing within the same physical ms).
31    pub logical: u32,
32    /// Node ID that generated this timestamp.
33    pub node_id: u32,
34}
35
36impl HlcTimestamp {
37    /// Create a new HLC timestamp.
38    pub fn new(physical_ms: u64, logical: u32, node_id: u32) -> Self {
39        Self {
40            physical_ms,
41            logical,
42            node_id,
43        }
44    }
45
46    /// Zero timestamp (used as initial value).
47    pub fn zero() -> Self {
48        Self {
49            physical_ms: 0,
50            logical: 0,
51            node_id: 0,
52        }
53    }
54
55    /// Encode as a u128 for compact storage and comparison.
56    /// Format: `[physical_ms:64][logical:32][node_id:32]`
57    pub fn to_u128(&self) -> u128 {
58        (u128::from(self.physical_ms) << 64)
59            | (u128::from(self.logical) << 32)
60            | u128::from(self.node_id)
61    }
62
63    /// Decode from a u128.
64    pub fn from_u128(val: u128) -> Self {
65        Self {
66            physical_ms: (val >> 64) as u64,
67            logical: ((val >> 32) & 0xFFFF_FFFF) as u32,
68            node_id: (val & 0xFFFF_FFFF) as u32,
69        }
70    }
71}
72
73impl Ord for HlcTimestamp {
74    fn cmp(&self, other: &Self) -> Ordering {
75        self.physical_ms
76            .cmp(&other.physical_ms)
77            .then(self.logical.cmp(&other.logical))
78            .then(self.node_id.cmp(&other.node_id))
79    }
80}
81
82impl PartialOrd for HlcTimestamp {
83    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
84        Some(self.cmp(other))
85    }
86}
87
88impl std::fmt::Display for HlcTimestamp {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        write!(f, "{}:{}:{}", self.physical_ms, self.logical, self.node_id)
91    }
92}
93
/// Per-node Hybrid Logical Clock.
///
/// The last issued `(physical_ms, logical)` pair lives behind a single
/// `Mutex`, so both components are always read and replaced together and
/// concurrent callers cannot observe or produce a half-updated pair.
pub struct HybridLogicalClock {
    /// Identifier stamped onto every timestamp this clock issues.
    node_id: u32,
    /// Most recently issued `(physical_ms, logical)` pair, guarded by the
    /// mutex so the two values change as one unit.
    state: std::sync::Mutex<(u64, u32)>,
    /// Largest amount (in ms) by which a remote timestamp may run ahead of
    /// the local wall clock before it is rejected.
    max_drift_ms: u64,
}
108
109impl HybridLogicalClock {
110    /// Create a new HLC for the given node.
111    pub fn new(node_id: u32) -> Self {
112        Self {
113            node_id,
114            state: std::sync::Mutex::new((0, 0)),
115            max_drift_ms: 60_000, // 1 minute default
116        }
117    }
118
119    /// Create with a custom max drift tolerance.
120    pub fn with_max_drift(node_id: u32, max_drift_ms: u64) -> Self {
121        Self {
122            node_id,
123            state: std::sync::Mutex::new((0, 0)),
124            max_drift_ms,
125        }
126    }
127
128    /// Get the node ID.
129    pub fn node_id(&self) -> u32 {
130        self.node_id
131    }
132
133    /// Get the current wall clock time in milliseconds.
134    fn wall_ms() -> u64 {
135        SystemTime::now()
136            .duration_since(UNIX_EPOCH)
137            .unwrap_or_default()
138            .as_millis() as u64
139    }
140
141    /// Generate a new timestamp for a local event.
142    ///
143    /// Algorithm (from HLC paper):
144    /// 1. pt = max(last_physical, wall_clock)
145    /// 2. if pt == last_physical: logical++, else logical = 0
146    /// 3. last_physical = pt
147    pub fn now(&self) -> HlcTimestamp {
148        let wall = Self::wall_ms();
149        let mut guard = self.state.lock().unwrap();
150        let (last_pt, last_l) = *guard;
151
152        let (new_pt, new_l) = if wall > last_pt {
153            (wall, 0)
154        } else {
155            (last_pt, last_l + 1)
156        };
157
158        *guard = (new_pt, new_l);
159        HlcTimestamp::new(new_pt, new_l, self.node_id)
160    }
161
162    /// Update the clock on receiving a remote message/event.
163    ///
164    /// Algorithm:
165    /// 1. pt = max(last_physical, remote_physical, wall_clock)
166    /// 2. Compute logical based on which components contributed to pt
167    /// 3. Validate drift bounds
168    ///
169    /// Returns `Err` if the remote timestamp is too far ahead (drift violation).
170    pub fn receive(&self, remote: &HlcTimestamp) -> Result<HlcTimestamp, HlcDriftError> {
171        let wall = Self::wall_ms();
172
173        // Check drift: remote timestamp shouldn't be too far ahead of our wall clock
174        if remote.physical_ms > wall + self.max_drift_ms {
175            return Err(HlcDriftError {
176                remote_ms: remote.physical_ms,
177                local_wall_ms: wall,
178                max_drift_ms: self.max_drift_ms,
179            });
180        }
181
182        let mut guard = self.state.lock().unwrap();
183        let (last_pt, last_l) = *guard;
184
185        let new_pt = wall.max(last_pt).max(remote.physical_ms);
186
187        let new_l = if new_pt == last_pt && new_pt == remote.physical_ms {
188            // All three tied — advance logical past both
189            last_l.max(remote.logical) + 1
190        } else if new_pt == last_pt {
191            // Wall advanced past remote, but matched our last
192            last_l + 1
193        } else if new_pt == remote.physical_ms {
194            // Remote is the most recent physical time
195            remote.logical + 1
196        } else {
197            // Wall clock is strictly the greatest — reset logical
198            0
199        };
200
201        *guard = (new_pt, new_l);
202        Ok(HlcTimestamp::new(new_pt, new_l, self.node_id))
203    }
204
205    /// Get the current HLC state without advancing it.
206    pub fn current(&self) -> HlcTimestamp {
207        let guard = self.state.lock().unwrap();
208        HlcTimestamp::new(guard.0, guard.1, self.node_id)
209    }
210}
211
212/// Error when a remote HLC timestamp exceeds the max allowed drift.
213#[derive(Debug, Clone)]
214pub struct HlcDriftError {
215    pub remote_ms: u64,
216    pub local_wall_ms: u64,
217    pub max_drift_ms: u64,
218}
219
220impl std::fmt::Display for HlcDriftError {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        write!(
223            f,
224            "HLC drift violation: remote={}, local_wall={}, max_drift={}",
225            self.remote_ms, self.local_wall_ms, self.max_drift_ms,
226        )
227    }
228}
229
230impl std::error::Error for HlcDriftError {}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Successive local timestamps must be strictly increasing.
    #[test]
    fn test_hlc_now_monotonic() {
        let hlc = HybridLogicalClock::new(1);
        let t1 = hlc.now();
        let t2 = hlc.now();
        let t3 = hlc.now();
        assert!(t1 < t2);
        assert!(t2 < t3);
    }

    /// A fresh clock takes its physical component from the wall clock.
    #[test]
    fn test_hlc_now_uses_wall_clock() {
        let hlc = HybridLogicalClock::new(1);
        let before = HybridLogicalClock::wall_ms();
        let ts = hlc.now();
        let after = HybridLogicalClock::wall_ms();
        assert!(ts.physical_ms >= before);
        assert!(ts.physical_ms <= after + 1); // +1 for timing tolerance
    }

    /// Receiving a remote timestamp yields a local one ordered after it.
    #[test]
    fn test_hlc_receive_advances_past_remote() {
        let hlc = HybridLogicalClock::new(1);
        let _ = hlc.now(); // initialize local state

        // Simulate a remote timestamp ahead of us (but within drift)
        let remote = HlcTimestamp::new(HybridLogicalClock::wall_ms() + 100, 5, 2);
        let ts = hlc.receive(&remote).unwrap();
        assert!(ts > remote);
    }

    /// Remote timestamps beyond the drift tolerance are rejected.
    #[test]
    fn test_hlc_receive_drift_rejected() {
        let hlc = HybridLogicalClock::with_max_drift(1, 1000); // 1 second max drift
        let remote = HlcTimestamp::new(HybridLogicalClock::wall_ms() + 5000, 0, 2);
        assert!(hlc.receive(&remote).is_err());
    }

    /// When local and remote share a physical time, the logical counter
    /// must move past both.
    #[test]
    fn test_hlc_receive_same_physical_time() {
        let hlc = HybridLogicalClock::new(1);

        // Pin the physical component slightly in the future (well inside the
        // default 60 s drift bound). Using the current wall time here would
        // make the test flaky: if the wall clock ticked over between this
        // line and `receive`, the wall clock would win and reset logical to 0.
        let physical = HybridLogicalClock::wall_ms() + 1_000;

        // Seed local state at `physical` with a logical counter above 5
        let remote1 = HlcTimestamp::new(physical, 5, 2);
        let ts1 = hlc.receive(&remote1).unwrap();
        assert_eq!(ts1.physical_ms, physical);
        assert!(ts1.logical > 5);

        // Another remote at same physical time but higher logical
        let remote2 = HlcTimestamp::new(physical, 10, 3);
        let ts2 = hlc.receive(&remote2).unwrap();
        assert_eq!(ts2.physical_ms, physical);
        assert!(ts2.logical > 10);
        assert!(ts2 > ts1);
    }

    /// Ordering precedence: physical, then logical, then node id.
    #[test]
    fn test_hlc_timestamp_ordering() {
        let a = HlcTimestamp::new(100, 0, 1);
        let b = HlcTimestamp::new(100, 1, 1);
        let c = HlcTimestamp::new(101, 0, 1);
        let d = HlcTimestamp::new(100, 0, 2);

        assert!(a < b); // same physical, higher logical
        assert!(b < c); // higher physical wins
        assert!(a < d); // same physical+logical, higher node_id
    }

    /// Packing into a u128 and back is lossless.
    #[test]
    fn test_hlc_timestamp_u128_roundtrip() {
        let ts = HlcTimestamp::new(1_700_000_000_000, 42, 7);
        let encoded = ts.to_u128();
        let decoded = HlcTimestamp::from_u128(encoded);
        assert_eq!(ts, decoded);
    }

    /// Display renders `physical:logical:node`.
    #[test]
    fn test_hlc_timestamp_display() {
        let ts = HlcTimestamp::new(1000, 5, 3);
        assert_eq!(ts.to_string(), "1000:5:3");
    }

    /// The zero timestamp has every component cleared.
    #[test]
    fn test_hlc_zero() {
        let z = HlcTimestamp::zero();
        assert_eq!(z.physical_ms, 0);
        assert_eq!(z.logical, 0);
        assert_eq!(z.node_id, 0);
    }

    /// Ten threads hammering one clock must never see a duplicate timestamp.
    #[test]
    fn test_hlc_concurrent_access() {
        use std::sync::Arc;
        let hlc = Arc::new(HybridLogicalClock::new(1));
        let mut handles = vec![];

        for _ in 0..10 {
            let hlc = Arc::clone(&hlc);
            handles.push(std::thread::spawn(move || {
                let mut timestamps = Vec::new();
                for _ in 0..100 {
                    timestamps.push(hlc.now());
                }
                timestamps
            }));
        }

        let mut all: Vec<HlcTimestamp> = vec![];
        for h in handles {
            all.extend(h.join().unwrap());
        }

        // All timestamps should be unique
        let mut sorted = all.clone();
        sorted.sort();
        sorted.dedup();
        assert_eq!(
            sorted.len(),
            all.len(),
            "HLC must produce unique timestamps under contention"
        );
    }
}
356}