redb_32bit/
multimap_table.rs

1use crate::multimap_table::DynamicCollectionType::{Inline, Subtree};
2use crate::sealed::Sealed;
3use crate::table::TableStats;
4use crate::tree_store::{
5    btree_stats, AllPageNumbersBtreeIter, BranchAccessor, Btree, BtreeMut, BtreeRangeIter,
6    BtreeStats, CachePriority, Checksum, LeafAccessor, LeafMutator, Page, PageHint, PageNumber,
7    RawBtree, RawLeafBuilder, TransactionalMemory, UntypedBtreeMut, BRANCH, LEAF, MAX_VALUE_LENGTH,
8};
9use crate::types::{RedbKey, RedbValue, TypeName};
10use crate::{AccessGuard, MultimapTableHandle, Result, StorageError, WriteTransaction};
11use std::borrow::Borrow;
12use std::cmp::max;
13use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::mem;
16use std::mem::size_of;
17use std::ops::{RangeBounds, RangeFull};
18use std::sync::{Arc, Mutex};
19
20pub(crate) fn multimap_btree_stats(
21    root: Option<PageNumber>,
22    mem: &TransactionalMemory,
23    fixed_key_size: Option<usize>,
24    fixed_value_size: Option<usize>,
25) -> Result<BtreeStats> {
26    if let Some(root) = root {
27        multimap_stats_helper(root, mem, fixed_key_size, fixed_value_size)
28    } else {
29        Ok(BtreeStats {
30            tree_height: 0,
31            leaf_pages: 0,
32            branch_pages: 0,
33            stored_leaf_bytes: 0,
34            metadata_bytes: 0,
35            fragmented_bytes: 0,
36        })
37    }
38}
39
40fn multimap_stats_helper(
41    page_number: PageNumber,
42    mem: &TransactionalMemory,
43    fixed_key_size: Option<usize>,
44    fixed_value_size: Option<usize>,
45) -> Result<BtreeStats> {
46    let page = mem.get_page(page_number)?;
47    let node_mem = page.memory();
48    match node_mem[0] {
49        LEAF => {
50            let accessor = LeafAccessor::new(
51                page.memory(),
52                fixed_key_size,
53                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
54            );
55            let mut leaf_bytes = 0u64;
56            let mut is_branch = false;
57            for i in 0..accessor.num_pairs() {
58                let entry = accessor.entry(i).unwrap();
59                let collection: &UntypedDynamicCollection =
60                    UntypedDynamicCollection::new(entry.value());
61                match collection.collection_type() {
62                    Inline => {
63                        let inline_accessor = LeafAccessor::new(
64                            collection.as_inline(),
65                            fixed_value_size,
66                            <() as RedbValue>::fixed_width(),
67                        );
68                        leaf_bytes +=
69                            inline_accessor.length_of_pairs(0, inline_accessor.num_pairs()) as u64;
70                    }
71                    Subtree => {
72                        is_branch = true;
73                    }
74                }
75            }
76            let mut overhead_bytes = (accessor.total_length() as u64) - leaf_bytes;
77            let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
78            let mut max_child_height = 0;
79            let (mut leaf_pages, mut branch_pages) = if is_branch { (0, 1) } else { (1, 0) };
80
81            for i in 0..accessor.num_pairs() {
82                let entry = accessor.entry(i).unwrap();
83                let collection: &UntypedDynamicCollection =
84                    UntypedDynamicCollection::new(entry.value());
85                match collection.collection_type() {
86                    Inline => {
87                        // data is inline, so it was already counted above
88                    }
89                    Subtree => {
90                        // this is a sub-tree, so traverse it
91                        let stats = btree_stats(
92                            Some(collection.as_subtree().0),
93                            mem,
94                            fixed_value_size,
95                            <() as RedbValue>::fixed_width(),
96                        )?;
97                        max_child_height = max(max_child_height, stats.tree_height);
98                        branch_pages += stats.branch_pages;
99                        leaf_pages += stats.leaf_pages;
100                        fragmented_bytes += stats.fragmented_bytes;
101                        overhead_bytes += stats.metadata_bytes;
102                        leaf_bytes += stats.stored_leaf_bytes;
103                    }
104                }
105            }
106
107            Ok(BtreeStats {
108                tree_height: max_child_height + 1,
109                leaf_pages,
110                branch_pages,
111                stored_leaf_bytes: leaf_bytes,
112                metadata_bytes: overhead_bytes,
113                fragmented_bytes,
114            })
115        }
116        BRANCH => {
117            let accessor = BranchAccessor::new(&page, fixed_key_size);
118            let mut max_child_height = 0;
119            let mut leaf_pages = 0;
120            let mut branch_pages = 1;
121            let mut stored_leaf_bytes = 0;
122            let mut metadata_bytes = accessor.total_length() as u64;
123            let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
124            for i in 0..accessor.count_children() {
125                if let Some(child) = accessor.child_page(i) {
126                    let stats =
127                        multimap_stats_helper(child, mem, fixed_key_size, fixed_value_size)?;
128                    max_child_height = max(max_child_height, stats.tree_height);
129                    leaf_pages += stats.leaf_pages;
130                    branch_pages += stats.branch_pages;
131                    stored_leaf_bytes += stats.stored_leaf_bytes;
132                    metadata_bytes += stats.metadata_bytes;
133                    fragmented_bytes += stats.fragmented_bytes;
134                }
135            }
136
137            Ok(BtreeStats {
138                tree_height: max_child_height + 1,
139                leaf_pages,
140                branch_pages,
141                stored_leaf_bytes,
142                metadata_bytes,
143                fragmented_bytes,
144            })
145        }
146        _ => unreachable!(),
147    }
148}
149
150// Verify all the checksums in the tree, including any Dynamic collection subtrees
151pub(crate) fn verify_tree_and_subtree_checksums(
152    root: Option<(PageNumber, Checksum)>,
153    key_size: Option<usize>,
154    value_size: Option<usize>,
155    mem: &TransactionalMemory,
156) -> Result<bool> {
157    if let Some((root, root_checksum)) = root {
158        if !RawBtree::new(
159            Some((root, root_checksum)),
160            key_size,
161            DynamicCollection::<()>::fixed_width_with(value_size),
162            mem,
163        )
164        .verify_checksum()?
165        {
166            return Ok(false);
167        }
168
169        let table_pages_iter = AllPageNumbersBtreeIter::new(
170            root,
171            key_size,
172            DynamicCollection::<()>::fixed_width_with(value_size),
173            mem,
174        )?;
175        for table_page in table_pages_iter {
176            let page = mem.get_page(table_page?)?;
177            let subtree_roots = parse_subtree_roots(&page, key_size, value_size);
178            for (sub_root, sub_root_checksum) in subtree_roots {
179                if !RawBtree::new(
180                    Some((sub_root, sub_root_checksum)),
181                    value_size,
182                    <()>::fixed_width(),
183                    mem,
184                )
185                .verify_checksum()?
186                {
187                    return Ok(false);
188                }
189            }
190        }
191    }
192
193    Ok(true)
194}
195
196// Finalize all the checksums in the tree, including any Dynamic collection subtrees
197// Returns the root checksum
198pub(crate) fn finalize_tree_and_subtree_checksums(
199    root: Option<(PageNumber, Checksum)>,
200    key_size: Option<usize>,
201    value_size: Option<usize>,
202    mem: &TransactionalMemory,
203) -> Result<Option<(PageNumber, Checksum)>> {
204    let freed_pages = Arc::new(Mutex::new(vec![]));
205    let mut tree = UntypedBtreeMut::new(
206        root,
207        mem,
208        freed_pages.clone(),
209        key_size,
210        DynamicCollection::<()>::fixed_width_with(value_size),
211    );
212    tree.dirty_leaf_visitor(|mut leaf_page| {
213        let mut sub_root_updates = vec![];
214        let accessor = LeafAccessor::new(
215            leaf_page.memory(),
216            key_size,
217            DynamicCollection::<()>::fixed_width_with(value_size),
218        );
219        for i in 0..accessor.num_pairs() {
220            let entry = accessor.entry(i).unwrap();
221            let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
222            if matches!(collection.collection_type(), DynamicCollectionType::Subtree) {
223                let sub_root = collection.as_subtree();
224                if mem.uncommitted(sub_root.0) {
225                    let mut subtree = UntypedBtreeMut::new(
226                        Some(sub_root),
227                        mem,
228                        freed_pages.clone(),
229                        value_size,
230                        <()>::fixed_width(),
231                    );
232                    subtree.finalize_dirty_checksums()?;
233                    sub_root_updates.push((i, entry.key().to_vec(), subtree.get_root().unwrap()));
234                }
235            }
236        }
237        drop(accessor);
238        // TODO: maybe there's a better abstraction, so that we don't need to call into this low-level method?
239        let mut mutator = LeafMutator::new(
240            &mut leaf_page,
241            key_size,
242            DynamicCollection::<()>::fixed_width_with(value_size),
243        );
244        for (i, key, (sub_root, checksum)) in sub_root_updates {
245            let collection = DynamicCollection::<()>::make_subtree_data(sub_root, checksum);
246            mutator.insert(i, true, &key, &collection);
247        }
248
249        Ok(())
250    })?;
251
252    tree.finalize_dirty_checksums()?;
253    // No pages should have been freed by this operation
254    assert!(freed_pages.lock().unwrap().is_empty());
255    Ok(tree.get_root())
256}
257
258pub(crate) fn parse_subtree_roots<T: Page>(
259    page: &T,
260    fixed_key_size: Option<usize>,
261    fixed_value_size: Option<usize>,
262) -> Vec<(PageNumber, Checksum)> {
263    match page.memory()[0] {
264        BRANCH => {
265            vec![]
266        }
267        LEAF => {
268            let mut result = vec![];
269            let accessor = LeafAccessor::new(
270                page.memory(),
271                fixed_key_size,
272                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
273            );
274            for i in 0..accessor.num_pairs() {
275                let entry = accessor.entry(i).unwrap();
276                let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
277                if matches!(collection.collection_type(), DynamicCollectionType::Subtree) {
278                    result.push(collection.as_subtree());
279                }
280            }
281
282            result
283        }
284        _ => unreachable!(),
285    }
286}
287
288pub(crate) struct LeafKeyIter<'a, V: 'static> {
289    inline_collection: AccessGuard<'a, &'static DynamicCollection<V>>,
290    fixed_key_size: Option<usize>,
291    fixed_value_size: Option<usize>,
292    start_entry: isize, // inclusive
293    end_entry: isize,   // inclusive
294}
295
296impl<'a, V> LeafKeyIter<'a, V> {
297    fn new(
298        data: AccessGuard<'a, &'static DynamicCollection<V>>,
299        fixed_key_size: Option<usize>,
300        fixed_value_size: Option<usize>,
301    ) -> Self {
302        let accessor =
303            LeafAccessor::new(data.value().as_inline(), fixed_key_size, fixed_value_size);
304        let end_entry = isize::try_from(accessor.num_pairs()).unwrap() - 1;
305        Self {
306            inline_collection: data,
307            fixed_key_size,
308            fixed_value_size,
309            start_entry: 0,
310            end_entry,
311        }
312    }
313
314    fn next_key(&mut self) -> Option<&[u8]> {
315        if self.end_entry < self.start_entry {
316            return None;
317        }
318        let accessor = LeafAccessor::new(
319            self.inline_collection.value().as_inline(),
320            self.fixed_key_size,
321            self.fixed_value_size,
322        );
323        self.start_entry += 1;
324        accessor
325            .entry((self.start_entry - 1).try_into().unwrap())
326            .map(|e| e.key())
327    }
328
329    fn next_key_back(&mut self) -> Option<&[u8]> {
330        if self.end_entry < self.start_entry {
331            return None;
332        }
333        let accessor = LeafAccessor::new(
334            self.inline_collection.value().as_inline(),
335            self.fixed_key_size,
336            self.fixed_value_size,
337        );
338        self.end_entry -= 1;
339        accessor
340            .entry((self.end_entry + 1).try_into().unwrap())
341            .map(|e| e.key())
342    }
343}
344
345enum DynamicCollectionType {
346    Inline,
347    Subtree,
348}
349
350impl From<u8> for DynamicCollectionType {
351    fn from(value: u8) -> Self {
352        match value {
353            LEAF => Inline,
354            2 => Subtree,
355            _ => unreachable!(),
356        }
357    }
358}
359
360#[allow(clippy::from_over_into)]
361impl Into<u8> for DynamicCollectionType {
362    fn into(self) -> u8 {
363        match self {
364            // Reuse the LEAF type id, so that we can cast this directly into the format used by
365            // LeafAccessor
366            Inline => LEAF,
367            Subtree => 2,
368        }
369    }
370}
371
/// A collection of values that is either stored inline or in a sub tree.
///
/// Layout:
/// type (1 byte):
/// * 1 = inline data
/// * 2 = sub tree
///
/// (when type = 1) data (n bytes): inlined leaf node
///
/// (when type = 2) root (8 bytes): sub tree root page number
/// (when type = 2) checksum (16 bytes): sub tree checksum
///
/// NOTE: Even though the [PhantomData] is zero-sized, the inner data DST must be placed last.
/// See the [Exotically Sized Types](https://doc.rust-lang.org/nomicon/exotic-sizes.html#dynamically-sized-types-dsts)
/// section of the Rustonomicon for more details.
#[repr(transparent)]
pub(crate) struct DynamicCollection<V> {
    _value_type: PhantomData<V>,
    data: [u8],
}

