allsource_core/infrastructure/cluster/
hlc.rs1use serde::{Deserialize, Serialize};
20use std::{
21 cmp::Ordering,
22 time::{SystemTime, UNIX_EPOCH},
23};
24
25#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Hash)]
27pub struct HlcTimestamp {
28 pub physical_ms: u64,
30 pub logical: u32,
32 pub node_id: u32,
34}
35
36impl HlcTimestamp {
37 pub fn new(physical_ms: u64, logical: u32, node_id: u32) -> Self {
39 Self {
40 physical_ms,
41 logical,
42 node_id,
43 }
44 }
45
46 pub fn zero() -> Self {
48 Self {
49 physical_ms: 0,
50 logical: 0,
51 node_id: 0,
52 }
53 }
54
55 pub fn to_u128(&self) -> u128 {
58 (u128::from(self.physical_ms) << 64)
59 | (u128::from(self.logical) << 32)
60 | u128::from(self.node_id)
61 }
62
63 pub fn from_u128(val: u128) -> Self {
65 Self {
66 physical_ms: (val >> 64) as u64,
67 logical: ((val >> 32) & 0xFFFF_FFFF) as u32,
68 node_id: (val & 0xFFFF_FFFF) as u32,
69 }
70 }
71}
72
73impl Ord for HlcTimestamp {
74 fn cmp(&self, other: &Self) -> Ordering {
75 self.physical_ms
76 .cmp(&other.physical_ms)
77 .then(self.logical.cmp(&other.logical))
78 .then(self.node_id.cmp(&other.node_id))
79 }
80}
81
82impl PartialOrd for HlcTimestamp {
83 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
84 Some(self.cmp(other))
85 }
86}
87
88impl std::fmt::Display for HlcTimestamp {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 write!(f, "{}:{}:{}", self.physical_ms, self.logical, self.node_id)
91 }
92}
93
/// A hybrid logical clock owned by a single node.
pub struct HybridLogicalClock {
    /// Node id stamped onto every timestamp this clock produces.
    node_id: u32,
    /// `(physical_ms, logical)` of the most recently issued timestamp,
    /// guarded by a mutex so the clock can be shared by reference.
    state: std::sync::Mutex<(u64, u32)>,
    /// How far (in milliseconds) a remote physical time may be ahead of the
    /// local wall clock before `receive` rejects it.
    max_drift_ms: u64,
}
108
109impl HybridLogicalClock {
110 pub fn new(node_id: u32) -> Self {
112 Self {
113 node_id,
114 state: std::sync::Mutex::new((0, 0)),
115 max_drift_ms: 60_000, }
117 }
118
119 pub fn with_max_drift(node_id: u32, max_drift_ms: u64) -> Self {
121 Self {
122 node_id,
123 state: std::sync::Mutex::new((0, 0)),
124 max_drift_ms,
125 }
126 }
127
128 pub fn node_id(&self) -> u32 {
130 self.node_id
131 }
132
133 fn wall_ms() -> u64 {
135 SystemTime::now()
136 .duration_since(UNIX_EPOCH)
137 .unwrap_or_default()
138 .as_millis() as u64
139 }
140
141 pub fn now(&self) -> HlcTimestamp {
148 let wall = Self::wall_ms();
149 let mut guard = self.state.lock().unwrap();
150 let (last_pt, last_l) = *guard;
151
152 let (new_pt, new_l) = if wall > last_pt {
153 (wall, 0)
154 } else {
155 (last_pt, last_l + 1)
156 };
157
158 *guard = (new_pt, new_l);
159 HlcTimestamp::new(new_pt, new_l, self.node_id)
160 }
161
162 pub fn receive(&self, remote: &HlcTimestamp) -> Result<HlcTimestamp, HlcDriftError> {
171 let wall = Self::wall_ms();
172
173 if remote.physical_ms > wall + self.max_drift_ms {
175 return Err(HlcDriftError {
176 remote_ms: remote.physical_ms,
177 local_wall_ms: wall,
178 max_drift_ms: self.max_drift_ms,
179 });
180 }
181
182 let mut guard = self.state.lock().unwrap();
183 let (last_pt, last_l) = *guard;
184
185 let new_pt = wall.max(last_pt).max(remote.physical_ms);
186
187 let new_l = if new_pt == last_pt && new_pt == remote.physical_ms {
188 last_l.max(remote.logical) + 1
190 } else if new_pt == last_pt {
191 last_l + 1
193 } else if new_pt == remote.physical_ms {
194 remote.logical + 1
196 } else {
197 0
199 };
200
201 *guard = (new_pt, new_l);
202 Ok(HlcTimestamp::new(new_pt, new_l, self.node_id))
203 }
204
205 pub fn current(&self) -> HlcTimestamp {
207 let guard = self.state.lock().unwrap();
208 HlcTimestamp::new(guard.0, guard.1, self.node_id)
209 }
210}
211
/// Error returned by `HybridLogicalClock::receive` when a remote timestamp's
/// physical time is too far ahead of the local wall clock.
#[derive(Clone, Debug)]
pub struct HlcDriftError {
    /// Physical time (ms) carried by the rejected remote timestamp.
    pub remote_ms: u64,
    /// Local wall-clock time (ms) at the moment of the check.
    pub local_wall_ms: u64,
    /// Drift bound (ms) that the remote timestamp exceeded.
    pub max_drift_ms: u64,
}
219
220impl std::fmt::Display for HlcDriftError {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 write!(
223 f,
224 "HLC drift violation: remote={}, local_wall={}, max_drift={}",
225 self.remote_ms, self.local_wall_ms, self.max_drift_ms,
226 )
227 }
228}
229
230impl std::error::Error for HlcDriftError {}
231
#[cfg(test)]
mod tests {
    use super::*;

