1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6 atomic::{AtomicBool, AtomicU64, Ordering},
7 mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
16pub const HEARTBEAT_INTERVAL_MS: u64 = 5_000;
17pub const STALE_HEARTBEAT_MS: u64 = 15_000;
18pub const LIVE_OWNER_WARN_MS: u64 = 600_000;
19pub const POLL_INTERVAL_MS: u64 = 100;
20
21#[derive(Clone, Copy, Debug)]
22struct LockConfig {
23 heartbeat_interval_ms: u64,
24 stale_heartbeat_ms: u64,
25 live_owner_warn_ms: u64,
26 poll_interval_ms: u64,
27}
28
29impl Default for LockConfig {
30 fn default() -> Self {
31 Self {
32 heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
33 stale_heartbeat_ms: STALE_HEARTBEAT_MS,
34 live_owner_warn_ms: LIVE_OWNER_WARN_MS,
35 poll_interval_ms: POLL_INTERVAL_MS,
36 }
37 }
38}
39
40#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
41struct LockMetadata {
42 pid: u32,
43 hostname: String,
44 created_at_ms: u64,
45 heartbeat_at_ms: u64,
46}
47
48pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
53 acquire_with_config(path, None, LockConfig::default())
54}
55
56pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
58 acquire_with_config(path, Some(timeout), LockConfig::default())
59}
60
61pub struct LockGuard {
62 path: PathBuf,
63 metadata: LockMetadata,
64 shutdown: Arc<AtomicBool>,
65 heartbeat_done: mpsc::Receiver<()>,
66 heartbeat: Option<JoinHandle<()>>,
67}
68
69impl Drop for LockGuard {
70 fn drop(&mut self) {
71 self.shutdown.store(true, Ordering::Release);
95 if let Some(handle) = self.heartbeat.take() {
96 handle.thread().unpark();
97 let _ = handle.join();
98 }
99 while self.heartbeat_done.try_recv().is_ok() {}
103
104 match remove_lock_if_owned(&self.path, &self.metadata) {
105 Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
106 Ok(false) => {}
107 Err(error) => slog_warn!(
108 "failed to release filesystem lock at {}: {}",
109 self.path.display(),
110 error
111 ),
112 }
113 }
114}
115
116#[derive(Debug)]
117pub enum AcquireError {
118 Io(io::Error),
119 Timeout,
120}
121
122impl fmt::Display for AcquireError {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 match self {
125 AcquireError::Io(error) => write!(f, "filesystem lock I/O error: {error}"),
126 AcquireError::Timeout => write!(f, "timed out acquiring filesystem lock"),
127 }
128 }
129}
130
131impl std::error::Error for AcquireError {}
132
133impl From<io::Error> for AcquireError {
134 fn from(error: io::Error) -> Self {
135 AcquireError::Io(error)
136 }
137}
138
139fn acquire_with_config(
140 path: &Path,
141 timeout: Option<Duration>,
142 config: LockConfig,
143) -> Result<LockGuard, AcquireError> {
144 let deadline = timeout.map(|timeout| Instant::now() + timeout);
145 let hostname = current_hostname();
146 let mut warned_live_owner = false;
147
148 loop {
149 if let Some(deadline) = deadline {
150 if Instant::now() >= deadline {
151 return Err(AcquireError::Timeout);
152 }
153 }
154
155 match create_new_lock(path, &hostname, config) {
156 Ok(guard) => return Ok(guard),
157 Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
158 Err(error) => return Err(error.into()),
159 }
160
161 let metadata = match read_lock_metadata(path) {
162 Ok(metadata) => metadata,
163 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
164 Err(ReadLockError::Io(error)) => return Err(error.into()),
165 Err(ReadLockError::Malformed(error)) => {
166 sleep_until_retry(deadline, config.poll_interval_ms)?;
170 match read_lock_metadata(path) {
171 Ok(_) => continue,
172 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
173 continue;
174 }
175 Err(ReadLockError::Io(error)) => return Err(error.into()),
176 Err(ReadLockError::Malformed(_)) => {}
177 }
178 slog_warn!(
179 "removing malformed filesystem lock at {}: {}",
180 path.display(),
181 error
182 );
183 remove_lock_file(path)?;
184 continue;
185 }
186 };
187
188 if metadata.hostname != hostname {
189 sleep_until_retry(deadline, config.poll_interval_ms)?;
190 continue;
191 }
192
193 if !process_alive(metadata.pid) {
194 slog_warn!(
195 "removing filesystem lock at {} from dead PID {}",
196 path.display(),
197 metadata.pid
198 );
199 remove_lock_file(path)?;
200 continue;
201 }
202
203 let now = now_ms();
204 let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);
205 if since_heartbeat > config.stale_heartbeat_ms {
206 slog_warn!(
207 "reclaiming filesystem lock at {}: PID {} is alive but heartbeat is stale ({}ms)",
208 path.display(),
209 metadata.pid,
210 since_heartbeat
211 );
212 remove_lock_file(path)?;
213 continue;
214 }
215
216 let held_for = now.saturating_sub(metadata.created_at_ms);
217 if held_for > config.live_owner_warn_ms && !warned_live_owner {
218 slog_warn!(
219 "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
220 path.display(),
221 metadata.pid
222 );
223 warned_live_owner = true;
224 }
225
226 sleep_until_retry(deadline, config.poll_interval_ms)?;
227 }
228}
229
230fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
231 let now = now_ms();
232 let metadata = LockMetadata {
233 pid: std::process::id(),
234 hostname: hostname.to_string(),
235 created_at_ms: now,
236 heartbeat_at_ms: now,
237 };
238
239 create_lock_file_atomically(path, &metadata)?;
240
241 let shutdown = Arc::new(AtomicBool::new(false));
242 let (done_tx, done_rx) = mpsc::channel();
243 let heartbeat_path = path.to_path_buf();
244 let heartbeat_metadata = metadata.clone();
245 let heartbeat_shutdown = Arc::clone(&shutdown);
246 let heartbeat = thread::Builder::new()
247 .name("aft-fs-lock-heartbeat".to_string())
248 .spawn(move || {
249 run_heartbeat(
250 heartbeat_path,
251 heartbeat_metadata,
252 heartbeat_shutdown,
253 config,
254 );
255 let _ = done_tx.send(());
256 })?;
257
258 slog_info!("acquired filesystem lock at {}", path.display());
259
260 Ok(LockGuard {
261 path: path.to_path_buf(),
262 metadata,
263 shutdown,
264 heartbeat_done: done_rx,
265 heartbeat: Some(heartbeat),
266 })
267}
268
269fn run_heartbeat(
270 path: PathBuf,
271 owner: LockMetadata,
272 shutdown: Arc<AtomicBool>,
273 config: LockConfig,
274) {
275 loop {
276 thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
277 if shutdown.load(Ordering::Acquire) {
278 return;
279 }
280
281 match heartbeat_once(&path, &owner) {
282 Ok(()) => {}
283 Err(HeartbeatError::LockGone) => {
284 slog_error!(
285 "filesystem lock at {} disappeared; stopping heartbeat",
286 path.display()
287 );
288 return;
289 }
290 Err(HeartbeatError::NotOwner) => {
291 slog_error!(
292 "filesystem lock at {} is no longer owned by this guard; stopping heartbeat",
293 path.display()
294 );
295 return;
296 }
297 Err(HeartbeatError::Malformed(error)) => {
298 slog_error!(
299 "filesystem lock at {} became malformed: {}; stopping heartbeat",
300 path.display(),
301 error
302 );
303 return;
304 }
305 Err(HeartbeatError::Io(error)) => {
306 slog_error!(
307 "failed to heartbeat filesystem lock at {}: {}; stopping heartbeat",
308 path.display(),
309 error
310 );
311 return;
312 }
313 }
314 }
315}
316
317fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
318 let mut metadata = match read_lock_metadata(path) {
319 Ok(metadata) => metadata,
320 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
321 return Err(HeartbeatError::LockGone);
322 }
323 Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
324 Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
325 };
326
327 if metadata.pid != owner.pid
328 || metadata.hostname != owner.hostname
329 || metadata.created_at_ms != owner.created_at_ms
330 {
331 return Err(HeartbeatError::NotOwner);
332 }
333
334 metadata.heartbeat_at_ms = now_ms();
335 atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
336}
337
338#[derive(Debug)]
339enum HeartbeatError {
340 Io(io::Error),
341 LockGone,
342 Malformed(serde_json::Error),
343 NotOwner,
344}
345
346#[derive(Debug)]
347enum ReadLockError {
348 Io(io::Error),
349 Malformed(serde_json::Error),
350}
351
352fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
353 let bytes = fs::read(path).map_err(ReadLockError::Io)?;
354 serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
355}
356
357#[cfg(unix)]
358fn open_new_lock_file(path: &Path) -> io::Result<File> {
359 use std::os::unix::fs::OpenOptionsExt;
360
361 OpenOptions::new()
362 .write(true)
363 .create_new(true)
364 .mode(0o644)
365 .open(path)
366}
367
368#[cfg(not(unix))]
369fn open_new_lock_file(path: &Path) -> io::Result<File> {
370 OpenOptions::new().write(true).create_new(true).open(path)
371}
372
373fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
374 serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
375 file.write_all(b"\n")?;
376 file.sync_all()
377}
378
379fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
380 let tmp_path = temp_path_for_lock(path);
381 let result = (|| {
382 let mut file = open_new_lock_file(&tmp_path)?;
383 write_lock_metadata_to_file(&mut file, metadata)?;
384 drop(file);
385
386 fs::hard_link(&tmp_path, path)?;
387 sync_parent(path);
388 Ok(())
389 })();
390
391 let _ = fs::remove_file(&tmp_path);
392 result
393}
394
395fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
396 let tmp_path = temp_path_for_lock(path);
397 let write_result = (|| {
398 let mut file = OpenOptions::new()
399 .write(true)
400 .create_new(true)
401 .open(&tmp_path)?;
402 write_lock_metadata_to_file(&mut file, metadata)?;
403 drop(file);
404
405 rename_over(&tmp_path, path)?;
406 sync_parent(path);
407 Ok(())
408 })();
409
410 if write_result.is_err() {
411 let _ = fs::remove_file(&tmp_path);
412 }
413
414 write_result
415}
416
417#[cfg(windows)]
418fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
419 let _ = fs::remove_file(to);
420 fs::rename(from, to)
421}
422
423#[cfg(not(windows))]
424fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
425 fs::rename(from, to)
426}
427
428static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
441
442fn temp_path_for_lock(path: &Path) -> PathBuf {
443 let file_name = path
444 .file_name()
445 .and_then(|name| name.to_str())
446 .unwrap_or("lock");
447 let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
448 path.with_file_name(format!(
449 ".{file_name}.tmp.{}.{}.{}",
450 std::process::id(),
451 now_nanos(),
452 seq
453 ))
454}
455
456fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
457 let metadata = match read_lock_metadata(path) {
458 Ok(metadata) => metadata,
459 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
460 return Ok(false);
461 }
462 Err(ReadLockError::Io(error)) => return Err(error),
463 Err(ReadLockError::Malformed(_)) => return Ok(false),
464 };
465
466 if metadata.pid == owner.pid
467 && metadata.hostname == owner.hostname
468 && metadata.created_at_ms == owner.created_at_ms
469 {
470 remove_lock_file(path)?;
471 Ok(true)
472 } else {
473 Ok(false)
474 }
475}
476
477fn remove_lock_file(path: &Path) -> io::Result<()> {
478 match fs::remove_file(path) {
479 Ok(()) => Ok(()),
480 Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(()),
481 Err(error) => Err(error),
482 }
483}
484
485fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
486 let poll = Duration::from_millis(poll_interval_ms);
487 let sleep_for = match deadline {
488 Some(deadline) => {
489 let now = Instant::now();
490 if now >= deadline {
491 return Err(AcquireError::Timeout);
492 }
493 poll.min(deadline.saturating_duration_since(now))
494 }
495 None => poll,
496 };
497 thread::sleep(sleep_for);
498 Ok(())
499}
500
501fn sync_parent(path: &Path) {
502 if let Some(parent) = path.parent() {
503 if let Ok(dir) = File::open(parent) {
504 let _ = dir.sync_all();
505 }
506 }
507}
508
509fn now_ms() -> u64 {
510 SystemTime::now()
511 .duration_since(UNIX_EPOCH)
512 .unwrap_or(Duration::ZERO)
513 .as_millis() as u64
514}
515
516fn now_nanos() -> u128 {
517 SystemTime::now()
518 .duration_since(UNIX_EPOCH)
519 .unwrap_or(Duration::ZERO)
520 .as_nanos()
521}
522
523#[cfg(unix)]
524fn current_hostname() -> String {
525 let mut buffer = [0u8; 256];
526 let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
527 if result == 0 {
528 let len = buffer
529 .iter()
530 .position(|byte| *byte == 0)
531 .unwrap_or(buffer.len());
532 if len > 0 {
533 return String::from_utf8_lossy(&buffer[..len]).into_owned();
534 }
535 }
536
537 std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
538}
539
540#[cfg(windows)]
541fn current_hostname() -> String {
542 std::env::var("COMPUTERNAME")
543 .or_else(|_| std::env::var("HOSTNAME"))
544 .unwrap_or_else(|_| "unknown-host".to_string())
545}
546
547#[cfg(not(any(unix, windows)))]
548fn current_hostname() -> String {
549 std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
550}
551
552#[cfg(unix)]
553fn process_alive(pid: u32) -> bool {
554 if pid == 0 || pid > i32::MAX as u32 {
555 return false;
556 }
557
558 let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
559 if result == 0 {
560 return true;
561 }
562
563 io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
564}
565
566#[cfg(windows)]
567fn process_alive(pid: u32) -> bool {
568 let filter = format!("PID eq {pid}");
569 let Ok(output) = std::process::Command::new("tasklist")
570 .args(["/FI", &filter, "/FO", "CSV", "/NH"])
571 .output()
572 else {
573 return true;
574 };
575
576 if !output.status.success() {
577 return true;
578 }
579
580 let stdout = String::from_utf8_lossy(&output.stdout);
581
582 if stdout.contains("No tasks are running") {
593 return false;
594 }
595 stdout.contains(&format!("\"{pid}\""))
596}
597
598#[cfg(not(any(unix, windows)))]
599fn process_alive(_pid: u32) -> bool {
600 true
601}
602
603#[cfg(test)]
604mod tests {
605 use super::*;
606 use std::sync::atomic::{AtomicUsize, Ordering};
607 use std::sync::{Arc, Barrier};
608
609 fn test_config() -> LockConfig {
610 LockConfig {
611 heartbeat_interval_ms: 25,
612 stale_heartbeat_ms: 2_000,
613 live_owner_warn_ms: LIVE_OWNER_WARN_MS,
614 poll_interval_ms: 10,
615 }
616 }
617
618 fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
619 let dir = tempfile::tempdir().expect("create temp dir");
620 let path = dir.path().join("test.lock");
621 (dir, path)
622 }
623
624 fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
625 let mut file = open_new_lock_file(path).expect("create synthetic lock");
626 write_lock_metadata_to_file(&mut file, metadata).expect("write synthetic lock");
627 }
628
629 fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
630 LockMetadata {
631 pid,
632 hostname,
633 created_at_ms,
634 heartbeat_at_ms: created_at_ms,
635 }
636 }
637
638 fn current_process_metadata() -> LockMetadata {
639 let now = now_ms();
640 synthetic_metadata(std::process::id(), current_hostname(), now)
641 }
642
643 #[test]
644 fn acquire_creates_lockfile_and_unlocks_on_drop() {
645 let (_dir, path) = test_lock_path();
646
647 let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
648 let metadata = read_lock_metadata(&path).expect("read lock metadata");
649 assert_eq!(metadata.pid, std::process::id());
650 assert_eq!(metadata.hostname, current_hostname());
651 assert_eq!(metadata.created_at_ms, guard.metadata.created_at_ms);
652
653 drop(guard);
654 assert!(!path.exists());
655 }
656
657 #[test]
658 fn acquire_serializes_concurrent_callers() {
659 let (_dir, path) = test_lock_path();
660 let path = Arc::new(path);
661 let barrier = Arc::new(Barrier::new(3));
662 let inside = Arc::new(AtomicUsize::new(0));
663 let entered = Arc::new(AtomicUsize::new(0));
664 let max_inside = Arc::new(AtomicUsize::new(0));
665
666 let mut handles = Vec::new();
667 for _ in 0..2 {
668 let path = Arc::clone(&path);
669 let barrier = Arc::clone(&barrier);
670 let inside = Arc::clone(&inside);
671 let entered = Arc::clone(&entered);
672 let max_inside = Arc::clone(&max_inside);
673 handles.push(thread::spawn(move || {
674 barrier.wait();
675 let guard = acquire_with_config(&path, Some(Duration::from_secs(2)), test_config())
676 .expect("thread acquire lock");
677 let previous = inside.fetch_add(1, Ordering::SeqCst);
678 assert_eq!(previous, 0, "two lock holders overlapped");
679 entered.fetch_add(1, Ordering::SeqCst);
680 max_inside.fetch_max(previous + 1, Ordering::SeqCst);
681 thread::sleep(Duration::from_millis(75));
682 inside.fetch_sub(1, Ordering::SeqCst);
683 drop(guard);
684 }));
685 }
686
687 barrier.wait();
688 for handle in handles {
689 handle.join().expect("join worker");
690 }
691
692 assert_eq!(entered.load(Ordering::SeqCst), 2);
693 assert_eq!(max_inside.load(Ordering::SeqCst), 1);
694 assert!(!path.exists());
695 }
696
697 #[test]
698 fn heartbeat_updates_lockfile_timestamp() {
699 let (_dir, path) = test_lock_path();
700 let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
701 let initial = read_lock_metadata(&path)
702 .expect("read initial metadata")
703 .heartbeat_at_ms;
704
705 let deadline = std::time::Instant::now() + Duration::from_millis(2_000);
714 let mut updated = initial;
715 while std::time::Instant::now() < deadline {
716 thread::sleep(Duration::from_millis(50));
717 updated = read_lock_metadata(&path)
718 .expect("read updated metadata")
719 .heartbeat_at_ms;
720 if updated > initial {
721 break;
722 }
723 }
724 assert!(
725 updated > initial,
726 "heartbeat timestamp did not advance within 2s"
727 );
728 drop(guard);
729 }
730
731 #[test]
732 fn dead_pid_lock_is_reclaimed() {
733 let (_dir, path) = test_lock_path();
734 let metadata = synthetic_metadata(999_999_999, current_hostname(), now_ms());
735 write_synthetic_lock(&path, &metadata);
736
737 let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
738 .expect("reclaim dead pid lock");
739 let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
740 assert_eq!(metadata.pid, std::process::id());
741 drop(guard);
742 }
743
744 #[test]
745 fn stale_heartbeat_lock_is_reclaimed() {
746 let (_dir, path) = test_lock_path();
747 let mut metadata = current_process_metadata();
748 metadata.created_at_ms = now_ms().saturating_sub(60_000);
749 metadata.heartbeat_at_ms = now_ms().saturating_sub(60_000);
750 write_synthetic_lock(&path, &metadata);
751
752 let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
753 .expect("reclaim stale heartbeat lock");
754 let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
755 assert_ne!(reclaimed.created_at_ms, metadata.created_at_ms);
756 drop(guard);
757 }
758
759 #[test]
760 fn healthy_live_owner_blocks() {
761 let (_dir, path) = test_lock_path();
762 let metadata = current_process_metadata();
763 write_synthetic_lock(&path, &metadata);
764
765 let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
766 assert!(matches!(result, Err(AcquireError::Timeout)));
767
768 remove_lock_file(&path).expect("cleanup synthetic lock");
769 }
770
771 #[test]
772 fn malformed_lockfile_is_reclaimed() {
773 let (_dir, path) = test_lock_path();
774 fs::write(&path, b"not valid json").expect("write malformed lock");
775
776 let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
777 .expect("reclaim malformed lock");
778 let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
779 assert_eq!(metadata.pid, std::process::id());
780 drop(guard);
781 }
782
783 #[test]
784 fn cross_host_lock_is_not_stolen() {
785 let (_dir, path) = test_lock_path();
786 let now = now_ms().saturating_sub(60_000);
787 let metadata = LockMetadata {
788 pid: std::process::id(),
789 hostname: format!("{}-other", current_hostname()),
790 created_at_ms: now,
791 heartbeat_at_ms: now,
792 };
793 write_synthetic_lock(&path, &metadata);
794
795 let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
796 assert!(matches!(result, Err(AcquireError::Timeout)));
797 assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);
798
799 remove_lock_file(&path).expect("cleanup synthetic lock");
800 }
801
802 #[test]
803 fn live_owner_over_10min_warns_but_blocks() {
804 let (_dir, path) = test_lock_path();
805 let mut metadata = current_process_metadata();
806 metadata.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
807 metadata.heartbeat_at_ms = now_ms();
808 write_synthetic_lock(&path, &metadata);
809
810 let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
811 assert!(matches!(result, Err(AcquireError::Timeout)));
812 assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);
813
814 remove_lock_file(&path).expect("cleanup synthetic lock");
815 }
816
817 #[test]
818 fn drop_stops_heartbeat_thread() {
819 let (_dir, path) = test_lock_path();
820 let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
821 drop(guard);
822
823 thread::sleep(Duration::from_millis(
824 test_config().heartbeat_interval_ms * 3,
825 ));
826 assert!(
827 !path.exists(),
828 "heartbeat recreated or kept updating lockfile"
829 );
830 }
831}