apalis_core/monitor/
shutdown.rs

1//! A shutdown monitor that can be used to signal shutdown to tasks and workers.
2//!
3//! ## Overview
4//! It provides a future that resolves when a shutdown is initiated, allowing tasks to await shutdown
5//! signals and perform cleanup operations.
6
7use std::{
8    pin::Pin,
9    sync::{
10        Arc, Mutex,
11        atomic::{AtomicBool, Ordering},
12    },
13    task::{Context, Poll, Waker},
14};
15
16use futures_core::Future;
17
18/// A shutdown token that stops execution
19#[derive(Clone, Debug)]
20pub struct Shutdown {
21    inner: Arc<ShutdownCtx>,
22}
23
24impl Shutdown {
25    /// Create a new shutdown handle
26    pub fn new() -> Shutdown {
27        Shutdown {
28            inner: Arc::new(ShutdownCtx::new()),
29        }
30    }
31
32    /// Set the future to await before shutting down
33    pub fn shutdown_after<F: Future>(&self, f: F) -> impl Future<Output = F::Output> + use<F> {
34        let handle = self.clone();
35        async move {
36            let result = f.await;
37            handle.start_shutdown();
38            result
39        }
40    }
41}
42
43impl Default for Shutdown {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49#[derive(Debug)]
50pub(crate) struct ShutdownCtx {
51    state: AtomicBool,
52    waker: Mutex<Option<Waker>>,
53}
54impl ShutdownCtx {
55    fn new() -> ShutdownCtx {
56        Self {
57            state: AtomicBool::default(),
58            waker: Mutex::default(),
59        }
60    }
61    fn shutdown(&self) {
62        self.state.store(true, Ordering::Relaxed);
63        self.wake();
64    }
65
66    fn is_shutting_down(&self) -> bool {
67        self.state.load(Ordering::Relaxed)
68    }
69
70    pub(crate) fn wake(&self) {
71        if let Some(waker) = self.waker.lock().unwrap().take() {
72            waker.wake();
73        }
74    }
75}
76
77impl Shutdown {
78    /// Check if the system is shutting down
79    pub fn is_shutting_down(&self) -> bool {
80        self.inner.is_shutting_down()
81    }
82
83    /// Start the shutdown process
84    pub fn start_shutdown(&self) {
85        self.inner.shutdown()
86    }
87}
88
89impl Future for Shutdown {
90    type Output = ();
91
92    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
93        let ctx = &self.inner;
94        if ctx.state.load(Ordering::Relaxed) {
95            Poll::Ready(())
96        } else {
97            *ctx.waker.lock().unwrap() = Some(cx.waker().clone());
98            Poll::Pending
99        }
100    }
101}