nym_task/cancellation/
manager.rs

1// Copyright 2025 - Nym Technologies SA <contact@nymtech.net>
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::ShutdownToken;
5use crate::cancellation::tracker::{Cancelled, ShutdownTracker};
6use crate::spawn::JoinHandle;
7use futures::StreamExt;
8use futures::stream::FuturesUnordered;
9use log::error;
10use std::future::Future;
11use std::mem;
12use std::pin::Pin;
13use std::time::Duration;
14use tracing::info;
15
16#[cfg(not(target_arch = "wasm32"))]
17use tokio::time::sleep;
18
19#[cfg(target_arch = "wasm32")]
20use wasmtimer::tokio::sleep;
21
22#[cfg(unix)]
23use tokio::signal::unix::{SignalKind, signal};
24use tokio::task::JoinSet;
25
/// A top level structure responsible for controlling process shutdown by listening to
/// the underlying registered signals and issuing cancellation to tasks derived from its root cancellation token.
///
/// Tasks are spawned and tracked through the wrapped [ShutdownTracker]; once one of the
/// registered [ShutdownSignals] fires, [run_until_shutdown](Self::run_until_shutdown)
/// cancels the root token and waits (up to `max_shutdown_duration`) for the tracked tasks to exit.
// NOTE(review): the lint exemption is presumably needed because the legacy `TaskManager` is marked as deprecated
#[allow(deprecated)]
pub struct ShutdownManager {
    /// Optional reference to the legacy [TaskManager](crate::TaskManager) to allow easier
    /// transition to the new system.
    /// It is only set via [with_legacy_task_manager](Self::with_legacy_task_manager).
    pub(crate) legacy_task_manager: Option<crate::TaskManager>,

    /// Registered [ShutdownSignals](ShutdownSignals) that will trigger process shutdown if detected.
    pub(crate) shutdown_signals: ShutdownSignals,

    /// Combined [TaskTracker](tokio_util::task::TaskTracker) and [ShutdownToken](ShutdownToken)
    /// for spawning and tracking tasks associated with this ShutdownManager.
    pub(crate) tracker: ShutdownTracker,

