extern crate tokio_signal;
#[cfg(unix)]
use self::tokio_signal::unix;
use futures::{Future, Stream};
#[cfg(unix)]
use libc;
use std;
use prelude::*;
/// Kinds of process signal reported by `ProcessSignals`.
///
/// The type is a plain field-less enum, so it is cheap to copy, can be
/// compared for full equality, and can be used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SignalType {
    /// `SIGHUP`
    Hup,
    /// `SIGINT` (Ctrl-C)
    Int,
    /// `SIGTERM`
    Term,
    /// `SIGQUIT`
    Quit,
    /// `SIGCHLD`
    Child,
}
// `SignalType` is itself sent as a message (see `Handler<SignalType>` for
// `ProcessSignals`); handling it yields no value.
impl Message for SignalType {
type Result = ();
}
/// Message delivered to subscribers; wraps the kind of signal that was reported.
#[derive(Debug)]
pub struct Signal(pub SignalType);
// Handling a `Signal` yields no value.
impl Message for Signal {
type Result = ();
}
/// Actor that fans reported process signals out to subscribed recipients.
///
/// Each `SignalType` message it receives is re-sent as a `Signal` to every
/// recipient registered through `Subscribe`. The default value starts with no
/// subscribers.
#[derive(Default)]
pub struct ProcessSignals {
    /// Recipients that are sent a `Signal` for every reported signal.
    subscribers: Vec<Recipient<Signal>>,
}
// `ProcessSignals` runs in a plain `Context`; no lifecycle hooks are overridden.
impl Actor for ProcessSignals {
type Context = Context<Self>;
}
// Empty impl: none of the trait's methods are overridden here.
impl actix::Supervised for ProcessSignals {}
/// Installs the signal listeners when the service is started.
impl actix::SystemService for ProcessSignals {
    /// Subscribes to the process signals and feeds each occurrence back into
    /// this actor as a `SignalType` message.
    ///
    /// Every registration is a future: once it resolves, the resulting stream
    /// is attached to the actor context with `add_message_stream`. Errors from
    /// both the registration future and the signal stream are mapped to `()`
    /// and not reported anywhere in this function.
    ///
    /// Ctrl-C is registered on every platform; `SIGHUP`, `SIGTERM`, `SIGQUIT`
    /// and `SIGCHLD` are registered only when compiled for unix.
    fn service_started(&mut self, ctx: &mut Self::Context) {
        // Ctrl-C, reported as `SignalType::Int`.
        tokio_signal::ctrl_c()
            .map_err(|_| ())
            .actfuture()
            .map(|sig, _: &mut Self, ctx: &mut actix::Context<Self>| {
                ctx.add_message_stream(sig.map_err(|_| ()).map(|_| SignalType::Int))
            })
            .spawn(ctx);

        #[cfg(unix)]
        {
            // SIGHUP. The error is dropped before wrapping the future, exactly
            // like the other registrations below.
            unix::Signal::new(libc::SIGHUP)
                .map_err(|_| ())
                .actfuture()
                .map(|sig, _: &mut Self, ctx: &mut actix::Context<Self>| {
                    ctx.add_message_stream(sig.map_err(|_| ()).map(|_| SignalType::Hup))
                })
                .spawn(ctx);

            // SIGTERM
            unix::Signal::new(libc::SIGTERM)
                .map_err(|_| ())
                .actfuture()
                .map(|sig, _: &mut Self, ctx: &mut actix::Context<Self>| {
                    ctx.add_message_stream(sig.map_err(|_| ()).map(|_| SignalType::Term))
                })
                .spawn(ctx);

            // SIGQUIT
            unix::Signal::new(libc::SIGQUIT)
                .map_err(|_| ())
                .actfuture()
                .map(|sig, _: &mut Self, ctx: &mut actix::Context<Self>| {
                    ctx.add_message_stream(sig.map_err(|_| ()).map(|_| SignalType::Quit))
                })
                .spawn(ctx);

            // SIGCHLD
            unix::Signal::new(libc::SIGCHLD)
                .map_err(|_| ())
                .actfuture()
                .map(|sig, _: &mut Self, ctx: &mut actix::Context<Self>| {
                    ctx.add_message_stream(sig.map_err(|_| ()).map(|_| SignalType::Child))
                })
                .spawn(ctx);
        }
    }
}
#[doc(hidden)]
impl Handler<SignalType> for ProcessSignals {
    type Result = ();

    /// Sends `Signal(sig)` to every subscriber, in registration order.
    ///
    /// A subscriber whose `do_send` returns an error is removed from the
    /// list; all others are kept for the next signal.
    fn handle(&mut self, sig: SignalType, _: &mut Self::Context) {
        self.subscribers
            .retain(|subscriber| subscriber.do_send(Signal(sig)).is_ok());
    }
}
/// Message asking `ProcessSignals` to add the wrapped recipient to its
/// subscriber list.
pub struct Subscribe(pub Recipient<Signal>);
// Handling a `Subscribe` yields no value.
impl Message for Subscribe {
type Result = ();
}
impl actix::Handler<Subscribe> for ProcessSignals {
type Result = ();
fn handle(&mut self, msg: Subscribe, _: &mut Self::Context) {
self.subscribers.push(msg.0);
}
}
/// Stateless actor that subscribes itself to `ProcessSignals` when started
/// and reacts to the `Signal` messages it receives.
///
/// It stops the current `System` on `Int`, `Term` and `Quit`.
#[derive(Default)]
pub struct DefaultSignalsHandler;
impl Actor for DefaultSignalsHandler {
    type Context = actix::Context<Self>;

    /// Subscribes this actor to the `ProcessSignals` service from the
    /// current system's registry.
    ///
    /// The subscription request is handed to `ctx.wait`. Both its reply and
    /// any send error are discarded.
    fn started(&mut self, ctx: &mut Self::Context) {
        let signals = System::current().registry().get::<ProcessSignals>();
        let own_addr = ctx.address();

        let subscription = signals
            .send(Subscribe(own_addr.recipient()))
            .map(|_| ())
            .map_err(|_| ());

        subscription.into_actor(self).wait(ctx)
    }
}
impl actix::Handler<Signal> for DefaultSignalsHandler {
    type Result = ();

    /// Logs the received signal and stops the current `System` for `Int`,
    /// `Term` and `Quit`.
    ///
    /// `Hup` is only logged and `Child` is ignored. The match is exhaustive
    /// on purpose: adding a `SignalType` variant must be handled here
    /// explicitly instead of falling into a silent catch-all.
    fn handle(&mut self, msg: Signal, _: &mut Self::Context) {
        match msg.0 {
            SignalType::Int => {
                info!("SIGINT received, exiting");
                System::current().stop();
            }
            SignalType::Hup => {
                // Only logged: nothing in this handler performs a reload.
                info!("SIGHUP received, reloading");
            }
            SignalType::Term => {
                info!("SIGTERM received, stopping");
                System::current().stop();
            }
            SignalType::Quit => {
                info!("SIGQUIT received, exiting");
                System::current().stop();
            }
            // Child-process notifications need no action here.
            SignalType::Child => (),
        }
    }
}