fsys 0.9.1

Adaptive file and directory IO for Rust — fast, hardware-aware, multi-strategy.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
//! Journal record framing format — self-identifying, integrity-
//! protected, tail-truncation-safe.
//!
//! Every record written by [`JournalHandle::append`](super::JournalHandle::append)
//! is wrapped in a 12-byte frame:
//!
//! ```text
//!  bytes  field           description
//!  -----  -----           -----------
//!  0..4   magic_and_ver   big-endian u32: 0x46535901 ("FSY\x01")
//!                         — identifies the file as an fsys journal
//!                         v1 stream.
//!  4..8   length          little-endian u32: payload byte length.
//!  8..    payload         exactly `length` bytes of caller payload.
//!  ..+4   crc32c          little-endian u32: crc32c of the
//!                         preceding (magic_and_ver + length +
//!                         payload). Validates the entire frame.
//! ```
//!
//! Total frame overhead: **12 bytes per record** (4 magic+ver +
//! 4 length + 4 crc32c). At 64-byte records that's 19% overhead;
//! at 4 KiB it's 0.3%; at 64 KiB it's 0.02%. The overhead is
//! constant per record, so larger records amortise the framing
//! to negligible cost.
//!
//! ## Why these specific choices
//!
//! - **Magic prefix:** lets a reader detect immediately whether
//!   the file at a path is actually an fsys journal vs. a raw
//!   file vs. a different format. Catches format-confusion
//!   attacks (a file with the right extension but the wrong
//!   contents) before they corrupt downstream logic.
//! - **Version byte in the magic:** allows the on-disk format to
//!   evolve without breaking forward compatibility. v2/v3 frames
//!   would set the low byte to 0x02 / 0x03; readers reject
//!   unknown versions explicitly.
//! - **CRC32C** (Castagnoli polynomial) over **xxhash / SHA / etc:**
//!   crc32c is hardware-accelerated on every x86 CPU since 2008
//!   (via the SSE4.2 `crc32` instruction) and on every ARMv8.1+
//!   CPU. On hardware where it's accelerated, crc32c is the
//!   fastest checksum available — typically 1 cycle per 8 bytes.
//!   Checksumming a 4 KiB record costs ~500 ns on modern x86.
//!   xxhash64 would be faster on hardware *without* crc32c
//!   acceleration, but for the database-WAL target the user-
//!   facing platforms (modern Linux x86-64 / ARM64 servers) all
//!   have crc32c hardware support.
//! - **Trailing checksum** vs. inline header checksum: the
//!   checksum is at the end so the writer can compute it from
//!   the already-buffered preceding bytes without re-reading.
//!   For journals with very large records this matters; for
//!   small records it's wash.
//! - **Forward-iteration only:** the format does not include a
//!   length trailer that would enable backward iteration.
//!   Forward replay is the canonical WAL pattern (replay from
//!   last checkpoint to the end). A reverse iteration would
//!   need length-trailer plumbing — filed as 0.9.x if a real
//!   user need surfaces.
//!
//! ## Tail-truncation detection
//!
//! On a crash mid-write, the journal file may end with a
//! partially written frame. The reader detects this in one of
//! three ways:
//!
//! 1. **Truncated header:** less than 8 bytes left when starting
//!    a new frame → end-of-journal.
//! 2. **Truncated payload:** length field claims N bytes but
//!    the file ends before N bytes are read → end-of-journal.
//! 3. **Truncated checksum or checksum mismatch:** the trailing
//!    crc32c is missing or doesn't match the computed value →
//!    end-of-journal.
//!
//! In all three cases the reader stops cleanly at the last
//! fully-written record. Partial bytes after that point are
//! discarded (not returned to the caller), and the resume
//! cursor for a new [`JournalHandle`] should be set to the LSN
//! at which the partial bytes started — the writer will
//! overwrite them.

#![allow(dead_code)] // some helpers are reserved for the writer-side path

use crate::{Error, Result};

/// Frame magic + format version. Big-endian on disk so a
/// hexdump shows `46 53 59 01` clearly.
///
/// `0x46535901`:
/// - `0x46 0x53 0x59` = `"FSY"` (the same prefix as the
///   `fsys::primitive` constants and the `FS-NNNNN` error codes).
/// - `0x01` = format version. Future v2 would be `0x46535902`.
pub(crate) const FRAME_MAGIC_V1: u32 = 0x4653_5901;

/// Frame overhead: magic_and_ver (4) + length (4) + crc32c (4) = 12 bytes.
pub(crate) const FRAME_OVERHEAD: usize = 12;

