Skip to main content

bf_tree/
tree.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4#[cfg(not(all(feature = "shuttle", test)))]
5use rand::Rng;
6
7#[cfg(all(feature = "shuttle", test))]
8use shuttle::rand::Rng;
9
10use cfg_if::cfg_if;
11
12cfg_if! {
13    if #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))] {
14        use crate::metric::{Timer, TlsRecorder, timer::{TimerRecorder, DebugTimerGuard}};
15        use std::cell::UnsafeCell;
16        use thread_local::ThreadLocal;
17    }
18}
19
20use crate::{
21    check_parent,
22    circular_buffer::{CircularBufferMetrics, TombstoneHandle},
23    counter,
24    error::{ConfigError, TreeError},
25    histogram, info,
26    mini_page_op::{upgrade_to_full_page, LeafEntryXLocked, LeafOperations, ReadResult},
27    nodes::{
28        leaf_node::{LeafKVMeta, LeafReadResult, MiniPageNextLevel, OpType},
29        InnerNode, InnerNodeBuilder, LeafNode, PageID, CACHE_LINE_SIZE, DISK_PAGE_SIZE,
30        INNER_NODE_SIZE, MAX_KEY_LEN, MAX_LEAF_PAGE_SIZE, MAX_VALUE_LEN,
31    },
32    range_scan::{ScanIter, ScanIterMut, ScanReturnField},
33    snapshot::CPRSnapShotMgr,
34    storage::{LeafStorage, PageLocation, PageTable},
35    sync::{
36        atomic::{AtomicU64, Ordering},
37        Arc,
38    },
39    utils::{get_rng, inner_lock::ReadGuard, Backoff, BfsVisitor, NodeInfo},
40    wal::{WriteAheadLog, WriteOp},
41    Config, StorageBackend,
42};
43use std::path::Path;
44
/// The bf-tree instance
pub struct BfTree {
    /// Tagged root identifier (see `get_root_page`): while the root is a leaf this is a
    /// mapping-table page id with `ROOT_IS_LEAF_MASK` set; otherwise it is the raw bits of an
    /// `InnerNode` pointer with that mask bit clear.
    pub(crate) root_page_id: AtomicU64,
    /// Leaf storage: exposes the mapping table, mini-page allocation, buffer metrics and the vfs.
    pub(crate) storage: LeafStorage,
    /// Write-ahead log; `Some` only when the config provides a WAL configuration.
    pub(crate) wal: Option<Arc<WriteAheadLog>>,
    /// Shared configuration (validated in `with_config`).
    pub(crate) config: Arc<Config>,
    /// Copied from `Config::write_load_full_page` at construction time.
    pub(crate) write_load_full_page: bool,
    /// If true, there is no permanent storage layer, thus no durability guarantee for any upsert in the tree.
    pub(crate) cache_only: bool,
    /// Size classes of mini pages, ascending (built by `create_mem_page_size_classes`).
    pub(crate) mini_page_size_classes: Vec<usize>,
    /// Manager of a CPR snapshot; `Some` only when `Config::use_snapshot` is set.
    pub(crate) snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
    /// Per-tree metrics recorder under the "metrics-rt-debug" features.
    #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
    pub metrics_recorder: Option<Arc<ThreadLocal<UnsafeCell<TlsRecorder>>>>,
}
58
// SAFETY: NOTE(review): these impls are asserted manually because `BfTree` carries raw pointers
// (the root id can hold an `InnerNode` pointer, mini pages are addressed by `*mut LeafNode`).
// Soundness presumably relies on the tree's internal latching (mapping-table entry locks and
// inner-node `ReadGuard`s) — confirm before adding fields that are not thread-safe.
unsafe impl Sync for BfTree {}

