bf_tree/
tree.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4#[cfg(not(all(feature = "shuttle", test)))]
5use rand::Rng;
6
7#[cfg(all(feature = "shuttle", test))]
8use shuttle::rand::Rng;
9
10use cfg_if::cfg_if;
11
12cfg_if! {
13    if #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))] {
14        use crate::metric::{Timer, TlsRecorder, timer::{TimerRecorder, DebugTimerGuard}};
15        use std::cell::UnsafeCell;
16        use thread_local::ThreadLocal;
17    }
18}
19
20use crate::{
21    check_parent,
22    circular_buffer::{CircularBufferMetrics, TombstoneHandle},
23    counter,
24    error::{ConfigError, TreeError},
25    histogram, info,
26    mini_page_op::{upgrade_to_full_page, LeafEntryXLocked, LeafOperations, ReadResult},
27    nodes::{
28        leaf_node::{LeafKVMeta, LeafReadResult, MiniPageNextLevel, OpType},
29        InnerNode, InnerNodeBuilder, LeafNode, PageID, CACHE_LINE_SIZE, DISK_PAGE_SIZE,
30        INNER_NODE_SIZE, MAX_KEY_LEN, MAX_LEAF_PAGE_SIZE, MAX_VALUE_LEN,
31    },
32    range_scan::{ScanIter, ScanIterMut, ScanReturnField},
33    storage::{LeafStorage, PageLocation, PageTable},
34    sync::{
35        atomic::{AtomicU64, Ordering},
36        Arc,
37    },
38    utils::{get_rng, inner_lock::ReadGuard, Backoff, BfsVisitor, NodeInfo},
39    wal::{WriteAheadLog, WriteOp},
40    Config, StorageBackend,
41};
42use std::path::Path;
43
/// The bf-tree instance
pub struct BfTree {
    /// Raw root word; the top bit (`ROOT_IS_LEAF_MASK`) flags whether the root
    /// is a leaf, the remaining bits encode the root `PageID` (see `get_root_page`).
    pub(crate) root_page_id: AtomicU64,
    /// Storage layer for leaf pages (circular buffer, mapping table, vfs).
    pub(crate) storage: LeafStorage,
    /// Optional write-ahead log; `None` means no WAL is attached.
    pub(crate) wal: Option<Arc<WriteAheadLog>>,
    /// Tree configuration, shared with the storage layer.
    pub(crate) config: Arc<Config>,
    // Passed down to the leaf insert path; presumably makes writes load the
    // full page rather than operate on mini pages — TODO confirm in LeafOperations.
    pub(crate) write_load_full_page: bool,
    // If true, there is no permanent storage layer, thus no durability
    // guarantee of any upsert in the tree.
    pub(crate) cache_only: bool,
    /// Size classes of mini pages, ascending; built by `create_mem_page_size_classes`.
    pub(crate) mini_page_size_classes: Vec<usize>,
    /// Per-tree metrics recorder under the "metrics-rt-debug-*" features.
    #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
    pub metrics_recorder: Option<Arc<ThreadLocal<UnsafeCell<TlsRecorder>>>>,
}
56
// SAFETY: NOTE(review) — these blanket impls override auto-trait analysis
// (the struct transitively holds raw pointers). Concurrent access is presumed
// to be serialized by the mapping-table locks and inner-node read/write
// guards used throughout this file — confirm every field is actually
// safe to share/move across threads.
unsafe impl Sync for BfTree {}