    /// Back-to-back local events must receive strictly increasing timestamps.
    #[test]
    fn test_hlc_now_monotonic() {
        let hlc = HybridLogicalClock::new(1);
        let t1 = hlc.now();
        let t2 = hlc.now();
        let t3 = hlc.now();
        assert!(t1 < t2);
        assert!(t2 < t3);
    }

    /// A fresh clock takes its physical component from the wall clock.
    #[test]
    fn test_hlc_now_uses_wall_clock() {
        let hlc = HybridLogicalClock::new(1);
        let before = HybridLogicalClock::wall_ms();
        let ts = hlc.now();
        let after = HybridLogicalClock::wall_ms();
        assert!(ts.physical_ms >= before);
        assert!(ts.physical_ms <= after + 1);
    }

    /// Receiving a timestamp from the near future moves the clock past it.
    #[test]
    fn test_hlc_receive_advances_past_remote() {
        let hlc = HybridLogicalClock::new(1);
        let _ = hlc.now();
        let remote = HlcTimestamp::new(HybridLogicalClock::wall_ms() + 100, 5, 2);
        let ts = hlc.receive(&remote).unwrap();
        assert!(ts > remote);
    }

    /// Remote timestamps beyond the drift bound are rejected.
    #[test]
    fn test_hlc_receive_drift_rejected() {
        let hlc = HybridLogicalClock::with_max_drift(1, 1000);
        let remote = HlcTimestamp::new(HybridLogicalClock::wall_ms() + 5000, 0, 2);
        assert!(hlc.receive(&remote).is_err());
    }

    /// Two remotes sharing one physical time must both be overtaken.
    #[test]
    fn test_hlc_receive_same_physical_time() {
        let hlc = HybridLogicalClock::new(1);
        // Use a physical time slightly in the future (well inside the default
        // drift bound). Using the current millisecond made this test flaky:
        // if the wall clock ticked before `receive` sampled it, the wall clock
        // won the merge and the logical counter restarted at 0.
        let shared_ms = HybridLogicalClock::wall_ms() + 1_000;

        let remote1 = HlcTimestamp::new(shared_ms, 5, 2);
        let ts1 = hlc.receive(&remote1).unwrap();
        assert!(ts1.logical > 5);

        let remote2 = HlcTimestamp::new(shared_ms, 10, 3);
        let ts2 = hlc.receive(&remote2).unwrap();
        assert!(ts2 > ts1);
    }

    /// Ordering precedence: physical time, then logical counter, then node id.
    #[test]
    fn test_hlc_timestamp_ordering() {
        let a = HlcTimestamp::new(100, 0, 1);
        let b = HlcTimestamp::new(100, 1, 1);
        let c = HlcTimestamp::new(101, 0, 1);
        let d = HlcTimestamp::new(100, 0, 2);

        assert!(a < b);
        assert!(b < c);
        assert!(a < d);
    }

    /// Packing to `u128` and back is lossless.
    #[test]
    fn test_hlc_timestamp_u128_roundtrip() {
        let ts = HlcTimestamp::new(1_700_000_000_000, 42, 7);
        let encoded = ts.to_u128();
        let decoded = HlcTimestamp::from_u128(encoded);
        assert_eq!(ts, decoded);
    }

    /// `Display` renders `physical:logical:node`.
    #[test]
    fn test_hlc_timestamp_display() {
        let ts = HlcTimestamp::new(1000, 5, 3);
        assert_eq!(ts.to_string(), "1000:5:3");
    }

    /// `zero()` has every component set to 0.
    #[test]
    fn test_hlc_zero() {
        let z = HlcTimestamp::zero();
        assert_eq!(z.physical_ms, 0);
        assert_eq!(z.logical, 0);
        assert_eq!(z.node_id, 0);
    }

    /// Timestamps issued concurrently from many threads must all be distinct.
    #[test]
    fn test_hlc_concurrent_access() {
        use std::sync::Arc;
        let hlc = Arc::new(HybridLogicalClock::new(1));
        let mut handles = vec![];

        for _ in 0..10 {
            let hlc = Arc::clone(&hlc);
            handles.push(std::thread::spawn(move || {
                let mut timestamps = Vec::new();
                for _ in 0..100 {
                    timestamps.push(hlc.now());
                }
                timestamps
            }));
        }

        let mut all: Vec<HlcTimestamp> = vec![];
        for h in handles {
            all.extend(h.join().unwrap());
        }

        // Sorting then deduplicating removes any repeated timestamp, so an
        // unchanged length proves uniqueness.
        let mut sorted = all.clone();
        sorted.sort();
        sorted.dedup();
        assert_eq!(
            sorted.len(),
            all.len(),
            "HLC must produce unique timestamps under contention"
        );
    }
}