// folk_plugin_process/supervisor.rs
use std::process::Stdio;
use std::sync::Arc;
use std::time::Instant;

use tokio::process::Command;
use tokio::sync::{Mutex, Notify, watch};
use tracing::{error, info, warn};

use crate::config::{OutputTarget, ProcessDef};
use crate::metrics::ProcessMetrics;
11
/// Lifecycle state of a supervised process as reported by its supervisor.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProcessStatus {
    /// The supervisor exists but has not yet reported the process as running.
    Starting,
    /// The supervisor considers the process to be running.
    Running,
    /// The process was stopped in response to a shutdown request.
    Stopped,
    /// Supervision ended without a shutdown request: the command could not be
    /// parsed or spawned, or the process exited and was not restarted.
    Failed {
        /// Number of automatic restarts performed before supervision ended.
        restarts: u32,
    },
}
19
20pub struct ProcessSupervisor {
21 def: ProcessDef,
22 status: Arc<Mutex<ProcessStatus>>,
23 restart_notify: Arc<Notify>,
24 started_at: Arc<Mutex<Option<Instant>>>,
25 last_exit_code: Arc<Mutex<Option<i32>>>,
26}
27
28impl ProcessSupervisor {
29 pub fn new(def: ProcessDef) -> Self {
30 Self {
31 def,
32 status: Arc::new(Mutex::new(ProcessStatus::Starting)),
33 restart_notify: Arc::new(Notify::new()),
34 started_at: Arc::new(Mutex::new(None)),
35 last_exit_code: Arc::new(Mutex::new(None)),
36 }
37 }
38
39 pub fn name(&self) -> &str {
40 &self.def.name
41 }
42
43 pub async fn status(&self) -> ProcessStatus {
44 *self.status.lock().await
45 }
46
47 pub async fn uptime_secs(&self) -> f64 {
48 self.started_at
49 .lock()
50 .await
51 .map(|t| t.elapsed().as_secs_f64())
52 .unwrap_or(0.0)
53 }
54
55 pub async fn last_exit_code(&self) -> Option<i32> {
56 *self.last_exit_code.lock().await
57 }
58
59 pub fn request_restart(&self) {
60 self.restart_notify.notify_one();
61 }
62
63 fn build_command(&self, parts: &[String]) -> Result<Command, String> {
64 let (prog, args) = parts
65 .split_first()
66 .ok_or_else(|| "empty command".to_string())?;
67
68 let mut cmd = Command::new(prog);
69 cmd.args(args);
70
71 if let Some(dir) = &self.def.directory {
72 cmd.current_dir(dir);
73 }
74
75 if !self.def.env.is_empty() {
76 cmd.envs(&self.def.env);
77 }
78
79 cmd.stdout(output_target_to_stdio(&self.def.logging.stdout));
80 cmd.stderr(output_target_to_stdio(&self.def.logging.stderr));
81
82 Ok(cmd)
83 }
84
85 pub async fn run(
86 self: Arc<Self>,
87 mut shutdown: watch::Receiver<bool>,
88 metrics: Option<Arc<ProcessMetrics>>,
89 ) {
90 let mut restarts: u32 = 0;
91
92 let parts = match shell_words::split(&self.def.command) {
93 Ok(p) if !p.is_empty() => p,
94 Ok(_) => {
95 error!(process = %self.def.name, "empty command");
96 *self.status.lock().await = ProcessStatus::Failed { restarts: 0 };
97 return;
98 }
99 Err(e) => {
100 error!(process = %self.def.name, error = %e, "command parse failed");
101 *self.status.lock().await = ProcessStatus::Failed { restarts: 0 };
102 return;
103 }
104 };
105
106 loop {
107 *self.status.lock().await = ProcessStatus::Running;
108 *self.started_at.lock().await = Some(Instant::now());
109
110 if let Some(m) = &metrics {
111 m.set_up(&self.def.name, true);
112 m.set_status(&self.def.name, "running");
113 }
114
115 let mut cmd = match self.build_command(&parts) {
116 Ok(c) => c,
117 Err(e) => {
118 error!(process = %self.def.name, error = %e, "build command failed");
119 *self.status.lock().await = ProcessStatus::Failed { restarts };
120 if let Some(m) = &metrics {
121 m.set_up(&self.def.name, false);
122 m.set_status(&self.def.name, "failed");
123 }
124 return;
125 }
126 };
127
128 let mut child = match cmd.spawn() {
129 Ok(c) => c,
130 Err(e) => {
131 error!(process = %self.def.name, error = ?e, "spawn failed");
132 *self.status.lock().await = ProcessStatus::Failed { restarts };
133 if let Some(m) = &metrics {
134 m.set_up(&self.def.name, false);
135 m.set_status(&self.def.name, "failed");
136 }
137 return;
138 }
139 };
140
141 info!(process = %self.def.name, pid = ?child.id(), "started");
142
143 tokio::select! {
144 status = child.wait() => {
145 let code = status.map(|s| s.code()).unwrap_or(None);
146 *self.last_exit_code.lock().await = code;
147
148 if let Some(m) = &metrics {
149 if let Some(c) = code {
150 m.set_exit_code(&self.def.name, c);
151 }
152 }
153
154 let should_restart = match self.def.restart {
155 crate::config::RestartPolicy::Always => true,
156 crate::config::RestartPolicy::OnFailure => code != Some(0),
157 crate::config::RestartPolicy::Never => false,
158 };
159
160 if !should_restart || restarts >= self.def.max_restarts {
161 warn!(process = %self.def.name, exit_code = ?code, "not restarting");
162 *self.status.lock().await = ProcessStatus::Failed { restarts };
163 if let Some(m) = &metrics {
164 m.set_up(&self.def.name, false);
165 m.set_status(&self.def.name, "stopped");
166 }
167 return;
168 }
169
170 restarts += 1;
171 warn!(process = %self.def.name, restarts, "restarting");
172 if let Some(m) = &metrics {
173 m.inc_restarts(&self.def.name);
174 m.set_up(&self.def.name, false);
175 }
176 tokio::time::sleep(self.def.restart_delay).await;
177 }
178 _ = self.restart_notify.notified() => {
179 info!(process = %self.def.name, "restart requested");
180 kill_and_wait(&mut child, &self.def).await;
181 if let Some(m) = &metrics {
182 m.inc_restarts(&self.def.name);
183 }
184 }
186 _ = shutdown.changed() => {
187 if *shutdown.borrow() {
188 kill_and_wait(&mut child, &self.def).await;
189 *self.status.lock().await = ProcessStatus::Stopped;
190 if let Some(m) = &metrics {
191 m.set_up(&self.def.name, false);
192 m.set_status(&self.def.name, "stopped");
193 }
194 return;
195 }
196 }
197 }
198 }
199 }
200}
201
202async fn kill_and_wait(child: &mut tokio::process::Child, def: &ProcessDef) {
203 #[cfg(unix)]
204 if let Some(pid) = child.id() {
205 unsafe {
206 libc::kill(pid as libc::pid_t, def.stop_signal.as_libc_signal());
207 }
208 }
209 let _ = tokio::time::timeout(def.stop_timeout, child.wait()).await;
210}
211
212fn output_target_to_stdio(target: &OutputTarget) -> Stdio {
213 match target {
214 OutputTarget::Inherit => Stdio::inherit(),
215 OutputTarget::Null => Stdio::null(),
216 OutputTarget::File(path) => {
217 match std::fs::OpenOptions::new()
218 .create(true)
219 .append(true)
220 .open(path)
221 {
222 Ok(f) => Stdio::from(f),
223 Err(e) => {
224 tracing::warn!(path = %path.display(), error = %e, "failed to open log file, falling back to inherit");
225 Stdio::inherit()
226 }
227 }
228 }
229 }
230}