unsafe impl Send for BfTree {}
60
/// Outcome of an `insert` call.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum LeafInsertResult {
    /// The key-value pair was accepted.
    Success,
    /// The key or value violated a size constraint; the message explains which.
    InvalidKV(String),
}
66
/// Errors returned when constructing a range-scan iterator.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ScanIterError {
    /// Range scans are not available in cache-only mode.
    CacheOnlyMode,
    /// The provided start key is invalid.
    InvalidStartKey,
    /// The provided end key is invalid.
    InvalidEndKey,
    /// The requested record count is invalid.
    InvalidCount,
    /// The start/end keys do not form a valid range.
    InvalidKeyRange,
}
75
impl Drop for BfTree {
    /// Tears the tree down: stops the WAL background job, then walks every
    /// node (BFS) and releases leaf pages and inner nodes.
    fn drop(&mut self) {
        // Stop the WAL worker first so no background activity races with
        // deallocation below.
        if let Some(ref wal) = self.wal {
            wal.stop_background_job();
        }

        let visitor = BfsVisitor::new_all_nodes(self);
        for node_info in visitor {
            match node_info {
                NodeInfo::Leaf { page_id, .. } => {
                    // Leaves are reached through the mapping table; dealloc_self
                    // releases whatever representation currently backs the entry.
                    let mut leaf = self.mapping_table().get_mut(&page_id);
                    leaf.dealloc_self(&self.storage, self.cache_only);
                }
                NodeInfo::Inner { ptr, .. } => {
                    // NOTE(review): `ptr` is presumed to point at a live InnerNode
                    // for the duration of drop (exclusive access) — confirm.
                    if unsafe { &*ptr }.is_valid_disk_offset() {
                        let disk_offset = unsafe { &*ptr }.disk_offset;
                        if self.config.storage_backend == StorageBackend::Memory {
                            // special case for memory backend, we need to deallocate the memory.
                            self.storage.vfs.dealloc_offset(disk_offset as usize);
                        }
                    }
                    InnerNode::free_node(ptr as *mut InnerNode);
                }
            }
        }
    }
}
103
104impl Default for BfTree {
105    fn default() -> Self {
106        Self::new(":memory:", 1024 * 1024 * 32).unwrap()
107    }
108}
109
110impl BfTree {
    // Top bit of `root_page_id` marks "the root is a leaf". This is quite
    // error-prone, make sure the mask is not conflicting with the page id definition.
    pub(crate) const ROOT_IS_LEAF_MASK: u64 = 0x8000_0000_0000_0000;
112
113    /// Create the size classes of all memory pages in acending order based on the record size (key + value) and the leaf page size
114    /// [s_0, s_1, ..., s_x], ascending order.
115    /// Each s_i is of size 2^i * c + size_of(LeafNode)
116    /// where c = (min_record_size + LeafKVMeta) aligned on CACHE_LINE_SIZE and 2^i * c <= leaf_page_size and s_x = leaf_page_size
117    ///
118    /// In non cache-only mode, the largest mini page size is s_(x-1) and full page/leaf base page size is s_x.
119    /// Currently, the design assumes a mini-page can always be successfully merged into a leaf page in one pass (including at most one base page split).
120    /// As such, the following three conditions are sufficient in preventing merge failures.
121    /// C1) x >= 1
122    /// C2) s_x > s_{x-1}
123    /// C3) max_record_size + SizeOf(KVMeta) <= (s_x - s_{x-1} - 2 * (fence_meta + max_key_len)).
124    ///
125    /// In cache-only mode, the largest mini page is s_x as there is no full nor base page.
126    /// Although there is no merging of mini-pages, the design assumes a new record can always be successfully inserted into a mini-page in one pass
127    /// (including at most one mini-page split). As such, the following sufficient condition is required.
128    /// C1) max_record_page + Sizeof(KVMeta) <= (s_x - SizeOf(NodeMeta)) / 2
129    /// C2) if x >= 1, s_x >= s_{x-1} + max_record_size + SizeOf(KVMeta)
130    ///
131    /// C2) is necessary for cache-only mode to guarantee that a mini-page can grow to full page size before being split up to two full-sized pages
132    pub(crate) fn create_mem_page_size_classes(
133        min_record_size_in_byte: usize,
134        max_record_size_in_byte: usize,
135        leaf_page_size_in_byte: usize,
136        max_fence_len_in_byte: usize,
137        cache_only: bool,
138    ) -> Vec<usize> {
139        // Sanity check of the input parameters
140        assert!(
141            min_record_size_in_byte > 1,
142            "cb_min_record_size in config cannot be less than 2"
143        );
144        assert!(
145            min_record_size_in_byte <= max_record_size_in_byte,
146            "cb_min_record_size cannot be larger than cb_max_record_size"
147        );
148        assert!(
149            max_fence_len_in_byte > 0,
150            "max_fence_len in config cannot be zero"
151        );
152        assert!(
153            max_fence_len_in_byte / 2 < max_record_size_in_byte,
154            "max_fence_len/2 cannot be larger than cb_max_record_size"
155        );
156        assert!(
157            leaf_page_size_in_byte <= MAX_LEAF_PAGE_SIZE,
158            "leaf_page_size in config cannot be larger than {}",
159            MAX_LEAF_PAGE_SIZE
160        );
161        assert!(
162            max_fence_len_in_byte / 2 <= MAX_KEY_LEN,
163            "max_key_len in config cannot be larger than {}",
164            MAX_KEY_LEN
165        );
166        assert!(
167            leaf_page_size_in_byte / min_record_size_in_byte <= 4096,
168            "Maximum number of records per page (leaf_page_size/min_record_size) cannot exceed 2^12.", // This is restricted by #bits for value count in NodeMeta
169        );
170
171        // In non cache-only mode, the leaf page should be in the multiple of DISK_PAGE_SIZE.
172        // In cache-only mode, the leaf page should be aligned on cache line size.
173        if !cache_only {
174            assert!(
175                leaf_page_size_in_byte.is_multiple_of(DISK_PAGE_SIZE),
176                "leaf_page_size in config should be multiple of {}",
177                DISK_PAGE_SIZE
178            );
179        } else {
180            assert!(
181                leaf_page_size_in_byte.is_multiple_of(CACHE_LINE_SIZE),
182                "leaf_page_size in config should be multiple of {}",
183                CACHE_LINE_SIZE
184            );
185        }
186
187        let max_record_size_with_meta = max_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
188        let mut max_mini_page_size: usize;
189
190        if cache_only {
191            // Guarantee C1), C2) for cache-only mode
192            max_mini_page_size = leaf_page_size_in_byte - max_record_size_with_meta;
193            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
194
195            assert!(
196                leaf_page_size_in_byte
197                    >= 2 * max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
198                "cb_max_record_size of config should be <= {}",
199                (leaf_page_size_in_byte - std::mem::size_of::<LeafNode>()) / 2
200                    - std::mem::size_of::<LeafKVMeta>()
201            );
202        } else {
203            // Guarantee C1), C2) and C3) for non cache-only mode
204            max_mini_page_size = leaf_page_size_in_byte
205                - max_record_size_with_meta
206                - max_fence_len_in_byte
207                - 2 * std::mem::size_of::<LeafKVMeta>();
208            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
209
210            assert!(
211                max_mini_page_size >= max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
212                "cb_max_record_size of config should be <= {}",
213                max_mini_page_size
214                    - std::mem::size_of::<LeafNode>()
215                    - std::mem::size_of::<LeafKVMeta>()
216            );
217        }
218
219        // Generate all size classes for mini-pages
220        let mut mem_page_size_classes = Vec::new();
221
222        let base: i32 = 2;
223        let mut record_num_per_page_exp: u32 = 0;
224        let c: usize = min_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
225
226        // No need to consider fence here as mini-pages have no fences.
227        let mut size_class =
228            base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
229
230        // Memory page size is aligned on cache line size
231        if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
232            size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
233        }
234
235        while size_class <= max_mini_page_size {
236            if mem_page_size_classes.is_empty()
237                || (mem_page_size_classes[mem_page_size_classes.len() - 1] < size_class)
238            {
239                mem_page_size_classes.push(size_class);
240            }
241
242            record_num_per_page_exp += 1;
243            size_class =
244                base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
245
246            if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
247                size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
248            }
249        }
250
251        if !cache_only {
252            assert!(!mem_page_size_classes.is_empty());
253        }
254
255        // Add the largest mini page size if not already added
256        if mem_page_size_classes.is_empty()
257            || mem_page_size_classes[mem_page_size_classes.len() - 1] < max_mini_page_size
258        {
259            mem_page_size_classes.push(max_mini_page_size);
260        }
261
262        // The largest page size is the full leaf page size
263        if mem_page_size_classes.is_empty()
264            || mem_page_size_classes[mem_page_size_classes.len() - 1] < leaf_page_size_in_byte
265        {
266            mem_page_size_classes.push(leaf_page_size_in_byte);
267        }
268
269        if !cache_only {
270            assert!(mem_page_size_classes.len() >= 2);
271        } else {
272            assert!(!mem_page_size_classes.is_empty());
273        }
274
275        mem_page_size_classes
276    }
277
278    /// Create a new bf-tree instance with customized storage backend and
279    /// circular buffer size
280    ///
281    /// For in-memory tree, use `:memory:` as file path.
282    /// For cache-only tree, use `:cache:` as file path
283    /// For disk tree, file_path is the path to the index file
284    ///
285    /// Mini page cache must be at least 8192 bytes for practical workloads.
286    ///
287    /// ```
288    /// use bf_tree::BfTree;
289    /// let tree = BfTree::new(":memory:", 8192).unwrap();
290    /// ```
291    pub fn new(file_path: impl AsRef<Path>, cache_size_byte: usize) -> Result<Self, ConfigError> {
292        let config = Config::new(file_path, cache_size_byte);
293        Self::with_config(config, None)
294    }
295
296    /// Create a new bf-tree instance with customized configuration based on
297    /// a config file
298    pub fn new_with_config_file<P: AsRef<Path>>(config_file_path: P) -> Result<Self, ConfigError> {
299        let config = Config::new_with_config_file(config_file_path);
300        Self::with_config(config, None)
301    }
302
303    /// Initialize the bf-tree with provided config. For advanced user only.
304    /// An optional pre-allocated buffer pointer can be provided to use as the buffer pool memory.
305    pub fn with_config(config: Config, buffer_ptr: Option<*mut u8>) -> Result<Self, ConfigError> {
306        // Validate the config first
307        config.validate()?;
308
309        let wal = match config.write_ahead_log.as_ref() {
310            Some(wal_config) => {
311                let wal = WriteAheadLog::new(wal_config.clone());
312                Some(wal)
313            }
314            None => None,
315        };
316        let write_load_full = config.write_load_full_page;
317        let config = Arc::new(config);
318
319        // In cache-only mode, the initial root page is a full mini-page
320        if config.cache_only {
321            let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr);
322
323            // Assuming CB can accommodate at least 2 leaf pages at the same time
324            let mini_page_guard = (leaf_storage)
325                .alloc_mini_page(config.leaf_page_size)
326                .expect("Fail to allocate a mini-page as initial root node");
327            LeafNode::initialize_mini_page(
328                &mini_page_guard,
329                config.leaf_page_size,
330                MiniPageNextLevel::new_null(),
331                true,
332            );
333            let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
334            let mini_loc = PageLocation::Mini(new_mini_ptr);
335
336            let (root_id, root_lock) = leaf_storage
337                .mapping_table()
338                .insert_mini_page_mapping(mini_loc);
339            assert_eq!(root_id.as_id(), 0);
340
341            drop(root_lock);
342            drop(mini_page_guard);
343            let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
344
345            return Ok(Self {
346                root_page_id: AtomicU64::new(root_id),
347                storage: leaf_storage,
348                wal,
349                cache_only: config.cache_only,
350                write_load_full_page: write_load_full,
351                mini_page_size_classes: Self::create_mem_page_size_classes(
352                    config.cb_min_record_size,
353                    config.cb_max_record_size,
354                    config.leaf_page_size,
355                    config.max_fence_len,
356                    config.cache_only,
357                ),
358                config,
359                #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
360                metrics_recorder: Some(Arc::new(ThreadLocal::new())),
361            });
362        }
363
364        let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr);
365        let (root_id, root_lock) = leaf_storage.mapping_table().alloc_base_page_mapping();
366        drop(root_lock);
367        assert_eq!(root_id.as_id(), 0);
368
369        let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
370        Ok(Self {
371            root_page_id: AtomicU64::new(root_id),
372            storage: leaf_storage,
373            wal,
374            cache_only: config.cache_only,
375            write_load_full_page: write_load_full,
376            mini_page_size_classes: Self::create_mem_page_size_classes(
377                config.cb_min_record_size,
378                config.cb_max_record_size,
379                config.leaf_page_size,
380                config.max_fence_len,
381                config.cache_only,
382            ),
383            config,
384            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
385            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
386        })
387    }
388
    /// Get the buffer metrics of the circular buffer.
    /// This is a blocking call, will stop all other buffer operations,
    /// use it wisely.
    ///
    /// Delegates directly to the underlying storage layer.
    pub fn get_buffer_metrics(&self) -> CircularBufferMetrics {
        self.storage.get_buffer_metrics()
    }
