use std::fmt;
pub mod buffered_sink;
pub mod reader;
pub use buffered_sink::BufferedWalSink;
pub use reader::{StreamingWalReader, WalStreamReader};
#[cfg(test)]
mod wire_stability;
#[cfg(test)]
mod round_trip_tests;
/// Eight-byte magic identifying an export stream header: ASCII `ARKHEXP1`.
///
/// These are the same bytes that [`StreamMagic::V1`] reports from
/// [`StreamMagic::bytes`]; the unit tests in this module pin that equality.
pub const STREAM_HEADER_MAGIC: [u8; 8] = *b"ARKHEXP1";
/// Known export-stream header magics, one variant per recognised version.
///
/// Marked `#[non_exhaustive]` so further versions can be added without
/// breaking downstream `match` expressions.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[non_exhaustive]
pub enum StreamMagic {
    /// Version 1 stream, identified by the ASCII bytes `ARKHEXP1`.
    V1,
}

impl StreamMagic {
    /// Returns the eight magic bytes that identify this stream version.
    #[must_use]
    pub const fn bytes(self) -> &'static [u8; 8] {
        match self {
            Self::V1 => b"ARKHEXP1",
        }
    }

    /// Maps eight header bytes back to the stream version they identify.
    ///
    /// Returns `None` when `bytes` does not match any known magic.
    #[must_use]
    pub fn recognize(bytes: &[u8; 8]) -> Option<Self> {
        // Compare against `bytes()` so the literal lives in exactly one place.
        (bytes == Self::V1.bytes()).then_some(Self::V1)
    }
}
/// Upper bound on a single record's length: `1 << 24` = 16 777 216 (16 MiB).
///
/// NOTE(review): enforcement is not visible in this file; presumably the
/// reader/sink compare a record's length prefix against this value and report
/// [`InvalidFramingReason::LengthExceedsMax`] — confirm in `reader` / `buffered_sink`.
pub const MAX_RECORD_BYTES: u64 = 1 << 24;
/// Errors reported by the WAL export stream API.
///
/// Only [`WalExportError::Io`] exposes an underlying cause through
/// `std::error::Error::source`; every other variant is a leaf error.
#[derive(Debug)]
#[non_exhaustive]
pub enum WalExportError {
/// An underlying I/O error; also produced by `From<std::io::Error>`.
Io(std::io::Error),
/// The stream framing was rejected; the payload says why.
InvalidFraming(InvalidFramingReason),
/// A record arrived with a sequence number other than the expected one.
AppendOnlyViolation {
/// Sequence number that was expected next.
expected_seq: u64,
/// Sequence number that was actually received.
got_seq: u64,
/// Last sequence number appended, or `None` for a fresh stream with
/// no prior appends.
previous_seq: Option<u64>,
},
/// The sequence-number space is exhausted; the Display text advises
/// rotating the stream.
SeqExhausted {
/// Last sequence number in use when exhaustion was detected.
last_seq: u64,
},
/// The stream header carried a magic that is not recognised.
UnsupportedStreamVersion {
/// The eight header bytes that were not recognised.
magic: [u8; 8],
},
/// A request did not fit into a bounded buffer.
BufferOverflow {
/// Buffer capacity in bytes.
capacity: usize,
/// Size that was requested (presumably bytes — confirm at the call site).
requested: usize,
/// Amount already held in the buffer (presumably bytes — confirm at the call site).
current_buffer: usize,
},
}
/// Specific reason a stream's framing was rejected.
///
/// Carried inside `WalExportError::InvalidFraming`. Derives `Clone`,
/// `PartialEq` and `Eq` so callers and tests can compare reasons directly
/// instead of matching on rendered text. `Copy` is intentionally not derived:
/// the enum is `#[non_exhaustive]` and a future variant may own heap data.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum InvalidFramingReason {
    /// The length prefix is larger than the permitted maximum.
    LengthExceedsMax {
        /// Length prefix value that was read.
        prefix: u64,
        /// Maximum permitted length in bytes.
        max: u64,
    },
    /// The length prefix is zero; empty records are disallowed.
    LengthZero,
    /// The stream ended in the middle of a record.
    Truncated,
    /// The stream header magic is missing or does not match `ARKHEXP1`.
    HeaderMissing,
}
/// Human-readable rendering; every message starts with `wal export`.
impl fmt::Display for WalExportError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Io(e) => write!(f, "wal export io: {e}"),
            Self::InvalidFraming(r) => write!(f, "wal export framing rejected: {r}"),
            // A known predecessor is reported alongside the mismatch.
            Self::AppendOnlyViolation {
                expected_seq,
                got_seq,
                previous_seq: Some(prev),
            } => write!(
                f,
                "wal export append-only violation: expected seq {expected_seq}, got {got_seq} (last appended {prev})"
            ),
            // No predecessor: the violation happened on a fresh stream.
            Self::AppendOnlyViolation {
                expected_seq,
                got_seq,
                previous_seq: None,
            } => write!(
                f,
                "wal export append-only violation: expected seq {expected_seq}, got {got_seq} (fresh stream, no prior appends)"
            ),
            Self::SeqExhausted { last_seq } => write!(
                f,
                "wal export seq space exhausted at last_seq {last_seq} — rotate stream"
            ),
            // `{magic:?}` prints the raw bytes as a decimal array.
            Self::UnsupportedStreamVersion { magic } => write!(
                f,
                "wal export unsupported stream version: magic {magic:?} not recognised"
            ),
            Self::BufferOverflow {
                capacity,
                requested,
                current_buffer,
            } => write!(
                f,
                "wal export buffer overflow: capacity {capacity} bytes, current {current_buffer}, requested {requested}"
            ),
        }
    }
}
/// Human-readable rendering of the framing rejection reason.
impl fmt::Display for InvalidFramingReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only one variant interpolates values; the rest are fixed strings.
        let fixed = match self {
            Self::LengthExceedsMax { prefix, max } => {
                return write!(f, "length prefix {prefix} exceeds maximum {max} bytes");
            }
            Self::LengthZero => "length prefix is zero — empty records disallowed",
            Self::Truncated => "stream truncated mid-record",
            Self::HeaderMissing => {
                "stream header magic missing or mismatched (expected ARKHEXP1)"
            }
        };
        f.write_str(fixed)
    }
}
impl std::error::Error for WalExportError {
    /// Returns the wrapped `std::io::Error` for the `Io` variant and `None`
    /// for every other variant.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        // Deliberately exhaustive with no wildcard arm: adding a variant must
        // force a decision here about whether it carries a cause.
        let cause: &(dyn std::error::Error + 'static) = match self {
            Self::Io(io_err) => io_err,
            Self::InvalidFraming(_)
            | Self::AppendOnlyViolation { .. }
            | Self::SeqExhausted { .. }
            | Self::UnsupportedStreamVersion { .. }
            | Self::BufferOverflow { .. } => return None,
        };
        Some(cause)
    }
}
// Leaf error: the empty impl relies on the trait's default methods, so
// `source()` returns `None`.
impl std::error::Error for InvalidFramingReason {}
impl From<std::io::Error> for WalExportError {
fn from(e: std::io::Error) -> Self {
Self::Io(e)
}
}
/// Destination that WAL records are appended to.
///
/// The trait has no supertraits, so an implementation needs neither `Seek`
/// nor any other I/O capability beyond what these two methods require.
pub trait WalRecordSink {
/// Appends one record, given as raw bytes, to the sink.
///
/// NOTE(review): whether the implementation adds framing around
/// `record_bytes` is not visible from this trait — confirm per implementor.
///
/// # Errors
///
/// Returns a [`WalExportError`] if the record cannot be appended.
fn append_record(&mut self, record_bytes: &[u8]) -> Result<(), WalExportError>;
/// Flushes the sink.
///
/// # Errors
///
/// Returns a [`WalExportError`] if flushing fails.
fn flush(&mut self) -> Result<(), WalExportError>;
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Sink that accepts every record and discards it; it only proves that
    /// the trait can be implemented with nothing but the two methods.
    struct NoopSink;

