Skip to main content

folk_plugin_process/
plugin.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use bytes::Bytes;
7use folk_api::{PluginContext, RpcMethodDef, ServerPlugin};
8use tracing::info;
9
10use crate::config::ProcessConfig;
11use crate::metrics::ProcessMetrics;
12use crate::supervisor::{ProcessStatus, ProcessSupervisor};
13
14pub struct ProcessPlugin {
15    config: ProcessConfig,
16}
17
18impl ProcessPlugin {
19    pub fn new(config: ProcessConfig) -> Self {
20        Self { config }
21    }
22}
23
24#[async_trait]
25impl ServerPlugin for ProcessPlugin {
26    fn name(&self) -> &'static str {
27        "process"
28    }
29
30    async fn run(&self, ctx: PluginContext) -> Result<()> {
31        let metrics = ctx
32            .metrics_registry
33            .as_ref()
34            .map(|r| Arc::new(ProcessMetrics::new(r.as_ref())));
35
36        // Expand numprocs: each def with numprocs > 1 becomes N supervisors
37        let mut supervisors: Vec<Arc<ProcessSupervisor>> = Vec::new();
38        for def in &self.config.processes {
39            let n = def.numprocs.max(1);
40            for i in 0..n {
41                let mut d = def.clone();
42                if n > 1 {
43                    d.name = format!("{}:{}", def.name, i);
44                }
45                supervisors.push(Arc::new(ProcessSupervisor::new(d)));
46            }
47        }
48
49        // Index by name for RPC lookup
50        let by_name: Arc<HashMap<String, Arc<ProcessSupervisor>>> = Arc::new(
51            supervisors
52                .iter()
53                .map(|s| (s.name().to_string(), s.clone()))
54                .collect(),
55        );
56
57        // Register process.list RPC
58        if let Some(reg) = &ctx.rpc_registrar {
59            let sups = supervisors.clone();
60            reg.register_raw(
61                "process.list".into(),
62                Arc::new(move |_: Bytes| {
63                    let sups = sups.clone();
64                    Box::pin(async move {
65                        let mut statuses = Vec::new();
66                        for s in &sups {
67                            let st = s.status().await;
68                            statuses.push(format!("{}: {:?}", s.name(), st));
69                        }
70                        Ok(Bytes::from(serde_json::to_vec(&statuses)?))
71                    })
72                }),
73            )
74            .await;
75
76            // Register process.restart RPC
77            let names = by_name.clone();
78            reg.register_raw(
79                "process.restart".into(),
80                Arc::new(move |payload: Bytes| {
81                    let names = names.clone();
82                    Box::pin(async move {
83                        let name: String = serde_json::from_slice(&payload)?;
84                        if let Some(sup) = names.get(&name) {
85                            sup.request_restart();
86                            Ok(Bytes::from(serde_json::to_vec(&format!(
87                                "restarting {name}"
88                            ))?))
89                        } else {
90                            Err(anyhow::anyhow!("unknown process: {name}"))
91                        }
92                    })
93                }),
94            )
95            .await;
96        }
97
98        // Register health check
99        if let Some(health) = &ctx.health_registry {
100            let sups = supervisors.clone();
101            health
102                .register(
103                    "process".into(),
104                    Arc::new(move || {
105                        let sups = sups.clone();
106                        Box::pin(async move {
107                            let statuses =
108                                futures_util::future::join_all(sups.iter().map(|s| s.status()))
109                                    .await;
110                            let all_ok = statuses
111                                .iter()
112                                .all(|st| !matches!(st, ProcessStatus::Failed { .. }));
113                            if all_ok {
114                                folk_api::HealthStatus::ok()
115                            } else {
116                                folk_api::HealthStatus::degraded("one or more processes failed")
117                            }
118                        })
119                    }),
120                )
121                .await;
122        }
123
124        // Spawn supervisor tasks
125        let handles: Vec<tokio::task::JoinHandle<()>> = supervisors
126            .iter()
127            .map(|s| {
128                let s = s.clone();
129                let sd = ctx.shutdown.clone();
130                let m = metrics.clone();
131                tokio::spawn(async move {
132                    s.run(sd, m).await;
133                })
134            })
135            .collect();
136
137        // Uptime ticker — periodically update uptime metric
138        let uptime_handle = if let Some(m) = &metrics {
139            let sups = supervisors.clone();
140            let m = m.clone();
141            let mut sd = ctx.shutdown.clone();
142            Some(tokio::spawn(async move {
143                let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
144                loop {
145                    tokio::select! {
146                        _ = interval.tick() => {
147                            for s in &sups {
148                                let uptime = s.uptime_secs().await;
149                                m.set_uptime(s.name(), uptime);
150                            }
151                        }
152                        _ = sd.changed() => break,
153                    }
154                }
155            }))
156        } else {
157            None
158        };
159
160        info!("process plugin running");
161
162        let mut sd = ctx.shutdown.clone();
163        sd.changed().await.ok();
164        for h in handles {
165            let _ = h.await;
166        }
167        if let Some(h) = uptime_handle {
168            h.abort();
169        }
170        Ok(())
171    }
172
173    fn rpc_methods(&self) -> Vec<RpcMethodDef> {
174        vec![
175            RpcMethodDef::new("process.list", "list supervised process statuses"),
176            RpcMethodDef::new("process.restart", "restart a named process"),
177        ]
178    }
179}