impl<V> std::fmt::Debug for DynamicCollection<V> {
    /// Formats the collection as a struct exposing its raw `data` bytes.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut debug_struct = f.debug_struct("DynamicCollection");
        debug_struct.field("data", &&self.data);
        debug_struct.finish()
    }
}
398
399impl<V> RedbValue for &DynamicCollection<V> {
400    type SelfType<'a> = &'a DynamicCollection<V>
401    where
402        Self: 'a;
403    type AsBytes<'a> = &'a [u8]
404    where
405        Self: 'a;
406
407    fn fixed_width() -> Option<usize> {
408        None
409    }
410
411    fn from_bytes<'a>(data: &'a [u8]) -> &'a DynamicCollection<V>
412    where
413        Self: 'a,
414    {
415        DynamicCollection::new(data)
416    }
417
418    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'a [u8]
419    where
420        Self: 'a,
421        Self: 'b,
422    {
423        &value.data
424    }
425
426    fn type_name() -> TypeName {
427        TypeName::internal("redb::DynamicCollection")
428    }
429}
430
431impl<V> DynamicCollection<V> {
432    fn new(data: &[u8]) -> &Self {
433        unsafe { mem::transmute(data) }
434    }
435
436    fn collection_type(&self) -> DynamicCollectionType {
437        DynamicCollectionType::from(self.data[0])
438    }
439
440    fn as_inline(&self) -> &[u8] {
441        debug_assert!(matches!(self.collection_type(), Inline));
442        &self.data[1..]
443    }
444
445    fn as_subtree(&self) -> (PageNumber, Checksum) {
446        debug_assert!(matches!(self.collection_type(), Subtree));
447        let offset = 1 + PageNumber::serialized_size();
448        let page_number = PageNumber::from_le_bytes(self.data[1..offset].try_into().unwrap());
449        let checksum = Checksum::from_le_bytes(
450            self.data[offset..(offset + size_of::<Checksum>())]
451                .try_into()
452                .unwrap(),
453        );
454        (page_number, checksum)
455    }
456
457    fn make_inline_data(data: &[u8]) -> Vec<u8> {
458        let mut result = vec![Inline.into()];
459        result.extend_from_slice(data);
460
461        result
462    }
463
464    fn make_subtree_data(root: PageNumber, checksum: Checksum) -> Vec<u8> {
465        let mut result = vec![Subtree.into()];
466        result.extend_from_slice(&root.to_le_bytes());
467        result.extend_from_slice(Checksum::as_bytes(&checksum).as_ref());
468
469        result
470    }
471
472    pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
473        None
474    }
475}
476
477impl<V: RedbKey> DynamicCollection<V> {
478    fn iter<'a>(
479        collection: AccessGuard<'a, &'static DynamicCollection<V>>,
480        mem: &'a TransactionalMemory,
481    ) -> Result<MultimapValue<'a, V>> {
482        Ok(match collection.value().collection_type() {
483            Inline => {
484                let leaf_iter = LeafKeyIter::new(
485                    collection,
486                    V::fixed_width(),
487                    <() as RedbValue>::fixed_width(),
488                );
489                MultimapValue::new_inline(leaf_iter)
490            }
491            Subtree => {
492                let root = collection.value().as_subtree().0;
493                MultimapValue::new_subtree(BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
494                    &(..),
495                    Some(root),
496                    mem,
497                )?)
498            }
499        })
500    }
501
502    fn iter_free_on_drop<'a>(
503        collection: AccessGuard<'a, &'static DynamicCollection<V>>,
504        pages: Vec<PageNumber>,
505        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
506        mem: &'a TransactionalMemory,
507    ) -> Result<MultimapValue<'a, V>> {
508        Ok(match collection.value().collection_type() {
509            Inline => {
510                let leaf_iter = LeafKeyIter::new(
511                    collection,
512                    V::fixed_width(),
513                    <() as RedbValue>::fixed_width(),
514                );
515                MultimapValue::new_inline(leaf_iter)
516            }
517            Subtree => {
518                let root = collection.value().as_subtree().0;
519                let inner =
520                    BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), Some(root), mem)?;
521                MultimapValue::new_subtree_free_on_drop(inner, freed_pages, pages, mem)
522            }
523        })
524    }
525}
526
527#[repr(transparent)]
528pub(crate) struct UntypedDynamicCollection {
529    data: [u8],
530}
531
532impl UntypedDynamicCollection {
533    fn new(data: &[u8]) -> &Self {
534        unsafe { mem::transmute(data) }
535    }
536
537    fn collection_type(&self) -> DynamicCollectionType {
538        DynamicCollectionType::from(self.data[0])
539    }
540
541    fn as_inline(&self) -> &[u8] {
542        debug_assert!(matches!(self.collection_type(), Inline));
543        &self.data[1..]
544    }
545
546    fn as_subtree(&self) -> (PageNumber, Checksum) {
547        debug_assert!(matches!(self.collection_type(), Subtree));
548        let offset = 1 + PageNumber::serialized_size();
549        let page_number = PageNumber::from_le_bytes(self.data[1..offset].try_into().unwrap());
550        let checksum = Checksum::from_le_bytes(
551            self.data[offset..(offset + size_of::<Checksum>())]
552                .try_into()
553                .unwrap(),
554        );
555        (page_number, checksum)
556    }
557}
558
559enum ValueIterState<'a, V: RedbKey + 'static> {
560    Subtree(BtreeRangeIter<'a, V, ()>),
561    InlineLeaf(LeafKeyIter<'a, V>),
562}
563
564pub struct MultimapValue<'a, V: RedbKey + 'static> {
565    inner: Option<ValueIterState<'a, V>>,
566    freed_pages: Option<Arc<Mutex<Vec<PageNumber>>>>,
567    free_on_drop: Vec<PageNumber>,
568    mem: Option<&'a TransactionalMemory>,
569    _value_type: PhantomData<V>,
570}
571
572impl<'a, V: RedbKey + 'static> MultimapValue<'a, V> {
573    fn new_subtree(inner: BtreeRangeIter<'a, V, ()>) -> Self {
574        Self {
575            inner: Some(ValueIterState::Subtree(inner)),
576            freed_pages: None,
577            free_on_drop: vec![],
578            mem: None,
579            _value_type: Default::default(),
580        }
581    }
582
583    fn new_subtree_free_on_drop(
584        inner: BtreeRangeIter<'a, V, ()>,
585        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
586        pages: Vec<PageNumber>,
587        mem: &'a TransactionalMemory,
588    ) -> Self {
589        Self {
590            inner: Some(ValueIterState::Subtree(inner)),
591            freed_pages: Some(freed_pages),
592            free_on_drop: pages,
593            mem: Some(mem),
594            _value_type: Default::default(),
595        }
596    }
597
598    fn new_inline(inner: LeafKeyIter<'a, V>) -> Self {
599        Self {
600            inner: Some(ValueIterState::InlineLeaf(inner)),
601            freed_pages: None,
602            free_on_drop: vec![],
603            mem: None,
604            _value_type: Default::default(),
605        }
606    }
607}
608
609impl<'a, V: RedbKey + 'static> Iterator for MultimapValue<'a, V> {
610    type Item = Result<AccessGuard<'a, V>>;
611
612    fn next(&mut self) -> Option<Self::Item> {
613        // TODO: optimize out this copy
614        let bytes = match self.inner.as_mut().unwrap() {
615            ValueIterState::Subtree(ref mut iter) => match iter.next()? {
616                Ok(e) => e.key_data(),
617                Err(err) => {
618                    return Some(Err(err));
619                }
620            },
621            ValueIterState::InlineLeaf(ref mut iter) => iter.next_key()?.to_vec(),
622        };
623        Some(Ok(AccessGuard::with_owned_value(bytes)))
624    }
625}
626
627impl<'a, V: RedbKey + 'static> DoubleEndedIterator for MultimapValue<'a, V> {
628    fn next_back(&mut self) -> Option<Self::Item> {
629        // TODO: optimize out this copy
630        let bytes = match self.inner.as_mut().unwrap() {
631            ValueIterState::Subtree(ref mut iter) => match iter.next_back()? {
632                Ok(e) => e.key_data(),
633                Err(err) => {
634                    return Some(Err(err));
635                }
636            },
637            ValueIterState::InlineLeaf(ref mut iter) => iter.next_key_back()?.to_vec(),
638        };
639        Some(Ok(AccessGuard::with_owned_value(bytes)))
640    }
641}
642
643impl<'a, V: RedbKey + 'static> Drop for MultimapValue<'a, V> {
644    fn drop(&mut self) {
645        // Drop our references to the pages that are about to be freed
646        drop(mem::take(&mut self.inner));
647        if !self.free_on_drop.is_empty() {
648            let mut freed_pages = self.freed_pages.as_ref().unwrap().lock().unwrap();
649            for page in self.free_on_drop.iter() {
650                if !self.mem.unwrap().free_if_uncommitted(*page) {
651                    freed_pages.push(*page);
652                }
653            }
654        }
655    }
656}
657
658pub struct MultimapRange<'a, K: RedbKey + 'static, V: RedbKey + 'static> {
659    inner: BtreeRangeIter<'a, K, &'static DynamicCollection<V>>,
660    mem: &'a TransactionalMemory,
661    _key_type: PhantomData<K>,
662    _value_type: PhantomData<V>,
663}
664
665impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> MultimapRange<'a, K, V> {
666    fn new(
667        inner: BtreeRangeIter<'a, K, &'static DynamicCollection<V>>,
668        mem: &'a TransactionalMemory,
669    ) -> Self {
670        Self {
671            inner,
672            mem,
673            _key_type: Default::default(),
674            _value_type: Default::default(),
675        }
676    }
677}
678
679impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Iterator for MultimapRange<'a, K, V> {
680    type Item = Result<(AccessGuard<'a, K>, MultimapValue<'a, V>)>;
681
682    fn next(&mut self) -> Option<Self::Item> {
683        match self.inner.next()? {
684            Ok(entry) => {
685                let key = AccessGuard::with_owned_value(entry.key_data());
686                let (page, _, value_range) = entry.into_raw();
687                let collection = AccessGuard::with_page(page, value_range);
688                Some(DynamicCollection::iter(collection, self.mem).map(|iter| (key, iter)))
689            }
690            Err(err) => Some(Err(err)),
691        }
692    }
693}
694
695impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> DoubleEndedIterator
696    for MultimapRange<'a, K, V>
697{
698    fn next_back(&mut self) -> Option<Self::Item> {
699        match self.inner.next_back()? {
700            Ok(entry) => {
701                let key = AccessGuard::with_owned_value(entry.key_data());
702                let (page, _, value_range) = entry.into_raw();
703                let collection = AccessGuard::with_page(page, value_range);
704                Some(DynamicCollection::iter(collection, self.mem).map(|iter| (key, iter)))
705            }
706            Err(err) => Some(Err(err)),
707        }
708    }
709}
710
711/// A multimap table
712///
713/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
714pub struct MultimapTable<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> {
715    name: String,
716    transaction: &'txn WriteTransaction<'db>,
717    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
718    tree: BtreeMut<'txn, K, &'static DynamicCollection<V>>,
719    mem: &'db TransactionalMemory,
720    _value_type: PhantomData<V>,
721}
722
723impl<K: RedbKey + 'static, V: RedbKey + 'static> MultimapTableHandle
724    for MultimapTable<'_, '_, K, V>
725{
726    fn name(&self) -> &str {
727        &self.name
728    }
729}
730
731impl<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTable<'db, 'txn, K, V> {
732    pub(crate) fn new(
733        name: &str,
734        table_root: Option<(PageNumber, Checksum)>,
735        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
736        mem: &'db TransactionalMemory,
737        transaction: &'txn WriteTransaction<'db>,
738    ) -> MultimapTable<'db, 'txn, K, V> {
739        MultimapTable {
740            name: name.to_string(),
741            transaction,
742            freed_pages: freed_pages.clone(),
743            tree: BtreeMut::new(table_root, mem, freed_pages),
744            mem,
745            _value_type: Default::default(),
746        }
747    }
748
749    #[allow(dead_code)]
750    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
751        self.tree.print_debug(include_values)
752    }
753
754    /// Add the given value to the mapping of the key
755    ///
756    /// Returns `true` if the key-value pair was present
757    pub fn insert<'k, 'v>(
758        &mut self,
759        key: impl Borrow<K::SelfType<'k>>,
760        value: impl Borrow<V::SelfType<'v>>,
761    ) -> Result<bool> {
762        let value_bytes = V::as_bytes(value.borrow());
763        let value_bytes_ref = value_bytes.as_ref();
764        if value_bytes_ref.len() > MAX_VALUE_LENGTH {
765            return Err(StorageError::ValueTooLarge(value_bytes_ref.len()));
766        }
767        let key_bytes = K::as_bytes(key.borrow());
768        if key_bytes.as_ref().len() > MAX_VALUE_LENGTH {
769            return Err(StorageError::ValueTooLarge(key_bytes.as_ref().len()));
770        }
771        let get_result = self.tree.get(key.borrow())?;
772        let existed = if get_result.is_some() {
773            #[allow(clippy::unnecessary_unwrap)]
774            let guard = get_result.unwrap();
775            let collection_type = guard.value().collection_type();
776            match collection_type {
777                Inline => {
778                    let leaf_data = guard.value().as_inline();
779                    let accessor = LeafAccessor::new(
780                        leaf_data,
781                        V::fixed_width(),
782                        <() as RedbValue>::fixed_width(),
783                    );
784                    let (position, found) = accessor.position::<V>(value_bytes_ref);
785                    if found {
786                        return Ok(true);
787                    }
788
789                    let new_pairs = accessor.num_pairs() + 1;
790                    let new_pair_bytes =
791                        accessor.length_of_pairs(0, accessor.num_pairs()) + value_bytes_ref.len();
792                    let new_key_bytes =
793                        accessor.length_of_keys(0, accessor.num_pairs()) + value_bytes_ref.len();
794                    let required_inline_bytes = RawLeafBuilder::required_bytes(
795                        new_pairs,
796                        new_pair_bytes,
797                        V::fixed_width(),
798                        <() as RedbValue>::fixed_width(),
799                    );
800
801                    if required_inline_bytes < self.mem.get_page_size() / 2 {
802                        let mut data = vec![0; required_inline_bytes];
803                        let mut builder = RawLeafBuilder::new(
804                            &mut data,
805                            new_pairs,
806                            V::fixed_width(),
807                            <() as RedbValue>::fixed_width(),
808                            new_key_bytes,
809                        );
810                        for i in 0..accessor.num_pairs() {
811                            if i == position {
812                                builder.append(
813                                    value_bytes_ref,
814                                    <() as RedbValue>::as_bytes(&()).as_ref(),
815                                );
816                            }
817                            let entry = accessor.entry(i).unwrap();
818                            builder.append(entry.key(), entry.value());
819                        }
820                        if position == accessor.num_pairs() {
821                            builder
822                                .append(value_bytes_ref, <() as RedbValue>::as_bytes(&()).as_ref());
823                        }
824                        drop(builder);
825                        drop(guard);
826                        let inline_data = DynamicCollection::<V>::make_inline_data(&data);
827                        self.tree
828                            .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
829                    } else {
830                        // convert into a subtree
831                        let mut page = self.mem.allocate(leaf_data.len(), CachePriority::Low)?;
832                        page.memory_mut()[..leaf_data.len()].copy_from_slice(leaf_data);
833                        let page_number = page.get_page_number();
834                        drop(page);
835                        drop(guard);
836
837                        // Don't bother computing the checksum, since we're about to modify the tree
838                        let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
839                            Some((page_number, 0)),
840                            self.mem,
841                            self.freed_pages.clone(),
842                        );
843                        let existed = subtree.insert(value.borrow(), &())?.is_some();
844                        assert_eq!(existed, found);
845                        let (new_root, new_checksum) = subtree.get_root().unwrap();
846                        let subtree_data =
847                            DynamicCollection::<V>::make_subtree_data(new_root, new_checksum);
848                        self.tree
849                            .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
850                    }
851
852                    found
853                }
854                Subtree => {
855                    let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
856                        Some(guard.value().as_subtree()),
857                        self.mem,
858                        self.freed_pages.clone(),
859                    );
860                    drop(guard);
861                    let existed = subtree.insert(value.borrow(), &())?.is_some();
862                    let (new_root, new_checksum) = subtree.get_root().unwrap();
863                    let subtree_data =
864                        DynamicCollection::<V>::make_subtree_data(new_root, new_checksum);
865                    self.tree
866                        .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
867
868                    existed
869                }
870            }
871        } else {
872            drop(get_result);
873            let required_inline_bytes = RawLeafBuilder::required_bytes(
874                1,
875                value_bytes_ref.len(),
876                V::fixed_width(),
877                <() as RedbValue>::fixed_width(),
878            );
879            if required_inline_bytes < self.mem.get_page_size() / 2 {
880                let mut data = vec![0; required_inline_bytes];
881                let mut builder = RawLeafBuilder::new(
882                    &mut data,
883                    1,
884                    V::fixed_width(),
885                    <() as RedbValue>::fixed_width(),
886                    value_bytes_ref.len(),
887                );
888                builder.append(value_bytes_ref, <() as RedbValue>::as_bytes(&()).as_ref());
889                drop(builder);
890                let inline_data = DynamicCollection::<V>::make_inline_data(&data);
891                self.tree
892                    .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
893            } else {
894                let mut subtree: BtreeMut<'_, V, ()> =
895                    BtreeMut::new(None, self.mem, self.freed_pages.clone());
896                subtree.insert(value.borrow(), &())?;
897                let (new_root, new_checksum) = subtree.get_root().unwrap();
898                let subtree_data =
899                    DynamicCollection::<V>::make_subtree_data(new_root, new_checksum);
900                self.tree
901                    .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
902            }
903            false
904        };
905
906        Ok(existed)
907    }
908
909    /// Removes the given key-value pair
910    ///
911    /// Returns `true` if the key-value pair was present
912    pub fn remove<'a>(
913        &mut self,
914        key: impl Borrow<K::SelfType<'a>>,
915        value: impl Borrow<V::SelfType<'a>>,
916    ) -> Result<bool>
917    where
918        K: 'a,
919        V: 'a,
920    {
921        let get_result = self.tree.get(key.borrow())?;
922        if get_result.is_none() {
923            return Ok(false);
924        }
925        let guard = get_result.unwrap();
926        let v = guard.value();
927        let existed = match v.collection_type() {
928            Inline => {
929                let leaf_data = v.as_inline();
930                let accessor = LeafAccessor::new(
931                    leaf_data,
932                    V::fixed_width(),
933                    <() as RedbValue>::fixed_width(),
934                );
935                if let Some(position) = accessor.find_key::<V>(V::as_bytes(value.borrow()).as_ref())
936                {
937                    let old_num_pairs = accessor.num_pairs();
938                    if old_num_pairs == 1 {
939                        drop(guard);
940                        self.tree.remove(key.borrow())?;
941                    } else {
942                        let old_pairs_len = accessor.length_of_pairs(0, old_num_pairs);
943                        let removed_value_len = accessor.entry(position).unwrap().key().len();
944                        let required = RawLeafBuilder::required_bytes(
945                            old_num_pairs - 1,
946                            old_pairs_len - removed_value_len,
947                            V::fixed_width(),
948                            <() as RedbValue>::fixed_width(),
949                        );
950                        let mut new_data = vec![0; required];
951                        let new_key_len =
952                            accessor.length_of_keys(0, old_num_pairs) - removed_value_len;
953                        let mut builder = RawLeafBuilder::new(
954                            &mut new_data,
955                            old_num_pairs - 1,
956                            V::fixed_width(),
957                            <() as RedbValue>::fixed_width(),
958                            new_key_len,
959                        );
960                        for i in 0..old_num_pairs {
961                            if i != position {
962                                let entry = accessor.entry(i).unwrap();
963                                builder.append(entry.key(), entry.value());
964                            }
965                        }
966                        drop(builder);
967                        drop(guard);
968
969                        let inline_data = DynamicCollection::<V>::make_inline_data(&new_data);
970                        self.tree
971                            .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
972                    }
973                    true
974                } else {
975                    drop(guard);
976                    false
977                }
978            }
979            Subtree => {
980                let mut subtree: BtreeMut<V, ()> =
981                    BtreeMut::new(Some(v.as_subtree()), self.mem, self.freed_pages.clone());
982                drop(guard);
983                let existed = subtree.remove(value.borrow())?.is_some();
984
985                if let Some((new_root, new_checksum)) = subtree.get_root() {
986                    let page = self.mem.get_page(new_root)?;
987                    match page.memory()[0] {
988                        LEAF => {
989                            let accessor = LeafAccessor::new(
990                                page.memory(),
991                                V::fixed_width(),
992                                <() as RedbValue>::fixed_width(),
993                            );
994                            let len = accessor.total_length();
995                            if len < self.mem.get_page_size() / 2 {
996                                let inline_data =
997                                    DynamicCollection::<V>::make_inline_data(&page.memory()[..len]);
998                                self.tree
999                                    .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1000                                drop(page);
1001                                if !self.mem.free_if_uncommitted(new_root) {
1002                                    (*self.freed_pages).lock().unwrap().push(new_root);
1003                                }
1004                            } else {
1005                                let subtree_data = DynamicCollection::<V>::make_subtree_data(
1006                                    new_root,
1007                                    new_checksum,
1008                                );
1009                                self.tree
1010                                    .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1011                            }
1012                        }
1013                        BRANCH => {
1014                            let subtree_data =
1015                                DynamicCollection::<V>::make_subtree_data(new_root, new_checksum);
1016                            self.tree
1017                                .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1018                        }
1019                        _ => unreachable!(),
1020                    }
1021                } else {
1022                    self.tree.remove(key.borrow())?;
1023                }
1024
1025                existed
1026            }
1027        };
1028
1029        Ok(existed)
1030    }
1031
1032    /// Removes all values for the given key
1033    ///
1034    /// Returns an iterator over the removed values. Values are in ascending order.
1035    pub fn remove_all<'a>(&mut self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>
1036    where
1037        K: 'a,
1038    {
1039        let iter = if let Some(collection) = self.tree.remove(key.borrow())? {
1040            let mut pages = vec![];
1041            if matches!(
1042                collection.value().collection_type(),
1043                DynamicCollectionType::Subtree
1044            ) {
1045                let root = collection.value().as_subtree().0;
1046                let all_pages = AllPageNumbersBtreeIter::new(
1047                    root,
1048                    V::fixed_width(),
1049                    <() as RedbValue>::fixed_width(),
1050                    self.mem,
1051                )?;
1052                for page in all_pages {
1053                    pages.push(page?);
1054                }
1055            }
1056            DynamicCollection::iter_free_on_drop(
1057                collection,
1058                pages,
1059                self.freed_pages.clone(),
1060                self.mem,
1061            )?
1062        } else {
1063            MultimapValue::new_subtree(BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1064                &(..),
1065                None,
1066                self.mem,
1067            )?)
1068        };
1069
1070        Ok(iter)
1071    }
1072}
1073
1074impl<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> ReadableMultimapTable<K, V>
1075    for MultimapTable<'db, 'txn, K, V>
1076{
1077    /// Returns an iterator over all values for the given key. Values are in ascending order.
1078    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>
1079    where
1080        K: 'a,
1081    {
1082        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1083            DynamicCollection::iter(collection, self.mem)?
1084        } else {
1085            MultimapValue::new_subtree(BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1086                &(..),
1087                None,
1088                self.mem,
1089            )?)
1090        };
1091
1092        Ok(iter)
1093    }
1094
1095    /// Returns a double-ended iterator over a range of elements in the table
1096    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1097    where
1098        K: 'a,
1099        KR: Borrow<K::SelfType<'a>> + 'a,
1100    {
1101        let inner = self.tree.range(&range)?;
1102        Ok(MultimapRange::new(inner, self.mem))
1103    }
1104
1105    fn stats(&self) -> Result<TableStats> {
1106        let tree_stats = multimap_btree_stats(
1107            self.tree.get_root().map(|(p, _)| p),
1108            self.mem,
1109            K::fixed_width(),
1110            V::fixed_width(),
1111        )?;
1112
1113        Ok(TableStats {
1114            tree_height: tree_stats.tree_height,
1115            leaf_pages: tree_stats.leaf_pages,
1116            branch_pages: tree_stats.branch_pages,
1117            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1118            metadata_bytes: tree_stats.metadata_bytes,
1119            fragmented_bytes: tree_stats.fragmented_bytes,
1120        })
1121    }
1122
1123    /// Returns the number of key-value pairs in the table
1124    fn len(&self) -> Result<u64> {
1125        let mut count = 0;
1126        for item in self.iter()? {
1127            let (_, values) = item?;
1128            for v in values {
1129                v?;
1130                count += 1;
1131            }
1132        }
1133        Ok(count)
1134    }
1135
1136    /// Returns `true` if the table is empty
1137    fn is_empty(&self) -> Result<bool> {
1138        self.len().map(|x| x == 0)
1139    }
1140}
1141
// `ReadableMultimapTable` has `Sealed` as a supertrait, so this impl is what allows
// `MultimapTable` to implement it.
impl<K: RedbKey + 'static, V: RedbKey + 'static> Sealed for MultimapTable<'_, '_, K, V> {}
1143
1144impl<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> Drop
1145    for MultimapTable<'db, 'txn, K, V>
1146{
1147    fn drop(&mut self) {
1148        self.transaction.close_table(&self.name, &self.tree);
1149    }
1150}
1151
/// Read-only operations implemented by the multimap table types in this module.
///
/// `Sealed` is a supertrait, so implementors must also implement `Sealed`.
pub trait ReadableMultimapTable<K: RedbKey + 'static, V: RedbKey + 'static>: Sealed {
    /// Returns an iterator over all values for the given key. Values are in ascending order.
    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>
    where
        K: 'a;

