Skip to main content

sqlite_graphrag/
lock.rs

1//! Counting semaphore via lock files to limit parallel CLI invocations.
2//!
3//! `acquire_cli_slot` tries to acquire one of `N` available slots by opening the file
4//! `cli-slot-{N}.lock` in the OS cache directory and obtaining an exclusive `flock`.
5//! The returned [`std::fs::File`] MUST be kept alive for the entire duration of `main`;
6//! dropping it releases the slot automatically for the next invocation.
7//!
//! When `wait_seconds` is `Some(n)` with `n > 0`, the function polls until the
//! deadline, sleeping [`crate::constants::CLI_LOCK_POLL_INTERVAL_MS`] milliseconds at
//! first and backing off stepwise to at most four times that interval. When it
//! is `None` or `Some(0)`, a single attempt is made and `Err(AppError::AllSlotsFull { .. })`
//! is returned immediately if all slots are occupied.
12//!
13//! ## Job-type singleton (G28-B, v1.0.68)
14//!
//! Heavy long-running jobs (`enrich`, `ingest --mode claude-code`,
//! `ingest --mode codex`) also acquire a *singleton* lock per `(job_type,
//! namespace, database)` via `acquire_job_singleton`. This guarantees that at
//! most one job of each heavy type runs per namespace and database at any time.
//! The lack of such a guard was the root cause of the 2026-06-03
//! process-proliferation incident (4 parallel `enrich` instances × N workers
//! × 10 MCP servers = ~192 spawned processes).
// Workload: I/O-bound (flock polling; CLI slots sleep with a capped stepwise backoff,
// job singletons with a fixed interval)
22
23use std::fs::{File, OpenOptions};
24use std::path::{Path, PathBuf};
25use std::thread;
26use std::time::{Duration, Instant};
27
28use directories::ProjectDirs;
29use fs4::fs_std::FileExt;
30
31use crate::constants::{
32    CLI_LOCK_POLL_INTERVAL_MS, EMBEDDING_LOAD_EXPECTED_RSS_MB, JOB_SINGLETON_POLL_INTERVAL_MS,
33    LLM_WORKER_RSS_MB, MAX_CONCURRENT_CLI_INSTANCES,
34};
35use crate::errors::AppError;
36
/// Heavy job categories guarded by `acquire_job_singleton`.
///
/// There is deliberately no `Light` variant: lightweight commands
/// (`recall`, `stats`, `read`, `list`) only take a slot from the counting
/// semaphore in `acquire_cli_slot` and never need a singleton.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobType {
    /// The `enrich` command (LLM-driven entity/relation/body enrichment).
    Enrich,
    /// `ingest --mode claude-code` (LLM-curated ingestion).
    IngestClaudeCode,
    /// `ingest --mode codex` (ingestion through the OpenAI Codex CLI).
    IngestCodex,
}

