1use std::fmt;
2use std::fs::{self, File, OpenOptions};
3use std::io::{self, Write};
4use std::path::{Path, PathBuf};
5use std::sync::{
6 atomic::{AtomicBool, AtomicU64, Ordering},
7 mpsc, Arc,
8};
9use std::thread::{self, JoinHandle};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use serde::{Deserialize, Serialize};
13
14use crate::{slog_error, slog_info, slog_warn};
15
/// Default interval, in milliseconds, between heartbeat rewrites of a held lock file.
pub const HEARTBEAT_INTERVAL_MS: u64 = 5_000;
/// Default heartbeat age, in milliseconds, above which a same-host lock is reported
/// as stale (a cross-host lock is reclaimed at five times this value).
pub const STALE_HEARTBEAT_MS: u64 = 15_000;
/// Default lock age, in milliseconds, after which a waiter warns about a long-held lock.
pub const LIVE_OWNER_WARN_MS: u64 = 600_000;
/// Default delay, in milliseconds, between acquisition attempts while waiting.
pub const POLL_INTERVAL_MS: u64 = 100;
20
/// Timing parameters for lock acquisition and heartbeating.
///
/// All values are in milliseconds. The `Default` impl mirrors the public
/// module constants; tests substitute shorter values.
#[derive(Clone, Copy, Debug)]
struct LockConfig {
    /// How long the heartbeat thread parks between rewrites of the lock file.
    heartbeat_interval_ms: u64,
    /// Heartbeat age above which a same-host lock is logged as stale.
    stale_heartbeat_ms: u64,
    /// Lock age (since creation) above which a waiter logs a long-hold warning.
    live_owner_warn_ms: u64,
    /// Delay between acquisition attempts while the lock is held by someone else.
    poll_interval_ms: u64,
}
28
29impl LockConfig {
30 fn cross_host_stale_heartbeat_ms(self) -> u64 {
31 self.stale_heartbeat_ms.saturating_mul(5)
32 }
33}
34
35impl Default for LockConfig {
36 fn default() -> Self {
37 Self {
38 heartbeat_interval_ms: HEARTBEAT_INTERVAL_MS,
39 stale_heartbeat_ms: STALE_HEARTBEAT_MS,
40 live_owner_warn_ms: LIVE_OWNER_WARN_MS,
41 poll_interval_ms: POLL_INTERVAL_MS,
42 }
43 }
44}
45
/// Contents of a lock file, stored as JSON.
///
/// The triple (`pid`, `hostname`, `created_at_ms`) identifies one particular
/// acquisition; `heartbeat_at_ms` is the only field the heartbeat changes.
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
struct LockMetadata {
    /// Process id of the lock holder.
    pid: u32,
    /// Host name of the machine the holder runs on.
    hostname: String,
    /// Wall-clock time of acquisition, in milliseconds since the Unix epoch.
    created_at_ms: u64,
    /// Wall-clock time of the most recent heartbeat, in milliseconds since the Unix epoch.
    heartbeat_at_ms: u64,
}
53
54pub fn acquire(path: &Path) -> Result<LockGuard, AcquireError> {
59 acquire_with_config(path, None, LockConfig::default())
60}
61
62pub fn try_acquire(path: &Path, timeout: Duration) -> Result<LockGuard, AcquireError> {
64 acquire_with_config(path, Some(timeout), LockConfig::default())
65}
66
/// Handle to a held filesystem lock.
///
/// Dropping the guard stops the heartbeat thread and removes the lock file if
/// it still carries this guard's identity.
pub struct LockGuard {
    /// Location of the lock file.
    path: PathBuf,
    /// Metadata written at acquisition; used to recognise our own lock file.
    metadata: LockMetadata,
    /// Set to `true` on drop to tell the heartbeat thread to exit.
    shutdown: Arc<AtomicBool>,
    /// Receives a unit message when the heartbeat thread's body has finished.
    heartbeat_done: mpsc::Receiver<()>,
    /// Join handle of the heartbeat thread; taken (set to `None`) during drop.
    heartbeat: Option<JoinHandle<()>>,
}
74
75impl Drop for LockGuard {
76 fn drop(&mut self) {
77 self.shutdown.store(true, Ordering::Release);
101 if let Some(handle) = self.heartbeat.take() {
102 handle.thread().unpark();
103 let _ = handle.join();
104 }
105 while self.heartbeat_done.try_recv().is_ok() {}
109
110 match remove_lock_if_owned(&self.path, &self.metadata) {
111 Ok(true) => slog_info!("released filesystem lock at {}", self.path.display()),
112 Ok(false) => {}
113 Err(error) => slog_warn!(
114 "failed to release filesystem lock at {}: {}",
115 self.path.display(),
116 error
117 ),
118 }
119 }
120}
121
/// Reasons a lock acquisition can fail.
#[derive(Debug)]
pub enum AcquireError {
    /// An I/O operation on the lock file failed.
    Io(io::Error),
    /// The caller-supplied timeout elapsed before the lock became available.
    Timeout,
}
127
128impl fmt::Display for AcquireError {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 match self {
131 AcquireError::Io(error) => write!(f, "filesystem lock I/O error: {error}"),
132 AcquireError::Timeout => write!(f, "timed out acquiring filesystem lock"),
133 }
134 }
135}
136
// Uses the default `Error` methods; the underlying I/O error text is already
// part of the `Display` output.
impl std::error::Error for AcquireError {}
138
139impl From<io::Error> for AcquireError {
140 fn from(error: io::Error) -> Self {
141 AcquireError::Io(error)
142 }
143}
144
145fn acquire_with_config(
146 path: &Path,
147 timeout: Option<Duration>,
148 config: LockConfig,
149) -> Result<LockGuard, AcquireError> {
150 let deadline = timeout.map(|timeout| Instant::now() + timeout);
151 let hostname = current_hostname();
152 let mut warned_live_owner = false;
153 let mut warned_stale_live_owner = false;
154
155 loop {
156 if let Some(deadline) = deadline {
157 if Instant::now() >= deadline {
158 return Err(AcquireError::Timeout);
159 }
160 }
161
162 match create_new_lock(path, &hostname, config) {
163 Ok(guard) => return Ok(guard),
164 Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
165 Err(error) => return Err(error.into()),
166 }
167
168 let metadata = match read_lock_metadata(path) {
169 Ok(metadata) => metadata,
170 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => continue,
171 Err(ReadLockError::Io(error)) => return Err(error.into()),
172 Err(ReadLockError::Malformed(error)) => {
173 sleep_until_retry(deadline, config.poll_interval_ms)?;
177 match read_lock_metadata(path) {
178 Ok(_) => continue,
179 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
180 continue;
181 }
182 Err(ReadLockError::Io(error)) => return Err(error.into()),
183 Err(ReadLockError::Malformed(_)) => {}
184 }
185 slog_warn!(
186 "removing malformed filesystem lock at {}: {}",
187 path.display(),
188 error
189 );
190 remove_lock_file(path)?;
191 continue;
192 }
193 };
194
195 let now = now_ms();
196 let since_heartbeat = now.saturating_sub(metadata.heartbeat_at_ms);
197
198 if metadata.hostname != hostname {
199 let cross_host_stale_ms = config.cross_host_stale_heartbeat_ms();
200 if since_heartbeat > cross_host_stale_ms {
201 slog_warn!(
202 "reclaiming cross-host filesystem lock at {} from host {} after stale heartbeat ({}ms > {}ms)",
203 path.display(),
204 metadata.hostname,
205 since_heartbeat,
206 cross_host_stale_ms
207 );
208 reclaim_lock_file(path, &metadata)?;
211 continue;
212 }
213 sleep_until_retry(deadline, config.poll_interval_ms)?;
214 continue;
215 }
216
217 if !process_alive(metadata.pid) {
218 slog_warn!(
219 "removing filesystem lock at {} from dead PID {}",
220 path.display(),
221 metadata.pid
222 );
223 reclaim_lock_file(path, &metadata)?;
227 continue;
228 }
229
230 if since_heartbeat > config.stale_heartbeat_ms && !warned_stale_live_owner {
231 slog_warn!(
237 "filesystem lock at {} held by live PID {} has stale heartbeat ({}ms); NOT breaking",
238 path.display(),
239 metadata.pid,
240 since_heartbeat
241 );
242 warned_stale_live_owner = true;
243 }
244
245 let held_for = now.saturating_sub(metadata.created_at_ms);
246 if held_for > config.live_owner_warn_ms && !warned_live_owner {
247 slog_warn!(
248 "filesystem lock at {} held >10min by live heartbeating PID {}; NOT breaking",
249 path.display(),
250 metadata.pid
251 );
252 warned_live_owner = true;
253 }
254
255 sleep_until_retry(deadline, config.poll_interval_ms)?;
256 }
257}
258
259fn create_new_lock(path: &Path, hostname: &str, config: LockConfig) -> io::Result<LockGuard> {
260 let now = now_ms();
261 let metadata = LockMetadata {
262 pid: std::process::id(),
263 hostname: hostname.to_string(),
264 created_at_ms: now,
265 heartbeat_at_ms: now,
266 };
267
268 create_lock_file_atomically(path, &metadata)?;
269
270 let shutdown = Arc::new(AtomicBool::new(false));
271 let (done_tx, done_rx) = mpsc::channel();
272 let heartbeat_path = path.to_path_buf();
273 let heartbeat_metadata = metadata.clone();
274 let heartbeat_shutdown = Arc::clone(&shutdown);
275 let heartbeat = thread::Builder::new()
276 .name("aft-fs-lock-heartbeat".to_string())
277 .spawn(move || {
278 run_heartbeat(
279 heartbeat_path,
280 heartbeat_metadata,
281 heartbeat_shutdown,
282 config,
283 );
284 let _ = done_tx.send(());
285 })?;
286
287 slog_info!("acquired filesystem lock at {}", path.display());
288
289 Ok(LockGuard {
290 path: path.to_path_buf(),
291 metadata,
292 shutdown,
293 heartbeat_done: done_rx,
294 heartbeat: Some(heartbeat),
295 })
296}
297
298fn run_heartbeat(
299 path: PathBuf,
300 owner: LockMetadata,
301 shutdown: Arc<AtomicBool>,
302 config: LockConfig,
303) {
304 let stale_intervals = config
309 .stale_heartbeat_ms
310 .checked_div(config.heartbeat_interval_ms.max(1))
311 .unwrap_or(3)
312 .max(1);
313 let mut consecutive_transient_failures: u64 = 0;
314
315 loop {
316 thread::park_timeout(Duration::from_millis(config.heartbeat_interval_ms));
317 if shutdown.load(Ordering::Acquire) {
318 return;
319 }
320
321 match heartbeat_once(&path, &owner) {
322 Ok(()) => {
323 if consecutive_transient_failures > 0 {
324 slog_info!(
325 "filesystem lock at {} heartbeat recovered after {} transient failure(s)",
326 path.display(),
327 consecutive_transient_failures
328 );
329 consecutive_transient_failures = 0;
330 }
331 }
332 Err(error) if heartbeat_error_is_terminal(&error) => {
333 slog_error!(
338 "{}; stopping heartbeat",
339 terminal_heartbeat_message(&path, &error)
340 );
341 return;
342 }
343 Err(error) => {
344 consecutive_transient_failures += 1;
354 log_transient_heartbeat_failure(
355 &path,
356 &transient_heartbeat_reason(&error),
357 consecutive_transient_failures,
358 stale_intervals,
359 );
360 }
361 }
362 }
363}
364
365fn heartbeat_error_is_terminal(error: &HeartbeatError) -> bool {
371 matches!(error, HeartbeatError::LockGone | HeartbeatError::NotOwner)
372}
373
374fn terminal_heartbeat_message(path: &Path, error: &HeartbeatError) -> String {
375 match error {
376 HeartbeatError::LockGone => {
377 format!("filesystem lock at {} disappeared", path.display())
378 }
379 HeartbeatError::NotOwner => format!(
380 "filesystem lock at {} is no longer owned by this guard",
381 path.display()
382 ),
383 HeartbeatError::Io(error) => {
385 format!("filesystem lock at {} I/O error: {error}", path.display())
386 }
387 HeartbeatError::Malformed(error) => {
388 format!(
389 "filesystem lock at {} became malformed: {error}",
390 path.display()
391 )
392 }
393 }
394}
395
396fn transient_heartbeat_reason(error: &HeartbeatError) -> String {
397 match error {
398 HeartbeatError::Io(error) => format!("I/O error: {error}"),
399 HeartbeatError::Malformed(error) => format!("became malformed: {error}"),
400 HeartbeatError::LockGone => "lock disappeared".to_string(),
401 HeartbeatError::NotOwner => "lock no longer owned".to_string(),
402 }
403}
404
405fn log_transient_heartbeat_failure(
410 path: &Path,
411 reason: &str,
412 consecutive_failures: u64,
413 stale_intervals: u64,
414) {
415 if consecutive_failures < stale_intervals {
416 slog_warn!(
417 "transient failure to heartbeat filesystem lock at {}: {}; retrying (attempt {})",
418 path.display(),
419 reason,
420 consecutive_failures
421 );
422 } else if consecutive_failures == stale_intervals {
423 slog_error!(
424 "filesystem lock at {} has failed {} consecutive heartbeats: {}; \
425 the lock may now be reclaimed by another owner — continuing to retry",
426 path.display(),
427 consecutive_failures,
428 reason
429 );
430 }
431}
432
433fn heartbeat_once(path: &Path, owner: &LockMetadata) -> Result<(), HeartbeatError> {
434 let mut metadata = match read_lock_metadata(path) {
435 Ok(metadata) => metadata,
436 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
437 return Err(HeartbeatError::LockGone);
438 }
439 Err(ReadLockError::Io(error)) => return Err(HeartbeatError::Io(error)),
440 Err(ReadLockError::Malformed(error)) => return Err(HeartbeatError::Malformed(error)),
441 };
442
443 if metadata.pid != owner.pid
444 || metadata.hostname != owner.hostname
445 || metadata.created_at_ms != owner.created_at_ms
446 {
447 return Err(HeartbeatError::NotOwner);
448 }
449
450 metadata.heartbeat_at_ms = now_ms();
451 atomic_write_lock_metadata(path, &metadata).map_err(HeartbeatError::Io)
452}
453
/// Outcome of a failed heartbeat attempt.
#[derive(Debug)]
enum HeartbeatError {
    /// Reading or rewriting the lock file failed for a reason other than "not found".
    Io(io::Error),
    /// The lock file no longer exists.
    LockGone,
    /// The lock file exists but does not parse as lock metadata.
    Malformed(serde_json::Error),
    /// The lock file's identity no longer matches the heartbeating guard.
    NotOwner,
}
461
/// Failure modes of `read_lock_metadata`.
#[derive(Debug)]
enum ReadLockError {
    /// The file could not be read (including "not found").
    Io(io::Error),
    /// The file was read but its contents are not valid lock metadata JSON.
    Malformed(serde_json::Error),
}
467
468fn read_lock_metadata(path: &Path) -> Result<LockMetadata, ReadLockError> {
469 let bytes = fs::read(path).map_err(ReadLockError::Io)?;
470 serde_json::from_slice(&bytes).map_err(ReadLockError::Malformed)
471}
472
/// Creates `path` for writing, failing if it already exists.
///
/// The file is requested with mode `0o644` (subject to the process umask).
#[cfg(unix)]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    use std::os::unix::fs::OpenOptionsExt;

    let mut options = OpenOptions::new();
    options.write(true).create_new(true).mode(0o644);
    options.open(path)
}
483
/// Creates `path` for writing, failing if it already exists.
#[cfg(not(unix))]
fn open_new_lock_file(path: &Path) -> io::Result<File> {
    let mut options = OpenOptions::new();
    options.write(true).create_new(true);
    options.open(path)
}
488
489fn write_lock_metadata_to_file(file: &mut File, metadata: &LockMetadata) -> io::Result<()> {
490 serde_json::to_writer(&mut *file, metadata).map_err(io::Error::other)?;
491 file.write_all(b"\n")?;
492 file.sync_all()
493}
494
495fn create_lock_file_atomically(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
496 let tmp_path = temp_path_for_lock(path);
497 let result = (|| {
498 let mut file = open_new_lock_file(&tmp_path)?;
499 write_lock_metadata_to_file(&mut file, metadata)?;
500 drop(file);
501
502 fs::hard_link(&tmp_path, path)?;
503 sync_parent(path);
504 Ok(())
505 })();
506
507 let _ = fs::remove_file(&tmp_path);
508 result
509}
510
511fn atomic_write_lock_metadata(path: &Path, metadata: &LockMetadata) -> io::Result<()> {
512 let tmp_path = temp_path_for_lock(path);
513 let write_result = (|| {
514 let mut file = OpenOptions::new()
515 .write(true)
516 .create_new(true)
517 .open(&tmp_path)?;
518 write_lock_metadata_to_file(&mut file, metadata)?;
519 drop(file);
520
521 rename_over(&tmp_path, path)?;
522 sync_parent(path);
523 Ok(())
524 })();
525
526 if write_result.is_err() {
527 let _ = fs::remove_file(&tmp_path);
528 }
529
530 write_result
531}
532
/// Moves `from` onto `to`, replacing it.
///
/// Tries `fs::rename` first; if that fails, falls back to copying `from` over
/// `to` and deleting `from`. When the fallback also fails, the error from the
/// original rename is returned.
#[cfg(windows)]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    let Err(rename_error) = fs::rename(from, to) else {
        return Ok(());
    };

    if fs::copy(from, to).is_err() {
        return Err(rename_error);
    }

    let _ = fs::remove_file(from);
    Ok(())
}
567
/// Moves `from` onto `to` by delegating directly to `fs::rename`.
#[cfg(not(windows))]
fn rename_over(from: &Path, to: &Path) -> io::Result<()> {
    fs::rename(from, to)
}
572
573static TEMP_LOCK_COUNTER: AtomicU64 = AtomicU64::new(0);
586
587fn temp_path_for_lock(path: &Path) -> PathBuf {
588 let file_name = path
589 .file_name()
590 .and_then(|name| name.to_str())
591 .unwrap_or("lock");
592 let seq = TEMP_LOCK_COUNTER.fetch_add(1, Ordering::Relaxed);
593 path.with_file_name(format!(
594 ".{file_name}.tmp.{}.{}.{}",
595 std::process::id(),
596 now_nanos(),
597 seq
598 ))
599}
600
601fn remove_lock_if_owned(path: &Path, owner: &LockMetadata) -> io::Result<bool> {
602 let metadata = match read_lock_metadata(path) {
603 Ok(metadata) => metadata,
604 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
605 return Ok(false);
606 }
607 Err(ReadLockError::Io(error)) => return Err(error),
608 Err(ReadLockError::Malformed(_)) => return Ok(false),
609 };
610
611 if metadata.pid == owner.pid
612 && metadata.hostname == owner.hostname
613 && metadata.created_at_ms == owner.created_at_ms
614 {
615 remove_lock_file(path)?;
616 Ok(true)
617 } else {
618 Ok(false)
619 }
620}
621
/// Deletes the file at `path`, treating "not found" as success.
///
/// # Errors
/// Returns any other error reported by `fs::remove_file`.
fn remove_lock_file(path: &Path) -> io::Result<()> {
    fs::remove_file(path).or_else(|error| {
        if error.kind() == io::ErrorKind::NotFound {
            Ok(())
        } else {
            Err(error)
        }
    })
}
629
630fn reclaim_lock_file(path: &Path, judged: &LockMetadata) -> io::Result<bool> {
640 match read_lock_metadata(path) {
641 Ok(current) => {
642 if current.pid == judged.pid
643 && current.hostname == judged.hostname
644 && current.created_at_ms == judged.created_at_ms
645 {
646 remove_lock_file(path)?;
647 Ok(true)
648 } else {
649 Ok(false)
651 }
652 }
653 Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => Ok(false),
655 Err(ReadLockError::Malformed(_)) => Ok(false),
657 Err(ReadLockError::Io(error)) => Err(error),
658 }
659}
660
661fn sleep_until_retry(deadline: Option<Instant>, poll_interval_ms: u64) -> Result<(), AcquireError> {
662 let poll = Duration::from_millis(poll_interval_ms);
663 let sleep_for = match deadline {
664 Some(deadline) => {
665 let now = Instant::now();
666 if now >= deadline {
667 return Err(AcquireError::Timeout);
668 }
669 poll.min(deadline.saturating_duration_since(now))
670 }
671 None => poll,
672 };
673 thread::sleep(sleep_for);
674 Ok(())
675}
676
/// Best-effort fsync of the directory containing `path`.
///
/// Failures to open or sync the directory are ignored. For a bare relative
/// file name `Path::parent` yields an empty path, which cannot be opened; it
/// is mapped to `"."` so the current directory is synced instead of the sync
/// being silently skipped.
fn sync_parent(path: &Path) {
    let Some(parent) = path.parent() else {
        return;
    };
    let directory = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    if let Ok(dir) = File::open(directory) {
        let _ = dir.sync_all();
    }
}
684
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock is set before the epoch. The value is
/// computed with saturating `u64` arithmetic rather than a truncating
/// `u128 as u64` cast.
fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    since_epoch
        .as_secs()
        .saturating_mul(1_000)
        .saturating_add(u64::from(since_epoch.subsec_millis()))
}
691
/// Current wall-clock time in nanoseconds since the Unix epoch, or 0 when the
/// system clock is set before the epoch.
fn now_nanos() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => Duration::ZERO.as_nanos(),
    }
}
698
699#[cfg(unix)]
700fn current_hostname() -> String {
701 let mut buffer = [0u8; 256];
702 let result = unsafe { libc::gethostname(buffer.as_mut_ptr().cast(), buffer.len()) };
703 if result == 0 {
704 let len = buffer
705 .iter()
706 .position(|byte| *byte == 0)
707 .unwrap_or(buffer.len());
708 if len > 0 {
709 return String::from_utf8_lossy(&buffer[..len]).into_owned();
710 }
711 }
712
713 std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown-host".to_string())
714}
715
/// Returns this machine's host name from the `COMPUTERNAME` environment
/// variable, then `HOSTNAME`, then the literal `"unknown-host"`.
#[cfg(windows)]
fn current_hostname() -> String {
    for key in ["COMPUTERNAME", "HOSTNAME"] {
        if let Ok(name) = std::env::var(key) {
            return name;
        }
    }
    "unknown-host".to_string()
}
722
/// Returns the `HOSTNAME` environment variable, or `"unknown-host"` when it is
/// unset or not valid Unicode.
#[cfg(not(any(unix, windows)))]
fn current_hostname() -> String {
    match std::env::var("HOSTNAME") {
        Ok(name) => name,
        Err(_) => "unknown-host".to_string(),
    }
}
727
728#[cfg(unix)]
729fn process_alive(pid: u32) -> bool {
730 if pid == 0 || pid > i32::MAX as u32 {
731 return false;
732 }
733
734 let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
735 if result == 0 {
736 return true;
737 }
738
739 io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
740}
741
/// Reports whether a process with id `pid` exists, by asking `tasklist`.
///
/// If `tasklist` cannot be run or exits unsuccessfully the process is assumed
/// to be alive. Otherwise the process is alive when the output does not say
/// "No tasks are running" and contains the quoted PID.
#[cfg(windows)]
fn process_alive(pid: u32) -> bool {
    let filter = format!("PID eq {pid}");
    let listing = std::process::Command::new("tasklist")
        .args(["/FI", &filter, "/FO", "CSV", "/NH"])
        .output();

    let output = match listing {
        Ok(output) if output.status.success() => output,
        _ => return true,
    };

    let stdout = String::from_utf8_lossy(&output.stdout);
    !stdout.contains("No tasks are running") && stdout.contains(&format!("\"{pid}\""))
}
773
/// Fallback for platforms that are neither unix nor windows: always reports
/// the process as alive, so a same-host lock is never reclaimed on PID grounds.
#[cfg(not(any(unix, windows)))]
fn process_alive(_pid: u32) -> bool {
    true
}
778
// Tests for the filesystem lock: acquisition, release, reclaim rules and the
// heartbeat thread. They use real files in a temporary directory and short
// timing values from `test_config`.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Barrier};

    /// Configuration with a 25ms heartbeat and 10ms poll so tests finish quickly.
    fn test_config() -> LockConfig {
        LockConfig {
            heartbeat_interval_ms: 25,
            stale_heartbeat_ms: 2_000,
            live_owner_warn_ms: LIVE_OWNER_WARN_MS,
            poll_interval_ms: 10,
        }
    }

    /// Returns a fresh temp dir and a lock path inside it; the `TempDir` must
    /// be kept alive by the caller for the path to stay valid.
    fn test_lock_path() -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::tempdir().expect("create temp dir");
        let path = dir.path().join("test.lock");
        (dir, path)
    }

    /// Writes `metadata` directly to `path`, without a guard or heartbeat thread.
    fn write_synthetic_lock(path: &Path, metadata: &LockMetadata) {
        let mut file = open_new_lock_file(path).expect("create synthetic lock");
        write_lock_metadata_to_file(&mut file, metadata).expect("write synthetic lock");
    }

    /// Builds metadata whose heartbeat timestamp equals its creation timestamp.
    fn synthetic_metadata(pid: u32, hostname: String, created_at_ms: u64) -> LockMetadata {
        LockMetadata {
            pid,
            hostname,
            created_at_ms,
            heartbeat_at_ms: created_at_ms,
        }
    }

    /// Metadata describing the test process itself (a live, same-host owner).
    fn current_process_metadata() -> LockMetadata {
        let now = now_ms();
        synthetic_metadata(std::process::id(), current_hostname(), now)
    }

    // Acquiring writes our identity into the lock file; dropping removes it.
    #[test]
    fn acquire_creates_lockfile_and_unlocks_on_drop() {
        let (_dir, path) = test_lock_path();

        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let metadata = read_lock_metadata(&path).expect("read lock metadata");
        assert_eq!(metadata.pid, std::process::id());
        assert_eq!(metadata.hostname, current_hostname());
        assert_eq!(metadata.created_at_ms, guard.metadata.created_at_ms);

        drop(guard);
        assert!(!path.exists());
    }

    // A reclaim judged against owner A must not delete a file that now
    // belongs to owner B.
    #[test]
    fn reclaim_refuses_to_delete_a_different_owners_lock() {
        let (_dir, path) = test_lock_path();

        let owner_b = synthetic_metadata(4242, "host-b".to_string(), now_ms());
        create_lock_file_atomically(&path, &owner_b).expect("write owner B lock");

        let judged_a = synthetic_metadata(1111, "host-a".to_string(), now_ms() - 1_000_000);
        let removed = reclaim_lock_file(&path, &judged_a).expect("reclaim");
        assert!(!removed, "must not remove a different owner's lock");
        assert!(path.exists(), "owner B's lock must survive");
        let still = read_lock_metadata(&path).expect("still readable");
        assert_eq!(still.pid, 4242, "owner B's lock intact");
    }

    // A reclaim removes the file when the identity matches, and reports
    // `false` when the file is already gone.
    #[test]
    fn reclaim_deletes_when_identity_still_matches() {
        let (_dir, path) = test_lock_path();
        let owner = synthetic_metadata(1111, "host-a".to_string(), 5_000);
        create_lock_file_atomically(&path, &owner).expect("write lock");

        let removed = reclaim_lock_file(&path, &owner).expect("reclaim");
        assert!(removed, "matching-identity stale lock should be removed");
        assert!(!path.exists());

        assert!(!reclaim_lock_file(&path, &owner).expect("reclaim missing"));
    }

    // Two threads released together by a barrier must hold the lock one at a
    // time; `inside` counts current holders and must never exceed one.
    #[test]
    fn acquire_serializes_concurrent_callers() {
        let (_dir, path) = test_lock_path();
        let path = Arc::new(path);
        let barrier = Arc::new(Barrier::new(3));
        let inside = Arc::new(AtomicUsize::new(0));
        let entered = Arc::new(AtomicUsize::new(0));
        let max_inside = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        for _ in 0..2 {
            let path = Arc::clone(&path);
            let barrier = Arc::clone(&barrier);
            let inside = Arc::clone(&inside);
            let entered = Arc::clone(&entered);
            let max_inside = Arc::clone(&max_inside);
            handles.push(thread::spawn(move || {
                barrier.wait();
                let guard = acquire_with_config(&path, Some(Duration::from_secs(2)), test_config())
                    .expect("thread acquire lock");
                let previous = inside.fetch_add(1, Ordering::SeqCst);
                assert_eq!(previous, 0, "two lock holders overlapped");
                entered.fetch_add(1, Ordering::SeqCst);
                max_inside.fetch_max(previous + 1, Ordering::SeqCst);
                thread::sleep(Duration::from_millis(75));
                inside.fetch_sub(1, Ordering::SeqCst);
                drop(guard);
            }));
        }

        // The main thread is the third barrier participant.
        barrier.wait();
        for handle in handles {
            handle.join().expect("join worker");
        }

        assert_eq!(entered.load(Ordering::SeqCst), 2);
        assert_eq!(max_inside.load(Ordering::SeqCst), 1);
        assert!(!path.exists());
    }

    // While a guard is held, the heartbeat thread must advance
    // `heartbeat_at_ms` in the lock file within two seconds.
    #[test]
    fn heartbeat_updates_lockfile_timestamp() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let initial = read_lock_metadata(&path)
            .expect("read initial metadata")
            .heartbeat_at_ms;

        let deadline = std::time::Instant::now() + Duration::from_millis(2_000);
        let mut updated = initial;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(50));
            match read_lock_metadata(&path) {
                Ok(meta) => {
                    updated = meta.heartbeat_at_ms;
                    if updated > initial {
                        break;
                    }
                }
                // NOTE(review): a momentary "not found" is tolerated here,
                // presumably because the heartbeat's replace step can make the
                // file briefly unobservable on some platforms — confirm.
                Err(ReadLockError::Io(error)) if error.kind() == io::ErrorKind::NotFound => {
                    continue;
                }
                Err(other) => panic!("read updated metadata: {other:?}"),
            }
        }
        assert!(
            updated > initial,
            "heartbeat timestamp did not advance within 2s"
        );
        drop(guard);
    }

    // A same-host lock whose PID is not running is taken over.
    // NOTE(review): assumes PID 999_999_999 never exists on the test machine.
    #[test]
    fn dead_pid_lock_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        let metadata = synthetic_metadata(999_999_999, current_hostname(), now_ms());
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim dead pid lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    // A stale heartbeat alone does not break a lock whose local PID is alive:
    // acquisition times out and the file is left untouched.
    #[test]
    fn stale_heartbeat_from_live_pid_blocks() {
        let (_dir, path) = test_lock_path();
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(60_000);
        metadata.heartbeat_at_ms = now_ms().saturating_sub(60_000);
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    // A fresh lock held by a live local PID makes acquisition time out.
    #[test]
    fn healthy_live_owner_blocks() {
        let (_dir, path) = test_lock_path();
        let metadata = current_process_metadata();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    // A lock file that does not parse is removed and replaced by our own.
    #[test]
    fn malformed_lockfile_is_reclaimed() {
        let (_dir, path) = test_lock_path();
        fs::write(&path, b"not valid json").expect("write malformed lock");

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim malformed lock");
        let metadata = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(metadata.pid, std::process::id());
        drop(guard);
    }

    // A lock from a different host name with a fresh heartbeat is respected.
    #[test]
    fn cross_host_lock_is_not_stolen_before_extended_stale_threshold() {
        let (_dir, path) = test_lock_path();
        let now = now_ms();
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: now,
            heartbeat_at_ms: now,
        };
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    // A lock from a different host name whose heartbeat is older than the
    // cross-host threshold is taken over.
    #[test]
    fn stale_cross_host_lock_is_reclaimed_after_extended_threshold() {
        let (_dir, path) = test_lock_path();
        let stale_at =
            now_ms().saturating_sub(test_config().cross_host_stale_heartbeat_ms() + 1_000);
        let metadata = LockMetadata {
            pid: std::process::id(),
            hostname: format!("{}-other", current_hostname()),
            created_at_ms: stale_at,
            heartbeat_at_ms: stale_at,
        };
        write_synthetic_lock(&path, &metadata);

        let guard = acquire_with_config(&path, Some(Duration::from_secs(1)), test_config())
            .expect("reclaim stale cross-host lock");
        let reclaimed = read_lock_metadata(&path).expect("read reclaimed lock");
        assert_eq!(reclaimed.hostname, current_hostname());
        assert_ne!(reclaimed.created_at_ms, metadata.created_at_ms);
        drop(guard);
    }

    // A lock created 11 minutes ago by a live local PID with a fresh
    // heartbeat still blocks; only the timeout and the untouched file are
    // asserted (the warning itself is not checked).
    #[test]
    fn live_owner_over_10min_warns_but_blocks() {
        let (_dir, path) = test_lock_path();
        let mut metadata = current_process_metadata();
        metadata.created_at_ms = now_ms().saturating_sub(11 * 60 * 1_000);
        metadata.heartbeat_at_ms = now_ms();
        write_synthetic_lock(&path, &metadata);

        let result = acquire_with_config(&path, Some(Duration::from_millis(80)), test_config());
        assert!(matches!(result, Err(AcquireError::Timeout)));
        assert_eq!(read_lock_metadata(&path).expect("read lock"), metadata);

        remove_lock_file(&path).expect("cleanup synthetic lock");
    }

    // After the guard is dropped, waiting three heartbeat intervals must not
    // bring the lock file back.
    #[test]
    fn drop_stops_heartbeat_thread() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        drop(guard);

        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 3,
        ));
        assert!(
            !path.exists(),
            "heartbeat recreated or kept updating lockfile"
        );
    }

    // `LockGone` and `NotOwner` stop the heartbeat; `Io` and `Malformed` do not.
    #[test]
    fn heartbeat_error_classification_terminal_vs_transient() {
        assert!(heartbeat_error_is_terminal(&HeartbeatError::LockGone));
        assert!(heartbeat_error_is_terminal(&HeartbeatError::NotOwner));
        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Io(
            io::Error::other("disk blip")
        )));
        let malformed: serde_json::Error =
            serde_json::from_str::<LockMetadata>("not json").unwrap_err();
        assert!(!heartbeat_error_is_terminal(&HeartbeatError::Malformed(
            malformed
        )));
    }

    // Corrupt the lock file under a live guard, let a few heartbeats fail,
    // then restore valid metadata with an old sentinel heartbeat. The thread
    // must still be running and advance the timestamp past the sentinel.
    #[test]
    fn heartbeat_survives_transient_malformed_and_recovers() {
        let (_dir, path) = test_lock_path();
        let guard = acquire_with_config(&path, None, test_config()).expect("acquire lock");
        let owner = guard.metadata.clone();

        fs::write(&path, b"{ not valid json").expect("corrupt lockfile");

        thread::sleep(Duration::from_millis(
            test_config().heartbeat_interval_ms * 4,
        ));

        let sentinel = now_ms().saturating_sub(1_000_000);
        let mut restored = owner.clone();
        restored.heartbeat_at_ms = sentinel;
        atomic_write_lock_metadata(&path, &restored).expect("atomically restore lock metadata");

        let deadline = std::time::Instant::now() + Duration::from_millis(3_000);
        let mut recovered = false;
        while std::time::Instant::now() < deadline {
            thread::sleep(Duration::from_millis(25));
            match read_lock_metadata(&path) {
                Ok(meta)
                    if meta.created_at_ms == owner.created_at_ms
                        && meta.heartbeat_at_ms > sentinel =>
                {
                    recovered = true;
                    break;
                }
                _ => continue,
            }
        }
        assert!(
            recovered,
            "heartbeat did not recover after a transient malformed read — thread likely died"
        );
        drop(guard);
    }
}