blockz_futures/
lib.rs

1//! Utilities for working with futures for the tokio stack.
2
3#[macro_use]
4extern crate pin_project;
5
6mod ext;
7
8pub mod cancel;
9pub mod timeout;
10
11pub use self::ext::*;
12
#[cfg(test)]
mod test {
    //! Tests for the `BlockzFutureExt` combinators: cancel handles, cancel
    //! futures, cancel channels, timeouts, deadlines and chained interrupts.
    //!
    //! NOTE(review): every scenario races 1 ms / 2 ms / 3 ms timers against
    //! each other; confirm these margins are stable on loaded CI machines.

    use std::time::{Duration, Instant};

    use futures::FutureExt;
    use tokio::time::sleep;

    use crate::BlockzFutureExt;

    /// Shorthand for a [`Duration`] of `n` milliseconds.
    fn ms(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    /// A 2 ms cancelable future races a 1 ms sleep. The sleep is expected to
    /// win, and the cancelable future (moved into the `select!`) is dropped
    /// with it, so a later `cancel()` must report `false`.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_handle_future_dropped() {
        let work = async { sleep(ms(2)).await };

        let (cancelable, handle) = work.with_cancel_handle();

        tokio::select! {
            _ = sleep(ms(1)) => {},
            _ = cancelable => {
                panic!("cancelable future completed before the sleep");
            },
        }

        // nothing is left to cancel: the future was already dropped when the
        // shorter `sleep` won the race
        let canceled = handle.cancel();
        assert!(!canceled);
    }

    /// A 1 ms cancelable future races a 2 ms sleep. The cancelable future is
    /// expected to finish first, so a later `cancel()` must report `false`.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_handle_future_completed() {
        let work = async { sleep(ms(1)).await };

        let (cancelable, handle) = work.with_cancel_handle();

        tokio::select! {
            _ = sleep(ms(2)) => {
                panic!("sleep completed before cancelable future");
            },
            _ = cancelable => {},
        }

