use std::io::{ErrorKind, Read};
use super::{InvalidFramingReason, StreamMagic, WalExportError, MAX_RECORD_BYTES};
/// Pull-style reader over a stream of WAL export records.
///
/// The trait is usable as a trait object (`Box<dyn WalStreamReader>`).
pub trait WalStreamReader {
/// Advances to the next record and returns a borrow of its payload bytes.
///
/// `Ok(None)` signals that the stream ended cleanly. The returned slice
/// borrows from `self`, so it must be released before the next call.
///
/// # Errors
/// Returns a [`WalExportError`] when the record cannot be read; the exact
/// variants are chosen by the implementation.
fn next_record(&mut self) -> Result<Option<&[u8]>, WalExportError>;
/// Returns the reader's running position counter as a `u64`.
///
/// NOTE(review): `StreamingWalReader` counts bytes consumed from the start
/// of the stream, header included; confirm other implementors follow the
/// same convention.
fn cumulative_position(&self) -> u64;
}
/// [`WalStreamReader`] implementation over any [`Read`] source.
///
/// Constructed with `open_v1`, which consumes and validates the 8-byte stream
/// magic before any record is read.
#[derive(Debug)]
pub struct StreamingWalReader<R: Read> {
/// Underlying byte source the header and records are read from.
reader: R,
/// Scratch buffer holding the payload of the most recently read record;
/// resized and overwritten on every `next_record` call.
buffer: Vec<u8>,
/// Bytes consumed so far: starts at 8 (the magic header) and grows by the
/// length prefix and payload size of each record read.
cumulative_pos: u64,
/// Stream magic recognized from the header when the reader was opened.
magic: StreamMagic,
}
impl<R: Read> StreamingWalReader<R> {
    /// Opens a reader by consuming and validating the 8-byte stream magic.
    ///
    /// On success the reader is positioned at the first record and its
    /// cumulative position is 8 (the header that was just consumed).
    ///
    /// # Errors
    /// - `InvalidFraming(HeaderMissing)` when the source ends before 8 bytes
    ///   could be read.
    /// - `Io` for any other read failure.
    /// - `UnsupportedStreamVersion` (carrying the raw bytes) when
    ///   `StreamMagic::recognize` does not accept the header.
    pub fn open_v1(mut reader: R) -> Result<Self, WalExportError> {
        let mut header = [0u8; 8];
        reader
            .read_exact(&mut header)
            .map_err(|err| match err.kind() {
                // A short read means the header is absent or cut off.
                ErrorKind::UnexpectedEof => {
                    WalExportError::InvalidFraming(InvalidFramingReason::HeaderMissing)
                }
                _ => WalExportError::Io(err),
            })?;

        let Some(magic) = StreamMagic::recognize(&header) else {
            return Err(WalExportError::UnsupportedStreamVersion { magic: header });
        };

        Ok(Self {
            reader,
            buffer: Vec::new(),
            cumulative_pos: 8,
            magic,
        })
    }

