Skip to main content

arkhe_forge_platform/wal_export/
mod.rs

1//! WAL streaming export — incremental record append for snapshot / backup.
2//!
3//! # Three firm requirements
4//!
5//! 1. **Fixed-width `u64` big-endian length prefix** before every record
6//!    in the stream — deterministic + cross-platform; varint routes
7//!    through a separate determinism verify path.
8//! 2. **Bounds-check on the length prefix** before any deref of record
9//!    bytes — `0 < prefix ≤ `[`MAX_RECORD_BYTES`]. Malformed prefixes
10//!    MUST be rejected with
11//!    [`WalExportError::InvalidFraming`]`(`[`InvalidFramingReason`]`)`.
12//!    Pattern mirrors the host-fn `(ptr, len)` bounds-check contract.
13//! 3. **Stream header magic** [`STREAM_HEADER_MAGIC`] (`ARKHEXP1`)
14//!    pinned at the beginning of every export stream so truncation can
15//!    be distinguished from "the consumer is reading garbage that
16//!    happens to start with a valid length prefix".
17//!
18//! # Type-level append-only invariant (A14 projection)
19//!
20//! [`WalRecordSink`] exposes only [`WalRecordSink::append_record`] and
21//! [`WalRecordSink::flush`]. **Seek / truncate / rewrite operations are
22//! deliberately absent from the trait surface** — external callers
23//! cannot invoke them through the trait, and the L0 A14 append-only
24//! invariant is projected into the L2 export layer **without touching
25//! L0 source** (DO NOT TOUCH #7 preserved). Concrete implementations
26//! MAY add private internal positioning helpers but MUST NOT expose
27//! them on the public surface.
28//!
29//! # DO NOT TOUCH #7 (postcard field order) invariant
30//!
31//! The streaming format wraps **unmodified L0 record bytes**. The
32//! length-prefix + stream-header framing wraps the protected payload
33//! from the outside; postcard field order inside each record is
34//! preserved bit-exact. The wire-stability tests verify this by
35//! comparing streamed record bytes against
36//! `arkhe_kernel::persist::Wal::serialize` record bytes.
37//!
38//! # Forward-compatibility
39//!
40//! All public types are `#[non_exhaustive]` — adding variants or fields
41//! does not break external matchers. [`WalRecordSink`] trait may grow
42//! additional methods only if they preserve the append-only invariant
43//! (no seek / truncate / rewrite).
44//!
45//! # Stability
46//!
47//! Wire-format surface frozen. Subsequent magic variants are added
48//! through migration mechanisms, never silent reuse — see
49//! [`StreamMagic::recognize`] for the reader-side dispatch.
50
51use std::fmt;
52
53pub mod buffered_sink;
54pub mod reader;
55
56pub use buffered_sink::BufferedWalSink;
57pub use reader::{StreamingWalReader, WalStreamReader};
58
59#[cfg(test)]
60mod wire_stability;
61
62#[cfg(test)]
63mod round_trip_tests;
64
/// Eight-byte tag that opens every WAL export stream.
///
/// Spells `ARKHEXP1` ("ArKhe EXPort version 1") in ASCII. It is
/// deliberately different from the L0 `WalHeader::MAGIC`
/// (`ARKHEWAL`): the export stream is its own transport format that
/// merely wraps L0 records, so the two magics are independent of
/// each other. Moving to a later tag (`ARKHEXP2`, …) is a
/// wire-format break that must go through an explicit migration
/// mechanism rather than silently reusing this value — [`StreamMagic`]
/// is the structured cross-version dispatch surface that keeps such
/// a bump mechanical.
pub const STREAM_HEADER_MAGIC: [u8; 8] = [b'A', b'R', b'K', b'H', b'E', b'X', b'P', b'1'];
76
/// Version selector for the stream-header magic.
///
/// Every variant corresponds to one distinct 8-byte ASCII tag found
/// at the very start of an exported stream. A reader looks the tag up
/// with [`StreamMagic::recognize`] and uses the resulting variant to
/// pick the matching frame-format parser.
///
/// Only `V1` (the `ARKHEXP1` baseline) is reserved today. Further
/// variants (`V2` / `V3` / …) may be appended here together with the
/// corresponding `recognize` lookup; because the enum is
/// `#[non_exhaustive]`, that expansion does not break external
/// matchers.
///
/// **Wire-stability invariant**: `V1` maps to `*b"ARKHEXP1"` (golden
/// hex vector `0x41 0x52 0x4B 0x48 0x45 0x58 0x50 0x31`). Changing
/// those bytes is a wire-format break and needs the same migration
/// mechanism as introducing a new variant.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[non_exhaustive]
pub enum StreamMagic {
    /// `ARKHEXP1` — the wire baseline and, for now, the only
    /// supported version.
    V1,
}

