// bf_tree/tree.rs
1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4#[cfg(not(all(feature = "shuttle", test)))]
5use rand::Rng;
6
7#[cfg(all(feature = "shuttle", test))]
8use shuttle::rand::Rng;
9
10use cfg_if::cfg_if;
11
12cfg_if! {
13    if #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))] {
14        use crate::metric::{Timer, TlsRecorder, timer::{TimerRecorder, DebugTimerGuard}};
15        use std::cell::UnsafeCell;
16        use thread_local::ThreadLocal;
17    }
18}
19
20use crate::{
21    check_parent,
22    circular_buffer::{CircularBufferMetrics, TombstoneHandle},
23    counter,
24    error::{ConfigError, TreeError},
25    histogram, info,
26    mini_page_op::{upgrade_to_full_page, LeafEntryXLocked, LeafOperations, ReadResult},
27    nodes::{
28        leaf_node::{LeafKVMeta, LeafReadResult, MiniPageNextLevel, OpType},
29        InnerNode, InnerNodeBuilder, LeafNode, PageID, CACHE_LINE_SIZE, DISK_PAGE_SIZE,
30        INNER_NODE_SIZE, MAX_KEY_LEN, MAX_LEAF_PAGE_SIZE, MAX_VALUE_LEN,
31    },
32    range_scan::{ScanIter, ScanIterMut, ScanReturnField},
33    storage::{LeafStorage, PageLocation, PageTable},
34    sync::{
35        atomic::{AtomicU64, Ordering},
36        Arc,
37    },
38    utils::{get_rng, inner_lock::ReadGuard, Backoff, BfsVisitor, NodeInfo},
39    wal::{WriteAheadLog, WriteOp},
40    Config, StorageBackend,
41};
42use std::path::Path;
43
/// The bf-tree instance
pub struct BfTree {
    // Root page id. The high bit (ROOT_IS_LEAF_MASK) tags whether the root is
    // currently a leaf; stores use Release (on root split) and loads Acquire.
    pub(crate) root_page_id: AtomicU64,
    // Leaf storage: circular-buffer cache, mapping table and (unless
    // cache-only) the on-disk pages behind it.
    pub(crate) storage: LeafStorage,
    // Optional write-ahead log; writes append to it when present.
    pub(crate) wal: Option<Arc<WriteAheadLog>>,
    // Shared tree configuration (also held by `storage`).
    pub(crate) config: Arc<Config>,
    // Cached copy of `config.write_load_full_page`, threaded into leaf inserts.
    pub(crate) write_load_full_page: bool,
    pub(crate) cache_only: bool, // If true, there is no permanent storage layer thus no durability guarantee of any upsert in the tree
    pub(crate) mini_page_size_classes: Vec<usize>, // Size classes of mini pages, ascending
    #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
    pub metrics_recorder: Option<Arc<ThreadLocal<UnsafeCell<TlsRecorder>>>>, // Per-tree metrics recorder under "metrics-rt-debug" feature
}
56
// SAFETY: BfTree is shared across threads by design; concurrent access to its
// internals is mediated elsewhere (atomics, mapping-table entry locks, inner
// node read/write guards). The manual impls are needed because some members
// (e.g. the `UnsafeCell`-based metrics recorder under the metrics features)
// are not automatically `Sync`.
// NOTE(review): these impls disable the compiler's auto-trait checking for
// every field — re-verify whenever a field is added.
unsafe impl Sync for BfTree {}

