Skip to main content

aft/
fs_lock.rs

1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6    atomic::{AtomicBool, AtomicU64, Ordering},
7    mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
/// Default interval between heartbeat refreshes of a held lock, in milliseconds.
pub const HEARTBEAT_INTERVAL_MS: u64 = 5 * 1_000;
/// Default age after which a same-host holder's heartbeat counts as stale and
/// the lock may be reclaimed, in milliseconds.
pub const STALE_HEARTBEAT_MS: u64 = 15 * 1_000;
/// Default hold time after which a waiter logs (once) that a live,
/// heartbeating owner still has the lock, in milliseconds (10 minutes).
pub const LIVE_OWNER_WARN_MS: u64 = 10 * 60 * 1_000;
/// Default delay between acquisition attempts, in milliseconds.
pub const POLL_INTERVAL_MS: u64 = 100;

/// Timing knobs for the lock. Production code uses [`LockConfig::default`];
/// tests substitute much shorter intervals.
#[derive(Clone, Copy, Debug)]
struct LockConfig {
    /// Maximum wait between heartbeat writes by the holder.
    heartbeat_interval_ms: u64,
    /// Heartbeat age beyond which a same-host lock is reclaimed.
    stale_heartbeat_ms: u64,
    /// Hold time beyond which a waiter warns about a live owner.
    live_owner_warn_ms: u64,
    /// Sleep between acquisition attempts.
    poll_interval_ms: u64,
}

impl Default for LockConfig {
    /// Builds a configuration from the module-level `*_MS` constants.
    fn default() -> Self {
        LockConfig {
            poll_interval_ms: POLL_INTERVAL_MS,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
            stale_heartbeat_ms: STALE_HEARTBEAT_MS,
            heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
        }
    }
}
39
40#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
41struct LockMetadata {
42    pid: u32,
43    hostname: String,
44    created_at_ms: u64,
45    heartbeat_at_ms: u64,
46}
47
48/// Acquire a filesystem lock at `path`. Blocks until the lock is held.
49///
50/// The returned guard owns a background heartbeat thread; dropping it releases
51/// the lock and removes the lock file.
52pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
53    acquire_with_config(path, None, LockConfig::default())
54}
55
56/// Try to acquire a filesystem lock at `path` within `timeout`.
57pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
58    acquire_with_config(path, Some(timeout), LockConfig::default())
59}
60
61pub struct LockGuard {
62    path: PathBuf,
63    metadata: LockMetadata,
64    shutdown: Arc<AtomicBool>,
65    heartbeat_done: mpsc::Receiver<()>,
66    heartbeat: Option<JoinHandle<()>>,
67}
68
69impl Drop for LockGuard {
70    fn drop(&mut self) {
71        self.shutdown.store(true, Ordering::Release);
72        if let Some(handle) = self.heartbeat.as_ref() {
73            handle.thread().unpark();
74        }
75
76        if self
77            .heartbeat_done
78            .recv_timeout(Duration::from_millis(100))
79            .is_ok()
80        {
81            if let Some(handle) = self.heartbeat.take() {
82                let _ = handle.join();
83            }
84        } else {
85            slog_warn!(
86                "fs lock heartbeat thread for {} did not stop within 100ms",
87                self.path.display()
88            );
89        }
90
91        match remove_lock_if_owned(&self.path, &self.metadata) {
92            Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
93            Ok(false) => {}
94            Err(error) => slog_warn!(
95                "failed to release filesystem lock at {}: {}",
96                self.path.display(),
97                error
98            ),
99        }
100    }
101}
102
/// Why [`acquire`] or [`try_acquire`] failed.
#[derive(Debug)]
pub enum AcquireError {
    /// Unexpected filesystem error while creating, reading or removing the lock file.
    Io(io::Error),
    /// The lock was still held by another owner when the timeout elapsed.
    Timeout,
}

impl fmt::Display for AcquireError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Io(error) => write!(f, "filesystem lock I/O error: {error}"),
            Self::Timeout => f.write_str("timed out acquiring filesystem lock"),
        }
    }
}

/// `source()` is left at its default (`None`): the wrapped `io::Error` already
/// appears in the `Display` text.
impl std::error::Error for AcquireError {}

