Skip to main content

aft/
fs_lock.rs

1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6    atomic::{AtomicBool, AtomicU64, Ordering},
7    mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
16pub const HEARTBEAT_INTERVAL_MS: u64 = 5_000;
17pub const STALE_HEARTBEAT_MS: u64 = 15_000;
18pub const LIVE_OWNER_WARN_MS: u64 = 600_000;
19pub const POLL_INTERVAL_MS: u64 = 100;
20
/// Timing knobs for lock acquisition and heartbeating, all in milliseconds.
#[derive(Clone, Copy, Debug)]
struct LockConfig {
    /// Park timeout the heartbeat thread uses between refresh attempts.
    heartbeat_interval_ms: u64,
    /// Heartbeat age beyond which a same-host owner is reported as stale.
    stale_heartbeat_ms: u64,
    /// Lock age beyond which an acquirer warns about a long-held lock.
    live_owner_warn_ms: u64,
    /// Sleep between acquisition attempts while the lock is held.
    poll_interval_ms: u64,
}

impl LockConfig {
    /// Multiplier applied to the same-host stale window for foreign hosts.
    const CROSS_HOST_STALE_FACTOR: u64 = 5;

    /// Heartbeat age after which a lock owned by a different host is
    /// reclaimed. Saturates at `u64::MAX` instead of overflowing.
    fn cross_host_stale_heartbeat_ms(self) -> u64 {
        let same_host_window = self.stale_heartbeat_ms;
        same_host_window.saturating_mul(Self::CROSS_HOST_STALE_FACTOR)
    }
}
34
35impl Default for LockConfig {
36    fn default() -> Self {
37        Self {
38            heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
39            stale_heartbeat_ms: STALE_HEARTBEAT_MS,
40            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
41            poll_interval_ms: POLL_INTERVAL_MS,
42        }
43    }
44}
45
/// Contents of the lock file, stored on disk as a single JSON object.
///
/// `pid`, `hostname`, and `created_at_ms` together identify the owner; only
/// `heartbeat_at_ms` is rewritten while the lock is held.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
struct LockMetadata {
    /// Process id of the lock holder.
    pid: u32,
    /// Host name of the machine the holder runs on.
    hostname: String,
    /// Wall-clock time (ms since the Unix epoch) at which the lock was created.
    created_at_ms: u64,
    /// Wall-clock time (ms since the Unix epoch) of the most recent heartbeat.
    heartbeat_at_ms: u64,
}
53
54/// Acquire a filesystem lock at `path`. Blocks until the lock is held.
55///
56/// The returned guard owns a background heartbeat thread; dropping it releases
57/// the lock and removes the lock file.
58pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
59    acquire_with_config(path, None, LockConfig::default())
60}
61
62/// Try to acquire a filesystem lock at `path` within `timeout`.
63pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
64    acquire_with_config(path, Some(timeout), LockConfig::default())
65}
66
67pub struct LockGuard {
68    path: PathBuf,
69    metadata: LockMetadata,
70    shutdown: Arc<AtomicBool>,
71    heartbeat_done: mpsc::Receiver<()>,
72    heartbeat: Option<JoinHandle<()>>,
73}
74
75impl Drop for LockGuard {
76    fn drop(&mut self) {
77        // Signal shutdown then unconditionally join the heartbeat thread
78        // BEFORE removing the lockfile. The earlier `recv_timeout(100ms)`
79        // implementation could let `remove_lock_if_owned` race with a
80        // still-alive heartbeat:
81        //
82        //   1. Drop signals shutdown, ack times out under CI load.
83        //   2. Drop calls `remove_lock_if_owned` → file removed.
84        //   3. Another caller acquires the lock → writes its metadata.
85        //   4. Our heartbeat (still alive, mid-`atomic_write_lock_metadata`
86        //      from before shutdown was checked) overwrites the new
87        //      owner's file with our stale metadata. heartbeat_once's
88        //      ownership check happens BEFORE the write, so it can race
89        //      with a concurrent acquire that flips ownership in between.
90        //   5. The new owner's heartbeat sees foreign metadata, exits
91        //      `NotOwner`. The new owner's drop sees foreign metadata,
92        //      `remove_lock_if_owned` returns `Ok(false)`, file persists.
93        //
94        // Always-joining bounds drop latency to one `park_timeout`
95        // iteration (~25ms) plus the current `heartbeat_once` IO —
96        // typically <500ms under CI load. The unused `heartbeat_done`
97        // channel is kept for backward compatibility with any external
98        // code that may still construct LockGuard manually, but Drop no
99        // longer relies on it.
100        self.shutdown.store(true, Ordering::Release);
101        if let Some(handle) = self.heartbeat.take() {
102            handle.thread().unpark();
103            let _ = handle.join();
104        }
105        // Drain any pending ack so the receiver doesn't carry stale state
106        // if this LockGuard is somehow re-used (it isn't today, but be
107        // defensive).
108        while self.heartbeat_done.try_recv().is_ok() {}
109
110        match remove_lock_if_owned(&self.path, &self.metadata) {
111            Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
112            Ok(false) => {}
113            Err(error) => slog_warn!(
114                "failed to release filesystem lock at {}: {}",
115                self.path.display(),
116                error
117            ),
118        }
119    }
120}
121
/// Failure modes of lock acquisition.
#[derive(Debug)]
pub enum AcquireError {
    /// An I/O operation on the lock file failed.
    Io(io::Error),
    /// The caller's timeout elapsed before the lock could be taken.
    Timeout,
}

