rocket_community/shutdown/
handle.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use futures::{FutureExt, StreamExt};
6
7use crate::request::{FromRequest, Outcome, Request};
8use crate::shutdown::{ShutdownConfig, TripWire};
9
10/// A request guard and future for graceful shutdown.
11///
12/// A server shutdown is manually requested by calling [`Shutdown::notify()`]
13/// or, if enabled, through [automatic triggers] like `Ctrl-C`. Rocket will stop
14/// accepting new requests, finish handling any pending requests, wait a grace
15/// period before cancelling any outstanding I/O, and return `Ok()` to the
16/// caller of [`Rocket::launch()`]. Graceful shutdown is configured via
17/// [`ShutdownConfig`](crate::config::ShutdownConfig).
18///
19/// [`Rocket::launch()`]: crate::Rocket::launch()
20/// [automatic triggers]: crate::shutdown::Shutdown#triggers
21///
22/// # Detecting Shutdown
23///
24/// `Shutdown` is also a future that resolves when [`Shutdown::notify()`] is
25/// called. This can be used to detect shutdown in any part of the application:
26///
27/// ```rust
28/// # extern crate rocket_community as rocket;
29///
30/// # use rocket::*;
31/// use rocket::Shutdown;
32///
33/// #[get("/wait/for/shutdown")]
34/// async fn wait_for_shutdown(shutdown: Shutdown) -> &'static str {
35///     shutdown.await;
36///     "Somewhere, shutdown was requested."
37/// }
38/// ```
39///
40/// See the [`stream`](crate::response::stream#graceful-shutdown) docs for an
41/// example of detecting shutdown in an infinite responder.
42///
43/// Additionally, a completed shutdown request resolves the future returned from
44/// [`Rocket::launch()`](crate::Rocket::launch()):
45///
46/// ```rust,no_run
47/// # #[macro_use] extern crate rocket_community as rocket;
48/// #
49/// use rocket::Shutdown;
50///
51/// #[get("/shutdown")]
52/// fn shutdown(shutdown: Shutdown) -> &'static str {
53///     shutdown.notify();
54///     "Shutting down..."
55/// }
56///
57/// #[rocket::main]
58/// async fn main() {
59///     let result = rocket::build()
60///         .mount("/", routes![shutdown])
61///         .launch()
62///         .await;
63///
64///     // If the server shut down (by visiting `/shutdown`), `result` is `Ok`.
65///     result.expect("server failed unexpectedly");
66/// }
67/// ```
68#[derive(Debug, Clone)]
69#[must_use = "`Shutdown` does nothing unless polled or `notify`ed"]
70pub struct Shutdown {
71    wire: TripWire,
72}
73
74#[derive(Debug, Clone)]
75pub struct Stages {
76    pub start: Shutdown,
77    pub grace: Shutdown,
78    pub mercy: Shutdown,
79}
80
81impl Shutdown {
82    fn new() -> Self {
83        Shutdown {
84            wire: TripWire::new(),
85        }
86    }
87
88    /// Notify the application to shut down gracefully.
89    ///
90    /// This function returns immediately; pending requests will continue to run
91    /// until completion or expiration of the grace period, which ever comes
92    /// first, before the actual shutdown occurs. The grace period can be
93    /// configured via [`ShutdownConfig`]'s `grace` field.
94    ///
95    /// ```rust
96    /// # extern crate rocket_community as rocket;
97    ///
98    /// # use rocket::*;
99    /// use rocket::Shutdown;
100    ///
101    /// #[get("/shutdown")]
102    /// fn shutdown(shutdown: Shutdown) -> &'static str {
103    ///     shutdown.notify();
104    ///     "Shutting down..."
105    /// }
106    /// ```
107    #[inline(always)]
108    pub fn notify(&self) {
109        self.wire.trip();
110    }
111
112    /// Returns `true` if `Shutdown::notify()` has already been called.
113    ///
114    /// # Example
115    ///
116    /// ```rust
117    /// # extern crate rocket_community as rocket;
118    ///
119    /// # use rocket::*;
120    /// use rocket::Shutdown;
121    ///
122    /// #[get("/shutdown")]
123    /// fn shutdown(shutdown: Shutdown) {
124    ///     shutdown.notify();
125    ///     assert!(shutdown.notified());
126    /// }
127    /// ```
128    #[must_use]
129    #[inline(always)]
130    pub fn notified(&self) -> bool {
131        self.wire.tripped()
132    }
133}
134
135impl Future for Shutdown {
136    type Output = ();
137
138    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
139        self.wire.poll_unpin(cx)
140    }
141}
142
143#[crate::async_trait]
144impl<'r> FromRequest<'r> for Shutdown {
145    type Error = std::convert::Infallible;
146
147    #[inline]
148    async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
149        Outcome::Success(request.rocket().shutdown())
150    }
151}
152
153impl Stages {
154    pub fn new() -> Self {
155        Stages {
156            start: Shutdown::new(),
157            grace: Shutdown::new(),
158            mercy: Shutdown::new(),
159        }
160    }
161
162    pub(crate) fn spawn_listener(&self, config: &ShutdownConfig) {
163        use futures::future::{select, Either};
164        use futures::stream;
165
166        let mut signal = match config.signal_stream() {
167            Some(stream) => Either::Left(stream.chain(stream::pending())),
168            None => Either::Right(stream::pending()),
169        };
170
171        let start = self.start.clone();
172        let (grace, grace_duration) = (self.grace.clone(), config.grace());
173        let (mercy, mercy_duration) = (self.mercy.clone(), config.mercy());
174        tokio::spawn(async move {
175            if let Either::Left((sig, start)) = select(signal.next(), start).await {
176                warn!("Received {}. Shutdown started.", sig.unwrap());
177                start.notify();
178            }
179
180            tokio::time::sleep(grace_duration).await;
181            warn!("Shutdown grace period elapsed. Shutting down I/O.");
182            grace.notify();
183
184            tokio::time::sleep(mercy_duration).await;
185            warn!("Mercy period elapsed. Terminating I/O.");
186            mercy.notify();
187        });
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::Shutdown;

    /// Compile-time check that `Shutdown` is `Send`, `Sync`, `Clone`, and
    /// `Unpin`; the test passes simply by compiling.
    #[test]
    fn ensure_is_send_sync_clone_unpin() {
        fn assert_bounds<T>()
        where
            T: Send + Sync + Clone + Unpin,
        {
        }

        assert_bounds::<Shutdown>();
    }
}