    /// The maximum shutdown duration when tracked tasks could gracefully exit
    /// before forcing the shutdown.
    /// Set to 10 seconds by the regular constructors; can be changed with
    /// [with_shutdown_duration](Self::with_shutdown_duration).
    pub(crate) max_shutdown_duration: Duration,
}
45
/// Wrapper behind futures that upon completion will trigger binary shutdown.
///
/// Every registered signal is a task living inside the wrapped [JoinSet];
/// the [Default] instance contains no signals at all.
#[derive(Default)]
pub struct ShutdownSignals(JoinSet<()>);
49
impl ShutdownSignals {
    /// Wait for any of the registered signals to be ready
    ///
    /// The outcome of the finished signal task is discarded - only the fact that
    /// one of them has completed matters.
    /// Note: per tokio's `JoinSet::join_next` semantics, this resolves immediately
    /// if no signals are registered.
    pub async fn wait_for_signal(&mut self) {
        self.0.join_next().await;
    }
}
56
// note: default implementation will ONLY listen for SIGINT and will ignore SIGTERM and SIGQUIT.
// This is because registering those signals is fallible (it returns `std::io::Result`),
// which cannot be expressed through the infallible `Default::default`.
// Use `ShutdownManager::build_new_default` to get all the default signals.
#[cfg(not(target_arch = "wasm32"))]
impl Default for ShutdownManager {
    /// Create a ShutdownManager that listens for SIGINT (ctrl-c)
    /// and cancels the root token whenever a panic occurs.
    ///
    /// Be aware that this replaces the process-wide panic hook
    /// (see [with_cancel_on_panic](ShutdownManager::with_cancel_on_panic)).
    fn default() -> Self {
        ShutdownManager::new_without_signals()
            .with_interrupt_signal()
            .with_cancel_on_panic()
    }
}
67
68#[cfg(not(target_arch = "wasm32"))]
69impl ShutdownManager {
70    /// Create new instance of ShutdownManager with the most sensible defaults, so that:
71    /// - shutdown will be triggered upon either SIGINT, SIGTERM (unix only) or SIGQUIT (unix only)  being sent
72    /// - shutdown will be triggered upon any task panicking
73    pub fn build_new_default() -> std::io::Result<Self> {
74        Ok(ShutdownManager::new_without_signals()
75            .with_default_shutdown_signals()?
76            .with_cancel_on_panic())
77    }
78
79    /// Register a new shutdown signal that upon completion will trigger system shutdown.
80    #[must_use]
81    #[track_caller]
82    pub fn with_shutdown<F>(mut self, shutdown: F) -> Self
83    where
84        F: Future<Output = ()>,
85        F: Send + 'static,
86    {
87        let shutdown_token = self.tracker.clone_shutdown_token();
88        self.shutdown_signals.0.spawn(async move {
89            shutdown.await;
90
91            info!("sending cancellation after receiving shutdown signal");
92            shutdown_token.cancel();
93        });
94        self
95    }
96
97    /// Include support for the legacy [TaskManager](TaskManager) to this instance of the ShutdownManager.
98    /// This will allow issuing [TaskClient](TaskClient) for tasks that still require them.
99    #[allow(deprecated)]
100    pub fn with_legacy_task_manager(mut self) -> Self {
101        let mut legacy_manager = crate::TaskManager::default().named("legacy-task-manager");
102        let mut legacy_error_rx = legacy_manager.task_return_error_rx();
103        let mut legacy_drop_rx = legacy_manager.task_drop_rx();
104
105        self.legacy_task_manager = Some(legacy_manager);
106
107        // add a task that listens for legacy task clients being dropped to trigger cancellation
108        self.with_shutdown(async move {
109            tokio::select! {
110                _ = legacy_error_rx.recv() => (),
111                _ = legacy_drop_rx.recv() => (),
112            }
113
114            info!("received legacy shutdown signal");
115        })
116    }
117
118    /// Add the specified signal to the currently registered shutdown signals that will trigger
119    /// cancellation of all registered tasks.
120    #[cfg(unix)]
121    #[track_caller]
122    pub fn with_shutdown_signal(self, signal_kind: SignalKind) -> std::io::Result<Self> {
123        let mut sig = signal(signal_kind)?;
124        Ok(self.with_shutdown(async move {
125            sig.recv().await;
126        }))
127    }
128
    /// Add the SIGTERM signal to the currently registered shutdown signals that will trigger
    /// cancellation of all registered tasks.
    ///
    /// # Errors
    ///
    /// Returns an error if the SIGTERM listener could not be registered.
    #[cfg(unix)]
    #[track_caller]
    pub fn with_terminate_signal(self) -> std::io::Result<Self> {
        self.with_shutdown_signal(SignalKind::terminate())
    }
136
    /// Add the SIGQUIT signal to the currently registered shutdown signals that will trigger
    /// cancellation of all registered tasks.
    ///
    /// # Errors
    ///
    /// Returns an error if the SIGQUIT listener could not be registered.
    #[cfg(unix)]
    #[track_caller]
    pub fn with_quit_signal(self) -> std::io::Result<Self> {
        self.with_shutdown_signal(SignalKind::quit())
    }
144
    /// Add default signals to the set of the currently registered shutdown signals that will trigger
    /// cancellation of all registered tasks.
    /// This includes SIGINT, SIGTERM and SIGQUIT for unix-based platforms and SIGINT for other targets (such as windows).
    ///
    /// # Errors
    ///
    /// On unix, returns an error if either the SIGTERM or the SIGQUIT listener could not be registered.
    /// On other targets this method always succeeds.
    pub fn with_default_shutdown_signals(self) -> std::io::Result<Self> {
        cfg_if::cfg_if! {
            if #[cfg(unix)] {
                self.with_interrupt_signal()
                    .with_terminate_signal()?
                    .with_quit_signal()
            } else {
                Ok(self.with_interrupt_signal())
            }
        }
    }
159
160    /// Add the SIGINT (ctrl-c) signal to the currently registered shutdown signals that will trigger
161    /// cancellation of all registered tasks.
162    #[track_caller]
163    pub fn with_interrupt_signal(self) -> Self {
164        self.with_shutdown(async move {
165            let _ = tokio::signal::ctrl_c().await;
166        })
167    }
168
    /// Spawn the provided future on the current Tokio runtime, and track it in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// This is a convenience wrapper that delegates to the underlying [ShutdownTracker].
    /// The task is NOT automatically cancelled on shutdown - use
    /// [spawn_with_shutdown](Self::spawn_with_shutdown) if that is desired.
    #[track_caller]
    pub fn spawn<F>(&self, task: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.tracker.spawn(task)
    }
