Skip to main content

aft/
fs_lock.rs

1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6    atomic::{AtomicBool, AtomicU64, Ordering},
7    mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
/// How often, in milliseconds, the heartbeat thread refreshes the lock file's
/// `heartbeat_at_ms` timestamp.
pub const HEARTBEAT_INTERVAL_MS: u64 = 5 * 1_000;
/// Heartbeat age, in milliseconds, after which a same-host live owner is
/// reported as stale (it is still never broken). Five times this value is the
/// reclaim threshold for owners on another host.
pub const STALE_HEARTBEAT_MS: u64 = 15 * 1_000;
/// Lock age, in milliseconds, after which acquirers log a one-time warning
/// about a long-held lock.
pub const LIVE_OWNER_WARN_MS: u64 = 10 * 60 * 1_000;
/// Delay, in milliseconds, between acquisition attempts.
pub const POLL_INTERVAL_MS: u64 = 100;

/// Timing knobs for lock acquisition and heartbeating, all in milliseconds.
///
/// Production code uses `LockConfig::default()`; tests shrink the intervals.
#[derive(Clone, Copy, Debug)]
struct LockConfig {
    /// Interval between heartbeat refreshes.
    heartbeat_interval_ms: u64,
    /// Same-host heartbeat age that triggers a "stale heartbeat" warning.
    stale_heartbeat_ms: u64,
    /// Lock age that triggers a "held for a long time" warning.
    live_owner_warn_ms: u64,
    /// Sleep between acquisition attempts.
    poll_interval_ms: u64,
}

impl LockConfig {
    /// Factor applied to `stale_heartbeat_ms` for owners on another host,
    /// whose PID liveness cannot be checked locally.
    const CROSS_HOST_STALE_FACTOR: u64 = 5;

    /// Heartbeat age after which a lock held by another host may be reclaimed.
    /// Saturates at `u64::MAX` instead of overflowing.
    fn cross_host_stale_heartbeat_ms(self) -> u64 {
        Self::CROSS_HOST_STALE_FACTOR.saturating_mul(self.stale_heartbeat_ms)
    }
}

impl Default for LockConfig {
    /// Production timings built from the module-level constants.
    fn default() -> Self {
        Self {
            poll_interval_ms: POLL_INTERVAL_MS,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
            stale_heartbeat_ms: STALE_HEARTBEAT_MS,
            heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
        }
    }
}
45
/// Contents of a lock file, written as a single line of JSON.
///
/// Ownership is identified by the (`pid`, `hostname`, `created_at_ms`)
/// triple; the owner's heartbeat thread periodically rewrites
/// `heartbeat_at_ms`. NOTE: serde serializes fields in declaration order, so
/// this order is also the JSON key order on disk.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
struct LockMetadata {
    /// PID of the owning process (`std::process::id()` at creation).
    pid: u32,
    /// Hostname of the owner; decides whether PID liveness can be checked locally.
    hostname: String,
    /// Milliseconds since the Unix epoch when the lock was created.
    created_at_ms: u64,
    /// Milliseconds since the Unix epoch of the owner's most recent heartbeat.
    heartbeat_at_ms: u64,
}
53
54/// Acquire a filesystem lock at `path`. Blocks until the lock is held.
55///
56/// The returned guard owns a background heartbeat thread; dropping it releases
57/// the lock and removes the lock file.
58pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
59    acquire_with_config(path, None, LockConfig::default())
60}
61
62/// Try to acquire a filesystem lock at `path` within `timeout`.
63pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
64    acquire_with_config(path, Some(timeout), LockConfig::default())
65}
66
67pub struct LockGuard {
68    path: PathBuf,
69    metadata: LockMetadata,
70    shutdown: Arc<AtomicBool>,
71    heartbeat_done: mpsc::Receiver<()>,
72    heartbeat: Option<JoinHandle<()>>,
73}
74
75impl Drop for LockGuard {
76    fn drop(&mut self) {
77        // Signal shutdown then unconditionally join the heartbeat thread
78        // BEFORE removing the lockfile. The earlier `recv_timeout(100ms)`
79        // implementation could let `remove_lock_if_owned` race with a
80        // still-alive heartbeat:
81        //
82        //   1. Drop signals shutdown, ack times out under CI load.
83        //   2. Drop calls `remove_lock_if_owned` → file removed.
84        //   3. Another caller acquires the lock → writes its metadata.
85        //   4. Our heartbeat (still alive, mid-`atomic_write_lock_metadata`
86        //      from before shutdown was checked) overwrites the new
87        //      owner's file with our stale metadata. heartbeat_once's
88        //      ownership check happens BEFORE the write, so it can race
89        //      with a concurrent acquire that flips ownership in between.
90        //   5. The new owner's heartbeat sees foreign metadata, exits
91        //      `NotOwner`. The new owner's drop sees foreign metadata,
92        //      `remove_lock_if_owned` returns `Ok(false)`, file persists.
93        //
94        // Always-joining bounds drop latency to one `park_timeout`
95        // iteration (~25ms) plus the current `heartbeat_once` IO —
96        // typically <500ms under CI load. The unused `heartbeat_done`
97        // channel is kept for backward compatibility with any external
98        // code that may still construct LockGuard manually, but Drop no
99        // longer relies on it.
100        self.shutdown.store(true, Ordering::Release);
101        if let Some(handle) = self.heartbeat.take() {
102            handle.thread().unpark();
103            let _ = handle.join();
104        }
105        // Drain any pending ack so the receiver doesn't carry stale state
106        // if this LockGuard is somehow re-used (it isn't today, but be
107        // defensive).
108        while self.heartbeat_done.try_recv().is_ok() {}
109
110        match remove_lock_if_owned(&self.path, &self.metadata) {
111            Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
112            Ok(false) => {}
113            Err(error) => slog_warn!(
114                "failed to release filesystem lock at {}: {}",
115                self.path.display(),
116                error
117            ),
118        }
119    }
120}
121
/// Errors returned by [`acquire`] and [`try_acquire`].
#[derive(Debug)]
pub enum AcquireError {
    /// An underlying filesystem operation failed.
    Io(io::Error),
    /// The lock was not acquired before the caller's timeout elapsed.
    Timeout,
}

