difflore_core/infra/
daemon.rs1#![allow(unsafe_code)]
6
7use std::fs;
20use std::path::PathBuf;
21#[cfg(windows)]
22use std::process::Command;
23use std::time::Duration;
24
25use crate::cloud::client::CloudClient;
26use crate::cloud::outbox::{DEFAULT_STALE_SECONDS, OutboxQueue, drain_outbox};
27use crate::db::init_db;
28use crate::paths;
29
30pub fn pid_path() -> Result<PathBuf, String> {
32 Ok(paths::data_home()?.join("daemon.pid"))
33}
34
/// Daemon liveness as inferred from the pid file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DaemonStatus {
    /// The pid file names a process that is currently alive.
    Running { pid: i32 },
    /// A pid file exists, but the process it names is no longer alive.
    Stale { pid: i32 },
    /// No pid could be read (unresolvable pid path, or missing/unparsable file).
    NotRunning,
}

impl DaemonStatus {
    /// Short one-line, human-readable description of this status.
    pub fn short(&self) -> String {
        match *self {
            Self::NotRunning => String::from("not running"),
            Self::Stale { pid } => format!("stale pid file (pid {pid}); not running"),
            Self::Running { pid } => format!("running (pid {pid})"),
        }
    }
}
56
57pub fn status() -> DaemonStatus {
59 let Ok(path) = pid_path() else {
60 return DaemonStatus::NotRunning;
61 };
62 let Some(pid) = read_pid(&path) else {
63 return DaemonStatus::NotRunning;
64 };
65 if is_process_alive(pid) {
66 DaemonStatus::Running { pid }
67 } else {
68 DaemonStatus::Stale { pid }
69 }
70}
71
/// Reads the pid recorded in the file at `path`.
///
/// Returns `None` in any of these cases:
/// - the file is missing or unreadable;
/// - its trimmed contents do not parse as an `i32`;
/// - the value is not a positive pid.
///
/// Non-positive values are rejected on purpose. `kill(2)` interprets `0` as
/// "every process in my process group" and negative values as process-group or
/// broadcast targets, so a corrupted pid file must never reach the signalling code.
fn read_pid(path: &std::path::Path) -> Option<i32> {
    let raw = fs::read_to_string(path).ok()?;
    raw.trim().parse::<i32>().ok().filter(|&pid| pid > 0)
}
76
/// Writes `pid` as decimal text to `path`, creating missing parent directories first.
///
/// # Errors
/// - Returns `create parent: …` if the parent directory cannot be created.
/// - Returns `write pid: …` if the file cannot be written.
fn write_pid(path: &std::path::Path, pid: i32) -> Result<(), String> {
    path.parent()
        .map(fs::create_dir_all)
        .transpose()
        .map_err(|err| format!("create parent: {err}"))?;
    let contents = pid.to_string();
    fs::write(path, contents).map_err(|err| format!("write pid: {err}"))
}
83
/// Deletes the pid file at `path`, best effort.
///
/// Any failure, including the file already being gone, is deliberately ignored.
fn remove_pid_file(path: &std::path::Path) {
    fs::remove_file(path).unwrap_or_default();
}
88
89#[cfg(unix)]
90fn is_process_alive(pid: i32) -> bool {
94 unsafe { libc::kill(pid, 0) == 0 }
98}
99
/// Reports whether a process with id `pid` currently exists (Windows).
///
/// Runs `tasklist /FI "PID eq <pid>" /FO CSV /NH` and looks for the pid as a
/// CSV field in its output. If `tasklist` cannot be spawned or exits
/// unsuccessfully, the process is reported as not alive.
#[cfg(windows)]
fn is_process_alive(pid: i32) -> bool {
    let filter = format!("PID eq {pid}");
    let output = match Command::new("tasklist")
        .args(["/FI", &filter, "/FO", "CSV", "/NH"])
        .output()
    {
        Ok(out) if out.status.success() => out,
        _ => return false,
    };
    let listing = String::from_utf8_lossy(&output.stdout);
    let quoted_field = format!("\"{pid}\"");
    let bare_field = format!(",{pid},");
    listing.contains(&quoted_field) || listing.contains(&bare_field)
}
114
115#[cfg(unix)]
116fn send_term(pid: i32) -> std::io::Result<()> {
117 send_signal(pid, libc::SIGTERM)
118}
119
120#[cfg(unix)]
121fn send_kill(pid: i32) -> std::io::Result<()> {
122 send_signal(pid, libc::SIGKILL)
123}
124
125#[cfg(unix)]
126fn send_signal(pid: i32, signum: libc::c_int) -> std::io::Result<()> {
127 let rc = unsafe { libc::kill(pid, signum) };
131 if rc == 0 {
132 Ok(())
133 } else {
134 Err(std::io::Error::last_os_error())
135 }
136}
137
/// Runs `taskkill /PID <pid>`, appending `/F` when `force` is set.
///
/// A non-zero exit status becomes an error whose message names the exact
/// command used.
#[cfg(windows)]
fn run_taskkill(pid: i32, force: bool) -> std::io::Result<()> {
    let pid_arg = pid.to_string();
    let mut command = Command::new("taskkill");
    command.args(["/PID", pid_arg.as_str()]);
    if force {
        command.arg("/F");
    }
    let status = command.status()?;
    if status.success() {
        return Ok(());
    }
    let label = if force { "taskkill /F" } else { "taskkill" };
    Err(std::io::Error::other(format!("{label} exited with {status}")))
}

