1#![forbid(unsafe_code)]
63
64use crate::error::{Error, Result};
65use crate::pager::page::{Page, PAGE_SIZE, PAGE_TRAILER_SIZE};
66
67pub const PAGE_TYPE_BTREE_INTERNAL: u8 = 0x02;
70pub const PAGE_TYPE_BTREE_LEAF: u8 = 0x03;
72
73pub const NODE_HEADER_SIZE: usize = 24;
75
76const OFF_PAGE_TYPE: usize = 0;
78const OFF_LEVEL: usize = 1;
79const OFF_KEY_COUNT: usize = 2;
80const OFF_FREE_SPACE_OFF: usize = 4;
81const OFF_NEXT_SIBLING: usize = 8;
82const OFF_RESERVED: usize = 16;
83
84pub const LEAF_SLOT_BYTES: usize = 4;
87
88pub const INTERNAL_SLOT_BYTES: usize = 12;
91
92pub const INTERNAL_LEFTMOST_CHILD_BYTES: usize = 8;
95
96const PAYLOAD_OFFSET: usize = NODE_HEADER_SIZE;
99
100const PAYLOAD_END: usize = PAGE_SIZE - PAGE_TRAILER_SIZE;
103
104pub const LEAF_SLOT_CAP: usize = (PAYLOAD_END - PAYLOAD_OFFSET) / LEAF_SLOT_BYTES;
107
108pub const INTERNAL_SLOT_CAP: usize =
111 (PAYLOAD_END - PAYLOAD_OFFSET - INTERNAL_LEFTMOST_CHILD_BYTES) / INTERNAL_SLOT_BYTES;
112
113#[must_use]
116pub const fn max_key_len() -> usize {
117 PAGE_SIZE / 4
118}
119
120#[must_use]
123pub const fn max_inline_value(key_len: usize) -> usize {
124 let base = PAYLOAD_END - PAYLOAD_OFFSET - LEAF_SLOT_BYTES;
127 let varints = 9 + 9;
129 if key_len + varints >= base {
130 0
131 } else {
132 base - key_len - varints
133 }
134}
135
/// The two node flavours a B-tree page can hold.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NodeKind {
    /// Routing node: pivot keys plus child page ids.
    Internal,
    /// Data node: key/value entries.
    Leaf,
}
144
/// One decoded key/value pair of a leaf node.
#[derive(Clone, Debug)]
pub struct LeafEntry {
    /// Key bytes; leaf keys must be strictly ascending within a node.
    pub key: Vec<u8>,
    /// Value bytes stored inline with the key.
    pub value: Vec<u8>,
}
153
/// One decoded pivot of an internal node. The child ids that flank the pivot
/// live in [`DecodedNode::children`], not here.
#[derive(Clone, Debug)]
pub struct InternalEntry {
    /// Pivot key bytes; pivots must be strictly ascending within a node.
    pub key: Vec<u8>,
}
162
/// Child reference held by an internal node, stored on the page as a
/// little-endian `u64`. The value `0` is rejected as a child id.
pub type ChildEntry = u64;
168
169#[derive(Debug, Clone)]
176pub struct DecodedNode {
177 pub kind: NodeKind,
179 pub level: u8,
181 pub next_sibling: u64,
184 pub children: Vec<ChildEntry>,
188 pub leaves: Vec<LeafEntry>,
190 pub internals: Vec<InternalEntry>,
192}
193
194impl DecodedNode {
195 #[must_use]
199 pub fn occupied_bytes(&self) -> usize {
200 match self.kind {
201 NodeKind::Leaf => {
202 let slot_bytes = self.leaves.len() * LEAF_SLOT_BYTES;
203 let heap_bytes: usize = self
204 .leaves
205 .iter()
206 .map(|e| {
207 varint_len(u64_from_usize(e.key.len()))
208 + e.key.len()
209 + varint_len(u64_from_usize(e.value.len()))
210 + e.value.len()
211 })
212 .sum();
213 slot_bytes + heap_bytes
214 }
215 NodeKind::Internal => {
216 let slot_bytes = self.internals.len() * INTERNAL_SLOT_BYTES;
217 let heap_bytes: usize = self
218 .internals
219 .iter()
220 .map(|e| varint_len(u64_from_usize(e.key.len())) + e.key.len())
221 .sum();
222 INTERNAL_LEFTMOST_CHILD_BYTES + slot_bytes + heap_bytes
223 }
224 }
225 }
226
227 #[must_use]
229 pub fn free_bytes(&self) -> usize {
230 let cap = PAYLOAD_END - PAYLOAD_OFFSET;
231 cap.saturating_sub(self.occupied_bytes())
232 }
233}
234
235pub fn encode_node(node: &DecodedNode, page: &mut Page) -> Result<()> {
242 validate_node_release(node)?;
243 let buf = page.as_bytes_mut();
244 buf.fill(0);
245 write_node_header(buf, node);
246 match node.kind {
247 NodeKind::Leaf => encode_leaf_body(buf, node)?,
248 NodeKind::Internal => encode_internal_body(buf, node)?,
249 }
250 crate::pager::checksum::write_page_trailer(page);
254 Ok(())
255}
256
257fn write_node_header(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) {
258 let page_type = match node.kind {
259 NodeKind::Leaf => PAGE_TYPE_BTREE_LEAF,
260 NodeKind::Internal => PAGE_TYPE_BTREE_INTERNAL,
261 };
262 buf[OFF_PAGE_TYPE] = page_type;
263 buf[OFF_LEVEL] = node.level;
264 let key_count = match node.kind {
265 NodeKind::Leaf => node.leaves.len(),
266 NodeKind::Internal => node.internals.len(),
267 };
268 let kc16 = u16_from_usize(key_count);
272 buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&kc16.to_le_bytes());
273 let heap_bytes: usize = match node.kind {
277 NodeKind::Leaf => node
278 .leaves
279 .iter()
280 .map(|e| {
281 varint_len(u64_from_usize(e.key.len()))
282 + e.key.len()
283 + varint_len(u64_from_usize(e.value.len()))
284 + e.value.len()
285 })
286 .sum(),
287 NodeKind::Internal => node
288 .internals
289 .iter()
290 .map(|e| varint_len(u64_from_usize(e.key.len())) + e.key.len())
291 .sum(),
292 };
293 let fso = u32_from_usize(PAYLOAD_END.saturating_sub(heap_bytes));
294 buf[OFF_FREE_SPACE_OFF..OFF_FREE_SPACE_OFF + 4].copy_from_slice(&fso.to_le_bytes());
295 buf[OFF_NEXT_SIBLING..OFF_NEXT_SIBLING + 8].copy_from_slice(&node.next_sibling.to_le_bytes());
296 let _ = OFF_RESERVED;
298}
299
300fn encode_leaf_body(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) -> Result<()> {
301 let mut heap_cursor = PAYLOAD_END;
302 let mut slot_off = PAYLOAD_OFFSET;
303 for entry in &node.leaves {
304 let entry_len = varint_len(u64_from_usize(entry.key.len()))
305 + entry.key.len()
306 + varint_len(u64_from_usize(entry.value.len()))
307 + entry.value.len();
308 if heap_cursor < entry_len + slot_off + LEAF_SLOT_BYTES {
309 return Err(Error::BTreeInvariantViolated {
310 reason: "leaf encode: slot dir and heap collide",
311 });
312 }
313 heap_cursor -= entry_len;
314 let mut cur = heap_cursor;
315 let kl = u64_from_usize(entry.key.len());
316 cur += write_varint(&mut buf[cur..], kl);
317 buf[cur..cur + entry.key.len()].copy_from_slice(&entry.key);
318 cur += entry.key.len();
319 let vl = u64_from_usize(entry.value.len());
320 cur += write_varint(&mut buf[cur..], vl);
321 buf[cur..cur + entry.value.len()].copy_from_slice(&entry.value);
322 let off32 = u32_from_usize(heap_cursor);
323 buf[slot_off..slot_off + LEAF_SLOT_BYTES].copy_from_slice(&off32.to_le_bytes());
324 slot_off += LEAF_SLOT_BYTES;
325 }
326 Ok(())
327}
328
329fn encode_internal_body(buf: &mut [u8; PAGE_SIZE], node: &DecodedNode) -> Result<()> {
330 if node.children.len() != node.internals.len() + 1 {
331 return Err(Error::BTreeInvariantViolated {
332 reason: "internal node has children.len() != pivots+1",
333 });
334 }
335 let leftmost = node.children[0];
336 buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES]
337 .copy_from_slice(&leftmost.to_le_bytes());
338 let mut heap_cursor = PAYLOAD_END;
339 let mut slot_off = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES;
340 for (i, pivot) in node.internals.iter().enumerate() {
341 let right_child = node.children[i + 1];
342 let entry_len = varint_len(u64_from_usize(pivot.key.len())) + pivot.key.len();
343 if heap_cursor < entry_len + slot_off + INTERNAL_SLOT_BYTES {
344 return Err(Error::BTreeInvariantViolated {
345 reason: "internal encode: slot dir and heap collide",
346 });
347 }
348 heap_cursor -= entry_len;
349 let mut cur = heap_cursor;
350 cur += write_varint(&mut buf[cur..], u64_from_usize(pivot.key.len()));
351 buf[cur..cur + pivot.key.len()].copy_from_slice(&pivot.key);
352 let off32 = u32_from_usize(heap_cursor);
353 buf[slot_off..slot_off + 4].copy_from_slice(&off32.to_le_bytes());
354 buf[slot_off + 4..slot_off + INTERNAL_SLOT_BYTES]
355 .copy_from_slice(&right_child.to_le_bytes());
356 slot_off += INTERNAL_SLOT_BYTES;
357 }
358 Ok(())
359}
360
361pub fn decode_node(buf: &[u8; PAGE_SIZE]) -> Result<DecodedNode> {
381 let page_type = buf[OFF_PAGE_TYPE];
382 let kind = match page_type {
383 PAGE_TYPE_BTREE_LEAF => NodeKind::Leaf,
384 PAGE_TYPE_BTREE_INTERNAL => NodeKind::Internal,
385 _ => return Err(Error::Corruption { page_id: 0 }),
386 };
387 let level = buf[OFF_LEVEL];
388 let key_count = u16::from_le_bytes([buf[OFF_KEY_COUNT], buf[OFF_KEY_COUNT + 1]]) as usize;
389 let next_sibling = u64::from_le_bytes([
390 buf[OFF_NEXT_SIBLING],
391 buf[OFF_NEXT_SIBLING + 1],
392 buf[OFF_NEXT_SIBLING + 2],
393 buf[OFF_NEXT_SIBLING + 3],
394 buf[OFF_NEXT_SIBLING + 4],
395 buf[OFF_NEXT_SIBLING + 5],
396 buf[OFF_NEXT_SIBLING + 6],
397 buf[OFF_NEXT_SIBLING + 7],
398 ]);
399 let mut node = DecodedNode {
400 kind,
401 level,
402 next_sibling,
403 children: Vec::new(),
404 leaves: Vec::new(),
405 internals: Vec::new(),
406 };
407 match kind {
408 NodeKind::Leaf => decode_leaf_body(buf, &mut node, key_count)?,
409 NodeKind::Internal => decode_internal_body(buf, &mut node, key_count)?,
410 }
411 validate_node_release(&node)?;
412 Ok(node)
413}
414
415pub fn peek_node_kind(buf: &[u8; PAGE_SIZE]) -> Result<NodeKind> {
428 match buf[OFF_PAGE_TYPE] {
429 PAGE_TYPE_BTREE_LEAF => Ok(NodeKind::Leaf),
430 PAGE_TYPE_BTREE_INTERNAL => Ok(NodeKind::Internal),
431 _ => Err(Error::Corruption { page_id: 0 }),
432 }
433}
434
435pub fn decode_node_find(buf: &[u8; PAGE_SIZE], target: &[u8]) -> Result<Option<Vec<u8>>> {
466 match peek_node_kind(buf)? {
467 NodeKind::Leaf => {}
468 NodeKind::Internal => {
469 return Err(Error::BTreeInvariantViolated {
470 reason: "decode_node_find called on an internal node",
471 });
472 }
473 }
474 if buf[OFF_LEVEL] != 0 {
477 return Err(Error::BTreeInvariantViolated {
478 reason: "leaf has non-zero level",
479 });
480 }
481 let key_count = u16::from_le_bytes([buf[OFF_KEY_COUNT], buf[OFF_KEY_COUNT + 1]]) as usize;
482 if key_count > LEAF_SLOT_CAP {
483 return Err(Error::Corruption { page_id: 0 });
484 }
485 find_leaf_value(buf, key_count, target)
486}
487
488fn find_leaf_value(
493 buf: &[u8; PAGE_SIZE],
494 key_count: usize,
495 target: &[u8],
496) -> Result<Option<Vec<u8>>> {
497 let mut prev_key: Option<&[u8]> = None;
498 for i in 0..key_count {
499 let slot_off = PAYLOAD_OFFSET + i * LEAF_SLOT_BYTES;
500 if slot_off + LEAF_SLOT_BYTES > PAYLOAD_END {
501 return Err(Error::Corruption { page_id: 0 });
502 }
503 let entry_off = u32::from_le_bytes([
504 buf[slot_off],
505 buf[slot_off + 1],
506 buf[slot_off + 2],
507 buf[slot_off + 3],
508 ]) as usize;
509 if !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
510 return Err(Error::Corruption { page_id: 0 });
511 }
512 let (key, after_key) = read_length_prefixed(buf, entry_off)?;
513 let (value, _) = read_length_prefixed(buf, after_key)?;
514 if key.len() > max_key_len() {
515 return Err(Error::Corruption { page_id: 0 });
516 }
517 if let Some(prev) = prev_key {
519 if key <= prev {
520 return Err(Error::Corruption { page_id: 0 });
521 }
522 }
523 if key == target {
527 return Ok(Some(value.to_vec()));
528 }
529 prev_key = Some(key);
530 }
531 Ok(None)
532}
533
534fn decode_leaf_body(buf: &[u8; PAGE_SIZE], node: &mut DecodedNode, key_count: usize) -> Result<()> {
535 if key_count > LEAF_SLOT_CAP {
536 return Err(Error::Corruption { page_id: 0 });
537 }
538 node.leaves.reserve(key_count);
539 for i in 0..key_count {
540 let slot_off = PAYLOAD_OFFSET + i * LEAF_SLOT_BYTES;
541 if slot_off + LEAF_SLOT_BYTES > PAYLOAD_END {
542 return Err(Error::Corruption { page_id: 0 });
543 }
544 let entry_off = u32::from_le_bytes([
545 buf[slot_off],
546 buf[slot_off + 1],
547 buf[slot_off + 2],
548 buf[slot_off + 3],
549 ]) as usize;
550 if !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
551 return Err(Error::Corruption { page_id: 0 });
552 }
553 let (key, after_key) = read_length_prefixed(buf, entry_off)?;
554 let (value, _) = read_length_prefixed(buf, after_key)?;
555 if key.len() > max_key_len() {
556 return Err(Error::Corruption { page_id: 0 });
557 }
558 if let Some(prev) = node.leaves.last() {
561 if key <= prev.key.as_slice() {
562 return Err(Error::Corruption { page_id: 0 });
563 }
564 }
565 node.leaves.push(LeafEntry {
566 key: key.to_vec(),
567 value: value.to_vec(),
568 });
569 }
570 Ok(())
571}
572
573fn decode_internal_body(
574 buf: &[u8; PAGE_SIZE],
575 node: &mut DecodedNode,
576 key_count: usize,
577) -> Result<()> {
578 if key_count > INTERNAL_SLOT_CAP {
579 return Err(Error::Corruption { page_id: 0 });
580 }
581 let leftmost = read_u64(buf, PAYLOAD_OFFSET);
582 if leftmost == 0 {
583 return Err(Error::Corruption { page_id: 0 });
584 }
585 node.children.reserve(key_count + 1);
586 node.internals.reserve(key_count);
587 node.children.push(leftmost);
588 for i in 0..key_count {
589 let (key_vec, right_child) = decode_internal_slot(buf, i)?;
590 if let Some(prev) = node.internals.last() {
593 if key_vec.as_slice() <= prev.key.as_slice() {
594 return Err(Error::Corruption { page_id: 0 });
595 }
596 }
597 node.internals.push(InternalEntry { key: key_vec });
598 node.children.push(right_child);
599 }
600 Ok(())
601}
602
603fn decode_internal_slot(buf: &[u8; PAGE_SIZE], i: usize) -> Result<(Vec<u8>, u64)> {
607 let slot_off = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES + i * INTERNAL_SLOT_BYTES;
608 if slot_off + INTERNAL_SLOT_BYTES > PAYLOAD_END {
609 return Err(Error::Corruption { page_id: 0 });
610 }
611 let entry_off = u32::from_le_bytes([
612 buf[slot_off],
613 buf[slot_off + 1],
614 buf[slot_off + 2],
615 buf[slot_off + 3],
616 ]) as usize;
617 let right_child = read_u64(buf, slot_off + 4);
618 if right_child == 0 || !(PAYLOAD_OFFSET..PAYLOAD_END).contains(&entry_off) {
619 return Err(Error::Corruption { page_id: 0 });
620 }
621 let (key, _) = read_length_prefixed(buf, entry_off)?;
622 if key.len() > max_key_len() {
623 return Err(Error::Corruption { page_id: 0 });
624 }
625 Ok((key.to_vec(), right_child))
626}
627
628fn read_u64(buf: &[u8; PAGE_SIZE], offset: usize) -> u64 {
631 u64::from_le_bytes([
632 buf[offset],
633 buf[offset + 1],
634 buf[offset + 2],
635 buf[offset + 3],
636 buf[offset + 4],
637 buf[offset + 5],
638 buf[offset + 6],
639 buf[offset + 7],
640 ])
641}
642
643fn read_length_prefixed(buf: &[u8; PAGE_SIZE], offset: usize) -> Result<(&[u8], usize)> {
659 if offset >= PAYLOAD_END {
660 return Err(Error::Corruption { page_id: 0 });
661 }
662 let (len, after) = read_varint(buf, offset)?;
663 let len_usize = usize_from_u64(len)?;
664 let end = after
667 .checked_add(len_usize)
668 .ok_or(Error::Corruption { page_id: 0 })?;
669 if end > PAYLOAD_END {
670 return Err(Error::Corruption { page_id: 0 });
671 }
672 Ok((&buf[after..end], end))
673}
674
675pub fn validate_node(node: &DecodedNode) -> Result<()> {
683 validate_node_release(node)
684}
685
686fn validate_node_release(node: &DecodedNode) -> Result<()> {
687 validate_node_structural(node)?;
688 validate_node_ordering(node)
689}
690
691fn validate_node_structural(node: &DecodedNode) -> Result<()> {
703 match node.kind {
704 NodeKind::Leaf => {
705 if node.level != 0 {
706 return Err(Error::BTreeInvariantViolated {
707 reason: "leaf has non-zero level",
708 });
709 }
710 if node.leaves.len() > LEAF_SLOT_CAP {
711 return Err(Error::BTreeInvariantViolated {
712 reason: "leaf key count exceeds slot cap",
713 });
714 }
715 }
716 NodeKind::Internal => {
717 if node.level == 0 {
718 return Err(Error::BTreeInvariantViolated {
719 reason: "internal node at level 0",
720 });
721 }
722 if node.internals.len() > INTERNAL_SLOT_CAP {
723 return Err(Error::BTreeInvariantViolated {
724 reason: "internal key count exceeds slot cap",
725 });
726 }
727 if node.children.len() != node.internals.len() + 1 {
728 return Err(Error::BTreeInvariantViolated {
729 reason: "internal children.len() != pivots+1",
730 });
731 }
732 for c in &node.children {
733 if *c == 0 {
734 return Err(Error::BTreeInvariantViolated {
735 reason: "internal node has zero child page-id",
736 });
737 }
738 }
739 if node.next_sibling != 0 {
740 return Err(Error::BTreeInvariantViolated {
741 reason: "internal node has non-zero next_sibling",
742 });
743 }
744 }
745 }
746 Ok(())
747}
748
749fn validate_node_ordering(node: &DecodedNode) -> Result<()> {
755 match node.kind {
756 NodeKind::Leaf => {
757 for w in node.leaves.windows(2) {
758 if w[0].key.as_slice() >= w[1].key.as_slice() {
759 return Err(Error::BTreeInvariantViolated {
760 reason: "leaf keys not strictly sorted",
761 });
762 }
763 }
764 }
765 NodeKind::Internal => {
766 for w in node.internals.windows(2) {
767 if w[0].key.as_slice() >= w[1].key.as_slice() {
768 return Err(Error::BTreeInvariantViolated {
769 reason: "internal keys not strictly sorted",
770 });
771 }
772 }
773 }
774 }
775 Ok(())
776}
777
/// Number of bytes `write_varint` emits for `v`: one byte per started group
/// of seven significant bits, with a minimum of one byte (for `0`).
#[must_use]
pub fn varint_len(v: u64) -> usize {
    // Zero has no significant bits but still takes one byte.
    let significant_bits = (u64::BITS - v.leading_zeros()).max(1);
    // Ceiling division by 7; the result is at most 10.
    ((significant_bits + 6) / 7) as usize
}
791
/// Encodes `v` at the start of `dst` as a base-128 varint: seven value bits
/// per byte, least-significant group first, with the high bit set on every
/// byte except the last. Returns the number of bytes written.
///
/// # Panics
///
/// Panics if `dst` is shorter than the encoding.
fn write_varint(dst: &mut [u8], mut v: u64) -> usize {
    let mut written = 0;
    // Emit continuation bytes while more than seven bits remain.
    while v >= 0x80 {
        dst[written] = (v & 0x7F) as u8 | 0x80;
        written += 1;
        v >>= 7;
    }
    // Final byte: the remaining (at most seven) bits, high bit clear.
    dst[written] = v as u8;
    written + 1
}
810
811fn read_varint(buf: &[u8; PAGE_SIZE], offset: usize) -> Result<(u64, usize)> {
816 let mut value: u64 = 0;
817 let mut shift: u32 = 0;
818 let mut i = offset;
819 for _ in 0..10 {
820 if i >= PAYLOAD_END {
821 return Err(Error::Corruption { page_id: 0 });
822 }
823 let byte = buf[i];
824 i += 1;
825 let chunk = u64::from(byte & 0x7F);
826 value |= chunk << shift;
828 if (byte & 0x80) == 0 {
829 return Ok((value, i));
830 }
831 shift += 7;
832 if shift >= 64 {
833 return Err(Error::Corruption { page_id: 0 });
834 }
835 }
836 Err(Error::Corruption { page_id: 0 })
837}
838
/// Narrows `v` to `u32`. Values that do not fit trip a debug assertion and
/// saturate to `u32::MAX` in builds without debug assertions.
fn u32_from_usize(v: usize) -> u32 {
    let narrowed = u32::try_from(v);
    debug_assert!(narrowed.is_ok());
    narrowed.unwrap_or(u32::MAX)
}
849
/// Narrows `v` to `u16`. Values that do not fit trip a debug assertion and
/// saturate to `u16::MAX` in builds without debug assertions.
fn u16_from_usize(v: usize) -> u16 {
    let narrowed = u16::try_from(v);
    debug_assert!(narrowed.is_ok());
    narrowed.unwrap_or(u16::MAX)
}
854
/// Widens `v` to `u64`. The conversion is lossless wherever `usize` is at
/// most 64 bits wide, so the fallback value is never produced on such targets.
fn u64_from_usize(v: usize) -> u64 {
    u64::try_from(v).unwrap_or(u64::MAX)
}
858
859fn usize_from_u64(v: u64) -> Result<usize> {
860 usize::try_from(v).map_err(|_| Error::Corruption { page_id: 0 })
861}
862
#[cfg(test)]
mod tests {
    use super::*;

