bf_tree/
tree.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4#[cfg(not(all(feature = "shuttle", test)))]
5use rand::Rng;
6
7#[cfg(all(feature = "shuttle", test))]
8use shuttle::rand::Rng;
9
10use cfg_if::cfg_if;
11
12cfg_if! {
13    if #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))] {
14        use crate::metric::{Timer, TlsRecorder, timer::{TimerRecorder, DebugTimerGuard}};
15        use std::cell::UnsafeCell;
16        use thread_local::ThreadLocal;
17    }
18}
19
20use crate::{
21    check_parent,
22    circular_buffer::{CircularBufferMetrics, TombstoneHandle},
23    counter,
24    error::TreeError,
25    histogram, info,
26    mini_page_op::{upgrade_to_full_page, LeafEntryXLocked, LeafOperations, ReadResult},
27    nodes::{
28        leaf_node::{LeafKVMeta, LeafReadResult, MiniPageNextLevel, OpType},
29        InnerNode, InnerNodeBuilder, LeafNode, PageID, CACHE_LINE_SIZE, DISK_PAGE_SIZE,
30        INNER_NODE_SIZE, MAX_KEY_LEN, MAX_LEAF_PAGE_SIZE, MAX_VALUE_LEN,
31    },
32    range_scan::{ScanIter, ScanIterMut},
33    storage::{LeafStorage, PageLocation, PageTable},
34    sync::{
35        atomic::{AtomicU64, Ordering},
36        Arc,
37    },
38    utils::{get_rng, inner_lock::ReadGuard, Backoff, BfsVisitor, NodeInfo},
39    wal::{WriteAheadLog, WriteOp},
40    Config, StorageBackend,
41};
42use std::path::Path;
43
/// The bf-tree instance
pub struct BfTree {
    /// Root page id. The high bit (`ROOT_IS_LEAF_MASK`) is set when the root is a
    /// leaf page id, clear when it is a raw `InnerNode` pointer; see `get_root_page`.
    pub(crate) root_page_id: AtomicU64,
    /// Backing storage for leaf pages (mapping table, circular buffer, disk access).
    pub(crate) storage: LeafStorage,
    /// Optional write-ahead log; `None` when the config does not enable one.
    pub(crate) wal: Option<Arc<WriteAheadLog>>,
    /// Tree configuration, shared with the storage layer.
    pub(crate) config: Arc<Config>,
    /// Cached copy of `config.write_load_full_page`, consulted on the write path.
    pub(crate) write_load_full_page: bool,
    pub(crate) cache_only: bool, // If true, there is no permanent storage layer thus no durability guarantee of any upsert in the tree
    pub(crate) mini_page_size_classes: Vec<usize>, // Size classes of mini pages
    #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
    pub metrics_recorder: Option<Arc<ThreadLocal<UnsafeCell<TlsRecorder>>>>, // Per-tree metrics recorder under "metrics-rt-debug" feature
}
56
// SAFETY: BfTree is designed to be shared across threads; mutation goes through
// atomics (`root_page_id`) and the storage layer's own locking.
// NOTE(review): these manual impls are presumably needed because some contained
// type (e.g. raw pointers inside the storage layer) is not auto-Send/Sync —
// confirm that every field is in fact safe to share and move across threads.
unsafe impl Sync for BfTree {}

