sqlite_graphrag/lock.rs
1//! Counting semaphore via lock files to limit parallel CLI invocations.
2//!
3//! `acquire_cli_slot` tries to acquire one of `N` available slots by opening the file
4//! `cli-slot-{N}.lock` in the OS cache directory and obtaining an exclusive `flock`.
5//! The returned [`std::fs::File`] MUST be kept alive for the entire duration of `main`;
6//! dropping it releases the slot automatically for the next invocation.
7//!
8//! When `wait_seconds` is `Some(n) > 0`, the function polls every
9//! [`crate::constants::CLI_LOCK_POLL_INTERVAL_MS`] milliseconds until the deadline. When it
10//! is `None` or `Some(0)`, a single attempt is made and `Err(AppError::AllSlotsFull)` is
11//! returned immediately if all slots are occupied.
12
13use std::fs::{File, OpenOptions};
14use std::path::PathBuf;
15use std::thread;
16use std::time::{Duration, Instant};
17
18use directories::ProjectDirs;
19use fs4::fs_std::FileExt;
20
21use crate::constants::{CLI_LOCK_POLL_INTERVAL_MS, MAX_CONCURRENT_CLI_INSTANCES};
22use crate::errors::AppError;
23
24/// Returns the lock file path for the given slot.
25///
26/// Honours `SQLITE_GRAPHRAG_CACHE_DIR` when set (useful for tests, containers,
27/// and NFS caches), falling back to the OS default cache directory via
28/// `directories::ProjectDirs`. The slot must be 1-based.
29fn slot_path(slot: usize) -> Result<PathBuf, AppError> {
30 let cache = if let Some(override_dir) = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR") {
31 PathBuf::from(override_dir)
32 } else {
33 let dirs = ProjectDirs::from("", "", "sqlite-graphrag").ok_or_else(|| {
34 AppError::Io(std::io::Error::new(
35 std::io::ErrorKind::NotFound,
36 "não foi possível determinar o diretório de cache para os lock files do sqlite-graphrag",
37 ))
38 })?;
39 dirs.cache_dir().to_path_buf()
40 };
41 std::fs::create_dir_all(&cache)?;
42 Ok(cache.join(format!("cli-slot-{slot}.lock")))
43}
44
45/// Tries to open and exclusively lock the lock file for the given slot.
46///
47/// Returns `Ok(file)` if the slot is free, or `Err(io::Error)` if it is
48/// held by another instance (non-blocking).
49fn try_acquire_slot(slot: usize) -> Result<File, AppError> {
50 let path = slot_path(slot)?;
51 let file = OpenOptions::new()
52 .read(true)
53 .write(true)
54 .create(true)
55 .truncate(false)
56 .open(&path)?;
57 file.try_lock_exclusive().map_err(AppError::Io)?;
58 Ok(file)
59}
60
61/// Acquires a concurrency slot from the `max_concurrency`-position semaphore.
62///
63/// Iterates slots `1..=max_concurrency` attempting `try_lock_exclusive` on each
64/// `cli-slot-N.lock` file. When a free slot is found, returns `(File, slot_number)`.
65/// If all slots are occupied:
66///
67/// - If `wait_seconds` is `None` or `Some(0)`, returns immediately with
68/// `AppError::AllSlotsFull { max, waited_secs: 0 }`.
69/// - If `wait_seconds` is `Some(n) > 0`, enters a polling loop every
70/// [`crate::constants::CLI_LOCK_POLL_INTERVAL_MS`] ms until the deadline expires, returning
71/// `AppError::AllSlotsFull { max, waited_secs: n }` if no slot opens.
72///
73/// The returned `File` MUST be kept alive until the process exits; dropping it
74/// releases the slot automatically via the implicit `flock` on close.
75pub fn acquire_cli_slot(
76 max_concurrency: usize,
77 wait_seconds: Option<u64>,
78) -> Result<(File, usize), AppError> {
79 let max = max_concurrency.clamp(1, MAX_CONCURRENT_CLI_INSTANCES);
80 let wait_secs = wait_seconds.unwrap_or(0);
81
82 // Tentativa inicial sem espera.
83 if let Some((file, slot)) = try_any_slot(max)? {
84 return Ok((file, slot));
85 }
86
87 if wait_secs == 0 {
88 return Err(AppError::AllSlotsFull {
89 max,
90 waited_secs: 0,
91 });
92 }
93
94 // Loop de polling até o deadline.
95 let deadline = Instant::now() + Duration::from_secs(wait_secs);
96 loop {
97 thread::sleep(Duration::from_millis(CLI_LOCK_POLL_INTERVAL_MS));
98 if let Some((file, slot)) = try_any_slot(max)? {
99 return Ok((file, slot));
100 }
101 if Instant::now() >= deadline {
102 return Err(AppError::AllSlotsFull {
103 max,
104 waited_secs: wait_secs,
105 });
106 }
107 }
108}
109
110/// Tries to acquire any free slot in `1..=max`, returning the first available one.
111///
112/// Returns `Ok(Some((file, slot)))` if a slot was obtained, `Ok(None)` if all are
113/// occupied (`EWOULDBLOCK`). Propagates I/O errors other than "lock contended".
114fn try_any_slot(max: usize) -> Result<Option<(File, usize)>, AppError> {
115 for slot in 1..=max {
116 match try_acquire_slot(slot) {
117 Ok(file) => return Ok(Some((file, slot))),
118 Err(AppError::Io(e)) if is_lock_contended(&e) => continue,
119 Err(e) => return Err(e),
120 }
121 }
122 Ok(None)
123}
124
125fn is_lock_contended(error: &std::io::Error) -> bool {
126 if error.kind() == std::io::ErrorKind::WouldBlock {
127 return true;
128 }
129
130 #[cfg(windows)]
131 {
132 matches!(error.raw_os_error(), Some(32 | 33))
133 }
134
135 #[cfg(not(windows))]
136 {
137 false
138 }
139}