Skip to main content

difflore_core/infra/
daemon.rs

// SAFETY scope: this module wraps `libc::kill` for unix signal delivery
// (a signal-0 liveness probe plus SIGTERM/SIGKILL). Each call site has a
// per-block SAFETY comment. `kill` has no memory-safety preconditions: it
// only asks the kernel to deliver a signal and never reads or writes this
// process's memory.
5#![allow(unsafe_code)]
6
7//! Background daemon for `DiffLore`.
8//!
9//! Consumes the `SQLite` outbox in a loop so AI-agent hooks stay on the
10//! fast path (hook = enqueue, daemon = drain). Does NOT listen on any
11//! HTTP port — the local stack is Rust-on-SQLite so in-process access
12//! is strictly cheaper than IPC. Lifecycle management uses a PID file
13//! at `~/.difflore/daemon.pid`; liveness is checked with `kill(pid, 0)`.
14//!
15//! `run` is the only long-running entry point; it installs a SIGTERM /
16//! SIGINT handler that breaks out of the drain loop cleanly and deletes
17//! the PID file before returning.
18
19use std::fs;
20use std::path::PathBuf;
21#[cfg(windows)]
22use std::process::Command;
23use std::time::Duration;
24
25use crate::cloud::client::CloudClient;
26use crate::cloud::outbox::{DEFAULT_STALE_SECONDS, OutboxQueue, drain_outbox};
27use crate::db::init_db;
28use crate::paths;
29
30/// Path of the PID file used by the internal daemon helpers.
31pub fn pid_path() -> Result<PathBuf, String> {
32    Ok(paths::data_home()?.join("daemon.pid"))
33}
34
/// Liveness of the daemon, as recorded by its PID file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DaemonStatus {
    /// The PID file names a process that the liveness probe found alive.
    Running { pid: i32 },
    /// The PID file names a process that the liveness probe could not find
    /// alive, so the file is an orphan left behind by a daemon that is gone.
    Stale { pid: i32 },
    /// No PID could be read (file missing, unreadable, or not a valid PID).
    NotRunning,
}

