Skip to main content

aft/
fs_lock.rs

1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6    atomic::{AtomicBool, AtomicU64, Ordering},
7    mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
/// Default pause between heartbeat refreshes of a held lock file, in
/// milliseconds.
pub const HEARTBEAT_INTERVAL_MS: u64 = 5_000;
/// Default heartbeat age, in milliseconds, after which an owner's heartbeat is
/// considered stale. A stale same-host owner whose PID is still alive is only
/// warned about; a cross-host owner is reclaimed after five times this window.
pub const STALE_HEARTBEAT_MS: u64 = 15_000;
/// Default lock age, in milliseconds, after which waiters log a one-time
/// warning that a live owner has held the lock for a long time.
pub const LIVE_OWNER_WARN_MS: u64 = 600_000;
/// Default sleep between acquisition attempts while the lock is held by
/// someone else, in milliseconds.
pub const POLL_INTERVAL_MS: u64 = 100;
20
/// Timing knobs for acquiring and heartbeating a lock. The public entry points
/// use [`LockConfig::default`]; other values can be passed to
/// `acquire_with_config` directly.
#[derive(Clone, Copy, Debug)]
struct LockConfig {
    /// Pause between heartbeat refreshes, in milliseconds.
    heartbeat_interval_ms: u64,
    /// Heartbeat age (ms) beyond which a same-host owner is reported as stale;
    /// also the base for the cross-host reclaim window.
    stale_heartbeat_ms: u64,
    /// Lock age (ms) beyond which waiters warn about a long-held lock.
    live_owner_warn_ms: u64,
    /// Sleep between acquisition retries, in milliseconds.
    poll_interval_ms: u64,
}
28
29impl LockConfig {
30    fn cross_host_stale_heartbeat_ms(self) -> u64 {
31        self.stale_heartbeat_ms.saturating_mul(5)
32    }
33}
34
35impl Default for LockConfig {
36    fn default() -> Self {
37        Self {
38            heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
39            stale_heartbeat_ms: STALE_HEARTBEAT_MS,
40            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
41            poll_interval_ms: POLL_INTERVAL_MS,
42        }
43    }
44}
45
/// Contents of a lock file, stored as a single JSON object.
///
/// `pid`, `hostname` and `created_at_ms` together identify the owner; they are
/// what the heartbeat and release paths compare before touching the file.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
struct LockMetadata {
    /// Process id of the owner, as reported by `std::process::id()`.
    pid: u32,
    /// Host name of the owner, as reported by `current_hostname()`.
    hostname: String,
    /// Wall-clock time the lock was created, in milliseconds since the Unix epoch.
    created_at_ms: u64,
    /// Wall-clock time of the most recent heartbeat, in milliseconds since the
    /// Unix epoch. Equal to `created_at_ms` until the first refresh.
    heartbeat_at_ms: u64,
}
53
54/// Acquire a filesystem lock at `path`. Blocks until the lock is held.
55///
56/// The returned guard owns a background heartbeat thread; dropping it releases
57/// the lock and removes the lock file.
58pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
59    acquire_with_config(path, None, LockConfig::default())
60}
61
62/// Try to acquire a filesystem lock at `path` within `timeout`.
63pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
64    acquire_with_config(path, Some(timeout), LockConfig::default())
65}
66
/// Handle for a held filesystem lock.
///
/// While the guard is alive a background thread periodically refreshes the
/// heartbeat timestamp in the lock file. Dropping the guard stops that thread
/// and removes the lock file if it still carries this guard's identity.
pub struct LockGuard {
    /// Location of the lock file.
    path: PathBuf,
    /// Metadata written when the lock was created; used as the ownership
    /// identity when releasing.
    metadata: LockMetadata,
    /// Set to `true` by `Drop` to ask the heartbeat thread to exit.
    shutdown: Arc<AtomicBool>,
    /// Receives one message when the heartbeat thread finishes. `Drop` only
    /// drains it; it does not wait on it.
    heartbeat_done: mpsc::Receiver<()>,
    /// Join handle of the heartbeat thread; taken (left as `None`) by `Drop`.
    heartbeat: Option<JoinHandle<()>>,
}
74
impl Drop for LockGuard {
    /// Stops the heartbeat thread, waits for it to exit, then removes the lock
    /// file if it is still owned by this guard. Failures are logged, never
    /// propagated.
    fn drop(&mut self) {
        // Signal shutdown, then unconditionally join the heartbeat thread
        // BEFORE removing the lock file. An earlier `recv_timeout(100ms)`
        // implementation could let `remove_lock_if_owned` race with a
        // still-running heartbeat:
        //
        //   1. Drop signals shutdown; the ack times out under CI load.
        //   2. Drop calls `remove_lock_if_owned` -> file removed.
        //   3. Another caller acquires the lock and writes its metadata.
        //   4. Our heartbeat (still alive, mid-`atomic_write_lock_metadata`
        //      from before it checked the shutdown flag) overwrites the new
        //      owner's file with our stale metadata. `heartbeat_once` checks
        //      ownership BEFORE the write, so a concurrent acquire can flip
        //      ownership in between.
        //   5. The new owner's heartbeat sees foreign metadata and exits with
        //      `NotOwner`. The new owner's drop sees foreign metadata,
        //      `remove_lock_if_owned` returns `Ok(false)`, and the file
        //      persists.
        //
        // Because the thread is unparked right after the flag is set, the join
        // waits only for the thread to wake and observe the flag, plus any
        // `heartbeat_once` I/O that was already in flight. The
        // `heartbeat_done` channel is still populated by the thread, but Drop
        // no longer relies on it.
        self.shutdown.store(true, Ordering::Release);
        if let Some(handle) = self.heartbeat.take() {
            // Wake the thread out of `park_timeout` so it sees the flag now
            // instead of at the end of the current heartbeat interval.
            handle.thread().unpark();
            // A panic in the heartbeat thread is deliberately ignored here.
            let _ = handle.join();
        }
        // Drain any pending ack so the receiver doesn't carry stale state
        // if this LockGuard is somehow re-used (it isn't today, but be
        // defensive).
        while self.heartbeat_done.try_recv().is_ok() {}

        // `Ok(false)` means the file is gone, unparseable, or owned by someone
        // else; in all of those cases there is nothing for us to remove.
        match remove_lock_if_owned(&self.path, &self.metadata) {
            Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
            Ok(false) => {}
            Err(error) => slog_warn!(
                "failed to release filesystem lock at {}: {}",
                self.path.display(),
                error
            ),
        }
    }
}
121
/// Error returned when a filesystem lock could not be acquired.
#[derive(Debug)]
pub enum AcquireError {
    /// An unexpected I/O error occurred while creating, reading or removing
    /// the lock file.
    Io(io::Error),
    /// The caller-supplied timeout elapsed before the lock became available.
    Timeout,
}
127
128impl fmt::Display for AcquireError {
129    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130        match self {
131            AcquireError::Io(error) => write!(f, "filesystem lock I/O error: {error}"),
132            AcquireError::Timeout => write!(f, "timed out acquiring filesystem lock"),
133        }
134    }
135}
136
// Marker impl only: `source()` is not overridden, so the wrapped `io::Error`
// is reachable through the `Io` variant and the `Display` text rather than
// through the error-source chain.
impl std::error::Error for AcquireError {}
138
139impl From<io::Error> for AcquireError {
140    fn from(error: io::Error) -> Self {
141        AcquireError::Io(error)
142    }
143}
144
145fn acquire_with_config(
146    path: &Path,
147    timeout: Option<Duration>,
148    config: LockConfig,
149) -> Result<LockGuard, AcquireError> {
150    let deadline = timeout.map(|timeout| Instant::now() + timeout);
151    let hostname = current_hostname();
152    let mut warned_live_owner = false;
153    let mut warned_stale_live_owner = false;
154
155    loop {
156        if let Some(deadline) = deadline {
157            if Instant::now() >= deadline {
158                return Err(AcquireError::Timeout);
159            }
160        }
161
162        match create_new_lock(path, &hostname, config) {
163            Ok(guard) => return Ok(guard),
164            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
165            Err(error) => return Err(error.into()),
166        }
167
168        let metadata = match read_lock_metadata(path) {
169            Ok(metadata) => metadata,
170            Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
171            Err(ReadLockError::Io(error)) => return Err(error.into()),
172            Err(ReadLockError::Malformed(error)) => {
173                // A just-created O_EXCL file is visible before its owner has
174                // finished writing JSON. Give that transient creation window
175                // one poll interval before treating malformed contents as stale.
176                sleep_until_retry(deadline, config.poll_interval_ms)?;
177                match read_lock_metadata(path) {
178                    Ok(_) => continue,
179                    Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
180                        continue;
181                    }
182                    Err(ReadLockError::Io(error)) => return Err(error.into()),
183                    Err(ReadLockError::Malformed(_)) => {}
184                }
185                slog_warn!(
186                    "removing malformed filesystem lock at {}: {}",
187                    path.display(),
188                    error
189                );
190                remove_lock_file(path)?;
191                continue;
192            }
193        };
194
195        let now = now_ms();
196        let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);
197
198        if metadata.hostname != hostname {
199            let cross_host_stale_ms = config.cross_host_stale_heartbeat_ms();
200            if since_heartbeat > cross_host_stale_ms {
201                slog_warn!(
202                    "reclaiming cross-host filesystem lock at {} from host {} after stale heartbeat ({}ms > {}ms)",
203                    path.display(),
204                    metadata.hostname,
205                    since_heartbeat,
206                    cross_host_stale_ms
207                );
208                remove_lock_file(path)?;
209                continue;
210            }
211            sleep_until_retry(deadline, config.poll_interval_ms)?;
212            continue;
213        }
214
215        if !process_alive(metadata.pid) {
216            slog_warn!(
217                "removing filesystem lock at {} from dead PID {}",
218                path.display(),
219                metadata.pid
220            );
221            remove_lock_file(path)?;
222            continue;
223        }
224
225        if since_heartbeat > config.stale_heartbeat_ms && !warned_stale_live_owner {
226            // Same-host PID liveness is authoritative. A SIGSTOP'd process,
227            // suspended VM, or sleeping laptop can miss heartbeats and later
228            // resume inside the critical section. Breaking that lock would allow
229            // split-brain writers, so a paused live owner blocks acquirers until
230            // it resumes and releases the lock or the PID dies.
231            slog_warn!(
232                "filesystem lock at {} held by live PID {} has stale heartbeat ({}ms); NOT breaking",
233                path.display(),
234                metadata.pid,
235                since_heartbeat
236            );
237            warned_stale_live_owner = true;
238        }
239
240        let held_for = now.saturating_sub(metadata.created_at_ms);
241        if held_for > config.live_owner_warn_ms && !warned_live_owner {
242            slog_warn!(
243                "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
244                path.display(),
245                metadata.pid
246            );
247            warned_live_owner = true;
248        }
249
250        sleep_until_retry(deadline, config.poll_interval_ms)?;
251    }
252}
253
254fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
255    let now = now_ms();
256    let metadata = LockMetadata {
257        pid: std::process::id(),
258        hostname: hostname.to_string(),
259        created_at_ms: now,
260        heartbeat_at_ms: now,
261    };
262
263    create_lock_file_atomically(path, &metadata)?;
264
265    let shutdown = Arc::new(AtomicBool::new(false));
266    let (done_tx, done_rx) = mpsc::channel();
267    let heartbeat_path = path.to_path_buf();
268    let heartbeat_metadata = metadata.clone();
269    let heartbeat_shutdown = Arc::clone(&shutdown);
270    let heartbeat = thread::Builder::new()
271        .name("aft-fs-lock-heartbeat".to_string())
272        .spawn(move || {
273            run_heartbeat(
274                heartbeat_path,
275                heartbeat_metadata,
276                heartbeat_shutdown,
277                config,
278            );
279            let _ = done_tx.send(());
280        })?;
281
282    slog_info!("acquired filesystem lock at {}", path.display());
283
284    Ok(LockGuard {
285        path: path.to_path_buf(),
286        metadata,
287        shutdown,
288        heartbeat_done: done_rx,
289        heartbeat: Some(heartbeat),
290    })
291}
292
293fn run_heartbeat(
294    path: PathBuf,
295    owner: LockMetadata,
296    shutdown: Arc<AtomicBool>,
297    config: LockConfig,
298) {
299    // Number of consecutive heartbeat intervals that can be missed before the
300    // same-host stale window elapses and another process may reclaim the lock.
301    // Beyond this point a sustained failure is genuinely dangerous, so we
302    // escalate the log from warn to error — but we still keep retrying.
303    let stale_intervals = config
304        .stale_heartbeat_ms
305        .checked_div(config.heartbeat_interval_ms.max(1))
306        .unwrap_or(3)
307        .max(1);
308    let mut consecutive_transient_failures: u64 = 0;
309
310    loop {
311        thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
312        if shutdown.load(Ordering::Acquire) {
313            return;
314        }
315
316        match heartbeat_once(&path, &owner) {
317            Ok(()) => {
318                if consecutive_transient_failures > 0 {
319                    slog_info!(
320                        "filesystem lock at {} heartbeat recovered after {} transient failure(s)",
321                        path.display(),
322                        consecutive_transient_failures
323                    );
324                    consecutive_transient_failures = 0;
325                }
326            }
327            Err(error) if heartbeat_error_is_terminal(&error) => {
328                // Terminal states: the lock is provably gone or owned by
329                // someone else. Continuing to write would clobber a new owner's
330                // metadata (the exact race documented in LockGuard::drop), so
331                // stop heartbeating.
332                slog_error!(
333                    "{}; stopping heartbeat",
334                    terminal_heartbeat_message(&path, &error)
335                );
336                return;
337            }
338            Err(error) => {
339                // Transient states: a temporary I/O hiccup (disk/NFS blip,
340                // quota) or a read that raced a concurrent writer mid-write
341                // (momentarily unparseable file). A single such error must NOT
342                // permanently kill the heartbeat — that would silently stop
343                // refreshing heartbeat_at_ms while the guard holder keeps
344                // running its critical section, letting another process reclaim
345                // the lock after the stale window and produce concurrent
346                // writers. Log and retry on the next interval; a later success
347                // resumes heartbeating automatically.
348                consecutive_transient_failures += 1;
349                log_transient_heartbeat_failure(
350                    &path,
351                    &transient_heartbeat_reason(&error),
352                    consecutive_transient_failures,
353                    stale_intervals,
354                );
355            }
356        }
357    }
358}
359
360/// A heartbeat failure is terminal when the lock is provably no longer ours to
361/// refresh: it was removed (`LockGone`) or a different owner now holds it
362/// (`NotOwner`). I/O and malformed-read failures are treated as transient —
363/// they are typically temporary disk/NFS hiccups or a read that raced a
364/// concurrent writer — so the heartbeat retries rather than dying.
365fn heartbeat_error_is_terminal(error: &HeartbeatError) -> bool {
366    matches!(error, HeartbeatError::LockGone | HeartbeatError::NotOwner)
367}
368
369fn terminal_heartbeat_message(path: &Path, error: &HeartbeatError) -> String {
370    match error {
371        HeartbeatError::LockGone => {
372            format!("filesystem lock at {} disappeared", path.display())
373        }
374        HeartbeatError::NotOwner => format!(
375            "filesystem lock at {} is no longer owned by this guard",
376            path.display()
377        ),
378        // Not reachable for non-terminal errors, but keep a sensible string.
379        HeartbeatError::Io(error) => {
380            format!("filesystem lock at {} I/O error: {error}", path.display())
381        }
382        HeartbeatError::Malformed(error) => {
383            format!(
384                "filesystem lock at {} became malformed: {error}",
385                path.display()
386            )
387        }
388    }
389}
390
391fn transient_heartbeat_reason(error: &HeartbeatError) -> String {
392    match error {
393        HeartbeatError::Io(error) => format!("I/O error: {error}"),
394        HeartbeatError::Malformed(error) => format!("became malformed: {error}"),
395        HeartbeatError::LockGone => "lock disappeared".to_string(),
396        HeartbeatError::NotOwner => "lock no longer owned".to_string(),
397    }
398}
399
400/// Log a transient heartbeat failure, escalating to error exactly once when the
401/// failures have lasted long enough that the lock is now reclaimable by another
402/// owner. Beyond that point we stay quiet to avoid log spam while still
403/// retrying — the holder has already been warned the lock is at risk.
404fn log_transient_heartbeat_failure(
405    path: &Path,
406    reason: &str,
407    consecutive_failures: u64,
408    stale_intervals: u64,
409) {
410    if consecutive_failures < stale_intervals {
411        slog_warn!(
412            "transient failure to heartbeat filesystem lock at {}: {}; retrying (attempt {})",
413            path.display(),
414            reason,
415            consecutive_failures
416        );
417    } else if consecutive_failures == stale_intervals {
418        slog_error!(
419            "filesystem lock at {} has failed {} consecutive heartbeats: {}; \
420             the lock may now be reclaimed by another owner — continuing to retry",
421            path.display(),
422            consecutive_failures,
423            reason
424        );
425    }
426}
427
428fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
429    let mut metadata = match read_lock_metadata(path) {
430        Ok(metadata) => metadata,
431        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
432            return Err(HeartbeatError::LockGone);
433        }
434        Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
435        Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
436    };
437
438    if metadata.pid != owner.pid
439        || metadata.hostname != owner.hostname
440        || metadata.created_at_ms != owner.created_at_ms
441    {
442        return Err(HeartbeatError::NotOwner);
443    }
444
445    metadata.heartbeat_at_ms = now_ms();
446    atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
447}
448
/// Reasons a single heartbeat refresh can fail.
#[derive(Debug)]
enum HeartbeatError {
    /// Reading the lock file failed with an error other than `NotFound`, or
    /// rewriting it failed with any I/O error. Treated as transient.
    Io(io::Error),
    /// The lock file did not exist when read. Treated as terminal.
    LockGone,
    /// The lock file's contents could not be parsed as metadata. Treated as
    /// transient.
    Malformed(serde_json::Error),
    /// The lock file identifies a different owner. Treated as terminal.
    NotOwner,
}
456
/// Reasons reading a lock file's metadata can fail.
#[derive(Debug)]
enum ReadLockError {
    /// The file could not be read (including `NotFound`).
    Io(io::Error),
    /// The file was read but its bytes are not valid JSON metadata.
    Malformed(serde_json::Error),
}
462
463fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
464    let bytes = fs::read(path).map_err(ReadLockError::Io)?;
465    serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
466}
467
/// Create `path` for writing, failing with `AlreadyExists` if it is already
/// present. On Unix the file is requested with mode `0o644`.
#[cfg(unix)]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = OpenOptions::new();
    options.write(true).create_new(true).mode(0o644);
    options.open(path)
}
478
/// Create `path` for writing, failing with `AlreadyExists` if it is already
/// present. Non-Unix targets use the platform's default permissions.
#[cfg(not(unix))]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.write(true).create_new(true);
    options.open(path)
}
483
484fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
485    serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
486    file.write_all(b"\n")?;
487    file.sync_all()
488}
489
490fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
491    let tmp_path = temp_path_for_lock(path);
492    let result = (|| {
493        let mut file = open_new_lock_file(&tmp_path)?;
494        write_lock_metadata_to_file(&mut file, metadata)?;
495        drop(file);
496
497        fs::hard_link(&tmp_path, path)?;
498        sync_parent(path);
499        Ok(())
500    })();
501
502    let _ = fs::remove_file(&tmp_path);
503    result
504}
505
506fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
507    let tmp_path = temp_path_for_lock(path);
508    let write_result = (|| {
509        let mut file = OpenOptions::new()
510            .write(true)
511            .create_new(true)
512            .open(&tmp_path)?;
513        write_lock_metadata_to_file(&mut file, metadata)?;
514        drop(file);
515
516        rename_over(&tmp_path, path)?;
517        sync_parent(path);
518        Ok(())
519    })();
520
521    if write_result.is_err() {
522        let _ = fs::remove_file(&tmp_path);
523    }
524
525    write_result
526}
527
/// Move `from` over `to` on Windows without ever leaving `to` missing.
///
/// First tries `fs::rename`, which replaces an existing destination. It must
/// not be preceded by `remove_file(to)`: that opens a gap in which a
/// concurrent reader sees `NotFound`, which the heartbeat treats as `LockGone`
/// (terminal).
///
/// If the rename is refused (for example because another handle briefly has
/// the destination open), falls back to copying `from` onto `to`. The copy
/// overwrites the destination in place, so the path keeps existing; at worst a
/// reader sees a partially written file and gets `Malformed`, which is
/// transient and retried. After a successful copy the source is removed on a
/// best-effort basis.
///
/// # Errors
///
/// When both the rename and the copy fail, returns the original rename error.
#[cfg(windows)]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    let rename_error = match fs::rename(from, to) {
        Ok(()) => return Ok(()),
        Err(error) => error,
    };

    if fs::copy(from, to).is_err() {
        // Neither strategy worked; report why the atomic replace was refused.
        return Err(rename_error);
    }

    let _ = fs::remove_file(from);
    Ok(())
}
562
/// Move `from` over `to` on non-Windows targets by delegating to `fs::rename`.
///
/// NOTE(review): this relies on `rename` replacing an existing destination in
/// one step, as POSIX `rename(2)` does — confirm for any non-Unix, non-Windows
/// target this is built for.
#[cfg(not(windows))]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    fs::rename(from, to)
}
567
// Process-wide counter that disambiguates temp lockfile paths for callers
// inside the same process. `now_nanos()` alone is not unique enough on
// Windows when two threads race to acquire the same lock (caught by the
// `acquire_serializes_concurrent_callers` test): two threads sampling the
// nanosecond clock within the same scheduler quantum produce identical
// timestamps, both write to the same `.<name>.tmp.<pid>.<nanos>` file, one
// thread's `fs::remove_file(&tmp_path)` cleanup deletes the file before
// the other thread's `fs::hard_link(&tmp_path, ...)` runs, and the loser
// fails with `Io(Os { code: 2, NotFound })`.
//
// A single `AtomicU64` shared by all threads makes every temp path unique
// within the process regardless of clock resolution or scheduling races.
static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
581
582fn temp_path_for_lock(path: &Path) -> PathBuf {
583    let file_name = path
584        .file_name()
585        .and_then(|name| name.to_str())
586        .unwrap_or("lock");
587    let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
588    path.with_file_name(format!(
589        ".{file_name}.tmp.{}.{}.{}",
590        std::process::id(),
591        now_nanos(),
592        seq
593    ))
594}
595
596fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
597    let metadata = match read_lock_metadata(path) {
598        Ok(metadata) => metadata,
599        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
600            return Ok(false);
601        }
602        Err(ReadLockError::Io(error)) => return Err(error),
603        Err(ReadLockError::Malformed(_)) => return Ok(false),
604    };
605
606    if metadata.pid == owner.pid
607        && metadata.hostname == owner.hostname
608        && metadata.created_at_ms == owner.created_at_ms
609    {
610        remove_lock_file(path)?;
611        Ok(true)
612    } else {
613        Ok(false)
614    }
615}
616
/// Delete the file at `path`, treating "already gone" as success.
///
/// # Errors
///
/// Returns any removal error other than `NotFound`.
fn remove_lock_file(path: &Path) -> io::Result<()> {
    fs::remove_file(path).or_else(|cause| {
        if cause.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(cause)
        }
    })
}
624
625fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
626    let poll = Duration::from_millis(poll_interval_ms);
627    let sleep_for = match deadline {
628        Some(deadline) => {
629            let now = Instant::now();
630            if now >= deadline {
631                return Err(AcquireError::Timeout);
632            }
633            poll.min(deadline.saturating_duration_since(now))
634        }
635        None => poll,
636    };
637    thread::sleep(sleep_for);
638    Ok(())
639}
640
/// Best-effort `sync_all` of the directory containing `path`, so that a
/// just-created or just-renamed entry is flushed along with the file data.
///
/// For a bare relative file name such as `test.lock`, `Path::parent` yields an
/// empty path, which cannot be opened. That case is mapped to `"."` so the
/// current directory is synced instead of the sync being skipped silently.
///
/// All failures are ignored: a directory that cannot be opened or synced
/// (including platforms where directories cannot be opened as files) leaves
/// the lock itself unaffected.
fn sync_parent(path: &Path) {
    let Some(parent) = path.parent() else {
        return;
    };
    let directory = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    if let Ok(dir) = File::open(directory) {
        let _ = dir.sync_all();
    }
}
648
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch, and saturates
/// at `u64::MAX` instead of silently truncating if the millisecond count does
/// not fit in 64 bits.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
655
/// Current wall-clock time in nanoseconds since the Unix epoch, or 0 if the
/// system clock reads earlier than the epoch.
fn now_nanos() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => Duration::ZERO.as_nanos(),
    }
}
662
663#[cfg(unix)]
664fn current_hostname() -> String {
665    let mut buffer = [0u8; 256];
666    let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
667    if result == 0 {
668        let len = buffer
669            .iter()
670            .position(|byte| *byte == 0)
671            .unwrap_or(buffer.len());
672        if len > 0 {
673            return String::from_utf8_lossy(&buffer[..len]).into_owned();
674        }
675    }
676
677    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
678}
679
/// Host name of this machine on Windows: the first of the `COMPUTERNAME` and
/// `HOSTNAME` environment variables that is set to valid Unicode, or the
/// literal `unknown-host` if neither is.
#[cfg(windows)]
fn current_hostname() -> String {
    for key in ["COMPUTERNAME", "HOSTNAME"] {
        if let Ok(value) = std::env::var(key) {
            return value;
        }
    }
    "unknown-host".to_string()
}
686
/// Host name on targets that are neither Unix nor Windows: the `HOSTNAME`
/// environment variable, or the literal `unknown-host` when it is unset or
/// not valid Unicode.
#[cfg(not(any(unix, windows)))]
fn current_hostname() -> String {
    match std::env::var("HOSTNAME") {
        Ok(value) => value,
        Err(_) => "unknown-host".to_string(),
    }
}
691
692#[cfg(unix)]
693fn process_alive(pid: u32) -> bool {
694    if pid == 0 || pid > i32::MAX as u32 {
695        return false;
696    }
697
698    let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
699    if result == 0 {
700        return true;
701    }
702
703    io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
704}
705
/// Report whether a process with the given PID exists on this Windows host,
/// by running `tasklist /FI "PID eq <pid>" /FO CSV /NH` and inspecting stdout.
///
/// If `tasklist` cannot be run or exits unsuccessfully, the answer is `true`:
/// when liveness cannot be determined the lock is left alone.
#[cfg(windows)]
fn process_alive(pid: u32) -> bool {
    let filter = format!("PID eq {pid}");
    let probe = std::process::Command::new("tasklist")
        .args(["/FI", &filter, "/FO", "CSV", "/NH"])
        .output();
    let output = match probe {
        Ok(output) if output.status.success() => output,
        _ => return true,
    };

    let listing = String::from_utf8_lossy(&output.stdout);

    // With `/NH /FO CSV`, each matching process is one line with every field
    // quoted, e.g. `"image","7420","Console","1","12,345 K"`. When nothing
    // matches, stdout instead carries the text
    // `INFO: No tasks are running which match the specified criteria.`
    //
    // Matching a pattern like `","{pid}",` proved too strict on some runners
    // whose quoting/line endings differ slightly, so the check is deliberately
    // loose: the "no tasks" sentinel must be absent AND the quoted PID must
    // appear somewhere in the output.
    let quoted_pid = format!("\"{pid}\"");
    !listing.contains("No tasks are running") && listing.contains(&quoted_pid)
}
737
/// Fallback for targets that are neither Unix nor Windows: no liveness probe
/// is implemented, so every PID is reported as alive. As a consequence the
/// acquisition loop never removes a same-host lock on the grounds that its
/// owner has died.
#[cfg(not(any(unix, windows)))]
fn process_alive(_pid: u32) -> bool {
    true
}
742
743#[cfg(test)]
744mod tests {
745    use super::*;
746    use std::sync::atomic::{AtomicUsize, Ordering};
747    use std::sync::{Arc, Barrier};
748
749    fn test_config() -> LockConfig {
750        LockConfig {
751            heartbeat_interval_ms: 25,
752            stale_heartbeat_ms: 2_000,
753            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
754            poll_interval_ms: 10,
755        }
756    }
757
758    fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
759        let dir = tempfile::tempdir().expect("create temp dir");
760        let path = dir.path().join("test.lock");
761        (dir, path)
762    }
763
764    fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
765        let mut file = open_new_lock_file(path).expect("create synthetic lock");
766        write_lock_metadata_to_file(&mut file, metadata).expect("write synthetic lock");
767    }
768
769    fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
770        LockMetadata {
771            pid,
772            hostname,
773            created_at_ms,
774            heartbeat_at_ms: created_at_ms,
775        }
776    }
777
778    fn current_process_metadata() -> LockMetadata {
779        let now = now_ms();
780        synthetic_metadata(std::process::id(), current_hostname(), now)
781    }
782
783    #[test]
784    fn acquire_creates_lockfile_and_unlocks_on_drop() {
785        let (_dir, path) = test_lock_path();
786
787        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
788        let metadata = read_lock_metadata(&path).expect("read lock metadata");
789        assert_eq!(metadata.pid, std::process::id());
790        assert_eq!(metadata.hostname, current_hostname());
791        assert_eq!(metadata.created_at_ms, guard.metadata.created_at_ms);
792
793        drop(guard);
794        assert!(!path.exists());
795    }
796
#[test]
fn acquire_serializes_concurrent_callers() {
    // Two workers are released at the same instant and race for one lock.
    // Each records how many holders were already inside when it got in; any
    // value other than zero means the lock failed to serialize them.
    let (_dir, path) = test_lock_path();
    let lock_path = Arc::new(path);
    // Three barrier participants: both workers plus this thread.
    let start_gate = Arc::new(Barrier::new(3));
    let holders = Arc::new(AtomicUsize::new(0));
    let completed = Arc::new(AtomicUsize::new(0));
    let peak_holders = Arc::new(AtomicUsize::new(0));

    // `collect` is eager, so both threads exist before the gate is opened.
    let workers: Vec<_> = (0..2)
        .map(|_| {
            let lock_path = Arc::clone(&lock_path);
            let start_gate = Arc::clone(&start_gate);
            let holders = Arc::clone(&holders);
            let completed = Arc::clone(&completed);
            let peak_holders = Arc::clone(&peak_holders);
            thread::spawn(move || {
                start_gate.wait();
                let guard =
                    acquire_with_config(&lock_path, Some(Duration::from_secs(2)), test_config())
                        .expect("thread acquire lock");
                let already_inside = holders.fetch_add(1, Ordering::SeqCst);
                assert_eq!(already_inside, 0, "two lock holders overlapped");
                completed.fetch_add(1, Ordering::SeqCst);
                peak_holders.fetch_max(already_inside + 1, Ordering::SeqCst);
                // Hold the lock long enough for the other worker to contend.
                thread::sleep(Duration::from_millis(75));
                holders.fetch_sub(1, Ordering::SeqCst);
                drop(guard);
            })
        })
        .collect();

    start_gate.wait();
    workers
        .into_iter()
        .for_each(|worker| worker.join().expect("join worker"));

    assert_eq!(completed.load(Ordering::SeqCst), 2);
    assert_eq!(peak_holders.load(Ordering::SeqCst), 1);
    assert!(!lock_path.exists());
}
836
#[test]
fn heartbeat_updates_lockfile_timestamp() {
    let (_dir, path) = test_lock_path();
    let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
    let baseline = read_lock_metadata(&path)
        .expect("read initial metadata")
        .heartbeat_at_ms;

    // The property under test is "the heartbeat eventually advances", not
    // "it advances within N intervals". `park_timeout` only bounds the wait
    // from above, so on a loaded machine (e.g. shared macOS CI runners busy
    // with other cargo-test threads) the heartbeat thread may not manage
    // three ticks in 75ms even with heartbeat_interval_ms=25. Hence a 2s
    // polling budget instead of a fixed sleep.
    //
    // Windows adds a second wrinkle: `rename_over` performs
    // `remove_file(to)` followed by `fs::rename(from, to)` because an open
    // file cannot be atomically replaced there, so the lockfile is briefly
    // absent. A poll landing in that gap gets `Io(NotFound)` from
    // `read_lock_metadata`. Production code tolerates this (see
    // `remove_lock_if_owned`), and so does this test: `NotFound` just means
    // "nothing new yet, poll again".
    let give_up_at = std::time::Instant::now() + Duration::from_millis(2_000);
    let mut latest = baseline;
    loop {
        if std::time::Instant::now() >= give_up_at {
            break;
        }
        thread::sleep(Duration::from_millis(50));
        let meta = match read_lock_metadata(&path) {
            Ok(meta) => meta,
            // Heartbeat thread is mid-rewrite (Windows remove-then-rename
            // window); try again on the next pass.
            Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
            Err(other) => panic!("read updated metadata: {other:?}"),
        };
        latest = meta.heartbeat_at_ms;
        if latest > baseline {
            break;
        }
    }
    assert!(
        latest > baseline,
        "heartbeat timestamp did not advance within 2s"
    );
    drop(guard);
}
886
#[test]
fn dead_pid_lock_is_reclaimed() {
    // A lock written for this host but with pid 999_999_999 (presumably no
    // such live process) must be taken over by the acquirer.
    let (_dir, path) = test_lock_path();
    let abandoned = synthetic_metadata(999_999_999, current_hostname(), now_ms());
    write_synthetic_lock(&path, &abandoned);

    let timeout = Some(Duration::from_secs(1));
    let guard =
        acquire_with_config(&path, timeout, test_config()).expect("reclaim dead pid lock");
    let owner_pid = read_lock_metadata(&path)
        .expect("read reclaimed lock")
        .pid;
    assert_eq!(owner_pid, std::process::id());
    drop(guard);
}
899
#[test]
fn stale_heartbeat_from_live_pid_blocks() {
    // The recorded owner is this (live) process, but both timestamps are a
    // minute old. The acquirer must time out and leave the file untouched.
    let (_dir, path) = test_lock_path();
    let mut owner = current_process_metadata();
    owner.created_at_ms = now_ms().saturating_sub(60_000);
    owner.heartbeat_at_ms = now_ms().saturating_sub(60_000);
    write_synthetic_lock(&path, &owner);

    let timeout = Some(Duration::from_millis(80));
    let outcome = acquire_with_config(&path, timeout, test_config());
    assert!(matches!(outcome, Err(AcquireError::Timeout)));
    let on_disk = read_lock_metadata(&path).expect("read lock");
    assert_eq!(on_disk, owner);

    remove_lock_file(&path).expect("cleanup synthetic lock");
}
914
#[test]
fn healthy_live_owner_blocks() {
    // A lock recorded for the current process, with the metadata exactly as
    // `current_process_metadata` produces it, must make acquisition time out.
    let (_dir, path) = test_lock_path();
    let owner = current_process_metadata();
    write_synthetic_lock(&path, &owner);

    let timeout = Some(Duration::from_millis(80));
    let outcome = acquire_with_config(&path, timeout, test_config());
    assert!(matches!(outcome, Err(AcquireError::Timeout)));

    remove_lock_file(&path).expect("cleanup synthetic lock");
}
926
#[test]
fn malformed_lockfile_is_reclaimed() {
    // A lockfile whose contents are not valid JSON must be taken over
    // rather than blocking the acquirer.
    let (_dir, path) = test_lock_path();
    fs::write(&path, b"not valid json").expect("write malformed lock");

    let timeout = Some(Duration::from_secs(1));
    let guard =
        acquire_with_config(&path, timeout, test_config()).expect("reclaim malformed lock");
    let owner_pid = read_lock_metadata(&path)
        .expect("read reclaimed lock")
        .pid;
    assert_eq!(owner_pid, std::process::id());
    drop(guard);
}
938
#[test]
fn cross_host_lock_is_not_stolen_before_extended_stale_threshold() {
    // A lock recorded under a different hostname with a fresh heartbeat
    // must be left alone: the acquirer times out and the file is unchanged.
    let (_dir, path) = test_lock_path();
    let stamp = now_ms();
    let foreign_owner = LockMetadata {
        pid: std::process::id(),
        hostname: format!("{}-other", current_hostname()),
        created_at_ms: stamp,
        heartbeat_at_ms: stamp,
    };
    write_synthetic_lock(&path, &foreign_owner);

    let timeout = Some(Duration::from_millis(80));
    let outcome = acquire_with_config(&path, timeout, test_config());
    assert!(matches!(outcome, Err(AcquireError::Timeout)));
    let on_disk = read_lock_metadata(&path).expect("read lock");
    assert_eq!(on_disk, foreign_owner);

    remove_lock_file(&path).expect("cleanup synthetic lock");
}
957
#[test]
fn stale_cross_host_lock_is_reclaimed_after_extended_threshold() {
    // The foreign-host lock is backdated one second past the cross-host
    // stale threshold, so the acquirer is expected to take it over.
    let (_dir, path) = test_lock_path();
    let age_ms = test_config().cross_host_stale_heartbeat_ms() + 1_000;
    let backdated = now_ms().saturating_sub(age_ms);
    let foreign_owner = LockMetadata {
        pid: std::process::id(),
        hostname: format!("{}-other", current_hostname()),
        created_at_ms: backdated,
        heartbeat_at_ms: backdated,
    };
    write_synthetic_lock(&path, &foreign_owner);

    let timeout = Some(Duration::from_secs(1));
    let guard = acquire_with_config(&path, timeout, test_config())
        .expect("reclaim stale cross-host lock");
    let on_disk = read_lock_metadata(&path).expect("read reclaimed lock");
    assert_eq!(on_disk.hostname, current_hostname());
    // A changed creation stamp shows the file was rewritten, not reused.
    assert_ne!(on_disk.created_at_ms, foreign_owner.created_at_ms);
    drop(guard);
}
978
#[test]
fn live_owner_over_10min_warns_but_blocks() {
    // The owner is this process, created 11 minutes ago but with a current
    // heartbeat. Only the blocking half is asserted here: acquisition times
    // out and the lockfile is left byte-for-byte as written.
    let (_dir, path) = test_lock_path();
    let mut long_lived = current_process_metadata();
    long_lived.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
    long_lived.heartbeat_at_ms = now_ms();
    write_synthetic_lock(&path, &long_lived);

    let timeout = Some(Duration::from_millis(80));
    let outcome = acquire_with_config(&path, timeout, test_config());
    assert!(matches!(outcome, Err(AcquireError::Timeout)));
    let on_disk = read_lock_metadata(&path).expect("read lock");
    assert_eq!(on_disk, long_lived);

    remove_lock_file(&path).expect("cleanup synthetic lock");
}
993
#[test]
fn drop_stops_heartbeat_thread() {
    // Acquire and immediately release, then wait three heartbeat intervals.
    // If the lockfile exists afterwards, something kept writing it.
    let (_dir, path) = test_lock_path();
    let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
    drop(guard);

    let settle = Duration::from_millis(test_config().heartbeat_interval_ms * 3);
    thread::sleep(settle);

    let still_present = path.exists();
    assert!(
        !still_present,
        "heartbeat recreated or kept updating lockfile"
    );
}
1008
#[test]
fn heartbeat_error_classification_terminal_vs_transient() {
    // Terminal errors: the lock is demonstrably no longer ours to refresh.
    let lock_gone = HeartbeatError::LockGone;
    let not_owner = HeartbeatError::NotOwner;
    assert!(heartbeat_error_is_terminal(&lock_gone));
    assert!(heartbeat_error_is_terminal(&not_owner));

    // Transient errors: a passing I/O failure, or a read that raced a
    // concurrent writer. The heartbeat must retry on these, not stop.
    let io_blip = HeartbeatError::Io(io::Error::other("disk blip"));
    assert!(!heartbeat_error_is_terminal(&io_blip));

    let parse_failure: serde_json::Error =
        serde_json::from_str::<LockMetadata>("not json").unwrap_err();
    let unparseable = HeartbeatError::Malformed(parse_failure);
    assert!(!heartbeat_error_is_terminal(&unparseable));
}
1025
#[test]
fn heartbeat_survives_transient_malformed_and_recovers() {
    // Regression guard. One transient failure (for instance a read racing a
    // concurrent writer and seeing a momentarily-unparseable file) used to
    // kill the heartbeat thread for good. The holder then ran its critical
    // section with a frozen heartbeat_at_ms, and once the stale window
    // elapsed another process could reclaim the lock: concurrent writers,
    // split-brain. The heartbeat is required to retry and pick up refreshing
    // again as soon as the file parses.
    let (_dir, path) = test_lock_path();
    let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
    let owner = guard.metadata.clone();

    // Clobber the lockfile underneath the running heartbeat, standing in for
    // the racy unparseable read. Because the heartbeat reads before it
    // writes, it sees Malformed; with the fix it retries rather than exits.
    fs::write(&path, b"{ not valid json").expect("corrupt lockfile");

    // Let several intervals pass so the heartbeat definitely observes the
    // corruption. Without the fix the thread would be gone by this point.
    let observe_window = Duration::from_millis(test_config().heartbeat_interval_ms * 4);
    thread::sleep(observe_window);

    // Put back valid owner metadata carrying an obviously-stale heartbeat
    // sentinel. Every ownership field has to equal `owner` so that
    // heartbeat_once accepts the file as ours and stamps a fresh time.
    //
    // The restore goes through the atomic temp-write+rename helper instead
    // of remove-then-recreate. Removing and then separately creating opens a
    // gap with no file at all; a heartbeat poll inside that gap reads
    // NotFound -> LockGone (terminal), the thread exits, and the test fails
    // spuriously under runner load (seen on macOS CI). On Unix the atomic
    // replace swaps the corrupt file out with no such gap.
    let sentinel = now_ms().saturating_sub(1_000_000);
    let mut restored = owner.clone();
    restored.heartbeat_at_ms = sentinel;
    atomic_write_lock_metadata(&path, &restored).expect("atomically restore lock metadata");

    // A surviving heartbeat thread replaces the sentinel with a current
    // timestamp; poll until that happens or the budget runs out. Any read
    // error or not-yet-refreshed value simply means "poll again".
    let give_up_at = std::time::Instant::now() + Duration::from_millis(3_000);
    let mut recovered = false;
    while std::time::Instant::now() < give_up_at {
        thread::sleep(Duration::from_millis(25));
        let refreshed = matches!(
            read_lock_metadata(&path),
            Ok(meta)
                if meta.created_at_ms == owner.created_at_ms
                    && meta.heartbeat_at_ms > sentinel
        );
        if refreshed {
            recovered = true;
            break;
        }
    }
    assert!(
        recovered,
        "heartbeat did not recover after a transient malformed read — thread likely died"
    );
    drop(guard);
}
1089}