cuenv_ci/executor/
lock.rs

1//! Concurrency Control
2//!
//! Provides file-based locking for task concurrency groups to ensure
//! serialized execution of deployment tasks.
5
6use std::fs::{self, File, OpenOptions};
7use std::io::{self, Read, Write};
8use std::path::{Path, PathBuf};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10use thiserror::Error;
11
/// How long an acquisition waits by default before giving up (5 minutes).
pub const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Default age beyond which an existing lock is treated as stale (10 minutes).
pub const STALE_LOCK_THRESHOLD: Duration = Duration::from_secs(10 * 60);

/// Delay between two consecutive acquisition attempts while waiting.
const POLL_INTERVAL: Duration = Duration::from_millis(500);
20
21/// Errors for lock operations
22#[derive(Debug, Error)]
23pub enum LockError {
24    /// Lock acquisition timed out
25    #[error("Lock acquisition timed out for group '{group}' after {timeout_secs}s")]
26    Timeout { group: String, timeout_secs: u64 },
27
28    /// Lock file IO error
29    #[error("Lock file error for group '{group}': {source}")]
30    Io {
31        group: String,
32        #[source]
33        source: io::Error,
34    },
35
36    /// Lock directory creation failed
37    #[error("Failed to create lock directory: {0}")]
38    DirectoryCreation(io::Error),
39
40    /// Lock is held by another process
41    #[error("Lock held by process {pid} (acquired {age_secs}s ago)")]
42    HeldByOther { pid: u32, age_secs: u64 },
43}
44
/// Information about the current holder, persisted inside the lock file.
#[derive(Clone, Debug)]
pub struct LockMetadata {
    /// PID of the process that created the lock file.
    pub pid: u32,
    /// Acquisition time, in seconds since the UNIX epoch.
    pub acquired_at: u64,
    /// Identifier of the task that took the lock.
    pub task_id: String,
}

impl LockMetadata {
    /// Encodes the metadata as `pid:acquired_at:task_id`.
    fn serialize(&self) -> String {
        let Self {
            pid,
            acquired_at,
            task_id,
        } = self;
        format!("{}:{}:{}", pid, acquired_at, task_id)
    }

