Skip to main content

shodh_redb/
multimap_table.rs

1use crate::cdc::types::{CdcEvent, ChangeOp};
2use crate::compat::{HashMap, Mutex};
3#[cfg(feature = "std")]
4use crate::db::CorruptPageInfo;
5use crate::db::TransactionGuard;
6use crate::multimap_table::DynamicCollectionType::{Inline, SubtreeV2};
7use crate::sealed::Sealed;
8use crate::table::{ReadableTableMetadata, TableStats};
9use crate::tree_store::{
10    AllPageNumbersBtreeIter, BRANCH, BranchAccessor, BranchMutator, Btree, BtreeHeader, BtreeMut,
11    BtreeRangeIter, BtreeStats, Checksum, DEFERRED, LEAF, LeafAccessor, LeafMutator,
12    MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page, PageHint, PageNumber, PagePath, PageTrackerPolicy,
13    RawBtree, RawLeafBuilder, TransactionalMemory, UntypedBtree, UntypedBtreeMut, btree_stats,
14};
15use crate::types::{Key, TypeName, Value};
16use crate::{AccessGuard, MultimapTableHandle, Result, StorageError, WriteTransaction};
17use alloc::format;
18use alloc::string::{String, ToString};
19use alloc::sync::Arc;
20use alloc::vec;
21use alloc::vec::Vec;
22use core::borrow::Borrow;
23use core::cmp::max;
24use core::convert::TryInto;
25use core::marker::PhantomData;
26use core::mem;
27use core::mem::size_of;
28use core::ops::{RangeBounds, RangeFull};
29
30pub(crate) fn multimap_btree_stats(
31    root: Option<PageNumber>,
32    mem: &TransactionalMemory,
33    fixed_key_size: Option<usize>,
34    fixed_value_size: Option<usize>,
35) -> Result<BtreeStats> {
36    if let Some(root) = root {
37        multimap_stats_helper(root, mem, fixed_key_size, fixed_value_size)
38    } else {
39        Ok(BtreeStats {
40            tree_height: 0,
41            leaf_pages: 0,
42            branch_pages: 0,
43            stored_leaf_bytes: 0,
44            metadata_bytes: 0,
45            fragmented_bytes: 0,
46        })
47    }
48}
49
50fn multimap_stats_helper(
51    page_number: PageNumber,
52    mem: &TransactionalMemory,
53    fixed_key_size: Option<usize>,
54    fixed_value_size: Option<usize>,
55) -> Result<BtreeStats> {
56    let page = mem.get_page(page_number)?;
57    let node_mem = page.memory();
58    match node_mem[0] {
59        LEAF => {
60            let accessor = LeafAccessor::new(
61                page.memory(),
62                fixed_key_size,
63                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
64            )?;
65            let mut leaf_bytes = 0u64;
66            let mut is_branch = false;
67            for i in 0..accessor.num_pairs() {
68                let entry = accessor
69                    .entry(i)
70                    .ok_or_else(|| StorageError::invalid_entry_index(page_number, i))?;
71                let collection: &UntypedDynamicCollection =
72                    UntypedDynamicCollection::new(entry.value());
73                match collection.collection_type()? {
74                    Inline => {
75                        let inline_accessor = LeafAccessor::new(
76                            collection.as_inline(),
77                            fixed_value_size,
78                            <() as Value>::fixed_width(),
79                        )?;
80                        leaf_bytes +=
81                            inline_accessor.length_of_pairs(0, inline_accessor.num_pairs()) as u64;
82                    }
83                    SubtreeV2 => {
84                        is_branch = true;
85                    }
86                }
87            }
88            let mut overhead_bytes = (accessor.total_length() as u64) - leaf_bytes;
89            let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
90            let mut max_child_height = 0;
91            let (mut leaf_pages, mut branch_pages) = if is_branch { (0, 1) } else { (1, 0) };
92
93            for i in 0..accessor.num_pairs() {
94                let entry = accessor
95                    .entry(i)
96                    .ok_or_else(|| StorageError::invalid_entry_index(page_number, i))?;
97                let collection: &UntypedDynamicCollection =
98                    UntypedDynamicCollection::new(entry.value());
99                match collection.collection_type()? {
100                    Inline => {
101                        // data is inline, so it was already counted above
102                    }
103                    SubtreeV2 => {
104                        // this is a sub-tree, so traverse it
105                        let stats = btree_stats(
106                            Some(collection.as_subtree().root),
107                            mem,
108                            fixed_value_size,
109                            <() as Value>::fixed_width(),
110                        )?;
111                        max_child_height = max(max_child_height, stats.tree_height);
112                        branch_pages += stats.branch_pages;
113                        leaf_pages += stats.leaf_pages;
114                        fragmented_bytes += stats.fragmented_bytes;
115                        overhead_bytes += stats.metadata_bytes;
116                        leaf_bytes += stats.stored_leaf_bytes;
117                    }
118                }
119            }
120
121            Ok(BtreeStats {
122                tree_height: max_child_height + 1,
123                leaf_pages,
124                branch_pages,
125                stored_leaf_bytes: leaf_bytes,
126                metadata_bytes: overhead_bytes,
127                fragmented_bytes,
128            })
129        }
130        BRANCH => {
131            let accessor = BranchAccessor::new(&page, fixed_key_size)?;
132            let mut max_child_height = 0;
133            let mut leaf_pages = 0;
134            let mut branch_pages = 1;
135            let mut stored_leaf_bytes = 0;
136            let mut metadata_bytes = accessor.total_length() as u64;
137            let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
138            for i in 0..accessor.count_children() {
139                if let Some(child) = accessor.child_page(i) {
140                    let stats =
141                        multimap_stats_helper(child, mem, fixed_key_size, fixed_value_size)?;
142                    max_child_height = max(max_child_height, stats.tree_height);
143                    leaf_pages += stats.leaf_pages;
144                    branch_pages += stats.branch_pages;
145                    stored_leaf_bytes += stats.stored_leaf_bytes;
146                    metadata_bytes += stats.metadata_bytes;
147                    fragmented_bytes += stats.fragmented_bytes;
148                }
149            }
150
151            Ok(BtreeStats {
152                tree_height: max_child_height + 1,
153                leaf_pages,
154                branch_pages,
155                stored_leaf_bytes,
156                metadata_bytes,
157                fragmented_bytes,
158            })
159        }
160        other => Err(StorageError::invalid_page_type(page_number, other)),
161    }
162}
163
164// Verify all the checksums in the tree, including any Dynamic collection subtrees
165pub(crate) fn verify_tree_and_subtree_checksums(
166    root: Option<BtreeHeader>,
167    key_size: Option<usize>,
168    value_size: Option<usize>,
169    mem: Arc<TransactionalMemory>,
170) -> Result<bool> {
171    if let Some(header) = root {
172        if !RawBtree::new(
173            Some(header),
174            key_size,
175            DynamicCollection::<()>::fixed_width_with(value_size),
176            mem.clone(),
177        )
178        .verify_checksum()?
179        {
180            return Ok(false);
181        }
182
183        let table_pages_iter = AllPageNumbersBtreeIter::new(
184            header.root,
185            key_size,
186            DynamicCollection::<()>::fixed_width_with(value_size),
187            mem.clone(),
188        )?;
189        for table_page in table_pages_iter {
190            let page = mem.get_page(table_page?)?;
191            let subtree_roots = parse_subtree_roots(&page, key_size, value_size)?;
192            for header in subtree_roots {
193                if !RawBtree::new(Some(header), value_size, <()>::fixed_width(), mem.clone())
194                    .verify_checksum()?
195                {
196                    return Ok(false);
197                }
198            }
199        }
200    }
201
202    Ok(true)
203}
204
/// Like `verify_tree_and_subtree_checksums`, but keeps going after a failure and collects
/// the details of every corruption found.
///
/// Returns the total page count reported by the verifiers together with all collected
/// [`CorruptPageInfo`] records. For a `None` root the result is `(0, vec![])`.
#[cfg(feature = "std")]
pub(crate) fn verify_tree_and_subtree_checksums_detailed(
    root: Option<BtreeHeader>,
    key_size: Option<usize>,
    value_size: Option<usize>,
    mem: Arc<TransactionalMemory>,
) -> Result<(u64, Vec<CorruptPageInfo>)> {
    let mut page_total = 0u64;
    let mut findings = Vec::new();

    let Some(header) = root else {
        return Ok((page_total, findings));
    };
    let collection_width = DynamicCollection::<()>::fixed_width_with(value_size);

    // Outer tree first.
    let outer_tree = RawBtree::new(Some(header), key_size, collection_width, mem.clone());
    let (outer_pages, outer_corruptions) = outer_tree.verify_checksum_detailed()?;
    page_total += outer_pages;
    findings.extend(outer_corruptions);

    // Then every subtree referenced from a page of the outer tree.
    let page_numbers =
        AllPageNumbersBtreeIter::new(header.root, key_size, collection_width, mem.clone())?;
    for page_number in page_numbers {
        let page = mem.get_page(page_number?)?;
        for sub_header in parse_subtree_roots(&page, key_size, value_size)? {
            let subtree =
                RawBtree::new(Some(sub_header), value_size, <()>::fixed_width(), mem.clone());
            let (sub_pages, sub_corruptions) = subtree.verify_checksum_detailed()?;
            page_total += sub_pages;
            findings.extend(sub_corruptions);
        }
    }

    Ok((page_total, findings))
}
252
/// Like `verify_tree_and_subtree_checksums`, but runs the structural verifier
/// (`verify_structure`) on the outer tree and on every referenced subtree.
///
/// Returns every [`CorruptPageInfo`] reported; the list is empty for a `None` root.
#[cfg(feature = "std")]
pub(crate) fn verify_tree_and_subtree_structure(
    root: Option<BtreeHeader>,
    key_size: Option<usize>,
    value_size: Option<usize>,
    mem: Arc<TransactionalMemory>,
) -> Result<Vec<CorruptPageInfo>> {
    let mut findings = Vec::new();

    let Some(header) = root else {
        return Ok(findings);
    };
    let collection_width = DynamicCollection::<()>::fixed_width_with(value_size);

    // Outer tree first.
    let outer_tree = RawBtree::new(Some(header), key_size, collection_width, mem.clone());
    findings.extend(outer_tree.verify_structure()?);

    // Then every subtree referenced from a page of the outer tree.
    let page_numbers =
        AllPageNumbersBtreeIter::new(header.root, key_size, collection_width, mem.clone())?;
    for page_number in page_numbers {
        let page = mem.get_page(page_number?)?;
        for sub_header in parse_subtree_roots(&page, key_size, value_size)? {
            let subtree =
                RawBtree::new(Some(sub_header), value_size, <()>::fixed_width(), mem.clone());
            findings.extend(subtree.verify_structure()?);
        }
    }

    Ok(findings)
}
295
// Relocate all subtrees to lower index pages, if possible
//
// `root` is the (page number, checksum) of a page in the outer multimap tree. If
// `relocation_map` has no entry for that page, `root` is returned unchanged and nothing
// below it is visited. Otherwise the page's bytes are copied to the mapped destination
// page and then patched in place:
//   * LEAF: every subtree collection is relocated through `UntypedBtreeMut::relocate`, and
//     the entry is rewritten when the subtree root changed.
//   * BRANCH: every child is relocated recursively and its new page number and checksum
//     are written into the copied page.
// Finally the old page is freed immediately if it is uncommitted, or queued in
// `freed_pages` otherwise.
//
// Returns the new page number paired with the `DEFERRED` checksum.
// Errors if a page cannot be read, an entry is missing, a subtree loses its root during
// relocation, or the page type is neither LEAF nor BRANCH.
pub(crate) fn relocate_subtrees(
    root: (PageNumber, Checksum),
    key_size: Option<usize>,
    value_size: Option<usize>,
    mem: Arc<TransactionalMemory>,
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    relocation_map: &HashMap<PageNumber, PageNumber>,
) -> Result<(PageNumber, Checksum)> {
    let old_page = mem.get_page(root.0)?;
    // Pages without a mapping stay where they are.
    let mut new_page = if let Some(new_page_number) = relocation_map.get(&root.0) {
        mem.get_page_mut(*new_page_number)?
    } else {
        return Ok(root);
    };
    let new_page_number = new_page.get_page_number();
    // Start from a byte-for-byte copy; only the references that move are patched below.
    new_page.memory_mut()?.copy_from_slice(old_page.memory());

    match old_page.memory()[0] {
        LEAF => {
            // Entries are read from the old page while the copy is mutated.
            let accessor = LeafAccessor::new(
                old_page.memory(),
                key_size,
                UntypedDynamicCollection::fixed_width_with(value_size),
            )?;
            // TODO: maybe there's a better abstraction, so that we don't need to call into this low-level method?
            let mut mutator = LeafMutator::new(
                new_page.memory_mut()?,
                key_size,
                UntypedDynamicCollection::fixed_width_with(value_size),
            );
            for i in 0..accessor.num_pairs() {
                let entry = accessor
                    .entry(i)
                    .ok_or_else(|| StorageError::invalid_entry_index(root.0, i))?;
                let collection = UntypedDynamicCollection::from_bytes(entry.value());
                // Inline collections hold no page references, so only subtrees need work.
                if matches!(collection.collection_type()?, SubtreeV2) {
                    let sub_root = collection.as_subtree();
                    let mut tree = UntypedBtreeMut::new(
                        Some(sub_root),
                        mem.clone(),
                        freed_pages.clone(),
                        value_size,
                        <() as Value>::fixed_width(),
                    );
                    tree.relocate(relocation_map)?;
                    let new_root = tree.get_root().ok_or_else(|| {
                        StorageError::Internal(
                            "missing subtree root after relocate in relocate_subtrees".to_string(),
                        )
                    })?;
                    // Rewrite the entry only when the subtree header actually changed.
                    if sub_root != new_root {
                        let new_collection = UntypedDynamicCollection::make_subtree_data(new_root);
                        mutator.insert(i, true, entry.key(), &new_collection);
                    }
                }
            }
        }
        BRANCH => {
            let accessor = BranchAccessor::new(&old_page, key_size)?;
            let mut mutator = BranchMutator::new(new_page.memory_mut()?);
            for i in 0..accessor.count_children() {
                if let Some(child) = accessor.child_page(i) {
                    // NOTE(review): presumably a checksum always exists when `child_page(i)`
                    // does; this `unwrap` panics otherwise -- confirm against `BranchAccessor`.
                    let child_checksum = accessor.child_checksum(i).unwrap();
                    let (new_child, new_checksum) = relocate_subtrees(
                        (child, child_checksum),
                        key_size,
                        value_size,
                        mem.clone(),
                        freed_pages.clone(),
                        relocation_map,
                    )?;
                    mutator.write_child_page(i, new_child, new_checksum);
                }
            }
        }
        _ => {
            return Err(StorageError::invalid_page_type(
                root.0,
                old_page.memory()[0],
            ));
        }
    }

    let old_page_number = old_page.get_page_number();
    // Release the read handle before the old page is freed.
    drop(old_page);
    // No need to track allocations, because this method is only called during compaction when
    // there can't be any savepoints
    let mut ignore = PageTrackerPolicy::Ignore;
    if !mem.free_if_uncommitted(old_page_number, &mut ignore) {
        freed_pages.lock().push(old_page_number);
    }
    Ok((new_page_number, DEFERRED))
}
390
391// Finalize all the checksums in the tree, including any Dynamic collection subtrees
392// Returns the root checksum
393pub(crate) fn finalize_tree_and_subtree_checksums(
394    root: Option<BtreeHeader>,
395    key_size: Option<usize>,
396    value_size: Option<usize>,
397    mem: Arc<TransactionalMemory>,
398) -> Result<Option<BtreeHeader>> {
399    let freed_pages = Arc::new(Mutex::new(vec![]));
400    let mut tree = UntypedBtreeMut::new(
401        root,
402        mem.clone(),
403        freed_pages.clone(),
404        key_size,
405        DynamicCollection::<()>::fixed_width_with(value_size),
406    );
407    tree.dirty_leaf_visitor(|mut leaf_page| {
408        let mut sub_root_updates = vec![];
409        let accessor = LeafAccessor::new(
410            leaf_page.memory(),
411            key_size,
412            DynamicCollection::<()>::fixed_width_with(value_size),
413        )?;
414        for i in 0..accessor.num_pairs() {
415            let entry = accessor
416                .entry(i)
417                .ok_or_else(|| StorageError::invalid_entry_index(leaf_page.get_page_number(), i))?;
418            let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
419            if matches!(collection.collection_type()?, SubtreeV2) {
420                let sub_root = collection.as_subtree();
421                if mem.uncommitted(sub_root.root) {
422                    let mut subtree = UntypedBtreeMut::new(
423                        Some(sub_root),
424                        mem.clone(),
425                        freed_pages.clone(),
426                        value_size,
427                        <()>::fixed_width(),
428                    );
429                    let subtree_root = subtree.finalize_dirty_checksums()?.unwrap();
430                    sub_root_updates.push((i, entry.key().to_vec(), subtree_root));
431                }
432            }
433        }
434        // TODO: maybe there's a better abstraction, so that we don't need to call into this low-level method?
435        let mut mutator = LeafMutator::new(
436            leaf_page.memory_mut()?,
437            key_size,
438            DynamicCollection::<()>::fixed_width_with(value_size),
439        );
440        for (i, key, sub_root) in sub_root_updates {
441            let collection = DynamicCollection::<()>::make_subtree_data(sub_root);
442            mutator.insert(i, true, &key, &collection);
443        }
444
445        Ok(())
446    })?;
447
448    let root = tree.finalize_dirty_checksums()?;
449    // No pages should have been freed by this operation
450    assert!(freed_pages.lock().is_empty());
451    Ok(root)
452}
453
454fn parse_subtree_roots<T: Page>(
455    page: &T,
456    fixed_key_size: Option<usize>,
457    fixed_value_size: Option<usize>,
458) -> crate::Result<Vec<BtreeHeader>> {
459    match page.memory()[0] {
460        BRANCH => Ok(vec![]),
461        LEAF => {
462            let mut result = vec![];
463            let accessor = LeafAccessor::new(
464                page.memory(),
465                fixed_key_size,
466                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
467            )?;
468            for i in 0..accessor.num_pairs() {
469                let entry = accessor
470                    .entry(i)
471                    .ok_or_else(|| StorageError::invalid_entry_index(page.get_page_number(), i))?;
472                let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
473                if matches!(collection.collection_type()?, SubtreeV2) {
474                    result.push(collection.as_subtree());
475                }
476            }
477
478            Ok(result)
479        }
480        _ => Err(StorageError::invalid_page_type(
481            page.get_page_number(),
482            page.memory()[0],
483        )),
484    }
485}
486
487pub(crate) struct UntypedMultiBtree {
488    mem: Arc<TransactionalMemory>,
489    root: Option<BtreeHeader>,
490    key_width: Option<usize>,
491    value_width: Option<usize>,
492}
493
494impl UntypedMultiBtree {
495    pub(crate) fn new(
496        root: Option<BtreeHeader>,
497        mem: Arc<TransactionalMemory>,
498        key_width: Option<usize>,
499        value_width: Option<usize>,
500    ) -> Self {
501        Self {
502            mem,
503            root,
504            key_width,
505            value_width,
506        }
507    }
508
509    // Applies visitor to pages in the tree
510    pub(crate) fn visit_all_pages<F>(&self, mut visitor: F) -> Result
511    where
512        F: FnMut(&PagePath) -> Result,
513    {
514        let tree = UntypedBtree::new(
515            self.root,
516            self.mem.clone(),
517            self.key_width,
518            UntypedDynamicCollection::fixed_width_with(self.value_width),
519        );
520        tree.visit_all_pages(|path| {
521            visitor(path)?;
522            let page = self.mem.get_page(path.page_number())?;
523            match page.memory()[0] {
524                LEAF => {
525                    for header in parse_subtree_roots(&page, self.key_width, self.value_width)? {
526                        let subtree = UntypedBtree::new(
527                            Some(header),
528                            self.mem.clone(),
529                            self.value_width,
530                            <() as Value>::fixed_width(),
531                        );
532                        subtree.visit_all_pages(|subpath| {
533                            let full_path = path.with_subpath(subpath);
534                            visitor(&full_path)
535                        })?;
536                    }
537                }
538                BRANCH => {
539                    // No-op. The tree.visit_pages() call will process this sub-tree
540                }
541                other => {
542                    return Err(StorageError::invalid_page_type(path.page_number(), other));
543                }
544            }
545            Ok(())
546        })?;
547
548        Ok(())
549    }
550}
551
552pub(crate) struct LeafKeyIter<'a, V: Key + 'static> {
553    inline_collection: AccessGuard<'a, &'static DynamicCollection<V>>,
554    fixed_key_size: Option<usize>,
555    fixed_value_size: Option<usize>,
556    start_entry: isize, // inclusive
557    end_entry: isize,   // inclusive
558}
559
560impl<'a, V: Key> LeafKeyIter<'a, V> {
561    fn new(
562        data: AccessGuard<'a, &'static DynamicCollection<V>>,
563        fixed_key_size: Option<usize>,
564        fixed_value_size: Option<usize>,
565    ) -> Result<Self> {
566        let accessor =
567            LeafAccessor::new(data.value().as_inline(), fixed_key_size, fixed_value_size)?;
568        let num_pairs = accessor.num_pairs();
569        let end_entry = if num_pairs == 0 {
570            -1
571        } else {
572            isize::try_from(num_pairs).unwrap_or(isize::MAX) - 1
573        };
574        Ok(Self {
575            inline_collection: data,
576            fixed_key_size,
577            fixed_value_size,
578            start_entry: 0,
579            end_entry,
580        })
581    }
582
583    fn next_key(&mut self) -> Option<Result<&[u8]>> {
584        if self.end_entry < self.start_entry {
585            return None;
586        }
587        let accessor = match LeafAccessor::new(
588            self.inline_collection.value().as_inline(),
589            self.fixed_key_size,
590            self.fixed_value_size,
591        ) {
592            Ok(a) => a,
593            Err(e) => return Some(Err(e)),
594        };
595        self.start_entry += 1;
596        accessor
597            .entry((self.start_entry - 1).try_into().unwrap())
598            .map(|e| Ok(e.key()))
599    }
600
601    fn next_key_back(&mut self) -> Option<Result<&[u8]>> {
602        if self.end_entry < self.start_entry {
603            return None;
604        }
605        let accessor = match LeafAccessor::new(
606            self.inline_collection.value().as_inline(),
607            self.fixed_key_size,
608            self.fixed_value_size,
609        ) {
610            Ok(a) => a,
611            Err(e) => return Some(Err(e)),
612        };
613        self.end_entry -= 1;
614        accessor
615            .entry((self.end_entry + 1).try_into().unwrap())
616            .map(|e| Ok(e.key()))
617    }
618}
619
620enum DynamicCollectionType {
621    Inline,
622    // Was used in file format version 1
623    // Subtree,
624    SubtreeV2,
625}
626
627impl DynamicCollectionType {
628    fn try_from_byte(value: u8) -> crate::Result<Self> {
629        match value {
630            LEAF => Ok(Inline),
631            // 2 => Subtree,
632            3 => Ok(SubtreeV2),
633            _ => Err(StorageError::Corrupted(format!(
634                "invalid DynamicCollectionType byte: {value}"
635            ))),
636        }
637    }
638}
639
640#[allow(clippy::from_over_into)]
641impl Into<u8> for DynamicCollectionType {
642    fn into(self) -> u8 {
643        match self {
644            // Reuse the LEAF type id, so that we can cast this directly into the format used by
645            // LeafAccessor
646            Inline => LEAF,
647            // Subtree => 2,
648            SubtreeV2 => 3,
649        }
650    }
651}
652
/// Layout:
/// type (1 byte):
/// * 1 = inline data (the `LEAF` type id is reused for this)
/// * 3 = sub tree (`SubtreeV2`; 2 was the sub tree type of file format version 1 and is
///   no longer used)
///
/// (when type = 1) data (n bytes): inlined leaf node
///
/// (when type = 3) root (8 bytes): sub tree root page number
/// (when type = 3) checksum (16 bytes): sub tree checksum
/// (when type = 3) length (8 bytes): number of values in the sub tree, little-endian `u64`
///
/// NOTE: Even though the [`PhantomData`] is zero-sized, the inner data DST must be placed last.
/// See [Exotically Sized Types](https://doc.rust-lang.org/nomicon/exotic-sizes.html#dynamically-sized-types-dsts)
/// section of the Rustonomicon for more details.
#[repr(transparent)]
struct DynamicCollection<V: Key> {
    // Records the value type `V` without storing any data.
    _value_type: PhantomData<V>,
    // Raw serialized bytes: the type byte followed by the payload described above.
    data: [u8],
}
671
672impl<V: Key> core::fmt::Debug for DynamicCollection<V> {
673    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
674        f.debug_struct("DynamicCollection")
675            .field("data", &&self.data)
676            .finish()
677    }
678}
679
// Lets a borrowed `DynamicCollection` be used as a table value: the serialized form is
// exactly the collection's raw bytes.
impl<V: Key> Value for &DynamicCollection<V> {
    type SelfType<'a>
        = &'a DynamicCollection<V>
    where
        Self: 'a;
    type AsBytes<'a>
        = &'a [u8]
    where
        Self: 'a;

