1use std::fs::{File, OpenOptions};
24use std::path::PathBuf;
25use std::thread;
26use std::time::{Duration, Instant};
27
28use directories::ProjectDirs;
29use fs4::fs_std::FileExt;
30
31use crate::constants::{
32 CLI_LOCK_POLL_INTERVAL_MS, JOB_SINGLETON_POLL_INTERVAL_MS, MAX_CONCURRENT_CLI_INSTANCES,
33};
34use crate::errors::AppError;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
42pub enum JobType {
43 Enrich,
45 IngestClaudeCode,
47 IngestCodex,
49}
50
51impl JobType {
52 fn tag(self) -> &'static str {
54 match self {
55 JobType::Enrich => "enrich",
56 JobType::IngestClaudeCode => "ingest-claude-code",
57 JobType::IngestCodex => "ingest-codex",
58 }
59 }
60}
61
62fn slot_path(slot: usize) -> Result<PathBuf, AppError> {
68 let cache = cache_dir()?;
69 std::fs::create_dir_all(&cache)?;
70 Ok(cache.join(format!("cli-slot-{slot}.lock")))
71}
72
73fn cache_dir() -> Result<PathBuf, AppError> {
75 if let Some(override_dir) = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR") {
76 Ok(PathBuf::from(override_dir))
77 } else {
78 let dirs = ProjectDirs::from("", "", "sqlite-graphrag").ok_or_else(|| {
79 AppError::Io(std::io::Error::new(
80 std::io::ErrorKind::NotFound,
81 "could not determine cache directory for sqlite-graphrag lock files",
82 ))
83 })?;
84 Ok(dirs.cache_dir().to_path_buf())
85 }
86}
87
88fn job_singleton_path(job_type: JobType, namespace: &str) -> Result<PathBuf, AppError> {
94 let cache = cache_dir()?;
95 std::fs::create_dir_all(&cache)?;
96 let slug = if namespace.is_empty() {
97 "default".to_string()
98 } else {
99 namespace
100 .chars()
101 .map(|c| {
102 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
103 c.to_ascii_lowercase()
104 } else {
105 '-'
106 }
107 })
108 .collect::<String>()
109 };
110 Ok(cache.join(format!("job-singleton-{}-{slug}.lock", job_type.tag())))
111}
112
113fn try_acquire_slot(slot: usize) -> Result<File, AppError> {
118 let path = slot_path(slot)?;
119 let file = OpenOptions::new()
120 .read(true)
121 .write(true)
122 .create(true)
123 .truncate(false)
124 .open(&path)?;
125 file.try_lock_exclusive().map_err(AppError::Io)?;
126 Ok(file)
127}
128
129pub fn acquire_cli_slot(
144 max_concurrency: usize,
145 wait_seconds: Option<u64>,
146) -> Result<(File, usize), AppError> {
147 let ncpus = std::thread::available_parallelism()
149 .map(|n| n.get())
150 .unwrap_or(4);
151 let ceiling = std::env::var("SQLITE_GRAPHRAG_MAX_CLI_INSTANCES")
152 .ok()
153 .and_then(|v| v.parse::<usize>().ok())
154 .unwrap_or_else(|| (2 * ncpus).max(MAX_CONCURRENT_CLI_INSTANCES));
155 let max = max_concurrency.clamp(1, ceiling);
156 let wait_secs = wait_seconds.unwrap_or(0);
157
158 if let Some((file, slot)) = try_any_slot(max)? {
160 return Ok((file, slot));
161 }
162
163 if wait_secs == 0 {
164 return Err(AppError::AllSlotsFull {
165 max,
166 waited_secs: 0,
167 });
168 }
169
170 let deadline = Instant::now() + Duration::from_secs(wait_secs);
172 let mut polls: u64 = 0;
173 loop {
174 let poll_delay = CLI_LOCK_POLL_INTERVAL_MS
175 .saturating_mul(1 + polls / 4)
176 .min(CLI_LOCK_POLL_INTERVAL_MS * 4);
177 thread::sleep(Duration::from_millis(poll_delay));
178 polls += 1;
179 if let Some((file, slot)) = try_any_slot(max)? {
180 return Ok((file, slot));
181 }
182 if Instant::now() >= deadline {
183 return Err(AppError::AllSlotsFull {
184 max,
185 waited_secs: wait_secs,
186 });
187 }
188 }
189}
190
191pub fn acquire_job_singleton(
205 job_type: JobType,
206 namespace: &str,
207 wait_seconds: Option<u64>,
208) -> Result<File, AppError> {
209 let path = job_singleton_path(job_type, namespace)?;
210 let file = OpenOptions::new()
211 .read(true)
212 .write(true)
213 .create(true)
214 .truncate(false)
215 .open(&path)?;
216 if let Err(e) = file.try_lock_exclusive() {
217 if !is_lock_contended(&e) {
218 return Err(AppError::Io(e));
219 }
220 let wait_secs = wait_seconds.unwrap_or(0);
222 if wait_secs == 0 {
223 return Err(AppError::JobSingletonLocked {
224 job_type: job_type.tag().to_string(),
225 namespace: namespace.to_string(),
226 });
227 }
228 let deadline = Instant::now() + Duration::from_secs(wait_secs);
229 drop(file);
232 loop {
233 thread::sleep(Duration::from_millis(JOB_SINGLETON_POLL_INTERVAL_MS));
234 let file = OpenOptions::new()
235 .read(true)
236 .write(true)
237 .create(true)
238 .truncate(false)
239 .open(&path)?;
240 if file.try_lock_exclusive().is_ok() {
241 return Ok(file);
242 }
243 if Instant::now() >= deadline {
244 return Err(AppError::JobSingletonLocked {
245 job_type: job_type.tag().to_string(),
246 namespace: namespace.to_string(),
247 });
248 }
249 }
250 }
251 Ok(file)
252}
253
254fn try_any_slot(max: usize) -> Result<Option<(File, usize)>, AppError> {
259 for slot in 1..=max {
260 match try_acquire_slot(slot) {
261 Ok(file) => return Ok(Some((file, slot))),
262 Err(AppError::Io(e)) if is_lock_contended(&e) => continue,
263 Err(e) => return Err(e),
264 }
265 }
266 Ok(None)
267}
268
269fn is_lock_contended(error: &std::io::Error) -> bool {
270 if error.kind() == std::io::ErrorKind::WouldBlock {
271 return true;
272 }
273
274 #[cfg(windows)]
275 {
276 matches!(error.raw_os_error(), Some(32 | 33))
277 }
278
279 #[cfg(not(windows))]
280 {
281 false
282 }
283}
284
285#[cfg(test)]
286mod tests {
287 use super::*;
288 use std::sync::atomic::{AtomicUsize, Ordering};
289 static SEQ: AtomicUsize = AtomicUsize::new(0);
290
291 fn unique_ns() -> String {
292 let n = SEQ.fetch_add(1, Ordering::SeqCst);
293 let pid = std::process::id();
294 format!("test-{pid}-{n}")
295 }
296
297 #[test]
298 fn job_singleton_path_sanitises_namespace() {
299 let p = job_singleton_path(JobType::Enrich, "Foo Bar/Baz").expect("path should resolve");
300 let name = p.file_name().unwrap().to_string_lossy().to_string();
301 assert!(name.contains("enrich"), "got {name}");
302 assert!(name.contains("foo-bar-baz"), "got {name}");
303 }
304
305 #[test]
306 fn job_singleton_blocks_second_invocation_same_namespace() {
307 let ns = unique_ns();
308 let first = acquire_job_singleton(JobType::Enrich, &ns, Some(0))
309 .expect("first acquire should succeed");
310 let second = acquire_job_singleton(JobType::Enrich, &ns, Some(0));
311 assert!(
312 matches!(second, Err(AppError::JobSingletonLocked { .. })),
313 "expected JobSingletonLocked, got {second:?}"
314 );
315 drop(first);
316 }
317
318 #[test]
319 fn job_singleton_allows_different_namespaces() {
320 let ns_a = unique_ns();
321 let ns_b = unique_ns();
322 let first = acquire_job_singleton(JobType::IngestClaudeCode, &ns_a, Some(0))
323 .expect("ns_a should acquire");
324 let second = acquire_job_singleton(JobType::IngestClaudeCode, &ns_b, Some(0))
325 .expect("ns_b should acquire in parallel");
326 drop(first);
327 drop(second);
328 }
329}