hlc_gen/lib.rs
1#![doc = include_str!("../README.md")]
2
3pub mod error;
4
5use {
6 crate::error::{HlcError, HlcResult},
7 chrono::Utc,
8 parking_lot::RwLock,
9 std::{cmp::Ordering, sync::Arc},
10};
11
12/// Hybrid logical clock (HLC) timestamp.
13#[derive(Hash, Clone, PartialEq, Eq, PartialOrd, Ord, Debug)]
14pub struct HlcTimestamp {
15 /// Wall-clock time.
16 ///
17 /// Timestamp in nanoseconds since the Unix epoch.
18 pt: i64,
19
20 /// The logical clock value.
21 ///
22 /// Captures causality for events that occur at the same wall-clock time.
23 lc: u64,
24}
25
26impl Default for HlcTimestamp {
27 fn default() -> Self {
28 Self::new()
29 }
30}
31
32impl HlcTimestamp {
33 /// Creates a new HLC timestamp.
34 pub fn new() -> Self {
35 Self {
36 pt: UtcTimestamp::now(),
37 lc: 0,
38 }
39 }
40
41 /// Creates a new timestamp with the specified physical time and logical
42 /// clock count.
43 pub fn from_parts(timestamp: i64, count: u64) -> Self {
44 Self {
45 pt: timestamp,
46 lc: count,
47 }
48 }
49
50 /// Returns the wall-clock time in nanoseconds since the Unix epoch.
51 pub fn timestamp(&self) -> i64 {
52 self.pt
53 }
54
55 /// Returns the logical clock value.
56 pub fn count(&self) -> u64 {
57 self.lc
58 }
59}
60
61/// Provides the current timestamp in nanoseconds since the Unix epoch.
62pub trait CurrentTimestamp: Default {
63 /// Returns the current timestamp in nanoseconds since the Unix epoch.
64 fn current_timestamp(&self) -> Option<i64>;
65
66 /// Sets the current timestamp in nanoseconds since the Unix epoch.
67 fn set_current_timestamp(&self, timestamp: i64);
68}
69
70/// Implementation of the `CurrentTimestamp` trait using UTC.
71#[derive(Default)]
72pub struct UtcTimestamp;
73
74impl CurrentTimestamp for UtcTimestamp {
75 fn current_timestamp(&self) -> Option<i64> {
76 Utc::now().timestamp_nanos_opt()
77 }
78
79 fn set_current_timestamp(&self, _timestamp: i64) {
80 unimplemented!("Setting current timestamp is not supported for UtcTimestamp");
81 }
82}
83
84impl UtcTimestamp {
85 /// Returns the current timestamp in nanoseconds since the Unix epoch.
86 ///
87 /// This is a convenience method for getting the current timestamp without
88 /// creating an instance of `UtcTimestamp`.
89 pub fn now() -> i64 {
90 Utc::now().timestamp_nanos_opt().unwrap_or(0)
91 }
92}
93
94/// Implementation of the `CurrentTimestamp` trait using a manual timestamp.
95///
96/// Useful for testing purposes.
97#[derive(Default)]
98pub struct ManualTimestamp {
99 /// The current timestamp in nanoseconds since the Unix epoch.
100 timestamp: RwLock<i64>,
101}
102
103impl CurrentTimestamp for ManualTimestamp {
104 fn current_timestamp(&self) -> Option<i64> {
105 let r = self.timestamp.read();
106 Some(*r)
107 }
108
109 fn set_current_timestamp(&self, timestamp: i64) {
110 let mut w = self.timestamp.write();
111 *w = timestamp;
112 }
113}
114
115impl ManualTimestamp {
116 /// Creates a new `ManualTimestamp` with the specified timestamp.
117 pub fn new(timestamp: i64) -> Self {
118 Self {
119 timestamp: RwLock::new(timestamp),
120 }
121 }
122}
123
124/// Hybrid Logical Clock (HLC) generator.
125pub struct HlcGenerator<T: CurrentTimestamp = UtcTimestamp> {
126 inner: Arc<RwLock<InnerHlcClock>>,
127 ts_provider: Arc<T>,
128}
129
130struct InnerHlcClock {
131 /// The maximum drift (in nanoseconds) allowed between the physical clock
132 /// and the wall-clock time.
133 max_drift: usize,
134
135 /// The last timestamp generated by the clock.
136 state: HlcTimestamp,
137}
138
139impl Default for HlcGenerator {
140 fn default() -> Self {
141 Self::new()
142 }
143}
144
145impl<T: CurrentTimestamp> HlcGenerator<T> {
146 /// Creates a new HLC clock without any drift.
147 ///
148 /// Whenever the clock running on a single node is required, there is no
149 /// need to worry about drift, as no adjustments are made to the clock,
150 /// i.e. [`update()`](HlcGenerator::update) is never called.
151 pub fn new() -> Self {
152 Self::with_max_drift(0)
153 }
154
155 /// Creates a new HLC clock with the specified maximum drift.
156 pub fn with_max_drift(max_drift: usize) -> Self {
157 let ts_provider = Arc::new(T::default());
158 Self {
159 inner: Arc::new(RwLock::new(InnerHlcClock {
160 max_drift,
161 state: HlcTimestamp {
162 pt: ts_provider.current_timestamp().unwrap_or(0),
163 lc: 0,
164 },
165 })),
166 ts_provider,
167 }
168 }
169
170 /// Update the maximum drift.
171 pub fn set_max_drift(&self, max_drift: usize) {
172 let mut inner = self.inner.write();
173 inner.max_drift = max_drift;
174 }
175
176 /// Get timestamp provider.
177 ///
178 /// Useful for testing purposes, where manual timestamps are used.
179 pub fn ts_provider(&self) -> Arc<T> {
180 Arc::clone(&self.ts_provider)
181 }
182
183 /// Current timestamp.
184 ///
185 /// Use [`next_timestamp()`](HlcGenerator::next_timestamp) to get the
186 /// timestamp for local or send events.
187 pub fn timestamp(&self) -> HlcTimestamp {
188 let inner = self.inner.read();
189 inner.state.clone()
190 }
191
192 /// Timestamp for the local or send event.
193 pub fn next_timestamp(&self) -> Option<HlcTimestamp> {
194 let mut inner = self.inner.write();
195
196 let timestamp = self.ts_provider.current_timestamp()?;
197 if inner.state.pt >= timestamp {
198 // Known timestamp is not outdated, increment the logical count.
199 inner.state.lc += 1;
200 } else {
201 // Known timestamp is outdated, update timestamp and reset the logical count.
202 inner.state.pt = timestamp;
203 inner.state.lc = 0;
204 }
205 Some(inner.state.clone())
206 }
207
208 /// Adjust the clock based on incoming timestamp.
209 ///
210 /// Usually this happens when a timestamp is received from another node.
211 /// An error may occur if drift is exceeded (if `max_drift` is set to 0,
212 /// then such a check is ignored).
213 ///
214 /// Updated timestamp is returned.
215 pub fn update(&self, incoming_state: &HlcTimestamp) -> HlcResult<HlcTimestamp> {
216 let mut inner = self.inner.write();
217
218 let timestamp = self
219 .ts_provider
220 .current_timestamp()
221 .ok_or(HlcError::OutOfRangeTimestamp)?;
222
223 // Physical clock is ahead of both the incoming timestamp and the current state.
224 if timestamp > incoming_state.pt && timestamp > inner.state.pt {
225 // Update the clock state.
226 inner.state = HlcTimestamp {
227 pt: timestamp,
228 lc: 0,
229 };
230 return Ok(inner.state.clone());
231 }
232
233 match incoming_state.pt.cmp(&inner.state.pt) {
234 // Incoming timestamp is ahead of the current state.
235 Ordering::Greater => {
236 // Check for drift.
237 let drift = (incoming_state.pt - timestamp) as usize;
238 if inner.max_drift > 0 && drift > inner.max_drift {
239 return Err(HlcError::DriftTooLarge(drift, inner.max_drift));
240 } else {
241 // Remote timestamp is ahead of the current state. Update local state.
242 inner.state.pt = incoming_state.pt;
243 inner.state.lc = incoming_state.lc + 1;
244 }
245 }
246 // Incoming timestamp is behind the current state.
247 Ordering::Less => {
248 // Our timestamp is ahead of the incoming timestamp, so it remains unchanged.
249 // We only need to update the logical count.
250 inner.state.lc += 1;
251 }
252 // Timestamps are equal, so we need to use the maximum logical count for update.
253 Ordering::Equal => {
254 // Timestamps are equal, so we need to use the maximum logical count for update.
255 if incoming_state.lc > inner.state.lc {
256 inner.state.lc = incoming_state.lc;
257 }
258 inner.state.lc += 1;
259 }
260 };
261
262
263 Ok(inner.state.clone())
264 }
265}