/// Maximum record payload length supported by the v1 frame
/// format. The length field is `u32`, but we cap below
/// `u32::MAX` to leave headroom for future flag-bit semantics
/// in the high bits.
///
/// Records larger than this should be sharded by the caller
/// (split across multiple appends with an application-level
/// sequencing scheme). For typical WAL workloads (record sizes
/// 64 B – 4 KiB) this cap is unreachable.
pub(crate) const FRAME_MAX_PAYLOAD: u32 = (1 << 28) - 1; // 256 MiB

// ─────────────────────────────────────────────────────────────────
// CRC32C (Castagnoli polynomial, used by SCSI / iSCSI / Btrfs /
// every database WAL since the late 2000s).
//
// Implementation (0.9.1): the `crc32c` crate, which performs
// runtime CPU-feature dispatch — SSE4.2 `crc32` on x86_64 (~30
// GB/s/core), ARMv8 CRC extensions on aarch64, pure-Rust
// fallback elsewhere. Replaces the software lookup-table
// implementation that shipped through 0.9.0 (~2 GB/s/core).
//
// The bit-pattern result is identical: both implementations follow
// RFC 3720 (initial state 0xFFFFFFFF internally; output is
// post-NOT). The KAT vectors in this module's tests
// (`crc32c_known_answer_vectors`) pin the wire format and would
// break loudly if the implementation deviated.
//
// 0.9.1 motivation: the journal's per-frame CRC is hot on the
// bulk-load path (5 M tight-loop appends in the lmdb_style bench).
// Hardware acceleration is one of the load-bearing wins behind
// the post-0.9.0 bulk-load lead recovery vs v0.8.5.
// ─────────────────────────────────────────────────────────────────

/// Computes the CRC-32C checksum of `bytes` per RFC 3720.
///
/// Hardware-dispatched (SSE4.2 / ARMv8 CRC) at runtime via the
/// `crc32c` crate; transparently falls back to a Rust software
/// implementation on hosts without the feature.
#[inline]
pub(crate) fn crc32c(bytes: &[u8]) -> u32 {
    ::crc32c::crc32c(bytes)
}

/// Streaming CRC-32C — useful when checksumming is split across
/// multiple buffers (header bytes + payload bytes) without
/// allocating a contiguous combined buffer.
///
/// Call [`Crc32cBuilder::new`] to start; feed bytes via
/// [`Crc32cBuilder::update`]; finalise with
/// [`Crc32cBuilder::finalize`].
///
/// 0.9.1: backed by `crc32c::crc32c_append`, which preserves the
/// runtime CPU-feature dispatch across multi-chunk inputs. The
/// internal state is the post-NOT (caller-visible) value, so
/// `Crc32cBuilder::new()` initialises with `0` rather than the
/// raw `0xFFFF_FFFF` initial register value. The wire-level
/// output is unchanged.
pub(crate) struct Crc32cBuilder {
    crc: u32,
}

impl Crc32cBuilder {
    /// Begins a new CRC computation.
    #[inline]
    pub(crate) fn new() -> Self {
        Self { crc: 0 }
    }

    /// Feeds `bytes` into the computation.
    #[inline]
    pub(crate) fn update(&mut self, bytes: &[u8]) {
        self.crc = ::crc32c::crc32c_append(self.crc, bytes);
    }

    /// Returns the finalised CRC-32C value.
    #[inline]
    pub(crate) fn finalize(self) -> u32 {
        self.crc
    }
}

// ─────────────────────────────────────────────────────────────────
// Frame encoding / decoding helpers
// ─────────────────────────────────────────────────────────────────

/// Encodes a frame for `payload` into `buf` (which must be sized
/// to `FRAME_OVERHEAD + payload.len()`).
///
/// On success, returns the number of bytes written (= buf len).
#[inline]
pub(crate) fn encode_frame_into(payload: &[u8], buf: &mut [u8]) -> Result<usize> {
    let total = payload.len().saturating_add(FRAME_OVERHEAD);
    if buf.len() < total {
        return Err(Error::Io(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "encode_frame_into: buffer too small",
        )));
    }
    if (payload.len() as u64) > (FRAME_MAX_PAYLOAD as u64) {
        return Err(Error::Io(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "journal record exceeds FRAME_MAX_PAYLOAD (256 MiB)",
        )));
    }
    let len = payload.len() as u32;

    // magic_and_ver — big-endian.
    buf[0..4].copy_from_slice(&FRAME_MAGIC_V1.to_be_bytes());
    // length — little-endian.
    buf[4..8].copy_from_slice(&len.to_le_bytes());
    // payload.
    buf[8..8 + payload.len()].copy_from_slice(payload);

    // crc32c over magic_and_ver + length + payload (everything
    // before the trailing checksum).
    let crc = crc32c(&buf[..8 + payload.len()]);
    buf[8 + payload.len()..total].copy_from_slice(&crc.to_le_bytes());

    Ok(total)
}

