Skip to main content

aft/
fs_lock.rs

1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6    atomic::{AtomicBool, AtomicU64, Ordering},
7    mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
/// Default interval, in milliseconds, between heartbeat refreshes by the lock holder.
pub const HEARTBEAT_INTERVAL_MS: u64 = 5 * 1_000;
/// Default heartbeat age, in milliseconds, after which a same-host live owner is
/// reported as stale (it is still never broken). Cross-host owners are reclaimed
/// after five times this value.
pub const STALE_HEARTBEAT_MS: u64 = 15 * 1_000;
/// Default hold time, in milliseconds (10 minutes), after which a waiter logs that
/// a live owner is still holding the lock.
pub const LIVE_OWNER_WARN_MS: u64 = 10 * 60 * 1_000;
/// Default delay, in milliseconds, between acquisition attempts.
pub const POLL_INTERVAL_MS: u64 = 100;
20
21#[derive(Clone, Copy, Debug)]
22struct LockConfig {
23    heartbeat_interval_ms: u64,
24    stale_heartbeat_ms: u64,
25    live_owner_warn_ms: u64,
26    poll_interval_ms: u64,
27}
28
29impl LockConfig {
30    fn cross_host_stale_heartbeat_ms(self) -> u64 {
31        self.stale_heartbeat_ms.saturating_mul(5)
32    }
33}
34
35impl Default for LockConfig {
36    fn default() -> Self {
37        Self {
38            heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
39            stale_heartbeat_ms: STALE_HEARTBEAT_MS,
40            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
41            poll_interval_ms: POLL_INTERVAL_MS,
42        }
43    }
44}
45
46#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
47struct LockMetadata {
48    pid: u32,
49    hostname: String,
50    created_at_ms: u64,
51    heartbeat_at_ms: u64,
52}
53
54/// Acquire a filesystem lock at `path`. Blocks until the lock is held.
55///
56/// The returned guard owns a background heartbeat thread; dropping it releases
57/// the lock and removes the lock file.
58pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
59    acquire_with_config(path, None, LockConfig::default())
60}
61
62/// Try to acquire a filesystem lock at `path` within `timeout`.
63pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
64    acquire_with_config(path, Some(timeout), LockConfig::default())
65}
66
67pub struct LockGuard {
68    path: PathBuf,
69    metadata: LockMetadata,
70    shutdown: Arc<AtomicBool>,
71    heartbeat_done: mpsc::Receiver<()>,
72    heartbeat: Option<JoinHandle<()>>,
73}
74
75impl Drop for LockGuard {
76    fn drop(&mut self) {
77        // Signal shutdown then unconditionally join the heartbeat thread
78        // BEFORE removing the lockfile. The earlier `recv_timeout(100ms)`
79        // implementation could let `remove_lock_if_owned` race with a
80        // still-alive heartbeat:
81        //
82        //   1. Drop signals shutdown, ack times out under CI load.
83        //   2. Drop calls `remove_lock_if_owned` → file removed.
84        //   3. Another caller acquires the lock → writes its metadata.
85        //   4. Our heartbeat (still alive, mid-`atomic_write_lock_metadata`
86        //      from before shutdown was checked) overwrites the new
87        //      owner's file with our stale metadata. heartbeat_once's
88        //      ownership check happens BEFORE the write, so it can race
89        //      with a concurrent acquire that flips ownership in between.
90        //   5. The new owner's heartbeat sees foreign metadata, exits
91        //      `NotOwner`. The new owner's drop sees foreign metadata,
92        //      `remove_lock_if_owned` returns `Ok(false)`, file persists.
93        //
94        // Always-joining bounds drop latency to one `park_timeout`
95        // iteration (~25ms) plus the current `heartbeat_once` IO —
96        // typically <500ms under CI load. The unused `heartbeat_done`
97        // channel is kept for backward compatibility with any external
98        // code that may still construct LockGuard manually, but Drop no
99        // longer relies on it.
100        self.shutdown.store(true, Ordering::Release);
101        if let Some(handle) = self.heartbeat.take() {
102            handle.thread().unpark();
103            let _ = handle.join();
104        }
105        // Drain any pending ack so the receiver doesn't carry stale state
106        // if this LockGuard is somehow re-used (it isn't today, but be
107        // defensive).
108        while self.heartbeat_done.try_recv().is_ok() {}
109
110        match remove_lock_if_owned(&self.path, &self.metadata) {
111            Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
112            Ok(false) => {}
113            Err(error) => slog_warn!(
114                "failed to release filesystem lock at {}: {}",
115                self.path.display(),
116                error
117            ),
118        }
119    }
120}
121
/// Failure returned by [`acquire`] and [`try_acquire`].
#[derive(Debug)]
pub enum AcquireError {
    /// A filesystem operation on the lock file failed.
    Io(io::Error),
    /// The deadline given to `try_acquire` passed before the lock was obtained.
    Timeout,
}

impl fmt::Display for AcquireError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Timeout => f.write_str("timed out acquiring filesystem lock"),
            Self::Io(inner) => write!(f, "filesystem lock I/O error: {inner}"),
        }
    }
}

