folk-plugin-process 0.2.2

Auxiliary process supervisor plugin for Folk — starts, monitors, and restarts sidecar processes
Documentation
use std::process::Stdio;
use std::sync::Arc;
use std::time::Instant;

use tokio::process::Command;
use tokio::sync::{Mutex, Notify, watch};
use tracing::{error, info, warn};

use crate::config::{OutputTarget, ProcessDef};
use crate::metrics::ProcessMetrics;

/// Lifecycle state of a supervised process, as published by
/// `ProcessSupervisor::run` and read back through `ProcessSupervisor::status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProcessStatus {
    /// Initial state of a freshly constructed supervisor.
    Starting,
    /// `run` is supervising a child process.
    Running,
    /// Supervision ended because shutdown was signalled.
    Stopped,
    /// Supervision ended without a shutdown request: the command could not be
    /// parsed, built or spawned, or the child exited and was not restarted.
    /// `restarts` is the number of automatic restarts performed before that.
    Failed { restarts: u32 },
}

/// Supervises one auxiliary process described by a [`ProcessDef`].
///
/// The observable state lives behind `Arc<Mutex<..>>` so the accessor methods
/// can be called while `run` is executing on another task.
pub struct ProcessSupervisor {
    /// Definition of the process: command, environment, restart policy, etc.
    def: ProcessDef,
    /// Current lifecycle state; written by `run`, read by `status`.
    status: Arc<Mutex<ProcessStatus>>,
    /// Signalled by `request_restart`, awaited by `run`.
    restart_notify: Arc<Notify>,
    /// When `run` last marked the process as running; `None` before that.
    started_at: Arc<Mutex<Option<Instant>>>,
    /// Exit code of the most recent exit observed by `run`; `None` if no exit
    /// has been observed yet or the exit status carried no code.
    last_exit_code: Arc<Mutex<Option<i32>>>,
}

impl ProcessSupervisor {
    /// Creates a supervisor for `def` in the [`ProcessStatus::Starting`] state.
    ///
    /// Nothing is spawned until [`run`](Self::run) is awaited.
    pub fn new(def: ProcessDef) -> Self {
        Self {
            def,
            status: Arc::new(Mutex::new(ProcessStatus::Starting)),
            restart_notify: Arc::new(Notify::new()),
            started_at: Arc::new(Mutex::new(None)),
            last_exit_code: Arc::new(Mutex::new(None)),
        }
    }

    /// Name of the supervised process, taken from its definition.
    pub fn name(&self) -> &str {
        &self.def.name
    }

    /// Current lifecycle state as last published by [`run`](Self::run).
    pub async fn status(&self) -> ProcessStatus {
        *self.status.lock().await
    }

    /// Seconds elapsed since the process was last marked running, or `0.0` if
    /// it never was. The start time is not cleared when the process stops.
    pub async fn uptime_secs(&self) -> f64 {
        self.started_at
            .lock()
            .await
            .map(|t| t.elapsed().as_secs_f64())
            .unwrap_or(0.0)
    }

    /// Exit code of the most recent exit observed by [`run`](Self::run), if any.
    pub async fn last_exit_code(&self) -> Option<i32> {
        *self.last_exit_code.lock().await
    }

    /// Asks the supervisor loop to stop the current child and spawn a new one.
    pub fn request_restart(&self) {
        self.restart_notify.notify_one();
    }

    /// Builds the command for one spawn attempt from the already-split
    /// command line, applying working directory, environment and output
    /// redirection from the definition.
    ///
    /// # Errors
    ///
    /// Returns `"empty command"` when `parts` is empty.
    fn build_command(&self, parts: &[String]) -> Result<Command, String> {
        let (prog, args) = parts
            .split_first()
            .ok_or_else(|| "empty command".to_string())?;

        let mut cmd = Command::new(prog);
        cmd.args(args);

        if let Some(dir) = &self.def.directory {
            cmd.current_dir(dir);
        }

        if !self.def.env.is_empty() {
            cmd.envs(&self.def.env);
        }

        cmd.stdout(output_target_to_stdio(&self.def.logging.stdout));
        cmd.stderr(output_target_to_stdio(&self.def.logging.stderr));

        Ok(cmd)
    }

    /// Publishes a terminal `status` and, when metrics are enabled, marks the
    /// process as down with the given status `label`.
    async fn mark_down(
        &self,
        metrics: &Option<Arc<ProcessMetrics>>,
        status: ProcessStatus,
        label: &'static str,
    ) {
        *self.status.lock().await = status;
        if let Some(m) = metrics {
            m.set_up(&self.def.name, false);
            m.set_status(&self.def.name, label);
        }
    }

    /// Resolves once shutdown has been requested: either the watched value is
    /// `true`, or the sending side was dropped (nobody is left to request
    /// shutdown later, so the supervisor stops instead of running unattended).
    ///
    /// Changes that leave the value `false` are consumed and ignored.
    async fn shutdown_signalled(shutdown: &mut watch::Receiver<bool>) {
        loop {
            if *shutdown.borrow() {
                return;
            }
            if shutdown.changed().await.is_err() {
                return;
            }
        }
    }

