use crate::io::error::InboxError;
use fs2::FileExt;
use std::fs::File;
use std::path::Path;
use std::time::Duration;
pub struct FileLock {
file: File,
}
impl Drop for FileLock {
fn drop(&mut self) {
let _ = FileExt::unlock(&self.file);
}
}
pub fn acquire_lock(path: &Path, max_retries: u32) -> Result<FileLock, InboxError> {
use std::fs::OpenOptions;
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)
.map_err(|e| InboxError::Io {
path: path.to_path_buf(),
source: e,
})?;
for attempt in 0..=max_retries {
match file.try_lock_exclusive() {
Ok(()) => {
return Ok(FileLock { file });
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock
|| e.kind() == std::io::ErrorKind::PermissionDenied
|| e.raw_os_error() == Some(33) =>
{
if attempt < max_retries {
let wait_ms = 50u64 * (1 << attempt);
std::thread::sleep(Duration::from_millis(wait_ms));
}
}
Err(e) => {
return Err(InboxError::Io {
path: path.to_path_buf(),
source: e,
});
}
}
}
Err(InboxError::LockTimeout {
path: path.to_path_buf(),
retries: max_retries,
})
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::{Arc, Barrier};
use std::thread;
use tempfile::TempDir;
#[test]
fn test_acquire_lock_success() {
let temp_dir = TempDir::new().unwrap();
let lock_path = temp_dir.path().join("test.lock");
let lock = acquire_lock(&lock_path, 5).unwrap();
assert!(lock_path.exists());
drop(lock);
}
#[test]
fn test_acquire_lock_sequential() {
let temp_dir = TempDir::new().unwrap();
let lock_path = temp_dir.path().join("test.lock");
{
let _lock1 = acquire_lock(&lock_path, 5).unwrap();
}
let _lock2 = acquire_lock(&lock_path, 5).unwrap();
}
#[test]
fn test_acquire_lock_concurrent() {
let temp_dir = TempDir::new().unwrap();
let lock_path = Arc::new(temp_dir.path().join("test.lock"));
let barrier = Arc::new(Barrier::new(2));
let lock_path_clone = Arc::clone(&lock_path);
let barrier_clone = Arc::clone(&barrier);
let handle1 = thread::spawn(move || {
let _lock = acquire_lock(&lock_path_clone, 5).unwrap();
barrier_clone.wait();
thread::sleep(Duration::from_millis(100));
});
let handle2 = thread::spawn(move || {
barrier.wait();
let result = acquire_lock(&lock_path, 5);
result.is_ok()
});
handle1.join().unwrap();
let success = handle2.join().unwrap();
assert!(success);
}
#[test]
fn test_acquire_lock_timeout() {
let temp_dir = TempDir::new().unwrap();
let lock_path = Arc::new(temp_dir.path().join("test.lock"));
let lock_path_clone = Arc::clone(&lock_path);
let handle1 = thread::spawn(move || {
let _lock = acquire_lock(&lock_path_clone, 5).unwrap();
thread::sleep(Duration::from_secs(2)); });
thread::sleep(Duration::from_millis(50));
let result = acquire_lock(&lock_path, 3); assert!(matches!(result, Err(InboxError::LockTimeout { .. })));
handle1.join().unwrap();
}
#[test]
fn test_lock_auto_release() {
let temp_dir = TempDir::new().unwrap();
let lock_path = temp_dir.path().join("test.lock");
{
let _lock = acquire_lock(&lock_path, 5).unwrap();
}
let _lock2 = acquire_lock(&lock_path, 5).unwrap();
}
}