    // Collections are variable-length, so no fixed width is reported.
    fn fixed_width() -> Option<usize> {
        None
    }

    // Reinterprets `data` in place; nothing is copied or validated here.
    fn from_bytes<'a>(data: &'a [u8]) -> &'a DynamicCollection<V>
    where
        Self: 'a,
    {
        DynamicCollection::new(data)
    }

    // Returns the underlying bytes unchanged.
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'a [u8]
    where
        Self: 'b,
    {
        &value.data
    }

    fn type_name() -> TypeName {
        TypeName::internal("redb::DynamicCollection")
    }
}
712
713impl<V: Key> DynamicCollection<V> {
714    fn new(data: &[u8]) -> &Self {
715        // SAFETY: DynamicCollection<V> is #[repr(transparent)] over [u8] (with a
716        // zero-sized PhantomData<V>), so it has identical size, alignment, and
717        // memory layout. The pointer cast preserves the slice length metadata and
718        // the shared borrow lifetime is propagated to the returned reference.
719        unsafe { &*(core::ptr::from_ref::<[u8]>(data) as *const DynamicCollection<V>) }
720    }
721
722    fn collection_type(&self) -> crate::Result<DynamicCollectionType> {
723        DynamicCollectionType::try_from_byte(self.data[0])
724    }
725
726    fn as_inline(&self) -> &[u8] {
727        &self.data[1..]
728    }
729
730    fn as_subtree(&self) -> BtreeHeader {
731        BtreeHeader::from_le_bytes(
732            self.data[1..=BtreeHeader::serialized_size()]
733                .try_into()
734                .unwrap(),
735        )
736    }
737
738    fn get_num_values(&self) -> crate::Result<u64> {
739        Ok(match self.collection_type()? {
740            Inline => {
741                let leaf_data = self.as_inline();
742                let accessor =
743                    LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width())?;
744                accessor.num_pairs() as u64
745            }
746            SubtreeV2 => {
747                let offset = 1 + PageNumber::serialized_size() + size_of::<Checksum>();
748                u64::from_le_bytes(
749                    self.data[offset..(offset + size_of::<u64>())]
750                        .try_into()
751                        .unwrap(),
752                )
753            }
754        })
755    }
756
757    fn make_inline_data(data: &[u8]) -> Vec<u8> {
758        let mut result = vec![Inline.into()];
759        result.extend_from_slice(data);
760
761        result
762    }
763
764    fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
765        let mut result = vec![SubtreeV2.into()];
766        result.extend_from_slice(&header.to_le_bytes());
767        result
768    }
769
770    pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
771        None
772    }
773}
774
775impl<V: Key> DynamicCollection<V> {
776    fn iter<'a>(
777        collection: AccessGuard<'a, &'static DynamicCollection<V>>,
778        guard: Arc<TransactionGuard>,
779        mem: Arc<TransactionalMemory>,
780    ) -> Result<MultimapValue<'a, V>> {
781        Ok(match collection.value().collection_type()? {
782            Inline => {
783                let leaf_iter =
784                    LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width())?;
785                MultimapValue::new_inline(leaf_iter, guard)?
786            }
787            SubtreeV2 => {
788                let root = collection.value().as_subtree().root;
789                MultimapValue::new_subtree(
790                    BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
791                        &(..),
792                        Some(root),
793                        None,
794                        if mem.compression().is_enabled() {
795                            None
796                        } else {
797                            <() as Value>::fixed_width()
798                        },
799                        mem,
800                        false,
801                    )?,
802                    collection.value().get_num_values()?,
803                    guard,
804                )
805            }
806        })
807    }
808
809    fn iter_free_on_drop<'a>(
810        collection: AccessGuard<'a, &'static DynamicCollection<V>>,
811        pages: Vec<PageNumber>,
812        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
813        allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
814        guard: Arc<TransactionGuard>,
815        mem: Arc<TransactionalMemory>,
816    ) -> Result<MultimapValue<'a, V>> {
817        let num_values = collection.value().get_num_values()?;
818        Ok(match collection.value().collection_type()? {
819            Inline => {
820                let leaf_iter =
821                    LeafKeyIter::new(collection, V::fixed_width(), <() as Value>::fixed_width())?;
822                MultimapValue::new_inline(leaf_iter, guard)?
823            }
824            SubtreeV2 => {
825                let root = collection.value().as_subtree().root;
826                let inner = BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
827                    &(..),
828                    Some(root),
829                    None,
830                    if mem.compression().is_enabled() {
831                        None
832                    } else {
833                        <() as Value>::fixed_width()
834                    },
835                    mem.clone(),
836                    false,
837                )?;
838                MultimapValue::new_subtree_free_on_drop(
839                    inner,
840                    num_values,
841                    freed_pages,
842                    allocated_pages,
843                    pages,
844                    guard,
845                    mem,
846                )
847            }
848        })
849    }
850}
851
/// Type-erased view over the serialized bytes of a multimap value collection.
///
/// The first byte is the collection type tag; the remaining bytes are either the inline
/// leaf data or a serialized `BtreeHeader` for a subtree (see the accessor methods).
/// `#[repr(transparent)]` lets a `&[u8]` be reinterpreted as `&UntypedDynamicCollection`.
#[repr(transparent)]
struct UntypedDynamicCollection {
    // Raw serialized collection: type tag byte followed by the payload.
    data: [u8],
}
856
857impl UntypedDynamicCollection {
858    pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
859        None
860    }
861
862    fn new(data: &[u8]) -> &Self {
863        // SAFETY: UntypedDynamicCollection is #[repr(transparent)] over [u8],
864        // so it has identical size, alignment, and memory layout. The pointer
865        // cast preserves the slice length metadata and the shared borrow
866        // lifetime is propagated to the returned reference.
867        unsafe { &*(core::ptr::from_ref::<[u8]>(data) as *const UntypedDynamicCollection) }
868    }
869
870    fn make_subtree_data(header: BtreeHeader) -> Vec<u8> {
871        let mut result = vec![SubtreeV2.into()];
872        result.extend_from_slice(&header.to_le_bytes());
873        result
874    }
875
876    fn from_bytes(data: &[u8]) -> &Self {
877        Self::new(data)
878    }
879
880    fn collection_type(&self) -> crate::Result<DynamicCollectionType> {
881        DynamicCollectionType::try_from_byte(self.data[0])
882    }
883
884    fn as_inline(&self) -> &[u8] {
885        &self.data[1..]
886    }
887
888    fn as_subtree(&self) -> BtreeHeader {
889        BtreeHeader::from_le_bytes(
890            self.data[1..=BtreeHeader::serialized_size()]
891                .try_into()
892                .unwrap(),
893        )
894    }
895}
896
/// Backing iterator of a [`MultimapValue`], one variant per collection representation.
enum ValueIterState<'a, V: Key + 'static> {
    /// Values live in their own b-tree (keys are the values, mapped to `()`).
    Subtree(BtreeRangeIter<V, ()>),
    /// Values are stored as leaf data embedded in the parent entry.
    InlineLeaf(LeafKeyIter<'a, V>),
}
901
/// Iterator over the values associated with a single multimap key.
///
/// Yields `Result<AccessGuard<V>>` items and supports iteration from both ends.
/// An instance created for a removal additionally hands a list of pages back for
/// freeing when it is dropped.
pub struct MultimapValue<'a, V: Key + 'static> {
    // Wrapped in `Option` so `Drop` can release the iterator (and its page references)
    // before the pages in `free_on_drop` are freed.
    inner: Option<ValueIterState<'a, V>>,
    // Number of values not yet yielded; reported by `len()`.
    remaining: u64,
    // Destination for pages that cannot be freed immediately; `None` unless this
    // iterator frees pages on drop.
    freed_pages: Option<Arc<Mutex<Vec<PageNumber>>>>,
    // Allocation tracker passed to `free_if_uncommitted` during drop.
    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
    // Pages to release when the iterator is dropped; empty for read-only iterators.
    free_on_drop: Vec<PageNumber>,
    // Held for the lifetime of the iterator; never read directly.
    _transaction_guard: Arc<TransactionGuard>,
    // Page store used to free pages on drop; `None` unless this iterator frees pages.
    mem: Option<Arc<TransactionalMemory>>,
    _value_type: PhantomData<V>,
}
912
913impl<'a, V: Key + 'static> MultimapValue<'a, V> {
914    fn new_subtree(
915        inner: BtreeRangeIter<V, ()>,
916        num_values: u64,
917        guard: Arc<TransactionGuard>,
918    ) -> Self {
919        Self {
920            inner: Some(ValueIterState::Subtree(inner)),
921            remaining: num_values,
922            freed_pages: None,
923            allocated_pages: Arc::new(Mutex::new(PageTrackerPolicy::Closed)),
924            free_on_drop: vec![],
925            _transaction_guard: guard,
926            mem: None,
927            _value_type: Default::default(),
928        }
929    }
930
931    fn new_subtree_free_on_drop(
932        inner: BtreeRangeIter<V, ()>,
933        num_values: u64,
934        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
935        allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
936        pages: Vec<PageNumber>,
937        guard: Arc<TransactionGuard>,
938        mem: Arc<TransactionalMemory>,
939    ) -> Self {
940        Self {
941            inner: Some(ValueIterState::Subtree(inner)),
942            remaining: num_values,
943            freed_pages: Some(freed_pages),
944            free_on_drop: pages,
945            allocated_pages,
946            _transaction_guard: guard,
947            mem: Some(mem),
948            _value_type: Default::default(),
949        }
950    }
951
952    fn new_inline(inner: LeafKeyIter<'a, V>, guard: Arc<TransactionGuard>) -> Result<Self> {
953        let remaining = inner.inline_collection.value().get_num_values()?;
954        Ok(Self {
955            inner: Some(ValueIterState::InlineLeaf(inner)),
956            remaining,
957            freed_pages: None,
958            allocated_pages: Arc::new(Mutex::new(PageTrackerPolicy::Closed)),
959            free_on_drop: vec![],
960            _transaction_guard: guard,
961            mem: None,
962            _value_type: Default::default(),
963        })
964    }
965
966    /// Returns the number of times this iterator will return `Some(Ok(_))`
967    ///
968    /// Note that `Some` may be returned from `next()` more than `len()` times if `Some(Err(_))` is returned
969    pub fn len(&self) -> u64 {
970        self.remaining
971    }
972
973    pub fn is_empty(&self) -> bool {
974        self.len() == 0
975    }
976}
977
978impl<'a, V: Key + 'static> Iterator for MultimapValue<'a, V> {
979    type Item = Result<AccessGuard<'a, V>>;
980
981    fn next(&mut self) -> Option<Self::Item> {
982        let bytes = match self.inner.as_mut()? {
983            ValueIterState::Subtree(iter) => match iter.next()? {
984                Ok(e) => e.key_data(),
985                Err(err) => {
986                    return Some(Err(err));
987                }
988            },
989            ValueIterState::InlineLeaf(iter) => match iter.next_key()? {
990                Ok(bytes) => bytes.to_vec(),
991                Err(err) => return Some(Err(err)),
992            },
993        };
994        self.remaining -= 1;
995        Some(Ok(AccessGuard::with_owned_value(bytes)))
996    }
997}
998
999impl<V: Key + 'static> DoubleEndedIterator for MultimapValue<'_, V> {
1000    fn next_back(&mut self) -> Option<Self::Item> {
1001        let bytes = match self.inner.as_mut()? {
1002            ValueIterState::Subtree(iter) => match iter.next_back()? {
1003                Ok(e) => e.key_data(),
1004                Err(err) => {
1005                    return Some(Err(err));
1006                }
1007            },
1008            ValueIterState::InlineLeaf(iter) => match iter.next_key_back()? {
1009                Ok(bytes) => bytes.to_vec(),
1010                Err(err) => return Some(Err(err)),
1011            },
1012        };
1013        self.remaining -= 1;
1014        Some(Ok(AccessGuard::with_owned_value(bytes)))
1015    }
1016}
1017
1018impl<V: Key + 'static> Drop for MultimapValue<'_, V> {
1019    fn drop(&mut self) {
1020        // Drop our references to the pages that are about to be freed
1021        drop(mem::take(&mut self.inner));
1022        if !self.free_on_drop.is_empty()
1023            && let (Some(freed_pages_arc), Some(mem_arc)) =
1024                (self.freed_pages.as_ref(), self.mem.as_ref())
1025        {
1026            let mut freed_pages = freed_pages_arc.lock();
1027            let mut allocated_pages = self.allocated_pages.lock();
1028            for page in &self.free_on_drop {
1029                if !mem_arc.free_if_uncommitted(*page, &mut allocated_pages) {
1030                    freed_pages.push(*page);
1031                }
1032            }
1033        }
1034    }
1035}
1036
/// Iterator over a range of multimap keys.
///
/// Each item pairs a key with a [`MultimapValue`] iterator over that key's values.
/// Supports iteration from both ends.
pub struct MultimapRange<'a, K: Key + 'static, V: Key + 'static> {
    // Range iterator over the outer tree: key -> serialized value collection.
    inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
    // Cloned into every per-key value iterator.
    mem: Arc<TransactionalMemory>,
    // Cloned into every per-key value iterator.
    transaction_guard: Arc<TransactionGuard>,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
    _lifetime: PhantomData<&'a ()>,
}
1045
1046impl<K: Key + 'static, V: Key + 'static> MultimapRange<'_, K, V> {
1047    fn new(
1048        inner: BtreeRangeIter<K, &'static DynamicCollection<V>>,
1049        guard: Arc<TransactionGuard>,
1050        mem: Arc<TransactionalMemory>,
1051    ) -> Self {
1052        Self {
1053            inner,
1054            mem,
1055            transaction_guard: guard,
1056            _key_type: Default::default(),
1057            _value_type: Default::default(),
1058            _lifetime: Default::default(),
1059        }
1060    }
1061}
1062
1063impl<'a, K: Key + 'static, V: Key + 'static> Iterator for MultimapRange<'a, K, V> {
1064    type Item = Result<(AccessGuard<'a, K>, MultimapValue<'a, V>)>;
1065
1066    fn next(&mut self) -> Option<Self::Item> {
1067        match self.inner.next()? {
1068            Ok(entry) => {
1069                let key = AccessGuard::with_owned_value(entry.key_data());
1070                let (page, _, value_range, decompressed_value) = entry.into_raw();
1071                let collection = if let Some(bytes) = decompressed_value {
1072                    AccessGuard::with_owned_value(bytes)
1073                } else {
1074                    AccessGuard::with_page(page, value_range)
1075                };
1076                Some(
1077                    DynamicCollection::iter(
1078                        collection,
1079                        self.transaction_guard.clone(),
1080                        self.mem.clone(),
1081                    )
1082                    .map(|iter| (key, iter)),
1083                )
1084            }
1085            Err(err) => Some(Err(err)),
1086        }
1087    }
1088}
1089
1090impl<K: Key + 'static, V: Key + 'static> DoubleEndedIterator for MultimapRange<'_, K, V> {
1091    fn next_back(&mut self) -> Option<Self::Item> {
1092        match self.inner.next_back()? {
1093            Ok(entry) => {
1094                let key = AccessGuard::with_owned_value(entry.key_data());
1095                let (page, _, value_range, decompressed_value) = entry.into_raw();
1096                let collection = if let Some(bytes) = decompressed_value {
1097                    AccessGuard::with_owned_value(bytes)
1098                } else {
1099                    AccessGuard::with_page(page, value_range)
1100                };
1101                Some(
1102                    DynamicCollection::iter(
1103                        collection,
1104                        self.transaction_guard.clone(),
1105                        self.mem.clone(),
1106                    )
1107                    .map(|iter| (key, iter)),
1108                )
1109            }
1110            Err(err) => Some(Err(err)),
1111        }
1112    }
1113}
1114
/// A multimap table
///
/// [Multimap tables](https://en.wikipedia.org/wiki/Multimap) may have multiple values associated with each key
pub struct MultimapTable<'txn, K: Key + 'static, V: Key + 'static> {
    // Table name, returned by `MultimapTableHandle::name` and stamped on CDC events.
    name: String,
    // Total number of key-value pairs; adjusted by insert / remove / remove_all.
    num_values: u64,
    // Owning write transaction: source of transaction guards and sink for CDC events.
    transaction: &'txn WriteTransaction,
    // Shared list of pages that could not be freed immediately; also handed to every
    // subtree opened by this table.
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Shared allocation tracker; also handed to every subtree opened by this table.
    allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
    // Outer tree mapping each key to its serialized value collection.
    tree: BtreeMut<'txn, K, &'static DynamicCollection<V>>,
    // Page store used for allocation, page reads and freeing.
    mem: Arc<TransactionalMemory>,
    _value_type: PhantomData<V>,
}
1128
1129impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTable<'_, K, V> {
1130    fn name(&self) -> &str {
1131        &self.name
1132    }
1133}
1134
1135impl<'txn, K: Key + 'static, V: Key + 'static> MultimapTable<'txn, K, V> {
1136    pub(crate) fn new(
1137        name: &str,
1138        table_root: Option<BtreeHeader>,
1139        num_values: u64,
1140        freed_pages: Arc<Mutex<Vec<PageNumber>>>,
1141        allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
1142        mem: Arc<TransactionalMemory>,
1143        transaction: &'txn WriteTransaction,
1144    ) -> MultimapTable<'txn, K, V> {
1145        MultimapTable {
1146            name: name.to_string(),
1147            num_values,
1148            transaction,
1149            freed_pages: freed_pages.clone(),
1150            allocated_pages: allocated_pages.clone(),
1151            tree: BtreeMut::new(
1152                table_root,
1153                transaction.transaction_guard(),
1154                mem.clone(),
1155                freed_pages,
1156                allocated_pages,
1157            ),
1158            mem,
1159            _value_type: Default::default(),
1160        }
1161    }
1162
    /// Debug helper: forwards to the outer tree's `print_debug`.
    ///
    /// Only compiled with the `std` feature. `include_values` is passed through
    /// unchanged.
    #[allow(dead_code)]
    #[cfg(feature = "std")]
    pub(crate) fn print_debug(&self, include_values: bool) -> Result {
        self.tree.print_debug(include_values)
    }
