1use crate::cdc::types::{CdcEvent, ChangeOp};
2use crate::compat::{HashMap, Mutex};
3#[cfg(feature = "std")]
4use crate::db::CorruptPageInfo;
5use crate::db::TransactionGuard;
6use crate::multimap_table::DynamicCollectionType::{Inline, SubtreeV2};
7use crate::sealed::Sealed;
8use crate::table::{ReadableTableMetadata, TableStats};
9use crate::tree_store::{
10 AllPageNumbersBtreeIter, BRANCH, BranchAccessor, BranchMutator, Btree, BtreeHeader, BtreeMut,
11 BtreeRangeIter, BtreeStats, Checksum, DEFERRED, LEAF, LeafAccessor, LeafMutator,
12 MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page, PageHint, PageNumber, PagePath, PageTrackerPolicy,
13 RawBtree, RawLeafBuilder, TransactionalMemory, UntypedBtree, UntypedBtreeMut, btree_stats,
14};
15use crate::types::{Key, TypeName, Value};
16use crate::{AccessGuard, MultimapTableHandle, Result, StorageError, WriteTransaction};
17use alloc::format;
18use alloc::string::{String, ToString};
19use alloc::sync::Arc;
20use alloc::vec;
21use alloc::vec::Vec;
22use core::borrow::Borrow;
23use core::cmp::max;
24use core::convert::TryInto;
25use core::marker::PhantomData;
26use core::mem;
27use core::mem::size_of;
28use core::ops::{RangeBounds, RangeFull};
29
30pub(crate) fn multimap_btree_stats(
31 root: Option<PageNumber>,
32 mem: &TransactionalMemory,
33 fixed_key_size: Option<usize>,
34 fixed_value_size: Option<usize>,
35) -> Result<BtreeStats> {
36 if let Some(root) = root {
37 multimap_stats_helper(root, mem, fixed_key_size, fixed_value_size)
38 } else {
39 Ok(BtreeStats {
40 tree_height: 0,
41 leaf_pages: 0,
42 branch_pages: 0,
43 stored_leaf_bytes: 0,
44 metadata_bytes: 0,
45 fragmented_bytes: 0,
46 })
47 }
48}
49
50fn multimap_stats_helper(
51 page_number: PageNumber,
52 mem: &TransactionalMemory,
53 fixed_key_size: Option<usize>,
54 fixed_value_size: Option<usize>,
55) -> Result<BtreeStats> {
56 let page = mem.get_page(page_number)?;
57 let node_mem = page.memory();
58 match node_mem[0] {
59 LEAF => {
60 let accessor = LeafAccessor::new(
61 page.memory(),
62 fixed_key_size,
63 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
64 )?;
65 let mut leaf_bytes = 0u64;
66 let mut is_branch = false;
67 for i in 0..accessor.num_pairs() {
68 let entry = accessor
69 .entry(i)
70 .ok_or_else(|| StorageError::invalid_entry_index(page_number, i))?;
71 let collection: &UntypedDynamicCollection =
72 UntypedDynamicCollection::new(entry.value());
73 match collection.collection_type()? {
74 Inline => {
75 let inline_accessor = LeafAccessor::new(
76 collection.as_inline(),
77 fixed_value_size,
78 <() as Value>::fixed_width(),
79 )?;
80 leaf_bytes +=
81 inline_accessor.length_of_pairs(0, inline_accessor.num_pairs()) as u64;
82 }
83 SubtreeV2 => {
84 is_branch = true;
85 }
86 }
87 }
88 let mut overhead_bytes = (accessor.total_length() as u64) - leaf_bytes;
89 let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
90 let mut max_child_height = 0;
91 let (mut leaf_pages, mut branch_pages) = if is_branch { (0, 1) } else { (1, 0) };
92
93 for i in 0..accessor.num_pairs() {
94 let entry = accessor
95 .entry(i)
96 .ok_or_else(|| StorageError::invalid_entry_index(page_number, i))?;
97 let collection: &UntypedDynamicCollection =
98 UntypedDynamicCollection::new(entry.value());
99 match collection.collection_type()? {
100 Inline => {
101 }
103 SubtreeV2 => {
104 let stats = btree_stats(
106 Some(collection.as_subtree().root),
107 mem,
108 fixed_value_size,
109 <() as Value>::fixed_width(),
110 )?;
111 max_child_height = max(max_child_height, stats.tree_height);
112 branch_pages += stats.branch_pages;
113 leaf_pages += stats.leaf_pages;
114 fragmented_bytes += stats.fragmented_bytes;
115 overhead_bytes += stats.metadata_bytes;
116 leaf_bytes += stats.stored_leaf_bytes;
117 }
118 }
119 }
120
121 Ok(BtreeStats {
122 tree_height: max_child_height + 1,
123 leaf_pages,
124 branch_pages,
125 stored_leaf_bytes: leaf_bytes,
126 metadata_bytes: overhead_bytes,
127 fragmented_bytes,
128 })
129 }
130 BRANCH => {
131 let accessor = BranchAccessor::new(&page, fixed_key_size)?;
132 let mut max_child_height = 0;
133 let mut leaf_pages = 0;
134 let mut branch_pages = 1;
135 let mut stored_leaf_bytes = 0;
136 let mut metadata_bytes = accessor.total_length() as u64;
137 let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
138 for i in 0..accessor.count_children() {
139 if let Some(child) = accessor.child_page(i) {
140 let stats =
141 multimap_stats_helper(child, mem, fixed_key_size, fixed_value_size)?;
142 max_child_height = max(max_child_height, stats.tree_height);
143 leaf_pages += stats.leaf_pages;
144 branch_pages += stats.branch_pages;
145 stored_leaf_bytes += stats.stored_leaf_bytes;
146 metadata_bytes += stats.metadata_bytes;
147 fragmented_bytes += stats.fragmented_bytes;
148 }
149 }
150
151 Ok(BtreeStats {
152 tree_height: max_child_height + 1,
153 leaf_pages,
154 branch_pages,
155 stored_leaf_bytes,
156 metadata_bytes,
157 fragmented_bytes,
158 })
159 }
160 other => Err(StorageError::invalid_page_type(page_number, other)),
161 }
162}
163
164pub(crate) fn verify_tree_and_subtree_checksums(
166 root: Option<BtreeHeader>,
167 key_size: Option<usize>,
168 value_size: Option<usize>,
169 mem: Arc<TransactionalMemory>,
170) -> Result<bool> {
171 if let Some(header) = root {
172 if !RawBtree::new(
173 Some(header),
174 key_size,
175 DynamicCollection::<()>::fixed_width_with(value_size),
176 mem.clone(),
177 )
178 .verify_checksum()?
179 {
180 return Ok(false);
181 }
182
183 let table_pages_iter = AllPageNumbersBtreeIter::new(
184 header.root,
185 key_size,
186 DynamicCollection::<()>::fixed_width_with(value_size),
187 mem.clone(),
188 )?;
189 for table_page in table_pages_iter {
190 let page = mem.get_page(table_page?)?;
191 let subtree_roots = parse_subtree_roots(&page, key_size, value_size)?;
192 for header in subtree_roots {
193 if !RawBtree::new(Some(header), value_size, <()>::fixed_width(), mem.clone())
194 .verify_checksum()?
195 {
196 return Ok(false);
197 }
198 }
199 }
200 }
201
202 Ok(true)
203}
204
/// Like `verify_tree_and_subtree_checksums`, but rather than stopping at the
/// first mismatch it collects every reported corruption.
///
/// Returns the sum of the page counts reported by `verify_checksum_detailed`
/// for the top-level tree and each subtree, together with all
/// [`CorruptPageInfo`] records gathered along the way. With no root the
/// result is `(0, vec![])`.
///
/// # Errors
/// Propagates errors from reading pages, iterating the tree, or parsing leaf
/// entries.
#[cfg(feature = "std")]
pub(crate) fn verify_tree_and_subtree_checksums_detailed(
    root: Option<BtreeHeader>,
    key_size: Option<usize>,
    value_size: Option<usize>,
    mem: Arc<TransactionalMemory>,
) -> Result<(u64, Vec<CorruptPageInfo>)> {
    let mut page_total = 0u64;
    let mut found = Vec::new();

    let Some(header) = root else {
        return Ok((page_total, found));
    };
    let collection_width = DynamicCollection::<()>::fixed_width_with(value_size);

    // Top-level tree (key -> collection).
    let outer = RawBtree::new(Some(header), key_size, collection_width, mem.clone());
    let (outer_pages, outer_corruptions) = outer.verify_checksum_detailed()?;
    page_total += outer_pages;
    found.extend(outer_corruptions);

    // Every subtree referenced from any page of the top-level tree.
    let outer_page_numbers =
        AllPageNumbersBtreeIter::new(header.root, key_size, collection_width, mem.clone())?;
    for page_number in outer_page_numbers {
        let page = mem.get_page(page_number?)?;
        for sub_header in parse_subtree_roots(&page, key_size, value_size)? {
            let subtree =
                RawBtree::new(Some(sub_header), value_size, <()>::fixed_width(), mem.clone());
            let (sub_pages, sub_corruptions) = subtree.verify_checksum_detailed()?;
            page_total += sub_pages;
            found.extend(sub_corruptions);
        }
    }

    Ok((page_total, found))
}
252
/// Runs `verify_structure` on a multimap table's top-level tree and on every
/// subtree referenced from its leaf pages, returning all reported
/// [`CorruptPageInfo`] records.
///
/// With no root the result is an empty vector.
///
/// # Errors
/// Propagates errors from reading pages, iterating the tree, or parsing leaf
/// entries.
#[cfg(feature = "std")]
pub(crate) fn verify_tree_and_subtree_structure(
    root: Option<BtreeHeader>,
    key_size: Option<usize>,
    value_size: Option<usize>,
    mem: Arc<TransactionalMemory>,
) -> Result<Vec<CorruptPageInfo>> {
    let mut found = Vec::new();

    let Some(header) = root else {
        return Ok(found);
    };
    let collection_width = DynamicCollection::<()>::fixed_width_with(value_size);

    // Top-level tree (key -> collection).
    let outer = RawBtree::new(Some(header), key_size, collection_width, mem.clone());
    found.extend(outer.verify_structure()?);

    // Every subtree referenced from any page of the top-level tree.
    let outer_page_numbers =
        AllPageNumbersBtreeIter::new(header.root, key_size, collection_width, mem.clone())?;
    for page_number in outer_page_numbers {
        let page = mem.get_page(page_number?)?;
        for sub_header in parse_subtree_roots(&page, key_size, value_size)? {
            let subtree =
                RawBtree::new(Some(sub_header), value_size, <()>::fixed_width(), mem.clone());
            found.extend(subtree.verify_structure()?);
        }
    }

    Ok(found)
}
295
296pub(crate) fn relocate_subtrees(
298 root: (PageNumber, Checksum),
299 key_size: Option<usize>,
300 value_size: Option<usize>,
301 mem: Arc<TransactionalMemory>,
302 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
303 relocation_map: &HashMap<PageNumber, PageNumber>,
304) -> Result<(PageNumber, Checksum)> {
305 let old_page = mem.get_page(root.0)?;
306 let mut new_page = if let Some(new_page_number) = relocation_map.get(&root.0) {
307 mem.get_page_mut(*new_page_number)?
308 } else {
309 return Ok(root);
310 };
311 let new_page_number = new_page.get_page_number();
312 new_page.memory_mut()?.copy_from_slice(old_page.memory());
313
314 match old_page.memory()[0] {
315 LEAF => {
316 let accessor = LeafAccessor::new(
317 old_page.memory(),
318 key_size,
319 UntypedDynamicCollection::fixed_width_with(value_size),
320 )?;
321 let mut mutator = LeafMutator::new(
323 new_page.memory_mut()?,
324 key_size,
325 UntypedDynamicCollection::fixed_width_with(value_size),
326 );
327 for i in 0..accessor.num_pairs() {
328 let entry = accessor
329 .entry(i)
330 .ok_or_else(|| StorageError::invalid_entry_index(root.0, i))?;
331 let collection = UntypedDynamicCollection::from_bytes(entry.value());
332 if matches!(collection.collection_type()?, SubtreeV2) {
333 let sub_root = collection.as_subtree();
334 let mut tree = UntypedBtreeMut::new(
335 Some(sub_root),
336 mem.clone(),
337 freed_pages.clone(),
338 value_size,
339 <() as Value>::fixed_width(),
340 );
341 tree.relocate(relocation_map)?;
342 let new_root = tree.get_root().ok_or_else(|| {
343 StorageError::Internal(
344 "missing subtree root after relocate in relocate_subtrees".to_string(),
345 )
346 })?;
347 if sub_root != new_root {
348 let new_collection = UntypedDynamicCollection::make_subtree_data(new_root);
349 mutator.insert(i, true, entry.key(), &new_collection);
350 }
351 }
352 }
353 }
354 BRANCH => {
355 let accessor = BranchAccessor::new(&old_page, key_size)?;
356 let mut mutator = BranchMutator::new(new_page.memory_mut()?);
357 for i in 0..accessor.count_children() {
358 if let Some(child) = accessor.child_page(i) {
359 let child_checksum = accessor.child_checksum(i).unwrap();
360 let (new_child, new_checksum) = relocate_subtrees(
361 (child, child_checksum),
362 key_size,
363 value_size,
364 mem.clone(),
365 freed_pages.clone(),
366 relocation_map,
367 )?;
368 mutator.write_child_page(i, new_child, new_checksum);
369 }
370 }
371 }
372 _ => {
373 return Err(StorageError::invalid_page_type(
374 root.0,
375 old_page.memory()[0],
376 ));
377 }
378 }
379
380 let old_page_number = old_page.get_page_number();
381 drop(old_page);
382 let mut ignore = PageTrackerPolicy::Ignore;
385 if !mem.free_if_uncommitted(old_page_number, &mut ignore) {
386 freed_pages.lock().push(old_page_number);
387 }
388 Ok((new_page_number, DEFERRED))
389}
390
391pub(crate) fn finalize_tree_and_subtree_checksums(
394 root: Option<BtreeHeader>,
395 key_size: Option<usize>,
396 value_size: Option<usize>,
397 mem: Arc<TransactionalMemory>,
398) -> Result<Option<BtreeHeader>> {
399 let freed_pages = Arc::new(Mutex::new(vec![]));
400 let mut tree = UntypedBtreeMut::new(
401 root,
402 mem.clone(),
403 freed_pages.clone(),
404 key_size,
405 DynamicCollection::<()>::fixed_width_with(value_size),
406 );
407 tree.dirty_leaf_visitor(|mut leaf_page| {
408 let mut sub_root_updates = vec![];
409 let accessor = LeafAccessor::new(
410 leaf_page.memory(),
411 key_size,
412 DynamicCollection::<()>::fixed_width_with(value_size),
413 )?;
414 for i in 0..accessor.num_pairs() {
415 let entry = accessor
416 .entry(i)
417 .ok_or_else(|| StorageError::invalid_entry_index(leaf_page.get_page_number(), i))?;
418 let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
419 if matches!(collection.collection_type()?, SubtreeV2) {
420 let sub_root = collection.as_subtree();
421 if mem.uncommitted(sub_root.root) {
422 let mut subtree = UntypedBtreeMut::new(
423 Some(sub_root),
424 mem.clone(),
425 freed_pages.clone(),
426 value_size,
427 <()>::fixed_width(),
428 );
429 let subtree_root = subtree.finalize_dirty_checksums()?.unwrap();
430 sub_root_updates.push((i, entry.key().to_vec(), subtree_root));
431 }
432 }
433 }
434 let mut mutator = LeafMutator::new(
436 leaf_page.memory_mut()?,
437 key_size,
438 DynamicCollection::<()>::fixed_width_with(value_size),
439 );
440 for (i, key, sub_root) in sub_root_updates {
441 let collection = DynamicCollection::<()>::make_subtree_data(sub_root);
442 mutator.insert(i, true, &key, &collection);
443 }
444
445 Ok(())
446 })?;
447
448 let root = tree.finalize_dirty_checksums()?;
449 assert!(freed_pages.lock().is_empty());
451 Ok(root)
452}
453
454fn parse_subtree_roots<T: Page>(
455 page: &T,
456 fixed_key_size: Option<usize>,
457 fixed_value_size: Option<usize>,
458) -> crate::Result<Vec<BtreeHeader>> {
459 match page.memory()[0] {
460 BRANCH => Ok(vec![]),
461 LEAF => {
462 let mut result = vec![];
463 let accessor = LeafAccessor::new(
464 page.memory(),
465 fixed_key_size,
466 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
467 )?;
468 for i in 0..accessor.num_pairs() {
469 let entry = accessor
470 .entry(i)
471 .ok_or_else(|| StorageError::invalid_entry_index(page.get_page_number(), i))?;
472 let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
473 if matches!(collection.collection_type()?, SubtreeV2) {
474 result.push(collection.as_subtree());
475 }
476 }
477
478 Ok(result)
479 }
480 _ => Err(StorageError::invalid_page_type(
481 page.get_page_number(),
482 page.memory()[0],
483 )),
484 }
485}
486
/// A multimap B-tree accessed without knowledge of its key/value types; only
/// the optional fixed widths of keys and values are known.
pub(crate) struct UntypedMultiBtree {
    /// Memory manager used to fetch pages.
    mem: Arc<TransactionalMemory>,
    /// Header of the top-level tree, or `None` when there is no root.
    root: Option<BtreeHeader>,
    /// Fixed width of keys in the top-level tree, if any.
    key_width: Option<usize>,
    /// Fixed width of the multimap values (the keys of each subtree), if any.
    value_width: Option<usize>,
}
493
494impl UntypedMultiBtree {
495 pub(crate) fn new(
496 root: Option<BtreeHeader>,
497 mem: Arc<TransactionalMemory>,
498 key_width: Option<usize>,
499 value_width: Option<usize>,
500 ) -> Self {
501 Self {
502 mem,
503 root,
504 key_width,
505 value_width,
506 }
507 }
508
509 pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
511 where
512 F: FnMut(&PagePath) -> Result,
513 {
514 let tree = UntypedBtree::new(
515 self.root,
516 self.mem.clone(),
517 self.key_width,
518 UntypedDynamicCollection::fixed_width_with(self.value_width),
519 );
520 tree.visit_all_pages(|path| {
521 visitor(path)?;
522 let page = self.mem.get_page(path.page_number())?;
523 match page.memory()[0] {
524 LEAF => {
525 for header in parse_subtree_roots(&page, self.key_width, self.value_width)? {
526 let subtree = UntypedBtree::new(
527 Some(header),
528 self.mem.clone(),
529 self.value_width,
530 <() as Value>::fixed_width(),
531 );
532 subtree.visit_all_pages(|subpath| {
533 let full_path = path.with_subpath(subpath);
534 visitor(&full_path)
535 })?;
536 }
537 }
538 BRANCH => {
539 }
541 other => {
542 return Err(StorageError::invalid_page_type(path.page_number(), other));
543 }
544 }
545 Ok(())
546 })?;
547
548 Ok(())
549 }
550}
551
/// Double-ended cursor over the keys of an inline leaf stored inside a
/// [`DynamicCollection`].
pub(crate) struct LeafKeyIter<'a, V: Key + 'static> {
    /// Guard holding the collection whose inline leaf data is iterated.
    inline_collection: AccessGuard<'a, &'static DynamicCollection<V>>,
    /// Fixed key width passed to `LeafAccessor::new`.
    fixed_key_size: Option<usize>,
    /// Fixed value width passed to `LeafAccessor::new`.
    fixed_value_size: Option<usize>,
    /// Index of the next entry returned from the front.
    start_entry: isize,
    /// Index of the next entry returned from the back; the iterator is
    /// exhausted once this is less than `start_entry` (it starts at `-1` for
    /// an empty leaf).
    end_entry: isize,
}
559
560impl<'a, V: Key> LeafKeyIter<'a, V> {
561 fn new(
562 data: AccessGuard<'a, &'static DynamicCollection<V>>,
563 fixed_key_size: Option<usize>,
564 fixed_value_size: Option<usize>,
565 ) -> Result<Self> {
566 let accessor =
567 LeafAccessor::new(data.value().as_inline(), fixed_key_size, fixed_value_size)?;
568 let num_pairs = accessor.num_pairs();
569 let end_entry = if num_pairs == 0 {
570 -1
571 } else {
572 isize::try_from(num_pairs).unwrap_or(isize::MAX) - 1
573 };
574 Ok(Self {
575 inline_collection: data,
576 fixed_key_size,
577 fixed_value_size,
578 start_entry: 0,
579 end_entry,
580 })
581 }
582
583 fn next_key(&mut self) -> Option<Result<&[u8]>> {
584 if self.end_entry < self.start_entry {
585 return None;
586 }
587 let accessor = match LeafAccessor::new(
588 self.inline_collection.value().as_inline(),
589 self.fixed_key_size,
590 self.fixed_value_size,
591 ) {
592 Ok(a) => a,
593 Err(e) => return Some(Err(e)),
594 };
595 self.start_entry += 1;
596 accessor
597 .entry((self.start_entry - 1).try_into().unwrap())
598 .map(|e| Ok(e.key()))
599 }
600
601 fn next_key_back(&mut self) -> Option<Result<&[u8]>> {
602 if self.end_entry < self.start_entry {
603 return None;
604 }
605 let accessor = match LeafAccessor::new(
606 self.inline_collection.value().as_inline(),
607 self.fixed_key_size,
608 self.fixed_value_size,
609 ) {
610 Ok(a) => a,
611 Err(e) => return Some(Err(e)),
612 };
613 self.end_entry -= 1;
614 accessor
615 .entry((self.end_entry + 1).try_into().unwrap())
616 .map(|e| Ok(e.key()))
617 }
618}
619
/// Storage layout of a [`DynamicCollection`], identified by the first byte of
/// its serialized form.
enum DynamicCollectionType {
    /// The values are stored inline as leaf data following the tag byte
    /// (tag value `LEAF`).
    Inline,
    /// The tag byte (value `3`) is followed by a serialized `BtreeHeader`
    /// describing a separate subtree that holds the values.
    SubtreeV2,
}
626
627impl DynamicCollectionType {
628 fn try_from_byte(value: u8) -> crate::Result<Self> {
629 match value {
630 LEAF => Ok(Inline),
631 3 => Ok(SubtreeV2),
633 _ => Err(StorageError::Corrupted(format!(
634 "invalid DynamicCollectionType byte: {value}"
635 ))),
636 }
637 }
638}
639
640#[allow(clippy::from_over_into)]
641impl Into<u8> for DynamicCollectionType {
642 fn into(self) -> u8 {
643 match self {
644 Inline => LEAF,
647 SubtreeV2 => 3,
649 }
650 }
651}
652
/// Serialized set of values belonging to one multimap key, viewed in place
/// over a byte slice.
///
/// Layout of `data`: byte 0 is the [`DynamicCollectionType`] tag. For
/// `Inline`, the remaining bytes are leaf data holding the values as keys.
/// For `SubtreeV2`, the tag is followed by a serialized `BtreeHeader`.
#[repr(transparent)]
struct DynamicCollection<V: Key> {
    /// Ties the collection to its value type without storing one.
    _value_type: PhantomData<V>,
    /// Raw serialized bytes (tag byte followed by payload).
    data: [u8],
}
671
672impl<V: Key> core::fmt::Debug for DynamicCollection<V> {
673 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
674 f.debug_struct("DynamicCollection")
675 .field("data", &&self.data)
676 .finish()
677 }
678}
679
680impl<V: Key> Value for &DynamicCollection<V> {
681 type SelfType<'a>
682 = &'a DynamicCollection<V>
683 where
684 Self: 'a;
685 type AsBytes<'a>
686 = &'a [u8]
687 where
688 Self: 'a;
689
690 fn fixed_width() -> Option<usize> {
691 None
692 }
693
694 fn from_bytes<'a>(data: &'a [u8]) -> &'a DynamicCollection<V>
695 where
696 Self: 'a,
697 {
698 DynamicCollection::new(data)
699 }
700
701 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'a [u8]
702 where
703 Self: 'b,
704 {
705 &value.data
706 }
707
708 fn type_name() -> TypeName {
709 TypeName::internal("redb::DynamicCollection")
710 }
711}
712
713impl<V: Key> DynamicCollection<V> {
714 fn new(data: &[u8]) -> &Self {
715 unsafe { &*(core::ptr::from_ref::<[u8]>(data) as *const DynamicCollection<V>) }
720 }
721
722 fn collection_type(&self) -> crate::Result<DynamicCollectionType> {
723 DynamicCollectionType::try_from_byte(self.data[0])
724 }
725
726 fn as_inline(&self) -> &[u8] {
727 &self.data[1..]
728 }
729
730 fn as_subtree(&self) -> BtreeHeader {
731 BtreeHeader::from_le_bytes(
732 self.data[1..=BtreeHeader::serialized_size()]
733 .try_into()
734 .unwrap(),
735 )
736 }
737
738 fn get_num_values(&self) -> crate::Result<u64> {
739 Ok(match self.collection_type()? {
740 Inline => {
741 let leaf_data = self.as_inline();
742 let accessor =
743 LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width())?;
744 accessor.num_pairs() as u64
745 }
746 SubtreeV2 => {
747 let offset = 1 + PageNumber::serialized_size() + size_of::<Checksum>();
748 u64::from_le_bytes(
749 self.data[offset..(offset + size_of::<u64>())]
750 .try_into()
751 .unwrap(),
752 )
753 }
754 })
755 }
756
757 fn make_inline_data(data: &[u8]) -> Vec<u8> {
758 let mut result = vec![Inline.into()];
759 result.extend_from_slice(data);
760
761 result
762 }
763
764 fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
765 let mut result = vec![SubtreeV2.into()];
766 result.extend_from_slice(&header.to_le_bytes());
767 result
768 }
769
770 pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
771 None
772 }
773}
774
775impl<V: Key> DynamicCollection<V> {
776 fn iter<'a>(
777 collection: AccessGuard<'a, &'static DynamicCollection<V>>,
778 guard: Arc<TransactionGuard>,
779 mem: Arc<TransactionalMemory>,
780 ) -> Result<MultimapValue<'a, V>> {
781 Ok(match collection.value().collection_type()? {
782 Inline => {
783 let leaf_iter =
784 LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width())?;
785 MultimapValue::new_inline(leaf_iter, guard)?
786 }
787 SubtreeV2 => {
788 let root = collection.value().as_subtree().root;
789 MultimapValue::new_subtree(
790 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
791 &(..),
792 Some(root),
793 None,
794 if mem.compression().is_enabled() {
795 None
796 } else {
797 <() as Value>::fixed_width()
798 },
799 mem,
800 false,
801 )?,
802 collection.value().get_num_values()?,
803 guard,
804 )
805 }
806 })
807 }
808
809 fn iter_free_on_drop<'a>(
810 collection: AccessGuard<'a, &'static DynamicCollection<V>>,
811 pages: Vec<PageNumber>,
812 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
813 allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
814 guard: Arc<TransactionGuard>,
815 mem: Arc<TransactionalMemory>,
816 ) -> Result<MultimapValue<'a, V>> {
817 let num_values = collection.value().get_num_values()?;
818 Ok(match collection.value().collection_type()? {
819 Inline => {
820 let leaf_iter =
821 LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width())?;
822 MultimapValue::new_inline(leaf_iter, guard)?
823 }
824 SubtreeV2 => {
825 let root = collection.value().as_subtree().root;
826 let inner = BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
827 &(..),
828 Some(root),
829 None,
830 if mem.compression().is_enabled() {
831 None
832 } else {
833 <() as Value>::fixed_width()
834 },
835 mem.clone(),
836 false,
837 )?;
838 MultimapValue::new_subtree_free_on_drop(
839 inner,
840 num_values,
841 freed_pages,
842 allocated_pages,
843 pages,
844 guard,
845 mem,
846 )
847 }
848 })
849 }
850}
851
/// Untyped counterpart of `DynamicCollection`: the same tag-byte-plus-payload
/// byte layout, viewed in place without a value type parameter.
#[repr(transparent)]
struct UntypedDynamicCollection {
    /// Raw serialized bytes (tag byte followed by payload).
    data: [u8],
}
856
857impl UntypedDynamicCollection {
858 pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
859 None
860 }
861
862 fn new(data: &[u8]) -> &Self {
863 unsafe { &*(core::ptr::from_ref::<[u8]>(data) as *const UntypedDynamicCollection) }
868 }
869
870 fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
871 let mut result = vec![SubtreeV2.into()];
872 result.extend_from_slice(&header.to_le_bytes());
873 result
874 }
875
876 fn from_bytes(data: &[u8]) -> &Self {
877 Self::new(data)
878 }
879
880 fn collection_type(&self) -> crate::Result<DynamicCollectionType> {
881 DynamicCollectionType::try_from_byte(self.data[0])
882 }
883
884 fn as_inline(&self) -> &[u8] {
885 &self.data[1..]
886 }
887
888 fn as_subtree(&self) -> BtreeHeader {
889 BtreeHeader::from_le_bytes(
890 self.data[1..=BtreeHeader::serialized_size()]
891 .try_into()
892 .unwrap(),
893 )
894 }
895}
896
/// Source backing a [`MultimapValue`] iterator.
enum ValueIterState<'a, V: Key + 'static> {
    /// Values are read from a separate subtree; each value is a subtree key.
    Subtree(BtreeRangeIter<V, ()>),
    /// Values are read from leaf data stored inline in the collection.
    InlineLeaf(LeafKeyIter<'a, V>),
}
901
/// Double-ended iterator over the values stored for a single multimap key.
pub struct MultimapValue<'a, V: Key + 'static> {
    /// Underlying value source; taken (set to `None`) and dropped first when
    /// this iterator is dropped.
    inner: Option<ValueIterState<'a, V>>,
    /// Number of values not yet yielded; reported by `len()`.
    remaining: u64,
    /// Destination for pages in `free_on_drop` that cannot be freed
    /// immediately; `None` when nothing is freed on drop.
    freed_pages: Option<Arc<Mutex<Vec<PageNumber>>>>,
    /// Page tracker passed to `free_if_uncommitted` on drop.
    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
    /// Pages to release when this iterator is dropped.
    free_on_drop: Vec<PageNumber>,
    /// Transaction guard held for as long as the iterator exists.
    _transaction_guard: Arc<TransactionGuard>,
    /// Memory manager used to free pages on drop; `None` when nothing is
    /// freed on drop.
    mem: Option<Arc<TransactionalMemory>>,
    _value_type: PhantomData<V>,
}
912
913impl<'a, V: Key + 'static> MultimapValue<'a, V> {
914 fn new_subtree(
915 inner: BtreeRangeIter<V, ()>,
916 num_values: u64,
917 guard: Arc<TransactionGuard>,
918 ) -> Self {
919 Self {
920 inner: Some(ValueIterState::Subtree(inner)),
921 remaining: num_values,
922 freed_pages: None,
923 allocated_pages: Arc::new(Mutex::new(PageTrackerPolicy::Closed)),
924 free_on_drop: vec![],
925 _transaction_guard: guard,
926 mem: None,
927 _value_type: Default::default(),
928 }
929 }
930
931 fn new_subtree_free_on_drop(
932 inner: BtreeRangeIter<V, ()>,
933 num_values: u64,
934 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
935 allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
936 pages: Vec<PageNumber>,
937 guard: Arc<TransactionGuard>,
938 mem: Arc<TransactionalMemory>,
939 ) -> Self {
940 Self {
941 inner: Some(ValueIterState::Subtree(inner)),
942 remaining: num_values,
943 freed_pages: Some(freed_pages),
944 free_on_drop: pages,
945 allocated_pages,
946 _transaction_guard: guard,
947 mem: Some(mem),
948 _value_type: Default::default(),
949 }
950 }
951
952 fn new_inline(inner: LeafKeyIter<'a, V>, guard: Arc<TransactionGuard>) -> Result<Self> {
953 let remaining = inner.inline_collection.value().get_num_values()?;
954 Ok(Self {
955 inner: Some(ValueIterState::InlineLeaf(inner)),
956 remaining,
957 freed_pages: None,
958 allocated_pages: Arc::new(Mutex::new(PageTrackerPolicy::Closed)),
959 free_on_drop: vec![],
960 _transaction_guard: guard,
961 mem: None,
962 _value_type: Default::default(),
963 })
964 }
965
966 pub fn len(&self) -> u64 {
970 self.remaining
971 }
972
973 pub fn is_empty(&self) -> bool {
974 self.len() == 0
975 }
976}
977
978impl<'a, V: Key + 'static> Iterator for MultimapValue<'a, V> {
979 type Item = Result<AccessGuard<'a, V>>;
980
981 fn next(&mut self) -> Option<Self::Item> {
982 let bytes = match self.inner.as_mut()? {
983 ValueIterState::Subtree(iter) => match iter.next()? {
984 Ok(e) => e.key_data(),
985 Err(err) => {
986 return Some(Err(err));
987 }
988 },
989 ValueIterState::InlineLeaf(iter) => match iter.next_key()? {
990 Ok(bytes) => bytes.to_vec(),
991 Err(err) => return Some(Err(err)),
992 },
993 };
994 self.remaining -= 1;
995 Some(Ok(AccessGuard::with_owned_value(bytes)))
996 }
997}
998
999impl<V: Key + 'static> DoubleEndedIterator for MultimapValue<'_, V> {
1000 fn next_back(&mut self) -> Option<Self::Item> {
1001 let bytes = match self.inner.as_mut()? {
1002 ValueIterState::Subtree(iter) => match iter.next_back()? {
1003 Ok(e) => e.key_data(),
1004 Err(err) => {
1005 return Some(Err(err));
1006 }
1007 },
1008 ValueIterState::InlineLeaf(iter) => match iter.next_key_back()? {
1009 Ok(bytes) => bytes.to_vec(),
1010 Err(err) => return Some(Err(err)),
1011 },
1012 };
1013 self.remaining -= 1;
1014 Some(Ok(AccessGuard::with_owned_value(bytes)))
1015 }
1016}
1017
1018impl<V: Key + 'static> Drop for MultimapValue<'_, V> {
1019 fn drop(&mut self) {
1020 drop(mem::take(&mut self.inner));
1022 if !self.free_on_drop.is_empty()
1023 && let (Some(freed_pages_arc), Some(mem_arc)) =
1024 (self.freed_pages.as_ref(), self.mem.as_ref())
1025 {
1026 let mut freed_pages = freed_pages_arc.lock();
1027 let mut allocated_pages = self.allocated_pages.lock();
1028 for page in &self.free_on_drop {
1029 if !mem_arc.free_if_uncommitted(*page, &mut allocated_pages) {
1030 freed_pages.push(*page);
1031 }
1032 }
1033 }
1034 }
1035}
1036
/// Double-ended iterator over a range of multimap keys, yielding each key
/// together with a [`MultimapValue`] iterator over that key's values.
pub struct MultimapRange<'a, K: Key + 'static, V: Key + 'static> {
    /// Iterator over the top-level tree (key -> collection).
    inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
    /// Memory manager handed to each per-key value iterator.
    mem: Arc<TransactionalMemory>,
    /// Transaction guard cloned into each per-key value iterator.
    transaction_guard: Arc<TransactionGuard>,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}
