#![allow(unused_unsafe)]
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
mod entry;
pub(crate) use entry::TimerEntry;
use entry::{EntryList, TimerHandle, TimerShared};
mod handle;
pub(crate) use self::handle::Handle;
mod source;
pub(crate) use source::TimeSource;
mod wheel;
use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::Mutex;
use crate::runtime::driver::{self, IoHandle, IoStack};
use crate::time::error::Error;
use crate::time::{Clock, Duration};
use std::fmt;
use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
/// Time driver.
///
/// Owns the `IoStack` that it parks on; the timer state itself lives in the
/// `Handle` returned alongside the driver by `Driver::new`.
#[derive(Debug)]
pub(crate) struct Driver {
// Underlying driver that every park / shutdown call is forwarded to.
park: IoStack,
}
/// Shared timer state stored in the `inner` field of `Handle`.
struct Inner {
// Mutex-protected wheel plus the cached tick values (see `InnerState`).
pub(super) state: Mutex<InnerState>,
// Set to `true` by `Driver::shutdown`; read through `Inner::is_shutdown`.
pub(super) is_shutdown: AtomicBool,
// Only present with the "test-util" feature. `Handle::did_wake` reads and
// clears it; the code that sets it is not part of this file.
#[cfg(feature = "test-util")]
did_wake: AtomicBool,
}
/// State guarded by the mutex in `Inner::state`.
struct InnerState {
// Copy of `wheel.elapsed()` taken at the end of `Handle::process_at_time`;
// also used there as a lower bound for the tick being processed.
elapsed: u64,
// Tick of the next wake-up recorded by `park_internal` /
// `process_at_time`, or `None` when the wheel reported none. A tick of 0
// is stored as 1 because the type cannot hold zero.
next_wake: Option<NonZeroU64>,
// The timer wheel holding the registered entries.
wheel: wheel::Wheel,
}
impl Driver {
    /// Builds a time driver layered over `park`, together with the [`Handle`]
    /// that owns the shared timer state.
    ///
    /// The handle starts with a freshly created wheel, zero elapsed ticks, no
    /// recorded wake-up, and the shutdown flag cleared.
    pub(crate) fn new(park: IoStack, clock: Clock) -> (Driver, Handle) {
        let time_source = TimeSource::new(clock);

        let state = InnerState {
            elapsed: 0,
            next_wake: None,
            wheel: wheel::Wheel::new(),
        };
        let inner = Inner {
            state: Mutex::new(state),
            is_shutdown: AtomicBool::new(false),
            #[cfg(feature = "test-util")]
            did_wake: AtomicBool::new(false),
        };

        (Driver { park }, Handle { time_source, inner })
    }

    /// Parks with no caller-imposed upper bound on the wait.
    pub(crate) fn park(&mut self, handle: &driver::Handle) {
        self.park_internal(handle, None);
    }