178
    /// Spawn the provided future on the current Tokio runtime,
    /// and track it in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    /// Furthermore, attach a name to the spawned task to more easily track it within a [tokio console](https://github.com/tokio-rs/console)
    ///
    /// Note that this is no different from [spawn](Self::spawn) if the underlying binary
    /// has not been built with `RUSTFLAGS="--cfg tokio_unstable"` and `--features="tokio-tracing"`
    ///
    /// This is a convenience wrapper that delegates to the underlying [ShutdownTracker].
    #[track_caller]
    pub fn try_spawn_named<F>(&self, task: F, name: &str) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.tracker.try_spawn_named(task, name)
    }
193
    /// Spawn the provided future on the provided Tokio runtime,
    /// and track it in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// This is a convenience wrapper that delegates to the underlying [ShutdownTracker];
    /// `handle` identifies the runtime the task should be spawned on.
    #[track_caller]
    pub fn spawn_on<F>(&self, task: F, handle: &tokio::runtime::Handle) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.tracker.spawn_on(task, handle)
    }
204
    /// Spawn the provided future on the current [LocalSet](tokio::task::LocalSet),
    /// and track it in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// Unlike [spawn](Self::spawn), neither the future nor its output is required to be `Send`.
    /// This is a convenience wrapper that delegates to the underlying [ShutdownTracker].
    #[track_caller]
    pub fn spawn_local<F>(&self, task: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
        F::Output: 'static,
    {
        self.tracker.spawn_local(task)
    }
215
    /// Spawn the provided blocking task on the current Tokio runtime,
    /// and track it in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// The task is a regular closure rather than a future.
    /// This is a convenience wrapper that delegates to the underlying [ShutdownTracker].
    #[track_caller]
    pub fn spawn_blocking<F, T>(&self, task: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,
    {
        self.tracker.spawn_blocking(task)
    }
227
    /// Spawn the provided blocking task on the provided Tokio runtime,
    /// and track it in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// The task is a regular closure rather than a future and `handle` identifies
    /// the runtime it should be spawned on.
    /// This is a convenience wrapper that delegates to the underlying [ShutdownTracker].
    #[track_caller]
    pub fn spawn_blocking_on<F, T>(&self, task: F, handle: &tokio::runtime::Handle) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,
    {
        self.tracker.spawn_blocking_on(task, handle)
    }
239
    /// Spawn the provided future on the current Tokio runtime
    /// that will get cancelled once a global shutdown signal is detected,
    /// and track it in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// Note that to fully use the naming feature, such as tracking within a [tokio console](https://github.com/tokio-rs/console),
    /// the underlying binary has to be built with `RUSTFLAGS="--cfg tokio_unstable"` and `--features="tokio-tracing"`
    ///
    /// The returned handle resolves to a `Result` whose error type is [Cancelled]
    /// - presumably returned when the task got cancelled before running to completion (TODO confirm against the tracker).
    #[track_caller]
    pub fn try_spawn_named_with_shutdown<F>(
        &self,
        task: F,
        name: &str,
    ) -> JoinHandle<Result<F::Output, Cancelled>>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.tracker.try_spawn_named_with_shutdown(task, name)
    }
258
    /// Spawn the provided future on the current Tokio runtime
    /// that will get cancelled once a global shutdown signal is detected,
    /// and track it in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// The returned handle resolves to a `Result` whose error type is [Cancelled]
    /// - presumably returned when the task got cancelled before running to completion (TODO confirm against the tracker).
    #[track_caller]
    pub fn spawn_with_shutdown<F>(&self, task: F) -> JoinHandle<Result<F::Output, Cancelled>>
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        self.tracker.spawn_with_shutdown(task)
    }
270}
271
272#[cfg(target_arch = "wasm32")]
273impl ShutdownManager {
    /// Run the provided future on the current thread, and track it in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// wasm32 variant: the future is not required to be `Send`.
    /// This is a convenience wrapper that delegates to the underlying [ShutdownTracker].
    #[track_caller]
    pub fn spawn<F>(&self, task: F) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        self.tracker.spawn(task)
    }