1045
1046impl<K: Key + 'static, V: Key + 'static> MultimapRange<'_, K, V> {
1047 fn new(
1048 inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
1049 guard: Arc<TransactionGuard>,
1050 mem: Arc<TransactionalMemory>,
1051 ) -> Self {
1052 Self {
1053 inner,
1054 mem,
1055 transaction_guard: guard,
1056 _key_type: Default::default(),
1057 _value_type: Default::default(),
1058 _lifetime: Default::default(),
1059 }
1060 }
1061}
1062
1063impl<'a, K: Key + 'static, V: Key + 'static> Iterator for MultimapRange<'a, K, V> {
1064 type Item = Result<(AccessGuard<'a, K>, MultimapValue<'a, V>)>;
1065
1066 fn next(&mut self) -> Option<Self::Item> {
1067 match self.inner.next()? {
1068 Ok(entry) => {
1069 let key = AccessGuard::with_owned_value(entry.key_data());
1070 let (page, _, value_range, decompressed_value) = entry.into_raw();
1071 let collection = if let Some(bytes) = decompressed_value {
1072 AccessGuard::with_owned_value(bytes)
1073 } else {
1074 AccessGuard::with_page(page, value_range)
1075 };
1076 Some(
1077 DynamicCollection::iter(
1078 collection,
1079 self.transaction_guard.clone(),
1080 self.mem.clone(),
1081 )
1082 .map(|iter| (key, iter)),
1083 )
1084 }
1085 Err(err) => Some(Err(err)),
1086 }
1087 }
1088}
1089
1090impl<K: Key + 'static, V: Key + 'static> DoubleEndedIterator for MultimapRange<'_, K, V> {
1091 fn next_back(&mut self) -> Option<Self::Item> {
1092 match self.inner.next_back()? {
1093 Ok(entry) => {
1094 let key = AccessGuard::with_owned_value(entry.key_data());
1095 let (page, _, value_range, decompressed_value) = entry.into_raw();
1096 let collection = if let Some(bytes) = decompressed_value {
1097 AccessGuard::with_owned_value(bytes)
1098 } else {
1099 AccessGuard::with_page(page, value_range)
1100 };
1101 Some(
1102 DynamicCollection::iter(
1103 collection,
1104 self.transaction_guard.clone(),
1105 self.mem.clone(),
1106 )
1107 .map(|iter| (key, iter)),
1108 )
1109 }
1110 Err(err) => Some(Err(err)),
1111 }
1112 }
1113}
1114
/// A multimap table opened within a write transaction: each key maps to a
/// collection of values.
pub struct MultimapTable<'txn, K: Key + 'static, V: Key + 'static> {
    /// Name of the table, as returned by `MultimapTableHandle::name`.
    name: String,
    /// Value count supplied when the table was opened.
    num_values: u64,
    /// The write transaction this table belongs to.
    transaction: &'txn WriteTransaction,
    /// Shared list of freed pages; also handed to the underlying tree.
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    /// Shared page-allocation tracker; also handed to the underlying tree.
    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
    /// Top-level tree mapping each key to its `DynamicCollection` of values.
    tree: BtreeMut<'txn, K, &'static DynamicCollection<V>>,
    /// Memory manager used for page allocation and access.
    mem: Arc<TransactionalMemory>,
    _value_type: PhantomData<V>,
}
1128
1129impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTable<'_, K, V> {
1130 fn name(&self) -> &str {
1131 &self.name
1132 }
1133}
1134
1135impl<'txn, K: Key + 'static, V: Key + 'static> MultimapTable<'txn, K, V> {
1136 pub(crate) fn new(
1137 name: &str,
1138 table_root: Option<BtreeHeader>,
1139 num_values: u64,
1140 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
1141 allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
1142 mem: Arc<TransactionalMemory>,
1143 transaction: &'txn WriteTransaction,
1144 ) -> MultimapTable<'txn, K, V> {
1145 MultimapTable {
1146 name: name.to_string(),
1147 num_values,
1148 transaction,
1149 freed_pages: freed_pages.clone(),
1150 allocated_pages: allocated_pages.clone(),
1151 tree: BtreeMut::new(
1152 table_root,
1153 transaction.transaction_guard(),
1154 mem.clone(),
1155 freed_pages,
1156 allocated_pages,
1157 ),
1158 mem,
1159 _value_type: Default::default(),
1160 }
1161 }
1162
    /// Debug helper: delegates to the underlying tree's `print_debug`,
    /// forwarding `include_values`.
    #[allow(dead_code)]
    #[cfg(feature = "std")]
    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
        self.tree.print_debug(include_values)
    }
