1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6 atomic::{AtomicBool, AtomicU64, Ordering},
7 mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
/// Default pause, in milliseconds, between heartbeat refreshes of a held lock.
pub const HEARTBEAT_INTERVAL_MS: u64 = 5_000;
/// Default heartbeat age, in milliseconds, at which a same-host live owner is reported as
/// stale (it is never broken). Locks held by another host become reclaimable only after
/// five times this age.
pub const STALE_HEARTBEAT_MS: u64 = 15_000;
/// Default hold duration, in milliseconds (10 minutes), after which a waiter logs that a
/// live owner is still holding the lock.
pub const LIVE_OWNER_WARN_MS: u64 = 600_000;
/// Default sleep, in milliseconds, between acquisition attempts while waiting.
pub const POLL_INTERVAL_MS: u64 = 100;

/// Consecutive transient create failures tolerated before acquisition gives up and
/// returns the underlying I/O error.
const MAX_TRANSIENT_CREATE_RETRIES: u32 = 50;
29
/// Classifies a lock-file creation error as contention that may clear on its own.
///
/// `PermissionDenied` counts as contention on every platform. On Windows, the raw codes
/// 32 (`ERROR_SHARING_VIOLATION`) and 5 (`ERROR_ACCESS_DENIED`) are also accepted.
/// NOTE(review): presumably this targets Windows files that are pending deletion or held
/// open by another process. On Unix it also retries genuine permission problems until the
/// retry budget runs out.
fn is_transient_create_contention(error: &io::Error) -> bool {
    let denied = error.kind() == io::ErrorKind::PermissionDenied;

    #[cfg(windows)]
    let windows_contention = matches!(error.raw_os_error(), Some(32) | Some(5));
    #[cfg(not(windows))]
    let windows_contention = false;

    denied || windows_contention
}
52
/// Tunable timings for lock acquisition and heartbeating, all in milliseconds.
///
/// Production code uses `LockConfig::default()`, which reads the module-level constants.
/// Tests build shorter intervals directly.
#[derive(Clone, Copy, Debug)]
struct LockConfig {
    /// Pause between heartbeat writes by the lock holder.
    heartbeat_interval_ms: u64,
    /// Heartbeat age after which a same-host, live owner is reported as stale (never broken).
    stale_heartbeat_ms: u64,
    /// How long a live owner may hold the lock before a waiter logs a warning.
    live_owner_warn_ms: u64,
    /// Sleep between acquisition attempts while waiting.
    poll_interval_ms: u64,
}

impl LockConfig {
    /// Multiplier applied to `stale_heartbeat_ms` for locks owned by another host.
    /// Their PIDs cannot be probed locally, so only heartbeat age is evidence of abandonment.
    const CROSS_HOST_STALE_FACTOR: u64 = 5;

    /// Heartbeat age after which a lock held by another host may be reclaimed.
    /// Saturates at `u64::MAX` instead of overflowing.
    fn cross_host_stale_heartbeat_ms(self) -> u64 {
        Self::CROSS_HOST_STALE_FACTOR.saturating_mul(self.stale_heartbeat_ms)
    }
}
66
67impl Default for LockConfig {
68 fn default() -> Self {
69 Self {
70 heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
71 stale_heartbeat_ms: STALE_HEARTBEAT_MS,
72 live_owner_warn_ms: LIVE_OWNER_WARN_MS,
73 poll_interval_ms: POLL_INTERVAL_MS,
74 }
75 }
76}
77
/// Ownership record stored as JSON (followed by a newline) in the lock file.
///
/// The triple `(pid, hostname, created_at_ms)` identifies one acquisition, and ownership
/// checks compare exactly those three fields. `heartbeat_at_ms` is refreshed by the holder's
/// heartbeat thread. Timestamps are milliseconds since the Unix epoch (see `now_ms`).
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
struct LockMetadata {
    /// Process ID of the holder.
    pid: u32,
    /// Host name of the holder, as reported by `current_hostname`.
    hostname: String,
    /// When the lock was acquired.
    created_at_ms: u64,
    /// Last successful heartbeat (initially equal to `created_at_ms`).
    heartbeat_at_ms: u64,
}
85
86pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
91 acquire_with_config(path, None, LockConfig::default())
92}
93
94pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
96 acquire_with_config(path, Some(timeout), LockConfig::default())
97}
98
99pub struct LockGuard {
100 path: PathBuf,
101 metadata: LockMetadata,
102 shutdown: Arc<AtomicBool>,
103 heartbeat_done: mpsc::Receiver<()>,
104 heartbeat: Option<JoinHandle<()>>,
105}
106
impl Drop for LockGuard {
    /// Releases the lock.
    ///
    /// The heartbeat thread is stopped and joined first, so it cannot rewrite the file after
    /// removal. Only then is the lock file deleted, and only if it still names this guard.
    fn drop(&mut self) {
        // Publish the shutdown flag before waking the thread, so it sees the flag on wakeup.
        self.shutdown.store(true, Ordering::Release);
        if let Some(handle) = self.heartbeat.take() {
            // Cut the thread's `park_timeout` sleep short instead of waiting a full interval.
            handle.thread().unpark();
            // Wait for the thread to exit; a panic inside it is deliberately ignored here.
            let _ = handle.join();
        }
        // Discard the completion notification (if any) sent by the finished thread.
        while self.heartbeat_done.try_recv().is_ok() {}

        // Identity-checked removal: never delete a lock that someone else now owns.
        match remove_lock_if_owned(&self.path, &self.metadata) {
            Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
            // Already gone, malformed, or owned by someone else: nothing to release.
            Ok(false) => {}
            Err(error) => slog_warn!(
                "failed to release filesystem lock at {}: {}",
                self.path.display(),
                error
            ),
        }
    }
}
153
/// Ways in which acquiring a filesystem lock can fail.
#[derive(Debug)]
pub enum AcquireError {
    /// An I/O error that was not lock contention, or transient contention that outlasted
    /// the retry budget.
    Io(io::Error),
    /// The caller-supplied timeout elapsed before the lock became free.
    Timeout,
}

