1use crate::tree_store::page_store::{
2 xxh3_checksum, CachePriority, Page, PageImpl, PageMut, TransactionalMemory,
3};
4use crate::tree_store::PageNumber;
5use crate::types::{MutInPlaceValue, RedbKey, RedbValue};
6use crate::{Result, StorageError};
7use std::cmp::Ordering;
8use std::marker::PhantomData;
9use std::mem::size_of;
10use std::ops::Range;
11use std::{mem, thread};
12
// Page-type tag stored in the first byte of a leaf page.
pub(crate) const LEAF: u8 = 1;
// Page-type tag stored in the first byte of a branch page.
pub(crate) const BRANCH: u8 = 2;

// 128-bit page checksum (computed with xxh3_checksum).
pub(crate) type Checksum = u128;
// Sentinel checksum meaning "not computed yet".
pub(crate) const DEFERRED: Checksum = 999;
19
20pub(super) fn leaf_checksum<T: Page>(
21 page: &T,
22 fixed_key_size: Option<usize>,
23 fixed_value_size: Option<usize>,
24) -> Result<Checksum, StorageError> {
25 let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
26 let end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
27 if end > page.memory().len() {
28 Err(StorageError::Corrupted(format!(
29 "Leaf page {:?} corrupted. Last offset {} beyond end of data {}",
30 page.get_page_number(),
31 end,
32 page.memory().len()
33 )))
34 } else {
35 Ok(xxh3_checksum(&page.memory()[..end]))
36 }
37}
38
39pub(super) fn branch_checksum<T: Page>(
40 page: &T,
41 fixed_key_size: Option<usize>,
42) -> Result<Checksum, StorageError> {
43 let accessor = BranchAccessor::new(page, fixed_key_size);
44 let end = accessor.key_end(accessor.num_keys() - 1);
45 if end > page.memory().len() {
46 Err(StorageError::Corrupted(format!(
47 "Branch page {:?} corrupted. Last offset {} beyond end of data {}",
48 page.get_page_number(),
49 end,
50 page.memory().len()
51 )))
52 } else {
53 Ok(xxh3_checksum(&page.memory()[..end]))
54 }
55}
56
/// Cleanup action an `AccessGuard` performs when it is dropped.
enum OnDrop {
    /// Nothing to do on drop.
    None,
    /// Free the given page back to the `TransactionalMemory`.
    Free(PageNumber),
    /// Remove the entry at `position` from the (mutable) leaf page.
    RemoveEntry {
        position: usize,
        fixed_key_size: Option<usize>,
    },
}
65
/// Backing storage for an `AccessGuard`: a borrowed immutable page, a
/// borrowed mutable page, or an owned copy of the value bytes.
enum EitherPage<'a> {
    Immutable(PageImpl<'a>),
    Mutable(PageMut<'a>),
    OwnedMemory(Vec<u8>),
}
71
72impl<'a> EitherPage<'a> {
73 fn memory(&self) -> &[u8] {
74 match self {
75 EitherPage::Immutable(page) => page.memory(),
76 EitherPage::Mutable(page) => page.memory(),
77 EitherPage::OwnedMemory(mem) => mem.as_slice(),
78 }
79 }
80}
81
/// Read guard over a single value stored in a page. Depending on `on_drop`,
/// dropping the guard may free the page or remove the entry it refers to.
pub struct AccessGuard<'a, V: RedbValue> {
    page: EitherPage<'a>,
    // Byte offset of the value within the page's memory.
    offset: usize,
    // Length of the value in bytes.
    len: usize,
    // Cleanup action performed in `Drop`.
    on_drop: OnDrop,
    // Always `Some` when `on_drop` is `Free` (see `Drop` impl, which unwraps it).
    mem: Option<&'a TransactionalMemory>,
    _value_type: PhantomData<V>,
}
90
91impl<'a, V: RedbValue> AccessGuard<'a, V> {
92 pub(super) fn new(
93 page: PageImpl<'a>,
94 offset: usize,
95 len: usize,
96 free_on_drop: bool,
97 mem: &'a TransactionalMemory,
98 ) -> Self {
99 let page_number = page.get_page_number();
100 Self {
101 page: EitherPage::Immutable(page),
102 offset,
103 len,
104 on_drop: if free_on_drop {
105 OnDrop::Free(page_number)
106 } else {
107 OnDrop::None
108 },
109 mem: Some(mem),
110 _value_type: Default::default(),
111 }
112 }
113
114 pub(crate) fn with_page(page: PageImpl<'a>, range: Range<usize>) -> Self {
115 Self {
116 page: EitherPage::Immutable(page),
117 offset: range.start,
118 len: range.len(),
119 on_drop: OnDrop::None,
120 mem: None,
121 _value_type: Default::default(),
122 }
123 }
124
125 pub(crate) fn with_owned_value(value: Vec<u8>) -> Self {
126 let len = value.len();
127 Self {
128 page: EitherPage::OwnedMemory(value),
129 offset: 0,
130 len,
131 on_drop: OnDrop::None,
132 mem: None,
133 _value_type: Default::default(),
134 }
135 }
136
137 pub(super) fn remove_on_drop(
138 page: PageMut<'a>,
139 offset: usize,
140 len: usize,
141 position: usize,
142 fixed_key_size: Option<usize>,
143 mem: &'a TransactionalMemory,
144 ) -> Self {
145 Self {
146 page: EitherPage::Mutable(page),
147 offset,
148 len,
149 on_drop: OnDrop::RemoveEntry {
150 position,
151 fixed_key_size,
152 },
153 mem: Some(mem),
154 _value_type: Default::default(),
155 }
156 }
157
158 pub fn value(&self) -> V::SelfType<'_> {
159 V::from_bytes(&self.page.memory()[self.offset..(self.offset + self.len)])
160 }
161}
162
impl<'a, V: RedbValue> Drop for AccessGuard<'a, V> {
    /// Performs the configured `on_drop` action: nothing, free the page,
    /// or remove the guarded entry from its leaf.
    fn drop(&mut self) {
        match self.on_drop {
            OnDrop::None => {}
            OnDrop::Free(page_number) => {
                // Swap the page out for an empty owned buffer so the page
                // handle is dropped before the page is freed.
                let mut dummy = EitherPage::OwnedMemory(vec![]);
                mem::swap(&mut self.page, &mut dummy);
                drop(dummy);
                // `mem` is always Some when `on_drop` is `Free` (see `new`).
                self.mem.unwrap().free(page_number);
            }
            OnDrop::RemoveEntry {
                position,
                fixed_key_size,
            } => {
                if let EitherPage::Mutable(ref mut mut_page) = self.page {
                    let mut mutator = LeafMutator::new(mut_page, fixed_key_size, V::fixed_width());
                    mutator.remove(position);
                } else if !thread::panicking() {
                    // `RemoveEntry` is only ever constructed with a Mutable
                    // page (see `remove_on_drop`); skip during unwinding to
                    // avoid a double panic.
                    unreachable!();
                }
            }
        }
    }
}
188
/// Mutable guard over a single value's bytes within a writable page.
pub struct AccessGuardMut<'a, V: RedbValue> {
    page: PageMut<'a>,
    // Byte offset of the value within the page's memory.
    offset: usize,
    // Length of the value in bytes.
    len: usize,
    _value_type: PhantomData<V>,
}
195
196impl<'a, V: RedbValue> AccessGuardMut<'a, V> {
197 pub(crate) fn new(page: PageMut<'a>, offset: usize, len: usize) -> Self {
198 AccessGuardMut {
199 page,
200 offset,
201 len,
202 _value_type: Default::default(),
203 }
204 }
205}
206
207impl<'a, V: MutInPlaceValue> AsMut<V::BaseRefType> for AccessGuardMut<'a, V> {
208 fn as_mut(&mut self) -> &mut V::BaseRefType {
209 V::from_bytes_mut(&mut self.page.memory_mut()[self.offset..(self.offset + self.len)])
210 }
211}
212
/// Borrowed view of one key/value pair in a leaf page.
pub struct EntryAccessor<'a> {
    key: &'a [u8],
    value: &'a [u8],
}
218
impl<'a> EntryAccessor<'a> {
    /// Wraps the given key and value byte slices.
    fn new(key: &'a [u8], value: &'a [u8]) -> Self {
        EntryAccessor { key, value }
    }
}
224
impl<'a: 'b, 'b> EntryAccessor<'a> {
    /// Raw bytes of the entry's key.
    pub(crate) fn key(&'b self) -> &'a [u8] {
        self.key
    }

    /// Raw bytes of the entry's value.
    pub(crate) fn value(&'b self) -> &'a [u8] {
        self.value
    }
}
234
/// Read-only accessor for the serialized leaf-page format.
///
/// Layout (as implemented by the offset methods below):
///   byte 0:     LEAF tag
///   bytes 2..4: number of pairs (little-endian u16)
/// followed by, when keys are variable-sized, `num_pairs` u32 key-end
/// offsets; when values are variable-sized, `num_pairs` u32 value-end
/// offsets; then all key bytes, then all value bytes.
pub(crate) struct LeafAccessor<'a> {
    page: &'a [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Cached copy of the pair count from the header.
    num_pairs: usize,
}
242
impl<'a> LeafAccessor<'a> {
    /// Wraps `page` bytes; reads the pair count from the header.
    pub(crate) fn new(
        page: &'a [u8],
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        debug_assert_eq!(page[0], LEAF);
        // Pair count lives at bytes 2..4, little-endian u16.
        let num_pairs = u16::from_le_bytes(page[2..4].try_into().unwrap()) as usize;
        LeafAccessor {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
        }
    }

