1use std::time::Duration;
7
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use crate::config::DaemonConfig;
12
/// Lifecycle state of a supervised component as last observed by the supervisor.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ComponentStatus {
    /// The component's task is believed to be running.
    Running,
    /// The component is considered failed; the payload is a human-readable reason.
    Failed(String),
    /// The component has been stopped.
    Stopped,
}
20
21#[non_exhaustive]
22#[derive(Debug, thiserror::Error)]
24pub enum DaemonError {
25 #[error("task error: {0}")]
26 Task(String),
27 #[error("shutdown error: {0}")]
28 Shutdown(String),
29}
30
31pub struct ComponentHandle {
32 pub name: String,
33 handle: JoinHandle<Result<(), DaemonError>>,
34 pub status: ComponentStatus,
35 pub restart_count: u32,
36}
37
38impl ComponentHandle {
39 #[must_use]
40 pub fn new(name: impl Into<String>, handle: JoinHandle<Result<(), DaemonError>>) -> Self {
41 Self {
42 name: name.into(),
43 handle,
44 status: ComponentStatus::Running,
45 restart_count: 0,
46 }
47 }
48
49 #[must_use]
50 pub fn is_finished(&self) -> bool {
51 self.handle.is_finished()
52 }
53}
54
55pub struct DaemonSupervisor {
56 components: Vec<ComponentHandle>,
57 health_interval: Duration,
58 _max_backoff: Duration,
59 shutdown_rx: watch::Receiver<bool>,
60}
61
62impl DaemonSupervisor {
63 #[must_use]
64 pub fn new(config: &DaemonConfig, shutdown_rx: watch::Receiver<bool>) -> Self {
65 Self {
66 components: Vec::new(),
67 health_interval: Duration::from_secs(config.health_interval_secs),
68 _max_backoff: Duration::from_secs(config.max_restart_backoff_secs),
69 shutdown_rx,
70 }
71 }
72
73 pub fn add_component(&mut self, handle: ComponentHandle) {
74 self.components.push(handle);
75 }
76
77 #[must_use]
78 pub fn component_count(&self) -> usize {
79 self.components.len()
80 }
81
82 pub async fn run(&mut self) {
84 let mut interval = tokio::time::interval(self.health_interval);
85 loop {
86 tokio::select! {
87 _ = interval.tick() => {
88 self.check_health();
89 }
90 _ = self.shutdown_rx.changed() => {
91 if *self.shutdown_rx.borrow() {
92 tracing::info!("daemon supervisor shutting down");
93 break;
94 }
95 }
96 }
97 }
98 }
99
100 fn check_health(&mut self) {
101 for component in &mut self.components {
102 if component.status == ComponentStatus::Running && component.is_finished() {
103 component.status = ComponentStatus::Failed("task exited".into());
104 component.restart_count += 1;
105 tracing::warn!(
106 component = %component.name,
107 restarts = component.restart_count,
108 "component exited unexpectedly"
109 );
110 }
111 }
112 }
113
114 #[must_use]
115 pub fn component_statuses(&self) -> Vec<(&str, &ComponentStatus)> {
116 self.components
117 .iter()
118 .map(|c| (c.name.as_str(), &c.status))
119 .collect()
120 }
121}
122
/// Best-effort check whether a process with the given PID currently exists.
///
/// - On Linux with procfs mounted, checks for `/proc/<pid>`. That entry is
///   visible regardless of which user owns the process.
/// - On other Unix systems, runs `kill -0 <pid>`. Success, or a failure
///   reporting "not permitted" (EPERM: the process exists but belongs to another
///   user), counts as alive.
/// - On Windows, queries `tasklist`.
/// - Elsewhere, always returns `false`.
///
/// A failure to run the external command is reported as "not alive".
#[must_use]
pub fn is_process_alive(pid: u32) -> bool {
    #[cfg(unix)]
    {
        // PIDs above i32::MAX cannot exist, and kill(2) treats 0 / negative
        // values as process-group selectors rather than a single process.
        let Ok(signed) = i32::try_from(pid) else {
            return false;
        };
        if signed <= 0 {
            return false;
        }
        #[cfg(target_os = "linux")]
        {
            // Only trust procfs when it is actually mounted.
            if std::path::Path::new("/proc/self").exists() {
                return std::path::Path::new(&format!("/proc/{signed}")).exists();
            }
        }
        // `kill -0` delivers no signal; it only checks that the PID can be
        // signalled. Force the C locale so the EPERM message is predictable.
        match std::process::Command::new("kill")
            .args(["-0", &signed.to_string()])
            .env("LC_ALL", "C")
            .output()
        {
            Ok(out) if out.status.success() => true,
            Ok(out) => String::from_utf8_lossy(&out.stderr).contains("not permitted"),
            Err(_) => false,
        }
    }
    #[cfg(windows)]
    {
        std::process::Command::new("tasklist")
            .args(["/FI", &format!("PID eq {pid}"), "/NH", "/FO", "CSV"])
            .output()
            .map(|o| {
                let stdout = String::from_utf8_lossy(&o.stdout);
                stdout.contains(&format!("\"{pid}\""))
            })
            .unwrap_or(false)
    }
    #[cfg(not(any(unix, windows)))]
    {
        let _ = pid;
        false
    }
}
164
165pub fn write_pid_file(path: &str) -> std::io::Result<()> {
172 use std::io::Write as _;
173 let expanded = expand_tilde(path);
174 let path = std::path::Path::new(&expanded);
175 if let Some(parent) = path.parent() {
176 std::fs::create_dir_all(parent)?;
177 }
178 let mut file = std::fs::OpenOptions::new()
179 .write(true)
180 .create_new(true)
181 .open(path)?;
182 file.write_all(std::process::id().to_string().as_bytes())
183}
184
185pub fn read_pid_file(path: &str) -> std::io::Result<u32> {
191 let expanded = expand_tilde(path);
192 let content = std::fs::read_to_string(&expanded)?;
193 content
194 .trim()
195 .parse::<u32>()
196 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
197}
198
199pub fn remove_pid_file(path: &str) -> std::io::Result<()> {
205 let expanded = expand_tilde(path);
206 match std::fs::remove_file(&expanded) {
207 Ok(()) => Ok(()),
208 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
209 Err(e) => Err(e),
210 }
211}
212
/// Expands a leading `~/` in `path` to the user's home directory.
///
/// The home directory comes from `HOME`, falling back to `USERPROFILE`.
/// Variables that are set but empty are skipped, so an empty `HOME` can no
/// longer turn `~/x` into the filesystem-root path `/x`.
///
/// The path is returned unchanged in two cases:
/// - it does not start with `~/` (e.g. `~user/x` or absolute paths);
/// - no non-empty home variable is available.
fn expand_tilde(path: &str) -> String {
    let Some(rest) = path.strip_prefix("~/") else {
        return path.to_owned();
    };
    let home = ["HOME", "USERPROFILE"]
        .iter()
        .filter_map(|var| std::env::var_os(var))
        .find(|value| !value.is_empty());
    match home {
        Some(home) => format!("{}/{rest}", home.to_string_lossy()),
        None => path.to_owned(),
    }
}
221
#[cfg(test)]
mod tests {
    use super::*;

