use std::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, SystemTime},
};
use switchy_time::{instant_now, now};
use futures::future::FusedFuture;
use pin_project_lite::pin_project;
pin_project! {
/// A future that becomes ready once at least `duration` has elapsed since it was constructed.
///
/// Elapsed time is measured with `switchy_time::now()`. The first poll always returns
/// `Poll::Pending`; the elapsed-time check only runs on later polls.
#[derive(Debug, Copy, Clone)]
pub struct Sleep {
// Wall-clock time captured by `Sleep::new`; elapsed time is measured from here.
#[pin]
now: SystemTime,
// Minimum amount of time that must elapse before the future resolves.
#[pin]
duration: Duration,
// Set to `true` by the first call to `poll`.
#[pin]
polled: bool,
// Set to `true` when `poll` returns `Poll::Ready`; reported by `is_terminated`.
#[pin]
completed: bool,
}
}
impl Sleep {
    /// Creates a sleep future that measures `duration` from the moment of this call.
    ///
    /// The start time is taken from `switchy_time::now()`. The returned future has not
    /// been polled and is not completed.
    #[must_use]
    pub fn new(duration: Duration) -> Self {
        let now = switchy_time::now();

        Self {
            now,
            duration,
            polled: false,
            completed: false,
        }
    }
}
impl Future for Sleep {
    type Output = ();

    /// Polls the sleep.
    ///
    /// The first poll only records that the future has been polled and returns
    /// `Poll::Pending`. Every later poll compares the time elapsed since construction
    /// with the requested duration and returns `Poll::Ready(())` once it is reached.
    ///
    /// The waker is woken before every `Poll::Pending` return, so the executor polls
    /// this future again without any external timer.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        log::trace!(
            "Polling Sleep: now={:?} duration={:?} polled={} completed={}",
            this.now,
            this.duration,
            this.polled,
            this.completed,
        );

        if *this.polled {
            // `SystemTime` is not monotonic: if the clock was moved backwards since the
            // start time was captured, `duration_since` returns `Err`. Treat that as
            // "no time elapsed" and keep waiting instead of panicking inside `poll`.
            let elapsed = switchy_time::now()
                .duration_since(*this.now)
                .unwrap_or_default();

            log::trace!(
                "Sleep polled: {}ms/{}ms",
                elapsed.as_millis(),
                this.duration.as_millis(),
            );

            if elapsed >= *this.duration {
                *this.completed = true;
                return Poll::Ready(());
            }
        } else {
            *this.polled = true;
        }

        // Request another poll straight away; nothing else will wake this task.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
impl FusedFuture for Sleep {
    /// Returns `true` once `poll` has returned `Poll::Ready`.
    fn is_terminated(&self) -> bool {
        let Self { completed, .. } = self;
        *completed
    }
}
pin_project! {
/// A future that resolves with the current `std::time::Instant` once a target instant
/// is due.
///
/// The first poll always returns `Poll::Pending`; the target is only compared against
/// the current time on later polls.
#[allow(clippy::struct_field_names)]
#[derive(Debug, Copy, Clone)]
pub struct Instant {
// Target instant that the current time is compared against.
#[pin]
instant: std::time::Instant,
// Set to `true` by the first call to `poll`.
#[pin]
polled: bool,
// Set to `true` when `poll` returns `Poll::Ready`; reported by `is_terminated`.
#[pin]
completed: bool,
}
}
impl Instant {
    /// Creates a future targeting the given `std::time::Instant`.
    ///
    /// The returned future has not been polled and is not completed.
    #[must_use]
    pub const fn new(instant: std::time::Instant) -> Self {
        Self {
            polled: false,
            completed: false,
            instant,
        }
    }
}
/// Converts a wall-clock `SystemTime` into a `std::time::Instant`.
///
/// The current `now()` and `instant_now()` are sampled together, and the offset between
/// `target` and the sampled wall-clock time is applied to the sampled instant.
///
/// # Errors
///
/// * Propagates a `SystemTimeError` from `SystemTime::duration_since`; each call is made
///   only in the branch where the receiver is not earlier than its argument.
///
/// # Panics
///
/// * If `target` is in the past and the offset cannot be subtracted from the sampled
///   instant
/// * If `target` is in the future and adding the offset overflows the instant
fn system_time_to_instant(
    target: SystemTime,
) -> Result<std::time::Instant, std::time::SystemTimeError> {
    let wall_now = now();
    let mono_now = instant_now();

    let converted = if target < wall_now {
        // Target lies in the past: step the monotonic reading back by the gap.
        let behind: Duration = wall_now.duration_since(target)?;
        mono_now.checked_sub(behind).unwrap()
    } else {
        // Target is now or in the future: step the monotonic reading forward.
        let ahead: Duration = target.duration_since(wall_now)?;
        mono_now + ahead
    };

    Ok(converted)
}
impl Future for Instant {
    type Output = std::time::Instant;

