1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6 atomic::{AtomicBool, AtomicU64, Ordering},
7 mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
/// Default delay, in milliseconds, between heartbeat rewrites of a held lockfile.
pub const HEARTBEAT_INTERVAL_MS: u64 = 5 * 1_000;
/// Default heartbeat age, in milliseconds, past which a same-host live owner is
/// reported as stale (it is still never broken).
pub const STALE_HEARTBEAT_MS: u64 = 15 * 1_000;
/// Default lock age, in milliseconds, past which a one-time warning about a
/// long-held lock is logged.
pub const LIVE_OWNER_WARN_MS: u64 = 10 * 60 * 1_000;
/// Default upper bound, in milliseconds, on each sleep between acquisition attempts.
pub const POLL_INTERVAL_MS: u64 = 100;

/// Timing knobs for lock acquisition and heartbeating; every field is in milliseconds.
#[derive(Clone, Copy, Debug)]
struct LockConfig {
    /// Delay between heartbeat rewrites performed by the heartbeat thread.
    heartbeat_interval_ms: u64,
    /// Heartbeat age beyond which a same-host live owner is reported as stale.
    stale_heartbeat_ms: u64,
    /// Lock age beyond which a one-time "held for a long time" warning is logged.
    live_owner_warn_ms: u64,
    /// Upper bound on each sleep between acquisition attempts.
    poll_interval_ms: u64,
}

impl LockConfig {
    /// Multiplier applied to `stale_heartbeat_ms` for locks owned by another host.
    const CROSS_HOST_STALE_FACTOR: u64 = 5;

    /// Heartbeat age after which a lock held by a *different* host may be reclaimed.
    ///
    /// Cross-host owners are judged by heartbeat age alone, so they get a longer
    /// grace period. The product saturates at `u64::MAX` instead of overflowing.
    fn cross_host_stale_heartbeat_ms(self) -> u64 {
        let factor = Self::CROSS_HOST_STALE_FACTOR;
        self.stale_heartbeat_ms.saturating_mul(factor)
    }
}

impl Default for LockConfig {
    /// Builds a configuration from the crate-level `*_MS` constants.
    fn default() -> Self {
        Self {
            poll_interval_ms: POLL_INTERVAL_MS,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
            stale_heartbeat_ms: STALE_HEARTBEAT_MS,
            heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
        }
    }
}
45
/// Contents of a lockfile, stored on disk as JSON.
///
/// `pid`, `hostname`, and `created_at_ms` together identify one acquisition and
/// are what ownership checks compare. `heartbeat_at_ms` is refreshed by the
/// owner's heartbeat thread.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
struct LockMetadata {
    /// Process ID of the owning process.
    pid: u32,
    /// Hostname of the machine the owning process runs on.
    hostname: String,
    /// Wall-clock creation time, in milliseconds since the Unix epoch.
    created_at_ms: u64,
    /// Wall-clock time of the most recent heartbeat, in milliseconds since the Unix epoch.
    heartbeat_at_ms: u64,
}
53
54pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
59 acquire_with_config(path, None, LockConfig::default())
60}
61
62pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
64 acquire_with_config(path, Some(timeout), LockConfig::default())
65}
66
67pub struct LockGuard {
68 path: PathBuf,
69 metadata: LockMetadata,
70 shutdown: Arc<AtomicBool>,
71 heartbeat_done: mpsc::Receiver<()>,
72 heartbeat: Option<JoinHandle<()>>,
73}
74
75impl Drop for LockGuard {
76 fn drop(&mut self) {
77 self.shutdown.store(true, Ordering::Release);
101 if let Some(handle) = self.heartbeat.take() {
102 handle.thread().unpark();
103 let _ = handle.join();
104 }
105 while self.heartbeat_done.try_recv().is_ok() {}
109
110 match remove_lock_if_owned(&self.path, &self.metadata) {
111 Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
112 Ok(false) => {}
113 Err(error) => slog_warn!(
114 "failed to release filesystem lock at {}: {}",
115 self.path.display(),
116 error
117 ),
118 }
119 }
120}
121
/// Errors returned by [`acquire`] and [`try_acquire`].
#[derive(Debug)]
pub enum AcquireError {
    /// An unexpected filesystem error occurred while acquiring the lock.
    Io(io::Error),
    /// The timeout given to [`try_acquire`] elapsed before the lock was acquired.
    Timeout,
}

