use crate::cx::Cx;
use crate::time::{TimerDriverHandle, TimerHandle};
use crate::trace::TraceEvent;
use crate::types::Time;
use parking_lot::Mutex;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, OnceLock};
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
static START_TIME: OnceLock<Instant> = OnceLock::new();
const CUSTOM_TIME_GETTER_POLL_INTERVAL: Duration = Duration::from_millis(1);
#[derive(Debug)]
struct FallbackThread {
stop: Arc<AtomicBool>,
completed: Arc<AtomicBool>,
thread: std::thread::Thread,
join: std::thread::JoinHandle<()>,
}
#[inline]
fn request_stop_fallback(fallback: &FallbackThread) {
fallback.stop.store(true, Ordering::Release);
fallback.thread.unpark();
}
#[inline]
fn take_finished_fallbacks(state: &mut SleepState) -> Vec<std::thread::JoinHandle<()>> {
let mut finished = Vec::new();
if state
.fallback
.as_ref()
.is_some_and(|fallback| fallback.completed.load(Ordering::Acquire))
{
state.zombie_fallbacks.push(
state
.fallback
.take()
.expect("completed implies fallback exists")
.join,
);
} else if state
.fallback
.as_ref()
.is_some_and(|fallback| fallback.join.is_finished())
{
finished.push(
state
.fallback
.take()
.expect("finished implies fallback exists")
.join,
);
}
let mut i = 0;
while i < state.zombie_fallbacks.len() {
if state.zombie_fallbacks[i].is_finished() {
finished.push(state.zombie_fallbacks.remove(i));
} else {
i += 1;
}
}
finished
}
#[inline]
fn duration_to_nanos(duration: Duration) -> u64 {
duration.as_nanos().min(u128::from(u64::MAX)) as u64
}
#[must_use]
#[inline]
pub fn wall_now() -> Time {
let start = START_TIME.get_or_init(Instant::now);
let now = Instant::now();
if now < *start {
Time::ZERO
} else {
let elapsed = now.duration_since(*start);
Time::from_nanos(duration_to_nanos(elapsed))
}
}
#[derive(Debug)]
struct SleepState {
waker: Option<Waker>,
fallback: Option<FallbackThread>,
zombie_fallbacks: Vec<std::thread::JoinHandle<()>>,
timer_handle: Option<TimerHandle>,
timer_driver: Option<TimerDriverHandle>,
}
#[derive(Debug)]
struct ReadyWaker {
ready: Arc<AtomicBool>,
inner: Waker,
}
use std::task::Wake;
impl Wake for ReadyWaker {
#[inline]
fn wake(self: Arc<Self>) {
self.ready.store(true, Ordering::Release);
self.inner.wake_by_ref();
}
#[inline]
fn wake_by_ref(self: &Arc<Self>) {
self.ready.store(true, Ordering::Release);
self.inner.wake_by_ref();
}
}
#[inline]
fn readiness_waker(ready: Arc<AtomicBool>, inner: Waker) -> Waker {
Waker::from(Arc::new(ReadyWaker { ready, inner }))
}
#[derive(Debug)]
pub struct Sleep {
deadline: Time,
pub(crate) time_getter: Option<fn() -> Time>,
bound_timer_driver: Option<TimerDriverHandle>,
polled: std::sync::atomic::AtomicBool,
completed: std::sync::atomic::AtomicBool,
ready: Arc<AtomicBool>,
state: Arc<Mutex<SleepState>>,
}
impl Sleep {
#[must_use]
#[inline]
pub fn new(deadline: Time) -> Self {
Self {
deadline,
time_getter: None,
bound_timer_driver: None,
polled: std::sync::atomic::AtomicBool::new(false),
completed: std::sync::atomic::AtomicBool::new(false),
ready: Arc::new(AtomicBool::new(false)),
state: Arc::new(Mutex::new(SleepState {
waker: None,
fallback: None,
zombie_fallbacks: Vec::new(),
timer_handle: None,
timer_driver: None,
})),
}
}
#[must_use]
#[inline]
pub fn after(now: Time, duration: Duration) -> Self {
let deadline = now.saturating_add_nanos(duration_to_nanos(duration));
Self::new(deadline)
}
#[inline]
#[must_use]
pub fn with_time_getter(deadline: Time, time_getter: fn() -> Time) -> Self {
Self {
deadline,
time_getter: Some(time_getter),
bound_timer_driver: None,
polled: std::sync::atomic::AtomicBool::new(false),
completed: std::sync::atomic::AtomicBool::new(false),
ready: Arc::new(AtomicBool::new(false)),
state: Arc::new(Mutex::new(SleepState {
waker: None,
fallback: None,
zombie_fallbacks: Vec::new(),
timer_handle: None,
timer_driver: None,
})),
}
}
#[inline]
#[must_use]
pub(crate) fn with_timer_driver(deadline: Time, timer_driver: TimerDriverHandle) -> Self {
Self {
deadline,
time_getter: None,
bound_timer_driver: Some(timer_driver),
polled: std::sync::atomic::AtomicBool::new(false),
completed: std::sync::atomic::AtomicBool::new(false),
ready: Arc::new(AtomicBool::new(false)),
state: Arc::new(Mutex::new(SleepState {
waker: None,
fallback: None,
zombie_fallbacks: Vec::new(),
timer_handle: None,
timer_driver: None,
})),
}
}
#[inline]
#[must_use]
pub const fn deadline(&self) -> Time {
self.deadline
}
#[inline]
#[must_use]
pub fn remaining(&self, now: Time) -> Duration {
if now >= self.deadline {
Duration::ZERO
} else {
let nanos = self.deadline.as_nanos().saturating_sub(now.as_nanos());
Duration::from_nanos(nanos)
}
}
#[inline]
#[must_use]
pub fn is_elapsed(&self, now: Time) -> bool {
now >= self.deadline
}
#[inline]
pub fn reset(&mut self, deadline: Time) {
self.deadline = deadline;
self.polled
.store(false, std::sync::atomic::Ordering::Relaxed);
self.completed
.store(false, std::sync::atomic::Ordering::Relaxed);
self.ready = Arc::new(AtomicBool::new(false));
let (handle, driver, fallback_handles) = {
let mut state = self.state.lock();
let mut handles = std::mem::take(&mut state.zombie_fallbacks);
if let Some(fallback) = state.fallback.take() {
request_stop_fallback(&fallback);
handles.push(fallback.join);
}
(
state.timer_handle.take(),
state.timer_driver.take(),
handles,
)
};
drop(fallback_handles);
if let (Some(handle), Some(driver)) = (handle, driver) {
let trace = Cx::current().and_then(|current| current.trace_buffer());
if let Some(trace) = trace.as_ref() {
let now = driver.now();
trace.record_event(|seq| TraceEvent::timer_cancelled(seq, now, handle.id()));
}
let _ = driver.cancel(&handle);
}
}
#[inline]
pub fn reset_after(&mut self, now: Time, duration: Duration) {
self.deadline = now.saturating_add_nanos(duration_to_nanos(duration));
self.polled
.store(false, std::sync::atomic::Ordering::Relaxed);
self.completed
.store(false, std::sync::atomic::Ordering::Relaxed);
self.ready = Arc::new(AtomicBool::new(false));
let (handle, driver, fallback_handles) = {
let mut state = self.state.lock();
let mut handles = std::mem::take(&mut state.zombie_fallbacks);
if let Some(fallback) = state.fallback.take() {
request_stop_fallback(&fallback);
handles.push(fallback.join);
}
(
state.timer_handle.take(),
state.timer_driver.take(),
handles,
)
};
drop(fallback_handles);
if let (Some(handle), Some(driver)) = (handle, driver) {
let trace = Cx::current().and_then(|current| current.trace_buffer());
if let Some(trace) = trace.as_ref() {
let now = driver.now();
trace.record_event(|seq| TraceEvent::timer_cancelled(seq, now, handle.id()));
}
let _ = driver.cancel(&handle);
}
}
#[must_use]
#[inline]
pub fn was_polled(&self) -> bool {
self.polled.load(std::sync::atomic::Ordering::Relaxed)
}
#[inline]
fn current_time(&self) -> Time {
self.time_getter.map_or_else(wall_now, |getter| getter())
}
#[inline]
#[must_use]
pub const fn has_custom_time_getter(&self) -> bool {
self.time_getter.is_some()
}
pub fn poll_with_time(&self, now: Time) -> Poll<()> {
assert!(
!self.completed.load(std::sync::atomic::Ordering::Acquire),
"Sleep polled after completion"
);
self.polled
.store(true, std::sync::atomic::Ordering::Relaxed);
if self.ready.swap(false, Ordering::AcqRel) || now >= self.deadline {
self.completed
.store(true, std::sync::atomic::Ordering::Release);
Poll::Ready(())
} else {
Poll::Pending
}
}
}
impl Future for Sleep {
type Output = ();
#[allow(clippy::too_many_lines)]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (ambient_timer_driver, trace) = Cx::current().map_or_else(
|| (None, None),
|current| (current.timer_driver(), current.trace_buffer()),
);
let timer_driver = self
.bound_timer_driver
.clone()
.or_else(|| ambient_timer_driver.clone());
let now = if let Some(timer) = self.bound_timer_driver.as_ref() {
timer.now()
} else if self.time_getter.is_some() {
self.current_time()
} else {
timer_driver
.as_ref()
.map_or_else(|| self.current_time(), TimerDriverHandle::now)
};
match self.poll_with_time(now) {
Poll::Ready(()) => {
let (handle, driver) = {
let mut state = self.state.lock();
(state.timer_handle.take(), state.timer_driver.clone())
};
if let Some(handle) = handle {
if let Some(trace) = trace.as_ref() {
let fired_at = now.max(self.deadline);
trace.record_event(|seq| {
TraceEvent::timer_fired(seq, fired_at, handle.id())
});
}
if let Some(driver) = driver.or_else(|| timer_driver.clone()) {
let _ = driver.cancel(&handle);
}
}
Poll::Ready(())
}
Poll::Pending => {
let mut state = self.state.lock();
let finished_handles = take_finished_fallbacks(&mut state);
let waker_changed = !state
.waker
.as_ref()
.is_some_and(|w| w.will_wake(cx.waker()));
if waker_changed {
state.waker = Some(cx.waker().clone());
}
if let Some(timer) = timer_driver.as_ref() {
if let Some(fallback) = state.fallback.take() {
request_stop_fallback(&fallback);
state.zombie_fallbacks.push(fallback.join);
}
let needs_cancel = state
.timer_driver
.as_ref()
.is_some_and(|prev| !timer.ptr_eq(prev));
if needs_cancel {
let old_driver = state.timer_driver.take();
let old_handle = state.timer_handle.take();
if let (Some(prev_driver), Some(handle)) = (old_driver, old_handle) {
if let Some(trace) = trace.as_ref() {
trace.record_event(|seq| {
TraceEvent::timer_cancelled(seq, prev_driver.now(), handle.id())
});
}
let _ = prev_driver.cancel(&handle);
}
}
state.timer_driver = Some(timer.clone());
if state.timer_handle.is_none() {
let handle = timer.register(
self.deadline,
readiness_waker(Arc::clone(&self.ready), cx.waker().clone()),
);
if let Some(trace) = trace.as_ref() {
trace.record_event(|seq| {
TraceEvent::timer_scheduled(seq, now, handle.id(), self.deadline)
});
}
state.timer_handle = Some(handle);
} else if waker_changed {
if let Some(handle) = state.timer_handle.take() {
let old_id = handle.id();
let new_handle = timer.update(
&handle,
self.deadline,
readiness_waker(Arc::clone(&self.ready), cx.waker().clone()),
);
if let Some(trace) = trace.as_ref() {
trace.record_event(|seq| {
TraceEvent::timer_cancelled(seq, now, old_id)
});
trace.record_event(|seq| {
TraceEvent::timer_scheduled(
seq,
now,
new_handle.id(),
self.deadline,
)
});
}
state.timer_handle = Some(new_handle);
}
}
} else {
if let Some(prev_driver) = state.timer_driver.take() {
if let Some(old_handle) = state.timer_handle.take() {
if let Some(trace) = trace.as_ref() {
trace.record_event(|seq| {
TraceEvent::timer_cancelled(
seq,
prev_driver.now(),
old_handle.id(),
)
});
}
let _ = prev_driver.cancel(&old_handle);
}
}
if state.fallback.is_none() {
let deadline = self.deadline;
let getter = self.time_getter.unwrap_or(wall_now);
let polls_custom_time_getter = self.time_getter.is_some();
let state_clone = Arc::clone(&self.state);
let stop = Arc::new(AtomicBool::new(false));
let stop_for_thread = Arc::clone(&stop);
let completed = Arc::new(AtomicBool::new(false));
let completed_for_thread = Arc::clone(&completed);
let ready_for_thread = Arc::clone(&self.ready);
let handle = std::thread::spawn(move || {
while !stop_for_thread.load(Ordering::Acquire) {
let current = getter();
if current >= deadline {
break;
}
let remaining =
deadline.as_nanos().saturating_sub(current.as_nanos());
let mut park_dur = Duration::from_nanos(remaining);
if polls_custom_time_getter {
park_dur = park_dur.min(CUSTOM_TIME_GETTER_POLL_INTERVAL);
}
std::thread::park_timeout(park_dur);
}
if stop_for_thread.load(Ordering::Acquire) {
return;
}
ready_for_thread.store(true, Ordering::Release);
let waker = state_clone.lock().waker.take();
if let Some(waker) = waker {
waker.wake();
}
completed_for_thread.store(true, Ordering::Release);
});
let thread = handle.thread().clone();
state.fallback = Some(FallbackThread {
stop,
completed,
thread,
join: handle,
});
}
}
drop(state);
for handle in finished_handles {
let _ = handle.join();
}
Poll::Pending
}
}
}
}
impl Drop for Sleep {
fn drop(&mut self) {
let (handle, driver, fallback_handles) = {
let mut state = self.state.lock();
state.waker = None;
let mut handles = std::mem::take(&mut state.zombie_fallbacks);
if let Some(fallback) = state.fallback.take() {
request_stop_fallback(&fallback);
handles.push(fallback.join);
}
(
state.timer_handle.take(),
state.timer_driver.take(),
handles,
)
};
drop(fallback_handles);
if let (Some(handle), Some(driver)) = (handle, driver) {
let trace = Cx::current().and_then(|current| current.trace_buffer());
if let Some(trace) = trace.as_ref() {
let now = driver.now();
trace.record_event(|seq| TraceEvent::timer_cancelled(seq, now, handle.id()));
}
let _ = driver.cancel(&handle);
}
}
}
impl Clone for Sleep {
fn clone(&self) -> Self {
Self {
deadline: self.deadline,
time_getter: self.time_getter,
bound_timer_driver: self.bound_timer_driver.clone(),
polled: std::sync::atomic::AtomicBool::new(false), completed: std::sync::atomic::AtomicBool::new(false),
ready: Arc::new(AtomicBool::new(false)),
state: Arc::new(Mutex::new(SleepState {
waker: None,
fallback: None,
zombie_fallbacks: Vec::new(),
timer_handle: None, timer_driver: None,
})),
}
}
}
#[must_use]
#[inline]
pub fn sleep(now: Time, duration: Duration) -> Sleep {
Sleep::after(now, duration)
}
#[must_use]
#[inline]
pub fn sleep_until(deadline: Time) -> Sleep {
Sleep::new(deadline)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cx::Cx;
use crate::test_utils::init_test_logging;
use crate::time::{TimerDriverHandle, VirtualClock};
use crate::types::{Budget, RegionId, TaskId};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::task::{Context, Waker};
fn init_test(name: &str) {
init_test_logging();
crate::test_phase!(name);
}
static CURRENT_TIME: AtomicU64 = AtomicU64::new(0);
fn get_time() -> Time {
Time::from_nanos(CURRENT_TIME.load(Ordering::SeqCst))
}
#[test]
fn new_creates_sleep_with_deadline() {
init_test("new_creates_sleep_with_deadline");
let sleep = Sleep::new(Time::from_secs(5));
crate::assert_with_log!(
sleep.deadline() == Time::from_secs(5),
"deadline",
Time::from_secs(5),
sleep.deadline()
);
crate::assert_with_log!(!sleep.was_polled(), "not polled", false, sleep.was_polled());
crate::test_complete!("new_creates_sleep_with_deadline");
}
#[test]
fn after_computes_deadline() {
init_test("after_computes_deadline");
let now = Time::from_secs(10);
let sleep = Sleep::after(now, Duration::from_secs(5));
crate::assert_with_log!(
sleep.deadline() == Time::from_secs(15),
"deadline",
Time::from_secs(15),
sleep.deadline()
);
crate::test_complete!("after_computes_deadline");
}
#[test]
fn after_saturates() {
init_test("after_saturates");
let now = Time::from_nanos(u64::MAX - 1000);
let sleep = Sleep::after(now, Duration::from_secs(1));
crate::assert_with_log!(
sleep.deadline() == Time::MAX,
"deadline",
Time::MAX,
sleep.deadline()
);
crate::test_complete!("after_saturates");
}
#[test]
fn sleep_function() {
init_test("sleep_function");
let now = Time::from_millis(100);
let s = sleep(now, Duration::from_millis(50));
crate::assert_with_log!(
s.deadline() == Time::from_millis(150),
"deadline",
Time::from_millis(150),
s.deadline()
);
crate::test_complete!("sleep_function");
}
#[test]
fn sleep_until_function() {
init_test("sleep_until_function");
let s = sleep_until(Time::from_secs(42));
crate::assert_with_log!(
s.deadline() == Time::from_secs(42),
"deadline",
Time::from_secs(42),
s.deadline()
);
crate::test_complete!("sleep_until_function");
}
#[test]
fn with_time_getter() {
init_test("with_time_getter");
CURRENT_TIME.store(0, Ordering::SeqCst);
let sleep = Sleep::with_time_getter(Time::from_secs(5), get_time);
let elapsed = sleep.is_elapsed(get_time());
crate::assert_with_log!(!elapsed, "not elapsed", false, elapsed);
CURRENT_TIME.store(6_000_000_000, Ordering::SeqCst);
let elapsed = sleep.is_elapsed(get_time());
crate::assert_with_log!(elapsed, "elapsed", true, elapsed);
crate::test_complete!("with_time_getter");
}
#[test]
fn custom_time_getter_wakes_promptly_after_logical_time_advance() {
init_test("custom_time_getter_wakes_promptly_after_logical_time_advance");
CURRENT_TIME.store(0, Ordering::SeqCst);
let woken = Arc::new(AtomicBool::new(false));
let waker = waker_that_sets(Arc::clone(&woken));
let mut task_cx = Context::from_waker(&waker);
let mut sleep = Sleep::with_time_getter(Time::from_secs(10), get_time);
let first = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
first.is_pending(),
"first pending",
true,
first.is_pending()
);
CURRENT_TIME.store(Time::from_secs(10).as_nanos(), Ordering::SeqCst);
let wait_deadline = Instant::now() + Duration::from_millis(100);
while !woken.load(Ordering::SeqCst) && Instant::now() < wait_deadline {
std::thread::sleep(Duration::from_millis(1));
}
let woke = woken.load(Ordering::SeqCst);
crate::assert_with_log!(woke, "waker fired", true, woke);
let second = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(second.is_ready(), "second ready", true, second.is_ready());
crate::test_complete!("custom_time_getter_wakes_promptly_after_logical_time_advance");
}
#[test]
fn is_elapsed_before_deadline() {
init_test("is_elapsed_before_deadline");
let sleep = Sleep::new(Time::from_secs(10));
let elapsed = sleep.is_elapsed(Time::from_secs(5));
crate::assert_with_log!(!elapsed, "not elapsed", false, elapsed);
crate::test_complete!("is_elapsed_before_deadline");
}
#[test]
fn is_elapsed_at_deadline() {
init_test("is_elapsed_at_deadline");
let sleep = Sleep::new(Time::from_secs(10));
let elapsed = sleep.is_elapsed(Time::from_secs(10));
crate::assert_with_log!(elapsed, "elapsed", true, elapsed);
crate::test_complete!("is_elapsed_at_deadline");
}
#[test]
fn is_elapsed_after_deadline() {
init_test("is_elapsed_after_deadline");
let sleep = Sleep::new(Time::from_secs(10));
let elapsed = sleep.is_elapsed(Time::from_secs(15));
crate::assert_with_log!(elapsed, "elapsed", true, elapsed);
crate::test_complete!("is_elapsed_after_deadline");
}
#[test]
fn remaining_before_deadline() {
init_test("remaining_before_deadline");
let sleep = Sleep::new(Time::from_secs(10));
let remaining = sleep.remaining(Time::from_secs(7));
crate::assert_with_log!(
remaining == Duration::from_secs(3),
"remaining",
Duration::from_secs(3),
remaining
);
crate::test_complete!("remaining_before_deadline");
}
#[test]
fn remaining_at_deadline() {
init_test("remaining_at_deadline");
let sleep = Sleep::new(Time::from_secs(10));
let remaining = sleep.remaining(Time::from_secs(10));
crate::assert_with_log!(
remaining == Duration::ZERO,
"remaining",
Duration::ZERO,
remaining
);
crate::test_complete!("remaining_at_deadline");
}
#[test]
fn remaining_after_deadline() {
init_test("remaining_after_deadline");
let sleep = Sleep::new(Time::from_secs(10));
let remaining = sleep.remaining(Time::from_secs(15));
crate::assert_with_log!(
remaining == Duration::ZERO,
"remaining",
Duration::ZERO,
remaining
);
crate::test_complete!("remaining_after_deadline");
}
#[test]
fn poll_with_time_before_deadline() {
init_test("poll_with_time_before_deadline");
let sleep = Sleep::new(Time::from_secs(10));
let poll = sleep.poll_with_time(Time::from_secs(5));
crate::assert_with_log!(poll.is_pending(), "pending", true, poll.is_pending());
crate::assert_with_log!(sleep.was_polled(), "was polled", true, sleep.was_polled());
crate::test_complete!("poll_with_time_before_deadline");
}
#[test]
fn poll_with_time_at_deadline() {
init_test("poll_with_time_at_deadline");
let sleep = Sleep::new(Time::from_secs(10));
let poll = sleep.poll_with_time(Time::from_secs(10));
crate::assert_with_log!(poll.is_ready(), "ready", true, poll.is_ready());
crate::test_complete!("poll_with_time_at_deadline");
}
#[test]
fn poll_with_time_after_deadline() {
init_test("poll_with_time_after_deadline");
let sleep = Sleep::new(Time::from_secs(10));
let poll = sleep.poll_with_time(Time::from_secs(15));
crate::assert_with_log!(poll.is_ready(), "ready", true, poll.is_ready());
crate::test_complete!("poll_with_time_after_deadline");
}
#[test]
fn poll_with_time_zero_deadline() {
init_test("poll_with_time_zero_deadline");
let sleep = Sleep::new(Time::ZERO);
let poll = sleep.poll_with_time(Time::ZERO);
crate::assert_with_log!(poll.is_ready(), "ready", true, poll.is_ready());
crate::test_complete!("poll_with_time_zero_deadline");
}
#[test]
fn poll_with_time_repoll_after_completion_panics() {
init_test("poll_with_time_repoll_after_completion_panics");
let sleep = Sleep::new(Time::from_secs(10));
let first = sleep.poll_with_time(Time::from_secs(10));
crate::assert_with_log!(first.is_ready(), "first ready", true, first.is_ready());
let repoll = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = sleep.poll_with_time(Time::from_secs(10));
}));
crate::assert_with_log!(repoll.is_err(), "repoll panics", true, repoll.is_err());
crate::test_complete!("poll_with_time_repoll_after_completion_panics");
}
#[test]
fn reset_changes_deadline() {
init_test("reset_changes_deadline");
let mut sleep = Sleep::new(Time::from_secs(10));
let _ = sleep.poll_with_time(Time::from_secs(5));
crate::assert_with_log!(sleep.was_polled(), "was polled", true, sleep.was_polled());
sleep.reset(Time::from_secs(20));
crate::assert_with_log!(
sleep.deadline() == Time::from_secs(20),
"deadline",
Time::from_secs(20),
sleep.deadline()
);
crate::assert_with_log!(
!sleep.was_polled(),
"reset clears polled",
false,
sleep.was_polled()
); crate::test_complete!("reset_changes_deadline");
}
#[test]
fn reset_after_changes_deadline() {
init_test("reset_after_changes_deadline");
let mut sleep = Sleep::new(Time::from_secs(10));
sleep.reset_after(Time::from_secs(5), Duration::from_secs(3));
crate::assert_with_log!(
sleep.deadline() == Time::from_secs(8),
"deadline",
Time::from_secs(8),
sleep.deadline()
);
crate::test_complete!("reset_after_changes_deadline");
}
#[test]
fn reset_after_completion_allows_sleep_reuse() {
init_test("reset_after_completion_allows_sleep_reuse");
let mut sleep = Sleep::new(Time::from_secs(10));
let first = sleep.poll_with_time(Time::from_secs(10));
crate::assert_with_log!(first.is_ready(), "first ready", true, first.is_ready());
sleep.reset(Time::from_secs(20));
let second = sleep.poll_with_time(Time::from_secs(15));
crate::assert_with_log!(
second.is_pending(),
"pending after reset before deadline",
true,
second.is_pending()
);
let third = sleep.poll_with_time(Time::from_secs(20));
crate::assert_with_log!(
third.is_ready(),
"ready after reset",
true,
third.is_ready()
);
crate::test_complete!("reset_after_completion_allows_sleep_reuse");
}
fn noop_waker() -> Waker {
std::task::Waker::noop().clone()
}
fn waker_that_sets(flag: Arc<AtomicBool>) -> Waker {
struct FlagWaker {
flag: Arc<AtomicBool>,
}
impl Wake for FlagWaker {
fn wake(self: Arc<Self>) {
self.flag.store(true, Ordering::SeqCst);
}
fn wake_by_ref(self: &Arc<Self>) {
self.flag.store(true, Ordering::SeqCst);
}
}
Waker::from(Arc::new(FlagWaker { flag }))
}
#[test]
fn drop_cancels_timer_registration() {
init_test("drop_cancels_timer_registration");
let clock = Arc::new(VirtualClock::new());
let timer = TimerDriverHandle::with_virtual_clock(clock);
let cx = Cx::new_with_drivers(
RegionId::new_for_test(0, 0),
TaskId::new_for_test(0, 0),
Budget::INFINITE,
None,
None,
None,
Some(timer.clone()),
None,
);
let _guard = Cx::set_current(Some(cx));
let mut sleep = Sleep::after(timer.now(), Duration::from_secs(1));
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let poll = Pin::new(&mut sleep).poll(&mut cx);
crate::assert_with_log!(poll.is_pending(), "pending", true, poll.is_pending());
crate::assert_with_log!(
timer.pending_count() == 1,
"timer registered",
1,
timer.pending_count()
);
drop(sleep);
crate::assert_with_log!(
timer.pending_count() == 0,
"timer cancelled on drop",
0,
timer.pending_count()
);
crate::test_complete!("drop_cancels_timer_registration");
}
#[test]
fn reset_cancels_old_timer_and_re_registers_on_poll() {
init_test("reset_cancels_old_timer_and_re_registers_on_poll");
let clock = Arc::new(VirtualClock::new());
let timer = TimerDriverHandle::with_virtual_clock(clock.clone());
let cx = Cx::new_with_drivers(
RegionId::new_for_test(0, 0),
TaskId::new_for_test(0, 0),
Budget::INFINITE,
None,
None,
None,
Some(timer.clone()),
None,
);
let _guard = Cx::set_current(Some(cx));
let mut sleep = Sleep::after(timer.now(), Duration::from_secs(5));
let waker = noop_waker();
let mut task_cx = Context::from_waker(&waker);
let first_poll = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
first_poll.is_pending(),
"first poll pending",
true,
first_poll.is_pending()
);
crate::assert_with_log!(
timer.pending_count() == 1,
"first timer registration",
1,
timer.pending_count()
);
sleep.reset(Time::from_secs(10));
crate::assert_with_log!(
timer.pending_count() == 0,
"reset cancels previous timer",
0,
timer.pending_count()
);
crate::assert_with_log!(
sleep.deadline() == Time::from_secs(10),
"deadline updated on reset",
Time::from_secs(10),
sleep.deadline()
);
let second_poll = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
second_poll.is_pending(),
"second poll pending after reset",
true,
second_poll.is_pending()
);
crate::assert_with_log!(
timer.pending_count() == 1,
"timer re-registered after reset",
1,
timer.pending_count()
);
clock.set(Time::from_secs(9));
let fired_before_deadline = timer.process_timers();
crate::assert_with_log!(
fired_before_deadline == 0,
"no timers fire before new deadline",
0,
fired_before_deadline
);
clock.set(Time::from_secs(10));
let _ = timer.process_timers();
let ready_poll = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
ready_poll.is_ready(),
"sleep ready at reset deadline",
true,
ready_poll.is_ready()
);
crate::assert_with_log!(
timer.pending_count() == 0,
"timer registration cleared on completion",
0,
timer.pending_count()
);
crate::test_complete!("reset_cancels_old_timer_and_re_registers_on_poll");
}
#[test]
#[should_panic(expected = "Sleep polled after completion")]
fn future_repoll_after_completion_panics() {
init_test("future_repoll_after_completion_panics");
CURRENT_TIME.store(Time::from_secs(10).as_nanos(), Ordering::SeqCst);
let mut sleep = Sleep::with_time_getter(Time::from_secs(10), get_time);
let waker = noop_waker();
let mut task_cx = Context::from_waker(&waker);
let first = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(first.is_ready(), "first ready", true, first.is_ready());
let _ = Pin::new(&mut sleep).poll(&mut task_cx);
crate::test_complete!("future_repoll_after_completion_panics");
}
#[test]
fn poll_with_new_timer_driver_migrates_registration() {
init_test("poll_with_new_timer_driver_migrates_registration");
let clock1 = Arc::new(VirtualClock::new());
let timer1 = TimerDriverHandle::with_virtual_clock(clock1);
let cx1 = Cx::new_with_drivers(
RegionId::new_for_test(0, 1),
TaskId::new_for_test(0, 1),
Budget::INFINITE,
None,
None,
None,
Some(timer1.clone()),
None,
);
let _guard1 = Cx::set_current(Some(cx1));
let mut sleep = Sleep::after(timer1.now(), Duration::from_secs(5));
let waker = noop_waker();
let mut task_cx = Context::from_waker(&waker);
let first_poll = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
first_poll.is_pending(),
"first poll pending",
true,
first_poll.is_pending()
);
crate::assert_with_log!(
timer1.pending_count() == 1,
"timer1 has registration",
1,
timer1.pending_count()
);
let clock2 = Arc::new(VirtualClock::new());
let timer2 = TimerDriverHandle::with_virtual_clock(clock2);
let cx2 = Cx::new_with_drivers(
RegionId::new_for_test(0, 2),
TaskId::new_for_test(0, 2),
Budget::INFINITE,
None,
None,
None,
Some(timer2.clone()),
None,
);
{
let _guard2 = Cx::set_current(Some(cx2));
let second_poll = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
second_poll.is_pending(),
"second poll pending on new driver",
true,
second_poll.is_pending()
);
crate::assert_with_log!(
timer1.pending_count() == 0,
"timer1 registration canceled after migration",
0,
timer1.pending_count()
);
crate::assert_with_log!(
timer2.pending_count() == 1,
"timer2 owns migrated registration",
1,
timer2.pending_count()
);
drop(sleep);
crate::assert_with_log!(
timer2.pending_count() == 0,
"drop cancels migrated timer registration",
0,
timer2.pending_count()
);
}
crate::test_complete!("poll_with_new_timer_driver_migrates_registration");
}
#[test]
fn poll_after_timer_fire_stays_ready_across_driver_migration() {
init_test("poll_after_timer_fire_stays_ready_across_driver_migration");
let clock1 = Arc::new(VirtualClock::new());
let timer1 = TimerDriverHandle::with_virtual_clock(clock1.clone());
let cx1 = Cx::new_with_drivers(
RegionId::new_for_test(0, 3),
TaskId::new_for_test(0, 3),
Budget::INFINITE,
None,
None,
None,
Some(timer1.clone()),
None,
);
let _guard1 = Cx::set_current(Some(cx1));
let mut sleep = Sleep::after(timer1.now(), Duration::from_secs(5));
let woke = Arc::new(AtomicBool::new(false));
let waker = waker_that_sets(Arc::clone(&woke));
let mut task_cx = Context::from_waker(&waker);
let first_poll = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
first_poll.is_pending(),
"first poll pending",
true,
first_poll.is_pending()
);
clock1.set(Time::from_secs(6));
let fired = timer1.process_timers();
crate::assert_with_log!(fired == 1, "old driver fires timer once", 1usize, fired);
crate::assert_with_log!(
woke.load(Ordering::SeqCst),
"timer wake reached task waker",
true,
woke.load(Ordering::SeqCst)
);
let clock2 = Arc::new(VirtualClock::new());
let timer2 = TimerDriverHandle::with_virtual_clock(clock2);
let cx2 = Cx::new_with_drivers(
RegionId::new_for_test(0, 4),
TaskId::new_for_test(0, 4),
Budget::INFINITE,
None,
None,
None,
Some(timer2.clone()),
None,
);
let _guard2 = Cx::set_current(Some(cx2));
let second_poll = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
second_poll.is_ready(),
"fired timer remains ready on new driver",
true,
second_poll.is_ready()
);
crate::assert_with_log!(
timer2.pending_count() == 0,
"new driver does not re-arm an already fired sleep",
0,
timer2.pending_count()
);
crate::test_complete!("poll_after_timer_fire_stays_ready_across_driver_migration");
}
#[test]
fn poll_after_fallback_wake_stays_ready_on_driver() {
init_test("poll_after_fallback_wake_stays_ready_on_driver");
let _guard = Cx::set_current(None);
let mut sleep = Sleep::after(wall_now(), Duration::from_millis(10));
let woke = Arc::new(AtomicBool::new(false));
let waker = waker_that_sets(Arc::clone(&woke));
let mut task_cx = Context::from_waker(&waker);
let first_poll = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
first_poll.is_pending(),
"first poll pending",
true,
first_poll.is_pending()
);
let start = Instant::now();
while !woke.load(Ordering::SeqCst) && start.elapsed() < Duration::from_millis(250) {
std::thread::sleep(Duration::from_millis(1));
}
crate::assert_with_log!(
woke.load(Ordering::SeqCst),
"fallback thread wakes task",
true,
woke.load(Ordering::SeqCst)
);
let clock = Arc::new(VirtualClock::new());
let timer = TimerDriverHandle::with_virtual_clock(clock);
let cx = Cx::new_with_drivers(
RegionId::new_for_test(0, 5),
TaskId::new_for_test(0, 5),
Budget::INFINITE,
None,
None,
None,
Some(timer.clone()),
None,
);
let _guard2 = Cx::set_current(Some(cx));
let second_poll = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
second_poll.is_ready(),
"fallback wake remains ready after driver appears",
true,
second_poll.is_ready()
);
crate::assert_with_log!(
timer.pending_count() == 0,
"driver does not re-arm an already fired fallback sleep",
0,
timer.pending_count()
);
crate::test_complete!("poll_after_fallback_wake_stays_ready_on_driver");
}
#[test]
fn clone_copies_deadline() {
init_test("clone_copies_deadline");
let original = Sleep::new(Time::from_secs(10));
let cloned = original.clone();
crate::assert_with_log!(
original.deadline() == Time::from_secs(10),
"original deadline",
Time::from_secs(10),
original.deadline()
);
crate::assert_with_log!(
cloned.deadline() == Time::from_secs(10),
"cloned deadline",
Time::from_secs(10),
cloned.deadline()
);
crate::test_complete!("clone_copies_deadline");
}
#[test]
fn clone_has_fresh_polled_flag() {
init_test("clone_has_fresh_polled_flag");
let original = Sleep::new(Time::from_secs(10));
let _ = original.poll_with_time(Time::from_secs(5));
crate::assert_with_log!(
original.was_polled(),
"original polled",
true,
original.was_polled()
);
let cloned = original.clone();
crate::assert_with_log!(
original.was_polled(),
"original still polled",
true,
original.was_polled()
);
crate::assert_with_log!(
!cloned.was_polled(),
"cloned not polled",
false,
cloned.was_polled()
);
crate::test_complete!("clone_has_fresh_polled_flag");
}
#[test]
fn zero_duration_sleep() {
init_test("zero_duration_sleep");
let now = Time::from_secs(10);
let sleep = sleep(now, Duration::ZERO);
crate::assert_with_log!(
sleep.deadline() == Time::from_secs(10),
"deadline",
Time::from_secs(10),
sleep.deadline()
);
let poll = sleep.poll_with_time(now);
crate::assert_with_log!(poll.is_ready(), "ready", true, poll.is_ready());
crate::test_complete!("zero_duration_sleep");
}
#[test]
fn max_time_deadline() {
init_test("max_time_deadline");
let sleep = Sleep::new(Time::MAX);
let poll = sleep.poll_with_time(Time::from_secs(1000));
crate::assert_with_log!(poll.is_pending(), "pending", true, poll.is_pending());
let poll = sleep.poll_with_time(Time::MAX);
crate::assert_with_log!(poll.is_ready(), "ready at max", true, poll.is_ready());
crate::test_complete!("max_time_deadline");
}
#[test]
fn time_zero_deadline() {
init_test("time_zero_deadline");
let sleep = Sleep::new(Time::ZERO);
let poll = sleep.poll_with_time(Time::from_nanos(1));
crate::assert_with_log!(poll.is_ready(), "ready", true, poll.is_ready());
crate::test_complete!("time_zero_deadline");
}
#[test]
fn mr_cancel_idempotency() {
init_test("mr_cancel_idempotency");
let clock = Arc::new(VirtualClock::new());
let timer = TimerDriverHandle::with_virtual_clock(clock.clone());
let cx = Cx::new_with_drivers(
RegionId::new_for_test(0, 100),
TaskId::new_for_test(0, 100),
Budget::INFINITE,
None,
None,
None,
Some(timer.clone()),
None,
);
let _guard = Cx::set_current(Some(cx));
let initial_deadline = Time::from_secs(10);
let reset_deadline = Time::from_secs(20);
let mut sleep1 = Sleep::new(initial_deadline);
let mut sleep2 = Sleep::new(initial_deadline);
let waker = noop_waker();
let mut task_cx = Context::from_waker(&waker);
let _ = Pin::new(&mut sleep1).poll(&mut task_cx);
let _ = Pin::new(&mut sleep2).poll(&mut task_cx);
sleep1.reset(reset_deadline); sleep2.reset(reset_deadline); sleep2.reset(reset_deadline);
crate::assert_with_log!(
sleep1.deadline() == sleep2.deadline(),
"deadlines equal after reset idempotency",
sleep1.deadline(),
sleep2.deadline()
);
crate::assert_with_log!(
sleep1.was_polled() == sleep2.was_polled(),
"polled state equal after reset idempotency",
sleep1.was_polled(),
sleep2.was_polled()
);
let poll1 = Pin::new(&mut sleep1).poll(&mut task_cx);
let poll2 = Pin::new(&mut sleep2).poll(&mut task_cx);
crate::assert_with_log!(
poll1.is_pending() && poll2.is_pending(),
"both pending after reset idempotency",
true,
poll1.is_pending() && poll2.is_pending()
);
clock.set(reset_deadline);
let _ = timer.process_timers();
let final1 = Pin::new(&mut sleep1).poll(&mut task_cx);
let final2 = Pin::new(&mut sleep2).poll(&mut task_cx);
crate::assert_with_log!(
final1.is_ready() && final2.is_ready(),
"both ready after timer fires",
true,
final1.is_ready() && final2.is_ready()
);
crate::test_complete!("mr_cancel_idempotency");
}
#[test]
fn mr_cancel_after_fire_noop() {
init_test("mr_cancel_after_fire_noop");
let clock = Arc::new(VirtualClock::new());
let timer = TimerDriverHandle::with_virtual_clock(clock.clone());
let cx = Cx::new_with_drivers(
RegionId::new_for_test(0, 101),
TaskId::new_for_test(0, 101),
Budget::INFINITE,
None,
None,
None,
Some(timer.clone()),
None,
);
let _guard = Cx::set_current(Some(cx));
let deadline = Time::from_secs(5);
let mut sleep = Sleep::new(deadline);
let waker = noop_waker();
let mut task_cx = Context::from_waker(&waker);
let initial = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
initial.is_pending(),
"initial poll pending",
true,
initial.is_pending()
);
clock.set(deadline);
let _ = timer.process_timers();
let fired = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
fired.is_ready(),
"sleep ready after timer fires",
true,
fired.is_ready()
);
crate::assert_with_log!(
timer.pending_count() == 0,
"no timers pending after completion",
0,
timer.pending_count()
);
let new_deadline = Time::from_secs(10);
sleep.reset(new_deadline);
crate::assert_with_log!(
sleep.deadline() == new_deadline,
"deadline updated after reset",
new_deadline,
sleep.deadline()
);
crate::assert_with_log!(
!sleep.was_polled(),
"polled flag cleared after reset",
false,
sleep.was_polled()
);
let after_reset = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
after_reset.is_pending(),
"pending after reset on completed sleep",
true,
after_reset.is_pending()
);
crate::test_complete!("mr_cancel_after_fire_noop");
}
#[test]
fn mr_reset_after_cancel_fresh() {
init_test("mr_reset_after_cancel_fresh");
let clock = Arc::new(VirtualClock::new());
let timer = TimerDriverHandle::with_virtual_clock(clock.clone());
let cx = Cx::new_with_drivers(
RegionId::new_for_test(0, 102),
TaskId::new_for_test(0, 102),
Budget::INFINITE,
None,
None,
None,
Some(timer.clone()),
None,
);
let _guard = Cx::set_current(Some(cx));
let original_deadline = Time::from_secs(5);
let reset_deadline = Time::from_secs(15);
let mut sleep = Sleep::new(original_deadline);
let waker = noop_waker();
let mut task_cx = Context::from_waker(&waker);
let _ = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
timer.pending_count() == 1,
"original timer registered",
1,
timer.pending_count()
);
sleep.reset(reset_deadline);
crate::assert_with_log!(
timer.pending_count() == 0,
"reset cancels original timer",
0,
timer.pending_count()
);
crate::assert_with_log!(
sleep.deadline() == reset_deadline,
"deadline updated by reset",
reset_deadline,
sleep.deadline()
);
let after_reset = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
after_reset.is_pending(),
"pending after reset",
true,
after_reset.is_pending()
);
crate::assert_with_log!(
timer.pending_count() == 1,
"new timer registered after reset",
1,
timer.pending_count()
);
clock.set(original_deadline);
let original_fires = timer.process_timers();
crate::assert_with_log!(
original_fires == 0,
"original deadline does not fire reset timer",
0,
original_fires
);
let still_pending = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
still_pending.is_pending(),
"sleep still pending at original deadline",
true,
still_pending.is_pending()
);
clock.set(reset_deadline);
let reset_fires = timer.process_timers();
crate::assert_with_log!(
reset_fires == 1,
"reset deadline fires timer",
1,
reset_fires
);
let ready = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
ready.is_ready(),
"sleep ready at reset deadline",
true,
ready.is_ready()
);
crate::test_complete!("mr_reset_after_cancel_fresh");
}
#[test]
fn mr_deterministic_order_same_deadline() {
init_test("mr_deterministic_order_same_deadline");
let clock = Arc::new(VirtualClock::new());
let timer = TimerDriverHandle::with_virtual_clock(clock.clone());
let cx = Cx::new_with_drivers(
RegionId::new_for_test(0, 103),
TaskId::new_for_test(0, 103),
Budget::INFINITE,
None,
None,
None,
Some(timer.clone()),
None,
);
let _guard = Cx::set_current(Some(cx));
let shared_deadline = Time::from_secs(10);
let mut sleeps = Vec::new();
let mut woke_flags = Vec::new();
for i in 0..5 {
let mut sleep = Sleep::new(shared_deadline);
let woke = Arc::new(AtomicBool::new(false));
let waker = waker_that_sets(Arc::clone(&woke));
let mut task_cx = Context::from_waker(&waker);
let poll = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
poll.is_pending(),
&format!("sleep {} pending", i),
true,
poll.is_pending()
);
sleeps.push(sleep);
woke_flags.push(woke);
}
crate::assert_with_log!(
timer.pending_count() == 5,
"all timers registered",
5,
timer.pending_count()
);
clock.set(shared_deadline);
let fired_count = timer.process_timers();
crate::assert_with_log!(
fired_count == 5,
"all timers fire at deadline",
5,
fired_count
);
for (i, woke) in woke_flags.iter().enumerate() {
crate::assert_with_log!(
woke.load(Ordering::SeqCst),
&format!("waker {} fired", i),
true,
woke.load(Ordering::SeqCst)
);
}
for (i, sleep) in sleeps.iter_mut().enumerate() {
let waker = noop_waker();
let mut task_cx = Context::from_waker(&waker);
let ready = Pin::new(sleep).poll(&mut task_cx);
crate::assert_with_log!(
ready.is_ready(),
&format!("sleep {} ready after timer fire", i),
true,
ready.is_ready()
);
}
crate::assert_with_log!(
timer.pending_count() == 0,
"no pending timers after completion",
0,
timer.pending_count()
);
crate::test_complete!("mr_deterministic_order_same_deadline");
}
#[test]
fn mr_drop_removes_atomically() {
init_test("mr_drop_removes_atomically");
let clock = Arc::new(VirtualClock::new());
let timer = TimerDriverHandle::with_virtual_clock(clock.clone());
let cx = Cx::new_with_drivers(
RegionId::new_for_test(0, 104),
TaskId::new_for_test(0, 104),
Budget::INFINITE,
None,
None,
None,
Some(timer.clone()),
None,
);
let _guard = Cx::set_current(Some(cx));
crate::assert_with_log!(
timer.pending_count() == 0,
"timer starts empty",
0,
timer.pending_count()
);
{
let mut sleep = Sleep::new(Time::from_secs(10));
let waker = noop_waker();
let mut task_cx = Context::from_waker(&waker);
let poll = Pin::new(&mut sleep).poll(&mut task_cx);
crate::assert_with_log!(
poll.is_pending(),
"sleep pending after registration",
true,
poll.is_pending()
);
crate::assert_with_log!(
timer.pending_count() == 1,
"timer registered",
1,
timer.pending_count()
);
clock.set(Time::from_secs(5));
let midway_fires = timer.process_timers();
crate::assert_with_log!(
midway_fires == 0,
"timer does not fire before deadline",
0,
midway_fires
);
}
crate::assert_with_log!(
timer.pending_count() == 0,
"timer cancelled on drop",
0,
timer.pending_count()
);
clock.set(Time::from_secs(10));
let dropped_fires = timer.process_timers();
crate::assert_with_log!(
dropped_fires == 0,
"no spurious fires after drop",
0,
dropped_fires
);
clock.set(Time::from_secs(15));
let later_fires = timer.process_timers();
crate::assert_with_log!(
later_fires == 0,
"timer wheel remains clean",
0,
later_fires
);
crate::test_complete!("mr_drop_removes_atomically");
}
#[test]
fn mr_cancellation_composition() {
init_test("mr_cancellation_composition");
let clock = Arc::new(VirtualClock::new());
let timer = TimerDriverHandle::with_virtual_clock(clock.clone());
let cx = Cx::new_with_drivers(
RegionId::new_for_test(0, 105),
TaskId::new_for_test(0, 105),
Budget::INFINITE,
None,
None,
None,
Some(timer.clone()),
None,
);
let _guard = Cx::set_current(Some(cx));
let waker = noop_waker();
let mut task_cx = Context::from_waker(&waker);
let original = Sleep::new(Time::from_secs(5));
let mut cloned = original.clone();
let _ = Pin::new(&mut cloned).poll(&mut task_cx);
crate::assert_with_log!(
timer.pending_count() == 1,
"cloned sleep registers independently",
1,
timer.pending_count()
);
cloned.reset(Time::from_secs(10));
crate::assert_with_log!(
original.deadline() == Time::from_secs(5),
"original unaffected by clone reset",
Time::from_secs(5),
original.deadline()
);
crate::assert_with_log!(
cloned.deadline() == Time::from_secs(10),
"cloned deadline updated",
Time::from_secs(10),
cloned.deadline()
);
let mut sleep1 = Sleep::new(Time::from_secs(1));
let mut sleep2 = Sleep::new(Time::from_secs(1));
let _ = Pin::new(&mut sleep1).poll(&mut task_cx);
let _ = Pin::new(&mut sleep2).poll(&mut task_cx);
crate::assert_with_log!(
timer.pending_count() == 3,
"all sleeps registered",
3,
timer.pending_count()
);
sleep1.reset(Time::from_secs(2));
sleep1.reset(Time::from_secs(3));
sleep1.reset(Time::from_secs(4));
drop(sleep1);
drop(sleep2);
crate::assert_with_log!(
timer.pending_count() == 0,
"multiple resets + drop ≡ direct drop",
0,
timer.pending_count()
);
crate::test_complete!("mr_cancellation_composition");
}
}