    /// Polls the deadline.
    ///
    /// The first poll only records that the future has been polled and returns
    /// `Poll::Pending`. Every later poll converts the current `switchy_time::now()` to
    /// an instant and returns it in `Poll::Ready` once it has reached the target.
    ///
    /// The waker is woken before every `Poll::Pending` return, so the executor polls
    /// this future again without any external timer.
    ///
    /// # Panics
    ///
    /// * If the current time cannot be converted to a `std::time::Instant`
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        log::trace!(
            "Polling Instant: instant={:?} polled={} completed={}",
            this.instant,
            this.polled,
            this.completed,
        );

        if *this.polled {
            let now = system_time_to_instant(switchy_time::now()).unwrap();
            log::trace!("Instant polled: now={:?} instant={:?}", now, this.instant);

            // Inclusive comparison, matching `Sleep` and `Interval::poll_tick`: a
            // deadline equal to the current time is due. With a strict `>` the future
            // would never resolve while the clock reads exactly the target.
            if now >= *this.instant {
                *this.completed = true;
                return Poll::Ready(now);
            }
        } else {
            *this.polled = true;
        }

        // Request another poll straight away; nothing else will wake this task.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
impl FusedFuture for Instant {
    /// Returns `true` once `poll` has returned `Poll::Ready`.
    fn is_terminated(&self) -> bool {
        let Self { completed, .. } = self;
        *completed
    }
}
pin_project! {
/// A repeating timer with a fixed period.
///
/// `poll_tick` reports readiness each time `interval` has elapsed since the start of the
/// current period, and `tick` hands out an `Instant` future for one period from the
/// current time.
#[allow(clippy::struct_field_names)]
#[derive(Debug, Copy, Clone)]
pub struct Interval {
// Start of the current period; set by `new`, by `reset`, and by `poll_tick` after a
// completed tick.
#[pin]
now: SystemTime,
// Length of one period.
#[pin]
interval: Duration,
// Whether `poll_tick` has been called during the current period.
#[pin]
polled: bool,
// Whether the current period has already produced `Poll::Ready`.
#[pin]
completed: bool,
}
}
impl Interval {
    /// Creates an interval with the given period, starting at the current
    /// `switchy_time::now()`.
    #[must_use]
    pub fn new(interval: Duration) -> Self {
        Self {
            now: switchy_time::now(),
            interval,
            polled: false,
            completed: false,
        }
    }

    /// Returns an [`Instant`] future whose deadline is one period after the current time.
    ///
    /// The deadline is computed from `switchy_time::now()` at the time of the call; the
    /// interval's own state is not modified.
    ///
    /// # Panics
    ///
    /// * If adding the period to the current time overflows
    /// * If the deadline cannot be converted to a `std::time::Instant`
    pub fn tick(&mut self) -> Instant {
        Instant::new(system_time_to_instant(switchy_time::now() + self.interval).unwrap())
    }