395
396    /// returns the root page id and whether it is a leaf node.
397    pub(crate) fn get_root_page(&self) -> (PageID, bool) {
398        let root_page_id = self.root_page_id.load(Ordering::Acquire);
399        let root_is_leaf = (root_page_id & Self::ROOT_IS_LEAF_MASK) != 0;
400        let clean = root_page_id & (!Self::ROOT_IS_LEAF_MASK);
401
402        let page_id = if root_is_leaf {
403            PageID::from_id(clean)
404        } else {
405            PageID::from_pointer(clean as *const InnerNode)
406        };
407
408        (page_id, root_is_leaf)
409    }
410
    /// Shortcut to the storage layer's page mapping table.
    pub(crate) fn mapping_table(&self) -> &PageTable {
        self.storage.mapping_table()
    }
414
415    pub(crate) fn should_promote_read(&self) -> bool {
416        get_rng().gen_range(0..100) < self.config.read_promotion_rate.load(Ordering::Relaxed)
417    }
418
419    pub(crate) fn should_promote_scan_page(&self) -> bool {
420        get_rng().gen_range(0..100) < self.config.scan_promotion_rate.load(Ordering::Relaxed)
421    }
422
423    /// Chance% to promote a base read record to mini page.
424    pub fn update_read_promotion_rate(&self, new_rate: usize) {
425        self.config
426            .read_promotion_rate
427            .store(new_rate, Ordering::Relaxed);
428    }
429
    /// Split the leaf `cur_page_id` if its split flag is set.
    ///
    /// Returns `Ok(true)` when a split was performed (and a new root was
    /// installed — splits here only happen at the root; non-root leaves are
    /// split elsewhere, hence the `unreachable!` in the `Some(parent)` arm).
    /// Returns `Ok(false)` when no split was needed.
    fn try_split_leaf(
        &self,
        cur_page_id: PageID,
        parent: &Option<ReadGuard>,
    ) -> Result<bool, TreeError> {
        debug_assert!(cur_page_id.is_id());

        // here we need to acquire x-lock for performance reason:
        // if we acquire s-lock, it's very difficult for us to later upgrade to x-lock, because rwlock favors readers:
        //      consider readers keep coming, we will never be able to upgrade to x-lock.
        let mut cur_page = self.mapping_table().get_mut(&cur_page_id);

        check_parent!(self, cur_page_id, parent);

        let should_split = cur_page.get_split_flag();
        if !should_split {
            return Ok(false);
        }
        match parent {
            Some(_) => {
                unreachable!("Leaf node split should not happen here");
            }
            None => {
                // only for the case of root node split

                // In cache-only mode, the root mini-page node is split into two equal-sized mini-pages
                if self.cache_only {
                    // Create a new mini-page of the same size as the current root node
                    // Assuming CB is at least able to hold two leaf-page sized mini-pages
                    let mini_page_guard = self
                        .storage
                        .alloc_mini_page(self.config.leaf_page_size)
                        .expect("Fail to allocate a mini-page during root split");
                    LeafNode::initialize_mini_page(
                        &mini_page_guard,
                        self.config.leaf_page_size,
                        MiniPageNextLevel::new_null(),
                        true,
                    );
                    let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                    let mini_loc = PageLocation::Mini(new_mini_ptr);

                    // Insert the new page into mapping table
                    let (sibling_id, _mini_lock) = self
                        .storage
                        .mapping_table()
                        .insert_mini_page_mapping(mini_loc);

                    // Split current page with the newly created mini page
                    let cur_page_loc = cur_page.get_page_location().clone();
                    match cur_page_loc {
                        PageLocation::Mini(ptr) => {
                            let cur_mini_page = cur_page.load_cache_page_mut(ptr);
                            // NOTE(review): new_mini_ptr is exclusively ours until the
                            // mapping entry lock `_mini_lock` is released — confirm.
                            let sibling_page = unsafe { &mut *new_mini_ptr };
                            let split_key = cur_mini_page.split(sibling_page, true);

                            // Build a new root whose left child is the old root and
                            // right child (>= split_key) is the new sibling.
                            let mut new_root_builder = InnerNodeBuilder::new();
                            new_root_builder
                                .set_left_most_page_id(cur_page_id)
                                .set_children_is_leaf(true)
                                .add_record(split_key, sibling_id);

                            let new_root_ptr = new_root_builder.build();

                            self.root_page_id
                                .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                            info!(sibling = sibling_id.raw(), "New root node installed!");

                            debug_assert!(cur_mini_page.meta.meta_count_with_fence() > 0);
                            debug_assert!(sibling_page.meta.meta_count_with_fence() > 0);

                            return Ok(true);
                        }
                        _ => {
                            panic!("The root node is not a mini-page in cache-only mode")
                        }
                    }
                }

                // Non cache-only root split: move half of the root base page into
                // a freshly allocated base page, then install a new inner root.
                let mut x_page = cur_page;

                let (sibling_id, mut sibling_entry) = self.alloc_base_page_and_lock();

                info!(sibling = sibling_id.raw(), "Splitting root node!");

                let sibling = sibling_entry.load_base_page_mut();

                let leaf_node = x_page.load_base_page_mut();
                let split_key = leaf_node.split(sibling, false);

                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page_id)
                    .set_children_is_leaf(true)
                    .add_record(split_key, sibling_id);

                let new_root_ptr = new_root_builder.build();

                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(sibling = sibling_id.raw(), "New root node installed!");
                Ok(true)
            }
        }
    }
