1use crate::db::TransactionGuard;
2use crate::multimap_table::DynamicCollectionType::{Inline, SubtreeV2};
3use crate::sealed::Sealed;
4use crate::table::{ReadableTableMetadata, TableStats};
5use crate::tree_store::{
6 AllPageNumbersBtreeIter, BRANCH, BranchAccessor, BranchMutator, Btree, BtreeHeader, BtreeMut,
7 BtreeRangeIter, BtreeStats, Checksum, DEFERRED, LEAF, LeafAccessor, LeafMutator,
8 MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page, PageHint, PageNumber, PagePath, PageTrackerPolicy,
9 RawBtree, RawLeafBuilder, TransactionalMemory, UntypedBtree, UntypedBtreeMut, btree_stats,
10};
11use crate::types::{Key, TypeName, Value};
12use crate::{AccessGuard, MultimapTableHandle, Result, StorageError, WriteTransaction};
13use std::borrow::Borrow;
14use std::cmp::max;
15use std::collections::HashMap;
16use std::convert::TryInto;
17use std::marker::PhantomData;
18use std::mem;
19use std::mem::size_of;
20use std::ops::{RangeBounds, RangeFull};
21use std::sync::{Arc, Mutex};
22
23pub(crate) fn multimap_btree_stats(
24 root: Option<PageNumber>,
25 mem: &TransactionalMemory,
26 fixed_key_size: Option<usize>,
27 fixed_value_size: Option<usize>,
28) -> Result<BtreeStats> {
29 if let Some(root) = root {
30 multimap_stats_helper(root, mem, fixed_key_size, fixed_value_size)
31 } else {
32 Ok(BtreeStats {
33 tree_height: 0,
34 leaf_pages: 0,
35 branch_pages: 0,
36 stored_leaf_bytes: 0,
37 metadata_bytes: 0,
38 fragmented_bytes: 0,
39 })
40 }
41}
42
43fn multimap_stats_helper(
44 page_number: PageNumber,
45 mem: &TransactionalMemory,
46 fixed_key_size: Option<usize>,
47 fixed_value_size: Option<usize>,
48) -> Result<BtreeStats> {
49 let page = mem.get_page(page_number)?;
50 let node_mem = page.memory();
51 match node_mem[0] {
52 LEAF => {
53 let accessor = LeafAccessor::new(
54 page.memory(),
55 fixed_key_size,
56 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
57 );
58 let mut leaf_bytes = 0u64;
59 let mut is_branch = false;
60 for i in 0..accessor.num_pairs() {
61 let entry = accessor.entry(i).unwrap();
62 let collection: &UntypedDynamicCollection =
63 UntypedDynamicCollection::new(entry.value());
64 match collection.collection_type() {
65 Inline => {
66 let inline_accessor = LeafAccessor::new(
67 collection.as_inline(),
68 fixed_value_size,
69 <() as Value>::fixed_width(),
70 );
71 leaf_bytes +=
72 inline_accessor.length_of_pairs(0, inline_accessor.num_pairs()) as u64;
73 }
74 SubtreeV2 => {
75 is_branch = true;
76 }
77 }
78 }
79 let mut overhead_bytes = (accessor.total_length() as u64) - leaf_bytes;
80 let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
81 let mut max_child_height = 0;
82 let (mut leaf_pages, mut branch_pages) = if is_branch { (0, 1) } else { (1, 0) };
83
84 for i in 0..accessor.num_pairs() {
85 let entry = accessor.entry(i).unwrap();
86 let collection: &UntypedDynamicCollection =
87 UntypedDynamicCollection::new(entry.value());
88 match collection.collection_type() {
89 Inline => {
90 }
92 SubtreeV2 => {
93 let stats = btree_stats(
95 Some(collection.as_subtree().root),
96 mem,
97 fixed_value_size,
98 <() as Value>::fixed_width(),
99 )?;
100 max_child_height = max(max_child_height, stats.tree_height);
101 branch_pages += stats.branch_pages;
102 leaf_pages += stats.leaf_pages;
103 fragmented_bytes += stats.fragmented_bytes;
104 overhead_bytes += stats.metadata_bytes;
105 leaf_bytes += stats.stored_leaf_bytes;
106 }
107 }
108 }
109
110 Ok(BtreeStats {
111 tree_height: max_child_height + 1,
112 leaf_pages,
113 branch_pages,
114 stored_leaf_bytes: leaf_bytes,
115 metadata_bytes: overhead_bytes,
116 fragmented_bytes,
117 })
118 }
119 BRANCH => {
120 let accessor = BranchAccessor::new(&page, fixed_key_size);
121 let mut max_child_height = 0;
122 let mut leaf_pages = 0;
123 let mut branch_pages = 1;
124 let mut stored_leaf_bytes = 0;
125 let mut metadata_bytes = accessor.total_length() as u64;
126 let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
127 for i in 0..accessor.count_children() {
128 if let Some(child) = accessor.child_page(i) {
129 let stats =
130 multimap_stats_helper(child, mem, fixed_key_size, fixed_value_size)?;
131 max_child_height = max(max_child_height, stats.tree_height);
132 leaf_pages += stats.leaf_pages;
133 branch_pages += stats.branch_pages;
134 stored_leaf_bytes += stats.stored_leaf_bytes;
135 metadata_bytes += stats.metadata_bytes;
136 fragmented_bytes += stats.fragmented_bytes;
137 }
138 }
139
140 Ok(BtreeStats {
141 tree_height: max_child_height + 1,
142 leaf_pages,
143 branch_pages,
144 stored_leaf_bytes,
145 metadata_bytes,
146 fragmented_bytes,
147 })
148 }
149 _ => unreachable!(),
150 }
151}
152
153pub(crate) fn verify_tree_and_subtree_checksums(
155 root: Option<BtreeHeader>,
156 key_size: Option<usize>,
157 value_size: Option<usize>,
158 mem: Arc<TransactionalMemory>,
159) -> Result<bool> {
160 if let Some(header) = root {
161 if !RawBtree::new(
162 Some(header),
163 key_size,
164 DynamicCollection::<()>::fixed_width_with(value_size),
165 mem.clone(),
166 )
167 .verify_checksum()?
168 {
169 return Ok(false);
170 }
171
172 let table_pages_iter = AllPageNumbersBtreeIter::new(
173 header.root,
174 key_size,
175 DynamicCollection::<()>::fixed_width_with(value_size),
176 mem.clone(),
177 )?;
178 for table_page in table_pages_iter {
179 let page = mem.get_page(table_page?)?;
180 let subtree_roots = parse_subtree_roots(&page, key_size, value_size);
181 for header in subtree_roots {
182 if !RawBtree::new(Some(header), value_size, <()>::fixed_width(), mem.clone())
183 .verify_checksum()?
184 {
185 return Ok(false);
186 }
187 }
188 }
189 }
190
191 Ok(true)
192}
193
194pub(crate) fn relocate_subtrees(
196 root: (PageNumber, Checksum),
197 key_size: Option<usize>,
198 value_size: Option<usize>,
199 mem: Arc<TransactionalMemory>,
200 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
201 relocation_map: &HashMap<PageNumber, PageNumber>,
202) -> Result<(PageNumber, Checksum)> {
203 let old_page = mem.get_page(root.0)?;
204 let mut new_page = if let Some(new_page_number) = relocation_map.get(&root.0) {
205 mem.get_page_mut(*new_page_number)?
206 } else {
207 return Ok(root);
208 };
209 let new_page_number = new_page.get_page_number();
210 new_page.memory_mut().copy_from_slice(old_page.memory());
211
212 match old_page.memory()[0] {
213 LEAF => {
214 let accessor = LeafAccessor::new(
215 old_page.memory(),
216 key_size,
217 UntypedDynamicCollection::fixed_width_with(value_size),
218 );
219 let mut mutator = LeafMutator::new(
221 &mut new_page,
222 key_size,
223 UntypedDynamicCollection::fixed_width_with(value_size),
224 );
225 for i in 0..accessor.num_pairs() {
226 let entry = accessor.entry(i).unwrap();
227 let collection = UntypedDynamicCollection::from_bytes(entry.value());
228 if matches!(collection.collection_type(), SubtreeV2) {
229 let sub_root = collection.as_subtree();
230 let mut tree = UntypedBtreeMut::new(
231 Some(sub_root),
232 mem.clone(),
233 freed_pages.clone(),
234 value_size,
235 <() as Value>::fixed_width(),
236 );
237 tree.relocate(relocation_map)?;
238 if sub_root != tree.get_root().unwrap() {
239 let new_collection =
240 UntypedDynamicCollection::make_subtree_data(tree.get_root().unwrap());
241 mutator.insert(i, true, entry.key(), &new_collection);
242 }
243 }
244 }
245 }
246 BRANCH => {
247 let accessor = BranchAccessor::new(&old_page, key_size);
248 let mut mutator = BranchMutator::new(&mut new_page);
249 for i in 0..accessor.count_children() {
250 if let Some(child) = accessor.child_page(i) {
251 let child_checksum = accessor.child_checksum(i).unwrap();
252 let (new_child, new_checksum) = relocate_subtrees(
253 (child, child_checksum),
254 key_size,
255 value_size,
256 mem.clone(),
257 freed_pages.clone(),
258 relocation_map,
259 )?;
260 mutator.write_child_page(i, new_child, new_checksum);
261 }
262 }
263 }
264 _ => unreachable!(),
265 }
266
267 let old_page_number = old_page.get_page_number();
268 drop(old_page);
269 let mut ignore = PageTrackerPolicy::Ignore;
272 if !mem.free_if_uncommitted(old_page_number, &mut ignore) {
273 freed_pages.lock().unwrap().push(old_page_number);
274 }
275 Ok((new_page_number, DEFERRED))
276}
277
278pub(crate) fn finalize_tree_and_subtree_checksums(
281 root: Option<BtreeHeader>,
282 key_size: Option<usize>,
283 value_size: Option<usize>,
284 mem: Arc<TransactionalMemory>,
285) -> Result<Option<BtreeHeader>> {
286 let freed_pages = Arc::new(Mutex::new(vec![]));
287 let mut tree = UntypedBtreeMut::new(
288 root,
289 mem.clone(),
290 freed_pages.clone(),
291 key_size,
292 DynamicCollection::<()>::fixed_width_with(value_size),
293 );
294 tree.dirty_leaf_visitor(|mut leaf_page| {
295 let mut sub_root_updates = vec![];
296 let accessor = LeafAccessor::new(
297 leaf_page.memory(),
298 key_size,
299 DynamicCollection::<()>::fixed_width_with(value_size),
300 );
301 for i in 0..accessor.num_pairs() {
302 let entry = accessor.entry(i).unwrap();
303 let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
304 if matches!(collection.collection_type(), SubtreeV2) {
305 let sub_root = collection.as_subtree();
306 if mem.uncommitted(sub_root.root) {
307 let mut subtree = UntypedBtreeMut::new(
308 Some(sub_root),
309 mem.clone(),
310 freed_pages.clone(),
311 value_size,
312 <()>::fixed_width(),
313 );
314 let subtree_root = subtree.finalize_dirty_checksums()?.unwrap();
315 sub_root_updates.push((i, entry.key().to_vec(), subtree_root));
316 }
317 }
318 }
319 let mut mutator = LeafMutator::new(
321 &mut leaf_page,
322 key_size,
323 DynamicCollection::<()>::fixed_width_with(value_size),
324 );
325 for (i, key, sub_root) in sub_root_updates {
326 let collection = DynamicCollection::<()>::make_subtree_data(sub_root);
327 mutator.insert(i, true, &key, &collection);
328 }
329
330 Ok(())
331 })?;
332
333 let root = tree.finalize_dirty_checksums()?;
334 assert!(freed_pages.lock().unwrap().is_empty());
336 Ok(root)
337}
338
339fn parse_subtree_roots<T: Page>(
340 page: &T,
341 fixed_key_size: Option<usize>,
342 fixed_value_size: Option<usize>,
343) -> Vec<BtreeHeader> {
344 match page.memory()[0] {
345 BRANCH => {
346 vec![]
347 }
348 LEAF => {
349 let mut result = vec![];
350 let accessor = LeafAccessor::new(
351 page.memory(),
352 fixed_key_size,
353 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
354 );
355 for i in 0..accessor.num_pairs() {
356 let entry = accessor.entry(i).unwrap();
357 let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
358 if matches!(collection.collection_type(), SubtreeV2) {
359 result.push(collection.as_subtree());
360 }
361 }
362
363 result
364 }
365 _ => unreachable!(),
366 }
367}
368
369pub(crate) struct UntypedMultiBtree {
370 mem: Arc<TransactionalMemory>,
371 root: Option<BtreeHeader>,
372 key_width: Option<usize>,
373 value_width: Option<usize>,
374}
375
376impl UntypedMultiBtree {
377 pub(crate) fn new(
378 root: Option<BtreeHeader>,
379 mem: Arc<TransactionalMemory>,
380 key_width: Option<usize>,
381 value_width: Option<usize>,
382 ) -> Self {
383 Self {
384 mem,
385 root,
386 key_width,
387 value_width,
388 }
389 }
390
391 pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
393 where
394 F: FnMut(&PagePath) -> Result,
395 {
396 let tree = UntypedBtree::new(
397 self.root,
398 self.mem.clone(),
399 self.key_width,
400 UntypedDynamicCollection::fixed_width_with(self.value_width),
401 );
402 tree.visit_all_pages(|path| {
403 visitor(path)?;
404 let page = self.mem.get_page(path.page_number())?;
405 match page.memory()[0] {
406 LEAF => {
407 for header in parse_subtree_roots(&page, self.key_width, self.value_width) {
408 let subtree = UntypedBtree::new(
409 Some(header),
410 self.mem.clone(),
411 self.value_width,
412 <() as Value>::fixed_width(),
413 );
414 subtree.visit_all_pages(|subpath| {
415 let full_path = path.with_subpath(subpath);
416 visitor(&full_path)
417 })?;
418 }
419 }
420 BRANCH => {
421 }
423 _ => unreachable!(),
424 }
425 Ok(())
426 })?;
427
428 Ok(())
429 }
430}
431
432pub(crate) struct LeafKeyIter<'a, V: Key + 'static> {
433 inline_collection: AccessGuard<'a, &'static DynamicCollection<V>>,
434 fixed_key_size: Option<usize>,
435 fixed_value_size: Option<usize>,
436 start_entry: isize, end_entry: isize, }
439
440impl<'a, V: Key> LeafKeyIter<'a, V> {
441 fn new(
442 data: AccessGuard<'a, &'static DynamicCollection<V>>,
443 fixed_key_size: Option<usize>,
444 fixed_value_size: Option<usize>,
445 ) -> Self {
446 let accessor =
447 LeafAccessor::new(data.value().as_inline(), fixed_key_size, fixed_value_size);
448 let end_entry = isize::try_from(accessor.num_pairs()).unwrap() - 1;
449 Self {
450 inline_collection: data,
451 fixed_key_size,
452 fixed_value_size,
453 start_entry: 0,
454 end_entry,
455 }
456 }
457
458 fn next_key(&mut self) -> Option<&[u8]> {
459 if self.end_entry < self.start_entry {
460 return None;
461 }
462 let accessor = LeafAccessor::new(
463 self.inline_collection.value().as_inline(),
464 self.fixed_key_size,
465 self.fixed_value_size,
466 );
467 self.start_entry += 1;
468 accessor
469 .entry((self.start_entry - 1).try_into().unwrap())
470 .map(|e| e.key())
471 }
472
473 fn next_key_back(&mut self) -> Option<&[u8]> {
474 if self.end_entry < self.start_entry {
475 return None;
476 }
477 let accessor = LeafAccessor::new(
478 self.inline_collection.value().as_inline(),
479 self.fixed_key_size,
480 self.fixed_value_size,
481 );
482 self.end_entry -= 1;
483 accessor
484 .entry((self.end_entry + 1).try_into().unwrap())
485 .map(|e| e.key())
486 }
487}
488
489enum DynamicCollectionType {
490 Inline,
491 SubtreeV2,
494}
495
496impl From<u8> for DynamicCollectionType {
497 fn from(value: u8) -> Self {
498 match value {
499 LEAF => Inline,
500 3 => SubtreeV2,
502 _ => unreachable!(),
503 }
504 }
505}
506
507#[allow(clippy::from_over_into)]
508impl Into<u8> for DynamicCollectionType {
509 fn into(self) -> u8 {
510 match self {
511 Inline => LEAF,
514 SubtreeV2 => 3,
516 }
517 }
518}
519
520#[repr(transparent)]
534struct DynamicCollection<V: Key> {
535 _value_type: PhantomData<V>,
536 data: [u8],
537}
538
539impl<V: Key> std::fmt::Debug for DynamicCollection<V> {
540 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
541 f.debug_struct("DynamicCollection")
542 .field("data", &&self.data)
543 .finish()
544 }
545}
546
547impl<V: Key> Value for &DynamicCollection<V> {
548 type SelfType<'a>
549 = &'a DynamicCollection<V>
550 where
551 Self: 'a;
552 type AsBytes<'a>
553 = &'a [u8]
554 where
555 Self: 'a;
556
557 fn fixed_width() -> Option<usize> {
558 None
559 }
560
561 fn from_bytes<'a>(data: &'a [u8]) -> &'a DynamicCollection<V>
562 where
563 Self: 'a,
564 {
565 DynamicCollection::new(data)
566 }
567
568 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'a [u8]
569 where
570 Self: 'b,
571 {
572 &value.data
573 }
574
575 fn type_name() -> TypeName {
576 TypeName::internal("redb::DynamicCollection")
577 }
578}
579
580impl<V: Key> DynamicCollection<V> {
581 fn new(data: &[u8]) -> &Self {
582 unsafe { &*(std::ptr::from_ref::<[u8]>(data) as *const DynamicCollection<V>) }
583 }
584
585 fn collection_type(&self) -> DynamicCollectionType {
586 DynamicCollectionType::from(self.data[0])
587 }
588
589 fn as_inline(&self) -> &[u8] {
590 debug_assert!(matches!(self.collection_type(), Inline));
591 &self.data[1..]
592 }
593
594 fn as_subtree(&self) -> BtreeHeader {
595 assert!(matches!(self.collection_type(), SubtreeV2));
596 BtreeHeader::from_le_bytes(
597 self.data[1..=BtreeHeader::serialized_size()]
598 .try_into()
599 .unwrap(),
600 )
601 }
602
603 fn get_num_values(&self) -> u64 {
604 match self.collection_type() {
605 Inline => {
606 let leaf_data = self.as_inline();
607 let accessor =
608 LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width());
609 accessor.num_pairs() as u64
610 }
611 SubtreeV2 => {
612 let offset = 1 + PageNumber::serialized_size() + size_of::<Checksum>();
613 u64::from_le_bytes(
614 self.data[offset..(offset + size_of::<u64>())]
615 .try_into()
616 .unwrap(),
617 )
618 }
619 }
620 }
621
622 fn make_inline_data(data: &[u8]) -> Vec<u8> {
623 let mut result = vec![Inline.into()];
624 result.extend_from_slice(data);
625
626 result
627 }
628
629 fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
630 let mut result = vec![SubtreeV2.into()];
631 result.extend_from_slice(&header.to_le_bytes());
632 result
633 }
634
635 pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
636 None
637 }
638}
639
640impl<V: Key> DynamicCollection<V> {
641 fn iter<'a>(
642 collection: AccessGuard<'a, &'static DynamicCollection<V>>,
643 guard: Arc<TransactionGuard>,
644 mem: Arc<TransactionalMemory>,
645 ) -> Result<MultimapValue<'a, V>> {
646 Ok(match collection.value().collection_type() {
647 Inline => {
648 let leaf_iter =
649 LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width());
650 MultimapValue::new_inline(leaf_iter, guard)
651 }
652 SubtreeV2 => {
653 let root = collection.value().as_subtree().root;
654 MultimapValue::new_subtree(
655 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), Some(root), mem)?,
656 collection.value().get_num_values(),
657 guard,
658 )
659 }
660 })
661 }
662
663 fn iter_free_on_drop<'a>(
664 collection: AccessGuard<'a, &'static DynamicCollection<V>>,
665 pages: Vec<PageNumber>,
666 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
667 allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
668 guard: Arc<TransactionGuard>,
669 mem: Arc<TransactionalMemory>,
670 ) -> Result<MultimapValue<'a, V>> {
671 let num_values = collection.value().get_num_values();
672 Ok(match collection.value().collection_type() {
673 Inline => {
674 let leaf_iter =
675 LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width());
676 MultimapValue::new_inline(leaf_iter, guard)
677 }
678 SubtreeV2 => {
679 let root = collection.value().as_subtree().root;
680 let inner = BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
681 &(..),
682 Some(root),
683 mem.clone(),
684 )?;
685 MultimapValue::new_subtree_free_on_drop(
686 inner,
687 num_values,
688 freed_pages,
689 allocated_pages,
690 pages,
691 guard,
692 mem,
693 )
694 }
695 })
696 }
697}
698
699#[repr(transparent)]
700struct UntypedDynamicCollection {
701 data: [u8],
702}
703
704impl UntypedDynamicCollection {
705 pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
706 None
707 }
708
709 fn new(data: &[u8]) -> &Self {
710 unsafe { &*(std::ptr::from_ref::<[u8]>(data) as *const UntypedDynamicCollection) }
711 }
712
713 fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
714 let mut result = vec![SubtreeV2.into()];
715 result.extend_from_slice(&header.to_le_bytes());
716 result
717 }
718
719 fn from_bytes(data: &[u8]) -> &Self {
720 Self::new(data)
721 }
722
723 fn collection_type(&self) -> DynamicCollectionType {
724 DynamicCollectionType::from(self.data[0])
725 }
726
727 fn as_inline(&self) -> &[u8] {
728 debug_assert!(matches!(self.collection_type(), Inline));
729 &self.data[1..]
730 }
731
732 fn as_subtree(&self) -> BtreeHeader {
733 assert!(matches!(self.collection_type(), SubtreeV2));
734 BtreeHeader::from_le_bytes(
735 self.data[1..=BtreeHeader::serialized_size()]
736 .try_into()
737 .unwrap(),
738 )
739 }
740}
741
742enum ValueIterState<'a, V: Key + 'static> {
743 Subtree(BtreeRangeIter<V, ()>),
744 InlineLeaf(LeafKeyIter<'a, V>),
745}
746
747pub struct MultimapValue<'a, V: Key + 'static> {
748 inner: Option<ValueIterState<'a, V>>,
749 remaining: u64,
750 freed_pages: Option<Arc<Mutex<Vec<PageNumber>>>>,
751 allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
752 free_on_drop: Vec<PageNumber>,
753 _transaction_guard: Arc<TransactionGuard>,
754 mem: Option<Arc<TransactionalMemory>>,
755 _value_type: PhantomData<V>,
756}
757
758impl<'a, V: Key + 'static> MultimapValue<'a, V> {
759 fn new_subtree(
760 inner: BtreeRangeIter<V, ()>,
761 num_values: u64,
762 guard: Arc<TransactionGuard>,
763 ) -> Self {
764 Self {
765 inner: Some(ValueIterState::Subtree(inner)),
766 remaining: num_values,
767 freed_pages: None,
768 allocated_pages: Arc::new(Mutex::new(PageTrackerPolicy::Closed)),
769 free_on_drop: vec![],
770 _transaction_guard: guard,
771 mem: None,
772 _value_type: Default::default(),
773 }
774 }
775
776 fn new_subtree_free_on_drop(
777 inner: BtreeRangeIter<V, ()>,
778 num_values: u64,
779 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
780 allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
781 pages: Vec<PageNumber>,
782 guard: Arc<TransactionGuard>,
783 mem: Arc<TransactionalMemory>,
784 ) -> Self {
785 Self {
786 inner: Some(ValueIterState::Subtree(inner)),
787 remaining: num_values,
788 freed_pages: Some(freed_pages),
789 free_on_drop: pages,
790 allocated_pages,
791 _transaction_guard: guard,
792 mem: Some(mem),
793 _value_type: Default::default(),
794 }
795 }
796
797 fn new_inline(inner: LeafKeyIter<'a, V>, guard: Arc<TransactionGuard>) -> Self {
798 let remaining = inner.inline_collection.value().get_num_values();
799 Self {
800 inner: Some(ValueIterState::InlineLeaf(inner)),
801 remaining,
802 freed_pages: None,
803 allocated_pages: Arc::new(Mutex::new(PageTrackerPolicy::Closed)),
804 free_on_drop: vec![],
805 _transaction_guard: guard,
806 mem: None,
807 _value_type: Default::default(),
808 }
809 }
810
811 pub fn len(&self) -> u64 {
815 self.remaining
816 }
817
818 pub fn is_empty(&self) -> bool {
819 self.len() == 0
820 }
821}
822
823impl<'a, V: Key + 'static> Iterator for MultimapValue<'a, V> {
824 type Item = Result<AccessGuard<'a, V>>;
825
826 fn next(&mut self) -> Option<Self::Item> {
827 let bytes = match self.inner.as_mut().unwrap() {
829 ValueIterState::Subtree(iter) => match iter.next()? {
830 Ok(e) => e.key_data(),
831 Err(err) => {
832 return Some(Err(err));
833 }
834 },
835 ValueIterState::InlineLeaf(iter) => iter.next_key()?.to_vec(),
836 };
837 self.remaining -= 1;
838 Some(Ok(AccessGuard::with_owned_value(bytes)))
839 }
840}
841
842impl<V: Key + 'static> DoubleEndedIterator for MultimapValue<'_, V> {
843 fn next_back(&mut self) -> Option<Self::Item> {
844 let bytes = match self.inner.as_mut().unwrap() {
846 ValueIterState::Subtree(iter) => match iter.next_back()? {
847 Ok(e) => e.key_data(),
848 Err(err) => {
849 return Some(Err(err));
850 }
851 },
852 ValueIterState::InlineLeaf(iter) => iter.next_key_back()?.to_vec(),
853 };
854 Some(Ok(AccessGuard::with_owned_value(bytes)))
855 }
856}
857
858impl<V: Key + 'static> Drop for MultimapValue<'_, V> {
859 fn drop(&mut self) {
860 drop(mem::take(&mut self.inner));
862 if !self.free_on_drop.is_empty() {
863 let mut freed_pages = self.freed_pages.as_ref().unwrap().lock().unwrap();
864 let mut allocated_pages = self.allocated_pages.lock().unwrap();
865 for page in &self.free_on_drop {
866 if !self
867 .mem
868 .as_ref()
869 .unwrap()
870 .free_if_uncommitted(*page, &mut allocated_pages)
871 {
872 freed_pages.push(*page);
873 }
874 }
875 }
876 }
877}
878
879pub struct MultimapRange<'a, K: Key + 'static, V: Key + 'static> {
880 inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
881 mem: Arc<TransactionalMemory>,
882 transaction_guard: Arc<TransactionGuard>,
883 _key_type: PhantomData<K>,
884 _value_type: PhantomData<V>,
885 _lifetime: PhantomData<&'a ()>,
886}
887
888impl<K: Key + 'static, V: Key + 'static> MultimapRange<'_, K, V> {
889 fn new(
890 inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
891 guard: Arc<TransactionGuard>,
892 mem: Arc<TransactionalMemory>,
893 ) -> Self {
894 Self {
895 inner,
896 mem,
897 transaction_guard: guard,
898 _key_type: Default::default(),
899 _value_type: Default::default(),
900 _lifetime: Default::default(),
901 }
902 }
903}
904
905impl<'a, K: Key + 'static, V: Key + 'static> Iterator for MultimapRange<'a, K, V> {
906 type Item = Result<(AccessGuard<'a, K>, MultimapValue<'a, V>)>;
907
908 fn next(&mut self) -> Option<Self::Item> {
909 match self.inner.next()? {
910 Ok(entry) => {
911 let key = AccessGuard::with_owned_value(entry.key_data());
912 let (page, _, value_range) = entry.into_raw();
913 let collection = AccessGuard::with_page(page, value_range);
914 Some(
915 DynamicCollection::iter(
916 collection,
917 self.transaction_guard.clone(),
918 self.mem.clone(),
919 )
920 .map(|iter| (key, iter)),
921 )
922 }
923 Err(err) => Some(Err(err)),
924 }
925 }
926}
927
928impl<K: Key + 'static, V: Key + 'static> DoubleEndedIterator for MultimapRange<'_, K, V> {
929 fn next_back(&mut self) -> Option<Self::Item> {
930 match self.inner.next_back()? {
931 Ok(entry) => {
932 let key = AccessGuard::with_owned_value(entry.key_data());
933 let (page, _, value_range) = entry.into_raw();
934 let collection = AccessGuard::with_page(page, value_range);
935 Some(
936 DynamicCollection::iter(
937 collection,
938 self.transaction_guard.clone(),
939 self.mem.clone(),
940 )
941 .map(|iter| (key, iter)),
942 )
943 }
944 Err(err) => Some(Err(err)),
945 }
946 }
947}
948
949pub struct MultimapTable<'txn, K: Key + 'static, V: Key + 'static> {
953 name: String,
954 num_values: u64,
955 transaction: &'txn WriteTransaction,
956 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
957 allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
958 tree: BtreeMut<'txn, K, &'static DynamicCollection<V>>,
959 mem: Arc<TransactionalMemory>,
960 _value_type: PhantomData<V>,
961}
962
963impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTable<'_, K, V> {
964 fn name(&self) -> &str {
965 &self.name
966 }
967}
968
969impl<'txn, K: Key + 'static, V: Key + 'static> MultimapTable<'txn, K, V> {
970 pub(crate) fn new(
971 name: &str,
972 table_root: Option<BtreeHeader>,
973 num_values: u64,
974 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
975 allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
976 mem: Arc<TransactionalMemory>,
977 transaction: &'txn WriteTransaction,
978 ) -> MultimapTable<'txn, K, V> {
979 MultimapTable {
980 name: name.to_string(),
981 num_values,
982 transaction,
983 freed_pages: freed_pages.clone(),
984 allocated_pages: allocated_pages.clone(),
985 tree: BtreeMut::new(
986 table_root,
987 transaction.transaction_guard(),
988 mem.clone(),
989 freed_pages,
990 allocated_pages,
991 ),
992 mem,
993 _value_type: Default::default(),
994 }
995 }
996
997 #[allow(dead_code)]
998 pub(crate) fn print_debug(&self, include_values: bool) -> Result {
999 self.tree.print_debug(include_values)
1000 }
1001
1002 pub fn insert<'k, 'v>(
1006 &mut self,
1007 key: impl Borrow<K::SelfType<'k>>,
1008 value: impl Borrow<V::SelfType<'v>>,
1009 ) -> Result<bool> {
1010 let value_bytes = V::as_bytes(value.borrow());
1011 let value_bytes_ref = value_bytes.as_ref();
1012 if value_bytes_ref.len() > MAX_VALUE_LENGTH {
1013 return Err(StorageError::ValueTooLarge(value_bytes_ref.len()));
1014 }
1015 let key_len = K::as_bytes(key.borrow()).as_ref().len();
1016 if key_len > MAX_VALUE_LENGTH {
1017 return Err(StorageError::ValueTooLarge(key_len));
1018 }
1019 if value_bytes_ref.len() + key_len > MAX_PAIR_LENGTH {
1020 return Err(StorageError::ValueTooLarge(value_bytes_ref.len() + key_len));
1021 }
1022 let get_result = self.tree.get(key.borrow())?;
1023 let existed = if get_result.is_some() {
1024 #[allow(clippy::unnecessary_unwrap)]
1025 let guard = get_result.unwrap();
1026 let collection_type = guard.value().collection_type();
1027 match collection_type {
1028 Inline => {
1029 let leaf_data = guard.value().as_inline();
1030 let accessor = LeafAccessor::new(
1031 leaf_data,
1032 V::fixed_width(),
1033 <() as Value>::fixed_width(),
1034 );
1035 let (position, found) = accessor.position::<V>(value_bytes_ref);
1036 if found {
1037 return Ok(true);
1038 }
1039
1040 let num_pairs = accessor.num_pairs();
1041 let new_pairs = num_pairs + 1;
1042 let new_pair_bytes =
1043 accessor.length_of_pairs(0, accessor.num_pairs()) + value_bytes_ref.len();
1044 let new_key_bytes =
1045 accessor.length_of_keys(0, accessor.num_pairs()) + value_bytes_ref.len();
1046 let required_inline_bytes = RawLeafBuilder::required_bytes(
1047 new_pairs,
1048 new_pair_bytes,
1049 V::fixed_width(),
1050 <() as Value>::fixed_width(),
1051 );
1052
1053 if required_inline_bytes < self.mem.get_page_size() / 2 {
1054 let mut data = vec![0; required_inline_bytes];
1055 let mut builder = RawLeafBuilder::new(
1056 &mut data,
1057 new_pairs,
1058 V::fixed_width(),
1059 <() as Value>::fixed_width(),
1060 new_key_bytes,
1061 );
1062 for i in 0..accessor.num_pairs() {
1063 if i == position {
1064 builder
1065 .append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1066 }
1067 let entry = accessor.entry(i).unwrap();
1068 builder.append(entry.key(), entry.value());
1069 }
1070 if position == accessor.num_pairs() {
1071 builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1072 }
1073 drop(builder);
1074 drop(guard);
1075 let inline_data = DynamicCollection::<V>::make_inline_data(&data);
1076 self.tree
1077 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1078 } else {
1079 let mut allocated = self.allocated_pages.lock().unwrap();
1081 let mut page = self.mem.allocate(leaf_data.len(), &mut allocated)?;
1082 drop(allocated);
1083 page.memory_mut()[..leaf_data.len()].copy_from_slice(leaf_data);
1084 let page_number = page.get_page_number();
1085 drop(page);
1086 drop(guard);
1087
1088 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1090 Some(BtreeHeader::new(page_number, 0, num_pairs as u64)),
1091 self.transaction.transaction_guard(),
1092 self.mem.clone(),
1093 self.freed_pages.clone(),
1094 self.allocated_pages.clone(),
1095 );
1096 let existed = subtree.insert(value.borrow(), &())?.is_some();
1097 assert_eq!(existed, found);
1098 let subtree_data =
1099 DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1100 self.tree
1101 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1102 }
1103
1104 found
1105 }
1106 SubtreeV2 => {
1107 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1108 Some(guard.value().as_subtree()),
1109 self.transaction.transaction_guard(),
1110 self.mem.clone(),
1111 self.freed_pages.clone(),
1112 self.allocated_pages.clone(),
1113 );
1114 drop(guard);
1115 let existed = subtree.insert(value.borrow(), &())?.is_some();
1116 let subtree_data =
1117 DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1118 self.tree
1119 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1120
1121 existed
1122 }
1123 }
1124 } else {
1125 drop(get_result);
1126 let required_inline_bytes = RawLeafBuilder::required_bytes(
1127 1,
1128 value_bytes_ref.len(),
1129 V::fixed_width(),
1130 <() as Value>::fixed_width(),
1131 );
1132 if required_inline_bytes < self.mem.get_page_size() / 2 {
1133 let mut data = vec![0; required_inline_bytes];
1134 let mut builder = RawLeafBuilder::new(
1135 &mut data,
1136 1,
1137 V::fixed_width(),
1138 <() as Value>::fixed_width(),
1139 value_bytes_ref.len(),
1140 );
1141 builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1142 drop(builder);
1143 let inline_data = DynamicCollection::<V>::make_inline_data(&data);
1144 self.tree
1145 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1146 } else {
1147 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1148 None,
1149 self.transaction.transaction_guard(),
1150 self.mem.clone(),
1151 self.freed_pages.clone(),
1152 self.allocated_pages.clone(),
1153 );
1154 subtree.insert(value.borrow(), &())?;
1155 let subtree_data =
1156 DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1157 self.tree
1158 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1159 }
1160 false
1161 };
1162
1163 if !existed {
1164 self.num_values += 1;
1165 }
1166
1167 Ok(existed)
1168 }
1169
1170 pub fn remove<'k, 'v>(
1174 &mut self,
1175 key: impl Borrow<K::SelfType<'k>>,
1176 value: impl Borrow<V::SelfType<'v>>,
1177 ) -> Result<bool> {
1178 let get_result = self.tree.get(key.borrow())?;
1179 if get_result.is_none() {
1180 return Ok(false);
1181 }
1182 let guard = get_result.unwrap();
1183 let v = guard.value();
1184 let existed = match v.collection_type() {
1185 Inline => {
1186 let leaf_data = v.as_inline();
1187 let accessor =
1188 LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width());
1189 if let Some(position) = accessor.find_key::<V>(V::as_bytes(value.borrow()).as_ref())
1190 {
1191 let old_num_pairs = accessor.num_pairs();
1192 if old_num_pairs == 1 {
1193 drop(guard);
1194 self.tree.remove(key.borrow())?;
1195 } else {
1196 let old_pairs_len = accessor.length_of_pairs(0, old_num_pairs);
1197 let removed_value_len = accessor.entry(position).unwrap().key().len();
1198 let required = RawLeafBuilder::required_bytes(
1199 old_num_pairs - 1,
1200 old_pairs_len - removed_value_len,
1201 V::fixed_width(),
1202 <() as Value>::fixed_width(),
1203 );
1204 let mut new_data = vec![0; required];
1205 let new_key_len =
1206 accessor.length_of_keys(0, old_num_pairs) - removed_value_len;
1207 let mut builder = RawLeafBuilder::new(
1208 &mut new_data,
1209 old_num_pairs - 1,
1210 V::fixed_width(),
1211 <() as Value>::fixed_width(),
1212 new_key_len,
1213 );
1214 for i in 0..old_num_pairs {
1215 if i != position {
1216 let entry = accessor.entry(i).unwrap();
1217 builder.append(entry.key(), entry.value());
1218 }
1219 }
1220 drop(builder);
1221 drop(guard);
1222
1223 let inline_data = DynamicCollection::<V>::make_inline_data(&new_data);
1224 self.tree
1225 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1226 }
1227 true
1228 } else {
1229 drop(guard);
1230 false
1231 }
1232 }
1233 SubtreeV2 => {
1234 let mut subtree: BtreeMut<V, ()> = BtreeMut::new(
1235 Some(v.as_subtree()),
1236 self.transaction.transaction_guard(),
1237 self.mem.clone(),
1238 self.freed_pages.clone(),
1239 self.allocated_pages.clone(),
1240 );
1241 drop(guard);
1242 let existed = subtree.remove(value.borrow())?.is_some();
1243
1244 if let Some(BtreeHeader {
1245 root: new_root,
1246 checksum: new_checksum,
1247 length: new_length,
1248 }) = subtree.get_root()
1249 {
1250 let page = self.mem.get_page(new_root)?;
1251 match page.memory()[0] {
1252 LEAF => {
1253 let accessor = LeafAccessor::new(
1254 page.memory(),
1255 V::fixed_width(),
1256 <() as Value>::fixed_width(),
1257 );
1258 let len = accessor.total_length();
1259 if len < self.mem.get_page_size() / 2 {
1260 let inline_data =
1261 DynamicCollection::<V>::make_inline_data(&page.memory()[..len]);
1262 self.tree
1263 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1264 drop(page);
1265 let mut allocated_pages = self.allocated_pages.lock().unwrap();
1266 if !self.mem.free_if_uncommitted(new_root, &mut allocated_pages) {
1267 (*self.freed_pages).lock().unwrap().push(new_root);
1268 }
1269 } else {
1270 let subtree_data =
1271 DynamicCollection::<V>::make_subtree_data(BtreeHeader::new(
1272 new_root,
1273 new_checksum,
1274 accessor.num_pairs() as u64,
1275 ));
1276 self.tree
1277 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1278 }
1279 }
1280 BRANCH => {
1281 let subtree_data = DynamicCollection::<V>::make_subtree_data(
1282 BtreeHeader::new(new_root, new_checksum, new_length),
1283 );
1284 self.tree
1285 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1286 }
1287 _ => unreachable!(),
1288 }
1289 } else {
1290 self.tree.remove(key.borrow())?;
1291 }
1292
1293 existed
1294 }
1295 };
1296
1297 if existed {
1298 self.num_values -= 1;
1299 }
1300
1301 Ok(existed)
1302 }
1303
1304 pub fn remove_all<'a>(
1308 &mut self,
1309 key: impl Borrow<K::SelfType<'a>>,
1310 ) -> Result<MultimapValue<V>> {
1311 let iter = if let Some(collection) = self.tree.remove(key.borrow())? {
1312 let mut pages = vec![];
1313 if matches!(
1314 collection.value().collection_type(),
1315 DynamicCollectionType::SubtreeV2
1316 ) {
1317 let root = collection.value().as_subtree().root;
1318 let all_pages = AllPageNumbersBtreeIter::new(
1319 root,
1320 V::fixed_width(),
1321 <() as Value>::fixed_width(),
1322 self.mem.clone(),
1323 )?;
1324 for page in all_pages {
1325 pages.push(page?);
1326 }
1327 }
1328
1329 self.num_values -= collection.value().get_num_values();
1330
1331 DynamicCollection::iter_free_on_drop(
1332 collection,
1333 pages,
1334 self.freed_pages.clone(),
1335 self.allocated_pages.clone(),
1336 self.transaction.transaction_guard(),
1337 self.mem.clone(),
1338 )?
1339 } else {
1340 MultimapValue::new_subtree(
1341 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1342 0,
1343 self.transaction.transaction_guard(),
1344 )
1345 };
1346
1347 Ok(iter)
1348 }
1349}
1350
1351impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for MultimapTable<'_, K, V> {
1352 fn stats(&self) -> Result<TableStats> {
1353 let tree_stats = multimap_btree_stats(
1354 self.tree.get_root().map(|x| x.root),
1355 &self.mem,
1356 K::fixed_width(),
1357 V::fixed_width(),
1358 )?;
1359
1360 Ok(TableStats {
1361 tree_height: tree_stats.tree_height,
1362 leaf_pages: tree_stats.leaf_pages,
1363 branch_pages: tree_stats.branch_pages,
1364 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1365 metadata_bytes: tree_stats.metadata_bytes,
1366 fragmented_bytes: tree_stats.fragmented_bytes,
1367 })
1368 }
1369
1370 fn len(&self) -> Result<u64> {
1372 Ok(self.num_values)
1373 }
1374}
1375
1376impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V> for MultimapTable<'_, K, V> {
1377 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>> {
1379 let guard = self.transaction.transaction_guard();
1380 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1381 DynamicCollection::iter(collection, guard, self.mem.clone())?
1382 } else {
1383 MultimapValue::new_subtree(
1384 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1385 0,
1386 guard,
1387 )
1388 };
1389
1390 Ok(iter)
1391 }
1392
1393 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1395 where
1396 KR: Borrow<K::SelfType<'a>> + 'a,
1397 {
1398 let inner = self.tree.range(&range)?;
1399 Ok(MultimapRange::new(
1400 inner,
1401 self.transaction.transaction_guard(),
1402 self.mem.clone(),
1403 ))
1404 }
1405}
1406
1407impl<K: Key + 'static, V: Key + 'static> Sealed for MultimapTable<'_, K, V> {}
1408
1409impl<K: Key + 'static, V: Key + 'static> Drop for MultimapTable<'_, K, V> {
1410 fn drop(&mut self) {
1411 self.transaction
1412 .close_table(&self.name, &self.tree, self.num_values);
1413 }
1414}
1415
1416pub trait ReadableMultimapTable<K: Key + 'static, V: Key + 'static>: ReadableTableMetadata {
1417 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>;
1419
1420 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1421 where
1422 KR: Borrow<K::SelfType<'a>> + 'a;
1423
1424 fn iter(&self) -> Result<MultimapRange<K, V>> {
1427 self.range::<K::SelfType<'_>>(..)
1428 }
1429}
1430
1431pub struct ReadOnlyUntypedMultimapTable {
1433 num_values: u64,
1434 tree: RawBtree,
1435 fixed_key_size: Option<usize>,
1436 fixed_value_size: Option<usize>,
1437 mem: Arc<TransactionalMemory>,
1438}
1439
1440impl Sealed for ReadOnlyUntypedMultimapTable {}
1441
1442impl ReadableTableMetadata for ReadOnlyUntypedMultimapTable {
1443 fn stats(&self) -> Result<TableStats> {
1445 let tree_stats = multimap_btree_stats(
1446 self.tree.get_root().map(|x| x.root),
1447 &self.mem,
1448 self.fixed_key_size,
1449 self.fixed_value_size,
1450 )?;
1451
1452 Ok(TableStats {
1453 tree_height: tree_stats.tree_height,
1454 leaf_pages: tree_stats.leaf_pages,
1455 branch_pages: tree_stats.branch_pages,
1456 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1457 metadata_bytes: tree_stats.metadata_bytes,
1458 fragmented_bytes: tree_stats.fragmented_bytes,
1459 })
1460 }
1461
1462 fn len(&self) -> Result<u64> {
1463 Ok(self.num_values)
1464 }
1465}
1466
1467impl ReadOnlyUntypedMultimapTable {
1468 pub(crate) fn new(
1469 root: Option<BtreeHeader>,
1470 num_values: u64,
1471 fixed_key_size: Option<usize>,
1472 fixed_value_size: Option<usize>,
1473 mem: Arc<TransactionalMemory>,
1474 ) -> Self {
1475 Self {
1476 num_values,
1477 tree: RawBtree::new(
1478 root,
1479 fixed_key_size,
1480 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
1481 mem.clone(),
1482 ),
1483 fixed_key_size,
1484 fixed_value_size,
1485 mem,
1486 }
1487 }
1488}
1489
1490pub struct ReadOnlyMultimapTable<K: Key + 'static, V: Key + 'static> {
1492 tree: Btree<K, &'static DynamicCollection<V>>,
1493 num_values: u64,
1494 mem: Arc<TransactionalMemory>,
1495 transaction_guard: Arc<TransactionGuard>,
1496 _value_type: PhantomData<V>,
1497}
1498
1499impl<K: Key + 'static, V: Key + 'static> ReadOnlyMultimapTable<K, V> {
1500 pub(crate) fn new(
1501 root: Option<BtreeHeader>,
1502 num_values: u64,
1503 hint: PageHint,
1504 guard: Arc<TransactionGuard>,
1505 mem: Arc<TransactionalMemory>,
1506 ) -> Result<ReadOnlyMultimapTable<K, V>> {
1507 Ok(ReadOnlyMultimapTable {
1508 tree: Btree::new(root, hint, guard.clone(), mem.clone())?,
1509 num_values,
1510 mem,
1511 transaction_guard: guard,
1512 _value_type: Default::default(),
1513 })
1514 }
1515
1516 pub fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'static, V>> {
1519 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1520 DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1521 } else {
1522 MultimapValue::new_subtree(
1523 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1524 0,
1525 self.transaction_guard.clone(),
1526 )
1527 };
1528
1529 Ok(iter)
1530 }
1531
1532 pub fn range<'a, KR>(&self, range: impl RangeBounds<KR>) -> Result<MultimapRange<'static, K, V>>
1535 where
1536 KR: Borrow<K::SelfType<'a>>,
1537 {
1538 let inner = self.tree.range(&range)?;
1539 Ok(MultimapRange::new(
1540 inner,
1541 self.transaction_guard.clone(),
1542 self.mem.clone(),
1543 ))
1544 }
1545}
1546
1547impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for ReadOnlyMultimapTable<K, V> {
1548 fn stats(&self) -> Result<TableStats> {
1549 let tree_stats = multimap_btree_stats(
1550 self.tree.get_root().map(|x| x.root),
1551 &self.mem,
1552 K::fixed_width(),
1553 V::fixed_width(),
1554 )?;
1555
1556 Ok(TableStats {
1557 tree_height: tree_stats.tree_height,
1558 leaf_pages: tree_stats.leaf_pages,
1559 branch_pages: tree_stats.branch_pages,
1560 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1561 metadata_bytes: tree_stats.metadata_bytes,
1562 fragmented_bytes: tree_stats.fragmented_bytes,
1563 })
1564 }
1565
1566 fn len(&self) -> Result<u64> {
1567 Ok(self.num_values)
1568 }
1569}
1570
1571impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V>
1572 for ReadOnlyMultimapTable<K, V>
1573{
1574 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>> {
1576 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1577 DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1578 } else {
1579 MultimapValue::new_subtree(
1580 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1581 0,
1582 self.transaction_guard.clone(),
1583 )
1584 };
1585
1586 Ok(iter)
1587 }
1588
1589 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1590 where
1591 KR: Borrow<K::SelfType<'a>> + 'a,
1592 {
1593 let inner = self.tree.range(&range)?;
1594 Ok(MultimapRange::new(
1595 inner,
1596 self.transaction_guard.clone(),
1597 self.mem.clone(),
1598 ))
1599 }
1600}
1601
1602impl<K: Key, V: Key> Sealed for ReadOnlyMultimapTable<K, V> {}