impl StreamMagic {
    /// The 8-byte tag written for this version.
    ///
    /// Declared `const fn` so the tag can be pinned at compile time
    /// and used inside other `const` items.
    #[must_use]
    pub const fn bytes(self) -> &'static [u8; 8] {
        // Exhaustive on purpose: a new variant must fail to compile
        // here until it is given its own tag.
        match self {
            StreamMagic::V1 => b"ARKHEXP1",
        }
    }

    /// Map an 8-byte header tag back to its version.
    ///
    /// Returns `Some(variant)` when `bytes` equals the tag of a
    /// supported version and `None` for every other input; callers
    /// report the `None` case through
    /// [`WalExportError::UnsupportedStreamVersion`].
    ///
    /// **Fail-fast posture**: the comparison works directly on the
    /// caller's fixed-size 8-byte array — nothing is allocated and
    /// nothing beyond those header bytes is sliced.
    #[must_use]
    pub fn recognize(bytes: &[u8; 8]) -> Option<Self> {
        // Compare against `bytes()` so the tag literal lives in one place.
        (bytes == Self::V1.bytes()).then_some(Self::V1)
    }
}
130
/// Largest value a single record's length prefix may carry.
///
/// This is a 16 MiB fail-secure ceiling: a record whose length prefix
/// is larger is rejected as malformed framing, per firm requirement
/// #2 (bounds-check before deref). Production WAL records are far
/// smaller (a typical record is < 64 KiB), so 16 MiB is roughly 256×
/// the typical size — generous for legitimate growth, yet tight
/// enough to stop attacker-supplied prefix-overflow and memory-DoS
/// before any record bytes are dereferenced.
///
/// **Rationale (fail-secure pattern)**: the bound covers both
/// (a) anti-overflow for length-prefix arithmetic and (b) anti-memory-
/// DoS as an allocation cap ahead of the record-bytes deref. Of the
/// two acceptable bounds (16 MiB vs 1 GiB) the tighter one was
/// chosen; it closes the memory-DoS vector without affecting
/// legitimate workloads.
///
/// **Absolute vs per-sink soft cap**: this constant is the **absolute
/// ceiling** that is part of the wire-format contract. A concrete
/// sink MAY enforce a tighter per-deployment soft cap through its own
/// configuration (e.g., a 1 MiB sink for embedded targets) and report
/// those rejections via [`WalExportError::BufferOverflow`]. Whatever
/// the sink configuration, the hard ceiling stays at 16 MiB.
pub const MAX_RECORD_BYTES: u64 = 16 * 1024 * 1024;
155
/// Top-level streaming export error surface.
///
/// `#[non_exhaustive]` — variants may be added (e.g., compression /
/// signature validation surfaces) without breaking external matchers.
#[derive(Debug)]
#[non_exhaustive]
pub enum WalExportError {
    /// Underlying sink I/O failure — operator-side disk / network /
    /// buffer error. Carries the upstream `std::io::Error` for
    /// diagnosis.
    Io(std::io::Error),
    /// Length-prefix or stream-header framing rejection. See
    /// [`InvalidFramingReason`] for the specific malformation.
    InvalidFraming(InvalidFramingReason),
    /// Append-only invariant violated — caller attempted to submit a
    /// record whose `seq` is not the strict successor of the most
    /// recently appended record's `seq`. Surface enforcement: the
    /// type-level invariant (no seek / truncate / rewrite methods)
    /// prevents the *machinery* from rewriting, and this runtime-side
    /// check prevents the *caller* from re-ordering.
    ///
    /// **Operator response**: fix the caller — the seq ordering
    /// invariant was violated by upstream. Distinguish from
    /// [`Self::SeqExhausted`] (intrinsic limit, rotate stream).
    AppendOnlyViolation {
        /// Expected next `seq` (most recent + 1, or 0 for a fresh
        /// stream).
        expected_seq: u64,
        /// `seq` carried by the offending record.
        got_seq: u64,
        /// **Forensic field**: most recently appended `seq`, if any.
        /// `None` means no record had been appended yet at the time
        /// of the violation (fresh stream — and `expected_seq` will
        /// be `0`). Gives the operator the prior high-water mark
        /// independent of the violation triple, useful when the
        /// violation post-dates a long sequence of appends and the
        /// operator wants the prior-state context without re-deriving
        /// `expected_seq - 1`.
        previous_seq: Option<u64>,
    },
    /// Sequence space exhausted — the most recently appended `seq`
    /// is `u64::MAX`, so any further append would require an
    /// arithmetic wraparound. Distinct from
    /// [`Self::AppendOnlyViolation`] because it signals an intrinsic
    /// limit rather than a caller ordering error. Unreachable in
    /// practice on any realistic stream (`u64::MAX` appends at a
    /// sustained 1 ns/append would take roughly 584 years), but the
    /// explicit variant makes the fail-secure posture mechanical
    /// rather than implicit.
    ///
    /// **Operator response**: rotate the stream — start a new
    /// stream, this one has exhausted its u64 seq space. The current
    /// stream's `last_seq = u64::MAX` state is preserved (no
    /// corruption), so the operator may rotate at leisure.
    SeqExhausted {
        /// The exhausted seq (always `u64::MAX`).
        last_seq: u64,
    },
    /// Stream header magic matched no supported version. The reader
    /// (or framing-validation caller) read 8 bytes at the stream start,
    /// asked [`StreamMagic::recognize`] to dispatch, and the lookup
    /// returned `None`. Distinct from
    /// [`InvalidFramingReason::HeaderMissing`] because that variant
    /// signals "no magic at all" (truncated header) —
    /// `UnsupportedStreamVersion` signals "magic present but
    /// unrecognised" (e.g., a stream emitted by an unrecognised future
    /// writer tag).
    ///
    /// **Operator response**: upgrade the reader, or downgrade the
    /// writer to a known version. The carrier `magic` field gives
    /// the operator the exact 8-byte tag for triage.
    UnsupportedStreamVersion {
        /// The 8-byte tag at stream start that did not match any
        /// recognised [`StreamMagic`] variant.
        magic: [u8; 8],
    },
    /// Internal buffer capacity exhausted before flush. Caller should
    /// flush to drain. Concrete sink implementations choose their own
    /// `capacity` — [`BufferedWalSink`] uses a 16 MiB default and
    /// exposes configuration knobs through the constructor.
    BufferOverflow {
        /// Sink's configured buffer capacity in bytes.
        capacity: usize,
        /// Bytes the caller attempted to append at overflow time.
        requested: usize,
        /// **Forensic field**: bytes already buffered at overflow
        /// time. Combined with `capacity` and `requested`, gives the
        /// operator the full sizing context: `current_buffer +
        /// requested > capacity` is the predicate that triggered
        /// overflow.
        current_buffer: usize,
    },
}
249
/// Reasons a streamed framing wrapper is rejected.
///
/// Distinct from [`WalExportError`] so callers can match the framing
/// failure mode without needing to reach into a tuple variant.
/// [`BufferedWalSink`] uses the variants individually when
/// constructing rejection paths.
///
/// Derives `Clone` / `Eq` / `PartialEq` (in line with
/// [`StreamMagic`]) so a reason can be compared by value — e.g.
/// `assert_eq!(reason, InvalidFramingReason::Truncated)` — instead of
/// only through `matches!`. Every payload is a plain `u64`, so the
/// derived equality is field-wise.
#[derive(Debug, Clone, Eq, PartialEq)]
#[non_exhaustive]
pub enum InvalidFramingReason {
    /// Length prefix exceeds [`MAX_RECORD_BYTES`]. Carries both the
    /// observed prefix and the bound for caller diagnostics.
    ///
    /// **Operator response**: investigate hostile stream injection
    /// or fragment producer. The 16 MiB ceiling is a fail-secure
    /// hard bound — a legitimate producer should never emit a frame
    /// at this size.
    LengthExceedsMax {
        /// The prefix value that triggered rejection.
        prefix: u64,
        /// The bound that was exceeded — equals [`MAX_RECORD_BYTES`].
        max: u64,
    },
    /// Length prefix is zero — empty records are disallowed.
    ///
    /// Every frame must carry a payload; the reader rejects defensively
    /// to prevent silent-no-op streams.
    ///
    /// **Operator response**: re-emit (the producer should never emit
    /// a length-zero frame).
    LengthZero,
    /// Stream truncated — bytes ended mid-record before the full
    /// length-prefixed payload arrived. Distinguishes operator-side
    /// disk failure from valid-but-shorter streams.
    ///
    /// **Operator response**: re-emit; source likely truncated mid-
    /// write (disk full, network drop, process crash). The reader
    /// surfaces this distinct from `HeaderMissing` so the operator
    /// can distinguish "never started" from "started but cut off".
    Truncated,
    /// Stream header magic missing or mismatched at stream start.
    /// Triggered when the leading 8 bytes do not equal
    /// [`STREAM_HEADER_MAGIC`].
    ///
    /// **Operator response**: verify the source stream is an
    /// ARKHEXP-format export (not, e.g., an L0 WAL file or an
    /// unrelated binary). Distinct from
    /// [`crate::wal_export::WalExportError::UnsupportedStreamVersion`]
    /// which signals "magic present but version unrecognised".
    ///
    /// NOTE(review): the `UnsupportedStreamVersion` docs describe this
    /// variant as the "no magic at all (truncated header)" case only,
    /// whereas the text above also claims the "8 bytes present but
    /// mismatched" case. Which variant a full-but-unknown 8-byte
    /// header maps to is decided in the reader — confirm there and
    /// align both descriptions.
    HeaderMissing,
}
300
301impl fmt::Display for WalExportError {
302    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303        match self {
304            Self::Io(e) => write!(f, "wal export io: {e}"),
305            Self::InvalidFraming(r) => write!(f, "wal export framing rejected: {r}"),
306            Self::AppendOnlyViolation {
307                expected_seq,
308                got_seq,
309                previous_seq,
310            } => match previous_seq {
311                Some(prev) => write!(
312                    f,
313                    "wal export append-only violation: expected seq {expected_seq}, got {got_seq} (last appended {prev})"
314                ),
315                None => write!(
316                    f,
317                    "wal export append-only violation: expected seq {expected_seq}, got {got_seq} (fresh stream, no prior appends)"
318                ),
319            },
320            Self::SeqExhausted { last_seq } => write!(
321                f,
322                "wal export seq space exhausted at last_seq {last_seq} — rotate stream"
323            ),
324            Self::UnsupportedStreamVersion { magic } => write!(
325                f,
326                "wal export unsupported stream version: magic {magic:?} not recognised"
327            ),
328            Self::BufferOverflow {
329                capacity,
330                requested,
331                current_buffer,
332            } => write!(
333                f,
334                "wal export buffer overflow: capacity {capacity} bytes, current {current_buffer}, requested {requested}"
335            ),
336        }
337    }
338}
339
340impl fmt::Display for InvalidFramingReason {
341    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342        match self {
343            Self::LengthExceedsMax { prefix, max } => {
344                write!(f, "length prefix {prefix} exceeds maximum {max} bytes")
345            }
346            Self::LengthZero => write!(f, "length prefix is zero — empty records disallowed"),
347            Self::Truncated => write!(f, "stream truncated mid-record"),
348            Self::HeaderMissing => write!(
349                f,
350                "stream header magic missing or mismatched (expected ARKHEXP1)"
351            ),
352        }
353    }
354}
355
356impl std::error::Error for WalExportError {
357    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
358        match self {
359            Self::Io(e) => Some(e),
360            Self::InvalidFraming(_)
361            | Self::AppendOnlyViolation { .. }
362            | Self::SeqExhausted { .. }
363            | Self::UnsupportedStreamVersion { .. }
364            | Self::BufferOverflow { .. } => None,
365        }
366    }
367}
368
/// Leaf error: the empty impl keeps the trait's default methods, so
/// `source()` is `None` — a framing rejection wraps no underlying
/// cause.
impl std::error::Error for InvalidFramingReason {}
370
371impl From<std::io::Error> for WalExportError {
372    fn from(e: std::io::Error) -> Self {
373        Self::Io(e)
374    }
375}
376
/// Append-only sink for streaming WAL records.
///
/// Implementations append fully-encoded record bytes (postcard
/// `WalRecord` output from `arkhe_kernel::persist::Wal::serialize`)
/// one record at a time, framing each with a fixed-width `u64` BE
/// length prefix and following the stream-header pin (firm
/// requirement #1 + #3). The trait deliberately exposes neither
/// seek nor truncate nor rewrite — A14 append-only is type-level
/// enforced by the absence of those methods.
///
/// Implementations are responsible for:
///
/// - validating the length prefix bounds (`0 < len ≤
///   `[`MAX_RECORD_BYTES`]) before deref (firm requirement #2);
/// - emitting the [`STREAM_HEADER_MAGIC`] pin at stream start;
/// - rejecting out-of-sequence `seq` with
///   [`WalExportError::AppendOnlyViolation`] (caller-side append-only);
/// - durable persistence on [`flush`](Self::flush) — concrete
///   implementations decide the persistence model (fsync per record
///   vs batched fsync on flush).
///
/// `Send + Sync` are not required by the trait itself — concrete sinks
/// may be `!Sync` if they wrap interior-mutable state. The reference
/// implementation [`BufferedWalSink`] is `Send` but `!Sync` (uses an
/// internal `&mut` cursor into the buffer).
pub trait WalRecordSink {
    /// Append one record's encoded bytes to the sink. The implementation
    /// frames the bytes (length prefix + payload) and records them in
    /// append order. Callers MUST submit records in strictly
    /// consecutive `seq` order — each record's `seq` is the previous
    /// one + 1, starting at 0 on a fresh stream, matching the
    /// `expected_seq` definition on
    /// [`WalExportError::AppendOnlyViolation`]; any other submission
    /// trips that error.
    ///
    /// The bytes parameter SHOULD be the postcard encoding of an L0
    /// `WalRecord`. The wire-stability test
    /// `postcard_record_bytes_preserved_bit_exact` verifies that
    /// streamed bytes match `Wal::serialize` record-section bytes
    /// without any field-order rewrite (DO NOT TOUCH #7).
    ///
    /// # Errors
    ///
    /// Per the implementation responsibilities listed on the trait:
    /// [`WalExportError::InvalidFraming`] when the record length is
    /// outside `0 < len ≤ `[`MAX_RECORD_BYTES`], and
    /// [`WalExportError::AppendOnlyViolation`] for an out-of-sequence
    /// `seq`. Other variants (e.g. `Io`, `BufferOverflow`) depend on
    /// the concrete sink.
    fn append_record(&mut self, record_bytes: &[u8]) -> Result<(), WalExportError>;

