async_sleep/
impl_tokio.rs

1use alloc::boxed::Box;
2use core::time::Duration;
3
4pub use tokio::time::{Sleep, Sleep as TokioTimeSleep};
5
6use crate::{Sleepble, SleepbleWaitBoxFuture};
7
8//
9impl Sleepble for Sleep {
10    fn sleep(dur: Duration) -> Self {
11        tokio::time::sleep(tokio::time::Duration::from_micros(dur.as_micros() as u64))
12    }
13
14    fn wait(self) -> SleepbleWaitBoxFuture {
15        Box::pin(self)
16    }
17}
18
19//
/// Newtype around Tokio's [`Sleep`] timer that is unconditionally [`Unpin`].
///
/// The wrapped timer is exposed as the public tuple field `.0`.
// NOTE(review): Tokio documents `Sleep` itself as not implementing `Unpin`. Nothing in
// this file polls the wrapped timer in place — the `Sleepble::wait` impl for this type
// first moves it into `Box::pin` — so the manual `Unpin` impl presumably exists only to
// satisfy `Unpin` bounds at call sites. Confirm against the users of this type.
#[derive(Debug)]
pub struct UnpinSleep(pub Sleep);
// Manual opt-in: otherwise the auto trait would be derived from the wrapped field.
impl Unpin for UnpinSleep {}
23
24impl Sleepble for UnpinSleep {
25    fn sleep(dur: Duration) -> Self {
26        UnpinSleep(tokio::time::sleep(tokio::time::Duration::from_micros(
27            dur.as_micros() as u64,
28        )))
29    }
30
31    fn wait(self) -> SleepbleWaitBoxFuture {
32        Box::pin(self.0)
33    }
34}
35
36#[cfg(test)]
37mod tests {
38    #[allow(unused_imports)]
39    use super::*;
40
41    #[cfg(feature = "std")]
42    #[tokio::test]
43    async fn test_sleep() {
44        {
45            #[cfg(feature = "std")]
46            let now = std::time::Instant::now();
47
48            crate::sleep::sleep::<Sleep>(Duration::from_millis(100)).await;
49
50            #[cfg(feature = "std")]
51            {
52                let elapsed_dur = now.elapsed();
53                assert!(elapsed_dur.as_millis() >= 100 && elapsed_dur.as_millis() <= 105);
54            }
55        }
56
57        {
58            #[cfg(feature = "std")]
59            let now = std::time::Instant::now();
60
61            crate::sleep::sleep::<UnpinSleep>(Duration::from_millis(100)).await;
62
63            #[cfg(feature = "std")]
64            {
65                let elapsed_dur = now.elapsed();
66                assert!(elapsed_dur.as_millis() >= 100 && elapsed_dur.as_millis() <= 105);
67            }
68        }
69    }
70
71    #[cfg(feature = "std")]
72    #[tokio::test]
73    async fn test_sleep_until() {
74        {
75            let now = std::time::Instant::now();
76
77            crate::sleep::sleep_until::<Sleep>(
78                std::time::Instant::now() + Duration::from_millis(100),
79            )
80            .await;
81
82            let elapsed_dur = now.elapsed();
83            assert!(elapsed_dur.as_millis() >= 100 && elapsed_dur.as_millis() <= 105);
84        }
85
86        {
87            let now = std::time::Instant::now();
88
89            crate::sleep::sleep_until::<UnpinSleep>(
90                std::time::Instant::now() + Duration::from_millis(100),
91            )
92            .await;
93
94            let elapsed_dur = now.elapsed();
95            assert!(elapsed_dur.as_millis() >= 100 && elapsed_dur.as_millis() <= 105);
96        }
97    }
98
99    #[cfg(feature = "rw")]
100    #[cfg(test)]
101    mod rw_tests {
102        use core::time::Duration;
103        use std::{io::ErrorKind as IoErrorKind, time::Instant};
104
105        use async_compat::Compat;
106        use tokio::{
107            net::{TcpListener, TcpStream},
108            runtime::Runtime,
109        };
110
111        use crate::{
112            impl_tokio::Sleep,
113            rw::{AsyncReadWithTimeoutExt as _, AsyncWriteWithTimeoutExt as _},
114        };
115
116        #[test]
117        fn simple() -> Result<(), Box<dyn std::error::Error>> {
118            let rt = Runtime::new().unwrap();
119
120            let ret = rt.block_on(async {
121                let listener = TcpListener::bind("127.0.0.1:0").await?;
122
123                let addr = listener.local_addr()?;
124
125                let tcp_stream_c = TcpStream::connect(addr).await?;
126                let mut tcp_stream_c = Compat::new(tcp_stream_c);
127                let (tcp_stream_s, _) = listener.accept().await.expect("Accept failed");
128                let mut tcp_stream_s = Compat::new(tcp_stream_s);
129
130                tcp_stream_s
131                    .write_with_timeout::<Sleep>(b"foo", Duration::from_secs(1))
132                    .await?;
133
134                let mut buf = vec![0u8; 5];
135                let n = tcp_stream_c
136                    .read_with_timeout::<Sleep>(&mut buf, Duration::from_secs(1))
137                    .await?;
138                assert_eq!(n, 3);
139                assert_eq!(buf, b"foo\0\0");
140
141                let instant = Instant::now();
142                let two_secs = Duration::from_secs(2);
143                let three_secs = Duration::from_secs(3);
144                let err = tcp_stream_c
145                    .read_with_timeout::<Sleep>(&mut buf, Duration::from_secs(2))
146                    .await
147                    .err()
148                    .unwrap();
149                assert!(instant.elapsed() >= two_secs);
150                assert!(instant.elapsed() < three_secs);
151                assert_eq!(err.kind(), IoErrorKind::TimedOut);
152                assert_eq!(err.to_string(), "read timeout");
153
154                Result::<(), Box<dyn std::error::Error>>::Ok(())
155            });
156
157            match ret {
158                Ok(_) => {}
159                Err(err) => panic!("{err}"),
160            }
161
162            Ok(())
163        }
164    }
165}