1168
    /// Add the given value to the mapping of the key
    ///
    /// Returns `true` if the key-value pair was present
    ///
    /// The values of a key are kept inline (as leaf data embedded in the outer tree
    /// entry) while the serialized leaf stays below half a page; otherwise they live in
    /// a dedicated subtree whose header is stored in the outer tree entry.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::ValueTooLarge` if the value or the key exceeds
    /// `MAX_VALUE_LENGTH`, or if their combined length exceeds `MAX_PAIR_LENGTH`.
    /// Returns `StorageError::Corrupted` if an existing inline leaf has an invalid entry
    /// index, and propagates any error from the underlying trees or page allocation.
    ///
    /// # Panics
    ///
    /// Panics if, while converting an inline collection into a subtree, the subtree
    /// reports the value as already present although the inline lookup did not find it.
    pub fn insert<'k, 'v>(
        &mut self,
        key: impl Borrow<K::SelfType<'k>>,
        value: impl Borrow<V::SelfType<'v>>,
    ) -> Result<bool> {
        // Size limits are checked before anything is read or written.
        let value_bytes = V::as_bytes(value.borrow());
        let value_bytes_ref = value_bytes.as_ref();
        if value_bytes_ref.len() > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(value_bytes_ref.len()));
        }
        let key_len = K::as_bytes(key.borrow()).as_ref().len();
        if key_len > MAX_VALUE_LENGTH {
            return Err(StorageError::ValueTooLarge(key_len));
        }
        if value_bytes_ref.len() + key_len > MAX_PAIR_LENGTH {
            return Err(StorageError::ValueTooLarge(value_bytes_ref.len() + key_len));
        }
        let get_result = self.tree.get(key.borrow())?;
        let existed = if get_result.is_some() {
            // The key already has a collection; how the value is added depends on its
            // representation.
            #[allow(clippy::unnecessary_unwrap)]
            let guard = get_result.unwrap();
            let collection_type = guard.value().collection_type()?;
            match collection_type {
                Inline => {
                    let leaf_data = guard.value().as_inline();
                    let accessor = LeafAccessor::new(
                        leaf_data,
                        V::fixed_width(),
                        <() as Value>::fixed_width(),
                    )?;
                    // `position` is where the value belongs in sorted order.
                    let (position, found) = accessor.position::<V>(value_bytes_ref);
                    if found {
                        // Already present: nothing to write.
                        return Ok(true);
                    }

                    // Size of the leaf once the new value is added.
                    let num_pairs = accessor.num_pairs();
                    let new_pairs = num_pairs + 1;
                    let new_pair_bytes =
                        accessor.length_of_pairs(0, accessor.num_pairs()) + value_bytes_ref.len();
                    let new_key_bytes =
                        accessor.length_of_keys(0, accessor.num_pairs()) + value_bytes_ref.len();
                    let required_inline_bytes = RawLeafBuilder::required_bytes(
                        new_pairs,
                        new_pair_bytes,
                        V::fixed_width(),
                        <() as Value>::fixed_width(),
                    );

                    if required_inline_bytes < self.mem.get_page_size() / 2 {
                        // Still small enough: rebuild the inline leaf with the new value
                        // spliced in at `position`.
                        let mut data = vec![0; required_inline_bytes];
                        let mut builder = RawLeafBuilder::new(
                            &mut data,
                            new_pairs,
                            V::fixed_width(),
                            <() as Value>::fixed_width(),
                            new_key_bytes,
                        );
                        for i in 0..accessor.num_pairs() {
                            if i == position {
                                builder
                                    .append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
                            }
                            let entry = accessor.entry(i).ok_or_else(|| {
                                StorageError::Corrupted(format!(
                                    "invalid entry index {i} in multimap insert inline expansion"
                                ))
                            })?;
                            builder.append(entry.key(), entry.value());
                        }
                        // The new value sorts after every existing one.
                        if position == accessor.num_pairs() {
                            builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
                        }
                        // Release the builder and the read guard before writing to the tree.
                        drop(builder);
                        drop(guard);
                        let inline_data = DynamicCollection::<V>::make_inline_data(&data);
                        self.tree
                            .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
                    } else {
                        // convert into a subtree
                        // The existing inline leaf bytes are copied verbatim into a freshly
                        // allocated page, which becomes the root of the new subtree.
                        let mut allocated = self.allocated_pages.lock();
                        let mut page = self.mem.allocate(leaf_data.len(), &mut allocated)?;
                        drop(allocated);
                        page.memory_mut()?[..leaf_data.len()].copy_from_slice(leaf_data);
                        let page_number = page.get_page_number();
                        drop(page);
                        drop(guard);

                        // Don't bother computing the checksum, since we're about to modify the tree
                        let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
                            Some(BtreeHeader::new(page_number, 0, num_pairs as u64)),
                            self.transaction.transaction_guard(),
                            self.mem.clone(),
                            self.freed_pages.clone(),
                            self.allocated_pages.clone(),
                        );
                        let existed = subtree.insert(value.borrow(), &())?.is_some();
                        // Sanity check: the subtree must agree with the inline lookup above.
                        assert_eq!(existed, found);
                        let subtree_root = subtree.get_root().ok_or_else(|| {
                            StorageError::Internal(
                                "missing subtree root after insert in inline-to-subtree conversion"
                                    .to_string(),
                            )
                        })?;
                        let subtree_data = DynamicCollection::<V>::make_subtree_data(subtree_root);
                        self.tree
                            .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
                    }

                    found
                }
                SubtreeV2 => {
                    // Insert into the existing subtree and store its new header.
                    let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
                        Some(guard.value().as_subtree()),
                        self.transaction.transaction_guard(),
                        self.mem.clone(),
                        self.freed_pages.clone(),
                        self.allocated_pages.clone(),
                    );
                    drop(guard);
                    let existed = subtree.insert(value.borrow(), &())?.is_some();
                    let subtree_root = subtree.get_root().ok_or_else(|| {
                        StorageError::Internal(
                            "missing subtree root after insert into existing subtree".to_string(),
                        )
                    })?;
                    let subtree_data = DynamicCollection::<V>::make_subtree_data(subtree_root);
                    self.tree
                        .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;

                    existed
                }
            }
        } else {
            // First value for this key. Drop the (empty) lookup result so the tree can be
            // mutated below.
            drop(get_result);
            let required_inline_bytes = RawLeafBuilder::required_bytes(
                1,
                value_bytes_ref.len(),
                V::fixed_width(),
                <() as Value>::fixed_width(),
            );
            if required_inline_bytes < self.mem.get_page_size() / 2 {
                // A single value that fits inline: build a one-entry leaf.
                let mut data = vec![0; required_inline_bytes];
                let mut builder = RawLeafBuilder::new(
                    &mut data,
                    1,
                    V::fixed_width(),
                    <() as Value>::fixed_width(),
                    value_bytes_ref.len(),
                );
                builder.append(value_bytes_ref, <() as Value>::as_bytes(&()).as_ref());
                drop(builder);
                let inline_data = DynamicCollection::<V>::make_inline_data(&data);
                self.tree
                    .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
            } else {
                // Too large to store inline: start a new subtree for this key.
                let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
                    None,
                    self.transaction.transaction_guard(),
                    self.mem.clone(),
                    self.freed_pages.clone(),
                    self.allocated_pages.clone(),
                );
                subtree.insert(value.borrow(), &())?;
                let subtree_root = subtree.get_root().ok_or_else(|| {
                    StorageError::Internal(
                        "missing subtree root after insert into new subtree".to_string(),
                    )
                })?;
                let subtree_data = DynamicCollection::<V>::make_subtree_data(subtree_root);
                self.tree
                    .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
            }
            false
        };

        // Only a genuinely new pair changes the count and produces a CDC event.
        if !existed {
            self.num_values += 1;
            if self.transaction.cdc_log.is_some() {
                self.transaction.record_cdc(CdcEvent {
                    table_name: self.name.clone(),
                    op: ChangeOp::Insert,
                    key: K::as_bytes(key.borrow()).as_ref().to_vec(),
                    new_value: Some(V::as_bytes(value.borrow()).as_ref().to_vec()),
                    old_value: None,
                });
            }
        }

        Ok(existed)
    }
