use crate::linked_list::{self, Link, LinkedList};
use crate::TimeoutError;
use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use core::task::{Poll, Waker};
mod backend;
mod tick_type;
pub use backend::TimerQueueBackend;
pub use tick_type::TimerQueueTicks;
/// Queue entry pairing a task's `Waker` with the tick at which it becomes due.
struct WaitingWaker<Backend: TimerQueueBackend> {
/// Handle woken by `TimerQueue::on_monotonic_interrupt` after the entry is popped.
waker: Waker,
/// Tick at which the entry is due; the only field used for equality and ordering.
release_at: Backend::Ticks,
/// Written by the interrupt handler's `pop_if` predicate (`true` when the entry is being
/// popped) and read by `Delay::drop` to skip the explicit queue deletion.
was_popped: AtomicBool,
}
impl<Backend> Clone for WaitingWaker<Backend>
where
    Backend: TimerQueueBackend,
{
    /// Produces an independent copy of this entry.
    ///
    /// The waker is cloned, the release tick is copied, and the `was_popped` flag is
    /// snapshotted into a fresh atomic (later changes to one copy do not affect the other).
    fn clone(&self) -> Self {
        let waker = self.waker.clone();
        let release_at = self.release_at;
        let popped_snapshot = self.was_popped.load(Ordering::Relaxed);

        Self {
            waker,
            release_at,
            was_popped: AtomicBool::new(popped_snapshot),
        }
    }
}
impl<Backend> PartialEq for WaitingWaker<Backend>
where
    Backend: TimerQueueBackend,
{
    /// Two entries are equal when they are due at the same tick.
    ///
    /// The waker and the `was_popped` flag take no part in the comparison.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.release_at, &other.release_at)
    }
}
impl<Backend> PartialOrd for WaitingWaker<Backend>
where
    Backend: TimerQueueBackend,
{
    /// Orders entries by their release tick using the tick type's own `compare`.
    ///
    /// Always returns `Some`; the comparison itself is delegated entirely to
    /// `Backend::Ticks::compare`.
    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
        let ordering = self.release_at.compare(other.release_at);
        Some(ordering)
    }
}
/// A queue of pending wake-ups driven by a `TimerQueueBackend`.
pub struct TimerQueue<Backend: TimerQueueBackend> {
/// Pending waiters, linked in through nodes owned by the `Delay` futures.
queue: LinkedList<WaitingWaker<Backend>>,
/// Set by `initialize`; `delay_until` panics while this is still `false`.
initialized: AtomicBool,
}
impl<Backend> Default for TimerQueue<Backend>
where
    Backend: TimerQueueBackend,
{
    /// Equivalent to [`TimerQueue::new`]: an empty queue that has not been initialized yet.
    fn default() -> Self {
        Self::new()
    }
}
impl<Backend: TimerQueueBackend> TimerQueue<Backend> {
    /// Creates an empty timer queue that has not been initialized yet.
    ///
    /// This is a `const fn`, so it can be used to build a `static`. [`Self::initialize`]
    /// must be called before any delay or timeout is created.
    pub const fn new() -> Self {
        Self {
            queue: LinkedList::new(),
            initialized: AtomicBool::new(false),
        }
    }

    /// Returns the backend's current tick count (forwards to `Backend::now`).
    #[inline(always)]
    pub fn now(&self) -> Backend::Ticks {
        Backend::now()
    }

    /// Marks the queue as initialized and takes ownership of `backend`.
    ///
    /// The backend value is leaked with `mem::forget`, so its destructor never runs.
    pub fn initialize(&self, backend: Backend) {
        self.initialized.store(true, Ordering::SeqCst);
        core::mem::forget(backend);
    }

