#![cfg(unix)]
use std::env;
use std::io::{BufRead, BufReader};
use std::path::PathBuf;
use std::process::{Command, Stdio};
use std::thread;
use std::time::{Duration, Instant};
use datawal::RecordLog;
/// Name of the environment variable that switches this test binary into
/// child "lock holder" mode. The parent sets it to the directory the child
/// should open; when it is present, the test body acts as the child instead
/// of running the parent-side assertions.
const HOLD_ENV: &str = "DATAWAL_TEST_LOCK_HOLDER";
/// Child-process entry point: opens the record log in `dir`, prints a
/// `LOCK_HELD` line on stdout, then idles until the process is killed.
///
/// Never returns. If nothing kills the process it exits with status 0 after
/// 60 seconds.
///
/// # Panics
///
/// Panics if `RecordLog::open` fails.
fn child_run_lock_holder(dir: PathBuf) -> ! {
    use std::io::Write;

    // Bound to a named variable (not `_`) so the opened log stays alive for
    // the entire sleep below instead of being dropped immediately.
    let _held = RecordLog::open(&dir).expect("child: open + take lock");

    // Announce readiness and flush right away: stdout may be a pipe, and the
    // reader on the other end is waiting for this exact line.
    println!("LOCK_HELD");
    let _ = std::io::stdout().flush();

    let linger = Duration::from_secs(60);
    thread::sleep(linger);
    std::process::exit(0)
}
/// Re-executes the current test binary as a child process restricted to the
/// `lock_is_contended_across_processes` test, with `HOLD_ENV` set to `dir`.
///
/// The child's stdout is piped so the caller can read what the child prints.
/// Its stderr is inherited from this process, so a panic in the child shows
/// up in the test output instead of vanishing into a pipe nobody drains.
///
/// The returned `Child` is not killed automatically when dropped; the caller
/// is responsible for terminating and reaping it.
///
/// # Panics
///
/// Panics if the current executable cannot be located or the child cannot be
/// spawned.
fn spawn_lock_holder(dir: &std::path::Path) -> std::process::Child {
    let exe = env::current_exe().expect("locate test binary");
    Command::new(exe)
        .args([
            "--exact",
            "lock_is_contended_across_processes",
            // Keep the harness from capturing the child's `println!` output,
            // so it actually reaches the piped stdout.
            "--nocapture",
        ])
        .env(HOLD_ENV, dir)
        .stdout(Stdio::piped())
        // Previously piped but never read: child diagnostics were lost and a
        // chatty child could have blocked on a full pipe buffer.
        .stderr(Stdio::inherit())
        .spawn()
        .expect("spawn child")
}
/// Verifies that a `RecordLog` opened by one process cannot be opened by
/// another until the first process goes away.
///
/// The same test function serves two roles:
/// * **child** — when `HOLD_ENV` is set, it opens the log in that directory
///   and idles (see `child_run_lock_holder`); it never reaches the assertions.
/// * **parent** — otherwise it spawns the child, waits for its `LOCK_HELD`
///   line, checks that a second open fails with a lock-related message, kills
///   the child, and then checks that the log becomes openable again within
///   five seconds.
#[test]
fn lock_is_contended_across_processes() {
    /// Kills and reaps the wrapped child when dropped.
    ///
    /// `std::process::Child` does not terminate the process on drop, so
    /// without this guard any failed assertion below would leave the child
    /// running (and holding the log open) after the test has already failed.
    struct KillOnDrop(std::process::Child);

    impl Drop for KillOnDrop {
        fn drop(&mut self) {
            // Best effort: the child may already have been killed and reaped
            // on the success path, in which case these calls have nothing
            // left to do and their results are irrelevant.
            let _ = self.0.kill();
            let _ = self.0.wait();
        }
    }

    // Child mode: this call never returns.
    if let Ok(dir) = env::var(HOLD_ENV) {
        child_run_lock_holder(PathBuf::from(dir));
    }

    let tmp = tempfile::tempdir().expect("tempdir");
    std::fs::create_dir_all(tmp.path()).unwrap();

    // Declared after `tmp`, so on any exit path (including unwinding) the
    // child is killed before the temporary directory is cleaned up.
    let mut child = KillOnDrop(spawn_lock_holder(tmp.path()));
    let child_stdout = child.0.stdout.take().expect("child stdout");

    // Read the child's stdout on a helper thread so the wait for the
    // handshake line can be bounded by a timeout.
    let (tx, rx) = std::sync::mpsc::channel::<()>();
    let reader_handle = thread::spawn(move || {
        // Stop at the first read error or at EOF; signal only if the marker
        // line was seen. Dropping `tx` without sending makes the receiver
        // fail immediately rather than waiting out the full timeout.
        let saw_marker = BufReader::new(child_stdout)
            .lines()
            .map_while(Result::ok)
            .any(|line| line.contains("LOCK_HELD"));
        if saw_marker {
            let _ = tx.send(());
        }
    });
    rx.recv_timeout(Duration::from_secs(10))
        .expect("child must report LOCK_HELD within 10s");

    // While the child is alive, a second open from this process must fail.
    let err = RecordLog::open(tmp.path())
        .expect_err("contended open must fail while child holds the lock");
    let msg = format!("{err:#}");
    let lower = msg.to_lowercase();
    assert!(
        lower.contains("lock"),
        "expected a lock-related error, got: {msg}"
    );

    // Terminate the holder and reap it so it is really gone before probing.
    child.0.kill().expect("kill child");
    let _ = child.0.wait();
    let _ = reader_handle.join();

    // The log must become openable again; poll briefly in case release is
    // not observed instantly after the child is reaped.
    let deadline = Instant::now() + Duration::from_secs(5);
    loop {
        match RecordLog::open(tmp.path()) {
            Ok(_log) => break,
            Err(e) => {
                if Instant::now() > deadline {
                    panic!("lock never became acquirable after child kill; last error: {e:#}");
                }
                thread::sleep(Duration::from_millis(50));
            }
        }
    }
}