Skip to main content

bf_tree/
tree.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4#[cfg(not(all(feature = "shuttle", test)))]
5use rand::Rng;
6
7#[cfg(all(feature = "shuttle", test))]
8use shuttle::rand::Rng;
9
10use cfg_if::cfg_if;
11
12cfg_if! {
13    if #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))] {
14        use crate::metric::{Timer, TlsRecorder, timer::{TimerRecorder, DebugTimerGuard}};
15        use std::cell::UnsafeCell;
16        use thread_local::ThreadLocal;
17    }
18}
19
20use crate::{
21    check_parent,
22    circular_buffer::{CircularBufferMetrics, TombstoneHandle},
23    counter,
24    error::{ConfigError, TreeError},
25    histogram, info,
26    mini_page_op::{upgrade_to_full_page, LeafEntryXLocked, LeafOperations, ReadResult},
27    nodes::{
28        leaf_node::{LeafKVMeta, LeafReadResult, MiniPageNextLevel, OpType},
29        InnerNode, InnerNodeBuilder, LeafNode, PageID, CACHE_LINE_SIZE, DISK_PAGE_SIZE,
30        INNER_NODE_SIZE, MAX_KEY_LEN, MAX_LEAF_PAGE_SIZE, MAX_VALUE_LEN,
31    },
32    range_scan::{ScanIter, ScanIterMut, ScanReturnField},
33    snapshot::{CPRSnapShotMgr, PhaseId},
34    storage::{LeafStorage, PageLocation, PageTable},
35    sync::{
36        atomic::{AtomicU64, Ordering},
37        Arc,
38    },
39    utils::{get_rng, inner_lock::ReadGuard, Backoff, BfsVisitor, NodeInfo},
40    wal::{WriteAheadLog, WriteOp},
41    Config, StorageBackend,
42};
43use std::path::Path;
44
/// The bf-tree instance.
///
/// Owns the leaf storage (circular buffer plus page mapping table), the optional
/// write-ahead log and the shared configuration. `Drop` walks every node reachable
/// from the root and frees it.
pub struct BfTree {
    /// Encoded root pointer: when `ROOT_IS_LEAF_MASK` is set, the remaining bits are a
    /// mapping-table page id; otherwise the value is the address of the root `InnerNode`.
    pub(crate) root_page_id: AtomicU64,
    /// Leaf page storage, including the page mapping table.
    pub(crate) storage: LeafStorage,
    /// Write-ahead log; `Some` only when the config provides a WAL configuration.
    pub(crate) wal: Option<Arc<WriteAheadLog>>,
    /// Shared tree configuration.
    pub(crate) config: Arc<Config>,
    /// Copied from `Config::write_load_full_page` at construction.
    /// NOTE(review): its exact effect is defined by readers not visible in this file.
    pub(crate) write_load_full_page: bool,
    /// If true, there is no permanent storage layer and thus no durability guarantee for
    /// any upsert in the tree.
    pub(crate) cache_only: bool,
    /// Size classes of mini pages, in ascending order.
    pub(crate) mini_page_size_classes: Vec<usize>,
    /// Manager of a CPR snapshot; `Some` only when `Config::use_snapshot` is set.
    pub(crate) snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
    /// Per-tree metrics recorder, only compiled in under the "metrics-rt-debug" features.
    #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
    pub metrics_recorder: Option<Arc<ThreadLocal<UnsafeCell<TlsRecorder>>>>,
}
58
// SAFETY: NOTE(review): these impls override the compiler's auto-trait analysis, which
// presumably rejects `BfTree` because some field (e.g. raw pointers inside `LeafStorage`,
// or the `UnsafeCell` held by `metrics_recorder`) is not `Send`/`Sync`. Soundness relies on
// the tree's own latching (mapping-table x-locks, `ReadGuard` on inner nodes) serializing
// every access to that shared state — confirm and document the exact invariant.
unsafe impl Sync for BfTree {}