    /// Services the timer interrupt: wakes every waiter that is due and re-arms the
    /// backend's compare for the next pending one.
    ///
    /// After clearing the compare flag and running the backend's `on_interrupt` hook,
    /// the loop repeatedly offers the queue's candidate entry to `pop_if`:
    ///
    /// * if the entry is due it is popped (and flagged via `was_popped`) and its waker
    ///   is woken;
    /// * if it is not yet due, the timer is enabled and the compare is set to its
    ///   release tick, then the clock is re-read in case that tick passed while the
    ///   compare was being programmed;
    /// * if the queue is empty the timer is disabled.
    ///
    /// # Safety
    ///
    /// NOTE(review): the exact contract is not visible in this file. Judging by the
    /// name, this is presumably meant to be called only from the backend timer's
    /// interrupt handler — confirm against the `TimerQueueBackend` documentation.
    pub unsafe fn on_monotonic_interrupt(&self) {
        Backend::clear_compare_flag();
        Backend::on_interrupt();
        loop {
            // Filled in by the predicate whenever the queue had an entry to offer,
            // regardless of whether that entry ends up being popped.
            let mut release_at = None;
            let head = self.queue.pop_if(|head| {
                release_at = Some(head.release_at);
                let should_pop = Backend::now().is_at_least(head.release_at);
                // Lets `Delay::drop` know that it no longer has to delete the node.
                head.was_popped.store(should_pop, Ordering::Relaxed);
                should_pop
            });
            match (head, release_at) {
                (Some(link), _) => {
                    link.waker.wake();
                }
                (None, Some(instant)) => {
                    Backend::enable_timer();
                    Backend::set_compare(instant);
                    if Backend::now().is_at_least(instant) {
                        // The deadline passed while the compare was being set up;
                        // go around again and dequeue it now.
                        continue;
                    }
                    break;
                }
                (None, None) => {
                    // Nothing is waiting any more.
                    Backend::disable_timer();
                    break;
                }
            }
        }
    }