    /// Debug helper: prints every key (and optionally value) to stderr.
    pub(super) fn print_node<K: RedbKey, V: RedbValue>(&self, include_value: bool) {
        let mut i = 0;
        while let Some(entry) = self.entry(i) {
            eprint!(" key_{}={:?}", i, K::from_bytes(entry.key()));
            if include_value {
                eprint!(" value_{}={:?}", i, V::from_bytes(entry.value()));
            }
            i += 1;
        }
    }

    /// Binary-searches for `query`. Returns `(index, true)` on an exact
    /// match, or `(insertion_index, false)` when the key is absent.
    pub(crate) fn position<K: RedbKey>(&self, query: &[u8]) -> (usize, bool) {
        let mut min_entry = 0;
        let mut max_entry = self.num_pairs();
        while min_entry < max_entry {
            let mid = (min_entry + max_entry) / 2;
            let key = self.key_unchecked(mid);
            match K::compare(query, key) {
                Ordering::Less => {
                    max_entry = mid;
                }
                Ordering::Equal => {
                    return (mid, true);
                }
                Ordering::Greater => {
                    min_entry = mid + 1;
                }
            }
        }
        debug_assert_eq!(min_entry, max_entry);
        (min_entry, false)
    }

    /// Index of `query` in this leaf, or `None` if it is not present.
    pub(crate) fn find_key<K: RedbKey>(&self, query: &[u8]) -> Option<usize> {
        let (entry, found) = self.position::<K>(query);
        if found {
            Some(entry)
        } else {
            None
        }
    }

    /// Offset where key bytes begin: the 4-byte header plus one u32 offset
    /// table per variable-sized dimension (keys and/or values).
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    /// Start offset of the nth key, or `None` if out of range.
    fn key_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            Some(self.key_section_start())
        } else {
            self.key_end(n - 1)
        }
    }

    /// End offset (exclusive) of the nth key, or `None` if out of range.
    fn key_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_key_size {
                return Some(self.key_section_start() + fixed * (n + 1));
            }
            // Variable-sized keys: read the stored end offset from the table.
            let offset = 4 + size_of::<u32>() * n;
            let end = u32::from_le_bytes(
                self.page[offset..(offset + size_of::<u32>())]
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    /// Start offset of the nth value; value bytes begin where the last key
    /// ends, or `None` if out of range.
    fn value_start(&self, n: usize) -> Option<usize> {
        if n == 0 {
            self.key_end(self.num_pairs() - 1)
        } else {
            self.value_end(n - 1)
        }
    }

    /// End offset (exclusive) of the nth value, or `None` if out of range.
    fn value_end(&self, n: usize) -> Option<usize> {
        if n >= self.num_pairs() {
            None
        } else {
            if let Some(fixed) = self.fixed_value_size {
                return Some(self.key_end(self.num_pairs - 1).unwrap() + fixed * (n + 1));
            }
            // Variable-sized values: their offset table follows the key
            // offset table (when the latter exists).
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            let end = u32::from_le_bytes(
                self.page[offset..(offset + size_of::<u32>())]
                    .try_into()
                    .unwrap(),
            ) as usize;
            Some(end)
        }
    }

    /// Number of key/value pairs stored in this leaf.
    pub(crate) fn num_pairs(&self) -> usize {
        self.num_pairs
    }

    /// Offset of the first value's bytes within the page.
    pub(super) fn offset_of_first_value(&self) -> usize {
        self.offset_of_value(0).unwrap()
    }

    /// Offset of the nth value's bytes, or `None` if out of range.
    pub(super) fn offset_of_value(&self, n: usize) -> Option<usize> {
        self.value_start(n)
    }

    /// (start, end) byte range of the nth value, or `None` if out of range.
    pub(super) fn value_range(&self, n: usize) -> Option<(usize, usize)> {
        Some((self.value_start(n)?, self.value_end(n)?))
    }

    /// Total key + value bytes of the pairs in `start..end`.
    pub(crate) fn length_of_pairs(&self, start: usize, end: usize) -> usize {
        self.length_of_values(start, end) + self.length_of_keys(start, end)
    }

    /// Total value bytes of the pairs in `start..end`.
    fn length_of_values(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.value_end(end - 1).unwrap();
        let start_offset = self.value_start(start).unwrap();
        end_offset - start_offset
    }

    /// Total key bytes of the pairs in `start..end`.
    pub(crate) fn length_of_keys(&self, start: usize, end: usize) -> usize {
        if end == 0 {
            return 0;
        }
        let end_offset = self.key_end(end - 1).unwrap();
        let start_offset = self.key_start(start).unwrap();
        end_offset - start_offset
    }

    /// Number of meaningful bytes in the page: everything up to the end of
    /// the last value. Panics if the leaf has zero pairs.
    pub(crate) fn total_length(&self) -> usize {
        self.value_end(self.num_pairs() - 1).unwrap()
    }

    /// Key bytes of the nth pair; panics if `n` is out of range.
    fn key_unchecked(&self, n: usize) -> &[u8] {
        &self.page[self.key_start(n).unwrap()..self.key_end(n).unwrap()]
    }

    /// Borrowed view of the nth pair, or `None` if out of range.
    pub(crate) fn entry(&self, n: usize) -> Option<EntryAccessor<'a>> {
        let key = &self.page[self.key_start(n)?..self.key_end(n)?];
        let value = &self.page[self.value_start(n)?..self.value_end(n)?];
        Some(EntryAccessor::new(key, value))
    }

    /// Byte ranges of the nth pair's key and value, or `None` if out of range.
    pub(crate) fn entry_ranges(&self, n: usize) -> Option<(Range<usize>, Range<usize>)> {
        let key = self.key_start(n)?..self.key_end(n)?;
        let value = self.value_start(n)?..self.value_end(n)?;
        Some((key, value))
    }

    /// The last pair in the leaf; panics if the leaf has zero pairs.
    pub(super) fn last_entry(&self) -> EntryAccessor<'a> {
        self.entry(self.num_pairs() - 1).unwrap()
    }
}
433
/// Accumulates key/value pairs and serializes them into one (or, on split,
/// two) newly allocated leaf pages.
pub(super) struct LeafBuilder<'a, 'b> {
    pairs: Vec<(&'a [u8], &'a [u8])>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    // Running byte totals, used to size the destination page(s).
    total_key_bytes: usize,
    total_value_bytes: usize,
    mem: &'b TransactionalMemory,
}
442
impl<'a, 'b> LeafBuilder<'a, 'b> {
    /// Bytes needed to serialize `num_pairs` pairs totaling
    /// `keys_values_bytes` of key+value data with this builder's settings.
    pub(super) fn required_bytes(&self, num_pairs: usize, keys_values_bytes: usize) -> usize {
        RawLeafBuilder::required_bytes(
            num_pairs,
            keys_values_bytes,
            self.fixed_key_size,
            self.fixed_value_size,
        )
    }

