folk-plugin-process 0.2.2

Auxiliary process supervisor plugin for Folk — starts, monitors, and restarts sidecar processes
Documentation
use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use folk_api::{PluginContext, RpcMethodDef, ServerPlugin};
use tracing::info;

use crate::config::ProcessConfig;
use crate::metrics::ProcessMetrics;
use crate::supervisor::{ProcessStatus, ProcessSupervisor};

/// Folk server plugin that supervises the auxiliary processes described by a
/// [`ProcessConfig`].
///
/// Construct it with [`ProcessPlugin::new`]; the supervision itself happens in
/// the plugin's `ServerPlugin::run` implementation.
pub struct ProcessPlugin {
    /// Plugin configuration; `run` reads its `processes` list to decide which
    /// supervisors to create.
    config: ProcessConfig,
}

impl ProcessPlugin {
    pub fn new(config: ProcessConfig) -> Self {
        Self { config }
    }
}

#[async_trait]
impl ServerPlugin for ProcessPlugin {
    fn name(&self) -> &'static str {
        "process"
    }

    async fn run(&self, ctx: PluginContext) -> Result<()> {
        let metrics = ctx
            .metrics_registry
            .as_ref()
            .map(|r| Arc::new(ProcessMetrics::new(r.as_ref())));

        // Expand numprocs: each def with numprocs > 1 becomes N supervisors
        let mut supervisors: Vec<Arc<ProcessSupervisor>> = Vec::new();
        for def in &self.config.processes {
            let n = def.numprocs.max(1);
            for i in 0..n {
                let mut d = def.clone();
                if n > 1 {
                    d.name = format!("{}:{}", def.name, i);
                }
                supervisors.push(Arc::new(ProcessSupervisor::new(d)));
            }
        }

        // Index by name for RPC lookup
        let by_name: Arc<HashMap<String, Arc<ProcessSupervisor>>> = Arc::new(
            supervisors
                .iter()
                .map(|s| (s.name().to_string(), s.clone()))
                .collect(),
        );

        // Register process.list RPC
        if let Some(reg) = &ctx.rpc_registrar {
            let sups = supervisors.clone();
            reg.register_raw(
                "process.list".into(),
                Arc::new(move |_: Bytes| {
                    let sups = sups.clone();
                    Box::pin(async move {
                        let mut statuses = Vec::new();
                        for s in &sups {
                            let st = s.status().await;
                            statuses.push(format!("{}: {:?}", s.name(), st));
                        }
                        Ok(Bytes::from(serde_json::to_vec(&statuses)?))
                    })
                }),
            )
            .await;

            // Register process.restart RPC
            let names = by_name.clone();
            reg.register_raw(
                "process.restart".into(),
                Arc::new(move |payload: Bytes| {
                    let names = names.clone();
                    Box::pin(async move {
                        let name: String = serde_json::from_slice(&payload)?;
                        if let Some(sup) = names.get(&name) {
                            sup.request_restart();
                            Ok(Bytes::from(serde_json::to_vec(&format!(
                                "restarting {name}"
                            ))?))
                        } else {
                            Err(anyhow::anyhow!("unknown process: {name}"))
                        }
                    })
                }),
            )
            .await;
        }

        // Register health check
        if let Some(health) = &ctx.health_registry {
            let sups = supervisors.clone();
            health
                .register(
                    "process".into(),
                    Arc::new(move || {
                        let sups = sups.clone();
                        Box::pin(async move {
                            let statuses =
                                futures_util::future::join_all(sups.iter().map(|s| s.status()))
                                    .await;
                            let all_ok = statuses
                                .iter()
                                .all(|st| !matches!(st, ProcessStatus::Failed { .. }));
                            if all_ok {
                                folk_api::HealthStatus::ok()
                            } else {
                                folk_api::HealthStatus::degraded("one or more processes failed")
                            }
                        })
                    }),
                )
                .await;
        }

        // Spawn supervisor tasks
        let handles: Vec<tokio::task::JoinHandle<()>> = supervisors
            .iter()
            .map(|s| {
                let s = s.clone();
                let sd = ctx.shutdown.clone();
                let m = metrics.clone();
                tokio::spawn(async move {
                    s.run(sd, m).await;
                })
            })
            .collect();

        // Uptime ticker — periodically update uptime metric
        let uptime_handle = if let Some(m) = &metrics {
            let sups = supervisors.clone();
            let m = m.clone();
            let mut sd = ctx.shutdown.clone();
            Some(tokio::spawn(async move {
                let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
                loop {
                    tokio::select! {
                        _ = interval.tick() => {
                            for s in &sups {
                                let uptime = s.uptime_secs().await;
                                m.set_uptime(s.name(), uptime);
                            }
                        }
                        _ = sd.changed() => break,
                    }
                }
            }))
        } else {
            None
        };

        info!("process plugin running");

        let mut sd = ctx.shutdown.clone();
        sd.changed().await.ok();
        for h in handles {
            let _ = h.await;
        }
        if let Some(h) = uptime_handle {
            h.abort();
        }
        Ok(())
    }

    fn rpc_methods(&self) -> Vec<RpcMethodDef> {
        vec![
            RpcMethodDef::new("process.list", "list supervised process statuses"),
            RpcMethodDef::new("process.restart", "restart a named process"),
        ]
    }
}