use futures::stream::{FusedStream, Stream};
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
use super::reactor::{AfterTimerId, Reactor, RegularTimerId};
/// Error returned by the interval constructors of this module when the
/// requested `period` is zero.
#[derive(Debug)]
pub struct IntervalError;

impl Error for IntervalError {}

impl fmt::Display for IntervalError {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Interval period can't be null")
    }
}
/// Creates a [`Oneshot`] timer that elapses after `delay`.
///
/// When `delay` does not exceed the reactor's `half_max_throttling()`, the
/// deadline is set to the reactor's `timers_check_instant()` instead of
/// `Instant::now() + delay` (presumably so that the timer fires at the
/// current timers check rather than being scheduled for a later one).
///
/// See [`delay_for_at_least`] for the `OneshotAfter` flavour.
pub fn delay_for(delay: Duration) -> Oneshot {
    let within_throttling = delay <= Reactor::with(|reactor| reactor.half_max_throttling());

    let deadline = if within_throttling {
        Reactor::with(|reactor| reactor.timers_check_instant())
    } else {
        Instant::now() + delay
    };

    Oneshot::new(deadline)
}
/// Creates a [`Oneshot`] timer that elapses at the instant `when`.
///
/// If `when` is not in the future (`when <= Instant::now()`), the deadline is
/// replaced by the reactor's `timers_check_instant()`.
pub fn at(when: Instant) -> Oneshot {
    let deadline = if when <= Instant::now() {
        Reactor::with(|reactor| reactor.timers_check_instant())
    } else {
        when
    };

    Oneshot::new(deadline)
}
/// Creates an [`Interval`] whose first tick is due at `Instant::now()` and
/// which then ticks every `period`.
///
/// # Errors
///
/// Returns [`IntervalError`] if `period` is zero (checked by [`interval_at`]).
pub fn interval(period: Duration) -> Result<Interval, IntervalError> {
    let first_tick = Instant::now();
    interval_at(first_tick, period)
}
/// Creates an [`Interval`] whose first tick is due `delay` from now and which
/// then ticks every `period`.
///
/// # Errors
///
/// Returns [`IntervalError`] if `period` is zero (checked by [`interval_at`]).
///
/// # Panics
///
/// Panics if `Instant::now() + delay` is not representable.
pub fn interval_delayed_by(delay: Duration, period: Duration) -> Result<Interval, IntervalError> {
    let first_tick = Instant::now() + delay;
    interval_at(first_tick, period)
}
/// Creates an [`Interval`] whose first tick is due at `start` and which then
/// ticks every `period`.
///
/// # Errors
///
/// Returns [`IntervalError`] if `period` is zero.
pub fn interval_at(start: Instant, period: Duration) -> Result<Interval, IntervalError> {
    if period.is_zero() {
        Err(IntervalError)
    } else {
        Ok(Interval::new(start, period))
    }
}
/// Creates a [`OneshotAfter`] timer that elapses after `delay`.
///
/// A zero `delay` uses the reactor's `timers_check_instant()` as the
/// deadline; any other value uses `Instant::now() + delay`.
///
/// # Panics
///
/// Panics if `Instant::now() + delay` is not representable.
#[track_caller]
pub fn delay_for_at_least(delay: Duration) -> OneshotAfter {
    let deadline = if delay.is_zero() {
        Reactor::with(|reactor| reactor.timers_check_instant())
    } else {
        Instant::now() + delay
    };

    OneshotAfter::new(deadline)
}
/// Creates a [`OneshotAfter`] timer that elapses at the instant `when`.
///
/// If `when` is not in the future (`when <= Instant::now()`), the deadline is
/// replaced by the reactor's `timers_check_instant()`.
#[track_caller]
pub fn after(when: Instant) -> OneshotAfter {
    let deadline = if when <= Instant::now() {
        Reactor::with(|reactor| reactor.timers_check_instant())
    } else {
        when
    };

    OneshotAfter::new(deadline)
}
/// Creates an [`IntervalAfter`] whose first tick is due at `Instant::now()`
/// and which then ticks every `period`.
///
/// Marked `#[track_caller]` like the other `*_at_least` constructors: this
/// function only forwards to [`interval_after_at_least`], which is itself
/// `#[track_caller]`, so the attribute keeps the caller location pointing at
/// the user's call site instead of at this wrapper.
///
/// # Errors
///
/// Returns [`IntervalError`] if `period` is zero (checked by
/// [`interval_after_at_least`]).
#[track_caller]
pub fn interval_at_least(period: Duration) -> Result<IntervalAfter, IntervalError> {
    interval_after_at_least(Instant::now(), period)
}
/// Creates an [`IntervalAfter`] whose first tick is due `delay` from now and
/// which then ticks every `period`.
///
/// # Errors
///
/// Returns [`IntervalError`] if `period` is zero (checked by
/// [`interval_after_at_least`]).
///
/// # Panics
///
/// Panics if `Instant::now() + delay` is not representable.
#[track_caller]
pub fn interval_delayed_by_at_least(
    delay: Duration,
    period: Duration,
) -> Result<IntervalAfter, IntervalError> {
    let first_tick = Instant::now() + delay;
    interval_after_at_least(first_tick, period)
}
/// Creates an [`IntervalAfter`] whose first tick is due at `start` and which
/// then ticks every `period`.
///
/// # Errors
///
/// Returns [`IntervalError`] if `period` is zero.
#[track_caller]
pub fn interval_after_at_least(
    start: Instant,
    period: Duration,
) -> Result<IntervalAfter, IntervalError> {
    if period.is_zero() {
        Err(IntervalError)
    } else {
        Ok(IntervalAfter::new(start, period))
    }
}
/// One-shot timer future, built by [`delay_for`] and [`at`].
///
/// It resolves to `()` once the reactor's `time_slice_end()` has reached
/// `when`. While pending it keeps a "regular" timer registered in the reactor.
#[derive(Debug)]
pub struct Oneshot {
    /// Id of the timer currently registered in the reactor, together with the
    /// waker it was registered with. `None` while nothing is registered.
    id_and_waker: Option<(RegularTimerId, Waker)>,
    /// Instant at which the timer elapses.
    when: Instant,
}

