tokio_graceful/
lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(
3    clippy::all,
4    clippy::dbg_macro,
5    clippy::todo,
6    clippy::empty_enum,
7    clippy::enum_glob_use,
8    clippy::mem_forget,
9    clippy::unused_self,
10    clippy::filter_map_next,
11    clippy::needless_continue,
12    clippy::needless_borrow,
13    clippy::match_wildcard_for_single_variants,
14    clippy::if_let_mutex,
15    clippy::await_holding_lock,
16    clippy::match_on_vec_items,
17    clippy::imprecise_flops,
18    clippy::suboptimal_flops,
19    clippy::lossy_float_literal,
20    clippy::rest_pat_in_fully_bound_structs,
21    clippy::fn_params_excessive_bools,
22    clippy::exit,
23    clippy::inefficient_to_string,
24    clippy::linkedlist,
25    clippy::macro_use_imports,
26    clippy::option_option,
27    clippy::verbose_file_reads,
28    clippy::unnested_or_patterns,
29    rust_2018_idioms,
30    future_incompatible,
31    nonstandard_style,
32    missing_docs
33)]
34#![allow(elided_lifetimes_in_paths, clippy::type_complexity)]
35#![cfg_attr(test, allow(clippy::float_cmp))]
36#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))]
37
38mod guard;
39pub use guard::{ShutdownGuard, WeakShutdownGuard};
40
41mod shutdown;
42#[cfg(not(loom))]
43pub use shutdown::default_signal;
44pub use shutdown::{Shutdown, ShutdownBuilder};
45
46pub(crate) mod sync;
47pub(crate) mod trigger;
48
// Marker type that exists only under `cfg(doctest)`. Its documentation is the
// README itself, so the README's code examples are picked up as doctests.
#[cfg(doctest)]
#[doc = include_str!("../README.md")]
pub struct ReadmeDoctests;
52
53#[cfg(test)]
54mod tests {
55    use std::time::Duration;
56
57    use tokio::sync::oneshot;
58
59    use super::*;
60
61    #[tokio::test]
62    async fn test_shutdown_nope() {
63        let (tx, rx) = oneshot::channel::<()>();
64        let shutdown = Shutdown::new(async {
65            rx.await.unwrap();
66        });
67        crate::sync::spawn(async move {
68            tx.send(()).unwrap();
69        });
70        shutdown.shutdown().await;
71    }
72
73    #[tokio::test]
74    async fn test_shutdown_nope_limit() {
75        let (tx, rx) = oneshot::channel::<()>();
76        let shutdown = Shutdown::new(async {
77            rx.await.unwrap();
78        });
79        crate::sync::spawn(async move {
80            tx.send(()).unwrap();
81        });
82        shutdown
83            .shutdown_with_limit(Duration::from_secs(60))
84            .await
85            .unwrap();
86    }
87
88    #[tokio::test]
89    async fn test_shutdown_guard_cancel_safety() {
90        let (tx, rx) = oneshot::channel::<()>();
91        let shutdown = Shutdown::new(async {
92            rx.await.unwrap();
93        });
94        let guard = shutdown.guard();
95
96        tokio::select! {
97            _ = guard.cancelled() => {}
98            _ = tokio::time::sleep(Duration::from_millis(50)) => {},
99        }
100
101        tx.send(()).unwrap();
102
103        drop(guard);
104
105        shutdown.shutdown().await;
106    }
107
108    #[tokio::test]
109    async fn test_shutdown_guard_weak_cancel_safety() {
110        let (tx, rx) = oneshot::channel::<()>();
111        let shutdown = Shutdown::new(async {
112            rx.await.unwrap();
113        });
114        let guard = shutdown.guard_weak();
115
116        tokio::select! {
117            _ = guard.into_cancelled() => {}
118            _ = tokio::time::sleep(Duration::from_millis(50)) => {},
119        }
120
121        tx.send(()).unwrap();
122
123        shutdown.shutdown().await;
124    }
125
126    #[tokio::test]
127    async fn test_shutdown_cancelled_after_shutdown() {
128        let (tx, rx) = oneshot::channel::<()>();
129        let shutdown = Shutdown::new(async {
130            rx.await.unwrap();
131        });
132        let weak_guard = shutdown.guard_weak();
133        tx.send(()).unwrap();
134        shutdown.shutdown().await;
135        weak_guard.cancelled().await;
136    }
137
138    #[tokio::test]
139    async fn test_shutdown_after_delay() {
140        let (tx, rx) = oneshot::channel::<()>();
141        let shutdown = Shutdown::builder()
142            .with_delay(Duration::from_micros(500))
143            .with_signal(async {
144                rx.await.unwrap();
145            })
146            .build();
147        tx.send(()).unwrap();
148        shutdown.shutdown().await;
149    }
150
151    #[tokio::test]
152    async fn test_shutdown_force() {
153        let (tx, rx) = oneshot::channel::<()>();
154        let (overwrite_tx, overwrite_rx) = oneshot::channel::<()>();
155        let shutdown = Shutdown::builder()
156            .with_signal(rx)
157            .with_overwrite_fn(|| overwrite_rx)
158            .build();
159        let _guard = shutdown.guard();
160        tx.send(()).unwrap();
161        overwrite_tx.send(()).unwrap();
162        shutdown.shutdown().await;
163    }
164
165    #[tokio::test]
166    async fn test_shutdown_with_limit_force() {
167        let (tx, rx) = oneshot::channel::<()>();
168        let (overwrite_tx, overwrite_rx) = oneshot::channel::<()>();
169        let shutdown = Shutdown::builder()
170            .with_signal(rx)
171            .with_overwrite_fn(|| overwrite_rx)
172            .build();
173        let _guard = shutdown.guard();
174        tx.send(()).unwrap();
175        overwrite_tx.send(()).unwrap();
176        assert!(shutdown
177            .shutdown_with_limit(Duration::from_secs(60))
178            .await
179            .is_err());
180    }
181
182    #[tokio::test]
183    async fn test_shutdown_with_delay_force() {
184        let (tx, rx) = oneshot::channel::<()>();
185        let (overwrite_tx, overwrite_rx) = oneshot::channel::<()>();
186        let shutdown = Shutdown::builder()
187            .with_delay(Duration::from_micros(500))
188            .with_signal(rx)
189            .with_overwrite_fn(|| overwrite_rx)
190            .build();
191        let _guard = shutdown.guard();
192        tx.send(()).unwrap();
193        overwrite_tx.send(()).unwrap();
194        shutdown.shutdown().await;
195    }
196
197    #[tokio::test]
198    async fn test_shutdown_with_limit_and_delay_force() {
199        let (tx, rx) = oneshot::channel::<()>();
200        let (overwrite_tx, overwrite_rx) = oneshot::channel::<()>();
201        let shutdown = Shutdown::builder()
202            .with_delay(Duration::from_micros(500))
203            .with_signal(rx)
204            .with_overwrite_fn(|| overwrite_rx)
205            .build();
206        let _guard = shutdown.guard();
207        tx.send(()).unwrap();
208        overwrite_tx.send(()).unwrap();
209        assert!(shutdown
210            .shutdown_with_limit(Duration::from_secs(60))
211            .await
212            .is_err());
213    }
214
215    #[tokio::test]
216    async fn test_shutdown_after_delay_check() {
217        let (tx, rx) = oneshot::channel::<()>();
218        let shutdown = Shutdown::builder()
219            .with_delay(Duration::from_secs(5))
220            .with_signal(rx)
221            .build();
222        tx.send(()).unwrap();
223        let result = tokio::time::timeout(Duration::from_micros(100), shutdown.shutdown()).await;
224        assert!(result.is_err(), "{result:?}");
225    }
226
227    #[tokio::test]
228    async fn test_shutdown_cancelled_vs_shutdown_signal_triggered() {
229        let (tx, rx) = oneshot::channel::<()>();
230        let shutdown = Shutdown::builder()
231            .with_delay(Duration::from_secs(5))
232            .with_signal(rx)
233            .build();
234        tx.send(()).unwrap();
235
236        let weak_guard = shutdown.guard_weak();
237
238        // will fail because delay is still being awaited
239        let result = tokio::time::timeout(Duration::from_micros(100), weak_guard.cancelled()).await;
240        assert!(result.is_err(), "{result:?}");
241
242        // this will succeed however, as it does not await the delay
243        let result = tokio::time::timeout(
244            Duration::from_millis(100),
245            weak_guard.shutdown_signal_triggered(),
246        )
247        .await;
248        assert!(result.is_ok(), "{result:?}");
249    }
250
251    #[tokio::test]
252    async fn test_shutdown_nested_guards() {
253        let (tx, rx) = oneshot::channel::<()>();
254        let shutdown = Shutdown::new(async {
255            rx.await.unwrap();
256        });
257        shutdown.spawn_task_fn(|guard| async move {
258            guard.spawn_task_fn(|guard| async move {
259                guard.spawn_task_fn(|guard| async move {
260                    guard.spawn_task(async {
261                        tokio::time::sleep(Duration::from_millis(50)).await;
262                    });
263                });
264            });
265        });
266        tx.send(()).unwrap();
267        shutdown.shutdown().await;
268    }
269
270    #[tokio::test]
271    async fn test_shutdown_sixten_thousand_guards() {
272        let (tx, rx) = oneshot::channel::<()>();
273        let shutdown = Shutdown::new(async {
274            rx.await.unwrap();
275        });
276        for _ in 0..16_000 {
277            shutdown.spawn_task(async {
278                // sleep random between 1 and 80ms
279                let duration = Duration::from_millis(rand::random::<u64>() % 80 + 1);
280                tokio::time::sleep(duration).await;
281            });
282        }
283        tx.send(()).unwrap();
284        shutdown.shutdown().await;
285    }
286}