impl From<io::Error> for AcquireError {
    /// Wraps an I/O error so `?` works on `io::Result` inside acquisition code.
    fn from(error: io::Error) -> Self {
        Self::Io(error)
    }
}
125
126fn acquire_with_config(
127    path: &Path,
128    timeout: Option<Duration>,
129    config: LockConfig,
130) -> Result<LockGuard, AcquireError> {
131    let deadline = timeout.map(|timeout| Instant::now() + timeout);
132    let hostname = current_hostname();
133    let mut warned_live_owner = false;
134
135    loop {
136        if let Some(deadline) = deadline {
137            if Instant::now() >= deadline {
138                return Err(AcquireError::Timeout);
139            }
140        }
141
142        match create_new_lock(path, &hostname, config) {
143            Ok(guard) => return Ok(guard),
144            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
145            Err(error) => return Err(error.into()),
146        }
147
148        let metadata = match read_lock_metadata(path) {
149            Ok(metadata) => metadata,
150            Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
151            Err(ReadLockError::Io(error)) => return Err(error.into()),
152            Err(ReadLockError::Malformed(error)) => {
153                // A just-created O_EXCL file is visible before its owner has
154                // finished writing JSON. Give that transient creation window
155                // one poll interval before treating malformed contents as stale.
156                sleep_until_retry(deadline, config.poll_interval_ms)?;
157                match read_lock_metadata(path) {
158                    Ok(_) => continue,
159                    Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
160                        continue;
161                    }
162                    Err(ReadLockError::Io(error)) => return Err(error.into()),
163                    Err(ReadLockError::Malformed(_)) => {}
164                }
165                slog_warn!(
166                    "removing malformed filesystem lock at {}: {}",
167                    path.display(),
168                    error
169                );
170                remove_lock_file(path)?;
171                continue;
172            }
173        };
174
175        if metadata.hostname != hostname {
176            sleep_until_retry(deadline, config.poll_interval_ms)?;
177            continue;
178        }
179
180        if !process_alive(metadata.pid) {
181            slog_warn!(
182                "removing filesystem lock at {} from dead PID {}",
183                path.display(),
184                metadata.pid
185            );
186            remove_lock_file(path)?;
187            continue;
188        }
189
190        let now = now_ms();
191        let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);
192        if since_heartbeat > config.stale_heartbeat_ms {
193            slog_warn!(
194                "reclaiming filesystem lock at {}: PID {} is alive but heartbeat is stale ({}ms)",
195                path.display(),
196                metadata.pid,
197                since_heartbeat
198            );
199            remove_lock_file(path)?;
200            continue;
201        }
202
203        let held_for = now.saturating_sub(metadata.created_at_ms);
204        if held_for > config.live_owner_warn_ms && !warned_live_owner {
205            slog_warn!(
206                "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
207                path.display(),
208                metadata.pid
209            );
210            warned_live_owner = true;
211        }
212
213        sleep_until_retry(deadline, config.poll_interval_ms)?;
214    }
215}
216
217fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
218    let now = now_ms();
219    let metadata = LockMetadata {
220        pid: std::process::id(),
221        hostname: hostname.to_string(),
222        created_at_ms: now,
223        heartbeat_at_ms: now,
224    };
225
226    create_lock_file_atomically(path, &metadata)?;
227
228    let shutdown = Arc::new(AtomicBool::new(false));
229    let (done_tx, done_rx) = mpsc::channel();
230    let heartbeat_path = path.to_path_buf();
231    let heartbeat_metadata = metadata.clone();
232    let heartbeat_shutdown = Arc::clone(&shutdown);
233    let heartbeat = thread::Builder::new()
234        .name("aft-fs-lock-heartbeat".to_string())
235        .spawn(move || {
236            run_heartbeat(
237                heartbeat_path,
238                heartbeat_metadata,
239                heartbeat_shutdown,
240                config,
241            );
242            let _ = done_tx.send(());
243        })?;
244
245    slog_info!("acquired filesystem lock at {}", path.display());
246
247    Ok(LockGuard {
248        path: path.to_path_buf(),
249        metadata,
250        shutdown,
251        heartbeat_done: done_rx,
252        heartbeat: Some(heartbeat),
253    })
254}
255
256fn run_heartbeat(
257    path: PathBuf,
258    owner: LockMetadata,
259    shutdown: Arc<AtomicBool>,
260    config: LockConfig,
261) {
262    loop {
263        thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
264        if shutdown.load(Ordering::Acquire) {
265            return;
266        }
267
268        match heartbeat_once(&path, &owner) {
269            Ok(()) => {}
270            Err(HeartbeatError::LockGone) => {
271                slog_error!(
272                    "filesystem lock at {} disappeared; stopping heartbeat",
273                    path.display()
274                );
275                return;
276            }
277            Err(HeartbeatError::NotOwner) => {
278                slog_error!(
279                    "filesystem lock at {} is no longer owned by this guard; stopping heartbeat",
280                    path.display()
281                );
282                return;
283            }
284            Err(HeartbeatError::Malformed(error)) => {
285                slog_error!(
286                    "filesystem lock at {} became malformed: {}; stopping heartbeat",
287                    path.display(),
288                    error
289                );
290                return;
291            }
292            Err(HeartbeatError::Io(error)) => {
293                slog_error!(
294                    "failed to heartbeat filesystem lock at {}: {}; stopping heartbeat",
295                    path.display(),
296                    error
297                );
298                return;
299            }
300        }
301    }
302}
303
304fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
305    let mut metadata = match read_lock_metadata(path) {
306        Ok(metadata) => metadata,
307        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
308            return Err(HeartbeatError::LockGone);
309        }
310        Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
311        Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
312    };
313
314    if metadata.pid != owner.pid
315        || metadata.hostname != owner.hostname
316        || metadata.created_at_ms != owner.created_at_ms
317    {
318        return Err(HeartbeatError::NotOwner);
319    }
320
321    metadata.heartbeat_at_ms = now_ms();
322    atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
323}
324
325#[derive(Debug)]
326enum HeartbeatError {
327    Io(io::Error),
328    LockGone,
329    Malformed(serde_json::Error),
330    NotOwner,
331}
332
333#[derive(Debug)]
334enum ReadLockError {
335    Io(io::Error),
336    Malformed(serde_json::Error),
337}
338
339fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
340    let bytes = fs::read(path).map_err(ReadLockError::Io)?;
341    serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
342}
343
/// Creates `path` for writing and fails with `AlreadyExists` if it exists.
///
/// On Unix the file is requested with mode `0o644`, which the process umask
/// may restrict further.
#[cfg(unix)]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = OpenOptions::new();
    options.write(true).create_new(true).mode(0o644);
    options.open(path)
}

