#![doc(
html_logo_url = "https://www.rust-lang.org/logos/rust-logo-128x128-blk.png",
html_favicon_url = "https://www.rust-lang.org/favicon.ico",
html_root_url = "https://atolab.github.io/uhlc-rs/"
)]
#![cfg_attr(not(feature = "std"), no_std)]
extern crate alloc;
use alloc::{format, string::String};
use core::cmp;
use core::time::Duration;
#[cfg(feature = "std")]
use {
lazy_static::lazy_static,
std::env::var,
std::sync::Mutex,
std::time::{SystemTime, UNIX_EPOCH},
};
#[cfg(not(feature = "std"))]
use spin::Mutex;
mod id;
pub use id::*;
mod ntp64;
pub use ntp64::*;
mod timestamp;
pub use timestamp::*;
/// Number of least-significant bits of the 64-bit time value that are cleared
/// from every physical clock reading before it is used (see `LMASK`).
pub const CSIZE: u8 = 4u8;
// Bit mask selecting the `CSIZE` least-significant bits of a time value.
const CMASK: u64 = (1u64 << CSIZE) - 1u64;
// Complement of `CMASK`: applied to clock readings to zero their low `CSIZE` bits.
// NOTE(review): those low bits look like the logical counter that the `+= 1`
// increments in `HLC` advance - confirm against the crate documentation.
const LMASK: u64 = !CMASK;
// Maximum accepted delta, in milliseconds, used when the `UHLC_MAX_DELTA_MS`
// environment variable is not present, and always when built without `std`.
const DEFAULT_DELTA_MS: u64 = 500;
// Default maximum delta (in milliseconds) given to every freshly built `HLCBuilder`.
//
// With `std`, the value is read once, on first access, from the
// `UHLC_MAX_DELTA_MS` environment variable:
// - variable absent            -> `DEFAULT_DELTA_MS`
// - variable present and valid -> its value parsed as `u64`
// - anything else              -> panic (unparsable value or other `VarError`)
#[cfg(feature = "std")]
lazy_static! {
    static ref DELTA_MS: u64 = {
        let raw = var("UHLC_MAX_DELTA_MS");
        match raw {
            Err(std::env::VarError::NotPresent) => DEFAULT_DELTA_MS,
            Err(e) => panic!(
                "Error parsing environment variable ${{UHLC_MAX_DELTA_MS}}: {}",
                e
            ),
            Ok(s) => match s.parse::<u64>() {
                Ok(millis) => millis,
                Err(e) => panic!(
                    "Error parsing environment variable ${{UHLC_MAX_DELTA_MS}}={} : {}",
                    s, e
                ),
            },
        }
    };
}
// Without `std` there is no environment to consult: always use the built-in default.
#[cfg(not(feature = "std"))]
static DELTA_MS: &u64 = &DEFAULT_DELTA_MS;
/// Builder used to configure and create an [`HLC`].
///
/// It wraps an `HLC` value whose fields are overwritten by the `with_*`
/// methods and which is handed out as-is by `build()`.
pub struct HLCBuilder {
/// The clock being configured; returned unchanged by `build()`.
hlc: HLC,
}
impl HLCBuilder {
pub fn new() -> HLCBuilder {
HLCBuilder::default()
}
pub fn with_id(mut self, id: ID) -> HLCBuilder {
self.hlc.id = id;
self
}
pub fn with_clock(mut self, clock: fn() -> NTP64) -> HLCBuilder {
self.hlc.clock = clock;
self
}
pub fn with_max_delta(mut self, delta: Duration) -> HLCBuilder {
self.hlc.delta = delta.into();
self
}
pub fn build(self) -> HLC {
self.hlc
}
}
impl Default for HLCBuilder {
fn default() -> Self {
HLCBuilder {
hlc: HLC {
id: uuid::Uuid::new_v4().into(),
#[cfg(feature = "std")]
clock: system_time_clock,
#[cfg(not(feature = "std"))]
clock: zero_clock,
delta: NTP64::from(Duration::from_millis(*DELTA_MS)),
last_time: Default::default(),
},
}
}
}
/// A clock that issues [`Timestamp`]s made of a time value and this clock's
/// identifier, and that can be advanced by timestamps received from elsewhere.
///
/// Instances are created through [`HLCBuilder`] or `HLC::default()`.
pub struct HLC {
/// Identifier stamped on every timestamp returned by `new_timestamp`.
id: ID,
/// Function called to read the physical time.
clock: fn() -> NTP64,
/// Maximum amount by which an incoming timestamp may be ahead of the local
/// clock reading before `update_with_timestamp` rejects it.
delta: NTP64,
/// Latest time recorded by this clock, behind a mutex so that it can be
/// updated through a shared `&self` reference.
last_time: Mutex<NTP64>,
}
// `lock!(mutex)` evaluates to a guard for `mutex`.
//
// `std` flavour (the file imports `std::sync::Mutex` under this feature):
// `try_lock()` is attempted first. If it returns any error, the macro falls
// back to the blocking `lock()` and unwraps its result, so it panics if that
// call returns an error.
#[cfg(feature = "std")]
macro_rules! lock {
($var:expr) => {
match $var.try_lock() {
Ok(guard) => guard,
Err(_) => $var.lock().unwrap(),
}
};
}
// no_std flavour (the file imports `spin::Mutex` under this configuration):
// the result of `lock()` is used directly as the guard, without unwrapping.
#[cfg(not(feature = "std"))]
macro_rules! lock {
($var:expr) => {
$var.lock()
};
}
impl HLC {
pub fn new_timestamp(&self) -> Timestamp {
let mut now = (self.clock)();
now.0 &= LMASK;
let mut last_time = lock!(self.last_time);
if now.0 > (last_time.0 & LMASK) {
*last_time = now
} else {
*last_time += 1;
}
Timestamp::new(*last_time, self.id)
}
pub fn get_id(&self) -> &ID {
&self.id
}
pub fn get_delta(&self) -> &NTP64 {
&self.delta
}
pub fn update_with_timestamp(&self, timestamp: &Timestamp) -> Result<(), String> {
let mut now = (self.clock)();
now.0 &= LMASK;
let msg_time = timestamp.get_time();
if *msg_time > now && *msg_time - now > self.delta {
let err_msg = format!(
"incoming timestamp from {} exceeding delta {}ms is rejected: {} vs. now: {}",
timestamp.get_id(),
self.delta.to_duration().as_millis(),
msg_time,
now
);
#[cfg(feature = "std")]
log::warn!("{}", err_msg);
#[cfg(feature = "defmt")]
defmt::warn!("{}", err_msg);
Err(err_msg)
} else {
let mut last_time = lock!(self.last_time);
let max_time = cmp::max(cmp::max(now, *msg_time), *last_time);
if max_time == now {
*last_time = now;
} else if max_time == *msg_time {
*last_time = *msg_time + 1;
} else {
*last_time += 1;
}
Ok(())
}
}
}
impl Default for HLC {
    /// Creates an [`HLC`] from an unmodified default [`HLCBuilder`].
    fn default() -> Self {
        let builder = HLCBuilder::default();
        builder.build()
    }
}
/// Physical clock based on the system time: returns the duration elapsed
/// since `UNIX_EPOCH`, converted to an [`NTP64`].
///
/// # Panics
///
/// Panics if the system time is earlier than `UNIX_EPOCH`.
#[inline]
#[cfg(feature = "std")]
pub fn system_time_clock() -> NTP64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    NTP64::from(since_epoch)
}
/// Clock function that always returns `NTP64(0)`.
///
/// `HLCBuilder::default()` installs it as the physical clock when the `std`
/// feature is disabled.
#[inline]
pub fn zero_clock() -> NTP64 {
NTP64(0)
}
#[cfg(test)]
mod tests {
use crate::*;
use async_std::sync::Arc;
use async_std::task;
use core::convert::TryFrom;
use core::time::Duration;
use futures::join;
/// Returns `true` when every timestamp in `vec` is strictly greater than the
/// one before it.
///
/// Panics if `vec` is empty.
fn is_sorted(vec: &[Timestamp]) -> bool {
    let (head, tail) = vec.split_first().unwrap();
    // Carry the previous element along; bail out with `None` on the first
    // element that is not strictly greater than its predecessor.
    tail.iter()
        .try_fold(head, |prev, cur| if cur > prev { Some(cur) } else { None })
        .is_some()
}
/// Runs four clocks concurrently, arranged in a ring
/// (hlc0 -> hlc1 -> hlc2 -> hlc3 -> hlc0): each task draws `NB_TIME`
/// timestamps from one clock and feeds every one of them to the next clock.
///
/// Checks that each clock's own timestamps are strictly increasing and that
/// no timestamp is duplicated across the four clocks.
#[test]
fn hlc_parallel() {
    use alloc::vec::Vec;
    task::block_on(async {
        // Number of timestamps generated by each task.
        const NB_TIME: usize = 10000;

        let id0: ID = ID::try_from([0x01]).unwrap();
        let id1: ID = ID::try_from([0x02]).unwrap();
        let id2: ID = ID::try_from([0x03]).unwrap();
        let id3: ID = ID::try_from([0x04]).unwrap();
        let hlc0 = Arc::new(HLCBuilder::new().with_id(id0).build());
        let hlc1 = Arc::new(HLCBuilder::new().with_id(id1).build());
        let hlc2 = Arc::new(HLCBuilder::new().with_id(id2).build());
        let hlc3 = Arc::new(HLCBuilder::new().with_id(id3).build());

        // Spawns a task generating NB_TIME timestamps with `source`, updating
        // `sink` with each of them, and returning them in generation order.
        // A single helper keeps the four links identical, so one link cannot
        // silently be wired to the wrong clock.
        let spawn_link = |source: &Arc<HLC>, sink: &Arc<HLC>| {
            let source = Arc::clone(source);
            let sink = Arc::clone(sink);
            task::spawn(async move {
                let mut times: Vec<Timestamp> = Vec::with_capacity(NB_TIME);
                for _ in 0..NB_TIME {
                    let ts = source.new_timestamp();
                    assert!(sink.update_with_timestamp(&ts).is_ok());
                    times.push(ts);
                }
                times
            })
        };

        let t0 = spawn_link(&hlc0, &hlc1);
        let t1 = spawn_link(&hlc1, &hlc2);
        // This link must generate with hlc2 (it previously used a clone of hlc3).
        let t2 = spawn_link(&hlc2, &hlc3);
        let t3 = spawn_link(&hlc3, &hlc0);

        let vecs = join!(t0, t1, t2, t3);

        // Each clock must have produced strictly increasing timestamps.
        assert!(is_sorted(&vecs.0));
        assert!(is_sorted(&vecs.1));
        assert!(is_sorted(&vecs.2));
        assert!(is_sorted(&vecs.3));

        let mut all_times: Vec<Timestamp> = vecs
            .0
            .into_iter()
            .chain(vecs.1)
            .chain(vecs.2)
            .chain(vecs.3)
            .collect();
        assert_eq!(NB_TIME * 4, all_times.len());

        // Removing duplicates must not shrink the list: all timestamps are unique.
        all_times.sort();
        all_times.dedup();
        assert_eq!(NB_TIME * 4, all_times.len());
    });
}
/// Checks how a clock reacts to a timestamp from the past and to one from
/// the future.
#[test]
fn hlc_update_with_timestamp() {
    let hlc_id: ID = uuid::Uuid::new_v4().into();
    let clock = HLCBuilder::new().with_id(hlc_id).build();

    // Updating with a timestamp holding the default (oldest) time is accepted,
    // and timestamps generated afterwards keep increasing.
    let stale = Timestamp::new(NTP64::default(), hlc_id);
    let before = clock.new_timestamp();
    assert!(clock.update_with_timestamp(&stale).is_ok());
    let after = clock.new_timestamp();
    assert!(after > before);

    // Updating with a timestamp 1000 ms ahead of the clock is refused
    // (this relies on the configured maximum delta being below 1000 ms).
    let reference = clock.new_timestamp();
    let ahead_by = NTP64::from(Duration::from_millis(1000));
    let too_far = Timestamp::new(reference.get_time() + ahead_by, hlc_id);
    assert!(clock.update_with_timestamp(&too_far).is_err())
}
/// Pins the in-memory size of `ID` and `Timestamp`, and checks that wrapping
/// either of them in an `Option` does not make it larger.
#[test]
fn stack_sizes() {
    use core::mem::size_of;

    assert_eq!(size_of::<ID>(), 16);
    assert_eq!(size_of::<Option<ID>>(), 16);

    assert_eq!(size_of::<Timestamp>(), 24);
    assert_eq!(size_of::<Option<Timestamp>>(), 24);
}
}