#![allow(unused_unsafe)]
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
mod entry;
pub(crate) use entry::TimerEntry;
use entry::{EntryList, TimerHandle, TimerShared, MAX_SAFE_MILLIS_DURATION};
mod handle;
pub(crate) use self::handle::Handle;
use self::wheel::Wheel;
mod source;
pub(crate) use source::TimeSource;
mod wheel;
use crate::loom::sync::atomic::{AtomicBool, Ordering};
use crate::loom::sync::{Mutex, RwLock};
use crate::runtime::driver::{self, IoHandle, IoStack};
use crate::time::error::Error;
use crate::time::{Clock, Duration};
use crate::util::WakeList;
use crate::loom::sync::atomic::AtomicU64;
use std::fmt;
use std::{num::NonZeroU64, ptr::NonNull};
/// An atomically accessed `Option<NonZeroU64>`.
///
/// The value is packed into a single `AtomicU64`: `None` is stored as `0`
/// and `Some(n)` is stored as `n`, which can never collide because `n` is
/// non-zero. Every access uses `Ordering::Relaxed`.
struct AtomicOptionNonZeroU64(AtomicU64);

impl AtomicOptionNonZeroU64 {
    /// Creates a cell that initially holds `val`.
    fn new(val: Option<NonZeroU64>) -> Self {
        Self(AtomicU64::new(Self::encode(val)))
    }

    /// Overwrites the stored value with `val`.
    fn store(&self, val: Option<NonZeroU64>) {
        self.0.store(Self::encode(val), Ordering::Relaxed);
    }

    /// Reads the current value; a raw `0` decodes to `None`.
    fn load(&self) -> Option<NonZeroU64> {
        let raw = self.0.load(Ordering::Relaxed);
        NonZeroU64::new(raw)
    }

    /// Maps `None` to `0` and `Some(n)` to `n`.
    fn encode(val: Option<NonZeroU64>) -> u64 {
        match val {
            Some(n) => n.get(),
            None => 0,
        }
    }
}
/// Time driver: wraps an `IoStack` and runs timer processing around each
/// park of that stack.
#[derive(Debug)]
pub(crate) struct Driver {
/// Underlying parker; every `park`, `park_timeout` and `shutdown` call made
/// by this driver is ultimately forwarded to it.
park: IoStack,
}
/// Timer state stored in the `inner` field of `Handle`.
struct Inner {
/// Most recently published wake-up tick. Written by the driver when it
/// parks and after it processes timers; read by `Handle::reregister` to
/// decide whether the driver must be unparked. `None` when no wheel
/// reported a pending expiration.
next_wake: AtomicOptionNonZeroU64,
/// The sharded timer wheels. Parking takes the write lock to inspect every
/// shard; the other operations take the read lock and then lock one shard.
wheels: RwLock<ShardedWheel>,
/// Number of shards, set at construction. Stored outside `wheels` so it can
/// be read without taking the lock.
wheels_len: u32,
/// Set to `true` by `Driver::shutdown`.
pub(super) is_shutdown: AtomicBool,
/// Flag that `Handle::did_wake` reads and resets to `false`.
/// NOTE(review): the code that sets it to `true` is not in this file section.
#[cfg(feature = "test-util")]
did_wake: AtomicBool,
}
/// The timer wheel shards, each protected by its own mutex.
struct ShardedWheel(Box<[Mutex<wheel::Wheel>]>);
impl Driver {
pub(crate) fn new(park: IoStack, clock: &Clock, shards: u32) -> (Driver, Handle) {
assert!(shards > 0);
let time_source = TimeSource::new(clock);
let wheels: Vec<_> = (0..shards)
.map(|_| Mutex::new(wheel::Wheel::new()))
.collect();
let handle = Handle {
time_source,
inner: Inner {
next_wake: AtomicOptionNonZeroU64::new(None),
wheels: RwLock::new(ShardedWheel(wheels.into_boxed_slice())),
wheels_len: shards,
is_shutdown: AtomicBool::new(false),
#[cfg(feature = "test-util")]
did_wake: AtomicBool::new(false),
},
};
let driver = Driver { park };
(driver, handle)
}
/// Parks the driver with no caller-imposed time limit, then processes
/// timers (see `park_internal`, called here with `limit = None`).
pub(crate) fn park(&mut self, handle: &driver::Handle) {
self.park_internal(handle, None);
}
/// Parks the driver for at most `duration`, then processes timers (see
/// `park_internal`, called here with `limit = Some(duration)`).
pub(crate) fn park_timeout(&mut self, handle: &driver::Handle, duration: Duration) {
self.park_internal(handle, Some(duration));
}
/// Shuts the time driver down.
///
/// Does nothing if the handle already reports shutdown. Otherwise it sets
/// the shutdown flag, processes every shard at tick `u64::MAX` (which should
/// expire all remaining timers), and finally shuts down the underlying
/// parker.
pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
    let time = rt_handle.time();