impl fmt::Display for AcquireError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Timeout => f.write_str("timed out acquiring filesystem lock"),
            Self::Io(source) => write!(f, "filesystem lock I/O error: {}", source),
        }
    }
}

impl std::error::Error for AcquireError {}

impl From<io::Error> for AcquireError {
    /// Wraps an I/O error so `?` can be used inside the acquisition loop.
    fn from(source: io::Error) -> Self {
        Self::Io(source)
    }
}
144
145fn acquire_with_config(
146 path: &Path,
147 timeout: Option<Duration>,
148 config: LockConfig,
149) -> Result<LockGuard, AcquireError> {
150 let deadline = timeout.map(|timeout| Instant::now() + timeout);
151 let hostname = current_hostname();
152 let mut warned_live_owner = false;
153 let mut warned_stale_live_owner = false;
154
155 loop {
156 if let Some(deadline) = deadline {
157 if Instant::now() >= deadline {
158 return Err(AcquireError::Timeout);
159 }
160 }
161
162 match create_new_lock(path, &hostname, config) {
163 Ok(guard) => return Ok(guard),
164 Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
165 Err(error) => return Err(error.into()),
166 }
167
168 let metadata = match read_lock_metadata(path) {
169 Ok(metadata) => metadata,
170 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
171 Err(ReadLockError::Io(error)) => return Err(error.into()),
172 Err(ReadLockError::Malformed(error)) => {
173 sleep_until_retry(deadline, config.poll_interval_ms)?;
177 match read_lock_metadata(path) {
178 Ok(_) => continue,
179 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
180 continue;
181 }
182 Err(ReadLockError::Io(error)) => return Err(error.into()),
183 Err(ReadLockError::Malformed(_)) => {}
184 }
185 slog_warn!(
186 "removing malformed filesystem lock at {}: {}",
187 path.display(),
188 error
189 );
190 remove_lock_file(path)?;
191 continue;
192 }
193 };
194
195 let now = now_ms();
196 let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);
197
198 if metadata.hostname != hostname {
199 let cross_host_stale_ms = config.cross_host_stale_heartbeat_ms();
200 if since_heartbeat > cross_host_stale_ms {
201 slog_warn!(
202 "reclaiming cross-host filesystem lock at {} from host {} after stale heartbeat ({}ms > {}ms)",
203 path.display(),
204 metadata.hostname,
205 since_heartbeat,
206 cross_host_stale_ms
207 );
208 remove_lock_file(path)?;
209 continue;
210 }
211 sleep_until_retry(deadline, config.poll_interval_ms)?;
212 continue;
213 }
214
215 if !process_alive(metadata.pid) {
216 slog_warn!(
217 "removing filesystem lock at {} from dead PID {}",
218 path.display(),
219 metadata.pid
220 );
221 remove_lock_file(path)?;
222 continue;
223 }
224
225 if since_heartbeat > config.stale_heartbeat_ms && !warned_stale_live_owner {
226 slog_warn!(
232 "filesystem lock at {} held by live PID {} has stale heartbeat ({}ms); NOT breaking",
233 path.display(),
234 metadata.pid,
235 since_heartbeat
236 );
237 warned_stale_live_owner = true;
238 }
239
240 let held_for = now.saturating_sub(metadata.created_at_ms);
241 if held_for > config.live_owner_warn_ms && !warned_live_owner {
242 slog_warn!(
243 "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
244 path.display(),
245 metadata.pid
246 );
247 warned_live_owner = true;
248 }
249
250 sleep_until_retry(deadline, config.poll_interval_ms)?;
251 }
252}
253
254fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
255 let now = now_ms();
256 let metadata = LockMetadata {
257 pid: std::process::id(),
258 hostname: hostname.to_string(),
259 created_at_ms: now,
260 heartbeat_at_ms: now,
261 };
262
263 create_lock_file_atomically(path, &metadata)?;
264
265 let shutdown = Arc::new(AtomicBool::new(false));
266 let (done_tx, done_rx) = mpsc::channel();
267 let heartbeat_path = path.to_path_buf();
268 let heartbeat_metadata = metadata.clone();
269 let heartbeat_shutdown = Arc::clone(&shutdown);
270 let heartbeat = thread::Builder::new()
271 .name("aft-fs-lock-heartbeat".to_string())
272 .spawn(move || {
273 run_heartbeat(
274 heartbeat_path,
275 heartbeat_metadata,
276 heartbeat_shutdown,
277 config,
278 );
279 let _ = done_tx.send(());
280 })?;
281
282 slog_info!("acquired filesystem lock at {}", path.display());
283
284 Ok(LockGuard {
285 path: path.to_path_buf(),
286 metadata,
287 shutdown,
288 heartbeat_done: done_rx,
289 heartbeat: Some(heartbeat),
290 })
291}
292
293fn run_heartbeat(
294 path: PathBuf,
295 owner: LockMetadata,
296 shutdown: Arc<AtomicBool>,
297 config: LockConfig,
298) {
299 let stale_intervals = config
304 .stale_heartbeat_ms
305 .checked_div(config.heartbeat_interval_ms.max(1))
306 .unwrap_or(3)
307 .max(1);
308 let mut consecutive_transient_failures: u64 = 0;
309
310 loop {
311 thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
312 if shutdown.load(Ordering::Acquire) {
313 return;
314 }
315
316 match heartbeat_once(&path, &owner) {
317 Ok(()) => {
318 if consecutive_transient_failures > 0 {
319 slog_info!(
320 "filesystem lock at {} heartbeat recovered after {} transient failure(s)",
321 path.display(),
322 consecutive_transient_failures
323 );
324 consecutive_transient_failures = 0;
325 }
326 }
327 Err(error) if heartbeat_error_is_terminal(&error) => {
328 slog_error!(
333 "{}; stopping heartbeat",
334 terminal_heartbeat_message(&path, &error)
335 );
336 return;
337 }
338 Err(error) => {
339 consecutive_transient_failures += 1;
349 log_transient_heartbeat_failure(
350 &path,
351 &transient_heartbeat_reason(&error),
352 consecutive_transient_failures,
353 stale_intervals,
354 );
355 }
356 }
357 }
358}
359
360fn heartbeat_error_is_terminal(error: &HeartbeatError) -> bool {
366 matches!(error, HeartbeatError::LockGone | HeartbeatError::NotOwner)
367}
368
369fn terminal_heartbeat_message(path: &Path, error: &HeartbeatError) -> String {
370 match error {
371 HeartbeatError::LockGone => {
372 format!("filesystem lock at {} disappeared", path.display())
373 }
374 HeartbeatError::NotOwner => format!(
375 "filesystem lock at {} is no longer owned by this guard",
376 path.display()
377 ),
378 HeartbeatError::Io(error) => {
380 format!("filesystem lock at {} I/O error: {error}", path.display())
381 }
382 HeartbeatError::Malformed(error) => {
383 format!(
384 "filesystem lock at {} became malformed: {error}",
385 path.display()
386 )
387 }
388 }
389}
390
391fn transient_heartbeat_reason(error: &HeartbeatError) -> String {
392 match error {
393 HeartbeatError::Io(error) => format!("I/O error: {error}"),
394 HeartbeatError::Malformed(error) => format!("became malformed: {error}"),
395 HeartbeatError::LockGone => "lock disappeared".to_string(),
396 HeartbeatError::NotOwner => "lock no longer owned".to_string(),
397 }
398}
399
400fn log_transient_heartbeat_failure(
405 path: &Path,
406 reason: &str,
407 consecutive_failures: u64,
408 stale_intervals: u64,
409) {
410 if consecutive_failures < stale_intervals {
411 slog_warn!(
412 "transient failure to heartbeat filesystem lock at {}: {}; retrying (attempt {})",
413 path.display(),
414 reason,
415 consecutive_failures
416 );
417 } else if consecutive_failures == stale_intervals {
418 slog_error!(
419 "filesystem lock at {} has failed {} consecutive heartbeats: {}; \
420 the lock may now be reclaimed by another owner — continuing to retry",
421 path.display(),
422 consecutive_failures,
423 reason
424 );
425 }
426}
427
428fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
429 let mut metadata = match read_lock_metadata(path) {
430 Ok(metadata) => metadata,
431 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
432 return Err(HeartbeatError::LockGone);
433 }
434 Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
435 Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
436 };
437
438 if metadata.pid != owner.pid
439 || metadata.hostname != owner.hostname
440 || metadata.created_at_ms != owner.created_at_ms
441 {
442 return Err(HeartbeatError::NotOwner);
443 }
444
445 metadata.heartbeat_at_ms = now_ms();
446 atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
447}
448
449#[derive(Debug)]
450enum HeartbeatError {
451 Io(io::Error),
452 LockGone,
453 Malformed(serde_json::Error),
454 NotOwner,
455}
456
457#[derive(Debug)]
458enum ReadLockError {
459 Io(io::Error),
460 Malformed(serde_json::Error),
461}
462
463fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
464 let bytes = fs::read(path).map_err(ReadLockError::Io)?;
465 serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
466}
467
/// Creates `path` for writing, failing with `AlreadyExists` if it is already present.
///
/// The file is requested with mode `0o644`, which is still subject to the process umask.
#[cfg(unix)]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = OpenOptions::new();
    options.mode(0o644).create_new(true).write(true);
    options.open(path)
}