impl DaemonStatus {
    /// One-line, human-readable summary suitable for CLI output.
    pub fn short(&self) -> String {
        match *self {
            Self::NotRunning => String::from("not running"),
            Self::Running { pid } => format!("running (pid {pid})"),
            Self::Stale { pid } => format!("stale pid file (pid {pid}); not running"),
        }
    }
}
56
57/// Probe the PID file + live process without mutating anything.
58pub fn status() -> DaemonStatus {
59    let Ok(path) = pid_path() else {
60        return DaemonStatus::NotRunning;
61    };
62    let Some(pid) = read_pid(&path) else {
63        return DaemonStatus::NotRunning;
64    };
65    if is_process_alive(pid) {
66        DaemonStatus::Running { pid }
67    } else {
68        DaemonStatus::Stale { pid }
69    }
70}
71
/// Read the PID stored in `path`.
///
/// Returns `None` when the file cannot be read, does not parse as an
/// `i32` (after trimming surrounding whitespace), or holds a non-positive
/// value.
///
/// Non-positive values are rejected deliberately. On unix, `kill(0, sig)`
/// addresses the caller's whole process group and `kill(-1, sig)` every
/// process the caller may signal. A corrupt or hand-edited PID file must
/// therefore never reach the liveness probe or `stop`'s signal path.
fn read_pid(path: &std::path::Path) -> Option<i32> {
    let raw = fs::read_to_string(path).ok()?;
    raw.trim().parse::<i32>().ok().filter(|&pid| pid > 0)
}
76
/// Record `pid` in `path` as decimal text, creating any missing parent
/// directories first.
///
/// # Errors
///
/// Returns a `"create parent: …"` or `"write pid: …"` message describing
/// whichever filesystem step failed.
fn write_pid(path: &std::path::Path, pid: i32) -> Result<(), String> {
    path.parent()
        .map_or(Ok(()), fs::create_dir_all)
        .map_err(|err| format!("create parent: {err}"))?;
    let contents = pid.to_string();
    fs::write(path, contents).map_err(|err| format!("write pid: {err}"))
}
83
/// Delete the PID file at `path`, ignoring every error.
///
/// Shutdown paths call this after the file may already be gone, so a
/// missing file is expected and not worth reporting.
fn remove_pid_file(path: &std::path::Path) {
    let _best_effort = fs::remove_file(path);
}
88
89#[cfg(unix)]
90/// Signal 0 delivers nothing but still validates the target exists and
91/// we have permission to signal it. Covers SIGTERM check without
92/// actually killing anything. Safe on macOS and Linux.
93fn is_process_alive(pid: i32) -> bool {
94    // SAFETY: libc::kill with signal 0 is a classic liveness probe —
95    // returns 0 on success, -1 + errno=ESRCH if the PID is gone.
96    // Neither outcome mutates state.
97    unsafe { libc::kill(pid, 0) == 0 }
98}
99
#[cfg(windows)]
/// Liveness probe via `tasklist`, filtered to this PID, in header-less
/// CSV form.
///
/// A failure to launch `tasklist`, or a non-zero exit, is reported as "not
/// alive". Otherwise the process counts as alive when the listing contains
/// the PID as a quoted (`"1234"`) or bare (`,1234,`) CSV field.
fn is_process_alive(pid: i32) -> bool {
    let filter = format!("PID eq {pid}");
    let output = match Command::new("tasklist")
        .args(["/FI", &filter, "/FO", "CSV", "/NH"])
        .output()
    {
        Ok(out) if out.status.success() => out,
        _ => return false,
    };
    let listing = String::from_utf8_lossy(&output.stdout);
    let quoted_field = format!("\"{pid}\"");
    listing.contains(&quoted_field) || listing.contains(&format!(",{pid},"))
}
114
115#[cfg(unix)]
116fn send_term(pid: i32) -> std::io::Result<()> {
117    send_signal(pid, libc::SIGTERM)
118}
119
120#[cfg(unix)]
121fn send_kill(pid: i32) -> std::io::Result<()> {
122    send_signal(pid, libc::SIGKILL)
123}
124
125#[cfg(unix)]
126fn send_signal(pid: i32, signum: libc::c_int) -> std::io::Result<()> {
127    // SAFETY: delegating to libc::kill; we already validated `pid` is
128    // a positive integer from the PID file. Errors are converted to
129    // `io::Error` via `errno`.
130    let rc = unsafe { libc::kill(pid, signum) };
131    if rc == 0 {
132        Ok(())
133    } else {
134        Err(std::io::Error::last_os_error())
135    }
136}
137
#[cfg(windows)]
/// Graceful stop on Windows: runs `taskkill /PID <pid>` without `/F`.
///
/// NOTE(review): `taskkill` without `/F` asks the process to close and may
/// be refused for windowless console processes. Confirm this path
/// actually stops the daemon.
fn send_term(pid: i32) -> std::io::Result<()> {
    let pid_arg = pid.to_string();
    let status = Command::new("taskkill")
        .args(["/PID", pid_arg.as_str()])
        .status()?;
    if !status.success() {
        return Err(std::io::Error::other(format!(
            "taskkill exited with {status}"
        )));
    }
    Ok(())
}
151
#[cfg(windows)]
/// Forced stop on Windows: runs `taskkill /PID <pid> /F`.
fn send_kill(pid: i32) -> std::io::Result<()> {
    let pid_arg = pid.to_string();
    let status = Command::new("taskkill")
        .args(["/PID", pid_arg.as_str(), "/F"])
        .status()?;
    if !status.success() {
        return Err(std::io::Error::other(format!(
            "taskkill /F exited with {status}"
        )));
    }
    Ok(())
}
165
166/// Gracefully stop the daemon. Sends SIGTERM, waits up to
167/// `grace_secs` for the process to exit, then escalates to SIGKILL.
168/// Removes a stale PID file regardless of which path exits. Returns
169/// what actually happened so the CLI can phrase the UX correctly.
170pub async fn stop(grace_secs: u64) -> Result<StopOutcome, String> {
171    let path = pid_path()?;
172    let Some(pid) = read_pid(&path) else {
173        return Ok(StopOutcome::NotRunning);
174    };
175    if !is_process_alive(pid) {
176        remove_pid_file(&path);
177        return Ok(StopOutcome::StaleCleaned { pid });
178    }
179
180    send_term(pid).map_err(|e| format!("terminate pid {pid}: {e}"))?;
181
182    // Poll every 200 ms for up to `grace_secs` so a well-behaved daemon
183    // can finish its in-flight drain + unlink the pid file itself.
184    let poll = Duration::from_millis(200);
185    let deadline = tokio::time::Instant::now() + Duration::from_secs(grace_secs.max(1));
186    while tokio::time::Instant::now() < deadline {
187        if !is_process_alive(pid) {
188            remove_pid_file(&path);
189            return Ok(StopOutcome::Terminated { pid });
190        }
191        tokio::time::sleep(poll).await;
192    }
193
194    // Grace expired — escalate. We still remove the pid file because
195    // SIGKILL cannot be caught; whatever process by this PID is going
196    // away whether it wanted to or not.
197    let _ = send_kill(pid);
198    remove_pid_file(&path);
199    Ok(StopOutcome::Killed { pid })
200}
201
/// What [`stop`] actually did, so the CLI can tell the user accurately.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StopOutcome {
    /// No PID could be read from the PID file; nothing was done.
    NotRunning,
    /// The PID file named a process that was not alive. The file was
    /// deleted and no signal was sent.
    StaleCleaned { pid: i32 },
    /// The process exited after the graceful termination request.
    Terminated { pid: i32 },
    /// The grace period ran out and a forced kill was issued.
    Killed { pid: i32 },
}
210
211/// Long-running drain loop. Claims outbox rows and dispatches them to
212/// the cloud on a fixed cadence. Only exits on SIGTERM / SIGINT or on
213/// a genuinely fatal error (DB open failure, etc.).
214///
215/// Safe to call at most once per process: writes its own PID file on
216/// entry and errors out if one already belongs to a live process.
217pub async fn run(tick_interval_secs: u64, batch_size: usize) -> Result<(), String> {
218    let path = pid_path()?;
219
220    // Refuse to start a second daemon against the same DIFFLORE_HOME —
221    // two concurrent drainers would double-send every outbox row.
222    match status() {
223        DaemonStatus::Running { pid } => {
224            return Err(format!(
225                "another daemon is already running (pid {pid}); stop that process before starting another"
226            ));
227        }
228        DaemonStatus::Stale { .. } | DaemonStatus::NotRunning => {}
229    }
230
231    let my_pid = std::process::id() as i32;
232    write_pid(&path, my_pid)?;
233
234    let db = init_db().await?;
235    let queue = OutboxQueue::new(db);
236    let client = CloudClient::create().await;
237
238    // Install SIGTERM / SIGINT handler BEFORE entering the loop so a
239    // signal that arrives while we're mid-drain still causes a clean
240    // exit at the next tick boundary.
241    let shutdown = shutdown_signal_future();
242    tokio::pin!(shutdown);
243
244    let tick = Duration::from_secs(tick_interval_secs.max(1));
245    loop {
246        tokio::select! {
247            biased;
248            () = &mut shutdown => break,
249            () = tokio::time::sleep(tick) => {
250                // Step 1: opportunistic stale reclaim sweeps the queue
251                // before draining so rows stranded by a prior crashed
252                // daemon (same `DIFFLORE_HOME`, different PID) come
253                // back into play. `claim_next` also self-heals
254                // individual rows — this is a defence in depth.
255                let _ = queue.reset_stale(DEFAULT_STALE_SECONDS).await;
256
257                // Step 2: drain. Only SQL-level failures are surfaced;
258                // upload-level errors are absorbed by the queue's
259                // retry + circuit-breaker state, so logging them here
260                // would be noisy.
261                if let Err(e) = drain_outbox(&queue, &client, batch_size).await {
262                    eprintln!("[difflore.daemon] drain error: {e}");
263                }
264            }
265        }
266    }
267
268    remove_pid_file(&path);
269    Ok(())
270}
271
272/// Future that resolves on the first SIGTERM or SIGINT.
273///
274/// Kept separate so `run` stays readable; also makes it testable
275/// through a feature flag if we ever need a synthetic shutdown.
276async fn shutdown_signal_future() {
277    #[cfg(unix)]
278    {
279        use tokio::signal::unix::{SignalKind, signal};
280        let Ok(mut sigterm) = signal(SignalKind::terminate()) else {
281            return;
282        };
283        let Ok(mut sigint) = signal(SignalKind::interrupt()) else {
284            return;
285        };
286        tokio::select! {
287            _ = sigterm.recv() => {}
288            _ = sigint.recv() => {}
289        }
290    }
291
292    #[cfg(windows)]
293    {
294        let _ = tokio::signal::ctrl_c().await;
295    }
296}
297
#[cfg(test)]
mod tests {
    use super::*;