impl JobType {
    /// Kebab-case identifier embedded in singleton lock file names and in the
    /// `job_type` field of `AppError::JobSingletonLocked`.
    fn tag(self) -> &'static str {
        match self {
            Self::Enrich => "enrich",
            Self::IngestClaudeCode => "ingest-claude-code",
            Self::IngestCodex => "ingest-codex",
        }
    }
}
62
63/// Returns the lock file path for the given slot.
64///
65/// Honours `SQLITE_GRAPHRAG_CACHE_DIR` when set (useful for tests, containers,
66/// and NFS caches), falling back to the OS default cache directory via
67/// `directories::ProjectDirs`. The slot must be 1-based.
68fn slot_path(slot: usize) -> Result<PathBuf, AppError> {
69    let cache = cache_dir()?;
70    std::fs::create_dir_all(&cache)?;
71    Ok(cache.join(format!("cli-slot-{slot}.lock")))
72}
73
74/// Resolves the lock-file directory honouring `SQLITE_GRAPHRAG_CACHE_DIR`.
75fn cache_dir() -> Result<PathBuf, AppError> {
76    if let Some(override_dir) = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR") {
77        Ok(PathBuf::from(override_dir))
78    } else {
79        let dirs = ProjectDirs::from("", "", "sqlite-graphrag").ok_or_else(|| {
80            AppError::Io(std::io::Error::new(
81                std::io::ErrorKind::NotFound,
82                "could not determine cache directory for sqlite-graphrag lock files",
83            ))
84        })?;
85        Ok(dirs.cache_dir().to_path_buf())
86    }
87}
88
89/// Computes a short, filesystem-safe hash of the database path so two distinct
90/// databases (e.g. `/tmp/a.sqlite` and `/tmp/b.sqlite`) get distinct lock
91/// files in the shared cache directory. First 12 hex chars of BLAKE3 are
92/// sufficient for collision avoidance across the local filesystem.
93pub fn db_path_hash(db_path: &Path) -> String {
94    let canonical = db_path
95        .canonicalize()
96        .unwrap_or_else(|_| db_path.to_path_buf());
97    let hash = blake3::hash(canonical.to_string_lossy().as_bytes());
98    hash.to_hex().to_string()[..12].to_string()
99}
100
101/// Returns the singleton lock file path for a given (job_type, namespace, db_hash).
102///
103/// Layout: `job-singleton-{tag}-{namespace_slug}-{db_hash}.lock` in the same
104/// cache dir as the CLI slots. The namespace is sanitised to a filesystem-safe
105/// slug (lowercase, hyphens, alphanumeric) and defaults to `default` when
106/// empty. The `db_hash` is the BLAKE3 prefix returned by [`db_path_hash`].
107///
108/// G30 (v1.0.69): the previous implementation ignored the database path
109/// entirely, so two concurrent `enrich` invocations against different
110/// `graphrag.sqlite` files (production vs. test) collided on the same
111/// cache-dir lock. The db_hash scope makes the singleton per-database while
112/// still sharing the same cache dir.
113pub fn job_singleton_path(
114    job_type: JobType,
115    namespace: &str,
116    db_hash: &str,
117) -> Result<PathBuf, AppError> {
118    let cache = cache_dir()?;
119    std::fs::create_dir_all(&cache)?;
120    let slug = if namespace.is_empty() {
121        "default".to_string()
122    } else {
123        namespace
124            .chars()
125            .map(|c| {
126                if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
127                    c.to_ascii_lowercase()
128                } else {
129                    '-'
130                }
131            })
132            .collect::<String>()
133    };
134    let safe_hash: String = db_hash
135        .chars()
136        .filter(|c| c.is_ascii_alphanumeric())
137        .take(16)
138        .collect();
139    Ok(cache.join(format!(
140        "job-singleton-{}-{slug}-{safe_hash}.lock",
141        job_type.tag()
142    )))
143}
144
145/// Tries to open and exclusively lock the lock file for the given slot.
146///
147/// Returns `Ok(file)` if the slot is free, or `Err(io::Error)` if it is
148/// held by another instance (non-blocking).
149fn try_acquire_slot(slot: usize) -> Result<File, AppError> {
150    let path = slot_path(slot)?;
151    let file = OpenOptions::new()
152        .read(true)
153        .write(true)
154        .create(true)
155        .truncate(false)
156        .open(&path)?;
157    file.try_lock_exclusive().map_err(AppError::Io)?;
158    Ok(file)
159}
160
161/// Acquires a concurrency slot from the `max_concurrency`-position semaphore.
162///
163/// Iterates slots `1..=max_concurrency` attempting `try_lock_exclusive` on each
164/// `cli-slot-N.lock` file. When a free slot is found, returns `(File, slot_number)`.
165/// If all slots are occupied:
166///
167/// - If `wait_seconds` is `None` or `Some(0)`, returns immediately with
168///   `AppError::AllSlotsFull { max, waited_secs: 0 }`.
169/// - If `wait_seconds` is `Some(n) > 0`, enters a polling loop every
170///
171/// Returns the maximum number of parallel CLI instances the host can sustain
172/// without thrashing. The formula:
173///
174///   safe = min(cpus, available_mb / per_worker_mb) * 1.0
175///
176/// replaces the previous `... * 0.5` halving factor. The `* 0.5` was the
177/// root cause of G18: even on a 64 GB host the result was always
178/// clamped to 4 because of the division-by-2.
179///
180/// The per-worker cost is the lower of `EMBEDDING_LOAD_EXPECTED_RSS_MB`
181/// (1100) and `LLM_WORKER_RSS_MB` (350), so LLM-only builds get a
182/// proportionally higher parallelism budget without changing the API.
183///
184/// Returns 1 as a defensive floor when system stats are unavailable.
185pub fn calculate_safe_concurrency() -> usize {
186    use sysinfo::System;
187    let mut sys = System::new();
188    sys.refresh_memory();
189    let available_mb = sys.available_memory() / 1_048_576;
190    let cpus = std::thread::available_parallelism()
191        .map(|n| n.get())
192        .unwrap_or(2);
193
194    let per_worker_mb = if cfg!(feature = "llm-only") && !cfg!(feature = "embedding-legacy") {
195        LLM_WORKER_RSS_MB
196    } else if cfg!(feature = "embedding-legacy") && !cfg!(feature = "llm-only") {
197        EMBEDDING_LOAD_EXPECTED_RSS_MB
198    } else {
199        LLM_WORKER_RSS_MB.min(EMBEDDING_LOAD_EXPECTED_RSS_MB)
200    };
201
202    let memory_bound = if available_mb == 0 {
203        cpus
204    } else {
205        (available_mb / per_worker_mb.max(1)) as usize
206    };
207    let raw = cpus.min(memory_bound).max(1);
208    raw.min(MAX_CONCURRENT_CLI_INSTANCES)
209}
210
211/// v1.0.75 — Returns the worker cost in MiB used by `calculate_safe_concurrency`.
212/// Exposed for telemetry and `--info` output.
213pub fn worker_cost_mb() -> u64 {
214    if cfg!(feature = "llm-only") && !cfg!(feature = "embedding-legacy") {
215        LLM_WORKER_RSS_MB
216    } else if cfg!(feature = "embedding-legacy") && !cfg!(feature = "llm-only") {
217        EMBEDDING_LOAD_EXPECTED_RSS_MB
218    } else {
219        LLM_WORKER_RSS_MB.min(EMBEDDING_LOAD_EXPECTED_RSS_MB)
220    }
221}
222
223///   `AppError::AllSlotsFull { max, waited_secs: n }` if no slot opens.
224///
225/// The returned `File` MUST be kept alive until the process exits; dropping it
226/// releases the slot automatically via the implicit `flock` on close.
227pub fn acquire_cli_slot(
228    max_concurrency: usize,
229    wait_seconds: Option<u64>,
230) -> Result<(File, usize), AppError> {
231    // G18: use env override or 2*cpus as ceiling instead of hardcoded 4
232    let ncpus = std::thread::available_parallelism()
233        .map(|n| n.get())
234        .unwrap_or(4);
235    let ceiling = std::env::var("SQLITE_GRAPHRAG_MAX_CLI_INSTANCES")
236        .ok()
237        .and_then(|v| v.parse::<usize>().ok())
238        .unwrap_or_else(|| (2 * ncpus).max(MAX_CONCURRENT_CLI_INSTANCES));
239    let max = max_concurrency.clamp(1, ceiling);
240    let wait_secs = wait_seconds.unwrap_or(0);
241
242    // Tentativa inicial sem espera.
243    if let Some((file, slot)) = try_any_slot(max)? {
244        return Ok((file, slot));
245    }
246
247    if wait_secs == 0 {
248        return Err(AppError::AllSlotsFull {
249            max,
250            waited_secs: 0,
251        });
252    }
253
254    // Polling loop with progressive backoff until the deadline.
255    let deadline = Instant::now() + Duration::from_secs(wait_secs);
256    let mut polls: u64 = 0;
257    loop {
258        let poll_delay = CLI_LOCK_POLL_INTERVAL_MS
259            .saturating_mul(1 + polls / 4)
260            .min(CLI_LOCK_POLL_INTERVAL_MS * 4);
261        thread::sleep(Duration::from_millis(poll_delay));
262        polls += 1;
263        if let Some((file, slot)) = try_any_slot(max)? {
264            return Ok((file, slot));
265        }
266        if Instant::now() >= deadline {
267            return Err(AppError::AllSlotsFull {
268                max,
269                waited_secs: wait_secs,
270            });
271        }
272    }
273}
274
275/// Acquires a process-wide singleton lock for a heavy job type and namespace.
276///
277/// G28-B (v1.0.68): ensures at most one `enrich`, `ingest --mode
278/// claude-code`, or `ingest --mode codex` runs at a time per namespace.
279/// A second invocation in the same namespace either:
280///
281/// - Returns immediately with `AppError::JobSingletonLocked { job_type,
282///   namespace }` when `wait_seconds` is `None` or `Some(0)`.
283/// - Polls every [`JOB_SINGLETON_POLL_INTERVAL_MS`] ms until the lock
284///   drops or the deadline expires, returning the same error on timeout.
285///
286/// The returned `File` MUST be kept alive until the process exits;
287/// dropping it releases the singleton for the next invocation.
288pub fn acquire_job_singleton(
289    job_type: JobType,
290    namespace: &str,
291    db_path: &Path,
292    wait_seconds: Option<u64>,
293    force: bool,
294) -> Result<File, AppError> {
295    let db_hash = db_path_hash(db_path);
296    let path = job_singleton_path(job_type, namespace, &db_hash)?;
297
298    // G30+G09: when --force is set, attempt to break a stale lock by
299    // detecting and removing a pre-existing lock file. This is a last
300    // resort: only enabled by an explicit operator flag. A real orphan
301    // lock from a previous crash leaves a 0-byte file behind, which the
302    // next non-forced caller would still try to lock.
303    if force && path.exists() {
304        tracing::warn!(target: "lock",
305            path = %path.display(),
306            "force=true; removing pre-existing singleton lock file"
307        );
308        let _ = std::fs::remove_file(&path);
309    }
310
311    let file = OpenOptions::new()
312        .read(true)
313        .write(true)
314        .create(true)
315        .truncate(false)
316        .open(&path)?;
317    if let Err(e) = file.try_lock_exclusive() {
318        if !is_lock_contended(&e) {
319            return Err(AppError::Io(e));
320        }
321        // Already held by another instance.
322        let wait_secs = wait_seconds.unwrap_or(0);
323        if wait_secs == 0 {
324            return Err(AppError::JobSingletonLocked {
325                job_type: job_type.tag().to_string(),
326                namespace: namespace.to_string(),
327            });
328        }
329        let deadline = Instant::now() + Duration::from_secs(wait_secs);
330        // Drop the failed handle before polling; flock is per-process so we
331        // re-open each attempt to refresh contention state.
332        drop(file);
333        loop {
334            thread::sleep(Duration::from_millis(JOB_SINGLETON_POLL_INTERVAL_MS));
335            let file = OpenOptions::new()
336                .read(true)
337                .write(true)
338                .create(true)
339                .truncate(false)
340                .open(&path)?;
341            if file.try_lock_exclusive().is_ok() {
342                return Ok(file);
343            }
344            if Instant::now() >= deadline {
345                return Err(AppError::JobSingletonLocked {
346                    job_type: job_type.tag().to_string(),
347                    namespace: namespace.to_string(),
348                });
349            }
350        }
351    }
352    Ok(file)
353}
354
355/// Tries to acquire any free slot in `1..=max`, returning the first available one.
356///
357/// Returns `Ok(Some((file, slot)))` if a slot was obtained, `Ok(None)` if all are
358/// occupied (`EWOULDBLOCK`). Propagates I/O errors other than "lock contended".
359fn try_any_slot(max: usize) -> Result<Option<(File, usize)>, AppError> {
360    for slot in 1..=max {
361        match try_acquire_slot(slot) {
362            Ok(file) => return Ok(Some((file, slot))),
363            Err(AppError::Io(e)) if is_lock_contended(&e) => continue,
364            Err(e) => return Err(e),
365        }
366    }
367    Ok(None)
368}
369
/// Reports whether `error` means "the lock is held by someone else" rather
/// than a genuine I/O failure.
///
/// Every platform signals this with `ErrorKind::WouldBlock`. Windows may also
/// report raw OS error 32 (`ERROR_SHARING_VIOLATION`) or 33
/// (`ERROR_LOCK_VIOLATION`).
fn is_lock_contended(error: &std::io::Error) -> bool {
    let would_block = error.kind() == std::io::ErrorKind::WouldBlock;
    let windows_violation = cfg!(windows) && matches!(error.raw_os_error(), Some(32) | Some(33));
    would_block || windows_violation
}
385
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    static SEQ: AtomicUsize = AtomicUsize::new(0);

    /// Returns a namespace unique to this test process and call.
    fn unique_ns() -> String {
        let n = SEQ.fetch_add(1, Ordering::SeqCst);
        let pid = std::process::id();
        format!("test-{pid}-{n}")
    }

    /// Deletes a singleton lock file from the shared cache directory when dropped.
    ///
    /// The tests lock real files whose names embed a unique namespace. Without
    /// this guard, every `cargo test` run would leave new `job-singleton-*.lock`
    /// files behind in the user's cache directory. Declare the guard BEFORE
    /// acquiring the lock: locals drop in reverse declaration order, so the
    /// lock's `File` is closed before the file is removed.
    struct RemoveOnDrop(PathBuf);

    impl RemoveOnDrop {
        fn singleton(job_type: JobType, ns: &str, db: &Path) -> Self {
            let path = job_singleton_path(job_type, ns, &db_path_hash(db))
                .expect("singleton path should resolve");
            RemoveOnDrop(path)
        }
    }

    impl Drop for RemoveOnDrop {
        fn drop(&mut self) {
            // Best-effort cleanup; a missing file is fine here.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    #[test]
    fn job_singleton_path_sanitises_namespace() {
        let p = job_singleton_path(JobType::Enrich, "Foo Bar/Baz", "abc123def456")
            .expect("path should resolve");
        let name = p.file_name().unwrap().to_string_lossy().to_string();
        assert!(name.contains("enrich"), "got {name}");
        assert!(name.contains("foo-bar-baz"), "got {name}");
        assert!(
            name.contains("abc123def456"),
            "must embed db_hash: got {name}"
        );
    }

    #[test]
    fn job_singleton_blocks_second_invocation_same_namespace() {
        let ns = unique_ns();
        let db = std::env::temp_dir().join(format!("test-{}.sqlite", unique_ns()));
        let _cleanup = RemoveOnDrop::singleton(JobType::Enrich, &ns, &db);
        let first = acquire_job_singleton(JobType::Enrich, &ns, &db, Some(0), false)
            .expect("first acquire should succeed");
        let second = acquire_job_singleton(JobType::Enrich, &ns, &db, Some(0), false);
        assert!(
            matches!(second, Err(AppError::JobSingletonLocked { .. })),
            "expected JobSingletonLocked, got {second:?}"
        );
        drop(first);
    }

    #[test]
    fn job_singleton_allows_different_namespaces() {
        let ns_a = unique_ns();
        let ns_b = unique_ns();
        let db_a = std::env::temp_dir().join(format!("test-a-{}.sqlite", unique_ns()));
        let db_b = std::env::temp_dir().join(format!("test-b-{}.sqlite", unique_ns()));
        let _cleanup_a = RemoveOnDrop::singleton(JobType::IngestClaudeCode, &ns_a, &db_a);
        let _cleanup_b = RemoveOnDrop::singleton(JobType::IngestClaudeCode, &ns_b, &db_b);
        let first = acquire_job_singleton(JobType::IngestClaudeCode, &ns_a, &db_a, Some(0), false)
            .expect("ns_a should acquire");
        let second = acquire_job_singleton(JobType::IngestClaudeCode, &ns_b, &db_b, Some(0), false)
            .expect("ns_b should acquire in parallel");
        drop(first);
        drop(second);
    }

    #[test]
    fn job_singleton_scoped_by_db_hash() {
        // G30: two databases, same namespace, different content. Both locks
        // should succeed because the db_hash differs.
        let ns = unique_ns();
        let db_a = std::env::temp_dir().join(format!("test-x-{}.sqlite", unique_ns()));
        let db_b = std::env::temp_dir().join(format!("test-y-{}.sqlite", unique_ns()));
        let _cleanup_a = RemoveOnDrop::singleton(JobType::Enrich, &ns, &db_a);
        let _cleanup_b = RemoveOnDrop::singleton(JobType::Enrich, &ns, &db_b);
        let first = acquire_job_singleton(JobType::Enrich, &ns, &db_a, Some(0), false)
            .expect("db_a should acquire");
        let second = acquire_job_singleton(JobType::Enrich, &ns, &db_b, Some(0), false)
            .expect("db_b should acquire independently (G30 fix)");
        drop(first);
        drop(second);
    }

    #[test]
    fn db_path_hash_is_stable_for_same_path() {
        let p = std::env::temp_dir().join("hashing-test.sqlite");
        let h1 = db_path_hash(&p);
        let h2 = db_path_hash(&p);
        assert_eq!(h1, h2, "same path must produce same hash");
        assert_eq!(h1.len(), 12, "BLAKE3 prefix must be 12 hex chars");
    }

    #[test]
    fn db_path_hash_differs_for_different_paths() {
        let a = std::env::temp_dir().join("hash-a.sqlite");
        let b = std::env::temp_dir().join("hash-b.sqlite");
        assert_ne!(db_path_hash(&a), db_path_hash(&b));
    }
}