use anyhow::{Result, anyhow};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// Default lock-acquisition timeout, in seconds.
///
/// Not referenced elsewhere in this file; presumably the value callers pass as
/// `timeout_secs` to `FileLockManager::acquire` — TODO confirm against callers.
pub const DEFAULT_TIMEOUT_SECS: u64 = 180;
/// Default grace period, in seconds.
///
/// Used by `acquire` as one term of its staleness threshold. Presumably also the
/// `grace_secs` callers pass to `release` — TODO confirm against callers.
pub const DEFAULT_GRACE_SECS: u64 = 30;
/// Sleep between successive claim attempts in `acquire`, in milliseconds.
const POLL_INTERVAL_MS: u64 = 500;
/// Extra slack, in seconds, added to the timeout and grace period when `acquire`
/// decides that another owner's lock is stale.
const STALENESS_MARGIN_SECS: u64 = 60;
/// Manages per-file locks that are persisted as JSON files inside one directory.
///
/// Lock files and per-file read-tracking directories are both named after a hash
/// of the locked file's path and live directly under `locks_dir`.
#[derive(Debug)]
pub struct FileLockManager {
    /// Directory that holds every lock file and read-tracking subdirectory.
    locks_dir: PathBuf,
}
/// On-disk representation of a single file lock, serialized as JSON.
///
/// All timestamps are whole seconds since the Unix epoch, as produced by
/// `unix_now`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LockState {
/// Identifier of the lock holder; compared against the `owner` argument of
/// `acquire` and `release`.
pub owner: String,
/// The path string that was passed to `acquire` when the lock was claimed.
pub file_path: String,
/// When the current claim was written.
pub acquired_at: u64,
/// Set by `release` with a non-zero grace period; once the current time reaches
/// this value another owner may claim the lock. `None` while the lock is held.
pub grace_until: Option<u64>,
/// Updated when the lock is claimed and when it is released with a grace
/// period; `acquire` compares it against its staleness threshold.
pub last_activity: u64,
}
/// Outcome of `FileLockManager::acquire`.
#[derive(Debug)]
pub enum AcquireResult {
/// The lock was claimed and no changed read record was found for the owner.
Acquired,
/// The lock was claimed, but the file's modification time differs from the one
/// recorded by the owner's last `track_read`.
AcquiredStaleRead {
/// Human-readable warning telling the owner to re-read the file.
context: String,
},
/// The lock could not be claimed before the timeout elapsed.
Denied {
/// Human-readable explanation of why the lock was not granted.
reason: String,
},
}
/// Record of the file state an owner last read, serialized as JSON by
/// `track_read` and consulted by `check_stale_read`.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ReadTrack {
/// File modification time in milliseconds since the Unix epoch; `0` is written
/// when the modification time could not be determined.
mtime_ms: u64,
}
impl FileLockManager {
/// Creates a manager rooted at the directory returned by [`locks_dir`],
/// creating that directory (and any missing parents) first.
///
/// # Errors
///
/// Returns an error naming the directory if it cannot be created.
pub fn new() -> Result<Self> {
    let dir = locks_dir();
    if let Err(e) = std::fs::create_dir_all(&dir) {
        return Err(anyhow!(
            "Failed to create locks directory {}: {e}",
            dir.display()
        ));
    }
    Ok(Self { locks_dir: dir })
}
/// Test-only constructor that roots the manager at an explicit directory,
/// creating it (and any missing parents) if necessary.
///
/// # Errors
///
/// Returns the underlying I/O error if the directory cannot be created.
#[cfg(test)]
pub fn with_dir(locks_dir: PathBuf) -> Result<Self> {
    match std::fs::create_dir_all(&locks_dir) {
        Ok(()) => Ok(Self { locks_dir }),
        Err(e) => Err(e.into()),
    }
}
/// Tries to take the lock on `file_path` for `owner`, polling every
/// `POLL_INTERVAL_MS` milliseconds until it succeeds or `timeout_secs` have
/// elapsed. This call blocks the current thread while it waits.
///
/// On every attempt the lock is considered claimable when:
/// * no readable lock file exists,
/// * the existing lock already belongs to `owner`, or
/// * another owner's lock has reached its `grace_until`, or its
///   `last_activity` is older than the staleness threshold
///   (`timeout_secs + DEFAULT_GRACE_SECS + STALENESS_MARGIN_SECS` seconds
///   before this call started).
///
/// At least one claim attempt is always made, so `timeout_secs == 0` means
/// "try once without waiting" rather than "deny whenever a lock file exists".
///
/// # Returns
///
/// * [`AcquireResult::Acquired`] or [`AcquireResult::AcquiredStaleRead`] when
///   the lock was written for `owner`.
/// * [`AcquireResult::Denied`] when the deadline passed without a successful
///   claim.
///
/// NOTE(review): a claim is written by renaming a temp file over the lock file,
/// which replaces whatever is there. Two contenders that both observe a
/// claimable state could therefore both report success — confirm whether
/// callers rely on strict mutual exclusion.
#[must_use]
pub fn acquire(&self, file_path: &str, owner: &str, timeout_secs: u64) -> AcquireResult {
    let lock_path = self.lock_path(file_path);
    let started = unix_now();
    // Saturating arithmetic: a very large `timeout_secs` must not overflow
    // (panic in debug builds, wrap to a tiny deadline in release builds).
    let deadline = started.saturating_add(timeout_secs);
    let staleness_threshold = started.saturating_sub(
        timeout_secs
            .saturating_add(DEFAULT_GRACE_SECS)
            .saturating_add(STALENESS_MARGIN_SECS),
    );
    loop {
        let now = unix_now();
        let current = self.read_lock(&lock_path);
        let claimable = match &current {
            None => true,
            Some(state) if state.owner == owner => true,
            Some(state) => {
                state.grace_until.is_some_and(|g| now >= g)
                    || state.last_activity < staleness_threshold
            }
        };
        if claimable && self.try_claim(&lock_path, file_path, owner, now) {
            return self.check_stale_read(file_path, owner);
        }
        // The deadline is checked only after an attempt, so a zero timeout
        // can still re-acquire an own lock or reclaim an expired one.
        if now >= deadline {
            let reason = match current {
                // Blame "another agent" only when the lock really is foreign.
                Some(state) if state.owner != owner => {
                    let held_secs = now.saturating_sub(state.acquired_at);
                    format!(
                        "File {file_path} is locked by another agent (held for {held_secs}s). \
                         Work on a different file and retry later."
                    )
                }
                _ => format!("File {file_path} lock acquisition timed out. Retry later."),
            };
            return AcquireResult::Denied { reason };
        }
        std::thread::sleep(Duration::from_millis(POLL_INTERVAL_MS));
    }
}
pub fn release(&self, file_path: &str, owner: &str, grace_secs: u64) -> Result<()> {
let lock_path = self.lock_path(file_path);
let Some(state) = self.read_lock(&lock_path) else {
return Ok(());
};
if state.owner != owner {
return Ok(());
}
if grace_secs == 0 {
let _ = std::fs::remove_file(&lock_path);
return Ok(());
}
let now = unix_now();
let updated = LockState {
grace_until: Some(now + grace_secs),
last_activity: now,
..state
};
self.atomic_write(&lock_path, &updated)
}
/// Records the current modification time of `file_path` as the state `owner`
/// last read, so a later `acquire` can detect that the file changed.
///
/// A modification time of `0` is stored when it cannot be determined.
///
/// # Errors
///
/// Returns an error if the read-tracking directory cannot be created, the
/// record cannot be serialized, or the record file cannot be written.
pub fn track_read(&self, file_path: &str, owner: &str) -> Result<()> {
    let track = ReadTrack {
        mtime_ms: file_mtime_ms(file_path).unwrap_or(0),
    };
    let dir = self.reads_dir(file_path);
    std::fs::create_dir_all(&dir)?;
    let payload = match serde_json::to_vec(&track) {
        Ok(bytes) => bytes,
        Err(e) => return Err(anyhow!("Failed to serialize read track: {e}")),
    };
    // One record per owner, named after the hash of the owner id.
    let target = dir.join(format!("{}.json", fnv1a_hash(owner)));
    self.atomic_write_bytes(&target, &payload)
}
/// Returns the lock file path for `file_path`: `<locks_dir>/<hash>.json`, where
/// `<hash>` is `fnv1a_hash(file_path)`.
fn lock_path(&self, file_path: &str) -> PathBuf {
    let file_name = format!("{}.json", fnv1a_hash(file_path));
    self.locks_dir.join(file_name)
}
/// Returns the read-tracking directory for `file_path`:
/// `<locks_dir>/<hash>.reads`, where `<hash>` is `fnv1a_hash(file_path)`.
fn reads_dir(&self, file_path: &str) -> PathBuf {
    let dir_name = format!("{}.reads", fnv1a_hash(file_path));
    self.locks_dir.join(dir_name)
}
/// Loads the lock state stored at `lock_path`.
///
/// Returns `None` when the file cannot be read or does not parse as a
/// `LockState`; callers treat both cases as "no lock".
#[allow(clippy::unused_self, reason = "Method on manager for API consistency")]
fn read_lock(&self, lock_path: &Path) -> Option<LockState> {
    std::fs::read_to_string(lock_path)
        .ok()
        .and_then(|text| serde_json::from_str::<LockState>(&text).ok())
}
/// Writes a fresh lock for `owner` at `lock_path`, stamped with `now` as both
/// acquisition and last-activity time and with no grace period.
///
/// Returns `true` if the lock file was written, `false` on any write error.
/// The write replaces whatever lock file is already there.
fn try_claim(&self, lock_path: &Path, file_path: &str, owner: &str, now: u64) -> bool {
    let fresh = LockState {
        owner: owner.to_owned(),
        file_path: file_path.to_owned(),
        acquired_at: now,
        last_activity: now,
        grace_until: None,
    };
    matches!(self.atomic_write(lock_path, &fresh), Ok(()))
}
/// Decides which success variant `acquire` reports after a claim.
///
/// Looks up the read record `owner` stored for `file_path` via `track_read`.
/// If there is no readable record, the recorded modification time is `0`, or
/// it equals the file's current modification time, the result is
/// [`AcquireResult::Acquired`]. Otherwise the file changed since the owner
/// last read it and [`AcquireResult::AcquiredStaleRead`] is returned.
fn check_stale_read(&self, file_path: &str, owner: &str) -> AcquireResult {
    let track_file = format!("{}.json", fnv1a_hash(owner));
    let track_path = self.reads_dir(file_path).join(track_file);
    let recorded = std::fs::read_to_string(&track_path)
        .ok()
        .and_then(|text| serde_json::from_str::<ReadTrack>(&text).ok());
    let Some(track) = recorded else {
        return AcquireResult::Acquired;
    };
    // A missing file reads as mtime 0 here, which differs from any real record.
    let current_mtime = file_mtime_ms(file_path).unwrap_or(0);
    let unchanged = track.mtime_ms == 0 || current_mtime == track.mtime_ms;
    if unchanged {
        return AcquireResult::Acquired;
    }
    AcquireResult::AcquiredStaleRead {
        context: format!(
            "Warning: {file_path} was modified by another agent since you last \
             read it. Re-read the file before editing to avoid overwriting changes."
        ),
    }
}
/// Serializes `state` as pretty-printed JSON and writes it to `path` through
/// `atomic_write_bytes`.
///
/// # Errors
///
/// Returns an error if serialization or the underlying write fails.
fn atomic_write(&self, path: &Path, state: &LockState) -> Result<()> {
    match serde_json::to_vec_pretty(state) {
        Ok(bytes) => self.atomic_write_bytes(path, &bytes),
        Err(e) => Err(anyhow!("JSON serialize: {e}")),
    }
}
/// Writes `data` to `path` by first writing a sibling temp file and then
/// renaming it over `path`, so readers never observe a half-written file.
///
/// The temp file is `path` with its extension replaced by `tmp.<pid>`.
/// Whichever step fails, the temp file is removed on a best-effort basis so
/// that failed writes do not leave files behind in the locks directory.
///
/// NOTE(review): the temp name is unique per process only; concurrent writers
/// to the same `path` inside one process would share it — confirm whether a
/// manager is ever used from several threads at once.
///
/// # Errors
///
/// Returns an error if the temp file cannot be written or cannot be renamed
/// onto `path`.
#[allow(clippy::unused_self, reason = "Method on manager for API consistency")]
fn atomic_write_bytes(&self, path: &Path, data: &[u8]) -> Result<()> {
    let pid = std::process::id();
    let temp_path = path.with_extension(format!("tmp.{pid}"));
    if let Err(e) = std::fs::write(&temp_path, data) {
        // A failed write may still have created or partially filled the file.
        let _ = std::fs::remove_file(&temp_path);
        return Err(anyhow!(
            "Failed to write temp lock file {}: {e}",
            temp_path.display()
        ));
    }
    std::fs::rename(&temp_path, path).map_err(|e| {
        let _ = std::fs::remove_file(&temp_path);
        anyhow!(
            "Failed to rename {} -> {}: {e}",
            temp_path.display(),
            path.display()
        )
    })
}
}
/// Returns the directory used for lock files: `catenary/locks` under the
/// platform state directory, falling back to the local data directory and
/// finally to `/tmp` when neither is available.
pub fn locks_dir() -> PathBuf {
    let base = match dirs::state_dir().or_else(dirs::data_local_dir) {
        Some(dir) => dir,
        None => PathBuf::from("/tmp"),
    };
    base.join("catenary").join("locks")
}
/// Hashes `input` with 64-bit FNV-1a and returns the digest as 16 lowercase,
/// zero-padded hexadecimal characters.
fn fnv1a_hash(input: &str) -> String {
    const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0100_0000_01b3;
    // FNV-1a: XOR the byte in first, then multiply by the prime (wrapping).
    let digest = input.bytes().fold(FNV_OFFSET, |acc, byte| {
        (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    });
    format!("{digest:016x}")
}
/// Returns the current wall-clock time as whole seconds since the Unix epoch,
/// or `0` if the system clock reports a time before the epoch.
fn unix_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
/// Returns the modification time of `path` in milliseconds since the Unix
/// epoch.
///
/// Yields `None` when the metadata or modification time is unavailable, or
/// when the modification time lies before the epoch. A value too large for
/// `u64` is clamped to `u64::MAX`.
fn file_mtime_ms(path: &str) -> Option<u64> {
    let modified = std::fs::metadata(path)
        .and_then(|meta| meta.modified())
        .ok()?;
    let since_epoch = modified.duration_since(UNIX_EPOCH).ok()?;
    let millis = match u64::try_from(since_epoch.as_millis()) {
        Ok(ms) => ms,
        Err(_) => u64::MAX,
    };
    Some(millis)
}
/// Unit tests for the lock manager. Every test uses its own temporary locks
/// directory, so tests do not interfere with each other. The `/tmp/test.rs`
/// paths are only hashed as lock keys and are never opened.
#[cfg(test)]
#[allow(clippy::unwrap_used, reason = "Tests use unwrap for brevity")]
mod tests {
    use super::*;

    /// Builds a manager whose locks live inside a fresh temporary directory.
    /// The `TempDir` is returned so the directory outlives the manager.
    fn setup() -> (FileLockManager, tempfile::TempDir) {
        // Plain `unwrap` (not `.ok().unwrap()`) keeps the error in the panic message.
        let dir = tempfile::tempdir().unwrap();
        let mgr = FileLockManager::with_dir(dir.path().join("locks")).unwrap();
        (mgr, dir)
    }

    /// A lock nobody holds is granted immediately.
    #[test]
    fn acquire_uncontested() {
        let (mgr, _dir) = setup();
        let result = mgr.acquire("/tmp/test.rs", "agent-a", 5);
        assert!(matches!(result, AcquireResult::Acquired));
    }

    /// The releasing owner can take the lock back during its own grace period.
    #[test]
    fn reacquire_same_owner() {
        let (mgr, _dir) = setup();
        let result = mgr.acquire("/tmp/test.rs", "agent-a", 5);
        assert!(matches!(result, AcquireResult::Acquired));
        mgr.release("/tmp/test.rs", "agent-a", 30).unwrap();
        let result = mgr.acquire("/tmp/test.rs", "agent-a", 5);
        assert!(matches!(result, AcquireResult::Acquired));
    }

    /// A held lock denies a different owner once the timeout elapses.
    #[test]
    fn blocked_by_different_owner() {
        let (mgr, _dir) = setup();
        let result = mgr.acquire("/tmp/test.rs", "agent-a", 5);
        assert!(matches!(result, AcquireResult::Acquired));
        let result = mgr.acquire("/tmp/test.rs", "agent-b", 1);
        assert!(matches!(result, AcquireResult::Denied { .. }));
    }

    /// Releasing with a zero grace period makes the lock available to
    /// another owner right away.
    #[test]
    fn grace_period_allows_reclaim() {
        let (mgr, _dir) = setup();
        let result = mgr.acquire("/tmp/test.rs", "agent-a", 5);
        assert!(matches!(result, AcquireResult::Acquired));
        mgr.release("/tmp/test.rs", "agent-a", 0).unwrap();
        let result = mgr.acquire("/tmp/test.rs", "agent-b", 1);
        assert!(matches!(result, AcquireResult::Acquired));
    }

    /// During the grace period another owner has to wait, and gets the lock
    /// once the grace period is over.
    #[test]
    fn grace_period_blocks_until_expired() {
        let (mgr, _dir) = setup();
        let result = mgr.acquire("/tmp/test.rs", "agent-a", 5);
        assert!(matches!(result, AcquireResult::Acquired));
        // Timestamps are whole seconds, so a 1 s grace can already be over when
        // release and acquire straddle a second boundary. 2 s guarantees a wait.
        mgr.release("/tmp/test.rs", "agent-a", 2).unwrap();
        let start = std::time::Instant::now();
        let result = mgr.acquire("/tmp/test.rs", "agent-b", 5);
        let elapsed = start.elapsed();
        assert!(matches!(result, AcquireResult::Acquired));
        assert!(
            elapsed >= Duration::from_millis(400),
            "Expected wait, got {elapsed:?}"
        );
    }

    /// A release by someone who does not hold the lock leaves it in place.
    #[test]
    fn release_not_owner_is_noop() {
        let (mgr, _dir) = setup();
        let result = mgr.acquire("/tmp/test.rs", "agent-a", 5);
        assert!(matches!(result, AcquireResult::Acquired));
        mgr.release("/tmp/test.rs", "agent-b", 0).unwrap();
        let result = mgr.acquire("/tmp/test.rs", "agent-b", 1);
        assert!(matches!(result, AcquireResult::Denied { .. }));
    }

    /// Releasing a lock that was never taken succeeds.
    #[test]
    fn release_nonexistent_is_ok() {
        let (mgr, _dir) = setup();
        mgr.release("/tmp/nonexistent.rs", "agent-a", 0).unwrap();
    }

    /// A file modified after `track_read` is reported as a stale read.
    #[test]
    fn read_tracking_detects_change() {
        let (mgr, dir) = setup();
        let test_file = dir.path().join("tracked.rs");
        std::fs::write(&test_file, "original content").unwrap();
        let file_str = test_file.to_string_lossy().to_string();
        mgr.track_read(&file_str, "agent-a").unwrap();
        // Give the filesystem time to produce a different mtime.
        std::thread::sleep(Duration::from_millis(50));
        std::fs::write(&test_file, "modified content").unwrap();
        let result = mgr.acquire(&file_str, "agent-a", 5);
        assert!(
            matches!(result, AcquireResult::AcquiredStaleRead { .. }),
            "Expected AcquiredStaleRead, got {result:?}"
        );
    }

    /// Re-tracking after an edit clears the stale-read condition.
    #[test]
    fn self_edit_not_stale() {
        let (mgr, dir) = setup();
        let test_file = dir.path().join("selfed.rs");
        std::fs::write(&test_file, "original content").unwrap();
        let file_str = test_file.to_string_lossy().to_string();
        mgr.track_read(&file_str, "agent-a").unwrap();
        std::thread::sleep(Duration::from_millis(50));
        std::fs::write(&test_file, "edited content").unwrap();
        mgr.track_read(&file_str, "agent-a").unwrap();
        let result = mgr.acquire(&file_str, "agent-a", 5);
        assert!(
            matches!(result, AcquireResult::Acquired),
            "Expected Acquired after self-edit + track_read, got {result:?}"
        );
    }

    /// An unmodified tracked file yields a plain `Acquired`.
    #[test]
    fn read_tracking_no_change() {
        let (mgr, dir) = setup();
        let test_file = dir.path().join("stable.rs");
        std::fs::write(&test_file, "content").unwrap();
        let file_str = test_file.to_string_lossy().to_string();
        mgr.track_read(&file_str, "agent-a").unwrap();
        let result = mgr.acquire(&file_str, "agent-a", 5);
        assert!(matches!(result, AcquireResult::Acquired));
    }

    /// The path hash is stable, 16 hex characters long, and differs for
    /// different inputs.
    #[test]
    fn fnv1a_hash_deterministic() {
        let h1 = fnv1a_hash("/home/user/project/src/main.rs");
        let h2 = fnv1a_hash("/home/user/project/src/main.rs");
        assert_eq!(h1, h2);
        assert_eq!(h1.len(), 16);
        let h3 = fnv1a_hash("/home/user/project/src/lib.rs");
        assert_ne!(h1, h3);
    }
}