impl Oneshot {
    /// Builds a timer for `when`; nothing is registered until the first poll.
    fn new(when: Instant) -> Self {
        Self {
            when,
            id_and_waker: None,
        }
    }
}
impl Drop for Oneshot {
    /// Removes the timer from the reactor if it is still registered.
    fn drop(&mut self) {
        let deadline = self.when;

        if let Some((timer_id, _)) = self.id_and_waker.take() {
            Reactor::with_mut(|reactor| {
                reactor.remove_timer(deadline, timer_id);
            });
        }
    }
}
impl Future for Oneshot {
    type Output = ();

    /// Completes once the reactor's `time_slice_end()` has reached `when`.
    ///
    /// Otherwise makes sure a regular timer is registered in the reactor with
    /// a waker that wakes the current task, and returns `Poll::Pending`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Reactor::with_mut(|reactor| {
            let deadline = self.when;

            if reactor.time_slice_end() >= deadline {
                // Elapsed: the reactor no longer needs to track this timer.
                if let Some((timer_id, _)) = self.id_and_waker.take() {
                    reactor.remove_timer(deadline, timer_id);
                }
                return Poll::Ready(());
            }

            // Decide whether a (new) registration is required.
            let must_register = match &self.id_and_waker {
                // Never registered so far.
                None => true,
                // Registered with a waker that would not wake this task:
                // remove the stale entry before registering again.
                Some((stale_id, known_waker)) if !known_waker.will_wake(cx.waker()) => {
                    reactor.remove_timer(deadline, *stale_id);
                    true
                }
                // Already registered with an equivalent waker.
                Some(_) => false,
            };

            if must_register {
                let timer_id = reactor.insert_regular_timer(deadline, cx.waker());
                self.id_and_waker = Some((timer_id, cx.waker().clone()));
            }

