arkhe_forge_platform/wal_export/mod.rs
1//! WAL streaming export — incremental record append for snapshot / backup.
2//!
3//! # Three firm requirements
4//!
5//! 1. **Fixed-width `u64` big-endian length prefix** before every record
6//! in the stream — deterministic + cross-platform; varint routes
7//! through a separate determinism verify path.
8//! 2. **Bounds-check on the length prefix** before any deref of record
9//! bytes — `0 < prefix ≤ `[`MAX_RECORD_BYTES`]. Malformed prefixes
10//! MUST be rejected with
11//! [`WalExportError::InvalidFraming`]`(`[`InvalidFramingReason`]`)`.
12//! Pattern mirrors the host-fn `(ptr, len)` bounds-check contract.
13//! 3. **Stream header magic** [`STREAM_HEADER_MAGIC`] (`ARKHEXP1`)
14//! pinned at the beginning of every export stream so truncation can
15//! be distinguished from "the consumer is reading garbage that
16//! happens to start with a valid length prefix".
17//!
18//! # Type-level append-only invariant (A14 projection)
19//!
20//! [`WalRecordSink`] exposes only [`WalRecordSink::append_record`] and
21//! [`WalRecordSink::flush`]. **Seek / truncate / rewrite operations are
22//! deliberately absent from the trait surface** — external callers
23//! cannot invoke them through the trait, and the L0 A14 append-only
24//! invariant is projected into the L2 export layer **without touching
25//! L0 source** (DO NOT TOUCH #7 preserved). Concrete implementations
26//! MAY add private internal positioning helpers but MUST NOT expose
27//! them on the public surface.
28//!
29//! # DO NOT TOUCH #7 (postcard field order) invariant
30//!
31//! The streaming format wraps **unmodified L0 record bytes**. The
32//! length-prefix + stream-header framing wraps the protected payload
33//! from the outside; postcard field order inside each record is
34//! preserved bit-exact. The wire-stability tests verify this by
35//! comparing streamed record bytes against
36//! `arkhe_kernel::persist::Wal::serialize` record bytes.
37//!
38//! # Forward-compatibility
39//!
40//! All public types are `#[non_exhaustive]` — adding variants or fields
41//! does not break external matchers. [`WalRecordSink`] trait may grow
42//! additional methods only if they preserve the append-only invariant
43//! (no seek / truncate / rewrite).
44//!
45//! # Stability
46//!
47//! Wire-format surface frozen. Subsequent magic variants are added
48//! through migration mechanisms, never silent reuse — see
49//! [`StreamMagic::recognize`] for the reader-side dispatch.
50
51use std::fmt;
52
53pub mod buffered_sink;
54pub mod reader;
55
56pub use buffered_sink::BufferedWalSink;
57pub use reader::{StreamingWalReader, WalStreamReader};
58
59#[cfg(test)]
60mod wire_stability;
61
62#[cfg(test)]
63mod round_trip_tests;
64
/// Eight-byte tag written at the very start of every WAL export stream.
///
/// Reads as ASCII `ARKHEXP1` ("ArKhe EXPort, version 1"). This is a
/// separate transport format that wraps L0 records, so the tag is
/// intentionally different from the L0 `WalHeader::MAGIC` (`ARKHEWAL`).
/// Moving to a new version (`ARKHEXP2`, …) breaks the wire format. It
/// must go through an explicit migration, never a silent reuse of this
/// value. [`StreamMagic`] is the structured per-version dispatch surface
/// that keeps such a bump mechanical.
pub const STREAM_HEADER_MAGIC: [u8; 8] = [b'A', b'R', b'K', b'H', b'E', b'X', b'P', b'1'];
76
/// Stream-header magic version dispatch: the cross-version
/// forward-compat surface for exported streams.
///
/// Each variant maps to a distinct 8-byte ASCII tag found at the start of
/// an exported stream. The reader dispatches on the recognised tag to pick
/// the matching frame-format parser.
///
/// `V1` (the `ARKHEXP1` baseline) is the only currently-reserved variant.
/// Adding a version (`V2` / `V3` / …) takes three steps:
/// 1. Add the variant.
/// 2. Give it a tag in [`StreamMagic::bytes`]. The exhaustive `match`
///    there forces this step.
/// 3. List it in the private `ALL` table so [`StreamMagic::recognize`]
///    can find it.
///
/// `#[non_exhaustive]` keeps that expansion non-breaking for external
/// matchers.
///
/// **Wire-stability invariant**: the `V1` byte mapping is pinned at
/// `*b"ARKHEXP1"` (golden hex vector
/// `0x41 0x52 0x4B 0x48 0x45 0x58 0x50 0x31`). Changing it is a
/// wire-format break and needs the same migration mechanism as a new
/// variant.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
#[non_exhaustive]
pub enum StreamMagic {
    /// `ARKHEXP1`, the wire baseline. Currently the only supported
    /// version.
    V1,
}

