use aurora_db::{Aurora, AuroraConfig, FieldType, Value};
use rand::Rng;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
/// Run-time settings for the chaos suite: built from `Default` and then
/// overridden by command-line flags.
#[derive(Debug)]
struct Config {
    /// Number of spawn → kill → verify cycles to run.
    rounds: usize,
    /// Base database path; sibling files (counter, `.wal`, `.db`) are derived
    /// from it with `with_extension`.
    db_path: PathBuf,
    /// Lower bound (inclusive) of the random delay before the worker is killed, in ms.
    kill_delay_ms_min: u64,
    /// Upper bound (exclusive) of the random kill delay, in ms.
    kill_delay_ms_max: u64,
}

impl Default for Config {
    fn default() -> Self {
        // 20 rounds, each killing the writer somewhere in a 500–2000 ms window.
        let db_path = PathBuf::from("/tmp/aurora_chaos_db");
        Config {
            rounds: 20,
            db_path,
            kill_delay_ms_min: 500,
            kill_delay_ms_max: 2000,
        }
    }
}
/// Builds a `Config` from the process command line, starting from
/// `Config::default()`.
///
/// Recognised flags, each followed by one value: `--rounds` (positive
/// integer), `--db-path`, `--kill-min-ms` and `--kill-max-ms` (non-negative
/// integers; min must be strictly less than max). Unrecognised arguments are
/// reported on stderr and otherwise ignored. On any invalid or missing value
/// an error is printed and the process exits with status 2.
fn parse_args() -> Config {
    match parse_args_from(std::env::args().skip(1)) {
        Ok(cfg) => cfg,
        Err(msg) => {
            eprintln!("{msg}");
            std::process::exit(2);
        }
    }
}

/// Parses command-line arguments (program name already removed) into a
/// `Config`, returning the user-facing error text instead of exiting.
fn parse_args_from<I>(args: I) -> Result<Config, String>
where
    I: IntoIterator<Item = String>,
{
    let mut cfg = Config::default();
    let mut args = args.into_iter();
    while let Some(arg) = args.next() {
        match arg.as_str() {
            "--rounds" => {
                let raw = take_value(&mut args, &arg)?;
                // The error message promises a *positive* integer, so reject 0 too.
                cfg.rounds = match raw.parse::<usize>() {
                    Ok(n) if n > 0 => n,
                    _ => return Err("error: --rounds must be a positive integer".to_string()),
                };
            }
            "--db-path" => cfg.db_path = PathBuf::from(take_value(&mut args, &arg)?),
            "--kill-min-ms" => cfg.kill_delay_ms_min = take_millis(&mut args, &arg)?,
            "--kill-max-ms" => cfg.kill_delay_ms_max = take_millis(&mut args, &arg)?,
            // Not fatal (a harness may pass extra flags), but no longer silent.
            other => eprintln!("warning: ignoring unrecognized argument '{other}'"),
        }
    }
    if cfg.kill_delay_ms_min >= cfg.kill_delay_ms_max {
        return Err(format!(
            "error: --kill-min-ms ({}) must be less than --kill-max-ms ({})",
            cfg.kill_delay_ms_min, cfg.kill_delay_ms_max
        ));
    }
    Ok(cfg)
}

/// Returns the argument following `flag`, or the "requires a value" error.
fn take_value<I>(args: &mut I, flag: &str) -> Result<String, String>
where
    I: Iterator<Item = String>,
{
    args.next()
        .ok_or_else(|| format!("error: '{}' requires a value", flag))
}