impl fmt::Display for AcquireError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AcquireError::Io(error) => write!(f, "filesystem lock I/O error: {error}"),
            AcquireError::Timeout => write!(f, "timed out acquiring filesystem lock"),
        }
    }
}

impl std::error::Error for AcquireError {
    /// Expose the wrapped I/O error so error-reporting chains can show the
    /// root cause; `Timeout` has no underlying source.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            AcquireError::Io(error) => Some(error),
            AcquireError::Timeout => None,
        }
    }
}

impl From<io::Error> for AcquireError {
    /// Wrap an I/O error so `?` can be used on `io::Result` in acquisition code.
    fn from(error: io::Error) -> Self {
        AcquireError::Io(error)
    }
}
144
145fn acquire_with_config(
146    path: &Path,
147    timeout: Option<Duration>,
148    config: LockConfig,
149) -> Result<LockGuard, AcquireError> {
150    let deadline = timeout.map(|timeout| Instant::now() + timeout);
151    let hostname = current_hostname();
152    let mut warned_live_owner = false;
153    let mut warned_stale_live_owner = false;
154
155    loop {
156        if let Some(deadline) = deadline {
157            if Instant::now() >= deadline {
158                return Err(AcquireError::Timeout);
159            }
160        }
161
162        match create_new_lock(path, &hostname, config) {
163            Ok(guard) => return Ok(guard),
164            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
165            Err(error) => return Err(error.into()),
166        }
167
168        let metadata = match read_lock_metadata(path) {
169            Ok(metadata) => metadata,
170            Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
171            Err(ReadLockError::Io(error)) => return Err(error.into()),
172            Err(ReadLockError::Malformed(error)) => {
173                // A just-created O_EXCL file is visible before its owner has
174                // finished writing JSON. Give that transient creation window
175                // one poll interval before treating malformed contents as stale.
176                sleep_until_retry(deadline, config.poll_interval_ms)?;
177                match read_lock_metadata(path) {
178                    Ok(_) => continue,
179                    Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
180                        continue;
181                    }
182                    Err(ReadLockError::Io(error)) => return Err(error.into()),
183                    Err(ReadLockError::Malformed(_)) => {}
184                }
185                slog_warn!(
186                    "removing malformed filesystem lock at {}: {}",
187                    path.display(),
188                    error
189                );
190                remove_lock_file(path)?;
191                continue;
192            }
193        };
194
195        let now = now_ms();
196        let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);
197
198        if metadata.hostname != hostname {
199            let cross_host_stale_ms = config.cross_host_stale_heartbeat_ms();
200            if since_heartbeat > cross_host_stale_ms {
201                slog_warn!(
202                    "reclaiming cross-host filesystem lock at {} from host {} after stale heartbeat ({}ms > {}ms)",
203                    path.display(),
204                    metadata.hostname,
205                    since_heartbeat,
206                    cross_host_stale_ms
207                );
208                remove_lock_file(path)?;
209                continue;
210            }
211            sleep_until_retry(deadline, config.poll_interval_ms)?;
212            continue;
213        }
214
215        if !process_alive(metadata.pid) {
216            slog_warn!(
217                "removing filesystem lock at {} from dead PID {}",
218                path.display(),
219                metadata.pid
220            );
221            remove_lock_file(path)?;
222            continue;
223        }
224
225        if since_heartbeat > config.stale_heartbeat_ms && !warned_stale_live_owner {
226            // Same-host PID liveness is authoritative. A SIGSTOP'd process,
227            // suspended VM, or sleeping laptop can miss heartbeats and later
228            // resume inside the critical section. Breaking that lock would allow
229            // split-brain writers, so a paused live owner blocks acquirers until
230            // it resumes and releases the lock or the PID dies.
231            slog_warn!(
232                "filesystem lock at {} held by live PID {} has stale heartbeat ({}ms); NOT breaking",
233                path.display(),
234                metadata.pid,
235                since_heartbeat
236            );
237            warned_stale_live_owner = true;
238        }
239
240        let held_for = now.saturating_sub(metadata.created_at_ms);
241        if held_for > config.live_owner_warn_ms && !warned_live_owner {
242            slog_warn!(
243                "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
244                path.display(),
245                metadata.pid
246            );
247            warned_live_owner = true;
248        }
249
250        sleep_until_retry(deadline, config.poll_interval_ms)?;
251    }
252}
253
254fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
255    let now = now_ms();
256    let metadata = LockMetadata {
257        pid: std::process::id(),
258        hostname: hostname.to_string(),
259        created_at_ms: now,
260        heartbeat_at_ms: now,
261    };
262
263    create_lock_file_atomically(path, &metadata)?;
264
265    let shutdown = Arc::new(AtomicBool::new(false));
266    let (done_tx, done_rx) = mpsc::channel();
267    let heartbeat_path = path.to_path_buf();
268    let heartbeat_metadata = metadata.clone();
269    let heartbeat_shutdown = Arc::clone(&shutdown);
270    let heartbeat = thread::Builder::new()
271        .name("aft-fs-lock-heartbeat".to_string())
272        .spawn(move || {
273            run_heartbeat(
274                heartbeat_path,
275                heartbeat_metadata,
276                heartbeat_shutdown,
277                config,
278            );
279            let _ = done_tx.send(());
280        })?;
281
282    slog_info!("acquired filesystem lock at {}", path.display());
283
284    Ok(LockGuard {
285        path: path.to_path_buf(),
286        metadata,
287        shutdown,
288        heartbeat_done: done_rx,
289        heartbeat: Some(heartbeat),
290    })
291}
292
293fn run_heartbeat(
294    path: PathBuf,
295    owner: LockMetadata,
296    shutdown: Arc<AtomicBool>,
297    config: LockConfig,
298) {
299    loop {
300        thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
301        if shutdown.load(Ordering::Acquire) {
302            return;
303        }
304
305        match heartbeat_once(&path, &owner) {
306            Ok(()) => {}
307            Err(HeartbeatError::LockGone) => {
308                slog_error!(
309                    "filesystem lock at {} disappeared; stopping heartbeat",
310                    path.display()
311                );
312                return;
313            }
314            Err(HeartbeatError::NotOwner) => {
315                slog_error!(
316                    "filesystem lock at {} is no longer owned by this guard; stopping heartbeat",
317                    path.display()
318                );
319                return;
320            }
321            Err(HeartbeatError::Malformed(error)) => {
322                slog_error!(
323                    "filesystem lock at {} became malformed: {}; stopping heartbeat",
324                    path.display(),
325                    error
326                );
327                return;
328            }
329            Err(HeartbeatError::Io(error)) => {
330                slog_error!(
331                    "failed to heartbeat filesystem lock at {}: {}; stopping heartbeat",
332                    path.display(),
333                    error
334                );
335                return;
336            }
337        }
338    }
339}
340
341fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
342    let mut metadata = match read_lock_metadata(path) {
343        Ok(metadata) => metadata,
344        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
345            return Err(HeartbeatError::LockGone);
346        }
347        Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
348        Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
349    };
350
351    if metadata.pid != owner.pid
352        || metadata.hostname != owner.hostname
353        || metadata.created_at_ms != owner.created_at_ms
354    {
355        return Err(HeartbeatError::NotOwner);
356    }
357
358    metadata.heartbeat_at_ms = now_ms();
359    atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
360}
361
362#[derive(Debug)]
363enum HeartbeatError {
364    Io(io::Error),
365    LockGone,
366    Malformed(serde_json::Error),
367    NotOwner,
368}
369
370#[derive(Debug)]
371enum ReadLockError {
372    Io(io::Error),
373    Malformed(serde_json::Error),
374}
375
376fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
377    let bytes = fs::read(path).map_err(ReadLockError::Io)?;
378    serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
379}
380
/// Exclusively create a new file at `path`, opened for writing.
///
/// Fails with `io::ErrorKind::AlreadyExists` if `path` already exists. On Unix
/// the file is requested with mode `0o644` (subject to the process umask).
#[cfg(unix)]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = OpenOptions::new();
    options.create_new(true).write(true).mode(0o644);
    options.open(path)
}

