redb_32bit/tree_store/
btree_base.rs

1use crate::tree_store::page_store::{
2    xxh3_checksum, CachePriority, Page, PageImpl, PageMut, TransactionalMemory,
3};
4use crate::tree_store::PageNumber;
5use crate::types::{MutInPlaceValue, RedbKey, RedbValue};
6use crate::{Result, StorageError};
7use std::cmp::Ordering;
8use std::marker::PhantomData;
9use std::mem::size_of;
10use std::ops::Range;
11use std::{mem, thread};
12
// Page type discriminants, stored in the first byte of every btree page
pub(crate) const LEAF: u8 = 1;
pub(crate) const BRANCH: u8 = 2;

pub(crate) type Checksum = u128;
// Dummy value. Final value will be computed during commit
pub(crate) const DEFERRED: Checksum = 999;
19
20pub(super) fn leaf_checksum<T: Page>(
21    page: &T,
22    fixed_key_size: Option<usize>,
23    fixed_value_size: Option<usize>,
24) -> Result<Checksum, StorageError> {
25    let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
26    let end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
27    if end > page.memory().len() {
28        Err(StorageError::Corrupted(format!(
29            "Leaf page {:?} corrupted. Last offset {} beyond end of data {}",
30            page.get_page_number(),
31            end,
32            page.memory().len()
33        )))
34    } else {
35        Ok(xxh3_checksum(&page.memory()[..end]))
36    }
37}
38
39pub(super) fn branch_checksum<T: Page>(
40    page: &T,
41    fixed_key_size: Option<usize>,
42) -> Result<Checksum, StorageError> {
43    let accessor = BranchAccessor::new(page, fixed_key_size);
44    let end = accessor.key_end(accessor.num_keys() - 1);
45    if end > page.memory().len() {
46        Err(StorageError::Corrupted(format!(
47            "Branch page {:?} corrupted. Last offset {} beyond end of data {}",
48            page.get_page_number(),
49            end,
50            page.memory().len()
51        )))
52    } else {
53        Ok(xxh3_checksum(&page.memory()[..end]))
54    }
55}
56
// Cleanup action an AccessGuard performs when it is dropped
enum OnDrop {
    // No cleanup
    None,
    // Free the given page back to the TransactionalMemory
    Free(PageNumber),
    // Remove the entry at `position` from the (mutable) leaf page
    RemoveEntry {
        position: usize,
        fixed_key_size: Option<usize>,
    },
}
65
// Backing storage for an AccessGuard: a borrowed page (immutable or mutable),
// or an owned copy of the bytes
enum EitherPage<'a> {
    Immutable(PageImpl<'a>),
    Mutable(PageMut<'a>),
    OwnedMemory(Vec<u8>),
}
71
72impl<'a> EitherPage<'a> {
73    fn memory(&self) -> &[u8] {
74        match self {
75            EitherPage::Immutable(page) => page.memory(),
76            EitherPage::Mutable(page) => page.memory(),
77            EitherPage::OwnedMemory(mem) => mem.as_slice(),
78        }
79    }
80}
81
// Guard providing read access to a value, optionally performing cleanup
// (freeing a page, or removing a leaf entry) when dropped
pub struct AccessGuard<'a, V: RedbValue> {
    page: EitherPage<'a>,
    // Byte range of the value within `page`
    offset: usize,
    len: usize,
    on_drop: OnDrop,
    // Present when `on_drop` needs to free a page
    mem: Option<&'a TransactionalMemory>,
    _value_type: PhantomData<V>,
}
90
91impl<'a, V: RedbValue> AccessGuard<'a, V> {
92    pub(super) fn new(
93        page: PageImpl<'a>,
94        offset: usize,
95        len: usize,
96        free_on_drop: bool,
97        mem: &'a TransactionalMemory,
98    ) -> Self {
99        let page_number = page.get_page_number();
100        Self {
101            page: EitherPage::Immutable(page),
102            offset,
103            len,
104            on_drop: if free_on_drop {
105                OnDrop::Free(page_number)
106            } else {
107                OnDrop::None
108            },
109            mem: Some(mem),
110            _value_type: Default::default(),
111        }
112    }
113
114    pub(crate) fn with_page(page: PageImpl<'a>, range: Range<usize>) -> Self {
115        Self {
116            page: EitherPage::Immutable(page),
117            offset: range.start,
118            len: range.len(),
119            on_drop: OnDrop::None,
120            mem: None,
121            _value_type: Default::default(),
122        }
123    }
124
125    pub(crate) fn with_owned_value(value: Vec<u8>) -> Self {
126        let len = value.len();
127        Self {
128            page: EitherPage::OwnedMemory(value),
129            offset: 0,
130            len,
131            on_drop: OnDrop::None,
132            mem: None,
133            _value_type: Default::default(),
134        }
135    }
136
137    pub(super) fn remove_on_drop(
138        page: PageMut<'a>,
139        offset: usize,
140        len: usize,
141        position: usize,
142        fixed_key_size: Option<usize>,
143        mem: &'a TransactionalMemory,
144    ) -> Self {
145        Self {
146            page: EitherPage::Mutable(page),
147            offset,
148            len,
149            on_drop: OnDrop::RemoveEntry {
150                position,
151                fixed_key_size,
152            },
153            mem: Some(mem),
154            _value_type: Default::default(),
155        }
156    }
157
158    pub fn value(&self) -> V::SelfType<'_> {
159        V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
160    }
161}
162
163impl<'a, V: RedbValue> Drop for AccessGuard<'a, V> {
164    fn drop(&mut self) {
165        match self.on_drop {
166            OnDrop::None => {}
167            OnDrop::Free(page_number) => {
168                // Drop our reference to the page, so that it can be freed
169                let mut dummy = EitherPage::OwnedMemory(vec![]);
170                mem::swap(&mut self.page, &mut dummy);
171                drop(dummy);
172                self.mem.unwrap().free(page_number);
173            }
174            OnDrop::RemoveEntry {
175                position,
176                fixed_key_size,
177            } => {
178                if let EitherPage::Mutable(ref mut mut_page) = self.page {
179                    let mut mutator = LeafMutator::new(mut_page, fixed_key_size, V::fixed_width());
180                    mutator.remove(position);
181                } else if !thread::panicking() {
182                    unreachable!();
183                }
184            }
185        }
186    }
187}
188
// Guard providing in-place mutable access to a value stored in a page
pub struct AccessGuardMut<'a, V: RedbValue> {
    page: PageMut<'a>,
    // Byte range of the value within `page`
    offset: usize,
    len: usize,
    _value_type: PhantomData<V>,
}
195
196impl<'a, V: RedbValue> AccessGuardMut<'a, V> {
197    pub(crate) fn new(page: PageMut<'a>, offset: usize, len: usize) -> Self {
198        AccessGuardMut {
199            page,
200            offset,
201            len,
202            _value_type: Default::default(),
203        }
204    }
205}
206
207impl<'a, V: MutInPlaceValue> AsMut<V::BaseRefType> for AccessGuardMut<'a, V> {
208    fn as_mut(&mut self) -> &mut V::BaseRefType {
209        V::from_bytes_mut(&mut self.page.memory_mut()[self.offset..(self.offset + self.len)])
210    }
211}
212
// Provides a simple zero-copy way to access entries
pub struct EntryAccessor<'a> {
    // Borrowed key bytes within the page
    key: &'a [u8],
    // Borrowed value bytes within the page
    value: &'a [u8],
}
218
219impl<'a> EntryAccessor<'a> {
220    fn new(key: &'a [u8], value: &'a [u8]) -> Self {
221        EntryAccessor { key, value }
222    }
223}
224
impl<'a: 'b, 'b> EntryAccessor<'a> {
    /// Returns the raw key bytes, borrowed for the page lifetime `'a`.
    pub(crate) fn key(&'b self) -> &'a [u8] {
        self.key
    }

    /// Returns the raw value bytes, borrowed for the page lifetime `'a`.
    pub(crate) fn value(&'b self) -> &'a [u8] {
        self.value
    }
}
234
// Provides a simple zero-copy way to access a leaf page
pub(crate) struct LeafAccessor<'a> {
    // Raw bytes of the leaf page
    page: &'a [u8],
    // Width of every key when keys are fixed size (no key_end array on the page)
    fixed_key_size: Option<usize>,
    // Width of every value when values are fixed size (no value_end array on the page)
    fixed_value_size: Option<usize>,
    // Number of key-value pairs, cached from the page header
    num_pairs: usize,
}
242
impl<'a> LeafAccessor<'a> {
    /// Wraps the raw bytes of a leaf page. Reads the pair count from the
    /// header (bytes 2..4, little-endian u16).
    pub(crate) fn new(
        page: &'a [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        debug_assert_eq!(page[0], LEAF);
        let num_pairs = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize;
        LeafAccessor {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
        }
    }