538
539    fn alloc_base_page_and_lock(&self) -> (PageID, LeafEntryXLocked<'_>) {
540        let (pid, base_entry) = self.mapping_table().alloc_base_page_mapping();
541
542        (pid, base_entry)
543    }
544
    /// Split the inner node `cur_page` if its split flag is set.
    ///
    /// Returns `(split_performed, parent_guard)`. When a parent exists, the
    /// split key is pushed into it; when the root itself splits, a new root is
    /// built and installed. Lock upgrades may fail with `TreeError`, in which
    /// case the caller restarts the traversal.
    fn try_split_inner<'a>(
        &self,
        cur_page: PageID,
        parent: Option<ReadGuard<'a>>,
    ) -> Result<(bool, Option<ReadGuard<'a>>), TreeError> {
        let cur_node = ReadGuard::try_read(cur_page.as_inner_node())?;

        check_parent!(self, cur_page, parent);

        let should_split = cur_node.as_ref().meta.get_split_flag();

        if !should_split {
            return Ok((false, parent));
        }

        info!(has_parent = parent.is_some(), "split inner node");

        match parent {
            Some(p) => {
                // Upgrade both current node and parent to exclusive locks;
                // either upgrade failing aborts the split with the error.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;
                let mut x_parent = p.upgrade().map_err(|(_x, e)| e)?;

                let split_key = x_cur.as_ref().get_split_key();

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));

                // Register the sibling in the parent BEFORE moving records into
                // it; if the parent is full, flag it for split and restart.
                let success = x_parent
                    .as_mut()
                    .insert(&split_key, sibling_builder.get_page_id());
                if !success {
                    x_parent.as_mut().meta.set_split_flag();
                    return Err(TreeError::NeedRestart);
                }

                x_cur.as_mut().split(&mut sibling_builder);

                sibling_builder.build();

                Ok((true, Some(x_parent.downgrade())))
            }
            None => {
                // Root split: no parent to insert into, so build a fresh root.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));
                let sibling_id = sibling_builder.get_page_id();

                let split_key = x_cur.as_mut().split(&mut sibling_builder);

                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page)
                    .set_children_is_leaf(false)
                    .add_record(split_key, sibling_id);
                sibling_builder.build();
                let new_root_ptr = new_root_builder.build();
                // Briefly x-lock the new root before publishing it.
                let _x_root = ReadGuard::try_read(new_root_ptr)
                    .unwrap()
                    .upgrade()
                    .unwrap();
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(
                    has_parent = parent.is_some(),
                    cur = cur_page.raw(),
                    "finished split inner node"
                );

                Ok((true, parent))
            }
        }
    }
620
    /// Walk from the root down to the leaf responsible for `key`.
    ///
    /// Returns the leaf's `PageID` plus a read guard on its parent inner node
    /// (`None` when the root itself is the leaf). With `aggressive_split`,
    /// nodes flagged for splitting are split on the way down; any successful
    /// split surfaces as `TreeError::NeedRestart` so the caller retries from
    /// the (possibly new) root.
    pub(crate) fn traverse_to_leaf(
        &self,
        key: &[u8],
        aggressive_split: bool,
    ) -> Result<(PageID, Option<ReadGuard<'_>>), TreeError> {
        let (mut cur_page, mut cur_is_leaf) = self.get_root_page();
        let mut parent: Option<ReadGuard> = None;

        loop {
            if aggressive_split {
                if cur_is_leaf
                    && !cur_page.is_inner_node_pointer()
                    && self.try_split_leaf(cur_page, &parent)?
                {
                    // The tree shape changed underneath us; restart.
                    return Err(TreeError::NeedRestart);
                } else if !cur_is_leaf {
                    let (split_success, new_parent) = self.try_split_inner(cur_page, parent)?;
                    if split_success {
                        return Err(TreeError::NeedRestart);
                    } else {
                        // No split happened; keep the (possibly refreshed) guard.
                        parent = new_parent;
                    }
                }
            }

            if cur_is_leaf {
                return Ok((cur_page, parent));
            } else {
                let next = ReadGuard::try_read(cur_page.as_inner_node())?;

                check_parent!(self, cur_page, parent);

                // Descend into the child whose key range covers `key`;
                // the guard on this node becomes the new parent guard.
                let next_node = next.as_ref();
                let next_is_leaf = next_node.meta.children_is_leaf();
                let pos = next_node.lower_bound(key);
                let kv_meta = next_node.get_kv_meta(pos as u16);
                cur_page = next_node.get_value(kv_meta);
                cur_is_leaf = next_is_leaf;
                parent = Some(next);
            }
        }
    }
