redb/
multimap_table.rs

1use crate::db::TransactionGuard;
2use crate::multimap_table::DynamicCollectionType::{Inline, SubtreeV2};
3use crate::sealed::Sealed;
4use crate::table::{ReadableTableMetadata, TableStats};
5use crate::tree_store::{
6    AllPageNumbersBtreeIter, BRANCH, BranchAccessor, BranchMutator, Btree, BtreeHeader, BtreeMut,
7    BtreeRangeIter, BtreeStats, Checksum, DEFERRED, LEAF, LeafAccessor, LeafMutator,
8    MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page, PageHint, PageNumber, PagePath, PageTrackerPolicy,
9    RawBtree, RawLeafBuilder, TransactionalMemory, UntypedBtree, UntypedBtreeMut, btree_stats,
10};
11use crate::types::{Key, TypeName, Value};
12use crate::{AccessGuard, MultimapTableHandle, Result, StorageError, WriteTransaction};
13use std::borrow::Borrow;
14use std::cmp::max;
15use std::collections::HashMap;
16use std::convert::TryInto;
17use std::marker::PhantomData;
18use std::mem;
19use std::mem::size_of;
20use std::ops::{RangeBounds, RangeFull};
21use std::sync::{Arc, Mutex};
22
23pub(crate) fn multimap_btree_stats(
24    root: Option<PageNumber>,
25    mem: &TransactionalMemory,
26    fixed_key_size: Option<usize>,
27    fixed_value_size: Option<usize>,
28) -> Result<BtreeStats> {
29    if let Some(root) = root {
30        multimap_stats_helper(root, mem, fixed_key_size, fixed_value_size)
31    } else {
32        Ok(BtreeStats {
33            tree_height: 0,
34            leaf_pages: 0,
35            branch_pages: 0,
36            stored_leaf_bytes: 0,
37            metadata_bytes: 0,
38            fragmented_bytes: 0,
39        })
40    }
41}
42
43fn multimap_stats_helper(
44    page_number: PageNumber,
45    mem: &TransactionalMemory,
46    fixed_key_size: Option<usize>,
47    fixed_value_size: Option<usize>,
48) -> Result<BtreeStats> {
49    let page = mem.get_page(page_number)?;
50    let node_mem = page.memory();
51    match node_mem[0] {
52        LEAF => {
53            let accessor = LeafAccessor::new(
54                page.memory(),
55                fixed_key_size,
56                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
57            );
58            let mut leaf_bytes = 0u64;
59            let mut is_branch = false;
60            for i in 0..accessor.num_pairs() {
61                let entry = accessor.entry(i).unwrap();
62                let collection: &UntypedDynamicCollection =
63                    UntypedDynamicCollection::new(entry.value());
64                match collection.collection_type() {
65                    Inline => {
66                        let inline_accessor = LeafAccessor::new(
67                            collection.as_inline(),
68                            fixed_value_size,
69                            <() as Value>::fixed_width(),
70                        );
71                        leaf_bytes +=
72                            inline_accessor.length_of_pairs(0, inline_accessor.num_pairs()) as u64;
73                    }
74                    SubtreeV2 => {
75                        is_branch = true;
76                    }
77                }
78            }
79            let mut overhead_bytes = (accessor.total_length() as u64) - leaf_bytes;
80            let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
81            let mut max_child_height = 0;
82            let (mut leaf_pages, mut branch_pages) = if is_branch { (0, 1) } else { (1, 0) };
83
84            for i in 0..accessor.num_pairs() {
85                let entry = accessor.entry(i).unwrap();
86                let collection: &UntypedDynamicCollection =
87                    UntypedDynamicCollection::new(entry.value());
88                match collection.collection_type() {
89                    Inline => {
90                        // data is inline, so it was already counted above
91                    }
92                    SubtreeV2 => {
93                        // this is a sub-tree, so traverse it
94                        let stats = btree_stats(
95                            Some(collection.as_subtree().root),
96                            mem,
97                            fixed_value_size,
98                            <() as Value>::fixed_width(),
99                        )?;
100                        max_child_height = max(max_child_height, stats.tree_height);
101                        branch_pages += stats.branch_pages;
102                        leaf_pages += stats.leaf_pages;
103                        fragmented_bytes += stats.fragmented_bytes;
104                        overhead_bytes += stats.metadata_bytes;
105                        leaf_bytes += stats.stored_leaf_bytes;
106                    }
107                }
108            }
109
110            Ok(BtreeStats {
111                tree_height: max_child_height + 1,
112                leaf_pages,
113                branch_pages,
114                stored_leaf_bytes: leaf_bytes,
115                metadata_bytes: overhead_bytes,
116                fragmented_bytes,
117            })
118        }
119        BRANCH => {
120            let accessor = BranchAccessor::new(&page, fixed_key_size);
121            let mut max_child_height = 0;
122            let mut leaf_pages = 0;
123            let mut branch_pages = 1;
124            let mut stored_leaf_bytes = 0;
125            let mut metadata_bytes = accessor.total_length() as u64;
126            let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
127            for i in 0..accessor.count_children() {
128                if let Some(child) = accessor.child_page(i) {
129                    let stats =
130                        multimap_stats_helper(child, mem, fixed_key_size, fixed_value_size)?;
131                    max_child_height = max(max_child_height, stats.tree_height);
132                    leaf_pages += stats.leaf_pages;
133                    branch_pages += stats.branch_pages;
134                    stored_leaf_bytes += stats.stored_leaf_bytes;
135                    metadata_bytes += stats.metadata_bytes;
136                    fragmented_bytes += stats.fragmented_bytes;
137                }
138            }
139
140            Ok(BtreeStats {
141                tree_height: max_child_height + 1,
142                leaf_pages,
143                branch_pages,
144                stored_leaf_bytes,
145                metadata_bytes,
146                fragmented_bytes,
147            })
148        }
149        _ => unreachable!(),
150    }
151}
152
153// Verify all the checksums in the tree, including any Dynamic collection subtrees
154pub(crate) fn verify_tree_and_subtree_checksums(
155    root: Option<BtreeHeader>,
156    key_size: Option<usize>,
157    value_size: Option<usize>,
158    mem: Arc<TransactionalMemory>,
159) -> Result<bool> {
160    if let Some(header) = root {
161        if !RawBtree::new(
162            Some(header),
163            key_size,
164            DynamicCollection::<()>::fixed_width_with(value_size),
165            mem.clone(),
166        )
167        .verify_checksum()?
168        {
169            return Ok(false);
170        }
171
172        let table_pages_iter = AllPageNumbersBtreeIter::new(
173            header.root,
174            key_size,
175            DynamicCollection::<()>::fixed_width_with(value_size),
176            mem.clone(),
177        )?;
178        for table_page in table_pages_iter {
179            let page = mem.get_page(table_page?)?;
180            let subtree_roots = parse_subtree_roots(&page, key_size, value_size);
181            for header in subtree_roots {
182                if !RawBtree::new(Some(header), value_size, <()>::fixed_width(), mem.clone())
183                    .verify_checksum()?
184                {
185                    return Ok(false);
186                }
187            }
188        }
189    }
190
191    Ok(true)
192}
193
194// Relocate all subtrees to lower index pages, if possible
195pub(crate) fn relocate_subtrees(
196    root: (PageNumber, Checksum),
197    key_size: Option<usize>,
198    value_size: Option<usize>,
199    mem: Arc<TransactionalMemory>,
200    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
201    relocation_map: &HashMap<PageNumber, PageNumber>,
202) -> Result<(PageNumber, Checksum)> {
203    let old_page = mem.get_page(root.0)?;
204    let mut new_page = if let Some(new_page_number) = relocation_map.get(&root.0) {
205        mem.get_page_mut(*new_page_number)?
206    } else {
207        return Ok(root);
208    };
209    let new_page_number = new_page.get_page_number();
210    new_page.memory_mut().copy_from_slice(old_page.memory());
211
212    match old_page.memory()[0] {
213        LEAF => {
214            let accessor = LeafAccessor::new(
215                old_page.memory(),
216                key_size,
217                UntypedDynamicCollection::fixed_width_with(value_size),
218            );
219            // TODO: maybe there's a better abstraction, so that we don't need to call into this low-level method?
220            let mut mutator = LeafMutator::new(
221                &mut new_page,
222                key_size,
223                UntypedDynamicCollection::fixed_width_with(value_size),
224            );
225            for i in 0..accessor.num_pairs() {
226                let entry = accessor.entry(i).unwrap();
227                let collection = UntypedDynamicCollection::from_bytes(entry.value());
228                if matches!(collection.collection_type(), SubtreeV2) {
229                    let sub_root = collection.as_subtree();
230                    let mut tree = UntypedBtreeMut::new(
231                        Some(sub_root),
232                        mem.clone(),
233                        freed_pages.clone(),
234                        value_size,
235                        <() as Value>::fixed_width(),
236                    );
237                    tree.relocate(relocation_map)?;
238                    if sub_root != tree.get_root().unwrap() {
239                        let new_collection =
240                            UntypedDynamicCollection::make_subtree_data(tree.get_root().unwrap());
241                        mutator.insert(i, true, entry.key(), &new_collection);
242                    }
243                }
244            }
245        }
246        BRANCH => {
247            let accessor = BranchAccessor::new(&old_page, key_size);
248            let mut mutator = BranchMutator::new(&mut new_page);
249            for i in 0..accessor.count_children() {
250                if let Some(child) = accessor.child_page(i) {
251                    let child_checksum = accessor.child_checksum(i).unwrap();
252                    let (new_child, new_checksum) = relocate_subtrees(
253                        (child, child_checksum),
254                        key_size,
255                        value_size,
256                        mem.clone(),
257                        freed_pages.clone(),
258                        relocation_map,
259                    )?;
260                    mutator.write_child_page(i, new_child, new_checksum);
261                }
262            }
263        }
264        _ => unreachable!(),
265    }
266
267    let old_page_number = old_page.get_page_number();
268    drop(old_page);
269    // No need to track allocations, because this method is only called during compaction when
270    // there can't be any savepoints
271    let mut ignore = PageTrackerPolicy::Ignore;
272    if !mem.free_if_uncommitted(old_page_number, &mut ignore) {
273        freed_pages.lock().unwrap().push(old_page_number);
274    }
275    Ok((new_page_number, DEFERRED))
276}
277
278// Finalize all the checksums in the tree, including any Dynamic collection subtrees
279// Returns the root checksum
280pub(crate) fn finalize_tree_and_subtree_checksums(
281    root: Option<BtreeHeader>,
282    key_size: Option<usize>,
283    value_size: Option<usize>,
284    mem: Arc<TransactionalMemory>,
285) -> Result<Option<BtreeHeader>> {
286    let freed_pages = Arc::new(Mutex::new(vec![]));
287    let mut tree = UntypedBtreeMut::new(
288        root,
289        mem.clone(),
290        freed_pages.clone(),
291        key_size,
292        DynamicCollection::<()>::fixed_width_with(value_size),
293    );
294    tree.dirty_leaf_visitor(|mut leaf_page| {
295        let mut sub_root_updates = vec![];
296        let accessor = LeafAccessor::new(
297            leaf_page.memory(),
298            key_size,
299            DynamicCollection::<()>::fixed_width_with(value_size),
300        );
301        for i in 0..accessor.num_pairs() {
302            let entry = accessor.entry(i).unwrap();
303            let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
304            if matches!(collection.collection_type(), SubtreeV2) {
305                let sub_root = collection.as_subtree();
306                if mem.uncommitted(sub_root.root) {
307                    let mut subtree = UntypedBtreeMut::new(
308                        Some(sub_root),
309                        mem.clone(),
310                        freed_pages.clone(),
311                        value_size,
312                        <()>::fixed_width(),
313                    );
314                    let subtree_root = subtree.finalize_dirty_checksums()?.unwrap();
315                    sub_root_updates.push((i, entry.key().to_vec(), subtree_root));
316                }
317            }
318        }
319        // TODO: maybe there's a better abstraction, so that we don't need to call into this low-level method?
320        let mut mutator = LeafMutator::new(
321            &mut leaf_page,
322            key_size,
323            DynamicCollection::<()>::fixed_width_with(value_size),
324        );
325        for (i, key, sub_root) in sub_root_updates {
326            let collection = DynamicCollection::<()>::make_subtree_data(sub_root);
327            mutator.insert(i, true, &key, &collection);
328        }
329
330        Ok(())
331    })?;
332
333    let root = tree.finalize_dirty_checksums()?;
334    // No pages should have been freed by this operation
335    assert!(freed_pages.lock().unwrap().is_empty());
336    Ok(root)
337}
338
339fn parse_subtree_roots<T: Page>(
340    page: &T,
341    fixed_key_size: Option<usize>,
342    fixed_value_size: Option<usize>,
343) -> Vec<BtreeHeader> {
344    match page.memory()[0] {
345        BRANCH => {
346            vec![]
347        }
348        LEAF => {
349            let mut result = vec![];
350            let accessor = LeafAccessor::new(
351                page.memory(),
352                fixed_key_size,
353                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
354            );
355            for i in 0..accessor.num_pairs() {
356                let entry = accessor.entry(i).unwrap();
357                let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
358                if matches!(collection.collection_type(), SubtreeV2) {
359                    result.push(collection.as_subtree());
360                }
361            }
362
363            result
364        }
365        _ => unreachable!(),
366    }
367}
368
369pub(crate) struct UntypedMultiBtree {
370    mem: Arc<TransactionalMemory>,
371    root: Option<BtreeHeader>,
372    key_width: Option<usize>,
373    value_width: Option<usize>,
374}
375
376impl UntypedMultiBtree {
377    pub(crate) fn new(
378        root: Option<BtreeHeader>,
379        mem: Arc<TransactionalMemory>,
380        key_width: Option<usize>,
381        value_width: Option<usize>,
382    ) -> Self {
383        Self {
384            mem,
385            root,
386            key_width,
387            value_width,
388        }
389    }
390
391    // Applies visitor to pages in the tree
392    pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
393    where
394        F: FnMut(&PagePath) -> Result,
395    {
396        let tree = UntypedBtree::new(
397            self.root,
398            self.mem.clone(),
399            self.key_width,
400            UntypedDynamicCollection::fixed_width_with(self.value_width),
401        );
402        tree.visit_all_pages(|path| {
403            visitor(path)?;
404            let page = self.mem.get_page(path.page_number())?;
405            match page.memory()[0] {
406                LEAF => {
407                    for header in parse_subtree_roots(&page, self.key_width, self.value_width) {
408                        let subtree = UntypedBtree::new(
409                            Some(header),
410                            self.mem.clone(),
411                            self.value_width,
412                            <() as Value>::fixed_width(),
413                        );
414                        subtree.visit_all_pages(|subpath| {
415                            let full_path = path.with_subpath(subpath);
416                            visitor(&full_path)
417                        })?;
418                    }
419                }
420                BRANCH => {
421                    // No-op. The tree.visit_pages() call will process this sub-tree
422                }
423                _ => unreachable!(),
424            }
425            Ok(())
426        })?;
427
428        Ok(())
429    }
430}
431
432pub(crate) struct LeafKeyIter<'a, V: Key + 'static> {
433    inline_collection: AccessGuard<'a, &'static DynamicCollection<V>>,
434    fixed_key_size: Option<usize>,
435    fixed_value_size: Option<usize>,
436    start_entry: isize, // inclusive
437    end_entry: isize,   // inclusive
438}
439
440impl<'a, V: Key> LeafKeyIter<'a, V> {
441    fn new(
442        data: AccessGuard<'a, &'static DynamicCollection<V>>,
443        fixed_key_size: Option<usize>,
444        fixed_value_size: Option<usize>,
445    ) -> Self {
446        let accessor =
447            LeafAccessor::new(data.value().as_inline(), fixed_key_size, fixed_value_size);
448        let end_entry = isize::try_from(accessor.num_pairs()).unwrap() - 1;
449        Self {
450            inline_collection: data,
451            fixed_key_size,
452            fixed_value_size,
453            start_entry: 0,
454            end_entry,
455        }
456    }
457
458    fn next_key(&mut self) -> Option<&[u8]> {
459        if self.end_entry < self.start_entry {
460            return None;
461        }
462        let accessor = LeafAccessor::new(
463            self.inline_collection.value().as_inline(),
464            self.fixed_key_size,
465            self.fixed_value_size,
466        );
467        self.start_entry += 1;
468        accessor
469            .entry((self.start_entry - 1).try_into().unwrap())
470            .map(|e| e.key())
471    }
472
473    fn next_key_back(&mut self) -> Option<&[u8]> {
474        if self.end_entry < self.start_entry {
475            return None;
476        }
477        let accessor = LeafAccessor::new(
478            self.inline_collection.value().as_inline(),
479            self.fixed_key_size,
480            self.fixed_value_size,
481        );
482        self.end_entry -= 1;
483        accessor
484            .entry((self.end_entry + 1).try_into().unwrap())
485            .map(|e| e.key())
486    }
487}
488
489enum DynamicCollectionType {
490    Inline,
491    // Was used in file format version 1
492    // Subtree,
493    SubtreeV2,
494}
495
496impl From<u8> for DynamicCollectionType {
497    fn from(value: u8) -> Self {
498        match value {
499            LEAF => Inline,
500            // 2 => Subtree,
501            3 => SubtreeV2,
502            _ => unreachable!(),
503        }
504    }
505}
506
507#[allow(clippy::from_over_into)]
508impl Into<u8> for DynamicCollectionType {
509    fn into(self) -> u8 {
510        match self {
511            // Reuse the LEAF type id, so that we can cast this directly into the format used by
512            // LeafAccessor
513            Inline => LEAF,
514            // Subtree => 2,
515            SubtreeV2 => 3,
516        }
517    }
518}
519
/// Layout:
/// type (1 byte):
/// * 1 = inline data (the same id as the `LEAF` page type)
/// * 2 = sub tree, as used in file format version 1 (no longer accepted when decoding)
/// * 3 = sub tree (v2)
///
/// (when type = 1) data (n bytes): inlined leaf node
///
/// (when type = 3) the type byte is followed by a serialized `BtreeHeader`, starting with:
/// (when type = 3) root (8 bytes): sub tree root page number
/// (when type = 3) checksum (16 bytes): sub tree checksum
/// (when type = 3) number of values (8 bytes): little-endian u64, read by `get_num_values`
/// directly after the root and checksum (presumably the header's length field)
///
/// NOTE: Even though the [`PhantomData`] is zero-sized, the inner data DST must be placed last.
/// See [Exotically Sized Types](https://doc.rust-lang.org/nomicon/exotic-sizes.html#dynamically-sized-types-dsts)
/// section of the Rustonomicon for more details.
#[repr(transparent)]
struct DynamicCollection<V: Key> {
    // Marker for the value type; carries no data
    _value_type: PhantomData<V>,
    // Raw serialized collection: the type byte followed by the payload described above
    data: [u8],
}
538
539impl<V: Key> std::fmt::Debug for DynamicCollection<V> {
540    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
541        f.debug_struct("DynamicCollection")
542            .field("data", &&self.data)
543            .finish()
544    }
545}
546
547impl<V: Key> Value for &DynamicCollection<V> {
548    type SelfType<'a>
549        = &'a DynamicCollection<V>
550    where
551        Self: 'a;
552    type AsBytes<'a>
553        = &'a [u8]
554    where
555        Self: 'a;
556
557    fn fixed_width() -> Option<usize> {
558        None
559    }
560
561    fn from_bytes<'a>(data: &'a [u8]) -> &'a DynamicCollection<V>
562    where
563        Self: 'a,
564    {
565        DynamicCollection::new(data)
566    }
567
568    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'a [u8]
569    where
570        Self: 'b,
571    {
572        &value.data
573    }
574
575    fn type_name() -> TypeName {
576        TypeName::internal("redb::DynamicCollection")
577    }
578}
579
580impl<V: Key> DynamicCollection<V> {
581    fn new(data: &[u8]) -> &Self {
582        unsafe { &*(std::ptr::from_ref::<[u8]>(data) as *const DynamicCollection<V>) }
583    }
584
585    fn collection_type(&self) -> DynamicCollectionType {
586        DynamicCollectionType::from(self.data[0])
587    }
588
589    fn as_inline(&self) -> &[u8] {
590        debug_assert!(matches!(self.collection_type(), Inline));
591        &self.data[1..]
592    }
593
594    fn as_subtree(&self) -> BtreeHeader {
595        assert!(matches!(self.collection_type(), SubtreeV2));
596        BtreeHeader::from_le_bytes(
597            self.data[1..=BtreeHeader::serialized_size()]
598                .try_into()
599                .unwrap(),
600        )
601    }
602
603    fn get_num_values(&self) -> u64 {
604        match self.collection_type() {
605            Inline => {
606                let leaf_data = self.as_inline();
607                let accessor =
608                    LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width());
609                accessor.num_pairs() as u64
610            }
611            SubtreeV2 => {
612                let offset = 1 + PageNumber::serialized_size() + size_of::<Checksum>();
613                u64::from_le_bytes(
614                    self.data[offset..(offset + size_of::<u64>())]
615                        .try_into()
616                        .unwrap(),
617                )
618            }
619        }
620    }
621
622    fn make_inline_data(data: &[u8]) -> Vec<u8> {
623        let mut result = vec![Inline.into()];
624        result.extend_from_slice(data);
625
626        result
627    }
628
629    fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
630        let mut result = vec![SubtreeV2.into()];
631        result.extend_from_slice(&header.to_le_bytes());
632        result
633    }
634
635    pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
636        None
637    }
638}
639
640impl<V: Key> DynamicCollection<V> {
641    fn iter<'a>(
642        collection: AccessGuard<'a, &'static DynamicCollection<V>>,
643        guard: Arc<TransactionGuard>,
644        mem: Arc<TransactionalMemory>,
645    ) -> Result<MultimapValue<'a, V>> {
646        Ok(match collection.value().collection_type() {
647            Inline => {
648                let leaf_iter =
649                    LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width());
650                MultimapValue::new_inline(leaf_iter, guard)
651            }
652            SubtreeV2 => {
653                let root = collection.value().as_subtree().root;
654                MultimapValue::new_subtree(
655                    BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), Some(root), mem)?,
656                    collection.value().get_num_values(),
657                    guard,
658                )
659            }
660        })
661    }
662
663    fn iter_free_on_drop<'a>(
664        collection: AccessGuard<'a, &'static DynamicCollection<V>>,
665        pages: Vec<PageNumber>,
666        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
667        allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
668        guard: Arc<TransactionGuard>,
669        mem: Arc<TransactionalMemory>,
670    ) -> Result<MultimapValue<'a, V>> {
671        let num_values = collection.value().get_num_values();
672        Ok(match collection.value().collection_type() {
673            Inline => {
674                let leaf_iter =
675                    LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width());
676                MultimapValue::new_inline(leaf_iter, guard)
677            }
678            SubtreeV2 => {
679                let root = collection.value().as_subtree().root;
680                let inner = BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
681                    &(..),
682                    Some(root),
683                    mem.clone(),
684                )?;
685                MultimapValue::new_subtree_free_on_drop(
686                    inner,
687                    num_values,
688                    freed_pages,
689                    allocated_pages,
690                    pages,
691                    guard,
692                    mem,
693                )
694            }
695        })
696    }
697}
698
699#[repr(transparent)]
700struct UntypedDynamicCollection {
701    data: [u8],
702}
703
704impl UntypedDynamicCollection {
705    pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
706        None
707    }
708
709    fn new(data: &[u8]) -> &Self {
710        unsafe { &*(std::ptr::from_ref::<[u8]>(data) as *const UntypedDynamicCollection) }
711    }
712
713    fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
714        let mut result = vec![SubtreeV2.into()];
715        result.extend_from_slice(&header.to_le_bytes());
716        result
717    }
718
719    fn from_bytes(data: &[u8]) -> &Self {
720        Self::new(data)
721    }
722
723    fn collection_type(&self) -> DynamicCollectionType {
724        DynamicCollectionType::from(self.data[0])
725    }
726
727    fn as_inline(&self) -> &[u8] {
728        debug_assert!(matches!(self.collection_type(), Inline));
729        &self.data[1..]
730    }
731
732    fn as_subtree(&self) -> BtreeHeader {
733        assert!(matches!(self.collection_type(), SubtreeV2));
734        BtreeHeader::from_le_bytes(
735            self.data[1..=BtreeHeader::serialized_size()]
736                .try_into()
737                .unwrap(),
738        )
739    }
740}
741
// Where a `MultimapValue` iterator reads its values from
enum ValueIterState<'a, V: Key + 'static> {
    // The values are the keys of a separate btree, walked with a range iterator
    Subtree(BtreeRangeIter<V, ()>),
    // The values are the keys of a leaf node stored inline in the collection
    InlineLeaf(LeafKeyIter<'a, V>),
}
746
747pub struct MultimapValue<'a, V: Key + 'static> {
748    inner: Option<ValueIterState<'a, V>>,
749    remaining: u64,
750    freed_pages: Option<Arc<Mutex<Vec<PageNumber>>>>,
751    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
752    free_on_drop: Vec<PageNumber>,
753    _transaction_guard: Arc<TransactionGuard>,
754    mem: Option<Arc<TransactionalMemory>>,
755    _value_type: PhantomData<V>,
756}
757
758impl<'a, V: Key + 'static> MultimapValue<'a, V> {
759    fn new_subtree(
760        inner: BtreeRangeIter<V, ()>,
761        num_values: u64,
762        guard: Arc<TransactionGuard>,
763    ) -> Self {
764        Self {
765            inner: Some(ValueIterState::Subtree(inner)),
766            remaining: num_values,
767            freed_pages: None,
768            allocated_pages: Arc::new(Mutex::new(PageTrackerPolicy::Closed)),
769            free_on_drop: vec![],
770            _transaction_guard: guard,
771            mem: None,
772            _value_type: Default::default(),
773        }
774    }
775
776    fn new_subtree_free_on_drop(
777        inner: BtreeRangeIter<V, ()>,
778        num_values: u64,
779        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
780        allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
781        pages: Vec<PageNumber>,
782        guard: Arc<TransactionGuard>,
783        mem: Arc<TransactionalMemory>,
784    ) -> Self {
785        Self {
786            inner: Some(ValueIterState::Subtree(inner)),
787            remaining: num_values,
788            freed_pages: Some(freed_pages),
789            free_on_drop: pages,
790            allocated_pages,
791            _transaction_guard: guard,
792            mem: Some(mem),
793            _value_type: Default::default(),
794        }
795    }
796
797    fn new_inline(inner: LeafKeyIter<'a, V>, guard: Arc<TransactionGuard>) -> Self {
798        let remaining = inner.inline_collection.value().get_num_values();
799        Self {
800            inner: Some(ValueIterState::InlineLeaf(inner)),
801            remaining,
802            freed_pages: None,
803            allocated_pages: Arc::new(Mutex::new(PageTrackerPolicy::Closed)),
804            free_on_drop: vec![],
805            _transaction_guard: guard,
806            mem: None,
807            _value_type: Default::default(),
808        }
809    }
810
811    /// Returns the number of times this iterator will return `Some(Ok(_))`
812    ///
813    /// Note that `Some` may be returned from `next()` more than `len()` times if `Some(Err(_))` is returned
814    pub fn len(&self) -> u64 {
815        self.remaining
816    }
817
818    pub fn is_empty(&self) -> bool {
819        self.len() == 0
820    }
821}
822
823impl<'a, V: Key + 'static> Iterator for MultimapValue<'a, V> {
824    type Item = Result<AccessGuard<'a, V>>;
825
826    fn next(&mut self) -> Option<Self::Item> {
827        // TODO: optimize out this copy
828        let bytes = match self.inner.as_mut().unwrap() {
829            ValueIterState::Subtree(iter) => match iter.next()? {
830                Ok(e) => e.key_data(),
831                Err(err) => {
832                    return Some(Err(err));
833                }
834            },
835            ValueIterState::InlineLeaf(iter) => iter.next_key()?.to_vec(),
836        };
837        self.remaining -= 1;
838        Some(Ok(AccessGuard::with_owned_value(bytes)))
839    }
840}
841
842impl<V: Key + 'static> DoubleEndedIterator for MultimapValue<'_, V> {
843    fn next_back(&mut self) -> Option<Self::Item> {
844        // TODO: optimize out this copy
845        let bytes = match self.inner.as_mut().unwrap() {
846            ValueIterState::Subtree(iter) => match iter.next_back()? {
847                Ok(e) => e.key_data(),
848                Err(err) => {
849                    return Some(Err(err));
850                }
851            },
852            ValueIterState::InlineLeaf(iter) => iter.next_key_back()?.to_vec(),
853        };
854        Some(Ok(AccessGuard::with_owned_value(bytes)))
855    }
856}
857
858impl<V: Key + 'static> Drop for MultimapValue<'_, V> {
859    fn drop(&mut self) {
860        // Drop our references to the pages that are about to be freed
861        drop(mem::take(&mut self.inner));
862        if !self.free_on_drop.is_empty() {
863            let mut freed_pages = self.freed_pages.as_ref().unwrap().lock().unwrap();
864            let mut allocated_pages = self.allocated_pages.lock().unwrap();
865            for page in &self.free_on_drop {
866                if !self
867                    .mem
868                    .as_ref()
869                    .unwrap()
870                    .free_if_uncommitted(*page, &mut allocated_pages)
871                {
872                    freed_pages.push(*page);
873                }
874            }
875        }
876    }
877}
878
879pub struct MultimapRange<'a, K: Key + 'static, V: Key + 'static> {
880    inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
881    mem: Arc<TransactionalMemory>,
882    transaction_guard: Arc<TransactionGuard>,
883    _key_type: PhantomData<K>,
884    _value_type: PhantomData<V>,
885    _lifetime: PhantomData<&'a ()>,
886}
887
888impl<K: Key + 'static, V: Key + 'static> MultimapRange<'_, K, V> {
889    fn new(
890        inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
891        guard: Arc<TransactionGuard>,
892        mem: Arc<TransactionalMemory>,
893    ) -> Self {
894        Self {
895            inner,
896            mem,
897            transaction_guard: guard,
898            _key_type: Default::default(),
899            _value_type: Default::default(),
900            _lifetime: Default::default(),
901        }
902    }
903}
904
905impl<'a, K: Key + 'static, V: Key + 'static> Iterator for MultimapRange<'a, K, V> {
906    type Item = Result<(AccessGuard<'a, K>, MultimapValue<'a, V>)>;
907
908    fn next(&mut self) -> Option<Self::Item> {
909        match self.inner.next()? {
910            Ok(entry) => {
911                let key = AccessGuard::with_owned_value(entry.key_data());
912                let (page, _, value_range) = entry.into_raw();
913                let collection = AccessGuard::with_page(page, value_range);
914                Some(
915                    DynamicCollection::iter(
916                        collection,
917                        self.transaction_guard.clone(),
918                        self.mem.clone(),
919                    )
920                    .map(|iter| (key, iter)),
921                )
922            }
923            Err(err) => Some(Err(err)),
924        }
925    }
926}
927
928impl<K: Key + 'static, V: Key + 'static> DoubleEndedIterator for MultimapRange<'_, K, V> {
929    fn next_back(&mut self) -> Option<Self::Item> {
930        match self.inner.next_back()? {
931            Ok(entry) => {
932                let key = AccessGuard::with_owned_value(entry.key_data());
933                let (page, _, value_range) = entry.into_raw();
934                let collection = AccessGuard::with_page(page, value_range);
935                Some(
936                    DynamicCollection::iter(
937                        collection,
938                        self.transaction_guard.clone(),
939                        self.mem.clone(),
940                    )
941                    .map(|iter| (key, iter)),
942                )
943            }
944            Err(err) => Some(Err(err)),
945        }
946    }
947}
948
949/// A multimap table
950///
951/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
952pub struct MultimapTable<'txn, K: Key + 'static, V: Key + 'static> {
953    name: String,
954    num_values: u64,
955    transaction: &'txn WriteTransaction,
956    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
957    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
958    tree: BtreeMut<'txn, K, &'static DynamicCollection<V>>,
959    mem: Arc<TransactionalMemory>,
960    _value_type: PhantomData<V>,
961}
962
963impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTable<'_, K, V> {
964    fn name(&self) -> &str {
965        &self.name
966    }
967}
968
969impl<'txn, K: Key + 'static, V: Key + 'static> MultimapTable<'txn, K, V> {
970    pub(crate) fn new(
971        name: &str,
972        table_root: Option<BtreeHeader>,
973        num_values: u64,
974        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
975        allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
976        mem: Arc<TransactionalMemory>,
977        transaction: &'txn WriteTransaction,
978    ) -> MultimapTable<'txn, K, V> {
979        MultimapTable {
980            name: name.to_string(),
981            num_values,
982            transaction,
983            freed_pages: freed_pages.clone(),
984            allocated_pages: allocated_pages.clone(),
985            tree: BtreeMut::new(
986                table_root,
987                transaction.transaction_guard(),
988                mem.clone(),
989                freed_pages,
990                allocated_pages,
991            ),
992            mem,
993            _value_type: Default::default(),
994        }
995    }
996
997    #[allow(dead_code)]
998    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
999        self.tree.print_debug(include_values)
1000    }
1001
1002    /// Add the given value to the mapping of the key
1003    ///
1004    /// Returns `true` if the key-value pair was present
1005    pub fn insert<'k, 'v>(
1006        &mut self,
1007        key: impl Borrow<K::SelfType<'k>>,
1008        value: impl Borrow<V::SelfType<'v>>,
1009    ) -> Result<bool> {
1010        let value_bytes = V::as_bytes(value.borrow());
1011        let value_bytes_ref = value_bytes.as_ref();
1012        if value_bytes_ref.len() > MAX_VALUE_LENGTH {
1013            return Err(StorageError::ValueTooLarge(value_bytes_ref.len()));
1014        }
1015        let key_len = K::as_bytes(key.borrow()).as_ref().len();
1016        if key_len > MAX_VALUE_LENGTH {
1017            return Err(StorageError::ValueTooLarge(key_len));
1018        }
1019        if value_bytes_ref.len() + key_len > MAX_PAIR_LENGTH {
1020            return Err(StorageError::ValueTooLarge(value_bytes_ref.len() + key_len));
1021        }
1022        let get_result = self.tree.get(key.borrow())?;
1023        let existed = if get_result.is_some() {
1024            #[allow(clippy::unnecessary_unwrap)]
1025            let guard = get_result.unwrap();
1026            let collection_type = guard.value().collection_type();
1027            match collection_type {
1028                Inline => {
1029                    let leaf_data = guard.value().as_inline();
1030                    let accessor = LeafAccessor::new(
1031                        leaf_data,
1032                        V::fixed_width(),
1033                        <() as Value>::fixed_width(),
1034                    );
1035                    let (position, found) = accessor.position::<V>(value_bytes_ref);
1036                    if found {
1037                        return Ok(true);
1038                    }
1039
1040                    let num_pairs = accessor.num_pairs();
1041                    let new_pairs = num_pairs + 1;
1042                    let new_pair_bytes =
1043                        accessor.length_of_pairs(0, accessor.num_pairs()) + value_bytes_ref.len();
1044                    let new_key_bytes =
1045                        accessor.length_of_keys(0, accessor.num_pairs()) + value_bytes_ref.len();
1046                    let required_inline_bytes = RawLeafBuilder::required_bytes(
1047                        new_pairs,
1048                        new_pair_bytes,
1049                        V::fixed_width(),
1050                        <() as Value>::fixed_width(),
1051                    );
1052
1053                    if required_inline_bytes < self.mem.get_page_size() / 2 {
1054                        let mut data = vec![0; required_inline_bytes];
1055                        let mut builder = RawLeafBuilder::new(
1056                            &mut data,
1057                            new_pairs,
1058                            V::fixed_width(),
1059                            <() as Value>::fixed_width(),
1060                            new_key_bytes,
1061                        );
1062                        for i in 0..accessor.num_pairs() {
1063                            if i == position {
1064                                builder
1065                                    .append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1066                            }
1067                            let entry = accessor.entry(i).unwrap();
1068                            builder.append(entry.key(), entry.value());
1069                        }
1070                        if position == accessor.num_pairs() {
1071                            builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1072                        }
1073                        drop(builder);
1074                        drop(guard);
1075                        let inline_data = DynamicCollection::<V>::make_inline_data(&data);
1076                        self.tree
1077                            .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1078                    } else {
1079                        // convert into a subtree
1080                        let mut allocated = self.allocated_pages.lock().unwrap();
1081                        let mut page = self.mem.allocate(leaf_data.len(), &mut allocated)?;
1082                        drop(allocated);
1083                        page.memory_mut()[..leaf_data.len()].copy_from_slice(leaf_data);
1084                        let page_number = page.get_page_number();
1085                        drop(page);
1086                        drop(guard);
1087
1088                        // Don't bother computing the checksum, since we're about to modify the tree
1089                        let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1090                            Some(BtreeHeader::new(page_number, 0, num_pairs as u64)),
1091                            self.transaction.transaction_guard(),
1092                            self.mem.clone(),
1093                            self.freed_pages.clone(),
1094                            self.allocated_pages.clone(),
1095                        );
1096                        let existed = subtree.insert(value.borrow(), &())?.is_some();
1097                        assert_eq!(existed, found);
1098                        let subtree_data =
1099                            DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1100                        self.tree
1101                            .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1102                    }
1103
1104                    found
1105                }
1106                SubtreeV2 => {
1107                    let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1108                        Some(guard.value().as_subtree()),
1109                        self.transaction.transaction_guard(),
1110                        self.mem.clone(),
1111                        self.freed_pages.clone(),
1112                        self.allocated_pages.clone(),
1113                    );
1114                    drop(guard);
1115                    let existed = subtree.insert(value.borrow(), &())?.is_some();
1116                    let subtree_data =
1117                        DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1118                    self.tree
1119                        .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1120
1121                    existed
1122                }
1123            }
1124        } else {
1125            drop(get_result);
1126            let required_inline_bytes = RawLeafBuilder::required_bytes(
1127                1,
1128                value_bytes_ref.len(),
1129                V::fixed_width(),
1130                <() as Value>::fixed_width(),
1131            );
1132            if required_inline_bytes < self.mem.get_page_size() / 2 {
1133                let mut data = vec![0; required_inline_bytes];
1134                let mut builder = RawLeafBuilder::new(
1135                    &mut data,
1136                    1,
1137                    V::fixed_width(),
1138                    <() as Value>::fixed_width(),
1139                    value_bytes_ref.len(),
1140                );
1141                builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
1142                drop(builder);
1143                let inline_data = DynamicCollection::<V>::make_inline_data(&data);
1144                self.tree
1145                    .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1146            } else {
1147                let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
1148                    None,
1149                    self.transaction.transaction_guard(),
1150                    self.mem.clone(),
1151                    self.freed_pages.clone(),
1152                    self.allocated_pages.clone(),
1153                );
1154                subtree.insert(value.borrow(), &())?;
1155                let subtree_data =
1156                    DynamicCollection::<V>::make_subtree_data(subtree.get_root().unwrap());
1157                self.tree
1158                    .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1159            }
1160            false
1161        };
1162
1163        if !existed {
1164            self.num_values += 1;
1165        }
1166
1167        Ok(existed)
1168    }
1169
1170    /// Removes the given key-value pair
1171    ///
1172    /// Returns `true` if the key-value pair was present
1173    pub fn remove<'k, 'v>(
1174        &mut self,
1175        key: impl Borrow<K::SelfType<'k>>,
1176        value: impl Borrow<V::SelfType<'v>>,
1177    ) -> Result<bool> {
1178        let get_result = self.tree.get(key.borrow())?;
1179        if get_result.is_none() {
1180            return Ok(false);
1181        }
1182        let guard = get_result.unwrap();
1183        let v = guard.value();
1184        let existed = match v.collection_type() {
1185            Inline => {
1186                let leaf_data = v.as_inline();
1187                let accessor =
1188                    LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width());
1189                if let Some(position) = accessor.find_key::<V>(V::as_bytes(value.borrow()).as_ref())
1190                {
1191                    let old_num_pairs = accessor.num_pairs();
1192                    if old_num_pairs == 1 {
1193                        drop(guard);
1194                        self.tree.remove(key.borrow())?;
1195                    } else {
1196                        let old_pairs_len = accessor.length_of_pairs(0, old_num_pairs);
1197                        let removed_value_len = accessor.entry(position).unwrap().key().len();
1198                        let required = RawLeafBuilder::required_bytes(
1199                            old_num_pairs - 1,
1200                            old_pairs_len - removed_value_len,
1201                            V::fixed_width(),
1202                            <() as Value>::fixed_width(),
1203                        );
1204                        let mut new_data = vec![0; required];
1205                        let new_key_len =
1206                            accessor.length_of_keys(0, old_num_pairs) - removed_value_len;
1207                        let mut builder = RawLeafBuilder::new(
1208                            &mut new_data,
1209                            old_num_pairs - 1,
1210                            V::fixed_width(),
1211                            <() as Value>::fixed_width(),
1212                            new_key_len,
1213                        );
1214                        for i in 0..old_num_pairs {
1215                            if i != position {
1216                                let entry = accessor.entry(i).unwrap();
1217                                builder.append(entry.key(), entry.value());
1218                            }
1219                        }
1220                        drop(builder);
1221                        drop(guard);
1222
1223                        let inline_data = DynamicCollection::<V>::make_inline_data(&new_data);
1224                        self.tree
1225                            .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1226                    }
1227                    true
1228                } else {
1229                    drop(guard);
1230                    false
1231                }
1232            }
1233            SubtreeV2 => {
1234                let mut subtree: BtreeMut<V, ()> = BtreeMut::new(
1235                    Some(v.as_subtree()),
1236                    self.transaction.transaction_guard(),
1237                    self.mem.clone(),
1238                    self.freed_pages.clone(),
1239                    self.allocated_pages.clone(),
1240                );
1241                drop(guard);
1242                let existed = subtree.remove(value.borrow())?.is_some();
1243
1244                if let Some(BtreeHeader {
1245                    root: new_root,
1246                    checksum: new_checksum,
1247                    length: new_length,
1248                }) = subtree.get_root()
1249                {
1250                    let page = self.mem.get_page(new_root)?;
1251                    match page.memory()[0] {
1252                        LEAF => {
1253                            let accessor = LeafAccessor::new(
1254                                page.memory(),
1255                                V::fixed_width(),
1256                                <() as Value>::fixed_width(),
1257                            );
1258                            let len = accessor.total_length();
1259                            if len < self.mem.get_page_size() / 2 {
1260                                let inline_data =
1261                                    DynamicCollection::<V>::make_inline_data(&page.memory()[..len]);
1262                                self.tree
1263                                    .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1264                                drop(page);
1265                                let mut allocated_pages = self.allocated_pages.lock().unwrap();
1266                                if !self.mem.free_if_uncommitted(new_root, &mut allocated_pages) {
1267                                    (*self.freed_pages).lock().unwrap().push(new_root);
1268                                }
1269                            } else {
1270                                let subtree_data =
1271                                    DynamicCollection::<V>::make_subtree_data(BtreeHeader::new(
1272                                        new_root,
1273                                        new_checksum,
1274                                        accessor.num_pairs() as u64,
1275                                    ));
1276                                self.tree
1277                                    .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1278                            }
1279                        }
1280                        BRANCH => {
1281                            let subtree_data = DynamicCollection::<V>::make_subtree_data(
1282                                BtreeHeader::new(new_root, new_checksum, new_length),
1283                            );
1284                            self.tree
1285                                .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1286                        }
1287                        _ => unreachable!(),
1288                    }
1289                } else {
1290                    self.tree.remove(key.borrow())?;
1291                }
1292
1293                existed
1294            }
1295        };
1296
1297        if existed {
1298            self.num_values -= 1;
1299        }
1300
1301        Ok(existed)
1302    }
1303
1304    /// Removes all values for the given key
1305    ///
1306    /// Returns an iterator over the removed values. Values are in ascending order.
1307    pub fn remove_all<'a>(
1308        &mut self,
1309        key: impl Borrow<K::SelfType<'a>>,
1310    ) -> Result<MultimapValue<V>> {
1311        let iter = if let Some(collection) = self.tree.remove(key.borrow())? {
1312            let mut pages = vec![];
1313            if matches!(
1314                collection.value().collection_type(),
1315                DynamicCollectionType::SubtreeV2
1316            ) {
1317                let root = collection.value().as_subtree().root;
1318                let all_pages = AllPageNumbersBtreeIter::new(
1319                    root,
1320                    V::fixed_width(),
1321                    <() as Value>::fixed_width(),
1322                    self.mem.clone(),
1323                )?;
1324                for page in all_pages {
1325                    pages.push(page?);
1326                }
1327            }
1328
1329            self.num_values -= collection.value().get_num_values();
1330
1331            DynamicCollection::iter_free_on_drop(
1332                collection,
1333                pages,
1334                self.freed_pages.clone(),
1335                self.allocated_pages.clone(),
1336                self.transaction.transaction_guard(),
1337                self.mem.clone(),
1338            )?
1339        } else {
1340            MultimapValue::new_subtree(
1341                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1342                0,
1343                self.transaction.transaction_guard(),
1344            )
1345        };
1346
1347        Ok(iter)
1348    }
1349}
1350
1351impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for MultimapTable<'_, K, V> {
1352    fn stats(&self) -> Result<TableStats> {
1353        let tree_stats = multimap_btree_stats(
1354            self.tree.get_root().map(|x| x.root),
1355            &self.mem,
1356            K::fixed_width(),
1357            V::fixed_width(),
1358        )?;
1359
1360        Ok(TableStats {
1361            tree_height: tree_stats.tree_height,
1362            leaf_pages: tree_stats.leaf_pages,
1363            branch_pages: tree_stats.branch_pages,
1364            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1365            metadata_bytes: tree_stats.metadata_bytes,
1366            fragmented_bytes: tree_stats.fragmented_bytes,
1367        })
1368    }
1369
1370    /// Returns the number of key-value pairs in the table
1371    fn len(&self) -> Result<u64> {
1372        Ok(self.num_values)
1373    }
1374}
1375
1376impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V> for MultimapTable<'_, K, V> {
1377    /// Returns an iterator over all values for the given key. Values are in ascending order.
1378    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>> {
1379        let guard = self.transaction.transaction_guard();
1380        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1381            DynamicCollection::iter(collection, guard, self.mem.clone())?
1382        } else {
1383            MultimapValue::new_subtree(
1384                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1385                0,
1386                guard,
1387            )
1388        };
1389
1390        Ok(iter)
1391    }
1392
1393    /// Returns a double-ended iterator over a range of elements in the table
1394    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1395    where
1396        KR: Borrow<K::SelfType<'a>> + 'a,
1397    {
1398        let inner = self.tree.range(&range)?;
1399        Ok(MultimapRange::new(
1400            inner,
1401            self.transaction.transaction_guard(),
1402            self.mem.clone(),
1403        ))
1404    }
1405}
1406
1407impl<K: Key + 'static, V: Key + 'static> Sealed for MultimapTable<'_, K, V> {}
1408
1409impl<K: Key + 'static, V: Key + 'static> Drop for MultimapTable<'_, K, V> {
1410    fn drop(&mut self) {
1411        self.transaction
1412            .close_table(&self.name, &self.tree, self.num_values);
1413    }
1414}
1415
1416pub trait ReadableMultimapTable<K: Key + 'static, V: Key + 'static>: ReadableTableMetadata {
1417    /// Returns an iterator over all values for the given key. Values are in ascending order.
1418    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>;
1419
1420    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1421    where
1422        KR: Borrow<K::SelfType<'a>> + 'a;
1423
1424    /// Returns an double-ended iterator over all elements in the table. Values are in ascending
1425    /// order.
1426    fn iter(&self) -> Result<MultimapRange<K, V>> {
1427        self.range::<K::SelfType<'_>>(..)
1428    }
1429}
1430
1431/// A read-only untyped multimap table
1432pub struct ReadOnlyUntypedMultimapTable {
1433    num_values: u64,
1434    tree: RawBtree,
1435    fixed_key_size: Option<usize>,
1436    fixed_value_size: Option<usize>,
1437    mem: Arc<TransactionalMemory>,
1438}
1439
1440impl Sealed for ReadOnlyUntypedMultimapTable {}
1441
1442impl ReadableTableMetadata for ReadOnlyUntypedMultimapTable {
1443    /// Retrieves information about storage usage for the table
1444    fn stats(&self) -> Result<TableStats> {
1445        let tree_stats = multimap_btree_stats(
1446            self.tree.get_root().map(|x| x.root),
1447            &self.mem,
1448            self.fixed_key_size,
1449            self.fixed_value_size,
1450        )?;
1451
1452        Ok(TableStats {
1453            tree_height: tree_stats.tree_height,
1454            leaf_pages: tree_stats.leaf_pages,
1455            branch_pages: tree_stats.branch_pages,
1456            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1457            metadata_bytes: tree_stats.metadata_bytes,
1458            fragmented_bytes: tree_stats.fragmented_bytes,
1459        })
1460    }
1461
1462    fn len(&self) -> Result<u64> {
1463        Ok(self.num_values)
1464    }
1465}
1466
1467impl ReadOnlyUntypedMultimapTable {
1468    pub(crate) fn new(
1469        root: Option<BtreeHeader>,
1470        num_values: u64,
1471        fixed_key_size: Option<usize>,
1472        fixed_value_size: Option<usize>,
1473        mem: Arc<TransactionalMemory>,
1474    ) -> Self {
1475        Self {
1476            num_values,
1477            tree: RawBtree::new(
1478                root,
1479                fixed_key_size,
1480                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
1481                mem.clone(),
1482            ),
1483            fixed_key_size,
1484            fixed_value_size,
1485            mem,
1486        }
1487    }
1488}
1489
1490/// A read-only multimap table
1491pub struct ReadOnlyMultimapTable<K: Key + 'static, V: Key + 'static> {
1492    tree: Btree<K, &'static DynamicCollection<V>>,
1493    num_values: u64,
1494    mem: Arc<TransactionalMemory>,
1495    transaction_guard: Arc<TransactionGuard>,
1496    _value_type: PhantomData<V>,
1497}
1498
1499impl<K: Key + 'static, V: Key + 'static> ReadOnlyMultimapTable<K, V> {
1500    pub(crate) fn new(
1501        root: Option<BtreeHeader>,
1502        num_values: u64,
1503        hint: PageHint,
1504        guard: Arc<TransactionGuard>,
1505        mem: Arc<TransactionalMemory>,
1506    ) -> Result<ReadOnlyMultimapTable<K, V>> {
1507        Ok(ReadOnlyMultimapTable {
1508            tree: Btree::new(root, hint, guard.clone(), mem.clone())?,
1509            num_values,
1510            mem,
1511            transaction_guard: guard,
1512            _value_type: Default::default(),
1513        })
1514    }
1515
1516    /// This method is like [`ReadableMultimapTable::get()`], but the iterator is reference counted and keeps the transaction
1517    /// alive until it is dropped.
1518    pub fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'static, V>> {
1519        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1520            DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1521        } else {
1522            MultimapValue::new_subtree(
1523                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1524                0,
1525                self.transaction_guard.clone(),
1526            )
1527        };
1528
1529        Ok(iter)
1530    }
1531
1532    /// This method is like [`ReadableMultimapTable::range()`], but the iterator is reference counted and keeps the transaction
1533    /// alive until it is dropped.
1534    pub fn range<'a, KR>(&self, range: impl RangeBounds<KR>) -> Result<MultimapRange<'static, K, V>>
1535    where
1536        KR: Borrow<K::SelfType<'a>>,
1537    {
1538        let inner = self.tree.range(&range)?;
1539        Ok(MultimapRange::new(
1540            inner,
1541            self.transaction_guard.clone(),
1542            self.mem.clone(),
1543        ))
1544    }
1545}
1546
1547impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for ReadOnlyMultimapTable<K, V> {
1548    fn stats(&self) -> Result<TableStats> {
1549        let tree_stats = multimap_btree_stats(
1550            self.tree.get_root().map(|x| x.root),
1551            &self.mem,
1552            K::fixed_width(),
1553            V::fixed_width(),
1554        )?;
1555
1556        Ok(TableStats {
1557            tree_height: tree_stats.tree_height,
1558            leaf_pages: tree_stats.leaf_pages,
1559            branch_pages: tree_stats.branch_pages,
1560            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1561            metadata_bytes: tree_stats.metadata_bytes,
1562            fragmented_bytes: tree_stats.fragmented_bytes,
1563        })
1564    }
1565
1566    fn len(&self) -> Result<u64> {
1567        Ok(self.num_values)
1568    }
1569}
1570
1571impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V>
1572    for ReadOnlyMultimapTable<K, V>
1573{
1574    /// Returns an iterator over all values for the given key. Values are in ascending order.
1575    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>> {
1576        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1577            DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1578        } else {
1579            MultimapValue::new_subtree(
1580                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), None, self.mem.clone())?,
1581                0,
1582                self.transaction_guard.clone(),
1583            )
1584        };
1585
1586        Ok(iter)
1587    }
1588
1589    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1590    where
1591        KR: Borrow<K::SelfType<'a>> + 'a,
1592    {
1593        let inner = self.tree.range(&range)?;
1594        Ok(MultimapRange::new(
1595            inner,
1596            self.transaction_guard.clone(),
1597            self.mem.clone(),
1598        ))
1599    }
1600}
1601
1602impl<K: Key, V: Key> Sealed for ReadOnlyMultimapTable<K, V> {}