use chrono::{DateTime, TimeDelta, Utc};
pub trait Clock: Send + Sync {
fn wall_now(&self) -> DateTime<Utc>;
}
pub struct SystemClock;
impl Clock for SystemClock {
fn wall_now(&self) -> DateTime<Utc> {
Utc::now()
}
}
fn align_to_epoch(timestamp: DateTime<Utc>, interval: TimeDelta) -> DateTime<Utc> {
let Some(interval_nanos) = interval.num_nanoseconds() else {
return timestamp;
};
if interval_nanos <= 0 {
return timestamp;
}
let Some(ts_nanos) = timestamp
.signed_duration_since(DateTime::<Utc>::UNIX_EPOCH)
.num_nanoseconds()
else {
return timestamp;
};
let aligned = ts_nanos.div_euclid(interval_nanos) * interval_nanos;
DateTime::<Utc>::UNIX_EPOCH + TimeDelta::nanoseconds(aligned)
}
#[derive(Debug, Clone)]
pub struct TickInfo {
pub expected_tick_time: DateTime<Utc>,
pub resynced: bool,
}
pub struct WallClockTimer<C: Clock> {
interval: TimeDelta,
next_tick: DateTime<Utc>,
clock: C,
last_wall: DateTime<Utc>,
last_monotonic: tokio::time::Instant,
}
impl<C: Clock> WallClockTimer<C> {
pub fn try_new(interval: TimeDelta, clock: C) -> Result<Self, crate::Error> {
if interval <= TimeDelta::zero() {
return Err(crate::Error::invalid_config(format!(
"interval must be positive, got {interval:?}",
)));
}
if interval.to_std().is_err() {
return Err(crate::Error::invalid_config(format!(
"interval too large for std::time::Duration: {interval:?}",
)));
}
let now = clock.wall_now();
let next_tick = align_to_epoch(now, interval) + interval;
Ok(Self {
interval,
next_tick,
clock,
last_wall: now,
last_monotonic: tokio::time::Instant::now(),
})
}
pub fn next_tick_time(&self) -> DateTime<Utc> {
self.next_tick
}
pub async fn tick(&mut self) -> TickInfo {
let threshold = self.interval;
loop {
let wall_now = self.clock.wall_now();
let monotonic_now = tokio::time::Instant::now();
let wall_elapsed = wall_now - self.last_wall;
let monotonic_elapsed = TimeDelta::from_std(
monotonic_now.duration_since(self.last_monotonic),
)
.unwrap_or_else(|_| {
tracing::warn!(
"monotonic elapsed exceeds TimeDelta range (~292 years); clamping to TimeDelta::MAX, will trigger resync",
);
TimeDelta::MAX
});
let drift = wall_elapsed - monotonic_elapsed;
if drift.abs() > threshold {
let expected = self.next_tick;
self.next_tick = align_to_epoch(wall_now, self.interval) + self.interval;
self.last_wall = wall_now;
self.last_monotonic = monotonic_now;
let drift_secs = drift.num_nanoseconds().unwrap_or(0) as f64 / 1e9;
tracing::warn!(
"wall clock jumped (drift={drift_secs:+.3}s); re-syncing, tick fires immediately",
);
return TickInfo {
expected_tick_time: expected,
resynced: true,
};
}
let to_next = self.next_tick - wall_now;
if to_next <= TimeDelta::zero() {
let expected = self.next_tick;
self.next_tick = expected + self.interval;
self.last_wall = wall_now;
self.last_monotonic = monotonic_now;
return TickInfo {
expected_tick_time: expected,
resynced: false,
};
}
let sleep_for = to_next.to_std().unwrap_or_else(|_| {
tracing::warn!(
"to_next ({to_next:?}) does not fit in std::time::Duration; sleeping for one interval and retrying",
);
self.interval
.to_std()
.unwrap_or(std::time::Duration::from_secs(1))
});
tokio::time::sleep(sleep_for).await;
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_align_to_epoch_rounds_down() {
let ts = DateTime::from_timestamp(1_000_005, 0).unwrap();
let aligned = align_to_epoch(ts, TimeDelta::try_seconds(10).unwrap());
assert_eq!(aligned.timestamp(), 1_000_000);
}
#[test]
fn test_align_to_epoch_already_aligned() {
let ts = DateTime::from_timestamp(1_000_000, 0).unwrap();
let aligned = align_to_epoch(ts, TimeDelta::try_seconds(10).unwrap());
assert_eq!(aligned, ts);
}
#[test]
fn test_align_to_epoch_sub_second() {
let ts = DateTime::from_timestamp_millis(1_000_000_750).unwrap();
let aligned = align_to_epoch(ts, TimeDelta::try_milliseconds(200).unwrap());
assert_eq!(aligned.timestamp_millis(), 1_000_000_600);
}
#[test]
fn test_align_to_epoch_rounds_down_for_pre_epoch() {
let ts = DateTime::from_timestamp(-5, 0).unwrap();
let aligned = align_to_epoch(ts, TimeDelta::try_seconds(10).unwrap());
assert_eq!(aligned.timestamp(), -10);
let ts = DateTime::from_timestamp(-3, 0).unwrap();
let aligned = align_to_epoch(ts, TimeDelta::try_seconds(5).unwrap());
assert_eq!(aligned.timestamp(), -5);
}
#[test]
fn test_align_to_epoch_unrepresentable_is_identity() {
let ts = DateTime::from_timestamp(500 * 365 * 86400, 0).unwrap();
assert_eq!(align_to_epoch(ts, TimeDelta::try_seconds(1).unwrap()), ts);
}
#[test]
fn test_align_to_epoch_zero_or_negative_interval_is_identity() {
let ts = DateTime::from_timestamp_millis(1_000_000_750).unwrap();
assert_eq!(align_to_epoch(ts, TimeDelta::zero()), ts);
assert_eq!(align_to_epoch(ts, -TimeDelta::try_seconds(1).unwrap()), ts);
}
#[tokio::test(start_paused = true)]
async fn test_try_new_rejects_non_positive_interval() {
for bad in [TimeDelta::zero(), -TimeDelta::try_milliseconds(1).unwrap()] {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let err = WallClockTimer::try_new(bad, clock)
.err()
.unwrap_or_else(|| panic!("expected error for interval {bad:?}"));
assert_eq!(err.kind(), crate::ErrorKind::InvalidConfig);
}
}
#[tokio::test(start_paused = true)]
async fn test_timer_ticks_at_interval() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_milliseconds(20).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock).unwrap();
let t1 = timer.tick().await.expected_tick_time;
let t2 = timer.tick().await.expected_tick_time;
let t3 = timer.tick().await.expected_tick_time;
assert_eq!(t2 - t1, interval);
assert_eq!(t3 - t2, interval);
}
#[tokio::test(start_paused = true)]
async fn test_timer_detects_forward_jump() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_milliseconds(200).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let first = timer.tick().await;
assert!(!first.resynced);
clock.inject_wall_jump(TimeDelta::try_seconds(30).unwrap());
let after_jump = timer.tick().await;
assert!(after_jump.resynced, "expected resync on forward jump");
}
#[tokio::test(start_paused = true)]
async fn test_timer_detects_backward_jump() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_milliseconds(200).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let _ = timer.tick().await;
clock.inject_wall_jump(-TimeDelta::try_seconds(30).unwrap());
let after_jump = timer.tick().await;
assert!(after_jump.resynced, "expected resync on backward jump");
}
#[tokio::test(start_paused = true)]
async fn test_timer_first_tick_is_aligned_to_epoch() {
let anchor = DateTime::from_timestamp_millis(1_000_000_750).unwrap();
let clock = crate::client::test_utils::TokioSyncedClock::with_wall_anchor(anchor);
let timer = WallClockTimer::try_new(TimeDelta::try_seconds(1).unwrap(), clock).unwrap();
assert_eq!(timer.next_tick_time().timestamp_millis(), 1_000_001_000);
}
#[tokio::test(start_paused = true)]
async fn test_timer_subthreshold_forward_drift_does_not_resync() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_seconds(1).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let _ = timer.tick().await;
clock.inject_wall_jump(TimeDelta::try_milliseconds(500).unwrap());
let info = timer.tick().await;
assert!(!info.resynced);
}
#[tokio::test(start_paused = true)]
async fn test_timer_subthreshold_backward_drift_does_not_resync() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_seconds(1).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let _ = timer.tick().await;
clock.inject_wall_jump(-TimeDelta::try_milliseconds(500).unwrap());
let info = timer.tick().await;
assert!(!info.resynced);
}
#[tokio::test(start_paused = true)]
async fn test_timer_at_exact_interval_forward_drift_does_not_resync() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_seconds(1).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let _ = timer.tick().await;
clock.inject_wall_jump(interval);
let info = timer.tick().await;
assert!(
!info.resynced,
"drift of exactly 1× interval should be treated as jitter, not a jump",
);
}
#[tokio::test(start_paused = true)]
async fn test_timer_at_exact_interval_backward_drift_does_not_resync() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_seconds(1).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let _ = timer.tick().await;
clock.inject_wall_jump(-interval);
let info = timer.tick().await;
assert!(
!info.resynced,
"backward drift of exactly 1× interval should be treated as jitter, not a jump",
);
}
#[tokio::test(start_paused = true)]
async fn test_timer_detects_forward_jump_just_over_interval() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_seconds(1).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let _ = timer.tick().await;
clock.inject_wall_jump(TimeDelta::try_milliseconds(1500).unwrap());
let info = timer.tick().await;
assert!(
info.resynced,
"expected resync on forward jump > 1× interval"
);
}
#[tokio::test(start_paused = true)]
async fn test_timer_detects_backward_jump_just_over_interval() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_seconds(1).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let _ = timer.tick().await;
clock.inject_wall_jump(-TimeDelta::try_milliseconds(1500).unwrap());
let info = timer.tick().await;
assert!(
info.resynced,
"expected resync on backward jump > 1× interval"
);
}
#[tokio::test(start_paused = true)]
async fn test_timer_resumes_normal_cadence_after_forward_jump() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_milliseconds(200).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let _ = timer.tick().await;
clock.inject_wall_jump(TimeDelta::try_seconds(30).unwrap());
assert!(timer.tick().await.resynced);
let a = timer.tick().await;
let b = timer.tick().await;
assert!(!a.resynced && !b.resynced);
assert_eq!(b.expected_tick_time - a.expected_tick_time, interval);
}
#[tokio::test(start_paused = true)]
async fn test_timer_resumes_normal_cadence_after_backward_jump() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_milliseconds(200).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let _ = timer.tick().await;
clock.inject_wall_jump(-TimeDelta::try_seconds(30).unwrap());
assert!(timer.tick().await.resynced);
let a = timer.tick().await;
let b = timer.tick().await;
assert!(!a.resynced && !b.resynced);
assert_eq!(b.expected_tick_time - a.expected_tick_time, interval);
}
#[tokio::test(start_paused = true)]
async fn test_timer_resync_expected_tick_time_is_prejump_schedule() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_seconds(1).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let _ = timer.tick().await;
let scheduled_next = timer.next_tick_time();
clock.inject_wall_jump(TimeDelta::try_seconds(30).unwrap());
let info = timer.tick().await;
assert!(info.resynced);
assert_eq!(info.expected_tick_time, scheduled_next);
}
#[tokio::test(start_paused = true)]
async fn test_timer_does_not_resync_on_late_caller() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_milliseconds(200).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock).unwrap();
let _ = timer.tick().await;
tokio::time::sleep(std::time::Duration::from_millis(700)).await;
let info = timer.tick().await;
assert!(
!info.resynced,
"late caller (no wall jump) must not look like a wall-clock jump",
);
}
#[tokio::test(start_paused = true)]
async fn test_timer_absorbs_many_subthreshold_drifts() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_seconds(1).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let _ = timer.tick().await;
for _ in 0..10 {
clock.inject_wall_jump(TimeDelta::try_milliseconds(750).unwrap());
let info = timer.tick().await;
assert!(!info.resynced, "sub-threshold drift should not resync");
}
}
#[tokio::test(start_paused = true)]
async fn test_timer_next_tick_time_realigns_after_jump() {
let clock = crate::client::test_utils::TokioSyncedClock::new();
let interval = TimeDelta::try_seconds(1).unwrap();
let mut timer = WallClockTimer::try_new(interval, clock.clone()).unwrap();
let _ = timer.tick().await;
let before_next = timer.next_tick_time();
clock.inject_wall_jump(TimeDelta::try_seconds(30).unwrap());
let _ = timer.tick().await;
let after_next = timer.next_tick_time();
let shift = (after_next - before_next).num_seconds();
assert!(
(29..=31).contains(&shift),
"expected ~30s shift, got {shift}s"
);
}
}