unsafe impl Send for BfTree {}
62
/// Outcome of inserting a key/value pair into a leaf.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeafInsertResult {
    /// The record was accepted.
    Success,
    /// The key/value pair was rejected; the payload presumably carries a human-readable reason.
    InvalidKV(String),
}
68
/// Reasons a range-scan iterator cannot be created.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ScanIterError {
    /// The tree runs in cache-only mode (presumably scans are unsupported there — confirm).
    CacheOnlyMode,
    /// The start key was rejected.
    InvalidStartKey,
    /// The end key was rejected.
    InvalidEndKey,
    /// The requested record count was rejected.
    InvalidCount,
    /// The start/end keys do not form a valid range.
    InvalidKeyRange,
}
77
impl Drop for BfTree {
    /// Tears the tree down.
    ///
    /// First stops the WAL background job (when a WAL is configured), then visits every node
    /// yielded by `BfsVisitor::new_all_nodes`:
    /// - leaf pages are released via their mapping-table entry (`dealloc_self`);
    /// - inner nodes return their disk offset to the vfs when the storage backend is
    ///   `StorageBackend::Memory`, and are then freed with `InnerNode::free_node`.
    fn drop(&mut self) {
        // NOTE(review): presumably stopped first so the background job no longer runs while
        // pages are being released — confirm against `WriteAheadLog`.
        if let Some(ref wal) = self.wal {
            wal.stop_background_job();
        }

        // NOTE(review): inner nodes are freed while this visitor is still iterating; this is only
        // sound if the visitor has already read a node's children before yielding it — verify
        // against `BfsVisitor`.
        let visitor = BfsVisitor::new_all_nodes(self);
        for node_info in visitor {
            match node_info {
                NodeInfo::Leaf { page_id, .. } => {
                    // Exclusive access to the entry; `cache_only` is forwarded to `dealloc_self`.
                    let mut leaf = self.mapping_table().get_mut(&page_id);
                    leaf.dealloc_self(&self.storage, self.cache_only);
                }
                NodeInfo::Inner { ptr, .. } => {
                    // SAFETY: NOTE(review): assumes `ptr` points to a live inner node that no
                    // other thread can reach (we hold `&mut self`).
                    if unsafe { &*ptr }.is_valid_disk_offset() {
                        let disk_offset = unsafe { &*ptr }.disk_offset;
                        if self.config.storage_backend == StorageBackend::Memory {
                            // special case for memory backend, we need to deallocate the memory.
                            self.storage.vfs.dealloc_offset(disk_offset as usize);
                        }
                    }
                    InnerNode::free_node(ptr as *mut InnerNode);
                }
            }
        }
    }
}
105
106impl Default for BfTree {
107    fn default() -> Self {
108        Self::new(":memory:", 1024 * 1024 * 32).unwrap()
109    }
110}
111
112impl BfTree {
113    pub(crate) const ROOT_IS_LEAF_MASK: u64 = 0x8000_0000_0000_0000; // This is quite error-prone, make sure the mask is not conflicting with the page id definition.
114
115    /// Create the size classes of all memory pages in acending order based on the record size (key + value) and the leaf page size
116    /// [s_0, s_1, ..., s_x], ascending order.
117    /// Each s_i is of size 2^i * c + size_of(LeafNode)
118    /// where c = (min_record_size + LeafKVMeta) aligned on CACHE_LINE_SIZE and 2^i * c <= leaf_page_size and s_x = leaf_page_size
119    ///
120    /// In non cache-only mode, the largest mini page size is s_(x-1) and full page/leaf base page size is s_x.
121    /// Currently, the design assumes a mini-page can always be successfully merged into a leaf page in one pass (including at most one base page split).
122    /// As such, the following three conditions are sufficient in preventing merge failures.
123    /// C1) x >= 1
124    /// C2) s_x > s_{x-1}
125    /// C3) max_record_size + SizeOf(KVMeta) <= (s_x - s_{x-1} - 2 * (fence_meta + max_key_len)).
126    ///
127    /// In cache-only mode, the largest mini page is s_x as there is no full nor base page.
128    /// Although there is no merging of mini-pages, the design assumes a new record can always be successfully inserted into a mini-page in one pass
129    /// (including at most one mini-page split). As such, the following sufficient condition is required.
130    /// C1) max_record_page + Sizeof(KVMeta) <= (s_x - SizeOf(NodeMeta)) / 2
131    /// C2) if x >= 1, s_x >= s_{x-1} + max_record_size + SizeOf(KVMeta)
132    ///
133    /// C2) is necessary for cache-only mode to guarantee that a mini-page can grow to full page size before being split up to two full-sized pages
134    pub(crate) fn create_mem_page_size_classes(
135        min_record_size_in_byte: usize,
136        max_record_size_in_byte: usize,
137        leaf_page_size_in_byte: usize,
138        max_fence_len_in_byte: usize,
139        cache_only: bool,
140    ) -> Vec<usize> {
141        // Sanity check of the input parameters
142        assert!(
143            min_record_size_in_byte > 1,
144            "cb_min_record_size in config cannot be less than 2"
145        );
146        assert!(
147            min_record_size_in_byte <= max_record_size_in_byte,
148            "cb_min_record_size cannot be larger than cb_max_record_size"
149        );
150        assert!(
151            max_fence_len_in_byte > 0,
152            "max_fence_len in config cannot be zero"
153        );
154        assert!(
155            max_fence_len_in_byte / 2 < max_record_size_in_byte,
156            "max_fence_len/2 cannot be larger than cb_max_record_size"
157        );
158        assert!(
159            leaf_page_size_in_byte <= MAX_LEAF_PAGE_SIZE,
160            "leaf_page_size in config cannot be larger than {}",
161            MAX_LEAF_PAGE_SIZE
162        );
163        assert!(
164            max_fence_len_in_byte / 2 <= MAX_KEY_LEN,
165            "max_key_len in config cannot be larger than {}",
166            MAX_KEY_LEN
167        );
168        assert!(
169            leaf_page_size_in_byte / min_record_size_in_byte <= 4096,
170            "Maximum number of records per page (leaf_page_size/min_record_size) cannot exceed 2^12.", // This is restricted by #bits for value count in NodeMeta
171        );
172
173        // In non cache-only mode, the leaf page should be in the multiple of DISK_PAGE_SIZE.
174        // In cache-only mode, the leaf page should be aligned on cache line size.
175        if !cache_only {
176            assert!(
177                leaf_page_size_in_byte.is_multiple_of(DISK_PAGE_SIZE),
178                "leaf_page_size in config should be multiple of {}",
179                DISK_PAGE_SIZE
180            );
181        } else {
182            assert!(
183                leaf_page_size_in_byte.is_multiple_of(CACHE_LINE_SIZE),
184                "leaf_page_size in config should be multiple of {}",
185                CACHE_LINE_SIZE
186            );
187        }
188
189        let max_record_size_with_meta = max_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
190        let mut max_mini_page_size: usize;
191
192        if cache_only {
193            // Guarantee C1), C2) for cache-only mode
194            max_mini_page_size = leaf_page_size_in_byte - max_record_size_with_meta;
195            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
196
197            assert!(
198                leaf_page_size_in_byte
199                    >= 2 * max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
200                "cb_max_record_size of config should be <= {}",
201                (leaf_page_size_in_byte - std::mem::size_of::<LeafNode>()) / 2
202                    - std::mem::size_of::<LeafKVMeta>()
203            );
204        } else {
205            // Guarantee C1), C2) and C3) for non cache-only mode
206            max_mini_page_size = leaf_page_size_in_byte
207                - max_record_size_with_meta
208                - max_fence_len_in_byte
209                - 2 * std::mem::size_of::<LeafKVMeta>();
210            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
211
212            assert!(
213                max_mini_page_size >= max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
214                "cb_max_record_size of config should be <= {}",
215                max_mini_page_size
216                    - std::mem::size_of::<LeafNode>()
217                    - std::mem::size_of::<LeafKVMeta>()
218            );
219        }
220
221        // Generate all size classes for mini-pages
222        let mut mem_page_size_classes = Vec::new();
223
224        let base: i32 = 2;
225        let mut record_num_per_page_exp: u32 = 0;
226        let c: usize = min_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
227
228        // No need to consider fence here as mini-pages have no fences.
229        let mut size_class =
230            base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
231
232        // Memory page size is aligned on cache line size
233        if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
234            size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
235        }
236
237        while size_class <= max_mini_page_size {
238            if mem_page_size_classes.is_empty()
239                || (mem_page_size_classes[mem_page_size_classes.len() - 1] < size_class)
240            {
241                mem_page_size_classes.push(size_class);
242            }
243
244            record_num_per_page_exp += 1;
245            size_class =
246                base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
247
248            if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
249                size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
250            }
251        }
252
253        if !cache_only {
254            assert!(!mem_page_size_classes.is_empty());
255        }
256
257        // Add the largest mini page size if not already added
258        if mem_page_size_classes.is_empty()
259            || mem_page_size_classes[mem_page_size_classes.len() - 1] < max_mini_page_size
260        {
261            mem_page_size_classes.push(max_mini_page_size);
262        }
263
264        // The largest page size is the full leaf page size
265        if mem_page_size_classes.is_empty()
266            || mem_page_size_classes[mem_page_size_classes.len() - 1] < leaf_page_size_in_byte
267        {
268            mem_page_size_classes.push(leaf_page_size_in_byte);
269        }
270
271        if !cache_only {
272            assert!(mem_page_size_classes.len() >= 2);
273        } else {
274            assert!(!mem_page_size_classes.is_empty());
275        }
276
277        mem_page_size_classes
278    }
279
280    /// Create a new bf-tree instance with customized storage backend and
281    /// circular buffer size
282    ///
283    /// For in-memory tree, use `:memory:` as file path.
284    /// For cache-only tree, use `:cache:` as file path
285    /// For disk tree, file_path is the path to the index file
286    ///
287    /// Mini page cache must be at least 8192 bytes for practical workloads.
288    ///
289    /// ```
290    /// use bf_tree::BfTree;
291    /// let tree = BfTree::new(":memory:", 8192).unwrap();
292    /// ```
293    pub fn new(file_path: impl AsRef<Path>, cache_size_byte: usize) -> Result<Self, ConfigError> {
294        let config = Config::new(file_path, cache_size_byte);
295        Self::with_config(config, None)
296    }
297
298    /// Create a new bf-tree instance with customized configuration based on
299    /// a config file
300    pub fn new_with_config_file<P: AsRef<Path>>(config_file_path: P) -> Result<Self, ConfigError> {
301        let config = Config::new_with_config_file(config_file_path);
302        Self::with_config(config, None)
303    }
304
305    /// Initialize the bf-tree with provided config. For advanced user only.
306    /// An optional pre-allocated buffer pointer can be provided to use as the buffer pool memory.
307    pub fn with_config(config: Config, buffer_ptr: Option<*mut u8>) -> Result<Self, ConfigError> {
308        // Validate the config first
309        config.validate()?;
310
311        let wal = match config.write_ahead_log.as_ref() {
312            Some(wal_config) => {
313                let wal = WriteAheadLog::new(wal_config.clone());
314                Some(wal)
315            }
316            None => None,
317        };
318        let write_load_full = config.write_load_full_page;
319        let config = Arc::new(config);
320
321        // In cache-only mode, the initial root page is a full mini-page
322        if config.cache_only {
323            let snapshot_mgr = if config.use_snapshot {
324                Some(Arc::new(CPRSnapShotMgr::new(
325                    &config.snapshot_backend,
326                    config.snapshot_version,
327                    &config.snapshot_file_path,
328                )))
329            } else {
330                None
331            };
332
333            let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr, snapshot_mgr.clone());
334
335            // CPR snapshot guard for proper snapshot version setting of the root page
336            // No need for CPR logic
337            let _snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(snapshot_mgr.clone()) {
338                Ok(guard) => guard,
339                Err(()) => {
340                    panic!("Failed to acquire a snapshot guard for root page initialization");
341                }
342            };
343
344            // Assuming CB can accommodate at least 2 leaf pages at the same time
345            let mini_page_guard = (leaf_storage)
346                .alloc_mini_page(config.leaf_page_size)
347                .expect("Fail to allocate a mini-page as initial root node");
348            LeafNode::initialize_mini_page(
349                &mini_page_guard,
350                config.leaf_page_size,
351                MiniPageNextLevel::new_null(),
352                true,
353            );
354            let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
355            let mini_loc = PageLocation::Mini(new_mini_ptr);
356
357            let (root_id, root_lock) = leaf_storage
358                .mapping_table()
359                .insert_mini_page_mapping(mini_loc);
360            assert_eq!(root_id.as_id(), 0);
361
362            drop(root_lock);
363            drop(mini_page_guard);
364            let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
365
366            return Ok(Self {
367                root_page_id: AtomicU64::new(root_id),
368                storage: leaf_storage,
369                wal,
370                cache_only: config.cache_only,
371                write_load_full_page: write_load_full,
372                mini_page_size_classes: Self::create_mem_page_size_classes(
373                    config.cb_min_record_size,
374                    config.cb_max_record_size,
375                    config.leaf_page_size,
376                    config.max_fence_len,
377                    config.cache_only,
378                ),
379                snapshot_mgr,
380                config,
381                #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
382                metrics_recorder: Some(Arc::new(ThreadLocal::new())),
383            });
384        }
385
386        let snapshot_mgr = if config.use_snapshot {
387            Some(Arc::new(CPRSnapShotMgr::new(
388                &config.snapshot_backend,
389                config.snapshot_version,
390                &config.snapshot_file_path,
391            )))
392        } else {
393            None
394        };
395
396        let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr, snapshot_mgr.clone());
397
398        // CPR snapshot guard for proper snapshot version setting of the root page
399        // No need for CPR logic
400        let _snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(snapshot_mgr.clone()) {
401            Ok(guard) => guard,
402            Err(()) => {
403                panic!("Failed to acquire a snapshot guard for root page initialization");
404            }
405        };
406
407        let (root_id, root_lock) = leaf_storage.mapping_table().alloc_base_page_mapping();
408        drop(root_lock);
409        assert_eq!(root_id.as_id(), 0);
410
411        let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
412        Ok(Self {
413            root_page_id: AtomicU64::new(root_id),
414            storage: leaf_storage,
415            wal,
416            cache_only: config.cache_only,
417            write_load_full_page: write_load_full,
418            mini_page_size_classes: Self::create_mem_page_size_classes(
419                config.cb_min_record_size,
420                config.cb_max_record_size,
421                config.leaf_page_size,
422                config.max_fence_len,
423                config.cache_only,
424            ),
425            snapshot_mgr,
426            config,
427            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
428            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
429        })
430    }
431
432    pub fn config(&self) -> &Config {
433        &self.config
434    }
435
436    /// Get the buffer metrics of the circular buffer.
437    /// This is a blocking call, will stop all other buffer operations,
438    /// use it wisely.
439    pub fn get_buffer_metrics(&self) -> CircularBufferMetrics {
440        self.storage.get_buffer_metrics()
441    }
442
443    /// returns the root page id and whether it is a leaf node.
444    pub(crate) fn get_root_page(&self) -> (PageID, bool) {
445        let root_page_id = self.root_page_id.load(Ordering::Acquire);
446        let root_is_leaf = (root_page_id & Self::ROOT_IS_LEAF_MASK) != 0;
447        let clean = root_page_id & (!Self::ROOT_IS_LEAF_MASK);
448
449        let page_id = if root_is_leaf {
450            PageID::from_id(clean)
451        } else {
452            PageID::from_pointer(clean as *const InnerNode)
453        };
454
455        (page_id, root_is_leaf)
456    }
457
458    pub(crate) fn mapping_table(&self) -> &PageTable {
459        self.storage.mapping_table()
460    }
461
462    pub(crate) fn should_promote_read(&self) -> bool {
463        get_rng().random_range(0..100) < self.config.read_promotion_rate.load(Ordering::Relaxed)
464    }
465
466    pub(crate) fn should_promote_scan_page(&self) -> bool {
467        get_rng().random_range(0..100) < self.config.scan_promotion_rate.load(Ordering::Relaxed)
468    }
469
470    /// Chance% to promote a base read record to mini page.
471    pub fn update_read_promotion_rate(&self, new_rate: usize) {
472        self.config
473            .read_promotion_rate
474            .store(new_rate, Ordering::Relaxed);
475    }
476
    /// Split the leaf `cur_page_id` if its split flag is set.
    ///
    /// Only a root leaf is handled here: `parent` must be `None` (a `Some` parent hits
    /// `unreachable!`). In cache-only mode the root mini-page is split with a newly allocated
    /// leaf-page-sized mini-page. Otherwise the root base page is split with a newly allocated
    /// base page. Either way a new inner root (left-most child = `cur_page_id`, one record
    /// `split_key -> sibling_id`) is built and published through `root_page_id`.
    ///
    /// When the CPR snapshot guard is protected, snapshot bookkeeping runs before the split:
    /// - phase 1 (PREPARE): restart if the root page's snapshot version is newer than this
    ///   thread's;
    /// - phases 2/3 (INPROG/SWEEPING): if the root page's version is older, snapshot it, bump its
    ///   version and record it as the snapshotted root.
    ///
    /// Returns `Ok(true)` if the root was split and `Ok(false)` if the split flag was not set.
    ///
    /// # Errors
    /// `TreeError::NeedRestart` if the snapshot guard cannot be acquired or the PREPARE check
    /// fails. `check_parent!` may presumably also return early (defined elsewhere — confirm).
    ///
    /// # Panics
    /// In cache-only mode, if the root is not a mini-page or the sibling mini-page cannot be
    /// allocated.
    fn try_split_leaf(
        &self,
        cur_page_id: PageID,
        parent: &Option<ReadGuard>,
    ) -> Result<bool, TreeError> {
        debug_assert!(cur_page_id.is_id());

        // here we need to acquire x-lock for performance reason:
        // if we acquire s-lock, it's very difficult for us to later upgrade to x-lock, because rwlock favors readers:
        //      consider readers keep coming, we will never be able to upgrade to x-lock.
        let mut cur_page = self.mapping_table().get_mut(&cur_page_id);

        check_parent!(self, cur_page_id, parent);

        let should_split = cur_page.get_split_flag();
        if !should_split {
            return Ok(false);
        }

        // CPR snapshot guard
        let snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(self.snapshot_mgr.clone()) {
            Ok(guard) => guard,
            Err(()) => {
                return Err(TreeError::NeedRestart);
            }
        };

        match parent {
            Some(_) => {
                unreachable!("Leaf node split should not happen here");
            }
            None => {
                if snapshot_guard.is_protected() {
                    let local_thread_snapshot_version =
                        CPRSnapShotMgr::get_snapshot_thread_version();
                    match snapshot_guard.get_local_phase_id() {
                        // PREPARE
                        1 => {
                            // Restart if the root page is already newer than this thread's version.
                            if self.cache_only {
                                let root_page_loc = cur_page.get_page_location().clone();
                                match root_page_loc {
                                    PageLocation::Mini(ptr) => {
                                        let root_page = cur_page.load_cache_page_mut(ptr);
                                        if root_page.get_snapshot_version()
                                            > local_thread_snapshot_version
                                        {
                                            return Err(TreeError::NeedRestart);
                                        }
                                    }
                                    _ => {
                                        panic!(
                                            "The root node is not a mini-page in cache-only mode"
                                        )
                                    }
                                }
                            } else {
                                let root_page = cur_page.load_base_page_mut();
                                if root_page.get_snapshot_version() > local_thread_snapshot_version
                                {
                                    return Err(TreeError::NeedRestart);
                                }
                            }
                        }
                        // INPROG or SWEEPING
                        2 | 3 => {
                            // An older root page is captured (its raw `node_size` bytes) before the
                            // split mutates it, then stamped with this thread's version.
                            if self.cache_only {
                                let root_page_loc = cur_page.get_page_location().clone();
                                match root_page_loc {
                                    PageLocation::Mini(ptr) => {
                                        let root_page = cur_page.load_cache_page_mut(ptr);
                                        if root_page.get_snapshot_version()
                                            < local_thread_snapshot_version
                                        {
                                            // SAFETY: NOTE(review): assumes the page spans
                                            // `meta.node_size` bytes starting at `root_page`.
                                            let root_page_ptr = unsafe {
                                                std::slice::from_raw_parts(
                                                    root_page as *const LeafNode as *const u8,
                                                    root_page.meta.node_size as usize,
                                                )
                                            };
                                            snapshot_guard.snapshot_mini_page(
                                                cur_page_id,
                                                root_page_ptr,
                                                root_page.meta.node_size as usize,
                                            );
                                            root_page.set_snapshot_version(
                                                local_thread_snapshot_version,
                                                false,
                                            );
                                            snapshot_guard.snapshot_root_page(cur_page_id);
                                        }
                                    }
                                    _ => {
                                        panic!(
                                            "The root node is not a mini-page in cache-only mode"
                                        )
                                    }
                                }
                            } else {
                                let root_page = cur_page.load_base_page_mut();
                                if root_page.get_snapshot_version() < local_thread_snapshot_version
                                {
                                    // SAFETY: NOTE(review): assumes the page spans
                                    // `meta.node_size` bytes starting at `root_page`.
                                    let root_page_ptr = unsafe {
                                        std::slice::from_raw_parts(
                                            root_page as *const LeafNode as *const u8,
                                            root_page.meta.node_size as usize,
                                        )
                                    };
                                    snapshot_guard.snapshot_base_page(
                                        cur_page_id,
                                        root_page_ptr,
                                        root_page.meta.node_size as usize,
                                    );
                                    root_page
                                        .set_snapshot_version(local_thread_snapshot_version, false);
                                    snapshot_guard.snapshot_root_page(cur_page_id);
                                }
                            }
                        }
                        // Any other phase: no snapshot work.
                        _ => {}
                    }
                }

                // only for the case of root node split

                // In cache-only mode, the root mini-page node is split into two equal-sized mini-pages
                if self.cache_only {
                    // Create a new mini-page of the same size as the current root node
                    // Assuming CB is at least able to hold two leaf-page sized mini-pages
                    let mini_page_guard = self
                        .storage
                        .alloc_mini_page(self.config.leaf_page_size)
                        .expect("Fail to allocate a mini-page during root split");
                    LeafNode::initialize_mini_page(
                        &mini_page_guard,
                        self.config.leaf_page_size,
                        MiniPageNextLevel::new_null(),
                        true,
                    );
                    let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                    let mini_loc = PageLocation::Mini(new_mini_ptr);

                    // Insert the new page into mapping table.
                    // `_mini_lock` is a named binding, so the sibling's entry stays locked until
                    // this scope ends (i.e. after the new root is published).
                    let (sibling_id, _mini_lock) = self
                        .storage
                        .mapping_table()
                        .insert_mini_page_mapping(mini_loc);

                    // Split current page with the newly created mini page
                    let cur_page_loc = cur_page.get_page_location().clone();
                    match cur_page_loc {
                        PageLocation::Mini(ptr) => {
                            let cur_mini_page = cur_page.load_cache_page_mut(ptr);
                            // SAFETY: NOTE(review): `new_mini_ptr` was just allocated and
                            // initialized above and is only reachable through the locked entry.
                            let sibling_page = unsafe { &mut *new_mini_ptr };
                            let split_key = cur_mini_page.split(sibling_page, true);

                            // Cache-only: the new inner root gets no disk offset.
                            let mut new_root_builder = InnerNodeBuilder::new();
                            new_root_builder
                                .set_left_most_page_id(cur_page_id)
                                .set_children_is_leaf(true)
                                .add_record(split_key, sibling_id);

                            let new_root_ptr = new_root_builder.build();
                            unsafe { (*new_root_ptr).set_root(true) };
                            // Publish the inner root; its id is stored without ROOT_IS_LEAF_MASK.
                            self.root_page_id
                                .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                            info!(sibling = sibling_id.raw(), "New root node installed!");

                            debug_assert!(cur_mini_page.meta.meta_count_with_fence() > 0);
                            debug_assert!(sibling_page.meta.meta_count_with_fence() > 0);

                            return Ok(true);
                        }
                        _ => {
                            panic!("The root node is not a mini-page in cache-only mode")
                        }
                    }
                }

                let mut x_page = cur_page;

                let (sibling_id, mut sibling_entry) = self.alloc_base_page_and_lock();

                info!(sibling = sibling_id.raw(), "Splitting root node!");

                let sibling = sibling_entry.load_base_page_mut();

                let leaf_node = x_page.load_base_page_mut();
                let split_key = leaf_node.split(sibling, false);

                // Disk-backed: the new inner root is given its own disk offset.
                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page_id)
                    .set_children_is_leaf(true)
                    .add_record(split_key, sibling_id);

                let new_root_ptr = new_root_builder.build();

                unsafe { (*new_root_ptr).set_root(true) };

                // Publish the inner root; its id is stored without ROOT_IS_LEAF_MASK.
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(sibling = sibling_id.raw(), "New root node installed!");
                Ok(true)
            }
        }
    }
