obj-core 1.1.0

Storage engine internals for the obj embedded document database (pager, WAL, B-tree, codec, catalog).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
//! B+tree node encode / decode.
//!
//! See `docs/format.md` § B+tree for the on-disk layout. This module
//! is the reference implementation; the tests at the bottom validate
//! round-trip equality and rejection of every documented invariant
//! violation.
//!
//! # Power-of-ten posture
//!
//! - **Rule 2.** Every loop iterates over `key_count`, the slot
//!   directory length, or the page-byte range — all bounded by
//!   `PAGE_SIZE`.
//! - **Rule 3.** The in-memory [`DecodedNode`] uses `Vec`s, not
//!   `heapless::Vec`s of the worst-case slot count: the worst-case
//!   slot count times the worst-case per-key buffer (~1 MiB) would
//!   overflow a 2 MiB stack. The B+tree's *traversal stack* uses
//!   `heapless::Vec` to satisfy Rule 3 on the hot read path; the
//!   *decoded representation* of a single node is a transient
//!   per-page artifact whose heap allocation is unavoidable and is
//!   not on a real-time path.
//!
//!   Phase 7A perf-audit note (extended). The initial Power-of-Ten
//!   audit flagged the three `Vec`s in [`DecodedNode`] (`children`,
//!   `leaves`, `internals`) as a hot-path allocation cost. Inspection
//!   shows the actual cost is far smaller than it looks and a
//!   `heapless`-conversion is not worth shipping. Four reasons:
//!   (1) `Vec::new()` does not allocate — the first `reserve` does —
//!   so a leaf decode allocates exactly ONE outer `Vec` (the `leaves`
//!   spine) and an internal decode allocates TWO (`children` +
//!   `internals`); the unused vecs stay at zero capacity.
//!   (2) Each leaf entry's `key` and `value` are themselves `Vec<u8>`;
//!   for an N-entry leaf that's 2N inner allocations vs. the 1
//!   outer, so converting the outer saves at best 1/(2N+1) on a real
//!   workload (~0.5 % on a 100-entry leaf).
//!   (3) `LEAF_SLOT_CAP ≈ 1017` and `sizeof(LeafEntry) = 48` — a
//!   stack-allocated `heapless::Vec<LeafEntry, LEAF_SLOT_CAP>` adds
//!   ~48 KiB to every [`decode_node`] frame, a stack-overflow hazard
//!   on small-stack targets (embedded, fiber runtimes) and a 48 KiB
//!   zero-write per call under debug + stack-protectors. The
//!   conversion makes the hot path *worse* in latency-sensitive
//!   deployments.
//!   (4) `index_lookup` (warm-cache point read) spends most of its
//!   time in `pager.read_page` (page-cache hash lookup), CRC32C
//!   verification of the page trailer, and the descend stack — not
//!   in `Vec::reserve`. A 100k-leaf `collection_scan` does ~100 000
//!   outer allocs vs. ~50 000 000 inner allocs (~500 bytes/doc with
//!   2 inner `Vec`s per entry); the outer count is a noise floor.
//!
//!   The conclusion: the outer `Vec`s stay. A real perf win for
//!   `decode_node` requires attacking the inner-vec allocations
//!   (e.g. decode-in-place with `&[u8]` slices into the page buffer,
//!   borrowing the pager's page-cache buffer for the iterator's
//!   lifetime), which is a format-aware refactor of the node API
//!   and is out of scope for Phase 7 polish.
//! - **Rule 5.** `validate_node` is the central invariant check; it
//!   is `debug_assert!`-ed at every mutation site. On decode, a
//!   malformed page surfaces as `Error::Corruption` from the per-slot
//!   checks or as `Error::BTreeInvariantViolated` from the validator.
//! - **Rule 7.** No `unwrap` / `expect` on any error-bearing path;
//!   length-prefixed varint reads return `Error::Corruption` on
//!   underflow.

#![forbid(unsafe_code)]

use crate::error::{Error, Result};
use crate::pager::page::{Page, PAGE_SIZE, PAGE_TRAILER_SIZE};

/// Page-type tag for an internal B+tree node. See `docs/format.md`
/// § Page types. Stored in the first byte of the node header.
pub const PAGE_TYPE_BTREE_INTERNAL: u8 = 0x02;
/// Page-type tag for a leaf B+tree node. Stored in the first byte of
/// the node header.
pub const PAGE_TYPE_BTREE_LEAF: u8 = 0x03;

/// Fixed-size B+tree node header. See `docs/format.md` § Node header.
pub const NODE_HEADER_SIZE: usize = 24;

// Field offsets within the node header.
// Page-type tag (one byte).
const OFF_PAGE_TYPE: usize = 0;
// Tree level (one byte).
const OFF_LEVEL: usize = 1;
// Key count, written and read as a u16 LE.
const OFF_KEY_COUNT: usize = 2;
// Heap watermark (`free_space_off`), written as a u32 LE.
const OFF_FREE_SPACE_OFF: usize = 4;
// Right-sibling page-id, written and read as a u64 LE.
const OFF_NEXT_SIBLING: usize = 8;
// Reserved bytes 16..24; the encoder leaves them zero.
const OFF_RESERVED: usize = 16;

/// Bytes per slot in a leaf slot directory. The entry is a single
/// u32 LE giving the byte offset of the corresponding heap entry.
pub const LEAF_SLOT_BYTES: usize = 4;

/// Bytes per slot in an internal slot directory. The entry is a u32
/// LE offset followed by a u64 LE right-child page-id.
pub const INTERNAL_SLOT_BYTES: usize = 12;

/// Bytes the leftmost-child page-id occupies in an internal node,
/// immediately after the node header.
pub const INTERNAL_LEFTMOST_CHILD_BYTES: usize = 8;

/// Byte offset of the first byte after the node header (leaf slot
/// directory start, OR internal leftmost-child).
const PAYLOAD_OFFSET: usize = NODE_HEADER_SIZE;

/// Last byte index (exclusive) of the in-node payload. Beyond this
/// lies the page trailer.
const PAYLOAD_END: usize = PAGE_SIZE - PAGE_TRAILER_SIZE;

/// Maximum number of slots a leaf can hold at the worst-case empty
/// key/value sizes. Used to size the in-memory slot vector.
///
/// This is an upper bound: it divides the whole payload area by the
/// slot size and ignores any heap bytes the entries occupy.
pub const LEAF_SLOT_CAP: usize = (PAYLOAD_END - PAYLOAD_OFFSET) / LEAF_SLOT_BYTES;

/// Maximum number of slots an internal node can hold at the worst
/// case. Used to size the in-memory slot vector.
///
/// Upper bound computed the same way as [`LEAF_SLOT_CAP`], after
/// setting aside the leftmost-child field.
pub const INTERNAL_SLOT_CAP: usize =
    (PAYLOAD_END - PAYLOAD_OFFSET - INTERNAL_LEFTMOST_CHILD_BYTES) / INTERNAL_SLOT_BYTES;

/// Maximum key length the format permits. See `docs/format.md`
/// § Key and value encoding.
///
/// Computed as a quarter of `PAGE_SIZE`. The decode paths in this
/// module reject any stored key longer than this as
/// `Error::Corruption`.
#[must_use]
pub const fn max_key_len() -> usize {
    PAGE_SIZE / 4
}

