folk_plugin_process/
plugin.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use bytes::Bytes;
7use folk_api::{PluginContext, RpcMethodDef, ServerPlugin};
8use tracing::info;
9
10use crate::config::ProcessConfig;
11use crate::metrics::ProcessMetrics;
12use crate::supervisor::{ProcessStatus, ProcessSupervisor};
13
14pub struct ProcessPlugin {
15 config: ProcessConfig,
16}
17
18impl ProcessPlugin {
19 pub fn new(config: ProcessConfig) -> Self {
20 Self { config }
21 }
22}
23
24#[async_trait]
25impl ServerPlugin for ProcessPlugin {
26 fn name(&self) -> &'static str {
27 "process"
28 }
29
30 async fn run(&self, ctx: PluginContext) -> Result<()> {
31 let metrics = ctx
32 .metrics_registry
33 .as_ref()
34 .map(|r| Arc::new(ProcessMetrics::new(r.as_ref())));
35
36 let mut supervisors: Vec<Arc<ProcessSupervisor>> = Vec::new();
38 for def in &self.config.processes {
39 let n = def.numprocs.max(1);
40 for i in 0..n {
41 let mut d = def.clone();
42 if n > 1 {
43 d.name = format!("{}:{}", def.name, i);
44 }
45 supervisors.push(Arc::new(ProcessSupervisor::new(d)));
46 }
47 }
48
49 let by_name: Arc<HashMap<String, Arc<ProcessSupervisor>>> = Arc::new(
51 supervisors
52 .iter()
53 .map(|s| (s.name().to_string(), s.clone()))
54 .collect(),
55 );
56
57 if let Some(reg) = &ctx.rpc_registrar {
59 let sups = supervisors.clone();
60 reg.register_raw(
61 "process.list".into(),
62 Arc::new(move |_: Bytes| {
63 let sups = sups.clone();
64 Box::pin(async move {
65 let mut statuses = Vec::new();
66 for s in &sups {
67 let st = s.status().await;
68 statuses.push(format!("{}: {:?}", s.name(), st));
69 }
70 Ok(Bytes::from(serde_json::to_vec(&statuses)?))
71 })
72 }),
73 )
74 .await;
75
76 let names = by_name.clone();
78 reg.register_raw(
79 "process.restart".into(),
80 Arc::new(move |payload: Bytes| {
81 let names = names.clone();
82 Box::pin(async move {
83 let name: String = serde_json::from_slice(&payload)?;
84 if let Some(sup) = names.get(&name) {
85 sup.request_restart();
86 Ok(Bytes::from(serde_json::to_vec(&format!(
87 "restarting {name}"
88 ))?))
89 } else {
90 Err(anyhow::anyhow!("unknown process: {name}"))
91 }
92 })
93 }),
94 )
95 .await;
96 }
97
98 if let Some(health) = &ctx.health_registry {
100 let sups = supervisors.clone();
101 health
102 .register(
103 "process".into(),
104 Arc::new(move || {
105 let sups = sups.clone();
106 Box::pin(async move {
107 let statuses =
108 futures_util::future::join_all(sups.iter().map(|s| s.status()))
109 .await;
110 let all_ok = statuses
111 .iter()
112 .all(|st| !matches!(st, ProcessStatus::Failed { .. }));
113 if all_ok {
114 folk_api::HealthStatus::ok()
115 } else {
116 folk_api::HealthStatus::degraded("one or more processes failed")
117 }
118 })
119 }),
120 )
121 .await;
122 }
123
124 let handles: Vec<tokio::task::JoinHandle<()>> = supervisors
126 .iter()
127 .map(|s| {
128 let s = s.clone();
129 let sd = ctx.shutdown.clone();
130 let m = metrics.clone();
131 tokio::spawn(async move {
132 s.run(sd, m).await;
133 })
134 })
135 .collect();
136
137 let uptime_handle = if let Some(m) = &metrics {
139 let sups = supervisors.clone();
140 let m = m.clone();
141 let mut sd = ctx.shutdown.clone();
142 Some(tokio::spawn(async move {
143 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
144 loop {
145 tokio::select! {
146 _ = interval.tick() => {
147 for s in &sups {
148 let uptime = s.uptime_secs().await;
149 m.set_uptime(s.name(), uptime);
150 }
151 }
152 _ = sd.changed() => break,
153 }
154 }
155 }))
156 } else {
157 None
158 };
159
160 info!("process plugin running");
161
162 let mut sd = ctx.shutdown.clone();
163 sd.changed().await.ok();
164 for h in handles {
165 let _ = h.await;
166 }
167 if let Some(h) = uptime_handle {
168 h.abort();
169 }
170 Ok(())
171 }
172
173 fn rpc_methods(&self) -> Vec<RpcMethodDef> {
174 vec![
175 RpcMethodDef::new("process.list", "list supervised process statuses"),
176 RpcMethodDef::new("process.restart", "restart a named process"),
177 ]
178 }
179}