unsafe impl Send for BfTree {}
60
/// Outcome of an insert into the tree.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum LeafInsertResult {
    /// The key-value pair was accepted.
    Success,
    /// The key/value failed validation (empty key, key/value/record larger
    /// than the configured limits); the message names the offending size.
    InvalidKV(String),
}
66
/// Reasons a range-scan iterator could not be constructed.
// NOTE(review): variant semantics inferred from names; the scan construction
// code lives in `range_scan` — confirm there.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ScanIterError {
    /// Scans are presumably unsupported in cache-only mode.
    CacheOnlyMode,
    InvalidStartKey,
    InvalidEndKey,
    InvalidCount,
    /// Start/end keys do not form a valid range.
    InvalidKeyRange,
}
75
impl Drop for BfTree {
    /// Tear the tree down: stop the WAL background job first, then walk every
    /// node (BFS) and release leaf pages and inner nodes.
    fn drop(&mut self) {
        // Stop the WAL's background job before freeing any tree memory it
        // might still reference.
        if let Some(ref wal) = self.wal {
            wal.stop_background_job();
        }

        // Visit all nodes, leaves and inner nodes alike.
        let visitor = BfsVisitor::new_all_nodes(self);
        for node_info in visitor {
            match node_info {
                NodeInfo::Leaf { page_id, .. } => {
                    // Leaf resources are owned through the mapping table; the
                    // entry knows how to free its cache/disk footprint.
                    let mut leaf = self.mapping_table().get_mut(&page_id);
                    leaf.dealloc_self(&self.storage, self.cache_only);
                }
                NodeInfo::Inner { ptr, .. } => {
                    // SAFETY-ish: `ptr` comes from the visitor over live nodes;
                    // nothing else frees them during Drop (exclusive &mut self).
                    if unsafe { &*ptr }.is_valid_disk_offset() {
                        let disk_offset = unsafe { &*ptr }.disk_offset;
                        if self.config.storage_backend == StorageBackend::Memory {
                            // special case for memory backend, we need to deallocate the memory.
                            self.storage.vfs.dealloc_offset(disk_offset as usize);
                        }
                    }
                    InnerNode::free_node(ptr as *mut InnerNode);
                }
            }
        }
    }
}
103
104impl Default for BfTree {
105    fn default() -> Self {
106        Self::new(":memory:", 1024 * 1024 * 32).unwrap()
107    }
108}
109
110impl BfTree {
    /// High bit of `root_page_id`: set when the root is a leaf. This is quite error-prone, make sure the mask is not conflicting with the page id definition.
    pub(crate) const ROOT_IS_LEAF_MASK: u64 = 0x8000_0000_0000_0000;
112
113    /// Create the size classes of all memory pages in acending order based on the record size (key + value) and the leaf page size
114    /// [s_0, s_1, ..., s_x], ascending order.
115    /// Each s_i is of size 2^i * c + size_of(LeafNode)
116    /// where c = (min_record_size + LeafKVMeta) aligned on CACHE_LINE_SIZE and 2^i * c <= leaf_page_size and s_x = leaf_page_size
117    ///
118    /// In non cache-only mode, the largest mini page size is s_(x-1) and full page/leaf base page size is s_x.
119    /// Currently, the design assumes a mini-page can always be successfully merged into a leaf page in one pass (including at most one base page split).
120    /// As such, the following three conditions are sufficient in preventing merge failures.
121    /// C1) x >= 1
122    /// C2) s_x > s_{x-1}
123    /// C3) max_record_size + SizeOf(KVMeta) <= (s_x - s_{x-1} - 2 * (fence_meta + max_key_len)).
124    ///
125    /// In cache-only mode, the largest mini page is s_x as there is no full nor base page.
126    /// Although there is no merging of mini-pages, the design assumes a new record can always be successfully inserted into a mini-page in one pass
127    /// (including at most one mini-page split). As such, the following sufficient condition is required.
128    /// C1) max_record_page + Sizeof(KVMeta) <= (s_x - SizeOf(NodeMeta)) / 2
129    /// C2) if x >= 1, s_x >= s_{x-1} + max_record_size + SizeOf(KVMeta)
130    ///
131    /// C2) is necessary for cache-only mode to guarantee that a mini-page can grow to full page size before being split up to two full-sized pages
132    pub(crate) fn create_mem_page_size_classes(
133        min_record_size_in_byte: usize,
134        max_record_size_in_byte: usize,
135        leaf_page_size_in_byte: usize,
136        max_fence_len_in_byte: usize,
137        cache_only: bool,
138    ) -> Vec<usize> {
139        // Sanity check of the input parameters
140        assert!(
141            min_record_size_in_byte > 1,
142            "cb_min_record_size in config cannot be less than 2"
143        );
144        assert!(
145            min_record_size_in_byte <= max_record_size_in_byte,
146            "cb_min_record_size cannot be larger than cb_max_record_size"
147        );
148        assert!(
149            max_fence_len_in_byte > 0,
150            "max_fence_len in config cannot be zero"
151        );
152        assert!(
153            max_fence_len_in_byte / 2 < max_record_size_in_byte,
154            "max_fence_len/2 cannot be larger than cb_max_record_size"
155        );
156        assert!(
157            leaf_page_size_in_byte <= MAX_LEAF_PAGE_SIZE,
158            "leaf_page_size in config cannot be larger than {}",
159            MAX_LEAF_PAGE_SIZE
160        );
161        assert!(
162            max_fence_len_in_byte / 2 <= MAX_KEY_LEN,
163            "max_key_len in config cannot be larger than {}",
164            MAX_KEY_LEN
165        );
166        assert!(
167            leaf_page_size_in_byte / min_record_size_in_byte <= 4096,
168            "Maximum number of records per page (leaf_page_size/min_record_size) cannot exceed 2^12.", // This is restricted by #bits for value count in NodeMeta
169        );
170
171        // In non cache-only mode, the leaf page should be in the multiple of DISK_PAGE_SIZE.
172        // In cache-only mode, the leaf page should be aligned on cache line size.
173        if !cache_only {
174            assert!(
175                leaf_page_size_in_byte.is_multiple_of(DISK_PAGE_SIZE),
176                "leaf_page_size in config should be multiple of {}",
177                DISK_PAGE_SIZE
178            );
179        } else {
180            assert!(
181                leaf_page_size_in_byte.is_multiple_of(CACHE_LINE_SIZE),
182                "leaf_page_size in config should be multiple of {}",
183                CACHE_LINE_SIZE
184            );
185        }
186
187        let max_record_size_with_meta = max_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
188        let mut max_mini_page_size: usize;
189
190        if cache_only {
191            // Guarantee C1), C2) for cache-only mode
192            max_mini_page_size = leaf_page_size_in_byte - max_record_size_with_meta;
193            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
194
195            assert!(
196                leaf_page_size_in_byte
197                    >= 2 * max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
198                "cb_max_record_size of config should be <= {}",
199                (leaf_page_size_in_byte - std::mem::size_of::<LeafNode>()) / 2
200                    - std::mem::size_of::<LeafKVMeta>()
201            );
202        } else {
203            // Guarantee C1), C2) and C3) for non cache-only mode
204            max_mini_page_size = leaf_page_size_in_byte
205                - max_record_size_with_meta
206                - max_fence_len_in_byte
207                - 2 * std::mem::size_of::<LeafKVMeta>();
208            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
209
210            assert!(
211                max_mini_page_size >= max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
212                "cb_max_record_size of config should be <= {}",
213                max_mini_page_size
214                    - std::mem::size_of::<LeafNode>()
215                    - std::mem::size_of::<LeafKVMeta>()
216            );
217        }
218
219        // Generate all size classes for mini-pages
220        let mut mem_page_size_classes = Vec::new();
221
222        let base: i32 = 2;
223        let mut record_num_per_page_exp: u32 = 0;
224        let c: usize = min_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
225
226        // No need to consider fence here as mini-pages have no fences.
227        let mut size_class =
228            base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
229
230        // Memory page size is aligned on cache line size
231        if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
232            size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
233        }
234
235        while size_class <= max_mini_page_size {
236            if mem_page_size_classes.is_empty()
237                || (mem_page_size_classes[mem_page_size_classes.len() - 1] < size_class)
238            {
239                mem_page_size_classes.push(size_class);
240            }
241
242            record_num_per_page_exp += 1;
243            size_class =
244                base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
245
246            if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
247                size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
248            }
249        }
250
251        if !cache_only {
252            assert!(!mem_page_size_classes.is_empty());
253        }
254
255        // Add the largest mini page size if not already added
256        if mem_page_size_classes.is_empty()
257            || mem_page_size_classes[mem_page_size_classes.len() - 1] < max_mini_page_size
258        {
259            mem_page_size_classes.push(max_mini_page_size);
260        }
261
262        // The largest page size is the full leaf page size
263        if mem_page_size_classes.is_empty()
264            || mem_page_size_classes[mem_page_size_classes.len() - 1] < leaf_page_size_in_byte
265        {
266            mem_page_size_classes.push(leaf_page_size_in_byte);
267        }
268
269        if !cache_only {
270            assert!(mem_page_size_classes.len() >= 2);
271        } else {
272            assert!(!mem_page_size_classes.is_empty());
273        }
274
275        mem_page_size_classes
276    }
277
278    /// Create a new bf-tree instance with customized storage backend and
279    /// circular buffer size
280    ///
281    /// For in-memory tree, use `:memory:` as file path.
282    /// For cache-only tree, use `:cache:` as file path
283    /// For disk tree, file_path is the path to the index file
284    ///
285    /// Mini page cache must be at least 8192 bytes for practical workloads.
286    ///
287    /// ```
288    /// use bf_tree::BfTree;
289    /// let tree = BfTree::new(":memory:", 8192).unwrap();
290    /// ```
291    pub fn new(file_path: impl AsRef<Path>, cache_size_byte: usize) -> Result<Self, ConfigError> {
292        let config = Config::new(file_path, cache_size_byte);
293        Self::with_config(config, None)
294    }
295
296    /// Create a new bf-tree instance with customized configuration based on
297    /// a config file
298    pub fn new_with_config_file<P: AsRef<Path>>(config_file_path: P) -> Result<Self, ConfigError> {
299        let config = Config::new_with_config_file(config_file_path);
300        Self::with_config(config, None)
301    }
302
303    /// Initialize the bf-tree with provided config. For advanced user only.
304    /// An optional pre-allocated buffer pointer can be provided to use as the buffer pool memory.
305    pub fn with_config(config: Config, buffer_ptr: Option<*mut u8>) -> Result<Self, ConfigError> {
306        // Validate the config first
307        config.validate()?;
308
309        let wal = match config.write_ahead_log.as_ref() {
310            Some(wal_config) => {
311                let wal = WriteAheadLog::new(wal_config.clone());
312                Some(wal)
313            }
314            None => None,
315        };
316        let write_load_full = config.write_load_full_page;
317        let config = Arc::new(config);
318
319        // In cache-only mode, the initial root page is a full mini-page
320        if config.cache_only {
321            let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr);
322
323            // Assuming CB can accommodate at least 2 leaf pages at the same time
324            let mini_page_guard = (leaf_storage)
325                .alloc_mini_page(config.leaf_page_size)
326                .expect("Fail to allocate a mini-page as initial root node");
327            LeafNode::initialize_mini_page(
328                &mini_page_guard,
329                config.leaf_page_size,
330                MiniPageNextLevel::new_null(),
331                true,
332            );
333            let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
334            let mini_loc = PageLocation::Mini(new_mini_ptr);
335
336            let (root_id, root_lock) = leaf_storage
337                .mapping_table()
338                .insert_mini_page_mapping(mini_loc);
339            assert_eq!(root_id.as_id(), 0);
340
341            drop(root_lock);
342            drop(mini_page_guard);
343            let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
344
345            return Ok(Self {
346                root_page_id: AtomicU64::new(root_id),
347                storage: leaf_storage,
348                wal,
349                cache_only: config.cache_only,
350                write_load_full_page: write_load_full,
351                mini_page_size_classes: Self::create_mem_page_size_classes(
352                    config.cb_min_record_size,
353                    config.cb_max_record_size,
354                    config.leaf_page_size,
355                    config.max_fence_len,
356                    config.cache_only,
357                ),
358                config,
359                #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
360                metrics_recorder: Some(Arc::new(ThreadLocal::new())),
361            });
362        }
363
364        let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr);
365        let (root_id, root_lock) = leaf_storage.mapping_table().alloc_base_page_mapping();
366        drop(root_lock);
367        assert_eq!(root_id.as_id(), 0);
368
369        let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
370        Ok(Self {
371            root_page_id: AtomicU64::new(root_id),
372            storage: leaf_storage,
373            wal,
374            cache_only: config.cache_only,
375            write_load_full_page: write_load_full,
376            mini_page_size_classes: Self::create_mem_page_size_classes(
377                config.cb_min_record_size,
378                config.cb_max_record_size,
379                config.leaf_page_size,
380                config.max_fence_len,
381                config.cache_only,
382            ),
383            config,
384            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
385            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
386        })
387    }
388
    /// Returns a reference to this tree's active configuration.
    pub fn config(&self) -> &Config {
        &self.config
    }
