1use crate::compat::Mutex;
2use crate::tree_store::page_store::compression::{
3 CompressionConfig, compress_value, decompress_value,
4};
5use crate::tree_store::page_store::{Page, PageImpl, PageMut, TransactionalMemory, xxh3_checksum};
6use crate::tree_store::{PageNumber, PageTrackerPolicy};
7use crate::types::{Key, MutInPlaceValue, Value};
8use crate::{Result, StorageError};
9use alloc::borrow::Cow;
10use alloc::boxed::Box;
11use alloc::format;
12use alloc::sync::Arc;
13use alloc::vec::Vec;
14use core::borrow::Borrow;
15use core::cmp::Ordering;
16use core::marker::PhantomData;
17use core::mem::size_of;
18use core::ops::Range;
19
// Page-type discriminant stored in byte 0 of a leaf page.
pub(crate) const LEAF: u8 = 1;
// Page-type discriminant stored in byte 0 of a branch page.
pub(crate) const BRANCH: u8 = 2;

// Page checksums are 128-bit xxh3 digests.
pub(crate) type Checksum = u128;
// Sentinel meaning "checksum not yet computed"; written into headers and
// branch child slots until the real checksum is filled in later.
// NOTE(review): 999 appears to be relied on as an unlikely digest value,
// not a reserved one — confirm.
pub(crate) const DEFERRED: Checksum = 999;
26
27pub(super) fn leaf_checksum<T: Page>(
28 page: &T,
29 fixed_key_size: Option<usize>,
30 fixed_value_size: Option<usize>,
31) -> Result<Checksum, StorageError> {
32 let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size)?;
33 let last_pair = accessor.num_pairs().checked_sub(1).ok_or_else(|| {
34 StorageError::page_corrupted(page.get_page_number(), "leaf page has zero pairs")
35 })?;
36 let end = accessor.value_end(last_pair).ok_or_else(|| {
37 StorageError::page_corrupted(page.get_page_number(), "leaf page has invalid pair offset")
38 })?;
39 if end > page.memory().len() {
40 Err(StorageError::page_corrupted(
41 page.get_page_number(),
42 "leaf page last offset beyond end of data",
43 ))
44 } else {
45 Ok(xxh3_checksum(&page.memory()[..end]))
46 }
47}
48
49pub(super) fn branch_checksum<T: Page>(
50 page: &T,
51 fixed_key_size: Option<usize>,
52) -> Result<Checksum, StorageError> {
53 let accessor = BranchAccessor::new(page, fixed_key_size)?;
54 let last_key = accessor.num_keys().checked_sub(1).ok_or_else(|| {
55 StorageError::page_corrupted(page.get_page_number(), "branch page has zero keys")
56 })?;
57 let end = accessor.key_end(last_key).ok_or_else(|| {
58 StorageError::page_corrupted(page.get_page_number(), "branch page has invalid key offset")
59 })?;
60 if end > page.memory().len() {
61 Err(StorageError::page_corrupted(
62 page.get_page_number(),
63 "branch page last offset beyond end of data",
64 ))
65 } else {
66 Ok(xxh3_checksum(&page.memory()[..end]))
67 }
68}
69
/// Compact description of a btree: its root page, the root checksum, and a
/// length counter. Fixed-size serializable (see `serialized_size`).
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub(crate) struct BtreeHeader {
    // Page number of the tree's root page.
    pub(crate) root: PageNumber,
    // Checksum of the root page; may hold `DEFERRED` until computed.
    pub(crate) checksum: Checksum,
    // Logical length of the tree — presumably the entry count; confirm
    // against callers.
    pub(crate) length: u64,
}
76
77impl BtreeHeader {
78 pub(crate) fn new(root: PageNumber, checksum: Checksum, length: u64) -> Self {
79 Self {
80 root,
81 checksum,
82 length,
83 }
84 }
85
86 pub(crate) const fn serialized_size() -> usize {
87 PageNumber::serialized_size() + size_of::<Checksum>() + size_of::<u64>()
88 }
89
90 pub(crate) fn from_le_bytes(bytes: [u8; Self::serialized_size()]) -> Self {
91 let root =
92 PageNumber::from_le_bytes(bytes[..PageNumber::serialized_size()].try_into().unwrap());
93 let mut offset = PageNumber::serialized_size();
94 let checksum = Checksum::from_le_bytes(
95 bytes[offset..(offset + size_of::<Checksum>())]
96 .try_into()
97 .unwrap(),
98 );
99 offset += size_of::<Checksum>();
100 let length = u64::from_le_bytes(
101 bytes[offset..(offset + size_of::<u64>())]
102 .try_into()
103 .unwrap(),
104 );
105
106 Self {
107 root,
108 checksum,
109 length,
110 }
111 }
112
113 pub(crate) fn to_le_bytes(self) -> [u8; Self::serialized_size()] {
114 let mut result = [0; Self::serialized_size()];
115 result[..PageNumber::serialized_size()].copy_from_slice(&self.root.to_le_bytes());
116 result[PageNumber::serialized_size()
117 ..(PageNumber::serialized_size() + size_of::<Checksum>())]
118 .copy_from_slice(&self.checksum.to_le_bytes());
119 result[(PageNumber::serialized_size() + size_of::<Checksum>())..]
120 .copy_from_slice(&self.length.to_le_bytes());
121
122 result
123 }
124}
125
// Action an `AccessGuard` performs when it is dropped.
enum OnDrop {
    // Nothing to do on drop.
    None,
    // Remove the entry at `position` from the (mutable) leaf page.
    RemoveEntry {
        position: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    },
}
134
// The backing storage a guard reads its value bytes from.
enum EitherPage {
    // A read-only page.
    Immutable(PageImpl),
    // A writable page (required for the RemoveEntry drop action).
    Mutable(PageMut),
    // A heap buffer owned by the guard itself.
    OwnedMemory(Vec<u8>),
    // Shared in-memory bytes.
    ArcMemory(Arc<[u8]>),
}
141
142impl EitherPage {
143 fn memory(&self) -> &[u8] {
144 match self {
145 EitherPage::Immutable(page) => page.memory(),
146 EitherPage::Mutable(page) => page.memory(),
147 EitherPage::OwnedMemory(mem) => mem.as_slice(),
148 EitherPage::ArcMemory(mem) => mem,
149 }
150 }
151}
152
/// Read guard for a value stored in the btree.
///
/// Serves the value either directly from page memory or from a
/// decompressed copy when the stored bytes were compressed.
pub struct AccessGuard<'a, V: Value + 'static> {
    // Backing storage holding the value bytes.
    page: EitherPage,
    // Byte offset of the value within `page`.
    offset: usize,
    // Length in bytes of the value within `page`.
    len: usize,
    // Optional action performed when the guard is dropped.
    on_drop: OnDrop,
    // Decompressed copy of the value, when the stored bytes were
    // compressed; `value()` prefers this over the raw page bytes.
    decompressed_value: Option<Box<[u8]>>,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}
