hlc_gen/
lib.rs

1#![doc = include_str!("../README.md")]
2
3pub mod error;
4
5use {
6    crate::error::{HlcError, HlcResult},
7    chrono::Utc,
8    parking_lot::RwLock,
9    std::{cmp::Ordering, sync::Arc},
10};
11
/// A hybrid logical clock (HLC) timestamp.
///
/// Pairs a physical wall-clock reading with a logical counter. The derived
/// ordering compares the physical time first and only then the logical
/// counter, following the declaration order of the fields.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct HlcTimestamp {
    /// Physical (wall-clock) component.
    ///
    /// Expressed in nanoseconds since the Unix epoch.
    pt: i64,

    /// Logical component.
    ///
    /// Orders events that share the same wall-clock time.
    lc: u64,
}
25
26impl Default for HlcTimestamp {
27    fn default() -> Self {
28        Self::new()
29    }
30}
31
32impl HlcTimestamp {
33    /// Creates a new HLC timestamp.
34    pub fn new() -> Self {
35        Self {
36            pt: UtcTimestamp::now(),
37            lc: 0,
38        }
39    }
40
41    /// Creates a new timestamp with the specified physical time and logical
42    /// clock count.
43    pub fn from_parts(timestamp: i64, count: u64) -> Self {
44        Self {
45            pt: timestamp,
46            lc: count,
47        }
48    }
49
50    /// Returns the wall-clock time in nanoseconds since the Unix epoch.
51    pub fn timestamp(&self) -> i64 {
52        self.pt
53    }
54
55    /// Returns the logical clock value.
56    pub fn count(&self) -> u64 {
57        self.lc
58    }
59}
60
/// Provides the current timestamp in nanoseconds since the Unix epoch.
///
/// This is the source of physical time for [`HlcGenerator`]. The `Default`
/// supertrait is what lets the generator construct its provider on its own.
pub trait CurrentTimestamp: Default {
    /// Returns the current timestamp in nanoseconds since the Unix epoch, or
    /// `None` when the provider cannot supply one.
    fn current_timestamp(&self) -> Option<i64>;

    /// Sets the current timestamp in nanoseconds since the Unix epoch.
    ///
    /// Not every provider supports this: [`UtcTimestamp`] panics, whereas
    /// [`ManualTimestamp`] stores the value.
    fn set_current_timestamp(&self, timestamp: i64);
}
69
/// Implementation of the `CurrentTimestamp` trait using UTC.
///
/// This is the provider [`HlcGenerator`] uses when no type parameter is given.
#[derive(Default)]
pub struct UtcTimestamp;
73
74impl CurrentTimestamp for UtcTimestamp {
75    fn current_timestamp(&self) -> Option<i64> {
76        Utc::now().timestamp_nanos_opt()
77    }
78
79    fn set_current_timestamp(&self, _timestamp: i64) {
80        unimplemented!("Setting current timestamp is not supported for UtcTimestamp");
81    }
82}
83
84impl UtcTimestamp {
85    /// Returns the current timestamp in nanoseconds since the Unix epoch.
86    ///
87    /// This is a convenience method for getting the current timestamp without
88    /// creating an instance of `UtcTimestamp`.
89    pub fn now() -> i64 {
90        Utc::now().timestamp_nanos_opt().unwrap_or(0)
91    }
92}
93
/// Implementation of the `CurrentTimestamp` trait using a manual timestamp.
///
/// The reported time only changes when it is set explicitly.
///
/// Useful for testing purposes.
#[derive(Default)]
pub struct ManualTimestamp {
    /// The current timestamp in nanoseconds since the Unix epoch.
    ///
    /// Kept behind a lock so it can be replaced through a shared reference.
    timestamp: RwLock<i64>,
}
102
103impl CurrentTimestamp for ManualTimestamp {
104    fn current_timestamp(&self) -> Option<i64> {
105        let r = self.timestamp.read();
106        Some(*r)
107    }
108
109    fn set_current_timestamp(&self, timestamp: i64) {
110        let mut w = self.timestamp.write();
111        *w = timestamp;
112    }
113}
114
115impl ManualTimestamp {
116    /// Creates a new `ManualTimestamp` with the specified timestamp.
117    pub fn new(timestamp: i64) -> Self {
118        Self {
119            timestamp: RwLock::new(timestamp),
120        }
121    }
122}
123
/// Hybrid Logical Clock (HLC) generator.
///
/// `T` supplies the physical time and defaults to [`UtcTimestamp`].
pub struct HlcGenerator<T: CurrentTimestamp = UtcTimestamp> {
    /// Clock state (last timestamp and drift limit) behind a shared lock.
    inner: Arc<RwLock<InnerHlcClock>>,
    /// Source of the physical time; also handed out by `ts_provider()`.
    ts_provider: Arc<T>,
}
129
/// Mutable state of the generator, guarded by the lock in `HlcGenerator`.
struct InnerHlcClock {
    /// The maximum drift (in nanoseconds) allowed between an incoming
    /// timestamp and the local physical clock.
    ///
    /// A value of `0` disables the drift check.
    max_drift: usize,

