Skip to main content

sqlite_graphrag/
llm_slots.rs

1//! GAP-004 (v1.0.82): cross-process semaphore for spawning LLM subprocesses.
2//!
3//! When N Claude Code sessions run in parallel on the same host, each `remember`/`edit`/
4//! `recall`/`hybrid-search`/`enrich`/`deep-research`/`ingest` wants to spawn its own
5//! `codex exec` or `claude -p` subprocess. Without coordination, N subprocesses saturate
6//! the shared OAuth rate limit (observed: 19+ concurrent codex in the transcript
7//! of 2026-06-15).
8//!
9//! ## Solution
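//! - Slot files at `<base>/sqlite-graphrag/llm-slots/slot-{0..N-1}.lock`, where `N` is the
//!   `max_concurrent` argument and `<base>` is the first one that is set of `$XDG_RUNTIME_DIR`,
//!   `$SQLITE_GRAPHRAG_CACHE_DIR`, `$HOME/.local/share`, falling back to `/tmp`
//! - Atomic cross-process acquire by exclusively creating the slot file (`create_new`), which
//!   is then held under `fs4::fs_std::FileExt::try_lock_exclusive` (`flock` on Unix,
//!   `LockFileEx` on Windows; `fs4` 0.9)
//! - RAII guard `LlmSlotGuard` whose `Drop` deletes the slot file on normal scope exit and
//!   during panic unwinding; slot files left behind by killed processes are reclaimed by
//!   PID liveness checks (Unix only)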
14//! - Integration with `reaper.rs::scan_and_kill_orphans` to detect orphaned slots
15//!
16//! ## Usage
17//! ```rust,ignore
18//! use crate::llm_slots::acquire_llm_slot;
19//!
20//! let _guard = acquire_llm_slot(4, 30)?;
21//! // ... spawn LLM subprocess ...
22//! // the guard releases the slot automatically when it leaves scope
23//! ```
24
25use fs4::fs_std::FileExt;
26use std::fs::{self, File, OpenOptions};
27use std::path::PathBuf;
28use std::time::{Duration, Instant};
29
30use crate::errors::AppError;
31
32/// RAII guard that releases the slot automatically on panic, abrupt cancellation,
33/// or normal scope exit.
34pub struct LlmSlotGuard {
35    #[allow(dead_code)]
36    slot_file: File,
37    slot_id: u32,
38    acquired_at: Instant,
39}
40
41impl LlmSlotGuard {
42    /// Returns the slot id (0..max-1) this guard holds. Used by
43    /// `slots release --slot-id N` to map back to the file path.
44    pub fn slot_id(&self) -> u32 {
45        self.slot_id
46    }
47}
48
49impl Drop for LlmSlotGuard {
50    fn drop(&mut self) {
51        // Libera o lock do filesystem E remove o slot file.
52        // O flock é liberado automaticamente quando `slot_file` é dropado (RAII).
53        let path = slot_path(self.slot_id);
54        if let Err(e) = fs::remove_file(&path) {
55            tracing::debug!(slot_id = self.slot_id, error = %e, "slot file removal failed (already gone?)");
56        }
57        tracing::debug!(
58            slot_id = self.slot_id,
59            held_ms = self.acquired_at.elapsed().as_millis() as u64,
60            "llm slot released"
61        );
62    }
63}
64
65/// Acquires a free LLM slot, waiting up to `wait_secs` seconds.
66///
67/// Iterates over `slot_id` in `[0, max_concurrent)` and tries `create_new` + `try_lock_exclusive`.
68/// If all slots are busy, polls with `sleep(100ms)` until `wait_secs` expires.
69///
70/// ## Errors
71/// - `AppError::LockBusy` (exit 75) if `wait_secs` expires without a free slot
72/// - `AppError::Io` if the filesystem fails
73pub fn acquire_llm_slot(max_concurrent: u32, wait_secs: u64) -> Result<LlmSlotGuard, AppError> {
74    if max_concurrent == 0 {
75        return Err(AppError::Validation(
76            "max_concurrent deve ser >= 1 para acquire_llm_slot".to_string(),
77        ));
78    }
79    let dir = slots_dir();
80    fs::create_dir_all(&dir).map_err(|e| {
81        AppError::Io(std::io::Error::new(
82            e.kind(),
83            format!("failed to create slots dir {}: {e}", dir.display()),
84        ))
85    })?;
86
87    let stale = find_stale_slots(max_concurrent);
88    for slot_id in &stale {
89        let _ = force_release(*slot_id);
90        tracing::info!(slot_id, "released stale LLM slot (PID dead)");
91    }
92
93    let start = Instant::now();
94    let timeout = Duration::from_secs(wait_secs);
95
96    loop {
97        for slot_id in 0..max_concurrent {
98            let path = slot_path(slot_id);
99            match OpenOptions::new().write(true).create_new(true).open(&path) {
100                Ok(mut file) => {
101                    if file.try_lock_exclusive().is_ok() {
102                        let pid = std::process::id();
103                        // Escreve pid no arquivo para que  possa reportar
104                        use std::io::Write;
105                        let _ = writeln!(file, "pid={pid}");
106                        tracing::debug!(slot_id, pid, "llm slot acquired");
107                        return Ok(LlmSlotGuard {
108                            slot_file: file,
109                            slot_id,
110                            acquired_at: Instant::now(),
111                        });
112                    }
113                    // Slot file existe mas está locked por outro processo
114                }
115                Err(_) => {
116                    // Slot file já existe (race condition rara) — tenta próximo
117                }
118            }
119        }
120        // Todos os slots ocupados — polling
121        if start.elapsed() >= timeout {
122            return Err(AppError::LockBusy(format!(
123                "failed to acquire LLM slot within {wait_secs}s (max={max_concurrent} concurrent)"
124            )));
125        }
126        std::thread::sleep(Duration::from_millis(100));
127    }
128}
129
130/// Returns the current status of the LLM slots (for the `slots status --json` subcommand).
131#[derive(Debug, Clone, serde::Serialize)]
132pub struct SlotStatus {
133    pub max: u32,
134    pub active: u32,
135    pub pids: Vec<u32>,
136}
137
138pub fn read_status(max_concurrent: u32) -> SlotStatus {
139    let mut active = 0u32;
140    let mut pids = Vec::new();
141    for slot_id in 0..max_concurrent {
142        let path = slot_path(slot_id);
143        if path.exists() {
144            active += 1;
145            if let Ok(content) = fs::read_to_string(&path) {
146                if let Some(pid_line) = content.lines().find(|l| l.starts_with("pid=")) {
147                    if let Ok(pid) = pid_line[4..].parse::<u32>() {
148                        pids.push(pid);
149                    }
150                }
151            }
152        }
153    }
154    SlotStatus {
155        max: max_concurrent,
156        active,
157        pids,
158    }
159}
160
161/// Releases a specific slot (for the `slots release --slot-id N --yes` subcommand).
162pub fn force_release(slot_id: u32) -> Result<(), AppError> {
163    let path = slot_path(slot_id);
164    if path.exists() {
165        fs::remove_file(&path).map_err(|e| {
166            AppError::Io(std::io::Error::new(
167                e.kind(),
168                format!("failed to release slot {slot_id}: {e}"),
169            ))
170        })?;
171    }
172    Ok(())
173}
174
175/// Lists stale slot IDs (orphaned PIDs) — for automatic cleanup.
176pub fn find_stale_slots(max_concurrent: u32) -> Vec<u32> {
177    let mut stale = Vec::new();
178    for slot_id in 0..max_concurrent {
179        let path = slot_path(slot_id);
180        if path.exists() {
181            if let Ok(content) = fs::read_to_string(&path) {
182                if let Some(pid_line) = content.lines().find(|l| l.starts_with("pid=")) {
183                    if let Ok(pid) = pid_line[4..].parse::<u32>() {
184                        if !pid_alive(pid) {
185                            stale.push(slot_id);
186                        }
187                    }
188                }
189            }
190        }
191    }
192    stale
193}
194
195/// Checks whether a PID is alive on the system (best-effort cross-platform).
196#[cfg(unix)]
197fn pid_alive(pid: u32) -> bool {
198    // Tenta enviar signal 0 (no-op) para verificar existência
199    unsafe { libc::kill(pid as i32, 0) == 0 }
200}
201
202#[cfg(not(unix))]
203fn pid_alive(pid: u32) -> bool {
204    // No Windows, sem equivalente direto; assume vivo se arquivo existe.
205    // Cleanup manual via `slots cleanup --yes` é a via.
206    let _ = pid;
207    true
208}
209
/// Directory holding the LLM slot files: `<base>/sqlite-graphrag/llm-slots`.
///
/// `<base>` is the first non-empty value of `$XDG_RUNTIME_DIR`, then
/// `$SQLITE_GRAPHRAG_CACHE_DIR`, then `$HOME/.local/share`, falling back to
/// `/tmp`. All processes sharing the LLM slot budget must resolve the same
/// directory, so the lookup order matters.
pub fn slots_dir() -> PathBuf {
    // Empty variables count as unset. Otherwise an empty XDG/cache var would
    // yield a relative path (slots under the current working directory), and an
    // empty HOME would yield `/.local/share`.
    fn non_empty_var(name: &str) -> Option<PathBuf> {
        std::env::var_os(name)
            .filter(|value| !value.is_empty())
            .map(PathBuf::from)
    }

    let base = non_empty_var("XDG_RUNTIME_DIR")
        .or_else(|| non_empty_var("SQLITE_GRAPHRAG_CACHE_DIR"))
        .or_else(|| non_empty_var("HOME").map(|home| home.join(".local").join("share")))
        .unwrap_or_else(|| PathBuf::from("/tmp"));
    base.join("sqlite-graphrag").join("llm-slots")
}
220
221pub fn slot_path(id: u32) -> PathBuf {
222    slots_dir().join(format!("slot-{id}.lock"))
223}
224
225/// Resolves the default LLM max-host-concurrency value.
226///
227/// Calibrated for the LLM-only build: each worker holds one subprocess
228/// `codex` or `claude` invocation. The formula mirrors the CLI semaphore
229/// in `lock::calculate_safe_concurrency`:
230///   `min(ncpus, available_memory_mb / LLM_WORKER_RSS_MB)`
231///
232/// Falls back to `MAX_CONCURRENT_CLI_INSTANCES` (16) when `sysinfo`
233/// cannot read `/proc/meminfo` (rare).
234pub fn default_max_concurrency() -> u32 {
235    let cpus = std::thread::available_parallelism()
236        .map(|n| n.get() as u32)
237        .unwrap_or(4);
238    // Without `sysinfo` at hand here, we use a conservative memory
239    // estimate: 4 GiB available on most hosts. The CLI semaphore in
240    // `lock::calculate_safe_concurrency` is the source of truth when
241    // exact memory data is available; this fallback just keeps the
242    // LLM slot default in the same order of magnitude.
243    let assumed_available_mb: u32 = 4096;
244    let per_worker = crate::constants::LLM_WORKER_RSS_MB as u32;
245    let safe = assumed_available_mb / per_worker.max(1);
246    let capped = safe.min(crate::constants::MAX_CONCURRENT_CLI_INSTANCES as u32);
247    cpus.min(capped).max(1)
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use std::ffi::OsString;
    use std::sync::Arc;
    use std::sync::Barrier;
    use std::thread;

    // Serialises every test that mutates the process-global slot env
    // (XDG_RUNTIME_DIR / SQLITE_GRAPHRAG_CACHE_DIR). Without this, parallel
    // tests clobber each other's env and collide in the same slots dir.
    static SLOT_TEST_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    fn unique_test_dir() -> PathBuf {
        let mut dir = std::env::temp_dir();
        dir.push(format!(
            "llm-slots-test-{}-{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        dir
    }

    fn restore_var(name: &str, value: Option<OsString>) {
        match value {
            Some(v) => std::env::set_var(name, v),
            None => std::env::remove_var(name),
        }
    }

    /// Points the slots directory at a fresh temp dir for as long as the value
    /// lives. On drop (including when an assertion panics) it restores the
    /// previous environment and deletes the temp dir.
    ///
    /// Declare it BEFORE any slot guard in a test. Locals drop in reverse order,
    /// so guards release their files while the isolated environment is still
    /// active and never touch the real slots directory.
    struct IsolatedSlotEnv {
        dir: PathBuf,
        orig_xdg: Option<OsString>,
        orig_cache: Option<OsString>,
    }

    impl IsolatedSlotEnv {
        fn new() -> Self {
            let orig_xdg = std::env::var_os("XDG_RUNTIME_DIR");
            let orig_cache = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR");
            let dir = unique_test_dir();
            std::env::remove_var("XDG_RUNTIME_DIR");
            std::env::set_var("SQLITE_GRAPHRAG_CACHE_DIR", &dir);
            Self {
                dir,
                orig_xdg,
                orig_cache,
            }
        }
    }

    impl Drop for IsolatedSlotEnv {
        fn drop(&mut self) {
            restore_var("XDG_RUNTIME_DIR", self.orig_xdg.take());
            restore_var("SQLITE_GRAPHRAG_CACHE_DIR", self.orig_cache.take());
            // Best-effort: a leftover temp dir is harmless.
            let _ = std::fs::remove_dir_all(&self.dir);
        }
    }

    #[test]
    fn slot_enforces_max_concurrency() {
        let _serial = SLOT_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let _env = IsolatedSlotEnv::new();

        let _g1 = acquire_llm_slot(2, 5).expect("first slot");
        let _g2 = acquire_llm_slot(2, 5).expect("second slot");
        let start = std::time::Instant::now();
        let result = acquire_llm_slot(2, 1);
        assert!(
            matches!(result, Err(AppError::LockBusy(_))),
            "third slot should fail with LockBusy when max=2"
        );
        assert!(
            start.elapsed() >= std::time::Duration::from_secs(1),
            "should wait full timeout before failing"
        );
    }

    #[test]
    fn slot_releases_on_drop() {
        let _serial = SLOT_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let _env = IsolatedSlotEnv::new();

        let g1 = acquire_llm_slot(1, 5).expect("first slot");
        let path = slot_path(g1.slot_id());
        assert!(path.exists(), "acquiring must create the slot file");
        drop(g1);
        assert!(!path.exists(), "dropping the guard must delete the slot file");
        let _g2 = acquire_llm_slot(1, 5).expect("second slot after drop");
    }

    #[test]
    fn slot_max_concurrent_zero_is_validation_error() {
        let result = acquire_llm_slot(0, 1);
        assert!(matches!(result, Err(AppError::Validation(_))));
    }

    #[test]
    fn read_status_reflects_active_slots() {
        let _serial = SLOT_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let _env = IsolatedSlotEnv::new();

        let _g1 = acquire_llm_slot(4, 5).expect("first slot");
        let status = read_status(4);
        assert_eq!(status.max, 4);
        // The isolated dir starts empty, so exactly our slot is present.
        assert_eq!(status.active, 1);
        assert_eq!(status.pids, vec![std::process::id()]);
    }

    #[test]
    fn concurrent_acquires_with_2_threads_serialize() {
        let _serial = SLOT_TEST_LOCK.lock().unwrap_or_else(|e| e.into_inner());
        let _env = IsolatedSlotEnv::new();

        let barrier = Arc::new(Barrier::new(3));
        let handles: Vec<_> = (0..3)
            .map(|_| {
                let b = Arc::clone(&barrier);
                thread::spawn(move || {
                    b.wait();
                    acquire_llm_slot(2, 1)
                })
            })
            .collect();
        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let successes = results.iter().filter(|r| r.is_ok()).count();
        // Winning guards stay alive inside `results`, so with max=2 exactly two
        // threads succeed and the third must time out.
        assert_eq!(successes, 2);
        assert!(results
            .iter()
            .any(|r| matches!(r, Err(AppError::LockBusy(_)))));
    }
}