    /// Varint encoding of `u64::MAX`: nine `0xFF` bytes followed by `0x01`.
    const U64_MAX_VARINT: [u8; 10] = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0x01];

    /// A node of the given kind and level with no entries and no sibling.
    fn bare_node(kind: NodeKind, level: u8) -> DecodedNode {
        DecodedNode {
            kind,
            level,
            next_sibling: 0,
            children: Vec::new(),
            leaves: Vec::new(),
            internals: Vec::new(),
        }
    }

    fn empty_leaf() -> DecodedNode {
        bare_node(NodeKind::Leaf, 0)
    }

    fn leaf_with(entries: &[(&[u8], &[u8])]) -> DecodedNode {
        let mut leaf = empty_leaf();
        leaf.leaves.extend(entries.iter().map(|(k, v)| LeafEntry {
            key: k.to_vec(),
            value: v.to_vec(),
        }));
        leaf
    }

    /// A level-1 internal node with the given child ids and pivot keys.
    fn internal_with(children: &[u64], pivots: &[&[u8]]) -> DecodedNode {
        let mut internal = bare_node(NodeKind::Internal, 1);
        internal.children.extend_from_slice(children);
        internal
            .internals
            .extend(pivots.iter().map(|k| InternalEntry { key: k.to_vec() }));
        internal
    }

    /// Encodes `node` into a fresh page, panicking on failure.
    fn encoded(node: &DecodedNode) -> Page {
        let mut page = Page::zeroed();
        encode_node(node, &mut page).expect("encode");
        page
    }

    /// Builds a page by hand: `fill` writes the raw bytes, then the trailer is
    /// stamped so the page looks like a completed write.
    fn sealed_page(fill: impl FnOnce(&mut [u8; PAGE_SIZE])) -> Page {
        let mut page = Page::zeroed();
        fill(page.as_bytes_mut());
        crate::pager::checksum::write_page_trailer(&mut page);
        page
    }

    /// Writes page type, level and key count into the header.
    fn put_header(buf: &mut [u8; PAGE_SIZE], page_type: u8, level: u8, key_count: u16) {
        buf[OFF_PAGE_TYPE] = page_type;
        buf[OFF_LEVEL] = level;
        buf[OFF_KEY_COUNT..OFF_KEY_COUNT + 2].copy_from_slice(&key_count.to_le_bytes());
    }

    /// Writes `value` as a little-endian `u32` at `at`.
    fn put_u32(buf: &mut [u8; PAGE_SIZE], at: usize, value: usize) {
        buf[at..at + 4].copy_from_slice(&u32_from_usize(value).to_le_bytes());
    }

    /// A two-entry leaf whose slot directory lists key "c" before key "a".
    fn unsorted_leaf_page() -> Page {
        sealed_page(|buf| {
            put_header(buf, PAGE_TYPE_BTREE_LEAF, 0, 2);
            let first = PAYLOAD_END - 4;
            let second = PAYLOAD_END - 8;
            put_u32(buf, PAYLOAD_OFFSET, first);
            put_u32(buf, PAYLOAD_OFFSET + 4, second);
            // Records: key length 1, key byte, value length 1, value byte.
            buf[first..first + 4].copy_from_slice(&[1, b'c', 1, b'x']);
            buf[second..second + 4].copy_from_slice(&[1, b'a', 1, b'y']);
        })
    }

    #[test]
    fn round_trip_empty_leaf() {
        let page = encoded(&empty_leaf());
        let decoded = decode_node(page.as_bytes()).expect("decode");
        assert_eq!(decoded.kind, NodeKind::Leaf);
        assert_eq!(decoded.level, 0);
        assert_eq!(decoded.next_sibling, 0);
        assert!(decoded.leaves.is_empty());
    }

    #[test]
    fn round_trip_populated_leaf() {
        let page = encoded(&leaf_with(&[
            (b"alpha", b"AAA"),
            (b"bravo", b"BBB"),
            (b"charlie", b"CCC"),
        ]));
        let decoded = decode_node(page.as_bytes()).expect("decode");
        assert_eq!(decoded.leaves.len(), 3);
        assert_eq!(decoded.leaves[0].key.as_slice(), b"alpha");
        assert_eq!(decoded.leaves[0].value.as_slice(), b"AAA");
        assert_eq!(decoded.leaves[2].key.as_slice(), b"charlie");
    }

    #[test]
    fn round_trip_internal() {
        let internal = internal_with(&[10, 20, 30, 40], &[b"d".as_slice(), b"h", b"m"]);
        let page = encoded(&internal);
        let decoded = decode_node(page.as_bytes()).expect("decode");
        assert_eq!(decoded.kind, NodeKind::Internal);
        assert_eq!(decoded.level, 1);
        assert_eq!(decoded.internals.len(), 3);
        assert_eq!(decoded.children.as_slice(), &[10, 20, 30, 40]);
        assert_eq!(decoded.internals[0].key.as_slice(), b"d");
        assert_eq!(decoded.internals[2].key.as_slice(), b"m");
    }

    #[test]
    fn decode_rejects_unsorted_leaf() {
        let page = unsorted_leaf_page();
        let err = decode_node(page.as_bytes()).expect_err("decode must reject");
        assert!(matches!(err, Error::Corruption { .. }));
    }

    #[test]
    fn varint_round_trip() {
        for v in [0u64, 1, 127, 128, 0xFFFF, 0xDEAD_BEEF, u64::MAX] {
            let mut scratch = [0u8; 16];
            let n = write_varint(&mut scratch, v);
            assert_eq!(n, varint_len(v));
            // read_varint only accepts a full page image.
            let mut page = [0u8; PAGE_SIZE];
            page[..n].copy_from_slice(&scratch[..n]);
            let (decoded, after) = read_varint(&page, 0).expect("decode varint");
            assert_eq!(decoded, v);
            assert_eq!(after, n);
        }
    }

    /// A leaf key length of `u64::MAX` must surface as corruption, not a panic.
    #[test]
    fn decode_node_varint_max_returns_corruption_not_panic() {
        let page = sealed_page(|buf| {
            put_header(buf, PAGE_TYPE_BTREE_LEAF, 0, 1);
            let entry_off = PAYLOAD_END - 16;
            put_u32(buf, PAYLOAD_OFFSET, entry_off);
            buf[entry_off..entry_off + U64_MAX_VARINT.len()].copy_from_slice(&U64_MAX_VARINT);
        });
        let result = decode_node(page.as_bytes());
        match result {
            Err(Error::Corruption { .. }) => {}
            other => panic!(
                "expected Err(Error::Corruption), got {:?} \
                 (a panic here would indicate the M13 #115 bug \
                 is still present in release builds)",
                other.map_or("non-corruption err", |_| "Ok(_)")
            ),
        }
    }

    /// Same as above, for the pivot-key length of an internal slot.
    #[test]
    fn decode_internal_node_varint_max_returns_corruption() {
        let page = sealed_page(|buf| {
            put_header(buf, PAGE_TYPE_BTREE_INTERNAL, 1, 1);
            buf[PAYLOAD_OFFSET..PAYLOAD_OFFSET + 8].copy_from_slice(&42u64.to_le_bytes());
            let slot_base = PAYLOAD_OFFSET + INTERNAL_LEFTMOST_CHILD_BYTES;
            let entry_off = PAYLOAD_END - 16;
            put_u32(buf, slot_base, entry_off);
            buf[slot_base + 4..slot_base + INTERNAL_SLOT_BYTES]
                .copy_from_slice(&7u64.to_le_bytes());
            buf[entry_off..entry_off + U64_MAX_VARINT.len()].copy_from_slice(&U64_MAX_VARINT);
        });
        let result = decode_node(page.as_bytes());
        assert!(
            matches!(result, Err(Error::Corruption { .. })),
            "expected Err(Error::Corruption) on u64::MAX varint in internal slot"
        );
    }

    #[test]
    fn max_inline_value_is_below_payload() {
        let k = 8;
        let v = max_inline_value(k);
        let entry_len = varint_len(u64_from_usize(k)) + k + varint_len(u64_from_usize(v)) + v;
        let payload = entry_len + LEAF_SLOT_BYTES;
        assert!(payload <= PAYLOAD_END - PAYLOAD_OFFSET);
    }

    #[test]
    fn decode_node_find_matches_full_decode() {
        let page = encoded(&leaf_with(&[
            (b"alpha", b"AAA"),
            (b"bravo", b"BBB"),
            (b"charlie", b"CCC"),
        ]));
        let found = decode_node_find(page.as_bytes(), b"bravo").expect("find");
        assert_eq!(found.as_deref(), Some(b"BBB".as_slice()));
        let missing = decode_node_find(page.as_bytes(), b"zeta").expect("find");
        assert_eq!(missing, None);
        // First and last entries are reachable too.
        assert_eq!(
            decode_node_find(page.as_bytes(), b"alpha")
                .expect("find")
                .as_deref(),
            Some(b"AAA".as_slice())
        );
        assert_eq!(
            decode_node_find(page.as_bytes(), b"charlie")
                .expect("find")
                .as_deref(),
            Some(b"CCC".as_slice())
        );
    }

    #[test]
    fn decode_node_find_rejects_corrupted_slot_offset() {
        let mut page = encoded(&leaf_with(&[(b"a", b"x"), (b"b", b"y")]));
        // Point slot 0 past the end of the payload area.
        put_u32(page.as_bytes_mut(), PAYLOAD_OFFSET, PAYLOAD_END + 4);
        crate::pager::checksum::write_page_trailer(&mut page);
        let r = decode_node_find(page.as_bytes(), b"b");
        assert!(
            matches!(r, Err(Error::Corruption { .. })),
            "corrupted slot offset not caught by find path: {r:?}"
        );
    }

    #[test]
    fn decode_node_find_rejects_out_of_order_keys() {
        let page = unsorted_leaf_page();
        let r = decode_node_find(page.as_bytes(), b"z");
        assert!(
            matches!(r, Err(Error::Corruption { .. })),
            "out-of-order key not caught by find path: {r:?}"
        );
    }

    #[test]
    fn decode_node_find_rejects_over_long_key() {
        let page = sealed_page(|buf| {
            put_header(buf, PAGE_TYPE_BTREE_LEAF, 0, 1);
            let over = max_key_len() + 1;
            let entry_off = PAYLOAD_OFFSET + LEAF_SLOT_BYTES;
            put_u32(buf, PAYLOAD_OFFSET, entry_off);
            // Only the length prefix is written; the key bytes stay zero.
            let n = write_varint(&mut buf[entry_off..], u64_from_usize(over));
            assert_eq!(n, varint_len(u64_from_usize(over)));
        });
        let r = decode_node_find(page.as_bytes(), b"anything");
        assert!(
            matches!(r, Err(Error::Corruption { .. })),
            "over-long key length not caught by find path: {r:?}"
        );
    }

    #[test]
    fn decode_node_find_rejects_internal_node() {
        let page = encoded(&internal_with(&[10, 20], &[b"m".as_slice()]));
        let r = decode_node_find(page.as_bytes(), b"m");
        assert!(matches!(r, Err(Error::BTreeInvariantViolated { .. })));
    }
}