use anyhow::{Context, Result};
use fs4::fs_std::FileExt;
use std::fs::{File, OpenOptions};
use std::path::Path;
pub struct CacheLock {
_file: File,
}
impl CacheLock {
pub async fn acquire(cache_dir: &Path, source_name: &str) -> Result<Self> {
let locks_dir = cache_dir.join(".locks");
tokio::fs::create_dir_all(&locks_dir).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotADirectory {
anyhow::anyhow!(
"Cannot create directory: cache path is not a directory ({})",
cache_dir.display()
)
} else if e.kind() == std::io::ErrorKind::PermissionDenied {
anyhow::anyhow!(
"Permission denied: cannot create locks directory at {}",
locks_dir.display()
)
} else if e.raw_os_error() == Some(28) {
anyhow::anyhow!("No space left on device to create locks directory")
} else {
anyhow::anyhow!("Failed to create directory {}: {}", locks_dir.display(), e)
}
})?;
let lock_path = locks_dir.join(format!("{source_name}.lock"));
let lock_path_clone = lock_path.clone();
let source_name = source_name.to_string();
let file = tokio::task::spawn_blocking(move || -> Result<File> {
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&lock_path_clone)
.with_context(|| {
format!("Failed to open lock file: {}", lock_path_clone.display())
})?;
file.lock_exclusive()
.with_context(|| format!("Failed to acquire lock for: {source_name}"))?;
Ok(file)
})
.await
.context("Failed to spawn blocking task for lock acquisition")??;
Ok(Self {
_file: file,
})
}
}
pub async fn cleanup_stale_locks(cache_dir: &Path, ttl_seconds: u64) -> Result<usize> {
use std::time::{Duration, SystemTime};
use tokio::fs;
let locks_dir = cache_dir.join(".locks");
if !locks_dir.exists() {
return Ok(0);
}
let mut removed_count = 0;
let now = SystemTime::now();
let ttl_duration = Duration::from_secs(ttl_seconds);
let mut entries = fs::read_dir(&locks_dir).await.context("Failed to read locks directory")?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("lock") {
continue;
}
let metadata = match fs::metadata(&path).await {
Ok(m) => m,
Err(_) => continue, };
let modified = match metadata.modified() {
Ok(t) => t,
Err(_) => continue, };
if let Ok(age) = now.duration_since(modified)
&& age > ttl_duration
{
if fs::remove_file(&path).await.is_ok() {
removed_count += 1;
}
}
}
Ok(removed_count)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_cache_lock_acquire_and_release() {
let temp_dir = TempDir::new().unwrap();
let cache_dir = temp_dir.path();
let lock = CacheLock::acquire(cache_dir, "test_source").await.unwrap();
let lock_path = cache_dir.join(".locks").join("test_source.lock");
assert!(lock_path.exists());
drop(lock);
assert!(lock_path.exists());
}
#[tokio::test]
async fn test_cache_lock_creates_locks_directory() {
let temp_dir = TempDir::new().unwrap();
let cache_dir = temp_dir.path();
let locks_dir = cache_dir.join(".locks");
assert!(!locks_dir.exists());
let _lock = CacheLock::acquire(cache_dir, "test").await.unwrap();
assert!(locks_dir.exists());
assert!(locks_dir.is_dir());
}
#[tokio::test]
async fn test_cache_lock_exclusive_blocking() {
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Barrier;
let temp_dir = TempDir::new().unwrap();
let cache_dir = Arc::new(temp_dir.path().to_path_buf());
let barrier = Arc::new(Barrier::new(2));
let cache_dir1 = cache_dir.clone();
let barrier1 = barrier.clone();
let handle1 = tokio::spawn(async move {
let _lock = CacheLock::acquire(&cache_dir1, "exclusive_test").await.unwrap();
barrier1.wait().await; tokio::time::sleep(Duration::from_millis(100)).await; });
let cache_dir2 = cache_dir.clone();
let handle2 = tokio::spawn(async move {
barrier.wait().await; let start = Instant::now();
let _lock = CacheLock::acquire(&cache_dir2, "exclusive_test").await.unwrap();
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(50));
});
handle1.await.unwrap();
handle2.await.unwrap();
}
#[tokio::test]
async fn test_cache_lock_different_sources_dont_block() {
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Barrier;
let temp_dir = TempDir::new().unwrap();
let cache_dir = Arc::new(temp_dir.path().to_path_buf());
let barrier = Arc::new(Barrier::new(2));
let cache_dir1 = cache_dir.clone();
let barrier1 = barrier.clone();
let handle1 = tokio::spawn(async move {
let _lock = CacheLock::acquire(&cache_dir1, "source1").await.unwrap();
barrier1.wait().await;
tokio::time::sleep(Duration::from_millis(100)).await;
});
let cache_dir2 = cache_dir.clone();
let handle2 = tokio::spawn(async move {
barrier.wait().await;
let start = Instant::now();
let _lock = CacheLock::acquire(&cache_dir2, "source2").await.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(200),
"Lock acquisition took {:?}, expected < 200ms for non-blocking operation",
elapsed
);
});
handle1.await.unwrap();
handle2.await.unwrap();
}
#[tokio::test]
async fn test_cache_lock_path_with_special_characters() {
let temp_dir = TempDir::new().unwrap();
let cache_dir = temp_dir.path();
let special_names = vec![
"source-with-dash",
"source_with_underscore",
"source.with.dots",
"source@special",
];
for name in special_names {
let lock = CacheLock::acquire(cache_dir, name).await.unwrap();
let expected_path = cache_dir.join(".locks").join(format!("{name}.lock"));
assert!(expected_path.exists());
drop(lock);
}
}
}