// SAFETY: see the note on the `Sync` impl above; the same invariant is assumed here.
unsafe impl Send for BfTree {}
62
/// Outcome of inserting a key/value pair into a leaf.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeafInsertResult {
    /// The pair was inserted.
    Success,
    /// The pair was rejected as invalid; the `String` presumably explains why
    /// (NOTE(review): producers are not visible in this file).
    InvalidKV(String),
}
68
/// Reasons a scan iterator could not be set up.
///
/// NOTE(review): the variant descriptions below are inferred from their names; the code
/// that produces them is not visible in this file.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ScanIterError {
    /// Presumably: scanning is not supported on a cache-only tree.
    CacheOnlyMode,
    /// Presumably: the start key was rejected.
    InvalidStartKey,
    /// Presumably: the end key was rejected.
    InvalidEndKey,
    /// Presumably: the requested record count was rejected.
    InvalidCount,
    /// Presumably: the start/end keys do not form a valid range.
    InvalidKeyRange,
}
77
/// Tear down the tree: stop the WAL background job (if any), then visit every node and
/// release it.
impl Drop for BfTree {
    fn drop(&mut self) {
        // Stop the WAL's background job before any page memory is released.
        if let Some(ref wal) = self.wal {
            wal.stop_background_job();
        }

        // NOTE(review): inner nodes are freed while the visitor is still iterating; this is
        // only sound if `BfsVisitor` has finished reading a node's children before it yields
        // that node — confirm in `utils::BfsVisitor`.
        let visitor = BfsVisitor::new_all_nodes(self);
        for node_info in visitor {
            match node_info {
                // Leaves live behind the mapping table; the x-locked entry releases its own page.
                NodeInfo::Leaf { page_id, .. } => {
                    let mut leaf = self.mapping_table().get_mut(&page_id);
                    leaf.dealloc_self(&self.storage, self.cache_only);
                }
                // Inner nodes are raw heap nodes referenced by pointer.
                NodeInfo::Inner { ptr, .. } => {
                    if unsafe { &*ptr }.is_valid_disk_offset() {
                        let disk_offset = unsafe { &*ptr }.disk_offset;
                        if self.config.storage_backend == StorageBackend::Memory {
                            // special case for memory backend, we need to deallocate the memory.
                            self.storage.vfs.dealloc_offset(disk_offset as usize);
                        }
                    }
                    InnerNode::free_node(ptr as *mut InnerNode);
                }
            }
        }
    }
}
105
106impl Default for BfTree {
107    fn default() -> Self {
108        Self::new(":memory:", 1024 * 1024 * 32).unwrap()
109    }
110}
111
112impl BfTree {
    /// Tag bit in `root_page_id` marking a leaf root.
    ///
    /// When set, the remaining bits are a mapping-table page id. When clear, the whole
    /// value is the address of the root `InnerNode`.
    ///
    /// This is quite error-prone: make sure the mask does not conflict with the page id
    /// definition, or with any bit a valid `InnerNode` pointer can have set.
    pub(crate) const ROOT_IS_LEAF_MASK: u64 = 0x8000_0000_0000_0000;
114
115    /// Create the size classes of all memory pages in acending order based on the record size (key + value) and the leaf page size
116    /// [s_0, s_1, ..., s_x], ascending order.
117    /// Each s_i is of size 2^i * c + size_of(LeafNode)
118    /// where c = (min_record_size + LeafKVMeta) aligned on CACHE_LINE_SIZE and 2^i * c <= leaf_page_size and s_x = leaf_page_size
119    ///
120    /// In non cache-only mode, the largest mini page size is s_(x-1) and full page/leaf base page size is s_x.
121    /// Currently, the design assumes a mini-page can always be successfully merged into a leaf page in one pass (including at most one base page split).
122    /// As such, the following three conditions are sufficient in preventing merge failures.
123    /// C1) x >= 1
124    /// C2) s_x > s_{x-1}
125    /// C3) max_record_size + SizeOf(KVMeta) <= (s_x - s_{x-1} - 2 * (fence_meta + max_key_len)).
126    ///
127    /// In cache-only mode, the largest mini page is s_x as there is no full nor base page.
128    /// Although there is no merging of mini-pages, the design assumes a new record can always be successfully inserted into a mini-page in one pass
129    /// (including at most one mini-page split). As such, the following sufficient condition is required.
130    /// C1) max_record_page + Sizeof(KVMeta) <= (s_x - SizeOf(NodeMeta)) / 2
131    /// C2) if x >= 1, s_x >= s_{x-1} + max_record_size + SizeOf(KVMeta)
132    ///
133    /// C2) is necessary for cache-only mode to guarantee that a mini-page can grow to full page size before being split up to two full-sized pages
134    pub(crate) fn create_mem_page_size_classes(
135        min_record_size_in_byte: usize,
136        max_record_size_in_byte: usize,
137        leaf_page_size_in_byte: usize,
138        max_fence_len_in_byte: usize,
139        cache_only: bool,
140    ) -> Vec<usize> {
141        // Sanity check of the input parameters
142        assert!(
143            min_record_size_in_byte > 1,
144            "cb_min_record_size in config cannot be less than 2"
145        );
146        assert!(
147            min_record_size_in_byte <= max_record_size_in_byte,
148            "cb_min_record_size cannot be larger than cb_max_record_size"
149        );
150        assert!(
151            max_fence_len_in_byte > 0,
152            "max_fence_len in config cannot be zero"
153        );
154        assert!(
155            max_fence_len_in_byte / 2 < max_record_size_in_byte,
156            "max_fence_len/2 cannot be larger than cb_max_record_size"
157        );
158        assert!(
159            leaf_page_size_in_byte <= MAX_LEAF_PAGE_SIZE,
160            "leaf_page_size in config cannot be larger than {}",
161            MAX_LEAF_PAGE_SIZE
162        );
163        assert!(
164            max_fence_len_in_byte / 2 <= MAX_KEY_LEN,
165            "max_key_len in config cannot be larger than {}",
166            MAX_KEY_LEN
167        );
168        assert!(
169            leaf_page_size_in_byte / min_record_size_in_byte <= 4096,
170            "Maximum number of records per page (leaf_page_size/min_record_size) cannot exceed 2^12.", // This is restricted by #bits for value count in NodeMeta
171        );
172
173        // In non cache-only mode, the leaf page should be in the multiple of DISK_PAGE_SIZE.
174        // In cache-only mode, the leaf page should be aligned on cache line size.
175        if !cache_only {
176            assert!(
177                leaf_page_size_in_byte.is_multiple_of(DISK_PAGE_SIZE),
178                "leaf_page_size in config should be multiple of {}",
179                DISK_PAGE_SIZE
180            );
181        } else {
182            assert!(
183                leaf_page_size_in_byte.is_multiple_of(CACHE_LINE_SIZE),
184                "leaf_page_size in config should be multiple of {}",
185                CACHE_LINE_SIZE
186            );
187        }
188
189        let max_record_size_with_meta = max_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
190        let mut max_mini_page_size: usize;
191
192        if cache_only {
193            // Guarantee C1), C2) for cache-only mode
194            max_mini_page_size = leaf_page_size_in_byte - max_record_size_with_meta;
195            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
196
197            assert!(
198                leaf_page_size_in_byte
199                    >= 2 * max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
200                "cb_max_record_size of config should be <= {}",
201                (leaf_page_size_in_byte - std::mem::size_of::<LeafNode>()) / 2
202                    - std::mem::size_of::<LeafKVMeta>()
203            );
204        } else {
205            // Guarantee C1), C2) and C3) for non cache-only mode
206            max_mini_page_size = leaf_page_size_in_byte
207                - max_record_size_with_meta
208                - max_fence_len_in_byte
209                - 2 * std::mem::size_of::<LeafKVMeta>();
210            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
211
212            assert!(
213                max_mini_page_size >= max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
214                "cb_max_record_size of config should be <= {}",
215                max_mini_page_size
216                    - std::mem::size_of::<LeafNode>()
217                    - std::mem::size_of::<LeafKVMeta>()
218            );
219        }
220
221        // Generate all size classes for mini-pages
222        let mut mem_page_size_classes = Vec::new();
223
224        let base: i32 = 2;
225        let mut record_num_per_page_exp: u32 = 0;
226        let c: usize = min_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
227
228        // No need to consider fence here as mini-pages have no fences.
229        let mut size_class =
230            base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
231
232        // Memory page size is aligned on cache line size
233        if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
234            size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
235        }
236
237        while size_class <= max_mini_page_size {
238            if mem_page_size_classes.is_empty()
239                || (mem_page_size_classes[mem_page_size_classes.len() - 1] < size_class)
240            {
241                mem_page_size_classes.push(size_class);
242            }
243
244            record_num_per_page_exp += 1;
245            size_class =
246                base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
247
248            if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
249                size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
250            }
251        }
252
253        if !cache_only {
254            assert!(!mem_page_size_classes.is_empty());
255        }
256
257        // Add the largest mini page size if not already added
258        if mem_page_size_classes.is_empty()
259            || mem_page_size_classes[mem_page_size_classes.len() - 1] < max_mini_page_size
260        {
261            mem_page_size_classes.push(max_mini_page_size);
262        }
263
264        // The largest page size is the full leaf page size
265        if mem_page_size_classes.is_empty()
266            || mem_page_size_classes[mem_page_size_classes.len() - 1] < leaf_page_size_in_byte
267        {
268            mem_page_size_classes.push(leaf_page_size_in_byte);
269        }
270
271        if !cache_only {
272            assert!(mem_page_size_classes.len() >= 2);
273        } else {
274            assert!(!mem_page_size_classes.is_empty());
275        }
276
277        mem_page_size_classes
278    }
279
280    /// Create a new bf-tree instance with customized storage backend and
281    /// circular buffer size
282    ///
283    /// For in-memory tree, use `:memory:` as file path.
284    /// For cache-only tree, use `:cache:` as file path
285    /// For disk tree, file_path is the path to the index file
286    ///
287    /// Mini page cache must be at least 8192 bytes for practical workloads.
288    ///
289    /// ```
290    /// use bf_tree::BfTree;
291    /// let tree = BfTree::new(":memory:", 8192).unwrap();
292    /// ```
293    pub fn new(file_path: impl AsRef<Path>, cache_size_byte: usize) -> Result<Self, ConfigError> {
294        let config = Config::new(file_path, cache_size_byte);
295        Self::with_config(config, None)
296    }
297
298    /// Create a new bf-tree instance with customized configuration based on
299    /// a config file
300    pub fn new_with_config_file<P: AsRef<Path>>(config_file_path: P) -> Result<Self, ConfigError> {
301        let config = Config::new_with_config_file(config_file_path);
302        Self::with_config(config, None)
303    }
304
305    /// Initialize the bf-tree with provided config. For advanced user only.
306    /// An optional pre-allocated buffer pointer can be provided to use as the buffer pool memory.
307    pub fn with_config(config: Config, buffer_ptr: Option<*mut u8>) -> Result<Self, ConfigError> {
308        // Validate the config first
309        config.validate()?;
310
311        let wal = match config.write_ahead_log.as_ref() {
312            Some(wal_config) => {
313                let wal = WriteAheadLog::new(wal_config.clone());
314                Some(wal)
315            }
316            None => None,
317        };
318        let write_load_full = config.write_load_full_page;
319        let config = Arc::new(config);
320
321        // In cache-only mode, the initial root page is a full mini-page
322        if config.cache_only {
323            let snapshot_mgr = if config.use_snapshot {
324                Some(Arc::new(CPRSnapShotMgr::new(config.snapshot_version)))
325            } else {
326                None
327            };
328
329            let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr, snapshot_mgr.clone());
330
331            // CPR snapshot guard for proper snapshot version setting of the root page
332            // No need for CPR logic
333            let snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(snapshot_mgr.clone()) {
334                Ok(guard) => guard,
335                Err(()) => {
336                    panic!("Failed to acquire a snapshot guard for root page initialization");
337                }
338            };
339
340            // Assuming CB can accommodate at least 2 leaf pages at the same time
341            let mini_page_guard = (leaf_storage)
342                .alloc_mini_page(config.leaf_page_size)
343                .expect("Fail to allocate a mini-page as initial root node");
344            LeafNode::initialize_mini_page(
345                &mini_page_guard,
346                config.leaf_page_size,
347                MiniPageNextLevel::new_null(),
348                true,
349                snapshot_guard.snapshot_version(),
350            );
351            let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
352            let mini_loc = PageLocation::Mini(new_mini_ptr);
353
354            let (root_id, root_lock) = leaf_storage
355                .mapping_table()
356                .insert_mini_page_mapping(mini_loc);
357            assert_eq!(root_id.as_id(), 0);
358
359            drop(root_lock);
360            drop(mini_page_guard);
361            let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
362
363            return Ok(Self {
364                root_page_id: AtomicU64::new(root_id),
365                storage: leaf_storage,
366                wal,
367                cache_only: config.cache_only,
368                write_load_full_page: write_load_full,
369                mini_page_size_classes: Self::create_mem_page_size_classes(
370                    config.cb_min_record_size,
371                    config.cb_max_record_size,
372                    config.leaf_page_size,
373                    config.max_fence_len,
374                    config.cache_only,
375                ),
376                snapshot_mgr,
377                config,
378                #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
379                metrics_recorder: Some(Arc::new(ThreadLocal::new())),
380            });
381        }
382
383        let snapshot_mgr = if config.use_snapshot {
384            Some(Arc::new(CPRSnapShotMgr::new(config.snapshot_version)))
385        } else {
386            None
387        };
388
389        let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr, snapshot_mgr.clone());
390
391        // CPR snapshot guard for proper snapshot version setting of the root page
392        // No need for CPR logic
393        let snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(snapshot_mgr.clone()) {
394            Ok(guard) => guard,
395            Err(()) => {
396                panic!("Failed to acquire a snapshot guard for root page initialization");
397            }
398        };
399
400        let (root_id, root_lock) = leaf_storage
401            .mapping_table()
402            .alloc_base_page_mapping(snapshot_guard.snapshot_version());
403        drop(root_lock);
404        assert_eq!(root_id.as_id(), 0);
405
406        let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
407        Ok(Self {
408            root_page_id: AtomicU64::new(root_id),
409            storage: leaf_storage,
410            wal,
411            cache_only: config.cache_only,
412            write_load_full_page: write_load_full,
413            mini_page_size_classes: Self::create_mem_page_size_classes(
414                config.cb_min_record_size,
415                config.cb_max_record_size,
416                config.leaf_page_size,
417                config.max_fence_len,
418                config.cache_only,
419            ),
420            snapshot_mgr,
421            config,
422            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
423            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
424        })
425    }
426
    /// Borrow the configuration this tree was built with.
    pub fn config(&self) -> &Config {
        &self.config
    }
