Skip to main content

aft/
fs_lock.rs

1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6    atomic::{AtomicBool, AtomicU64, Ordering},
7    mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
/// Interval between heartbeat refreshes of a held lock (5 seconds).
pub const HEARTBEAT_INTERVAL_MS: u64 = 5 * 1_000;
/// Heartbeat age past which a same-host owner is flagged as stale: three
/// heartbeat intervals (15 seconds). Cross-host locks use 5x this window
/// before they are reclaimed.
pub const STALE_HEARTBEAT_MS: u64 = 3 * HEARTBEAT_INTERVAL_MS;
/// How long a live owner may hold the lock before waiting acquirers log a
/// warning (10 minutes). The lock is never broken because of this.
pub const LIVE_OWNER_WARN_MS: u64 = 10 * 60 * 1_000;
/// Delay between acquisition attempts while the lock is contended.
pub const POLL_INTERVAL_MS: u64 = 100;

/// Upper bound on consecutive transient OS errors tolerated while creating the
/// lock file before the error is surfaced.
///
/// On Windows, two processes/threads racing to create (or one creating while
/// another deletes) the same path can briefly report ERROR_ACCESS_DENIED (5) or
/// ERROR_SHARING_VIOLATION (32) rather than a clean "already exists". Those
/// windows close within milliseconds, so a small bounded retry rides them out,
/// while a genuinely persistent permission/IO failure still surfaces promptly.
const MAX_TRANSIENT_CREATE_RETRIES: u32 = 50;
29
/// Classifies a lock-file creation error as momentary contention on this exact
/// path (to be retried) rather than a real, persistent failure.
///
/// `PermissionDenied` counts as contention on every platform. On Windows the
/// raw codes ERROR_ACCESS_DENIED (5) and ERROR_SHARING_VIOLATION (32) are
/// matched as well, in case the OS reports them without the mapped kind.
/// Callers bound the number of retries, so a persistent error still surfaces.
fn is_transient_create_contention(error: &io::Error) -> bool {
    #[cfg(windows)]
    let raw_windows_contention = matches!(error.raw_os_error(), Some(5 | 32));
    #[cfg(not(windows))]
    let raw_windows_contention = false;

    error.kind() == io::ErrorKind::PermissionDenied || raw_windows_contention
}
52
53#[derive(Clone, Copy, Debug)]
54struct LockConfig {
55    heartbeat_interval_ms: u64,
56    stale_heartbeat_ms: u64,
57    live_owner_warn_ms: u64,
58    poll_interval_ms: u64,
59}
60
61impl LockConfig {
62    fn cross_host_stale_heartbeat_ms(self) -> u64 {
63        self.stale_heartbeat_ms.saturating_mul(5)
64    }
65}
66
67impl Default for LockConfig {
68    fn default() -> Self {
69        Self {
70            heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
71            stale_heartbeat_ms: STALE_HEARTBEAT_MS,
72            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
73            poll_interval_ms: POLL_INTERVAL_MS,
74        }
75    }
76}
77
/// Contents of a lock file: one JSON object followed by a newline.
///
/// `(pid, hostname, created_at_ms)` identifies a single acquisition and is what
/// every ownership check compares. `heartbeat_at_ms` is refreshed by the
/// owner's heartbeat thread.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
struct LockMetadata {
    // Process ID of the owner (`std::process::id()` at acquisition).
    pid: u32,
    // Host name of the owner, as returned by `current_hostname()`.
    hostname: String,
    // Acquisition time, milliseconds since the Unix epoch.
    created_at_ms: u64,
    // Time of the last successful heartbeat, milliseconds since the Unix epoch.
    heartbeat_at_ms: u64,
}
85
86/// Acquire a filesystem lock at `path`. Blocks until the lock is held.
87///
88/// The returned guard owns a background heartbeat thread; dropping it releases
89/// the lock and removes the lock file.
90pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
91    acquire_with_config(path, None, LockConfig::default())
92}
93
94/// Try to acquire a filesystem lock at `path` within `timeout`.
95pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
96    acquire_with_config(path, Some(timeout), LockConfig::default())
97}
98
/// RAII handle for a held filesystem lock.
///
/// Owns the background heartbeat thread that refreshes the lock file's
/// `heartbeat_at_ms`. Dropping the guard stops and joins that thread, then
/// removes the lock file if it still carries this guard's identity.
pub struct LockGuard {
    // Location of the lock file.
    path: PathBuf,
    // Identity written at acquisition; compared before removing the file.
    metadata: LockMetadata,
    // Set by `Drop` to ask the heartbeat thread to exit.
    shutdown: Arc<AtomicBool>,
    // Receives `()` when the heartbeat thread's closure finishes.
    heartbeat_done: mpsc::Receiver<()>,
    // Heartbeat thread handle; `Drop` takes it (leaving `None`) and joins it.
    heartbeat: Option<JoinHandle<()>>,
}

