use std::fs::{File, OpenOptions};
use std::io::Read;
use std::path::Path;
use crate::error::{EngramError, Result};
#[derive(Debug, Clone)]
pub struct LockInfo {
pub pid: i32,
pub command: String,
pub created_at: String,
}
pub struct StorageLock {
#[allow(dead_code)]
file: Option<File>,
}
impl StorageLock {
pub fn acquire(db_path: &str, command: &str) -> Result<Self> {
if db_path == ":memory:" {
return Ok(Self { file: None });
}
Self::acquire_path(&format!("{db_path}.lock"), command)
}
pub fn read_info(db_path: &str) -> Option<LockInfo> {
if db_path == ":memory:" {
return None;
}
Self::read_info_file(&format!("{db_path}.lock"))
}
#[cfg(unix)]
fn acquire_path(lock_path: &str, command: &str) -> Result<Self> {
use std::io::Write;
use std::os::unix::io::AsRawFd;
if let Some(parent) = Path::new(lock_path).parent() {
if !parent.as_os_str().is_empty() {
let _ = std::fs::create_dir_all(parent);
}
}
let mut file = OpenOptions::new()
.create(true)
.read(true)
.write(true)
.truncate(false)
.open(lock_path)
.map_err(|e| {
EngramError::Storage(format!("cannot open lock file '{lock_path}': {e}"))
})?;
let rc = unsafe { libc::flock(file.as_raw_fd(), libc::LOCK_EX | libc::LOCK_NB) };
if rc != 0 {
let holder = Self::read_info_file(lock_path)
.map(|i| format!("pid {} ({}), since {}", i.pid, i.command, i.created_at))
.unwrap_or_else(|| "another process".to_string());
return Err(EngramError::Storage(format!(
"storage path is already locked by {holder}; only one mutating worker may \
own '{lock_path}'. Stop the other process or use a different database path."
)));
}
let pid = std::process::id() as i32;
let created_at = chrono::Utc::now().to_rfc3339();
let _ = file.set_len(0);
let _ = write!(
file,
"pid={pid}\ncommand={command}\ncreated_at={created_at}\n"
);
let _ = file.flush();
Ok(Self { file: Some(file) })
}
#[cfg(not(unix))]
fn acquire_path(_lock_path: &str, _command: &str) -> Result<Self> {
Ok(Self { file: None })
}
fn read_info_file(lock_path: &str) -> Option<LockInfo> {
let mut contents = String::new();
File::open(lock_path)
.ok()?
.read_to_string(&mut contents)
.ok()?;
let field = |key: &str| {
contents
.lines()
.find_map(|line| line.strip_prefix(key).map(str::to_string))
};
Some(LockInfo {
pid: field("pid=").and_then(|v| v.parse().ok()).unwrap_or(0),
command: field("command=").unwrap_or_default(),
created_at: field("created_at=").unwrap_or_default(),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_storage_lock_in_memory_is_noop() {
let _a = StorageLock::acquire(":memory:", "a").unwrap();
let _b = StorageLock::acquire(":memory:", "b").unwrap();
assert!(StorageLock::read_info(":memory:").is_none());
}
#[cfg(unix)]
#[test]
fn test_storage_lock_contention_and_release() {
let db = std::env::temp_dir()
.join("engram-lock-test-contention.db")
.to_string_lossy()
.into_owned();
let lock_file = format!("{db}.lock");
let _ = std::fs::remove_file(&lock_file);
let lock1 = StorageLock::acquire(&db, "worker-1").expect("first acquire succeeds");
let info = StorageLock::read_info(&db).expect("holder metadata is recorded");
assert_eq!(info.pid, std::process::id() as i32);
assert_eq!(info.command, "worker-1");
assert!(
StorageLock::acquire(&db, "worker-2").is_err(),
"second acquire must be blocked while the first is held"
);
drop(lock1);
let lock3 = StorageLock::acquire(&db, "worker-3").expect("re-acquire after release");
assert_eq!(StorageLock::read_info(&db).unwrap().command, "worker-3");
drop(lock3);
let _ = std::fs::remove_file(&lock_file);
}
}