    /// Flush buffered records to the underlying durable storage.
    /// Returns `Ok(())` once the implementation's durability contract
    /// is satisfied (fsync completed, network ack received, etc.).
    /// Implementations are free to flush eagerly inside
    /// [`append_record`](Self::append_record) — `flush` then becomes
    /// a no-op.
    ///
    /// # Errors
    ///
    /// Implementation-defined; the trait only fixes the error type
    /// ([`WalExportError`]).
    fn flush(&mut self) -> Result<(), WalExportError>;
}
424
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// `STREAM_HEADER_MAGIC` is the 8-byte ASCII tag pinned at the
    /// start of every stream. Wire-stable — a value change is a
    /// wire-format break and requires a new magic value plus migration,
    /// not silent reuse.
    #[test]
    fn stream_header_magic_is_arkhexp1() {
        assert_eq!(&STREAM_HEADER_MAGIC, b"ARKHEXP1");
    }

    /// `StreamMagic::V1` byte mapping pinned at the wire-stability
    /// golden vector (`0x41 0x52 0x4B 0x48 0x45 0x58 0x50 0x31`).
    /// Drift between the enum dispatch and the legacy
    /// `STREAM_HEADER_MAGIC` constant trips this test — keeping the
    /// byte tag invariant across the cross-version surface.
    #[test]
    fn stream_magic_v1_bytes_match_legacy_constant() {
        assert_eq!(StreamMagic::V1.bytes(), &STREAM_HEADER_MAGIC);
        assert_eq!(
            StreamMagic::V1.bytes(),
            &[0x41, 0x52, 0x4B, 0x48, 0x45, 0x58, 0x50, 0x31]
        );
    }

    /// `StreamMagic::recognize` returns `Some(V1)` for the ARKHEXP1
    /// byte tag and `None` for anything else. Sample `None` cases
    /// include an unallocated tag (`ARKHEXP2`), an L0 WAL magic
    /// mistakenly fed to the export reader (`ARKHEWAL`) and an
    /// all-zero header. Only the returned value is asserted here;
    /// the "no heap allocation" property documented on `recognize`
    /// is not measured by this test.
    #[test]
    fn stream_magic_recognize_v1_and_rejects_unknown() {
        assert_eq!(StreamMagic::recognize(b"ARKHEXP1"), Some(StreamMagic::V1));
        // Unallocated tag — must reject.
        assert_eq!(StreamMagic::recognize(b"ARKHEXP2"), None);
        // L0 WAL magic — wrong transport, must reject.
        assert_eq!(StreamMagic::recognize(b"ARKHEWAL"), None);
        // All-zero eight bytes.
        assert_eq!(StreamMagic::recognize(b"\0\0\0\0\0\0\0\0"), None);
    }

    /// `UnsupportedStreamVersion` Display surface distinguishable +
    /// carries the offending magic for operator triage.
    #[test]
    fn unsupported_stream_version_display_distinguishes_with_magic() {
        let err = WalExportError::UnsupportedStreamVersion {
            magic: *b"ARKHEXP2",
        };
        let msg = err.to_string();
        assert!(msg.contains("unsupported stream version"));
        assert!(msg.contains("not recognised"));
        // The offending magic appears in the message via Debug repr
        // (`{magic:?}` on a `[u8; 8]` prints a decimal byte array),
        // so operators can pin the exact byte pattern.
        assert!(msg.contains("65")); // 'A' (0x41) renders as decimal 65
    }

    /// `STREAM_HEADER_MAGIC` distinct from L0 `WalHeader::MAGIC`
    /// (`ARKHEWAL`) — separate transport format, separate magic.
    #[test]
    fn stream_header_magic_distinct_from_l0_wal_magic() {
        assert_ne!(
            &STREAM_HEADER_MAGIC,
            &arkhe_kernel::persist::WalHeader::MAGIC
        );
    }

    /// `MAX_RECORD_BYTES` pinned at 16 MiB (fail-secure, ~256× typical
    /// record size). Bound short-circuits attacker-supplied length-
    /// prefix overflow + memory-DoS before any deref attempt.
    #[test]
    fn max_record_bytes_is_sixteen_mib() {
        assert_eq!(MAX_RECORD_BYTES, 1 << 24);
        assert_eq!(MAX_RECORD_BYTES, 16_777_216);
    }

    /// `WalExportError` implements `Display` + `Error` + `Send + Sync`
    /// — standard error-type expectations. `Send + Sync` lets the
    /// error cross thread / async boundaries when sinks operate
    /// off-runtime-thread.
    #[test]
    fn wal_export_error_implements_standard_error_traits() {
        // Compile-time checks: the calls below only need to type-check.
        fn assert_send_sync<T: Send + Sync>() {}
        fn assert_error<T: std::error::Error>() {}
        assert_send_sync::<WalExportError>();
        assert_error::<WalExportError>();
        assert_send_sync::<InvalidFramingReason>();
        assert_error::<InvalidFramingReason>();
    }

    /// Display surface — each variant produces a non-empty
    /// distinguishable message. The exact strings are operator-facing
    /// log lines, not part of the wire protocol; the wire-stability
    /// tests do not verify them bit-exact.
    #[test]
    fn wal_export_error_display_is_distinguishable() {
        let io_err = WalExportError::Io(std::io::Error::other("test"));
        let framing_err = WalExportError::InvalidFraming(InvalidFramingReason::LengthZero);
        let append_err = WalExportError::AppendOnlyViolation {
            expected_seq: 5,
            got_seq: 3,
            previous_seq: Some(4),
        };
        let exhausted_err = WalExportError::SeqExhausted { last_seq: u64::MAX };
        let buf_err = WalExportError::BufferOverflow {
            capacity: 1024,
            requested: 2048,
            current_buffer: 768,
        };

        let io_msg = io_err.to_string();
        let framing_msg = framing_err.to_string();
        let append_msg = append_err.to_string();
        let exhausted_msg = exhausted_err.to_string();
        let buf_msg = buf_err.to_string();

        assert!(io_msg.contains("io"));
        assert!(framing_msg.contains("framing"));
        assert!(append_msg.contains("append-only"));
        assert!(append_msg.contains('5'));
        assert!(append_msg.contains('3'));
        assert!(append_msg.contains('4')); // previous_seq forensic
        assert!(exhausted_msg.contains("exhausted"));
        assert!(exhausted_msg.contains("rotate"));
        assert!(buf_msg.contains("buffer overflow"));
        assert!(buf_msg.contains("1024"));
        assert!(buf_msg.contains("2048"));
        assert!(buf_msg.contains("768")); // current_buffer forensic
    }

    /// `AppendOnlyViolation` with `previous_seq: None` (fresh stream)
    /// renders a distinguishable Display string. The fresh-stream
    /// forensic case is what an operator hits when the caller submits
    /// a non-zero seq as the first record (e.g., resuming a stream
    /// from a stale checkpoint).
    #[test]
    fn append_only_violation_fresh_stream_display_distinguishes_from_some_case() {
        let fresh = WalExportError::AppendOnlyViolation {
            expected_seq: 0,
            got_seq: 7,
            previous_seq: None,
        };
        let after = WalExportError::AppendOnlyViolation {
            expected_seq: 5,
            got_seq: 3,
            previous_seq: Some(4),
        };
        let fresh_msg = fresh.to_string();
        let after_msg = after.to_string();
        assert!(fresh_msg.contains("fresh stream"));
        assert!(fresh_msg.contains("no prior appends"));
        assert!(after_msg.contains("last appended"));
        assert!(after_msg.contains('4'));
        assert_ne!(fresh_msg, after_msg);
    }

    /// Each `InvalidFramingReason` variant produces a distinguishable
    /// Display message. Operators triage on these strings.
    #[test]
    fn invalid_framing_reason_display_is_distinguishable() {
        let exceeds = InvalidFramingReason::LengthExceedsMax {
            prefix: MAX_RECORD_BYTES + 1,
            max: MAX_RECORD_BYTES,
        };
        let zero = InvalidFramingReason::LengthZero;
        let trunc = InvalidFramingReason::Truncated;
        let hdr = InvalidFramingReason::HeaderMissing;

        assert!(exceeds.to_string().contains("exceeds maximum"));
        assert!(zero.to_string().contains("zero"));
        assert!(trunc.to_string().contains("truncated"));
        assert!(hdr.to_string().contains("ARKHEXP1"));
    }

    /// `WalExportError::source()` exposes the underlying `std::io::Error`
    /// when present so error-chain printers / observability hooks can
    /// drill down. Non-IO variants return `None`.
    #[test]
    fn wal_export_error_source_chains_through_io() {
        use std::error::Error;
        let io_err = WalExportError::Io(std::io::Error::other("test"));
        let framing_err = WalExportError::InvalidFraming(InvalidFramingReason::LengthZero);

        assert!(io_err.source().is_some());
        assert!(framing_err.source().is_none());
    }

    /// `From<std::io::Error>` — `?`-friendly conversion at concrete
    /// sink implementation sites (buffered sink + round-trip fixtures).
    #[test]
    fn from_io_error_lifts_to_wal_export_error() {
        let io_err = std::io::Error::other("test");
        let lifted: WalExportError = io_err.into();
        assert!(matches!(lifted, WalExportError::Io(_)));
    }

    /// No-op stub `WalRecordSink` impl — confirms the trait is
    /// satisfiable without seek / truncate. Concrete impl is
    /// [`BufferedWalSink`]. Test asserts the trait's *method shape*
    /// alone.
    struct NoopSink;

    impl WalRecordSink for NoopSink {
        fn append_record(&mut self, _record_bytes: &[u8]) -> Result<(), WalExportError> {
            Ok(())
        }
        fn flush(&mut self) -> Result<(), WalExportError> {
            Ok(())
        }
    }

    /// Trait surface satisfiable without any positioning operation.
    /// If a future change to [`WalRecordSink`] adds a seek / truncate
    /// method, this test compiles trivially but its rationale
    /// docstring is the trip-wire for human review.
    #[test]
    fn wal_record_sink_satisfiable_without_seek() {
        let mut sink = NoopSink;
        sink.append_record(b"sample").expect("noop sink succeeds");
        sink.flush().expect("noop flush");
    }
}