datacake_crdt/
timestamp.rs

1use std::cmp;
2use std::fmt::{Display, Formatter};
3use std::str::FromStr;
4use std::time::{Duration, SystemTime, UNIX_EPOCH};
5
6#[cfg(feature = "rkyv-support")]
7use rkyv::{Archive, Deserialize, Serialize};
8
/// The maximum allowed clock drift between nodes.
///
/// This is 4,100 seconds (roughly 68 minutes). `send`/`recv` refuse to produce a
/// timestamp whose logical time is further ahead of the local physical clock than this.
pub const MAX_CLOCK_DRIFT: Duration = Duration::from_secs(4_100);
/// The maximum timestamp value in seconds that the timestamp can support (32 bits).
///
/// `2^32 - 1` seconds is roughly 136 years after [DATACAKE_EPOCH].
pub const TIMESTAMP_MAX: u64 = (1 << 32) - 1;
/// The UNIX timestamp which datacake timestamps start counting from.
///
/// This is `2023-01-01T01:01:01Z` (UNIX time `1672534861`), i.e. essentially the
/// 1st Jan, 2023. Stored timestamps are offsets from this point, so changing it
/// would change the meaning of every previously packed timestamp.
pub const DATACAKE_EPOCH: Duration = Duration::from_secs(1672534861);
17
/// A HLC (Hybrid Logical Clock) timestamp implementation.
///
/// This implementation is largely a port of the JavaScript implementation
/// by @jlongster as provided here: <https://github.com/jlongster/crdt-example-app>
///
/// The demo itself implemented the concepts talked about in this talk:
/// <https://www.youtube.com/watch?v=DEcwa68f-jY>
///
/// The timestamp doubles as the clock itself: `send()` and `recv()` update it in
/// place so that the timestamps it hands out stay unique and monotonic.
///
/// This internally is a packed `u64` integer, most significant bits first:
/// - 32 bits for the timestamp seconds (counted from [DATACAKE_EPOCH])
/// - 8 bits for the fractions of a second (4ms units)
/// - 16 bits for the counter
/// - 8 bits for the node id
///
/// Because of this packing order, the derived `Ord` compares by time first,
/// then by counter, then by node id.
///
/// ```
/// use std::time::Duration;
/// use datacake_crdt::HLCTimestamp;
///
/// // Let's make two clocks, but we'll refer to them as our nodes, node-a and node-b.
/// let mut node_a = HLCTimestamp::now(0, 0);
///
/// // Node-b has a clock drift of 5 seconds.
/// let mut node_b = HLCTimestamp::new(node_a.datacake_timestamp() + Duration::from_secs(5), 0, 1);
///
/// // Node-b sends a payload with a new timestamp which we get by calling `send()`.
/// // this makes sure our timestamp is unique and monotonic.
/// let timestamp = node_b.send().unwrap();
///
/// // Node-a gets this payload with the timestamp and so we call `recv()` on our clock.
/// // This way node-a's clock also stays unique and monotonic.
/// node_a.recv(&timestamp).unwrap();
/// ```
#[derive(Debug, Hash, Copy, Clone, Eq, PartialEq, PartialOrd, Ord)]
#[repr(C)]
// Gated on the same feature as the `rkyv` imports at the top of the file so the
// derives never run without `Archive`/`Serialize`/`Deserialize` in scope.
#[cfg_attr(feature = "rkyv-support", derive(Serialize, Deserialize, Archive))]
#[cfg_attr(feature = "rkyv-support", archive(compare(PartialEq), check_bytes))]
#[cfg_attr(feature = "rkyv-support", archive_attr(repr(C), derive(Debug)))]
pub struct HLCTimestamp(u64);
58
59impl HLCTimestamp {
60    /// Create a new [HLCTimestamp].
61    ///
62    /// You probably want to use the `now(counter, node)` convenience method rather than this.
63    pub fn new(duration: Duration, counter: u16, node: u8) -> Self {
64        let seconds = duration.as_secs();
65        assert!(
66            seconds <= TIMESTAMP_MAX,
67            "Timestamp cannot go beyond the maximum capacity of 32 bits. Has 500 years elapsed?",
68        );
69
70        Self(pack(duration, counter, node))
71    }
72
73    /// Create a new [HLCTimestamp].
74    ///
75    /// This internally gets the current UNIX timestamp in seconds.
76    pub fn now(counter: u16, node: u8) -> Self {
77        let duration = get_datacake_timestamp();
78        Self::new(duration, counter, node)
79    }
80
81    #[inline]
82    /// The node ID which produced this timestamp
83    pub fn node(&self) -> u8 {
84        (self.0 & 0xFF).try_into().unwrap_or_default()
85    }
86
87    #[inline]
88    /// The counter used to keep the clock monotonic.
89    pub fn counter(&self) -> u16 {
90        ((self.0 >> 8) & 0xFFFF).try_into().unwrap_or_default()
91    }
92
93    #[inline]
94    /// The datacake timestamp as seconds.
95    ///
96    /// This is NOT a UNIX timestamp, it is from a custom point in time.
97    /// To get a UNIX timestamp use the `as_duration` method.
98    pub fn seconds(&self) -> u64 {
99        self.0 >> 32
100    }
101
102    #[inline]
103    /// The fractions of a second in the timestamp.
104    ///
105    /// E.g. 500ms is 125 fractional.
106    pub fn fractional(&self) -> u8 {
107        ((self.0 >> 24) & 0xFF).try_into().unwrap_or_default()
108    }
109
110    #[inline]
111    /// The unix timestamp as a duration.
112    ///
113    /// This is probably the method you want to use for display purposes.
114    ///
115    /// NOTE:
116    /// This adds the [DATACAKE_EPOCH] back to the duration to convert
117    /// the timestamp back to a unix epoch.
118    pub fn unix_timestamp(&self) -> Duration {
119        parts_as_duration(self.seconds(), self.fractional()) + DATACAKE_EPOCH
120    }
121
122    #[inline]
123    /// The datacake timestamp as a duration.
124    ///
125    /// NOTE:
126    /// This does not add the original [DATACAKE_EPOCH] back to the duration.
127    pub fn datacake_timestamp(&self) -> Duration {
128        parts_as_duration(self.seconds(), self.fractional())
129    }
130
131    #[inline]
132    /// The timestamp as it's raw `u64`.
133    pub fn as_u64(&self) -> u64 {
134        self.0
135    }
136
137    #[inline]
138    /// Creates a new timestamp from a given `u64`.
139    ///
140    /// WARNING:
141    ///     It is *your* responsibility that the provided value is a correctly
142    ///     packed number, otherwise your timestamp will spit out gibberish.
143    pub fn from_u64(val: u64) -> Self {
144        Self(val)
145    }
146
147    /// Timestamp send. Generates a unique, monotonic timestamp suitable
148    /// for transmission to another system.
149    pub fn send(&mut self) -> Result<Self, TimestampError> {
150        let ts = get_datacake_timestamp();
151
152        let ts_old = self.datacake_timestamp();
153        let c_old = self.counter();
154
155        // Calculate the next logical time and counter
156        // * ensure that the logical time never goes backward
157        // * increment the counter if phys time does not advance
158        let ts_new = cmp::max(ts_old, ts);
159
160        if ts_new.saturating_sub(ts) > MAX_CLOCK_DRIFT {
161            return Err(TimestampError::ClockDrift);
162        }
163
164        let c_new = if ts_old == ts_new {
165            c_old.checked_add(1).ok_or(TimestampError::Overflow)?
166        } else {
167            0
168        };
169
170        self.0 = pack(ts_new, c_new, self.node());
171
172        Ok(*self)
173    }
174
175    /// Timestamp receive. Parses and merges a timestamp from a remote
176    /// system with the local time-global uniqueness and monotonicity are
177    /// preserved.
178    pub fn recv(&mut self, msg: &Self) -> Result<Self, TimestampError> {
179        if self.node() == msg.node() {
180            return Err(TimestampError::DuplicatedNode(msg.node()));
181        }
182
183        let ts = get_datacake_timestamp();
184
185        // Unpack the message wall time/counter
186        let ts_msg = msg.datacake_timestamp();
187        let c_msg = msg.counter();
188
189        // Assert the remote clock drift
190        if ts_msg.saturating_sub(ts) > MAX_CLOCK_DRIFT {
191            return Err(TimestampError::ClockDrift);
192        }
193
194        // Unpack the clock.timestamp logical time and counter
195        let ts_old = self.datacake_timestamp();
196        let c_old = self.counter();
197
198        // Calculate the next logical time and counter.
199        // Ensure that the logical time never goes backward;
200        // * if all logical clocks are equal, increment the max counter,
201        // * if max = old > message, increment local counter,
202        // * if max = message > old, increment message counter,
203        // * otherwise, clocks are monotonic, reset counter
204
205        let ts_new = cmp::max(cmp::max(ts_old, ts), ts_msg);
206
207        if ts_new.saturating_sub(ts) > MAX_CLOCK_DRIFT {
208            return Err(TimestampError::ClockDrift);
209        }
210
211        let c_new = {
212            if ts_new == ts_old && ts_new == ts_msg {
213                cmp::max(c_old, c_msg)
214                    .checked_add(1)
215                    .ok_or(TimestampError::Overflow)?
216            } else if ts_new == ts_old {
217                c_old.checked_add(1).ok_or(TimestampError::Overflow)?
218            } else if ts_new == ts_msg {
219                c_msg.checked_add(1).ok_or(TimestampError::Overflow)?
220            } else {
221                0
222            }
223        };
224
225        self.0 = pack(ts_new, c_new, self.node());
226
227        Ok(Self::new(ts_new, self.counter(), msg.node()))
228    }
229}
230
231impl Display for HLCTimestamp {
232    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
233        write!(
234            f,
235            "{}-{:0>4}-{:0>4X}-{:0>4}",
236            self.seconds(),
237            self.fractional(),
238            self.counter(),
239            self.node()
240        )
241    }
242}
243
244impl FromStr for HLCTimestamp {
245    type Err = InvalidFormat;
246
247    fn from_str(s: &str) -> Result<Self, Self::Err> {
248        let mut splits = s.splitn(4, '-');
249
250        let seconds = splits
251            .next()
252            .and_then(|v| v.parse::<u64>().ok())
253            .ok_or(InvalidFormat)?;
254        let fractional = splits
255            .next()
256            .and_then(|v| v.parse::<u8>().ok())
257            .ok_or(InvalidFormat)?;
258        let counter = splits
259            .next()
260            .and_then(|v| u16::from_str_radix(v, 16).ok())
261            .ok_or(InvalidFormat)?;
262        let node = splits
263            .next()
264            .and_then(|v| v.parse::<u8>().ok())
265            .ok_or(InvalidFormat)?;
266
267        Ok(Self::new(
268            parts_as_duration(seconds, fractional),
269            counter,
270            node,
271        ))
272    }
273}
274
275/// Packs the given values into
276fn pack(duration: Duration, counter: u16, node: u8) -> u64 {
277    let (seconds, fractional) = duration_to_parts(duration);
278
279    let counter = counter as u64;
280    let fractional = fractional as u64;
281    let node = node as u64;
282
283    (seconds << 32) | (fractional << 24) | (counter << 8) | node
284}
285
/// Splits `duration` into whole seconds and the sub-second remainder expressed in
/// 4ms units (`0..=249`); anything below 4ms is truncated away.
fn duration_to_parts(duration: Duration) -> (u64, u8) {
    const MILLIS_PER_UNIT: u32 = 4;
    let units = duration.subsec_millis() / MILLIS_PER_UNIT;
    (duration.as_secs(), units as u8)
}
291
/// Rebuilds a [Duration] from whole seconds plus a fractional part in 4ms units.
fn parts_as_duration(seconds: u64, fractional: u8) -> Duration {
    let extra_millis = u64::from(fractional) * 4;
    Duration::from_secs(seconds) + Duration::from_millis(extra_millis)
}
295
296#[derive(Debug, Copy, Clone, thiserror::Error)]
297#[error("Invalid timestamp format.")]
298/// The provided timestamp in the given string format is invalid and unable to be parsed.
299pub struct InvalidFormat;
300
301#[derive(Debug, thiserror::Error)]
302/// The clock was unable to produce a timestamp due to an error.
303pub enum TimestampError {
304    #[error("Expected a different unique node, got node with the same id. {0:?}")]
305    /// The clock tried to receive/register a timestamp that it produced.
306    DuplicatedNode(u8),
307
308    #[error("The clock drift difference is too high to be used.")]
309    /// The clock on the remote node has drifted too far ahead for the timestamp
310    /// to be considered usable. The cut off is about 1 hour, so if this is raised
311    /// the NTP system on the server should be checked.
312    ClockDrift,
313
314    #[error("The timestamp counter is beyond the capacity of a u16 integer.")]
315    /// The timestamp counter overflowed, this normally means you're creating too many
316    /// timestamps every millisecond!
317    Overflow,
318}
319
/// Get the current time since the [UNIX_EPOCH] in milliseconds.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch.
pub fn get_unix_timestamp_ms() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_millis() as u64
}
327
328/// Get the current time since the [DATACAKE_EPOCH] as [Duration].
329///
330/// This timestamp is ensured to be accurate taking into account the
331/// resolution lost when converting the timestamp.
332pub fn get_datacake_timestamp() -> Duration {
333    let duration = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
334
335    let (seconds, fractional) = duration_to_parts(duration - DATACAKE_EPOCH);
336    parts_as_duration(seconds, fractional)
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    const TEST_TS: Duration = Duration::from_secs(1);

    /// Creates a clock whose logical time is one minute ahead of the physical clock.
    ///
    /// Tests that expect the counter to be incremented need the logical time to win
    /// over the physical time inside `send`/`recv`. Starting from `now()` is flaky:
    /// if a 4ms tick elapses mid-test the physical time wins and the counter resets
    /// to 0. One minute is far below [MAX_CLOCK_DRIFT], so no drift error is raised.
    fn ahead_of_now(counter: u16, node: u8) -> HLCTimestamp {
        let ahead = get_datacake_timestamp() + Duration::from_secs(60);
        HLCTimestamp::new(ahead, counter, node)
    }

    /// Truncates `duration` to the 4ms resolution that timestamps are stored with.
    fn truncate_to_resolution(duration: Duration) -> Duration {
        let millis = u64::from(duration.subsec_millis() / 4) * 4;
        Duration::from_secs(duration.as_secs()) + Duration::from_millis(millis)
    }

    #[test]
    fn test_unix_timestamp_conversion() {
        // Bracket the call with two wall-clock readings; comparing against a single
        // earlier reading is flaky whenever a second or 4ms boundary passes in between.
        let before = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
        let dc_ts = get_datacake_timestamp();
        let after = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();

        let converted_ts = dc_ts + DATACAKE_EPOCH;

        assert_eq!(converted_ts.subsec_nanos() % 4_000_000, 0, "Should have 4ms resolution.");
        assert!(converted_ts >= truncate_to_resolution(before));
        assert!(converted_ts <= after);
    }

    #[test]
    fn test_parse() {
        let ts = HLCTimestamp::new(TEST_TS, 0, 0);
        let parsed = HLCTimestamp::from_str(&ts.to_string()).expect("Parse timestamp");
        assert_eq!(parsed, ts);

        let ts = HLCTimestamp::new(Duration::from_millis(1_500), 0xABC, 7);
        assert_eq!(ts.to_string(), "1-0125-0ABC-0007");
        let parsed = HLCTimestamp::from_str(&ts.to_string()).expect("Parse timestamp");
        assert_eq!(parsed, ts);

        assert!(HLCTimestamp::from_str("not-a-timestamp").is_err());
    }

    #[test]
    fn test_same_node_error() {
        let mut ts1 = HLCTimestamp::new(TEST_TS, 0, 0);
        let ts2 = HLCTimestamp::new(TEST_TS, 1, 0);

        assert!(matches!(
            ts1.recv(&ts2),
            Err(TimestampError::DuplicatedNode(0)),
        ))
    }

    #[test]
    fn test_clock_drift_error() {
        let drift = MAX_CLOCK_DRIFT + Duration::from_secs(1000);

        let mut ts1 = HLCTimestamp::now(0, 0);
        let ts2 = HLCTimestamp::new(ts1.datacake_timestamp() + drift, 0, 1);
        assert!(matches!(ts1.recv(&ts2), Err(TimestampError::ClockDrift)));

        let mut ts = HLCTimestamp::new(ts1.datacake_timestamp() + drift, 0, 1);
        assert!(matches!(ts.send(), Err(TimestampError::ClockDrift)));
    }

    #[test]
    fn test_clock_overflow_error() {
        let mut ts1 = ahead_of_now(u16::MAX, 0);
        let ts2 = HLCTimestamp::new(ts1.datacake_timestamp(), u16::MAX, 1);

        assert!(matches!(ts1.recv(&ts2), Err(TimestampError::Overflow)));
    }

    #[test]
    fn test_timestamp_send() {
        let mut ts1 = ahead_of_now(0, 0);
        let ts2 = ts1.send().unwrap();
        assert_eq!(ts1.seconds(), ts2.seconds(), "Logical clock should match.");
        assert_eq!(ts1.counter(), 1, "Counter should be incremented for ts1.");
        assert_eq!(ts2.counter(), 1, "Counter should be incremented for ts2.");
    }

    #[test]
    fn test_timestamp_recv() {
        let mut ts1 = ahead_of_now(0, 0);
        let mut ts2 = HLCTimestamp::new(ts1.datacake_timestamp(), 3, 1);

        let ts3 = ts1.recv(&ts2).unwrap();

        // Ts3 carries the clock's new time and counter (but ts2's node id).
        assert_eq!(ts1.seconds(), ts3.seconds());
        assert_eq!(ts1.counter(), ts3.counter());

        assert_eq!(ts3.counter(), 4); // seconds stay the same, our counter should increment.

        let ts4 = ts2.recv(&ts1).unwrap();
        assert_eq!(ts2.seconds(), ts4.seconds());
        assert_eq!(ts2.counter(), ts4.counter());
        assert_eq!(ts4.counter(), 5); // seconds stay the same, our counter should increment.

        assert!(ts1 < ts2);
        assert!(ts3 < ts4);
    }

    #[test]
    fn test_timestamp_ordering() {
        let ts1 = HLCTimestamp::new(TEST_TS, 0, 0);
        let ts2 = HLCTimestamp::new(TEST_TS, 1, 0);
        let ts3 = HLCTimestamp::new(TEST_TS, 2, 0);
        assert!(ts1 < ts2);
        assert!(ts2 < ts3);

        let ts1 = HLCTimestamp::new(TEST_TS, 0, 0);
        let ts2 = HLCTimestamp::new(TEST_TS, 0, 1);
        assert!(ts1 < ts2);

        let ts1 = HLCTimestamp::new(TEST_TS, 0, 1);
        let ts2 = HLCTimestamp::new(TEST_TS + Duration::from_secs(1), 0, 0);
        assert!(ts1 < ts2);

        let mut ts1 = HLCTimestamp::now(0, 1);
        let ts2 = ts1.send().unwrap();
        let ts3 = ts1.send().unwrap();
        assert!(ts2 < ts3);

        // Holds whether or not the physical clock ticks: either the time or the
        // counter of ts1 ends up strictly greater than ts2's.
        let mut ts1 = HLCTimestamp::now(0, 0);
        let ts2 = HLCTimestamp::new(ts1.datacake_timestamp(), 1, 1);
        let _ts3 = ts1.recv(&ts2).unwrap();
        assert!(ts1 > ts2);
    }
}
454
#[cfg(all(test, feature = "rkyv-support"))]
mod rkyv_tests {
    use super::*;

    /// A freshly created timestamp must serialize into a 1KiB-scratch rkyv buffer.
    #[test]
    fn test_serialize() {
        let timestamp = HLCTimestamp::now(0, 0);
        let outcome = rkyv::to_bytes::<_, 1024>(&timestamp);
        outcome.expect("Serialize timestamp OK");
    }

    /// Serializing then (validated) deserializing must yield an identical timestamp.
    #[test]
    fn test_deserialize() {
        let original = HLCTimestamp::now(0, 0);
        let bytes = rkyv::to_bytes::<_, 1024>(&original).expect("Serialize timestamp OK");

        let restored: HLCTimestamp = rkyv::from_bytes(&bytes).expect("Deserialize timestamp OK");
        assert_eq!(original, restored);
    }
}