hlc_gen/lib.rs
1#![doc = include_str!("../README.md")]
2
3mod epoch;
4pub mod error;
5mod timestamp;
6
7pub use timestamp::HlcTimestamp;
8use {
9 crate::{
10 epoch::CUSTOM_EPOCH,
11 error::{HlcError, HlcResult},
12 },
13 chrono::Utc,
14 parking_lot::RwLock,
15 std::cmp::Ordering,
16};
17
/// Source of wall-clock time, expressed in milliseconds since the Unix epoch.
///
/// Implementors must also be [`Default`], so a provider can be created
/// without any arguments.
pub trait CurrentTimestamp: Default {
    /// Reads the provider's current time, in milliseconds since the Unix epoch.
    fn current_timestamp(&self) -> i64;

    /// Overrides the provider's current time with `timestamp` (milliseconds
    /// since the Unix epoch).
    ///
    /// The receiver is `&self`, so an implementation that supports this
    /// operation needs some form of interior mutability.
    fn set_current_timestamp(&self, timestamp: i64);
}
26
27/// Implementation of the `CurrentTimestamp` trait using UTC.
28#[derive(Default)]
29pub struct UtcTimestamp;
30
31impl CurrentTimestamp for UtcTimestamp {
32 fn current_timestamp(&self) -> i64 {
33 Utc::now().timestamp_millis()
34 }
35
36 fn set_current_timestamp(&self, _timestamp: i64) {
37 unimplemented!("Setting current timestamp is not supported for UtcTimestamp");
38 }
39}
40
41/// Implementation of the `CurrentTimestamp` trait using a manual timestamp.
42///
43/// Useful for testing purposes.
44pub struct ManualTimestamp {
45 /// The current timestamp in milliseconds since the Unix epoch.
46 timestamp: RwLock<i64>,
47}
48
49impl Default for ManualTimestamp {
50 fn default() -> Self {
51 Self::new(CUSTOM_EPOCH)
52 }
53}
54
55impl CurrentTimestamp for ManualTimestamp {
56 fn current_timestamp(&self) -> i64 {
57 let r = self.timestamp.read();
58 *r
59 }
60
61 fn set_current_timestamp(&self, timestamp: i64) {
62 let mut w = self.timestamp.write();
63 *w = timestamp;
64 }
65}
66
67impl ManualTimestamp {
68 /// Creates a new `ManualTimestamp` with the specified timestamp.
69 pub fn new(timestamp: i64) -> Self {
70 Self {
71 timestamp: RwLock::new(timestamp),
72 }
73 }
74}
75
76/// Hybrid Logical Clock (HLC) generator.
77pub struct HlcGenerator<T: CurrentTimestamp = UtcTimestamp> {
78 /// The last timestamp generated by the clock.
79 state: HlcTimestamp,
80
81 /// The maximum drift (in milliseconds) allowed between the physical clock
82 /// and the wall-clock time.
83 max_drift: usize,
84
85 /// The timestamp provider used to get the current timestamp.
86 ts_provider: T,
87}
88
89impl Default for HlcGenerator {
90 fn default() -> Self {
91 Self::new()
92 }
93}
94
95impl<T: CurrentTimestamp> HlcGenerator<T> {
96 /// Creates a new HLC clock without any drift.
97 ///
98 /// Whenever the clock running on a single node is required, there is no
99 /// need to worry about drift, as no adjustments are made to the clock,
100 /// i.e. [`update()`](HlcGenerator::update) is never called.
101 pub fn new() -> Self {
102 Self::with_max_drift(0)
103 }
104
105 /// Creates a new HLC clock with the specified maximum drift.
106 pub fn with_max_drift(max_drift: usize) -> Self {
107 let ts_provider = T::default();
108 let state =
109 HlcTimestamp::from_parts(ts_provider.current_timestamp(), 0).unwrap_or_else(|_| {
110 HlcTimestamp::from_parts(CUSTOM_EPOCH, 0).expect("Invalid timestamp")
111 });
112 Self {
113 state,
114 max_drift,
115 ts_provider,
116 }
117 }
118
119 /// Get timestamp provider.
120 ///
121 /// Useful for testing purposes, where manual timestamps are used.
122 pub fn ts_provider(&self) -> &T {
123 &self.ts_provider
124 }
125
126 /// Current timestamp.
127 ///
128 /// Use [`next_timestamp()`](HlcGenerator::next_timestamp) to get the
129 /// timestamp for local or send events.
130 pub fn timestamp(&self) -> HlcTimestamp {
131 self.state.clone()
132 }
133
134 /// Timestamp for the local or send event.
135 pub fn next_timestamp(&self) -> Option<HlcTimestamp> {
136 let timestamp = self.ts_provider.current_timestamp();
137
138 self.state
139 .update(move |pt, lc| {
140 // Update the physical time and increment the logical count.
141 if pt >= timestamp {
142 Ok((pt, lc + 1))
143 } else {
144 Ok((timestamp, 0))
145 }
146 })
147 .ok()
148 }
149
150 /// Adjust the clock based on incoming timestamp.
151 ///
152 /// Usually this happens when a timestamp is received from another node.
153 /// An error may occur if drift is exceeded (if `max_drift` is set to 0,
154 /// then such a check is ignored).
155 ///
156 /// Updated timestamp is returned.
157 pub fn update(&self, incoming_state: &HlcTimestamp) -> HlcResult<HlcTimestamp> {
158 let max_drift = self.max_drift;
159 let timestamp = self.ts_provider.current_timestamp();
160
161 self.state.update(move |pt, lc| {
162 let (incoming_pt, incoming_lc) = incoming_state.parts();
163
164 // Physical clock is ahead of both the incoming timestamp and the current state.
165 if timestamp > incoming_pt && timestamp > pt {
166 // Update the clock state.
167 return Ok((timestamp, 0));
168 }
169
170 match incoming_pt.cmp(&pt) {
171 // Incoming timestamp is ahead of the current state.
172 Ordering::Greater => {
173 // Check for drift.
174 let drift = (incoming_pt - timestamp) as usize;
175 if max_drift > 0 && drift > max_drift {
176 Err(HlcError::DriftTooLarge(drift, max_drift))
177 } else {
178 // Remote timestamp is ahead of the current state. Update local state.
179 Ok((incoming_pt, incoming_lc + 1))
180 }
181 }
182 // Incoming timestamp is behind the current state.
183 Ordering::Less => {
184 // Our timestamp is ahead of the incoming timestamp, so it remains unchanged.
185 // We only need to update the logical count.
186 Ok((pt, lc + 1))
187 }
188 // Timestamps are equal, so we need to use the maximum logical count for update.
189 Ordering::Equal => {
190 // Timestamps are equal, so we need to use the maximum logical count for update.
191 Ok((pt, lc.max(incoming_lc) + 1))
192 }
193 }
194 })
195 }
196}