use anyhow::Result;
use std::os::fd::OwnedFd;
use std::path::Path;
use super::FlockMode;
use super::holder::format_holder_list;
use super::primitives::try_flock;
use super::proc_locks::read_holders;
const FLOCK_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
pub(crate) fn acquire_flock_with_timeout(
lock_path: &Path,
mode: FlockMode,
timeout: std::time::Duration,
context: &str,
remediation: Option<&str>,
) -> Result<OwnedFd> {
use anyhow::Context;
if let Some(parent) = lock_path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("create lock subdirectory {}", parent.display()))?;
}
let deadline = std::time::Instant::now() + timeout;
loop {
match try_flock(lock_path, mode)? {
Some(fd) => return Ok(fd),
None => {
if std::time::Instant::now() >= deadline {
let holders = read_holders(lock_path).unwrap_or_default();
let kind_str = match mode {
FlockMode::Exclusive => "LOCK_EX",
FlockMode::Shared => "LOCK_SH",
};
let tail = remediation.map(|r| format!(" {r}")).unwrap_or_default();
anyhow::bail!(
"flock {kind_str} on {context} timed out after \
{timeout:?} (lockfile {lock_path}, holders: \
{holders}).{tail}",
lock_path = lock_path.display(),
holders = format_holder_list(&holders),
);
}
tracing::debug!("waiting on flock at {lock_path:?}");
std::thread::sleep(FLOCK_POLL_INTERVAL);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn acquire_flock_creates_parent_lazily() {
use tempfile::TempDir;
let tmp = TempDir::new().expect("tempdir");
let lockfile = tmp.path().join(".locks").join("fresh.lock");
assert!(
!tmp.path().join(".locks").exists(),
".locks/ must not exist before first acquire (sanity)",
);
let fd = acquire_flock_with_timeout(
&lockfile,
FlockMode::Exclusive,
std::time::Duration::from_secs(1),
"test",
None,
)
.expect("first acquire on fresh path must succeed");
assert!(
tmp.path().join(".locks").is_dir(),
"parent .locks/ must be created lazily on first acquire",
);
assert!(
lockfile.exists(),
"lockfile itself must materialize via try_flock's O_CREAT",
);
drop(fd);
}
#[test]
fn acquire_flock_times_out_when_peer_holds_lock() {
use tempfile::TempDir;
let tmp = TempDir::new().expect("tempdir");
let lockfile = tmp.path().join(".locks").join("contended.lock");
std::fs::create_dir_all(lockfile.parent().unwrap()).unwrap();
let _peer = try_flock(&lockfile, FlockMode::Exclusive)
.expect("peer flock attempt")
.expect("peer must acquire on a fresh lockfile");
let start = std::time::Instant::now();
let err = acquire_flock_with_timeout(
&lockfile,
FlockMode::Exclusive,
std::time::Duration::from_millis(300),
"contended-resource",
None,
)
.expect_err("acquire must fail while peer holds LOCK_EX");
let elapsed = start.elapsed();
assert!(
elapsed >= std::time::Duration::from_millis(250),
"acquire must wait ~timeout before erroring; elapsed={elapsed:?}",
);
let msg = format!("{err:#}");
assert!(
msg.contains("timed out"),
"error must surface the timeout cause; got: {msg}",
);
assert!(
msg.contains("LOCK_EX"),
"error must name the flock mode for operator triage; got: {msg}",
);
assert!(
msg.contains("contended-resource"),
"error must include the caller-supplied context; got: {msg}",
);
}
#[test]
fn acquire_flock_shared_timeout_names_lock_sh() {
use tempfile::TempDir;
let tmp = TempDir::new().expect("tempdir");
let lockfile = tmp.path().join(".locks").join("shared.lock");
std::fs::create_dir_all(lockfile.parent().unwrap()).unwrap();
let _peer = try_flock(&lockfile, FlockMode::Exclusive)
.expect("peer flock attempt")
.expect("peer must acquire on a fresh lockfile");
let err = acquire_flock_with_timeout(
&lockfile,
FlockMode::Shared,
std::time::Duration::from_millis(150),
"shared-test",
None,
)
.expect_err("shared acquire must fail under LOCK_EX peer");
let msg = format!("{err:#}");
assert!(
msg.contains("LOCK_SH"),
"shared-mode timeout must name LOCK_SH; got: {msg}",
);
}
#[test]
fn acquire_flock_remediation_appended_when_some() {
use tempfile::TempDir;
let tmp = TempDir::new().expect("tempdir");
let locks_dir = tmp.path().join(".locks");
std::fs::create_dir_all(&locks_dir).unwrap();
let hint = "Wait for peer or kill it, then retry.";
let lockfile_with = locks_dir.join("remediation_some.lock");
let _peer_with = try_flock(&lockfile_with, FlockMode::Exclusive)
.expect("peer (Some case) flock attempt")
.expect("peer (Some case) must acquire on a fresh lockfile");
let err_with = acquire_flock_with_timeout(
&lockfile_with,
FlockMode::Exclusive,
std::time::Duration::from_millis(120),
"rem-test",
Some(hint),
)
.expect_err("acquire must fail under LOCK_EX peer (Some case)");
let msg_with = format!("{err_with:#}");
assert!(
msg_with.contains(hint),
"Some(hint) must append the remediation; got: {msg_with}",
);
let lockfile_without = locks_dir.join("remediation_none.lock");
let _peer_without = try_flock(&lockfile_without, FlockMode::Exclusive)
.expect("peer (None case) flock attempt")
.expect("peer (None case) must acquire on a fresh lockfile");
let err_without = acquire_flock_with_timeout(
&lockfile_without,
FlockMode::Exclusive,
std::time::Duration::from_millis(120),
"rem-test",
None,
)
.expect_err("acquire must fail under LOCK_EX peer (None case)");
let msg_without = format!("{err_without:#}");
assert!(
!msg_without.contains(hint),
"None must NOT append the remediation; got: {msg_without}",
);
}
#[test]
fn acquire_flock_timeout_message_pins_eval_seam_substrings() {
use tempfile::TempDir;
for mode in [FlockMode::Shared, FlockMode::Exclusive] {
let tmp = TempDir::new().expect("tempdir");
let lockfile = tmp.path().join(".locks").join("seam.lock");
std::fs::create_dir_all(lockfile.parent().unwrap()).unwrap();
let _peer = try_flock(&lockfile, FlockMode::Exclusive)
.expect("peer flock attempt")
.expect("peer must acquire on a fresh lockfile");
let err = acquire_flock_with_timeout(
&lockfile,
mode,
std::time::Duration::from_millis(120),
"seam-test",
None,
)
.expect_err("acquire must time out under LOCK_EX peer");
let msg = format!("{err:#}");
assert!(
msg.contains("flock LOCK_"),
"rendered error must include the literal `flock LOCK_` \
prefix that eval.rs::is_flock_timeout_message keys on; \
mode={mode:?} got: {msg}",
);
assert!(
msg.contains("timed out after"),
"rendered error must include the literal `timed out \
after` substring that eval.rs::is_flock_timeout_message \
keys on; mode={mode:?} got: {msg}",
);
}
}
}