Skip to main content

aft/
fs_lock.rs

1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6    atomic::{AtomicBool, AtomicU64, Ordering},
7    mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
/// Default interval, in milliseconds, between heartbeat refreshes of a held lock file.
pub const HEARTBEAT_INTERVAL_MS: u64 = 5_000;
/// Default heartbeat age, in milliseconds, beyond which a same-host lock whose
/// owner PID is still alive is treated as stale and reclaimed.
pub const STALE_HEARTBEAT_MS: u64 = 15_000;
/// Default hold time, in milliseconds, after which a waiter logs a one-time
/// warning about a live, heartbeating owner (the lock is NOT broken).
pub const LIVE_OWNER_WARN_MS: u64 = 600_000;
/// Default delay, in milliseconds, between acquisition attempts while waiting.
pub const POLL_INTERVAL_MS: u64 = 100;
20
21#[derive(Clone, Copy, Debug)]
22struct LockConfig {
23    heartbeat_interval_ms: u64,
24    stale_heartbeat_ms: u64,
25    live_owner_warn_ms: u64,
26    poll_interval_ms: u64,
27}
28
29impl Default for LockConfig {
30    fn default() -> Self {
31        Self {
32            heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
33            stale_heartbeat_ms: STALE_HEARTBEAT_MS,
34            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
35            poll_interval_ms: POLL_INTERVAL_MS,
36        }
37    }
38}
39
/// Contents of a lock file, stored as a single JSON object.
///
/// The triple (`pid`, `hostname`, `created_at_ms`) is what the heartbeat and
/// release paths compare to decide whether a file on disk still belongs to a
/// given guard.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
struct LockMetadata {
    /// Process id of the owner, as reported by `std::process::id()`.
    pid: u32,
    /// Host name of the owner; locks from a different host are never reclaimed.
    hostname: String,
    /// Wall-clock creation time in milliseconds since the Unix epoch.
    created_at_ms: u64,
    /// Wall-clock time of the most recent heartbeat, in milliseconds since the
    /// Unix epoch. Equal to `created_at_ms` until the first heartbeat.
    heartbeat_at_ms: u64,
}
47
48/// Acquire a filesystem lock at `path`. Blocks until the lock is held.
49///
50/// The returned guard owns a background heartbeat thread; dropping it releases
51/// the lock and removes the lock file.
52pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
53    acquire_with_config(path, None, LockConfig::default())
54}
55
56/// Try to acquire a filesystem lock at `path` within `timeout`.
57pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
58    acquire_with_config(path, Some(timeout), LockConfig::default())
59}
60
61pub struct LockGuard {
62    path: PathBuf,
63    metadata: LockMetadata,
64    shutdown: Arc<AtomicBool>,
65    heartbeat_done: mpsc::Receiver<()>,
66    heartbeat: Option<JoinHandle<()>>,
67}
68
69impl Drop for LockGuard {
70    fn drop(&mut self) {
71        // Signal shutdown then unconditionally join the heartbeat thread
72        // BEFORE removing the lockfile. The earlier `recv_timeout(100ms)`
73        // implementation could let `remove_lock_if_owned` race with a
74        // still-alive heartbeat:
75        //
76        //   1. Drop signals shutdown, ack times out under CI load.
77        //   2. Drop calls `remove_lock_if_owned` → file removed.
78        //   3. Another caller acquires the lock → writes its metadata.
79        //   4. Our heartbeat (still alive, mid-`atomic_write_lock_metadata`
80        //      from before shutdown was checked) overwrites the new
81        //      owner's file with our stale metadata. heartbeat_once's
82        //      ownership check happens BEFORE the write, so it can race
83        //      with a concurrent acquire that flips ownership in between.
84        //   5. The new owner's heartbeat sees foreign metadata, exits
85        //      `NotOwner`. The new owner's drop sees foreign metadata,
86        //      `remove_lock_if_owned` returns `Ok(false)`, file persists.
87        //
88        // Always-joining bounds drop latency to one `park_timeout`
89        // iteration (~25ms) plus the current `heartbeat_once` IO —
90        // typically <500ms under CI load. The unused `heartbeat_done`
91        // channel is kept for backward compatibility with any external
92        // code that may still construct LockGuard manually, but Drop no
93        // longer relies on it.
94        self.shutdown.store(true, Ordering::Release);
95        if let Some(handle) = self.heartbeat.take() {
96            handle.thread().unpark();
97            let _ = handle.join();
98        }
99        // Drain any pending ack so the receiver doesn't carry stale state
100        // if this LockGuard is somehow re-used (it isn't today, but be
101        // defensive).
102        while self.heartbeat_done.try_recv().is_ok() {}
103
104        match remove_lock_if_owned(&self.path, &self.metadata) {
105            Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
106            Ok(false) => {}
107            Err(error) => slog_warn!(
108                "failed to release filesystem lock at {}: {}",
109                self.path.display(),
110                error
111            ),
112        }
113    }
114}
115
/// Reasons a lock acquisition can fail.
#[derive(Debug)]
pub enum AcquireError {
    /// An unexpected I/O error occurred while working with the lock file.
    Io(io::Error),
    /// The caller-supplied deadline passed before the lock was obtained.
    Timeout,
}