unsafe impl Send for BfTree {}
60
/// Result of [`BfTree::insert`].
///
/// Derives `Debug` so callers can log / assert on the outcome — public API
/// types should be debuggable.
#[derive(Debug)]
pub enum LeafInsertResult {
    /// The key-value pair was accepted.
    Success,
    /// The key or value violated a configured size limit; the string describes
    /// which limit was exceeded.
    InvalidKV(String),
}
65
impl Drop for BfTree {
    fn drop(&mut self) {
        // Stop the WAL background job first so nothing writes while we tear down.
        if let Some(ref wal) = self.wal {
            wal.stop_background_job();
        }

        // Walk every node in the tree and release its memory.
        let visitor = BfsVisitor::new_all_nodes(self);
        for node_info in visitor {
            match node_info {
                NodeInfo::Leaf { page_id, .. } => {
                    // Leaf pages are owned by the leaf storage; the mapping-table
                    // entry knows how to deallocate whatever it currently holds.
                    let mut leaf = self.mapping_table().get_mut(&page_id);
                    leaf.dealloc_self(&self.storage, self.cache_only);
                }
                NodeInfo::Inner { ptr, .. } => {
                    if unsafe { &*ptr }.is_valid_disk_offset() {
                        let disk_offset = unsafe { &*ptr }.disk_offset;
                        if self.config.storage_backend == StorageBackend::Memory {
                            // special case for memory backend, we need to deallocate the memory.
                            self.storage.vfs.dealloc_offset(disk_offset as usize);
                        }
                    }
                    // Finally free the in-memory inner node itself.
                    InnerNode::free_node(ptr as *mut InnerNode);
                }
            }
        }
    }
}
93
94impl Default for BfTree {
95    fn default() -> Self {
96        Self::new(":memory:", 1024 * 1024 * 32)
97    }
98}
99
100impl BfTree {
    /// High bit of `root_page_id`: set when the root is a leaf (page id),
    /// clear when it is an `InnerNode` pointer. Decoded in `get_root_page`.
    pub(crate) const ROOT_IS_LEAF_MASK: u64 = 0x8000_0000_0000_0000; // This is quite error-prone, make sure the mask is not conflicting with the page id definition.
102
    /// Create the size classes of all memory pages in ascending order based on the record size (key + value) and the leaf page size
    /// [s_0, s_1, ..., s_x], ascending order.
    /// Each s_i is of size 2^i * c + size_of(LeafNode)
    /// where c = (min_record_size + LeafKVMeta) aligned on CACHE_LINE_SIZE and 2^i * c <= leaf_page_size and s_x = leaf_page_size
    ///
    /// In non cache-only mode, the largest mini page size is s_(x-1) and full page/leaf base page size is s_x.
    /// Currently, the design assumes a mini-page can always be successfully merged into a leaf page in one pass (including at most one base page split).
    /// As such, the following three conditions are sufficient in preventing merge failures.
    /// C1) x >= 1
    /// C2) s_x > s_{x-1}
    /// C3) max_record_size + SizeOf(KVMeta) <= (s_x - s_{x-1} - 2 * (fence_meta + max_key_len)).
    ///
    /// In cache-only mode, the largest mini page is s_x as there is no full nor base page.
    /// Although there is no merging of mini-pages, the design assumes a new record can always be successfully inserted into a mini-page in one pass
    /// (including at most one mini-page split). As such, the following sufficient condition is required.
    /// C1) max_record_size + Sizeof(KVMeta) <= (s_x - SizeOf(NodeMeta)) / 2
    /// C2) if x >= 1, s_x >= s_{x-1} + max_record_size + SizeOf(KVMeta)
    ///
    /// C2) is necessary for cache-only mode to guarantee that a mini-page can grow to full page size before being split up to two full-sized pages
    pub(crate) fn create_mem_page_size_classes(
        min_record_size_in_byte: usize,
        max_record_size_in_byte: usize,
        leaf_page_size_in_byte: usize,
        max_fence_len_in_byte: usize,
        cache_only: bool,
    ) -> Vec<usize> {
        // Sanity check of the input parameters
        assert!(
            min_record_size_in_byte > 1,
            "cb_min_record_size in config cannot be less than 2"
        );
        assert!(
            min_record_size_in_byte <= max_record_size_in_byte,
            "cb_min_record_size cannot be larger than cb_max_record_size"
        );
        assert!(
            max_fence_len_in_byte > 0,
            "max_fence_len in config cannot be zero"
        );
        assert!(
            max_fence_len_in_byte / 2 < max_record_size_in_byte,
            "max_fence_len/2 cannot be larger than cb_max_record_size"
        );
        assert!(
            leaf_page_size_in_byte <= MAX_LEAF_PAGE_SIZE,
            "leaf_page_size in config cannot be larger than {}",
            MAX_LEAF_PAGE_SIZE
        );
        assert!(
            max_fence_len_in_byte / 2 <= MAX_KEY_LEN,
            "max_key_len in config cannot be larger than {}",
            MAX_KEY_LEN
        );

        // In non cache-only mode, the leaf page should be in the multiple of DISK_PAGE_SIZE.
        // In cache-only mode, the leaf page should be aligned on cache line size.
        if !cache_only {
            assert!(
                leaf_page_size_in_byte.is_multiple_of(DISK_PAGE_SIZE),
                "leaf_page_size in config should be multiple of {}",
                DISK_PAGE_SIZE
            );
        } else {
            assert!(
                leaf_page_size_in_byte.is_multiple_of(CACHE_LINE_SIZE),
                "leaf_page_size in config should be multiple of {}",
                CACHE_LINE_SIZE
            );
        }

        let max_record_size_with_meta = max_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
        let mut max_mini_page_size: usize;

        if cache_only {
            // Guarantee C1), C2) for cache-only mode.
            // Largest mini page leaves room for one max-size record, rounded
            // down to a cache-line multiple.
            max_mini_page_size = leaf_page_size_in_byte - max_record_size_with_meta;
            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;

            assert!(
                leaf_page_size_in_byte
                    >= 2 * max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
                "cb_max_record_size of config should be <= {}",
                (leaf_page_size_in_byte - std::mem::size_of::<LeafNode>()) / 2
                    - std::mem::size_of::<LeafKVMeta>()
            );
        } else {
            // Guarantee C1), C2) and C3) for non cache-only mode.
            // Additionally reserve space for fences (condition C3).
            max_mini_page_size = leaf_page_size_in_byte
                - max_record_size_with_meta
                - max_fence_len_in_byte
                - 2 * std::mem::size_of::<LeafKVMeta>();
            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;

            assert!(
                max_mini_page_size >= max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
                "cb_max_record_size of config should be <= {}",
                max_mini_page_size
                    - std::mem::size_of::<LeafNode>()
                    - std::mem::size_of::<LeafKVMeta>()
            );
        }

        // Generate all size classes for mini-pages
        let mut mem_page_size_classes = Vec::new();

        let base: i32 = 2;
        let mut x: u32 = 0; // number of classes pushed so far
        let c: usize = min_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();

        // No need to consider fence here as mini-pages have no fences.
        let mut size_class = base.pow(x) as usize * c + std::mem::size_of::<LeafNode>();

        // Memory page size is aligned on cache line size
        if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
            size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
        }

        // Doubling sequence 2^x * c (+ node header), rounded up to cache lines,
        // until the next class would exceed the largest mini-page size.
        while size_class <= max_mini_page_size {
            mem_page_size_classes.push(size_class);

            x += 1;
            size_class = base.pow(x) as usize * c + std::mem::size_of::<LeafNode>();

            if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
                size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
            }
        }

        if !cache_only {
            assert!(x > 0);
        }

        // Add the largest mini page size if not already added
        if x == 0 || mem_page_size_classes[(x - 1) as usize] < max_mini_page_size {
            mem_page_size_classes.push(max_mini_page_size);
            x += 1;
        }

        // The largest page size is the full leaf page size
        if x == 0 || mem_page_size_classes[(x - 1) as usize] < leaf_page_size_in_byte {
            mem_page_size_classes.push(leaf_page_size_in_byte);
            x += 1;
        }

        // Non cache-only mode needs a distinct largest-mini-page class below
        // the full page class (conditions C1/C2 above).
        if !cache_only {
            assert!(x >= 2);
        } else {
            assert!(x >= 1);
        }

        mem_page_size_classes
    }
