use anyhow::{Context, Result};
use fs4::fs_std::FileExt;
use std::fs::{File, OpenOptions};
use std::path::Path;
pub struct CacheLock {
_file: File,
}
impl CacheLock {
pub async fn acquire(cache_dir: &Path, source_name: &str) -> Result<Self> {
use tokio::fs;
use tokio::task::spawn_blocking;
let locks_dir = cache_dir.join(".locks");
fs::create_dir_all(&locks_dir).await.with_context(|| {
format!("Failed to create locks directory: {}", locks_dir.display())
})?;
let lock_path = locks_dir.join(format!("{source_name}.lock"));
let file = spawn_blocking(move || -> Result<File> {
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(false)
.open(&lock_path)
.with_context(|| format!("Failed to open lock file: {}", lock_path.display()))?;
file.lock_exclusive()
.with_context(|| format!("Failed to acquire lock: {}", lock_path.display()))?;
Ok(file)
})
.await
.with_context(|| "Failed to spawn blocking task for lock acquisition")??;
Ok(Self {
_file: file,
})
}
}
pub async fn cleanup_stale_locks(cache_dir: &Path, ttl_seconds: u64) -> Result<usize> {
use std::time::{Duration, SystemTime};
use tokio::fs;
let locks_dir = cache_dir.join(".locks");
if !locks_dir.exists() {
return Ok(0);
}
let mut removed_count = 0;
let now = SystemTime::now();
let ttl_duration = Duration::from_secs(ttl_seconds);
let mut entries = fs::read_dir(&locks_dir).await.context("Failed to read locks directory")?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("lock") {
continue;
}
let Ok(metadata) = fs::metadata(&path).await else {
continue; };
let Ok(modified) = metadata.modified() else {
continue; };
if let Ok(age) = now.duration_since(modified)
&& age > ttl_duration
{
if fs::remove_file(&path).await.is_ok() {
removed_count += 1;
}
}
}
Ok(removed_count)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_cache_lock_acquire_and_release() {
let temp_dir = TempDir::new().unwrap();
let cache_dir = temp_dir.path();
let lock = CacheLock::acquire(cache_dir, "test_source").await.unwrap();
let lock_path = cache_dir.join(".locks").join("test_source.lock");
assert!(lock_path.exists());
drop(lock);
assert!(lock_path.exists());
}
#[tokio::test]
async fn test_cache_lock_creates_locks_directory() {
let temp_dir = TempDir::new().unwrap();
let cache_dir = temp_dir.path();
let locks_dir = cache_dir.join(".locks");
assert!(!locks_dir.exists());
let lock = CacheLock::acquire(cache_dir, "test").await.unwrap();
assert!(locks_dir.exists());
assert!(locks_dir.is_dir());
drop(lock);
}
#[tokio::test]
async fn test_cache_lock_exclusive_blocking() {
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Barrier;
let temp_dir = TempDir::new().unwrap();
let cache_dir = Arc::new(temp_dir.path().to_path_buf());
let barrier = Arc::new(Barrier::new(2));
let cache_dir1 = cache_dir.clone();
let barrier1 = barrier.clone();
let handle1 = tokio::spawn(async move {
let _lock = CacheLock::acquire(&cache_dir1, "exclusive_test").await.unwrap();
barrier1.wait().await; tokio::time::sleep(Duration::from_millis(100)).await; });
let cache_dir2 = cache_dir.clone();
let handle2 = tokio::spawn(async move {
barrier.wait().await; let start = Instant::now();
let _lock = CacheLock::acquire(&cache_dir2, "exclusive_test").await.unwrap();
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(50));
});
handle1.await.unwrap();
handle2.await.unwrap();
}
#[tokio::test]
async fn test_cache_lock_different_sources_dont_block() {
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Barrier;
let temp_dir = TempDir::new().unwrap();
let cache_dir = Arc::new(temp_dir.path().to_path_buf());
let barrier = Arc::new(Barrier::new(2));
let cache_dir1 = cache_dir.clone();
let barrier1 = barrier.clone();
let handle1 = tokio::spawn(async move {
let _lock = CacheLock::acquire(&cache_dir1, "source1").await.unwrap();
barrier1.wait().await;
tokio::time::sleep(Duration::from_millis(100)).await;
});
let cache_dir2 = cache_dir.clone();
let handle2 = tokio::spawn(async move {
barrier.wait().await;
let start = Instant::now();
let _lock = CacheLock::acquire(&cache_dir2, "source2").await.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(200),
"Lock acquisition took {:?}, expected < 200ms for non-blocking operation",
elapsed
);
});
handle1.await.unwrap();
handle2.await.unwrap();
}
#[tokio::test]
async fn test_cache_lock_path_with_special_characters() {
let temp_dir = TempDir::new().unwrap();
let cache_dir = temp_dir.path();
let special_names = vec![
"source-with-dash",
"source_with_underscore",
"source.with.dots",
"source@special",
];
for name in special_names {
let lock = CacheLock::acquire(cache_dir, name).await.unwrap();
let expected_path = cache_dir.join(".locks").join(format!("{name}.lock"));
assert!(expected_path.exists());
drop(lock);
}
}
}