use std::collections::HashMap;
use std::panic::AssertUnwindSafe;
use std::path::{Path, PathBuf};
use obj_core::pager::page::{Page, PageId, PAGE_SIZE};
use obj_core::pager::{wal_path_for, Config, Pager};
use obj_core::platform::fault::{FaultPlan, FaultyFileHandle};
use obj_core::platform::{FileBackend, FileHandle};
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use tempfile::TempDir;
/// Default first seed (inclusive) of the range exercised by the crash-cycle tests;
/// the fallback `read_seed_range` uses for `CRASH_CYCLES_START`.
const DEFAULT_START: u64 = 0;
/// Default end (exclusive) of the seed range; the fallback `read_seed_range` uses
/// for `CRASH_CYCLES_END`.
const DEFAULT_END: u64 = 10000;
/// One step of a randomly generated crash-cycle workload.
#[derive(Clone, Copy, Debug)]
enum Op {
    /// Write a fresh byte pattern to the page at this slot of the allocated-page list
    /// (the slot is reduced modulo the list length when executed).
    Write(usize),
    /// Commit the open transaction.
    Commit,
    /// Ask the pager to checkpoint.
    Checkpoint,
    /// Close the pager and open a new one on the same database path.
    Reopen,
    /// Allocate a new page, then commit.
    Alloc,
    /// Free the page at this slot of the allocated-page list (reduced modulo its
    /// length), then commit.
    Free(usize),
}
/// Which file backend a crash-cycle run uses.
#[derive(Clone, Copy, Debug)]
enum BackendKind {
    /// Plain `FileHandle`s; crashes are simulated only by panics in the workload.
    PanicOnly,
    /// `FaultyFileHandle`s over both the main file and the WAL, driven by `FaultPlan`s.
    IntraSyscall,
}
/// Runs the configured seed range against plain files, where the only crashes are
/// injected panics; recovery is checked with the strict verifier.
#[test]
#[ignore = "10k-cycle stress test; ~12 min locally. Run via `cargo test --test crash_cycles -- --ignored`"]
fn crash_cycles_seed_range() {
    let label = "crash_cycles";
    run_seed_range(BackendKind::PanicOnly, label);
}
/// Runs the configured seed range with fault-injecting file handles on both the main
/// database file and the WAL; recovery is checked with the lenient verifier.
#[test]
#[ignore = "10k-cycle stress test with intra-syscall faults (issue #20); slower than the panic-only variant. Run via `cargo test --test crash_cycles -- --ignored`"]
fn crash_cycles_with_intra_syscall_faults() {
    let label = "crash_cycles_faulty";
    run_seed_range(BackendKind::IntraSyscall, label);
}
/// Runs one crash cycle per seed in the configured range and asserts that none failed.
///
/// Each failing seed is reported on stderr as it happens. The final assertion lists
/// at most the first ten failing seeds.
///
/// # Panics
///
/// Panics if any seed fails.
fn run_seed_range(kind: BackendKind, label: &str) {
    let (start, end) = read_seed_range();
    let mode = match kind {
        BackendKind::IntraSyscall => VerifyMode::Lenient,
        BackendKind::PanicOnly => VerifyMode::Strict,
    };
    // `start..end` yields exactly this many seeds (none when the range is empty).
    let total: u64 = end.saturating_sub(start);
    let failures: Vec<u64> = (start..end)
        .filter(|&seed| match run_one_cycle(seed, kind, mode) {
            Ok(()) => false,
            Err(msg) => {
                eprintln!("SEED={seed} ({label}) FAILED: {msg}");
                true
            }
        })
        .collect();
    let first_ten = &failures[..failures.len().min(10)];
    assert!(
        failures.is_empty(),
        "{label}: {} of {} seeds failed: first 10 = {:?}",
        failures.len(),
        total,
        first_ten
    );
    eprintln!("{label}: {total} seeds passed (range {start}..{end})");
}
/// How strictly post-recovery page contents are checked.
#[derive(Clone, Copy, Debug)]
enum VerifyMode {
    /// Every page with a recorded history must hold exactly its last committed bytes.
    Strict,
    /// A WAL-corruption error on open is accepted. Otherwise each page may hold any
    /// committed version, or all zeros.
    Lenient,
}
/// Reads the half-open seed range `[start, end)` to run from the environment.
///
/// `CRASH_CYCLES_START` and `CRASH_CYCLES_END` override `DEFAULT_START` and
/// `DEFAULT_END`. An unset variable falls back to its default. Surrounding
/// whitespace is ignored.
///
/// # Panics
///
/// Panics in any of these cases:
/// - a variable is set but is not valid Unicode;
/// - a variable is set but is not a valid `u64`;
/// - `start > end`.
///
/// Silently substituting the default would run a different (possibly 10k-seed)
/// range than the one requested. An inverted range would run zero seeds and
/// report success.
fn read_seed_range() -> (u64, u64) {
    /// Reads one bound: the default when unset, otherwise the parsed value.
    fn seed_bound(var: &str, default: u64) -> u64 {
        match std::env::var(var) {
            Err(std::env::VarError::NotPresent) => default,
            Err(err) => panic!("{var} is set but unreadable: {err}"),
            Ok(raw) => raw
                .trim()
                .parse::<u64>()
                .unwrap_or_else(|err| panic!("{var}={raw:?} is not a valid u64 seed: {err}")),
        }
    }

    let start = seed_bound("CRASH_CYCLES_START", DEFAULT_START);
    let end = seed_bound("CRASH_CYCLES_END", DEFAULT_END);
    assert!(
        start <= end,
        "CRASH_CYCLES_START ({start}) must not exceed CRASH_CYCLES_END ({end})"
    );
    (start, end)
}
/// Runs one crash cycle for `seed` on the backend selected by `kind`.
///
/// The fault-injecting opener derives its fault plans from the same `seed`, so a
/// cycle is reproducible from the seed alone.
fn run_one_cycle(seed: u64, kind: BackendKind, mode: VerifyMode) -> Result<(), String> {
    match kind {
        BackendKind::IntraSyscall => {
            run_one_cycle_with::<FaultyFileHandle, _>(seed, &FaultyOpener::from_seed(seed), mode)
        }
        BackendKind::PanicOnly => {
            let plain = PanicOnlyOpener;
            run_one_cycle_with::<FileHandle, _>(seed, &plain, mode)
        }
    }
}
/// Strategy for producing a `Pager` over backend `F`.
///
/// It is used both for the initial open of a cycle and for every `Op::Reopen`.
/// Both implementations in this file call `begin_txn` on the pager before
/// returning it.
trait CycleOpener<F>
where
    F: FileBackend,
{
    /// Opens (or creates) the database at `path` using `config`.
    fn open(&self, path: &Path, config: Config) -> obj_core::Result<Pager<F>>;
}
/// Opener backed by plain `FileHandle`s (no injected I/O faults).
struct PanicOnlyOpener;