/// Maximum value length that still fits inline in a leaf alongside
/// at least one slot. See `docs/format.md` § Key and value encoding.
///
/// `key_len` is the length in bytes of the key the value would be
/// stored with. Returns `0` when a key of that size leaves no room
/// for a value.
///
/// Every subtraction saturates, so an arbitrarily large `key_len`
/// (up to and including `usize::MAX`) yields `0` rather than
/// overflowing the intermediate `key_len + varints` sum.
#[must_use]
pub const fn max_inline_value(key_len: usize) -> usize {
    // Available payload: PAGE_SIZE - trailer - node header - one slot
    // dir entry.
    let base = PAYLOAD_END - PAYLOAD_OFFSET - LEAF_SLOT_BYTES;
    // Reserve 9 bytes for each of the heap entry's two length varints.
    // NOTE(review): a full 64-bit LEB128 varint can take 10 bytes; the
    // 9-byte allowance is kept as-is so the computed limit does not
    // change. Lengths bounded by the page size need far fewer bytes.
    let varints = 9 + 9;
    base.saturating_sub(varints).saturating_sub(key_len)
}

/// Whether a node is an internal node or a leaf.
///
/// On disk the kind is carried by the page-type tag in the node
/// header: `PAGE_TYPE_BTREE_INTERNAL` or `PAGE_TYPE_BTREE_LEAF`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeKind {
    /// Internal node — pivots + child pointers, no user values.
    Internal,
    /// Leaf node — `(key, value)` slots.
    Leaf,
}

/// One leaf slot in the decoded in-memory representation.
///
/// Both fields are owned buffers: the leaf decoder copies the key and
/// value bytes out of the page with `to_vec`.
#[derive(Debug, Clone)]
pub struct LeafEntry {
    /// The key bytes.
    pub key: Vec<u8>,
    /// The value bytes.
    pub value: Vec<u8>,
}

/// One internal-node pivot entry in the decoded in-memory
/// representation. The corresponding right child is stored in
/// [`DecodedNode::children`] at index `i + 1`.
///
/// The key is an owned copy of the pivot bytes taken from the page.
#[derive(Debug, Clone)]
pub struct InternalEntry {
    /// The pivot key.
    pub key: Vec<u8>,
}

/// Alias for "one of the right-child page-ids in an internal node".
/// We store children as raw `u64`s; the on-disk format permits `0`
/// only as the leaf `next_sibling` sentinel, never as an internal
/// child pointer. The internal-node decoder rejects a zero child
/// page-id as `Error::Corruption`.
pub type ChildEntry = u64;

/// Decoded in-memory representation of a B+tree node.
///
/// One [`DecodedNode`] is built per `Pager::read_page` of a B+tree
/// page; lifetime is "until the caller is done inspecting the
/// node." See the module-level Rule 3 note for why we use `Vec`
/// rather than `heapless::Vec` for the slot collections.
///
/// Only the collections matching `kind` are populated by the decoder:
/// `leaves` for a leaf, `children` + `internals` for an internal node.
#[derive(Debug, Clone)]
pub struct DecodedNode {
    /// Node kind (internal or leaf).
    pub kind: NodeKind,
    /// Tree level. Leaves are level 0; internals are level ≥ 1.
    pub level: u8,
    /// Right-sibling leaf page-id (leaves only; `0` on the last leaf
    /// and on internal nodes).
    pub next_sibling: u64,
    /// Child page-ids (internal nodes only). Length is
    /// `internals.len() + 1` when the node is internal, `0` for
    /// leaves. Index `0` is the leftmost child; index `i + 1` is the
    /// right child of pivot `internals[i]`.
    pub children: Vec<ChildEntry>,
    /// Leaf slots (leaf nodes only).
    pub leaves: Vec<LeafEntry>,
    /// Internal-node pivots (internal nodes only).
    pub internals: Vec<InternalEntry>,
}

impl DecodedNode {
    /// Payload bytes a fresh encode of this node would consume: the
    /// slot directory plus every length-prefixed heap entry, and for
    /// internal nodes the leftmost-child field as well. Used by the
    /// insert path to decide whether a node has room for one more
    /// entry.
    #[must_use]
    pub fn occupied_bytes(&self) -> usize {
        // Heap footprint of one length-prefixed byte string: the
        // varint length prefix followed by the raw bytes.
        fn prefixed_len(bytes: &[u8]) -> usize {
            varint_len(u64_from_usize(bytes.len())) + bytes.len()
        }

        match self.kind {
            NodeKind::Leaf => {
                let directory = self.leaves.len() * LEAF_SLOT_BYTES;
                let mut heap = 0usize;
                for entry in &self.leaves {
                    heap += prefixed_len(&entry.key) + prefixed_len(&entry.value);
                }
                directory + heap
            }
            NodeKind::Internal => {
                let directory = self.internals.len() * INTERNAL_SLOT_BYTES;
                let mut heap = 0usize;
                for pivot in &self.internals {
                    heap += prefixed_len(&pivot.key);
                }
                INTERNAL_LEFTMOST_CHILD_BYTES + directory + heap
            }
        }
    }

    /// Payload bytes still unused after a fresh encode of this node.
    /// Saturates at `0` when the node already exceeds the payload
    /// area.
    #[must_use]
    pub fn free_bytes(&self) -> usize {
        (PAYLOAD_END - PAYLOAD_OFFSET).saturating_sub(self.occupied_bytes())
    }
}

/// Encode `node` into `page`. Stamps the page trailer.
///
/// # Errors
///
/// Returns [`Error::BTreeInvariantViolated`] if the node's slot count
/// or key/value sizes do not fit in a single page.
pub fn encode_node(node: &DecodedNode, page: &mut Page) -> Result<()> {
    validate_node_release(node)?;
    let buf = page.as_bytes_mut();
    buf.fill(0);
    write_node_header(buf, node);
    match node.kind {
        NodeKind::Leaf => encode_leaf_body(buf, node)?,
        NodeKind::Internal => encode_internal_body(buf, node)?,
    }
    // Stamp the trailer using the pager helper so the page is
    // immediately readable through `Pager::write_page` (which leaves
    // the trailer alone — it expects a stamped page).
    crate::pager::checksum::write_page_trailer(page);
    Ok(())
}

/// Fill in the fixed node header at the start of `buf`: page-type
/// tag, level, key count, heap watermark and next-sibling page-id.
fn write_node_header(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) {
    // Heap footprint of one length-prefixed byte string.
    fn prefixed_len(bytes: &[u8]) -> usize {
        varint_len(u64_from_usize(bytes.len())) + bytes.len()
    }

    // Everything that depends on the node kind, gathered in one place.
    let (tag, slot_count, heap_len) = match node.kind {
        NodeKind::Leaf => (
            PAGE_TYPE_BTREE_LEAF,
            node.leaves.len(),
            node.leaves
                .iter()
                .map(|entry| prefixed_len(&entry.key) + prefixed_len(&entry.value))
                .sum::<usize>(),
        ),
        NodeKind::Internal => (
            PAGE_TYPE_BTREE_INTERNAL,
            node.internals.len(),
            node.internals
                .iter()
                .map(|pivot| prefixed_len(&pivot.key))
                .sum::<usize>(),
        ),
    };

    buf[OFF_PAGE_TYPE] = tag;
    buf[OFF_LEVEL] = node.level;

    // The slot count is bounded by LEAF_SLOT_CAP / INTERNAL_SLOT_CAP at
    // encode time, and both caps fit in a u16 (LEAF_SLOT_CAP =
    // (4092 - 24) / 4 = 1017 < 65535), so the narrowing is safe.
    let count_le = u16_from_usize(slot_count).to_le_bytes();
    buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&count_le);

    // `free_space_off` is the heap watermark — the byte offset of the
    // first heap entry in use. After a compacting encode that is
    // `PAYLOAD_END - heap_len`.
    let watermark_le = u32_from_usize(PAYLOAD_END.saturating_sub(heap_len)).to_le_bytes();
    buf[OFF_FREE_SPACE_OFF..OFF_FREE_SPACE_OFF + 4].copy_from_slice(&watermark_le);

    let sibling_le = node.next_sibling.to_le_bytes();
    buf[OFF_NEXT_SIBLING..OFF_NEXT_SIBLING + 8].copy_from_slice(&sibling_le);

    // The reserved bytes (16..24) stay zero per the spec; naming the
    // constant here keeps it referenced.
    let _ = OFF_RESERVED;
}

