Skip to main content

bf_tree/nodes/
leaf_node.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4use core::panic;
5use std::{alloc::Layout, cmp::Ordering, sync::atomic};
6
7use crate::{
8    circular_buffer::CircularBufferPtr, counter, error::TreeError, range_scan::ScanReturnField,
9    snapshot::CPRSnapShotMgr, snapshot::INVALID_SNAPSHOT_VERSION, utils::stats::LeafStats,
10};
11
12use super::{node_meta::NodeMeta, FENCE_KEY_CNT};
13
14/// Invariant: leaf page can only have Insert and Delete type.
15///            mini page can have all.
16#[repr(u8)]
17#[derive(PartialEq, Eq, Debug, Clone, Copy)]
18pub(crate) enum OpType {
19    Insert = 0,
20    Delete = 1,
21    Cache = 2,   // the clean version of insert.
22    Phantom = 3, // the clean version of delete.
23}
24
25impl OpType {
26    pub(crate) fn is_dirty(&self) -> bool {
27        match self {
28            OpType::Insert | OpType::Delete => true,
29            OpType::Cache | OpType::Phantom => false,
30        }
31    }
32
33    fn is_absent(&self) -> bool {
34        match self {
35            OpType::Insert | OpType::Cache => false,
36            OpType::Phantom | OpType::Delete => true,
37        }
38    }
39
40    fn is_cache(&self) -> bool {
41        match self {
42            OpType::Cache | OpType::Phantom => true,
43            OpType::Insert | OpType::Delete => false,
44        }
45    }
46}
47
48const PREVIEW_SIZE: usize = 2;
49
50const LOW_FENCE_IDX: usize = 0;
51const HIGH_FENCE_IDX: usize = 1;
52
53const OP_TYPE_SHIFT: u16 = 14;
54const KEY_LEN_MASK: u16 = 0x3F_FF; // lower 14 bits on the key_len
55
56// highest bit on the value_len;
57const REF_BIT_MASK: u16 = 0x80_00;
58// third highest bit on the value_len;
59const VALUE_LEN_MASK: u16 = 0x7F_FF; // lower 15 bits on the value_len;
60
61pub(crate) fn common_prefix_len(low_fence: &[u8], high_fence: &[u8]) -> u16 {
62    let mut prefix_len = 0;
63    for i in 0..std::cmp::min(low_fence.len(), high_fence.len()) {
64        if low_fence[i] == high_fence[i] {
65            prefix_len += 1;
66        } else {
67            break;
68        }
69    }
70    prefix_len
71}
72
73#[repr(C)]
74pub(crate) struct LeafKVMeta {
75    offset: u16,
76    op_type_key_len_in_byte: u16, // Highest 2 bits: op_type, Lower 14 bits: Key length in bytes.
77    ref_value_len_in_byte: std::sync::atomic::AtomicU16, // Highest bit: ref, Lower 15 bits: value length in bytes. here we don't use shuttle's AtomicU16, because this is not a real sync point.
78    preview_bytes: [u8; PREVIEW_SIZE],
79}
80
81impl LeafKVMeta {
82    pub(crate) fn make_prefixed_meta(
83        offset: u16,
84        value_len: u16,
85        key: &[u8],
86        prefix_len: u16,
87        op_type: OpType,
88    ) -> Self {
89        let mut meta = Self {
90            offset,
91            op_type_key_len_in_byte: key.len() as u16 | ((op_type as u16) << OP_TYPE_SHIFT),
92            ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(value_len),
93            preview_bytes: [0; PREVIEW_SIZE],
94        };
95
96        for i in 0..std::cmp::min(key.len().saturating_sub(prefix_len as usize), PREVIEW_SIZE) {
97            meta.preview_bytes[i] = key[i + prefix_len as usize];
98        }
99
100        // The initial value is not referenced, this is important because during the Garbage reclaim of the delta chain,
101        // we will call Insert to reset the states, which calls this function.
102        meta.clear_ref();
103        meta
104    }
105
106    pub fn make_infinite_high_fence_key() -> Self {
107        assert_eq!(std::mem::size_of::<Self>(), super::KV_META_SIZE);
108
109        Self {
110            offset: u16::MAX,
111            op_type_key_len_in_byte: 0,
112            ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
113            preview_bytes: [0; PREVIEW_SIZE],
114        }
115    }
116
117    pub fn make_infinite_low_fence_key() -> Self {
118        Self {
119            offset: u16::MAX - 1,
120            op_type_key_len_in_byte: 0,
121            ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
122            preview_bytes: [0; PREVIEW_SIZE],
123        }
124    }
125
126    pub fn is_infinite_low_fence_key(&self) -> bool {
127        self.offset == u16::MAX - 1
128    }
129
130    pub fn is_infinite_high_fence_key(&self) -> bool {
131        self.offset == u16::MAX
132    }
133
134    pub fn value_len(&self) -> u16 {
135        self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & VALUE_LEN_MASK
136    }
137
138    pub fn set_value_len(&mut self, value: u16) {
139        let v = self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed);
140        self.ref_value_len_in_byte.store(
141            (v & !VALUE_LEN_MASK) | (value & VALUE_LEN_MASK),
142            atomic::Ordering::Relaxed,
143        );
144    }
145
146    pub fn set_op_type(&mut self, op_type: OpType) {
147        self.op_type_key_len_in_byte = self.get_key_len() | ((op_type as u16) << OP_TYPE_SHIFT);
148    }
149
150    pub fn op_type(&self) -> OpType {
151        let l = self.op_type_key_len_in_byte;
152        let b = l >> OP_TYPE_SHIFT;
153        unsafe { std::mem::transmute(b as u8) }
154    }
155
156    pub fn mark_as_ref(&self) {
157        self.ref_value_len_in_byte
158            .fetch_or(REF_BIT_MASK, atomic::Ordering::Relaxed);
159    }
160
161    pub fn clear_ref(&self) {
162        self.ref_value_len_in_byte
163            .fetch_and(!REF_BIT_MASK, atomic::Ordering::Relaxed);
164    }
165
166    pub fn is_referenced(&self) -> bool {
167        self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & REF_BIT_MASK != 0
168    }
169
170    pub fn mark_as_deleted(&mut self) {
171        self.set_op_type(OpType::Delete);
172    }
173
174    #[allow(dead_code)]
175    pub fn is_deleted(&self) -> bool {
176        self.op_type() == OpType::Delete
177    }
178
179    pub(crate) fn get_offset(&self) -> u16 {
180        self.offset
181    }
182
183    pub(crate) fn get_key_len(&self) -> u16 {
184        self.op_type_key_len_in_byte & KEY_LEN_MASK
185    }
186}
187
188#[derive(Debug, Clone, Copy)]
189pub(crate) struct MiniPageNextLevel {
190    val: usize,
191}
192
193impl MiniPageNextLevel {
194    pub(crate) fn new(val: usize) -> Self {
195        Self { val }
196    }
197
198    pub(crate) fn as_offset(&self) -> usize {
199        assert!(!self.is_null());
200        self.val
201    }
202
203    pub(crate) fn new_null() -> Self {
204        Self { val: usize::MAX }
205    }
206
207    pub(crate) fn is_null(&self) -> bool {
208        self.val == usize::MAX
209    }
210}
211
212#[repr(C)]
213pub(crate) struct LeafNode {
214    pub(crate) meta: NodeMeta,
215    prefix_len: u16,
216    pub(crate) next_level: MiniPageNextLevel,
217    pub(crate) lsn: u64,
218    snapshot_version: u64,
219    data: [u8; 0],
220}
221
222const _: () = assert!(std::mem::size_of::<LeafNode>() == 32);
223
224impl LeafNode {
225    fn max_data_size(node_size: usize) -> usize {
226        node_size - std::mem::size_of::<LeafNode>()
227    }
228
229    pub(crate) fn initialize_mini_page(
230        ptr: &CircularBufferPtr,
231        node_size: usize,
232        next_level: MiniPageNextLevel,
233        cache_only: bool,
234    ) {
235        unsafe {
236            Self::init_node_with_fence(
237                ptr.as_ptr(),
238                &[],
239                &[],
240                node_size,
241                next_level,
242                false,
243                cache_only,
244            )
245        }
246    }
247
248    pub(crate) fn make_base_page(node_size: usize) -> *mut Self {
249        let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
250        let ptr = unsafe { std::alloc::alloc(layout) };
251        unsafe {
252            Self::init_node_with_fence(
253                ptr,
254                &[],
255                &[],
256                node_size,
257                MiniPageNextLevel::new_null(),
258                true,
259                false,
260            );
261        }
262        ptr as *mut Self
263    }
264
265    /// After promoting a base page to full page cache, we need to mark every records as cache.
266    pub(crate) fn covert_insert_records_to_cache(&mut self) {
267        let mut cur = self.first_meta_pos_after_fence();
268
269        while cur < self.meta.meta_count_with_fence() {
270            let meta = self.get_kv_meta_mut(cur as usize);
271            let op_type = meta.op_type();
272
273            match op_type {
274                OpType::Insert => {
275                    meta.set_op_type(OpType::Cache);
276                }
277                OpType::Delete => {
278                    meta.set_op_type(OpType::Phantom);
279                }
280                OpType::Cache | OpType::Phantom => {
281                    unreachable!("Base page should not have op type: {:?}", op_type);
282                }
283            };
284
285            cur += 1;
286        }
287    }
288
289    /// When merge full page cache to base page, we need to set clean records to dirty records.
290    pub(crate) fn convert_cache_records_to_insert(&mut self) {
291        let mut cur = self.first_meta_pos_after_fence();
292
293        while cur < self.meta.meta_count_with_fence() {
294            let meta = self.get_kv_meta_mut(cur as usize);
295            let op_type = meta.op_type();
296
297            match op_type {
298                OpType::Cache => {
299                    meta.set_op_type(OpType::Insert);
300                }
301                OpType::Phantom => {
302                    meta.set_op_type(OpType::Delete);
303                }
304                OpType::Insert | OpType::Delete => {
305                    // other types might present.
306                }
307            };
308
309            cur += 1;
310        }
311    }
312
313    unsafe fn init_node_with_fence(
314        ptr: *mut u8,
315        low_fence: &[u8],
316        high_fence: &[u8],
317        node_size: usize,
318        next_level: MiniPageNextLevel,
319        has_fence: bool,
320        cache_only: bool,
321    ) {
322        let ptr = ptr as *mut Self;
323
324        { &mut *ptr }.initialize(
325            low_fence, high_fence, node_size, next_level, has_fence, cache_only,
326        );
327    }
328
329    pub(crate) fn get_kv_meta(&self, index: usize) -> &LeafKVMeta {
330        debug_assert!(index < self.meta.meta_count_with_fence() as usize);
331        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
332        unsafe { &*meta_ptr.add(index) }
333    }
334
335    pub(crate) fn get_full_key(&self, meta: &LeafKVMeta) -> Vec<u8> {
336        let prefix = self.get_prefix();
337        let remaining = self.get_remaining_key(meta);
338        [prefix, remaining].concat()
339    }
340
341    /// Get the full key for low fence which is not prefix compressed
342    pub(crate) fn get_low_fence_full_key(&self) -> Vec<u8> {
343        debug_assert!(LOW_FENCE_IDX < self.meta.meta_count_with_fence() as usize);
344        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
345        let meta = unsafe { &*meta_ptr.add(LOW_FENCE_IDX) };
346
347        let key_offset = meta.get_offset();
348        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
349        unsafe {
350            let key = std::slice::from_raw_parts(key_ptr, (meta.get_key_len()) as usize);
351            [key].concat()
352        }
353    }
354
355    pub(crate) fn get_kv_meta_mut(&mut self, index: usize) -> &mut LeafKVMeta {
356        debug_assert!(index < self.meta.meta_count_with_fence() as usize);
357        let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
358        unsafe { meta_ptr.add(index).as_mut().unwrap() }
359    }
360
361    pub(crate) fn write_initial_kv_meta(&mut self, index: usize, meta: LeafKVMeta) {
362        let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
363        unsafe { meta_ptr.add(index).write(meta) };
364    }
365
366    pub(crate) fn get_remaining_key(&self, meta: &LeafKVMeta) -> &[u8] {
367        let key_offset = meta.get_offset();
368        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
369        unsafe {
370            std::slice::from_raw_parts(key_ptr, (meta.get_key_len() - self.prefix_len) as usize)
371        }
372    }
373
374    pub(crate) fn get_prefix(&self) -> &[u8] {
375        let m = self.get_kv_meta(LOW_FENCE_IDX);
376        let key_offset = m.get_offset();
377        unsafe {
378            std::slice::from_raw_parts(
379                self.data.as_ptr().add(key_offset as usize),
380                self.prefix_len as usize,
381            )
382        }
383    }
384
385    pub(crate) fn install_fence_key(&mut self, key: &[u8], is_high_fence: bool) {
386        let loc: u16 = self.meta.meta_count_with_fence();
387        if !is_high_fence {
388            debug_assert!(self.meta.meta_count_with_fence() == LOW_FENCE_IDX as u16);
389        } else {
390            debug_assert!(self.meta.meta_count_with_fence() == HIGH_FENCE_IDX as u16);
391        }
392
393        let cur_low_offset = self.current_lowest_offset();
394
395        self.meta.increment_value_count();
396        self.meta.remaining_size -= std::mem::size_of::<LeafKVMeta>() as u16;
397
398        if key.is_empty() {
399            let fence = if is_high_fence {
400                LeafKVMeta::make_infinite_high_fence_key()
401            } else {
402                LeafKVMeta::make_infinite_low_fence_key()
403            };
404            self.write_initial_kv_meta(loc as usize, fence);
405        } else {
406            let prefix_len = if is_high_fence { self.prefix_len } else { 0 }; // we don't compress low fence.
407
408            let post_fix_len = key.len() as u16 - prefix_len;
409            let val_len = 0u16;
410            let kv_len = post_fix_len + val_len;
411
412            let remaining = self.meta.remaining_size;
413
414            debug_assert!(kv_len <= remaining);
415
416            let offset = cur_low_offset - kv_len;
417
418            let new_meta =
419                LeafKVMeta::make_prefixed_meta(offset, 0, key, prefix_len, OpType::Insert);
420
421            self.write_initial_kv_meta(loc as usize, new_meta);
422
423            unsafe {
424                let start_ptr = self.data.as_mut_ptr().add(offset as usize);
425
426                let key_slice = std::slice::from_raw_parts(
427                    key.as_ptr().add(prefix_len as usize),
428                    post_fix_len as usize,
429                );
430                std::ptr::copy_nonoverlapping(key_slice.as_ptr(), start_ptr, post_fix_len as usize);
431            }
432
433            self.meta.remaining_size -= kv_len;
434        }
435    }
436
437    pub(crate) fn current_lowest_offset(&self) -> u16 {
438        let value_count = self.meta.meta_count_with_fence();
439        let rt =
440            (value_count * std::mem::size_of::<LeafKVMeta>() as u16) + self.meta.remaining_size;
441
442        // Sanity check
443        #[cfg(debug_assertions)]
444        {
445            let mut min_offset = LeafNode::max_data_size(self.meta.node_size as usize) as u16;
446            for i in 0..value_count {
447                let kv_meta = self.get_kv_meta(i as usize);
448                if kv_meta.is_infinite_low_fence_key() || kv_meta.is_infinite_high_fence_key() {
449                    continue;
450                }
451                min_offset = std::cmp::min(min_offset, kv_meta.offset);
452            }
453            assert!(min_offset == rt);
454        }
455        rt
456    }
457
458    pub(crate) fn get_value(&self, meta: &LeafKVMeta) -> &[u8] {
459        let val_offset = meta.get_offset() + meta.get_key_len() - self.prefix_len;
460        let val_ptr = unsafe { self.data.as_ptr().add(val_offset as usize) };
461        unsafe { std::slice::from_raw_parts(val_ptr, meta.value_len() as usize) }
462    }
463
464    /// A good split key has two properties:
465    /// (1). it split the nodes into two roughly equal sizes.
466    /// (2). it should be short, so that parent node search can be faster.
467    ///
468    /// Get the split key is tricky:
469    /// 1. We can't just use the middle key, because the key/value are variable length. We want the key that split the node in size, not in key count.
470    /// 2. We don't try to find small split key here.
471    ///
472    /// Returns the key that split the node into two roughly equal size, and the new node count.
473    /// This is only invoked once for splitting the root node.
474    pub fn get_split_key(&self, cache_only: bool) -> (Vec<u8>, u16) {
475        let mut data_size = 0;
476
477        for meta in self.meta_iter() {
478            let key_len = meta.get_key_len();
479            let value_len = meta.value_len();
480
481            data_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
482        }
483
484        let split_target_size = data_size / 2;
485        let mut cur_size = 0;
486
487        for (i, meta) in self.meta_iter().enumerate() {
488            let key_len = meta.get_key_len();
489            let value_len = meta.value_len();
490
491            cur_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
492
493            // Here we guarantee in cache-only mode, the root node has at least two keys
494            if cache_only && i == 0 {
495                continue;
496            }
497
498            if cur_size >= split_target_size {
499                return (self.get_full_key(meta), i as u16);
500            }
501        }
502        unreachable!();
503    }
504
505    /// Get # of keys strictly smaller than a merge_split_key
506    #[allow(clippy::unused_enumerate_index)]
507    pub fn get_kv_num_below_key(&self, merge_split_key: &Vec<u8>) -> u16 {
508        // Linear search
509        let mut cnt: u16 = 0;
510        for (_, meta) in self.meta_iter().enumerate() {
511            let key = self.get_full_key(meta);
512            let cmp = key.cmp(merge_split_key);
513            // Pick all records from the base page whose key is smaller
514            // than merge_split_key
515            if cmp == std::cmp::Ordering::Less {
516                cnt += 1;
517            } else {
518                break;
519            }
520        }
521        cnt
522    }
523
524    /// [Cache-only mode]: Find the splitting key that evenly splits all the records in the
525    /// current node and the to-be-inserted record in half. The caller needs to guarantee that
526    /// there are at least two records with different such that it won't result in an empty page
527    /// after split
528    pub fn get_cache_only_insert_split_key(&self, key: &[u8], new_record_size: &u16) -> Vec<u8> {
529        let mut merge_split_key_1: Option<Vec<u8>> = None;
530        let mut merge_split_key_2: Option<Vec<u8>> = None;
531        let mut diff_1: i16 = i16::MAX;
532        let mut diff_2: i16 = i16::MAX;
533
534        // The total size of all records including the new record to insert
535        let mut total_merged_size: u16 = 0;
536
537        for meta in self.meta_iter() {
538            let key_len = meta.get_key_len();
539            let value_len = meta.value_len();
540
541            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
542        }
543
544        total_merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;
545        let split_target_size = total_merged_size / 2;
546
547        // Search for the splitting key
548        let mut merged_size: u16 = 0;
549        let mut self_meta_iter = self.meta_iter();
550        let mut self_meta_option = self_meta_iter.next();
551
552        // Go through all records in the base page whose keys are smaller than the new record's key
553        if self_meta_option.is_some() {
554            let mut cur_base_meta = self_meta_option.unwrap();
555            let mut cur_base_key = self.get_full_key(cur_base_meta);
556            let mut cmp = cur_base_key.as_slice().cmp(key);
557
558            while cmp == std::cmp::Ordering::Less {
559                let key_len = cur_base_meta.get_key_len();
560                let value_len = cur_base_meta.value_len();
561                merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
562                if merged_size >= split_target_size {
563                    // Two split key candidates are already found
564                    // Stop
565                    if merge_split_key_2.is_some() {
566                        break;
567                    }
568
569                    let left_side: u16 = merged_size
570                        - key_len
571                        - value_len
572                        - std::mem::size_of::<LeafKVMeta>() as u16;
573                    let right_side = total_merged_size - left_side;
574
575                    if merge_split_key_1.is_none() {
576                        merge_split_key_1 = Some(cur_base_key);
577                        diff_1 = (left_side as i16 - right_side as i16).abs();
578                    } else {
579                        merge_split_key_2 = Some(cur_base_key);
580                        diff_2 = (left_side as i16 - right_side as i16).abs();
581                    }
582                }
583
584                self_meta_option = self_meta_iter.next();
585                if let Some(meta) = self_meta_option {
586                    cur_base_meta = meta;
587                    cur_base_key = self.get_full_key(cur_base_meta);
588                    cmp = cur_base_key.as_slice().cmp(key);
589                } else {
590                    break;
591                }
592            }
593        }
594
595        // Count the new key
596        if merge_split_key_2.is_none() {
597            merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;
598
599            if merged_size >= split_target_size {
600                // Two split key candidates are already found
601                // Stop
602
603                let left_side: u16 =
604                    merged_size - new_record_size - std::mem::size_of::<LeafKVMeta>() as u16;
605                let right_side = total_merged_size - left_side;
606
607                if merge_split_key_1.is_none() {
608                    merge_split_key_1 = Some(key.to_vec());
609                    diff_1 = (left_side as i16 - right_side as i16).abs();
610                } else {
611                    merge_split_key_2 = Some(key.to_vec());
612                    diff_2 = (left_side as i16 - right_side as i16).abs();
613                }
614            }
615        }
616
617        // Go through the rest of records in the base page
618        while self_meta_option.is_some() {
619            let base_meta = self_meta_option.unwrap();
620            let key_len = base_meta.get_key_len();
621            let value_len = base_meta.value_len();
622            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
623
624            if merged_size >= split_target_size {
625                // Return the splitting key
626                let cur_base_key = self.get_full_key(base_meta);
627                if merge_split_key_2.is_some() {
628                    break;
629                }
630
631                let left_side: u16 =
632                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
633                let right_side = total_merged_size - left_side;
634
635                if merge_split_key_1.is_none() {
636                    merge_split_key_1 = Some(cur_base_key);
637                    diff_1 = (left_side as i16 - right_side as i16).abs();
638                } else {
639                    merge_split_key_2 = Some(cur_base_key);
640                    diff_2 = (left_side as i16 - right_side as i16).abs();
641                }
642            }
643            self_meta_option = self_meta_iter.next();
644        }
645
646        // Pick the splitting that achieves the smallest size difference between
647        // the two halves
648        if merge_split_key_1.is_none() {
649            panic!(
650                "Fail to find a splitting key for merging mini and base page.{}, {}",
651                merged_size, split_target_size
652            );
653        }
654
655        if merge_split_key_2.is_none() || diff_1 < diff_2 {
656            return merge_split_key_1.unwrap();
657        }
658
659        merge_split_key_2.unwrap()
660    }
661
662    /// Find the splitting key that divides the merged records of
663    /// the mini-page and its correponding base page evenly in two
664    /// groups (base pages). Special considerations go to the split-key
665    /// bearing (k,v) pair.
666    /// Caller needs to ensure there are at least two distinct keys among the
667    /// mini page and self.
668    #[allow(clippy::unnecessary_unwrap)]
669    pub(crate) fn get_merge_split_key(&mut self, mini_page: &LeafNode) -> Vec<u8> {
670        let mut merge_split_key_1: Option<Vec<u8>> = None;
671        let mut merge_split_key_2: Option<Vec<u8>> = None;
672        let mut diff_1: i16 = i16::MAX;
673        let mut diff_2: i16 = i16::MAX;
674
675        let mut total_merged_size: u16 = 0;
676        let mut base_meta_iter = self.meta_iter();
677        let mut cur_pos = base_meta_iter.cur;
678        let mut cur_base_meta_option = base_meta_iter.next();
679        let mut duplicate_positions = vec![];
680
681        // Calculate the size of all distinctively merged records through merge-sort
682        for mini_meta in mini_page.meta_iter() {
683            let c_type = mini_meta.op_type();
684            // Skip cached or phantom records as they will not be merged into base pages
685            if !c_type.is_dirty() {
686                continue;
687            }
688
689            let cur_mini_key = mini_page.get_full_key(mini_meta);
690            if cur_base_meta_option.is_some() {
691                let mut cur_base_meta = cur_base_meta_option.unwrap();
692                let mut cur_base_key = self.get_full_key(cur_base_meta);
693                let mut cmp = cur_base_key.cmp(&cur_mini_key);
694
695                // Go through all records from the base page whose key is strictly smaller
696                // than the current record from the mini page
697                while cmp == std::cmp::Ordering::Less {
698                    let key_len = cur_base_meta.get_key_len();
699                    let value_len = cur_base_meta.value_len();
700                    total_merged_size +=
701                        key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
702
703                    cur_pos = base_meta_iter.cur;
704                    cur_base_meta_option = base_meta_iter.next();
705                    if cur_base_meta_option.is_some() {
706                        cur_base_meta = cur_base_meta_option.unwrap();
707                        cur_base_key = self.get_full_key(cur_base_meta);
708                        cmp = cur_base_key.cmp(&cur_mini_key);
709                    } else {
710                        break;
711                    }
712                }
713
714                // If the comparison is equal then advance the base page iterator to avoid counting it
715                // twice
716                if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
717                    // Mark the duplicate entry for deletion
718                    duplicate_positions.push(cur_pos);
719                    cur_pos = base_meta_iter.cur;
720                    cur_base_meta_option = base_meta_iter.next();
721                }
722            }
723
724            let key_len = mini_meta.get_key_len();
725            let value_len = mini_meta.value_len();
726            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
727        }
728
729        // Mini-page records are exhuasted, go through the rest of
730        // records from the base page, if any
731        while cur_base_meta_option.is_some() {
732            let base_meta = cur_base_meta_option.unwrap();
733            let key_len = base_meta.get_key_len();
734            let value_len = base_meta.value_len();
735            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
736
737            cur_base_meta_option = base_meta_iter.next();
738        }
739
740        // The target split size is half of all the merged records
741        let split_target_size = total_merged_size / 2;
742
743        // Merge sort the distinct records from the mini page and the base page
744        // until the size of the sorted records reaches the target split size
745        let mut merged_size: u16 = 0;
746        base_meta_iter = self.meta_iter();
747        cur_base_meta_option = base_meta_iter.next();
748
749        for mini_meta in mini_page.meta_iter() {
750            // Skip cached or phantom records as they will
751            // not take additional space after merging
752            let c_type = mini_meta.op_type();
753            if !c_type.is_dirty() {
754                continue;
755            }
756
757            let cur_mini_key = mini_page.get_full_key(mini_meta);
758            if cur_base_meta_option.is_some() {
759                let mut cur_base_meta = cur_base_meta_option.unwrap();
760                let mut cur_base_key = self.get_full_key(cur_base_meta);
761                let mut cmp = cur_base_key.cmp(&cur_mini_key);
762                // Go through all records from the base page whose key is strictly smaller
763                // than the current record from the mini page
764                while cmp == std::cmp::Ordering::Less {
765                    let key_len = cur_base_meta.get_key_len();
766                    let value_len = cur_base_meta.value_len();
767                    merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
768                    if merged_size >= split_target_size {
769                        // Two split key candidates are already found
770                        // Stop
771                        if merge_split_key_2.is_some() {
772                            break;
773                        }
774
775                        let left_side: u16 = merged_size
776                            - key_len
777                            - value_len
778                            - std::mem::size_of::<LeafKVMeta>() as u16;
779                        let right_side = total_merged_size - left_side;
780
781                        if merge_split_key_1.is_none() {
782                            merge_split_key_1 = Some(cur_base_key);
783                            diff_1 = (left_side as i16 - right_side as i16).abs();
784                        } else {
785                            merge_split_key_2 = Some(cur_base_key);
786                            diff_2 = (left_side as i16 - right_side as i16).abs();
787                        }
788                    }
789
790                    cur_base_meta_option = base_meta_iter.next();
791                    if cur_base_meta_option.is_some() {
792                        cur_base_meta = cur_base_meta_option.unwrap();
793                        cur_base_key = self.get_full_key(cur_base_meta);
794                        cmp = cur_base_key.cmp(&cur_mini_key);
795                    } else {
796                        break;
797                    }
798                }
799
800                // If the comparison is equal then advance the base page iterator to avoid counting it
801                // twice
802                if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
803                    cur_base_meta_option = base_meta_iter.next();
804                }
805            }
806
807            let key_len = mini_meta.get_key_len();
808            let value_len = mini_meta.value_len();
809            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
810
811            if merged_size >= split_target_size {
812                // Two split key candidates are already found
813                // Stop
814                if merge_split_key_2.is_some() {
815                    break;
816                }
817
818                let left_side: u16 =
819                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
820                let right_side = total_merged_size - left_side;
821
822                if merge_split_key_1.is_none() {
823                    merge_split_key_1 = Some(cur_mini_key);
824                    diff_1 = (left_side as i16 - right_side as i16).abs();
825                } else {
826                    merge_split_key_2 = Some(cur_mini_key);
827                    diff_2 = (left_side as i16 - right_side as i16).abs();
828                }
829            }
830        }
831
832        // Mini-page records are exhuasted, go through the rest of
833        // records from the base page, if any
834        while cur_base_meta_option.is_some() {
835            let base_meta = cur_base_meta_option.unwrap();
836            let key_len = base_meta.get_key_len();
837            let value_len = base_meta.value_len();
838            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
839
840            if merged_size >= split_target_size {
841                // Return the splitting key
842                let cur_base_key = self.get_full_key(base_meta);
843                if merge_split_key_2.is_some() {
844                    break;
845                }
846
847                let left_side: u16 =
848                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
849                let right_side = total_merged_size - left_side;
850
851                if merge_split_key_1.is_none() {
852                    merge_split_key_1 = Some(cur_base_key);
853                    diff_1 = (left_side as i16 - right_side as i16).abs();
854                } else {
855                    merge_split_key_2 = Some(cur_base_key);
856                    diff_2 = (left_side as i16 - right_side as i16).abs();
857                }
858            }
859            cur_base_meta_option = base_meta_iter.next();
860        }
861
862        // Delete all duplicate entries from the base page
863        for pos in duplicate_positions {
864            let pos_meta = self.get_kv_meta_mut(pos);
865            pos_meta.mark_as_deleted();
866        }
867
868        if merge_split_key_1.is_none() {
869            unreachable!(
870                "Fail to find a splitting key for merging mini and base page.{}, {}",
871                merged_size, split_target_size
872            );
873        }
874
875        // The two split keys must be different
876        if merge_split_key_2.is_some() {
877            let cmp = merge_split_key_1
878                .as_ref()
879                .unwrap()
880                .cmp(merge_split_key_2.as_ref().unwrap());
881            assert_ne!(cmp, std::cmp::Ordering::Equal);
882        }
883
884        let mut splitting_key = if merge_split_key_2.is_none() || diff_1 < diff_2 {
885            merge_split_key_1.as_ref().unwrap()
886        } else {
887            merge_split_key_2.as_ref().unwrap()
888        };
889
890        // The splitting key cannot be the low fence as it leads to invalid page
891        // and duplicate entries in inner node
892        let mut fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
893        if !fence_meta.is_infinite_low_fence_key() {
894            let low_fence_key = self.get_low_fence_key();
895            let cmp = splitting_key.cmp(&low_fence_key);
896            if cmp == std::cmp::Ordering::Equal {
897                splitting_key = merge_split_key_2.as_ref().unwrap();
898            }
899        }
900
901        fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
902        if !fence_meta.is_infinite_high_fence_key() {
903            let high_fence_key = self.get_high_fence_key();
904            let cmp = splitting_key.cmp(&high_fence_key);
905            assert_ne!(cmp, std::cmp::Ordering::Equal);
906        }
907
908        splitting_key.clone()
909    }
910
911    pub fn get_key_to_reach_this_node(&self) -> Vec<u8> {
912        let meta = self.get_kv_meta(self.first_meta_pos_after_fence() as usize);
913        self.get_full_key(meta)
914    }
915
916    /// In cache-only mode, transient state could be observed when one thread is in the middle of
917    /// consolidating the leaf node, while the evicting callback thread is attempting a
918    /// unprotected read. As such, this function guarantees no panic happens during the key read
919    pub fn try_get_key_to_reach_this_node(&self) -> Result<Vec<u8>, TreeError> {
920        assert!(self.meta.is_cache_only_leaf());
921
922        // Get the meta of the first key
923        let index = 0;
924        if index >= self.meta.meta_count_with_fence() as usize {
925            return Err(TreeError::NeedRestart);
926        }
927
928        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
929        let meta = unsafe { &*meta_ptr.add(index) };
930
931        let key_offset = meta.get_offset();
932        let key_len = meta.get_key_len();
933
934        if key_offset + key_len > LeafNode::max_data_size(self.meta.node_size as usize) as u16 {
935            return Err(TreeError::NeedRestart);
936        }
937
938        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
939        let key_slice =
940            unsafe { std::slice::from_raw_parts(key_ptr, (key_len - self.prefix_len) as usize) };
941        Ok(key_slice.to_vec())
942    }
943
944    pub fn get_low_fence_key(&self) -> Vec<u8> {
945        assert!(self.has_fence());
946        let fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
947        if fence_meta.is_infinite_low_fence_key() {
948            vec![]
949        } else {
950            unsafe {
951                let start_ptr = self.data.as_ptr().add(fence_meta.offset as usize);
952                let key_slice =
953                    std::slice::from_raw_parts(start_ptr, fence_meta.get_key_len() as usize);
954                key_slice.to_vec()
955            }
956        }
957    }
958
959    pub fn get_high_fence_key(&self) -> Vec<u8> {
960        assert!(self.has_fence());
961        let fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
962        if fence_meta.is_infinite_high_fence_key() {
963            vec![]
964        } else {
965            self.get_full_key(fence_meta)
966        }
967    }
968
969    /// Compares the input key with the existing key specified by `meta`
970    /// By convention, key_cmp(meta, key) returns the ordering matching the expression meta <operator> key if true.
971    #[inline]
972    pub(crate) fn key_cmp(&self, meta: &LeafKVMeta, key: &[u8]) -> Ordering {
973        let search_key_prefix = &key[(self.prefix_len as usize)..]
974            [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];
975
976        let prefix_key = &meta.preview_bytes[..std::cmp::min(
977            PREVIEW_SIZE,
978            meta.get_key_len() as usize - self.prefix_len as usize,
979        )];
980        let mut cmp = prefix_key.cmp(search_key_prefix);
981
982        // If the prefix matches, compare the full key
983        if cmp == Ordering::Equal {
984            let full_key = self.get_remaining_key(meta);
985            let search_key_postfix = &key[self.prefix_len as usize..];
986            cmp = full_key.cmp(search_key_postfix);
987        }
988        cmp
989    }
990
991    pub(crate) fn linear_lower_bound(&self, key: &[u8]) -> u16 {
992        debug_assert!(key.len() >= self.prefix_len as usize);
993
994        let mut index = self.first_meta_pos_after_fence();
995
996        while index < self.meta.meta_count_with_fence() {
997            let key_meta = self.get_kv_meta(index as usize);
998
999            #[cfg(target_arch = "x86_64")]
1000            unsafe {
1001                // For bw-tree-like linear search, we use clflush to simulate pointer chasing.
1002                core::arch::x86_64::_mm_clflush(key_meta as *const LeafKVMeta as *const u8);
1003            }
1004            let cmp = self.key_cmp(key_meta, key);
1005
1006            if cmp != Ordering::Less {
1007                return index;
1008            }
1009
1010            index += 1;
1011        }
1012
1013        index
1014    }
1015
1016    pub(crate) fn lower_bound(&self, key: &[u8]) -> u16 {
1017        let mut lower = self.first_meta_pos_after_fence();
1018        let mut upper = self.meta.meta_count_with_fence();
1019
1020        let search_key_prefix = &key[(self.prefix_len as usize)..]
1021            [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];
1022
1023        debug_assert!(key.len() >= self.prefix_len as usize);
1024
1025        while lower < upper {
1026            let mid = lower + (upper - lower) / 2;
1027            let key_meta = self.get_kv_meta(mid as usize);
1028
1029            let prefix_key = &key_meta.preview_bytes[..std::cmp::min(
1030                PREVIEW_SIZE,
1031                key_meta.get_key_len() as usize - self.prefix_len as usize,
1032            )];
1033            let mut cmp = prefix_key.cmp(search_key_prefix);
1034
1035            // If the prefix matches, compare the full key
1036            if cmp == Ordering::Equal {
1037                let remaining_key = self.get_remaining_key(key_meta);
1038                let search_key_postfix = &key[self.prefix_len as usize..];
1039                cmp = remaining_key.cmp(search_key_postfix);
1040            }
1041
1042            match cmp {
1043                Ordering::Greater => {
1044                    upper = mid;
1045                }
1046                Ordering::Equal => {
1047                    return mid;
1048                }
1049                Ordering::Less => {
1050                    lower = mid + 1;
1051                }
1052            }
1053        }
1054        lower
1055    }
1056
1057    /// Take a deep breath before you read/change this function.
1058    ///
1059    /// Leaf node insert is different from inner node that:
1060    /// 1. Leaf node value is variable length
1061    /// 2. Leaf node need to handle duplicate key, in which case it need to remove the old value and insert the new value.
1062    ///
1063    /// Returns true if the insert is successful, false if out of space.
1064    pub(crate) fn insert(
1065        &mut self,
1066        key: &[u8],
1067        value: &[u8],
1068        op_type: OpType,
1069        max_fence_len: usize,
1070    ) -> bool {
1071        debug_assert!(key.len() as u16 >= self.prefix_len);
1072        match op_type {
1073            OpType::Insert | OpType::Cache => {
1074                debug_assert!(!value.is_empty());
1075            }
1076            OpType::Delete | OpType::Phantom => {}
1077        }
1078
1079        let post_fix_len = key.len() as u16 - self.prefix_len;
1080        let val_len = value.len() as u16;
1081        let kv_len = post_fix_len + val_len;
1082
1083        let value_count_with_fence = self.meta.meta_count_with_fence();
1084
1085        let pos = self.lower_bound(key) as usize;
1086
1087        if pos < value_count_with_fence as usize {
1088            let prefix_len = self.prefix_len as usize;
1089            let pos_meta = self.get_kv_meta(pos);
1090            let pos_key = self.get_remaining_key(pos_meta);
1091            let search_key_postfix = &key[prefix_len..];
1092            if pos_key.cmp(search_key_postfix) == Ordering::Equal {
1093                // The key already exists.
1094                counter!(LeafInsertDuplicate);
1095                if op_type == OpType::Delete {
1096                    let pos_meta = self.get_kv_meta_mut(pos);
1097                    pos_meta.mark_as_deleted();
1098                    return true;
1099                }
1100
1101                let pos_value = self.get_value(pos_meta);
1102                let pos_value_len = pos_value.len() as u16;
1103
1104                if pos_value_len >= val_len {
1105                    // we are lucky, old value is larger than new value. We just overwrite the old value.
1106                    unsafe {
1107                        let pair_ptr = self.data.as_ptr().add(pos_meta.offset as usize) as *mut u8;
1108                        std::ptr::copy_nonoverlapping(
1109                            value.as_ptr(),
1110                            pair_ptr.add(post_fix_len as usize),
1111                            val_len as usize,
1112                        );
1113                    }
1114                    let pos_meta = self.get_kv_meta_mut(pos);
1115                    pos_meta.set_value_len(val_len);
1116                    pos_meta.set_op_type(op_type);
1117                    return true;
1118                }
1119
1120                if self.meta.remaining_size < kv_len {
1121                    return false;
1122                }
1123                assert!(op_type != OpType::Cache);
1124                let offset = self.current_lowest_offset() - kv_len;
1125                unsafe {
1126                    let pair_ptr = self.data.as_ptr().add(offset as usize) as *mut u8;
1127
1128                    let pos_meta = self.get_kv_meta_mut(pos);
1129                    pos_meta.set_value_len(val_len);
1130                    pos_meta.set_op_type(op_type);
1131                    pos_meta.offset = offset;
1132                    std::ptr::copy_nonoverlapping(
1133                        key[self.prefix_len as usize..].as_ptr(),
1134                        pair_ptr,
1135                        post_fix_len as usize,
1136                    );
1137                    std::ptr::copy_nonoverlapping(
1138                        value.as_ptr(),
1139                        pair_ptr.add(post_fix_len as usize),
1140                        val_len as usize,
1141                    );
1142                }
1143                self.meta.remaining_size -= kv_len;
1144                return true;
1145            }
1146        }
1147
1148        // The key is not already in the node.
1149        // For any in-memory page, skipping delete records here could lead to empty page
1150        // which becomes un-evictable.
1151        if op_type == OpType::Delete && (self.is_base_page()) {
1152            // we are deleting a key that is not in the page.
1153            return true;
1154        }
1155
1156        //Check if the node has capacity for the new record with or without fences
1157        if self.full_with_fences(
1158            kv_len + std::mem::size_of::<LeafKVMeta>() as u16,
1159            max_fence_len,
1160        ) {
1161            return false;
1162        }
1163
1164        counter!(LeafInsertNew);
1165
1166        let offset = self.current_lowest_offset() - kv_len;
1167        let new_meta =
1168            LeafKVMeta::make_prefixed_meta(offset, val_len, key, self.prefix_len, op_type);
1169
1170        unsafe {
1171            let metas_size = std::mem::size_of::<LeafKVMeta>()
1172                * (self.meta.meta_count_with_fence() - pos as u16) as usize;
1173            let src_ptr = self
1174                .data
1175                .as_ptr()
1176                .add(pos * std::mem::size_of::<LeafKVMeta>());
1177            let dest_ptr = self
1178                .data
1179                .as_mut_ptr()
1180                .add((pos + 1) * std::mem::size_of::<LeafKVMeta>());
1181            std::ptr::copy(src_ptr, dest_ptr, metas_size);
1182
1183            self.write_initial_kv_meta(pos, new_meta);
1184
1185            let pair_ptr = self.data.as_mut_ptr().add(offset as usize);
1186            std::ptr::copy_nonoverlapping(
1187                key[self.prefix_len as usize..].as_ptr(),
1188                pair_ptr,
1189                post_fix_len as usize,
1190            );
1191            std::ptr::copy_nonoverlapping(
1192                value.as_ptr(),
1193                pair_ptr.add(post_fix_len as usize),
1194                val_len as usize,
1195            );
1196        }
1197
1198        self.meta.remaining_size -= kv_len + std::mem::size_of::<LeafKVMeta>() as u16;
1199        self.meta.increment_value_count();
1200        true
1201    }
1202
1203    /// Base pages are leaf pages without next pointer in non cache-only mode
1204    pub(crate) fn is_base_page(&self) -> bool {
1205        (!self.meta.is_cache_only_leaf()) && self.next_level.is_null()
1206    }
1207
1208    /// Returns the splitting key after splitting self
1209    /// The new high fence key of self (left-side node) and low fence
1210    /// of the new base page (right-side node) equal to the splitting key
1211    pub fn split(&mut self, sibling: &mut LeafNode, cache_only: bool) -> Vec<u8> {
1212        if cache_only {
1213            assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
1214        } else {
1215            assert!(self.is_base_page() && self.has_fence());
1216        }
1217
1218        let current_count = self.meta.meta_count_without_fence();
1219
1220        // [new_cur_count..] are moved to the sibling node
1221        // [..new_cur_count - 1] are kept in self
1222        // where splitting_key is the key at new_cur_count
1223        // Also, new_cur_count is in [0..(#non_fence_meta - 1)]
1224        let (splitting_key, new_cur_count) = self.get_split_key(cache_only);
1225        let sibling_cnt = current_count - new_cur_count;
1226
1227        // Now we have to do two things:
1228        // Copy the second half of the key-value pairs to the new node, setting the correct offsets.
1229        // Consolidate the key-value pairs in the current node, setting the correct offsets
1230        if !cache_only {
1231            let low_fence_for_right = self.get_kv_meta(FENCE_KEY_CNT + new_cur_count as usize);
1232            assert!(!low_fence_for_right.is_infinite_low_fence_key());
1233            let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);
1234
1235            let low_fence_key = self.get_full_key(low_fence_for_right);
1236            let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
1237                vec![]
1238            } else {
1239                self.get_full_key(high_fence_for_right)
1240            };
1241
1242            sibling.initialize(
1243                &low_fence_key,
1244                &high_fence_key,
1245                sibling.meta.node_size as usize,
1246                MiniPageNextLevel::new_null(),
1247                true,
1248                cache_only,
1249            );
1250        } else {
1251            sibling.initialize(
1252                &[],
1253                &[],
1254                sibling.meta.node_size as usize,
1255                MiniPageNextLevel::new_null(),
1256                false,
1257                cache_only,
1258            );
1259        }
1260
1261        let starting_kv_idx = if cache_only { 0 } else { FENCE_KEY_CNT };
1262
1263        for i in 0..sibling_cnt {
1264            let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_idx);
1265            if kv_meta.op_type() == OpType::Delete {
1266                // skip deleted records.
1267                continue;
1268            }
1269            let key = self.get_full_key(kv_meta);
1270            let value = self.get_value(kv_meta);
1271            let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
1272            assert!(insert_rt);
1273        }
1274
1275        if !cache_only {
1276            self.meta
1277                .set_value_count(new_cur_count + FENCE_KEY_CNT as u16);
1278        } else {
1279            self.meta.set_value_count(new_cur_count);
1280        }
1281
1282        self.consolidate_after_split(&splitting_key);
1283
1284        splitting_key
1285    }
1286
1287    /// Use the passed in `merge_split_key` as the splitting key
1288    /// to divide the base page in two. Also, install it as the high
1289    /// fence key of the left-side node and the low fence key of the
1290    /// right-side node
1291    pub fn split_with_key(
1292        &mut self,
1293        sibling: &mut LeafNode,
1294        merge_split_key: &Vec<u8>,
1295        cache_only: bool,
1296    ) {
1297        if cache_only {
1298            assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
1299        } else {
1300            assert!(self.is_base_page() && self.has_fence());
1301        }
1302
1303        let current_count = self.meta.meta_count_without_fence();
1304        // [new_cur_count..] are moved to the sibling node
1305        // [..new_cur_count - 1] are kept in self
1306        // where splitting_key is the key at new_cur_count
1307        // Also, new_cur_count is in [0..#non_fence_meta]
1308        let mut new_cur_count = self.get_kv_num_below_key(merge_split_key);
1309        let sibling_cnt = current_count - new_cur_count;
1310
1311        // Now we have to do two things:
1312        // Copy the second half of the key-value pairs to the new node, setting the correct offsets.
1313        // Consolidate the key-value pairs in the current node, setting the correct offsets.
1314        if !cache_only {
1315            let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);
1316
1317            let low_fence_key = merge_split_key.clone();
1318            let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
1319                vec![]
1320            } else {
1321                self.get_full_key(high_fence_for_right)
1322            };
1323
1324            sibling.initialize(
1325                &low_fence_key,
1326                &high_fence_key,
1327                sibling.meta.node_size as usize,
1328                MiniPageNextLevel::new_null(),
1329                true,
1330                cache_only,
1331            );
1332        } else {
1333            sibling.initialize(
1334                &[],
1335                &[],
1336                sibling.meta.node_size as usize,
1337                MiniPageNextLevel::new_null(),
1338                false,
1339                cache_only,
1340            );
1341        }
1342
1343        let starting_kv_index = if cache_only { 0 } else { FENCE_KEY_CNT };
1344
1345        for i in 0..sibling_cnt {
1346            let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_index);
1347            if kv_meta.op_type() == OpType::Delete {
1348                // skip deleted records
1349                continue;
1350            }
1351            let key = self.get_full_key(kv_meta);
1352            let value = self.get_value(kv_meta);
1353            let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
1354            assert!(insert_rt);
1355        }
1356
1357        if !cache_only {
1358            new_cur_count += FENCE_KEY_CNT as u16;
1359        }
1360
1361        self.meta.set_value_count(new_cur_count);
1362        self.consolidate_after_split(merge_split_key);
1363
1364        if !cache_only {
1365            // Assert the high fence of the left side node and the low fence of the right side node
1366            // are both set to the merge split key
1367            let left_high_fence = self.get_kv_meta(HIGH_FENCE_IDX);
1368            let left_high_fence_key = self.get_full_key(left_high_fence);
1369            let right_low_fence_key = sibling.get_low_fence_full_key();
1370
1371            assert_eq!(left_high_fence_key, *merge_split_key); // The high fence of the left side node == splitting key
1372            assert_eq!(right_low_fence_key, *merge_split_key);
1373        }
1374    }
1375
1376    pub(crate) fn consolidate_inner(
1377        &mut self,
1378        new_optype: OpType,
1379        new_high_fence: Option<&[u8]>,
1380        skip_tombstone: bool,
1381        cache_only: bool,
1382        skip_key: Option<&[u8]>,
1383    ) {
1384        let mut pairs = Vec::new();
1385
1386        for meta in self.meta_iter() {
1387            if skip_tombstone && meta.op_type() == OpType::Delete {
1388                // Skip tombstone values.
1389                continue;
1390            }
1391
1392            match skip_key {
1393                // Skip the record with the skip_key
1394                Some(s_k) => {
1395                    let k = self.get_full_key(meta);
1396                    let cmp = s_k.cmp(&k);
1397
1398                    if cmp != Ordering::Equal {
1399                        pairs.push((k, self.get_value(meta).to_owned(), meta.op_type()));
1400                    }
1401                }
1402                None => {
1403                    pairs.push((
1404                        self.get_full_key(meta),
1405                        self.get_value(meta).to_owned(),
1406                        meta.op_type(),
1407                    ));
1408                }
1409            }
1410        }
1411
1412        let has_fence = self.has_fence();
1413        let (low_fence, high_fence) = if has_fence {
1414            let high_fence_key = match new_high_fence {
1415                None => self.get_high_fence_key(),
1416                Some(key) => key.to_vec(),
1417            };
1418            (self.get_low_fence_key(), high_fence_key)
1419        } else {
1420            (vec![], vec![])
1421        };
1422
1423        let node_size = self.meta.node_size;
1424
1425        self.initialize(
1426            &low_fence,
1427            &high_fence,
1428            node_size as usize,
1429            self.next_level,
1430            has_fence,
1431            cache_only,
1432        );
1433
1434        for (key, value, op_type) in pairs {
1435            let rt = if op_type == OpType::Delete {
1436                self.insert(&key, &value, OpType::Delete, 0)
1437            } else {
1438                self.insert(&key, &value, new_optype, 0)
1439            };
1440            assert!(rt);
1441        }
1442    }
1443
1444    #[allow(dead_code)]
1445    pub(crate) fn consolidate(&mut self) {
1446        self.consolidate_inner(
1447            OpType::Insert,
1448            None,
1449            true,
1450            self.meta.is_cache_only_leaf(),
1451            None,
1452        );
1453    }
1454
1455    /// For mini page only.
1456    /// After consolidation, every tombstone records are removed, every insert records become cache records.
1457    #[allow(dead_code)]
1458    pub(crate) fn consolidate_after_merge(&mut self) {
1459        assert!(!self.is_base_page());
1460        self.consolidate_inner(
1461            OpType::Cache,
1462            None,
1463            false,
1464            self.meta.is_cache_only_leaf(),
1465            None,
1466        );
1467    }
1468
1469    /// For base page in non cache-only mode or mini page in cache-only mode.
1470    /// Note that this operation could empty the page so the caller needs to ensure it won't lead to an empty mini page in memory.
1471    /// Re-organize the key-value pairs to remove the holes in the data.
1472    /// A delete/split operation will leave holes in the data field, which is not good for space efficiency.
1473    /// The easiest (but not most efficient) way to do this is to first populate every key value out, then reset the node state, then insert them back.
1474    fn consolidate_after_split(&mut self, high_fence: &[u8]) {
1475        assert!(self.meta.is_cache_only_leaf() || self.is_base_page());
1476        self.consolidate_inner(
1477            OpType::Insert,
1478            Some(high_fence),
1479            true,
1480            self.meta.is_cache_only_leaf(),
1481            None,
1482        );
1483    }
1484
1485    /// For mini page only in cache-only mode only.
1486    /// Note that this operation could empty the page so the caller needs to ensure it won't lead to an empty mini page in memory.
1487    /// Same as consolidate(), except that any record with the given key is dropped
1488    pub(crate) fn consolidate_skip_key(&mut self, key: &[u8]) {
1489        assert!(!self.is_base_page());
1490        assert!(self.meta.is_cache_only_leaf());
1491        self.consolidate_inner(
1492            OpType::Insert,
1493            None,
1494            true,
1495            self.meta.is_cache_only_leaf(),
1496            Some(key),
1497        );
1498    }
1499
1500    /// Copy a mini-page to a new memory location
1501    pub(crate) fn copy_initialize_to(
1502        &self,
1503        dst_node: *mut LeafNode,
1504        dst_size: usize,
1505        discard_cold_cache: bool,
1506    ) {
1507        assert!(!self.is_base_page());
1508        assert!(self.meta.node_size as usize <= dst_size);
1509        let dst_ref = unsafe { &mut *dst_node };
1510        let empty = vec![];
1511
1512        dst_ref.initialize(
1513            &empty,
1514            &empty,
1515            dst_size,
1516            self.next_level,
1517            false, // Mini-page only, thus no fence
1518            self.meta.is_cache_only_leaf(),
1519        );
1520
1521        for meta in self.meta_iter() {
1522            let op = meta.op_type();
1523
1524            // Skip the cold values
1525            // This won't happen in cache-only mode as all records are dirty
1526            // In non cache-only mode, this won't lead to empty mini-page as
1527            // this is invoked after a read-hot or write-hot record.
1528            if discard_cold_cache && op.is_cache() && !meta.is_referenced() {
1529                continue;
1530            }
1531
1532            let value = self.get_value(meta);
1533            let rt = dst_ref.insert(&self.get_full_key(meta), value, op, 0);
1534            assert!(rt);
1535        }
1536    }
1537
1538    /// Initialize a LeafNode
1539    pub(crate) fn initialize(
1540        &mut self,
1541        low_fence: &[u8],
1542        high_fence: &[u8],
1543        node_size: usize,
1544        next_level: MiniPageNextLevel,
1545        has_fence: bool,
1546        cache_only: bool,
1547    ) {
1548        let snapshot_version = match CPRSnapShotMgr::get_snapshot_thread_id() {
1549            Ok(_) => CPRSnapShotMgr::get_snapshot_thread_version(),
1550            Err(_) => INVALID_SNAPSHOT_VERSION,
1551        };
1552
1553        // The initial version of a page is marked as not changed
1554        self.snapshot_version = snapshot_version & Self::SNAPSHOT_VERSION_MASK;
1555
1556        if !has_fence {
1557            assert!(low_fence.is_empty());
1558            assert!(high_fence.is_empty());
1559
1560            self.meta = NodeMeta::new(
1561                LeafNode::max_data_size(node_size) as u16,
1562                false,
1563                false,
1564                node_size as u16,
1565                cache_only,
1566            );
1567            self.prefix_len = 0;
1568            self.next_level = next_level;
1569        } else {
1570            self.meta = NodeMeta::new(
1571                LeafNode::max_data_size(node_size) as u16, // - max_fence_len as u16, // Reserve space for
1572                false,
1573                true,
1574                node_size as u16,
1575                cache_only,
1576            );
1577
1578            self.prefix_len = common_prefix_len(low_fence, high_fence);
1579            self.next_level = next_level;
1580            self.install_fence_key(low_fence, false);
1581            self.install_fence_key(high_fence, true);
1582        }
1583    }
1584
1585    pub(crate) fn set_split_flag(&mut self) {
1586        self.meta.set_split_flag();
1587    }
1588
1589    pub(crate) fn get_split_flag(&self) -> bool {
1590        self.meta.get_split_flag()
1591    }
1592
1593    // Highest bit (63) is used as "changed" indicator flag.
1594    // Bits 0-62 hold the actual version value.
1595    const SNAPSHOT_VERSION_CHANGED_FLAG: u64 = 0x8000_0000_0000_0000;
1596    const SNAPSHOT_VERSION_MASK: u64 = 0x7FFF_FFFF_FFFF_FFFF;
1597
1598    /// Get the snapshot version value without the "is_changed" flag.
1599    pub(crate) fn get_snapshot_version(&self) -> u64 {
1600        self.snapshot_version & Self::SNAPSHOT_VERSION_MASK
1601    }
1602
1603    /// Set the snapshot version and mark whether it changed via the highest bit flag.
1604    pub(crate) fn set_snapshot_version(&mut self, snapshot_version: u64, mark_changed: bool) {
1605        let next_version = snapshot_version & Self::SNAPSHOT_VERSION_MASK;
1606
1607        if !mark_changed {
1608            self.snapshot_version = next_version;
1609            return;
1610        }
1611
1612        let current_version = self.get_snapshot_version();
1613
1614        // Set changed flag only when version differs.
1615        if next_version != current_version {
1616            self.snapshot_version = next_version | Self::SNAPSHOT_VERSION_CHANGED_FLAG;
1617        }
1618    }
1619
1620    /// Check whether the snapshot version has changed.
1621    pub(crate) fn is_snapshot_version_changed(&self) -> bool {
1622        (self.snapshot_version & Self::SNAPSHOT_VERSION_CHANGED_FLAG) != 0
1623    }
1624
1625    /// Set the "changed" flag without changing the version value.
1626    pub(crate) fn set_snapshot_version_changed_flag(&mut self) {
1627        self.snapshot_version |= Self::SNAPSHOT_VERSION_CHANGED_FLAG;
1628    }
1629
1630    pub(crate) fn read_by_key(&self, search_key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
1631        self.read_by_key_inner(search_key, out_buffer, true)
1632    }
1633
1634    /// Read by key.
1635    #[must_use]
1636    pub(crate) fn read_by_key_inner(
1637        &self,
1638        search_key: &[u8],
1639        out_buffer: &mut [u8],
1640        binary_search: bool,
1641    ) -> LeafReadResult {
1642        let val_count = self.meta.meta_count_with_fence();
1643        let pos = if binary_search {
1644            self.lower_bound(search_key)
1645        } else {
1646            self.linear_lower_bound(search_key)
1647        };
1648
1649        if pos >= val_count {
1650            counter!(LeafNotFoundDueToRange);
1651            return LeafReadResult::NotFound;
1652        }
1653
1654        let kv_meta = self.get_kv_meta(pos as usize);
1655        let target_key = self.get_remaining_key(kv_meta);
1656
1657        // If the key is not already referenced, we need to mark it as referenced.
1658        if !kv_meta.is_referenced() {
1659            kv_meta.mark_as_ref();
1660        }
1661
1662        let input_post_key = &search_key[self.prefix_len as usize..];
1663        let cmp = target_key.cmp(input_post_key);
1664
1665        if cmp != Ordering::Equal {
1666            counter!(LeafNotFoundDueToKey);
1667            LeafReadResult::NotFound
1668        } else {
1669            if kv_meta.op_type().is_absent() {
1670                return LeafReadResult::Deleted;
1671            }
1672            let val_len = kv_meta.value_len();
1673            let val_ref = self.get_value(kv_meta);
1674            debug_assert_eq!(val_len as usize, val_ref.len());
1675            out_buffer[..val_len as usize].copy_from_slice(val_ref);
1676            LeafReadResult::Found(val_len as u32)
1677        }
1678    }
1679
1680    /// Pick the smallest mini-page size that the record fits in without filling it full
1681    /// For Bf-Tree with backend storage, the size is strictly less than the leaf page size (full)
1682    /// For cache-only Bf-Tree, we allow the mini-page size to reach leaf page size
1683    /// Assuming page_classes is in ascending order
1684    pub(crate) fn get_chain_size_hint(
1685        key_len: usize,
1686        value_len: usize,
1687        page_classes: &[usize],
1688        cache_only: bool,
1689    ) -> usize {
1690        let mut initial_record_size = key_len + value_len + std::mem::size_of::<LeafKVMeta>();
1691        initial_record_size += std::mem::size_of::<LeafNode>();
1692
1693        if let Some(s) = page_classes[0..(page_classes.len() - 1)]
1694            .iter()
1695            .position(|x| initial_record_size < *x)
1696        {
1697            return page_classes[s];
1698        } else if cache_only && initial_record_size <= page_classes[page_classes.len() - 1] {
1699            return page_classes[page_classes.len() - 1];
1700        }
1701
1702        panic!(
1703            "Record size {} plus metadata exceeds the max mini-page size {:?}",
1704            initial_record_size, page_classes
1705        );
1706    }
1707
1708    /// A mini-page is upgraded to the next size up where the record fits in without filling it full.
1709    /// For Bf-Tree with backend storage, the new size must be smaller than the leaf page size (full)
1710    /// For cache-only Bf-Tree, we allow the mini-page size to reach leaf page size
1711    /// If not possible, return None
1712    /// Assuming page_classes is in ascending order
1713    pub(crate) fn new_size_if_upgrade(
1714        &self,
1715        incoming_size: usize,
1716        page_classes: &[usize],
1717        cache_only: bool,
1718    ) -> Option<usize> {
1719        let cur_size = self.meta.node_size as usize;
1720        let request_size = cur_size + incoming_size;
1721
1722        if let Some(s) = page_classes[0..(page_classes.len() - 1)]
1723            .iter()
1724            .position(|x| request_size < *x)
1725        {
1726            if s == 0 {
1727                panic!("Should not be here");
1728            }
1729            return Some(page_classes[s]);
1730        } else if cache_only && request_size <= page_classes[page_classes.len() - 1] {
1731            return Some(page_classes[page_classes.len() - 1]);
1732        }
1733
1734        None
1735    }
1736
1737    /// Currently free node can only be called with base node. Mini page should be freed differently.
1738    pub(crate) fn free_base_page(node: *mut LeafNode) {
1739        assert!(unsafe { &*node }.is_base_page());
1740        let node_size = unsafe { &*node }.meta.node_size as usize;
1741        let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
1742        unsafe {
1743            std::alloc::dealloc(node as *mut u8, layout);
1744        }
1745    }
1746
1747    pub(crate) fn need_actually_merge_to_disk(&self) -> bool {
1748        for meta in self.meta_iter() {
1749            if meta.op_type().is_dirty() {
1750                return true;
1751            }
1752        }
1753
1754        false
1755    }
1756
1757    fn estimate_merge_size(&self) -> usize {
1758        let mut required_size = 0;
1759
1760        for meta in self.meta_iter() {
1761            if meta.op_type() == OpType::Insert {
1762                required_size += (meta.get_key_len() + meta.value_len()) as usize
1763                    + std::mem::size_of::<LeafKVMeta>();
1764            }
1765        }
1766
1767        required_size
1768    }
1769
1770    /// Determine if the leaf node has space of the requested_size given the full fence length
1771    /// This is required as fences could be added to a base page during consolidation
1772    pub(crate) fn full_with_fences(&self, requested_size: u16, max_fence_len: usize) -> bool {
1773        if self.meta.remaining_size < requested_size {
1774            return true;
1775        }
1776
1777        if self.meta.has_fence() {
1778            let mut empty_data_size = self.meta.remaining_size; // Counting fence keys
1779
1780            let low_key_meta = self.get_kv_meta(LOW_FENCE_IDX);
1781            if !low_key_meta.is_infinite_low_fence_key() {
1782                empty_data_size += low_key_meta.get_key_len();
1783            }
1784
1785            let high_key_meta = self.get_kv_meta(HIGH_FENCE_IDX);
1786            if !high_key_meta.is_infinite_high_fence_key() {
1787                empty_data_size += high_key_meta.get_key_len();
1788            }
1789
1790            if empty_data_size >= requested_size + max_fence_len as u16 {
1791                return false;
1792            }
1793
1794            return true;
1795        }
1796
1797        false
1798    }
1799
1800    pub(crate) fn merge_mini_page(&mut self, mini_page: &LeafNode, max_fence_len: usize) -> bool {
1801        let size_required = mini_page.estimate_merge_size();
1802
1803        if self.full_with_fences(size_required as u16, max_fence_len) {
1804            return false;
1805        }
1806
1807        // Merge the snapshot version too.
1808        self.snapshot_version = mini_page.get_snapshot_version();
1809        for meta in mini_page.meta_iter() {
1810            let c_key = mini_page.get_full_key(meta);
1811            let c_type = meta.op_type();
1812            if !c_type.is_dirty() {
1813                // This is important. We don't want to merge cache records, not only for performance but also correctness.
1814                // A cached record might be inaccessible (thus invalid).
1815                // Consider the case where scan operation merged the mini page, which triggers split,
1816                // The scan operation want to keep the mini page (why not), but the records are all in cache mode.
1817                continue;
1818            }
1819            let c_value = mini_page.get_value(meta);
1820            let rt = self.insert(&c_key, c_value, c_type, 0);
1821            assert!(rt);
1822        }
1823
1824        true
1825    }
1826
1827    pub(crate) fn get_stats(&self) -> LeafStats {
1828        let mut keys = Vec::new();
1829        let mut values = Vec::new();
1830        let mut op_types = Vec::new();
1831        let node_size = self.meta.node_size as usize;
1832        let prefix = self.get_prefix().to_owned();
1833
1834        for meta in self.meta_iter() {
1835            let key = self.get_full_key(meta);
1836            let value = self.get_value(meta);
1837            keys.push(key);
1838            values.push(value.to_owned());
1839            op_types.push(meta.op_type());
1840        }
1841
1842        LeafStats {
1843            keys,
1844            values,
1845            op_types,
1846            prefix,
1847            base_node: None,
1848            next_level: self.next_level,
1849            node_size,
1850        }
1851    }
1852
1853    pub(crate) fn has_fence(&self) -> bool {
1854        self.meta.has_fence()
1855    }
1856
1857    /// Returns an iterator that iterates over all key-value pairs, skipping the fence keys.
1858    pub(crate) fn meta_iter(&self) -> LeafMetaIter<'_> {
1859        LeafMetaIter::new(self)
1860    }
1861
1862    fn first_meta_pos_after_fence(&self) -> u16 {
1863        if self.has_fence() {
1864            FENCE_KEY_CNT as u16
1865        } else {
1866            0
1867        }
1868    }
1869
1870    pub(crate) fn get_record_by_pos_with_bound(
1871        &self,
1872        pos: u32,
1873        out_buffer: &mut [u8],
1874        return_field: ScanReturnField,
1875        bound_key: &Option<Vec<u8>>,
1876    ) -> GetScanRecordByPosResult {
1877        if pos >= self.meta.meta_count_with_fence() as u32 {
1878            return GetScanRecordByPosResult::EndOfLeaf;
1879        }
1880
1881        let meta = self.get_kv_meta(pos as usize);
1882
1883        if meta.op_type().is_absent() {
1884            return GetScanRecordByPosResult::Deleted;
1885        }
1886
1887        match return_field {
1888            ScanReturnField::Value => {
1889                if let Some(bk) = bound_key {
1890                    let cmp = self.get_full_key(meta).as_slice().cmp(bk);
1891                    if cmp == Ordering::Greater {
1892                        return GetScanRecordByPosResult::BoundKeyExceeded;
1893                    }
1894                }
1895
1896                let value = self.get_value(meta);
1897                let value_len = meta.value_len() as usize;
1898                out_buffer[..value_len].copy_from_slice(value);
1899                GetScanRecordByPosResult::Found(0, value_len as u32)
1900            }
1901            ScanReturnField::Key => {
1902                let full_key = self.get_full_key(meta);
1903
1904                if let Some(bk) = bound_key {
1905                    let cmp = full_key.as_slice().cmp(bk);
1906                    if cmp == Ordering::Greater {
1907                        return GetScanRecordByPosResult::BoundKeyExceeded;
1908                    }
1909                }
1910
1911                let key_len = full_key.len();
1912                out_buffer[..key_len].copy_from_slice(&full_key);
1913                GetScanRecordByPosResult::Found(key_len as u32, 0)
1914            }
1915            ScanReturnField::KeyAndValue => {
1916                let full_key = self.get_full_key(meta);
1917
1918                if let Some(bk) = bound_key {
1919                    let cmp = full_key.as_slice().cmp(bk);
1920                    if cmp == Ordering::Greater {
1921                        return GetScanRecordByPosResult::BoundKeyExceeded;
1922                    }
1923                }
1924
1925                let key_len = full_key.len();
1926                let value = self.get_value(meta);
1927                let value_len = meta.value_len() as usize;
1928
1929                out_buffer[..key_len].copy_from_slice(&full_key);
1930                out_buffer[key_len..key_len + value_len].copy_from_slice(value);
1931
1932                GetScanRecordByPosResult::Found(key_len as u32, value_len as u32)
1933            }
1934        }
1935    }
1936}
1937
1938pub(crate) struct LeafMetaIter<'a> {
1939    node: &'a LeafNode,
1940    cur: usize,
1941}
1942
1943impl LeafMetaIter<'_> {
1944    fn new(leaf: &LeafNode) -> LeafMetaIter<'_> {
1945        let cur = leaf.first_meta_pos_after_fence();
1946        LeafMetaIter {
1947            node: leaf,
1948            cur: cur as usize,
1949        }
1950    }
1951}
1952
1953impl<'a> Iterator for LeafMetaIter<'a> {
1954    type Item = &'a LeafKVMeta;
1955
1956    fn next(&mut self) -> Option<Self::Item> {
1957        if self.cur < self.node.meta.meta_count_with_fence() as usize {
1958            let rt = self.node.get_kv_meta(self.cur);
1959            self.cur += 1;
1960            Some(rt)
1961        } else {
1962            None
1963        }
1964    }
1965}
1966
1967pub(crate) enum GetScanRecordByPosResult {
1968    EndOfLeaf,
1969    Deleted,
1970    Found(u32, u32),  // length of returned key and value
1971    BoundKeyExceeded, // The key at the pos exceeds a given bound key
1972}
1973
1974#[derive(Debug, PartialEq, Eq, Clone)]
1975pub enum LeafReadResult {
1976    Deleted,
1977    NotFound,
1978    Found(u32),
1979    InvalidKey,
1980}
1981
1982#[cfg(test)]
1983mod tests {
1984    use super::*;
1985    use bytemuck::cast_slice;
1986    use rstest::rstest;
1987
1988    #[test]
1989    fn test_op_type_enum() {
1990        assert_eq!(OpType::Insert as u8, 0);
1991        assert_eq!(OpType::Delete as u8, 1);
1992        assert_eq!(OpType::Cache as u8, 2);
1993    }
1994
1995    #[test]
1996    fn test_make_prefixed_meta() {
1997        let key = [1, 2, 3, 4, 5];
1998        let prefix_len = 2;
1999        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &key, prefix_len, OpType::Insert);
2000
2001        assert_eq!(meta.offset, 10);
2002        assert_eq!(meta.get_key_len(), 5);
2003        assert_eq!(meta.value_len(), 20);
2004        assert_eq!(meta.preview_bytes, [3, 4]);
2005        assert_eq!(meta.op_type(), OpType::Insert);
2006        assert!(!meta.is_referenced());
2007    }
2008
2009    #[test]
2010    fn test_make_infinite_high_fence_key() {
2011        let meta = LeafKVMeta::make_infinite_high_fence_key();
2012        assert_eq!(meta.offset, u16::MAX);
2013        assert_eq!(meta.get_key_len(), 0);
2014        assert_eq!(meta.value_len(), 0);
2015        assert!(meta.is_infinite_high_fence_key());
2016    }
2017
2018    #[test]
2019    fn test_make_infinite_low_fence_key() {
2020        let meta = LeafKVMeta::make_infinite_low_fence_key();
2021        assert_eq!(meta.offset, u16::MAX - 1);
2022        assert_eq!(meta.get_key_len(), 0);
2023        assert_eq!(meta.value_len(), 0);
2024        assert!(meta.is_infinite_low_fence_key());
2025    }
2026
2027    #[test]
2028    fn test_value_len() {
2029        let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
2030        assert_eq!(meta.value_len(), 20);
2031
2032        meta.set_value_len(30);
2033        assert_eq!(meta.value_len(), 30);
2034    }
2035
2036    #[test]
2037    fn test_op_type() {
2038        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Delete);
2039        assert_eq!(meta.op_type(), OpType::Delete);
2040    }
2041
2042    #[test]
2043    fn test_mark_as_ref_and_clear_ref() {
2044        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
2045        assert!(!meta.is_referenced());
2046
2047        meta.mark_as_ref();
2048        assert!(meta.is_referenced());
2049
2050        meta.clear_ref();
2051        assert!(!meta.is_referenced());
2052    }
2053
2054    #[test]
2055    fn test_mark_as_deleted_and_is_deleted() {
2056        let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
2057        assert!(!meta.is_deleted());
2058
2059        meta.mark_as_deleted();
2060        assert!(meta.is_deleted());
2061    }
2062
2063    /// This test verifies that the merge split key divides
2064    /// the combined records of the base page (self) and its
2065    /// to-be-merged mini-page in half
2066    #[rstest]
2067    #[case(vec![1], vec![2], 2)]
2068    #[case(vec![2], vec![1], 2)]
2069    fn test_get_merge_split_key(
2070        #[case] base_page_values: Vec<usize>,
2071        #[case] mini_page_values: Vec<usize>,
2072        #[case] splitting_key: usize,
2073    ) {
2074        let base = unsafe { &mut *LeafNode::make_base_page(4096) };
2075        let mini = unsafe { &mut *LeafNode::make_base_page(4096) }; // Using base page as substitute
2076
2077        // Insert values to base page and mini page accordingly
2078        for i in 0..base_page_values.len() {
2079            let n = &base_page_values[i];
2080            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
2081            let key = cast_slice::<usize, u8>(n_slice);
2082            let value = cast_slice::<usize, u8>(n_slice);
2083
2084            let rt = base.insert(key, value, OpType::Insert, 2);
2085            assert!(rt);
2086        }
2087
2088        for i in 0..mini_page_values.len() {
2089            let n = &mini_page_values[i];
2090            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
2091            let key = cast_slice::<usize, u8>(n_slice);
2092            let value = cast_slice::<usize, u8>(n_slice);
2093
2094            let rt = mini.insert(key, value, OpType::Insert, 2);
2095            assert!(rt);
2096        }
2097
2098        // Find the splitting key
2099        let merge_split_key_byte = base.get_merge_split_key(mini);
2100        let merge_splitting_key = cast_slice::<u8, usize>(&merge_split_key_byte);
2101
2102        assert_eq!(merge_splitting_key[0], splitting_key);
2103
2104        LeafNode::free_base_page(base);
2105        LeafNode::free_base_page(mini);
2106    }
2107
2108    /// This test verifies that a base page is correctly split
2109    /// up based on a splitting key with the left side node (self)
2110    /// containing keys less than the key and right side node (new)
2111    /// containing keys greater than or equal to the splitting key
2112    #[rstest]
2113    #[case(vec![1, 2, 3, 4], 3)]
2114    #[case(vec![1], 2)]
2115    #[case(vec![2], 2)]
2116    fn test_split_with_key(#[case] base_page_values: Vec<usize>, #[case] splitting_key: usize) {
2117        let base = unsafe { &mut *LeafNode::make_base_page(4096) };
2118        let sibling = unsafe { &mut *LeafNode::make_base_page(4096) };
2119
2120        // Insert values to base page
2121        for i in 0..base_page_values.len() {
2122            let n = &base_page_values[i];
2123            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
2124            let key = cast_slice::<usize, u8>(n_slice);
2125            let value = cast_slice::<usize, u8>(n_slice);
2126
2127            let rt = base.insert(key, value, OpType::Insert, 2);
2128            assert!(rt);
2129        }
2130
2131        let splitting_key_ptr = &splitting_key;
2132        let splitting_key_slice =
2133            unsafe { std::slice::from_raw_parts(splitting_key_ptr as *const usize, 1) };
2134        let splitting_key_byte_arrary = cast_slice::<usize, u8>(splitting_key_slice).to_vec();
2135
2136        // Split the base page using the splitting key
2137        base.split_with_key(sibling, &splitting_key_byte_arrary, false);
2138
2139        // All values less than splitting key are in the left side node (self)
2140        // while those greater than or equal to the key are in the sibling node
2141        let mut out_buffer = vec![0u8; 1024];
2142        for i in 0..base_page_values.len() {
2143            let mut page = &mut *base;
2144            if base_page_values[i] >= splitting_key {
2145                page = &mut *sibling;
2146            }
2147
2148            let n = &base_page_values[i];
2149            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
2150            let key = cast_slice::<usize, u8>(n_slice);
2151
2152            let rt = page.read_by_key(key, &mut out_buffer);
2153
2154            assert_eq!(rt, LeafReadResult::Found(key.len() as u32)); // key and value were set the same
2155            assert_eq!(&out_buffer[0..key.len()], key);
2156        }
2157
2158        LeafNode::free_base_page(base);
2159        LeafNode::free_base_page(sibling);
2160    }
2161}