    /// Creates an empty builder with room reserved for `capacity` pairs.
    pub(super) fn new(
        mem: &'b TransactionalMemory,
        capacity: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        Self {
            pairs: Vec::with_capacity(capacity),
            fixed_key_size,
            fixed_value_size,
            total_key_bytes: 0,
            total_value_bytes: 0,
            mem,
        }
    }

    /// Appends one pair and updates the byte totals.
    /// NOTE(review): pairs appear to be expected in key order (the builder
    /// never sorts) — confirm against callers.
    pub(super) fn push(&mut self, key: &'a [u8], value: &'a [u8]) {
        self.total_key_bytes += key.len();
        self.total_value_bytes += value.len();
        self.pairs.push((key, value))
    }

    /// Copies every pair from `accessor`, optionally skipping the pair at
    /// index `except`.
    pub(super) fn push_all_except(
        &mut self,
        accessor: &'a LeafAccessor<'_>,
        except: Option<usize>,
    ) {
        for i in 0..accessor.num_pairs() {
            if let Some(except) = except {
                if except == i {
                    continue;
                }
            }
            let entry = accessor.entry(i).unwrap();
            self.push(entry.key(), entry.value());
        }
    }

    /// True when the pairs don't fit in a single page and there are at
    /// least two pairs (so a split is possible).
    pub(super) fn should_split(&self) -> bool {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        required_size > self.mem.get_page_size() && self.pairs.len() > 1
    }

    /// Serializes the pairs into two new pages, split roughly evenly by byte
    /// size. Returns (first page, greatest key in the first page, second page).
    pub(super) fn build_split(self) -> Result<(PageMut<'b>, &'a [u8], PageMut<'b>)> {
        let total_size = self.total_key_bytes + self.total_value_bytes;
        // Walk pairs until the first half reaches ~half the total bytes;
        // never take the last pair, so the second page is non-empty.
        let mut division = 0;
        let mut first_split_key_bytes = 0;
        let mut first_split_value_bytes = 0;
        for (key, value) in self.pairs.iter().take(self.pairs.len() - 1) {
            first_split_key_bytes += key.len();
            first_split_value_bytes += value.len();
            division += 1;
            if first_split_key_bytes + first_split_value_bytes >= total_size / 2 {
                break;
            }
        }

        // First page: pairs[..division].
        let required_size =
            self.required_bytes(division, first_split_key_bytes + first_split_value_bytes);
        let mut page1 = self.mem.allocate(required_size, CachePriority::Low)?;
        let mut builder = RawLeafBuilder::new(
            page1.memory_mut(),
            division,
            self.fixed_key_size,
            self.fixed_value_size,
            first_split_key_bytes,
        );
        for (key, value) in self.pairs.iter().take(division) {
            builder.append(key, value);
        }
        drop(builder);

        // Second page: pairs[division..].
        let required_size = self.required_bytes(
            self.pairs.len() - division,
            self.total_key_bytes + self.total_value_bytes
                - first_split_key_bytes
                - first_split_value_bytes,
        );
        let mut page2 = self.mem.allocate(required_size, CachePriority::Low)?;
        let mut builder = RawLeafBuilder::new(
            page2.memory_mut(),
            self.pairs.len() - division,
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes - first_split_key_bytes,
        );
        for (key, value) in self.pairs[division..].iter() {
            builder.append(key, value);
        }
        drop(builder);

        Ok((page1, self.pairs[division - 1].0, page2))
    }