            Poll::Pending
        })
    }
}
/// One-shot timer future, built by [`delay_for_at_least`] and [`after`].
///
/// It resolves to `()` once the reactor's `timers_check_instant()` has
/// reached `when`. While pending it keeps an "after" timer registered in the
/// reactor.
#[derive(Debug)]
pub struct OneshotAfter {
    /// Id of the timer currently registered in the reactor, together with the
    /// waker it was registered with. `None` while nothing is registered.
    id_and_waker: Option<(AfterTimerId, Waker)>,
    /// Instant at which the timer elapses.
    when: Instant,
}

impl OneshotAfter {
    /// Builds a timer for `when`; nothing is registered until the first poll.
    fn new(when: Instant) -> Self {
        Self {
            when,
            id_and_waker: None,
        }
    }
}
impl Drop for OneshotAfter {
    /// Removes the timer from the reactor if it is still registered.
    fn drop(&mut self) {
        let deadline = self.when;

        if let Some((timer_id, _)) = self.id_and_waker.take() {
            Reactor::with_mut(|reactor| {
                reactor.remove_timer(deadline, timer_id);
            });
        }
    }
}
impl Future for OneshotAfter {
    type Output = ();

    /// Completes once the reactor's `timers_check_instant()` has reached
    /// `when`.
    ///
    /// Otherwise makes sure an "after" timer is registered in the reactor with
    /// a waker that wakes the current task, and returns `Poll::Pending`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        Reactor::with_mut(|reactor| {
            let deadline = self.when;

            if reactor.timers_check_instant() >= deadline {
                // Elapsed: the reactor no longer needs to track this timer.
                if let Some((timer_id, _)) = self.id_and_waker.take() {
                    reactor.remove_timer(deadline, timer_id);
                }
                return Poll::Ready(());
            }

            // Decide whether a (new) registration is required.
            let must_register = match &self.id_and_waker {
                // Never registered so far.
                None => true,
                // Registered with a waker that would not wake this task:
                // remove the stale entry before registering again.
                Some((stale_id, known_waker)) if !known_waker.will_wake(cx.waker()) => {
                    reactor.remove_timer(deadline, *stale_id);
                    true
                }
                // Already registered with an equivalent waker.
                Some(_) => false,
            };

            if must_register {
                let timer_id = reactor.insert_after_timer(deadline, cx.waker());
                self.id_and_waker = Some((timer_id, cx.waker().clone()));
            }

            Poll::Pending
        })
    }
}
/// Periodic timer stream, built by [`interval`], [`interval_delayed_by`] and
/// [`interval_at`].
///
/// A tick is produced whenever the reactor's `time_slice_end()` has reached
/// `when`; `when` is then moved forward by whole multiples of `period`.
#[derive(Debug)]
pub struct Interval {
    /// Id of the timer currently registered in the reactor, together with the
    /// waker it was registered with. `None` while nothing is registered.
    id_and_waker: Option<(RegularTimerId, Waker)>,
    /// Instant of the next tick.
    when: Instant,
    /// Distance between two consecutive ticks.
    period: Duration,
}

impl Interval {
    /// Builds an interval whose first tick is due at `start`.
    ///
    /// Callers in this module reject a zero `period` before calling this.
    fn new(start: Instant, period: Duration) -> Self {
        Self {
            period,
            when: start,
            id_and_waker: None,
        }
    }
}
impl Drop for Interval {
    /// Removes the pending tick from the reactor if one is registered.
    fn drop(&mut self) {
        let next_tick = self.when;

        if let Some((timer_id, _)) = self.id_and_waker.take() {
            Reactor::with_mut(|reactor| {
                reactor.remove_timer(next_tick, timer_id);
            });
        }
    }
}
impl Stream for Interval {
    type Item = ();

    /// Yields `Some(())` when the reactor's `time_slice_end()` has reached the
    /// next tick, and schedules the following tick right away.
    ///
    /// Otherwise makes sure a regular timer is registered in the reactor with
    /// a waker that wakes the current task, and returns `Poll::Pending`.
    /// This stream never yields `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Reactor::with_mut(|reactor| {
            let slice_end = reactor.time_slice_end();