/// Creates `path` for writing, failing with `AlreadyExists` if it is already present.
#[cfg(not(unix))]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.create_new(true).write(true);
    options.open(path)
}
483
484fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
485 serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
486 file.write_all(b"\n")?;
487 file.sync_all()
488}
489
490fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
491 let tmp_path = temp_path_for_lock(path);
492 let result = (|| {
493 let mut file = open_new_lock_file(&tmp_path)?;
494 write_lock_metadata_to_file(&mut file, metadata)?;
495 drop(file);
496
497 fs::hard_link(&tmp_path, path)?;
498 sync_parent(path);
499 Ok(())
500 })();
501
502 let _ = fs::remove_file(&tmp_path);
503 result
504}
505
506fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
507 let tmp_path = temp_path_for_lock(path);
508 let write_result = (|| {
509 let mut file = OpenOptions::new()
510 .write(true)
511 .create_new(true)
512 .open(&tmp_path)?;
513 write_lock_metadata_to_file(&mut file, metadata)?;
514 drop(file);
515
516 rename_over(&tmp_path, path)?;
517 sync_parent(path);
518 Ok(())
519 })();
520
521 if write_result.is_err() {
522 let _ = fs::remove_file(&tmp_path);
523 }
524
525 write_result
526}
527
/// Moves `from` onto `to`, replacing any existing file at `to`.
///
/// If `fs::rename` fails, this falls back to copying the contents and then
/// deleting `from`; the fallback is not atomic. If the copy also fails, the
/// original rename error is returned.
#[cfg(windows)]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    fs::rename(from, to).or_else(|rename_error| {
        fs::copy(from, to)
            .map(|_| {
                let _ = fs::remove_file(from);
            })
            .map_err(|_| rename_error)
    })
}

