agpm_cli/cache/
lock.rs

1//! File locking utilities for cache operations.
2//!
3//! This module provides thread-safe and process-safe file locking for cache directories
4//! to prevent corruption during concurrent cache operations. The locks are automatically
5//! released when the lock object is dropped.
6
7use anyhow::{Context, Result};
8use fs4::fs_std::FileExt;
9use std::fs::{File, OpenOptions};
10use std::path::Path;
11
/// An exclusive, file-backed lock guarding cache operations for one source.
///
/// The lock is tied to the open file handle stored in this value: dropping the
/// `CacheLock` closes the handle, which releases the lock. Keep the value bound to a
/// named variable (for example `_lock`) for as long as the protected work runs; binding
/// it to a bare `_` pattern drops it, and therefore unlocks, immediately.
#[derive(Debug)]
#[must_use = "the lock is released as soon as the `CacheLock` is dropped"]
pub struct CacheLock {
    /// Open handle to the lock file. It is never read; it exists only so that the
    /// handle (and with it the lock) lives exactly as long as this value.
    _file: File,
}
16
17impl CacheLock {
18    /// Acquires an exclusive lock for a specific source in the cache directory.
19    ///
20    /// This async method creates and acquires an exclusive file lock for the specified
21    /// source name. The file locking operation uses `spawn_blocking` internally to avoid
22    /// blocking the tokio runtime, while still providing blocking file lock semantics.
23    ///
24    /// # Lock File Management
25    ///
26    /// The method performs several setup operations:
27    /// 1. **Locks directory creation**: Creates `.locks/` directory if needed
28    /// 2. **Lock file creation**: Creates `{source_name}.lock` file
29    /// 3. **Exclusive locking**: Acquires exclusive access via OS file locking
30    /// 4. **Handle retention**: Keeps file handle open to maintain lock
31    ///
32    /// # Async and Blocking Behavior
33    ///
34    /// If another process already holds a lock for the same source:
35    /// - **Async-friendly**: Uses `spawn_blocking` to avoid blocking the tokio runtime
36    /// - **Blocking wait**: The spawned task blocks until other lock is released
37    /// - **Fair queuing**: Locks are typically acquired in FIFO order
38    /// - **No timeout**: Task will wait indefinitely (use with caution)
39    /// - **Interruptible**: Can be interrupted by process signals
40    ///
41    /// # Lock File Location
42    ///
43    /// Lock files are created in a dedicated subdirectory:
44    /// ```text
45    /// {cache_dir}/.locks/{source_name}.lock
46    /// ```
47    ///
48    /// Examples:
49    /// - `~/.agpm/cache/.locks/community.lock`
50    /// - `~/.agpm/cache/.locks/work-tools.lock`
51    /// - `~/.agpm/cache/.locks/my-project.lock`
52    ///
53    /// # Parameters
54    ///
55    /// * `cache_dir` - Root cache directory path
56    /// * `source_name` - Unique identifier for the source being locked
57    ///
58    /// # Returns
59    ///
60    /// Returns a `CacheLock` instance that holds the exclusive lock. The lock
61    /// remains active until the returned instance is dropped.
62    ///
63    /// # Errors
64    ///
65    /// The method can fail for several reasons:
66    ///
67    /// ## Directory Creation Errors
68    /// - Permission denied creating `.locks/` directory
69    /// - Disk space exhausted
70    /// - Path length exceeds system limits
71    ///
72    /// ## File Operation Errors
73    /// - Permission denied creating/opening lock file
74    /// - File system full
75    /// - Invalid characters in source name
76    ///
77    /// ## Locking Errors
78    /// - File locking not supported by file system
79    /// - Lock file corrupted or in invalid state
80    /// - System resource limits exceeded
81    ///
82    /// # Platform Considerations
83    ///
84    /// - **Windows**: Uses Win32 `LockFile` API via [`fs4`]
85    /// - **Unix**: Uses POSIX `fcntl()` locking via [`fs4`]
86    /// - **NFS/Network**: Behavior depends on file system support
87    /// - **Docker**: Works within containers with proper volume mounts
88    ///
89    /// # Examples
90    ///
91    /// Simple lock acquisition:
92    ///
93    /// ```rust,no_run
94    /// use agpm_cli::cache::lock::CacheLock;
95    /// use std::path::PathBuf;
96    ///
97    /// # async fn example() -> anyhow::Result<()> {
98    /// let cache_dir = PathBuf::from("/home/user/.agpm/cache");
99    ///
100    /// // This will block if another process has the lock
101    /// let lock = CacheLock::acquire(&cache_dir, "my-source").await?;
102    ///
103    /// // Perform cache operations safely...
104    /// println!("Lock acquired successfully!");
105    ///
106    /// // Lock is released when 'lock' variable is dropped
107    /// drop(lock);
108    /// # Ok(())
109    /// # }
110    /// ```
111    ///
112    /// Error handling for lock acquisition:
113    ///
114    /// ```rust,no_run
115    /// use agpm_cli::cache::lock::CacheLock;
116    /// use std::path::PathBuf;
117    ///
118    /// # async fn example() -> anyhow::Result<()> {
119    /// let cache_dir = PathBuf::from("/tmp/cache");
120    ///
121    /// match CacheLock::acquire(&cache_dir, "problematic-source").await {
122    ///     Ok(lock) => {
123    ///         println!("Lock acquired, proceeding with operations");
124    ///         // Use lock...
125    ///     }
126    ///     Err(e) => {
127    ///         eprintln!("Failed to acquire lock: {}", e);
128    ///         eprintln!("Another process may be using this source");
129    ///         return Err(e);
130    ///     }
131    /// }
132    /// # Ok(())
133    /// # }
134    /// ```
135    pub async fn acquire(cache_dir: &Path, source_name: &str) -> Result<Self> {
136        // Create lock file path: ~/.agpm/cache/.locks/source_name.lock
137        let locks_dir = cache_dir.join(".locks");
138        tokio::fs::create_dir_all(&locks_dir).await.map_err(|e| {
139            if e.kind() == std::io::ErrorKind::NotADirectory {
140                anyhow::anyhow!(
141                    "Cannot create directory: cache path is not a directory ({})",
142                    cache_dir.display()
143                )
144            } else if e.kind() == std::io::ErrorKind::PermissionDenied {
145                anyhow::anyhow!(
146                    "Permission denied: cannot create locks directory at {}",
147                    locks_dir.display()
148                )
149            } else if e.raw_os_error() == Some(28) {
150                // ENOSPC on Unix
151                anyhow::anyhow!("No space left on device to create locks directory")
152            } else {
153                anyhow::anyhow!("Failed to create directory {}: {}", locks_dir.display(), e)
154            }
155        })?;
156
157        let lock_path = locks_dir.join(format!("{source_name}.lock"));
158        let lock_path_clone = lock_path.clone();
159        let source_name = source_name.to_string();
160
161        // Use spawn_blocking to perform blocking file lock operations
162        // This prevents blocking the tokio runtime
163        let file = tokio::task::spawn_blocking(move || -> Result<File> {
164            // Open or create the lock file
165            let file = OpenOptions::new()
166                .create(true)
167                .write(true)
168                .truncate(true)
169                .open(&lock_path_clone)
170                .with_context(|| {
171                    format!("Failed to open lock file: {}", lock_path_clone.display())
172                })?;
173
174            // Try to acquire exclusive lock (blocking)
175            file.lock_exclusive()
176                .with_context(|| format!("Failed to acquire lock for: {source_name}"))?;
177
178            Ok(file)
179        })
180        .await
181        .context("Failed to spawn blocking task for lock acquisition")??;
182
183        Ok(Self {
184            _file: file,
185        })
186    }
187}
188
189/// Cleans up stale lock files in the cache directory.
190///
191/// This function removes lock files that are older than the specified TTL (time-to-live)
192/// in seconds. Lock files can become stale if a process crashes without properly releasing
193/// its locks. This cleanup helps prevent lock file accumulation over time.
194///
195/// # Parameters
196///
197/// * `cache_dir` - Root cache directory containing the `.locks/` subdirectory
198/// * `ttl_seconds` - Maximum age in seconds for lock files (e.g., 3600 for 1 hour)
199///
200/// # Returns
201///
202/// Returns the number of stale lock files that were removed.
203///
204/// # Example
205///
206/// ```rust,no_run
207/// use agpm_cli::cache::lock::cleanup_stale_locks;
208/// use std::path::PathBuf;
209///
210/// # async fn example() -> anyhow::Result<()> {
211/// let cache_dir = PathBuf::from("/home/user/.agpm/cache");
212/// // Clean up lock files older than 1 hour
213/// let removed = cleanup_stale_locks(&cache_dir, 3600).await?;
214/// println!("Removed {} stale lock files", removed);
215/// # Ok(())
216/// # }
217/// ```
218pub async fn cleanup_stale_locks(cache_dir: &Path, ttl_seconds: u64) -> Result<usize> {
219    use std::time::{Duration, SystemTime};
220    use tokio::fs;
221
222    let locks_dir = cache_dir.join(".locks");
223    if !locks_dir.exists() {
224        return Ok(0);
225    }
226
227    let mut removed_count = 0;
228    let now = SystemTime::now();
229    let ttl_duration = Duration::from_secs(ttl_seconds);
230
231    let mut entries = fs::read_dir(&locks_dir).await.context("Failed to read locks directory")?;
232
233    while let Some(entry) = entries.next_entry().await? {
234        let path = entry.path();
235
236        // Only process .lock files
237        if path.extension().and_then(|s| s.to_str()) != Some("lock") {
238            continue;
239        }
240
241        // Check file age
242        let metadata = match fs::metadata(&path).await {
243            Ok(m) => m,
244            Err(_) => continue, // Skip if we can't read metadata
245        };
246
247        let modified = match metadata.modified() {
248            Ok(t) => t,
249            Err(_) => continue, // Skip if we can't get modification time
250        };
251
252        // Remove if older than TTL
253        if let Ok(age) = now.duration_since(modified)
254            && age > ttl_duration
255        {
256            // Try to remove the file (it might be locked by another process)
257            if fs::remove_file(&path).await.is_ok() {
258                removed_count += 1;
259            }
260        }
261    }
262
263    Ok(removed_count)
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::{Path, PathBuf};
    use std::sync::Arc;
    use std::time::{Duration, Instant};
    use tempfile::TempDir;
    use tokio::sync::Barrier;

    /// Location where `CacheLock::acquire` is expected to put the lock file for `source`.
    fn lock_file_in(cache_dir: &Path, source: &str) -> PathBuf {
        cache_dir.join(".locks").join(format!("{source}.lock"))
    }

    /// Spawns a task that takes the lock for `source`, signals `ready` once it holds the
    /// lock, keeps holding it for 100ms and then releases it by dropping the guard.
    fn spawn_lock_holder(
        cache_dir: PathBuf,
        source: &'static str,
        ready: Arc<Barrier>,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let _guard = CacheLock::acquire(&cache_dir, source).await.unwrap();
            ready.wait().await;
            tokio::time::sleep(Duration::from_millis(100)).await;
        })
    }

    /// The lock file appears on acquire and is left on disk after the guard is dropped.
    #[tokio::test]
    async fn test_cache_lock_acquire_and_release() {
        let tmp = TempDir::new().unwrap();
        let lock_file = lock_file_in(tmp.path(), "test_source");

        let guard = CacheLock::acquire(tmp.path(), "test_source").await.unwrap();
        assert!(lock_file.exists());

        // Releasing the lock must not delete the file.
        drop(guard);
        assert!(lock_file.exists());
    }

    /// Acquiring a lock creates the `.locks` directory when it is missing.
    #[tokio::test]
    async fn test_cache_lock_creates_locks_directory() {
        let tmp = TempDir::new().unwrap();
        let locks_dir = tmp.path().join(".locks");
        assert!(!locks_dir.exists());

        let guard = CacheLock::acquire(tmp.path(), "test").await.unwrap();

        assert!(locks_dir.exists());
        assert!(locks_dir.is_dir());

        // Close the file handle before the temporary directory is cleaned up.
        drop(guard);
    }

    /// A second acquirer of the same source has to wait for the first one to release.
    #[tokio::test]
    async fn test_cache_lock_exclusive_blocking() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path().to_path_buf();
        let ready = Arc::new(Barrier::new(2));

        let holder = spawn_lock_holder(root.clone(), "exclusive_test", Arc::clone(&ready));

        let waiter = tokio::spawn(async move {
            // Start timing only once the holder really owns the lock.
            ready.wait().await;
            let started = Instant::now();
            let _guard = CacheLock::acquire(&root, "exclusive_test").await.unwrap();
            let waited = started.elapsed();

            // The holder keeps the lock for 100ms; allow generous slack for timing jitter.
            assert!(waited >= Duration::from_millis(50));
        });

        holder.await.unwrap();
        waiter.await.unwrap();
    }

    /// Locks for different sources are independent and do not wait on each other.
    #[tokio::test]
    async fn test_cache_lock_different_sources_dont_block() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path().to_path_buf();
        let ready = Arc::new(Barrier::new(2));

        let holder = spawn_lock_holder(root.clone(), "source1", Arc::clone(&ready));

        let other = tokio::spawn(async move {
            ready.wait().await;
            let started = Instant::now();
            let _guard = CacheLock::acquire(&root, "source2").await.unwrap();
            let elapsed = started.elapsed();

            // Generous bound for slow machines, still well apart from a blocked acquire.
            assert!(
                elapsed < Duration::from_millis(200),
                "Lock acquisition took {:?}, expected < 200ms for non-blocking operation",
                elapsed
            );
        });

        holder.await.unwrap();
        other.await.unwrap();
    }

    /// Source names containing punctuation map directly onto lock file names.
    #[tokio::test]
    async fn test_cache_lock_path_with_special_characters() {
        let tmp = TempDir::new().unwrap();

        for name in [
            "source-with-dash",
            "source_with_underscore",
            "source.with.dots",
            "source@special",
        ] {
            let guard = CacheLock::acquire(tmp.path(), name).await.unwrap();
            assert!(lock_file_in(tmp.path(), name).exists());
            drop(guard);
        }
    }
}
410}