/// Requests a non-forced termination of `pid` via `taskkill /PID <pid>`.
///
/// NOTE(review): `taskkill` without `/F` is commonly refused for processes
/// that own no window. That would make `stop` fail here before it escalates.
/// Confirm how the daemon is launched on Windows.
///
/// # Errors
/// Fails if `taskkill` cannot be spawned or exits unsuccessfully.
#[cfg(windows)]
fn send_term(pid: i32) -> std::io::Result<()> {
    run_taskkill(pid, false)
}

/// Forcibly terminates `pid` via `taskkill /PID <pid> /F`.
///
/// # Errors
/// Fails if `taskkill` cannot be spawned or exits unsuccessfully.
#[cfg(windows)]
fn send_kill(pid: i32) -> std::io::Result<()> {
    run_taskkill(pid, true)
}
165
166pub async fn stop(grace_secs: u64) -> Result<StopOutcome, String> {
171 let path = pid_path()?;
172 let Some(pid) = read_pid(&path) else {
173 return Ok(StopOutcome::NotRunning);
174 };
175 if !is_process_alive(pid) {
176 remove_pid_file(&path);
177 return Ok(StopOutcome::StaleCleaned { pid });
178 }
179
180 send_term(pid).map_err(|e| format!("terminate pid {pid}: {e}"))?;
181
182 let poll = Duration::from_millis(200);
185 let deadline = tokio::time::Instant::now() + Duration::from_secs(grace_secs.max(1));
186 while tokio::time::Instant::now() < deadline {
187 if !is_process_alive(pid) {
188 remove_pid_file(&path);
189 return Ok(StopOutcome::Terminated { pid });
190 }
191 tokio::time::sleep(poll).await;
192 }
193
194 let _ = send_kill(pid);
198 remove_pid_file(&path);
199 Ok(StopOutcome::Killed { pid })
200}
201
/// Result of a successful [`stop`] call.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StopOutcome {
    /// No readable pid file existed, so there was nothing to stop.
    NotRunning,
    /// The pid file named a dead process; the file was removed and no signal was sent.
    StaleCleaned {
        pid: i32,
    },
    /// The process exited after the graceful termination request.
    Terminated {
        pid: i32,
    },
    /// The grace period elapsed and the process was forcibly killed.
    Killed {
        pid: i32,
    },
}
210
211pub async fn run(tick_interval_secs: u64, batch_size: usize) -> Result<(), String> {
218 let path = pid_path()?;
219
220 match status() {
223 DaemonStatus::Running { pid } => {
224 return Err(format!(
225 "another daemon is already running (pid {pid}); stop that process before starting another"
226 ));
227 }
228 DaemonStatus::Stale { .. } | DaemonStatus::NotRunning => {}
229 }
230
231 let my_pid = std::process::id() as i32;
232 write_pid(&path, my_pid)?;
233
234 let db = init_db().await?;
235 let queue = OutboxQueue::new(db);
236 let client = CloudClient::create().await;
237
238 let shutdown = shutdown_signal_future();
242 tokio::pin!(shutdown);
243
244 let tick = Duration::from_secs(tick_interval_secs.max(1));
245 loop {
246 tokio::select! {
247 biased;
248 () = &mut shutdown => break,
249 () = tokio::time::sleep(tick) => {
250 let _ = queue.reset_stale(DEFAULT_STALE_SECONDS).await;
256
257 if let Err(e) = drain_outbox(&queue, &client, batch_size).await {
262 eprintln!("[difflore.daemon] drain error: {e}");
263 }
264 }
265 }
266 }
267
268 remove_pid_file(&path);
269 Ok(())
270}
271
272async fn shutdown_signal_future() {
277 #[cfg(unix)]
278 {
279 use tokio::signal::unix::{SignalKind, signal};
280 let Ok(mut sigterm) = signal(SignalKind::terminate()) else {
281 return;
282 };
283 let Ok(mut sigint) = signal(SignalKind::interrupt()) else {
284 return;
285 };
286 tokio::select! {
287 _ = sigterm.recv() => {}
288 _ = sigint.recv() => {}
289 }
290 }
291
292 #[cfg(windows)]
293 {
294 let _ = tokio::signal::ctrl_c().await;
295 }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    // Every test below reads or writes the single pid file at `pid_path()`, so
    // they take this lock to run one at a time. A tokio mutex is used because
    // sync tests take it with `blocking_lock()` and async tests with `lock().await`.
    static TEST_SERIAL: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());

    /// Spawns a trivial child process, waits for it, and returns its pid.
    ///
    /// Waiting reaps the child. On Unix an unreaped zombie would still answer
    /// `kill(pid, 0)` and look alive.
    /// NOTE(review): the OS may reuse the pid before a test probes it, which
    /// would make the stale-pid tests flaky; the window is small.
    fn spawn_dead_pid() -> i32 {
        #[cfg(unix)]
        let mut child = std::process::Command::new("true")
            .spawn()
            .expect("spawn true");
        #[cfg(windows)]
        let mut child = Command::new("cmd")
            .args(["/C", "exit", "0"])
            .spawn()
            .expect("spawn cmd");
        let id = child.id() as i32;
        let _ = child.wait();
        id
    }

    #[test]
    fn status_reports_not_running_when_pid_file_missing() {
        let _g = TEST_SERIAL.blocking_lock();
        // Presumably points the data home at a shared test directory — confirm in crate::db.
        let _ = crate::db::shared_test_home();
        let path = pid_path().expect("pid path");
        let _ = fs::remove_file(&path);
        assert_eq!(status(), DaemonStatus::NotRunning);
    }

    #[test]
    fn status_detects_stale_pid_file() {
        let _g = TEST_SERIAL.blocking_lock();
        let _ = crate::db::shared_test_home();
        let path = pid_path().expect("pid path");
        let dead_pid = spawn_dead_pid();
        fs::write(&path, dead_pid.to_string()).unwrap();

        // Read the pid back from disk so the assertion compares against exactly what was stored.
        let stored: i32 = fs::read_to_string(&path).unwrap().trim().parse().unwrap();

        match status() {
            DaemonStatus::Stale { pid } => assert_eq!(pid, stored),
            other => panic!("expected Stale, got {other:?}"),
        }
        let _ = fs::remove_file(&path);
    }

    #[tokio::test]
    async fn stop_is_noop_when_not_running() {
        let _g = TEST_SERIAL.lock().await;
        let _ = crate::db::shared_test_home();
        let path = pid_path().unwrap();
        let _ = fs::remove_file(&path);
        let outcome = stop(1).await.unwrap();
        assert_eq!(outcome, StopOutcome::NotRunning);
    }

    #[tokio::test]
    async fn stop_cleans_stale_pid_file_without_signalling() {
        let _g = TEST_SERIAL.lock().await;
        let _ = crate::db::shared_test_home();
        let path = pid_path().unwrap();
        let dead_pid = spawn_dead_pid();
        fs::write(&path, dead_pid.to_string()).unwrap();
        let stored: i32 = fs::read_to_string(&path).unwrap().trim().parse().unwrap();

        // A dead pid must short-circuit to StaleCleaned and delete the file.
        let outcome = stop(1).await.unwrap();
        assert_eq!(outcome, StopOutcome::StaleCleaned { pid: stored });
        assert!(!path.exists(), "stale pid file should be removed by stop()");
    }
}
379}