Skip to main content

sqlite_graphrag/
lock.rs

1//! Counting semaphore via lock files to limit parallel CLI invocations.
2//!
//! `acquire_cli_slot` tries to acquire one of `N` available slots by opening the file
//! `cli-slot-{i}.lock` (for some `i` in `1..=N`) in the OS cache directory (or in
//! `SQLITE_GRAPHRAG_CACHE_DIR` when set) and obtaining an exclusive `flock`.
5//! The returned [`std::fs::File`] MUST be kept alive for the entire duration of `main`;
6//! dropping it releases the slot automatically for the next invocation.
7//!
//! When `wait_seconds` is `Some(n)` with `n > 0`, the function keeps retrying until the
//! deadline, sleeping [`crate::constants::CLI_LOCK_POLL_INTERVAL_MS`] milliseconds between
//! the first attempts and backing off linearly up to four times that interval. When it
//! is `None` or `Some(0)`, a single attempt is made and `Err(AppError::AllSlotsFull)` is
//! returned immediately if all slots are occupied.
12//!
13//! ## Job-type singleton (G28-B, v1.0.68)
14//!
15//! Heavy long-running jobs (`enrich`, `ingest --mode claude-code`,
16//! `ingest --mode codex`) also acquire a *singleton* lock per `(job_type,
//! namespace)` via `acquire_job_singleton`. This guarantees that at most one
//! heavy job per namespace runs at any time; the absence of that guarantee was
//! the root cause of the 2026-06-03 process-proliferation incident (4 parallel
//! `enrich` instances × N workers × 10 MCP servers = ~192 spawned processes).
// Workload: I/O-bound (flock polling with sleeps: linear, capped backoff for CLI slots;
// fixed interval for job singletons)
22
23use std::fs::{File, OpenOptions};
24use std::path::PathBuf;
25use std::thread;
26use std::time::{Duration, Instant};
27
28use directories::ProjectDirs;
29use fs4::fs_std::FileExt;
30
31use crate::constants::{
32    CLI_LOCK_POLL_INTERVAL_MS, JOB_SINGLETON_POLL_INTERVAL_MS, MAX_CONCURRENT_CLI_INSTANCES,
33};
34use crate::errors::AppError;
35
/// Kind of heavy, long-running job guarded by [`acquire_job_singleton`].
///
/// There is deliberately no `Light` variant: lightweight commands (`recall`,
/// `stats`, `read`, `list`) only go through the counting semaphore in
/// [`acquire_cli_slot`] and never take a singleton lock.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobType {
    /// The `enrich` command (LLM-driven entity/relation/body enrichment).
    Enrich,
    /// `ingest --mode claude-code` (LLM-curated ingestion).
    IngestClaudeCode,
    /// `ingest --mode codex` (OpenAI Codex CLI ingestion).
    IngestCodex,
}