686
687    fn alloc_base_page_and_lock(&self) -> (PageID, LeafEntryXLocked<'_>) {
688        let (pid, base_entry) = self.mapping_table().alloc_base_page_mapping();
689
690        (pid, base_entry)
691    }
692
    /// Splits the inner node `cur_page` when its split flag is set.
    ///
    /// `parent` is a read guard on `cur_page`'s parent, or `None` when `cur_page` was
    /// reached as the root. In the INPROG/SWEEPING snapshot path, the `None` case
    /// asserts that the node is the root.
    ///
    /// Returns `(false, parent)` if the split flag is not set, handing the caller's
    /// parent guard back unchanged. After a split it returns `(true, guard)`, where
    /// `guard` is the downgraded parent guard (parent case) or `None` (root case).
    ///
    /// Errors: lock acquisition/upgrade failures and `check_parent!` failures are
    /// propagated. `TreeError::NeedRestart` is returned in three cases:
    /// - the CPR snapshot guard cannot be obtained;
    /// - a PREPARE-phase snapshot-version conflict is detected;
    /// - the parent has no room for the separator key (the parent's split flag is then set).
    fn try_split_inner<'a>(
        &self,
        cur_page: PageID,
        parent: Option<ReadGuard<'a>>,
    ) -> Result<(bool, Option<ReadGuard<'a>>), TreeError> {
        // Try to read-lock the node; a failure is propagated to the caller.
        let cur_node = ReadGuard::try_read(cur_page.as_inner_node())?;

        check_parent!(self, cur_page, parent);

        let should_split = cur_node.as_ref().meta.get_split_flag();

        if !should_split {
            return Ok((false, parent));
        }

        info!(has_parent = parent.is_some(), "split inner node");

        // CPR snapshot guard
        let snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(self.snapshot_mgr.clone()) {
            Ok(guard) => guard,
            Err(()) => {
                return Err(TreeError::NeedRestart);
            }
        };

        match parent {
            Some(p) => {
                // Upgrade both the node and its parent to exclusive locks. On failure the
                // returned guard is discarded and only the error is propagated.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;
                let mut x_parent = p.upgrade().map_err(|(_x, e)| e)?;

                // CPR snapshot
                if snapshot_guard.is_protected() {
                    let local_thread_snapshot_version =
                        CPRSnapShotMgr::get_snapshot_thread_version();

                    match snapshot_guard.get_local_phase_id() {
                        // PREPARE
                        1 => {
                            // Abort if either page has been taken a snapshot before.
                            if x_parent.as_ref().get_snapshot_version()
                                > local_thread_snapshot_version
                                || x_cur.as_ref().get_snapshot_version()
                                    > local_thread_snapshot_version
                            {
                                return Err(TreeError::NeedRestart);
                            }
                        }
                        // INPROG or SWEEPING
                        2 | 3 => {
                            // Snapshot any page older than this thread's snapshot version
                            // before it is modified, then bump its version.
                            if x_parent.as_ref().get_snapshot_version()
                                < local_thread_snapshot_version
                            {
                                snapshot_guard.snapshot_inner_node(x_parent.as_ref());
                                x_parent
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);

                                // When the parent is the root, also record the root page id.
                                if x_parent.as_ref().is_root() {
                                    let root_id = self.root_page_id.load(Ordering::Acquire);
                                    assert_eq!(
                                        root_id,
                                        PageID::from_pointer(x_parent.as_ref() as *const InnerNode)
                                            .raw()
                                    );

                                    snapshot_guard.snapshot_root_page(PageID::from_pointer(
                                        x_parent.as_ref() as *const InnerNode,
                                    ));
                                }
                            }

                            if x_cur.as_ref().get_snapshot_version() < local_thread_snapshot_version
                            {
                                snapshot_guard.snapshot_inner_node(x_cur.as_ref());
                                x_cur
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);
                            }
                        }
                        _ => {}
                    }
                }

                let split_key = x_cur.as_ref().get_split_key();

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));

                // Insert the separator into the parent before touching `x_cur`. If the
                // parent is full, flag it for splitting and restart without modifying
                // the child.
                let success = x_parent
                    .as_mut()
                    .insert(&split_key, sibling_builder.get_page_id());
                if !success {
                    x_parent.as_mut().meta.set_split_flag();
                    return Err(TreeError::NeedRestart);
                }

                x_cur.as_mut().split(&mut sibling_builder);

                // Materialize the sibling; its page id was already published in the parent.
                sibling_builder.build();

                // Hand back a read guard on the updated parent.
                Ok((true, Some(x_parent.downgrade())))
            }
            None => {
                // Root split: `cur_page` becomes the left-most child of a brand-new root.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;

                // CPR snapshot
                if snapshot_guard.is_protected() {
                    let local_thread_snapshot_version =
                        CPRSnapShotMgr::get_snapshot_thread_version();

                    match snapshot_guard.get_local_phase_id() {
                        // PREPARE
                        1 => {
                            // Abort if either page has been taken a snapshot before.
                            if x_cur.as_ref().get_snapshot_version() > local_thread_snapshot_version
                            {
                                return Err(TreeError::NeedRestart);
                            }
                        }
                        // INPROG or SWEEPING
                        2 | 3 => {
                            if x_cur.as_ref().get_snapshot_version() < local_thread_snapshot_version
                            {
                                snapshot_guard.snapshot_inner_node(x_cur.as_ref());
                                x_cur
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);

                                // snapshot root
                                let root_id = self.root_page_id.load(Ordering::Acquire);
                                assert!(x_cur.as_ref().is_root());
                                assert_eq!(
                                    root_id,
                                    PageID::from_pointer(x_cur.as_ref() as *const InnerNode).raw()
                                );

                                snapshot_guard.snapshot_root_page(PageID::from_pointer(
                                    x_cur.as_ref() as *const InnerNode,
                                ));
                            }
                        }
                        _ => {}
                    }
                }

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));
                let sibling_id = sibling_builder.get_page_id();

                // Split the old root; it is no longer the root afterwards.
                let split_key = x_cur.as_mut().split(&mut sibling_builder);
                x_cur.as_mut().set_root(false);

                // New root: left-most child is the old root, one record points at the sibling.
                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page)
                    .set_children_is_leaf(false)
                    .add_record(split_key, sibling_id);
                sibling_builder.build();
                let new_root_ptr = new_root_builder.build();

                unsafe { (*new_root_ptr).set_root(true) };

                // Hold an exclusive lock on the new root (until the end of this arm) while
                // it is published through `root_page_id`.
                let _x_root = ReadGuard::try_read(new_root_ptr)
                    .unwrap()
                    .upgrade()
                    .unwrap();
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(
                    has_parent = parent.is_some(),
                    cur = cur_page.raw(),
                    "finished split inner node"
                );

                Ok((true, parent))
            }
        }
    }
