folk-plugin-process 0.2.0

Auxiliary process supervisor plugin for Folk — starts, monitors, and restarts sidecar processes
Documentation
use std::sync::Arc;
use std::time::Duration;

use tokio::process::Command;
use tokio::sync::Mutex;
use tokio::sync::watch;
use tracing::{error, info, warn};

use crate::config::{ProcessDef, RestartPolicy};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessStatus {
    Starting,
    Running,
    Stopped,
    Failed { restarts: u32 },
}

pub struct ProcessSupervisor {
    def: ProcessDef,
    status: Arc<Mutex<ProcessStatus>>,
}

impl ProcessSupervisor {
    pub fn new(def: ProcessDef) -> Self {
        Self {
            def,
            status: Arc::new(Mutex::new(ProcessStatus::Starting)),
        }
    }

    pub fn name(&self) -> &str {
        &self.def.name
    }

    pub async fn status(&self) -> ProcessStatus {
        *self.status.lock().await
    }

    pub async fn run(self: Arc<Self>, mut shutdown: watch::Receiver<bool>) {
        let mut restarts: u32 = 0;

        loop {
            *self.status.lock().await = ProcessStatus::Running;

            let parts: Vec<&str> = self.def.command.split_whitespace().collect();
            let (prog, args) = parts.split_first().unwrap_or((&"true", &[]));

            let mut child = match Command::new(prog).args(args).spawn() {
                Ok(c) => c,
                Err(e) => {
                    error!(process = %self.def.name, error = ?e, "spawn failed");
                    *self.status.lock().await = ProcessStatus::Failed { restarts };
                    return;
                }
            };

            info!(process = %self.def.name, pid = ?child.id(), "started");

            tokio::select! {
                status = child.wait() => {
                    let code = status.map(|s| s.code()).unwrap_or(None);
                    let should_restart = match self.def.restart {
                        RestartPolicy::Always => true,
                        RestartPolicy::OnFailure => code != Some(0),
                        RestartPolicy::Never => false,
                    };

                    if !should_restart || restarts >= self.def.max_restarts {
                        warn!(process = %self.def.name, "not restarting");
                        *self.status.lock().await = ProcessStatus::Failed { restarts };
                        return;
                    }

                    restarts += 1;
                    warn!(process = %self.def.name, restarts, "restarting");
                    tokio::time::sleep(self.def.restart_delay).await;
                }
                _ = shutdown.changed() => {
                    if *shutdown.borrow() {
                        #[cfg(unix)]
                        if let Some(pid) = child.id() {
                            unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM); }
                        }
                        let _ = tokio::time::timeout(Duration::from_secs(5), child.wait()).await;
                        *self.status.lock().await = ProcessStatus::Stopped;
                        return;
                    }
                }
            }
        }
    }
}