168
impl<V: Value + 'static> AccessGuard<'_, V> {
    /// Creates a guard over `range` of an immutable page. No drop action.
    pub(crate) fn with_page(page: PageImpl, range: Range<usize>) -> Self {
        Self {
            page: EitherPage::Immutable(page),
            offset: range.start,
            len: range.len(),
            on_drop: OnDrop::None,
            decompressed_value: None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    /// Creates a guard over `range` of an immutable page where the bytes
    /// are compression-tagged: a leading tag byte followed by the payload.
    /// When `decompress_value` returns a borrow (payload stored
    /// uncompressed), the guard reads straight from the page, skipping the
    /// tag byte; otherwise it owns the decompressed copy.
    pub(crate) fn with_page_decompress(page: PageImpl, range: Range<usize>) -> Result<Self> {
        let raw = &page.memory()[range.clone()];
        match decompress_value(raw)? {
            Cow::Borrowed(_) => {
                // Uncompressed: skip the 1-byte compression tag.
                Ok(Self::with_page(page, (range.start + 1)..range.end))
            }
            Cow::Owned(decompressed) => Ok(Self::with_owned_value(decompressed)),
        }
    }

    /// Creates a guard over `range` of shared in-memory bytes. No drop action.
    pub(crate) fn with_arc_page(page: Arc<[u8]>, range: Range<usize>) -> Self {
        Self {
            page: EitherPage::ArcMemory(page),
            offset: range.start,
            len: range.len(),
            on_drop: OnDrop::None,
            decompressed_value: None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    /// Same as `with_page_decompress`, but over shared in-memory bytes.
    pub(crate) fn with_arc_page_decompress(page: Arc<[u8]>, range: Range<usize>) -> Result<Self> {
        let raw = &page[range.clone()];
        match decompress_value(raw)? {
            Cow::Borrowed(_) => {
                // Uncompressed: skip the 1-byte compression tag.
                Ok(Self::with_arc_page(page, (range.start + 1)..range.end))
            }
            Cow::Owned(decompressed) => Ok(Self::with_owned_value(decompressed)),
        }
    }

    /// Creates a guard that owns `value` outright (no backing page).
    pub(crate) fn with_owned_value(value: Vec<u8>) -> Self {
        let len = value.len();
        Self {
            page: EitherPage::OwnedMemory(value),
            offset: 0,
            len,
            on_drop: OnDrop::None,
            decompressed_value: None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    /// Creates a guard that owns the decompressed form of
    /// compression-tagged `value` bytes.
    pub(crate) fn with_owned_value_decompress(value: Vec<u8>) -> Result<Self> {
        match decompress_value(&value)? {
            Cow::Borrowed(_) => {
                // Uncompressed: keep the payload past the 1-byte tag.
                Ok(Self::with_owned_value(value[1..].to_vec()))
            }
            Cow::Owned(decompressed) => Ok(Self::with_owned_value(decompressed)),
        }
    }

    /// Creates a guard that removes the entry at `position` from the leaf
    /// page when dropped. `offset`/`len` locate the value bytes in `page`.
    pub(super) fn remove_on_drop(
        page: PageMut,
        offset: usize,
        len: usize,
        position: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        Self {
            page: EitherPage::Mutable(page),
            offset,
            len,
            on_drop: OnDrop::RemoveEntry {
                position,
                fixed_key_size,
                fixed_value_size,
            },
            decompressed_value: None,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        }
    }

    /// Like `remove_on_drop`, but for compression-tagged bytes. The value
    /// is always materialized into `decompressed_value` — even when stored
    /// uncompressed — so `value()` does not need to re-read page bytes
    /// that the drop action will later mutate.
    pub(super) fn remove_on_drop_decompress(
        page: PageMut,
        offset: usize,
        len: usize,
        position: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Result<Self> {
        let raw = &page.memory()[offset..(offset + len)];
        let decompressed_value = match decompress_value(raw)? {
            Cow::Borrowed(_) => {
                // Uncompressed: copy the payload past the 1-byte tag.
                Some(raw[1..].to_vec().into_boxed_slice())
            }
            Cow::Owned(decompressed) => Some(decompressed.into_boxed_slice()),
        };
        Ok(Self {
            page: EitherPage::Mutable(page),
            offset,
            len,
            on_drop: OnDrop::RemoveEntry {
                position,
                fixed_key_size,
                fixed_value_size,
            },
            decompressed_value,
            _value_type: Default::default(),
            _lifetime: Default::default(),
        })
    }

    /// Returns the value, preferring the decompressed copy when present.
    pub fn value(&self) -> V::SelfType<'_> {
        if let Some(ref decompressed) = self.decompressed_value {
            V::from_bytes(decompressed)
        } else {
            V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
        }
    }
}
307
impl<V: Value + 'static> Drop for AccessGuard<'_, V> {
    fn drop(&mut self) {
        match self.on_drop {
            OnDrop::None => {}
            OnDrop::RemoveEntry {
                position,
                fixed_key_size,
                fixed_value_size,
            } => {
                if let EitherPage::Mutable(ref mut mut_page) = self.page {
                    // Best effort: apply the pending removal only if we can
                    // still get exclusive access to the page bytes.
                    if let Ok(mem) = mut_page.memory_mut() {
                        let mut mutator = LeafMutator::new(mem, fixed_key_size, fixed_value_size);
                        mutator.remove(position);
                    }
                } else {
                    // A pending RemoveEntry on a non-mutable page is a bug.
                    // It is tolerated only while unwinding from a panic,
                    // since asserting here would cause a double panic.
                    let is_panicking = {
                        #[cfg(feature = "std")]
                        {
                            std::thread::panicking()
                        }
                        #[cfg(not(feature = "std"))]
                        {
                            // no_std: cannot detect unwinding.
                            false
                        }
                    };
                    assert!(
                        is_panicking,
                        "AccessGuard with RemoveEntry on-drop requires a mutable page, but page was immutable"
                    );
                }
            }
        }
    }
}
342
/// Mutable guard for a value in the btree. `insert` rewrites the value in
/// place when it fits, or rebuilds the leaf page otherwise.
pub struct AccessGuardMut<'a, V: Value + 'static> {
    // Leaf page currently holding the value.
    page: PageMut,
    // Byte offset of the stored value within `page` (includes the 1-byte
    // compression tag when compression is enabled).
    offset: usize,
    // Length in bytes of the stored value.
    len: usize,
    // Index of the entry within the leaf page.
    entry_index: usize,
    // Parent branch page and the child slot pointing at `page`, if any.
    parent: Option<(PageMut, usize)>,
    mem: Arc<TransactionalMemory>,
    allocated: Arc<Mutex<PageTrackerPolicy>>,
    // Root header to patch when this leaf is the root and gets replaced.
    root_ref: &'a mut BtreeHeader,
    // Fixed key width, or None for variable-sized keys.
    key_width: Option<usize>,
    // Fixed value width, or None for variable-sized values.
    fixed_value_size: Option<usize>,
    compression: CompressionConfig,
    // Decompressed copy of the value when it is stored compressed.
    decompressed_value: Option<Box<[u8]>>,
    _value_type: PhantomData<V>,
}
358
impl<'a, V: Value + 'static> AccessGuardMut<'a, V> {
    /// Creates a mutable guard for the value at `entry_index` of `page`.
    ///
    /// Both `page` and the parent page (when given) must be uncommitted,
    /// since the guard may rewrite them. When compression is enabled the
    /// stored bytes are probed up front and a decompressed copy is kept
    /// if they were actually compressed.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        page: PageMut,
        offset: usize,
        len: usize,
        entry_index: usize,
        parent: Option<(PageMut, usize)>,
        mem: Arc<TransactionalMemory>,
        allocated: Arc<Mutex<PageTrackerPolicy>>,
        root_ref: &'a mut BtreeHeader,
        key_width: Option<usize>,
        fixed_value_size: Option<usize>,
        compression: CompressionConfig,
    ) -> Result<Self> {
        assert!(mem.uncommitted(page.get_page_number()));
        if let Some((ref parent_page, _)) = parent {
            assert!(mem.uncommitted(parent_page.get_page_number()));
        }
        let decompressed_value = if compression.is_enabled() && len > 0 {
            let raw = &page.memory()[offset..(offset + len)];
            let decompressed = decompress_value(raw)?;
            match decompressed {
                // Stored uncompressed: read straight from the page later.
                Cow::Borrowed(_) => None,
                Cow::Owned(v) => Some(v.into_boxed_slice()),
            }
        } else {
            None
        };
        Ok(AccessGuardMut {
            page,
            offset,
            len,
            entry_index,
            parent,
            mem,
            allocated,
            root_ref,
            key_width,
            fixed_value_size,
            compression,
            decompressed_value,
            _value_type: Default::default(),
        })
    }

    /// Returns the current value: the decompressed copy if one is held,
    /// otherwise the page bytes (skipping the 1-byte compression tag when
    /// compression is enabled).
    pub fn value(&self) -> V::SelfType<'_> {
        if let Some(ref decompressed) = self.decompressed_value {
            V::from_bytes(decompressed)
        } else if self.compression.is_enabled() && self.len > 0 {
            V::from_bytes(&self.page.memory()[(self.offset + 1)..(self.offset + self.len)])
        } else {
            V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
        }
    }

    /// Replaces the value at `entry_index` with `value`.
    ///
    /// Overwrites in place when the page has room; otherwise rebuilds the
    /// leaf into a freshly allocated page, re-links it from the parent (or
    /// the root header) with a DEFERRED checksum, and frees the old page.
    /// Finally refreshes `offset`/`len`/`decompressed_value` so the guard
    /// describes the newly stored bytes.
    pub fn insert<'v>(&mut self, value: impl Borrow<V::SelfType<'v>>) -> Result<()> {
        let raw_value_bytes = V::as_bytes(value.borrow());
        // Bytes as they will be stored: compression-tagged when enabled.
        let stored_value = if self.compression.is_enabled() {
            compress_value(raw_value_bytes.as_ref(), self.compression)
        } else {
            raw_value_bytes.as_ref().to_vec()
        };

        let key_bytes = {
            let accessor =
                LeafAccessor::new(self.page.memory(), self.key_width, self.fixed_value_size)?;
            accessor.key_unchecked(self.entry_index).to_vec()
        };

        if LeafMutator::sufficient_insert_inplace_space(
            &self.page,
            self.entry_index,
            true,
            self.key_width,
            self.fixed_value_size,
            key_bytes.as_slice(),
            &stored_value,
        ) {
            // Fast path: overwrite the entry within the existing page.
            let mut mutator = LeafMutator::new(
                self.page.memory_mut()?,
                self.key_width,
                self.fixed_value_size,
            );
            mutator.insert(self.entry_index, true, &key_bytes, &stored_value);
        } else {
            // Slow path: rebuild the whole leaf with the new value into a
            // newly allocated page.
            let accessor =
                LeafAccessor::new(self.page.memory(), self.key_width, self.fixed_value_size)?;
            let mut builder = LeafBuilder::new(
                &self.mem,
                &self.allocated,
                accessor.num_pairs(),
                self.key_width,
                self.fixed_value_size,
            );

            for i in 0..accessor.num_pairs() {
                if i == self.entry_index {
                    builder.push(&key_bytes, &stored_value);
                } else {
                    let entry = accessor.entry(i).unwrap();
                    builder.push(entry.key(), entry.value());
                }
            }

            let new_page = builder.build()?;

            // Re-link the new page from its parent, or from the root header
            // when this leaf is the root, marking the checksum DEFERRED.
            if let Some((ref mut parent_page, parent_entry_index)) = self.parent {
                let mut mutator = BranchMutator::new(parent_page.memory_mut()?);
                mutator.write_child_page(parent_entry_index, new_page.get_page_number(), DEFERRED);
            } else {
                self.root_ref.root = new_page.get_page_number();
                self.root_ref.checksum = DEFERRED;
            }

            let old_page_number = self.page.get_page_number();
            self.page = new_page;
            let mut allocated = self.allocated.lock();
            // The old page must be uncommitted (asserted in `new`), so
            // freeing it here must succeed.
            assert!(
                self.mem
                    .free_if_uncommitted(old_page_number, &mut allocated)
            );
        }

        // Point this guard at the freshly written value bytes.
        let new_accessor =
            LeafAccessor::new(self.page.memory(), self.key_width, self.fixed_value_size)?;
        let (new_start, new_end) = new_accessor.value_range(self.entry_index).unwrap();

        self.offset = new_start;
        self.len = new_end - new_start;

        if self.compression.is_enabled() && self.len > 0 {
            // Keep a decompressed copy when the stored bytes are compressed.
            let raw = &self.page.memory()[self.offset..(self.offset + self.len)];
            self.decompressed_value = match decompress_value(raw)? {
                Cow::Borrowed(_) => None,
                Cow::Owned(v) => Some(v.into_boxed_slice()),
            };
        } else {
            self.decompressed_value = None;
        }

        Ok(())
    }
}
509
/// Guard exposing a value's bytes for direct in-place mutation via the
/// `AsMut` impl.
pub struct AccessGuardMutInPlace<'a, V: Value + 'static> {
    // Page holding the value bytes.
    page: PageMut,
    // Byte offset of the value within `page`.
    offset: usize,
    // Length in bytes of the value.
    len: usize,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}
518
519impl<V: Value + 'static> AccessGuardMutInPlace<'_, V> {
520 pub(crate) fn new(page: PageMut, offset: usize, len: usize) -> Self {
521 AccessGuardMutInPlace {
522 page,
523 offset,
524 len,
525 _value_type: Default::default(),
526 _lifetime: Default::default(),
527 }
528 }
529}
530
531impl<V: MutInPlaceValue + 'static> AsMut<V::BaseRefType> for AccessGuardMutInPlace<'_, V> {
532 fn as_mut(&mut self) -> &mut V::BaseRefType {
533 let mem = self
534 .page
535 .memory_mut()
536 .expect("AccessGuardMutInPlace requires exclusive page access");
537 V::from_bytes_mut(&mut mem[self.offset..(self.offset + self.len)])
538 }
539}
540
/// Borrowed view of one key-value pair within a leaf page.
pub struct EntryAccessor<'a> {
    // Key bytes of the entry.
    key: &'a [u8],
    // Value bytes of the entry.
    value: &'a [u8],
}
546
547impl<'a> EntryAccessor<'a> {
548 fn new(key: &'a [u8], value: &'a [u8]) -> Self {
549 EntryAccessor { key, value }
550 }
551}
552
impl<'a: 'b, 'b> EntryAccessor<'a> {
    /// The key bytes, borrowed for the underlying page lifetime `'a`.
    pub(crate) fn key(&'b self) -> &'a [u8] {
        self.key
    }

    /// The value bytes, borrowed for the underlying page lifetime `'a`.
    pub(crate) fn value(&'b self) -> &'a [u8] {
        self.value
    }
}
562
/// Read-only view over the bytes of a leaf page.
///
/// Leaf page layout (integers little-endian, offsets absolute from the
/// page start):
///   byte  0:     LEAF page-type marker
///   byte  1:     not interpreted by this accessor
///   bytes 2..4:  number of key-value pairs (u16)
///   then, only when keys are variable-sized:   one u32 end-offset per key
///   then, only when values are variable-sized: one u32 end-offset per value
///   then all key bytes, then all value bytes.
pub(crate) struct LeafAccessor<'a> {
    page: &'a [u8],
    // Width of every key, or None when keys are variable-sized.
    fixed_key_size: Option<usize>,
    // Width of every value, or None when values are variable-sized.
    fixed_value_size: Option<usize>,
    // Pair count, decoded from the header by `new`.
    num_pairs: usize,
}
570
impl<'a> LeafAccessor<'a> {
    /// Validates the page header and constructs an accessor.
    ///
    /// Checks that the page can hold the 4-byte header plus the offset
    /// tables implied by `num_pairs`; individual pair offsets are
    /// validated lazily by the `Option`-returning lookups below.
    pub(crate) fn new(
        page: &'a [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> core::result::Result<Self, StorageError> {
        if page.len() < 4 {
            return Err(StorageError::Corrupted(
                "Leaf page too small for header".into(),
            ));
        }
        debug_assert_eq!(page[0], LEAF);
        let num_pairs = u16::from_le_bytes(
            page[2..4]
                .try_into()
                .map_err(|_| StorageError::Corrupted("Leaf page: bad num_pairs".into()))?,
        ) as usize;
        // Each variable-sized dimension needs one u32 end-offset per pair;
        // saturating math keeps a hostile num_pairs from overflowing.
        let mut min_size = 4usize;
        if fixed_key_size.is_none() {
            min_size = min_size.saturating_add(size_of::<u32>().saturating_mul(num_pairs));
        }
        if fixed_value_size.is_none() {
            min_size = min_size.saturating_add(size_of::<u32>().saturating_mul(num_pairs));
        }
        if min_size > page.len() {
            return Err(StorageError::Corrupted(format!(
                "Leaf page: num_pairs={num_pairs} requires {min_size}B for offset tables, page is {}B",
                page.len()
            )));
        }
        Ok(LeafAccessor {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
        })
    }