/// Allocates a fresh `Vec<u8>` containing the encoded frame for
/// `payload`. Convenience wrapper over [`encode_frame_into`] when
/// the caller doesn't already own a buffer.
#[inline]
pub(crate) fn encode_frame_owned(payload: &[u8]) -> Result<Vec<u8>> {
    let total = payload.len().saturating_add(FRAME_OVERHEAD);
    let mut buf = vec![0u8; total];
    let _ = encode_frame_into(payload, &mut buf)?;
    Ok(buf)
}

/// Outcome of attempting to decode a frame from a byte slice.
#[derive(Debug)]
pub(crate) enum FrameDecode {
    /// Successfully decoded a frame.
    Ok {
        /// Total bytes consumed (= [`FRAME_OVERHEAD`] + payload length).
        consumed: usize,
        /// Payload byte range within the input slice.
        payload_start: usize,
        payload_end: usize,
    },
    /// The input is too short to contain a complete frame —
    /// either the header is truncated or the payload is.
    /// The reader treats this as end-of-journal (tail truncation).
    Truncated,
    /// The frame's magic doesn't match — either this isn't an
    /// fsys journal, or the file is corrupted at this offset.
    BadMagic,
    /// The frame's length field exceeds [`FRAME_MAX_PAYLOAD`].
    LengthOverflow,
    /// The trailing CRC-32C doesn't match the computed value —
    /// the record's bytes are corrupt. The reader treats this
    /// as end-of-journal (tail truncation from a partial
    /// write).
    ChecksumMismatch,
}

/// Attempts to decode a frame starting at offset 0 of `bytes`.
///
/// `bytes` is typically a slice of a larger buffer; the decoder
/// reads only what it needs. The caller advances by `consumed`
/// bytes after a successful decode.
#[inline]
pub(crate) fn decode_frame(bytes: &[u8]) -> FrameDecode {
    if bytes.len() < 8 {
        return FrameDecode::Truncated;
    }
    let magic = u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]);
    if magic != FRAME_MAGIC_V1 {
        return FrameDecode::BadMagic;
    }
    let length = u32::from_le_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]);
    if length > FRAME_MAX_PAYLOAD {
        return FrameDecode::LengthOverflow;
    }
    let length = length as usize;
    let total = 8 + length + 4; // header + payload + crc
    if bytes.len() < total {
        return FrameDecode::Truncated;
    }
    // Compute CRC-32C over the header + payload region.
    let computed = crc32c(&bytes[..8 + length]);
    let stored_crc = u32::from_le_bytes([
        bytes[8 + length],
        bytes[8 + length + 1],
        bytes[8 + length + 2],
        bytes[8 + length + 3],
    ]);
    if computed != stored_crc {
        return FrameDecode::ChecksumMismatch;
    }
    FrameDecode::Ok {
        consumed: total,
        payload_start: 8,
        payload_end: 8 + length,
    }
}