/// Creates `path` for writing and fails with `AlreadyExists` if it exists.
#[cfg(not(unix))]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.write(true).create_new(true);
    options.open(path)
}
359
360fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
361    serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
362    file.write_all(b"\n")?;
363    file.sync_all()
364}
365
366fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
367    let tmp_path = temp_path_for_lock(path);
368    let result = (|| {
369        let mut file = open_new_lock_file(&tmp_path)?;
370        write_lock_metadata_to_file(&mut file, metadata)?;
371        drop(file);
372
373        fs::hard_link(&tmp_path, path)?;
374        sync_parent(path);
375        Ok(())
376    })();
377
378    let _ = fs::remove_file(&tmp_path);
379    result
380}
381
382fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
383    let tmp_path = temp_path_for_lock(path);
384    let write_result = (|| {
385        let mut file = OpenOptions::new()
386            .write(true)
387            .create_new(true)
388            .open(&tmp_path)?;
389        write_lock_metadata_to_file(&mut file, metadata)?;
390        drop(file);
391
392        rename_over(&tmp_path, path)?;
393        sync_parent(path);
394        Ok(())
395    })();
396
397    if write_result.is_err() {
398        let _ = fs::remove_file(&tmp_path);
399    }
400
401    write_result
402}
403
/// Atomically replaces `to` with `from`.
///
/// `fs::rename` already overwrites an existing destination on every platform
/// this module targets, including Windows. Deleting `to` first, as a separate
/// step, would open a window in which the lock file did not exist at all. A
/// waiter could create its own lock in that gap, and this rename would then
/// silently overwrite it, leaving two processes believing they hold the lock.
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    fs::rename(from, to)
}
414
// Process-wide counter that disambiguates temp lockfile paths created by
// callers inside the same process. `now_nanos()` alone is not unique enough
// on Windows when two threads race to acquire the same lock (caught by the
// `acquire_serializes_concurrent_callers` test). Two threads sampling the
// nanosecond clock within the same scheduler quantum get identical
// timestamps, so both write to the same `.lock.tmp.<pid>.<nanos>` file. One
// thread's `fs::remove_file(&tmp_path)` cleanup then deletes that file before
// the other thread's `fs::hard_link(&tmp_path, ...)` runs. The loser's
// acquisition then fails with `Io(Os { code: 2, NotFound })`.
//
// A single `AtomicU64` shared by all threads makes every temp path unique
// within the process, regardless of clock resolution or scheduling races.
static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
428
429fn temp_path_for_lock(path: &Path) -> PathBuf {
430    let file_name = path
431        .file_name()
432        .and_then(|name| name.to_str())
433        .unwrap_or("lock");
434    let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
435    path.with_file_name(format!(
436        ".{file_name}.tmp.{}.{}.{}",
437        std::process::id(),
438        now_nanos(),
439        seq
440    ))
441}
442
443fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
444    let metadata = match read_lock_metadata(path) {
445        Ok(metadata) => metadata,
446        Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
447            return Ok(false);
448        }
449        Err(ReadLockError::Io(error)) => return Err(error),
450        Err(ReadLockError::Malformed(_)) => return Ok(false),
451    };
452
453    if metadata.pid == owner.pid
454        && metadata.hostname == owner.hostname
455        && metadata.created_at_ms == owner.created_at_ms
456    {
457        remove_lock_file(path)?;
458        Ok(true)
459    } else {
460        Ok(false)
461    }
462}
463
/// Removes the file at `path`; a file that is already gone counts as success.
fn remove_lock_file(path: &Path) -> io::Result<()> {
    fs::remove_file(path).or_else(|error| {
        if error.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(error)
        }
    })
}
471
472fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
473    let poll = Duration::from_millis(poll_interval_ms);
474    let sleep_for = match deadline {
475        Some(deadline) => {
476            let now = Instant::now();
477            if now >= deadline {
478                return Err(AcquireError::Timeout);
479            }
480            poll.min(deadline.saturating_duration_since(now))
481        }
482        None => poll,
483    };
484    thread::sleep(sleep_for);
485    Ok(())
486}
487
/// Best-effort fsync of the directory containing `path`, so that a newly
/// linked or renamed lock file survives a crash. Failures are ignored; for
/// example, opening a directory as a `File` is not supported on every platform.
fn sync_parent(path: &Path) {
    let parent = match path.parent() {
        // A bare file name such as `app.lock` has an empty parent, which
        // denotes the current directory. `File::open("")` would fail and
        // silently skip the sync, so open `.` instead.
        Some(parent) if parent.as_os_str().is_empty() => Path::new("."),
        Some(parent) => parent,
        None => return,
    };
    if let Ok(dir) = File::open(parent) {
        let _ = dir.sync_all();
    }
}
495
/// Wall-clock milliseconds since the Unix epoch; `0` if the clock reads
/// earlier than the epoch.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}

