1use std::sync::Mutex;
16use std::time::{SystemTime, UNIX_EPOCH};
17
18use serde::{Deserialize, Serialize};
19
20use crate::error::{ArrayError, ArrayResult};
21use crate::sync::replica_id::ReplicaId;
22
/// Largest physical timestamp (milliseconds since the Unix epoch) an `Hlc`
/// may carry: 2^48 - 1, i.e. the value fits in 48 bits.
pub const MAX_PHYSICAL_MS: u64 = (1u64 << 48) - 1;
29
/// A hybrid logical clock timestamp: wall-clock milliseconds, a logical
/// counter to order events within the same millisecond, and the originating
/// replica as the final tiebreaker. The field order matches both the `Ord`
/// implementation and the big-endian byte encoding.
#[derive(
    Copy,
    Clone,
    Debug,
    PartialEq,
    Eq,
    Serialize,
    Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct Hlc {
    // Milliseconds since the Unix epoch. `Hlc::new` enforces
    // `physical_ms <= MAX_PHYSICAL_MS`; direct struct construction and
    // `from_bytes` do NOT re-validate this.
    pub physical_ms: u64,
    // Distinguishes events generated within the same millisecond.
    pub logical: u16,
    // Replica that produced this timestamp; last comparison key.
    pub replica_id: ReplicaId,
}
54
55impl Hlc {
56 pub const ZERO: Self = Self {
58 physical_ms: 0,
59 logical: 0,
60 replica_id: ReplicaId(0),
61 };
62
63 pub fn new(physical_ms: u64, logical: u16, replica_id: ReplicaId) -> ArrayResult<Self> {
66 if physical_ms > MAX_PHYSICAL_MS {
67 return Err(ArrayError::InvalidHlc {
68 detail: format!(
69 "physical_ms {physical_ms} exceeds MAX_PHYSICAL_MS {MAX_PHYSICAL_MS}"
70 ),
71 });
72 }
73 Ok(Self {
74 physical_ms,
75 logical,
76 replica_id,
77 })
78 }
79
80 pub fn to_bytes(&self) -> [u8; 18] {
84 let mut out = [0u8; 18];
85 out[0..8].copy_from_slice(&self.physical_ms.to_be_bytes());
86 out[8..10].copy_from_slice(&self.logical.to_be_bytes());
87 out[10..18].copy_from_slice(&self.replica_id.0.to_be_bytes());
88 out
89 }
90
91 pub fn from_bytes(b: &[u8; 18]) -> Self {
96 let physical_ms = u64::from_be_bytes(
97 b[0..8]
98 .try_into()
99 .expect("invariant: b is [u8; 18], slice [0..8] is exactly 8 bytes"),
100 );
101 let logical = u16::from_be_bytes(
102 b[8..10]
103 .try_into()
104 .expect("invariant: b is [u8; 18], slice [8..10] is exactly 2 bytes"),
105 );
106 let replica_id =
107 ReplicaId(u64::from_be_bytes(b[10..18].try_into().expect(
108 "invariant: b is [u8; 18], slice [10..18] is exactly 8 bytes",
109 )));
110 Self {
111 physical_ms,
112 logical,
113 replica_id,
114 }
115 }
116}
117
118impl PartialOrd for Hlc {
119 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
120 Some(self.cmp(other))
121 }
122}
123
124impl Ord for Hlc {
125 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
126 (self.physical_ms, self.logical, self.replica_id.0).cmp(&(
127 other.physical_ms,
128 other.logical,
129 other.replica_id.0,
130 ))
131 }
132}
133
/// Thread-safe source of strictly increasing [`Hlc`] values for one replica.
pub struct HlcGenerator {
    // Replica id stamped onto every timestamp this generator issues.
    replica_id: ReplicaId,
    // Last issued (physical_ms, logical) pair, guarded for concurrent use.
    state: Mutex<(u64, u16)>,
}
143
144impl HlcGenerator {
145 pub fn new(replica_id: ReplicaId) -> Self {
147 Self {
148 replica_id,
149 state: Mutex::new((0, 0)),
150 }
151 }
152
153 fn now_ms() -> ArrayResult<u64> {
155 SystemTime::now()
156 .duration_since(UNIX_EPOCH)
157 .map(|d| d.as_millis() as u64)
158 .map_err(|e| ArrayError::InvalidHlc {
159 detail: format!("system clock before Unix epoch: {e}"),
160 })
161 }
162
163 pub fn next(&self) -> ArrayResult<Hlc> {
170 let now_ms = Self::now_ms()?;
171 if now_ms > MAX_PHYSICAL_MS {
172 return Err(ArrayError::InvalidHlc {
173 detail: format!("system clock {now_ms} exceeds MAX_PHYSICAL_MS"),
174 });
175 }
176
177 let mut guard = self.state.lock().map_err(|_| ArrayError::HlcLockPoisoned)?;
178 let (last_physical, last_logical) = *guard;
179
180 let new_physical = now_ms.max(last_physical);
181 let new_logical = if new_physical == last_physical {
182 last_logical
183 .checked_add(1)
184 .ok_or_else(|| ArrayError::InvalidHlc {
185 detail: "logical counter overflow within one millisecond".into(),
186 })?
187 } else {
188 0
189 };
190
191 *guard = (new_physical, new_logical);
192 drop(guard);
193
194 Hlc::new(new_physical, new_logical, self.replica_id)
195 }
196
197 pub fn observe(&self, remote: Hlc) -> ArrayResult<()> {
203 let now_ms = Self::now_ms()?.min(MAX_PHYSICAL_MS);
204
205 let mut guard = self.state.lock().map_err(|_| ArrayError::HlcLockPoisoned)?;
206 let (last_physical, last_logical) = *guard;
207
208 let new_physical = now_ms.max(last_physical).max(remote.physical_ms);
209 let new_logical = if new_physical == last_physical && new_physical == remote.physical_ms {
210 last_logical
212 .max(remote.logical)
213 .checked_add(1)
214 .ok_or_else(|| ArrayError::InvalidHlc {
215 detail: "logical counter overflow during observe".into(),
216 })?
217 } else if new_physical == last_physical {
218 last_logical
219 .checked_add(1)
220 .ok_or_else(|| ArrayError::InvalidHlc {
221 detail: "logical counter overflow during observe (local wins)".into(),
222 })?
223 } else if new_physical == remote.physical_ms {
224 remote
225 .logical
226 .checked_add(1)
227 .ok_or_else(|| ArrayError::InvalidHlc {
228 detail: "logical counter overflow during observe (remote wins)".into(),
229 })?
230 } else {
231 0
233 };
234
235 *guard = (new_physical, new_logical);
236 Ok(())
237 }
238}
239
#[cfg(test)]
mod tests {
    use super::*;

