use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
/// Metadata describing who holds a lock; serialized as JSON into the lock file
/// by `acquire` and parsed back by `read_lock`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LockInfo {
/// Identifier of the unit that owns the lock (matched by `release_all_for_unit`).
pub unit_id: String,
/// Process ID recorded for the owner; a lock whose PID is no longer alive is
/// treated as stale and removed.
pub pid: u32,
/// The path being locked, exactly as passed to `acquire`. Its SHA-256 hash
/// determines the lock file's name.
pub file_path: String,
/// Acquisition time, taken from `chrono::Utc::now().timestamp()`.
pub locked_at: i64,
}
/// A lock found on disk by `list_locks` whose recorded process is still alive.
#[derive(Debug)]
pub struct ActiveLock {
/// The parsed contents of the lock file.
pub info: LockInfo,
/// Location of the lock file itself (not the file being locked).
pub lock_path: PathBuf,
}
pub fn lock_dir(mana_dir: &Path) -> Result<PathBuf> {
let dir = mana_dir.join("locks");
fs::create_dir_all(&dir)
.with_context(|| format!("Failed to create locks directory: {}", dir.display()))?;
Ok(dir)
}
/// Derives the lock file name for `file_path`: the lowercase hex SHA-256 of the
/// path's bytes followed by `.lock`.
fn lock_filename(file_path: &str) -> String {
    let digest = Sha256::digest(file_path.as_bytes());
    format!("{:x}.lock", digest)
}
/// Resolves the full path of the lock file guarding `file_path`.
///
/// Goes through `lock_dir`, so the locks directory is created as a side effect.
fn lock_file_path(mana_dir: &Path, file_path: &str) -> Result<PathBuf> {
    lock_dir(mana_dir).map(|locks| locks.join(lock_filename(file_path)))
}
pub fn acquire(mana_dir: &Path, unit_id: &str, pid: u32, file_path: &str) -> Result<bool> {
let lock_path = lock_file_path(mana_dir, file_path)?;
let info = LockInfo {
unit_id: unit_id.to_string(),
pid,
file_path: file_path.to_string(),
locked_at: chrono::Utc::now().timestamp(),
};
let content = serde_json::to_string_pretty(&info).context("Failed to serialize lock info")?;
for _ in 0..2 {
match fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&lock_path)
{
Ok(mut file) => {
file.write_all(content.as_bytes()).with_context(|| {
format!("Failed to write lock file: {}", lock_path.display())
})?;
return Ok(true);
}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
match read_lock(&lock_path) {
Some(existing) if existing.unit_id == unit_id && existing.pid == pid => {
return Ok(true);
}
Some(existing) if is_process_alive(existing.pid) => {
return Ok(false);
}
_ => {
let _ = fs::remove_file(&lock_path);
}
}
}
Err(e) => {
return Err(e).with_context(|| {
format!("Failed to create lock file: {}", lock_path.display())
});
}
}
}
Ok(false)
}
/// Releases every live lock owned by `unit_id`.
///
/// Locks are discovered through `list_locks`, so stale or unreadable lock
/// files in the directory are purged as a side effect. Deletion is best
/// effort: a lock file that cannot be removed does not abort the sweep, but it
/// is not counted either.
///
/// Returns the number of lock files that were actually removed.
///
/// # Errors
/// Propagates any error from `list_locks`.
pub fn release_all_for_unit(mana_dir: &Path, unit_id: &str) -> Result<u32> {
    let mut released = 0;
    for lock in list_locks(mana_dir)? {
        if lock.info.unit_id != unit_id {
            continue;
        }
        // Only count locks that are really gone; a failed removal means the
        // lock is still on disk and must not be reported as released.
        if fs::remove_file(&lock.lock_path).is_ok() {
            released += 1;
        }
    }
    Ok(released)
}
/// Removes every live lock in `mana_dir`, regardless of owner.
///
/// Locks are discovered through `list_locks`, which already deletes stale or
/// unreadable lock files; those are not included in the count. Deletion is
/// best effort: a lock file that cannot be removed is skipped and not counted.
///
/// Returns the number of lock files that were actually removed.
///
/// # Errors
/// Propagates any error from `list_locks`.
pub fn clear_all(mana_dir: &Path) -> Result<u32> {
    let mut cleared = 0;
    for lock in list_locks(mana_dir)? {
        // A failed removal leaves the lock in place, so it must not be
        // reported as cleared.
        if fs::remove_file(&lock.lock_path).is_ok() {
            cleared += 1;
        }
    }
    Ok(cleared)
}
pub fn list_locks(mana_dir: &Path) -> Result<Vec<ActiveLock>> {
let dir = lock_dir(mana_dir)?;
let mut locks = Vec::new();
let entries = match fs::read_dir(&dir) {
Ok(entries) => entries,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(locks),
Err(e) => return Err(e).context("Failed to read locks directory"),
};
for entry in entries {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) != Some("lock") {
continue;
}
match read_lock(&path) {
Some(info) if is_process_alive(info.pid) => {
locks.push(ActiveLock {
info,
lock_path: path,
});
}
_ => {
let _ = fs::remove_file(&path);
}
}
}
Ok(locks)
}
/// Reports who currently holds the lock on `file_path`, if anyone.
///
/// Returns `Ok(None)` when no lock file exists. A lock file that cannot be
/// parsed, or whose recorded process is dead, is deleted (best effort) and
/// also reported as `Ok(None)`.
///
/// # Errors
/// Fails only if the locks directory cannot be created.
pub fn check_lock(mana_dir: &Path, file_path: &str) -> Result<Option<LockInfo>> {
    let lock_path = lock_file_path(mana_dir, file_path)?;
    if !lock_path.exists() {
        return Ok(None);
    }

    let holder = read_lock(&lock_path).filter(|info| is_process_alive(info.pid));
    if holder.is_none() {
        // Either corrupt or left behind by a dead process.
        let _ = fs::remove_file(&lock_path);
    }
    Ok(holder)
}
/// Loads and parses the lock file at `path`.
///
/// Returns `None` if the file cannot be read as text or is not valid
/// `LockInfo` JSON; callers treat that as a corrupt lock.
fn read_lock(path: &Path) -> Option<LockInfo> {
    fs::read_to_string(path)
        .ok()
        .and_then(|raw| serde_json::from_str(&raw).ok())
}
/// Reports whether a process with the given `pid` currently exists.
///
/// Probes with `kill(pid, 0)`, which performs the existence and permission
/// checks without delivering a signal. `EPERM` counts as alive: the process
/// exists but we are not allowed to signal it.
///
/// PID 0 and values that do not fit in a positive `i32` are reported as not
/// alive. Passed to `kill` they would address a process group (0 or negative)
/// or every process (-1) rather than a single process, so a corrupt lock file
/// carrying such a PID would otherwise look permanently held.
fn is_process_alive(pid: u32) -> bool {
    let pid = match i32::try_from(pid) {
        Ok(p) if p > 0 => p,
        _ => return false,
    };
    // SAFETY: `kill` takes plain integers and, with signal 0, only checks
    // whether the target exists; it does not touch memory or send a signal.
    let ret = unsafe { libc::kill(pid, 0) };
    if ret == 0 {
        return true;
    }
    std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// PID written into fabricated lock files; the tests assume no running
    /// process has it.
    const DEAD_PID: u32 = 999_999_999;

    /// Builds a scratch directory containing `.mana`. The `TempDir` guard is
    /// returned so each test keeps it in scope for its whole duration.
    fn temp_mana_dir() -> (tempfile::TempDir, PathBuf) {
        let guard = tempfile::tempdir().unwrap();
        let mana_dir = guard.path().join(".mana");
        fs::create_dir_all(&mana_dir).unwrap();
        (guard, mana_dir)
    }

    /// Writes a lock file for `file_path` owned by `unit_id` and `DEAD_PID`,
    /// bypassing `acquire`, and returns the lock file's path.
    fn plant_stale_lock(mana_dir: &Path, unit_id: &str, file_path: &str) -> PathBuf {
        let lock_path = lock_file_path(mana_dir, file_path).unwrap();
        let info = LockInfo {
            unit_id: unit_id.to_string(),
            pid: DEAD_PID,
            file_path: file_path.to_string(),
            locked_at: 0,
        };
        fs::write(&lock_path, serde_json::to_string(&info).unwrap()).unwrap();
        lock_path
    }

    #[test]
    fn acquire_and_release_via_unit() {
        let (_guard, mana_dir) = temp_mana_dir();
        let target = "/tmp/test.rs";

        assert!(acquire(&mana_dir, "1.1", std::process::id(), target).unwrap());

        let holder = check_lock(&mana_dir, target).unwrap();
        assert!(holder.is_some());
        assert_eq!(holder.unwrap().unit_id, "1.1");

        release_all_for_unit(&mana_dir, "1.1").unwrap();
        assert!(check_lock(&mana_dir, target).unwrap().is_none());
    }

    #[test]
    fn release_all_for_unit_works() {
        let (_guard, mana_dir) = temp_mana_dir();
        let me = std::process::id();
        for (unit, path) in [("2.1", "/tmp/a.rs"), ("2.1", "/tmp/b.rs"), ("2.2", "/tmp/c.rs")] {
            acquire(&mana_dir, unit, me, path).unwrap();
        }

        assert_eq!(release_all_for_unit(&mana_dir, "2.1").unwrap(), 2);

        // The other unit's lock survives; the released one is gone.
        assert!(check_lock(&mana_dir, "/tmp/c.rs").unwrap().is_some());
        assert!(check_lock(&mana_dir, "/tmp/a.rs").unwrap().is_none());
    }

    #[test]
    fn list_locks_returns_all() {
        let (_guard, mana_dir) = temp_mana_dir();
        let me = std::process::id();
        acquire(&mana_dir, "3.1", me, "/tmp/x.rs").unwrap();
        acquire(&mana_dir, "3.2", me, "/tmp/y.rs").unwrap();

        assert_eq!(list_locks(&mana_dir).unwrap().len(), 2);
    }

    #[test]
    fn clear_all_removes_everything() {
        let (_guard, mana_dir) = temp_mana_dir();
        let me = std::process::id();
        acquire(&mana_dir, "4.1", me, "/tmp/p.rs").unwrap();
        acquire(&mana_dir, "4.2", me, "/tmp/q.rs").unwrap();

        assert_eq!(clear_all(&mana_dir).unwrap(), 2);
        assert!(list_locks(&mana_dir).unwrap().is_empty());
    }

    #[test]
    fn stale_lock_is_cleaned() {
        let (_guard, mana_dir) = temp_mana_dir();
        let lock_path = plant_stale_lock(&mana_dir, "5.1", "/tmp/stale.rs");

        assert!(check_lock(&mana_dir, "/tmp/stale.rs").unwrap().is_none());
        assert!(!lock_path.exists());
    }

    #[test]
    fn acquire_cleans_stale_and_succeeds() {
        let (_guard, mana_dir) = temp_mana_dir();
        plant_stale_lock(&mana_dir, "6.1", "/tmp/stale2.rs");

        let acquired = acquire(&mana_dir, "6.2", std::process::id(), "/tmp/stale2.rs").unwrap();
        assert!(acquired);
    }

    #[test]
    fn same_owner_reacquire_is_idempotent() {
        let (_guard, mana_dir) = temp_mana_dir();
        let me = std::process::id();
        let target = "/tmp/idem.rs";

        assert!(acquire(&mana_dir, "7.1", me, target).unwrap());
        assert!(acquire(&mana_dir, "7.1", me, target).unwrap());

        let holder = check_lock(&mana_dir, target).unwrap();
        assert!(holder.is_some());
        assert_eq!(holder.unwrap().unit_id, "7.1");
    }

    #[test]
    fn different_owner_blocked_by_live_lock() {
        let (_guard, mana_dir) = temp_mana_dir();
        let me = std::process::id();
        let target = "/tmp/contested.rs";

        assert!(acquire(&mana_dir, "8.1", me, target).unwrap());
        // Liveness is judged on the existing holder's PID (ours), so the
        // challenger's PID only needs to differ.
        assert!(!acquire(&mana_dir, "8.2", me + 1, target).unwrap());
    }

    #[test]
    fn list_locks_filters_stale() {
        let (_guard, mana_dir) = temp_mana_dir();
        acquire(&mana_dir, "9.1", std::process::id(), "/tmp/live.rs").unwrap();
        let stale_path = plant_stale_lock(&mana_dir, "9.2", "/tmp/ghost.rs");

        let locks = list_locks(&mana_dir).unwrap();
        assert_eq!(locks.len(), 1);
        assert_eq!(locks[0].info.unit_id, "9.1");
        assert!(!stale_path.exists());
    }
}