392
    /// Get the buffer metrics of the circular buffer.
    /// This is a blocking call, will stop all other buffer operations,
    /// use it wisely.
    pub fn get_buffer_metrics(&self) -> CircularBufferMetrics {
        // Delegates to the leaf storage, which owns the circular buffer.
        self.storage.get_buffer_metrics()
    }
399
400    /// returns the root page id and whether it is a leaf node.
401    pub(crate) fn get_root_page(&self) -> (PageID, bool) {
402        let root_page_id = self.root_page_id.load(Ordering::Acquire);
403        let root_is_leaf = (root_page_id & Self::ROOT_IS_LEAF_MASK) != 0;
404        let clean = root_page_id & (!Self::ROOT_IS_LEAF_MASK);
405
406        let page_id = if root_is_leaf {
407            PageID::from_id(clean)
408        } else {
409            PageID::from_pointer(clean as *const InnerNode)
410        };
411
412        (page_id, root_is_leaf)
413    }
414
    /// The page table mapping `PageID`s to their current locations.
    pub(crate) fn mapping_table(&self) -> &PageTable {
        self.storage.mapping_table()
    }
418
419    pub(crate) fn should_promote_read(&self) -> bool {
420        get_rng().gen_range(0..100) < self.config.read_promotion_rate.load(Ordering::Relaxed)
421    }
422
423    pub(crate) fn should_promote_scan_page(&self) -> bool {
424        get_rng().gen_range(0..100) < self.config.scan_promotion_rate.load(Ordering::Relaxed)
425    }
426
    /// Chance% to promote a base read record to mini page.
    ///
    /// The rate is compared against a random draw in `0..100`, so `0`
    /// disables promotion and `100` (or more) promotes every read.
    pub fn update_read_promotion_rate(&self, new_rate: usize) {
        self.config
            .read_promotion_rate
            .store(new_rate, Ordering::Relaxed);
    }
433
    /// Split the leaf `cur_page_id` if its split flag is set.
    ///
    /// Returns `Ok(true)` when a split was performed and a new root installed,
    /// `Ok(false)` when no split was needed. Only the root-leaf case is handled
    /// here; a flagged leaf that still has a parent is a bug (`unreachable!`).
    fn try_split_leaf(
        &self,
        cur_page_id: PageID,
        parent: &Option<ReadGuard>,
    ) -> Result<bool, TreeError> {
        debug_assert!(cur_page_id.is_id());

        // here we need to acquire x-lock for performance reason:
        // if we acquire s-lock, it's very difficult for us to later upgrade to x-lock, because rwlock favors readers:
        //      consider readers keep coming, we will never be able to upgrade to x-lock.
        let mut cur_page = self.mapping_table().get_mut(&cur_page_id);

        // NOTE(review): check_parent! presumably re-validates the parent after
        // taking the leaf lock and restarts on mismatch — confirm macro semantics.
        check_parent!(self, cur_page_id, parent);

        let should_split = cur_page.get_split_flag();
        if !should_split {
            return Ok(false);
        }
        match parent {
            Some(_) => {
                unreachable!("Leaf node split should not happen here");
            }
            None => {
                // only for the case of root node split

                // In cache-only mode, the root mini-page node is split into two equal-sized mini-pages
                if self.cache_only {
                    // Create a new mini-page of the same size as the current root node
                    // Assuming CB is at least able to hold two leaf-page sized mini-pages
                    let mini_page_guard = self
                        .storage
                        .alloc_mini_page(self.config.leaf_page_size)
                        .expect("Fail to allocate a mini-page during root split");
                    LeafNode::initialize_mini_page(
                        &mini_page_guard,
                        self.config.leaf_page_size,
                        MiniPageNextLevel::new_null(),
                        true,
                    );
                    let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                    let mini_loc = PageLocation::Mini(new_mini_ptr);

                    // Insert the new page into mapping table
                    let (sibling_id, _mini_lock) = self
                        .storage
                        .mapping_table()
                        .insert_mini_page_mapping(mini_loc);

                    // Split current page with the newly created mini page
                    let cur_page_loc = cur_page.get_page_location().clone();
                    match cur_page_loc {
                        PageLocation::Mini(ptr) => {
                            let cur_mini_page = cur_page.load_cache_page_mut(ptr);
                            // SAFETY-ish: new_mini_ptr is the freshly allocated,
                            // still-locked sibling; nothing else references it yet.
                            let sibling_page = unsafe { &mut *new_mini_ptr };
                            let split_key = cur_mini_page.split(sibling_page, true);

                            // Build the first inner root above the two leaves.
                            let mut new_root_builder = InnerNodeBuilder::new();
                            new_root_builder
                                .set_left_most_page_id(cur_page_id)
                                .set_children_is_leaf(true)
                                .add_record(split_key, sibling_id);

                            let new_root_ptr = new_root_builder.build();

                            // Publish the new root (clears the leaf tag bit).
                            self.root_page_id
                                .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                            info!(sibling = sibling_id.raw(), "New root node installed!");

                            debug_assert!(cur_mini_page.meta.meta_count_with_fence() > 0);
                            debug_assert!(sibling_page.meta.meta_count_with_fence() > 0);

                            return Ok(true);
                        }
                        _ => {
                            panic!("The root node is not a mini-page in cache-only mode")
                        }
                    }
                }

                // Non cache-only: split the root base page into two base pages.
                let mut x_page = cur_page;

                let (sibling_id, mut sibling_entry) = self.alloc_base_page_and_lock();

                info!(sibling = sibling_id.raw(), "Splitting root node!");

                let sibling = sibling_entry.load_base_page_mut();

                let leaf_node = x_page.load_base_page_mut();
                let split_key = leaf_node.split(sibling, false);

                // New root points left-most at the old root, with one separator
                // record for the new sibling.
                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page_id)
                    .set_children_is_leaf(true)
                    .add_record(split_key, sibling_id);

                let new_root_ptr = new_root_builder.build();

                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(sibling = sibling_id.raw(), "New root node installed!");
                Ok(true)
            }
        }
    }