impl CycleOpener<FileHandle> for PanicOnlyOpener {
    /// Opens the pager with `Pager::open` and starts a transaction on it.
    fn open(&self, path: &Path, config: Config) -> obj_core::Result<Pager<FileHandle>> {
        let mut pager = Pager::open(path, config)?;
        pager.begin_txn();
        Ok(pager)
    }
}
/// Opener that wraps the main database file and its WAL in `FaultyFileHandle`s.
///
/// Each of the two files gets its own `FaultPlan` seed.
struct FaultyOpener {
    main_seed: u64,
    wal_seed: u64,
}

impl FaultyOpener {
    /// Salt XOR-ed into the cycle seed to derive the main-file plan seed.
    const MAIN_SALT: u64 = 0x9E37_79B9_7F4A_7C15;
    /// Salt XOR-ed into the cycle seed to derive the WAL plan seed.
    const WAL_SALT: u64 = 0xBB67_AE85_84CA_A73B;

    /// Derives the two plan seeds from one cycle seed.
    ///
    /// The two salts differ, so the main file and the WAL get different seeds.
    fn from_seed(seed: u64) -> Self {
        let (main_seed, wal_seed) = (seed ^ Self::MAIN_SALT, seed ^ Self::WAL_SALT);
        Self { main_seed, wal_seed }
    }

    /// Fault plan for the main database file.
    ///
    /// The numeric arguments are passed straight to `FaultPlan::new`; see its docs
    /// for their meaning.
    fn main_plan(&self) -> FaultPlan {
        FaultPlan::new(self.main_seed, 0.0, 0.01, 0.0, 0.0, 0)
    }

