#![allow(unused_unsafe)]
#![cfg_attr(not(feature = "rt"), allow(dead_code))]
mod entry;
pub(self) use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};
mod handle;
pub(crate) use self::handle::Handle;
mod wheel;
pub(super) mod sleep;
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::time::error::Error;
use crate::time::{Clock, Duration, Instant};
use std::convert::TryInto;
use std::fmt;
use std::{num::NonZeroU64, ptr::NonNull, task::Waker};
/// Time driver that layers timer processing on top of a wrapped [`Park`]
/// implementation.
///
/// Parking the driver parks the wrapped `park` (bounded by the next timer
/// expiration when one exists) and then processes the timers that have
/// elapsed.
#[derive(Debug)]
pub(crate) struct Driver<P: Park + 'static> {
/// Clock plus start instant, used to convert between instants and ticks.
time_source: ClockTime,
/// Handle to the shared, lock-protected driver state; `handle()` returns
/// clones of it.
inner: Handle,
/// The wrapped parker that park/unpark/shutdown calls are delegated to.
park: P,
}
/// Converts between `Instant`s and the driver's integer tick values.
///
/// A tick is one millisecond, counted from `start_time`.
#[derive(Debug, Clone)]
pub(self) struct ClockTime {
/// Clock queried for the current instant.
clock: super::clock::Clock,
/// Instant captured from `clock` at construction; corresponds to tick 0.
start_time: Instant,
}
impl ClockTime {
    /// Builds a time source whose tick 0 is the instant `clock` reports now.
    pub(self) fn new(clock: Clock) -> Self {
        let start_time = clock.now();
        Self { start_time, clock }
    }

    /// Converts a deadline to a tick, rounding up to the end of its
    /// millisecond so the resulting tick is never earlier than the deadline.
    pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 {
        let end_of_millisecond = t + Duration::from_nanos(999_999);
        self.instant_to_tick(end_of_millisecond)
    }

    /// Converts an instant to the number of whole milliseconds elapsed since
    /// `start_time`. Instants before `start_time` map to tick 0.
    ///
    /// # Panics
    ///
    /// Panics if the millisecond count does not fit in a `u64`.
    pub(self) fn instant_to_tick(&self, t: Instant) -> u64 {
        let since_start: Duration = match t.checked_duration_since(self.start_time) {
            Some(elapsed) => elapsed,
            None => Duration::from_secs(0),
        };

        since_start
            .as_millis()
            .try_into()
            .expect("Duration too far into the future")
    }

    /// Converts a tick count to the equivalent duration in milliseconds.
    pub(self) fn tick_to_duration(&self, t: u64) -> Duration {
        Duration::from_millis(t)
    }

