use {
crate::{
epoch::CustomEpochTimestamp,
error::{HlcError, HlcResult},
},
std::{
ops::{Add, AddAssign, Sub, SubAssign},
sync::atomic::{AtomicU64, Ordering},
},
};
/// Number of bits of the packed `u64` reserved for the physical-time component.
const PT_BITS: u8 = 42;
/// Largest value that fits in the physical-time component (`2^PT_BITS - 1`).
const PT_MAX: u64 = (1u64 << PT_BITS) - 1;
/// Number of bits of the packed `u64` reserved for the logical-counter component.
const LC_BITS: u8 = 22;
/// Largest value that fits in the logical-counter component (`2^LC_BITS - 1`).
const LC_MAX: u64 = (1u64 << LC_BITS) - 1;
/// An HLC timestamp packed into a single `u64`.
///
/// The upper `PT_BITS` bits hold the physical time as the millisecond value of
/// a `CustomEpochTimestamp`; the lower `LC_BITS` bits hold the logical counter.
/// The derived comparison traits operate on the raw `u64`, so values order by
/// physical time first and by logical counter second.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct HlcTimestamp(u64);
/// Renders the timestamp as its Unix timestamp and logical counter.
impl std::fmt::Display for HlcTimestamp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `parts()` yields exactly `(self.timestamp(), self.count())`.
        let (timestamp, count) = self.parts();
        write!(
            f,
            "HlcTimestamp {{ timestamp: {}, count: {} }}",
            timestamp, count
        )
    }
}
/// Interprets a raw `u64` as a packed timestamp.
///
/// The value is split into its physical and logical components and rebuilt
/// through [`HlcTimestamp::from_parts`], so every validation performed there
/// applies here as well.
impl TryFrom<u64> for HlcTimestamp {
    type Error = HlcError;

    fn try_from(value: u64) -> Result<Self, Self::Error> {
        let physical = (value >> LC_BITS) & PT_MAX;
        let counter = value & LC_MAX;
        // `from_parts` expects a Unix timestamp, so convert from the custom epoch first.
        let unix_ts = CustomEpochTimestamp::to_unix_timestamp(physical);
        Self::from_parts(unix_ts, counter)
    }
}
macro_rules! impl_sub {
($lhs:ty, $rhs:ty) => {
impl Sub<$rhs> for $lhs {
type Output = i64;
fn sub(self, rhs: $rhs) -> Self::Output {
let pt1 = ((self.0 >> LC_BITS) & PT_MAX) as i64;
let pt2 = ((rhs.0 >> LC_BITS) & PT_MAX) as i64;
pt1 - pt2
}
}
};
}
impl_sub!(HlcTimestamp, HlcTimestamp);
impl_sub!(&HlcTimestamp, &HlcTimestamp);
impl_sub!(HlcTimestamp, &HlcTimestamp);
impl_sub!(&HlcTimestamp, HlcTimestamp);
/// Moves the physical-time component back by `ts`, keeping the logical counter.
///
/// The subtraction wraps, and the shift back into place discards any bits
/// above the physical-time field, so underflow wraps instead of panicking.
impl Sub<u64> for HlcTimestamp {
    type Output = Self;

    fn sub(self, ts: u64) -> Self::Output {
        let (physical, counter) = self.split();
        let rewound = physical.wrapping_sub(ts);
        Self(counter | (rewound << LC_BITS))
    }
}
/// In-place variant of `HlcTimestamp - u64`: rewinds the physical-time
/// component by `ts` (wrapping) and leaves the logical counter untouched.
impl SubAssign<u64> for HlcTimestamp {
    fn sub_assign(&mut self, ts: u64) {
        let (physical, counter) = self.split();
        let shifted = physical.wrapping_sub(ts) << LC_BITS;
        self.0 = shifted | counter;
    }
}
/// Advances the physical-time component by `ts`, keeping the logical counter.
///
/// The addition wraps, and the shift back into place discards any bits above
/// the physical-time field, so overflow wraps instead of panicking.
impl Add<u64> for HlcTimestamp {
    type Output = Self;

