Skip to main content

sqlite_graphrag/
lock.rs

1//! Counting semaphore via lock files to limit parallel CLI invocations.
2//!
3//! `acquire_cli_slot` tries to acquire one of `N` available slots by opening the file
4//! `cli-slot-{N}.lock` in the OS cache directory and obtaining an exclusive `flock`.
5//! The returned [`std::fs::File`] MUST be kept alive for the entire duration of `main`;
6//! dropping it releases the slot automatically for the next invocation.
7//!
//! When `wait_seconds` is `Some(n)` with `n > 0`, the function keeps retrying until the
//! deadline, sleeping between attempts for a delay that starts at
//! [`crate::constants::CLI_LOCK_POLL_INTERVAL_MS`] milliseconds and grows up to four times
//! that value. When it is `None` or `Some(0)`, a single attempt is made and
//! `Err(AppError::AllSlotsFull)` is returned immediately if all slots are occupied.
// Workload: I/O-bound (flock polling with a stepwise-growing sleep capped at 4x the base interval)
13
14use std::fs::{File, OpenOptions};
15use std::path::PathBuf;
16use std::thread;
17use std::time::{Duration, Instant};
18
19use directories::ProjectDirs;
20use fs4::fs_std::FileExt;
21
22use crate::constants::{CLI_LOCK_POLL_INTERVAL_MS, MAX_CONCURRENT_CLI_INSTANCES};
23use crate::errors::AppError;
24
25/// Returns the lock file path for the given slot.
26///
27/// Honours `SQLITE_GRAPHRAG_CACHE_DIR` when set (useful for tests, containers,
28/// and NFS caches), falling back to the OS default cache directory via
29/// `directories::ProjectDirs`. The slot must be 1-based.
30fn slot_path(slot: usize) -> Result<PathBuf, AppError> {
31    let cache = if let Some(override_dir) = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR") {
32        PathBuf::from(override_dir)
33    } else {
34        let dirs = ProjectDirs::from("", "", "sqlite-graphrag").ok_or_else(|| {
35            AppError::Io(std::io::Error::new(
36                std::io::ErrorKind::NotFound,
37                "could not determine cache directory for sqlite-graphrag lock files",
38            ))
39        })?;
40        dirs.cache_dir().to_path_buf()
41    };
42    std::fs::create_dir_all(&cache)?;
43    Ok(cache.join(format!("cli-slot-{slot}.lock")))
44}
45
46/// Tries to open and exclusively lock the lock file for the given slot.
47///
48/// Returns `Ok(file)` if the slot is free, or `Err(io::Error)` if it is
49/// held by another instance (non-blocking).
50fn try_acquire_slot(slot: usize) -> Result<File, AppError> {
51    let path = slot_path(slot)?;
52    let file = OpenOptions::new()
53        .read(true)
54        .write(true)
55        .create(true)
56        .truncate(false)
57        .open(&path)?;
58    file.try_lock_exclusive().map_err(AppError::Io)?;
59    Ok(file)
60}
61
62/// Acquires a concurrency slot from the `max_concurrency`-position semaphore.
63///
64/// Iterates slots `1..=max_concurrency` attempting `try_lock_exclusive` on each
65/// `cli-slot-N.lock` file. When a free slot is found, returns `(File, slot_number)`.
66/// If all slots are occupied:
67///
68/// - If `wait_seconds` is `None` or `Some(0)`, returns immediately with
69///   `AppError::AllSlotsFull { max, waited_secs: 0 }`.
70/// - If `wait_seconds` is `Some(n) > 0`, enters a polling loop every
71///   [`crate::constants::CLI_LOCK_POLL_INTERVAL_MS`] ms until the deadline expires, returning
72///   `AppError::AllSlotsFull { max, waited_secs: n }` if no slot opens.
73///
74/// The returned `File` MUST be kept alive until the process exits; dropping it
75/// releases the slot automatically via the implicit `flock` on close.
76pub fn acquire_cli_slot(
77    max_concurrency: usize,
78    wait_seconds: Option<u64>,
79) -> Result<(File, usize), AppError> {
80    // G18: use env override or 2*cpus as ceiling instead of hardcoded 4
81    let ncpus = std::thread::available_parallelism()
82        .map(|n| n.get())
83        .unwrap_or(4);
84    let ceiling = std::env::var("SQLITE_GRAPHRAG_MAX_CLI_INSTANCES")
85        .ok()
86        .and_then(|v| v.parse::<usize>().ok())
87        .unwrap_or_else(|| (2 * ncpus).max(MAX_CONCURRENT_CLI_INSTANCES));
88    let max = max_concurrency.clamp(1, ceiling);
89    let wait_secs = wait_seconds.unwrap_or(0);
90
91    // Tentativa inicial sem espera.
92    if let Some((file, slot)) = try_any_slot(max)? {
93        return Ok((file, slot));
94    }
95
96    if wait_secs == 0 {
97        return Err(AppError::AllSlotsFull {
98            max,
99            waited_secs: 0,
100        });
101    }
102
103    // Polling loop with progressive backoff until the deadline.
104    let deadline = Instant::now() + Duration::from_secs(wait_secs);
105    let mut polls: u64 = 0;
106    loop {
107        let poll_delay = CLI_LOCK_POLL_INTERVAL_MS
108            .saturating_mul(1 + polls / 4)
109            .min(CLI_LOCK_POLL_INTERVAL_MS * 4);
110        thread::sleep(Duration::from_millis(poll_delay));
111        polls += 1;
112        if let Some((file, slot)) = try_any_slot(max)? {
113            return Ok((file, slot));
114        }
115        if Instant::now() >= deadline {
116            return Err(AppError::AllSlotsFull {
117                max,
118                waited_secs: wait_secs,
119            });
120        }
121    }
122}
123
124/// Tries to acquire any free slot in `1..=max`, returning the first available one.
125///
126/// Returns `Ok(Some((file, slot)))` if a slot was obtained, `Ok(None)` if all are
127/// occupied (`EWOULDBLOCK`). Propagates I/O errors other than "lock contended".
128fn try_any_slot(max: usize) -> Result<Option<(File, usize)>, AppError> {
129    for slot in 1..=max {
130        match try_acquire_slot(slot) {
131            Ok(file) => return Ok(Some((file, slot))),
132            Err(AppError::Io(e)) if is_lock_contended(&e) => continue,
133            Err(e) => return Err(e),
134        }
135    }
136    Ok(None)
137}
138
/// Reports whether `error` means "the lock is currently held by someone else".
///
/// An error of kind `WouldBlock` counts as contention on every platform. On Windows,
/// raw OS error codes 32 and 33 are treated as contention as well (these are
/// `ERROR_SHARING_VIOLATION` and `ERROR_LOCK_VIOLATION` in the Win32 system error
/// code table). Every other error is a genuine failure.
fn is_lock_contended(error: &std::io::Error) -> bool {
    // Win32 codes accepted as "lock busy"; only consulted when compiled for Windows.
    const WIN_SHARING_VIOLATION: i32 = 32;
    const WIN_LOCK_VIOLATION: i32 = 33;

    let would_block = error.kind() == std::io::ErrorKind::WouldBlock;
    let windows_busy = cfg!(windows)
        && matches!(
            error.raw_os_error(),
            Some(WIN_SHARING_VIOLATION | WIN_LOCK_VIOLATION)
        );

    would_block || windows_busy
}