apalis_core/monitor/
shutdown.rs

1use std::{
2    pin::Pin,
3    sync::{
4        atomic::{AtomicBool, Ordering},
5        Arc, Mutex,
6    },
7    task::{Context, Poll, Waker},
8};
9
10use futures::Future;
11
12/// A shutdown token that stops execution
13#[derive(Clone, Debug)]
14pub struct Shutdown {
15    inner: Arc<ShutdownCtx>,
16}
17
18impl Shutdown {
19    /// Create a new shutdown handle
20    pub fn new() -> Shutdown {
21        Shutdown {
22            inner: Arc::new(ShutdownCtx::new()),
23        }
24    }
25
26    /// Set the future to await before shutting down
27    pub fn shutdown_after<F: Future>(&self, f: F) -> impl Future<Output = F::Output> {
28        let handle = self.clone();
29        async move {
30            let result = f.await;
31            handle.start_shutdown();
32            result
33        }
34    }
35}
36
37impl Default for Shutdown {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
/// State shared by every clone of a shutdown token.
///
/// It consists of a "shutting down" flag plus a slot for one parked waker.
/// Because there is only a single slot, parking a new waker replaces the
/// previous one: only the most recently parked task is woken by
/// [`ShutdownCtx::wake`].
#[derive(Debug)]
pub(crate) struct ShutdownCtx {
    /// Set to `true` by [`ShutdownCtx::shutdown`].
    state: AtomicBool,
    /// Waker of the task currently waiting for the shutdown, if any.
    waker: Mutex<Option<Waker>>,
}

impl ShutdownCtx {
    /// Creates a context with the flag cleared and no parked waker.
    fn new() -> ShutdownCtx {
        Self {
            state: AtomicBool::default(),
            waker: Mutex::default(),
        }
    }

    /// Raises the shutdown flag and wakes the parked task, if there is one.
    fn shutdown(&self) {
        // `Release` pairs with the `Acquire` load in `is_shutting_down`, so a
        // thread that observes the flag also observes everything the caller
        // did before requesting the shutdown.
        self.state.store(true, Ordering::Release);
        self.wake();
    }

    /// Returns `true` once [`ShutdownCtx::shutdown`] has been called.
    fn is_shutting_down(&self) -> bool {
        self.state.load(Ordering::Acquire)
    }

    /// Takes the parked waker out of its slot and wakes it.
    ///
    /// The waker is invoked only after the lock has been released: waker code
    /// is arbitrary, and running it under the mutex would deadlock if it
    /// touched this context again, or poison the lock if it panicked.
    pub(crate) fn wake(&self) {
        // The guard is a temporary of this statement, so it is dropped before
        // the waker is invoked below. A poisoned lock is recovered because the
        // slot only holds an `Option<Waker>`, which cannot be left half-updated.
        let parked = self
            .waker
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
            .take();

        if let Some(waker) = parked {
            waker.wake();
        }
    }
}
70
71impl Shutdown {
72    /// Check if the system is shutting down
73    pub fn is_shutting_down(&self) -> bool {
74        self.inner.is_shutting_down()
75    }
76
77    /// Start the shutdown process
78    pub fn start_shutdown(&self) {
79        self.inner.shutdown()
80    }
81}
82
83impl Future for Shutdown {
84    type Output = ();
85
86    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
87        let ctx = &self.inner;
88        if ctx.state.load(Ordering::Relaxed) {
89            Poll::Ready(())
90        } else {
91            *ctx.waker.lock().unwrap() = Some(cx.waker().clone());
92            Poll::Pending
93        }
94    }
95}