663
    /// Apply a single write operation (insert/delete) to the leaf that owns
    /// `write_op.key`, creating a mini-page on demand (cache-only mode) and
    /// appending to the WAL when one is configured.
    fn write_inner(&self, write_op: WriteOp, aggressive_split: bool) -> Result<(), TreeError> {
        let (pid, parent) = self.traverse_to_leaf(write_op.key, aggressive_split)?;

        let mut leaf_entry = self.mapping_table().get_mut(&pid);

        check_parent!(self, pid, parent);

        let page_loc = leaf_entry.get_page_location();
        match page_loc {
            PageLocation::Null => {
                // Null entries only exist in cache-only mode (e.g. after eviction).
                if !self.cache_only {
                    panic!("Found an Null page in non cache-only mode");
                }

                // Deleting from a page that holds nothing is a no-op.
                if write_op.op_type == OpType::Delete {
                    return Ok(());
                }

                // Create a new mini-page to replace the null page
                let mini_page_size = LeafNode::get_chain_size_hint(
                    write_op.key.len(),
                    write_op.value.len(),
                    &self.mini_page_size_classes,
                    self.cache_only,
                );
                let mini_page_guard = self.storage.alloc_mini_page(mini_page_size)?;
                LeafNode::initialize_mini_page(
                    &mini_page_guard,
                    mini_page_size,
                    MiniPageNextLevel::new_null(),
                    true,
                );
                let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                let mini_loc = PageLocation::Mini(new_mini_ptr);

                leaf_entry.create_cache_page_loc(mini_loc);

                // The mini-page was sized for this record, so the insert must fit.
                let mini_page_ref = leaf_entry.load_cache_page_mut(new_mini_ptr);
                let insert_success =
                    mini_page_ref.insert(write_op.key, write_op.value, write_op.op_type, 0);
                assert!(insert_success);

                debug_assert!(mini_page_ref.meta.meta_count_with_fence() > 0);
                counter!(InsertCreatedMiniPage);
            }
            _ => {
                leaf_entry.insert(
                    write_op.key,
                    write_op.value,
                    parent,
                    write_op.op_type,
                    &self.storage,
                    &self.write_load_full_page,
                    &self.cache_only,
                    &self.mini_page_size_classes,
                )?;

                // Keep hot pages away from the eviction frontier.
                if leaf_entry.cache_page_about_to_evict(&self.storage) {
                    // we don't care about the result here
                    _ = leaf_entry.move_cache_page_to_tail(&self.storage);
                }

                // Log after the in-memory update; record the returned LSN on
                // the entry for recovery bookkeeping.
                if let Some(wal) = &self.wal {
                    let lsn = wal.append_and_wait(&write_op, leaf_entry.get_disk_offset());
                    leaf_entry.update_lsn(lsn);
                }
            }
        }

        Ok(())
    }