    /// Decodes the `pid:acquired_at:task_id` form produced by `serialize`.
    ///
    /// Only the first two `:` act as separators, so the task ID may itself
    /// contain colons. Returns `None` when a field is missing or a numeric
    /// field does not parse.
    fn deserialize(s: &str) -> Option<Self> {
        let mut fields = s.splitn(3, ':');
        let pid_field = fields.next()?;
        let acquired_field = fields.next()?;
        let task_field = fields.next()?;

        let pid = pid_field.parse().ok()?;
        let acquired_at = acquired_field.parse().ok()?;
        Some(Self {
            pid,
            acquired_at,
            task_id: task_field.to_string(),
        })
    }
}
73
74/// Configuration for concurrency lock
75#[derive(Debug, Clone)]
76pub struct LockConfig {
77    /// Directory to store lock files
78    pub lock_dir: PathBuf,
79    /// Maximum time to wait for lock acquisition
80    pub timeout: Duration,
81    /// Threshold for considering a lock stale
82    pub stale_threshold: Duration,
83}
84
85impl Default for LockConfig {
86    fn default() -> Self {
87        Self {
88            lock_dir: PathBuf::from(".cuenv/locks"),
89            timeout: DEFAULT_LOCK_TIMEOUT,
90            stale_threshold: STALE_LOCK_THRESHOLD,
91        }
92    }
93}
94
95/// Concurrency lock manager
96///
97/// Provides file-based locking for concurrency groups. Locks are automatically
98/// released when the guard is dropped.
99///
100/// # Stale Lock Detection
101///
102/// Locks are considered stale if their age (based on the `acquired_at` timestamp
103/// stored in the lock file) exceeds the configured `stale_threshold`. This approach
104/// has a limitation: if the lock holder process crashes without releasing the lock,
105/// the staleness is determined by the original acquisition time, not by process
106/// liveness. This means a lock may be held longer than expected if the holder
107/// process hangs but doesn't crash.
108///
109/// For truly distributed scenarios, consider integrating with a proper distributed
110/// lock service (e.g., etcd, Redis, or cloud-native locking APIs).
111#[derive(Debug)]
112pub struct ConcurrencyLock {
113    config: LockConfig,
114}
115
116impl ConcurrencyLock {
117    /// Create a new lock manager with default configuration
118    #[must_use]
119    pub fn new() -> Self {
120        Self {
121            config: LockConfig::default(),
122        }
123    }
124
125    /// Create a lock manager with custom configuration
126    #[must_use]
127    pub fn with_config(config: LockConfig) -> Self {
128        Self { config }
129    }
130
131    /// Set the lock directory
132    #[must_use]
133    pub fn with_lock_dir(mut self, dir: impl Into<PathBuf>) -> Self {
134        self.config.lock_dir = dir.into();
135        self
136    }
137
138    /// Set the lock acquisition timeout
139    #[must_use]
140    pub fn with_timeout(mut self, timeout: Duration) -> Self {
141        self.config.timeout = timeout;
142        self
143    }
144
145    /// Acquire a lock for the given concurrency group
146    ///
147    /// Blocks until the lock is acquired or timeout is reached.
148    ///
149    /// # Arguments
150    /// * `group` - Concurrency group name
151    /// * `task_id` - Task ID acquiring the lock (for diagnostics)
152    ///
153    /// # Errors
154    /// Returns error if lock cannot be acquired within timeout
155    pub async fn acquire(&self, group: &str, task_id: &str) -> Result<LockGuard, LockError> {
156        // Ensure lock directory exists
157        fs::create_dir_all(&self.config.lock_dir).map_err(LockError::DirectoryCreation)?;
158
159        let lock_path = self.lock_path(group);
160        let start = Instant::now();
161        let pid = std::process::id();
162        let metadata = LockMetadata {
163            pid,
164            acquired_at: current_timestamp(),
165            task_id: task_id.to_string(),
166        };
167
168        loop {
169            // Try to acquire the lock
170            match Self::try_acquire(&lock_path, &metadata) {
171                Ok(()) => {
172                    tracing::info!(
173                        group = %group,
174                        task = %task_id,
175                        "Acquired concurrency lock"
176                    );
177                    return Ok(LockGuard {
178                        lock_path,
179                        group: group.to_string(),
180                    });
181                }
182                Err(LockError::HeldByOther { pid, age_secs }) => {
183                    // Check if lock is stale
184                    if Duration::from_secs(age_secs) > self.config.stale_threshold {
185                        tracing::warn!(
186                            group = %group,
187                            holder_pid = pid,
188                            age_secs = age_secs,
189                            "Breaking stale lock"
190                        );
191                        // Remove stale lock and retry
192                        let _ = fs::remove_file(&lock_path);
193                        continue;
194                    }
195
196                    // Check timeout
197                    if start.elapsed() >= self.config.timeout {
198                        return Err(LockError::Timeout {
199                            group: group.to_string(),
200                            timeout_secs: self.config.timeout.as_secs(),
201                        });
202                    }
203
204                    tracing::debug!(
205                        group = %group,
206                        holder_pid = pid,
207                        "Lock held by another process, waiting..."
208                    );
209                }
210                Err(e) => return Err(e),
211            }
212
213            // Wait before retrying
214            tokio::time::sleep(POLL_INTERVAL).await;
215        }
216    }
217
218    /// Try to acquire lock without blocking
219    ///
220    /// # Errors
221    ///
222    /// Returns `LockError` if lock cannot be acquired or lock directory cannot be created.
223    pub fn try_acquire_sync(&self, group: &str, task_id: &str) -> Result<LockGuard, LockError> {
224        fs::create_dir_all(&self.config.lock_dir).map_err(LockError::DirectoryCreation)?;
225
226        let lock_path = self.lock_path(group);
227        let metadata = LockMetadata {
228            pid: std::process::id(),
229            acquired_at: current_timestamp(),
230            task_id: task_id.to_string(),
231        };
232
233        Self::try_acquire(&lock_path, &metadata)?;
234
235        Ok(LockGuard {
236            lock_path,
237            group: group.to_string(),
238        })
239    }
240
241    /// Check if a lock is currently held
242    #[must_use]
243    pub fn is_locked(&self, group: &str) -> bool {
244        let lock_path = self.lock_path(group);
245        lock_path.exists()
246    }
247
248    /// Get information about current lock holder
249    #[must_use]
250    pub fn lock_info(&self, group: &str) -> Option<LockMetadata> {
251        let lock_path = self.lock_path(group);
252        read_lock_metadata(&lock_path)
253    }
254
255    fn lock_path(&self, group: &str) -> PathBuf {
256        // Sanitize group name for filesystem
257        let safe_name: String = group
258            .chars()
259            .map(|c| {
260                if c.is_alphanumeric() || c == '-' || c == '_' {
261                    c
262                } else {
263                    '_'
264                }
265            })
266            .collect();
267        self.config.lock_dir.join(format!("{safe_name}.lock"))
268    }
269
270    fn try_acquire(lock_path: &Path, metadata: &LockMetadata) -> Result<(), LockError> {
271        // Try to create lock file exclusively
272        match OpenOptions::new()
273            .write(true)
274            .create_new(true)
275            .open(lock_path)
276        {
277            Ok(mut file) => {
278                // Write metadata
279                file.write_all(metadata.serialize().as_bytes())
280                    .map_err(|e| LockError::Io {
281                        group: metadata.task_id.clone(),
282                        source: e,
283                    })?;
284                Ok(())
285            }
286            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
287                // Lock exists, check who holds it
288                if let Some(existing) = read_lock_metadata(lock_path) {
289                    let now = current_timestamp();
290                    let age_secs = now.saturating_sub(existing.acquired_at);
291                    Err(LockError::HeldByOther {
292                        pid: existing.pid,
293                        age_secs,
294                    })
295                } else {
296                    // Can't read lock file, assume it's invalid and remove
297                    let _ = fs::remove_file(lock_path);
298                    Err(LockError::HeldByOther {
299                        pid: 0,
300                        age_secs: 0,
301                    })
302                }
303            }
304            Err(e) => Err(LockError::Io {
305                group: metadata.task_id.clone(),
306                source: e,
307            }),
308        }
309    }
310}
311
312impl Default for ConcurrencyLock {
313    fn default() -> Self {
314        Self::new()
315    }
316}
317
318/// Guard that releases lock when dropped
319#[derive(Debug)]
320pub struct LockGuard {
321    lock_path: PathBuf,
322    group: String,
323}
324
325impl LockGuard {
326    /// Get the concurrency group name
327    #[must_use]
328    pub fn group(&self) -> &str {
329        &self.group
330    }
331
332    /// Explicitly release the lock
333    pub fn release(self) {
334        // Drop will handle it
335        drop(self);
336    }
337}
338
339impl Drop for LockGuard {
340    fn drop(&mut self) {
341        if let Err(e) = fs::remove_file(&self.lock_path) {
342            if e.kind() != io::ErrorKind::NotFound {
343                tracing::warn!(
344                    group = %self.group,
345                    error = %e,
346                    "Failed to release lock"
347                );
348            }
349        } else {
350            tracing::debug!(group = %self.group, "Released concurrency lock");
351        }
352    }
353}
354
355fn read_lock_metadata(path: &Path) -> Option<LockMetadata> {
356    let mut file = File::open(path).ok()?;
357    let mut contents = String::new();
358    file.read_to_string(&mut contents).ok()?;
359    LockMetadata::deserialize(&contents)
360}
361
/// Current wall-clock time in whole seconds since the UNIX epoch.
///
/// A clock set before the epoch is reported as 0 rather than treated as an
/// error.
fn current_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_secs())
}
370
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Builds a lock manager whose lock files live inside `tmp`.
    fn lock_in(tmp: &TempDir) -> ConcurrencyLock {
        ConcurrencyLock::new().with_lock_dir(tmp.path())
    }

    #[test]
    fn test_lock_metadata_serialization() {
        let metadata = LockMetadata {
            pid: 12345,
            acquired_at: 1_234_567_890,
            task_id: "test-task".to_string(),
        };

        let serialized = metadata.serialize();
        let deserialized = LockMetadata::deserialize(&serialized).unwrap();

        assert_eq!(deserialized.pid, 12345);
        assert_eq!(deserialized.acquired_at, 1_234_567_890);
        assert_eq!(deserialized.task_id, "test-task");
    }

    #[test]
    fn test_lock_metadata_task_id_with_colons() {
        let metadata = LockMetadata {
            pid: 1,
            acquired_at: 2,
            task_id: "deploy:prod:v1".to_string(),
        };

        let serialized = metadata.serialize();
        assert_eq!(serialized, "1:2:deploy:prod:v1");

        // Only the first two colons separate fields.
        let deserialized = LockMetadata::deserialize(&serialized).unwrap();
        assert_eq!(deserialized.task_id, "deploy:prod:v1");
    }

    #[test]
    fn test_lock_metadata_rejects_malformed_input() {
        assert!(LockMetadata::deserialize("").is_none());
        assert!(LockMetadata::deserialize("1:2").is_none());
        assert!(LockMetadata::deserialize("not-a-pid:2:task").is_none());
        assert!(LockMetadata::deserialize("1:not-a-time:task").is_none());
    }

    #[test]
    fn test_lock_acquisition_sync() {
        let tmp = TempDir::new().unwrap();
        let lock = lock_in(&tmp);

        // First acquisition should succeed
        let guard1 = lock.try_acquire_sync("test-group", "task1").unwrap();
        assert!(lock.is_locked("test-group"));

        // Second acquisition should fail
        let result = lock.try_acquire_sync("test-group", "task2");
        assert!(matches!(result, Err(LockError::HeldByOther { .. })));

        // Release first lock
        drop(guard1);
        assert!(!lock.is_locked("test-group"));

        // Now we can acquire again
        let _guard2 = lock.try_acquire_sync("test-group", "task2").unwrap();
        assert!(lock.is_locked("test-group"));
    }

    #[test]
    fn test_different_groups() {
        let tmp = TempDir::new().unwrap();
        let lock = lock_in(&tmp);

        let _guard1 = lock.try_acquire_sync("group-a", "task1").unwrap();
        let _guard2 = lock.try_acquire_sync("group-b", "task2").unwrap();

        assert!(lock.is_locked("group-a"));
        assert!(lock.is_locked("group-b"));
    }

    #[test]
    fn test_lock_info() {
        let tmp = TempDir::new().unwrap();
        let lock = lock_in(&tmp);

        let _guard = lock.try_acquire_sync("test-group", "my-task").unwrap();

        let info = lock.lock_info("test-group").unwrap();
        assert_eq!(info.task_id, "my-task");
        assert_eq!(info.pid, std::process::id());
    }

    #[test]
    fn test_lock_info_absent_when_unlocked() {
        let tmp = TempDir::new().unwrap();
        let lock = lock_in(&tmp);

        assert!(!lock.is_locked("never-locked"));
        assert!(lock.lock_info("never-locked").is_none());
    }

    #[test]
    fn test_group_name_sanitization() {
        let tmp = TempDir::new().unwrap();
        let lock = lock_in(&tmp);

        // Group with special characters
        let _guard = lock
            .try_acquire_sync("production/deploy:v1", "task1")
            .unwrap();

        // Lock file should exist with sanitized name
        let lock_path = tmp.path().join("production_deploy_v1.lock");
        assert!(lock_path.exists());
    }

    #[tokio::test]
    async fn test_async_acquisition() {
        let tmp = TempDir::new().unwrap();
        let lock = lock_in(&tmp).with_timeout(Duration::from_secs(1));

        let guard = lock.acquire("async-group", "task1").await.unwrap();
        assert!(lock.is_locked("async-group"));
        drop(guard);
    }

    #[tokio::test]
    async fn test_timeout() {
        let tmp = TempDir::new().unwrap();
        let lock = lock_in(&tmp).with_timeout(Duration::from_millis(100));

        // Hold the lock
        let _guard = lock.try_acquire_sync("timeout-group", "holder").unwrap();

        // Try to acquire with short timeout
        let result = lock.acquire("timeout-group", "waiter").await;
        assert!(matches!(result, Err(LockError::Timeout { .. })));
    }

    #[test]
    fn test_lock_release_on_drop() {
        let tmp = TempDir::new().unwrap();
        let lock = lock_in(&tmp);

        {
            let _guard = lock.try_acquire_sync("drop-test", "task1").unwrap();
            assert!(lock.is_locked("drop-test"));
        }

        // Lock should be released after guard is dropped
        assert!(!lock.is_locked("drop-test"));
    }

    #[test]
    fn test_explicit_release() {
        let tmp = TempDir::new().unwrap();
        let lock = lock_in(&tmp);

        let guard = lock.try_acquire_sync("release-test", "task1").unwrap();
        assert_eq!(guard.group(), "release-test");
        assert!(lock.is_locked("release-test"));

        guard.release();
        assert!(!lock.is_locked("release-test"));
    }
}
493}