/// Exclusively create a new file at `path`, opened for writing.
///
/// Fails with `io::ErrorKind::AlreadyExists` if `path` already exists.
#[cfg(not(unix))]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.create_new(true).write(true);
    options.open(path)
}
396
397fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
398    serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
399    file.write_all(b"\n")?;
400    file.sync_all()
401}
402
403fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
404    let tmp_path = temp_path_for_lock(path);
405    let result = (|| {
406        let mut file = open_new_lock_file(&tmp_path)?;
407        write_lock_metadata_to_file(&mut file, metadata)?;
408        drop(file);
409
410        fs::hard_link(&tmp_path, path)?;
411        sync_parent(path);
412        Ok(())
413    })();
414
415    let _ = fs::remove_file(&tmp_path);
416    result
417}
418
419fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
420    let tmp_path = temp_path_for_lock(path);
421    let write_result = (|| {
422        let mut file = OpenOptions::new()
423            .write(true)
424            .create_new(true)
425            .open(&tmp_path)?;
426        write_lock_metadata_to_file(&mut file, metadata)?;
427        drop(file);
428
429        rename_over(&tmp_path, path)?;
430        sync_parent(path);
431        Ok(())
432    })();
433
434    if write_result.is_err() {
435        let _ = fs::remove_file(&tmp_path);
436    }
437
438    write_result
439}
440
/// Replace `to` with `from` by renaming.
///
/// On Windows, `fs::rename` already replaces an existing destination file, so
/// it is tried first. Deleting `to` up front would open a window in which the
/// lock file does not exist. Another process could then create its own lock
/// there, and this rename would silently overwrite it, leaving two holders.
/// Remove-then-rename is kept only as a fallback when the replacing rename is
/// refused. It is never used when `from` itself is missing (`NotFound`), since
/// deleting `to` then would just destroy the live lock file.
#[cfg(windows)]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    match fs::rename(from, to) {
        Ok(()) => Ok(()),
        Err(error) if error.kind() == io::ErrorKind::NotFound => Err(error),
        Err(_) => {
            let _ = fs::remove_file(to);
            fs::rename(from, to)
        }
    }
}

