use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use arc_swap::ArcSwap;
/// A source of `u64` timestamps that can be shared across threads.
///
/// The supertrait bounds require every implementor to be `Debug`, `Send`
/// and `Sync`. The trait itself does not fix the unit or the ordering
/// guarantees of the returned value; those are defined by each implementor.
pub trait LogicalClock: Debug + Send + Sync {
/// Returns the clock's current timestamp.
fn now(&self) -> u64;
}
/// Owning handle for a wall-clock-anchored logical clock.
///
/// All state lives in a reference-counted [`DefaultLogicalClockInner`] so it
/// can be shared with the background resynchronization thread.
pub struct DefaultLogicalClock {
// State shared between this handle and the resync thread.
inner: Arc<DefaultLogicalClockInner>,
}
impl Drop for DefaultLogicalClock {
    /// Stops the background resync thread and waits for it to finish before
    /// the clock handle goes away.
    fn drop(&mut self) {
        Self::shutdown(self);
    }
}
/// State shared between a [`DefaultLogicalClock`] handle and its background
/// resynchronization thread.
pub struct DefaultLogicalClockInner {
// Last logical timestamp handed out; advanced with compare-and-swap in `now`.
timestamp: AtomicU64,
// Pair of (wall-clock Unix nanoseconds, monotonic `Instant`) captured
// together; replaced as a whole each time the resync thread runs.
reference: ArcSwap<(u64, Instant)>,
// Keeps the resync loop running while `true`; cleared on shutdown.
resync_enabled: AtomicBool,
// Join handle of the resync thread; taken (left as `None`) on shutdown.
resync_handle: Mutex<Option<JoinHandle<()>>>,
// Park timeout used by the resync thread between reference refreshes.
resync_interval: Duration,
}
impl Default for DefaultLogicalClock {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Debug for DefaultLogicalClock {
    /// Formats a snapshot of the clock: the last issued logical timestamp, the
    /// raw wall-clock reading, and the reference-anchored time estimate.
    ///
    /// The three values are sampled one after another, so they are not an
    /// atomic snapshot.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("DefaultLogicalClock");

        let logical = self.current_timestamp();
        builder.field("current_timestamp", &logical);

        let wall_ns = Self::current_unix_ns();
        builder.field("current_unix_ns", &wall_ns);

        let anchored_ns = self.current_time_ns();
        builder.field("current_time_ns", &anchored_ns);

        builder.finish()
    }
}
impl DefaultLogicalClock {
    /// Creates a clock anchored to the current wall-clock time and starts its
    /// background resynchronization thread.
    ///
    /// The logical timestamp is seeded with the current Unix time in
    /// nanoseconds, and the reference pair is refreshed by the background
    /// thread using a one-second park timeout between refreshes.
    pub fn new() -> Self {
        // Capture both readings back to back so the pair describes (as closely
        // as possible) the same moment.
        let reference_unix = Self::current_unix_ns();
        let reference_time = Instant::now();
        let clock = Self {
            inner: Arc::new(DefaultLogicalClockInner {
                timestamp: AtomicU64::new(reference_unix),
                reference: ArcSwap::new(Arc::new((reference_unix, reference_time))),
                resync_enabled: AtomicBool::new(true),
                resync_handle: Mutex::new(None),
                resync_interval: Duration::from_secs(1),
            }),
        };
        clock.spawn_clock_sync();
        clock
    }

    /// Returns the most recently issued logical timestamp without advancing it.
    #[inline]
    pub fn current_timestamp(&self) -> u64 {
        self.inner.timestamp.load(Ordering::Acquire)
    }

    /// Returns the system wall-clock time as nanoseconds since the Unix epoch.
    ///
    /// Yields `0` if the system clock reports a time before the epoch. The
    /// nanosecond count is narrowed from `u128` to `u64` with `as`.
    #[inline]
    pub(crate) fn current_unix_ns() -> u64 {
        let timestamp = SystemTime::now().duration_since(UNIX_EPOCH);
        timestamp.unwrap_or_default().as_nanos() as u64
    }

    /// Estimates the current time in Unix nanoseconds from the shared
    /// reference: the stored wall-clock reading plus the monotonic time that
    /// has elapsed since that reading was taken.
    #[inline]
    pub(crate) fn current_time_ns(&self) -> u64 {
        let reference = self.inner.reference.load();
        reference.0 + reference.1.elapsed().as_nanos() as u64
    }

