1use std::fs::{File, OpenOptions};
24use std::path::{Path, PathBuf};
25use std::thread;
26use std::time::{Duration, Instant};
27
28use directories::ProjectDirs;
29use fs4::fs_std::FileExt;
30
31use crate::constants::{
32 CLI_LOCK_POLL_INTERVAL_MS, JOB_SINGLETON_POLL_INTERVAL_MS, LLM_WORKER_RSS_MB,
33 MAX_CONCURRENT_CLI_INSTANCES,
34};
35use crate::errors::AppError;
36
/// Kind of job that can be guarded by a per-job singleton lock.
///
/// The variant selects the tag embedded in the singleton lock file name, so
/// different job kinds never contend for the same lock file.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum JobType {
    /// Job whose lock files carry the `enrich` tag.
    Enrich,
    /// Job whose lock files carry the `ingest-claude-code` tag.
    IngestClaudeCode,
    /// Job whose lock files carry the `ingest-codex` tag.
    IngestCodex,
}
51
52impl JobType {
53 fn tag(self) -> &'static str {
55 match self {
56 JobType::Enrich => "enrich",
57 JobType::IngestClaudeCode => "ingest-claude-code",
58 JobType::IngestCodex => "ingest-codex",
59 }
60 }
61}
62
63fn slot_path(slot: usize) -> Result<PathBuf, AppError> {
69 let cache = cache_dir()?;
70 std::fs::create_dir_all(&cache)?;
71 Ok(cache.join(format!("cli-slot-{slot}.lock")))
72}
73
74fn cache_dir() -> Result<PathBuf, AppError> {
76 if let Some(override_dir) = std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR") {
77 Ok(PathBuf::from(override_dir))
78 } else {
79 let dirs = ProjectDirs::from("", "", "sqlite-graphrag").ok_or_else(|| {
80 AppError::Io(std::io::Error::new(
81 std::io::ErrorKind::NotFound,
82 "could not determine cache directory for sqlite-graphrag lock files",
83 ))
84 })?;
85 Ok(dirs.cache_dir().to_path_buf())
86 }
87}
88
89pub fn db_path_hash(db_path: &Path) -> String {
94 let canonical = db_path
95 .canonicalize()
96 .unwrap_or_else(|_| db_path.to_path_buf());
97 let hash = blake3::hash(canonical.to_string_lossy().as_bytes());
98 hash.to_hex().to_string()[..12].to_string()
99}
100
101pub fn job_singleton_path(
114 job_type: JobType,
115 namespace: &str,
116 db_hash: &str,
117) -> Result<PathBuf, AppError> {
118 let cache = cache_dir()?;
119 std::fs::create_dir_all(&cache)?;
120 let slug = if namespace.is_empty() {
121 "default".to_string()
122 } else {
123 namespace
124 .chars()
125 .map(|c| {
126 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
127 c.to_ascii_lowercase()
128 } else {
129 '-'
130 }
131 })
132 .collect::<String>()
133 };
134 let safe_hash: String = db_hash
135 .chars()
136 .filter(|c| c.is_ascii_alphanumeric())
137 .take(16)
138 .collect();
139 Ok(cache.join(format!(
140 "job-singleton-{}-{slug}-{safe_hash}.lock",
141 job_type.tag()
142 )))
143}
144
145fn try_acquire_slot(slot: usize) -> Result<File, AppError> {
150 let path = slot_path(slot)?;
151 let file = OpenOptions::new()
152 .read(true)
153 .write(true)
154 .create(true)
155 .truncate(false)
156 .open(&path)?;
157 file.try_lock_exclusive().map_err(AppError::Io)?;
158 Ok(file)
159}
160
161pub fn calculate_safe_concurrency() -> usize {
186 use sysinfo::System;
187 let mut sys = System::new();
188 sys.refresh_memory();
189 let available_mb = sys.available_memory() / 1_048_576;
190 let cpus = std::thread::available_parallelism()
191 .map(|n| n.get())
192 .unwrap_or(2);
193
194 let per_worker_mb = LLM_WORKER_RSS_MB;
195
196 let memory_bound = if available_mb == 0 {
197 cpus
198 } else {
199 (available_mb / per_worker_mb.max(1)) as usize
200 };
201 let raw = cpus.min(memory_bound).max(1);
202 raw.min(MAX_CONCURRENT_CLI_INSTANCES)
203}
204
205pub fn worker_cost_mb() -> u64 {
208 LLM_WORKER_RSS_MB
209}
210
211pub fn acquire_cli_slot(
216 max_concurrency: usize,
217 wait_seconds: Option<u64>,
218) -> Result<(File, usize), AppError> {
219 let ncpus = std::thread::available_parallelism()
221 .map(|n| n.get())
222 .unwrap_or(4);
223 let ceiling = std::env::var("SQLITE_GRAPHRAG_MAX_CLI_INSTANCES")
224 .ok()
225 .and_then(|v| v.parse::<usize>().ok())
226 .unwrap_or_else(|| (2 * ncpus).max(MAX_CONCURRENT_CLI_INSTANCES));
227 let max = max_concurrency.clamp(1, ceiling);
228 let wait_secs = wait_seconds.unwrap_or(0);
229
230 if let Some((file, slot)) = try_any_slot(max)? {
232 return Ok((file, slot));
233 }
234
235 if wait_secs == 0 {
236 return Err(AppError::AllSlotsFull {
237 max,
238 waited_secs: 0,
239 });
240 }
241
242 let deadline = Instant::now() + Duration::from_secs(wait_secs);
244 let mut polls: u64 = 0;
245 loop {
246 let poll_delay = CLI_LOCK_POLL_INTERVAL_MS
247 .saturating_mul(1 + polls / 4)
248 .min(CLI_LOCK_POLL_INTERVAL_MS * 4);
249 thread::sleep(Duration::from_millis(poll_delay));
250 polls += 1;
251 if let Some((file, slot)) = try_any_slot(max)? {
252 return Ok((file, slot));
253 }
254 if Instant::now() >= deadline {
255 return Err(AppError::AllSlotsFull {
256 max,
257 waited_secs: wait_secs,
258 });
259 }
260 }
261}
262
263pub fn acquire_job_singleton(
277 job_type: JobType,
278 namespace: &str,
279 db_path: &Path,
280 wait_seconds: Option<u64>,
281 force: bool,
282) -> Result<File, AppError> {
283 let db_hash = db_path_hash(db_path);
284 let path = job_singleton_path(job_type, namespace, &db_hash)?;
285
286 if force && path.exists() {
292 tracing::warn!(target: "lock",
293 path = %path.display(),
294 "force=true; removing pre-existing singleton lock file"
295 );
296 let _ = std::fs::remove_file(&path);
297 }
298
299 let file = OpenOptions::new()
300 .read(true)
301 .write(true)
302 .create(true)
303 .truncate(false)
304 .open(&path)?;
305 if let Err(e) = file.try_lock_exclusive() {
306 if !is_lock_contended(&e) {
307 return Err(AppError::Io(e));
308 }
309 let wait_secs = wait_seconds.unwrap_or(0);
311 if wait_secs == 0 {
312 return Err(AppError::JobSingletonLocked {
313 job_type: job_type.tag().to_string(),
314 namespace: namespace.to_string(),
315 });
316 }
317 let deadline = Instant::now() + Duration::from_secs(wait_secs);
318 drop(file);
321 loop {
322 thread::sleep(Duration::from_millis(JOB_SINGLETON_POLL_INTERVAL_MS));
323 let file = OpenOptions::new()
324 .read(true)
325 .write(true)
326 .create(true)
327 .truncate(false)
328 .open(&path)?;
329 if file.try_lock_exclusive().is_ok() {
330 return Ok(file);
331 }
332 if Instant::now() >= deadline {
333 return Err(AppError::JobSingletonLocked {
334 job_type: job_type.tag().to_string(),
335 namespace: namespace.to_string(),
336 });
337 }
338 }
339 }
340 Ok(file)
341}
342
343fn embedding_singleton_path(namespace: &str, db_hash: &str) -> Result<PathBuf, AppError> {
349 let cache = cache_dir()?;
350 std::fs::create_dir_all(&cache)?;
351 let slug = if namespace.is_empty() {
352 "default".to_string()
353 } else {
354 namespace
355 .chars()
356 .map(|c| {
357 if c.is_ascii_alphanumeric() || c == '-' || c == '_' {
358 c.to_ascii_lowercase()
359 } else {
360 '-'
361 }
362 })
363 .collect::<String>()
364 };
365 let safe_hash: String = db_hash
366 .chars()
367 .filter(|c| c.is_ascii_alphanumeric())
368 .take(16)
369 .collect();
370 Ok(cache.join(format!("embed-singleton-{slug}-{safe_hash}.lock")))
371}
372
373pub fn acquire_embedding_singleton(
395 namespace: &str,
396 db_path: &Path,
397 wait_seconds: Option<u64>,
398 force: bool,
399) -> Result<File, AppError> {
400 let db_hash = db_path_hash(db_path);
401 let path = embedding_singleton_path(namespace, &db_hash)?;
402
403 if force && path.exists() {
404 tracing::warn!(target: "lock.g45",
405 path = %path.display(),
406 "force=true; removing pre-existing embedding singleton lock file"
407 );
408 let _ = std::fs::remove_file(&path);
409 }
410
411 let file = OpenOptions::new()
412 .read(true)
413 .write(true)
414 .create(true)
415 .truncate(false)
416 .open(&path)?;
417 if let Err(e) = file.try_lock_exclusive() {
418 if !is_lock_contended(&e) {
419 return Err(AppError::Io(e));
420 }
421 let wait_secs = wait_seconds.unwrap_or(0);
422 if wait_secs == 0 {
423 return Err(AppError::EmbeddingSingletonLocked {
424 namespace: namespace.to_string(),
425 });
426 }
427 let deadline = Instant::now() + Duration::from_secs(wait_secs);
428 drop(file);
429 loop {
430 thread::sleep(Duration::from_millis(JOB_SINGLETON_POLL_INTERVAL_MS));
431 let file = OpenOptions::new()
432 .read(true)
433 .write(true)
434 .create(true)
435 .truncate(false)
436 .open(&path)?;
437 if file.try_lock_exclusive().is_ok() {
438 return Ok(file);
439 }
440 if Instant::now() >= deadline {
441 return Err(AppError::EmbeddingSingletonLocked {
442 namespace: namespace.to_string(),
443 });
444 }
445 }
446 }
447 Ok(file)
448}
449
450fn try_any_slot(max: usize) -> Result<Option<(File, usize)>, AppError> {
455 for slot in 1..=max {
456 match try_acquire_slot(slot) {
457 Ok(file) => return Ok(Some((file, slot))),
458 Err(AppError::Io(e)) if is_lock_contended(&e) => continue,
459 Err(e) => return Err(e),
460 }
461 }
462 Ok(None)
463}
464
/// Reports whether `error` means "the lock is held by someone else" rather
/// than a genuine I/O failure.
///
/// `ErrorKind::WouldBlock` counts as contention on every platform. On
/// Windows the raw OS error codes 32 and 33 count as well
/// (`ERROR_SHARING_VIOLATION` and `ERROR_LOCK_VIOLATION` in the Windows
/// system error code table).
fn is_lock_contended(error: &std::io::Error) -> bool {
    use std::io::ErrorKind;

    let would_block = matches!(error.kind(), ErrorKind::WouldBlock);
    let windows_violation = cfg!(windows) && matches!(error.raw_os_error(), Some(32 | 33));
    would_block || windows_violation
}
480
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Counter that keeps generated names distinct within this test process.
    static SEQ: AtomicUsize = AtomicUsize::new(0);

    /// Builds a namespace that is unique per process and per call, so tests
    /// never contend for each other's lock files.
    fn unique_ns() -> String {
        let pid = std::process::id();
        let n = SEQ.fetch_add(1, Ordering::SeqCst);
        format!("test-{pid}-{n}")
    }

    /// Returns a fresh `<prefix>-<unique>.sqlite` path under the system temp
    /// directory. The file itself is never created.
    fn temp_db(prefix: &str) -> PathBuf {
        let file_name = format!("{prefix}-{}.sqlite", unique_ns());
        std::env::temp_dir().join(file_name)
    }

    #[test]
    fn job_singleton_path_sanitises_namespace() {
        let path = job_singleton_path(JobType::Enrich, "Foo Bar/Baz", "abc123def456")
            .expect("path should resolve");
        let name = path.file_name().unwrap().to_string_lossy().into_owned();

        assert!(name.contains("enrich"), "got {name}");
        assert!(name.contains("foo-bar-baz"), "got {name}");
        assert!(
            name.contains("abc123def456"),
            "must embed db_hash: got {name}"
        );
    }

    #[test]
    fn job_singleton_blocks_second_invocation_same_namespace() {
        let ns = unique_ns();
        let db = temp_db("test");

        let holder = acquire_job_singleton(JobType::Enrich, &ns, &db, Some(0), false)
            .expect("first acquire should succeed");
        match acquire_job_singleton(JobType::Enrich, &ns, &db, Some(0), false) {
            Err(AppError::JobSingletonLocked { .. }) => {}
            second => panic!("expected JobSingletonLocked, got {second:?}"),
        }
        drop(holder);
    }

    #[test]
    fn job_singleton_allows_different_namespaces() {
        let (ns_a, ns_b) = (unique_ns(), unique_ns());
        let db_a = temp_db("test-a");
        let db_b = temp_db("test-b");

        // Both guards stay alive until the end of the test body.
        let _first =
            acquire_job_singleton(JobType::IngestClaudeCode, &ns_a, &db_a, Some(0), false)
                .expect("ns_a should acquire");
        let _second =
            acquire_job_singleton(JobType::IngestClaudeCode, &ns_b, &db_b, Some(0), false)
                .expect("ns_b should acquire in parallel");
    }

    #[test]
    fn job_singleton_scoped_by_db_hash() {
        let ns = unique_ns();
        let db_a = temp_db("test-x");
        let db_b = temp_db("test-y");

        let _first = acquire_job_singleton(JobType::Enrich, &ns, &db_a, Some(0), false)
            .expect("db_a should acquire");
        let _second = acquire_job_singleton(JobType::Enrich, &ns, &db_b, Some(0), false)
            .expect("db_b should acquire independently (G30 fix)");
    }

    #[test]
    fn db_path_hash_is_stable_for_same_path() {
        let path = std::env::temp_dir().join("hashing-test.sqlite");
        let (h1, h2) = (db_path_hash(&path), db_path_hash(&path));

        assert_eq!(h1, h2, "same path must produce same hash");
        assert_eq!(h1.len(), 12, "BLAKE3 prefix must be 12 hex chars");
    }

    #[test]
    fn db_path_hash_differs_for_different_paths() {
        let tmp = std::env::temp_dir();
        let hash_a = db_path_hash(&tmp.join("hash-a.sqlite"));
        let hash_b = db_path_hash(&tmp.join("hash-b.sqlite"));

        assert_ne!(hash_a, hash_b);
    }

    #[test]
    fn g45_embedding_singleton_blocks_second_invocation_same_db() {
        let ns = unique_ns();
        let db = temp_db("g45");

        let holder = acquire_embedding_singleton(&ns, &db, Some(0), false)
            .expect("first acquire should succeed");
        match acquire_embedding_singleton(&ns, &db, Some(0), false) {
            Err(AppError::EmbeddingSingletonLocked { .. }) => {}
            second => panic!("expected EmbeddingSingletonLocked, got {second:?}"),
        }
        drop(holder);
    }

    #[test]
    fn g45_embedding_singleton_allows_different_namespaces() {
        let (ns_a, ns_b) = (unique_ns(), unique_ns());
        let db = temp_db("g45-multi");

        // Both guards stay alive until the end of the test body.
        let _first =
            acquire_embedding_singleton(&ns_a, &db, Some(0), false).expect("ns_a should acquire");
        let _second = acquire_embedding_singleton(&ns_b, &db, Some(0), false)
            .expect("ns_b should acquire in parallel (different namespace)");
    }

    #[test]
    fn g45_embedding_singleton_scoped_by_db_hash() {
        let ns = unique_ns();
        let db_a = temp_db("g45-x");
        let db_b = temp_db("g45-y");

        let _first =
            acquire_embedding_singleton(&ns, &db_a, Some(0), false).expect("db_a should acquire");
        let _second = acquire_embedding_singleton(&ns, &db_b, Some(0), false)
            .expect("db_b should acquire independently (G45 db_hash scope)");
    }
}