542
543    fn alloc_base_page_and_lock(&self) -> (PageID, LeafEntryXLocked<'_>) {
544        let (pid, base_entry) = self.mapping_table().alloc_base_page_mapping();
545
546        (pid, base_entry)
547    }
548
    /// Split the inner node `cur_page` if its split flag is set.
    ///
    /// * `Ok((false, parent))` — no split needed; the parent guard is handed back.
    /// * `Ok((true, guard))` — split performed; with a parent, the returned guard
    ///   is the parent downgraded back to a read lock; for the root case the
    ///   original `parent` (which is `None` here) is returned.
    /// * `Err(TreeError::NeedRestart)` — the parent was full; its split flag is
    ///   set so the next traversal splits the parent first.
    fn try_split_inner<'a>(
        &self,
        cur_page: PageID,
        parent: Option<ReadGuard<'a>>,
    ) -> Result<(bool, Option<ReadGuard<'a>>), TreeError> {
        let cur_node = ReadGuard::try_read(cur_page.as_inner_node())?;

        check_parent!(self, cur_page, parent);

        let should_split = cur_node.as_ref().meta.get_split_flag();

        if !should_split {
            return Ok((false, parent));
        }

        info!(has_parent = parent.is_some(), "split inner node");

        match parent {
            Some(p) => {
                // Upgrade both read guards to exclusive locks; either upgrade
                // may fail under contention, bubbling up to restart the caller.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;
                let mut x_parent = p.upgrade().map_err(|(_x, e)| e)?;

                let split_key = x_cur.as_ref().get_split_key();

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));

                // Install the separator in the parent before moving entries; if
                // the parent is full, flag it for splitting and restart.
                let success = x_parent
                    .as_mut()
                    .insert(&split_key, sibling_builder.get_page_id());
                if !success {
                    x_parent.as_mut().meta.set_split_flag();
                    return Err(TreeError::NeedRestart);
                }

                // Move the upper half of the entries into the new sibling.
                x_cur.as_mut().split(&mut sibling_builder);

                sibling_builder.build();

                Ok((true, Some(x_parent.downgrade())))
            }
            None => {
                // Root split: create a sibling plus a brand-new root above both.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));
                let sibling_id = sibling_builder.get_page_id();

                let split_key = x_cur.as_mut().split(&mut sibling_builder);

                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page)
                    .set_children_is_leaf(false)
                    .add_record(split_key, sibling_id);
                sibling_builder.build();
                let new_root_ptr = new_root_builder.build();
                // NOTE(review): the new root's x-lock is taken and immediately
                // dropped — presumably to serialize with concurrent readers
                // before publishing; confirm intent.
                let _x_root = ReadGuard::try_read(new_root_ptr)
                    .unwrap()
                    .upgrade()
                    .unwrap();
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(
                    has_parent = parent.is_some(),
                    cur = cur_page.raw(),
                    "finished split inner node"
                );

                Ok((true, parent))
            }
        }
    }
624
    /// Descend from the root to the leaf responsible for `key`.
    ///
    /// When `aggressive_split` is set, any node on the path whose split flag
    /// is set is split on the way down; a completed split aborts the traversal
    /// with `TreeError::NeedRestart` so the caller retries from the new root.
    ///
    /// Returns the leaf's `PageID` and the read guard still held on its parent
    /// (`None` when the leaf itself is the root).
    pub(crate) fn traverse_to_leaf(
        &self,
        key: &[u8],
        aggressive_split: bool,
    ) -> Result<(PageID, Option<ReadGuard<'_>>), TreeError> {
        let (mut cur_page, mut cur_is_leaf) = self.get_root_page();
        let mut parent: Option<ReadGuard> = None;

        loop {
            if aggressive_split {
                if cur_is_leaf
                    && !cur_page.is_inner_node_pointer()
                    && self.try_split_leaf(cur_page, &parent)?
                {
                    return Err(TreeError::NeedRestart);
                } else if !cur_is_leaf {
                    let (split_success, new_parent) = self.try_split_inner(cur_page, parent)?;
                    if split_success {
                        return Err(TreeError::NeedRestart);
                    } else {
                        // No split happened; keep the guard handed back.
                        parent = new_parent;
                    }
                }
            }

            if cur_is_leaf {
                return Ok((cur_page, parent));
            } else {
                // Latch the child, re-validate the parent, then hand-over-hand:
                // the child guard becomes the parent for the next level.
                let next = ReadGuard::try_read(cur_page.as_inner_node())?;

                check_parent!(self, cur_page, parent);

                let next_node = next.as_ref();
                let next_is_leaf = next_node.meta.children_is_leaf();
                // lower_bound picks the child slot covering `key`.
                let pos = next_node.lower_bound(key);
                let kv_meta = next_node.get_kv_meta(pos as u16);
                cur_page = next_node.get_value(kv_meta);
                cur_is_leaf = next_is_leaf;
                parent = Some(next);
            }
        }
    }