    /// Stops the resync thread and waits for it to exit.
    ///
    /// This runs from `Drop`, so it must not panic while the current thread is
    /// already unwinding (a second panic would abort the process). A poisoned
    /// handle mutex is therefore recovered instead of unwrapped, and a panic
    /// from the resync thread is re-raised only when this thread is not
    /// already panicking.
    fn shutdown(&self) {
        self.inner.resync_enabled.store(false, Ordering::Release);

        // Take the handle in its own statement so the mutex guard is released
        // before the (blocking) join below.
        let handle = self
            .inner
            .resync_handle
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();

        if let Some(handle) = handle {
            // Wake the worker out of `park_timeout` so it notices the flag now
            // rather than after the full resync interval.
            handle.thread().unpark();
            if let Err(payload) = handle.join() {
                if !std::thread::panicking() {
                    std::panic::resume_unwind(payload);
                }
            }
        }
    }

    /// Spawns the background thread that periodically replaces the shared
    /// reference pair with a fresh (wall-clock, `Instant`) reading, and stores
    /// its join handle for `shutdown`.
    fn spawn_clock_sync(&self) {
        let inner = Arc::clone(&self.inner);
        let interval = inner.resync_interval;
        let handle = std::thread::spawn(move || {
            while inner.resync_enabled.load(Ordering::Acquire) {
                std::thread::park_timeout(interval);
                // Woken for shutdown: skip the now-pointless final refresh.
                if !inner.resync_enabled.load(Ordering::Acquire) {
                    break;
                }
                let reference_unix = Self::current_unix_ns();
                let reference_time = Instant::now();
                inner.reference.store(Arc::new((reference_unix, reference_time)));
            }
        });
        *self
            .inner
            .resync_handle
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner()) = Some(handle);
    }
}
impl LogicalClock for DefaultLogicalClock {
    /// Issues the next logical timestamp.
    ///
    /// The result is the reference-anchored time estimate when that is ahead
    /// of the last issued value, otherwise the last issued value plus one. The
    /// value is published with a compare-and-swap, so a timestamp is returned
    /// only by the caller whose swap succeeded; on failure the whole attempt
    /// is retried with an escalating back-off (spin, then yield, then a short
    /// timed park).
    #[inline]
    fn now(&self) -> u64 {
        let issued = &self.inner.timestamp;
        let mut attempts = 0;
        loop {
            // Sample the time estimate before reading the last issued value,
            // then pick whichever moves the clock forward.
            let wall = self.current_time_ns();
            let observed = issued.load(Ordering::Acquire);
            let candidate = if wall <= observed { observed + 1 } else { wall };

            let swapped = issued.compare_exchange_weak(
                observed,
                candidate,
                Ordering::AcqRel,
                Ordering::Acquire,
            );
            if swapped.is_ok() {
                return candidate;
            }

            match attempts {
                0..=9 => std::hint::spin_loop(),
                10..=99 => std::thread::yield_now(),
                _ => std::thread::park_timeout(Duration::from_micros(10)),
            }
            attempts += 1;
        }
    }
}
/// Test-only clock whose reading is set explicitly and never advances on its
/// own.
#[cfg(test)]
#[derive(Debug)]
pub struct MockLogicalClock {
    // Stored as a signed value; `u64` inputs are reinterpreted with `as i64`.
    current_tick: std::sync::atomic::AtomicI64,
}

#[cfg(test)]
impl Default for MockLogicalClock {
    /// Same as [`MockLogicalClock::new`].
    fn default() -> Self {
        MockLogicalClock::new()
    }
}

#[cfg(test)]
impl MockLogicalClock {
    /// Creates a mock clock whose stored tick is `i64::MIN`.
    pub fn new() -> Self {
        let current_tick = std::sync::atomic::AtomicI64::new(i64::MIN);
        Self { current_tick }
    }

    /// Creates a mock clock whose stored tick is `timestamp` cast to `i64`.
    pub fn with_timestamp(timestamp: u64) -> Self {
        let initial = timestamp as i64;
        let current_tick = std::sync::atomic::AtomicI64::new(initial);
        Self { current_tick }
    }