/// Wall-clock nanoseconds since the Unix epoch; `0` if the clock reads
/// earlier than the epoch.
fn now_nanos() -> u128 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_nanos()
}
509
510#[cfg(unix)]
511fn current_hostname() -> String {
512    let mut buffer = [0u8; 256];
513    let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
514    if result == 0 {
515        let len = buffer
516            .iter()
517            .position(|byte| *byte == 0)
518            .unwrap_or(buffer.len());
519        if len > 0 {
520            return String::from_utf8_lossy(&buffer[..len]).into_owned();
521        }
522    }
523
524    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
525}
526
527#[cfg(windows)]
528fn current_hostname() -> String {
529    std::env::var("COMPUTERNAME")
530        .or_else(|_| std::env::var("HOSTNAME"))
531        .unwrap_or_else(|_| "unknown-host".to_string())
532}
533
534#[cfg(not(any(unix, windows)))]
535fn current_hostname() -> String {
536    std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
537}
538
539#[cfg(unix)]
540fn process_alive(pid: u32) -> bool {
541    if pid == 0 || pid > i32::MAX as u32 {
542        return false;
543    }
544
545    let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
546    if result == 0 {
547        return true;
548    }
549
550    io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
551}
552
553#[cfg(windows)]
554fn process_alive(pid: u32) -> bool {
555    let filter = format!("PID eq {pid}");
556    let Ok(output) = std::process::Command::new("tasklist")
557        .args(["/FI", &filter, "/FO", "CSV", "/NH"])
558        .output()
559    else {
560        return true;
561    };
562
563    if !output.status.success() {
564        return true;
565    }
566
567    let stdout = String::from_utf8_lossy(&output.stdout);
568
569    // `tasklist /NH /FO CSV` emits a single line per matching process with
570    // every field quoted, e.g. `"image","7420","Console","1","12,345 K"`.
571    // When the filter matches nothing, the literal text
572    // `INFO: No tasks are running which match the specified criteria.`
573    // is written to stdout. The previous matcher was too strict — it looked
574    // for `","{pid}",` patterns mid-line, which works on most Windows builds
575    // but missed Windows runners that emit slightly different quoting (e.g.
576    // a trailing CRLF leaves the pid token at end-of-line as `"7420"\r\n`).
577    // The robust check: confirm the "no tasks" sentinel is absent AND any
578    // PID-quoted form is present.
579    if stdout.contains("No tasks are running") {
580        return false;
581    }
582    stdout.contains(&format!("\"{pid}\""))
583}
584
585#[cfg(not(any(unix, windows)))]
586fn process_alive(_pid: u32) -> bool {
587    true
588}
589
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};

    /// Short timings keep the suite fast, while a 2s stale window leaves plenty
    /// of margin before a healthy holder could look abandoned.
    fn test_config() -> LockConfig {
        LockConfig {
            heartbeat_interval_ms: 25,
            stale_heartbeat_ms: 2_000,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
            poll_interval_ms: 10,
        }
    }

    /// Fresh temp directory plus a lock path inside it. The caller must keep
    /// the `TempDir` alive for the duration of the test.
    fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
        let root = tempfile::tempdir().expect("create temp dir");
        let lock_path = root.path().join("test.lock");
        (root, lock_path)
    }

    /// Writes `metadata` to a brand-new file at `path`, imitating another holder.
    fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
        let mut handle = open_new_lock_file(path).expect("create synthetic lock");
        write_lock_metadata_to_file(&mut handle, metadata).expect("write synthetic lock");
    }

    /// Metadata whose heartbeat equals its creation time.
    fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
        LockMetadata {
            pid,
            hostname,
            created_at_ms,
            heartbeat_at_ms: created_at_ms,
        }
    }

    /// Metadata claiming the lock for this very process, created now.
    fn current_process_metadata() -> LockMetadata {
        let created = now_ms();
        synthetic_metadata(std::process::id(), current_hostname(), created)
    }

    /// Expects an 80ms bounded acquisition of `path` to time out.
    fn assert_acquire_times_out(path: &Path) {
        let attempt = acquire_with_config(path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(attempt, Err(AcquireError::Timeout)));
    }

    #[test]
    fn acquire_creates_lockfile_and_unlocks_on_drop() {
        let (_dir, path) = test_lock_path();

        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let on_disk = read_lock_metadata(&path).expect("read lock metadata");
        assert_eq!(on_disk.pid, std::process::id());
        assert_eq!(on_disk.hostname, current_hostname());
        assert_eq!(on_disk.created_at_ms, guard.metadata.created_at_ms);

        drop(guard);
        assert!(!path.exists());
    }

    #[test]
    fn acquire_serializes_concurrent_callers() {
        const WORKERS: usize = 2;

        let (_dir, path) = test_lock_path();
        let path = Arc::new(path);
        let start = Arc::new(Barrier::new(WORKERS + 1));
        let inside = Arc::new(AtomicUsize::new(0));
        let entered = Arc::new(AtomicUsize::new(0));
        let max_inside = Arc::new(AtomicUsize::new(0));

        let workers: Vec<_> = (0..WORKERS)
            .map(|_| {
                let path = Arc::clone(&path);
                let start = Arc::clone(&start);
                let inside = Arc::clone(&inside);
                let entered = Arc::clone(&entered);
                let max_inside = Arc::clone(&max_inside);
                thread::spawn(move || {
                    start.wait();
                    let guard =
                        acquire_with_config(&path, Some(Duration::from_secs(2)), test_config())
                            .expect("thread acquire lock");
                    let previous = inside.fetch_add(1, Ordering::SeqCst);
                    assert_eq!(previous, 0, "two lock holders overlapped");
                    entered.fetch_add(1, Ordering::SeqCst);
                    max_inside.fetch_max(previous + 1, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(75));
                    inside.fetch_sub(1, Ordering::SeqCst);
                    drop(guard);
                })
            })
            .collect();

        start.wait();
        for worker in workers {
            worker.join().expect("join worker");
        }

        assert_eq!(entered.load(Ordering::SeqCst), WORKERS);
        assert_eq!(max_inside.load(Ordering::SeqCst), 1);
        assert!(!path.exists());
    }

    #[test]
    fn heartbeat_updates_lockfile_timestamp() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let initial = read_lock_metadata(&path)
            .expect("read initial metadata")
            .heartbeat_at_ms;

        // Poll for up to 2s instead of sleeping a fixed multiple of the
        // heartbeat interval. `park_timeout` is a *maximum* wait, not a
        // guaranteed periodic timer. Under load (e.g. shared CI runners with
        // other cargo-test threads), the heartbeat thread may fire less often
        // than every 25ms. The contract under test is "the heartbeat
        // eventually advances", not "it advances within N intervals".
        let deadline = Instant::now() + Duration::from_millis(2_000);
        let mut latest = initial;
        while latest <= initial && Instant::now() < deadline {
            thread::sleep(Duration::from_millis(50));
            latest = read_lock_metadata(&path)
                .expect("read updated metadata")
                .heartbeat_at_ms;
        }
        assert!(
            latest > initial,
            "heartbeat timestamp did not advance within 2s"
        );
        drop(guard);
    }

    #[test]
    fn dead_pid_lock_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        write_synthetic_lock(
            &path,
            &synthetic_metadata(999_999_999, current_hostname(), now_ms()),
        );

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim dead pid lock");
        let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(reclaimed.pid, std::process::id());
        drop(guard);
    }

    #[test]
    fn stale_heartbeat_lock_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        let minute_ago = now_ms().saturating_sub(60_000);
        let stale = LockMetadata {
            created_at_ms: minute_ago,
            heartbeat_at_ms: minute_ago,
            ..current_process_metadata()
        };
        write_synthetic_lock(&path, &stale);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim stale heartbeat lock");
        let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_ne!(reclaimed.created_at_ms, stale.created_at_ms);
        drop(guard);
    }

    #[test]
    fn healthy_live_owner_blocks() {
        let (_dir, path) = test_lock_path();
        write_synthetic_lock(&path, &current_process_metadata());

        assert_acquire_times_out(&path);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    #[test]
    fn malformed_lockfile_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        fs::write(&path, b"not valid json").expect("write malformed lock");

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim malformed lock");
        let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(reclaimed.pid, std::process::id());
        drop(guard);
    }

    #[test]
    fn cross_host_lock_is_not_stolen() {
        let (_dir, path) = test_lock_path();
        let minute_ago = now_ms().saturating_sub(60_000);
        let foreign = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: minute_ago,
            heartbeat_at_ms: minute_ago,
        };
        write_synthetic_lock(&path, &foreign);

        assert_acquire_times_out(&path);
        assert_eq!(read_lock_metadata(&path).expect("read lock"), foreign);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    #[test]
    fn live_owner_over_10min_warns_but_blocks() {
        let (_dir, path) = test_lock_path();
        let long_held = LockMetadata {
            created_at_ms: now_ms().saturating_sub(11 * 60 * 1_000),
            heartbeat_at_ms: now_ms(),
            ..current_process_metadata()
        };
        write_synthetic_lock(&path, &long_held);

        assert_acquire_times_out(&path);
        assert_eq!(read_lock_metadata(&path).expect("read lock"), long_held);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    #[test]
    fn drop_stops_heartbeat_thread() {
        let (_dir, path) = test_lock_path();
        let config = test_config();
        drop(acquire_with_config(&path, None, config).expect("acquire lock"));

        thread::sleep(Duration::from_millis(config.heartbeat_interval_ms * 3));
        assert!(
            !path.exists(),
            "heartbeat recreated or kept updating lockfile"
        );
    }
}