1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6 atomic::{AtomicBool, AtomicU64, Ordering},
7 mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
/// Default pause, in milliseconds, between two heartbeat rewrites of a held lock file.
pub const HEARTBEAT_INTERVAL_MS: u64 = 5_000;

/// Default heartbeat age, in milliseconds, beyond which a lock's heartbeat counts as stale.
///
/// A live owner on the same host only triggers a warning; locks written by another
/// host are reclaimed once the heartbeat is older than five times this value.
pub const STALE_HEARTBEAT_MS: u64 = 15_000;

/// Default hold time, in milliseconds (ten minutes), after which waiting on a live
/// owner is reported with a warning.
pub const LIVE_OWNER_WARN_MS: u64 = 600_000;

/// Default delay, in milliseconds, between acquisition attempts while the lock is held.
pub const POLL_INTERVAL_MS: u64 = 100;
20
/// Timing knobs for acquiring and holding a lock; every value is in milliseconds.
#[derive(Clone, Copy, Debug)]
struct LockConfig {
    /// How long the heartbeat thread parks between heartbeat rewrites.
    heartbeat_interval_ms: u64,
    /// Heartbeat age beyond which a same-host live owner is reported as stale.
    stale_heartbeat_ms: u64,
    /// Hold time beyond which a live owner is reported as long-running.
    live_owner_warn_ms: u64,
    /// Delay between acquisition attempts while waiting.
    poll_interval_ms: u64,
}

impl LockConfig {
    /// Heartbeat age after which a lock written by a different host may be reclaimed.
    ///
    /// This is five times `stale_heartbeat_ms`, clamped to `u64::MAX` on overflow.
    fn cross_host_stale_heartbeat_ms(self) -> u64 {
        const CROSS_HOST_FACTOR: u64 = 5;
        match self.stale_heartbeat_ms.checked_mul(CROSS_HOST_FACTOR) {
            Some(threshold) => threshold,
            None => u64::MAX,
        }
    }
}
34
35impl Default for LockConfig {
36 fn default() -> Self {
37 Self {
38 heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
39 stale_heartbeat_ms: STALE_HEARTBEAT_MS,
40 live_owner_warn_ms: LIVE_OWNER_WARN_MS,
41 poll_interval_ms: POLL_INTERVAL_MS,
42 }
43 }
44}
45
46#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
47struct LockMetadata {
48 pid: u32,
49 hostname: String,
50 created_at_ms: u64,
51 heartbeat_at_ms: u64,
52}
53
54pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
59 acquire_with_config(path, None, LockConfig::default())
60}
61
62pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
64 acquire_with_config(path, Some(timeout), LockConfig::default())
65}
66
67pub struct LockGuard {
68 path: PathBuf,
69 metadata: LockMetadata,
70 shutdown: Arc<AtomicBool>,
71 heartbeat_done: mpsc::Receiver<()>,
72 heartbeat: Option<JoinHandle<()>>,
73}
74
75impl Drop for LockGuard {
76 fn drop(&mut self) {
77 self.shutdown.store(true, Ordering::Release);
101 if let Some(handle) = self.heartbeat.take() {
102 handle.thread().unpark();
103 let _ = handle.join();
104 }
105 while self.heartbeat_done.try_recv().is_ok() {}
109
110 match remove_lock_if_owned(&self.path, &self.metadata) {
111 Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
112 Ok(false) => {}
113 Err(error) => slog_warn!(
114 "failed to release filesystem lock at {}: {}",
115 self.path.display(),
116 error
117 ),
118 }
119 }
120}
121
/// Reasons an attempt to acquire the filesystem lock can fail.
#[derive(Debug)]
pub enum AcquireError {
    /// An I/O operation on the lock file failed.
    Io(io::Error),
    /// The deadline passed before the lock could be obtained.
    Timeout,
}