    /// Serializes all pairs into a single newly allocated page.
    pub(super) fn build(self) -> Result<PageMut<'b>> {
        let required_size = self.required_bytes(
            self.pairs.len(),
            self.total_key_bytes + self.total_value_bytes,
        );
        let mut page = self.mem.allocate(required_size, CachePriority::Low)?;
        let mut builder = RawLeafBuilder::new(
            page.memory_mut(),
            self.pairs.len(),
            self.fixed_key_size,
            self.fixed_value_size,
            self.total_key_bytes,
        );
        for (key, value) in self.pairs {
            builder.append(key, value);
        }
        drop(builder);
        Ok(page)
    }
}
570
/// Low-level writer that serializes pairs into a preallocated page buffer.
/// Pairs must be appended in order; the `Drop` impl asserts that exactly
/// `num_pairs` pairs were written and the key area was filled exactly.
pub(crate) struct RawLeafBuilder<'a> {
    page: &'a mut [u8],
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
    num_pairs: usize,
    // Total key bytes the caller promised; fixes where value bytes start.
    provisioned_key_bytes: usize,
    pairs_written: usize,
}
593
impl<'a> RawLeafBuilder<'a> {
    /// Bytes needed for `num_pairs` pairs with `keys_values_bytes` of data:
    /// a 4-byte header plus one u32 offset table per variable-sized
    /// dimension, plus the key/value bytes themselves.
    pub(crate) fn required_bytes(
        num_pairs: usize,
        keys_values_bytes: usize,
        key_size: Option<usize>,
        value_size: Option<usize>,
    ) -> usize {
        // 4-byte header: tag byte + pair count (byte 1 unused).
        let mut result = 4;
        if key_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        if value_size.is_none() {
            result += num_pairs * size_of::<u32>();
        }
        result += keys_values_bytes;

        result
    }

    /// Initializes the page header for `num_pairs` pairs whose keys will
    /// total `key_bytes`. Pairs are then written with `append`.
    pub(crate) fn new(
        page: &'a mut [u8],
        num_pairs: usize,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        key_bytes: usize,
    ) -> Self {
        page[0] = LEAF;
        page[2..4].copy_from_slice(&u16::try_from(num_pairs).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison the offset tables so that reading a slot that was never
            // written is detectable in debug builds.
            let mut last = 4;
            if fixed_key_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            if fixed_value_size.is_none() {
                last += size_of::<u32>() * num_pairs;
            }
            for x in &mut page[4..last] {
                *x = 0xFF;
            }
        }
        RawLeafBuilder {
            page,
            fixed_key_size,
            fixed_value_size,
            num_pairs,
            provisioned_key_bytes: key_bytes,
            pairs_written: 0,
        }
    }

    /// End offset (exclusive) of the nth value. For variable-sized values
    /// this reads the offset table, so pair `n` must already be written.
    fn value_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_value_size {
            return self.key_section_start() + self.provisioned_key_bytes + fixed * (n + 1);
        }
        let mut offset = 4 + size_of::<u32>() * n;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Offset where key bytes begin (after the header and offset tables).
    fn key_section_start(&self) -> usize {
        let mut offset = 4;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }
        if self.fixed_value_size.is_none() {
            offset += size_of::<u32>() * self.num_pairs;
        }