    /// Debug helper: prints every key (and optionally value) to stderr.
    #[cfg(feature = "std")]
    pub(super) fn print_node<K: Key, V: Value>(&self, include_value: bool) {
        let mut i = 0;
        while let Some(entry) = self.entry(i) {
            eprint!(" key_{i}={:?}", K::from_bytes(entry.key()));
            if include_value {
                eprint!(" value_{i}={:?}", V::from_bytes(entry.value()));
            }
            i += 1;
        }
    }

    /// Binary-searches for `query` using `K`'s ordering.
    ///
    /// Returns `(index, true)` on an exact match, or the insertion
    /// position and `false` otherwise. Assumes keys are sorted by
    /// `K::compare`.
    pub(crate) fn position<K: Key>(&self, query: &[u8]) -> (usize, bool) {
        let mut min_entry = 0;
        // One past the last candidate entry.
        let mut max_entry = self.num_pairs();
        while min_entry < max_entry {
            let mid = min_entry.midpoint(max_entry);
            let key = self.key_unchecked(mid);
            match K::compare(query, key) {
                Ordering::Less => {
                    max_entry = mid;
                }
                Ordering::Equal => {
                    return (mid, true);
                }
                Ordering::Greater => {
                    min_entry = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_entry, max_entry);
        (min_entry, false)
    }

    /// Returns the index of `query`, or `None` if it is not present.
    pub(crate) fn find_key<K: Key>(&self, query: &[u8]) -> Option<usize> {
        let (entry, found) = self.position::<K>(query);
        if found { Some(entry) } else { None }
    }

    /// Absolute offset where key data begins (just past the header and
    /// any offset tables).
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    /// Absolute start offset of key `n` (the previous key's end), or
    /// `None` if out of range.
    fn key_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            Some(self.key_section_start())
        } else {
            self.key_end(n - 1)
        }
    }

