hlc_gen/
timestamp.rs

1use core::fmt;
2use core::ops::{Add, AddAssign, Sub, SubAssign};
3use core::sync::atomic::{AtomicU64, Ordering};
4
5use crate::epoch::CustomEpochTimestamp;
6use crate::error::{HlcError, HlcResult};
7
// Bit layout of the raw `u64`: physical time in the high bits, logical counter
// in the low bits. These are plain compile-time values, so they are `const`
// items rather than `static`s: nothing needs a fixed address and `const`
// values can be used freely in other constant expressions.

/// Number of bits to represent physical time in milliseconds since custom
/// epoch.
const PT_BITS: u8 = 42;

/// Maximum value for physical time (all [`PT_BITS`] bits set).
const PT_MAX: u64 = (1_u64 << PT_BITS) - 1;

/// Number of bits to represent logical clock counter.
const LC_BITS: u8 = 22;

/// Maximum value for logical clock (all [`LC_BITS`] bits set); also serves as
/// the mask that extracts the counter from a raw value.
const LC_MAX: u64 = (1_u64 << LC_BITS) - 1;
20
/// Hybrid logical clock (HLC) timestamp.
///
/// This is a thin wrapper around the raw `u64` representation of an HLC
/// timestamp.
///
/// The upper 42 bits hold the physical time in milliseconds since a custom
/// epoch, and the lower 22 bits hold the logical clock count.
///
/// Normally, you don't need to worry about the details of the representation:
///
/// - [`new()`](Self::new()) creates a timestamp from a Unix timestamp (in ms)
///   with the logical clock count set to zero;
/// - [`from_parts()`](Self::from_parts()) creates a timestamp from a Unix
///   timestamp (in ms) and an explicit logical clock count;
/// - [`parts()`](Self::parts()) returns both components as a `(pt, lc)` tuple;
/// - [`timestamp()`](Self::timestamp()) and [`count()`](Self::count()) return
///   the physical time and the logical clock count individually;
/// - [`as_u64()`](Self::as_u64()) returns the raw data.
///
/// Because the physical time occupies the most significant bits, comparing
/// raw values (and therefore the derived `Ord`) orders timestamps by physical
/// time first and by logical clock count second. The raw value is intended to
/// be monotonically increasing and to capture the happens-before
/// relationship.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct HlcTimestamp(u64);
48
49impl fmt::Display for HlcTimestamp {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        write!(
52            f,
53            "HlcTimestamp {{ timestamp: {}, count: {} }}",
54            self.timestamp(),
55            self.count()
56        )
57    }
58}
59
60impl TryFrom<u64> for HlcTimestamp {
61    type Error = HlcError;
62
63    fn try_from(value: u64) -> Result<Self, Self::Error> {
64        let pt = (value >> LC_BITS) & PT_MAX;
65        let lc = value & LC_MAX;
66        Self::from_parts(CustomEpochTimestamp::to_unix_timestamp(pt), lc)
67    }
68}
69
70macro_rules! impl_sub {
71    ($lhs:ty, $rhs:ty) => {
72        impl Sub<$rhs> for $lhs {
73            type Output = i64;
74
75            fn sub(self, rhs: $rhs) -> Self::Output {
76                let pt1 = ((self.0 >> LC_BITS) & PT_MAX) as i64;
77                let pt2 = ((rhs.0 >> LC_BITS) & PT_MAX) as i64;
78                pt1 - pt2
79            }
80        }
81    };
82}
83
84impl_sub!(HlcTimestamp, HlcTimestamp);
85impl_sub!(&HlcTimestamp, &HlcTimestamp);
86impl_sub!(HlcTimestamp, &HlcTimestamp);
87impl_sub!(&HlcTimestamp, HlcTimestamp);
88
89impl Sub<u64> for HlcTimestamp {
90    type Output = Self;
91
92    fn sub(self, ts: u64) -> Self::Output {
93        let (pt, lc) = self.split();
94        Self((pt.wrapping_sub(ts) << LC_BITS) | lc)
95    }
96}
97
98impl SubAssign<u64> for HlcTimestamp {
99    fn sub_assign(&mut self, ts: u64) {
100        let (pt, lc) = self.split();
101        self.0 = (pt.wrapping_sub(ts) << LC_BITS) | lc;
102    }
103}
104
105impl Add<u64> for HlcTimestamp {
106    type Output = Self;
107
108    fn add(self, ts: u64) -> Self::Output {
109        let (pt, lc) = self.split();
110        Self((pt.wrapping_add(ts) << LC_BITS) | lc)
111    }
112}
113
114impl AddAssign<u64> for HlcTimestamp {
115    fn add_assign(&mut self, ts: u64) {
116        let (pt, lc) = self.split();
117        self.0 = (pt.wrapping_add(ts) << LC_BITS) | lc;
118    }
119}
120
121impl HlcTimestamp {
122    /// Creates a new HLC timestamp from incoming physical time.
123    pub fn new(unix_timestamp: i64) -> HlcResult<Self> {
124        Self::from_parts(unix_timestamp, 0)
125    }
126
127    /// Creates a new HLC timestamp from the given physical time and logical
128    /// clock count.
129    pub fn from_parts(pt: i64, lc: u64) -> HlcResult<Self> {
130        if pt > PT_MAX as i64 {
131            return Err(HlcError::PhysicalTimeExceedsMax(pt, PT_MAX));
132        }
133        if lc > LC_MAX {
134            return Err(HlcError::LogicalClockExceedsMax(lc, LC_MAX));
135        }
136
137        // Convert the physical time to milliseconds since the custom epoch.
138        let ts = CustomEpochTimestamp::from_unix_timestamp(pt)?;
139
140        let combined = (ts.millis() << LC_BITS) | lc;
141        Ok(Self(combined))
142    }
143
144    /// Unix timestamp in milliseconds.
145    pub fn timestamp(&self) -> i64 {
146        CustomEpochTimestamp::to_unix_timestamp((self.0 >> LC_BITS) & PT_MAX)
147    }
148
149    /// Logical clock count.
150    pub fn count(&self) -> u64 {
151        self.0 & LC_MAX
152    }
153
154    /// Returns the physical time and logical clock count as a tuple.
155    pub fn parts(&self) -> (i64, u64) {
156        (self.timestamp(), self.count())
157    }
158
159    /// Returns the raw `u64` value of the HLC ID.
160    pub fn as_u64(&self) -> u64 {
161        self.0
162    }
163
164    /// Returns *raw* physical time and logical clock count parts.
165    fn split(&self) -> (u64, u64) {
166        let pt = (self.0 >> LC_BITS) & PT_MAX;
167        let lc = self.0 & LC_MAX;
168        (pt, lc)
169    }
170}
171
/// Atomic cell holding the raw `u64` representation of an HLC timestamp.
///
/// The value is stored in an `AtomicU64`, so physical time and logical clock
/// count are always read and replaced together as a single word.
#[derive(Debug)]
pub struct HlcAtomicTimestamp(AtomicU64);
174
175impl From<HlcTimestamp> for HlcAtomicTimestamp {
176    fn from(ts: HlcTimestamp) -> Self {
177        Self(AtomicU64::new(ts.0))
178    }
179}
180
181impl HlcAtomicTimestamp {
182    /// Sets the physical time and logical clock count.
183    ///
184    /// Expected closure gets the current physical time and logical clock count
185    /// at the moment of the call and must return the new values for both.
186    ///
187    /// This is an atomic operation that ensures thread safety in a lock-free
188    /// fashion. Either both values are updated or none are.
189    pub fn update<F>(&self, new_values: F) -> HlcResult<Self>
190    where
191        F: Fn(i64, u64) -> HlcResult<(i64, u64)>,
192    {
193        loop {
194            let current = self.0.load(Ordering::Acquire);
195
196            // Obtain new values for physical time and logical clock count.
197            let (pt, lc) = new_values(
198                CustomEpochTimestamp::to_unix_timestamp((current >> LC_BITS) & PT_MAX),
199                current & LC_MAX,
200            )?;
201
202            if pt > PT_MAX as i64 {
203                return Err(HlcError::PhysicalTimeExceedsMax(pt, PT_MAX));
204            }
205            if lc > LC_MAX {
206                return Err(HlcError::LogicalClockExceedsMax(lc, LC_MAX));
207            }
208
209            let ts = CustomEpochTimestamp::from_unix_timestamp(pt)?;
210            let new_combined = (ts.millis() << LC_BITS) | lc;
211
212            if self
213                .0
214                .compare_exchange(current, new_combined, Ordering::AcqRel, Ordering::Acquire)
215                .is_ok()
216            {
217                return Ok(Self(AtomicU64::new(new_combined)));
218            }
219        }
220    }
221
222    /// Creates a new HLC timestamp snapshot.
223    pub fn snapshot(&self) -> HlcTimestamp {
224        HlcTimestamp(self.0.load(Ordering::Acquire))
225    }
226}
227
#[cfg(test)]
mod tests {