    /// The last timestamp generated by the clock.
    state: HlcTimestamp,
}
138
139impl Default for HlcGenerator {
140    fn default() -> Self {
141        Self::new()
142    }
143}
144
145impl<T: CurrentTimestamp> HlcGenerator<T> {
146    /// Creates a new HLC clock without any drift.
147    ///
148    /// Whenever the clock running on a single node is required, there is no
149    /// need to worry about drift, as no adjustments are made to the clock,
150    /// i.e. [`update()`](HlcGenerator::update) is never called.
151    pub fn new() -> Self {
152        Self::with_max_drift(0)
153    }
154
155    /// Creates a new HLC clock with the specified maximum drift.
156    pub fn with_max_drift(max_drift: usize) -> Self {
157        let ts_provider = Arc::new(T::default());
158        Self {
159            inner: Arc::new(RwLock::new(InnerHlcClock {
160                max_drift,
161                state: HlcTimestamp {
162                    pt: ts_provider.current_timestamp().unwrap_or(0),
163                    lc: 0,
164                },
165            })),
166            ts_provider,
167        }
168    }
169
170    /// Update the maximum drift.
171    pub fn set_max_drift(&self, max_drift: usize) {
172        let mut inner = self.inner.write();
173        inner.max_drift = max_drift;
174    }
175
176    /// Get timestamp provider.
177    ///
178    /// Useful for testing purposes, where manual timestamps are used.
179    pub fn ts_provider(&self) -> Arc<T> {
180        Arc::clone(&self.ts_provider)
181    }
182
183    /// Current timestamp.
184    ///
185    /// Use [`next_timestamp()`](HlcGenerator::next_timestamp) to get the
186    /// timestamp for local or send events.
187    pub fn timestamp(&self) -> HlcTimestamp {
188        let inner = self.inner.read();
189        inner.state.clone()
190    }
191
192    /// Timestamp for the local or send event.
193    pub fn next_timestamp(&self) -> Option<HlcTimestamp> {
194        let mut inner = self.inner.write();
195
196        let timestamp = self.ts_provider.current_timestamp()?;
197        if inner.state.pt >= timestamp {
198            // Known timestamp is not outdated, increment the logical count.
199            inner.state.lc += 1;
200        } else {
201            // Known timestamp is outdated, update timestamp and reset the logical count.
202            inner.state.pt = timestamp;
203            inner.state.lc = 0;
204        }
205        Some(inner.state.clone())
206    }
207
208    /// Adjust the clock based on incoming timestamp.
209    ///
210    /// Usually this happens when a timestamp is received from another node.
211    /// An error may occur if drift is exceeded (if `max_drift` is set to 0,
212    /// then such a check is ignored).
213    ///
214    /// Updated timestamp is returned.
215    pub fn update(&self, incoming_state: &HlcTimestamp) -> HlcResult<HlcTimestamp> {
216        let mut inner = self.inner.write();
217
218        let timestamp = self
219            .ts_provider
220            .current_timestamp()
221            .ok_or(HlcError::OutOfRangeTimestamp)?;
222
223        // Physical clock is ahead of both the incoming timestamp and the current state.
224        if timestamp > incoming_state.pt && timestamp > inner.state.pt {
225            // Update the clock state.
226            inner.state = HlcTimestamp {
227                pt: timestamp,
228                lc: 0,
229            };
230            return Ok(inner.state.clone());
231        }
232
233        match incoming_state.pt.cmp(&inner.state.pt) {
234            // Incoming timestamp is ahead of the current state.
235            Ordering::Greater => {
236                // Check for drift.
237                let drift = (incoming_state.pt - timestamp) as usize;
238                if inner.max_drift > 0 && drift > inner.max_drift {
239                    return Err(HlcError::DriftTooLarge(drift, inner.max_drift));
240                } else {
241                    // Remote timestamp is ahead of the current state. Update local state.
242                    inner.state.pt = incoming_state.pt;
243                    inner.state.lc = incoming_state.lc + 1;
244                }
245            }
246            // Incoming timestamp is behind the current state.
247            Ordering::Less => {
248                // Our timestamp is ahead of the incoming timestamp, so it remains unchanged.
249                // We only need to update the logical count.
250                inner.state.lc += 1;
251            }
252            // Timestamps are equal, so we need to use the maximum logical count for update.
253            Ordering::Equal => {
254                // Timestamps are equal, so we need to use the maximum logical count for update.
255                if incoming_state.lc > inner.state.lc {
256                    inner.state.lc = incoming_state.lc;
257                }
258                inner.state.lc += 1;
259            }
260        };
261
262
263        Ok(inner.state.clone())
264    }
265}