Skip to main content

zeph_core/
daemon.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Daemon supervisor for component lifecycle management.
5
6use std::time::Duration;
7
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use crate::config::DaemonConfig;
12
/// Lifecycle state recorded for a supervised component.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ComponentStatus {
    /// The component's task is believed to be running; this is the state a freshly
    /// created `ComponentHandle` starts in.
    Running,
    /// The component is considered failed; the payload is a human-readable reason.
    Failed(String),
    /// Presumably marks a deliberately stopped component — the code visible in this
    /// file never assigns it, so confirm against callers.
    Stopped,
}
20
21#[non_exhaustive]
22/// Error type for daemon component task failures.
23#[derive(Debug, thiserror::Error)]
24pub enum DaemonError {
25    #[error("task error: {0}")]
26    Task(String),
27    #[error("shutdown error: {0}")]
28    Shutdown(String),
29}
30
31pub struct ComponentHandle {
32    pub name: String,
33    handle: JoinHandle<Result<(), DaemonError>>,
34    pub status: ComponentStatus,
35    pub restart_count: u32,
36}
37
38impl ComponentHandle {
39    #[must_use]
40    pub fn new(name: impl Into<String>, handle: JoinHandle<Result<(), DaemonError>>) -> Self {
41        Self {
42            name: name.into(),
43            handle,
44            status: ComponentStatus::Running,
45            restart_count: 0,
46        }
47    }
48
49    #[must_use]
50    pub fn is_finished(&self) -> bool {
51        self.handle.is_finished()
52    }
53}
54
55pub struct DaemonSupervisor {
56    components: Vec<ComponentHandle>,
57    health_interval: Duration,
58    _max_backoff: Duration,
59    shutdown_rx: watch::Receiver<bool>,
60}
61
62impl DaemonSupervisor {
63    #[must_use]
64    pub fn new(config: &DaemonConfig, shutdown_rx: watch::Receiver<bool>) -> Self {
65        Self {
66            components: Vec::new(),
67            health_interval: Duration::from_secs(config.health_interval_secs),
68            _max_backoff: Duration::from_secs(config.max_restart_backoff_secs),
69            shutdown_rx,
70        }
71    }
72
73    pub fn add_component(&mut self, handle: ComponentHandle) {
74        self.components.push(handle);
75    }
76
77    #[must_use]
78    pub fn component_count(&self) -> usize {
79        self.components.len()
80    }
81
82    /// Run the health monitoring loop until shutdown signal.
83    pub async fn run(&mut self) {
84        let mut interval = tokio::time::interval(self.health_interval);
85        loop {
86            tokio::select! {
87                _ = interval.tick() => {
88                    self.check_health();
89                }
90                _ = self.shutdown_rx.changed() => {
91                    if *self.shutdown_rx.borrow() {
92                        tracing::info!("daemon supervisor shutting down");
93                        break;
94                    }
95                }
96            }
97        }
98    }
99
100    fn check_health(&mut self) {
101        for component in &mut self.components {
102            if component.status == ComponentStatus::Running && component.is_finished() {
103                component.status = ComponentStatus::Failed("task exited".into());
104                component.restart_count += 1;
105                tracing::warn!(
106                    component = %component.name,
107                    restarts = component.restart_count,
108                    "component exited unexpectedly"
109                );
110            }
111        }
112    }
113
114    #[must_use]
115    pub fn component_statuses(&self) -> Vec<(&str, &ComponentStatus)> {
116        self.components
117            .iter()
118            .map(|c| (c.name.as_str(), &c.status))
119            .collect()
120    }
121}
122
/// Report whether a process with the given PID currently exists.
///
/// * Unix: runs `kill -0 <pid>`, which succeeds only when the process exists and the
///   current user is allowed to signal it. PIDs that are zero or do not fit in a
///   positive `i32` are reported as not alive without running anything.
/// * Windows: runs `tasklist /FI "PID eq <pid>" /NH /FO CSV` and looks for the PID as a
///   quoted field in the output.
/// * Other platforms: always `false`.
///
/// Failure to launch the helper command is reported as `false`.
#[must_use]
pub fn is_process_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        // `pid_t` is a signed 32-bit value. A `u32` outside the positive `i32` range
        // would turn into zero or a negative target, which `kill` interprets as a
        // process group (or every process) rather than one PID — never pass those on.
        match i32::try_from(pid) {
            Ok(target) if target > 0 => {
                let target = target.to_string();
                std::process::Command::new("kill")
                    .arg("-0")
                    .arg(&target)
                    .output()
                    .is_ok_and(|out| out.status.success())
            }
            _ => false,
        }
    }
    #[cfg(windows)]
    {
        let filter = format!("PID eq {pid}");
        // tasklist prints CSV rows such as: "process.exe","PID","..."
        // so a live process shows up with its PID as a quoted field.
        let quoted_pid = format!("\"{pid}\"");
        match std::process::Command::new("tasklist")
            .args(["/FI", filter.as_str(), "/NH", "/FO", "CSV"])
            .output()
        {
            Ok(out) => String::from_utf8_lossy(&out.stdout).contains(&quoted_pid),
            Err(_) => false,
        }
    }
    #[cfg(not(any(unix, windows)))]
    {
        let _ = pid;
        false
    }
}
164
165/// Write a PID file atomically using `O_CREAT | O_EXCL` to prevent TOCTOU races.
166///
167/// # Errors
168///
169/// Returns an error if the PID file directory cannot be created, the file already exists,
170/// or the file cannot be written.
171pub fn write_pid_file(path: &str) -> std::io::Result<()> {
172    use std::io::Write as _;
173    let expanded = expand_tilde(path);
174    let path = std::path::Path::new(&expanded);
175    if let Some(parent) = path.parent() {
176        std::fs::create_dir_all(parent)?;
177    }
178    let mut file = std::fs::OpenOptions::new()
179        .write(true)
180        .create_new(true)
181        .open(path)?;
182    file.write_all(std::process::id().to_string().as_bytes())
183}
184
185/// Read the PID from a PID file.
186///
187/// # Errors
188///
189/// Returns an error if the file cannot be read or the content is not a valid PID.
190pub fn read_pid_file(path: &str) -> std::io::Result<u32> {
191    let expanded = expand_tilde(path);
192    let content = std::fs::read_to_string(&expanded)?;
193    content
194        .trim()
195        .parse::<u32>()
196        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
197}
198
199/// Remove the PID file.
200///
201/// # Errors
202///
203/// Returns an error if the file cannot be removed.
204pub fn remove_pid_file(path: &str) -> std::io::Result<()> {
205    let expanded = expand_tilde(path);
206    match std::fs::remove_file(&expanded) {
207        Ok(()) => Ok(()),
208        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
209        Err(e) => Err(e),
210    }
211}
212
/// Replace a leading `~/` with the user's home directory.
///
/// The home directory is taken from `HOME`, falling back to `USERPROFILE`; a non-UTF-8
/// value is converted lossily. Paths that do not start with `~/` — and every path when
/// neither variable is set — are returned unchanged.
fn expand_tilde(path: &str) -> String {
    path.strip_prefix("~/")
        .and_then(|rest| {
            // Only consult the environment once we know there is something to expand.
            let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"))?;
            Some(format!("{}/{rest}", home.to_string_lossy()))
        })
        .unwrap_or_else(|| path.to_owned())
}
221
#[cfg(test)]
mod tests {
    use super::*;