1362
1363    /// Removes the given key-value pair
1364    ///
1365    /// Returns `true` if the key-value pair was present
1366    pub fn remove<'k, 'v>(
1367        &mut self,
1368        key: impl Borrow<K::SelfType<'k>>,
1369        value: impl Borrow<V::SelfType<'v>>,
1370    ) -> Result<bool> {
1371        let get_result = self.tree.get(key.borrow())?;
1372        if get_result.is_none() {
1373            return Ok(false);
1374        }
1375        let guard = get_result.unwrap();
1376        let v = guard.value();
1377        let existed = match v.collection_type()? {
1378            Inline => {
1379                let leaf_data = v.as_inline();
1380                let accessor =
1381                    LeafAccessor::new(leaf_data, V::fixed_width(), <() as Value>::fixed_width())?;
1382                if let Some(position) = accessor.find_key::<V>(V::as_bytes(value.borrow()).as_ref())
1383                {
1384                    let old_num_pairs = accessor.num_pairs();
1385                    if old_num_pairs == 1 {
1386                        drop(guard);
1387                        self.tree.remove(key.borrow())?;
1388                    } else {
1389                        let old_pairs_len = accessor.length_of_pairs(0, old_num_pairs);
1390                        let removed_value_len = accessor
1391                            .entry(position)
1392                            .ok_or_else(|| {
1393                                StorageError::Corrupted(format!(
1394                                    "invalid entry index {position} in multimap remove"
1395                                ))
1396                            })?
1397                            .key()
1398                            .len();
1399                        let required = RawLeafBuilder::required_bytes(
1400                            old_num_pairs - 1,
1401                            old_pairs_len - removed_value_len,
1402                            V::fixed_width(),
1403                            <() as Value>::fixed_width(),
1404                        );
1405                        let mut new_data = vec![0; required];
1406                        let new_key_len =
1407                            accessor.length_of_keys(0, old_num_pairs) - removed_value_len;
1408                        let mut builder = RawLeafBuilder::new(
1409                            &mut new_data,
1410                            old_num_pairs - 1,
1411                            V::fixed_width(),
1412                            <() as Value>::fixed_width(),
1413                            new_key_len,
1414                        );
1415                        for i in 0..old_num_pairs {
1416                            if i != position {
1417                                let entry = accessor.entry(i).ok_or_else(|| {
1418                                    StorageError::Corrupted(format!(
1419                                        "invalid entry index {i} in multimap remove rebuild"
1420                                    ))
1421                                })?;
1422                                builder.append(entry.key(), entry.value());
1423                            }
1424                        }
1425                        drop(builder);
1426                        drop(guard);
1427
1428                        let inline_data = DynamicCollection::<V>::make_inline_data(&new_data);
1429                        self.tree
1430                            .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1431                    }
1432                    true
1433                } else {
1434                    drop(guard);
1435                    false
1436                }
1437            }
1438            SubtreeV2 => {
1439                let mut subtree: BtreeMut<V, ()> = BtreeMut::new(
1440                    Some(v.as_subtree()),
1441                    self.transaction.transaction_guard(),
1442                    self.mem.clone(),
1443                    self.freed_pages.clone(),
1444                    self.allocated_pages.clone(),
1445                );
1446                drop(guard);
1447                let existed = subtree.remove(value.borrow())?.is_some();
1448
1449                if let Some(BtreeHeader {
1450                    root: new_root,
1451                    checksum: new_checksum,
1452                    length: new_length,
1453                }) = subtree.get_root()
1454                {
1455                    let page = self.mem.get_page(new_root)?;
1456                    match page.memory()[0] {
1457                        LEAF => {
1458                            let accessor = LeafAccessor::new(
1459                                page.memory(),
1460                                V::fixed_width(),
1461                                <() as Value>::fixed_width(),
1462                            )?;
1463                            let len = accessor.total_length();
1464                            if len < self.mem.get_page_size() / 2 {
1465                                let inline_data =
1466                                    DynamicCollection::<V>::make_inline_data(&page.memory()[..len]);
1467                                self.tree
1468                                    .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1469                                drop(page);
1470                                let mut allocated_pages = self.allocated_pages.lock();
1471                                if !self.mem.free_if_uncommitted(new_root, &mut allocated_pages) {
1472                                    (*self.freed_pages).lock().push(new_root);
1473                                }
1474                            } else {
1475                                let subtree_data =
1476                                    DynamicCollection::<V>::make_subtree_data(BtreeHeader::new(
1477                                        new_root,
1478                                        new_checksum,
1479                                        accessor.num_pairs() as u64,
1480                                    ));
1481                                self.tree
1482                                    .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1483                            }
1484                        }
1485                        BRANCH => {
1486                            let subtree_data = DynamicCollection::<V>::make_subtree_data(
1487                                BtreeHeader::new(new_root, new_checksum, new_length),
1488                            );
1489                            self.tree
1490                                .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1491                        }
1492                        other => {
1493                            return Err(StorageError::invalid_page_type(new_root, other));
1494                        }
1495                    }
1496                } else {
1497                    self.tree.remove(key.borrow())?;
1498                }
1499
1500                existed
1501            }
1502        };
1503
1504        if existed {
1505            self.num_values -= 1;
1506            if self.transaction.cdc_log.is_some() {
1507                self.transaction.record_cdc(CdcEvent {
1508                    table_name: self.name.clone(),
1509                    op: ChangeOp::Delete,
1510                    key: K::as_bytes(key.borrow()).as_ref().to_vec(),
1511                    new_value: None,
1512                    old_value: Some(V::as_bytes(value.borrow()).as_ref().to_vec()),
1513                });
1514            }
1515        }
1516
1517        Ok(existed)
1518    }
1519
1520    /// Removes all keys (and their associated values) in the given range
1521    ///
1522    /// Returns the number of keys removed
1523    pub fn drain<'a, KR>(&mut self, range: impl RangeBounds<KR> + 'a) -> Result<u64>
1524    where
1525        KR: Borrow<K::SelfType<'a>> + 'a,
1526    {
1527        // Collect keys first to avoid borrow conflict with remove_all
1528        let keys: Vec<Vec<u8>> = self
1529            .range(range)?
1530            .map(|result| result.map(|(k, _)| K::as_bytes(&k.value()).as_ref().to_vec()))
1531            .collect::<Result<Vec<_>>>()?;
1532
1533        let mut count = 0u64;
1534        for key_bytes in &keys {
1535            let key = K::from_bytes(key_bytes);
1536            // Consume the returned iterator to trigger page cleanup
1537            for result in self.remove_all(&key)? {
1538                result?;
1539            }
1540            count += 1;
1541        }
1542        Ok(count)
1543    }
1544
    /// Removes all keys (and their associated values) from the table
    ///
    /// Returns the number of keys removed
    ///
    /// Equivalent to calling [`Self::drain`] with an unbounded range.
    pub fn drain_all(&mut self) -> Result<u64> {
        self.drain::<K::SelfType<'_>>(..)
    }