    /// Debugging helper: prints every key (and optionally every value) to stderr.
    pub(super) fn print_node<K: RedbKey, V: RedbValue>(&self, include_value: bool) {
        let mut i = 0;
        while let Some(entry) = self.entry(i) {
            eprint!(" key_{}={:?}", i, K::from_bytes(entry.key()));
            if include_value {
                eprint!(" value_{}={:?}", i, V::from_bytes(entry.value()));
            }
            i += 1;
        }
    }

    /// Binary search for `query`. Returns `(index, true)` on an exact match,
    /// or `(insertion_index, false)` when the key is absent. The insertion
    /// index may be one past the last entry.
    pub(crate) fn position<K: RedbKey>(&self, query: &[u8]) -> (usize, bool) {
        // inclusive
        let mut min_entry = 0;
        // inclusive. Start past end, since it might be positioned beyond the end of the leaf
        let mut max_entry = self.num_pairs();
        while min_entry < max_entry {
            let mid = (min_entry + max_entry) / 2;
            let key = self.key_unchecked(mid);
            match K::compare(query, key) {
                Ordering::Less => {
                    max_entry = mid;
                }
                Ordering::Equal => {
                    return (mid, true);
                }
                Ordering::Greater => {
                    min_entry = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_entry, max_entry);
        (min_entry, false)
    }

    /// Returns the index of `query`, or None if it is not present.
    pub(crate) fn find_key<K: RedbKey>(&self, query: &[u8]) -> Option<usize> {
        let (entry, found) = self.position::<K>(query);
        if found {
            Some(entry)
        } else {
            None
        }
    }

    // Offset where key data begins: the 4-byte header plus one u32 end-offset
    // per pair for each dimension (keys, values) that is not fixed size
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    // Start offset of the n-th key (the end of the previous key)
    fn key_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            Some(self.key_section_start())
        } else {
            self.key_end(n - 1)
        }
    }

    // End offset of the n-th key, or None if n is out of bounds. Computed
    // arithmetically for fixed-size keys; otherwise read from the key_end array
    fn key_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_key_size {
                return Some(self.key_section_start() + fixed * (n + 1));
            }
            // key_end array begins right after the 4-byte header
            let offset = 4 + size_of::<u32>() * n;
            let end = u32::from_le_bytes(
                self.page[offset..(offset + size_of::<u32>())]
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    // Start offset of the n-th value. Value data is stored after all key data
    fn value_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            self.key_end(self.num_pairs() - 1)
        } else {
            self.value_end(n - 1)
        }
    }

    // End offset of the n-th value, or None if n is out of bounds. Computed
    // for fixed-size values; otherwise read from the value_end array, which
    // follows the (optional) key_end array
    fn value_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_value_size {
                return Some(self.key_end(self.num_pairs - 1).unwrap() + fixed * (n + 1));
            }
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            let end = u32::from_le_bytes(
                self.page[offset..(offset + size_of::<u32>())]
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    pub(crate) fn num_pairs(&self) -> usize {
        self.num_pairs
    }

    pub(super) fn offset_of_first_value(&self) -> usize {
        self.offset_of_value(0).unwrap()
    }

    pub(super) fn offset_of_value(&self, n: usize) -> Option<usize> {
        self.value_start(n)
    }

    // (start, end) byte range of the n-th value
    pub(super) fn value_range(&self, n: usize) -> Option<(usize, usize)> {
        Some((self.value_start(n)?, self.value_end(n)?))
    }

    // Returns the length of all keys and values between [start, end)
    pub(crate) fn length_of_pairs(&self, start: usize, end: usize) -> usize {
        self.length_of_values(start, end) + self.length_of_keys(start, end)
    }

    // Returns the length of all values between [start, end)
    fn length_of_values(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.value_end(end - 1).unwrap();
        let start_offset = self.value_start(start).unwrap();
        end_offset - start_offset
    }

    // Returns the length of all keys between [start, end)
    pub(crate) fn length_of_keys(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.key_end(end - 1).unwrap();
        let start_offset = self.key_start(start).unwrap();
        end_offset - start_offset
    }

    // Number of bytes of this page that are in use
    pub(crate) fn total_length(&self) -> usize {
        // Values are stored last
        self.value_end(self.num_pairs() - 1).unwrap()
    }

    // Returns the n-th key. Panics if n is out of bounds
    fn key_unchecked(&self, n: usize) -> &[u8] {
        &self.page[self.key_start(n).unwrap()..self.key_end(n).unwrap()]
    }

    // Returns an accessor for the n-th entry, or None if n is out of bounds
    pub(crate) fn entry(&self, n: usize) -> Option<EntryAccessor<'a>> {
        let key = &self.page[self.key_start(n)?..self.key_end(n)?];
        let value = &self.page[self.value_start(n)?..self.value_end(n)?];
        Some(EntryAccessor::new(key, value))
    }

