Skip to main content

sqlite_graphrag/
lock.rs

1//! Counting semaphore via lock files to limit parallel CLI invocations.
2//!
3//! `acquire_cli_slot` tries to acquire one of `N` available slots by opening the file
4//! `cli-slot-{N}.lock` in the OS cache directory and obtaining an exclusive `flock`.
5//! The returned [`std::fs::File`] MUST be kept alive for the entire duration of `main`;
6//! dropping it releases the slot automatically for the next invocation.
7//!
//! When `wait_seconds` is `Some(n) > 0`, the function keeps polling until the
//! deadline, starting with [`crate::constants::CLI_LOCK_POLL_INTERVAL_MS`]
//! milliseconds between attempts and backing off to at most 4× that interval.
//! When it is `None` or `Some(0)`, a single attempt is made and
//! `Err(AppError::AllSlotsFull)` is returned immediately if all slots are
//! occupied.
12//!
//! ## Job-type singleton (G28-B, v1.0.68)
//!
//! Heavy long-running jobs (`enrich`, `ingest --mode claude-code`,
//! `ingest --mode codex`) also acquire a *singleton* lock per `(job_type,
//! namespace, database)` via `acquire_job_singleton` (the per-database scope
//! was added in G30, v1.0.69). This guarantees that at most one heavy job of
//! each type runs per namespace and database at any time. The absence of that
//! guarantee was the root cause of the 2026-06-03 process-proliferation
//! incident (4 parallel `enrich` instances × N workers × 10 MCP servers =
//! ~192 spawned processes).
// Workload: I/O-bound (flock polling with sleeps: a stepped backoff for CLI
// slots, a fixed interval for job singletons)
22
23use std::fs::{File, OpenOptions};
24use std::path::{Path, PathBuf};
25use std::thread;
26use std::time::{Duration, Instant};
27
28use directories::ProjectDirs;
29use fs4::fs_std::FileExt;
30
31use crate::constants::{
32    CLI_LOCK_POLL_INTERVAL_MS, JOB_SINGLETON_POLL_INTERVAL_MS, LLM_WORKER_RSS_MB,
33    MAX_CONCURRENT_CLI_INSTANCES,
34};
35use crate::errors::AppError;
36
/// Heavy job kinds guarded by `acquire_job_singleton`.
///
/// Lightweight commands (`recall`, `stats`, `read`, `list`) deliberately have
/// no variant here: they only share the counting semaphore in
/// [`acquire_cli_slot`] and never take a singleton lock.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobType {
    /// `enrich` command (LLM-driven entity/relation/body enrichment).
    Enrich,
    /// `ingest --mode claude-code` (LLM-curated ingestion).
    IngestClaudeCode,
    /// `ingest --mode codex` (OpenAI Codex CLI ingestion).
    IngestCodex,
}