282
    /// Run the provided future on the current thread, and track it in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    /// It has exactly the same behaviour as [spawn](Self::spawn) and it only exists to provide
    /// the same interface as non-wasm32 targets.
    ///
    /// wasm32 variant: the future is not required to be `Send`.
    #[track_caller]
    pub fn try_spawn_named<F>(&self, task: F, name: &str) -> JoinHandle<F::Output>
    where
        F: Future + 'static,
    {
        self.tracker.try_spawn_named(task, name)
    }
293
    /// Run the provided future on the current thread
    /// that will get cancelled once a global shutdown signal is detected,
    /// and track it in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    /// It has exactly the same behaviour as [spawn_with_shutdown](Self::spawn_with_shutdown) and it only exists to provide
    /// the same interface as non-wasm32 targets.
    ///
    /// Note that, unlike [spawn](Self::spawn) on this target, the future is required
    /// to be `Send` and to resolve to `()`.
    #[track_caller]
    pub fn try_spawn_named_with_shutdown<F>(
        &self,
        task: F,
        name: &str,
    ) -> JoinHandle<Result<F::Output, Cancelled>>
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.tracker.try_spawn_named_with_shutdown(task, name)
    }
310
    /// Run the provided future on the current thread
    /// that will get cancelled once a global shutdown signal is detected,
    /// and track it in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// Note that, unlike [spawn](Self::spawn) on this target, the future is required
    /// to be `Send` and to resolve to `()`.
    #[track_caller]
    pub fn spawn_with_shutdown<F>(&self, task: F) -> JoinHandle<Result<F::Output, Cancelled>>
    where
        F: Future<Output = ()> + Send + 'static,
    {
        self.tracker.spawn_with_shutdown(task)
    }
321}
322
323impl ShutdownManager {
    /// Create new instance of ShutdownManager without any external shutdown signals registered,
    /// meaning it will only attempt to wait for all tasks spawned on its tracker to gracefully finish execution.
    ///
    /// A fresh root [ShutdownToken] is created for the instance. Cancelling that token
    /// (or any of its clones) is still going to initiate the shutdown.
    pub fn new_without_signals() -> Self {
        Self::new_from_external_shutdown_token(ShutdownToken::new())
    }
329
    /// Create new instance of the ShutdownManager using an external shutdown token.
    ///
    /// Note: it will not listen to any external shutdown signals!
    /// You might want to further customise it with [shutdown signals](Self::with_shutdown)
    /// (or just use [the default set](Self::with_default_shutdown_signals)).
    /// Similarly, you might want to include [cancellation on panic](Self::with_cancel_on_panic)
    /// to make sure everything gets cancelled if one of the tasks panics.
    ///
    /// The maximum shutdown duration is initially set to 10 seconds and no legacy task manager is attached.
    pub fn new_from_external_shutdown_token(shutdown_token: ShutdownToken) -> Self {
        let manager = ShutdownManager {
            legacy_task_manager: None,
            shutdown_signals: Default::default(),
            tracker: ShutdownTracker::new_from_external_shutdown_token(shutdown_token),
            max_shutdown_duration: Duration::from_secs(10),
        };

        // we need to add an explicit watcher for the cancellation token being cancelled
        // so that we could cancel all legacy tasks
        // (the watcher is registered as yet another shutdown signal, so cancelling the token
        // from anywhere wakes up whoever is waiting for the shutdown signals.
        // this is skipped on wasm32, where `with_shutdown` is not available)
        cfg_if::cfg_if! {if #[cfg(not(target_arch = "wasm32"))] {
            let cancel_watcher = manager.tracker.clone_shutdown_token();
            manager.with_shutdown(async move { cancel_watcher.cancelled().await })
        } else {
            manager
        }}
    }