873
874    pub(crate) fn traverse_to_leaf(
875        &self,
876        key: &[u8],
877        aggressive_split: bool,
878    ) -> Result<(PageID, Option<ReadGuard<'_>>), TreeError> {
879        let (mut cur_page, mut cur_is_leaf) = self.get_root_page();
880        let mut parent: Option<ReadGuard> = None;
881
882        loop {
883            if aggressive_split {
884                if cur_is_leaf
885                    && !cur_page.is_inner_node_pointer()
886                    && self.try_split_leaf(cur_page, &parent)?
887                {
888                    return Err(TreeError::NeedRestart);
889                } else if !cur_is_leaf {
890                    let (split_success, new_parent) = self.try_split_inner(cur_page, parent)?;
891                    if split_success {
892                        return Err(TreeError::NeedRestart);
893                    } else {
894                        parent = new_parent;
895                    }
896                }
897            }
898
899            if cur_is_leaf {
900                return Ok((cur_page, parent));
901            } else {
902                let next = ReadGuard::try_read(cur_page.as_inner_node())?;
903
904                check_parent!(self, cur_page, parent);
905
906                let next_node = next.as_ref();
907                let next_is_leaf = next_node.meta.children_is_leaf();
908                let pos = next_node.lower_bound(key);
909                let kv_meta = next_node.get_kv_meta(pos as u16);
910                cur_page = next_node.get_value(kv_meta);
911                cur_is_leaf = next_is_leaf;
912                parent = Some(next);
913            }
914        }
915    }
916
    /// Applies one write (`write_op`, an insert or delete per its `op_type`) to the leaf
    /// whose key range covers `write_op.key`.
    ///
    /// `aggressive_split` is forwarded to `traverse_to_leaf`, which then splits nodes on
    /// the way down and restarts after any split.
    ///
    /// Errors are propagated to the caller's retry loop. They can come from traversal,
    /// from the CPR snapshot guard (`NeedRestart`), from mini-page allocation, or from
    /// the leaf insert.
    ///
    /// # Panics
    /// Panics if a `Null` page location is found while not in cache-only mode.
    fn write_inner(&self, write_op: WriteOp, aggressive_split: bool) -> Result<(), TreeError> {
        let (pid, parent) = self.traverse_to_leaf(write_op.key, aggressive_split)?;

        let mut leaf_entry = self.mapping_table().get_mut(&pid);

        // Presumably re-validates the parent guard from traversal now that the leaf
        // entry is held — TODO confirm against the macro definition.
        check_parent!(self, pid, parent);

        let page_loc = leaf_entry.get_page_location();
        match page_loc {
            // A `Null` location is only legal in cache-only mode (presumably a leaf
            // that has no page yet).
            PageLocation::Null => {
                if !self.cache_only {
                    panic!("Found an Null page in non cache-only mode");
                }

                // Nothing to delete in a page-less leaf.
                if write_op.op_type == OpType::Delete {
                    return Ok(());
                }

                // Create a new mini-page to replace the null page
                let mini_page_size = LeafNode::get_chain_size_hint(
                    write_op.key.len(),
                    write_op.value.len(),
                    &self.mini_page_size_classes,
                    self.cache_only,
                );

                // CPR snapshot guard for proper snapshot version setting of the root page
                let snapshot_guard =
                    match CPRSnapShotMgr::get_snapshot_guard(self.snapshot_mgr.clone()) {
                        Ok(guard) => guard,
                        Err(()) => {
                            return Err(TreeError::NeedRestart);
                        }
                    };

                if snapshot_guard.is_protected() {
                    // Snapshot the NULL page assuming it is of version v
                    // as we don't know whether the existing page has been snapshotted or not
                    // and the new page will be of version v+1.
                    // TODO: Versioning of NULL page
                    snapshot_guard.snapshot_mini_page(pid, &[], 0);
                }

                // Allocate and initialize an empty mini page with no next level, then
                // point the mapping-table entry at it.
                let mini_page_guard = self.storage.alloc_mini_page(mini_page_size)?;
                LeafNode::initialize_mini_page(
                    &mini_page_guard,
                    mini_page_size,
                    MiniPageNextLevel::new_null(),
                    true,
                );
                let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                let mini_loc = PageLocation::Mini(new_mini_ptr);

                leaf_entry.create_cache_page_loc(mini_loc);

                // The page was sized for this record, so the insert is expected to succeed.
                let mini_page_ref = leaf_entry.load_cache_page_mut(new_mini_ptr);
                let insert_success =
                    mini_page_ref.insert(write_op.key, write_op.value, write_op.op_type, 0);
                assert!(insert_success);

                // NOTE(review): unlike the arm below, this path does not append to the WAL;
                // presumably acceptable because cache-only mode gives no durability
                // guarantee — confirm.
                debug_assert!(mini_page_ref.meta.meta_count_with_fence() > 0);
                counter!(InsertCreatedMiniPage);
            }
            _ => {
                // Regular path: the leaf entry performs the write and takes the parent guard.
                leaf_entry.insert(
                    write_op.key,
                    write_op.value,
                    parent,
                    write_op.op_type,
                    &self.storage,
                    &self.write_load_full_page,
                    &self.cache_only,
                    &self.mini_page_size_classes,
                )?;

                // Best effort: move a cache page that is about to be evicted to the tail.
                if leaf_entry.cache_page_about_to_evict(&self.storage) {
                    // we don't care about the result here
                    _ = leaf_entry.move_cache_page_to_tail(&self.storage);
                }

                // Log the write via `append_and_wait` and record the returned LSN on the entry.
                if let Some(wal) = &self.wal {
                    let lsn = wal.append_and_wait(&write_op, leaf_entry.get_disk_offset());
                    leaf_entry.update_lsn(lsn);
                }
            }
        }

        Ok(())
    }
