1use core::fmt;
2use core::ops::{Add, AddAssign, Sub, SubAssign};
3use core::sync::atomic::{AtomicU64, Ordering};
4
5use crate::epoch::CustomEpochTimestamp;
6use crate::error::{HlcError, HlcResult};
7
// Bit layout of a packed timestamp: the upper `PT_BITS` bits hold the physical
// time and the lower `LC_BITS` bits hold the logical counter. These are plain
// compile-time values, so they are declared `const` rather than `static`.

/// Number of bits used for the physical-time field.
const PT_BITS: u8 = 42;

/// Largest value that fits in the physical-time field (`2^42 - 1`).
const PT_MAX: u64 = (1 << PT_BITS) - 1;

/// Number of bits used for the logical-counter field.
const LC_BITS: u8 = 22;

/// Largest value that fits in the logical-counter field (`2^22 - 1`).
const LC_MAX: u64 = (1 << LC_BITS) - 1;
20
/// Timestamp made of a physical-time part and a logical counter, packed into
/// one `u64`.
///
/// The physical time lives in the most significant bits and the counter in the
/// least significant ones, so the derived ordering compares the physical time
/// first and falls back to the counter on ties. The default value is the
/// all-zero bit pattern.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct HlcTimestamp(u64);
48
49impl fmt::Display for HlcTimestamp {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 write!(
52 f,
53 "HlcTimestamp {{ timestamp: {}, count: {} }}",
54 self.timestamp(),
55 self.count()
56 )
57 }
58}
59
60impl TryFrom<u64> for HlcTimestamp {
61 type Error = HlcError;
62
63 fn try_from(value: u64) -> Result<Self, Self::Error> {
64 let pt = (value >> LC_BITS) & PT_MAX;
65 let lc = value & LC_MAX;
66 Self::from_parts(CustomEpochTimestamp::to_unix_timestamp(pt), lc)
67 }
68}
69
70macro_rules! impl_sub {
71 ($lhs:ty, $rhs:ty) => {
72 impl Sub<$rhs> for $lhs {
73 type Output = i64;
74
75 fn sub(self, rhs: $rhs) -> Self::Output {
76 let pt1 = ((self.0 >> LC_BITS) & PT_MAX) as i64;
77 let pt2 = ((rhs.0 >> LC_BITS) & PT_MAX) as i64;
78 pt1 - pt2
79 }
80 }
81 };
82}
83
84impl_sub!(HlcTimestamp, HlcTimestamp);
85impl_sub!(&HlcTimestamp, &HlcTimestamp);
86impl_sub!(HlcTimestamp, &HlcTimestamp);
87impl_sub!(&HlcTimestamp, HlcTimestamp);
88
89impl Sub<u64> for HlcTimestamp {
90 type Output = Self;
91
92 fn sub(self, ts: u64) -> Self::Output {
93 let (pt, lc) = self.split();
94 Self((pt.wrapping_sub(ts) << LC_BITS) | lc)
95 }
96}
97
98impl SubAssign<u64> for HlcTimestamp {
99 fn sub_assign(&mut self, ts: u64) {
100 let (pt, lc) = self.split();
101 self.0 = (pt.wrapping_sub(ts) << LC_BITS) | lc;
102 }
103}
104
105impl Add<u64> for HlcTimestamp {
106 type Output = Self;
107
108 fn add(self, ts: u64) -> Self::Output {
109 let (pt, lc) = self.split();
110 Self((pt.wrapping_add(ts) << LC_BITS) | lc)
111 }
112}
113
114impl AddAssign<u64> for HlcTimestamp {
115 fn add_assign(&mut self, ts: u64) {
116 let (pt, lc) = self.split();
117 self.0 = (pt.wrapping_add(ts) << LC_BITS) | lc;
118 }
119}
120
121impl HlcTimestamp {
122 pub fn new(unix_timestamp: i64) -> HlcResult<Self> {
124 Self::from_parts(unix_timestamp, 0)
125 }
126
127 pub fn from_parts(pt: i64, lc: u64) -> HlcResult<Self> {
130 if pt > PT_MAX as i64 {
131 return Err(HlcError::PhysicalTimeExceedsMax(pt, PT_MAX));
132 }
133 if lc > LC_MAX {
134 return Err(HlcError::LogicalClockExceedsMax(lc, LC_MAX));
135 }
136
137 let ts = CustomEpochTimestamp::from_unix_timestamp(pt)?;
139
140 let combined = (ts.millis() << LC_BITS) | lc;
141 Ok(Self(combined))
142 }
143
144 pub fn timestamp(&self) -> i64 {
146 CustomEpochTimestamp::to_unix_timestamp((self.0 >> LC_BITS) & PT_MAX)
147 }
148
149 pub fn count(&self) -> u64 {
151 self.0 & LC_MAX
152 }
153
154 pub fn parts(&self) -> (i64, u64) {
156 (self.timestamp(), self.count())
157 }
158
159 pub fn as_u64(&self) -> u64 {
161 self.0
162 }
163
164 fn split(&self) -> (u64, u64) {
166 let pt = (self.0 >> LC_BITS) & PT_MAX;
167 let lc = self.0 & LC_MAX;
168 (pt, lc)
169 }
170}
171
/// Packed timestamp stored in an `AtomicU64`, so it can be read and replaced
/// through a shared reference.
///
/// The stored bits use the same layout as [`HlcTimestamp`].
#[derive(Debug)]
pub struct HlcAtomicTimestamp(AtomicU64);
174
175impl From<HlcTimestamp> for HlcAtomicTimestamp {
176 fn from(ts: HlcTimestamp) -> Self {
177 Self(AtomicU64::new(ts.0))
178 }
179}
180
181impl HlcAtomicTimestamp {
182 pub fn update<F>(&self, new_values: F) -> HlcResult<Self>
190 where
191 F: Fn(i64, u64) -> HlcResult<(i64, u64)>,
192 {
193 loop {
194 let current = self.0.load(Ordering::Acquire);
195
196 let (pt, lc) = new_values(
198 CustomEpochTimestamp::to_unix_timestamp((current >> LC_BITS) & PT_MAX),
199 current & LC_MAX,
200 )?;
201
202 if pt > PT_MAX as i64 {
203 return Err(HlcError::PhysicalTimeExceedsMax(pt, PT_MAX));
204 }
205 if lc > LC_MAX {
206 return Err(HlcError::LogicalClockExceedsMax(lc, LC_MAX));
207 }
208
209 let ts = CustomEpochTimestamp::from_unix_timestamp(pt)?;
210 let new_combined = (ts.millis() << LC_BITS) | lc;
211
212 if self
213 .0
214 .compare_exchange(current, new_combined, Ordering::AcqRel, Ordering::Acquire)
215 .is_ok()
216 {
217 return Ok(Self(AtomicU64::new(new_combined)));
218 }
219 }
220 }
221
222 pub fn snapshot(&self) -> HlcTimestamp {
224 HlcTimestamp(self.0.load(Ordering::Acquire))
225 }
226}
227
#[cfg(test)]
mod tests {