354
355    /// Create an empty testing mock of the ShutdownManager with no signals registered.
356    pub fn empty_mock() -> Self {
357        ShutdownManager {
358            legacy_task_manager: None,
359            shutdown_signals: Default::default(),
360            tracker: Default::default(),
361            max_shutdown_duration: Default::default(),
362        }
363    }
364
365    /// Add additional panic hook such that upon triggering, the root [ShutdownToken](ShutdownToken) gets cancelled.
366    /// Note: an unfortunate limitation of this is that graceful shutdown will no longer be possible
367    /// since that task that has panicked will not exit and thus all shutdowns will have to be either forced
368    /// or will have to time out.
369    #[must_use]
370    pub fn with_cancel_on_panic(self) -> Self {
371        let current_hook = std::panic::take_hook();
372
373        let shutdown_token = self.clone_shutdown_token();
374        std::panic::set_hook(Box::new(move |panic_info| {
375            // 1. call existing hook
376            current_hook(panic_info);
377
378            let location = panic_info
379                .location()
380                .map(|l| l.to_string())
381                .unwrap_or_else(|| "<unknown>".to_string());
382
383            let payload = if let Some(payload) = panic_info.payload().downcast_ref::<&str>() {
384                payload
385            } else {
386                ""
387            };
388
389            // 2. issue cancellation
390            error!("panicked at {location}: {payload}. issuing global cancellation");
391            shutdown_token.cancel();
392        }));
393        self
394    }
395
    /// Change the maximum shutdown duration when tracked tasks could gracefully exit
    /// before forcing the shutdown.
    ///
    /// This overrides the value set by the constructor.
    #[must_use]
    pub fn with_shutdown_duration(mut self, duration: Duration) -> Self {
        self.max_shutdown_duration = duration;
        self
    }
403
    /// Returns true if the root [ShutdownToken](ShutdownToken) has been cancelled,
    /// i.e. the shutdown has already been requested.
    pub fn is_cancelled(&self) -> bool {
        self.tracker.root_cancellation_token.is_cancelled()
    }
408
    /// Get a reference to the used [ShutdownTracker](ShutdownTracker)
    ///
    /// Use [shutdown_tracker_owned](Self::shutdown_tracker_owned) if an owned instance is required.
    pub fn shutdown_tracker(&self) -> &ShutdownTracker {
        &self.tracker
    }
413
    /// Get a cloned instance of the used [ShutdownTracker](ShutdownTracker)
    ///
    /// Useful whenever the tracker has to be moved into a task or another structure.
    pub fn shutdown_tracker_owned(&self) -> ShutdownTracker {
        self.tracker.clone()
    }
418
    /// Waits until the underlying [TaskTracker](tokio_util::task::TaskTracker) is both closed and empty.
    ///
    /// If the underlying [TaskTracker](tokio_util::task::TaskTracker) is already closed and empty when this method is called, then it
    /// returns immediately.
    ///
    /// Note that the tracker has to be explicitly closed (see [close_tracker](Self::close_tracker))
    /// for this method to ever complete.
    pub async fn wait_for_tracker(&self) {
        self.tracker.wait_for_tracker().await;
    }
426
    /// Close the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// This allows [`wait_for_tracker`] futures to complete. It does not prevent you from spawning new tasks.
    ///
    /// Returns `true` if this closed the underlying [TaskTracker](tokio_util::task::TaskTracker), or `false` if it was already closed.
    ///
    /// This is called automatically at the beginning of [run_until_shutdown](Self::run_until_shutdown).
    ///
    /// [`wait_for_tracker`]: ShutdownTracker::wait_for_tracker
    pub fn close_tracker(&self) -> bool {
        self.tracker.close_tracker()
    }
437
    /// Reopen the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// This prevents [`wait_for_tracker`] futures from completing even if the underlying [TaskTracker](tokio_util::task::TaskTracker) is empty.
    ///
    /// Returns `true` if this reopened the underlying [TaskTracker](tokio_util::task::TaskTracker), or `false` if it was already open.
    ///
    /// This is the inverse of [close_tracker](Self::close_tracker).
    ///
    /// [`wait_for_tracker`]: ShutdownTracker::wait_for_tracker
    pub fn reopen_tracker(&self) -> bool {
        self.tracker.reopen_tracker()
    }
448
    /// Returns `true` if the underlying [TaskTracker](tokio_util::task::TaskTracker) is [closed](Self::close_tracker).
    ///
    /// A closed tracker can be opened again with [reopen_tracker](Self::reopen_tracker).
    pub fn is_tracker_closed(&self) -> bool {
        self.tracker.is_tracker_closed()
    }
453
    /// Returns the number of tasks tracked by the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// The value is delegated to the underlying [ShutdownTracker].
    pub fn tracked_tasks(&self) -> usize {
        self.tracker.tracked_tasks()
    }