impl Drop for LockGuard {
    fn drop(&mut self) {
        // Signal shutdown then unconditionally join the heartbeat thread
        // BEFORE removing the lockfile. The earlier `recv_timeout(100ms)`
        // implementation could let `remove_lock_if_owned` race with a
        // still-alive heartbeat:
        //
        //   1. Drop signals shutdown, ack times out under CI load.
        //   2. Drop calls `remove_lock_if_owned` → file removed.
        //   3. Another caller acquires the lock → writes its metadata.
        //   4. Our heartbeat (still alive, mid-`atomic_write_lock_metadata`
        //      from before shutdown was checked) overwrites the new
        //      owner's file with our stale metadata. heartbeat_once's
        //      ownership check happens BEFORE the write, so it can race
        //      with a concurrent acquire that flips ownership in between.
        //   5. The new owner's heartbeat sees foreign metadata, exits
        //      `NotOwner`. The new owner's drop sees foreign metadata,
        //      `remove_lock_if_owned` returns `Ok(false)`, file persists.
        //
        // Always joining has bounded cost. `unpark` wakes the heartbeat thread
        // out of `park_timeout` immediately (or makes its next
        // `park_timeout` return at once), after which it sees `shutdown`
        // and returns. The wait is therefore dominated by any
        // `heartbeat_once` I/O already in flight.
        //
        // The `heartbeat_done` channel is still fed by the heartbeat thread,
        // but Drop no longer waits on it. Only this module can construct a
        // LockGuard, because its fields are private.
        self.shutdown.store(true, Ordering::Release);
        if let Some(handle) = self.heartbeat.take() {
            handle.thread().unpark();
            let _ = handle.join();
        }
        // Discard the (now irrelevant) completion signal, if any was sent.
        while self.heartbeat_done.try_recv().is_ok() {}

        // Only delete the file if it is still ours; never clobber a new owner.
        match remove_lock_if_owned(&self.path, &self.metadata) {
            Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
            Ok(false) => {}
            Err(error) => slog_warn!(
                "failed to release filesystem lock at {}: {}",
                self.path.display(),
                error
            ),
        }
    }
}
153
/// Ways [`acquire`] and [`try_acquire`] can fail.
#[derive(Debug)]
pub enum AcquireError {
    /// An unexpected filesystem or OS error (anything beyond normal contention).
    Io(io::Error),
    /// The deadline given to [`try_acquire`] passed before the lock was obtained.
    Timeout,
}

impl fmt::Display for AcquireError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Timeout => f.write_str("timed out acquiring filesystem lock"),
            Self::Io(inner) => write!(f, "filesystem lock I/O error: {}", inner),
        }
    }
}

impl std::error::Error for AcquireError {}