impl std::error::Error for AcquireError {}

impl From<io::Error> for AcquireError {
    /// Wrap an I/O failure so `?` can be used inside the acquisition loop.
    fn from(inner: io::Error) -> Self {
        Self::Io(inner)
    }
}
144
145fn acquire_with_config(
146    path: &Path,
147    timeout: Option<Duration>,
148    config: LockConfig,
149) -> Result<LockGuard, AcquireError> {
150    let deadline = timeout.map(|timeout| Instant::now() + timeout);
151    let hostname = current_hostname();
152    let mut warned_live_owner = false;
153    let mut warned_stale_live_owner = false;
154
155    loop {
156        if let Some(deadline) = deadline {
157            if Instant::now() >= deadline {
158                return Err(AcquireError::Timeout);
159            }
160        }
161
162        match create_new_lock(path, &hostname, config) {
163            Ok(guard) => return Ok(guard),
164            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
165            Err(error) => return Err(error.into()),
166        }
167
168        let metadata = match read_lock_metadata(path) {
169            Ok(metadata) => metadata,
170            Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
171            Err(ReadLockError::Io(error)) => return Err(error.into()),
172            Err(ReadLockError::Malformed(error)) => {
173                // A just-created O_EXCL file is visible before its owner has
174                // finished writing JSON. Give that transient creation window
175                // one poll interval before treating malformed contents as stale.
176                sleep_until_retry(deadline, config.poll_interval_ms)?;
177                match read_lock_metadata(path) {
178                    Ok(_) => continue,
179                    Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
180                        continue;
181                    }
182                    Err(ReadLockError::Io(error)) => return Err(error.into()),
183                    Err(ReadLockError::Malformed(_)) => {}
184                }
185                slog_warn!(
186                    "removing malformed filesystem lock at {}: {}",
187                    path.display(),
188                    error
189                );
190                remove_lock_file(path)?;
191                continue;
192            }
193        };
194
195        let now = now_ms();
196        let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);
197
198        if metadata.hostname != hostname {
199            let cross_host_stale_ms = config.cross_host_stale_heartbeat_ms();
200            if since_heartbeat > cross_host_stale_ms {
201                slog_warn!(
202                    "reclaiming cross-host filesystem lock at {} from host {} after stale heartbeat ({}ms > {}ms)",
203                    path.display(),
204                    metadata.hostname,
205                    since_heartbeat,
206                    cross_host_stale_ms
207                );
208                // Compare-and-delete: only remove if it's still the SAME stale
209                // owner (a fresh owner may have acquired it in the gap).
210                reclaim_lock_file(path, &metadata)?;
211                continue;
212            }
213            sleep_until_retry(deadline, config.poll_interval_ms)?;
214            continue;
215        }
216
217        if !process_alive(metadata.pid) {
218            slog_warn!(
219                "removing filesystem lock at {} from dead PID {}",
220                path.display(),
221                metadata.pid
222            );
223            // Compare-and-delete: only remove if it's still this dead owner's
224            // lock. A fresh owner could have written a new lock (with a recycled
225            // or different PID) between our liveness check and the unlink.
226            reclaim_lock_file(path, &metadata)?;
227            continue;
228        }
229
230        if since_heartbeat > config.stale_heartbeat_ms && !warned_stale_live_owner {
231            // Same-host PID liveness is authoritative. A SIGSTOP'd process,
232            // suspended VM, or sleeping laptop can miss heartbeats and later
233            // resume inside the critical section. Breaking that lock would allow
234            // split-brain writers, so a paused live owner blocks acquirers until
235            // it resumes and releases the lock or the PID dies.
236            slog_warn!(
237                "filesystem lock at {} held by live PID {} has stale heartbeat ({}ms); NOT breaking",
238                path.display(),
239                metadata.pid,
240                since_heartbeat
241            );
242            warned_stale_live_owner = true;
243        }
244
245        let held_for = now.saturating_sub(metadata.created_at_ms);
246        if held_for > config.live_owner_warn_ms && !warned_live_owner {
247            slog_warn!(
248                "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
249                path.display(),
250                metadata.pid
251            );
252            warned_live_owner = true;
253        }
254
255        sleep_until_retry(deadline, config.poll_interval_ms)?;
256    }
257}
258
259fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
260    let now = now_ms();
261    let metadata = LockMetadata {
262        pid: std::process::id(),
263        hostname: hostname.to_string(),
264        created_at_ms: now,
265        heartbeat_at_ms: now,
266    };
267
268    create_lock_file_atomically(path, &metadata)?;
269
270    let shutdown = Arc::new(AtomicBool::new(false));
271    let (done_tx, done_rx) = mpsc::channel();
272    let heartbeat_path = path.to_path_buf();
273    let heartbeat_metadata = metadata.clone();
274    let heartbeat_shutdown = Arc::clone(&shutdown);
275    let heartbeat = thread::Builder::new()
276        .name("aft-fs-lock-heartbeat".to_string())
277        .spawn(move || {
278            run_heartbeat(
279                heartbeat_path,
280                heartbeat_metadata,
281                heartbeat_shutdown,
282                config,
283            );
284            let _ = done_tx.send(());
285        })?;
286
287    slog_info!("acquired filesystem lock at {}", path.display());
288
289    Ok(LockGuard {
290        path: path.to_path_buf(),
291        metadata,
292        shutdown,
293        heartbeat_done: done_rx,
294        heartbeat: Some(heartbeat),
295    })
296}
297
298fn run_heartbeat(
299    path: PathBuf,
300    owner: LockMetadata,
301    shutdown: Arc<AtomicBool>,
302    config: LockConfig,
303) {
304    // Number of consecutive heartbeat intervals that can be missed before the
305    // same-host stale window elapses and another process may reclaim the lock.
306    // Beyond this point a sustained failure is genuinely dangerous, so we
307    // escalate the log from warn to error — but we still keep retrying.
308    let stale_intervals = config
309        .stale_heartbeat_ms
310        .checked_div(config.heartbeat_interval_ms.max(1))
311        .unwrap_or(3)
312        .max(1);
313    let mut consecutive_transient_failures: u64 = 0;
314
315    loop {
316        thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
317        if shutdown.load(Ordering::Acquire) {
318            return;
319        }
320
321        match heartbeat_once(&path, &owner) {
322            Ok(()) => {
323                if consecutive_transient_failures > 0 {
324                    slog_info!(
325                        "filesystem lock at {} heartbeat recovered after {} transient failure(s)",
326                        path.display(),
327                        consecutive_transient_failures
328                    );
329                    consecutive_transient_failures = 0;
330                }
331            }
332            Err(error) if heartbeat_error_is_terminal(&error) => {
333                // Terminal states: the lock is provably gone or owned by
334                // someone else. Continuing to write would clobber a new owner's
335                // metadata (the exact race documented in LockGuard::drop), so
336                // stop heartbeating.
337                slog_error!(
338                    "{}; stopping heartbeat",
339                    terminal_heartbeat_message(&path, &error)
340                );
341                return;
342            }
343            Err(error) => {
344                // Transient states: a temporary I/O hiccup (disk/NFS blip,
345                // quota) or a read that raced a concurrent writer mid-write
346                // (momentarily unparseable file). A single such error must NOT
347                // permanently kill the heartbeat — that would silently stop
348                // refreshing heartbeat_at_ms while the guard holder keeps
349                // running its critical section, letting another process reclaim
350                // the lock after the stale window and produce concurrent
351                // writers. Log and retry on the next interval; a later success
352                // resumes heartbeating automatically.
353                consecutive_transient_failures += 1;
354                log_transient_heartbeat_failure(
355                    &path,
356                    &transient_heartbeat_reason(&error),
357                    consecutive_transient_failures,
358                    stale_intervals,
359                );
360            }
361        }
362    }
363}
364
365/// A heartbeat failure is terminal when the lock is provably no longer ours to
366/// refresh: it was removed (`LockGone`) or a different owner now holds it
367/// (`NotOwner`). I/O and malformed-read failures are treated as transient —
368/// they are typically temporary disk/NFS hiccups or a read that raced a
369/// concurrent writer — so the heartbeat retries rather than dying.
370fn heartbeat_error_is_terminal(error: &HeartbeatError) -> bool {
371    matches!(error, HeartbeatError::LockGone | HeartbeatError::NotOwner)
372}
373
374fn terminal_heartbeat_message(path: &Path, error: &HeartbeatError) -> String {
375    match error {
376        HeartbeatError::LockGone => {
377            format!("filesystem lock at {} disappeared", path.display())
378        }
379        HeartbeatError::NotOwner => format!(
380            "filesystem lock at {} is no longer owned by this guard",
381            path.display()
382        ),
383        // Not reachable for non-terminal errors, but keep a sensible string.
384        HeartbeatError::Io(error) => {
385            format!("filesystem lock at {} I/O error: {error}", path.display())
386        }
387        HeartbeatError::Malformed(error) => {
388            format!(
389                "filesystem lock at {} became malformed: {error}",
390                path.display()
391            )
392        }
393    }
394}
395
396fn transient_heartbeat_reason(error: &HeartbeatError) -> String {
397    match error {
398        HeartbeatError::Io(error) => format!("I/O error: {error}"),
399        HeartbeatError::Malformed(error) => format!("became malformed: {error}"),
400        HeartbeatError::LockGone => "lock disappeared".to_string(),
401        HeartbeatError::NotOwner => "lock no longer owned".to_string(),
402    }
403}
404
405/// Log a transient heartbeat failure, escalating to error exactly once when the
406/// failures have lasted long enough that the lock is now reclaimable by another
407/// owner. Beyond that point we stay quiet to avoid log spam while still
408/// retrying — the holder has already been warned the lock is at risk.
409fn log_transient_heartbeat_failure(
410    path: &Path,
411    reason: &str,
412    consecutive_failures: u64,
413    stale_intervals: u64,
414) {
415    if consecutive_failures < stale_intervals {
416        slog_warn!(
417            "transient failure to heartbeat filesystem lock at {}: {}; retrying (attempt {})",
418            path.display(),
419            reason,
420            consecutive_failures
421        );
422    } else if consecutive_failures == stale_intervals {
423        slog_error!(
424            "filesystem lock at {} has failed {} consecutive heartbeats: {}; \
425             the lock may now be reclaimed by another owner — continuing to retry",
426            path.display(),
427            consecutive_failures,
428            reason
429        );
430    }
431}
432
433fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
434    let mut metadata = match read_lock_metadata(path) {
435        Ok(metadata) => metadata,
436        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
437            return Err(HeartbeatError::LockGone);
438        }
439        Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
440        Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
441    };
442
443    if metadata.pid != owner.pid
444        || metadata.hostname != owner.hostname
445        || metadata.created_at_ms != owner.created_at_ms
446    {
447        return Err(HeartbeatError::NotOwner);
448    }
449
450    metadata.heartbeat_at_ms = now_ms();
451    atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
452}
453
454#[derive(Debug)]
455enum HeartbeatError {
456    Io(io::Error),
457    LockGone,
458    Malformed(serde_json::Error),
459    NotOwner,
460}
461
462#[derive(Debug)]
463enum ReadLockError {
464    Io(io::Error),
465    Malformed(serde_json::Error),
466}
467
468fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
469    let bytes = fs::read(path).map_err(ReadLockError::Io)?;
470    serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
471}
472
/// Create a brand-new file at `path` for writing, failing with `AlreadyExists`
/// if anything is already there.
///
/// On Unix the file is requested with mode `0o644` (before umask); other
/// platforms use the default permissions.
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.write(true).create_new(true);
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        options.mode(0o644);
    }
    options.open(path)
}
488
489fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
490    serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
491    file.write_all(b"\n")?;
492    file.sync_all()
493}
494
495fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
496    let tmp_path = temp_path_for_lock(path);
497    let result = (|| {
498        let mut file = open_new_lock_file(&tmp_path)?;
499        write_lock_metadata_to_file(&mut file, metadata)?;
500        drop(file);
501
502        fs::hard_link(&tmp_path, path)?;
503        sync_parent(path);
504        Ok(())
505    })();
506
507    let _ = fs::remove_file(&tmp_path);
508    result
509}
510
511fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
512    let tmp_path = temp_path_for_lock(path);
513    let write_result = (|| {
514        let mut file = OpenOptions::new()
515            .write(true)
516            .create_new(true)
517            .open(&tmp_path)?;
518        write_lock_metadata_to_file(&mut file, metadata)?;
519        drop(file);
520
521        rename_over(&tmp_path, path)?;
522        sync_parent(path);
523        Ok(())
524    })();
525
526    if write_result.is_err() {
527        let _ = fs::remove_file(&tmp_path);
528    }
529
530    write_result
531}
532
/// Move `from` over an existing `to` without `to` ever ceasing to exist.
///
/// `std::fs::rename` replaces an existing destination on Windows, so it is tried
/// first. An explicit `remove_file(to)` beforehand would open a gap in which a
/// concurrent heartbeat read sees `NotFound`, maps it to `LockGone` (terminal),
/// and kills the heartbeat thread. That gap once made
/// `heartbeat_survives_transient_malformed_and_recovers` flaky on Windows CI.
///
/// If the replace is refused (e.g. the destination is briefly held open by
/// another handle, or AV/indexing holds the source), fall back to `fs::copy`.
/// It truncates and overwrites `to` in place, so the path never disappears. The
/// worst a concurrent reader can see is a partially written file, which yields
/// `Malformed`, a transient and retried error. The temp source is then removed
/// best-effort. If the copy also fails, `to` is left as it was and the original
/// rename error is returned.
#[cfg(windows)]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    fs::rename(from, to).or_else(|rename_error| match fs::copy(from, to) {
        Ok(_) => {
            let _ = fs::remove_file(from);
            Ok(())
        }
        Err(_) => Err(rename_error),
    })
}