    fn add(self, ts: u64) -> Self::Output {
        let (physical, counter) = self.split();
        let advanced = physical.wrapping_add(ts);
        Self(counter | (advanced << LC_BITS))
    }
}
/// In-place variant of `HlcTimestamp + u64`: advances the physical-time
/// component by `ts` (wrapping) and leaves the logical counter untouched.
impl AddAssign<u64> for HlcTimestamp {
    fn add_assign(&mut self, ts: u64) {
        let (physical, counter) = self.split();
        let shifted = physical.wrapping_add(ts) << LC_BITS;
        self.0 = shifted | counter;
    }
}
impl HlcTimestamp {
    /// Creates a timestamp for `unix_timestamp` with the logical counter set to zero.
    ///
    /// # Errors
    ///
    /// Returns whatever [`HlcTimestamp::from_parts`] returns for `(unix_timestamp, 0)`.
    pub fn new(unix_timestamp: i64) -> HlcResult<Self> {
        Self::from_parts(unix_timestamp, 0)
    }

    /// Builds a timestamp from a Unix timestamp `pt` and a logical counter `lc`.
    ///
    /// # Errors
    ///
    /// * `HlcError::PhysicalTimeExceedsMax` if `pt` is greater than `PT_MAX`.
    /// * `HlcError::LogicalClockExceedsMax` if `lc` is greater than `LC_MAX`.
    /// * Any error returned by `CustomEpochTimestamp::from_unix_timestamp`.
    pub fn from_parts(pt: i64, lc: u64) -> HlcResult<Self> {
        let pt_limit = PT_MAX as i64;
        if pt > pt_limit {
            return Err(HlcError::PhysicalTimeExceedsMax(pt, PT_MAX));
        }
        if lc > LC_MAX {
            return Err(HlcError::LogicalClockExceedsMax(lc, LC_MAX));
        }

        // Physical time is stored relative to the custom epoch, above the counter bits.
        let epoch_ts = CustomEpochTimestamp::from_unix_timestamp(pt)?;
        Ok(Self((epoch_ts.millis() << LC_BITS) | lc))
    }

    /// Returns the physical-time component converted back to a Unix timestamp.
    pub fn timestamp(&self) -> i64 {
        let (physical, _) = self.split();
        CustomEpochTimestamp::to_unix_timestamp(physical)
    }

    /// Returns the logical-counter component.
    pub fn count(&self) -> u64 {
        let (_, counter) = self.split();
        counter
    }

    /// Returns `(timestamp(), count())` as a pair.
    pub fn parts(&self) -> (i64, u64) {
        let (physical, counter) = self.split();
        (CustomEpochTimestamp::to_unix_timestamp(physical), counter)
    }

    /// Returns the raw packed `u64` representation.
    pub fn as_u64(&self) -> u64 {
        self.0
    }

    /// Splits the packed value into `(physical, counter)` without converting
    /// the physical part away from the custom epoch.
    fn split(&self) -> (u64, u64) {
        let raw = self.0;
        ((raw >> LC_BITS) & PT_MAX, raw & LC_MAX)
    }
}
/// An atomically updatable timestamp.
///
/// Wraps an `AtomicU64` that stores the same packed representation as
/// [`HlcTimestamp`], so it can be updated through a shared reference.
#[derive(Debug)]
pub struct HlcAtomicTimestamp(AtomicU64);
/// Creates an atomic timestamp initialised with the packed value of `ts`.
impl From<HlcTimestamp> for HlcAtomicTimestamp {
    fn from(ts: HlcTimestamp) -> Self {
        let raw = ts.0;
        Self(AtomicU64::from(raw))
    }
}
impl HlcAtomicTimestamp {
    /// Atomically replaces the stored timestamp with one derived from its current value.
    ///
    /// `new_values` receives the current `(unix timestamp, logical counter)` and
    /// returns the pair to store. The store is a compare-and-exchange, so if the
    /// value changed in the meantime the closure is invoked again with the fresh
    /// value; it may therefore run more than once.
    ///
    /// On success, returns a new `HlcAtomicTimestamp` holding the value that was stored.
    ///
    /// # Errors
    ///
    /// * Any error returned by `new_values`.
    /// * `HlcError::PhysicalTimeExceedsMax` if the returned physical time exceeds `PT_MAX`.
    /// * `HlcError::LogicalClockExceedsMax` if the returned counter exceeds `LC_MAX`.
    /// * Any error returned by `CustomEpochTimestamp::from_unix_timestamp`.
    pub fn update<F>(&self, new_values: F) -> HlcResult<Self>
    where
        F: Fn(i64, u64) -> HlcResult<(i64, u64)>,
    {
        loop {
            let observed = self.0.load(Ordering::Acquire);
            let observed_pt =
                CustomEpochTimestamp::to_unix_timestamp((observed >> LC_BITS) & PT_MAX);
            let observed_lc = observed & LC_MAX;

            let (pt, lc) = new_values(observed_pt, observed_lc)?;
            if pt > PT_MAX as i64 {
                return Err(HlcError::PhysicalTimeExceedsMax(pt, PT_MAX));
            }
            if lc > LC_MAX {
                return Err(HlcError::LogicalClockExceedsMax(lc, LC_MAX));
            }

            let epoch_ts = CustomEpochTimestamp::from_unix_timestamp(pt)?;
            let packed = (epoch_ts.millis() << LC_BITS) | lc;

            // Only publish if nobody changed the value since it was observed;
            // otherwise start over from the new value.
            match self
                .0
                .compare_exchange(observed, packed, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => return Ok(Self(AtomicU64::new(packed))),
                Err(_) => continue,
            }
        }
    }