fn encode_leaf_body(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) -> Result<()> {
    let mut heap_cursor = PAYLOAD_END;
    let mut slot_off = PAYLOAD_OFFSET;
    for entry in &node.leaves {
        let entry_len = varint_len(u64_from_usize(entry.key.len()))
            + entry.key.len()
            + varint_len(u64_from_usize(entry.value.len()))
            + entry.value.len();
        if heap_cursor < entry_len + slot_off + LEAF_SLOT_BYTES {
            return Err(Error::BTreeInvariantViolated {
                reason: "leaf encode: slot dir and heap collide",
            });
        }
        heap_cursor -= entry_len;
        let mut cur = heap_cursor;
        let kl = u64_from_usize(entry.key.len());
        cur += write_varint(&mut buf[cur..], kl);
        buf[cur..cur + entry.key.len()].copy_from_slice(&entry.key);
        cur += entry.key.len();
        let vl = u64_from_usize(entry.value.len());
        cur += write_varint(&mut buf[cur..], vl);
        buf[cur..cur + entry.value.len()].copy_from_slice(&entry.value);
        let off32 = u32_from_usize(heap_cursor);
        buf[slot_off..slot_off + LEAF_SLOT_BYTES].copy_from_slice(&off32.to_le_bytes());
        slot_off += LEAF_SLOT_BYTES;
    }
    Ok(())
}

/// Write the leftmost child, the internal slot directory and the
/// pivot-key heap into `buf`.
///
/// The leftmost child page-id sits right after the header. Each
/// directory slot holds a u32 LE heap offset followed by the u64 LE
/// right-child page-id; pivot keys are packed downward from
/// `PAYLOAD_END` as `varint(key_len) · key`.
///
/// Returns [`Error::BTreeInvariantViolated`] when the child count is
/// not `pivots + 1` or when the directory and the heap would overlap.
fn encode_internal_body(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) -> Result<()> {
    if node.children.len() != node.internals.len() + 1 {
        return Err(Error::BTreeInvariantViolated {
            reason: "internal node has children.len() != pivots+1",
        });
    }

    // The length check above guarantees at least one child.
    let dir_start = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES;
    buf[PAYLOAD_OFFSET..dir_start].copy_from_slice(&node.children[0].to_le_bytes());

    let mut heap_top = PAYLOAD_END;
    let pivots_with_children = node.internals.iter().zip(&node.children[1..]);
    for (index, (pivot, &right_child)) in pivots_with_children.enumerate() {
        let dir_pos = dir_start + index * INTERNAL_SLOT_BYTES;
        let key = pivot.key.as_slice();
        let key_len = u64_from_usize(key.len());
        let needed = varint_len(key_len) + key.len();

        // The new heap entry must not reach down into this slot.
        if heap_top < needed + dir_pos + INTERNAL_SLOT_BYTES {
            return Err(Error::BTreeInvariantViolated {
                reason: "internal encode: slot dir and heap collide",
            });
        }
        heap_top -= needed;

        // Heap entry: length-prefixed pivot key.
        let key_at = heap_top + write_varint(&mut buf[heap_top..], key_len);
        buf[key_at..key_at + key.len()].copy_from_slice(key);

        // Directory slot: heap offset, then the right-child page-id.
        let child_pos = dir_pos + 4;
        buf[dir_pos..child_pos].copy_from_slice(&u32_from_usize(heap_top).to_le_bytes());
        buf[child_pos..dir_pos + INTERNAL_SLOT_BYTES].copy_from_slice(&right_child.to_le_bytes());
    }
    Ok(())
}

/// Decode a B+tree page into a [`DecodedNode`].
///
/// # Errors
///
/// Returns [`Error::Corruption`] if any field is malformed: bad
/// page-type tag, key-count exceeding the slot cap, slot offset
/// outside the page, varint length running past the page end, or
/// keys not in ascending order.
///
/// # Perf note (Phase 7A audit)
///
/// The three `Vec`s on [`DecodedNode`] (`children`, `leaves`,
/// `internals`) stay heap-allocated rather than `heapless::Vec`-
/// backed. See the module-level Rule 3 note for the full
/// allocator-vs-stack-frame analysis; in summary, the outer-vec
/// cost is one allocation per leaf / two per internal — dwarfed
/// by the 2N inner `Vec<u8>` allocations for the entry keys and
/// values. The inner allocations are the real hot-path tax and
/// require a borrowed-slice refactor of the node API to remove.
pub fn decode_node(buf: &[u8; PAGE_SIZE]) -> Result<DecodedNode> {
    let page_type = buf[OFF_PAGE_TYPE];
    let kind = match page_type {
        PAGE_TYPE_BTREE_LEAF => NodeKind::Leaf,
        PAGE_TYPE_BTREE_INTERNAL => NodeKind::Internal,
        _ => return Err(Error::Corruption { page_id: 0 }),
    };
    let level = buf[OFF_LEVEL];
    let key_count = u16::from_le_bytes([buf[OFF_KEY_COUNT], buf[OFF_KEY_COUNT + 1]]) as usize;
    let next_sibling = u64::from_le_bytes([
        buf[OFF_NEXT_SIBLING],
        buf[OFF_NEXT_SIBLING + 1],
        buf[OFF_NEXT_SIBLING + 2],
        buf[OFF_NEXT_SIBLING + 3],
        buf[OFF_NEXT_SIBLING + 4],
        buf[OFF_NEXT_SIBLING + 5],
        buf[OFF_NEXT_SIBLING + 6],
        buf[OFF_NEXT_SIBLING + 7],
    ]);
    let mut node = DecodedNode {
        kind,
        level,
        next_sibling,
        children: Vec::new(),
        leaves: Vec::new(),
        internals: Vec::new(),
    };
    match kind {
        NodeKind::Leaf => decode_leaf_body(buf, &mut node, key_count)?,
        NodeKind::Internal => decode_internal_body(buf, &mut node, key_count)?,
    }
    validate_node_release(&node)?;
    Ok(node)
}

