use std::{
collections::{BTreeMap, HashSet},
mem,
time::Duration,
};
#[cfg(unix)]
use actix::AsyncContext;
use actix::{
prelude::{Actor, Context},
Addr, Handler, Message, Recipient, ResponseFuture, System,
};
use derive_more::Display;
use failure::Fail;
use futures::{future, stream, FutureExt as _, StreamExt as _};
use tokio::time::timeout;
use crate::log::prelude::*;
/// Priority with which a subscriber is notified about a graceful shutdown.
///
/// Subscribers are grouped by their [`Priority`] and the groups are notified
/// one after another, starting from the greatest value: a subscriber with
/// `Priority(10)` receives its shutdown request before a subscriber with
/// `Priority(1)`.
#[derive(Clone, Copy, Debug, Eq, Ord, PartialOrd, PartialEq)]
pub struct Priority(pub u8);
/// Message delivered to every subscriber once a graceful shutdown has been
/// initiated.
///
/// The shutdown procedure awaits the response to this message (a unit value)
/// from all subscribers of one priority before it moves on to the next
/// priority, so a handler should resolve only after its clean-up is done.
#[derive(Debug, Message)]
#[rtype(result = "()")]
pub struct ShutdownGracefully;
/// Actor that reacts to OS signals by notifying its subscribers with
/// [`ShutdownGracefully`] and then stopping the actix [`System`].
pub struct GracefulShutdown {
/// Recipients of [`ShutdownGracefully`], grouped by their [`Priority`].
///
/// Groups are notified in descending order of the key.
subs: BTreeMap<Priority, HashSet<Recipient<ShutdownGracefully>>>,
/// Upper bound on how long the whole notification phase may take before
/// the [`System`] is stopped regardless of pending subscribers.
timeout: Duration,
/// Current phase of this actor's lifecycle.
state: State,
}
/// Lifecycle phase of the [`GracefulShutdown`] actor.
enum State {
/// No shutdown has been requested yet; new subscriptions are accepted.
Listening,
/// A shutdown has been initiated by an OS signal: further signals are
/// ignored and new subscriptions are rejected with [`ShuttingDownError`].
ShuttingDown,
}
impl GracefulShutdown {
    /// Creates a new [`GracefulShutdown`] actor without any subscribers.
    ///
    /// `timeout` limits how long the actor waits for all subscribers to
    /// finish handling [`ShutdownGracefully`] before stopping the system.
    #[inline]
    #[must_use]
    pub fn new(timeout: Duration) -> Self {
        let state = State::Listening;
        let subs = BTreeMap::new();
        Self {
            subs,
            timeout,
            state,
        }
    }
}
impl Actor for GracefulShutdown {
    type Context = Context<Self>;

    /// On non-UNIX platforms no signal listeners are installed; only a
    /// warning is logged.
    #[cfg(not(unix))]
    fn started(&mut self, _: &mut Self::Context) {
        warn!(
            "Graceful shutdown is disabled: only UNIX signals are supported, \
             and current platform is not UNIX"
        );
    }

    /// Subscribes this actor to the `SIGHUP`, `SIGINT`, `SIGQUIT` and
    /// `SIGTERM` streams, so that each received signal is delivered to it as
    /// an [`OsSignal`] message carrying the signal number.
    ///
    /// A failure to register one signal is logged and does not prevent the
    /// remaining ones from being registered.
    #[cfg(unix)]
    fn started(&mut self, ctx: &mut Self::Context) {
        use tokio::signal::unix::{signal, SignalKind};

        // Signal kinds paired with the number reported in `OsSignal`.
        let watched: Vec<(SignalKind, i32)> = vec![
            (SignalKind::hangup(), 1),
            (SignalKind::interrupt(), 2),
            (SignalKind::quit(), 3),
            (SignalKind::terminate(), 15),
        ];

        for (kind, num) in watched {
            match signal(kind) {
                Ok(sig_stream) => {
                    ctx.add_message_stream(
                        sig_stream.map(move |_| OsSignal(num)),
                    );
                }
                Err(e) => error!("Cannot register OsSignal: {:?}", e),
            }
        }
    }

    /// Logs completion of the shutdown if this actor stops after a shutdown
    /// was initiated.
    fn stopped(&mut self, _: &mut Self::Context) {
        match self.state {
            State::ShuttingDown => {
                info!("Graceful shutdown has been completed");
            }
            State::Listening => {}
        }
    }
}
/// Internal message notifying [`GracefulShutdown`] that an OS signal has been
/// received. The wrapped value is the signal number.
#[derive(Message)]
#[rtype(result = "()")]
struct OsSignal(i32);
impl Handler<OsSignal> for GracefulShutdown {
    type Result = ResponseFuture<()>;