    /// `~/` is expanded when a home directory variable is available and left alone
    /// otherwise, so the test holds in environments without `HOME`/`USERPROFILE`.
    #[test]
    fn expand_tilde_with_home() {
        let result = expand_tilde("~/test/file.pid");
        let has_home =
            std::env::var_os("HOME").is_some() || std::env::var_os("USERPROFILE").is_some();
        if has_home {
            assert!(!result.starts_with("~/"));
            assert!(result.ends_with("/test/file.pid"));
        } else {
            assert_eq!(result, "~/test/file.pid");
        }
    }

    #[test]
    fn expand_tilde_absolute_unchanged() {
        assert_eq!(expand_tilde("/tmp/zeph.pid"), "/tmp/zeph.pid");
    }

    #[test]
    fn pid_file_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.pid");
        let path_str = path.to_string_lossy().to_string();

        write_pid_file(&path_str).unwrap();
        let pid = read_pid_file(&path_str).unwrap();
        assert_eq!(pid, std::process::id());
        remove_pid_file(&path_str).unwrap();
        assert!(!path.exists());
    }

    /// The PID file is created exclusively: a second write must not overwrite it.
    #[test]
    fn write_pid_file_refuses_existing_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("dup.pid");
        let path_str = path.to_string_lossy().to_string();

        write_pid_file(&path_str).unwrap();
        let err = write_pid_file(&path_str).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);
    }

    #[test]
    fn remove_nonexistent_pid_file_ok() {
        // A path inside a fresh temp dir is guaranteed not to exist, unlike a fixed
        // name under /tmp.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("missing.pid");
        assert!(remove_pid_file(&path.to_string_lossy()).is_ok());
    }

    #[test]
    fn read_invalid_pid_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("bad.pid");
        std::fs::write(&path, "not_a_number").unwrap();
        assert!(read_pid_file(&path.to_string_lossy()).is_err());
    }

    #[tokio::test]
    async fn supervisor_tracks_components() {
        let config = DaemonConfig::default();
        let (_tx, rx) = watch::channel(false);
        let mut supervisor = DaemonSupervisor::new(&config, rx);

        let handle = tokio::spawn(async { Ok::<(), DaemonError>(()) });
        supervisor.add_component(ComponentHandle::new("test", handle));
        assert_eq!(supervisor.component_count(), 1);
    }

    #[tokio::test]
    async fn supervisor_detects_finished_component() {
        let config = DaemonConfig::default();
        let (_tx, rx) = watch::channel(false);
        let mut supervisor = DaemonSupervisor::new(&config, rx);

        let handle = tokio::spawn(async { Ok::<(), DaemonError>(()) });
        // Yield until the task has really completed instead of hoping a fixed sleep
        // is long enough.
        while !handle.is_finished() {
            tokio::task::yield_now().await;
        }
        supervisor.add_component(ComponentHandle::new("finished", handle));
        supervisor.check_health();

        let statuses = supervisor.component_statuses();
        assert_eq!(statuses.len(), 1);
        assert!(matches!(statuses[0].1, ComponentStatus::Failed(_)));
    }

    #[tokio::test]
    async fn supervisor_shutdown() {
        let config = DaemonConfig {
            health_interval_secs: 1,
            ..DaemonConfig::default()
        };
        let (tx, rx) = watch::channel(false);
        let mut supervisor = DaemonSupervisor::new(&config, rx);

        let run_handle = tokio::spawn(async move { supervisor.run().await });
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = tx.send(true);
        tokio::time::timeout(Duration::from_secs(2), run_handle)
            .await
            .expect("supervisor should stop on shutdown")
            .expect("task should complete");
    }

    #[test]
    fn component_status_eq() {
        assert_eq!(ComponentStatus::Running, ComponentStatus::Running);
        assert_eq!(ComponentStatus::Stopped, ComponentStatus::Stopped);
        assert_ne!(ComponentStatus::Running, ComponentStatus::Stopped);
    }

    #[test]
    fn is_process_alive_current_process() {
        let pid = std::process::id();
        assert!(is_process_alive(pid), "current process must be alive");
    }

    #[test]
    fn is_process_alive_nonexistent_pid() {
        // u32::MAX is effectively guaranteed to not be a valid running PID.
        assert!(
            !is_process_alive(u32::MAX),
            "PID u32::MAX must not be alive"
        );
    }
}