/// Read only the node-kind tag from a B+tree page header.
///
/// O(1), allocation-free. Used by the snapshot read path to decide
/// whether to descend (full [`decode_node`] for an internal node, so
/// `choose_child` has the pivots) or to copy out a single value via
/// [`decode_node_find`] (a leaf). A malformed page-type tag surfaces
/// as [`Error::Corruption`] identically to [`decode_node`].
///
/// # Errors
///
/// [`Error::Corruption`] if the page-type tag is neither leaf nor
/// internal.
pub fn peek_node_kind(buf: &[u8; PAGE_SIZE]) -> Result<NodeKind> {
    match buf[OFF_PAGE_TYPE] {
        PAGE_TYPE_BTREE_LEAF => Ok(NodeKind::Leaf),
        PAGE_TYPE_BTREE_INTERNAL => Ok(NodeKind::Internal),
        _ => Err(Error::Corruption { page_id: 0 }),
    }
}

/// Find-only leaf decode: scan a leaf page for `target` and copy out
/// ONLY the matched value as an owned `Vec<u8>` — never the whole
/// 2N-entry node.
///
/// Returns `Ok(Some(value))` when a slot's key equals `target`, and
/// `Ok(None)` when every slot was scanned without a match.
///
/// This is the read-path fast lane for [`crate::btree::BTree::get`] /
/// `get_via_snapshot`. The page must be a leaf; an internal-node tag
/// is rejected as [`Error::BTreeInvariantViolated`] (a read never
/// calls this on an internal node — `get_via_snapshot` gates on
/// [`peek_node_kind`] and `BTree::get` descends to a leaf first).
///
/// # Safety / corruption posture
///
/// This path keeps EVERY per-slot integrity check that [`decode_node`]
/// applies to the slots it actually reads: the leaf header invariants
/// (`level == 0`, `key_count <= LEAF_SLOT_CAP`), the slot-offset range
/// check, `read_length_prefixed`'s `checked_add` overflow guard and
/// payload-end bound, the `key.len() <= max_key_len()` cap, and the
/// inline strictly-ascending-key check. A tampered slot that the scan
/// visits therefore yields [`Error::Corruption`] identically to the
/// full decode. The scan returns at the first matching key, so slots
/// AFTER the match are not examined. It also skips the whole-node
/// `windows(2)` ordering re-walk in `validate_node_release`, which is
/// redundant for the visited slots because the inline ascending check
/// is re-derived on every key the scan compares. Offsets read from
/// the page are range-checked before use, and heap bytes are only
/// reached through `read_length_prefixed`'s checked slicing.
///
/// # Errors
///
/// [`Error::Corruption`] on an unknown page-type tag, a key count
/// above `LEAF_SLOT_CAP`, or any malformed slot or varint;
/// [`Error::BTreeInvariantViolated`] if the page is an internal node
/// or the leaf header has a non-zero level.
pub fn decode_node_find(buf: &[u8; PAGE_SIZE], target: &[u8]) -> Result<Option<Vec<u8>>> {
    match peek_node_kind(buf)? {
        NodeKind::Leaf => {}
        NodeKind::Internal => {
            return Err(Error::BTreeInvariantViolated {
                reason: "decode_node_find called on an internal node",
            });
        }
    }
    // Leaf structural header invariants (the O(1) subset of the full
    // validate that the inline scan does not cover).
    if buf[OFF_LEVEL] != 0 {
        return Err(Error::BTreeInvariantViolated {
            reason: "leaf has non-zero level",
        });
    }
    let key_count = u16::from_le_bytes([buf[OFF_KEY_COUNT], buf[OFF_KEY_COUNT + 1]]) as usize;
    if key_count > LEAF_SLOT_CAP {
        return Err(Error::Corruption { page_id: 0 });
    }
    find_leaf_value(buf, key_count, target)
}

/// Linear scan of a validated-header leaf for `target`. Applies the
/// identical per-slot bounds / varint / ascending-key checks as
/// [`decode_leaf_body`] on every slot it visits, comparing keys as
/// borrowed `&[u8]` slices into `buf` — no per-entry `to_vec`.
///
/// `key_count` is the slot count taken from the page header; the
/// caller has already bounded it by `LEAF_SLOT_CAP`.
///
/// Returns `Ok(Some(value))` with an owned copy of the value for the
/// first slot whose key equals `target`, `Ok(None)` when no slot
/// matches, and `Error::Corruption` when a visited slot is malformed.
fn find_leaf_value(
    buf: &[u8; PAGE_SIZE],
    key_count: usize,
    target: &[u8],
) -> Result<Option<Vec<u8>>> {
    // Key of the previously visited slot, borrowed from `buf`.
    let mut prev_key: Option<&[u8]> = None;
    for i in 0..key_count {
        let slot_off = PAYLOAD_OFFSET + i * LEAF_SLOT_BYTES;
        if slot_off + LEAF_SLOT_BYTES > PAYLOAD_END {
            return Err(Error::Corruption { page_id: 0 });
        }
        // The slot holds the u32 LE offset of its heap entry.
        let entry_off = u32::from_le_bytes([
            buf[slot_off],
            buf[slot_off + 1],
            buf[slot_off + 2],
            buf[slot_off + 3],
        ]) as usize;
        if !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
            return Err(Error::Corruption { page_id: 0 });
        }
        // The value is parsed even on a non-matching slot so that its
        // length prefix is bounds-checked too.
        let (key, after_key) = read_length_prefixed(buf, entry_off)?;
        let (value, _) = read_length_prefixed(buf, after_key)?;
        if key.len() > max_key_len() {
            return Err(Error::Corruption { page_id: 0 });
        }
        // Identical inline strictly-ascending check as decode_leaf_body.
        if let Some(prev) = prev_key {
            if key <= prev {
                return Err(Error::Corruption { page_id: 0 });
            }
        }
        // Copy out ONLY the matched value and stop. Slots before the
        // match were fully checked above; slots after it are not
        // visited, so their checks do not run on a hit.
        if key == target {
            return Ok(Some(value.to_vec()));
        }
        prev_key = Some(key);
    }
    Ok(None)
}

fn decode_leaf_body(buf: &[u8; PAGE_SIZE], node: &mut DecodedNode, key_count: usize) -> Result<()> {
    if key_count > LEAF_SLOT_CAP {
        return Err(Error::Corruption { page_id: 0 });
    }
    node.leaves.reserve(key_count);
    for i in 0..key_count {
        let slot_off = PAYLOAD_OFFSET + i * LEAF_SLOT_BYTES;
        if slot_off + LEAF_SLOT_BYTES > PAYLOAD_END {
            return Err(Error::Corruption { page_id: 0 });
        }
        let entry_off = u32::from_le_bytes([
            buf[slot_off],
            buf[slot_off + 1],
            buf[slot_off + 2],
            buf[slot_off + 3],
        ]) as usize;
        if !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
            return Err(Error::Corruption { page_id: 0 });
        }
        let (key, after_key) = read_length_prefixed(buf, entry_off)?;
        let (value, _) = read_length_prefixed(buf, after_key)?;
        if key.len() > max_key_len() {
            return Err(Error::Corruption { page_id: 0 });
        }
        // Inline strictly-ascending check against the last key already
        // in the vec — no per-iteration `prev_key` clone needed.
        if let Some(prev) = node.leaves.last() {
            if key <= prev.key.as_slice() {
                return Err(Error::Corruption { page_id: 0 });
            }
        }
        node.leaves.push(LeafEntry {
            key: key.to_vec(),
            value: value.to_vec(),
        });
    }
    Ok(())
}