    /// Starts the graceful shutdown procedure on the first received signal.
    ///
    /// Signals arriving while a shutdown is already in progress are only
    /// logged. Otherwise all subscribers are taken out of this actor and
    /// notified with [`ShutdownGracefully`]: groups are processed one at a
    /// time from the highest [`Priority`] to the lowest, while subscribers
    /// inside one group are notified concurrently. The whole notification
    /// phase is bounded by the configured timeout, and the [`System`] is
    /// stopped afterwards whether or not the timeout fired.
    fn handle(&mut self, sig: OsSignal, _: &mut Context<Self>) -> Self::Result {
        info!("OS signal '{}' received", sig.0);

        if let State::ShuttingDown = self.state {
            return future::ready(()).boxed_local();
        }
        self.state = State::ShuttingDown;

        info!("Initiating graceful shutdown...");

        // Nobody to wait for: stop right away.
        if self.subs.is_empty() {
            System::current().stop();
            return future::ready(()).boxed_local();
        }

        let deadline = self.timeout;
        let by_priority = mem::replace(&mut self.subs, BTreeMap::new());

        // One joined future per priority, ordered from highest to lowest.
        let mut batches = Vec::with_capacity(by_priority.len());
        for (_, recipients) in by_priority.into_iter().rev() {
            let requests = recipients.into_iter().map(|recipient| async move {
                if let Err(e) = recipient.send(ShutdownGracefully).await {
                    error!("Error requesting shutdown: {}", e);
                }
            });
            batches.push(future::join_all(requests));
        }

        async move {
            // Batches are awaited strictly one after another.
            let notify_all =
                stream::iter(batches).for_each(|batch| batch.map(drop));

            match timeout(deadline, notify_all).await {
                Ok(_) => {
                    info!("Graceful shutdown succeeded, stopping system");
                }
                Err(_) => {
                    error!("Graceful shutdown has timed out, stopping system");
                }
            }

            System::current().stop()
        }
        .boxed_local()
    }
}
/// Subscription to the graceful shutdown notification.
pub struct Subscriber {
/// [`Priority`] group this subscriber belongs to.
pub priority: Priority,
/// Recipient that the [`ShutdownGracefully`] message is sent to.
pub addr: Recipient<ShutdownGracefully>,
}
/// Message asking [`GracefulShutdown`] to register the wrapped [`Subscriber`].
///
/// Resolves to [`ShuttingDownError`] if a shutdown is already in progress.
#[derive(Message)]
#[rtype(result = "Result<(), ShuttingDownError>")]
struct Subscribe(pub Subscriber);
impl Handler<Subscribe> for GracefulShutdown {
type Result = Result<(), ShuttingDownError>;
fn handle(&mut self, m: Subscribe, _: &mut Context<Self>) -> Self::Result {
if let State::ShuttingDown = self.state {
return Err(ShuttingDownError);
}
let addrs = self.subs.entry(m.0.priority).or_default();
addrs.insert(m.0.addr);
Ok(())
}
}
/// Error returned to a [`Subscribe`] request that arrives after the graceful
/// shutdown has already been initiated.
#[derive(Clone, Copy, Debug, Display, Fail)]
#[display(fmt = "Process is shutting down at the moment")]
pub struct ShuttingDownError;
/// Message asking [`GracefulShutdown`] to forget the wrapped [`Subscriber`].
///
/// The subscriber is looked up by its [`Priority`], so the priority must be
/// the same one that was used for subscribing.
#[derive(Message)]
#[rtype(result = "()")]
struct Unsubscribe(pub Subscriber);
impl Handler<Unsubscribe> for GracefulShutdown {
type Result = ();
fn handle(&mut self, m: Unsubscribe, _: &mut Context<Self>) {
let mut remove = false;
if let Some(addrs) = self.subs.get_mut(&m.0.priority) {
addrs.remove(&m.0.addr);
if addrs.is_empty() {
remove = true;
}
}
if remove {
self.subs.remove(&m.0.priority);
}
}
}
/// Asks the [`GracefulShutdown`] actor behind `shutdown_addr` to notify
/// `subscriber` with [`ShutdownGracefully`] at the given `priority`.
///
/// The request is sent without awaiting its outcome, so the caller does not
/// learn whether the subscription was accepted.
pub fn subscribe(
    shutdown_addr: &Addr<GracefulShutdown>,
    subscriber: Recipient<ShutdownGracefully>,
    priority: Priority,
) {
    let subscription = Subscriber {
        addr: subscriber,
        priority,
    };
    shutdown_addr.do_send(Subscribe(subscription));
}
/// Asks the [`GracefulShutdown`] actor behind `shutdown_addr` to stop
/// notifying `subscriber`, which is looked up within the given `priority`.
///
/// The request is sent without awaiting its outcome.
pub fn unsubscribe(
    shutdown_addr: &Addr<GracefulShutdown>,
    subscriber: Recipient<ShutdownGracefully>,
    priority: Priority,
) {
    let subscription = Subscriber {
        addr: subscriber,
        priority,
    };
    shutdown_addr.do_send(Unsubscribe(subscription));
}