impl JobType {
    /// Kebab-case identifier for this job kind.
    ///
    /// It is embedded in singleton lock file names and copied into the
    /// `job_type` field of `AppError::JobSingletonLocked`.
    fn tag(self) -> &'static str {
        match self {
            Self::Enrich => "enrich",
            Self::IngestClaudeCode => "ingest-claude-code",
            Self::IngestCodex => "ingest-codex",
        }
    }
}
62
63/// Returns the lock file path for the given slot.
64///
65/// Honours `SQLITE_GRAPHRAG_CACHE_DIR` when set (useful for tests, containers,
66/// and NFS caches), falling back to the OS default cache directory via
67/// `directories::ProjectDirs`. The slot must be 1-based.
68fn slot_path(slot: usize) -> Result<PathBuf, AppError> {
69    let cache = cache_dir()?;
70    std::fs::create_dir_all(&cache)?;
71    Ok(cache.join(format!("cli-slot-{slot}.lock")))
72}
73
74/// Resolves the lock-file directory honouring `SQLITE_GRAPHRAG_CACHE_DIR`.
75fn cache_dir() -> Result<PathBuf, AppError> {
76    if let Some(override_dir) = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR") {
77        Ok(PathBuf::from(override_dir))
78    } else {
79        let dirs = ProjectDirs::from("", "", "sqlite-graphrag").ok_or_else(|| {
80            AppError::Io(std::io::Error::new(
81                std::io::ErrorKind::NotFound,
82                "could not determine cache directory for sqlite-graphrag lock files",
83            ))
84        })?;
85        Ok(dirs.cache_dir().to_path_buf())
86    }
87}
88
89/// Computes a short, filesystem-safe hash of the database path so two distinct
90/// databases (e.g. `/tmp/a.sqlite` and `/tmp/b.sqlite`) get distinct lock
91/// files in the shared cache directory. First 12 hex chars of BLAKE3 are
92/// sufficient for collision avoidance across the local filesystem.
93pub fn db_path_hash(db_path: &Path) -> String {
94    let canonical = db_path
95        .canonicalize()
96        .unwrap_or_else(|_| db_path.to_path_buf());
97    let hash = blake3::hash(canonical.to_string_lossy().as_bytes());
98    hash.to_hex().to_string()[..12].to_string()
99}
100
101/// Returns the singleton lock file path for a given (job_type, namespace, db_hash).
102///
103/// Layout: `job-singleton-{tag}-{namespace_slug}-{db_hash}.lock` in the same
104/// cache dir as the CLI slots. The namespace is sanitised to a filesystem-safe
105/// slug (lowercase, hyphens, alphanumeric) and defaults to `default` when
106/// empty. The `db_hash` is the BLAKE3 prefix returned by [`db_path_hash`].
107///
108/// G30 (v1.0.69): the previous implementation ignored the database path
109/// entirely, so two concurrent `enrich` invocations against different
110/// `graphrag.sqlite` files (production vs. test) collided on the same
111/// cache-dir lock. The db_hash scope makes the singleton per-database while
112/// still sharing the same cache dir.
113pub fn job_singleton_path(
114    job_type: JobType,
115    namespace: &str,
116    db_hash: &str,
117) -> Result<PathBuf, AppError> {
118    let cache = cache_dir()?;
119    std::fs::create_dir_all(&cache)?;
120    let slug = if namespace.is_empty() {
121        "default".to_string()
122    } else {
123        namespace
124            .chars()
125            .map(|c| {
126                if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
127                    c.to_ascii_lowercase()
128                } else {
129                    '-'
130                }
131            })
132            .collect::<String>()
133    };
134    let safe_hash: String = db_hash
135        .chars()
136        .filter(|c| c.is_ascii_alphanumeric())
137        .take(16)
138        .collect();
139    Ok(cache.join(format!(
140        "job-singleton-{}-{slug}-{safe_hash}.lock",
141        job_type.tag()
142    )))
143}
144
145/// Tries to open and exclusively lock the lock file for the given slot.
146///
147/// Returns `Ok(file)` if the slot is free, or `Err(io::Error)` if it is
148/// held by another instance (non-blocking).
149fn try_acquire_slot(slot: usize) -> Result<File, AppError> {
150    let path = slot_path(slot)?;
151    let file = OpenOptions::new()
152        .read(true)
153        .write(true)
154        .create(true)
155        .truncate(false)
156        .open(&path)?;
157    file.try_lock_exclusive().map_err(AppError::Io)?;
158    Ok(file)
159}
160
161/// Acquires a concurrency slot from the `max_concurrency`-position semaphore.
162///
163/// Iterates slots `1..=max_concurrency` attempting `try_lock_exclusive` on each
164/// `cli-slot-N.lock` file. When a free slot is found, returns `(File, slot_number)`.
165/// If all slots are occupied:
166///
167/// - If `wait_seconds` is `None` or `Some(0)`, returns immediately with
168///   `AppError::AllSlotsFull { max, waited_secs: 0 }`.
169/// - If `wait_seconds` is `Some(n) > 0`, enters a polling loop every
170///
171/// Returns the maximum number of parallel CLI instances the host can sustain
172/// without thrashing. The formula:
173///
174///   safe = min(cpus, available_mb / per_worker_mb) * 1.0
175///
176/// replaces the previous `... * 0.5` halving factor. The `* 0.5` was the
177/// root cause of G18: even on a 64 GB host the result was always
178/// clamped to 4 because of the division-by-2.
179///
180/// The per-worker cost is `LLM_WORKER_RSS_MB` (350): since v1.0.79 every
181/// build is LLM-only (the `embedding-legacy` feature and the ONNX path
182/// were removed), so the higher fastembed worker cost no longer applies.
183///
184/// Returns 1 as a defensive floor when system stats are unavailable.
185pub fn calculate_safe_concurrency() -> usize {
186    use sysinfo::System;
187    let mut sys = System::new();
188    sys.refresh_memory();
189    let available_mb = sys.available_memory() / 1_048_576;
190    let cpus = std::thread::available_parallelism()
191        .map(|n| n.get())
192        .unwrap_or(2);
193
194    let per_worker_mb = LLM_WORKER_RSS_MB;
195
196    let memory_bound = if available_mb == 0 {
197        cpus
198    } else {
199        (available_mb / per_worker_mb.max(1)) as usize
200    };
201    let raw = cpus.min(memory_bound).max(1);
202    raw.min(MAX_CONCURRENT_CLI_INSTANCES)
203}
204
205/// v1.0.75 — Returns the worker cost in MiB used by `calculate_safe_concurrency`.
206/// Exposed for telemetry and `--info` output.
207pub fn worker_cost_mb() -> u64 {
208    LLM_WORKER_RSS_MB
209}
210
211///   `AppError::AllSlotsFull { max, waited_secs: n }` if no slot opens.
212///
213/// The returned `File` MUST be kept alive until the process exits; dropping it
214/// releases the slot automatically via the implicit `flock` on close.
215pub fn acquire_cli_slot(
216    max_concurrency: usize,
217    wait_seconds: Option<u64>,
218) -> Result<(File, usize), AppError> {
219    // G18: use env override or 2*cpus as ceiling instead of hardcoded 4
220    let ncpus = std::thread::available_parallelism()
221        .map(|n| n.get())
222        .unwrap_or(4);
223    let ceiling = std::env::var("SQLITE_GRAPHRAG_MAX_CLI_INSTANCES")
224        .ok()
225        .and_then(|v| v.parse::<usize>().ok())
226        .unwrap_or_else(|| (2 * ncpus).max(MAX_CONCURRENT_CLI_INSTANCES));
227    let max = max_concurrency.clamp(1, ceiling);
228    let wait_secs = wait_seconds.unwrap_or(0);
229
230    // Tentativa inicial sem espera.
231    if let Some((file, slot)) = try_any_slot(max)? {
232        return Ok((file, slot));
233    }
234
235    if wait_secs == 0 {
236        return Err(AppError::AllSlotsFull {
237            max,
238            waited_secs: 0,
239        });
240    }
241
242    // Polling loop with progressive backoff until the deadline.
243    let deadline = Instant::now() + Duration::from_secs(wait_secs);
244    let mut polls: u64 = 0;
245    loop {
246        let poll_delay = CLI_LOCK_POLL_INTERVAL_MS
247            .saturating_mul(1 + polls / 4)
248            .min(CLI_LOCK_POLL_INTERVAL_MS * 4);
249        thread::sleep(Duration::from_millis(poll_delay));
250        polls += 1;
251        if let Some((file, slot)) = try_any_slot(max)? {
252            return Ok((file, slot));
253        }
254        if Instant::now() >= deadline {
255            return Err(AppError::AllSlotsFull {
256                max,
257                waited_secs: wait_secs,
258            });
259        }
260    }
261}
262
263/// Acquires a process-wide singleton lock for a heavy job type and namespace.
264///
265/// G28-B (v1.0.68): ensures at most one `enrich`, `ingest --mode
266/// claude-code`, or `ingest --mode codex` runs at a time per namespace.
267/// A second invocation in the same namespace either:
268///
269/// - Returns immediately with `AppError::JobSingletonLocked { job_type,
270///   namespace }` when `wait_seconds` is `None` or `Some(0)`.
271/// - Polls every [`JOB_SINGLETON_POLL_INTERVAL_MS`] ms until the lock
272///   drops or the deadline expires, returning the same error on timeout.
273///
274/// The returned `File` MUST be kept alive until the process exits;
275/// dropping it releases the singleton for the next invocation.
276pub fn acquire_job_singleton(
277    job_type: JobType,
278    namespace: &str,
279    db_path: &Path,
280    wait_seconds: Option<u64>,
281    force: bool,
282) -> Result<File, AppError> {
283    let db_hash = db_path_hash(db_path);
284    let path = job_singleton_path(job_type, namespace, &db_hash)?;
285
286    // G30+G09: when --force is set, attempt to break a stale lock by
287    // detecting and removing a pre-existing lock file. This is a last
288    // resort: only enabled by an explicit operator flag. A real orphan
289    // lock from a previous crash leaves a 0-byte file behind, which the
290    // next non-forced caller would still try to lock.
291    if force && path.exists() {
292        tracing::warn!(target: "lock",
293            path = %path.display(),
294            "force=true; removing pre-existing singleton lock file"
295        );
296        let _ = std::fs::remove_file(&path);
297    }
298
299    let file = OpenOptions::new()
300        .read(true)
301        .write(true)
302        .create(true)
303        .truncate(false)
304        .open(&path)?;
305    if let Err(e) = file.try_lock_exclusive() {
306        if !is_lock_contended(&e) {
307            return Err(AppError::Io(e));
308        }
309        // Already held by another instance.
310        let wait_secs = wait_seconds.unwrap_or(0);
311        if wait_secs == 0 {
312            return Err(AppError::JobSingletonLocked {
313                job_type: job_type.tag().to_string(),
314                namespace: namespace.to_string(),
315            });
316        }
317        let deadline = Instant::now() + Duration::from_secs(wait_secs);
318        // Drop the failed handle before polling; flock is per-process so we
319        // re-open each attempt to refresh contention state.
320        drop(file);
321        loop {
322            thread::sleep(Duration::from_millis(JOB_SINGLETON_POLL_INTERVAL_MS));
323            let file = OpenOptions::new()
324                .read(true)
325                .write(true)
326                .create(true)
327                .truncate(false)
328                .open(&path)?;
329            if file.try_lock_exclusive().is_ok() {
330                return Ok(file);
331            }
332            if Instant::now() >= deadline {
333                return Err(AppError::JobSingletonLocked {
334                    job_type: job_type.tag().to_string(),
335                    namespace: namespace.to_string(),
336                });
337            }
338        }
339    }
340    Ok(file)
341}
342
343/// G45: returns the lock file path for the embedding singleton
344/// of a `(namespace, db_hash)` pair. Layout:
345/// `embed-singleton-{namespace_slug}-{db_hash}.lock` in the same
346/// cache directory as the other singletons. The namespace is sanitised
347/// to a filesystem-safe slug the same way as [`job_singleton_path`].
348fn embedding_singleton_path(namespace: &str, db_hash: &str) -> Result<PathBuf, AppError> {
349    let cache = cache_dir()?;
350    std::fs::create_dir_all(&cache)?;
351    let slug = if namespace.is_empty() {
352        "default".to_string()
353    } else {
354        namespace
355            .chars()
356            .map(|c| {
357                if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
358                    c.to_ascii_lowercase()
359                } else {
360                    '-'
361                }
362            })
363            .collect::<String>()
364    };
365    let safe_hash: String = db_hash
366        .chars()
367        .filter(|c| c.is_ascii_alphanumeric())
368        .take(16)
369        .collect();
370    Ok(cache.join(format!("embed-singleton-{slug}-{safe_hash}.lock")))
371}
372
373/// G45: acquires a cross-process singleton lock for LLM embedding
374/// operations against a given `(namespace, db)` pair.
375///
376/// The lock is opened and held with `flock` (same mechanism as
377/// [`acquire_job_singleton`]). Two CLI invocations writing to the same
378/// database while both are calling the LLM on entity names will now
379/// serialise: the second one receives [`AppError::EmbeddingSingletonLocked`]
380/// (exit 75) instead of double-spawning `claude -p` / `codex exec`
381/// subprocesses.
382///
383/// Behaviour:
384/// - `wait_seconds = Some(0)` or `None` → fail immediately if held.
385/// - `wait_seconds = Some(n) > 0` → poll every
386///   [`JOB_SINGLETON_POLL_INTERVAL_MS`] ms until the lock drops or the
387///   deadline expires.
388/// - `force = true` → remove a stale lock file before acquiring
389///   (operator escape hatch, same contract as `acquire_job_singleton`).
390///
391/// The returned [`File`] MUST be kept alive for the duration of the
392/// embedding work; dropping it releases the singleton for the next
393/// process.
394pub fn acquire_embedding_singleton(
395    namespace: &str,
396    db_path: &Path,
397    wait_seconds: Option<u64>,
398    force: bool,
399) -> Result<File, AppError> {
400    let db_hash = db_path_hash(db_path);
401    let path = embedding_singleton_path(namespace, &db_hash)?;
402
403    if force && path.exists() {
404        tracing::warn!(target: "lock.g45",
405            path = %path.display(),
406            "force=true; removing pre-existing embedding singleton lock file"
407        );
408        let _ = std::fs::remove_file(&path);
409    }
410
411    let file = OpenOptions::new()
412        .read(true)
413        .write(true)
414        .create(true)
415        .truncate(false)
416        .open(&path)?;
417    if let Err(e) = file.try_lock_exclusive() {
418        if !is_lock_contended(&e) {
419            return Err(AppError::Io(e));
420        }
421        let wait_secs = wait_seconds.unwrap_or(0);
422        if wait_secs == 0 {
423            return Err(AppError::EmbeddingSingletonLocked {
424                namespace: namespace.to_string(),
425            });
426        }
427        let deadline = Instant::now() + Duration::from_secs(wait_secs);
428        drop(file);
429        loop {
430            thread::sleep(Duration::from_millis(JOB_SINGLETON_POLL_INTERVAL_MS));
431            let file = OpenOptions::new()
432                .read(true)
433                .write(true)
434                .create(true)
435                .truncate(false)
436                .open(&path)?;
437            if file.try_lock_exclusive().is_ok() {
438                return Ok(file);
439            }
440            if Instant::now() >= deadline {
441                return Err(AppError::EmbeddingSingletonLocked {
442                    namespace: namespace.to_string(),
443                });
444            }
445        }
446    }
447    Ok(file)
448}
449
450/// Tries to acquire any free slot in `1..=max`, returning the first available one.
451///
452/// Returns `Ok(Some((file, slot)))` if a slot was obtained, `Ok(None)` if all are
453/// occupied (`EWOULDBLOCK`). Propagates I/O errors other than "lock contended".
454fn try_any_slot(max: usize) -> Result<Option<(File, usize)>, AppError> {
455    for slot in 1..=max {
456        match try_acquire_slot(slot) {
457            Ok(file) => return Ok(Some((file, slot))),
458            Err(AppError::Io(e)) if is_lock_contended(&e) => continue,
459            Err(e) => return Err(e),
460        }
461    }
462    Ok(None)
463}
464
/// Reports whether `error` means "the lock is held by someone else" rather
/// than a genuine I/O failure.
///
/// `ErrorKind::WouldBlock` is the portable signal from a non-blocking lock
/// attempt. On Windows, raw OS codes 32 (`ERROR_SHARING_VIOLATION`) and 33
/// (`ERROR_LOCK_VIOLATION`) are also treated as contention.
fn is_lock_contended(error: &std::io::Error) -> bool {
    let would_block = error.kind() == std::io::ErrorKind::WouldBlock;

    #[cfg(windows)]
    let os_reports_contention = matches!(error.raw_os_error(), Some(32 | 33));
    #[cfg(not(windows))]
    let os_reports_contention = false;

    would_block || os_reports_contention
}
480
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    static SEQ: AtomicUsize = AtomicUsize::new(0);

    /// Returns a namespace unique to this test process and call, so tests
    /// running in parallel never contend for the same lock file.
    fn unique_ns() -> String {
        let n = SEQ.fetch_add(1, Ordering::SeqCst);
        let pid = std::process::id();
        format!("test-{pid}-{n}")
    }

    /// Best-effort deletion of a lock file a test created, called after its
    /// handle has been dropped.
    ///
    /// Lock names embed the PID, so without this every run would leave a fresh
    /// set of files behind in the shared cache directory. Failures are
    /// ignored.
    fn remove_lock_file(path: Result<PathBuf, AppError>) {
        if let Ok(path) = path {
            let _ = std::fs::remove_file(path);
        }
    }

    /// Removes the job-singleton lock file for `(job_type, ns, db)`.
    fn cleanup_job_lock(job_type: JobType, ns: &str, db: &Path) {
        remove_lock_file(job_singleton_path(job_type, ns, &db_path_hash(db)));
    }

    /// Removes the embedding-singleton lock file for `(ns, db)`.
    fn cleanup_embed_lock(ns: &str, db: &Path) {
        remove_lock_file(embedding_singleton_path(ns, &db_path_hash(db)));
    }

    #[test]
    fn job_type_tags_are_kebab_case() {
        assert_eq!(JobType::Enrich.tag(), "enrich");
        assert_eq!(JobType::IngestClaudeCode.tag(), "ingest-claude-code");
        assert_eq!(JobType::IngestCodex.tag(), "ingest-codex");
    }

    #[test]
    fn would_block_is_treated_as_lock_contention() {
        let busy = std::io::Error::from(std::io::ErrorKind::WouldBlock);
        assert!(is_lock_contended(&busy));
        let denied = std::io::Error::from(std::io::ErrorKind::PermissionDenied);
        assert!(!is_lock_contended(&denied));
    }

    #[test]
    fn job_singleton_path_sanitises_namespace() {
        let p = job_singleton_path(JobType::Enrich, "Foo Bar/Baz", "abc123def456")
            .expect("path should resolve");
        let name = p.file_name().unwrap().to_string_lossy().to_string();
        assert!(name.contains("enrich"), "got {name}");
        assert!(name.contains("foo-bar-baz"), "got {name}");
        assert!(
            name.contains("abc123def456"),
            "must embed db_hash: got {name}"
        );
    }

    #[test]
    fn job_singleton_blocks_second_invocation_same_namespace() {
        let ns = unique_ns();
        let db = std::env::temp_dir().join(format!("test-{}.sqlite", unique_ns()));
        let first = acquire_job_singleton(JobType::Enrich, &ns, &db, Some(0), false)
            .expect("first acquire should succeed");
        let second = acquire_job_singleton(JobType::Enrich, &ns, &db, Some(0), false);
        assert!(
            matches!(second, Err(AppError::JobSingletonLocked { .. })),
            "expected JobSingletonLocked, got {second:?}"
        );
        drop(first);
        cleanup_job_lock(JobType::Enrich, &ns, &db);
    }

    #[test]
    fn job_singleton_is_released_when_handle_is_dropped() {
        let ns = unique_ns();
        let db = std::env::temp_dir().join(format!("test-release-{}.sqlite", unique_ns()));
        let first = acquire_job_singleton(JobType::IngestCodex, &ns, &db, Some(0), false)
            .expect("first acquire should succeed");
        drop(first);
        let again = acquire_job_singleton(JobType::IngestCodex, &ns, &db, Some(0), false)
            .expect("dropping the handle must release the singleton");
        drop(again);
        cleanup_job_lock(JobType::IngestCodex, &ns, &db);
    }

    #[test]
    fn job_singleton_allows_different_namespaces() {
        let ns_a = unique_ns();
        let ns_b = unique_ns();
        let db_a = std::env::temp_dir().join(format!("test-a-{}.sqlite", unique_ns()));
        let db_b = std::env::temp_dir().join(format!("test-b-{}.sqlite", unique_ns()));
        let first = acquire_job_singleton(JobType::IngestClaudeCode, &ns_a, &db_a, Some(0), false)
            .expect("ns_a should acquire");
        let second = acquire_job_singleton(JobType::IngestClaudeCode, &ns_b, &db_b, Some(0), false)
            .expect("ns_b should acquire in parallel");
        drop(first);
        drop(second);
        cleanup_job_lock(JobType::IngestClaudeCode, &ns_a, &db_a);
        cleanup_job_lock(JobType::IngestClaudeCode, &ns_b, &db_b);
    }

    #[test]
    fn job_singleton_scoped_by_db_hash() {
        // G30: two databases, same namespace. Both locks should succeed
        // because the db_hash differs.
        let ns = unique_ns();
        let db_a = std::env::temp_dir().join(format!("test-x-{}.sqlite", unique_ns()));
        let db_b = std::env::temp_dir().join(format!("test-y-{}.sqlite", unique_ns()));
        let first = acquire_job_singleton(JobType::Enrich, &ns, &db_a, Some(0), false)
            .expect("db_a should acquire");
        let second = acquire_job_singleton(JobType::Enrich, &ns, &db_b, Some(0), false)
            .expect("db_b should acquire independently (G30 fix)");
        drop(first);
        drop(second);
        cleanup_job_lock(JobType::Enrich, &ns, &db_a);
        cleanup_job_lock(JobType::Enrich, &ns, &db_b);
    }

    #[test]
    fn db_path_hash_is_stable_for_same_path() {
        let p = std::env::temp_dir().join("hashing-test.sqlite");
        let h1 = db_path_hash(&p);
        let h2 = db_path_hash(&p);
        assert_eq!(h1, h2, "same path must produce same hash");
        assert_eq!(h1.len(), 12, "BLAKE3 prefix must be 12 hex chars");
    }

    #[test]
    fn db_path_hash_differs_for_different_paths() {
        let a = std::env::temp_dir().join("hash-a.sqlite");
        let b = std::env::temp_dir().join("hash-b.sqlite");
        assert_ne!(db_path_hash(&a), db_path_hash(&b));
    }

    // G45: embedding singleton — cross-process coordination
    #[test]
    fn g45_embedding_singleton_blocks_second_invocation_same_db() {
        let ns = unique_ns();
        let db = std::env::temp_dir().join(format!("g45-{}.sqlite", unique_ns()));
        let first = acquire_embedding_singleton(&ns, &db, Some(0), false)
            .expect("first acquire should succeed");
        let second = acquire_embedding_singleton(&ns, &db, Some(0), false);
        assert!(
            matches!(second, Err(AppError::EmbeddingSingletonLocked { .. })),
            "expected EmbeddingSingletonLocked, got {second:?}"
        );
        drop(first);
        cleanup_embed_lock(&ns, &db);
    }

    #[test]
    fn g45_embedding_singleton_allows_different_namespaces() {
        let ns_a = unique_ns();
        let ns_b = unique_ns();
        let db = std::env::temp_dir().join(format!("g45-multi-{}.sqlite", unique_ns()));
        let first =
            acquire_embedding_singleton(&ns_a, &db, Some(0), false).expect("ns_a should acquire");
        let second = acquire_embedding_singleton(&ns_b, &db, Some(0), false)
            .expect("ns_b should acquire in parallel (different namespace)");
        drop(first);
        drop(second);
        cleanup_embed_lock(&ns_a, &db);
        cleanup_embed_lock(&ns_b, &db);
    }

    #[test]
    fn g45_embedding_singleton_scoped_by_db_hash() {
        // Same namespace, different databases → independent locks.
        let ns = unique_ns();
        let db_a = std::env::temp_dir().join(format!("g45-x-{}.sqlite", unique_ns()));
        let db_b = std::env::temp_dir().join(format!("g45-y-{}.sqlite", unique_ns()));
        let first =
            acquire_embedding_singleton(&ns, &db_a, Some(0), false).expect("db_a should acquire");
        let second = acquire_embedding_singleton(&ns, &db_b, Some(0), false)
            .expect("db_b should acquire independently (G45 db_hash scope)");
        drop(first);
        drop(second);
        cleanup_embed_lock(&ns, &db_a);
        cleanup_embed_lock(&ns, &db_b);
    }
}