1#[macro_use]
4extern crate pin_project;
5
6mod ext;
7
8pub mod cancel;
9pub mod timeout;
10
11pub use self::ext::*;
12
#[cfg(test)]
mod test {
    //! Timing-based tests for the `BlockzFutureExt` combinators (cancel handle,
    //! cancel future, cancel channel, timeout, deadline and chains of them).
    //!
    //! Every test races real timers against each other, so the durations are
    //! expressed as three well separated tiers instead of 1 ms / 2 ms / 3 ms:
    //! `tokio::time::sleep` only has millisecond granularity (coarser on some
    //! platforms), and a 1 ms gap lets both timers expire in the same tick,
    //! which makes the outcome of the race nondeterministic.
    //!
    //! The outcome of every test is decided when its shortest timer fires, so
    //! widening the gap between the tiers is not expected to slow down a
    //! passing run.

    use std::time::{Duration, Instant};

    use futures::FutureExt;

    use crate::BlockzFutureExt;

    /// Duration of the timer that is expected to win a race.
    const SHORT: Duration = Duration::from_millis(10);

    /// Duration of the timer that is expected to lose against [`SHORT`].
    const MEDIUM: Duration = Duration::from_millis(200);

    /// Duration of the timer that is expected to lose against [`MEDIUM`].
    const LONG: Duration = Duration::from_millis(400);

    /// The cancelable future is dropped before it completes; the handle is then
    /// expected to report that nothing was canceled.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_handle_future_dropped() {
        let fut = async {
            tokio::time::sleep(MEDIUM).await;
        };

        let (cancel, cancel_handle) = fut.with_cancel_handle();

        tokio::select! {
            _ = tokio::time::sleep(SHORT) => {},
            _ = cancel => {
                panic!("cancelable future completed before the sleep");
            },
        }