impl From<io::Error> for AcquireError {
    /// Wraps an I/O error so `?` can propagate it out of acquisition code.
    fn from(source: io::Error) -> Self {
        Self::Io(source)
    }
}
176
/// Core acquisition loop behind [`acquire`] and [`try_acquire`].
///
/// It repeatedly tries to create the lock file exclusively. When the file
/// already exists, it inspects the recorded owner and either waits or
/// reclaims the lock:
///
/// * Contents that are still malformed after one poll interval are deleted.
/// * A different-host owner is reclaimed once its heartbeat is older than
///   `config.cross_host_stale_heartbeat_ms()`.
/// * A same-host owner is reclaimed only when its PID is no longer alive. A
///   live PID is never broken, however stale its heartbeat; this only warns.
///
/// `timeout == None` waits indefinitely.
///
/// # Errors
///
/// * [`AcquireError::Timeout`] once the deadline derived from `timeout` passes.
/// * [`AcquireError::Io`] for non-contention I/O failures, or once the count
///   of transient create errors (reset only when a create attempt reports
///   `AlreadyExists`) exceeds `MAX_TRANSIENT_CREATE_RETRIES`.
fn acquire_with_config(
    path: &Path,
    timeout: Option<Duration>,
    config: LockConfig,
) -> Result<LockGuard, AcquireError> {
    let deadline = timeout.map(|timeout| Instant::now() + timeout);
    let hostname = current_hostname();
    // Each of these warnings is logged at most once per call, to avoid log
    // spam while polling.
    let mut warned_live_owner = false;
    let mut warned_stale_live_owner = false;
    let mut transient_create_failures: u32 = 0;

    loop {
        if let Some(deadline) = deadline {
            if Instant::now() >= deadline {
                return Err(AcquireError::Timeout);
            }
        }

        match create_new_lock(path, &hostname, config) {
            Ok(guard) => return Ok(guard),
            // The lock file already exists — fall through to inspect its owner.
            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
            // Transient contention (chiefly Windows: a concurrent create/delete
            // on this exact path surfaces as access-denied/sharing-violation
            // rather than already-exists). Back off one poll interval and retry,
            // bounded so a persistent failure still propagates instead of
            // spinning forever.
            Err(error) if is_transient_create_contention(&error) => {
                transient_create_failures += 1;
                if transient_create_failures > MAX_TRANSIENT_CREATE_RETRIES {
                    return Err(error.into());
                }
                sleep_until_retry(deadline, config.poll_interval_ms)?;
                continue;
            }
            Err(error) => return Err(error.into()),
        }
        transient_create_failures = 0;

        let metadata = match read_lock_metadata(path) {
            Ok(metadata) => metadata,
            // Released between our create attempt and this read: retry the
            // create immediately.
            Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
            Err(ReadLockError::Io(error)) => return Err(error.into()),
            Err(ReadLockError::Malformed(error)) => {
                // Locks published by `create_lock_file_atomically` are fully
                // written before they become visible. A partially written file
                // can still be observed, though: for example via the Windows
                // copy-over fallback in `rename_over`, or from a writer that
                // creates the lock file in place. Give that window one poll
                // interval before treating malformed contents as abandoned.
                sleep_until_retry(deadline, config.poll_interval_ms)?;
                match read_lock_metadata(path) {
                    Ok(_) => continue,
                    Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
                        continue;
                    }
                    Err(ReadLockError::Io(error)) => return Err(error.into()),
                    Err(ReadLockError::Malformed(_)) => {}
                }
                // NOTE(review): unlike `reclaim_lock_file`, this delete is
                // not compare-and-delete. The file is removed if it was
                // unparseable on both reads, even if its bytes changed in between.
                slog_warn!(
                    "removing malformed filesystem lock at {}: {}",
                    path.display(),
                    error
                );
                remove_lock_file(path)?;
                continue;
            }
        };

        let now = now_ms();
        // Compares the owner's heartbeat with our wall clock. For cross-host
        // owners, the two clocks belong to different machines.
        let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);

        if metadata.hostname != hostname {
            // A remote PID cannot be probed, so only heartbeat age can
            // justify reclaiming.
            let cross_host_stale_ms = config.cross_host_stale_heartbeat_ms();
            if since_heartbeat > cross_host_stale_ms {
                slog_warn!(
                    "reclaiming cross-host filesystem lock at {} from host {} after stale heartbeat ({}ms > {}ms)",
                    path.display(),
                    metadata.hostname,
                    since_heartbeat,
                    cross_host_stale_ms
                );
                // Compare-and-delete: only remove if it's still the SAME stale
                // owner (a fresh owner may have acquired it in the gap).
                reclaim_lock_file(path, &metadata)?;
                continue;
            }
            sleep_until_retry(deadline, config.poll_interval_ms)?;
            continue;
        }

        if !process_alive(metadata.pid) {
            slog_warn!(
                "removing filesystem lock at {} from dead PID {}",
                path.display(),
                metadata.pid
            );
            // Compare-and-delete: only remove if it's still this dead owner's
            // lock. A fresh owner could have written a new lock (with a recycled
            // or different PID) between our liveness check and the unlink.
            reclaim_lock_file(path, &metadata)?;
            continue;
        }

        if since_heartbeat > config.stale_heartbeat_ms && !warned_stale_live_owner {
            // Same-host PID liveness is authoritative. A SIGSTOP'd process,
            // suspended VM, or sleeping laptop can miss heartbeats and later
            // resume inside the critical section. Breaking that lock would allow
            // split-brain writers, so a paused live owner blocks acquirers until
            // it resumes and releases the lock or the PID dies.
            slog_warn!(
                "filesystem lock at {} held by live PID {} has stale heartbeat ({}ms); NOT breaking",
                path.display(),
                metadata.pid,
                since_heartbeat
            );
            warned_stale_live_owner = true;
        }

        let held_for = now.saturating_sub(metadata.created_at_ms);
        // NOTE(review): the message says ">10min", but the threshold is
        // `config.live_owner_warn_ms` (600_000 ms only with the default config).
        if held_for > config.live_owner_warn_ms && !warned_live_owner {
            slog_warn!(
                "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
                path.display(),
                metadata.pid
            );
            warned_live_owner = true;
        }

        sleep_until_retry(deadline, config.poll_interval_ms)?;
    }
}
306
307fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
308    let now = now_ms();
309    let metadata = LockMetadata {
310        pid: std::process::id(),
311        hostname: hostname.to_string(),
312        created_at_ms: now,
313        heartbeat_at_ms: now,
314    };
315
316    create_lock_file_atomically(path, &metadata)?;
317
318    let shutdown = Arc::new(AtomicBool::new(false));
319    let (done_tx, done_rx) = mpsc::channel();
320    let heartbeat_path = path.to_path_buf();
321    let heartbeat_metadata = metadata.clone();
322    let heartbeat_shutdown = Arc::clone(&shutdown);
323    let heartbeat = thread::Builder::new()
324        .name("aft-fs-lock-heartbeat".to_string())
325        .spawn(move || {
326            run_heartbeat(
327                heartbeat_path,
328                heartbeat_metadata,
329                heartbeat_shutdown,
330                config,
331            );
332            let _ = done_tx.send(());
333        })?;
334
335    slog_info!("acquired filesystem lock at {}", path.display());
336
337    Ok(LockGuard {
338        path: path.to_path_buf(),
339        metadata,
340        shutdown,
341        heartbeat_done: done_rx,
342        heartbeat: Some(heartbeat),
343    })
344}
345
346fn run_heartbeat(
347    path: PathBuf,
348    owner: LockMetadata,
349    shutdown: Arc<AtomicBool>,
350    config: LockConfig,
351) {
352    // Number of consecutive heartbeat intervals that can be missed before the
353    // same-host stale window elapses and another process may reclaim the lock.
354    // Beyond this point a sustained failure is genuinely dangerous, so we
355    // escalate the log from warn to error — but we still keep retrying.
356    let stale_intervals = config
357        .stale_heartbeat_ms
358        .checked_div(config.heartbeat_interval_ms.max(1))
359        .unwrap_or(3)
360        .max(1);
361    let mut consecutive_transient_failures: u64 = 0;
362
363    loop {
364        thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
365        if shutdown.load(Ordering::Acquire) {
366            return;
367        }
368
369        match heartbeat_once(&path, &owner) {
370            Ok(()) => {
371                if consecutive_transient_failures > 0 {
372                    slog_info!(
373                        "filesystem lock at {} heartbeat recovered after {} transient failure(s)",
374                        path.display(),
375                        consecutive_transient_failures
376                    );
377                    consecutive_transient_failures = 0;
378                }
379            }
380            Err(error) if heartbeat_error_is_terminal(&error) => {
381                // Terminal states: the lock is provably gone or owned by
382                // someone else. Continuing to write would clobber a new owner's
383                // metadata (the exact race documented in LockGuard::drop), so
384                // stop heartbeating.
385                slog_error!(
386                    "{}; stopping heartbeat",
387                    terminal_heartbeat_message(&path, &error)
388                );
389                return;
390            }
391            Err(error) => {
392                // Transient states: a temporary I/O hiccup (disk/NFS blip,
393                // quota) or a read that raced a concurrent writer mid-write
394                // (momentarily unparseable file). A single such error must NOT
395                // permanently kill the heartbeat — that would silently stop
396                // refreshing heartbeat_at_ms while the guard holder keeps
397                // running its critical section, letting another process reclaim
398                // the lock after the stale window and produce concurrent
399                // writers. Log and retry on the next interval; a later success
400                // resumes heartbeating automatically.
401                consecutive_transient_failures += 1;
402                log_transient_heartbeat_failure(
403                    &path,
404                    &transient_heartbeat_reason(&error),
405                    consecutive_transient_failures,
406                    stale_intervals,
407                );
408            }
409        }
410    }
411}
412
413/// A heartbeat failure is terminal when the lock is provably no longer ours to
414/// refresh: it was removed (`LockGone`) or a different owner now holds it
415/// (`NotOwner`). I/O and malformed-read failures are treated as transient —
416/// they are typically temporary disk/NFS hiccups or a read that raced a
417/// concurrent writer — so the heartbeat retries rather than dying.
418fn heartbeat_error_is_terminal(error: &HeartbeatError) -> bool {
419    matches!(error, HeartbeatError::LockGone | HeartbeatError::NotOwner)
420}
421
422fn terminal_heartbeat_message(path: &Path, error: &HeartbeatError) -> String {
423    match error {
424        HeartbeatError::LockGone => {
425            format!("filesystem lock at {} disappeared", path.display())
426        }
427        HeartbeatError::NotOwner => format!(
428            "filesystem lock at {} is no longer owned by this guard",
429            path.display()
430        ),
431        // Not reachable for non-terminal errors, but keep a sensible string.
432        HeartbeatError::Io(error) => {
433            format!("filesystem lock at {} I/O error: {error}", path.display())
434        }
435        HeartbeatError::Malformed(error) => {
436            format!(
437                "filesystem lock at {} became malformed: {error}",
438                path.display()
439            )
440        }
441    }
442}
443
444fn transient_heartbeat_reason(error: &HeartbeatError) -> String {
445    match error {
446        HeartbeatError::Io(error) => format!("I/O error: {error}"),
447        HeartbeatError::Malformed(error) => format!("became malformed: {error}"),
448        HeartbeatError::LockGone => "lock disappeared".to_string(),
449        HeartbeatError::NotOwner => "lock no longer owned".to_string(),
450    }
451}
452
453/// Log a transient heartbeat failure, escalating to error exactly once when the
454/// failures have lasted long enough that the lock is now reclaimable by another
455/// owner. Beyond that point we stay quiet to avoid log spam while still
456/// retrying — the holder has already been warned the lock is at risk.
457fn log_transient_heartbeat_failure(
458    path: &Path,
459    reason: &str,
460    consecutive_failures: u64,
461    stale_intervals: u64,
462) {
463    if consecutive_failures < stale_intervals {
464        slog_warn!(
465            "transient failure to heartbeat filesystem lock at {}: {}; retrying (attempt {})",
466            path.display(),
467            reason,
468            consecutive_failures
469        );
470    } else if consecutive_failures == stale_intervals {
471        slog_error!(
472            "filesystem lock at {} has failed {} consecutive heartbeats: {}; \
473             the lock may now be reclaimed by another owner — continuing to retry",
474            path.display(),
475            consecutive_failures,
476            reason
477        );
478    }
479}
480
481fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
482    let mut metadata = match read_lock_metadata(path) {
483        Ok(metadata) => metadata,
484        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
485            return Err(HeartbeatError::LockGone);
486        }
487        Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
488        Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
489    };
490
491    if metadata.pid != owner.pid
492        || metadata.hostname != owner.hostname
493        || metadata.created_at_ms != owner.created_at_ms
494    {
495        return Err(HeartbeatError::NotOwner);
496    }
497
498    metadata.heartbeat_at_ms = now_ms();
499    atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
500}
501
/// Why a single heartbeat refresh (`heartbeat_once`) failed.
#[derive(Debug)]
enum HeartbeatError {
    // Reading or rewriting the lock file failed with an error other than NotFound.
    Io(io::Error),
    // The lock file no longer exists (read returned NotFound).
    LockGone,
    // The lock file exists but did not deserialize into `LockMetadata`.
    Malformed(serde_json::Error),
    // The lock file carries a different (pid, hostname, created_at_ms) identity.
    NotOwner,
}