/// Decode the leftmost child and `key_count` pivot slots from `buf`
/// into `node.children` / `node.internals`.
///
/// A zero leftmost child, a key count above `INTERNAL_SLOT_CAP`, a
/// malformed slot, or pivots that are not strictly ascending are all
/// reported as `Error::Corruption`.
fn decode_internal_body(
    buf: &[u8; PAGE_SIZE],
    node: &mut DecodedNode,
    key_count: usize,
) -> Result<()> {
    if key_count > INTERNAL_SLOT_CAP {
        return Err(Error::Corruption { page_id: 0 });
    }

    // The leftmost child sits directly after the node header.
    let first_child = read_u64(buf, PAYLOAD_OFFSET);
    if first_child == 0 {
        return Err(Error::Corruption { page_id: 0 });
    }

    node.children.reserve(key_count + 1);
    node.internals.reserve(key_count);
    node.children.push(first_child);

    for slot in 0..key_count {
        let (pivot, child) = decode_internal_slot(buf, slot)?;
        // Strictly-ascending check against the pivot pushed last; no
        // separate copy of the previous key is needed.
        match node.internals.last() {
            Some(prev) if prev.key.as_slice() >= pivot.as_slice() => {
                return Err(Error::Corruption { page_id: 0 });
            }
            _ => {}
        }
        node.internals.push(InternalEntry { key: pivot });
        node.children.push(child);
    }
    Ok(())
}

/// Decode slot `i` of an internal-node body.
///
/// Returns an owned copy of the pivot key together with the
/// right-child page-id. Yields `Error::Corruption` when the slot lies
/// outside the payload, the child page-id is zero, the heap offset is
/// out of range, the key's length prefix is malformed, or the key is
/// longer than `max_key_len()`.
fn decode_internal_slot(buf: &[u8; PAGE_SIZE], i: usize) -> Result<(Vec<u8>, u64)> {
    let slot_off = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES + i * INTERNAL_SLOT_BYTES;
    if slot_off + INTERNAL_SLOT_BYTES > PAYLOAD_END {
        return Err(Error::Corruption { page_id: 0 });
    }

    // Slot layout: u32 LE heap offset, then u64 LE right-child page-id.
    let mut raw_off = [0u8; 4];
    raw_off.copy_from_slice(&buf[slot_off..slot_off + 4]);
    let key_pos = u32::from_le_bytes(raw_off) as usize;
    let child = read_u64(buf, slot_off + 4);

    if child == 0 {
        return Err(Error::Corruption { page_id: 0 });
    }
    if key_pos < PAYLOAD_OFFSET || key_pos >= PAYLOAD_END {
        return Err(Error::Corruption { page_id: 0 });
    }

    let (key, _) = read_length_prefixed(buf, key_pos)?;
    if key.len() > max_key_len() {
        return Err(Error::Corruption { page_id: 0 });
    }
    Ok((key.to_vec(), child))
}

/// Decode the little-endian `u64` stored in the 8 bytes of `buf`
/// starting at `offset`.
///
/// Panics if `offset + 8` is out of bounds; callers must validate the
/// offset first.
fn read_u64(buf: &[u8; PAGE_SIZE], offset: usize) -> u64 {
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&buf[offset..offset + 8]);
    u64::from_le_bytes(raw)
}

/// Read a `(length-prefixed bytes, next_offset)` pair from `buf`
/// starting at `offset`. Length is an unsigned LEB128 varint; the
/// payload is `length` raw bytes.
///
/// # Power-of-ten Rule 7 posture
///
/// `len` is caller-controlled (it lives on disk in a page byte that
/// can be tampered with or random in a fuzz harness). The
/// `after + len_usize` end-of-slice computation MUST use
/// `checked_add`: with `overflow-checks = true` in release (per
/// Rule 10) a raw `+` panics on a wrap; without overflow-checks the
/// bounds check is silently bypassed. Either failure mode is a
/// denial-of-service hazard reachable from
/// `Pager::read_page → BTree::open → decode_node` on any tampered
/// B+tree page (M13 #115).
fn read_length_prefixed(buf: &[u8; PAGE_SIZE], offset: usize) -> Result<(&[u8], usize)> {
    if offset >= PAYLOAD_END {
        return Err(Error::Corruption { page_id: 0 });
    }
    let (len, after) = read_varint(buf, offset)?;
    let len_usize = usize_from_u64(len)?;
    // Rule 7: `len_usize` is unbounded (varint domain is u64); raw
    // `after + len_usize` wraps on tampered input.
    let end = after
        .checked_add(len_usize)
        .ok_or(Error::Corruption { page_id: 0 })?;
    if end > PAYLOAD_END {
        return Err(Error::Corruption { page_id: 0 });
    }
    Ok((&buf[after..end], end))
}

/// Validate `node` against the format-version-0 invariants.
///
/// This is the public entry point; it forwards to
/// `validate_node_release`, which runs the structural checks followed
/// by the strictly-ascending key-ordering check.
///
/// # Errors
///
/// Returns [`Error::BTreeInvariantViolated`] if any invariant fails.
/// Callers can `debug_assert!` on a wrapping helper for development
/// builds; release builds get the error surfaced cleanly.
pub fn validate_node(node: &DecodedNode) -> Result<()> {
    validate_node_release(node)
}

/// Full node validation: the structural invariants first, then — only
/// if those hold — the strictly-ascending key-ordering check. The
/// first failure is returned unchanged.
fn validate_node_release(node: &DecodedNode) -> Result<()> {
    validate_node_structural(node).and_then(|()| validate_node_ordering(node))
}

/// The cheap O(1) header / capacity / children invariants that the
/// inline decode loop does *not* enforce: leaf `level == 0`, internal
/// `level != 0`, internal `next_sibling == 0`,
/// `children.len() == pivots + 1`, slot-cap bounds, and non-zero
/// internal child page-ids.
///
/// This is the only part of the full validate that the read find path
/// needs: the strictly-ascending key ordering ([`validate_node_ordering`])
/// is enforced inline during decode, so the find path — which already
/// re-derives the same ordering on every slot it touches — does not
/// repeat the whole-node `windows(2)` re-walk.
fn validate_node_structural(node: &DecodedNode) -> Result<()> {
    match node.kind {
        NodeKind::Leaf => {
            if node.level != 0 {
                return Err(Error::BTreeInvariantViolated {
                    reason: "leaf has non-zero level",
                });
            }
            if node.leaves.len() > LEAF_SLOT_CAP {
                return Err(Error::BTreeInvariantViolated {
                    reason: "leaf key count exceeds slot cap",
                });
            }
        }
        NodeKind::Internal => {
            if node.level == 0 {
                return Err(Error::BTreeInvariantViolated {
                    reason: "internal node at level 0",
                });
            }
            if node.internals.len() > INTERNAL_SLOT_CAP {
                return Err(Error::BTreeInvariantViolated {
                    reason: "internal key count exceeds slot cap",
                });
            }
            if node.children.len() != node.internals.len() + 1 {
                return Err(Error::BTreeInvariantViolated {
                    reason: "internal children.len() != pivots+1",
                });
            }
            for c in &node.children {
                if *c == 0 {
                    return Err(Error::BTreeInvariantViolated {
                        reason: "internal node has zero child page-id",
                    });
                }
            }
            if node.next_sibling != 0 {
                return Err(Error::BTreeInvariantViolated {
                    reason: "internal node has non-zero next_sibling",
                });
            }
        }
    }
    Ok(())
}