    // Key and value byte ranges of the n-th entry, or None if out of bounds
    pub(crate) fn entry_ranges(&self, n: usize) -> Option<(Range<usize>, Range<usize>)> {
        let key = self.key_start(n)?..self.key_end(n)?;
        let value = self.value_start(n)?..self.value_end(n)?;
        Some((key, value))
    }

    // Accessor for the last entry. Panics on an empty leaf
    pub(super) fn last_entry(&self) -> EntryAccessor<'a> {
        self.entry(self.num_pairs() - 1).unwrap()
    }
}
433
// Accumulates key-value pairs and builds finished leaf pages from them
pub(super) struct LeafBuilder<'a, 'b> {
    pairs: Vec<(&'a [u8], &'a [u8])>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Running byte totals of all pushed keys / values
    total_key_bytes: usize,
    total_value_bytes: usize,
    mem: &'b TransactionalMemory,
}
442
impl<'a, 'b> LeafBuilder<'a, 'b> {
    // Bytes required for a leaf page holding `num_pairs` pairs totalling
    // `keys_values_bytes` of payload
    pub(super) fn required_bytes(&self, num_pairs: usize, keys_values_bytes: usize) -> usize {
        RawLeafBuilder::required_bytes(
            num_pairs,
            keys_values_bytes,
            self.fixed_key_size,
            self.fixed_value_size,
        )
    }

    pub(super) fn new(
        mem: &'b TransactionalMemory,
        capacity: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        Self {
            pairs: Vec::with_capacity(capacity),
            fixed_key_size,
            fixed_value_size,
            total_key_bytes: 0,
            total_value_bytes: 0,
            mem,
        }
    }

    // Appends a pair, updating the running byte totals
    pub(super) fn push(&mut self, key: &'a [u8], value: &'a [u8]) {
        self.total_key_bytes += key.len();
        self.total_value_bytes += value.len();
        self.pairs.push((key, value))
    }

    // Copies every entry from `accessor`, optionally skipping one index
    pub(super) fn push_all_except(
        &mut self,
        accessor: &'a LeafAccessor<'_>,
        except: Option<usize>,
    ) {
        for i in 0..accessor.num_pairs() {
            if let Some(except) = except {
                if except == i {
                    continue;
                }
            }
            let entry = accessor.entry(i).unwrap();
            self.push(entry.key(), entry.value());
        }
    }

    // True when the accumulated pairs do not fit in one page and there is
    // more than one pair, so a split is both needed and possible
    pub(super) fn should_split(&self) -> bool {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        required_size > self.mem.get_page_size() && self.pairs.len() > 1
    }

    // Builds two pages, dividing the pairs so the first page holds roughly
    // half of the payload bytes. Returns
    // (first page, last key of the first page, second page)
    pub(super) fn build_split(self) -> Result<(PageMut<'b>, &'a [u8], PageMut<'b>)> {
        let total_size = self.total_key_bytes + self.total_value_bytes;
        let mut division = 0;
        let mut first_split_key_bytes = 0;
        let mut first_split_value_bytes = 0;
        // Walk pairs until the first half reaches half the payload bytes.
        // The last pair is never taken, so the second page is non-empty
        for (key, value) in self.pairs.iter().take(self.pairs.len() - 1) {
            first_split_key_bytes += key.len();
            first_split_value_bytes += value.len();
            division += 1;
            if first_split_key_bytes + first_split_value_bytes >= total_size / 2 {
                break;
            }
        }

        let required_size =
            self.required_bytes(division, first_split_key_bytes + first_split_value_bytes);
        let mut page1 = self.mem.allocate(required_size, CachePriority::Low)?;
        let mut builder = RawLeafBuilder::new(
            page1.memory_mut(),
            division,
            self.fixed_key_size,
            self.fixed_value_size,
            first_split_key_bytes,
        );
        for (key, value) in self.pairs.iter().take(division) {
            builder.append(key, value);
        }
        // The builder's Drop impl asserts the page was completely filled
        drop(builder);

        let required_size = self.required_bytes(
            self.pairs.len() - division,
            self.total_key_bytes + self.total_value_bytes
                - first_split_key_bytes
                - first_split_value_bytes,
        );
        let mut page2 = self.mem.allocate(required_size, CachePriority::Low)?;
        let mut builder = RawLeafBuilder::new(
            page2.memory_mut(),
            self.pairs.len() - division,
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes - first_split_key_bytes,
        );
        for (key, value) in self.pairs[division..].iter() {
            builder.append(key, value);
        }
        drop(builder);

        Ok((page1, self.pairs[division - 1].0, page2))
    }