735
736    /// Make sure you're not holding any lock while calling this function.
737    pub(crate) fn evict_from_circular_buffer(&self) -> Result<usize, TreeError> {
738        // Why we need to evict multiple times?
739        // because we don't want each alloc to trigger evict, i.e., we want alloc to fail less often.
740        // with default 1024 bytes, one eviction allows us to alloc 1024 bytes (4 256-byte mini pages) without failure.
741        const TARGET_EVICT_SIZE: usize = 1024;
742        let mut evicted = 0;
743
744        // A corner case: we may not have enough memory to evict (i.e., the buffer might be empty now)
745        let mut retry_cnt = 0;
746
747        while evicted < TARGET_EVICT_SIZE && retry_cnt < 10 {
748            let n = self
749                .storage
750                .evict_from_buffer(|mini_page_handle: &TombstoneHandle| {
751                    eviction_callback(mini_page_handle, self)
752                })?;
753            evicted += n as usize;
754            retry_cnt += 1;
755        }
756        info!("stopped evict from circular buffer");
757        Ok(evicted)
758    }
759
760    /// Insert a key-value pair to the system, overrides existing value if present.
761    ///
762    /// ```
763    /// use bf_tree::BfTree;
764    /// use bf_tree::LeafReadResult;
765    ///
766    /// let mut config = bf_tree::Config::default();
767    /// config.cb_min_record_size(4);
768    /// let tree = BfTree::with_config(config, None).unwrap();
769    /// tree.insert(b"key", b"value");
770    /// let mut buffer = [0u8; 1024];
771    /// let read_size = tree.read(b"key", &mut buffer);
772    ///
773    /// assert_eq!(read_size, LeafReadResult::Found(5));
774    /// assert_eq!(&buffer[..5], b"value");
775    /// ```
776    pub fn insert(&self, key: &[u8], value: &[u8]) -> LeafInsertResult {
777        // The input key cannot exceed the configured max key length
778        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
779            return LeafInsertResult::InvalidKV(format!("Key too large {}", key.len()));
780        }
781
782        // The input key has to be one byte at least
783        if key.is_empty() {
784            return LeafInsertResult::InvalidKV(format!(
785                "Key too small {}, at least one byte",
786                key.len()
787            ));
788        }
789
790        // The input key value pair cannot exceed the configured max record size
791        if value.len() > MAX_VALUE_LEN || key.len() + value.len() > self.config.cb_max_record_size {
792            return LeafInsertResult::InvalidKV(format!(
793                "Record too large {}, {}, please adjust cb_max_record_size in config",
794                key.len(),
795                value.len()
796            ));
797        }
798
799        // The input key value pair cannot be smaller than the configured min record size
800        if key.len() + value.len() < self.config.cb_min_record_size {
801            return LeafInsertResult::InvalidKV(format!(
802                "Record too small {}, {}, please adjust cb_min_record_size in config",
803                key.len(),
804                value.len()
805            ));
806        }
807
808        let backoff = Backoff::new();
809        let mut aggressive_split = false;
810
811        counter!(Insert);
812        info!(key_len = key.len(), value_len = value.len(), "insert");
813
814        loop {
815            let result = self.write_inner(WriteOp::make_insert(key, value), aggressive_split);
816            match result {
817                Ok(_) => return LeafInsertResult::Success,
818                Err(TreeError::NeedRestart) => {
819                    #[cfg(all(feature = "shuttle", test))]
820                    {
821                        shuttle::thread::yield_now();
822                    }
823                    counter!(InsertNeedRestart);
824                    aggressive_split = true;
825                }
826                Err(TreeError::CircularBufferFull) => {
827                    info!("insert failed, started evict from circular buffer");
828                    aggressive_split = true;
829                    counter!(InsertCircularBufferFull);
830                    _ = self.evict_from_circular_buffer();
831                    continue;
832                }
833                Err(TreeError::Locked) => {
834                    counter!(InsertLocked);
835                    backoff.snooze();
836                }
837            }
838        }
839    }
840
841    /// Read a record from the tree.
842    /// Returns the number of bytes read.
843    ///
844    /// TODO: don't panic if the out_buffer is too small, instead returns a error.
845    ///
846    /// ```
847    /// use bf_tree::BfTree;
848    /// use bf_tree::LeafReadResult;
849    ///
850    /// let mut config = bf_tree::Config::default();
851    /// config.cb_min_record_size(4);
852    ///
853    /// let tree = BfTree::with_config(config, None).unwrap();
854    /// tree.insert(b"key", b"value");
855    /// let mut buffer = [0u8; 1024];
856    /// let read_size = tree.read(b"key", &mut buffer);
857    /// assert_eq!(read_size, LeafReadResult::Found(5));
858    /// assert_eq!(&buffer[..5], b"value");
859    /// ```
860    pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
861        // The input key cannot exceed the configured max key length
862        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
863            return LeafReadResult::InvalidKey;
864        }
865
866        // The input key has to be one byte at least
867        if key.is_empty() {
868            return LeafReadResult::InvalidKey;
869        }
870
871        let backoff = Backoff::new();
872
873        info!(key_len = key.len(), "read");
874        counter!(Read);
875        let mut aggressive_split = false;
876
877        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
878        let mut debug_timer = DebugTimerGuard::new(Timer::Read, self.metrics_recorder.clone());
879
880        loop {
881            let result = self.read_inner(key, out_buffer, aggressive_split);
882            match result {
883                Ok(v) => {
884                    #[cfg(any(
885                        feature = "metrics-rt-debug-all",
886                        feature = "metrics-rt-debug-timer"
887                    ))]
888                    debug_timer.end();
889
890                    return v;
891                }
892                Err(TreeError::CircularBufferFull) => {
893                    info!("read promotion failed, started evict from circular buffer");
894                    aggressive_split = true;
895                    match self.evict_from_circular_buffer() {
896                        Ok(_) => continue,
897                        Err(_) => continue,
898                    };
899                }
900                Err(_) => {
901                    backoff.spin();
902                    aggressive_split = true;
903                }
904            }
905        }
906    }
907
908    /// Delete a record from the tree.
909    ///
910    /// ```
911    /// use bf_tree::BfTree;
912    /// use bf_tree::LeafReadResult;
913    ///
914    /// let tree = BfTree::default();
915    /// tree.insert(b"key", b"value");
916    /// tree.delete(b"key");
917    /// let mut buffer = [0u8; 1024];
918    /// let rt = tree.read(b"key", &mut buffer);
919    /// assert_eq!(rt, LeafReadResult::Deleted);
920    /// ```
921    pub fn delete(&self, key: &[u8]) {
922        // The input key cannot exceed the configured max key length
923        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
924            return;
925        }
926
927        // The input key has to be one byte at least
928        if key.is_empty() {
929            return;
930        }
931
932        let backoff = Backoff::new();
933
934        info!(key_len = key.len(), "delete");
935
936        let mut aggressive_split = false;
937
938        loop {
939            let result = self.write_inner(WriteOp::make_delete(key), aggressive_split);
940            match result {
941                Ok(_) => return,
942                Err(TreeError::CircularBufferFull) => {
943                    info!("delete failed, started evict from circular buffer");
944                    aggressive_split = true;
945                    match self.evict_from_circular_buffer() {
946                        Ok(_) => continue,
947                        Err(_) => continue,
948                    };
949                }
950                Err(_) => {
951                    aggressive_split = true;
952                    backoff.spin();
953                }
954            }
955        }
956    }
957
958    /// Scan records in the tree, with starting key and desired scan count.
959    /// Returns a iterator that yields key-value pairs.
960    pub fn scan_with_count<'a>(
961        &'a self,
962        key: &[u8],
963        cnt: usize,
964        return_field: ScanReturnField,
965    ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
966        // In cache-only mode, scan is not supported
967        if self.cache_only {
968            return Err(ScanIterError::CacheOnlyMode);
969        }
970
971        // The start key cannot exceed the configured max key length
972        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
973            return Err(ScanIterError::InvalidStartKey);
974        }
975
976        // The input key has to be one byte at least
977        if key.is_empty() {
978            return Err(ScanIterError::InvalidStartKey);
979        }
980
981        // The count cannot be zero
982        if cnt == 0 {
983            return Err(ScanIterError::InvalidCount);
984        }
985
986        Ok(ScanIter::new_with_scan_count(self, key, cnt, return_field))
987    }
988
989    pub fn scan_with_end_key<'a>(
990        &'a self,
991        start_key: &[u8],
992        end_key: &[u8],
993        return_field: ScanReturnField,
994    ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
995        // In cache-only mode, scan is not supported
996        if self.cache_only {
997            return Err(ScanIterError::CacheOnlyMode);
998        }
999
1000        // The start key cannot exceed the configured max key length
1001        if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1002            return Err(ScanIterError::InvalidStartKey);
1003        }
1004
1005        // The input key has to be one byte at least
1006        if start_key.is_empty() {
1007            return Err(ScanIterError::InvalidStartKey);
1008        }
1009
1010        // The end key cannot exceed the configured max key length
1011        if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1012            return Err(ScanIterError::InvalidEndKey);
1013        }
1014
1015        // The input key has to be one byte at least
1016        if end_key.is_empty() {
1017            return Err(ScanIterError::InvalidEndKey);
1018        }
1019
1020        // The start key cannot be greater than the end key
1021        let cmp = start_key.cmp(end_key);
1022        if cmp == std::cmp::Ordering::Greater {
1023            return Err(ScanIterError::InvalidKeyRange);
1024        }
1025
1026        Ok(ScanIter::new_with_end_key(
1027            self,
1028            start_key,
1029            end_key,
1030            return_field,
1031        ))
1032    }
1033
1034    #[doc(hidden)]
1035    pub fn scan_mut_with_count<'a>(
1036        &'a self,
1037        key: &'a [u8],
1038        cnt: usize,
1039        return_field: ScanReturnField,
1040    ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1041        // In cache-only mode, scan is not supported
1042        if self.cache_only {
1043            return Err(ScanIterError::CacheOnlyMode);
1044        }
1045
1046        // The start key cannot exceed the configured max key length
1047        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1048            return Err(ScanIterError::InvalidStartKey);
1049        }
1050
1051        // The count cannot be zero
1052        if cnt == 0 {
1053            return Err(ScanIterError::InvalidCount);
1054        }
1055
1056        Ok(ScanIterMut::new_with_scan_count(
1057            self,
1058            key,
1059            cnt,
1060            return_field,
1061        ))
1062    }
1063
1064    #[doc(hidden)]
1065    pub fn scan_mut_with_end_key<'a>(
1066        &'a self,
1067        start_key: &'a [u8],
1068        end_key: &'a [u8],
1069        return_field: ScanReturnField,
1070    ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1071        // In cache-only mode, scan is not supported
1072        if self.cache_only {
1073            return Err(ScanIterError::CacheOnlyMode);
1074        }
1075
1076        // The start key cannot exceed the configured max key length
1077        if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1078            return Err(ScanIterError::InvalidStartKey);
1079        }
1080
1081        // The end key cannot exceed the configured max key length
1082        if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1083            return Err(ScanIterError::InvalidEndKey);
1084        }
1085
1086        Ok(ScanIterMut::new_with_end_key(
1087            self,
1088            start_key,
1089            end_key,
1090            return_field,
1091        ))
1092    }
1093
    /// A single attempt at reading `key` into `out_buffer`.
    ///
    /// Besides the read itself, this performs opportunistic cache
    /// maintenance: pages about to be evicted are moved to the buffer tail,
    /// and hits served from a base page may be promoted into the cache
    /// (either as a cached record or as a full page, depending on
    /// `config.read_record_cache`).
    ///
    /// Errors are transient; the caller (`BfTree::read`) retries.
    fn read_inner(
        &self,
        key: &[u8],
        out_buffer: &mut [u8],
        aggressive_split: bool,
    ) -> Result<LeafReadResult, TreeError> {
        let (node, parent) = self.traverse_to_leaf(key, aggressive_split)?;

        let mut leaf = self.mapping_table().get(&node);

        check_parent!(self, node, parent);

        let out = leaf.read(
            key,
            out_buffer,
            self.config.mini_page_binary_search,
            self.cache_only,
        );
        match out {
            // Served from a cached page (mini or full).
            ReadResult::Mini(r) | ReadResult::Full(r) => {
                if leaf.cache_page_about_to_evict(&self.storage) {
                    // The upgrade to an exclusive lock is best-effort: on
                    // contention we just return the read result as-is.
                    let mut x_leaf = match leaf.try_upgrade() {
                        Ok(v) => v,
                        Err(_) => return Ok(r),
                    };
                    // we don't care about the result here, because we are in read path, we don't want to block.
                    _ = x_leaf.move_cache_page_to_tail(&self.storage);
                }

                Ok(r)
            }

            // Served from the base page (disk-resident layer).
            ReadResult::Base(r) => {
                counter!(BasePageRead);

                // In cache-only mode, no base page should exist
                if self.cache_only {
                    panic!("Attempt to read a base page while in cache-only mode.");
                }

                // Only `Found` results are candidates for cache promotion.
                let v = match r {
                    LeafReadResult::Found(v) => v,
                    _ => return Ok(r),
                };

                // Promotion needs the parent guard and is gated by
                // `should_promote_read` so that not every base-page hit
                // triggers a promotion.
                if parent.is_none() || !self.should_promote_read() {
                    return Ok(r);
                }

                // Promotion requires exclusive access; abort on contention.
                let mut x_leaf = match leaf.try_upgrade() {
                    Ok(x) => x,
                    Err(_) => {
                        return Ok(r);
                    }
                };

                if self.config.read_record_cache {
                    // we do record cache.
                    // we roll dice to see if we should insert this value to mini page.

                    let out = x_leaf.insert(
                        key,
                        &out_buffer[0..v as usize],
                        parent,
                        OpType::Cache,
                        &self.storage,
                        &self.write_load_full_page,
                        &self.cache_only,
                        &self.mini_page_size_classes,
                    );

                    match out {
                        Ok(_) => {
                            counter!(ReadPromotionOk);
                            Ok(r)
                        }
                        Err(TreeError::Locked) => {
                            // We are doing this very optimistically, if contention happens, we just abort and return.
                            counter!(ReadPromotionFailed);
                            Ok(r)
                        }
                        Err(TreeError::CircularBufferFull) => {
                            // Propagate so the caller can evict and retry.
                            counter!(ReadPromotionFailed);
                            Err(TreeError::CircularBufferFull)
                        }
                        Err(TreeError::NeedRestart) => {
                            // If we need restart here, potentially because parent is full.
                            counter!(ReadPromotionFailed);
                            Err(TreeError::NeedRestart)
                        }
                    }
                } else {
                    // No record cache: promote the whole base page into a
                    // full cached page. Lock contention is non-fatal.
                    match self.upgrade_to_full_page(x_leaf, parent.unwrap()) {
                        Ok(_) | Err(TreeError::Locked) => Ok(r),
                        Err(e) => Err(e),
                    }
                }
            }
            ReadResult::None => Ok(LeafReadResult::NotFound),
        }
    }