458
    /// Returns `true` if there are no tasks in the underlying [TaskTracker](tokio_util::task::TaskTracker).
    ///
    /// The value is delegated to the underlying [ShutdownTracker].
    pub fn is_tracker_empty(&self) -> bool {
        self.tracker.is_tracker_empty()
    }
463
    /// Obtain a [ShutdownToken](crate::cancellation::ShutdownToken) that is a child of the root token
    ///
    /// NOTE(review): presumably this mirrors the semantics of tokio_util's `CancellationToken::child_token`,
    /// i.e. the child gets cancelled alongside the root, but cancelling the child
    /// does not affect the root - confirm against the `ShutdownToken` implementation.
    pub fn child_shutdown_token(&self) -> ShutdownToken {
        self.tracker.root_cancellation_token.child_token()
    }
468
    /// Obtain a [ShutdownToken](crate::cancellation::ShutdownToken) on the same hierarchical structure as the root token
    ///
    /// Cancelling the returned token is equivalent to cancelling the root token itself
    /// and thus it is going to initiate the shutdown.
    pub fn clone_shutdown_token(&self) -> ShutdownToken {
        self.tracker.root_cancellation_token.clone()
    }
473
    /// Attempt to create a handle to a legacy [TaskClient] to support tasks that haven't migrated
    /// from the legacy [TaskManager].
    /// Note. To use this method [ShutdownManager] must be built with `.with_legacy_task_manager()`
    ///
    /// `child_suffix` is passed to the legacy manager to name the created client.
    ///
    /// # Panics
    ///
    /// Panics if the legacy task manager has not been enabled on this instance.
    #[must_use]
    #[deprecated]
    #[allow(deprecated)]
    pub fn subscribe_legacy<S: Into<String>>(&self, child_suffix: S) -> crate::TaskClient {
        // alternatively we could have set self.legacy_task_manager = Some(TaskManager::default());
        // on demand if it wasn't available, but then we'd have to use mutable reference
        #[allow(clippy::expect_used)]
        self.legacy_task_manager
            .as_ref()
            .expect("did not enable legacy shutdown support")
            .subscribe_named(child_suffix)
    }
489
    /// Finalise the shutdown procedure by waiting until either:
    /// - all tracked tasks have terminated
    /// - timeout has been reached
    /// - shutdown has been forced (by sending SIGINT)
    ///
    /// Whichever of those conditions is fulfilled first ends the wait.
    /// The futures for the remaining ones are dropped once this method returns.
    async fn finish_shutdown(&mut self) {
        // the competing conditions are boxed so that futures of different types could be put in the same set
        let mut wait_futures = FuturesUnordered::<Pin<Box<dyn Future<Output = ()> + Send>>>::new();

        // force shutdown via ctrl-c
        wait_futures.push(Box::pin(async move {
            #[cfg(not(target_arch = "wasm32"))]
            let interrupt_future = tokio::signal::ctrl_c();

            // there are no signals to listen for on wasm32, so this condition never resolves there
            #[cfg(target_arch = "wasm32")]
            let interrupt_future = futures::future::pending::<()>();

            let _ = interrupt_future.await;
            info!("received interrupt - forcing shutdown");
        }));

        // timeout
        // (the duration is copied out so that the future does not have to borrow `self`)
        let max_shutdown = self.max_shutdown_duration;
        wait_futures.push(Box::pin(async move {
            sleep(max_shutdown).await;
            info!("timeout reached - forcing shutdown");
        }));

        // graceful
        // first wait for the tasks on the tracker and only then for the legacy ones (if enabled)
        let tracker = self.tracker.clone();
        wait_futures.push(Box::pin(async move {
            tracker.wait_for_tracker().await;
            info!("all tracked tasks successfully shutdown");
            if let Some(legacy) = self.legacy_task_manager.as_mut() {
                legacy.wait_for_graceful_shutdown().await;
                info!("all legacy tasks successfully shutdown");
            }

            info!("all registered tasks successfully shutdown")
        }));

        // resolve as soon as the first of the above futures completes
        wait_futures.next().await;
    }
531
    /// Remove the current set of [ShutdownSignals] from this instance of
    /// [ShutdownManager] replacing it with an empty set.
    ///
    /// This is potentially useful if one wishes to start listening for the signals
    /// before the whole process has been fully set up.
    ///
    /// The detached set can be put back with [replace_shutdown_signals](Self::replace_shutdown_signals).
    pub fn detach_shutdown_signals(&mut self) -> ShutdownSignals {
        mem::take(&mut self.shutdown_signals)
    }