/// Renames `from` onto `to`, replacing any existing file (POSIX `rename` semantics).
#[cfg(not(windows))]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    std::fs::rename(from, to)
}
567
568static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
581
582fn temp_path_for_lock(path: &Path) -> PathBuf {
583 let file_name = path
584 .file_name()
585 .and_then(|name| name.to_str())
586 .unwrap_or("lock");
587 let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
588 path.with_file_name(format!(
589 ".{file_name}.tmp.{}.{}.{}",
590 std::process::id(),
591 now_nanos(),
592 seq
593 ))
594}
595
596fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
597 let metadata = match read_lock_metadata(path) {
598 Ok(metadata) => metadata,
599 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
600 return Ok(false);
601 }
602 Err(ReadLockError::Io(error)) => return Err(error),
603 Err(ReadLockError::Malformed(_)) => return Ok(false),
604 };
605
606 if metadata.pid == owner.pid
607 && metadata.hostname == owner.hostname
608 && metadata.created_at_ms == owner.created_at_ms
609 {
610 remove_lock_file(path)?;
611 Ok(true)
612 } else {
613 Ok(false)
614 }
615}
616
/// Deletes the lockfile at `path`; a file that is already missing counts as success.
fn remove_lock_file(path: &Path) -> io::Result<()> {
    fs::remove_file(path).or_else(|error| {
        if error.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(error)
        }
    })
}
624
625fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
626 let poll = Duration::from_millis(poll_interval_ms);
627 let sleep_for = match deadline {
628 Some(deadline) => {
629 let now = Instant::now();
630 if now >= deadline {
631 return Err(AcquireError::Timeout);
632 }
633 poll.min(deadline.saturating_duration_since(now))
634 }
635 None => poll,
636 };
637 thread::sleep(sleep_for);
638 Ok(())
639}
640
/// Best-effort `fsync` of the directory containing `path`, to persist a newly
/// created or renamed directory entry. Errors are ignored.
///
/// A bare relative file name has an empty parent (`Path::new("x.lock").parent()`
/// is `Some("")`), which cannot be opened. The current directory is synced in
/// that case instead of silently skipping the sync.
fn sync_parent(path: &Path) {
    let Some(parent) = path.parent() else {
        return;
    };
    let dir = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    if let Ok(handle) = File::open(dir) {
        let _ = handle.sync_all();
    }
}
648
/// Milliseconds since the Unix epoch, or 0 if the system clock reads earlier than the epoch.
fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_millis() as u64)
}

