arkhe_forge_platform/wal_export/reader.rs
1//! `WalStreamReader` — production reader-side surface.
2//!
3//! Consumes the byte stream produced by [`super::BufferedWalSink`] (or
4//! any `[u64 BE length prefix][payload]`-framed sequence beginning
5//! with a recognised [`StreamMagic`] tag) and yields per-record
6//! payload borrows. The complement to the writer-side
7//! [`super::BufferedWalSink`].
8//!
9//! # Streaming-iterator pattern
10//!
11//! `WalStreamReader::next_record` returns `Option<&[u8]>` where the
12//! borrow is tied to `&mut self` — the slice is valid until the next
13//! call mutates the internal buffer. Callers decode the borrowed
14//! payload (typically via `postcard::from_bytes::<WalRecord>`) inside
15//! the loop iteration and must not retain the slice across iterations.
16//! This avoids the per-record allocation that an owned `Vec<u8>`
17//! return would force, at the cost of giving up `std::iter::Iterator`
18//! (whose lifetime contract precludes the borrow tie).
19//!
20//! # Fail-secure framing posture
21//!
22//! Five reject paths cover the framing surface:
23//!
24//! 1. **Unknown magic** at stream open → [`WalExportError::UnsupportedStreamVersion`].
25//! Stack-only 8-byte read, no heap alloc pre-rejection (fail-fast).
//! 2. **Truncated header** (fewer than 8 bytes) at open →
//!    [`InvalidFramingReason::HeaderMissing`] (the `UnexpectedEof` from the
//!    short header read is mapped to `HeaderMissing`, not `Truncated`).
28//! 3. **Length prefix exceeds bound** → [`InvalidFramingReason::LengthExceedsMax`]
29//! rejection BEFORE allocating the payload buffer (16 MiB
30//! fail-secure ceiling).
31//! 4. **Length prefix zero** → [`InvalidFramingReason::LengthZero`].
32//! 5. **Premature EOF mid-record** (length prefix complete but
33//! payload short) → [`InvalidFramingReason::Truncated`].
34//!
//! Clean EOF (zero bytes available at the start of the next record)
//! returns `Ok(None)`. A single-byte probe before committing to the full
//! 8-byte read distinguishes this from a torn length prefix, which is
//! rejected as `Truncated`.
38//!
39//! # Bridge to integration round-trip tests
40//!
41//! The sibling `round_trip_tests` module's `parse_stream` test helper
42//! duplicates the framing logic to keep that module independent of
43//! this reader API; this module hosts the production-grade equivalent
44//! for non-test callers.
45
46use std::io::{ErrorKind, Read};
47
48use super::{InvalidFramingReason, StreamMagic, WalExportError, MAX_RECORD_BYTES};
49
50/// Reader-side trait — `next_record` plus `cumulative_position`.
51///
52/// The `next_record` lifetime tie (`&mut self` borrow → `&[u8]`
53/// borrow on the return) makes this trait a *streaming iterator*
54/// rather than a `std::iter::Iterator`. Callers iterate via:
55///
56/// ```ignore
57/// while let Some(payload) = reader.next_record()? {
58/// // decode `payload` here; the slice expires next iteration
59/// }
60/// ```
61pub trait WalStreamReader {
62 /// Read the next record's payload bytes.
63 ///
64 /// Returns:
65 /// - `Ok(Some(payload))` — a borrow into the reader's internal
66 /// buffer, valid until the next call.
67 /// - `Ok(None)` — clean EOF, no more records.
68 /// - `Err(WalExportError::InvalidFraming(...))` — framing reject
69 /// per the five fail-secure paths documented above.
70 /// - `Err(WalExportError::Io(...))` — underlying I/O failure.
71 fn next_record(&mut self) -> Result<Option<&[u8]>, WalExportError>;
72
73 /// Cumulative byte count consumed from the underlying reader,
74 /// including the 8-byte stream header magic + each record's
75 /// `[8 byte length prefix][payload]` frame. Useful for forensic
76 /// "where did the parse fail" reporting and for operator metrics
77 /// (bytes-processed throughput).
78 fn cumulative_position(&self) -> u64;
79}
80
81/// Concrete `WalStreamReader` impl over an arbitrary `std::io::Read`.
82///
83/// Typical instantiations:
84/// - `StreamingWalReader<std::fs::File>` — on-disk archive reader
85/// - `StreamingWalReader<&[u8]>` — in-memory test fixture (the common
86/// case in this module's own test suite)
87#[derive(Debug)]
88pub struct StreamingWalReader<R: Read> {
89 reader: R,
90 /// Internal buffer reused across `next_record` calls. Sized up
91 /// to fit each record's payload; capacity grows monotonically
92 /// (no shrink) so steady-state throughput avoids reallocation
93 /// once the largest record so far has been seen.
94 buffer: Vec<u8>,
95 /// Cumulative bytes consumed from `reader`.
96 cumulative_pos: u64,
97 /// The recognised stream magic (always `V1` currently; future
98 /// versions add variants without breaking the reader's frame
99 /// loop, which is version-agnostic at the framing layer).
100 magic: StreamMagic,
101}
102
103impl<R: Read> StreamingWalReader<R> {
104 /// Open a reader by consuming + dispatching the 8-byte stream
105 /// header magic.
106 ///
107 /// **Fail-fast posture**: the 8-byte header is read into a stack
108 /// array; an unrecognised magic yields
109 /// [`WalExportError::UnsupportedStreamVersion`] with no heap
110 /// allocation pre-rejection. A truncated header (fewer than 8
111 /// bytes) yields [`InvalidFramingReason::HeaderMissing`].
112 pub fn open_v1(mut reader: R) -> Result<Self, WalExportError> {
113 let mut magic_bytes = [0u8; 8];
114 match reader.read_exact(&mut magic_bytes) {
115 Ok(()) => {}
116 Err(e) if e.kind() == ErrorKind::UnexpectedEof => {
117 return Err(WalExportError::InvalidFraming(
118 InvalidFramingReason::HeaderMissing,
119 ));
120 }
121 Err(e) => return Err(WalExportError::Io(e)),
122 }
123 let magic = StreamMagic::recognize(&magic_bytes)
124 .ok_or(WalExportError::UnsupportedStreamVersion { magic: magic_bytes })?;
125 Ok(Self {
126 reader,
127 buffer: Vec::new(),
128 cumulative_pos: 8,
129 magic,
130 })
131 }
132
133 /// Recognised stream magic version. Currently always
134 /// [`StreamMagic::V1`].
135 #[must_use]
136 pub fn magic(&self) -> StreamMagic {
137 self.magic
138 }
139}
140
141impl<R: Read> WalStreamReader for StreamingWalReader<R> {
142 fn next_record(&mut self) -> Result<Option<&[u8]>, WalExportError> {
143 // Step 1: probe one byte to distinguish clean EOF (Ok(0))
144 // from a torn length prefix (UnexpectedEof on the remaining
145 // 7 bytes). `read_exact` would conflate the two.
146 let mut first_byte = [0u8; 1];
147 let n = self
148 .reader
149 .read(&mut first_byte)
150 .map_err(WalExportError::Io)?;
151 if n == 0 {
152 return Ok(None);
153 }
154
155 // Step 2: read the remaining 7 bytes of the length prefix.
156 // A short read here = torn frame = `Truncated`.
157 let mut rest = [0u8; 7];
158 self.reader.read_exact(&mut rest).map_err(|e| {
159 if e.kind() == ErrorKind::UnexpectedEof {
160 WalExportError::InvalidFraming(InvalidFramingReason::Truncated)
161 } else {
162 WalExportError::Io(e)
163 }
164 })?;
165
166 // Step 3: assemble + validate the length.
167 let mut len_bytes = [0u8; 8];
168 len_bytes[0] = first_byte[0];
169 len_bytes[1..].copy_from_slice(&rest);
170 let len = u64::from_be_bytes(len_bytes);
171 self.cumulative_pos += 8;
172
173 if len == 0 {
174 return Err(WalExportError::InvalidFraming(
175 InvalidFramingReason::LengthZero,
176 ));
177 }
178 if len > MAX_RECORD_BYTES {
179 return Err(WalExportError::InvalidFraming(
180 InvalidFramingReason::LengthExceedsMax {
181 prefix: len,
182 max: MAX_RECORD_BYTES,
183 },
184 ));
185 }
186
187 // Step 4: read the payload BEFORE returning a borrow.
188 // `len` already validated above so the `as usize` is safe on
189 // any 64-bit (or smaller) platform: 16 MiB fits in usize on
190 // every supported target.
191 self.buffer.resize(len as usize, 0);
192 self.reader.read_exact(&mut self.buffer).map_err(|e| {
193 if e.kind() == ErrorKind::UnexpectedEof {
194 WalExportError::InvalidFraming(InvalidFramingReason::Truncated)
195 } else {
196 WalExportError::Io(e)
197 }
198 })?;
199 self.cumulative_pos += len;
200
201 Ok(Some(&self.buffer))
202 }
203
204 fn cumulative_position(&self) -> u64 {
205 self.cumulative_pos
206 }
207}
208
209#[cfg(test)]
210#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
211mod tests {
212 use super::super::buffered_sink::BufferedWalSink;
213 use super::super::{InvalidFramingReason, StreamMagic, WalExportError, WalRecordSink};
214 use super::*;
215 use std::io::Cursor;
216
217 /// Build a synthetic record: postcard-encoded `seq: u64` followed
218 /// by `padding` zero bytes. Mirrors the `synth_record` helper in
219 /// `buffered_sink.rs` tests.
220 fn synth_record(seq: u64, padding: usize) -> Vec<u8> {
221 let mut bytes = postcard::to_stdvec(&seq).unwrap();
222 bytes.extend(std::iter::repeat_n(0u8, padding));
223 bytes
224 }
225
226 /// Encode `n` records via `BufferedWalSink<Vec<u8>>` and return the
227 /// flushed byte stream.
228 fn build_stream(n: u64) -> Vec<u8> {
229 let mut sink = BufferedWalSink::new(Vec::<u8>::new());
230 for i in 0..n {
231 let rec = synth_record(i, 4);
232 sink.append_record(&rec).expect("append OK");
233 }
234 sink.flush().expect("flush OK");
235 sink.into_writer_for_test()
236 }
237
238 /// Round-trip: writer-emitted stream parses back to the original
239 /// record sequence via `StreamingWalReader::open_v1` +
240 /// `next_record`.
241 #[test]
242 fn writer_to_reader_round_trip_three_records() {
243 let stream = build_stream(3);
244 let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
245 assert_eq!(reader.magic(), StreamMagic::V1);
246
247 let mut decoded_seqs = Vec::new();
248 while let Some(payload) = reader.next_record().expect("next OK") {
249 let seq: u64 = postcard::from_bytes(payload).expect("postcard OK");
250 decoded_seqs.push(seq);
251 }
252 assert_eq!(decoded_seqs, vec![0, 1, 2]);
253 // Header + 3 frames each = (8 + N) bytes.
254 let header_len = 8u64;
255 let per_record_len = 8 + synth_record(0, 4).len() as u64;
256 assert_eq!(
257 reader.cumulative_position(),
258 header_len + 3 * per_record_len
259 );
260 }
261
262 /// `open_v1` on an empty stream returns `HeaderMissing` —
263 /// 0-byte input cannot satisfy the 8-byte magic read.
264 #[test]
265 fn open_v1_empty_stream_rejected_with_header_missing() {
266 let result = StreamingWalReader::open_v1(Cursor::new(Vec::<u8>::new()));
267 assert!(matches!(
268 result,
269 Err(WalExportError::InvalidFraming(
270 InvalidFramingReason::HeaderMissing
271 ))
272 ));
273 }
274
275 /// `open_v1` on a 7-byte truncated header returns `HeaderMissing`.
276 #[test]
277 fn open_v1_truncated_header_rejected_with_header_missing() {
278 let truncated: &[u8] = b"ARKHEXP"; // 7 bytes, missing trailing '1'
279 let result = StreamingWalReader::open_v1(Cursor::new(truncated));
280 assert!(matches!(
281 result,
282 Err(WalExportError::InvalidFraming(
283 InvalidFramingReason::HeaderMissing
284 ))
285 ));
286 }
287
288 /// `open_v1` on an unrecognised magic returns
289 /// `UnsupportedStreamVersion` with the offending bytes carried.
290 #[test]
291 fn open_v1_unknown_magic_rejected_with_unsupported_version() {
292 let unknown: &[u8] = b"ARKHEXP9"; // unrecognised version tag
293 let result = StreamingWalReader::open_v1(Cursor::new(unknown));
294 match result {
295 Err(WalExportError::UnsupportedStreamVersion { magic }) => {
296 assert_eq!(&magic, b"ARKHEXP9");
297 }
298 other => panic!("expected UnsupportedStreamVersion, got {other:?}"),
299 }
300 }
301
302 /// L0 WAL magic (`ARKHEWAL`) misfed to the stream reader is
303 /// rejected as `UnsupportedStreamVersion` (transport mismatch
304 /// defence).
305 #[test]
306 fn open_v1_l0_wal_magic_rejected_with_unsupported_version() {
307 let l0_magic = arkhe_kernel::persist::WalHeader::MAGIC;
308 let result = StreamingWalReader::open_v1(Cursor::new(&l0_magic[..]));
309 assert!(matches!(
310 result,
311 Err(WalExportError::UnsupportedStreamVersion { .. })
312 ));
313 }
314
315 /// `next_record` on a stream with header but zero records returns
316 /// `Ok(None)` — clean EOF.
317 #[test]
318 fn next_record_after_header_only_returns_none_clean_eof() {
319 let mut sink = BufferedWalSink::new(Vec::<u8>::new());
320 // No appends; flush also a no-op since header is emitted only on
321 // first append. So zero-record case = stream is just empty bytes.
322 sink.flush().expect("flush OK");
323 let stream = sink.into_writer_for_test();
324 // Without any records, the stream has 0 bytes (header is
325 // emitted only on first append). A reader cannot open it.
326 // Build a *header-only* stream by appending and then truncating
327 // back to the magic bytes.
328 if !stream.is_empty() {
329 // Defensive: should be empty in current sink semantics.
330 return;
331 }
332 // Construct header-only stream manually.
333 let mut hdr_only = Vec::new();
334 hdr_only.extend_from_slice(StreamMagic::V1.bytes());
335 let mut reader = StreamingWalReader::open_v1(Cursor::new(&hdr_only)).expect("open OK");
336 assert!(matches!(reader.next_record(), Ok(None)));
337 assert_eq!(reader.cumulative_position(), 8);
338 }
339
340 /// Length prefix zero rejected with `LengthZero` (mid-stream).
341 #[test]
342 fn next_record_length_zero_rejected() {
343 let mut stream = Vec::new();
344 stream.extend_from_slice(StreamMagic::V1.bytes());
345 stream.extend_from_slice(&0u64.to_be_bytes()); // explicit length-zero frame
346 let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
347 assert!(matches!(
348 reader.next_record(),
349 Err(WalExportError::InvalidFraming(
350 InvalidFramingReason::LengthZero
351 ))
352 ));
353 }
354
355 /// Length prefix exceeding `MAX_RECORD_BYTES` rejected with
356 /// `LengthExceedsMax` (no payload alloc attempted — the bound
357 /// check fires before `Vec::resize`).
358 #[test]
359 fn next_record_length_exceeds_max_rejected_pre_alloc() {
360 let mut stream = Vec::new();
361 stream.extend_from_slice(StreamMagic::V1.bytes());
362 // 17 MiB length prefix — exceeds 16 MiB ceiling.
363 let oversize = MAX_RECORD_BYTES + 1;
364 stream.extend_from_slice(&oversize.to_be_bytes());
365 // Note: NO payload bytes follow — the reader must reject on
366 // length prefix alone, never reach `read_exact(&mut buffer)`.
367 let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
368 match reader.next_record() {
369 Err(WalExportError::InvalidFraming(InvalidFramingReason::LengthExceedsMax {
370 prefix,
371 max,
372 })) => {
373 assert_eq!(prefix, oversize);
374 assert_eq!(max, MAX_RECORD_BYTES);
375 }
376 other => panic!("expected LengthExceedsMax, got {other:?}"),
377 }
378 }
379
380 /// Truncated mid-length-prefix (4 of 8 bytes available) rejected
381 /// with `Truncated`.
382 #[test]
383 fn next_record_truncated_mid_length_prefix_rejected() {
384 let mut stream = Vec::new();
385 stream.extend_from_slice(StreamMagic::V1.bytes());
386 stream.extend_from_slice(&[0xAA, 0xBB, 0xCC, 0xDD]); // only 4 of 8 length bytes
387 let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
388 assert!(matches!(
389 reader.next_record(),
390 Err(WalExportError::InvalidFraming(
391 InvalidFramingReason::Truncated
392 ))
393 ));
394 }
395
396 /// Truncated mid-payload (length says N bytes but only N-3
397 /// available) rejected with `Truncated`.
398 #[test]
399 fn next_record_truncated_mid_payload_rejected() {
400 let mut stream = Vec::new();
401 stream.extend_from_slice(StreamMagic::V1.bytes());
402 // Length prefix = 10 bytes; supply only 6.
403 stream.extend_from_slice(&10u64.to_be_bytes());
404 stream.extend_from_slice(&[0u8; 6]);
405 let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
406 assert!(matches!(
407 reader.next_record(),
408 Err(WalExportError::InvalidFraming(
409 InvalidFramingReason::Truncated
410 ))
411 ));
412 }
413
414 /// Reader resumes correctly across multiple successful records,
415 /// and `cumulative_position` tracks total bytes consumed.
416 #[test]
417 fn cumulative_position_tracks_per_record_advance() {
418 let stream = build_stream(2);
419 let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
420 let header = 8u64;
421 let frame = 8u64 + synth_record(0, 4).len() as u64;
422 assert_eq!(reader.cumulative_position(), header);
423
424 let _ = reader.next_record().expect("rec0").expect("Some");
425 assert_eq!(reader.cumulative_position(), header + frame);
426
427 let _ = reader.next_record().expect("rec1").expect("Some");
428 assert_eq!(reader.cumulative_position(), header + 2 * frame);
429
430 assert!(matches!(reader.next_record(), Ok(None)));
431 assert_eq!(reader.cumulative_position(), header + 2 * frame);
432 }
433
434 /// Multiple sequential `next_record` calls correctly invalidate
435 /// the previous borrow: each call returns a fresh slice into the
436 /// reused internal buffer. (Compile-fail behaviour around
437 /// retaining an old borrow across a call is the point of the
438 /// streaming-iterator pattern; this test exercises the runtime
439 /// behaviour.)
440 #[test]
441 fn next_record_borrow_refreshes_on_each_call() {
442 let stream = build_stream(3);
443 let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
444
445 // Iterate; each call replaces the buffer contents with the new
446 // record's payload. We capture decoded values into owned data
447 // (Vec<u64>) to release the borrow before the next call.
448 let mut seqs = Vec::new();
449 while let Some(payload) = reader.next_record().expect("next OK") {
450 let s: u64 = postcard::from_bytes(payload).expect("decode");
451 seqs.push(s);
452 }
453 assert_eq!(seqs, vec![0u64, 1, 2]);
454 }
455
456 /// `WalStreamReader` dyn-trait object usability check (Send +
457 /// concrete-type erasure for higher-order callers).
458 #[test]
459 fn wal_stream_reader_trait_object_usable() {
460 let stream = build_stream(1);
461 let cursor = Cursor::new(stream);
462 let reader = StreamingWalReader::open_v1(cursor).expect("open OK");
463 let mut boxed: Box<dyn WalStreamReader> = Box::new(reader);
464 let payload = boxed.next_record().expect("next").expect("Some");
465 let seq: u64 = postcard::from_bytes(payload).expect("decode");
466 assert_eq!(seq, 0);
467 }
468}