/// Reads the argument following `flag` as a non-negative millisecond count.
fn take_millis<I>(args: &mut I, flag: &str) -> Result<u64, String>
where
    I: Iterator<Item = String>,
{
    take_value(args, flag)?
        .parse()
        .map_err(|_| format!("error: {} must be a non-negative integer", flag))
}
/// Returns the path of the commit-counter file next to `db_path`: the same
/// path with its extension replaced by `counter`.
///
/// Takes `&Path` (clippy `ptr_arg`); existing `&PathBuf` call sites still
/// compile through deref coercion.
fn counter_path(db_path: &Path) -> PathBuf {
    db_path.with_extension("counter")
}
/// Reads the number of records the worker has confirmed as inserted, as last
/// persisted by `write_counter`.
///
/// Falls back to 0 when the counter file is missing, unreadable, or does not
/// hold a valid `u64` (e.g. before the first round has written anything).
fn read_counter(db_path: &PathBuf) -> u64 {
    match std::fs::read_to_string(counter_path(db_path)) {
        Ok(contents) => contents.trim().parse::<u64>().unwrap_or(0),
        Err(_) => 0,
    }
}
/// Persists `n` as the worker's committed-record count.
///
/// The value is written to a `counter_tmp` sibling first and then renamed over
/// the real counter file, so (where rename is atomic) a reader never sees a
/// half-written number. Errors are deliberately ignored: a missed update only
/// makes the supervisor expect fewer records, never more.
fn write_counter(db_path: &PathBuf, n: u64) {
    let target = counter_path(db_path);
    let staging = target.with_extension("counter_tmp");
    if std::fs::write(&staging, n.to_string()).is_err() {
        return;
    }
    let _ = std::fs::rename(&staging, &target);
}
/// Worker-process body: opens the database with the WAL enabled and inserts
/// `records` documents with increasing `seq` forever, persisting progress to
/// the counter file after each successful insert.
///
/// Never returns on its own; the supervisor kills the process. Panics if the
/// database cannot be opened.
async fn run_worker(db_path: PathBuf) {
    let db = Aurora::with_config(AuroraConfig {
        db_path: db_path.clone(),
        enable_wal: true,
        enable_write_buffering: false,
        ..Default::default()
    })
    .await
    .expect("worker: failed to open db");

    // Result ignored on purpose — presumably this errors once the collection
    // exists from an earlier round. TODO confirm against the aurora_db API.
    let _ = db
        .new_collection(
            "records",
            vec![
                ("seq".to_string(), FieldType::SCALAR_INT, false),
                ("payload".to_string(), FieldType::SCALAR_STRING, false),
            ],
        )
        .await;

    // Resume at the first sequence number the supervisor does not yet count.
    let mut seq = read_counter(&db_path);
    // Failed inserts are retried forever; report the first one so a worker
    // that can never write is visible instead of silently spinning.
    let mut reported_failure = false;
    loop {
        let mut data = HashMap::new();
        data.insert("seq".to_string(), Value::Int(seq as i64));
        data.insert(
            "payload".to_string(),
            Value::String(format!("record-{}", seq)),
        );
        match db.insert_map("records", data).await {
            Ok(_) => {
                seq += 1;
                // Advance the counter only after the insert returned Ok, so it
                // never claims a record the database did not accept.
                write_counter(&db_path, seq);
            }
            Err(e) => {
                if !reported_failure {
                    eprintln!("worker: insert of seq {seq} failed (retrying): {e}");
                    reported_failure = true;
                }
            }
        }
    }
}
/// Picks a kill delay in milliseconds uniformly from `min..max` (upper bound
/// exclusive). Returns `min` unchanged when the range is empty.
fn random_delay(min: u64, max: u64) -> u64 {
    if max > min {
        rand::thread_rng().gen_range(min..max)
    } else {
        min
    }
}
/// Removes on-disk artifacts a previous chaos run may have left behind:
/// - `base` itself;
/// - its `.db` and `.wal` siblings;
/// - the commit counter;
/// - the counter's `counter_tmp` staging file.
///
/// Best effort: missing paths and removal errors are ignored.
fn clean_db(base: &PathBuf) {
    let counter = counter_path(base);
    // `write_counter` stages updates here; a kill between its write and rename
    // can leave this file behind.
    let counter_tmp = counter.with_extension("counter_tmp");
    let _ = std::fs::remove_dir_all(base);
    let _ = std::fs::remove_dir_all(base.with_extension("db"));
    let _ = std::fs::remove_file(base.with_extension("wal"));
    let _ = std::fs::remove_file(&counter);
    let _ = std::fs::remove_file(&counter_tmp);
}
/// Outcome of one spawn → kill → verify cycle.
// `round` and `duration` are recorded for completeness but are not read by the
// summary, hence the dead_code allowance.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
struct RoundResult {
    /// 1-based round index.
    round: usize,
    /// Counter value read after the worker was killed.
    committed: u64,
    /// How many of the sequence numbers `0..committed` were found on reopen.
    recovered: u64,
    /// Wall time spent reopening and verifying the database.
    duration: Duration,
}
/// Reopens the database at `db_path` (WAL enabled, write buffering off, as
/// the worker uses it) and checks the `records` collection against the
/// supervisor's commit counter.
///
/// Every sequence number in `0..expected_count` is expected to be present.
/// A document counts as corrupted when its `seq` field is missing, is not an
/// `Int`, or is negative. Records with `seq >= expected_count` (inserted
/// after the last counter update) are tolerated.
///
/// Returns `(recovered, lost)`:
/// - `recovered` is how many expected sequence numbers were found.
/// - `lost` is missing sequence numbers plus corrupted documents. A corrupted
///   document may also show up as a missing number, so the two can overlap.
///
/// # Errors
/// Returns a message if the database cannot be opened or the collection
/// cannot be read.
async fn verify_database(db_path: &PathBuf, expected_count: u64) -> Result<(u64, u64), String> {
    let db = Aurora::with_config(AuroraConfig {
        db_path: db_path.clone(),
        enable_wal: true,
        enable_write_buffering: false,
        ..Default::default()
    })
    .await
    .map_err(|e| format!("verifier: failed to open db: {e}"))?;
    let docs = db
        .get_all_collection("records")
        .await
        .map_err(|e| format!("verifier: get_all_collection failed: {e}"))?;

    let mut found_seqs = std::collections::HashSet::new();
    let mut corrupted = 0u64;
    for doc in &docs {
        match doc.data.get("seq") {
            // The worker only writes seq values counting up from a non-negative
            // counter, so a negative seq is corruption. Without this guard,
            // `as u64` would wrap it into a huge, silently ignored value.
            Some(Value::Int(seq)) if *seq >= 0 => {
                found_seqs.insert(*seq as u64);
            }
            _ => corrupted += 1,
        }
    }

    // The set holds distinct values, so counting its members below
    // `expected_count` equals counting how many of `0..expected_count` exist.
    let recovered = found_seqs.iter().filter(|&&s| s < expected_count).count() as u64;
    let missing = expected_count.saturating_sub(recovered);
    Ok((recovered, missing + corrupted))
}
/// Supervisor loop. For each round it:
/// 1. spawns this same executable as a worker (`CHAOS_WORKER=1`);
/// 2. kills the worker after a random delay;
/// 3. reopens the database in-process and checks that every committed record
///    survived.
///
/// Prints a per-round line and a final summary. Exits the process with
/// status 1 when:
/// - the worker dies on its own;
/// - verification cannot run;
/// - any round reports missing or corrupted records;
/// - no commit was ever observed (the run verified nothing).
async fn run_supervisor(cfg: Config) {
    use std::io::Write;

    println!("╔════════════════════════════════════════════╗");
    println!("║ AURORA CHAOS SUITE ║");
    println!("╠════════════════════════════════════════════╣");
    println!("║ Rounds: {:>6} ║", cfg.rounds);
    println!(
        "║ Kill window: {:>4}-{:<4}ms ║",
        cfg.kill_delay_ms_min, cfg.kill_delay_ms_max
    );
    println!("╚════════════════════════════════════════════╝\n");
    clean_db(&cfg.db_path);
    let binary = std::env::current_exe().expect("cannot find current exe");
    let mut results: Vec<RoundResult> = Vec::new();
    // Track the worst per-round loss (missing + corrupted) and how many rounds
    // saw any loss. A corrupted document, or a loss that a later round no
    // longer shows, must still fail the run.
    let mut worst_round_loss = 0u64;
    let mut rounds_with_loss = 0usize;
    for round in 1..=cfg.rounds {
        print!("Round {:>3}/{} — spawning writer... ", round, cfg.rounds);
        // `print!` does not flush; make the progress text visible while we wait.
        let _ = std::io::stdout().flush();
        let committed_before = read_counter(&cfg.db_path);
        let mut child = std::process::Command::new(&binary)
            .env("CHAOS_WORKER", "1")
            .env("CHAOS_DB_PATH", &cfg.db_path)
            .spawn()
            .expect("failed to spawn worker");
        let kill_after = random_delay(cfg.kill_delay_ms_min, cfg.kill_delay_ms_max);
        // A blocking sleep is acceptable: the supervisor has nothing else to do
        // while the worker writes.
        std::thread::sleep(Duration::from_millis(kill_after));
        // The worker loops forever, so if it is already gone it crashed (e.g. it
        // could not open the db). Carrying on would silently verify nothing.
        match child.try_wait() {
            Ok(Some(status)) => {
                eprintln!(
                    "\nworker exited on its own in round {round} ({status}) before it could be killed"
                );
                std::process::exit(1);
            }
            Ok(None) => {}
            Err(e) => eprintln!("\nwarning: could not poll worker in round {round}: {e}"),
        }
        child.kill().ok();
        child.wait().ok();
        let committed_after = read_counter(&cfg.db_path);
        let written_this_round = committed_after.saturating_sub(committed_before);
        let start = Instant::now();
        let (recovered, lost) = match verify_database(&cfg.db_path, committed_after).await {
            Ok(v) => v,
            Err(e) => {
                eprintln!("\nverify failed on round {round}: {e}");
                std::process::exit(1);
            }
        };
        let duration = start.elapsed();
        println!(
            "killed after {:>4}ms | wrote {:>5} | committed {:>6} | recovered {:>6} | lost {}",
            kill_after, written_this_round, committed_after, recovered, lost
        );
        if lost > 0 {
            rounds_with_loss += 1;
            worst_round_loss = worst_round_loss.max(lost);
        }
        results.push(RoundResult {
            round,
            committed: committed_after,
            recovered,
            duration,
        });
    }
    let total_committed = results.iter().map(|r| r.committed).max().unwrap_or(0);
    let total_recovered = results.iter().map(|r| r.recovered).max().unwrap_or(0);
    // Judge the run on the per-round verdicts (which include corruption), not
    // on max(committed) - max(recovered), which can mask both.
    let total_data_loss = worst_round_loss;
    println!("\n╔════════════════════════════════════════════╗");
    println!("║ CHAOS RESULTS ║");
    println!("╠════════════════════════════════════════════╣");
    println!("║ Rounds completed: {:>6} ║", cfg.rounds);
    println!(
        "║ Total committed: {:>6} ║",
        total_committed
    );
    println!(
        "║ Total recovered: {:>6} ║",
        total_recovered
    );
    println!(
        "║ Total lost: {:>6} ║",
        total_data_loss
    );
    println!("║ Rounds with loss: {:>6} ║", rounds_with_loss);
    println!("╠════════════════════════════════════════════╣");
    if total_data_loss > 0 {
        if total_committed > 0 {
            let loss_pct = total_data_loss as f64 / total_committed as f64 * 100.0;
            println!("║ ✗ FAIL — {:.4}% data loss detected ║", loss_pct);
        } else {
            println!("║ ✗ FAIL — {} corrupted records, no commits ║", total_data_loss);
        }
    } else if total_committed == 0 {
        // Previously unreachable: zero commits fell into the PASS branch.
        println!("║ — no commits observed across all rounds ║");
    } else {
        println!("║ ✓ PASS — 0.0% data loss across all rounds ║");
    }
    println!("╚════════════════════════════════════════════╝");
    // A run that observed no commits verified nothing, so it does not pass.
    if total_data_loss > 0 || total_committed == 0 {
        std::process::exit(1);
    }
}
/// Entry point. The same binary acts as the worker when `CHAOS_WORKER` is set
/// (the supervisor sets it on the spawned child); otherwise it parses the
/// command line and runs the supervisor.
#[tokio::main]
async fn main() {
    let worker_mode = std::env::var("CHAOS_WORKER").is_ok();
    if !worker_mode {
        let cfg = parse_args();
        run_supervisor(cfg).await;
        return;
    }
    // The supervisor passes the db location explicitly; fall back to the default path.
    let db_path = match std::env::var_os("CHAOS_DB_PATH") {
        Some(raw) => PathBuf::from(raw),
        None => PathBuf::from("/tmp/aurora_chaos_db"),
    };
    run_worker(db_path).await;
}