1168
1169 pub fn insert<'k, 'v>(
1173 &mut self,
1174 key: impl Borrow<K::SelfType<'k>>,
1175 value: impl Borrow<V::SelfType<'v>>,
1176 ) -> Result<bool> {
1177 let value_bytes = V::as_bytes(value.borrow());
1178 let value_bytes_ref = value_bytes.as_ref();
1179 if value_bytes_ref.len() > MAX_VALUE_LENGTH {
1180 return Err(StorageError::ValueTooLarge(value_bytes_ref.len()));
1181 }
1182 let key_len = K::as_bytes(key.borrow()).as_ref().len();
1183 if key_len > MAX_VALUE_LENGTH {
1184 return Err(StorageError::ValueTooLarge(key_len));
1185 }
1186 if value_bytes_ref.len() + key_len > MAX_PAIR_LENGTH {
1187 return Err(StorageError::ValueTooLarge(value_bytes_ref.len() + key_len));
1188 }
1189 let get_result = self.tree.get(key.borrow())?;
1190 let existed = if get_result.is_some() {
1191 #[allow(clippy::unnecessary_unwrap)]
1192 let guard = get_result.unwrap();
1193 let collection_type = guard.value().collection_type()?;
1194 match collection_type {
1195 Inline => {
1196 let leaf_data = guard.value().as_inline();
1197 let accessor = LeafAccessor::new(
1198 leaf_data,
1199 V::fixed_width(),
1200 <() as Value>::fixed_width(),
1201 )?;
1202 let (position, found) = accessor.position::<V>(value_bytes_ref);
1203 if found {
1204 return Ok(true);
1205 }
1206
1207 let num_pairs = accessor.num_pairs();
1208 let new_pairs = num_pairs + 1;
1209 let new_pair_bytes =
1210 accessor.length_of_pairs(0, accessor.num_pairs()) + value_bytes_ref.len();
1211 let new_key_bytes =
1212 accessor.length_of_keys(0, accessor.num_pairs()) + value_bytes_ref.len();
1213 let required_inline_bytes = RawLeafBuilder::required_bytes(
1214 new_pairs,
1215 new_pair_bytes,
1216 V::fixed_width(),
1217 <() as Value>::fixed_width(),
1218 );
1219
1220 if required_inline_bytes < self.mem.get_page_size() / 2 {
1221 let mut data = vec![0; required_inline_bytes];
1222 let mut builder = RawLeafBuilder::new(
1223 &mut data,
1224 new_pairs,
1225 V::fixed_width(),
1226 <() as Value>::fixed_width(),
1227 new_key_bytes,
1228 );
1229 for i in 0..accessor.num_pairs() {
1230 if i == position {
1231 builder
1232 .append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1233 }
1234 let entry = accessor.entry(i).ok_or_else(|| {
1235 StorageError::Corrupted(format!(
1236 "invalid entry index {i} in multimap insert inline expansion"
1237 ))
1238 })?;
1239 builder.append(entry.key(), entry.value());
1240 }
1241 if position == accessor.num_pairs() {
1242 builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1243 }
1244 drop(builder);
1245 drop(guard);
1246 let inline_data = DynamicCollection::<V>::make_inline_data(&data);
1247 self.tree
1248 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1249 } else {
1250 let mut allocated = self.allocated_pages.lock();
1252 let mut page = self.mem.allocate(leaf_data.len(), &mut allocated)?;
1253 drop(allocated);
1254 page.memory_mut()?[..leaf_data.len()].copy_from_slice(leaf_data);
1255 let page_number = page.get_page_number();
1256 drop(page);
1257 drop(guard);
1258
1259 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1261 Some(BtreeHeader::new(page_number, 0, num_pairs as u64)),
1262 self.transaction.transaction_guard(),
1263 self.mem.clone(),
1264 self.freed_pages.clone(),
1265 self.allocated_pages.clone(),
1266 );
1267 let existed = subtree.insert(value.borrow(), &())?.is_some();
1268 assert_eq!(existed, found);
1269 let subtree_root = subtree.get_root().ok_or_else(|| {
1270 StorageError::Internal(
1271 "missing subtree root after insert in inline-to-subtree conversion"
1272 .to_string(),
1273 )
1274 })?;
1275 let subtree_data = DynamicCollection::<V>::make_subtree_data(subtree_root);
1276 self.tree
1277 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1278 }
1279
1280 found
1281 }
1282 SubtreeV2 => {
1283 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1284 Some(guard.value().as_subtree()),
1285 self.transaction.transaction_guard(),
1286 self.mem.clone(),
1287 self.freed_pages.clone(),
1288 self.allocated_pages.clone(),
1289 );
1290 drop(guard);
1291 let existed = subtree.insert(value.borrow(), &())?.is_some();
1292 let subtree_root = subtree.get_root().ok_or_else(|| {
1293 StorageError::Internal(
1294 "missing subtree root after insert into existing subtree".to_string(),
1295 )
1296 })?;
1297 let subtree_data = DynamicCollection::<V>::make_subtree_data(subtree_root);
1298 self.tree
1299 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1300
1301 existed
1302 }
1303 }
1304 } else {
1305 drop(get_result);
1306 let required_inline_bytes = RawLeafBuilder::required_bytes(
1307 1,
1308 value_bytes_ref.len(),
1309 V::fixed_width(),
1310 <() as Value>::fixed_width(),
1311 );
1312 if required_inline_bytes < self.mem.get_page_size() / 2 {
1313 let mut data = vec![0; required_inline_bytes];
1314 let mut builder = RawLeafBuilder::new(
1315 &mut data,
1316 1,
1317 V::fixed_width(),
1318 <() as Value>::fixed_width(),
1319 value_bytes_ref.len(),
1320 );
1321 builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1322 drop(builder);
1323 let inline_data = DynamicCollection::<V>::make_inline_data(&data);
1324 self.tree
1325 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1326 } else {
1327 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1328 None,
1329 self.transaction.transaction_guard(),
1330 self.mem.clone(),
1331 self.freed_pages.clone(),
1332 self.allocated_pages.clone(),
1333 );
1334 subtree.insert(value.borrow(), &())?;
1335 let subtree_root = subtree.get_root().ok_or_else(|| {
1336 StorageError::Internal(
1337 "missing subtree root after insert into new subtree".to_string(),
1338 )
1339 })?;
1340 let subtree_data = DynamicCollection::<V>::make_subtree_data(subtree_root);
1341 self.tree
1342 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1343 }
1344 false
1345 };
1346
1347 if !existed {
1348 self.num_values += 1;
1349 if self.transaction.cdc_log.is_some() {
1350 self.transaction.record_cdc(CdcEvent {
1351 table_name: self.name.clone(),
1352 op: ChangeOp::Insert,
1353 key: K::as_bytes(key.borrow()).as_ref().to_vec(),
1354 new_value: Some(V::as_bytes(value.borrow()).as_ref().to_vec()),
1355 old_value: None,
1356 });
1357 }
1358 }
1359
1360 Ok(existed)
1361 }
1362
1363 pub fn remove<'k, 'v>(
1367 &mut self,
1368 key: impl Borrow<K::SelfType<'k>>,
1369 value: impl Borrow<V::SelfType<'v>>,
1370 ) -> Result<bool> {
1371 let get_result = self.tree.get(key.borrow())?;
1372 if get_result.is_none() {
1373 return Ok(false);
1374 }
1375 let guard = get_result.unwrap();
1376 let v = guard.value();
1377 let existed = match v.collection_type()? {
1378 Inline => {
1379 let leaf_data = v.as_inline();
1380 let accessor =
1381 LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width())?;
1382 if let Some(position) = accessor.find_key::<V>(V::as_bytes(value.borrow()).as_ref())
1383 {
1384 let old_num_pairs = accessor.num_pairs();
1385 if old_num_pairs == 1 {
1386 drop(guard);
1387 self.tree.remove(key.borrow())?;
1388 } else {
1389 let old_pairs_len = accessor.length_of_pairs(0, old_num_pairs);
1390 let removed_value_len = accessor
1391 .entry(position)
1392 .ok_or_else(|| {
1393 StorageError::Corrupted(format!(
1394 "invalid entry index {position} in multimap remove"
1395 ))
1396 })?
1397 .key()
1398 .len();
1399 let required = RawLeafBuilder::required_bytes(
1400 old_num_pairs - 1,
1401 old_pairs_len - removed_value_len,
1402 V::fixed_width(),
1403 <() as Value>::fixed_width(),
1404 );
1405 let mut new_data = vec![0; required];
1406 let new_key_len =
1407 accessor.length_of_keys(0, old_num_pairs) - removed_value_len;
1408 let mut builder = RawLeafBuilder::new(
1409 &mut new_data,
1410 old_num_pairs - 1,
1411 V::fixed_width(),
1412 <() as Value>::fixed_width(),
1413 new_key_len,
1414 );
1415 for i in 0..old_num_pairs {
1416 if i != position {
1417 let entry = accessor.entry(i).ok_or_else(|| {
1418 StorageError::Corrupted(format!(
1419 "invalid entry index {i} in multimap remove rebuild"
1420 ))
1421 })?;
1422 builder.append(entry.key(), entry.value());
1423 }
1424 }
1425 drop(builder);
1426 drop(guard);
1427
1428 let inline_data = DynamicCollection::<V>::make_inline_data(&new_data);
1429 self.tree
1430 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1431 }
1432 true
1433 } else {
1434 drop(guard);
1435 false
1436 }
1437 }
1438 SubtreeV2 => {
1439 let mut subtree: BtreeMut<V, ()> = BtreeMut::new(
1440 Some(v.as_subtree()),
1441 self.transaction.transaction_guard(),
1442 self.mem.clone(),
1443 self.freed_pages.clone(),
1444 self.allocated_pages.clone(),
1445 );
1446 drop(guard);
1447 let existed = subtree.remove(value.borrow())?.is_some();
1448
1449 if let Some(BtreeHeader {
1450 root: new_root,
1451 checksum: new_checksum,
1452 length: new_length,
1453 }) = subtree.get_root()
1454 {
1455 let page = self.mem.get_page(new_root)?;
1456 match page.memory()[0] {
1457 LEAF => {
1458 let accessor = LeafAccessor::new(
1459 page.memory(),
1460 V::fixed_width(),
1461 <() as Value>::fixed_width(),
1462 )?;
1463 let len = accessor.total_length();
1464 if len < self.mem.get_page_size() / 2 {
1465 let inline_data =
1466 DynamicCollection::<V>::make_inline_data(&page.memory()[..len]);
1467 self.tree
1468 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1469 drop(page);
1470 let mut allocated_pages = self.allocated_pages.lock();
1471 if !self.mem.free_if_uncommitted(new_root, &mut allocated_pages) {
1472 (*self.freed_pages).lock().push(new_root);
1473 }
1474 } else {
1475 let subtree_data =
1476 DynamicCollection::<V>::make_subtree_data(BtreeHeader::new(
1477 new_root,
1478 new_checksum,
1479 accessor.num_pairs() as u64,
1480 ));
1481 self.tree
1482 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1483 }
1484 }
1485 BRANCH => {
1486 let subtree_data = DynamicCollection::<V>::make_subtree_data(
1487 BtreeHeader::new(new_root, new_checksum, new_length),
1488 );
1489 self.tree
1490 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1491 }
1492 other => {
1493 return Err(StorageError::invalid_page_type(new_root, other));
1494 }
1495 }
1496 } else {
1497 self.tree.remove(key.borrow())?;
1498 }
1499
1500 existed
1501 }
1502 };
1503
1504 if existed {
1505 self.num_values -= 1;
1506 if self.transaction.cdc_log.is_some() {
1507 self.transaction.record_cdc(CdcEvent {
1508 table_name: self.name.clone(),
1509 op: ChangeOp::Delete,
1510 key: K::as_bytes(key.borrow()).as_ref().to_vec(),
1511 new_value: None,
1512 old_value: Some(V::as_bytes(value.borrow()).as_ref().to_vec()),
1513 });
1514 }
1515 }
1516
1517 Ok(existed)
1518 }
1519
1520 pub fn drain<'a, KR>(&mut self, range: impl RangeBounds<KR> + 'a) -> Result<u64>
1524 where
1525 KR: Borrow<K::SelfType<'a>> + 'a,
1526 {
1527 let keys: Vec<Vec<u8>> = self
1529 .range(range)?
1530 .map(|result| result.map(|(k, _)| K::as_bytes(&k.value()).as_ref().to_vec()))
1531 .collect::<Result<Vec<_>>>()?;
1532
1533 let mut count = 0u64;
1534 for key_bytes in &keys {
1535 let key = K::from_bytes(key_bytes);
1536 for result in self.remove_all(&key)? {
1538 result?;
1539 }
1540 count += 1;
1541 }
1542 Ok(count)
1543 }
1544
1545 pub fn drain_all(&mut self) -> Result<u64> {
1549 self.drain::<K::SelfType<'_>>(..)
1550 }
1551
1552 pub fn remove_all<'a>(
1556 &mut self,
1557 key: impl Borrow<K::SelfType<'a>>,
1558 ) -> Result<MultimapValue<'_, V>> {
1559 if self.transaction.cdc_log.is_some() {
1561 let key_bytes = K::as_bytes(key.borrow()).as_ref().to_vec();
1562 let guard = self.transaction.transaction_guard();
1563 if let Some(collection) = self.tree.get(key.borrow())? {
1564 let value_iter = DynamicCollection::iter(collection, guard, self.mem.clone())?;
1565 let mut cdc_events: Vec<CdcEvent> = Vec::new();
1566 for result in value_iter {
1567 let val = result?;
1568 cdc_events.push(CdcEvent {
1569 table_name: self.name.clone(),
1570 op: ChangeOp::Delete,
1571 key: key_bytes.clone(),
1572 new_value: None,
1573 old_value: Some(V::as_bytes(&val.value()).as_ref().to_vec()),
1574 });
1575 }
1576 for event in cdc_events {
1577 self.transaction.record_cdc(event);
1578 }
1579 }
1580 }
1581
1582 let iter = if let Some(collection) = self.tree.remove(key.borrow())? {
1583 let mut pages = vec![];
1584 if matches!(
1585 collection.value().collection_type()?,
1586 DynamicCollectionType::SubtreeV2
1587 ) {
1588 let root = collection.value().as_subtree().root;
1589 let all_pages = AllPageNumbersBtreeIter::new(
1590 root,
1591 V::fixed_width(),
1592 <() as Value>::fixed_width(),
1593 self.mem.clone(),
1594 )?;
1595 for page in all_pages {
1596 pages.push(page?);
1597 }
1598 }
1599
1600 self.num_values -= collection.value().get_num_values()?;
1601
1602 DynamicCollection::iter_free_on_drop(
1603 collection,
1604 pages,
1605 self.freed_pages.clone(),
1606 self.allocated_pages.clone(),
1607 self.transaction.transaction_guard(),
1608 self.mem.clone(),
1609 )?
1610 } else {
1611 MultimapValue::new_subtree(
1612 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1613 &(..),
1614 None,
1615 None,
1616 if self.mem.compression().is_enabled() {
1617 None
1618 } else {
1619 <() as Value>::fixed_width()
1620 },
1621 self.mem.clone(),
1622 false,
1623 )?,
1624 0,
1625 self.transaction.transaction_guard(),
1626 )
1627 };
1628
1629 Ok(iter)
1630 }
1631}
1632
1633impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for MultimapTable<'_, K, V> {
1634 fn stats(&self) -> Result<TableStats> {
1635 let tree_stats = multimap_btree_stats(
1636 self.tree.get_root().map(|x| x.root),
1637 &self.mem,
1638 K::fixed_width(),
1639 V::fixed_width(),
1640 )?;
1641
1642 Ok(TableStats {
1643 tree_height: tree_stats.tree_height,
1644 leaf_pages: tree_stats.leaf_pages,
1645 branch_pages: tree_stats.branch_pages,
1646 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1647 metadata_bytes: tree_stats.metadata_bytes,
1648 fragmented_bytes: tree_stats.fragmented_bytes,
1649 })
1650 }
1651
1652 fn len(&self) -> Result<u64> {
1654 Ok(self.num_values)
1655 }
1656}
1657
1658impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V> for MultimapTable<'_, K, V> {
1659 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'_, V>> {
1660 let guard = self.transaction.transaction_guard();
1661 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1662 DynamicCollection::iter(collection, guard, self.mem.clone())?
1663 } else {
1664 MultimapValue::new_subtree(
1665 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1666 &(..),
1667 None,
1668 None,
1669 if self.mem.compression().is_enabled() {
1670 None
1671 } else {
1672 <() as Value>::fixed_width()
1673 },
1674 self.mem.clone(),
1675 false,
1676 )?,
1677 0,
1678 guard,
1679 )
1680 };
1681
1682 Ok(iter)
1683 }
1684
1685 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<'_, K, V>>
1686 where
1687 KR: Borrow<K::SelfType<'a>> + 'a,
1688 {
1689 let inner = self.tree.range(&range)?;
1690 Ok(MultimapRange::new(
1691 inner,
1692 self.transaction.transaction_guard(),
1693 self.mem.clone(),
1694 ))
1695 }
1696}
1697
1698impl<K: Key + 'static, V: Key + 'static> Sealed for MultimapTable<'_, K, V> {}
1699
1700impl<K: Key + 'static, V: Key + 'static> Drop for MultimapTable<'_, K, V> {
1701 fn drop(&mut self) {
1702 self.transaction
1703 .close_table(&self.name, &self.tree, self.num_values);
1704 }
1705}
1706
1707pub trait ReadableMultimapTable<K: Key + 'static, V: Key + 'static>: ReadableTableMetadata {
1708 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'_, V>>;
1710
1711 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<'_, K, V>>
1713 where
1714 KR: Borrow<K::SelfType<'a>> + 'a;
1715
1716 fn iter(&self) -> Result<MultimapRange<'_, K, V>> {
1719 self.range::<K::SelfType<'_>>(..)
1720 }
1721}
1722
/// A read-only multimap table handle that does not carry key or value types; only the
/// optional fixed key/value widths are known.
pub struct ReadOnlyUntypedMultimapTable {
    /// Value count reported by `len()`.
    num_values: u64,
    /// Untyped key tree of the table.
    tree: RawBtree,
    /// Fixed key width, if any; passed to `multimap_btree_stats`.
    fixed_key_size: Option<usize>,
    /// Fixed value width, if any; passed to `multimap_btree_stats`.
    fixed_value_size: Option<usize>,
    /// Memory handle used when computing statistics.
    mem: Arc<TransactionalMemory>,
}
1731
// Marker implementation of the crate's `Sealed` trait for the untyped read-only table.
impl Sealed for ReadOnlyUntypedMultimapTable {}
1733
1734impl ReadableTableMetadata for ReadOnlyUntypedMultimapTable {
1735 fn stats(&self) -> Result<TableStats> {
1737 let tree_stats = multimap_btree_stats(
1738 self.tree.get_root().map(|x| x.root),
1739 &self.mem,
1740 self.fixed_key_size,
1741 self.fixed_value_size,
1742 )?;
1743
1744 Ok(TableStats {
1745 tree_height: tree_stats.tree_height,
1746 leaf_pages: tree_stats.leaf_pages,
1747 branch_pages: tree_stats.branch_pages,
1748 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1749 metadata_bytes: tree_stats.metadata_bytes,
1750 fragmented_bytes: tree_stats.fragmented_bytes,
1751 })
1752 }
1753
1754 fn len(&self) -> Result<u64> {
1755 Ok(self.num_values)
1756 }
1757}
1758
1759impl ReadOnlyUntypedMultimapTable {
1760 pub(crate) fn new(
1761 root: Option<BtreeHeader>,
1762 num_values: u64,
1763 fixed_key_size: Option<usize>,
1764 fixed_value_size: Option<usize>,
1765 mem: Arc<TransactionalMemory>,
1766 ) -> Self {
1767 Self {
1768 num_values,
1769 tree: RawBtree::new(
1770 root,
1771 fixed_key_size,
1772 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
1773 mem.clone(),
1774 ),
1775 fixed_key_size,
1776 fixed_value_size,
1777 mem,
1778 }
1779 }
1780}
1781
/// A typed, read-only handle to a multimap table.
pub struct ReadOnlyMultimapTable<K: Key + 'static, V: Key + 'static> {
    /// Key tree; each entry's value is the `DynamicCollection` of that key's values.
    tree: Btree<K, &'static DynamicCollection<V>>,
    /// Value count reported by `len()`.
    num_values: u64,
    /// Memory handle passed to the iterators and statistics helpers.
    mem: Arc<TransactionalMemory>,
    /// Transaction guard; a clone is given to every iterator this table creates.
    transaction_guard: Arc<TransactionGuard>,
    /// Zero-sized marker for the value type `V`.
    _value_type: PhantomData<V>,
}
1790
1791impl<K: Key + 'static, V: Key + 'static> ReadOnlyMultimapTable<K, V> {
1792 pub(crate) fn new(
1793 root: Option<BtreeHeader>,
1794 num_values: u64,
1795 hint: PageHint,
1796 guard: Arc<TransactionGuard>,
1797 mem: Arc<TransactionalMemory>,
1798 ) -> Result<ReadOnlyMultimapTable<K, V>> {
1799 Ok(ReadOnlyMultimapTable {
1800 tree: Btree::new(root, hint, guard.clone(), mem.clone())?,
1801 num_values,
1802 mem,
1803 transaction_guard: guard,
1804 _value_type: Default::default(),
1805 })
1806 }
1807
1808 pub fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'static, V>> {
1811 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1812 DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1813 } else {
1814 MultimapValue::new_subtree(
1815 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1816 &(..),
1817 None,
1818 None,
1819 if self.mem.compression().is_enabled() {
1820 None
1821 } else {
1822 <() as Value>::fixed_width()
1823 },
1824 self.mem.clone(),
1825 false,
1826 )?,
1827 0,
1828 self.transaction_guard.clone(),
1829 )
1830 };
1831
1832 Ok(iter)
1833 }
1834
1835 pub fn range<'a, KR>(&self, range: impl RangeBounds<KR>) -> Result<MultimapRange<'static, K, V>>
1838 where
1839 KR: Borrow<K::SelfType<'a>>,
1840 {
1841 let inner = self.tree.range(&range)?;
1842 Ok(MultimapRange::new(
1843 inner,
1844 self.transaction_guard.clone(),
1845 self.mem.clone(),
1846 ))
1847 }
1848}
1849
1850impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for ReadOnlyMultimapTable<K, V> {
1851 fn stats(&self) -> Result<TableStats> {
1852 let tree_stats = multimap_btree_stats(
1853 self.tree.get_root().map(|x| x.root),
1854 &self.mem,
1855 K::fixed_width(),
1856 V::fixed_width(),
1857 )?;
1858
1859 Ok(TableStats {
1860 tree_height: tree_stats.tree_height,
1861 leaf_pages: tree_stats.leaf_pages,
1862 branch_pages: tree_stats.branch_pages,
1863 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1864 metadata_bytes: tree_stats.metadata_bytes,
1865 fragmented_bytes: tree_stats.fragmented_bytes,
1866 })
1867 }
1868
1869 fn len(&self) -> Result<u64> {
1870 Ok(self.num_values)
1871 }
1872}
1873
1874impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V>
1875 for ReadOnlyMultimapTable<K, V>
1876{
1877 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'_, V>> {
1879 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1880 DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1881 } else {
1882 MultimapValue::new_subtree(
1883 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1884 &(..),
1885 None,
1886 None,
1887 if self.mem.compression().is_enabled() {
1888 None
1889 } else {
1890 <() as Value>::fixed_width()
1891 },
1892 self.mem.clone(),
1893 false,
1894 )?,
1895 0,
1896 self.transaction_guard.clone(),
1897 )
1898 };
1899
1900 Ok(iter)
1901 }
1902
1903 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<'_, K, V>>
1904 where
1905 KR: Borrow<K::SelfType<'a>> + 'a,
1906 {
1907 let inner = self.tree.range(&range)?;
1908 Ok(MultimapRange::new(
1909 inner,
1910 self.transaction_guard.clone(),
1911 self.mem.clone(),
1912 ))
1913 }
1914}
1915
1916impl<K: Key, V: Key> Sealed for ReadOnlyMultimapTable<K, V> {}