folk_plugin_process/
plugin.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use bytes::Bytes;
7use folk_api::{PluginContext, RpcMethodDef, ServerPlugin};
8use tracing::info;
9
10use crate::config::ProcessConfig;
11use crate::metrics::ProcessMetrics;
12use crate::supervisor::{ProcessStatus, ProcessSupervisor};
13
14pub struct ProcessPlugin {
15 config: ProcessConfig,
16}
17
18impl ProcessPlugin {
19 pub fn new(config: ProcessConfig) -> Self {
20 Self { config }
21 }
22}
23
24#[async_trait]
25impl ServerPlugin for ProcessPlugin {
26 fn name(&self) -> &'static str {
27 "process"
28 }
29
30 async fn run(&self, ctx: PluginContext) -> Result<()> {
31 let metrics = ctx
32 .metrics_registry
33 .as_ref()
34 .map(|r| Arc::new(ProcessMetrics::new(r.as_ref())));
35
36 let mut supervisors: Vec<Arc<ProcessSupervisor>> = Vec::new();
38 for def in &self.config.processes {
39 let n = def.numprocs.max(1);
40 for i in 0..n {
41 let mut d = def.clone();
42 if n > 1 {
43 d.name = format!("{}:{}", def.name, i);
44 }
45 supervisors.push(Arc::new(ProcessSupervisor::new(d)));
46 }
47 }
48
49 let by_name: Arc<HashMap<String, Arc<ProcessSupervisor>>> = Arc::new(
51 supervisors
52 .iter()
53 .map(|s| (s.name().to_string(), s.clone()))
54 .collect(),
55 );
56
57 if let Some(reg) = &ctx.rpc_registrar {
59 let sups = supervisors.clone();
60 reg.register_raw(
61 "process.list".into(),
62 Arc::new(move |_: Bytes| {
63 let sups = sups.clone();
64 Box::pin(async move {
65 let mut statuses = Vec::new();
66 for s in &sups {
67 let st = s.status().await;
68 statuses.push(format!("{}: {:?}", s.name(), st));
69 }
70 Ok(Bytes::from(rmp_serde::to_vec(&statuses)?))
71 })
72 }),
73 )
74 .await;
75
76 let names = by_name.clone();
78 reg.register_raw(
79 "process.restart".into(),
80 Arc::new(move |payload: Bytes| {
81 let names = names.clone();
82 Box::pin(async move {
83 let name: String = rmp_serde::from_slice(&payload)?;
84 if let Some(sup) = names.get(&name) {
85 sup.request_restart();
86 Ok(Bytes::from(rmp_serde::to_vec(&format!(
87 "restarting {name}"
88 ))?))
89 } else {
90 Ok(Bytes::from(rmp_serde::to_vec(&format!(
91 "unknown process: {name}"
92 ))?))
93 }
94 })
95 }),
96 )
97 .await;
98 }
99
100 if let Some(health) = &ctx.health_registry {
102 let sups = supervisors.clone();
103 health
104 .register(
105 "process".into(),
106 Arc::new(move || {
107 let sups = sups.clone();
108 Box::pin(async move {
109 let statuses =
110 futures_util::future::join_all(sups.iter().map(|s| s.status()))
111 .await;
112 let all_ok = statuses
113 .iter()
114 .all(|st| !matches!(st, ProcessStatus::Failed { .. }));
115 if all_ok {
116 folk_api::HealthStatus::ok()
117 } else {
118 folk_api::HealthStatus::degraded("one or more processes failed")
119 }
120 })
121 }),
122 )
123 .await;
124 }
125
126 let handles: Vec<tokio::task::JoinHandle<()>> = supervisors
128 .iter()
129 .map(|s| {
130 let s = s.clone();
131 let sd = ctx.shutdown.clone();
132 let m = metrics.clone();
133 tokio::spawn(async move {
134 s.run(sd, m).await;
135 })
136 })
137 .collect();
138
139 let uptime_handle = if let Some(m) = &metrics {
141 let sups = supervisors.clone();
142 let m = m.clone();
143 let mut sd = ctx.shutdown.clone();
144 Some(tokio::spawn(async move {
145 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
146 loop {
147 tokio::select! {
148 _ = interval.tick() => {
149 for s in &sups {
150 let uptime = s.uptime_secs().await;
151 m.set_uptime(s.name(), uptime);
152 }
153 }
154 _ = sd.changed() => break,
155 }
156 }
157 }))
158 } else {
159 None
160 };
161
162 info!("process plugin running");
163
164 let mut sd = ctx.shutdown.clone();
165 sd.changed().await.ok();
166 for h in handles {
167 let _ = h.await;
168 }
169 if let Some(h) = uptime_handle {
170 h.abort();
171 }
172 Ok(())
173 }
174
175 fn rpc_methods(&self) -> Vec<RpcMethodDef> {
176 vec![
177 RpcMethodDef::new("process.list", "list supervised process statuses"),
178 RpcMethodDef::new("process.restart", "restart a named process"),
179 ]
180 }
181}