/// Nanoseconds since the Unix epoch, or 0 if the system clock reads earlier than the epoch.
fn now_nanos() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_nanos())
}
662
663#[cfg(unix)]
664fn current_hostname() -> String {
665 let mut buffer = [0u8; 256];
666 let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
667 if result == 0 {
668 let len = buffer
669 .iter()
670 .position(|byte| *byte == 0)
671 .unwrap_or(buffer.len());
672 if len > 0 {
673 return String::from_utf8_lossy(&buffer[..len]).into_owned();
674 }
675 }
676
677 std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
678}
679
680#[cfg(windows)]
681fn current_hostname() -> String {
682 std::env::var("COMPUTERNAME")
683 .or_else(|_| std::env::var("HOSTNAME"))
684 .unwrap_or_else(|_| "unknown-host".to_string())
685}
686
687#[cfg(not(any(unix, windows)))]
688fn current_hostname() -> String {
689 std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
690}
691
692#[cfg(unix)]
693fn process_alive(pid: u32) -> bool {
694 if pid == 0 || pid > i32::MAX as u32 {
695 return false;
696 }
697
698 let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
699 if result == 0 {
700 return true;
701 }
702
703 io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
704}
705
706#[cfg(windows)]
707fn process_alive(pid: u32) -> bool {
708 let filter = format!("PID eq {pid}");
709 let Ok(output) = std::process::Command::new("tasklist")
710 .args(["/FI", &filter, "/FO", "CSV", "/NH"])
711 .output()
712 else {
713 return true;
714 };
715
716 if !output.status.success() {
717 return true;
718 }
719
720 let stdout = String::from_utf8_lossy(&output.stdout);
721
722 if stdout.contains("No tasks are running") {
733 return false;
734 }
735 stdout.contains(&format!("\"{pid}\""))
736}
737
738#[cfg(not(any(unix, windows)))]
739fn process_alive(_pid: u32) -> bool {
740 true
741}
742
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};

    /// Fast timings for tests. The 25ms heartbeat and 10ms poll keep the tests
    /// short. The stale threshold (2_000ms) is far above the heartbeat
    /// interval, so a healthy owner never looks stale within one test.
    fn test_config() -> LockConfig {
        LockConfig {
            heartbeat_interval_ms: 25,
            stale_heartbeat_ms: 2_000,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
            poll_interval_ms: 10,
        }
    }

    /// Fresh temp directory plus a lock path inside it. The `TempDir` must be kept
    /// alive by the caller, or the directory is deleted.
    fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().expect("create temp dir");
        let path = dir.path().join("test.lock");
        (dir, path)
    }

    /// Writes `metadata` directly to `path`, simulating a lock held by another owner.
    fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
        let mut file = open_new_lock_file(path).expect("create synthetic lock");
        write_lock_metadata_to_file(&mut file, metadata).expect("write synthetic lock");
    }

    /// Metadata whose heartbeat equals its creation time.
    fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
        LockMetadata {
            pid,
            hostname,
            created_at_ms,
            heartbeat_at_ms: created_at_ms,
        }
    }

    /// Metadata naming this (live) process on this host, created now.
    fn current_process_metadata() -> LockMetadata {
        let now = now_ms();
        synthetic_metadata(std::process::id(), current_hostname(), now)
    }

    #[test]
    fn acquire_creates_lockfile_and_unlocks_on_drop() {
        let (_dir, path) = test_lock_path();

        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let metadata = read_lock_metadata(&path).expect("read lock metadata");
        assert_eq!(metadata.pid, std::process::id());
        assert_eq!(metadata.hostname, current_hostname());
        assert_eq!(metadata.created_at_ms, guard.metadata.created_at_ms);

        drop(guard);
        assert!(!path.exists());
    }

    #[test]
    fn acquire_serializes_concurrent_callers() {
        let (_dir, path) = test_lock_path();
        let path = Arc::new(path);
        // Two workers plus the main thread, so both workers start contending together.
        let barrier = Arc::new(Barrier::new(3));
        let inside = Arc::new(AtomicUsize::new(0));
        let entered = Arc::new(AtomicUsize::new(0));
        let max_inside = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        for _ in 0..2 {
            let path = Arc::clone(&path);
            let barrier = Arc::clone(&barrier);
            let inside = Arc::clone(&inside);
            let entered = Arc::clone(&entered);
            let max_inside = Arc::clone(&max_inside);
            handles.push(thread::spawn(move || {
                barrier.wait();
                let guard = acquire_with_config(&path, Some(Duration::from_secs(2)), test_config())
                    .expect("thread acquire lock");
                // `inside` counts concurrent holders; it must never exceed one.
                let previous = inside.fetch_add(1, Ordering::SeqCst);
                assert_eq!(previous, 0, "two lock holders overlapped");
                entered.fetch_add(1, Ordering::SeqCst);
                max_inside.fetch_max(previous + 1, Ordering::SeqCst);
                // Hold the lock long enough for the other worker to be forced to wait.
                thread::sleep(Duration::from_millis(75));
                inside.fetch_sub(1, Ordering::SeqCst);
                drop(guard);
            }));
        }

        barrier.wait();
        for handle in handles {
            handle.join().expect("join worker");
        }

        assert_eq!(entered.load(Ordering::SeqCst), 2);
        assert_eq!(max_inside.load(Ordering::SeqCst), 1);
        assert!(!path.exists());
    }

    #[test]
    fn heartbeat_updates_lockfile_timestamp() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let initial = read_lock_metadata(&path)
            .expect("read initial metadata")
            .heartbeat_at_ms;

        // Poll rather than sleeping a fixed time, so the test is robust to scheduling jitter.
        let deadline = std::time::Instant::now() + Duration::from_millis(2_000);
        let mut updated = initial;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(50));
            match read_lock_metadata(&path) {
                Ok(meta) => {
                    updated = meta.heartbeat_at_ms;
                    if updated > initial {
                        break;
                    }
                }
                // Tolerate a read that races a heartbeat rewrite.
                Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
                    continue;
                }
                Err(other) => panic!("read updated metadata: {other:?}"),
            }
        }
        assert!(
            updated > initial,
            "heartbeat timestamp did not advance within 2s"
        );
        drop(guard);
    }

    #[test]
    fn dead_pid_lock_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        // NOTE(review): assumes PID 999_999_999 is not a live process on the test host.
        let metadata = synthetic_metadata(999_999_999, current_hostname(), now_ms());
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim dead pid lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    #[test]
    fn stale_heartbeat_from_live_pid_blocks() {
        let (_dir, path) = test_lock_path();
        // Our own PID is alive, so a 60s-stale heartbeat (> 2_000ms) must still not
        // let the waiter break the lock.
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(60_000);
        metadata.heartbeat_at_ms = now_ms().saturating_sub(60_000);
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        // The lockfile must be left untouched.
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    #[test]
    fn healthy_live_owner_blocks() {
        let (_dir, path) = test_lock_path();
        let metadata = current_process_metadata();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    #[test]
    fn malformed_lockfile_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        fs::write(&path, b"not valid json").expect("write malformed lock");

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim malformed lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    #[test]
    fn cross_host_lock_is_not_stolen_before_extended_stale_threshold() {
        let (_dir, path) = test_lock_path();
        let now = now_ms();
        // A different hostname with a fresh heartbeat: the waiter must keep waiting.
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: now,
            heartbeat_at_ms: now,
        };
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    #[test]
    fn stale_cross_host_lock_is_reclaimed_after_extended_threshold() {
        let (_dir, path) = test_lock_path();
        // Heartbeat 1s older than the cross-host threshold, so it is reclaimable.
        let stale_at =
            now_ms().saturating_sub(test_config().cross_host_stale_heartbeat_ms() + 1_000);
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: stale_at,
            heartbeat_at_ms: stale_at,
        };
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim stale cross-host lock");
        let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(reclaimed.hostname, current_hostname());
        assert_ne!(reclaimed.created_at_ms, metadata.created_at_ms);
        drop(guard);
    }

    #[test]
    fn live_owner_over_10min_warns_but_blocks() {
        let (_dir, path) = test_lock_path();
        // Created 11 minutes ago (past LIVE_OWNER_WARN_MS) but heartbeating now:
        // this exercises the warning path, and the lock must still not be broken.
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
        metadata.heartbeat_at_ms = now_ms();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    #[test]
    fn drop_stops_heartbeat_thread() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        drop(guard);

        // After several heartbeat intervals, a still-running heartbeat would have
        // had the chance to touch the (removed) lockfile.
        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 3,
        ));
        assert!(
            !path.exists(),
            "heartbeat recreated or kept updating lockfile"
        );
    }

    #[test]
    fn heartbeat_error_classification_terminal_vs_transient() {
        assert!(heartbeat_error_is_terminal(&HeartbeatError::LockGone));
        assert!(heartbeat_error_is_terminal(&HeartbeatError::NotOwner));
        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Io(
            io::Error::other("disk blip")
        )));
        let malformed: serde_json::Error =
            serde_json::from_str::<LockMetadata>("not json").unwrap_err();
        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Malformed(
            malformed
        )));
    }

    #[test]
    fn heartbeat_survives_transient_malformed_and_recovers() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let owner = guard.metadata.clone();

        // Corrupt the lockfile so the heartbeat thread sees `Malformed` (transient).
        fs::write(&path, b"{ not valid json").expect("corrupt lockfile");

        // Let several heartbeat ticks fail. 4 failures is well below the
        // escalation point (2_000 / 25 = 80 intervals).
        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 4,
        ));

        // Restore valid metadata with a heartbeat far in the past. Any later,
        // larger timestamp proves the heartbeat thread is still alive.
        let sentinel = now_ms().saturating_sub(1_000_000);
        let mut restored = owner.clone();
        restored.heartbeat_at_ms = sentinel;
        atomic_write_lock_metadata(&path, &restored).expect("atomically restore lock metadata");

        let deadline = std::time::Instant::now() + Duration::from_millis(3_000);
        let mut recovered = false;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(25));
            match read_lock_metadata(&path) {
                Ok(meta)
                    if meta.created_at_ms == owner.created_at_ms
                        && meta.heartbeat_at_ms > sentinel =>
                {
                    recovered = true;
                    break;
                }
                _ => continue,
            }
        }
        assert!(
            recovered,
            "heartbeat did not recover after a transient malformed read — thread likely died"
        );
        drop(guard);
    }
}
1089}