Skip to main content

folk_plugin_process/
plugin.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use bytes::Bytes;
7use folk_api::{PluginContext, RpcMethodDef, ServerPlugin};
8use tracing::info;
9
10use crate::config::ProcessConfig;
11use crate::metrics::ProcessMetrics;
12use crate::supervisor::{ProcessStatus, ProcessSupervisor};
13
14pub struct ProcessPlugin {
15    config: ProcessConfig,
16}
17
18impl ProcessPlugin {
19    pub fn new(config: ProcessConfig) -> Self {
20        Self { config }
21    }
22}
23
24#[async_trait]
25impl ServerPlugin for ProcessPlugin {
26    fn name(&self) -> &'static str {
27        "process"
28    }
29
30    async fn run(&self, ctx: PluginContext) -> Result<()> {
31        let metrics = ctx
32            .metrics_registry
33            .as_ref()
34            .map(|r| Arc::new(ProcessMetrics::new(r.as_ref())));
35
36        // Expand numprocs: each def with numprocs > 1 becomes N supervisors
37        let mut supervisors: Vec<Arc<ProcessSupervisor>> = Vec::new();
38        for def in &self.config.processes {
39            let n = def.numprocs.max(1);
40            for i in 0..n {
41                let mut d = def.clone();
42                if n > 1 {
43                    d.name = format!("{}:{}", def.name, i);
44                }
45                supervisors.push(Arc::new(ProcessSupervisor::new(d)));
46            }
47        }
48
49        // Index by name for RPC lookup
50        let by_name: Arc<HashMap<String, Arc<ProcessSupervisor>>> = Arc::new(
51            supervisors
52                .iter()
53                .map(|s| (s.name().to_string(), s.clone()))
54                .collect(),
55        );
56
57        // Register process.list RPC
58        if let Some(reg) = &ctx.rpc_registrar {
59            let sups = supervisors.clone();
60            reg.register_raw(
61                "process.list".into(),
62                Arc::new(move |_: Bytes| {
63                    let sups = sups.clone();
64                    Box::pin(async move {
65                        let mut statuses = Vec::new();
66                        for s in &sups {
67                            let st = s.status().await;
68                            statuses.push(format!("{}: {:?}", s.name(), st));
69                        }
70                        Ok(Bytes::from(rmp_serde::to_vec(&statuses)?))
71                    })
72                }),
73            )
74            .await;
75
76            // Register process.restart RPC
77            let names = by_name.clone();
78            reg.register_raw(
79                "process.restart".into(),
80                Arc::new(move |payload: Bytes| {
81                    let names = names.clone();
82                    Box::pin(async move {
83                        let name: String = rmp_serde::from_slice(&payload)?;
84                        if let Some(sup) = names.get(&name) {
85                            sup.request_restart();
86                            Ok(Bytes::from(rmp_serde::to_vec(&format!(
87                                "restarting {name}"
88                            ))?))
89                        } else {
90                            Ok(Bytes::from(rmp_serde::to_vec(&format!(
91                                "unknown process: {name}"
92                            ))?))
93                        }
94                    })
95                }),
96            )
97            .await;
98        }
99
100        // Register health check
101        if let Some(health) = &ctx.health_registry {
102            let sups = supervisors.clone();
103            health
104                .register(
105                    "process".into(),
106                    Arc::new(move || {
107                        let sups = sups.clone();
108                        Box::pin(async move {
109                            let statuses =
110                                futures_util::future::join_all(sups.iter().map(|s| s.status()))
111                                    .await;
112                            let all_ok = statuses
113                                .iter()
114                                .all(|st| !matches!(st, ProcessStatus::Failed { .. }));
115                            if all_ok {
116                                folk_api::HealthStatus::ok()
117                            } else {
118                                folk_api::HealthStatus::degraded("one or more processes failed")
119                            }
120                        })
121                    }),
122                )
123                .await;
124        }
125
126        // Spawn supervisor tasks
127        let handles: Vec<tokio::task::JoinHandle<()>> = supervisors
128            .iter()
129            .map(|s| {
130                let s = s.clone();
131                let sd = ctx.shutdown.clone();
132                let m = metrics.clone();
133                tokio::spawn(async move {
134                    s.run(sd, m).await;
135                })
136            })
137            .collect();
138
139        // Uptime ticker — periodically update uptime metric
140        let uptime_handle = if let Some(m) = &metrics {
141            let sups = supervisors.clone();
142            let m = m.clone();
143            let mut sd = ctx.shutdown.clone();
144            Some(tokio::spawn(async move {
145                let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
146                loop {
147                    tokio::select! {
148                        _ = interval.tick() => {
149                            for s in &sups {
150                                let uptime = s.uptime_secs().await;
151                                m.set_uptime(s.name(), uptime);
152                            }
153                        }
154                        _ = sd.changed() => break,
155                    }
156                }
157            }))
158        } else {
159            None
160        };
161
162        info!("process plugin running");
163
164        let mut sd = ctx.shutdown.clone();
165        sd.changed().await.ok();
166        for h in handles {
167            let _ = h.await;
168        }
169        if let Some(h) = uptime_handle {
170            h.abort();
171        }
172        Ok(())
173    }
174
175    fn rpc_methods(&self) -> Vec<RpcMethodDef> {
176        vec![
177            RpcMethodDef::new("process.list", "list supervised process statuses"),
178            RpcMethodDef::new("process.restart", "restart a named process"),
179        ]
180    }
181}