1use std::cmp::Ordering;
14use std::sync::Mutex;
15use std::time::{SystemTime, UNIX_EPOCH};
16
17use serde::{Deserialize, Serialize};
18
19#[non_exhaustive]
30#[derive(
31 Debug,
32 Clone,
33 Copy,
34 Default,
35 PartialEq,
36 Eq,
37 Hash,
38 Serialize,
39 Deserialize,
40 zerompk::ToMessagePack,
41 zerompk::FromMessagePack,
42)]
43pub struct Hlc {
44 pub wall_ns: u64,
46 pub logical: u64,
48}
49
50impl Hlc {
51 pub const ZERO: Hlc = Hlc {
52 wall_ns: 0,
53 logical: 0,
54 };
55
56 pub const fn new(wall_ns: u64, logical: u64) -> Self {
57 Self { wall_ns, logical }
58 }
59}
60
61impl PartialOrd for Hlc {
62 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
63 Some(self.cmp(other))
64 }
65}
66
67impl Ord for Hlc {
68 fn cmp(&self, other: &Self) -> Ordering {
69 match self.wall_ns.cmp(&other.wall_ns) {
70 Ordering::Equal => self.logical.cmp(&other.logical),
71 other => other,
72 }
73 }
74}
75
76#[derive(Debug, Default)]
78pub struct HlcClock {
79 state: Mutex<Hlc>,
80}
81
82impl HlcClock {
83 pub fn new() -> Self {
84 Self {
85 state: Mutex::new(Hlc::ZERO),
86 }
87 }
88
89 pub fn now(&self) -> Hlc {
92 let wall = wall_now_ns();
93 let mut st = self.state.lock().unwrap_or_else(|p| p.into_inner());
94 let wall = wall.max(st.wall_ns);
97 let next = if wall > st.wall_ns {
98 Hlc::new(wall, 0)
99 } else {
100 Hlc::new(st.wall_ns, st.logical + 1)
101 };
102 *st = next;
103 next
104 }
105
106 pub fn update(&self, remote: Hlc) -> Hlc {
109 let wall = wall_now_ns();
110 let mut st = self.state.lock().unwrap_or_else(|p| p.into_inner());
111 let prev = *st;
112 let max_wall = wall.max(prev.wall_ns).max(remote.wall_ns);
113 let next_logical = if max_wall == prev.wall_ns && max_wall == remote.wall_ns {
114 prev.logical.max(remote.logical) + 1
115 } else if max_wall == prev.wall_ns {
116 prev.logical + 1
117 } else if max_wall == remote.wall_ns {
118 remote.logical + 1
119 } else {
120 0
121 };
122 let next = Hlc::new(max_wall, next_logical);
123 *st = next;
124 next
125 }
126
127 pub fn peek(&self) -> Hlc {
129 *self.state.lock().unwrap_or_else(|p| p.into_inner())
130 }
131}
132
133fn wall_now_ns() -> u64 {
134 SystemTime::now()
135 .duration_since(UNIX_EPOCH)
136 .map(|d| d.as_nanos() as u64)
137 .unwrap_or_else(|_| {
138 use std::sync::atomic::{AtomicBool, Ordering};
139 static LOGGED: AtomicBool = AtomicBool::new(false);
140 if !LOGGED.swap(true, Ordering::Relaxed) {
141 tracing::error!(
142 module = module_path!(),
143 "system clock is before UNIX_EPOCH; using 0 (epoch) \
144 — check NTP/RTC configuration"
145 );
146 }
147 0
148 })
149}
150
#[cfg(test)]
mod tests {
    use super::*;

    /// Back-to-back `now()` calls on one clock must each exceed the previous one.
    #[test]
    fn monotonic_within_tick() {
        let clock = HlcClock::new();
        let _ = (0..1_000).fold(clock.now(), |last, _| {
            let current = clock.now();
            assert!(current > last);
            current
        });
    }

    /// Merging a remote timestamp that is ahead must yield something greater
    /// than both the remote and the earlier local timestamp.
    #[test]
    fn update_produces_strictly_greater() {
        let clock = HlcClock::new();
        let local = clock.now();
        let ahead_wall = local.wall_ns + 1_000_000;
        let remote = Hlc::new(ahead_wall, 7);
        let merged = clock.update(remote);
        assert!(merged > remote);
        assert!(merged > local);
    }

    /// Wall component dominates; logical breaks ties.
    #[test]
    fn ordering_is_total() {
        let [a, b, c] = [Hlc::new(10, 0), Hlc::new(10, 1), Hlc::new(11, 0)];
        assert!(a < b);
        assert!(b < c);
        assert!(a < c);
    }

    /// A long burst of `now()` calls stays strictly increasing.
    #[test]
    fn burst_strictly_monotonic() {
        let clock = HlcClock::new();
        let _ = (0..100_000).fold(clock.now(), |prev, _| {
            let next = clock.now();
            assert!(
                next > prev,
                "monotonicity violated: prev={prev:?} next={next:?}"
            );
            next
        });
    }

    /// Regression guard: with the logical counter forced to `u32::MAX`, the
    /// next `now()` must still be strictly greater.
    #[test]
    fn saturating_add_regression() {
        let clock = HlcClock::new();
        let seed = clock.now();
        let pinned = Hlc::new(seed.wall_ns, u64::from(u32::MAX));
        // The temporary guard is dropped at the end of this statement.
        *clock.state.lock().unwrap_or_else(|p| p.into_inner()) = pinned;
        let next = clock.now();
        assert!(
            next > pinned,
            "logical counter pinned at u32::MAX: next={next:?}"
        );
    }

    /// Repeatedly merging the same remote timestamp stays strictly increasing.
    #[test]
    fn update_burst_strictly_monotonic() {
        let clock = HlcClock::new();
        let remote = clock.now();
        let _ = (0..100_000).fold(clock.update(remote), |prev, _| {
            let next = clock.update(remote);
            assert!(
                next > prev,
                "update monotonicity violated: prev={prev:?} next={next:?}"
            );
            next
        });
    }

    /// A logical value above `u32::MAX` survives a MessagePack round trip.
    #[test]
    fn roundtrip_logical_gt_u32_max() {
        let logical_val = u64::from(u32::MAX) + 1;
        let original = Hlc::new(1_700_000_000_000_000_000_u64, logical_val);
        let encoded = zerompk::to_msgpack_vec(&original).expect("encode");
        let decoded: Hlc = zerompk::from_msgpack(&encoded).expect("decode");
        assert_eq!(decoded, original);
        assert_eq!(decoded.logical, logical_val);
    }
}