        offset
    }

    /// End offset (exclusive) of the nth key. For variable-sized keys this
    /// reads the offset table, so pair `n` must already be written.
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 4 + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Writes the next pair: records its end offsets (when the dimension is
    /// variable-sized) and copies the key and value bytes into place.
    pub(crate) fn append(&mut self, key: &[u8], value: &[u8]) {
        // Fixed-size dimensions must match the declared width exactly.
        if let Some(key_width) = self.fixed_key_size {
            assert_eq!(key_width, key.len());
        }
        if let Some(value_width) = self.fixed_value_size {
            assert_eq!(value_width, value.len());
        }
        // Keys are packed contiguously from the key section start; values
        // are packed after the provisioned key area.
        let key_offset = if self.pairs_written == 0 {
            self.key_section_start()
        } else {
            self.key_end(self.pairs_written - 1)
        };
        let value_offset = if self.pairs_written == 0 {
            self.key_section_start() + self.provisioned_key_bytes
        } else {
            self.value_end(self.pairs_written - 1)
        };

        let n = self.pairs_written;
        if self.fixed_key_size.is_none() {
            // Record this key's end offset in the key offset table.
            let offset = 4 + size_of::<u32>() * n;
            self.page[offset..(offset + size_of::<u32>())]
                .copy_from_slice(&u32::try_from(key_offset + key.len()).unwrap().to_le_bytes());
        }
        self.page[key_offset..(key_offset + key.len())].copy_from_slice(key);
        let written_key_len = key_offset + key.len() - self.key_section_start();
        // Keys must not overflow the area the caller provisioned for them.
        assert!(written_key_len <= self.provisioned_key_bytes);

        if self.fixed_value_size.is_none() {
            // Record this value's end offset in the value offset table,
            // which follows the key offset table (when present).
            let mut offset = 4 + size_of::<u32>() * n;
            if self.fixed_key_size.is_none() {
                offset += size_of::<u32>() * self.num_pairs;
            }
            self.page[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(value_offset + value.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }
        self.page[value_offset..(value_offset + value.len())].copy_from_slice(value);
        self.pairs_written += 1;
    }
}
730
impl<'a> Drop for RawLeafBuilder<'a> {
    /// Verifies the builder was used to completion. Skipped during
    /// unwinding to avoid a double panic.
    fn drop(&mut self) {
        if !thread::panicking() {
            // Every promised pair must have been appended...
            assert_eq!(self.pairs_written, self.num_pairs);
            // ...and the keys must exactly fill the provisioned key area.
            assert_eq!(
                self.key_section_start() + self.provisioned_key_bytes,
                self.key_end(self.num_pairs - 1)
            );
        }
    }
}
742
/// In-place editor for an existing leaf page: supports inserting,
/// overwriting, and removing single entries without rebuilding the page.
pub(crate) struct LeafMutator<'a: 'b, 'b> {
    page: &'b mut PageMut<'a>,
    fixed_key_size: Option<usize>,
    fixed_value_size: Option<usize>,
}
748
impl<'a: 'b, 'b> LeafMutator<'a, 'b> {
    /// Wraps a mutable leaf page; asserts the page really is a leaf.
    pub(crate) fn new(
        page: &'b mut PageMut<'a>,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
    ) -> Self {
        assert_eq!(page.memory_mut()[0], LEAF);
        Self {
            page,
            fixed_key_size,
            fixed_value_size,
        }
    }

    /// Returns true if `new_key`/`new_value` can be inserted at `position`
    /// (or overwrite the pair there) without growing the page.
    pub(super) fn sufficient_insert_inplace_space(
        page: &'_ PageImpl<'_>,
        position: usize,
        overwrite: bool,
        fixed_key_size: Option<usize>,
        fixed_value_size: Option<usize>,
        new_key: &[u8],
        new_value: &[u8],
    ) -> bool {
        let accessor = LeafAccessor::new(page.memory(), fixed_key_size, fixed_value_size);
        if overwrite {
            // Overwrite: only the size difference of the pair matters.
            let remaining = page.memory().len() - accessor.total_length();
            let required_delta = isize::try_from(new_key.len() + new_value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(position, position + 1)).unwrap();
            required_delta <= isize::try_from(remaining).unwrap()
        } else {
            // On pages of order > 0, only appends past the last pair are
            // permitted in place; interior inserts are rejected.
            if page.get_page_number().page_order > 0 && position < accessor.num_pairs() {
                return false;
            }
            let remaining = page.memory().len() - accessor.total_length();
            // A new pair costs its bytes plus one u32 offset-table slot per
            // variable-sized dimension.
            let mut required_delta = new_key.len() + new_value.len();
            if fixed_key_size.is_none() {
                required_delta += size_of::<u32>();
            }
            if fixed_value_size.is_none() {
                required_delta += size_of::<u32>();
            }
            required_delta <= remaining
        }
    }

    /// Inserts a pair at index `i` (shifting later pairs right), or
    /// overwrites the existing pair at `i` when `overwrite` is true.
    /// The page must already have enough free space
    /// (see `sufficient_insert_inplace_space`).
    pub(crate) fn insert(&mut self, i: usize, overwrite: bool, key: &[u8], value: &[u8]) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        // Net change in used bytes, including offset-table growth on insert.
        let required_delta = if overwrite {
            isize::try_from(key.len() + value.len()).unwrap()
                - isize::try_from(accessor.length_of_pairs(i, i + 1)).unwrap()
        } else {
            let mut delta = key.len() + value.len();
            if self.fixed_key_size.is_none() {
                delta += size_of::<u32>();
            }
            if self.fixed_value_size.is_none() {
                delta += size_of::<u32>();
            }
            delta.try_into().unwrap()
        };
        assert!(
            isize::try_from(accessor.total_length()).unwrap() + required_delta
                <= isize::try_from(self.page.memory().len()).unwrap()
        );

        let num_pairs = accessor.num_pairs();
        let last_key_end = accessor.key_end(accessor.num_pairs() - 1).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
        // Index of the first pair whose bytes must move to make room.
        let shift_index = if overwrite { i + 1 } else { i };
        let shift_key_start = accessor.key_start(shift_index).unwrap_or(last_key_end);
        let shift_value_start = accessor.value_start(shift_index).unwrap_or(last_value_end);
        let existing_value_len = accessor
            .value_range(i)
            .map(|(start, end)| end - start)
            .unwrap_or_default();
        drop(accessor);

        let value_delta = if overwrite {
            isize::try_from(value.len()).unwrap() - isize::try_from(existing_value_len).unwrap()
        } else {
            value.len().try_into().unwrap()
        };

        // Size of one offset-table slot per dimension (0 when fixed-size).
        let key_ptr_size: usize = if self.fixed_key_size.is_none() { 4 } else { 0 };
        let value_ptr_size: usize = if self.fixed_value_size.is_none() {
            4
        } else {
            0
        };
        if !overwrite {
            // Pairs before `i`: their bytes shift right by the offset-table
            // growth (values additionally by the new key's bytes).
            for j in 0..i {
                self.update_key_end(j, (key_ptr_size + value_ptr_size).try_into().unwrap());
                let value_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_value_end(j, value_delta);
            }
        }
        // Pairs at/after `i`: account for the inserted key and value bytes.
        for j in i..num_pairs {
            if overwrite {
                self.update_value_end(j, value_delta);
            } else {
                let key_delta: isize = (key_ptr_size + value_ptr_size + key.len())
                    .try_into()
                    .unwrap();
                self.update_key_end(j, key_delta);
                let value_delta = key_delta + isize::try_from(value.len()).unwrap();
                self.update_value_end(j, value_delta);
            }
        }

        let new_num_pairs = if overwrite { num_pairs } else { num_pairs + 1 };
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());

        // Shift trailing value bytes right to their final position, then
        // write the new value immediately before them.
        let mut dest = if overwrite {
            (isize::try_from(shift_value_start).unwrap() + value_delta)
                .try_into()
                .unwrap()
        } else {
            shift_value_start + key_ptr_size + value_ptr_size + key.len() + value.len()
        };
        let start = shift_value_start;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);

        let inserted_value_end: u32 = dest.try_into().unwrap();
        dest -= value.len();
        self.page.memory_mut()[dest..(dest + value.len())].copy_from_slice(value);

        if !overwrite {
            // Shift keys at/after `i`, then write the new key before them.
            let start = shift_key_start;
            let end = shift_value_start;
            dest -= end - start;
            self.page.memory_mut().copy_within(start..end, dest);

            let inserted_key_end: u32 = dest.try_into().unwrap();
            dest -= key.len();
            self.page.memory_mut()[dest..(dest + key.len())].copy_from_slice(key);

            // Shift value offset-table entries at/after `i`, then write the
            // new value's end offset.
            let start = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            let end = shift_key_start;
            dest -= end - start;
            debug_assert_eq!(
                dest,
                4 + key_ptr_size * new_num_pairs + value_ptr_size * (i + 1)
            );
            self.page.memory_mut().copy_within(start..end, dest);

            if self.fixed_value_size.is_none() {
                dest -= size_of::<u32>();
                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_value_end.to_le_bytes());
            }

            // Shift key offset-table entries at/after `i`, then write the
            // new key's end offset.
            let start = 4 + key_ptr_size * i;
            let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
            dest -= end - start;
            debug_assert_eq!(dest, 4 + key_ptr_size * (i + 1));
            self.page.memory_mut().copy_within(start..end, dest);

            if self.fixed_key_size.is_none() {
                dest -= size_of::<u32>();
                self.page.memory_mut()[dest..(dest + size_of::<u32>())]
                    .copy_from_slice(&inserted_key_end.to_le_bytes());
            }
            debug_assert_eq!(dest, 4 + key_ptr_size * i);
        }
    }

    /// Removes the pair at index `i`, compacting the page in place.
    /// Panics if `i` is out of range or the page holds only one pair.
    pub(super) fn remove(&mut self, i: usize) {
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let num_pairs = accessor.num_pairs();
        assert!(i < num_pairs);
        // Removal must not leave the leaf empty.
        assert!(num_pairs > 1);
        let key_start = accessor.key_start(i).unwrap();
        let key_end = accessor.key_end(i).unwrap();
        let value_start = accessor.value_start(i).unwrap();
        let value_end = accessor.value_end(i).unwrap();
        let last_value_end = accessor.value_end(accessor.num_pairs() - 1).unwrap();
        drop(accessor);

        // Size of one offset-table slot per dimension (0 when fixed-size).
        let key_ptr_size = if self.fixed_key_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        let value_ptr_size = if self.fixed_value_size.is_none() {
            size_of::<u32>()
        } else {
            0
        };
        // Pairs before `i` shift left by the offset-table shrinkage (values
        // additionally by the removed key's bytes).
        for j in 0..i {
            self.update_key_end(j, -isize::try_from(key_ptr_size + value_ptr_size).unwrap());
            let value_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_value_end(j, value_delta);
        }
        // Pairs after `i` also shift left by the removed key/value bytes.
        for j in (i + 1)..num_pairs {
            let key_delta = -isize::try_from(key_ptr_size + value_ptr_size).unwrap()
                - isize::try_from(key_end - key_start).unwrap();
            self.update_key_end(j, key_delta);
            let value_delta = key_delta - isize::try_from(value_end - value_start).unwrap();
            self.update_value_end(j, value_delta);
        }

        let new_num_pairs = num_pairs - 1;
        self.page.memory_mut()[2..4]
            .copy_from_slice(&u16::try_from(new_num_pairs).unwrap().to_le_bytes());
        // Compact front-to-back: key offsets after `i`, then value offsets
        // after `i`, then key bytes around the removed key, then value
        // bytes around the removed value.
        let mut dest = 4 + key_ptr_size * i;
        let start = 4 + key_ptr_size * (i + 1);
        let end = 4 + key_ptr_size * num_pairs + value_ptr_size * i;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;
        debug_assert_eq!(dest, 4 + key_ptr_size * new_num_pairs + value_ptr_size * i);

        let start = 4 + key_ptr_size * num_pairs + value_ptr_size * (i + 1);
        let end = key_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        let preceding_key_len = key_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_key_len
        );

        let start = key_end;
        let end = value_start;
        self.page.memory_mut().copy_within(start..end, dest);
        dest += end - start;

        let preceding_data_len =
            value_start - (4 + (key_ptr_size + value_ptr_size) * num_pairs) - (key_end - key_start);
        debug_assert_eq!(
            dest,
            4 + (key_ptr_size + value_ptr_size) * new_num_pairs + preceding_data_len
        );
        let start = value_end;
        let end = last_value_end;
        self.page.memory_mut().copy_within(start..end, dest);
    }

    /// Adds `delta` to the stored end offset of key `i`; no-op when keys
    /// are fixed-size (there is no offset table to update).
    fn update_key_end(&mut self, i: usize, delta: isize) {
        if self.fixed_key_size.is_some() {
            return;
        }
        let offset = 4 + size_of::<u32>() * i;
        let mut ptr = u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        );
        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
            .copy_from_slice(&ptr.to_le_bytes());
    }

    /// Adds `delta` to the stored end offset of value `i`; no-op when
    /// values are fixed-size.
    fn update_value_end(&mut self, i: usize, delta: isize) {
        if self.fixed_value_size.is_some() {
            return;
        }
        let accessor = LeafAccessor::new(
            self.page.memory(),
            self.fixed_key_size,
            self.fixed_value_size,
        );
        let num_pairs = accessor.num_pairs();
        drop(accessor);
        // The value offset table follows the key offset table (if present).
        let mut offset = 4 + size_of::<u32>() * i;
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * num_pairs;
        }
        let mut ptr = u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        );
        ptr = (isize::try_from(ptr).unwrap() + delta).try_into().unwrap();
        self.page.memory_mut()[offset..(offset + size_of::<u32>())]
            .copy_from_slice(&ptr.to_le_bytes());
    }
}
1063
/// Read-only accessor for the serialized branch-page format.
///
/// Layout (as implemented by the offset methods below):
///   byte 0:     BRANCH tag
///   bytes 2..4: number of keys (little-endian u16)
///   byte 8:     `num_keys + 1` child checksums, then `num_keys + 1` child
///               page numbers, then (when keys are variable-sized)
///               `num_keys` u32 key-end offsets, then the key bytes.
pub(crate) struct BranchAccessor<'a: 'b, 'b, T: Page + 'a> {
    page: &'b T,
    // Cached copy of the key count from the header.
    num_keys: usize,
    fixed_key_size: Option<usize>,
    _page_lifetime: PhantomData<&'a ()>,
}
1072
1073impl<'a: 'b, 'b, T: Page + 'a> BranchAccessor<'a, 'b, T> {
1074 pub(crate) fn new(page: &'b T, fixed_key_size: Option<usize>) -> Self {
1075 debug_assert_eq!(page.memory()[0], BRANCH);
1076 let num_keys = u16::from_le_bytes(page.memory()[2..4].try_into().unwrap()) as usize;
1077 BranchAccessor {
1078 page,
1079 num_keys,
1080 fixed_key_size,
1081 _page_lifetime: Default::default(),
1082 }
1083 }
1084
1085 pub(super) fn print_node<K: RedbKey>(&self) {
1086 eprint!(
1087 "Internal[ (page={:?}), child_0={:?}",
1088 self.page.get_page_number(),
1089 self.child_page(0).unwrap()
1090 );
1091 for i in 0..(self.count_children() - 1) {
1092 if let Some(child) = self.child_page(i + 1) {
1093 let key = self.key(i).unwrap();
1094 eprint!(" key_{}={:?}", i, K::from_bytes(key));
1095 eprint!(" child_{}={:?}", i + 1, child);
1096 }
1097 }
1098 eprint!("]");
1099 }
1100
1101 pub(crate) fn total_length(&self) -> usize {
1102 self.key_end(self.num_keys() - 1)
1104 }
1105
1106 pub(super) fn child_for_key<K: RedbKey>(&self, query: &[u8]) -> (usize, PageNumber) {
1107 let mut min_child = 0; let mut max_child = self.num_keys(); while min_child < max_child {
1110 let mid = (min_child + max_child) / 2;
1111 match K::compare(query, self.key(mid).unwrap()) {
1112 Ordering::Less => {
1113 max_child = mid;
1114 }
1115 Ordering::Equal => {
1116 return (mid, self.child_page(mid).unwrap());
1117 }
1118 Ordering::Greater => {
1119 min_child = mid + 1;
1120 }
1121 }
1122 }
1123 debug_assert_eq!(min_child, max_child);
1124
1125 (min_child, self.child_page(min_child).unwrap())
1126 }
1127
1128 fn key_section_start(&self) -> usize {
1129 if self.fixed_key_size.is_none() {
1130 8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
1131 + size_of::<u32>() * self.num_keys()
1132 } else {
1133 8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
1134 }
1135 }
1136
1137 fn key_offset(&self, n: usize) -> usize {
1138 if n == 0 {
1139 self.key_section_start()
1140 } else {
1141 self.key_end(n - 1)
1142 }
1143 }
1144
1145 fn key_end(&self, n: usize) -> usize {
1146 if let Some(fixed) = self.fixed_key_size {
1147 return self.key_section_start() + fixed * (n + 1);
1148 }
1149 let offset = 8
1150 + (PageNumber::serialized_size() + size_of::<Checksum>()) * self.count_children()
1151 + size_of::<u32>() * n;
1152 u32::from_le_bytes(
1153 self.page.memory()[offset..(offset + size_of::<u32>())]
1154 .try_into()
1155 .unwrap(),
1156 ) as usize
1157 }
1158
1159 pub(super) fn key(&self, n: usize) -> Option<&[u8]> {
1160 if n >= self.num_keys() {
1161 return None;
1162 }
1163 let offset = self.key_offset(n);
1164 let end = self.key_end(n);
1165 Some(&self.page.memory()[offset..end])
1166 }
1167
1168 pub(crate) fn count_children(&self) -> usize {
1169 self.num_keys() + 1
1170 }
1171
1172 pub(super) fn child_checksum(&self, n: usize) -> Option<Checksum> {
1173 if n >= self.count_children() {
1174 return None;
1175 }
1176
1177 let offset = 8 + size_of::<Checksum>() * n;
1178 Some(Checksum::from_le_bytes(
1179 self.page.memory()[offset..(offset + size_of::<Checksum>())]
1180 .try_into()
1181 .unwrap(),
1182 ))
1183 }
1184
1185 pub(crate) fn child_page(&self, n: usize) -> Option<PageNumber> {
1186 if n >= self.count_children() {
1187 return None;
1188 }
1189
1190 let offset =
1191 8 + size_of::<Checksum>() * self.count_children() + PageNumber::serialized_size() * n;
1192 Some(PageNumber::from_le_bytes(
1193 self.page.memory()[offset..(offset + PageNumber::serialized_size())]
1194 .try_into()
1195 .unwrap(),
1196 ))
1197 }
1198
    // Number of keys in this branch page (stored in the accessor's field).
    fn num_keys(&self) -> usize {
        self.num_keys
    }
1202}
1203
/// Incrementally collects child pointers and separator keys, then serializes
/// them into one branch page (or two, when splitting).
pub(super) struct BranchBuilder<'a, 'b> {
    // Child page numbers paired with their checksums; holds one more entry than `keys`
    children: Vec<(PageNumber, Checksum)>,
    // Separator keys between adjacent children
    keys: Vec<&'a [u8]>,
    // Running total of bytes across all pushed keys, used for page sizing
    total_key_bytes: usize,
    // Some(len) for fixed-width keys; None when keys are variable width and
    // need a per-key end-offset table
    fixed_key_size: Option<usize>,
    mem: &'b TransactionalMemory,
}
1211
impl<'a, 'b> BranchBuilder<'a, 'b> {
    /// Creates a builder with capacity for `child_capacity` children
    /// (and therefore `child_capacity - 1` separator keys).
    pub(super) fn new(
        mem: &'b TransactionalMemory,
        child_capacity: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        Self {
            children: Vec::with_capacity(child_capacity),
            keys: Vec::with_capacity(child_capacity - 1),
            total_key_bytes: 0,
            fixed_key_size,
            mem,
        }
    }