        // nothing is left to cancel: the future has already run to completion
        let canceled = handle.cancel();
        assert!(!canceled);
    }

    /// Cancels a still-pending 2 ms future after a 1 ms sleep and checks that
    /// `cancel()` reports `true` and the future resolves to an error.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_handle_cancel_ok() {
        let work = async { sleep(ms(2)).await };

        let (cancelable, handle) = work.with_cancel_handle();
        // sharing keeps the future alive after the `select!` below, where the
        // shorter `sleep` is expected to win
        let shared = cancelable.shared();

        tokio::select! {
            _ = sleep(ms(1)) => {},
            _ = shared.clone() => {
                panic!("cancelable future completed before the sleep");
            },
        }

        assert!(handle.cancel());

        assert!(shared.await.is_err());
    }

    /// Lets a 1 ms future finish while racing a 2 ms sleep, then checks that
    /// `cancel()` reports `false` and the future resolved successfully.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_handle_ok() {
        let work = async { sleep(ms(1)).await };

        let (cancelable, handle) = work.with_cancel_handle();
        // sharing lets the result be awaited again after the `select!`
        let shared = cancelable.shared();

        tokio::select! {
            _ = sleep(ms(2)) => {},
            _ = shared.clone() => {},
        }

        assert!(!handle.cancel());

        assert!(shared.await.is_ok());
    }

    /// A 2 ms future paired with a 1 ms cancel future resolves to an error.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_future_cancel() {
        let work = async { sleep(ms(2)).await };
        let canceler = async { sleep(ms(1)).await };

        let outcome = work.with_cancel_future(canceler).await;
        assert!(outcome.is_err());
    }

    /// A 1 ms future paired with a 2 ms cancel future resolves successfully.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_future_ok() {
        let work = async { sleep(ms(1)).await };
        let canceler = async { sleep(ms(2)).await };

        let outcome = work.with_cancel_future(canceler).await;
        assert!(outcome.is_ok());
    }

    /// Sends on the cancel channel while the 2 ms future is still pending
    /// (after a 1 ms sleep) and checks that the future resolves to an error.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_channel_cancel() {
        let work = async { sleep(ms(2)).await };

        let (tx, rx) = tokio::sync::oneshot::channel();

        // sharing keeps the future alive after the `select!` below, where the
        // shorter `sleep` is expected to win
        let shared = work.with_cancel_channel(rx).shared();

        tokio::select! {
            _ = sleep(ms(1)) => {},
            _ = shared.clone() => {
                panic!("cancelable future completed before the sleep");
            },
        }

        let sent = tx.send(());
        assert!(sent.is_ok());

        assert!(shared.await.is_err());
    }

    /// With nothing ever sent on the cancel channel, a 1 ms future resolves
    /// successfully.
    #[tokio::test]
    async fn test_blockz_future_ext_with_cancel_channel_ok() {
        let work = async { sleep(ms(1)).await };

        // the sender stays bound (and therefore alive) until the test ends
        let (_tx, rx) = tokio::sync::oneshot::channel();

        let outcome = work.with_cancel_channel(rx).await;
        assert!(outcome.is_ok());
    }

    /// A 2 ms future under a 1 ms timeout resolves to an error.
    #[tokio::test]
    async fn test_blockz_future_ext_timeout() {
        let work = async { sleep(ms(2)).await };

        let outcome = work.timeout(ms(1)).await;
        assert!(outcome.is_err());
    }

    /// A 1 ms future under a 2 ms timeout resolves successfully.
    #[tokio::test]
    async fn test_blockz_future_ext_timeout_ok() {
        let work = async { sleep(ms(1)).await };

        let outcome = work.timeout(ms(2)).await;
        assert!(outcome.is_ok());
    }

    /// A 3 ms future with a deadline 1 ms from now resolves to an error.
    #[tokio::test]
    async fn test_blockz_future_ext_deadline() {
        let work = async { sleep(ms(3)).await };

        let outcome = work.deadline(Instant::now() + ms(1)).await;
        assert!(outcome.is_err());
    }

    /// A 1 ms future with a deadline 2 ms from now resolves successfully.
    #[tokio::test]
    async fn test_blockz_future_ext_deadline_ok() {
        let work = async { sleep(ms(1)).await };

        let outcome = work.deadline(Instant::now() + ms(2)).await;
        assert!(outcome.is_ok());
    }

    /// Timeout (1 ms) wrapped in a cancel handle around a 3 ms future: after
    /// racing a 2 ms sleep, `cancel()` must report `false` and the output must
    /// match `Ok(Err(_))` (presumably: not canceled, but timed out).
    #[tokio::test]
    async fn test_interrupt_chain_timed_out() {
        let work = async { sleep(ms(3)).await };

        let (chained, handle) = work.timeout(ms(1)).with_cancel_handle();
        let shared = chained.shared();

        tokio::select! {
            _ = sleep(ms(2)) => {},
            _ = shared.clone() => {},
        }

        assert!(!handle.cancel());

        let outcome = shared.await;
        assert!(matches!(outcome, Ok(Err(_))));
    }

    /// Timeout (2 ms) wrapped in a cancel handle around a 3 ms future: the
    /// chain is canceled after a 1 ms sleep, so `cancel()` must report `true`
    /// and the output must be an outer error.
    #[tokio::test]
    async fn test_interrupt_chain_canceled() {
        let work = async { sleep(ms(3)).await };

        let (chained, handle) = work.timeout(ms(2)).with_cancel_handle();
        let shared = chained.shared();

        tokio::select! {
            _ = sleep(ms(1)) => {},
            _ = shared.clone() => {},
        }

        assert!(handle.cancel());

        assert!(shared.await.is_err());
    }

    /// Timeout (2 ms) wrapped in a cancel handle around a 1 ms future that
    /// yields `Ok(())`: nothing interrupts it, so the output must match
    /// `Ok(Ok(Ok(())))`.
    #[tokio::test]
    async fn test_interrupt_chain_ok() {
        let work = async {
            sleep(ms(1)).await;
            Ok::<_, &str>(())
        };

        let (chained, handle) = work.timeout(ms(2)).with_cancel_handle();
        let shared = chained.shared();

        tokio::select! {
            _ = sleep(ms(2)) => {},
            _ = shared.clone() => {},
        }

        assert!(!handle.cancel());

        let outcome = shared.await;
        assert!(matches!(outcome, Ok(Ok(Ok(())))));
    }

    /// Timeout (2 ms) wrapped in a cancel handle around a 1 ms future that
    /// yields `Err("error")`: nothing interrupts it, so the inner error must
    /// come through as `Ok(Ok(Err("error")))`.
    #[tokio::test]
    async fn test_interrupt_chain_err() {
        let work = async {
            sleep(ms(1)).await;
            Err::<(), _>("error")
        };

        let (chained, handle) = work.timeout(ms(2)).with_cancel_handle();
        let shared = chained.shared();

        tokio::select! {
            _ = sleep(ms(2)) => {},
            _ = shared.clone() => {},
        }

        assert!(!handle.cancel());

        let outcome = shared.await;
        assert!(matches!(outcome, Ok(Ok(Err("error")))));
    }
}