1use std::cmp;
2use std::fmt::{Display, Formatter};
3use std::str::FromStr;
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6#[cfg(feature = "rkyv-support")]
7use rkyv::{Archive, Deserialize, Serialize};
8
9pub const MAX_CLOCK_DRIFT: Duration = Duration::from_secs(4_100);
11pub const TIMESTAMP_MAX: u64 = (1 << 32) - 1;
13pub const DATACAKE_EPOCH: Duration = Duration::from_secs(1672534861);
17
18#[derive(Debug, Hash, Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
19#[repr(C)]
20#[cfg_attr(feature = "rkyv", derive(Serialize, Deserialize, Archive))]
21#[cfg_attr(feature = "rkyv", archive(compare(PartialEq), check_bytes))]
22#[cfg_attr(feature = "rkyv", archive_attr(repr(C), derive(Debug)))]
23pub struct HLCTimestamp(u64);
58
59impl HLCTimestamp {
60 pub fn new(duration: Duration, counter: u16, node: u8) -> Self {
64 let seconds = duration.as_secs();
65 assert!(
66 seconds <= TIMESTAMP_MAX,
67 "Timestamp cannot go beyond the maximum capacity of 32 bits. Has 500 years elapsed?",
68 );
69
70 Self(pack(duration, counter, node))
71 }
72
73 pub fn now(counter: u16, node: u8) -> Self {
77 let duration = get_datacake_timestamp();
78 Self::new(duration, counter, node)
79 }
80
81 #[inline]
82 pub fn node(&self) -> u8 {
84 (self.0 & 0xFF).try_into().unwrap_or_default()
85 }
86
87 #[inline]
88 pub fn counter(&self) -> u16 {
90 ((self.0 >> 8) & 0xFFFF).try_into().unwrap_or_default()
91 }
92
93 #[inline]
94 pub fn seconds(&self) -> u64 {
99 self.0 >> 32
100 }
101
102 #[inline]
103 pub fn fractional(&self) -> u8 {
107 ((self.0 >> 24) & 0xFF).try_into().unwrap_or_default()
108 }
109
110 #[inline]
111 pub fn unix_timestamp(&self) -> Duration {
119 parts_as_duration(self.seconds(), self.fractional()) + DATACAKE_EPOCH
120 }
121
122 #[inline]
123 pub fn datacake_timestamp(&self) -> Duration {
128 parts_as_duration(self.seconds(), self.fractional())
129 }
130
131 #[inline]
132 pub fn as_u64(&self) -> u64 {
134 self.0
135 }
136
137 #[inline]
138 pub fn from_u64(val: u64) -> Self {
144 Self(val)
145 }
146
147 pub fn send(&mut self) -> Result<Self, TimestampError> {
150 let ts = get_datacake_timestamp();
151
152 let ts_old = self.datacake_timestamp();
153 let c_old = self.counter();
154
155 let ts_new = cmp::max(ts_old, ts);
159
160 if ts_new.saturating_sub(ts) > MAX_CLOCK_DRIFT {
161 return Err(TimestampError::ClockDrift);
162 }
163
164 let c_new = if ts_old == ts_new {
165 c_old.checked_add(1).ok_or(TimestampError::Overflow)?
166 } else {
167 0
168 };
169
170 self.0 = pack(ts_new, c_new, self.node());
171
172 Ok(*self)
173 }
174
175 pub fn recv(&mut self, msg: &Self) -> Result<Self, TimestampError> {
179 if self.node() == msg.node() {
180 return Err(TimestampError::DuplicatedNode(msg.node()));
181 }
182
183 let ts = get_datacake_timestamp();
184
185 let ts_msg = msg.datacake_timestamp();
187 let c_msg = msg.counter();
188
189 if ts_msg.saturating_sub(ts) > MAX_CLOCK_DRIFT {
191 return Err(TimestampError::ClockDrift);
192 }
193
194 let ts_old = self.datacake_timestamp();
196 let c_old = self.counter();
197
198 let ts_new = cmp::max(cmp::max(ts_old, ts), ts_msg);
206
207 if ts_new.saturating_sub(ts) > MAX_CLOCK_DRIFT {
208 return Err(TimestampError::ClockDrift);
209 }
210
211 let c_new = {
212 if ts_new == ts_old && ts_new == ts_msg {
213 cmp::max(c_old, c_msg)
214 .checked_add(1)
215 .ok_or(TimestampError::Overflow)?
216 } else if ts_new == ts_old {
217 c_old.checked_add(1).ok_or(TimestampError::Overflow)?
218 } else if ts_new == ts_msg {
219 c_msg.checked_add(1).ok_or(TimestampError::Overflow)?
220 } else {
221 0
222 }
223 };
224
225 self.0 = pack(ts_new, c_new, self.node());
226
227 Ok(Self::new(ts_new, self.counter(), msg.node()))
228 }
229}
230
231impl Display for HLCTimestamp {
232 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
233 write!(
234 f,
235 "{}-{:0>4}-{:0>4X}-{:0>4}",
236 self.seconds(),
237 self.fractional(),
238 self.counter(),
239 self.node()
240 )
241 }
242}
243
244impl FromStr for HLCTimestamp {
245 type Err = InvalidFormat;
246
247 fn from_str(s: &str) -> Result<Self, Self::Err> {
248 let mut splits = s.splitn(4, '-');
249
250 let seconds = splits
251 .next()
252 .and_then(|v| v.parse::<u64>().ok())
253 .ok_or(InvalidFormat)?;
254 let fractional = splits
255 .next()
256 .and_then(|v| v.parse::<u8>().ok())
257 .ok_or(InvalidFormat)?;
258 let counter = splits
259 .next()
260 .and_then(|v| u16::from_str_radix(v, 16).ok())
261 .ok_or(InvalidFormat)?;
262 let node = splits
263 .next()
264 .and_then(|v| v.parse::<u8>().ok())
265 .ok_or(InvalidFormat)?;
266
267 Ok(Self::new(
268 parts_as_duration(seconds, fractional),
269 counter,
270 node,
271 ))
272 }
273}
274
275fn pack(duration: Duration, counter: u16, node: u8) -> u64 {
277 let (seconds, fractional) = duration_to_parts(duration);
278
279 let counter = counter as u64;
280 let fractional = fractional as u64;
281 let node = node as u64;
282
283 (seconds << 32) | (fractional << 24) | (counter << 8) | node
284}
285
286fn duration_to_parts(duration: Duration) -> (u64, u8) {
287 let seconds = duration.as_secs();
288 let fractional = (duration.subsec_millis() / 4) as u8;
289 (seconds, fractional)
290}
291
292fn parts_as_duration(seconds: u64, fractional: u8) -> Duration {
293 Duration::from_secs(seconds) + Duration::from_millis(fractional as u64 * 4)
294}
295
296#[derive(Debug, Copy, Clone, thiserror::Error)]
297#[error("Invalid timestamp format.")]
298pub struct InvalidFormat;
300
301#[derive(Debug, thiserror::Error)]
302pub enum TimestampError {
304 #[error("Expected a different unique node, got node with the same id. {0:?}")]
305 DuplicatedNode(u8),
307
308 #[error("The clock drift difference is too high to be used.")]
309 ClockDrift,
313
314 #[error("The timestamp counter is beyond the capacity of a u16 integer.")]
315 Overflow,
318}
319
320pub fn get_unix_timestamp_ms() -> u64 {
322 SystemTime::now()
323 .duration_since(UNIX_EPOCH)
324 .unwrap()
325 .as_millis() as u64
326}
327
328pub fn get_datacake_timestamp() -> Duration {
333 let duration = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
334
335 let (seconds, fractional) = duration_to_parts(duration - DATACAKE_EPOCH);
336 parts_as_duration(seconds, fractional)
337}
338
339#[cfg(test)]
340mod tests {
341 use super::*;
342
343 const TEST_TS: Duration = Duration::from_secs(1);
344
345 #[test]
346 fn test_unix_timestamp_conversion() {
347 let unix_ts = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
348 let dc_ts = get_datacake_timestamp();
349 let converted_ts = dc_ts + DATACAKE_EPOCH;
350
351 let fractional = (unix_ts.subsec_millis() / 4) * 4;
352
353 assert_eq!(converted_ts.as_secs(), unix_ts.as_secs());
354 assert_eq!(converted_ts.subsec_millis(), fractional);
355 }
356
357 #[test]
358 fn test_parse() {
359 let ts = HLCTimestamp::new(TEST_TS, 0, 0);
360
361 let str_ts = ts.to_string();
362 HLCTimestamp::from_str(&str_ts).expect("Parse timestamp");
363 }
364
365 #[test]
366 fn test_same_node_error() {
367 let mut ts1 = HLCTimestamp::new(TEST_TS, 0, 0);
368 let ts2 = HLCTimestamp::new(TEST_TS, 1, 0);
369
370 assert!(matches!(
371 ts1.recv(&ts2),
372 Err(TimestampError::DuplicatedNode(0)),
373 ))
374 }
375
376 #[test]
377 fn test_clock_drift_error() {
378 let drift = MAX_CLOCK_DRIFT + Duration::from_secs(1000);
379
380 let mut ts1 = HLCTimestamp::now(0, 0);
381 let ts2 = HLCTimestamp::new(ts1.datacake_timestamp() + drift, 0, 1);
382 assert!(matches!(ts1.recv(&ts2), Err(TimestampError::ClockDrift)));
383
384 let mut ts = HLCTimestamp::new(ts1.datacake_timestamp() + drift, 0, 1);
385 assert!(matches!(ts.send(), Err(TimestampError::ClockDrift)));
386 }
387
388 #[test]
389 fn test_clock_overflow_error() {
390 let mut ts1 = HLCTimestamp::now(u16::MAX, 0);
391 let ts2 = HLCTimestamp::new(ts1.datacake_timestamp(), u16::MAX, 1);
392
393 assert!(matches!(ts1.recv(&ts2), Err(TimestampError::Overflow)));
394 }
395
396 #[test]
397 fn test_timestamp_send() {
398 let mut ts1 = HLCTimestamp::now(0, 0);
399 let ts2 = ts1.send().unwrap();
400 assert_eq!(ts1.seconds(), ts2.seconds(), "Logical clock should match.");
401 assert_eq!(ts1.counter(), 1, "Counter should be incremented for ts1.");
402 assert_eq!(ts2.counter(), 1, "Counter should be incremented for ts2.");
403 }
404
405 #[test]
406 fn test_timestamp_recv() {
407 let mut ts1 = HLCTimestamp::now(0, 0);
408 let mut ts2 = HLCTimestamp::new(ts1.datacake_timestamp(), 3, 1);
409
410 let ts3 = ts1.recv(&ts2).unwrap();
411
412 assert_eq!(ts1.seconds(), ts3.seconds());
414 assert_eq!(ts1.counter(), ts3.counter());
415
416 assert_eq!(ts3.counter(), 4); let ts4 = ts2.recv(&ts1).unwrap();
419 assert_eq!(ts2.seconds(), ts4.seconds());
420 assert_eq!(ts2.counter(), ts4.counter());
421 assert_eq!(ts4.counter(), 5); assert!(ts1 < ts2);
424 assert!(ts3 < ts4);
425 }
426
427 #[test]
428 fn test_timestamp_ordering() {
429 let ts1 = HLCTimestamp::new(TEST_TS, 0, 0);
430 let ts2 = HLCTimestamp::new(TEST_TS, 1, 0);
431 let ts3 = HLCTimestamp::new(TEST_TS, 2, 0);
432 assert!(ts1 < ts2);
433 assert!(ts2 < ts3);
434
435 let ts1 = HLCTimestamp::new(TEST_TS, 0, 0);
436 let ts2 = HLCTimestamp::new(TEST_TS, 0, 1);
437 assert!(ts1 < ts2);
438
439 let ts1 = HLCTimestamp::new(TEST_TS, 0, 1);
440 let ts2 = HLCTimestamp::new(TEST_TS + Duration::from_secs(1), 0, 0);
441 assert!(ts1 < ts2);
442
443 let mut ts1 = HLCTimestamp::now(0, 1);
444 let ts2 = ts1.send().unwrap();
445 let ts3 = ts1.send().unwrap();
446 assert!(ts2 < ts3);
447
448 let mut ts1 = HLCTimestamp::now(0, 0);
449 let ts2 = HLCTimestamp::new(ts1.datacake_timestamp(), 1, 1);
450 let _ts3 = ts1.recv(&ts2).unwrap();
451 assert!(ts1 > ts2);
452 }
453}
454
455#[cfg(all(test, feature = "rkyv-support"))]
456mod rkyv_tests {
457 use super::*;
458
459 #[test]
460 fn test_serialize() {
461 let ts = HLCTimestamp::now(0, 0);
462 rkyv::to_bytes::<_, 1024>(&ts).expect("Serialize timestamp OK");
463 }
464
465 #[test]
466 fn test_deserialize() {
467 let ts = HLCTimestamp::now(0, 0);
468 let buffer = rkyv::to_bytes::<_, 1024>(&ts).expect("Serialize timestamp OK");
469
470 let new_ts: HLCTimestamp =
471 rkyv::from_bytes(&buffer).expect("Deserialize timestamp OK");
472 assert_eq!(ts, new_ts);
473 }
474}