use std::convert::TryFrom;
use std::ffi::OsString;
use std::future::Future;
use std::path::Path;
use std::pin::Pin;
use std::str::FromStr;
use std::task::{self, Poll};
use std::time::Duration;
use std::{env, io, process};
use heph::actor;
use heph::messages::Terminate;
use log::{as_debug, debug, warn};
use mio::net::UnixDatagram;
use mio::Interest;
use socket2::SockRef;
use crate::timer::Interval;
use crate::util::{either, next};
use crate::{self as rt, Bound, Signal};
#[derive(Debug)]
pub struct Notify {
socket: UnixDatagram,
watch_dog: Option<Duration>,
}
impl Notify {
pub fn new<M, RT>(ctx: &mut actor::Context<M, RT>) -> io::Result<Option<Notify>>
where
RT: rt::Access,
{
const SOCKET_ENV_VAR: &str = "NOTIFY_SOCKET";
const WATCHDOG_PID_ENV_VAR: &str = "WATCHDOG_PID";
const WATCHDOG_USEC_ENV_VAR: &str = "WATCHDOG_USEC";
let socket_path = env::var_os(SOCKET_ENV_VAR);
let watchdog_pid = env::var_os(WATCHDOG_PID_ENV_VAR);
let mut watchdog_timeout = env::var_os(WATCHDOG_USEC_ENV_VAR);
if let Some(watchdog_pid) = watchdog_pid {
match parse_os_string::<u32>(watchdog_pid) {
Ok(pid) if pid == process::id() => {}
_ => watchdog_timeout = None,
}
}
let mut notifier = match socket_path {
Some(path) => Notify::connect(ctx, Path::new(&path))?,
None => return Ok(None),
};
if let Some(watchdog_timeout) = watchdog_timeout {
match parse_os_string(watchdog_timeout) {
Ok(micros) => {
let timeout = Duration::from_micros(micros);
notifier.set_watchdog_timeout(Some(timeout));
}
Err(()) => {
warn!(
"{} environment variable is invalid, ignoring it",
WATCHDOG_USEC_ENV_VAR
);
}
}
}
Ok(Some(notifier))
}
pub fn connect<M, RT, P>(ctx: &mut actor::Context<M, RT>, path: P) -> io::Result<Notify>
where
RT: rt::Access,
P: AsRef<Path>,
{
let mut socket = UnixDatagram::unbound()?;
socket.connect(path)?;
ctx.runtime().register(&mut socket, Interest::WRITABLE)?;
if let Some(cpu) = ctx.runtime_ref().cpu() {
if let Err(err) = SockRef::from(&socket).set_cpu_affinity(cpu) {
warn!("failed to set CPU affinity on systemd::Notify: {}", err);
}
}
Ok(Notify {
socket,
watch_dog: None,
})
}
pub fn set_watchdog_timeout(&mut self, timeout: Option<Duration>) {
self.watch_dog = timeout;
}
pub fn watchdog_timeout(&self) -> Option<Duration> {
self.watch_dog
}
pub fn change_state<'a>(&'a self, state: State, status: Option<&str>) -> ChangeState<'a> {
let state_line = match state {
State::Ready => "READY=1\n",
State::Reloading => "RELOADING=1\n",
State::Stopping => "STOPPING=1\n",
};
let state_update = match status {
Some(status) => {
let mut state_update =
String::with_capacity(state_line.len() + 7 + status.len() + 1);
state_update.push_str(state_line);
state_update.push_str("STATUS=");
state_update.push_str(status);
replace_newline(&mut state_update[state_line.len() + 7..]);
state_update.push('\n');
state_update
}
None => String::from(state_line),
};
ChangeState {
notifier: self,
state_update,
}
}
pub fn change_status<'a>(&'a self, status: &str) -> ChangeState<'a> {
let mut state_update = String::with_capacity(7 + status.len() + 1);
state_update.push_str("STATUS=");
state_update.push_str(status);
replace_newline(&mut state_update[7..]);
state_update.push('\n');
ChangeState {
notifier: self,
state_update,
}
}
pub fn ping_watchdog<'a>(&'a self) -> PingWatchdog<'a> {
PingWatchdog { notifier: self }
}
pub fn trigger_watchdog<'a>(&'a self) -> TriggerWatchdog<'a> {
TriggerWatchdog { notifier: self }
}
}
fn replace_newline(status: &mut str) {
for b in unsafe { status.as_bytes_mut().iter_mut() } {
match *b {
b'\r' | b'\n' => *b = b' ',
_ => {}
}
}
}
fn parse_os_string<T: FromStr>(str: OsString) -> Result<T, ()> {
match str.into_string() {
Ok(str) => match str.parse() {
Ok(value) => Ok(value),
Err(_) => Err(()),
},
Err(_) => Err(()),
}
}
#[derive(Copy, Clone, Debug)]
pub enum State {
Ready,
Reloading,
Stopping,
}
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ChangeState<'a> {
notifier: &'a Notify,
state_update: String,
}
impl<'a> Future for ChangeState<'a> {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
try_io!(self.notifier.socket.send(self.state_update.as_bytes())).map_ok(|_| ())
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PingWatchdog<'a> {
notifier: &'a Notify,
}
impl<'a> Future for PingWatchdog<'a> {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
try_io!(self.notifier.socket.send(b"WATCHDOG=1")).map_ok(|_| ())
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TriggerWatchdog<'a> {
notifier: &'a Notify,
}
impl<'a> Future for TriggerWatchdog<'a> {
type Output = io::Result<()>;
fn poll(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<Self::Output> {
try_io!(self.notifier.socket.send(b"WATCHDOG=trigger")).map_ok(|_| ())
}
}
impl<RT: rt::Access> Bound<RT> for Notify {
type Error = io::Error;
fn bind_to<M>(&mut self, ctx: &mut actor::Context<M, RT>) -> io::Result<()> {
ctx.runtime()
.reregister(&mut self.socket, Interest::WRITABLE)
}
}
#[allow(clippy::future_not_send)]
pub async fn actor<RT, H, E>(
mut ctx: actor::Context<ServiceMessage, RT>,
mut health_check: H,
) -> io::Result<()>
where
RT: rt::Access + Clone,
H: FnMut() -> Result<(), E>,
E: ToString,
{
let notify = match Notify::new(&mut ctx)? {
Some(notify) => notify,
None => {
debug!("not started via systemd, not starting `systemd::actor`");
return Ok(());
}
};
notify.change_state(State::Ready, None).await?;
if let Some(timeout) = notify.watchdog_timeout() {
debug!(timeout = as_debug!(timeout); "started via systemd with watchdog");
let mut interval = Interval::every(&mut ctx, timeout);
loop {
match either(ctx.receive_next(), next(&mut interval)).await {
Ok(Ok(msg)) => match msg {
ServiceMessage::ChangeState { state, status } => {
debug!(
"setting state to {:?}, {:?} with service manager",
state, status
);
notify.change_state(state, status.as_deref()).await?;
if let State::Stopping = state {
return Ok(());
}
}
ServiceMessage::ChangeStatus(status) => {
debug!("setting status with service manager to '{}'", status);
notify.change_status(&status).await?;
}
},
Ok(Err(_)) => {
warn!("all references to the systemd::watchdog are dropped, stopping it");
return Ok(());
}
Err(_) => {
if let Err(err) = health_check() {
let err = err.to_string();
debug!("setting status with service manager to '{}'", err);
notify.change_status(&err).await?;
} else {
debug!("pinging service manager watchdog");
notify.ping_watchdog().await?;
}
}
}
}
} else {
while let Ok(msg) = ctx.receive_next().await {
match msg {
ServiceMessage::ChangeState { state, status } => {
debug!(
"setting state to {:?}, {:?} with service manager",
state, status
);
notify.change_state(state, status.as_deref()).await?;
if let State::Stopping = state {
return Ok(());
}
}
ServiceMessage::ChangeStatus(status) => {
debug!("setting status with service manager to '{}'", status);
notify.change_status(&status).await?;
}
}
}
debug!("setting state to stopping with service manager");
notify.change_state(State::Stopping, None).await
}
}
#[non_exhaustive]
#[derive(Debug)]
pub enum ServiceMessage {
ChangeState {
state: State,
status: Option<String>,
},
ChangeStatus(String),
}
impl From<Terminate> for ServiceMessage {
fn from(_: Terminate) -> ServiceMessage {
ServiceMessage::ChangeState {
state: State::Stopping,
status: None,
}
}
}
impl TryFrom<Signal> for ServiceMessage {
type Error = ();
fn try_from(signal: Signal) -> Result<Self, Self::Error> {
match signal {
Signal::Interrupt | Signal::Terminate | Signal::Quit => {
Ok(ServiceMessage::ChangeState {
state: State::Stopping,
status: None,
})
}
_ => Err(()),
}
}
}