Skip to main content

folk_plugin_process/
supervisor.rs

1use std::process::Stdio;
2use std::sync::Arc;
3use std::time::Instant;
4
5use tokio::process::Command;
6use tokio::sync::{Mutex, Notify, watch};
7use tracing::{error, info, warn};
8
9use crate::config::{OutputTarget, ProcessDef};
10use crate::metrics::ProcessMetrics;
11
/// Lifecycle state of a supervised process, as published by its supervisor.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ProcessStatus {
    /// Initial state of a freshly created supervisor.
    Starting,
    /// Set by the supervisor loop for each launch of the child.
    Running,
    /// The child was stopped in response to a shutdown request.
    Stopped,
    /// Supervision ended without a shutdown request: the command could not be
    /// parsed, built or spawned, or the child exited and was not restarted.
    Failed {
        /// Number of automatic restarts performed before supervision ended.
        restarts: u32,
    },
}
19
20pub struct ProcessSupervisor {
21    def: ProcessDef,
22    status: Arc<Mutex<ProcessStatus>>,
23    restart_notify: Arc<Notify>,
24    started_at: Arc<Mutex<Option<Instant>>>,
25    last_exit_code: Arc<Mutex<Option<i32>>>,
26}
27
28impl ProcessSupervisor {
29    pub fn new(def: ProcessDef) -> Self {
30        Self {
31            def,
32            status: Arc::new(Mutex::new(ProcessStatus::Starting)),
33            restart_notify: Arc::new(Notify::new()),
34            started_at: Arc::new(Mutex::new(None)),
35            last_exit_code: Arc::new(Mutex::new(None)),
36        }
37    }
38
39    pub fn name(&self) -> &str {
40        &self.def.name
41    }
42
43    pub async fn status(&self) -> ProcessStatus {
44        *self.status.lock().await
45    }
46
47    pub async fn uptime_secs(&self) -> f64 {
48        self.started_at
49            .lock()
50            .await
51            .map(|t| t.elapsed().as_secs_f64())
52            .unwrap_or(0.0)
53    }
54
55    pub async fn last_exit_code(&self) -> Option<i32> {
56        *self.last_exit_code.lock().await
57    }
58
59    pub fn request_restart(&self) {
60        self.restart_notify.notify_one();
61    }
62
63    fn build_command(&self, parts: &[String]) -> Result<Command, String> {
64        let (prog, args) = parts
65            .split_first()
66            .ok_or_else(|| "empty command".to_string())?;
67
68        let mut cmd = Command::new(prog);
69        cmd.args(args);
70
71        if let Some(dir) = &self.def.directory {
72            cmd.current_dir(dir);
73        }
74
75        if !self.def.env.is_empty() {
76            cmd.envs(&self.def.env);
77        }
78
79        cmd.stdout(output_target_to_stdio(&self.def.logging.stdout));
80        cmd.stderr(output_target_to_stdio(&self.def.logging.stderr));
81
82        Ok(cmd)
83    }
84
85    pub async fn run(
86        self: Arc<Self>,
87        mut shutdown: watch::Receiver<bool>,
88        metrics: Option<Arc<ProcessMetrics>>,
89    ) {
90        let mut restarts: u32 = 0;
91
92        let parts = match shell_words::split(&self.def.command) {
93            Ok(p) if !p.is_empty() => p,
94            Ok(_) => {
95                error!(process = %self.def.name, "empty command");
96                *self.status.lock().await = ProcessStatus::Failed { restarts: 0 };
97                return;
98            }
99            Err(e) => {
100                error!(process = %self.def.name, error = %e, "command parse failed");
101                *self.status.lock().await = ProcessStatus::Failed { restarts: 0 };
102                return;
103            }
104        };
105
106        loop {
107            *self.status.lock().await = ProcessStatus::Running;
108            *self.started_at.lock().await = Some(Instant::now());
109
110            if let Some(m) = &metrics {
111                m.set_up(&self.def.name, true);
112                m.set_status(&self.def.name, "running");
113            }
114
115            let mut cmd = match self.build_command(&parts) {
116                Ok(c) => c,
117                Err(e) => {
118                    error!(process = %self.def.name, error = %e, "build command failed");
119                    *self.status.lock().await = ProcessStatus::Failed { restarts };
120                    if let Some(m) = &metrics {
121                        m.set_up(&self.def.name, false);
122                        m.set_status(&self.def.name, "failed");
123                    }
124                    return;
125                }
126            };
127
128            let mut child = match cmd.spawn() {
129                Ok(c) => c,
130                Err(e) => {
131                    error!(process = %self.def.name, error = ?e, "spawn failed");
132                    *self.status.lock().await = ProcessStatus::Failed { restarts };
133                    if let Some(m) = &metrics {
134                        m.set_up(&self.def.name, false);
135                        m.set_status(&self.def.name, "failed");
136                    }
137                    return;
138                }
139            };
140
141            info!(process = %self.def.name, pid = ?child.id(), "started");
142
143            tokio::select! {
144                status = child.wait() => {
145                    let code = status.map(|s| s.code()).unwrap_or(None);
146                    *self.last_exit_code.lock().await = code;
147
148                    if let Some(m) = &metrics {
149                        if let Some(c) = code {
150                            m.set_exit_code(&self.def.name, c);
151                        }
152                    }
153
154                    let should_restart = match self.def.restart {
155                        crate::config::RestartPolicy::Always => true,
156                        crate::config::RestartPolicy::OnFailure => code != Some(0),
157                        crate::config::RestartPolicy::Never => false,
158                    };
159
160                    if !should_restart || restarts >= self.def.max_restarts {
161                        warn!(process = %self.def.name, exit_code = ?code, "not restarting");
162                        *self.status.lock().await = ProcessStatus::Failed { restarts };
163                        if let Some(m) = &metrics {
164                            m.set_up(&self.def.name, false);
165                            m.set_status(&self.def.name, "stopped");
166                        }
167                        return;
168                    }
169
170                    restarts += 1;
171                    warn!(process = %self.def.name, restarts, "restarting");
172                    if let Some(m) = &metrics {
173                        m.inc_restarts(&self.def.name);
174                        m.set_up(&self.def.name, false);
175                    }
176                    tokio::time::sleep(self.def.restart_delay).await;
177                }
178                _ = self.restart_notify.notified() => {
179                    info!(process = %self.def.name, "restart requested");
180                    kill_and_wait(&mut child, &self.def).await;
181                    if let Some(m) = &metrics {
182                        m.inc_restarts(&self.def.name);
183                    }
184                    // Loop continues — will spawn again
185                }
186                _ = shutdown.changed() => {
187                    if *shutdown.borrow() {
188                        kill_and_wait(&mut child, &self.def).await;
189                        *self.status.lock().await = ProcessStatus::Stopped;
190                        if let Some(m) = &metrics {
191                            m.set_up(&self.def.name, false);
192                            m.set_status(&self.def.name, "stopped");
193                        }
194                        return;
195                    }
196                }
197            }
198        }
199    }
200}
201
202async fn kill_and_wait(child: &mut tokio::process::Child, def: &ProcessDef) {
203    #[cfg(unix)]
204    if let Some(pid) = child.id() {
205        unsafe {
206            libc::kill(pid as libc::pid_t, def.stop_signal.as_libc_signal());
207        }
208    }
209    let _ = tokio::time::timeout(def.stop_timeout, child.wait()).await;
210}
211
212fn output_target_to_stdio(target: &OutputTarget) -> Stdio {
213    match target {
214        OutputTarget::Inherit => Stdio::inherit(),
215        OutputTarget::Null => Stdio::null(),
216        OutputTarget::File(path) => {
217            match std::fs::OpenOptions::new()
218                .create(true)
219                .append(true)
220                .open(path)
221            {
222                Ok(f) => Stdio::from(f),
223                Err(e) => {
224                    tracing::warn!(path = %path.display(), error = %e, "failed to open log file, falling back to inherit");
225                    Stdio::inherit()
226                }
227            }
228        }
229    }
230}