/// Why `read_lock_metadata` failed.
#[derive(Debug)]
enum ReadLockError {
    // `fs::read` failed; callers check for the `NotFound` kind themselves.
    Io(io::Error),
    // The bytes were read but did not deserialize into `LockMetadata`.
    Malformed(serde_json::Error),
}
515
516fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
517    let bytes = fs::read(path).map_err(ReadLockError::Io)?;
518    serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
519}
520
/// Creates `path` for writing, failing with `AlreadyExists` if it is already
/// present. On Unix the file is requested with mode `0o644` (subject to umask).
#[cfg(unix)]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = OpenOptions::new();
    options.write(true).create_new(true).mode(0o644);
    options.open(path)
}

/// Creates `path` for writing, failing with `AlreadyExists` if it is already
/// present. Permissions are left to the platform default.
#[cfg(not(unix))]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.write(true);
    options.create_new(true);
    options.open(path)
}
536
537fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
538    serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
539    file.write_all(b"\n")?;
540    file.sync_all()
541}
542
543fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
544    let tmp_path = temp_path_for_lock(path);
545    let result = (|| {
546        let mut file = open_new_lock_file(&tmp_path)?;
547        write_lock_metadata_to_file(&mut file, metadata)?;
548        drop(file);
549
550        fs::hard_link(&tmp_path, path)?;
551        sync_parent(path);
552        Ok(())
553    })();
554
555    let _ = fs::remove_file(&tmp_path);
556    result
557}
558
559fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
560    let tmp_path = temp_path_for_lock(path);
561    let write_result = (|| {
562        let mut file = OpenOptions::new()
563            .write(true)
564            .create_new(true)
565            .open(&tmp_path)?;
566        write_lock_metadata_to_file(&mut file, metadata)?;
567        drop(file);
568
569        rename_over(&tmp_path, path)?;
570        sync_parent(path);
571        Ok(())
572    })();
573
574    if write_result.is_err() {
575        let _ = fs::remove_file(&tmp_path);
576    }
577
578    write_result
579}
580
/// Moves `from` over `to`, replacing `to` if it exists, without ever leaving
/// `to` absent.
///
/// `fs::rename` is tried first; on Windows it maps to MoveFileExW with
/// MOVEFILE_REPLACE_EXISTING. Deleting `to` and then renaming would open a
/// window in which a concurrent heartbeat read sees NotFound. That read is
/// mapped to the terminal `LockGone` and stops the heartbeat. This race once
/// made `heartbeat_survives_transient_malformed_and_recovers` flaky on
/// Windows CI.
///
/// If the atomic replace is refused (for example the destination is briefly
/// open elsewhere, or an AV/indexer holds the temp source), the bytes are
/// copied over `to` in place instead. The path keeps existing throughout, so
/// the worst a reader can observe is a partially written file. It reports that
/// as the transient `Malformed`. If the copy fails too, `to` is left as it was
/// and the original rename error is returned.
#[cfg(windows)]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    let rename_error = match fs::rename(from, to) {
        Ok(()) => return Ok(()),
        Err(error) => error,
    };

    if fs::copy(from, to).is_err() {
        return Err(rename_error);
    }
    // The data now lives at `to`; the temp source is just litter.
    let _ = fs::remove_file(from);
    Ok(())
}