    /// Overwrites the child (and its checksum) at `index`.
    pub(super) fn replace_child(&mut self, index: usize, child: PageNumber, checksum: Checksum) {
        self.children[index] = (child, checksum);
    }

    /// Appends a child page number and its checksum.
    pub(super) fn push_child(&mut self, child: PageNumber, checksum: Checksum) {
        self.children.push((child, checksum));
    }

    /// Appends a separator key, tracking its length for page sizing.
    pub(super) fn push_key(&mut self, key: &'a [u8]) {
        self.keys.push(key);
        self.total_key_bytes += key.len();
    }

    /// Copies every child and separator key from an existing branch page.
    pub(super) fn push_all<T: Page>(&mut self, accessor: &'a BranchAccessor<'_, '_, T>) {
        for i in 0..accessor.count_children() {
            let child = accessor.child_page(i).unwrap();
            let checksum = accessor.child_checksum(i).unwrap();
            self.push_child(child, checksum);
        }
        // A branch with N children has N - 1 separator keys
        for i in 0..(accessor.count_children() - 1) {
            self.push_key(accessor.key(i).unwrap());
        }
    }

    /// Returns the single (child, checksum) pair when at most one child was
    /// pushed; `None` when there are multiple children.
    ///
    /// NOTE(review): indexes `children[0]`, so this panics if no children were
    /// pushed — callers appear to guarantee at least one child; confirm.
    pub(super) fn to_single_child(&self) -> Option<(PageNumber, Checksum)> {
        if self.children.len() > 1 {
            None
        } else {
            Some(self.children[0])
        }
    }