1551
1552    /// Removes all values for the given key
1553    ///
1554    /// Returns an iterator over the removed values. Values are in ascending order.
1555    pub fn remove_all<'a>(
1556        &mut self,
1557        key: impl Borrow<K::SelfType<'a>>,
1558    ) -> Result<MultimapValue<'_, V>> {
1559        // Collect values for CDC before removal
1560        if self.transaction.cdc_log.is_some() {
1561            let key_bytes = K::as_bytes(key.borrow()).as_ref().to_vec();
1562            let guard = self.transaction.transaction_guard();
1563            if let Some(collection) = self.tree.get(key.borrow())? {
1564                let value_iter = DynamicCollection::iter(collection, guard, self.mem.clone())?;
1565                let mut cdc_events: Vec<CdcEvent> = Vec::new();
1566                for result in value_iter {
1567                    let val = result?;
1568                    cdc_events.push(CdcEvent {
1569                        table_name: self.name.clone(),
1570                        op: ChangeOp::Delete,
1571                        key: key_bytes.clone(),
1572                        new_value: None,
1573                        old_value: Some(V::as_bytes(&val.value()).as_ref().to_vec()),
1574                    });
1575                }
1576                for event in cdc_events {
1577                    self.transaction.record_cdc(event);
1578                }
1579            }
1580        }
1581
1582        let iter = if let Some(collection) = self.tree.remove(key.borrow())? {
1583            let mut pages = vec![];
1584            if matches!(
1585                collection.value().collection_type()?,
1586                DynamicCollectionType::SubtreeV2
1587            ) {
1588                let root = collection.value().as_subtree().root;
1589                let all_pages = AllPageNumbersBtreeIter::new(
1590                    root,
1591                    V::fixed_width(),
1592                    <() as Value>::fixed_width(),
1593                    self.mem.clone(),
1594                )?;
1595                for page in all_pages {
1596                    pages.push(page?);
1597                }
1598            }
1599
1600            self.num_values -= collection.value().get_num_values()?;
1601
1602            DynamicCollection::iter_free_on_drop(
1603                collection,
1604                pages,
1605                self.freed_pages.clone(),
1606                self.allocated_pages.clone(),
1607                self.transaction.transaction_guard(),
1608                self.mem.clone(),
1609            )?
1610        } else {
1611            MultimapValue::new_subtree(
1612                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1613                    &(..),
1614                    None,
1615                    None,
1616                    if self.mem.compression().is_enabled() {
1617                        None
1618                    } else {
1619                        <() as Value>::fixed_width()
1620                    },
1621                    self.mem.clone(),
1622                    false,
1623                )?,
1624                0,
1625                self.transaction.transaction_guard(),
1626            )
1627        };
1628
1629        Ok(iter)
1630    }
1631}
1632
1633impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for MultimapTable<'_, K, V> {
1634    fn stats(&self) -> Result<TableStats> {
1635        let tree_stats = multimap_btree_stats(
1636            self.tree.get_root().map(|x| x.root),
1637            &self.mem,
1638            K::fixed_width(),
1639            V::fixed_width(),
1640        )?;
1641
1642        Ok(TableStats {
1643            tree_height: tree_stats.tree_height,
1644            leaf_pages: tree_stats.leaf_pages,
1645            branch_pages: tree_stats.branch_pages,
1646            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1647            metadata_bytes: tree_stats.metadata_bytes,
1648            fragmented_bytes: tree_stats.fragmented_bytes,
1649        })
1650    }
1651
1652    /// Returns the number of key-value pairs in the table
1653    fn len(&self) -> Result<u64> {
1654        Ok(self.num_values)
1655    }
1656}
1657
1658impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V> for MultimapTable<'_, K, V> {
1659    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'_, V>> {
1660        let guard = self.transaction.transaction_guard();
1661        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1662            DynamicCollection::iter(collection, guard, self.mem.clone())?
1663        } else {
1664            MultimapValue::new_subtree(
1665                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1666                    &(..),
1667                    None,
1668                    None,
1669                    if self.mem.compression().is_enabled() {
1670                        None
1671                    } else {
1672                        <() as Value>::fixed_width()
1673                    },
1674                    self.mem.clone(),
1675                    false,
1676                )?,
1677                0,
1678                guard,
1679            )
1680        };
1681
1682        Ok(iter)
1683    }
1684
1685    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<'_, K, V>>
1686    where
1687        KR: Borrow<K::SelfType<'a>> + 'a,
1688    {
1689        let inner = self.tree.range(&range)?;
1690        Ok(MultimapRange::new(
1691            inner,
1692            self.transaction.transaction_guard(),
1693            self.mem.clone(),
1694        ))
1695    }
1696}
1697
1698impl<K: Key + 'static, V: Key + 'static> Sealed for MultimapTable<'_, K, V> {}
1699
1700impl<K: Key + 'static, V: Key + 'static> Drop for MultimapTable<'_, K, V> {
1701    fn drop(&mut self) {
1702        self.transaction
1703            .close_table(&self.name, &self.tree, self.num_values);
1704    }
1705}
1706
1707pub trait ReadableMultimapTable<K: Key + 'static, V: Key + 'static>: ReadableTableMetadata {
1708    /// Returns an iterator over all values for the given key. Values are in ascending order.
1709    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'_, V>>;
1710
1711    /// Returns a double-ended iterator over a range of elements in the table
1712    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<'_, K, V>>
1713    where
1714        KR: Borrow<K::SelfType<'a>> + 'a;
1715
1716    /// Returns an double-ended iterator over all elements in the table. Values are in ascending
1717    /// order.
1718    fn iter(&self) -> Result<MultimapRange<'_, K, V>> {
1719        self.range::<K::SelfType<'_>>(..)
1720    }
1721}
1722
1723/// A read-only untyped multimap table
1724pub struct ReadOnlyUntypedMultimapTable {
1725    num_values: u64,
1726    tree: RawBtree,
1727    fixed_key_size: Option<usize>,
1728    fixed_value_size: Option<usize>,
1729    mem: Arc<TransactionalMemory>,
1730}
1731
1732impl Sealed for ReadOnlyUntypedMultimapTable {}
1733
1734impl ReadableTableMetadata for ReadOnlyUntypedMultimapTable {
1735    /// Retrieves information about storage usage for the table
1736    fn stats(&self) -> Result<TableStats> {
1737        let tree_stats = multimap_btree_stats(
1738            self.tree.get_root().map(|x| x.root),
1739            &self.mem,
1740            self.fixed_key_size,
1741            self.fixed_value_size,
1742        )?;
1743
1744        Ok(TableStats {
1745            tree_height: tree_stats.tree_height,
1746            leaf_pages: tree_stats.leaf_pages,
1747            branch_pages: tree_stats.branch_pages,
1748            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1749            metadata_bytes: tree_stats.metadata_bytes,
1750            fragmented_bytes: tree_stats.fragmented_bytes,
1751        })
1752    }
1753
1754    fn len(&self) -> Result<u64> {
1755        Ok(self.num_values)
1756    }
1757}
1758
1759impl ReadOnlyUntypedMultimapTable {
1760    pub(crate) fn new(
1761        root: Option<BtreeHeader>,
1762        num_values: u64,
1763        fixed_key_size: Option<usize>,
1764        fixed_value_size: Option<usize>,
1765        mem: Arc<TransactionalMemory>,
1766    ) -> Self {
1767        Self {
1768            num_values,
1769            tree: RawBtree::new(
1770                root,
1771                fixed_key_size,
1772                DynamicCollection::<()>::fixed_width_with(fixed_value_size),
1773                mem.clone(),
1774            ),
1775            fixed_key_size,
1776            fixed_value_size,
1777            mem,
1778        }
1779    }
1780}
1781
1782/// A read-only multimap table
1783pub struct ReadOnlyMultimapTable<K: Key + 'static, V: Key + 'static> {
1784    tree: Btree<K, &'static DynamicCollection<V>>,
1785    num_values: u64,
1786    mem: Arc<TransactionalMemory>,
1787    transaction_guard: Arc<TransactionGuard>,
1788    _value_type: PhantomData<V>,
1789}
1790
1791impl<K: Key + 'static, V: Key + 'static> ReadOnlyMultimapTable<K, V> {
1792    pub(crate) fn new(
1793        root: Option<BtreeHeader>,
1794        num_values: u64,
1795        hint: PageHint,
1796        guard: Arc<TransactionGuard>,
1797        mem: Arc<TransactionalMemory>,
1798    ) -> Result<ReadOnlyMultimapTable<K, V>> {
1799        Ok(ReadOnlyMultimapTable {
1800            tree: Btree::new(root, hint, guard.clone(), mem.clone())?,
1801            num_values,
1802            mem,
1803            transaction_guard: guard,
1804            _value_type: Default::default(),
1805        })
1806    }
1807
1808    /// This method is like [`ReadableMultimapTable::get()`], but the iterator is reference counted and keeps the transaction
1809    /// alive until it is dropped.
1810    pub fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'static, V>> {
1811        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1812            DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1813        } else {
1814            MultimapValue::new_subtree(
1815                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1816                    &(..),
1817                    None,
1818                    None,
1819                    if self.mem.compression().is_enabled() {
1820                        None
1821                    } else {
1822                        <() as Value>::fixed_width()
1823                    },
1824                    self.mem.clone(),
1825                    false,
1826                )?,
1827                0,
1828                self.transaction_guard.clone(),
1829            )
1830        };
1831
1832        Ok(iter)
1833    }
1834
1835    /// This method is like [`ReadableMultimapTable::range()`], but the iterator is reference counted and keeps the transaction
1836    /// alive until it is dropped.
1837    pub fn range<'a, KR>(&self, range: impl RangeBounds<KR>) -> Result<MultimapRange<'static, K, V>>
1838    where
1839        KR: Borrow<K::SelfType<'a>>,
1840    {
1841        let inner = self.tree.range(&range)?;
1842        Ok(MultimapRange::new(
1843            inner,
1844            self.transaction_guard.clone(),
1845            self.mem.clone(),
1846        ))
1847    }
1848}
1849
1850impl<K: Key + 'static, V: Key + 'static> ReadableTableMetadata for ReadOnlyMultimapTable<K, V> {
1851    fn stats(&self) -> Result<TableStats> {
1852        let tree_stats = multimap_btree_stats(
1853            self.tree.get_root().map(|x| x.root),
1854            &self.mem,
1855            K::fixed_width(),
1856            V::fixed_width(),
1857        )?;
1858
1859        Ok(TableStats {
1860            tree_height: tree_stats.tree_height,
1861            leaf_pages: tree_stats.leaf_pages,
1862            branch_pages: tree_stats.branch_pages,
1863            stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1864            metadata_bytes: tree_stats.metadata_bytes,
1865            fragmented_bytes: tree_stats.fragmented_bytes,
1866        })
1867    }
1868
1869    fn len(&self) -> Result<u64> {
1870        Ok(self.num_values)
1871    }
1872}
1873
1874impl<K: Key + 'static, V: Key + 'static> ReadableMultimapTable<K, V>
1875    for ReadOnlyMultimapTable<K, V>
1876{
1877    /// Returns an iterator over all values for the given key. Values are in ascending order.
1878    fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<'_, V>> {
1879        let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1880            DynamicCollection::iter(collection, self.transaction_guard.clone(), self.mem.clone())?
1881        } else {
1882            MultimapValue::new_subtree(
1883                BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1884                    &(..),
1885                    None,
1886                    None,
1887                    if self.mem.compression().is_enabled() {
1888                        None
1889                    } else {
1890                        <() as Value>::fixed_width()
1891                    },
1892                    self.mem.clone(),
1893                    false,
1894                )?,
1895                0,
1896                self.transaction_guard.clone(),
1897            )
1898        };
1899
1900        Ok(iter)
1901    }
1902
1903    fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<'_, K, V>>
1904    where
1905        KR: Borrow<K::SelfType<'a>> + 'a,
1906    {
1907        let inner = self.tree.range(&range)?;
1908        Ok(MultimapRange::new(
1909            inner,
1910            self.transaction_guard.clone(),
1911            self.mem.clone(),
1912        ))
1913    }
1914}
1915
1916impl<K: Key, V: Key> Sealed for ReadOnlyMultimapTable<K, V> {}