impl fmt::Display for AcquireError {
    /// Writes a one-line, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Timeout => f.write_str("timed out acquiring filesystem lock"),
            Self::Io(error) => write!(f, "filesystem lock I/O error: {error}"),
        }
    }
}

impl std::error::Error for AcquireError {}

impl From<io::Error> for AcquireError {
    /// Wraps an I/O error so `?` can be used on `io::Result` values.
    fn from(error: io::Error) -> Self {
        Self::Io(error)
    }
}
144
145fn acquire_with_config(
146 path: &Path,
147 timeout: Option<Duration>,
148 config: LockConfig,
149) -> Result<LockGuard, AcquireError> {
150 let deadline = timeout.map(|timeout| Instant::now() + timeout);
151 let hostname = current_hostname();
152 let mut warned_live_owner = false;
153 let mut warned_stale_live_owner = false;
154
155 loop {
156 if let Some(deadline) = deadline {
157 if Instant::now() >= deadline {
158 return Err(AcquireError::Timeout);
159 }
160 }
161
162 match create_new_lock(path, &hostname, config) {
163 Ok(guard) => return Ok(guard),
164 Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
165 Err(error) => return Err(error.into()),
166 }
167
168 let metadata = match read_lock_metadata(path) {
169 Ok(metadata) => metadata,
170 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
171 Err(ReadLockError::Io(error)) => return Err(error.into()),
172 Err(ReadLockError::Malformed(error)) => {
173 sleep_until_retry(deadline, config.poll_interval_ms)?;
177 match read_lock_metadata(path) {
178 Ok(_) => continue,
179 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
180 continue;
181 }
182 Err(ReadLockError::Io(error)) => return Err(error.into()),
183 Err(ReadLockError::Malformed(_)) => {}
184 }
185 slog_warn!(
186 "removing malformed filesystem lock at {}: {}",
187 path.display(),
188 error
189 );
190 remove_lock_file(path)?;
191 continue;
192 }
193 };
194
195 let now = now_ms();
196 let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);
197
198 if metadata.hostname != hostname {
199 let cross_host_stale_ms = config.cross_host_stale_heartbeat_ms();
200 if since_heartbeat > cross_host_stale_ms {
201 slog_warn!(
202 "reclaiming cross-host filesystem lock at {} from host {} after stale heartbeat ({}ms > {}ms)",
203 path.display(),
204 metadata.hostname,
205 since_heartbeat,
206 cross_host_stale_ms
207 );
208 remove_lock_file(path)?;
209 continue;
210 }
211 sleep_until_retry(deadline, config.poll_interval_ms)?;
212 continue;
213 }
214
215 if !process_alive(metadata.pid) {
216 slog_warn!(
217 "removing filesystem lock at {} from dead PID {}",
218 path.display(),
219 metadata.pid
220 );
221 remove_lock_file(path)?;
222 continue;
223 }
224
225 if since_heartbeat > config.stale_heartbeat_ms && !warned_stale_live_owner {
226 slog_warn!(
232 "filesystem lock at {} held by live PID {} has stale heartbeat ({}ms); NOT breaking",
233 path.display(),
234 metadata.pid,
235 since_heartbeat
236 );
237 warned_stale_live_owner = true;
238 }
239
240 let held_for = now.saturating_sub(metadata.created_at_ms);
241 if held_for > config.live_owner_warn_ms && !warned_live_owner {
242 slog_warn!(
243 "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
244 path.display(),
245 metadata.pid
246 );
247 warned_live_owner = true;
248 }
249
250 sleep_until_retry(deadline, config.poll_interval_ms)?;
251 }
252}
253
254fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
255 let now = now_ms();
256 let metadata = LockMetadata {
257 pid: std::process::id(),
258 hostname: hostname.to_string(),
259 created_at_ms: now,
260 heartbeat_at_ms: now,
261 };
262
263 create_lock_file_atomically(path, &metadata)?;
264
265 let shutdown = Arc::new(AtomicBool::new(false));
266 let (done_tx, done_rx) = mpsc::channel();
267 let heartbeat_path = path.to_path_buf();
268 let heartbeat_metadata = metadata.clone();
269 let heartbeat_shutdown = Arc::clone(&shutdown);
270 let heartbeat = thread::Builder::new()
271 .name("aft-fs-lock-heartbeat".to_string())
272 .spawn(move || {
273 run_heartbeat(
274 heartbeat_path,
275 heartbeat_metadata,
276 heartbeat_shutdown,
277 config,
278 );
279 let _ = done_tx.send(());
280 })?;
281
282 slog_info!("acquired filesystem lock at {}", path.display());
283
284 Ok(LockGuard {
285 path: path.to_path_buf(),
286 metadata,
287 shutdown,
288 heartbeat_done: done_rx,
289 heartbeat: Some(heartbeat),
290 })
291}
292
293fn run_heartbeat(
294 path: PathBuf,
295 owner: LockMetadata,
296 shutdown: Arc<AtomicBool>,
297 config: LockConfig,
298) {
299 loop {
300 thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
301 if shutdown.load(Ordering::Acquire) {
302 return;
303 }
304
305 match heartbeat_once(&path, &owner) {
306 Ok(()) => {}
307 Err(HeartbeatError::LockGone) => {
308 slog_error!(
309 "filesystem lock at {} disappeared; stopping heartbeat",
310 path.display()
311 );
312 return;
313 }
314 Err(HeartbeatError::NotOwner) => {
315 slog_error!(
316 "filesystem lock at {} is no longer owned by this guard; stopping heartbeat",
317 path.display()
318 );
319 return;
320 }
321 Err(HeartbeatError::Malformed(error)) => {
322 slog_error!(
323 "filesystem lock at {} became malformed: {}; stopping heartbeat",
324 path.display(),
325 error
326 );
327 return;
328 }
329 Err(HeartbeatError::Io(error)) => {
330 slog_error!(
331 "failed to heartbeat filesystem lock at {}: {}; stopping heartbeat",
332 path.display(),
333 error
334 );
335 return;
336 }
337 }
338 }
339}
340
341fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
342 let mut metadata = match read_lock_metadata(path) {
343 Ok(metadata) => metadata,
344 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
345 return Err(HeartbeatError::LockGone);
346 }
347 Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
348 Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
349 };
350
351 if metadata.pid != owner.pid
352 || metadata.hostname != owner.hostname
353 || metadata.created_at_ms != owner.created_at_ms
354 {
355 return Err(HeartbeatError::NotOwner);
356 }
357
358 metadata.heartbeat_at_ms = now_ms();
359 atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
360}
361
362#[derive(Debug)]
363enum HeartbeatError {
364 Io(io::Error),
365 LockGone,
366 Malformed(serde_json::Error),
367 NotOwner,
368}
369
370#[derive(Debug)]
371enum ReadLockError {
372 Io(io::Error),
373 Malformed(serde_json::Error),
374}
375
376fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
377 let bytes = fs::read(path).map_err(ReadLockError::Io)?;
378 serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
379}
380
/// Creates a brand-new file at `path` for writing, requesting mode `0o644`.
///
/// # Errors
///
/// Fails if the file already exists (the file is opened with `create_new`) or if it
/// cannot be created for any other reason.
#[cfg(unix)]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = OpenOptions::new();
    options.write(true);
    options.create_new(true);
    options.mode(0o644);
    options.open(path)
}
391
/// Creates a brand-new file at `path` for writing (platforms without Unix modes).
///
/// # Errors
///
/// Fails if the file already exists (the file is opened with `create_new`) or if it
/// cannot be created for any other reason.
#[cfg(not(unix))]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.write(true);
    options.create_new(true);
    options.open(path)
}
396
397fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
398 serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
399 file.write_all(b"\n")?;
400 file.sync_all()
401}
402
403fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
404 let tmp_path = temp_path_for_lock(path);
405 let result = (|| {
406 let mut file = open_new_lock_file(&tmp_path)?;
407 write_lock_metadata_to_file(&mut file, metadata)?;
408 drop(file);
409
410 fs::hard_link(&tmp_path, path)?;
411 sync_parent(path);
412 Ok(())
413 })();
414
415 let _ = fs::remove_file(&tmp_path);
416 result
417}
418
419fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
420 let tmp_path = temp_path_for_lock(path);
421 let write_result = (|| {
422 let mut file = OpenOptions::new()
423 .write(true)
424 .create_new(true)
425 .open(&tmp_path)?;
426 write_lock_metadata_to_file(&mut file, metadata)?;
427 drop(file);
428
429 rename_over(&tmp_path, path)?;
430 sync_parent(path);
431 Ok(())
432 })();
433
434 if write_result.is_err() {
435 let _ = fs::remove_file(&tmp_path);
436 }
437
438 write_result
439}
440
/// Renames `from` to `to`, replacing `to` if it already exists.
///
/// `std::fs::rename` replaces an existing destination file on every supported
/// platform, so no platform-specific pre-delete is performed. Deleting `to` first
/// would leave a moment in which no lock file exists, allowing another process to
/// create its own lock that the rename would then overwrite.
///
/// # Errors
///
/// Returns the error reported by `std::fs::rename`.
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    fs::rename(from, to)
}
451
452static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
465
466fn temp_path_for_lock(path: &Path) -> PathBuf {
467 let file_name = path
468 .file_name()
469 .and_then(|name| name.to_str())
470 .unwrap_or("lock");
471 let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
472 path.with_file_name(format!(
473 ".{file_name}.tmp.{}.{}.{}",
474 std::process::id(),
475 now_nanos(),
476 seq
477 ))
478}
479
480fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
481 let metadata = match read_lock_metadata(path) {
482 Ok(metadata) => metadata,
483 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
484 return Ok(false);
485 }
486 Err(ReadLockError::Io(error)) => return Err(error),
487 Err(ReadLockError::Malformed(_)) => return Ok(false),
488 };
489
490 if metadata.pid == owner.pid
491 && metadata.hostname == owner.hostname
492 && metadata.created_at_ms == owner.created_at_ms
493 {
494 remove_lock_file(path)?;
495 Ok(true)
496 } else {
497 Ok(false)
498 }
499}
500
/// Deletes the file at `path`, treating "already gone" as success.
///
/// # Errors
///
/// Returns any removal error other than `NotFound`.
fn remove_lock_file(path: &Path) -> io::Result<()> {
    fs::remove_file(path).or_else(|error| {
        if error.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(error)
        }
    })
}
508
509fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
510 let poll = Duration::from_millis(poll_interval_ms);
511 let sleep_for = match deadline {
512 Some(deadline) => {
513 let now = Instant::now();
514 if now >= deadline {
515 return Err(AcquireError::Timeout);
516 }
517 poll.min(deadline.saturating_duration_since(now))
518 }
519 None => poll,
520 };
521 thread::sleep(sleep_for);
522 Ok(())
523}
524
/// Best-effort sync of the directory containing `path`.
///
/// Every failure (no parent, directory cannot be opened, sync fails) is ignored. A
/// bare relative file name such as `my.lock` has an empty parent path, which cannot be
/// opened; it is mapped to `.` so the current directory is synced in that case.
fn sync_parent(path: &Path) {
    let Some(parent) = path.parent() else {
        return;
    };
    let dir_path = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    if let Ok(dir) = File::open(dir_path) {
        let _ = dir.sync_all();
    }
}
532
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}
539
/// Current wall-clock time as nanoseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
fn now_nanos() -> u128 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map_or(Duration::ZERO, |elapsed| elapsed).as_nanos()
}
546
547#[cfg(unix)]
548fn current_hostname() -> String {
549 let mut buffer = [0u8; 256];
550 let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
551 if result == 0 {
552 let len = buffer
553 .iter()
554 .position(|byte| *byte == 0)
555 .unwrap_or(buffer.len());
556 if len > 0 {
557 return String::from_utf8_lossy(&buffer[..len]).into_owned();
558 }
559 }
560
561 std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
562}
563
/// Returns this machine's hostname (Windows).
///
/// Uses the `COMPUTERNAME` environment variable, then `HOSTNAME`, and finally the
/// literal `unknown-host` when neither can be read.
#[cfg(windows)]
fn current_hostname() -> String {
    for key in ["COMPUTERNAME", "HOSTNAME"] {
        if let Ok(name) = std::env::var(key) {
            return name;
        }
    }
    "unknown-host".to_string()
}
570
/// Returns this machine's hostname (platforms that are neither Unix nor Windows).
///
/// Uses the `HOSTNAME` environment variable, or the literal `unknown-host` when it
/// cannot be read.
#[cfg(not(any(unix, windows)))]
fn current_hostname() -> String {
    match std::env::var("HOSTNAME") {
        Ok(name) => name,
        Err(_) => "unknown-host".to_string(),
    }
}
575
576#[cfg(unix)]
577fn process_alive(pid: u32) -> bool {
578 if pid == 0 || pid > i32::MAX as u32 {
579 return false;
580 }
581
582 let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
583 if result == 0 {
584 return true;
585 }
586
587 io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
588}
589
/// Reports whether a process with id `pid` exists on this machine (Windows).
///
/// Runs `tasklist` filtered to `pid` with header-less CSV output. If the command
/// cannot be run or exits unsuccessfully the process is assumed to be alive.
/// Otherwise it is alive only when the output does not contain
/// `No tasks are running` and does contain the quoted pid.
#[cfg(windows)]
fn process_alive(pid: u32) -> bool {
    let filter = format!("PID eq {pid}");
    let listing = std::process::Command::new("tasklist")
        .args(["/FI", &filter, "/FO", "CSV", "/NH"])
        .output();

    let output = match listing {
        Ok(output) if output.status.success() => output,
        // Could not query: err on the side of not breaking someone's lock.
        _ => return true,
    };

    let stdout = String::from_utf8_lossy(&output.stdout);
    let quoted_pid = format!("\"{pid}\"");

    !stdout.contains("No tasks are running") && stdout.contains(&quoted_pid)
}
621
/// Liveness probe for platforms that are neither Unix nor Windows.
///
/// No probe is performed here; every pid is reported as alive, so a lock from this
/// host is never removed on the grounds of a dead owner.
#[cfg(not(any(unix, windows)))]
fn process_alive(_pid: u32) -> bool {
    let assume_alive = true;
    assume_alive
}
626
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};

    /// Configuration with short heartbeat and poll intervals so tests finish quickly.
    fn test_config() -> LockConfig {
        LockConfig {
            poll_interval_ms: 10,
            heartbeat_interval_ms: 25,
            stale_heartbeat_ms: 2_000,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
        }
    }

    /// Returns a fresh temporary directory together with a lock path inside it.
    ///
    /// The directory handle must be kept alive for as long as the path is used.
    fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().expect("create temp dir");
        let lock_path = dir.path().join("test.lock");
        (dir, lock_path)
    }

    /// Writes `metadata` to `path` as if some other owner had created the lock.
    fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
        let mut handle = open_new_lock_file(path).expect("create synthetic lock");
        write_lock_metadata_to_file(&mut handle, metadata).expect("write synthetic lock");
    }

    /// Builds metadata whose heartbeat timestamp equals its creation timestamp.
    fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
        LockMetadata {
            heartbeat_at_ms: created_at_ms,
            created_at_ms,
            hostname,
            pid,
        }
    }

    /// Metadata describing a lock taken right now by this very process on this host.
    fn current_process_metadata() -> LockMetadata {
        let created = now_ms();
        synthetic_metadata(std::process::id(), current_hostname(), created)
    }

    #[test]
    fn acquire_creates_lockfile_and_unlocks_on_drop() {
        let (_dir, lock_path) = test_lock_path();

        let guard = acquire_with_config(&lock_path, None, test_config()).expect("acquire lock");
        let on_disk = read_lock_metadata(&lock_path).expect("read lock metadata");
        assert_eq!(on_disk.pid, std::process::id());
        assert_eq!(on_disk.hostname, current_hostname());
        assert_eq!(on_disk.created_at_ms, guard.metadata.created_at_ms);

        drop(guard);
        assert!(!lock_path.exists());
    }

    #[test]
    fn acquire_serializes_concurrent_callers() {
        let (_dir, lock_path) = test_lock_path();
        let lock_path = Arc::new(lock_path);
        // Two workers plus the main thread meet at the barrier.
        let barrier = Arc::new(Barrier::new(3));
        let inside = Arc::new(AtomicUsize::new(0));
        let entered = Arc::new(AtomicUsize::new(0));
        let max_inside = Arc::new(AtomicUsize::new(0));

        let workers: Vec<_> = (0..2)
            .map(|_| {
                let lock_path = Arc::clone(&lock_path);
                let barrier = Arc::clone(&barrier);
                let inside = Arc::clone(&inside);
                let entered = Arc::clone(&entered);
                let max_inside = Arc::clone(&max_inside);
                thread::spawn(move || {
                    barrier.wait();
                    let guard = acquire_with_config(
                        &lock_path,
                        Some(Duration::from_secs(2)),
                        test_config(),
                    )
                    .expect("thread acquire lock");
                    let already_inside = inside.fetch_add(1, Ordering::SeqCst);
                    assert_eq!(already_inside, 0, "two lock holders overlapped");
                    entered.fetch_add(1, Ordering::SeqCst);
                    max_inside.fetch_max(already_inside + 1, Ordering::SeqCst);
                    thread::sleep(Duration::from_millis(75));
                    inside.fetch_sub(1, Ordering::SeqCst);
                    drop(guard);
                })
            })
            .collect();

        barrier.wait();
        for worker in workers {
            worker.join().expect("join worker");
        }

        assert_eq!(entered.load(Ordering::SeqCst), 2);
        assert_eq!(max_inside.load(Ordering::SeqCst), 1);
        assert!(!lock_path.exists());
    }

    #[test]
    fn heartbeat_updates_lockfile_timestamp() {
        let (_dir, lock_path) = test_lock_path();
        let guard = acquire_with_config(&lock_path, None, test_config()).expect("acquire lock");
        let first_seen = read_lock_metadata(&lock_path)
            .expect("read initial metadata")
            .heartbeat_at_ms;

        // Poll for up to two seconds until the heartbeat timestamp moves forward.
        let give_up_at = Instant::now() + Duration::from_millis(2_000);
        let mut latest = first_seen;
        while Instant::now() < give_up_at {
            thread::sleep(Duration::from_millis(50));
            latest = read_lock_metadata(&lock_path)
                .expect("read updated metadata")
                .heartbeat_at_ms;
            if latest > first_seen {
                break;
            }
        }
        assert!(
            latest > first_seen,
            "heartbeat timestamp did not advance within 2s"
        );
        drop(guard);
    }

    #[test]
    fn dead_pid_lock_is_reclaimed() {
        let (_dir, lock_path) = test_lock_path();
        let abandoned = synthetic_metadata(999_999_999, current_hostname(), now_ms());
        write_synthetic_lock(&lock_path, &abandoned);

        let guard = acquire_with_config(&lock_path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim dead pid lock");
        let on_disk = read_lock_metadata(&lock_path).expect("read reclaimed lock");
        assert_eq!(on_disk.pid, std::process::id());
        drop(guard);
    }

    #[test]
    fn stale_heartbeat_from_live_pid_blocks() {
        let (_dir, lock_path) = test_lock_path();
        let mut held = current_process_metadata();
        held.created_at_ms = now_ms().saturating_sub(60_000);
        held.heartbeat_at_ms = now_ms().saturating_sub(60_000);
        write_synthetic_lock(&lock_path, &held);

        let attempt =
            acquire_with_config(&lock_path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(attempt, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&lock_path).expect("read lock"), held);

        remove_lock_file(&lock_path).expect("cleanup synthetic lock");
    }

    #[test]
    fn healthy_live_owner_blocks() {
        let (_dir, lock_path) = test_lock_path();
        let held = current_process_metadata();
        write_synthetic_lock(&lock_path, &held);

        let attempt =
            acquire_with_config(&lock_path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(attempt, Err(AcquireError::Timeout)));

        remove_lock_file(&lock_path).expect("cleanup synthetic lock");
    }

    #[test]
    fn malformed_lockfile_is_reclaimed() {
        let (_dir, lock_path) = test_lock_path();
        fs::write(&lock_path, b"not valid json").expect("write malformed lock");

        let guard = acquire_with_config(&lock_path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim malformed lock");
        let on_disk = read_lock_metadata(&lock_path).expect("read reclaimed lock");
        assert_eq!(on_disk.pid, std::process::id());
        drop(guard);
    }

    #[test]
    fn cross_host_lock_is_not_stolen_before_extended_stale_threshold() {
        let (_dir, lock_path) = test_lock_path();
        let created = now_ms();
        let foreign = synthetic_metadata(
            std::process::id(),
            format!("{}-other", current_hostname()),
            created,
        );
        write_synthetic_lock(&lock_path, &foreign);

        let attempt =
            acquire_with_config(&lock_path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(attempt, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&lock_path).expect("read lock"), foreign);

        remove_lock_file(&lock_path).expect("cleanup synthetic lock");
    }

    #[test]
    fn stale_cross_host_lock_is_reclaimed_after_extended_threshold() {
        let (_dir, lock_path) = test_lock_path();
        // One second past the cross-host threshold.
        let stale_at =
            now_ms().saturating_sub(test_config().cross_host_stale_heartbeat_ms() + 1_000);
        let foreign = synthetic_metadata(
            std::process::id(),
            format!("{}-other", current_hostname()),
            stale_at,
        );
        write_synthetic_lock(&lock_path, &foreign);

        let guard = acquire_with_config(&lock_path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim stale cross-host lock");
        let reclaimed = read_lock_metadata(&lock_path).expect("read reclaimed lock");
        assert_eq!(reclaimed.hostname, current_hostname());
        assert_ne!(reclaimed.created_at_ms, foreign.created_at_ms);
        drop(guard);
    }

    #[test]
    fn live_owner_over_10min_warns_but_blocks() {
        let (_dir, lock_path) = test_lock_path();
        let mut held = current_process_metadata();
        held.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
        held.heartbeat_at_ms = now_ms();
        write_synthetic_lock(&lock_path, &held);

        let attempt =
            acquire_with_config(&lock_path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(attempt, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&lock_path).expect("read lock"), held);

        remove_lock_file(&lock_path).expect("cleanup synthetic lock");
    }

    #[test]
    fn drop_stops_heartbeat_thread() {
        let (_dir, lock_path) = test_lock_path();
        let guard = acquire_with_config(&lock_path, None, test_config()).expect("acquire lock");
        drop(guard);

        // Wait several heartbeat intervals: a surviving thread would rewrite the file.
        let settle = Duration::from_millis(test_config().heartbeat_interval_ms * 3);
        thread::sleep(settle);
        assert!(
            !lock_path.exists(),
            "heartbeat recreated or kept updating lockfile"
        );
    }
}