1006
1007    /// Make sure you're not holding any lock while calling this function.
1008    pub(crate) fn evict_from_circular_buffer(&self) -> Result<usize, TreeError> {
1009        // Why we need to evict multiple times?
1010        // because we don't want each alloc to trigger evict, i.e., we want alloc to fail less often.
1011        // with default 1024 bytes, one eviction allows us to alloc 1024 bytes (4 256-byte mini pages) without failure.
1012        const TARGET_EVICT_SIZE: usize = 1024;
1013        let mut evicted = 0;
1014
1015        // A corner case: we may not have enough memory to evict (i.e., the buffer might be empty now)
1016        let mut retry_cnt = 0;
1017
1018        while evicted < TARGET_EVICT_SIZE && retry_cnt < 10 {
1019            let n = self
1020                .storage
1021                .evict_from_buffer(|mini_page_handle: &TombstoneHandle| {
1022                    eviction_callback(mini_page_handle, self)
1023                })?;
1024            evicted += n as usize;
1025            retry_cnt += 1;
1026        }
1027        info!("stopped evict from circular buffer");
1028        Ok(evicted)
1029    }
1030
1031    /// Insert a key-value pair to the system, overrides existing value if present.
1032    ///
1033    /// ```
1034    /// use bf_tree::BfTree;
1035    /// use bf_tree::LeafReadResult;
1036    ///
1037    /// let mut config = bf_tree::Config::default();
1038    /// config.cb_min_record_size(4);
1039    /// let tree = BfTree::with_config(config, None).unwrap();
1040    /// tree.insert(b"key", b"value");
1041    /// let mut buffer = [0u8; 1024];
1042    /// let read_size = tree.read(b"key", &mut buffer);
1043    ///
1044    /// assert_eq!(read_size, LeafReadResult::Found(5));
1045    /// assert_eq!(&buffer[..5], b"value");
1046    /// ```
1047    pub fn insert(&self, key: &[u8], value: &[u8]) -> LeafInsertResult {
1048        // The input key cannot exceed the configured max key length
1049        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1050            return LeafInsertResult::InvalidKV(format!("Key too large {}", key.len()));
1051        }
1052
1053        // The input key has to be one byte at least
1054        if key.is_empty() {
1055            return LeafInsertResult::InvalidKV(format!(
1056                "Key too small {}, at least one byte",
1057                key.len()
1058            ));
1059        }
1060
1061        // The input key value pair cannot exceed the configured max record size
1062        if value.len() > MAX_VALUE_LEN || key.len() + value.len() > self.config.cb_max_record_size {
1063            return LeafInsertResult::InvalidKV(format!(
1064                "Record too large {}, {}, please adjust cb_max_record_size in config",
1065                key.len(),
1066                value.len()
1067            ));
1068        }
1069
1070        // The input key value pair cannot be smaller than the configured min record size
1071        if key.len() + value.len() < self.config.cb_min_record_size {
1072            return LeafInsertResult::InvalidKV(format!(
1073                "Record too small {}, {}, please adjust cb_min_record_size in config",
1074                key.len(),
1075                value.len()
1076            ));
1077        }
1078
1079        let backoff = Backoff::new();
1080        let mut aggressive_split = false;
1081
1082        counter!(Insert);
1083        info!(key_len = key.len(), value_len = value.len(), "insert");
1084
1085        loop {
1086            let result = self.write_inner(WriteOp::make_insert(key, value), aggressive_split);
1087            match result {
1088                Ok(_) => return LeafInsertResult::Success,
1089                Err(TreeError::NeedRestart) => {
1090                    #[cfg(all(feature = "shuttle", test))]
1091                    {
1092                        shuttle::thread::yield_now();
1093                    }
1094                    counter!(InsertNeedRestart);
1095                    aggressive_split = true;
1096                }
1097                Err(TreeError::CircularBufferFull) => {
1098                    info!("insert failed, started evict from circular buffer");
1099                    aggressive_split = true;
1100                    counter!(InsertCircularBufferFull);
1101                    _ = self.evict_from_circular_buffer();
1102                    continue;
1103                }
1104                Err(TreeError::Locked) => {
1105                    counter!(InsertLocked);
1106                    backoff.snooze();
1107                }
1108            }
1109        }
1110    }
1111
1112    /// Read a record from the tree.
1113    /// Returns the number of bytes read.
1114    ///
1115    /// TODO: don't panic if the out_buffer is too small, instead returns a error.
1116    ///
1117    /// ```
1118    /// use bf_tree::BfTree;
1119    /// use bf_tree::LeafReadResult;
1120    ///
1121    /// let mut config = bf_tree::Config::default();
1122    /// config.cb_min_record_size(4);
1123    ///
1124    /// let tree = BfTree::with_config(config, None).unwrap();
1125    /// tree.insert(b"key", b"value");
1126    /// let mut buffer = [0u8; 1024];
1127    /// let read_size = tree.read(b"key", &mut buffer);
1128    /// assert_eq!(read_size, LeafReadResult::Found(5));
1129    /// assert_eq!(&buffer[..5], b"value");
1130    /// ```
1131    pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
1132        // The input key cannot exceed the configured max key length
1133        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1134            return LeafReadResult::InvalidKey;
1135        }
1136
1137        // The input key has to be one byte at least
1138        if key.is_empty() {
1139            return LeafReadResult::InvalidKey;
1140        }
1141
1142        let backoff = Backoff::new();
1143
1144        info!(key_len = key.len(), "read");
1145        counter!(Read);
1146        let mut aggressive_split = false;
1147
1148        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1149        let mut debug_timer = DebugTimerGuard::new(Timer::Read, self.metrics_recorder.clone());
1150
1151        loop {
1152            let result = self.read_inner(key, out_buffer, aggressive_split);
1153            match result {
1154                Ok(v) => {
1155                    #[cfg(any(
1156                        feature = "metrics-rt-debug-all",
1157                        feature = "metrics-rt-debug-timer"
1158                    ))]
1159                    debug_timer.end();
1160
1161                    return v;
1162                }
1163                Err(TreeError::CircularBufferFull) => {
1164                    info!("read promotion failed, started evict from circular buffer");
1165                    aggressive_split = true;
1166                    match self.evict_from_circular_buffer() {
1167                        Ok(_) => continue,
1168                        Err(_) => continue,
1169                    };
1170                }
1171                Err(_) => {
1172                    backoff.spin();
1173                    aggressive_split = true;
1174                }
1175            }
1176        }
1177    }
1178
1179    /// Delete a record from the tree.
1180    ///
1181    /// ```
1182    /// use bf_tree::BfTree;
1183    /// use bf_tree::LeafReadResult;
1184    ///
1185    /// let tree = BfTree::default();
1186    /// tree.insert(b"key", b"value");
1187    /// tree.delete(b"key");
1188    /// let mut buffer = [0u8; 1024];
1189    /// let rt = tree.read(b"key", &mut buffer);
1190    /// assert_eq!(rt, LeafReadResult::Deleted);
1191    /// ```
1192    pub fn delete(&self, key: &[u8]) {
1193        // The input key cannot exceed the configured max key length
1194        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1195            return;
1196        }
1197
1198        // The input key has to be one byte at least
1199        if key.is_empty() {
1200            return;
1201        }
1202
1203        let backoff = Backoff::new();
1204
1205        info!(key_len = key.len(), "delete");
1206
1207        let mut aggressive_split = false;
1208
1209        loop {
1210            let result = self.write_inner(WriteOp::make_delete(key), aggressive_split);
1211            match result {
1212                Ok(_) => return,
1213                Err(TreeError::CircularBufferFull) => {
1214                    info!("delete failed, started evict from circular buffer");
1215                    aggressive_split = true;
1216                    match self.evict_from_circular_buffer() {
1217                        Ok(_) => continue,
1218                        Err(_) => continue,
1219                    };
1220                }
1221                Err(_) => {
1222                    aggressive_split = true;
1223                    backoff.spin();
1224                }
1225            }
1226        }
1227    }
1228
1229    /// Scan records in the tree, with starting key and desired scan count.
1230    /// Returns a iterator that yields key-value pairs.
1231    pub fn scan_with_count<'a>(
1232        &'a self,
1233        key: &[u8],
1234        cnt: usize,
1235        return_field: ScanReturnField,
1236    ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
1237        // In cache-only mode, scan is not supported
1238        if self.cache_only {
1239            return Err(ScanIterError::CacheOnlyMode);
1240        }
1241
1242        // The start key cannot exceed the configured max key length
1243        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1244            return Err(ScanIterError::InvalidStartKey);
1245        }
1246
1247        // The input key has to be one byte at least
1248        if key.is_empty() {
1249            return Err(ScanIterError::InvalidStartKey);
1250        }
1251
1252        // The count cannot be zero
1253        if cnt == 0 {
1254            return Err(ScanIterError::InvalidCount);
1255        }
1256
1257        Ok(ScanIter::new_with_scan_count(self, key, cnt, return_field))
1258    }
1259
1260    pub fn scan_with_end_key<'a>(
1261        &'a self,
1262        start_key: &[u8],
1263        end_key: &[u8],
1264        return_field: ScanReturnField,
1265    ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
1266        // In cache-only mode, scan is not supported
1267        if self.cache_only {
1268            return Err(ScanIterError::CacheOnlyMode);
1269        }
1270
1271        // The start key cannot exceed the configured max key length
1272        if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1273            return Err(ScanIterError::InvalidStartKey);
1274        }
1275
1276        // The input key has to be one byte at least
1277        if start_key.is_empty() {
1278            return Err(ScanIterError::InvalidStartKey);
1279        }
1280
1281        // The end key cannot exceed the configured max key length
1282        if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1283            return Err(ScanIterError::InvalidEndKey);
1284        }
1285
1286        // The input key has to be one byte at least
1287        if end_key.is_empty() {
1288            return Err(ScanIterError::InvalidEndKey);
1289        }
1290
1291        // The start key cannot be greater than the end key
1292        let cmp = start_key.cmp(end_key);
1293        if cmp == std::cmp::Ordering::Greater {
1294            return Err(ScanIterError::InvalidKeyRange);
1295        }
1296
1297        Ok(ScanIter::new_with_end_key(
1298            self,
1299            start_key,
1300            end_key,
1301            return_field,
1302        ))
1303    }
1304
1305    #[doc(hidden)]
1306    pub fn scan_mut_with_count<'a>(
1307        &'a self,
1308        key: &'a [u8],
1309        cnt: usize,
1310        return_field: ScanReturnField,
1311    ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1312        // In cache-only mode, scan is not supported
1313        if self.cache_only {
1314            return Err(ScanIterError::CacheOnlyMode);
1315        }
1316
1317        // The start key cannot exceed the configured max key length
1318        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1319            return Err(ScanIterError::InvalidStartKey);
1320        }
1321
1322        // The count cannot be zero
1323        if cnt == 0 {
1324            return Err(ScanIterError::InvalidCount);
1325        }
1326
1327        Ok(ScanIterMut::new_with_scan_count(
1328            self,
1329            key,
1330            cnt,
1331            return_field,
1332        ))
1333    }
1334
1335    #[doc(hidden)]
1336    pub fn scan_mut_with_end_key<'a>(
1337        &'a self,
1338        start_key: &'a [u8],
1339        end_key: &'a [u8],
1340        return_field: ScanReturnField,
1341    ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1342        // In cache-only mode, scan is not supported
1343        if self.cache_only {
1344            return Err(ScanIterError::CacheOnlyMode);
1345        }
1346
1347        // The start key cannot exceed the configured max key length
1348        if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1349            return Err(ScanIterError::InvalidStartKey);
1350        }
1351
1352        // The end key cannot exceed the configured max key length
1353        if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1354            return Err(ScanIterError::InvalidEndKey);
1355        }
1356
1357        Ok(ScanIterMut::new_with_end_key(
1358            self,
1359            start_key,
1360            end_key,
1361            return_field,
1362        ))
1363    }
1364
    /// Performs one attempt at a point lookup of `key`, copying a found value into
    /// `out_buffer`.
    ///
    /// On a base-page hit the read may also promote data into the cache. It caches
    /// either the single record (when `config.read_record_cache` is set) or the whole
    /// page as a full page. Promotion is skipped, and the plain read result returned, when:
    /// - there is no parent guard (the root is a leaf);
    /// - `should_promote_read()` declines;
    /// - the leaf cannot be upgraded to an exclusive lock.
    ///
    /// Errors: traversal/lock errors are propagated. A promotion that hits
    /// `CircularBufferFull` or `NeedRestart` is returned so the caller can evict/retry.
    ///
    /// # Panics
    /// Panics if a base page is read in cache-only mode.
    fn read_inner(
        &self,
        key: &[u8],
        out_buffer: &mut [u8],
        aggressive_split: bool,
    ) -> Result<LeafReadResult, TreeError> {
        let (node, parent) = self.traverse_to_leaf(key, aggressive_split)?;

        let mut leaf = self.mapping_table().get(&node);

        check_parent!(self, node, parent);

        let out = leaf.read(
            key,
            out_buffer,
            self.config.mini_page_binary_search,
            self.cache_only,
        );
        match out {
            // Served from a cached (mini or full) page.
            ReadResult::Mini(r) | ReadResult::Full(r) => {
                // Best effort: if the cache page is about to be evicted, try to move it to
                // the tail. If the exclusive upgrade fails, just return the result.
                if leaf.cache_page_about_to_evict(&self.storage) {
                    let mut x_leaf = match leaf.try_upgrade(self.snapshot_mgr.clone(), node) {
                        Ok(v) => v,
                        Err(_) => return Ok(r),
                    };
                    // we don't care about the result here, because we are in read path, we don't want to block.
                    _ = x_leaf.move_cache_page_to_tail(&self.storage);
                }

                Ok(r)
            }

            // Served from the base page.
            ReadResult::Base(r) => {
                counter!(BasePageRead);

                // In cache-only mode, no base page should exist
                if self.cache_only {
                    panic!("Attempt to read a base page while in cache-only mode.");
                }

                // Only found records are promotion candidates; `v` is the value length.
                let v = match r {
                    LeafReadResult::Found(v) => v,
                    _ => return Ok(r),
                };

                // Promotion needs the parent guard (it is handed to the insert/upgrade below).
                if parent.is_none() || !self.should_promote_read() {
                    return Ok(r);
                }

                // Promotion is opportunistic: give up if the leaf cannot be X-locked.
                let mut x_leaf = match leaf.try_upgrade(self.snapshot_mgr.clone(), node) {
                    Ok(x) => x,
                    Err(_) => {
                        return Ok(r);
                    }
                };

                if self.config.read_record_cache {
                    // we do record cache.
                    // NOTE(review): no dice roll happens here; the randomized promotion
                    // decision is presumably made in `should_promote_read()` above.
                    // we roll dice to see if we should insert this value to mini page.

                    // Cache just this record, using the value bytes copied into `out_buffer`.
                    let out = x_leaf.insert(
                        key,
                        &out_buffer[0..v as usize],
                        parent,
                        OpType::Cache,
                        &self.storage,
                        &self.write_load_full_page,
                        &self.cache_only,
                        &self.mini_page_size_classes,
                    );

                    match out {
                        Ok(_) => {
                            counter!(ReadPromotionOk);
                            Ok(r)
                        }
                        Err(TreeError::Locked) => {
                            // We are doing this very optimistically, if contention happens, we just abort and return.
                            counter!(ReadPromotionFailed);
                            Ok(r)
                        }
                        Err(TreeError::CircularBufferFull) => {
                            counter!(ReadPromotionFailed);
                            Err(TreeError::CircularBufferFull)
                        }
                        Err(TreeError::NeedRestart) => {
                            // If we need restart here, potentially because parent is full.
                            counter!(ReadPromotionFailed);
                            Err(TreeError::NeedRestart)
                        }
                    }
                } else {
                    // Load the whole page into a full cache page. `parent` is known to be
                    // `Some` here because of the `is_none()` check above.
                    match self.upgrade_to_full_page(x_leaf, parent.unwrap()) {
                        Ok(_) | Err(TreeError::Locked) => Ok(r),
                        Err(e) => Err(e),
                    }
                }
            }
            ReadResult::None => Ok(LeafReadResult::NotFound),
        }
    }