/// Replace `to` with `from` by renaming; on these platforms `fs::rename`
/// atomically overwrites an existing destination.
#[cfg(not(windows))]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    fs::rename(from, to)
}
451
452// Per-thread counter that disambiguates temp lockfile paths for callers
453// inside the same process. `now_nanos()` alone is not unique enough on
454// Windows when two threads race to acquire the same lock (caught by the
455// `acquire_serializes_concurrent_callers` test): two threads sampling the
456// nanosecond clock within the same scheduler quantum produce identical
457// timestamps, both write to the same `.lock.tmp.<pid>.<nanos>` file, one
458// thread's `fs::remove_file(&tmp_path)` cleanup deletes the file before
459// the other thread's `fs::hard_link(&tmp_path, ...)` runs, and the loser
460// panics with `Io(Os { code: 2, NotFound })`.
461//
462// `AtomicU64` shared across threads makes every temp path unique within
463// the process regardless of clock resolution or scheduling races.
464static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
465
466fn temp_path_for_lock(path: &Path) -> PathBuf {
467    let file_name = path
468        .file_name()
469        .and_then(|name| name.to_str())
470        .unwrap_or("lock");
471    let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
472    path.with_file_name(format!(
473        ".{file_name}.tmp.{}.{}.{}",
474        std::process::id(),
475        now_nanos(),
476        seq
477    ))
478}
479
480fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
481    let metadata = match read_lock_metadata(path) {
482        Ok(metadata) => metadata,
483        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
484            return Ok(false);
485        }
486        Err(ReadLockError::Io(error)) => return Err(error),
487        Err(ReadLockError::Malformed(_)) => return Ok(false),
488    };
489
490    if metadata.pid == owner.pid
491        && metadata.hostname == owner.hostname
492        && metadata.created_at_ms == owner.created_at_ms
493    {
494        remove_lock_file(path)?;
495        Ok(true)
496    } else {
497        Ok(false)
498    }
499}
500
/// Delete the file at `path`, treating an already-missing file as success.
///
/// # Errors
/// Any removal error other than `NotFound`.
fn remove_lock_file(path: &Path) -> io::Result<()> {
    fs::remove_file(path).or_else(|error| {
        if error.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(error)
        }
    })
}
508
509fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
510    let poll = Duration::from_millis(poll_interval_ms);
511    let sleep_for = match deadline {
512        Some(deadline) => {
513            let now = Instant::now();
514            if now >= deadline {
515                return Err(AcquireError::Timeout);
516            }
517            poll.min(deadline.saturating_duration_since(now))
518        }
519        None => poll,
520    };
521    thread::sleep(sleep_for);
522    Ok(())
523}
524
/// Best-effort `sync_all` of `path`'s parent directory. It is intended to make
/// a just-created, linked, or renamed directory entry durable. Every failure
/// (no parent, directory cannot be opened, sync error) is silently ignored.
fn sync_parent(path: &Path) {
    let Some(parent) = path.parent() else {
        return;
    };
    let Ok(dir) = File::open(parent) else {
        return;
    };
    let _ = dir.sync_all();
}
532
/// Milliseconds since the Unix epoch, or 0 if the system clock reads earlier.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // Truncating u128 -> u64 only matters ~584 million years after 1970.
    since_epoch.as_millis() as u64
}

