1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6 atomic::{AtomicBool, AtomicU64, Ordering},
7 mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
16pub const HEARTBEAT_INTERVAL_MS: u64 = 5_000;
17pub const STALE_HEARTBEAT_MS: u64 = 15_000;
18pub const LIVE_OWNER_WARN_MS: u64 = 600_000;
19pub const POLL_INTERVAL_MS: u64 = 100;
20
21#[derive(Clone, Copy, Debug)]
22struct LockConfig {
23 heartbeat_interval_ms: u64,
24 stale_heartbeat_ms: u64,
25 live_owner_warn_ms: u64,
26 poll_interval_ms: u64,
27}
28
29impl Default for LockConfig {
30 fn default() -> Self {
31 Self {
32 heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
33 stale_heartbeat_ms: STALE_HEARTBEAT_MS,
34 live_owner_warn_ms: LIVE_OWNER_WARN_MS,
35 poll_interval_ms: POLL_INTERVAL_MS,
36 }
37 }
38}
39
40#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
41struct LockMetadata {
42 pid: u32,
43 hostname: String,
44 created_at_ms: u64,
45 heartbeat_at_ms: u64,
46}
47
48pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
53 acquire_with_config(path, None, LockConfig::default())
54}
55
56pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
58 acquire_with_config(path, Some(timeout), LockConfig::default())
59}
60
61pub struct LockGuard {
62 path: PathBuf,
63 metadata: LockMetadata,
64 shutdown: Arc<AtomicBool>,
65 heartbeat_done: mpsc::Receiver<()>,
66 heartbeat: Option<JoinHandle<()>>,
67}
68
69impl Drop for LockGuard {
70 fn drop(&mut self) {
71 self.shutdown.store(true, Ordering::Release);
72 if let Some(handle) = self.heartbeat.as_ref() {
73 handle.thread().unpark();
74 }
75
76 if self
77 .heartbeat_done
78 .recv_timeout(Duration::from_millis(100))
79 .is_ok()
80 {
81 if let Some(handle) = self.heartbeat.take() {
82 let _ = handle.join();
83 }
84 } else {
85 slog_warn!(
86 "fs lock heartbeat thread for {} did not stop within 100ms",
87 self.path.display()
88 );
89 }
90
91 match remove_lock_if_owned(&self.path, &self.metadata) {
92 Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
93 Ok(false) => {}
94 Err(error) => slog_warn!(
95 "failed to release filesystem lock at {}: {}",
96 self.path.display(),
97 error
98 ),
99 }
100 }
101}
102
103#[derive(Debug)]
104pub enum AcquireError {
105 Io(io::Error),
106 Timeout,
107}
108
109impl fmt::Display for AcquireError {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 match self {
112 AcquireError::Io(error) => write!(f, "filesystem lock I/O error: {error}"),
113 AcquireError::Timeout => write!(f, "timed out acquiring filesystem lock"),
114 }
115 }
116}
117
118impl std::error::Error for AcquireError {}
119
120impl From<io::Error> for AcquireError {
121 fn from(error: io::Error) -> Self {
122 AcquireError::Io(error)
123 }
124}
125
126fn acquire_with_config(
127 path: &Path,
128 timeout: Option<Duration>,
129 config: LockConfig,
130) -> Result<LockGuard, AcquireError> {
131 let deadline = timeout.map(|timeout| Instant::now() + timeout);
132 let hostname = current_hostname();
133 let mut warned_live_owner = false;
134
135 loop {
136 if let Some(deadline) = deadline {
137 if Instant::now() >= deadline {
138 return Err(AcquireError::Timeout);
139 }
140 }
141
142 match create_new_lock(path, &hostname, config) {
143 Ok(guard) => return Ok(guard),
144 Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
145 Err(error) => return Err(error.into()),
146 }
147
148 let metadata = match read_lock_metadata(path) {
149 Ok(metadata) => metadata,
150 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
151 Err(ReadLockError::Io(error)) => return Err(error.into()),
152 Err(ReadLockError::Malformed(error)) => {
153 sleep_until_retry(deadline, config.poll_interval_ms)?;
157 match read_lock_metadata(path) {
158 Ok(_) => continue,
159 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
160 continue;
161 }
162 Err(ReadLockError::Io(error)) => return Err(error.into()),
163 Err(ReadLockError::Malformed(_)) => {}
164 }
165 slog_warn!(
166 "removing malformed filesystem lock at {}: {}",
167 path.display(),
168 error
169 );
170 remove_lock_file(path)?;
171 continue;
172 }
173 };
174
175 if metadata.hostname != hostname {
176 sleep_until_retry(deadline, config.poll_interval_ms)?;
177 continue;
178 }
179
180 if !process_alive(metadata.pid) {
181 slog_warn!(
182 "removing filesystem lock at {} from dead PID {}",
183 path.display(),
184 metadata.pid
185 );
186 remove_lock_file(path)?;
187 continue;
188 }
189
190 let now = now_ms();
191 let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);
192 if since_heartbeat > config.stale_heartbeat_ms {
193 slog_warn!(
194 "reclaiming filesystem lock at {}: PID {} is alive but heartbeat is stale ({}ms)",
195 path.display(),
196 metadata.pid,
197 since_heartbeat
198 );
199 remove_lock_file(path)?;
200 continue;
201 }
202
203 let held_for = now.saturating_sub(metadata.created_at_ms);
204 if held_for > config.live_owner_warn_ms && !warned_live_owner {
205 slog_warn!(
206 "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
207 path.display(),
208 metadata.pid
209 );
210 warned_live_owner = true;
211 }
212
213 sleep_until_retry(deadline, config.poll_interval_ms)?;
214 }
215}
216
217fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
218 let now = now_ms();
219 let metadata = LockMetadata {
220 pid: std::process::id(),
221 hostname: hostname.to_string(),
222 created_at_ms: now,
223 heartbeat_at_ms: now,
224 };
225
226 create_lock_file_atomically(path, &metadata)?;
227
228 let shutdown = Arc::new(AtomicBool::new(false));
229 let (done_tx, done_rx) = mpsc::channel();
230 let heartbeat_path = path.to_path_buf();
231 let heartbeat_metadata = metadata.clone();
232 let heartbeat_shutdown = Arc::clone(&shutdown);
233 let heartbeat = thread::Builder::new()
234 .name("aft-fs-lock-heartbeat".to_string())
235 .spawn(move || {
236 run_heartbeat(
237 heartbeat_path,
238 heartbeat_metadata,
239 heartbeat_shutdown,
240 config,
241 );
242 let _ = done_tx.send(());
243 })?;
244
245 slog_info!("acquired filesystem lock at {}", path.display());
246
247 Ok(LockGuard {
248 path: path.to_path_buf(),
249 metadata,
250 shutdown,
251 heartbeat_done: done_rx,
252 heartbeat: Some(heartbeat),
253 })
254}
255
256fn run_heartbeat(
257 path: PathBuf,
258 owner: LockMetadata,
259 shutdown: Arc<AtomicBool>,
260 config: LockConfig,
261) {
262 loop {
263 thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
264 if shutdown.load(Ordering::Acquire) {
265 return;
266 }
267
268 match heartbeat_once(&path, &owner) {
269 Ok(()) => {}
270 Err(HeartbeatError::LockGone) => {
271 slog_error!(
272 "filesystem lock at {} disappeared; stopping heartbeat",
273 path.display()
274 );
275 return;
276 }
277 Err(HeartbeatError::NotOwner) => {
278 slog_error!(
279 "filesystem lock at {} is no longer owned by this guard; stopping heartbeat",
280 path.display()
281 );
282 return;
283 }
284 Err(HeartbeatError::Malformed(error)) => {
285 slog_error!(
286 "filesystem lock at {} became malformed: {}; stopping heartbeat",
287 path.display(),
288 error
289 );
290 return;
291 }
292 Err(HeartbeatError::Io(error)) => {
293 slog_error!(
294 "failed to heartbeat filesystem lock at {}: {}; stopping heartbeat",
295 path.display(),
296 error
297 );
298 return;
299 }
300 }
301 }
302}
303
304fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
305 let mut metadata = match read_lock_metadata(path) {
306 Ok(metadata) => metadata,
307 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
308 return Err(HeartbeatError::LockGone);
309 }
310 Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
311 Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
312 };
313
314 if metadata.pid != owner.pid
315 || metadata.hostname != owner.hostname
316 || metadata.created_at_ms != owner.created_at_ms
317 {
318 return Err(HeartbeatError::NotOwner);
319 }
320
321 metadata.heartbeat_at_ms = now_ms();
322 atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
323}
324
325#[derive(Debug)]
326enum HeartbeatError {
327 Io(io::Error),
328 LockGone,
329 Malformed(serde_json::Error),
330 NotOwner,
331}
332
333#[derive(Debug)]
334enum ReadLockError {
335 Io(io::Error),
336 Malformed(serde_json::Error),
337}
338
339fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
340 let bytes = fs::read(path).map_err(ReadLockError::Io)?;
341 serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
342}
343
344#[cfg(unix)]
345fn open_new_lock_file(path: &Path) -> io::Result<File> {
346 use std::os::unix::fs::OpenOptionsExt;
347
348 OpenOptions::new()
349 .write(true)
350 .create_new(true)
351 .mode(0o644)
352 .open(path)
353}
354
355#[cfg(not(unix))]
356fn open_new_lock_file(path: &Path) -> io::Result<File> {
357 OpenOptions::new().write(true).create_new(true).open(path)
358}
359
360fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
361 serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
362 file.write_all(b"\n")?;
363 file.sync_all()
364}
365
366fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
367 let tmp_path = temp_path_for_lock(path);
368 let result = (|| {
369 let mut file = open_new_lock_file(&tmp_path)?;
370 write_lock_metadata_to_file(&mut file, metadata)?;
371 drop(file);
372
373 fs::hard_link(&tmp_path, path)?;
374 sync_parent(path);
375 Ok(())
376 })();
377
378 let _ = fs::remove_file(&tmp_path);
379 result
380}
381
382fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
383 let tmp_path = temp_path_for_lock(path);
384 let write_result = (|| {
385 let mut file = OpenOptions::new()
386 .write(true)
387 .create_new(true)
388 .open(&tmp_path)?;
389 write_lock_metadata_to_file(&mut file, metadata)?;
390 drop(file);
391
392 rename_over(&tmp_path, path)?;
393 sync_parent(path);
394 Ok(())
395 })();
396
397 if write_result.is_err() {
398 let _ = fs::remove_file(&tmp_path);
399 }
400
401 write_result
402}
403
404#[cfg(windows)]
405fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
406 let _ = fs::remove_file(to);
407 fs::rename(from, to)
408}
409
410#[cfg(not(windows))]
411fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
412 fs::rename(from, to)
413}
414
415static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
428
429fn temp_path_for_lock(path: &Path) -> PathBuf {
430 let file_name = path
431 .file_name()
432 .and_then(|name| name.to_str())
433 .unwrap_or("lock");
434 let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
435 path.with_file_name(format!(
436 ".{file_name}.tmp.{}.{}.{}",
437 std::process::id(),
438 now_nanos(),
439 seq
440 ))
441}
442
443fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
444 let metadata = match read_lock_metadata(path) {
445 Ok(metadata) => metadata,
446 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
447 return Ok(false);
448 }
449 Err(ReadLockError::Io(error)) => return Err(error),
450 Err(ReadLockError::Malformed(_)) => return Ok(false),
451 };
452
453 if metadata.pid == owner.pid
454 && metadata.hostname == owner.hostname
455 && metadata.created_at_ms == owner.created_at_ms
456 {
457 remove_lock_file(path)?;
458 Ok(true)
459 } else {
460 Ok(false)
461 }
462}
463
464fn remove_lock_file(path: &Path) -> io::Result<()> {
465 match fs::remove_file(path) {
466 Ok(()) => Ok(()),
467 Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(()),
468 Err(error) => Err(error),
469 }
470}
471
472fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
473 let poll = Duration::from_millis(poll_interval_ms);
474 let sleep_for = match deadline {
475 Some(deadline) => {
476 let now = Instant::now();
477 if now >= deadline {
478 return Err(AcquireError::Timeout);
479 }
480 poll.min(deadline.saturating_duration_since(now))
481 }
482 None => poll,
483 };
484 thread::sleep(sleep_for);
485 Ok(())
486}
487
488fn sync_parent(path: &Path) {
489 if let Some(parent) = path.parent() {
490 if let Ok(dir) = File::open(parent) {
491 let _ = dir.sync_all();
492 }
493 }
494}
495
496fn now_ms() -> u64 {
497 SystemTime::now()
498 .duration_since(UNIX_EPOCH)
499 .unwrap_or(Duration::ZERO)
500 .as_millis() as u64
501}
502
503fn now_nanos() -> u128 {
504 SystemTime::now()
505 .duration_since(UNIX_EPOCH)
506 .unwrap_or(Duration::ZERO)
507 .as_nanos()
508}
509
510#[cfg(unix)]
511fn current_hostname() -> String {
512 let mut buffer = [0u8; 256];
513 let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
514 if result == 0 {
515 let len = buffer
516 .iter()
517 .position(|byte| *byte == 0)
518 .unwrap_or(buffer.len());
519 if len > 0 {
520 return String::from_utf8_lossy(&buffer[..len]).into_owned();
521 }
522 }
523
524 std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
525}
526
527#[cfg(windows)]
528fn current_hostname() -> String {
529 std::env::var("COMPUTERNAME")
530 .or_else(|_| std::env::var("HOSTNAME"))
531 .unwrap_or_else(|_| "unknown-host".to_string())
532}
533
534#[cfg(not(any(unix, windows)))]
535fn current_hostname() -> String {
536 std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
537}
538
539#[cfg(unix)]
540fn process_alive(pid: u32) -> bool {
541 if pid == 0 || pid > i32::MAX as u32 {
542 return false;
543 }
544
545 let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
546 if result == 0 {
547 return true;
548 }
549
550 io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
551}
552
553#[cfg(windows)]
554fn process_alive(pid: u32) -> bool {
555 let filter = format!("PID eq {pid}");
556 let Ok(output) = std::process::Command::new("tasklist")
557 .args(["/FI", &filter, "/FO", "CSV", "/NH"])
558 .output()
559 else {
560 return true;
561 };
562
563 if !output.status.success() {
564 return true;
565 }
566
567 let stdout = String::from_utf8_lossy(&output.stdout);
568
569 if stdout.contains("No tasks are running") {
580 return false;
581 }
582 stdout.contains(&format!("\"{pid}\""))
583}
584
585#[cfg(not(any(unix, windows)))]
586fn process_alive(_pid: u32) -> bool {
587 true
588}
589
590#[cfg(test)]
591mod tests {
592 use super::*;
593 use std::sync::atomic::{AtomicUsize, Ordering};
594 use std::sync::{Arc, Barrier};
595
596 fn test_config() -> LockConfig {
597 LockConfig {
598 heartbeat_interval_ms: 25,
599 stale_heartbeat_ms: 2_000,
600 live_owner_warn_ms: LIVE_OWNER_WARN_MS,
601 poll_interval_ms: 10,
602 }
603 }
604
605 fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
606 let dir = tempfile::tempdir().expect("create temp dir");
607 let path = dir.path().join("test.lock");
608 (dir, path)
609 }
610
611 fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
612 let mut file = open_new_lock_file(path).expect("create synthetic lock");
613 write_lock_metadata_to_file(&mut file, metadata).expect("write synthetic lock");
614 }
615
616 fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
617 LockMetadata {
618 pid,
619 hostname,
620 created_at_ms,
621 heartbeat_at_ms: created_at_ms,
622 }
623 }
624
625 fn current_process_metadata() -> LockMetadata {
626 let now = now_ms();
627 synthetic_metadata(std::process::id(), current_hostname(), now)
628 }
629
630 #[test]
631 fn acquire_creates_lockfile_and_unlocks_on_drop() {
632 let (_dir, path) = test_lock_path();
633
634 let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
635 let metadata = read_lock_metadata(&path).expect("read lock metadata");
636 assert_eq!(metadata.pid, std::process::id());
637 assert_eq!(metadata.hostname, current_hostname());
638 assert_eq!(metadata.created_at_ms, guard.metadata.created_at_ms);
639
640 drop(guard);
641 assert!(!path.exists());
642 }
643
644 #[test]
645 fn acquire_serializes_concurrent_callers() {
646 let (_dir, path) = test_lock_path();
647 let path = Arc::new(path);
648 let barrier = Arc::new(Barrier::new(3));
649 let inside = Arc::new(AtomicUsize::new(0));
650 let entered = Arc::new(AtomicUsize::new(0));
651 let max_inside = Arc::new(AtomicUsize::new(0));
652
653 let mut handles = Vec::new();
654 for _ in 0..2 {
655 let path = Arc::clone(&path);
656 let barrier = Arc::clone(&barrier);
657 let inside = Arc::clone(&inside);
658 let entered = Arc::clone(&entered);
659 let max_inside = Arc::clone(&max_inside);
660 handles.push(thread::spawn(move || {
661 barrier.wait();
662 let guard = acquire_with_config(&path, Some(Duration::from_secs(2)), test_config())
663 .expect("thread acquire lock");
664 let previous = inside.fetch_add(1, Ordering::SeqCst);
665 assert_eq!(previous, 0, "two lock holders overlapped");
666 entered.fetch_add(1, Ordering::SeqCst);
667 max_inside.fetch_max(previous + 1, Ordering::SeqCst);
668 thread::sleep(Duration::from_millis(75));
669 inside.fetch_sub(1, Ordering::SeqCst);
670 drop(guard);
671 }));
672 }
673
674 barrier.wait();
675 for handle in handles {
676 handle.join().expect("join worker");
677 }
678
679 assert_eq!(entered.load(Ordering::SeqCst), 2);
680 assert_eq!(max_inside.load(Ordering::SeqCst), 1);
681 assert!(!path.exists());
682 }
683
684 #[test]
685 fn heartbeat_updates_lockfile_timestamp() {
686 let (_dir, path) = test_lock_path();
687 let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
688 let initial = read_lock_metadata(&path)
689 .expect("read initial metadata")
690 .heartbeat_at_ms;
691
692 let deadline = std::time::Instant::now() + Duration::from_millis(2_000);
701 let mut updated = initial;
702 while std::time::Instant::now() < deadline {
703 thread::sleep(Duration::from_millis(50));
704 updated = read_lock_metadata(&path)
705 .expect("read updated metadata")
706 .heartbeat_at_ms;
707 if updated > initial {
708 break;
709 }
710 }
711 assert!(
712 updated > initial,
713 "heartbeat timestamp did not advance within 2s"
714 );
715 drop(guard);
716 }
717
718 #[test]
719 fn dead_pid_lock_is_reclaimed() {
720 let (_dir, path) = test_lock_path();
721 let metadata = synthetic_metadata(999_999_999, current_hostname(), now_ms());
722 write_synthetic_lock(&path, &metadata);
723
724 let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
725 .expect("reclaim dead pid lock");
726 let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
727 assert_eq!(metadata.pid, std::process::id());
728 drop(guard);
729 }
730
731 #[test]
732 fn stale_heartbeat_lock_is_reclaimed() {
733 let (_dir, path) = test_lock_path();
734 let mut metadata = current_process_metadata();
735 metadata.created_at_ms = now_ms().saturating_sub(60_000);
736 metadata.heartbeat_at_ms = now_ms().saturating_sub(60_000);
737 write_synthetic_lock(&path, &metadata);
738
739 let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
740 .expect("reclaim stale heartbeat lock");
741 let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
742 assert_ne!(reclaimed.created_at_ms, metadata.created_at_ms);
743 drop(guard);
744 }
745
746 #[test]
747 fn healthy_live_owner_blocks() {
748 let (_dir, path) = test_lock_path();
749 let metadata = current_process_metadata();
750 write_synthetic_lock(&path, &metadata);
751
752 let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
753 assert!(matches!(result, Err(AcquireError::Timeout)));
754
755 remove_lock_file(&path).expect("cleanup synthetic lock");
756 }
757
758 #[test]
759 fn malformed_lockfile_is_reclaimed() {
760 let (_dir, path) = test_lock_path();
761 fs::write(&path, b"not valid json").expect("write malformed lock");
762
763 let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
764 .expect("reclaim malformed lock");
765 let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
766 assert_eq!(metadata.pid, std::process::id());
767 drop(guard);
768 }
769
770 #[test]
771 fn cross_host_lock_is_not_stolen() {
772 let (_dir, path) = test_lock_path();
773 let now = now_ms().saturating_sub(60_000);
774 let metadata = LockMetadata {
775 pid: std::process::id(),
776 hostname: format!("{}-other", current_hostname()),
777 created_at_ms: now,
778 heartbeat_at_ms: now,
779 };
780 write_synthetic_lock(&path, &metadata);
781
782 let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
783 assert!(matches!(result, Err(AcquireError::Timeout)));
784 assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);
785
786 remove_lock_file(&path).expect("cleanup synthetic lock");
787 }
788
789 #[test]
790 fn live_owner_over_10min_warns_but_blocks() {
791 let (_dir, path) = test_lock_path();
792 let mut metadata = current_process_metadata();
793 metadata.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
794 metadata.heartbeat_at_ms = now_ms();
795 write_synthetic_lock(&path, &metadata);
796
797 let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
798 assert!(matches!(result, Err(AcquireError::Timeout)));
799 assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);
800
801 remove_lock_file(&path).expect("cleanup synthetic lock");
802 }
803
804 #[test]
805 fn drop_stops_heartbeat_thread() {
806 let (_dir, path) = test_lock_path();
807 let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
808 drop(guard);
809
810 thread::sleep(Duration::from_millis(
811 test_config().heartbeat_interval_ms * 3,
812 ));
813 assert!(
814 !path.exists(),
815 "heartbeat recreated or kept updating lockfile"
816 );
817 }
818}