    /// Fault plan for the WAL file.
    ///
    /// The arguments are passed straight to `FaultPlan::new`, as for `main_plan`.
    fn wal_plan(&self) -> FaultPlan {
        FaultPlan::new(self.wal_seed, 0.003, 0.01, 0.0, 0.0008, 0)
    }
}
impl CycleOpener<FaultyFileHandle> for FaultyOpener {
    /// Opens the main file and then its WAL, each wrapped with its own fault plan.
    ///
    /// It then builds the pager over both backends and starts a transaction.
    fn open(&self, path: &Path, config: Config) -> obj_core::Result<Pager<FaultyFileHandle>> {
        let main_file = FileHandle::open_or_create(path)?;
        let main = FaultyFileHandle::new(main_file, self.main_plan());

        let wal_path = wal_path_for(path);
        let wal_file = FileHandle::open_or_create(&wal_path)?;
        let wal = FaultyFileHandle::new(wal_file, self.wal_plan());

        let mut pager = Pager::<FaultyFileHandle>::open_with_backends(main, wal, wal_path, config)?;
        pager.begin_txn();
        Ok(pager)
    }
}
/// Runs a single crash cycle for `seed` and verifies what a fresh pager recovers.
///
/// Everything random in the cycle is drawn from a `ChaCha8Rng` seeded with `seed`:
/// page count, op count, the optional injected-crash target, and the op sequence.
/// A failing seed can therefore be replayed. NOTE: the draw order below is part of
/// what each seed means; reordering the draws changes the workload for every seed.
///
/// # Errors
///
/// Returns a human-readable message if any of these happens:
/// - the temp dir cannot be created;
/// - the initial `CycleState` cannot be built;
/// - the workload fails in a way `handle_workload_result` does not tolerate;
/// - the selected recovery verifier reports a mismatch.
fn run_one_cycle_with<F: FileBackend, O: CycleOpener<F>>(
    seed: u64,
    opener: &O,
    mode: VerifyMode,
) -> Result<(), String> {
    let mut rng = ChaCha8Rng::seed_from_u64(seed);
    let dir = TempDir::new().map_err(|e| format!("tempdir: {e}"))?;
    let db_path = dir.path().join("crash.obj");
    let n_pages: u32 = rng.random_range(4u32..12);
    let n_ops: u32 = rng.random_range(20u32..60);
    // One cycle in four (`% 4 == 0`) gets an injected-crash target.
    let panic_after_commit: Option<u32> = if rng.random::<u32>() % 4 == 0 {
        Some(rng.random_range(0u32..n_ops))
    } else {
        None
    };
    // Setup runs outside `catch_unwind`, so a panic here is not caught by this function.
    let mut state = CycleState::new(&db_path, n_pages, opener)?;
    let mut log: Vec<String> = Vec::with_capacity(n_ops as usize);
    let ops: Vec<Op> = (0..n_ops).map(|_| random_op(&mut rng, n_pages)).collect();
    // Catch panics (including the deliberately injected crash) so the on-disk state
    // left behind can still be verified below.
    let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
        run_workload(&mut state, &ops, panic_after_commit, &mut log, opener)
    }));
    handle_workload_result(seed, result, &log)?;
    let verify = match mode {
        VerifyMode::Strict => verify_recovery_strict,
        VerifyMode::Lenient => verify_recovery_lenient,
    };
    // NOTE(review): `state` is not dropped until this function returns. Any pager
    // it still holds is therefore alive while the verifier opens a second pager on
    // `db_path`. This is presumably deliberate: the workload's pager is never
    // closed, as in a crash. Confirm.
    verify(&db_path, &state.history, &state.allocated).inspect_err(|_| {
        // Best effort: failing to write the log must not replace the verification error.
        let _ = write_seed_log(seed, &log);
    })?;
    Ok(())
}
/// Classifies the outcome of one `run_workload` call for `seed`.
///
/// Returns `Ok(())` for a clean run and for failures the harness deliberately
/// tolerates:
/// - panics whose message contains `__INJECTED_CRASH__` or `obj-core::fault`;
/// - workload errors whose message starts with one of `TOLERATED_ERROR_PREFIXES`.
///
/// # Errors
///
/// Any other panic or error is a seed failure. The seed log is written (best
/// effort) and `Err` describes the failure. If the log itself cannot be written,
/// that I/O error is appended to the message rather than replacing it.
fn handle_workload_result(
    seed: u64,
    result: std::thread::Result<Result<(), String>>,
    log: &[String],
) -> Result<(), String> {
    /// Error prefixes from workload steps that may legitimately fail under injected faults.
    const TOLERATED_ERROR_PREFIXES: [&str; 5] =
        ["write_page:", "commit:", "checkpoint:", "close:", "reopen:"];

    let failure = match result {
        Ok(Ok(())) => return Ok(()),
        Err(panic) => {
            let msg = panic_message(&panic);
            if msg.contains("__INJECTED_CRASH__") || msg.contains("obj-core::fault") {
                return Ok(());
            }
            format!("workload panicked: {msg}")
        }
        Ok(Err(workload_err)) => {
            let tolerated = TOLERATED_ERROR_PREFIXES
                .iter()
                .copied()
                .any(|prefix| workload_err.starts_with(prefix));
            if tolerated {
                return Ok(());
            }
            format!("workload error: {workload_err}")
        }
    };

    // Previously a log-write failure was propagated with `?`, which discarded the
    // actual workload failure. Keep the real failure as the primary message.
    match write_seed_log(seed, log) {
        Ok(()) => Err(failure),
        Err(e) => Err(format!("{failure} (additionally failed to write log: {e})")),
    }
}
/// Draws one random workload op.
///
/// One RNG draw picks the op kind with these weights:
/// - 50% write;
/// - 25% commit;
/// - 10% checkpoint;
/// - 5% reopen;
/// - 5% alloc;
/// - 5% free.
///
/// `Write` and `Free` then take a second draw in `0..n_pages` for their page slot.
fn random_op(rng: &mut ChaCha8Rng, n_pages: u32) -> Op {
    let roll = rng.random::<u32>() % 100;
    let mut slot = || usize::try_from(rng.random_range(0u32..n_pages)).unwrap_or(0);
    if roll < 50 {
        Op::Write(slot())
    } else if roll < 75 {
        Op::Commit
    } else if roll < 85 {
        Op::Checkpoint
    } else if roll < 90 {
        Op::Reopen
    } else if roll < 95 {
        Op::Alloc
    } else {
        Op::Free(slot())
    }
}
/// The harness's model of one crash cycle, kept alongside the pager under test.
///
/// `history` and `allocated` are what the recovery verifiers check against.
/// `pending` holds writes made since the last commit. Once a commit succeeds,
/// `merge_pending_into_history` folds it into `expected` and `history`.
struct CycleState<F: FileBackend> {
    /// Pager under test. `op_reopen` takes it (leaving `None`) while closing and
    /// reopening, and stores a new one only if the reopen succeeds.
    pager: Option<Pager<F>>,
    /// Database path, reused by `op_reopen`.
    db_path: PathBuf,
    /// Most recently committed bytes per page.
    /// NOTE(review): written but not read by the verifiers in this file.
    expected: HashMap<PageId, Vec<u8>>,
    /// Every committed version of each page, in commit order.
    history: HashMap<PageId, Vec<Vec<u8>>>,
    /// Writes made since the last successful commit.
    pending: HashMap<PageId, Vec<u8>>,
    /// Pages the model considers allocated; freed pages are removed.
    allocated: Vec<PageId>,
    /// Successful commits counted during the workload (`op_commit`, `op_alloc`,
    /// `op_free`). The setup commit is not counted.
    commit_count: u32,
}