1195
    /// Promote the exclusively-locked leaf `x_leaf` to a full cached page.
    ///
    /// - `Mini`: the mini page is merged into its base page first, then the
    ///   resulting base page is loaded as a full page.
    /// - `Full`: already a full page, nothing to do.
    /// - `Base`: the base page is loaded as a full page directly.
    /// - `Null`: must not happen; panics.
    ///
    /// Returns the same exclusive lock so the caller keeps ownership.
    fn upgrade_to_full_page<'a>(
        &'a self,
        mut x_leaf: LeafEntryXLocked<'a>,
        parent: ReadGuard<'a>,
    ) -> Result<LeafEntryXLocked<'a>, TreeError> {
        let page_loc = x_leaf.get_page_location().clone();
        match page_loc {
            PageLocation::Mini(ptr) => {
                let mini_page = x_leaf.load_cache_page_mut(ptr);
                // Merge the mini page back into the base page before
                // releasing its buffer space; the dealloc is two-phase
                // (begin/finish) around the merge.
                let h = self.storage.begin_dealloc_mini_page(mini_page)?;
                let _merge_result = x_leaf.try_merge_mini_page(&h, parent, &self.storage)?;
                let base_offset = mini_page.next_level;
                x_leaf.change_to_base_loc();
                self.storage.finish_dealloc_mini_page(h);

                // Now load the merged base page and register it as a full
                // cached page in the mapping entry.
                let base_page_ref = x_leaf.load_base_page_from_buffer();
                let full_page_loc =
                    upgrade_to_full_page(&self.storage, base_page_ref, base_offset)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            // Already a full page: no work needed.
            PageLocation::Full(_ptr) => Ok(x_leaf),
            PageLocation::Base(offset) => {
                let base_page_ref = x_leaf.load_base_page(offset);
                let next_level = MiniPageNextLevel::new(offset);
                let full_page_loc = upgrade_to_full_page(&self.storage, base_page_ref, next_level)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Null => panic!("upgrade_to_full_page on Null page"),
        }
    }
1228
    /// Collect all metrics and reset the metric recorder
    /// The caller needs to ensure there are no references to the bf-tree's metrics recorder anymore.
    ///
    /// Returns `None` when the metrics features are disabled or no recorder
    /// is currently installed.
    pub fn get_metrics(&mut self) -> Option<serde_json::Value> {
        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
        {
            // Take ownership of the recorder; `Arc::try_unwrap` fails (and
            // panics via `expect`) if any other reference is still alive.
            let recorder = self.metrics_recorder.take();
            match recorder {
                Some(r) => {
                    let recorders = Arc::try_unwrap(r).expect("Failed to obtain the recorders of bf-tree, please make sure no other references exist.");
                    let mut timer_accumulated = TimerRecorder::default();

                    // Only collect timer metrics for now
                    for r in recorders {
                        // SAFETY assumption: after try_unwrap succeeded we
                        // hold the only reference to each per-thread cell, so
                        // this shared access cannot race — TODO confirm.
                        let t = unsafe { &*r.get() };

                        timer_accumulated += t.timers.clone();
                    }

                    let output = serde_json::json!({
                        "Timers": timer_accumulated,
                    });

                    // Install a fresh recorder so subsequent operations can
                    // keep recording metrics.
                    self.metrics_recorder = Some(Arc::new(ThreadLocal::new()));

                    Some(output)
                }
                None => None,
            }
        }
        #[cfg(not(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer")))]
        {
            None
        }
    }
1263}
1264
1265pub(crate) fn key_value_physical_size(key: &[u8], value: &[u8]) -> usize {
1266    let key_size = key.len();
1267    let value_size = value.len();
1268    let meta_size = crate::nodes::KV_META_SIZE;
1269    key_size + value_size + meta_size
1270}
1271
/// Callback invoked by the circular buffer when it evicts a cached page.
///
/// Re-traverses the tree to the leaf that owns the page, re-validates that
/// the mapping-table entry still points at this exact page, and then merges
/// the page back (mini page into base page, or full page merge). In
/// cache-only mode the entry is simply nulled out.
///
/// Returns `Err(TreeError::NeedRestart)` whenever validation fails so the
/// eviction can be retried.
pub(crate) fn eviction_callback(
    mini_page_handle: &TombstoneHandle,
    tree: &BfTree,
) -> Result<(), TreeError> {
    // NOTE(review): assumes `mini_page_handle.ptr` points at a live LeafNode
    // owned by the circular buffer for the duration of this callback —
    // TODO confirm against the TombstoneHandle contract.
    let mini_page = mini_page_handle.ptr as *mut LeafNode;
    let key_to_this_page = if tree.cache_only {
        unsafe { &*mini_page }.try_get_key_to_reach_this_node()?
    } else {
        unsafe { &*mini_page }.get_key_to_reach_this_node()
    };

    // Here we need to set aggressive split to true, because we would split parent node due to leaf split.
    let (pid, parent) = tree.traverse_to_leaf(&key_to_this_page, true)?;
    info!(
        pid = pid.raw(),
        "starting to merge mini page in eviction call back"
    );

    let mut leaf_entry = tree.mapping_table().get_mut(&pid);

    histogram!(EvictNodeSize, unsafe { &*mini_page }.meta.node_size as u64);

    match leaf_entry.get_page_location() {
        PageLocation::Mini(ptr) => {
            {
                // In order to lock this node, we need to traverse to this node first;
                // but in order to traverse this node, we need to read the keys in this node;
                // in order to read the keys in this node, we need to lock this node.
                //
                // Because we didn't lock the node while reading `key_to_this_page`,
                // we need to recheck if the node is still the same node.
                if *ptr != mini_page {
                    return Err(TreeError::NeedRestart);
                }
            }

            let parent = parent.expect("Mini page must have a parent");
            parent.check_version()?;

            // In the case of cache_only, the corresponding mapping table entry of the mini-page
            // is replaced by a non-existent base page
            if tree.cache_only {
                leaf_entry.change_to_null_loc();
            } else {
                leaf_entry.try_merge_mini_page(mini_page_handle, parent, &tree.storage)?;
                leaf_entry.change_to_base_loc();
                // we don't need to dealloc the old_mini_page here because we are in eviction callback.
            }

            Ok(())
        }

        PageLocation::Full(ptr) => {
            // Same re-validation as the mini-page case: the entry must still
            // point at the page being evicted.
            if *ptr != mini_page {
                return Err(TreeError::NeedRestart);
            }

            leaf_entry.merge_full_page(mini_page_handle);
            Ok(())
        }

        // This means the key read from the mini page is corrupted and points to a different page
        PageLocation::Base(_offset) => Err(TreeError::NeedRestart),

        // This means the key read from the mini page is corrupted and points to a different page
        PageLocation::Null => Err(TreeError::NeedRestart),
    }
}
1340
#[cfg(test)]
mod tests {
    use crate::error::ConfigError;
    use crate::BfTree;