impl fmt::Display for AcquireError {
    /// Renders a short human-readable description; the I/O variant includes
    /// the underlying error's own message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Timeout => f.write_str("timed out acquiring filesystem lock"),
            Self::Io(source) => write!(f, "filesystem lock I/O error: {source}"),
        }
    }
}

impl std::error::Error for AcquireError {}

impl From<io::Error> for AcquireError {
    /// Wraps an I/O error so `?` can be used on `io::Result` values.
    fn from(error: io::Error) -> Self {
        Self::Io(error)
    }
}
138
139fn acquire_with_config(
140    path: &Path,
141    timeout: Option<Duration>,
142    config: LockConfig,
143) -> Result<LockGuard, AcquireError> {
144    let deadline = timeout.map(|timeout| Instant::now() + timeout);
145    let hostname = current_hostname();
146    let mut warned_live_owner = false;
147
148    loop {
149        if let Some(deadline) = deadline {
150            if Instant::now() >= deadline {
151                return Err(AcquireError::Timeout);
152            }
153        }
154
155        match create_new_lock(path, &hostname, config) {
156            Ok(guard) => return Ok(guard),
157            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
158            Err(error) => return Err(error.into()),
159        }
160
161        let metadata = match read_lock_metadata(path) {
162            Ok(metadata) => metadata,
163            Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
164            Err(ReadLockError::Io(error)) => return Err(error.into()),
165            Err(ReadLockError::Malformed(error)) => {
166                // A just-created O_EXCL file is visible before its owner has
167                // finished writing JSON. Give that transient creation window
168                // one poll interval before treating malformed contents as stale.
169                sleep_until_retry(deadline, config.poll_interval_ms)?;
170                match read_lock_metadata(path) {
171                    Ok(_) => continue,
172                    Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
173                        continue;
174                    }
175                    Err(ReadLockError::Io(error)) => return Err(error.into()),
176                    Err(ReadLockError::Malformed(_)) => {}
177                }
178                slog_warn!(
179                    "removing malformed filesystem lock at {}: {}",
180                    path.display(),
181                    error
182                );
183                remove_lock_file(path)?;
184                continue;
185            }
186        };
187
188        if metadata.hostname != hostname {
189            sleep_until_retry(deadline, config.poll_interval_ms)?;
190            continue;
191        }
192
193        if !process_alive(metadata.pid) {
194            slog_warn!(
195                "removing filesystem lock at {} from dead PID {}",
196                path.display(),
197                metadata.pid
198            );
199            remove_lock_file(path)?;
200            continue;
201        }
202
203        let now = now_ms();
204        let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);
205        if since_heartbeat > config.stale_heartbeat_ms {
206            slog_warn!(
207                "reclaiming filesystem lock at {}: PID {} is alive but heartbeat is stale ({}ms)",
208                path.display(),
209                metadata.pid,
210                since_heartbeat
211            );
212            remove_lock_file(path)?;
213            continue;
214        }
215
216        let held_for = now.saturating_sub(metadata.created_at_ms);
217        if held_for > config.live_owner_warn_ms && !warned_live_owner {
218            slog_warn!(
219                "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
220                path.display(),
221                metadata.pid
222            );
223            warned_live_owner = true;
224        }
225
226        sleep_until_retry(deadline, config.poll_interval_ms)?;
227    }
228}
229
230fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
231    let now = now_ms();
232    let metadata = LockMetadata {
233        pid: std::process::id(),
234        hostname: hostname.to_string(),
235        created_at_ms: now,
236        heartbeat_at_ms: now,
237    };
238
239    create_lock_file_atomically(path, &metadata)?;
240
241    let shutdown = Arc::new(AtomicBool::new(false));
242    let (done_tx, done_rx) = mpsc::channel();
243    let heartbeat_path = path.to_path_buf();
244    let heartbeat_metadata = metadata.clone();
245    let heartbeat_shutdown = Arc::clone(&shutdown);
246    let heartbeat = thread::Builder::new()
247        .name("aft-fs-lock-heartbeat".to_string())
248        .spawn(move || {
249            run_heartbeat(
250                heartbeat_path,
251                heartbeat_metadata,
252                heartbeat_shutdown,
253                config,
254            );
255            let _ = done_tx.send(());
256        })?;
257
258    slog_info!("acquired filesystem lock at {}", path.display());
259
260    Ok(LockGuard {
261        path: path.to_path_buf(),
262        metadata,
263        shutdown,
264        heartbeat_done: done_rx,
265        heartbeat: Some(heartbeat),
266    })
267}
268
269fn run_heartbeat(
270    path: PathBuf,
271    owner: LockMetadata,
272    shutdown: Arc<AtomicBool>,
273    config: LockConfig,
274) {
275    loop {
276        thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
277        if shutdown.load(Ordering::Acquire) {
278            return;
279        }
280
281        match heartbeat_once(&path, &owner) {
282            Ok(()) => {}
283            Err(HeartbeatError::LockGone) => {
284                slog_error!(
285                    "filesystem lock at {} disappeared; stopping heartbeat",
286                    path.display()
287                );
288                return;
289            }
290            Err(HeartbeatError::NotOwner) => {
291                slog_error!(
292                    "filesystem lock at {} is no longer owned by this guard; stopping heartbeat",
293                    path.display()
294                );
295                return;
296            }
297            Err(HeartbeatError::Malformed(error)) => {
298                slog_error!(
299                    "filesystem lock at {} became malformed: {}; stopping heartbeat",
300                    path.display(),
301                    error
302                );
303                return;
304            }
305            Err(HeartbeatError::Io(error)) => {
306                slog_error!(
307                    "failed to heartbeat filesystem lock at {}: {}; stopping heartbeat",
308                    path.display(),
309                    error
310                );
311                return;
312            }
313        }
314    }
315}
316
317fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
318    let mut metadata = match read_lock_metadata(path) {
319        Ok(metadata) => metadata,
320        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
321            return Err(HeartbeatError::LockGone);
322        }
323        Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
324        Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
325    };
326
327    if metadata.pid != owner.pid
328        || metadata.hostname != owner.hostname
329        || metadata.created_at_ms != owner.created_at_ms
330    {
331        return Err(HeartbeatError::NotOwner);
332    }
333
334    metadata.heartbeat_at_ms = now_ms();
335    atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
336}
337
/// Reasons a single heartbeat attempt can fail. Any of them stops the
/// heartbeat thread.
#[derive(Debug)]
enum HeartbeatError {
    /// Reading or rewriting the lock file failed with an I/O error other than
    /// "not found".
    Io(io::Error),
    /// The lock file no longer exists.
    LockGone,
    /// The lock file exists but its contents could not be parsed.
    Malformed(serde_json::Error),
    /// The lock file's pid, hostname, or creation time no longer match the
    /// guard that owns this heartbeat.
    NotOwner,
}
345
/// Reasons reading a lock file's metadata can fail.
#[derive(Debug)]
enum ReadLockError {
    /// The file could not be read (including "not found").
    Io(io::Error),
    /// The file was read but is not valid lock-metadata JSON.
    Malformed(serde_json::Error),
}
351
352fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
353    let bytes = fs::read(path).map_err(ReadLockError::Io)?;
354    serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
355}
356
/// Creates `path` for writing, failing with `AlreadyExists` if it is already
/// present. On Unix the file is requested with mode `0o644`.
#[cfg(unix)]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = OpenOptions::new();
    options.write(true).create_new(true).mode(0o644);
    options.open(path)
}

