sqlite_graphrag/
llm_slots.rs1use fs4::fs_std::FileExt;
26use std::fs::{self, File, OpenOptions};
27use std::path::PathBuf;
28use std::time::{Duration, Instant};
29
30use crate::errors::AppError;
31
/// RAII handle for one acquired LLM concurrency slot.
///
/// The guard keeps the slot's lock file open for as long as it lives. When the
/// guard is dropped, the slot file is removed so another caller can claim the
/// slot.
#[derive(Debug)]
#[must_use = "dropping the guard immediately releases the LLM slot"]
pub struct LlmSlotGuard {
    // Never read: held only so the open (and locked) file handle lives exactly
    // as long as the guard does.
    #[allow(dead_code)]
    slot_file: File,
    /// Zero-based index of the slot this guard owns.
    slot_id: u32,
    /// Moment the slot was acquired; used to report how long it was held.
    acquired_at: Instant,
}
40
41impl LlmSlotGuard {
42 pub fn slot_id(&self) -> u32 {
45 self.slot_id
46 }
47}
48
49impl Drop for LlmSlotGuard {
50 fn drop(&mut self) {
51 let path = slot_path(self.slot_id);
54 if let Err(e) = fs::remove_file(&path) {
55 tracing::debug!(slot_id = self.slot_id, error = %e, "slot file removal failed (already gone?)");
56 }
57 tracing::debug!(
58 slot_id = self.slot_id,
59 held_ms = self.acquired_at.elapsed().as_millis() as u64,
60 "llm slot released"
61 );
62 }
63}
64
65pub fn acquire_llm_slot(max_concurrent: u32, wait_secs: u64) -> Result<LlmSlotGuard, AppError> {
74 if max_concurrent == 0 {
75 return Err(AppError::Validation(
76 "max_concurrent deve ser >= 1 para acquire_llm_slot".to_string(),
77 ));
78 }
79 let dir = slots_dir();
80 fs::create_dir_all(&dir).map_err(|e| {
81 AppError::Io(std::io::Error::new(
82 e.kind(),
83 format!("failed to create slots dir {}: {e}", dir.display()),
84 ))
85 })?;
86
87 let stale = find_stale_slots(max_concurrent);
88 for slot_id in &stale {
89 let _ = force_release(*slot_id);
90 tracing::info!(slot_id, "released stale LLM slot (PID dead)");
91 }
92
93 let start = Instant::now();
94 let timeout = Duration::from_secs(wait_secs);
95
96 loop {
97 for slot_id in 0..max_concurrent {
98 let path = slot_path(slot_id);
99 match OpenOptions::new().write(true).create_new(true).open(&path) {
100 Ok(mut file) => {
101 if file.try_lock_exclusive().is_ok() {
102 let pid = std::process::id();
103 use std::io::Write;
105 let _ = writeln!(file, "pid={pid}");
106 tracing::debug!(slot_id, pid, "llm slot acquired");
107 return Ok(LlmSlotGuard {
108 slot_file: file,
109 slot_id,
110 acquired_at: Instant::now(),
111 });
112 }
113 }
115 Err(_) => {
116 }
118 }
119 }
120 if start.elapsed() >= timeout {
122 return Err(AppError::LockBusy(format!(
123 "failed to acquire LLM slot within {wait_secs}s (max={max_concurrent} concurrent)"
124 )));
125 }
126 std::thread::sleep(Duration::from_millis(100));
127 }
128}
129
130#[derive(Debug, Clone, serde::Serialize)]
132pub struct SlotStatus {
133 pub max: u32,
134 pub active: u32,
135 pub pids: Vec<u32>,
136}
137
138pub fn read_status(max_concurrent: u32) -> SlotStatus {
139 let mut active = 0u32;
140 let mut pids = Vec::new();
141 for slot_id in 0..max_concurrent {
142 let path = slot_path(slot_id);
143 if path.exists() {
144 active += 1;
145 if let Ok(content) = fs::read_to_string(&path) {
146 if let Some(pid_line) = content.lines().find(|l| l.starts_with("pid=")) {
147 if let Ok(pid) = pid_line[4..].parse::<u32>() {
148 pids.push(pid);
149 }
150 }
151 }
152 }
153 }
154 SlotStatus {
155 max: max_concurrent,
156 active,
157 pids,
158 }
159}
160
161pub fn force_release(slot_id: u32) -> Result<(), AppError> {
163 let path = slot_path(slot_id);
164 if path.exists() {
165 fs::remove_file(&path).map_err(|e| {
166 AppError::Io(std::io::Error::new(
167 e.kind(),
168 format!("failed to release slot {slot_id}: {e}"),
169 ))
170 })?;
171 }
172 Ok(())
173}
174
175pub fn find_stale_slots(max_concurrent: u32) -> Vec<u32> {
177 let mut stale = Vec::new();
178 for slot_id in 0..max_concurrent {
179 let path = slot_path(slot_id);
180 if path.exists() {
181 if let Ok(content) = fs::read_to_string(&path) {
182 if let Some(pid_line) = content.lines().find(|l| l.starts_with("pid=")) {
183 if let Ok(pid) = pid_line[4..].parse::<u32>() {
184 if !pid_alive(pid) {
185 stale.push(slot_id);
186 }
187 }
188 }
189 }
190 }
191 }
192 stale
193}
194
195#[cfg(unix)]
197fn pid_alive(pid: u32) -> bool {
198 unsafe { libc::kill(pid as i32, 0) == 0 }
200}
201
/// Fallback for non-Unix targets: no liveness probe is performed, so every PID
/// is reported as alive and no slot is ever classified as stale.
#[cfg(not(unix))]
fn pid_alive(_pid: u32) -> bool {
    true
}
209
/// Returns the directory that holds the LLM slot files.
///
/// The base directory is the first of the following that is set to a
/// non-empty value:
///
/// 1. `XDG_RUNTIME_DIR`
/// 2. `SQLITE_GRAPHRAG_CACHE_DIR`
/// 3. `$HOME/.local/share`
/// 4. `/tmp`
///
/// `sqlite-graphrag/llm-slots` is appended to that base. Empty values are
/// skipped because they would otherwise produce a relative path that resolves
/// against the current working directory.
// NOTE(review): `XDG_RUNTIME_DIR` outranks the app-specific
// `SQLITE_GRAPHRAG_CACHE_DIR`, so the override has no effect on systems where
// the former is set — confirm this precedence is intended.
pub fn slots_dir() -> PathBuf {
    let non_empty = |key: &str| std::env::var_os(key).filter(|value| !value.is_empty());

    let base = non_empty("XDG_RUNTIME_DIR")
        .or_else(|| non_empty("SQLITE_GRAPHRAG_CACHE_DIR"))
        .map(PathBuf::from)
        .or_else(|| non_empty("HOME").map(|home| PathBuf::from(home).join(".local/share")))
        .unwrap_or_else(|| PathBuf::from("/tmp"));

    base.join("sqlite-graphrag/llm-slots")
}
220
221pub fn slot_path(id: u32) -> PathBuf {
222 slots_dir().join(format!("slot-{id}.lock"))
223}
224
225pub fn default_max_concurrency() -> u32 {
235 let cpus = std::thread::available_parallelism()
236 .map(|n| n.get() as u32)
237 .unwrap_or(4);
238 let assumed_available_mb: u32 = 4096;
244 let per_worker = crate::constants::LLM_WORKER_RSS_MB as u32;
245 let safe = assumed_available_mb / per_worker.max(1);
246 let capped = safe.min(crate::constants::MAX_CONCURRENT_CLI_INSTANCES as u32);
247 cpus.min(capped).max(1)
248}
249
#[cfg(test)]
mod tests {
    use super::*;
    use std::ffi::OsString;
    use std::sync::{Arc, Barrier, Mutex, MutexGuard};
    use std::thread;

