use crate::traits::SleepProvider;
use futures::{Future, FutureExt};
use pin_project::pin_project;
use std::{
pin::Pin,
task::{Context, Poll},
time::{Duration, SystemTime},
};
/// Error value reported when a timeout elapses before the guarded work finishes.
///
/// [`Timeout`] resolves to `Err(TimeoutError)` when its sleep future completes
/// before the wrapped future does.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[allow(clippy::exhaustive_structs)]
pub struct TimeoutError;

impl std::fmt::Display for TimeoutError {
    /// Writes the fixed, human-readable message for this error.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_str("Timeout expired")
    }
}

impl std::error::Error for TimeoutError {}

impl From<TimeoutError> for std::io::Error {
    /// Wraps the timeout in an I/O error whose kind is `ErrorKind::TimedOut`.
    fn from(timeout: TimeoutError) -> Self {
        let kind = std::io::ErrorKind::TimedOut;
        Self::new(kind, timeout)
    }
}
/// Convenience methods layered on top of any [`SleepProvider`].
///
/// Both methods only build a future; nothing happens until that future is polled.
pub trait SleepProviderExt: SleepProvider {
    /// Wrap `future` so that it gives up once a sleep of `duration` has finished.
    ///
    /// The sleep future is created right away by calling `self.sleep(duration)`.
    /// The returned [`Timeout`] resolves to `Ok(output)` if `future` completes,
    /// or to `Err(TimeoutError)` if the sleep future completes first.
    #[must_use = "timeout() returns a future, which does nothing unless used"]
    fn timeout<F: Future>(&self, duration: Duration, future: F) -> Timeout<F, Self::SleepFuture> {
        Timeout {
            // Evaluated first, exactly as when it was bound to a local beforehand.
            sleep_future: self.sleep(duration),
            future,
        }
    }

    /// Build a future that becomes ready once this provider's wall clock
    /// reaches `when`.
    ///
    /// No sleep future is created here; the returned [`SleepUntilWallclock`]
    /// creates them as it is polled.
    #[must_use = "sleep_until_wallclock() returns a future, which does nothing unless used"]
    fn sleep_until_wallclock(&self, when: SystemTime) -> SleepUntilWallclock<'_, Self> {
        SleepUntilWallclock {
            sleep_future: None,
            target: when,
            provider: self,
        }
    }
}
/// Blanket implementation: every type that implements `SleepProvider`
/// automatically gets the default methods of `SleepProviderExt`.
impl<T: SleepProvider> SleepProviderExt for T {}
/// A future that races a wrapped future against a sleep future.
///
/// Built by `SleepProviderExt::timeout`. Its `Future` implementation yields
/// `Ok(output)` when the wrapped future completes and `Err(TimeoutError)` when
/// the sleep future completes first.
#[pin_project]
pub struct Timeout<T, S> {
/// The wrapped future; it is polled before the sleep future on every poll.
#[pin]
future: T,
/// The sleep future whose completion signals that the timeout has expired.
#[pin]
sleep_future: S,
}
impl<T, S> Future for Timeout<T, S>
where
    T: Future,
    S: Future<Output = ()>,
{
    /// `Ok` with the wrapped future's output, or `Err(TimeoutError)` when the
    /// sleep future finished first.
    type Output = Result<T::Output, TimeoutError>;

    /// Poll the wrapped future, falling back to the sleep future.
    ///
    /// The wrapped future is always polled first, so if both would be ready
    /// during the same poll the wrapped future's output wins. The sleep future
    /// is only polled when the wrapped future is still pending.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let projected = self.project();
        match projected.future.poll(cx) {
            Poll::Ready(output) => Poll::Ready(Ok(output)),
            // Still pending: ready only if the timer has fired, and then
            // always with an error.
            Poll::Pending => projected
                .sleep_future
                .poll(cx)
                .map(|()| Err(TimeoutError)),
        }
    }
}
/// A future that becomes ready once a provider's wall clock reaches a target time.
///
/// Built by `SleepProviderExt::sleep_until_wallclock`. When polled, it compares
/// the provider's wall clock against `target` and, if the target has not been
/// reached, sleeps for a delay computed by `calc_next_delay`.
pub struct SleepUntilWallclock<'a, SP: SleepProvider> {
/// Provider used both to read the wall clock and to create sleep futures.
provider: &'a SP,
/// Wall-clock time at or after which this future reports completion.
target: SystemTime,
/// The most recently created sleep future that returned `Pending`, if any.
/// `poll` stores it here but never polls it again: each call to `poll`
/// discards it and creates a fresh one.
sleep_future: Option<Pin<Box<SP::SleepFuture>>>,
}
impl<'a, SP> Future for SleepUntilWallclock<'a, SP>
where
    SP: SleepProvider,
{
    type Output = ();

    /// Drive the wait towards `target`.
    ///
    /// Each pass through the loop re-reads the provider's wall clock, so the
    /// next delay is always computed from the current wall-clock time rather
    /// than from a value captured earlier. The wait is split into increments
    /// chosen by `calc_next_delay`; the future completes when the wall clock
    /// has reached the target, or when the final increment's sleep finishes.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let deadline = self.target;
        loop {
            let wall_now = self.provider.wallclock();
            if wall_now >= deadline {
                break Poll::Ready(());
            }

            let (is_final, nap) = calc_next_delay(wall_now, deadline);

            // Discard whatever sleep was stored by an earlier poll before
            // creating its replacement.
            drop(self.sleep_future.take());

            let mut fresh_sleep = Box::pin(self.provider.sleep(nap));
            if fresh_sleep.poll_unpin(cx).is_pending() {
                // Hold on to the pending sleep so it is not dropped while we
                // wait for its wakeup.
                self.sleep_future = Some(fresh_sleep);
                break Poll::Pending;
            }

            // The sleep finished immediately. If it was the last increment we
            // are done; otherwise go round again and re-check the clock.
            if is_final {
                break Poll::Ready(());
            }
        }
    }
}
/// Longest single delay that `calc_next_delay` will ever hand out (ten minutes).
const MAX_SLEEP: Duration = Duration::from_secs(600);

