use std::{
collections::HashMap,
io,
num::NonZeroU32,
path::PathBuf,
process::{ExitStatus, Stdio},
sync::atomic::{AtomicU64, Ordering},
};
use metrics::gauge;
use nix::{
errno::Errno,
sys::signal::{kill, SIGTERM},
unistd::Pid,
};
use tokio::process::Command;
use tracing::{error, info};
pub use crate::common::{Behavior, Output};
use crate::{common::stdio, observer::RSS_BYTES, signals::Shutdown};
/// Process-wide RSS limit in bytes.
///
/// Written by `Meta::set_rss_bytes_limit` and read by
/// `Meta::rss_bytes_limit_exceeded`. The initial value is `u64::MAX`, so no
/// `u64` reading can compare greater than it until a limit has been stored.
pub(crate) static RSS_BYTES_LIMIT: AtomicU64 = AtomicU64::new(u64::MAX);
/// Receiving half of the `tokio` broadcast channel that carries the target's
/// PID as an `Option<u32>`.
// NOTE(review): the meaning of a `None` message is not visible in this file;
// the server here only ever sends `Some(pid)`.
#[allow(clippy::module_name_repetitions)]
pub type TargetPidReceiver = tokio::sync::broadcast::Receiver<Option<u32>>;
/// Sending half of the `tokio` broadcast channel that carries the target's
/// PID. `Server::run` consumes one of these and publishes `Some(pid)` on it.
#[allow(clippy::module_name_repetitions)]
type TargetPidSender = tokio::sync::broadcast::Sender<Option<u32>>;
/// Errors produced by the associated functions of [`Meta`].
#[derive(thiserror::Error, Debug, Clone, Copy)]
pub enum MetaError {
/// The requested byte limit does not fit in a `u64`.
#[error("unable to support bytes greater than u64::MAX")]
ByteLimitTooLarge,
}
/// Field-less type whose associated functions set and check the process-wide
/// RSS byte limit.
#[derive(Debug, Clone, Copy)]
pub struct Meta {}
impl Meta {
    /// Store `limit` as the process-wide RSS byte limit and publish it on the
    /// `rss_bytes_limit` gauge.
    ///
    /// # Errors
    ///
    /// Returns [`MetaError::ByteLimitTooLarge`] when `limit` is larger than
    /// `u64::MAX` bytes. Nothing is stored or published in that case.
    #[inline]
    pub fn set_rss_bytes_limit(limit: byte_unit::Byte) -> Result<(), MetaError> {
        let requested: u128 = limit.get_bytes();
        // The checked conversion doubles as the range check: it fails exactly
        // when `requested` exceeds `u64::MAX`.
        let bounded: u64 =
            u64::try_from(requested).map_err(|_| MetaError::ByteLimitTooLarge)?;

        gauge!("rss_bytes_limit", requested as f64);
        RSS_BYTES_LIMIT.store(bounded, Ordering::Relaxed);
        Ok(())
    }