255
256    /// Create a new bf-tree instance with customized storage backend and
257    /// circular buffer size
258    ///
259    /// For in-memory tree, use `:memory:` as file path.
260    /// For cache-only tree, use `:cache:` as file path
261    /// For disk tree, file_path is the path to the index file
262    ///
263    /// Mini page cache must be at least 8192 bytes for practical workloads.
264    ///
265    /// ```
266    /// use bf_tree::BfTree;
267    /// let tree = BfTree::new(":memory:", 8192);
268    /// ```
269    pub fn new(file_path: impl AsRef<Path>, cache_size_byte: usize) -> Self {
270        let config = Config::new(file_path, cache_size_byte);
271        Self::with_config(config, None)
272    }
273
274    /// Create a new bf-tree instance with customized configuration based on
275    /// a config file
276    pub fn new_with_config_file<P: AsRef<Path>>(config_file_path: P) -> Self {
277        let config = Config::new_with_config_file(config_file_path);
278        Self::with_config(config, None)
279    }
280
281    /// Initialize the bf-tree with provided config. For advanced user only.
282    /// An optional pre-allocated buffer pointer can be provided to use as the buffer pool memory.
283    pub fn with_config(config: Config, buffer_ptr: Option<*mut u8>) -> Self {
284        let wal = match config.write_ahead_log.as_ref() {
285            Some(wal_config) => {
286                let wal = WriteAheadLog::new(wal_config.clone());
287                Some(wal)
288            }
289            None => None,
290        };
291        let write_load_full = config.write_load_full_page;
292        let config = Arc::new(config);
293
294        // In cache-only mode, the circular buffer size >= 4 * leaf_page_size
295        // This is because during page split, two full sized mini-page need to be in memory
296        // Note that 2 * leaf_page_size only guarantees one full sized mini-page due to AllocMeta
297        //
298        // In non cache-only mode, the circular buffer size >= 2 * leaf_page_size
299        // As at most one full sized leaf page is required
300        if config.cache_only {
301            assert!(
302                config.cb_size_byte >= 4 * config.leaf_page_size,
303                "In cache-only mode, cb_size_byte should be at least 4 times of leaf_page_size"
304            );
305        } else {
306            assert!(
307                config.cb_size_byte >= 2 * config.leaf_page_size,
308                "In non cache-only mode, cb_size_byte should be at least 2 times of leaf_page_size"
309            );
310        }
311
312        // In cache-only mode, the initial root page is a full mini-page
313        if config.cache_only {
314            let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr);
315
316            // Assuming CB is at least 4KB in size
317            let mini_page_guard = (leaf_storage)
318                .alloc_mini_page(config.leaf_page_size)
319                .expect("Fail to allocate a mini-page as initial root node");
320            LeafNode::initialize_mini_page(
321                &mini_page_guard,
322                config.leaf_page_size,
323                MiniPageNextLevel::new_null(),
324                true,
325            );
326            let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
327            let mini_loc = PageLocation::Mini(new_mini_ptr);
328
329            let (root_id, root_lock) = leaf_storage
330                .mapping_table()
331                .insert_mini_page_mapping(mini_loc);
332            assert_eq!(root_id.as_id(), 0);
333
334            drop(root_lock);
335            drop(mini_page_guard);
336            let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
337
338            return Self {
339                root_page_id: AtomicU64::new(root_id),
340                storage: leaf_storage,
341                wal,
342                cache_only: config.cache_only,
343                write_load_full_page: write_load_full,
344                mini_page_size_classes: Self::create_mem_page_size_classes(
345                    config.cb_min_record_size,
346                    config.cb_max_record_size,
347                    config.leaf_page_size,
348                    config.max_fence_len,
349                    config.cache_only,
350                ),
351                config,
352                #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
353                metrics_recorder: Some(Arc::new(ThreadLocal::new())),
354            };
355        }
356
357        let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr);
358        let (root_id, root_lock) = leaf_storage.mapping_table().alloc_base_page_mapping();
359        drop(root_lock);
360        assert_eq!(root_id.as_id(), 0);
361
362        let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
363        Self {
364            root_page_id: AtomicU64::new(root_id),
365            storage: leaf_storage,
366            wal,
367            cache_only: config.cache_only,
368            write_load_full_page: write_load_full,
369            mini_page_size_classes: Self::create_mem_page_size_classes(
370                config.cb_min_record_size,
371                config.cb_max_record_size,
372                config.leaf_page_size,
373                config.max_fence_len,
374                config.cache_only,
375            ),
376            config,
377            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
378            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
379        }
380    }
381
    /// Get the buffer metrics of the circular buffer.
    /// This is a blocking call, will stop all other buffer operations,
    /// use it wisely.
    ///
    /// Thin delegate to the storage layer.
    pub fn get_buffer_metrics(&self) -> CircularBufferMetrics {
        self.storage.get_buffer_metrics()
    }
388
389    /// returns the root page id and whether it is a leaf node.
390    pub(crate) fn get_root_page(&self) -> (PageID, bool) {
391        let root_page_id = self.root_page_id.load(Ordering::Acquire);
392        let root_is_leaf = (root_page_id & Self::ROOT_IS_LEAF_MASK) != 0;
393        let clean = root_page_id & (!Self::ROOT_IS_LEAF_MASK);
394
395        let page_id = if root_is_leaf {
396            PageID::from_id(clean)
397        } else {
398            PageID::from_pointer(clean as *const InnerNode)
399        };
400
401        (page_id, root_is_leaf)
402    }
403
    /// Shorthand for the storage layer's page-id -> page-location mapping table.
    pub(crate) fn mapping_table(&self) -> &PageTable {
        self.storage.mapping_table()
    }