impl fmt::Display for AcquireError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            AcquireError::Io(error) => write!(f, "filesystem lock I/O error: {error}"),
            AcquireError::Timeout => write!(f, "timed out acquiring filesystem lock"),
        }
    }
}

impl std::error::Error for AcquireError {
    /// Exposes the wrapped I/O error so error-chain reporters and `downcast_ref` can reach it.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            AcquireError::Io(error) => Some(error),
            AcquireError::Timeout => None,
        }
    }
}

impl From<io::Error> for AcquireError {
    /// Wraps any I/O error as [`AcquireError::Io`], enabling `?` on `io::Result`.
    fn from(error: io::Error) -> Self {
        AcquireError::Io(error)
    }
}
176
/// Core acquisition loop shared by [`acquire`] and [`try_acquire`].
///
/// It repeatedly tries to create the lock file. When the file already exists, it inspects
/// the recorded owner and decides whether to wait or to reclaim:
///
/// * malformed file that is still malformed after one poll interval: removed;
/// * owner on another host: reclaimed only once its heartbeat is older than
///   `config.cross_host_stale_heartbeat_ms()`;
/// * owner on this host with a dead PID: reclaimed;
/// * owner on this host with a live PID: never broken. A waiter only logs a warning (for a
///   stale heartbeat, or a hold longer than `live_owner_warn_ms`), each at most once per call.
///
/// A `timeout` of `None` waits forever.
///
/// # Errors
///
/// * [`AcquireError::Timeout`] once the deadline passes.
/// * [`AcquireError::Io`] for I/O failures other than contention.
/// * [`AcquireError::Io`] when transient create contention exceeds
///   `MAX_TRANSIENT_CREATE_RETRIES` consecutive attempts.
fn acquire_with_config(
    path: &Path,
    timeout: Option<Duration>,
    config: LockConfig,
) -> Result<LockGuard, AcquireError> {
    let deadline = timeout.map(|timeout| Instant::now() + timeout);
    let hostname = current_hostname();
    // One-shot latches so a long wait does not log the same warning on every poll.
    let mut warned_live_owner = false;
    let mut warned_stale_live_owner = false;
    let mut transient_create_failures: u32 = 0;

    loop {
        if let Some(deadline) = deadline {
            if Instant::now() >= deadline {
                return Err(AcquireError::Timeout);
            }
        }

        match create_new_lock(path, &hostname, config) {
            Ok(guard) => return Ok(guard),
            // Someone holds the lock: fall through and inspect the owner.
            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
            // Contention that may clear on its own: retry after a poll interval, but only
            // for a bounded number of consecutive failures.
            Err(error) if is_transient_create_contention(&error) => {
                transient_create_failures += 1;
                if transient_create_failures > MAX_TRANSIENT_CREATE_RETRIES {
                    return Err(error.into());
                }
                sleep_until_retry(deadline, config.poll_interval_ms)?;
                continue;
            }
            Err(error) => return Err(error.into()),
        }
        // A clean `AlreadyExists` ends any run of transient failures.
        transient_create_failures = 0;

        let metadata = match read_lock_metadata(path) {
            Ok(metadata) => metadata,
            // Released between our create attempt and this read: retry immediately.
            Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
            Err(ReadLockError::Io(error)) => return Err(error.into()),
            Err(ReadLockError::Malformed(error)) => {
                // Wait one poll interval and re-read before declaring the file corrupt.
                // Presumably this tolerates a read racing a non-atomic rewrite (the Windows
                // `rename_over` fallback copies in place).
                sleep_until_retry(deadline, config.poll_interval_ms)?;
                match read_lock_metadata(path) {
                    Ok(_) => continue,
                    Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
                        continue;
                    }
                    Err(ReadLockError::Io(error)) => return Err(error.into()),
                    Err(ReadLockError::Malformed(_)) => {}
                }
                slog_warn!(
                    "removing malformed filesystem lock at {}: {}",
                    path.display(),
                    error
                );
                // NOTE(review): unlike `reclaim_lock_file`, this removal has no identity
                // check (there is no parseable identity). A waiter that re-created the lock
                // between the second read and this call would have its lock removed.
                remove_lock_file(path)?;
                continue;
            }
        };

        let now = now_ms();
        let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);

        // A foreign host's PID cannot be probed here. Only heartbeat age counts, judged
        // against the wider cross-host threshold.
        if metadata.hostname != hostname {
            let cross_host_stale_ms = config.cross_host_stale_heartbeat_ms();
            if since_heartbeat > cross_host_stale_ms {
                slog_warn!(
                    "reclaiming cross-host filesystem lock at {} from host {} after stale heartbeat ({}ms > {}ms)",
                    path.display(),
                    metadata.hostname,
                    since_heartbeat,
                    cross_host_stale_ms
                );
                // Identity-checked removal; the next iteration retries the create.
                reclaim_lock_file(path, &metadata)?;
                continue;
            }
            sleep_until_retry(deadline, config.poll_interval_ms)?;
            continue;
        }

        // Same host: a dead owner PID means the lock was abandoned.
        if !process_alive(metadata.pid) {
            slog_warn!(
                "removing filesystem lock at {} from dead PID {}",
                path.display(),
                metadata.pid
            );
            reclaim_lock_file(path, &metadata)?;
            continue;
        }

        // Same host, live owner: never broken regardless of heartbeat age; warn once.
        if since_heartbeat > config.stale_heartbeat_ms && !warned_stale_live_owner {
            slog_warn!(
                "filesystem lock at {} held by live PID {} has stale heartbeat ({}ms); NOT breaking",
                path.display(),
                metadata.pid,
                since_heartbeat
            );
            warned_stale_live_owner = true;
        }

        let held_for = now.saturating_sub(metadata.created_at_ms);
        // NOTE(review): the message says ">10min", but the threshold is
        // `config.live_owner_warn_ms`. The two agree only for the default LIVE_OWNER_WARN_MS.
        if held_for > config.live_owner_warn_ms && !warned_live_owner {
            slog_warn!(
                "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
                path.display(),
                metadata.pid
            );
            warned_live_owner = true;
        }

        sleep_until_retry(deadline, config.poll_interval_ms)?;
    }
}
306
307fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
308 let now = now_ms();
309 let metadata = LockMetadata {
310 pid: std::process::id(),
311 hostname: hostname.to_string(),
312 created_at_ms: now,
313 heartbeat_at_ms: now,
314 };
315
316 create_lock_file_atomically(path, &metadata)?;
317
318 let shutdown = Arc::new(AtomicBool::new(false));
319 let (done_tx, done_rx) = mpsc::channel();
320 let heartbeat_path = path.to_path_buf();
321 let heartbeat_metadata = metadata.clone();
322 let heartbeat_shutdown = Arc::clone(&shutdown);
323 let heartbeat = thread::Builder::new()
324 .name("aft-fs-lock-heartbeat".to_string())
325 .spawn(move || {
326 run_heartbeat(
327 heartbeat_path,
328 heartbeat_metadata,
329 heartbeat_shutdown,
330 config,
331 );
332 let _ = done_tx.send(());
333 })?;
334
335 slog_info!("acquired filesystem lock at {}", path.display());
336
337 Ok(LockGuard {
338 path: path.to_path_buf(),
339 metadata,
340 shutdown,
341 heartbeat_done: done_rx,
342 heartbeat: Some(heartbeat),
343 })
344}
345
346fn run_heartbeat(
347 path: PathBuf,
348 owner: LockMetadata,
349 shutdown: Arc<AtomicBool>,
350 config: LockConfig,
351) {
352 let stale_intervals = config
357 .stale_heartbeat_ms
358 .checked_div(config.heartbeat_interval_ms.max(1))
359 .unwrap_or(3)
360 .max(1);
361 let mut consecutive_transient_failures: u64 = 0;
362
363 loop {
364 thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
365 if shutdown.load(Ordering::Acquire) {
366 return;
367 }
368
369 match heartbeat_once(&path, &owner) {
370 Ok(()) => {
371 if consecutive_transient_failures > 0 {
372 slog_info!(
373 "filesystem lock at {} heartbeat recovered after {} transient failure(s)",
374 path.display(),
375 consecutive_transient_failures
376 );
377 consecutive_transient_failures = 0;
378 }
379 }
380 Err(error) if heartbeat_error_is_terminal(&error) => {
381 slog_error!(
386 "{}; stopping heartbeat",
387 terminal_heartbeat_message(&path, &error)
388 );
389 return;
390 }
391 Err(error) => {
392 consecutive_transient_failures += 1;
402 log_transient_heartbeat_failure(
403 &path,
404 &transient_heartbeat_reason(&error),
405 consecutive_transient_failures,
406 stale_intervals,
407 );
408 }
409 }
410 }
411}
412
413fn heartbeat_error_is_terminal(error: &HeartbeatError) -> bool {
419 matches!(error, HeartbeatError::LockGone | HeartbeatError::NotOwner)
420}
421
422fn terminal_heartbeat_message(path: &Path, error: &HeartbeatError) -> String {
423 match error {
424 HeartbeatError::LockGone => {
425 format!("filesystem lock at {} disappeared", path.display())
426 }
427 HeartbeatError::NotOwner => format!(
428 "filesystem lock at {} is no longer owned by this guard",
429 path.display()
430 ),
431 HeartbeatError::Io(error) => {
433 format!("filesystem lock at {} I/O error: {error}", path.display())
434 }
435 HeartbeatError::Malformed(error) => {
436 format!(
437 "filesystem lock at {} became malformed: {error}",
438 path.display()
439 )
440 }
441 }
442}
443
444fn transient_heartbeat_reason(error: &HeartbeatError) -> String {
445 match error {
446 HeartbeatError::Io(error) => format!("I/O error: {error}"),
447 HeartbeatError::Malformed(error) => format!("became malformed: {error}"),
448 HeartbeatError::LockGone => "lock disappeared".to_string(),
449 HeartbeatError::NotOwner => "lock no longer owned".to_string(),
450 }
451}
452
453fn log_transient_heartbeat_failure(
458 path: &Path,
459 reason: &str,
460 consecutive_failures: u64,
461 stale_intervals: u64,
462) {
463 if consecutive_failures < stale_intervals {
464 slog_warn!(
465 "transient failure to heartbeat filesystem lock at {}: {}; retrying (attempt {})",
466 path.display(),
467 reason,
468 consecutive_failures
469 );
470 } else if consecutive_failures == stale_intervals {
471 slog_error!(
472 "filesystem lock at {} has failed {} consecutive heartbeats: {}; \
473 the lock may now be reclaimed by another owner — continuing to retry",
474 path.display(),
475 consecutive_failures,
476 reason
477 );
478 }
479}
480
481fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
482 let mut metadata = match read_lock_metadata(path) {
483 Ok(metadata) => metadata,
484 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
485 return Err(HeartbeatError::LockGone);
486 }
487 Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
488 Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
489 };
490
491 if metadata.pid != owner.pid
492 || metadata.hostname != owner.hostname
493 || metadata.created_at_ms != owner.created_at_ms
494 {
495 return Err(HeartbeatError::NotOwner);
496 }
497
498 metadata.heartbeat_at_ms = now_ms();
499 atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
500}
501
/// Why a single heartbeat attempt (`heartbeat_once`) failed.
#[derive(Debug)]
enum HeartbeatError {
    /// Reading or rewriting the lock file failed for a reason other than it being missing.
    Io(io::Error),
    /// The lock file no longer exists.
    LockGone,
    /// The lock file's contents did not parse as `LockMetadata`.
    Malformed(serde_json::Error),
    /// The lock file names a different owner (pid, hostname, or created_at_ms differ).
    NotOwner,
}
509
/// Failure from `read_lock_metadata`, separating I/O problems from unparsable content.
#[derive(Debug)]
enum ReadLockError {
    /// `fs::read` failed; this includes `NotFound` when the lock file is absent.
    Io(io::Error),
    /// The bytes were read but are not valid `LockMetadata` JSON.
    Malformed(serde_json::Error),
}
515
516fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
517 let bytes = fs::read(path).map_err(ReadLockError::Io)?;
518 serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
519}
520
/// Creates `path` exclusively for writing; fails with `AlreadyExists` if it is present.
///
/// The file is requested with mode `0o644`; the process umask still applies.
#[cfg(unix)]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = OpenOptions::new();
    options.write(true).create_new(true).mode(0o644);
    options.open(path)
}