    /// Report whether the current `RSS_BYTES` reading is strictly greater than
    /// the stored limit.
    ///
    /// Each call also publishes the overage (current minus limit, saturating
    /// at zero) on the `rss_bytes_limit_overage` gauge.
    #[inline]
    pub(crate) fn rss_bytes_limit_exceeded() -> bool {
        let ceiling: u64 = RSS_BYTES_LIMIT.load(Ordering::Relaxed);
        let observed: u64 = RSS_BYTES.load(Ordering::Relaxed);

        let overage = observed.saturating_sub(ceiling);
        gauge!("rss_bytes_limit_overage", overage as f64);

        observed > ceiling
    }
}
/// Errors produced by [`Server`].
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// Spawning the configured target binary failed.
#[error("unable to spawn target: {0}")]
TargetSpawn(io::Error),
/// Waiting for the target to exit, after it was sent `SIGTERM`, failed.
#[error("unable to wait for target exit: {0}")]
TargetWait(io::Error),
/// A pidfd could not be created for the watched PID.
#[error("unable to create PidFd: {0}")]
PidConversion(io::Error),
/// Sending `SIGTERM` to the target failed.
#[error("unable to terminate target process: {0}")]
SigTerm(Errno),
/// The PID does not fit the signed PID type, or probing it with a null
/// signal returned an error.
#[error("PID not found: {0}")]
PidNotFound(u32),
/// The target exited before shutdown was signalled. The payload is the
/// exit status when one could be obtained.
#[error("target exited unexpectedly: {0:?}")]
TargetExited(Option<ExitStatus>),
}
/// Configuration for watching a process that is already running, identified
/// by its PID.
#[allow(missing_copy_implementations)]
#[derive(Debug, PartialEq, Eq)]
pub struct PidConfig {
/// PID of the process to watch.
pub pid: NonZeroU32,
}
/// Configuration for launching the target as a child process.
#[derive(Debug, PartialEq, Eq)]
pub struct BinaryConfig {
/// Path of the program to execute.
pub command: PathBuf,
/// Arguments passed to the program.
pub arguments: Vec<String>,
/// When `false`, the child's environment is cleared before
/// `environment_variables` are applied.
pub inherit_environment: bool,
/// Environment variables set on the child.
pub environment_variables: HashMap<String, String>,
/// How the child's stdout and stderr are wired up. Stdin is always null.
pub output: Output,
}
/// Selects how [`Server`] obtains its target.
#[derive(Debug, PartialEq, Eq)]
pub enum Config {
/// Watch an existing process by PID.
Pid(PidConfig),
/// Spawn a binary and supervise it as a child process.
Binary(BinaryConfig),
}
/// Supervises a single target process, either watched by PID or spawned from
/// a binary, until it exits or shutdown is signalled.
#[derive(Debug)]
pub struct Server {
// Which target to supervise and how to obtain it.
config: Config,
// Signal that ends `run` without treating it as a failure.
shutdown: Shutdown,
}
impl Server {
    /// Create a new [`Server`].
    ///
    /// `config` selects between watching an existing PID and spawning a
    /// binary; `shutdown` is the signal that makes [`Server::run`] stop.
    #[must_use]
    pub fn new(config: Config, shutdown: Shutdown) -> Self {
        Self { config, shutdown }
    }

    /// Run the server until the target exits or shutdown is signalled.
    ///
    /// The target's PID is published once on `pid_snd` as `Some(pid)`, after
    /// which the sender is dropped.
    ///
    /// # Errors
    ///
    /// Returns [`Error::TargetExited`] if the target exits before shutdown is
    /// signalled, and otherwise whatever error prevented the target from being
    /// found, spawned, signalled or waited on.
    ///
    /// # Panics
    ///
    /// Panics if the PID cannot be sent on `pid_snd`, or if a freshly spawned
    /// child reports no PID.
    pub async fn run(self, pid_snd: TargetPidSender) -> Result<(), Error> {
        let Self { config, shutdown } = self;
        match config {
            Config::Pid(config) => {
                Self::watch(config, pid_snd, shutdown).await?;
            }
            Config::Binary(config) => {
                // The exit status returned on a clean shutdown is not
                // surfaced to the caller of `run`.
                Self::execute_binary(config, pid_snd, shutdown).await?;
            }
        }
        Ok(())
    }

