apalis_core/monitor/
shutdown.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
use std::{
    pin::Pin,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, Mutex,
    },
    task::{Context, Poll, Waker},
};

use futures::Future;

/// A shutdown token that stops execution
#[derive(Clone, Debug)]
pub struct Shutdown {
    inner: Arc<ShutdownCtx>,
}

impl Shutdown {
    /// Create a new shutdown handle
    pub fn new() -> Shutdown {
        Shutdown {
            inner: Arc::new(ShutdownCtx::new()),
        }
    }

    /// Set the future to await before shutting down
    pub fn shutdown_after<F: Future>(&self, f: F) -> impl Future<Output = F::Output> {
        let handle = self.clone();
        async move {
            let result = f.await;
            handle.start_shutdown();
            result
        }
    }
}

impl Default for Shutdown {
    fn default() -> Self {
        Self::new()
    }
}

#[derive(Debug)]
pub(crate) struct ShutdownCtx {
    state: AtomicBool,
    waker: Mutex<Option<Waker>>,
}
impl ShutdownCtx {
    fn new() -> ShutdownCtx {
        Self {
            state: AtomicBool::default(),
            waker: Mutex::default(),
        }
    }
    fn shutdown(&self) {
        self.state.store(true, Ordering::Relaxed);
        self.wake();
    }

    fn is_shutting_down(&self) -> bool {
        self.state.load(Ordering::Relaxed)
    }

    pub(crate) fn wake(&self) {
        if let Some(waker) = self.waker.lock().unwrap().take() {
            waker.wake();
        }
    }
}

impl Shutdown {
    /// Check if the system is shutting down
    pub fn is_shutting_down(&self) -> bool {
        self.inner.is_shutting_down()
    }

    /// Start the shutdown process
    pub fn start_shutdown(&self) {
        self.inner.shutdown()
    }
}

impl Future for Shutdown {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let ctx = &self.inner;
        if ctx.state.load(Ordering::Relaxed) {
            Poll::Ready(())
        } else {
            *ctx.waker.lock().unwrap() = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}