use std;
use std::io;
use libc;
use futures::{Future, Stream};
use tokio_signal;
use tokio_signal::unix;
use prelude::*;
#[derive(PartialEq, Clone, Copy, Debug)]
pub enum SignalType {
Hup,
Int,
Term,
Quit,
Child,
}
impl ResponseType for SignalType {
type Item = ();
type Error = ();
}
pub struct Signal(pub SignalType);
impl ResponseType for Signal {
type Item = ();
type Error = ();
}
pub struct ProcessSignals {
subscribers: Vec<Box<Subscriber<Signal>>>,
}
impl Default for ProcessSignals {
fn default() -> Self {
ProcessSignals{subscribers: Vec::new()}
}
}
impl Actor for ProcessSignals {
type Context = Context<Self>;
}
impl Supervised for ProcessSignals {}
impl SystemService for ProcessSignals {
fn service_started(&mut self, ctx: &mut Context<Self>) {
let handle = Arbiter::handle();
tokio_signal::ctrl_c(handle).map_err(|_| ())
.actfuture()
.map(|sig, _: &mut ProcessSignals, ctx: &mut Context<Self>|
ctx.add_stream(sig.map(|_| SignalType::Int)))
.spawn(ctx);
unix::Signal::new(libc::SIGHUP, handle).map_err(|_| ())
.actfuture()
.map(|sig, _: &mut ProcessSignals, ctx: &mut Context<Self>|
ctx.add_stream(sig.map(|_| SignalType::Hup)))
.spawn(ctx);
unix::Signal::new(libc::SIGTERM, handle).map_err(|_| ())
.actfuture()
.map(|sig, _: &mut Self, ctx: &mut Context<Self>|
ctx.add_stream(sig.map(|_| SignalType::Term)))
.spawn(ctx);
unix::Signal::new(libc::SIGQUIT, handle).map_err(|_| ())
.actfuture()
.map(|sig, _: &mut ProcessSignals, ctx: &mut Context<Self>|
ctx.add_stream(sig.map(|_| SignalType::Quit)))
.spawn(ctx);
unix::Signal::new(libc::SIGCHLD, handle).map_err(|_| ())
.actfuture()
.map(|sig, _: &mut ProcessSignals, ctx: &mut Context<Self>|
ctx.add_stream(sig.map(|_| SignalType::Child)))
.spawn(ctx);
}
}
#[doc(hidden)]
impl StreamHandler<SignalType, io::Error> for ProcessSignals {}
#[doc(hidden)]
impl Handler<SignalType, io::Error> for ProcessSignals {
fn handle(&mut self, msg: SignalType, _: &mut Context<Self>) -> Response<Self, SignalType>
{
let subscribers = std::mem::replace(&mut self.subscribers, Vec::new());
for subscr in subscribers {
if subscr.send(Signal(msg)).is_ok() {
self.subscribers.push(subscr);
}
}
Self::empty()
}
fn error(&mut self, err: io::Error, _: &mut Context<ProcessSignals>) {
error!("Error during signal handling: {}", err);
}
}
pub struct Subscribe(pub Box<Subscriber<Signal> + Send>);
impl ResponseType for Subscribe {
type Item = ();
type Error = ();
}
impl Handler<Subscribe> for ProcessSignals {
fn handle(&mut self, msg: Subscribe,
_: &mut Context<ProcessSignals>) -> Response<Self, Subscribe>
{
self.subscribers.push(msg.0);
Self::empty()
}
}
pub struct DefaultSignalsHandler;
impl Default for DefaultSignalsHandler {
fn default() -> Self {
DefaultSignalsHandler
}
}
impl Actor for DefaultSignalsHandler {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Context<Self>) {
let addr = Arbiter::system_registry().get::<ProcessSignals>();
let slf: SyncAddress<_> = ctx.address();
addr.send(Subscribe(slf.subscriber()))
}
}
impl Handler<Signal> for DefaultSignalsHandler {
fn handle(&mut self, msg: Signal, _: &mut Context<Self>) -> Response<Self, Signal>
{
match msg.0 {
SignalType::Int => {
info!("SIGINT received, exiting");
Arbiter::system().send(msgs::SystemExit(0));
}
SignalType::Hup => {
info!("SIGHUP received, reloading");
}
SignalType::Term => {
info!("SIGTERM received, stopping");
Arbiter::system().send(msgs::SystemExit(0));
}
SignalType::Quit => {
info!("SIGQUIT received, exiting");
Arbiter::system().send(msgs::SystemExit(0));
}
_ => (),
};
Self::empty()
}
}