hlc_gen/
lib.rs

1#![doc = include_str!("../README.md")]
2
3mod epoch;
4pub mod error;
5mod timestamp;
6
7pub use timestamp::HlcTimestamp;
8use {
9    crate::{
10        epoch::CUSTOM_EPOCH,
11        error::{HlcError, HlcResult},
12    },
13    chrono::Utc,
14    parking_lot::RwLock,
15    std::cmp::Ordering,
16};
17
/// Source of wall-clock time, expressed in milliseconds since the Unix epoch.
///
/// Implementors must also be [`Default`]-constructible, which lets a generator
/// build its own provider without being handed one.
pub trait CurrentTimestamp
where
    Self: Default,
{
    /// Reports the provider's current time in milliseconds since the Unix
    /// epoch.
    fn current_timestamp(&self) -> i64;

    /// Overrides the provider's current time with `timestamp` (milliseconds
    /// since the Unix epoch).
    ///
    /// Takes `&self`, so implementors that support this need interior
    /// mutability.
    fn set_current_timestamp(&self, timestamp: i64);
}
26
/// Implementation of the `CurrentTimestamp` trait using UTC.
///
/// This is a stateless unit struct, so it is cheap to copy and can be
/// formatted with `{:?}` for diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct UtcTimestamp;
30
31impl CurrentTimestamp for UtcTimestamp {
32    fn current_timestamp(&self) -> i64 {
33        Utc::now().timestamp_millis()
34    }
35
36    fn set_current_timestamp(&self, _timestamp: i64) {
37        unimplemented!("Setting current timestamp is not supported for UtcTimestamp");
38    }
39}
40
41/// Implementation of the `CurrentTimestamp` trait using a manual timestamp.
42///
43/// Useful for testing purposes.
44pub struct ManualTimestamp {
45    /// The current timestamp in milliseconds since the Unix epoch.
46    timestamp: RwLock<i64>,
47}
48
49impl Default for ManualTimestamp {
50    fn default() -> Self {
51        Self::new(CUSTOM_EPOCH)
52    }
53}
54
55impl CurrentTimestamp for ManualTimestamp {
56    fn current_timestamp(&self) -> i64 {
57        let r = self.timestamp.read();
58        *r
59    }
60
61    fn set_current_timestamp(&self, timestamp: i64) {
62        let mut w = self.timestamp.write();
63        *w = timestamp;
64    }
65}
66
67impl ManualTimestamp {
68    /// Creates a new `ManualTimestamp` with the specified timestamp.
69    pub fn new(timestamp: i64) -> Self {
70        Self {
71            timestamp: RwLock::new(timestamp),
72        }
73    }
74}
75
76/// Hybrid Logical Clock (HLC) generator.
77pub struct HlcGenerator<T: CurrentTimestamp = UtcTimestamp> {
78    /// The last timestamp generated by the clock.
79    state: HlcTimestamp,
80
81    /// The maximum drift (in milliseconds) allowed between the physical clock
82    /// and the wall-clock time.
83    max_drift: usize,
84
85    /// The timestamp provider used to get the current timestamp.
86    ts_provider: T,
87}
88
89impl Default for HlcGenerator {
90    fn default() -> Self {
91        Self::new()
92    }
93}
94
95impl<T: CurrentTimestamp> HlcGenerator<T> {
96    /// Creates a new HLC clock without any drift.
97    ///
98    /// Whenever the clock running on a single node is required, there is no
99    /// need to worry about drift, as no adjustments are made to the clock,
100    /// i.e. [`update()`](HlcGenerator::update) is never called.
101    pub fn new() -> Self {
102        Self::with_max_drift(0)
103    }
104
105    /// Creates a new HLC clock with the specified maximum drift.
106    pub fn with_max_drift(max_drift: usize) -> Self {
107        let ts_provider = T::default();
108        let state =
109            HlcTimestamp::from_parts(ts_provider.current_timestamp(), 0).unwrap_or_else(|_| {
110                HlcTimestamp::from_parts(CUSTOM_EPOCH, 0).expect("Invalid timestamp")
111            });
112        Self {
113            state,
114            max_drift,
115            ts_provider,
116        }
117    }
118
119    /// Get timestamp provider.
120    ///
121    /// Useful for testing purposes, where manual timestamps are used.
122    pub fn ts_provider(&self) -> &T {
123        &self.ts_provider
124    }
125
126    /// Current timestamp.
127    ///
128    /// Use [`next_timestamp()`](HlcGenerator::next_timestamp) to get the
129    /// timestamp for local or send events.
130    pub fn timestamp(&self) -> HlcTimestamp {
131        self.state.clone()
132    }
133
134    /// Timestamp for the local or send event.
135    pub fn next_timestamp(&self) -> Option<HlcTimestamp> {
136        let timestamp = self.ts_provider.current_timestamp();
137
138        self.state
139            .update(move |pt, lc| {
140                // Update the physical time and increment the logical count.
141                if pt >= timestamp {
142                    Ok((pt, lc + 1))
143                } else {
144                    Ok((timestamp, 0))
145                }
146            })
147            .ok()
148    }
149
150    /// Adjust the clock based on incoming timestamp.
151    ///
152    /// Usually this happens when a timestamp is received from another node.
153    /// An error may occur if drift is exceeded (if `max_drift` is set to 0,
154    /// then such a check is ignored).
155    ///
156    /// Updated timestamp is returned.
157    pub fn update(&self, incoming_state: &HlcTimestamp) -> HlcResult<HlcTimestamp> {
158        let max_drift = self.max_drift;
159        let timestamp = self.ts_provider.current_timestamp();
160
161        self.state.update(move |pt, lc| {
162            let (incoming_pt, incoming_lc) = incoming_state.parts();
163
164            // Physical clock is ahead of both the incoming timestamp and the current state.
165            if timestamp > incoming_pt && timestamp > pt {
166                // Update the clock state.
167                return Ok((timestamp, 0));
168            }
169
170            match incoming_pt.cmp(&pt) {
171                // Incoming timestamp is ahead of the current state.
172                Ordering::Greater => {
173                    // Check for drift.
174                    let drift = (incoming_pt - timestamp) as usize;
175                    if max_drift > 0 && drift > max_drift {
176                        Err(HlcError::DriftTooLarge(drift, max_drift))
177                    } else {
178                        // Remote timestamp is ahead of the current state. Update local state.
179                        Ok((incoming_pt, incoming_lc + 1))
180                    }
181                }
182                // Incoming timestamp is behind the current state.
183                Ordering::Less => {
184                    // Our timestamp is ahead of the incoming timestamp, so it remains unchanged.
185                    // We only need to update the logical count.
186                    Ok((pt, lc + 1))
187                }
188                // Timestamps are equal, so we need to use the maximum logical count for update.
189                Ordering::Equal => {
190                    // Timestamps are equal, so we need to use the maximum logical count for update.
191                    Ok((pt, lc.max(incoming_lc) + 1))
192                }
193            }
194        })
195    }
196}