// ─────────────────────────────────────────────────────────────────
// Tests
// ─────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;

    /// Known-answer test vectors for CRC-32C, taken from RFC 3720
    /// and Castagnoli's original paper. If these fail, the
    /// implementation is wrong.
    #[test]
    fn crc32c_known_answer_vectors() {
        // Empty input → CRC-32C = 0x00000000 (initial 0xFFFFFFFF
        // post-NOT = 0).
        assert_eq!(crc32c(b""), 0);

        // 32 bytes of 0x00 → 0x8a9136aa per RFC 3720.
        let zeros = [0u8; 32];
        assert_eq!(crc32c(&zeros), 0x8a9136aa);

        // 32 bytes of 0xff → 0x62a8ab43 per RFC 3720.
        let ones = [0xffu8; 32];
        assert_eq!(crc32c(&ones), 0x62a8ab43);

        // Sequential bytes 0..32 → 0x46dd794e per RFC 3720.
        let seq: [u8; 32] = std::array::from_fn(|i| i as u8);
        assert_eq!(crc32c(&seq), 0x46dd794e);
    }

    #[test]
    fn streaming_crc_matches_one_shot() {
        let data = b"the quick brown fox jumps over the lazy dog";
        let one_shot = crc32c(data);

        let mut builder = Crc32cBuilder::new();
        builder.update(&data[..10]);
        builder.update(&data[10..25]);
        builder.update(&data[25..]);
        let streamed = builder.finalize();

        assert_eq!(one_shot, streamed);
    }

    #[test]
    fn frame_roundtrip_simple() {
        let payload = b"hello, journal!";
        let buf = encode_frame_owned(payload).expect("encode");
        assert_eq!(buf.len(), FRAME_OVERHEAD + payload.len());

        match decode_frame(&buf) {
            FrameDecode::Ok {
                consumed,
                payload_start,
                payload_end,
            } => {
                assert_eq!(consumed, buf.len());
                assert_eq!(&buf[payload_start..payload_end], payload);
            }
            other => panic!("expected Ok, got {other:?}"),
        }
    }

    #[test]
    fn frame_roundtrip_empty_payload() {
        let buf = encode_frame_owned(b"").expect("encode");
        assert_eq!(buf.len(), FRAME_OVERHEAD);
        match decode_frame(&buf) {
            FrameDecode::Ok {
                payload_start,
                payload_end,
                ..
            } => {
                assert_eq!(payload_start, payload_end);
            }
            other => panic!("expected Ok empty, got {other:?}"),
        }
    }

    #[test]
    fn frame_roundtrip_4kib() {
        let payload = vec![0xCDu8; 4096];
        let buf = encode_frame_owned(&payload).expect("encode");
        match decode_frame(&buf) {
            FrameDecode::Ok {
                payload_start,
                payload_end,
                ..
            } => {
                assert_eq!(&buf[payload_start..payload_end], payload.as_slice());
            }
            other => panic!("expected Ok, got {other:?}"),
        }
    }

    #[test]
    fn truncated_header_returns_truncated() {
        let buf = [0u8; 5]; // < 8
        assert!(matches!(decode_frame(&buf), FrameDecode::Truncated));
    }

    #[test]
    fn truncated_payload_returns_truncated() {
        let payload = b"this won't fit";
        let mut buf = encode_frame_owned(payload).expect("encode");
        buf.truncate(buf.len() - 5); // chop off CRC + last few bytes of payload
        assert!(matches!(decode_frame(&buf), FrameDecode::Truncated));
    }

    #[test]
    fn bad_magic_returns_bad_magic() {
        let mut buf = encode_frame_owned(b"hi").expect("encode");
        buf[0] = 0xDE; // corrupt magic
        assert!(matches!(decode_frame(&buf), FrameDecode::BadMagic));
    }

    #[test]
    fn checksum_mismatch_returns_mismatch() {
        let mut buf = encode_frame_owned(b"original").expect("encode");
        // Flip a payload byte without updating the CRC.
        buf[8] ^= 0xFF;
        assert!(matches!(decode_frame(&buf), FrameDecode::ChecksumMismatch));
    }

    #[test]
    fn length_overflow_rejected_at_encode() {
        // We can't construct a Vec of FRAME_MAX_PAYLOAD+1 bytes
        // in a unit test economically. Instead, we synthesise a
        // header with the bad length and confirm the decoder
        // catches it.
        let mut buf = vec![0u8; 8];
        buf[0..4].copy_from_slice(&FRAME_MAGIC_V1.to_be_bytes());
        let bad_len = (FRAME_MAX_PAYLOAD + 1).to_le_bytes();
        buf[4..8].copy_from_slice(&bad_len);
        assert!(matches!(decode_frame(&buf), FrameDecode::LengthOverflow));
    }

    #[test]
    fn frame_overhead_is_exactly_12_bytes() {
        // Pinning this — the BENCH.md numbers depend on it being 12.
        assert_eq!(FRAME_OVERHEAD, 12);
    }

    #[test]
    fn frame_max_payload_fits_in_u32() {
        assert!((FRAME_MAX_PAYLOAD as u64) < (u32::MAX as u64));
    }

    /// Property test: 10,000 random round-trips. Covers payload
    /// sizes from empty to 64 KiB with random content. If the
    /// encoder/decoder ever disagrees, this test catches it
    /// statistically.
    ///
    /// Uses a deterministic linear-congruential PRNG seeded from
    /// the payload-length cursor so the test is reproducible.
    #[test]
    fn property_random_round_trips() {
        // Tiny LCG (numerical recipes constants) so the test is
        // self-contained and reproducible — no proptest dep
        // needed for a property check this simple.
        let mut state: u64 = 0xCAFEBABE_DEADBEEF;
        let mut next = || {
            state = state
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1442695040888963407);
            state
        };

        for iteration in 0..10_000u32 {
            // Vary payload size across iterations: power of 2
            // distribution from 0 to 16 KiB, with occasional
            // larger sizes up to 64 KiB.
            let size_class = (next() & 0xFF) as usize;
            let size = match size_class {
                0..=10 => 0,                                  // empty
                11..=50 => (next() % 64) as usize,            // tiny
                51..=150 => (next() % 1024) as usize,         // small
                151..=220 => (next() % (16 * 1024)) as usize, // medium
                _ => (next() % (64 * 1024)) as usize,         // large
            };
            // Fill payload with a deterministic but iteration-
            // sensitive byte pattern (xor with iteration so
            // adjacent iterations don't have identical content).
            let payload: Vec<u8> = (0..size)
                .map(|i| ((next() >> 16) as u8) ^ (iteration as u8) ^ (i as u8))
                .collect();
            let encoded = encode_frame_owned(&payload).expect("encode");
            assert_eq!(
                encoded.len(),
                FRAME_OVERHEAD + payload.len(),
                "encoded length wrong on iteration {iteration} (size {size})"
            );

            match decode_frame(&encoded) {
                FrameDecode::Ok {
                    consumed,
                    payload_start,
                    payload_end,
                } => {
                    assert_eq!(
                        consumed,
                        encoded.len(),
                        "consumed mismatch on iteration {iteration}"
                    );
                    assert_eq!(
                        &encoded[payload_start..payload_end],
                        payload.as_slice(),
                        "payload mismatch on iteration {iteration} (size {size})"
                    );
                }
                other => panic!("decode failed on iteration {iteration} (size {size}): {other:?}"),
            }
        }
    }

    /// Property test: every single-bit flip in any non-empty
    /// frame must surface as `ChecksumMismatch` or `BadMagic`
    /// — never as `Ok` with a different payload.
    ///
    /// CRC-32C provides single-bit-flip detection by
    /// construction (the polynomial is chosen for this); this
    /// test pins the property empirically.
    #[test]
    fn property_single_bit_flip_detected() {
        let payloads: &[&[u8]] = &[
            b"a",
            b"hello",
            b"the quick brown fox jumps over the lazy dog",
            &[0xFF; 256],
            &[0; 1024],
        ];
        for payload in payloads {
            let encoded = encode_frame_owned(payload).expect("encode");
            for byte_idx in 0..encoded.len() {
                for bit in 0..8 {
                    let mut corrupted = encoded.clone();
                    corrupted[byte_idx] ^= 1 << bit;
                    match decode_frame(&corrupted) {
                        FrameDecode::Ok {
                            payload_start,
                            payload_end,
                            ..
                        } if &corrupted[payload_start..payload_end] == *payload => {
                            panic!(
                                "single bit flip at byte {byte_idx} bit {bit} \
                                 produced a 'valid' decode with the original payload — \
                                 CRC-32C failed to detect corruption"
                            );
                        }
                        FrameDecode::Ok {
                            payload_start,
                            payload_end,
                            ..
                        } => {
                            // A different payload bytes that decoded
                            // — also a CRC failure (CRC must catch
                            // any single-bit flip).
                            panic!(
                                "single bit flip at byte {byte_idx} bit {bit} \
                                 produced a 'valid' decode with payload {:?}",
                                &corrupted[payload_start..payload_end]
                            );
                        }
                        // BadMagic / Truncated / LengthOverflow /
                        // ChecksumMismatch are all valid outcomes —
                        // the corruption was detected.
                        _ => {}
                    }
                }
            }
        }
    }

    #[test]
    fn forward_iteration_over_concatenated_frames() {
        // The reader's iter pattern: decode a frame, advance by
        // `consumed`, decode the next frame. This test pins the
        // invariant that consecutive frames decode cleanly.
        let mut all = Vec::new();
        let payloads: &[&[u8]] = &[b"first", b"second record", b"", b"fourth!"];
        for p in payloads {
            all.extend_from_slice(&encode_frame_owned(p).unwrap());
        }

        let mut cursor = 0;
        let mut decoded: Vec<&[u8]> = Vec::new();
        while cursor < all.len() {
            match decode_frame(&all[cursor..]) {
                FrameDecode::Ok {
                    consumed,
                    payload_start,
                    payload_end,
                } => {
                    decoded.push(&all[cursor + payload_start..cursor + payload_end]);
                    cursor += consumed;
                }
                other => panic!("frame {} unexpected: {other:?}", decoded.len()),
            }
        }
        assert_eq!(decoded, payloads);
    }
}