use crate::types::Time;
use parking_lot::Mutex;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::task::Waker;
use std::time::Duration;
use super::wheel::{TimerWheel, WakerBatch};
#[inline]
fn duration_to_nanos_saturating(duration: Duration) -> u64 {
duration.as_nanos().min(u128::from(u64::MAX)) as u64
}
pub trait TimeSource: Send + Sync {
fn now(&self) -> Time;
}
#[derive(Debug)]
pub struct WallClock {
epoch: std::time::Instant,
}
impl WallClock {
#[must_use]
pub fn new() -> Self {
Self {
epoch: std::time::Instant::now(),
}
}
}
impl Default for WallClock {
fn default() -> Self {
Self::new()
}
}
impl TimeSource for WallClock {
fn now(&self) -> Time {
let elapsed = self.epoch.elapsed();
Time::from_nanos(duration_to_nanos_saturating(elapsed))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BrowserClockConfig {
pub max_forward_step: Duration,
pub jitter_floor: Duration,
}
impl Default for BrowserClockConfig {
fn default() -> Self {
Self {
max_forward_step: Duration::from_millis(250),
jitter_floor: Duration::from_millis(1),
}
}
}
#[derive(Debug)]
pub struct BrowserMonotonicClock {
now: AtomicU64,
paused: AtomicBool,
paused_at: AtomicU64,
has_host_sample: AtomicBool,
last_host_sample: AtomicU64,
pending_catch_up_ns: AtomicU64,
max_forward_step_ns: u64,
jitter_floor_ns: u64,
}
impl BrowserMonotonicClock {
#[must_use]
pub fn new(config: BrowserClockConfig) -> Self {
Self {
now: AtomicU64::new(0),
paused: AtomicBool::new(false),
paused_at: AtomicU64::new(0),
has_host_sample: AtomicBool::new(false),
last_host_sample: AtomicU64::new(0),
pending_catch_up_ns: AtomicU64::new(0),
max_forward_step_ns: duration_to_nanos_saturating(config.max_forward_step),
jitter_floor_ns: duration_to_nanos_saturating(config.jitter_floor),
}
}
pub fn observe_host_time(&self, host: Duration) -> Time {
let host_ns = duration_to_nanos_saturating(host);
self.observe_host_nanos(host_ns)
}
pub fn observe_host_nanos(&self, host_ns: u64) -> Time {
if self.paused.load(Ordering::Acquire) {
return Time::from_nanos(self.paused_at.load(Ordering::Acquire));
}
if self
.has_host_sample
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
self.last_host_sample.store(host_ns, Ordering::Release);
return self.now();
}
let previous_host = self.last_host_sample.fetch_max(host_ns, Ordering::AcqRel);
let host_delta = host_ns.saturating_sub(previous_host);
let mut current_pending = self.pending_catch_up_ns.load(Ordering::Acquire);
loop {
let combined_delta = host_delta.saturating_add(current_pending);
if combined_delta < self.jitter_floor_ns {
match self.pending_catch_up_ns.compare_exchange_weak(
current_pending,
combined_delta,
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => return self.now(),
Err(actual) => {
current_pending = actual;
continue;
}
}
}
let applied = if self.max_forward_step_ns == 0 {
combined_delta
} else {
combined_delta.min(self.max_forward_step_ns)
};
let new_pending = combined_delta.saturating_sub(applied);
match self.pending_catch_up_ns.compare_exchange_weak(
current_pending,
new_pending,
Ordering::Release,
Ordering::Acquire,
) {
Ok(_) => return self.advance_now(applied),
Err(actual) => {
current_pending = actual;
}
}
}
}
pub fn suspend(&self) {
let now = self.now.load(Ordering::Acquire);
self.paused_at.store(now, Ordering::Release);
self.paused.store(true, Ordering::Release);
}
pub fn resume(&self) {
self.paused.store(false, Ordering::Release);
self.has_host_sample.store(false, Ordering::Release);
self.pending_catch_up_ns.store(0, Ordering::Release);
}
#[must_use]
#[inline]
pub fn is_suspended(&self) -> bool {
self.paused.load(Ordering::Acquire)
}
#[must_use]
#[inline]
pub fn pending_catch_up(&self) -> Duration {
Duration::from_nanos(self.pending_catch_up_ns.load(Ordering::Acquire))
}
fn advance_now(&self, delta: u64) -> Time {
if delta == 0 {
return self.now();
}
let mut current = self.now.load(Ordering::Acquire);
loop {
let next = current.saturating_add(delta);
match self
.now
.compare_exchange_weak(current, next, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => return Time::from_nanos(next),
Err(actual) => current = actual,
}
}
}
}
impl Default for BrowserMonotonicClock {
fn default() -> Self {
Self::new(BrowserClockConfig::default())
}
}
impl TimeSource for BrowserMonotonicClock {
fn now(&self) -> Time {
if self.paused.load(Ordering::Acquire) {
Time::from_nanos(self.paused_at.load(Ordering::Acquire))
} else {
Time::from_nanos(self.now.load(Ordering::Acquire))
}
}
}
#[derive(Debug)]
pub struct VirtualClock {
now: AtomicU64,
paused: AtomicBool,
frozen_at: AtomicU64,
}
impl VirtualClock {
#[must_use]
pub fn new() -> Self {
Self {
now: AtomicU64::new(0),
paused: AtomicBool::new(false),
frozen_at: AtomicU64::new(0),
}
}
#[must_use]
pub fn starting_at(time: Time) -> Self {
Self {
now: AtomicU64::new(time.as_nanos()),
paused: AtomicBool::new(false),
frozen_at: AtomicU64::new(time.as_nanos()),
}
}
pub fn advance(&self, nanos: u64) {
if !self.paused.load(Ordering::Acquire) {
let mut current = self.now.load(Ordering::Acquire);
loop {
let next = current.saturating_add(nanos);
match self.now.compare_exchange_weak(
current,
next,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
}
}
pub fn advance_to(&self, time: Time) {
if self.paused.load(Ordering::Acquire) {
return;
}
let target = time.as_nanos();
let mut current = self.now.load(Ordering::Acquire);
while current < target {
match self.now.compare_exchange_weak(
current,
target,
Ordering::Release,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(actual) => current = actual,
}
}
}
pub fn set(&self, time: Time) {
let nanos = time.as_nanos();
self.now.store(nanos, Ordering::Release);
if self.paused.load(Ordering::Acquire) {
self.frozen_at.store(nanos, Ordering::Release);
}
}
pub fn pause(&self) {
let current = self.now.load(Ordering::Acquire);
self.frozen_at.store(current, Ordering::Release);
self.paused.store(true, Ordering::Release);
}
pub fn resume(&self) {
self.paused.store(false, Ordering::Release);
}
#[must_use]
#[inline]
pub fn is_paused(&self) -> bool {
self.paused.load(Ordering::Acquire)
}
}
impl Default for VirtualClock {
fn default() -> Self {
Self::new()
}
}
impl TimeSource for VirtualClock {
fn now(&self) -> Time {
if self.paused.load(Ordering::Acquire) {
Time::from_nanos(self.frozen_at.load(Ordering::Acquire))
} else {
Time::from_nanos(self.now.load(Ordering::Acquire))
}
}
}
pub use super::wheel::TimerHandle;
#[derive(Debug)]
pub struct TimerDriver<T: TimeSource = VirtualClock> {
clock: std::sync::Arc<T>,
wheel: Mutex<TimerWheel>,
}
impl<T: TimeSource> TimerDriver<T> {
#[must_use]
pub fn with_clock(clock: std::sync::Arc<T>) -> Self {
let now = clock.now();
Self {
clock,
wheel: Mutex::new(TimerWheel::new_at(now)),
}
}
#[inline]
#[must_use]
pub fn now(&self) -> Time {
self.clock.now()
}
pub fn register(&self, deadline: Time, waker: Waker) -> TimerHandle {
let mut wheel = self.wheel.lock();
let now = self.clock.now();
wheel.synchronize(now);
wheel.register(deadline, waker)
}
pub fn update(&self, handle: &TimerHandle, deadline: Time, waker: Waker) -> TimerHandle {
let mut wheel = self.wheel.lock();
let now = self.clock.now();
wheel.synchronize(now);
if wheel.cancel(handle) {
wheel.register(deadline, waker)
} else {
*handle
}
}
pub fn cancel(&self, handle: &TimerHandle) -> bool {
self.wheel.lock().cancel(handle)
}
#[must_use]
pub fn next_deadline(&self) -> Option<Time> {
let mut wheel = self.wheel.lock();
let now = self.clock.now();
wheel.synchronize(now);
wheel
.next_deadline()
.map(|deadline| if deadline < now { now } else { deadline })
}
pub fn process_timers(&self) -> usize {
let now = self.clock.now();
let expired_wakers = self.collect_expired(now);
let fired = expired_wakers.len();
for waker in expired_wakers {
let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| waker.wake()));
}
fired
}
#[inline]
#[allow(clippy::significant_drop_tightening)]
fn collect_expired(&self, now: Time) -> WakerBatch {
self.wheel.lock().collect_expired(now)
}
#[inline]
#[must_use]
pub fn pending_count(&self) -> usize {
self.wheel.lock().len()
}
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.wheel.lock().is_empty()
}
pub fn clear(&self) {
self.wheel.lock().clear();
}
}
impl TimerDriver<VirtualClock> {
#[must_use]
pub fn new() -> Self {
Self::with_clock(std::sync::Arc::new(VirtualClock::new()))
}
}
impl Default for TimerDriver<VirtualClock> {
fn default() -> Self {
Self::new()
}
}
pub trait TimerDriverApi: Send + Sync + std::fmt::Debug {
fn now(&self) -> Time;
fn register(&self, deadline: Time, waker: Waker) -> TimerHandle;
fn update(&self, handle: &TimerHandle, deadline: Time, waker: Waker) -> TimerHandle;
fn cancel(&self, handle: &TimerHandle) -> bool;
fn next_deadline(&self) -> Option<Time>;
fn process_timers(&self) -> usize;
fn pending_count(&self) -> usize;
fn is_empty(&self) -> bool;
}
impl<T: TimeSource + std::fmt::Debug + 'static> TimerDriverApi for TimerDriver<T> {
fn now(&self) -> Time {
Self::now(self)
}
fn register(&self, deadline: Time, waker: Waker) -> TimerHandle {
Self::register(self, deadline, waker)
}
fn update(&self, handle: &TimerHandle, deadline: Time, waker: Waker) -> TimerHandle {
Self::update(self, handle, deadline, waker)
}
fn cancel(&self, handle: &TimerHandle) -> bool {
Self::cancel(self, handle)
}
fn next_deadline(&self) -> Option<Time> {
Self::next_deadline(self)
}
fn process_timers(&self) -> usize {
Self::process_timers(self)
}
fn pending_count(&self) -> usize {
Self::pending_count(self)
}
fn is_empty(&self) -> bool {
Self::is_empty(self)
}
}
#[derive(Clone)]
pub struct TimerDriverHandle {
inner: Arc<dyn TimerDriverApi>,
}
impl std::fmt::Debug for TimerDriverHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TimerDriverHandle")
.field("pending_count", &self.inner.pending_count())
.finish()
}
}
impl TimerDriverHandle {
#[inline]
pub fn new<T: TimeSource + std::fmt::Debug + 'static>(driver: Arc<TimerDriver<T>>) -> Self {
Self { inner: driver }
}
#[inline]
pub(crate) fn ptr_eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.inner, &other.inner)
}
#[must_use]
pub fn with_wall_clock() -> Self {
let clock = Arc::new(WallClock::new());
let driver = Arc::new(TimerDriver::with_clock(clock));
Self::new(driver)
}
#[must_use]
pub fn with_virtual_clock(clock: Arc<VirtualClock>) -> Self {
let driver = Arc::new(TimerDriver::with_clock(clock));
Self::new(driver)
}
#[must_use]
pub fn with_browser_clock(clock: Arc<BrowserMonotonicClock>) -> Self {
let driver = Arc::new(TimerDriver::with_clock(clock));
Self::new(driver)
}
#[inline]
#[must_use]
pub fn now(&self) -> Time {
self.inner.now()
}
#[inline]
#[must_use]
pub fn register(&self, deadline: Time, waker: Waker) -> TimerHandle {
self.inner.register(deadline, waker)
}
#[inline]
#[must_use]
pub fn update(&self, handle: &TimerHandle, deadline: Time, waker: Waker) -> TimerHandle {
self.inner.update(handle, deadline, waker)
}
#[inline]
#[must_use]
pub fn cancel(&self, handle: &TimerHandle) -> bool {
self.inner.cancel(handle)
}
#[inline]
#[must_use]
pub fn next_deadline(&self) -> Option<Time> {
self.inner.next_deadline()
}
#[inline]
#[must_use]
pub fn process_timers(&self) -> usize {
self.inner.process_timers()
}
#[inline]
#[must_use]
pub fn pending_count(&self) -> usize {
self.inner.pending_count()
}
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::time::{CoalescingConfig, TimerWheelConfig};
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
fn init_test(name: &str) {
crate::test_utils::init_test_logging();
crate::test_phase!(name);
}
#[test]
fn virtual_clock_starts_at_zero() {
init_test("virtual_clock_starts_at_zero");
let clock = VirtualClock::new();
let now = clock.now();
crate::assert_with_log!(now == Time::ZERO, "clock starts at zero", Time::ZERO, now);
crate::test_complete!("virtual_clock_starts_at_zero");
}
#[test]
fn virtual_clock_starting_at() {
init_test("virtual_clock_starting_at");
let clock = VirtualClock::starting_at(Time::from_secs(10));
let now = clock.now();
crate::assert_with_log!(
now == Time::from_secs(10),
"clock starts at 10s",
Time::from_secs(10),
now
);
crate::test_complete!("virtual_clock_starting_at");
}
#[test]
fn virtual_clock_advance() {
init_test("virtual_clock_advance");
let clock = VirtualClock::new();
clock.advance(1_000_000_000); let now = clock.now();
crate::assert_with_log!(
now == Time::from_secs(1),
"advance 1s",
Time::from_secs(1),
now
);
clock.advance(500_000_000); let nanos = clock.now().as_nanos();
crate::assert_with_log!(nanos == 1_500_000_000, "advance 0.5s", 1_500_000_000, nanos);
crate::test_complete!("virtual_clock_advance");
}
#[test]
fn virtual_clock_advance_saturates_at_time_max() {
init_test("virtual_clock_advance_saturates_at_time_max");
let clock = VirtualClock::starting_at(Time::from_nanos(u64::MAX - 5));
clock.advance(10);
let now = clock.now();
crate::assert_with_log!(
now == Time::MAX,
"advance saturates at Time::MAX",
Time::MAX,
now
);
crate::test_complete!("virtual_clock_advance_saturates_at_time_max");
}
#[test]
fn virtual_clock_advance_to() {
init_test("virtual_clock_advance_to");
let clock = VirtualClock::new();
clock.advance_to(Time::from_secs(5));
let now = clock.now();
crate::assert_with_log!(
now == Time::from_secs(5),
"advance_to 5s",
Time::from_secs(5),
now
);
clock.advance_to(Time::from_secs(3));
let now_after = clock.now();
crate::assert_with_log!(
now_after == Time::from_secs(5),
"advance_to past is no-op",
Time::from_secs(5),
now_after
);
crate::test_complete!("virtual_clock_advance_to");
}
#[test]
fn virtual_clock_set() {
init_test("virtual_clock_set");
let clock = VirtualClock::new();
clock.set(Time::from_secs(100));
let now = clock.now();
crate::assert_with_log!(
now == Time::from_secs(100),
"set to 100s",
Time::from_secs(100),
now
);
clock.set(Time::from_secs(50));
let now_back = clock.now();
crate::assert_with_log!(
now_back == Time::from_secs(50),
"set backwards to 50s",
Time::from_secs(50),
now_back
);
crate::test_complete!("virtual_clock_set");
}
#[test]
fn virtual_clock_set_updates_frozen_time_while_paused() {
init_test("virtual_clock_set_updates_frozen_time_while_paused");
let clock = VirtualClock::starting_at(Time::from_secs(1));
clock.pause();
clock.set(Time::from_secs(9));
let now = clock.now();
crate::assert_with_log!(
now == Time::from_secs(9),
"set updates the frozen clock view while paused",
Time::from_secs(9),
now
);
clock.resume();
let resumed_now = clock.now();
crate::assert_with_log!(
resumed_now == Time::from_secs(9),
"resume continues from the explicitly set time",
Time::from_secs(9),
resumed_now
);
crate::test_complete!("virtual_clock_set_updates_frozen_time_while_paused");
}
#[test]
fn virtual_clock_pause_freezes_time() {
init_test("virtual_clock_pause_freezes_time");
let clock = VirtualClock::new();
clock.advance(1_000_000_000); clock.pause();
let paused = clock.is_paused();
crate::assert_with_log!(paused, "is_paused", true, paused);
let frozen = clock.now();
crate::assert_with_log!(
frozen == Time::from_secs(1),
"frozen at 1s",
Time::from_secs(1),
frozen
);
clock.advance(5_000_000_000);
let still_frozen = clock.now();
crate::assert_with_log!(
still_frozen == Time::from_secs(1),
"still frozen at 1s",
Time::from_secs(1),
still_frozen
);
clock.advance_to(Time::from_secs(100));
let still_frozen2 = clock.now();
crate::assert_with_log!(
still_frozen2 == Time::from_secs(1),
"still frozen after advance_to",
Time::from_secs(1),
still_frozen2
);
crate::test_complete!("virtual_clock_pause_freezes_time");
}
#[test]
fn virtual_clock_resume_unfreezes() {
init_test("virtual_clock_resume_unfreezes");
let clock = VirtualClock::new();
clock.advance(1_000_000_000);
clock.pause();
clock.resume();
let resumed = !clock.is_paused();
crate::assert_with_log!(resumed, "not paused", true, resumed);
clock.advance(2_000_000_000);
let now = clock.now();
crate::assert_with_log!(
now == Time::from_secs(3),
"resumed and advanced",
Time::from_secs(3),
now
);
crate::test_complete!("virtual_clock_resume_unfreezes");
}
#[test]
fn browser_clock_first_sample_rebases_without_jump() {
init_test("browser_clock_first_sample_rebases_without_jump");
let clock = BrowserMonotonicClock::default();
crate::assert_with_log!(
clock.now() == Time::ZERO,
"starts at zero",
Time::ZERO,
clock.now()
);
let t = clock.observe_host_time(Duration::from_millis(250));
crate::assert_with_log!(
t == Time::ZERO,
"first sample is baseline only",
Time::ZERO,
t
);
crate::test_complete!("browser_clock_first_sample_rebases_without_jump");
}
#[test]
fn browser_clock_clamps_regression_monotonically() {
init_test("browser_clock_clamps_regression_monotonically");
let clock = BrowserMonotonicClock::new(BrowserClockConfig {
max_forward_step: Duration::ZERO,
jitter_floor: Duration::ZERO,
});
let _ = clock.observe_host_time(Duration::from_millis(100));
let t1 = clock.observe_host_time(Duration::from_millis(130));
crate::assert_with_log!(
t1 == Time::from_millis(30),
"advances with forward host sample",
Time::from_millis(30),
t1
);
let t2 = clock.observe_host_time(Duration::from_millis(120));
crate::assert_with_log!(
t2 == Time::from_millis(30),
"regressed host sample does not move clock backward",
Time::from_millis(30),
t2
);
let t3 = clock.observe_host_time(Duration::from_millis(150));
crate::assert_with_log!(
t3 == Time::from_millis(50),
"clock resumes monotonic progression after regression",
Time::from_millis(50),
t3
);
crate::test_complete!("browser_clock_clamps_regression_monotonically");
}
#[test]
fn browser_clock_jitter_floor_accumulates_small_deltas() {
init_test("browser_clock_jitter_floor_accumulates_small_deltas");
let clock = BrowserMonotonicClock::new(BrowserClockConfig {
max_forward_step: Duration::ZERO,
jitter_floor: Duration::from_millis(10),
});
let _ = clock.observe_host_time(Duration::from_millis(100));
let t1 = clock.observe_host_time(Duration::from_millis(103));
crate::assert_with_log!(t1 == Time::ZERO, "sub-floor delta deferred", Time::ZERO, t1);
crate::assert_with_log!(
clock.pending_catch_up() == Duration::from_millis(3),
"pending catch-up tracks deferred jitter",
Duration::from_millis(3),
clock.pending_catch_up()
);
let t2 = clock.observe_host_time(Duration::from_millis(110));
crate::assert_with_log!(
t2 == Time::from_millis(10),
"accumulated jitter released at floor",
Time::from_millis(10),
t2
);
crate::assert_with_log!(
clock.pending_catch_up() == Duration::ZERO,
"pending catch-up drained",
Duration::ZERO,
clock.pending_catch_up()
);
crate::test_complete!("browser_clock_jitter_floor_accumulates_small_deltas");
}
#[test]
fn browser_clock_limits_catch_up_per_observation() {
init_test("browser_clock_limits_catch_up_per_observation");
let clock = BrowserMonotonicClock::new(BrowserClockConfig {
max_forward_step: Duration::from_millis(50),
jitter_floor: Duration::ZERO,
});
let _ = clock.observe_host_time(Duration::ZERO);
let t1 = clock.observe_host_time(Duration::from_millis(200));
crate::assert_with_log!(
t1 == Time::from_millis(50),
"first catch-up slice capped",
Time::from_millis(50),
t1
);
crate::assert_with_log!(
clock.pending_catch_up() == Duration::from_millis(150),
"remaining catch-up retained",
Duration::from_millis(150),
clock.pending_catch_up()
);
let t2 = clock.observe_host_time(Duration::from_millis(200));
crate::assert_with_log!(
t2 == Time::from_millis(100),
"second slice advances deterministically",
Time::from_millis(100),
t2
);
crate::assert_with_log!(
clock.pending_catch_up() == Duration::from_millis(100),
"catch-up debt decreases by cap",
Duration::from_millis(100),
clock.pending_catch_up()
);
crate::test_complete!("browser_clock_limits_catch_up_per_observation");
}
#[test]
fn browser_clock_suspend_resume_rebases_without_jump() {
init_test("browser_clock_suspend_resume_rebases_without_jump");
let clock = BrowserMonotonicClock::new(BrowserClockConfig {
max_forward_step: Duration::ZERO,
jitter_floor: Duration::ZERO,
});
let _ = clock.observe_host_time(Duration::from_millis(100));
let t1 = clock.observe_host_time(Duration::from_millis(120));
crate::assert_with_log!(
t1 == Time::from_millis(20),
"advances before suspend",
Time::from_millis(20),
t1
);
clock.suspend();
crate::assert_with_log!(
clock.is_suspended(),
"clock suspended",
true,
clock.is_suspended()
);
let t2 = clock.observe_host_time(Duration::from_millis(500));
crate::assert_with_log!(
t2 == Time::from_millis(20),
"suspended clock does not advance",
Time::from_millis(20),
t2
);
clock.resume();
crate::assert_with_log!(
!clock.is_suspended(),
"clock resumed",
false,
clock.is_suspended()
);
let t3 = clock.observe_host_time(Duration::from_millis(700));
crate::assert_with_log!(
t3 == Time::from_millis(20),
"first post-resume sample rebases",
Time::from_millis(20),
t3
);
let t4 = clock.observe_host_time(Duration::from_millis(730));
crate::assert_with_log!(
t4 == Time::from_millis(50),
"post-resume progression uses new baseline",
Time::from_millis(50),
t4
);
crate::test_complete!("browser_clock_suspend_resume_rebases_without_jump");
}
#[test]
fn duration_to_nanos_saturates_at_u64_max() {
init_test("duration_to_nanos_saturates_at_u64_max");
let nanos = duration_to_nanos_saturating(Duration::MAX);
crate::assert_with_log!(nanos == u64::MAX, "duration saturates", u64::MAX, nanos);
crate::test_complete!("duration_to_nanos_saturates_at_u64_max");
}
#[test]
fn wall_clock_starts_near_zero() {
init_test("wall_clock_starts_near_zero");
let clock = WallClock::new();
let now = clock.now();
let max_nanos = 1_000_000;
let actual = now.as_nanos();
crate::assert_with_log!(actual < max_nanos, "near zero", max_nanos, actual);
crate::test_complete!("wall_clock_starts_near_zero");
}
#[test]
fn wall_clock_advances() {
init_test("wall_clock_advances");
let clock = WallClock::new();
let t1 = clock.now();
std::thread::sleep(std::time::Duration::from_millis(10));
let t2 = clock.now();
crate::assert_with_log!(t2 > t1, "clock advances", "t2 > t1", (t1, t2));
crate::test_complete!("wall_clock_advances");
}
#[test]
fn timer_driver_new() {
init_test("timer_driver_new");
let driver = TimerDriver::new();
crate::assert_with_log!(driver.is_empty(), "driver empty", true, driver.is_empty());
crate::assert_with_log!(
driver.pending_count() == 0,
"pending count",
0,
driver.pending_count()
);
crate::test_complete!("timer_driver_new");
}
#[test]
fn timer_driver_register() {
init_test("timer_driver_register");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock);
let waker = futures_waker();
let handle = driver.register(Time::from_secs(1), waker);
crate::assert_with_log!(handle.id() == 0, "handle id", 0, handle.id());
crate::assert_with_log!(
driver.pending_count() == 1,
"pending count",
1,
driver.pending_count()
);
crate::assert_with_log!(
!driver.is_empty(),
"driver not empty",
false,
driver.is_empty()
);
crate::test_complete!("timer_driver_register");
}
#[test]
fn timer_driver_next_deadline() {
init_test("timer_driver_next_deadline");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock);
let expected: Option<Time> = None;
let actual = driver.next_deadline();
crate::assert_with_log!(actual == expected, "empty next_deadline", expected, actual);
driver.register(Time::from_secs(5), futures_waker());
driver.register(Time::from_secs(3), futures_waker());
driver.register(Time::from_secs(7), futures_waker());
let expected = Some(Time::from_secs(3));
let actual = driver.next_deadline();
crate::assert_with_log!(actual == expected, "earliest deadline", expected, actual);
crate::test_complete!("timer_driver_next_deadline");
}
#[test]
fn timer_driver_next_deadline_clamps_overdue_timer_to_now() {
init_test("timer_driver_next_deadline_clamps_overdue_timer_to_now");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock.clone());
driver.register(Time::from_secs(3), futures_waker());
clock.set(Time::from_secs(10));
let actual = driver.next_deadline();
let expected = Some(Time::from_secs(10));
crate::assert_with_log!(
actual == expected,
"overdue timer is reported as immediately due",
expected,
actual
);
crate::test_complete!("timer_driver_next_deadline_clamps_overdue_timer_to_now");
}
#[test]
fn timer_driver_next_deadline_returns_now_for_coalescing_ready_group() {
init_test("timer_driver_next_deadline_returns_now_for_coalescing_ready_group");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver {
clock: clock.clone(),
wheel: Mutex::new(TimerWheel::with_config(
Time::ZERO,
TimerWheelConfig::default(),
CoalescingConfig::new()
.coalesce_window(Duration::from_millis(5))
.min_group_size(2)
.enable(),
)),
};
driver.register(Time::from_millis(2), futures_waker());
driver.register(Time::from_millis(4), futures_waker());
clock.set(Time::from_millis(1));
let actual = driver.next_deadline();
let expected = Some(Time::from_millis(1));
crate::assert_with_log!(
actual == expected,
"driver reports immediate wake when coalescing can fire now",
expected,
actual
);
crate::test_complete!("timer_driver_next_deadline_returns_now_for_coalescing_ready_group");
}
#[test]
fn timer_driver_register_after_idle_gap_uses_current_clock_baseline() {
init_test("timer_driver_register_after_idle_gap_uses_current_clock_baseline");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock.clone());
let woken = Arc::new(AtomicBool::new(false));
clock.set(Time::from_secs(8 * 24 * 60 * 60));
let deadline = clock.now() + Duration::from_secs(1);
driver.register(deadline, waker_that_sets(woken.clone()));
let next = driver.next_deadline();
let expected_next = Some(deadline);
crate::assert_with_log!(
next == expected_next,
"idle-gap registration keeps the true short future deadline",
expected_next,
next
);
let fired_early = driver.process_timers();
crate::assert_with_log!(
fired_early == 0,
"freshly registered short timer does not fire immediately after idle gap",
0usize,
fired_early
);
crate::assert_with_log!(
!woken.load(Ordering::SeqCst),
"waker not called before the new short deadline",
false,
woken.load(Ordering::SeqCst)
);
clock.advance(2_000_000_000);
let fired = driver.process_timers();
crate::assert_with_log!(
fired == 1,
"timer fires once the real post-idle deadline passes",
1usize,
fired
);
crate::assert_with_log!(
woken.load(Ordering::SeqCst),
"waker called after the real deadline",
true,
woken.load(Ordering::SeqCst)
);
crate::test_complete!("timer_driver_register_after_idle_gap_uses_current_clock_baseline");
}
#[test]
fn timer_driver_register_resamples_clock_after_waiting_for_wheel_lock() {
init_test("timer_driver_register_resamples_clock_after_waiting_for_wheel_lock");
let clock = Arc::new(VirtualClock::new());
let driver = Arc::new(TimerDriver::with_clock(clock.clone()));
let deadline = Time::from_secs(8 * 24 * 60 * 60 + 1);
let wheel_guard = driver.wheel.lock();
let driver_for_thread = Arc::clone(&driver);
let register_thread =
std::thread::spawn(move || driver_for_thread.register(deadline, futures_waker()));
clock.set(Time::from_secs(8 * 24 * 60 * 60));
drop(wheel_guard);
let register_handle = register_thread
.join()
.expect("register thread should complete without panicking");
let next = driver.next_deadline();
let expected_next = Some(deadline);
crate::assert_with_log!(
next == expected_next,
"register re-samples clock after lock wait so long absolute deadlines are not stale-clamped",
expected_next,
next
);
let fired_early = driver.process_timers();
crate::assert_with_log!(
fired_early == 0,
"waiting on the wheel lock does not make the newly registered timer immediately due",
0usize,
fired_early
);
clock.advance(2_000_000_000);
let fired = driver.process_timers();
crate::assert_with_log!(
fired == 1,
"timer still fires once the true absolute deadline passes",
1usize,
fired
);
let cancelled_after_fire = driver.cancel(®ister_handle);
crate::assert_with_log!(
!cancelled_after_fire,
"fired timer is no longer cancellable",
false,
cancelled_after_fire
);
crate::test_complete!("timer_driver_register_resamples_clock_after_waiting_for_wheel_lock");
}
#[test]
fn timer_driver_process_expired() {
init_test("timer_driver_process_expired");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock.clone());
let woken = Arc::new(AtomicBool::new(false));
let woken_clone = woken.clone();
let waker = waker_that_sets(woken_clone);
driver.register(Time::from_secs(1), waker);
let processed = driver.process_timers();
crate::assert_with_log!(processed == 0, "process_timers at t=0", 0, processed);
let woken_now = woken.load(Ordering::SeqCst);
crate::assert_with_log!(!woken_now, "not woken", false, woken_now);
clock.advance(2_000_000_000); let processed = driver.process_timers();
crate::assert_with_log!(processed == 1, "process_timers after advance", 1, processed);
let woken_now = woken.load(Ordering::SeqCst);
crate::assert_with_log!(woken_now, "woken", true, woken_now);
crate::assert_with_log!(driver.is_empty(), "driver empty", true, driver.is_empty());
crate::test_complete!("timer_driver_process_expired");
}
#[test]
fn timer_driver_does_not_fire_while_clock_paused() {
init_test("timer_driver_does_not_fire_while_clock_paused");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock.clone());
let woken = Arc::new(AtomicBool::new(false));
let waker = waker_that_sets(woken.clone());
driver.register(Time::from_secs(1), waker);
clock.pause();
clock.advance(2_000_000_000);
let fired_while_paused = driver.process_timers();
crate::assert_with_log!(
fired_while_paused == 0,
"paused clock does not advance timers",
0,
fired_while_paused
);
crate::assert_with_log!(
!woken.load(Ordering::SeqCst),
"waker not called while paused",
false,
woken.load(Ordering::SeqCst)
);
crate::assert_with_log!(
driver.pending_count() == 1,
"timer remains pending while paused",
1,
driver.pending_count()
);
clock.resume();
clock.advance(2_000_000_000);
let fired_after_resume = driver.process_timers();
crate::assert_with_log!(
fired_after_resume == 1,
"timer fires after resume and advance",
1,
fired_after_resume
);
crate::assert_with_log!(
woken.load(Ordering::SeqCst),
"waker called after resume",
true,
woken.load(Ordering::SeqCst)
);
crate::assert_with_log!(driver.is_empty(), "driver empty", true, driver.is_empty());
crate::test_complete!("timer_driver_does_not_fire_while_clock_paused");
}
#[test]
fn timer_driver_multiple_timers() {
init_test("timer_driver_multiple_timers");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock.clone());
let count = Arc::new(AtomicU64::new(0));
for i in 1..=5 {
let count_clone = count.clone();
let waker = waker_that_increments(count_clone);
driver.register(Time::from_secs(i), waker);
}
crate::assert_with_log!(
driver.pending_count() == 5,
"pending count",
5,
driver.pending_count()
);
clock.set(Time::from_secs(3));
let processed = driver.process_timers();
crate::assert_with_log!(processed == 3, "process_timers at t=3", 3, processed);
let count_now = count.load(Ordering::SeqCst);
crate::assert_with_log!(count_now == 3, "count at t=3", 3, count_now);
crate::assert_with_log!(
driver.pending_count() == 2,
"pending count after t=3",
2,
driver.pending_count()
);
clock.set(Time::from_secs(10));
let processed = driver.process_timers();
crate::assert_with_log!(processed == 2, "process_timers at t=10", 2, processed);
let count_now = count.load(Ordering::SeqCst);
crate::assert_with_log!(count_now == 5, "count at t=10", 5, count_now);
crate::assert_with_log!(driver.is_empty(), "driver empty", true, driver.is_empty());
crate::test_complete!("timer_driver_multiple_timers");
}
#[test]
fn timer_driver_update_cancels_old_handle() {
init_test("timer_driver_update_cancels_old_handle");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock.clone());
let counter = Arc::new(AtomicU64::new(0));
let waker = waker_that_increments(counter.clone());
let handle = driver.register(Time::from_secs(5), waker);
let waker2 = waker_that_increments(counter.clone());
let _new_handle = driver.update(&handle, Time::from_secs(2), waker2);
clock.set(Time::from_secs(3));
let processed = driver.process_timers();
crate::assert_with_log!(processed == 1, "process_timers at t=3", 1, processed);
let count_now = counter.load(Ordering::SeqCst);
crate::assert_with_log!(count_now == 1, "counter", 1, count_now);
clock.set(Time::from_secs(10));
let processed = driver.process_timers();
crate::assert_with_log!(processed == 0, "process_timers at t=10", 0, processed);
let count_now = counter.load(Ordering::SeqCst);
crate::assert_with_log!(count_now == 1, "counter stable", 1, count_now);
crate::test_complete!("timer_driver_update_cancels_old_handle");
}
#[test]
fn timer_driver_update_rejects_stale_handle_without_registering_new_timer() {
init_test("timer_driver_update_rejects_stale_handle_without_registering_new_timer");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock.clone());
let stale_counter = Arc::new(AtomicU64::new(0));
let stale_handle = driver.register(Time::from_secs(5), futures_waker());
let cancelled = driver.cancel(&stale_handle);
crate::assert_with_log!(
cancelled,
"live handle cancelled before stale update",
true,
cancelled
);
let returned = driver.update(
&stale_handle,
Time::from_secs(2),
waker_that_increments(Arc::clone(&stale_counter)),
);
crate::assert_with_log!(
returned == stale_handle,
"stale update returns unchanged handle",
stale_handle,
returned
);
crate::assert_with_log!(
driver.pending_count() == 0,
"stale update leaves pending timer count unchanged",
0,
driver.pending_count()
);
clock.set(Time::from_secs(3));
let early_processed = driver.process_timers();
crate::assert_with_log!(
early_processed == 0,
"stale update does not create an early timer",
0usize,
early_processed
);
crate::assert_with_log!(
stale_counter.load(Ordering::SeqCst) == 0,
"stale update waker not fired",
0u64,
stale_counter.load(Ordering::SeqCst)
);
clock.set(Time::from_secs(6));
let processed = driver.process_timers();
crate::assert_with_log!(
processed == 0,
"stale timer never fires later",
0usize,
processed
);
crate::assert_with_log!(
stale_counter.load(Ordering::SeqCst) == 0,
"stale timer never registered",
0u64,
stale_counter.load(Ordering::SeqCst)
);
crate::test_complete!(
"timer_driver_update_rejects_stale_handle_without_registering_new_timer"
);
}
#[test]
fn timer_driver_update_after_idle_gap_keeps_future_deadline() {
init_test("timer_driver_update_after_idle_gap_keeps_future_deadline");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock.clone());
let counter = Arc::new(AtomicU64::new(0));
let handle = driver.register(Time::from_secs(10), waker_that_increments(counter.clone()));
clock.set(Time::from_secs(8 * 24 * 60 * 60));
let updated_deadline = clock.now() + Duration::from_secs(2);
let updated = driver.update(
&handle,
updated_deadline,
waker_that_increments(counter.clone()),
);
crate::assert_with_log!(
updated != handle,
"live timer update after idle gap still produces a fresh handle",
"different handle",
(handle, updated)
);
let expected_next = Some(updated_deadline);
let next = driver.next_deadline();
crate::assert_with_log!(
next == expected_next,
"updated deadline remains in the future after idle gap",
expected_next,
next
);
let fired_early = driver.process_timers();
crate::assert_with_log!(
fired_early == 0,
"updated timer does not fire immediately after idle gap",
0usize,
fired_early
);
crate::assert_with_log!(
counter.load(Ordering::SeqCst) == 0,
"counter not incremented before updated deadline",
0u64,
counter.load(Ordering::SeqCst)
);
clock.advance(3_000_000_000);
let fired = driver.process_timers();
crate::assert_with_log!(
fired == 1,
"updated timer fires after the true future deadline",
1usize,
fired
);
crate::assert_with_log!(
counter.load(Ordering::SeqCst) == 1,
"updated timer fires exactly once",
1u64,
counter.load(Ordering::SeqCst)
);
crate::test_complete!("timer_driver_update_after_idle_gap_keeps_future_deadline");
}
#[test]
fn timer_driver_clear() {
init_test("timer_driver_clear");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock);
driver.register(Time::from_secs(1), futures_waker());
driver.register(Time::from_secs(2), futures_waker());
crate::assert_with_log!(
driver.pending_count() == 2,
"pending count",
2,
driver.pending_count()
);
driver.clear();
crate::assert_with_log!(driver.is_empty(), "driver empty", true, driver.is_empty());
crate::test_complete!("timer_driver_clear");
}
#[test]
fn timer_driver_now() {
init_test("timer_driver_now");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock.clone());
let now = driver.now();
crate::assert_with_log!(now == Time::ZERO, "now at zero", Time::ZERO, now);
clock.advance(1_000_000_000);
let now = driver.now();
crate::assert_with_log!(
now == Time::from_secs(1),
"now after advance",
Time::from_secs(1),
now
);
crate::test_complete!("timer_driver_now");
}
#[test]
fn timer_driver_with_browser_clock_respects_catch_up_cap() {
init_test("timer_driver_with_browser_clock_respects_catch_up_cap");
let clock = Arc::new(BrowserMonotonicClock::new(BrowserClockConfig {
max_forward_step: Duration::from_millis(50),
jitter_floor: Duration::ZERO,
}));
let driver = TimerDriver::with_clock(clock.clone());
let woken = Arc::new(AtomicBool::new(false));
let _ = clock.observe_host_time(Duration::ZERO);
driver.register(Time::from_millis(80), waker_that_sets(woken.clone()));
let _ = clock.observe_host_time(Duration::from_millis(100));
let fired_1 = driver.process_timers();
crate::assert_with_log!(
fired_1 == 0,
"first capped catch-up does not fire 80ms timer",
0usize,
fired_1
);
let _ = clock.observe_host_time(Duration::from_millis(100));
let fired_2 = driver.process_timers();
crate::assert_with_log!(
fired_2 == 1,
"second catch-up slice fires timer",
1usize,
fired_2
);
crate::assert_with_log!(
woken.load(Ordering::SeqCst),
"timer waker called after bounded catch-up",
true,
woken.load(Ordering::SeqCst)
);
crate::test_complete!("timer_driver_with_browser_clock_respects_catch_up_cap");
}
#[test]
fn timer_handle_id_and_generation() {
init_test("timer_handle_id_and_generation");
let clock = Arc::new(VirtualClock::new());
let driver = TimerDriver::with_clock(clock);
let h1 = driver.register(Time::from_secs(1), futures_waker());
let h2 = driver.register(Time::from_secs(2), futures_waker());
crate::assert_with_log!(h1.id() == 0, "h1 id", 0, h1.id());
crate::assert_with_log!(h2.id() == 1, "h2 id", 1, h2.id());
let gen1 = h1.generation();
let gen2 = h2.generation();
crate::assert_with_log!(
gen1 != gen2,
"generation differs",
"not equal",
(gen1, gen2)
);
crate::test_complete!("timer_handle_id_and_generation");
}
#[test]
fn timer_driver_handle_with_virtual_clock() {
init_test("timer_driver_handle_with_virtual_clock");
let clock = Arc::new(VirtualClock::new());
let handle = TimerDriverHandle::with_virtual_clock(clock.clone());
let now = handle.now();
crate::assert_with_log!(now == Time::ZERO, "initial time", Time::ZERO, now);
clock.advance(1_000_000_000);
let now = handle.now();
crate::assert_with_log!(
now == Time::from_secs(1),
"time after advance",
Time::from_secs(1),
now
);
crate::test_complete!("timer_driver_handle_with_virtual_clock");
}
#[test]
fn timer_driver_handle_register_and_cancel() {
init_test("timer_driver_handle_register_and_cancel");
let clock = Arc::new(VirtualClock::new());
let handle = TimerDriverHandle::with_virtual_clock(clock.clone());
let woken = Arc::new(AtomicBool::new(false));
let waker = waker_that_sets(woken.clone());
let timer_handle = handle.register(Time::from_secs(5), waker);
let cancelled = handle.cancel(&timer_handle);
crate::assert_with_log!(cancelled, "timer cancelled", true, cancelled);
clock.set(Time::from_secs(10));
let fired = handle.process_timers();
crate::assert_with_log!(fired == 0, "no timers fire after cancel", 0usize, fired);
let woken_val = woken.load(Ordering::SeqCst);
crate::assert_with_log!(!woken_val, "waker not called", false, woken_val);
crate::test_complete!("timer_driver_handle_register_and_cancel");
}
#[test]
fn timer_driver_handle_update_reschedules() {
init_test("timer_driver_handle_update_reschedules");
let clock = Arc::new(VirtualClock::new());
let handle = TimerDriverHandle::with_virtual_clock(clock.clone());
let counter = Arc::new(AtomicU64::new(0));
let waker = waker_that_increments(counter.clone());
let timer_handle = handle.register(Time::from_secs(5), waker);
let waker2 = waker_that_increments(counter.clone());
let _new_handle = handle.update(&timer_handle, Time::from_secs(2), waker2);
clock.set(Time::from_secs(3));
let fired = handle.process_timers();
crate::assert_with_log!(fired == 1, "updated timer fires", 1usize, fired);
let count = counter.load(Ordering::SeqCst);
crate::assert_with_log!(count == 1, "counter incremented", 1u64, count);
clock.set(Time::from_secs(10));
let fired = handle.process_timers();
crate::assert_with_log!(fired == 0, "old timer cancelled", 0usize, fired);
crate::test_complete!("timer_driver_handle_update_reschedules");
}
#[test]
fn timer_driver_handle_next_deadline() {
init_test("timer_driver_handle_next_deadline");
let clock = Arc::new(VirtualClock::new());
let handle = TimerDriverHandle::with_virtual_clock(clock);
let no_deadline = handle.next_deadline();
crate::assert_with_log!(
no_deadline.is_none(),
"no deadline when empty",
true,
no_deadline.is_none()
);
let _ = handle.register(Time::from_secs(10), futures_waker());
let _ = handle.register(Time::from_secs(3), futures_waker());
let _ = handle.register(Time::from_secs(7), futures_waker());
let next = handle.next_deadline();
crate::assert_with_log!(
next == Some(Time::from_secs(3)),
"earliest deadline returned",
Some(Time::from_secs(3)),
next
);
crate::test_complete!("timer_driver_handle_next_deadline");
}
#[test]
fn timer_driver_handle_next_deadline_clamps_overdue_timer_to_now() {
init_test("timer_driver_handle_next_deadline_clamps_overdue_timer_to_now");
let clock = Arc::new(VirtualClock::new());
let handle = TimerDriverHandle::with_virtual_clock(clock.clone());
let _ = handle.register(Time::from_secs(4), futures_waker());
clock.set(Time::from_secs(9));
let next = handle.next_deadline();
crate::assert_with_log!(
next == Some(Time::from_secs(9)),
"handle reports overdue timer as immediately due",
Some(Time::from_secs(9)),
next
);
crate::test_complete!("timer_driver_handle_next_deadline_clamps_overdue_timer_to_now");
}
#[test]
fn timer_driver_handle_ptr_eq() {
init_test("timer_driver_handle_ptr_eq");
let clock = Arc::new(VirtualClock::new());
let handle1 = TimerDriverHandle::with_virtual_clock(clock.clone());
let handle2 = TimerDriverHandle::with_virtual_clock(clock);
let eq = handle1.ptr_eq(&handle2);
crate::assert_with_log!(!eq, "different drivers not equal", false, eq);
crate::test_complete!("timer_driver_handle_ptr_eq");
}
#[test]
fn timer_driver_handle_with_browser_clock() {
init_test("timer_driver_handle_with_browser_clock");
let clock = Arc::new(BrowserMonotonicClock::new(BrowserClockConfig {
max_forward_step: Duration::ZERO,
jitter_floor: Duration::ZERO,
}));
let handle = TimerDriverHandle::with_browser_clock(clock.clone());
let _ = clock.observe_host_time(Duration::from_millis(20));
let _ = clock.observe_host_time(Duration::from_millis(35));
let now = handle.now();
crate::assert_with_log!(
now == Time::from_millis(15),
"driver handle reflects browser clock advancement",
Time::from_millis(15),
now
);
crate::test_complete!("timer_driver_handle_with_browser_clock");
}
fn futures_waker() -> Waker {
std::task::Waker::noop().clone()
}
fn waker_that_sets(flag: Arc<AtomicBool>) -> Waker {
struct SharedFlagWaker {
flag: Arc<AtomicBool>,
}
use std::task::Wake;
impl Wake for SharedFlagWaker {
fn wake(self: Arc<Self>) {
self.flag.store(true, Ordering::SeqCst);
}
fn wake_by_ref(self: &Arc<Self>) {
self.flag.store(true, Ordering::SeqCst);
}
}
Arc::new(SharedFlagWaker { flag }).into()
}
struct CounterWaker {
counter: Arc<AtomicU64>,
}
use std::task::Wake;
impl Wake for CounterWaker {
fn wake(self: Arc<Self>) {
self.counter.fetch_add(1, Ordering::SeqCst);
}
fn wake_by_ref(self: &Arc<Self>) {
self.counter.fetch_add(1, Ordering::SeqCst);
}
}
fn waker_that_increments(counter: Arc<AtomicU64>) -> Waker {
Arc::new(CounterWaker { counter }).into()
}
#[test]
fn conformance_timer_driver_precision_wall_clock() {
crate::test_utils::init_test_logging();
crate::test_phase!("conformance_timer_driver_precision_wall_clock");
let wall_clock = Arc::new(WallClock::new());
let driver = Arc::new(TimerDriver::with_clock(wall_clock.clone()));
let handle = TimerDriverHandle::new(driver);
let counter = Arc::new(AtomicU64::new(0));
let short_duration = Duration::from_millis(5);
let start_time = wall_clock.now();
let _timer_id = handle.register(
handle
.now()
.saturating_add_nanos(short_duration.as_nanos().min(u128::from(u64::MAX)) as u64),
waker_that_increments(counter.clone()),
);
let timeout = start_time.saturating_add_nanos(
Duration::from_millis(100)
.as_nanos()
.min(u128::from(u64::MAX)) as u64,
);
while wall_clock.now() < timeout && counter.load(Ordering::SeqCst) == 0 {
let _ = handle.process_timers();
std::thread::sleep(Duration::from_millis(1));
}
let end_time = wall_clock.now();
let elapsed = Duration::from_nanos(end_time.duration_since(start_time));
crate::assert_with_log!(
counter.load(Ordering::SeqCst) > 0,
"wall clock timer fired",
true,
counter.load(Ordering::SeqCst) > 0
);
let tolerance = Duration::from_millis(10); crate::assert_with_log!(
elapsed <= short_duration + tolerance,
&format!(
"timer precision within tolerance: {:?} <= {:?}",
elapsed,
short_duration + tolerance
),
true,
elapsed <= short_duration + tolerance
);
crate::test_complete!("conformance_timer_driver_precision_wall_clock");
}
#[test]
fn conformance_timer_driver_virtual_clock() {
crate::test_utils::init_test_logging();
crate::test_phase!("conformance_timer_driver_virtual_clock");
let virtual_clock = Arc::new(VirtualClock::new());
let driver = Arc::new(TimerDriver::with_clock(virtual_clock.clone()));
let handle = TimerDriverHandle::new(driver);
let counter = Arc::new(AtomicU64::new(0));
let durations = [
Duration::from_millis(10),
Duration::from_millis(50),
Duration::from_millis(100),
Duration::from_millis(500),
];
let mut timer_ids = Vec::new();
for duration in &durations {
let timer_id = handle.register(
handle
.now()
.saturating_add_nanos(duration.as_nanos().min(u128::from(u64::MAX)) as u64),
waker_that_increments(counter.clone()),
);
timer_ids.push(timer_id);
}
let advance_steps = [
Duration::from_millis(15), Duration::from_millis(60), Duration::from_millis(110), Duration::from_millis(510), ];
let mut expected_fired = 0;
for advance_duration in &advance_steps {
virtual_clock.advance(advance_duration.as_nanos() as u64);
let _ = handle.process_timers();
expected_fired += 1;
let actual_fired = counter.load(Ordering::SeqCst);
crate::assert_with_log!(
actual_fired == expected_fired,
&format!(
"virtual time advance fired correct number: {} at {:?}",
actual_fired, advance_duration
),
expected_fired,
actual_fired
);
}
crate::test_complete!("conformance_timer_driver_virtual_clock");
}
#[test]
fn conformance_timer_driver_concurrent_registrations() {
crate::test_utils::init_test_logging();
crate::test_phase!("conformance_timer_driver_concurrent_registrations");
let virtual_clock = Arc::new(VirtualClock::new());
let driver = Arc::new(TimerDriver::with_clock(virtual_clock.clone()));
let handle = TimerDriverHandle::new(driver);
const TIMER_COUNT: usize = 1000;
let counters: Vec<_> = (0..TIMER_COUNT)
.map(|_| Arc::new(AtomicU64::new(0)))
.collect();
let mut timer_ids = Vec::new();
for (i, counter) in counters.iter().enumerate() {
let duration = Duration::from_millis(10 + (i as u64 % 100)); let timer_id = handle.register(
handle
.now()
.saturating_add_nanos(duration.as_nanos().min(u128::from(u64::MAX)) as u64),
waker_that_increments(counter.clone()),
);
timer_ids.push(timer_id);
}
crate::assert_with_log!(
handle.pending_count() == TIMER_COUNT,
"all timers registered",
TIMER_COUNT,
handle.pending_count()
);
virtual_clock.advance(Duration::from_millis(200).as_nanos() as u64);
let _ = handle.process_timers();
let fired_count = counters
.iter()
.map(|c| usize::from(c.load(Ordering::SeqCst) > 0))
.sum::<usize>();
crate::assert_with_log!(
fired_count == TIMER_COUNT,
"all timers fired",
TIMER_COUNT,
fired_count
);
crate::assert_with_log!(
handle.pending_count() == 0,
"no pending timers after firing",
0usize,
handle.pending_count()
);
crate::test_complete!("conformance_timer_driver_concurrent_registrations");
}
#[test]
fn conformance_timer_driver_cancellation_cleanup() {
crate::test_utils::init_test_logging();
crate::test_phase!("conformance_timer_driver_cancellation_cleanup");
let virtual_clock = Arc::new(VirtualClock::new());
let driver = Arc::new(TimerDriver::with_clock(virtual_clock.clone()));
let handle = TimerDriverHandle::new(driver);
let counter = Arc::new(AtomicU64::new(0));
let timer_id = handle.register(
handle.now().saturating_add_nanos(
Duration::from_millis(100)
.as_nanos()
.min(u128::from(u64::MAX)) as u64,
),
waker_that_increments(counter.clone()),
);
crate::assert_with_log!(
handle.pending_count() == 1,
"timer registered",
1usize,
handle.pending_count()
);
let cancelled = handle.cancel(&timer_id);
crate::assert_with_log!(cancelled, "timer cancelled", true, cancelled);
crate::assert_with_log!(
handle.pending_count() == 0,
"timer removed from pending",
0usize,
handle.pending_count()
);
virtual_clock.advance(Duration::from_millis(200).as_nanos() as u64);
let _ = handle.process_timers();
crate::assert_with_log!(
counter.load(Ordering::SeqCst) == 0,
"cancelled timer did not fire",
0u64,
counter.load(Ordering::SeqCst)
);
let double_cancel = handle.cancel(&timer_id);
crate::assert_with_log!(
!double_cancel,
"double cancel returns false",
false,
double_cancel
);
crate::test_complete!("conformance_timer_driver_cancellation_cleanup");
}
#[test]
fn conformance_timer_driver_browser_clock_monotonic() {
crate::test_utils::init_test_logging();
crate::test_phase!("conformance_timer_driver_browser_clock_monotonic");
let config = BrowserClockConfig::default();
let browser_clock = Arc::new(BrowserMonotonicClock::new(config));
let driver = Arc::new(TimerDriver::with_clock(browser_clock.clone()));
let handle = TimerDriverHandle::new(driver);
let counter = Arc::new(AtomicU64::new(0));
let host_samples = [0.0, 10.0, 20.0, 15.0, 30.0];
let mut last_time = Time::ZERO;
for &sample_ms in &host_samples {
browser_clock.observe_host_time(Duration::from_millis(sample_ms as u64));
let current_time = browser_clock.now();
crate::assert_with_log!(
current_time >= last_time,
&format!(
"monotonic time: {:?} >= {:?} at sample {sample_ms}",
current_time, last_time
),
true,
current_time >= last_time
);
last_time = current_time;
}
let _timer_id = handle.register(
handle.now().saturating_add_nanos(
Duration::from_millis(5)
.as_nanos()
.min(u128::from(u64::MAX)) as u64,
),
waker_that_increments(counter.clone()),
);
browser_clock.observe_host_time(Duration::from_millis(50));
let _ = handle.process_timers();
crate::assert_with_log!(
counter.load(Ordering::SeqCst) > 0,
"timer fired with browser clock",
true,
counter.load(Ordering::SeqCst) > 0
);
crate::test_complete!("conformance_timer_driver_browser_clock_monotonic");
}
}