/// Decide how long to sleep next when waiting from `now` until `when`.
///
/// Returns `(is_last, delay)`:
///
/// * `delay` is the time remaining until `when`, capped at `MAX_SLEEP`. It is
///   zero if `when` is not later than `now`.
/// * `is_last` is `true` when `delay` covers the whole remaining time, i.e. no
///   further sleep is needed after this one.
pub(crate) fn calc_next_delay(now: SystemTime, when: SystemTime) -> (bool, Duration) {
    // `duration_since` fails when `when` is earlier than `now`; treat that as
    // "nothing left to wait for".
    let remaining = match when.duration_since(now) {
        Ok(gap) => gap,
        Err(_) => Duration::from_secs(0),
    };
    let is_last = remaining <= MAX_SLEEP;
    (is_last, remaining.min(MAX_SLEEP))
}
#[cfg(test)]
mod test {
    // Lint allowances for test code; kept verbatim and in their original order.
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_duration_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    #![allow(clippy::erasing_op)]
    #[cfg(not(miri))]
    use super::*;

    /// Checks the delay chosen by `calc_next_delay` for a target 30 minutes
    /// after the start: far from the target, past it, exactly at it, and just
    /// inside and outside the ten-minute cap.
    #[cfg(not(miri))]
    #[test]
    fn sleep_delay() {
        let second = Duration::from_secs(1);
        let minute = Duration::from_secs(60);
        let zero = Duration::from_secs(0);

        let start = SystemTime::now();
        let target = start + 30 * minute;

        // Only the delay half of the result matters for these checks.
        let delay_from = |now: SystemTime| -> Duration { calc_next_delay(now, target).1 };

        assert_eq!(delay_from(start), 10 * minute);
        assert_eq!(delay_from(target + minute), zero);
        assert_eq!(delay_from(target), zero);
        assert_eq!(delay_from(target - second), second);
        assert_eq!(delay_from(target - 9 * minute), 9 * minute);
        assert_eq!(delay_from(target - 11 * minute), 10 * minute);
    }
}