430
    /// Get the metrics of the circular buffer backing the leaf storage.
    ///
    /// This is a blocking call: it stops all other buffer operations while it runs,
    /// so use it sparingly.
    pub fn get_buffer_metrics(&self) -> CircularBufferMetrics {
        self.storage.get_buffer_metrics()
    }
437
438    /// returns the root page id and whether it is a leaf node.
439    pub(crate) fn get_root_page(&self) -> (PageID, bool) {
440        let root_page_id = self.root_page_id.load(Ordering::Acquire);
441        let root_is_leaf = (root_page_id & Self::ROOT_IS_LEAF_MASK) != 0;
442        let clean = root_page_id & (!Self::ROOT_IS_LEAF_MASK);
443
444        let page_id = if root_is_leaf {
445            PageID::from_id(clean)
446        } else {
447            PageID::from_pointer(clean as *const InnerNode)
448        };
449
450        (page_id, root_is_leaf)
451    }
452
    /// Borrow the page mapping table owned by the leaf storage.
    pub(crate) fn mapping_table(&self) -> &PageTable {
        self.storage.mapping_table()
    }
456
457    pub(crate) fn should_promote_read(&self) -> bool {
458        get_rng().random_range(0..100) < self.config.read_promotion_rate.load(Ordering::Relaxed)
459    }
460
    /// Roll a uniform value in `0..100` and report whether it falls below the current
    /// `scan_promotion_rate`, so the rate acts as a percentage chance of promoting a
    /// scanned page.
    pub(crate) fn should_promote_scan_page(&self) -> bool {
        get_rng().random_range(0..100) < self.config.scan_promotion_rate.load(Ordering::Relaxed)
    }
464
465    /// Chance% to promote a base read record to mini page.
466    pub fn update_read_promotion_rate(&self, new_rate: usize) {
467        self.config
468            .read_promotion_rate
469            .store(new_rate, Ordering::Relaxed);
470    }
471
    /// Split the leaf page `cur_page_id` if its split flag is set. Only a leaf that is the
    /// root of the tree is handled here.
    ///
    /// `parent` must be `None`; a `Some` parent reaches `unreachable!`. On a split:
    /// - the current page is split into itself and a newly allocated sibling via
    ///   `LeafNode::split`, which returns the separator key;
    /// - a new `InnerNode` root is built, with `cur_page_id` as left-most child and the
    ///   single record `split_key -> sibling_id`;
    /// - the new root is published in `root_page_id` with `Release` ordering.
    ///
    /// Return values:
    /// - `Ok(false)` when the split flag is not set.
    /// - `Ok(true)` after the new root has been installed.
    /// - `Err(TreeError::NeedRestart)` when no CPR snapshot guard can be acquired, or when,
    ///   in the `Prepare` phase, the root already carries a newer clean snapshot version
    ///   than this thread.
    ///
    /// `check_parent!` (defined elsewhere) may also return early.
    ///
    /// # Panics
    /// In cache-only mode, panics if the root is not a mini-page, or if no
    /// leaf-page-sized mini-page can be allocated for the sibling.
    fn try_split_leaf(
        &self,
        cur_page_id: PageID,
        parent: &Option<ReadGuard>,
    ) -> Result<bool, TreeError> {
        debug_assert!(cur_page_id.is_id());

        // here we need to acquire x-lock for performance reason:
        // if we acquire s-lock, it's very difficult for us to later upgrade to x-lock, because rwlock favors readers:
        //      consider readers keep coming, we will never be able to upgrade to x-lock.
        // The x-locked entry (`cur_page`, later moved into `x_page`) is held until this function returns.
        let mut cur_page = self.mapping_table().get_mut(&cur_page_id);

        check_parent!(self, cur_page_id, parent);

        let should_split = cur_page.get_split_flag();
        if !should_split {
            return Ok(false);
        }

        // CPR snapshot guard; if one cannot be obtained, the caller must restart.
        let snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(self.snapshot_mgr.clone()) {
            Ok(guard) => guard,
            Err(()) => {
                return Err(TreeError::NeedRestart);
            }
        };

        match parent {
            Some(_) => {
                unreachable!("Leaf node split should not happen here");
            }
            None => {
                // Reconcile the root page's clean snapshot version with this thread's CPR
                // phase before the split modifies it (only when the guard is protected).
                if snapshot_guard.is_protected() {
                    let local_thread_snapshot_version = snapshot_guard.snapshot_version();
                    match snapshot_guard.get_local_phase_id() {
                        // Rest: raise an older clean version to this thread's version.
                        PhaseId::Rest => {
                            if self.cache_only {
                                let root_page_loc = cur_page.get_page_location().clone();
                                match root_page_loc {
                                    PageLocation::Mini(ptr) => {
                                        let root_page = cur_page.load_cache_page_mut(ptr);
                                        if root_page.get_clean_snapshot_version()
                                            < local_thread_snapshot_version
                                        {
                                            root_page.set_snapshot_version(
                                                local_thread_snapshot_version,
                                                false,
                                            );
                                        }
                                    }
                                    _ => {
                                        panic!(
                                            "The root node is not a mini-page in cache-only mode"
                                        )
                                    }
                                }
                            } else {
                                let root_page = cur_page.load_base_page_mut();
                                if root_page.get_clean_snapshot_version()
                                    < local_thread_snapshot_version
                                {
                                    root_page
                                        .set_snapshot_version(local_thread_snapshot_version, false);
                                }
                            }
                        }
                        // Prepare: a root already stamped with a newer version than this
                        // thread's forces a restart.
                        PhaseId::Prepare => {
                            if self.cache_only {
                                let root_page_loc = cur_page.get_page_location().clone();
                                match root_page_loc {
                                    PageLocation::Mini(ptr) => {
                                        let root_page = cur_page.load_cache_page_mut(ptr);
                                        if root_page.get_clean_snapshot_version()
                                            > local_thread_snapshot_version
                                        {
                                            return Err(TreeError::NeedRestart);
                                        }
                                    }
                                    _ => {
                                        panic!(
                                            "The root node is not a mini-page in cache-only mode"
                                        )
                                    }
                                }
                            } else {
                                let root_page = cur_page.load_base_page_mut();
                                if root_page.get_clean_snapshot_version()
                                    > local_thread_snapshot_version
                                {
                                    return Err(TreeError::NeedRestart);
                                }
                            }
                        }
                        // InProgress / Sweep: if the root's clean version is older, first copy
                        // its current `node_size` bytes into the snapshot, then bump its version
                        // and register it via `snapshot_root_page`.
                        PhaseId::InProgress | PhaseId::Sweep => {
                            if self.cache_only {
                                let root_page_loc = cur_page.get_page_location().clone();
                                match root_page_loc {
                                    PageLocation::Mini(ptr) => {
                                        let root_page = cur_page.load_cache_page_mut(ptr);
                                        if root_page.get_clean_snapshot_version()
                                            < local_thread_snapshot_version
                                        {
                                            let root_page_ptr = unsafe {
                                                std::slice::from_raw_parts(
                                                    root_page as *const LeafNode as *const u8,
                                                    root_page.meta.node_size as usize,
                                                )
                                            };
                                            snapshot_guard.snapshot_mini_page(
                                                cur_page_id,
                                                root_page_ptr,
                                                root_page.meta.node_size as usize,
                                            );
                                            root_page.set_snapshot_version(
                                                local_thread_snapshot_version,
                                                false,
                                            );
                                            snapshot_guard.snapshot_root_page(cur_page_id);
                                        }
                                    }
                                    _ => {
                                        panic!(
                                            "The root node is not a mini-page in cache-only mode"
                                        )
                                    }
                                }
                            } else {
                                let root_page = cur_page.load_base_page_mut();
                                if root_page.get_clean_snapshot_version()
                                    < local_thread_snapshot_version
                                {
                                    let root_page_ptr = unsafe {
                                        std::slice::from_raw_parts(
                                            root_page as *const LeafNode as *const u8,
                                            root_page.meta.node_size as usize,
                                        )
                                    };
                                    snapshot_guard.snapshot_base_page(
                                        cur_page_id,
                                        root_page_ptr,
                                        root_page.meta.node_size as usize,
                                    );
                                    root_page
                                        .set_snapshot_version(local_thread_snapshot_version, false);
                                    snapshot_guard.snapshot_root_page(cur_page_id);
                                }
                            }
                        }
                    }
                }

                // only for the case of root node split

                // In cache-only mode, the root mini-page node is split into two equal-sized mini-pages
                if self.cache_only {
                    // Create a new mini-page of the same size as the current root node
                    // Assuming CB is at least able to hold two leaf-page sized mini-pages
                    let mini_page_guard = self
                        .storage
                        .alloc_mini_page(self.config.leaf_page_size)
                        .expect("Fail to allocate a mini-page during root split");
                    LeafNode::initialize_mini_page(
                        &mini_page_guard,
                        self.config.leaf_page_size,
                        MiniPageNextLevel::new_null(),
                        true,
                        snapshot_guard.snapshot_version(),
                    );
                    let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                    let mini_loc = PageLocation::Mini(new_mini_ptr);

                    // Insert the new page into mapping table; its lock is held until return.
                    let (sibling_id, _mini_lock) = self
                        .storage
                        .mapping_table()
                        .insert_mini_page_mapping(mini_loc);

                    // Split current page with the newly created mini page
                    let cur_page_loc = cur_page.get_page_location().clone();
                    match cur_page_loc {
                        PageLocation::Mini(ptr) => {
                            let cur_mini_page = cur_page.load_cache_page_mut(ptr);
                            let sibling_page = unsafe { &mut *new_mini_ptr };
                            let split_key = cur_mini_page.split(
                                sibling_page,
                                true,
                                snapshot_guard.snapshot_version(),
                            );

                            // New root: left-most child is the old root, one separator -> sibling.
                            // No disk offset is set here, unlike the non cache-only path below.
                            let mut new_root_builder = InnerNodeBuilder::new();
                            new_root_builder
                                .set_left_most_page_id(cur_page_id)
                                .set_children_is_leaf(true)
                                .add_record(split_key, sibling_id);

                            let new_root_ptr =
                                new_root_builder.build(snapshot_guard.snapshot_version());
                            unsafe { (*new_root_ptr).set_root(true) };
                            // Publish the new root; the leaf tag bit is not set (inner root).
                            self.root_page_id
                                .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                            info!(sibling = sibling_id.raw(), "New root node installed!");

                            debug_assert!(cur_mini_page.meta.meta_count_with_fence() > 0);
                            debug_assert!(sibling_page.meta.meta_count_with_fence() > 0);

                            return Ok(true);
                        }
                        _ => {
                            panic!("The root node is not a mini-page in cache-only mode")
                        }
                    }
                }

                // Non cache-only mode: split the root base page into a new base-page sibling.
                let mut x_page = cur_page;

                // Guarantee: the new sibling node has the same snapshot version as the current thread.
                let (sibling_id, mut sibling_entry) =
                    self.alloc_base_page_and_lock(snapshot_guard.snapshot_version());

                info!(sibling = sibling_id.raw(), "Splitting root node!");

                let sibling = sibling_entry.load_base_page_mut();

                let leaf_node = x_page.load_base_page_mut();
                let split_key = leaf_node.split(sibling, false, snapshot_guard.snapshot_version());

                // Guarantee: The new root node has the same snapshot version as the current thread.
                // The new root also gets an INNER_NODE_SIZE disk offset from the storage layer.
                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page_id)
                    .set_children_is_leaf(true)
                    .add_record(split_key, sibling_id);

                let new_root_ptr = new_root_builder.build(snapshot_guard.snapshot_version());

                unsafe { (*new_root_ptr).set_root(true) };

                // Publish the new root; the leaf tag bit is not set (inner root).
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(sibling = sibling_id.raw(), "New root node installed!");
                Ok(true)
            }
        }
    }