    /// Restarts the current period from the current `switchy_time::now()` and clears the
    /// polled and completed flags.
    pub fn reset(&mut self) {
        self.now = switchy_time::now();
        self.polled = false;
        self.completed = false;
    }

    /// Polls for the next tick.
    ///
    /// If the previous period already completed, a new period is started first. The first
    /// poll of a period always returns `Poll::Pending`; later polls return `Poll::Ready`
    /// with the current instant once the period has elapsed.
    ///
    /// The waker is woken before every `Poll::Pending` return.
    ///
    /// # Panics
    ///
    /// * If the current time cannot be converted to a `std::time::Instant`
    pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<std::time::Instant> {
        if self.completed {
            // The previous tick was delivered; begin timing the next period.
            self.reset();
        }

        if self.polled {
            // `SystemTime` is not monotonic: if the clock was moved backwards since the
            // period started, `duration_since` returns `Err`. Treat that as "no time
            // elapsed" and keep waiting instead of panicking.
            let elapsed = switchy_time::now()
                .duration_since(self.now)
                .unwrap_or_default();

            if elapsed >= self.interval {
                self.completed = true;
                let instant = system_time_to_instant(switchy_time::now()).unwrap();
                return Poll::Ready(instant);
            }
        } else {
            self.polled = true;
        }

        // Request another poll straight away; nothing else will wake this task.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
/// Error produced when a deadline passes before the guarded future completes.
///
/// Its `Display` output is the fixed message `deadline has elapsed`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Elapsed;

impl fmt::Display for Elapsed {
    /// Writes the fixed message, ignoring any width or alignment flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("deadline has elapsed")
    }
}

impl std::error::Error for Elapsed {}
pin_project! {
/// A future that wraps another future together with a `Sleep` deadline.
///
/// Polling yields `Ok(output)` if the wrapped future completes, or `Err(Elapsed)` if
/// the sleep completes while the wrapped future is still pending.
#[derive(Debug)]
pub struct Timeout<F> {
// The wrapped future; it is polled before the deadline on every poll.
#[pin]
future: F,
// The deadline timer, created by `Timeout::new` from the requested duration.
#[pin]
sleep: Sleep,
}
}
impl<F> Timeout<F> {
    /// Wraps `future` with a deadline of `duration`, measured from this call.
    #[must_use]
    pub fn new(duration: Duration, future: F) -> Self {
        let sleep = Sleep::new(duration);

        Self { future, sleep }
    }