407
408    pub(crate) fn should_promote_read(&self) -> bool {
409        get_rng().gen_range(0..100) < self.config.read_promotion_rate.load(Ordering::Relaxed)
410    }
411
412    pub(crate) fn should_promote_scan_page(&self) -> bool {
413        get_rng().gen_range(0..100) < self.config.scan_promotion_rate.load(Ordering::Relaxed)
414    }
415
416    /// Chance% to promote a base read record to mini page.
417    pub fn update_read_promotion_rate(&self, new_rate: usize) {
418        self.config
419            .read_promotion_rate
420            .store(new_rate, Ordering::Relaxed);
421    }
422
    /// Split the leaf `cur_page_id` if its split flag is set.
    ///
    /// Returns `Ok(true)` when a split was performed (the caller must restart
    /// its traversal), `Ok(false)` when no split was needed. Only the
    /// root-is-leaf case is handled here: a leaf with a parent reaching this
    /// path is a bug (`unreachable!`). On success a new root `InnerNode` is
    /// built and installed into `root_page_id`.
    fn try_split_leaf(
        &self,
        cur_page_id: PageID,
        parent: &Option<ReadGuard>,
    ) -> Result<bool, TreeError> {
        debug_assert!(cur_page_id.is_id());

        // here we need to acquire x-lock for performance reason:
        // if we acquire s-lock, it's very difficult for us to later upgrade to x-lock, because rwlock favors readers:
        //      consider readers keep coming, we will never be able to upgrade to x-lock.
        let mut cur_page = self.mapping_table().get_mut(&cur_page_id);

        check_parent!(self, cur_page_id, parent);

        let should_split = cur_page.get_split_flag();
        if !should_split {
            return Ok(false);
        }
        match parent {
            Some(_) => {
                unreachable!("Leaf node split should not happen here");
            }
            None => {
                // only for the case of root node split

                // In cache-only mode, the root mini-page node is split into two equal-sized mini-pages
                if self.cache_only {
                    // Create a new mini-page of the same size as the current root node
                    // Assuming CB is at least able to hold two leaf-page sized mini-pages
                    let mini_page_guard = self
                        .storage
                        .alloc_mini_page(self.config.leaf_page_size)
                        .expect("Fail to allocate a mini-page during root split");
                    LeafNode::initialize_mini_page(
                        &mini_page_guard,
                        self.config.leaf_page_size,
                        MiniPageNextLevel::new_null(),
                        true,
                    );
                    let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                    let mini_loc = PageLocation::Mini(new_mini_ptr);

                    // Insert the new page into mapping table
                    let (sibling_id, _mini_lock) = self
                        .storage
                        .mapping_table()
                        .insert_mini_page_mapping(mini_loc);

                    // Split current page with the newly created mini page
                    let cur_page_loc = cur_page.get_page_location().clone();
                    match cur_page_loc {
                        PageLocation::Mini(ptr) => {
                            // Move the upper half of the current root into the sibling.
                            let cur_mini_page = cur_page.load_cache_page_mut(ptr);
                            let sibling_page = unsafe { &mut *new_mini_ptr };
                            let split_key = cur_mini_page.split(sibling_page, true);

                            // Build a new root with the old root as left-most
                            // child and the sibling keyed by the split key.
                            let mut new_root_builder = InnerNodeBuilder::new();
                            new_root_builder
                                .set_left_most_page_id(cur_page_id)
                                .set_children_is_leaf(true)
                                .add_record(split_key, sibling_id);

                            let new_root_ptr = new_root_builder.build();

                            self.root_page_id
                                .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                            info!(sibling = sibling_id.raw(), "New root node installed!");

                            debug_assert!(cur_mini_page.meta.meta_count_with_fence() > 0);
                            debug_assert!(sibling_page.meta.meta_count_with_fence() > 0);

                            return Ok(true);
                        }
                        _ => {
                            panic!("The root node is not a mini-page in cache-only mode")
                        }
                    }
                }

                // Non cache-only: split the root base page into two base pages.
                let mut x_page = cur_page;

                let (sibling_id, mut sibling_entry) = self.alloc_base_page_and_lock();

                info!(sibling = sibling_id.raw(), "Splitting root node!");

                let sibling = sibling_entry.load_base_page_mut();

                let leaf_node = x_page.load_base_page_mut();
                let split_key = leaf_node.split(sibling, false);

                // New root gets a disk offset since it must be persistable.
                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page_id)
                    .set_children_is_leaf(true)
                    .add_record(split_key, sibling_id);

                let new_root_ptr = new_root_builder.build();

                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(sibling = sibling_id.raw(), "New root node installed!");
                Ok(true)
            }
        }
    }
531
532    fn alloc_base_page_and_lock(&self) -> (PageID, LeafEntryXLocked<'_>) {
533        let (pid, base_entry) = self.mapping_table().alloc_base_page_mapping();
534
535        (pid, base_entry)
536    }
537
    /// Split the inner node `cur_page` if its split flag is set.
    ///
    /// Returns `(did_split, parent_guard)`. When `did_split` is false the
    /// incoming `parent` guard is handed back unchanged. With a parent, the
    /// split key is inserted into the parent (setting the parent's split flag
    /// and returning `NeedRestart` if it is full). Without a parent this is a
    /// root split: a new root is built and installed into `root_page_id`.
    fn try_split_inner<'a>(
        &self,
        cur_page: PageID,
        parent: Option<ReadGuard<'a>>,
    ) -> Result<(bool, Option<ReadGuard<'a>>), TreeError> {
        let cur_node = ReadGuard::try_read(cur_page.as_inner_node())?;

        check_parent!(self, cur_page, parent);

        let should_split = cur_node.as_ref().meta.get_split_flag();

        if !should_split {
            return Ok((false, parent));
        }

        info!(has_parent = parent.is_some(), "split inner node");

        match parent {
            Some(p) => {
                // Upgrade both latches to exclusive; either failing restarts.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;
                let mut x_parent = p.upgrade().map_err(|(_x, e)| e)?;

                let split_key = x_cur.as_ref().get_split_key();

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));

                // Register the sibling in the parent before moving entries,
                // so the parent can reject the split if it is full.
                let success = x_parent
                    .as_mut()
                    .insert(&split_key, sibling_builder.get_page_id());
                if !success {
                    // Parent itself needs a split first; flag it and restart.
                    x_parent.as_mut().meta.set_split_flag();
                    return Err(TreeError::NeedRestart);
                }

                x_cur.as_mut().split(&mut sibling_builder);

                sibling_builder.build();

                // Return the parent (downgraded to shared) for the caller to continue with.
                Ok((true, Some(x_parent.downgrade())))
            }
            None => {
                // Root split: no parent to insert into, build a new root instead.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));
                let sibling_id = sibling_builder.get_page_id();

                let split_key = x_cur.as_mut().split(&mut sibling_builder);

                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page)
                    .set_children_is_leaf(false)
                    .add_record(split_key, sibling_id);
                sibling_builder.build();
                let new_root_ptr = new_root_builder.build();
                // Briefly x-latch the new root before publishing it.
                let _x_root = ReadGuard::try_read(new_root_ptr)
                    .unwrap()
                    .upgrade()
                    .unwrap();
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(
                    has_parent = parent.is_some(),
                    cur = cur_page.raw(),
                    "finished split inner node"
                );

                Ok((true, parent))
            }
        }
    }