719
720    fn alloc_base_page_and_lock(&self, snapshot_version: u64) -> (PageID, LeafEntryXLocked<'_>) {
721        let (pid, base_entry) = self
722            .mapping_table()
723            .alloc_base_page_mapping(snapshot_version);
724
725        (pid, base_entry)
726    }
727
    /// Splits the inner node `cur_page` if its split flag is set.
    ///
    /// `parent` is the caller's read guard on the parent of `cur_page`, or `None` when
    /// `cur_page` is treated as the root.
    ///
    /// Returns:
    /// - `Ok((false, parent))` when the split flag is not set; `parent` is handed back unchanged.
    /// - `Ok((true, Some(guard)))` after splitting a non-root node, where `guard` is the parent
    ///   downgraded from the write lock taken for the split.
    /// - `Ok((true, None))` after splitting the root and publishing a new root above it.
    ///
    /// Errors:
    /// - `TreeError::NeedRestart` when no CPR snapshot guard can be obtained, when a page
    ///   involved in the split was already snapshotted at a newer version during
    ///   `PhaseId::Prepare`, or when the parent has no room for the separator key (the
    ///   parent's split flag is set before returning).
    /// - Errors from taking or upgrading the node locks are propagated unchanged.
    ///
    /// `check_parent!` validates `parent` against `cur_page`; see the macro for its exact
    /// early-return behavior.
    fn try_split_inner<'a>(
        &self,
        cur_page: PageID,
        parent: Option<ReadGuard<'a>>,
    ) -> Result<(bool, Option<ReadGuard<'a>>), TreeError> {
        let cur_node = ReadGuard::try_read(cur_page.as_inner_node())?;

        check_parent!(self, cur_page, parent);

        let should_split = cur_node.as_ref().meta.get_split_flag();

        // Nothing to do: give the parent guard back so the caller can keep descending.
        if !should_split {
            return Ok((false, parent));
        }

        info!(has_parent = parent.is_some(), "split inner node");

        // CPR snapshot guard
        let snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(self.snapshot_mgr.clone()) {
            Ok(guard) => guard,
            Err(()) => {
                return Err(TreeError::NeedRestart);
            }
        };

        match parent {
            // Non-root split: the separator key is pushed into the existing parent.
            Some(p) => {
                // Both the node being split and its parent are modified, so write-lock both.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;
                let mut x_parent = p.upgrade().map_err(|(_x, e)| e)?;

                // CPR snapshot
                // Bring both pages in line with this thread's snapshot version before they are
                // modified. Phases/conditions not matched below need no action.
                if snapshot_guard.is_protected() {
                    let local_thread_snapshot_version = snapshot_guard.snapshot_version();

                    match snapshot_guard.get_local_phase_id() {
                        // Rest: stamp any page whose clean version is older than this thread's.
                        PhaseId::Rest => {
                            if x_parent.as_ref().get_clean_snapshot_version()
                                < local_thread_snapshot_version
                            {
                                x_parent
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);
                            }

                            if x_cur.as_ref().get_clean_snapshot_version()
                                < local_thread_snapshot_version
                            {
                                x_cur
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);
                            }
                        }
                        PhaseId::Prepare
                            if x_parent.as_ref().get_clean_snapshot_version()
                                > local_thread_snapshot_version
                                || x_cur.as_ref().get_clean_snapshot_version()
                                    > local_thread_snapshot_version =>
                        {
                            // Abort if either page has been taken a snapshot before.
                            return Err(TreeError::NeedRestart);
                        }
                        // InProgress/Sweep: snapshot the pre-modification contents of any page
                        // older than this thread's version, then stamp it. If the parent is the
                        // root, the root page id is recorded in the snapshot as well.
                        PhaseId::InProgress | PhaseId::Sweep => {
                            if x_parent.as_ref().get_clean_snapshot_version()
                                < local_thread_snapshot_version
                            {
                                snapshot_guard.snapshot_inner_node(x_parent.as_ref());
                                x_parent
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);

                                if x_parent.as_ref().is_root() {
                                    let root_id = self.root_page_id.load(Ordering::Acquire);
                                    assert_eq!(
                                        root_id,
                                        PageID::from_pointer(x_parent.as_ref() as *const InnerNode)
                                            .raw()
                                    );

                                    snapshot_guard.snapshot_root_page(PageID::from_pointer(
                                        x_parent.as_ref() as *const InnerNode,
                                    ));
                                }
                            }

                            if x_cur.as_ref().get_clean_snapshot_version()
                                < local_thread_snapshot_version
                            {
                                snapshot_guard.snapshot_inner_node(x_cur.as_ref());
                                x_cur
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);
                            }
                        }
                        _ => {}
                    }
                }

                let split_key = x_cur.as_ref().get_split_key();

                // Guarantee: the new sibling node has the same snapshot version as the current thread.
                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));

                // The separator is inserted into the parent BEFORE `x_cur` is split, so a full
                // parent aborts the operation while `x_cur` is still untouched. The sibling's
                // page id is taken from the builder before the sibling is built.
                let success = x_parent
                    .as_mut()
                    .insert(&split_key, sibling_builder.get_page_id());
                if !success {
                    // Parent is full: flag it for splitting and let the caller restart.
                    // NOTE(review): the disk offset allocated for the sibling above is not
                    // released on this path — confirm whether that is intended.
                    x_parent.as_mut().meta.set_split_flag();
                    return Err(TreeError::NeedRestart);
                }

                // Move part of `x_cur`'s entries into the sibling builder, then build the sibling.
                x_cur
                    .as_mut()
                    .split(&mut sibling_builder, snapshot_guard.snapshot_version());

                sibling_builder.build(snapshot_guard.snapshot_version());

                Ok((true, Some(x_parent.downgrade())))
            }
            // Root split: the current node becomes the left-most child of a brand-new root.
            None => {
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;

                // CPR snapshot
                if snapshot_guard.is_protected() {
                    let local_thread_snapshot_version = snapshot_guard.snapshot_version();

                    match snapshot_guard.get_local_phase_id() {
                        PhaseId::Rest
                            if x_cur.as_ref().get_clean_snapshot_version()
                                < local_thread_snapshot_version =>
                        {
                            x_cur
                                .as_mut()
                                .set_snapshot_version(local_thread_snapshot_version);
                        }
                        PhaseId::Prepare
                            if x_cur.as_ref().get_clean_snapshot_version()
                                > local_thread_snapshot_version =>
                        {
                            // Abort if this page has already been snapshotted at a newer version.
                            return Err(TreeError::NeedRestart);
                        }
                        PhaseId::InProgress | PhaseId::Sweep
                            if x_cur.as_ref().get_clean_snapshot_version()
                                < local_thread_snapshot_version =>
                        {
                            snapshot_guard.snapshot_inner_node(x_cur.as_ref());
                            x_cur
                                .as_mut()
                                .set_snapshot_version(local_thread_snapshot_version);

                            // snapshot root
                            // With no parent, `x_cur` is expected to be the current root.
                            let root_id = self.root_page_id.load(Ordering::Acquire);
                            assert!(x_cur.as_ref().is_root());
                            assert_eq!(
                                root_id,
                                PageID::from_pointer(x_cur.as_ref() as *const InnerNode).raw()
                            );

                            snapshot_guard.snapshot_root_page(PageID::from_pointer(
                                x_cur.as_ref() as *const InnerNode
                            ));
                        }
                        _ => {}
                    }
                }

                // Guarantee: the new sibling node has the same snapshot version as the current thread.
                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));
                let sibling_id = sibling_builder.get_page_id();

                // Split the old root; it stops being the root once the new one is installed.
                let split_key = x_cur
                    .as_mut()
                    .split(&mut sibling_builder, snapshot_guard.snapshot_version());
                x_cur.as_mut().set_root(false);

                // Guarantee: The new root node has the same snapshot version as the current thread.
                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page)
                    .set_children_is_leaf(false)
                    .add_record(split_key, sibling_id);
                sibling_builder.build(snapshot_guard.snapshot_version());
                let new_root_ptr = new_root_builder.build(snapshot_guard.snapshot_version());

                unsafe { (*new_root_ptr).set_root(true) };

                // The new root is write-locked (held in `_x_root` until this function returns)
                // while its id is published through `root_page_id`.
                let _x_root = ReadGuard::try_read(new_root_ptr)
                    .unwrap()
                    .upgrade()
                    .unwrap();
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(
                    has_parent = parent.is_some(),
                    cur = cur_page.raw(),
                    "finished split inner node"
                );

                Ok((true, parent))
            }
        }
    }