1466
    /// Turns the leaf held by `x_leaf` into a full cache page and returns the entry,
    /// which is still a `LeafEntryXLocked`.
    ///
    /// Behavior by page location:
    /// - `Mini`: merges the mini page into the base page via `try_merge_mini_page`, which
    ///   takes `parent`. It then deallocates the mini page and loads the base page into a
    ///   new full page.
    /// - `Full`: already a full page; returned unchanged.
    /// - `Base(offset)`: loads the base page at `offset` into a new full page.
    ///
    /// The `upgrade_to_full_page(&self.storage, ..)` calls in the body resolve to the free
    /// function imported from `mini_page_op`, not to this method.
    ///
    /// # Panics
    /// Panics on a `Null` page location.
    fn upgrade_to_full_page<'a>(
        &'a self,
        mut x_leaf: LeafEntryXLocked<'a>,
        parent: ReadGuard<'a>,
    ) -> Result<LeafEntryXLocked<'a>, TreeError> {
        let page_loc = x_leaf.get_page_location().clone();
        match page_loc {
            PageLocation::Mini(ptr) => {
                let mini_page = x_leaf.load_cache_page_mut(ptr);
                // Begin deallocating the mini page; `h` is completed by
                // `finish_dealloc_mini_page` further down.
                let h = self.storage.begin_dealloc_mini_page(mini_page)?;
                // NOTE(review): if the merge fails, `?` returns before
                // `finish_dealloc_mini_page(h)` runs — confirm that dropping `h` is safe.
                let _merge_result = x_leaf.try_merge_mini_page(&h, parent, &self.storage)?;
                // Presumably `next_level` refers to the mini page's backing base page.
                let base_offset = mini_page.next_level;
                x_leaf.change_to_base_loc();
                self.storage.finish_dealloc_mini_page(h);

                let base_page_ref = x_leaf.load_base_page_from_buffer();
                let full_page_loc =
                    upgrade_to_full_page(&self.storage, base_page_ref, base_offset)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Full(_ptr) => Ok(x_leaf),
            PageLocation::Base(offset) => {
                let base_page_ref = x_leaf.load_base_page(offset);
                let next_level = MiniPageNextLevel::new(offset);
                let full_page_loc = upgrade_to_full_page(&self.storage, base_page_ref, next_level)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Null => panic!("upgrade_to_full_page on Null page"),
        }
    }
1499
1500    /// Collect all metrics and reset the metric recorder
1501    /// The caller needs to ensure there are no references to the bf-tree's metrics recorder anymore.
1502    pub fn get_metrics(&mut self) -> Option<serde_json::Value> {
1503        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1504        {
1505            let recorder = self.metrics_recorder.take();
1506            match recorder {
1507                Some(r) => {
1508                    let recorders = Arc::try_unwrap(r).expect("Failed to obtain the recorders of bf-tree, please make sure no other references exist.");
1509                    let mut timer_accumulated = TimerRecorder::default();
1510
1511                    // Only collect timer metrics for now
1512                    for r in recorders {
1513                        let t = unsafe { &*r.get() };
1514
1515                        timer_accumulated += t.timers.clone();
1516                    }
1517
1518                    let output = serde_json::json!({
1519                        "Timers": timer_accumulated,
1520                    });
1521
1522                    self.metrics_recorder = Some(Arc::new(ThreadLocal::new()));
1523
1524                    Some(output)
1525                }
1526                None => None,
1527            }
1528        }
1529        #[cfg(not(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer")))]
1530        {
1531            None
1532        }
1533    }
1534}
1535
1536pub(crate) fn key_value_physical_size(key: &[u8], value: &[u8]) -> usize {
1537    let key_size = key.len();
1538    let value_size = value.len();
1539    let meta_size = crate::nodes::KV_META_SIZE;
1540    key_size + value_size + meta_size
1541}
1542
/// Eviction callback for a page stored in the circular buffer (a mini page or a full page),
/// identified by `mini_page_handle`.
///
/// A key is first read from the page without holding its lock. The function then re-finds the
/// page by traversing the tree with that key and locks its mapping-table entry. It proceeds
/// only if that entry still points at the page being evicted:
/// - `Mini`: the parent's version is checked first. In `cache_only` mode the entry is then set
///   to a null location without merging. Otherwise the mini page is merged with
///   `try_merge_mini_page` and the entry is switched to its base location.
/// - `Full`: the full page is handed to `merge_full_page`.
///
/// # Errors
///
/// Returns `TreeError::NeedRestart` when the entry no longer points at this page, including
/// when it is now `Base` or `Null`. Errors are propagated from:
/// - reading the key (cache-only mode),
/// - the traversal,
/// - the parent version check,
/// - the mini-page merge.
///
/// # Panics
///
/// Panics if the entry is still this mini page but the traversal returned no parent.
pub(crate) fn eviction_callback(
    mini_page_handle: &TombstoneHandle,
    tree: &BfTree,
) -> Result<(), TreeError> {
    // Raw pointer to the page being evicted, viewed as a leaf node.
    let mini_page = mini_page_handle.ptr as *mut LeafNode;
    // Read (without a lock) a key that routes to this page; in cache-only mode the fallible
    // variant is used and its error is propagated.
    let key_to_this_page = if tree.cache_only {
        unsafe { &*mini_page }.try_get_key_to_reach_this_node()?
    } else {
        unsafe { &*mini_page }.get_key_to_reach_this_node()
    };

    // Here we need to set aggressive split to true, because we would split parent node due to leaf split.
    let (pid, parent) = tree.traverse_to_leaf(&key_to_this_page, true)?;
    info!(
        pid = pid.raw(),
        "starting to merge mini page in eviction call back"
    );

    let mut leaf_entry = tree.mapping_table().get_mut(&pid);

    histogram!(EvictNodeSize, unsafe { &*mini_page }.meta.node_size as u64);

    match leaf_entry.get_page_location() {
        PageLocation::Mini(ptr) => {
            {
                // In order to lock this node, we need to traverse to this node first;
                // but in order to traverse this node, we need to read the keys in this node;
                // in order to read the keys in this node, we need to lock this node.
                //
                // Because we didn't lock the node while reading `key_to_this_page`,
                // we need to recheck if the node is still the same node.
                if *ptr != mini_page {
                    return Err(TreeError::NeedRestart);
                }
            }

            let parent = parent.expect("Mini page must have a parent");
            parent.check_version()?;

            // In the case of cache_only, the corresponding mapping table entry of the mini-page
            // is replaced by a non-existent base page
            if tree.cache_only {
                leaf_entry.change_to_null_loc();
            } else {
                leaf_entry.try_merge_mini_page(mini_page_handle, parent, &tree.storage)?;
                leaf_entry.change_to_base_loc();
                // we don't need to dealloc the old_mini_page here because we are in eviction callback.
            }

            Ok(())
        }

        PageLocation::Full(ptr) => {
            // Same re-check as for mini pages: the entry must still point at this page.
            if *ptr != mini_page {
                return Err(TreeError::NeedRestart);
            }

            leaf_entry.merge_full_page(mini_page_handle);
            Ok(())
        }

        // This means the key read from the mini page is corrupted and points to a different page
        PageLocation::Base(_offset) => Err(TreeError::NeedRestart),

        // This means the key read from the mini page is corrupted and points to a different page
        PageLocation::Null => Err(TreeError::NeedRestart),
    }
}
1611
#[cfg(test)]
mod tests {
    use crate::error::ConfigError;
    use crate::BfTree;