/// The strictly-ascending key-ordering re-walk. On the full decode and
/// every mutation/integrity caller this confirms the whole node is
/// sorted; the read find path skips it because [`decode_leaf_body`] /
/// [`decode_internal_body`] enforce the identical inline check on every
/// key as it is pushed (node.rs leaf/internal `last()` compares).
fn validate_node_ordering(node: &DecodedNode) -> Result<()> {
    match node.kind {
        NodeKind::Leaf => {
            for w in node.leaves.windows(2) {
                if w[0].key.as_slice() >= w[1].key.as_slice() {
                    return Err(Error::BTreeInvariantViolated {
                        reason: "leaf keys not strictly sorted",
                    });
                }
            }
        }
        NodeKind::Internal => {
            for w in node.internals.windows(2) {
                if w[0].key.as_slice() >= w[1].key.as_slice() {
                    return Err(Error::BTreeInvariantViolated {
                        reason: "internal keys not strictly sorted",
                    });
                }
            }
        }
    }
    Ok(())
}

// --------- varint helpers ---------

/// Encoded size, in bytes, of `v` as an unsigned LEB128 varint.
///
/// Every value takes at least one byte; each additional byte carries
/// seven more bits, so the result is between 1 and 10.
#[must_use]
pub fn varint_len(v: u64) -> usize {
    let mut remaining = v;
    let mut bytes: usize = 1;
    loop {
        // Drop the seven bits the current byte accounts for.
        remaining >>= 7;
        if remaining == 0 {
            return bytes;
        }
        bytes += 1;
    }
}

/// Encode `v` as an unsigned LEB128 varint at the front of `dst` and
/// return how many bytes were written.
///
/// # Panics
///
/// Panics if `dst` is too short to hold the encoding.
fn write_varint(dst: &mut [u8], mut v: u64) -> usize {
    let mut written = 0;
    // Every byte except the last carries the continuation bit.
    while v >= 0x80 {
        dst[written] = (v & 0x7F) as u8 | 0x80;
        v >>= 7;
        written += 1;
    }
    // Final byte: the remaining (< 0x80) bits, continuation bit clear.
    dst[written] = (v & 0x7F) as u8;
    written + 1
}

/// Read an unsigned LEB128 varint from `buf` starting at `offset`.
/// Returns `(value, next_offset)`, where `next_offset` is the index of
/// the first byte after the varint.
///
/// Power-of-ten Rule 2: the loop is bounded by 10 bytes
/// (`ceil(64 / 7)`), which is the maximum a 64-bit varint can occupy.
///
/// # Errors
///
/// Returns `Error::Corruption` when:
///
/// * the varint would read at or past `PAYLOAD_END`;
/// * the continuation bit is still set on the 10th byte;
/// * a byte carries payload bits that do not fit in a `u64` (only
///   possible on the 10th byte, where just the lowest bit is
///   representable). Such input used to decode to a silently
///   truncated value; since the bytes come from an on-disk page it is
///   now reported as corruption instead.
fn read_varint(buf: &[u8; PAGE_SIZE], offset: usize) -> Result<(u64, usize)> {
    let mut value: u64 = 0;
    let mut i = offset;
    // Shifts 0, 7, ..., 63 — exactly one per possible varint byte, so
    // `shift` is always a valid shift amount for a u64.
    for shift in (0..64u32).step_by(7) {
        if i >= PAYLOAD_END {
            return Err(Error::Corruption { page_id: 0 });
        }
        let byte = buf[i];
        i += 1;
        let chunk = u64::from(byte & 0x7F);
        // Reject payload bits that would be shifted out of the u64
        // rather than dropping them on the floor.
        if (chunk << shift) >> shift != chunk {
            return Err(Error::Corruption { page_id: 0 });
        }
        value |= chunk << shift;
        if (byte & 0x80) == 0 {
            return Ok((value, i));
        }
    }
    // Ten bytes consumed and the continuation bit was still set.
    Err(Error::Corruption { page_id: 0 })
}

// --------- narrow casts ---------

/// Narrow a `usize` to `u32`.
///
/// Every caller passes a value below `2^32` (`PAYLOAD_END < 2^32`);
/// debug builds assert that contract. In release builds an
/// out-of-range input saturates to `u32::MAX`, a too-large value that
/// an encoder cross-check (`decode_node`) would catch on read.
fn u32_from_usize(v: usize) -> u32 {
    let narrowed = u32::try_from(v);
    // The asserted bound is the binding contract; the saturation below
    // is only the release-mode fallback.
    debug_assert!(narrowed.is_ok());
    narrowed.unwrap_or(u32::MAX)
}

/// Narrow a `usize` to `u16`.
///
/// Debug builds assert that `v` fits; release builds saturate an
/// out-of-range input to `u16::MAX`.
fn u16_from_usize(v: usize) -> u16 {
    let narrowed = u16::try_from(v);
    debug_assert!(narrowed.is_ok());
    narrowed.unwrap_or(u16::MAX)
}

/// Widen a `usize` to `u64`.
///
/// `usize` is at most 64 bits wide on every target Rust currently
/// supports, so the conversion is lossless there and the saturating
/// fallback is never taken.
fn u64_from_usize(v: usize) -> u64 {
    u64::try_from(v).unwrap_or(u64::MAX)
}

