use std::process::Stdio;
use std::sync::Arc;
use std::time::Instant;
use tokio::process::Command;
use tokio::sync::{Mutex, Notify, watch};
use tracing::{error, info, warn};
use crate::config::{OutputTarget, ProcessDef};
use crate::metrics::ProcessMetrics;
/// Lifecycle state reported by a [`ProcessSupervisor`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessStatus {
/// State of a freshly constructed supervisor, before its run loop has
/// marked the process as running.
Starting,
/// Set by `ProcessSupervisor::run` each time it launches the child process.
Running,
/// The supervisor stopped because a shutdown was requested.
Stopped,
/// The supervisor gave up: the command could not be parsed, built or
/// spawned, or the child exited and was not restarted. `restarts` is the
/// number of automatic restarts performed before giving up.
Failed { restarts: u32 },
}
/// Supervises a single child process described by a [`ProcessDef`].
///
/// The observable state lives behind `Arc<Mutex<..>>` so the accessor
/// methods can read it while `run` is updating it.
pub struct ProcessSupervisor {
// Process definition supplied at construction (name, command, directory,
// environment, logging targets, restart policy, stop signal/timeout).
def: ProcessDef,
// Current lifecycle state; written by `run`, read by `status()`.
status: Arc<Mutex<ProcessStatus>>,
// Signalled by `request_restart()`; awaited by `run` to trigger a manual restart.
restart_notify: Arc<Notify>,
// Instant recorded by `run` when it (re)launches the process; `None` until then.
started_at: Arc<Mutex<Option<Instant>>>,
// Exit code of the most recent child exit observed by `run`; `None` if no
// exit has been observed or the exit carried no code.
last_exit_code: Arc<Mutex<Option<i32>>>,
}
impl ProcessSupervisor {
pub fn new(def: ProcessDef) -> Self {
Self {
def,
status: Arc::new(Mutex::new(ProcessStatus::Starting)),
restart_notify: Arc::new(Notify::new()),
started_at: Arc::new(Mutex::new(None)),
last_exit_code: Arc::new(Mutex::new(None)),
}
}
pub fn name(&self) -> &str {
&self.def.name
}
pub async fn status(&self) -> ProcessStatus {
*self.status.lock().await
}
pub async fn uptime_secs(&self) -> f64 {
self.started_at
.lock()
.await
.map(|t| t.elapsed().as_secs_f64())
.unwrap_or(0.0)
}
pub async fn last_exit_code(&self) -> Option<i32> {
*self.last_exit_code.lock().await
}
pub fn request_restart(&self) {
self.restart_notify.notify_one();
}
fn build_command(&self, parts: &[String]) -> Result<Command, String> {
let (prog, args) = parts
.split_first()
.ok_or_else(|| "empty command".to_string())?;
let mut cmd = Command::new(prog);
cmd.args(args);
if let Some(dir) = &self.def.directory {
cmd.current_dir(dir);
}
if !self.def.env.is_empty() {
cmd.envs(&self.def.env);
}
cmd.stdout(output_target_to_stdio(&self.def.logging.stdout));
cmd.stderr(output_target_to_stdio(&self.def.logging.stderr));
Ok(cmd)
}
pub async fn run(
self: Arc<Self>,
mut shutdown: watch::Receiver<bool>,
metrics: Option<Arc<ProcessMetrics>>,
) {
let mut restarts: u32 = 0;
let parts = match shell_words::split(&self.def.command) {
Ok(p) if !p.is_empty() => p,
Ok(_) => {
error!(process = %self.def.name, "empty command");
*self.status.lock().await = ProcessStatus::Failed { restarts: 0 };
return;
}
Err(e) => {
error!(process = %self.def.name, error = %e, "command parse failed");
*self.status.lock().await = ProcessStatus::Failed { restarts: 0 };
return;
}
};
loop {
*self.status.lock().await = ProcessStatus::Running;
*self.started_at.lock().await = Some(Instant::now());
if let Some(m) = &metrics {
m.set_up(&self.def.name, true);
m.set_status(&self.def.name, "running");
}
let mut cmd = match self.build_command(&parts) {
Ok(c) => c,
Err(e) => {
error!(process = %self.def.name, error = %e, "build command failed");
*self.status.lock().await = ProcessStatus::Failed { restarts };
if let Some(m) = &metrics {
m.set_up(&self.def.name, false);
m.set_status(&self.def.name, "failed");
}
return;
}
};
let mut child = match cmd.spawn() {
Ok(c) => c,
Err(e) => {
error!(process = %self.def.name, error = ?e, "spawn failed");
*self.status.lock().await = ProcessStatus::Failed { restarts };
if let Some(m) = &metrics {
m.set_up(&self.def.name, false);
m.set_status(&self.def.name, "failed");
}
return;
}
};
info!(process = %self.def.name, pid = ?child.id(), "started");
tokio::select! {
status = child.wait() => {
let code = status.map(|s| s.code()).unwrap_or(None);
*self.last_exit_code.lock().await = code;
if let Some(m) = &metrics {
if let Some(c) = code {
m.set_exit_code(&self.def.name, c);
}
}
let should_restart = match self.def.restart {
crate::config::RestartPolicy::Always => true,
crate::config::RestartPolicy::OnFailure => code != Some(0),
crate::config::RestartPolicy::Never => false,
};
if !should_restart || restarts >= self.def.max_restarts {
warn!(process = %self.def.name, exit_code = ?code, "not restarting");
*self.status.lock().await = ProcessStatus::Failed { restarts };
if let Some(m) = &metrics {
m.set_up(&self.def.name, false);
m.set_status(&self.def.name, "stopped");
}
return;
}
restarts += 1;
warn!(process = %self.def.name, restarts, "restarting");
if let Some(m) = &metrics {
m.inc_restarts(&self.def.name);
m.set_up(&self.def.name, false);
}
tokio::time::sleep(self.def.restart_delay).await;
}
_ = self.restart_notify.notified() => {
info!(process = %self.def.name, "restart requested");
kill_and_wait(&mut child, &self.def).await;
if let Some(m) = &metrics {
m.inc_restarts(&self.def.name);
}
}
_ = shutdown.changed() => {
if *shutdown.borrow() {
kill_and_wait(&mut child, &self.def).await;
*self.status.lock().await = ProcessStatus::Stopped;
if let Some(m) = &metrics {
m.set_up(&self.def.name, false);
m.set_status(&self.def.name, "stopped");
}
return;
}
}
}
}
}
}
async fn kill_and_wait(child: &mut tokio::process::Child, def: &ProcessDef) {
#[cfg(unix)]
if let Some(pid) = child.id() {
let ret = unsafe { libc::kill(pid as libc::pid_t, def.stop_signal.as_libc_signal()) };
if ret != 0 {
tracing::warn!(
pid,
signal = def.stop_signal.as_libc_signal(),
errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(-1),
"kill() failed"
);
}
}
#[cfg(not(unix))]
{
let _ = child.kill().await;
}
let _ = tokio::time::timeout(def.stop_timeout, child.wait()).await;
}
fn output_target_to_stdio(target: &OutputTarget) -> Stdio {
match target {
OutputTarget::Inherit => Stdio::inherit(),
OutputTarget::Null => Stdio::null(),
OutputTarget::File(path) => {
match std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
{
Ok(f) => Stdio::from(f),
Err(e) => {
tracing::warn!(path = %path.display(), error = %e, "failed to open log file, falling back to inherit");
Stdio::inherit()
}
}
}
}
}