    /// Builds a tree from a copy of `config` and asserts the build is rejected with an error
    /// accepted by `is_expected`. Panics with `wrong_variant_msg` when some other error comes
    /// back, and with a fixed message when the build unexpectedly succeeds.
    fn assert_config_rejected(
        config: &crate::Config,
        is_expected: impl Fn(&ConfigError) -> bool,
        wrong_variant_msg: &str,
    ) {
        match BfTree::with_config(config.clone(), None) {
            Ok(_) => panic!("Expected error but got Ok"),
            Err(err) if is_expected(&err) => {}
            Err(_) => panic!("{}", wrong_variant_msg),
        }
    }

    #[test]
    fn test_mini_page_size_classes() {
        assert_eq!(
            BfTree::create_mem_page_size_classes(48, 1952, 4096, 64, false),
            vec![128, 192, 256, 512, 960, 1856, 2048, 4096]
        );
        assert_eq!(
            BfTree::create_mem_page_size_classes(1544, 1544, 3136, 64, true),
            vec![1536, 3136]
        );
        assert_eq!(
            BfTree::create_mem_page_size_classes(48, 3072, 12288, 64, false),
            vec![128, 192, 256, 512, 960, 1856, 3648, 7232, 9088, 12288]
        );
        assert_eq!(
            BfTree::create_mem_page_size_classes(4, 1952, 4096, 32, false),
            vec![64, 128, 256, 448, 832, 1600, 2048, 4096]
        );
    }

