#![cfg(target_os = "linux")]
use std::fs;
use std::path::{Path, PathBuf};
use datawal::format::RecordType;
use datawal::RecordLog;
fn shm_test_dir() -> Option<PathBuf> {
let shm = Path::new("/dev/shm");
if !shm.is_dir() {
eprintln!("skip: /dev/shm not present");
return None;
}
let unique = format!(
"datawal-disk-full-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_nanos())
.unwrap_or(0)
);
let probe = shm.join(unique);
if let Err(e) = fs::create_dir_all(&probe) {
eprintln!("skip: /dev/shm not writable: {e}");
return None;
}
Some(probe)
}
fn append_until_failure(log: &mut RecordLog) -> (u64, String) {
let payload = vec![0u8; 64 * 1024];
let mut count: u64 = 0;
loop {
let key = format!("k{count:010}");
match log.append_record(RecordType::Put, key.as_bytes(), &payload) {
Ok(_) => {
count += 1;
if count > 1_000_000 {
panic!(
"wrote 1M records ({} GiB) without ENOSPC; \
is the tmpfs too large for this test?",
count * 64 / (1024 * 1024)
);
}
}
Err(e) => return (count, format!("{e:#}")),
}
}
}
#[test]
fn enospc_on_tmpfs_poisons_writer() {
let Some(dir) = shm_test_dir() else { return };
let mut log = RecordLog::open(&dir).expect("open log on tmpfs");
assert!(!log.is_poisoned(), "fresh log must not be poisoned");
let (count, msg) = append_until_failure(&mut log);
assert!(
count >= 1,
"no record was written before failure; tmpfs too small? msg={msg}"
);
assert!(
log.is_poisoned(),
"writer must be poisoned after I/O failure"
);
let err = log
.append_record(RecordType::Put, b"after", b"after")
.expect_err("post-poison append must fail");
let post_msg = format!("{err:#}");
assert!(
post_msg.starts_with("datawal: writer poisoned: "),
"post-poison message must use the documented prefix; got: {post_msg}"
);
assert!(
post_msg.ends_with("; drop handle and reopen"),
"post-poison message must use the documented suffix; got: {post_msg}"
);
let report = log.recovery_report().expect("recovery_report after poison");
assert_eq!(report.files_scanned, 1);
drop(log);
for entry in fs::read_dir(&dir).expect("read tmpfs dir") {
let entry = entry.unwrap();
if entry.path().extension().and_then(|s| s.to_str()) == Some("dwal") {
let _ = fs::remove_file(entry.path());
}
}
let mut log2 = RecordLog::open(&dir).expect("reopen after cleanup");
assert!(!log2.is_poisoned(), "reopen must clear poison");
log2.append_record(RecordType::Put, b"fresh", b"v")
.expect("write must work on the reopened handle");
let _ = fs::remove_dir_all(&dir);
}
#[test]
#[ignore]
fn manual_disk_full() {
let Ok(dir) = std::env::var("DATAWAL_DISK_FULL_DIR") else {
eprintln!("DATAWAL_DISK_FULL_DIR not set; skipping");
return;
};
let dir = PathBuf::from(dir);
assert!(dir.is_dir(), "DATAWAL_DISK_FULL_DIR must be a directory");
let mut log = RecordLog::open(&dir).expect("open log on small fs");
let (count, msg) = append_until_failure(&mut log);
assert!(count >= 1, "no records written; msg={msg}");
assert!(log.is_poisoned(), "writer must be poisoned");
let err = log
.append_record(RecordType::Put, b"x", b"y")
.expect_err("post-poison must fail");
let post = format!("{err:#}");
assert!(post.starts_with("datawal: writer poisoned: "));
assert!(post.ends_with("; drop handle and reopen"));
}