#![allow(unused_unsafe)]
mod entry;
use self::entry::{EntryList, TimerEntry, TimerHandle, TimerShared};
mod handle;
pub(crate) use self::handle::Handle;
mod wheel;
pub(super) mod sleep;
use std::{cell::RefCell, fmt, io, num::NonZeroU64, ptr::NonNull, rc::Rc};
use crate::{
driver::Driver,
time::{error::Error, Clock, Duration, Instant},
};
/// Timer layer wrapped around an inner driver `D`.
///
/// Parking is delegated to `park`; the timeout passed to it is derived from
/// the timer wheel reachable through `handle` (see `park_internal`).
#[derive(Debug)]
pub struct TimeDriver<D: 'static> {
/// Converts between instants and wheel ticks (one tick is one millisecond).
time_source: ClockTime,
/// Handle to the shared timer state; built in `new` from an `Rc<Inner>`.
pub(crate) handle: Handle,
/// The wrapped driver that performs the actual parking.
park: D,
}
/// A clock paired with the instant sampled when the `ClockTime` was created;
/// used to translate between instants and millisecond ticks.
#[derive(Debug, Clone)]
struct ClockTime {
/// Source of the current time.
clock: super::clock::Clock,
/// Value of `clock.now()` captured in `ClockTime::new`; it corresponds to tick 0.
start_time: Instant,
}
impl ClockTime {
    /// Builds a time source whose tick 0 is the clock's current instant.
    pub(self) fn new(clock: Clock) -> Self {
        let start_time = clock.now();
        Self { clock, start_time }
    }

    /// Converts a deadline into a tick, rounding up to the end of the
    /// millisecond the deadline falls in.
    pub(self) fn deadline_to_tick(&self, t: Instant) -> u64 {
        let rounded_up = t + Duration::from_nanos(999_999);
        self.instant_to_tick(rounded_up)
    }

    /// Converts an instant into the number of whole milliseconds elapsed
    /// since `start_time`.
    ///
    /// Instants earlier than `start_time` map to tick 0.
    ///
    /// # Panics
    ///
    /// Panics if the millisecond count does not fit in a `u64`.
    pub(self) fn instant_to_tick(&self, t: Instant) -> u64 {
        let since_start = match t.checked_duration_since(self.start_time) {
            Some(elapsed) => elapsed,
            None => Duration::from_secs(0),
        };
        since_start
            .as_millis()
            .try_into()
            .expect("Duration too far into the future")
    }

    /// Converts a tick count back into a duration (one tick is one millisecond).
    pub(self) fn tick_to_duration(&self, t: u64) -> Duration {
        Duration::from_millis(t)
    }