    // Builds a single page containing all the accumulated pairs
    pub(super) fn build(self) -> Result<PageMut<'b>> {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        let mut page = self.mem.allocate(required_size, CachePriority::Low)?;
        let mut builder = RawLeafBuilder::new(
            page.memory_mut(),
            self.pairs.len(),
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes,
        );
        for (key, value) in self.pairs {
            builder.append(key, value);
        }
        drop(builder);
        Ok(page)
    }
}
570
571// Note the caller is responsible for ensuring that the buffer is large enough
572// and rewriting all fields if any dynamically sized fields are written
573// Layout is:
574// 1 byte: type
575// 1 byte: reserved (padding to 32bits aligned)
576// 2 bytes: num_entries (number of pairs)
577// (optional) repeating (num_entries times):
578// 4 bytes: key_end
579// (optional) repeating (num_entries times):
580// 4 bytes: value_end
581// repeating (num_entries times):
582// * n bytes: key data
583// repeating (num_entries times):
584// * n bytes: value data
pub(crate) struct RawLeafBuilder<'a> {
    // Destination buffer for the leaf page being written
    page: &'a mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Total number of pairs that will be written
    num_pairs: usize,
    // Total key bytes promised at construction; value data starts after them
    provisioned_key_bytes: usize,
    pairs_written: usize, // used for debugging
}
593
impl<'a> RawLeafBuilder<'a> {
    // Bytes required for a page: 4-byte header, a u32 end-offset per pair for
    // each non-fixed dimension, plus the raw key/value payload
    pub(crate) fn required_bytes(
        num_pairs: usize,
        keys_values_bytes: usize,
        key_size: Option<usize>,
        value_size: Option<usize>,
    ) -> usize {
        // Page id & header;
        let mut result = 4;
        // key & value lengths
        if key_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        if value_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        result += keys_values_bytes;

        result
    }

    // Initializes the page header. `key_bytes` must equal the total length of
    // all keys that will be appended, since value data is placed after them
    pub(crate) fn new(
        page: &'a mut [u8],
        num_pairs: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        key_bytes: usize,
    ) -> Self {
        page[0] = LEAF;
        page[2..4].copy_from_slice(&u16::try_from(num_pairs).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison all the key & value offsets, in case the caller forgets to write them
            let mut last = 4;
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            if fixed_value_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            for x in &mut page[4..last] {
                *x = 0xFF;
            }
        }
        RawLeafBuilder {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
            provisioned_key_bytes: key_bytes,
            pairs_written: 0,
        }
    }

    // End offset of the n-th value: computed for fixed-size values, otherwise
    // read back from the value_end array already written to the page
    fn value_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_value_size {
            return self.key_section_start() + self.provisioned_key_bytes + fixed * (n + 1);
        }
        let mut offset = 4 + size_of::<u32>() * n;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    // Offset where key data begins: after the header and any end-offset arrays
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    // End offset of the n-th key: computed for fixed-size keys, otherwise
    // read back from the key_end array already written to the page
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 4 + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    // Appends the next pair: writes the key and value payloads and, for
    // variable-length dimensions, records their end offsets in the arrays
    pub(crate) fn append(&mut self, key: &[u8], value: &[u8]) {
        if let Some(key_width) = self.fixed_key_size {
            assert_eq!(key_width, key.len());
        }
        if let Some(value_width) = self.fixed_value_size {
            assert_eq!(value_width, value.len());
        }
        // Each payload goes immediately after the previous pair's payload
        let key_offset = if self.pairs_written == 0 {
            self.key_section_start()
        } else {
            self.key_end(self.pairs_written - 1)
        };
        let value_offset = if self.pairs_written == 0 {
            self.key_section_start() + self.provisioned_key_bytes
        } else {
            self.value_end(self.pairs_written - 1)
        };

        let n = self.pairs_written;
        if self.fixed_key_size.is_none() {
            let offset = 4 + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())]
                .copy_from_slice(&u32::try_from(key_offset + key.len()).unwrap().to_le_bytes());
        }
        self.page[key_offset..(key_offset + key.len())].copy_from_slice(key);
        // Must not exceed the key bytes provisioned at construction
        let written_key_len = key_offset + key.len() - self.key_section_start();
        assert!(written_key_len <= self.provisioned_key_bytes);

        if self.fixed_value_size.is_none() {
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(value_offset + value.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }
        self.page[value_offset..(value_offset + value.len())].copy_from_slice(value);
        self.pairs_written += 1;
    }
}
730
731impl<'a> Drop for RawLeafBuilder<'a> {
732    fn drop(&mut self) {
733        if !thread::panicking() {
734            assert_eq!(self.pairs_written, self.num_pairs);
735            assert_eq!(
736                self.key_section_start() + self.provisioned_key_bytes,
737                self.key_end(self.num_pairs - 1)
738            );
739        }
740    }
741}
742
// Mutates a leaf page in place: inserting, overwriting, or removing entries
pub(crate) struct LeafMutator<'a: 'b, 'b> {
    page: &'b mut PageMut<'a>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
}
748
749impl<'a: 'b, 'b> LeafMutator<'a, 'b> {
    /// Wraps `page`, which must already be initialized as a leaf page.
    pub(crate) fn new(
        page: &'b mut PageMut<'a>,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        assert_eq!(page.memory_mut()[0], LEAF);
        Self {
            page,
            fixed_key_size,
            fixed_value_size,
        }
    }
762
    /// Returns true if inserting (or, with `overwrite`, replacing) the given
    /// key/value pair at `position` fits in the page's free space.
    pub(super) fn sufficient_insert_inplace_space(
        page: &'_ PageImpl<'_>,
        position: usize,
        overwrite: bool,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        new_key: &[u8],
        new_value: &[u8],
    ) -> bool {
        let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
        if overwrite {
            let remaining = page.memory().len() - accessor.total_length();
            // Only the difference vs the replaced pair's payload is needed
            let required_delta = isize::try_from(new_key.len() + new_value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(position, position + 1)).unwrap();
            required_delta <= isize::try_from(remaining).unwrap()
        } else {
            // If this is a large page, only allow in-place appending to avoid write amplification
            //
            // Note: this check is also required to avoid inserting an unbounded number of small values
            // into a large page, which could result in overflowing a u16 which is the maximum number of entries per leaf
            if page.get_page_number().page_order > 0 && position < accessor.num_pairs() {
                return false;
            }
            let remaining = page.memory().len() - accessor.total_length();
            // A fresh insert also consumes one u32 end-offset slot per
            // variable-length dimension
            let mut required_delta = new_key.len() + new_value.len();
            if fixed_key_size.is_none() {
                required_delta += size_of::<u32>();
            }
            if fixed_value_size.is_none() {
                required_delta += size_of::<u32>();
            }
            required_delta <= remaining
        }
    }