    if !time.is_shutdown() {
        time.inner.is_shutdown.store(true, Ordering::SeqCst);

        // Jump to the largest representable tick, starting from shard 0.
        time.process_at_time(0, u64::MAX);

        self.park.shutdown(rt_handle);
    }
}
/// Parks until the earliest timer is due (bounded by `limit`, if given),
/// then processes timers.
///
/// Steps:
/// 1. Under the write lock on the wheels, find the minimum next expiration
///    across all shards and publish it to `next_wake`.
/// 2. Park: for the time remaining until that expiration (capped by
///    `limit`), for zero time if it is already due, for `limit` if there is
///    no pending timer, or indefinitely if there is neither.
/// 3. Run `Handle::process` with the runtime clock.
///
/// # Panics
///
/// Panics if the time handle is already shut down.
fn park_internal(&mut self, rt_handle: &driver::Handle, limit: Option<Duration>) {
    let handle = rt_handle.time();
    assert!(!handle.is_shutdown());

    let expiration_time = {
        let mut wheels = handle.inner.wheels.write();
        let earliest = wheels
            .0
            .iter_mut()
            .filter_map(|shard| shard.get_mut().next_expiration_time())
            .min();

        // Published while the write lock is still held.
        handle.inner.next_wake.store(next_wake_time(earliest));

        earliest
    };

    if let Some(when) = expiration_time {
        let now = handle.time_source.now(rt_handle.clock());
        let until = handle
            .time_source
            .tick_to_duration(when.saturating_sub(now));

        if until > Duration::from_millis(0) {
            let timeout = match limit {
                Some(limit) => std::cmp::min(limit, until),
                None => until,
            };
            self.park_thread_timeout(rt_handle, timeout);
        } else {
            // Already due: give the parker a non-blocking turn only.
            self.park.park_timeout(rt_handle, Duration::from_secs(0));
        }
    } else if let Some(duration) = limit {
        self.park_thread_timeout(rt_handle, duration);
    } else {
        self.park.park(rt_handle);
    }

    handle.process(rt_handle.clock());
}
cfg_test_util! {
    /// Parks for `duration`, honouring a clock that can auto-advance.
    ///
    /// If the clock cannot auto-advance, this is a plain timed park.
    /// Otherwise the parker is given a zero-length park, and the clock is
    /// advanced by `duration` unless `did_wake()` reports a wake-up.
    ///
    /// # Panics
    ///
    /// Panics with the clock's message if `clock.advance` returns an error.
    fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
        let handle = rt_handle.time();
        let clock = rt_handle.clock();

        if !clock.can_auto_advance() {
            self.park.park_timeout(rt_handle, duration);
            return;
        }

        self.park.park_timeout(rt_handle, Duration::from_secs(0));

        // A recorded wake-up means the clock must not be moved.
        if handle.did_wake() {
            return;
        }

        if let Err(msg) = clock.advance(duration) {
            panic!("{}", msg);
        }
    }
}
cfg_not_test_util! {
/// Parks for `duration` by forwarding directly to the underlying parker.
/// This is the variant wrapped in `cfg_not_test_util!`.
fn park_thread_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
self.park.park_timeout(rt_handle, duration);
}
}
}
/// Converts an optional expiration tick into the value stored in `next_wake`.
///
/// * `None` stays `None`.
/// * `Some(0)` becomes `Some(1)`, because `0` cannot be represented as a
///   `NonZeroU64`.
/// * `Some(n)` for any other `n` is kept as is.
fn next_wake_time(expiration_time: Option<u64>) -> Option<NonZeroU64> {
    let tick = expiration_time?;
    NonZeroU64::new(tick.max(1))
}
impl Handle {
/// Processes every shard at the current tick of `clock`.
///
/// The shard to start from comes from `thread_rng_n(shard count)`
/// (presumably a random index in `0..shards`, so that no shard is always
/// handled first; confirm against `thread_rng_n`).
pub(self) fn process(&self, clock: &Clock) {
    let now = self.time_source().now(clock);
    let first_shard = crate::runtime::context::thread_rng_n(self.inner.get_shard_size());
    self.process_at_time(first_shard, now);
}
/// Processes all shards at tick `now`, beginning with shard `start`, and
/// publishes the earliest next wake-up to `next_wake`.
///
/// Shard ids run from `start` to `start + shards - 1`; ids past the end wrap
/// around because the shard lookup reduces them modulo the shard count.
pub(self) fn process_at_time(&self, start: u32, now: u64) {
    let shards = self.inner.get_shard_size();

    let mut earliest: Option<u64> = None;
    for id in start..shards + start {
        if let Some(next) = self.process_at_sharded_time(id, now) {
            earliest = Some(earliest.map_or(next, |current| current.min(next)));
        }
    }

    self.inner.next_wake.store(next_wake_time(earliest));
}
/// Fires every entry that shard `id` yields when polled at tick `now`, and
/// wakes the wakers that firing produced.
///
/// `id` is reduced modulo the shard count when the shard is locked. Returns
/// the shard's `poll_at()` value taken after processing, which the caller
/// uses as this shard's next wake-up tick.
pub(self) fn process_at_sharded_time(&self, id: u32, mut now: u64) -> Option<u64> {
let mut waker_list = WakeList::new();
let mut wheels_lock = self.inner.wheels.read();
let mut lock = wheels_lock.lock_sharded_wheel(id);
if now < lock.elapsed() {
// The supplied tick is behind what the wheel has already reached; clamp it
// so the wheel is never polled with a tick earlier than its elapsed tick.
now = lock.elapsed();
}
while let Some(entry) = lock.poll(now) {
debug_assert!(unsafe { entry.is_pending() });
// SAFETY: NOTE(review): presumably sound because the shard lock is held and
// `poll` has just handed this entry out; confirm against the contract of
// `fire`, which is defined outside this file section.
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
waker_list.push(waker);
if !waker_list.can_push() {
// The batch is full. Release both locks before waking so that no waker
// runs while the wheel is locked (presumably to avoid deadlock), then
// re-acquire them to keep polling.
drop(lock);
drop(wheels_lock);
waker_list.wake_all();
wheels_lock = self.inner.wheels.read();
lock = wheels_lock.lock_sharded_wheel(id);
}
}
}
// Read the next wake-up while the shard is still locked.
let next_wake_up = lock.poll_at();
drop(lock);
drop(wheels_lock);
// Wake the remaining batch only after both locks have been released.
waker_list.wake_all();
next_wake_up
}
/// Removes `entry` from its shard's wheel if it might still be registered,
/// then fires it with `Ok(())`. The return value of `fire` is discarded.
///
/// # Safety
///
/// `entry` is dereferenced, so it must point to a live `TimerShared`.
/// NOTE(review): any further requirements (for example exclusive control of
/// the entry) come from `remove`, `handle` and `fire`, which are defined
/// outside this file section; confirm them there.
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
unsafe {
let wheels_lock = self.inner.wheels.read();
// The shard is chosen by the entry's own shard id.
let mut lock = wheels_lock.lock_sharded_wheel(entry.as_ref().shard_id());
if entry.as_ref().might_be_registered() {
lock.remove(entry);
}
// Fired while the shard lock is still held.
entry.as_ref().handle().fire(Ok(()));
}
}
/// Re-registers `entry` to expire at `new_tick`.
///
/// The entry is first removed from its shard's wheel if it might be
/// registered. Then:
///
/// * if the driver is shut down, the entry is fired with a shutdown error;
/// * otherwise its expiration is set to `new_tick` and it is inserted:
///   * on success, `unpark` is signalled when the returned tick is earlier
///     than the published `next_wake`, or when no `next_wake` is published;
///   * if insertion reports the deadline as already elapsed, the entry is
///     fired with `Ok(())`.
///
/// A waker returned by firing is invoked only after both locks have been
/// released.
///
/// # Safety
///
/// `entry` is dereferenced, so it must point to a live `TimerShared`.
/// NOTE(review): any further requirements (for example exclusive control of
/// the entry) come from `remove`, `insert`, `handle` and `fire`, which are
/// defined outside this file section; confirm them there.
pub(self) unsafe fn reregister(
&self,
unpark: &IoHandle,
new_tick: u64,
entry: NonNull<TimerShared>,
) {
let waker = unsafe {
let wheels_lock = self.inner.wheels.read();
let mut lock = wheels_lock.lock_sharded_wheel(entry.as_ref().shard_id());
// Deregister only if the entry may currently be in the wheel.
if unsafe { entry.as_ref().might_be_registered() } {
lock.remove(entry);
}
// From here on `entry` is the handle used for firing or re-insertion.
let entry = entry.as_ref().handle();
if self.is_shutdown() {
unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
} else {
entry.set_expiration(new_tick);
match unsafe { lock.insert(entry) } {
Ok(when) => {
// Unpark when the new deadline precedes the published wake-up, or when
// nothing is published (`None` maps to `true`).
if self
.inner
.next_wake
.load()
.map(|next_wake| when < next_wake.get())
.unwrap_or(true)
{
unpark.unpark();
}
None
}
Err((entry, crate::time::error::InsertError::Elapsed)) => unsafe {
entry.fire(Ok(()))
},
}
}
// Both guards are dropped at the end of this block, before any waker runs.
};
// A waker is present only when the entry was fired during this call.
if let Some(waker) = waker {
waker.wake();
}
}
cfg_test_util! {
/// Returns the current value of the `did_wake` flag and resets it to
/// `false` in the same atomic swap.
fn did_wake(&self) -> bool {
self.inner.did_wake.swap(false, Ordering::SeqCst)
}
}
}
impl Inner {
/// Returns whether the shutdown flag has been set (`SeqCst` load).
pub(super) fn is_shutdown(&self) -> bool {
self.is_shutdown.load(Ordering::SeqCst)
}
/// Returns the number of wheel shards recorded at construction, without
/// taking the `wheels` lock.
fn get_shard_size(&self) -> u32 {
self.wheels_len
}
}
/// Opaque `Debug` output: prints only the type name, with no fields.
impl fmt::Debug for Inner {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Inner");
        repr.finish()
    }
}
impl ShardedWheel {
    /// Locks and returns the wheel for `shard_id`.
    ///
    /// `shard_id` is reduced modulo the number of shards, so any `u32` value
    /// selects a valid shard.
    ///
    /// # Panics
    ///
    /// Panics on the remainder operation if there are no shards.
    pub(super) fn lock_sharded_wheel(
        &self,
        shard_id: u32,
    ) -> crate::loom::sync::MutexGuard<'_, Wheel> {
        let shard_count = self.0.len() as u32;
        let slot = (shard_id % shard_count) as usize;
        // SAFETY: `slot` is a remainder modulo the (u32-truncated) slice
        // length, so it is strictly less than `self.0.len()`.
        let shard = unsafe { self.0.get_unchecked(slot) };
        shard.lock()
    }
}
#[cfg(test)]
mod tests;