1use core::cmp;
29
30use crate::NodeId;
31
/// A hybrid logical clock timestamp.
///
/// A timestamp is the triple `(physical, logical, node_id)`. Equality and
/// hashing are derived over all three fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HybridTimestamp {
    /// Physical time component, as reported by the issuing clock's time source.
    pub physical: u64,
    /// Counter that distinguishes timestamps sharing the same `physical` value.
    pub logical: u16,
    /// Identifier of the node whose clock produced this timestamp.
    pub node_id: u16,
}
54
55impl HybridTimestamp {
56 pub fn zero() -> Self {
58 Self {
59 physical: 0,
60 logical: 0,
61 node_id: 0,
62 }
63 }
64
65 pub fn to_u128(&self) -> u128 {
68 ((self.physical as u128) << 64)
69 | ((self.logical as u128) << 48)
70 | ((self.node_id as u128) << 32)
71 }
72}
73
74impl Ord for HybridTimestamp {
75 fn cmp(&self, other: &Self) -> cmp::Ordering {
76 self.physical
77 .cmp(&other.physical)
78 .then(self.logical.cmp(&other.logical))
79 .then(self.node_id.cmp(&other.node_id))
80 }
81}
82
83impl PartialOrd for HybridTimestamp {
84 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
85 Some(self.cmp(other))
86 }
87}
88
89#[derive(Debug, Clone)]
97pub struct HybridClock {
98 node_id: u16,
99 last: HybridTimestamp,
100 physical_time_fn: fn() -> u64,
103}
104
/// Reads the system wall clock as whole milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the elapsed duration
/// falls back to zero and the function returns `0`.
#[cfg(feature = "std")]
fn system_time_ms() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    // `as_millis` yields a `u128`; the narrowing cast keeps the low 64 bits.
    since_epoch.as_millis() as u64
}
112
/// Stand-in time source for builds without the `std` feature.
///
/// It always reports `0`, so a clock using it never sees physical time
/// advance on its own.
#[cfg(not(feature = "std"))]
fn fallback_time_ms() -> u64 {
    const NO_WALL_CLOCK: u64 = 0;
    NO_WALL_CLOCK
}
117
118impl HybridClock {
119 pub fn new(node_id: NodeId) -> Self {
127 Self {
128 node_id: node_id as u16,
129 last: HybridTimestamp::zero(),
130 #[cfg(feature = "std")]
131 physical_time_fn: system_time_ms,
132 #[cfg(not(feature = "std"))]
133 physical_time_fn: fallback_time_ms,
134 }
135 }
136
137 pub fn with_time_source(node_id: NodeId, time_fn: fn() -> u64) -> Self {
143 Self {
144 node_id: node_id as u16,
145 last: HybridTimestamp::zero(),
146 physical_time_fn: time_fn,
147 }
148 }
149
150 pub fn now(&mut self) -> HybridTimestamp {
155 let pt = (self.physical_time_fn)();
156
157 if pt > self.last.physical {
158 self.last = HybridTimestamp {
159 physical: pt,
160 logical: 0,
161 node_id: self.node_id,
162 };
163 } else {
164 self.last = HybridTimestamp {
165 physical: self.last.physical,
166 logical: self.last.logical + 1,
167 node_id: self.node_id,
168 };
169 }
170
171 self.last
172 }
173
174 pub fn receive(&mut self, remote: &HybridTimestamp) -> HybridTimestamp {
179 let pt = (self.physical_time_fn)();
180 let max_pt = cmp::max(cmp::max(pt, self.last.physical), remote.physical);
181
182 let logical = if max_pt == self.last.physical && max_pt == remote.physical {
183 cmp::max(self.last.logical, remote.logical) + 1
184 } else if max_pt == self.last.physical {
185 self.last.logical + 1
186 } else if max_pt == remote.physical {
187 remote.logical + 1
188 } else {
189 0
190 };
191
192 self.last = HybridTimestamp {
193 physical: max_pt,
194 logical,
195 node_id: self.node_id,
196 };
197
198 self.last
199 }
200
201 pub fn node_id(&self) -> u16 {
203 self.node_id
204 }
205
206 pub fn last_timestamp(&self) -> HybridTimestamp {
208 self.last
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicU64, Ordering};

    // The test harness runs tests on parallel threads. Every test therefore
    // uses its own time source; a single shared mutable static would let one
    // test move the clock underneath another and make the suite flaky.

    /// Time source frozen at 1000 ms.
    fn frozen_at_1000() -> u64 {
        1000
    }

    /// Time source frozen at 5000 ms.
    fn frozen_at_5000() -> u64 {
        5000
    }

    #[test]
    fn monotonic_within_same_ms() {
        let mut clock = HybridClock::with_time_source(1, frozen_at_5000);

        let ts1 = clock.now();
        let ts2 = clock.now();
        let ts3 = clock.now();

        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
        assert_eq!(ts1.physical, 5000);
        assert_eq!(ts1.logical, 0);
        assert_eq!(ts2.logical, 1);
        assert_eq!(ts3.logical, 2);
    }

    #[test]
    fn physical_time_advance_resets_logical() {
        // Adjustable time source private to this test.
        static TIME_MS: AtomicU64 = AtomicU64::new(1000);
        fn adjustable_time() -> u64 {
            TIME_MS.load(Ordering::SeqCst)
        }

        let mut clock = HybridClock::with_time_source(1, adjustable_time);

        let ts1 = clock.now();
        assert_eq!(ts1.logical, 0);

        // Second reading in the same millisecond bumps the logical counter.
        let _ts2 = clock.now();

        TIME_MS.store(2000, Ordering::SeqCst);
        let ts3 = clock.now();
        assert_eq!(ts3.physical, 2000);
        assert_eq!(ts3.logical, 0);
    }

    #[test]
    fn receive_advances_clock() {
        let mut clock = HybridClock::with_time_source(1, frozen_at_1000);

        // The remote is ahead of the local wall clock.
        let remote = HybridTimestamp {
            physical: 5000,
            logical: 3,
            node_id: 2,
        };

        let ts = clock.receive(&remote);
        assert!(ts > remote);
        assert_eq!(ts.physical, 5000);
        // Remote logical counter + 1.
        assert_eq!(ts.logical, 4);
    }

    #[test]
    fn receive_same_physical_time() {
        let mut clock = HybridClock::with_time_source(1, frozen_at_5000);

        // Local state becomes (5000, 0).
        let _local = clock.now();

        let remote = HybridTimestamp {
            physical: 5000,
            logical: 5,
            node_id: 2,
        };

        let ts = clock.receive(&remote);
        assert!(ts > remote);
        assert_eq!(ts.physical, 5000);
        // max(local 0, remote 5) + 1.
        assert_eq!(ts.logical, 6);
    }

    #[test]
    fn ordering_is_total() {
        let a = HybridTimestamp {
            physical: 1000,
            logical: 0,
            node_id: 1,
        };
        let b = HybridTimestamp {
            physical: 1000,
            logical: 0,
            node_id: 2,
        };
        let c = HybridTimestamp {
            physical: 1000,
            logical: 1,
            node_id: 1,
        };

        // Same physical and logical: node id breaks the tie.
        assert!(a < b);
        // Same physical: the logical counter decides.
        assert!(a < c);
        // Logical takes precedence over node id.
        assert!(b < c);
    }

    #[test]
    fn to_u128_preserves_ordering() {
        let a = HybridTimestamp {
            physical: 1000,
            logical: 5,
            node_id: 1,
        };
        let b = HybridTimestamp {
            physical: 1000,
            logical: 6,
            node_id: 1,
        };
        let c = HybridTimestamp {
            physical: 1001,
            logical: 0,
            node_id: 1,
        };

        assert!(a.to_u128() < b.to_u128());
        assert!(b.to_u128() < c.to_u128());
    }
}