/// Creates `path` exclusively for writing; fails with `AlreadyExists` if it is present.
#[cfg(not(unix))]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.write(true).create_new(true);
    options.open(path)
}
536
537fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
538 serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
539 file.write_all(b"\n")?;
540 file.sync_all()
541}
542
543fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
544 let tmp_path = temp_path_for_lock(path);
545 let result = (|| {
546 let mut file = open_new_lock_file(&tmp_path)?;
547 write_lock_metadata_to_file(&mut file, metadata)?;
548 drop(file);
549
550 fs::hard_link(&tmp_path, path)?;
551 sync_parent(path);
552 Ok(())
553 })();
554
555 let _ = fs::remove_file(&tmp_path);
556 result
557}
558
559fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
560 let tmp_path = temp_path_for_lock(path);
561 let write_result = (|| {
562 let mut file = OpenOptions::new()
563 .write(true)
564 .create_new(true)
565 .open(&tmp_path)?;
566 write_lock_metadata_to_file(&mut file, metadata)?;
567 drop(file);
568
569 rename_over(&tmp_path, path)?;
570 sync_parent(path);
571 Ok(())
572 })();
573
574 if write_result.is_err() {
575 let _ = fs::remove_file(&tmp_path);
576 }
577
578 write_result
579}
580
/// Moves `from` onto `to`, replacing `to` if it exists.
///
/// If the rename fails (presumably a sharing violation with a concurrent reader of `to`;
/// TODO confirm), this falls back to copying `from` over `to` and then deleting `from` on
/// a best-effort basis. The fallback is not atomic, so readers may briefly see partial
/// content. If the copy fails too, the original rename error is reported.
#[cfg(windows)]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    let rename_error = match fs::rename(from, to) {
        Ok(()) => return Ok(()),
        Err(error) => error,
    };
    fs::copy(from, to).map_err(|_| rename_error)?;
    let _ = fs::remove_file(from);
    Ok(())
}