    /// Every test here reads or writes the single `daemon.pid` under
    /// `shared_test_home`, so they must not interleave. Otherwise one
    /// test's dead-pid file can be observed by another test's `status()`
    /// probe. A `tokio::sync::Mutex` serves both kinds of test: sync tests
    /// take it with `blocking_lock`, async tests with `.lock().await`.
    static TEST_SERIAL: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());

    /// Return the PID of a child that has already exited and been reaped.
    /// It stays dead for the probe window, at least until the OS recycles
    /// that PID.
    fn spawn_dead_pid() -> i32 {
        #[cfg(unix)]
        let spawned = std::process::Command::new("true")
            .spawn()
            .expect("spawn true");
        #[cfg(windows)]
        let spawned = std::process::Command::new("cmd")
            .args(["/C", "exit", "0"])
            .spawn()
            .expect("spawn cmd");
        let mut child = spawned;
        let pid = child.id() as i32;
        // Waiting reaps the child, so the PID no longer names a live or
        // zombie process.
        let _ = child.wait();
        pid
    }

    #[test]
    fn status_reports_not_running_when_pid_file_missing() {
        let _serial = TEST_SERIAL.blocking_lock();
        let _ = crate::db::shared_test_home();
        let pid_file = pid_path().expect("pid path");
        let _ = fs::remove_file(&pid_file);

        assert_eq!(status(), DaemonStatus::NotRunning);
    }

    #[test]
    fn status_detects_stale_pid_file() {
        let _serial = TEST_SERIAL.blocking_lock();
        let _ = crate::db::shared_test_home();
        let pid_file = pid_path().expect("pid path");
        fs::write(&pid_file, spawn_dead_pid().to_string()).unwrap();

        // Compare against what landed on disk, not a local copy: the goal
        // is to exercise `status()`'s file -> probe path, not to race the
        // OS PID counter.
        let on_disk: i32 = fs::read_to_string(&pid_file).unwrap().trim().parse().unwrap();

        let observed = status();
        if let DaemonStatus::Stale { pid } = observed {
            assert_eq!(pid, on_disk);
        } else {
            panic!("expected Stale, got {observed:?}");
        }
        let _ = fs::remove_file(&pid_file);
    }

    #[tokio::test]
    async fn stop_is_noop_when_not_running() {
        let _serial = TEST_SERIAL.lock().await;
        let _ = crate::db::shared_test_home();
        let pid_file = pid_path().unwrap();
        let _ = fs::remove_file(&pid_file);

        assert_eq!(stop(1).await.unwrap(), StopOutcome::NotRunning);
    }

    #[tokio::test]
    async fn stop_cleans_stale_pid_file_without_signalling() {
        let _serial = TEST_SERIAL.lock().await;
        let _ = crate::db::shared_test_home();
        let pid_file = pid_path().unwrap();
        fs::write(&pid_file, spawn_dead_pid().to_string()).unwrap();
        let on_disk: i32 = fs::read_to_string(&pid_file).unwrap().trim().parse().unwrap();

        let outcome = stop(1).await.unwrap();

        assert_eq!(outcome, StopOutcome::StaleCleaned { pid: on_disk });
        assert!(!pid_file.exists(), "stale pid file should be removed by stop()");
    }
}