folk_plugin_process/
supervisor.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::process::Command;
5use tokio::sync::Mutex;
6use tokio::sync::watch;
7use tracing::{error, info, warn};
8
9use crate::config::{ProcessDef, RestartPolicy};
10
/// Lifecycle state of a supervised process, as reported by its supervisor.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProcessStatus {
    /// Initial state assigned when the supervisor is constructed.
    Starting,

    /// Set by the supervisor's run loop while it is managing a child process.
    Running,

    /// The supervisor stopped in response to a shutdown signal.
    Stopped,

    /// The process could not be spawned, or it exited and the supervisor
    /// decided not to restart it.
    Failed {
        /// Number of restarts performed before the supervisor gave up.
        restarts: u32,
    },
}
18
19pub struct ProcessSupervisor {
20 def: ProcessDef,
21 status: Arc<Mutex<ProcessStatus>>,
22}
23
24impl ProcessSupervisor {
25 pub fn new(def: ProcessDef) -> Self {
26 Self {
27 def,
28 status: Arc::new(Mutex::new(ProcessStatus::Starting)),
29 }
30 }
31
32 pub fn name(&self) -> &str {
33 &self.def.name
34 }
35
36 pub async fn status(&self) -> ProcessStatus {
37 *self.status.lock().await
38 }
39
40 pub async fn run(self: Arc<Self>, mut shutdown: watch::Receiver<bool>) {
41 let mut restarts: u32 = 0;
42
43 loop {
44 *self.status.lock().await = ProcessStatus::Running;
45
46 let parts: Vec<&str> = self.def.command.split_whitespace().collect();
47 let (prog, args) = parts.split_first().unwrap_or((&"true", &[]));
48
49 let mut child = match Command::new(prog).args(args).spawn() {
50 Ok(c) => c,
51 Err(e) => {
52 error!(process = %self.def.name, error = ?e, "spawn failed");
53 *self.status.lock().await = ProcessStatus::Failed { restarts };
54 return;
55 }
56 };
57
58 info!(process = %self.def.name, pid = ?child.id(), "started");
59
60 tokio::select! {
61 status = child.wait() => {
62 let code = status.map(|s| s.code()).unwrap_or(None);
63 let should_restart = match self.def.restart {
64 RestartPolicy::Always => true,
65 RestartPolicy::OnFailure => code != Some(0),
66 RestartPolicy::Never => false,
67 };
68
69 if !should_restart || restarts >= self.def.max_restarts {
70 warn!(process = %self.def.name, "not restarting");
71 *self.status.lock().await = ProcessStatus::Failed { restarts };
72 return;
73 }
74
75 restarts += 1;
76 warn!(process = %self.def.name, restarts, "restarting");
77 tokio::time::sleep(self.def.restart_delay).await;
78 }
79 _ = shutdown.changed() => {
80 if *shutdown.borrow() {
81 #[cfg(unix)]
82 if let Some(pid) = child.id() {
83 unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM); }
84 }
85 let _ = tokio::time::timeout(Duration::from_secs(5), child.wait()).await;
86 *self.status.lock().await = ProcessStatus::Stopped;
87 return;
88 }
89 }
90 }
91 }
92 }
93}