// async_sleep/impl_tokio.rs
use alloc::boxed::Box;
use core::time::Duration;

pub use tokio::time::{Sleep, Sleep as TokioTimeSleep};

use crate::{Sleepble, SleepbleWaitBoxFuture};
7
8impl Sleepble for Sleep {
10 fn sleep(dur: Duration) -> Self {
11 tokio::time::sleep(tokio::time::Duration::from_micros(dur.as_micros() as u64))
12 }
13
14 fn wait(self) -> SleepbleWaitBoxFuture {
15 Box::pin(self)
16 }
17}
18
19#[derive(Debug)]
21pub struct UnpinSleep(pub Sleep);
22impl Unpin for UnpinSleep {}
23
24impl Sleepble for UnpinSleep {
25 fn sleep(dur: Duration) -> Self {
26 UnpinSleep(tokio::time::sleep(tokio::time::Duration::from_micros(
27 dur.as_micros() as u64,
28 )))
29 }
30
31 fn wait(self) -> SleepbleWaitBoxFuture {
32 Box::pin(self.0)
33 }
34}
35
#[cfg(test)]
mod tests {
    // `super::*` is only consumed by the `std`-gated items below, so the import
    // is unused when that feature is disabled.
    #[allow(unused_imports)]
    use super::*;

    /// Duration every timing test sleeps for.
    #[cfg(feature = "std")]
    const SLEEP_DUR: Duration = Duration::from_millis(100);

    /// Smallest acceptable wall-clock time for one sleep, in whole milliseconds.
    #[cfg(feature = "std")]
    const MIN_ELAPSED_MS: u128 = 100;

    /// Largest acceptable wall-clock time for one sleep, in whole milliseconds.
    #[cfg(feature = "std")]
    const MAX_ELAPSED_MS: u128 = 105;

    /// Asserts that the time elapsed since `started` (truncated to whole
    /// milliseconds) lies within `MIN_ELAPSED_MS..=MAX_ELAPSED_MS`.
    #[cfg(feature = "std")]
    fn assert_elapsed_in_window(started: std::time::Instant) {
        let elapsed_ms = started.elapsed().as_millis();
        assert!(
            (MIN_ELAPSED_MS..=MAX_ELAPSED_MS).contains(&elapsed_ms),
            "slept for {} ms, expected between {} and {} ms",
            elapsed_ms,
            MIN_ELAPSED_MS,
            MAX_ELAPSED_MS
        );
    }

    /// `sleep` waits for roughly the requested duration with both tokio-backed
    /// sleeper types.
    #[cfg(feature = "std")]
    #[tokio::test]
    async fn test_sleep() {
        let started = std::time::Instant::now();
        crate::sleep::sleep::<Sleep>(SLEEP_DUR).await;
        assert_elapsed_in_window(started);

        let started = std::time::Instant::now();
        crate::sleep::sleep::<UnpinSleep>(SLEEP_DUR).await;
        assert_elapsed_in_window(started);
    }

    /// `sleep_until` wakes up at roughly the requested deadline with both
    /// tokio-backed sleeper types.
    #[cfg(feature = "std")]
    #[tokio::test]
    async fn test_sleep_until() {
        let started = std::time::Instant::now();
        crate::sleep::sleep_until::<Sleep>(std::time::Instant::now() + SLEEP_DUR).await;
        assert_elapsed_in_window(started);

        let started = std::time::Instant::now();
        crate::sleep::sleep_until::<UnpinSleep>(std::time::Instant::now() + SLEEP_DUR).await;
        assert_elapsed_in_window(started);
    }

    /// Read/write-with-timeout tests over a loopback TCP connection.
    #[cfg(feature = "rw")]
    mod rw_tests {
        use core::time::Duration;
        use std::{io::ErrorKind as IoErrorKind, time::Instant};

        use async_compat::Compat;
        use tokio::{
            net::{TcpListener, TcpStream},
            runtime::Runtime,
        };

        use crate::{
            impl_tokio::Sleep,
            rw::{AsyncReadWithTimeoutExt as _, AsyncWriteWithTimeoutExt as _},
        };

        /// Writes `foo` from the accepted socket, reads it back on the client,
        /// then checks that a second read with nothing pending fails with
        /// `TimedOut` after about two seconds.
        #[test]
        fn simple() -> Result<(), Box<dyn std::error::Error>> {
            let rt = Runtime::new()?;

            // Any failure inside the async block is returned to the test
            // harness as `Err`, which fails the test.
            rt.block_on(async {
                // Port 0 lets the OS choose a free port.
                let listener = TcpListener::bind("127.0.0.1:0").await?;
                let addr = listener.local_addr()?;

                let mut tcp_stream_c = Compat::new(TcpStream::connect(addr).await?);
                let (accepted, _) = listener.accept().await?;
                let mut tcp_stream_s = Compat::new(accepted);

                tcp_stream_s
                    .write_with_timeout::<Sleep>(b"foo", Duration::from_secs(1))
                    .await?;

                let mut buf = vec![0u8; 5];
                let n = tcp_stream_c
                    .read_with_timeout::<Sleep>(&mut buf, Duration::from_secs(1))
                    .await?;
                assert_eq!(n, 3);
                assert_eq!(buf, b"foo\0\0");

                // Nothing more was written, so this read must hit the timeout.
                let two_secs = Duration::from_secs(2);
                let three_secs = Duration::from_secs(3);
                let instant = Instant::now();
                let err = tcp_stream_c
                    .read_with_timeout::<Sleep>(&mut buf, two_secs)
                    .await
                    .expect_err("read should time out when no data is pending");
                let elapsed = instant.elapsed();
                assert!(elapsed >= two_secs);
                assert!(elapsed < three_secs);
                assert_eq!(err.kind(), IoErrorKind::TimedOut);
                assert_eq!(err.to_string(), "read timeout");

                Result::<(), Box<dyn std::error::Error>>::Ok(())
            })
        }
    }
}