    /// Returns the tick corresponding to the clock's current instant.
    pub(self) fn now(&self) -> u64 {
        let current = self.clock.now();
        self.instant_to_tick(current)
    }
}
/// Timer driver state; wrapped in an `Arc<Mutex<_>>` when the driver is
/// constructed and accessed through `Handle`.
pub(self) struct Inner {
/// Clone of the driver's `ClockTime` (clock plus start instant).
time_source: ClockTime,
/// Cached copy of the wheel's elapsed tick count, refreshed at the end of
/// each processing pass.
elapsed: u64,
/// Tick of the next scheduled expiration, if any. A tick of 0 is stored as 1
/// so the value fits in a `NonZeroU64`.
next_wake: Option<NonZeroU64>,
/// Timer wheel holding the registered timer entries.
wheel: wheel::Wheel,
/// Set by `shutdown`; once set, re-registered entries are fired with a
/// shutdown error instead of being inserted.
is_shutdown: bool,
/// Unpark handle of the wrapped parker, invoked when a newly registered
/// entry expires before the recorded `next_wake`.
unpark: Box<dyn Unpark>,
}
impl<P> Driver<P>
where
    P: Park + 'static,
{
    /// Creates a time driver that wraps `park` and measures time with `clock`.
    ///
    /// The shared state receives its own clone of the time source and a boxed
    /// unpark handle obtained from `park`.
    pub(crate) fn new(park: P, clock: Clock) -> Driver<P> {
        let time_source = ClockTime::new(clock);
        let state = Inner::new(time_source.clone(), Box::new(park.unpark()));
        let shared = Arc::new(Mutex::new(state));

        Self {
            time_source,
            inner: Handle::new(shared),
            park,
        }
    }

    /// Returns a new handle to the shared driver state.
    pub(crate) fn handle(&self) -> Handle {
        Handle::clone(&self.inner)
    }

    /// Parks the wrapped parker, bounded by `limit` and by the next timer
    /// expiration, then processes the timers that have elapsed.
    ///
    /// # Panics
    ///
    /// Panics if the driver has already been shut down.
    fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
        // Record the upcoming expiration in the shared state; the guard is
        // released at the end of this scope, before any parking happens.
        let next_wake = {
            let mut state = self.inner.lock();
            assert!(!state.is_shutdown);

            let expiration = state.wheel.next_expiration_time();
            // A tick of 0 cannot be stored in a `NonZeroU64`; record it as 1.
            state.next_wake = expiration
                .map(|tick| NonZeroU64::new(tick).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
            expiration
        };

        match next_wake {
            Some(when) => {
                let now = self.time_source.now();
                let until_expiry = self.time_source.tick_to_duration(when.saturating_sub(now));

                if until_expiry == Duration::from_millis(0) {
                    // Already due: poll the parker without blocking and
                    // without touching the clock.
                    self.park.park_timeout(Duration::from_secs(0))?;
                } else {
                    let timeout = match limit {
                        Some(limit) => limit.min(until_expiry),
                        None => until_expiry,
                    };
                    self.park_or_advance(timeout)?;
                }
            }
            None => match limit {
                Some(timeout) => self.park_or_advance(timeout)?,
                None => self.park.park()?,
            },
        }

        // Fire whatever has expired by the time parking returned.
        self.inner.process();
        Ok(())
    }

    /// Parks for `timeout`; when the clock is paused, parks with a zero
    /// timeout and advances the clock by `timeout` instead.
    fn park_or_advance(&mut self, timeout: Duration) -> Result<(), P::Error> {
        if self.time_source.clock.is_paused() {
            self.park.park_timeout(Duration::from_secs(0))?;
            self.time_source.clock.advance(timeout);
            Ok(())
        } else {
            self.park.park_timeout(timeout)
        }
    }
}
impl Handle {
/// Processes timers using the time source's current tick as "now".
pub(self) fn process(&self) {
let now = self.time_source().now();
self.process_at_time(now)
}
/// Fires every entry the wheel reports as expired at tick `now` and wakes
/// the associated tasks.
///
/// Wakers are collected in a fixed buffer of 32 slots and are invoked only
/// while the driver lock is released. After the pass, the cached `elapsed`
/// and `next_wake` values are refreshed from the wheel.
///
/// # Panics
///
/// Panics if `now` is less than the cached elapsed tick.
pub(self) fn process_at_time(&self, now: u64) {
// Fixed-size batch of pending wakers; `waker_idx` counts the filled slots.
let mut waker_list: [Option<Waker>; 32] = Default::default();
let mut waker_idx = 0;
let mut lock = self.lock();
assert!(now >= lock.elapsed);
while let Some(entry) = lock.wheel.poll(now) {
debug_assert!(unsafe { entry.is_pending() });
// The entry was just handed out by the wheel while the driver lock is held.
// NOTE(review): presumably that is what makes `fire` sound here — confirm
// against the contract documented on `TimerHandle::fire`.
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
waker_list[waker_idx] = Some(waker);
waker_idx += 1;
if waker_idx == waker_list.len() {
// Buffer is full: release the lock, wake the whole batch, then
// re-acquire the lock and keep polling.
drop(lock);
for waker in waker_list.iter_mut() {
waker.take().unwrap().wake();
}
waker_idx = 0;
lock = self.lock();
}
}
}
// Refresh the cached elapsed tick and the next wake-up (0 is stored as 1 so
// it fits in a `NonZeroU64`).
lock.elapsed = lock.wheel.elapsed();
lock.next_wake = lock
.wheel
.poll_at()
.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
drop(lock);
// Wake the remaining partial batch, again with the lock released.
for waker in waker_list[0..waker_idx].iter_mut() {
waker.take().unwrap().wake();
}
}
/// Removes `entry` from the wheel if it might still be registered, then
/// fires it with `Ok(())`; the waker returned by `fire`, if any, is discarded.
///
/// # Safety
///
/// `entry` is dereferenced, so it must point to a live `TimerShared`.
/// NOTE(review): any further requirements (e.g. that the entry belongs to
/// this driver) are not visible in this file — confirm with the callers.
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
unsafe {
let mut lock = self.lock();
if entry.as_ref().might_be_registered() {
lock.wheel.remove(entry);
}
entry.as_ref().handle().fire(Ok(()));
}
}
/// Moves `entry` to expire at `new_tick`.
///
/// The entry is first removed from the wheel if it might be registered. If
/// the driver is shut down it is fired with a shutdown error; otherwise it
/// is re-inserted, unparking the driver when the new expiration precedes
/// the recorded `next_wake` (or when none is recorded). An entry whose tick
/// has already elapsed is fired immediately with `Ok(())`.
///
/// # Safety
///
/// `entry` is dereferenced, so it must point to a live `TimerShared`.
/// NOTE(review): callers presumably also need exclusive control of the
/// entry — confirm against the timer entry documentation.
pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
// The lock guard lives only inside this block, so any waker produced is
// invoked after the lock has been released.
let waker = unsafe {
let mut lock = self.lock();
if unsafe { entry.as_ref().might_be_registered() } {
lock.wheel.remove(entry);
}
// Shadow the raw pointer with a handle used for firing/insertion.
let entry = entry.as_ref().handle();
if lock.is_shutdown {
unsafe { entry.fire(Err(crate::time::error::Error::shutdown())) }
} else {
entry.set_expiration(new_tick);
match unsafe { lock.wheel.insert(entry) } {
Ok(when) => {
// Wake the driver if this entry expires before the wake-up it
// last recorded, or if it recorded none.
if lock
.next_wake
.map(|next_wake| when < next_wake.get())
.unwrap_or(true)
{
lock.unpark.unpark();
}
None
}
// The requested tick has already elapsed: complete the entry now.
Err((entry, super::error::InsertError::Elapsed)) => unsafe {
entry.fire(Ok(()))
},
}
}
};
if let Some(waker) = waker {
waker.wake();
}
}
}
impl<P> Park for Driver<P>
where
    P: Park + 'static,
{
    type Unpark = P::Unpark;
    type Error = P::Error;

    /// Returns an unpark handle obtained from the wrapped parker.
    fn unpark(&self) -> Self::Unpark {
        self.park.unpark()
    }

    /// Parks with no caller-imposed limit, then processes elapsed timers.
    fn park(&mut self) -> Result<(), Self::Error> {
        self.park_internal(None)
    }

    /// Parks for at most `duration`, then processes elapsed timers.
    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
        let limit = Some(duration);
        self.park_internal(limit)
    }

    /// Shuts the driver down; calls after the first are no-ops.
    ///
    /// Marks the shared state as shut down, processes timers as if time had
    /// reached `u64::MAX` ticks, and then shuts down the wrapped parker.
    fn shutdown(&mut self) {
        // Flip the flag under the lock; the guard is released at the end of
        // this scope, before timers are processed.
        let was_shutdown = {
            let mut state = self.inner.lock();
            std::mem::replace(&mut state.is_shutdown, true)
        };
        if was_shutdown {
            return;
        }

        self.inner.process_at_time(u64::MAX);
        self.park.shutdown();
    }
}
impl<P> Drop for Driver<P>
where
    P: Park + 'static,
{
    /// Shuts the driver down when it is dropped; `shutdown` itself is a no-op
    /// if it has already run.
    fn drop(&mut self) {
        <Self as Park>::shutdown(self);
    }
}
impl Inner {
    /// Creates the initial driver state: nothing elapsed, no wake-up
    /// scheduled, an empty timer wheel, and not shut down.
    pub(self) fn new(time_source: ClockTime, unpark: Box<dyn Unpark>) -> Self {
        let wheel = wheel::Wheel::new();

        Self {
            time_source,
            elapsed: 0,
            next_wake: None,
            wheel,
            is_shutdown: false,
            unpark,
        }
    }
}
impl fmt::Debug for Inner {
    /// Formats the state as an opaque `Inner`, printing no fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut opaque = f.debug_struct("Inner");
        opaque.finish()
    }
}
#[cfg(test)]
mod tests;