apalis_core/monitor/
shutdown.rs1use std::{
2 pin::Pin,
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc, Mutex,
6 },
7 task::{Context, Poll, Waker},
8};
9
10use futures::Future;
11
12#[derive(Clone, Debug)]
14pub struct Shutdown {
15 inner: Arc<ShutdownCtx>,
16}
17
18impl Shutdown {
19 pub fn new() -> Shutdown {
21 Shutdown {
22 inner: Arc::new(ShutdownCtx::new()),
23 }
24 }
25
26 pub fn shutdown_after<F: Future>(&self, f: F) -> impl Future<Output = F::Output> {
28 let handle = self.clone();
29 async move {
30 let result = f.await;
31 handle.start_shutdown();
32 result
33 }
34 }
35}
36
37impl Default for Shutdown {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
/// State shared by every clone of a `Shutdown` handle.
#[derive(Debug)]
pub(crate) struct ShutdownCtx {
    /// Set to `true` by [`ShutdownCtx::shutdown`].
    state: AtomicBool,
    /// Waker registered by the most recent pending poll, if any.
    waker: Mutex<Option<Waker>>,
}

impl ShutdownCtx {
    /// Creates a context with the flag unset and no waker registered.
    fn new() -> ShutdownCtx {
        Self {
            state: AtomicBool::new(false),
            waker: Mutex::new(None),
        }
    }

    /// Sets the shutdown flag and then wakes the registered waker, if any.
    fn shutdown(&self) {
        // Release: a thread that observes the flag through an acquire load
        // also observes everything written before shutdown was requested.
        self.state.store(true, Ordering::Release);
        self.wake();
    }

    /// Returns `true` once [`ShutdownCtx::shutdown`] has been called.
    fn is_shutting_down(&self) -> bool {
        self.state.load(Ordering::Acquire)
    }

    /// Removes the registered waker (if any) from the slot and wakes it.
    ///
    /// # Panics
    ///
    /// Panics if the waker mutex is poisoned.
    pub(crate) fn wake(&self) {
        // Take the waker in its own statement so the mutex guard is dropped
        // before `Waker::wake` runs. A waker that re-enters this context
        // (for example by polling the future inline) would otherwise try to
        // take the same non-reentrant lock while it is still held.
        let registered = self.waker.lock().unwrap().take();
        if let Some(waker) = registered {
            waker.wake();
        }
    }
}
70
71impl Shutdown {
72 pub fn is_shutting_down(&self) -> bool {
74 self.inner.is_shutting_down()
75 }
76
77 pub fn start_shutdown(&self) {
79 self.inner.shutdown()
80 }
81}
82
83impl Future for Shutdown {
84 type Output = ();
85
86 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
87 let ctx = &self.inner;
88 if ctx.state.load(Ordering::Relaxed) {
89 Poll::Ready(())
90 } else {
91 *ctx.waker.lock().unwrap() = Some(cx.waker().clone());
92 Poll::Pending
93 }
94 }
95}