    use std::sync::Arc;

    use chrono::Utc;

    use super::*;
    use crate::epoch::EPOCH;

    /// Hammers one shared atomic timestamp from several threads and checks
    /// that the surviving value is one that some thread actually wrote.
    #[test]
    fn concurrent_updates_to_atomic_timestamp() {
        let shared = Arc::new(HlcAtomicTimestamp(AtomicU64::new(0)));

        // Ten writers, each performing a hundred updates with distinct
        // physical times.
        let workers: Vec<_> = (0..10)
            .map(|t| {
                let cell = Arc::clone(&shared);
                std::thread::spawn(move || {
                    for i in 0..100 {
                        let _ = cell.update(move |_pt, _lc| Ok((EPOCH + t * 100 + i, 67890)));
                    }
                })
            })
            .collect();

        // Wait for every writer before inspecting the result.
        for worker in workers {
            worker.join().unwrap();
        }

        let final_timestamp = shared.snapshot().timestamp();

        // One of the threads made the last update, so possible values are in
        // range [EPOCH, EPOCH + 1000].
        assert!(final_timestamp >= EPOCH);
        assert!(final_timestamp <= EPOCH + 1000);

        // The last update of any thread is its final iteration (`i == 99`).
        assert!((final_timestamp + 1) % 100 == 0);
    }