797
    /// Insert the given key, value pair at index i and shift all following
    /// pairs to the right. With `overwrite`, the pair at index i is replaced
    /// instead. Asserts that the result fits within the page's memory.
    pub(crate) fn insert(&mut self, i: usize, overwrite: bool, key: &[u8], value: &[u8]) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        // Net change in used bytes: new payload (plus new u32 end-offset
        // slots when not overwriting) minus any replaced payload
        let required_delta = if overwrite {
            isize::try_from(key.len() + value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(i, i + 1)).unwrap()
        } else {
            let mut delta = key.len() + value.len();
            if self.fixed_key_size.is_none() {
                delta += size_of::<u32>();
            }
            if self.fixed_value_size.is_none() {
                delta += size_of::<u32>();
            }
            delta.try_into().unwrap()
        };
        assert!(
            isize::try_from(accessor.total_length()).unwrap() + required_delta
                <= isize::try_from(self.page.memory().len()).unwrap()
        );

        let num_pairs = accessor.num_pairs();
        let last_key_end = accessor.key_end(accessor.num_pairs() - 1).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
        // Index of the first entry whose data must shift right to make room
        let shift_index = if overwrite { i + 1 } else { i };
        let shift_key_start = accessor.key_start(shift_index).unwrap_or(last_key_end);
        let shift_value_start = accessor.value_start(shift_index).unwrap_or(last_value_end);
        let existing_value_len = accessor
            .value_range(i)
            .map(|(start, end)| end - start)
            .unwrap_or_default();
        drop(accessor);

        // How much the trailing value data moves
        let value_delta = if overwrite {
            isize::try_from(value.len()).unwrap() - isize::try_from(existing_value_len).unwrap()
        } else {
            value.len().try_into().unwrap()
        };

        // Update all the pointers
        let key_ptr_size: usize = if self.fixed_key_size.is_none() { 4 } else { 0 };
        let value_ptr_size: usize = if self.fixed_value_size.is_none() {
            4
        } else {
            0
        };
        if !overwrite {
            // Entries before i keep their payloads, but the payloads move
            // right to make room for the new end-offset slots (and, for
            // values, for the new key bytes as well)
            for j in 0..i {
                self.update_key_end(j, (key_ptr_size + value_ptr_size).try_into().unwrap());
                let value_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_value_end(j, value_delta);
            }
        }
        // Entries at or after i additionally shift past the new payload
        for j in i..num_pairs {
            if overwrite {
                self.update_value_end(j, value_delta);
            } else {
                let key_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_key_end(j, key_delta);
                let value_delta = key_delta + isize::try_from(value.len()).unwrap();
                self.update_value_end(j, value_delta);
            }
        }

        // Write the new pair count into the header
        let new_num_pairs = if overwrite { num_pairs } else { num_pairs + 1 };
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());

        // Right shift the trailing values
        let mut dest = if overwrite {
            (isize::try_from(shift_value_start).unwrap() + value_delta)
                .try_into()
                .unwrap()
        } else {
            shift_value_start + key_ptr_size + value_ptr_size + key.len() + value.len()
        };
        let start = shift_value_start;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);

        // Insert the value
        let inserted_value_end: u32 = dest.try_into().unwrap();
        dest -= value.len();
        self.page.memory_mut()[dest..(dest + value.len())].copy_from_slice(value);

        if !overwrite {
            // Right shift the trailing key data & preceding value data
            let start = shift_key_start;
            let end = shift_value_start;
            dest -= end - start;
            self.page.memory_mut().copy_within(start..end, dest);

            // Insert the key
            let inserted_key_end: u32 = dest.try_into().unwrap();
            dest -= key.len();
            self.page.memory_mut()[dest..(dest + key.len())].copy_from_slice(key);

            // Right shift the trailing value pointers & preceding key data
            let start = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            let end = shift_key_start;
            dest -= end - start;
            debug_assert_eq!(
                dest,
                4 + key_ptr_size * new_num_pairs + value_ptr_size * (i + 1)
            );
            self.page.memory_mut().copy_within(start..end, dest);

            // Insert the value pointer
            if self.fixed_value_size.is_none() {
                dest -= size_of::<u32>();
                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_value_end.to_le_bytes());
            }

            // Right shift the trailing key pointers & preceding value pointers
            let start = 4 + key_ptr_size * i;
            let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            dest -= end - start;
            debug_assert_eq!(dest, 4 + key_ptr_size * (i + 1));
            self.page.memory_mut().copy_within(start..end, dest);

            // Insert the key pointer
            if self.fixed_key_size.is_none() {
                dest -= size_of::<u32>();
                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_key_end.to_le_bytes());
            }
            debug_assert_eq!(dest, 4 + key_ptr_size * i);
        }
    }
936
    /// Removes the `i`th key/value pair from this leaf, compacting the page
    /// in place (pointer tables and data are all shifted left).
    ///
    /// Panics if `i` is out of range, or if the leaf holds only a single pair
    /// (see the `num_pairs > 1` assertion).
    pub(super) fn remove(&mut self, i: usize) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let num_pairs = accessor.num_pairs();
        assert!(i < num_pairs);
        assert!(num_pairs > 1);
        // Byte ranges of the entry being removed, plus the end of the whole
        // value-data region, captured before any mutation of the page.
        let key_start = accessor.key_start(i).unwrap();
        let key_end = accessor.key_end(i).unwrap();
        let value_start = accessor.value_start(i).unwrap();
        let value_end = accessor.value_end(i).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
        drop(accessor);

        // Update all the pointers
        // Variable-size keys/values each store a u32 end offset; fixed-size
        // ones store none, so their pointer-table contribution is zero.
        let key_ptr_size = if self.fixed_key_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        let value_ptr_size = if self.fixed_value_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        // Entries before i: their key data shifts left only by the removed
        // pointer-table slots; their value data additionally shifts by the
        // removed key's bytes (which precede all value data).
        for j in 0..i {
            self.update_key_end(j, -isize::try_from(key_ptr_size + value_ptr_size).unwrap());
            let value_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_value_end(j, value_delta);
        }
        // Entries after i: keys shift by the removed pointer slots plus the
        // removed key's bytes; values shift by that plus the removed value's bytes.
        for j in (i + 1)..num_pairs {
            let key_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_key_end(j, key_delta);
            let value_delta = key_delta - isize::try_from(value_end - value_start).unwrap();
            self.update_value_end(j, value_delta);
        }

        // Left shift all the pointers & data

        let new_num_pairs = num_pairs - 1;
        // num_pairs lives at bytes [2..4] of the page header
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
        // Left shift the trailing key pointers & preceding value pointers
        let mut dest = 4 + key_ptr_size * i;
        // First trailing key pointer
        let start = 4 + key_ptr_size * (i + 1);
        // Last preceding value pointer
        let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;
        debug_assert_eq!(dest, 4 + key_ptr_size * new_num_pairs + value_ptr_size * i);

        // Left shift the trailing value pointers & preceding key data
        let start = 4 + key_ptr_size * num_pairs + value_ptr_size * (i + 1);
        let end = key_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        // Key bytes stored before the removed key
        let preceding_key_len = key_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_key_len
        );

        // Left shift the trailing key data & preceding value data
        let start = key_end;
        let end = value_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        // Left shift the trailing value data
        // Key + value bytes stored before the removed value, minus the removed key
        let preceding_data_len =
            value_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs) - (key_end - key_start);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_data_len
        );
        let start = value_end;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);
    }