/// Nanoseconds since the Unix epoch, or 0 if the system clock reads earlier.
fn now_nanos() -> u128 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_nanos()
}
546
547#[cfg(unix)]
548fn current_hostname() -> String {
549    let mut buffer = [0u8; 256];
550    let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
551    if result == 0 {
552        let len = buffer
553            .iter()
554            .position(|byte| *byte == 0)
555            .unwrap_or(buffer.len());
556        if len > 0 {
557            return String::from_utf8_lossy(&buffer[..len]).into_owned();
558        }
559    }
560
561    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
562}
563
564#[cfg(windows)]
565fn current_hostname() -> String {
566    std::env::var("COMPUTERNAME")
567        .or_else(|_| std::env::var("HOSTNAME"))
568        .unwrap_or_else(|_| "unknown-host".to_string())
569}
570
571#[cfg(not(any(unix, windows)))]
572fn current_hostname() -> String {
573    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
574}
575
576#[cfg(unix)]
577fn process_alive(pid: u32) -> bool {
578    if pid == 0 || pid > i32::MAX as u32 {
579        return false;
580    }
581
582    let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
583    if result == 0 {
584        return true;
585    }
586
587    io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
588}
589
590#[cfg(windows)]
591fn process_alive(pid: u32) -> bool {
592    let filter = format!("PID eq {pid}");
593    let Ok(output) = std::process::Command::new("tasklist")
594        .args(["/FI", &filter, "/FO", "CSV", "/NH"])
595        .output()
596    else {
597        return true;
598    };
599
600    if !output.status.success() {
601        return true;
602    }
603
604    let stdout = String::from_utf8_lossy(&output.stdout);
605
606    // `tasklist /NH /FO CSV` emits a single line per matching process with
607    // every field quoted, e.g. `"image","7420","Console","1","12,345 K"`.
608    // When the filter matches nothing, the literal text
609    // `INFO: No tasks are running which match the specified criteria.`
610    // is written to stdout. The previous matcher was too strict — it looked
611    // for `","{pid}",` patterns mid-line, which works on most Windows builds
612    // but missed Windows runners that emit slightly different quoting (e.g.
613    // a trailing CRLF leaves the pid token at end-of-line as `"7420"\r\n`).
614    // The robust check: confirm the "no tasks" sentinel is absent AND any
615    // PID-quoted form is present.
616    if stdout.contains("No tasks are running") {
617        return false;
618    }
619    stdout.contains(&format!("\"{pid}\""))
620}
621
622#[cfg(not(any(unix, windows)))]
623fn process_alive(_pid: u32) -> bool {
624    true
625}
626
// Unit tests for the filesystem lock. Most tests use `test_config()` (25ms
// heartbeat, 10ms poll, 2s same-host stale threshold) so heartbeats and
// retries happen quickly. Each test works in its own temp directory.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};

    // Fast timings for tests; the long-held warning threshold stays at the
    // production value so only tests that backdate `created_at_ms` trigger it.
    fn test_config() -> LockConfig {
        LockConfig {
            heartbeat_interval_ms: 25,
            stale_heartbeat_ms: 2_000,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
            poll_interval_ms: 10,
        }
    }

    // Fresh temp dir plus a lock path inside it. The `TempDir` must be kept
    // alive by the caller, or the directory is deleted.
    fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().expect("create temp dir");
        let path = dir.path().join("test.lock");
        (dir, path)
    }

    // Write a lock file directly (no guard, no heartbeat) to simulate another owner.
    fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
        let mut file = open_new_lock_file(path).expect("create synthetic lock");
        write_lock_metadata_to_file(&mut file, metadata).expect("write synthetic lock");
    }

    // Metadata whose heartbeat timestamp equals its creation timestamp.
    fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
        LockMetadata {
            pid,
            hostname,
            created_at_ms,
            heartbeat_at_ms: created_at_ms,
        }
    }

    // Metadata naming this (live) process on this host, created now.
    fn current_process_metadata() -> LockMetadata {
        let now = now_ms();
        synthetic_metadata(std::process::id(), current_hostname(), now)
    }

    // Acquiring writes our identity into the lock file; dropping removes it.
    #[test]
    fn acquire_creates_lockfile_and_unlocks_on_drop() {
        let (_dir, path) = test_lock_path();

        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let metadata = read_lock_metadata(&path).expect("read lock metadata");
        assert_eq!(metadata.pid, std::process::id());
        assert_eq!(metadata.hostname, current_hostname());
        assert_eq!(metadata.created_at_ms, guard.metadata.created_at_ms);

        drop(guard);
        assert!(!path.exists());
    }

    // Two threads released together by a barrier must never hold the lock at
    // the same time, and both must eventually get it.
    #[test]
    fn acquire_serializes_concurrent_callers() {
        let (_dir, path) = test_lock_path();
        let path = Arc::new(path);
        let barrier = Arc::new(Barrier::new(3));
        let inside = Arc::new(AtomicUsize::new(0));
        let entered = Arc::new(AtomicUsize::new(0));
        let max_inside = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        for _ in 0..2 {
            let path = Arc::clone(&path);
            let barrier = Arc::clone(&barrier);
            let inside = Arc::clone(&inside);
            let entered = Arc::clone(&entered);
            let max_inside = Arc::clone(&max_inside);
            handles.push(thread::spawn(move || {
                barrier.wait();
                let guard = acquire_with_config(&path, Some(Duration::from_secs(2)), test_config())
                    .expect("thread acquire lock");
                let previous = inside.fetch_add(1, Ordering::SeqCst);
                assert_eq!(previous, 0, "two lock holders overlapped");
                entered.fetch_add(1, Ordering::SeqCst);
                max_inside.fetch_max(previous + 1, Ordering::SeqCst);
                // Hold the lock long enough for the other thread to contend.
                thread::sleep(Duration::from_millis(75));
                inside.fetch_sub(1, Ordering::SeqCst);
                drop(guard);
            }));
        }

        barrier.wait();
        for handle in handles {
            handle.join().expect("join worker");
        }

        assert_eq!(entered.load(Ordering::SeqCst), 2);
        assert_eq!(max_inside.load(Ordering::SeqCst), 1);
        assert!(!path.exists());
    }

    // The heartbeat thread eventually advances `heartbeat_at_ms` on disk.
    #[test]
    fn heartbeat_updates_lockfile_timestamp() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let initial = read_lock_metadata(&path)
            .expect("read initial metadata")
            .heartbeat_at_ms;

        // Poll for up to 2s rather than sleeping a fixed multiple of the
        // heartbeat interval. `park_timeout` is a *maximum* wait, not a
        // guaranteed periodic timer — under load (shared macOS CI runners
        // running other cargo-test threads concurrently) the heartbeat
        // thread may not fire 3 times within 75ms even though
        // heartbeat_interval_ms=25. The contract being asserted is "the
        // heartbeat advances eventually", not "it advances within N
        // heartbeat intervals".
        let deadline = std::time::Instant::now() + Duration::from_millis(2_000);
        let mut updated = initial;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(50));
            updated = read_lock_metadata(&path)
                .expect("read updated metadata")
                .heartbeat_at_ms;
            if updated > initial {
                break;
            }
        }
        assert!(
            updated > initial,
            "heartbeat timestamp did not advance within 2s"
        );
        drop(guard);
    }

    // A same-host lock naming a PID that cannot exist is removed and re-acquired.
    #[test]
    fn dead_pid_lock_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        let metadata = synthetic_metadata(999_999_999, current_hostname(), now_ms());
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim dead pid lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    // A live same-host owner with a 60s-old heartbeat is NOT broken; the lock
    // file is left byte-for-byte (metadata-wise) unchanged.
    #[test]
    fn stale_heartbeat_from_live_pid_blocks() {
        let (_dir, path) = test_lock_path();
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(60_000);
        metadata.heartbeat_at_ms = now_ms().saturating_sub(60_000);
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    // A live same-host owner with a fresh heartbeat blocks acquisition.
    #[test]
    fn healthy_live_owner_blocks() {
        let (_dir, path) = test_lock_path();
        let metadata = current_process_metadata();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    // Persistently unparsable lock contents are treated as stale and replaced.
    #[test]
    fn malformed_lockfile_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        fs::write(&path, b"not valid json").expect("write malformed lock");

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim malformed lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    // A lock from another host with a fresh heartbeat is left alone.
    #[test]
    fn cross_host_lock_is_not_stolen_before_extended_stale_threshold() {
        let (_dir, path) = test_lock_path();
        let now = now_ms();
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: now,
            heartbeat_at_ms: now,
        };
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    // A lock from another host whose heartbeat is past the extended
    // cross-host threshold is reclaimed.
    #[test]
    fn stale_cross_host_lock_is_reclaimed_after_extended_threshold() {
        let (_dir, path) = test_lock_path();
        let stale_at =
            now_ms().saturating_sub(test_config().cross_host_stale_heartbeat_ms() + 1_000);
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: stale_at,
            heartbeat_at_ms: stale_at,
        };
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim stale cross-host lock");
        let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(reclaimed.hostname, current_hostname());
        assert_ne!(reclaimed.created_at_ms, metadata.created_at_ms);
        drop(guard);
    }

    // A live same-host owner holding the lock for over 10 minutes triggers a
    // warning but is still not broken.
    #[test]
    fn live_owner_over_10min_warns_but_blocks() {
        let (_dir, path) = test_lock_path();
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
        metadata.heartbeat_at_ms = now_ms();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    // After drop, the lock file stays gone for several heartbeat intervals,
    // i.e. no surviving heartbeat recreates or rewrites it.
    #[test]
    fn drop_stops_heartbeat_thread() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        drop(guard);

        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 3,
        ));
        assert!(
            !path.exists(),
            "heartbeat recreated or kept updating lockfile"
        );
    }
}
872            !path.exists(),
873            "heartbeat recreated or kept updating lockfile"
874        );
875    }
876}