impl JobType {
    /// Kebab-case identifier embedded in the singleton lock file name.
    fn tag(self) -> &'static str {
        match self {
            Self::IngestCodex => "ingest-codex",
            Self::IngestClaudeCode => "ingest-claude-code",
            Self::Enrich => "enrich",
        }
    }
}
61
62/// Returns the lock file path for the given slot.
63///
64/// Honours `SQLITE_GRAPHRAG_CACHE_DIR` when set (useful for tests, containers,
65/// and NFS caches), falling back to the OS default cache directory via
66/// `directories::ProjectDirs`. The slot must be 1-based.
67fn slot_path(slot: usize) -> Result<PathBuf, AppError> {
68    let cache = cache_dir()?;
69    std::fs::create_dir_all(&cache)?;
70    Ok(cache.join(format!("cli-slot-{slot}.lock")))
71}
72
73/// Resolves the lock-file directory honouring `SQLITE_GRAPHRAG_CACHE_DIR`.
74fn cache_dir() -> Result<PathBuf, AppError> {
75    if let Some(override_dir) = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR") {
76        Ok(PathBuf::from(override_dir))
77    } else {
78        let dirs = ProjectDirs::from("", "", "sqlite-graphrag").ok_or_else(|| {
79            AppError::Io(std::io::Error::new(
80                std::io::ErrorKind::NotFound,
81                "could not determine cache directory for sqlite-graphrag lock files",
82            ))
83        })?;
84        Ok(dirs.cache_dir().to_path_buf())
85    }
86}
87
88/// Returns the singleton lock file path for a given (job_type, namespace).
89///
90/// Layout: `job-singleton-{tag}-{namespace}.lock` in the same cache dir as
91/// the CLI slots.  The namespace is sanitised to a filesystem-safe slug
92/// (lowercase, hyphens, alphanumeric) and defaults to `default` when empty.
93fn job_singleton_path(job_type: JobType, namespace: &str) -> Result<PathBuf, AppError> {
94    let cache = cache_dir()?;
95    std::fs::create_dir_all(&cache)?;
96    let slug = if namespace.is_empty() {
97        "default".to_string()
98    } else {
99        namespace
100            .chars()
101            .map(|c| {
102                if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
103                    c.to_ascii_lowercase()
104                } else {
105                    '-'
106                }
107            })
108            .collect::<String>()
109    };
110    Ok(cache.join(format!("job-singleton-{}-{slug}.lock", job_type.tag())))
111}
112
113/// Tries to open and exclusively lock the lock file for the given slot.
114///
115/// Returns `Ok(file)` if the slot is free, or `Err(io::Error)` if it is
116/// held by another instance (non-blocking).
117fn try_acquire_slot(slot: usize) -> Result<File, AppError> {
118    let path = slot_path(slot)?;
119    let file = OpenOptions::new()
120        .read(true)
121        .write(true)
122        .create(true)
123        .truncate(false)
124        .open(&path)?;
125    file.try_lock_exclusive().map_err(AppError::Io)?;
126    Ok(file)
127}
128
129/// Acquires a concurrency slot from the `max_concurrency`-position semaphore.
130///
131/// Iterates slots `1..=max_concurrency` attempting `try_lock_exclusive` on each
132/// `cli-slot-N.lock` file. When a free slot is found, returns `(File, slot_number)`.
133/// If all slots are occupied:
134///
135/// - If `wait_seconds` is `None` or `Some(0)`, returns immediately with
136///   `AppError::AllSlotsFull { max, waited_secs: 0 }`.
137/// - If `wait_seconds` is `Some(n) > 0`, enters a polling loop every
138///   [`crate::constants::CLI_LOCK_POLL_INTERVAL_MS`] ms until the deadline expires, returning
139///   `AppError::AllSlotsFull { max, waited_secs: n }` if no slot opens.
140///
141/// The returned `File` MUST be kept alive until the process exits; dropping it
142/// releases the slot automatically via the implicit `flock` on close.
143pub fn acquire_cli_slot(
144    max_concurrency: usize,
145    wait_seconds: Option<u64>,
146) -> Result<(File, usize), AppError> {
147    // G18: use env override or 2*cpus as ceiling instead of hardcoded 4
148    let ncpus = std::thread::available_parallelism()
149        .map(|n| n.get())
150        .unwrap_or(4);
151    let ceiling = std::env::var("SQLITE_GRAPHRAG_MAX_CLI_INSTANCES")
152        .ok()
153        .and_then(|v| v.parse::<usize>().ok())
154        .unwrap_or_else(|| (2 * ncpus).max(MAX_CONCURRENT_CLI_INSTANCES));
155    let max = max_concurrency.clamp(1, ceiling);
156    let wait_secs = wait_seconds.unwrap_or(0);
157
158    // Tentativa inicial sem espera.
159    if let Some((file, slot)) = try_any_slot(max)? {
160        return Ok((file, slot));
161    }
162
163    if wait_secs == 0 {
164        return Err(AppError::AllSlotsFull {
165            max,
166            waited_secs: 0,
167        });
168    }
169
170    // Polling loop with progressive backoff until the deadline.
171    let deadline = Instant::now() + Duration::from_secs(wait_secs);
172    let mut polls: u64 = 0;
173    loop {
174        let poll_delay = CLI_LOCK_POLL_INTERVAL_MS
175            .saturating_mul(1 + polls / 4)
176            .min(CLI_LOCK_POLL_INTERVAL_MS * 4);
177        thread::sleep(Duration::from_millis(poll_delay));
178        polls += 1;
179        if let Some((file, slot)) = try_any_slot(max)? {
180            return Ok((file, slot));
181        }
182        if Instant::now() >= deadline {
183            return Err(AppError::AllSlotsFull {
184                max,
185                waited_secs: wait_secs,
186            });
187        }
188    }
189}
190
191/// Acquires a process-wide singleton lock for a heavy job type and namespace.
192///
193/// G28-B (v1.0.68): ensures at most one `enrich`, `ingest --mode
194/// claude-code`, or `ingest --mode codex` runs at a time per namespace.
195/// A second invocation in the same namespace either:
196///
197/// - Returns immediately with `AppError::JobSingletonLocked { job_type,
198///   namespace }` when `wait_seconds` is `None` or `Some(0)`.
199/// - Polls every [`JOB_SINGLETON_POLL_INTERVAL_MS`] ms until the lock
200///   drops or the deadline expires, returning the same error on timeout.
201///
202/// The returned `File` MUST be kept alive until the process exits;
203/// dropping it releases the singleton for the next invocation.
204pub fn acquire_job_singleton(
205    job_type: JobType,
206    namespace: &str,
207    wait_seconds: Option<u64>,
208) -> Result<File, AppError> {
209    let path = job_singleton_path(job_type, namespace)?;
210    let file = OpenOptions::new()
211        .read(true)
212        .write(true)
213        .create(true)
214        .truncate(false)
215        .open(&path)?;
216    if let Err(e) = file.try_lock_exclusive() {
217        if !is_lock_contended(&e) {
218            return Err(AppError::Io(e));
219        }
220        // Already held by another instance.
221        let wait_secs = wait_seconds.unwrap_or(0);
222        if wait_secs == 0 {
223            return Err(AppError::JobSingletonLocked {
224                job_type: job_type.tag().to_string(),
225                namespace: namespace.to_string(),
226            });
227        }
228        let deadline = Instant::now() + Duration::from_secs(wait_secs);
229        // Drop the failed handle before polling; flock is per-process so we
230        // re-open each attempt to refresh contention state.
231        drop(file);
232        loop {
233            thread::sleep(Duration::from_millis(JOB_SINGLETON_POLL_INTERVAL_MS));
234            let file = OpenOptions::new()
235                .read(true)
236                .write(true)
237                .create(true)
238                .truncate(false)
239                .open(&path)?;
240            if file.try_lock_exclusive().is_ok() {
241                return Ok(file);
242            }
243            if Instant::now() >= deadline {
244                return Err(AppError::JobSingletonLocked {
245                    job_type: job_type.tag().to_string(),
246                    namespace: namespace.to_string(),
247                });
248            }
249        }
250    }
251    Ok(file)
252}
253
254/// Tries to acquire any free slot in `1..=max`, returning the first available one.
255///
256/// Returns `Ok(Some((file, slot)))` if a slot was obtained, `Ok(None)` if all are
257/// occupied (`EWOULDBLOCK`). Propagates I/O errors other than "lock contended".
258fn try_any_slot(max: usize) -> Result<Option<(File, usize)>, AppError> {
259    for slot in 1..=max {
260        match try_acquire_slot(slot) {
261            Ok(file) => return Ok(Some((file, slot))),
262            Err(AppError::Io(e)) if is_lock_contended(&e) => continue,
263            Err(e) => return Err(e),
264        }
265    }
266    Ok(None)
267}
268
/// Reports whether `error` means "the lock is held by someone else" rather
/// than a genuine I/O failure.
///
/// `ErrorKind::WouldBlock` counts on every platform. On Windows, raw OS codes
/// 32 (`ERROR_SHARING_VIOLATION`) and 33 (`ERROR_LOCK_VIOLATION`) also count.
fn is_lock_contended(error: &std::io::Error) -> bool {
    #[cfg(windows)]
    let windows_violation = matches!(error.raw_os_error(), Some(32 | 33));
    #[cfg(not(windows))]
    let windows_violation = false;

    error.kind() == std::io::ErrorKind::WouldBlock || windows_violation
}
284
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Per-process sequence so each test locks its own namespace even though all
    // tests share the same cache directory and may run in parallel.
    static SEQ: AtomicUsize = AtomicUsize::new(0);

    /// Returns a namespace unique to this test process and call.
    fn unique_ns() -> String {
        let n = SEQ.fetch_add(1, Ordering::SeqCst);
        let pid = std::process::id();
        format!("test-{pid}-{n}")
    }

    /// Returns the file name of the singleton lock for `(job_type, namespace)`.
    fn singleton_file_name(job_type: JobType, namespace: &str) -> String {
        let p = job_singleton_path(job_type, namespace).expect("path should resolve");
        p.file_name()
            .expect("lock path has a file name")
            .to_string_lossy()
            .into_owned()
    }

    #[test]
    fn job_singleton_path_sanitises_namespace() {
        let name = singleton_file_name(JobType::Enrich, "Foo Bar/Baz");
        assert_eq!(name, "job-singleton-enrich-foo-bar-baz.lock");
    }

    #[test]
    fn job_singleton_path_keeps_underscores_and_defaults_empty_namespace() {
        assert_eq!(
            singleton_file_name(JobType::IngestCodex, "my_ns-1"),
            "job-singleton-ingest-codex-my_ns-1.lock"
        );
        assert_eq!(
            singleton_file_name(JobType::IngestClaudeCode, ""),
            "job-singleton-ingest-claude-code-default.lock"
        );
    }

    #[test]
    fn job_singleton_blocks_second_invocation_same_namespace() {
        let ns = unique_ns();
        let first = acquire_job_singleton(JobType::Enrich, &ns, Some(0))
            .expect("first acquire should succeed");
        let second = acquire_job_singleton(JobType::Enrich, &ns, Some(0));
        assert!(
            matches!(second, Err(AppError::JobSingletonLocked { .. })),
            "expected JobSingletonLocked, got {second:?}"
        );
        drop(first);
        // Releasing the holder must make the singleton available again.
        let third = acquire_job_singleton(JobType::Enrich, &ns, Some(0))
            .expect("singleton should be re-acquirable after the holder drops it");
        drop(third);
    }

    #[test]
    fn job_singleton_allows_different_namespaces() {
        let ns_a = unique_ns();
        let ns_b = unique_ns();
        let first = acquire_job_singleton(JobType::IngestClaudeCode, &ns_a, Some(0))
            .expect("ns_a should acquire");
        let second = acquire_job_singleton(JobType::IngestClaudeCode, &ns_b, Some(0))
            .expect("ns_b should acquire in parallel");
        drop(first);
        drop(second);
    }

    #[test]
    fn job_singleton_allows_different_job_types_same_namespace() {
        let ns = unique_ns();
        let enrich = acquire_job_singleton(JobType::Enrich, &ns, Some(0))
            .expect("enrich should acquire");
        let codex = acquire_job_singleton(JobType::IngestCodex, &ns, Some(0))
            .expect("ingest-codex should acquire alongside enrich");
        drop(enrich);
        drop(codex);
    }
}