use powdb_query::executor::Engine;
use powdb_query::result::QueryResult;
use powdb_storage::types::Value;
/// Builds a unique scratch-directory path under the system temp directory.
///
/// The final path component has the form
/// `powdb_durability_<name>_<pid>_<nanos>`, where `<pid>` is the current
/// process id and `<nanos>` is the current wall-clock time in nanoseconds
/// since the Unix epoch. Only the path is computed; nothing is created on disk.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn temp_dir(name: &str) -> std::path::PathBuf {
    let pid = std::process::id();
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_nanos();

    let mut path = std::env::temp_dir();
    path.push(format!("powdb_durability_{name}_{pid}_{nanos}"));
    path
}
/// Runs a single PowQL statement against `engine` and returns its result.
///
/// # Panics
///
/// Panics with the offending query text and the engine's error if execution
/// fails, so a broken statement aborts the test immediately.
fn exec(engine: &mut Engine, query: &str) -> QueryResult {
    match engine.execute_powql(query) {
        Ok(result) => result,
        Err(e) => panic!("failed to execute `{query}`: {e}"),
    }
}
/// Executes `query` and extracts a single integer from its result.
///
/// Two result shapes are accepted: a scalar integer, or a row set holding
/// exactly one row with exactly one column whose value is an integer.
///
/// # Panics
///
/// Panics if the query fails (via `exec`), if the result has any other
/// shape, or if the single cell is not an integer.
fn count(engine: &mut Engine, query: &str) -> i64 {
    let outcome = exec(engine, query);

    // Resolve the result to the one cell that carries the count; a scalar
    // integer short-circuits straight to the answer.
    let cell = match &outcome {
        QueryResult::Scalar(Value::Int(n)) => return *n,
        QueryResult::Rows { rows, .. } if rows.len() == 1 && rows[0].len() == 1 => &rows[0][0],
        other => panic!("expected scalar count, got {other:?}"),
    };

    match cell {
        Value::Int(n) => *n,
        other => panic!("count returned non-int {other:?}"),
    }
}
/// Executes `query` and reads the integer stored in column `field` of the
/// first returned row.
///
/// Returns `None` when the result is not a row set, when no column is named
/// `field`, when there are no rows, or when the cell is not an integer.
///
/// # Panics
///
/// Panics if the query itself fails (via `exec`).
fn int_field(engine: &mut Engine, query: &str, field: &str) -> Option<i64> {
    let (columns, rows) = match exec(engine, query) {
        QueryResult::Rows { columns, rows } => (columns, rows),
        _ => return None,
    };

    let idx = columns.iter().position(|c| c == field)?;
    let first_row = rows.first()?;

    if let Value::Int(n) = &first_row[idx] {
        Some(*n)
    } else {
        None
    }
}
/// Rows written after one crash recovery must still be present after a
/// second crash.
///
/// A crash is simulated with `std::mem::forget`, which leaks the engine so
/// its destructor never runs. The test runs three sessions on the same
/// directory: load 200 rows and crash; reopen, verify, add 50 more and crash
/// again; reopen and verify all 250 rows, checking each late row by id.
#[test]
fn test_writes_after_crash_recovery_survive_second_crash() {
    let dir = temp_dir("after_recovery");
    std::fs::create_dir_all(&dir).unwrap();

    // Session 1: create the schema, insert ids 1..=200, then crash.
    {
        let mut db = Engine::new(&dir).unwrap();
        exec(&mut db, "type K { required id: int, v: int }");
        for i in 1..=200i64 {
            let stmt = format!("insert K {{ id := {i}, v := {i} }}");
            exec(&mut db, &stmt);
        }
        let loaded = count(&mut db, "count(K)");
        assert_eq!(loaded, 200);
        std::mem::forget(db);
    }

    // Session 2: the first batch must be back; add ids 201..=250 and crash.
    {
        let mut db = Engine::new(&dir).unwrap();
        let recovered = count(&mut db, "count(K)");
        assert_eq!(
            recovered, 200,
            "the 200 rows must be recovered after crash #1"
        );
        for i in 201..=250i64 {
            let stmt = format!("insert K {{ id := {i}, v := {i} }}");
            exec(&mut db, &stmt);
        }
        let total = count(&mut db, "count(K)");
        assert_eq!(total, 250);
        std::mem::forget(db);
    }

    // Session 3: both batches must be present, and every post-recovery row
    // must be individually readable. This engine is dropped normally at the
    // end of the scope, before the directory is removed.
    {
        let mut db = Engine::new(&dir).unwrap();
        let recovered = count(&mut db, "count(K)");
        assert_eq!(
            recovered, 250,
            "writes taken after crash-recovery were lost on the next crash — \
             next_lsn reset to 1 on WAL open and replay skipped them"
        );
        for i in 201..=250i64 {
            let lookup = format!("K filter .id = {i} {{ .v }}");
            let found = int_field(&mut db, &lookup, "v");
            assert_eq!(
                found,
                Some(i),
                "row id={i} written after recovery is missing after crash #2"
            );
        }
    }

    // Best-effort cleanup; a failure to remove the directory is ignored.
    std::fs::remove_dir_all(&dir).ok();
}
/// Rows must accumulate across several reopen → write → crash cycles.
///
/// The schema is created in a session that is dropped normally. Then five
/// sessions each insert four rows with consecutive ids; every session except
/// the last is leaked with `std::mem::forget` (no destructor runs), and the
/// last is dropped normally. A final session checks that all rows exist.
#[test]
fn test_repeated_restart_write_crash_cycles() {
    let dir = temp_dir("restart_cycles");
    std::fs::create_dir_all(&dir).unwrap();

    // Schema-only session, closed by an ordinary drop.
    let mut schema_db = Engine::new(&dir).unwrap();
    exec(&mut schema_db, "type K { required id: int }");
    drop(schema_db);

    let cycles = 5i64;
    let per_cycle = 4i64;
    let last_cycle = cycles - 1;

    for c in 0..cycles {
        let mut db = Engine::new(&dir).unwrap();

        // Cycle `c` owns the id range [c * per_cycle, (c + 1) * per_cycle).
        let base = c * per_cycle;
        for id in base..base + per_cycle {
            exec(&mut db, &format!("insert K {{ id := {id} }}"));
        }

        // Every cycle but the last ends in a simulated crash; the last one
        // falls through and is dropped normally at the end of the iteration.
        if c != last_cycle {
            std::mem::forget(db);
        }
    }

    let mut db = Engine::new(&dir).unwrap();
    let survivors = count(&mut db, "count(K)");
    assert_eq!(
        survivors,
        cycles * per_cycle,
        "rows from earlier restart→write→crash cycles were lost"
    );

    // Best-effort cleanup; a failure to remove the directory is ignored.
    std::fs::remove_dir_all(&dir).ok();
}
/// A mix of inserts, a bulk update, a bulk delete and two upserts must all
/// be visible after a simulated crash.
///
/// Expected state after recovery: 300 inserted rows, minus the 50 rows with
/// `id > 250` that were deleted, plus the one row (`id = 400`) created by
/// upsert, giving 251 rows. The crash is simulated with `std::mem::forget`,
/// which leaks the engine without running its destructor.
#[test]
fn test_mixed_mutations_survive_crash() {
    let dir = temp_dir("mixed_mutations");
    std::fs::create_dir_all(&dir).unwrap();

    {
        let mut db = Engine::new(&dir).unwrap();
        exec(
            &mut db,
            "type P { required unique id: int, price: int, tag: str }",
        );

        for i in 1..=300i64 {
            let stmt = format!(
                "insert P {{ id := {i}, price := {p}, tag := \"t\" }}",
                p = i
            );
            exec(&mut db, &stmt);
        }

        // Applied strictly in this order: bulk update, bulk delete, an
        // upsert that hits an existing id, and an upsert that creates one.
        let mutations = [
            "P filter .id <= 100 update { price := 9999 }",
            "P filter .id > 250 delete",
            "upsert P on .id { id := 50, price := 7777, tag := \"u\" }",
            "upsert P on .id { id := 400, price := 1, tag := \"new\" }",
        ];
        for stmt in mutations {
            exec(&mut db, stmt);
        }

        std::mem::forget(db);
    }

    let mut db = Engine::new(&dir).unwrap();

    let total = count(&mut db, "count(P)");
    assert_eq!(
        total, 251,
        "row count wrong after crash recovery of mixed mutations"
    );

    let above_deleted_band = count(&mut db, "count(P filter .id > 250 { .id })");
    assert_eq!(
        above_deleted_band, 1,
        "deleted band reappeared after crash (only the upserted id=400 should remain > 250)"
    );

    // (query, expected price, failure message), checked in order.
    let price_checks = [
        (
            "P filter .id = 100 { .price }",
            9999,
            "bulk update reverted after crash",
        ),
        (
            "P filter .id = 50 { .price }",
            7777,
            "upsert-update reverted after crash",
        ),
        (
            "P filter .id = 400 { .price }",
            1,
            "upsert-insert lost after crash",
        ),
    ];
    for (query, expected, why) in price_checks {
        assert_eq!(int_field(&mut db, query, "price"), Some(expected), "{why}");
    }

    // Best-effort cleanup; a failure to remove the directory is ignored.
    std::fs::remove_dir_all(&dir).ok();
}
/// Mutations on one table must survive a crash even when schema changes on
/// another table were issued afterwards.
///
/// Tables `A` and `B` each receive 150 rows, inserted alternately. `B` then
/// has its rows with `id > 100` deleted and its rows with `id <= 50` updated,
/// after which `A` gets a new column and an index. The engine is leaked with
/// `std::mem::forget` (no destructor runs) and reopened; the row counts of
/// both tables and the number of updated `B` rows are checked.
#[test]
fn test_cross_table_mutations_with_ddl_survive_crash() {
    let dir = temp_dir("cross_table_ddl");
    std::fs::create_dir_all(&dir).unwrap();

    {
        let mut db = Engine::new(&dir).unwrap();
        exec(&mut db, "type A { required id: int, name: str }");
        exec(&mut db, "type B { required id: int, val: int }");

        // Inserts alternate between the two tables: A first, then B.
        for i in 1..=150i64 {
            let into_a = format!("insert A {{ id := {i}, name := \"a\" }}");
            exec(&mut db, &into_a);
            let into_b = format!("insert B {{ id := {i}, val := {i} }}");
            exec(&mut db, &into_b);
        }

        // Data changes on B come first, schema changes on A come last.
        let follow_ups = [
            "B filter .id > 100 delete",
            "B filter .id <= 50 update { val := -1 }",
            "alter A add column score: int",
            "alter A add index .id",
        ];
        for stmt in follow_ups {
            exec(&mut db, stmt);
        }

        std::mem::forget(db);
    }

    let mut db = Engine::new(&dir).unwrap();

    let a_rows = count(&mut db, "count(A)");
    assert_eq!(a_rows, 150, "A rows lost after crash");

    let b_rows = count(&mut db, "count(B)");
    assert_eq!(
        b_rows, 100,
        "B delete reverted after crash (DDL on A skipped B's records)"
    );

    let b_updated = count(&mut db, "count(B filter .val = -1 { .id })");
    assert_eq!(b_updated, 50, "B bulk update reverted after crash");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    std::fs::remove_dir_all(&dir).ok();
}
/// Rows inserted through a prepared statement must survive a crash.
///
/// One insert statement is prepared and then executed 50 times with the
/// literals `(i, i * 10)`. The engine is leaked with `std::mem::forget`
/// (no destructor runs) and reopened; the test checks the row count and
/// spot-checks the row with `id = 25`.
#[test]
fn test_prepared_insert_survives_crash() {
    use powdb_query::ast::Literal;

    let dir = temp_dir("prepared_insert");
    std::fs::create_dir_all(&dir).unwrap();

    {
        let mut db = Engine::new(&dir).unwrap();
        exec(&mut db, "type K { required id: int, v: int }");

        let prep = db
            .prepare("insert K { id := 0, v := 0 }")
            .expect("prepare insert");

        for i in 1..=50i64 {
            let params = [Literal::Int(i), Literal::Int(i * 10)];
            db.execute_prepared(&prep, &params)
                .expect("prepared insert");
        }

        let inserted = count(&mut db, "count(K)");
        assert_eq!(inserted, 50);
        std::mem::forget(db);
    }

    let mut db = Engine::new(&dir).unwrap();

    let recovered = count(&mut db, "count(K)");
    assert_eq!(
        recovered, 50,
        "prepared inserts were lost on crash — the fast path must WAL-log"
    );

    let sample = int_field(&mut db, "K filter .id = 25 { .v }", "v");
    assert_eq!(sample, Some(250));

    // Best-effort cleanup; a failure to remove the directory is ignored.
    std::fs::remove_dir_all(&dir).ok();
}
/// Updates that grow a string column must survive a crash.
///
/// 120 rows are inserted with a one-character `blob`, then each row is
/// updated individually so its `blob` becomes 400 characters long. Going by
/// the test name, this growth is presumably meant to force the storage layer
/// to relocate rows. The engine is leaked with `std::mem::forget` (no
/// destructor runs) and reopened; the test checks the row count, one sample
/// row, and that all 120 rows carry the grown value.
#[test]
fn test_var_length_update_relocation_survives_crash() {
    let dir = temp_dir("var_relocation");
    std::fs::create_dir_all(&dir).unwrap();

    // The grown value, used both for the updates and for the later checks.
    let big = "y".repeat(400);

    {
        let mut db = Engine::new(&dir).unwrap();
        exec(&mut db, "type D { required id: int, blob: str }");

        // All short rows are inserted first, and only then grown.
        for i in 1..=120i64 {
            let stmt = format!("insert D {{ id := {i}, blob := \"x\" }}");
            exec(&mut db, &stmt);
        }
        for i in 1..=120i64 {
            let stmt = format!("D filter .id = {i} update {{ blob := \"{big}\" }}");
            exec(&mut db, &stmt);
        }

        std::mem::forget(db);
    }

    let mut db = Engine::new(&dir).unwrap();

    let total = count(&mut db, "count(D)");
    assert_eq!(
        total, 120,
        "row count wrong after crash recovery of relocating updates"
    );

    // Spot check: id=60 must appear exactly once and hold the grown value.
    let rows = match exec(&mut db, "D filter .id = 60 { .blob }") {
        QueryResult::Rows { rows, .. } => rows,
        other => panic!("expected rows, got {other:?}"),
    };
    assert_eq!(rows.len(), 1, "id=60 must exist exactly once");
    assert_eq!(rows[0][0], Value::Str(big.clone()), "grown value lost");

    let grown_query = format!("count(D filter .blob = \"{big}\" {{ .id }})");
    let grown = count(&mut db, &grown_query);
    assert_eq!(grown, 120, "every row's update must survive");

    // Best-effort cleanup; a failure to remove the directory is ignored.
    std::fs::remove_dir_all(&dir).ok();
}