Skip to main content

sqlite_graphrag/
reaper.rs

//! G28: Reaper for orphan external processes.
//!
//! When the CLI crashes or is killed (SIGKILL, OOM, machine reset), child
//! processes spawned by `claude -p` or `codex exec` may be left running.
//! Without cleanup they accumulate as orphans that, together with their
//! MCP-spawned subprocess trees, consume CPU and RAM (the 2026-06-03
//! incident: 1,877 processes total, load average 276 on a 10-CPU host).
//!
//! [`scan_and_kill_orphans`] walks the process table at startup and
//! terminates any `claude` or `codex` invocation (and any other name listed
//! in `ORPHAN_SCAN_TARGETS`, which also contains `sqlite-graphrag`) whose
//! `PPID` is `1` (reparented to `init`/`launchd` after the parent died) and
//! that is older than the `ORPHAN_MIN_AGE_SECS` constant. The scan is
//! conservative: it only kills processes that (a) match a known CLI name,
//! AND (b) are orphaned, AND (c) are older than the threshold. A short-lived
//! CLI that is just starting up is left alone.
16
17// v1.0.74: gate the orphan-reaper internals behind `cfg(unix)` so the
18// constants and the `Duration` import are not flagged as dead code on
19// Windows. The tests that reference them also need the same gate so the
20// Windows test compilation does not break (the tests assert the values
21// match the contract documented in CHANGELOG G28).
22#[cfg(unix)]
23use std::time::Duration;
24
/// Minimum age, in seconds, an orphaned process must have reached before the
/// reaper is allowed to signal it.
#[cfg(unix)]
const ORPHAN_MIN_AGE_SECS: u64 = 60;

