1use crate::multimap_table::DynamicCollectionType::{Inline, Subtree};
2use crate::sealed::Sealed;
3use crate::table::TableStats;
4use crate::tree_store::{
5 btree_stats, AllPageNumbersBtreeIter, BranchAccessor, Btree, BtreeMut, BtreeRangeIter,
6 BtreeStats, CachePriority, Checksum, LeafAccessor, LeafMutator, Page, PageHint, PageNumber,
7 RawBtree, RawLeafBuilder, TransactionalMemory, UntypedBtreeMut, BRANCH, LEAF, MAX_VALUE_LENGTH,
8};
9use crate::types::{RedbKey, RedbValue, TypeName};
10use crate::{AccessGuard, MultimapTableHandle, Result, StorageError, WriteTransaction};
11use std::borrow::Borrow;
12use std::cmp::max;
13use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::mem;
16use std::mem::size_of;
17use std::ops::{RangeBounds, RangeFull};
18use std::sync::{Arc, Mutex};
19
20pub(crate) fn multimap_btree_stats(
21 root: Option<PageNumber>,
22 mem: &TransactionalMemory,
23 fixed_key_size: Option<usize>,
24 fixed_value_size: Option<usize>,
25) -> Result<BtreeStats> {
26 if let Some(root) = root {
27 multimap_stats_helper(root, mem, fixed_key_size, fixed_value_size)
28 } else {
29 Ok(BtreeStats {
30 tree_height: 0,
31 leaf_pages: 0,
32 branch_pages: 0,
33 stored_leaf_bytes: 0,
34 metadata_bytes: 0,
35 fragmented_bytes: 0,
36 })
37 }
38}
39
40fn multimap_stats_helper(
41 page_number: PageNumber,
42 mem: &TransactionalMemory,
43 fixed_key_size: Option<usize>,
44 fixed_value_size: Option<usize>,
45) -> Result<BtreeStats> {
46 let page = mem.get_page(page_number)?;
47 let node_mem = page.memory();
48 match node_mem[0] {
49 LEAF => {
50 let accessor = LeafAccessor::new(
51 page.memory(),
52 fixed_key_size,
53 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
54 );
55 let mut leaf_bytes = 0u64;
56 let mut is_branch = false;
57 for i in 0..accessor.num_pairs() {
58 let entry = accessor.entry(i).unwrap();
59 let collection: &UntypedDynamicCollection =
60 UntypedDynamicCollection::new(entry.value());
61 match collection.collection_type() {
62 Inline => {
63 let inline_accessor = LeafAccessor::new(
64 collection.as_inline(),
65 fixed_value_size,
66 <() as RedbValue>::fixed_width(),
67 );
68 leaf_bytes +=
69 inline_accessor.length_of_pairs(0, inline_accessor.num_pairs()) as u64;
70 }
71 Subtree => {
72 is_branch = true;
73 }
74 }
75 }
76 let mut overhead_bytes = (accessor.total_length() as u64) - leaf_bytes;
77 let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
78 let mut max_child_height = 0;
79 let (mut leaf_pages, mut branch_pages) = if is_branch { (0, 1) } else { (1, 0) };
80
81 for i in 0..accessor.num_pairs() {
82 let entry = accessor.entry(i).unwrap();
83 let collection: &UntypedDynamicCollection =
84 UntypedDynamicCollection::new(entry.value());
85 match collection.collection_type() {
86 Inline => {
87 }
89 Subtree => {
90 let stats = btree_stats(
92 Some(collection.as_subtree().0),
93 mem,
94 fixed_value_size,
95 <() as RedbValue>::fixed_width(),
96 )?;
97 max_child_height = max(max_child_height, stats.tree_height);
98 branch_pages += stats.branch_pages;
99 leaf_pages += stats.leaf_pages;
100 fragmented_bytes += stats.fragmented_bytes;
101 overhead_bytes += stats.metadata_bytes;
102 leaf_bytes += stats.stored_leaf_bytes;
103 }
104 }
105 }
106
107 Ok(BtreeStats {
108 tree_height: max_child_height + 1,
109 leaf_pages,
110 branch_pages,
111 stored_leaf_bytes: leaf_bytes,
112 metadata_bytes: overhead_bytes,
113 fragmented_bytes,
114 })
115 }
116 BRANCH => {
117 let accessor = BranchAccessor::new(&page, fixed_key_size);
118 let mut max_child_height = 0;
119 let mut leaf_pages = 0;
120 let mut branch_pages = 1;
121 let mut stored_leaf_bytes = 0;
122 let mut metadata_bytes = accessor.total_length() as u64;
123 let mut fragmented_bytes = (page.memory().len() - accessor.total_length()) as u64;
124 for i in 0..accessor.count_children() {
125 if let Some(child) = accessor.child_page(i) {
126 let stats =
127 multimap_stats_helper(child, mem, fixed_key_size, fixed_value_size)?;
128 max_child_height = max(max_child_height, stats.tree_height);
129 leaf_pages += stats.leaf_pages;
130 branch_pages += stats.branch_pages;
131 stored_leaf_bytes += stats.stored_leaf_bytes;
132 metadata_bytes += stats.metadata_bytes;
133 fragmented_bytes += stats.fragmented_bytes;
134 }
135 }
136
137 Ok(BtreeStats {
138 tree_height: max_child_height + 1,
139 leaf_pages,
140 branch_pages,
141 stored_leaf_bytes,
142 metadata_bytes,
143 fragmented_bytes,
144 })
145 }
146 _ => unreachable!(),
147 }
148}
149
150pub(crate) fn verify_tree_and_subtree_checksums(
152 root: Option<(PageNumber, Checksum)>,
153 key_size: Option<usize>,
154 value_size: Option<usize>,
155 mem: &TransactionalMemory,
156) -> Result<bool> {
157 if let Some((root, root_checksum)) = root {
158 if !RawBtree::new(
159 Some((root, root_checksum)),
160 key_size,
161 DynamicCollection::<()>::fixed_width_with(value_size),
162 mem,
163 )
164 .verify_checksum()?
165 {
166 return Ok(false);
167 }
168
169 let table_pages_iter = AllPageNumbersBtreeIter::new(
170 root,
171 key_size,
172 DynamicCollection::<()>::fixed_width_with(value_size),
173 mem,
174 )?;
175 for table_page in table_pages_iter {
176 let page = mem.get_page(table_page?)?;
177 let subtree_roots = parse_subtree_roots(&page, key_size, value_size);
178 for (sub_root, sub_root_checksum) in subtree_roots {
179 if !RawBtree::new(
180 Some((sub_root, sub_root_checksum)),
181 value_size,
182 <()>::fixed_width(),
183 mem,
184 )
185 .verify_checksum()?
186 {
187 return Ok(false);
188 }
189 }
190 }
191 }
192
193 Ok(true)
194}
195
196pub(crate) fn finalize_tree_and_subtree_checksums(
199 root: Option<(PageNumber, Checksum)>,
200 key_size: Option<usize>,
201 value_size: Option<usize>,
202 mem: &TransactionalMemory,
203) -> Result<Option<(PageNumber, Checksum)>> {
204 let freed_pages = Arc::new(Mutex::new(vec![]));
205 let mut tree = UntypedBtreeMut::new(
206 root,
207 mem,
208 freed_pages.clone(),
209 key_size,
210 DynamicCollection::<()>::fixed_width_with(value_size),
211 );
212 tree.dirty_leaf_visitor(|mut leaf_page| {
213 let mut sub_root_updates = vec![];
214 let accessor = LeafAccessor::new(
215 leaf_page.memory(),
216 key_size,
217 DynamicCollection::<()>::fixed_width_with(value_size),
218 );
219 for i in 0..accessor.num_pairs() {
220 let entry = accessor.entry(i).unwrap();
221 let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
222 if matches!(collection.collection_type(), DynamicCollectionType::Subtree) {
223 let sub_root = collection.as_subtree();
224 if mem.uncommitted(sub_root.0) {
225 let mut subtree = UntypedBtreeMut::new(
226 Some(sub_root),
227 mem,
228 freed_pages.clone(),
229 value_size,
230 <()>::fixed_width(),
231 );
232 subtree.finalize_dirty_checksums()?;
233 sub_root_updates.push((i, entry.key().to_vec(), subtree.get_root().unwrap()));
234 }
235 }
236 }
237 drop(accessor);
238 let mut mutator = LeafMutator::new(
240 &mut leaf_page,
241 key_size,
242 DynamicCollection::<()>::fixed_width_with(value_size),
243 );
244 for (i, key, (sub_root, checksum)) in sub_root_updates {
245 let collection = DynamicCollection::<()>::make_subtree_data(sub_root, checksum);
246 mutator.insert(i, true, &key, &collection);
247 }
248
249 Ok(())
250 })?;
251
252 tree.finalize_dirty_checksums()?;
253 assert!(freed_pages.lock().unwrap().is_empty());
255 Ok(tree.get_root())
256}
257
258pub(crate) fn parse_subtree_roots<T: Page>(
259 page: &T,
260 fixed_key_size: Option<usize>,
261 fixed_value_size: Option<usize>,
262) -> Vec<(PageNumber, Checksum)> {
263 match page.memory()[0] {
264 BRANCH => {
265 vec![]
266 }
267 LEAF => {
268 let mut result = vec![];
269 let accessor = LeafAccessor::new(
270 page.memory(),
271 fixed_key_size,
272 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
273 );
274 for i in 0..accessor.num_pairs() {
275 let entry = accessor.entry(i).unwrap();
276 let collection = <&DynamicCollection<()>>::from_bytes(entry.value());
277 if matches!(collection.collection_type(), DynamicCollectionType::Subtree) {
278 result.push(collection.as_subtree());
279 }
280 }
281
282 result
283 }
284 _ => unreachable!(),
285 }
286}
287
288pub(crate) struct LeafKeyIter<'a, V: 'static> {
289 inline_collection: AccessGuard<'a, &'static DynamicCollection<V>>,
290 fixed_key_size: Option<usize>,
291 fixed_value_size: Option<usize>,
292 start_entry: isize, end_entry: isize, }
295
296impl<'a, V> LeafKeyIter<'a, V> {
297 fn new(
298 data: AccessGuard<'a, &'static DynamicCollection<V>>,
299 fixed_key_size: Option<usize>,
300 fixed_value_size: Option<usize>,
301 ) -> Self {
302 let accessor =
303 LeafAccessor::new(data.value().as_inline(), fixed_key_size, fixed_value_size);
304 let end_entry = isize::try_from(accessor.num_pairs()).unwrap() - 1;
305 Self {
306 inline_collection: data,
307 fixed_key_size,
308 fixed_value_size,
309 start_entry: 0,
310 end_entry,
311 }
312 }
313
314 fn next_key(&mut self) -> Option<&[u8]> {
315 if self.end_entry < self.start_entry {
316 return None;
317 }
318 let accessor = LeafAccessor::new(
319 self.inline_collection.value().as_inline(),
320 self.fixed_key_size,
321 self.fixed_value_size,
322 );
323 self.start_entry += 1;
324 accessor
325 .entry((self.start_entry - 1).try_into().unwrap())
326 .map(|e| e.key())
327 }
328
329 fn next_key_back(&mut self) -> Option<&[u8]> {
330 if self.end_entry < self.start_entry {
331 return None;
332 }
333 let accessor = LeafAccessor::new(
334 self.inline_collection.value().as_inline(),
335 self.fixed_key_size,
336 self.fixed_value_size,
337 );
338 self.end_entry -= 1;
339 accessor
340 .entry((self.end_entry + 1).try_into().unwrap())
341 .map(|e| e.key())
342 }
343}
344
345enum DynamicCollectionType {
346 Inline,
347 Subtree,
348}
349
350impl From<u8> for DynamicCollectionType {
351 fn from(value: u8) -> Self {
352 match value {
353 LEAF => Inline,
354 2 => Subtree,
355 _ => unreachable!(),
356 }
357 }
358}
359
360#[allow(clippy::from_over_into)]
361impl Into<u8> for DynamicCollectionType {
362 fn into(self) -> u8 {
363 match self {
364 Inline => LEAF,
367 Subtree => 2,
368 }
369 }
370}
371
372#[repr(transparent)]
386pub(crate) struct DynamicCollection<V> {
387 _value_type: PhantomData<V>,
388 data: [u8],
389}
390
391impl<V> std::fmt::Debug for DynamicCollection<V> {
392 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
393 f.debug_struct("DynamicCollection")
394 .field("data", &&self.data)
395 .finish()
396 }
397}
398
399impl<V> RedbValue for &DynamicCollection<V> {
400 type SelfType<'a> = &'a DynamicCollection<V>
401 where
402 Self: 'a;
403 type AsBytes<'a> = &'a [u8]
404 where
405 Self: 'a;
406
407 fn fixed_width() -> Option<usize> {
408 None
409 }
410
411 fn from_bytes<'a>(data: &'a [u8]) -> &'a DynamicCollection<V>
412 where
413 Self: 'a,
414 {
415 DynamicCollection::new(data)
416 }
417
418 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'a [u8]
419 where
420 Self: 'a,
421 Self: 'b,
422 {
423 &value.data
424 }
425
426 fn type_name() -> TypeName {
427 TypeName::internal("redb::DynamicCollection")
428 }
429}
430
431impl<V> DynamicCollection<V> {
432 fn new(data: &[u8]) -> &Self {
433 unsafe { mem::transmute(data) }
434 }
435
436 fn collection_type(&self) -> DynamicCollectionType {
437 DynamicCollectionType::from(self.data[0])
438 }
439
440 fn as_inline(&self) -> &[u8] {
441 debug_assert!(matches!(self.collection_type(), Inline));
442 &self.data[1..]
443 }
444
445 fn as_subtree(&self) -> (PageNumber, Checksum) {
446 debug_assert!(matches!(self.collection_type(), Subtree));
447 let offset = 1 + PageNumber::serialized_size();
448 let page_number = PageNumber::from_le_bytes(self.data[1..offset].try_into().unwrap());
449 let checksum = Checksum::from_le_bytes(
450 self.data[offset..(offset + size_of::<Checksum>())]
451 .try_into()
452 .unwrap(),
453 );
454 (page_number, checksum)
455 }
456
457 fn make_inline_data(data: &[u8]) -> Vec<u8> {
458 let mut result = vec![Inline.into()];
459 result.extend_from_slice(data);
460
461 result
462 }
463
464 fn make_subtree_data(root: PageNumber, checksum: Checksum) -> Vec<u8> {
465 let mut result = vec![Subtree.into()];
466 result.extend_from_slice(&root.to_le_bytes());
467 result.extend_from_slice(Checksum::as_bytes(&checksum).as_ref());
468
469 result
470 }
471
472 pub(crate) fn fixed_width_with(_value_width: Option<usize>) -> Option<usize> {
473 None
474 }
475}
476
477impl<V: RedbKey> DynamicCollection<V> {
478 fn iter<'a>(
479 collection: AccessGuard<'a, &'static DynamicCollection<V>>,
480 mem: &'a TransactionalMemory,
481 ) -> Result<MultimapValue<'a, V>> {
482 Ok(match collection.value().collection_type() {
483 Inline => {
484 let leaf_iter = LeafKeyIter::new(
485 collection,
486 V::fixed_width(),
487 <() as RedbValue>::fixed_width(),
488 );
489 MultimapValue::new_inline(leaf_iter)
490 }
491 Subtree => {
492 let root = collection.value().as_subtree().0;
493 MultimapValue::new_subtree(BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
494 &(..),
495 Some(root),
496 mem,
497 )?)
498 }
499 })
500 }
501
502 fn iter_free_on_drop<'a>(
503 collection: AccessGuard<'a, &'static DynamicCollection<V>>,
504 pages: Vec<PageNumber>,
505 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
506 mem: &'a TransactionalMemory,
507 ) -> Result<MultimapValue<'a, V>> {
508 Ok(match collection.value().collection_type() {
509 Inline => {
510 let leaf_iter = LeafKeyIter::new(
511 collection,
512 V::fixed_width(),
513 <() as RedbValue>::fixed_width(),
514 );
515 MultimapValue::new_inline(leaf_iter)
516 }
517 Subtree => {
518 let root = collection.value().as_subtree().0;
519 let inner =
520 BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(&(..), Some(root), mem)?;
521 MultimapValue::new_subtree_free_on_drop(inner, freed_pages, pages, mem)
522 }
523 })
524 }
525}
526
527#[repr(transparent)]
528pub(crate) struct UntypedDynamicCollection {
529 data: [u8],
530}
531
532impl UntypedDynamicCollection {
533 fn new(data: &[u8]) -> &Self {
534 unsafe { mem::transmute(data) }
535 }
536
537 fn collection_type(&self) -> DynamicCollectionType {
538 DynamicCollectionType::from(self.data[0])
539 }
540
541 fn as_inline(&self) -> &[u8] {
542 debug_assert!(matches!(self.collection_type(), Inline));
543 &self.data[1..]
544 }
545
546 fn as_subtree(&self) -> (PageNumber, Checksum) {
547 debug_assert!(matches!(self.collection_type(), Subtree));
548 let offset = 1 + PageNumber::serialized_size();
549 let page_number = PageNumber::from_le_bytes(self.data[1..offset].try_into().unwrap());
550 let checksum = Checksum::from_le_bytes(
551 self.data[offset..(offset + size_of::<Checksum>())]
552 .try_into()
553 .unwrap(),
554 );
555 (page_number, checksum)
556 }
557}
558
559enum ValueIterState<'a, V: RedbKey + 'static> {
560 Subtree(BtreeRangeIter<'a, V, ()>),
561 InlineLeaf(LeafKeyIter<'a, V>),
562}
563
564pub struct MultimapValue<'a, V: RedbKey + 'static> {
565 inner: Option<ValueIterState<'a, V>>,
566 freed_pages: Option<Arc<Mutex<Vec<PageNumber>>>>,
567 free_on_drop: Vec<PageNumber>,
568 mem: Option<&'a TransactionalMemory>,
569 _value_type: PhantomData<V>,
570}
571
572impl<'a, V: RedbKey + 'static> MultimapValue<'a, V> {
573 fn new_subtree(inner: BtreeRangeIter<'a, V, ()>) -> Self {
574 Self {
575 inner: Some(ValueIterState::Subtree(inner)),
576 freed_pages: None,
577 free_on_drop: vec![],
578 mem: None,
579 _value_type: Default::default(),
580 }
581 }
582
583 fn new_subtree_free_on_drop(
584 inner: BtreeRangeIter<'a, V, ()>,
585 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
586 pages: Vec<PageNumber>,
587 mem: &'a TransactionalMemory,
588 ) -> Self {
589 Self {
590 inner: Some(ValueIterState::Subtree(inner)),
591 freed_pages: Some(freed_pages),
592 free_on_drop: pages,
593 mem: Some(mem),
594 _value_type: Default::default(),
595 }
596 }
597
598 fn new_inline(inner: LeafKeyIter<'a, V>) -> Self {
599 Self {
600 inner: Some(ValueIterState::InlineLeaf(inner)),
601 freed_pages: None,
602 free_on_drop: vec![],
603 mem: None,
604 _value_type: Default::default(),
605 }
606 }
607}
608
609impl<'a, V: RedbKey + 'static> Iterator for MultimapValue<'a, V> {
610 type Item = Result<AccessGuard<'a, V>>;
611
612 fn next(&mut self) -> Option<Self::Item> {
613 let bytes = match self.inner.as_mut().unwrap() {
615 ValueIterState::Subtree(ref mut iter) => match iter.next()? {
616 Ok(e) => e.key_data(),
617 Err(err) => {
618 return Some(Err(err));
619 }
620 },
621 ValueIterState::InlineLeaf(ref mut iter) => iter.next_key()?.to_vec(),
622 };
623 Some(Ok(AccessGuard::with_owned_value(bytes)))
624 }
625}
626
627impl<'a, V: RedbKey + 'static> DoubleEndedIterator for MultimapValue<'a, V> {
628 fn next_back(&mut self) -> Option<Self::Item> {
629 let bytes = match self.inner.as_mut().unwrap() {
631 ValueIterState::Subtree(ref mut iter) => match iter.next_back()? {
632 Ok(e) => e.key_data(),
633 Err(err) => {
634 return Some(Err(err));
635 }
636 },
637 ValueIterState::InlineLeaf(ref mut iter) => iter.next_key_back()?.to_vec(),
638 };
639 Some(Ok(AccessGuard::with_owned_value(bytes)))
640 }
641}
642
643impl<'a, V: RedbKey + 'static> Drop for MultimapValue<'a, V> {
644 fn drop(&mut self) {
645 drop(mem::take(&mut self.inner));
647 if !self.free_on_drop.is_empty() {
648 let mut freed_pages = self.freed_pages.as_ref().unwrap().lock().unwrap();
649 for page in self.free_on_drop.iter() {
650 if !self.mem.unwrap().free_if_uncommitted(*page) {
651 freed_pages.push(*page);
652 }
653 }
654 }
655 }
656}
657
658pub struct MultimapRange<'a, K: RedbKey + 'static, V: RedbKey + 'static> {
659 inner: BtreeRangeIter<'a, K, &'static DynamicCollection<V>>,
660 mem: &'a TransactionalMemory,
661 _key_type: PhantomData<K>,
662 _value_type: PhantomData<V>,
663}
664
665impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> MultimapRange<'a, K, V> {
666 fn new(
667 inner: BtreeRangeIter<'a, K, &'static DynamicCollection<V>>,
668 mem: &'a TransactionalMemory,
669 ) -> Self {
670 Self {
671 inner,
672 mem,
673 _key_type: Default::default(),
674 _value_type: Default::default(),
675 }
676 }
677}
678
679impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Iterator for MultimapRange<'a, K, V> {
680 type Item = Result<(AccessGuard<'a, K>, MultimapValue<'a, V>)>;
681
682 fn next(&mut self) -> Option<Self::Item> {
683 match self.inner.next()? {
684 Ok(entry) => {
685 let key = AccessGuard::with_owned_value(entry.key_data());
686 let (page, _, value_range) = entry.into_raw();
687 let collection = AccessGuard::with_page(page, value_range);
688 Some(DynamicCollection::iter(collection, self.mem).map(|iter| (key, iter)))
689 }
690 Err(err) => Some(Err(err)),
691 }
692 }
693}
694
695impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> DoubleEndedIterator
696 for MultimapRange<'a, K, V>
697{
698 fn next_back(&mut self) -> Option<Self::Item> {
699 match self.inner.next_back()? {
700 Ok(entry) => {
701 let key = AccessGuard::with_owned_value(entry.key_data());
702 let (page, _, value_range) = entry.into_raw();
703 let collection = AccessGuard::with_page(page, value_range);
704 Some(DynamicCollection::iter(collection, self.mem).map(|iter| (key, iter)))
705 }
706 Err(err) => Some(Err(err)),
707 }
708 }
709}
710
711pub struct MultimapTable<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> {
715 name: String,
716 transaction: &'txn WriteTransaction<'db>,
717 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
718 tree: BtreeMut<'txn, K, &'static DynamicCollection<V>>,
719 mem: &'db TransactionalMemory,
720 _value_type: PhantomData<V>,
721}
722
723impl<K: RedbKey + 'static, V: RedbKey + 'static> MultimapTableHandle
724 for MultimapTable<'_, '_, K, V>
725{
726 fn name(&self) -> &str {
727 &self.name
728 }
729}
730
731impl<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTable<'db, 'txn, K, V> {
732 pub(crate) fn new(
733 name: &str,
734 table_root: Option<(PageNumber, Checksum)>,
735 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
736 mem: &'db TransactionalMemory,
737 transaction: &'txn WriteTransaction<'db>,
738 ) -> MultimapTable<'db, 'txn, K, V> {
739 MultimapTable {
740 name: name.to_string(),
741 transaction,
742 freed_pages: freed_pages.clone(),
743 tree: BtreeMut::new(table_root, mem, freed_pages),
744 mem,
745 _value_type: Default::default(),
746 }
747 }
748
749 #[allow(dead_code)]
750 pub(crate) fn print_debug(&self, include_values: bool) -> Result {
751 self.tree.print_debug(include_values)
752 }
753
754 pub fn insert<'k, 'v>(
758 &mut self,
759 key: impl Borrow<K::SelfType<'k>>,
760 value: impl Borrow<V::SelfType<'v>>,
761 ) -> Result<bool> {
762 let value_bytes = V::as_bytes(value.borrow());
763 let value_bytes_ref = value_bytes.as_ref();
764 if value_bytes_ref.len() > MAX_VALUE_LENGTH {
765 return Err(StorageError::ValueTooLarge(value_bytes_ref.len()));
766 }
767 let key_bytes = K::as_bytes(key.borrow());
768 if key_bytes.as_ref().len() > MAX_VALUE_LENGTH {
769 return Err(StorageError::ValueTooLarge(key_bytes.as_ref().len()));
770 }
771 let get_result = self.tree.get(key.borrow())?;
772 let existed = if get_result.is_some() {
773 #[allow(clippy::unnecessary_unwrap)]
774 let guard = get_result.unwrap();
775 let collection_type = guard.value().collection_type();
776 match collection_type {
777 Inline => {
778 let leaf_data = guard.value().as_inline();
779 let accessor = LeafAccessor::new(
780 leaf_data,
781 V::fixed_width(),
782 <() as RedbValue>::fixed_width(),
783 );
784 let (position, found) = accessor.position::<V>(value_bytes_ref);
785 if found {
786 return Ok(true);
787 }
788
789 let new_pairs = accessor.num_pairs() + 1;
790 let new_pair_bytes =
791 accessor.length_of_pairs(0, accessor.num_pairs()) + value_bytes_ref.len();
792 let new_key_bytes =
793 accessor.length_of_keys(0, accessor.num_pairs()) + value_bytes_ref.len();
794 let required_inline_bytes = RawLeafBuilder::required_bytes(
795 new_pairs,
796 new_pair_bytes,
797 V::fixed_width(),
798 <() as RedbValue>::fixed_width(),
799 );
800
801 if required_inline_bytes < self.mem.get_page_size() / 2 {
802 let mut data = vec![0; required_inline_bytes];
803 let mut builder = RawLeafBuilder::new(
804 &mut data,
805 new_pairs,
806 V::fixed_width(),
807 <() as RedbValue>::fixed_width(),
808 new_key_bytes,
809 );
810 for i in 0..accessor.num_pairs() {
811 if i == position {
812 builder.append(
813 value_bytes_ref,
814 <() as RedbValue>::as_bytes(&()).as_ref(),
815 );
816 }
817 let entry = accessor.entry(i).unwrap();
818 builder.append(entry.key(), entry.value());
819 }
820 if position == accessor.num_pairs() {
821 builder
822 .append(value_bytes_ref, <() as RedbValue>::as_bytes(&()).as_ref());
823 }
824 drop(builder);
825 drop(guard);
826 let inline_data = DynamicCollection::<V>::make_inline_data(&data);
827 self.tree
828 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
829 } else {
830 let mut page = self.mem.allocate(leaf_data.len(), CachePriority::Low)?;
832 page.memory_mut()[..leaf_data.len()].copy_from_slice(leaf_data);
833 let page_number = page.get_page_number();
834 drop(page);
835 drop(guard);
836
837 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
839 Some((page_number, 0)),
840 self.mem,
841 self.freed_pages.clone(),
842 );
843 let existed = subtree.insert(value.borrow(), &())?.is_some();
844 assert_eq!(existed, found);
845 let (new_root, new_checksum) = subtree.get_root().unwrap();
846 let subtree_data =
847 DynamicCollection::<V>::make_subtree_data(new_root, new_checksum);
848 self.tree
849 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
850 }
851
852 found
853 }
854 Subtree => {
855 let mut subtree: BtreeMut<'_, V, ()> = BtreeMut::new(
856 Some(guard.value().as_subtree()),
857 self.mem,
858 self.freed_pages.clone(),
859 );
860 drop(guard);
861 let existed = subtree.insert(value.borrow(), &())?.is_some();
862 let (new_root, new_checksum) = subtree.get_root().unwrap();
863 let subtree_data =
864 DynamicCollection::<V>::make_subtree_data(new_root, new_checksum);
865 self.tree
866 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
867
868 existed
869 }
870 }
871 } else {
872 drop(get_result);
873 let required_inline_bytes = RawLeafBuilder::required_bytes(
874 1,
875 value_bytes_ref.len(),
876 V::fixed_width(),
877 <() as RedbValue>::fixed_width(),
878 );
879 if required_inline_bytes < self.mem.get_page_size() / 2 {
880 let mut data = vec![0; required_inline_bytes];
881 let mut builder = RawLeafBuilder::new(
882 &mut data,
883 1,
884 V::fixed_width(),
885 <() as RedbValue>::fixed_width(),
886 value_bytes_ref.len(),
887 );
888 builder.append(value_bytes_ref, <() as RedbValue>::as_bytes(&()).as_ref());
889 drop(builder);
890 let inline_data = DynamicCollection::<V>::make_inline_data(&data);
891 self.tree
892 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
893 } else {
894 let mut subtree: BtreeMut<'_, V, ()> =
895 BtreeMut::new(None, self.mem, self.freed_pages.clone());
896 subtree.insert(value.borrow(), &())?;
897 let (new_root, new_checksum) = subtree.get_root().unwrap();
898 let subtree_data =
899 DynamicCollection::<V>::make_subtree_data(new_root, new_checksum);
900 self.tree
901 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
902 }
903 false
904 };
905
906 Ok(existed)
907 }
908
909 pub fn remove<'a>(
913 &mut self,
914 key: impl Borrow<K::SelfType<'a>>,
915 value: impl Borrow<V::SelfType<'a>>,
916 ) -> Result<bool>
917 where
918 K: 'a,
919 V: 'a,
920 {
921 let get_result = self.tree.get(key.borrow())?;
922 if get_result.is_none() {
923 return Ok(false);
924 }
925 let guard = get_result.unwrap();
926 let v = guard.value();
927 let existed = match v.collection_type() {
928 Inline => {
929 let leaf_data = v.as_inline();
930 let accessor = LeafAccessor::new(
931 leaf_data,
932 V::fixed_width(),
933 <() as RedbValue>::fixed_width(),
934 );
935 if let Some(position) = accessor.find_key::<V>(V::as_bytes(value.borrow()).as_ref())
936 {
937 let old_num_pairs = accessor.num_pairs();
938 if old_num_pairs == 1 {
939 drop(guard);
940 self.tree.remove(key.borrow())?;
941 } else {
942 let old_pairs_len = accessor.length_of_pairs(0, old_num_pairs);
943 let removed_value_len = accessor.entry(position).unwrap().key().len();
944 let required = RawLeafBuilder::required_bytes(
945 old_num_pairs - 1,
946 old_pairs_len - removed_value_len,
947 V::fixed_width(),
948 <() as RedbValue>::fixed_width(),
949 );
950 let mut new_data = vec![0; required];
951 let new_key_len =
952 accessor.length_of_keys(0, old_num_pairs) - removed_value_len;
953 let mut builder = RawLeafBuilder::new(
954 &mut new_data,
955 old_num_pairs - 1,
956 V::fixed_width(),
957 <() as RedbValue>::fixed_width(),
958 new_key_len,
959 );
960 for i in 0..old_num_pairs {
961 if i != position {
962 let entry = accessor.entry(i).unwrap();
963 builder.append(entry.key(), entry.value());
964 }
965 }
966 drop(builder);
967 drop(guard);
968
969 let inline_data = DynamicCollection::<V>::make_inline_data(&new_data);
970 self.tree
971 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
972 }
973 true
974 } else {
975 drop(guard);
976 false
977 }
978 }
979 Subtree => {
980 let mut subtree: BtreeMut<V, ()> =
981 BtreeMut::new(Some(v.as_subtree()), self.mem, self.freed_pages.clone());
982 drop(guard);
983 let existed = subtree.remove(value.borrow())?.is_some();
984
985 if let Some((new_root, new_checksum)) = subtree.get_root() {
986 let page = self.mem.get_page(new_root)?;
987 match page.memory()[0] {
988 LEAF => {
989 let accessor = LeafAccessor::new(
990 page.memory(),
991 V::fixed_width(),
992 <() as RedbValue>::fixed_width(),
993 );
994 let len = accessor.total_length();
995 if len < self.mem.get_page_size() / 2 {
996 let inline_data =
997 DynamicCollection::<V>::make_inline_data(&page.memory()[..len]);
998 self.tree
999 .insert(key.borrow(), &DynamicCollection::new(&inline_data))?;
1000 drop(page);
1001 if !self.mem.free_if_uncommitted(new_root) {
1002 (*self.freed_pages).lock().unwrap().push(new_root);
1003 }
1004 } else {
1005 let subtree_data = DynamicCollection::<V>::make_subtree_data(
1006 new_root,
1007 new_checksum,
1008 );
1009 self.tree
1010 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1011 }
1012 }
1013 BRANCH => {
1014 let subtree_data =
1015 DynamicCollection::<V>::make_subtree_data(new_root, new_checksum);
1016 self.tree
1017 .insert(key.borrow(), &DynamicCollection::new(&subtree_data))?;
1018 }
1019 _ => unreachable!(),
1020 }
1021 } else {
1022 self.tree.remove(key.borrow())?;
1023 }
1024
1025 existed
1026 }
1027 };
1028
1029 Ok(existed)
1030 }
1031
1032 pub fn remove_all<'a>(&mut self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>
1036 where
1037 K: 'a,
1038 {
1039 let iter = if let Some(collection) = self.tree.remove(key.borrow())? {
1040 let mut pages = vec![];
1041 if matches!(
1042 collection.value().collection_type(),
1043 DynamicCollectionType::Subtree
1044 ) {
1045 let root = collection.value().as_subtree().0;
1046 let all_pages = AllPageNumbersBtreeIter::new(
1047 root,
1048 V::fixed_width(),
1049 <() as RedbValue>::fixed_width(),
1050 self.mem,
1051 )?;
1052 for page in all_pages {
1053 pages.push(page?);
1054 }
1055 }
1056 DynamicCollection::iter_free_on_drop(
1057 collection,
1058 pages,
1059 self.freed_pages.clone(),
1060 self.mem,
1061 )?
1062 } else {
1063 MultimapValue::new_subtree(BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1064 &(..),
1065 None,
1066 self.mem,
1067 )?)
1068 };
1069
1070 Ok(iter)
1071 }
1072}
1073
1074impl<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> ReadableMultimapTable<K, V>
1075 for MultimapTable<'db, 'txn, K, V>
1076{
1077 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>
1079 where
1080 K: 'a,
1081 {
1082 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1083 DynamicCollection::iter(collection, self.mem)?
1084 } else {
1085 MultimapValue::new_subtree(BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1086 &(..),
1087 None,
1088 self.mem,
1089 )?)
1090 };
1091
1092 Ok(iter)
1093 }
1094
1095 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1097 where
1098 K: 'a,
1099 KR: Borrow<K::SelfType<'a>> + 'a,
1100 {
1101 let inner = self.tree.range(&range)?;
1102 Ok(MultimapRange::new(inner, self.mem))
1103 }
1104
1105 fn stats(&self) -> Result<TableStats> {
1106 let tree_stats = multimap_btree_stats(
1107 self.tree.get_root().map(|(p, _)| p),
1108 self.mem,
1109 K::fixed_width(),
1110 V::fixed_width(),
1111 )?;
1112
1113 Ok(TableStats {
1114 tree_height: tree_stats.tree_height,
1115 leaf_pages: tree_stats.leaf_pages,
1116 branch_pages: tree_stats.branch_pages,
1117 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1118 metadata_bytes: tree_stats.metadata_bytes,
1119 fragmented_bytes: tree_stats.fragmented_bytes,
1120 })
1121 }
1122
1123 fn len(&self) -> Result<u64> {
1125 let mut count = 0;
1126 for item in self.iter()? {
1127 let (_, values) = item?;
1128 for v in values {
1129 v?;
1130 count += 1;
1131 }
1132 }
1133 Ok(count)
1134 }
1135
1136 fn is_empty(&self) -> Result<bool> {
1138 self.len().map(|x| x == 0)
1139 }
1140}
1141
1142impl<K: RedbKey + 'static, V: RedbKey + 'static> Sealed for MultimapTable<'_, '_, K, V> {}
1143
1144impl<'db, 'txn, K: RedbKey + 'static, V: RedbKey + 'static> Drop
1145 for MultimapTable<'db, 'txn, K, V>
1146{
1147 fn drop(&mut self) {
1148 self.transaction.close_table(&self.name, &self.tree);
1149 }
1150}
1151
1152pub trait ReadableMultimapTable<K: RedbKey + 'static, V: RedbKey + 'static>: Sealed {
1153 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>
1155 where
1156 K: 'a;
1157
1158 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1159 where
1160 K: 'a,
1161 KR: Borrow<K::SelfType<'a>> + 'a;
1162
1163 fn stats(&self) -> Result<TableStats>;
1165
1166 fn len(&self) -> Result<u64>;
1167
1168 fn is_empty(&self) -> Result<bool>;
1169
1170 fn iter(&self) -> Result<MultimapRange<K, V>> {
1173 self.range::<K::SelfType<'_>>(..)
1174 }
1175}
1176
1177pub struct ReadOnlyUntypedMultimapTable<'txn> {
1179 tree: RawBtree<'txn>,
1180 fixed_key_size: Option<usize>,
1181 fixed_value_size: Option<usize>,
1182 mem: &'txn TransactionalMemory,
1183}
1184
1185impl<'txn> ReadOnlyUntypedMultimapTable<'txn> {
1186 pub(crate) fn new(
1187 root_page: Option<(PageNumber, Checksum)>,
1188 fixed_key_size: Option<usize>,
1189 fixed_value_size: Option<usize>,
1190 mem: &'txn TransactionalMemory,
1191 ) -> Self {
1192 Self {
1193 tree: RawBtree::new(
1194 root_page,
1195 fixed_key_size,
1196 DynamicCollection::<()>::fixed_width_with(fixed_value_size),
1197 mem,
1198 ),
1199 fixed_key_size,
1200 fixed_value_size,
1201 mem,
1202 }
1203 }
1204
1205 pub fn stats(&self) -> Result<TableStats> {
1207 let tree_stats = multimap_btree_stats(
1208 self.tree.get_root().map(|(p, _)| p),
1209 self.mem,
1210 self.fixed_key_size,
1211 self.fixed_value_size,
1212 )?;
1213
1214 Ok(TableStats {
1215 tree_height: tree_stats.tree_height,
1216 leaf_pages: tree_stats.leaf_pages,
1217 branch_pages: tree_stats.branch_pages,
1218 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1219 metadata_bytes: tree_stats.metadata_bytes,
1220 fragmented_bytes: tree_stats.fragmented_bytes,
1221 })
1222 }
1223}
1224
1225pub struct ReadOnlyMultimapTable<'txn, K: RedbKey + 'static, V: RedbKey + 'static> {
1227 tree: Btree<'txn, K, &'static DynamicCollection<V>>,
1228 mem: &'txn TransactionalMemory,
1229 _value_type: PhantomData<V>,
1230}
1231
1232impl<'txn, K: RedbKey + 'static, V: RedbKey + 'static> ReadOnlyMultimapTable<'txn, K, V> {
1233 pub(crate) fn new(
1234 root_page: Option<(PageNumber, Checksum)>,
1235 hint: PageHint,
1236 mem: &'txn TransactionalMemory,
1237 ) -> Result<ReadOnlyMultimapTable<'txn, K, V>> {
1238 Ok(ReadOnlyMultimapTable {
1239 tree: Btree::new(root_page, hint, mem)?,
1240 mem,
1241 _value_type: Default::default(),
1242 })
1243 }
1244}
1245
1246impl<'txn, K: RedbKey + 'static, V: RedbKey + 'static> ReadableMultimapTable<K, V>
1247 for ReadOnlyMultimapTable<'txn, K, V>
1248{
1249 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<MultimapValue<V>>
1251 where
1252 K: 'a,
1253 {
1254 let iter = if let Some(collection) = self.tree.get(key.borrow())? {
1255 DynamicCollection::iter(collection, self.mem)?
1256 } else {
1257 MultimapValue::new_subtree(BtreeRangeIter::new::<RangeFull, &V::SelfType<'_>>(
1258 &(..),
1259 None,
1260 self.mem,
1261 )?)
1262 };
1263
1264 Ok(iter)
1265 }
1266
1267 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<MultimapRange<K, V>>
1268 where
1269 K: 'a,
1270 KR: Borrow<K::SelfType<'a>> + 'a,
1271 {
1272 let inner = self.tree.range(&range)?;
1273 Ok(MultimapRange::new(inner, self.mem))
1274 }
1275
1276 fn stats(&self) -> Result<TableStats> {
1277 let tree_stats = multimap_btree_stats(
1278 self.tree.get_root().map(|(p, _)| p),
1279 self.mem,
1280 K::fixed_width(),
1281 V::fixed_width(),
1282 )?;
1283
1284 Ok(TableStats {
1285 tree_height: tree_stats.tree_height,
1286 leaf_pages: tree_stats.leaf_pages,
1287 branch_pages: tree_stats.branch_pages,
1288 stored_leaf_bytes: tree_stats.stored_leaf_bytes,
1289 metadata_bytes: tree_stats.metadata_bytes,
1290 fragmented_bytes: tree_stats.fragmented_bytes,
1291 })
1292 }
1293
1294 fn len(&self) -> Result<u64> {
1295 let mut count = 0;
1296 for item in self.iter()? {
1297 let (_, values) = item?;
1298 for v in values {
1299 v?;
1300 count += 1;
1301 }
1302 }
1303 Ok(count)
1304 }
1305
1306 fn is_empty(&self) -> Result<bool> {
1307 self.len().map(|x| x == 0)
1308 }
1309}
1310
1311impl<K: RedbKey, V: RedbKey> Sealed for ReadOnlyMultimapTable<'_, K, V> {}