use std::fs::{File, OpenOptions};
use std::path::PathBuf;
use std::thread;
use std::time::{Duration, Instant};
use directories::ProjectDirs;
use fs4::fs_std::FileExt;
use crate::constants::{
CLI_LOCK_POLL_INTERVAL_MS, JOB_SINGLETON_POLL_INTERVAL_MS, MAX_CONCURRENT_CLI_INSTANCES,
};
use crate::errors::AppError;
/// Kinds of job that can be guarded by a per-namespace singleton lock.
///
/// Each variant maps to a fixed tag (see `JobType::tag`) that becomes part of
/// the lock file name.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum JobType {
    /// Tagged `enrich` in lock file names.
    Enrich,
    /// Tagged `ingest-claude-code` in lock file names.
    IngestClaudeCode,
    /// Tagged `ingest-codex` in lock file names.
    IngestCodex,
}
impl JobType {
fn tag(self) -> &'static str {
match self {
JobType::Enrich => "enrich",
JobType::IngestClaudeCode => "ingest-claude-code",
JobType::IngestCodex => "ingest-codex",
}
}
}
/// Builds the lock file path for CLI slot number `slot`.
///
/// The cache directory is created first if it does not exist yet.
///
/// # Errors
///
/// Propagates any failure from resolving or creating the cache directory.
fn slot_path(slot: usize) -> Result<PathBuf, AppError> {
    let dir = cache_dir()?;
    std::fs::create_dir_all(&dir)?;
    let file_name = format!("cli-slot-{slot}.lock");
    Ok(dir.join(file_name))
}
/// Resolves the directory that holds all lock files.
///
/// A non-empty `SQLITE_GRAPHRAG_CACHE_DIR` environment variable takes
/// precedence; otherwise the platform cache directory reported by
/// `ProjectDirs` for the `sqlite-graphrag` application is used.
///
/// # Errors
///
/// Returns `AppError::Io` with kind `NotFound` when no override is set and
/// `ProjectDirs` cannot determine a cache directory.
fn cache_dir() -> Result<PathBuf, AppError> {
    // An empty override is treated as unset. `PathBuf::from("")` would make
    // every lock path relative to the current working directory, so processes
    // started from different directories would stop sharing locks.
    let override_dir =
        std::env::var_os("SQLITE_GRAPHRAG_CACHE_DIR").filter(|dir| !dir.is_empty());
    if let Some(override_dir) = override_dir {
        return Ok(PathBuf::from(override_dir));
    }

    let dirs = ProjectDirs::from("", "", "sqlite-graphrag").ok_or_else(|| {
        AppError::Io(std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "could not determine cache directory for sqlite-graphrag lock files",
        ))
    })?;
    Ok(dirs.cache_dir().to_path_buf())
}
/// Builds the singleton lock file path for `job_type` within `namespace`.
///
/// The namespace is reduced to a file-name-safe slug: ASCII letters are
/// lowercased, ASCII digits, `-` and `_` are kept, and every other character
/// becomes `-`. An empty namespace uses the slug `default`. The cache
/// directory is created first if it does not exist yet.
///
/// # Errors
///
/// Propagates any failure from resolving or creating the cache directory.
fn job_singleton_path(job_type: JobType, namespace: &str) -> Result<PathBuf, AppError> {
    let dir = cache_dir()?;
    std::fs::create_dir_all(&dir)?;

    let slug: String = if namespace.is_empty() {
        String::from("default")
    } else {
        namespace
            .chars()
            .map(|ch| match ch {
                'A'..='Z' => ch.to_ascii_lowercase(),
                'a'..='z' | '0'..='9' | '-' | '_' => ch,
                _ => '-',
            })
            .collect()
    };

    let file_name = format!("job-singleton-{}-{slug}.lock", job_type.tag());
    Ok(dir.join(file_name))
}
/// Makes a single non-blocking attempt to take the exclusive lock for `slot`.
///
/// The lock file is opened for reading and writing, created if missing and
/// never truncated. The returned `File` is the handle the lock was taken on.
///
/// # Errors
///
/// Returns `AppError::Io` when the lock attempt fails, including the case
/// where another holder already has the lock. Path resolution and open
/// failures are propagated as well.
fn try_acquire_slot(slot: usize) -> Result<File, AppError> {
    let lock_path = slot_path(slot)?;

    let mut options = OpenOptions::new();
    options.read(true).write(true).create(true).truncate(false);
    let handle = options.open(&lock_path)?;

    match handle.try_lock_exclusive() {
        Ok(_) => Ok(handle),
        Err(io_err) => Err(AppError::Io(io_err)),
    }
}
/// Acquires one of the numbered CLI concurrency slots, optionally waiting for
/// one to become free.
///
/// `max_concurrency` is clamped to `1..=ceiling`. The ceiling comes from the
/// `SQLITE_GRAPHRAG_MAX_CLI_INSTANCES` environment variable when it parses as
/// an unsigned integer, and otherwise is the larger of twice the available
/// parallelism and `MAX_CONCURRENT_CLI_INSTANCES`. The ceiling is never
/// allowed below 1.
///
/// `wait_seconds` of `None` or `Some(0)` means a single attempt. Otherwise the
/// slots are polled until one is acquired or the wait time has elapsed. The
/// delay between polls starts at `CLI_LOCK_POLL_INTERVAL_MS` and grows by one
/// interval every four polls, up to four intervals.
///
/// On success returns the locked file together with its slot number. The file
/// handle is what carries the lock.
///
/// # Errors
///
/// * `AppError::AllSlotsFull` when every slot is held and the wait (if any)
///   has elapsed.
/// * Any non-contention error from a slot attempt, propagated unchanged.
pub fn acquire_cli_slot(
    max_concurrency: usize,
    wait_seconds: Option<u64>,
) -> Result<(File, usize), AppError> {
    let ncpus = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);
    // Floor the ceiling at 1: an override of `0` would otherwise make
    // `clamp(1, 0)` panic because its lower bound exceeds its upper bound.
    let ceiling = std::env::var("SQLITE_GRAPHRAG_MAX_CLI_INSTANCES")
        .ok()
        .and_then(|v| v.parse::<usize>().ok())
        .unwrap_or_else(|| (2 * ncpus).max(MAX_CONCURRENT_CLI_INSTANCES))
        .max(1);
    let max = max_concurrency.clamp(1, ceiling);
    let wait_secs = wait_seconds.unwrap_or(0);

    if let Some(acquired) = try_any_slot(max)? {
        return Ok(acquired);
    }
    if wait_secs == 0 {
        return Err(AppError::AllSlotsFull {
            max,
            waited_secs: 0,
        });
    }

    // `Instant + Duration` panics on overflow, so use `checked_add`. A wait so
    // long that the deadline cannot be represented is treated as "no deadline".
    let deadline = Instant::now().checked_add(Duration::from_secs(wait_secs));
    let max_delay = CLI_LOCK_POLL_INTERVAL_MS.saturating_mul(4);
    let mut polls: u64 = 0;
    loop {
        let poll_delay = CLI_LOCK_POLL_INTERVAL_MS
            .saturating_mul(1 + polls / 4)
            .min(max_delay);
        thread::sleep(Duration::from_millis(poll_delay));
        polls += 1;

        if let Some(acquired) = try_any_slot(max)? {
            return Ok(acquired);
        }
        if matches!(deadline, Some(limit) if Instant::now() >= limit) {
            return Err(AppError::AllSlotsFull {
                max,
                waited_secs: wait_secs,
            });
        }
    }
}
/// Acquires the exclusive singleton lock for `job_type` within `namespace`.
///
/// `wait_seconds` of `None` or `Some(0)` means a single attempt. Otherwise the
/// lock file is reopened and retried every `JOB_SINGLETON_POLL_INTERVAL_MS`
/// milliseconds until the lock is obtained or the wait time has elapsed.
///
/// On success returns the locked file. The file handle is what carries the
/// lock.
///
/// # Errors
///
/// * `AppError::JobSingletonLocked` when the lock is held by someone else and
///   the wait (if any) has elapsed.
/// * `AppError::Io` for any lock failure that is not contention, on the first
///   attempt or on any later poll.
/// * Path resolution and open failures, propagated unchanged.
pub fn acquire_job_singleton(
    job_type: JobType,
    namespace: &str,
    wait_seconds: Option<u64>,
) -> Result<File, AppError> {
    let path = job_singleton_path(job_type, namespace)?;
    let open_lock_file = || {
        OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)
    };
    let locked_error = || AppError::JobSingletonLocked {
        job_type: job_type.tag().to_string(),
        namespace: namespace.to_string(),
    };

    let file = open_lock_file()?;
    match file.try_lock_exclusive() {
        Ok(_) => return Ok(file),
        Err(e) if !is_lock_contended(&e) => return Err(AppError::Io(e)),
        Err(_) => {}
    }

    let wait_secs = wait_seconds.unwrap_or(0);
    if wait_secs == 0 {
        return Err(locked_error());
    }

    // `Instant + Duration` panics on overflow, so use `checked_add`. A wait so
    // long that the deadline cannot be represented is treated as "no deadline".
    let deadline = Instant::now().checked_add(Duration::from_secs(wait_secs));
    // The contended handle is closed; each poll below opens a fresh one.
    drop(file);
    loop {
        thread::sleep(Duration::from_millis(JOB_SINGLETON_POLL_INTERVAL_MS));
        let file = open_lock_file()?;
        match file.try_lock_exclusive() {
            Ok(_) => return Ok(file),
            // Only contention is worth retrying. Any other failure is
            // surfaced immediately instead of being reported later as
            // "locked", matching the first attempt above.
            Err(e) if !is_lock_contended(&e) => return Err(AppError::Io(e)),
            Err(_) => {}
        }
        if matches!(deadline, Some(limit) if Instant::now() >= limit) {
            return Err(locked_error());
        }
    }
}
/// Tries slots `1..=max` in ascending order and returns the first one locked.
///
/// Returns `Ok(None)` when every slot is currently held by someone else.
///
/// # Errors
///
/// Any failure that is not lock contention aborts the scan and is returned
/// unchanged.
fn try_any_slot(max: usize) -> Result<Option<(File, usize)>, AppError> {
    for slot in 1..=max {
        let failure = match try_acquire_slot(slot) {
            Ok(file) => return Ok(Some((file, slot))),
            Err(failure) => failure,
        };
        let contended = matches!(&failure, AppError::Io(io_err) if is_lock_contended(io_err));
        if !contended {
            return Err(failure);
        }
    }
    Ok(None)
}
/// Reports whether `error` means "the lock is currently held by someone else".
///
/// An error of kind `WouldBlock` counts as contention on every platform. On
/// Windows, raw OS error codes 32 and 33 count as well.
fn is_lock_contended(error: &std::io::Error) -> bool {
    // Windows system error codes for sharing and lock violations.
    const ERROR_SHARING_VIOLATION: i32 = 32;
    const ERROR_LOCK_VIOLATION: i32 = 33;

    let would_block = error.kind() == std::io::ErrorKind::WouldBlock;
    let windows_violation = cfg!(windows)
        && matches!(
            error.raw_os_error(),
            Some(ERROR_SHARING_VIOLATION | ERROR_LOCK_VIOLATION)
        );
    would_block || windows_violation
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Counter used to give every test its own namespace.
    static SEQ: AtomicUsize = AtomicUsize::new(0);

    /// Returns a namespace built from the process id and a per-call sequence
    /// number, so tests do not contend on each other's locks.
    fn unique_ns() -> String {
        let pid = std::process::id();
        let n = SEQ.fetch_add(1, Ordering::SeqCst);
        format!("test-{pid}-{n}")
    }

    /// Upper case, spaces and slashes must be normalised in the file name.
    #[test]
    fn job_singleton_path_sanitises_namespace() {
        let lock_path =
            job_singleton_path(JobType::Enrich, "Foo Bar/Baz").expect("path should resolve");
        let name = lock_path
            .file_name()
            .unwrap()
            .to_string_lossy()
            .into_owned();
        for expected in ["enrich", "foo-bar-baz"] {
            assert!(name.contains(expected), "got {name}");
        }
    }

    /// A second acquire for the same job type and namespace must fail fast
    /// while the first lock is still held.
    #[test]
    fn job_singleton_blocks_second_invocation_same_namespace() {
        let ns = unique_ns();
        let holder = acquire_job_singleton(JobType::Enrich, &ns, Some(0))
            .expect("first acquire should succeed");

        let second = acquire_job_singleton(JobType::Enrich, &ns, Some(0));
        let rejected = matches!(second, Err(AppError::JobSingletonLocked { .. }));
        assert!(rejected, "expected JobSingletonLocked, got {second:?}");

        drop(holder);
    }

    /// Locks for different namespaces can be held at the same time.
    #[test]
    fn job_singleton_allows_different_namespaces() {
        let (ns_a, ns_b) = (unique_ns(), unique_ns());

        let lock_a = acquire_job_singleton(JobType::IngestClaudeCode, &ns_a, Some(0))
            .expect("ns_a should acquire");
        let lock_b = acquire_job_singleton(JobType::IngestClaudeCode, &ns_b, Some(0))
            .expect("ns_b should acquire in parallel");

        drop(lock_a);
        drop(lock_b);
    }
}