    /// Returns the currently stored value as a plain [`HlcTimestamp`].
    pub fn snapshot(&self) -> HlcTimestamp {
        let raw = self.0.load(Ordering::Acquire);
        HlcTimestamp(raw)
    }
}
#[cfg(test)]
mod tests {
    use {super::*, crate::epoch::EPOCH, chrono::Utc, std::sync::Arc};

    /// Ten threads each perform one hundred updates on a shared atomic
    /// timestamp; the final value must be one of the threads' last writes.
    #[test]
    fn concurrent_updates_to_atomic_timestamp() {
        let timestamp = Arc::new(HlcAtomicTimestamp(AtomicU64::new(0)));
        let mut handles = vec![];
        for t in 0..10 {
            let timestamp_clone = Arc::clone(&timestamp);
            handles.push(std::thread::spawn(move || {
                for i in 0..100 {
                    let _ =
                        timestamp_clone.update(move |_pt, _lc| Ok((EPOCH + t * 100 + i, 67890)));
                }
            }));
        }
        for handle in handles {
            handle.join().unwrap();
        }
        let final_timestamp = timestamp.snapshot().timestamp();
        assert!(final_timestamp >= EPOCH);
        assert!(final_timestamp <= EPOCH + 1000);
        // Every thread's last write is `EPOCH + t * 100 + 99`.
        // NOTE(review): this check assumes EPOCH is a multiple of 100 — confirm.
        assert!((final_timestamp + 1) % 100 == 0);
    }

    /// Exercises `+`, `-`, `+=`, `-=` with a `u64` offset and timestamp
    /// subtraction for every owned/borrowed operand combination.
    #[test]
    fn arithmetics() {
        let start = Utc::now().timestamp_millis();
        let t1 = HlcTimestamp::from_parts(start, 123).unwrap();
        let t2 = t1 + 1000;
        assert_eq!(t2.timestamp(), start + 1000);
        assert_eq!(t2.count(), 123);
        let mut t3 = t2 - 1000;
        assert_eq!(t3, t1);
        assert_eq!(t3.timestamp(), start);
        assert_eq!(t3.count(), 123);
        t3 += 1000;
        assert_eq!(t3.timestamp(), start + 1000);
        assert_eq!(t3.count(), 123);
        t3 -= 1000;
        assert_eq!(t3, t1);
        assert_eq!(t3.timestamp(), start);
        assert_eq!(t2 - t1, 1000i64);
        assert_eq!(t1 - t2, -1000i64);
        assert_eq!(&t2 - &t1, 1000i64);
        assert_eq!(&t1 - &t2, -1000i64);
        assert_eq!(&t2 - t1, 1000i64);
        assert_eq!(&t1 - t2, -1000i64);
        assert_eq!(t2 - &t1, 1000i64);
        assert_eq!(t1 - &t2, -1000i64);
    }
}