/// Creates `path` for writing, failing with `AlreadyExists` if it is already
/// present.
#[cfg(not(unix))]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.write(true).create_new(true);
    options.open(path)
}
372
373fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
374    serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
375    file.write_all(b"\n")?;
376    file.sync_all()
377}
378
379fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
380    let tmp_path = temp_path_for_lock(path);
381    let result = (|| {
382        let mut file = open_new_lock_file(&tmp_path)?;
383        write_lock_metadata_to_file(&mut file, metadata)?;
384        drop(file);
385
386        fs::hard_link(&tmp_path, path)?;
387        sync_parent(path);
388        Ok(())
389    })();
390
391    let _ = fs::remove_file(&tmp_path);
392    result
393}
394
395fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
396    let tmp_path = temp_path_for_lock(path);
397    let write_result = (|| {
398        let mut file = OpenOptions::new()
399            .write(true)
400            .create_new(true)
401            .open(&tmp_path)?;
402        write_lock_metadata_to_file(&mut file, metadata)?;
403        drop(file);
404
405        rename_over(&tmp_path, path)?;
406        sync_parent(path);
407        Ok(())
408    })();
409
410    if write_result.is_err() {
411        let _ = fs::remove_file(&tmp_path);
412    }
413
414    write_result
415}
416
/// Renames `from` to `to`, replacing `to` if it already exists.
///
/// `fs::rename` already replaces an existing destination file on every
/// supported platform, Windows included. The destination must therefore NOT
/// be deleted first: doing so opens a window in which the lock file does not
/// exist, letting another caller create its own lock while the current owner
/// is merely refreshing its heartbeat.
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    fs::rename(from, to)
}
427
// Process-wide counter that disambiguates temp lockfile paths for callers
// inside the same process. `now_nanos()` alone is not unique enough on
// Windows when two threads race to acquire the same lock (caught by the
// `acquire_serializes_concurrent_callers` test): two threads sampling the
// nanosecond clock within the same scheduler quantum produce identical
// timestamps, both write to the same `.lock.tmp.<pid>.<nanos>` file, one
// thread's `fs::remove_file(&tmp_path)` cleanup deletes the file before
// the other thread's `fs::hard_link(&tmp_path, ...)` runs, and the loser
// fails with `Io(Os { code: 2, NotFound })`.
//
// A single `AtomicU64` shared by all threads makes every temp path unique
// within the process regardless of clock resolution or scheduling races.
static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
441
442fn temp_path_for_lock(path: &Path) -> PathBuf {
443    let file_name = path
444        .file_name()
445        .and_then(|name| name.to_str())
446        .unwrap_or("lock");
447    let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
448    path.with_file_name(format!(
449        ".{file_name}.tmp.{}.{}.{}",
450        std::process::id(),
451        now_nanos(),
452        seq
453    ))
454}
455
456fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
457    let metadata = match read_lock_metadata(path) {
458        Ok(metadata) => metadata,
459        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
460            return Ok(false);
461        }
462        Err(ReadLockError::Io(error)) => return Err(error),
463        Err(ReadLockError::Malformed(_)) => return Ok(false),
464    };
465
466    if metadata.pid == owner.pid
467        && metadata.hostname == owner.hostname
468        && metadata.created_at_ms == owner.created_at_ms
469    {
470        remove_lock_file(path)?;
471        Ok(true)
472    } else {
473        Ok(false)
474    }
475}
476
/// Deletes the file at `path`, treating "already gone" as success.
///
/// Any error other than `NotFound` is returned to the caller.
fn remove_lock_file(path: &Path) -> io::Result<()> {
    fs::remove_file(path).or_else(|error| {
        if error.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(error)
        }
    })
}
484
485fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
486    let poll = Duration::from_millis(poll_interval_ms);
487    let sleep_for = match deadline {
488        Some(deadline) => {
489            let now = Instant::now();
490            if now >= deadline {
491                return Err(AcquireError::Timeout);
492            }
493            poll.min(deadline.saturating_duration_since(now))
494        }
495        None => poll,
496    };
497    thread::sleep(sleep_for);
498    Ok(())
499}
500
/// Best-effort flush of the directory containing `path`, so a preceding
/// link or rename in that directory is persisted.
///
/// All failures are ignored. A bare relative file name such as `my.lock` has
/// an empty parent path, which cannot be opened; it is mapped to `.` so the
/// current directory is synced instead of silently skipping the sync.
fn sync_parent(path: &Path) {
    let Some(parent) = path.parent() else {
        return;
    };
    let dir_path = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };

    if let Ok(dir) = File::open(dir_path) {
        let _ = dir.sync_all();
    }
}
508
/// Current wall-clock time in whole milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}
515
/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_nanos() -> u128 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_nanos()
}
522
523#[cfg(unix)]
524fn current_hostname() -> String {
525    let mut buffer = [0u8; 256];
526    let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
527    if result == 0 {
528        let len = buffer
529            .iter()
530            .position(|byte| *byte == 0)
531            .unwrap_or(buffer.len());
532        if len > 0 {
533            return String::from_utf8_lossy(&buffer[..len]).into_owned();
534        }
535    }
536
537    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
538}
539
/// Returns this machine's host name from the environment: `COMPUTERNAME`,
/// then `HOSTNAME`, then the literal `unknown-host`.
#[cfg(windows)]
fn current_hostname() -> String {
    for key in ["COMPUTERNAME", "HOSTNAME"] {
        if let Ok(value) = std::env::var(key) {
            return value;
        }
    }
    "unknown-host".to_string()
}
546
/// Returns this machine's host name from the `HOSTNAME` environment variable,
/// or the literal `unknown-host` when it is unset or not valid Unicode.
#[cfg(not(any(unix, windows)))]
fn current_hostname() -> String {
    match std::env::var("HOSTNAME") {
        Ok(name) => name,
        Err(_) => "unknown-host".to_string(),
    }
}
551
552#[cfg(unix)]
553fn process_alive(pid: u32) -> bool {
554    if pid == 0 || pid > i32::MAX as u32 {
555        return false;
556    }
557
558    let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
559    if result == 0 {
560        return true;
561    }
562
563    io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
564}
565
/// Reports whether a process with id `pid` exists, by querying `tasklist`.
///
/// If `tasklist` cannot be run or exits unsuccessfully, the process is
/// assumed to be alive.
#[cfg(windows)]
fn process_alive(pid: u32) -> bool {
    let filter = format!("PID eq {pid}");
    let probe = std::process::Command::new("tasklist")
        .args(["/FI", &filter, "/FO", "CSV", "/NH"])
        .output();

    let listing = match probe {
        Ok(output) if output.status.success() => output.stdout,
        // Could not determine liveness: err on the side of "alive".
        _ => return true,
    };
    let text = String::from_utf8_lossy(&listing);

    // `tasklist /NH /FO CSV` prints one line per matching process with every
    // field quoted, e.g. `"image","7420","Console","1","12,345 K"`. When the
    // filter matches nothing it prints
    // `INFO: No tasks are running which match the specified criteria.`
    // to stdout instead. An earlier matcher looked for `","{pid}",` mid-line
    // and missed runners whose output is quoted slightly differently (e.g. a
    // trailing CRLF leaving the pid token at end of line as `"7420"\r\n`).
    // So: require that the "no tasks" sentinel is absent AND that the quoted
    // PID appears somewhere in the output.
    !text.contains("No tasks are running") && text.contains(&format!("\"{pid}\""))
}
597
/// Fallback for platforms with no liveness probe in this module: every PID is
/// reported as alive, so a lock is never reclaimed on the dead-PID path.
#[cfg(not(any(unix, windows)))]
fn process_alive(_pid: u32) -> bool {
    true
}
602
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};

    /// Fast timings for tests: 25ms heartbeat, 2s staleness, 10ms polling.
    fn test_config() -> LockConfig {
        LockConfig {
            heartbeat_interval_ms: 25,
            stale_heartbeat_ms: 2_000,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
            poll_interval_ms: 10,
        }
    }

    /// Returns a fresh temp directory and a lock path inside it. The directory
    /// handle must be kept alive for as long as the path is used.
    fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().expect("create temp dir");
        let path = dir.path().join("test.lock");
        (dir, path)
    }

    /// Writes a lock file directly, with no guard or heartbeat behind it.
    fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
        let mut file = open_new_lock_file(path).expect("create synthetic lock");
        write_lock_metadata_to_file(&mut file, metadata).expect("write synthetic lock");
    }

    /// Builds metadata whose heartbeat timestamp equals its creation timestamp.
    fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
        LockMetadata {
            pid,
            hostname,
            created_at_ms,
            heartbeat_at_ms: created_at_ms,
        }
    }

    /// Metadata identifying the test process itself, created "now".
    fn current_process_metadata() -> LockMetadata {
        let now = now_ms();
        synthetic_metadata(std::process::id(), current_hostname(), now)
    }

    /// Acquiring writes this process's identity; dropping removes the file.
    #[test]
    fn acquire_creates_lockfile_and_unlocks_on_drop() {
        let (_dir, path) = test_lock_path();

        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let metadata = read_lock_metadata(&path).expect("read lock metadata");
        assert_eq!(metadata.pid, std::process::id());
        assert_eq!(metadata.hostname, current_hostname());
        assert_eq!(metadata.created_at_ms, guard.metadata.created_at_ms);

        drop(guard);
        assert!(!path.exists());
    }

    /// Two threads racing for the same lock never hold it at the same time.
    #[test]
    fn acquire_serializes_concurrent_callers() {
        let (_dir, path) = test_lock_path();
        let path = Arc::new(path);
        // Two workers plus the main thread rendezvous before racing.
        let barrier = Arc::new(Barrier::new(3));
        let inside = Arc::new(AtomicUsize::new(0));
        let entered = Arc::new(AtomicUsize::new(0));
        let max_inside = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        for _ in 0..2 {
            let path = Arc::clone(&path);
            let barrier = Arc::clone(&barrier);
            let inside = Arc::clone(&inside);
            let entered = Arc::clone(&entered);
            let max_inside = Arc::clone(&max_inside);
            handles.push(thread::spawn(move || {
                barrier.wait();
                let guard = acquire_with_config(&path, Some(Duration::from_secs(2)), test_config())
                    .expect("thread acquire lock");
                let previous = inside.fetch_add(1, Ordering::SeqCst);
                assert_eq!(previous, 0, "two lock holders overlapped");
                entered.fetch_add(1, Ordering::SeqCst);
                max_inside.fetch_max(previous + 1, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(75));
                inside.fetch_sub(1, Ordering::SeqCst);
                drop(guard);
            }));
        }

        barrier.wait();
        for handle in handles {
            handle.join().expect("join worker");
        }

        assert_eq!(entered.load(Ordering::SeqCst), 2);
        assert_eq!(max_inside.load(Ordering::SeqCst), 1);
        assert!(!path.exists());
    }

    /// The heartbeat thread eventually advances `heartbeat_at_ms` on disk.
    #[test]
    fn heartbeat_updates_lockfile_timestamp() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let initial = read_lock_metadata(&path)
            .expect("read initial metadata")
            .heartbeat_at_ms;

        // Poll for up to 2s rather than sleeping a fixed multiple of the
        // heartbeat interval. `park_timeout` is a *maximum* wait, not a
        // guaranteed periodic timer — under load (shared macOS CI runners
        // running other cargo-test threads concurrently) the heartbeat
        // thread may not fire 3 times within 75ms even though
        // heartbeat_interval_ms=25. The contract being asserted is "the
        // heartbeat advances eventually", not "it advances within N
        // heartbeat intervals".
        let deadline = std::time::Instant::now() + Duration::from_millis(2_000);
        let mut updated = initial;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(50));
            updated = read_lock_metadata(&path)
                .expect("read updated metadata")
                .heartbeat_at_ms;
            if updated > initial {
                break;
            }
        }
        assert!(
            updated > initial,
            "heartbeat timestamp did not advance within 2s"
        );
        drop(guard);
    }

    /// A same-host lock whose PID does not exist is removed and re-acquired.
    #[test]
    fn dead_pid_lock_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        let metadata = synthetic_metadata(999_999_999, current_hostname(), now_ms());
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim dead pid lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    /// A live-PID lock whose heartbeat is 60s old is reclaimed.
    #[test]
    fn stale_heartbeat_lock_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(60_000);
        metadata.heartbeat_at_ms = now_ms().saturating_sub(60_000);
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim stale heartbeat lock");
        let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_ne!(reclaimed.created_at_ms, metadata.created_at_ms);
        drop(guard);
    }

    /// A live owner with a fresh heartbeat makes waiters time out.
    #[test]
    fn healthy_live_owner_blocks() {
        let (_dir, path) = test_lock_path();
        let metadata = current_process_metadata();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// A lock file that is not valid JSON is removed and re-acquired.
    #[test]
    fn malformed_lockfile_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        fs::write(&path, b"not valid json").expect("write malformed lock");

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim malformed lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    /// A lock from another host is left untouched even with an old heartbeat.
    #[test]
    fn cross_host_lock_is_not_stolen() {
        let (_dir, path) = test_lock_path();
        let now = now_ms().saturating_sub(60_000);
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: now,
            heartbeat_at_ms: now,
        };
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// A lock held for 11 minutes with a fresh heartbeat still blocks waiters
    /// and is left unchanged on disk.
    #[test]
    fn live_owner_over_10min_warns_but_blocks() {
        let (_dir, path) = test_lock_path();
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
        metadata.heartbeat_at_ms = now_ms();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// After drop, the lock file stays absent across several heartbeat
    /// intervals, i.e. no heartbeat is still writing it.
    #[test]
    fn drop_stops_heartbeat_thread() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        drop(guard);

        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 3,
        ));
        assert!(
            !path.exists(),
            "heartbeat recreated or kept updating lockfile"
        );
    }
}
831}