    #[test]
    fn test_invalid_config_to_build_bf_tree() {
        // Min record too small
        let mut config = crate::Config::default();
        config.cb_min_record_size(4);
        config.leaf_page_size(32 * 1024);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::MinimumRecordSize(_)),
            "Expected InvalidMinimumRecordSize error",
        );

        // Max record too large
        let mut config = crate::Config::default();
        config.cb_max_record_size(64 * 1024);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::MaximumRecordSize(_)),
            "Expected InvalidMaximumRecordSize error",
        );

        // Leaf page size not aligned
        let mut config = crate::Config::default();
        config.leaf_page_size(4050);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::LeafPageSize(_)),
            "Expected InvalidLeafPageSize error",
        );

        // Circular buffer size too small
        let mut config = crate::Config::default();
        config.leaf_page_size(16 * 1024);
        config.cb_size_byte(16 * 1024);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::CircularBufferSize(_)),
            "Expected InvalidCircularBufferSize error",
        );

        // Circular buffer size not power of two
        let mut config = crate::Config::default();
        config.cb_size_byte(20 * 1024);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::CircularBufferSize(_)),
            "Expected InvalidCircularBufferSize error",
        );

        // Cache-only mode specific
        let mut config = crate::Config::default();
        config.cache_only(true);
        config.cb_size_byte(2 * 4096);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::CircularBufferSize(_)),
            "Expected InvalidCircularBufferSize error",
        );
    }
}