/// Moves `from` onto `to`, atomically replacing any existing `to`.
#[cfg(not(windows))]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    let outcome = fs::rename(from, to);
    outcome
}
620
621static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
634
635fn temp_path_for_lock(path: &Path) -> PathBuf {
636 let file_name = path
637 .file_name()
638 .and_then(|name| name.to_str())
639 .unwrap_or("lock");
640 let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
641 path.with_file_name(format!(
642 ".{file_name}.tmp.{}.{}.{}",
643 std::process::id(),
644 now_nanos(),
645 seq
646 ))
647}
648
649fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
650 let metadata = match read_lock_metadata(path) {
651 Ok(metadata) => metadata,
652 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
653 return Ok(false);
654 }
655 Err(ReadLockError::Io(error)) => return Err(error),
656 Err(ReadLockError::Malformed(_)) => return Ok(false),
657 };
658
659 if metadata.pid == owner.pid
660 && metadata.hostname == owner.hostname
661 && metadata.created_at_ms == owner.created_at_ms
662 {
663 remove_lock_file(path)?;
664 Ok(true)
665 } else {
666 Ok(false)
667 }
668}
669
/// Deletes `path`, treating an already-missing file as success.
fn remove_lock_file(path: &Path) -> io::Result<()> {
    fs::remove_file(path).or_else(|error| {
        if error.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(error)
        }
    })
}
677
678fn reclaim_lock_file(path: &Path, judged: &LockMetadata) -> io::Result<bool> {
688 match read_lock_metadata(path) {
689 Ok(current) => {
690 if current.pid == judged.pid
691 && current.hostname == judged.hostname
692 && current.created_at_ms == judged.created_at_ms
693 {
694 remove_lock_file(path)?;
695 Ok(true)
696 } else {
697 Ok(false)
699 }
700 }
701 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => Ok(false),
703 Err(ReadLockError::Malformed(_)) => Ok(false),
705 Err(ReadLockError::Io(error)) => Err(error),
706 }
707}
708
709fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
710 let poll = Duration::from_millis(poll_interval_ms);
711 let sleep_for = match deadline {
712 Some(deadline) => {
713 let now = Instant::now();
714 if now >= deadline {
715 return Err(AcquireError::Timeout);
716 }
717 poll.min(deadline.saturating_duration_since(now))
718 }
719 None => poll,
720 };
721 thread::sleep(sleep_for);
722 Ok(())
723}
724
/// Best-effort fsync of `path`'s parent directory, so that a new or renamed directory
/// entry is persisted.
///
/// Every failure is ignored. This includes a missing parent and platforms where a directory
/// cannot be opened as a `File`.
fn sync_parent(path: &Path) {
    let dir = path.parent().and_then(|parent| File::open(parent).ok());
    if let Some(dir) = dir {
        let _ = dir.sync_all();
    }
}
732
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields 0. A value that does not fit in `u64` saturates at
/// `u64::MAX` instead of silently wrapping through a truncating cast; this cannot happen
/// for any realistic clock.
fn now_ms() -> u64 {
    let millis = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO)
        .as_millis();
    // Clamp before narrowing, so the cast below is lossless.
    millis.min(u128::from(u64::MAX)) as u64
}
739
/// Current wall-clock time in nanoseconds since the Unix epoch; 0 if the clock is set
/// before the epoch. Used only to make temp file names unique.
fn now_nanos() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    }
}
746
747#[cfg(unix)]
748fn current_hostname() -> String {
749 let mut buffer = [0u8; 256];
750 let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
751 if result == 0 {
752 let len = buffer
753 .iter()
754 .position(|byte| *byte == 0)
755 .unwrap_or(buffer.len());
756 if len > 0 {
757 return String::from_utf8_lossy(&buffer[..len]).into_owned();
758 }
759 }
760
761 std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
762}
763
764#[cfg(windows)]
765fn current_hostname() -> String {
766 std::env::var("COMPUTERNAME")
767 .or_else(|_| std::env::var("HOSTNAME"))
768 .unwrap_or_else(|_| "unknown-host".to_string())
769}
770
771#[cfg(not(any(unix, windows)))]
772fn current_hostname() -> String {
773 std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
774}
775
776#[cfg(unix)]
777fn process_alive(pid: u32) -> bool {
778 if pid == 0 || pid > i32::MAX as u32 {
779 return false;
780 }
781
782 let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
783 if result == 0 {
784 return true;
785 }
786
787 io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
788}
789
790#[cfg(windows)]
791fn process_alive(pid: u32) -> bool {
792 let filter = format!("PID eq {pid}");
793 let Ok(output) = std::process::Command::new("tasklist")
794 .args(["/FI", &filter, "/FO", "CSV", "/NH"])
795 .output()
796 else {
797 return true;
798 };
799
800 if !output.status.success() {
801 return true;
802 }
803
804 let stdout = String::from_utf8_lossy(&output.stdout);
805
806 if stdout.contains("No tasks are running") {
817 return false;
818 }
819 stdout.contains(&format!("\"{pid}\""))
820}
821
822#[cfg(not(any(unix, windows)))]
823fn process_alive(_pid: u32) -> bool {
824 true
825}
826
// Unit tests for lock acquisition, reclamation, and the heartbeat thread. Each test uses a
// fresh temporary directory and a shortened `LockConfig`, so polls and heartbeats happen
// within tens of milliseconds.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};

    /// Fast timings: 25ms heartbeats and 10ms polls. The 2s stale threshold (10s cross-host)
    /// is far above them.
    fn test_config() -> LockConfig {
        LockConfig {
            heartbeat_interval_ms: 25,
            stale_heartbeat_ms: 2_000,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
            poll_interval_ms: 10,
        }
    }

    /// Returns a lock path inside a new temp dir; the `TempDir` must outlive the test body.
    fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().expect("create temp dir");
        let path = dir.path().join("test.lock");
        (dir, path)
    }

    /// Writes `metadata` directly to `path`, simulating a lock held by someone else
    /// (no guard, no heartbeat thread).
    fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
        let mut file = open_new_lock_file(path).expect("create synthetic lock");
        write_lock_metadata_to_file(&mut file, metadata).expect("write synthetic lock");
    }

    /// Metadata whose heartbeat equals its creation time.
    fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
        LockMetadata {
            pid,
            hostname,
            created_at_ms,
            heartbeat_at_ms: created_at_ms,
        }
    }

    /// Metadata naming this process and host as owner, created now.
    fn current_process_metadata() -> LockMetadata {
        let now = now_ms();
        synthetic_metadata(std::process::id(), current_hostname(), now)
    }

    #[test]
    fn acquire_creates_lockfile_and_unlocks_on_drop() {
        let (_dir, path) = test_lock_path();

        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let metadata = read_lock_metadata(&path).expect("read lock metadata");
        assert_eq!(metadata.pid, std::process::id());
        assert_eq!(metadata.hostname, current_hostname());
        assert_eq!(metadata.created_at_ms, guard.metadata.created_at_ms);

        // Dropping the guard must delete the lock file.
        drop(guard);
        assert!(!path.exists());
    }

    #[test]
    fn permission_denied_is_treated_as_transient_create_contention() {
        let err = io::Error::from(io::ErrorKind::PermissionDenied);
        assert!(is_transient_create_contention(&err));
    }

    #[test]
    fn unrelated_io_errors_are_not_treated_as_contention() {
        let err = io::Error::from(io::ErrorKind::NotFound);
        assert!(!is_transient_create_contention(&err));
    }

    #[cfg(windows)]
    #[test]
    fn windows_sharing_violation_is_treated_as_transient_create_contention() {
        // 32 = ERROR_SHARING_VIOLATION.
        let err = io::Error::from_raw_os_error(32);
        assert!(is_transient_create_contention(&err));
    }

    #[test]
    fn reclaim_refuses_to_delete_a_different_owners_lock() {
        let (_dir, path) = test_lock_path();

        // Owner B currently holds the lock...
        let owner_b = synthetic_metadata(4242, "host-b".to_string(), now_ms());
        create_lock_file_atomically(&path, &owner_b).expect("write owner B lock");

        // ...but the reclaimer had judged an older owner A to be stale.
        let judged_a = synthetic_metadata(1111, "host-a".to_string(), now_ms() - 1_000_000);
        let removed = reclaim_lock_file(&path, &judged_a).expect("reclaim");
        assert!(!removed, "must not remove a different owner's lock");
        assert!(path.exists(), "owner B's lock must survive");
        let still = read_lock_metadata(&path).expect("still readable");
        assert_eq!(still.pid, 4242, "owner B's lock intact");
    }

    #[test]
    fn reclaim_deletes_when_identity_still_matches() {
        let (_dir, path) = test_lock_path();
        let owner = synthetic_metadata(1111, "host-a".to_string(), 5_000);
        create_lock_file_atomically(&path, &owner).expect("write lock");

        let removed = reclaim_lock_file(&path, &owner).expect("reclaim");
        assert!(removed, "matching-identity stale lock should be removed");
        assert!(!path.exists());

        // Reclaiming an already-missing lock reports "nothing removed" rather than erroring.
        assert!(!reclaim_lock_file(&path, &owner).expect("reclaim missing"));
    }

    #[test]
    fn acquire_serializes_concurrent_callers() {
        let (_dir, path) = test_lock_path();
        let path = Arc::new(path);
        // Two workers plus the main thread, so both workers start contending together.
        let barrier = Arc::new(Barrier::new(3));
        let inside = Arc::new(AtomicUsize::new(0));
        let entered = Arc::new(AtomicUsize::new(0));
        let max_inside = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        for _ in 0..2 {
            let path = Arc::clone(&path);
            let barrier = Arc::clone(&barrier);
            let inside = Arc::clone(&inside);
            let entered = Arc::clone(&entered);
            let max_inside = Arc::clone(&max_inside);
            handles.push(thread::spawn(move || {
                barrier.wait();
                let guard = acquire_with_config(&path, Some(Duration::from_secs(2)), test_config())
                    .expect("thread acquire lock");
                // Critical section: nobody else may be inside while we hold the guard.
                let previous = inside.fetch_add(1, Ordering::SeqCst);
                assert_eq!(previous, 0, "two lock holders overlapped");
                entered.fetch_add(1, Ordering::SeqCst);
                max_inside.fetch_max(previous + 1, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(75));
                inside.fetch_sub(1, Ordering::SeqCst);
                drop(guard);
            }));
        }

        barrier.wait();
        for handle in handles {
            handle.join().expect("join worker");
        }

        assert_eq!(entered.load(Ordering::SeqCst), 2);
        assert_eq!(max_inside.load(Ordering::SeqCst), 1);
        assert!(!path.exists());
    }

    #[test]
    fn heartbeat_updates_lockfile_timestamp() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let initial = read_lock_metadata(&path)
            .expect("read initial metadata")
            .heartbeat_at_ms;

        // Poll for up to 2s rather than sleeping a fixed time, to stay robust on slow CI.
        let deadline = std::time::Instant::now() + Duration::from_millis(2_000);
        let mut updated = initial;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(50));
            match read_lock_metadata(&path) {
                Ok(meta) => {
                    updated = meta.heartbeat_at_ms;
                    if updated > initial {
                        break;
                    }
                }
                Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
                    continue;
                    // Tolerate a momentary NotFound while the heartbeat replaces the file
                    // (presumably only possible where replacement is not atomic).
                }
                Err(other) => panic!("read updated metadata: {other:?}"),
            }
        }
        assert!(
            updated > initial,
            "heartbeat timestamp did not advance within 2s"
        );
        drop(guard);
    }

    #[test]
    fn dead_pid_lock_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        // Assumes PID 999_999_999 is not running on the test host.
        let metadata = synthetic_metadata(999_999_999, current_hostname(), now_ms());
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim dead pid lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    #[test]
    fn stale_heartbeat_from_live_pid_blocks() {
        let (_dir, path) = test_lock_path();
        // Owned by this very (live) process, with a heartbeat far past the stale threshold.
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(60_000);
        metadata.heartbeat_at_ms = now_ms().saturating_sub(60_000);
        write_synthetic_lock(&path, &metadata);

        // A live owner is never broken: we time out, and the file is left untouched.
        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    #[test]
    fn healthy_live_owner_blocks() {
        let (_dir, path) = test_lock_path();
        let metadata = current_process_metadata();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    #[test]
    fn malformed_lockfile_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        fs::write(&path, b"not valid json").expect("write malformed lock");

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim malformed lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    #[test]
    fn cross_host_lock_is_not_stolen_before_extended_stale_threshold() {
        let (_dir, path) = test_lock_path();
        let now = now_ms();
        // Same PID but a different host name: the PID probe must not be consulted.
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: now,
            heartbeat_at_ms: now,
        };
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    #[test]
    fn stale_cross_host_lock_is_reclaimed_after_extended_threshold() {
        let (_dir, path) = test_lock_path();
        // Heartbeat 1s older than the cross-host reclaim threshold.
        let stale_at =
            now_ms().saturating_sub(test_config().cross_host_stale_heartbeat_ms() + 1_000);
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: stale_at,
            heartbeat_at_ms: stale_at,
        };
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim stale cross-host lock");
        let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(reclaimed.hostname, current_hostname());
        assert_ne!(reclaimed.created_at_ms, metadata.created_at_ms);
        drop(guard);
    }

    #[test]
    fn live_owner_over_10min_warns_but_blocks() {
        let (_dir, path) = test_lock_path();
        // Held for 11 minutes but still heartbeating: warn only, never break.
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
        metadata.heartbeat_at_ms = now_ms();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    #[test]
    fn drop_stops_heartbeat_thread() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        drop(guard);

        // Several heartbeat intervals later, a still-running thread would have rewritten the file.
        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 3,
        ));
        assert!(
            !path.exists(),
            "heartbeat recreated or kept updating lockfile"
        );
    }

    #[test]
    fn heartbeat_error_classification_terminal_vs_transient() {
        // Losing the lock is terminal...
        assert!(heartbeat_error_is_terminal(&HeartbeatError::LockGone));
        assert!(heartbeat_error_is_terminal(&HeartbeatError::NotOwner));
        // ...while I/O hiccups and unparsable reads are retried.
        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Io(
            io::Error::other("disk blip")
        )));
        let malformed: serde_json::Error =
            serde_json::from_str::<LockMetadata>("not json").unwrap_err();
        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Malformed(
            malformed
        )));
    }

    #[test]
    fn heartbeat_survives_transient_malformed_and_recovers() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let owner = guard.metadata.clone();

        // Corrupt the file underneath the running heartbeat thread.
        fs::write(&path, b"{ not valid json").expect("corrupt lockfile");

        // Let several heartbeats hit the malformed file; they must be treated as transient.
        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 4,
        ));

        // Restore our identity with a deliberately ancient heartbeat. Only a heartbeat thread
        // that is still alive can move it past the sentinel.
        let sentinel = now_ms().saturating_sub(1_000_000);
        let mut restored = owner.clone();
        restored.heartbeat_at_ms = sentinel;
        atomic_write_lock_metadata(&path, &restored).expect("atomically restore lock metadata");

        // Poll for up to 3s for the heartbeat to advance again.
        let deadline = std::time::Instant::now() + Duration::from_millis(3_000);
        let mut recovered = false;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(25));
            match read_lock_metadata(&path) {
                Ok(meta)
                    if meta.created_at_ms == owner.created_at_ms
                        && meta.heartbeat_at_ms > sentinel =>
                {
                    recovered = true;
                    break;
                }
                _ => continue,
            }
        }
        assert!(
            recovered,
            "heartbeat did not recover after a transient malformed read — thread likely died"
        );
        drop(guard);
    }
}