613
    /// Descend from the root to the leaf responsible for `key`.
    ///
    /// Returns the leaf's page id together with a shared guard on its parent
    /// (or `None` when the root itself is the leaf). When `aggressive_split`
    /// is set, any node with its split flag set is split on the way down and
    /// `NeedRestart` is returned so the caller retries from the root.
    pub(crate) fn traverse_to_leaf(
        &self,
        key: &[u8],
        aggressive_split: bool,
    ) -> Result<(PageID, Option<ReadGuard<'_>>), TreeError> {
        let (mut cur_page, mut cur_is_leaf) = self.get_root_page();
        let mut parent: Option<ReadGuard> = None;

        loop {
            if aggressive_split {
                if cur_is_leaf
                    && !cur_page.is_inner_node_pointer()
                    && self.try_split_leaf(cur_page, &parent)?
                {
                    // Tree shape changed; the caller must restart the traversal.
                    return Err(TreeError::NeedRestart);
                } else if !cur_is_leaf {
                    let (split_success, new_parent) = self.try_split_inner(cur_page, parent)?;
                    if split_success {
                        return Err(TreeError::NeedRestart);
                    } else {
                        parent = new_parent;
                    }
                }
            }

            if cur_is_leaf {
                return Ok((cur_page, parent));
            } else {
                // Latch the current inner node, then step to the child
                // covering `key`; the guard becomes the new parent.
                let next = ReadGuard::try_read(cur_page.as_inner_node())?;

                check_parent!(self, cur_page, parent);

                let next_node = next.as_ref();
                let next_is_leaf = next_node.meta.children_is_leaf();
                let pos = next_node.lower_bound(key);
                let kv_meta = next_node.get_kv_meta(pos as u16);
                cur_page = next_node.get_value(kv_meta);
                cur_is_leaf = next_is_leaf;
                parent = Some(next);
            }
        }
    }
656
    /// Apply `write_op` (insert or delete) to the leaf owning its key.
    ///
    /// Traverses to the leaf, then either creates a fresh mini page (a `Null`
    /// location is only legal in cache-only mode) or delegates to the leaf
    /// entry's insert path. On the normal path it also best-effort moves an
    /// about-to-evict cached page to the buffer tail and, when a WAL is
    /// configured, appends the operation and records the resulting LSN.
    fn write_inner(&self, write_op: WriteOp, aggressive_split: bool) -> Result<(), TreeError> {
        let (pid, parent) = self.traverse_to_leaf(write_op.key, aggressive_split)?;

        let mut leaf_entry = self.mapping_table().get_mut(&pid);

        check_parent!(self, pid, parent);

        let page_loc = leaf_entry.get_page_location();
        match page_loc {
            PageLocation::Null => {
                if !self.cache_only {
                    panic!("Found an Null page in non cache-only mode");
                }

                // Deleting from a page that doesn't exist is a no-op.
                if write_op.op_type == OpType::Delete {
                    return Ok(());
                }

                // Create a new mini-page to replace the null page
                let mini_page_size = LeafNode::get_chain_size_hint(
                    write_op.key.len(),
                    write_op.value.len(),
                    &self.mini_page_size_classes,
                    self.cache_only,
                );
                let mini_page_guard = self.storage.alloc_mini_page(mini_page_size)?;
                LeafNode::initialize_mini_page(
                    &mini_page_guard,
                    mini_page_size,
                    MiniPageNextLevel::new_null(),
                    true,
                );
                let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                let mini_loc = PageLocation::Mini(new_mini_ptr);

                leaf_entry.create_cache_page_loc(mini_loc);

                // The brand-new mini page must have room for this one record.
                let mini_page_ref = leaf_entry.load_cache_page_mut(new_mini_ptr);
                let insert_success =
                    mini_page_ref.insert(write_op.key, write_op.value, write_op.op_type, 0);
                assert!(insert_success);

                debug_assert!(mini_page_ref.meta.meta_count_with_fence() > 0);
                counter!(InsertCreatedMiniPage);
            }
            _ => {
                leaf_entry.insert(
                    write_op.key,
                    write_op.value,
                    parent,
                    write_op.op_type,
                    &self.storage,
                    &self.write_load_full_page,
                    &self.cache_only,
                    &self.mini_page_size_classes,
                )?;

                if leaf_entry.cache_page_about_to_evict(&self.storage) {
                    // we don't care about the result here
                    _ = leaf_entry.move_cache_page_to_tail(&self.storage);
                }

                // Make the write durable (if a WAL is configured) and remember
                // the LSN on the leaf entry.
                if let Some(wal) = &self.wal {
                    let lsn = wal.append_and_wait(&write_op, leaf_entry.get_disk_offset());
                    leaf_entry.update_lsn(lsn);
                }
            }
        }

        Ok(())
    }