    /// Returns the current time of the underlying clock, expressed in ticks.
    pub(self) fn now(&self) -> u64 {
        let current = self.clock.now();
        self.instant_to_tick(current)
    }
}
/// Timer state shared through `Handle`; `TimeDriver::new` wraps it in an `Rc`.
struct Inner {
/// Interior-mutable driver state, borrowed mutably whenever the wheel is touched.
pub(super) state: RefCell<InnerState>,
}
/// Mutable state of the timer driver, kept behind `Inner::state`.
struct InnerState {
/// Time source handed to `Inner::new` (a clone of the driver's own).
time_source: ClockTime,
/// Value of `wheel.elapsed()` recorded at the end of the last `process_at_time`; starts at 0.
elapsed: u64,
/// Next expiration tick as recorded by `park_internal` / `process_at_time`,
/// with a tick of 0 stored as 1; `None` when the wheel reports nothing pending.
next_wake: Option<NonZeroU64>,
/// Timer wheel holding the registered entries.
wheel: wheel::Wheel,
}
impl<D> TimeDriver<D>
where
    D: Driver + 'static,
{
    /// Creates a timer driver layered on top of `park`, measuring time with
    /// `clock`.
    ///
    /// The shared timer state receives its own clone of the time source.
    pub(crate) fn new(park: D, clock: Clock) -> TimeDriver<D> {
        let time_source = ClockTime::new(clock);
        let shared = Rc::new(Inner::new(time_source.clone()));
        let handle = Handle::new(shared);
        Self {
            time_source,
            handle,
            park,
        }
    }

    /// Parks the inner driver until the next timer expiration, `limit`, or
    /// an external wake-up, then fires the timers that have expired.
    ///
    /// * With a pending timer, the inner driver is parked for the time left
    ///   until it expires (capped by `limit`), or for a zero timeout if it
    ///   is already due.
    /// * With no pending timer, the inner driver is parked for `limit`, or
    ///   indefinitely when `limit` is `None`.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error returned by the inner driver; expired timers
    /// are not processed in that case.
    fn park_internal(&self, limit: Option<Duration>) -> io::Result<()> {
        // Keep the mutable borrow confined to this scope so that it is
        // released before the inner driver is parked.
        let next_wake = {
            let mut state = self.handle.get().state.borrow_mut();
            let upcoming = state.wheel.next_expiration_time();
            // A tick of 0 cannot be stored in a `NonZeroU64`; it is recorded as 1.
            state.next_wake = upcoming
                .map(|tick| NonZeroU64::new(tick).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
            upcoming
        };

        if let Some(when) = next_wake {
            let now = self.time_source.now();
            let until_wake = self
                .time_source
                .tick_to_duration(when.saturating_sub(now));

            let timeout = if until_wake > Duration::from_millis(0) {
                match limit {
                    Some(cap) => std::cmp::min(cap, until_wake),
                    None => until_wake,
                }
            } else {
                // The timer is already due: poll the inner driver without blocking.
                Duration::from_secs(0)
            };
            self.park.park_timeout(timeout)?;
        } else if let Some(timeout) = limit {
            self.park.park_timeout(timeout)?;
        } else {
            self.park.park()?;
        }

        // Fire whatever expired while we were parked.
        self.handle.process();
        Ok(())
    }
}
impl Handle {
pub(self) fn process(&self) {
let now = self.time_source().now();
self.process_at_time(now)
}
pub(self) fn process_at_time(&self, mut now: u64) {
let mut state = self.get().state.borrow_mut();
if now < state.elapsed {
now = state.elapsed;
}
while let Some(entry) = state.wheel.poll(now) {
if let Some(waker) = unsafe { entry.fire(Ok(())) } {
waker.wake();
}
}
state.elapsed = state.wheel.elapsed();
state.next_wake = state
.wheel
.poll_at()
.map(|t| NonZeroU64::new(t).unwrap_or_else(|| NonZeroU64::new(1).unwrap()));
}
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
unsafe {
let mut state = self.get().state.borrow_mut();
if entry.as_ref().might_be_registered() {
state.wheel.remove(entry);
}
entry.as_ref().handle().fire(Ok(()));
}
}
pub(self) unsafe fn reregister(&self, new_tick: u64, entry: NonNull<TimerShared>) {
let waker = unsafe {
let mut state = self.get().state.borrow_mut();
if unsafe { entry.as_ref().might_be_registered() } {
state.wheel.remove(entry);
}
let entry = entry.as_ref().handle();
entry.set_expiration(new_tick);
match unsafe { state.wheel.insert(entry) } {
Ok(_) => None,
Err((entry, super::error::InsertError::Elapsed)) => unsafe { entry.fire(Ok(())) },
}
};
if let Some(waker) = waker {
waker.wake();
}
}
}
impl<D> Driver for TimeDriver<D>
where
    D: Driver + 'static,
{
    /// Unpark handle type, taken unchanged from the inner driver.
    #[cfg(feature = "sync")]
    type Unpark = D::Unpark;

    /// Runs `f` through the inner driver's `with`.
    fn with<R>(&self, f: impl FnOnce() -> R) -> R {
        D::with(&self.park, f)
    }

    /// Forwards submission to the inner driver.
    fn submit(&self) -> io::Result<()> {
        D::submit(&self.park)
    }

    /// Parks with no caller-imposed limit; see `park_internal`.
    fn park(&self) -> io::Result<()> {
        let no_limit = None;
        self.park_internal(no_limit)
    }

    /// Parks for at most `duration`; see `park_internal`.
    fn park_timeout(&self, duration: Duration) -> io::Result<()> {
        let limit = Some(duration);
        self.park_internal(limit)
    }

    /// Returns the inner driver's unpark handle.
    #[cfg(feature = "sync")]
    fn unpark(&self) -> Self::Unpark {
        D::unpark(&self.park)
    }
}
impl<D: 'static> Drop for TimeDriver<D> {
    /// Performs no explicit teardown; the fields are dropped as usual once
    /// this returns.
    ///
    /// NOTE(review): even with an empty body, the presence of this impl
    /// prevents moving fields out of a `TimeDriver` - confirm nothing relies
    /// on that before removing it.
    fn drop(&mut self) {}
}
impl Inner {
    /// Creates the shared timer state: an empty wheel, no elapsed ticks and
    /// no pending wake-up.
    pub(self) fn new(time_source: ClockTime) -> Self {
        let initial = InnerState {
            time_source,
            elapsed: 0,
            next_wake: None,
            wheel: wheel::Wheel::new(),
        };
        Self {
            state: RefCell::new(initial),
        }
    }
}
impl fmt::Debug for Inner {
    /// Prints only the type name; the state behind the `RefCell` is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Inner");
        repr.finish()
    }
}