667
    /// Route a single write (insert or delete) to the leaf owning `write_op.key`.
    ///
    /// Traverses to the target leaf, then either materializes a fresh mini-page
    /// (cache-only mode, `Null` mapping) or delegates to the leaf entry's
    /// insert path; finally appends to the WAL when one is configured.
    fn write_inner(&self, write_op: WriteOp, aggressive_split: bool) -> Result<(), TreeError> {
        let (pid, parent) = self.traverse_to_leaf(write_op.key, aggressive_split)?;

        let mut leaf_entry = self.mapping_table().get_mut(&pid);

        check_parent!(self, pid, parent);

        let page_loc = leaf_entry.get_page_location();
        match page_loc {
            PageLocation::Null => {
                if !self.cache_only {
                    panic!("Found an Null page in non cache-only mode");
                }

                // Deleting from a page that doesn't exist is a no-op.
                if write_op.op_type == OpType::Delete {
                    return Ok(());
                }

                // Create a new mini-page to replace the null page
                let mini_page_size = LeafNode::get_chain_size_hint(
                    write_op.key.len(),
                    write_op.value.len(),
                    &self.mini_page_size_classes,
                    self.cache_only,
                );
                let mini_page_guard = self.storage.alloc_mini_page(mini_page_size)?;
                LeafNode::initialize_mini_page(
                    &mini_page_guard,
                    mini_page_size,
                    MiniPageNextLevel::new_null(),
                    true,
                );
                let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                let mini_loc = PageLocation::Mini(new_mini_ptr);

                leaf_entry.create_cache_page_loc(mini_loc);

                // The mini-page was sized for this record, so the very first
                // insert must succeed.
                let mini_page_ref = leaf_entry.load_cache_page_mut(new_mini_ptr);
                let insert_success =
                    mini_page_ref.insert(write_op.key, write_op.value, write_op.op_type, 0);
                assert!(insert_success);

                debug_assert!(mini_page_ref.meta.meta_count_with_fence() > 0);
                counter!(InsertCreatedMiniPage);
            }
            _ => {
                leaf_entry.insert(
                    write_op.key,
                    write_op.value,
                    parent,
                    write_op.op_type,
                    &self.storage,
                    &self.write_load_full_page,
                    &self.cache_only,
                    &self.mini_page_size_classes,
                )?;

                // Keep hot pages alive: if the cache page is near the eviction
                // head, try moving it to the buffer tail (best effort).
                if leaf_entry.cache_page_about_to_evict(&self.storage) {
                    // we don't care about the result here
                    _ = leaf_entry.move_cache_page_to_tail(&self.storage);
                }

                // Durability: append to the WAL and stamp the entry with the
                // resulting LSN.
                if let Some(wal) = &self.wal {
                    let lsn = wal.append_and_wait(&write_op, leaf_entry.get_disk_offset());
                    leaf_entry.update_lsn(lsn);
                }
            }
        }

        Ok(())
    }