728
729    /// Make sure you're not holding any lock while calling this function.
730    pub(crate) fn evict_from_circular_buffer(&self) -> Result<usize, TreeError> {
731        // Why we need to evict multiple times?
732        // because we don't want each alloc to trigger evict, i.e., we want alloc to fail less often.
733        // with default 1024 bytes, one eviction allows us to alloc 1024 bytes (4 256-byte mini pages) without failure.
734        const TARGET_EVICT_SIZE: usize = 1024;
735        let mut evicted = 0;
736
737        // A corner case: we may not have enough memory to evict (i.e., the buffer might be empty now)
738        let mut retry_cnt = 0;
739
740        while evicted < TARGET_EVICT_SIZE && retry_cnt < 10 {
741            let n = self
742                .storage
743                .evict_from_buffer(|mini_page_handle: &TombstoneHandle| {
744                    eviction_callback(mini_page_handle, self)
745                })?;
746            evicted += n as usize;
747            retry_cnt += 1;
748        }
749        info!("stopped evict from circular buffer");
750        Ok(evicted)
751    }
752
753    /// Insert a key-value pair to the system, overrides existing value if present.
754    ///
755    /// ```
756    /// use bf_tree::BfTree;
757    /// use bf_tree::LeafReadResult;
758    ///
759    /// let tree = BfTree::default();
760    /// tree.insert(b"key", b"value");
761    /// let mut buffer = [0u8; 1024];
762    /// let read_size = tree.read(b"key", &mut buffer);
763    ///
764    /// assert_eq!(read_size, LeafReadResult::Found(5));
765    /// assert_eq!(&buffer[..5], b"value");
766    /// ```
767    pub fn insert(&self, key: &[u8], value: &[u8]) -> LeafInsertResult {
768        // The input key cannot exceed the configured max key length
769        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
770            return LeafInsertResult::InvalidKV(format!("Key too large {}", key.len()));
771        }
772
773        // The input key value pair cannot exceed the configured max record size
774        if value.len() > MAX_VALUE_LEN || key.len() + value.len() > self.config.cb_max_record_size {
775            return LeafInsertResult::InvalidKV(format!(
776                "Record too large {}, {}",
777                key.len(),
778                value.len()
779            ));
780        }
781
782        let backoff = Backoff::new();
783        let mut aggressive_split = false;
784
785        counter!(Insert);
786        info!(key_len = key.len(), value_len = value.len(), "insert");
787
788        loop {
789            let result = self.write_inner(WriteOp::make_insert(key, value), aggressive_split);
790            match result {
791                Ok(_) => return LeafInsertResult::Success,
792                Err(TreeError::NeedRestart) => {
793                    #[cfg(all(feature = "shuttle", test))]
794                    {
795                        shuttle::thread::yield_now();
796                    }
797                    counter!(InsertNeedRestart);
798                    aggressive_split = true;
799                }
800                Err(TreeError::CircularBufferFull) => {
801                    info!("insert failed, started evict from circular buffer");
802                    aggressive_split = true;
803                    counter!(InsertCircularBufferFull);
804                    _ = self.evict_from_circular_buffer();
805                    continue;
806                }
807                Err(TreeError::Locked) => {
808                    counter!(InsertLocked);
809                    backoff.snooze();
810                }
811            }
812        }
813    }
814
815    /// Read a record from the tree.
816    /// Returns the number of bytes read.
817    ///
818    /// TODO: don't panic if the out_buffer is too small, instead returns a error.
819    ///
820    /// ```
821    /// use bf_tree::BfTree;
822    /// use bf_tree::LeafReadResult;
823    ///
824    /// let tree = BfTree::default();
825    /// tree.insert(b"key", b"value");
826    /// let mut buffer = [0u8; 1024];
827    /// let read_size = tree.read(b"key", &mut buffer);
828    /// assert_eq!(read_size, LeafReadResult::Found(5));
829    /// assert_eq!(&buffer[..5], b"value");
830    /// ```
831    pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
832        // The input key cannot exceed the configured max key length
833        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
834            return LeafReadResult::InvalidKey;
835        }
836
837        let backoff = Backoff::new();
838
839        info!(key_len = key.len(), "read");
840        counter!(Read);
841        let mut aggressive_split = false;
842
843        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
844        let mut debug_timer = DebugTimerGuard::new(Timer::Read, self.metrics_recorder.clone());
845
846        loop {
847            let result = self.read_inner(key, out_buffer, aggressive_split);
848            match result {
849                Ok(v) => {
850                    #[cfg(any(
851                        feature = "metrics-rt-debug-all",
852                        feature = "metrics-rt-debug-timer"
853                    ))]
854                    debug_timer.end();
855
856                    return v;
857                }
858                Err(TreeError::CircularBufferFull) => {
859                    info!("read promotion failed, started evict from circular buffer");
860                    aggressive_split = true;
861                    match self.evict_from_circular_buffer() {
862                        Ok(_) => continue,
863                        Err(_) => continue,
864                    };
865                }
866                Err(_) => {
867                    backoff.spin();
868                    aggressive_split = true;
869                }
870            }
871        }
872    }
873
874    /// Delete a record from the tree.
875    ///
876    /// ```
877    /// use bf_tree::BfTree;
878    /// use bf_tree::LeafReadResult;
879    ///
880    /// let tree = BfTree::default();
881    /// tree.insert(b"key", b"value");
882    /// tree.delete(b"key");
883    /// let mut buffer = [0u8; 1024];
884    /// let rt = tree.read(b"key", &mut buffer);
885    /// assert_eq!(rt, LeafReadResult::Deleted);
886    /// ```
887    pub fn delete(&self, key: &[u8]) {
888        // The input key cannot exceed the configured max key length
889        if key.len() > self.config.max_fence_len / 2 {
890            return;
891        }
892
893        let backoff = Backoff::new();
894
895        info!(key_len = key.len(), "delete");
896
897        let mut aggressive_split = false;
898
899        loop {
900            let result = self.write_inner(WriteOp::make_delete(key), aggressive_split);
901            match result {
902                Ok(_) => return,
903                Err(TreeError::CircularBufferFull) => {
904                    info!("delete failed, started evict from circular buffer");
905                    aggressive_split = true;
906                    match self.evict_from_circular_buffer() {
907                        Ok(_) => continue,
908                        Err(_) => continue,
909                    };
910                }
911                Err(_) => {
912                    aggressive_split = true;
913                    backoff.spin();
914                }
915            }
916        }
917    }
918
    /// Scan records in the tree, starting from `key`, with `cnt` as the
    /// desired number of records to scan.
    /// Returns an iterator that yields key-value pairs.
    pub fn scan<'a>(&'a self, key: &'a [u8], cnt: usize) -> ScanIter<'a, 'a> {
        ScanIter::new(self, key, cnt)
    }