    impl WalRecordSink for NoopSink {
        fn append_record(&mut self, _record_bytes: &[u8]) -> Result<(), WalExportError> {
            Ok(())
        }

        fn flush(&mut self) -> Result<(), WalExportError> {
            Ok(())
        }
    }

    /// The header constant is exactly the ASCII bytes `ARKHEXP1`.
    #[test]
    fn stream_header_magic_is_arkhexp1() {
        assert_eq!(&STREAM_HEADER_MAGIC, b"ARKHEXP1");
    }

    /// `StreamMagic::V1` agrees with the constant and with the raw byte values.
    #[test]
    fn stream_magic_v1_bytes_match_legacy_constant() {
        let v1_bytes = StreamMagic::V1.bytes();
        assert_eq!(v1_bytes, &STREAM_HEADER_MAGIC);
        assert_eq!(
            v1_bytes,
            &[0x41, 0x52, 0x4B, 0x48, 0x45, 0x58, 0x50, 0x31]
        );
    }

    /// Only the V1 magic is recognised; near-misses and zeros are rejected.
    #[test]
    fn stream_magic_recognize_v1_and_rejects_unknown() {
        assert_eq!(StreamMagic::recognize(b"ARKHEXP1"), Some(StreamMagic::V1));
        for unknown in [b"ARKHEXP2", b"ARKHEWAL", b"\0\0\0\0\0\0\0\0"] {
            assert_eq!(StreamMagic::recognize(unknown), None);
        }
    }

