folk_plugin_process/
plugin.rs1use std::sync::Arc;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use bytes::Bytes;
6use folk_api::{PluginContext, RpcMethodDef, ServerPlugin};
7use tracing::info;
8
9use crate::config::ProcessConfig;
10use crate::supervisor::{ProcessStatus, ProcessSupervisor};
11
12pub struct ProcessPlugin {
13 config: ProcessConfig,
14}
15
16impl ProcessPlugin {
17 pub fn new(config: ProcessConfig) -> Self {
18 Self { config }
19 }
20}
21
22#[async_trait]
23impl ServerPlugin for ProcessPlugin {
24 fn name(&self) -> &'static str {
25 "process"
26 }
27
28 async fn run(&self, ctx: PluginContext) -> Result<()> {
29 let supervisors: Vec<Arc<ProcessSupervisor>> = self
30 .config
31 .processes
32 .iter()
33 .map(|def| Arc::new(ProcessSupervisor::new(def.clone())))
34 .collect();
35
36 if let Some(reg) = &ctx.rpc_registrar {
37 let sups = supervisors.clone();
38 reg.register_raw(
39 "process.list".into(),
40 Arc::new(move |_: Bytes| {
41 let sups = sups.clone();
42 Box::pin(async move {
43 let mut statuses = Vec::new();
44 for s in &sups {
45 let st = s.status().await;
46 statuses.push(format!("{}: {:?}", s.name(), st));
47 }
48 Ok(Bytes::from(rmp_serde::to_vec(&statuses)?))
49 })
50 }),
51 )
52 .await;
53 }
54
55 if let Some(health) = &ctx.health_registry {
56 let sups = supervisors.clone();
57 health
58 .register(
59 "process".into(),
60 Arc::new(move || {
61 let sups = sups.clone();
62 Box::pin(async move {
63 let statuses =
64 futures_util::future::join_all(sups.iter().map(|s| s.status()))
65 .await;
66 let all_ok = statuses
67 .iter()
68 .all(|st| !matches!(st, ProcessStatus::Failed { .. }));
69 if all_ok {
70 folk_api::HealthStatus::ok()
71 } else {
72 folk_api::HealthStatus::degraded("one or more processes failed")
73 }
74 })
75 }),
76 )
77 .await;
78 }
79
80 let handles: Vec<tokio::task::JoinHandle<()>> = supervisors
81 .into_iter()
82 .map(|s| {
83 let sd = ctx.shutdown.clone();
84 tokio::spawn(async move {
85 s.run(sd).await;
86 })
87 })
88 .collect();
89
90 info!("process plugin running");
91
92 let mut sd = ctx.shutdown.clone();
93 sd.changed().await.ok();
94 for h in handles {
95 let _ = h.await;
96 }
97 Ok(())
98 }
99
100 fn rpc_methods(&self) -> Vec<RpcMethodDef> {
101 vec![
102 RpcMethodDef::new("process.list", "list supervised process statuses"),
103 RpcMethodDef::new("process.restart", "restart a named process"),
104 ]
105 }
106}