Skip to main content

greentic_start/
supervisor.rs

1#![allow(dead_code)]
2
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::{Duration, Instant};

use anyhow::Context;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sysinfo::{Pid, ProcessStatus, ProcessesToUpdate, System};

use crate::runtime_state::{RuntimePaths, read_json, write_json};
13
14#[derive(Clone, Debug, Eq, PartialEq, Hash)]
15pub struct ServiceId(String);
16
17impl ServiceId {
18    pub fn new(value: impl Into<String>) -> anyhow::Result<Self> {
19        let value = value.into();
20        if value.is_empty() {
21            return Err(anyhow::anyhow!("service id cannot be empty"));
22        }
23        if !value
24            .chars()
25            .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_')
26        {
27            return Err(anyhow::anyhow!(
28                "invalid service id '{}'; use alphanumeric, '-' or '_'",
29                value
30            ));
31        }
32        Ok(Self(value))
33    }
34
35    pub fn as_str(&self) -> &str {
36        &self.0
37    }
38}
39
40#[derive(Clone, Debug)]
41pub struct ServiceSpec {
42    pub id: ServiceId,
43    pub argv: Vec<String>,
44    pub cwd: Option<PathBuf>,
45    pub env: BTreeMap<String, String>,
46}
47
48#[derive(Clone, Debug)]
49pub struct ServiceHandle {
50    pub id: ServiceId,
51    pub pid: u32,
52    pub started_at: DateTime<Utc>,
53    pub log_path: PathBuf,
54}
55
56#[derive(Clone, Debug)]
57pub struct ServiceStatus {
58    pub id: ServiceId,
59    pub running: bool,
60    pub pid: Option<u32>,
61    pub log_path: PathBuf,
62    pub last_error: Option<String>,
63}
64
65#[derive(Clone, Debug, Serialize, Deserialize)]
66pub struct ResolvedService {
67    pub argv: Vec<String>,
68    pub cwd: Option<PathBuf>,
69    pub env: BTreeMap<String, String>,
70    #[serde(default)]
71    pub log_path: Option<PathBuf>,
72}
73
74pub fn spawn_service(
75    paths: &RuntimePaths,
76    spec: ServiceSpec,
77    log_path_override: Option<PathBuf>,
78) -> anyhow::Result<ServiceHandle> {
79    if spec.argv.is_empty() {
80        return Err(anyhow::anyhow!("service argv cannot be empty"));
81    }
82    let pid_path = paths.pid_path(spec.id.as_str());
83    if let Some(pid) = read_pid(&pid_path)?
84        && is_running(pid)
85    {
86        return Err(anyhow::anyhow!(
87            "service {} already running (pid {})",
88            spec.id.as_str(),
89            pid
90        ));
91    }
92
93    let log_path = log_path_override.unwrap_or_else(|| paths.log_path(spec.id.as_str()));
94    if let Some(parent) = log_path.parent() {
95        std::fs::create_dir_all(parent)?;
96    }
97    let log_file = std::fs::OpenOptions::new()
98        .create(true)
99        .append(true)
100        .open(&log_path)?;
101    let log_err = log_file.try_clone()?;
102
103    let mut command = Command::new(&spec.argv[0]);
104    if spec.argv.len() > 1 {
105        command.args(&spec.argv[1..]);
106    }
107    if let Some(cwd) = &spec.cwd {
108        command.current_dir(cwd);
109    }
110    command.envs(spec.env.iter());
111    let child = command
112        .stdout(Stdio::from(log_file))
113        .stderr(Stdio::from(log_err))
114        .spawn()?;
115
116    let pid = child.id();
117    std::fs::create_dir_all(paths.pids_dir())?;
118    std::fs::write(&pid_path, pid.to_string())?;
119
120    let resolved = ResolvedService {
121        argv: spec.argv.clone(),
122        cwd: spec.cwd.clone(),
123        env: spec.env.clone(),
124        log_path: Some(log_path.clone()),
125    };
126    write_json(&paths.resolved_path(spec.id.as_str()), &resolved)?;
127
128    Ok(ServiceHandle {
129        id: spec.id,
130        pid,
131        started_at: Utc::now(),
132        log_path,
133    })
134}
135
136pub fn stop_service(
137    paths: &RuntimePaths,
138    id: &ServiceId,
139    graceful_timeout_ms: u64,
140) -> anyhow::Result<()> {
141    let pid_path = paths.pid_path(id.as_str());
142    stop_pidfile(&pid_path, graceful_timeout_ms)
143}
144
145pub fn stop_pidfile(pid_path: &Path, graceful_timeout_ms: u64) -> anyhow::Result<()> {
146    let pid = match read_pid(pid_path)? {
147        Some(pid) => pid,
148        None => return Ok(()),
149    };
150
151    if !is_running(pid) {
152        let _ = std::fs::remove_file(pid_path);
153        return Ok(());
154    }
155
156    terminate_process(pid, graceful_timeout_ms)?;
157    let _ = std::fs::remove_file(pid_path);
158    Ok(())
159}
160
161pub fn read_status(paths: &RuntimePaths) -> anyhow::Result<Vec<ServiceStatus>> {
162    let mut statuses = Vec::new();
163    let pids_dir = paths.pids_dir();
164    if !pids_dir.exists() {
165        return Ok(statuses);
166    }
167    for entry in std::fs::read_dir(&pids_dir)? {
168        let entry = entry?;
169        if !entry.file_type()?.is_file() {
170            continue;
171        }
172        let path = entry.path();
173        if path.extension().and_then(|ext| ext.to_str()) != Some("pid") {
174            continue;
175        }
176        let file_name = entry.file_name();
177        let Some(stem) = Path::new(&file_name).file_stem().and_then(|s| s.to_str()) else {
178            continue;
179        };
180        let id = ServiceId::new(stem.to_string())?;
181        let pid = read_pid(&path)?;
182        let running = pid.map(is_running).unwrap_or(false);
183        let log_path = if let Some(resolved) = read_resolved(paths, &id)? {
184            resolved
185                .log_path
186                .or_else(|| Some(paths.log_path(stem)))
187                .unwrap()
188        } else {
189            paths.log_path(stem)
190        };
191        statuses.push(ServiceStatus {
192            id,
193            running,
194            pid,
195            log_path,
196            last_error: None,
197        });
198    }
199    Ok(statuses)
200}
201
202pub fn read_resolved(
203    paths: &RuntimePaths,
204    id: &ServiceId,
205) -> anyhow::Result<Option<ResolvedService>> {
206    read_json(&paths.resolved_path(id.as_str()))
207}
208
209pub fn is_running(pid: u32) -> bool {
210    let mut system = System::new();
211    let pid = Pid::from_u32(pid);
212    system.refresh_processes(ProcessesToUpdate::Some(&[pid]), true);
213    system.process(pid).is_some()
214}
215
216fn read_pid(pid_path: &Path) -> anyhow::Result<Option<u32>> {
217    if !pid_path.exists() {
218        return Ok(None);
219    }
220    let contents = std::fs::read_to_string(pid_path)?;
221    let trimmed = contents.trim();
222    if trimmed.is_empty() {
223        return Ok(None);
224    }
225    Ok(Some(trimmed.parse()?))
226}
227
228fn terminate_process(pid: u32, graceful_timeout_ms: u64) -> anyhow::Result<()> {
229    #[cfg(unix)]
230    {
231        let _ = unsafe { libc::kill(pid as i32, libc::SIGTERM) };
232        let deadline = Instant::now() + Duration::from_millis(graceful_timeout_ms);
233        while Instant::now() < deadline {
234            if !is_running(pid) {
235                return Ok(());
236            }
237            std::thread::sleep(Duration::from_millis(50));
238        }
239        let _ = unsafe { libc::kill(pid as i32, libc::SIGKILL) };
240        Ok(())
241    }
242
243    #[cfg(windows)]
244    {
245        let _ = graceful_timeout_ms;
246        let _ = Command::new("taskkill")
247            .args(["/PID", &pid.to_string(), "/T", "/F"])
248            .status();
249        Ok(())
250    }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Runtime paths for the `demo`/`default` pair rooted at `<root>/state`.
    fn demo_paths(root: &Path) -> RuntimePaths {
        RuntimePaths::new(root.join("state"), "demo", "default")
    }

    #[test]
    fn service_id_validates_expected_formats() {
        let valid = ServiceId::new("worker_1").expect("service id");
        assert_eq!(valid.as_str(), "worker_1");
        for bad in ["", "bad/id"] {
            assert!(ServiceId::new(bad).is_err(), "{bad:?} should be rejected");
        }
    }

    #[test]
    fn read_status_returns_empty_without_pid_directory() {
        let tmp = tempdir().expect("tempdir");
        let statuses = read_status(&demo_paths(tmp.path())).expect("read status");
        assert!(statuses.is_empty());
    }

    #[test]
    fn read_status_uses_resolved_log_path_and_running_flag() {
        let tmp = tempdir().expect("tempdir");
        let paths = demo_paths(tmp.path());
        let id = ServiceId::new("svc").expect("id");
        let custom_log = tmp.path().join("custom.log");

        std::fs::create_dir_all(paths.pids_dir()).expect("pids dir");
        // 999999 is assumed not to be a live pid on the test machine.
        std::fs::write(paths.pid_path(id.as_str()), "999999").expect("pid");
        let resolved = ResolvedService {
            argv: vec!["echo".to_string()],
            cwd: None,
            env: BTreeMap::new(),
            log_path: Some(custom_log.clone()),
        };
        write_json(&paths.resolved_path(id.as_str()), &resolved).expect("resolved");

        let statuses = read_status(&paths).expect("statuses");
        let [status] = statuses.as_slice() else {
            panic!("expected exactly one status, got {}", statuses.len());
        };
        assert_eq!(status.id.as_str(), "svc");
        assert!(!status.running);
        assert_eq!(status.pid, Some(999999));
        assert_eq!(status.log_path, custom_log);
    }

    #[test]
    fn read_resolved_returns_none_for_missing_file() {
        let tmp = tempdir().expect("tempdir");
        let paths = demo_paths(tmp.path());
        let id = ServiceId::new("svc").expect("id");
        let resolved = read_resolved(&paths, &id).expect("read resolved");
        assert!(resolved.is_none());
    }

    #[test]
    fn stop_pidfile_and_read_pid_handle_missing_and_stale_files() {
        let tmp = tempdir().expect("tempdir");
        let pid_path = tmp.path().join("svc.pid");

        // No pid file: stopping is a no-op and there is nothing to read.
        stop_pidfile(&pid_path, 1).expect("missing pidfile");
        assert_eq!(read_pid(&pid_path).expect("missing pid"), None);

        // Stale pid file: stopping removes it.
        std::fs::write(&pid_path, "999999").expect("stale pid");
        stop_pidfile(&pid_path, 1).expect("stale pidfile");
        assert!(!pid_path.exists());
    }
}