    /// The unsupported-version message names the condition and shows the magic.
    #[test]
    fn unsupported_stream_version_display_distinguishes_with_magic() {
        let rendered = WalExportError::UnsupportedStreamVersion {
            magic: *b"ARKHEXP2",
        }
        .to_string();
        // The magic is rendered with `{:?}`, i.e. as decimal bytes; 65 is `A`.
        for needle in ["unsupported stream version", "not recognised", "65"] {
            assert!(rendered.contains(needle));
        }
    }

    /// The export stream magic must differ from the kernel WAL header magic.
    #[test]
    fn stream_header_magic_distinct_from_l0_wal_magic() {
        assert_ne!(
            &STREAM_HEADER_MAGIC,
            &arkhe_kernel::persist::WalHeader::MAGIC
        );
    }

    /// The record size cap is 16 MiB, spelled both ways.
    #[test]
    fn max_record_bytes_is_sixteen_mib() {
        assert_eq!(MAX_RECORD_BYTES, 1 << 24);
        assert_eq!(MAX_RECORD_BYTES, 16_777_216);
    }

    /// Both error types are `Error + Send + Sync`.
    #[test]
    fn wal_export_error_implements_standard_error_traits() {
        fn assert_error_send_sync<T: std::error::Error + Send + Sync>() {}
        assert_error_send_sync::<WalExportError>();
        assert_error_send_sync::<InvalidFramingReason>();
    }

    /// Each `WalExportError` variant renders its own keyword and field values.
    #[test]
    fn wal_export_error_display_is_distinguishable() {
        let io_text = WalExportError::Io(std::io::Error::other("test")).to_string();
        assert!(io_text.contains("io"));

        let framing_text =
            WalExportError::InvalidFraming(InvalidFramingReason::LengthZero).to_string();
        assert!(framing_text.contains("framing"));

        let append_text = WalExportError::AppendOnlyViolation {
            expected_seq: 5,
            got_seq: 3,
            previous_seq: Some(4),
        }
        .to_string();
        assert!(append_text.contains("append-only"));
        for digit in ['5', '3', '4'] {
            assert!(append_text.contains(digit));
        }

        let exhausted_text = WalExportError::SeqExhausted { last_seq: u64::MAX }.to_string();
        for needle in ["exhausted", "rotate"] {
            assert!(exhausted_text.contains(needle));
        }

        let overflow_text = WalExportError::BufferOverflow {
            capacity: 1024,
            requested: 2048,
            current_buffer: 768,
        }
        .to_string();
        for needle in ["buffer overflow", "1024", "2048", "768"] {
            assert!(overflow_text.contains(needle));
        }
    }

    /// A fresh-stream violation reads differently from one with a predecessor.
    #[test]
    fn append_only_violation_fresh_stream_display_distinguishes_from_some_case() {
        let fresh_text = WalExportError::AppendOnlyViolation {
            expected_seq: 0,
            got_seq: 7,
            previous_seq: None,
        }
        .to_string();
        let after_text = WalExportError::AppendOnlyViolation {
            expected_seq: 5,
            got_seq: 3,
            previous_seq: Some(4),
        }
        .to_string();

        for needle in ["fresh stream", "no prior appends"] {
            assert!(fresh_text.contains(needle));
        }
        assert!(after_text.contains("last appended"));
        assert!(after_text.contains('4'));
        assert_ne!(fresh_text, after_text);
    }

    /// Each framing reason renders a distinguishing phrase.
    #[test]
    fn invalid_framing_reason_display_is_distinguishable() {
        let cases = [
            (
                InvalidFramingReason::LengthExceedsMax {
                    prefix: MAX_RECORD_BYTES + 1,
                    max: MAX_RECORD_BYTES,
                },
                "exceeds maximum",
            ),
            (InvalidFramingReason::LengthZero, "zero"),
            (InvalidFramingReason::Truncated, "truncated"),
            (InvalidFramingReason::HeaderMissing, "ARKHEXP1"),
        ];
        for (reason, needle) in cases {
            assert!(reason.to_string().contains(needle));
        }
    }

    /// Only the `Io` variant exposes an underlying source.
    #[test]
    fn wal_export_error_source_chains_through_io() {
        use std::error::Error;

        let with_cause = WalExportError::Io(std::io::Error::other("test"));
        let without_cause = WalExportError::InvalidFraming(InvalidFramingReason::LengthZero);
        assert!(with_cause.source().is_some());
        assert!(without_cause.source().is_none());
    }

    /// `From<std::io::Error>` produces the `Io` variant.
    #[test]
    fn from_io_error_lifts_to_wal_export_error() {
        let lifted = WalExportError::from(std::io::Error::other("test"));
        assert!(matches!(lifted, WalExportError::Io(_)));
    }

    /// A sink with no seek capability satisfies the trait.
    #[test]
    fn wal_record_sink_satisfiable_without_seek() {
        let mut sink = NoopSink;
        sink.append_record(b"sample").expect("noop sink succeeds");
        sink.flush().expect("noop flush");
    }
}