use crate::reactor::Reactor;
use crate::sync::ThreadSafety;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
use futures_lite::stream::Stream;
pub struct Timer<TS: ThreadSafety = crate::DefaultThreadSafety> {
reactor: TS::Rc<Reactor<TS>>,
id_and_waker: Option<(usize, Waker)>,
deadline: Option<Instant>,
period: Duration,
}
impl<TS: ThreadSafety> fmt::Debug for Timer<TS> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Timer")
.field("deadline", &self.deadline)
.field("period", &self.period)
.field("registered", &self.id_and_waker.is_some())
.finish()
}
}
impl<TS: ThreadSafety> Unpin for Timer<TS> {}
impl<TS: ThreadSafety> Timer<TS> {
pub fn never() -> Self {
Self {
reactor: Reactor::<TS>::get(),
id_and_waker: None,
deadline: None,
period: Duration::MAX,
}
}
pub fn will_fire(&self) -> bool {
self.deadline.is_some()
}
pub fn after(duration: Duration) -> Self {
Instant::now()
.checked_add(duration)
.map_or_else(Self::never, Self::at)
}
pub fn at(deadline: Instant) -> Self {
Self::interval_at(deadline, Duration::MAX)
}
pub fn interval(period: Duration) -> Self {
Instant::now()
.checked_add(period)
.map_or_else(Self::never, |deadline| Self::interval_at(deadline, period))
}
pub fn interval_at(start: Instant, period: Duration) -> Self {
Self {
reactor: Reactor::<TS>::get(),
id_and_waker: None,
deadline: Some(start),
period,
}
}
pub fn set_never(&mut self) {
self.clear();
self.deadline = None;
}
pub fn set_after(&mut self, duration: Duration) {
match Instant::now().checked_add(duration) {
Some(deadline) => self.set_at(deadline),
None => self.set_never(),
}
}
pub fn set_at(&mut self, deadline: Instant) {
self.set_interval_at(deadline, Duration::MAX)
}
pub fn set_interval(&mut self, period: Duration) {
match Instant::now().checked_add(period) {
Some(deadline) => self.set_interval_at(deadline, period),
None => self.set_never(),
}
}
pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
self.clear();
self.deadline = Some(start);
self.period = period;
if let Some((id, waker)) = self.id_and_waker.as_mut() {
*id = self.reactor.insert_timer(start, waker);
}
}
fn clear(&mut self) {
if let (Some(deadline), Some((id, _))) = (self.deadline.take(), self.id_and_waker.take()) {
self.reactor.remove_timer(deadline, id);
}
}
}
impl<TS: ThreadSafety> Drop for Timer<TS> {
fn drop(&mut self) {
self.clear();
}
}
impl<TS: ThreadSafety> Future for Timer<TS> {
type Output = Instant;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.poll_next(cx).map(Option::unwrap)
}
}
impl<TS: ThreadSafety> Stream for Timer<TS> {
type Item = Instant;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(ref mut deadline) = this.deadline {
if *deadline < Instant::now() {
if let Some((id, _)) = this.id_and_waker.take() {
this.reactor.remove_timer(*deadline, id);
}
let result_time = *deadline;
if let Some(next) = deadline.checked_add(this.period) {
*deadline = next;
let id = this.reactor.insert_timer(next, cx.waker());
this.id_and_waker = Some((id, cx.waker().clone()));
} else {
this.deadline = None;
}
return Poll::Ready(Some(result_time));
} else {
match &this.id_and_waker {
None => {
let id = this.reactor.insert_timer(*deadline, cx.waker());
this.id_and_waker = Some((id, cx.waker().clone()));
}
Some((id, w)) if !w.will_wake(cx.waker()) => {
this.reactor.remove_timer(*deadline, *id);
let id = this.reactor.insert_timer(*deadline, cx.waker());
this.id_and_waker = Some((id, cx.waker().clone()));
}
_ => {}
}
}
}
Poll::Pending
}
}