924
    /// Like [`BfTree::scan`], but returns a [`ScanIterMut`] — presumably a
    /// scan iterator with mutable access to records (see `range_scan`).
    /// Hidden from public docs.
    #[doc(hidden)]
    pub fn scan_mut<'a>(&'a self, key: &'a [u8], cnt: usize) -> ScanIterMut<'a, 'a> {
        ScanIterMut::new(self, key, cnt)
    }
929
    /// Core read path: traverses to the leaf owning `key`, reads the record
    /// into `out_buffer`, and opportunistically promotes base-page hits into
    /// the in-memory cache.
    ///
    /// Errors (`Locked` / `NeedRestart` / `CircularBufferFull`) are transient;
    /// the public `read` wrapper handles them by retrying.
    fn read_inner(
        &self,
        key: &[u8],
        out_buffer: &mut [u8],
        aggressive_split: bool,
    ) -> Result<LeafReadResult, TreeError> {
        let (node, parent) = self.traverse_to_leaf(key, aggressive_split)?;

        let mut leaf = self.mapping_table().get(&node);

        // Re-validate the optimistic traversal result against the parent.
        check_parent!(self, node, parent);

        let out = leaf.read(
            key,
            out_buffer,
            self.config.mini_page_binary_search,
            self.cache_only,
        );
        match out {
            // Served from an in-memory (mini or full) cache page.
            ReadResult::Mini(r) | ReadResult::Full(r) => {
                if leaf.cache_page_about_to_evict(&self.storage) {
                    // Hot page about to be evicted: try to keep it cached by
                    // moving it to the buffer tail. Needs an exclusive lock;
                    // if the upgrade fails, just return the read result.
                    let mut x_leaf = match leaf.try_upgrade() {
                        Ok(v) => v,
                        Err(_) => return Ok(r),
                    };
                    // we don't care about the result here, because we are in read path, we don't want to block.
                    _ = x_leaf.move_cache_page_to_tail(&self.storage);
                }

                Ok(r)
            }

            // Served from the on-disk base page.
            ReadResult::Base(r) => {
                counter!(BasePageRead);

                // In cache-only mode, no base page should exist
                if self.cache_only {
                    panic!("Attempt to read a base page while in cache-only mode.");
                }

                // Only `Found` results are candidates for promotion.
                let v = match r {
                    LeafReadResult::Found(v) => v,
                    _ => return Ok(r),
                };

                // Promotion needs a parent guard and is sampled via `should_promote_read`.
                if parent.is_none() || !self.should_promote_read() {
                    return Ok(r);
                }

                // Promotion requires the exclusive lock; skip on contention.
                let mut x_leaf = match leaf.try_upgrade() {
                    Ok(x) => x,
                    Err(_) => {
                        return Ok(r);
                    }
                };

                if self.config.read_record_cache {
                    // we do record cache.
                    // we roll dice to see if we should insert this value to mini page.

                    // Re-insert the just-read value as a cache record so future
                    // reads hit the mini page.
                    let out = x_leaf.insert(
                        key,
                        &out_buffer[0..v as usize],
                        parent,
                        OpType::Cache,
                        &self.storage,
                        &self.write_load_full_page,
                        &self.cache_only,
                        &self.mini_page_size_classes,
                    );

                    match out {
                        Ok(_) => {
                            counter!(ReadPromotionOk);
                            Ok(r)
                        }
                        Err(TreeError::Locked) => {
                            // We are doing this very optimistically, if contention happens, we just abort and return.
                            counter!(ReadPromotionFailed);
                            Ok(r)
                        }
                        Err(TreeError::CircularBufferFull) => {
                            // Propagate so the caller can evict from the buffer and retry.
                            counter!(ReadPromotionFailed);
                            Err(TreeError::CircularBufferFull)
                        }
                        Err(TreeError::NeedRestart) => {
                            // If we need restart here, potentially because parent is full.
                            counter!(ReadPromotionFailed);
                            Err(TreeError::NeedRestart)
                        }
                    }
                } else {
                    // Page-granularity promotion: load the whole base page into cache.
                    match self.upgrade_to_full_page(x_leaf, parent.unwrap()) {
                        Ok(_) | Err(TreeError::Locked) => Ok(r),
                        Err(e) => Err(e),
                    }
                }
            }
            ReadResult::None => Ok(LeafReadResult::NotFound),
        }
    }