1022
1023    fn update_key_end(&mut self, i: usize, delta: isize) {
1024        if self.fixed_key_size.is_some() {
1025            return;
1026        }
1027        let offset = 4 + size_of::<u32>() * i;
1028        let mut ptr = u32::from_le_bytes(
1029            self.page.memory()[offset..(offset + size_of::<u32>())]
1030                .try_into()
1031                .unwrap(),
1032        );
1033        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1034        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
1035            .copy_from_slice(&ptr.to_le_bytes());
1036    }
1037
1038    fn update_value_end(&mut self, i: usize, delta: isize) {
1039        if self.fixed_value_size.is_some() {
1040            return;
1041        }
1042        let accessor = LeafAccessor::new(
1043            self.page.memory(),
1044            self.fixed_key_size,
1045            self.fixed_value_size,
1046        );
1047        let num_pairs = accessor.num_pairs();
1048        drop(accessor);
1049        let mut offset = 4 + size_of::<u32>() * i;
1050        if self.fixed_key_size.is_none() {
1051            offset += size_of::<u32>() * num_pairs;
1052        }
1053        let mut ptr = u32::from_le_bytes(
1054            self.page.memory()[offset..(offset + size_of::<u32>())]
1055                .try_into()
1056                .unwrap(),
1057        );
1058        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1059        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
1060            .copy_from_slice(&ptr.to_le_bytes());
1061    }
1062}
1063
1064// Provides a simple zero-copy way to access a branch page
1065// TODO: this should be pub(super) and the multimap btree stuff should be moved into this package
pub(crate) struct BranchAccessor<'a: 'b, 'b, T: Page + 'a> {
    // The page whose memory is interpreted as a branch node
    page: &'b T,
    // Number of keys, cached from header bytes [2..4] at construction
    num_keys: usize,
    // Some(n) when every key is exactly n bytes; None for variable-size keys
    fixed_key_size: Option<usize>,
    _page_lifetime: PhantomData<&'a ()>,
}
1072
impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
    pub(crate) fn new(page: &'b T, fixed_key_size: Option<usize>) -> Self {
        debug_assert_eq!(page.memory()[0], BRANCH);
        // num_keys is stored little-endian at header bytes [2..4]
        let num_keys = u16::from_le_bytes(page.memory()[2..4].try_into().unwrap()) as usize;
        BranchAccessor {
            page,
            num_keys,
            fixed_key_size,
            _page_lifetime: Default::default(),
        }
    }

    /// Debug helper: dumps this node's page number, children and keys to
    /// stderr. Uses eprint!, so no trailing newline is emitted.
    pub(super) fn print_node<K: RedbKey>(&self) {
        eprint!(
            "Internal[ (page={:?}), child_0={:?}",
            self.page.get_page_number(),
            self.child_page(0).unwrap()
        );
        for i in 0..(self.count_children() - 1) {
            if let Some(child) = self.child_page(i + 1) {
                let key = self.key(i).unwrap();
                eprint!(" key_{}={:?}", i, K::from_bytes(key));
                eprint!(" child_{}={:?}", i + 1, child);
            }
        }
        eprint!("]");
    }

    /// Number of bytes of the page actually used by this branch node.
    /// Key data is the last section, so the last key's end is the total.
    pub(crate) fn total_length(&self) -> usize {
        // Keys are stored at the end
        self.key_end(self.num_keys() - 1)
    }

    /// Binary search for the child subtree that could contain `query`.
    /// Returns the child's index and its page number.
    pub(super) fn child_for_key<K: RedbKey>(&self, query: &[u8]) -> (usize, PageNumber) {
        let mut min_child = 0; // inclusive
        let mut max_child = self.num_keys(); // inclusive
        while min_child < max_child {
            let mid = (min_child + max_child) / 2;
            match K::compare(query, self.key(mid).unwrap()) {
                Ordering::Less => {
                    max_child = mid;
                }
                Ordering::Equal => {
                    // Exact match on the mid key routes to child mid
                    return (mid, self.child_page(mid).unwrap());
                }
                Ordering::Greater => {
                    min_child = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_child, max_child);

        (min_child, self.child_page(min_child).unwrap())
    }

    /// Byte offset where key data begins: past the 8-byte header, one
    /// (checksum, page number) pair per child, and -- only when keys are
    /// variable-size -- one u32 end offset per key.
    fn key_section_start(&self) -> usize {
        if self.fixed_key_size.is_none() {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
                + size_of::<u32>() * self.num_keys()
        } else {
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
        }
    }

    /// Starting offset of the nth key (the end of the previous key, or the
    /// start of the key section for n == 0).
    fn key_offset(&self, n: usize) -> usize {
        if n == 0 {
            self.key_section_start()
        } else {
            self.key_end(n - 1)
        }
    }

    /// Exclusive end offset of the nth key. Computed for fixed-size keys;
    /// read from the stored u32 offset table for variable-size keys.
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
            + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Returns the nth key's bytes, or None when n is out of range.
    pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
        if n >= self.num_keys() {
            return None;
        }
        let offset = self.key_offset(n);
        let end = self.key_end(n);
        Some(&self.page.memory()[offset..end])
    }

    /// A branch with num_keys keys always has num_keys + 1 children.
    pub(crate) fn count_children(&self) -> usize {
        self.num_keys() + 1
    }

    /// Returns the stored checksum of the nth child, or None when out of range.
    pub(super) fn child_checksum(&self, n: usize) -> Option<Checksum> {
        if n >= self.count_children() {
            return None;
        }

        // Checksums form the first array, immediately after the 8-byte header
        let offset = 8 + size_of::<Checksum>() * n;
        Some(Checksum::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<Checksum>())]
                .try_into()
                .unwrap(),
        ))
    }

    /// Returns the page number of the nth child, or None when out of range.
    pub(crate) fn child_page(&self, n: usize) -> Option<PageNumber> {
        if n >= self.count_children() {
            return None;
        }

        // Page numbers form the second array, following all the checksums
        let offset =
            8 + size_of::<Checksum>() * self.count_children() + PageNumber::serialized_size() * n;
        Some(PageNumber::from_le_bytes(
            self.page.memory()[offset..(offset + PageNumber::serialized_size())]
                .try_into()
                .unwrap(),
        ))
    }

    fn num_keys(&self) -> usize {
        self.num_keys
    }
}
1203
// Accumulates children and keys, then serializes them into a branch page
// (or two pages, when splitting) via RawBranchBuilder.
pub(super) struct BranchBuilder<'a, 'b> {
    // (page number, checksum) per child, in key order
    children: Vec<(PageNumber, Checksum)>,
    // Separator keys; build() asserts keys.len() + 1 == children.len()
    keys: Vec<&'a [u8]>,
    // Running total of key bytes, used to size the allocated page
    total_key_bytes: usize,
    // Some(n) when every key is exactly n bytes; None for variable-size keys
    fixed_key_size: Option<usize>,
    mem: &'b TransactionalMemory,
}
1211
1212impl<'a, 'b> BranchBuilder<'a, 'b> {
1213    pub(super) fn new(
1214        mem: &'b TransactionalMemory,
1215        child_capacity: usize,
1216        fixed_key_size: Option<usize>,
1217    ) -> Self {
1218        Self {
1219            children: Vec::with_capacity(child_capacity),
1220            keys: Vec::with_capacity(child_capacity - 1),
1221            total_key_bytes: 0,
1222            fixed_key_size,
1223            mem,
1224        }
1225    }
1226
1227    pub(super) fn replace_child(&mut self, index: usize, child: PageNumber, checksum: Checksum) {
1228        self.children[index] = (child, checksum);
1229    }
1230
1231    pub(super) fn push_child(&mut self, child: PageNumber, checksum: Checksum) {
1232        self.children.push((child, checksum));
1233    }
1234
1235    pub(super) fn push_key(&mut self, key: &'a [u8]) {
1236        self.keys.push(key);
1237        self.total_key_bytes += key.len();
1238    }
1239
1240    pub(super) fn push_all<T: Page>(&mut self, accessor: &'a BranchAccessor<'_, '_, T>) {
1241        for i in 0..accessor.count_children() {
1242            let child = accessor.child_page(i).unwrap();
1243            let checksum = accessor.child_checksum(i).unwrap();
1244            self.push_child(child, checksum);
1245        }
1246        for i in 0..(accessor.count_children() - 1) {
1247            self.push_key(accessor.key(i).unwrap());
1248        }
1249    }
1250
1251    pub(super) fn to_single_child(&self) -> Option<(PageNumber, Checksum)> {
1252        if self.children.len() > 1 {
1253            None
1254        } else {
1255            Some(self.children[0])
1256        }
1257    }
1258
1259    pub(super) fn build(self) -> Result<PageMut<'b>> {
1260        assert_eq!(self.children.len(), self.keys.len() + 1);
1261        let size = RawBranchBuilder::required_bytes(
1262            self.keys.len(),
1263            self.total_key_bytes,
1264            self.fixed_key_size,
1265        );
1266        let mut page = self.mem.allocate(size, CachePriority::High)?;
1267        let mut builder = RawBranchBuilder::new(&mut page, self.keys.len(), self.fixed_key_size);
1268        builder.write_first_page(self.children[0].0, self.children[0].1);
1269        for i in 1..self.children.len() {
1270            let key = &self.keys[i - 1];
1271            builder.write_nth_key(key.as_ref(), self.children[i].0, self.children[i].1, i - 1);
1272        }
1273        drop(builder);
1274
1275        Ok(page)
1276    }
1277
1278    pub(super) fn should_split(&self) -> bool {
1279        let size = RawBranchBuilder::required_bytes(
1280            self.keys.len(),
1281            self.total_key_bytes,
1282            self.fixed_key_size,
1283        );
1284        size > self.mem.get_page_size() && self.keys.len() >= 3
1285    }
1286
1287    pub(super) fn build_split(self) -> Result<(PageMut<'b>, &'a [u8], PageMut<'b>)> {
1288        assert_eq!(self.children.len(), self.keys.len() + 1);
1289        assert!(self.keys.len() >= 3);
1290        let division = self.keys.len() / 2;
1291        let first_split_key_len: usize = self.keys.iter().take(division).map(|k| k.len()).sum();
1292        let division_key = self.keys[division];
1293        let second_split_key_len = self.total_key_bytes - first_split_key_len - division_key.len();
1294
1295        let size =
1296            RawBranchBuilder::required_bytes(division, first_split_key_len, self.fixed_key_size);
1297        let mut page1 = self.mem.allocate(size, CachePriority::High)?;
1298        let mut builder = RawBranchBuilder::new(&mut page1, division, self.fixed_key_size);
1299        builder.write_first_page(self.children[0].0, self.children[0].1);
1300        for i in 0..division {
1301            let key = &self.keys[i];
1302            builder.write_nth_key(
1303                key.as_ref(),
1304                self.children[i + 1].0,
1305                self.children[i + 1].1,
1306                i,
1307            );
1308        }
1309        drop(builder);
1310
1311        let size = RawBranchBuilder::required_bytes(
1312            self.keys.len() - division - 1,
1313            second_split_key_len,
1314            self.fixed_key_size,
1315        );
1316        let mut page2 = self.mem.allocate(size, CachePriority::High)?;
1317        let mut builder = RawBranchBuilder::new(
1318            &mut page2,
1319            self.keys.len() - division - 1,
1320            self.fixed_key_size,
1321        );
1322        builder.write_first_page(self.children[division + 1].0, self.children[division + 1].1);
1323        for i in (division + 1)..self.keys.len() {
1324            let key = &self.keys[i];
1325            builder.write_nth_key(
1326                key.as_ref(),
1327                self.children[i + 1].0,
1328                self.children[i + 1].1,
1329                i - division - 1,
1330            );
1331        }
1332        drop(builder);
1333
1334        Ok((page1, division_key, page2))
1335    }
1336}
1337
1338// Note the caller is responsible for ensuring that the buffer is large enough
1339// and rewriting all fields if any dynamically sized fields are written
// Layout is:
// 1 byte: type
// 1 byte: padding (padding to 16bits aligned)
// 2 bytes: num_keys (number of keys)
// 4 bytes: padding (padding to 64bits aligned)
// repeating (num_keys + 1 times):
// 16 bytes: child page checksum
// repeating (num_keys + 1 times):
// PageNumber::serialized_size() bytes: child page number
// (optional) repeating (num_keys times):
// * 4 bytes: key end. Ending offset of the key, exclusive
// repeating (num_keys times):
// * n bytes: key data
pub(super) struct RawBranchBuilder<'a: 'b, 'b> {
    // Destination page; its memory is written in the layout described above
    page: &'b mut PageMut<'a>,
    // Some(n) when every key is exactly n bytes; None for variable-size keys
    fixed_key_size: Option<usize>,
    // Number of keys the page was sized for; every one must be written
    num_keys: usize,
    keys_written: usize, // used for debugging
}
1359
impl<'a: 'b, 'b> RawBranchBuilder<'a, 'b> {
    /// Total bytes needed for a branch with `num_keys` keys whose data totals
    /// `size_of_keys` bytes. Variable-size keys additionally need one u32 end
    /// offset per key.
    pub(super) fn required_bytes(
        num_keys: usize,
        size_of_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> usize {
        if fixed_key_size.is_none() {
            let fixed_size = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            size_of_keys + fixed_size
        } else {
            let fixed_size =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            size_of_keys + fixed_size
        }
    }

    // Caller MUST write num_keys values
    pub(super) fn new(
        page: &'b mut PageMut<'a>,
        num_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        assert!(num_keys > 0);
        page.memory_mut()[0] = BRANCH;
        // num_keys is stored little-endian at header bytes [2..4]
        page.memory_mut()[2..4].copy_from_slice(&u16::try_from(num_keys).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison all the child pointers & key offsets, in case the caller forgets to write them
            let start = 8 + size_of::<Checksum>() * (num_keys + 1);
            let last = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            for x in &mut page.memory_mut()[start..last] {
                *x = 0xFF;
            }
        }
        RawBranchBuilder {
            page,
            fixed_key_size,
            num_keys,
            keys_written: 0,
        }
    }

    /// Writes the checksum and page number of the leftmost child (child 0).
    pub(super) fn write_first_page(&mut self, page_number: PageNumber, checksum: Checksum) {
        // Child 0's checksum sits immediately after the 8-byte header
        let offset = 8;
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        // Child 0's page number starts the page-number array, which follows
        // all num_keys + 1 checksums
        let offset = 8 + size_of::<Checksum>() * (self.num_keys + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }

    /// Byte offset where key data begins; mirrors
    /// BranchAccessor::key_section_start for the same layout.
    fn key_section_start(&self) -> usize {
        let mut offset =
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1);
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_keys;
        }

        offset
    }

    /// Exclusive end offset of the nth key. For variable-size keys this reads
    /// the offset recorded by an earlier write_nth_key call.
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
            + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    // Write the nth key and page of values greater than this key, but less than or equal to the next
    // Caller must write keys & pages in increasing order
    pub(super) fn write_nth_key(
        &mut self,
        key: &[u8],
        page_number: PageNumber,
        checksum: Checksum,
        n: usize,
    ) {
        assert!(n < self.num_keys);
        // Keys must be written strictly in order, exactly once each
        assert_eq!(n, self.keys_written);
        self.keys_written += 1;
        // Checksum slot n+1 (slot 0 belongs to the first page)
        let offset = 8 + size_of::<Checksum>() * (n + 1);
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        // Page-number slot n+1, in the array following all the checksums
        let offset = 8
            + size_of::<Checksum>() * (self.num_keys + 1)
            + PageNumber::serialized_size() * (n + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());

        // Key data is appended directly after the previous key (or at the
        // start of the key section for the first key)
        let data_offset = if n > 0 {
            self.key_end(n - 1)
        } else {
            self.key_section_start()
        };
        if self.fixed_key_size.is_none() {
            // Record this key's exclusive end offset in the u32 offset table
            let offset = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
                + size_of::<u32>() * n;
            self.page.memory_mut()[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(data_offset + key.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }

        // Note: `offset` here is the page-number slot bound above (the `let
        // offset` in the if-block is scoped to it); key data must come after it
        debug_assert!(data_offset > offset);
        self.page.memory_mut()[data_offset..(data_offset + key.len())].copy_from_slice(key);
    }
}
1480
1481impl<'a: 'b, 'b> Drop for RawBranchBuilder<'a, 'b> {
1482    fn drop(&mut self) {
1483        if !thread::panicking() {
1484            assert_eq!(self.keys_written, self.num_keys);
1485        }
1486    }
1487}
1488
// In-place editor for an existing branch page: overwrites a child's page
// number and checksum without rebuilding the rest of the page.
pub(super) struct BranchMutator<'a: 'b, 'b> {
    page: &'b mut PageMut<'a>,
}
1492
1493impl<'a: 'b, 'b> BranchMutator<'a, 'b> {
1494    pub(super) fn new(page: &'b mut PageMut<'a>) -> Self {
1495        assert_eq!(page.memory()[0], BRANCH);
1496        Self { page }
1497    }
1498
1499    fn num_keys(&self) -> usize {
1500        u16::from_le_bytes(self.page.memory()[2..4].try_into().unwrap()) as usize
1501    }
1502
1503    pub(super) fn write_child_page(
1504        &mut self,
1505        i: usize,
1506        page_number: PageNumber,
1507        checksum: Checksum,
1508    ) {
1509        debug_assert!(i <= self.num_keys());
1510        let offset = 8 + size_of::<Checksum>() * i;
1511        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
1512            .copy_from_slice(&checksum.to_le_bytes());
1513        let offset =
1514            8 + size_of::<Checksum>() * (self.num_keys() + 1) + PageNumber::serialized_size() * i;
1515        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
1516            .copy_from_slice(&page_number.to_le_bytes());
1517    }
1518}