use anyhow::Result;
use serde::Serialize;
use std::os::fd::OwnedFd;
use std::path::{Path, PathBuf};
/// Maximum number of characters of a holder's command line kept by
/// `holder_info_for_pid`; longer command lines are cut and suffixed with `…`.
pub(crate) const CMDLINE_MAX_CHARS: usize = 100;
/// Text returned by `format_holder_list` when the holder slice is empty.
pub(crate) const NO_HOLDERS_RECORDED: &str = "<none recorded>";
/// Kind of advisory lock requested from `try_flock` /
/// `acquire_flock_with_timeout`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlockMode {
/// Non-blocking exclusive lock; named `LOCK_EX` in timeout errors.
Exclusive,
/// Non-blocking shared lock; named `LOCK_SH` in timeout errors.
Shared,
}
/// One process found holding a `FLOCK` entry in `/proc/locks` for a lockfile.
///
/// Serialized with snake_case keys (`pid`, `cmdline`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub struct HolderInfo {
/// PID column of the matching `/proc/locks` line.
pub pid: u32,
/// Contents of `/proc/<pid>/cmdline` with NUL separators turned into spaces,
/// truncated to `CMDLINE_MAX_CHARS`; `<cmdline unavailable>` when the file
/// cannot be read or is empty.
pub cmdline: String,
}
/// `statfs` `f_type` magic numbers of the filesystems that
/// `classify_fs_magic` rejects for lockfiles.
mod fs_magic {
/// NFS.
pub(super) const NFS: i64 = 0x6969;
/// CIFS; reported together with `SMB2` as "CIFS/SMB".
pub(super) const CIFS: i64 = 0xFF53_4D42;
/// SMB2; reported together with `CIFS` as "CIFS/SMB".
pub(super) const SMB2: i64 = 0xFE53_4D42;
/// CephFS.
pub(super) const CEPH: i64 = 0x00c3_6400;
/// AFS (the tests describe this as the in-tree kAFS value).
pub(super) const AFS: i64 = 0x6B41_4653;
/// FUSE (the tests describe this as the value from Linux `magic.h`).
pub(super) const FUSE: i64 = 0x6573_5546;
}
/// Refuse lockfile paths that live on a filesystem `classify_fs_magic` rejects.
///
/// The filesystem is probed with `statfs` on `path` itself when it exists,
/// otherwise on its parent directory. A bare relative file name (for example
/// `foo.lock`) has an empty parent; it is probed as `.` so the check is not
/// skipped. A path with no parent at all falls back to `/`.
///
/// The probe is best-effort: if `statfs` itself fails the path is accepted and
/// the later `open` reports the real problem.
///
/// # Errors
///
/// Returns an error naming `path`, the rejected filesystem and the remediation
/// hint when the filesystem magic is one of the rejected kinds.
fn reject_remote_fs(path: &Path) -> Result<()> {
    let target: &Path = if path.exists() {
        path
    } else {
        match path.parent() {
            // `Path::parent` yields `Some("")` for a single relative component;
            // `statfs("")` would fail and silently bypass the check, so probe
            // the current directory, which is where the file would be created.
            Some(parent) if parent.as_os_str().is_empty() => Path::new("."),
            Some(parent) => parent,
            None => Path::new("/"),
        }
    };
    let Ok(sfs) = rustix::fs::statfs(target) else {
        return Ok(());
    };
    classify_fs_magic(sfs.f_type as i64).map_err(|rejection| {
        anyhow::anyhow!(
            "{}: filesystem {rejection} Move the lockfile path to a \
             local filesystem (tmpfs, ext4, xfs, btrfs, f2fs, bcachefs).",
            path.display()
        )
    })
}
/// Decide whether a `statfs` filesystem magic is acceptable for lockfiles.
///
/// Returns `Ok(())` for every magic that is not one of the `fs_magic`
/// constants. For a rejected magic the error names the filesystem and the
/// reason, using the phrase "is not supported for ktstr lockfiles".
pub(crate) fn classify_fs_magic(magic: i64) -> Result<()> {
    let rejected: Option<(&str, &str)> = match magic {
        fs_magic::NFS => Some((
            "NFS",
            "NFSv3 is advisory-only without an NLM peer; NFSv4 byte-range \
             locking does not cover flock(2)",
        )),
        fs_magic::CIFS | fs_magic::SMB2 => Some((
            "CIFS/SMB",
            "SMB does not emit /proc/locks entries; ktstr cannot enumerate \
             peer holders",
        )),
        fs_magic::CEPH => Some((
            "CephFS",
            "Ceph MDS does not participate in flock serialization between \
             ktstr peers on distinct nodes",
        )),
        fs_magic::AFS => Some(("AFS", "AFS does not support flock(2)")),
        fs_magic::FUSE => Some((
            "FUSE",
            "flock reliability depends on the userspace server's op \
             implementation",
        )),
        _ => None,
    };
    match rejected {
        None => Ok(()),
        Some((name, reason)) => Err(anyhow::anyhow!(
            "{name} is not supported for ktstr lockfiles ({reason})."
        )),
    }
}
/// Make sure the lockfile at `path` exists without taking any lock.
///
/// Runs the remote-filesystem check first, then opens the file with
/// `O_CREAT | O_RDWR | O_CLOEXEC` and mode `0o666`; the descriptor is closed
/// again right away.
///
/// # Errors
///
/// Fails when the filesystem is rejected or the open fails.
pub(crate) fn materialize<P: AsRef<Path>>(path: P) -> Result<()> {
    use rustix::fs::{Mode, OFlags, open};
    let lockfile = path.as_ref();
    reject_remote_fs(lockfile)?;
    let flags = OFlags::CREATE | OFlags::RDWR | OFlags::CLOEXEC;
    match open(lockfile, flags, Mode::from_raw_mode(0o666)) {
        // Only the file's existence matters; the descriptor is dropped here.
        Ok(_fd) => Ok(()),
        Err(e) => Err(anyhow::anyhow!(
            "materialize lockfile {}: {e}",
            lockfile.display()
        )),
    }
}
/// Try once, without blocking, to take an advisory `flock` on `path`.
///
/// The file is created if missing (`O_CREAT | O_RDWR | O_CLOEXEC`, mode
/// `0o666`) after the remote-filesystem check has passed.
///
/// Returns `Ok(Some(fd))` when the lock was taken (the lock lives as long as
/// the returned descriptor), `Ok(None)` when the kernel reports `EWOULDBLOCK`
/// because the lock is held incompatibly elsewhere.
///
/// # Errors
///
/// Fails when the filesystem is rejected, the open fails, or `flock` fails
/// with anything other than `EWOULDBLOCK`.
pub fn try_flock<P: AsRef<Path>>(path: P, mode: FlockMode) -> Result<Option<OwnedFd>> {
    use rustix::fs::{FlockOperation, Mode, OFlags, flock, open};
    use rustix::io::Errno;
    let lockfile = path.as_ref();
    reject_remote_fs(lockfile)?;
    let flags = OFlags::CREATE | OFlags::RDWR | OFlags::CLOEXEC;
    let fd = match open(lockfile, flags, Mode::from_raw_mode(0o666)) {
        Ok(fd) => fd,
        Err(e) => anyhow::bail!("open {}: {e}", lockfile.display()),
    };
    let request = match mode {
        FlockMode::Shared => FlockOperation::NonBlockingLockShared,
        FlockMode::Exclusive => FlockOperation::NonBlockingLockExclusive,
    };
    let Err(e) = flock(&fd, request) else {
        return Ok(Some(fd));
    };
    if e == Errno::WOULDBLOCK {
        return Ok(None);
    }
    anyhow::bail!("flock {}: {e}", lockfile.display())
}
pub(crate) fn read_holders_for_needle(needle: &str) -> Result<Vec<HolderInfo>> {
use anyhow::Context;
use std::fs;
let contents = fs::read_to_string("/proc/locks")
.with_context(|| "read /proc/locks for lockfile holder lookup")?;
Ok(read_holders_from_contents(&contents, needle))
}
pub(crate) fn read_holders_from_contents(contents: &str, needle: &str) -> Vec<HolderInfo> {
let pids = parse_flock_pids_for_needle(contents, needle);
pids.into_iter().map(holder_info_for_pid).collect()
}
/// Extract the PIDs of `FLOCK` entries in `/proc/locks`-formatted `contents`
/// whose sixth whitespace-separated column equals `needle`.
///
/// Lines whose second column is not exactly `FLOCK`, whose PID column does not
/// parse as `u32`, or that are too short are skipped. The result keeps
/// first-seen order and contains each PID once.
pub(crate) fn parse_flock_pids_for_needle(contents: &str, needle: &str) -> Vec<u32> {
    let matching = contents.lines().filter_map(|row| {
        // Column 1 is the entry id; column 2 is the lock type.
        let mut cols = row.split_whitespace().skip(1);
        if cols.next()? != "FLOCK" {
            return None;
        }
        // Columns 3 and 4 are not inspected.
        let mut cols = cols.skip(2);
        let pid = cols.next()?.parse::<u32>().ok()?;
        (cols.next()? == needle).then_some(pid)
    });
    let mut seen: Vec<u32> = Vec::new();
    for pid in matching {
        if !seen.contains(&pid) {
            seen.push(pid);
        }
    }
    seen
}
/// List the processes holding a `flock` on the lockfile at `path`, reading
/// both `/proc/self/mountinfo` and `/proc/locks` afresh.
///
/// # Errors
///
/// Fails when the needle cannot be built for `path` or `/proc/locks` cannot
/// be read.
pub(crate) fn read_holders(path: &Path) -> Result<Vec<HolderInfo>> {
    needle_from_path(path).and_then(|needle| read_holders_for_needle(&needle))
}
/// Like `read_holders`, but resolves the device numbers from the
/// caller-supplied `mountinfo` text instead of re-reading
/// `/proc/self/mountinfo`.
///
/// # Errors
///
/// Fails when the needle cannot be built for `path` or `/proc/locks` cannot
/// be read.
pub(crate) fn read_holders_with_mountinfo(path: &Path, mountinfo: &str) -> Result<Vec<HolderInfo>> {
    needle_from_path_with_mountinfo(path, mountinfo)
        .and_then(|needle| read_holders_for_needle(&needle))
}
/// Read the whole of `/proc/self/mountinfo` into a string.
///
/// # Errors
///
/// Fails, with context naming the file, when it cannot be read.
pub(crate) fn read_mountinfo() -> Result<String> {
    use anyhow::Context;
    use std::fs;
    let raw = fs::read_to_string("/proc/self/mountinfo");
    raw.context("read /proc/self/mountinfo")
}
/// Build the `/proc/locks` lookup key for the lockfile at `path`, reading
/// `/proc/self/mountinfo` for the device numbers.
///
/// # Errors
///
/// Fails when mountinfo cannot be read or the key cannot be built.
pub(crate) fn needle_from_path(path: &Path) -> Result<String> {
    read_mountinfo().and_then(|mountinfo| needle_from_path_with_mountinfo(path, &mountinfo))
}
/// Build the `/proc/locks` lookup key `"<major>:<minor>:<inode>"` for the
/// lockfile at `path`.
///
/// The inode comes from `stat`-ing `path`; major and minor come from the
/// mountinfo entry covering `path` in the supplied `mountinfo` text. Major and
/// minor are rendered as at-least-two-digit lowercase hex, the inode in
/// decimal.
///
/// # Errors
///
/// Fails when `path` cannot be stat-ed or no mountinfo entry covers it.
pub(crate) fn needle_from_path_with_mountinfo(path: &Path, mountinfo: &str) -> Result<String> {
    use anyhow::Context;
    use std::os::unix::fs::MetadataExt;
    let inode = std::fs::metadata(path)
        .with_context(|| format!("stat lockfile {} for holder lookup", path.display()))?
        .ino();
    let device = mount_major_minor_for_path_with_contents(path, mountinfo);
    let (major, minor) = device.with_context(|| {
        format!(
            "resolve kernel major:minor for {} via /proc/self/mountinfo",
            path.display()
        )
    })?;
    Ok(format!("{major:02x}:{minor:02x}:{inode}"))
}
/// Resolve the `(major, minor)` of the mount covering `path`, using the
/// supplied mountinfo `contents`.
///
/// `path` is canonicalized first; when canonicalization fails the path is used
/// as given.
fn mount_major_minor_for_path_with_contents(path: &Path, contents: &str) -> Result<(u32, u32)> {
    let resolved: PathBuf = match std::fs::canonicalize(path) {
        Ok(canonical) => canonical,
        Err(_) => path.to_path_buf(),
    };
    mount_major_minor_for_path_from_contents(contents, &resolved)
}
/// Find the `(major, minor)` device numbers of the mount that covers `path`
/// in mountinfo-formatted `contents`.
///
/// Each line is split on whitespace; the third column is `major:minor` and the
/// fifth is the (octal-escaped) mount point. Lines that are too short, whose
/// mount point is not a component-wise prefix of `path`, or whose
/// `major:minor` does not parse are skipped.
///
/// Among the remaining candidates the longest mount point wins. When two
/// entries have mount points of the same length the later line wins: a mount
/// stacked on top of an existing mount point is listed after the mount it
/// hides, and the top-most mount is the one the path actually resolves to.
///
/// # Errors
///
/// Fails when no line covers `path`.
pub(crate) fn mount_major_minor_for_path_from_contents(
    contents: &str,
    path: &Path,
) -> Result<(u32, u32)> {
    // (mount point length in bytes, major, minor) of the best candidate so far.
    let mut best: Option<(usize, u32, u32)> = None;
    for line in contents.lines() {
        // Columns 1 and 2 (mount id, parent id) are not needed.
        let mut fields = line.split_whitespace().skip(2);
        let Some(major_minor) = fields.next() else {
            continue;
        };
        // `nth(1)` skips the root column and yields the mount point.
        let Some(mount_point_raw) = fields.nth(1) else {
            continue;
        };
        let mount_point = unescape_mountinfo_field(mount_point_raw);
        if !path_starts_with(path, Path::new(mount_point.as_ref())) {
            continue;
        }
        let Some((major, minor)) = parse_major_minor(major_minor) else {
            continue;
        };
        let len = mount_point.len();
        // `>=` so that a later entry for the same mount point replaces an
        // earlier one (over-mounts shadow what is underneath).
        if best.is_none_or(|(best_len, _, _)| len >= best_len) {
            best = Some((len, major, minor));
        }
    }
    match best {
        Some((_, major, minor)) => Ok((major, minor)),
        None => anyhow::bail!(
            "no mountinfo entry covers {} — is /proc mounted?",
            path.display()
        ),
    }
}
/// Decode the `\NNN` octal escapes used in mountinfo fields.
///
/// Input without any backslash is returned borrowed. Otherwise every
/// backslash followed by exactly three octal digits whose value fits in a byte
/// is replaced by that byte; any other backslash is copied through unchanged.
/// The decoded bytes are converted to a string lossily, so bytes that are not
/// valid UTF-8 become U+FFFD.
fn unescape_mountinfo_field(raw: &str) -> std::borrow::Cow<'_, str> {
    use std::borrow::Cow;
    if raw.bytes().all(|byte| byte != b'\\') {
        return Cow::Borrowed(raw);
    }
    let src = raw.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(src.len());
    let mut pos = 0;
    while pos < src.len() {
        // A four-byte window exists only when three bytes follow `pos`.
        let escaped: Option<u8> = match src.get(pos..pos + 4) {
            Some(&[b'\\', d1, d2, d3])
                if is_octal_digit(d1) && is_octal_digit(d2) && is_octal_digit(d3) =>
            {
                let val = (u16::from(d1 - b'0') << 6)
                    | (u16::from(d2 - b'0') << 3)
                    | u16::from(d3 - b'0');
                // Values above 0xff (e.g. `\400`) are not a byte; keep the text.
                u8::try_from(val).ok()
            }
            _ => None,
        };
        match escaped {
            Some(byte) => {
                decoded.push(byte);
                pos += 4;
            }
            None => {
                decoded.push(src[pos]);
                pos += 1;
            }
        }
    }
    Cow::Owned(String::from_utf8_lossy(&decoded).into_owned())
}
/// Whether `b` is one of the ASCII digits `0` through `7`.
#[inline]
fn is_octal_digit(b: u8) -> bool {
    matches!(b, b'0'..=b'7')
}
/// Whether `prefix` is a whole-component prefix of `path`
/// (`/tmp/foo` covers `/tmp/foo/bar` but not `/tmp/foobar`).
fn path_starts_with(path: &Path, prefix: &Path) -> bool {
    // Stripping succeeds exactly when `prefix` is a component-wise prefix.
    path.strip_prefix(prefix).is_ok()
}
/// Parse a `major:minor` pair of decimal `u32`s, split at the first `:`.
///
/// Returns `None` when there is no colon or either half is not a valid `u32`.
fn parse_major_minor(s: &str) -> Option<(u32, u32)> {
    let mut halves = s.splitn(2, ':');
    let major = halves.next()?.parse::<u32>().ok()?;
    let minor = halves.next()?.parse::<u32>().ok()?;
    Some((major, minor))
}
/// Describe the process `pid` by reading `/proc/<pid>/cmdline`.
///
/// NUL bytes are turned into spaces and trailing whitespace is trimmed. A
/// command line longer than `CMDLINE_MAX_CHARS` characters is cut to that many
/// characters and suffixed with `…`. When the file cannot be read, or is empty
/// after trimming, the command line is `<cmdline unavailable>`.
fn holder_info_for_pid(pid: u32) -> HolderInfo {
    const UNAVAILABLE: &str = "<cmdline unavailable>";
    let Ok(raw) = std::fs::read(format!("/proc/{pid}/cmdline")) else {
        return HolderInfo {
            pid,
            cmdline: UNAVAILABLE.to_string(),
        };
    };
    let spaced = String::from_utf8_lossy(&raw).replace('\0', " ");
    let text = spaced.trim_end();
    let cmdline = if text.chars().count() > CMDLINE_MAX_CHARS {
        let head: String = text.chars().take(CMDLINE_MAX_CHARS).collect();
        format!("{head}…")
    } else if text.is_empty() {
        UNAVAILABLE.to_string()
    } else {
        text.to_string()
    };
    HolderInfo { pid, cmdline }
}
/// Render `holders` for inclusion in an error message.
///
/// Produces one ` pid=<pid> cmd=<cmdline>` line per holder, joined with
/// newlines (no trailing newline), or `NO_HOLDERS_RECORDED` when the slice is
/// empty.
pub fn format_holder_list(holders: &[HolderInfo]) -> String {
    if holders.is_empty() {
        return NO_HOLDERS_RECORDED.to_string();
    }
    let mut rendered = String::new();
    for (idx, holder) in holders.iter().enumerate() {
        if idx > 0 {
            rendered.push('\n');
        }
        rendered.push_str(&format!(" pid={} cmd={}", holder.pid, holder.cmdline));
    }
    rendered
}
/// Directory name used for lock subdirectories. Not referenced in this part of
/// the file — presumably joined onto a parent directory by callers (the tests
/// here build `.locks/<name>.lock` paths by hand); confirm against call sites.
pub(crate) const LOCK_DIR_NAME: &str = ".locks";
/// Pause between lock attempts in `acquire_flock_with_timeout`.
pub(crate) const FLOCK_POLL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(100);
/// Acquire an advisory `flock` on `lock_path`, polling until `timeout` elapses.
///
/// The parent directory of `lock_path` is created if needed. The lock is then
/// attempted with `try_flock`; while it is held elsewhere the call sleeps for
/// `FLOCK_POLL_INTERVAL` (or the time left until the deadline, whichever is
/// shorter) and retries. At least one attempt is always made, and one more is
/// made once the deadline has been reached before giving up.
///
/// * `context` – caller-chosen name of the protected resource, echoed in the
///   timeout error.
/// * `remediation` – optional hint appended to the timeout error.
///
/// A `timeout` so large that the deadline cannot be represented is treated as
/// "no deadline" instead of panicking.
///
/// # Errors
///
/// Fails when the parent directory cannot be created, when `try_flock` fails,
/// or on timeout. The timeout error names the lock kind (`LOCK_EX` /
/// `LOCK_SH`), `context`, the timeout, the lockfile and the current holders
/// (best-effort; an empty list if they cannot be read).
pub(crate) fn acquire_flock_with_timeout(
    lock_path: &Path,
    mode: FlockMode,
    timeout: std::time::Duration,
    context: &str,
    remediation: Option<&str>,
) -> Result<OwnedFd> {
    use anyhow::Context;
    use std::time::Instant;
    if let Some(parent) = lock_path.parent() {
        std::fs::create_dir_all(parent)
            .with_context(|| format!("create lock subdirectory {}", parent.display()))?;
    }
    // `Instant + Duration` panics on overflow; `checked_add` yields `None` for
    // an unrepresentable deadline, which is handled as "never expires".
    let deadline = Instant::now().checked_add(timeout);
    loop {
        if let Some(fd) = try_flock(lock_path, mode)? {
            return Ok(fd);
        }
        let remaining = match deadline {
            Some(deadline) => deadline.saturating_duration_since(Instant::now()),
            None => FLOCK_POLL_INTERVAL,
        };
        if remaining.is_zero() {
            // Holder lookup is diagnostic only; failure degrades to an empty list.
            let holders = read_holders(lock_path).unwrap_or_default();
            let kind_str = match mode {
                FlockMode::Exclusive => "LOCK_EX",
                FlockMode::Shared => "LOCK_SH",
            };
            let tail = remediation.map(|r| format!(" {r}")).unwrap_or_default();
            anyhow::bail!(
                "flock {kind_str} on {context} timed out after \
                 {timeout:?} (lockfile {lock_path}, holders: \
                 {holders}).{tail}",
                lock_path = lock_path.display(),
                holders = format_holder_list(&holders),
            );
        }
        tracing::debug!("waiting on flock at {lock_path:?}");
        // Never sleep past the deadline.
        std::thread::sleep(remaining.min(FLOCK_POLL_INTERVAL));
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Pins the numeric value of `fs_magic::FUSE`.
#[test]
fn fuse_magic_matches_linux_magic_h() {
assert_eq!(fs_magic::FUSE, 0x65735546);
}
// Pins the numeric value of `fs_magic::AFS`.
#[test]
fn afs_magic_matches_in_tree_kafs() {
assert_eq!(fs_magic::AFS, 0x6B414653);
}
// NFS is rejected; the message names NFS, NFSv3 and "is not supported".
#[test]
fn classify_fs_magic_rejects_nfs() {
let err = classify_fs_magic(fs_magic::NFS).unwrap_err();
let msg = format!("{err:#}");
assert!(msg.contains("NFS"), "err={msg}");
assert!(
msg.contains("NFSv3"),
"err must name NFSv3 in reason: {msg}"
);
assert!(
msg.contains("is not supported"),
"err must contain the canonical rejection phrase: {msg}",
);
}
// CIFS and SMB2 share one rejection: both messages say "CIFS/SMB", cite
// /proc/locks and contain "is not supported".
#[test]
fn classify_fs_magic_rejects_cifs_and_smb2() {
let err_cifs = classify_fs_magic(fs_magic::CIFS).unwrap_err();
let err_smb2 = classify_fs_magic(fs_magic::SMB2).unwrap_err();
let cifs_msg = format!("{err_cifs:#}");
let smb2_msg = format!("{err_smb2:#}");
assert!(cifs_msg.contains("CIFS/SMB"));
assert!(smb2_msg.contains("CIFS/SMB"));
assert!(
cifs_msg.contains("/proc/locks"),
"err must cite /proc/locks: {cifs_msg}",
);
assert!(
smb2_msg.contains("/proc/locks"),
"err must cite /proc/locks: {smb2_msg}",
);
assert!(
cifs_msg.contains("is not supported"),
"err must contain the canonical rejection phrase: {cifs_msg}",
);
assert!(
smb2_msg.contains("is not supported"),
"err must contain the canonical rejection phrase: {smb2_msg}",
);
}
// CephFS is rejected; the message names CephFS, MDS and "is not supported".
#[test]
fn classify_fs_magic_rejects_ceph() {
let err = classify_fs_magic(fs_magic::CEPH).unwrap_err();
let msg = format!("{err:#}");
assert!(msg.contains("CephFS"));
assert!(msg.contains("MDS"), "err must name Ceph MDS: {msg}");
assert!(
msg.contains("is not supported"),
"err must contain the canonical rejection phrase: {msg}",
);
}
// AFS is rejected; the message names AFS, flock(2) and "is not supported".
#[test]
fn classify_fs_magic_rejects_afs() {
let err = classify_fs_magic(fs_magic::AFS).unwrap_err();
let msg = format!("{err:#}");
assert!(msg.contains("AFS"));
assert!(msg.contains("flock(2)"), "err must cite flock(2): {msg}");
assert!(
msg.contains("is not supported"),
"err must contain the canonical rejection phrase: {msg}",
);
}
// FUSE is rejected; the message names FUSE, the userspace server and
// "is not supported".
#[test]
fn classify_fs_magic_rejects_fuse() {
let err = classify_fs_magic(fs_magic::FUSE).unwrap_err();
let msg = format!("{err:#}");
assert!(msg.contains("FUSE"));
assert!(
msg.contains("userspace server"),
"err must name userspace server: {msg}",
);
assert!(
msg.contains("is not supported"),
"err must contain the canonical rejection phrase: {msg}",
);
}
// Magics outside the reject list (labelled per filesystem in the `expect`
// messages), including an unknown one, are accepted.
#[test]
fn classify_fs_magic_accepts_local_filesystems() {
classify_fs_magic(0x01021994).expect("tmpfs accepted");
classify_fs_magic(0xEF53).expect("ext4 accepted");
classify_fs_magic(0x58465342).expect("xfs accepted");
classify_fs_magic(0x9123683E).expect("btrfs accepted");
classify_fs_magic(0xF2F52010).expect("f2fs accepted");
classify_fs_magic(0xCA451A4E).expect("bcachefs accepted");
classify_fs_magic(0xDEAD_BEEF).expect("unknown magic accepted");
}
// A single covering mount yields its major:minor.
#[test]
fn mountinfo_single_mount_hits_right_major_minor() {
let mountinfo = "\
22 28 0:21 / /tmp rw,nosuid,nodev shared:5 - tmpfs tmpfs rw,size=8g
";
let (major, minor) =
mount_major_minor_for_path_from_contents(mountinfo, Path::new("/tmp/ktstr-llc-0.lock"))
.expect("tmp mount covers the lockfile path");
assert_eq!((major, minor), (0, 21));
}
// With nested mount points, the deeper (longer) one is chosen.
#[test]
fn mountinfo_longest_prefix_wins_for_bind_over_tmpfs() {
let mountinfo = "\
22 28 0:21 / /tmp rw,nosuid,nodev shared:5 - tmpfs tmpfs rw,size=8g
35 22 0:99 / /tmp/ktstr-cache rw,nosuid - tmpfs tmpfs rw,size=1g
";
let (major, minor) = mount_major_minor_for_path_from_contents(
mountinfo,
Path::new("/tmp/ktstr-cache/entry.lock"),
)
.expect("bind mount wins longest-prefix match");
assert_eq!((major, minor), (0, 99), "bind's major:minor expected");
}
// A path no mount point covers produces the "no mountinfo entry covers" error.
#[test]
fn mountinfo_uncovered_path_errors() {
let mountinfo = "\
22 28 0:21 / /tmp rw - tmpfs tmpfs rw
";
let err = mount_major_minor_for_path_from_contents(
mountinfo,
Path::new("/var/log/unrelated.lock"),
)
.expect_err("no mountinfo entry covers /var/log/...");
let msg = format!("{err:#}");
assert!(msg.contains("no mountinfo entry covers"), "msg={msg}");
}
// Prefix matching is per path component: `/tmp/foo/...` must not match the
// `/tmp/foobar` mount, while `/tmp/foobar/...` must.
#[test]
fn mountinfo_respects_component_boundary() {
let mountinfo = "\
22 28 0:21 / /tmp rw - tmpfs tmpfs rw
35 22 0:99 / /tmp/foobar rw - tmpfs tmpfs rw
";
let (major, minor) =
mount_major_minor_for_path_from_contents(mountinfo, Path::new("/tmp/foo/entry.lock"))
.expect("path under /tmp (not /tmp/foobar) resolves to the tmp mount");
assert_eq!(
(major, minor),
(0, 21),
"/tmp/foo must NOT match the /tmp/foobar mount",
);
let (major, minor) = mount_major_minor_for_path_from_contents(
mountinfo,
Path::new("/tmp/foobar/entry.lock"),
)
.expect("path under /tmp/foobar resolves to the /tmp/foobar mount");
assert_eq!(
(major, minor),
(0, 99),
"/tmp/foobar/ must match the /tmp/foobar mount, not the /tmp one",
);
}
// A line with an unparseable major:minor is skipped; a later valid line for
// the same mount point is used.
#[test]
fn mountinfo_skips_malformed_major_minor() {
let mountinfo = "\
22 28 BAD:NUMBER / /tmp rw - tmpfs tmpfs rw
35 28 0:42 / /tmp rw - tmpfs tmpfs rw
";
let (major, minor) =
mount_major_minor_for_path_from_contents(mountinfo, Path::new("/tmp/entry.lock"))
.expect("second (valid) line still matches after malformed first");
assert_eq!((major, minor), (0, 42));
}
// A line that ends before the mount-point column is skipped.
#[test]
fn mountinfo_skips_truncated_lines() {
let mountinfo = "\
22 28 0:21
35 28 0:42 / /tmp rw - tmpfs tmpfs rw
";
let (major, minor) =
mount_major_minor_for_path_from_contents(mountinfo, Path::new("/tmp/entry.lock"))
.expect("truncated line skipped; second line matches");
assert_eq!((major, minor), (0, 42));
}
// `\040` in the mount point decodes to a space before prefix matching.
#[test]
fn mountinfo_unescapes_space_in_mount_point() {
let mountinfo = "\
22 28 0:77 / /mnt/my\\040dir rw,nosuid - tmpfs tmpfs rw
";
let (major, minor) = mount_major_minor_for_path_from_contents(
mountinfo,
Path::new("/mnt/my dir/cache.lock"),
)
.expect(
"mount point with `\\040`-escaped space must unescape to real \
space and match the query path's literal space",
);
assert_eq!((major, minor), (0, 77));
}
// `\011` in the mount point decodes to a tab before prefix matching.
#[test]
fn mountinfo_unescapes_tab_in_mount_point() {
let mountinfo = "\
22 28 0:78 / /mnt/tab\\011dir rw,nosuid - tmpfs tmpfs rw
";
let (major, minor) = mount_major_minor_for_path_from_contents(
mountinfo,
Path::new("/mnt/tab\tdir/cache.lock"),
)
.expect("mount point with `\\011` must unescape to real tab");
assert_eq!((major, minor), (0, 78));
}
// `\134` in the mount point decodes to a backslash before prefix matching.
#[test]
fn mountinfo_unescapes_backslash_in_mount_point() {
let mountinfo = "\
22 28 0:79 / /mnt/bs\\134dir rw,nosuid - tmpfs tmpfs rw
";
let (major, minor) = mount_major_minor_for_path_from_contents(
mountinfo,
Path::new("/mnt/bs\\dir/cache.lock"),
)
.expect("mount point with `\\134` must unescape to real backslash");
assert_eq!((major, minor), (0, 79));
}
// Input without a backslash must come back as `Cow::Borrowed`.
#[test]
fn unescape_mountinfo_field_borrows_when_no_escapes() {
let raw = "/tmp";
let decoded = unescape_mountinfo_field(raw);
match decoded {
std::borrow::Cow::Borrowed(b) => assert_eq!(b, raw),
std::borrow::Cow::Owned(_) => {
panic!("unescape must return Cow::Borrowed when input has no `\\`")
}
}
}
// Several escapes in one field are all decoded.
#[test]
fn unescape_mountinfo_field_handles_multiple_escapes() {
let raw = "/a\\040b\\011c";
let decoded = unescape_mountinfo_field(raw);
assert_eq!(decoded.as_ref(), "/a b\tc");
}
// A backslash not followed by three octal digits (non-octal digit, or input
// ending early) is left as-is.
#[test]
fn unescape_mountinfo_field_preserves_non_octal_backslash() {
let raw = "/bad\\9suffix";
let decoded = unescape_mountinfo_field(raw);
assert_eq!(decoded.as_ref(), "/bad\\9suffix");
let raw = "/trunc\\04";
let decoded = unescape_mountinfo_field(raw);
assert_eq!(decoded.as_ref(), "/trunc\\04");
}
// `\377` decodes to byte 0xff, which the lossy UTF-8 conversion renders as
// U+FFFD; escapes above 0xff (`\400`, `\777`) are kept as literal text.
#[test]
fn unescape_mountinfo_field_preserves_out_of_range_octal() {
let decoded = unescape_mountinfo_field("\\377");
assert_eq!(decoded.as_ref(), "\u{FFFD}");
let decoded = unescape_mountinfo_field("\\400");
assert_eq!(decoded.as_ref(), "\\400");
let decoded = unescape_mountinfo_field("\\777");
assert_eq!(decoded.as_ref(), "\\777");
}
// Only ASCII `0`..=`7` count as octal digits.
#[test]
fn is_octal_digit_rejects_8_and_9() {
for b in b'0'..=b'7' {
assert!(is_octal_digit(b), "byte 0x{b:02x} must be octal");
}
assert!(!is_octal_digit(b'8'), "byte 0x38 must NOT be octal");
assert!(!is_octal_digit(b'9'), "byte 0x39 must NOT be octal");
assert!(!is_octal_digit(b'a'), "non-digit must NOT be octal");
assert!(!is_octal_digit(b'/'), "byte before '0' must NOT be octal");
}
// `path_starts_with` compares whole components: child and identity match,
// a sibling sharing a string prefix and a parent do not.
#[test]
fn path_starts_with_respects_component_boundary() {
assert!(
path_starts_with(Path::new("/tmp/foo"), Path::new("/tmp")),
"/tmp/foo must start with /tmp",
);
assert!(
path_starts_with(Path::new("/tmp/foo/bar"), Path::new("/tmp/foo")),
"/tmp/foo/bar must start with /tmp/foo (deeper component path)",
);
assert!(
!path_starts_with(Path::new("/tmp/foobar"), Path::new("/tmp/foo")),
"/tmp/foobar must NOT start with /tmp/foo (component boundary)",
);
assert!(
path_starts_with(Path::new("/tmp"), Path::new("/tmp")),
"/tmp must start with itself (identity)",
);
assert!(
!path_starts_with(Path::new("/"), Path::new("/tmp")),
"/ is a parent of /tmp, not a child — must NOT match",
);
}
// Well-formed decimal pairs parse.
#[test]
fn parse_major_minor_happy_path() {
assert_eq!(parse_major_minor("0:21"), Some((0, 21)));
assert_eq!(parse_major_minor("259:3"), Some((259, 3)));
}
// Input without a colon yields `None`.
#[test]
fn parse_major_minor_missing_colon() {
assert_eq!(parse_major_minor("notvalid"), None);
assert_eq!(parse_major_minor(""), None);
}
// Non-numeric or empty halves yield `None`.
#[test]
fn parse_major_minor_non_numeric() {
assert_eq!(parse_major_minor("abc:21"), None);
assert_eq!(parse_major_minor("0:xyz"), None);
assert_eq!(parse_major_minor(":"), None);
}
// Negative numbers do not fit `u32` and yield `None`.
#[test]
fn parse_major_minor_negative_numbers() {
assert_eq!(parse_major_minor("-1:0"), None);
assert_eq!(parse_major_minor("0:-1"), None);
}
// An empty holder slice renders as `NO_HOLDERS_RECORDED`.
#[test]
fn format_holder_list_empty_yields_sentinel() {
assert_eq!(format_holder_list(&[]), NO_HOLDERS_RECORDED);
}
// Pins the exact rendering of a single holder line.
#[test]
fn format_holder_list_single_holder() {
let holders = [HolderInfo {
pid: 12345,
cmdline: "cargo build".to_string(),
}];
assert_eq!(format_holder_list(&holders), " pid=12345 cmd=cargo build");
}
// Multiple holders are joined with newlines, not commas.
#[test]
fn format_holder_list_multiple_newline_separated() {
let holders = [
HolderInfo {
pid: 1,
cmdline: "a".to_string(),
},
HolderInfo {
pid: 2,
cmdline: "b".to_string(),
},
];
let out = format_holder_list(&holders);
assert!(out.contains("\n"), "must contain newline: {out}");
assert!(!out.contains(", "), "must NOT contain comma-space: {out}");
assert_eq!(out, " pid=1 cmd=a\n pid=2 cmd=b");
}
// Only `FLOCK` lines whose device/inode column equals the needle contribute.
#[test]
fn parse_flock_pids_for_needle_skips_posix_and_ofdlck() {
let needle = "08:02:1234";
let contents = "\
1: POSIX ADVISORY WRITE 11111 08:02:1234 0 EOF
2: OFDLCK ADVISORY READ 22222 08:02:1234 0 EOF
3: FLOCK ADVISORY WRITE 33333 08:02:1234 0 EOF
4: FLOCK ADVISORY READ 44444 08:02:5678 0 EOF
";
let pids = parse_flock_pids_for_needle(contents, needle);
assert_eq!(
pids,
vec![33333],
"only the FLOCK line at the matching triple must contribute a PID; \
POSIX/OFDLCK must be filtered",
);
}
// A PID appearing on several matching lines is reported once, in first-seen
// order.
#[test]
fn parse_flock_pids_for_needle_deduplicates_pids() {
let needle = "08:02:1234";
let contents = "\
1: FLOCK ADVISORY WRITE 55555 08:02:1234 0 EOF
2: FLOCK ADVISORY READ 55555 08:02:1234 0 EOF
3: FLOCK ADVISORY WRITE 66666 08:02:1234 0 EOF
";
let pids = parse_flock_pids_for_needle(contents, needle);
assert_eq!(pids, vec![55555, 66666], "PIDs must dedupe");
}
// Empty input yields no PIDs.
#[test]
fn parse_flock_pids_for_needle_empty_contents_returns_empty() {
let pids = parse_flock_pids_for_needle("", "08:02:1234");
assert!(pids.is_empty());
}
// Truncated lines and lines with a non-numeric PID are skipped.
#[test]
fn parse_flock_pids_for_needle_skips_malformed_lines() {
let needle = "08:02:1234";
let contents = "\
1: FLOCK
2: FLOCK ADVISORY WRITE notanumber 08:02:1234 0 EOF
3: FLOCK ADVISORY WRITE 77777 08:02:1234 0 EOF
";
let pids = parse_flock_pids_for_needle(contents, needle);
assert_eq!(
pids,
vec![77777],
"only the well-formed matching line contributes",
);
}
// Uses the test process's own PID so `/proc/<pid>/cmdline` is readable: one
// holder is produced and its cmdline is not the "unavailable" placeholder.
#[test]
fn read_holders_from_contents_returns_holder_info_per_matching_pid() {
let our_pid = std::process::id();
let needle = "08:02:1234";
let contents = format!(
"1: FLOCK ADVISORY WRITE {our_pid} 08:02:1234 0 EOF\n\
2: POSIX ADVISORY WRITE 11111 08:02:1234 0 EOF\n",
);
let holders = read_holders_from_contents(&contents, needle);
assert_eq!(
holders.len(),
1,
"only the FLOCK line at the matching triple produces a holder; \
POSIX must be filtered: {holders:?}",
);
assert_eq!(holders[0].pid, our_pid);
assert_ne!(holders[0].cmdline, "<cmdline unavailable>");
}
// Empty input yields no holders.
#[test]
fn read_holders_from_contents_empty_returns_empty() {
let holders = read_holders_from_contents("", "08:02:1234");
assert!(holders.is_empty());
}
// Two calls on the same input return the same single holder.
#[test]
fn read_holders_from_contents_deterministic_for_same_input() {
let contents = format!(
"1: FLOCK ADVISORY WRITE {pid} 08:02:1234 0 EOF\n",
pid = std::process::id(),
);
let a = read_holders_from_contents(&contents, "08:02:1234");
let b = read_holders_from_contents(&contents, "08:02:1234");
assert_eq!(a.len(), b.len());
assert_eq!(a.len(), 1);
assert_eq!(a[0].pid, b[0].pid);
assert_eq!(a[0].cmdline, b[0].cmdline);
}
// Reads the real `/proc/locks`; a needle that should match nothing yields an
// empty list rather than an error.
#[test]
fn read_holders_for_needle_no_match_returns_empty() {
let needle = "ff:ff:18446744073709551615";
let holders = read_holders_for_needle(needle)
.expect("/proc/locks read must succeed on any Linux host");
assert!(
holders.is_empty(),
"impossible needle must not match any holder: {holders:?}"
);
}
// The needle built from pre-read mountinfo must equal the one built by
// `needle_from_path` for the same lockfile.
#[test]
fn needle_cached_mountinfo_equals_uncached() {
use tempfile::TempDir;
let tmp = TempDir::new().expect("tempdir");
let path = tmp.path().join("cache-equivalence.lock");
materialize(&path).expect("materialize lockfile");
let uncached = needle_from_path(&path).expect("uncached needle");
let mountinfo = read_mountinfo().expect("read mountinfo");
let cached = needle_from_path_with_mountinfo(&path, &mountinfo).expect("cached needle");
assert_eq!(
cached, uncached,
"cached and uncached paths must produce byte-identical needles \
for the same lockfile. Divergence means DISCOVER's /proc/locks \
lookup would miss holders the one-shot path would see. \
uncached={uncached} cached={cached}",
);
}
// While this process holds LOCK_EX on a fresh lockfile, both holder lookups
// (fresh mountinfo and pre-read mountinfo) must list this process.
#[test]
fn read_holders_cached_mountinfo_equals_uncached() {
use tempfile::TempDir;
let tmp = TempDir::new().expect("tempdir");
let path = tmp.path().join("cache-holder-equivalence.lock");
let fd = try_flock(&path, FlockMode::Exclusive)
.expect("try_flock must succeed on fresh tempfile")
.expect("EX must acquire on clean pool");
let uncached = read_holders(&path).expect("uncached holders");
let mountinfo = read_mountinfo().expect("read mountinfo");
let cached = read_holders_with_mountinfo(&path, &mountinfo).expect("cached holders");
let our_pid = std::process::id();
assert!(
uncached.iter().any(|h| h.pid == our_pid),
"our pid {our_pid} must appear in uncached holders {uncached:?}",
);
assert!(
cached.iter().any(|h| h.pid == our_pid),
"our pid {our_pid} must appear in cached holders {cached:?}",
);
// Keep the lock held until both lookups have run.
drop(fd);
}
// The canonicalizing wrapper and the pure parser agree for `/tmp` under a
// synthetic mountinfo.
#[test]
fn mount_major_minor_wrapper_matches_parser_seam() {
let mountinfo = "\
22 28 0:21 / /tmp rw,nosuid,nodev shared:5 - tmpfs tmpfs rw,size=8g
";
let path = Path::new("/tmp");
let (wrapper_major, wrapper_minor) =
mount_major_minor_for_path_with_contents(path, mountinfo)
.expect("wrapper must resolve /tmp under synthetic mountinfo");
let (parser_major, parser_minor) =
mount_major_minor_for_path_from_contents(mountinfo, path)
.expect("parser seam must resolve /tmp");
assert_eq!(
(wrapper_major, wrapper_minor),
(parser_major, parser_minor),
"wrapper + parser must produce the same (major, minor) for the \
same (path, mountinfo). Divergence means the cached DISCOVER \
path is reading different mount state than the uncached \
one-shot path would.",
);
assert_eq!((wrapper_major, wrapper_minor), (0, 21));
}
// Serialized `HolderInfo` uses the keys `pid` and `cmdline`.
#[test]
fn holder_info_json_keys_are_snake_case() {
let holder = HolderInfo {
pid: 123,
cmdline: "bash".to_string(),
};
let val = serde_json::to_value(&holder).expect("serialize");
assert_eq!(val["pid"], serde_json::json!(123));
assert_eq!(val["cmdline"], serde_json::json!("bash"));
assert!(
val.get("cmdLine").is_none(),
"camelCase cmdLine must not appear: {val}",
);
}
// The descriptor returned by `try_flock` must carry `FD_CLOEXEC`.
#[test]
fn try_flock_sets_cloexec_on_returned_fd() {
use std::os::fd::AsRawFd;
use tempfile::TempDir;
let tmp = TempDir::new().expect("tempdir");
let path = tmp.path().join("cloexec.lock");
let fd = try_flock(&path, FlockMode::Exclusive)
.expect("try_flock must succeed on fresh tempfile")
.expect("EX must acquire on clean pool");
// SAFETY: `fd` is an open descriptor owned by this test until the `drop`
// below, and `F_GETFD` only reads the descriptor flags.
let flags = unsafe { libc::fcntl(fd.as_raw_fd(), libc::F_GETFD) };
assert!(
flags >= 0,
"fcntl F_GETFD must succeed on our fd; got errno={}",
std::io::Error::last_os_error(),
);
assert_eq!(
flags & libc::FD_CLOEXEC,
libc::FD_CLOEXEC,
"FD_CLOEXEC must be set on try_flock-returned fd; \
flags=0x{flags:x}. Without it, exec'd children \
inherit the flock and produce phantom holders.",
);
drop(fd);
}
// The first acquire creates the missing parent directory and the lockfile.
#[test]
fn acquire_flock_creates_parent_lazily() {
use tempfile::TempDir;
let tmp = TempDir::new().expect("tempdir");
let lockfile = tmp.path().join(".locks").join("fresh.lock");
assert!(
!tmp.path().join(".locks").exists(),
".locks/ must not exist before first acquire (sanity)",
);
let fd = acquire_flock_with_timeout(
&lockfile,
FlockMode::Exclusive,
std::time::Duration::from_secs(1),
"test",
None,
)
.expect("first acquire on fresh path must succeed");
assert!(
tmp.path().join(".locks").is_dir(),
"parent .locks/ must be created lazily on first acquire",
);
assert!(
lockfile.exists(),
"lockfile itself must materialize via try_flock's O_CREAT",
);
drop(fd);
}
// With LOCK_EX held through `_peer`, an exclusive acquire waits roughly the
// timeout and fails with a message naming the timeout, mode and context.
#[test]
fn acquire_flock_times_out_when_peer_holds_lock() {
use tempfile::TempDir;
let tmp = TempDir::new().expect("tempdir");
let lockfile = tmp.path().join(".locks").join("contended.lock");
std::fs::create_dir_all(lockfile.parent().unwrap()).unwrap();
let _peer = try_flock(&lockfile, FlockMode::Exclusive)
.expect("peer flock attempt")
.expect("peer must acquire on a fresh lockfile");
let start = std::time::Instant::now();
let err = acquire_flock_with_timeout(
&lockfile,
FlockMode::Exclusive,
std::time::Duration::from_millis(300),
"contended-resource",
None,
)
.expect_err("acquire must fail while peer holds LOCK_EX");
let elapsed = start.elapsed();
assert!(
elapsed >= std::time::Duration::from_millis(250),
"acquire must wait ~timeout before erroring; elapsed={elapsed:?}",
);
let msg = format!("{err:#}");
assert!(
msg.contains("timed out"),
"error must surface the timeout cause; got: {msg}",
);
assert!(
msg.contains("LOCK_EX"),
"error must name the flock mode for operator triage; got: {msg}",
);
assert!(
msg.contains("contended-resource"),
"error must include the caller-supplied context; got: {msg}",
);
}
// A shared acquire that times out under a LOCK_EX peer names `LOCK_SH`.
#[test]
fn acquire_flock_shared_timeout_names_lock_sh() {
use tempfile::TempDir;
let tmp = TempDir::new().expect("tempdir");
let lockfile = tmp.path().join(".locks").join("shared.lock");
std::fs::create_dir_all(lockfile.parent().unwrap()).unwrap();
let _peer = try_flock(&lockfile, FlockMode::Exclusive)
.expect("peer flock attempt")
.expect("peer must acquire on a fresh lockfile");
let err = acquire_flock_with_timeout(
&lockfile,
FlockMode::Shared,
std::time::Duration::from_millis(150),
"shared-test",
None,
)
.expect_err("shared acquire must fail under LOCK_EX peer");
let msg = format!("{err:#}");
assert!(
msg.contains("LOCK_SH"),
"shared-mode timeout must name LOCK_SH; got: {msg}",
);
}
// The remediation hint appears in the timeout error only when `Some` is
// passed.
#[test]
fn acquire_flock_remediation_appended_when_some() {
use tempfile::TempDir;
let tmp = TempDir::new().expect("tempdir");
let lockfile = tmp.path().join(".locks").join("remediation.lock");
std::fs::create_dir_all(lockfile.parent().unwrap()).unwrap();
let _peer = try_flock(&lockfile, FlockMode::Exclusive)
.expect("peer flock attempt")
.expect("peer must acquire on a fresh lockfile");
let hint = "Wait for peer or kill it, then retry.";
let err_with = acquire_flock_with_timeout(
&lockfile,
FlockMode::Exclusive,
std::time::Duration::from_millis(120),
"rem-test",
Some(hint),
)
.expect_err("acquire must fail under LOCK_EX peer");
let msg_with = format!("{err_with:#}");
assert!(
msg_with.contains(hint),
"Some(hint) must append the remediation; got: {msg_with}",
);
// Release the first peer lock, then take a new one for the `None` case.
drop(_peer);
let _peer2 = try_flock(&lockfile, FlockMode::Exclusive)
.expect("peer2 attempt")
.expect("peer2 must acquire");
let err_without = acquire_flock_with_timeout(
&lockfile,
FlockMode::Exclusive,
std::time::Duration::from_millis(120),
"rem-test",
None,
)
.expect_err("acquire must fail under LOCK_EX peer (None case)");
let msg_without = format!("{err_without:#}");
assert!(
!msg_without.contains(hint),
"None must NOT append the remediation; got: {msg_without}",
);
}
// For both modes the timeout error must contain the literal substrings
// `flock LOCK_` and `timed out after` (per the assertion messages, another
// module matches on them).
#[test]
fn acquire_flock_timeout_message_pins_eval_seam_substrings() {
use tempfile::TempDir;
for mode in [FlockMode::Shared, FlockMode::Exclusive] {
let tmp = TempDir::new().expect("tempdir");
let lockfile = tmp.path().join(".locks").join("seam.lock");
std::fs::create_dir_all(lockfile.parent().unwrap()).unwrap();
let _peer = try_flock(&lockfile, FlockMode::Exclusive)
.expect("peer flock attempt")
.expect("peer must acquire on a fresh lockfile");
let err = acquire_flock_with_timeout(
&lockfile,
mode,
std::time::Duration::from_millis(120),
"seam-test",
None,
)
.expect_err("acquire must time out under LOCK_EX peer");
let msg = format!("{err:#}");
assert!(
msg.contains("flock LOCK_"),
"rendered error must include the literal `flock LOCK_` \
prefix that eval.rs::is_flock_timeout_message keys on; \
mode={mode:?} got: {msg}",
);
assert!(
msg.contains("timed out after"),
"rendered error must include the literal `timed out \
after` substring that eval.rs::is_flock_timeout_message \
keys on; mode={mode:?} got: {msg}",
);
}
}
}