    /// Consumes the timeout and returns the wrapped future, discarding the deadline.
    #[must_use]
    pub fn into_inner(self) -> F {
        let Self { future, .. } = self;
        future
    }
}
impl<F> Future for Timeout<F>
where
    F: Future,
{
    type Output = Result<F::Output, Elapsed>;

    /// Polls the wrapped future first; the deadline is polled only if it is still pending.
    ///
    /// Returns `Ok(output)` when the wrapped future completes, `Err(Elapsed)` when the
    /// deadline completes first, and `Poll::Pending` otherwise.
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let this = self.project();

        match this.future.poll(cx) {
            Poll::Ready(output) => Poll::Ready(Ok(output)),
            // The wrapped future is not done: a finished sleep means the deadline was hit.
            Poll::Pending => this.sleep.poll(cx).map(|()| Err(Elapsed)),
        }
    }
}
impl<F> FusedFuture for Timeout<F>
where
    F: FusedFuture,
{
    /// Returns `true` when either the wrapped future or the deadline has terminated.
    fn is_terminated(&self) -> bool {
        if self.future.is_terminated() {
            return true;
        }
        self.sleep.is_terminated()
    }
}
// Unit tests for the timing futures defined in this module. Tests that need a deadline
// to be due backdate the stored start time instead of actually waiting.
#[cfg(test)]
mod tests {
use super::*;
use std::future::ready;
// A freshly created `Sleep` reports that it has not terminated.
#[test_log::test]
fn sleep_future_implements_fused_future() {
let sleep = Sleep::new(Duration::from_millis(10));
assert!(!sleep.is_terminated());
}
// `reset` leaves both the `polled` and `completed` flags cleared.
#[test_log::test]
fn interval_reset_restarts_timing() {
let mut interval = Interval::new(Duration::from_millis(100));
let _tick1 = interval.tick();
interval.reset();
assert!(!interval.polled);
assert!(!interval.completed);
}
// `poll_tick` returns `Ready` once the stored start time is older than the interval.
#[test_log::test]
fn interval_poll_tick_returns_ready_after_duration() {
use std::task::{Context, Poll};
let mut interval = Interval::new(Duration::from_millis(1));
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
// The first poll of a period is always pending.
assert!(matches!(interval.poll_tick(&mut cx), Poll::Pending));
// Backdate the period start by 2ms so the 1ms interval counts as elapsed.
interval.polled = true;
interval.now = switchy_time::now() - Duration::from_millis(2);
let result = interval.poll_tick(&mut cx);
assert!(matches!(result, Poll::Ready(_)));
}
// A freshly created `Instant` future reports that it has not terminated.
#[test_log::test]
fn instant_future_implements_fused_future() {
let instant = Instant::new(instant_now() + Duration::from_millis(10));
assert!(!instant.is_terminated());
}
// `into_inner` hands back the wrapped future, which can still be driven to completion.
#[test_log::test]
fn timeout_into_inner_returns_original_future() {
let original_future = ready(42);
let timeout = Timeout::new(Duration::from_millis(100), original_future);
let inner = timeout.into_inner();
let result = futures::executor::block_on(inner);
assert_eq!(result, 42);
}
// `Elapsed` renders its fixed message through `Display`.
#[test_log::test]
fn elapsed_error_displays_correctly() {
let err = Elapsed;
assert_eq!(err.to_string(), "deadline has elapsed");
}
// A cloned `Elapsed` compares equal to the original.
#[test_log::test]
fn elapsed_error_is_clonable() {
let err1 = Elapsed;
let err2 = err1.clone();
assert_eq!(err1, err2);
}
// `Sleep::new` records a start time within 10ms of the current time.
#[test_log::test]
fn sleep_creates_with_current_time() {
let sleep = Sleep::new(Duration::from_millis(100));
let now = switchy_time::now();
// Take the absolute difference, whichever of the two readings is later.
let diff = sleep
.now
.duration_since(now)
.unwrap_or_else(|_| now.duration_since(sleep.now).unwrap());
assert!(diff < Duration::from_millis(10));
}
// `Interval::new` records a start time within 10ms of the current time.
#[test_log::test]
fn interval_creates_with_current_time() {
let interval = Interval::new(Duration::from_millis(100));
let now = switchy_time::now();
// Take the absolute difference, whichever of the two readings is later.
let diff = interval
.now
.duration_since(now)
.unwrap_or_else(|_| now.duration_since(interval.now).unwrap());
assert!(diff < Duration::from_millis(10));
}
// Converting a time 10 seconds in the future succeeds.
#[test_log::test]
fn system_time_to_instant_handles_future_time() {
let future_time = now() + Duration::from_secs(10);
let result = system_time_to_instant(future_time);
assert!(result.is_ok());
}
// Converting a time 10 seconds in the past succeeds.
#[test_log::test]
fn system_time_to_instant_handles_past_time() {
let past_time = now() - Duration::from_secs(10);
let result = system_time_to_instant(past_time);
assert!(result.is_ok());
}
// Converting the current time succeeds.
#[test_log::test]
fn system_time_to_instant_handles_current_time() {
let current_time = now();
let result = system_time_to_instant(current_time);
assert!(result.is_ok());
}
// A timeout around a never-completing fused future is not terminated before polling.
#[test_log::test]
fn timeout_fused_future_not_terminated_initially() {
use futures::future::{Fuse, FutureExt};
let fused_pending: Fuse<std::future::Pending<()>> = std::future::pending().fuse();
let timeout = Timeout::new(Duration::from_millis(100), fused_pending);
assert!(!timeout.is_terminated());
}
// A timeout reports terminated when the wrapped fused future is already terminated.
#[test_log::test]
fn timeout_fused_future_terminated_when_inner_future_terminates() {
use futures::future::Fuse;
let terminated_fused: Fuse<std::future::Ready<()>> = Fuse::terminated();
let timeout = Timeout::new(Duration::from_millis(100), terminated_fused);
assert!(timeout.is_terminated());
}
// With a zero duration and a never-ready inner future, the second poll yields `Elapsed`.
#[test_log::test]
fn timeout_fused_future_terminated_when_sleep_terminates() {
use futures::future::{Fuse, FutureExt};
use std::task::{Context, Poll};
let never_ready: Fuse<std::future::Pending<()>> = std::future::pending().fuse();
let timeout = Timeout::new(Duration::ZERO, never_ready);
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
let mut pinned_timeout = std::pin::pin!(timeout);
// The first poll only marks the inner `Sleep` as polled, so its result is discarded.
let _ = pinned_timeout.as_mut().poll(&mut cx);
let result = pinned_timeout.as_mut().poll(&mut cx);
assert!(matches!(result, Poll::Ready(Err(Elapsed))));
}
// After a completed tick, the next `poll_tick` starts a new period and is pending again.
#[test_log::test]
fn interval_poll_tick_resets_after_completion() {
use std::task::{Context, Poll};
let mut interval = Interval::new(Duration::from_millis(1));
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
assert!(matches!(interval.poll_tick(&mut cx), Poll::Pending));
// Backdate the period start by 2ms so the 1ms interval counts as elapsed.
interval.polled = true;
interval.now = switchy_time::now()
.checked_sub(Duration::from_millis(2))
.unwrap();
let result = interval.poll_tick(&mut cx);
assert!(matches!(result, Poll::Ready(_)));
assert!(interval.completed);
// This poll sees `completed`, restarts the period, and acts as its first poll.
let result2 = interval.poll_tick(&mut cx);
assert!(matches!(result2, Poll::Pending));
assert!(interval.polled); assert!(!interval.completed);
}
// `Sleep` resolves and reports terminated once its start time is older than its duration.
#[test_log::test]
fn sleep_poll_completes_after_duration_elapses() {
use std::task::{Context, Poll};
let mut sleep = Sleep::new(Duration::from_millis(1));
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
let mut pinned_sleep = std::pin::Pin::new(&mut sleep);
// The first poll is always pending.
assert!(matches!(pinned_sleep.as_mut().poll(&mut cx), Poll::Pending));
{
// Backdate the start time by 2ms through the pin projection.
let mut projected = pinned_sleep.as_mut().project();
*projected.polled = true;
*projected.now = switchy_time::now() - Duration::from_millis(2);
}
let result = pinned_sleep.as_mut().poll(&mut cx);
assert!(matches!(result, Poll::Ready(())));
assert!(sleep.is_terminated());
}
// Polls an `Instant` future whose target lies 100ms in the past.
#[test_log::test]
fn instant_poll_returns_ready_when_time_passes() {
use std::task::{Context, Poll};
let past_instant = instant_now()
.checked_sub(Duration::from_millis(100))
.unwrap();
let mut instant = Instant::new(past_instant);
let waker = futures::task::noop_waker();
let mut cx = Context::from_waker(&waker);
let mut pinned_instant = std::pin::Pin::new(&mut instant);
let result1 = pinned_instant.as_mut().poll(&mut cx);
let result2 = pinned_instant.as_mut().poll(&mut cx);
// NOTE(review): the first poll of `Instant` always returns `Pending`, so this
// disjunction holds whatever `result2` is; consider asserting that `result2` is
// `Ready` directly.
assert!(
matches!(result1, Poll::Pending) || matches!(result2, Poll::Ready(_)),
"Expected either first poll pending or second ready"
);
if matches!(result2, Poll::Ready(_)) {
assert!(instant.is_terminated());
}
}
}