540
    /// Replace the current set of [ShutdownSignals] used for determining
    /// whether the underlying process should be stopped.
    ///
    /// The previously used set is dropped.
    pub fn replace_shutdown_signals(&mut self, signals: ShutdownSignals) {
        self.shutdown_signals = signals;
    }
546
547    /// Send cancellation signal to all registered tasks by cancelling the root token
548    /// and sending shutdown signal, if applicable, on the legacy [TaskManager]
549    pub fn send_cancellation(&self) {
550        if let Some(legacy_manager) = self.legacy_task_manager.as_ref() {
551            info!("attempting to shutdown legacy tasks");
552            let _ = legacy_manager.signal_shutdown();
553        }
554        self.tracker.root_cancellation_token.cancel();
555    }
556
557    /// Wait until receiving one of the registered shutdown signals
558    /// this method is cancellation safe
559    pub async fn wait_for_shutdown_signal(&mut self) {
560        #[cfg(not(target_arch = "wasm32"))]
561        self.shutdown_signals.0.join_next().await;
562
563        #[cfg(target_arch = "wasm32")]
564        self.tracker.root_cancellation_token.cancelled().await;
565    }
566
    /// Perform system shutdown by sending relevant signals and waiting until either:
    /// - all tracked tasks have terminated
    /// - timeout has been reached
    /// - shutdown has been forced (by sending SIGINT)
    ///
    /// Unlike [run_until_shutdown](Self::run_until_shutdown), this does not wait for
    /// any shutdown signal and it does not close the tracker.
    pub async fn perform_shutdown(&mut self) {
        self.send_cancellation();

        info!("waiting for tasks to finish... (press ctrl-c to force)");
        self.finish_shutdown().await;
    }
577
    /// Wait until a shutdown signal has been received and trigger system shutdown.
    ///
    /// The tracker is closed upfront, so that waiting for it could complete
    /// once all tracked tasks have finished.
    pub async fn run_until_shutdown(&mut self) {
        self.close_tracker();
        self.wait_for_shutdown_signal().await;

        self.perform_shutdown().await;
    }
