1use core::cmp;
29
/// A hybrid logical clock timestamp made of a physical time, a logical
/// counter and the id of the node that produced it.
///
/// The physical component is whatever the clock's time source reports
/// (milliseconds since the Unix epoch for the default `std` source).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HybridTimestamp {
    /// Physical time component, as reported by the clock's time source.
    pub physical: u64,
    /// Logical counter distinguishing timestamps that share a physical time.
    pub logical: u16,
    /// Identifier of the node that issued the timestamp.
    pub node_id: u16,
}

impl HybridTimestamp {
    /// Returns the timestamp whose three components are all zero.
    pub fn zero() -> Self {
        let (physical, logical, node_id) = (0, 0, 0);
        Self {
            physical,
            logical,
            node_id,
        }
    }

    /// Packs the timestamp into a single `u128`.
    ///
    /// Layout, from most to least significant bits:
    ///
    /// * bits 127..=64: `physical`
    /// * bits 63..=48: `logical`
    /// * bits 47..=32: `node_id`
    /// * bits 31..=0: always zero
    ///
    /// Because the fields occupy disjoint ranges in that order, comparing two
    /// packed values numerically gives the same result as comparing
    /// `(physical, logical, node_id)` lexicographically.
    pub fn to_u128(&self) -> u128 {
        const PHYSICAL_SHIFT: u32 = 64;
        const LOGICAL_SHIFT: u32 = 48;
        const NODE_ID_SHIFT: u32 = 32;

        let physical_bits = u128::from(self.physical) << PHYSICAL_SHIFT;
        let logical_bits = u128::from(self.logical) << LOGICAL_SHIFT;
        let node_id_bits = u128::from(self.node_id) << NODE_ID_SHIFT;

        physical_bits | logical_bits | node_id_bits
    }
}
67
68impl Ord for HybridTimestamp {
69 fn cmp(&self, other: &Self) -> cmp::Ordering {
70 self.physical
71 .cmp(&other.physical)
72 .then(self.logical.cmp(&other.logical))
73 .then(self.node_id.cmp(&other.node_id))
74 }
75}
76
77impl PartialOrd for HybridTimestamp {
78 fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
79 Some(self.cmp(other))
80 }
81}
82
83pub struct HybridClock {
88 node_id: u16,
89 last: HybridTimestamp,
90 physical_time_fn: fn() -> u64,
93}
94
/// Reads the system wall clock as whole milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch, and
/// `u64::MAX` if the millisecond count does not fit in a `u64`.
#[cfg(feature = "std")]
fn system_time_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // A clock set before the epoch yields an error; treat it as time zero.
        .unwrap_or_default();

    // `as_millis` returns a `u128`. Clamp first so the narrowing cast below is
    // lossless instead of silently dropping the high bits.
    since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
}
102
/// Time source used when the `std` feature is disabled.
///
/// It always reports `0`.
#[cfg(not(feature = "std"))]
fn fallback_time_ms() -> u64 {
    const NO_WALL_CLOCK_MS: u64 = 0;
    NO_WALL_CLOCK_MS
}
107
108impl HybridClock {
109 pub fn new(node_id: u16) -> Self {
114 Self {
115 node_id,
116 last: HybridTimestamp::zero(),
117 #[cfg(feature = "std")]
118 physical_time_fn: system_time_ms,
119 #[cfg(not(feature = "std"))]
120 physical_time_fn: fallback_time_ms,
121 }
122 }
123
124 pub fn with_time_source(node_id: u16, time_fn: fn() -> u64) -> Self {
127 Self {
128 node_id,
129 last: HybridTimestamp::zero(),
130 physical_time_fn: time_fn,
131 }
132 }
133
134 pub fn now(&mut self) -> HybridTimestamp {
139 let pt = (self.physical_time_fn)();
140
141 if pt > self.last.physical {
142 self.last = HybridTimestamp {
143 physical: pt,
144 logical: 0,
145 node_id: self.node_id,
146 };
147 } else {
148 self.last = HybridTimestamp {
149 physical: self.last.physical,
150 logical: self.last.logical + 1,
151 node_id: self.node_id,
152 };
153 }
154
155 self.last
156 }
157
158 pub fn receive(&mut self, remote: &HybridTimestamp) -> HybridTimestamp {
163 let pt = (self.physical_time_fn)();
164 let max_pt = cmp::max(cmp::max(pt, self.last.physical), remote.physical);
165
166 let logical = if max_pt == self.last.physical && max_pt == remote.physical {
167 cmp::max(self.last.logical, remote.logical) + 1
168 } else if max_pt == self.last.physical {
169 self.last.logical + 1
170 } else if max_pt == remote.physical {
171 remote.logical + 1
172 } else {
173 0
174 };
175
176 self.last = HybridTimestamp {
177 physical: max_pt,
178 logical,
179 node_id: self.node_id,
180 };
181
182 self.last
183 }
184
185 pub fn node_id(&self) -> u16 {
187 self.node_id
188 }
189
190 pub fn last_timestamp(&self) -> HybridTimestamp {
192 self.last
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use core::sync::atomic::{AtomicU64, Ordering};

    // Every test owns its time source. The test harness runs tests on
    // parallel threads, so a single shared mock clock would let one test
    // change the time while another test is in the middle of reading it.

    /// Time source frozen at 1000.
    fn fixed_1000() -> u64 {
        1000
    }

    /// Time source frozen at 5000.
    fn fixed_5000() -> u64 {
        5000
    }

    /// Repeated calls within one physical reading only bump the logical
    /// counter and still produce strictly increasing timestamps.
    #[test]
    fn monotonic_within_same_ms() {
        let mut clock = HybridClock::with_time_source(1, fixed_5000);

        let ts1 = clock.now();
        let ts2 = clock.now();
        let ts3 = clock.now();

        assert!(ts1 < ts2);
        assert!(ts2 < ts3);
        assert_eq!(ts1.physical, 5000);
        assert_eq!(ts1.logical, 0);
        assert_eq!(ts2.logical, 1);
        assert_eq!(ts3.logical, 2);
    }

    /// Once the time source advances, the logical counter restarts at zero.
    #[test]
    fn physical_time_advance_resets_logical() {
        // Private to this test so no other test can move the time.
        static TIME: AtomicU64 = AtomicU64::new(1000);
        fn time() -> u64 {
            TIME.load(Ordering::SeqCst)
        }

        let mut clock = HybridClock::with_time_source(1, time);

        let ts1 = clock.now();
        assert_eq!(ts1.logical, 0);

        let _ts2 = clock.now();

        TIME.store(2000, Ordering::SeqCst);
        let ts3 = clock.now();
        assert_eq!(ts3.physical, 2000);
        assert_eq!(ts3.logical, 0);
    }

    /// A remote timestamp ahead of the local time pulls the clock forward.
    #[test]
    fn receive_advances_clock() {
        let mut clock = HybridClock::with_time_source(1, fixed_1000);

        let remote = HybridTimestamp {
            physical: 5000,
            logical: 3,
            node_id: 2,
        };

        let ts = clock.receive(&remote);
        assert!(ts > remote);
        assert_eq!(ts.physical, 5000);
        // One past the remote logical counter.
        assert_eq!(ts.logical, 4);
    }

    /// With equal physical times the larger logical counter wins, plus one.
    #[test]
    fn receive_same_physical_time() {
        let mut clock = HybridClock::with_time_source(1, fixed_5000);

        // Local state becomes (5000, 0).
        let _local = clock.now();
        let remote = HybridTimestamp {
            physical: 5000,
            logical: 5,
            node_id: 2,
        };

        let ts = clock.receive(&remote);
        assert!(ts > remote);
        assert_eq!(ts.physical, 5000);
        // max(0, 5) + 1
        assert_eq!(ts.logical, 6);
    }

    /// Ordering uses physical, then logical, then node id.
    #[test]
    fn ordering_is_total() {
        let a = HybridTimestamp {
            physical: 1000,
            logical: 0,
            node_id: 1,
        };
        let b = HybridTimestamp {
            physical: 1000,
            logical: 0,
            node_id: 2,
        };
        let c = HybridTimestamp {
            physical: 1000,
            logical: 1,
            node_id: 1,
        };

        // Same physical and logical: node id breaks the tie.
        assert!(a < b);
        // Same physical: logical decides.
        assert!(a < c);
        // Logical outranks node id.
        assert!(b < c);
    }

    /// The packed `u128` form sorts the same way as the timestamps.
    #[test]
    fn to_u128_preserves_ordering() {
        let a = HybridTimestamp {
            physical: 1000,
            logical: 5,
            node_id: 1,
        };
        let b = HybridTimestamp {
            physical: 1000,
            logical: 6,
            node_id: 1,
        };
        let c = HybridTimestamp {
            physical: 1001,
            logical: 0,
            node_id: 1,
        };

        assert!(a.to_u128() < b.to_u128());
        assert!(b.to_u128() < c.to_u128());
    }
}