agnostic_lite/tokio/
timeout.rs

1use crate::time::{AsyncLocalTimeout, AsyncTimeout, Elapsed};
2
3use ::tokio::time::{timeout, timeout_at, Timeout};
4use core::{
5  future::Future,
6  pin::Pin,
7  task::{Context, Poll},
8  time::Duration,
9};
10
11pin_project_lite::pin_project! {
12  /// The [`AsyncTimeout`] implementation for tokio runtime
13  #[repr(transparent)]
14  pub struct TokioTimeout<F> {
15    #[pin]
16    inner: Timeout<F>,
17  }
18}
19
20impl<F> From<Timeout<F>> for TokioTimeout<F> {
21  fn from(timeout: Timeout<F>) -> Self {
22    Self { inner: timeout }
23  }
24}
25
26impl<F> From<TokioTimeout<F>> for Timeout<F> {
27  fn from(timeout: TokioTimeout<F>) -> Self {
28    timeout.inner
29  }
30}
31
32impl<F: Future> Future for TokioTimeout<F> {
33  type Output = Result<F::Output, Elapsed>;
34
35  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36    match self.project().inner.poll(cx) {
37      Poll::Ready(Ok(rst)) => Poll::Ready(Ok(rst)),
38      Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
39      Poll::Pending => Poll::Pending,
40    }
41  }
42}
43
44impl<F: Future + Send> AsyncTimeout<F> for TokioTimeout<F> {
45  type Instant = ::tokio::time::Instant;
46
47  fn timeout(t: Duration, fut: F) -> Self
48  where
49    Self: Sized,
50  {
51    <Self as AsyncLocalTimeout<F>>::timeout_local(t, fut)
52  }
53
54  fn timeout_at(deadline: Self::Instant, fut: F) -> Self
55  where
56    Self: Sized,
57  {
58    <Self as AsyncLocalTimeout<F>>::timeout_local_at(deadline, fut)
59  }
60}
61
62impl<F> AsyncLocalTimeout<F> for TokioTimeout<F>
63where
64  F: Future,
65{
66  type Instant = tokio::time::Instant;
67
68  fn timeout_local(t: Duration, fut: F) -> Self
69  where
70    Self: Sized,
71  {
72    Self {
73      inner: timeout(t, fut),
74    }
75  }
76
77  fn timeout_local_at(deadline: Self::Instant, fut: F) -> Self
78  where
79    Self: Sized,
80  {
81    Self {
82      inner: timeout_at(deadline, fut),
83    }
84  }
85}
86
#[cfg(test)]
mod tests {
  use super::{AsyncTimeout, TokioTimeout};
  use std::time::{Duration, Instant};

  /// Sleep length for the inner future in the scenarios that must time out
  /// (longer than `TIMEOUT`).
  const BAD: Duration = Duration::from_secs(1);
  /// Sleep length for the inner future in the scenarios that must succeed
  /// (shorter than `TIMEOUT`).
  const GOOD: Duration = Duration::from_millis(10);
  /// Timeout budget used by every scenario.
  const TIMEOUT: Duration = Duration::from_millis(200);
  /// Slack tolerated on top of the minimum expected wall-clock time.
  const BOUND: Duration = Duration::from_secs(10);

  /// Sleeps on the tokio timer for `nap`, then resolves to `1`.
  async fn sleep_then_one(nap: Duration) -> i32 {
    tokio::time::sleep(nap).await;
    1
  }

  /// Asserts that `took` lies within `[floor, floor + BOUND]`.
  #[track_caller]
  fn assert_took(took: Duration, floor: Duration) {
    assert!(took >= floor && took <= floor + BOUND);
  }

  #[tokio::test(flavor = "multi_thread")]
  async fn test_timeout() {
    // The scenarios are polled by the `futures` executor nested inside the
    // tokio test. NOTE(review): presumably this checks that the timeout works
    // when driven by a non-tokio executor — confirm.
    futures::executor::block_on(async {
      // Inner future outlasts the budget: the timeout must report an error.
      let slow = sleep_then_one(BAD);
      let began = Instant::now();
      let outcome = TokioTimeout::timeout(TIMEOUT, slow).await;
      assert!(outcome.is_err());
      assert_took(began.elapsed(), TIMEOUT);

      // Inner future finishes well inside the budget: the result must be `Ok`.
      let quick = sleep_then_one(GOOD);
      let began = Instant::now();
      let outcome = TokioTimeout::timeout(TIMEOUT, quick).await;
      assert!(outcome.is_ok());
      assert_took(began.elapsed(), GOOD);
    });
  }

  #[tokio::test(flavor = "multi_thread")]
  async fn test_timeout_at() {
    // Same two scenarios, expressed with an absolute tokio deadline.
    futures::executor::block_on(async {
      // Inner future outlasts the deadline: the timeout must report an error.
      let slow = sleep_then_one(BAD);
      let began = Instant::now();
      let deadline = ::tokio::time::Instant::now() + TIMEOUT;
      let outcome = TokioTimeout::timeout_at(deadline, slow).await;
      assert!(outcome.is_err());
      assert_took(began.elapsed(), TIMEOUT);

      // Inner future finishes before the deadline: the result must be `Ok`.
      let quick = sleep_then_one(GOOD);
      let began = Instant::now();
      let deadline = ::tokio::time::Instant::now() + TIMEOUT;
      let outcome = TokioTimeout::timeout_at(deadline, quick).await;
      assert!(outcome.is_ok());
      assert_took(began.elapsed(), GOOD);
    });
  }
}