934
    /// Descends from the root to the leaf page whose key range contains `key`.
    ///
    /// Returns the leaf's `PageID` together with the read guard on the inner node that pointed
    /// to it, or `None` as the guard when the root itself is a leaf.
    ///
    /// * `aggressive_split` — before visiting each node, try to split it (leaves through
    ///   `try_split_leaf`, only for ids that are not inner-node pointers; inner nodes through
    ///   `try_split_inner`). If any split happens, `TreeError::NeedRestart` is returned so the
    ///   caller retries from the new root.
    /// * `return_high_fence_key` — at every inner level, if a key follows the chosen child
    ///   slot, it is copied into `high_fence`, overwriting previous contents. The value left in
    ///   `high_fence` therefore comes from the deepest level that had a following key. If no
    ///   level had one, `high_fence` is left unchanged.
    /// * `high_fence` — output buffer for the above (used in cache-only mode).
    ///
    /// # Panics
    /// If `return_high_fence_key` is true, `high_fence` is `None`, and a following key is found.
    ///
    /// # Errors
    /// `TreeError::NeedRestart` after a split. Errors from `ReadGuard::try_read`, the split
    /// helpers, and `check_parent!` are propagated.
    pub(crate) fn traverse_to_leaf(
        &self,
        key: &[u8],
        aggressive_split: bool,
        return_high_fence_key: bool,
        mut high_fence: Option<&mut Vec<u8>>, // For cache-only mode, the high fence of the leaf node will be returned in this vector
    ) -> Result<(PageID, Option<ReadGuard<'_>>), TreeError> {
        let (mut cur_page, mut cur_is_leaf) = self.get_root_page();
        let mut parent: Option<ReadGuard> = None;

        loop {
            if aggressive_split {
                if cur_is_leaf
                    && !cur_page.is_inner_node_pointer()
                    && self.try_split_leaf(cur_page, &parent)?
                {
                    // The tree shape changed; restart the descent from the root.
                    return Err(TreeError::NeedRestart);
                } else if !cur_is_leaf {
                    // `try_split_inner` takes the parent guard by value and hands it back when
                    // no split was needed.
                    let (split_success, new_parent) = self.try_split_inner(cur_page, parent)?;
                    if split_success {
                        return Err(TreeError::NeedRestart);
                    } else {
                        parent = new_parent;
                    }
                }
            }

            if cur_is_leaf {
                return Ok((cur_page, parent));
            } else {
                // Read-lock the inner node, then let `check_parent!` validate it against the
                // guard of the previously visited node before following a child pointer.
                let next = ReadGuard::try_read(cur_page.as_inner_node())?;

                check_parent!(self, cur_page, parent);

                let next_node = next.as_ref();
                let next_is_leaf = next_node.meta.children_is_leaf();
                let pos = next_node.lower_bound(key);
                let kv_meta = next_node.get_kv_meta(pos as u16);
                cur_page = next_node.get_value(kv_meta);
                cur_is_leaf = next_is_leaf;

                // If required and exists, the next key at (pos + 1) is the high fence key
                if return_high_fence_key
                    && ((pos + 1) as u16) < next_node.meta.meta_count_with_fence()
                {
                    let hk_kv_meta = next_node.get_kv_meta((pos + 1) as u16);
                    let hk = next_node.get_full_key(hk_kv_meta);
                    if let Some(v) = high_fence.as_deref_mut() {
                        v.resize(hk.len(), 0u8);
                        v.copy_from_slice(&hk);
                    } else {
                        panic!("high_fence is None, but return_high_fence_key is true");
                    }
                }

                // This inner node becomes the parent guard for the next level.
                parent = Some(next);
            }
        }
    }
994
    /// Applies `write_op` once to the leaf page covering `write_op.key`; callers retry on error.
    ///
    /// * Null page location (only legal in cache-only mode, otherwise this panics): a
    ///   `OpType::Delete` is a no-op. Any other op allocates a mini-page sized for the record,
    ///   installs it as the entry's cache page, and inserts the record there. Nothing is
    ///   appended to the WAL on this path.
    /// * Any other page location: the write is delegated to `leaf_entry.insert`. If the cache
    ///   page is about to be evicted, a best-effort move to the buffer tail follows. Then, if
    ///   a WAL is configured, the op is appended with `append_and_wait` and the returned LSN
    ///   is recorded on the leaf.
    ///
    /// # Errors
    /// Propagates errors from `traverse_to_leaf`, `check_parent!`, `alloc_mini_page`
    /// (presumably `CircularBufferFull` when the buffer has no room — confirm), and
    /// `leaf_entry.insert`. Returns `TreeError::NeedRestart` if no CPR snapshot guard is
    /// available.
    fn write_inner(&self, write_op: WriteOp, aggressive_split: bool) -> Result<(), TreeError> {
        let (pid, parent) = self.traverse_to_leaf(write_op.key, aggressive_split, false, None)?;

        let mut leaf_entry = self.mapping_table().get_mut(&pid);

        check_parent!(self, pid, parent);

        let page_loc = leaf_entry.get_page_location();
        match page_loc {
            PageLocation::Null => {
                if !self.cache_only {
                    panic!("Found an Null page in non cache-only mode");
                }

                // There is no page to delete from, so a delete has nothing to do.
                if write_op.op_type == OpType::Delete {
                    return Ok(());
                }

                // Create a new mini-page to replace the null page
                let mini_page_size = LeafNode::get_chain_size_hint(
                    write_op.key.len(),
                    write_op.value.len(),
                    &self.mini_page_size_classes,
                    self.cache_only,
                );

                // CPR snapshot guard for proper snapshot version setting of the new mini-page
                let snapshot_guard =
                    match CPRSnapShotMgr::get_snapshot_guard(self.snapshot_mgr.clone()) {
                        Ok(guard) => guard,
                        Err(()) => {
                            return Err(TreeError::NeedRestart);
                        }
                    };

                let mini_page_guard = self.storage.alloc_mini_page(mini_page_size)?;
                LeafNode::initialize_mini_page(
                    &mini_page_guard,
                    mini_page_size,
                    MiniPageNextLevel::new_null(),
                    true,
                    snapshot_guard.snapshot_version(),
                );
                let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                let mini_loc = PageLocation::Mini(new_mini_ptr);

                leaf_entry.create_cache_page_loc(mini_loc);

                // The page was sized for this record, so the first insert is expected to fit.
                let mini_page_ref = leaf_entry.load_cache_page_mut(new_mini_ptr);
                let insert_success =
                    mini_page_ref.insert(write_op.key, write_op.value, write_op.op_type, 0);
                assert!(insert_success);

                debug_assert!(mini_page_ref.meta.meta_count_with_fence() > 0);
                counter!(InsertCreatedMiniPage);
            }
            _ => {
                leaf_entry.insert(
                    write_op.key,
                    write_op.value,
                    parent,
                    write_op.op_type,
                    &self.storage,
                    &self.write_load_full_page,
                    &self.cache_only,
                    &self.mini_page_size_classes,
                )?;

                if leaf_entry.cache_page_about_to_evict(&self.storage) {
                    // we don't care about the result here
                    _ = leaf_entry.move_cache_page_to_tail(&self.storage);
                }

                // Log the write (keyed by the leaf's disk offset) and stamp the leaf with the LSN.
                if let Some(wal) = &self.wal {
                    let lsn = wal.append_and_wait(&write_op, leaf_entry.get_disk_offset());
                    leaf_entry.update_lsn(lsn);
                }
            }
        }

        Ok(())
    }