    /// Absolute end offset of key `n`: computed for fixed-size keys,
    /// read from the key offset table otherwise. `None` if out of range
    /// or the table slot lies outside the page.
    fn key_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_key_size {
                return Some(self.key_section_start() + fixed * (n + 1));
            }
            // Key offset table starts right after the 4-byte header.
            let offset = 4 + size_of::<u32>() * n;
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    /// Absolute start offset of value `n`; values begin where the last
    /// key ends. `None` if out of range or the page is empty.
    fn value_start(&self, n: usize) -> Option<usize> {
        if self.num_pairs() == 0 {
            return None;
        }
        if n == 0 {
            self.key_end(self.num_pairs() - 1)
        } else {
            self.value_end(n - 1)
        }
    }

    /// Absolute end offset of value `n`: computed for fixed-size values,
    /// read from the value offset table otherwise. `None` if out of range.
    fn value_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_value_size {
                return Some(self.key_end(self.num_pairs.checked_sub(1)?)? + fixed * (n + 1));
            }
            // Value offset table sits after the key offset table (if any).
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            let end = u32::from_le_bytes(
                self.page
                    .get(offset..(offset + size_of::<u32>()))?
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    /// Number of key-value pairs stored in this page.
    pub(crate) fn num_pairs(&self) -> usize {
        self.num_pairs
    }

    /// Offset of the first value. Panics if the page has no pairs.
    pub(super) fn offset_of_first_value(&self) -> usize {
        self.offset_of_value(0).unwrap()
    }

    /// Offset of value `n`, or `None` if out of range.
    pub(super) fn offset_of_value(&self, n: usize) -> Option<usize> {
        self.value_start(n)
    }

    /// `(start, end)` offsets of value `n`, or `None` if out of range.
    pub(super) fn value_range(&self, n: usize) -> Option<(usize, usize)> {
        Some((self.value_start(n)?, self.value_end(n)?))
    }

    /// Total bytes of keys plus values for pairs in `start..end`.
    pub(crate) fn length_of_pairs(&self, start: usize, end: usize) -> usize {
        self.length_of_values(start, end) + self.length_of_keys(start, end)
    }

    /// Total bytes of values for pairs in `start..end`.
    fn length_of_values(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.value_end(end - 1).unwrap();
        let start_offset = self.value_start(start).unwrap();
        end_offset - start_offset
    }

    /// Total bytes of keys for pairs in `start..end`.
    pub(crate) fn length_of_keys(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.key_end(end - 1).unwrap();
        let start_offset = self.key_start(start).unwrap();
        end_offset - start_offset
    }

    /// End offset of the last value, i.e. the page's used length.
    /// Panics if the page has no pairs.
    pub(crate) fn total_length(&self) -> usize {
        self.value_end(self.num_pairs() - 1).unwrap()
    }

    /// Key `n`; panics (rather than returning `None`) when out of range.
    fn key_unchecked(&self, n: usize) -> &[u8] {
        &self.page[self.key_start(n).unwrap()..self.key_end(n).unwrap()]
    }

    /// Key and value slices of entry `n`, or `None` if out of range.
    pub(crate) fn entry(&self, n: usize) -> Option<EntryAccessor<'a>> {
        let key = &self.page[self.key_start(n)?..self.key_end(n)?];
        let value = &self.page[self.value_start(n)?..self.value_end(n)?];
        Some(EntryAccessor::new(key, value))
    }

    /// Key and value byte ranges of entry `n`, or `None` if out of range.
    pub(crate) fn entry_ranges(&self, n: usize) -> Option<(Range<usize>, Range<usize>)> {
        let key = self.key_start(n)?..self.key_end(n)?;
        let value = self.value_start(n)?..self.value_end(n)?;
        Some((key, value))
    }

    /// The final entry. Panics if the page has no pairs.
    pub(super) fn last_entry(&self) -> EntryAccessor<'a> {
        self.entry(self.num_pairs() - 1).unwrap()
    }
}
785
/// Accumulates key-value pairs and then builds finished leaf page(s),
/// allocating pages from the transaction's memory.
pub(super) struct LeafBuilder<'a, 'b> {
    // Pairs to write, in push order.
    pairs: Vec<(&'a [u8], &'a [u8])>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Running byte totals used to size the output page(s).
    total_key_bytes: usize,
    total_value_bytes: usize,
    mem: &'b TransactionalMemory,
    allocated_pages: &'b Mutex<PageTrackerPolicy>,
}
795
impl<'a, 'b> LeafBuilder<'a, 'b> {
    /// Bytes needed for a page holding `num_pairs` entries totalling
    /// `keys_values_bytes` of combined key+value data.
    pub(super) fn required_bytes(&self, num_pairs: usize, keys_values_bytes: usize) -> usize {
        RawLeafBuilder::required_bytes(
            num_pairs,
            keys_values_bytes,
            self.fixed_key_size,
            self.fixed_value_size,
        )
    }

    /// Creates an empty builder with room reserved for `capacity` pairs.
    pub(super) fn new(
        mem: &'b TransactionalMemory,
        allocated_pages: &'b Mutex<PageTrackerPolicy>,
        capacity: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        Self {
            pairs: Vec::with_capacity(capacity),
            fixed_key_size,
            fixed_value_size,
            total_key_bytes: 0,
            total_value_bytes: 0,
            mem,
            allocated_pages,
        }
    }

    /// Appends one pair (borrowed; nothing is written until `build*`).
    pub(super) fn push(&mut self, key: &'a [u8], value: &'a [u8]) {
        self.total_key_bytes += key.len();
        self.total_value_bytes += value.len();
        self.pairs.push((key, value));
    }

    /// Copies every pair from `accessor`, optionally skipping the entry at
    /// index `except`.
    pub(super) fn push_all_except(
        &mut self,
        accessor: &'a LeafAccessor<'_>,
        except: Option<usize>,
    ) {
        for i in 0..accessor.num_pairs() {
            if let Some(except) = except
                && except == i
            {
                continue;
            }
            let entry = accessor.entry(i).unwrap();
            self.push(entry.key(), entry.value());
        }
    }

    /// Whether the accumulated pairs exceed one page and can be split
    /// (a single pair is never split, however large).
    pub(super) fn should_split(&self) -> bool {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        required_size > self.mem.get_page_size() && self.pairs.len() > 1
    }

    /// Builds two leaf pages, splitting the pairs near half of the total
    /// payload bytes. Returns `(first_page, last_key_of_first_page,
    /// second_page)`; both halves are guaranteed non-empty.
    pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
        let total_size = self.total_key_bytes + self.total_value_bytes;
        let mut division = 0;
        let mut first_split_key_bytes = 0;
        let mut first_split_value_bytes = 0;
        // Walk pairs until the first half holds at least half the bytes;
        // stopping before the last pair keeps the second half non-empty.
        for (key, value) in self.pairs.iter().take(self.pairs.len() - 1) {
            first_split_key_bytes += key.len();
            first_split_value_bytes += value.len();
            division += 1;
            if first_split_key_bytes + first_split_value_bytes >= total_size / 2 {
                break;
            }
        }

        let required_size =
            self.required_bytes(division, first_split_key_bytes + first_split_value_bytes);
        let mut allocated_pages = self.allocated_pages.lock();
        let mut page1 = self.mem.allocate(required_size, &mut allocated_pages)?;
        let mut builder = RawLeafBuilder::new(
            page1.memory_mut()?,
            division,
            self.fixed_key_size,
            self.fixed_value_size,
            first_split_key_bytes,
        );
        for (key, value) in self.pairs.iter().take(division) {
            builder.append(key, value);
        }
        // Dropping the raw builder runs its completeness assertions.
        drop(builder);

        // Second page gets the remaining pairs and remaining bytes.
        let required_size = self.required_bytes(
            self.pairs.len() - division,
            self.total_key_bytes + self.total_value_bytes
                - first_split_key_bytes
                - first_split_value_bytes,
        );
        let mut page2 = self.mem.allocate(required_size, &mut allocated_pages)?;
        let mut builder = RawLeafBuilder::new(
            page2.memory_mut()?,
            self.pairs.len() - division,
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes - first_split_key_bytes,
        );
        for (key, value) in &self.pairs[division..] {
            builder.append(key, value);
        }
        drop(builder);

        Ok((page1, self.pairs[division - 1].0, page2))
    }

