use super::elapsed::Elapsed;
use super::sleep::Sleep;
use crate::types::Time;
use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
/// A future that races an inner future against a deadline.
///
/// Polling yields `Ok(output)` when the inner future finishes first and
/// `Err(Elapsed)` once the deadline timer reports ready.
#[derive(Debug)]
#[pin_project]
pub struct TimeoutFuture<F> {
/// The wrapped future. Marked `#[pin]` so it is projected as a pinned
/// reference and may itself be `!Unpin`.
#[pin]
future: F,
/// Timer holding the deadline this wrapper enforces.
sleep: Sleep,
/// Set once a poll has returned `Ready` (either success or timeout);
/// cleared again by `reset` / `reset_after`.
completed: bool,
/// Set when the `Ready` result was a timeout; cleared on success and on reset.
timed_out: bool,
}
impl<F> TimeoutFuture<F> {
#[must_use]
pub fn new(future: F, deadline: Time) -> Self {
Self {
future,
sleep: Sleep::new(deadline),
completed: false,
timed_out: false,
}
}
#[must_use]
pub fn with_time_getter(future: F, deadline: Time, time_getter: fn() -> Time) -> Self {
Self {
future,
sleep: Sleep::with_time_getter(deadline, time_getter),
completed: false,
timed_out: false,
}
}
#[must_use]
pub fn after(now: Time, duration: Duration, future: F) -> Self {
Self {
future,
sleep: Sleep::after(now, duration),
completed: false,
timed_out: false,
}
}
#[must_use]
#[inline]
pub const fn deadline(&self) -> Time {
self.sleep.deadline()
}
#[must_use]
#[inline]
pub fn remaining(&self, now: Time) -> Duration {
self.sleep.remaining(now)
}
#[must_use]
#[inline]
pub fn is_elapsed(&self, now: Time) -> bool {
self.sleep.is_elapsed(now)
}
#[must_use]
#[inline]
pub const fn inner(&self) -> &F {
&self.future
}
#[inline]
pub fn inner_mut(&mut self) -> &mut F {
&mut self.future
}
#[must_use]
#[inline]
pub fn into_inner(self) -> F {
self.future
}
pub fn reset(&mut self, deadline: Time) {
self.completed = false;
self.timed_out = false;
self.sleep.reset(deadline);
}
pub fn reset_after(&mut self, now: Time, duration: Duration) {
self.completed = false;
self.timed_out = false;
self.sleep.reset_after(now, duration);
}
}
impl<F: Future + Unpin> TimeoutFuture<F> {
    /// Polls the timeout using an explicitly supplied current time.
    ///
    /// Order of evaluation:
    /// 1. If a previous poll already resolved (success or timeout), returns
    ///    `Err(Elapsed)` immediately without touching the inner future.
    /// 2. The inner future is polled; its output wins if it is ready.
    /// 3. The timer is checked against `now`.
    /// 4. Only when the timer has a custom time getter or an ambient timer
    ///    driver is available, the timer is additionally polled with `cx`.
    ///
    /// Returns `Poll::Pending` when neither the future nor the timer is ready.
    pub fn poll_with_time(
        &mut self,
        cx: &mut Context<'_>,
        now: Time,
    ) -> Poll<Result<F::Output, Elapsed>> {
        // A resolved wrapper never polls the inner future again.
        if self.timed_out || self.completed {
            return Poll::Ready(Err(Elapsed::new(self.sleep.deadline())));
        }

        // The inner future gets the first chance on every poll.
        if let Poll::Ready(output) = Pin::new(&mut self.future).poll(cx) {
            self.completed = true;
            self.timed_out = false;
            return Poll::Ready(Ok(output));
        }

        // Explicit-time check first; the context-driven poll is only
        // attempted when that check did not already fire.
        let expired = self.sleep.poll_with_time(now).is_ready() || self.poll_driven_sleep(cx);
        if !expired {
            return Poll::Pending;
        }

        self.completed = true;
        self.timed_out = true;
        Poll::Ready(Err(Elapsed::new(self.sleep.deadline())))
    }

    /// Polls the timer through `cx` when something can drive it (a custom
    /// time getter or an ambient timer driver) and reports whether it fired.
    fn poll_driven_sleep(&mut self, cx: &mut Context<'_>) -> bool {
        let ambient_timer =
            crate::cx::Cx::current().is_some_and(|current| current.timer_driver().is_some());
        let drivable = self.sleep.has_custom_time_getter() || ambient_timer;
        drivable && Pin::new(&mut self.sleep).poll(cx).is_ready()
    }
}
impl<F: Future> Future for TimeoutFuture<F> {
    type Output = Result<F::Output, Elapsed>;