1077
1078    /// Make sure you're not holding any lock while calling this function.
1079    pub(crate) fn evict_from_circular_buffer(&self) -> Result<usize, TreeError> {
1080        // Why we need to evict multiple times?
1081        // because we don't want each alloc to trigger evict, i.e., we want alloc to fail less often.
1082        // with default 1024 bytes, one eviction allows us to alloc 1024 bytes (4 256-byte mini pages) without failure.
1083        const TARGET_EVICT_SIZE: usize = 1024;
1084        let mut evicted = 0;
1085
1086        // A corner case: we may not have enough memory to evict (i.e., the buffer might be empty now)
1087        let mut retry_cnt = 0;
1088
1089        while evicted < TARGET_EVICT_SIZE && retry_cnt < 10 {
1090            let n = self
1091                .storage
1092                .evict_from_buffer(|mini_page_handle: &TombstoneHandle| {
1093                    eviction_callback(mini_page_handle, self)
1094                })?;
1095            evicted += n as usize;
1096            retry_cnt += 1;
1097        }
1098        info!("stopped evict from circular buffer");
1099        Ok(evicted)
1100    }
1101
1102    /// Insert a key-value pair to the system, overrides existing value if present.
1103    ///
1104    /// ```
1105    /// use bf_tree::BfTree;
1106    /// use bf_tree::LeafReadResult;
1107    ///
1108    /// let mut config = bf_tree::Config::default();
1109    /// config.cb_min_record_size(4);
1110    /// let tree = BfTree::with_config(config, None).unwrap();
1111    /// tree.insert(b"key", b"value");
1112    /// let mut buffer = [0u8; 1024];
1113    /// let read_size = tree.read(b"key", &mut buffer);
1114    ///
1115    /// assert_eq!(read_size, LeafReadResult::Found(5));
1116    /// assert_eq!(&buffer[..5], b"value");
1117    /// ```
1118    pub fn insert(&self, key: &[u8], value: &[u8]) -> LeafInsertResult {
1119        // The input key cannot exceed the configured max key length
1120        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1121            return LeafInsertResult::InvalidKV(format!("Key too large {}", key.len()));
1122        }
1123
1124        // The input key has to be one byte at least
1125        if key.is_empty() {
1126            return LeafInsertResult::InvalidKV(format!(
1127                "Key too small {}, at least one byte",
1128                key.len()
1129            ));
1130        }
1131
1132        // The input key value pair cannot exceed the configured max record size
1133        if value.len() > MAX_VALUE_LEN || key.len() + value.len() > self.config.cb_max_record_size {
1134            return LeafInsertResult::InvalidKV(format!(
1135                "Record too large {}, {}, please adjust cb_max_record_size in config",
1136                key.len(),
1137                value.len()
1138            ));
1139        }
1140
1141        // The input key value pair cannot be smaller than the configured min record size
1142        if key.len() + value.len() < self.config.cb_min_record_size {
1143            return LeafInsertResult::InvalidKV(format!(
1144                "Record too small {}, {}, please adjust cb_min_record_size in config",
1145                key.len(),
1146                value.len()
1147            ));
1148        }
1149
1150        let backoff = Backoff::new();
1151        let mut aggressive_split = false;
1152
1153        counter!(Insert);
1154        info!(key_len = key.len(), value_len = value.len(), "insert");
1155
1156        loop {
1157            let result = self.write_inner(WriteOp::make_insert(key, value), aggressive_split);
1158            match result {
1159                Ok(_) => return LeafInsertResult::Success,
1160                Err(TreeError::NeedRestart) => {
1161                    #[cfg(all(feature = "shuttle", test))]
1162                    {
1163                        shuttle::thread::yield_now();
1164                    }
1165                    counter!(InsertNeedRestart);
1166                    aggressive_split = true;
1167                }
1168                Err(TreeError::CircularBufferFull) => {
1169                    info!("insert failed, started evict from circular buffer");
1170                    aggressive_split = true;
1171                    counter!(InsertCircularBufferFull);
1172                    _ = self.evict_from_circular_buffer();
1173                    continue;
1174                }
1175                Err(TreeError::Locked) => {
1176                    counter!(InsertLocked);
1177                    backoff.snooze();
1178                }
1179            }
1180        }
1181    }
1182
1183    /// Read a record from the tree.
1184    /// Returns the number of bytes read.
1185    ///
1186    /// TODO: don't panic if the out_buffer is too small, instead returns a error.
1187    ///
1188    /// ```
1189    /// use bf_tree::BfTree;
1190    /// use bf_tree::LeafReadResult;
1191    ///
1192    /// let mut config = bf_tree::Config::default();
1193    /// config.cb_min_record_size(4);
1194    ///
1195    /// let tree = BfTree::with_config(config, None).unwrap();
1196    /// tree.insert(b"key", b"value");
1197    /// let mut buffer = [0u8; 1024];
1198    /// let read_size = tree.read(b"key", &mut buffer);
1199    /// assert_eq!(read_size, LeafReadResult::Found(5));
1200    /// assert_eq!(&buffer[..5], b"value");
1201    /// ```
1202    pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
1203        // The input key cannot exceed the configured max key length
1204        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1205            return LeafReadResult::InvalidKey;
1206        }
1207
1208        // The input key has to be one byte at least
1209        if key.is_empty() {
1210            return LeafReadResult::InvalidKey;
1211        }
1212
1213        let backoff = Backoff::new();
1214
1215        info!(key_len = key.len(), "read");
1216        counter!(Read);
1217        let mut aggressive_split = false;
1218
1219        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1220        let mut debug_timer = DebugTimerGuard::new(Timer::Read, self.metrics_recorder.clone());
1221
1222        loop {
1223            let result = self.read_inner(key, out_buffer, aggressive_split);
1224            match result {
1225                Ok(v) => {
1226                    #[cfg(any(
1227                        feature = "metrics-rt-debug-all",
1228                        feature = "metrics-rt-debug-timer"
1229                    ))]
1230                    debug_timer.end();
1231
1232                    return v;
1233                }
1234                Err(TreeError::CircularBufferFull) => {
1235                    info!("read promotion failed, started evict from circular buffer");
1236                    aggressive_split = true;
1237                    match self.evict_from_circular_buffer() {
1238                        Ok(_) => continue,
1239                        Err(_) => continue,
1240                    };
1241                }
1242                Err(_) => {
1243                    backoff.spin();
1244                    aggressive_split = true;
1245                }
1246            }
1247        }
1248    }
1249
1250    /// Delete a record from the tree.
1251    ///
1252    /// ```
1253    /// use bf_tree::BfTree;
1254    /// use bf_tree::LeafReadResult;
1255    ///
1256    /// let tree = BfTree::default();
1257    /// tree.insert(b"key", b"value");
1258    /// tree.delete(b"key");
1259    /// let mut buffer = [0u8; 1024];
1260    /// let rt = tree.read(b"key", &mut buffer);
1261    /// assert_eq!(rt, LeafReadResult::Deleted);
1262    /// ```
1263    pub fn delete(&self, key: &[u8]) {
1264        // The input key cannot exceed the configured max key length
1265        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1266            return;
1267        }
1268
1269        // The input key has to be one byte at least
1270        if key.is_empty() {
1271            return;
1272        }
1273
1274        let backoff = Backoff::new();
1275
1276        info!(key_len = key.len(), "delete");
1277
1278        let mut aggressive_split = false;
1279
1280        loop {
1281            let result = self.write_inner(WriteOp::make_delete(key), aggressive_split);
1282            match result {
1283                Ok(_) => return,
1284                Err(TreeError::CircularBufferFull) => {
1285                    info!("delete failed, started evict from circular buffer");
1286                    aggressive_split = true;
1287                    match self.evict_from_circular_buffer() {
1288                        Ok(_) => continue,
1289                        Err(_) => continue,
1290                    };
1291                }
1292                Err(_) => {
1293                    aggressive_split = true;
1294                    backoff.spin();
1295                }
1296            }
1297        }
1298    }
1299
1300    /// Scan records in the tree, with starting key and desired scan count.
1301    /// Returns a iterator that yields key-value pairs.
1302    pub fn scan_with_count<'a>(
1303        &'a self,
1304        key: &[u8],
1305        cnt: usize,
1306        return_field: ScanReturnField,
1307    ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
1308        // The start key cannot exceed the configured max key length
1309        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1310            return Err(ScanIterError::InvalidStartKey);
1311        }
1312
1313        // The input key has to be one byte at least
1314        if key.is_empty() {
1315            return Err(ScanIterError::InvalidStartKey);
1316        }
1317
1318        // The count cannot be zero
1319        if cnt == 0 {
1320            return Err(ScanIterError::InvalidCount);
1321        }
1322
1323        Ok(ScanIter::new_with_scan_count(self, key, cnt, return_field))
1324    }
1325
1326    /// Scan records in the tree, with start and end keys.
1327    /// Returns a iterator that yields key-value pairs.
1328    pub fn scan_with_end_key<'a>(
1329        &'a self,
1330        start_key: &[u8],
1331        end_key: &[u8],
1332        return_field: ScanReturnField,
1333    ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
1334        // The start key cannot exceed the configured max key length
1335        if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1336            return Err(ScanIterError::InvalidStartKey);
1337        }
1338
1339        // The input key has to be one byte at least
1340        if start_key.is_empty() {
1341            return Err(ScanIterError::InvalidStartKey);
1342        }
1343
1344        // The end key cannot exceed the configured max key length
1345        if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1346            return Err(ScanIterError::InvalidEndKey);
1347        }
1348
1349        // The input key has to be one byte at least
1350        if end_key.is_empty() {
1351            return Err(ScanIterError::InvalidEndKey);
1352        }
1353
1354        // The start key cannot be greater than the end key
1355        let cmp = start_key.cmp(end_key);
1356        if cmp == std::cmp::Ordering::Greater {
1357            return Err(ScanIterError::InvalidKeyRange);
1358        }
1359
1360        Ok(ScanIter::new_with_end_key(
1361            self,
1362            start_key,
1363            end_key,
1364            return_field,
1365        ))
1366    }
1367
1368    #[doc(hidden)]
1369    pub fn scan_mut_with_count<'a>(
1370        &'a self,
1371        key: &'a [u8],
1372        cnt: usize,
1373        return_field: ScanReturnField,
1374    ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1375        // In cache-only mode, scan is not supported
1376        if self.cache_only {
1377            return Err(ScanIterError::CacheOnlyMode);
1378        }
1379
1380        // The start key cannot exceed the configured max key length
1381        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1382            return Err(ScanIterError::InvalidStartKey);
1383        }
1384
1385        // The count cannot be zero
1386        if cnt == 0 {
1387            return Err(ScanIterError::InvalidCount);
1388        }
1389
1390        Ok(ScanIterMut::new_with_scan_count(
1391            self,
1392            key,
1393            cnt,
1394            return_field,
1395        ))
1396    }
1397
1398    #[doc(hidden)]
1399    pub fn scan_mut_with_end_key<'a>(
1400        &'a self,
1401        start_key: &'a [u8],
1402        end_key: &'a [u8],
1403        return_field: ScanReturnField,
1404    ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1405        // In cache-only mode, scan is not supported
1406        if self.cache_only {
1407            return Err(ScanIterError::CacheOnlyMode);
1408        }
1409
1410        // The start key cannot exceed the configured max key length
1411        if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1412            return Err(ScanIterError::InvalidStartKey);
1413        }
1414
1415        // The end key cannot exceed the configured max key length
1416        if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1417            return Err(ScanIterError::InvalidEndKey);
1418        }
1419
1420        Ok(ScanIterMut::new_with_end_key(
1421            self,
1422            start_key,
1423            end_key,
1424            return_field,
1425        ))
1426    }
1427
    /// Looks up `key` in its leaf, copying the value into `out_buffer`.
    ///
    /// Besides the lookup, this read path may opportunistically adjust the cache:
    /// - Mini/full page hit: if the cached page is about to be evicted
    ///   (`cache_page_about_to_evict`), try to take the leaf exclusively and move the
    ///   page to the tail of the circular buffer. Any failure there is ignored.
    /// - Base page hit with `Found`: if the leaf has a parent and
    ///   `should_promote_read()` returns true, try to take the leaf exclusively and
    ///   either insert the record into the mini page as `OpType::Cache` (when
    ///   `config.read_record_cache` is set) or upgrade the leaf to a full page.
    ///
    /// `aggressive_split` is forwarded to `traverse_to_leaf`.
    ///
    /// # Errors
    /// Propagates errors from `traverse_to_leaf`. During read promotion,
    /// `TreeError::CircularBufferFull` / `TreeError::NeedRestart` from the insert, and
    /// any non-`Locked` error from `upgrade_to_full_page`, are returned even though the
    /// value has already been read; `TreeError::Locked` is swallowed.
    ///
    /// # Panics
    /// Panics if a base page is read while the tree is in cache-only mode.
    fn read_inner(
        &self,
        key: &[u8],
        out_buffer: &mut [u8],
        aggressive_split: bool,
    ) -> Result<LeafReadResult, TreeError> {
        let (node, parent) = self.traverse_to_leaf(key, aggressive_split, false, None)?;

        let mut leaf = self.mapping_table().get(&node);

        check_parent!(self, node, parent);

        let out = leaf.read(
            key,
            out_buffer,
            self.config.mini_page_binary_search,
            self.cache_only,
        );
        match out {
            ReadResult::Mini(r) | ReadResult::Full(r) => {
                // Hit in a cached page: if it is close to eviction, try (without
                // blocking) to move it to the tail of the circular buffer.
                if leaf.cache_page_about_to_evict(&self.storage) {
                    let mut x_leaf = match leaf.try_upgrade(self.snapshot_mgr.clone(), node) {
                        Ok(v) => v,
                        Err(_) => return Ok(r),
                    };
                    // we don't care about the result here, because we are in read path, we don't want to block.
                    _ = x_leaf.move_cache_page_to_tail(&self.storage);
                }

                Ok(r)
            }

            ReadResult::Base(r) => {
                counter!(BasePageRead);

                // In cache-only mode, no base page should exist
                if self.cache_only {
                    panic!("Attempt to read a base page while in cache-only mode.");
                }

                // Only a found value (of length `v`) is a candidate for promotion;
                // any other result is returned as-is.
                let v = match r {
                    LeafReadResult::Found(v) => v,
                    _ => return Ok(r),
                };

                // Promotion needs a parent (passed to insert / upgrade below) and is
                // further gated by `should_promote_read`.
                if parent.is_none() || !self.should_promote_read() {
                    return Ok(r);
                }

                // Best effort: if the leaf cannot be taken exclusively, skip promotion.
                let mut x_leaf = match leaf.try_upgrade(self.snapshot_mgr.clone(), node) {
                    Ok(x) => x,
                    Err(_) => {
                        return Ok(r);
                    }
                };

                if self.config.read_record_cache {
                    // we do record cache.
                    // we roll dice to see if we should insert this value to mini page.

                    let out = x_leaf.insert(
                        key,
                        &out_buffer[0..v as usize],
                        parent,
                        OpType::Cache,
                        &self.storage,
                        &self.write_load_full_page,
                        &self.cache_only,
                        &self.mini_page_size_classes,
                    );

                    match out {
                        Ok(_) => {
                            counter!(ReadPromotionOk);
                            Ok(r)
                        }
                        Err(TreeError::Locked) => {
                            // We are doing this very optimistically, if contention happens, we just abort and return.
                            counter!(ReadPromotionFailed);
                            Ok(r)
                        }
                        Err(TreeError::CircularBufferFull) => {
                            counter!(ReadPromotionFailed);
                            Err(TreeError::CircularBufferFull)
                        }
                        Err(TreeError::NeedRestart) => {
                            // If we need restart here, potentially because parent is full.
                            counter!(ReadPromotionFailed);
                            Err(TreeError::NeedRestart)
                        }
                    }
                } else {
                    // Without record caching, promote the whole leaf to a full page.
                    // `parent.unwrap()` cannot panic: `parent.is_none()` returned early above.
                    match self.upgrade_to_full_page(x_leaf, parent.unwrap()) {
                        Ok(_) | Err(TreeError::Locked) => Ok(r),
                        Err(e) => Err(e),
                    }
                }
            }
            // `ReadResult::None` is reported to the caller as `NotFound`.
            ReadResult::None => Ok(LeafReadResult::NotFound),
        }
    }