    /// Builds a single leaf page containing every pushed pair.
    pub(super) fn build(self) -> Result<PageMut> {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        let mut allocated_pages = self.allocated_pages.lock();
        let mut page = self.mem.allocate(required_size, &mut allocated_pages)?;
        let mut builder = RawLeafBuilder::new(
            page.memory_mut()?,
            self.pairs.len(),
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes,
        );
        for (key, value) in self.pairs {
            builder.append(key, value);
        }
        // Dropping the raw builder runs its completeness assertions.
        drop(builder);
        Ok(page)
    }
}
927
/// Incremental writer that lays out a leaf page directly into a mutable
/// byte buffer. Exactly `num_pairs` entries must be appended via `append`;
/// the `Drop` impl asserts the builder was fully and exactly used.
pub(crate) struct RawLeafBuilder<'a> {
    page: &'a mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Total number of pairs this page will contain.
    num_pairs: usize,
    // Bytes reserved for the key section; values start right after it.
    provisioned_key_bytes: usize,
    // Number of pairs appended so far.
    pairs_written: usize,
}
950
impl<'a> RawLeafBuilder<'a> {
    /// Bytes needed for a leaf page with `num_pairs` entries totalling
    /// `keys_values_bytes` of key+value data. Fixed-size keys/values need
    /// no offset table.
    pub(crate) fn required_bytes(
        num_pairs: usize,
        keys_values_bytes: usize,
        key_size: Option<usize>,
        value_size: Option<usize>,
    ) -> usize {
        // 4-byte header: page-type byte, one unused byte, u16 num_pairs.
        let mut result = 4;
        // One u32 end-offset per pair, per variable-sized dimension.
        if key_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        if value_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        result += keys_values_bytes;

        result
    }

    /// Writes the page header into `page` and prepares for appends.
    ///
    /// `key_bytes` is the total size of the key section, which fixes
    /// where the value section begins.
    pub(crate) fn new(
        page: &'a mut [u8],
        num_pairs: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        key_bytes: usize,
    ) -> Self {
        page[0] = LEAF;
        page[2..4].copy_from_slice(&u16::try_from(num_pairs).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison the offset tables so reading an unwritten slot is
            // obvious in debug builds.
            let mut last = 4;
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            if fixed_value_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            for x in &mut page[4..last] {
                *x = 0xFF;
            }
        }
        RawLeafBuilder {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
            provisioned_key_bytes: key_bytes,
            pairs_written: 0,
        }
    }

    /// End offset of value `n`: computed for fixed sizes, otherwise read
    /// back from the value offset table written by earlier `append`s.
    fn value_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_value_size {
            return self.key_section_start() + self.provisioned_key_bytes + fixed * (n + 1);
        }
        // Value offset table sits after the key offset table (if any).
        let mut offset = 4 + size_of::<u32>() * n;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Offset where key data begins (header plus any offset tables).
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    /// End offset of key `n`: computed for fixed sizes, otherwise read
    /// back from the key offset table written by earlier `append`s.
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 4 + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Appends the next key-value pair, writing its bytes and (for
    /// variable sizes) its offset-table slots. Pairs are laid out
    /// sequentially; panics if a fixed width is violated or the key data
    /// would overrun the provisioned key section.
    pub(crate) fn append(&mut self, key: &[u8], value: &[u8]) {
        if let Some(key_width) = self.fixed_key_size {
            assert_eq!(key_width, key.len());
        }
        if let Some(value_width) = self.fixed_value_size {
            assert_eq!(value_width, value.len());
        }
        // Each pair starts where the previous one ended.
        let key_offset = if self.pairs_written == 0 {
            self.key_section_start()
        } else {
            self.key_end(self.pairs_written - 1)
        };
        let value_offset = if self.pairs_written == 0 {
            self.key_section_start() + self.provisioned_key_bytes
        } else {
            self.value_end(self.pairs_written - 1)
        };

        let n = self.pairs_written;
        if self.fixed_key_size.is_none() {
            // Record this key's end offset in the key table.
            let offset = 4 + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())]
                .copy_from_slice(&u32::try_from(key_offset + key.len()).unwrap().to_le_bytes());
        }
        self.page[key_offset..(key_offset + key.len())].copy_from_slice(key);
        let written_key_len = key_offset + key.len() - self.key_section_start();
        assert!(written_key_len <= self.provisioned_key_bytes);

        if self.fixed_value_size.is_none() {
            // Record this value's end offset in the value table.
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(value_offset + value.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }
        self.page[value_offset..(value_offset + value.len())].copy_from_slice(value);
        self.pairs_written += 1;
    }
}
1087
1088impl Drop for RawLeafBuilder<'_> {
1089 fn drop(&mut self) {
1090 let is_panicking = {
1091 #[cfg(feature = "std")]
1092 {
1093 std::thread::panicking()
1094 }
1095 #[cfg(not(feature = "std"))]
1096 {
1097 false
1098 }
1099 };
1100 if !is_panicking {
1101 assert_eq!(self.pairs_written, self.num_pairs);
1102 if self.num_pairs > 0 {
1103 assert_eq!(
1104 self.key_section_start() + self.provisioned_key_bytes,
1105 self.key_end(self.num_pairs - 1)
1106 );
1107 }
1108 }
1109 }
1110}
1111
/// In-place editor for the bytes of a leaf page.
pub(crate) struct LeafMutator<'b> {
    page: &'b mut [u8],
    // Fixed key width, or None for variable-sized keys.
    fixed_key_size: Option<usize>,
    // Fixed value width, or None for variable-sized values.
    fixed_value_size: Option<usize>,
}
1117
1118impl<'b> LeafMutator<'b> {
    /// Creates a mutator over raw leaf-page bytes.
    ///
    /// Panics if the buffer does not begin with the `LEAF` page-type marker.
    pub(crate) fn new(
        page: &'b mut [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        assert_eq!(page[0], LEAF);
        Self {
            page,
            fixed_key_size,
            fixed_value_size,
        }
    }
1131
    /// Returns whether `new_key`/`new_value` can be written into the page's
    /// existing allocation without a rebuild — either replacing the entry
    /// at `position` (`overwrite == true`) or inserting a new entry there.
    pub(super) fn sufficient_insert_inplace_space(
        page: &'_ impl Page,
        position: usize,
        overwrite: bool,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        new_key: &[u8],
        new_value: &[u8],
    ) -> bool {
        let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size)
            .expect("internal: constructed page is valid");
        if overwrite {
            // Replacing entry `position`: only the size delta matters.
            let remaining = page.memory().len() - accessor.total_length();
            let required_delta = isize::try_from(new_key.len() + new_value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(position, position + 1)).unwrap();
            required_delta <= isize::try_from(remaining).unwrap()
        } else {
            // On pages of order > 0, only appends at the very end are
            // allowed in place; mid-page inserts force a rebuild.
            // NOTE(review): presumably this bounds the cost of shifting
            // existing entries on large pages — confirm the intent.
            if page.get_page_number().page_order > 0 && position < accessor.num_pairs() {
                return false;
            }
            let remaining = page.memory().len() - accessor.total_length();
            // A new entry also needs a u32 offset-table slot for each
            // variable-sized dimension.
            let mut required_delta = new_key.len() + new_value.len();
            if fixed_key_size.is_none() {
                required_delta += size_of::<u32>();
            }
            if fixed_value_size.is_none() {
                required_delta += size_of::<u32>();
            }
            required_delta <= remaining
        }
    }
