agpm_cli/cache/
lock.rs

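//! File locking utilities for cache operations.
//!
//! This module provides per-source, cross-process file locks stored under the cache
//! directory (`{cache_dir}/.locks/{source_name}.lock`) to prevent corruption during
//! concurrent cache operations. Because every acquisition opens its own file handle, a
//! lock also excludes other tasks/threads in the same process. A lock is released
//! automatically when the lock object is dropped.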
6
7use anyhow::{Context, Result};
8use fs4::fs_std::FileExt;
9use std::fs::{File, OpenOptions};
10use std::path::{Path, PathBuf};
11
/// An exclusive lock on one cache source, backed by an OS file lock.
///
/// Obtained from `CacheLock::acquire`. The lock is held for as long as this value is
/// alive and is released when it is dropped, so bind it to a named variable (e.g.
/// `let _lock = ...`, not `let _ = ...`) for the whole critical section.
#[derive(Debug)]
#[must_use = "the cache lock is released as soon as this value is dropped"]
pub struct CacheLock {
    /// Open handle to the lock file; the OS lock lives as long as this handle.
    _file: File,
    /// Path of the lock file, used in diagnostic messages.
    path: PathBuf,
}
17
18impl CacheLock {
19    /// Acquires an exclusive lock for a specific source in the cache directory.
20    ///
21    /// This async method creates and acquires an exclusive file lock for the specified
22    /// source name. The file locking operation uses `spawn_blocking` internally to avoid
23    /// blocking the tokio runtime, while still providing blocking file lock semantics.
24    ///
25    /// # Lock File Management
26    ///
27    /// The method performs several setup operations:
28    /// 1. **Locks directory creation**: Creates `.locks/` directory if needed
29    /// 2. **Lock file creation**: Creates `{source_name}.lock` file
30    /// 3. **Exclusive locking**: Acquires exclusive access via OS file locking
31    /// 4. **Handle retention**: Keeps file handle open to maintain lock
32    ///
33    /// # Async and Blocking Behavior
34    ///
35    /// If another process already holds a lock for the same source:
36    /// - **Async-friendly**: Uses `spawn_blocking` to avoid blocking the tokio runtime
37    /// - **Blocking wait**: The spawned task blocks until other lock is released
38    /// - **Fair queuing**: Locks are typically acquired in FIFO order
39    /// - **No timeout**: Task will wait indefinitely (use with caution)
40    /// - **Interruptible**: Can be interrupted by process signals
41    ///
42    /// # Lock File Location
43    ///
44    /// Lock files are created in a dedicated subdirectory:
45    /// ```text
46    /// {cache_dir}/.locks/{source_name}.lock
47    /// ```
48    ///
49    /// Examples:
50    /// - `~/.agpm/cache/.locks/community.lock`
51    /// - `~/.agpm/cache/.locks/work-tools.lock`
52    /// - `~/.agpm/cache/.locks/my-project.lock`
53    ///
54    /// # Parameters
55    ///
56    /// * `cache_dir` - Root cache directory path
57    /// * `source_name` - Unique identifier for the source being locked
58    ///
59    /// # Returns
60    ///
61    /// Returns a `CacheLock` instance that holds the exclusive lock. The lock
62    /// remains active until the returned instance is dropped.
63    ///
64    /// # Errors
65    ///
66    /// The method can fail for several reasons:
67    ///
68    /// ## Directory Creation Errors
69    /// - Permission denied creating `.locks/` directory
70    /// - Disk space exhausted
71    /// - Path length exceeds system limits
72    ///
73    /// ## File Operation Errors
74    /// - Permission denied creating/opening lock file
75    /// - File system full
76    /// - Invalid characters in source name
77    ///
78    /// ## Locking Errors
79    /// - File locking not supported by file system
80    /// - Lock file corrupted or in invalid state
81    /// - System resource limits exceeded
82    ///
83    /// # Platform Considerations
84    ///
85    /// - **Windows**: Uses Win32 `LockFile` API via [`fs4`]
86    /// - **Unix**: Uses POSIX `fcntl()` locking via [`fs4`]
87    /// - **NFS/Network**: Behavior depends on file system support
88    /// - **Docker**: Works within containers with proper volume mounts
89    ///
90    /// # Examples
91    ///
92    /// Simple lock acquisition:
93    ///
94    /// ```rust,no_run
95    /// use agpm_cli::cache::lock::CacheLock;
96    /// use std::path::PathBuf;
97    ///
98    /// # async fn example() -> anyhow::Result<()> {
99    /// let cache_dir = PathBuf::from("/home/user/.agpm/cache");
100    ///
101    /// // This will block if another process has the lock
102    /// let lock = CacheLock::acquire(&cache_dir, "my-source").await?;
103    ///
104    /// // Perform cache operations safely...
105    /// println!("Lock acquired successfully!");
106    ///
107    /// // Lock is released when 'lock' variable is dropped
108    /// drop(lock);
109    /// # Ok(())
110    /// # }
111    /// ```
112    ///
113    /// Error handling for lock acquisition:
114    ///
115    /// ```rust,no_run
116    /// use agpm_cli::cache::lock::CacheLock;
117    /// use std::path::PathBuf;
118    ///
119    /// # async fn example() -> anyhow::Result<()> {
120    /// let cache_dir = PathBuf::from("/tmp/cache");
121    ///
122    /// match CacheLock::acquire(&cache_dir, "problematic-source").await {
123    ///     Ok(lock) => {
124    ///         println!("Lock acquired, proceeding with operations");
125    ///         // Use lock...
126    ///     }
127    ///     Err(e) => {
128    ///         eprintln!("Failed to acquire lock: {}", e);
129    ///         eprintln!("Another process may be using this source");
130    ///         return Err(e);
131    ///     }
132    /// }
133    /// # Ok(())
134    /// # }
135    /// ```
136    pub async fn acquire(cache_dir: &Path, source_name: &str) -> Result<Self> {
137        // Create lock file path: ~/.agpm/cache/.locks/source_name.lock
138        let locks_dir = cache_dir.join(".locks");
139        tokio::fs::create_dir_all(&locks_dir).await.map_err(|e| {
140            if e.kind() == std::io::ErrorKind::NotADirectory {
141                anyhow::anyhow!(
142                    "Cannot create directory: cache path is not a directory ({})",
143                    cache_dir.display()
144                )
145            } else if e.kind() == std::io::ErrorKind::PermissionDenied {
146                anyhow::anyhow!(
147                    "Permission denied: cannot create locks directory at {}",
148                    locks_dir.display()
149                )
150            } else if e.raw_os_error() == Some(28) {
151                // ENOSPC on Unix
152                anyhow::anyhow!("No space left on device to create locks directory")
153            } else {
154                anyhow::anyhow!("Failed to create directory {}: {}", locks_dir.display(), e)
155            }
156        })?;
157
158        let lock_path = locks_dir.join(format!("{source_name}.lock"));
159        let lock_path_clone = lock_path.clone();
160        let source_name = source_name.to_string();
161
162        // Use spawn_blocking to perform blocking file lock operations
163        // This prevents blocking the tokio runtime
164        let file = tokio::task::spawn_blocking(move || -> Result<File> {
165            // Open or create the lock file
166            let file = OpenOptions::new()
167                .create(true)
168                .write(true)
169                .truncate(true)
170                .open(&lock_path_clone)
171                .with_context(|| {
172                    format!("Failed to open lock file: {}", lock_path_clone.display())
173                })?;
174
175            // Try to acquire exclusive lock (blocking)
176            file.lock_exclusive()
177                .with_context(|| format!("Failed to acquire lock for: {source_name}"))?;
178
179            Ok(file)
180        })
181        .await
182        .context("Failed to spawn blocking task for lock acquisition")??;
183
184        Ok(Self {
185            _file: file,
186            path: lock_path,
187        })
188    }
189}
190
191impl Drop for CacheLock {
192    fn drop(&mut self) {
193        // Lock is automatically released when file is closed (on Drop)
194        // But we can explicitly unlock for clarity
195        #[allow(unstable_name_collisions)]
196        if let Err(e) = self._file.unlock() {
197            eprintln!("Warning: Failed to unlock {}: {}", self.path.display(), e);
198        }
199    }
200}
201
202/// Cleans up stale lock files in the cache directory.
203///
204/// This function removes lock files that are older than the specified TTL (time-to-live)
205/// in seconds. Lock files can become stale if a process crashes without properly releasing
206/// its locks. This cleanup helps prevent lock file accumulation over time.
207///
208/// # Parameters
209///
210/// * `cache_dir` - Root cache directory containing the `.locks/` subdirectory
211/// * `ttl_seconds` - Maximum age in seconds for lock files (e.g., 3600 for 1 hour)
212///
213/// # Returns
214///
215/// Returns the number of stale lock files that were removed.
216///
217/// # Example
218///
219/// ```rust,no_run
220/// use agpm_cli::cache::lock::cleanup_stale_locks;
221/// use std::path::PathBuf;
222///
223/// # async fn example() -> anyhow::Result<()> {
224/// let cache_dir = PathBuf::from("/home/user/.agpm/cache");
225/// // Clean up lock files older than 1 hour
226/// let removed = cleanup_stale_locks(&cache_dir, 3600).await?;
227/// println!("Removed {} stale lock files", removed);
228/// # Ok(())
229/// # }
230/// ```
231pub async fn cleanup_stale_locks(cache_dir: &Path, ttl_seconds: u64) -> Result<usize> {
232    use std::time::{Duration, SystemTime};
233    use tokio::fs;
234
235    let locks_dir = cache_dir.join(".locks");
236    if !locks_dir.exists() {
237        return Ok(0);
238    }
239
240    let mut removed_count = 0;
241    let now = SystemTime::now();
242    let ttl_duration = Duration::from_secs(ttl_seconds);
243
244    let mut entries = fs::read_dir(&locks_dir).await.context("Failed to read locks directory")?;
245
246    while let Some(entry) = entries.next_entry().await? {
247        let path = entry.path();
248
249        // Only process .lock files
250        if path.extension().and_then(|s| s.to_str()) != Some("lock") {
251            continue;
252        }
253
254        // Check file age
255        let metadata = match fs::metadata(&path).await {
256            Ok(m) => m,
257            Err(_) => continue, // Skip if we can't read metadata
258        };
259
260        let modified = match metadata.modified() {
261            Ok(t) => t,
262            Err(_) => continue, // Skip if we can't get modification time
263        };
264
265        // Remove if older than TTL
266        if let Ok(age) = now.duration_since(modified)
267            && age > ttl_duration
268        {
269            // Try to remove the file (it might be locked by another process)
270            if fs::remove_file(&path).await.is_ok() {
271                removed_count += 1;
272            }
273        }
274    }
275
276    Ok(removed_count)
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::time::{Duration, Instant};
    use tempfile::TempDir;
    use tokio::sync::Barrier;

    /// Expected on-disk location of the lock file for `name` under `cache_dir`.
    fn lock_file_for(cache_dir: &Path, name: &str) -> PathBuf {
        cache_dir.join(".locks").join(format!("{name}.lock"))
    }

    /// Takes the lock for `name`, tells the peer task it is held, then keeps it for 100ms.
    async fn hold_lock_for_100ms(cache_dir: PathBuf, name: &'static str, ready: Arc<Barrier>) {
        let _held = CacheLock::acquire(&cache_dir, name).await.unwrap();
        ready.wait().await;
        tokio::time::sleep(Duration::from_millis(100)).await;
        // `_held` is dropped here, releasing the lock.
    }

    /// Waits until the peer task holds its lock, then measures how long acquiring `name` takes.
    async fn measure_acquire(
        cache_dir: PathBuf,
        name: &'static str,
        ready: Arc<Barrier>,
    ) -> Duration {
        ready.wait().await;
        let clock = Instant::now();
        let _held = CacheLock::acquire(&cache_dir, name).await.unwrap();
        clock.elapsed()
    }

    #[tokio::test]
    async fn test_cache_lock_acquire_and_release() {
        let temp_dir = TempDir::new().unwrap();
        let cache_dir = temp_dir.path();
        let lock_path = lock_file_for(cache_dir, "test_source");

        let lock = CacheLock::acquire(cache_dir, "test_source").await.unwrap();
        assert!(lock_path.exists());

        // Releasing the lock must leave the lock file in place.
        drop(lock);
        assert!(lock_path.exists());
    }

    #[tokio::test]
    async fn test_cache_lock_creates_locks_directory() {
        let temp_dir = TempDir::new().unwrap();
        let locks_dir = temp_dir.path().join(".locks");

        // Nothing has been locked yet, so `.locks/` must be absent.
        assert!(!locks_dir.exists());

        let _lock = CacheLock::acquire(temp_dir.path(), "test").await.unwrap();

        // The first acquisition creates `.locks/` as a real directory.
        assert!(locks_dir.is_dir());
    }

    #[tokio::test]
    async fn test_cache_lock_exclusive_blocking() {
        let temp_dir = TempDir::new().unwrap();
        let cache_dir = temp_dir.path().to_path_buf();
        let ready = Arc::new(Barrier::new(2));

        let holder = tokio::spawn(hold_lock_for_100ms(
            cache_dir.clone(),
            "exclusive_test",
            Arc::clone(&ready),
        ));
        let waiter = tokio::spawn(measure_acquire(cache_dir, "exclusive_test", ready));

        holder.await.unwrap();
        let waited = waiter.await.unwrap();

        // The holder keeps the lock for 100ms; 50ms leaves slack for scheduling jitter.
        assert!(waited >= Duration::from_millis(50));
    }

    #[tokio::test]
    async fn test_cache_lock_different_sources_dont_block() {
        let temp_dir = TempDir::new().unwrap();
        let cache_dir = temp_dir.path().to_path_buf();
        let ready = Arc::new(Barrier::new(2));

        let holder = tokio::spawn(hold_lock_for_100ms(
            cache_dir.clone(),
            "source1",
            Arc::clone(&ready),
        ));
        let waiter = tokio::spawn(measure_acquire(cache_dir, "source2", ready));

        holder.await.unwrap();
        let elapsed = waiter.await.unwrap();

        // A different source must be granted right away; 200ms is generous headroom
        // for slow machines while still catching a real wait on source1.
        assert!(
            elapsed < Duration::from_millis(200),
            "Lock acquisition took {:?}, expected < 200ms for non-blocking operation",
            elapsed
        );
    }

    #[tokio::test]
    async fn test_cache_lock_path_with_special_characters() {
        let temp_dir = TempDir::new().unwrap();
        let cache_dir = temp_dir.path();

        for name in [
            "source-with-dash",
            "source_with_underscore",
            "source.with.dots",
            "source@special",
        ] {
            let lock = CacheLock::acquire(cache_dir, name).await.unwrap();
            assert!(lock_file_for(cache_dir, name).exists());
            drop(lock);
        }
    }
}