            if slice_end >= self.when {
                // The registration for the tick being delivered is obsolete.
                if let Some((timer_id, _)) = self.id_and_waker.take() {
                    reactor.remove_timer(self.when, timer_id);
                }

                // Skip every tick that is already due so that the next one
                // lies strictly after the end of the current time slice.
                let step = self.period;
                let mut next_tick = self.when;
                while slice_end >= next_tick {
                    next_tick += step;
                }
                self.when = next_tick;

                // Schedule the following tick before reporting this one.
                let timer_id = reactor.insert_regular_timer(next_tick, cx.waker());
                self.id_and_waker = Some((timer_id, cx.waker().clone()));

                return Poll::Ready(Some(()));
            }

            // Not due yet: decide whether a (new) registration is required.
            let must_register = match &self.id_and_waker {
                // Never registered so far.
                None => true,
                // Registered with a waker that would not wake this task:
                // remove the stale entry before registering again.
                Some((stale_id, known_waker)) if !known_waker.will_wake(cx.waker()) => {
                    reactor.remove_timer(self.when, *stale_id);
                    true
                }
                // Already registered with an equivalent waker.
                Some(_) => false,
            };

            if must_register {
                let timer_id = reactor.insert_regular_timer(self.when, cx.waker());
                self.id_and_waker = Some((timer_id, cx.waker().clone()));
            }

            Poll::Pending
        })
    }
}
impl FusedStream for Interval {
    /// Always `false`: `poll_next` never yields `None`, so the stream never
    /// reports termination.
    fn is_terminated(&self) -> bool {
        false
    }
}
/// Periodic timer stream, built by [`interval_at_least`],
/// [`interval_delayed_by_at_least`] and [`interval_after_at_least`].
///
/// A tick is produced whenever the reactor's `timers_check_instant()` has
/// reached `when`; `when` is then moved forward by whole multiples of
/// `period`.
#[derive(Debug)]
pub struct IntervalAfter {
    /// Id of the timer currently registered in the reactor, together with the
    /// waker it was registered with. `None` while nothing is registered.
    id_and_waker: Option<(AfterTimerId, Waker)>,
    /// Instant of the next tick.
    when: Instant,
    /// Distance between two consecutive ticks.
    period: Duration,
}

impl IntervalAfter {
    /// Builds an interval whose first tick is due at `start`.
    ///
    /// Callers in this module reject a zero `period` before calling this.
    fn new(start: Instant, period: Duration) -> Self {
        Self {
            period,
            when: start,
            id_and_waker: None,
        }
    }
}
impl Drop for IntervalAfter {
    /// Removes the pending tick from the reactor if one is registered.
    fn drop(&mut self) {
        let next_tick = self.when;

        if let Some((timer_id, _)) = self.id_and_waker.take() {
            Reactor::with_mut(|reactor| {
                reactor.remove_timer(next_tick, timer_id);
            });
        }
    }
}
impl Stream for IntervalAfter {
    type Item = ();

    /// Yields `Some(())` when the reactor's `timers_check_instant()` has
    /// reached the next tick, and schedules the following tick right away.
    ///
    /// Otherwise makes sure an "after" timer is registered in the reactor with
    /// a waker that wakes the current task, and returns `Poll::Pending`.
    /// This stream never yields `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Reactor::with_mut(|reactor| {
            let check_instant = reactor.timers_check_instant();

            if check_instant >= self.when {
                // The registration for the tick being delivered is obsolete.
                if let Some((timer_id, _)) = self.id_and_waker.take() {
                    reactor.remove_timer(self.when, timer_id);
                }

                // Skip every tick that is already due so that the next one
                // lies strictly after the current timers check instant.
                let step = self.period;
                let mut next_tick = self.when;
                while check_instant >= next_tick {
                    next_tick += step;
                }
                self.when = next_tick;

                // Schedule the following tick before reporting this one.
                let timer_id = reactor.insert_after_timer(next_tick, cx.waker());
                self.id_and_waker = Some((timer_id, cx.waker().clone()));

                return Poll::Ready(Some(()));
            }

