1use std::time::Duration;
7
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use crate::config::DaemonConfig;
12
/// Lifecycle state of a supervised component as last observed by the supervisor.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ComponentStatus {
    /// The task has not been observed to finish (initial state of a new handle).
    Running,
    /// The task ended unexpectedly; the payload is a short human-readable reason.
    Failed(String),
    /// Not assigned anywhere in this module; presumably a deliberate stop — confirm with callers.
    Stopped,
}
19
20pub struct ComponentHandle {
21 pub name: String,
22 handle: JoinHandle<anyhow::Result<()>>,
23 pub status: ComponentStatus,
24 pub restart_count: u32,
25}
26
27impl ComponentHandle {
28 #[must_use]
29 pub fn new(name: impl Into<String>, handle: JoinHandle<anyhow::Result<()>>) -> Self {
30 Self {
31 name: name.into(),
32 handle,
33 status: ComponentStatus::Running,
34 restart_count: 0,
35 }
36 }
37
38 #[must_use]
39 pub fn is_finished(&self) -> bool {
40 self.handle.is_finished()
41 }
42}
43
44pub struct DaemonSupervisor {
45 components: Vec<ComponentHandle>,
46 health_interval: Duration,
47 _max_backoff: Duration,
48 shutdown_rx: watch::Receiver<bool>,
49}
50
51impl DaemonSupervisor {
52 #[must_use]
53 pub fn new(config: &DaemonConfig, shutdown_rx: watch::Receiver<bool>) -> Self {
54 Self {
55 components: Vec::new(),
56 health_interval: Duration::from_secs(config.health_interval_secs),
57 _max_backoff: Duration::from_secs(config.max_restart_backoff_secs),
58 shutdown_rx,
59 }
60 }
61
62 pub fn add_component(&mut self, handle: ComponentHandle) {
63 self.components.push(handle);
64 }
65
66 #[must_use]
67 pub fn component_count(&self) -> usize {
68 self.components.len()
69 }
70
71 pub async fn run(&mut self) {
73 let mut interval = tokio::time::interval(self.health_interval);
74 loop {
75 tokio::select! {
76 _ = interval.tick() => {
77 self.check_health();
78 }
79 _ = self.shutdown_rx.changed() => {
80 if *self.shutdown_rx.borrow() {
81 tracing::info!("daemon supervisor shutting down");
82 break;
83 }
84 }
85 }
86 }
87 }
88
89 fn check_health(&mut self) {
90 for component in &mut self.components {
91 if component.status == ComponentStatus::Running && component.is_finished() {
92 component.status = ComponentStatus::Failed("task exited".into());
93 component.restart_count += 1;
94 tracing::warn!(
95 component = %component.name,
96 restarts = component.restart_count,
97 "component exited unexpectedly"
98 );
99 }
100 }
101 }
102
103 #[must_use]
104 pub fn component_statuses(&self) -> Vec<(&str, &ComponentStatus)> {
105 self.components
106 .iter()
107 .map(|c| (c.name.as_str(), &c.status))
108 .collect()
109 }
110}
111
112pub fn write_pid_file(path: &str) -> std::io::Result<()> {
119 use std::io::Write as _;
120 let expanded = expand_tilde(path);
121 let path = std::path::Path::new(&expanded);
122 if let Some(parent) = path.parent() {
123 std::fs::create_dir_all(parent)?;
124 }
125 let mut file = std::fs::OpenOptions::new()
126 .write(true)
127 .create_new(true)
128 .open(path)?;
129 file.write_all(std::process::id().to_string().as_bytes())
130}
131
132pub fn read_pid_file(path: &str) -> std::io::Result<u32> {
138 let expanded = expand_tilde(path);
139 let content = std::fs::read_to_string(&expanded)?;
140 content
141 .trim()
142 .parse::<u32>()
143 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
144}
145
146pub fn remove_pid_file(path: &str) -> std::io::Result<()> {
152 let expanded = expand_tilde(path);
153 match std::fs::remove_file(&expanded) {
154 Ok(()) => Ok(()),
155 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
156 Err(e) => Err(e),
157 }
158}
159
/// Expands a leading `~` or `~/` to the user's home directory.
///
/// The home directory is taken from the first non-empty value of `HOME` or
/// `USERPROFILE`. An empty `HOME` is skipped rather than producing a path
/// rooted at `/`. If neither variable yields a usable value, or the path does
/// not start with `~`/`~/` (e.g. `~user/...`), `path` is returned unchanged.
/// Non-UTF-8 home directories are converted lossily.
fn expand_tilde(path: &str) -> String {
    // `None` means the bare "~" form; `Some(rest)` is the part after "~/".
    let rest = if path == "~" {
        None
    } else if let Some(rest) = path.strip_prefix("~/") {
        Some(rest)
    } else {
        return path.to_owned();
    };

    let home = ["HOME", "USERPROFILE"]
        .iter()
        .filter_map(|key| std::env::var_os(key))
        .find(|value| !value.is_empty());

    match (home, rest) {
        (Some(home), Some(rest)) => format!("{}/{}", home.to_string_lossy(), rest),
        (Some(home), None) => home.to_string_lossy().into_owned(),
        (None, _) => path.to_owned(),
    }
}
168
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a path named `file_name` inside a fresh temporary directory,
    /// together with the guard that keeps the directory alive.
    fn temp_pid_path(file_name: &str) -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(file_name);
        (dir, path)
    }

    /// Builds a supervisor whose shutdown sender is kept alive by the caller.
    fn new_supervisor() -> (watch::Sender<bool>, DaemonSupervisor) {
        let config = DaemonConfig::default();
        let (tx, rx) = watch::channel(false);
        (tx, DaemonSupervisor::new(&config, rx))
    }

    #[test]
    fn expand_tilde_with_home() {
        // Without a usable home directory there is nothing to expand to.
        let has_home = ["HOME", "USERPROFILE"]
            .iter()
            .any(|key| std::env::var_os(key).is_some_and(|value| !value.is_empty()));
        if !has_home {
            return;
        }
        let result = expand_tilde("~/test/file.pid");
        assert!(!result.starts_with("~/"));
        assert!(result.ends_with("test/file.pid"));
    }

    #[test]
    fn expand_tilde_absolute_unchanged() {
        assert_eq!(expand_tilde("/tmp/zeph.pid"), "/tmp/zeph.pid");
    }

    #[test]
    fn pid_file_roundtrip() {
        let (_dir, path) = temp_pid_path("test.pid");
        let path_str = path.to_string_lossy().into_owned();

        write_pid_file(&path_str).unwrap();
        assert_eq!(read_pid_file(&path_str).unwrap(), std::process::id());
        remove_pid_file(&path_str).unwrap();
        assert!(!path.exists());
    }

    #[test]
    fn write_pid_file_refuses_existing_file() {
        let (_dir, path) = temp_pid_path("taken.pid");
        let path_str = path.to_string_lossy().into_owned();

        write_pid_file(&path_str).unwrap();
        let err = write_pid_file(&path_str).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);
    }

    #[test]
    fn write_pid_file_creates_parent_dirs() {
        let (_dir, path) = temp_pid_path("nested/dir/app.pid");
        write_pid_file(&path.to_string_lossy()).unwrap();
        assert!(path.exists());
    }

    #[test]
    fn remove_nonexistent_pid_file_ok() {
        let (_dir, path) = temp_pid_path("missing.pid");
        assert!(remove_pid_file(&path.to_string_lossy()).is_ok());
    }

    #[test]
    fn read_invalid_pid_file() {
        let (_dir, path) = temp_pid_path("bad.pid");
        std::fs::write(&path, "not_a_number").unwrap();
        let err = read_pid_file(&path.to_string_lossy()).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
    }

    #[test]
    fn read_pid_file_trims_whitespace() {
        let (_dir, path) = temp_pid_path("padded.pid");
        std::fs::write(&path, "  4242\n").unwrap();
        assert_eq!(read_pid_file(&path.to_string_lossy()).unwrap(), 4242);
    }

    #[tokio::test]
    async fn supervisor_tracks_components() {
        let (_tx, mut supervisor) = new_supervisor();

        let handle: JoinHandle<anyhow::Result<()>> = tokio::spawn(async { Ok(()) });
        supervisor.add_component(ComponentHandle::new("test", handle));
        assert_eq!(supervisor.component_count(), 1);
    }

    #[tokio::test]
    async fn supervisor_detects_finished_component() {
        let (_tx, mut supervisor) = new_supervisor();

        let handle: JoinHandle<anyhow::Result<()>> = tokio::spawn(async { Ok(()) });
        // Yield until the task has really completed instead of sleeping for a
        // fixed time, so the test cannot race the scheduler.
        while !handle.is_finished() {
            tokio::task::yield_now().await;
        }
        supervisor.add_component(ComponentHandle::new("finished", handle));
        supervisor.check_health();
        supervisor.check_health();

        let statuses = supervisor.component_statuses();
        assert_eq!(statuses.len(), 1);
        assert!(matches!(statuses[0].1, ComponentStatus::Failed(_)));
        // An already-failed component is not counted a second time.
        assert_eq!(supervisor.components[0].restart_count, 1);
    }

    #[tokio::test]
    async fn supervisor_keeps_running_component() {
        let (_tx, mut supervisor) = new_supervisor();

        let handle: JoinHandle<anyhow::Result<()>> = tokio::spawn(std::future::pending());
        supervisor.add_component(ComponentHandle::new("pending", handle));
        supervisor.check_health();

        let statuses = supervisor.component_statuses();
        assert_eq!(statuses[0].1, &ComponentStatus::Running);
        assert_eq!(supervisor.components[0].restart_count, 0);
    }

    #[tokio::test]
    async fn supervisor_shutdown() {
        let config = DaemonConfig {
            health_interval_secs: 1,
            ..DaemonConfig::default()
        };
        let (tx, rx) = watch::channel(false);
        let mut supervisor = DaemonSupervisor::new(&config, rx);

        let run_handle = tokio::spawn(async move { supervisor.run().await });
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = tx.send(true);
        tokio::time::timeout(Duration::from_secs(2), run_handle)
            .await
            .expect("supervisor should stop on shutdown")
            .expect("task should complete");
    }

    #[test]
    fn component_status_eq() {
        assert_eq!(ComponentStatus::Running, ComponentStatus::Running);
        assert_eq!(ComponentStatus::Stopped, ComponentStatus::Stopped);
        assert_ne!(ComponentStatus::Running, ComponentStatus::Stopped);
        assert_eq!(
            ComponentStatus::Failed("x".into()),
            ComponentStatus::Failed("x".into())
        );
        assert_ne!(
            ComponentStatus::Failed("x".into()),
            ComponentStatus::Failed("y".into())
        );
    }
}