use crate::constants::{MAX_BACKOFF_DELAY_MS, STARTING_BACKOFF_DELAY_MS, default_lock_timeout};
use anyhow::{Context, Result};
use fs4::fs_std::FileExt;
use std::fs::{File, OpenOptions};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio_retry::strategy::ExponentialBackoff;
use tracing::debug;
#[derive(Debug)]
pub struct CacheLock {
_file: Arc<File>,
lock_name: String,
}
impl Drop for CacheLock {
fn drop(&mut self) {
debug!(lock_name = %self.lock_name, "File lock released");
}
}
impl CacheLock {
pub async fn acquire(cache_dir: &Path, source_name: &str) -> Result<Self> {
Self::acquire_with_timeout(cache_dir, source_name, default_lock_timeout()).await
}
pub async fn acquire_with_timeout(
cache_dir: &Path,
source_name: &str,
timeout: std::time::Duration,
) -> Result<Self> {
use tokio::fs;
let lock_name = format!("file:{}", source_name);
debug!(lock_name = %lock_name, "Waiting for file lock");
let locks_dir = cache_dir.join(".locks");
fs::create_dir_all(&locks_dir).await.with_context(|| {
format!("Failed to create locks directory: {}", locks_dir.display())
})?;
let lock_path = locks_dir.join(format!("{source_name}.lock"));
let lock_path_clone = lock_path.clone();
let file = tokio::task::spawn_blocking(move || {
OpenOptions::new().create(true).write(true).truncate(false).open(&lock_path_clone)
})
.await
.with_context(|| "spawn_blocking panicked")?
.with_context(|| format!("Failed to open lock file: {}", lock_path.display()))?;
let file = Arc::new(file);
let start = std::time::Instant::now();
let backoff = ExponentialBackoff::from_millis(STARTING_BACKOFF_DELAY_MS)
.max_delay(Duration::from_millis(MAX_BACKOFF_DELAY_MS));
let mut rng_state: u64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(12345);
for delay in backoff {
rng_state ^= rng_state << 13;
rng_state ^= rng_state >> 7;
rng_state ^= rng_state << 17;
let jitter_factor = 1.0 + (rng_state % 25) as f64 / 100.0;
let jittered_delay =
Duration::from_millis((delay.as_millis() as f64 * jitter_factor) as u64);
let file_clone = Arc::clone(&file);
let lock_result = tokio::task::spawn_blocking(move || file_clone.try_lock_exclusive())
.await
.with_context(|| "spawn_blocking panicked")?;
match lock_result {
Ok(true) => {
debug!(
lock_name = %lock_name,
wait_ms = start.elapsed().as_millis(),
"File lock acquired"
);
return Ok(Self {
_file: file,
lock_name,
});
}
Ok(false) | Err(_) => {
let remaining = timeout.saturating_sub(start.elapsed());
if remaining.is_zero() {
return Err(anyhow::anyhow!(
"Timeout acquiring lock for '{}' after {:?}",
source_name,
timeout
));
}
tokio::time::sleep(jittered_delay.min(remaining)).await;
}
}
}
Err(anyhow::anyhow!("Timeout acquiring lock for '{}' after {:?}", source_name, timeout))
}
pub async fn acquire_shared(cache_dir: &Path, source_name: &str) -> Result<Self> {
Self::acquire_shared_with_timeout(cache_dir, source_name, default_lock_timeout()).await
}
pub async fn acquire_shared_with_timeout(
cache_dir: &Path,
source_name: &str,
timeout: std::time::Duration,
) -> Result<Self> {
use tokio::fs;
let lock_name = format!("file-shared:{}", source_name);
debug!(lock_name = %lock_name, "Waiting for shared file lock");
let locks_dir = cache_dir.join(".locks");
fs::create_dir_all(&locks_dir).await.with_context(|| {
format!("Failed to create locks directory: {}", locks_dir.display())
})?;
let lock_path = locks_dir.join(format!("{source_name}.lock"));
let lock_path_clone = lock_path.clone();
let file = tokio::task::spawn_blocking(move || {
OpenOptions::new().create(true).write(true).truncate(false).open(&lock_path_clone)
})
.await
.with_context(|| "spawn_blocking panicked")?
.with_context(|| format!("Failed to open lock file: {}", lock_path.display()))?;
let file = Arc::new(file);
let start = std::time::Instant::now();
let backoff = ExponentialBackoff::from_millis(STARTING_BACKOFF_DELAY_MS)
.max_delay(Duration::from_millis(MAX_BACKOFF_DELAY_MS));
let mut rng_state: u64 = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos() as u64)
.unwrap_or(12345);
for delay in backoff {
rng_state ^= rng_state << 13;
rng_state ^= rng_state >> 7;
rng_state ^= rng_state << 17;
let jitter_factor = 1.0 + (rng_state % 25) as f64 / 100.0;
let jittered_delay =
Duration::from_millis((delay.as_millis() as f64 * jitter_factor) as u64);
let file_clone = Arc::clone(&file);
let lock_result =
tokio::task::spawn_blocking(move || FileExt::try_lock_shared(file_clone.as_ref()))
.await
.with_context(|| "spawn_blocking panicked")?;
match lock_result {
Ok(true) => {
debug!(
lock_name = %lock_name,
wait_ms = start.elapsed().as_millis(),
"Shared file lock acquired"
);
return Ok(Self {
_file: file,
lock_name,
});
}
Ok(false) | Err(_) => {
let remaining = timeout.saturating_sub(start.elapsed());
if remaining.is_zero() {
return Err(anyhow::anyhow!(
"Timeout acquiring shared lock for '{}' after {:?}",
source_name,
timeout
));
}
tokio::time::sleep(jittered_delay.min(remaining)).await;
}
}
}
Err(anyhow::anyhow!(
"Timeout acquiring shared lock for '{}' after {:?}",
source_name,
timeout
))
}
}
pub async fn cleanup_stale_locks(cache_dir: &Path, ttl_seconds: u64) -> Result<usize> {
use std::time::{Duration, SystemTime};
use tokio::fs;
let locks_dir = cache_dir.join(".locks");
if !locks_dir.exists() {
return Ok(0);
}
let mut removed_count = 0;
let now = SystemTime::now();
let ttl_duration = Duration::from_secs(ttl_seconds);
let mut entries = fs::read_dir(&locks_dir).await.context("Failed to read locks directory")?;
while let Some(entry) = entries.next_entry().await? {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) != Some("lock") {
continue;
}
let Ok(metadata) = fs::metadata(&path).await else {
continue; };
let Ok(modified) = metadata.modified() else {
continue; };
if let Ok(age) = now.duration_since(modified)
&& age > ttl_duration
{
if fs::remove_file(&path).await.is_ok() {
removed_count += 1;
}
}
}
Ok(removed_count)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_cache_lock_acquire_and_release() {
let temp_dir = TempDir::new().unwrap();
let cache_dir = temp_dir.path();
let lock = CacheLock::acquire(cache_dir, "test_source").await.unwrap();
let lock_path = cache_dir.join(".locks").join("test_source.lock");
assert!(lock_path.exists());
drop(lock);
assert!(lock_path.exists());
}
#[tokio::test]
async fn test_cache_lock_creates_locks_directory() {
let temp_dir = TempDir::new().unwrap();
let cache_dir = temp_dir.path();
let locks_dir = cache_dir.join(".locks");
assert!(!locks_dir.exists());
let lock = CacheLock::acquire(cache_dir, "test").await.unwrap();
assert!(locks_dir.exists());
assert!(locks_dir.is_dir());
drop(lock);
}
#[tokio::test]
async fn test_cache_lock_exclusive_blocking() {
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Barrier;
let temp_dir = TempDir::new().unwrap();
let cache_dir = Arc::new(temp_dir.path().to_path_buf());
let barrier = Arc::new(Barrier::new(2));
let cache_dir1 = cache_dir.clone();
let barrier1 = barrier.clone();
let handle1 = tokio::spawn(async move {
let _lock = CacheLock::acquire(&cache_dir1, "exclusive_test").await.unwrap();
barrier1.wait().await; tokio::time::sleep(Duration::from_millis(100)).await; });
let cache_dir2 = cache_dir.clone();
let handle2 = tokio::spawn(async move {
barrier.wait().await; let start = Instant::now();
let _lock = CacheLock::acquire(&cache_dir2, "exclusive_test").await.unwrap();
let elapsed = start.elapsed();
assert!(elapsed >= Duration::from_millis(50));
});
handle1.await.unwrap();
handle2.await.unwrap();
}
#[tokio::test]
async fn test_cache_lock_different_sources_dont_block() {
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Barrier;
let temp_dir = TempDir::new().unwrap();
let cache_dir = Arc::new(temp_dir.path().to_path_buf());
let barrier = Arc::new(Barrier::new(2));
let cache_dir1 = cache_dir.clone();
let barrier1 = barrier.clone();
let handle1 = tokio::spawn(async move {
let _lock = CacheLock::acquire(&cache_dir1, "source1").await.unwrap();
barrier1.wait().await;
tokio::time::sleep(Duration::from_millis(100)).await;
});
let cache_dir2 = cache_dir.clone();
let handle2 = tokio::spawn(async move {
barrier.wait().await;
let start = Instant::now();
let _lock = CacheLock::acquire(&cache_dir2, "source2").await.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(200),
"Lock acquisition took {:?}, expected < 200ms for non-blocking operation",
elapsed
);
});
handle1.await.unwrap();
handle2.await.unwrap();
}
#[tokio::test]
async fn test_cache_lock_path_with_special_characters() {
let temp_dir = TempDir::new().unwrap();
let cache_dir = temp_dir.path();
let special_names = vec![
"source-with-dash",
"source_with_underscore",
"source.with.dots",
"source@special",
];
for name in special_names {
let lock = CacheLock::acquire(cache_dir, name).await.unwrap();
let expected_path = cache_dir.join(".locks").join(format!("{name}.lock"));
assert!(expected_path.exists());
drop(lock);
}
}
#[tokio::test]
async fn test_cache_lock_acquire_timeout() {
let temp_dir = TempDir::new().unwrap();
let cache_dir = temp_dir.path();
let _lock1 = CacheLock::acquire(cache_dir, "test-source").await.unwrap();
let start = std::time::Instant::now();
let result =
CacheLock::acquire_with_timeout(cache_dir, "test-source", Duration::from_millis(100))
.await;
let elapsed = start.elapsed();
assert!(result.is_err(), "Expected timeout error");
match result {
Ok(_) => panic!("Expected timeout error, but got success"),
Err(error) => {
let error_msg = error.to_string();
assert!(
error_msg.contains("Timeout") || error_msg.contains("timeout"),
"Error message should mention timeout: {}",
error_msg
);
assert!(
error_msg.contains("test-source"),
"Error message should include source name: {}",
error_msg
);
}
}
assert!(elapsed >= Duration::from_millis(50), "Timeout too quick: {:?}", elapsed);
assert!(elapsed < Duration::from_millis(500), "Timeout too slow: {:?}", elapsed);
}
#[tokio::test]
async fn test_cache_lock_acquire_timeout_succeeds_eventually() {
let temp_dir = TempDir::new().unwrap();
let cache_dir = temp_dir.path();
let cache_dir_clone = cache_dir.to_path_buf();
let handle = tokio::spawn(async move {
let lock = CacheLock::acquire(&cache_dir_clone, "test-source").await.unwrap();
tokio::time::sleep(Duration::from_millis(50)).await;
drop(lock); });
tokio::time::sleep(Duration::from_millis(10)).await;
let result =
CacheLock::acquire_with_timeout(cache_dir, "test-source", Duration::from_millis(500))
.await;
assert!(result.is_ok(), "Lock should be acquired after first one is released");
handle.await.unwrap();
}
#[tokio::test]
async fn test_shared_locks_dont_block_each_other() {
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Barrier;
let temp_dir = TempDir::new().unwrap();
let cache_dir = Arc::new(temp_dir.path().to_path_buf());
let barrier = Arc::new(Barrier::new(2));
let cache_dir1 = cache_dir.clone();
let barrier1 = barrier.clone();
let handle1 = tokio::spawn(async move {
let _lock = CacheLock::acquire_shared(&cache_dir1, "shared_test").await.unwrap();
barrier1.wait().await; tokio::time::sleep(Duration::from_millis(100)).await; });
let cache_dir2 = cache_dir.clone();
let handle2 = tokio::spawn(async move {
barrier.wait().await; let start = Instant::now();
let _lock = CacheLock::acquire_shared(&cache_dir2, "shared_test").await.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(200),
"Shared lock took {:?}, expected < 200ms (no blocking)",
elapsed
);
});
handle1.await.unwrap();
handle2.await.unwrap();
}
#[tokio::test]
async fn test_exclusive_blocks_shared() {
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Barrier;
let temp_dir = TempDir::new().unwrap();
let cache_dir = Arc::new(temp_dir.path().to_path_buf());
let barrier = Arc::new(Barrier::new(2));
let cache_dir1 = cache_dir.clone();
let barrier1 = barrier.clone();
let handle1 = tokio::spawn(async move {
let _lock = CacheLock::acquire(&cache_dir1, "exclusive_shared_test").await.unwrap();
barrier1.wait().await;
tokio::time::sleep(Duration::from_millis(100)).await;
});
let cache_dir2 = cache_dir.clone();
let handle2 = tokio::spawn(async move {
barrier.wait().await;
let start = Instant::now();
let _lock =
CacheLock::acquire_shared(&cache_dir2, "exclusive_shared_test").await.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed >= Duration::from_millis(50),
"Shared lock should have blocked: {:?}",
elapsed
);
});
handle1.await.unwrap();
handle2.await.unwrap();
}
#[tokio::test]
async fn test_shared_blocks_exclusive() {
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Barrier;
let temp_dir = TempDir::new().unwrap();
let cache_dir = Arc::new(temp_dir.path().to_path_buf());
let barrier = Arc::new(Barrier::new(2));
let cache_dir1 = cache_dir.clone();
let barrier1 = barrier.clone();
let handle1 = tokio::spawn(async move {
let _lock =
CacheLock::acquire_shared(&cache_dir1, "shared_exclusive_test").await.unwrap();
barrier1.wait().await;
tokio::time::sleep(Duration::from_millis(100)).await;
});
let cache_dir2 = cache_dir.clone();
let handle2 = tokio::spawn(async move {
barrier.wait().await;
let start = Instant::now();
let _lock = CacheLock::acquire(&cache_dir2, "shared_exclusive_test").await.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed >= Duration::from_millis(50),
"Exclusive lock should have blocked: {:?}",
elapsed
);
});
handle1.await.unwrap();
handle2.await.unwrap();
}
}