1use crate::constants::{MAX_BACKOFF_DELAY_MS, STARTING_BACKOFF_DELAY_MS, default_lock_timeout};
14use anyhow::{Context, Result};
15use fs4::fs_std::FileExt;
16use std::fs::{File, OpenOptions};
17use std::path::Path;
18use std::sync::Arc;
19use std::time::Duration;
20use tokio_retry::strategy::ExponentialBackoff;
21use tracing::debug;
22
/// Guard returned by the `CacheLock::acquire*` constructors.
///
/// A value of this type is only built after a lock has been obtained on the
/// lock file, and it owns the handle that lock was taken on.
#[derive(Debug)]
pub struct CacheLock {
    /// Open handle to the lock file the lock was taken on. It is never read
    /// again; it is stored so the handle stays open for as long as the guard
    /// exists.
    _file: std::sync::Arc<std::fs::File>,
    /// Human-readable label for this lock, used as the `lock_name` log field.
    lock_name: String,
}
35
36impl Drop for CacheLock {
37 fn drop(&mut self) {
38 debug!(lock_name = %self.lock_name, "File lock released");
39 }
40}
41
42impl CacheLock {
43 pub async fn acquire(cache_dir: &Path, source_name: &str) -> Result<Self> {
93 Self::acquire_with_timeout(cache_dir, source_name, default_lock_timeout()).await
94 }
95
96 pub async fn acquire_with_timeout(
119 cache_dir: &Path,
120 source_name: &str,
121 timeout: std::time::Duration,
122 ) -> Result<Self> {
123 use tokio::fs;
124
125 let lock_name = format!("file:{}", source_name);
126 debug!(lock_name = %lock_name, "Waiting for file lock");
127
128 let locks_dir = cache_dir.join(".locks");
130 fs::create_dir_all(&locks_dir).await.with_context(|| {
131 format!("Failed to create locks directory: {}", locks_dir.display())
132 })?;
133
134 let lock_path = locks_dir.join(format!("{source_name}.lock"));
136
137 let lock_path_clone = lock_path.clone();
140 let file = tokio::task::spawn_blocking(move || {
141 OpenOptions::new().create(true).write(true).truncate(false).open(&lock_path_clone)
142 })
143 .await
144 .with_context(|| "spawn_blocking panicked")?
145 .with_context(|| format!("Failed to open lock file: {}", lock_path.display()))?;
146
147 let file = Arc::new(file);
149
150 let start = std::time::Instant::now();
152
153 let backoff = ExponentialBackoff::from_millis(STARTING_BACKOFF_DELAY_MS)
157 .max_delay(Duration::from_millis(MAX_BACKOFF_DELAY_MS));
158
159 let mut rng_state: u64 = std::time::SystemTime::now()
161 .duration_since(std::time::UNIX_EPOCH)
162 .map(|d| d.as_nanos() as u64)
163 .unwrap_or(12345);
164
165 for delay in backoff {
166 rng_state ^= rng_state << 13;
168 rng_state ^= rng_state >> 7;
169 rng_state ^= rng_state << 17;
170 let jitter_factor = 1.0 + (rng_state % 25) as f64 / 100.0;
171 let jittered_delay =
172 Duration::from_millis((delay.as_millis() as f64 * jitter_factor) as u64);
173 let file_clone = Arc::clone(&file);
175 let lock_result = tokio::task::spawn_blocking(move || file_clone.try_lock_exclusive())
176 .await
177 .with_context(|| "spawn_blocking panicked")?;
178
179 match lock_result {
180 Ok(true) => {
181 debug!(
182 lock_name = %lock_name,
183 wait_ms = start.elapsed().as_millis(),
184 "File lock acquired"
185 );
186 return Ok(Self {
187 _file: file,
188 lock_name,
189 });
190 }
191 Ok(false) | Err(_) => {
192 let remaining = timeout.saturating_sub(start.elapsed());
194 if remaining.is_zero() {
195 return Err(anyhow::anyhow!(
196 "Timeout acquiring lock for '{}' after {:?}",
197 source_name,
198 timeout
199 ));
200 }
201 tokio::time::sleep(jittered_delay.min(remaining)).await;
203 }
204 }
205 }
206
207 Err(anyhow::anyhow!("Timeout acquiring lock for '{}' after {:?}", source_name, timeout))
209 }
210
211 pub async fn acquire_shared(cache_dir: &Path, source_name: &str) -> Result<Self> {
242 Self::acquire_shared_with_timeout(cache_dir, source_name, default_lock_timeout()).await
243 }
244
245 pub async fn acquire_shared_with_timeout(
253 cache_dir: &Path,
254 source_name: &str,
255 timeout: std::time::Duration,
256 ) -> Result<Self> {
257 use tokio::fs;
258
259 let lock_name = format!("file-shared:{}", source_name);
260 debug!(lock_name = %lock_name, "Waiting for shared file lock");
261
262 let locks_dir = cache_dir.join(".locks");
264 fs::create_dir_all(&locks_dir).await.with_context(|| {
265 format!("Failed to create locks directory: {}", locks_dir.display())
266 })?;
267
268 let lock_path = locks_dir.join(format!("{source_name}.lock"));
270
271 let lock_path_clone = lock_path.clone();
273 let file = tokio::task::spawn_blocking(move || {
274 OpenOptions::new().create(true).write(true).truncate(false).open(&lock_path_clone)
275 })
276 .await
277 .with_context(|| "spawn_blocking panicked")?
278 .with_context(|| format!("Failed to open lock file: {}", lock_path.display()))?;
279
280 let file = Arc::new(file);
282
283 let start = std::time::Instant::now();
285
286 let backoff = ExponentialBackoff::from_millis(STARTING_BACKOFF_DELAY_MS)
287 .max_delay(Duration::from_millis(MAX_BACKOFF_DELAY_MS));
288
289 let mut rng_state: u64 = std::time::SystemTime::now()
291 .duration_since(std::time::UNIX_EPOCH)
292 .map(|d| d.as_nanos() as u64)
293 .unwrap_or(12345);
294
295 for delay in backoff {
296 rng_state ^= rng_state << 13;
297 rng_state ^= rng_state >> 7;
298 rng_state ^= rng_state << 17;
299 let jitter_factor = 1.0 + (rng_state % 25) as f64 / 100.0;
300 let jittered_delay =
301 Duration::from_millis((delay.as_millis() as f64 * jitter_factor) as u64);
302
303 let file_clone = Arc::clone(&file);
306 let lock_result =
307 tokio::task::spawn_blocking(move || FileExt::try_lock_shared(file_clone.as_ref()))
308 .await
309 .with_context(|| "spawn_blocking panicked")?;
310
311 match lock_result {
312 Ok(true) => {
313 debug!(
314 lock_name = %lock_name,
315 wait_ms = start.elapsed().as_millis(),
316 "Shared file lock acquired"
317 );
318 return Ok(Self {
319 _file: file,
320 lock_name,
321 });
322 }
323 Ok(false) | Err(_) => {
324 let remaining = timeout.saturating_sub(start.elapsed());
326 if remaining.is_zero() {
327 return Err(anyhow::anyhow!(
328 "Timeout acquiring shared lock for '{}' after {:?}",
329 source_name,
330 timeout
331 ));
332 }
333 tokio::time::sleep(jittered_delay.min(remaining)).await;
334 }
335 }
336 }
337
338 Err(anyhow::anyhow!(
339 "Timeout acquiring shared lock for '{}' after {:?}",
340 source_name,
341 timeout
342 ))
343 }
344}
345
346pub async fn cleanup_stale_locks(cache_dir: &Path, ttl_seconds: u64) -> Result<usize> {
365 use std::time::{Duration, SystemTime};
366 use tokio::fs;
367
368 let locks_dir = cache_dir.join(".locks");
369 if !locks_dir.exists() {
370 return Ok(0);
371 }
372
373 let mut removed_count = 0;
374 let now = SystemTime::now();
375 let ttl_duration = Duration::from_secs(ttl_seconds);
376
377 let mut entries = fs::read_dir(&locks_dir).await.context("Failed to read locks directory")?;
378
379 while let Some(entry) = entries.next_entry().await? {
380 let path = entry.path();
381
382 if path.extension().and_then(|s| s.to_str()) != Some("lock") {
384 continue;
385 }
386
387 let Ok(metadata) = fs::metadata(&path).await else {
389 continue; };
391
392 let Ok(modified) = metadata.modified() else {
393 continue; };
395
396 if let Ok(age) = now.duration_since(modified)
398 && age > ttl_duration
399 {
400 if fs::remove_file(&path).await.is_ok() {
402 removed_count += 1;
403 }
404 }
405 }
406
407 Ok(removed_count)
408}
409
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;
    use std::sync::Arc;
    use std::time::{Duration, Instant};
    use tempfile::TempDir;
    use tokio::sync::Barrier;

    /// How long the first task keeps its lock in the two-task tests.
    const HOLD_TIME: Duration = Duration::from_millis(100);
    /// Smallest wait that shows the second task was held back by the first.
    const MIN_BLOCKED_WAIT: Duration = Duration::from_millis(50);
    /// Largest wait still accepted as "was not held back".
    const MAX_UNBLOCKED_WAIT: Duration = Duration::from_millis(200);

    /// Which constructor a two-task test should use.
    #[derive(Clone, Copy)]
    enum TestLock {
        Exclusive,
        Shared,
    }

    /// Acquires a lock of the requested kind with the default timeout.
    async fn lock(kind: TestLock, cache_dir: &Path, source_name: &str) -> CacheLock {
        match kind {
            TestLock::Exclusive => CacheLock::acquire(cache_dir, source_name).await,
            TestLock::Shared => CacheLock::acquire_shared(cache_dir, source_name).await,
        }
        .unwrap()
    }

    /// Runs two tasks against a fresh cache directory and returns how long the
    /// second one needed to obtain its lock.
    ///
    /// The first task takes `holder`, passes the barrier and keeps the lock
    /// for `HOLD_TIME`. The second task starts timing only after the barrier,
    /// i.e. once the first lock is definitely held.
    async fn wait_behind_holder(
        holder: (TestLock, &'static str),
        waiter: (TestLock, &'static str),
    ) -> Duration {
        let temp_dir = TempDir::new().unwrap();
        let cache_dir = Arc::new(temp_dir.path().to_path_buf());
        let barrier = Arc::new(Barrier::new(2));

        let holder_task = {
            let cache_dir = Arc::clone(&cache_dir);
            let barrier = Arc::clone(&barrier);
            tokio::spawn(async move {
                let (kind, source_name) = holder;
                let _lock = lock(kind, &cache_dir, source_name).await;
                barrier.wait().await;
                tokio::time::sleep(HOLD_TIME).await;
            })
        };

        let waiter_task = tokio::spawn(async move {
            let (kind, source_name) = waiter;
            barrier.wait().await;
            let start = Instant::now();
            let _lock = lock(kind, &cache_dir, source_name).await;
            start.elapsed()
        });

        holder_task.await.unwrap();
        waiter_task.await.unwrap()
    }

    #[tokio::test]
    async fn test_cache_lock_acquire_and_release() {
        let temp_dir = TempDir::new().unwrap();
        let cache_dir = temp_dir.path();

        let lock = CacheLock::acquire(cache_dir, "test_source").await.unwrap();

        let lock_path = cache_dir.join(".locks").join("test_source.lock");
        assert!(lock_path.exists());

        drop(lock);

        // Dropping the guard must not delete the lock file itself.
        assert!(lock_path.exists());
    }

    #[tokio::test]
    async fn test_cache_lock_creates_locks_directory() {
        let temp_dir = TempDir::new().unwrap();
        let cache_dir = temp_dir.path();

        let locks_dir = cache_dir.join(".locks");
        assert!(!locks_dir.exists());

        let lock = CacheLock::acquire(cache_dir, "test").await.unwrap();

        assert!(locks_dir.exists());
        assert!(locks_dir.is_dir());

        drop(lock);
    }

    #[tokio::test]
    async fn test_cache_lock_exclusive_blocking() {
        let waited = wait_behind_holder(
            (TestLock::Exclusive, "exclusive_test"),
            (TestLock::Exclusive, "exclusive_test"),
        )
        .await;

        assert!(
            waited >= MIN_BLOCKED_WAIT,
            "Second exclusive lock should have blocked: {:?}",
            waited
        );
    }

    #[tokio::test]
    async fn test_cache_lock_different_sources_dont_block() {
        let waited = wait_behind_holder(
            (TestLock::Exclusive, "source1"),
            (TestLock::Exclusive, "source2"),
        )
        .await;

        assert!(
            waited < MAX_UNBLOCKED_WAIT,
            "Lock acquisition took {:?}, expected < 200ms for non-blocking operation",
            waited
        );
    }

    #[tokio::test]
    async fn test_cache_lock_path_with_special_characters() {
        let temp_dir = TempDir::new().unwrap();
        let cache_dir = temp_dir.path();

        let special_names = [
            "source-with-dash",
            "source_with_underscore",
            "source.with.dots",
            "source@special",
        ];

        for name in special_names {
            let lock = CacheLock::acquire(cache_dir, name).await.unwrap();
            let expected_path = cache_dir.join(".locks").join(format!("{name}.lock"));
            assert!(expected_path.exists());
            drop(lock);
        }
    }

    #[tokio::test]
    async fn test_cache_lock_acquire_timeout() {
        let temp_dir = TempDir::new().unwrap();
        let cache_dir = temp_dir.path();

        // Held for the whole test so the second attempt can never succeed.
        let _lock1 = CacheLock::acquire(cache_dir, "test-source").await.unwrap();

        let start = Instant::now();
        let result =
            CacheLock::acquire_with_timeout(cache_dir, "test-source", Duration::from_millis(100))
                .await;
        let elapsed = start.elapsed();

        let Err(error) = result else {
            panic!("Expected timeout error, but got success");
        };

        let error_msg = error.to_string();
        assert!(
            error_msg.contains("Timeout") || error_msg.contains("timeout"),
            "Error message should mention timeout: {}",
            error_msg
        );
        assert!(
            error_msg.contains("test-source"),
            "Error message should include source name: {}",
            error_msg
        );

        assert!(elapsed >= Duration::from_millis(50), "Timeout too quick: {:?}", elapsed);
        assert!(elapsed < Duration::from_millis(500), "Timeout too slow: {:?}", elapsed);
    }

    #[tokio::test]
    async fn test_cache_lock_acquire_timeout_succeeds_eventually() {
        let temp_dir = TempDir::new().unwrap();
        let cache_dir = temp_dir.path();

        let barrier = Arc::new(Barrier::new(2));

        let holder_dir = cache_dir.to_path_buf();
        let holder_barrier = Arc::clone(&barrier);
        let handle = tokio::spawn(async move {
            let lock = CacheLock::acquire(&holder_dir, "test-source").await.unwrap();
            holder_barrier.wait().await;
            tokio::time::sleep(Duration::from_millis(50)).await;
            drop(lock);
        });

        // Wait until the spawned task really holds the lock. A fixed sleep can
        // lose that race, in which case this task would take the lock first
        // and the spawned task would stall on it until its own timeout.
        barrier.wait().await;

        let result =
            CacheLock::acquire_with_timeout(cache_dir, "test-source", Duration::from_millis(500))
                .await;

        assert!(result.is_ok(), "Lock should be acquired after first one is released");

        handle.await.unwrap();
    }

    #[tokio::test]
    async fn test_shared_locks_dont_block_each_other() {
        let waited = wait_behind_holder(
            (TestLock::Shared, "shared_test"),
            (TestLock::Shared, "shared_test"),
        )
        .await;

        assert!(
            waited < MAX_UNBLOCKED_WAIT,
            "Shared lock took {:?}, expected < 200ms (no blocking)",
            waited
        );
    }

    #[tokio::test]
    async fn test_exclusive_blocks_shared() {
        let waited = wait_behind_holder(
            (TestLock::Exclusive, "exclusive_shared_test"),
            (TestLock::Shared, "exclusive_shared_test"),
        )
        .await;

        assert!(
            waited >= MIN_BLOCKED_WAIT,
            "Shared lock should have blocked: {:?}",
            waited
        );
    }

    #[tokio::test]
    async fn test_shared_blocks_exclusive() {
        let waited = wait_behind_holder(
            (TestLock::Shared, "shared_exclusive_test"),
            (TestLock::Exclusive, "shared_exclusive_test"),
        )
        .await;

        assert!(
            waited >= MIN_BLOCKED_WAIT,
            "Exclusive lock should have blocked: {:?}",
            waited
        );
    }

    #[tokio::test]
    async fn test_cleanup_stale_locks_without_locks_directory() {
        let temp_dir = TempDir::new().unwrap();

        let removed = cleanup_stale_locks(temp_dir.path(), 60).await.unwrap();

        assert_eq!(removed, 0);
    }

    #[tokio::test]
    async fn test_cleanup_stale_locks_removes_only_expired_lock_files() {
        use std::time::SystemTime;

        let temp_dir = TempDir::new().unwrap();
        let locks_dir = temp_dir.path().join(".locks");
        std::fs::create_dir_all(&locks_dir).unwrap();

        // Back-date two files: one with the `lock` extension, one without.
        let an_hour_ago = SystemTime::now() - Duration::from_secs(3600);
        let stale_lock = locks_dir.join("stale.lock");
        let stale_other = locks_dir.join("stale.txt");
        for path in [&stale_lock, &stale_other] {
            let file = std::fs::File::create(path).unwrap();
            file.set_modified(an_hour_ago).unwrap();
        }

        let fresh_lock = locks_dir.join("fresh.lock");
        std::fs::File::create(&fresh_lock).unwrap();

        let removed = cleanup_stale_locks(temp_dir.path(), 60).await.unwrap();

        assert_eq!(removed, 1);
        assert!(!stale_lock.exists(), "expired lock file should be removed");
        assert!(stale_other.exists(), "non-lock files must be left alone");
        assert!(fresh_lock.exists(), "lock files younger than the TTL must be kept");
    }
}
750}