1031
    /// Upgrade the exclusively-locked leaf entry to a full in-memory cache
    /// page, whatever its current page location, returning the still-locked
    /// entry.
    ///
    /// # Panics
    /// Panics if the leaf entry is at a `Null` page location.
    fn upgrade_to_full_page<'a>(
        &'a self,
        mut x_leaf: LeafEntryXLocked<'a>,
        parent: ReadGuard<'a>,
    ) -> Result<LeafEntryXLocked<'a>, TreeError> {
        let page_loc = x_leaf.get_page_location().clone();
        match page_loc {
            PageLocation::Mini(ptr) => {
                // First merge the mini page down into its base page, then
                // reload the merged base page as a full cache page.
                // The statement order matters: the entry must point at the
                // base location before the mini page is deallocated.
                let mini_page = x_leaf.load_cache_page_mut(ptr);
                let h = self.storage.begin_dealloc_mini_page(mini_page)?;
                let _merge_result = x_leaf.try_merge_mini_page(&h, parent, &self.storage)?;
                let base_offset = mini_page.next_level;
                x_leaf.change_to_base_loc();
                self.storage.finish_dealloc_mini_page(h);

                let base_page_ref = x_leaf.load_base_page_from_buffer();
                let full_page_loc =
                    upgrade_to_full_page(&self.storage, base_page_ref, base_offset)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            // Already a full page: nothing to do.
            PageLocation::Full(_ptr) => Ok(x_leaf),
            PageLocation::Base(offset) => {
                // No cache page yet: load the on-disk base page into a full cache page.
                let base_page_ref = x_leaf.load_base_page(offset);
                let next_level = MiniPageNextLevel::new(offset);
                let full_page_loc = upgrade_to_full_page(&self.storage, base_page_ref, next_level)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Null => panic!("upgrade_to_full_page on Null page"),
        }
    }
1064
    /// Collect all metrics and reset the metric recorder.
    /// The caller needs to ensure there are no references to the bf-tree's metrics recorder anymore.
    ///
    /// Returns `None` when metrics features are disabled or no recorder is installed.
    pub fn get_metrics(&mut self) -> Option<serde_json::Value> {
        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
        {
            // Take the recorder out of `self` so we hold the only Arc;
            // `try_unwrap` below fails if any other reference still exists.
            let recorder = self.metrics_recorder.take();
            match recorder {
                Some(r) => {
                    let recorders = Arc::try_unwrap(r).expect("Failed to obtain the recorders of bf-tree, please make sure no other references exist.");
                    let mut timer_accumulated = TimerRecorder::default();

                    // Only collect timer metrics for now
                    for r in recorders {
                        // SAFETY: per the doc comment above, the caller guarantees no
                        // thread still references the recorder, so nothing can be
                        // concurrently writing through this UnsafeCell.
                        let t = unsafe { &*r.get() };

                        timer_accumulated += t.timers.clone();
                    }

                    let output = serde_json::json!({
                        "Timers": timer_accumulated,
                    });

                    // Install a fresh recorder so subsequent operations keep recording.
                    self.metrics_recorder = Some(Arc::new(ThreadLocal::new()));

                    Some(output)
                }
                None => None,
            }
        }
        #[cfg(not(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer")))]
        {
            None
        }
    }
1099}
1100
1101pub(crate) fn key_value_physical_size(key: &[u8], value: &[u8]) -> usize {
1102    let key_size = key.len();
1103    let value_size = value.len();
1104    let meta_size = crate::nodes::KV_META_SIZE;
1105    key_size + value_size + meta_size
1106}
1107
/// Callback invoked while evicting the page behind `mini_page_handle` from
/// the circular buffer.
///
/// Reads a key from the (unlocked) page, re-traverses the tree to lock the
/// owning leaf entry, re-validates that the entry still points at this page,
/// and merges the page out of the cache. Returns `NeedRestart` whenever the
/// optimistic read turned out to be stale.
pub(crate) fn eviction_callback(
    mini_page_handle: &TombstoneHandle,
    tree: &BfTree,
) -> Result<(), TreeError> {
    let mini_page = mini_page_handle.ptr as *mut LeafNode;
    // NOTE: the page is read here without holding its lock; staleness is
    // detected below by re-checking the mapping table entry.
    let key_to_this_page = if tree.cache_only {
        unsafe { &*mini_page }.try_get_key_to_reach_this_node()?
    } else {
        unsafe { &*mini_page }.get_key_to_reach_this_node()
    };

    // Here we need to set aggressive split to true, because we would split parent node due to leaf split.
    let (pid, parent) = tree.traverse_to_leaf(&key_to_this_page, true)?;
    info!(
        pid = pid.raw(),
        "starting to merge mini page in eviction call back"
    );

    let mut leaf_entry = tree.mapping_table().get_mut(&pid);

    histogram!(EvictNodeSize, unsafe { &*mini_page }.meta.node_size as u64);

    match leaf_entry.get_page_location() {
        PageLocation::Mini(ptr) => {
            {
                // In order to lock this node, we need to traverse to this node first;
                // but in order to traverse this node, we need to read the keys in this node;
                // in order to read the keys in this node, we need to lock this node.
                //
                // Because we didn't lock the node while reading `key_to_this_page`,
                // we need to recheck if the node is still the same node.
                if *ptr != mini_page {
                    return Err(TreeError::NeedRestart);
                }
            }

            let parent = parent.expect("Mini page must have a parent");
            parent.check_version()?;

            // In the case of cache_only, the corresponding mapping table entry of the mini-page
            // is replaced by a non-existent base page
            if tree.cache_only {
                leaf_entry.change_to_null_loc();
            } else {
                leaf_entry.try_merge_mini_page(mini_page_handle, parent, &tree.storage)?;
                leaf_entry.change_to_base_loc();
                // we don't need to dealloc the old_mini_page here because we are in eviction callback.
            }

            Ok(())
        }

        PageLocation::Full(ptr) => {
            // Same staleness re-check as the Mini case above.
            if *ptr != mini_page {
                return Err(TreeError::NeedRestart);
            }

            leaf_entry.merge_full_page(mini_page_handle);
            Ok(())
        }

        // This means the key read from the mini page is corrupted and points to a different page
        PageLocation::Base(_offset) => Err(TreeError::NeedRestart),

        // This means the key read from the mini page is corrupted and points to a different page
        PageLocation::Null => Err(TreeError::NeedRestart),
    }
}
1176
#[cfg(test)]
mod tests {
    use crate::BfTree;

    /// Check that mini-page size classes come out as expected for a few
    /// representative (min, split, max, align, fixed) configurations.
    #[test]
    fn test_mini_page_size_classes() {
        assert_eq!(
            BfTree::create_mem_page_size_classes(48, 1952, 4096, 64, false),
            vec![128, 192, 256, 512, 960, 1856, 2048, 4096]
        );
        assert_eq!(
            BfTree::create_mem_page_size_classes(1548, 1548, 3136, 64, true),
            vec![1536, 3136]
        );
        assert_eq!(
            BfTree::create_mem_page_size_classes(48, 3072, 12288, 64, false),
            vec![128, 192, 256, 512, 960, 1856, 3648, 7232, 9088, 12288]
        );
    }
}