/// Moves `from` over `to`; POSIX `rename` atomically replaces an existing `to`.
#[cfg(not(windows))]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    fs::rename(from, to)
}
620
621// Per-thread counter that disambiguates temp lockfile paths for callers
622// inside the same process. `now_nanos()` alone is not unique enough on
623// Windows when two threads race to acquire the same lock (caught by the
624// `acquire_serializes_concurrent_callers` test): two threads sampling the
625// nanosecond clock within the same scheduler quantum produce identical
626// timestamps, both write to the same `.lock.tmp.<pid>.<nanos>` file, one
627// thread's `fs::remove_file(&tmp_path)` cleanup deletes the file before
628// the other thread's `fs::hard_link(&tmp_path, ...)` runs, and the loser
629// panics with `Io(Os { code: 2, NotFound })`.
630//
631// `AtomicU64` shared across threads makes every temp path unique within
632// the process regardless of clock resolution or scheduling races.
633static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
634
635fn temp_path_for_lock(path: &Path) -> PathBuf {
636    let file_name = path
637        .file_name()
638        .and_then(|name| name.to_str())
639        .unwrap_or("lock");
640    let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
641    path.with_file_name(format!(
642        ".{file_name}.tmp.{}.{}.{}",
643        std::process::id(),
644        now_nanos(),
645        seq
646    ))
647}
648
649fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
650    let metadata = match read_lock_metadata(path) {
651        Ok(metadata) => metadata,
652        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
653            return Ok(false);
654        }
655        Err(ReadLockError::Io(error)) => return Err(error),
656        Err(ReadLockError::Malformed(_)) => return Ok(false),
657    };
658
659    if metadata.pid == owner.pid
660        && metadata.hostname == owner.hostname
661        && metadata.created_at_ms == owner.created_at_ms
662    {
663        remove_lock_file(path)?;
664        Ok(true)
665    } else {
666        Ok(false)
667    }
668}
669
/// Deletes the file at `path`, treating "already gone" as success.
fn remove_lock_file(path: &Path) -> io::Result<()> {
    fs::remove_file(path).or_else(|error| {
        if error.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(error)
        }
    })
}
677
678/// Reclaim (delete) a lock file we judged stale/dead, but ONLY if it still holds
679/// the SAME owner identity we evaluated. Between reading the metadata and
680/// deleting, the stale owner could release and a FRESH owner acquire — blindly
681/// `remove_file` would then delete the fresh owner's lock, allowing split-brain
682/// writers. Re-read immediately before the unlink and bail if the identity
683/// (pid, hostname, created_at_ms) changed or the file vanished. POSIX has no
684/// atomic compare-and-unlink, so a microscopic residual race remains, but this
685/// shrinks the window from the whole judgment/poll duration to a couple of
686/// syscalls — the standard mitigation. Returns true if we removed it.
687fn reclaim_lock_file(path: &Path, judged: &LockMetadata) -> io::Result<bool> {
688    match read_lock_metadata(path) {
689        Ok(current) => {
690            if current.pid == judged.pid
691                && current.hostname == judged.hostname
692                && current.created_at_ms == judged.created_at_ms
693            {
694                remove_lock_file(path)?;
695                Ok(true)
696            } else {
697                // A different owner acquired it in the gap — do NOT delete.
698                Ok(false)
699            }
700        }
701        // Already gone (released/reclaimed by someone else) — nothing to do.
702        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => Ok(false),
703        // Malformed now (mid-write by a new owner) — don't delete; retry next poll.
704        Err(ReadLockError::Malformed(_)) => Ok(false),
705        Err(ReadLockError::Io(error)) => Err(error),
706    }
707}
708
709fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
710    let poll = Duration::from_millis(poll_interval_ms);
711    let sleep_for = match deadline {
712        Some(deadline) => {
713            let now = Instant::now();
714            if now >= deadline {
715                return Err(AcquireError::Timeout);
716            }
717            poll.min(deadline.saturating_duration_since(now))
718        }
719        None => poll,
720    };
721    thread::sleep(sleep_for);
722    Ok(())
723}
724
/// Best-effort fsync of the directory containing `path`, called after an entry
/// in it is linked or renamed. Every failure is silently ignored: no parent,
/// the directory cannot be opened, or the sync itself fails.
fn sync_parent(path: &Path) {
    let Some(parent) = path.parent() else {
        return;
    };
    let Ok(dir) = File::open(parent) else {
        return;
    };
    let _ = dir.sync_all();
}
732
/// Wall-clock milliseconds since the Unix epoch, or 0 if the clock reads
/// earlier than 1970.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();
    since_epoch.as_millis() as u64
}

