use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct LogicalTime(pub u64);
impl LogicalTime {
pub const ZERO: LogicalTime = LogicalTime(0);
pub const fn from_nanos(nanos: u64) -> Self {
LogicalTime(nanos)
}
pub const fn from_millis(millis: u64) -> Self {
LogicalTime(millis.saturating_mul(1_000_000))
}
pub const fn as_nanos(self) -> u64 {
self.0
}
}
pub trait Clock: Send + Sync {
fn now(&self) -> LogicalTime;
}
impl<C: Clock + ?Sized> Clock for Arc<C> {
fn now(&self) -> LogicalTime {
(**self).now()
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemClock;
impl Clock for SystemClock {
fn now(&self) -> LogicalTime {
let nanos = SystemTime::now().duration_since(UNIX_EPOCH).map(|d| d.as_nanos() as u64).unwrap_or(0);
LogicalTime(nanos)
}
}
#[derive(Debug, Clone, Default)]
pub struct ManualClock {
watermark: Arc<AtomicU64>,
}
impl ManualClock {
pub fn new() -> Self {
Self { watermark: Arc::new(AtomicU64::new(0)) }
}
pub fn at(start: LogicalTime) -> Self {
Self { watermark: Arc::new(AtomicU64::new(start.0)) }
}
pub fn advance_to(&self, target: LogicalTime) {
self.watermark.fetch_max(target.0, Ordering::SeqCst);
}
pub fn advance_by(&self, delta: LogicalTime) {
self.watermark.fetch_add(delta.0, Ordering::SeqCst);
}
pub fn watermark(&self) -> LogicalTime {
LogicalTime(self.watermark.load(Ordering::SeqCst))
}
}
impl Clock for ManualClock {
fn now(&self) -> LogicalTime {
self.watermark()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn manual_clock_is_monotonic() {
let c = ManualClock::new();
assert_eq!(c.watermark(), LogicalTime::ZERO);
c.advance_to(LogicalTime::from_millis(100));
assert_eq!(c.now(), LogicalTime::from_millis(100));
c.advance_to(LogicalTime::from_millis(50));
assert_eq!(c.now(), LogicalTime::from_millis(100));
c.advance_by(LogicalTime::from_millis(25));
assert_eq!(c.now(), LogicalTime::from_millis(125));
}
#[test]
fn clones_share_watermark() {
let a = ManualClock::new();
let b = a.clone();
a.advance_to(LogicalTime::from_nanos(42));
assert_eq!(b.now(), LogicalTime::from_nanos(42));
}
#[test]
fn system_clock_advances() {
let c = SystemClock;
let t1 = c.now();
assert!(t1.as_nanos() > 0);
}
#[test]
fn arc_dyn_clock_dispatches() {
let c: Arc<dyn Clock> = Arc::new(ManualClock::at(LogicalTime::from_nanos(7)));
assert_eq!(c.now(), LogicalTime::from_nanos(7));
}
}