Skip to main content

arkhe_forge_platform/wal_export/
reader.rs

1//! `WalStreamReader` — production reader-side surface.
2//!
3//! Consumes the byte stream produced by [`super::BufferedWalSink`] (or
4//! any `[u64 BE length prefix][payload]`-framed sequence beginning
5//! with a recognised [`StreamMagic`] tag) and yields per-record
6//! payload borrows. The complement to the writer-side
7//! [`super::BufferedWalSink`].
8//!
9//! # Streaming-iterator pattern
10//!
11//! `WalStreamReader::next_record` returns `Option<&[u8]>` where the
12//! borrow is tied to `&mut self` — the slice is valid until the next
13//! call mutates the internal buffer. Callers decode the borrowed
14//! payload (typically via `postcard::from_bytes::<WalRecord>`) inside
15//! the loop iteration and must not retain the slice across iterations.
16//! This avoids the per-record allocation that an owned `Vec<u8>`
17//! return would force, at the cost of giving up `std::iter::Iterator`
18//! (whose lifetime contract precludes the borrow tie).
19//!
20//! # Fail-secure framing posture
21//!
22//! Five reject paths cover the framing surface:
23//!
24//! 1. **Unknown magic** at stream open → [`WalExportError::UnsupportedStreamVersion`].
25//!    Stack-only 8-byte read, no heap alloc pre-rejection (fail-fast).
//! 2. **Truncated header** (fewer than 8 bytes) at open →
//!    [`InvalidFramingReason::HeaderMissing`] (the `UnexpectedEof` raised
//!    by the 8-byte magic read is mapped to it).
28//! 3. **Length prefix exceeds bound** → [`InvalidFramingReason::LengthExceedsMax`]
29//!    rejection BEFORE allocating the payload buffer (16 MiB
30//!    fail-secure ceiling).
31//! 4. **Length prefix zero** → [`InvalidFramingReason::LengthZero`].
//! 5. **Premature EOF mid-record** (length prefix torn, or length
//!    prefix complete but payload short) →
//!    [`InvalidFramingReason::Truncated`].
34//!
35//! Clean EOF (zero bytes available at the start of the next record)
36//! returns `Ok(None)` — distinguishable from torn headers via a
37//! single-byte probe before committing to the full 8-byte read.
38//!
39//! # Bridge to integration round-trip tests
40//!
41//! The sibling `round_trip_tests` module's `parse_stream` test helper
42//! duplicates the framing logic to keep that module independent of
43//! this reader API; this module hosts the production-grade equivalent
44//! for non-test callers.
45
46use std::io::{ErrorKind, Read};
47
48use super::{InvalidFramingReason, StreamMagic, WalExportError, MAX_RECORD_BYTES};
49
50/// Reader-side trait — `next_record` plus `cumulative_position`.
51///
52/// The `next_record` lifetime tie (`&mut self` borrow → `&[u8]`
53/// borrow on the return) makes this trait a *streaming iterator*
54/// rather than a `std::iter::Iterator`. Callers iterate via:
55///
56/// ```ignore
57/// while let Some(payload) = reader.next_record()? {
58///     // decode `payload` here; the slice expires next iteration
59/// }
60/// ```
61pub trait WalStreamReader {
62    /// Read the next record's payload bytes.
63    ///
64    /// Returns:
65    /// - `Ok(Some(payload))` — a borrow into the reader's internal
66    ///   buffer, valid until the next call.
67    /// - `Ok(None)` — clean EOF, no more records.
68    /// - `Err(WalExportError::InvalidFraming(...))` — framing reject
69    ///   per the five fail-secure paths documented above.
70    /// - `Err(WalExportError::Io(...))` — underlying I/O failure.
71    fn next_record(&mut self) -> Result<Option<&[u8]>, WalExportError>;
72
73    /// Cumulative byte count consumed from the underlying reader,
74    /// including the 8-byte stream header magic + each record's
75    /// `[8 byte length prefix][payload]` frame. Useful for forensic
76    /// "where did the parse fail" reporting and for operator metrics
77    /// (bytes-processed throughput).
78    fn cumulative_position(&self) -> u64;
79}
80
81/// Concrete `WalStreamReader` impl over an arbitrary `std::io::Read`.
82///
83/// Typical instantiations:
84/// - `StreamingWalReader<std::fs::File>` — on-disk archive reader
85/// - `StreamingWalReader<&[u8]>` — in-memory test fixture (the common
86///   case in this module's own test suite)
87#[derive(Debug)]
88pub struct StreamingWalReader<R: Read> {
89    reader: R,
90    /// Internal buffer reused across `next_record` calls. Sized up
91    /// to fit each record's payload; capacity grows monotonically
92    /// (no shrink) so steady-state throughput avoids reallocation
93    /// once the largest record so far has been seen.
94    buffer: Vec<u8>,
95    /// Cumulative bytes consumed from `reader`.
96    cumulative_pos: u64,
97    /// The recognised stream magic (always `V1` currently; future
98    /// versions add variants without breaking the reader's frame
99    /// loop, which is version-agnostic at the framing layer).
100    magic: StreamMagic,
101}
102
103impl<R: Read> StreamingWalReader<R> {
104    /// Open a reader by consuming + dispatching the 8-byte stream
105    /// header magic.
106    ///
107    /// **Fail-fast posture**: the 8-byte header is read into a stack
108    /// array; an unrecognised magic yields
109    /// [`WalExportError::UnsupportedStreamVersion`] with no heap
110    /// allocation pre-rejection. A truncated header (fewer than 8
111    /// bytes) yields [`InvalidFramingReason::HeaderMissing`].
112    pub fn open_v1(mut reader: R) -> Result<Self, WalExportError> {
113        let mut magic_bytes = [0u8; 8];
114        match reader.read_exact(&mut magic_bytes) {
115            Ok(()) => {}
116            Err(e) if e.kind() == ErrorKind::UnexpectedEof => {
117                return Err(WalExportError::InvalidFraming(
118                    InvalidFramingReason::HeaderMissing,
119                ));
120            }
121            Err(e) => return Err(WalExportError::Io(e)),
122        }
123        let magic = StreamMagic::recognize(&magic_bytes)
124            .ok_or(WalExportError::UnsupportedStreamVersion { magic: magic_bytes })?;
125        Ok(Self {
126            reader,
127            buffer: Vec::new(),
128            cumulative_pos: 8,
129            magic,
130        })
131    }
132
133    /// Recognised stream magic version. Currently always
134    /// [`StreamMagic::V1`].
135    #[must_use]
136    pub fn magic(&self) -> StreamMagic {
137        self.magic
138    }
139}
140
141impl<R: Read> WalStreamReader for StreamingWalReader<R> {
142    fn next_record(&mut self) -> Result<Option<&[u8]>, WalExportError> {
143        // Step 1: probe one byte to distinguish clean EOF (Ok(0))
144        // from a torn length prefix (UnexpectedEof on the remaining
145        // 7 bytes). `read_exact` would conflate the two.
146        let mut first_byte = [0u8; 1];
147        let n = self
148            .reader
149            .read(&mut first_byte)
150            .map_err(WalExportError::Io)?;
151        if n == 0 {
152            return Ok(None);
153        }
154
155        // Step 2: read the remaining 7 bytes of the length prefix.
156        // A short read here = torn frame = `Truncated`.
157        let mut rest = [0u8; 7];
158        self.reader.read_exact(&mut rest).map_err(|e| {
159            if e.kind() == ErrorKind::UnexpectedEof {
160                WalExportError::InvalidFraming(InvalidFramingReason::Truncated)
161            } else {
162                WalExportError::Io(e)
163            }
164        })?;
165
166        // Step 3: assemble + validate the length.
167        let mut len_bytes = [0u8; 8];
168        len_bytes[0] = first_byte[0];
169        len_bytes[1..].copy_from_slice(&rest);
170        let len = u64::from_be_bytes(len_bytes);
171        self.cumulative_pos += 8;
172
173        if len == 0 {
174            return Err(WalExportError::InvalidFraming(
175                InvalidFramingReason::LengthZero,
176            ));
177        }
178        if len > MAX_RECORD_BYTES {
179            return Err(WalExportError::InvalidFraming(
180                InvalidFramingReason::LengthExceedsMax {
181                    prefix: len,
182                    max: MAX_RECORD_BYTES,
183                },
184            ));
185        }
186
187        // Step 4: read the payload BEFORE returning a borrow.
188        // `len` already validated above so the `as usize` is safe on
189        // any 64-bit (or smaller) platform: 16 MiB fits in usize on
190        // every supported target.
191        self.buffer.resize(len as usize, 0);
192        self.reader.read_exact(&mut self.buffer).map_err(|e| {
193            if e.kind() == ErrorKind::UnexpectedEof {
194                WalExportError::InvalidFraming(InvalidFramingReason::Truncated)
195            } else {
196                WalExportError::Io(e)
197            }
198        })?;
199        self.cumulative_pos += len;
200
201        Ok(Some(&self.buffer))
202    }
203
204    fn cumulative_position(&self) -> u64 {
205        self.cumulative_pos
206    }
207}
208
#[cfg(test)]
// Test code: panicking on an unexpected result is the intended failure mode.
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::super::buffered_sink::BufferedWalSink;
    use super::super::{InvalidFramingReason, StreamMagic, WalExportError, WalRecordSink};
    use super::*;
    use std::io::Cursor;

    /// Build a synthetic record: postcard-encoded `seq: u64` followed
    /// by `padding` zero bytes. Mirrors the `synth_record` helper in
    /// `buffered_sink.rs` tests.
    fn synth_record(seq: u64, padding: usize) -> Vec<u8> {
        let mut bytes = postcard::to_stdvec(&seq).unwrap();
        bytes.extend(std::iter::repeat_n(0u8, padding));
        bytes
    }

    /// Encode `n` records via `BufferedWalSink<Vec<u8>>` and return the
    /// flushed byte stream.
    fn build_stream(n: u64) -> Vec<u8> {
        let mut sink = BufferedWalSink::new(Vec::<u8>::new());
        for i in 0..n {
            let rec = synth_record(i, 4);
            sink.append_record(&rec).expect("append OK");
        }
        sink.flush().expect("flush OK");
        sink.into_writer_for_test()
    }

    /// Round-trip: writer-emitted stream parses back to the original
    /// record sequence via `StreamingWalReader::open_v1` +
    /// `next_record`.
    #[test]
    fn writer_to_reader_round_trip_three_records() {
        let stream = build_stream(3);
        let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
        assert_eq!(reader.magic(), StreamMagic::V1);

        let mut decoded_seqs = Vec::new();
        while let Some(payload) = reader.next_record().expect("next OK") {
            let seq: u64 = postcard::from_bytes(payload).expect("postcard OK");
            decoded_seqs.push(seq);
        }
        assert_eq!(decoded_seqs, vec![0, 1, 2]);
        // Header + 3 frames, each frame = (8 + N) bytes.
        let header_len = 8u64;
        let per_record_len = 8 + synth_record(0, 4).len() as u64;
        assert_eq!(
            reader.cumulative_position(),
            header_len + 3 * per_record_len
        );
    }

    /// `open_v1` on an empty stream returns `HeaderMissing` —
    /// 0-byte input cannot satisfy the 8-byte magic read.
    #[test]
    fn open_v1_empty_stream_rejected_with_header_missing() {
        let result = StreamingWalReader::open_v1(Cursor::new(Vec::<u8>::new()));
        assert!(matches!(
            result,
            Err(WalExportError::InvalidFraming(
                InvalidFramingReason::HeaderMissing
            ))
        ));
    }

    /// `open_v1` on a 7-byte truncated header returns `HeaderMissing`.
    #[test]
    fn open_v1_truncated_header_rejected_with_header_missing() {
        let truncated: &[u8] = b"ARKHEXP"; // 7 bytes, missing trailing '1'
        let result = StreamingWalReader::open_v1(Cursor::new(truncated));
        assert!(matches!(
            result,
            Err(WalExportError::InvalidFraming(
                InvalidFramingReason::HeaderMissing
            ))
        ));
    }

    /// `open_v1` on an unrecognised magic returns
    /// `UnsupportedStreamVersion` with the offending bytes carried.
    #[test]
    fn open_v1_unknown_magic_rejected_with_unsupported_version() {
        let unknown: &[u8] = b"ARKHEXP9"; // unrecognised version tag
        let result = StreamingWalReader::open_v1(Cursor::new(unknown));
        match result {
            Err(WalExportError::UnsupportedStreamVersion { magic }) => {
                assert_eq!(&magic, b"ARKHEXP9");
            }
            other => panic!("expected UnsupportedStreamVersion, got {other:?}"),
        }
    }

    /// L0 WAL magic (`ARKHEWAL`) misfed to the stream reader is
    /// rejected as `UnsupportedStreamVersion` (transport mismatch
    /// defence).
    #[test]
    fn open_v1_l0_wal_magic_rejected_with_unsupported_version() {
        let l0_magic = arkhe_kernel::persist::WalHeader::MAGIC;
        let result = StreamingWalReader::open_v1(Cursor::new(&l0_magic[..]));
        assert!(matches!(
            result,
            Err(WalExportError::UnsupportedStreamVersion { .. })
        ));
    }

    /// `next_record` on a stream holding only the header magic
    /// returns `Ok(None)` — clean EOF — and leaves the position at 8.
    #[test]
    fn next_record_after_header_only_returns_none_clean_eof() {
        // Built by hand rather than through `BufferedWalSink`: the sink
        // reportedly emits the header only on the first append, so it
        // cannot produce a header-only stream. Constructing the bytes
        // directly also guarantees the assertions below always run,
        // whatever the sink's flush semantics are.
        let mut hdr_only = Vec::new();
        hdr_only.extend_from_slice(StreamMagic::V1.bytes());
        let mut reader = StreamingWalReader::open_v1(Cursor::new(&hdr_only)).expect("open OK");
        assert!(matches!(reader.next_record(), Ok(None)));
        assert_eq!(reader.cumulative_position(), 8);
    }

    /// Length prefix zero rejected with `LengthZero` (mid-stream).
    #[test]
    fn next_record_length_zero_rejected() {
        let mut stream = Vec::new();
        stream.extend_from_slice(StreamMagic::V1.bytes());
        stream.extend_from_slice(&0u64.to_be_bytes()); // explicit length-zero frame
        let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
        assert!(matches!(
            reader.next_record(),
            Err(WalExportError::InvalidFraming(
                InvalidFramingReason::LengthZero
            ))
        ));
    }

    /// Length prefix exceeding `MAX_RECORD_BYTES` rejected with
    /// `LengthExceedsMax` (no payload alloc attempted — the bound
    /// check fires before `Vec::resize`).
    #[test]
    fn next_record_length_exceeds_max_rejected_pre_alloc() {
        let mut stream = Vec::new();
        stream.extend_from_slice(StreamMagic::V1.bytes());
        // Smallest rejected value: exactly one byte over the ceiling.
        let oversize = MAX_RECORD_BYTES + 1;
        stream.extend_from_slice(&oversize.to_be_bytes());
        // Note: NO payload bytes follow — the reader must reject on
        // length prefix alone, never reach `read_exact(&mut buffer)`.
        let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
        match reader.next_record() {
            Err(WalExportError::InvalidFraming(InvalidFramingReason::LengthExceedsMax {
                prefix,
                max,
            })) => {
                assert_eq!(prefix, oversize);
                assert_eq!(max, MAX_RECORD_BYTES);
            }
            other => panic!("expected LengthExceedsMax, got {other:?}"),
        }
    }

    /// Truncated mid-length-prefix (4 of 8 bytes available) rejected
    /// with `Truncated`.
    #[test]
    fn next_record_truncated_mid_length_prefix_rejected() {
        let mut stream = Vec::new();
        stream.extend_from_slice(StreamMagic::V1.bytes());
        stream.extend_from_slice(&[0xAA, 0xBB, 0xCC, 0xDD]); // only 4 of 8 length bytes
        let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
        assert!(matches!(
            reader.next_record(),
            Err(WalExportError::InvalidFraming(
                InvalidFramingReason::Truncated
            ))
        ));
    }

    /// Truncated mid-payload (length prefix says 10 bytes but only 6
    /// are available) rejected with `Truncated`.
    #[test]
    fn next_record_truncated_mid_payload_rejected() {
        let mut stream = Vec::new();
        stream.extend_from_slice(StreamMagic::V1.bytes());
        // Length prefix = 10 bytes; supply only 6.
        stream.extend_from_slice(&10u64.to_be_bytes());
        stream.extend_from_slice(&[0u8; 6]);
        let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
        assert!(matches!(
            reader.next_record(),
            Err(WalExportError::InvalidFraming(
                InvalidFramingReason::Truncated
            ))
        ));
    }

    /// Reader resumes correctly across multiple successful records,
    /// and `cumulative_position` tracks total bytes consumed.
    #[test]
    fn cumulative_position_tracks_per_record_advance() {
        let stream = build_stream(2);
        let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");
        let header = 8u64;
        let frame = 8u64 + synth_record(0, 4).len() as u64;
        assert_eq!(reader.cumulative_position(), header);

        let _ = reader.next_record().expect("rec0").expect("Some");
        assert_eq!(reader.cumulative_position(), header + frame);

        let _ = reader.next_record().expect("rec1").expect("Some");
        assert_eq!(reader.cumulative_position(), header + 2 * frame);

        // Clean EOF must not move the position.
        assert!(matches!(reader.next_record(), Ok(None)));
        assert_eq!(reader.cumulative_position(), header + 2 * frame);
    }

    /// Multiple sequential `next_record` calls correctly invalidate
    /// the previous borrow: each call returns a fresh slice into the
    /// reused internal buffer. (Compile-fail behaviour around
    /// retaining an old borrow across a call is the point of the
    /// streaming-iterator pattern; this test exercises the runtime
    /// behaviour by checking every payload byte-for-byte, so stale
    /// bytes left over from a previous record would be detected.)
    #[test]
    fn next_record_borrow_refreshes_on_each_call() {
        let stream = build_stream(3);
        let mut reader = StreamingWalReader::open_v1(Cursor::new(&stream)).expect("open OK");

        // Iterate; each call replaces the buffer contents with the new
        // record's payload. We capture decoded values into owned data
        // (Vec<u64>) to release the borrow before the next call.
        let mut seqs = Vec::new();
        let mut expected_seq = 0u64;
        while let Some(payload) = reader.next_record().expect("next OK") {
            let expected = synth_record(expected_seq, 4);
            assert_eq!(payload, expected.as_slice());
            let s: u64 = postcard::from_bytes(payload).expect("decode");
            seqs.push(s);
            expected_seq += 1;
        }
        assert_eq!(seqs, vec![0u64, 1, 2]);
    }

    /// `WalStreamReader` dyn-trait object usability check
    /// (concrete-type erasure for higher-order callers; no `Send`
    /// bound is asserted here).
    #[test]
    fn wal_stream_reader_trait_object_usable() {
        let stream = build_stream(1);
        let cursor = Cursor::new(stream);
        let reader = StreamingWalReader::open_v1(cursor).expect("open OK");
        let mut boxed: Box<dyn WalStreamReader> = Box::new(reader);
        let payload = boxed.next_record().expect("next").expect("Some");
        let seq: u64 = postcard::from_bytes(payload).expect("decode");
        assert_eq!(seq, 0);
    }
}