impl<F: FileBackend> CycleState<F> {
    /// Opens the database via `opener`, allocates `n_pages` pages, and commits them.
    ///
    /// # Errors
    ///
    /// Returns `"open: …"`, `"alloc: …"`, or `"setup commit: …"`, depending on
    /// which step failed.
    fn new<O: CycleOpener<F>>(db_path: &Path, n_pages: u32, opener: &O) -> Result<Self, String> {
        let mut pager = opener
            .open(db_path, Config::default())
            .map_err(|e| format!("open: {e}"))?;
        let allocated = (0..n_pages)
            .map(|_| pager.alloc_page().map_err(|e| format!("alloc: {e}")))
            .collect::<Result<Vec<_>, String>>()?;
        let _ = pager.commit().map_err(|e| format!("setup commit: {e}"))?;
        Ok(Self {
            pager: Some(pager),
            db_path: db_path.to_path_buf(),
            expected: HashMap::new(),
            history: HashMap::new(),
            pending: HashMap::new(),
            allocated,
            commit_count: 0,
        })
    }

    /// Returns the live pager.
    ///
    /// # Panics
    ///
    /// Panics if the pager was taken by `op_reopen` and not restored.
    fn pager_mut(&mut self) -> &mut Pager<F> {
        self.pager.as_mut().expect("pager Some during workload step")
    }
}
/// Executes `ops` in order against `state`, stopping at the first error.
///
/// Each op's position in `ops` is its `step` number in the log. Panics (such as
/// the injected crash raised by `op_commit`) are not caught here.
///
/// # Errors
///
/// Forwards the first `Err` returned by a step. Checkpoint failures are reported
/// as `"checkpoint: …"`.
fn run_workload<F: FileBackend, O: CycleOpener<F>>(
    state: &mut CycleState<F>,
    ops: &[Op],
    panic_after_commit: Option<u32>,
    log: &mut Vec<String>,
    opener: &O,
) -> Result<(), String> {
    for (step, op) in ops.iter().copied().enumerate() {
        let outcome = match op {
            Op::Write(idx) => op_write(state, step, idx, log),
            Op::Commit => op_commit(state, step, panic_after_commit, log),
            Op::Checkpoint => match state.pager_mut().checkpoint() {
                Ok(_) => {
                    log.push(format!("{step}: checkpoint"));
                    Ok(())
                }
                Err(e) => Err(format!("checkpoint: {e}")),
            },
            Op::Reopen => op_reopen(state, step, log, opener),
            Op::Alloc => op_alloc(state, step, log),
            Op::Free(idx) => op_free(state, step, idx, log),
        };
        outcome?;
    }
    Ok(())
}
/// Writes a deterministic pattern (from `page_for`) to one allocated page.
///
/// The page is chosen as slot `idx` (modulo the list length) of `allocated`. The
/// written bytes are recorded in `pending`. This is a no-op when no pages are
/// allocated.
///
/// # Errors
///
/// Returns `"write_page: …"` if the pager rejects the write.
fn op_write<F: FileBackend>(
    state: &mut CycleState<F>,
    step: usize,
    idx: usize,
    log: &mut Vec<String>,
) -> Result<(), String> {
    let pid = match state.allocated.len() {
        0 => return Ok(()),
        live => state.allocated[idx % live],
    };
    let pattern = page_for(step, pid);
    // Leave the last 4 bytes of the page untouched (zero).
    let copy_len = pattern.len().min(PAGE_SIZE - 4);
    let mut page = Page::zeroed();
    page.as_bytes_mut()[..copy_len].copy_from_slice(&pattern[..copy_len]);
    if let Err(e) = state.pager_mut().write_page(pid, &page) {
        return Err(format!("write_page: {e}"));
    }
    state.pending.insert(pid, page.as_bytes().to_vec());
    log.push(format!("{step}: write {pid:?}", pid = pid.get()));
    Ok(())
}
/// Commits the open transaction and records its writes in the model.
///
/// On success:
/// - pending writes are folded in via `merge_pending_into_history`;
/// - `commit_count` is incremented;
/// - the step is logged.
///
/// If `panic_after_commit` is `Some(target)`, the first explicit commit that leaves
/// `commit_count > target` then panics with the `__INJECTED_CRASH__` marker. This
/// simulates a crash right after a successful commit.
///
/// # Errors
///
/// Returns `"commit: …"` if the pager fails to commit; the model is left unchanged.
///
/// # Panics
///
/// Panics deliberately when the injected-crash target is reached.
fn op_commit<F: FileBackend>(
    state: &mut CycleState<F>,
    step: usize,
    panic_after_commit: Option<u32>,
    log: &mut Vec<String>,
) -> Result<(), String> {
    let _ = state
        .pager_mut()
        .commit()
        .map_err(|e| format!("commit: {e}"))?;
    merge_pending_into_history(state);
    state.commit_count = state.commit_count.saturating_add(1);
    log.push(format!(
        "{step}: commit (#{count})",
        count = state.commit_count
    ));
    // `commit_count` is also advanced by `op_alloc` and `op_free`, which never check
    // the crash target. An exact-equality test here could step over the target and
    // silently skip the injected crash, so fire on the first explicit commit at or
    // past it. The panic ends the workload, so this still fires at most once.
    if panic_after_commit.is_some_and(|target| state.commit_count > target) {
        log.push(format!("{step}: __INJECTED_CRASH__"));
        panic!("__INJECTED_CRASH__ at step {step}");
    }
    Ok(())
}
/// Closes the current pager and opens a new one on the same database via `opener`.
///
/// Uncommitted writes are dropped from the model (`pending` is cleared) once the
/// close succeeds. If close or reopen fails, `state.pager` is left `None`.
///
/// # Errors
///
/// Returns `"close: …"` or `"reopen: …"`.
fn op_reopen<F: FileBackend, O: CycleOpener<F>>(
    state: &mut CycleState<F>,
    step: usize,
    log: &mut Vec<String>,
    opener: &O,
) -> Result<(), String> {
    let retiring = state.pager.take().expect("pager");
    if let Err(e) = retiring.close() {
        return Err(format!("close: {e}"));
    }
    state.pending.clear();
    let reopened = opener
        .open(&state.db_path, Config::default())
        .map_err(|e| format!("reopen: {e}"))?;
    state.pager = Some(reopened);
    log.push(format!("{step}: reopen"));
    Ok(())
}
/// Allocates a new page, adds it to the model's allocated list, and commits.
///
/// Any pending writes are committed along with the allocation.
///
/// # Errors
///
/// Returns `"alloc: …"` or `"commit: …"`.
fn op_alloc<F: FileBackend>(
    state: &mut CycleState<F>,
    step: usize,
    log: &mut Vec<String>,
) -> Result<(), String> {
    let fresh = state
        .pager_mut()
        .alloc_page()
        .map_err(|e| format!("alloc: {e}"))?;
    state.allocated.push(fresh);
    log.push(format!("{step}: alloc -> {id}", id = fresh.get()));

    let pager = state.pager_mut();
    let _ = pager.commit().map_err(|e| format!("commit: {e}"))?;
    merge_pending_into_history(state);
    state.commit_count = state.commit_count.saturating_add(1);
    Ok(())
}
/// Frees one allocated page (slot `idx` modulo the list length), then commits.
///
/// The page is removed from the model before the pager frees it. From then on the
/// verifiers no longer check it. This is a no-op when no pages are allocated.
///
/// # Errors
///
/// Returns `"free: …"` or `"commit: …"`.
fn op_free<F: FileBackend>(
    state: &mut CycleState<F>,
    step: usize,
    idx: usize,
    log: &mut Vec<String>,
) -> Result<(), String> {
    let live = state.allocated.len();
    if live == 0 {
        return Ok(());
    }
    let victim = state.allocated.remove(idx % live);
    state.expected.remove(&victim);
    state.history.remove(&victim);
    state.pending.remove(&victim);
    if let Err(e) = state.pager_mut().free_page(victim) {
        return Err(format!("free: {e}"));
    }
    log.push(format!("{step}: free {id}", id = victim.get()));

    let pager = state.pager_mut();
    let _ = pager.commit().map_err(|e| format!("commit: {e}"))?;
    merge_pending_into_history(state);
    state.commit_count = state.commit_count.saturating_add(1);
    Ok(())
}
fn merge_pending_into_history<F: FileBackend>(state: &mut CycleState<F>) {
for (id, body) in state.pending.drain() {
state.expected.insert(id, body.clone());
state.history.entry(id).or_default().push(body);
}
}
fn verify_recovery_lenient(
db_path: &std::path::Path,
history: &HashMap<PageId, Vec<Vec<u8>>>,
allocated: &[PageId],
) -> Result<(), String> {
let mut p = match Pager::open(db_path, Config::default()) {
Ok(p) => p,
Err(obj_core::Error::WalCorruption { .. }) => {
return Ok(());
}
Err(e) => return Err(format!("recovery open: {e}")),
};
for pid in allocated {
let Some(want_versions) = history.get(pid) else {
match p.read_page(*pid) {
Ok(_) | Err(obj_core::Error::Corruption { .. }) => continue,
Err(e) => {
return Err(format!("recovery read {pid:?}: {e}", pid = pid.get()));
}
}
};
let read = match p.read_page(*pid) {
Ok(r) => r,
Err(obj_core::Error::Corruption { .. }) => {
return Err(format!(
"page {pid:?}: unexpected on-disk corruption",
pid = pid.get()
));
}
Err(e) => return Err(format!("recovery read {pid:?}: {e}", pid = pid.get())),
};
let body_len = PAGE_SIZE - 4;
let got: &[u8] = &read.as_bytes()[..body_len];
if !page_matches_any_committed(got, want_versions) {
return Err(format!(
"page {pid:?}: bytes match neither any historical commit nor zeros",
pid = pid.get()
));
}
}
Ok(())
}
/// Returns `true` if `got` is all zeros or matches any version in `history`.
///
/// Each comparison covers only the common prefix of `got` and that version.
fn page_matches_any_committed(got: &[u8], history: &[Vec<u8>]) -> bool {
    let is_blank = !got.iter().any(|&byte| byte != 0);
    is_blank
        || history.iter().any(|version| {
            let n = got.len().min(version.len());
            got[..n] == version[..n]
        })
}
/// Returns `true` if `got` matches the newest version in `history`.
///
/// The comparison covers only their common prefix. With an empty history, the
/// result is `true` only if `got` is all zeros.
fn page_matches_last_committed(got: &[u8], history: &[Vec<u8>]) -> bool {
    match history.last() {
        None => !got.iter().any(|&byte| byte != 0),
        Some(latest) => {
            let n = got.len().min(latest.len());
            got[..n] == latest[..n]
        }
    }
}
fn verify_recovery_strict(
db_path: &std::path::Path,
history: &HashMap<PageId, Vec<Vec<u8>>>,
allocated: &[PageId],
) -> Result<(), String> {
let mut p = match Pager::open(db_path, Config::default()) {
Ok(p) => p,
Err(e) => return Err(format!("recovery open: {e}")),
};
for pid in allocated {
let Some(want_versions) = history.get(pid) else {
match p.read_page(*pid) {
Ok(_) | Err(obj_core::Error::Corruption { .. }) => continue,
Err(e) => {
return Err(format!("recovery read {pid:?}: {e}", pid = pid.get()));
}
}
};
let read = p
.read_page(*pid)
.map_err(|e| format!("recovery read {pid:?}: {e}", pid = pid.get()))?;
let body_len = PAGE_SIZE - 4;
let got: &[u8] = &read.as_bytes()[..body_len];
if !page_matches_last_committed(got, want_versions) {
return Err(format!(
"page {pid:?}: strict recovery: bytes do not match the LAST committed value \
(panic-only ACID violation — a committed txn rolled back to an earlier state)",
pid = pid.get()
));
}
}
Ok(())
}
/// Builds the 64-byte pattern written by `op_write` at `step` to page `pid`.
///
/// A 64-bit word `step * 0x9E37_79B9_7F4A_7C15 (wrapping) ^ pid` is laid out in
/// little-endian byte order and repeated to fill the buffer.
fn page_for(step: usize, pid: PageId) -> Vec<u8> {
    const PATTERN_LEN: usize = 64;
    let seed_word = (step as u64).wrapping_mul(0x9E37_79B9_7F4A_7C15) ^ pid.get();
    seed_word
        .to_le_bytes()
        .iter()
        .copied()
        .cycle()
        .take(PATTERN_LEN)
        .collect()
}
/// Extracts a readable message from a caught panic payload.
///
/// Handles `String` and `&'static str` payloads; anything else yields a placeholder.
/// The parameter stays `&Box<dyn Any + Send>` on purpose. Coercing a
/// `&Box<dyn Any + Send>` to `&(dyn Any + Send)` would make the Box itself the
/// `Any` value, and the downcasts would then fail.
fn panic_message(payload: &Box<dyn std::any::Any + Send>) -> String {
    match (
        payload.downcast_ref::<String>(),
        payload.downcast_ref::<&'static str>(),
    ) {
        (Some(owned), _) => owned.clone(),
        (None, Some(borrowed)) => (*borrowed).to_string(),
        (None, None) => "<non-string panic payload>".to_string(),
    }
}
/// Writes the op log for `seed` to `target/crash_cycles/seed-<seed>.log`.
///
/// The path is relative to the current working directory. Entries are joined
/// with newlines, and the output path is reported on stderr.
///
/// # Errors
///
/// Propagates any I/O error from creating the directory or writing the file.
fn write_seed_log(seed: u64, log: &[String]) -> std::io::Result<()> {
    let out_dir = PathBuf::from("target/crash_cycles");
    std::fs::create_dir_all(&out_dir)?;
    let file = out_dir.join(format!("seed-{seed}.log"));
    let contents = log.join("\n");
    std::fs::write(&file, contents)?;
    eprintln!("crash_cycles: wrote log for seed {seed} to {}", file.display());
    Ok(())
}