agpm_cli/cache/lock.rs
1//! File locking utilities for cache operations.
2//!
3//! This module provides thread-safe and process-safe file locking for cache directories
4//! to prevent corruption during concurrent cache operations. The locks are automatically
5//! released when the lock object is dropped.
6
7use anyhow::{Context, Result};
8use fs4::fs_std::FileExt;
9use std::fs::{File, OpenOptions};
10use std::path::Path;
11
/// RAII guard for an exclusive, cross-process lock on one cache source.
///
/// Obtained from [`CacheLock::acquire`]. The guard owns the open lock-file handle;
/// the lock stays held for as long as that handle is open and is released when the
/// guard is dropped. The lock file itself is left on disk after release.
#[derive(Debug)]
#[must_use = "the cache lock is released as soon as this guard is dropped"]
pub struct CacheLock {
    /// Open handle to `{cache_dir}/.locks/{source_name}.lock`; keeping it open keeps the lock.
    _file: File,
}
16
17impl CacheLock {
18 /// Acquires an exclusive lock for a specific source in the cache directory.
19 ///
20 /// This async method creates and acquires an exclusive file lock for the specified
21 /// source name. The file locking operation uses `spawn_blocking` internally to avoid
22 /// blocking the tokio runtime, while still providing blocking file lock semantics.
23 ///
24 /// # Lock File Management
25 ///
26 /// The method performs several setup operations:
27 /// 1. **Locks directory creation**: Creates `.locks/` directory if needed
28 /// 2. **Lock file creation**: Creates `{source_name}.lock` file
29 /// 3. **Exclusive locking**: Acquires exclusive access via OS file locking
30 /// 4. **Handle retention**: Keeps file handle open to maintain lock
31 ///
32 /// # Async and Blocking Behavior
33 ///
34 /// If another process already holds a lock for the same source:
35 /// - **Async-friendly**: Uses `spawn_blocking` to avoid blocking the tokio runtime
36 /// - **Blocking wait**: The spawned task blocks until other lock is released
37 /// - **Fair queuing**: Locks are typically acquired in FIFO order
38 /// - **No timeout**: Task will wait indefinitely (use with caution)
39 /// - **Interruptible**: Can be interrupted by process signals
40 ///
41 /// # Lock File Location
42 ///
43 /// Lock files are created in a dedicated subdirectory:
44 /// ```text
45 /// {cache_dir}/.locks/{source_name}.lock
46 /// ```
47 ///
48 /// Examples:
49 /// - `~/.agpm/cache/.locks/community.lock`
50 /// - `~/.agpm/cache/.locks/work-tools.lock`
51 /// - `~/.agpm/cache/.locks/my-project.lock`
52 ///
53 /// # Parameters
54 ///
55 /// * `cache_dir` - Root cache directory path
56 /// * `source_name` - Unique identifier for the source being locked
57 ///
58 /// # Returns
59 ///
60 /// Returns a `CacheLock` instance that holds the exclusive lock. The lock
61 /// remains active until the returned instance is dropped.
62 ///
63 /// # Errors
64 ///
65 /// The method can fail for several reasons:
66 ///
67 /// ## Directory Creation Errors
68 /// - Permission denied creating `.locks/` directory
69 /// - Disk space exhausted
70 /// - Path length exceeds system limits
71 ///
72 /// ## File Operation Errors
73 /// - Permission denied creating/opening lock file
74 /// - File system full
75 /// - Invalid characters in source name
76 ///
77 /// ## Locking Errors
78 /// - File locking not supported by file system
79 /// - Lock file corrupted or in invalid state
80 /// - System resource limits exceeded
81 ///
82 /// # Platform Considerations
83 ///
84 /// - **Windows**: Uses Win32 `LockFile` API via [`fs4`]
85 /// - **Unix**: Uses POSIX `fcntl()` locking via [`fs4`]
86 /// - **NFS/Network**: Behavior depends on file system support
87 /// - **Docker**: Works within containers with proper volume mounts
88 ///
89 /// # Examples
90 ///
91 /// Simple lock acquisition:
92 ///
93 /// ```rust,no_run
94 /// use agpm_cli::cache::lock::CacheLock;
95 /// use std::path::PathBuf;
96 ///
97 /// # async fn example() -> anyhow::Result<()> {
98 /// let cache_dir = PathBuf::from("/home/user/.agpm/cache");
99 ///
100 /// // This will block if another process has the lock
101 /// let lock = CacheLock::acquire(&cache_dir, "my-source").await?;
102 ///
103 /// // Perform cache operations safely...
104 /// println!("Lock acquired successfully!");
105 ///
106 /// // Lock is released when 'lock' variable is dropped
107 /// drop(lock);
108 /// # Ok(())
109 /// # }
110 /// ```
111 ///
112 /// Error handling for lock acquisition:
113 ///
114 /// ```rust,no_run
115 /// use agpm_cli::cache::lock::CacheLock;
116 /// use std::path::PathBuf;
117 ///
118 /// # async fn example() -> anyhow::Result<()> {
119 /// let cache_dir = PathBuf::from("/tmp/cache");
120 ///
121 /// match CacheLock::acquire(&cache_dir, "problematic-source").await {
122 /// Ok(lock) => {
123 /// println!("Lock acquired, proceeding with operations");
124 /// // Use lock...
125 /// }
126 /// Err(e) => {
127 /// eprintln!("Failed to acquire lock: {}", e);
128 /// eprintln!("Another process may be using this source");
129 /// return Err(e);
130 /// }
131 /// }
132 /// # Ok(())
133 /// # }
134 /// ```
135 pub async fn acquire(cache_dir: &Path, source_name: &str) -> Result<Self> {
136 // Create lock file path: ~/.agpm/cache/.locks/source_name.lock
137 let locks_dir = cache_dir.join(".locks");
138 tokio::fs::create_dir_all(&locks_dir).await.map_err(|e| {
139 if e.kind() == std::io::ErrorKind::NotADirectory {
140 anyhow::anyhow!(
141 "Cannot create directory: cache path is not a directory ({})",
142 cache_dir.display()
143 )
144 } else if e.kind() == std::io::ErrorKind::PermissionDenied {
145 anyhow::anyhow!(
146 "Permission denied: cannot create locks directory at {}",
147 locks_dir.display()
148 )
149 } else if e.raw_os_error() == Some(28) {
150 // ENOSPC on Unix
151 anyhow::anyhow!("No space left on device to create locks directory")
152 } else {
153 anyhow::anyhow!("Failed to create directory {}: {}", locks_dir.display(), e)
154 }
155 })?;
156
157 let lock_path = locks_dir.join(format!("{source_name}.lock"));
158 let lock_path_clone = lock_path.clone();
159 let source_name = source_name.to_string();
160
161 // Use spawn_blocking to perform blocking file lock operations
162 // This prevents blocking the tokio runtime
163 let file = tokio::task::spawn_blocking(move || -> Result<File> {
164 // Open or create the lock file
165 let file = OpenOptions::new()
166 .create(true)
167 .write(true)
168 .truncate(true)
169 .open(&lock_path_clone)
170 .with_context(|| {
171 format!("Failed to open lock file: {}", lock_path_clone.display())
172 })?;
173
174 // Try to acquire exclusive lock (blocking)
175 file.lock_exclusive()
176 .with_context(|| format!("Failed to acquire lock for: {source_name}"))?;
177
178 Ok(file)
179 })
180 .await
181 .context("Failed to spawn blocking task for lock acquisition")??;
182
183 Ok(Self {
184 _file: file,
185 })
186 }
187}
188
189/// Cleans up stale lock files in the cache directory.
190///
191/// This function removes lock files that are older than the specified TTL (time-to-live)
192/// in seconds. Lock files can become stale if a process crashes without properly releasing
193/// its locks. This cleanup helps prevent lock file accumulation over time.
194///
195/// # Parameters
196///
197/// * `cache_dir` - Root cache directory containing the `.locks/` subdirectory
198/// * `ttl_seconds` - Maximum age in seconds for lock files (e.g., 3600 for 1 hour)
199///
200/// # Returns
201///
202/// Returns the number of stale lock files that were removed.
203///
204/// # Example
205///
206/// ```rust,no_run
207/// use agpm_cli::cache::lock::cleanup_stale_locks;
208/// use std::path::PathBuf;
209///
210/// # async fn example() -> anyhow::Result<()> {
211/// let cache_dir = PathBuf::from("/home/user/.agpm/cache");
212/// // Clean up lock files older than 1 hour
213/// let removed = cleanup_stale_locks(&cache_dir, 3600).await?;
214/// println!("Removed {} stale lock files", removed);
215/// # Ok(())
216/// # }
217/// ```
218pub async fn cleanup_stale_locks(cache_dir: &Path, ttl_seconds: u64) -> Result<usize> {
219 use std::time::{Duration, SystemTime};
220 use tokio::fs;
221
222 let locks_dir = cache_dir.join(".locks");
223 if !locks_dir.exists() {
224 return Ok(0);
225 }
226
227 let mut removed_count = 0;
228 let now = SystemTime::now();
229 let ttl_duration = Duration::from_secs(ttl_seconds);
230
231 let mut entries = fs::read_dir(&locks_dir).await.context("Failed to read locks directory")?;
232
233 while let Some(entry) = entries.next_entry().await? {
234 let path = entry.path();
235
236 // Only process .lock files
237 if path.extension().and_then(|s| s.to_str()) != Some("lock") {
238 continue;
239 }
240
241 // Check file age
242 let metadata = match fs::metadata(&path).await {
243 Ok(m) => m,
244 Err(_) => continue, // Skip if we can't read metadata
245 };
246
247 let modified = match metadata.modified() {
248 Ok(t) => t,
249 Err(_) => continue, // Skip if we can't get modification time
250 };
251
252 // Remove if older than TTL
253 if let Ok(age) = now.duration_since(modified)
254 && age > ttl_duration
255 {
256 // Try to remove the file (it might be locked by another process)
257 if fs::remove_file(&path).await.is_ok() {
258 removed_count += 1;
259 }
260 }
261 }
262
263 Ok(removed_count)
264}
265
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Acquiring creates `.locks/<name>.lock`; releasing the guard keeps the file.
    #[tokio::test]
    async fn test_cache_lock_acquire_and_release() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();
        let expected = root.join(".locks").join("test_source.lock");

        let guard = CacheLock::acquire(root, "test_source").await.unwrap();
        assert!(expected.exists());

        drop(guard);
        // Only the OS lock is released; the lock file is intentionally kept.
        assert!(expected.exists());
    }

    /// The `.locks/` directory is created on demand by the first acquisition.
    #[tokio::test]
    async fn test_cache_lock_creates_locks_directory() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();
        let locks = root.join(".locks");
        assert!(!locks.exists());

        let _guard = CacheLock::acquire(root, "test").await.unwrap();

        assert!(locks.exists());
        assert!(locks.is_dir());
    }

    /// A second acquisition of the same source waits for the first guard to drop.
    #[tokio::test]
    async fn test_cache_lock_exclusive_blocking() {
        use std::sync::Arc;
        use std::time::{Duration, Instant};
        use tokio::sync::Barrier;

        let tmp = TempDir::new().unwrap();
        let root = Arc::new(tmp.path().to_path_buf());
        let gate = Arc::new(Barrier::new(2));

        // Holder: take the lock, signal the contender, keep the lock ~100ms.
        let holder = {
            let root = Arc::clone(&root);
            let gate = Arc::clone(&gate);
            tokio::spawn(async move {
                let _held = CacheLock::acquire(&root, "exclusive_test").await.unwrap();
                gate.wait().await;
                tokio::time::sleep(Duration::from_millis(100)).await;
                // `_held` drops here, releasing the lock.
            })
        };

        // Contender: once the holder has signalled, the same lock must make it wait.
        let contender = {
            let root = Arc::clone(&root);
            tokio::spawn(async move {
                gate.wait().await;
                let t0 = Instant::now();
                let _held = CacheLock::acquire(&root, "exclusive_test").await.unwrap();
                let waited = t0.elapsed();

                // Most of the holder's 100ms should be observed; 50ms leaves slack.
                assert!(waited >= Duration::from_millis(50));
            })
        };

        holder.await.unwrap();
        contender.await.unwrap();
    }

    /// Locks on different sources are independent and do not wait on each other.
    #[tokio::test]
    async fn test_cache_lock_different_sources_dont_block() {
        use std::sync::Arc;
        use std::time::{Duration, Instant};
        use tokio::sync::Barrier;

        let tmp = TempDir::new().unwrap();
        let root = Arc::new(tmp.path().to_path_buf());
        let gate = Arc::new(Barrier::new(2));

        let holder = {
            let root = Arc::clone(&root);
            let gate = Arc::clone(&gate);
            tokio::spawn(async move {
                let _held = CacheLock::acquire(&root, "source1").await.unwrap();
                gate.wait().await;
                tokio::time::sleep(Duration::from_millis(100)).await;
            })
        };

        let other = {
            let root = Arc::clone(&root);
            tokio::spawn(async move {
                gate.wait().await;
                let t0 = Instant::now();
                let _held = CacheLock::acquire(&root, "source2").await.unwrap();
                let waited = t0.elapsed();

                // Generous bound for slow CI machines while still catching real blocking.
                assert!(
                    waited < Duration::from_millis(200),
                    "Lock acquisition took {:?}, expected < 200ms for non-blocking operation",
                    waited
                );
            })
        };

        holder.await.unwrap();
        other.await.unwrap();
    }

    /// Source names with punctuation map directly onto `<name>.lock` file names.
    #[tokio::test]
    async fn test_cache_lock_path_with_special_characters() {
        let tmp = TempDir::new().unwrap();
        let root = tmp.path();

        for name in [
            "source-with-dash",
            "source_with_underscore",
            "source.with.dots",
            "source@special",
        ] {
            let guard = CacheLock::acquire(root, name).await.unwrap();
            let lock_file = root.join(".locks").join(format!("{name}.lock"));
            assert!(lock_file.exists());
            drop(guard);
        }
    }
}