impl fmt::Display for AcquireError {
    /// Renders a human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Timeout => f.write_str("timed out acquiring filesystem lock"),
            Self::Io(error) => write!(f, "filesystem lock I/O error: {error}"),
        }
    }
}

impl std::error::Error for AcquireError {}

impl From<io::Error> for AcquireError {
    /// Wraps an I/O error so that `?` can be used on `io::Result` values.
    fn from(error: io::Error) -> Self {
        Self::Io(error)
    }
}
144
/// Core acquisition loop shared by `acquire` and `try_acquire`.
///
/// Repeatedly tries to create the lock file. When the file already exists,
/// its metadata decides what happens next:
///
/// * unreadable JSON that is still unreadable one poll interval later is
///   removed and creation is retried;
/// * a lock owned by a different host is removed once its heartbeat is older
///   than `config.cross_host_stale_heartbeat_ms()`;
/// * a lock owned by a dead PID on this host is removed;
/// * a lock owned by a live PID on this host is never removed; the loop only
///   logs (at most once per condition) and keeps polling.
///
/// With `timeout == None` the loop runs until the lock is acquired or an I/O
/// error occurs. Otherwise it returns `AcquireError::Timeout` once the
/// deadline has passed.
fn acquire_with_config(
    path: &Path,
    timeout: Option<Duration>,
    config: LockConfig,
) -> Result<LockGuard, AcquireError> {
    let deadline = timeout.map(|timeout| Instant::now() + timeout);
    let hostname = current_hostname();
    // Each warning below is emitted at most once per call.
    let mut warned_live_owner = false;
    let mut warned_stale_live_owner = false;

    loop {
        if let Some(deadline) = deadline {
            if Instant::now() >= deadline {
                return Err(AcquireError::Timeout);
            }
        }

        // Fast path: exclusive creation succeeds when nobody holds the lock.
        match create_new_lock(path, &hostname, config) {
            Ok(guard) => return Ok(guard),
            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
            Err(error) => return Err(error.into()),
        }

        let metadata = match read_lock_metadata(path) {
            Ok(metadata) => metadata,
            // The holder released the lock between our create and read; retry
            // immediately.
            Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
            Err(ReadLockError::Io(error)) => return Err(error.into()),
            Err(ReadLockError::Malformed(error)) => {
                // A just-created O_EXCL file is visible before its owner has
                // finished writing JSON. Give that transient creation window
                // one poll interval before treating malformed contents as stale.
                sleep_until_retry(deadline, config.poll_interval_ms)?;
                match read_lock_metadata(path) {
                    Ok(_) => continue,
                    Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
                        continue;
                    }
                    Err(ReadLockError::Io(error)) => return Err(error.into()),
                    // Still malformed after the grace period: fall through
                    // and remove the file.
                    Err(ReadLockError::Malformed(_)) => {}
                }
                slog_warn!(
                    "removing malformed filesystem lock at {}: {}",
                    path.display(),
                    error
                );
                remove_lock_file(path)?;
                continue;
            }
        };

        let now = now_ms();
        let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);

        // Foreign host: PID liveness cannot be checked from here, so the
        // heartbeat age is the only signal available.
        if metadata.hostname != hostname {
            let cross_host_stale_ms = config.cross_host_stale_heartbeat_ms();
            if since_heartbeat > cross_host_stale_ms {
                slog_warn!(
                    "reclaiming cross-host filesystem lock at {} from host {} after stale heartbeat ({}ms > {}ms)",
                    path.display(),
                    metadata.hostname,
                    since_heartbeat,
                    cross_host_stale_ms
                );
                remove_lock_file(path)?;
                continue;
            }
            sleep_until_retry(deadline, config.poll_interval_ms)?;
            continue;
        }

        // Same host: a dead owner can never release the lock itself.
        if !process_alive(metadata.pid) {
            slog_warn!(
                "removing filesystem lock at {} from dead PID {}",
                path.display(),
                metadata.pid
            );
            remove_lock_file(path)?;
            continue;
        }

        if since_heartbeat > config.stale_heartbeat_ms && !warned_stale_live_owner {
            // Same-host PID liveness is authoritative. A SIGSTOP'd process,
            // suspended VM, or sleeping laptop can miss heartbeats and later
            // resume inside the critical section. Breaking that lock would allow
            // split-brain writers, so a paused live owner blocks acquirers until
            // it resumes and releases the lock or the PID dies.
            slog_warn!(
                "filesystem lock at {} held by live PID {} has stale heartbeat ({}ms); NOT breaking",
                path.display(),
                metadata.pid,
                since_heartbeat
            );
            warned_stale_live_owner = true;
        }

        // Long-held locks are only reported, never broken.
        let held_for = now.saturating_sub(metadata.created_at_ms);
        if held_for > config.live_owner_warn_ms && !warned_live_owner {
            slog_warn!(
                "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
                path.display(),
                metadata.pid
            );
            warned_live_owner = true;
        }

        sleep_until_retry(deadline, config.poll_interval_ms)?;
    }
}
253
254fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
255    let now = now_ms();
256    let metadata = LockMetadata {
257        pid: std::process::id(),
258        hostname: hostname.to_string(),
259        created_at_ms: now,
260        heartbeat_at_ms: now,
261    };
262
263    create_lock_file_atomically(path, &metadata)?;
264
265    let shutdown = Arc::new(AtomicBool::new(false));
266    let (done_tx, done_rx) = mpsc::channel();
267    let heartbeat_path = path.to_path_buf();
268    let heartbeat_metadata = metadata.clone();
269    let heartbeat_shutdown = Arc::clone(&shutdown);
270    let heartbeat = thread::Builder::new()
271        .name("aft-fs-lock-heartbeat".to_string())
272        .spawn(move || {
273            run_heartbeat(
274                heartbeat_path,
275                heartbeat_metadata,
276                heartbeat_shutdown,
277                config,
278            );
279            let _ = done_tx.send(());
280        })?;
281
282    slog_info!("acquired filesystem lock at {}", path.display());
283
284    Ok(LockGuard {
285        path: path.to_path_buf(),
286        metadata,
287        shutdown,
288        heartbeat_done: done_rx,
289        heartbeat: Some(heartbeat),
290    })
291}
292
293fn run_heartbeat(
294    path: PathBuf,
295    owner: LockMetadata,
296    shutdown: Arc<AtomicBool>,
297    config: LockConfig,
298) {
299    // Number of consecutive heartbeat intervals that can be missed before the
300    // same-host stale window elapses and another process may reclaim the lock.
301    // Beyond this point a sustained failure is genuinely dangerous, so we
302    // escalate the log from warn to error — but we still keep retrying.
303    let stale_intervals = config
304        .stale_heartbeat_ms
305        .checked_div(config.heartbeat_interval_ms.max(1))
306        .unwrap_or(3)
307        .max(1);
308    let mut consecutive_transient_failures: u64 = 0;
309
310    loop {
311        thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
312        if shutdown.load(Ordering::Acquire) {
313            return;
314        }
315
316        match heartbeat_once(&path, &owner) {
317            Ok(()) => {
318                if consecutive_transient_failures > 0 {
319                    slog_info!(
320                        "filesystem lock at {} heartbeat recovered after {} transient failure(s)",
321                        path.display(),
322                        consecutive_transient_failures
323                    );
324                    consecutive_transient_failures = 0;
325                }
326            }
327            Err(error) if heartbeat_error_is_terminal(&error) => {
328                // Terminal states: the lock is provably gone or owned by
329                // someone else. Continuing to write would clobber a new owner's
330                // metadata (the exact race documented in LockGuard::drop), so
331                // stop heartbeating.
332                slog_error!(
333                    "{}; stopping heartbeat",
334                    terminal_heartbeat_message(&path, &error)
335                );
336                return;
337            }
338            Err(error) => {
339                // Transient states: a temporary I/O hiccup (disk/NFS blip,
340                // quota) or a read that raced a concurrent writer mid-write
341                // (momentarily unparseable file). A single such error must NOT
342                // permanently kill the heartbeat — that would silently stop
343                // refreshing heartbeat_at_ms while the guard holder keeps
344                // running its critical section, letting another process reclaim
345                // the lock after the stale window and produce concurrent
346                // writers. Log and retry on the next interval; a later success
347                // resumes heartbeating automatically.
348                consecutive_transient_failures += 1;
349                log_transient_heartbeat_failure(
350                    &path,
351                    &transient_heartbeat_reason(&error),
352                    consecutive_transient_failures,
353                    stale_intervals,
354                );
355            }
356        }
357    }
358}
359
360/// A heartbeat failure is terminal when the lock is provably no longer ours to
361/// refresh: it was removed (`LockGone`) or a different owner now holds it
362/// (`NotOwner`). I/O and malformed-read failures are treated as transient —
363/// they are typically temporary disk/NFS hiccups or a read that raced a
364/// concurrent writer — so the heartbeat retries rather than dying.
365fn heartbeat_error_is_terminal(error: &HeartbeatError) -> bool {
366    matches!(error, HeartbeatError::LockGone | HeartbeatError::NotOwner)
367}
368
369fn terminal_heartbeat_message(path: &Path, error: &HeartbeatError) -> String {
370    match error {
371        HeartbeatError::LockGone => {
372            format!("filesystem lock at {} disappeared", path.display())
373        }
374        HeartbeatError::NotOwner => format!(
375            "filesystem lock at {} is no longer owned by this guard",
376            path.display()
377        ),
378        // Not reachable for non-terminal errors, but keep a sensible string.
379        HeartbeatError::Io(error) => {
380            format!("filesystem lock at {} I/O error: {error}", path.display())
381        }
382        HeartbeatError::Malformed(error) => {
383            format!(
384                "filesystem lock at {} became malformed: {error}",
385                path.display()
386            )
387        }
388    }
389}
390
391fn transient_heartbeat_reason(error: &HeartbeatError) -> String {
392    match error {
393        HeartbeatError::Io(error) => format!("I/O error: {error}"),
394        HeartbeatError::Malformed(error) => format!("became malformed: {error}"),
395        HeartbeatError::LockGone => "lock disappeared".to_string(),
396        HeartbeatError::NotOwner => "lock no longer owned".to_string(),
397    }
398}
399
400/// Log a transient heartbeat failure, escalating to error exactly once when the
401/// failures have lasted long enough that the lock is now reclaimable by another
402/// owner. Beyond that point we stay quiet to avoid log spam while still
403/// retrying — the holder has already been warned the lock is at risk.
404fn log_transient_heartbeat_failure(
405    path: &Path,
406    reason: &str,
407    consecutive_failures: u64,
408    stale_intervals: u64,
409) {
410    if consecutive_failures < stale_intervals {
411        slog_warn!(
412            "transient failure to heartbeat filesystem lock at {}: {}; retrying (attempt {})",
413            path.display(),
414            reason,
415            consecutive_failures
416        );
417    } else if consecutive_failures == stale_intervals {
418        slog_error!(
419            "filesystem lock at {} has failed {} consecutive heartbeats: {}; \
420             the lock may now be reclaimed by another owner — continuing to retry",
421            path.display(),
422            consecutive_failures,
423            reason
424        );
425    }
426}
427
428fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
429    let mut metadata = match read_lock_metadata(path) {
430        Ok(metadata) => metadata,
431        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
432            return Err(HeartbeatError::LockGone);
433        }
434        Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
435        Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
436    };
437
438    if metadata.pid != owner.pid
439        || metadata.hostname != owner.hostname
440        || metadata.created_at_ms != owner.created_at_ms
441    {
442        return Err(HeartbeatError::NotOwner);
443    }
444
445    metadata.heartbeat_at_ms = now_ms();
446    atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
447}
448
/// Ways a single heartbeat attempt can fail.
#[derive(Debug)]
enum HeartbeatError {
    /// Reading or rewriting the lock file failed with an I/O error other than
    /// "not found" on read.
    Io(io::Error),
    /// The lock file no longer exists.
    LockGone,
    /// The lock file exists but its contents could not be parsed.
    Malformed(serde_json::Error),
    /// The lock file's pid, hostname, or creation time no longer match ours.
    NotOwner,
}
456
/// Ways reading the lock file can fail.
#[derive(Debug)]
enum ReadLockError {
    /// The file could not be read (including when it does not exist).
    Io(io::Error),
    /// The file was read but is not valid lock metadata JSON.
    Malformed(serde_json::Error),
}
462
463fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
464    let bytes = fs::read(path).map_err(ReadLockError::Io)?;
465    serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
466}
467
/// Creates `path` for writing, failing if it already exists.
///
/// On Unix the file is requested with mode `0o644`.
#[cfg(unix)]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = OpenOptions::new();
    options.write(true).create_new(true).mode(0o644);
    options.open(path)
}
478
/// Creates `path` for writing, failing if it already exists.
#[cfg(not(unix))]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.write(true).create_new(true);
    options.open(path)
}
483
484fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
485    serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
486    file.write_all(b"\n")?;
487    file.sync_all()
488}
489
490fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
491    let tmp_path = temp_path_for_lock(path);
492    let result = (|| {
493        let mut file = open_new_lock_file(&tmp_path)?;
494        write_lock_metadata_to_file(&mut file, metadata)?;
495        drop(file);
496
497        fs::hard_link(&tmp_path, path)?;
498        sync_parent(path);
499        Ok(())
500    })();
501
502    let _ = fs::remove_file(&tmp_path);
503    result
504}
505
506fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
507    let tmp_path = temp_path_for_lock(path);
508    let write_result = (|| {
509        let mut file = OpenOptions::new()
510            .write(true)
511            .create_new(true)
512            .open(&tmp_path)?;
513        write_lock_metadata_to_file(&mut file, metadata)?;
514        drop(file);
515
516        rename_over(&tmp_path, path)?;
517        sync_parent(path);
518        Ok(())
519    })();
520
521    if write_result.is_err() {
522        let _ = fs::remove_file(&tmp_path);
523    }
524
525    write_result
526}
527
/// Moves `from` onto `to`, replacing `to` if it exists.
///
/// A plain `fs::rename` is attempted first, because it replaces an existing
/// destination without the destination ever being absent. Deleting `to` up
/// front would leave a window on every heartbeat in which the lock file does
/// not exist and another acquirer's exclusive create could succeed. The
/// remove-then-rename sequence is kept only as a fallback for when the direct
/// rename fails.
#[cfg(windows)]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    if fs::rename(from, to).is_ok() {
        return Ok(());
    }

    // Fallback: best-effort removal of the destination, then retry.
    let _ = fs::remove_file(to);
    fs::rename(from, to)
}
533
/// Moves `from` onto `to` with a single `fs::rename` call.
#[cfg(not(windows))]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    fs::rename(from, to)
}
538
// Process-wide counter that disambiguates temp lockfile paths for callers
// inside the same process. `now_nanos()` alone is not unique enough on
// Windows when two threads race to acquire the same lock (caught by the
// `acquire_serializes_concurrent_callers` test): two threads sampling the
// nanosecond clock within the same scheduler quantum produce identical
// timestamps and therefore the same temp file name. One thread's
// `fs::remove_file(&tmp_path)` cleanup then deletes the file before the
// other thread's `fs::hard_link(&tmp_path, ...)` runs, and the loser fails
// with `Io(Os { code: 2, NotFound })`.
//
// A single `AtomicU64` shared by all threads makes every temp path unique
// within the process regardless of clock resolution or scheduling races.
static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
552
553fn temp_path_for_lock(path: &Path) -> PathBuf {
554    let file_name = path
555        .file_name()
556        .and_then(|name| name.to_str())
557        .unwrap_or("lock");
558    let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
559    path.with_file_name(format!(
560        ".{file_name}.tmp.{}.{}.{}",
561        std::process::id(),
562        now_nanos(),
563        seq
564    ))
565}
566
567fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
568    let metadata = match read_lock_metadata(path) {
569        Ok(metadata) => metadata,
570        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
571            return Ok(false);
572        }
573        Err(ReadLockError::Io(error)) => return Err(error),
574        Err(ReadLockError::Malformed(_)) => return Ok(false),
575    };
576
577    if metadata.pid == owner.pid
578        && metadata.hostname == owner.hostname
579        && metadata.created_at_ms == owner.created_at_ms
580    {
581        remove_lock_file(path)?;
582        Ok(true)
583    } else {
584        Ok(false)
585    }
586}
587
/// Removes the file at `path`, treating "already gone" as success.
fn remove_lock_file(path: &Path) -> io::Result<()> {
    fs::remove_file(path).or_else(|error| {
        if error.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(error)
        }
    })
}
595
596fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
597    let poll = Duration::from_millis(poll_interval_ms);
598    let sleep_for = match deadline {
599        Some(deadline) => {
600            let now = Instant::now();
601            if now >= deadline {
602                return Err(AcquireError::Timeout);
603            }
604            poll.min(deadline.saturating_duration_since(now))
605        }
606        None => poll,
607    };
608    thread::sleep(sleep_for);
609    Ok(())
610}
611
/// Best-effort sync of the directory containing `path`.
///
/// Does nothing when `path` has no parent or the parent cannot be opened;
/// sync errors are ignored.
fn sync_parent(path: &Path) {
    let Some(parent) = path.parent() else {
        return;
    };
    if let Ok(dir) = File::open(parent) {
        let _ = dir.sync_all();
    }
}
619
/// Current wall-clock time in milliseconds since the Unix epoch, or 0 when
/// the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
626
/// Current wall-clock time in nanoseconds since the Unix epoch, or 0 when the
/// system clock reads earlier than the epoch.
fn now_nanos() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    }
}
633
634#[cfg(unix)]
635fn current_hostname() -> String {
636    let mut buffer = [0u8; 256];
637    let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
638    if result == 0 {
639        let len = buffer
640            .iter()
641            .position(|byte| *byte == 0)
642            .unwrap_or(buffer.len());
643        if len > 0 {
644            return String::from_utf8_lossy(&buffer[..len]).into_owned();
645        }
646    }
647
648    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
649}
650
/// Returns this machine's host name from `COMPUTERNAME`, then `HOSTNAME`,
/// falling back to `"unknown-host"` when neither variable is usable.
#[cfg(windows)]
fn current_hostname() -> String {
    for key in ["COMPUTERNAME", "HOSTNAME"] {
        if let Ok(value) = std::env::var(key) {
            return value;
        }
    }
    "unknown-host".to_string()
}
657
/// Returns the `HOSTNAME` environment variable, or `"unknown-host"` when it
/// is unset or not valid Unicode.
#[cfg(not(any(unix, windows)))]
fn current_hostname() -> String {
    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
}
662
663#[cfg(unix)]
664fn process_alive(pid: u32) -> bool {
665    if pid == 0 || pid > i32::MAX as u32 {
666        return false;
667    }
668
669    let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
670    if result == 0 {
671        return true;
672    }
673
674    io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
675}
676
/// Reports whether a process with id `pid` exists on this host, by asking
/// `tasklist`.
///
/// If `tasklist` cannot be run or exits unsuccessfully, the process is
/// assumed to be alive.
#[cfg(windows)]
fn process_alive(pid: u32) -> bool {
    let filter = format!("PID eq {pid}");
    let mut command = std::process::Command::new("tasklist");
    command.args(["/FI", &filter, "/FO", "CSV", "/NH"]);

    let output = match command.output() {
        Ok(output) if output.status.success() => output,
        _ => return true,
    };
    let stdout = String::from_utf8_lossy(&output.stdout);

    // `tasklist /NH /FO CSV` emits one line per matching process with every
    // field quoted, e.g. `"image","7420","Console","1","12,345 K"`. When the
    // filter matches nothing it writes the literal text
    // `INFO: No tasks are running which match the specified criteria.`
    // to stdout instead.
    //
    // An earlier matcher looked for `","{pid}",` mid-line. That was too
    // strict: it missed runners whose output leaves the pid token at the end
    // of a line (`"7420"\r\n`). The check used now is that the "no tasks"
    // sentinel is absent AND the quoted PID appears somewhere.
    let quoted_pid = format!("\"{pid}\"");
    !stdout.contains("No tasks are running") && stdout.contains(&quoted_pid)
}
708
/// Always reports the process as alive: this target has no liveness probe,
/// so `pid` is ignored.
#[cfg(not(any(unix, windows)))]
fn process_alive(_pid: u32) -> bool {
    true
}
713
714#[cfg(test)]
715mod tests {
716    use super::*;
717    use std::sync::atomic::{AtomicUsize, Ordering};
718    use std::sync::{Arc, Barrier};
719
720    fn test_config() -> LockConfig {
721        LockConfig {
722            heartbeat_interval_ms: 25,
723            stale_heartbeat_ms: 2_000,
724            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
725            poll_interval_ms: 10,
726        }
727    }
728
729    fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
730        let dir = tempfile::tempdir().expect("create temp dir");
731        let path = dir.path().join("test.lock");
732        (dir, path)
733    }
734
735    fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
736        let mut file = open_new_lock_file(path).expect("create synthetic lock");
737        write_lock_metadata_to_file(&mut file, metadata).expect("write synthetic lock");
738    }
739
740    fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
741        LockMetadata {
742            pid,
743            hostname,
744            created_at_ms,
745            heartbeat_at_ms: created_at_ms,
746        }
747    }
748
749    fn current_process_metadata() -> LockMetadata {
750        let now = now_ms();
751        synthetic_metadata(std::process::id(), current_hostname(), now)
752    }
753
754    #[test]
755    fn acquire_creates_lockfile_and_unlocks_on_drop() {
756        let (_dir, path) = test_lock_path();
757
758        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
759        let metadata = read_lock_metadata(&path).expect("read lock metadata");
760        assert_eq!(metadata.pid, std::process::id());
761        assert_eq!(metadata.hostname, current_hostname());
762        assert_eq!(metadata.created_at_ms, guard.metadata.created_at_ms);
763
764        drop(guard);
765        assert!(!path.exists());
766    }
767
768    #[test]
769    fn acquire_serializes_concurrent_callers() {
770        let (_dir, path) = test_lock_path();
771        let path = Arc::new(path);
772        let barrier = Arc::new(Barrier::new(3));
773        let inside = Arc::new(AtomicUsize::new(0));
774        let entered = Arc::new(AtomicUsize::new(0));
775        let max_inside = Arc::new(AtomicUsize::new(0));
776
777        let mut handles = Vec::new();
778        for _ in 0..2 {
779            let path = Arc::clone(&path);
780            let barrier = Arc::clone(&barrier);
781            let inside = Arc::clone(&inside);
782            let entered = Arc::clone(&entered);
783            let max_inside = Arc::clone(&max_inside);
784            handles.push(thread::spawn(move || {
785                barrier.wait();
786                let guard = acquire_with_config(&path, Some(Duration::from_secs(2)), test_config())
787                    .expect("thread acquire lock");
788                let previous = inside.fetch_add(1, Ordering::SeqCst);
789                assert_eq!(previous, 0, "two lock holders overlapped");
790                entered.fetch_add(1, Ordering::SeqCst);
791                max_inside.fetch_max(previous + 1, Ordering::SeqCst);
792                thread::sleep(Duration::from_millis(75));
793                inside.fetch_sub(1, Ordering::SeqCst);
794                drop(guard);
795            }));
796        }
797
798        barrier.wait();
799        for handle in handles {
800            handle.join().expect("join worker");
801        }
802
803        assert_eq!(entered.load(Ordering::SeqCst), 2);
804        assert_eq!(max_inside.load(Ordering::SeqCst), 1);
805        assert!(!path.exists());
806    }
807
808    #[test]
809    fn heartbeat_updates_lockfile_timestamp() {
810        let (_dir, path) = test_lock_path();
811        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
812        let initial = read_lock_metadata(&path)
813            .expect("read initial metadata")
814            .heartbeat_at_ms;
815
816        // Poll for up to 2s rather than sleeping a fixed multiple of the
817        // heartbeat interval. `park_timeout` is a *maximum* wait, not a
818        // guaranteed periodic timer — under load (shared macOS CI runners
819        // running other cargo-test threads concurrently) the heartbeat
820        // thread may not fire 3 times within 75ms even though
821        // heartbeat_interval_ms=25. The contract being asserted is "the
822        // heartbeat advances eventually", not "it advances within N
823        // heartbeat intervals".
824        //
825        // On Windows, `rename_over` does `remove_file(to)` then
826        // `fs::rename(from, to)` because Windows can't atomically replace
827        // an open file. There's a brief window where the lockfile doesn't
828        // exist. If the poller hits that window, `read_lock_metadata`
829        // returns `Io(NotFound)`. Production callers already handle this
830        // (see `remove_lock_if_owned`), so the test treats `NotFound` the
831        // same as "no update yet" and keeps polling.
832        let deadline = std::time::Instant::now() + Duration::from_millis(2_000);
833        let mut updated = initial;
834        while std::time::Instant::now() < deadline {
835            thread::sleep(Duration::from_millis(50));
836            match read_lock_metadata(&path) {
837                Ok(meta) => {
838                    updated = meta.heartbeat_at_ms;
839                    if updated > initial {
840                        break;
841                    }
842                }
843                Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
844                    // Heartbeat thread is mid-rewrite (Windows
845                    // remove-then-rename window). Retry next iteration.
846                    continue;
847                }
848                Err(other) => panic!("read updated metadata: {other:?}"),
849            }
850        }
851        assert!(
852            updated > initial,
853            "heartbeat timestamp did not advance within 2s"
854        );
855        drop(guard);
856    }
857
858    #[test]
859    fn dead_pid_lock_is_reclaimed() {
860        let (_dir, path) = test_lock_path();
861        let metadata = synthetic_metadata(999_999_999, current_hostname(), now_ms());
862        write_synthetic_lock(&path, &metadata);
863
864        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
865            .expect("reclaim dead pid lock");
866        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
867        assert_eq!(metadata.pid, std::process::id());
868        drop(guard);
869    }
870
871    #[test]
872    fn stale_heartbeat_from_live_pid_blocks() {
873        let (_dir, path) = test_lock_path();
874        let mut metadata = current_process_metadata();
875        metadata.created_at_ms = now_ms().saturating_sub(60_000);
876        metadata.heartbeat_at_ms = now_ms().saturating_sub(60_000);
877        write_synthetic_lock(&path, &metadata);
878
879        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
880        assert!(matches!(result, Err(AcquireError::Timeout)));
881        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);
882
883        remove_lock_file(&path).expect("cleanup synthetic lock");
884    }
885
886    #[test]
887    fn healthy_live_owner_blocks() {
888        let (_dir, path) = test_lock_path();
889        let metadata = current_process_metadata();
890        write_synthetic_lock(&path, &metadata);
891
892        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
893        assert!(matches!(result, Err(AcquireError::Timeout)));
894
895        remove_lock_file(&path).expect("cleanup synthetic lock");
896    }
897
898    #[test]
899    fn malformed_lockfile_is_reclaimed() {
900        let (_dir, path) = test_lock_path();
901        fs::write(&path, b"not valid json").expect("write malformed lock");
902
903        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
904            .expect("reclaim malformed lock");
905        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
906        assert_eq!(metadata.pid, std::process::id());
907        drop(guard);
908    }
909
910    #[test]
911    fn cross_host_lock_is_not_stolen_before_extended_stale_threshold() {
912        let (_dir, path) = test_lock_path();
913        let now = now_ms();
914        let metadata = LockMetadata {
915            pid: std::process::id(),
916            hostname: format!("{}-other", current_hostname()),
917            created_at_ms: now,
918            heartbeat_at_ms: now,
919        };
920        write_synthetic_lock(&path, &metadata);
921
922        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
923        assert!(matches!(result, Err(AcquireError::Timeout)));
924        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);
925
926        remove_lock_file(&path).expect("cleanup synthetic lock");
927    }
928
929    #[test]
930    fn stale_cross_host_lock_is_reclaimed_after_extended_threshold() {
931        let (_dir, path) = test_lock_path();
932        let stale_at =
933            now_ms().saturating_sub(test_config().cross_host_stale_heartbeat_ms() + 1_000);
934        let metadata = LockMetadata {
935            pid: std::process::id(),
936            hostname: format!("{}-other", current_hostname()),
937            created_at_ms: stale_at,
938            heartbeat_at_ms: stale_at,
939        };
940        write_synthetic_lock(&path, &metadata);
941
942        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
943            .expect("reclaim stale cross-host lock");
944        let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
945        assert_eq!(reclaimed.hostname, current_hostname());
946        assert_ne!(reclaimed.created_at_ms, metadata.created_at_ms);
947        drop(guard);
948    }
949
950    #[test]
951    fn live_owner_over_10min_warns_but_blocks() {
952        let (_dir, path) = test_lock_path();
953        let mut metadata = current_process_metadata();
954        metadata.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
955        metadata.heartbeat_at_ms = now_ms();
956        write_synthetic_lock(&path, &metadata);
957
958        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
959        assert!(matches!(result, Err(AcquireError::Timeout)));
960        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);
961
962        remove_lock_file(&path).expect("cleanup synthetic lock");
963    }
964
965    #[test]
966    fn drop_stops_heartbeat_thread() {
967        let (_dir, path) = test_lock_path();
968        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
969        drop(guard);
970
971        thread::sleep(Duration::from_millis(
972            test_config().heartbeat_interval_ms * 3,
973        ));
974        assert!(
975            !path.exists(),
976            "heartbeat recreated or kept updating lockfile"
977        );
978    }
979
980    #[test]
981    fn heartbeat_error_classification_terminal_vs_transient() {
982        // Terminal: the lock is provably no longer ours to refresh.
983        assert!(heartbeat_error_is_terminal(&HeartbeatError::LockGone));
984        assert!(heartbeat_error_is_terminal(&HeartbeatError::NotOwner));
985        // Transient: a temporary I/O hiccup or a read that raced a concurrent
986        // writer. These must NOT kill the heartbeat — it retries instead.
987        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Io(
988            io::Error::other("disk blip")
989        )));
990        let malformed: serde_json::Error =
991            serde_json::from_str::<LockMetadata>("not json").unwrap_err();
992        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Malformed(
993            malformed
994        )));
995    }
996
997    #[test]
998    fn heartbeat_survives_transient_malformed_and_recovers() {
999        // Regression: a single transient failure (e.g. a read that races a
1000        // concurrent writer and sees a momentarily-unparseable file) used to
1001        // permanently kill the heartbeat thread. The guard holder would then
1002        // run its critical section with a stale heartbeat_at_ms, letting
1003        // another process reclaim the lock after the stale window — concurrent
1004        // writers / split-brain. The heartbeat must instead retry and resume
1005        // refreshing once the file is readable again.
1006        let (_dir, path) = test_lock_path();
1007        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
1008        let owner = guard.metadata.clone();
1009
1010        // Corrupt the lockfile out from under the heartbeat (simulates a
1011        // concurrent-writer race producing a momentarily-unparseable read).
1012        // The heartbeat reads-then-writes, so it observes Malformed and, with
1013        // the fix, retries instead of dying.
1014        fs::write(&path, b"{ not valid json").expect("corrupt lockfile");
1015
1016        // Give the heartbeat several intervals to observe the malformed file.
1017        // Pre-fix, the thread is dead by now.
1018        thread::sleep(Duration::from_millis(
1019            test_config().heartbeat_interval_ms * 4,
1020        ));
1021
1022        // Restore valid owner metadata with a clearly-stale heartbeat sentinel.
1023        // Ownership fields must match `owner` exactly so heartbeat_once passes
1024        // its ownership check and writes a fresh timestamp.
1025        //
1026        // Use the atomic temp-write+rename path rather than remove-then-recreate:
1027        // a remove followed by a separate create leaves a window where the file
1028        // does not exist, and a heartbeat poll landing in that window reads
1029        // NotFound -> LockGone (terminal) and kills the thread, failing this test
1030        // spuriously under runner load (observed on macOS CI). The atomic replace
1031        // overwrites the corrupt file in place with no no-file window on Unix.
1032        let sentinel = now_ms().saturating_sub(1_000_000);
1033        let mut restored = owner.clone();
1034        restored.heartbeat_at_ms = sentinel;
1035        atomic_write_lock_metadata(&path, &restored).expect("atomically restore lock metadata");
1036
1037        // If the heartbeat thread is still alive (the fix), it will overwrite
1038        // heartbeat_at_ms with a current value. Poll for that recovery.
1039        let deadline = std::time::Instant::now() + Duration::from_millis(3_000);
1040        let mut recovered = false;
1041        while std::time::Instant::now() < deadline {
1042            thread::sleep(Duration::from_millis(25));
1043            match read_lock_metadata(&path) {
1044                Ok(meta)
1045                    if meta.created_at_ms == owner.created_at_ms
1046                        && meta.heartbeat_at_ms > sentinel =>
1047                {
1048                    recovered = true;
1049                    break;
1050                }
1051                _ => continue,
1052            }
1053        }
1054        assert!(
1055            recovered,
1056            "heartbeat did not recover after a transient malformed read — thread likely died"
1057        );
1058        drop(guard);
1059    }
1060}