1167
    /// Inserts the pair (`key`, `value`) at index `i`, or replaces the pair at
    /// `i` when `overwrite` is true, shifting existing data within the page.
    ///
    /// Leaf layout (as implied by the offsets below): 4-byte header, then the
    /// key end-offset table (u32 each, only when keys are variable width), the
    /// value end-offset table (u32 each, only when values are variable width),
    /// then all key bytes, then all value bytes.
    ///
    /// Panics (assert) if the page lacks space; callers are expected to have
    /// checked `sufficient_insert_inplace_space` first.
    pub(crate) fn insert(&mut self, i: usize, overwrite: bool, key: &[u8], value: &[u8]) {
        let accessor = LeafAccessor::new(self.page, self.fixed_key_size, self.fixed_value_size)
            .expect("internal: constructed page is valid");
        // Net growth in bytes: for an overwrite, only the size difference of
        // the replaced pair; for an insert, the new pair plus one u32 per
        // variable-width offset table.
        let required_delta = if overwrite {
            isize::try_from(key.len() + value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(i, i + 1)).unwrap()
        } else {
            let mut delta = key.len() + value.len();
            if self.fixed_key_size.is_none() {
                delta += size_of::<u32>();
            }
            if self.fixed_value_size.is_none() {
                delta += size_of::<u32>();
            }
            delta.try_into().unwrap()
        };
        assert!(
            isize::try_from(accessor.total_length()).unwrap() + required_delta
                <= isize::try_from(self.page.len()).unwrap()
        );

        let num_pairs = accessor.num_pairs();
        let last_key_end = accessor.key_end(accessor.num_pairs() - 1).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
        // Data belonging to entries at or after shift_index moves; when
        // overwriting, entry i's key stays in place so shifting starts at i+1.
        let shift_index = if overwrite { i + 1 } else { i };
        let shift_key_start = accessor.key_start(shift_index).unwrap_or(last_key_end);
        let shift_value_start = accessor.value_start(shift_index).unwrap_or(last_value_end);
        let existing_value_len = accessor
            .value_range(i)
            .map(|(start, end)| end - start)
            .unwrap_or_default();

        // How far the values after the insertion point move.
        let value_delta = if overwrite {
            isize::try_from(value.len()).unwrap() - isize::try_from(existing_value_len).unwrap()
        } else {
            value.len().try_into().unwrap()
        };

        // Per-entry size of each offset table (0 when that table is absent).
        let key_ptr_size: usize = if self.fixed_key_size.is_none() { 4 } else { 0 };
        let value_ptr_size: usize = if self.fixed_value_size.is_none() {
            4
        } else {
            0
        };
        // First, fix up the stored end offsets (they are absolute positions).
        if !overwrite {
            // Entries before i: their key bytes move right only by the growth
            // of the offset tables; their value bytes additionally move past
            // the newly inserted key.
            for j in 0..i {
                self.update_key_end(j, (key_ptr_size + value_ptr_size).try_into().unwrap());
                let value_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_value_end(j, value_delta);
            }
        }
        // Entries at or after i: shifted by the full inserted key (and value).
        for j in i..num_pairs {
            if overwrite {
                self.update_value_end(j, value_delta);
            } else {
                let key_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_key_end(j, key_delta);
                let value_delta = key_delta + isize::try_from(value.len()).unwrap();
                self.update_value_end(j, value_delta);
            }
        }

        let new_num_pairs = if overwrite { num_pairs } else { num_pairs + 1 };
        // num_pairs lives at header bytes 2..4 (little endian u16).
        self.page[2..4].copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());

        // Now move the raw bytes, working from the end of the page backwards
        // so that each copy_within never clobbers data it still needs.
        let mut dest = if overwrite {
            (isize::try_from(shift_value_start).unwrap() + value_delta)
                .try_into()
                .unwrap()
        } else {
            shift_value_start + key_ptr_size + value_ptr_size + key.len() + value.len()
        };
        // Shift the values of all entries after the insertion point.
        let start = shift_value_start;
        let end = last_value_end;
        self.page.copy_within(start..end, dest);

        // The inserted value ends exactly where the shifted values now begin.
        let inserted_value_end: u32 = dest.try_into().unwrap();
        dest -= value.len();
        self.page[dest..(dest + value.len())].copy_from_slice(value);

        if !overwrite {
            // Shift the contiguous chunk: keys of entries >= i followed by
            // values of entries < i.
            let start = shift_key_start;
            let end = shift_value_start;
            dest -= end - start;
            self.page.copy_within(start..end, dest);

            let inserted_key_end: u32 = dest.try_into().unwrap();
            dest -= key.len();
            self.page[dest..(dest + key.len())].copy_from_slice(key);

            // Shift the value-offset entries for indices < i (they follow the
            // key-offset table in the old layout).
            let start = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            let end = shift_key_start;
            dest -= end - start;
            debug_assert_eq!(
                dest,
                4 + key_ptr_size * new_num_pairs + value_ptr_size * (i + 1)
            );
            self.page.copy_within(start..end, dest);

            // Write the new entry's value end offset into its table slot.
            if self.fixed_value_size.is_none() {
                dest -= size_of::<u32>();
                self.page[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_value_end.to_le_bytes());
            }

            // Shift key-offset entries for indices >= i plus value-offset
            // entries for indices < i to their new positions.
            let start = 4 + key_ptr_size * i;
            let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            dest -= end - start;
            debug_assert_eq!(dest, 4 + key_ptr_size * (i + 1));
            self.page.copy_within(start..end, dest);

            // Write the new entry's key end offset into its table slot.
            if self.fixed_key_size.is_none() {
                dest -= size_of::<u32>();
                self.page[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_key_end.to_le_bytes());
            }
            // Everything before slot i was already in place and untouched.
            debug_assert_eq!(dest, 4 + key_ptr_size * i);
        }
    }
1301
    /// Removes the pair at index `i`, compacting the page in place.
    ///
    /// Panics (assert) if `i` is out of range or if this would empty the page
    /// (a leaf must keep at least one pair).
    pub(super) fn remove(&mut self, i: usize) {
        let accessor = LeafAccessor::new(self.page, self.fixed_key_size, self.fixed_value_size)
            .expect("internal: constructed page is valid");
        let num_pairs = accessor.num_pairs();
        assert!(i < num_pairs);
        assert!(num_pairs > 1);
        let key_start = accessor.key_start(i).unwrap();
        let key_end = accessor.key_end(i).unwrap();
        let value_start = accessor.value_start(i).unwrap();
        let value_end = accessor.value_end(i).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();

        // Per-entry size of each end-offset table (0 when fixed width).
        let key_ptr_size = if self.fixed_key_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        let value_ptr_size = if self.fixed_value_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        // Fix up the absolute end offsets before moving any bytes.
        // Entries before i: keys move left by the shrunken offset tables;
        // values additionally move left past the removed key.
        for j in 0..i {
            self.update_key_end(j, -isize::try_from(key_ptr_size + value_ptr_size).unwrap());
            let value_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_value_end(j, value_delta);
        }
        // Entries after i: keys move left past the removed key; values also
        // move past the removed value.
        for j in (i + 1)..num_pairs {
            let key_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_key_end(j, key_delta);
            let value_delta = key_delta - isize::try_from(value_end - value_start).unwrap();
            self.update_value_end(j, value_delta);
        }

        let new_num_pairs = num_pairs - 1;
        // num_pairs lives at header bytes 2..4 (little endian u16).
        self.page[2..4].copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
        // Compact left-to-right, skipping entry i's table slots and bytes.
        // First: key-offset entries after i plus value-offset entries before i.
        let mut dest = 4 + key_ptr_size * i;
        let start = 4 + key_ptr_size * (i + 1);
        let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
        self.page.copy_within(start..end, dest);
        dest += end - start;
        debug_assert_eq!(dest, 4 + key_ptr_size * new_num_pairs + value_ptr_size * i);

        // Next: value-offset entries after i plus all key bytes before key i.
        let start = 4 + key_ptr_size * num_pairs + value_ptr_size * (i + 1);
        let end = key_start;
        self.page.copy_within(start..end, dest);
        dest += end - start;

        let preceding_key_len = key_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_key_len
        );

        // Next: the key bytes after key i plus the value bytes before value i.
        let start = key_end;
        let end = value_start;
        self.page.copy_within(start..end, dest);
        dest += end - start;

        let preceding_data_len =
            value_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs) - (key_end - key_start);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_data_len
        );
        // Finally: the value bytes after value i.
        let start = value_end;
        let end = last_value_end;
        self.page.copy_within(start..end, dest);
    }