    /// Serializes every test that touches the process-wide environment; the
    /// test harness runs tests on parallel threads of the same process.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    fn unique_test_dir() -> PathBuf {
        let mut dir = std::env::temp_dir();
        dir.push(format!(
            "llm-slots-test-{}-{}",
            std::process::id(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ));
        dir
    }

    fn restore_env(key: &str, saved: Option<OsString>) {
        match saved {
            Some(value) => std::env::set_var(key, value),
            None => std::env::remove_var(key),
        }
    }

    /// Points the slots directory at a fresh temp dir for the lifetime of the
    /// value and restores the environment on drop, including on panic.
    ///
    /// `XDG_RUNTIME_DIR` is cleared as well because `slots_dir` prefers it over
    /// `SQLITE_GRAPHRAG_CACHE_DIR`; leaving it set would make the tests use the
    /// real runtime directory.
    struct IsolatedSlotsDir {
        dir: PathBuf,
        saved_cache_dir: Option<OsString>,
        saved_runtime_dir: Option<OsString>,
        // Declared last so the lock is released only after the environment
        // has been restored in `drop`.
        _serial: MutexGuard<'static, ()>,
    }

    impl IsolatedSlotsDir {
        fn new() -> Self {
            // A poisoned lock only means another test panicked; the
            // environment was still restored by that test's guard.
            let serial = ENV_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let saved_cache_dir = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR");
            let saved_runtime_dir = std::env::var_os("XDG_RUNTIME_DIR");
            let dir = unique_test_dir();
            std::env::remove_var("XDG_RUNTIME_DIR");
            std::env::set_var("SQLITE_GRAPHRAG_CACHE_DIR", &dir);
            Self {
                dir,
                saved_cache_dir,
                saved_runtime_dir,
                _serial: serial,
            }
        }
    }