    /// Wraps `future` so that it resolves to `Err(TimeoutError)` once `instant` is reached.
    ///
    /// # Panics
    ///
    /// Panics if the queue has not been initialized, exactly like [`Self::delay_until`],
    /// which is used to build the underlying delay.
    pub fn timeout_at<F: Future>(
        &self,
        instant: Backend::Ticks,
        future: F,
    ) -> Timeout<'_, Backend, F> {
        Timeout {
            delay: self.delay_until(instant),
            future,
        }
    }

    /// Wraps `future` so that it times out `duration` ticks from now.
    ///
    /// The deadline is computed by the same rule as [`Self::delay`].
    ///
    /// # Panics
    ///
    /// Panics if the queue has not been initialized.
    #[inline]
    pub fn timeout_after<F: Future>(
        &self,
        duration: Backend::Ticks,
        future: F,
    ) -> Timeout<'_, Backend, F> {
        self.timeout_at(Self::deadline_after(duration), future)
    }

    /// Returns a future that completes `duration` ticks from now.
    ///
    /// A non-zero wait is extended by one extra tick (`ONE_TICK`); a wait that leaves
    /// the deadline equal to the current tick is not extended.
    ///
    /// # Panics
    ///
    /// Panics if the queue has not been initialized.
    #[inline]
    pub fn delay(&self, duration: Backend::Ticks) -> Delay<'_, Backend> {
        self.delay_until(Self::deadline_after(duration))
    }

    /// Returns a future that completes once the backend's tick count reaches `instant`.
    ///
    /// # Panics
    ///
    /// Panics if [`Self::initialize`] has not been called on this queue.
    pub fn delay_until(&self, instant: Backend::Ticks) -> Delay<'_, Backend> {
        if !self.initialized.load(Ordering::Relaxed) {
            panic!(
                "The timer queue is not initialized with a monotonic, you need to run `initialize`"
            );
        }
        Delay::<Backend> {
            instant,
            queue: &self.queue,
            link_ptr: None,
            marker: AtomicUsize::new(0),
        }
    }

    /// Converts a relative `duration` into an absolute deadline measured from now.
    ///
    /// When adding `duration` actually moves the deadline away from the current tick,
    /// one more `ONE_TICK` is added on top — presumably so that the wait lasts at least
    /// `duration` despite the clock's one-tick granularity (TODO confirm).
    #[inline]
    fn deadline_after(duration: Backend::Ticks) -> Backend::Ticks {
        let now = Backend::now();
        let deadline = now.wrapping_add(duration);
        if now != deadline {
            deadline.wrapping_add(Backend::Ticks::ONE_TICK)
        } else {
            deadline
        }
    }
}
/// Future that completes once the backend's tick count reaches `instant`.
pub struct Delay<'q, Backend: TimerQueueBackend> {
/// Deadline tick; `poll` returns `Ready` once `Backend::now()` is at least this value.
instant: Backend::Ticks,
/// The timer queue's list that this delay links itself into.
queue: &'q LinkedList<WaitingWaker<Backend>>,
/// This delay's queue node; `None` until the first poll that returns `Pending`.
link_ptr: Option<linked_list::Link<WaitingWaker<Backend>>>,
/// Value returned by `LinkedList::insert` for the node, passed to `delete` on drop.
marker: AtomicUsize,
}
impl<Backend> Future for Delay<'_, Backend>
where
    Backend: TimerQueueBackend,
{
    type Output = ();

    /// Resolves once the backend clock has reached the deadline.
    ///
    /// On the first poll that finds the deadline still in the future, a queue node
    /// carrying a clone of the current waker is created inside `self` and linked into
    /// the timer queue; if that node became the new head, the backend interrupt is
    /// pended. Later polls only re-check the clock: the stored waker is not refreshed.
    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: nothing is moved out of the pinned value below; the queue node is
        // created in place inside `link_ptr`.
        let delay = unsafe { self.get_unchecked_mut() };

        let deadline_reached = Backend::now().is_at_least(delay.instant);
        if deadline_reached {
            return Poll::Ready(());
        }

        // An earlier poll already linked this delay into the queue.
        if delay.link_ptr.is_some() {
            return Poll::Pending;
        }

        let node = delay.link_ptr.insert(Link::new(WaitingWaker {
            waker: cx.waker().clone(),
            release_at: delay.instant,
            was_popped: AtomicBool::new(false),
        }));

        // SAFETY: the node lives inside this pinned `Delay`, so its address does not
        // change, and `Drop for Delay` removes it from the queue unless the interrupt
        // handler has already popped it.
        let (became_head, node_addr) = unsafe { delay.queue.insert(Pin::new_unchecked(node)) };
        delay.marker.store(node_addr, Ordering::Relaxed);
        if became_head {
            Backend::pend_interrupt();
        }

        Poll::Pending
    }
}
impl<Backend> Drop for Delay<'_, Backend>
where
    Backend: TimerQueueBackend,
{
    /// Unlinks this delay's node from the timer queue if it is still there.
    ///
    /// Nothing happens when the delay never created a node, or when the interrupt
    /// handler already popped it (signalled through `was_popped`).
    fn drop(&mut self) {
        if let Some(node) = self.link_ptr.as_ref() {
            let already_popped = node.val.was_popped.load(Ordering::Relaxed);
            if !already_popped {
                self.queue.delete(self.marker.load(Ordering::Relaxed));
            }
        }
    }
}
/// Future that races `future` against a `Delay`, yielding `Err(TimeoutError)` if the delay
/// completes first.
pub struct Timeout<'q, Backend: TimerQueueBackend, F> {
/// Deadline after which the timeout fires.
delay: Delay<'q, Backend>,
/// The wrapped future; polled before the delay on every poll.
future: F,
}
impl<Backend, F> Future for Timeout<'_, Backend, F>
where
    Backend: TimerQueueBackend,
    F: Future,
{
    type Output = Result<F::Output, TimeoutError>;

    /// Polls the wrapped future first and the deadline second.
    ///
    /// Returns `Ok(output)` as soon as the wrapped future is ready — even if the
    /// deadline has also passed — and `Err(TimeoutError)` when only the delay is ready.
    fn poll(self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
        // SAFETY: both fields are only re-pinned in place below and never moved out.
        let this = unsafe { self.get_unchecked_mut() };

        // SAFETY: `this.future` is a field of a pinned value and stays where it is.
        let wrapped = unsafe { Pin::new_unchecked(&mut this.future) };
        match wrapped.poll(cx) {
            Poll::Ready(output) => return Poll::Ready(Ok(output)),
            Poll::Pending => {}
        }

        // SAFETY: `this.delay` is a field of a pinned value and stays where it is.
        let deadline = unsafe { Pin::new_unchecked(&mut this.delay) };
        match deadline.poll(cx) {
            Poll::Ready(()) => Poll::Ready(Err(TimeoutError)),
            Poll::Pending => Poll::Pending,
        }
    }
}