    /// True when a non-empty home variable is available for `~/` expansion.
    fn home_available() -> bool {
        ["HOME", "USERPROFILE"]
            .iter()
            .filter_map(|var| std::env::var_os(var))
            .any(|value| !value.is_empty())
    }

    #[test]
    fn expand_tilde_with_home() {
        // The expectation only holds when a home directory is configured.
        if !home_available() {
            return;
        }
        let result = expand_tilde("~/test/file.pid");
        assert!(!result.starts_with("~/"));
        assert!(result.ends_with("test/file.pid"));
    }

    #[test]
    fn expand_tilde_absolute_unchanged() {
        assert_eq!(expand_tilde("/tmp/zeph.pid"), "/tmp/zeph.pid");
    }

    #[test]
    fn expand_tilde_only_expands_home_prefix() {
        assert_eq!(expand_tilde("~user/x.pid"), "~user/x.pid");
        assert_eq!(expand_tilde("dir/~/x.pid"), "dir/~/x.pid");
    }

    #[test]
    fn pid_file_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.pid");
        let path_str = path.to_string_lossy().to_string();

        write_pid_file(&path_str).unwrap();
        let pid = read_pid_file(&path_str).unwrap();
        assert_eq!(pid, std::process::id());
        remove_pid_file(&path_str).unwrap();
        assert!(!path.exists());
    }

    #[test]
    fn write_pid_file_refuses_existing_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("exists.pid");
        std::fs::write(&path, "1").unwrap();
        let err = write_pid_file(&path.to_string_lossy()).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);
    }

    #[test]
    fn remove_nonexistent_pid_file_ok() {
        // Use a fresh temp dir instead of a shared, platform-specific /tmp path.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("nonexistent_zeph_test.pid");
        assert!(remove_pid_file(&path.to_string_lossy()).is_ok());
    }

    #[test]
    fn read_invalid_pid_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("bad.pid");
        std::fs::write(&path, "not_a_number").unwrap();
        let err = read_pid_file(&path.to_string_lossy()).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn read_missing_pid_file_errors() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("missing.pid");
        let err = read_pid_file(&path.to_string_lossy()).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::NotFound);
    }

    #[tokio::test]
    async fn supervisor_tracks_components() {
        let config = DaemonConfig::default();
        let (_tx, rx) = watch::channel(false);
        let mut supervisor = DaemonSupervisor::new(&config, rx);

        let handle = tokio::spawn(async { Ok::<(), DaemonError>(()) });
        supervisor.add_component(ComponentHandle::new("test", handle));
        assert_eq!(supervisor.component_count(), 1);
    }

    #[tokio::test]
    async fn supervisor_detects_finished_component() {
        let config = DaemonConfig::default();
        let (_tx, rx) = watch::channel(false);
        let mut supervisor = DaemonSupervisor::new(&config, rx);

        let handle = tokio::spawn(async { Ok::<(), DaemonError>(()) });
        // Yield until the task has really finished instead of sleeping for a
        // fixed time, which is flaky on loaded machines.
        while !handle.is_finished() {
            tokio::task::yield_now().await;
        }
        supervisor.add_component(ComponentHandle::new("finished", handle));
        supervisor.check_health();

        let statuses = supervisor.component_statuses();
        assert_eq!(statuses.len(), 1);
        assert!(matches!(statuses[0].1, ComponentStatus::Failed(_)));
    }

    #[tokio::test]
    async fn supervisor_shutdown() {
        let config = DaemonConfig {
            health_interval_secs: 1,
            ..DaemonConfig::default()
        };
        let (tx, rx) = watch::channel(false);
        let mut supervisor = DaemonSupervisor::new(&config, rx);

        let run_handle = tokio::spawn(async move { supervisor.run().await });
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = tx.send(true);
        tokio::time::timeout(Duration::from_secs(2), run_handle)
            .await
            .expect("supervisor should stop on shutdown")
            .expect("task should complete");
    }

    #[test]
    fn component_status_eq() {
        assert_eq!(ComponentStatus::Running, ComponentStatus::Running);
        assert_eq!(ComponentStatus::Stopped, ComponentStatus::Stopped);
        assert_ne!(ComponentStatus::Running, ComponentStatus::Stopped);
    }

    #[test]
    fn is_process_alive_current_process() {
        let pid = std::process::id();
        assert!(is_process_alive(pid), "current process must be alive");
    }

    #[test]
    fn is_process_alive_nonexistent_pid() {
        assert!(
            !is_process_alive(u32::MAX),
            "PID u32::MAX must not be alive"
        );
    }
}