    /// Overwrites the stored tick with `timestamp` cast to `i64`.
    pub fn set_time(&self, timestamp: u64) {
        let tick = timestamp as i64;
        self.current_tick.store(tick, Ordering::SeqCst);
    }
}
#[cfg(test)]
impl LogicalClock for MockLogicalClock {
    /// Returns the stored tick reinterpreted as `u64`; the value does not
    /// change between calls unless it is explicitly overwritten.
    fn now(&self) -> u64 {
        let tick = self.current_tick.load(Ordering::SeqCst);
        tick as u64
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Dropping a freshly created clock must return promptly, i.e. the resync
    /// thread has to be woken rather than left to finish its park timeout.
    #[test]
    fn test_default_clock_immediate_shutdown() {
        for _ in 0..5 {
            let started = Instant::now();
            drop(DefaultLogicalClock::new());
            let elapsed = started.elapsed();
            assert!(
                elapsed < Duration::from_millis(50),
                "Clock took too long to drop: {:?}. Thread may not be shutting down properly.",
                elapsed
            );
        }
    }

    /// Successive readings on one thread never go backwards.
    #[test]
    fn test_default_clock_monotonicity() {
        let clock = DefaultLogicalClock::new();
        let mut last_ts = clock.now();
        for _ in 0..100 {
            let current_ts = clock.now();
            assert!(current_ts >= last_ts, "Clock is not monotonic: {} < {}", current_ts, last_ts);
            last_ts = current_ts;
        }
    }

    /// A reading taken after a short sleep is strictly later than one taken
    /// before it.
    #[test]
    fn test_default_clock_progresses() {
        let clock = DefaultLogicalClock::new();
        let ts1 = clock.now();
        std::thread::sleep(Duration::from_millis(10));
        let ts2 = clock.now();
        assert!(ts2 > ts1, "Clock did not progress over time: {} >= {}", ts1, ts2);
    }

    /// Readings collected from several threads are, once sorted, strictly
    /// increasing.
    #[test]
    fn test_default_clock_concurrent_monotonicity() {
        use std::sync::Mutex as StdMutex;

        let clock = Arc::new(DefaultLogicalClock::new());
        let all_values = Arc::new(StdMutex::new(Vec::new()));

        let workers: Vec<_> = (0..10)
            .map(|i| {
                let clock = Arc::clone(&clock);
                let sink = Arc::clone(&all_values);
                std::thread::spawn(move || {
                    for _ in 0..100 {
                        let ts = clock.now();
                        sink.lock().unwrap().push((i, ts));
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        let mut values = all_values.lock().unwrap();
        values.sort_by_key(|entry| entry.1);

        let mut last = 0;
        for &(thread_id, ts) in values.iter() {
            assert!(
                ts > last,
                "Clock not monotonic: thread {} got {}, but previous was {}",
                thread_id,
                ts,
                last
            );
            last = ts;
        }
    }

    /// No timestamp is ever handed out twice, even under contention.
    #[test]
    fn test_default_clock_no_duplicates() {
        use std::collections::HashSet;

        let clock = Arc::new(DefaultLogicalClock::new());
        let seen = Arc::new(Mutex::new(HashSet::new()));

        let workers: Vec<_> = (0..5)
            .map(|_| {
                let clock = Arc::clone(&clock);
                let seen = Arc::clone(&seen);
                std::thread::spawn(move || {
                    for _ in 0..1000 {
                        let ts = clock.now();
                        let mut seen_guard = seen.lock().unwrap();
                        assert!(seen_guard.insert(ts), "Duplicate timestamp: {}", ts);
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
    }

    /// The logical clock starts near wall time and does not run away from it
    /// after a burst of calls.
    #[test]
    fn test_default_clock_stays_close_to_wall_time() {
        let clock = DefaultLogicalClock::new();

        let ts1 = clock.now();
        let wall1 = DefaultLogicalClock::current_unix_ns();
        let diff1 = ts1.abs_diff(wall1);
        assert!(diff1 < 1_000_000_000, "Clock too far from wall time: {} ns difference", diff1);

        (0..10000).for_each(|_| {
            clock.now();
        });

        let ts2 = clock.now();
        let wall2 = DefaultLogicalClock::current_unix_ns();
        let diff2 = ts2.abs_diff(wall2);
        assert!(
            diff2 < 100_000_000,
            "Clock drifted too far from wall time: {} ns difference",
            diff2
        );
    }
}