#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use crate::wal_export::{
buffered_sink::BufferedWalSink, InvalidFramingReason, WalExportError, WalRecordSink,
MAX_RECORD_BYTES, STREAM_HEADER_MAGIC,
};
use arkhe_forge_core::arkhe_pure;
use arkhe_kernel::abi::{CapabilityMask, Principal, Tick};
use arkhe_kernel::persist::{Wal, WalRecord};
use arkhe_kernel::state::{
Action, ActionCompute, ActionContext, ActionDeriv, InstanceConfig, Op,
};
use arkhe_kernel::{ArkheAction, Kernel};
use serde::{Deserialize, Serialize};
const WORLD_ID: [u8; 32] = [0x42u8; 32];
const MANIFEST_DIGEST: [u8; 32] = [0x9Au8; 32];
/// Minimal action type registered with the kernel by the tests in this module.
///
/// It carries a single `nonce` field and is declared with type code 7000 and
/// schema version 1 via the `arkhe` attribute.
#[derive(Debug, Serialize, Deserialize, Clone, ArkheAction)]
#[arkhe(type_code = 7000, schema_version = 1)]
struct TestAction {
/// Set to the loop index by `build_wal_with_records`, so each submitted action differs.
nonce: u64,
}
// Compute step for `TestAction`: it never emits any ops.
impl ActionCompute for TestAction {
#[arkhe_pure]
// Ignores the context and returns an empty op list.
fn compute(&self, _ctx: &ActionContext) -> Vec<Op> {
Vec::new()
}
}
/// Builds a WAL by driving a fresh kernel through `n` submit/step rounds.
///
/// Round `i` submits `TestAction { nonce: i }` as `Principal::System` at
/// `Tick(i)` and then steps the kernel at that same tick with
/// `CapabilityMask::SYSTEM`. The WAL exported afterwards is returned.
///
/// # Panics
/// Panics if a submit fails or if the kernel has no WAL to export.
fn build_wal_with_records(n: u64) -> Wal {
    let mut k = Kernel::new_with_wal(WORLD_ID, MANIFEST_DIGEST);
    k.register_action::<TestAction>();
    let instance = k.create_instance(InstanceConfig::default());

    for nonce in 0..n {
        let tick = Tick(nonce);
        let action = TestAction { nonce };
        let payload = Action::canonical_bytes(&action);

        let submitted = k.submit(
            instance,
            Principal::System,
            None,
            tick,
            TestAction::TYPE_CODE,
            payload,
        );
        submitted.expect("submit OK");

        // The step outcome is deliberately discarded; only the WAL matters here.
        let _ = k.step(tick, CapabilityMask::SYSTEM);
    }

    k.export_wal().expect("WAL attached")
}
/// Streams `records` through a `BufferedWalSink` and returns the bytes it wrote.
///
/// Each record is postcard-encoded and appended in order; the sink is flushed
/// once at the end.
///
/// # Panics
/// Panics if encoding, appending, or flushing fails.
fn export_records_to_stream(records: &[WalRecord]) -> Vec<u8> {
    let mut out = Vec::<u8>::new();
    {
        // Scope the sink so its mutable borrow of `out` ends before we return it.
        let mut sink = BufferedWalSink::new(&mut out);
        records.iter().for_each(|record| {
            let encoded = postcard::to_allocvec(record).expect("postcard OK");
            sink.append_record(&encoded).expect("append OK");
        });
        sink.flush().expect("flush OK");
    }
    out
}
/// Test-side parser that splits a framed export stream into raw record payloads.
///
/// Accepted layout: `STREAM_HEADER_MAGIC`, followed by zero or more frames, each
/// an 8-byte big-endian length prefix followed by exactly that many payload
/// bytes. A stream holding only the magic yields an empty vector.
///
/// # Errors
/// Returns `WalExportError::InvalidFraming` with:
/// - `Truncated` if the stream is shorter than the magic, ends inside a length
///   prefix, or ends before a frame's declared payload is complete;
/// - `HeaderMissing` if the leading bytes differ from `STREAM_HEADER_MAGIC`;
/// - `LengthZero` if a length prefix is zero;
/// - `LengthExceedsMax` if a length prefix is greater than `MAX_RECORD_BYTES`.
fn parse_stream(stream: &[u8]) -> Result<Vec<Vec<u8>>, WalExportError> {
    const LEN_PREFIX_BYTES: usize = 8;
    let truncated = || WalExportError::InvalidFraming(InvalidFramingReason::Truncated);

    let magic_len = STREAM_HEADER_MAGIC.len();
    if stream.len() < magic_len {
        return Err(truncated());
    }
    if stream[..magic_len] != STREAM_HEADER_MAGIC {
        return Err(WalExportError::InvalidFraming(
            InvalidFramingReason::HeaderMissing,
        ));
    }

    let mut cursor = magic_len;
    let mut records = Vec::new();
    while cursor < stream.len() {
        if stream.len() - cursor < LEN_PREFIX_BYTES {
            return Err(truncated());
        }
        let len_bytes: [u8; 8] = stream[cursor..cursor + LEN_PREFIX_BYTES]
            .try_into()
            .expect("8-byte slice");
        let len = u64::from_be_bytes(len_bytes);
        if len == 0 {
            return Err(WalExportError::InvalidFraming(
                InvalidFramingReason::LengthZero,
            ));
        }
        if len > MAX_RECORD_BYTES {
            return Err(WalExportError::InvalidFraming(
                InvalidFramingReason::LengthExceedsMax {
                    prefix: len,
                    max: MAX_RECORD_BYTES,
                },
            ));
        }
        cursor += LEN_PREFIX_BYTES;

        // Checked conversion instead of `as`: where `usize` is narrower than
        // `u64` a lossy cast would silently shrink the length. A length that
        // does not fit in `usize` cannot be backed by the slice, so it is
        // reported as a truncated stream.
        let len_usize = usize::try_from(len).map_err(|_| truncated())?;
        if stream.len() - cursor < len_usize {
            return Err(truncated());
        }
        records.push(stream[cursor..cursor + len_usize].to_vec());
        cursor += len_usize;
    }
    Ok(records)
}
/// Rebuilds a `Wal` from `header` plus postcard-encoded record payloads.
///
/// Payloads are decoded in order into `WalRecord`s.
///
/// # Panics
/// Panics if any payload fails to decode.
fn reconstruct_wal(
    header: arkhe_kernel::persist::WalHeader,
    record_byte_slices: Vec<Vec<u8>>,
) -> Wal {
    let mut records: Vec<WalRecord> = Vec::with_capacity(record_byte_slices.len());
    for encoded in &record_byte_slices {
        let decoded = postcard::from_bytes(encoded).expect("postcard decode OK");
        records.push(decoded);
    }
    Wal { header, records }
}
/// Exports five kernel-produced records, parses the stream back, and checks
/// that the rebuilt WAL still passes `verify_chain`.
#[test]
fn round_trip_real_wal_records_verify_chain() {
    let original = build_wal_with_records(5);
    // Sanity check: the source WAL must verify before we round-trip it.
    original
        .verify_chain(WORLD_ID)
        .expect("baseline WAL chain valid");

    let payloads =
        parse_stream(&export_records_to_stream(&original.records)).expect("parse OK");
    assert_eq!(
        payloads.len(),
        original.records.len(),
        "round-trip preserves record count"
    );

    let rebuilt = reconstruct_wal(original.header.clone(), payloads);
    rebuilt
        .verify_chain(WORLD_ID)
        .expect("round-trip WAL chain still valid");
}
/// Checks that every payload recovered from the stream equals the postcard
/// encoding of the corresponding original record.
///
/// The record count is asserted first: without it, a parser that returned
/// fewer (or zero) records would make the per-record loop pass vacuously.
#[test]
fn round_trip_records_byte_identical_to_originals() {
    let original = build_wal_with_records(3);
    let stream = export_records_to_stream(&original.records);
    let parsed = parse_stream(&stream).expect("parse OK");
    assert_eq!(
        parsed.len(),
        original.records.len(),
        "round-trip preserves record count"
    );
    for (i, (parsed_bytes, record)) in parsed.iter().zip(&original.records).enumerate() {
        let original_bytes = postcard::to_allocvec(record).expect("postcard OK");
        assert_eq!(
            parsed_bytes, &original_bytes,
            "record {i} parsed bytes equal original postcard encoding"
        );
    }
}
/// A zero-byte stream is shorter than the header magic and must be reported
/// as `Truncated`.
#[test]
fn empty_stream_rejected_with_truncated() {
    let no_bytes: &[u8] = &[];
    let outcome = parse_stream(no_bytes);
    assert!(matches!(
        outcome,
        Err(WalExportError::InvalidFraming(
            InvalidFramingReason::Truncated
        ))
    ));
}
/// A stream whose first eight bytes are `0xFF` (followed by sixteen zero
/// bytes) must be rejected with `HeaderMissing`.
#[test]
fn invalid_header_magic_rejected_with_header_missing() {
    // 8 bytes of 0xFF, then zero-padded up to 24 bytes in total.
    let mut bogus = [0xFFu8; 8].to_vec();
    bogus.resize(8 + 16, 0u8);

    let outcome = parse_stream(&bogus);
    assert!(matches!(
        outcome,
        Err(WalExportError::InvalidFraming(
            InvalidFramingReason::HeaderMissing
        ))
    ));
}
/// A valid magic followed by only four bytes (half a length prefix) must be
/// reported as `Truncated`.
#[test]
fn truncated_in_length_prefix_rejected_with_truncated() {
    let mut bytes = STREAM_HEADER_MAGIC.to_vec();
    // Only 4 of the 8 length-prefix bytes are present.
    bytes.extend_from_slice(&[0u8; 4]);

    let outcome = parse_stream(&bytes);
    assert!(matches!(
        outcome,
        Err(WalExportError::InvalidFraming(
            InvalidFramingReason::Truncated
        ))
    ));
}
/// A frame whose 8-byte length prefix is zero must be rejected with
/// `LengthZero`.
#[test]
fn zero_length_prefix_rejected_with_length_zero() {
    let zero_prefix = u64::to_be_bytes(0);
    let mut bytes = STREAM_HEADER_MAGIC.to_vec();
    bytes.extend_from_slice(&zero_prefix);

    let outcome = parse_stream(&bytes);
    assert!(matches!(
        outcome,
        Err(WalExportError::InvalidFraming(
            InvalidFramingReason::LengthZero
        ))
    ));
}
/// A length prefix of `MAX_RECORD_BYTES + 1` must be rejected with
/// `LengthExceedsMax`, echoing both the offending prefix and the limit.
#[test]
fn oversized_length_prefix_rejected_with_length_exceeds_max() {
    let too_big = MAX_RECORD_BYTES + 1;
    let mut bytes = STREAM_HEADER_MAGIC.to_vec();
    bytes.extend_from_slice(&too_big.to_be_bytes());

    match parse_stream(&bytes) {
        Err(WalExportError::InvalidFraming(InvalidFramingReason::LengthExceedsMax {
            prefix: reported_prefix,
            max: reported_max,
        })) => {
            assert_eq!(reported_prefix, MAX_RECORD_BYTES + 1);
            assert_eq!(reported_max, MAX_RECORD_BYTES);
        }
        other => panic!("expected LengthExceedsMax, got: {other:?}"),
    }
}
/// Cutting the last four bytes off a two-record stream must make the parser
/// report `Truncated`.
#[test]
fn truncated_in_payload_rejected_with_truncated() {
    let original = build_wal_with_records(2);
    let mut bytes = export_records_to_stream(&original.records);
    let kept = bytes.len() - 4;
    bytes.truncate(kept);

    let outcome = parse_stream(&bytes);
    assert!(matches!(
        outcome,
        Err(WalExportError::InvalidFraming(
            InvalidFramingReason::Truncated
        ))
    ));
}
/// Flipping one bit inside a record payload leaves the framing parseable but
/// must make `verify_chain` fail on the rebuilt WAL.
#[test]
fn tampered_record_section_breaks_verify_chain() {
    let original = build_wal_with_records(3);
    let mut bytes = export_records_to_stream(&original.records);

    // Skip the magic and the first 8-byte length prefix, then go 32 bytes in.
    // NOTE(review): assumes the first record's payload is longer than 32
    // bytes so the flipped bit stays inside it — confirm against the
    // `WalRecord` encoding.
    let target = STREAM_HEADER_MAGIC.len() + 8 + 32;
    bytes[target] ^= 0x01;

    let payloads = parse_stream(&bytes).expect("framing still valid");
    let rebuilt = reconstruct_wal(original.header.clone(), payloads);
    let verdict = rebuilt.verify_chain(WORLD_ID);
    assert!(
        verdict.is_err(),
        "tampered record bytes break chain verification"
    );
}
/// Appending the same encoded record twice must fail on the second append
/// with `AppendOnlyViolation { expected_seq: 2, got_seq: 1, previous_seq: Some(1) }`.
#[test]
fn record_duplication_rejected_at_sink_with_append_only_violation() {
    let original = build_wal_with_records(2);
    let first_encoded = postcard::to_allocvec(&original.records[0]).expect("postcard OK");

    let mut backing: Vec<u8> = Vec::new();
    {
        let mut sink = BufferedWalSink::new(&mut backing);
        sink.append_record(&first_encoded).expect("first append OK");

        // Same bytes again: the sink is expected to refuse the replay.
        let second = sink.append_record(&first_encoded);
        assert!(matches!(
            second,
            Err(WalExportError::AppendOnlyViolation {
                expected_seq: 2,
                got_seq: 1,
                previous_seq: Some(1),
            })
        ));
    }
}
/// Overwriting the third record with a copy of the second must make
/// `verify_chain` fail.
#[test]
fn duplicated_record_in_parsed_stream_breaks_verify_chain() {
    let original = build_wal_with_records(3);

    let mut records = original.records.clone();
    let replayed = records[1].clone();
    records[2] = replayed;

    let tampered = Wal {
        header: original.header.clone(),
        records,
    };
    let verdict = tampered.verify_chain(WORLD_ID);
    assert!(
        verdict.is_err(),
        "duplicated record breaks verify_chain (replay-attack defence)"
    );
}
/// Splicing a second copy of the header magic in right after the first record
/// must be rejected: the parser reads the magic as a length prefix, and the
/// test expects that value to be reported via `LengthExceedsMax`.
#[test]
fn mid_stream_magic_insertion_caught_by_length_exceeds_max() {
    let original = build_wal_with_records(2);
    let clean = export_records_to_stream(&original.records);

    // Locate the end of record 0 by reading its length prefix.
    let prefix_start = STREAM_HEADER_MAGIC.len();
    let payload_start = prefix_start + 8;
    let len_bytes: [u8; 8] = clean[prefix_start..payload_start]
        .try_into()
        .expect("8 bytes");
    let rec0_len = u64::from_be_bytes(len_bytes) as usize;
    let splice_at = payload_start + rec0_len;

    // head ++ magic ++ tail
    let tampered: Vec<u8> = [
        &clean[..splice_at],
        &STREAM_HEADER_MAGIC[..],
        &clean[splice_at..],
    ]
    .concat();

    match parse_stream(&tampered) {
        Err(WalExportError::InvalidFraming(InvalidFramingReason::LengthExceedsMax {
            prefix: reported_prefix,
            max: reported_max,
        })) => {
            let magic_as_len = u64::from_be_bytes(STREAM_HEADER_MAGIC);
            assert_eq!(reported_prefix, magic_as_len);
            assert_eq!(reported_max, MAX_RECORD_BYTES);
        }
        other => panic!("expected LengthExceedsMax (mid-stream magic), got: {other:?}"),
    }
}
/// Two independently built three-record WALs must export to identical bytes.
#[test]
fn streaming_export_is_byte_deterministic_across_runs() {
    let first_run = export_records_to_stream(&build_wal_with_records(3).records);
    let second_run = export_records_to_stream(&build_wal_with_records(3).records);
    assert_eq!(
        first_run, second_run,
        "streamed export bytes match across runs"
    );
}
/// For every kernel-produced record, `BufferedWalSink::extract_seq` applied to
/// its postcard encoding must return the record's own `seq`.
#[test]
fn walrecord_leading_seq_invariant_bridge() {
    let wal = build_wal_with_records(3);
    let produced = wal.records.len();
    assert_eq!(produced, 3, "Kernel pipeline must produce 3 records");

    wal.records.iter().enumerate().for_each(|(idx, record)| {
        let encoded = postcard::to_allocvec(record).expect("postcard encode OK");
        let seq_from_bytes = BufferedWalSink::<Vec<u8>>::extract_seq(&encoded)
            .expect("extract_seq decodes leading seq u64");
        assert_eq!(
            seq_from_bytes, record.seq,
            "record #{idx}: extract_seq must equal record.seq (L0 schema coupling)"
        );
    });
}
}