    /// Exercises `+`, `-`, `+=`, `-=` with milliseconds and the
    /// timestamp-minus-timestamp difference for every owned/borrowed pairing.
    #[test]
    #[allow(clippy::op_ref)]
    fn arithmetics() {
        let now = Utc::now().timestamp_millis();
        let base = HlcTimestamp::from_parts(now, 123).unwrap();

        // Adding milliseconds shifts the physical time only.
        let later = base + 1000;
        assert_eq!(later.timestamp(), now + 1000);
        assert_eq!(later.count(), 123);

        // Subtracting them again restores the original value.
        let mut scratch = later - 1000;
        assert_eq!(scratch, base);
        assert_eq!(scratch.timestamp(), now);
        assert_eq!(scratch.count(), 123);

        // Same round trip through the in-place operators.
        scratch += 1000;
        assert_eq!(scratch.timestamp(), now + 1000);
        assert_eq!(scratch.count(), 123);

        scratch -= 1000;
        assert_eq!(scratch, base);
        assert_eq!(scratch.timestamp(), now);

        // Difference between two timestamps: value - value.
        assert_eq!(later - base, 1000i64);
        assert_eq!(base - later, -1000i64);

        // Reference - reference.
        assert_eq!(&later - &base, 1000i64);
        assert_eq!(&base - &later, -1000i64);

        // Reference - value.
        assert_eq!(&later - base, 1000i64);
        assert_eq!(&base - later, -1000i64);

        // Value - reference.
        assert_eq!(later - &base, 1000i64);
        assert_eq!(base - &later, -1000i64);
    }
}