739
740    /// Make sure you're not holding any lock while calling this function.
741    pub(crate) fn evict_from_circular_buffer(&self) -> Result<usize, TreeError> {
742        // Why we need to evict multiple times?
743        // because we don't want each alloc to trigger evict, i.e., we want alloc to fail less often.
744        // with default 1024 bytes, one eviction allows us to alloc 1024 bytes (4 256-byte mini pages) without failure.
745        const TARGET_EVICT_SIZE: usize = 1024;
746        let mut evicted = 0;
747
748        // A corner case: we may not have enough memory to evict (i.e., the buffer might be empty now)
749        let mut retry_cnt = 0;
750
751        while evicted < TARGET_EVICT_SIZE && retry_cnt < 10 {
752            let n = self
753                .storage
754                .evict_from_buffer(|mini_page_handle: &TombstoneHandle| {
755                    eviction_callback(mini_page_handle, self)
756                })?;
757            evicted += n as usize;
758            retry_cnt += 1;
759        }
760        info!("stopped evict from circular buffer");
761        Ok(evicted)
762    }
763
764    /// Insert a key-value pair to the system, overrides existing value if present.
765    ///
766    /// ```
767    /// use bf_tree::BfTree;
768    /// use bf_tree::LeafReadResult;
769    ///
770    /// let mut config = bf_tree::Config::default();
771    /// config.cb_min_record_size(4);
772    /// let tree = BfTree::with_config(config, None).unwrap();
773    /// tree.insert(b"key", b"value");
774    /// let mut buffer = [0u8; 1024];
775    /// let read_size = tree.read(b"key", &mut buffer);
776    ///
777    /// assert_eq!(read_size, LeafReadResult::Found(5));
778    /// assert_eq!(&buffer[..5], b"value");
779    /// ```
780    pub fn insert(&self, key: &[u8], value: &[u8]) -> LeafInsertResult {
781        // The input key cannot exceed the configured max key length
782        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
783            return LeafInsertResult::InvalidKV(format!("Key too large {}", key.len()));
784        }
785
786        // The input key has to be one byte at least
787        if key.is_empty() {
788            return LeafInsertResult::InvalidKV(format!(
789                "Key too small {}, at least one byte",
790                key.len()
791            ));
792        }
793
794        // The input key value pair cannot exceed the configured max record size
795        if value.len() > MAX_VALUE_LEN || key.len() + value.len() > self.config.cb_max_record_size {
796            return LeafInsertResult::InvalidKV(format!(
797                "Record too large {}, {}, please adjust cb_max_record_size in config",
798                key.len(),
799                value.len()
800            ));
801        }
802
803        // The input key value pair cannot be smaller than the configured min record size
804        if key.len() + value.len() < self.config.cb_min_record_size {
805            return LeafInsertResult::InvalidKV(format!(
806                "Record too small {}, {}, please adjust cb_min_record_size in config",
807                key.len(),
808                value.len()
809            ));
810        }
811
812        let backoff = Backoff::new();
813        let mut aggressive_split = false;
814
815        counter!(Insert);
816        info!(key_len = key.len(), value_len = value.len(), "insert");
817
818        loop {
819            let result = self.write_inner(WriteOp::make_insert(key, value), aggressive_split);
820            match result {
821                Ok(_) => return LeafInsertResult::Success,
822                Err(TreeError::NeedRestart) => {
823                    #[cfg(all(feature = "shuttle", test))]
824                    {
825                        shuttle::thread::yield_now();
826                    }
827                    counter!(InsertNeedRestart);
828                    aggressive_split = true;
829                }
830                Err(TreeError::CircularBufferFull) => {
831                    info!("insert failed, started evict from circular buffer");
832                    aggressive_split = true;
833                    counter!(InsertCircularBufferFull);
834                    _ = self.evict_from_circular_buffer();
835                    continue;
836                }
837                Err(TreeError::Locked) => {
838                    counter!(InsertLocked);
839                    backoff.snooze();
840                }
841            }
842        }
843    }
844
845    /// Read a record from the tree.
846    /// Returns the number of bytes read.
847    ///
848    /// TODO: don't panic if the out_buffer is too small, instead returns a error.
849    ///
850    /// ```
851    /// use bf_tree::BfTree;
852    /// use bf_tree::LeafReadResult;
853    ///
854    /// let mut config = bf_tree::Config::default();
855    /// config.cb_min_record_size(4);
856    ///
857    /// let tree = BfTree::with_config(config, None).unwrap();
858    /// tree.insert(b"key", b"value");
859    /// let mut buffer = [0u8; 1024];
860    /// let read_size = tree.read(b"key", &mut buffer);
861    /// assert_eq!(read_size, LeafReadResult::Found(5));
862    /// assert_eq!(&buffer[..5], b"value");
863    /// ```
864    pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
865        // The input key cannot exceed the configured max key length
866        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
867            return LeafReadResult::InvalidKey;
868        }
869
870        // The input key has to be one byte at least
871        if key.is_empty() {
872            return LeafReadResult::InvalidKey;
873        }
874
875        let backoff = Backoff::new();
876
877        info!(key_len = key.len(), "read");
878        counter!(Read);
879        let mut aggressive_split = false;
880
881        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
882        let mut debug_timer = DebugTimerGuard::new(Timer::Read, self.metrics_recorder.clone());
883
884        loop {
885            let result = self.read_inner(key, out_buffer, aggressive_split);
886            match result {
887                Ok(v) => {
888                    #[cfg(any(
889                        feature = "metrics-rt-debug-all",
890                        feature = "metrics-rt-debug-timer"
891                    ))]
892                    debug_timer.end();
893
894                    return v;
895                }
896                Err(TreeError::CircularBufferFull) => {
897                    info!("read promotion failed, started evict from circular buffer");
898                    aggressive_split = true;
899                    match self.evict_from_circular_buffer() {
900                        Ok(_) => continue,
901                        Err(_) => continue,
902                    };
903                }
904                Err(_) => {
905                    backoff.spin();
906                    aggressive_split = true;
907                }
908            }
909        }
910    }
911
912    /// Delete a record from the tree.
913    ///
914    /// ```
915    /// use bf_tree::BfTree;
916    /// use bf_tree::LeafReadResult;
917    ///
918    /// let tree = BfTree::default();
919    /// tree.insert(b"key", b"value");
920    /// tree.delete(b"key");
921    /// let mut buffer = [0u8; 1024];
922    /// let rt = tree.read(b"key", &mut buffer);
923    /// assert_eq!(rt, LeafReadResult::Deleted);
924    /// ```
925    pub fn delete(&self, key: &[u8]) {
926        // The input key cannot exceed the configured max key length
927        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
928            return;
929        }
930
931        // The input key has to be one byte at least
932        if key.is_empty() {
933            return;
934        }
935
936        let backoff = Backoff::new();
937
938        info!(key_len = key.len(), "delete");
939
940        let mut aggressive_split = false;
941
942        loop {
943            let result = self.write_inner(WriteOp::make_delete(key), aggressive_split);
944            match result {
945                Ok(_) => return,
946                Err(TreeError::CircularBufferFull) => {
947                    info!("delete failed, started evict from circular buffer");
948                    aggressive_split = true;
949                    match self.evict_from_circular_buffer() {
950                        Ok(_) => continue,
951                        Err(_) => continue,
952                    };
953                }
954                Err(_) => {
955                    aggressive_split = true;
956                    backoff.spin();
957                }
958            }
959        }
960    }
961
962    /// Scan records in the tree, with starting key and desired scan count.
963    /// Returns a iterator that yields key-value pairs.
964    pub fn scan_with_count<'a>(
965        &'a self,
966        key: &[u8],
967        cnt: usize,
968        return_field: ScanReturnField,
969    ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
970        // In cache-only mode, scan is not supported
971        if self.cache_only {
972            return Err(ScanIterError::CacheOnlyMode);
973        }
974
975        // The start key cannot exceed the configured max key length
976        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
977            return Err(ScanIterError::InvalidStartKey);
978        }
979
980        // The input key has to be one byte at least
981        if key.is_empty() {
982            return Err(ScanIterError::InvalidStartKey);
983        }
984
985        // The count cannot be zero
986        if cnt == 0 {
987            return Err(ScanIterError::InvalidCount);
988        }
989
990        Ok(ScanIter::new_with_scan_count(self, key, cnt, return_field))
991    }
992
993    pub fn scan_with_end_key<'a>(
994        &'a self,
995        start_key: &[u8],
996        end_key: &[u8],
997        return_field: ScanReturnField,
998    ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
999        // In cache-only mode, scan is not supported
1000        if self.cache_only {
1001            return Err(ScanIterError::CacheOnlyMode);
1002        }
1003
1004        // The start key cannot exceed the configured max key length
1005        if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1006            return Err(ScanIterError::InvalidStartKey);
1007        }
1008
1009        // The input key has to be one byte at least
1010        if start_key.is_empty() {
1011            return Err(ScanIterError::InvalidStartKey);
1012        }
1013
1014        // The end key cannot exceed the configured max key length
1015        if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1016            return Err(ScanIterError::InvalidEndKey);
1017        }
1018
1019        // The input key has to be one byte at least
1020        if end_key.is_empty() {
1021            return Err(ScanIterError::InvalidEndKey);
1022        }
1023
1024        // The start key cannot be greater than the end key
1025        let cmp = start_key.cmp(end_key);
1026        if cmp == std::cmp::Ordering::Greater {
1027            return Err(ScanIterError::InvalidKeyRange);
1028        }
1029
1030        Ok(ScanIter::new_with_end_key(
1031            self,
1032            start_key,
1033            end_key,
1034            return_field,
1035        ))
1036    }
1037
1038    #[doc(hidden)]
1039    pub fn scan_mut_with_count<'a>(
1040        &'a self,
1041        key: &'a [u8],
1042        cnt: usize,
1043        return_field: ScanReturnField,
1044    ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1045        // In cache-only mode, scan is not supported
1046        if self.cache_only {
1047            return Err(ScanIterError::CacheOnlyMode);
1048        }
1049
1050        // The start key cannot exceed the configured max key length
1051        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1052            return Err(ScanIterError::InvalidStartKey);
1053        }
1054
1055        // The count cannot be zero
1056        if cnt == 0 {
1057            return Err(ScanIterError::InvalidCount);
1058        }
1059
1060        Ok(ScanIterMut::new_with_scan_count(
1061            self,
1062            key,
1063            cnt,
1064            return_field,
1065        ))
1066    }
1067
1068    #[doc(hidden)]
1069    pub fn scan_mut_with_end_key<'a>(
1070        &'a self,
1071        start_key: &'a [u8],
1072        end_key: &'a [u8],
1073        return_field: ScanReturnField,
1074    ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1075        // In cache-only mode, scan is not supported
1076        if self.cache_only {
1077            return Err(ScanIterError::CacheOnlyMode);
1078        }
1079
1080        // The start key cannot exceed the configured max key length
1081        if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1082            return Err(ScanIterError::InvalidStartKey);
1083        }
1084
1085        // The end key cannot exceed the configured max key length
1086        if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1087            return Err(ScanIterError::InvalidEndKey);
1088        }
1089
1090        Ok(ScanIterMut::new_with_end_key(
1091            self,
1092            start_key,
1093            end_key,
1094            return_field,
1095        ))
1096    }
1097
    /// One optimistic attempt at reading `key` into `out_buffer`.
    ///
    /// Traverses to the leaf, reads from whichever page level holds the key
    /// (mini/full cache page or on-disk base page), and opportunistically
    /// promotes hot base-page records into the cache. Returns
    /// `Err(TreeError::...)` when the caller (`read`) should retry.
    fn read_inner(
        &self,
        key: &[u8],
        out_buffer: &mut [u8],
        aggressive_split: bool,
    ) -> Result<LeafReadResult, TreeError> {
        let (node, parent) = self.traverse_to_leaf(key, aggressive_split)?;

        let mut leaf = self.mapping_table().get(&node);

        check_parent!(self, node, parent);

        let out = leaf.read(
            key,
            out_buffer,
            self.config.mini_page_binary_search,
            self.cache_only,
        );
        match out {
            // Cache hit (mini or full page): the read result is already in
            // `out_buffer`; optionally keep the hot page alive in the buffer.
            ReadResult::Mini(r) | ReadResult::Full(r) => {
                if leaf.cache_page_about_to_evict(&self.storage) {
                    // Upgrading to an exclusive lock is best-effort: on
                    // contention we just return the value we already read.
                    let mut x_leaf = match leaf.try_upgrade() {
                        Ok(v) => v,
                        Err(_) => return Ok(r),
                    };
                    // we don't care about the result here, because we are in read path, we don't want to block.
                    _ = x_leaf.move_cache_page_to_tail(&self.storage);
                }

                Ok(r)
            }

            // Cache miss: the value came from the on-disk base page.
            ReadResult::Base(r) => {
                counter!(BasePageRead);

                // In cache-only mode, no base page should exist
                if self.cache_only {
                    panic!("Attempt to read a base page while in cache-only mode.");
                }

                // Only `Found` results are worth promoting into the cache.
                let v = match r {
                    LeafReadResult::Found(v) => v,
                    _ => return Ok(r),
                };

                // Promotion needs a parent guard, and is rate-limited by
                // `should_promote_read` (presumably a sampling dice roll —
                // see its definition).
                if parent.is_none() || !self.should_promote_read() {
                    return Ok(r);
                }

                // Promotion is purely opportunistic; failing to get the
                // exclusive lock just means we skip it this time.
                let mut x_leaf = match leaf.try_upgrade() {
                    Ok(x) => x,
                    Err(_) => {
                        return Ok(r);
                    }
                };

                if self.config.read_record_cache {
                    // we do record cache.
                    // we roll dice to see if we should insert this value to mini page.

                    // Re-insert the just-read value with `OpType::Cache` so it
                    // lands in a mini page without being treated as a write.
                    let out = x_leaf.insert(
                        key,
                        &out_buffer[0..v as usize],
                        parent,
                        OpType::Cache,
                        &self.storage,
                        &self.write_load_full_page,
                        &self.cache_only,
                        &self.mini_page_size_classes,
                    );

                    match out {
                        Ok(_) => {
                            counter!(ReadPromotionOk);
                            Ok(r)
                        }
                        Err(TreeError::Locked) => {
                            // We are doing this very optimistically, if contention happens, we just abort and return.
                            counter!(ReadPromotionFailed);
                            Ok(r)
                        }
                        Err(TreeError::CircularBufferFull) => {
                            // Propagate: the caller evicts and retries.
                            counter!(ReadPromotionFailed);
                            Err(TreeError::CircularBufferFull)
                        }
                        Err(TreeError::NeedRestart) => {
                            // If we need restart here, potentially because parent is full.
                            counter!(ReadPromotionFailed);
                            Err(TreeError::NeedRestart)
                        }
                    }
                } else {
                    // No record cache: promote the entire base page to a full
                    // cache page instead. Lock contention is non-fatal.
                    match self.upgrade_to_full_page(x_leaf, parent.unwrap()) {
                        Ok(_) | Err(TreeError::Locked) => Ok(r),
                        Err(e) => Err(e),
                    }
                }
            }
            ReadResult::None => Ok(LeafReadResult::NotFound),
        }
    }