/// Short program names (as reported by `/proc/<pid>/comm`) that the reaper
/// considers eligible for termination once orphaned.
#[cfg(unix)]
const ORPHAN_SCAN_TARGETS: &[&str] = &[
    "claude",
    "codex",
    "sqlite-graphrag",
];
30
/// Summary of a single orphan-reaper sweep, as returned by
/// [`scan_and_kill_orphans`].
///
/// `Default` yields the all-zero report that a sweep starts from.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ReaperReport {
    /// Number of orphan processes detected.
    pub found: usize,
    /// Number of orphan processes to which the termination signal
    /// (`SIGTERM`) was delivered successfully.
    pub killed: usize,
    /// Number that we could not terminate (permission, ESRCH, etc).
    pub failed: usize,
    /// Elapsed wall time of the scan, in milliseconds.
    pub elapsed_ms: u64,
}
42
43/// Walks the process table and kills orphan LLM invocations.
44///
45/// The scan is best-effort and never panics: on any unexpected error it
46/// logs the failure and returns a report with `killed = 0`.
47pub fn scan_and_kill_orphans() -> ReaperReport {
48    let start = std::time::Instant::now();
49    let mut report = ReaperReport {
50        found: 0,
51        killed: 0,
52        failed: 0,
53        elapsed_ms: 0,
54    };
55
56    #[cfg(unix)]
57    {
58        if let Err(e) = scan_unix(&mut report) {
59            tracing::warn!(target: "reaper", error = %e, "orphan scan failed");
60        }
61        // G42/S4 (v1.0.79): also remove stale `codex-home-{pid}`
62        // isolation directories left behind by crashed invocations.
63        clean_stale_codex_homes();
64    }
65
66    let max = crate::llm_slots::default_max_concurrency();
67    let stale = crate::llm_slots::find_stale_slots(max);
68    for slot_id in &stale {
69        let _ = crate::llm_slots::force_release(*slot_id);
70        tracing::info!(target: "reaper", slot_id, "released stale LLM slot (PID dead)");
71    }
72
73    #[cfg(not(unix))]
74    {
75        tracing::debug!(target: "reaper", "orphan scan is a no-op on non-Unix platforms");
76    }
77
78    report.elapsed_ms = start.elapsed().as_millis() as u64;
79    if report.killed > 0 {
80        tracing::warn!(
81            target: "reaper",
82            found = report.found,
83            killed = report.killed,
84            failed = report.failed,
85            "reaped orphan LLM subprocesses"
86        );
87    } else {
88        tracing::info!(target: "reaper", found = report.found, "no orphan LLM subprocesses detected");
89    }
90    report
91}
92
93#[cfg(unix)]
94fn scan_unix(report: &mut ReaperReport) -> std::io::Result<()> {
95    use std::fs;
96    use std::path::Path;
97
98    let proc = Path::new("/proc");
99    let entries = fs::read_dir(proc)?;
100    for entry in entries.flatten() {
101        let name = entry.file_name();
102        let Some(name_str) = name.to_str() else {
103            continue;
104        };
105        if !name_str.chars().all(|c| c.is_ascii_digit()) {
106            continue;
107        }
108        let pid: i32 = match name_str.parse() {
109            Ok(p) => p,
110            Err(_) => continue,
111        };
112        if pid == std::process::id() as i32 {
113            continue;
114        }
115
116        let stat_path = entry.path().join("stat");
117        let stat = match fs::read_to_string(&stat_path) {
118            Ok(s) => s,
119            Err(_) => continue,
120        };
121
122        // /proc/[pid]/stat has the form: `pid (comm) state ppid ...`
123        // The comm field can contain spaces and parens; the last `)`
124        // separates the comm from the rest.
125        let Some(close_paren) = stat.rfind(')') else {
126            continue;
127        };
128        let after = &stat[close_paren + 1..];
129        let mut parts = after.split_whitespace();
130        // parts[0] = state (e.g. "R"), parts[1] = ppid, parts[2] = pgrp, ...
131        let state = parts.next().unwrap_or("");
132        let ppid: i32 = parts.next().and_then(|p| p.parse().ok()).unwrap_or(-1);
133
134        // Only target processes orphaned to init (PPID 1 on Linux/Unix
135        // when the parent is gone) or whose parent is also dead.
136        if ppid != 1 {
137            continue;
138        }
139
140        // Skip zombies (state Z) — they need no kill.
141        if state.starts_with('Z') {
142            continue;
143        }
144
145        // Resolve the comm field. proc/[pid]/comm is the short program
146        // name (no path); we use it instead of parsing the bracketed
147        // comm from stat to avoid encoding edge cases.
148        let comm_path = entry.path().join("comm");
149        let comm = match fs::read_to_string(&comm_path) {
150            Ok(s) => s.trim().to_string(),
151            Err(_) => continue,
152        };
153
154        if !ORPHAN_SCAN_TARGETS.iter().any(|t| comm == *t) {
155            continue;
156        }
157
158        // Age check: skip processes that just spawned (under 60s old) so
159        // we never race with a concurrent CLI invocation.
160        let age_ok = check_process_age(pid, ORPHAN_MIN_AGE_SECS);
161        if !age_ok {
162            continue;
163        }
164
165        report.found += 1;
166        match terminate_pid(pid) {
167            Ok(()) => {
168                report.killed += 1;
169                tracing::info!(target: "reaper", pid, comm = %comm, "killed orphan LLM subprocess");
170            }
171            Err(e) => {
172                report.failed += 1;
173                tracing::warn!(target: "reaper", pid, comm = %comm, error = %e, "failed to kill orphan");
174            }
175        }
176    }
177    Ok(())
178}
179
180#[cfg(unix)]
181fn check_process_age(pid: i32, min_age_secs: u64) -> bool {
182    use std::fs;
183    // /proc/[pid]/stat field 22 is start_time in clock ticks since boot.
184    // We instead use the simpler heuristic: stat file mtime.
185    let stat_path = std::path::Path::new("/proc")
186        .join(pid.to_string())
187        .join("stat");
188    let Ok(meta) = fs::metadata(&stat_path) else {
189        return false;
190    };
191    let Ok(modified) = meta.modified() else {
192        return false;
193    };
194    let Ok(elapsed) = std::time::SystemTime::now().duration_since(modified) else {
195        return false;
196    };
197    elapsed >= Duration::from_secs(min_age_secs)
198}
199
200/// G42/S4 (v1.0.79): removes `~/.local/share/sqlite-graphrag/codex-home-{pid}`
201/// directories whose owning PID is no longer alive.
202///
203/// `prepare_isolated_codex_home` creates one directory per process and
204/// never deletes it (deleting on exit would race a concurrent invocation
205/// re-using the same PID number). The reaper is the right owner for the
206/// cleanup: at startup it removes every stale dir in one sweep.
207///
208/// Best-effort and conservative: a dir is removed only when (a) the name
209/// parses as `codex-home-<pid>`, (b) `kill(pid, 0)` reports the process
210/// gone (ESRCH), and (c) the pid is not our own.
211#[cfg(unix)]
212fn clean_stale_codex_homes() {
213    let Ok(home) = std::env::var("HOME") else {
214        return;
215    };
216    let base = std::path::Path::new(&home).join(".local/share/sqlite-graphrag");
217    let Ok(entries) = std::fs::read_dir(&base) else {
218        return;
219    };
220    let mut removed = 0usize;
221    for entry in entries.flatten() {
222        let name = entry.file_name();
223        let Some(name_str) = name.to_str() else {
224            continue;
225        };
226        let Some(pid_str) = name_str.strip_prefix("codex-home-") else {
227            continue;
228        };
229        let Ok(pid) = pid_str.parse::<i32>() else {
230            continue;
231        };
232        if pid == std::process::id() as i32 {
233            continue;
234        }
235        // kill(pid, 0): signal 0 performs the permission/existence check
236        // without delivering a signal. ESRCH means the process is gone.
237        let alive = unsafe { libc::kill(pid, 0) } == 0
238            || std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH);
239        if alive {
240            continue;
241        }
242        if std::fs::remove_dir_all(entry.path()).is_ok() {
243            removed += 1;
244        }
245    }
246    if removed > 0 {
247        tracing::info!(target: "reaper", removed, "removed stale codex-home isolation dirs");
248    }
249}
250
251#[cfg(unix)]
252fn terminate_pid(pid: i32) -> std::io::Result<()> {
253    // SIGTERM first; if the process ignores it for >2s, the caller can
254    // escalate to SIGKILL. For the reaper we send TERM and return; a
255    // follow-up sweep can send KILL if needed.
256    let rc = unsafe { libc::kill(pid, libc::SIGTERM) };
257    if rc == 0 {
258        Ok(())
259    } else {
260        Err(std::io::Error::last_os_error())
261    }
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// A hand-built zero report exposes zero in every counter.
    #[test]
    fn reaper_report_starts_zeroed() {
        let ReaperReport {
            found,
            killed,
            failed,
            ..
        } = ReaperReport {
            found: 0,
            killed: 0,
            failed: 0,
            elapsed_ms: 0,
        };
        assert_eq!(found, 0);
        assert_eq!(killed, 0);
        assert_eq!(failed, 0);
    }

    /// G28 contract: the 60 s threshold is the safety margin that keeps one
    /// CLI invocation from killing a concurrent peer that started only a few
    /// seconds earlier.
    #[cfg(unix)]
    #[test]
    fn orphan_min_age_is_one_minute() {
        assert_eq!(ORPHAN_MIN_AGE_SECS, 60);
    }

    /// Both LLM CLIs must stay on the target list.
    #[cfg(unix)]
    #[test]
    fn orphan_targets_include_claude_and_codex() {
        for expected in ["claude", "codex"] {
            assert!(ORPHAN_SCAN_TARGETS.contains(&expected));
        }
    }

    /// The tool's own binary name is a reaper target as well.
    #[cfg(unix)]
    #[test]
    fn orphan_targets_include_sqlite_graphrag() {
        let own_binary = "sqlite-graphrag";
        assert!(ORPHAN_SCAN_TARGETS.contains(&own_binary));
    }

    /// Smoke test: the sweep returns a report within the time budget.
    ///
    /// NOTE(review): this runs the real sweep on the test host, so matching
    /// orphaned processes there would actually receive `SIGTERM`.
    #[test]
    fn scan_completes_without_panic_on_linux() {
        // On Linux CI we may be PID 1 in containers; the report will then
        // simply have found=0.
        let report = scan_and_kill_orphans();
        assert!(report.elapsed_ms < 30_000, "scan must finish in <30s");
    }
}
311}