sqlite_graphrag/
reaper.rs1#[cfg(unix)]
23use std::time::Duration;
24
/// Minimum age, in seconds, that a matching process must have reached before
/// the scan treats it as an orphan and signals it.
#[cfg(unix)]
const ORPHAN_MIN_AGE_SECS: u64 = 60;

/// Process names (as reported by `/proc/<pid>/comm`) that the scan is allowed
/// to terminate. Matching is exact, not by prefix or substring.
#[cfg(unix)]
const ORPHAN_SCAN_TARGETS: &[&str] = &["claude", "codex", "sqlite-graphrag"];
30
/// Outcome of one orphan-reaping pass.
///
/// `Default` yields the all-zero report, so callers do not have to spell out
/// every field when they need an empty starting value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ReaperReport {
    /// Number of processes that matched every orphan criterion.
    pub found: usize,
    /// Number of matched processes for which the termination signal was sent successfully.
    pub killed: usize,
    /// Number of matched processes for which sending the signal failed.
    pub failed: usize,
    /// Wall-clock duration of the whole pass, in milliseconds.
    pub elapsed_ms: u64,
}
42
43pub fn scan_and_kill_orphans() -> ReaperReport {
48 let start = std::time::Instant::now();
49 let mut report = ReaperReport {
50 found: 0,
51 killed: 0,
52 failed: 0,
53 elapsed_ms: 0,
54 };
55
56 #[cfg(unix)]
57 {
58 if let Err(e) = scan_unix(&mut report) {
59 tracing::warn!(target: "reaper", error = %e, "orphan scan failed");
60 }
61 clean_stale_codex_homes();
64 }
65
66 let max = crate::llm_slots::default_max_concurrency();
67 let stale = crate::llm_slots::find_stale_slots(max);
68 for slot_id in &stale {
69 let _ = crate::llm_slots::force_release(*slot_id);
70 tracing::info!(target: "reaper", slot_id, "released stale LLM slot (PID dead)");
71 }
72
73 #[cfg(not(unix))]
74 {
75 tracing::debug!(target: "reaper", "orphan scan is a no-op on non-Unix platforms");
76 }
77
78 report.elapsed_ms = start.elapsed().as_millis() as u64;
79 if report.killed > 0 {
80 tracing::warn!(
81 target: "reaper",
82 found = report.found,
83 killed = report.killed,
84 failed = report.failed,
85 "reaped orphan LLM subprocesses"
86 );
87 } else {
88 tracing::info!(target: "reaper", found = report.found, "no orphan LLM subprocesses detected");
89 }
90 report
91}
92
93#[cfg(unix)]
94fn scan_unix(report: &mut ReaperReport) -> std::io::Result<()> {
95 use std::fs;
96 use std::path::Path;
97
98 let proc = Path::new("/proc");
99 let entries = fs::read_dir(proc)?;
100 for entry in entries.flatten() {
101 let name = entry.file_name();
102 let Some(name_str) = name.to_str() else {
103 continue;
104 };
105 if !name_str.chars().all(|c| c.is_ascii_digit()) {
106 continue;
107 }
108 let pid: i32 = match name_str.parse() {
109 Ok(p) => p,
110 Err(_) => continue,
111 };
112 if pid == std::process::id() as i32 {
113 continue;
114 }
115
116 let stat_path = entry.path().join("stat");
117 let stat = match fs::read_to_string(&stat_path) {
118 Ok(s) => s,
119 Err(_) => continue,
120 };
121
122 let Some(close_paren) = stat.rfind(')') else {
126 continue;
127 };
128 let after = &stat[close_paren + 1..];
129 let mut parts = after.split_whitespace();
130 let state = parts.next().unwrap_or("");
132 let ppid: i32 = parts.next().and_then(|p| p.parse().ok()).unwrap_or(-1);
133
134 if ppid != 1 {
137 continue;
138 }
139
140 if state.starts_with('Z') {
142 continue;
143 }
144
145 let comm_path = entry.path().join("comm");
149 let comm = match fs::read_to_string(&comm_path) {
150 Ok(s) => s.trim().to_string(),
151 Err(_) => continue,
152 };
153
154 if !ORPHAN_SCAN_TARGETS.iter().any(|t| comm == *t) {
155 continue;
156 }
157
158 let age_ok = check_process_age(pid, ORPHAN_MIN_AGE_SECS);
161 if !age_ok {
162 continue;
163 }
164
165 report.found += 1;
166 match terminate_pid(pid) {
167 Ok(()) => {
168 report.killed += 1;
169 tracing::info!(target: "reaper", pid, comm = %comm, "killed orphan LLM subprocess");
170 }
171 Err(e) => {
172 report.failed += 1;
173 tracing::warn!(target: "reaper", pid, comm = %comm, error = %e, "failed to kill orphan");
174 }
175 }
176 }
177 Ok(())
178}
179
180#[cfg(unix)]
181fn check_process_age(pid: i32, min_age_secs: u64) -> bool {
182 use std::fs;
183 let stat_path = std::path::Path::new("/proc")
186 .join(pid.to_string())
187 .join("stat");
188 let Ok(meta) = fs::metadata(&stat_path) else {
189 return false;
190 };
191 let Ok(modified) = meta.modified() else {
192 return false;
193 };
194 let Ok(elapsed) = std::time::SystemTime::now().duration_since(modified) else {
195 return false;
196 };
197 elapsed >= Duration::from_secs(min_age_secs)
198}
199
200#[cfg(unix)]
212fn clean_stale_codex_homes() {
213 let Ok(home) = std::env::var("HOME") else {
214 return;
215 };
216 let base = std::path::Path::new(&home).join(".local/share/sqlite-graphrag");
217 let Ok(entries) = std::fs::read_dir(&base) else {
218 return;
219 };
220 let mut removed = 0usize;
221 for entry in entries.flatten() {
222 let name = entry.file_name();
223 let Some(name_str) = name.to_str() else {
224 continue;
225 };
226 let Some(pid_str) = name_str.strip_prefix("codex-home-") else {
227 continue;
228 };
229 let Ok(pid) = pid_str.parse::<i32>() else {
230 continue;
231 };
232 if pid == std::process::id() as i32 {
233 continue;
234 }
235 let alive = unsafe { libc::kill(pid, 0) } == 0
238 || std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH);
239 if alive {
240 continue;
241 }
242 if std::fs::remove_dir_all(entry.path()).is_ok() {
243 removed += 1;
244 }
245 }
246 if removed > 0 {
247 tracing::info!(target: "reaper", removed, "removed stale codex-home isolation dirs");
248 }
249}
250
251#[cfg(unix)]
252fn terminate_pid(pid: i32) -> std::io::Result<()> {
253 let rc = unsafe { libc::kill(pid, libc::SIGTERM) };
257 if rc == 0 {
258 Ok(())
259 } else {
260 Err(std::io::Error::last_os_error())
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// A report built with zero counters reads back as zero.
    #[test]
    fn reaper_report_starts_zeroed() {
        let ReaperReport {
            found,
            killed,
            failed,
            ..
        } = ReaperReport {
            found: 0,
            killed: 0,
            failed: 0,
            elapsed_ms: 0,
        };
        assert_eq!((found, killed, failed), (0, 0, 0));
    }

    /// Pins the minimum orphan age at sixty seconds.
    #[cfg(unix)]
    #[test]
    fn orphan_min_age_is_one_minute() {
        let expected: u64 = 60;
        assert_eq!(ORPHAN_MIN_AGE_SECS, expected);
    }

    /// Both LLM CLI names must stay on the target list.
    #[cfg(unix)]
    #[test]
    fn orphan_targets_include_claude_and_codex() {
        for expected in ["claude", "codex"] {
            assert!(ORPHAN_SCAN_TARGETS.contains(&expected));
        }
    }

    /// The tool's own binary name must stay on the target list.
    #[cfg(unix)]
    #[test]
    fn orphan_targets_include_sqlite_graphrag() {
        let present = ORPHAN_SCAN_TARGETS.iter().any(|&t| t == "sqlite-graphrag");
        assert!(present);
    }

    /// Runs the real reaping pass (not a mock) and checks it returns promptly.
    #[test]
    fn scan_completes_without_panic_on_linux() {
        let report = scan_and_kill_orphans();
        assert!(report.elapsed_ms < 30_000, "scan must finish in <30s");
    }
}