1529
    /// Installs a full page as the cache page of the exclusively locked leaf `x_leaf`
    /// and returns the still-locked entry.
    ///
    /// Behavior by current page location:
    /// - `Mini`: begins deallocating the mini page, merges it into its base page
    ///   (`try_merge_mini_page`), switches the entry to its base location, finishes
    ///   the deallocation, then builds a full page from the base page loaded from the
    ///   buffer.
    /// - `Full`: already a full page; returned unchanged.
    /// - `Base`: builds a full page from the base page at `offset`.
    ///
    /// The full page itself is built by the free function
    /// `mini_page_op::upgrade_to_full_page`, which shares this method's name.
    ///
    /// # Errors
    /// Propagates errors from `begin_dealloc_mini_page`, `try_merge_mini_page` and
    /// the free function `upgrade_to_full_page`.
    ///
    /// # Panics
    /// Panics if the entry's location is `PageLocation::Null`.
    fn upgrade_to_full_page<'a>(
        &'a self,
        mut x_leaf: LeafEntryXLocked<'a>,
        parent: ReadGuard<'a>,
    ) -> Result<LeafEntryXLocked<'a>, TreeError> {
        let page_loc = x_leaf.get_page_location().clone();
        match page_loc {
            PageLocation::Mini(ptr) => {
                let mini_page = x_leaf.load_cache_page_mut(ptr);
                let h = self.storage.begin_dealloc_mini_page(mini_page)?;
                // NOTE(review): if `try_merge_mini_page` fails, `h` is dropped here
                // without `finish_dealloc_mini_page` being called. Confirm that the
                // handle or storage tolerates an abandoned dealloc.
                let _merge_result = x_leaf.try_merge_mini_page(&h, parent, &self.storage)?;
                // Capture the mini page's next-level pointer before the entry is
                // switched to its base location and the mini page is freed.
                let base_offset = mini_page.next_level;
                x_leaf.change_to_base_loc();
                self.storage.finish_dealloc_mini_page(h);

                let base_page_ref = x_leaf.load_base_page_from_buffer();
                let full_page_loc =
                    upgrade_to_full_page(&self.storage, base_page_ref, base_offset)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Full(_ptr) => Ok(x_leaf),
            PageLocation::Base(offset) => {
                let base_page_ref = x_leaf.load_base_page(offset);
                let next_level = MiniPageNextLevel::new(offset);
                let full_page_loc = upgrade_to_full_page(&self.storage, base_page_ref, next_level)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Null => panic!("upgrade_to_full_page on Null page"),
        }
    }
1562
1563    /// Collect all metrics and reset the metric recorder
1564    /// The caller needs to ensure there are no references to the bf-tree's metrics recorder anymore.
1565    pub fn get_metrics(&mut self) -> Option<serde_json::Value> {
1566        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1567        {
1568            let recorder = self.metrics_recorder.take();
1569            match recorder {
1570                Some(r) => {
1571                    let recorders = Arc::try_unwrap(r).expect("Failed to obtain the recorders of bf-tree, please make sure no other references exist.");
1572                    let mut timer_accumulated = TimerRecorder::default();
1573
1574                    // Only collect timer metrics for now
1575                    for r in recorders {
1576                        let t = unsafe { &*r.get() };
1577
1578                        timer_accumulated += t.timers.clone();
1579                    }
1580
1581                    let output = serde_json::json!({
1582                        "Timers": timer_accumulated,
1583                    });
1584
1585                    self.metrics_recorder = Some(Arc::new(ThreadLocal::new()));
1586
1587                    Some(output)
1588                }
1589                None => None,
1590            }
1591        }
1592        #[cfg(not(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer")))]
1593        {
1594            None
1595        }
1596    }
1597}
1598
1599pub(crate) fn key_value_physical_size(key: &[u8], value: &[u8]) -> usize {
1600    let key_size = key.len();
1601    let value_size = value.len();
1602    let meta_size = crate::nodes::KV_META_SIZE;
1603    key_size + value_size + meta_size
1604}
1605
/// Handles eviction of the cached (mini or full) page referenced by `mini_page_handle`.
///
/// The key that leads to the page is read from the page itself. The tree is then
/// re-traversed to that key (with aggressive splitting enabled), and the resulting
/// mapping-table entry is taken via `get_mut`. If that entry still points at this page:
/// - `Mini`: in cache-only mode the entry is switched to a null location; otherwise
///   the mini page is merged into its base page and the entry is switched to its
///   base location.
/// - `Full`: the page is merged back via `merge_full_page`.
///
/// # Errors
/// Returns `TreeError::NeedRestart` when the entry no longer points at this page, or
/// points at a base/null location. Also propagates errors from key extraction
/// (cache-only mode), traversal, the parent version check, and the mini-page merge.
pub(crate) fn eviction_callback(
    mini_page_handle: &TombstoneHandle,
    tree: &BfTree,
) -> Result<(), TreeError> {
    let mini_page = mini_page_handle.ptr as *mut LeafNode;
    // NOTE(review): the page is dereferenced here without holding its leaf lock
    // (the pointer is rechecked against the mapping table below). This assumes the
    // handle keeps `ptr` readable for the duration of the callback; confirm.
    let key_to_this_page = if tree.cache_only {
        unsafe { &*mini_page }.try_get_key_to_reach_this_node()?
    } else {
        unsafe { &*mini_page }.get_key_to_reach_this_node()
    };

    // Here we need to set aggressive split to true, because we would split parent node due to leaf split.
    let (pid, parent) = tree.traverse_to_leaf(&key_to_this_page, true, false, None)?;
    info!(
        pid = pid.raw(),
        "starting to merge mini page in eviction call back"
    );

    let mut leaf_entry = tree.mapping_table().get_mut(&pid);

    histogram!(EvictNodeSize, unsafe { &*mini_page }.meta.node_size as u64);

    match leaf_entry.get_page_location() {
        PageLocation::Mini(ptr) => {
            {
                // In order to lock this node, we need to traverse to this node first;
                // but in order to traverse this node, we need to read the keys in this node;
                // in order to read the keys in this node, we need to lock this node.
                //
                // Because we didn't lock the node while reading `key_to_this_page`,
                // we need to recheck if the node is still the same node.
                if *ptr != mini_page {
                    return Err(TreeError::NeedRestart);
                }
            }

            let parent = parent.expect("Mini page must have a parent");
            parent.check_version()?;

            // In the case of cache_only, the corresponding mapping table entry of the mini-page
            // is replaced by a non-existent base page
            if tree.cache_only {
                leaf_entry.change_to_null_loc();
            } else {
                leaf_entry.try_merge_mini_page(mini_page_handle, parent, &tree.storage)?;
                leaf_entry.change_to_base_loc();
                // we don't need to dealloc the old_mini_page here because we are in eviction callback.
            }

            Ok(())
        }

        PageLocation::Full(ptr) => {
            // Same staleness recheck as the mini-page case.
            if *ptr != mini_page {
                return Err(TreeError::NeedRestart);
            }

            leaf_entry.merge_full_page(mini_page_handle);
            Ok(())
        }

        // This means the key read from the mini page is corrupted and points to a different page
        PageLocation::Base(_offset) => Err(TreeError::NeedRestart),

        // This means the key read from the mini page is corrupted and points to a different page
        PageLocation::Null => Err(TreeError::NeedRestart),
    }
}
1674
#[cfg(test)]
mod tests {
    use crate::error::ConfigError;
    use crate::BfTree;