585}
586
587#[cfg(test)]
588mod tests {
589    use super::*;
590    use nym_test_utils::traits::{ElapsedExt, Timeboxed};
591    use std::sync::Arc;
592    use std::sync::atomic::AtomicBool;
593
    #[tokio::test]
    async fn shutdown_with_no_tracked_tasks_and_signals() -> anyhow::Result<()> {
        // nothing ever triggers the shutdown, so the manager should keep on running
        let mut manager = ShutdownManager::new_without_signals();
        let res = manager.run_until_shutdown().timeboxed().await;
        assert!(res.has_elapsed());

        // the root token is cancelled upfront, which should get picked up
        // by the watcher registered in the constructor
        let mut manager = ShutdownManager::new_without_signals();
        let shutdown = manager.clone_shutdown_token();
        shutdown.cancel();
        let res = manager.run_until_shutdown().timeboxed().await;
        assert!(!res.has_elapsed());

        Ok(())
    }
608
    #[tokio::test]
    async fn shutdown_signal() -> anyhow::Result<()> {
        // use a short sleep as a stand-in for an actual shutdown signal
        let timeout_shutdown = sleep(Duration::from_millis(100));
        let mut manager = ShutdownManager::new_without_signals().with_shutdown(timeout_shutdown);

        // execution finishes once the sleep finishes
        let res = manager
            .run_until_shutdown()
            .execute_with_deadline(Duration::from_millis(200))
            .await;
        assert!(!res.has_elapsed());

        Ok(())
    }
623
    #[tokio::test]
    async fn panic_hook() -> anyhow::Result<()> {
        // note: this installs a process-wide panic hook
        let mut manager = ShutdownManager::new_without_signals().with_cancel_on_panic();
        // a long-running task that is expected to get cancelled on shutdown
        manager.spawn_with_shutdown(async move {
            sleep(Duration::from_millis(10000)).await;
        });
        // a task that panics shortly after getting started
        manager.spawn_with_shutdown(async move {
            sleep(Duration::from_millis(10)).await;
            panic!("panicking");
        });

        // execution finishes after the panic gets triggered
        let res = manager
            .run_until_shutdown()
            .execute_with_deadline(Duration::from_millis(200))
            .await;
        assert!(!res.has_elapsed());

        Ok(())
    }
644
    #[tokio::test]
    async fn task_cancellation() -> anyhow::Result<()> {
        // use a short sleep as a stand-in for an actual shutdown signal
        let timeout_shutdown = sleep(Duration::from_millis(100));
        let mut manager = ShutdownManager::new_without_signals().with_shutdown(timeout_shutdown);

        // flags set by the tasks once they observe the cancellation
        let cancelled1 = Arc::new(AtomicBool::new(false));
        let cancelled1_clone = cancelled1.clone();
        let cancelled2 = Arc::new(AtomicBool::new(false));
        let cancelled2_clone = cancelled2.clone();

        let shutdown = manager.clone_shutdown_token();
        manager.spawn(async move {
            shutdown.cancelled().await;
            cancelled1_clone.store(true, std::sync::atomic::Ordering::Relaxed);
        });

        let shutdown = manager.clone_shutdown_token();
        manager.spawn(async move {
            shutdown.cancelled().await;
            cancelled2_clone.store(true, std::sync::atomic::Ordering::Relaxed);
        });

        let res = manager
            .run_until_shutdown()
            .execute_with_deadline(Duration::from_millis(200))
            .await;

        // the shutdown has completed in time and both tasks have seen the cancellation
        assert!(!res.has_elapsed());
        assert!(cancelled1.load(std::sync::atomic::Ordering::Relaxed));
        assert!(cancelled2.load(std::sync::atomic::Ordering::Relaxed));
        Ok(())
    }
677
    #[tokio::test]
    async fn cancellation_within_task() -> anyhow::Result<()> {
        // no external signals - the shutdown is going to be triggered by one of the tasks
        let mut manager = ShutdownManager::new_without_signals();

        // flag set by the first task once it observes the cancellation
        let cancelled1 = Arc::new(AtomicBool::new(false));
        let cancelled1_clone = cancelled1.clone();

        let shutdown = manager.clone_shutdown_token();
        manager.spawn(async move {
            shutdown.cancelled().await;
            cancelled1_clone.store(true, std::sync::atomic::Ordering::Relaxed);
        });

        // the second task cancels the (cloned) root token by itself
        let shutdown = manager.clone_shutdown_token();
        manager.spawn(async move {
            sleep(Duration::from_millis(10)).await;
            shutdown.cancel();
        });

        let res = manager
            .run_until_shutdown()
            .execute_with_deadline(Duration::from_millis(200))
            .await;

        assert!(!res.has_elapsed());
        assert!(cancelled1.load(std::sync::atomic::Ordering::Relaxed));
        Ok(())
    }
706
    #[tokio::test]
    async fn shutdown_timeout() -> anyhow::Result<()> {
        // case 1: the maximum shutdown duration (1000ms) exceeds the test deadline (200ms),
        // so the manager is still waiting for the misbehaving task when the deadline is reached
        let timeout_shutdown = sleep(Duration::from_millis(50));
        let mut manager = ShutdownManager::new_without_signals()
            .with_shutdown(timeout_shutdown)
            .with_shutdown_duration(Duration::from_millis(1000));

        // ignore shutdown signals
        manager.spawn(async move {
            sleep(Duration::from_millis(1000)).await;
        });

        let res = manager
            .run_until_shutdown()
            .execute_with_deadline(Duration::from_millis(200))
            .await;

        assert!(res.has_elapsed());

        // case 2: the shutdown signal (50ms) followed by the maximum shutdown duration (100ms)
        // fits within the test deadline (200ms), so the shutdown gets forced in time
        let timeout_shutdown = sleep(Duration::from_millis(50));
        let mut manager = ShutdownManager::new_without_signals()
            .with_shutdown(timeout_shutdown)
            .with_shutdown_duration(Duration::from_millis(100));

        // ignore shutdown signals
        manager.spawn(async move {
            sleep(Duration::from_millis(1000)).await;
        });

        let res = manager
            .run_until_shutdown()
            .execute_with_deadline(Duration::from_millis(200))
            .await;

        assert!(!res.has_elapsed());
        Ok(())
    }
744}