sqlite_graphrag/
llm_slots.rs1use fs4::fs_std::FileExt;
26use std::fs::{self, File, OpenOptions};
27use std::path::PathBuf;
28use std::time::{Duration, Instant};
29
30use crate::errors::AppError;
31
32pub struct LlmSlotGuard {
35 #[allow(dead_code)]
36 slot_file: File,
37 slot_id: u32,
38 acquired_at: Instant,
39}
40
41impl LlmSlotGuard {
42 pub fn slot_id(&self) -> u32 {
45 self.slot_id
46 }
47}
48
49impl Drop for LlmSlotGuard {
50 fn drop(&mut self) {
51 let path = slot_path(self.slot_id);
54 if let Err(e) = fs::remove_file(&path) {
55 tracing::debug!(slot_id = self.slot_id, error = %e, "slot file removal failed (already gone?)");
56 }
57 tracing::debug!(
58 slot_id = self.slot_id,
59 held_ms = self.acquired_at.elapsed().as_millis() as u64,
60 "llm slot released"
61 );
62 }
63}
64
65pub fn acquire_llm_slot(max_concurrent: u32, wait_secs: u64) -> Result<LlmSlotGuard, AppError> {
74 if max_concurrent == 0 {
75 return Err(AppError::Validation(
76 "max_concurrent deve ser >= 1 para acquire_llm_slot".to_string(),
77 ));
78 }
79 let dir = slots_dir();
80 fs::create_dir_all(&dir).map_err(|e| {
81 AppError::Io(std::io::Error::new(
82 e.kind(),
83 format!("failed to create slots dir {}: {e}", dir.display()),
84 ))
85 })?;
86
87 let start = Instant::now();
88 let timeout = Duration::from_secs(wait_secs);
89
90 loop {
91 for slot_id in 0..max_concurrent {
92 let path = slot_path(slot_id);
93 match OpenOptions::new().write(true).create_new(true).open(&path) {
94 Ok(mut file) => {
95 if file.try_lock_exclusive().is_ok() {
96 let pid = std::process::id();
97 use std::io::Write;
99 let _ = writeln!(file, "pid={pid}");
100 tracing::debug!(slot_id, pid, "llm slot acquired");
101 return Ok(LlmSlotGuard {
102 slot_file: file,
103 slot_id,
104 acquired_at: Instant::now(),
105 });
106 }
107 }
109 Err(_) => {
110 }
112 }
113 }
114 if start.elapsed() >= timeout {
116 return Err(AppError::LockBusy(format!(
117 "failed to acquire LLM slot within {wait_secs}s (max={max_concurrent} concurrent)"
118 )));
119 }
120 std::thread::sleep(Duration::from_millis(100));
121 }
122}
123
124#[derive(Debug, Clone, serde::Serialize)]
126pub struct SlotStatus {
127 pub max: u32,
128 pub active: u32,
129 pub pids: Vec<u32>,
130}
131
132pub fn read_status(max_concurrent: u32) -> SlotStatus {
133 let mut active = 0u32;
134 let mut pids = Vec::new();
135 for slot_id in 0..max_concurrent {
136 let path = slot_path(slot_id);
137 if path.exists() {
138 active += 1;
139 if let Ok(content) = fs::read_to_string(&path) {
140 if let Some(pid_line) = content.lines().find(|l| l.starts_with("pid=")) {
141 if let Ok(pid) = pid_line[4..].parse::<u32>() {
142 pids.push(pid);
143 }
144 }
145 }
146 }
147 }
148 SlotStatus {
149 max: max_concurrent,
150 active,
151 pids,
152 }
153}
154
155pub fn force_release(slot_id: u32) -> Result<(), AppError> {
157 let path = slot_path(slot_id);
158 if path.exists() {
159 fs::remove_file(&path).map_err(|e| {
160 AppError::Io(std::io::Error::new(
161 e.kind(),
162 format!("failed to release slot {slot_id}: {e}"),
163 ))
164 })?;
165 }
166 Ok(())
167}
168
169pub fn find_stale_slots(max_concurrent: u32) -> Vec<u32> {
171 let mut stale = Vec::new();
172 for slot_id in 0..max_concurrent {
173 let path = slot_path(slot_id);
174 if path.exists() {
175 if let Ok(content) = fs::read_to_string(&path) {
176 if let Some(pid_line) = content.lines().find(|l| l.starts_with("pid=")) {
177 if let Ok(pid) = pid_line[4..].parse::<u32>() {
178 if !pid_alive(pid) {
179 stale.push(slot_id);
180 }
181 }
182 }
183 }
184 }
185 }
186 stale
187}
188
189#[cfg(unix)]
191fn pid_alive(pid: u32) -> bool {
192 unsafe { libc::kill(pid as i32, 0) == 0 }
194}
195
/// Liveness fallback for non-Unix targets.
///
/// No portable probe is implemented, so every pid is conservatively reported as
/// alive. As a consequence, [`find_stale_slots`] never reports a slot on these
/// platforms.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    true
}
203
/// Directory holding the slot lock files shared by all coordinating processes.
///
/// The base directory is the first non-empty value among:
/// 1. `XDG_RUNTIME_DIR`
/// 2. `SQLITE_GRAPHRAG_CACHE_DIR`
/// 3. `$HOME/.local/share`
/// 4. `/tmp`
///
/// `sqlite-graphrag/llm-slots` is appended to that base. Variables that are set
/// but empty are skipped, so they cannot resolve to a path relative to the
/// current working directory (or to `/.local/share`). The environment is read
/// on every call.
///
/// NOTE(review): because `XDG_RUNTIME_DIR` wins, `SQLITE_GRAPHRAG_CACHE_DIR`
/// has no effect whenever the runtime dir is set. Confirm this precedence is
/// intended.
pub fn slots_dir() -> PathBuf {
    // Treats unset and set-but-empty variables the same way; non-UTF-8 paths are kept.
    fn non_empty_var(key: &str) -> Option<PathBuf> {
        std::env::var_os(key)
            .filter(|value| !value.is_empty())
            .map(PathBuf::from)
    }

    let base = non_empty_var("XDG_RUNTIME_DIR")
        .or_else(|| non_empty_var("SQLITE_GRAPHRAG_CACHE_DIR"))
        .or_else(|| non_empty_var("HOME").map(|home| home.join(".local").join("share")))
        .unwrap_or_else(|| PathBuf::from("/tmp"));
    base.join("sqlite-graphrag").join("llm-slots")
}
214
215pub fn slot_path(id: u32) -> PathBuf {
216 slots_dir().join(format!("slot-{id}.lock"))
217}
218
219pub fn default_max_concurrency() -> u32 {
229 let cpus = std::thread::available_parallelism()
230 .map(|n| n.get() as u32)
231 .unwrap_or(4);
232 let assumed_available_mb: u32 = 4096;
238 let per_worker = crate::constants::LLM_WORKER_RSS_MB as u32;
239 let safe = assumed_available_mb / per_worker.max(1);
240 let capped = safe.min(crate::constants::MAX_CONCURRENT_CLI_INSTANCES as u32);
241 cpus.min(capped).max(1)
242}
243
#[cfg(test)]
mod tests {
    use super::*;
    use std::ffi::OsString;
    use std::sync::{Arc, Barrier, Mutex, MutexGuard};
    use std::thread;

