1use std::time::Duration;
7
8use tokio::sync::watch;
9use tokio::task::JoinHandle;
10
11use crate::config::DaemonConfig;
12
/// Lifecycle state of a supervised component, as last recorded by the supervisor.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ComponentStatus {
    /// The component's task has not yet been observed to finish.
    Running,
    /// The component's task ended; the payload is a human-readable reason.
    Failed(String),
    /// The component was stopped. `DaemonSupervisor` does not assign this
    /// itself; presumably set by callers that stop a component deliberately.
    Stopped,
}
19
20pub struct ComponentHandle {
21 pub name: String,
22 handle: JoinHandle<anyhow::Result<()>>,
23 pub status: ComponentStatus,
24 pub restart_count: u32,
25}
26
27impl ComponentHandle {
28 #[must_use]
29 pub fn new(name: impl Into<String>, handle: JoinHandle<anyhow::Result<()>>) -> Self {
30 Self {
31 name: name.into(),
32 handle,
33 status: ComponentStatus::Running,
34 restart_count: 0,
35 }
36 }
37
38 #[must_use]
39 pub fn is_finished(&self) -> bool {
40 self.handle.is_finished()
41 }
42}
43
44pub struct DaemonSupervisor {
45 components: Vec<ComponentHandle>,
46 health_interval: Duration,
47 _max_backoff: Duration,
48 shutdown_rx: watch::Receiver<bool>,
49}
50
51impl DaemonSupervisor {
52 #[must_use]
53 pub fn new(config: &DaemonConfig, shutdown_rx: watch::Receiver<bool>) -> Self {
54 Self {
55 components: Vec::new(),
56 health_interval: Duration::from_secs(config.health_interval_secs),
57 _max_backoff: Duration::from_secs(config.max_restart_backoff_secs),
58 shutdown_rx,
59 }
60 }
61
62 pub fn add_component(&mut self, handle: ComponentHandle) {
63 self.components.push(handle);
64 }
65
66 #[must_use]
67 pub fn component_count(&self) -> usize {
68 self.components.len()
69 }
70
71 pub async fn run(&mut self) {
73 let mut interval = tokio::time::interval(self.health_interval);
74 loop {
75 tokio::select! {
76 _ = interval.tick() => {
77 self.check_health();
78 }
79 _ = self.shutdown_rx.changed() => {
80 if *self.shutdown_rx.borrow() {
81 tracing::info!("daemon supervisor shutting down");
82 break;
83 }
84 }
85 }
86 }
87 }
88
89 fn check_health(&mut self) {
90 for component in &mut self.components {
91 if component.status == ComponentStatus::Running && component.is_finished() {
92 component.status = ComponentStatus::Failed("task exited".into());
93 component.restart_count += 1;
94 tracing::warn!(
95 component = %component.name,
96 restarts = component.restart_count,
97 "component exited unexpectedly"
98 );
99 }
100 }
101 }
102
103 #[must_use]
104 pub fn component_statuses(&self) -> Vec<(&str, &ComponentStatus)> {
105 self.components
106 .iter()
107 .map(|c| (c.name.as_str(), &c.status))
108 .collect()
109 }
110}
111
112pub fn write_pid_file(path: &str) -> std::io::Result<()> {
119 use std::io::Write as _;
120 let expanded = expand_tilde(path);
121 let path = std::path::Path::new(&expanded);
122 if let Some(parent) = path.parent() {
123 std::fs::create_dir_all(parent)?;
124 }
125 let mut file = std::fs::OpenOptions::new()
126 .write(true)
127 .create_new(true)
128 .open(path)?;
129 file.write_all(std::process::id().to_string().as_bytes())
130}
131
132pub fn read_pid_file(path: &str) -> std::io::Result<u32> {
138 let expanded = expand_tilde(path);
139 let content = std::fs::read_to_string(&expanded)?;
140 content
141 .trim()
142 .parse::<u32>()
143 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))
144}
145
146pub fn remove_pid_file(path: &str) -> std::io::Result<()> {
152 let expanded = expand_tilde(path);
153 match std::fs::remove_file(&expanded) {
154 Ok(()) => Ok(()),
155 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
156 Err(e) => Err(e),
157 }
158}
159
/// Expands a leading `~/` in `path` to the current user's home directory.
///
/// The home directory comes from `HOME`, falling back to `USERPROFILE`.
/// Variables that are set but empty are skipped, and trailing `/` characters
/// on the home value are trimmed so the result never contains `//`. Paths
/// without a `~/` prefix, or with no usable home directory, are returned
/// unchanged.
fn expand_tilde(path: &str) -> String {
    let Some(rest) = path.strip_prefix("~/") else {
        return path.to_owned();
    };
    // An empty value must count as unset: using it would turn `~/x` into the
    // root-level path `/x`.
    let non_empty_var = |key: &str| std::env::var_os(key).filter(|value| !value.is_empty());
    match non_empty_var("HOME").or_else(|| non_empty_var("USERPROFILE")) {
        Some(home) => {
            let home = home.to_string_lossy();
            format!("{}/{rest}", home.trim_end_matches('/'))
        }
        None => path.to_owned(),
    }
}
168
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn expand_tilde_with_home() {
        let result = expand_tilde("~/test/file.pid");
        assert!(!result.starts_with("~/"));
    }

    #[test]
    fn expand_tilde_absolute_unchanged() {
        assert_eq!(expand_tilde("/tmp/zeph.pid"), "/tmp/zeph.pid");
    }

    #[test]
    fn pid_file_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.pid");
        let path_str = path.to_string_lossy().to_string();

        write_pid_file(&path_str).unwrap();
        let pid = read_pid_file(&path_str).unwrap();
        assert_eq!(pid, std::process::id());
        remove_pid_file(&path_str).unwrap();
        assert!(!path.exists());
    }

    #[test]
    fn write_pid_file_refuses_existing_file() {
        // The PID file is created exclusively: an existing file must be left
        // untouched and reported as `AlreadyExists`.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("taken.pid");
        std::fs::write(&path, "1").unwrap();

        let err = write_pid_file(&path.to_string_lossy()).unwrap_err();
        assert_eq!(err.kind(), std::io::ErrorKind::AlreadyExists);
        assert_eq!(std::fs::read_to_string(&path).unwrap(), "1");
    }

    #[test]
    fn remove_nonexistent_pid_file_ok() {
        // Use a private temp dir rather than a shared, hard-coded path that
        // another process (or test run) might actually own.
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("nonexistent.pid");
        assert!(remove_pid_file(&path.to_string_lossy()).is_ok());
    }

    #[test]
    fn read_invalid_pid_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("bad.pid");
        std::fs::write(&path, "not_a_number").unwrap();
        assert!(read_pid_file(&path.to_string_lossy()).is_err());
    }

    #[test]
    fn read_pid_file_tolerates_trailing_newline() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("newline.pid");
        std::fs::write(&path, "4242\n").unwrap();
        assert_eq!(read_pid_file(&path.to_string_lossy()).unwrap(), 4242);
    }

    #[tokio::test]
    async fn supervisor_tracks_components() {
        let config = DaemonConfig::default();
        let (_tx, rx) = watch::channel(false);
        let mut supervisor = DaemonSupervisor::new(&config, rx);

        let handle = tokio::spawn(async { Ok(()) });
        supervisor.add_component(ComponentHandle::new("test", handle));
        assert_eq!(supervisor.component_count(), 1);
    }

    #[tokio::test]
    async fn supervisor_detects_finished_component() {
        let config = DaemonConfig::default();
        let (_tx, rx) = watch::channel(false);
        let mut supervisor = DaemonSupervisor::new(&config, rx);

        let handle: JoinHandle<anyhow::Result<()>> = tokio::spawn(async { Ok(()) });
        // Wait deterministically for the task to finish; a fixed sleep can be
        // too short on a loaded machine and make the test flaky.
        while !handle.is_finished() {
            tokio::task::yield_now().await;
        }
        supervisor.add_component(ComponentHandle::new("finished", handle));
        supervisor.check_health();

        let statuses = supervisor.component_statuses();
        assert_eq!(statuses.len(), 1);
        assert!(matches!(statuses[0].1, ComponentStatus::Failed(_)));
    }

    #[tokio::test]
    async fn supervisor_shutdown() {
        let mut config = DaemonConfig::default();
        config.health_interval_secs = 1;
        let (tx, rx) = watch::channel(false);
        let mut supervisor = DaemonSupervisor::new(&config, rx);

        let run_handle = tokio::spawn(async move { supervisor.run().await });
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = tx.send(true);
        tokio::time::timeout(Duration::from_secs(2), run_handle)
            .await
            .expect("supervisor should stop on shutdown")
            .expect("task should complete");
    }

    #[test]
    fn component_status_eq() {
        assert_eq!(ComponentStatus::Running, ComponentStatus::Running);
        assert_eq!(ComponentStatus::Stopped, ComponentStatus::Stopped);
        assert_ne!(ComponentStatus::Running, ComponentStatus::Stopped);
    }
}