use reddb_server::storage::blockchain::{compute_block_hash, verify_chain, Block, GENESIS_PREV_HASH};
use reddb_server::storage::schema::Value;
use reddb_server::{RedDBError, RedDBOptions, RedDBRuntime, RuntimeQueryResult};
use std::sync::Arc;
fn rt() -> RedDBRuntime {
RedDBRuntime::with_options(RedDBOptions::in_memory()).expect("runtime boots")
}
fn select_all(rt: &RedDBRuntime, name: &str) -> RuntimeQueryResult {
rt.execute_query(&format!("SELECT * FROM {name}"))
.expect("select")
}
fn sort_by_height(res: &mut RuntimeQueryResult) {
res.result.records.sort_by_key(|r| match r.get("block_height") {
Some(Value::UnsignedInteger(v)) => *v as i64,
Some(Value::Integer(v)) => *v,
_ => i64::MAX,
});
}
fn hex32(bytes: &[u8]) -> String {
let mut s = String::with_capacity(bytes.len() * 2);
for b in bytes {
s.push_str(&format!("{b:02x}"));
}
s
}
#[test]
fn caller_prev_hash_mismatch_returns_chain_conflict() {
let rt = rt();
rt.execute_query("CREATE COLLECTION audit_log KIND blockchain")
.expect("create");
let tip = rt
.chain_tip_for_collection("audit_log")
.expect("genesis tip exists");
let bogus = "ff".repeat(32);
let height = tip.height + 1;
let ts = tip.timestamp_ms; let stmt = format!(
"INSERT INTO audit_log (actor, prev_hash, block_height, timestamp) \
VALUES ('alice', '{bogus}', {height}, {ts})"
);
let err = rt.execute_query(&stmt).expect_err("conflict");
match err {
RedDBError::InvalidOperation(msg) => {
assert!(msg.starts_with("BlockchainConflict:"), "got {msg}");
assert!(msg.contains("prev_hash"), "got {msg}");
assert!(
msg.contains(&hex32(&tip.hash)),
"tip hash must appear in body: {msg}"
);
}
other => panic!("expected InvalidOperation, got {other:?}"),
}
}
#[test]
fn caller_height_mismatch_returns_chain_conflict() {
let rt = rt();
rt.execute_query("CREATE COLLECTION audit_log KIND blockchain")
.expect("create");
let tip = rt.chain_tip_for_collection("audit_log").unwrap();
let prev = hex32(&tip.hash);
let bad_height = tip.height + 99; let ts = tip.timestamp_ms;
let stmt = format!(
"INSERT INTO audit_log (actor, prev_hash, block_height, timestamp) \
VALUES ('alice', '{prev}', {bad_height}, {ts})"
);
let err = rt.execute_query(&stmt).expect_err("conflict");
match err {
RedDBError::InvalidOperation(msg) => {
assert!(msg.starts_with("BlockchainConflict:"), "got {msg}");
assert!(msg.contains("block_height"), "got {msg}");
}
other => panic!("expected InvalidOperation, got {other:?}"),
}
}
#[test]
fn caller_timestamp_outside_60s_returns_chain_conflict() {
let rt = rt();
rt.execute_query("CREATE COLLECTION audit_log KIND blockchain")
.expect("create");
let tip = rt.chain_tip_for_collection("audit_log").unwrap();
let prev = hex32(&tip.hash);
let height = tip.height + 1;
let ts = 1u64; let stmt = format!(
"INSERT INTO audit_log (actor, prev_hash, block_height, timestamp) \
VALUES ('alice', '{prev}', {height}, {ts})"
);
let err = rt.execute_query(&stmt).expect_err("conflict");
match err {
RedDBError::InvalidOperation(msg) => {
assert!(msg.starts_with("BlockchainConflict:"), "got {msg}");
assert!(msg.contains("timestamp"), "got {msg}");
}
other => panic!("expected InvalidOperation, got {other:?}"),
}
}
#[test]
fn caller_supplied_correct_values_appended_and_hash_recomputed() {
let rt = rt();
rt.execute_query("CREATE COLLECTION audit_log KIND blockchain")
.expect("create");
let tip = rt.chain_tip_for_collection("audit_log").unwrap();
let prev = hex32(&tip.hash);
let height = tip.height + 1;
let ts = reddb_server::runtime::blockchain_kind::now_ms();
let stmt = format!(
"INSERT INTO audit_log (actor, prev_hash, block_height, timestamp) \
VALUES ('alice', '{prev}', {height}, {ts})"
);
rt.execute_query(&stmt).expect("insert with correct triple");
let new_tip = rt.chain_tip_for_collection("audit_log").unwrap();
assert_eq!(new_tip.height, height);
assert_ne!(new_tip.hash, tip.hash);
assert_eq!(new_tip.timestamp_ms, ts);
let payload = b"actor=alice;".to_vec();
let recomputed =
compute_block_hash(&tip.hash, height, ts, &payload, None);
assert_eq!(recomputed, new_tip.hash);
}
#[test]
fn tip_cache_advances_atomically_with_insert() {
let rt = rt();
rt.execute_query("CREATE COLLECTION audit_log KIND blockchain")
.expect("create");
let t0 = rt.chain_tip_for_collection("audit_log").unwrap();
rt.execute_query("INSERT INTO audit_log (actor) VALUES ('alice')")
.expect("insert");
let t1 = rt.chain_tip_for_collection("audit_log").unwrap();
assert_eq!(t1.height, t0.height + 1);
assert_eq!(t1.height, 1);
rt.execute_query("INSERT INTO audit_log (actor) VALUES ('bob')")
.expect("insert");
let t2 = rt.chain_tip_for_collection("audit_log").unwrap();
assert_eq!(t2.height, 2);
}
#[test]
fn concurrent_writers_loser_gets_conflict_then_retries_to_success() {
let rt = Arc::new(rt());
rt.execute_query("CREATE COLLECTION audit_log KIND blockchain")
.expect("create");
let tip = rt.chain_tip_for_collection("audit_log").unwrap();
let prev_hex = hex32(&tip.hash);
let next_height = tip.height + 1;
let ts = reddb_server::runtime::blockchain_kind::now_ms();
let stmt_a = format!(
"INSERT INTO audit_log (actor, prev_hash, block_height, timestamp) \
VALUES ('a', '{prev_hex}', {next_height}, {ts})"
);
let stmt_b = format!(
"INSERT INTO audit_log (actor, prev_hash, block_height, timestamp) \
VALUES ('b', '{prev_hex}', {next_height}, {ts})"
);
let rt_a = Arc::clone(&rt);
let rt_b = Arc::clone(&rt);
let h_a = std::thread::spawn(move || rt_a.execute_query(&stmt_a));
let h_b = std::thread::spawn(move || rt_b.execute_query(&stmt_b));
let res_a = h_a.join().expect("a joined");
let res_b = h_b.join().expect("b joined");
let (winner_ok, loser_err) = match (res_a, res_b) {
(Ok(_), Err(e)) => ("a", e),
(Err(e), Ok(_)) => ("b", e),
(Ok(_), Ok(_)) => panic!("both writers succeeded — chain lock not enforcing"),
(Err(ea), Err(eb)) => panic!("both writers failed: {ea:?}, {eb:?}"),
};
let _ = winner_ok;
match loser_err {
RedDBError::InvalidOperation(msg) => {
assert!(msg.starts_with("BlockchainConflict:"), "got {msg}");
}
other => panic!("loser must surface BlockchainConflict, got {other:?}"),
}
let new_tip = rt.chain_tip_for_collection("audit_log").unwrap();
let retry_prev = hex32(&new_tip.hash);
let retry_height = new_tip.height + 1;
let retry_ts = reddb_server::runtime::blockchain_kind::now_ms();
let retry_stmt = format!(
"INSERT INTO audit_log (actor, prev_hash, block_height, timestamp) \
VALUES ('loser-retry', '{retry_prev}', {retry_height}, {retry_ts})"
);
rt.execute_query(&retry_stmt).expect("retry after conflict");
let mut res = select_all(&rt, "audit_log");
sort_by_height(&mut res);
let mut blocks = Vec::new();
for rec in &res.result.records {
let height = match rec.get("block_height") {
Some(Value::UnsignedInteger(v)) => *v,
Some(Value::Integer(v)) => *v as u64,
_ => panic!("missing height"),
};
let prev_hash = match rec.get("prev_hash") {
Some(Value::Blob(b)) => {
let mut a = [0u8; 32];
a.copy_from_slice(b);
a
}
_ => panic!("missing prev"),
};
let timestamp_ms = match rec.get("timestamp") {
Some(Value::UnsignedInteger(v)) => *v,
Some(Value::Integer(v)) => *v as u64,
_ => 0,
};
let hash = match rec.get("hash") {
Some(Value::Blob(b)) => {
let mut a = [0u8; 32];
a.copy_from_slice(b);
a
}
_ => panic!("missing hash"),
};
let mut user_pairs: Vec<(String, String)> = rec
.iter_fields()
.filter(|(k, _)| !matches!(k.as_ref(), "block_height" | "prev_hash" | "timestamp" | "hash"))
.map(|(k, v)| (k.to_string(), value_to_plain(v)))
.collect();
user_pairs.sort_by(|a, b| a.0.cmp(&b.0));
let mut payload = Vec::new();
for (k, v) in user_pairs {
payload.extend_from_slice(k.as_bytes());
payload.push(b'=');
payload.extend_from_slice(v.as_bytes());
payload.push(b';');
}
blocks.push(Block {
block_height: height,
prev_hash,
timestamp_ms,
payload,
signed: None,
hash,
});
}
assert!(!blocks.is_empty());
assert!(!blocks.is_empty());
assert_eq!(blocks[0].prev_hash, GENESIS_PREV_HASH);
let _ = verify_chain(&blocks);
}
fn value_to_plain(v: &Value) -> String {
v.plain_text()
}