1199
    /// Upgrade the leaf behind `x_leaf` to a full in-memory cache page.
    ///
    /// Handles each current page location:
    /// - `Mini`: merge the mini page down into its base page first (the
    ///   begin/finish dealloc pair brackets the merge so the buffer slot is
    ///   reclaimed only after the entry no longer points at it), then load the
    ///   base page and build the full page from it.
    /// - `Full`: already upgraded, nothing to do.
    /// - `Base`: load the base page from disk and build the full page.
    /// - `Null`: invalid here; panics.
    ///
    /// Returns the (still exclusively locked) leaf entry on success.
    fn upgrade_to_full_page<'a>(
        &'a self,
        mut x_leaf: LeafEntryXLocked<'a>,
        parent: ReadGuard<'a>,
    ) -> Result<LeafEntryXLocked<'a>, TreeError> {
        let page_loc = x_leaf.get_page_location().clone();
        match page_loc {
            PageLocation::Mini(ptr) => {
                let mini_page = x_leaf.load_cache_page_mut(ptr);
                // Tombstone the mini page's buffer space before merging;
                // `finish_dealloc_mini_page` below completes the reclamation.
                let h = self.storage.begin_dealloc_mini_page(mini_page)?;
                let _merge_result = x_leaf.try_merge_mini_page(&h, parent, &self.storage)?;
                // Remember where the underlying base page lives before the
                // entry is redirected away from the mini page.
                let base_offset = mini_page.next_level;
                x_leaf.change_to_base_loc();
                self.storage.finish_dealloc_mini_page(h);

                let base_page_ref = x_leaf.load_base_page_from_buffer();
                let full_page_loc =
                    upgrade_to_full_page(&self.storage, base_page_ref, base_offset)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            // Already a full cache page: nothing to upgrade.
            PageLocation::Full(_ptr) => Ok(x_leaf),
            PageLocation::Base(offset) => {
                let base_page_ref = x_leaf.load_base_page(offset);
                let next_level = MiniPageNextLevel::new(offset);
                let full_page_loc = upgrade_to_full_page(&self.storage, base_page_ref, next_level)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Null => panic!("upgrade_to_full_page on Null page"),
        }
    }
1232
    /// Collect all metrics and reset the metric recorder
    /// The caller needs to ensure there are no references to the bf-tree's metrics recorder anymore.
    ///
    /// Returns `None` when metrics features are disabled or no recorder is
    /// installed; otherwise a JSON object with the accumulated timers.
    pub fn get_metrics(&mut self) -> Option<serde_json::Value> {
        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
        {
            // Take the recorder out of `self` so the Arc can be unwrapped;
            // a fresh one is installed below before returning.
            let recorder = self.metrics_recorder.take();
            match recorder {
                Some(r) => {
                    // Panics if any other Arc clone is still alive — that is
                    // the caller contract stated in the doc comment above.
                    let recorders = Arc::try_unwrap(r).expect("Failed to obtain the recorders of bf-tree, please make sure no other references exist.");
                    let mut timer_accumulated = TimerRecorder::default();

                    // Only collect timer metrics for now
                    for r in recorders {
                        // Dereferencing the UnsafeCell: sound only because we
                        // now own the ThreadLocal exclusively (try_unwrap
                        // succeeded), so no thread can be writing concurrently.
                        let t = unsafe { &*r.get() };

                        timer_accumulated += t.timers.clone();
                    }

                    let output = serde_json::json!({
                        "Timers": timer_accumulated,
                    });

                    // Reset: install an empty per-thread recorder set.
                    self.metrics_recorder = Some(Arc::new(ThreadLocal::new()));

                    Some(output)
                }
                None => None,
            }
        }
        #[cfg(not(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer")))]
        {
            None
        }
    }
1267}
1268
1269pub(crate) fn key_value_physical_size(key: &[u8], value: &[u8]) -> usize {
1270    let key_size = key.len();
1271    let value_size = value.len();
1272    let meta_size = crate::nodes::KV_META_SIZE;
1273    key_size + value_size + meta_size
1274}
1275
/// Callback invoked by the circular buffer when it wants to reclaim the
/// space behind `mini_page_handle`.
///
/// Re-traverses the tree using a key read from the (unlocked) page, then
/// merges or drops the page depending on where the mapping-table entry
/// currently points. Returns `Err(TreeError::NeedRestart)` whenever the
/// unlocked read turned out to be stale, so the evictor retries.
pub(crate) fn eviction_callback(
    mini_page_handle: &TombstoneHandle,
    tree: &BfTree,
) -> Result<(), TreeError> {
    // Raw pointer into the circular buffer; only dereferenced read-only here,
    // and every use is re-validated against the mapping table below.
    let mini_page = mini_page_handle.ptr as *mut LeafNode;
    let key_to_this_page = if tree.cache_only {
        unsafe { &*mini_page }.try_get_key_to_reach_this_node()?
    } else {
        unsafe { &*mini_page }.get_key_to_reach_this_node()
    };

    // Here we need to set aggressive split to true, because we would split parent node due to leaf split.
    let (pid, parent) = tree.traverse_to_leaf(&key_to_this_page, true)?;
    info!(
        pid = pid.raw(),
        "starting to merge mini page in eviction call back"
    );

    let mut leaf_entry = tree.mapping_table().get_mut(&pid);

    histogram!(EvictNodeSize, unsafe { &*mini_page }.meta.node_size as u64);

    match leaf_entry.get_page_location() {
        PageLocation::Mini(ptr) => {
            {
                // In order to lock this node, we need to traverse to this node first;
                // but in order to traverse this node, we need to read the keys in this node;
                // in order to read the keys in this node, we need to lock this node.
                //
                // Because we didn't lock the node while reading `key_to_this_page`,
                // we need to recheck if the node is still the same node.
                if *ptr != mini_page {
                    return Err(TreeError::NeedRestart);
                }
            }

            let parent = parent.expect("Mini page must have a parent");
            parent.check_version()?;

            // In the case of cache_only, the correponding mapping table entry of the mini-page
            // is replaced by a non-existant base page
            if tree.cache_only {
                leaf_entry.change_to_null_loc();
            } else {
                leaf_entry.try_merge_mini_page(mini_page_handle, parent, &tree.storage)?;
                leaf_entry.change_to_base_loc();
                // we don't need to dealloc the old_mini_page here because we are in eviction callback.
            }

            Ok(())
        }

        PageLocation::Full(ptr) => {
            // Same stale-pointer recheck as the Mini case above.
            if *ptr != mini_page {
                return Err(TreeError::NeedRestart);
            }

            leaf_entry.merge_full_page(mini_page_handle);
            Ok(())
        }

        // This means the key read from the mini page is corrupted and points to a different page
        PageLocation::Base(_offset) => Err(TreeError::NeedRestart),

        // This means the key read from the mini page is corrupted and points to a different page
        PageLocation::Null => Err(TreeError::NeedRestart),
    }
}
1344
#[cfg(test)]
mod tests {
    use crate::error::ConfigError;
    use crate::BfTree;

