hlc_gen/lib.rs
1#![doc = include_str!("../README.md")]
2#![cfg_attr(not(feature = "std"), no_std)]
3
4#[cfg(test)]
5#[macro_use]
6extern crate std;
7
8mod epoch;
9pub mod error;
10pub mod source;
11mod timestamp;
12
13use core::cmp::Ordering;
14
15use timestamp::HlcAtomicTimestamp;
16pub use timestamp::HlcTimestamp;
17
18use crate::error::{HlcError, HlcResult};
19use crate::source::{ClockSource, ManualClock, UtcClock};
20
21/// Hybrid Logical Clock (HLC) generator.
22pub struct HlcGenerator<S: ClockSource = UtcClock> {
23 /// The last timestamp generated by the clock.
24 state: HlcAtomicTimestamp,
25
26 /// The maximum drift (in milliseconds) allowed between the physical clock
27 /// and the wall-clock time.
28 max_drift: usize,
29
30 /// The timestamp provider used to get the current timestamp.
31 clock: S,
32}
33
34impl Default for HlcGenerator<UtcClock> {
35 /// Creates a new HLC clock without any drift.
36 fn default() -> Self {
37 Self::new(0)
38 }
39}
40
41impl HlcGenerator<UtcClock> {
42 /// Creates a new HLC clock with the specified maximum drift.
43 ///
44 /// Set `max_drift` to 0, if the clock is running in a single node settings
45 /// (useful for generating timestamp-based monotonically increasing
46 /// IDs). In such settings, there is no need to worry about drift, as no
47 /// adjustments are made to the clock, i.e.
48 /// [`update()`](HlcGenerator::update) is never called.
49 pub fn new(max_drift: usize) -> Self {
50 Self::with_max_drift(max_drift)
51 }
52}
53
54impl HlcGenerator<ManualClock> {
55 /// Creates a new manual HLC clock with the specified maximum drift.
56 ///
57 /// Useful for testing purposes, where manual timestamps are used.
58 pub fn manual(max_drift: usize) -> Self {
59 Self::with_max_drift(max_drift)
60 }
61
62 pub fn set_current_timestamp(&self, timestamp: i64) {
63 self.clock.set_current_timestamp(timestamp);
64 }
65}
66
67impl<S: ClockSource> HlcGenerator<S> {
68 /// Creates a new HLC clock with the specified maximum drift.
69 fn with_max_drift(max_drift: usize) -> Self {
70 let clock = S::default();
71 let state = HlcTimestamp::from_parts(clock.current_timestamp(), 0)
72 .unwrap_or_default()
73 .into();
74 Self {
75 state,
76 max_drift,
77 clock,
78 }
79 }
80
81 /// Current timestamp.
82 ///
83 /// Use [`next_timestamp()`](HlcGenerator::next_timestamp) to get the
84 /// timestamp for local or send events.
85 pub fn timestamp(&self) -> HlcTimestamp {
86 self.state.snapshot()
87 }
88
89 /// Timestamp for the local or send event.
90 pub fn next_timestamp(&self) -> Option<HlcTimestamp> {
91 let timestamp = self.clock.current_timestamp();
92
93 self.state
94 .update(move |pt, lc| {
95 // Update the physical time and increment the logical count.
96 if pt >= timestamp {
97 Ok((pt, lc + 1))
98 } else {
99 Ok((timestamp, 0))
100 }
101 })
102 .map(|ts| ts.snapshot())
103 .ok()
104 }
105
106 /// Adjust the clock based on incoming timestamp.
107 ///
108 /// Usually this happens when a timestamp is received from another node.
109 /// An error may occur if drift is exceeded (if `max_drift` is set to 0,
110 /// then such a check is ignored).
111 ///
112 /// Updated timestamp is returned.
113 pub fn update(&self, incoming_state: &HlcTimestamp) -> HlcResult<HlcTimestamp> {
114 let max_drift = self.max_drift;
115 let timestamp = self.clock.current_timestamp();
116
117 self.state
118 .update(move |pt, lc| {
119 let (incoming_pt, incoming_lc) = incoming_state.parts();
120
121 // Physical clock is ahead of both the incoming timestamp and the current state.
122 if timestamp > incoming_pt && timestamp > pt {
123 // Update the clock state.
124 return Ok((timestamp, 0));
125 }
126
127 match incoming_pt.cmp(&pt) {
128 // Incoming timestamp is ahead of the current state.
129 Ordering::Greater => {
130 // Check for drift.
131 if max_drift > 0 {
132 let drift = usize::try_from(incoming_pt - timestamp)
133 .map_err(|_| HlcError::OutOfRangeTimestamp)?;
134 if drift > max_drift {
135 return Err(HlcError::DriftTooLarge(drift, max_drift));
136 }
137 }
138 // Remote timestamp is ahead of the current state. Update local state.
139 Ok((incoming_pt, incoming_lc + 1))
140 }
141 // Incoming timestamp is behind the current state.
142 Ordering::Less => {
143 // Our timestamp is ahead of the incoming timestamp, so it remains
144 // unchanged. We only need to update the logical
145 // count.
146 Ok((pt, lc + 1))
147 }
148 // Timestamps are equal, so we need to use the maximum logical count for update.
149 Ordering::Equal => {
150 // Timestamps are equal, so we need to use the maximum logical count for
151 // update.
152 Ok((pt, lc.max(incoming_lc) + 1))
153 }
154 }
155 })
156 .map(|ts| ts.snapshot())
157 }
158}