use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::{FutureExt, StreamExt};
use crate::request::{FromRequest, Outcome, Request};
use crate::shutdown::{ShutdownConfig, TripWire};
/// A cloneable handle wrapping a single [`TripWire`].
///
/// The handle can be tripped with [`Shutdown::notify()`], queried with
/// [`Shutdown::notified()`], or awaited directly: it implements `Future` by
/// polling the wrapped wire.
#[must_use = "`Shutdown` does nothing unless polled or `notify`ed"]
#[derive(Clone, Debug)]
pub struct Shutdown {
    /// The trip wire every operation on this handle is forwarded to.
    wire: TripWire,
}
/// The three [`Shutdown`] handles that mark successive shutdown phases.
///
/// `spawn_listener` notifies them in the order `start`, `grace`, `mercy`.
#[derive(Clone, Debug)]
pub struct Stages {
    /// Notified when a shutdown signal is received; awaiting it also lets an
    /// externally triggered shutdown begin the grace period.
    pub start: Shutdown,
    /// Notified once the configured grace period has elapsed.
    pub grace: Shutdown,
    /// Notified once the configured mercy period has elapsed.
    pub mercy: Shutdown,
}
impl Shutdown {
    /// Builds a handle around a freshly created [`TripWire`].
    fn new() -> Self {
        let wire = TripWire::new();
        Self { wire }
    }

    /// Trips the underlying wire.
    #[inline(always)]
    pub fn notify(&self) {
        self.wire.trip();
    }

    /// Reports whether the underlying wire has been tripped.
    #[inline(always)]
    #[must_use]
    pub fn notified(&self) -> bool {
        self.wire.tripped()
    }
}
/// Awaiting a `Shutdown` awaits its underlying trip wire.
impl Future for Shutdown {
    type Output = ();

    /// Forwards the poll to the wrapped wire and returns its result.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `Shutdown` is `Unpin`, so the pin can be unwrapped into a plain
        // mutable reference before reaching the inner wire.
        let this = self.get_mut();
        this.wire.poll_unpin(cx)
    }
}
/// Request guard implementation: a `Shutdown` handle can be obtained from any
/// request.
#[crate::async_trait]
impl<'r> FromRequest<'r> for Shutdown {
    /// This guard never produces an error.
    type Error = std::convert::Infallible;

    /// Always succeeds with the handle returned by `request.rocket().shutdown()`.
    #[inline]
    async fn from_request(request: &'r Request<'_>) -> Outcome<Self, Self::Error> {
        let handle = request.rocket().shutdown();
        Outcome::Success(handle)
    }
}
impl Stages {
    /// Creates the three stage handles, each backed by its own new `Shutdown`.
    pub fn new() -> Self {
        let start = Shutdown::new();
        let grace = Shutdown::new();
        let mercy = Shutdown::new();
        Self { start, grace, mercy }
    }

    /// Spawns a detached Tokio task that drives the shutdown stages.
    ///
    /// The task first waits for either the next item of `config`'s signal
    /// stream or the `start` handle resolving. If a signal arrived first, it
    /// is logged and `start` is notified. The task then sleeps for
    /// `config.grace()`, notifies `grace`, sleeps for `config.mercy()`, and
    /// notifies `mercy`.
    ///
    /// The `JoinHandle` returned by `tokio::spawn` is discarded.
    pub(crate) fn spawn_listener(&self, config: &ShutdownConfig) {
        use futures::future::{select, Either};
        use futures::stream;

        // Both arms end in a never-yielding stream, so `next()` below either
        // produces a signal or stays pending; it does not yield `None`.
        let mut signals = if let Some(incoming) = config.signal_stream() {
            Either::Left(incoming.chain(stream::pending()))
        } else {
            Either::Right(stream::pending())
        };

        // Clone the handles and read the durations up front so the spawned
        // task owns everything it needs.
        let start = self.start.clone();
        let grace = self.grace.clone();
        let grace_duration = config.grace();
        let mercy = self.mercy.clone();
        let mercy_duration = config.mercy();

        tokio::spawn(async move {
            match select(signals.next(), start).await {
                // A signal won the race: log it and begin the shutdown.
                Either::Left((sig, start)) => {
                    warn!("Received {}. Shutdown started.", sig.unwrap());
                    start.notify();
                }
                // `start` resolved first, so there is nothing to notify.
                Either::Right(_) => {}
            }

            tokio::time::sleep(grace_duration).await;
            warn!("Shutdown grace period elapsed. Shutting down I/O.");
            grace.notify();

            tokio::time::sleep(mercy_duration).await;
            warn!("Mercy period elapsed. Terminating I/O.");
            mercy.notify();
        });
    }
}
#[cfg(test)]
mod tests {
    use super::Shutdown;

    /// Compile-time check that `Shutdown` is `Send + Sync + Clone + Unpin`.
    #[test]
    fn ensure_is_send_sync_clone_unpin() {
        // Instantiating this helper only type-checks if every bound holds.
        fn assert_bounds<T>()
        where
            T: Send + Sync + Clone + Unpin,
        {
        }

        assert_bounds::<Shutdown>();
    }
}