Skip to main content

folk_plugin_process/
supervisor.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use tokio::process::Command;
5use tokio::sync::Mutex;
6use tokio::sync::watch;
7use tracing::{error, info, warn};
8
9use crate::config::{ProcessDef, RestartPolicy};
10
/// Lifecycle state of a supervised process, as reported by its supervisor.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProcessStatus {
    /// Initial state, before the child process is up.
    Starting,
    /// The supervisor has launched (or is launching) the child process.
    Running,
    /// The child was terminated in response to a shutdown request.
    Stopped,
    /// The supervisor gave up on the process: spawning failed, or the process
    /// exited and was not restarted. `restarts` is the number of restarts
    /// that had been performed up to that point.
    Failed { restarts: u32 },
}
18
19pub struct ProcessSupervisor {
20    def: ProcessDef,
21    status: Arc<Mutex<ProcessStatus>>,
22}
23
24impl ProcessSupervisor {
25    pub fn new(def: ProcessDef) -> Self {
26        Self {
27            def,
28            status: Arc::new(Mutex::new(ProcessStatus::Starting)),
29        }
30    }
31
32    pub fn name(&self) -> &str {
33        &self.def.name
34    }
35
36    pub async fn status(&self) -> ProcessStatus {
37        *self.status.lock().await
38    }
39
40    pub async fn run(self: Arc<Self>, mut shutdown: watch::Receiver<bool>) {
41        let mut restarts: u32 = 0;
42
43        loop {
44            *self.status.lock().await = ProcessStatus::Running;
45
46            let parts: Vec<&str> = self.def.command.split_whitespace().collect();
47            let (prog, args) = parts.split_first().unwrap_or((&"true", &[]));
48
49            let mut child = match Command::new(prog).args(args).spawn() {
50                Ok(c) => c,
51                Err(e) => {
52                    error!(process = %self.def.name, error = ?e, "spawn failed");
53                    *self.status.lock().await = ProcessStatus::Failed { restarts };
54                    return;
55                }
56            };
57
58            info!(process = %self.def.name, pid = ?child.id(), "started");
59
60            tokio::select! {
61                status = child.wait() => {
62                    let code = status.map(|s| s.code()).unwrap_or(None);
63                    let should_restart = match self.def.restart {
64                        RestartPolicy::Always => true,
65                        RestartPolicy::OnFailure => code != Some(0),
66                        RestartPolicy::Never => false,
67                    };
68
69                    if !should_restart || restarts >= self.def.max_restarts {
70                        warn!(process = %self.def.name, "not restarting");
71                        *self.status.lock().await = ProcessStatus::Failed { restarts };
72                        return;
73                    }
74
75                    restarts += 1;
76                    warn!(process = %self.def.name, restarts, "restarting");
77                    tokio::time::sleep(self.def.restart_delay).await;
78                }
79                _ = shutdown.changed() => {
80                    if *shutdown.borrow() {
81                        #[cfg(unix)]
82                        if let Some(pid) = child.id() {
83                            unsafe { libc::kill(pid as libc::pid_t, libc::SIGTERM); }
84                        }
85                        let _ = tokio::time::timeout(Duration::from_secs(5), child.wait()).await;
86                        *self.status.lock().await = ProcessStatus::Stopped;
87                        return;
88                    }
89                }
90            }
91        }
92    }
93}