    /// Serializes the accumulated children and keys into a newly allocated page.
    pub(super) fn build(self) -> Result<PageMut<'b>> {
        // Branch invariant: exactly one more child than keys
        assert_eq!(self.children.len(), self.keys.len() + 1);
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        let mut page = self.mem.allocate(size, CachePriority::High)?;
        let mut builder = RawBranchBuilder::new(&mut page, self.keys.len(), self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        // Key i - 1 separates child i - 1 from child i
        for i in 1..self.children.len() {
            let key = &self.keys[i - 1];
            builder.write_nth_key(key.as_ref(), self.children[i].0, self.children[i].1, i - 1);
        }
        // The builder's Drop asserts that every key was written
        drop(builder);

        Ok(page)
    }

    /// True when the serialized page would exceed the page size and there are
    /// enough keys (>= 3) to split into two valid branch pages.
    pub(super) fn should_split(&self) -> bool {
        let size = RawBranchBuilder::required_bytes(
            self.keys.len(),
            self.total_key_bytes,
            self.fixed_key_size,
        );
        size > self.mem.get_page_size() && self.keys.len() >= 3
    }

    /// Splits the accumulated entries into two pages around the median key.
    ///
    /// Returns `(first_page, division_key, second_page)`. The division key is
    /// stored in neither page; it becomes the separator in the parent branch.
    pub(super) fn build_split(self) -> Result<(PageMut<'b>, &'a [u8], PageMut<'b>)> {
        assert_eq!(self.children.len(), self.keys.len() + 1);
        assert!(self.keys.len() >= 3);
        let division = self.keys.len() / 2;
        let first_split_key_len: usize = self.keys.iter().take(division).map(|k| k.len()).sum();
        let division_key = self.keys[division];
        let second_split_key_len = self.total_key_bytes - first_split_key_len - division_key.len();

        // First page: children[0..=division] with keys[0..division]
        let size =
            RawBranchBuilder::required_bytes(division, first_split_key_len, self.fixed_key_size);
        let mut page1 = self.mem.allocate(size, CachePriority::High)?;
        let mut builder = RawBranchBuilder::new(&mut page1, division, self.fixed_key_size);
        builder.write_first_page(self.children[0].0, self.children[0].1);
        for i in 0..division {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i,
            );
        }
        drop(builder);

        // Second page: children[division + 1..] with keys[division + 1..]
        let size = RawBranchBuilder::required_bytes(
            self.keys.len() - division - 1,
            second_split_key_len,
            self.fixed_key_size,
        );
        let mut page2 = self.mem.allocate(size, CachePriority::High)?;
        let mut builder = RawBranchBuilder::new(
            &mut page2,
            self.keys.len() - division - 1,
            self.fixed_key_size,
        );
        builder.write_first_page(self.children[division + 1].0, self.children[division + 1].1);
        for i in (division + 1)..self.keys.len() {
            let key = &self.keys[i];
            builder.write_nth_key(
                key.as_ref(),
                self.children[i + 1].0,
                self.children[i + 1].1,
                i - division - 1,
            );
        }
        drop(builder);

        Ok((page1, division_key, page2))
    }
}
1337
/// Low-level writer that serializes a branch page directly into page memory.
pub(super) struct RawBranchBuilder<'a: 'b, 'b> {
    page: &'b mut PageMut<'a>,
    // Some(len) for fixed-width keys; None when a u32 end offset is stored per key
    fixed_key_size: Option<usize>,
    // Total number of keys this page will hold; fixed at construction
    num_keys: usize,
    // Keys written so far; Drop asserts this reaches num_keys
    keys_written: usize, }