        // `cancel` was moved into `select!` and dropped when the sleep branch won.
        assert!(!cancel_handle.cancel());
    }

    /// The cancelable future runs to completion; canceling afterwards is
    /// expected to report `false`.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_handle_future_completed() {
        let fut = async {
            tokio::time::sleep(SHORT).await;
        };

        let (cancel, cancel_handle) = fut.with_cancel_handle();

        tokio::select! {
            _ = tokio::time::sleep(MEDIUM) => {
                panic!("sleep completed before cancelable future");
            },
            _ = cancel => {},
        }

        assert!(!cancel_handle.cancel());
    }

    /// Canceling a still-pending future through its handle is expected to
    /// succeed and to make the future resolve to an error.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_handle_cancel_ok() {
        let fut = async {
            tokio::time::sleep(MEDIUM).await;
        };

        let (cancel, cancel_handle) = fut.with_cancel_handle();
        // Shared so that the future survives the `select!` below and can be
        // awaited again after the cancellation.
        let shared = cancel.shared();

        tokio::select! {
            _ = tokio::time::sleep(SHORT) => {},
            _ = shared.clone() => {
                panic!("cancelable future completed before the sleep");
            },
        }

        assert!(cancel_handle.cancel());

        let result = shared.await;
        assert!(result.is_err());
    }

    /// A future that completes before anyone cancels it is expected to resolve
    /// to `Ok`, and a late cancel is expected to report `false`.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_handle_ok() {
        let fut = async {
            tokio::time::sleep(SHORT).await;
        };

        let (cancel, cancel_handle) = fut.with_cancel_handle();
        // Shared so that the result can still be inspected after the `select!`.
        let shared = cancel.shared();

        tokio::select! {
            _ = tokio::time::sleep(MEDIUM) => {},
            _ = shared.clone() => {},
        }

        assert!(!cancel_handle.cancel());

        let result = shared.await;
        assert!(result.is_ok());
    }

    /// The cancel future finishes first, so the wrapped future is expected to
    /// resolve to an error.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_future_cancel() {
        let fut = async {
            tokio::time::sleep(MEDIUM).await;
        };

        let cancel = fut.with_cancel_future(async {
            tokio::time::sleep(SHORT).await;
        });

        let result = cancel.await;
        assert!(result.is_err());
    }

    /// The wrapped future finishes before the cancel future, so it is expected
    /// to resolve to `Ok`.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_future_ok() {
        let fut = async {
            tokio::time::sleep(SHORT).await;
        };

        let cancel = fut.with_cancel_future(async {
            tokio::time::sleep(MEDIUM).await;
        });

        let result = cancel.await;
        assert!(result.is_ok());
    }

    /// Sending on the oneshot channel while the future is still pending is
    /// expected to cancel it.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_channel_cancel() {
        let fut = async {
            tokio::time::sleep(MEDIUM).await;
        };

        let (tx, rx) = tokio::sync::oneshot::channel();

        let cancel = fut.with_cancel_channel(rx);
        // Shared so that the receiver stays alive across the `select!` and the
        // send below can succeed.
        let shared = cancel.shared();

        tokio::select! {
            _ = tokio::time::sleep(SHORT) => {},
            _ = shared.clone() => {
                panic!("cancelable future completed before the sleep");
            },
        }

        assert!(tx.send(()).is_ok());

        let result = shared.await;
        assert!(result.is_err());
    }

    /// When nothing is ever sent on the channel, the wrapped future is
    /// expected to resolve to `Ok`.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_channel_ok() {
        let fut = async {
            tokio::time::sleep(SHORT).await;
        };

        // The sender is kept alive (but unused) for the whole test.
        let (_tx, rx) = tokio::sync::oneshot::channel();

        let cancel = fut.with_cancel_channel(rx);

        let result = cancel.await;
        assert!(result.is_ok());
    }

    /// A timeout shorter than the future is expected to yield an error.
    #[tokio::test]
    async fn test_blockz_future_ext_timeout() {
        let fut = async {
            tokio::time::sleep(MEDIUM).await;
        };

        assert!(fut.timeout(SHORT).await.is_err());
    }

    /// A timeout longer than the future is expected to yield `Ok`.
    #[tokio::test]
    async fn test_blockz_future_ext_timeout_ok() {
        let fut = async {
            tokio::time::sleep(SHORT).await;
        };

        assert!(fut.timeout(MEDIUM).await.is_ok());
    }

    /// A deadline that passes before the future completes is expected to yield
    /// an error.
    #[tokio::test]
    async fn test_blockz_future_ext_deadline() {
        let fut = async {
            tokio::time::sleep(LONG).await;
        };

        assert!(fut.deadline(Instant::now() + SHORT).await.is_err());
    }

    /// A deadline that lies after the future's completion is expected to yield
    /// `Ok`.
    #[tokio::test]
    async fn test_blockz_future_ext_deadline_ok() {
        let fut = async {
            tokio::time::sleep(SHORT).await;
        };

        assert!(fut.deadline(Instant::now() + MEDIUM).await.is_ok());
    }

    /// Timeout wrapped in a cancel handle: the timeout fires first, so the
    /// cancel layer is expected to be `Ok` and the timeout layer `Err`.
    #[tokio::test]
    async fn test_interrupt_chain_timed_out() {
        let fut = async {
            tokio::time::sleep(LONG).await;
        };

        let (fut, cancel) = fut.timeout(SHORT).with_cancel_handle();
        let fut = fut.shared();

        tokio::select! {
            _ = tokio::time::sleep(MEDIUM) => {},
            _ = fut.clone() => {},
        };

        // The chain has already resolved (timed out), so there is nothing left
        // to cancel.
        assert!(!cancel.cancel());
        assert!(matches!(fut.await, Ok(Err(_))));
    }

    /// Timeout wrapped in a cancel handle: the handle is used before the
    /// timeout fires, so the outermost (cancel) layer is expected to be `Err`.
    #[tokio::test]
    async fn test_interrupt_chain_canceled() {
        let fut = async {
            tokio::time::sleep(LONG).await;
        };

        let (fut, cancel) = fut.timeout(MEDIUM).with_cancel_handle();
        let fut = fut.shared();

        tokio::select! {
            _ = tokio::time::sleep(SHORT) => {},
            _ = fut.clone() => {},
        };

        assert!(cancel.cancel());
        assert!(fut.await.is_err());
    }

    /// Timeout wrapped in a cancel handle: the inner future returns `Ok(())`
    /// in time, so all three layers are expected to be `Ok`.
    #[tokio::test]
    async fn test_interrupt_chain_ok() {
        let fut = async {
            tokio::time::sleep(SHORT).await;
            Ok::<_, &str>(())
        };

        let (fut, cancel) = fut.timeout(MEDIUM).with_cancel_handle();
        let fut = fut.shared();

        tokio::select! {
            _ = tokio::time::sleep(MEDIUM) => {},
            _ = fut.clone() => {},
        };

        assert!(!cancel.cancel());
        assert!(matches!(fut.await, Ok(Ok(Ok(())))));
    }

    /// Timeout wrapped in a cancel handle: the inner future returns its own
    /// error in time, which is expected to pass through both outer layers.
    #[tokio::test]
    async fn test_interrupt_chain_err() {
        let fut = async {
            tokio::time::sleep(SHORT).await;
            Err::<(), _>("error")
        };

        let (fut, cancel) = fut.timeout(MEDIUM).with_cancel_handle();
        let fut = fut.shared();

        tokio::select! {
            _ = tokio::time::sleep(MEDIUM) => {},
            _ = fut.clone() => {},
        };

        assert!(!cancel.cancel());
        assert!(matches!(fut.await, Ok(Ok(Err("error")))));
    }
}