    /// Drives the inner future first and the deadline timer second.
    ///
    /// Yields `Ok(output)` if the inner future is ready, `Err(Elapsed)` if
    /// the timer is ready, and `Err(Elapsed)` on every poll after the
    /// wrapper has already resolved (until it is reset).
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        // Once resolved, never poll the inner future again.
        if *this.timed_out || *this.completed {
            return Poll::Ready(Err(Elapsed::new(this.sleep.deadline())));
        }

        if let Poll::Ready(output) = this.future.poll(cx) {
            *this.completed = true;
            *this.timed_out = false;
            return Poll::Ready(Ok(output));
        }

        // Capture the deadline before the timer reference is moved into the pin.
        let deadline = this.sleep.deadline();
        if Pin::new(this.sleep).poll(cx).is_pending() {
            return Poll::Pending;
        }

        *this.completed = true;
        *this.timed_out = true;
        Poll::Ready(Err(Elapsed::new(deadline)))
    }
}
impl<F: Clone> Clone for TimeoutFuture<F> {
    /// Clones the inner future and the timer, and copies the completion
    /// flags verbatim, so the clone starts in the same resolved/unresolved
    /// state as the original.
    fn clone(&self) -> Self {
        let Self {
            future,
            sleep,
            completed,
            timed_out,
        } = self;
        Self {
            future: future.clone(),
            sleep: sleep.clone(),
            completed: *completed,
            timed_out: *timed_out,
        }
    }
}
/// Bounds `future` by a timeout of `duration` measured from `now`.
///
/// Convenience wrapper around [`TimeoutFuture::after`].
#[must_use]
pub fn timeout<F>(now: Time, duration: Duration, future: F) -> TimeoutFuture<F> {
    TimeoutFuture::<F>::after(now, duration, future)
}
/// Bounds `future` by the absolute `deadline`.
///
/// Convenience wrapper around [`TimeoutFuture::new`]; note the swapped
/// argument order relative to that constructor.
#[must_use]
pub fn timeout_at<F>(deadline: Time, future: F) -> TimeoutFuture<F> {
    TimeoutFuture::<F>::new(future, deadline)
}
// Unit tests for `TimeoutFuture`, `timeout` and `timeout_at`.
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::init_test_logging;
use std::future::Future;
use std::future::{pending, ready};
use std::pin::Pin;
use std::task::{Context, Poll, Waker};
// Shared per-test setup: initializes test logging and records the phase name.
fn init_test(name: &str) {
init_test_logging();
crate::test_phase!(name);
}
// Thread-local fake clock, in nanoseconds, read by `test_now`.
thread_local! {
static CURRENT_TIME: std::cell::Cell<u64> = const { std::cell::Cell::new(0) };
}
// Sets the fake clock for the current thread.
fn set_current_time(nanos: u64) {
CURRENT_TIME.with(|t| t.set(nanos));
}
// Reads the fake clock for the current thread.
fn get_current_time() -> u64 {
CURRENT_TIME.with(std::cell::Cell::get)
}
// Test future that returns `Pending` until it has been polled `ready_at`
// times, then returns "done".
struct CountingFuture {
count: u32,
ready_at: u32,
}
impl Future for CountingFuture {
type Output = &'static str;
fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
// Every poll is counted, including the one that returns `Ready`.
self.count += 1;
if self.count >= self.ready_at {
Poll::Ready("done")
} else {
Poll::Pending
}
}
}
impl Unpin for CountingFuture {}
// `new` stores the given absolute deadline.
#[test]
fn new_creates_timeout() {
init_test("new_creates_timeout");
let future = ready(42);
let timeout = TimeoutFuture::new(future, Time::from_secs(5));
crate::assert_with_log!(
timeout.deadline() == Time::from_secs(5),
"deadline",
Time::from_secs(5),
timeout.deadline()
);
crate::test_complete!("new_creates_timeout");
}
// `after` computes the deadline as now + duration (10s + 5s = 15s).
#[test]
fn after_computes_deadline() {
init_test("after_computes_deadline");
let future = ready(42);
let timeout = TimeoutFuture::after(Time::from_secs(10), Duration::from_secs(5), future);
crate::assert_with_log!(
timeout.deadline() == Time::from_secs(15),
"deadline",
Time::from_secs(15),
timeout.deadline()
);
crate::test_complete!("after_computes_deadline");
}
// The free function `timeout` yields the same relative deadline (10s + 3s = 13s).
#[test]
fn timeout_function() {
init_test("timeout_function");
let t = timeout(Time::from_secs(10), Duration::from_secs(3), ready(42));
crate::assert_with_log!(
t.deadline() == Time::from_secs(13),
"deadline",
Time::from_secs(13),
t.deadline()
);
crate::test_complete!("timeout_function");
}
// The free function `timeout_at` stores the absolute deadline unchanged.
#[test]
fn timeout_at_function() {
init_test("timeout_at_function");
let t = timeout_at(Time::from_secs(42), ready(123));
crate::assert_with_log!(
t.deadline() == Time::from_secs(42),
"deadline",
Time::from_secs(42),
t.deadline()
);
crate::test_complete!("timeout_at_function");
}
// Before the deadline, `remaining` is deadline - now (10s - 7s = 3s).
#[test]
fn remaining_before_deadline() {
init_test("remaining_before_deadline");
let t = TimeoutFuture::new(ready(42), Time::from_secs(10));
let remaining = t.remaining(Time::from_secs(7));
crate::assert_with_log!(
remaining == Duration::from_secs(3),
"remaining",
Duration::from_secs(3),
remaining
);
crate::test_complete!("remaining_before_deadline");
}
// Past the deadline, `remaining` is zero rather than negative.
#[test]
fn remaining_after_deadline() {
init_test("remaining_after_deadline");
let t = TimeoutFuture::new(ready(42), Time::from_secs(10));
let remaining = t.remaining(Time::from_secs(15));
crate::assert_with_log!(
remaining == Duration::ZERO,
"remaining",
Duration::ZERO,
remaining
);
crate::test_complete!("remaining_after_deadline");
}
// `is_elapsed` is false before the deadline and true at or after it.
#[test]
fn is_elapsed() {
init_test("is_elapsed");
let t = TimeoutFuture::new(ready(42), Time::from_secs(10));
crate::assert_with_log!(
!t.is_elapsed(Time::from_secs(5)),
"not elapsed at t=5",
false,
t.is_elapsed(Time::from_secs(5))
);
crate::assert_with_log!(
t.is_elapsed(Time::from_secs(10)),
"elapsed at t=10",
true,
t.is_elapsed(Time::from_secs(10))
);
crate::assert_with_log!(
t.is_elapsed(Time::from_secs(15)),
"elapsed at t=15",
true,
t.is_elapsed(Time::from_secs(15))
);
crate::test_complete!("is_elapsed");
}
// Smoke test: `inner` can be called on a fresh timeout.
#[test]
fn inner() {
init_test("inner");
let future = ready(42);
let t = TimeoutFuture::new(future, Time::from_secs(5));
let _ = t.inner(); crate::test_complete!("inner");
}
// Smoke test: `inner_mut` can be called on a fresh timeout.
#[test]
fn inner_mut() {
init_test("inner_mut");
let future = ready(42);
let mut t = TimeoutFuture::new(future, Time::from_secs(5));
let _inner = t.inner_mut(); crate::test_complete!("inner_mut");
}
// Smoke test: `into_inner` consumes the timeout and returns the future.
#[test]
fn into_inner() {
init_test("into_inner");
let future = ready(42);
let t = TimeoutFuture::new(future, Time::from_secs(5));
let _inner = t.into_inner();
crate::test_complete!("into_inner");
}
// `reset` replaces the stored deadline.
#[test]
fn reset_changes_deadline() {
init_test("reset_changes_deadline");
let mut t = TimeoutFuture::new(ready(42), Time::from_secs(5));
t.reset(Time::from_secs(10));
crate::assert_with_log!(
t.deadline() == Time::from_secs(10),
"deadline",
Time::from_secs(10),
t.deadline()
);
crate::test_complete!("reset_changes_deadline");
}
// `reset_after` sets the deadline to now + duration (3s + 7s = 10s).
#[test]
fn reset_after_changes_deadline() {
init_test("reset_after_changes_deadline");
let mut t = TimeoutFuture::new(ready(42), Time::from_secs(5));
t.reset_after(Time::from_secs(3), Duration::from_secs(7));
crate::assert_with_log!(
t.deadline() == Time::from_secs(10),
"deadline",
Time::from_secs(10),
t.deadline()
);
crate::test_complete!("reset_after_changes_deadline");
}
// An immediately-ready inner future wins when polled before the deadline.
#[test]
fn poll_with_time_future_completes() {
init_test("poll_with_time_future_completes");
let mut t = TimeoutFuture::new(ready(42), Time::from_secs(10));
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let result = t.poll_with_time(&mut cx, Time::from_secs(5));
let ready = matches!(result, Poll::Ready(Ok(42)));
crate::assert_with_log!(ready, "ready ok", true, ready);
crate::test_complete!("poll_with_time_future_completes");
}
// A never-ready inner future polled past the deadline yields `Elapsed`
// carrying the original deadline.
#[test]
fn poll_with_time_timeout_elapsed() {
init_test("poll_with_time_timeout_elapsed");
let mut t = TimeoutFuture::new(pending::<i32>(), Time::from_secs(10));
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let result = t.poll_with_time(&mut cx, Time::from_secs(15));
let elapsed = matches!(result, Poll::Ready(Err(_)));
crate::assert_with_log!(elapsed, "elapsed", true, elapsed);
if let Poll::Ready(Err(elapsed)) = result {
crate::assert_with_log!(
elapsed.deadline() == Time::from_secs(10),
"deadline",
Time::from_secs(10),
elapsed.deadline()
);
}
crate::test_complete!("poll_with_time_timeout_elapsed");
}
// A never-ready inner future polled before the deadline stays pending.
#[test]
fn poll_with_time_pending() {
init_test("poll_with_time_pending");
let mut t = TimeoutFuture::new(pending::<i32>(), Time::from_secs(10));
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let result = t.poll_with_time(&mut cx, Time::from_secs(5));
crate::assert_with_log!(result.is_pending(), "pending", true, result.is_pending());
crate::test_complete!("poll_with_time_pending");
}
// Polling exactly at the deadline counts as elapsed.
#[test]
fn poll_with_time_at_exact_deadline() {
init_test("poll_with_time_at_exact_deadline");
let mut t = TimeoutFuture::new(pending::<i32>(), Time::from_secs(10));
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let result = t.poll_with_time(&mut cx, Time::from_secs(10));
let elapsed = matches!(result, Poll::Ready(Err(_)));
crate::assert_with_log!(elapsed, "elapsed at deadline", true, elapsed);
crate::test_complete!("poll_with_time_at_exact_deadline");
}
// A zero deadline polled at time zero is already elapsed.
#[test]
fn poll_with_time_zero_deadline() {
init_test("poll_with_time_zero_deadline");
let mut t = TimeoutFuture::new(pending::<i32>(), Time::ZERO);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let result = t.poll_with_time(&mut cx, Time::ZERO);
let elapsed = matches!(result, Poll::Ready(Err(_)));
crate::assert_with_log!(elapsed, "elapsed at zero", true, elapsed);
crate::test_complete!("poll_with_time_zero_deadline");
}
// After a successful completion, polling again returns `Err` instead of
// polling the finished inner future a second time.
#[test]
fn poll_with_time_returns_elapsed_after_success_completion() {
let mut t = TimeoutFuture::new(ready(42), Time::from_secs(10));
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let first = t.poll_with_time(&mut cx, Time::from_secs(5));
assert!(matches!(first, Poll::Ready(Ok(42))));
let repoll = t.poll_with_time(&mut cx, Time::from_secs(6));
assert!(matches!(repoll, Poll::Ready(Err(_))));
}
// After a timeout, every poll keeps returning `Err` until `reset`; after the
// reset the inner future is polled again and completes on its third poll.
#[test]
fn poll_with_time_returns_elapsed_after_timeout_until_reset() {
set_current_time(0);
let future = CountingFuture {
count: 0,
ready_at: 3,
};
let mut t = TimeoutFuture::with_time_getter(future, Time::from_secs(5), test_now);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
// Inner poll #1: pending, deadline not reached.
assert!(t.poll_with_time(&mut cx, Time::from_secs(0)).is_pending());
// Inner poll #2: still pending, but now is past the 5s deadline.
let elapsed = t.poll_with_time(&mut cx, Time::from_secs(10));
assert!(matches!(elapsed, Poll::Ready(Err(_))));
// Already resolved: the inner future is not polled here.
let repoll = t.poll_with_time(&mut cx, Time::from_secs(11));
assert!(matches!(repoll, Poll::Ready(Err(_))));
t.reset(Time::from_secs(20));
// Inner poll #3 reaches `ready_at`.
let resumed = t.poll_with_time(&mut cx, Time::from_secs(12));
assert!(matches!(resumed, Poll::Ready(Ok("done"))));
}
// Time getter handed to `with_time_getter`; reads the thread-local fake clock.
fn test_now() -> Time {
Time::from_nanos(get_current_time())
}
// Same re-poll-after-success behavior, exercised through the `Future` impl.
#[test]
fn poll_returns_elapsed_after_success_completion() {
set_current_time(0);
let mut t = TimeoutFuture::with_time_getter(ready(42), Time::from_secs(10), test_now);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let first = Pin::new(&mut t).poll(&mut cx);
assert!(matches!(first, Poll::Ready(Ok(42))));
let repoll = Pin::new(&mut t).poll(&mut cx);
assert!(matches!(repoll, Poll::Ready(Err(_))));
}
// Same timeout-then-reset behavior, exercised through the `Future` impl with
// the fake clock advanced via `set_current_time`.
#[test]
fn poll_returns_elapsed_after_timeout_until_reset() {
set_current_time(0);
let future = CountingFuture {
count: 0,
ready_at: 3,
};
let mut t = TimeoutFuture::with_time_getter(future, Time::from_secs(5), test_now);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
assert!(Pin::new(&mut t).poll(&mut cx).is_pending());
// Advance the fake clock to 10s, past the 5s deadline.
set_current_time(10_000_000_000);
let elapsed = Pin::new(&mut t).poll(&mut cx);
assert!(matches!(elapsed, Poll::Ready(Err(_))));
let repoll = Pin::new(&mut t).poll(&mut cx);
assert!(matches!(repoll, Poll::Ready(Err(_))));
t.reset(Time::from_secs(20));
// 12s is before the new 20s deadline; the inner future's third poll completes.
set_current_time(12_000_000_000);
let resumed = Pin::new(&mut t).poll(&mut cx);
assert!(matches!(resumed, Poll::Ready(Ok("done"))));
}
// Cloning preserves the deadline on both the original and the clone.
#[test]
fn clone_copies_deadline_and_future() {
init_test("clone_copies_deadline_and_future");
let t = TimeoutFuture::new(ready(42), Time::from_secs(10));
let t2 = t.clone();
crate::assert_with_log!(
t.deadline() == Time::from_secs(10),
"t deadline",
Time::from_secs(10),
t.deadline()
);
crate::assert_with_log!(
t2.deadline() == Time::from_secs(10),
"t2 deadline",
Time::from_secs(10),
t2.deadline()
);
crate::test_complete!("clone_copies_deadline_and_future");
}
// Walks a never-ready future through t=0, 2, 4 (pending) and t=5 (elapsed).
// The fake clock stays at 0, so only the explicit `now` argument advances.
#[test]
fn simulated_timeout_scenario() {
init_test("simulated_timeout_scenario");
set_current_time(0);
let mut t = TimeoutFuture::with_time_getter(pending::<i32>(), Time::from_secs(5), test_now);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let pending = t.poll_with_time(&mut cx, Time::ZERO).is_pending();
crate::assert_with_log!(pending, "pending at t=0", true, pending);
let pending = t.poll_with_time(&mut cx, Time::from_secs(2)).is_pending();
crate::assert_with_log!(pending, "pending at t=2", true, pending);
let pending = t.poll_with_time(&mut cx, Time::from_secs(4)).is_pending();
crate::assert_with_log!(pending, "pending at t=4", true, pending);
let result = t.poll_with_time(&mut cx, Time::from_secs(5));
let elapsed = matches!(result, Poll::Ready(Err(_)));
crate::assert_with_log!(elapsed, "elapsed at t=5", true, elapsed);
crate::test_complete!("simulated_timeout_scenario");
}
// A future that becomes ready on its third poll completes before the 10s deadline.
#[test]
fn simulated_success_scenario() {
init_test("simulated_success_scenario");
let future = CountingFuture {
count: 0,
ready_at: 3,
};
let mut t = TimeoutFuture::new(future, Time::from_secs(10));
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
let pending = t.poll_with_time(&mut cx, Time::from_secs(1)).is_pending();
crate::assert_with_log!(pending, "pending at t=1", true, pending);
let pending = t.poll_with_time(&mut cx, Time::from_secs(2)).is_pending();
crate::assert_with_log!(pending, "pending at t=2", true, pending);
let result = t.poll_with_time(&mut cx, Time::from_secs(3));
let ready = matches!(result, Poll::Ready(Ok("done")));
crate::assert_with_log!(ready, "ready at t=3", true, ready);
crate::test_complete!("simulated_success_scenario");
}
// Returns a waker that does nothing when woken; the tests never rely on wake-ups.
fn noop_waker() -> Waker {
std::task::Waker::noop().clone()
}
}