
shodh_redb/tree_store/
btree_base.rs

use crate::compat::Mutex;
use crate::tree_store::page_store::compression::{
    CompressionConfig, compress_value, decompress_value,
};
use crate::tree_store::page_store::{Page, PageImpl, PageMut, TransactionalMemory, xxh3_checksum};
use crate::tree_store::{PageNumber, PageTrackerPolicy};
use crate::types::{Key, MutInPlaceValue, Value};
use crate::{Result, StorageError};
use alloc::borrow::Cow;
use alloc::boxed::Box;
use alloc::format;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::borrow::Borrow;
use core::cmp::Ordering;
use core::marker::PhantomData;
use core::mem::size_of;
use core::ops::Range;

pub(crate) const LEAF: u8 = 1;
pub(crate) const BRANCH: u8 = 2;

pub(crate) type Checksum = u128;
// Dummy value. Final value will be computed during commit
pub(crate) const DEFERRED: Checksum = 999;

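// The checksums below cover only the used prefix of a page (everything up to
// the end of the last key/value), so uninitialized slack bytes past the data
// do not affect the result.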
pub(super) fn leaf_checksum<T: Page>(
    page: &T,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
) -> Result<Checksum, StorageError> {
    let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size)?;
    let last_pair = accessor.num_pairs().checked_sub(1).ok_or_else(|| {
        StorageError::page_corrupted(page.get_page_number(), "leaf page has zero pairs")
    })?;
    let end = accessor.value_end(last_pair).ok_or_else(|| {
        StorageError::page_corrupted(page.get_page_number(), "leaf page has invalid pair offset")
    })?;
    if end > page.memory().len() {
        Err(StorageError::page_corrupted(
            page.get_page_number(),
            "leaf page last offset beyond end of data",
        ))
    } else {
        Ok(xxh3_checksum(&page.memory()[..end]))
    }
}

pub(super) fn branch_checksum<T: Page>(
    page: &T,
    fixed_key_size: Option<usize>,
) -> Result<Checksum, StorageError> {
    let accessor = BranchAccessor::new(page, fixed_key_size)?;
    let last_key = accessor.num_keys().checked_sub(1).ok_or_else(|| {
        StorageError::page_corrupted(page.get_page_number(), "branch page has zero keys")
    })?;
    let end = accessor.key_end(last_key).ok_or_else(|| {
        StorageError::page_corrupted(page.get_page_number(), "branch page has invalid key offset")
    })?;
    if end > page.memory().len() {
        Err(StorageError::page_corrupted(
            page.get_page_number(),
            "branch page last offset beyond end of data",
        ))
    } else {
        Ok(xxh3_checksum(&page.memory()[..end]))
    }
}

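// Serialized layout (all fields little-endian), matching to_le_bytes() below:
//   [0 .. PageNumber::serialized_size()]   root page number
//   [.. + size_of::<Checksum>() (16)]      checksum (u128)
//   [.. + size_of::<u64>() (8)]            number of entries in the tree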
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct BtreeHeader {
    pub(crate) root: PageNumber,
    pub(crate) checksum: Checksum,
    pub(crate) length: u64,
}

impl BtreeHeader {
    pub(crate) fn new(root: PageNumber, checksum: Checksum, length: u64) -> Self {
        Self {
            root,
            checksum,
            length,
        }
    }

    pub(crate) const fn serialized_size() -> usize {
        PageNumber::serialized_size() + size_of::<Checksum>() + size_of::<u64>()
    }

    pub(crate) fn from_le_bytes(bytes: [u8; Self::serialized_size()]) -> Self {
        let root =
            PageNumber::from_le_bytes(bytes[..PageNumber::serialized_size()].try_into().unwrap());
        let mut offset = PageNumber::serialized_size();
        let checksum = Checksum::from_le_bytes(
            bytes[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        );
        offset += size_of::<Checksum>();
        let length = u64::from_le_bytes(
            bytes[offset..(offset + size_of::<u64>())]
                .try_into()
                .unwrap(),
        );

        Self {
            root,
            checksum,
            length,
        }
    }

    pub(crate) fn to_le_bytes(self) -> [u8; Self::serialized_size()] {
        let mut result = [0; Self::serialized_size()];
        result[..PageNumber::serialized_size()].copy_from_slice(&self.root.to_le_bytes());
        result[PageNumber::serialized_size()
            ..(PageNumber::serialized_size() + size_of::<Checksum>())]
            .copy_from_slice(&self.checksum.to_le_bytes());
        result[(PageNumber::serialized_size() + size_of::<Checksum>())..]
            .copy_from_slice(&self.length.to_le_bytes());

        result
    }
}

enum OnDrop {
    None,
    RemoveEntry {
        position: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    },
}

enum EitherPage {
    Immutable(PageImpl),
    Mutable(PageMut),
    OwnedMemory(Vec<u8>),
    ArcMemory(Arc<[u8]>),
}

impl EitherPage {
    fn memory(&self) -> &[u8] {
        match self {
            EitherPage::Immutable(page) => page.memory(),
            EitherPage::Mutable(page) => page.memory(),
            EitherPage::OwnedMemory(mem) => mem.as_slice(),
            EitherPage::ArcMemory(mem) => mem,
        }
    }
}

/// Scoped accessor to data in the database
///
/// When this structure is dropped (goes out of scope), the data is released
pub struct AccessGuard<'a, V: Value + 'static> {
    page: EitherPage,
    offset: usize,
    len: usize,
    on_drop: OnDrop,
    // Boxed to minimize struct size -- AccessGuard is returned from recursive B-tree
    // operations, and every byte matters for stack depth on Windows (1MB default stack).
    decompressed_value: Option<Box<[u8]>>,
    _value_type: PhantomData<V>,
    // Used so that logical references into a Table respect the appropriate lifetime
    _lifetime: PhantomData<&'a ()>,
}

impl<V: Value + 'static> AccessGuard<'_, V> {
    pub(crate) fn with_page(page: PageImpl, range: Range<usize>) -> Self {
        Self {
            page: EitherPage::Immutable(page),
            offset: range.start,
            len: range.len(),
            on_drop: OnDrop::None,
            decompressed_value: None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

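    // The `*_decompress` constructors below assume the value envelope used by
    // page_store::compression: a leading flags byte, where 0x00 means the
    // payload is stored uncompressed (so the data can be borrowed directly,
    // past the flags byte) and any other value means `decompress_value`
    // returns an owned, decompressed buffer.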
    /// Construct an `AccessGuard` for a value, decompressing if needed.
    /// Only call for values from compressed databases.
    pub(crate) fn with_page_decompress(page: PageImpl, range: Range<usize>) -> Result<Self> {
        let raw = &page.memory()[range.clone()];
        match decompress_value(raw)? {
            Cow::Borrowed(_) => {
                // Uncompressed passthrough -- flags byte stripped, data starts at offset+1
                Ok(Self::with_page(page, (range.start + 1)..range.end))
            }
            Cow::Owned(decompressed) => Ok(Self::with_owned_value(decompressed)),
        }
    }

    pub(crate) fn with_arc_page(page: Arc<[u8]>, range: Range<usize>) -> Self {
        Self {
            page: EitherPage::ArcMemory(page),
            offset: range.start,
            len: range.len(),
            on_drop: OnDrop::None,
            decompressed_value: None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    /// Construct an `AccessGuard` for a value from an Arc page, decompressing if needed.
    pub(crate) fn with_arc_page_decompress(page: Arc<[u8]>, range: Range<usize>) -> Result<Self> {
        let raw = &page[range.clone()];
        match decompress_value(raw)? {
            Cow::Borrowed(_) => {
                // Uncompressed -- flags byte stripped, skip 1 byte
                Ok(Self::with_arc_page(page, (range.start + 1)..range.end))
            }
            Cow::Owned(decompressed) => Ok(Self::with_owned_value(decompressed)),
        }
    }

    pub(crate) fn with_owned_value(value: Vec<u8>) -> Self {
        let len = value.len();
        Self {
            page: EitherPage::OwnedMemory(value),
            offset: 0,
            len,
            on_drop: OnDrop::None,
            decompressed_value: None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    /// Construct an `AccessGuard` with an owned value, decompressing if needed.
    pub(crate) fn with_owned_value_decompress(value: Vec<u8>) -> Result<Self> {
        match decompress_value(&value)? {
            Cow::Borrowed(_) => {
                // Not compressed envelope -- strip flags byte
                Ok(Self::with_owned_value(value[1..].to_vec()))
            }
            Cow::Owned(decompressed) => Ok(Self::with_owned_value(decompressed)),
        }
    }

    pub(super) fn remove_on_drop(
        page: PageMut,
        offset: usize,
        len: usize,
        position: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        Self {
            page: EitherPage::Mutable(page),
            offset,
            len,
            on_drop: OnDrop::RemoveEntry {
                position,
                fixed_key_size,
                fixed_value_size,
            },
            decompressed_value: None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    /// Construct a remove-on-drop `AccessGuard` with decompression.
    pub(super) fn remove_on_drop_decompress(
        page: PageMut,
        offset: usize,
        len: usize,
        position: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Result<Self> {
        let raw = &page.memory()[offset..(offset + len)];
        let decompressed_value = match decompress_value(raw)? {
            Cow::Borrowed(_) => {
                // Uncompressed -- strip flags byte
                Some(raw[1..].to_vec().into_boxed_slice())
            }
            Cow::Owned(decompressed) => Some(decompressed.into_boxed_slice()),
        };
        Ok(Self {
            page: EitherPage::Mutable(page),
            offset,
            len,
            on_drop: OnDrop::RemoveEntry {
                position,
                fixed_key_size,
                fixed_value_size,
            },
            decompressed_value,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        })
    }

    /// Access the stored value
    pub fn value(&self) -> V::SelfType<'_> {
        if let Some(ref decompressed) = self.decompressed_value {
            V::from_bytes(decompressed)
        } else {
            V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
        }
    }
}

impl<V: Value + 'static> Drop for AccessGuard<'_, V> {
    fn drop(&mut self) {
        match self.on_drop {
            OnDrop::None => {}
            OnDrop::RemoveEntry {
                position,
                fixed_key_size,
                fixed_value_size,
            } => {
                if let EitherPage::Mutable(ref mut mut_page) = self.page {
                    if let Ok(mem) = mut_page.memory_mut() {
                        let mut mutator = LeafMutator::new(mem, fixed_key_size, fixed_value_size);
                        mutator.remove(position);
                    }
                } else {
                    let is_panicking = {
                        #[cfg(feature = "std")]
                        {
                            std::thread::panicking()
                        }
                        #[cfg(not(feature = "std"))]
                        {
                            false
                        }
                    };
                    assert!(
                        is_panicking,
                        "AccessGuard with RemoveEntry on-drop requires a mutable page, but page was immutable"
                    );
                }
            }
        }
    }
}

pub struct AccessGuardMut<'a, V: Value + 'static> {
    page: PageMut,
    offset: usize,
    len: usize,
    entry_index: usize,
    parent: Option<(PageMut, usize)>,
    mem: Arc<TransactionalMemory>,
    allocated: Arc<Mutex<PageTrackerPolicy>>,
    root_ref: &'a mut BtreeHeader,
    key_width: Option<usize>,
    fixed_value_size: Option<usize>,
    compression: CompressionConfig,
    decompressed_value: Option<Box<[u8]>>,
    _value_type: PhantomData<V>,
}

impl<'a, V: Value + 'static> AccessGuardMut<'a, V> {
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        page: PageMut,
        offset: usize,
        len: usize,
        entry_index: usize,
        parent: Option<(PageMut, usize)>,
        mem: Arc<TransactionalMemory>,
        allocated: Arc<Mutex<PageTrackerPolicy>>,
        root_ref: &'a mut BtreeHeader,
        key_width: Option<usize>,
        fixed_value_size: Option<usize>,
        compression: CompressionConfig,
    ) -> Result<Self> {
        assert!(mem.uncommitted(page.get_page_number()));
        if let Some((ref parent_page, _)) = parent {
            assert!(mem.uncommitted(parent_page.get_page_number()));
        }
        let decompressed_value = if compression.is_enabled() && len > 0 {
            let raw = &page.memory()[offset..(offset + len)];
            let decompressed = decompress_value(raw)?;
            match decompressed {
                Cow::Borrowed(_) => None,
                Cow::Owned(v) => Some(v.into_boxed_slice()),
            }
        } else {
            None
        };
        Ok(AccessGuardMut {
            page,
            offset,
            len,
            entry_index,
            parent,
            mem,
            allocated,
            root_ref,
            key_width,
            fixed_value_size,
            compression,
            decompressed_value,
            _value_type: Default::default(),
        })
    }

    /// Access the stored value
    pub fn value(&self) -> V::SelfType<'_> {
        if let Some(ref decompressed) = self.decompressed_value {
            V::from_bytes(decompressed)
        } else if self.compression.is_enabled() && self.len > 0 {
            // Flags byte 0x00 passthrough: strip it
            V::from_bytes(&self.page.memory()[(self.offset + 1)..(self.offset + self.len)])
        } else {
            V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
        }
    }

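    // Strategy for `insert` below: if the new (possibly compressed) value fits
    // in the page's free space, mutate the leaf in place; otherwise rebuild the
    // leaf into a freshly allocated page, repoint the parent branch (or the
    // tree root) at it with a DEFERRED checksum, and free the old page if it
    // was uncommitted.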
    /// Replace the stored value
    pub fn insert<'v>(&mut self, value: impl Borrow<V::SelfType<'v>>) -> Result<()> {
        let raw_value_bytes = V::as_bytes(value.borrow());
        let stored_value = if self.compression.is_enabled() {
            compress_value(raw_value_bytes.as_ref(), self.compression)
        } else {
            raw_value_bytes.as_ref().to_vec()
        };

        let key_bytes = {
            let accessor =
                LeafAccessor::new(self.page.memory(), self.key_width, self.fixed_value_size)?;
            accessor.key_unchecked(self.entry_index).to_vec()
        };

        if LeafMutator::sufficient_insert_inplace_space(
            &self.page,
            self.entry_index,
            true,
            self.key_width,
            self.fixed_value_size,
            key_bytes.as_slice(),
            &stored_value,
        ) {
            let mut mutator = LeafMutator::new(
                self.page.memory_mut()?,
                self.key_width,
                self.fixed_value_size,
            );
            mutator.insert(self.entry_index, true, &key_bytes, &stored_value);
        } else {
            let accessor =
                LeafAccessor::new(self.page.memory(), self.key_width, self.fixed_value_size)?;
            let mut builder = LeafBuilder::new(
                &self.mem,
                &self.allocated,
                accessor.num_pairs(),
                self.key_width,
                self.fixed_value_size,
            );

            for i in 0..accessor.num_pairs() {
                if i == self.entry_index {
                    builder.push(&key_bytes, &stored_value);
                } else {
                    let entry = accessor.entry(i).unwrap();
                    builder.push(entry.key(), entry.value());
                }
            }

            let new_page = builder.build()?;

            // Update parent branch page if it exists, otherwise update root
            if let Some((ref mut parent_page, parent_entry_index)) = self.parent {
                let mut mutator = BranchMutator::new(parent_page.memory_mut()?);
                mutator.write_child_page(parent_entry_index, new_page.get_page_number(), DEFERRED);
            } else {
                self.root_ref.root = new_page.get_page_number();
                self.root_ref.checksum = DEFERRED;
            }

            let old_page_number = self.page.get_page_number();
            self.page = new_page;
            let mut allocated = self.allocated.lock();
            assert!(
                self.mem
                    .free_if_uncommitted(old_page_number, &mut allocated)
            );
        }

        // Update our page reference to the new page and recalculate offset/length
        let new_accessor =
            LeafAccessor::new(self.page.memory(), self.key_width, self.fixed_value_size)?;
        let (new_start, new_end) = new_accessor.value_range(self.entry_index).unwrap();

        self.offset = new_start;
        self.len = new_end - new_start;

        // Update decompressed value cache
        if self.compression.is_enabled() && self.len > 0 {
            let raw = &self.page.memory()[self.offset..(self.offset + self.len)];
            self.decompressed_value = match decompress_value(raw)? {
                Cow::Borrowed(_) => None,
                Cow::Owned(v) => Some(v.into_boxed_slice()),
            };
        } else {
            self.decompressed_value = None;
        }

        Ok(())
    }
}

pub struct AccessGuardMutInPlace<'a, V: Value + 'static> {
    page: PageMut,
    offset: usize,
    len: usize,
    _value_type: PhantomData<V>,
    // Used so that logical references into a Table respect the appropriate lifetime
    _lifetime: PhantomData<&'a ()>,
}

impl<V: Value + 'static> AccessGuardMutInPlace<'_, V> {
    pub(crate) fn new(page: PageMut, offset: usize, len: usize) -> Self {
        AccessGuardMutInPlace {
            page,
            offset,
            len,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }
}

impl<V: MutInPlaceValue + 'static> AsMut<V::BaseRefType> for AccessGuardMutInPlace<'_, V> {
    fn as_mut(&mut self) -> &mut V::BaseRefType {
        let mem = self
            .page
            .memory_mut()
            .expect("AccessGuardMutInPlace requires exclusive page access");
        V::from_bytes_mut(&mut mem[self.offset..(self.offset + self.len)])
    }
}

// Provides a simple zero-copy way to access entries
pub struct EntryAccessor<'a> {
    key: &'a [u8],
    value: &'a [u8],
}

impl<'a> EntryAccessor<'a> {
    fn new(key: &'a [u8], value: &'a [u8]) -> Self {
        EntryAccessor { key, value }
    }
}

impl<'a: 'b, 'b> EntryAccessor<'a> {
    pub(crate) fn key(&'b self) -> &'a [u8] {
        self.key
    }

    pub(crate) fn value(&'b self) -> &'a [u8] {
        self.value
    }
}

// Provides a simple zero-copy way to access a leaf page
pub(crate) struct LeafAccessor<'a> {
    page: &'a [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    num_pairs: usize,
}

impl<'a> LeafAccessor<'a> {
    pub(crate) fn new(
        page: &'a [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> core::result::Result<Self, StorageError> {
        if page.len() < 4 {
            return Err(StorageError::Corrupted(
                "Leaf page too small for header".into(),
            ));
        }
        debug_assert_eq!(page[0], LEAF);
        let num_pairs = u16::from_le_bytes(
            page[2..4]
                .try_into()
                .map_err(|_| StorageError::Corrupted("Leaf page: bad num_pairs".into()))?,
        ) as usize;
        let mut min_size = 4usize;
        if fixed_key_size.is_none() {
            min_size = min_size.saturating_add(size_of::<u32>().saturating_mul(num_pairs));
        }
        if fixed_value_size.is_none() {
            min_size = min_size.saturating_add(size_of::<u32>().saturating_mul(num_pairs));
        }
        if min_size > page.len() {
            return Err(StorageError::Corrupted(format!(
                "Leaf page: num_pairs={num_pairs} requires {min_size}B for offset tables, page is {}B",
                page.len()
            )));
        }
        Ok(LeafAccessor {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
        })
    }

    #[cfg(feature = "std")]
    pub(super) fn print_node<K: Key, V: Value>(&self, include_value: bool) {
        let mut i = 0;
        while let Some(entry) = self.entry(i) {
            eprint!(" key_{i}={:?}", K::from_bytes(entry.key()));
            if include_value {
                eprint!(" value_{i}={:?}", V::from_bytes(entry.value()));
            }
            i += 1;
        }
    }

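    // Binary search over the leaf's keys. Returns `(index, true)` when `query`
    // matches the key at `index`, and `(index, false)` when it does not, where
    // `index` is the insertion position (possibly `num_pairs()`, i.e. one past
    // the last entry). For example, with keys ["a", "c", "e"], querying "c"
    // yields (1, true) and querying "d" yields (2, false).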
    pub(crate) fn position<K: Key>(&self, query: &[u8]) -> (usize, bool) {
        // inclusive
        let mut min_entry = 0;
        // inclusive as a result position: starts one past the last pair, since the
        // query may belong beyond the end of the leaf
        let mut max_entry = self.num_pairs();
        while min_entry < max_entry {
            let mid = min_entry.midpoint(max_entry);
            let key = self.key_unchecked(mid);
            match K::compare(query, key) {
                Ordering::Less => {
                    max_entry = mid;
                }
                Ordering::Equal => {
                    return (mid, true);
                }
                Ordering::Greater => {
                    min_entry = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_entry, max_entry);
        (min_entry, false)
    }

    pub(crate) fn find_key<K: Key>(&self, query: &[u8]) -> Option<usize> {
        let (entry, found) = self.position::<K>(query);
        if found { Some(entry) } else { None }
    }

    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    fn key_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            Some(self.key_section_start())
        } else {
            self.key_end(n - 1)
        }
    }

    fn key_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_key_size {
                return Some(self.key_section_start() + fixed * (n + 1));
            }
            let offset = 4 + size_of::<u32>() * n;
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    fn value_start(&self, n: usize) -> Option<usize> {
        if self.num_pairs() == 0 {
            return None;
        }
        if n == 0 {
            self.key_end(self.num_pairs() - 1)
        } else {
            self.value_end(n - 1)
        }
    }

    fn value_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_value_size {
                return Some(self.key_end(self.num_pairs.checked_sub(1)?)? + fixed * (n + 1));
            }
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    pub(crate) fn num_pairs(&self) -> usize {
        self.num_pairs
    }

    pub(super) fn offset_of_first_value(&self) -> usize {
        self.offset_of_value(0).unwrap()
    }

    pub(super) fn offset_of_value(&self, n: usize) -> Option<usize> {
        self.value_start(n)
    }

    pub(super) fn value_range(&self, n: usize) -> Option<(usize, usize)> {
        Some((self.value_start(n)?, self.value_end(n)?))
    }

    // Returns the length of all keys and values between [start, end)
    pub(crate) fn length_of_pairs(&self, start: usize, end: usize) -> usize {
        self.length_of_values(start, end) + self.length_of_keys(start, end)
    }

    fn length_of_values(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.value_end(end - 1).unwrap();
        let start_offset = self.value_start(start).unwrap();
        end_offset - start_offset
    }

    // Returns the length of all keys between [start, end)
    pub(crate) fn length_of_keys(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.key_end(end - 1).unwrap();
        let start_offset = self.key_start(start).unwrap();
        end_offset - start_offset
    }

    pub(crate) fn total_length(&self) -> usize {
        // Values are stored last
        self.value_end(self.num_pairs() - 1).unwrap()
    }

    fn key_unchecked(&self, n: usize) -> &[u8] {
        &self.page[self.key_start(n).unwrap()..self.key_end(n).unwrap()]
    }

    pub(crate) fn entry(&self, n: usize) -> Option<EntryAccessor<'a>> {
        let key = &self.page[self.key_start(n)?..self.key_end(n)?];
        let value = &self.page[self.value_start(n)?..self.value_end(n)?];
        Some(EntryAccessor::new(key, value))
    }

    pub(crate) fn entry_ranges(&self, n: usize) -> Option<(Range<usize>, Range<usize>)> {
        let key = self.key_start(n)?..self.key_end(n)?;
        let value = self.value_start(n)?..self.value_end(n)?;
        Some((key, value))
    }

    pub(super) fn last_entry(&self) -> EntryAccessor<'a> {
        self.entry(self.num_pairs() - 1).unwrap()
    }
}

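// Accumulates key/value pairs and writes them out as one leaf page (or two,
// via `build_split`, when they exceed the page size). A minimal usage sketch,
// assuming a `mem: &TransactionalMemory` and an `allocated:
// &Mutex<PageTrackerPolicy>` already exist for the current write transaction:
//
//     let mut builder = LeafBuilder::new(mem, allocated, 2, None, None);
//     builder.push(b"apple", b"red");
//     builder.push(b"banana", b"yellow");
//     let page = builder.build()?; // or build_split() if should_split()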
pub(super) struct LeafBuilder<'a, 'b> {
    pairs: Vec<(&'a [u8], &'a [u8])>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    total_key_bytes: usize,
    total_value_bytes: usize,
    mem: &'b TransactionalMemory,
    allocated_pages: &'b Mutex<PageTrackerPolicy>,
}

impl<'a, 'b> LeafBuilder<'a, 'b> {
    pub(super) fn required_bytes(&self, num_pairs: usize, keys_values_bytes: usize) -> usize {
        RawLeafBuilder::required_bytes(
            num_pairs,
            keys_values_bytes,
            self.fixed_key_size,
            self.fixed_value_size,
        )
    }

    pub(super) fn new(
        mem: &'b TransactionalMemory,
        allocated_pages: &'b Mutex<PageTrackerPolicy>,
        capacity: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        Self {
            pairs: Vec::with_capacity(capacity),
            fixed_key_size,
            fixed_value_size,
            total_key_bytes: 0,
            total_value_bytes: 0,
            mem,
            allocated_pages,
        }
    }

    pub(super) fn push(&mut self, key: &'a [u8], value: &'a [u8]) {
        self.total_key_bytes += key.len();
        self.total_value_bytes += value.len();
        self.pairs.push((key, value));
    }

    pub(super) fn push_all_except(
        &mut self,
        accessor: &'a LeafAccessor<'_>,
        except: Option<usize>,
    ) {
        for i in 0..accessor.num_pairs() {
            if let Some(except) = except
                && except == i
            {
                continue;
            }
            let entry = accessor.entry(i).unwrap();
            self.push(entry.key(), entry.value());
        }
    }

    pub(super) fn should_split(&self) -> bool {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        required_size > self.mem.get_page_size() && self.pairs.len() > 1
    }

    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        let total_size = self.total_key_bytes + self.total_value_bytes;
        let mut division = 0;
        let mut first_split_key_bytes = 0;
        let mut first_split_value_bytes = 0;
        for (key, value) in self.pairs.iter().take(self.pairs.len() - 1) {
            first_split_key_bytes += key.len();
            first_split_value_bytes += value.len();
            division += 1;
            if first_split_key_bytes + first_split_value_bytes >= total_size / 2 {
                break;
            }
        }

        let required_size =
            self.required_bytes(division, first_split_key_bytes + first_split_value_bytes);
        let mut allocated_pages = self.allocated_pages.lock();
        let mut page1 = self.mem.allocate(required_size, &mut allocated_pages)?;
        let mut builder = RawLeafBuilder::new(
            page1.memory_mut()?,
            division,
            self.fixed_key_size,
            self.fixed_value_size,
            first_split_key_bytes,
        );
        for (key, value) in self.pairs.iter().take(division) {
            builder.append(key, value);
        }
        drop(builder);

        let required_size = self.required_bytes(
            self.pairs.len() - division,
            self.total_key_bytes + self.total_value_bytes
                - first_split_key_bytes
                - first_split_value_bytes,
        );
        let mut page2 = self.mem.allocate(required_size, &mut allocated_pages)?;
        let mut builder = RawLeafBuilder::new(
            page2.memory_mut()?,
            self.pairs.len() - division,
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes - first_split_key_bytes,
        );
        for (key, value) in &self.pairs[division..] {
            builder.append(key, value);
        }
        drop(builder);

        Ok((page1, self.pairs[division - 1].0, page2))
    }

    pub(super) fn build(self) -> Result<PageMut> {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        let mut allocated_pages = self.allocated_pages.lock();
        let mut page = self.mem.allocate(required_size, &mut allocated_pages)?;
        let mut builder = RawLeafBuilder::new(
            page.memory_mut()?,
            self.pairs.len(),
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes,
        );
        for (key, value) in self.pairs {
            builder.append(key, value);
        }
        drop(builder);
        Ok(page)
    }
}

// Note the caller is responsible for ensuring that the buffer is large enough
// and rewriting all fields if any dynamically sized fields are written
// Layout is:
// 1 byte: type
// 1 byte: reserved (padding to 32bits aligned)
// 2 bytes: num_entries (number of pairs)
// (optional) repeating (num_entries times):
// 4 bytes: key_end
// (optional) repeating (num_entries times):
// 4 bytes: value_end
// repeating (num_entries times):
// * n bytes: key data
// repeating (num_entries times):
// * n bytes: value data
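//
// Worked example with two variable-size pairs ("ab" -> "x", "cde" -> "yz"):
//   bytes 0..4    header          [LEAF, 0, 2, 0]
//   bytes 4..12   key_end table   [22, 25]  (absolute offsets, u32 LE each)
//   bytes 12..20  value_end table [26, 28]
//   bytes 20..25  key data        "ab" "cde"
//   bytes 25..28  value data      "x" "yz"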
pub(crate) struct RawLeafBuilder<'a> {
    page: &'a mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    num_pairs: usize,
    provisioned_key_bytes: usize,
    pairs_written: usize, // used for debugging
}

impl<'a> RawLeafBuilder<'a> {
    pub(crate) fn required_bytes(
        num_pairs: usize,
        keys_values_bytes: usize,
        key_size: Option<usize>,
        value_size: Option<usize>,
    ) -> usize {
        // Page id & header
        let mut result = 4;
        // key & value lengths
        if key_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        if value_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        result += keys_values_bytes;

        result
    }

    pub(crate) fn new(
        page: &'a mut [u8],
        num_pairs: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        key_bytes: usize,
    ) -> Self {
        page[0] = LEAF;
        page[2..4].copy_from_slice(&u16::try_from(num_pairs).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison all the key & value offsets, in case the caller forgets to write them
            let mut last = 4;
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            if fixed_value_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            for x in &mut page[4..last] {
                *x = 0xFF;
            }
        }
        RawLeafBuilder {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
            provisioned_key_bytes: key_bytes,
            pairs_written: 0,
        }
    }

    fn value_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_value_size {
            return self.key_section_start() + self.provisioned_key_bytes + fixed * (n + 1);
        }
        let mut offset = 4 + size_of::<u32>() * n;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 4 + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    pub(crate) fn append(&mut self, key: &[u8], value: &[u8]) {
        if let Some(key_width) = self.fixed_key_size {
            assert_eq!(key_width, key.len());
        }
        if let Some(value_width) = self.fixed_value_size {
            assert_eq!(value_width, value.len());
        }
        let key_offset = if self.pairs_written == 0 {
            self.key_section_start()
        } else {
            self.key_end(self.pairs_written - 1)
        };
        let value_offset = if self.pairs_written == 0 {
            self.key_section_start() + self.provisioned_key_bytes
        } else {
            self.value_end(self.pairs_written - 1)
        };

        let n = self.pairs_written;
        if self.fixed_key_size.is_none() {
            let offset = 4 + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())]
                .copy_from_slice(&u32::try_from(key_offset + key.len()).unwrap().to_le_bytes());
        }
        self.page[key_offset..(key_offset + key.len())].copy_from_slice(key);
        let written_key_len = key_offset + key.len() - self.key_section_start();
        assert!(written_key_len <= self.provisioned_key_bytes);

        if self.fixed_value_size.is_none() {
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(value_offset + value.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }
        self.page[value_offset..(value_offset + value.len())].copy_from_slice(value);
        self.pairs_written += 1;
    }
}

impl Drop for RawLeafBuilder<'_> {
    fn drop(&mut self) {
        let is_panicking = {
            #[cfg(feature = "std")]
            {
                std::thread::panicking()
            }
            #[cfg(not(feature = "std"))]
            {
                false
            }
        };
        if !is_panicking {
            assert_eq!(self.pairs_written, self.num_pairs);
            if self.num_pairs > 0 {
                assert_eq!(
                    self.key_section_start() + self.provisioned_key_bytes,
                    self.key_end(self.num_pairs - 1)
                );
            }
        }
    }
}

pub(crate) struct LeafMutator<'b> {
    page: &'b mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
}

impl<'b> LeafMutator<'b> {
    pub(crate) fn new(
        page: &'b mut [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        assert_eq!(page[0], LEAF);
        Self {
            page,
            fixed_key_size,
            fixed_value_size,
        }
    }

    pub(super) fn sufficient_insert_inplace_space(
        page: &'_ impl Page,
        position: usize,
        overwrite: bool,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        new_key: &[u8],
        new_value: &[u8],
    ) -> bool {
        let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size)
            .expect("internal: constructed page is valid");
        if overwrite {
            let remaining = page.memory().len() - accessor.total_length();
            let required_delta = isize::try_from(new_key.len() + new_value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(position, position + 1)).unwrap();
            required_delta <= isize::try_from(remaining).unwrap()
        } else {
            // If this is a large page, only allow in-place appending to avoid write amplification
            //
            // Note: this check is also required to avoid inserting an unbounded number of
            // small values into a large page, which could overflow the u16 entry count
            // (the maximum number of entries per leaf).
            if page.get_page_number().page_order > 0 && position < accessor.num_pairs() {
                return false;
            }
            let remaining = page.memory().len() - accessor.total_length();
            let mut required_delta = new_key.len() + new_value.len();
            if fixed_key_size.is_none() {
                required_delta += size_of::<u32>();
            }
            if fixed_value_size.is_none() {
                required_delta += size_of::<u32>();
            }
            required_delta <= remaining
        }
    }

    // Insert the given key, value pair at index i and shift all following pairs to the right
    pub(crate) fn insert(&mut self, i: usize, overwrite: bool, key: &[u8], value: &[u8]) {
        let accessor = LeafAccessor::new(self.page, self.fixed_key_size, self.fixed_value_size)
            .expect("internal: constructed page is valid");
        let required_delta = if overwrite {
            isize::try_from(key.len() + value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(i, i + 1)).unwrap()
        } else {
            let mut delta = key.len() + value.len();
            if self.fixed_key_size.is_none() {
                delta += size_of::<u32>();
            }
            if self.fixed_value_size.is_none() {
                delta += size_of::<u32>();
            }
            delta.try_into().unwrap()
        };
        assert!(
            isize::try_from(accessor.total_length()).unwrap() + required_delta
                <= isize::try_from(self.page.len()).unwrap()
        );

        let num_pairs = accessor.num_pairs();
        let last_key_end = accessor.key_end(accessor.num_pairs() - 1).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
        let shift_index = if overwrite { i + 1 } else { i };
        let shift_key_start = accessor.key_start(shift_index).unwrap_or(last_key_end);
        let shift_value_start = accessor.value_start(shift_index).unwrap_or(last_value_end);
        let existing_value_len = accessor
            .value_range(i)
            .map(|(start, end)| end - start)
            .unwrap_or_default();

        let value_delta = if overwrite {
            isize::try_from(value.len()).unwrap() - isize::try_from(existing_value_len).unwrap()
        } else {
            value.len().try_into().unwrap()
        };

        // Update all the pointers
        let key_ptr_size: usize = if self.fixed_key_size.is_none() { 4 } else { 0 };
        let value_ptr_size: usize = if self.fixed_value_size.is_none() {
            4
        } else {
            0
        };
        if !overwrite {
            for j in 0..i {
                self.update_key_end(j, (key_ptr_size + value_ptr_size).try_into().unwrap());
                let value_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_value_end(j, value_delta);
            }
        }
        for j in i..num_pairs {
            if overwrite {
                self.update_value_end(j, value_delta);
            } else {
                let key_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_key_end(j, key_delta);
                let value_delta = key_delta + isize::try_from(value.len()).unwrap();
                self.update_value_end(j, value_delta);
            }
        }

        let new_num_pairs = if overwrite { num_pairs } else { num_pairs + 1 };
        self.page[2..4].copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());

        // Right shift the trailing values
        let mut dest = if overwrite {
            (isize::try_from(shift_value_start).unwrap() + value_delta)
                .try_into()
                .unwrap()
        } else {
            shift_value_start + key_ptr_size + value_ptr_size + key.len() + value.len()
        };
        let start = shift_value_start;
        let end = last_value_end;
        self.page.copy_within(start..end, dest);

        // Insert the value
        let inserted_value_end: u32 = dest.try_into().unwrap();
        dest -= value.len();
        self.page[dest..(dest + value.len())].copy_from_slice(value);

        if !overwrite {
            // Right shift the trailing key data & preceding value data
            let start = shift_key_start;
            let end = shift_value_start;
            dest -= end - start;
            self.page.copy_within(start..end, dest);

            // Insert the key
            let inserted_key_end: u32 = dest.try_into().unwrap();
            dest -= key.len();
            self.page[dest..(dest + key.len())].copy_from_slice(key);

            // Right shift the trailing value pointers & preceding key data
            let start = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            let end = shift_key_start;
            dest -= end - start;
            debug_assert_eq!(
                dest,
                4 + key_ptr_size * new_num_pairs + value_ptr_size * (i + 1)
            );
            self.page.copy_within(start..end, dest);

            // Insert the value pointer
            if self.fixed_value_size.is_none() {
                dest -= size_of::<u32>();
                self.page[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_value_end.to_le_bytes());
            }

            // Right shift the trailing key pointers & preceding value pointers
            let start = 4 + key_ptr_size * i;
            let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            dest -= end - start;
            debug_assert_eq!(dest, 4 + key_ptr_size * (i + 1));
            self.page.copy_within(start..end, dest);

            // Insert the key pointer
            if self.fixed_key_size.is_none() {
                dest -= size_of::<u32>();
                self.page[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_key_end.to_le_bytes());
            }
            debug_assert_eq!(dest, 4 + key_ptr_size * i);
        }
    }

    pub(super) fn remove(&mut self, i: usize) {
        let accessor = LeafAccessor::new(self.page, self.fixed_key_size, self.fixed_value_size)
            .expect("internal: constructed page is valid");
        let num_pairs = accessor.num_pairs();
        assert!(i < num_pairs);
        assert!(num_pairs > 1);
        let key_start = accessor.key_start(i).unwrap();
        let key_end = accessor.key_end(i).unwrap();
        let value_start = accessor.value_start(i).unwrap();
        let value_end = accessor.value_end(i).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();

        // Update all the pointers
        let key_ptr_size = if self.fixed_key_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        let value_ptr_size = if self.fixed_value_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        for j in 0..i {
            self.update_key_end(j, -isize::try_from(key_ptr_size + value_ptr_size).unwrap());
            let value_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_value_end(j, value_delta);
        }
        for j in (i + 1)..num_pairs {
            let key_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_key_end(j, key_delta);
            let value_delta = key_delta - isize::try_from(value_end - value_start).unwrap();
            self.update_value_end(j, value_delta);
        }

        // Left shift all the pointers & data

        let new_num_pairs = num_pairs - 1;
        self.page[2..4].copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
        // Left shift the trailing key pointers & preceding value pointers
        let mut dest = 4 + key_ptr_size * i;
        // First trailing key pointer
        let start = 4 + key_ptr_size * (i + 1);
        // Last preceding value pointer
        let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
        self.page.copy_within(start..end, dest);
        dest += end - start;
        debug_assert_eq!(dest, 4 + key_ptr_size * new_num_pairs + value_ptr_size * i);

        // Left shift the trailing value pointers & preceding key data
        let start = 4 + key_ptr_size * num_pairs + value_ptr_size * (i + 1);
        let end = key_start;
        self.page.copy_within(start..end, dest);
        dest += end - start;

        let preceding_key_len = key_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_key_len
        );

        // Left shift the trailing key data & preceding value data
        let start = key_end;
        let end = value_start;
        self.page.copy_within(start..end, dest);
        dest += end - start;

        // Left shift the trailing value data
        let preceding_data_len =
            value_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs) - (key_end - key_start);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_data_len
        );
        let start = value_end;
        let end = last_value_end;
        self.page.copy_within(start..end, dest);
    }

    fn update_key_end(&mut self, i: usize, delta: isize) {
        if self.fixed_key_size.is_some() {
            return;
        }
        let offset = 4 + size_of::<u32>() * i;
        let mut ptr = u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        );
        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
        self.page[offset..(offset + size_of::<u32>())].copy_from_slice(&ptr.to_le_bytes());
    }

    fn update_value_end(&mut self, i: usize, delta: isize) {
        if self.fixed_value_size.is_some() {
            return;
        }
        let accessor = LeafAccessor::new(self.page, self.fixed_key_size, self.fixed_value_size)
            .expect("internal: constructed page is valid");
        let num_pairs = accessor.num_pairs();
        let mut offset = 4 + size_of::<u32>() * i;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * num_pairs;
        }
        let mut ptr = u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        );
        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
        self.page[offset..(offset + size_of::<u32>())].copy_from_slice(&ptr.to_le_bytes());
    }
}

// Provides a simple zero-copy way to access a branch page
// TODO: this should be pub(super) and the multimap btree stuff should be moved into this package
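// Layout (as implied by the offset arithmetic below):
// 1 byte: type
// 1 byte: reserved
// 2 bytes: num_keys (u16 LE)
// 4 bytes: padding to an 8-byte boundary
// repeating (num_keys + 1 times):
// 16 bytes: child checksum
// repeating (num_keys + 1 times):
// * PageNumber::serialized_size() bytes: child page number
// (optional) repeating (num_keys times):
// 4 bytes: key_end
// repeating (num_keys times):
// * n bytes: key data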
pub(crate) struct BranchAccessor<'a: 'b, 'b, T: Page + 'a> {
    page: &'b T,
    num_keys: usize,
    fixed_key_size: Option<usize>,
    _page_lifetime: PhantomData<&'a ()>,
}

impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
    pub(crate) fn new(
        page: &'b T,
        fixed_key_size: Option<usize>,
    ) -> core::result::Result<Self, StorageError> {
        let mem = page.memory();
        if mem.len() < 8 {
            return Err(StorageError::Corrupted(
                "Branch page too small for header".into(),
            ));
        }
        debug_assert_eq!(mem[0], BRANCH);
        let num_keys = u16::from_le_bytes(
            mem[2..4]
                .try_into()
                .map_err(|_| StorageError::Corrupted("Branch page: bad num_keys".into()))?,
        ) as usize;
        let num_children = num_keys + 1;
        let child_table_size =
            (PageNumber::serialized_size() + size_of::<Checksum>()) * num_children;
        let mut min_size = 8usize.saturating_add(child_table_size);
        if fixed_key_size.is_none() {
            min_size = min_size.saturating_add(size_of::<u32>().saturating_mul(num_keys));
        }
        if min_size > mem.len() {
            return Err(StorageError::Corrupted(format!(
                "Branch page: num_keys={num_keys} requires {min_size}B for headers, page is {}B",
                mem.len()
            )));
        }
        Ok(BranchAccessor {
            page,
            num_keys,
            fixed_key_size,
            _page_lifetime: Default::default(),
        })
    }

    #[cfg(feature = "std")]
    pub(super) fn print_node<K: Key>(&self) {
        eprint!(
            "Internal[ (page={:?}), child_0={:?}",
            self.page.get_page_number(),
            self.child_page(0).unwrap()
        );
        for i in 0..(self.count_children() - 1) {
            if let Some(child) = self.child_page(i + 1) {
                let key = self.key(i).unwrap();
                eprint!(" key_{i}={:?}", K::from_bytes(key));
                eprint!(" child_{}={child:?}", i + 1);
            }
        }
        eprint!("]");
    }

    pub(crate) fn total_length(&self) -> usize {
        // Keys are stored at the end
        self.key_end(self.num_keys() - 1).unwrap()
    }

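    // Binary search for the child that may contain `query`. With a single
    // separator key "m" and children [c0, c1]: "a" and "m" both route to
    // (0, c0), since a separator is the largest key of the left child (see
    // `build_split`, which returns the last key of the first page), while
    // "z" routes to (1, c1).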
1487    pub(super) fn child_for_key<K: Key>(&self, query: &[u8]) -> (usize, PageNumber) {
1488        let mut min_child = 0; // inclusive
1489        let mut max_child = self.num_keys(); // inclusive
1490        while min_child < max_child {
1491            let mid = min_child.midpoint(max_child);
1492            match K::compare(query, self.key(mid).unwrap()) {
1493                Ordering::Less => {
1494                    max_child = mid;
1495                }
1496                Ordering::Equal => {
1497                    return (mid, self.child_page(mid).unwrap());
1498                }
1499                Ordering::Greater => {
1500                    min_child = mid + 1;
1501                }
1502            }
1503        }
1504        debug_assert_eq!(min_child, max_child);
1505
1506        (min_child, self.child_page(min_child).unwrap())
1507    }
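    // Worked example for the binary search in child_for_key above (illustrative,
    // assuming lexicographic K::compare): with keys [b"b", b"d"] and children
    // [c0, c1, c2], queries b"a" and b"b" resolve to c0, b"c" and b"d" to c1,
    // and b"e" to c2 -- i.e. child i covers the key range (key[i - 1], key[i]].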

    fn key_section_start(&self) -> usize {
        if self.fixed_key_size.is_none() {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
                + size_of::<u32>() * self.num_keys()
        } else {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
        }
    }

    fn key_offset(&self, n: usize) -> usize {
        if n == 0 {
            self.key_section_start()
        } else {
            self.key_end(n - 1).unwrap()
        }
    }

    fn key_end(&self, n: usize) -> Option<usize> {
        if let Some(fixed) = self.fixed_key_size {
            return Some(self.key_section_start() + fixed * (n + 1));
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
            + size_of::<u32>() * n;
        Some(u32::from_le_bytes(
            self.page
                .memory()
                .get(offset..(offset + size_of::<u32>()))?
                .try_into()
                .unwrap(),
        ) as usize)
    }

    pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
        if n >= self.num_keys() {
            return None;
        }
        let offset = self.key_offset(n);
        let end = self.key_end(n)?;
        Some(&self.page.memory()[offset..end])
    }

    pub(crate) fn count_children(&self) -> usize {
        self.num_keys() + 1
    }

    pub(crate) fn child_checksum(&self, n: usize) -> Option<Checksum> {
        if n >= self.count_children() {
            return None;
        }

        let offset = 8 + size_of::<Checksum>() * n;
        Some(Checksum::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        ))
    }

    pub(crate) fn child_page(&self, n: usize) -> Option<PageNumber> {
        if n >= self.count_children() {
            return None;
        }

        let offset =
            8 + size_of::<Checksum>() * self.count_children() + PageNumber::serialized_size() * n;
        Some(PageNumber::from_le_bytes(
            self.page.memory()[offset..(offset + PageNumber::serialized_size())]
                .try_into()
                .unwrap(),
        ))
    }
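    // Worked example for child_page above (illustrative, assuming 8-byte page
    // numbers): with num_keys = 2 (three children), child_page(1) reads bytes
    // 8 + 16 * 3 + 8 * 1 = 64..72 of the page.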

    fn num_keys(&self) -> usize {
        self.num_keys
    }
}

pub(super) struct BranchBuilder<'a, 'b> {
    children: Vec<(PageNumber, Checksum)>,
    keys: Vec<&'a [u8]>,
    total_key_bytes: usize,
    fixed_key_size: Option<usize>,
    mem: &'b TransactionalMemory,
    allocated_pages: &'b Mutex<PageTrackerPolicy>,
}

impl<'a, 'b> BranchBuilder<'a, 'b> {
    pub(super) fn new(
        mem: &'b TransactionalMemory,
        allocated_pages: &'b Mutex<PageTrackerPolicy>,
        child_capacity: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        Self {
            children: Vec::with_capacity(child_capacity),
            keys: Vec::with_capacity(child_capacity - 1),
            total_key_bytes: 0,
            fixed_key_size,
            mem,
            allocated_pages,
        }
    }

    pub(super) fn replace_child(&mut self, index: usize, child: PageNumber, checksum: Checksum) {
        self.children[index] = (child, checksum);
    }

    pub(super) fn push_child(&mut self, child: PageNumber, checksum: Checksum) {
        self.children.push((child, checksum));
    }

    pub(super) fn push_key(&mut self, key: &'a [u8]) {
        self.keys.push(key);
        self.total_key_bytes += key.len();
    }

    pub(super) fn push_all<T: Page>(&mut self, accessor: &'a BranchAccessor<'_, '_, T>) {
        for i in 0..accessor.count_children() {
            let child = accessor.child_page(i).unwrap();
            let checksum = accessor.child_checksum(i).unwrap();
            self.push_child(child, checksum);
        }
        for i in 0..(accessor.count_children() - 1) {
            self.push_key(accessor.key(i).unwrap());
        }
    }

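    // Returns the sole remaining (child, checksum) pair when the branch has
    // collapsed to a single child, and None while multiple children remain.
    // Note: callers must have pushed at least one child, or the index below panics.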
    pub(super) fn to_single_child(&self) -> Option<(PageNumber, Checksum)> {
        if self.children.len() > 1 {
            None
        } else {
            Some(self.children[0])
        }
    }

    pub(super) fn build(self) -> Result<PageMut> {
        assert_eq!(self.children.len(), self.keys.len() + 1);
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        let mut allocated_pages = self.allocated_pages.lock();
        let mut page = self.mem.allocate(size, &mut allocated_pages)?;
        let mut builder =
            RawBranchBuilder::new(page.memory_mut()?, self.keys.len(), self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 1..self.children.len() {
            let key = &self.keys[i - 1];
            builder.write_nth_key(key.as_ref(), self.children[i].0, self.children[i].1, i - 1);
        }
        drop(builder);

        Ok(page)
    }

    pub(super) fn should_split(&self) -> bool {
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        size > self.mem.get_page_size() && self.keys.len() >= 3
    }

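    // Splits at the middle key, which is returned to the caller (for the parent)
    // rather than written into either half. Worked example (illustrative): with
    // keys k0..k4 and children c0..c5, division = 2, so page1 holds
    // (c0, k0, c1, k1, c2), k2 is the returned division key, and page2 holds
    // (c3, k3, c4, k4, c5).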
    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        let mut allocated_pages = self.allocated_pages.lock();
        assert_eq!(self.children.len(), self.keys.len() + 1);
        assert!(self.keys.len() >= 3);
        let division = self.keys.len() / 2;
        let first_split_key_len: usize = self.keys.iter().take(division).map(|k| k.len()).sum();
        let division_key = self.keys[division];
        let second_split_key_len = self.total_key_bytes - first_split_key_len - division_key.len();

        let size =
            RawBranchBuilder::required_bytes(division, first_split_key_len, self.fixed_key_size);
        let mut page1 = self.mem.allocate(size, &mut allocated_pages)?;
        let mut builder = RawBranchBuilder::new(page1.memory_mut()?, division, self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 0..division {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i,
            );
        }
        drop(builder);

        let size = RawBranchBuilder::required_bytes(
            self.keys.len() - division - 1,
            second_split_key_len,
            self.fixed_key_size,
        );
        let mut page2 = self.mem.allocate(size, &mut allocated_pages)?;
        let mut builder = RawBranchBuilder::new(
            page2.memory_mut()?,
            self.keys.len() - division - 1,
            self.fixed_key_size,
        );
        builder.write_first_page(self.children[division + 1].0, self.children[division + 1].1);
        for i in (division + 1)..self.keys.len() {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i - division - 1,
            );
        }
        drop(builder);

        Ok((page1, division_key, page2))
    }
}

// Note: the caller is responsible for ensuring that the buffer is large enough,
// and for rewriting all fields if any dynamically sized fields are written
// Layout is:
// 1 byte: type
// 1 byte: padding (to align num_keys to 16 bits)
// 2 bytes: num_keys (number of keys)
// 4 bytes: padding (to align the tables below to 64 bits)
// repeating (num_keys + 1 times):
// 16 bytes: child page checksum
// repeating (num_keys + 1 times):
// 8 bytes: page number
// (optional) repeating (num_keys times):
// * 4 bytes: key end. Ending offset of the key, exclusive
// repeating (num_keys times):
// * n bytes: key data
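// Worked example of the layout above (illustrative, assuming 8-byte page
// numbers and 16-byte checksums): a branch with num_keys = 2 and
// variable-size keys:
//   bytes 0..8    header (type, padding, num_keys, padding)
//   bytes 8..56   three child checksums (16 bytes each)
//   bytes 56..80  three child page numbers (8 bytes each)
//   bytes 80..88  two key-end offsets (4 bytes each)
//   bytes 88..    key data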
pub(super) struct RawBranchBuilder<'b> {
    page: &'b mut [u8],
    fixed_key_size: Option<usize>,
    num_keys: usize,
    keys_written: usize, // used for debugging
}

impl<'b> RawBranchBuilder<'b> {
    pub(super) fn required_bytes(
        num_keys: usize,
        size_of_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> usize {
        if fixed_key_size.is_none() {
            let fixed_size = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            size_of_keys + fixed_size
        } else {
            let fixed_size =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            size_of_keys + fixed_size
        }
    }

    // Caller MUST write num_keys values
    pub(super) fn new(page: &'b mut [u8], num_keys: usize, fixed_key_size: Option<usize>) -> Self {
        assert!(num_keys > 0);
        page[0] = BRANCH;
        page[2..4].copy_from_slice(&u16::try_from(num_keys).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison all the child pointers & key offsets, in case the caller forgets to write them
            let start = 8 + size_of::<Checksum>() * (num_keys + 1);
            let mut last =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_keys;
            }
            for x in &mut page[start..last] {
                *x = 0xFF;
            }
        }
        RawBranchBuilder {
            page,
            fixed_key_size,
            num_keys,
            keys_written: 0,
        }
    }

    pub(super) fn write_first_page(&mut self, page_number: PageNumber, checksum: Checksum) {
        let offset = 8;
        self.page[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset = 8 + size_of::<Checksum>() * (self.num_keys + 1);
        self.page[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }

    fn key_section_start(&self) -> usize {
        let mut offset =
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1);
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_keys;
        }

        offset
    }

    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
            + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    // Write the nth key, and the page of values greater than this key but less
    // than or equal to the next key
    // Caller must write keys & pages in increasing order
    pub(super) fn write_nth_key(
        &mut self,
        key: &[u8],
        page_number: PageNumber,
        checksum: Checksum,
        n: usize,
    ) {
        assert!(n < self.num_keys);
        assert_eq!(n, self.keys_written);
        self.keys_written += 1;
        let offset = 8 + size_of::<Checksum>() * (n + 1);
        self.page[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset = 8
            + size_of::<Checksum>() * (self.num_keys + 1)
            + PageNumber::serialized_size() * (n + 1);
        self.page[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());

        let data_offset = if n > 0 {
            self.key_end(n - 1)
        } else {
            self.key_section_start()
        };
        if self.fixed_key_size.is_none() {
            let offset = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
                + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(data_offset + key.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }

        debug_assert!(data_offset > offset);
        self.page[data_offset..(data_offset + key.len())].copy_from_slice(key);
    }
}

impl Drop for RawBranchBuilder<'_> {
    fn drop(&mut self) {
        let is_panicking = {
            #[cfg(feature = "std")]
            {
                std::thread::panicking()
            }
            #[cfg(not(feature = "std"))]
            {
                false
            }
        };
        if !is_panicking {
            assert_eq!(self.keys_written, self.num_keys);
        }
    }
}
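// Hedged usage sketch for RawBranchBuilder (names such as `page_bytes`, `left`,
// and `right` are hypothetical, not defined in this crate):
//   let mut builder = RawBranchBuilder::new(page_bytes, 1, None);
//   builder.write_first_page(left, left_checksum);
//   builder.write_nth_key(b"split-key", right, right_checksum, 0);
//   drop(builder); // the Drop impl asserts that all num_keys keys were written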

pub(crate) struct BranchMutator<'b> {
    page: &'b mut [u8],
}

impl<'b> BranchMutator<'b> {
    pub(crate) fn new(page: &'b mut [u8]) -> Self {
        assert_eq!(page[0], BRANCH);
        Self { page }
    }

    fn num_keys(&self) -> usize {
        u16::from_le_bytes(self.page[2..4].try_into().unwrap()) as usize
    }

    pub(crate) fn write_child_page(
        &mut self,
        i: usize,
        page_number: PageNumber,
        checksum: Checksum,
    ) {
        debug_assert!(i <= self.num_keys());
        let offset = 8 + size_of::<Checksum>() * i;
        self.page[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        let offset =
            8 + size_of::<Checksum>() * (self.num_keys() + 1) + PageNumber::serialized_size() * i;
        self.page[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }
}
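
// Hedged usage sketch for BranchMutator (hypothetical values): after a child
// page is rewritten during commit, its pointer and checksum can be patched in
// place without rebuilding the whole branch page:
//   let mut mutator = BranchMutator::new(page.memory_mut()?);
//   mutator.write_child_page(0, new_child_page, new_checksum);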