apalis_core/monitor/
shutdown.rs1use std::{
8 pin::Pin,
9 sync::{
10 Arc, Mutex,
11 atomic::{AtomicBool, Ordering},
12 },
13 task::{Context, Poll, Waker},
14};
15
16use futures_core::Future;
17
18#[derive(Clone, Debug)]
20pub struct Shutdown {
21 inner: Arc<ShutdownCtx>,
22}
23
24impl Shutdown {
25 pub fn new() -> Shutdown {
27 Shutdown {
28 inner: Arc::new(ShutdownCtx::new()),
29 }
30 }
31
32 pub fn shutdown_after<F: Future>(&self, f: F) -> impl Future<Output = F::Output> + use<F> {
34 let handle = self.clone();
35 async move {
36 let result = f.await;
37 handle.start_shutdown();
38 result
39 }
40 }
41}
42
43impl Default for Shutdown {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
/// Shared state behind a shutdown signal: a one-way flag plus a single waker slot.
#[derive(Debug)]
pub(crate) struct ShutdownCtx {
    /// `true` once shutdown has been requested. Nothing in this block ever
    /// resets it to `false`.
    state: AtomicBool,
    /// The waker to notify when shutdown is requested. The slot holds at most
    /// one waker; storing a new one replaces the previous one.
    waker: Mutex<Option<Waker>>,
}

impl ShutdownCtx {
    /// Creates a context with the flag cleared and no waker registered.
    fn new() -> ShutdownCtx {
        Self {
            state: AtomicBool::new(false),
            waker: Mutex::new(None),
        }
    }

    /// Sets the shutdown flag, then wakes the registered waker (if any).
    ///
    /// Calling this more than once is harmless: the flag stays set and later
    /// calls find the slot empty unless a new waker was stored in between.
    fn shutdown(&self) {
        // `Release` pairs with the `Acquire` load in `is_shutting_down`, so a
        // thread that observes the flag also observes everything written
        // before shutdown was requested.
        self.state.store(true, Ordering::Release);
        self.wake();
    }

    /// Returns `true` once [`shutdown`](Self::shutdown) has been called.
    fn is_shutting_down(&self) -> bool {
        self.state.load(Ordering::Acquire)
    }

    /// Takes the registered waker out of the slot and wakes it.
    ///
    /// Does nothing when no waker is registered.
    pub(crate) fn wake(&self) {
        // Take the waker in its own statement so the mutex guard is released
        // *before* `Waker::wake` runs. Waking may synchronously re-enter code
        // that locks this slot again (which would deadlock), and a panicking
        // waker must not poison the lock.
        //
        // A poisoned lock is recovered rather than propagated: the slot is a
        // plain `Option<Waker>` with no invariant a panic could have broken.
        let parked = self
            .waker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .take();

        if let Some(waker) = parked {
            waker.wake();
        }
    }
}
76
77impl Shutdown {
78 pub fn is_shutting_down(&self) -> bool {
80 self.inner.is_shutting_down()
81 }
82
83 pub fn start_shutdown(&self) {
85 self.inner.shutdown()
86 }
87}
88
89impl Future for Shutdown {
90 type Output = ();
91
92 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
93 let ctx = &self.inner;
94 if ctx.state.load(Ordering::Relaxed) {
95 Poll::Ready(())
96 } else {
97 *ctx.waker.lock().unwrap() = Some(cx.waker().clone());
98 Poll::Pending
99 }
100 }
101}