1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6 atomic::{AtomicBool, AtomicU64, Ordering},
7 mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
/// Default pause, in milliseconds, between two heartbeat rewrites of the lock file.
pub const HEARTBEAT_INTERVAL_MS: u64 = 5_000;
/// Default heartbeat age, in milliseconds, beyond which a heartbeat counts as stale.
///
/// A stale heartbeat from a live same-host owner only produces a warning; locks
/// written by another host are reclaimed once the age exceeds five times this value.
pub const STALE_HEARTBEAT_MS: u64 = 15_000;
/// Default hold time, in milliseconds, after which a waiter logs (once) that a live
/// owner has been holding the lock for a long time. The lock is not broken.
pub const LIVE_OWNER_WARN_MS: u64 = 600_000;
/// Default sleep, in milliseconds, between two acquisition attempts while waiting.
pub const POLL_INTERVAL_MS: u64 = 100;
20
/// Timing knobs for acquisition and heartbeating; all values are in milliseconds.
///
/// The public entry points always use [`LockConfig::default`]; other values are
/// supplied by the tests in this file.
#[derive(Clone, Copy, Debug)]
struct LockConfig {
    /// How long the heartbeat thread parks between two heartbeat writes.
    heartbeat_interval_ms: u64,
    /// Heartbeat age above which a same-host owner is reported as stale.
    stale_heartbeat_ms: u64,
    /// Lock age above which a waiter warns about a long-lived live owner.
    live_owner_warn_ms: u64,
    /// Upper bound on the sleep between two acquisition attempts.
    poll_interval_ms: u64,
}
28
29impl LockConfig {
30 fn cross_host_stale_heartbeat_ms(self) -> u64 {
31 self.stale_heartbeat_ms.saturating_mul(5)
32 }
33}
34
35impl Default for LockConfig {
36 fn default() -> Self {
37 Self {
38 heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
39 stale_heartbeat_ms: STALE_HEARTBEAT_MS,
40 live_owner_warn_ms: LIVE_OWNER_WARN_MS,
41 poll_interval_ms: POLL_INTERVAL_MS,
42 }
43 }
44}
45
/// Contents of a lock file, stored as a single JSON document.
///
/// `pid`, `hostname` and `created_at_ms` together identify the owner; they are the
/// fields compared when deciding whether a lock file still belongs to a guard.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
struct LockMetadata {
    /// Process id of the owner (`std::process::id()` at creation time).
    pid: u32,
    /// Host name of the owner as reported by `current_hostname`.
    hostname: String,
    /// Wall-clock creation time, milliseconds since the Unix epoch.
    created_at_ms: u64,
    /// Wall-clock time of the most recent heartbeat, milliseconds since the Unix epoch.
    heartbeat_at_ms: u64,
}
53
54pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
59 acquire_with_config(path, None, LockConfig::default())
60}
61
62pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
64 acquire_with_config(path, Some(timeout), LockConfig::default())
65}
66
67pub struct LockGuard {
68 path: PathBuf,
69 metadata: LockMetadata,
70 shutdown: Arc<AtomicBool>,
71 heartbeat_done: mpsc::Receiver<()>,
72 heartbeat: Option<JoinHandle<()>>,
73}
74
75impl Drop for LockGuard {
76 fn drop(&mut self) {
77 self.shutdown.store(true, Ordering::Release);
101 if let Some(handle) = self.heartbeat.take() {
102 handle.thread().unpark();
103 let _ = handle.join();
104 }
105 while self.heartbeat_done.try_recv().is_ok() {}
109
110 match remove_lock_if_owned(&self.path, &self.metadata) {
111 Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
112 Ok(false) => {}
113 Err(error) => slog_warn!(
114 "failed to release filesystem lock at {}: {}",
115 self.path.display(),
116 error
117 ),
118 }
119 }
120}
121
/// Reasons an acquisition attempt can fail.
#[derive(Debug)]
pub enum AcquireError {
    /// A filesystem operation on the lock file failed unexpectedly.
    Io(io::Error),
    /// The caller-supplied deadline passed before the lock could be taken.
    Timeout,
}
127
128impl fmt::Display for AcquireError {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 match self {
131 AcquireError::Io(error) => write!(f, "filesystem lock I/O error: {error}"),
132 AcquireError::Timeout => write!(f, "timed out acquiring filesystem lock"),
133 }
134 }
135}
136
// Empty impl: all `Error` methods keep their defaults, so `source()` returns `None`;
// the underlying I/O error is already included in the `Display` text.
impl std::error::Error for AcquireError {}
138
139impl From<io::Error> for AcquireError {
140 fn from(error: io::Error) -> Self {
141 AcquireError::Io(error)
142 }
143}
144
145fn acquire_with_config(
146 path: &Path,
147 timeout: Option<Duration>,
148 config: LockConfig,
149) -> Result<LockGuard, AcquireError> {
150 let deadline = timeout.map(|timeout| Instant::now() + timeout);
151 let hostname = current_hostname();
152 let mut warned_live_owner = false;
153 let mut warned_stale_live_owner = false;
154
155 loop {
156 if let Some(deadline) = deadline {
157 if Instant::now() >= deadline {
158 return Err(AcquireError::Timeout);
159 }
160 }
161
162 match create_new_lock(path, &hostname, config) {
163 Ok(guard) => return Ok(guard),
164 Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
165 Err(error) => return Err(error.into()),
166 }
167
168 let metadata = match read_lock_metadata(path) {
169 Ok(metadata) => metadata,
170 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
171 Err(ReadLockError::Io(error)) => return Err(error.into()),
172 Err(ReadLockError::Malformed(error)) => {
173 sleep_until_retry(deadline, config.poll_interval_ms)?;
177 match read_lock_metadata(path) {
178 Ok(_) => continue,
179 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
180 continue;
181 }
182 Err(ReadLockError::Io(error)) => return Err(error.into()),
183 Err(ReadLockError::Malformed(_)) => {}
184 }
185 slog_warn!(
186 "removing malformed filesystem lock at {}: {}",
187 path.display(),
188 error
189 );
190 remove_lock_file(path)?;
191 continue;
192 }
193 };
194
195 let now = now_ms();
196 let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);
197
198 if metadata.hostname != hostname {
199 let cross_host_stale_ms = config.cross_host_stale_heartbeat_ms();
200 if since_heartbeat > cross_host_stale_ms {
201 slog_warn!(
202 "reclaiming cross-host filesystem lock at {} from host {} after stale heartbeat ({}ms > {}ms)",
203 path.display(),
204 metadata.hostname,
205 since_heartbeat,
206 cross_host_stale_ms
207 );
208 remove_lock_file(path)?;
209 continue;
210 }
211 sleep_until_retry(deadline, config.poll_interval_ms)?;
212 continue;
213 }
214
215 if !process_alive(metadata.pid) {
216 slog_warn!(
217 "removing filesystem lock at {} from dead PID {}",
218 path.display(),
219 metadata.pid
220 );
221 remove_lock_file(path)?;
222 continue;
223 }
224
225 if since_heartbeat > config.stale_heartbeat_ms && !warned_stale_live_owner {
226 slog_warn!(
232 "filesystem lock at {} held by live PID {} has stale heartbeat ({}ms); NOT breaking",
233 path.display(),
234 metadata.pid,
235 since_heartbeat
236 );
237 warned_stale_live_owner = true;
238 }
239
240 let held_for = now.saturating_sub(metadata.created_at_ms);
241 if held_for > config.live_owner_warn_ms && !warned_live_owner {
242 slog_warn!(
243 "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
244 path.display(),
245 metadata.pid
246 );
247 warned_live_owner = true;
248 }
249
250 sleep_until_retry(deadline, config.poll_interval_ms)?;
251 }
252}
253
254fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
255 let now = now_ms();
256 let metadata = LockMetadata {
257 pid: std::process::id(),
258 hostname: hostname.to_string(),
259 created_at_ms: now,
260 heartbeat_at_ms: now,
261 };
262
263 create_lock_file_atomically(path, &metadata)?;
264
265 let shutdown = Arc::new(AtomicBool::new(false));
266 let (done_tx, done_rx) = mpsc::channel();
267 let heartbeat_path = path.to_path_buf();
268 let heartbeat_metadata = metadata.clone();
269 let heartbeat_shutdown = Arc::clone(&shutdown);
270 let heartbeat = thread::Builder::new()
271 .name("aft-fs-lock-heartbeat".to_string())
272 .spawn(move || {
273 run_heartbeat(
274 heartbeat_path,
275 heartbeat_metadata,
276 heartbeat_shutdown,
277 config,
278 );
279 let _ = done_tx.send(());
280 })?;
281
282 slog_info!("acquired filesystem lock at {}", path.display());
283
284 Ok(LockGuard {
285 path: path.to_path_buf(),
286 metadata,
287 shutdown,
288 heartbeat_done: done_rx,
289 heartbeat: Some(heartbeat),
290 })
291}
292
293fn run_heartbeat(
294 path: PathBuf,
295 owner: LockMetadata,
296 shutdown: Arc<AtomicBool>,
297 config: LockConfig,
298) {
299 let stale_intervals = config
304 .stale_heartbeat_ms
305 .checked_div(config.heartbeat_interval_ms.max(1))
306 .unwrap_or(3)
307 .max(1);
308 let mut consecutive_transient_failures: u64 = 0;
309
310 loop {
311 thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
312 if shutdown.load(Ordering::Acquire) {
313 return;
314 }
315
316 match heartbeat_once(&path, &owner) {
317 Ok(()) => {
318 if consecutive_transient_failures > 0 {
319 slog_info!(
320 "filesystem lock at {} heartbeat recovered after {} transient failure(s)",
321 path.display(),
322 consecutive_transient_failures
323 );
324 consecutive_transient_failures = 0;
325 }
326 }
327 Err(error) if heartbeat_error_is_terminal(&error) => {
328 slog_error!(
333 "{}; stopping heartbeat",
334 terminal_heartbeat_message(&path, &error)
335 );
336 return;
337 }
338 Err(error) => {
339 consecutive_transient_failures += 1;
349 log_transient_heartbeat_failure(
350 &path,
351 &transient_heartbeat_reason(&error),
352 consecutive_transient_failures,
353 stale_intervals,
354 );
355 }
356 }
357 }
358}
359
360fn heartbeat_error_is_terminal(error: &HeartbeatError) -> bool {
366 matches!(error, HeartbeatError::LockGone | HeartbeatError::NotOwner)
367}
368
369fn terminal_heartbeat_message(path: &Path, error: &HeartbeatError) -> String {
370 match error {
371 HeartbeatError::LockGone => {
372 format!("filesystem lock at {} disappeared", path.display())
373 }
374 HeartbeatError::NotOwner => format!(
375 "filesystem lock at {} is no longer owned by this guard",
376 path.display()
377 ),
378 HeartbeatError::Io(error) => {
380 format!("filesystem lock at {} I/O error: {error}", path.display())
381 }
382 HeartbeatError::Malformed(error) => {
383 format!(
384 "filesystem lock at {} became malformed: {error}",
385 path.display()
386 )
387 }
388 }
389}
390
391fn transient_heartbeat_reason(error: &HeartbeatError) -> String {
392 match error {
393 HeartbeatError::Io(error) => format!("I/O error: {error}"),
394 HeartbeatError::Malformed(error) => format!("became malformed: {error}"),
395 HeartbeatError::LockGone => "lock disappeared".to_string(),
396 HeartbeatError::NotOwner => "lock no longer owned".to_string(),
397 }
398}
399
400fn log_transient_heartbeat_failure(
405 path: &Path,
406 reason: &str,
407 consecutive_failures: u64,
408 stale_intervals: u64,
409) {
410 if consecutive_failures < stale_intervals {
411 slog_warn!(
412 "transient failure to heartbeat filesystem lock at {}: {}; retrying (attempt {})",
413 path.display(),
414 reason,
415 consecutive_failures
416 );
417 } else if consecutive_failures == stale_intervals {
418 slog_error!(
419 "filesystem lock at {} has failed {} consecutive heartbeats: {}; \
420 the lock may now be reclaimed by another owner — continuing to retry",
421 path.display(),
422 consecutive_failures,
423 reason
424 );
425 }
426}
427
428fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
429 let mut metadata = match read_lock_metadata(path) {
430 Ok(metadata) => metadata,
431 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
432 return Err(HeartbeatError::LockGone);
433 }
434 Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
435 Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
436 };
437
438 if metadata.pid != owner.pid
439 || metadata.hostname != owner.hostname
440 || metadata.created_at_ms != owner.created_at_ms
441 {
442 return Err(HeartbeatError::NotOwner);
443 }
444
445 metadata.heartbeat_at_ms = now_ms();
446 atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
447}
448
/// Outcome of a failed heartbeat attempt.
///
/// `LockGone` and `NotOwner` are classified as terminal by
/// `heartbeat_error_is_terminal`; `Io` and `Malformed` are retried.
#[derive(Debug)]
enum HeartbeatError {
    /// Reading or rewriting the lock file failed (other than "not found").
    Io(io::Error),
    /// The lock file no longer exists.
    LockGone,
    /// The lock file exists but could not be parsed as metadata JSON.
    Malformed(serde_json::Error),
    /// The lock file's pid, hostname or creation time differ from this guard's.
    NotOwner,
}
456
/// Failure modes of `read_lock_metadata`.
#[derive(Debug)]
enum ReadLockError {
    /// The file could not be read (including `NotFound`).
    Io(io::Error),
    /// The file was read but its contents are not valid metadata JSON.
    Malformed(serde_json::Error),
}
462
463fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
464 let bytes = fs::read(path).map_err(ReadLockError::Io)?;
465 serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
466}
467
/// Creates `path` exclusively for writing, requesting mode `0o644` (unix variant).
///
/// # Errors
///
/// Fails with `io::ErrorKind::AlreadyExists` if the path already exists, or with
/// any other error reported by the open call.
#[cfg(unix)]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = OpenOptions::new();
    options.write(true);
    options.create_new(true);
    options.mode(0o644);
    options.open(path)
}
478
/// Creates `path` exclusively for writing (variant for non-unix targets, where no
/// permission mode is set).
///
/// # Errors
///
/// Fails with `io::ErrorKind::AlreadyExists` if the path already exists, or with
/// any other error reported by the open call.
#[cfg(not(unix))]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.write(true);
    options.create_new(true);
    options.open(path)
}
483
484fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
485 serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
486 file.write_all(b"\n")?;
487 file.sync_all()
488}
489
490fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
491 let tmp_path = temp_path_for_lock(path);
492 let result = (|| {
493 let mut file = open_new_lock_file(&tmp_path)?;
494 write_lock_metadata_to_file(&mut file, metadata)?;
495 drop(file);
496
497 fs::hard_link(&tmp_path, path)?;
498 sync_parent(path);
499 Ok(())
500 })();
501
502 let _ = fs::remove_file(&tmp_path);
503 result
504}
505
506fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
507 let tmp_path = temp_path_for_lock(path);
508 let write_result = (|| {
509 let mut file = OpenOptions::new()
510 .write(true)
511 .create_new(true)
512 .open(&tmp_path)?;
513 write_lock_metadata_to_file(&mut file, metadata)?;
514 drop(file);
515
516 rename_over(&tmp_path, path)?;
517 sync_parent(path);
518 Ok(())
519 })();
520
521 if write_result.is_err() {
522 let _ = fs::remove_file(&tmp_path);
523 }
524
525 write_result
526}
527
/// Renames `from` to `to`, replacing `to` if it already exists.
///
/// `std::fs::rename` replaces an existing destination file on every supported
/// platform, so no platform-specific variant is needed. In particular the
/// destination must NOT be deleted first: that would open a window in which the
/// lock file does not exist and another process could create its own lock, which
/// the subsequent rename would then overwrite.
///
/// # Errors
///
/// Returns the error from the underlying rename (for example `NotFound` when
/// `from` does not exist).
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    fs::rename(from, to)
}
538
/// Process-wide sequence number mixed into temporary file names by
/// `temp_path_for_lock`, so two names generated by this process differ even when
/// the pid and timestamp components are equal.
static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
552
553fn temp_path_for_lock(path: &Path) -> PathBuf {
554 let file_name = path
555 .file_name()
556 .and_then(|name| name.to_str())
557 .unwrap_or("lock");
558 let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
559 path.with_file_name(format!(
560 ".{file_name}.tmp.{}.{}.{}",
561 std::process::id(),
562 now_nanos(),
563 seq
564 ))
565}
566
567fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
568 let metadata = match read_lock_metadata(path) {
569 Ok(metadata) => metadata,
570 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
571 return Ok(false);
572 }
573 Err(ReadLockError::Io(error)) => return Err(error),
574 Err(ReadLockError::Malformed(_)) => return Ok(false),
575 };
576
577 if metadata.pid == owner.pid
578 && metadata.hostname == owner.hostname
579 && metadata.created_at_ms == owner.created_at_ms
580 {
581 remove_lock_file(path)?;
582 Ok(true)
583 } else {
584 Ok(false)
585 }
586}
587
/// Removes the file at `path`, treating "already gone" as success.
///
/// # Errors
///
/// Any removal error other than `io::ErrorKind::NotFound`.
fn remove_lock_file(path: &Path) -> io::Result<()> {
    fs::remove_file(path).or_else(|error| {
        if error.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(error)
        }
    })
}
595
596fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
597 let poll = Duration::from_millis(poll_interval_ms);
598 let sleep_for = match deadline {
599 Some(deadline) => {
600 let now = Instant::now();
601 if now >= deadline {
602 return Err(AcquireError::Timeout);
603 }
604 poll.min(deadline.saturating_duration_since(now))
605 }
606 None => poll,
607 };
608 thread::sleep(sleep_for);
609 Ok(())
610}
611
/// Best-effort sync of the directory containing `path`, so that a preceding
/// create, link or rename of `path` reaches disk. All errors are ignored.
///
/// `Path::parent` returns `Some("")` for a bare relative file name such as
/// `"app.lock"`. Opening the empty path always fails, which would silently skip
/// the sync, so the empty parent is mapped to the current directory (`"."`).
fn sync_parent(path: &Path) {
    let Some(parent) = path.parent() else {
        // Root or empty path: there is no containing directory to sync.
        return;
    };

    let dir_path = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };

    if let Ok(dir) = File::open(dir_path) {
        let _ = dir.sync_all();
    }
}
619
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock is set before the epoch. The `u128` millisecond
/// count is converted with a checked conversion that saturates at `u64::MAX`
/// instead of silently truncating with `as`.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
626
/// Current wall-clock time in nanoseconds since the Unix epoch, or 0 if the system
/// clock is set before the epoch.
fn now_nanos() -> u128 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_nanos()
}
633
634#[cfg(unix)]
635fn current_hostname() -> String {
636 let mut buffer = [0u8; 256];
637 let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
638 if result == 0 {
639 let len = buffer
640 .iter()
641 .position(|byte| *byte == 0)
642 .unwrap_or(buffer.len());
643 if len > 0 {
644 return String::from_utf8_lossy(&buffer[..len]).into_owned();
645 }
646 }
647
648 std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
649}
650
/// Returns this machine's host name (windows variant): the first of the
/// `COMPUTERNAME` and `HOSTNAME` environment variables that can be read, or
/// `"unknown-host"` when neither is available.
#[cfg(windows)]
fn current_hostname() -> String {
    ["COMPUTERNAME", "HOSTNAME"]
        .iter()
        .find_map(|key| std::env::var(key).ok())
        .unwrap_or_else(|| "unknown-host".to_string())
}
657
/// Returns this machine's host name on targets that are neither unix nor windows:
/// the `HOSTNAME` environment variable, or `"unknown-host"` when it cannot be read.
#[cfg(not(any(unix, windows)))]
fn current_hostname() -> String {
    match std::env::var("HOSTNAME") {
        Ok(name) => name,
        Err(_) => "unknown-host".to_string(),
    }
}
662
663#[cfg(unix)]
664fn process_alive(pid: u32) -> bool {
665 if pid == 0 || pid > i32::MAX as u32 {
666 return false;
667 }
668
669 let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
670 if result == 0 {
671 return true;
672 }
673
674 io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
675}
676
/// Reports whether a process with id `pid` exists on this host (windows variant).
///
/// Runs `tasklist` filtered by PID with header-less CSV output and looks for the
/// quoted PID in the result. If `tasklist` cannot be run or exits unsuccessfully,
/// the process is assumed to be alive so that a lock is never broken by mistake.
#[cfg(windows)]
fn process_alive(pid: u32) -> bool {
    let filter = format!("PID eq {pid}");
    let listing = std::process::Command::new("tasklist")
        .args(["/FI", &filter, "/FO", "CSV", "/NH"])
        .output();

    let output = match listing {
        Ok(output) if output.status.success() => output,
        // Could not determine liveness: err on the side of "alive".
        _ => return true,
    };

    let stdout = String::from_utf8_lossy(&output.stdout);
    let no_match_reported = stdout.contains("No tasks are running");
    !no_match_reported && stdout.contains(&format!("\"{pid}\""))
}
708
/// Fallback for targets that are neither unix nor windows: no liveness probe is
/// implemented here, so every PID is reported as alive and a same-host lock is
/// never removed on the grounds of a dead owner.
#[cfg(not(any(unix, windows)))]
fn process_alive(_pid: u32) -> bool {
    true
}
713
/// Tests for the filesystem lock: acquisition, release, heartbeating and the
/// rules for reclaiming locks left behind by other owners.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};

    /// Configuration with short heartbeat and poll intervals so tests run quickly.
    fn test_config() -> LockConfig {
        LockConfig {
            heartbeat_interval_ms: 25,
            stale_heartbeat_ms: 2_000,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
            poll_interval_ms: 10,
        }
    }

    /// Returns a fresh temp directory (kept alive by the caller) and a lock path in it.
    fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().expect("create temp dir");
        let path = dir.path().join("test.lock");
        (dir, path)
    }

    /// Writes a lock file directly, without a guard or heartbeat thread behind it.
    fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
        let mut file = open_new_lock_file(path).expect("create synthetic lock");
        write_lock_metadata_to_file(&mut file, metadata).expect("write synthetic lock");
    }

    /// Builds metadata whose heartbeat time equals its creation time.
    fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
        LockMetadata {
            pid,
            hostname,
            created_at_ms,
            heartbeat_at_ms: created_at_ms,
        }
    }

    /// Metadata describing the test process itself, i.e. a live same-host owner.
    fn current_process_metadata() -> LockMetadata {
        let now = now_ms();
        synthetic_metadata(std::process::id(), current_hostname(), now)
    }

    /// Acquiring writes our identity to the lock file; dropping the guard removes it.
    #[test]
    fn acquire_creates_lockfile_and_unlocks_on_drop() {
        let (_dir, path) = test_lock_path();

        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let metadata = read_lock_metadata(&path).expect("read lock metadata");
        assert_eq!(metadata.pid, std::process::id());
        assert_eq!(metadata.hostname, current_hostname());
        assert_eq!(metadata.created_at_ms, guard.metadata.created_at_ms);

        drop(guard);
        assert!(!path.exists());
    }

    /// Two threads racing for the same lock must never be inside the critical
    /// section at the same time, and both must eventually get in.
    #[test]
    fn acquire_serializes_concurrent_callers() {
        let (_dir, path) = test_lock_path();
        let path = Arc::new(path);
        // Two workers plus the main thread release together.
        let barrier = Arc::new(Barrier::new(3));
        let inside = Arc::new(AtomicUsize::new(0));
        let entered = Arc::new(AtomicUsize::new(0));
        let max_inside = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        for _ in 0..2 {
            let path = Arc::clone(&path);
            let barrier = Arc::clone(&barrier);
            let inside = Arc::clone(&inside);
            let entered = Arc::clone(&entered);
            let max_inside = Arc::clone(&max_inside);
            handles.push(thread::spawn(move || {
                barrier.wait();
                let guard = acquire_with_config(&path, Some(Duration::from_secs(2)), test_config())
                    .expect("thread acquire lock");
                let previous = inside.fetch_add(1, Ordering::SeqCst);
                assert_eq!(previous, 0, "two lock holders overlapped");
                entered.fetch_add(1, Ordering::SeqCst);
                max_inside.fetch_max(previous + 1, Ordering::SeqCst);
                // Hold the lock long enough for the other thread to contend.
                thread::sleep(Duration::from_millis(75));
                inside.fetch_sub(1, Ordering::SeqCst);
                drop(guard);
            }));
        }

        barrier.wait();
        for handle in handles {
            handle.join().expect("join worker");
        }

        assert_eq!(entered.load(Ordering::SeqCst), 2);
        assert_eq!(max_inside.load(Ordering::SeqCst), 1);
        assert!(!path.exists());
    }

    /// While a guard is alive, the heartbeat timestamp in the file must advance.
    #[test]
    fn heartbeat_updates_lockfile_timestamp() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let initial = read_lock_metadata(&path)
            .expect("read initial metadata")
            .heartbeat_at_ms;

        // Poll instead of sleeping a fixed time so slow machines do not flake.
        let deadline = std::time::Instant::now() + Duration::from_millis(2_000);
        let mut updated = initial;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(50));
            match read_lock_metadata(&path) {
                Ok(meta) => {
                    updated = meta.heartbeat_at_ms;
                    if updated > initial {
                        break;
                    }
                }
                // Tolerate a momentarily missing file and simply poll again.
                Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
                    continue;
                }
                Err(other) => panic!("read updated metadata: {other:?}"),
            }
        }
        assert!(
            updated > initial,
            "heartbeat timestamp did not advance within 2s"
        );
        drop(guard);
    }

    /// A same-host lock whose owner PID is not alive is removed and re-acquired.
    #[test]
    fn dead_pid_lock_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        // A PID chosen to be extremely unlikely to belong to a running process.
        let metadata = synthetic_metadata(999_999_999, current_hostname(), now_ms());
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim dead pid lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    /// A live same-host owner keeps the lock even when its heartbeat is stale.
    #[test]
    fn stale_heartbeat_from_live_pid_blocks() {
        let (_dir, path) = test_lock_path();
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(60_000);
        metadata.heartbeat_at_ms = now_ms().saturating_sub(60_000);
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        // The lock file must be left exactly as it was.
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// A live same-host owner with a fresh heartbeat blocks until the timeout.
    #[test]
    fn healthy_live_owner_blocks() {
        let (_dir, path) = test_lock_path();
        let metadata = current_process_metadata();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// A lock file that stays unparseable is removed and the lock is acquired.
    #[test]
    fn malformed_lockfile_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        fs::write(&path, b"not valid json").expect("write malformed lock");

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim malformed lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    /// A lock from another host with a fresh heartbeat must not be taken over.
    #[test]
    fn cross_host_lock_is_not_stolen_before_extended_stale_threshold() {
        let (_dir, path) = test_lock_path();
        let now = now_ms();
        let metadata = LockMetadata {
            pid: std::process::id(),
            // Any host name different from ours makes this a cross-host lock.
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: now,
            heartbeat_at_ms: now,
        };
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// A lock from another host is reclaimed once its heartbeat is older than the
    /// extended cross-host threshold.
    #[test]
    fn stale_cross_host_lock_is_reclaimed_after_extended_threshold() {
        let (_dir, path) = test_lock_path();
        let stale_at =
            now_ms().saturating_sub(test_config().cross_host_stale_heartbeat_ms() + 1_000);
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: stale_at,
            heartbeat_at_ms: stale_at,
        };
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim stale cross-host lock");
        let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(reclaimed.hostname, current_hostname());
        assert_ne!(reclaimed.created_at_ms, metadata.created_at_ms);
        drop(guard);
    }

    /// A lock held for more than the warn threshold by a live, heartbeating owner
    /// is still respected.
    #[test]
    fn live_owner_over_10min_warns_but_blocks() {
        let (_dir, path) = test_lock_path();
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
        metadata.heartbeat_at_ms = now_ms();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    /// After the guard is dropped, nothing may recreate or keep writing the lock file.
    #[test]
    fn drop_stops_heartbeat_thread() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        drop(guard);

        // Wait several heartbeat intervals; a surviving thread would have written by now.
        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 3,
        ));
        assert!(
            !path.exists(),
            "heartbeat recreated or kept updating lockfile"
        );
    }

    /// Pins which heartbeat errors stop the thread and which are retried.
    #[test]
    fn heartbeat_error_classification_terminal_vs_transient() {
        assert!(heartbeat_error_is_terminal(&HeartbeatError::LockGone));
        assert!(heartbeat_error_is_terminal(&HeartbeatError::NotOwner));
        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Io(
            io::Error::other("disk blip")
        )));
        let malformed: serde_json::Error =
            serde_json::from_str::<LockMetadata>("not json").unwrap_err();
        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Malformed(
            malformed
        )));
    }

    /// Corrupting the lock file must not kill the heartbeat thread: once valid
    /// metadata is restored, heartbeats resume.
    #[test]
    fn heartbeat_survives_transient_malformed_and_recovers() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let owner = guard.metadata.clone();

        fs::write(&path, b"{ not valid json").expect("corrupt lockfile");

        // Let the heartbeat thread hit the malformed file a few times.
        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 4,
        ));

        // Restore the owner's metadata with an old heartbeat value, so any later
        // heartbeat is recognisable as strictly newer than the sentinel.
        let sentinel = now_ms().saturating_sub(1_000_000);
        let mut restored = owner.clone();
        restored.heartbeat_at_ms = sentinel;
        atomic_write_lock_metadata(&path, &restored).expect("atomically restore lock metadata");

        let deadline = std::time::Instant::now() + Duration::from_millis(3_000);
        let mut recovered = false;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(25));
            match read_lock_metadata(&path) {
                Ok(meta)
                    if meta.created_at_ms == owner.created_at_ms
                        && meta.heartbeat_at_ms > sentinel =>
                {
                    recovered = true;
                    break;
                }
                _ => continue,
            }
        }
        assert!(
            recovered,
            "heartbeat did not recover after a transient malformed read — thread likely died"
        );
        drop(guard);
    }
}