use std::sync::Arc;
use std::time::Duration;
use tokio::process::Command;
use tokio::sync::Mutex;
use tokio::sync::watch;
use tracing::{error, info, warn};
use crate::config::{ProcessDef, RestartPolicy};
/// Lifecycle state of a supervised process as reported by its supervisor.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProcessStatus {
    /// Initial state of a freshly created supervisor.
    Starting,
    /// The supervisor considers the process to be running.
    Running,
    /// Supervision ended in an orderly way (for example after a shutdown request).
    Stopped,
    /// The supervisor gave up on the process.
    Failed {
        /// Number of restarts that had been performed when the supervisor gave up.
        restarts: u32,
    },
}
/// Supervises a single process described by a [`ProcessDef`] and tracks its
/// current [`ProcessStatus`].
pub struct ProcessSupervisor {
    /// Definition of the process being supervised.
    def: ProcessDef,
    /// Most recently recorded status, shared behind an async mutex.
    status: Arc<Mutex<ProcessStatus>>,
}
impl ProcessSupervisor {
pub fn new(def: ProcessDef) -> Self {
Self {
def,
status: Arc::new(Mutex::new(ProcessStatus::Starting)),
}
}
pub fn name(&self) -> &str {
&self.def.name
}
pub async fn status(&self) -> ProcessStatus {
*self.status.lock().await
}
pub async fn run(self: Arc<Self>, mut shutdown: watch::Receiver<bool>) {
let mut restarts: u32 = 0;
loop {
*self.status.lock().await = ProcessStatus::Running;
let parts: Vec<&str> = self.def.command.split_whitespace().collect();
let (prog, args) = parts.split_first().unwrap_or((&"true", &[]));
let mut child = match Command::new(prog).args(args).spawn() {
Ok(c) => c,
Err(e) => {
error!(process = %self.def.name, error = ?e, "spawn failed");
*self.status.lock().await = ProcessStatus::Failed { restarts };
return;
}
};
info!(process = %self.def.name, pid = ?child.id(), "started");
tokio::select! {
status = child.wait() => {
let code = status.map(|s| s.code()).unwrap_or(None);
let should_restart = match self.def.restart {
RestartPolicy::Always => true,
RestartPolicy::OnFailure => code != Some(0),
RestartPolicy::Never => false,
};
if !should_restart || restarts >= self.def.max_restarts {
warn!(process = %self.def.name, "not restarting");
*self.status.lock().await = ProcessStatus::Failed { restarts };
return;
}
restarts += 1;
warn!(process = %self.def.name, restarts, "restarting");
tokio::time::sleep(self.def.restart_delay).await;
}
_ = shutdown.changed() => {
if *shutdown.borrow() {
#[cfg(unix)]
if let Some(pid) = child.id() {
unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM); }
}
let _ = tokio::time::timeout(Duration::from_secs(5), child.wait()).await;
*self.status.lock().await = ProcessStatus::Stopped;
return;
}
}
}
}
}
}