use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use folk_api::{PluginContext, RpcMethodDef, ServerPlugin};
use tracing::info;
use crate::config::ProcessConfig;
use crate::metrics::ProcessMetrics;
use crate::supervisor::{ProcessStatus, ProcessSupervisor};
/// Server plugin built from a [`ProcessConfig`].
///
/// The `ServerPlugin::run` implementation in this file reads
/// `config.processes` and creates one `ProcessSupervisor` per process instance.
pub struct ProcessPlugin {
/// Configuration captured at construction time; `run` iterates over its `processes`.
config: ProcessConfig,
}
impl ProcessPlugin {
pub fn new(config: ProcessConfig) -> Self {
Self { config }
}
}
#[async_trait]
impl ServerPlugin for ProcessPlugin {
fn name(&self) -> &'static str {
"process"
}
async fn run(&self, ctx: PluginContext) -> Result<()> {
let metrics = ctx
.metrics_registry
.as_ref()
.map(|r| Arc::new(ProcessMetrics::new(r.as_ref())));
let mut supervisors: Vec<Arc<ProcessSupervisor>> = Vec::new();
for def in &self.config.processes {
let n = def.numprocs.max(1);
for i in 0..n {
let mut d = def.clone();
if n > 1 {
d.name = format!("{}:{}", def.name, i);
}
supervisors.push(Arc::new(ProcessSupervisor::new(d)));
}
}
let by_name: Arc<HashMap<String, Arc<ProcessSupervisor>>> = Arc::new(
supervisors
.iter()
.map(|s| (s.name().to_string(), s.clone()))
.collect(),
);
if let Some(reg) = &ctx.rpc_registrar {
let sups = supervisors.clone();
reg.register_raw(
"process.list".into(),
Arc::new(move |_: Bytes| {
let sups = sups.clone();
Box::pin(async move {
let mut statuses = Vec::new();
for s in &sups {
let st = s.status().await;
statuses.push(format!("{}: {:?}", s.name(), st));
}
Ok(Bytes::from(serde_json::to_vec(&statuses)?))
})
}),
)
.await;
let names = by_name.clone();
reg.register_raw(
"process.restart".into(),
Arc::new(move |payload: Bytes| {
let names = names.clone();
Box::pin(async move {
let name: String = serde_json::from_slice(&payload)?;
if let Some(sup) = names.get(&name) {
sup.request_restart();
Ok(Bytes::from(serde_json::to_vec(&format!(
"restarting {name}"
))?))
} else {
Err(anyhow::anyhow!("unknown process: {name}"))
}
})
}),
)
.await;
}
if let Some(health) = &ctx.health_registry {
let sups = supervisors.clone();
health
.register(
"process".into(),
Arc::new(move || {
let sups = sups.clone();
Box::pin(async move {
let statuses =
futures_util::future::join_all(sups.iter().map(|s| s.status()))
.await;
let all_ok = statuses
.iter()
.all(|st| !matches!(st, ProcessStatus::Failed { .. }));
if all_ok {
folk_api::HealthStatus::ok()
} else {
folk_api::HealthStatus::degraded("one or more processes failed")
}
})
}),
)
.await;
}
let handles: Vec<tokio::task::JoinHandle<()>> = supervisors
.iter()
.map(|s| {
let s = s.clone();
let sd = ctx.shutdown.clone();
let m = metrics.clone();
tokio::spawn(async move {
s.run(sd, m).await;
})
})
.collect();
let uptime_handle = if let Some(m) = &metrics {
let sups = supervisors.clone();
let m = m.clone();
let mut sd = ctx.shutdown.clone();
Some(tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
loop {
tokio::select! {
_ = interval.tick() => {
for s in &sups {
let uptime = s.uptime_secs().await;
m.set_uptime(s.name(), uptime);
}
}
_ = sd.changed() => break,
}
}
}))
} else {
None
};
info!("process plugin running");
let mut sd = ctx.shutdown.clone();
sd.changed().await.ok();
for h in handles {
let _ = h.await;
}
if let Some(h) = uptime_handle {
h.abort();
}
Ok(())
}
fn rpc_methods(&self) -> Vec<RpcMethodDef> {
vec![
RpcMethodDef::new("process.list", "list supervised process statuses"),
RpcMethodDef::new("process.restart", "restart a named process"),
]
}
}