    /// Asserts that building a tree from `config` fails with an error accepted by
    /// `is_expected`. `expected` names the `ConfigError` variant in failure messages.
    fn assert_config_rejected(
        config: &crate::Config,
        is_expected: fn(&ConfigError) -> bool,
        expected: &str,
    ) {
        match BfTree::with_config(config.clone(), None) {
            Ok(_) => panic!("Expected {} error but got Ok", expected),
            Err(e) => assert!(is_expected(&e), "Expected {} error", expected),
        }
    }

    #[test]
    fn test_mini_page_size_classes() {
        let mut size_classes = BfTree::create_mem_page_size_classes(48, 1952, 4096, 64, false);
        assert_eq!(
            size_classes,
            vec![128, 192, 256, 512, 960, 1856, 2048, 4096]
        );

        size_classes = BfTree::create_mem_page_size_classes(1544, 1544, 3136, 64, true);
        assert_eq!(size_classes, vec![1536, 3136]);

        size_classes = BfTree::create_mem_page_size_classes(48, 3072, 12288, 64, false);
        assert_eq!(
            size_classes,
            vec![128, 192, 256, 512, 960, 1856, 3648, 7232, 9088, 12288]
        );

        size_classes = BfTree::create_mem_page_size_classes(4, 1952, 4096, 32, false);
        assert_eq!(size_classes, vec![64, 128, 256, 448, 832, 1600, 2048, 4096]);
    }

    #[test]
    fn test_invalid_config_to_build_bf_tree() {
        // Min record too small
        let mut config = crate::Config::default();
        config.cb_min_record_size(4);
        config.leaf_page_size(32 * 1024);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::MinimumRecordSize(_)),
            "MinimumRecordSize",
        );

        // Max record too large
        config = crate::Config::default();
        config.cb_max_record_size(64 * 1024);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::MaximumRecordSize(_)),
            "MaximumRecordSize",
        );

        // Leaf page size not aligned
        config = crate::Config::default();
        config.leaf_page_size(4050);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::LeafPageSize(_)),
            "LeafPageSize",
        );

        // Circular buffer size too small
        config = crate::Config::default();
        config.leaf_page_size(16 * 1024);
        config.cb_size_byte(16 * 1024);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::CircularBufferSize(_)),
            "CircularBufferSize",
        );

        // Circular buffer size not power of two
        config = crate::Config::default();
        config.cb_size_byte(20 * 1024);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::CircularBufferSize(_)),
            "CircularBufferSize",
        );

        // Cache-only mode rejects this circular buffer size
        config = crate::Config::default();
        config.cache_only(true);
        config.cb_size_byte(2 * 4096);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::CircularBufferSize(_)),
            "CircularBufferSize",
        );
    }
}