1use std::fs::{File, OpenOptions};
24use std::path::{Path, PathBuf};
25use std::thread;
26use std::time::{Duration, Instant};
27
28use directories::ProjectDirs;
29use fs4::fs_std::FileExt;
30
31use crate::constants::{
32 CLI_LOCK_POLL_INTERVAL_MS, JOB_SINGLETON_POLL_INTERVAL_MS, MAX_CONCURRENT_CLI_INSTANCES,
33};
34use crate::errors::AppError;
35
/// Kinds of job that can be guarded by a singleton lock file.
///
/// Each variant has a fixed textual tag (see [`JobType::tag`]) which is used
/// in lock file names and in `JobSingletonLocked` errors.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JobType {
    /// Job tagged `enrich`.
    Enrich,
    /// Job tagged `ingest-claude-code`.
    IngestClaudeCode,
    /// Job tagged `ingest-codex`.
    IngestCodex,
}

impl JobType {
    /// Returns the stable, lowercase, hyphenated tag identifying this job type.
    fn tag(self) -> &'static str {
        match self {
            Self::Enrich => "enrich",
            Self::IngestClaudeCode => "ingest-claude-code",
            Self::IngestCodex => "ingest-codex",
        }
    }
}
61
62fn slot_path(slot: usize) -> Result<PathBuf, AppError> {
68 let cache = cache_dir()?;
69 std::fs::create_dir_all(&cache)?;
70 Ok(cache.join(format!("cli-slot-{slot}.lock")))
71}
72
73fn cache_dir() -> Result<PathBuf, AppError> {
75 if let Some(override_dir) = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR") {
76 Ok(PathBuf::from(override_dir))
77 } else {
78 let dirs = ProjectDirs::from("", "", "sqlite-graphrag").ok_or_else(|| {
79 AppError::Io(std::io::Error::new(
80 std::io::ErrorKind::NotFound,
81 "could not determine cache directory for sqlite-graphrag lock files",
82 ))
83 })?;
84 Ok(dirs.cache_dir().to_path_buf())
85 }
86}
87
88pub fn db_path_hash(db_path: &Path) -> String {
93 let canonical = db_path
94 .canonicalize()
95 .unwrap_or_else(|_| db_path.to_path_buf());
96 let hash = blake3::hash(canonical.to_string_lossy().as_bytes());
97 hash.to_hex().to_string()[..12].to_string()
98}
99
100pub fn job_singleton_path(
113 job_type: JobType,
114 namespace: &str,
115 db_hash: &str,
116) -> Result<PathBuf, AppError> {
117 let cache = cache_dir()?;
118 std::fs::create_dir_all(&cache)?;
119 let slug = if namespace.is_empty() {
120 "default".to_string()
121 } else {
122 namespace
123 .chars()
124 .map(|c| {
125 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
126 c.to_ascii_lowercase()
127 } else {
128 '-'
129 }
130 })
131 .collect::<String>()
132 };
133 let safe_hash: String = db_hash
134 .chars()
135 .filter(|c| c.is_ascii_alphanumeric())
136 .take(16)
137 .collect();
138 Ok(cache.join(format!(
139 "job-singleton-{}-{slug}-{safe_hash}.lock",
140 job_type.tag()
141 )))
142}
143
144fn try_acquire_slot(slot: usize) -> Result<File, AppError> {
149 let path = slot_path(slot)?;
150 let file = OpenOptions::new()
151 .read(true)
152 .write(true)
153 .create(true)
154 .truncate(false)
155 .open(&path)?;
156 file.try_lock_exclusive().map_err(AppError::Io)?;
157 Ok(file)
158}
159
160pub fn acquire_cli_slot(
175 max_concurrency: usize,
176 wait_seconds: Option<u64>,
177) -> Result<(File, usize), AppError> {
178 let ncpus = std::thread::available_parallelism()
180 .map(|n| n.get())
181 .unwrap_or(4);
182 let ceiling = std::env::var("SQLITE_GRAPHRAG_MAX_CLI_INSTANCES")
183 .ok()
184 .and_then(|v| v.parse::<usize>().ok())
185 .unwrap_or_else(|| (2 * ncpus).max(MAX_CONCURRENT_CLI_INSTANCES));
186 let max = max_concurrency.clamp(1, ceiling);
187 let wait_secs = wait_seconds.unwrap_or(0);
188
189 if let Some((file, slot)) = try_any_slot(max)? {
191 return Ok((file, slot));
192 }
193
194 if wait_secs == 0 {
195 return Err(AppError::AllSlotsFull {
196 max,
197 waited_secs: 0,
198 });
199 }
200
201 let deadline = Instant::now() + Duration::from_secs(wait_secs);
203 let mut polls: u64 = 0;
204 loop {
205 let poll_delay = CLI_LOCK_POLL_INTERVAL_MS
206 .saturating_mul(1 + polls / 4)
207 .min(CLI_LOCK_POLL_INTERVAL_MS * 4);
208 thread::sleep(Duration::from_millis(poll_delay));
209 polls += 1;
210 if let Some((file, slot)) = try_any_slot(max)? {
211 return Ok((file, slot));
212 }
213 if Instant::now() >= deadline {
214 return Err(AppError::AllSlotsFull {
215 max,
216 waited_secs: wait_secs,
217 });
218 }
219 }
220}
221
222pub fn acquire_job_singleton(
236 job_type: JobType,
237 namespace: &str,
238 db_path: &Path,
239 wait_seconds: Option<u64>,
240 force: bool,
241) -> Result<File, AppError> {
242 let db_hash = db_path_hash(db_path);
243 let path = job_singleton_path(job_type, namespace, &db_hash)?;
244
245 if force && path.exists() {
251 tracing::warn!(target: "lock",
252 path = %path.display(),
253 "force=true; removing pre-existing singleton lock file"
254 );
255 let _ = std::fs::remove_file(&path);
256 }
257
258 let file = OpenOptions::new()
259 .read(true)
260 .write(true)
261 .create(true)
262 .truncate(false)
263 .open(&path)?;
264 if let Err(e) = file.try_lock_exclusive() {
265 if !is_lock_contended(&e) {
266 return Err(AppError::Io(e));
267 }
268 let wait_secs = wait_seconds.unwrap_or(0);
270 if wait_secs == 0 {
271 return Err(AppError::JobSingletonLocked {
272 job_type: job_type.tag().to_string(),
273 namespace: namespace.to_string(),
274 });
275 }
276 let deadline = Instant::now() + Duration::from_secs(wait_secs);
277 drop(file);
280 loop {
281 thread::sleep(Duration::from_millis(JOB_SINGLETON_POLL_INTERVAL_MS));
282 let file = OpenOptions::new()
283 .read(true)
284 .write(true)
285 .create(true)
286 .truncate(false)
287 .open(&path)?;
288 if file.try_lock_exclusive().is_ok() {
289 return Ok(file);
290 }
291 if Instant::now() >= deadline {
292 return Err(AppError::JobSingletonLocked {
293 job_type: job_type.tag().to_string(),
294 namespace: namespace.to_string(),
295 });
296 }
297 }
298 }
299 Ok(file)
300}
301
302fn try_any_slot(max: usize) -> Result<Option<(File, usize)>, AppError> {
307 for slot in 1..=max {
308 match try_acquire_slot(slot) {
309 Ok(file) => return Ok(Some((file, slot))),
310 Err(AppError::Io(e)) if is_lock_contended(&e) => continue,
311 Err(e) => return Err(e),
312 }
313 }
314 Ok(None)
315}
316
/// Reports whether `error` means "the lock is currently held by someone
/// else" rather than a genuine I/O failure.
///
/// `WouldBlock` counts as contention on every platform. On Windows, raw OS
/// error codes 32 (`ERROR_SHARING_VIOLATION`) and 33 (`ERROR_LOCK_VIOLATION`)
/// count as contention as well.
fn is_lock_contended(error: &std::io::Error) -> bool {
    let would_block = matches!(error.kind(), std::io::ErrorKind::WouldBlock);

    // `cfg!(windows)` is a compile-time constant, so the raw-code check is
    // only ever effective on Windows builds.
    let windows_violation = cfg!(windows) && matches!(error.raw_os_error(), Some(32 | 33));

    would_block || windows_violation
}
332
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Counter used to give every test its own namespace / db file name.
    static SEQ: AtomicUsize = AtomicUsize::new(0);

    /// Returns a name that is unique within this process (sequence number)
    /// and across concurrently running test processes (pid).
    fn unique_ns() -> String {
        let n = SEQ.fetch_add(1, Ordering::SeqCst);
        let pid = std::process::id();
        format!("test-{pid}-{n}")
    }

    #[test]
    fn job_singleton_path_sanitises_namespace() {
        let p = job_singleton_path(JobType::Enrich, "Foo Bar/Baz", "abc123def456")
            .expect("path should resolve");
        let name = p.file_name().unwrap().to_string_lossy().to_string();
        assert!(name.contains("enrich"), "got {name}");
        assert!(name.contains("foo-bar-baz"), "got {name}");
        assert!(
            name.contains("abc123def456"),
            "must embed db_hash: got {name}"
        );
    }

    #[test]
    fn job_singleton_blocks_second_invocation_same_namespace() {
        let ns = unique_ns();
        let db = std::env::temp_dir().join(format!("test-{}.sqlite", unique_ns()));
        let first = acquire_job_singleton(JobType::Enrich, &ns, &db, Some(0), false)
            .expect("first acquire should succeed");
        let second = acquire_job_singleton(JobType::Enrich, &ns, &db, Some(0), false);
        assert!(
            matches!(second, Err(AppError::JobSingletonLocked { .. })),
            "expected JobSingletonLocked, got {second:?}"
        );
        drop(first);
    }

    #[test]
    fn job_singleton_allows_different_namespaces() {
        let ns_a = unique_ns();
        let ns_b = unique_ns();
        // Both namespaces share ONE database path so that the namespace is
        // the only thing distinguishing the two locks. With two different
        // databases the test would pass even if the namespace were ignored.
        let db = std::env::temp_dir().join(format!("test-ns-{}.sqlite", unique_ns()));
        let first = acquire_job_singleton(JobType::IngestClaudeCode, &ns_a, &db, Some(0), false)
            .expect("ns_a should acquire");
        let second = acquire_job_singleton(JobType::IngestClaudeCode, &ns_b, &db, Some(0), false)
            .expect("ns_b should acquire in parallel");
        drop(first);
        drop(second);
    }

    #[test]
    fn job_singleton_scoped_by_db_hash() {
        // Same namespace, different databases: the locks must be independent.
        let ns = unique_ns();
        let db_a = std::env::temp_dir().join(format!("test-x-{}.sqlite", unique_ns()));
        let db_b = std::env::temp_dir().join(format!("test-y-{}.sqlite", unique_ns()));
        let first = acquire_job_singleton(JobType::Enrich, &ns, &db_a, Some(0), false)
            .expect("db_a should acquire");
        let second = acquire_job_singleton(JobType::Enrich, &ns, &db_b, Some(0), false)
            .expect("db_b should acquire independently (G30 fix)");
        drop(first);
        drop(second);
    }

    #[test]
    fn db_path_hash_is_stable_for_same_path() {
        let p = std::env::temp_dir().join("hashing-test.sqlite");
        let h1 = db_path_hash(&p);
        let h2 = db_path_hash(&p);
        assert_eq!(h1, h2, "same path must produce same hash");
        assert_eq!(h1.len(), 12, "BLAKE3 prefix must be 12 hex chars");
    }

    #[test]
    fn db_path_hash_differs_for_different_paths() {
        let a = std::env::temp_dir().join("hash-a.sqlite");
        let b = std::env::temp_dir().join("hash-b.sqlite");
        assert_ne!(db_path_hash(&a), db_path_hash(&b));
    }
}