1use std::fs::{self, File, OpenOptions};
7use std::io::{self, Read, Write};
8use std::path::{Path, PathBuf};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10use thiserror::Error;
11
/// Default upper bound on how long [`ConcurrencyLock::acquire`] keeps waiting: five minutes.
pub const DEFAULT_LOCK_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Default age after which a held lock is treated as abandoned and may be broken: ten minutes.
pub const STALE_LOCK_THRESHOLD: Duration = Duration::from_secs(10 * 60);

/// Delay between acquisition attempts while a lock is held by someone else.
const POLL_INTERVAL: Duration = Duration::from_millis(500);
20
21#[derive(Debug, Error)]
23pub enum LockError {
24 #[error("Lock acquisition timed out for group '{group}' after {timeout_secs}s")]
26 Timeout { group: String, timeout_secs: u64 },
27
28 #[error("Lock file error for group '{group}': {source}")]
30 Io {
31 group: String,
32 #[source]
33 source: io::Error,
34 },
35
36 #[error("Failed to create lock directory: {0}")]
38 DirectoryCreation(io::Error),
39
40 #[error("Lock held by process {pid} (acquired {age_secs}s ago)")]
42 HeldByOther { pid: u32, age_secs: u64 },
43}
44
/// Contents of a lock file: which process holds the lock, since when, and for which task.
#[derive(Debug, Clone)]
pub struct LockMetadata {
    /// Process id of the holder, as reported by `std::process::id()`.
    pub pid: u32,
    /// Acquisition time, in whole seconds since the Unix epoch.
    pub acquired_at: u64,
    /// Caller-supplied identifier of the task holding the lock.
    pub task_id: String,
}
55
56impl LockMetadata {
57 fn serialize(&self) -> String {
58 format!("{}:{}:{}", self.pid, self.acquired_at, self.task_id)
59 }
60
61 fn deserialize(s: &str) -> Option<Self> {
62 let parts: Vec<&str> = s.splitn(3, ':').collect();
63 if parts.len() != 3 {
64 return None;
65 }
66 Some(Self {
67 pid: parts[0].parse().ok()?,
68 acquired_at: parts[1].parse().ok()?,
69 task_id: parts[2].to_string(),
70 })
71 }
72}
73
/// Tunable settings for a [`ConcurrencyLock`].
#[derive(Debug, Clone)]
pub struct LockConfig {
    /// Directory holding one `<group>.lock` file per concurrency group; created on demand.
    pub lock_dir: PathBuf,
    /// How long `ConcurrencyLock::acquire` waits for a held lock before giving up.
    pub timeout: Duration,
    /// A lock older than this is considered abandoned and may be broken by `acquire`.
    pub stale_threshold: Duration,
}
84
85impl Default for LockConfig {
86 fn default() -> Self {
87 Self {
88 lock_dir: PathBuf::from(".cuenv/locks"),
89 timeout: DEFAULT_LOCK_TIMEOUT,
90 stale_threshold: STALE_LOCK_THRESHOLD,
91 }
92 }
93}
94
95#[derive(Debug)]
112pub struct ConcurrencyLock {
113 config: LockConfig,
114}
115
116impl ConcurrencyLock {
117 #[must_use]
119 pub fn new() -> Self {
120 Self {
121 config: LockConfig::default(),
122 }
123 }
124
125 #[must_use]
127 pub fn with_config(config: LockConfig) -> Self {
128 Self { config }
129 }
130
131 #[must_use]
133 pub fn with_lock_dir(mut self, dir: impl Into<PathBuf>) -> Self {
134 self.config.lock_dir = dir.into();
135 self
136 }
137
138 #[must_use]
140 pub fn with_timeout(mut self, timeout: Duration) -> Self {
141 self.config.timeout = timeout;
142 self
143 }
144
145 pub async fn acquire(&self, group: &str, task_id: &str) -> Result<LockGuard, LockError> {
156 fs::create_dir_all(&self.config.lock_dir).map_err(LockError::DirectoryCreation)?;
158
159 let lock_path = self.lock_path(group);
160 let start = Instant::now();
161 let pid = std::process::id();
162 let metadata = LockMetadata {
163 pid,
164 acquired_at: current_timestamp(),
165 task_id: task_id.to_string(),
166 };
167
168 loop {
169 match Self::try_acquire(&lock_path, &metadata) {
171 Ok(()) => {
172 tracing::info!(
173 group = %group,
174 task = %task_id,
175 "Acquired concurrency lock"
176 );
177 return Ok(LockGuard {
178 lock_path,
179 group: group.to_string(),
180 });
181 }
182 Err(LockError::HeldByOther { pid, age_secs }) => {
183 if Duration::from_secs(age_secs) > self.config.stale_threshold {
185 tracing::warn!(
186 group = %group,
187 holder_pid = pid,
188 age_secs = age_secs,
189 "Breaking stale lock"
190 );
191 let _ = fs::remove_file(&lock_path);
193 continue;
194 }
195
196 if start.elapsed() >= self.config.timeout {
198 return Err(LockError::Timeout {
199 group: group.to_string(),
200 timeout_secs: self.config.timeout.as_secs(),
201 });
202 }
203
204 tracing::debug!(
205 group = %group,
206 holder_pid = pid,
207 "Lock held by another process, waiting..."
208 );
209 }
210 Err(e) => return Err(e),
211 }
212
213 tokio::time::sleep(POLL_INTERVAL).await;
215 }
216 }
217
218 pub fn try_acquire_sync(&self, group: &str, task_id: &str) -> Result<LockGuard, LockError> {
224 fs::create_dir_all(&self.config.lock_dir).map_err(LockError::DirectoryCreation)?;
225
226 let lock_path = self.lock_path(group);
227 let metadata = LockMetadata {
228 pid: std::process::id(),
229 acquired_at: current_timestamp(),
230 task_id: task_id.to_string(),
231 };
232
233 Self::try_acquire(&lock_path, &metadata)?;
234
235 Ok(LockGuard {
236 lock_path,
237 group: group.to_string(),
238 })
239 }
240
241 #[must_use]
243 pub fn is_locked(&self, group: &str) -> bool {
244 let lock_path = self.lock_path(group);
245 lock_path.exists()
246 }
247
248 #[must_use]
250 pub fn lock_info(&self, group: &str) -> Option<LockMetadata> {
251 let lock_path = self.lock_path(group);
252 read_lock_metadata(&lock_path)
253 }
254
255 fn lock_path(&self, group: &str) -> PathBuf {
256 let safe_name: String = group
258 .chars()
259 .map(|c| {
260 if c.is_alphanumeric() || c == '-' || c == '_' {
261 c
262 } else {
263 '_'
264 }
265 })
266 .collect();
267 self.config.lock_dir.join(format!("{safe_name}.lock"))
268 }
269
270 fn try_acquire(lock_path: &Path, metadata: &LockMetadata) -> Result<(), LockError> {
271 match OpenOptions::new()
273 .write(true)
274 .create_new(true)
275 .open(lock_path)
276 {
277 Ok(mut file) => {
278 file.write_all(metadata.serialize().as_bytes())
280 .map_err(|e| LockError::Io {
281 group: metadata.task_id.clone(),
282 source: e,
283 })?;
284 Ok(())
285 }
286 Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
287 if let Some(existing) = read_lock_metadata(lock_path) {
289 let now = current_timestamp();
290 let age_secs = now.saturating_sub(existing.acquired_at);
291 Err(LockError::HeldByOther {
292 pid: existing.pid,
293 age_secs,
294 })
295 } else {
296 let _ = fs::remove_file(lock_path);
298 Err(LockError::HeldByOther {
299 pid: 0,
300 age_secs: 0,
301 })
302 }
303 }
304 Err(e) => Err(LockError::Io {
305 group: metadata.task_id.clone(),
306 source: e,
307 }),
308 }
309 }
310}
311
312impl Default for ConcurrencyLock {
313 fn default() -> Self {
314 Self::new()
315 }
316}
317
/// Handle for a held concurrency lock; dropping it deletes the lock file.
///
/// Marked `#[must_use]` like std's lock guards: a guard that is created and
/// immediately discarded releases the lock at once.
#[derive(Debug)]
#[must_use = "the lock is released as soon as the guard is dropped"]
pub struct LockGuard {
    /// Lock file owned by this guard.
    lock_path: PathBuf,
    /// Group name as passed by the caller (before filename sanitisation).
    group: String,
}
324
325impl LockGuard {
326 #[must_use]
328 pub fn group(&self) -> &str {
329 &self.group
330 }
331
332 pub fn release(self) {
334 drop(self);
336 }
337}
338
339impl Drop for LockGuard {
340 fn drop(&mut self) {
341 if let Err(e) = fs::remove_file(&self.lock_path) {
342 if e.kind() != io::ErrorKind::NotFound {
343 tracing::warn!(
344 group = %self.group,
345 error = %e,
346 "Failed to release lock"
347 );
348 }
349 } else {
350 tracing::debug!(group = %self.group, "Released concurrency lock");
351 }
352 }
353}
354
355fn read_lock_metadata(path: &Path) -> Option<LockMetadata> {
356 let mut file = File::open(path).ok()?;
357 let mut contents = String::new();
358 file.read_to_string(&mut contents).ok()?;
359 LockMetadata::deserialize(&contents)
360}
361
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// A clock set before 1970 yields `0` instead of panicking.
fn current_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_secs())
}
370
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    #[test]
    fn test_lock_metadata_serialization() {
        let metadata = LockMetadata {
            pid: 12345,
            acquired_at: 1_234_567_890,
            task_id: "test-task".to_string(),
        };

        let serialized = metadata.serialize();
        let deserialized = LockMetadata::deserialize(&serialized).unwrap();

        assert_eq!(deserialized.pid, 12345);
        assert_eq!(deserialized.acquired_at, 1_234_567_890);
        assert_eq!(deserialized.task_id, "test-task");
    }

    /// Task ids are stored last, so embedded `:` must survive a round trip.
    #[test]
    fn test_lock_metadata_task_id_may_contain_colons() {
        let metadata = LockMetadata {
            pid: 1,
            acquired_at: 2,
            task_id: "ns:task:v1".to_string(),
        };

        let round_tripped = LockMetadata::deserialize(&metadata.serialize()).unwrap();
        assert_eq!(round_tripped.task_id, "ns:task:v1");
    }

    #[test]
    fn test_lock_metadata_rejects_malformed_input() {
        assert!(LockMetadata::deserialize("").is_none());
        assert!(LockMetadata::deserialize("123:456").is_none());
        assert!(LockMetadata::deserialize("abc:456:task").is_none());
        assert!(LockMetadata::deserialize("123:-1:task").is_none());
    }

    #[test]
    fn test_lock_acquisition_sync() {
        let tmp = TempDir::new().unwrap();
        let lock = ConcurrencyLock::new().with_lock_dir(tmp.path());

        let guard1 = lock.try_acquire_sync("test-group", "task1").unwrap();
        assert!(lock.is_locked("test-group"));

        let result = lock.try_acquire_sync("test-group", "task2");
        assert!(matches!(result, Err(LockError::HeldByOther { .. })));

        drop(guard1);
        assert!(!lock.is_locked("test-group"));

        let _guard2 = lock.try_acquire_sync("test-group", "task2").unwrap();
        assert!(lock.is_locked("test-group"));
    }

    #[test]
    fn test_different_groups() {
        let tmp = TempDir::new().unwrap();
        let lock = ConcurrencyLock::new().with_lock_dir(tmp.path());

        let _guard1 = lock.try_acquire_sync("group-a", "task1").unwrap();
        let _guard2 = lock.try_acquire_sync("group-b", "task2").unwrap();

        assert!(lock.is_locked("group-a"));
        assert!(lock.is_locked("group-b"));
    }

    #[test]
    fn test_lock_info() {
        let tmp = TempDir::new().unwrap();
        let lock = ConcurrencyLock::new().with_lock_dir(tmp.path());

        let _guard = lock.try_acquire_sync("test-group", "my-task").unwrap();

        let info = lock.lock_info("test-group").unwrap();
        assert_eq!(info.task_id, "my-task");
        assert_eq!(info.pid, std::process::id());
    }

    #[test]
    fn test_group_name_sanitization() {
        let tmp = TempDir::new().unwrap();
        let lock = ConcurrencyLock::new().with_lock_dir(tmp.path());

        let _guard = lock
            .try_acquire_sync("production/deploy:v1", "task1")
            .unwrap();

        let lock_path = tmp.path().join("production_deploy_v1.lock");
        assert!(lock_path.exists());
    }

    #[tokio::test]
    async fn test_async_acquisition() {
        let tmp = TempDir::new().unwrap();
        let lock = ConcurrencyLock::new()
            .with_lock_dir(tmp.path())
            .with_timeout(Duration::from_secs(1));

        let guard = lock.acquire("async-group", "task1").await.unwrap();
        assert!(lock.is_locked("async-group"));
        drop(guard);
    }

    #[tokio::test]
    async fn test_timeout() {
        let tmp = TempDir::new().unwrap();
        let lock = ConcurrencyLock::new()
            .with_lock_dir(tmp.path())
            .with_timeout(Duration::from_millis(100));

        let _guard = lock.try_acquire_sync("timeout-group", "holder").unwrap();

        let result = lock.acquire("timeout-group", "waiter").await;
        assert!(matches!(result, Err(LockError::Timeout { .. })));
    }

    /// A lock recorded as taken at the Unix epoch is far past the stale threshold,
    /// so `acquire` must break it and take the lock itself.
    #[tokio::test]
    async fn test_stale_lock_is_broken() {
        let tmp = TempDir::new().unwrap();
        let lock = ConcurrencyLock::new()
            .with_lock_dir(tmp.path())
            .with_timeout(Duration::from_secs(1));

        fs::write(tmp.path().join("stale-group.lock"), "999999:0:ghost").unwrap();

        let guard = lock.acquire("stale-group", "task1").await.unwrap();
        assert_eq!(lock.lock_info("stale-group").unwrap().task_id, "task1");
        drop(guard);
    }

    #[test]
    fn test_lock_release_on_drop() {
        let tmp = TempDir::new().unwrap();
        let lock = ConcurrencyLock::new().with_lock_dir(tmp.path());

        {
            let _guard = lock.try_acquire_sync("drop-test", "task1").unwrap();
            assert!(lock.is_locked("drop-test"));
        }

        assert!(!lock.is_locked("drop-test"));
    }

    #[test]
    fn test_explicit_release() {
        let tmp = TempDir::new().unwrap();
        let lock = ConcurrencyLock::new().with_lock_dir(tmp.path());

        let guard = lock.try_acquire_sync("release-test", "task1").unwrap();
        assert_eq!(guard.group(), "release-test");

        guard.release();
        assert!(!lock.is_locked("release-test"));
    }
}