    #[test]
    fn test_mini_page_size_classes() {
        let mut size_classes = BfTree::create_mem_page_size_classes(48, 1952, 4096, 64, false);
        assert_eq!(
            size_classes,
            vec![128, 192, 256, 512, 960, 1856, 2048, 4096]
        );

        size_classes = BfTree::create_mem_page_size_classes(1548, 1548, 3136, 64, true);
        assert_eq!(size_classes, vec![1536, 3136]);

        size_classes = BfTree::create_mem_page_size_classes(48, 3072, 12288, 64, false);
        assert_eq!(
            size_classes,
            vec![128, 192, 256, 512, 960, 1856, 3648, 7232, 9088, 12288]
        );

        size_classes = BfTree::create_mem_page_size_classes(4, 1952, 4096, 32, false);
        assert_eq!(size_classes, vec![64, 128, 256, 448, 832, 1600, 2048, 4096]);
    }

    /// Assert that `BfTree::with_config(config, None)` fails with an error
    /// accepted by `check`; `expected` names the expected error variant in
    /// the failure message.
    fn expect_config_error(
        config: crate::Config,
        check: fn(&ConfigError) -> bool,
        expected: &str,
    ) {
        match BfTree::with_config(config, None) {
            Ok(_) => panic!("Expected error but got Ok"),
            Err(e) => assert!(check(&e), "Expected {} error", expected),
        }
    }

    #[test]
    fn test_invalid_config_to_build_bf_tree() {
        // Min record too small
        let mut config = crate::Config::default();
        config.cb_min_record_size(4);
        config.leaf_page_size(32 * 1024);
        expect_config_error(
            config,
            |e| matches!(e, ConfigError::MinimumRecordSize(_)),
            "InvalidMinimumRecordSize",
        );

        // Max record too large
        let mut config = crate::Config::default();
        config.cb_max_record_size(64 * 1024);
        expect_config_error(
            config,
            |e| matches!(e, ConfigError::MaximumRecordSize(_)),
            "InvalidMaximumRecordSize",
        );

        // Leaf page size not aligned
        let mut config = crate::Config::default();
        config.leaf_page_size(4050);
        expect_config_error(
            config,
            |e| matches!(e, ConfigError::LeafPageSize(_)),
            "InvalidLeafPageSize",
        );

        // Circular buffer size too small
        let mut config = crate::Config::default();
        config.leaf_page_size(16 * 1024);
        config.cb_size_byte(16 * 1024);
        expect_config_error(
            config,
            |e| matches!(e, ConfigError::CircularBufferSize(_)),
            "InvalidCircularBufferSize",
        );

        // Circular buffer size not power of two
        let mut config = crate::Config::default();
        config.cb_size_byte(20 * 1024);
        expect_config_error(
            config,
            |e| matches!(e, ConfigError::CircularBufferSize(_)),
            "InvalidCircularBufferSize",
        );

        // Cache-only mode specific
        let mut config = crate::Config::default();
        config.cache_only(true);
        config.cb_size_byte(2 * 4096);
        expect_config_error(
            config,
            |e| matches!(e, ConfigError::CircularBufferSize(_)),
            "InvalidCircularBufferSize",
        );
    }
}