    /// Serializes tests that mutate process-wide environment variables. The test
    /// harness runs tests on parallel threads, and `slots_dir()` reads the
    /// environment on every call.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    fn unique_test_dir() -> PathBuf {
        let mut dir = std::env::temp_dir();
        dir.push(format!(
            "llm-slots-test-{}-{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        dir
    }

    /// Points the slot directory at a fresh temp dir while alive.
    ///
    /// Holds `ENV_LOCK` for its whole lifetime. On drop (also during a panic
    /// unwind) it restores the environment and deletes the temp dir. Declare it
    /// before any slot guards so those guards are dropped first.
    struct IsolatedSlotsDir {
        dir: PathBuf,
        saved_cache_dir: Option<OsString>,
        saved_runtime_dir: Option<OsString>,
        _env_lock: MutexGuard<'static, ()>,
    }

    impl IsolatedSlotsDir {
        fn new() -> Self {
            // A panicking test poisons the mutex; later tests must still run.
            let env_lock = ENV_LOCK.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
            let dir = unique_test_dir();
            let saved_cache_dir = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR");
            let saved_runtime_dir = std::env::var_os("XDG_RUNTIME_DIR");
            // slots_dir() prefers XDG_RUNTIME_DIR, so it must be cleared for the
            // cache-dir override to take effect.
            std::env::remove_var("XDG_RUNTIME_DIR");
            std::env::set_var("SQLITE_GRAPHRAG_CACHE_DIR", &dir);
            Self {
                dir,
                saved_cache_dir,
                saved_runtime_dir,
                _env_lock: env_lock,
            }
        }
    }

    fn restore_var(key: &str, value: Option<OsString>) {
        match value {
            Some(v) => std::env::set_var(key, v),
            None => std::env::remove_var(key),
        }
    }

    impl Drop for IsolatedSlotsDir {
        fn drop(&mut self) {
            // Runs before `_env_lock` is released, so no other test sees a
            // half-restored environment.
            restore_var("SQLITE_GRAPHRAG_CACHE_DIR", self.saved_cache_dir.take());
            restore_var("XDG_RUNTIME_DIR", self.saved_runtime_dir.take());
            let _ = std::fs::remove_dir_all(&self.dir);
        }
    }

    #[test]
    fn slot_enforces_max_concurrency() {
        let _iso = IsolatedSlotsDir::new();

        let _g1 = acquire_llm_slot(2, 5).expect("first slot");
        let _g2 = acquire_llm_slot(2, 5).expect("second slot");
        let start = std::time::Instant::now();
        let result = acquire_llm_slot(2, 1);
        assert!(
            matches!(result, Err(AppError::LockBusy(_))),
            "third slot should fail with LockBusy when max=2"
        );
        assert!(
            start.elapsed() >= std::time::Duration::from_secs(1),
            "should wait full timeout before failing"
        );
    }

    #[test]
    fn slot_releases_on_drop() {
        let _iso = IsolatedSlotsDir::new();

        let g1 = acquire_llm_slot(1, 5).expect("first slot");
        assert_eq!(read_status(1).active, 1);
        drop(g1);
        assert_eq!(read_status(1).active, 0, "slot file should be removed on drop");
        let _g2 = acquire_llm_slot(1, 5).expect("second slot after drop");
    }

    #[test]
    fn slot_max_concurrent_zero_is_validation_error() {
        let result = acquire_llm_slot(0, 1);
        assert!(matches!(result, Err(AppError::Validation(_))));
    }

    #[test]
    fn read_status_reflects_active_slots() {
        let _iso = IsolatedSlotsDir::new();

        let _g1 = acquire_llm_slot(4, 5).expect("first slot");
        let status = read_status(4);
        assert_eq!(status.max, 4);
        assert_eq!(status.active, 1);
        assert_eq!(status.pids, vec![std::process::id()]);
    }

    #[test]
    fn concurrent_acquires_with_2_threads_serialize() {
        let _iso = IsolatedSlotsDir::new();

        let barrier = Arc::new(Barrier::new(3));
        let handles: Vec<_> = (0..3)
            .map(|_| {
                let b = Arc::clone(&barrier);
                thread::spawn(move || {
                    b.wait();
                    acquire_llm_slot(2, 1)
                })
            })
            .collect();
        // Successful guards stay alive inside `results` until the end of the
        // test, so the third thread can never obtain a slot.
        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let successes = results.iter().filter(|r| r.is_ok()).count();
        assert_eq!(successes, 2, "exactly max_concurrent threads should get a slot");
    }
}