Skip to main content

zeph_core/
daemon.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Daemon supervisor for component lifecycle management.
5
6use std::time::Duration;
7
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use crate::config::DaemonConfig;
12
/// Lifecycle state recorded for a supervised component.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ComponentStatus {
    /// The component's task is believed to be alive; this is the state every
    /// freshly created handle starts in.
    Running,
    /// The component's task was found finished; the payload is a
    /// human-readable reason.
    Failed(String),
    /// The component is stopped.
    // NOTE(review): nothing visible in this file assigns `Stopped`; presumably
    // it is set by callers for a deliberate stop — confirm.
    Stopped,
}
19
20pub struct ComponentHandle {
21    pub name: String,
22    handle: JoinHandle<anyhow::Result<()>>,
23    pub status: ComponentStatus,
24    pub restart_count: u32,
25}
26
27impl ComponentHandle {
28    #[must_use]
29    pub fn new(name: impl Into<String>, handle: JoinHandle<anyhow::Result<()>>) -> Self {
30        Self {
31            name: name.into(),
32            handle,
33            status: ComponentStatus::Running,
34            restart_count: 0,
35        }
36    }
37
38    #[must_use]
39    pub fn is_finished(&self) -> bool {
40        self.handle.is_finished()
41    }
42}
43
44pub struct DaemonSupervisor {
45    components: Vec<ComponentHandle>,
46    health_interval: Duration,
47    _max_backoff: Duration,
48    shutdown_rx: watch::Receiver<bool>,
49}
50
51impl DaemonSupervisor {
52    #[must_use]
53    pub fn new(config: &DaemonConfig, shutdown_rx: watch::Receiver<bool>) -> Self {
54        Self {
55            components: Vec::new(),
56            health_interval: Duration::from_secs(config.health_interval_secs),
57            _max_backoff: Duration::from_secs(config.max_restart_backoff_secs),
58            shutdown_rx,
59        }
60    }
61
62    pub fn add_component(&mut self, handle: ComponentHandle) {
63        self.components.push(handle);
64    }
65
66    #[must_use]
67    pub fn component_count(&self) -> usize {
68        self.components.len()
69    }
70
71    /// Run the health monitoring loop until shutdown signal.
72    pub async fn run(&mut self) {
73        let mut interval = tokio::time::interval(self.health_interval);
74        loop {
75            tokio::select! {
76                _ = interval.tick() => {
77                    self.check_health();
78                }
79                _ = self.shutdown_rx.changed() => {
80                    if *self.shutdown_rx.borrow() {
81                        tracing::info!("daemon supervisor shutting down");
82                        break;
83                    }
84                }
85            }
86        }
87    }
88
89    fn check_health(&mut self) {
90        for component in &mut self.components {
91            if component.status == ComponentStatus::Running && component.is_finished() {
92                component.status = ComponentStatus::Failed("task exited".into());
93                component.restart_count += 1;
94                tracing::warn!(
95                    component = %component.name,
96                    restarts = component.restart_count,
97                    "component exited unexpectedly"
98                );
99            }
100        }
101    }
102
103    #[must_use]
104    pub fn component_statuses(&self) -> Vec<(&str, &ComponentStatus)> {
105        self.components
106            .iter()
107            .map(|c| (c.name.as_str(), &c.status))
108            .collect()
109    }
110}
111
112/// Write a PID file atomically using `O_CREAT | O_EXCL` to prevent TOCTOU races.
113///
114/// # Errors
115///
116/// Returns an error if the PID file directory cannot be created, the file already exists,
117/// or the file cannot be written.
118pub fn write_pid_file(path: &str) -> std::io::Result<()> {
119    use std::io::Write as _;
120    let expanded = expand_tilde(path);
121    let path = std::path::Path::new(&expanded);
122    if let Some(parent) = path.parent() {
123        std::fs::create_dir_all(parent)?;
124    }
125    let mut file = std::fs::OpenOptions::new()
126        .write(true)
127        .create_new(true)
128        .open(path)?;
129    file.write_all(std::process::id().to_string().as_bytes())
130}
131
132/// Read the PID from a PID file.
133///
134/// # Errors
135///
136/// Returns an error if the file cannot be read or the content is not a valid PID.
137pub fn read_pid_file(path: &str) -> std::io::Result<u32> {
138    let expanded = expand_tilde(path);
139    let content = std::fs::read_to_string(&expanded)?;
140    content
141        .trim()
142        .parse::<u32>()
143        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
144}
145
146/// Remove the PID file.
147///
148/// # Errors
149///
150/// Returns an error if the file cannot be removed.
151pub fn remove_pid_file(path: &str) -> std::io::Result<()> {
152    let expanded = expand_tilde(path);
153    match std::fs::remove_file(&expanded) {
154        Ok(()) => Ok(()),
155        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
156        Err(e) => Err(e),
157    }
158}
159
/// Expands a leading `~/` to the user's home directory.
///
/// The home directory is taken from `HOME`, falling back to `USERPROFILE`.
/// Paths that do not start with `~/` (including a bare `~`), and any path
/// when neither variable is set, are returned unchanged.
fn expand_tilde(path: &str) -> String {
    let Some(rest) = path.strip_prefix("~/") else {
        return path.to_owned();
    };
    // The environment is only consulted once a `~/` prefix has been found.
    let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
    match home {
        Some(home) => format!("{}/{rest}", home.to_string_lossy()),
        None => path.to_owned(),
    }
}
168
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a supervisor from `config` and returns it together with the
    /// shutdown sender, so the caller decides how long the channel stays open.
    fn supervisor_with(config: &DaemonConfig) -> (watch::Sender<bool>, DaemonSupervisor) {
        let (tx, rx) = watch::channel(false);
        (tx, DaemonSupervisor::new(config, rx))
    }

    // --- path expansion -----------------------------------------------------

    #[test]
    fn expand_tilde_with_home() {
        let expanded = expand_tilde("~/test/file.pid");
        assert!(!expanded.starts_with("~/"));
    }

    #[test]
    fn expand_tilde_absolute_unchanged() {
        assert_eq!(expand_tilde("/tmp/zeph.pid"), "/tmp/zeph.pid");
    }

    // --- PID file helpers ---------------------------------------------------

    #[test]
    fn pid_file_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let pid_path = dir.path().join("test.pid");
        let pid_path_str = pid_path.to_string_lossy().to_string();

        // Write, read back, then clean up.
        write_pid_file(&pid_path_str).unwrap();
        let stored = read_pid_file(&pid_path_str).unwrap();
        assert_eq!(stored, std::process::id());
        remove_pid_file(&pid_path_str).unwrap();
        assert!(!pid_path.exists());
    }

    #[test]
    fn remove_nonexistent_pid_file_ok() {
        let outcome = remove_pid_file("/tmp/nonexistent_zeph_test.pid");
        assert!(outcome.is_ok());
    }

    #[test]
    fn read_invalid_pid_file() {
        let dir = tempfile::tempdir().unwrap();
        let bad_path = dir.path().join("bad.pid");
        std::fs::write(&bad_path, "not_a_number").unwrap();

        let outcome = read_pid_file(&bad_path.to_string_lossy());
        assert!(outcome.is_err());
    }

    // --- supervisor ---------------------------------------------------------

    #[tokio::test]
    async fn supervisor_tracks_components() {
        let config = DaemonConfig::default();
        let (_tx, mut supervisor) = supervisor_with(&config);

        let task = tokio::spawn(async { Ok(()) });
        supervisor.add_component(ComponentHandle::new("test", task));
        assert_eq!(supervisor.component_count(), 1);
    }

    #[tokio::test]
    async fn supervisor_detects_finished_component() {
        let config = DaemonConfig::default();
        let (_tx, mut supervisor) = supervisor_with(&config);

        // Give the trivially short task time to complete before registering it.
        let task = tokio::spawn(async { Ok(()) });
        tokio::time::sleep(Duration::from_millis(10)).await;
        supervisor.add_component(ComponentHandle::new("finished", task));
        supervisor.check_health();

        let statuses = supervisor.component_statuses();
        assert_eq!(statuses.len(), 1);
        let (_, status) = statuses[0];
        assert!(matches!(status, ComponentStatus::Failed(_)));
    }

    #[tokio::test]
    async fn supervisor_shutdown() {
        let mut config = DaemonConfig::default();
        config.health_interval_secs = 1;
        let (tx, mut supervisor) = supervisor_with(&config);

        let run_handle = tokio::spawn(async move { supervisor.run().await });
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = tx.send(true);

        // The loop must notice the flag well within the timeout.
        let joined = tokio::time::timeout(Duration::from_secs(2), run_handle).await;
        joined
            .expect("supervisor should stop on shutdown")
            .expect("task should complete");
    }

    // --- status enum --------------------------------------------------------

    #[test]
    fn component_status_eq() {
        assert_eq!(ComponentStatus::Running, ComponentStatus::Running);
        assert_eq!(ComponentStatus::Stopped, ComponentStatus::Stopped);
        assert_ne!(ComponentStatus::Running, ComponentStatus::Stopped);
    }
}