use std;
use std::io;
use libc;
use futures::{Future, Stream};
use tokio_signal;
#[cfg(unix)]
use tokio_signal::unix;
use prelude::*;
/// Kinds of process signals this module can report to subscribers.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SignalType {
    /// Produced by the `libc::SIGHUP` stream.
    Hup,
    /// Produced by the `tokio_signal::ctrl_c` stream (logged as SIGINT).
    Int,
    /// Produced by the `libc::SIGTERM` stream.
    Term,
    /// Produced by the `libc::SIGQUIT` stream.
    Quit,
    /// Produced by the `libc::SIGCHLD` stream.
    Child,
}
/// A `SignalType` message yields no value: both the success and the error
/// type of its response are unit.
impl ResponseType for SignalType {
    type Error = ();
    type Item = ();
}
/// Message that `ProcessSignals` sends to each subscriber when a signal
/// arrives; it wraps the `SignalType` that was received.
pub struct Signal(pub SignalType);
/// A `Signal` message yields no value: both the success and the error type
/// of its response are unit.
impl ResponseType for Signal {
    type Error = ();
    type Item = ();
}
/// Actor that listens for process signals and forwards each one, wrapped in
/// a `Signal` message, to every registered subscriber.
///
/// The default value starts with an empty subscriber list; subscribers are
/// added by sending a `Subscribe` message.
#[derive(Default)]
pub struct ProcessSignals {
    /// Recipients of `Signal` messages. A subscriber is dropped from the list
    /// once sending to it fails.
    subscribers: Vec<Box<actix::Subscriber<Signal>>>,
}
impl Actor for ProcessSignals {
type Context = Context<Self>;
}
// No `Supervised` methods are overridden; the trait's defaults are used.
impl actix::Supervised for ProcessSignals {}
impl actix::SystemService for ProcessSignals {
fn service_started(&mut self, ctx: &mut Self::Context) {
let handle = actix::Arbiter::handle();
tokio_signal::ctrl_c(handle).map_err(|_| ())
.actfuture()
.map(|sig, _: &mut Self, ctx: &mut actix::Context<Self>|
ctx.add_stream(sig.map(|_| SignalType::Int)))
.spawn(ctx);
#[cfg(unix)]
{
unix::Signal::new(libc::SIGHUP, handle).map_err(|_| ())
.actfuture()
.map(|sig, _: &mut Self, ctx: &mut actix::Context<Self>|
ctx.add_stream(sig.map(|_| SignalType::Hup)))
.spawn(ctx);
unix::Signal::new(libc::SIGTERM, handle).map_err(|_| ())
.actfuture()
.map(|sig, _: &mut Self, ctx: &mut actix::Context<Self>|
ctx.add_stream(sig.map(|_| SignalType::Term)))
.spawn(ctx);
unix::Signal::new(libc::SIGQUIT, handle).map_err(|_| ())
.actfuture()
.map(|sig, _: &mut Self, ctx: &mut actix::Context<Self>|
ctx.add_stream(sig.map(|_| SignalType::Quit)))
.spawn(ctx);
unix::Signal::new(libc::SIGCHLD, handle).map_err(|_| ())
.actfuture()
.map(|sig, _: &mut Self, ctx: &mut actix::Context<Self>|
ctx.add_stream(sig.map(|_| SignalType::Child)))
.spawn(ctx);
}
}
}
#[doc(hidden)]
impl Handler<io::Result<SignalType>> for ProcessSignals {
    type Result = ();

    /// Fans a received signal out to all subscribers.
    ///
    /// An `Err` item is logged and nothing is sent. For an `Ok` item every
    /// subscriber is sent a `Signal`; subscribers whose send fails are
    /// removed from the list, the rest are kept in their original order.
    fn handle(&mut self, msg: io::Result<SignalType>, _: &mut Self::Context) {
        let received = match msg {
            Ok(sig) => sig,
            Err(err) => {
                error!("Error during signal handling: {}", err);
                return;
            }
        };

        // Keep only the subscribers that accepted the message.
        self.subscribers
            .retain(|subscriber| subscriber.send(Signal(received)).is_ok());
    }
}
/// Subscription request for `ProcessSignals`: the boxed subscriber is added
/// to the list of recipients of `Signal` messages.
pub struct Subscribe(pub Box<actix::Subscriber<Signal> + Send>);
/// A `Subscribe` message yields no value: both the success and the error
/// type of its response are unit.
impl actix::ResponseType for Subscribe {
    type Error = ();
    type Item = ();
}
impl actix::Handler<Subscribe> for ProcessSignals {
type Result = ();
fn handle(&mut self, msg: Subscribe, _: &mut Self::Context) {
self.subscribers.push(msg.0);
}
}
/// Actor providing a stock reaction to process signals.
///
/// On start it subscribes itself to `ProcessSignals`. SIGINT, SIGTERM and
/// SIGQUIT are logged and followed by a `SystemExit(0)` message to the
/// system; SIGHUP is only logged; any other signal is ignored.
#[derive(Default)]
pub struct DefaultSignalsHandler;
impl Actor for DefaultSignalsHandler {
    type Context = actix::Context<Self>;

    /// Looks up `ProcessSignals` in the system registry and sends it a
    /// `Subscribe` message carrying a subscriber for this actor's address.
    fn started(&mut self, ctx: &mut Self::Context) {
        let signals = Arbiter::system_registry().get::<ProcessSignals>();
        // The annotation selects the `SyncAddress` flavour of `ctx.address()`.
        let own_address: SyncAddress<_> = ctx.address();
        signals.send(Subscribe(own_address.subscriber()));
    }
}
impl actix::Handler<Signal> for DefaultSignalsHandler {
type Result = ();
fn handle(&mut self, msg: Signal, _: &mut Self::Context) {
match msg.0 {
SignalType::Int => {
info!("SIGINT received, exiting");
Arbiter::system().send(actix::msgs::SystemExit(0));
}
SignalType::Hup => {
info!("SIGHUP received, reloading");
}
SignalType::Term => {
info!("SIGTERM received, stopping");
Arbiter::system().send(actix::msgs::SystemExit(0));
}
SignalType::Quit => {
info!("SIGQUIT received, exiting");
Arbiter::system().send(actix::msgs::SystemExit(0));
}
_ => (),
}
}
}