    /// Watch an already-running process until it exits or shutdown is
    /// signalled.
    ///
    /// Returns `Ok(())` on shutdown. The watched process is never signalled
    /// with anything other than the null probe.
    async fn watch(
        config: PidConfig,
        pid_snd: TargetPidSender,
        mut shutdown: Shutdown,
    ) -> Result<(), Error> {
        let raw_pid: i32 = config
            .pid
            .get()
            .try_into()
            .map_err(|_| Error::PidNotFound(config.pid.get()))?;
        let pid = Pid::from_raw(raw_pid);

        // A `None` signal delivers nothing and only performs the kernel's
        // error checking, which makes it an existence probe.
        // NOTE(review): every probe error, including a permission failure, is
        // reported as `PidNotFound`.
        if kill(pid, None).is_err() {
            return Err(Error::PidNotFound(config.pid.get()));
        }

        pid_snd
            .send(Some(config.pid.get()))
            .expect("target server unable to transmit PID, catastrophic failure");
        // Only one PID is ever published, so release the sender right away.
        drop(pid_snd);

        // On Linux, wait on a pidfd; a failed wait yields `None`.
        #[cfg(target_os = "linux")]
        let target_wait = {
            use async_pidfd::AsyncPidFd;
            let pidfd = AsyncPidFd::from_pid(raw_pid).map_err(Error::PidConversion)?;
            async move {
                let exit_info = pidfd.wait().await;
                exit_info.map(|info| info.status()).ok()
            }
        };

        // Elsewhere, poll the null-signal probe once per second. No exit
        // status is available this way.
        #[cfg(not(target_os = "linux"))]
        let target_wait = async move {
            use std::time::Duration;
            use tokio::time::sleep;
            loop {
                if kill(pid, None).is_err() {
                    break;
                }
                sleep(Duration::from_secs(1)).await;
            }
            Option::<ExitStatus>::None
        };

        tokio::select! {
            target_exit = target_wait => {
                if let Some(code) = target_exit {
                    error!("target exited unexpectedly with code {}", code);
                    Err(Error::TargetExited(Some(code)))
                } else {
                    error!("target exited unexpectedly; exit code unavailable");
                    Err(Error::TargetExited(None))
                }
            },
            _ = shutdown.recv() => {
                info!("shutdown signal received");
                Ok(())
            }
        }
    }

    /// Spawn the configured binary and supervise it until it exits or
    /// shutdown is signalled.
    ///
    /// On shutdown the child is sent `SIGTERM` and awaited; its exit status is
    /// returned. A child that exits before shutdown is an error.
    ///
    /// # Errors
    ///
    /// * [`Error::TargetSpawn`] if the child cannot be spawned.
    /// * [`Error::TargetExited`] if the child exits before shutdown.
    /// * [`Error::PidNotFound`] if the child's PID does not fit the signed PID
    ///   type needed to signal it.
    /// * [`Error::SigTerm`] if `SIGTERM` cannot be delivered.
    /// * [`Error::TargetWait`] if waiting after `SIGTERM` fails.
    async fn execute_binary(
        config: BinaryConfig,
        pid_snd: TargetPidSender,
        mut shutdown: Shutdown,
    ) -> Result<ExitStatus, Error> {
        let mut target_cmd = Command::new(config.command);
        target_cmd
            .stdin(Stdio::null())
            .stdout(stdio(&config.output.stdout))
            .stderr(stdio(&config.output.stderr));
        // Clear first so the explicit variables below survive the clear.
        if !config.inherit_environment {
            target_cmd.env_clear();
        }
        target_cmd
            // Ensure the child does not outlive this function on early return.
            .kill_on_drop(true)
            .args(config.arguments)
            .envs(config.environment_variables.iter());

        let mut target_child = target_cmd.spawn().map_err(Error::TargetSpawn)?;
        let target_id = target_child.id().expect("target must have PID");

        pid_snd
            .send(Some(target_id))
            .expect("target server unable to transmit PID, catastrophic failure");
        // Only one PID is ever published, so release the sender right away.
        drop(pid_snd);

        tokio::select! {
            res = target_child.wait() => {
                match res {
                    Ok(res) => {
                        error!("target exited unexpectedly with code {}", res);
                        Err(Error::TargetExited(Some(res)))
                    },
                    Err(e) => {
                        // The wait error is logged only; the returned error
                        // carries no status.
                        error!("target exited unexpectedly; exit code unavailable ({})", e);
                        Err(Error::TargetExited(None))
                    },
                }
            },
            _ = shutdown.recv() => {
                info!("shutdown signal received");
                // Surface an out-of-range PID as an error, as `watch` does,
                // rather than panicking in the middle of shutdown.
                let raw_pid: i32 = target_id
                    .try_into()
                    .map_err(|_| Error::PidNotFound(target_id))?;
                kill(Pid::from_raw(raw_pid), SIGTERM).map_err(Error::SigTerm)?;
                let res = target_child.wait().await.map_err(Error::TargetWait)?;
                Ok(res)
            }
        }
    }
}