    /// Runs the supervision loop until the process is given up on or shutdown
    /// is requested.
    ///
    /// Each iteration spawns the child and then waits for the first of:
    /// * the child exiting — restarted after `restart_delay` if the restart
    ///   policy and `max_restarts` allow it, otherwise the loop ends;
    /// * a manual restart request — the child is stopped and respawned;
    /// * shutdown (`shutdown` becomes `true` or its sender is dropped) — the
    ///   child is stopped and the loop ends with [`ProcessStatus::Stopped`].
    ///
    /// Command parse, build and spawn failures end the loop with
    /// [`ProcessStatus::Failed`]; they are not retried.
    pub async fn run(
        self: Arc<Self>,
        mut shutdown: watch::Receiver<bool>,
        metrics: Option<Arc<ProcessMetrics>>,
    ) {
        let mut restarts: u32 = 0;

        let parts = match shell_words::split(&self.def.command) {
            Ok(p) if !p.is_empty() => p,
            Ok(_) => {
                error!(process = %self.def.name, "empty command");
                self.mark_down(&metrics, ProcessStatus::Failed { restarts: 0 }, "failed")
                    .await;
                return;
            }
            Err(e) => {
                error!(process = %self.def.name, error = %e, "command parse failed");
                self.mark_down(&metrics, ProcessStatus::Failed { restarts: 0 }, "failed")
                    .await;
                return;
            }
        };

        loop {
            // Never spawn once shutdown is already in effect.
            if *shutdown.borrow() {
                self.mark_down(&metrics, ProcessStatus::Stopped, "stopped")
                    .await;
                return;
            }

            let mut cmd = match self.build_command(&parts) {
                Ok(c) => c,
                Err(e) => {
                    error!(process = %self.def.name, error = %e, "build command failed");
                    self.mark_down(&metrics, ProcessStatus::Failed { restarts }, "failed")
                        .await;
                    return;
                }
            };

            let mut child = match cmd.spawn() {
                Ok(c) => c,
                Err(e) => {
                    error!(process = %self.def.name, error = ?e, "spawn failed");
                    self.mark_down(&metrics, ProcessStatus::Failed { restarts }, "failed")
                        .await;
                    return;
                }
            };

            // Only report the process as running once it actually exists.
            *self.status.lock().await = ProcessStatus::Running;
            *self.started_at.lock().await = Some(Instant::now());
            if let Some(m) = &metrics {
                m.set_up(&self.def.name, true);
                m.set_status(&self.def.name, "running");
            }

            info!(process = %self.def.name, pid = ?child.id(), "started");

            tokio::select! {
                exit = child.wait() => {
                    let code = match exit {
                        Ok(s) => s.code(),
                        Err(e) => {
                            warn!(process = %self.def.name, error = %e, "wait failed");
                            None
                        }
                    };
                    *self.last_exit_code.lock().await = code;

                    if let (Some(m), Some(c)) = (&metrics, code) {
                        m.set_exit_code(&self.def.name, c);
                    }

                    let should_restart = match self.def.restart {
                        crate::config::RestartPolicy::Always => true,
                        crate::config::RestartPolicy::OnFailure => code != Some(0),
                        crate::config::RestartPolicy::Never => false,
                    };

                    if !should_restart || restarts >= self.def.max_restarts {
                        warn!(process = %self.def.name, exit_code = ?code, "not restarting");
                        // NOTE(review): the status is `Failed` while the metric
                        // label is "stopped", even for a clean exit — confirm
                        // which of the two is intended.
                        self.mark_down(&metrics, ProcessStatus::Failed { restarts }, "stopped")
                            .await;
                        return;
                    }

                    restarts += 1;
                    warn!(process = %self.def.name, restarts, "restarting");
                    if let Some(m) = &metrics {
                        m.inc_restarts(&self.def.name);
                        m.set_up(&self.def.name, false);
                    }

                    // Back off before respawning, but do not sit out the delay
                    // (and then spawn a new child) if shutdown arrives meanwhile.
                    tokio::select! {
                        _ = tokio::time::sleep(self.def.restart_delay) => {}
                        _ = Self::shutdown_signalled(&mut shutdown) => {
                            self.mark_down(&metrics, ProcessStatus::Stopped, "stopped")
                                .await;
                            return;
                        }
                    }
                }
                _ = self.restart_notify.notified() => {
                    info!(process = %self.def.name, "restart requested");
                    kill_and_wait(&mut child, &self.def).await;
                    if let Some(m) = &metrics {
                        m.inc_restarts(&self.def.name);
                    }
                    // Loop continues — will spawn again
                }
                // Resolves only on a real shutdown request, so this branch can
                // never fall through to the respawn with the child still alive.
                _ = Self::shutdown_signalled(&mut shutdown) => {
                    kill_and_wait(&mut child, &self.def).await;
                    self.mark_down(&metrics, ProcessStatus::Stopped, "stopped")
                        .await;
                    return;
                }
            }
        }
    }
}

async fn kill_and_wait(child: &mut tokio::process::Child, def: &ProcessDef) {
    #[cfg(unix)]
    if let Some(pid) = child.id() {
        let ret = unsafe { libc::kill(pid as libc::pid_t, def.stop_signal.as_libc_signal()) };
        if ret != 0 {
            tracing::warn!(
                pid,
                signal = def.stop_signal.as_libc_signal(),
                errno = std::io::Error::last_os_error().raw_os_error().unwrap_or(-1),
                "kill() failed"
            );
        }
    }
    #[cfg(not(unix))]
    {
        let _ = child.kill().await;
    }
    let _ = tokio::time::timeout(def.stop_timeout, child.wait()).await;
}

/// Maps a configured output target to the `Stdio` handed to the child.
///
/// File targets are opened in append mode and created if missing. If the
/// file cannot be opened, a warning is logged and the stream is inherited
/// from the parent instead.
fn output_target_to_stdio(target: &OutputTarget) -> Stdio {
    let path = match target {
        OutputTarget::Inherit => return Stdio::inherit(),
        OutputTarget::Null => return Stdio::null(),
        OutputTarget::File(path) => path,
    };

    let mut options = std::fs::OpenOptions::new();
    options.create(true).append(true);

    options.open(path).map(Stdio::from).unwrap_or_else(|e| {
        tracing::warn!(path = %path.display(), error = %e, "failed to open log file, falling back to inherit");
        Stdio::inherit()
    })
}