apalis_core/monitor/
shutdown.rs1use std::{
8 pin::Pin,
9 sync::{
10 Arc, Mutex,
11 atomic::{AtomicBool, Ordering},
12 },
13 task::{Context, Poll, Waker},
14};
15
16use futures_core::Future;
17
18#[derive(Clone, Debug)]
20pub struct Shutdown {
21 inner: Arc<ShutdownCtx>,
22}
23
24impl Shutdown {
25 #[must_use]
27 pub fn new() -> Self {
28 Self {
29 inner: Arc::new(ShutdownCtx::new()),
30 }
31 }
32
33 pub fn shutdown_after<F: Future>(&self, f: F) -> impl Future<Output = F::Output> + use<F> {
35 let handle = self.clone();
36 async move {
37 let result = f.await;
38 handle.start_shutdown();
39 result
40 }
41 }
42}
43
44impl Default for Shutdown {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49
/// Shared state behind a shutdown handle: the "shutdown requested" flag and
/// the waker of a task that is waiting for it.
#[derive(Debug)]
pub(crate) struct ShutdownCtx {
    /// `true` once shutdown has been requested. Nothing in this type resets it.
    state: AtomicBool,
    /// Waker to notify when shutdown is requested. There is a single slot, so
    /// a newly stored waker replaces the previous one.
    waker: Mutex<Option<Waker>>,
}

impl ShutdownCtx {
    /// Creates a context with the flag cleared and no waker registered.
    fn new() -> Self {
        Self {
            state: AtomicBool::default(),
            waker: Mutex::default(),
        }
    }

    /// Sets the shutdown flag and then wakes the registered waker, if any.
    ///
    /// The flag is stored *before* the waker slot is locked; code that checks
    /// the flag while holding the slot's lock relies on this order.
    ///
    /// # Panics
    ///
    /// Panics if the waker mutex is poisoned.
    fn shutdown(&self) {
        // `Release` pairs with the `Acquire` load in `is_shutting_down`, so a
        // thread that observes the flag also observes everything written
        // before shutdown was requested.
        self.state.store(true, Ordering::Release);
        self.wake();
    }

    /// Returns `true` once [`ShutdownCtx::shutdown`] has been called.
    fn is_shutting_down(&self) -> bool {
        self.state.load(Ordering::Acquire)
    }

    /// Takes the registered waker out of its slot and wakes it.
    ///
    /// Does nothing when no waker is registered.
    ///
    /// # Panics
    ///
    /// Panics if the waker mutex is poisoned.
    pub(crate) fn wake(&self) {
        // Bind the taken waker first so the mutex guard is dropped at the end
        // of this statement. Calling `wake()` with the lock still held would
        // deadlock a waker that re-enters this slot (for example by polling
        // inline) and would poison the mutex if the waker panicked.
        let pending = self.waker.lock().unwrap().take();
        if let Some(waker) = pending {
            waker.wake();
        }
    }
}
77
78impl Shutdown {
79 #[must_use]
81 pub fn is_shutting_down(&self) -> bool {
82 self.inner.is_shutting_down()
83 }
84
85 pub fn start_shutdown(&self) {
87 self.inner.shutdown()
88 }
89}
90
91impl Future for Shutdown {
92 type Output = ();
93
94 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
95 let ctx = &self.inner;
96 if ctx.state.load(Ordering::Relaxed) {
97 Poll::Ready(())
98 } else {
99 *ctx.waker.lock().unwrap() = Some(cx.waker().clone());
100 Poll::Pending
101 }
102 }
103}