1382
1383 fn update_key_end(&mut self, i: usize, delta: isize) {
1384 if self.fixed_key_size.is_some() {
1385 return;
1386 }
1387 let offset = 4 + size_of::<u32>() * i;
1388 let mut ptr = u32::from_le_bytes(
1389 self.page[offset..(offset + size_of::<u32>())]
1390 .try_into()
1391 .unwrap(),
1392 );
1393 ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1394 self.page[offset..(offset + size_of::<u32>())].copy_from_slice(&ptr.to_le_bytes());
1395 }
1396
1397 fn update_value_end(&mut self, i: usize, delta: isize) {
1398 if self.fixed_value_size.is_some() {
1399 return;
1400 }
1401 let accessor = LeafAccessor::new(self.page, self.fixed_key_size, self.fixed_value_size)
1402 .expect("internal: constructed page is valid");
1403 let num_pairs = accessor.num_pairs();
1404 let mut offset = 4 + size_of::<u32>() * i;
1405 if self.fixed_key_size.is_none() {
1406 offset += size_of::<u32>() * num_pairs;
1407 }
1408 let mut ptr = u32::from_le_bytes(
1409 self.page[offset..(offset + size_of::<u32>())]
1410 .try_into()
1411 .unwrap(),
1412 );
1413 ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
1414 self.page[offset..(offset + size_of::<u32>())].copy_from_slice(&ptr.to_le_bytes());
1415 }
1416}
1417
/// Read-only view over a serialized branch (internal B-tree) page.
pub(crate) struct BranchAccessor<'a: 'b, 'b, T: Page + 'a> {
    page: &'b T,
    // Key count cached from header bytes 2..4 at construction time
    num_keys: usize,
    // Some(n) when every key is exactly n bytes (no per-key offset table)
    fixed_key_size: Option<usize>,
    _page_lifetime: PhantomData<&'a ()>,
}
1426
1427impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
1428 pub(crate) fn new(
1429 page: &'b T,
1430 fixed_key_size: Option<usize>,
1431 ) -> core::result::Result<Self, StorageError> {
1432 let mem = page.memory();
1433 if mem.len() < 8 {
1434 return Err(StorageError::Corrupted(
1435 "Branch page too small for header".into(),
1436 ));
1437 }
1438 debug_assert_eq!(mem[0], BRANCH);
1439 let num_keys = u16::from_le_bytes(
1440 mem[2..4]
1441 .try_into()
1442 .map_err(|_| StorageError::Corrupted("Branch page: bad num_keys".into()))?,
1443 ) as usize;
1444 let num_children = num_keys + 1;
1445 let child_table_size =
1446 (PageNumber::serialized_size() + size_of::<Checksum>()) * num_children;
1447 let mut min_size = 8usize.saturating_add(child_table_size);
1448 if fixed_key_size.is_none() {
1449 min_size = min_size.saturating_add(size_of::<u32>().saturating_mul(num_keys));
1450 }
1451 if min_size > mem.len() {
1452 return Err(StorageError::Corrupted(format!(
1453 "Branch page: num_keys={num_keys} requires {min_size}B for headers, page is {}B",
1454 mem.len()
1455 )));
1456 }
1457 Ok(BranchAccessor {
1458 page,
1459 num_keys,
1460 fixed_key_size,
1461 _page_lifetime: Default::default(),
1462 })
1463 }
1464
1465 #[cfg(feature = "std")]
1466 pub(super) fn print_node<K: Key>(&self) {
1467 eprint!(
1468 "Internal[ (page={:?}), child_0={:?}",
1469 self.page.get_page_number(),
1470 self.child_page(0).unwrap()
1471 );
1472 for i in 0..(self.count_children() - 1) {
1473 if let Some(child) = self.child_page(i + 1) {
1474 let key = self.key(i).unwrap();
1475 eprint!(" key_{i}={:?}", K::from_bytes(key));
1476 eprint!(" child_{}={child:?}", i + 1);
1477 }
1478 }
1479 eprint!("]");
1480 }
1481
1482 pub(crate) fn total_length(&self) -> usize {
1483 self.key_end(self.num_keys() - 1).unwrap()
1485 }
1486
1487 pub(super) fn child_for_key<K: Key>(&self, query: &[u8]) -> (usize, PageNumber) {
1488 let mut min_child = 0; let mut max_child = self.num_keys(); while min_child < max_child {
1491 let mid = min_child.midpoint(max_child);
1492 match K::compare(query, self.key(mid).unwrap()) {
1493 Ordering::Less => {
1494 max_child = mid;
1495 }
1496 Ordering::Equal => {
1497 return (mid, self.child_page(mid).unwrap());
1498 }
1499 Ordering::Greater => {
1500 min_child = mid + 1;
1501 }
1502 }
1503 }
1504 debug_assert_eq!(min_child, max_child);
1505
1506 (min_child, self.child_page(min_child).unwrap())
1507 }
1508
1509 fn key_section_start(&self) -> usize {
1510 if self.fixed_key_size.is_none() {
1511 8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
1512 + size_of::<u32>() * self.num_keys()
1513 } else {
1514 8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
1515 }
1516 }
1517
1518 fn key_offset(&self, n: usize) -> usize {
1519 if n == 0 {
1520 self.key_section_start()
1521 } else {
1522 self.key_end(n - 1).unwrap()
1523 }
1524 }
1525
1526 fn key_end(&self, n: usize) -> Option<usize> {
1527 if let Some(fixed) = self.fixed_key_size {
1528 return Some(self.key_section_start() + fixed * (n + 1));
1529 }
1530 let offset = 8
1531 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
1532 + size_of::<u32>() * n;
1533 Some(u32::from_le_bytes(
1534 self.page
1535 .memory()
1536 .get(offset..(offset + size_of::<u32>()))?
1537 .try_into()
1538 .unwrap(),
1539 ) as usize)
1540 }
1541
1542 pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
1543 if n >= self.num_keys() {
1544 return None;
1545 }
1546 let offset = self.key_offset(n);
1547 let end = self.key_end(n)?;
1548 Some(&self.page.memory()[offset..end])
1549 }
1550
1551 pub(crate) fn count_children(&self) -> usize {
1552 self.num_keys() + 1
1553 }
1554
1555 pub(crate) fn child_checksum(&self, n: usize) -> Option<Checksum> {
1556 if n >= self.count_children() {
1557 return None;
1558 }
1559
1560 let offset = 8 + size_of::<Checksum>() * n;
1561 Some(Checksum::from_le_bytes(
1562 self.page.memory()[offset..(offset + size_of::<Checksum>())]
1563 .try_into()
1564 .unwrap(),
1565 ))
1566 }
1567
1568 pub(crate) fn child_page(&self, n: usize) -> Option<PageNumber> {
1569 if n >= self.count_children() {
1570 return None;
1571 }
1572
1573 let offset =
1574 8 + size_of::<Checksum>() * self.count_children() + PageNumber::serialized_size() * n;
1575 Some(PageNumber::from_le_bytes(
1576 self.page.memory()[offset..(offset + PageNumber::serialized_size())]
1577 .try_into()
1578 .unwrap(),
1579 ))
1580 }
1581
1582 fn num_keys(&self) -> usize {
1583 self.num_keys
1584 }
1585}
1586
/// Accumulates children and keys, then serializes them into branch page(s).
pub(super) struct BranchBuilder<'a, 'b> {
    // (page number, checksum) per child; must end up as keys.len() + 1 entries
    children: Vec<(PageNumber, Checksum)>,
    keys: Vec<&'a [u8]>,
    // Running total of key bytes, used to size the output page(s)
    total_key_bytes: usize,
    fixed_key_size: Option<usize>,
    mem: &'b TransactionalMemory,
    allocated_pages: &'b Mutex<PageTrackerPolicy>,
}
1595
1596impl<'a, 'b> BranchBuilder<'a, 'b> {
1597 pub(super) fn new(
1598 mem: &'b TransactionalMemory,
1599 allocated_pages: &'b Mutex<PageTrackerPolicy>,
1600 child_capacity: usize,
1601 fixed_key_size: Option<usize>,
1602 ) -> Self {
1603 Self {
1604 children: Vec::with_capacity(child_capacity),
1605 keys: Vec::with_capacity(child_capacity - 1),
1606 total_key_bytes: 0,
1607 fixed_key_size,
1608 mem,
1609 allocated_pages,
1610 }
1611 }
1612
1613 pub(super) fn replace_child(&mut self, index: usize, child: PageNumber, checksum: Checksum) {
1614 self.children[index] = (child, checksum);
1615 }
1616
1617 pub(super) fn push_child(&mut self, child: PageNumber, checksum: Checksum) {
1618 self.children.push((child, checksum));
1619 }
1620
1621 pub(super) fn push_key(&mut self, key: &'a [u8]) {
1622 self.keys.push(key);
1623 self.total_key_bytes += key.len();
1624 }
1625
1626 pub(super) fn push_all<T: Page>(&mut self, accessor: &'a BranchAccessor<'_, '_, T>) {
1627 for i in 0..accessor.count_children() {
1628 let child = accessor.child_page(i).unwrap();
1629 let checksum = accessor.child_checksum(i).unwrap();
1630 self.push_child(child, checksum);
1631 }
1632 for i in 0..(accessor.count_children() - 1) {
1633 self.push_key(accessor.key(i).unwrap());
1634 }
1635 }
1636
1637 pub(super) fn to_single_child(&self) -> Option<(PageNumber, Checksum)> {
1638 if self.children.len() > 1 {
1639 None
1640 } else {
1641 Some(self.children[0])
1642 }
1643 }
1644
1645 pub(super) fn build(self) -> Result<PageMut> {
1646 assert_eq!(self.children.len(), self.keys.len() + 1);
1647 let size = RawBranchBuilder::required_bytes(
1648 self.keys.len(),
1649 self.total_key_bytes,
1650 self.fixed_key_size,
1651 );
1652 let mut allocated_pages = self.allocated_pages.lock();
1653 let mut page = self.mem.allocate(size, &mut allocated_pages)?;
1654 let mut builder =
1655 RawBranchBuilder::new(page.memory_mut()?, self.keys.len(), self.fixed_key_size);
1656 builder.write_first_page(self.children[0].0, self.children[0].1);
1657 for i in 1..self.children.len() {
1658 let key = &self.keys[i - 1];
1659 builder.write_nth_key(key.as_ref(), self.children[i].0, self.children[i].1, i - 1);
1660 }
1661 drop(builder);
1662
1663 Ok(page)
1664 }
1665
1666 pub(super) fn should_split(&self) -> bool {
1667 let size = RawBranchBuilder::required_bytes(
1668 self.keys.len(),
1669 self.total_key_bytes,
1670 self.fixed_key_size,
1671 );
1672 size > self.mem.get_page_size() && self.keys.len() >= 3
1673 }
1674
1675 pub(super) fn build_split(self) -> Result<(PageMut, &'a [u8], PageMut)> {
1676 let mut allocated_pages = self.allocated_pages.lock();
1677 assert_eq!(self.children.len(), self.keys.len() + 1);
1678 assert!(self.keys.len() >= 3);
1679 let division = self.keys.len() / 2;
1680 let first_split_key_len: usize = self.keys.iter().take(division).map(|k| k.len()).sum();
1681 let division_key = self.keys[division];
1682 let second_split_key_len = self.total_key_bytes - first_split_key_len - division_key.len();
1683
1684 let size =
1685 RawBranchBuilder::required_bytes(division, first_split_key_len, self.fixed_key_size);
1686 let mut page1 = self.mem.allocate(size, &mut allocated_pages)?;
1687 let mut builder = RawBranchBuilder::new(page1.memory_mut()?, division, self.fixed_key_size);
1688 builder.write_first_page(self.children[0].0, self.children[0].1);
1689 for i in 0..division {
1690 let key = &self.keys[i];
1691 builder.write_nth_key(
1692 key.as_ref(),
1693 self.children[i + 1].0,
1694 self.children[i + 1].1,
1695 i,
1696 );
1697 }
1698 drop(builder);
1699
1700 let size = RawBranchBuilder::required_bytes(
1701 self.keys.len() - division - 1,
1702 second_split_key_len,
1703 self.fixed_key_size,
1704 );
1705 let mut page2 = self.mem.allocate(size, &mut allocated_pages)?;
1706 let mut builder = RawBranchBuilder::new(
1707 page2.memory_mut()?,
1708 self.keys.len() - division - 1,
1709 self.fixed_key_size,
1710 );
1711 builder.write_first_page(self.children[division + 1].0, self.children[division + 1].1);
1712 for i in (division + 1)..self.keys.len() {
1713 let key = &self.keys[i];
1714 builder.write_nth_key(
1715 key.as_ref(),
1716 self.children[i + 1].0,
1717 self.children[i + 1].1,
1718 i - division - 1,
1719 );
1720 }
1721 drop(builder);
1722
1723 Ok((page1, division_key, page2))
1724 }
1725}
1726
/// Low-level writer that serializes a branch node directly into a byte buffer.
pub(super) struct RawBranchBuilder<'b> {
    page: &'b mut [u8],
    // Some(n) when every key is exactly n bytes (no end-offset table written)
    fixed_key_size: Option<usize>,
    num_keys: usize,
    // Keys must be written in order 0..num_keys; Drop asserts all were written
    keys_written: usize,
}
1748
impl<'b> RawBranchBuilder<'b> {
    /// Total bytes needed to serialize a branch with `num_keys` keys whose
    /// combined length is `size_of_keys`.
    ///
    /// Layout: 8-byte header, one checksum + one page number per child
    /// (num_keys + 1 children), one u32 key end offset per key when keys are
    /// variable width, then the key bytes themselves.
    pub(super) fn required_bytes(
        num_keys: usize,
        size_of_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> usize {
        if fixed_key_size.is_none() {
            let fixed_size = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            size_of_keys + fixed_size
        } else {
            let fixed_size =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            size_of_keys + fixed_size
        }
    }

    /// Initializes the header (type tag at byte 0, num_keys at bytes 2..4)
    /// and wraps `page` for writing. Panics if `num_keys` is 0.
    pub(super) fn new(page: &'b mut [u8], num_keys: usize, fixed_key_size: Option<usize>) -> Self {
        assert!(num_keys > 0);
        page[0] = BRANCH;
        page[2..4].copy_from_slice(&u16::try_from(num_keys).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison the page-number and key-offset tables with 0xFF so that
            // reading a slot that was never written is obvious in debug builds.
            let start = 8 + size_of::<Checksum>() * (num_keys + 1);
            let mut last =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_keys;
            }
            for x in &mut page[start..last] {
                *x = 0xFF;
            }
        }
        RawBranchBuilder {
            page,
            fixed_key_size,
            num_keys,
            keys_written: 0,
        }
    }

