Skip to main content

folk_plugin_process/
plugin.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use bytes::Bytes;
6use folk_api::{PluginContext, RpcMethodDef, ServerPlugin};
7use tracing::info;
8
9use crate::config::ProcessConfig;
10use crate::supervisor::{ProcessStatus, ProcessSupervisor};
11
12pub struct ProcessPlugin {
13    config: ProcessConfig,
14}
15
16impl ProcessPlugin {
17    pub fn new(config: ProcessConfig) -> Self {
18        Self { config }
19    }
20}
21
22#[async_trait]
23impl ServerPlugin for ProcessPlugin {
24    fn name(&self) -> &'static str {
25        "process"
26    }
27
28    async fn run(&self, ctx: PluginContext) -> Result<()> {
29        let supervisors: Vec<Arc<ProcessSupervisor>> = self
30            .config
31            .processes
32            .iter()
33            .map(|def| Arc::new(ProcessSupervisor::new(def.clone())))
34            .collect();
35
36        if let Some(reg) = &ctx.rpc_registrar {
37            let sups = supervisors.clone();
38            reg.register_raw(
39                "process.list".into(),
40                Arc::new(move |_: Bytes| {
41                    let sups = sups.clone();
42                    Box::pin(async move {
43                        let mut statuses = Vec::new();
44                        for s in &sups {
45                            let st = s.status().await;
46                            statuses.push(format!("{}: {:?}", s.name(), st));
47                        }
48                        Ok(Bytes::from(rmp_serde::to_vec(&statuses)?))
49                    })
50                }),
51            )
52            .await;
53        }
54
55        if let Some(health) = &ctx.health_registry {
56            let sups = supervisors.clone();
57            health
58                .register(
59                    "process".into(),
60                    Arc::new(move || {
61                        let sups = sups.clone();
62                        Box::pin(async move {
63                            let statuses =
64                                futures_util::future::join_all(sups.iter().map(|s| s.status()))
65                                    .await;
66                            let all_ok = statuses
67                                .iter()
68                                .all(|st| !matches!(st, ProcessStatus::Failed { .. }));
69                            if all_ok {
70                                folk_api::HealthStatus::ok()
71                            } else {
72                                folk_api::HealthStatus::degraded("one or more processes failed")
73                            }
74                        })
75                    }),
76                )
77                .await;
78        }
79
80        let handles: Vec<tokio::task::JoinHandle<()>> = supervisors
81            .into_iter()
82            .map(|s| {
83                let sd = ctx.shutdown.clone();
84                tokio::spawn(async move {
85                    s.run(sd).await;
86                })
87            })
88            .collect();
89
90        info!("process plugin running");
91
92        let mut sd = ctx.shutdown.clone();
93        sd.changed().await.ok();
94        for h in handles {
95            let _ = h.await;
96        }
97        Ok(())
98    }
99
100    fn rpc_methods(&self) -> Vec<RpcMethodDef> {
101        vec![
102            RpcMethodDef::new("process.list", "list supervised process statuses"),
103            RpcMethodDef::new("process.restart", "restart a named process"),
104        ]
105    }
106}