/// Move `from` over `to`; POSIX `rename(2)` atomically replaces an existing
/// destination.
#[cfg(not(windows))]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    fs::rename(from, to)
}
572
// Process-wide counter that makes temp lockfile paths unique within this
// process. A timestamp alone is not enough on Windows: two threads racing for
// the same lock (see `acquire_serializes_concurrent_callers`) can read identical
// nanosecond values within one scheduler quantum. Both then write the same
// `.lock.tmp.<pid>.<nanos>` file, one thread's cleanup `fs::remove_file`
// deletes it before the other's `fs::hard_link` runs, and the loser fails with
// `NotFound`. A shared `AtomicU64` hands out a distinct sequence number to every
// caller, regardless of clock resolution or scheduling.
static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Build a hidden, unique sibling path for staging writes to the lock at `path`:
/// `.<name>.tmp.<pid>.<nanos since epoch>.<sequence>`. A non-UTF-8 file name
/// falls back to `lock`.
fn temp_path_for_lock(path: &Path) -> PathBuf {
    let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
    let base = match path.file_name().and_then(|name| name.to_str()) {
        Some(name) => name,
        None => "lock",
    };
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .as_nanos();
    let pid = std::process::id();
    path.with_file_name(format!(".{base}.tmp.{pid}.{nanos}.{seq}"))
}
600
601fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
602    let metadata = match read_lock_metadata(path) {
603        Ok(metadata) => metadata,
604        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
605            return Ok(false);
606        }
607        Err(ReadLockError::Io(error)) => return Err(error),
608        Err(ReadLockError::Malformed(_)) => return Ok(false),
609    };
610
611    if metadata.pid == owner.pid
612        && metadata.hostname == owner.hostname
613        && metadata.created_at_ms == owner.created_at_ms
614    {
615        remove_lock_file(path)?;
616        Ok(true)
617    } else {
618        Ok(false)
619    }
620}
621
/// Delete the file at `path`, treating "already gone" as success.
fn remove_lock_file(path: &Path) -> io::Result<()> {
    match fs::remove_file(path) {
        Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(()),
        other => other,
    }
}
629
630/// Reclaim (delete) a lock file we judged stale/dead, but ONLY if it still holds
631/// the SAME owner identity we evaluated. Between reading the metadata and
632/// deleting, the stale owner could release and a FRESH owner acquire — blindly
633/// `remove_file` would then delete the fresh owner's lock, allowing split-brain
634/// writers. Re-read immediately before the unlink and bail if the identity
635/// (pid, hostname, created_at_ms) changed or the file vanished. POSIX has no
636/// atomic compare-and-unlink, so a microscopic residual race remains, but this
637/// shrinks the window from the whole judgment/poll duration to a couple of
638/// syscalls — the standard mitigation. Returns true if we removed it.
639fn reclaim_lock_file(path: &Path, judged: &LockMetadata) -> io::Result<bool> {
640    match read_lock_metadata(path) {
641        Ok(current) => {
642            if current.pid == judged.pid
643                && current.hostname == judged.hostname
644                && current.created_at_ms == judged.created_at_ms
645            {
646                remove_lock_file(path)?;
647                Ok(true)
648            } else {
649                // A different owner acquired it in the gap — do NOT delete.
650                Ok(false)
651            }
652        }
653        // Already gone (released/reclaimed by someone else) — nothing to do.
654        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => Ok(false),
655        // Malformed now (mid-write by a new owner) — don't delete; retry next poll.
656        Err(ReadLockError::Malformed(_)) => Ok(false),
657        Err(ReadLockError::Io(error)) => Err(error),
658    }
659}
660
661fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
662    let poll = Duration::from_millis(poll_interval_ms);
663    let sleep_for = match deadline {
664        Some(deadline) => {
665            let now = Instant::now();
666            if now >= deadline {
667                return Err(AcquireError::Timeout);
668            }
669            poll.min(deadline.saturating_duration_since(now))
670        }
671        None => poll,
672    };
673    thread::sleep(sleep_for);
674    Ok(())
675}
676
/// Best-effort fsync of `path`'s parent directory, so that a newly created or
/// renamed directory entry is persisted. All failures are ignored.
fn sync_parent(path: &Path) {
    let Some(parent) = path.parent() else {
        return;
    };
    if let Ok(dir) = File::open(parent) {
        let _ = dir.sync_all();
    }
}
684
/// Wall-clock milliseconds since the Unix epoch (0 if the clock reads earlier).
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // Truncating cast; u64 milliseconds cover far more time than is relevant.
    since_epoch.as_millis() as u64
}