1359
impl<'a: 'b, 'b> RawBranchBuilder<'a, 'b> {
    /// Number of bytes needed to serialize a branch page holding `num_keys`
    /// keys totalling `size_of_keys` bytes.
    ///
    /// Layout: 8-byte header, then a (checksum + page number) pair for each of
    /// the num_keys + 1 children, then — for variable-width keys only — a u32
    /// end offset per key, then the key bytes themselves.
    pub(super) fn required_bytes(
        num_keys: usize,
        size_of_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> usize {
        if fixed_key_size.is_none() {
            // Variable-width keys need the u32 end-offset table
            let fixed_size = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            size_of_keys + fixed_size
        } else {
            let fixed_size =
                8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1);
            size_of_keys + fixed_size
        }
    }

    /// Initializes `page` as a branch page expecting exactly `num_keys` keys.
    ///
    /// Writes the page-type tag and key count into the header. In debug builds
    /// the page-number and key-offset regions are poisoned with 0xFF so a
    /// skipped write is detectable.
    pub(super) fn new(
        page: &'b mut PageMut<'a>,
        num_keys: usize,
        fixed_key_size: Option<usize>,
    ) -> Self {
        assert!(num_keys > 0);
        page.memory_mut()[0] = BRANCH;
        // Key count lives at header bytes 2..4, little endian
        page.memory_mut()[2..4].copy_from_slice(&u16::try_from(num_keys).unwrap().to_le_bytes());
        #[cfg(debug_assertions)]
        {
            // Poison from the end of the checksum array through the end of the
            // page-number array plus a u32 per key.
            // NOTE(review): the u32-per-key term is applied even for fixed-width
            // keys (which store no offset table), so the poison extends into the
            // key-data region; assumes the allocation is at least that large —
            // confirm for fixed key sizes smaller than 4 bytes.
            let start = 8 + size_of::<Checksum>() * (num_keys + 1);
            let last = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (num_keys + 1)
                + size_of::<u32>() * num_keys;
            for x in &mut page.memory_mut()[start..last] {
                *x = 0xFF;
            }
        }
        RawBranchBuilder {
            page,
            fixed_key_size,
            num_keys,
            keys_written: 0,
        }
    }

    /// Writes the leftmost child's checksum and page number (slot 0).
    pub(super) fn write_first_page(&mut self, page_number: PageNumber, checksum: Checksum) {
        // Checksum slot 0 sits immediately after the 8-byte header
        let offset = 8;
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        // Page-number slot 0 follows the checksum array
        let offset = 8 + size_of::<Checksum>() * (self.num_keys + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());
    }

    /// Byte offset where serialized key data begins.
    fn key_section_start(&self) -> usize {
        let mut offset =
            8 + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1);
        // Variable-width keys additionally store a u32 end offset per key
        if self.fixed_key_size.is_none() {
            offset += size_of::<u32>() * self.num_keys;
        }

        offset
    }

    /// Byte offset one past the end of the nth key.
    ///
    /// For variable-width keys this reads back the u32 end offset recorded by
    /// a prior write_nth_key() call, so it is only valid for keys already written.
    fn key_end(&self, n: usize) -> usize {
        if let Some(fixed) = self.fixed_key_size {
            return self.key_section_start() + fixed * (n + 1);
        }
        let offset = 8
            + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
            + size_of::<u32>() * n;
        u32::from_le_bytes(
            self.page.memory()[offset..(offset + size_of::<u32>())]
                .try_into()
                .unwrap(),
        ) as usize
    }

    /// Writes key n together with the checksum and page number of child n + 1.
    ///
    /// Keys must be written strictly in order (`n == keys_written`), because a
    /// variable-width key's start position is derived from the previous key's
    /// stored end offset.
    pub(super) fn write_nth_key(
        &mut self,
        key: &[u8],
        page_number: PageNumber,
        checksum: Checksum,
        n: usize,
    ) {
        assert!(n < self.num_keys);
        assert_eq!(n, self.keys_written);
        self.keys_written += 1;
        // Checksum for child n + 1
        let offset = 8 + size_of::<Checksum>() * (n + 1);
        self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
            .copy_from_slice(&checksum.to_le_bytes());
        // Page number for child n + 1
        let offset = 8
            + size_of::<Checksum>() * (self.num_keys + 1)
            + PageNumber::serialized_size() * (n + 1);
        self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
            .copy_from_slice(&page_number.to_le_bytes());

        // Key n starts where key n - 1 ended (or at the key section start)
        let data_offset = if n > 0 {
            self.key_end(n - 1)
        } else {
            self.key_section_start()
        };
        if self.fixed_key_size.is_none() {
            // Record this key's end position in the u32 offset table
            let offset = 8
                + (PageNumber::serialized_size() + size_of::<Checksum>()) * (self.num_keys + 1)
                + size_of::<u32>() * n;
            self.page.memory_mut()[offset..(offset + size_of::<u32>())].copy_from_slice(
                &u32::try_from(data_offset + key.len())
                    .unwrap()
                    .to_le_bytes(),
            );
        }

        // Key data must lie after the fixed-layout tables
        debug_assert!(data_offset > offset);
        self.page.memory_mut()[data_offset..(data_offset + key.len())].copy_from_slice(key);
    }
}
1480
1481impl<'a: 'b, 'b> Drop for RawBranchBuilder<'a, 'b> {
1482 fn drop(&mut self) {
1483 if !thread::panicking() {
1484 assert_eq!(self.keys_written, self.num_keys);
1485 }
1486 }
1487}
1488
/// In-place editor for the child pointers of an existing branch page.
pub(super) struct BranchMutator<'a: 'b, 'b> {
    page: &'b mut PageMut<'a>,
}
1492
1493impl<'a: 'b, 'b> BranchMutator<'a, 'b> {
1494 pub(super) fn new(page: &'b mut PageMut<'a>) -> Self {
1495 assert_eq!(page.memory()[0], BRANCH);
1496 Self { page }
1497 }
1498
1499 fn num_keys(&self) -> usize {
1500 u16::from_le_bytes(self.page.memory()[2..4].try_into().unwrap()) as usize
1501 }
1502
1503 pub(super) fn write_child_page(
1504 &mut self,
1505 i: usize,
1506 page_number: PageNumber,
1507 checksum: Checksum,
1508 ) {
1509 debug_assert!(i <= self.num_keys());
1510 let offset = 8 + size_of::<Checksum>() * i;
1511 self.page.memory_mut()[offset..(offset + size_of::<Checksum>())]
1512 .copy_from_slice(&checksum.to_le_bytes());
1513 let offset =
1514 8 + size_of::<Checksum>() * (self.num_keys() + 1) + PageNumber::serialized_size() * i;
1515 self.page.memory_mut()[offset..(offset + PageNumber::serialized_size())]
1516 .copy_from_slice(&page_number.to_le_bytes());
1517 }
1518}