/// Wall-clock nanoseconds since the Unix epoch, or 0 if the clock reads
/// earlier than 1970.
fn now_nanos() -> u128 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default();
    since_epoch.as_nanos()
}
746
747#[cfg(unix)]
748fn current_hostname() -> String {
749    let mut buffer = [0u8; 256];
750    let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
751    if result == 0 {
752        let len = buffer
753            .iter()
754            .position(|byte| *byte == 0)
755            .unwrap_or(buffer.len());
756        if len > 0 {
757            return String::from_utf8_lossy(&buffer[..len]).into_owned();
758        }
759    }
760
761    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
762}
763
764#[cfg(windows)]
765fn current_hostname() -> String {
766    std::env::var("COMPUTERNAME")
767        .or_else(|_| std::env::var("HOSTNAME"))
768        .unwrap_or_else(|_| "unknown-host".to_string())
769}
770
771#[cfg(not(any(unix, windows)))]
772fn current_hostname() -> String {
773    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
774}
775
776#[cfg(unix)]
777fn process_alive(pid: u32) -> bool {
778    if pid == 0 || pid > i32::MAX as u32 {
779        return false;
780    }
781
782    let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
783    if result == 0 {
784        return true;
785    }
786
787    io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
788}
789
/// Reports whether `pid` names a process that is still running on this host.
///
/// Shells out to `tasklist` with a `PID eq <pid>` filter. If `tasklist`
/// cannot be spawned, or exits unsuccessfully, the process is reported as
/// alive, so a lock is never reclaimed on the strength of a check that did
/// not actually run.
#[cfg(windows)]
fn process_alive(pid: u32) -> bool {
    // PID 0 is the System Idle Process on Windows, which `tasklist` reports.
    // A lock carrying pid 0 can never belong to a real owner, yet without this
    // guard it would look permanently alive and never be reclaimed. The Unix
    // implementation already treats pid 0 as not alive; match it here (this
    // also skips spawning a subprocess for a value we can answer directly).
    if pid == 0 {
        return false;
    }

    let filter = format!("PID eq {pid}");
    let Ok(output) = std::process::Command::new("tasklist")
        .args(["/FI", &filter, "/FO", "CSV", "/NH"])
        .output()
    else {
        return true;
    };

    if !output.status.success() {
        return true;
    }

    let stdout = String::from_utf8_lossy(&output.stdout);

    // With `/FO CSV /NH`, each matching process is one header-less row with
    // every field quoted, e.g. `"image","7420","Console","1","12,345 K"`. The
    // PID is the second field. When the filter matches nothing, tasklist
    // prints an informational line such as
    // `INFO: No tasks are running which match the specified criteria.`
    // instead. That text may be localized on non-English systems, but it
    // never contains the quoted PID. So the sentinel check below is only a
    // fast path; the quoted-PID check is what actually decides.
    if stdout.contains("No tasks are running") {
        return false;
    }
    stdout.contains(&format!("\"{pid}\""))
}
821
/// Liveness probe for platforms with no supported process query.
///
/// Always reports the process as alive. With no way to check, assuming the
/// owner is still running is the conservative answer.
#[cfg(not(any(unix, windows)))]
fn process_alive(pid: u32) -> bool {
    let _ = pid;
    true
}
826
// Unit tests covering acquisition and release, stale-owner reclamation,
// heartbeat behaviour, and the create-contention error classification.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};

    /// Lock timings shrunk so tests finish quickly.
    ///
    /// Uses a 25 ms heartbeat, 10 ms polling, and a 2 s stale threshold. The
    /// 2 s threshold is well above the 80 ms acquire timeouts used by the
    /// "blocks" tests.
    fn test_config() -> LockConfig {
        LockConfig {
            heartbeat_interval_ms: 25,
            stale_heartbeat_ms: 2_000,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
            poll_interval_ms: 10,
        }
    }

    /// Fresh temp directory plus a lock path inside it.
    ///
    /// The `TempDir` is returned so the caller keeps it alive; dropping it
    /// deletes the directory.
    fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().expect("create temp dir");
        let path = dir.path().join("test.lock");
        (dir, path)
    }

    /// Writes a lock file with arbitrary `metadata`, bypassing `acquire`, so
    /// tests can stage foreign, stale, or dead-owner locks.
    fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
        let mut file = open_new_lock_file(path).expect("create synthetic lock");
        write_lock_metadata_to_file(&mut file, metadata).expect("write synthetic lock");
    }

    /// Metadata whose heartbeat timestamp equals its creation timestamp.
    fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
        LockMetadata {
            pid,
            hostname,
            created_at_ms,
            heartbeat_at_ms: created_at_ms,
        }
    }

    /// Metadata naming this very process on this host, freshly created, so
    /// the owner is guaranteed to be alive.
    fn current_process_metadata() -> LockMetadata {
        let now = now_ms();
        synthetic_metadata(std::process::id(), current_hostname(), now)
    }

    /// Acquiring writes our identity into the lock file; dropping the guard
    /// removes the file.
    #[test]
    fn acquire_creates_lockfile_and_unlocks_on_drop() {
        let (_dir, path) = test_lock_path();

        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let metadata = read_lock_metadata(&path).expect("read lock metadata");
        assert_eq!(metadata.pid, std::process::id());
        assert_eq!(metadata.hostname, current_hostname());
        assert_eq!(metadata.created_at_ms, guard.metadata.created_at_ms);

        drop(guard);
        assert!(!path.exists());
    }

    #[test]
    fn permission_denied_is_treated_as_transient_create_contention() {
        // Windows surfaces a contended create/delete on the same lock path as
        // access-denied; acquire must retry these rather than fail the caller.
        let err = io::Error::from(io::ErrorKind::PermissionDenied);
        assert!(is_transient_create_contention(&err));
    }

    #[test]
    fn unrelated_io_errors_are_not_treated_as_contention() {
        // A genuinely fatal error (e.g. the parent dir is missing) must still
        // propagate, not spin in the transient-retry arm.
        let err = io::Error::from(io::ErrorKind::NotFound);
        assert!(!is_transient_create_contention(&err));
    }

    #[cfg(windows)]
    #[test]
    fn windows_sharing_violation_is_treated_as_transient_create_contention() {
        // ERROR_SHARING_VIOLATION (32) is the other contention code Windows
        // returns when a concurrent actor holds the path open mid-create.
        let err = io::Error::from_raw_os_error(32);
        assert!(is_transient_create_contention(&err));
    }

    #[test]
    fn reclaim_refuses_to_delete_a_different_owners_lock() {
        let (_dir, path) = test_lock_path();

        // A lock currently owned by "owner B".
        let owner_b = synthetic_metadata(4242, "host-b".to_string(), now_ms());
        create_lock_file_atomically(&path, &owner_b).expect("write owner B lock");

        // We judged a DIFFERENT (older) owner A as stale. Reclaiming must NOT
        // delete B's lock (the TOCTOU split-brain guard). Saturating
        // subtraction, like the other tests, so a clock near the epoch cannot
        // underflow and panic.
        let judged_a = synthetic_metadata(
            1111,
            "host-a".to_string(),
            now_ms().saturating_sub(1_000_000),
        );
        let removed = reclaim_lock_file(&path, &judged_a).expect("reclaim");
        assert!(!removed, "must not remove a different owner's lock");
        assert!(path.exists(), "owner B's lock must survive");
        let still = read_lock_metadata(&path).expect("still readable");
        assert_eq!(still.pid, 4242, "owner B's lock intact");
    }

    #[test]
    fn reclaim_deletes_when_identity_still_matches() {
        let (_dir, path) = test_lock_path();
        let owner = synthetic_metadata(1111, "host-a".to_string(), 5_000);
        create_lock_file_atomically(&path, &owner).expect("write lock");

        // Same identity we judged → safe to remove.
        let removed = reclaim_lock_file(&path, &owner).expect("reclaim");
        assert!(removed, "matching-identity stale lock should be removed");
        assert!(!path.exists());

        // Reclaiming a now-absent lock is a no-op, not an error.
        assert!(!reclaim_lock_file(&path, &owner).expect("reclaim missing"));
    }

    /// Two threads race to acquire the same path. Each must get the lock, but
    /// never at the same time.
    #[test]
    fn acquire_serializes_concurrent_callers() {
        let (_dir, path) = test_lock_path();
        let path = Arc::new(path);
        // Two workers plus this thread, so both workers start together.
        let barrier = Arc::new(Barrier::new(3));
        let inside = Arc::new(AtomicUsize::new(0));
        let entered = Arc::new(AtomicUsize::new(0));
        let max_inside = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        for _ in 0..2 {
            let path = Arc::clone(&path);
            let barrier = Arc::clone(&barrier);
            let inside = Arc::clone(&inside);
            let entered = Arc::clone(&entered);
            let max_inside = Arc::clone(&max_inside);
            handles.push(thread::spawn(move || {
                barrier.wait();
                let guard = acquire_with_config(&path, Some(Duration::from_secs(2)), test_config())
                    .expect("thread acquire lock");
                let previous = inside.fetch_add(1, Ordering::SeqCst);
                assert_eq!(previous, 0, "two lock holders overlapped");
                entered.fetch_add(1, Ordering::SeqCst);
                max_inside.fetch_max(previous + 1, Ordering::SeqCst);
                // Hold the lock long enough that the other worker must wait.
                thread::sleep(Duration::from_millis(75));
                inside.fetch_sub(1, Ordering::SeqCst);
                drop(guard);
            }));
        }

        barrier.wait();
        for handle in handles {
            handle.join().expect("join worker");
        }

        assert_eq!(entered.load(Ordering::SeqCst), 2);
        assert_eq!(max_inside.load(Ordering::SeqCst), 1);
        assert!(!path.exists());
    }

    #[test]
    fn heartbeat_updates_lockfile_timestamp() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let initial = read_lock_metadata(&path)
            .expect("read initial metadata")
            .heartbeat_at_ms;

        // Poll for up to 2s rather than sleeping a fixed multiple of the
        // heartbeat interval. `park_timeout` is a *maximum* wait, not a
        // guaranteed periodic timer — under load (shared macOS CI runners
        // running other cargo-test threads concurrently) the heartbeat
        // thread may not fire 3 times within 75ms even though
        // heartbeat_interval_ms=25. The contract being asserted is "the
        // heartbeat advances eventually", not "it advances within N
        // heartbeat intervals".
        //
        // On Windows, `rename_over` does `remove_file(to)` then
        // `fs::rename(from, to)` because Windows can't atomically replace
        // an open file. There's a brief window where the lockfile doesn't
        // exist. If the poller hits that window, `read_lock_metadata`
        // returns `Io(NotFound)`. Production callers already handle this
        // (see `remove_lock_if_owned`), so the test treats `NotFound` the
        // same as "no update yet" and keeps polling.
        let deadline = std::time::Instant::now() + Duration::from_millis(2_000);
        let mut updated = initial;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(50));
            match read_lock_metadata(&path) {
                Ok(meta) => {
                    updated = meta.heartbeat_at_ms;
                    if updated > initial {
                        break;
                    }
                }
                Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
                    // Heartbeat thread is mid-rewrite (Windows
                    // remove-then-rename window). Retry next iteration.
                    continue;
                }
                Err(other) => panic!("read updated metadata: {other:?}"),
            }
        }
        assert!(
            updated > initial,
            "heartbeat timestamp did not advance within 2s"
        );
        drop(guard);
    }

    /// A same-host lock whose pid is assumed not to exist on the test machine
    /// is reclaimed immediately, even though its heartbeat is fresh.
    #[test]
    fn dead_pid_lock_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        let metadata = synthetic_metadata(999_999_999, current_hostname(), now_ms());
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim dead pid lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    /// A same-host owner that is provably alive (this process) keeps the lock
    /// even when its heartbeat is far past the stale threshold. The file must
    /// be left untouched.
    #[test]
    fn stale_heartbeat_from_live_pid_blocks() {
        let (_dir, path) = test_lock_path();
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(60_000);
        metadata.heartbeat_at_ms = now_ms().saturating_sub(60_000);
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// A live owner with a fresh heartbeat blocks acquisition until timeout.
    #[test]
    fn healthy_live_owner_blocks() {
        let (_dir, path) = test_lock_path();
        let metadata = current_process_metadata();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// An unparseable lock file is treated as reclaimable, not as a
    /// permanent block.
    #[test]
    fn malformed_lockfile_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        fs::write(&path, b"not valid json").expect("write malformed lock");

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim malformed lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    /// A lock from another hostname with a fresh heartbeat is not stolen. It
    /// reuses our pid, but the differing hostname presumably prevents a local
    /// liveness check (confirm against the acquire logic).
    #[test]
    fn cross_host_lock_is_not_stolen_before_extended_stale_threshold() {
        let (_dir, path) = test_lock_path();
        let now = now_ms();
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: now,
            heartbeat_at_ms: now,
        };
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// Once a cross-host heartbeat is older than
    /// `cross_host_stale_heartbeat_ms()` (plus 1 s margin), the lock is
    /// reclaimed and rewritten with our identity.
    #[test]
    fn stale_cross_host_lock_is_reclaimed_after_extended_threshold() {
        let (_dir, path) = test_lock_path();
        let stale_at =
            now_ms().saturating_sub(test_config().cross_host_stale_heartbeat_ms() + 1_000);
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: stale_at,
            heartbeat_at_ms: stale_at,
        };
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim stale cross-host lock");
        let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(reclaimed.hostname, current_hostname());
        assert_ne!(reclaimed.created_at_ms, metadata.created_at_ms);
        drop(guard);
    }

    /// A live owner that has held the lock longer than `LIVE_OWNER_WARN_MS`
    /// (10 minutes) still blocks. Only the blocking is asserted here, not the
    /// warning itself.
    #[test]
    fn live_owner_over_10min_warns_but_blocks() {
        let (_dir, path) = test_lock_path();
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
        metadata.heartbeat_at_ms = now_ms();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// After the guard drops, waiting several heartbeat intervals must not
    /// bring the lock file back.
    #[test]
    fn drop_stops_heartbeat_thread() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        drop(guard);

        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 3,
        ));
        assert!(
            !path.exists(),
            "heartbeat recreated or kept updating lockfile"
        );
    }

    #[test]
    fn heartbeat_error_classification_terminal_vs_transient() {
        // Terminal: the lock is provably no longer ours to refresh.
        assert!(heartbeat_error_is_terminal(&HeartbeatError::LockGone));
        assert!(heartbeat_error_is_terminal(&HeartbeatError::NotOwner));
        // Transient: a temporary I/O hiccup or a read that raced a concurrent
        // writer. These must NOT kill the heartbeat — it retries instead.
        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Io(
            io::Error::other("disk blip")
        )));
        let malformed: serde_json::Error =
            serde_json::from_str::<LockMetadata>("not json").unwrap_err();
        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Malformed(
            malformed
        )));
    }

    #[test]
    fn heartbeat_survives_transient_malformed_and_recovers() {
        // Regression: a single transient failure (e.g. a read that races a
        // concurrent writer and sees a momentarily-unparseable file) used to
        // permanently kill the heartbeat thread. The guard holder would then
        // run its critical section with a stale heartbeat_at_ms, letting
        // another process reclaim the lock after the stale window — concurrent
        // writers / split-brain. The heartbeat must instead retry and resume
        // refreshing once the file is readable again.
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let owner = guard.metadata.clone();

        // Corrupt the lockfile out from under the heartbeat (simulates a
        // concurrent-writer race producing a momentarily-unparseable read).
        // The heartbeat reads-then-writes, so it observes Malformed and, with
        // the fix, retries instead of dying.
        fs::write(&path, b"{ not valid json").expect("corrupt lockfile");

        // Give the heartbeat several intervals to observe the malformed file.
        // Pre-fix, the thread is dead by now.
        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 4,
        ));

        // Restore valid owner metadata with a clearly-stale heartbeat sentinel.
        // Ownership fields must match `owner` exactly so heartbeat_once passes
        // its ownership check and writes a fresh timestamp.
        //
        // Use the atomic temp-write+rename path rather than remove-then-recreate:
        // a remove followed by a separate create leaves a window where the file
        // does not exist, and a heartbeat poll landing in that window reads
        // NotFound -> LockGone (terminal) and kills the thread, failing this test
        // spuriously under runner load (observed on macOS CI). The atomic replace
        // overwrites the corrupt file in place with no no-file window on Unix.
        let sentinel = now_ms().saturating_sub(1_000_000);
        let mut restored = owner.clone();
        restored.heartbeat_at_ms = sentinel;
        atomic_write_lock_metadata(&path, &restored).expect("atomically restore lock metadata");

        // If the heartbeat thread is still alive (the fix), it will overwrite
        // heartbeat_at_ms with a current value. Poll for that recovery.
        let deadline = std::time::Instant::now() + Duration::from_millis(3_000);
        let mut recovered = false;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(25));
            match read_lock_metadata(&path) {
                Ok(meta)
                    if meta.created_at_ms == owner.created_at_ms
                        && meta.heartbeat_at_ms > sentinel =>
                {
                    recovered = true;
                    break;
                }
                _ => continue,
            }
        }
        assert!(
            recovered,
            "heartbeat did not recover after a transient malformed read — thread likely died"
        );
        drop(guard);
    }
}