    use std::sync::Arc;

    use chrono::Utc;

    use super::*;
    use crate::epoch::EPOCH;

    /// Hammers one atomic timestamp from ten threads and checks that the final
    /// value is one that some thread actually wrote last.
    #[test]
    fn concurrent_updates_to_atomic_timestamp() {
        let timestamp = Arc::new(HlcAtomicTimestamp(AtomicU64::new(0)));

        let mut handles = vec![];
        for t in 0..10 {
            let timestamp_clone = Arc::clone(&timestamp);
            handles.push(std::thread::spawn(move || {
                for i in 0..100 {
                    let _ =
                        timestamp_clone.update(move |_pt, _lc| Ok((EPOCH + t * 100 + i, 67890)));
                }
            }));
        }
        for handle in handles {
            handle.join().unwrap();
        }

        let final_timestamp = timestamp.snapshot().timestamp();

        assert!(final_timestamp >= EPOCH);
        assert!(final_timestamp <= EPOCH + 1000);

        // Every thread's last write uses i == 99, so the surviving value must
        // end in ...99.
        // NOTE(review): this relies on EPOCH being a multiple of 100 — confirm.
        assert_eq!((final_timestamp + 1) % 100, 0);
    }

    /// Exercises `+`, `-`, `+=`, `-=` with a `u64` offset and the
    /// timestamp-minus-timestamp difference for every owned/borrowed
    /// operand combination.
    #[test]
    #[allow(clippy::op_ref)] // borrowed operands are the point of these assertions
    fn arithmetics() {
        let start = Utc::now().timestamp_millis();
        let t1 = HlcTimestamp::from_parts(start, 123).unwrap();

        let t2 = t1 + 1000;
        assert_eq!(t2.timestamp(), start + 1000);
        assert_eq!(t2.count(), 123);

        let mut t3 = t2 - 1000;
        assert_eq!(t3, t1);
        assert_eq!(t3.timestamp(), start);
        assert_eq!(t3.count(), 123);

        t3 += 1000;
        assert_eq!(t3.timestamp(), start + 1000);
        assert_eq!(t3.count(), 123);

        t3 -= 1000;
        assert_eq!(t3, t1);
        assert_eq!(t3.timestamp(), start);

        assert_eq!(t2 - t1, 1000i64);
        assert_eq!(t1 - t2, -1000i64);

        assert_eq!(&t2 - &t1, 1000i64);
        assert_eq!(&t1 - &t2, -1000i64);

        assert_eq!(&t2 - t1, 1000i64);
        assert_eq!(&t1 - t2, -1000i64);

        assert_eq!(t2 - &t1, 1000i64);
        assert_eq!(t1 - &t2, -1000i64);
    }
}