    /// Returns the stream magic recognized when the reader was opened.
    #[must_use]
    pub fn magic(&self) -> StreamMagic {
        self.magic
    }
}
impl<R: Read> WalStreamReader for StreamingWalReader<R> {
fn next_record(&mut self) -> Result<Option<&[u8]>, WalExportError> {
let mut first_byte = [0u8; 1];
let n = self
.reader
.read(&mut first_byte)
.map_err(WalExportError::Io)?;
if n == 0 {
return Ok(None);
}
let mut rest = [0u8; 7];
self.reader.read_exact(&mut rest).map_err(|e| {
if e.kind() == ErrorKind::UnexpectedEof {
WalExportError::InvalidFraming(InvalidFramingReason::Truncated)
} else {
WalExportError::Io(e)
}
})?;
let mut len_bytes = [0u8; 8];
len_bytes[0] = first_byte[0];
len_bytes[1..].copy_from_slice(&rest);
let len = u64::from_be_bytes(len_bytes);
self.cumulative_pos += 8;
if len == 0 {
return Err(WalExportError::InvalidFraming(
InvalidFramingReason::LengthZero,
));
}
if len > MAX_RECORD_BYTES {
return Err(WalExportError::InvalidFraming(
InvalidFramingReason::LengthExceedsMax {
prefix: len,
max: MAX_RECORD_BYTES,
},
));
}
self.buffer.resize(len as usize, 0);
self.reader.read_exact(&mut self.buffer).map_err(|e| {
if e.kind() == ErrorKind::UnexpectedEof {
WalExportError::InvalidFraming(InvalidFramingReason::Truncated)
} else {
WalExportError::Io(e)
}
})?;
self.cumulative_pos += len;
Ok(Some(&self.buffer))
}
fn cumulative_position(&self) -> u64 {
self.cumulative_pos
}
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::super::buffered_sink::BufferedWalSink;
use super::super::{InvalidFramingReason, StreamMagic, WalExportError, WalRecordSink};
use super::*;
use std::io::Cursor;
/// Builds a synthetic record: the postcard encoding of `seq` followed by
/// `padding` zero bytes.
fn synth_record(seq: u64, padding: usize) -> Vec<u8> {
    let mut encoded = postcard::to_stdvec(&seq).unwrap();
    let padded_len = encoded.len() + padding;
    encoded.resize(padded_len, 0u8);
    encoded
}
/// Writes `n` synthetic records (sequence numbers `0..n`, 4 padding bytes
/// each) through a `BufferedWalSink` and returns the bytes it produced.
fn build_stream(n: u64) -> Vec<u8> {
    let mut sink = BufferedWalSink::new(Vec::<u8>::new());
    (0..n).for_each(|seq| {
        sink.append_record(&synth_record(seq, 4))
            .expect("append OK");
    });
    sink.flush().expect("flush OK");
    sink.into_writer_for_test()
}
/// Records written by the sink come back from the reader in order, and the
/// final position equals header plus three full frames.
#[test]
fn writer_to_reader_round_trip_three_records() {
    let stream = build_stream(3);
    let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
    assert_eq!(reader.magic(), StreamMagic::V1);

    // Drain the reader, decoding each payload before the borrow is released.
    let decoded_seqs: Vec<u64> = std::iter::from_fn(|| {
        reader
            .next_record()
            .expect("next OK")
            .map(|payload| postcard::from_bytes::<u64>(payload).expect("postcard OK"))
    })
    .collect();
    assert_eq!(decoded_seqs, vec![0, 1, 2]);

    // Each frame is an 8-byte length prefix plus the payload.
    let frame_len = 8 + synth_record(0, 4).len() as u64;
    let expected_end = 8u64 + 3 * frame_len;
    assert_eq!(reader.cumulative_position(), expected_end);
}
/// Opening a zero-byte source reports `HeaderMissing`.
#[test]
fn open_v1_empty_stream_rejected_with_header_missing() {
    let empty = Cursor::new(Vec::<u8>::new());
    let failure = StreamingWalReader::open_v1(empty).err();
    assert!(matches!(
        failure,
        Some(WalExportError::InvalidFraming(
            InvalidFramingReason::HeaderMissing
        ))
    ));
}
/// A header one byte short of the 8-byte magic reports `HeaderMissing`.
#[test]
fn open_v1_truncated_header_rejected_with_header_missing() {
    // Seven bytes only.
    let short_header: &[u8] = b"ARKHEXP";
    let failure = StreamingWalReader::open_v1(Cursor::new(short_header)).err();
    assert!(matches!(
        failure,
        Some(WalExportError::InvalidFraming(
            InvalidFramingReason::HeaderMissing
        ))
    ));
}
/// An unrecognized 8-byte magic is rejected and echoed back in the error.
#[test]
fn open_v1_unknown_magic_rejected_with_unsupported_version() {
    const UNKNOWN_MAGIC: &[u8; 8] = b"ARKHEXP9";
    match StreamingWalReader::open_v1(Cursor::new(&UNKNOWN_MAGIC[..])) {
        Err(WalExportError::UnsupportedStreamVersion { magic }) => {
            assert_eq!(&magic, UNKNOWN_MAGIC);
        }
        other => panic!("expected UnsupportedStreamVersion, got {other:?}"),
    }
}
/// The kernel WAL header magic is not accepted as an export stream magic.
#[test]
fn open_v1_l0_wal_magic_rejected_with_unsupported_version() {
    let l0_magic = arkhe_kernel::persist::WalHeader::MAGIC;
    let failure = StreamingWalReader::open_v1(Cursor::new(&l0_magic[..])).err();
    assert!(matches!(
        failure,
        Some(WalExportError::UnsupportedStreamVersion { .. })
    ));
}
/// A stream that holds only the magic header yields `Ok(None)` on the first
/// `next_record` call and leaves the position at the end of the header.
#[test]
fn next_record_after_header_only_returns_none_clean_eof() {
    // The header-only stream is built by hand so the assertions always run.
    // An earlier version first flushed an empty sink and returned early when
    // that produced any bytes, which let the test pass without checking
    // anything.
    let hdr_only = StreamMagic::V1.bytes().to_vec();
    let mut reader = StreamingWalReader::open_v1(Cursor::new(&hdr_only)).expect("open OK");
    assert!(matches!(reader.next_record(), Ok(None)));
    assert_eq!(reader.cumulative_position(), 8);
}
/// A zero length prefix is rejected with `LengthZero`.
#[test]
fn next_record_length_zero_rejected() {
    let mut stream = StreamMagic::V1.bytes().to_vec();
    // Length prefix of zero, no payload.
    stream.extend_from_slice(&0u64.to_be_bytes());

    let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
    let failure = reader.next_record().err();
    assert!(matches!(
        failure,
        Some(WalExportError::InvalidFraming(
            InvalidFramingReason::LengthZero
        ))
    ));
}
/// A length prefix one above `MAX_RECORD_BYTES` is rejected, and the error
/// carries both the offending prefix and the limit.
#[test]
fn next_record_length_exceeds_max_rejected_pre_alloc() {
    let oversize = MAX_RECORD_BYTES + 1;
    let mut stream = StreamMagic::V1.bytes().to_vec();
    stream.extend_from_slice(&oversize.to_be_bytes());

    let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
    match reader.next_record() {
        Err(WalExportError::InvalidFraming(InvalidFramingReason::LengthExceedsMax {
            prefix,
            max,
        })) => {
            assert_eq!(prefix, oversize);
            assert_eq!(max, MAX_RECORD_BYTES);
        }
        other => panic!("expected LengthExceedsMax, got {other:?}"),
    }
}
/// EOF after only part of the 8-byte length prefix reports `Truncated`.
#[test]
fn next_record_truncated_mid_length_prefix_rejected() {
    let mut stream = StreamMagic::V1.bytes().to_vec();
    // Only four of the eight prefix bytes are present.
    stream.extend_from_slice(&[0xAA, 0xBB, 0xCC, 0xDD]);

    let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
    let failure = reader.next_record().err();
    assert!(matches!(
        failure,
        Some(WalExportError::InvalidFraming(
            InvalidFramingReason::Truncated
        ))
    ));
}
/// EOF before the announced payload length is reached reports `Truncated`.
#[test]
fn next_record_truncated_mid_payload_rejected() {
    let mut stream = StreamMagic::V1.bytes().to_vec();
    // The prefix announces 10 payload bytes but only 6 follow.
    stream.extend_from_slice(&10u64.to_be_bytes());
    stream.extend_from_slice(&[0u8; 6]);

    let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
    let failure = reader.next_record().err();
    assert!(matches!(
        failure,
        Some(WalExportError::InvalidFraming(
            InvalidFramingReason::Truncated
        ))
    ));
}
/// The position starts at the header length, grows by one full frame per
/// record, and does not move on the final clean-EOF call.
#[test]
fn cumulative_position_tracks_per_record_advance() {
    let stream = build_stream(2);
    let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");

    let header = 8u64;
    let frame = 8u64 + synth_record(0, 4).len() as u64;
    assert_eq!(reader.cumulative_position(), header);

    for (frames_read, label) in (1u64..).zip(["rec0", "rec1"]) {
        let _ = reader.next_record().expect(label).expect("Some");
        assert_eq!(reader.cumulative_position(), header + frames_read * frame);
    }

    let at_end = reader.next_record();
    assert!(matches!(at_end, Ok(None)));
    assert_eq!(reader.cumulative_position(), header + 2 * frame);
}
/// The payload borrow from one `next_record` call can be released and a new
/// one taken on the next call inside a plain `while let` loop.
#[test]
fn next_record_borrow_refreshes_on_each_call() {
    let stream = build_stream(3);
    let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");

    let mut seqs: Vec<u64> = Vec::new();
    while let Some(payload) = reader.next_record().expect("next OK") {
        // Decode into an owned value so the borrow ends before the next call.
        seqs.push(postcard::from_bytes::<u64>(payload).expect("decode"));
    }

    assert_eq!(seqs, vec![0u64, 1, 2]);
}
/// `WalStreamReader` can be used through a boxed trait object.
#[test]
fn wal_stream_reader_trait_object_usable() {
    // The cursor owns the bytes so the reader is `'static` and can be boxed.
    let owned_cursor = Cursor::new(build_stream(1));
    let mut boxed: Box<dyn WalStreamReader> =
        Box::new(StreamingWalReader::open_v1(owned_cursor).expect("open OK"));

    let payload = boxed.next_record().expect("next").expect("Some");
    let seq = postcard::from_bytes::<u64>(payload).expect("decode");
    assert_eq!(seq, 0);
}
}