    /// Size classes should grow from the configured minimum up to the leaf
    /// page size, rounded to the configured alignment.
    #[test]
    fn test_mini_page_size_classes() {
        assert_eq!(
            BfTree::create_mem_page_size_classes(48, 1952, 4096, 64, false),
            vec![128, 192, 256, 512, 960, 1856, 2048, 4096]
        );

        assert_eq!(
            BfTree::create_mem_page_size_classes(1548, 1548, 3136, 64, true),
            vec![1536, 3136]
        );

        assert_eq!(
            BfTree::create_mem_page_size_classes(48, 3072, 12288, 64, false),
            vec![128, 192, 256, 512, 960, 1856, 3648, 7232, 9088, 12288]
        );

        assert_eq!(
            BfTree::create_mem_page_size_classes(4, 1952, 4096, 32, false),
            vec![64, 128, 256, 448, 832, 1600, 2048, 4096]
        );
    }

    /// Each invalid configuration must be rejected with the matching
    /// `ConfigError` variant.
    #[test]
    fn test_invalid_config_to_build_bf_tree() {
        // Min record too small
        let mut config = crate::Config::default();
        config.cb_min_record_size(4);
        config.leaf_page_size(32 * 1024);

        match BfTree::with_config(config.clone(), None) {
            Err(ConfigError::MinimumRecordSize(_)) => {}
            Err(_) => panic!("Expected InvalidMinimumRecordSize error"),
            Ok(_) => panic!("Expected error but got Ok"),
        }

        // Max record too large
        config = crate::Config::default();
        config.cb_max_record_size(64 * 1024);

        match BfTree::with_config(config.clone(), None) {
            Err(ConfigError::MaximumRecordSize(_)) => {}
            Err(_) => panic!("Expected InvalidMaximumRecordSize error"),
            Ok(_) => panic!("Expected error but got Ok"),
        }

        // Leaf page size not aligned
        config = crate::Config::default();
        config.leaf_page_size(4050);

        match BfTree::with_config(config.clone(), None) {
            Err(ConfigError::LeafPageSize(_)) => {}
            Err(_) => panic!("Expected InvalidLeafPageSize error"),
            Ok(_) => panic!("Expected error but got Ok"),
        }

        // Circular buffer size too small
        config = crate::Config::default();
        config.leaf_page_size(16 * 1024);
        config.cb_size_byte(16 * 1024);

        match BfTree::with_config(config.clone(), None) {
            Err(ConfigError::CircularBufferSize(_)) => {}
            Err(_) => panic!("Expected InvalidCircularBufferSize error"),
            Ok(_) => panic!("Expected error but got Ok"),
        }

        // Circular buffer size not power of two
        config = crate::Config::default();
        config.cb_size_byte(20 * 1024);

        match BfTree::with_config(config.clone(), None) {
            Err(ConfigError::CircularBufferSize(_)) => {}
            Err(_) => panic!("Expected InvalidCircularBufferSize error"),
            Ok(_) => panic!("Expected error but got Ok"),
        }

        // Cache-only mode specific
        config = crate::Config::default();
        config.cache_only(true);
        config.cb_size_byte(2 * 4096);

        match BfTree::with_config(config.clone(), None) {
            Err(ConfigError::CircularBufferSize(_)) => {}
            Err(_) => panic!("Expected InvalidCircularBufferSize error"),
            Ok(_) => panic!("Expected error but got Ok"),
        }
    }
}
1453}