apalis_core/monitor/
shutdown.rs

1//! A shutdown monitor that can be used to signal shutdown to tasks and workers.
2//!
3//! ## Overview
4//! It provides a future that resolves when a shutdown is initiated, allowing tasks to await shutdown
5//! signals and perform cleanup operations.
6
7use std::{
8    pin::Pin,
9    sync::{
10        Arc, Mutex,
11        atomic::{AtomicBool, Ordering},
12    },
13    task::{Context, Poll, Waker},
14};
15
16use futures_core::Future;
17
18/// A shutdown token that stops execution
19#[derive(Clone, Debug)]
20pub struct Shutdown {
21    inner: Arc<ShutdownCtx>,
22}
23
24impl Shutdown {
25    /// Create a new shutdown handle
26    #[must_use]
27    pub fn new() -> Self {
28        Self {
29            inner: Arc::new(ShutdownCtx::new()),
30        }
31    }
32
33    /// Set the future to await before shutting down
34    pub fn shutdown_after<F: Future>(&self, f: F) -> impl Future<Output = F::Output> + use<F> {
35        let handle = self.clone();
36        async move {
37            let result = f.await;
38            handle.start_shutdown();
39            result
40        }
41    }
42}
43
44impl Default for Shutdown {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50#[derive(Debug)]
51pub(crate) struct ShutdownCtx {
52    state: AtomicBool,
53    waker: Mutex<Option<Waker>>,
54}
55impl ShutdownCtx {
56    fn new() -> Self {
57        Self {
58            state: AtomicBool::default(),
59            waker: Mutex::default(),
60        }
61    }
62    fn shutdown(&self) {
63        self.state.store(true, Ordering::Relaxed);
64        self.wake();
65    }
66
67    fn is_shutting_down(&self) -> bool {
68        self.state.load(Ordering::Relaxed)
69    }
70
71    pub(crate) fn wake(&self) {
72        if let Some(waker) = self.waker.lock().unwrap().take() {
73            waker.wake();
74        }
75    }
76}
77
78impl Shutdown {
79    /// Check if the system is shutting down
80    #[must_use]
81    pub fn is_shutting_down(&self) -> bool {
82        self.inner.is_shutting_down()
83    }
84
85    /// Start the shutdown process
86    pub fn start_shutdown(&self) {
87        self.inner.shutdown()
88    }
89}
90
91impl Future for Shutdown {
92    type Output = ();
93
94    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
95        let ctx = &self.inner;
96        if ctx.state.load(Ordering::Relaxed) {
97            Poll::Ready(())
98        } else {
99            *ctx.waker.lock().unwrap() = Some(cx.waker().clone());
100            Poll::Pending
101        }
102    }
103}