agnostic_lite/tokio/
timeout.rs1use crate::time::{AsyncLocalTimeout, AsyncTimeout, Elapsed};
2
3use ::tokio::time::{timeout, timeout_at, Timeout};
4use core::{
5 future::Future,
6 pin::Pin,
7 task::{Context, Poll},
8 time::Duration,
9};
10
11pin_project_lite::pin_project! {
12 #[repr(transparent)]
14 pub struct TokioTimeout<F> {
15 #[pin]
16 inner: Timeout<F>,
17 }
18}
19
20impl<F> From<Timeout<F>> for TokioTimeout<F> {
21 fn from(timeout: Timeout<F>) -> Self {
22 Self { inner: timeout }
23 }
24}
25
26impl<F> From<TokioTimeout<F>> for Timeout<F> {
27 fn from(timeout: TokioTimeout<F>) -> Self {
28 timeout.inner
29 }
30}
31
32impl<F: Future> Future for TokioTimeout<F> {
33 type Output = Result<F::Output, Elapsed>;
34
35 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
36 match self.project().inner.poll(cx) {
37 Poll::Ready(Ok(rst)) => Poll::Ready(Ok(rst)),
38 Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
39 Poll::Pending => Poll::Pending,
40 }
41 }
42}
43
44impl<F: Future + Send> AsyncTimeout<F> for TokioTimeout<F> {
45 type Instant = ::tokio::time::Instant;
46
47 fn timeout(t: Duration, fut: F) -> Self
48 where
49 Self: Sized,
50 {
51 <Self as AsyncLocalTimeout<F>>::timeout_local(t, fut)
52 }
53
54 fn timeout_at(deadline: Self::Instant, fut: F) -> Self
55 where
56 Self: Sized,
57 {
58 <Self as AsyncLocalTimeout<F>>::timeout_local_at(deadline, fut)
59 }
60}
61
62impl<F> AsyncLocalTimeout<F> for TokioTimeout<F>
63where
64 F: Future,
65{
66 type Instant = tokio::time::Instant;
67
68 fn timeout_local(t: Duration, fut: F) -> Self
69 where
70 Self: Sized,
71 {
72 Self {
73 inner: timeout(t, fut),
74 }
75 }
76
77 fn timeout_local_at(deadline: Self::Instant, fut: F) -> Self
78 where
79 Self: Sized,
80 {
81 Self {
82 inner: timeout_at(deadline, fut),
83 }
84 }
85}
86
87#[cfg(test)]
88mod tests {
89 use super::{AsyncTimeout, TokioTimeout};
90 use std::time::{Duration, Instant};
91
92 const BAD: Duration = Duration::from_secs(1);
93 const GOOD: Duration = Duration::from_millis(10);
94 const TIMEOUT: Duration = Duration::from_millis(200);
95 const BOUND: Duration = Duration::from_secs(10);
96
97 #[tokio::test(flavor = "multi_thread")]
98 async fn test_timeout() {
99 futures::executor::block_on(async {
100 let fut = async {
101 tokio::time::sleep(BAD).await;
102 1
103 };
104 let start = Instant::now();
105 let rst = TokioTimeout::timeout(TIMEOUT, fut).await;
106 assert!(rst.is_err());
107 let elapsed = start.elapsed();
108 assert!(elapsed >= TIMEOUT && elapsed <= TIMEOUT + BOUND);
109
110 let fut = async {
111 tokio::time::sleep(GOOD).await;
112 1
113 };
114
115 let start = Instant::now();
116 let rst = TokioTimeout::timeout(TIMEOUT, fut).await;
117 assert!(rst.is_ok());
118 let elapsed = start.elapsed();
119 assert!(elapsed >= GOOD && elapsed <= GOOD + BOUND);
120 });
121 }
122
123 #[tokio::test(flavor = "multi_thread")]
124 async fn test_timeout_at() {
125 futures::executor::block_on(async {
126 let fut = async {
127 tokio::time::sleep(BAD).await;
128 1
129 };
130 let start = Instant::now();
131 let rst = TokioTimeout::timeout_at(::tokio::time::Instant::now() + TIMEOUT, fut).await;
132 assert!(rst.is_err());
133 let elapsed = start.elapsed();
134 assert!(elapsed >= TIMEOUT && elapsed <= TIMEOUT + BOUND);
135
136 let fut = async {
137 tokio::time::sleep(GOOD).await;
138 1
139 };
140
141 let start = Instant::now();
142 let rst = TokioTimeout::timeout_at(::tokio::time::Instant::now() + TIMEOUT, fut).await;
143 assert!(rst.is_ok());
144 let elapsed = start.elapsed();
145 assert!(elapsed >= GOOD && elapsed <= GOOD + BOUND);
146 });
147 }
148}