            // Not due yet: decide whether a (new) registration is required.
            let must_register = match &self.id_and_waker {
                // Never registered so far.
                None => true,
                // Registered with a waker that would not wake this task:
                // remove the stale entry before registering again.
                Some((stale_id, known_waker)) if !known_waker.will_wake(cx.waker()) => {
                    reactor.remove_timer(self.when, *stale_id);
                    true
                }
                // Already registered with an equivalent waker.
                Some(_) => false,
            };

            if must_register {
                let timer_id = reactor.insert_after_timer(self.when, cx.waker());
                self.id_and_waker = Some((timer_id, cx.waker().clone()));
            }

            Poll::Pending
        })
    }
}
impl FusedStream for IntervalAfter {
    /// Always `false`: `poll_next` never yields `None`, so the stream never
    /// reports termination.
    fn is_terminated(&self) -> bool {
        false
    }
}
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use crate::runtime::executor::Scheduler;

    /// Throttling handed to every scheduler started by these tests.
    const MAX_THROTTLING: Duration = Duration::from_millis(10);
    /// Delay used by the one-shot tests and as initial offset of the
    /// "at least" interval test.
    const DELAY: Duration = Duration::from_millis(12);
    /// Period used by the interval tests.
    const PERIOD: Duration = Duration::from_millis(15);

    /// A regular one-shot timer may complete up to `MAX_THROTTLING / 2`
    /// before `DELAY` has elapsed, but not earlier.
    #[test]
    fn delay_for_regular() {
        gst::init().unwrap();

        let scheduler = Scheduler::start("delay_for_regular", MAX_THROTTLING);
        let task = scheduler.spawn(async {
            let started_at = Instant::now();
            super::delay_for(DELAY).await;

            let waited = started_at.elapsed();
            assert!(waited + MAX_THROTTLING / 2 >= DELAY);
        });

        futures::executor::block_on(task).unwrap();
    }

    /// An "at least" one-shot timer must not complete before `DELAY` has
    /// elapsed.
    #[test]
    fn delay_for_at_least() {
        gst::init().unwrap();

        let scheduler = Scheduler::start("delay_for_at_least", MAX_THROTTLING);
        let task = scheduler.spawn(async {
            let started_at = Instant::now();
            super::delay_for_at_least(DELAY).await;

            let waited = started_at.elapsed();
            assert!(waited >= DELAY);
        });

        futures::executor::block_on(task).unwrap();
    }

    /// Each tick of a regular interval may come up to `MAX_THROTTLING / 2`
    /// before its nominal instant, but not earlier.
    #[test]
    fn interval_regular() {
        use futures::prelude::*;

        gst::init().unwrap();

        let scheduler = Scheduler::start("interval_regular", MAX_THROTTLING);
        let task = scheduler.spawn(async move {
            // Nominal time of the tick being checked, relative to `started_at`.
            let mut nominal = Duration::ZERO;
            let started_at = Instant::now();
            let mut ticks = super::interval(PERIOD).unwrap();

            // First tick: due immediately.
            ticks.next().await.unwrap();
            assert!(started_at.elapsed() + MAX_THROTTLING / 2 >= nominal);

            for _ in 0..10 {
                ticks.next().await.unwrap();
                nominal += PERIOD;
                assert!(started_at.elapsed() + MAX_THROTTLING / 2 >= nominal);
            }
        });

        futures::executor::block_on(task).unwrap();
    }

    /// No tick of an "at least" interval may come before its nominal instant.
    #[test]
    fn interval_after_at_least() {
        use futures::prelude::*;

        gst::init().unwrap();

        let scheduler = Scheduler::start("interval_after", MAX_THROTTLING);
        let task = scheduler.spawn(async move {
            // Nominal time of the tick being checked, relative to `started_at`.
            let mut nominal = DELAY;
            let started_at = Instant::now();
            let mut ticks = super::interval_after_at_least(started_at + DELAY, PERIOD).unwrap();

            // First tick: due once the initial delay has elapsed.
            ticks.next().await.unwrap();
            assert!(started_at.elapsed() >= nominal);

            for _ in 1..10 {
                ticks.next().await.unwrap();
                nominal += PERIOD;
                assert!(started_at.elapsed() >= nominal);
            }
        });

        futures::executor::block_on(task).unwrap();
    }
}