hlc_gen/
lib.rs

1#![doc = include_str!("../README.md")]
2#![cfg_attr(not(feature = "std"), no_std)]
3
4#[cfg(test)]
5#[macro_use]
6extern crate std;
7
8mod epoch;
9pub mod error;
10pub mod source;
11mod timestamp;
12
13use core::cmp::Ordering;
14
15use timestamp::HlcAtomicTimestamp;
16pub use timestamp::HlcTimestamp;
17
18use crate::error::{HlcError, HlcResult};
19use crate::source::{ClockSource, ManualClock, UtcClock};
20
21/// Hybrid Logical Clock (HLC) generator.
22pub struct HlcGenerator<S: ClockSource = UtcClock> {
23    /// The last timestamp generated by the clock.
24    state: HlcAtomicTimestamp,
25
26    /// The maximum drift (in milliseconds) allowed between the physical clock
27    /// and the wall-clock time.
28    max_drift: usize,
29
30    /// The timestamp provider used to get the current timestamp.
31    clock: S,
32}
33
34impl Default for HlcGenerator<UtcClock> {
35    /// Creates a new HLC clock without any drift.
36    fn default() -> Self {
37        Self::new(0)
38    }
39}
40
41impl HlcGenerator<UtcClock> {
42    /// Creates a new HLC clock with the specified maximum drift.
43    ///
44    /// Set `max_drift` to 0, if the clock is running in a single node settings
45    /// (useful for generating timestamp-based monotonically increasing
46    /// IDs). In such settings, there is no need to worry about drift, as no
47    /// adjustments are made to the clock, i.e.
48    /// [`update()`](HlcGenerator::update) is never called.
49    pub fn new(max_drift: usize) -> Self {
50        Self::with_max_drift(max_drift)
51    }
52}
53
54impl HlcGenerator<ManualClock> {
55    /// Creates a new manual HLC clock with the specified maximum drift.
56    ///
57    /// Useful for testing purposes, where manual timestamps are used.
58    pub fn manual(max_drift: usize) -> Self {
59        Self::with_max_drift(max_drift)
60    }
61
62    pub fn set_current_timestamp(&self, timestamp: i64) {
63        self.clock.set_current_timestamp(timestamp);
64    }
65}
66
67impl<S: ClockSource> HlcGenerator<S> {
68    /// Creates a new HLC clock with the specified maximum drift.
69    fn with_max_drift(max_drift: usize) -> Self {
70        let clock = S::default();
71        let state = HlcTimestamp::from_parts(clock.current_timestamp(), 0)
72            .unwrap_or_default()
73            .into();
74        Self {
75            state,
76            max_drift,
77            clock,
78        }
79    }
80
81    /// Current timestamp.
82    ///
83    /// Use [`next_timestamp()`](HlcGenerator::next_timestamp) to get the
84    /// timestamp for local or send events.
85    pub fn timestamp(&self) -> HlcTimestamp {
86        self.state.snapshot()
87    }
88
89    /// Timestamp for the local or send event.
90    pub fn next_timestamp(&self) -> Option<HlcTimestamp> {
91        let timestamp = self.clock.current_timestamp();
92
93        self.state
94            .update(move |pt, lc| {
95                // Update the physical time and increment the logical count.
96                if pt >= timestamp {
97                    Ok((pt, lc + 1))
98                } else {
99                    Ok((timestamp, 0))
100                }
101            })
102            .map(|ts| ts.snapshot())
103            .ok()
104    }
105
106    /// Adjust the clock based on incoming timestamp.
107    ///
108    /// Usually this happens when a timestamp is received from another node.
109    /// An error may occur if drift is exceeded (if `max_drift` is set to 0,
110    /// then such a check is ignored).
111    ///
112    /// Updated timestamp is returned.
113    pub fn update(&self, incoming_state: &HlcTimestamp) -> HlcResult<HlcTimestamp> {
114        let max_drift = self.max_drift;
115        let timestamp = self.clock.current_timestamp();
116
117        self.state
118            .update(move |pt, lc| {
119                let (incoming_pt, incoming_lc) = incoming_state.parts();
120
121                // Physical clock is ahead of both the incoming timestamp and the current state.
122                if timestamp > incoming_pt && timestamp > pt {
123                    // Update the clock state.
124                    return Ok((timestamp, 0));
125                }
126
127                match incoming_pt.cmp(&pt) {
128                    // Incoming timestamp is ahead of the current state.
129                    Ordering::Greater => {
130                        // Check for drift.
131                        if max_drift > 0 {
132                            let drift = usize::try_from(incoming_pt - timestamp)
133                                .map_err(|_| HlcError::OutOfRangeTimestamp)?;
134                            if drift > max_drift {
135                                return Err(HlcError::DriftTooLarge(drift, max_drift));
136                            }
137                        }
138                        // Remote timestamp is ahead of the current state. Update local state.
139                        Ok((incoming_pt, incoming_lc + 1))
140                    }
141                    // Incoming timestamp is behind the current state.
142                    Ordering::Less => {
143                        // Our timestamp is ahead of the incoming timestamp, so it remains
144                        // unchanged. We only need to update the logical
145                        // count.
146                        Ok((pt, lc + 1))
147                    }
148                    // Timestamps are equal, so we need to use the maximum logical count for update.
149                    Ordering::Equal => {
150                        // Timestamps are equal, so we need to use the maximum logical count for
151                        // update.
152                        Ok((pt, lc.max(incoming_lc) + 1))
153                    }
154                }
155            })
156            .map(|ts| ts.snapshot())
157    }
158}