/// Wall-clock nanoseconds since the Unix epoch (0 if the clock reads earlier).
fn now_nanos() -> u128 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_nanos()
}
698
699#[cfg(unix)]
700fn current_hostname() -> String {
701    let mut buffer = [0u8; 256];
702    let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
703    if result == 0 {
704        let len = buffer
705            .iter()
706            .position(|byte| *byte == 0)
707            .unwrap_or(buffer.len());
708        if len > 0 {
709            return String::from_utf8_lossy(&buffer[..len]).into_owned();
710        }
711    }
712
713    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
714}
715
716#[cfg(windows)]
717fn current_hostname() -> String {
718    std::env::var("COMPUTERNAME")
719        .or_else(|_| std::env::var("HOSTNAME"))
720        .unwrap_or_else(|_| "unknown-host".to_string())
721}
722
723#[cfg(not(any(unix, windows)))]
724fn current_hostname() -> String {
725    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
726}
727
728#[cfg(unix)]
729fn process_alive(pid: u32) -> bool {
730    if pid == 0 || pid > i32::MAX as u32 {
731        return false;
732    }
733
734    let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
735    if result == 0 {
736        return true;
737    }
738
739    io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
740}
741
742#[cfg(windows)]
743fn process_alive(pid: u32) -> bool {
744    let filter = format!("PID eq {pid}");
745    let Ok(output) = std::process::Command::new("tasklist")
746        .args(["/FI", &filter, "/FO", "CSV", "/NH"])
747        .output()
748    else {
749        return true;
750    };
751
752    if !output.status.success() {
753        return true;
754    }
755
756    let stdout = String::from_utf8_lossy(&output.stdout);
757
758    // `tasklist /NH /FO CSV` emits a single line per matching process with
759    // every field quoted, e.g. `"image","7420","Console","1","12,345 K"`.
760    // When the filter matches nothing, the literal text
761    // `INFO: No tasks are running which match the specified criteria.`
762    // is written to stdout. The previous matcher was too strict — it looked
763    // for `","{pid}",` patterns mid-line, which works on most Windows builds
764    // but missed Windows runners that emit slightly different quoting (e.g.
765    // a trailing CRLF leaves the pid token at end-of-line as `"7420"\r\n`).
766    // The robust check: confirm the "no tasks" sentinel is absent AND any
767    // PID-quoted form is present.
768    if stdout.contains("No tasks are running") {
769        return false;
770    }
771    stdout.contains(&format!("\"{pid}\""))
772}
773
774#[cfg(not(any(unix, windows)))]
775fn process_alive(_pid: u32) -> bool {
776    true
777}
778
#[cfg(test)]
mod tests {
    //! Tests for the filesystem lock: acquire/release, ownership-checked
    //! reclaim, handling of dead-pid / stale / malformed / cross-host locks,
    //! and heartbeat behavior. Timing-sensitive tests use `test_config()`
    //! (short intervals) and poll against generous deadlines rather than
    //! relying on fixed sleeps wherever the assertion is "eventually".

    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};

    /// Lock configuration with short intervals so the tests run quickly.
    ///
    /// The 25ms heartbeat and 10ms poll keep the tests fast. The 2s stale
    /// window (5x that for cross-host owners, via
    /// `cross_host_stale_heartbeat_ms`) is long enough that a freshly written
    /// synthetic lock is not stale during the short acquire timeouts below.
    fn test_config() -> LockConfig {
        LockConfig {
            heartbeat_interval_ms: 25,
            stale_heartbeat_ms: 2_000,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
            poll_interval_ms: 10,
        }
    }

    /// Creates a fresh temp directory and returns it with a lock path inside it.
    ///
    /// Callers must keep the returned `TempDir` alive for the whole test;
    /// dropping it removes the directory.
    fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().expect("create temp dir");
        let path = dir.path().join("test.lock");
        (dir, path)
    }

    /// Writes `metadata` into a newly created lock file at `path`, simulating a
    /// lock held by some owner other than a `LockGuard` in this test. Panics
    /// if the file cannot be created or written.
    fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
        let mut file = open_new_lock_file(path).expect("create synthetic lock");
        write_lock_metadata_to_file(&mut file, metadata).expect("write synthetic lock");
    }

    /// Builds lock metadata whose last heartbeat equals its creation time.
    fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
        LockMetadata {
            pid,
            hostname,
            created_at_ms,
            heartbeat_at_ms: created_at_ms,
        }
    }

    /// Metadata for a lock created "now" by this process on this host.
    fn current_process_metadata() -> LockMetadata {
        let now = now_ms();
        synthetic_metadata(std::process::id(), current_hostname(), now)
    }

    /// Acquiring writes our pid/hostname/created_at; dropping the guard
    /// removes the lock file.
    #[test]
    fn acquire_creates_lockfile_and_unlocks_on_drop() {
        let (_dir, path) = test_lock_path();

        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let metadata = read_lock_metadata(&path).expect("read lock metadata");
        assert_eq!(metadata.pid, std::process::id());
        assert_eq!(metadata.hostname, current_hostname());
        assert_eq!(metadata.created_at_ms, guard.metadata.created_at_ms);

        drop(guard);
        assert!(!path.exists());
    }

    /// Reclaim only deletes the lock when its identity still matches the
    /// owner that was judged stale.
    #[test]
    fn reclaim_refuses_to_delete_a_different_owners_lock() {
        let (_dir, path) = test_lock_path();

        // A lock currently owned by "owner B".
        let owner_b = synthetic_metadata(4242, "host-b".to_string(), now_ms());
        create_lock_file_atomically(&path, &owner_b).expect("write owner B lock");

        // We judged a DIFFERENT (older) owner A as stale. Reclaiming must NOT
        // delete B's lock (the TOCTOU split-brain guard). `saturating_sub`
        // matches the other back-dated timestamps in this module and cannot
        // underflow-panic in debug builds.
        let judged_a = synthetic_metadata(
            1111,
            "host-a".to_string(),
            now_ms().saturating_sub(1_000_000),
        );
        let removed = reclaim_lock_file(&path, &judged_a).expect("reclaim");
        assert!(!removed, "must not remove a different owner's lock");
        assert!(path.exists(), "owner B's lock must survive");
        let still = read_lock_metadata(&path).expect("still readable");
        assert_eq!(still.pid, 4242, "owner B's lock intact");
    }

    /// Reclaim removes a lock whose identity matches, and is a no-op
    /// (`Ok(false)`) once the file is gone.
    #[test]
    fn reclaim_deletes_when_identity_still_matches() {
        let (_dir, path) = test_lock_path();
        let owner = synthetic_metadata(1111, "host-a".to_string(), 5_000);
        create_lock_file_atomically(&path, &owner).expect("write lock");

        // Same identity we judged → safe to remove.
        let removed = reclaim_lock_file(&path, &owner).expect("reclaim");
        assert!(removed, "matching-identity stale lock should be removed");
        assert!(!path.exists());

        // Reclaiming a now-absent lock is a no-op, not an error.
        assert!(!reclaim_lock_file(&path, &owner).expect("reclaim missing"));
    }

    /// Two threads racing for the same lock never hold it simultaneously,
    /// and both eventually get it.
    #[test]
    fn acquire_serializes_concurrent_callers() {
        let (_dir, path) = test_lock_path();
        let path = Arc::new(path);
        // Two workers plus the main thread, so both workers start together.
        let barrier = Arc::new(Barrier::new(3));
        let inside = Arc::new(AtomicUsize::new(0));
        let entered = Arc::new(AtomicUsize::new(0));
        let max_inside = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        for _ in 0..2 {
            let path = Arc::clone(&path);
            let barrier = Arc::clone(&barrier);
            let inside = Arc::clone(&inside);
            let entered = Arc::clone(&entered);
            let max_inside = Arc::clone(&max_inside);
            handles.push(thread::spawn(move || {
                barrier.wait();
                let guard = acquire_with_config(&path, Some(Duration::from_secs(2)), test_config())
                    .expect("thread acquire lock");
                let previous = inside.fetch_add(1, Ordering::SeqCst);
                assert_eq!(previous, 0, "two lock holders overlapped");
                entered.fetch_add(1, Ordering::SeqCst);
                max_inside.fetch_max(previous + 1, Ordering::SeqCst);
                // Hold the lock long enough that the other worker must wait.
                thread::sleep(Duration::from_millis(75));
                inside.fetch_sub(1, Ordering::SeqCst);
                drop(guard);
            }));
        }

        barrier.wait();
        for handle in handles {
            handle.join().expect("join worker");
        }

        assert_eq!(entered.load(Ordering::SeqCst), 2);
        assert_eq!(max_inside.load(Ordering::SeqCst), 1);
        assert!(!path.exists());
    }

    /// While a guard is held, the background heartbeat eventually advances
    /// `heartbeat_at_ms` in the lock file.
    #[test]
    fn heartbeat_updates_lockfile_timestamp() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let initial = read_lock_metadata(&path)
            .expect("read initial metadata")
            .heartbeat_at_ms;

        // Poll for up to 2s rather than sleeping a fixed multiple of the
        // heartbeat interval. `park_timeout` is a *maximum* wait, not a
        // guaranteed periodic timer — under load (shared macOS CI runners
        // running other cargo-test threads concurrently) the heartbeat
        // thread may not fire 3 times within 75ms even though
        // heartbeat_interval_ms=25. The contract being asserted is "the
        // heartbeat advances eventually", not "it advances within N
        // heartbeat intervals".
        //
        // On Windows, `rename_over` does `remove_file(to)` then
        // `fs::rename(from, to)` because Windows can't atomically replace
        // an open file. There's a brief window where the lockfile doesn't
        // exist. If the poller hits that window, `read_lock_metadata`
        // returns `Io(NotFound)`. Production callers already handle this
        // (see `remove_lock_if_owned`), so the test treats `NotFound` the
        // same as "no update yet" and keeps polling.
        let deadline = std::time::Instant::now() + Duration::from_millis(2_000);
        let mut updated = initial;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(50));
            match read_lock_metadata(&path) {
                Ok(meta) => {
                    updated = meta.heartbeat_at_ms;
                    if updated > initial {
                        break;
                    }
                }
                Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
                    // Heartbeat thread is mid-rewrite (Windows
                    // remove-then-rename window). Retry next iteration.
                    continue;
                }
                Err(other) => panic!("read updated metadata: {other:?}"),
            }
        }
        assert!(
            updated > initial,
            "heartbeat timestamp did not advance within 2s"
        );
        drop(guard);
    }

    /// A same-host lock whose pid does not belong to a running process is
    /// reclaimed and rewritten with our pid.
    #[test]
    fn dead_pid_lock_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        // Assumes pid 999_999_999 is not in use on the test machine.
        let metadata = synthetic_metadata(999_999_999, current_hostname(), now_ms());
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim dead pid lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    /// A stale heartbeat alone does not allow stealing a lock whose same-host
    /// owner pid is still alive; acquisition times out and the file is
    /// left untouched.
    #[test]
    fn stale_heartbeat_from_live_pid_blocks() {
        let (_dir, path) = test_lock_path();
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(60_000);
        metadata.heartbeat_at_ms = now_ms().saturating_sub(60_000);
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// A fresh lock held by a live same-host owner blocks until timeout.
    #[test]
    fn healthy_live_owner_blocks() {
        let (_dir, path) = test_lock_path();
        let metadata = current_process_metadata();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// A lock file that does not parse as metadata is reclaimed.
    #[test]
    fn malformed_lockfile_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        fs::write(&path, b"not valid json").expect("write malformed lock");

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim malformed lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    /// A fresh lock from another host cannot be taken (its pid cannot be
    /// probed locally), so acquisition times out and the file is unchanged.
    #[test]
    fn cross_host_lock_is_not_stolen_before_extended_stale_threshold() {
        let (_dir, path) = test_lock_path();
        let now = now_ms();
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: now,
            heartbeat_at_ms: now,
        };
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// A cross-host lock whose heartbeat is older than the extended
    /// cross-host stale threshold is reclaimed and rewritten as ours.
    #[test]
    fn stale_cross_host_lock_is_reclaimed_after_extended_threshold() {
        let (_dir, path) = test_lock_path();
        let stale_at =
            now_ms().saturating_sub(test_config().cross_host_stale_heartbeat_ms() + 1_000);
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: stale_at,
            heartbeat_at_ms: stale_at,
        };
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim stale cross-host lock");
        let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(reclaimed.hostname, current_hostname());
        assert_ne!(reclaimed.created_at_ms, metadata.created_at_ms);
        drop(guard);
    }

    /// A live owner that has held the lock past `live_owner_warn_ms` (10 min)
    /// but is still heartbeating keeps blocking. Per the test name, the
    /// long hold only produces a warning; the warning itself is not
    /// asserted here.
    #[test]
    fn live_owner_over_10min_warns_but_blocks() {
        let (_dir, path) = test_lock_path();
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
        metadata.heartbeat_at_ms = now_ms();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// After the guard is dropped, the heartbeat does not recreate the lock
    /// file during the following few heartbeat intervals.
    #[test]
    fn drop_stops_heartbeat_thread() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        drop(guard);

        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 3,
        ));
        assert!(
            !path.exists(),
            "heartbeat recreated or kept updating lockfile"
        );
    }

    /// `LockGone` / `NotOwner` stop the heartbeat; I/O and parse errors are
    /// retried.
    #[test]
    fn heartbeat_error_classification_terminal_vs_transient() {
        // Terminal: the lock is provably no longer ours to refresh.
        assert!(heartbeat_error_is_terminal(&HeartbeatError::LockGone));
        assert!(heartbeat_error_is_terminal(&HeartbeatError::NotOwner));
        // Transient: a temporary I/O hiccup or a read that raced a concurrent
        // writer. These must NOT kill the heartbeat — it retries instead.
        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Io(
            io::Error::other("disk blip")
        )));
        let malformed: serde_json::Error =
            serde_json::from_str::<LockMetadata>("not json").unwrap_err();
        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Malformed(
            malformed
        )));
    }

    /// The heartbeat keeps running through a temporarily unparseable lock
    /// file and resumes refreshing once valid owner metadata is restored.
    #[test]
    fn heartbeat_survives_transient_malformed_and_recovers() {
        // Regression: a single transient failure (e.g. a read that races a
        // concurrent writer and sees a momentarily-unparseable file) used to
        // permanently kill the heartbeat thread. The guard holder would then
        // run its critical section with a stale heartbeat_at_ms, letting
        // another process reclaim the lock after the stale window — concurrent
        // writers / split-brain. The heartbeat must instead retry and resume
        // refreshing once the file is readable again.
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let owner = guard.metadata.clone();

        // Corrupt the lockfile out from under the heartbeat (simulates a
        // concurrent-writer race producing a momentarily-unparseable read).
        // The heartbeat reads-then-writes, so it observes Malformed and, with
        // the fix, retries instead of dying.
        fs::write(&path, b"{ not valid json").expect("corrupt lockfile");

        // Give the heartbeat several intervals to observe the malformed file.
        // Pre-fix, the thread is dead by now.
        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 4,
        ));

        // Restore valid owner metadata with a clearly-stale heartbeat sentinel.
        // Ownership fields must match `owner` exactly so heartbeat_once passes
        // its ownership check and writes a fresh timestamp.
        //
        // Use the atomic temp-write+rename path rather than remove-then-recreate:
        // a remove followed by a separate create leaves a window where the file
        // does not exist, and a heartbeat poll landing in that window reads
        // NotFound -> LockGone (terminal) and kills the thread, failing this test
        // spuriously under runner load (observed on macOS CI). The atomic replace
        // overwrites the corrupt file in place with no no-file window on Unix.
        let sentinel = now_ms().saturating_sub(1_000_000);
        let mut restored = owner.clone();
        restored.heartbeat_at_ms = sentinel;
        atomic_write_lock_metadata(&path, &restored).expect("atomically restore lock metadata");

        // If the heartbeat thread is still alive (the fix), it will overwrite
        // heartbeat_at_ms with a current value. Poll for that recovery.
        let deadline = std::time::Instant::now() + Duration::from_millis(3_000);
        let mut recovered = false;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(25));
            match read_lock_metadata(&path) {
                Ok(meta)
                    if meta.created_at_ms == owner.created_at_ms
                        && meta.heartbeat_at_ms > sentinel =>
                {
                    recovered = true;
                    break;
                }
                // Not yet refreshed, or a transient read failure: keep polling.
                _ => continue,
            }
        }
        assert!(
            recovered,
            "heartbeat did not recover after a transient malformed read — thread likely died"
        );
        drop(guard);
    }
}