    impl Drop for IsolatedSlotsDir {
        fn drop(&mut self) {
            restore_env("SQLITE_GRAPHRAG_CACHE_DIR", self.saved_cache_dir.take());
            restore_env("XDG_RUNTIME_DIR", self.saved_runtime_dir.take());
            let _ = std::fs::remove_dir_all(&self.dir);
        }
    }

    #[test]
    fn slot_enforces_max_concurrency() {
        // Declared first so it is dropped last, after all slot guards.
        let _env = IsolatedSlotsDir::new();

        let _g1 = acquire_llm_slot(2, 5).expect("first slot");
        let _g2 = acquire_llm_slot(2, 5).expect("second slot");
        let start = std::time::Instant::now();
        let result = acquire_llm_slot(2, 1);
        assert!(
            matches!(result, Err(AppError::LockBusy(_))),
            "third slot should fail with max=2"
        );
        assert!(
            start.elapsed() >= std::time::Duration::from_secs(1),
            "should wait full timeout before failing"
        );
    }

    #[test]
    fn slot_releases_on_drop() {
        let _env = IsolatedSlotsDir::new();

        let g1 = acquire_llm_slot(1, 5).expect("first slot");
        drop(g1);
        let _g2 = acquire_llm_slot(1, 5).expect("second slot after drop");
    }

    #[test]
    fn slot_max_concurrent_zero_is_validation_error() {
        let result = acquire_llm_slot(0, 1);
        assert!(matches!(result, Err(AppError::Validation(_))));
    }

    #[test]
    fn read_status_reflects_active_slots() {
        let _env = IsolatedSlotsDir::new();

        let _g1 = acquire_llm_slot(4, 5).expect("first slot");
        let status = read_status(4);
        assert_eq!(status.max, 4);
        assert_eq!(status.active, 1);
        assert_eq!(status.pids, vec![std::process::id()]);
    }

    #[test]
    fn concurrent_acquires_with_2_threads_serialize() {
        let _env = IsolatedSlotsDir::new();

        let barrier = Arc::new(Barrier::new(3));
        let mut handles = vec![];
        for _ in 0..3 {
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                b.wait();
                // The losing thread has to wait out the timeout because the
                // winners' guards stay alive until `results` is dropped; keep
                // the wait short so the test stays fast.
                acquire_llm_slot(2, 1)
            }));
        }
        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        let successes = results.iter().filter(|r| r.is_ok()).count();
        assert_eq!(successes, 2, "exactly max_concurrent threads may hold a slot");
    }
}