    /// Returns a double-ended iterator over a range of elements in the table
    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
    where
        K: 'a,
        KR: Borrow<K::SelfType<'a>> + 'a;

    /// Retrieves information about storage usage for the table
    fn stats(&self) -> Result<TableStats>;

    /// Returns the number of key-value pairs in the table
    fn len(&self) -> Result<u64>;

    /// Returns `true` if the table is empty
    fn is_empty(&self) -> Result<bool>;

    /// Returns a double-ended iterator over all elements in the table. Values are in ascending
    /// order.
    ///
    /// Equivalent to calling `range` with an unbounded range (`..`).
    fn iter(&self) -> Result<MultimapRange<K, V>> {
        self.range::<K::SelfType<'_>>(..)
    }
}
1176
/// A read-only untyped multimap table
///
/// Key and value types are not known statically; only their optional fixed widths are kept,
/// and they are what `stats` passes on when computing storage statistics.
pub struct ReadOnlyUntypedMultimapTable<'txn> {
    // Raw b-tree holding the table's entries.
    tree: RawBtree<'txn>,
    // Fixed key width in bytes, if any (`None` presumably means variable-width keys).
    fixed_key_size: Option<usize>,
    // Fixed value width in bytes, if any (`None` presumably means variable-width values).
    fixed_value_size: Option<usize>,
    // Memory handle passed to the tree and to the statistics helper.
    mem: &'txn TransactionalMemory,
}
1184
1185impl<'txn> ReadOnlyUntypedMultimapTable<'txn> {
1186    pub(crate) fn new(
1187        root_page: Option<(PageNumber, Checksum)>,
1188        fixed_key_size: Option<usize>,
1189        fixed_value_size: Option<usize>,
1190        mem: &'txn TransactionalMemory,
1191    ) -> Self {
1192        Self {
1193            tree: RawBtree::new(
1194                root_page,
1195                fixed_key_size,
1196                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
1197                mem,
1198            ),
1199            fixed_key_size,
1200            fixed_value_size,
1201            mem,
1202        }
1203    }
1204
1205    /// Retrieves information about storage usage for the table
1206    pub fn stats(&self) -> Result<TableStats> {
1207        let tree_stats = multimap_btree_stats(
1208            self.tree.get_root().map(|(p, _)| p),
1209            self.mem,
1210            self.fixed_key_size,
1211            self.fixed_value_size,
1212        )?;
1213
1214        Ok(TableStats {
1215            tree_height: tree_stats.tree_height,
1216            leaf_pages: tree_stats.leaf_pages,
1217            branch_pages: tree_stats.branch_pages,
1218            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1219            metadata_bytes: tree_stats.metadata_bytes,
1220            fragmented_bytes: tree_stats.fragmented_bytes,
1221        })
1222    }
1223}
1224
/// A read-only multimap table
///
/// Implements `ReadableMultimapTable` for keys of type `K` and values of type `V`.
pub struct ReadOnlyMultimapTable<'txn, K: RedbKey + 'static, V: RedbKey + 'static> {
    // B-tree mapping each key to the `DynamicCollection` of its values.
    tree: Btree<'txn, K, &'static DynamicCollection<V>>,
    // Memory handle used when iterating collections and computing statistics.
    mem: &'txn TransactionalMemory,
    // Marker tying the value type `V` to the table; stores no data.
    _value_type: PhantomData<V>,
}
1231
1232impl<'txn, K: RedbKey + 'static, V: RedbKey + 'static> ReadOnlyMultimapTable<'txn, K, V> {
1233    pub(crate) fn new(
1234        root_page: Option<(PageNumber, Checksum)>,
1235        hint: PageHint,
1236        mem: &'txn TransactionalMemory,
1237    ) -> Result<ReadOnlyMultimapTable<'txn, K, V>> {
1238        Ok(ReadOnlyMultimapTable {
1239            tree: Btree::new(root_page, hint, mem)?,
1240            mem,
1241            _value_type: Default::default(),
1242        })
1243    }
1244}
1245
1246impl<'txn, K: RedbKey + 'static, V: RedbKey + 'static> ReadableMultimapTable<K, V>
1247    for ReadOnlyMultimapTable<'txn, K, V>
1248{
1249    /// Returns an iterator over all values for the given key. Values are in ascending order.
1250    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>
1251    where
1252        K: 'a,
1253    {
1254        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1255            DynamicCollection::iter(collection, self.mem)?
1256        } else {
1257            MultimapValue::new_subtree(BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1258                &(..),
1259                None,
1260                self.mem,
1261            )?)
1262        };
1263
1264        Ok(iter)
1265    }
1266
1267    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1268    where
1269        K: 'a,
1270        KR: Borrow<K::SelfType<'a>> + 'a,
1271    {
1272        let inner = self.tree.range(&range)?;
1273        Ok(MultimapRange::new(inner, self.mem))
1274    }
1275
1276    fn stats(&self) -> Result<TableStats> {
1277        let tree_stats = multimap_btree_stats(
1278            self.tree.get_root().map(|(p, _)| p),
1279            self.mem,
1280            K::fixed_width(),
1281            V::fixed_width(),
1282        )?;
1283
1284        Ok(TableStats {
1285            tree_height: tree_stats.tree_height,
1286            leaf_pages: tree_stats.leaf_pages,
1287            branch_pages: tree_stats.branch_pages,
1288            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1289            metadata_bytes: tree_stats.metadata_bytes,
1290            fragmented_bytes: tree_stats.fragmented_bytes,
1291        })
1292    }
1293
1294    fn len(&self) -> Result<u64> {
1295        let mut count = 0;
1296        for item in self.iter()? {
1297            let (_, values) = item?;
1298            for v in values {
1299                v?;
1300                count += 1;
1301            }
1302        }
1303        Ok(count)
1304    }
1305
1306    fn is_empty(&self) -> Result<bool> {
1307        self.len().map(|x| x == 0)
1308    }
1309}
1310
// `ReadableMultimapTable` has `Sealed` as a supertrait, so this impl is what allows
// `ReadOnlyMultimapTable` to implement it.
impl<K: RedbKey, V: RedbKey> Sealed for ReadOnlyMultimapTable<'_, K, V> {}