impl StreamMagic {
    /// Every supported version. [`Self::recognize`] searches this table
    /// and compares against [`Self::bytes`]. Each tag's byte literal
    /// therefore lives in exactly one place, and lookup cannot drift from
    /// the writer-side mapping.
    const ALL: [Self; 1] = [Self::V1];

    /// Return the 8-byte tag for this version.
    ///
    /// `const fn` so callers can pin the byte pattern at compile time and
    /// use the result in `const` contexts.
    #[must_use]
    pub const fn bytes(self) -> &'static [u8; 8] {
        match self {
            Self::V1 => b"ARKHEXP1",
        }
    }

    /// Recognise a stream-header magic tag.
    ///
    /// Returns `Some(variant)` when `bytes` equals the tag of a supported
    /// version. Returns `None` for any unknown tag, which the caller
    /// surfaces via [`WalExportError::UnsupportedStreamVersion`].
    ///
    /// **Fail-fast posture**: a linear scan over a fixed, compile-time
    /// table, compared against the caller's already-read 8-byte header.
    /// No heap allocation and no slicing beyond those bytes.
    #[must_use]
    pub fn recognize(bytes: &[u8; 8]) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|version| version.bytes() == bytes)
    }
}
130
/// Absolute upper bound, in bytes, on a single record's length prefix
/// (16 MiB).
///
/// Any length prefix above this value is rejected as malformed framing.
/// This is firm requirement #2: bounds-check before deref. Typical
/// production WAL records stay below 64 KiB, so 16 MiB leaves roughly
/// 256× headroom for legitimate growth. It is still small enough to stop
/// attacker-supplied oversized prefixes before any record bytes are
/// touched.
///
/// **Why this particular ceiling (fail-secure)**: one bound guards two
/// things:
/// (a) overflow in length-prefix arithmetic;
/// (b) memory-DoS via a huge allocation before the payload is read.
///
/// Of the two acceptable choices (16 MiB vs 1 GiB), the tighter one closes
/// the memory-DoS vector without affecting legitimate workloads.
///
/// **Hard ceiling vs per-sink soft caps**: this constant is part of the
/// wire-format contract. Individual sinks may enforce a stricter
/// deployment-specific limit (e.g. 1 MiB on embedded targets) and report
/// it through [`WalExportError::BufferOverflow`]. The 16 MiB ceiling
/// itself never moves with sink configuration.
pub const MAX_RECORD_BYTES: u64 = 16 * 1024 * 1024;
155
156/// Top-level streaming export error surface.
157///
158/// `#[non_exhaustive]` — variants may be added (e.g., compression /
159/// signature validation surfaces) without breaking external matchers.
160#[derive(Debug)]
161#[non_exhaustive]
162pub enum WalExportError {
163 /// Underlying sink I/O failure — operator-side disk / network /
164 /// buffer error. Carries the upstream `std::io::Error` for
165 /// diagnosis.
166 Io(std::io::Error),
167 /// Length-prefix or stream-header framing rejection. See
168 /// [`InvalidFramingReason`] for the specific malformation.
169 InvalidFraming(InvalidFramingReason),
170 /// Append-only invariant violated — caller attempted to submit a
171 /// record whose `seq` is not the strict successor of the most
172 /// recently appended record's `seq`. Surface enforcement: the
173 /// type-level invariant (no seek / truncate / rewrite methods)
174 /// prevents the *machinery* from rewriting, and this runtime-side
175 /// check prevents the *caller* from re-ordering.
176 ///
177 /// **Operator response**: fix the caller — the seq ordering
178 /// invariant was violated by upstream. Distinguish from
179 /// [`Self::SeqExhausted`] (intrinsic limit, rotate stream).
180 AppendOnlyViolation {
181 /// Expected next `seq` (most recent + 1, or 0 for a fresh
182 /// stream).
183 expected_seq: u64,
184 /// `seq` carried by the offending record.
185 got_seq: u64,
186 /// **Forensic field**: most recently appended `seq`, if any.
187 /// `None` means no record had been appended yet at the time
188 /// of the violation (fresh stream — and `expected_seq` will
189 /// be `0`). Gives the operator the prior high-water mark
190 /// independent of the violation triple, useful when the
191 /// violation post-dates a long sequence of appends and the
192 /// operator wants the prior-state context without re-deriving
193 /// `expected_seq - 1`.
194 previous_seq: Option<u64>,
195 },
196 /// Sequence space exhausted — the most recently appended `seq`
197 /// is `u64::MAX`, so any further append would require an
198 /// arithmetic wraparound. Distinct from
199 /// [`Self::AppendOnlyViolation`] because it signals an intrinsic
200 /// limit rather than a caller ordering error. Architecturally
201 /// unreachable on any real-world stream (u64::MAX appends at
202 /// 1ns/append exceeds the universe lifetime by ~17 orders of
203 /// magnitude), but the explicit variant makes the fail-secure
204 /// posture mechanical rather than implicit.
205 ///
206 /// **Operator response**: rotate the stream — start a new
207 /// stream, this one has exhausted its u64 seq space. The current
208 /// stream's `last_seq = u64::MAX` state is preserved (no
209 /// corruption), so the operator may rotate at leisure.
210 SeqExhausted {
211 /// The exhausted seq (always `u64::MAX`).
212 last_seq: u64,
213 },
214 /// Stream header magic recognised no supported version. The reader
215 /// (or framing-validation caller) read 8 bytes at the stream start,
216 /// asked [`StreamMagic::recognize`] to dispatch, and the lookup
217 /// returned `None`. Distinct from
218 /// [`InvalidFramingReason::HeaderMissing`] because that variant
219 /// signals "no magic at all" (truncated header) —
220 /// `UnsupportedStreamVersion` signals "magic present but
221 /// unrecognised" (e.g., a stream emitted by an unrecognised future
222 /// writer tag).
223 ///
224 /// **Operator response**: upgrade the reader, or downgrade the
225 /// writer to a known version. The carrier `magic` field gives
226 /// the operator the exact 8-byte tag for triage.
227 UnsupportedStreamVersion {
228 /// The 8-byte tag at stream start that did not match any
229 /// recognised [`StreamMagic`] variant.
230 magic: [u8; 8],
231 },
232 /// Internal buffer capacity exhausted before flush. Caller should
233 /// flush to drain. Concrete sink implementations choose their own
234 /// `capacity` — [`BufferedWalSink`] uses a 16 MiB default and
235 /// exposes configuration knobs through the constructor.
236 BufferOverflow {
237 /// Sink's configured buffer capacity in bytes.
238 capacity: usize,
239 /// Bytes the caller attempted to append at overflow time.
240 requested: usize,
241 /// **Forensic field**: bytes already buffered at overflow
242 /// time. Combined with `capacity` and `requested`, gives the
243 /// operator the full sizing context: `current_buffer +
244 /// requested > capacity` is the predicate that triggered
245 /// overflow.
246 current_buffer: usize,
247 },
248}
249
/// Reasons a streamed framing wrapper is rejected.
///
/// Kept separate from [`WalExportError`] so callers can match the
/// framing failure mode directly instead of reaching into a tuple
/// variant. [`BufferedWalSink`] uses the variants individually when
/// building rejection paths.
///
/// Derives `Clone`, `PartialEq` and `Eq` (like [`StreamMagic`]) so callers
/// and tests can copy a reason out of an error and compare it against an
/// expected value. All current payloads are plain `u64`s.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum InvalidFramingReason {
    /// The length prefix exceeds [`MAX_RECORD_BYTES`]. Carries both the
    /// observed prefix and the bound for caller diagnostics.
    ///
    /// **Operator response**: investigate hostile stream injection or a
    /// misbehaving (fragmenting) producer. The 16 MiB ceiling is a
    /// fail-secure hard bound, and a legitimate producer should never emit
    /// a frame that large.
    LengthExceedsMax {
        /// The prefix value that triggered rejection.
        prefix: u64,
        /// The bound that was exceeded; equals [`MAX_RECORD_BYTES`].
        max: u64,
    },

    /// The length prefix is zero. Empty records are disallowed.
    ///
    /// Every frame must carry a payload. The reader rejects zero-length
    /// frames defensively so a stream cannot silently consist of no-ops.
    ///
    /// **Operator response**: re-emit the stream; a correct producer never
    /// writes a zero-length frame.
    LengthZero,

    /// The stream ended mid-record, before the full length-prefixed
    /// payload arrived. This separates operator-side storage failure from a
    /// valid but shorter stream.
    ///
    /// **Operator response**: re-emit. The source was most likely cut off
    /// mid-write (disk full, network drop, process crash). This is reported
    /// separately from `HeaderMissing` so "never started" and "started but
    /// cut off" can be told apart.
    Truncated,

    /// The stream-header magic at the start of the stream is missing or
    /// does not equal [`STREAM_HEADER_MAGIC`].
    ///
    /// **Operator response**: check that the source really is an
    /// ARKHEXP-format export (not, e.g., an L0 WAL file or an unrelated
    /// binary).
    ///
    /// NOTE(review): [`WalExportError::UnsupportedStreamVersion`] documents
    /// itself as the error for "magic present but version unrecognised".
    /// Confirm in the reader which of the two a foreign 8-byte tag actually
    /// produces.
    HeaderMissing,
}
300
301impl fmt::Display for WalExportError {
302 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303 match self {
304 Self::Io(e) => write!(f, "wal export io: {e}"),
305 Self::InvalidFraming(r) => write!(f, "wal export framing rejected: {r}"),
306 Self::AppendOnlyViolation {
307 expected_seq,
308 got_seq,
309 previous_seq,
310 } => match previous_seq {
311 Some(prev) => write!(
312 f,
313 "wal export append-only violation: expected seq {expected_seq}, got {got_seq} (last appended {prev})"
314 ),
315 None => write!(
316 f,
317 "wal export append-only violation: expected seq {expected_seq}, got {got_seq} (fresh stream, no prior appends)"
318 ),
319 },
320 Self::SeqExhausted { last_seq } => write!(
321 f,
322 "wal export seq space exhausted at last_seq {last_seq} — rotate stream"
323 ),
324 Self::UnsupportedStreamVersion { magic } => write!(
325 f,
326 "wal export unsupported stream version: magic {magic:?} not recognised"
327 ),
328 Self::BufferOverflow {
329 capacity,
330 requested,
331 current_buffer,
332 } => write!(
333 f,
334 "wal export buffer overflow: capacity {capacity} bytes, current {current_buffer}, requested {requested}"
335 ),
336 }
337 }
338}
339
340impl fmt::Display for InvalidFramingReason {
341 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342 match self {
343 Self::LengthExceedsMax { prefix, max } => {
344 write!(f, "length prefix {prefix} exceeds maximum {max} bytes")
345 }
346 Self::LengthZero => write!(f, "length prefix is zero — empty records disallowed"),
347 Self::Truncated => write!(f, "stream truncated mid-record"),
348 Self::HeaderMissing => write!(
349 f,
350 "stream header magic missing or mismatched (expected ARKHEXP1)"
351 ),
352 }
353 }
354}
355
356impl std::error::Error for WalExportError {
357 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
358 match self {
359 Self::Io(e) => Some(e),
360 Self::InvalidFraming(_)
361 | Self::AppendOnlyViolation { .. }
362 | Self::SeqExhausted { .. }
363 | Self::UnsupportedStreamVersion { .. }
364 | Self::BufferOverflow { .. } => None,
365 }
366 }
367}
368
369impl std::error::Error for InvalidFramingReason {}
370
371impl From<std::io::Error> for WalExportError {
372 fn from(e: std::io::Error) -> Self {
373 Self::Io(e)
374 }
375}
376
377/// Append-only sink for streaming WAL records.
378///
379/// Implementations append fully-encoded record bytes (postcard
380/// `WalRecord` output from `arkhe_kernel::persist::Wal::serialize`)
381/// one record at a time, framing each with a fixed-width `u64` BE
382/// length prefix and following the stream-header pin (firm
383/// requirement #1 + #3). The trait deliberately exposes neither
384/// seek nor truncate nor rewrite — A14 append-only is type-level
385/// enforced by the absence of those methods.
386///
387/// Implementations are responsible for:
388///
389/// - validating the length prefix bounds (`0 < len ≤
390/// `[`MAX_RECORD_BYTES`]) before deref (firm requirement #2);
391/// - emitting the [`STREAM_HEADER_MAGIC`] pin at stream start;
392/// - rejecting out-of-sequence `seq` with
393/// [`WalExportError::AppendOnlyViolation`] (caller-side append-only);
394/// - durable persistence on [`flush`](Self::flush) — concrete
395/// implementations decide the persistence model (fsync per record
396/// vs batched fsync on flush).
397///
398/// `Send + Sync` are not required by the trait itself — concrete sinks
399/// may be `!Sync` if they wrap interior-mutable state. The reference
400/// implementation [`BufferedWalSink`] is `Send` but `!Sync` (uses an
401/// internal `&mut` cursor into the buffer).
402pub trait WalRecordSink {
403 /// Append one record's encoded bytes to the sink. The implementation
404 /// frames the bytes (length prefix + payload) and records them in
405 /// append order. Callers MUST submit records in monotone-increasing
406 /// `seq` order; out-of-order submissions trip
407 /// [`WalExportError::AppendOnlyViolation`].
408 ///
409 /// The bytes parameter SHOULD be the postcard encoding of an L0
410 /// `WalRecord`. The wire-stability test
411 /// `postcard_record_bytes_preserved_bit_exact` verifies that
412 /// streamed bytes match `Wal::serialize` record-section bytes
413 /// without any field-order rewrite (DO NOT TOUCH #7).
414 fn append_record(&mut self, record_bytes: &[u8]) -> Result<(), WalExportError>;
415
416 /// Flush buffered records to the underlying durable storage.
417 /// Returns `Ok(())` once the implementation's durability contract
418 /// is satisfied (fsync completed, network ack received, etc.).
419 /// Implementations are free to flush eagerly inside
420 /// [`append_record`](Self::append_record) — `flush` then becomes
421 /// a no-op.
422 fn flush(&mut self) -> Result<(), WalExportError>;
423}
424
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Golden pin: the export stream magic spells `ARKHEXP1`.
    ///
    /// Any change is a wire-format break. It needs a fresh magic plus a
    /// migration, never a silent reuse of the old tag.
    #[test]
    fn stream_header_magic_is_arkhexp1() {
        let golden: &[u8; 8] = b"ARKHEXP1";
        assert_eq!(&STREAM_HEADER_MAGIC, golden);
    }

    /// The `V1` dispatch tag equals both the legacy constant and the golden
    /// hex vector. This keeps the enum surface and `STREAM_HEADER_MAGIC`
    /// from drifting apart.
    #[test]
    fn stream_magic_v1_bytes_match_legacy_constant() {
        let tag = StreamMagic::V1.bytes();
        assert_eq!(tag, &STREAM_HEADER_MAGIC);
        assert_eq!(tag, &[0x41, 0x52, 0x4B, 0x48, 0x45, 0x58, 0x50, 0x31]);
    }

    /// `recognize` accepts exactly the `ARKHEXP1` tag.
    ///
    /// The rejected samples are:
    /// - an unallocated future tag;
    /// - the L0 WAL magic fed to the wrong reader;
    /// - an all-zero header.
    ///
    /// Each must be refused without heap allocation.
    #[test]
    fn stream_magic_recognize_v1_and_rejects_unknown() {
        assert_eq!(StreamMagic::recognize(b"ARKHEXP1"), Some(StreamMagic::V1));

        let unknown_tags: [&[u8; 8]; 3] = [
            b"ARKHEXP2",         // unallocated tag
            b"ARKHEWAL",         // L0 WAL magic: wrong transport
            b"\0\0\0\0\0\0\0\0", // arbitrary eight bytes
        ];
        for tag in unknown_tags {
            assert_eq!(StreamMagic::recognize(tag), None, "tag {tag:?} must not dispatch");
        }
    }

    /// The unsupported-version message is distinguishable and carries the
    /// offending magic, so operators can pin the exact byte pattern.
    #[test]
    fn unsupported_stream_version_display_distinguishes_with_magic() {
        let rendered = WalExportError::UnsupportedStreamVersion {
            magic: *b"ARKHEXP2",
        }
        .to_string();

        assert!(rendered.contains("unsupported stream version"));
        assert!(rendered.contains("not recognised"));
        // `[u8; 8]` Debug prints decimal bytes: 'A' (0x41) shows up as 65.
        assert!(rendered.contains("65"));
    }

    /// The export magic must not collide with the L0 `WalHeader::MAGIC`
    /// (`ARKHEWAL`), because the two tag different transport formats.
    #[test]
    fn stream_header_magic_distinct_from_l0_wal_magic() {
        let l0_magic = &arkhe_kernel::persist::WalHeader::MAGIC;
        assert_ne!(&STREAM_HEADER_MAGIC, l0_magic);
    }

    /// `MAX_RECORD_BYTES` stays at exactly 16 MiB: the fail-secure ceiling
    /// about 256× the typical record size.
    #[test]
    fn max_record_bytes_is_sixteen_mib() {
        assert_eq!(MAX_RECORD_BYTES, 16 * 1024 * 1024);
        assert_eq!(MAX_RECORD_BYTES, 16_777_216);
    }

    /// Both error types implement `std::error::Error` and are
    /// `Send + Sync`, so errors can cross thread or async boundaries when
    /// a sink runs off the runtime thread.
    #[test]
    fn wal_export_error_implements_standard_error_traits() {
        fn requires_send_sync<T: Send + Sync>() {}
        fn requires_std_error<T: std::error::Error>() {}

        requires_send_sync::<WalExportError>();
        requires_std_error::<WalExportError>();
        requires_send_sync::<InvalidFramingReason>();
        requires_std_error::<InvalidFramingReason>();
    }

    /// Each variant renders a non-empty, distinguishable message.
    ///
    /// These strings are operator-facing log lines, not wire protocol. The
    /// wire-stability suite does not pin them bit-exact.
    #[test]
    fn wal_export_error_display_is_distinguishable() {
        let io_msg = WalExportError::Io(std::io::Error::other("test")).to_string();
        let framing_msg =
            WalExportError::InvalidFraming(InvalidFramingReason::LengthZero).to_string();
        let append_msg = WalExportError::AppendOnlyViolation {
            expected_seq: 5,
            got_seq: 3,
            previous_seq: Some(4),
        }
        .to_string();
        let exhausted_msg = WalExportError::SeqExhausted { last_seq: u64::MAX }.to_string();
        let buf_msg = WalExportError::BufferOverflow {
            capacity: 1024,
            requested: 2048,
            current_buffer: 768,
        }
        .to_string();

        assert!(io_msg.contains("io"));
        assert!(framing_msg.contains("framing"));
        // "4" is the previous_seq forensic field.
        for needle in ["append-only", "5", "3", "4"] {
            assert!(append_msg.contains(needle), "{append_msg:?} lacks {needle:?}");
        }
        for needle in ["exhausted", "rotate"] {
            assert!(exhausted_msg.contains(needle), "{exhausted_msg:?} lacks {needle:?}");
        }
        // "768" is the current_buffer forensic field.
        for needle in ["buffer overflow", "1024", "2048", "768"] {
            assert!(buf_msg.contains(needle), "{buf_msg:?} lacks {needle:?}");
        }
    }

    /// An append-only violation on a fresh stream (`previous_seq: None`)
    /// renders differently from one that follows earlier appends.
    ///
    /// The fresh case is what an operator sees when the first record
    /// carries a non-zero seq, e.g. resuming from a stale checkpoint.
    #[test]
    fn append_only_violation_fresh_stream_display_distinguishes_from_some_case() {
        let fresh_msg = WalExportError::AppendOnlyViolation {
            expected_seq: 0,
            got_seq: 7,
            previous_seq: None,
        }
        .to_string();
        let after_msg = WalExportError::AppendOnlyViolation {
            expected_seq: 5,
            got_seq: 3,
            previous_seq: Some(4),
        }
        .to_string();

        assert!(fresh_msg.contains("fresh stream"));
        assert!(fresh_msg.contains("no prior appends"));
        assert!(after_msg.contains("last appended"));
        assert!(after_msg.contains('4'));
        assert_ne!(fresh_msg, after_msg);
    }

    /// Each `InvalidFramingReason` renders a distinguishable message;
    /// operators triage on these strings.
    #[test]
    fn invalid_framing_reason_display_is_distinguishable() {
        let cases = [
            (
                InvalidFramingReason::LengthExceedsMax {
                    prefix: MAX_RECORD_BYTES + 1,
                    max: MAX_RECORD_BYTES,
                },
                "exceeds maximum",
            ),
            (InvalidFramingReason::LengthZero, "zero"),
            (InvalidFramingReason::Truncated, "truncated"),
            (InvalidFramingReason::HeaderMissing, "ARKHEXP1"),
        ];

        for (reason, needle) in cases {
            let rendered = reason.to_string();
            assert!(rendered.contains(needle), "{reason:?} rendered as {rendered:?}");
        }
    }

    /// `source()` exposes the wrapped `std::io::Error` so error-chain
    /// printers and observability hooks can drill down. Non-IO variants
    /// report no source.
    #[test]
    fn wal_export_error_source_chains_through_io() {
        use std::error::Error as _;

        let io_err = WalExportError::Io(std::io::Error::other("test"));
        let framing_err = WalExportError::InvalidFraming(InvalidFramingReason::LengthZero);

        assert!(io_err.source().is_some());
        assert!(framing_err.source().is_none());
    }

    /// `From<std::io::Error>` lifts into the `Io` variant. This is the
    /// conversion `?` relies on at sink implementation sites.
    #[test]
    fn from_io_error_lifts_to_wal_export_error() {
        let lifted = WalExportError::from(std::io::Error::other("test"));
        assert!(matches!(lifted, WalExportError::Io(_)));
    }

    /// Minimal `WalRecordSink` that accepts everything. It exists only to
    /// show that the trait can be implemented with no seek or truncate
    /// capability. The real implementation is [`BufferedWalSink`].
    struct NoopSink;

    impl WalRecordSink for NoopSink {
        fn append_record(&mut self, _record_bytes: &[u8]) -> Result<(), WalExportError> {
            Ok(())
        }

        fn flush(&mut self) -> Result<(), WalExportError> {
            Ok(())
        }
    }

    /// The trait is satisfiable without any positioning operation.
    ///
    /// If a seek / truncate method is ever added to [`WalRecordSink`],
    /// this test still compiles. Its rationale is the review trip-wire.
    #[test]
    fn wal_record_sink_satisfiable_without_seek() {
        let mut sink = NoopSink;
        sink.append_record(b"sample").expect("noop sink succeeds");
        sink.flush().expect("noop flush");
    }
}