    /// Writes child 0's checksum and page number into their table slots.
    pub(super) fn write_first_page(&mut self, page_number: PageNumber, checksum: Checksum) {
        // Checksums are stored first, directly after the 8-byte header.
        let offset = 8;
        self.page[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        // Page numbers follow the full checksum table.
        let offset = 8 + size_of::<Checksum>() * (self.num_keys + 1);
        self.page[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }

    /// Byte offset where the key data region begins.
    fn key_section_start(&self) -> usize {
        let mut offset =
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1);
        if self.fixed_key_size.is_none() {
            // Variable-width keys: skip past the u32 end-offset table too.
            offset += size_of::<u32>() * self.num_keys;
        }

        offset
    }

    /// End offset of key `n`. For variable-width keys this reads back the
    /// offset table, so key `n` must already have been written.
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
            + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Writes key `n` plus the checksum and page number of child `n + 1`.
    /// Keys must be written strictly in order (asserted), since each key's
    /// position depends on the previous key's end offset.
    pub(super) fn write_nth_key(
        &mut self,
        key: &[u8],
        page_number: PageNumber,
        checksum: Checksum,
        n: usize,
    ) {
        assert!(n < self.num_keys);
        assert_eq!(n, self.keys_written);
        self.keys_written += 1;
        // Child n+1's checksum slot.
        let offset = 8 + size_of::<Checksum>() * (n + 1);
        self.page[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        // Child n+1's page-number slot.
        let offset = 8
            + size_of::<Checksum>() * (self.num_keys + 1)
            + PageNumber::serialized_size() * (n + 1);
        self.page[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());

        // The key bytes start where the previous key ended.
        let data_offset = if n > 0 {
            self.key_end(n - 1)
        } else {
            self.key_section_start()
        };
        if self.fixed_key_size.is_none() {
            // Record this key's end in the offset table.
            let offset = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
                + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(data_offset + key.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }

        // Key data must land after all header tables.
        debug_assert!(data_offset > offset);
        self.page[data_offset..(data_offset + key.len())].copy_from_slice(key);
    }
}
1867
1868impl Drop for RawBranchBuilder<'_> {
1869 fn drop(&mut self) {
1870 let is_panicking = {
1871 #[cfg(feature = "std")]
1872 {
1873 std::thread::panicking()
1874 }
1875 #[cfg(not(feature = "std"))]
1876 {
1877 false
1878 }
1879 };
1880 if !is_panicking {
1881 assert_eq!(self.keys_written, self.num_keys);
1882 }
1883 }
1884}
1885
/// Mutable view over a serialized branch page, used to rewrite child page
/// numbers and checksums in place without changing the node's keys.
pub(crate) struct BranchMutator<'b> {
    page: &'b mut [u8],
}
1889
1890impl<'b> BranchMutator<'b> {
1891 pub(crate) fn new(page: &'b mut [u8]) -> Self {
1892 assert_eq!(page[0], BRANCH);
1893 Self { page }
1894 }
1895
1896 fn num_keys(&self) -> usize {
1897 u16::from_le_bytes(self.page[2..4].try_into().unwrap()) as usize
1898 }
1899
1900 pub(crate) fn write_child_page(
1901 &mut self,
1902 i: usize,
1903 page_number: PageNumber,
1904 checksum: Checksum,
1905 ) {
1906 debug_assert!(i <= self.num_keys());
1907 let offset = 8 + size_of::<Checksum>() * i;
1908 self.page[offset..(offset + size_of::<Checksum>())]
1909 .copy_from_slice(&checksum.to_le_bytes());
1910 let offset =
1911 8 + size_of::<Checksum>() * (self.num_keys() + 1) + PageNumber::serialized_size() * i;
1912 self.page[offset..(offset + PageNumber::serialized_size())]
1913 .copy_from_slice(&page_number.to_le_bytes());
1914 }
1915}