    /// Parks for at most `duration`.
    pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
        self.park_internal(handle, Some(duration));
    }

    /// Shuts the time driver down, then the underlying driver.
    ///
    /// Does nothing if the time handle already reports shutdown.
    pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
        let time = rt_handle.time();

        if !time.is_shutdown() {
            time.inner.is_shutdown.store(true, Ordering::SeqCst);

            // Process timers as of the largest representable tick.
            time.process_at_time(u64::MAX);

            self.park.shutdown(rt_handle);
        }
    }

    /// Shared implementation of `park` and `park_timeout`.
    ///
    /// Records the wheel's next expiration in the shared state, parks until
    /// that expiration (capped by `limit` when one is given), and finally
    /// processes timers at the current time.
    ///
    /// # Panics
    ///
    /// Panics if the time handle has already been shut down.
    fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
        let handle = rt_handle.time();

        // The guard lives only inside this block, so the state lock is
        // released before parking.
        let next_wake = {
            let mut state = handle.inner.state.lock();

            assert!(!handle.is_shutdown());

            let expiration = state.wheel.next_expiration_time();
            // The stored tick must be non-zero, so 0 is recorded as 1.
            state.next_wake = match expiration {
                Some(0) => NonZeroU64::new(1),
                Some(tick) => NonZeroU64::new(tick),
                None => None,
            };
            expiration
        };

        match (next_wake, limit) {
            (Some(when), _) => {
                let now = handle.time_source.now();
                let until_wake = handle
                    .time_source
                    .tick_to_duration(when.saturating_sub(now));

                if until_wake == Duration::from_millis(0) {
                    // No time left before the next expiration: poll the
                    // underlying driver with a zero timeout.
                    self.park.park_timeout(rt_handle, Duration::from_secs(0));
                } else {
                    let timeout = limit.map_or(until_wake, |cap| std::cmp::min(cap, until_wake));
                    self.park_thread_timeout(rt_handle, timeout);
                }
            }
            (None, Some(duration)) => {
                self.park_thread_timeout(rt_handle, duration);
            }
            (None, None) => {
                self.park.park(rt_handle);
            }
        }

        handle.process();
    }

    cfg_test_util! {
        /// Parks for `duration`, honouring an auto-advancing clock.
        ///
        /// When the clock can auto-advance, the underlying driver is polled
        /// with a zero timeout and, unless `did_wake` reports a wake-up, the
        /// clock is advanced by `duration` instead of blocking.
        fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
            let time = rt_handle.time();
            let clock = &time.time_source.clock;

            if !clock.can_auto_advance() {
                self.park.park_timeout(rt_handle, duration);
                return;
            }

            self.park.park_timeout(rt_handle, Duration::from_secs(0));

            if !time.did_wake() {
                clock.advance(duration);
            }
        }
    }

    cfg_not_test_util! {
        /// Parks on the underlying driver for at most `duration`.
        fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
            self.park.park_timeout(rt_handle, duration);
        }
    }
}
impl Handle {
/// Reads the current tick from the time source and processes the wheel up
/// to that tick.
pub(self) fn process(&self) {
let now = self.time_source().now();
self.process_at_time(now)
}
/// Fires every entry the wheel yields for tick `now` and wakes the wakers
/// that firing returns.
///
/// `now` is raised to the cached `elapsed` tick if it is smaller. Wakers
/// are gathered in a fixed 32-slot buffer and are only invoked while the
/// state lock is released. Before returning, the cached `elapsed` and
/// `next_wake` values are refreshed from the wheel.
pub(self) fn process_at_time(&self, mut now: u64) {
// Fixed-size batch of wakers; only slots below `waker_idx` are occupied.
let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;
let mut lock = self.inner.lock();
// Never process at a tick earlier than the one already recorded as elapsed.
if now < lock.elapsed {
now = lock.elapsed;
}
while let Some(entry) = lock.wheel.poll(now) {
// NOTE(review): both calls below are `unsafe`; presumably they are sound
// because the state lock is held and `poll` has handed the entry out of
// the wheel — confirm against the `TimerHandle` documentation.
debug_assert!(unsafe { entry.is_pending() });
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
waker_list[waker_idx] = Some(waker);
waker_idx += 1;
if waker_idx == waker_list.len() {
// Batch is full: release the lock, wake everything collected, then
// re-acquire the lock and keep polling.
drop(lock);
for waker in waker_list.iter_mut() {
waker.take().unwrap().wake();
}
waker_idx = 0;
lock = self.inner.lock();
}
}
}
// Cache the wheel's elapsed tick and its next poll time. A poll time of 0
// is stored as 1 because `next_wake` holds a `NonZeroU64`.
lock.elapsed = lock.wheel.elapsed();
lock.next_wake = lock
.wheel
.poll_at()
.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
drop(lock);
// Wake the remaining (partial) batch, again with the lock released.
for waker in waker_list[0..waker_idx].iter_mut() {
waker.take().unwrap().wake();
}
}
/// Removes `entry` from the wheel if it might still be registered, then
/// fires it with `Ok(())`.
///
/// The state lock is held for the whole operation. The value returned by
/// `fire` (an optional waker at the other call sites in this impl) is
/// discarded, so this function does not call `wake` itself.
///
/// # Safety
///
/// `entry` is dereferenced through `NonNull::as_ref`, so it must point to a
/// valid `TimerShared` for the duration of the call.
/// NOTE(review): any further requirements of `Wheel::remove` and `fire` are
/// not visible in this file — confirm against their documentation.
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
unsafe {
let mut lock = self.inner.lock();
if entry.as_ref().might_be_registered() {
lock.wheel.remove(entry);
}
entry.as_ref().handle().fire(Ok(()));
}
}
/// Re-registers `entry` so that it expires at `new_tick`.
///
/// While holding the state lock, the entry is removed from the wheel if it
/// might still be registered. If the driver has been shut down the entry is
/// fired with a shutdown error; otherwise its expiration is set to
/// `new_tick` and it is inserted into the wheel. When the insert succeeds
/// with a deadline earlier than the recorded `next_wake` (or none is
/// recorded), `unpark.unpark()` is called. When the insert is rejected as
/// `Elapsed`, the entry is fired with `Ok(())`. A waker produced by firing
/// is woken after the lock has been released.
///
/// # Safety
///
/// `entry` is dereferenced through `NonNull::as_ref`, so it must point to a
/// valid `TimerShared` for the duration of the call.
/// NOTE(review): any further requirements of `Wheel::remove`,
/// `Wheel::insert` and `fire` are not visible in this file — confirm
/// against their documentation.
pub(self) unsafe fn reregister(
&self,
unpark: &IoHandle,
new_tick: u64,
entry: NonNull<TimerShared>,
) {
let waker = unsafe {
let mut lock = self.inner.lock();
// Take the entry out of the wheel first if it may currently be in it.
if unsafe { entry.as_ref().might_be_registered() } {
lock.wheel.remove(entry);
}
// Shadow the raw pointer with the handle used for firing / inserting.
let entry = entry.as_ref().handle();
if self.is_shutdown() {
unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
} else {
entry.set_expiration(new_tick);
match unsafe { lock.wheel.insert(entry) } {
Ok(when) => {
// The new deadline precedes the recorded wake-up (or none is
// recorded): unpark, presumably so the parked driver recomputes
// how long it should sleep.
if lock
.next_wake
.map(|next_wake| when < next_wake.get())
.unwrap_or(true)
{
unpark.unpark();
}
None
}
// The wheel refused the entry as already elapsed: fire it right away.
Err((entry, crate::time::error::InsertError::Elapsed)) => unsafe {
entry.fire(Ok(()))
},
}
}
// `lock` is local to this block, so it is released here, before the
// waker below is invoked.
};
if let Some(waker) = waker {
waker.wake();
}
}
cfg_test_util! {
/// Returns the current value of the `did_wake` flag and resets it to
/// `false` in a single atomic swap.
fn did_wake(&self) -> bool {
self.inner.did_wake.swap(false, Ordering::SeqCst)
}
}
}
impl Inner {
    /// Acquires the mutex guarding the timer state and returns its guard.
    pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> {
        let state = &self.state;
        state.lock()
    }

    /// Reports whether the shutdown flag is set, using a `SeqCst` load.
    pub(super) fn is_shutdown(&self) -> bool {
        let flag = &self.is_shutdown;
        flag.load(Ordering::SeqCst)
    }
}
impl fmt::Debug for Inner {
    /// Formats as the bare type name `Inner`; no fields are printed.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.write_str("Inner")
    }
}
#[cfg(test)]
mod tests;