Skip to main content

sqlite_graphrag/
lock.rs

1//! Counting semaphore via lock files to limit parallel CLI invocations.
2//!
3//! `acquire_cli_slot` tries to acquire one of `N` available slots by opening the file
4//! `cli-slot-{N}.lock` in the OS cache directory and obtaining an exclusive `flock`.
5//! The returned [`std::fs::File`] MUST be kept alive for the entire duration of `main`;
6//! dropping it releases the slot automatically for the next invocation.
7//!
//! When `wait_seconds` is `Some(n)` with `n > 0`, the function polls with a progressive
//! backoff that starts at [`crate::constants::CLI_LOCK_POLL_INTERVAL_MS`] milliseconds and is
//! capped at four times that interval, until the deadline. When it is `None` or `Some(0)`, a
//! single attempt is made and `Err(AppError::AllSlotsFull)` is returned immediately if all
//! slots are occupied.
12//!
13//! ## Job-type singleton (G28-B, v1.0.68)
14//!
15//! Heavy long-running jobs (`enrich`, `ingest --mode claude-code`,
16//! `ingest --mode codex`) also acquire a *singleton* lock per `(job_type,
17//! namespace)` via `acquire_job_singleton`.  This guarantees at most one
18//! heavy job per namespace runs at any time, which was the root cause
19//! of the 2026-06-03 process-proliferation incident (4 parallel `enrich`
20//! instances × N workers × 10 MCP servers = ~192 spawned processes).
// Workload: I/O-bound (flock polling with capped progressive backoff sleep)
22
23use std::fs::{File, OpenOptions};
24use std::path::{Path, PathBuf};
25use std::thread;
26use std::time::{Duration, Instant};
27
28use directories::ProjectDirs;
29use fs4::fs_std::FileExt;
30
31use crate::constants::{
32    CLI_LOCK_POLL_INTERVAL_MS, JOB_SINGLETON_POLL_INTERVAL_MS, MAX_CONCURRENT_CLI_INSTANCES,
33};
34use crate::errors::AppError;
35
/// Kinds of heavy job that `acquire_job_singleton` can serialise.
///
/// There is deliberately no `Light` variant: lightweight commands
/// (`recall`, `stats`, `read`, `list`) are throttled by the counting
/// semaphore in [`acquire_cli_slot`] and never take a singleton lock.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JobType {
    /// The `enrich` command (LLM-driven entity/relation/body enrichment).
    Enrich,
    /// The `ingest --mode claude-code` command (LLM-curated ingestion).
    IngestClaudeCode,
    /// The `ingest --mode codex` command (OpenAI Codex CLI ingestion).
    IngestCodex,
}
50
51impl JobType {
52    /// Returns the kebab-case tag used inside the lock file name.
53    fn tag(self) -> &'static str {
54        match self {
55            JobType::Enrich => "enrich",
56            JobType::IngestClaudeCode => "ingest-claude-code",
57            JobType::IngestCodex => "ingest-codex",
58        }
59    }
60}
61
62/// Returns the lock file path for the given slot.
63///
64/// Honours `SQLITE_GRAPHRAG_CACHE_DIR` when set (useful for tests, containers,
65/// and NFS caches), falling back to the OS default cache directory via
66/// `directories::ProjectDirs`. The slot must be 1-based.
67fn slot_path(slot: usize) -> Result<PathBuf, AppError> {
68    let cache = cache_dir()?;
69    std::fs::create_dir_all(&cache)?;
70    Ok(cache.join(format!("cli-slot-{slot}.lock")))
71}
72
73/// Resolves the lock-file directory honouring `SQLITE_GRAPHRAG_CACHE_DIR`.
74fn cache_dir() -> Result<PathBuf, AppError> {
75    if let Some(override_dir) = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR") {
76        Ok(PathBuf::from(override_dir))
77    } else {
78        let dirs = ProjectDirs::from("", "", "sqlite-graphrag").ok_or_else(|| {
79            AppError::Io(std::io::Error::new(
80                std::io::ErrorKind::NotFound,
81                "could not determine cache directory for sqlite-graphrag lock files",
82            ))
83        })?;
84        Ok(dirs.cache_dir().to_path_buf())
85    }
86}
87
88/// Computes a short, filesystem-safe hash of the database path so two distinct
89/// databases (e.g. `/tmp/a.sqlite` and `/tmp/b.sqlite`) get distinct lock
90/// files in the shared cache directory. First 12 hex chars of BLAKE3 are
91/// sufficient for collision avoidance across the local filesystem.
92pub fn db_path_hash(db_path: &Path) -> String {
93    let canonical = db_path
94        .canonicalize()
95        .unwrap_or_else(|_| db_path.to_path_buf());
96    let hash = blake3::hash(canonical.to_string_lossy().as_bytes());
97    hash.to_hex().to_string()[..12].to_string()
98}
99
100/// Returns the singleton lock file path for a given (job_type, namespace, db_hash).
101///
102/// Layout: `job-singleton-{tag}-{namespace_slug}-{db_hash}.lock` in the same
103/// cache dir as the CLI slots. The namespace is sanitised to a filesystem-safe
104/// slug (lowercase, hyphens, alphanumeric) and defaults to `default` when
105/// empty. The `db_hash` is the BLAKE3 prefix returned by [`db_path_hash`].
106///
107/// G30 (v1.0.69): the previous implementation ignored the database path
108/// entirely, so two concurrent `enrich` invocations against different
109/// `graphrag.sqlite` files (production vs. test) collided on the same
110/// cache-dir lock. The db_hash scope makes the singleton per-database while
111/// still sharing the same cache dir.
112pub fn job_singleton_path(
113    job_type: JobType,
114    namespace: &str,
115    db_hash: &str,
116) -> Result<PathBuf, AppError> {
117    let cache = cache_dir()?;
118    std::fs::create_dir_all(&cache)?;
119    let slug = if namespace.is_empty() {
120        "default".to_string()
121    } else {
122        namespace
123            .chars()
124            .map(|c| {
125                if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
126                    c.to_ascii_lowercase()
127                } else {
128                    '-'
129                }
130            })
131            .collect::<String>()
132    };
133    let safe_hash: String = db_hash
134        .chars()
135        .filter(|c| c.is_ascii_alphanumeric())
136        .take(16)
137        .collect();
138    Ok(cache.join(format!(
139        "job-singleton-{}-{slug}-{safe_hash}.lock",
140        job_type.tag()
141    )))
142}
143
144/// Tries to open and exclusively lock the lock file for the given slot.
145///
146/// Returns `Ok(file)` if the slot is free, or `Err(io::Error)` if it is
147/// held by another instance (non-blocking).
148fn try_acquire_slot(slot: usize) -> Result<File, AppError> {
149    let path = slot_path(slot)?;
150    let file = OpenOptions::new()
151        .read(true)
152        .write(true)
153        .create(true)
154        .truncate(false)
155        .open(&path)?;
156    file.try_lock_exclusive().map_err(AppError::Io)?;
157    Ok(file)
158}
159
160/// Acquires a concurrency slot from the `max_concurrency`-position semaphore.
161///
162/// Iterates slots `1..=max_concurrency` attempting `try_lock_exclusive` on each
163/// `cli-slot-N.lock` file. When a free slot is found, returns `(File, slot_number)`.
164/// If all slots are occupied:
165///
166/// - If `wait_seconds` is `None` or `Some(0)`, returns immediately with
167///   `AppError::AllSlotsFull { max, waited_secs: 0 }`.
168/// - If `wait_seconds` is `Some(n) > 0`, enters a polling loop every
169///   [`crate::constants::CLI_LOCK_POLL_INTERVAL_MS`] ms until the deadline expires, returning
170///   `AppError::AllSlotsFull { max, waited_secs: n }` if no slot opens.
171///
172/// The returned `File` MUST be kept alive until the process exits; dropping it
173/// releases the slot automatically via the implicit `flock` on close.
174pub fn acquire_cli_slot(
175    max_concurrency: usize,
176    wait_seconds: Option<u64>,
177) -> Result<(File, usize), AppError> {
178    // G18: use env override or 2*cpus as ceiling instead of hardcoded 4
179    let ncpus = std::thread::available_parallelism()
180        .map(|n| n.get())
181        .unwrap_or(4);
182    let ceiling = std::env::var("SQLITE_GRAPHRAG_MAX_CLI_INSTANCES")
183        .ok()
184        .and_then(|v| v.parse::<usize>().ok())
185        .unwrap_or_else(|| (2 * ncpus).max(MAX_CONCURRENT_CLI_INSTANCES));
186    let max = max_concurrency.clamp(1, ceiling);
187    let wait_secs = wait_seconds.unwrap_or(0);
188
189    // Tentativa inicial sem espera.
190    if let Some((file, slot)) = try_any_slot(max)? {
191        return Ok((file, slot));
192    }
193
194    if wait_secs == 0 {
195        return Err(AppError::AllSlotsFull {
196            max,
197            waited_secs: 0,
198        });
199    }
200
201    // Polling loop with progressive backoff until the deadline.
202    let deadline = Instant::now() + Duration::from_secs(wait_secs);
203    let mut polls: u64 = 0;
204    loop {
205        let poll_delay = CLI_LOCK_POLL_INTERVAL_MS
206            .saturating_mul(1 + polls / 4)
207            .min(CLI_LOCK_POLL_INTERVAL_MS * 4);
208        thread::sleep(Duration::from_millis(poll_delay));
209        polls += 1;
210        if let Some((file, slot)) = try_any_slot(max)? {
211            return Ok((file, slot));
212        }
213        if Instant::now() >= deadline {
214            return Err(AppError::AllSlotsFull {
215                max,
216                waited_secs: wait_secs,
217            });
218        }
219    }
220}
221
222/// Acquires a process-wide singleton lock for a heavy job type and namespace.
223///
224/// G28-B (v1.0.68): ensures at most one `enrich`, `ingest --mode
225/// claude-code`, or `ingest --mode codex` runs at a time per namespace.
226/// A second invocation in the same namespace either:
227///
228/// - Returns immediately with `AppError::JobSingletonLocked { job_type,
229///   namespace }` when `wait_seconds` is `None` or `Some(0)`.
230/// - Polls every [`JOB_SINGLETON_POLL_INTERVAL_MS`] ms until the lock
231///   drops or the deadline expires, returning the same error on timeout.
232///
233/// The returned `File` MUST be kept alive until the process exits;
234/// dropping it releases the singleton for the next invocation.
235pub fn acquire_job_singleton(
236    job_type: JobType,
237    namespace: &str,
238    db_path: &Path,
239    wait_seconds: Option<u64>,
240    force: bool,
241) -> Result<File, AppError> {
242    let db_hash = db_path_hash(db_path);
243    let path = job_singleton_path(job_type, namespace, &db_hash)?;
244
245    // G30+G09: when --force is set, attempt to break a stale lock by
246    // detecting and removing a pre-existing lock file. This is a last
247    // resort: only enabled by an explicit operator flag. A real orphan
248    // lock from a previous crash leaves a 0-byte file behind, which the
249    // next non-forced caller would still try to lock.
250    if force && path.exists() {
251        tracing::warn!(target: "lock",
252            path = %path.display(),
253            "force=true; removing pre-existing singleton lock file"
254        );
255        let _ = std::fs::remove_file(&path);
256    }
257
258    let file = OpenOptions::new()
259        .read(true)
260        .write(true)
261        .create(true)
262        .truncate(false)
263        .open(&path)?;
264    if let Err(e) = file.try_lock_exclusive() {
265        if !is_lock_contended(&e) {
266            return Err(AppError::Io(e));
267        }
268        // Already held by another instance.
269        let wait_secs = wait_seconds.unwrap_or(0);
270        if wait_secs == 0 {
271            return Err(AppError::JobSingletonLocked {
272                job_type: job_type.tag().to_string(),
273                namespace: namespace.to_string(),
274            });
275        }
276        let deadline = Instant::now() + Duration::from_secs(wait_secs);
277        // Drop the failed handle before polling; flock is per-process so we
278        // re-open each attempt to refresh contention state.
279        drop(file);
280        loop {
281            thread::sleep(Duration::from_millis(JOB_SINGLETON_POLL_INTERVAL_MS));
282            let file = OpenOptions::new()
283                .read(true)
284                .write(true)
285                .create(true)
286                .truncate(false)
287                .open(&path)?;
288            if file.try_lock_exclusive().is_ok() {
289                return Ok(file);
290            }
291            if Instant::now() >= deadline {
292                return Err(AppError::JobSingletonLocked {
293                    job_type: job_type.tag().to_string(),
294                    namespace: namespace.to_string(),
295                });
296            }
297        }
298    }
299    Ok(file)
300}
301
302/// Tries to acquire any free slot in `1..=max`, returning the first available one.
303///
304/// Returns `Ok(Some((file, slot)))` if a slot was obtained, `Ok(None)` if all are
305/// occupied (`EWOULDBLOCK`). Propagates I/O errors other than "lock contended".
306fn try_any_slot(max: usize) -> Result<Option<(File, usize)>, AppError> {
307    for slot in 1..=max {
308        match try_acquire_slot(slot) {
309            Ok(file) => return Ok(Some((file, slot))),
310            Err(AppError::Io(e)) if is_lock_contended(&e) => continue,
311            Err(e) => return Err(e),
312        }
313    }
314    Ok(None)
315}
316
/// Reports whether `error` means "the lock is currently held elsewhere".
///
/// True for `ErrorKind::WouldBlock` on every platform. On Windows builds it is
/// also true for raw OS error codes 32 and 33 (presumably
/// `ERROR_SHARING_VIOLATION` and `ERROR_LOCK_VIOLATION` — verify against the
/// Windows error-code table).
fn is_lock_contended(error: &std::io::Error) -> bool {
    let would_block = matches!(error.kind(), std::io::ErrorKind::WouldBlock);
    // `cfg!(windows)` is a compile-time constant, so the raw-code check is
    // effectively compiled out on other platforms.
    let windows_violation = cfg!(windows) && matches!(error.raw_os_error(), Some(32 | 33));
    would_block || windows_violation
}
332
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Per-process counter that keeps generated namespaces distinct.
    static SEQ: AtomicUsize = AtomicUsize::new(0);

    /// Returns a namespace unique to this process and call, so tests running
    /// in parallel (or in separate test processes) never share a lock file.
    fn unique_ns() -> String {
        let n = SEQ.fetch_add(1, Ordering::SeqCst);
        let pid = std::process::id();
        format!("test-{pid}-{n}")
    }

    #[test]
    fn job_singleton_path_sanitises_namespace() {
        let lock_path = job_singleton_path(JobType::Enrich, "Foo Bar/Baz", "abc123def456")
            .expect("path should resolve");
        let name = lock_path.file_name().unwrap().to_string_lossy().into_owned();
        for expected in ["enrich", "foo-bar-baz"] {
            assert!(name.contains(expected), "got {name}");
        }
        assert!(
            name.contains("abc123def456"),
            "must embed db_hash: got {name}"
        );
    }

    #[test]
    fn job_singleton_blocks_second_invocation_same_namespace() {
        let ns = unique_ns();
        let db = std::env::temp_dir().join(format!("test-{}.sqlite", unique_ns()));
        let holder = acquire_job_singleton(JobType::Enrich, &ns, &db, Some(0), false)
            .expect("first acquire should succeed");
        // While `holder` is alive, the same (job, namespace, db) must be refused.
        let second = acquire_job_singleton(JobType::Enrich, &ns, &db, Some(0), false);
        assert!(
            matches!(second, Err(AppError::JobSingletonLocked { .. })),
            "expected JobSingletonLocked, got {second:?}"
        );
        drop(holder);
    }

    #[test]
    fn job_singleton_allows_different_namespaces() {
        let ns_a = unique_ns();
        let ns_b = unique_ns();
        let tmp = std::env::temp_dir();
        let db_a = tmp.join(format!("test-a-{}.sqlite", unique_ns()));
        let db_b = tmp.join(format!("test-b-{}.sqlite", unique_ns()));
        let job = JobType::IngestClaudeCode;
        let first =
            acquire_job_singleton(job, &ns_a, &db_a, Some(0), false).expect("ns_a should acquire");
        let second = acquire_job_singleton(job, &ns_b, &db_b, Some(0), false)
            .expect("ns_b should acquire in parallel");
        drop(first);
        drop(second);
    }

    #[test]
    fn job_singleton_scoped_by_db_hash() {
        // G30: two databases at different paths, same namespace. Both locks
        // should succeed because the db_hash (derived from the path) differs.
        let ns = unique_ns();
        let tmp = std::env::temp_dir();
        let db_a = tmp.join(format!("test-x-{}.sqlite", unique_ns()));
        let db_b = tmp.join(format!("test-y-{}.sqlite", unique_ns()));
        let first = acquire_job_singleton(JobType::Enrich, &ns, &db_a, Some(0), false)
            .expect("db_a should acquire");
        let second = acquire_job_singleton(JobType::Enrich, &ns, &db_b, Some(0), false)
            .expect("db_b should acquire independently (G30 fix)");
        drop(first);
        drop(second);
    }

    #[test]
    fn db_path_hash_is_stable_for_same_path() {
        let db = std::env::temp_dir().join("hashing-test.sqlite");
        let (h1, h2) = (db_path_hash(&db), db_path_hash(&db));
        assert_eq!(h1, h2, "same path must produce same hash");
        assert_eq!(h1.len(), 12, "BLAKE3 prefix must be 12 hex chars");
    }

    #[test]
    fn db_path_hash_differs_for_different_paths() {
        let tmp = std::env::temp_dir();
        let hash_a = db_path_hash(&tmp.join("hash-a.sqlite"));
        let hash_b = db_path_hash(&tmp.join("hash-b.sqlite"));
        assert_ne!(hash_a, hash_b);
    }
}