    // Replica id shared by most tests.
    fn test_replica() -> ReplicaId {
        ReplicaId::new(42)
    }

    #[test]
    fn monotonic_under_fast_calls() {
        let generator = HlcGenerator::new(test_replica());
        let mut prev = generator.next().unwrap();
        for _ in 0..999 {
            let curr = generator.next().unwrap();
            assert!(
                curr > prev,
                "HLC must be strictly increasing: {curr:?} <= {prev:?}"
            );
            prev = curr;
        }
    }

    #[test]
    fn survives_clock_skew_injection() {
        let g = HlcGenerator::new(test_replica());
        let baseline = g.next().unwrap();

        // Fabricate a remote clock 100 seconds ahead of local time.
        let skewed_physical = baseline.physical_ms + 100_000;
        let future_hlc = Hlc::new(skewed_physical, 50, ReplicaId::new(99)).unwrap();

        g.observe(future_hlc).unwrap();

        let next = g.next().unwrap();
        assert!(
            next > future_hlc,
            "next {next:?} should be > observed future {future_hlc:?}"
        );
    }

    #[test]
    fn to_bytes_roundtrip() {
        let original = Hlc::new(123_456_789, 7, ReplicaId::new(0xabcd)).unwrap();
        let decoded = Hlc::from_bytes(&original.to_bytes());
        assert_eq!(original, decoded);
    }

    #[test]
    fn byte_order_matches_lexicographic() {
        use std::collections::BTreeMap;

        let replica = ReplicaId::new(1);
        let mut by_bytes: Vec<Hlc> = (0u64..100)
            .map(|i| Hlc {
                physical_ms: i / 10,
                logical: (i % 10) as u16,
                replica_id: replica,
            })
            .collect();

        let mut by_ord = by_bytes.clone();
        by_ord.sort();
        by_bytes.sort_by_key(Hlc::to_bytes);

        assert_eq!(by_ord, by_bytes, "byte sort must match Ord sort");

        // A BTreeMap keyed on the encoding must iterate in the same order.
        let map: BTreeMap<[u8; 18], Hlc> =
            by_ord.iter().map(|h| (h.to_bytes(), *h)).collect();
        let map_order: Vec<Hlc> = map.into_values().collect();
        assert_eq!(by_ord, map_order);
    }

    #[test]
    fn serialize_roundtrip() {
        let original = Hlc::new(9_999, 3, ReplicaId::new(77)).unwrap();
        let encoded = zerompk::to_msgpack_vec(&original).expect("serialize");
        let decoded: Hlc = zerompk::from_msgpack(&encoded).expect("deserialize");
        assert_eq!(original, decoded);
    }

    #[test]
    fn physical_ms_overflow_errors() {
        let result = Hlc::new(MAX_PHYSICAL_MS + 1, 0, ReplicaId::new(1));
        assert!(
            matches!(result, Err(ArrayError::InvalidHlc { .. })),
            "expected InvalidHlc, got: {result:?}"
        );
    }

    #[test]
    fn hlc_zero_is_minimum() {
        assert!(Hlc::ZERO < Hlc::new(1, 0, ReplicaId::new(0)).unwrap());
    }
}