bf_tree/nodes/leaf_node.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

use core::panic;
use std::{alloc::Layout, cmp::Ordering, sync::atomic};

use crate::{
    circular_buffer::CircularBufferPtr, counter, error::TreeError, utils::stats::LeafStats,
};

use super::{node_meta::NodeMeta, FENCE_KEY_CNT};

/// Invariant: a leaf page can only contain Insert and Delete records;
///            a mini page can contain all four types.
#[repr(u8)]
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub(crate) enum OpType {
    Insert = 0,
    Delete = 1,
    Cache = 2,   // the clean version of insert.
    Phantom = 3, // the clean version of delete.
}

impl OpType {
    pub(crate) fn is_dirty(&self) -> bool {
        match self {
            OpType::Insert | OpType::Delete => true,
            OpType::Cache | OpType::Phantom => false,
        }
    }

    fn is_absent(&self) -> bool {
        match self {
            OpType::Insert | OpType::Cache => false,
            OpType::Phantom | OpType::Delete => true,
        }
    }

    fn is_cache(&self) -> bool {
        match self {
            OpType::Cache | OpType::Phantom => true,
            OpType::Insert | OpType::Delete => false,
        }
    }
}
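
// Illustrative sketch: a small test of the predicates above. `Cache` behaves
// as a clean `Insert` and `Phantom` as a clean `Delete`; the module and test
// names here are local examples, not part of the crate's API.
#[cfg(test)]
mod op_type_examples {
    use super::OpType;

    #[test]
    fn clean_variants_mirror_dirty_ones() {
        // Dirty variants are the ones that must eventually reach storage.
        assert!(OpType::Insert.is_dirty() && !OpType::Insert.is_absent());
        assert!(OpType::Delete.is_dirty() && OpType::Delete.is_absent());
        // Clean variants carry the same presence/absence semantics.
        assert!(!OpType::Cache.is_dirty() && !OpType::Cache.is_absent());
        assert!(!OpType::Phantom.is_dirty() && OpType::Phantom.is_absent());
    }
}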

const PREVIEW_SIZE: usize = 2;

const LOW_FENCE_IDX: usize = 0;
const HIGH_FENCE_IDX: usize = 1;

const OP_TYPE_SHIFT: u16 = 14;
const KEY_LEN_MASK: u16 = 0x3F_FF; // lower 14 bits hold the key_len.

// Highest bit of the value_len word: the ref bit.
const REF_BIT_MASK: u16 = 0x80_00;
// Lower 15 bits of the value_len word: the value_len itself.
const VALUE_LEN_MASK: u16 = 0x7F_FF;
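
// Illustrative sketch: a compile-checkable walkthrough of the packing layout
// above. Only the mask/shift constants come from this file; everything else
// is a local example value.
#[cfg(test)]
mod bit_layout_examples {
    use super::{KEY_LEN_MASK, OP_TYPE_SHIFT, REF_BIT_MASK, VALUE_LEN_MASK};

    #[test]
    fn pack_and_unpack_fields() {
        // key_len = 5 packed together with op_type = Delete (discriminant 1).
        let op_key: u16 = 5 | (1 << OP_TYPE_SHIFT); // 0x4005
        assert_eq!(op_key & KEY_LEN_MASK, 5); // lower 14 bits: key length
        assert_eq!(op_key >> OP_TYPE_SHIFT, 1); // upper 2 bits: op type

        // value_len = 7 packed together with the ref bit set.
        let ref_val: u16 = 7 | REF_BIT_MASK; // 0x8007
        assert_eq!(ref_val & VALUE_LEN_MASK, 7); // lower 15 bits: value length
        assert_ne!(ref_val & REF_BIT_MASK, 0); // highest bit: referenced
    }
}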

pub(crate) fn common_prefix_len(low_fence: &[u8], high_fence: &[u8]) -> u16 {
    let mut prefix_len = 0;
    for i in 0..std::cmp::min(low_fence.len(), high_fence.len()) {
        if low_fence[i] == high_fence[i] {
            prefix_len += 1;
        } else {
            break;
        }
    }
    prefix_len
}
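
// Illustrative sketch: the expected behavior of `common_prefix_len`, which
// determines how many leading bytes get stripped from every key in a node.
#[cfg(test)]
mod prefix_examples {
    use super::common_prefix_len;

    #[test]
    fn shared_prefix_length() {
        assert_eq!(common_prefix_len(b"apple", b"apply"), 4);
        assert_eq!(common_prefix_len(b"abc", b"xyz"), 0);
        // Bounded by the shorter of the two fences.
        assert_eq!(common_prefix_len(b"ab", b"abcd"), 2);
    }
}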

#[repr(C)]
pub(crate) struct LeafKVMeta {
    offset: u16,
    op_type_key_len_in_byte: u16, // Highest 2 bits: op_type; lower 14 bits: key length in bytes.
    ref_value_len_in_byte: std::sync::atomic::AtomicU16, // Highest bit: ref; lower 15 bits: value length in bytes. We don't use shuttle's AtomicU16 here, because this is not a real sync point.
    preview_bytes: [u8; PREVIEW_SIZE],
}

impl LeafKVMeta {
    pub(crate) fn make_prefixed_meta(
        offset: u16,
        value_len: u16,
        key: &[u8],
        prefix_len: u16,
        op_type: OpType,
    ) -> Self {
        let mut meta = Self {
            offset,
            op_type_key_len_in_byte: key.len() as u16 | ((op_type as u16) << OP_TYPE_SHIFT),
            ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(value_len),
            preview_bytes: [0; PREVIEW_SIZE],
        };

        for i in 0..std::cmp::min(key.len().saturating_sub(prefix_len as usize), PREVIEW_SIZE) {
            meta.preview_bytes[i] = key[i + prefix_len as usize];
        }

        // The initial value is not referenced. This is important because during garbage
        // reclamation of the delta chain, we call Insert to reset the states, which calls
        // this function.
        meta.clear_ref();
        meta
    }

    pub fn make_infinite_high_fence_key() -> Self {
        assert_eq!(std::mem::size_of::<Self>(), super::KV_META_SIZE);

        Self {
            offset: u16::MAX,
            op_type_key_len_in_byte: 0,
            ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
            preview_bytes: [0; PREVIEW_SIZE],
        }
    }

    pub fn make_infinite_low_fence_key() -> Self {
        Self {
            offset: u16::MAX - 1,
            op_type_key_len_in_byte: 0,
            ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
            preview_bytes: [0; PREVIEW_SIZE],
        }
    }

    pub fn is_infinite_low_fence_key(&self) -> bool {
        self.offset == u16::MAX - 1
    }

    pub fn is_infinite_high_fence_key(&self) -> bool {
        self.offset == u16::MAX
    }

    pub fn value_len(&self) -> u16 {
        self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & VALUE_LEN_MASK
    }

    pub fn set_value_len(&mut self, value: u16) {
        let v = self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed);
        self.ref_value_len_in_byte.store(
            (v & !VALUE_LEN_MASK) | (value & VALUE_LEN_MASK),
            atomic::Ordering::Relaxed,
        );
    }

    pub fn set_op_type(&mut self, op_type: OpType) {
        self.op_type_key_len_in_byte = self.get_key_len() | ((op_type as u16) << OP_TYPE_SHIFT);
    }

    pub fn op_type(&self) -> OpType {
        let l = self.op_type_key_len_in_byte;
        let b = l >> OP_TYPE_SHIFT;
        unsafe { std::mem::transmute(b as u8) }
    }

    pub fn mark_as_ref(&self) {
        self.ref_value_len_in_byte
            .fetch_or(REF_BIT_MASK, atomic::Ordering::Relaxed);
    }

    pub fn clear_ref(&self) {
        self.ref_value_len_in_byte
            .fetch_and(!REF_BIT_MASK, atomic::Ordering::Relaxed);
    }

    pub fn is_referenced(&self) -> bool {
        self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & REF_BIT_MASK != 0
    }

    pub fn mark_as_deleted(&mut self) {
        self.set_op_type(OpType::Delete);
    }

    #[allow(dead_code)]
    pub fn is_deleted(&self) -> bool {
        self.op_type() == OpType::Delete
    }

    pub(crate) fn get_offset(&self) -> u16 {
        self.offset
    }

    pub(crate) fn get_key_len(&self) -> u16 {
        self.op_type_key_len_in_byte & KEY_LEN_MASK
    }
}
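
// Illustrative sketch: a round-trip through the ref-bit accessors on a meta
// built with `make_prefixed_meta`. The offset/length arguments are arbitrary
// example values.
#[cfg(test)]
mod kv_meta_examples {
    use super::{LeafKVMeta, OpType};

    #[test]
    fn ref_bit_round_trip() {
        let meta = LeafKVMeta::make_prefixed_meta(64, 3, b"hello", 0, OpType::Insert);
        assert_eq!(meta.get_key_len(), 5);
        assert_eq!(meta.value_len(), 3);
        // New entries start unreferenced; `make_prefixed_meta` clears the bit.
        assert!(!meta.is_referenced());

        meta.mark_as_ref();
        assert!(meta.is_referenced());
        // Marking does not disturb the packed value length.
        assert_eq!(meta.value_len(), 3);

        meta.clear_ref();
        assert!(!meta.is_referenced());
    }
}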

#[derive(Debug, Clone, Copy)]
pub(crate) struct MiniPageNextLevel {
    val: usize,
}

impl MiniPageNextLevel {
    pub(crate) fn new(val: usize) -> Self {
        Self { val }
    }

    pub(crate) fn as_offset(&self) -> usize {
        assert!(!self.is_null());
        self.val
    }

    pub(crate) fn new_null() -> Self {
        Self { val: usize::MAX }
    }

    pub(crate) fn is_null(&self) -> bool {
        self.val == usize::MAX
    }
}
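
// Illustrative sketch: `usize::MAX` is the null sentinel for the next-level
// pointer, so a freshly nulled value must not be read as an offset.
#[cfg(test)]
mod next_level_examples {
    use super::MiniPageNextLevel;

    #[test]
    fn null_sentinel() {
        assert!(MiniPageNextLevel::new_null().is_null());

        let next = MiniPageNextLevel::new(128);
        assert!(!next.is_null());
        assert_eq!(next.as_offset(), 128); // would panic on a null value
    }
}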

#[repr(C)]
pub(crate) struct LeafNode {
    pub(crate) meta: NodeMeta,
    prefix_len: u16,
    pub(crate) next_level: MiniPageNextLevel,
    pub(crate) lsn: u64,
    data: [u8; 0],
}

const _: () = assert!(std::mem::size_of::<LeafNode>() == 24);

impl LeafNode {
    fn max_data_size(node_size: usize) -> usize {
        node_size - std::mem::size_of::<LeafNode>()
    }

    pub(crate) fn initialize_mini_page(
        ptr: &CircularBufferPtr,
        node_size: usize,
        next_level: MiniPageNextLevel,
        cache_only: bool,
    ) {
        unsafe {
            Self::init_node_with_fence(
                ptr.as_ptr(),
                &[],
                &[],
                node_size,
                next_level,
                false,
                cache_only,
            )
        }
    }

    pub(crate) fn make_base_page(node_size: usize) -> *mut Self {
        let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
        let ptr = unsafe { std::alloc::alloc(layout) };
        unsafe {
            Self::init_node_with_fence(
                ptr,
                &[],
                &[],
                node_size,
                MiniPageNextLevel::new_null(),
                true,
                false,
            );
        }
        ptr as *mut Self
    }

    /// After promoting a base page to the full-page cache, we need to mark every record as cache.
    pub(crate) fn covert_insert_records_to_cache(&mut self) {
        let mut cur = self.first_meta_pos_after_fence();

        while cur < self.meta.meta_count_with_fence() {
            let meta = self.get_kv_meta_mut(cur as usize);
            let op_type = meta.op_type();

            match op_type {
                OpType::Insert => {
                    meta.set_op_type(OpType::Cache);
                }
                OpType::Delete => {
                    meta.set_op_type(OpType::Phantom);
                }
                OpType::Cache | OpType::Phantom => {
                    unreachable!("Base page should not have op type: {:?}", op_type);
                }
            };

            cur += 1;
        }
    }

    /// When merging the full-page cache back into the base page, we need to turn clean records into dirty records.
    pub(crate) fn convert_cache_records_to_insert(&mut self) {
        let mut cur = self.first_meta_pos_after_fence();

        while cur < self.meta.meta_count_with_fence() {
            let meta = self.get_kv_meta_mut(cur as usize);
            let op_type = meta.op_type();

            match op_type {
                OpType::Cache => {
                    meta.set_op_type(OpType::Insert);
                }
                OpType::Phantom => {
                    meta.set_op_type(OpType::Delete);
                }
                OpType::Insert | OpType::Delete => {
                    // Other (dirty) types may also be present; leave them as-is.
                }
            };

            cur += 1;
        }
    }

    unsafe fn init_node_with_fence(
        ptr: *mut u8,
        low_fence: &[u8],
        high_fence: &[u8],
        node_size: usize,
        next_level: MiniPageNextLevel,
        has_fence: bool,
        cache_only: bool,
    ) {
        let ptr = ptr as *mut Self;

        { &mut *ptr }.initialize(
            low_fence, high_fence, node_size, next_level, has_fence, cache_only,
        );
    }

    pub(crate) fn get_kv_meta(&self, index: usize) -> &LeafKVMeta {
        debug_assert!(index < self.meta.meta_count_with_fence() as usize);
        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
        unsafe { &*meta_ptr.add(index) }
    }

    pub(crate) fn get_full_key(&self, meta: &LeafKVMeta) -> Vec<u8> {
        let prefix = self.get_prefix();
        let remaining = self.get_remaining_key(meta);
        [prefix, remaining].concat()
    }

    /// Get the full key of the low fence, which is not prefix-compressed.
    pub(crate) fn get_low_fence_full_key(&self) -> Vec<u8> {
        debug_assert!(LOW_FENCE_IDX < self.meta.meta_count_with_fence() as usize);
        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
        let meta = unsafe { &*meta_ptr.add(LOW_FENCE_IDX) };

        let key_offset = meta.get_offset();
        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
        unsafe {
            let key = std::slice::from_raw_parts(key_ptr, meta.get_key_len() as usize);
            key.to_vec()
        }
    }

    pub(crate) fn get_kv_meta_mut(&mut self, index: usize) -> &mut LeafKVMeta {
        debug_assert!(index < self.meta.meta_count_with_fence() as usize);
        let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
        unsafe { meta_ptr.add(index).as_mut().unwrap() }
    }

    pub(crate) fn write_initial_kv_meta(&mut self, index: usize, meta: LeafKVMeta) {
        let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
        unsafe { meta_ptr.add(index).write(meta) };
    }

    pub(crate) fn get_remaining_key(&self, meta: &LeafKVMeta) -> &[u8] {
        let key_offset = meta.get_offset();
        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
        unsafe {
            std::slice::from_raw_parts(key_ptr, (meta.get_key_len() - self.prefix_len) as usize)
        }
    }

    pub(crate) fn get_prefix(&self) -> &[u8] {
        let m = self.get_kv_meta(LOW_FENCE_IDX);
        let key_offset = m.get_offset();
        unsafe {
            std::slice::from_raw_parts(
                self.data.as_ptr().add(key_offset as usize),
                self.prefix_len as usize,
            )
        }
    }

    pub(crate) fn install_fence_key(&mut self, key: &[u8], is_high_fence: bool) {
        let loc: u16 = self.meta.meta_count_with_fence();
        if !is_high_fence {
            debug_assert!(self.meta.meta_count_with_fence() == LOW_FENCE_IDX as u16);
        } else {
            debug_assert!(self.meta.meta_count_with_fence() == HIGH_FENCE_IDX as u16);
        }

        let cur_low_offset = self.current_lowest_offset();

        self.meta.increment_value_count();
        self.meta.remaining_size -= std::mem::size_of::<LeafKVMeta>() as u16;

        if key.is_empty() {
            let fence = if is_high_fence {
                LeafKVMeta::make_infinite_high_fence_key()
            } else {
                LeafKVMeta::make_infinite_low_fence_key()
            };
            self.write_initial_kv_meta(loc as usize, fence);
        } else {
            let prefix_len = if is_high_fence { self.prefix_len } else { 0 }; // we don't compress the low fence.

            let post_fix_len = key.len() as u16 - prefix_len;
            let val_len = 0u16;
            let kv_len = post_fix_len + val_len;

            let remaining = self.meta.remaining_size;

            debug_assert!(kv_len <= remaining);

            let offset = cur_low_offset - kv_len;

            let new_meta =
                LeafKVMeta::make_prefixed_meta(offset, 0, key, prefix_len, OpType::Insert);

            self.write_initial_kv_meta(loc as usize, new_meta);

            unsafe {
                let start_ptr = self.data.as_mut_ptr().add(offset as usize);

                let key_slice = std::slice::from_raw_parts(
                    key.as_ptr().add(prefix_len as usize),
                    post_fix_len as usize,
                );
                std::ptr::copy_nonoverlapping(key_slice.as_ptr(), start_ptr, post_fix_len as usize);
            }

            self.meta.remaining_size -= kv_len;
        }
    }

    pub(crate) fn current_lowest_offset(&self) -> u16 {
        let value_count = self.meta.meta_count_with_fence();
        let rt =
            (value_count * std::mem::size_of::<LeafKVMeta>() as u16) + self.meta.remaining_size;

        // Sanity check
        #[cfg(debug_assertions)]
        {
            let mut min_offset = LeafNode::max_data_size(self.meta.node_size as usize) as u16;
            for i in 0..value_count {
                let kv_meta = self.get_kv_meta(i as usize);
                if kv_meta.is_infinite_low_fence_key() || kv_meta.is_infinite_high_fence_key() {
                    continue;
                }
                min_offset = std::cmp::min(min_offset, kv_meta.offset);
            }
            assert!(min_offset == rt);
        }
        rt
    }

    pub(crate) fn get_value(&self, meta: &LeafKVMeta) -> &[u8] {
        let val_offset = meta.get_offset() + meta.get_key_len() - self.prefix_len;
        let val_ptr = unsafe { self.data.as_ptr().add(val_offset as usize) };
        unsafe { std::slice::from_raw_parts(val_ptr, meta.value_len() as usize) }
    }

    /// A good split key has two properties:
    /// (1) it splits the node into two roughly equal sizes;
    /// (2) it is short, so that the parent node search is faster.
    ///
    /// Getting the split key is tricky:
    /// 1. We can't just use the middle key, because keys/values are variable length. We want the key that splits the node by size, not by key count.
    /// 2. We don't try to find a small split key here.
    ///
    /// Returns the key that splits the node into two roughly equal sizes, and the new node count.
    /// This is only invoked once, for splitting the root node.
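    ///
    /// A hedged worked example (sizes illustrative; fence and meta overhead ignored):
    /// ```text
    /// record payload sizes:    [100, 10, 10, 10, 10]   total = 140, target = 70
    /// split by count (middle): left = 120, right = 20  (badly skewed)
    /// split by size:           left = 100, right = 40  (boundary closest to 70/70)
    /// ```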
    pub fn get_split_key(&self, cache_only: bool) -> (Vec<u8>, u16) {
        let mut data_size = 0;

        for meta in self.meta_iter() {
            let key_len = meta.get_key_len();
            let value_len = meta.value_len();

            data_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
        }

        let split_target_size = data_size / 2;
        let mut cur_size = 0;

        for (i, meta) in self.meta_iter().enumerate() {
            let key_len = meta.get_key_len();
            let value_len = meta.value_len();

            cur_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            // Here we guarantee that in cache-only mode the root node keeps at least two keys
            if cache_only && i == 0 {
                continue;
            }

            if cur_size >= split_target_size {
                return (self.get_full_key(meta), i as u16);
            }
        }
        unreachable!();
    }

    /// Get # of keys strictly smaller than a merge_split_key
    #[allow(clippy::unused_enumerate_index)]
    pub fn get_kv_num_below_key(&self, merge_split_key: &Vec<u8>) -> u16 {
        // Linear search
        let mut cnt: u16 = 0;
        for (_, meta) in self.meta_iter().enumerate() {
            let key = self.get_full_key(meta);
            let cmp = key.cmp(merge_split_key);
            // Pick all records from the base page whose key is smaller
            // than merge_split_key
            if cmp == std::cmp::Ordering::Less {
                cnt += 1;
            } else {
                break;
            }
        }
        cnt
    }

    /// [Cache-only mode]: Find the splitting key that evenly splits all the records in the
    /// current node, plus the to-be-inserted record, in half. The caller needs to guarantee
    /// that there are at least two records with different keys, so that the split won't
    /// result in an empty page.
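    ///
    /// A hedged worked example of the two-candidate scan below (sizes illustrative):
    /// ```text
    /// record sizes: 30, 30, 30, 30       total = 120, target = 60
    /// candidate 1:  key of the record that first reaches 60 -> left = 30, right = 90, diff = 60
    /// candidate 2:  the next key                            -> left = 60, right = 60, diff = 0
    /// candidate 2 wins: it minimizes |left - right|.
    /// ```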
    pub fn get_cache_only_insert_split_key(&self, key: &[u8], new_record_size: &u16) -> Vec<u8> {
        let mut merge_split_key_1: Option<Vec<u8>> = None;
        let mut merge_split_key_2: Option<Vec<u8>> = None;
        let mut diff_1: i16 = i16::MAX;
        let mut diff_2: i16 = i16::MAX;

        // The total size of all records including the new record to insert
        let mut total_merged_size: u16 = 0;

        for meta in self.meta_iter() {
            let key_len = meta.get_key_len();
            let value_len = meta.value_len();

            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
        }

        total_merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;
        let split_target_size = total_merged_size / 2;

        // Search for the splitting key
        let mut merged_size: u16 = 0;
        let mut self_meta_iter = self.meta_iter();
        let mut self_meta_option = self_meta_iter.next();

        // Go through all records in the base page whose keys are smaller than the new record's key
        if self_meta_option.is_some() {
            let mut cur_base_meta = self_meta_option.unwrap();
            let mut cur_base_key = self.get_full_key(cur_base_meta);
            let mut cmp = cur_base_key.as_slice().cmp(key);

            while cmp == std::cmp::Ordering::Less {
                let key_len = cur_base_meta.get_key_len();
                let value_len = cur_base_meta.value_len();
                merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
                if merged_size >= split_target_size {
                    // Two split key candidates are already found
                    // Stop
                    if merge_split_key_2.is_some() {
                        break;
                    }

                    let left_side: u16 = merged_size
                        - key_len
                        - value_len
                        - std::mem::size_of::<LeafKVMeta>() as u16;
                    let right_side = total_merged_size - left_side;

                    if merge_split_key_1.is_none() {
                        merge_split_key_1 = Some(cur_base_key);
                        diff_1 = (left_side as i16 - right_side as i16).abs();
                    } else {
                        merge_split_key_2 = Some(cur_base_key);
                        diff_2 = (left_side as i16 - right_side as i16).abs();
                    }
                }

                self_meta_option = self_meta_iter.next();
                if let Some(meta) = self_meta_option {
                    cur_base_meta = meta;
                    cur_base_key = self.get_full_key(cur_base_meta);
                    cmp = cur_base_key.as_slice().cmp(key);
                } else {
                    break;
                }
            }
        }

        // Count the new key
        if merge_split_key_2.is_none() {
            merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                // Two split key candidates are already found
                // Stop

                let left_side: u16 =
                    merged_size - new_record_size - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(key.to_vec());
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(key.to_vec());
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
        }

        // Go through the rest of records in the base page
        while self_meta_option.is_some() {
            let base_meta = self_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                // Return the splitting key
                let cur_base_key = self.get_full_key(base_meta);
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_base_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_base_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
            self_meta_option = self_meta_iter.next();
        }

        // Pick the splitting key that achieves the smallest size difference between
        // the two halves
        if merge_split_key_1.is_none() {
            panic!(
                "Fail to find a splitting key for merging mini and base page.{}, {}",
                merged_size, split_target_size
            );
        }

        if merge_split_key_2.is_none() || diff_1 < diff_2 {
            return merge_split_key_1.unwrap();
        }

        merge_split_key_2.unwrap()
    }

    /// Find the splitting key that divides the merged records of
    /// the mini-page and its corresponding base page evenly into two
    /// groups (base pages). Special consideration goes to the
    /// split-key-bearing (k,v) pair.
    /// The caller needs to ensure there are at least two distinct keys among
    /// the mini page and self.
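    ///
    /// A hedged illustration of the duplicate handling in the merge-sort pass below:
    /// ```text
    /// base page keys:        a, c, e
    /// mini page dirty keys:  b, c
    /// merged distinct keys:  a, b, c, e
    /// ```
    /// "c" is counted once; the base-page copy is marked deleted because the
    /// mini-page copy supersedes it.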
    #[allow(clippy::unnecessary_unwrap)]
    pub(crate) fn get_merge_split_key(&mut self, mini_page: &LeafNode) -> Vec<u8> {
        let mut merge_split_key_1: Option<Vec<u8>> = None;
        let mut merge_split_key_2: Option<Vec<u8>> = None;
        let mut diff_1: i16 = i16::MAX;
        let mut diff_2: i16 = i16::MAX;

        let mut total_merged_size: u16 = 0;
        let mut base_meta_iter = self.meta_iter();
        let mut cur_pos = base_meta_iter.cur;
        let mut cur_base_meta_option = base_meta_iter.next();
        let mut duplicate_positions = vec![];

        // Calculate the size of all distinctively merged records through merge-sort
        for mini_meta in mini_page.meta_iter() {
            let c_type = mini_meta.op_type();
            // Skip cached or phantom records as they will not be merged into base pages
            if !c_type.is_dirty() {
                continue;
            }

            let cur_mini_key = mini_page.get_full_key(mini_meta);
            if cur_base_meta_option.is_some() {
                let mut cur_base_meta = cur_base_meta_option.unwrap();
                let mut cur_base_key = self.get_full_key(cur_base_meta);
                let mut cmp = cur_base_key.cmp(&cur_mini_key);

                // Go through all records from the base page whose key is strictly smaller
                // than the current record from the mini page
                while cmp == std::cmp::Ordering::Less {
                    let key_len = cur_base_meta.get_key_len();
                    let value_len = cur_base_meta.value_len();
                    total_merged_size +=
                        key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

                    cur_pos = base_meta_iter.cur;
                    cur_base_meta_option = base_meta_iter.next();
                    if cur_base_meta_option.is_some() {
                        cur_base_meta = cur_base_meta_option.unwrap();
                        cur_base_key = self.get_full_key(cur_base_meta);
                        cmp = cur_base_key.cmp(&cur_mini_key);
                    } else {
                        break;
                    }
                }

                // If the comparison is equal then advance the base page iterator to avoid counting it
                // twice
                if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
                    // Mark the duplicate entry for deletion
                    duplicate_positions.push(cur_pos);
                    cur_pos = base_meta_iter.cur;
                    cur_base_meta_option = base_meta_iter.next();
                }
            }

            let key_len = mini_meta.get_key_len();
            let value_len = mini_meta.value_len();
            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
        }

        // Mini-page records are exhausted; go through the rest of the
        // records from the base page, if any
        while cur_base_meta_option.is_some() {
            let base_meta = cur_base_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            cur_base_meta_option = base_meta_iter.next();
        }

        // The target split size is half of all the merged records
        let split_target_size = total_merged_size / 2;

        // Merge sort the distinct records from the mini page and the base page
        // until the size of the sorted records reaches the target split size
        let mut merged_size: u16 = 0;
        base_meta_iter = self.meta_iter();
        cur_base_meta_option = base_meta_iter.next();

        for mini_meta in mini_page.meta_iter() {
            // Skip cached or phantom records as they will
            // not take additional space after merging
            let c_type = mini_meta.op_type();
            if !c_type.is_dirty() {
                continue;
            }

            let cur_mini_key = mini_page.get_full_key(mini_meta);
            if cur_base_meta_option.is_some() {
                let mut cur_base_meta = cur_base_meta_option.unwrap();
                let mut cur_base_key = self.get_full_key(cur_base_meta);
                let mut cmp = cur_base_key.cmp(&cur_mini_key);
                // Go through all records from the base page whose key is strictly smaller
                // than the current record from the mini page
                while cmp == std::cmp::Ordering::Less {
                    let key_len = cur_base_meta.get_key_len();
                    let value_len = cur_base_meta.value_len();
                    merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
                    if merged_size >= split_target_size {
                        // Two split key candidates are already found
                        // Stop
                        if merge_split_key_2.is_some() {
                            break;
                        }

                        let left_side: u16 = merged_size
                            - key_len
                            - value_len
                            - std::mem::size_of::<LeafKVMeta>() as u16;
                        let right_side = total_merged_size - left_side;

                        if merge_split_key_1.is_none() {
                            merge_split_key_1 = Some(cur_base_key);
                            diff_1 = (left_side as i16 - right_side as i16).abs();
                        } else {
                            merge_split_key_2 = Some(cur_base_key);
                            diff_2 = (left_side as i16 - right_side as i16).abs();
                        }
                    }

                    cur_base_meta_option = base_meta_iter.next();
                    if cur_base_meta_option.is_some() {
                        cur_base_meta = cur_base_meta_option.unwrap();
                        cur_base_key = self.get_full_key(cur_base_meta);
                        cmp = cur_base_key.cmp(&cur_mini_key);
                    } else {
                        break;
                    }
                }

                // If the comparison is equal then advance the base page iterator to avoid counting it
                // twice
                if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
                    cur_base_meta_option = base_meta_iter.next();
                }
            }

            let key_len = mini_meta.get_key_len();
            let value_len = mini_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                // Two split key candidates are already found
                // Stop
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_mini_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_mini_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
        }

        // Mini-page records are exhausted; go through the rest of the
        // records from the base page, if any
        while cur_base_meta_option.is_some() {
            let base_meta = cur_base_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                // Return the splitting key
                let cur_base_key = self.get_full_key(base_meta);
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_base_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_base_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
            cur_base_meta_option = base_meta_iter.next();
        }

        // Delete all duplicate entries from the base page
        for pos in duplicate_positions {
            let pos_meta = self.get_kv_meta_mut(pos);
            pos_meta.mark_as_deleted();
        }

        if merge_split_key_1.is_none() {
            unreachable!(
                "Fail to find a splitting key for merging mini and base page.{}, {}",
                merged_size, split_target_size
            );
        }

        // The two split keys must be different
        if merge_split_key_2.is_some() {
            let cmp = merge_split_key_1
                .as_ref()
                .unwrap()
                .cmp(merge_split_key_2.as_ref().unwrap());
            assert_ne!(cmp, std::cmp::Ordering::Equal);
        }

        let mut splitting_key = if merge_split_key_2.is_none() || diff_1 < diff_2 {
            merge_split_key_1.as_ref().unwrap()
        } else {
            merge_split_key_2.as_ref().unwrap()
        };

        // The splitting key cannot be the low fence, as that leads to an invalid page
        // and duplicate entries in the inner node
        let mut fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
        if !fence_meta.is_infinite_low_fence_key() {
            let low_fence_key = self.get_low_fence_key();
            let cmp = splitting_key.cmp(&low_fence_key);
            if cmp == std::cmp::Ordering::Equal {
                splitting_key = merge_split_key_2.as_ref().unwrap();
            }
        }

        fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
        if !fence_meta.is_infinite_high_fence_key() {
            let high_fence_key = self.get_high_fence_key();
            let cmp = splitting_key.cmp(&high_fence_key);
            assert_ne!(cmp, std::cmp::Ordering::Equal);
        }

        splitting_key.clone()
    }

    pub fn get_key_to_reach_this_node(&self) -> Vec<u8> {
        let meta = self.get_kv_meta(self.first_meta_pos_after_fence() as usize);
        self.get_full_key(meta)
    }

    /// In cache-only mode, a transient state could be observed when one thread is in the
    /// middle of consolidating the leaf node while the evicting callback thread is
    /// attempting an unprotected read. As such, this function guarantees that no panic
    /// happens during the key read.
    pub fn try_get_key_to_reach_this_node(&self) -> Result<Vec<u8>, TreeError> {
        assert!(self.meta.is_cache_only_leaf());

        // Get the meta of the first key
        let index = 0;
        if index >= self.meta.meta_count_with_fence() as usize {
            return Err(TreeError::NeedRestart);
        }

        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
        let meta = unsafe { &*meta_ptr.add(index) };

        let key_offset = meta.get_offset();

        if key_offset + meta.get_key_len()
            > LeafNode::max_data_size(self.meta.node_size as usize) as u16
        {
            return Err(TreeError::NeedRestart);
        }

        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
        let key_slice = unsafe {
            std::slice::from_raw_parts(key_ptr, (meta.get_key_len() - self.prefix_len) as usize)
        };
        Ok(key_slice.to_vec())
    }

    pub fn get_low_fence_key(&self) -> Vec<u8> {
        assert!(self.has_fence());
        let fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
        if fence_meta.is_infinite_low_fence_key() {
            vec![]
        } else {
            unsafe {
                let start_ptr = self.data.as_ptr().add(fence_meta.offset as usize);
                let key_slice =
                    std::slice::from_raw_parts(start_ptr, fence_meta.get_key_len() as usize);
                key_slice.to_vec()
            }
        }
    }

    pub fn get_high_fence_key(&self) -> Vec<u8> {
        assert!(self.has_fence());
        let fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
        if fence_meta.is_infinite_high_fence_key() {
            vec![]
        } else {
            self.get_full_key(fence_meta)
        }
    }

    /// Compares the existing key specified by `meta` with the input key.
    /// By convention, `key_cmp(meta, key)` returns the `Ordering` of the stored key
    /// relative to the input key, i.e., the equivalent of `stored_key.cmp(key)`.
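    ///
    /// A hedged worked example (all values illustrative):
    /// ```text
    /// prefix_len = 2, stored key = "abcde"  -> preview_bytes = ['c', 'd']
    /// input key  = "abcdx"                  -> preview slice  = "cd"
    /// previews tie, so the remaining keys are compared:
    /// "cde".cmp("cdx") -> Ordering::Less  (stored key sorts before input key)
    /// ```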
    #[inline]
    pub(crate) fn key_cmp(&self, meta: &LeafKVMeta, key: &[u8]) -> Ordering {
        let search_key_prefix = &key[(self.prefix_len as usize)..]
            [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];

        let prefix_key = &meta.preview_bytes[..std::cmp::min(
            PREVIEW_SIZE,
            meta.get_key_len() as usize - self.prefix_len as usize,
        )];
        let mut cmp = prefix_key.cmp(search_key_prefix);

        // If the previews match, compare the remaining key
        if cmp == Ordering::Equal {
            let remaining_key = self.get_remaining_key(meta);
            let search_key_postfix = &key[self.prefix_len as usize..];
            cmp = remaining_key.cmp(search_key_postfix);
        }
        cmp
    }

    pub(crate) fn linear_lower_bound(&self, key: &[u8]) -> u16 {
        debug_assert!(key.len() >= self.prefix_len as usize);

        let mut index = self.first_meta_pos_after_fence();

        while index < self.meta.meta_count_with_fence() {
            let key_meta = self.get_kv_meta(index as usize);

            #[cfg(target_arch = "x86_64")]
            unsafe {
                // For bw-tree-like linear search, we use clflush to simulate pointer chasing.
                core::arch::x86_64::_mm_clflush(key_meta as *const LeafKVMeta as *const u8);
            }
            let cmp = self.key_cmp(key_meta, key);

            if cmp != Ordering::Less {
                return index;
            }

            index += 1;
        }

        index
    }

    pub(crate) fn lower_bound(&self, key: &[u8]) -> u16 {
        let mut lower = self.first_meta_pos_after_fence();
        let mut upper = self.meta.meta_count_with_fence();

        let search_key_prefix = &key[(self.prefix_len as usize)..]
            [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];

        debug_assert!(key.len() >= self.prefix_len as usize);

        while lower < upper {
            let mid = lower + (upper - lower) / 2;
            let key_meta = self.get_kv_meta(mid as usize);

            let prefix_key = &key_meta.preview_bytes[..std::cmp::min(
                PREVIEW_SIZE,
                key_meta.get_key_len() as usize - self.prefix_len as usize,
            )];
            let mut cmp = prefix_key.cmp(search_key_prefix);

            // If the previews match, compare the remaining key
            if cmp == Ordering::Equal {
                let remaining_key = self.get_remaining_key(key_meta);
                let search_key_postfix = &key[self.prefix_len as usize..];
                cmp = remaining_key.cmp(search_key_postfix);
            }

            match cmp {
                Ordering::Greater => {
                    upper = mid;
                }
                Ordering::Equal => {
                    return mid;
                }
                Ordering::Less => {
                    lower = mid + 1;
                }
            }
        }
        lower
    }

    /// Take a deep breath before you read/change this function.
    ///
    /// Leaf node insert differs from inner node insert in that:
    /// 1. Leaf node values are variable length.
    /// 2. Leaf nodes need to handle duplicate keys, in which case the old value is removed and the new value inserted.
    ///
    /// Returns true if the insert is successful, false if out of space.
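    ///
    /// A hedged sketch of the duplicate-key path (values illustrative):
    /// ```text
    /// insert ("k1", "vv") over existing ("k1", "value"):
    ///   new value fits in the old slot -> overwrite in place, shrink value_len
    /// insert ("k1", "a-much-longer-value"):
    ///   new value doesn't fit -> append key+value at the heap end, redirect the
    ///   meta's offset; the old bytes become a hole until consolidation
    /// ```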
    pub(crate) fn insert(
        &mut self,
        key: &[u8],
        value: &[u8],
        op_type: OpType,
        max_fence_len: usize,
    ) -> bool {
        debug_assert!(key.len() as u16 >= self.prefix_len);
        match op_type {
            OpType::Insert | OpType::Cache => {
                debug_assert!(!value.is_empty());
            }
            OpType::Delete | OpType::Phantom => {}
        }

        let post_fix_len = key.len() as u16 - self.prefix_len;
        let val_len = value.len() as u16;
        let kv_len = post_fix_len + val_len;

        let value_count_with_fence = self.meta.meta_count_with_fence();

        let pos = self.lower_bound(key) as usize;

        if pos < value_count_with_fence as usize {
            let prefix_len = self.prefix_len as usize;
            let pos_meta = self.get_kv_meta(pos);
            let pos_key = self.get_remaining_key(pos_meta);
            let search_key_postfix = &key[prefix_len..];
            if pos_key.cmp(search_key_postfix) == Ordering::Equal {
                // The key already exists.
                counter!(LeafInsertDuplicate);
                if op_type == OpType::Delete {
                    let pos_meta = self.get_kv_meta_mut(pos);
                    pos_meta.mark_as_deleted();
                    return true;
                }

                let pos_value = self.get_value(pos_meta);
                let pos_value_len = pos_value.len() as u16;

                if pos_value_len >= val_len {
                    // We are lucky: the old value is at least as large as the new one, so we just overwrite it in place.
                    unsafe {
                        let pair_ptr = self.data.as_ptr().add(pos_meta.offset as usize) as *mut u8;
                        std::ptr::copy_nonoverlapping(
                            value.as_ptr(),
                            pair_ptr.add(post_fix_len as usize),
                            val_len as usize,
                        );
                    }
                    let pos_meta = self.get_kv_meta_mut(pos);
                    pos_meta.set_value_len(val_len);
                    pos_meta.set_op_type(op_type);
                    return true;
                }

                if self.meta.remaining_size < kv_len {
                    return false;
                }
                assert!(op_type != OpType::Cache);
                let offset = self.current_lowest_offset() - kv_len;
                unsafe {
                    let pair_ptr = self.data.as_ptr().add(offset as usize) as *mut u8;

                    let pos_meta = self.get_kv_meta_mut(pos);
                    pos_meta.set_value_len(val_len);
                    pos_meta.set_op_type(op_type);
                    pos_meta.offset = offset;
                    std::ptr::copy_nonoverlapping(
                        key[self.prefix_len as usize..].as_ptr(),
                        pair_ptr,
                        post_fix_len as usize,
                    );
                    std::ptr::copy_nonoverlapping(
                        value.as_ptr(),
                        pair_ptr.add(post_fix_len as usize),
                        val_len as usize,
                    );
                }
                self.meta.remaining_size -= kv_len;
                return true;
            }
        }

        // The key is not already in the node.
        // For any in-memory page, skipping delete records here could lead to an empty page,
        // which becomes un-evictable.
        if op_type == OpType::Delete && (self.is_base_page()) {
            // We are deleting a key that is not in the page.
            return true;
        }

        // Check if the node has capacity for the new record, with or without fences
        if self.full_with_fences(
            kv_len + std::mem::size_of::<LeafKVMeta>() as u16,
            max_fence_len,
        ) {
            return false;
        }

        counter!(LeafInsertNew);

        let offset = self.current_lowest_offset() - kv_len;
        let new_meta =
            LeafKVMeta::make_prefixed_meta(offset, val_len, key, self.prefix_len, op_type);

        unsafe {
            let metas_size = std::mem::size_of::<LeafKVMeta>()
                * (self.meta.meta_count_with_fence() - pos as u16) as usize;
            let src_ptr = self
                .data
                .as_ptr()
                .add(pos * std::mem::size_of::<LeafKVMeta>());
            let dest_ptr = self
                .data
                .as_mut_ptr()
                .add((pos + 1) * std::mem::size_of::<LeafKVMeta>());
            std::ptr::copy(src_ptr, dest_ptr, metas_size);

            self.write_initial_kv_meta(pos, new_meta);

            let pair_ptr = self.data.as_mut_ptr().add(offset as usize);
            std::ptr::copy_nonoverlapping(
                key[self.prefix_len as usize..].as_ptr(),
                pair_ptr,
                post_fix_len as usize,
            );
            std::ptr::copy_nonoverlapping(
                value.as_ptr(),
                pair_ptr.add(post_fix_len as usize),
                val_len as usize,
            );
        }

        self.meta.remaining_size -= kv_len + std::mem::size_of::<LeafKVMeta>() as u16;
        self.meta.increment_value_count();
        true
    }

    /// Base pages are leaf pages without a next pointer (in non-cache-only mode).
    pub(crate) fn is_base_page(&self) -> bool {
        (!self.meta.is_cache_only_leaf()) && self.next_level.is_null()
    }

    /// Returns the splitting key after splitting self.
    /// The new high fence key of self (the left-side node) and the low fence
    /// of the new base page (the right-side node) both equal the splitting key.
    pub fn split(&mut self, sibling: &mut LeafNode, cache_only: bool) -> Vec<u8> {
        if cache_only {
            assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
        } else {
            assert!(self.is_base_page() && self.has_fence());
        }

        let current_count = self.meta.meta_count_without_fence();

        // [new_cur_count..] are moved to the sibling node
        // [..new_cur_count - 1] are kept in self
        // where splitting_key is the key at new_cur_count
        // Also, new_cur_count is in [0..(#non_fence_meta - 1)]
        let (splitting_key, new_cur_count) = self.get_split_key(cache_only);
        let sibling_cnt = current_count - new_cur_count;

        // Now we have to do two things:
        // Copy the second half of the key-value pairs to the new node, setting the correct offsets.
        // Consolidate the key-value pairs in the current node, setting the correct offsets.
        if !cache_only {
            let low_fence_for_right = self.get_kv_meta(FENCE_KEY_CNT + new_cur_count as usize);
            assert!(!low_fence_for_right.is_infinite_low_fence_key());
            let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);

            let low_fence_key = self.get_full_key(low_fence_for_right);
            let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
                vec![]
            } else {
                self.get_full_key(high_fence_for_right)
            };

            sibling.initialize(
                &low_fence_key,
                &high_fence_key,
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                true,
                cache_only,
            );
        } else {
            sibling.initialize(
                &[],
                &[],
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                false,
                cache_only,
            );
        }

        let starting_kv_idx = if cache_only { 0 } else { FENCE_KEY_CNT };

        for i in 0..sibling_cnt {
            let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_idx);
            if kv_meta.op_type() == OpType::Delete {
                // skip deleted records.
                continue;
            }
            let key = self.get_full_key(kv_meta);
            let value = self.get_value(kv_meta);
            let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
            assert!(insert_rt);
        }

        if !cache_only {
            self.meta
                .set_value_count(new_cur_count + FENCE_KEY_CNT as u16);
        } else {
            self.meta.set_value_count(new_cur_count);
        }

        self.consolidate_after_split(&splitting_key);

        splitting_key
    }

    /// Use the passed-in `merge_split_key` as the splitting key
    /// to divide the base page in two. Also, install it as the high
    /// fence key of the left-side node and the low fence key of the
    /// right-side node.
    pub fn split_with_key(
        &mut self,
        sibling: &mut LeafNode,
        merge_split_key: &Vec<u8>,
        cache_only: bool,
    ) {
        if cache_only {
            assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
        } else {
            assert!(self.is_base_page() && self.has_fence());
        }

        let current_count = self.meta.meta_count_without_fence();
        // [new_cur_count..] are moved to the sibling node
        // [..new_cur_count - 1] are kept in self
        // where splitting_key is the key at new_cur_count
        // Also, new_cur_count is in [0..#non_fence_meta]
        let mut new_cur_count = self.get_kv_num_below_key(merge_split_key);
        let sibling_cnt = current_count - new_cur_count;

        // Now we have to do two things:
        // Copy the second half of the key-value pairs to the new node, setting the correct offsets.
        // Consolidate the key-value pairs in the current node, setting the correct offsets.
        if !cache_only {
            let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);

            let low_fence_key = merge_split_key.clone();
            let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
                vec![]
            } else {
                self.get_full_key(high_fence_for_right)
            };

            sibling.initialize(
                &low_fence_key,
                &high_fence_key,
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                true,
                cache_only,
            );
        } else {
            sibling.initialize(
                &[],
                &[],
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                false,
                cache_only,
            );
        }

        let starting_kv_index = if cache_only { 0 } else { FENCE_KEY_CNT };

        for i in 0..sibling_cnt {
            let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_index);
            if kv_meta.op_type() == OpType::Delete {
                // skip deleted records
                continue;
            }
            let key = self.get_full_key(kv_meta);
            let value = self.get_value(kv_meta);
            let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
            assert!(insert_rt);
        }

        if !cache_only {
            new_cur_count += FENCE_KEY_CNT as u16;
        }

        self.meta.set_value_count(new_cur_count);
        self.consolidate_after_split(merge_split_key);

        if !cache_only {
            // Assert the high fence of the left side node and the low fence of the right side node
            // are both set to the merge split key
            let left_high_fence = self.get_kv_meta(HIGH_FENCE_IDX);
            let left_high_fence_key = self.get_full_key(left_high_fence);
            let right_low_fence_key = sibling.get_low_fence_full_key();

            assert_eq!(left_high_fence_key, *merge_split_key); // The high fence of the left side node == splitting key
            assert_eq!(right_low_fence_key, *merge_split_key);
        }
    }

    pub(crate) fn consolidate_inner(
        &mut self,
        new_optype: OpType,
        new_high_fence: Option<&[u8]>,
        skip_tombstone: bool,
        cache_only: bool,
        skip_key: Option<&[u8]>,
    ) {
        let mut pairs = Vec::new();

        for meta in self.meta_iter() {
            if skip_tombstone && meta.op_type() == OpType::Delete {
                // Skip tombstone values.
                continue;
            }

            match skip_key {
                // Skip the record with the skip_key
                Some(s_k) => {
                    let k = self.get_full_key(meta);
                    let cmp = s_k.cmp(&k);

                    if cmp != Ordering::Equal {
                        pairs.push((k, self.get_value(meta).to_owned(), meta.op_type()));
                    }
                }
                None => {
                    pairs.push((
                        self.get_full_key(meta),
                        self.get_value(meta).to_owned(),
                        meta.op_type(),
                    ));
                }
            }
        }

        let has_fence = self.has_fence();
        let (low_fence, high_fence) = if has_fence {
            let high_fence_key = match new_high_fence {
                None => self.get_high_fence_key(),
                Some(key) => key.to_vec(),
            };
            (self.get_low_fence_key(), high_fence_key)
        } else {
            (vec![], vec![])
        };

        let node_size = self.meta.node_size;

        self.initialize(
            &low_fence,
            &high_fence,
            node_size as usize,
            self.next_level,
            has_fence,
            cache_only,
        );

        for (key, value, op_type) in pairs {
            let rt = if op_type == OpType::Delete {
                self.insert(&key, &value, OpType::Delete, 0)
            } else {
                self.insert(&key, &value, new_optype, 0)
            };
            assert!(rt);
        }
    }

    #[allow(dead_code)]
    pub(crate) fn consolidate(&mut self) {
        self.consolidate_inner(
            OpType::Insert,
            None,
            true,
            self.meta.is_cache_only_leaf(),
            None,
        );
    }

    /// For mini pages only.
    /// After consolidation, every tombstone record is removed and every insert record becomes a cache record.
    pub(crate) fn consolidate_after_merge(&mut self) {
        assert!(!self.is_base_page());
        self.consolidate_inner(
            OpType::Cache,
            None,
            false,
            self.meta.is_cache_only_leaf(),
            None,
        );
    }

    /// Re-organize the key-value pairs to remove the holes in the data.
    /// A delete/split operation leaves holes in the data field, which hurts space efficiency.
    /// The easiest (but not most efficient) way to do this is to copy every key-value pair out,
    /// reset the node state, and then insert them back.
    fn consolidate_after_split(&mut self, high_fence: &[u8]) {
        assert!(self.meta.is_cache_only_leaf() || self.is_base_page());
        self.consolidate_inner(
            OpType::Insert,
            Some(high_fence),
            true,
            self.meta.is_cache_only_leaf(),
            None,
        );
    }

    /// Same as consolidate(), except that any record with the given key is dropped.
    /// See the sketch test `test_consolidate_skip_key_sketch` in the tests module below.
    pub(crate) fn consolidate_skip_key(&mut self, key: &[u8]) {
        self.consolidate_inner(
            OpType::Insert,
            None,
            true,
            self.meta.is_cache_only_leaf(),
            Some(key),
        );
    }

    /// Copy a mini-page to a new memory location.
    pub(crate) fn copy_initialize_to(
        &self,
        dst_node: *mut LeafNode,
        dst_size: usize,
        discard_cold_cache: bool,
    ) {
        assert!(!self.is_base_page());
        assert!(self.meta.node_size as usize <= dst_size);
        let dst_ref = unsafe { &mut *dst_node };
        let empty = vec![];

        dst_ref.initialize(
            &empty,
            &empty,
            dst_size,
            self.next_level,
            false, // Mini-page only, thus no fence.
            self.meta.is_cache_only_leaf(),
        );

        for meta in self.meta_iter() {
            let op = meta.op_type();

            // Skip the cold values.
            // This can't happen in cache-only mode, as all records there are dirty.
            // In non-cache-only mode this won't produce an empty mini-page, because
            // the copy is invoked right after a record became read-hot or write-hot.
            if discard_cold_cache && op.is_cache() && !meta.is_referenced() {
                continue;
            }

            let value = self.get_value(meta);
            let rt = dst_ref.insert(&self.get_full_key(meta), value, op, 0);
            assert!(rt);
        }
    }

    /// Initialize a LeafNode
    pub(crate) fn initialize(
        &mut self,
        low_fence: &[u8],
        high_fence: &[u8],
        node_size: usize,
        next_level: MiniPageNextLevel,
        has_fence: bool,
        cache_only: bool,
    ) {
        if !has_fence {
            assert!(low_fence.is_empty());
            assert!(high_fence.is_empty());

            self.meta = NodeMeta::new(
                LeafNode::max_data_size(node_size) as u16,
                false,
                false,
                node_size as u16,
                cache_only,
            );
            self.prefix_len = 0;
            self.next_level = next_level;
        } else {
            self.meta = NodeMeta::new(
                LeafNode::max_data_size(node_size) as u16, // - max_fence_len as u16, // Reserve space for
                false,
                true,
                node_size as u16,
                cache_only,
            );

            self.prefix_len = common_prefix_len(low_fence, high_fence);
            self.next_level = next_level;
            self.install_fence_key(low_fence, false);
            self.install_fence_key(high_fence, true);
        }
    }

    pub(crate) fn set_split_flag(&mut self) {
        self.meta.set_split_flag();
    }

    pub(crate) fn get_split_flag(&self) -> bool {
        self.meta.get_split_flag()
    }

    pub(crate) fn read_by_key(&self, search_key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
        self.read_by_key_inner(search_key, out_buffer, true)
    }

    /// Read by key.
    #[must_use]
    pub(crate) fn read_by_key_inner(
        &self,
        search_key: &[u8],
        out_buffer: &mut [u8],
        binary_search: bool,
    ) -> LeafReadResult {
        let val_count = self.meta.meta_count_with_fence();
        let pos = if binary_search {
            self.lower_bound(search_key)
        } else {
            self.linear_lower_bound(search_key)
        };

        if pos >= val_count {
            counter!(LeafNotFoundDueToRange);
            return LeafReadResult::NotFound;
        }

        let kv_meta = self.get_kv_meta(pos as usize);
        let target_key = self.get_remaining_key(kv_meta);

        // If the key is not already referenced, we need to mark it as referenced.
        if !kv_meta.is_referenced() {
            kv_meta.mark_as_ref();
        }

        let input_post_key = &search_key[self.prefix_len as usize..];
        let cmp = target_key.cmp(input_post_key);

        if cmp != Ordering::Equal {
            counter!(LeafNotFoundDueToKey);
            LeafReadResult::NotFound
        } else {
            if kv_meta.op_type().is_absent() {
                return LeafReadResult::Deleted;
            }
            let val_len = kv_meta.value_len();
            let val_ref = self.get_value(kv_meta);
            debug_assert_eq!(val_len as usize, val_ref.len());
            out_buffer[..val_len as usize].copy_from_slice(val_ref);
            LeafReadResult::Found(val_len as u32)
        }
    }

    /// Pick the smallest mini-page size class that the record fits in without filling it completely.
    /// For Bf-Tree with backend storage, the size is strictly less than the leaf page size (full).
    /// For cache-only Bf-Tree, the mini-page size is allowed to reach the leaf page size.
    /// Assumes `page_classes` is in ascending order.
    /// See the sketch test `test_get_chain_size_hint_sketch` in the tests module below.
    pub(crate) fn get_chain_size_hint(
        key_len: usize,
        value_len: usize,
        page_classes: &[usize],
        cache_only: bool,
    ) -> usize {
        let mut initial_record_size = key_len + value_len + std::mem::size_of::<LeafKVMeta>();
        initial_record_size += std::mem::size_of::<LeafNode>();

        if let Some(s) = page_classes[0..(page_classes.len() - 1)]
            .iter()
            .position(|x| initial_record_size < *x)
        {
            return page_classes[s];
        } else if cache_only && initial_record_size <= page_classes[page_classes.len() - 1] {
            return page_classes[page_classes.len() - 1];
        }

        panic!(
            "Record size {} (including metadata) exceeds the max mini-page size {:?}",
            initial_record_size, page_classes
        );
    }

    /// A mini-page is upgraded to the next size class up in which the record fits without filling it completely.
    /// For Bf-Tree with backend storage, the new size must be smaller than the leaf page size (full).
    /// For cache-only Bf-Tree, the mini-page size is allowed to reach the leaf page size.
    /// If no such size exists, returns None.
    /// Assumes `page_classes` is in ascending order.
    /// See the sketch test `test_new_size_if_upgrade_sketch` in the tests module below.
    pub(crate) fn new_size_if_upgrade(
        &self,
        incoming_size: usize,
        page_classes: &[usize],
        cache_only: bool,
    ) -> Option<usize> {
        let cur_size = self.meta.node_size as usize;
        let request_size = cur_size + incoming_size;

        if let Some(s) = page_classes[0..(page_classes.len() - 1)]
            .iter()
            .position(|x| request_size < *x)
        {
            if s == 0 {
                panic!("Should not be here");
            }
            return Some(page_classes[s]);
        } else if cache_only && request_size <= page_classes[page_classes.len() - 1] {
            return Some(page_classes[page_classes.len() - 1]);
        }

        None
    }

    /// Currently, freeing can only be done on a base node; mini pages must be freed differently.
    pub(crate) fn free_base_page(node: *mut LeafNode) {
        assert!(unsafe { &*node }.is_base_page());
        let node_size = unsafe { &*node }.meta.node_size as usize;
        let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
        unsafe {
            std::alloc::dealloc(node as *mut u8, layout);
        }
    }

    /// Returns true if the page contains at least one dirty (Insert/Delete) record,
    /// i.e., there is something that actually needs to be written back to disk.
    /// See the sketch test `test_need_actually_merge_to_disk_sketch` in the tests module below.
    pub(crate) fn need_actually_merge_to_disk(&self) -> bool {
        for meta in self.meta_iter() {
            if meta.op_type().is_dirty() {
                return true;
            }
        }

        false
    }

    /// Estimate the space needed to merge this page's insert records into another page.
    fn estimate_merge_size(&self) -> usize {
        let mut required_size = 0;

        for meta in self.meta_iter() {
            if meta.op_type() == OpType::Insert {
                required_size += (meta.get_key_len() + meta.value_len()) as usize
                    + std::mem::size_of::<LeafKVMeta>();
            }
        }

        required_size
    }

    /// Determine whether the leaf node is too full to fit `requested_size`, given the maximum fence length.
    /// This is required because fences can be added to a base page during consolidation.
    pub(crate) fn full_with_fences(&self, requested_size: u16, max_fence_len: usize) -> bool {
        if self.meta.remaining_size < requested_size {
            return true;
        }

        if self.meta.has_fence() {
            let mut empty_data_size = self.meta.remaining_size; // Counting fence keys.

            let low_key_meta = self.get_kv_meta(LOW_FENCE_IDX);
            if !low_key_meta.is_infinite_low_fence_key() {
                empty_data_size += low_key_meta.get_key_len();
            }

            let high_key_meta = self.get_kv_meta(HIGH_FENCE_IDX);
            if !high_key_meta.is_infinite_high_fence_key() {
                empty_data_size += high_key_meta.get_key_len();
            }

            if empty_data_size >= requested_size + max_fence_len as u16 {
                return false;
            }

            return true;
        }

        false
    }

    pub(crate) fn merge_mini_page(&mut self, mini_page: &LeafNode, max_fence_len: usize) -> bool {
        let size_required = mini_page.estimate_merge_size();

        if self.full_with_fences(size_required as u16, max_fence_len) {
            return false;
        }

        for meta in mini_page.meta_iter() {
            let c_key = mini_page.get_full_key(meta);
            let c_type = meta.op_type();
            if !c_type.is_dirty() {
                // This is important: we don't want to merge cache records, not only for performance
                // but also for correctness. A cached record might be inaccessible (thus invalid).
                // Consider the case where a scan operation merges the mini page, which triggers a split:
                // the scan operation wants to keep the mini page (why not?), but its records are all in cache mode.
                continue;
            }
            let c_value = mini_page.get_value(meta);
            let rt = self.insert(&c_key, c_value, c_type, 0);
            assert!(rt);
        }

        true
    }

    /// Collect every record (plus node-level info) into a `LeafStats` snapshot for inspection.
    pub(crate) fn get_stats(&self) -> LeafStats {
        let mut keys = Vec::new();
        let mut values = Vec::new();
        let mut op_types = Vec::new();
        let node_size = self.meta.node_size as usize;
        let prefix = self.get_prefix().to_owned();

        for meta in self.meta_iter() {
            let key = self.get_full_key(meta);
            let value = self.get_value(meta);
            keys.push(key);
            values.push(value.to_owned());
            op_types.push(meta.op_type());
        }

        LeafStats {
            keys,
            values,
            op_types,
            prefix,
            base_node: None,
            next_level: self.next_level,
            node_size,
        }
    }

    pub(crate) fn has_fence(&self) -> bool {
        self.meta.has_fence()
    }

    /// Returns an iterator over the metadata of all key-value pairs, skipping the fence keys.
    /// See the sketch test `test_meta_iter_skips_fences_sketch` in the tests module below.
    pub(crate) fn meta_iter(&self) -> LeafMetaIter<'_> {
        LeafMetaIter::new(self)
    }

    fn first_meta_pos_after_fence(&self) -> u16 {
        if self.has_fence() {
            FENCE_KEY_CNT as u16
        } else {
            0
        }
    }

    pub(crate) fn get_value_by_pos(&self, pos: u32, out_buffer: &mut [u8]) -> GetValueByPosResult {
        if pos >= self.meta.meta_count_with_fence() as u32 {
            return GetValueByPosResult::EndOfLeaf;
        }

        let meta = self.get_kv_meta(pos as usize);

        if meta.op_type().is_absent() {
            return GetValueByPosResult::Deleted;
        }

        let value = self.get_value(meta);
        let value_len = meta.value_len() as usize;
        out_buffer[..value_len].copy_from_slice(value);
        GetValueByPosResult::Found(value_len as u32)
    }
}

pub(crate) struct LeafMetaIter<'a> {
    node: &'a LeafNode,
    cur: usize,
}

impl LeafMetaIter<'_> {
    fn new(leaf: &LeafNode) -> LeafMetaIter<'_> {
        let cur = leaf.first_meta_pos_after_fence();
        LeafMetaIter {
            node: leaf,
            cur: cur as usize,
        }
    }
}

impl<'a> Iterator for LeafMetaIter<'a> {
    type Item = &'a LeafKVMeta;

    fn next(&mut self) -> Option<Self::Item> {
        if self.cur < self.node.meta.meta_count_with_fence() as usize {
            let rt = self.node.get_kv_meta(self.cur);
            self.cur += 1;
            Some(rt)
        } else {
            None
        }
    }
}

pub(crate) enum GetValueByPosResult {
    EndOfLeaf,
    Deleted,
    Found(u32),
}

#[derive(PartialEq, Eq, Debug)]
pub enum LeafReadResult {
    Deleted,
    NotFound,
    Found(u32),
    InvalidKey,
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytemuck::cast_slice;
    use rstest::rstest;

    #[test]
    fn test_op_type_enum() {
        assert_eq!(OpType::Insert as u8, 0);
        assert_eq!(OpType::Delete as u8, 1);
        assert_eq!(OpType::Cache as u8, 2);
        assert_eq!(OpType::Phantom as u8, 3);
    }

    #[test]
    fn test_make_prefixed_meta() {
        let key = [1, 2, 3, 4, 5];
        let prefix_len = 2;
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &key, prefix_len, OpType::Insert);

        assert_eq!(meta.offset, 10);
        assert_eq!(meta.get_key_len(), 5);
        assert_eq!(meta.value_len(), 20);
        assert_eq!(meta.preview_bytes, [3, 4]);
        assert_eq!(meta.op_type(), OpType::Insert);
        assert!(!meta.is_referenced());
    }

    #[test]
    fn test_make_infinite_high_fence_key() {
        let meta = LeafKVMeta::make_infinite_high_fence_key();
        assert_eq!(meta.offset, u16::MAX);
        assert_eq!(meta.get_key_len(), 0);
        assert_eq!(meta.value_len(), 0);
        assert!(meta.is_infinite_high_fence_key());
    }

    #[test]
    fn test_make_infinite_low_fence_key() {
        let meta = LeafKVMeta::make_infinite_low_fence_key();
        assert_eq!(meta.offset, u16::MAX - 1);
        assert_eq!(meta.get_key_len(), 0);
        assert_eq!(meta.value_len(), 0);
        assert!(meta.is_infinite_low_fence_key());
    }

    #[test]
    fn test_value_len() {
        let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert_eq!(meta.value_len(), 20);

        meta.set_value_len(30);
        assert_eq!(meta.value_len(), 30);
    }

    #[test]
    fn test_op_type() {
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Delete);
        assert_eq!(meta.op_type(), OpType::Delete);
    }
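
    /// A minimal sketch (not from the original suite) showing that `meta_iter`
    /// yields exactly the user records, excluding any fence slots. It reuses the
    /// usize-key encoding and the insert call pattern of the surrounding tests,
    /// including their final `2` argument.
    #[test]
    fn test_meta_iter_skips_fences_sketch() {
        let base = unsafe { &mut *LeafNode::make_base_page(4096) };

        for n in &[1usize, 2, 3] {
            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
            let key = cast_slice::<usize, u8>(n_slice);
            // Key and value are set to the same bytes, as in the other tests.
            assert!(base.insert(key, key, OpType::Insert, 2));
        }

        // Three records were inserted, so the iterator must yield three metas,
        // regardless of whether the page carries fence keys.
        assert_eq!(base.meta_iter().count(), 3);

        LeafNode::free_base_page(base);
    }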

    #[test]
    fn test_mark_as_ref_and_clear_ref() {
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert!(!meta.is_referenced());

        meta.mark_as_ref();
        assert!(meta.is_referenced());

        meta.clear_ref();
        assert!(!meta.is_referenced());
    }
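
    /// A minimal sketch (not from the original suite): a page with no dirty
    /// (Insert/Delete) records has nothing to merge to disk, and a single
    /// freshly inserted record is enough to flip the answer.
    #[test]
    fn test_need_actually_merge_to_disk_sketch() {
        let base = unsafe { &mut *LeafNode::make_base_page(4096) };
        assert!(!base.need_actually_merge_to_disk());

        let n = 1usize;
        let n_slice = unsafe { std::slice::from_raw_parts(&n as *const usize, 1) };
        let key = cast_slice::<usize, u8>(n_slice);
        assert!(base.insert(key, key, OpType::Insert, 2));
        assert!(base.need_actually_merge_to_disk());

        LeafNode::free_base_page(base);
    }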

    #[test]
    fn test_mark_as_deleted_and_is_deleted() {
        let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert!(!meta.is_deleted());

        meta.mark_as_deleted();
        assert!(meta.is_deleted());
    }
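
    /// A minimal sketch (not from the original suite) of the size-class pick in
    /// `get_chain_size_hint`. The page classes here are hypothetical; they are
    /// derived from the per-node overhead so the test does not hardcode layout sizes.
    #[test]
    fn test_get_chain_size_hint_sketch() {
        // Fixed overhead that get_chain_size_hint adds to every record.
        let overhead = std::mem::size_of::<LeafNode>() + std::mem::size_of::<LeafKVMeta>();
        let classes = [overhead + 64, overhead + 512, overhead + 4096];

        // 16 + 16 + overhead < overhead + 64, so the smallest class is picked.
        assert_eq!(
            LeafNode::get_chain_size_hint(16, 16, &classes, false),
            classes[0]
        );

        // A record that only fits in the last (leaf-sized) class is allowed
        // in cache-only mode.
        assert_eq!(
            LeafNode::get_chain_size_hint(512, 1536, &classes, true),
            classes[2]
        );
    }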

    /// This test verifies that the merge split key divides
    /// the combined records of the base page (self) and its
    /// to-be-merged mini-page in half
    #[rstest]
    #[case(vec![1], vec![2], 2)]
    #[case(vec![2], vec![1], 2)]
    fn test_get_merge_split_key(
        #[case] base_page_values: Vec<usize>,
        #[case] mini_page_values: Vec<usize>,
        #[case] splitting_key: usize,
    ) {
        let base = unsafe { &mut *LeafNode::make_base_page(4096) };
        let mini = unsafe { &mut *LeafNode::make_base_page(4096) }; // Using base page as substitute

        // Insert values to base page and mini page accordingly
        for i in 0..base_page_values.len() {
            let n = &base_page_values[i];
            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
            let key = cast_slice::<usize, u8>(n_slice);
            let value = cast_slice::<usize, u8>(n_slice);

            let rt = base.insert(key, value, OpType::Insert, 2);
            assert!(rt);
        }

        for i in 0..mini_page_values.len() {
            let n = &mini_page_values[i];
            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
            let key = cast_slice::<usize, u8>(n_slice);
            let value = cast_slice::<usize, u8>(n_slice);

            let rt = mini.insert(key, value, OpType::Insert, 2);
            assert!(rt);
        }

        // Find the splitting key
        let merge_split_key_byte = base.get_merge_split_key(mini);
        let merge_splitting_key = cast_slice::<u8, usize>(&merge_split_key_byte);

        assert_eq!(merge_splitting_key[0], splitting_key);

        LeafNode::free_base_page(base);
        LeafNode::free_base_page(mini);
    }
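
    /// A minimal sketch (not from the original suite) of `new_size_if_upgrade`,
    /// using hypothetical page classes. It assumes `make_base_page(4096)` records
    /// a node_size of 4096, and (as in test_get_merge_split_key) a base page
    /// stands in for a mini-page.
    #[test]
    fn test_new_size_if_upgrade_sketch() {
        let node = unsafe { &mut *LeafNode::make_base_page(4096) };
        let classes = [4096usize, 8192, 16384];

        // 4096 + 100 fits (without filling it) in the 8192 class.
        assert_eq!(node.new_size_if_upgrade(100, &classes, false), Some(8192));

        // 4096 + 5000 only fits in the largest class, which is allowed
        // in cache-only mode but not with backend storage.
        assert_eq!(node.new_size_if_upgrade(5000, &classes, true), Some(16384));
        assert_eq!(node.new_size_if_upgrade(5000, &classes, false), None);

        LeafNode::free_base_page(node);
    }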

    /// This test verifies that a base page is correctly split
    /// up based on a splitting key, with the left side node (self)
    /// containing keys less than the key and the right side node (new)
    /// containing keys greater than or equal to the splitting key.
    #[rstest]
    #[case(vec![1, 2, 3, 4], 3)]
    #[case(vec![1], 2)]
    #[case(vec![2], 2)]
    fn test_split_with_key(#[case] base_page_values: Vec<usize>, #[case] splitting_key: usize) {
        let base = unsafe { &mut *LeafNode::make_base_page(4096) };
        let sibling = unsafe { &mut *LeafNode::make_base_page(4096) };

        // Insert values to base page
        for i in 0..base_page_values.len() {
            let n = &base_page_values[i];
            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
            let key = cast_slice::<usize, u8>(n_slice);
            let value = cast_slice::<usize, u8>(n_slice);

            let rt = base.insert(key, value, OpType::Insert, 2);
            assert!(rt);
        }

        let splitting_key_ptr = &splitting_key;
        let splitting_key_slice =
            unsafe { std::slice::from_raw_parts(splitting_key_ptr as *const usize, 1) };
        let splitting_key_byte_array = cast_slice::<usize, u8>(splitting_key_slice).to_vec();

        // Split the base page using the splitting key
        base.split_with_key(sibling, &splitting_key_byte_array, false);

        // All values less than the splitting key are in the left side node (self),
        // while those greater than or equal to the key are in the sibling node.
        let mut out_buffer = vec![0u8; 1024];
        for i in 0..base_page_values.len() {
            let mut page = &mut *base;
            if base_page_values[i] >= splitting_key {
                page = &mut *sibling;
            }

            let n = &base_page_values[i];
            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
            let key = cast_slice::<usize, u8>(n_slice);

            let rt = page.read_by_key(key, &mut out_buffer);

            assert_eq!(rt, LeafReadResult::Found(key.len() as u32)); // Key and value were set the same.
            assert_eq!(&out_buffer[0..key.len()], key);
        }

        LeafNode::free_base_page(base);
        LeafNode::free_base_page(sibling);
    }
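
    /// A minimal sketch (not from the original suite) of `consolidate_skip_key`:
    /// after consolidation, the skipped key is gone while the other record
    /// survives. It reuses the usize-key encoding and insert pattern of the
    /// tests above, including their final `2` argument.
    #[test]
    fn test_consolidate_skip_key_sketch() {
        let base = unsafe { &mut *LeafNode::make_base_page(4096) };

        let keep = 1usize;
        let skip = 2usize;
        for n in [&keep, &skip] {
            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
            let key = cast_slice::<usize, u8>(n_slice);
            assert!(base.insert(key, key, OpType::Insert, 2));
        }

        let skip_slice = unsafe { std::slice::from_raw_parts(&skip as *const usize, 1) };
        let skip_key = cast_slice::<usize, u8>(skip_slice);
        base.consolidate_skip_key(skip_key);

        let mut out_buffer = vec![0u8; 1024];
        // The skipped record was dropped during consolidation.
        assert_eq!(
            base.read_by_key(skip_key, &mut out_buffer),
            LeafReadResult::NotFound
        );

        // The other record is still readable.
        let keep_slice = unsafe { std::slice::from_raw_parts(&keep as *const usize, 1) };
        let keep_key = cast_slice::<usize, u8>(keep_slice);
        assert_eq!(
            base.read_by_key(keep_key, &mut out_buffer),
            LeafReadResult::Found(keep_key.len() as u32)
        );

        LeafNode::free_base_page(base);
    }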
}