fn usize_from_u64(v: u64) -> Result<usize> {
    usize::try_from(v).map_err(|_| Error::Corruption { page_id: 0 })
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a leaf node at level 0 with no entries, no children and
    /// no next sibling.
    fn empty_leaf() -> DecodedNode {
        DecodedNode {
            kind: NodeKind::Leaf,
            level: 0,
            next_sibling: 0,
            children: Vec::new(),
            leaves: Vec::new(),
            internals: Vec::new(),
        }
    }

    /// Build a leaf holding `entries` as `(key, value)` pairs, in the
    /// order given (no sorting is applied here).
    fn leaf_with(entries: &[(&[u8], &[u8])]) -> DecodedNode {
        let mut leaf = empty_leaf();
        for (k, v) in entries {
            leaf.leaves.push(LeafEntry {
                key: k.to_vec(),
                value: v.to_vec(),
            });
        }
        leaf
    }

    /// An empty leaf survives encode → decode with its header fields
    /// intact and no entries.
    #[test]
    fn round_trip_empty_leaf() {
        let leaf = empty_leaf();
        let mut page = Page::zeroed();
        encode_node(&leaf, &mut page).expect("encode");
        let decoded = decode_node(page.as_bytes()).expect("decode");
        assert_eq!(decoded.kind, NodeKind::Leaf);
        assert_eq!(decoded.level, 0);
        assert_eq!(decoded.next_sibling, 0);
        assert!(decoded.leaves.is_empty());
    }

    /// A three-entry leaf survives encode → decode with keys and
    /// values preserved in order.
    #[test]
    fn round_trip_populated_leaf() {
        let leaf = leaf_with(&[(b"alpha", b"AAA"), (b"bravo", b"BBB"), (b"charlie", b"CCC")]);
        let mut page = Page::zeroed();
        encode_node(&leaf, &mut page).expect("encode");
        let decoded = decode_node(page.as_bytes()).expect("decode");
        assert_eq!(decoded.leaves.len(), 3);
        assert_eq!(decoded.leaves[0].key.as_slice(), b"alpha");
        assert_eq!(decoded.leaves[0].value.as_slice(), b"AAA");
        assert_eq!(decoded.leaves[2].key.as_slice(), b"charlie");
    }

    /// A level-1 internal node with three pivots and four children
    /// survives encode → decode.
    #[test]
    fn round_trip_internal() {
        let mut internal = DecodedNode {
            kind: NodeKind::Internal,
            level: 1,
            next_sibling: 0,
            children: Vec::new(),
            leaves: Vec::new(),
            internals: Vec::new(),
        };
        // 3 pivots → 4 children.
        for raw in [10u64, 20, 30, 40] {
            internal.children.push(raw);
        }
        for k in [b"d".as_slice(), b"h", b"m"] {
            internal.internals.push(InternalEntry { key: k.to_vec() });
        }
        let mut page = Page::zeroed();
        encode_node(&internal, &mut page).expect("encode");
        let decoded = decode_node(page.as_bytes()).expect("decode");
        assert_eq!(decoded.kind, NodeKind::Internal);
        assert_eq!(decoded.level, 1);
        assert_eq!(decoded.internals.len(), 3);
        assert_eq!(decoded.children.as_slice(), &[10, 20, 30, 40]);
        assert_eq!(decoded.internals[0].key.as_slice(), b"d");
        assert_eq!(decoded.internals[2].key.as_slice(), b"m");
    }

    /// A hand-built leaf whose two keys are in descending order must
    /// be rejected by the full decode with `Error::Corruption`.
    #[test]
    fn decode_rejects_unsorted_leaf() {
        // Encode a deliberately-malformed leaf by hand: write two
        // out-of-order entries and check decode errors.
        let mut page = Page::zeroed();
        let buf = page.as_bytes_mut();
        buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
        buf[OFF_LEVEL] = 0;
        buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&2u16.to_le_bytes());
        // Slot 0 points at PAYLOAD_END - 4 ("c" + "x").
        // Slot 1 points at PAYLOAD_END - 8 ("a" + "y").
        let slot0 = u32_from_usize(PAYLOAD_END - 4);
        let slot1 = u32_from_usize(PAYLOAD_END - 8);
        buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&slot0.to_le_bytes());
        buf[PAYLOAD_OFFSET + 4..PAYLOAD_OFFSET + 8].copy_from_slice(&slot1.to_le_bytes());
        // Heap entry for slot 0: key="c", value="x"  -> 1,'c',1,'x'.
        buf[PAYLOAD_END - 4] = 1;
        buf[PAYLOAD_END - 3] = b'c';
        buf[PAYLOAD_END - 2] = 1;
        buf[PAYLOAD_END - 1] = b'x';
        // Heap entry for slot 1: key="a", value="y"  -> 1,'a',1,'y'.
        buf[PAYLOAD_END - 8] = 1;
        buf[PAYLOAD_END - 7] = b'a';
        buf[PAYLOAD_END - 6] = 1;
        buf[PAYLOAD_END - 5] = b'y';
        crate::pager::checksum::write_page_trailer(&mut page);
        let err = decode_node(page.as_bytes()).expect_err("decode must reject");
        assert!(matches!(err, Error::Corruption { .. }));
    }

    /// `write_varint` / `read_varint` round-trip a spread of values
    /// (including 0, the 1→2 byte boundary and `u64::MAX`), and the
    /// written length agrees with `varint_len`.
    #[test]
    fn varint_round_trip() {
        for v in [0u64, 1, 127, 128, 0xFFFF, 0xDEAD_BEEF, u64::MAX] {
            let mut buf = [0u8; 16];
            let n = write_varint(&mut buf, v);
            assert_eq!(n, varint_len(v));
            // Read back using a faux PAGE_SIZE buffer.
            let mut page = [0u8; PAGE_SIZE];
            page[..n].copy_from_slice(&buf[..n]);
            let (decoded, after) = read_varint(&page, 0).expect("decode varint");
            assert_eq!(decoded, v);
            assert_eq!(after, n);
        }
    }

    /// M13 #115 regression: a tampered B+tree leaf carrying a
    /// length-prefix varint that encodes `u64::MAX` must return
    /// `Error::Corruption`, NOT panic in `read_length_prefixed` on
    /// the `after + len_usize` overflow.
    ///
    /// Critical: this test must also pass under `cargo test --release`
    /// — the workspace profile enables `overflow-checks = true` per
    /// power-of-ten Rule 10, which is the mechanism that turned the
    /// arithmetic wrap into a panic in the first place. A `checked_add`
    /// fix is the only thing that makes both debug and release happy.
    #[test]
    fn decode_node_varint_max_returns_corruption_not_panic() {
        let mut page = Page::zeroed();
        {
            let buf = page.as_bytes_mut();
            buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
            buf[OFF_LEVEL] = 0;
            // One slot. The varint at the slot's entry-offset will
            // encode `u64::MAX`, which `read_length_prefixed` must
            // reject rather than wrap.
            buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&1u16.to_le_bytes());
            // Place the heap entry near the end of the payload. The
            // bug triggers BEFORE the bounds check, so the absolute
            // offset value only matters to the extent that it points
            // somewhere parseable.
            let entry_off = PAYLOAD_END - 16;
            let slot_off_bytes = u32_from_usize(entry_off).to_le_bytes();
            buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&slot_off_bytes);
            // 10-byte LEB128 encoding of `u64::MAX`. The 10th byte's
            // top bit is clear so `read_varint` accepts it; the
            // decoded length is `u64::MAX`, which converts to
            // `usize::MAX` on 64-bit targets and would overflow the
            // `after + len` sum in `read_length_prefixed` if that sum
            // were not checked.
            let varint = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01];
            buf[entry_off..entry_off + varint.len()].copy_from_slice(&varint);
        }
        crate::pager::checksum::write_page_trailer(&mut page);
        let result = decode_node(page.as_bytes());
        match result {
            Err(Error::Corruption { .. }) => {}
            // Any other outcome is a failure; report only whether it
            // was an `Ok` or a different error variant.
            other => panic!(
                "expected Err(Error::Corruption), got {:?} \
                 (a panic here would indicate the M13 #115 bug \
                  is still present in release builds)",
                other.map_or("non-corruption err", |_| "Ok(_)")
            ),
        }
    }

    /// M13 #115 regression for the internal-node variant: a tampered
    /// internal-node entry pointing at a varint that encodes
    /// `u64::MAX` must surface as `Error::Corruption`, not panic via
    /// `decode_internal_slot → read_length_prefixed`.
    #[test]
    fn decode_internal_node_varint_max_returns_corruption() {
        let mut page = Page::zeroed();
        {
            let buf = page.as_bytes_mut();
            buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_INTERNAL;
            buf[OFF_LEVEL] = 1;
            buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&1u16.to_le_bytes());
            // Leftmost child page-id — must be non-zero.
            buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 8].copy_from_slice(&42u64.to_le_bytes());
            // One internal slot: u32 entry_off + u64 right_child.
            let slot_base = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES;
            let entry_off = PAYLOAD_END - 16;
            buf[slot_base..slot_base + 4].copy_from_slice(&u32_from_usize(entry_off).to_le_bytes());
            buf[slot_base + 4..slot_base + INTERNAL_SLOT_BYTES]
                .copy_from_slice(&7u64.to_le_bytes());
            // Same `u64::MAX` varint as the leaf test above.
            let varint = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01];
            buf[entry_off..entry_off + varint.len()].copy_from_slice(&varint);
        }
        crate::pager::checksum::write_page_trailer(&mut page);
        let result = decode_node(page.as_bytes());
        assert!(
            matches!(result, Err(Error::Corruption { .. })),
            "expected Err(Error::Corruption) on u64::MAX varint in internal slot"
        );
    }

    /// For an 8-byte key, a single entry carrying a value of
    /// `max_inline_value(8)` bytes — both length prefixes, key, value
    /// and its slot — fits inside the page payload area.
    #[test]
    fn max_inline_value_is_below_payload() {
        let k = 8;
        let v = max_inline_value(k);
        // Compute slot+heap bytes for one entry at this max value
        // length and assert it fits.
        let entry_len = varint_len(u64_from_usize(k)) + k + varint_len(u64_from_usize(v)) + v;
        let payload = entry_len + LEAF_SLOT_BYTES;
        assert!(payload <= PAYLOAD_END - PAYLOAD_OFFSET);
    }

    // ---- find-only path (#82) ----

    /// The find-only path returns the matched value and `None` for a
    /// miss, agreeing with a full decode.
    #[test]
    fn decode_node_find_matches_full_decode() {
        let leaf = leaf_with(&[(b"alpha", b"AAA"), (b"bravo", b"BBB"), (b"charlie", b"CCC")]);
        let mut page = Page::zeroed();
        encode_node(&leaf, &mut page).expect("encode");
        let found = decode_node_find(page.as_bytes(), b"bravo").expect("find");
        assert_eq!(found.as_deref(), Some(b"BBB".as_slice()));
        let missing = decode_node_find(page.as_bytes(), b"zeta").expect("find");
        assert_eq!(missing, None);
        // First and last keys, exercising both scan ends.
        assert_eq!(
            decode_node_find(page.as_bytes(), b"alpha")
                .expect("find")
                .as_deref(),
            Some(b"AAA".as_slice())
        );
        assert_eq!(
            decode_node_find(page.as_bytes(), b"charlie")
                .expect("find")
                .as_deref(),
            Some(b"CCC".as_slice())
        );
    }

    /// #82 safety regression (i): a tampered slot offset pointing
    /// outside the payload range must surface `Error::Corruption`
    /// THROUGH THE FIND PATH, not only through the full decode. The
    /// search targets `"b"`, which forces the scan to read the damaged
    /// slot 0 first.
    #[test]
    fn decode_node_find_rejects_corrupted_slot_offset() {
        let leaf = leaf_with(&[(b"a", b"x"), (b"b", b"y")]);
        let mut page = Page::zeroed();
        encode_node(&leaf, &mut page).expect("encode");
        let buf = page.as_bytes_mut();
        // Point slot 0's offset past the payload end → out of range.
        let bad = u32_from_usize(PAYLOAD_END + 4);
        buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&bad.to_le_bytes());
        // Re-stamp the trailer after tampering with the page bytes.
        crate::pager::checksum::write_page_trailer(&mut page);
        let r = decode_node_find(page.as_bytes(), b"b");
        assert!(
            matches!(r, Err(Error::Corruption { .. })),
            "corrupted slot offset not caught by find path: {r:?}"
        );
    }

    /// #82 safety regression (ii): two out-of-order keys must surface
    /// `Error::Corruption` THROUGH THE FIND PATH. Built by hand since
    /// `encode_node` would reject a descending leaf; the search target
    /// `"z"` (a miss) forces the full scan to reach the descending
    /// pair at slot 1.
    #[test]
    fn decode_node_find_rejects_out_of_order_keys() {
        let mut page = Page::zeroed();
        {
            let buf = page.as_bytes_mut();
            buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
            buf[OFF_LEVEL] = 0;
            buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&2u16.to_le_bytes());
            // Slot 0 -> ("c","x"); slot 1 -> ("a","y") — descending.
            let slot0 = u32_from_usize(PAYLOAD_END - 4);
            let slot1 = u32_from_usize(PAYLOAD_END - 8);
            buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4].copy_from_slice(&slot0.to_le_bytes());
            buf[PAYLOAD_OFFSET + 4..PAYLOAD_OFFSET + 8].copy_from_slice(&slot1.to_le_bytes());
            // Heap entry for slot 0: 1,'c',1,'x'.
            buf[PAYLOAD_END - 4] = 1;
            buf[PAYLOAD_END - 3] = b'c';
            buf[PAYLOAD_END - 2] = 1;
            buf[PAYLOAD_END - 1] = b'x';
            // Heap entry for slot 1: 1,'a',1,'y'.
            buf[PAYLOAD_END - 8] = 1;
            buf[PAYLOAD_END - 7] = b'a';
            buf[PAYLOAD_END - 6] = 1;
            buf[PAYLOAD_END - 5] = b'y';
        }
        crate::pager::checksum::write_page_trailer(&mut page);
        let r = decode_node_find(page.as_bytes(), b"z");
        assert!(
            matches!(r, Err(Error::Corruption { .. })),
            "out-of-order key not caught by find path: {r:?}"
        );
    }

    /// #82 safety regression (iii): a key whose length prefix claims
    /// `max_key_len() + 1` bytes (in-bounds for the page but over the
    /// format cap) must surface `Error::Corruption` THROUGH THE FIND
    /// PATH on the only slot it reads.
    #[test]
    fn decode_node_find_rejects_over_long_key() {
        let mut page = Page::zeroed();
        {
            let buf = page.as_bytes_mut();
            buf[OFF_PAGE_TYPE] = PAGE_TYPE_BTREE_LEAF;
            buf[OFF_LEVEL] = 0;
            buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&1u16.to_le_bytes());
            let over = max_key_len() + 1;
            // The entry sits directly after the single slot.
            let entry_off = PAYLOAD_OFFSET + LEAF_SLOT_BYTES;
            buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 4]
                .copy_from_slice(&u32_from_usize(entry_off).to_le_bytes());
            let n = write_varint(&mut buf[entry_off..], u64_from_usize(over));
            assert_eq!(n, varint_len(u64_from_usize(over)));
            // Claimed key bytes stay zero; they fit because
            // max_key_len() = PAGE_SIZE/4 < the remaining payload.
        }
        crate::pager::checksum::write_page_trailer(&mut page);
        let r = decode_node_find(page.as_bytes(), b"anything");
        assert!(
            matches!(r, Err(Error::Corruption { .. })),
            "over-long key length not caught by find path: {r:?}"
        );
    }

    /// The find path rejects an internal-node page: a read never calls
    /// it on an internal node, and doing so is an invariant violation.
    #[test]
    fn decode_node_find_rejects_internal_node() {
        let mut internal = DecodedNode {
            kind: NodeKind::Internal,
            level: 1,
            next_sibling: 0,
            children: Vec::new(),
            leaves: Vec::new(),
            internals: Vec::new(),
        };
        // One pivot → two children.
        internal.children.push(10);
        internal.children.push(20);
        internal
            .internals
            .push(InternalEntry { key: b"m".to_vec() });
        let mut page = Page::zeroed();
        encode_node(&internal, &mut page).expect("encode");
        let r = decode_node_find(page.as_bytes(), b"m");
        assert!(matches!(r, Err(Error::BTreeInvariantViolated { .. })));
    }
}