Skip to main content

sqlite_graphrag/
llm_slots.rs

1//! GAP-004 (v1.0.82): Semáforo cross-process para spawn de subprocessos LLM.
2//!
3//! Quando N sessões Claude Code rodam em paralelo no mesmo host, cada `remember`/`edit`/
4//! `recall`/`hybrid-search`/`enrich`/`deep-research`/`ingest` quer spawnar seu próprio
5//! subprocesso `codex exec` ou `claude -p`. Sem coordenação, N subprocessos saturam
6//! o rate limit OAuth compartilhado (observado: 19+ codex simultâneos no transcript
7//! de 2026-06-15).
8//!
9//! ## Solução
10//! - Slot files em `${XDG_RUNTIME_DIR:-~/.local/share}/sqlite-graphrag/llm-slots/slot-{0..N}.lock`
//! - Aquisição atômica cross-process: o slot file é criado com `create_new` (falha se já
//!   existe) e travado com `fs4::FileExt::try_lock_exclusive` (`flock` no Unix, `LockFileEx` no Windows; `fs4` 0.9)
13//! - RAII guard `LlmSlotGuard` com `Drop` libera automaticamente em panic
14//! - Integração com `reaper.rs::scan_and_kill_orphans` para detectar slots órfãos
15//!
16//! ## Uso
17//! ```rust,ignore
18//! use crate::llm_slots::acquire_llm_slot;
19//!
20//! let _guard = acquire_llm_slot(4, 30)?;
21//! // ... spawn subprocesso LLM ...
22//! // guard libera o slot automaticamente ao sair do escopo
23//! ```
24
25use fs4::fs_std::FileExt;
26use std::fs::{self, File, OpenOptions};
27use std::path::PathBuf;
28use std::time::{Duration, Instant};
29
30use crate::errors::AppError;
31
32/// RAII guard que libera o slot automaticamente em panic, cancelamento abrupto
33/// ou término normal do escopo.
34pub struct LlmSlotGuard {
35    #[allow(dead_code)]
36    slot_file: File,
37    slot_id: u32,
38    acquired_at: Instant,
39}
40
41impl LlmSlotGuard {
42    /// Returns the slot id (0..max-1) this guard holds. Used by
43    /// `slots release --slot-id N` to map back to the file path.
44    pub fn slot_id(&self) -> u32 {
45        self.slot_id
46    }
47}
48
49impl Drop for LlmSlotGuard {
50    fn drop(&mut self) {
51        // Libera o lock do filesystem E remove o slot file.
52        // O flock é liberado automaticamente quando `slot_file` é dropado (RAII).
53        let path = slot_path(self.slot_id);
54        if let Err(e) = fs::remove_file(&path) {
55            tracing::debug!(slot_id = self.slot_id, error = %e, "slot file removal failed (already gone?)");
56        }
57        tracing::debug!(
58            slot_id = self.slot_id,
59            held_ms = self.acquired_at.elapsed().as_millis() as u64,
60            "llm slot released"
61        );
62    }
63}
64
65/// Adquire um slot LLM livre, aguardando até `wait_secs` segundos.
66///
67/// Itera sobre `slot_id` em `[0, max_concurrent)` e tenta `create_new` + `try_lock_exclusive`.
68/// Se todos os slots estão ocupados, polling com `sleep(100ms)` até `wait_secs` expirar.
69///
70/// ## Erros
71/// - `AppError::LockBusy` (exit 75) se `wait_secs` expirar sem slot livre
72/// - `AppError::Io` se filesystem falhar
73pub fn acquire_llm_slot(max_concurrent: u32, wait_secs: u64) -> Result<LlmSlotGuard, AppError> {
74    if max_concurrent == 0 {
75        return Err(AppError::Validation(
76            "max_concurrent deve ser >= 1 para acquire_llm_slot".to_string(),
77        ));
78    }
79    let dir = slots_dir();
80    fs::create_dir_all(&dir).map_err(|e| {
81        AppError::Io(std::io::Error::new(
82            e.kind(),
83            format!("failed to create slots dir {}: {e}", dir.display()),
84        ))
85    })?;
86
87    let start = Instant::now();
88    let timeout = Duration::from_secs(wait_secs);
89
90    loop {
91        for slot_id in 0..max_concurrent {
92            let path = slot_path(slot_id);
93            match OpenOptions::new().write(true).create_new(true).open(&path) {
94                Ok(mut file) => {
95                    if file.try_lock_exclusive().is_ok() {
96                        let pid = std::process::id();
97                        // Escreve pid no arquivo para que  possa reportar
98                        use std::io::Write;
99                        let _ = writeln!(file, "pid={pid}");
100                        tracing::debug!(slot_id, pid, "llm slot acquired");
101                        return Ok(LlmSlotGuard {
102                            slot_file: file,
103                            slot_id,
104                            acquired_at: Instant::now(),
105                        });
106                    }
107                    // Slot file existe mas está locked por outro processo
108                }
109                Err(_) => {
110                    // Slot file já existe (race condition rara) — tenta próximo
111                }
112            }
113        }
114        // Todos os slots ocupados — polling
115        if start.elapsed() >= timeout {
116            return Err(AppError::LockBusy(format!(
117                "failed to acquire LLM slot within {wait_secs}s (max={max_concurrent} concurrent)"
118            )));
119        }
120        std::thread::sleep(Duration::from_millis(100));
121    }
122}
123
124/// Retorna o status atual dos slots LLM (para subcomando `slots status --json`).
125#[derive(Debug, Clone, serde::Serialize)]
126pub struct SlotStatus {
127    pub max: u32,
128    pub active: u32,
129    pub pids: Vec<u32>,
130}
131
132pub fn read_status(max_concurrent: u32) -> SlotStatus {
133    let mut active = 0u32;
134    let mut pids = Vec::new();
135    for slot_id in 0..max_concurrent {
136        let path = slot_path(slot_id);
137        if path.exists() {
138            active += 1;
139            if let Ok(content) = fs::read_to_string(&path) {
140                if let Some(pid_line) = content.lines().find(|l| l.starts_with("pid=")) {
141                    if let Ok(pid) = pid_line[4..].parse::<u32>() {
142                        pids.push(pid);
143                    }
144                }
145            }
146        }
147    }
148    SlotStatus {
149        max: max_concurrent,
150        active,
151        pids,
152    }
153}
154
155/// Libera um slot específico (para subcomando `slots release --slot-id N --yes`).
156pub fn force_release(slot_id: u32) -> Result<(), AppError> {
157    let path = slot_path(slot_id);
158    if path.exists() {
159        fs::remove_file(&path).map_err(|e| {
160            AppError::Io(std::io::Error::new(
161                e.kind(),
162                format!("failed to release slot {slot_id}: {e}"),
163            ))
164        })?;
165    }
166    Ok(())
167}
168
169/// Lista IDs de slots stale (PIDs órfãos) — para cleanup automático.
170pub fn find_stale_slots(max_concurrent: u32) -> Vec<u32> {
171    let mut stale = Vec::new();
172    for slot_id in 0..max_concurrent {
173        let path = slot_path(slot_id);
174        if path.exists() {
175            if let Ok(content) = fs::read_to_string(&path) {
176                if let Some(pid_line) = content.lines().find(|l| l.starts_with("pid=")) {
177                    if let Ok(pid) = pid_line[4..].parse::<u32>() {
178                        if !pid_alive(pid) {
179                            stale.push(slot_id);
180                        }
181                    }
182                }
183            }
184        }
185    }
186    stale
187}
188
189/// Verifica se um PID está vivo no sistema (best-effort cross-platform).
190#[cfg(unix)]
191fn pid_alive(pid: u32) -> bool {
192    // Tenta enviar signal 0 (no-op) para verificar existência
193    unsafe { libc::kill(pid as i32, 0) == 0 }
194}
195
/// Non-Unix fallback for the liveness probe.
///
/// There is no direct equivalent of the Unix signal-0 probe here, so every pid is reported
/// as alive and stale slots are never detected automatically. On these targets, stale slot
/// files must be cleaned up manually (`slots cleanup --yes`).
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    true
}
203
/// Returns the directory holding the LLM slot files: `<base>/sqlite-graphrag/llm-slots`.
///
/// `<base>` is the first of these that is set to a non-empty UTF-8 value:
/// 1. `XDG_RUNTIME_DIR`
/// 2. `SQLITE_GRAPHRAG_CACHE_DIR`
/// 3. `$HOME/.local/share`
/// 4. `/tmp`
///
/// Empty values are treated as unset. Otherwise `XDG_RUNTIME_DIR=""` would produce a
/// relative path, and processes in different working directories would no longer share
/// the same slots.
pub fn slots_dir() -> PathBuf {
    fn non_empty_var(name: &str) -> Option<String> {
        std::env::var(name).ok().filter(|value| !value.is_empty())
    }

    let base = non_empty_var("XDG_RUNTIME_DIR")
        .or_else(|| non_empty_var("SQLITE_GRAPHRAG_CACHE_DIR"))
        .or_else(|| non_empty_var("HOME").map(|h| format!("{h}/.local/share")))
        .unwrap_or_else(|| "/tmp".to_string());
    PathBuf::from(base).join("sqlite-graphrag/llm-slots")
}
214
215pub fn slot_path(id: u32) -> PathBuf {
216    slots_dir().join(format!("slot-{id}.lock"))
217}
218
219/// Resolves the default LLM max-host-concurrency value.
220///
221/// Calibrated for the LLM-only build: each worker holds one subprocess
222/// `codex` or `claude` invocation. The formula mirrors the CLI semaphore
223/// in `lock::calculate_safe_concurrency`:
224///   `min(ncpus, available_memory_mb / LLM_WORKER_RSS_MB)`
225///
226/// Falls back to `MAX_CONCURRENT_CLI_INSTANCES` (16) when `sysinfo`
227/// cannot read `/proc/meminfo` (rare).
228pub fn default_max_concurrency() -> u32 {
229    let cpus = std::thread::available_parallelism()
230        .map(|n| n.get() as u32)
231        .unwrap_or(4);
232    // Without `sysinfo` at hand here, we use a conservative memory
233    // estimate: 4 GiB available on most hosts. The CLI semaphore in
234    // `lock::calculate_safe_concurrency` is the source of truth when
235    // exact memory data is available; this fallback just keeps the
236    // LLM slot default in the same order of magnitude.
237    let assumed_available_mb: u32 = 4096;
238    let per_worker = crate::constants::LLM_WORKER_RSS_MB as u32;
239    let safe = assumed_available_mb / per_worker.max(1);
240    let capped = safe.min(crate::constants::MAX_CONCURRENT_CLI_INSTANCES as u32);
241    cpus.min(capped).max(1)
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Barrier, Mutex, MutexGuard};
    use std::thread;

    /// Env var with the highest precedence in `slots_dir()`. Overriding it isolates a test
    /// regardless of `SQLITE_GRAPHRAG_CACHE_DIR` / `HOME`.
    const SLOTS_ENV: &str = "XDG_RUNTIME_DIR";

    /// Serializes every test that redirects the slots directory. Env vars are
    /// process-global and the harness runs tests on parallel threads. Without this lock,
    /// one test's slot files could land in, or be deleted from, another test's directory.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    fn unique_test_dir() -> PathBuf {
        let mut dir = std::env::temp_dir();
        dir.push(format!(
            "llm-slots-test-{}-{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        dir
    }

    /// Points `slots_dir()` at a fresh temporary directory while holding `ENV_LOCK`.
    ///
    /// On drop, restores the previous env value and deletes the directory. Declare it
    /// BEFORE any slot guard in a test: locals drop in reverse order, so the guards then
    /// release their slot files while the env still points at the test directory.
    struct IsolatedSlotsDir {
        previous: Option<std::ffi::OsString>,
        dir: PathBuf,
        // Declared last: released only after `Drop::drop` has restored the env.
        _lock: MutexGuard<'static, ()>,
    }

    impl IsolatedSlotsDir {
        fn new() -> Self {
            // A failing test poisons the lock; its env was still restored by `Drop`,
            // so later tests can safely continue.
            let lock = ENV_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
            let previous = std::env::var_os(SLOTS_ENV);
            let dir = unique_test_dir();
            std::env::set_var(SLOTS_ENV, &dir);
            Self {
                previous,
                dir,
                _lock: lock,
            }
        }
    }

    impl Drop for IsolatedSlotsDir {
        fn drop(&mut self) {
            match self.previous.take() {
                Some(value) => std::env::set_var(SLOTS_ENV, value),
                None => std::env::remove_var(SLOTS_ENV),
            }
            let _ = std::fs::remove_dir_all(&self.dir);
        }
    }

    #[test]
    fn slot_enforces_max_concurrency() {
        let _env = IsolatedSlotsDir::new();

        let _g1 = acquire_llm_slot(2, 5).expect("first slot");
        let _g2 = acquire_llm_slot(2, 5).expect("second slot");
        let start = std::time::Instant::now();
        let result = acquire_llm_slot(2, 1);
        assert!(
            matches!(result, Err(AppError::LockBusy(_))),
            "third slot should fail with LockBusy when max=2"
        );
        assert!(
            start.elapsed() >= std::time::Duration::from_secs(1),
            "should wait full timeout before failing"
        );
    }

    #[test]
    fn slot_releases_on_drop() {
        let _env = IsolatedSlotsDir::new();

        let g1 = acquire_llm_slot(1, 5).expect("first slot");
        let path = slot_path(g1.slot_id());
        assert!(path.exists(), "slot file should exist while the guard is held");
        drop(g1);
        assert!(!path.exists(), "dropping the guard should delete its slot file");
        // After the drop, a new acquire must succeed immediately.
        let _g2 = acquire_llm_slot(1, 5).expect("second slot after drop");
    }

    #[test]
    fn slot_max_concurrent_zero_is_validation_error() {
        let result = acquire_llm_slot(0, 1);
        assert!(matches!(result, Err(AppError::Validation(_))));
    }

    #[test]
    fn read_status_reflects_active_slots() {
        let _env = IsolatedSlotsDir::new();

        let _g1 = acquire_llm_slot(4, 5).expect("first slot");
        let status = read_status(4);
        assert_eq!(status.max, 4);
        assert_eq!(status.active, 1);
        assert_eq!(status.pids, vec![std::process::id()]);
    }

    #[test]
    fn force_release_of_missing_slot_is_ok() {
        let _env = IsolatedSlotsDir::new();

        assert!(force_release(0).is_ok());
    }

    #[test]
    fn concurrent_acquires_respect_max() {
        let _env = IsolatedSlotsDir::new();

        let barrier = Arc::new(Barrier::new(3));
        let handles: Vec<_> = (0..3)
            .map(|_| {
                let b = Arc::clone(&barrier);
                thread::spawn(move || {
                    b.wait();
                    acquire_llm_slot(2, 1)
                })
            })
            .collect();
        // Winning guards are handed back through the join handles, so they stay alive
        // until `results` is dropped: exactly two threads win and the third times out.
        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let successes = results.iter().filter(|r| r.is_ok()).count();
        assert_eq!(successes, 2, "max=2 must admit exactly two concurrent holders");
    }
}