bf_tree/nodes/leaf_node.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

use core::panic;
use std::{alloc::Layout, cmp::Ordering, sync::atomic};

use crate::{
    circular_buffer::CircularBufferPtr, counter, error::TreeError, range_scan::ScanReturnField,
    utils::stats::LeafStats,
};

use super::{node_meta::NodeMeta, FENCE_KEY_CNT};

/// Invariant: a leaf page can only contain Insert and Delete entries.
///            A mini page can contain all four types.
#[repr(u8)]
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub(crate) enum OpType {
    Insert = 0,
    Delete = 1,
    Cache = 2,   // The clean version of Insert.
    Phantom = 3, // The clean version of Delete.
}

impl OpType {
    pub(crate) fn is_dirty(&self) -> bool {
        match self {
            OpType::Insert | OpType::Delete => true,
            OpType::Cache | OpType::Phantom => false,
        }
    }

    fn is_absent(&self) -> bool {
        match self {
            OpType::Insert | OpType::Cache => false,
            OpType::Phantom | OpType::Delete => true,
        }
    }

    fn is_cache(&self) -> bool {
        match self {
            OpType::Cache | OpType::Phantom => true,
            OpType::Insert | OpType::Delete => false,
        }
    }
}

const PREVIEW_SIZE: usize = 2;

const LOW_FENCE_IDX: usize = 0;
const HIGH_FENCE_IDX: usize = 1;

const OP_TYPE_SHIFT: u16 = 14;
const KEY_LEN_MASK: u16 = 0x3F_FF; // Lower 14 bits of the key_len.

// Highest bit of the value_len.
const REF_BIT_MASK: u16 = 0x80_00;
const VALUE_LEN_MASK: u16 = 0x7F_FF; // Lower 15 bits of the value_len.
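
// A minimal compile-time sketch (added for illustration, not original code):
// packing key_len = 5 together with OpType::Delete round-trips cleanly
// through KEY_LEN_MASK and OP_TYPE_SHIFT.
const _: () = {
    let packed: u16 = 5u16 | ((OpType::Delete as u16) << OP_TYPE_SHIFT);
    assert!(packed & KEY_LEN_MASK == 5);
    assert!(packed >> OP_TYPE_SHIFT == OpType::Delete as u16);
};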

pub(crate) fn common_prefix_len(low_fence: &[u8], high_fence: &[u8]) -> u16 {
    let mut prefix_len = 0;
    for i in 0..std::cmp::min(low_fence.len(), high_fence.len()) {
        if low_fence[i] == high_fence[i] {
            prefix_len += 1;
        } else {
            break;
        }
    }
    prefix_len
}
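
// Illustrative sketch (assumed, not in the original file): common_prefix_len
// counts matching leading bytes and stops at the first mismatch.
#[cfg(test)]
mod common_prefix_len_sketch {
    use super::common_prefix_len;

    #[test]
    fn shared_prefix_is_counted() {
        assert_eq!(common_prefix_len(b"user:001", b"user:999"), 5);
        assert_eq!(common_prefix_len(b"abc", b"xyz"), 0);
        // The shorter key bounds the prefix length.
        assert_eq!(common_prefix_len(b"ab", b"abc"), 2);
    }
}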

#[repr(C)]
pub(crate) struct LeafKVMeta {
    offset: u16,
    op_type_key_len_in_byte: u16, // Highest 2 bits: op_type; lower 14 bits: key length in bytes.
    ref_value_len_in_byte: std::sync::atomic::AtomicU16, // Highest bit: ref; lower 15 bits: value length in bytes. We don't use shuttle's AtomicU16 here because this is not a real synchronization point.
    preview_bytes: [u8; PREVIEW_SIZE],
}

impl LeafKVMeta {
    pub(crate) fn make_prefixed_meta(
        offset: u16,
        value_len: u16,
        key: &[u8],
        prefix_len: u16,
        op_type: OpType,
    ) -> Self {
        let mut meta = Self {
            offset,
            op_type_key_len_in_byte: key.len() as u16 | ((op_type as u16) << OP_TYPE_SHIFT),
            ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(value_len),
            preview_bytes: [0; PREVIEW_SIZE],
        };

        for i in 0..std::cmp::min(key.len().saturating_sub(prefix_len as usize), PREVIEW_SIZE) {
            meta.preview_bytes[i] = key[i + prefix_len as usize];
        }

        // The initial value is not referenced. This is important because during garbage
        // reclamation of the delta chain, we call Insert to reset the state, which calls
        // this function.
        meta.clear_ref();
        meta
    }

    pub fn make_infinite_high_fence_key() -> Self {
        assert_eq!(std::mem::size_of::<Self>(), super::KV_META_SIZE);

        Self {
            offset: u16::MAX,
            op_type_key_len_in_byte: 0,
            ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
            preview_bytes: [0; PREVIEW_SIZE],
        }
    }

    pub fn make_infinite_low_fence_key() -> Self {
        Self {
            offset: u16::MAX - 1,
            op_type_key_len_in_byte: 0,
            ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
            preview_bytes: [0; PREVIEW_SIZE],
        }
    }

    pub fn is_infinite_low_fence_key(&self) -> bool {
        self.offset == u16::MAX - 1
    }

    pub fn is_infinite_high_fence_key(&self) -> bool {
        self.offset == u16::MAX
    }

    pub fn value_len(&self) -> u16 {
        self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & VALUE_LEN_MASK
    }

    pub fn set_value_len(&mut self, value: u16) {
        let v = self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed);
        self.ref_value_len_in_byte.store(
            (v & !VALUE_LEN_MASK) | (value & VALUE_LEN_MASK),
            atomic::Ordering::Relaxed,
        );
    }

    pub fn set_op_type(&mut self, op_type: OpType) {
        self.op_type_key_len_in_byte = self.get_key_len() | ((op_type as u16) << OP_TYPE_SHIFT);
    }

    pub fn op_type(&self) -> OpType {
        let l = self.op_type_key_len_in_byte;
        let b = l >> OP_TYPE_SHIFT;
        unsafe { std::mem::transmute(b as u8) }
    }

    pub fn mark_as_ref(&self) {
        self.ref_value_len_in_byte
            .fetch_or(REF_BIT_MASK, atomic::Ordering::Relaxed);
    }

    pub fn clear_ref(&self) {
        self.ref_value_len_in_byte
            .fetch_and(!REF_BIT_MASK, atomic::Ordering::Relaxed);
    }

    pub fn is_referenced(&self) -> bool {
        self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & REF_BIT_MASK != 0
    }

    pub fn mark_as_deleted(&mut self) {
        self.set_op_type(OpType::Delete);
    }

    #[allow(dead_code)]
    pub fn is_deleted(&self) -> bool {
        self.op_type() == OpType::Delete
    }

    pub(crate) fn get_offset(&self) -> u16 {
        self.offset
    }

    pub(crate) fn get_key_len(&self) -> u16 {
        self.op_type_key_len_in_byte & KEY_LEN_MASK
    }
}
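
// Hedged sketch (added for illustration): the ref bit shares one atomic with
// the 15-bit value length, so toggling it must not disturb the length bits.
#[cfg(test)]
mod leaf_kv_meta_ref_bit_sketch {
    use super::*;

    #[test]
    fn ref_bit_does_not_disturb_value_len() {
        let meta = LeafKVMeta::make_prefixed_meta(0, 42, b"key", 0, OpType::Insert);
        assert!(!meta.is_referenced()); // make_prefixed_meta clears the ref bit.
        meta.mark_as_ref();
        assert!(meta.is_referenced());
        assert_eq!(meta.value_len(), 42); // Length bits are unchanged.
        meta.clear_ref();
        assert!(!meta.is_referenced());
    }
}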

#[derive(Debug, Clone, Copy)]
pub(crate) struct MiniPageNextLevel {
    val: usize,
}

impl MiniPageNextLevel {
    pub(crate) fn new(val: usize) -> Self {
        Self { val }
    }

    pub(crate) fn as_offset(&self) -> usize {
        assert!(!self.is_null());
        self.val
    }

    pub(crate) fn new_null() -> Self {
        Self { val: usize::MAX }
    }

    pub(crate) fn is_null(&self) -> bool {
        self.val == usize::MAX
    }
}
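
// Minimal sketch (added for illustration): usize::MAX serves as the null
// sentinel, so any real next-level offset must be below it.
#[cfg(test)]
mod mini_page_next_level_sketch {
    use super::MiniPageNextLevel;

    #[test]
    fn null_sentinel_round_trip() {
        assert!(MiniPageNextLevel::new_null().is_null());
        let next = MiniPageNextLevel::new(4096);
        assert!(!next.is_null());
        assert_eq!(next.as_offset(), 4096);
    }
}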

#[repr(C)]
pub(crate) struct LeafNode {
    pub(crate) meta: NodeMeta,
    prefix_len: u16,
    pub(crate) next_level: MiniPageNextLevel,
    pub(crate) lsn: u64,
    data: [u8; 0],
}

const _: () = assert!(std::mem::size_of::<LeafNode>() == 24);

impl LeafNode {
    fn max_data_size(node_size: usize) -> usize {
        node_size - std::mem::size_of::<LeafNode>()
    }

    pub(crate) fn initialize_mini_page(
        ptr: &CircularBufferPtr,
        node_size: usize,
        next_level: MiniPageNextLevel,
        cache_only: bool,
    ) {
        unsafe {
            Self::init_node_with_fence(
                ptr.as_ptr(),
                &[],
                &[],
                node_size,
                next_level,
                false,
                cache_only,
            )
        }
    }

    pub(crate) fn make_base_page(node_size: usize) -> *mut Self {
        let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
        let ptr = unsafe { std::alloc::alloc(layout) };
        unsafe {
            Self::init_node_with_fence(
                ptr,
                &[],
                &[],
                node_size,
                MiniPageNextLevel::new_null(),
                true,
                false,
            );
        }
        ptr as *mut Self
    }

    /// After promoting a base page to a full page cache, we need to convert every record
    /// to its clean version (Cache/Phantom).
    pub(crate) fn covert_insert_records_to_cache(&mut self) {
        let mut cur = self.first_meta_pos_after_fence();

        while cur < self.meta.meta_count_with_fence() {
            let meta = self.get_kv_meta_mut(cur as usize);
            let op_type = meta.op_type();

            match op_type {
                OpType::Insert => {
                    meta.set_op_type(OpType::Cache);
                }
                OpType::Delete => {
                    meta.set_op_type(OpType::Phantom);
                }
                OpType::Cache | OpType::Phantom => {
                    unreachable!("Base page should not have op type: {:?}", op_type);
                }
            };

            cur += 1;
        }
    }

    /// When merging the full page cache into the base page, we need to convert clean records
    /// back to dirty records.
    pub(crate) fn convert_cache_records_to_insert(&mut self) {
        let mut cur = self.first_meta_pos_after_fence();

        while cur < self.meta.meta_count_with_fence() {
            let meta = self.get_kv_meta_mut(cur as usize);
            let op_type = meta.op_type();

            match op_type {
                OpType::Cache => {
                    meta.set_op_type(OpType::Insert);
                }
                OpType::Phantom => {
                    meta.set_op_type(OpType::Delete);
                }
                OpType::Insert | OpType::Delete => {
                    // Other op types may be present; leave them unchanged.
                }
            };

            cur += 1;
        }
    }

    unsafe fn init_node_with_fence(
        ptr: *mut u8,
        low_fence: &[u8],
        high_fence: &[u8],
        node_size: usize,
        next_level: MiniPageNextLevel,
        has_fence: bool,
        cache_only: bool,
    ) {
        let ptr = ptr as *mut Self;

        (&mut *ptr).initialize(
            low_fence, high_fence, node_size, next_level, has_fence, cache_only,
        );
    }

    pub(crate) fn get_kv_meta(&self, index: usize) -> &LeafKVMeta {
        debug_assert!(index < self.meta.meta_count_with_fence() as usize);
        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
        unsafe { &*meta_ptr.add(index) }
    }

    pub(crate) fn get_full_key(&self, meta: &LeafKVMeta) -> Vec<u8> {
        let prefix = self.get_prefix();
        let remaining = self.get_remaining_key(meta);
        [prefix, remaining].concat()
    }

    /// Get the full key of the low fence, which is not prefix-compressed.
    pub(crate) fn get_low_fence_full_key(&self) -> Vec<u8> {
        debug_assert!(LOW_FENCE_IDX < self.meta.meta_count_with_fence() as usize);
        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
        let meta = unsafe { &*meta_ptr.add(LOW_FENCE_IDX) };

        let key_offset = meta.get_offset();
        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
        unsafe {
            let key = std::slice::from_raw_parts(key_ptr, meta.get_key_len() as usize);
            key.to_vec()
        }
    }

    pub(crate) fn get_kv_meta_mut(&mut self, index: usize) -> &mut LeafKVMeta {
        debug_assert!(index < self.meta.meta_count_with_fence() as usize);
        let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
        unsafe { meta_ptr.add(index).as_mut().unwrap() }
    }

    pub(crate) fn write_initial_kv_meta(&mut self, index: usize, meta: LeafKVMeta) {
        let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
        unsafe { meta_ptr.add(index).write(meta) };
    }

    pub(crate) fn get_remaining_key(&self, meta: &LeafKVMeta) -> &[u8] {
        let key_offset = meta.get_offset();
        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
        unsafe {
            std::slice::from_raw_parts(key_ptr, (meta.get_key_len() - self.prefix_len) as usize)
        }
    }

    pub(crate) fn get_prefix(&self) -> &[u8] {
        let m = self.get_kv_meta(LOW_FENCE_IDX);
        let key_offset = m.get_offset();
        unsafe {
            std::slice::from_raw_parts(
                self.data.as_ptr().add(key_offset as usize),
                self.prefix_len as usize,
            )
        }
    }

    pub(crate) fn install_fence_key(&mut self, key: &[u8], is_high_fence: bool) {
        let loc: u16 = self.meta.meta_count_with_fence();
        if !is_high_fence {
            debug_assert!(self.meta.meta_count_with_fence() == LOW_FENCE_IDX as u16);
        } else {
            debug_assert!(self.meta.meta_count_with_fence() == HIGH_FENCE_IDX as u16);
        }

        let cur_low_offset = self.current_lowest_offset();

        self.meta.increment_value_count();
        self.meta.remaining_size -= std::mem::size_of::<LeafKVMeta>() as u16;

        if key.is_empty() {
            let fence = if is_high_fence {
                LeafKVMeta::make_infinite_high_fence_key()
            } else {
                LeafKVMeta::make_infinite_low_fence_key()
            };
            self.write_initial_kv_meta(loc as usize, fence);
        } else {
            let prefix_len = if is_high_fence { self.prefix_len } else { 0 }; // We don't compress the low fence.

            let post_fix_len = key.len() as u16 - prefix_len;
            let val_len = 0u16;
            let kv_len = post_fix_len + val_len;

            let remaining = self.meta.remaining_size;

            debug_assert!(kv_len <= remaining);

            let offset = cur_low_offset - kv_len;

            let new_meta =
                LeafKVMeta::make_prefixed_meta(offset, 0, key, prefix_len, OpType::Insert);

            self.write_initial_kv_meta(loc as usize, new_meta);

            unsafe {
                let start_ptr = self.data.as_mut_ptr().add(offset as usize);

                let key_slice = std::slice::from_raw_parts(
                    key.as_ptr().add(prefix_len as usize),
                    post_fix_len as usize,
                );
                std::ptr::copy_nonoverlapping(key_slice.as_ptr(), start_ptr, post_fix_len as usize);
            }

            self.meta.remaining_size -= kv_len;
        }
    }

    pub(crate) fn current_lowest_offset(&self) -> u16 {
        let value_count = self.meta.meta_count_with_fence();
        let rt =
            (value_count * std::mem::size_of::<LeafKVMeta>() as u16) + self.meta.remaining_size;

        // Sanity check
        #[cfg(debug_assertions)]
        {
            let mut min_offset = LeafNode::max_data_size(self.meta.node_size as usize) as u16;
            for i in 0..value_count {
                let kv_meta = self.get_kv_meta(i as usize);
                if kv_meta.is_infinite_low_fence_key() || kv_meta.is_infinite_high_fence_key() {
                    continue;
                }
                min_offset = std::cmp::min(min_offset, kv_meta.offset);
            }
            assert!(min_offset == rt);
        }
        rt
    }

    pub(crate) fn get_value(&self, meta: &LeafKVMeta) -> &[u8] {
        let val_offset = meta.get_offset() + meta.get_key_len() - self.prefix_len;
        let val_ptr = unsafe { self.data.as_ptr().add(val_offset as usize) };
        unsafe { std::slice::from_raw_parts(val_ptr, meta.value_len() as usize) }
    }

    /// A good split key has two properties:
    /// (1) it splits the node into two roughly equal halves by size;
    /// (2) it is short, so that parent node searches are faster.
    ///
    /// Getting the split key is tricky:
    /// 1. We can't just use the middle key, because keys/values are variable length. We want the key that splits the node by size, not by key count.
    /// 2. We don't try to find a short split key here.
    ///
    /// Returns the key that splits the node into two roughly equal sizes, and the new node count.
    /// This is only invoked once, for splitting the root node.
    pub fn get_split_key(&self, cache_only: bool) -> (Vec<u8>, u16) {
        let mut data_size = 0;

        for meta in self.meta_iter() {
            let key_len = meta.get_key_len();
            let value_len = meta.value_len();

            data_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
        }

        let split_target_size = data_size / 2;
        let mut cur_size = 0;

        for (i, meta) in self.meta_iter().enumerate() {
            let key_len = meta.get_key_len();
            let value_len = meta.value_len();

            cur_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            // Here we guarantee that, in cache-only mode, the root node keeps at least two keys
            if cache_only && i == 0 {
                continue;
            }

            if cur_size >= split_target_size {
                return (self.get_full_key(meta), i as u16);
            }
        }
        unreachable!();
    }
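
    // Worked sketch with hypothetical record sizes (added for illustration):
    // sizes [10, 30, 10, 50] give data_size = 100 and split_target_size = 50;
    // the running sum first reaches 50 at the third record, so the split is
    // decided by size, not by key count.
    #[allow(dead_code)]
    const _SPLIT_BY_SIZE_SKETCH: () = {
        let sizes = [10u16, 30, 10, 50];
        let total = sizes[0] + sizes[1] + sizes[2] + sizes[3];
        assert!(total / 2 == 50);
        assert!(sizes[0] + sizes[1] + sizes[2] == 50);
    };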

    /// Get the number of keys strictly smaller than `merge_split_key`.
    #[allow(clippy::unused_enumerate_index)]
    pub fn get_kv_num_below_key(&self, merge_split_key: &Vec<u8>) -> u16 {
        // Linear search
        let mut cnt: u16 = 0;
        for (_, meta) in self.meta_iter().enumerate() {
            let key = self.get_full_key(meta);
            let cmp = key.cmp(merge_split_key);
            // Pick all records from the base page whose key is smaller
            // than merge_split_key
            if cmp == std::cmp::Ordering::Less {
                cnt += 1;
            } else {
                break;
            }
        }
        cnt
    }

    /// [Cache-only mode]: Find the splitting key that evenly splits all the records in the
    /// current node, plus the to-be-inserted record, in half. The caller needs to guarantee
    /// that there are at least two records with different keys, so that the split won't
    /// result in an empty page.
    pub fn get_cache_only_insert_split_key(&self, key: &[u8], new_record_size: &u16) -> Vec<u8> {
        let mut merge_split_key_1: Option<Vec<u8>> = None;
        let mut merge_split_key_2: Option<Vec<u8>> = None;
        let mut diff_1: i16 = i16::MAX;
        let mut diff_2: i16 = i16::MAX;

        // The total size of all records, including the new record to insert
        let mut total_merged_size: u16 = 0;

        for meta in self.meta_iter() {
            let key_len = meta.get_key_len();
            let value_len = meta.value_len();

            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
        }

        total_merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;
        let split_target_size = total_merged_size / 2;

        // Search for the splitting key
        let mut merged_size: u16 = 0;
        let mut self_meta_iter = self.meta_iter();
        let mut self_meta_option = self_meta_iter.next();

        // Go through all records in the base page whose keys are smaller than the new record's key
        if self_meta_option.is_some() {
            let mut cur_base_meta = self_meta_option.unwrap();
            let mut cur_base_key = self.get_full_key(cur_base_meta);
            let mut cmp = cur_base_key.as_slice().cmp(key);

            while cmp == std::cmp::Ordering::Less {
                let key_len = cur_base_meta.get_key_len();
                let value_len = cur_base_meta.value_len();
                merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
                if merged_size >= split_target_size {
                    // Two split key candidates are already found
                    // Stop
                    if merge_split_key_2.is_some() {
                        break;
                    }

                    let left_side: u16 = merged_size
                        - key_len
                        - value_len
                        - std::mem::size_of::<LeafKVMeta>() as u16;
                    let right_side = total_merged_size - left_side;

                    if merge_split_key_1.is_none() {
                        merge_split_key_1 = Some(cur_base_key);
                        diff_1 = (left_side as i16 - right_side as i16).abs();
                    } else {
                        merge_split_key_2 = Some(cur_base_key);
                        diff_2 = (left_side as i16 - right_side as i16).abs();
                    }
                }

                self_meta_option = self_meta_iter.next();
                if let Some(meta) = self_meta_option {
                    cur_base_meta = meta;
                    cur_base_key = self.get_full_key(cur_base_meta);
                    cmp = cur_base_key.as_slice().cmp(key);
                } else {
                    break;
                }
            }
        }

        // Count the new key
        if merge_split_key_2.is_none() {
            merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                let left_side: u16 =
                    merged_size - new_record_size - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(key.to_vec());
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(key.to_vec());
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
        }

        // Go through the rest of the records in the base page
        while self_meta_option.is_some() {
            let base_meta = self_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                // Record a split key candidate
                let cur_base_key = self.get_full_key(base_meta);
                // Stop if two split key candidates are already found
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_base_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_base_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
            self_meta_option = self_meta_iter.next();
        }

        // Pick the splitting key that achieves the smallest size difference between
        // the two halves
        if merge_split_key_1.is_none() {
            panic!(
                "Failed to find a splitting key for merging mini and base page. {}, {}",
                merged_size, split_target_size
            );
        }

        if merge_split_key_2.is_none() || diff_1 < diff_2 {
            return merge_split_key_1.unwrap();
        }

        merge_split_key_2.unwrap()
    }

    /// Find the splitting key that divides the merged records of
    /// the mini page and its corresponding base page evenly into two
    /// groups (base pages). Special consideration goes to the split-key
    /// bearing (k, v) pair.
    /// The caller needs to ensure there are at least two distinct keys across the
    /// mini page and self.
    #[allow(clippy::unnecessary_unwrap)]
    pub(crate) fn get_merge_split_key(&mut self, mini_page: &LeafNode) -> Vec<u8> {
        let mut merge_split_key_1: Option<Vec<u8>> = None;
        let mut merge_split_key_2: Option<Vec<u8>> = None;
        let mut diff_1: i16 = i16::MAX;
        let mut diff_2: i16 = i16::MAX;

        let mut total_merged_size: u16 = 0;
        let mut base_meta_iter = self.meta_iter();
        let mut cur_pos = base_meta_iter.cur;
        let mut cur_base_meta_option = base_meta_iter.next();
        let mut duplicate_positions = vec![];

        // Calculate the size of all distinct merged records through merge-sort
        for mini_meta in mini_page.meta_iter() {
            let c_type = mini_meta.op_type();
            // Skip cached or phantom records, as they will not be merged into base pages
            if !c_type.is_dirty() {
                continue;
            }

            let cur_mini_key = mini_page.get_full_key(mini_meta);
            if cur_base_meta_option.is_some() {
                let mut cur_base_meta = cur_base_meta_option.unwrap();
                let mut cur_base_key = self.get_full_key(cur_base_meta);
                let mut cmp = cur_base_key.cmp(&cur_mini_key);

                // Go through all records from the base page whose key is strictly smaller
                // than the current record from the mini page
                while cmp == std::cmp::Ordering::Less {
                    let key_len = cur_base_meta.get_key_len();
                    let value_len = cur_base_meta.value_len();
                    total_merged_size +=
                        key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

                    cur_pos = base_meta_iter.cur;
                    cur_base_meta_option = base_meta_iter.next();
                    if cur_base_meta_option.is_some() {
                        cur_base_meta = cur_base_meta_option.unwrap();
                        cur_base_key = self.get_full_key(cur_base_meta);
                        cmp = cur_base_key.cmp(&cur_mini_key);
                    } else {
                        break;
                    }
                }

                // If the comparison is equal, advance the base page iterator to avoid
                // counting the record twice
                if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
                    // Mark the duplicate entry for deletion
                    duplicate_positions.push(cur_pos);
                    cur_pos = base_meta_iter.cur;
                    cur_base_meta_option = base_meta_iter.next();
                }
            }

            let key_len = mini_meta.get_key_len();
            let value_len = mini_meta.value_len();
            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
        }

        // Mini-page records are exhausted; go through the rest of the
        // records from the base page, if any
        while cur_base_meta_option.is_some() {
            let base_meta = cur_base_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            cur_base_meta_option = base_meta_iter.next();
        }

        // The target split size is half of all the merged records
        let split_target_size = total_merged_size / 2;

        // Merge-sort the distinct records from the mini page and the base page
        // until the size of the sorted records reaches the target split size
        let mut merged_size: u16 = 0;
        base_meta_iter = self.meta_iter();
        cur_base_meta_option = base_meta_iter.next();

        for mini_meta in mini_page.meta_iter() {
            // Skip cached or phantom records, as they will
            // not take additional space after merging
            let c_type = mini_meta.op_type();
            if !c_type.is_dirty() {
                continue;
            }

            let cur_mini_key = mini_page.get_full_key(mini_meta);
            if cur_base_meta_option.is_some() {
                let mut cur_base_meta = cur_base_meta_option.unwrap();
                let mut cur_base_key = self.get_full_key(cur_base_meta);
                let mut cmp = cur_base_key.cmp(&cur_mini_key);
                // Go through all records from the base page whose key is strictly smaller
                // than the current record from the mini page
                while cmp == std::cmp::Ordering::Less {
                    let key_len = cur_base_meta.get_key_len();
                    let value_len = cur_base_meta.value_len();
                    merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
                    if merged_size >= split_target_size {
                        // Two split key candidates are already found
                        // Stop
                        if merge_split_key_2.is_some() {
                            break;
                        }

                        let left_side: u16 = merged_size
                            - key_len
                            - value_len
                            - std::mem::size_of::<LeafKVMeta>() as u16;
                        let right_side = total_merged_size - left_side;

                        if merge_split_key_1.is_none() {
                            merge_split_key_1 = Some(cur_base_key);
                            diff_1 = (left_side as i16 - right_side as i16).abs();
                        } else {
                            merge_split_key_2 = Some(cur_base_key);
                            diff_2 = (left_side as i16 - right_side as i16).abs();
                        }
                    }

                    cur_base_meta_option = base_meta_iter.next();
                    if cur_base_meta_option.is_some() {
                        cur_base_meta = cur_base_meta_option.unwrap();
                        cur_base_key = self.get_full_key(cur_base_meta);
                        cmp = cur_base_key.cmp(&cur_mini_key);
                    } else {
                        break;
                    }
                }

                // If the comparison is equal, advance the base page iterator to avoid
                // counting the record twice
                if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
                    cur_base_meta_option = base_meta_iter.next();
                }
            }

            let key_len = mini_meta.get_key_len();
            let value_len = mini_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                // Two split key candidates are already found
                // Stop
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_mini_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_mini_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
        }

        // Mini-page records are exhausted; go through the rest of the
        // records from the base page, if any
        while cur_base_meta_option.is_some() {
            let base_meta = cur_base_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                // Record a split key candidate
                let cur_base_key = self.get_full_key(base_meta);
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_base_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_base_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
            cur_base_meta_option = base_meta_iter.next();
        }

        // Delete all duplicate entries from the base page
        for pos in duplicate_positions {
            let pos_meta = self.get_kv_meta_mut(pos);
            pos_meta.mark_as_deleted();
        }

        if merge_split_key_1.is_none() {
            unreachable!(
                "Failed to find a splitting key for merging mini and base page. {}, {}",
                merged_size, split_target_size
            );
        }

        // The two split keys must be different
        if merge_split_key_2.is_some() {
            let cmp = merge_split_key_1
                .as_ref()
                .unwrap()
                .cmp(merge_split_key_2.as_ref().unwrap());
            assert_ne!(cmp, std::cmp::Ordering::Equal);
        }

        let mut splitting_key = if merge_split_key_2.is_none() || diff_1 < diff_2 {
            merge_split_key_1.as_ref().unwrap()
        } else {
            merge_split_key_2.as_ref().unwrap()
        };

        // The splitting key cannot be the low fence, as that would lead to an invalid page
        // and duplicate entries in the inner node
        let mut fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
        if !fence_meta.is_infinite_low_fence_key() {
            let low_fence_key = self.get_low_fence_key();
            let cmp = splitting_key.cmp(&low_fence_key);
            if cmp == std::cmp::Ordering::Equal {
                splitting_key = merge_split_key_2.as_ref().unwrap();
            }
        }

        fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
        if !fence_meta.is_infinite_high_fence_key() {
            let high_fence_key = self.get_high_fence_key();
            let cmp = splitting_key.cmp(&high_fence_key);
            assert_ne!(cmp, std::cmp::Ordering::Equal);
        }

        splitting_key.clone()
    }

    pub fn get_key_to_reach_this_node(&self) -> Vec<u8> {
        let meta = self.get_kv_meta(self.first_meta_pos_after_fence() as usize);
        self.get_full_key(meta)
    }

    /// In cache-only mode, a transient state could be observed when one thread is in the middle of
    /// consolidating the leaf node while the evicting callback thread is attempting an
    /// unprotected read. As such, this function guarantees that no panic happens during the key read.
    pub fn try_get_key_to_reach_this_node(&self) -> Result<Vec<u8>, TreeError> {
        assert!(self.meta.is_cache_only_leaf());

        // Get the meta of the first key
        let index = 0;
        if index >= self.meta.meta_count_with_fence() as usize {
            return Err(TreeError::NeedRestart);
        }

        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
        let meta = unsafe { &*meta_ptr.add(index) };

        let key_offset = meta.get_offset();
        let key_len = meta.get_key_len();

        if key_offset + key_len > LeafNode::max_data_size(self.meta.node_size as usize) as u16 {
            return Err(TreeError::NeedRestart);
        }

        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
        let key_slice =
            unsafe { std::slice::from_raw_parts(key_ptr, (key_len - self.prefix_len) as usize) };
        Ok(key_slice.to_vec())
    }

    pub fn get_low_fence_key(&self) -> Vec<u8> {
        assert!(self.has_fence());
        let fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
        if fence_meta.is_infinite_low_fence_key() {
            vec![]
        } else {
            unsafe {
                let start_ptr = self.data.as_ptr().add(fence_meta.offset as usize);
                let key_slice =
                    std::slice::from_raw_parts(start_ptr, fence_meta.get_key_len() as usize);
                key_slice.to_vec()
            }
        }
    }

    pub fn get_high_fence_key(&self) -> Vec<u8> {
        assert!(self.has_fence());
        let fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
        if fence_meta.is_infinite_high_fence_key() {
            vec![]
        } else {
            self.get_full_key(fence_meta)
        }
    }

    /// Compares the existing key specified by `meta` with the input key.
    /// By convention, `key_cmp(meta, key)` returns the `Ordering` of the stored key
    /// relative to `key`, i.e. `stored_key.cmp(key)`.
    #[inline]
    pub(crate) fn key_cmp(&self, meta: &LeafKVMeta, key: &[u8]) -> Ordering {
        let search_key_prefix = &key[(self.prefix_len as usize)..]
            [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];

        let prefix_key = &meta.preview_bytes[..std::cmp::min(
            PREVIEW_SIZE,
            meta.get_key_len() as usize - self.prefix_len as usize,
        )];
        let mut cmp = prefix_key.cmp(search_key_prefix);

        // If the previews match, compare the full key
        if cmp == Ordering::Equal {
            let full_key = self.get_remaining_key(meta);
            let search_key_postfix = &key[self.prefix_len as usize..];
            cmp = full_key.cmp(search_key_postfix);
        }
        cmp
    }
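
    // Hedged sketch (added for illustration, hypothetical keys): with
    // prefix_len = 4, the stored previews of b"user0123" and b"user0456"
    // are b"01" and b"04", and the second preview byte already decides the
    // ordering without touching the full keys.
    #[allow(dead_code)]
    const _PREVIEW_CMP_SKETCH: () = {
        let a = *b"01";
        let b = *b"04";
        assert!(a[0] == b[0]);
        assert!(a[1] < b[1]);
    };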

    pub(crate) fn linear_lower_bound(&self, key: &[u8]) -> u16 {
        debug_assert!(key.len() >= self.prefix_len as usize);

        let mut index = self.first_meta_pos_after_fence();

        while index < self.meta.meta_count_with_fence() {
            let key_meta = self.get_kv_meta(index as usize);

            #[cfg(target_arch = "x86_64")]
            unsafe {
                // For bw-tree-like linear search, we use clflush to simulate pointer chasing.
                core::arch::x86_64::_mm_clflush(key_meta as *const LeafKVMeta as *const u8);
            }
            let cmp = self.key_cmp(key_meta, key);

            if cmp != Ordering::Less {
                return index;
            }

            index += 1;
        }

        index
    }

    pub(crate) fn lower_bound(&self, key: &[u8]) -> u16 {
        let mut lower = self.first_meta_pos_after_fence();
        let mut upper = self.meta.meta_count_with_fence();

        let search_key_prefix = &key[(self.prefix_len as usize)..]
            [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];

        debug_assert!(key.len() >= self.prefix_len as usize);

        while lower < upper {
            let mid = lower + (upper - lower) / 2;
            let key_meta = self.get_kv_meta(mid as usize);

            let prefix_key = &key_meta.preview_bytes[..std::cmp::min(
                PREVIEW_SIZE,
                key_meta.get_key_len() as usize - self.prefix_len as usize,
            )];
            let mut cmp = prefix_key.cmp(search_key_prefix);

            // If the previews match, compare the full key
            if cmp == Ordering::Equal {
                let remaining_key = self.get_remaining_key(key_meta);
                let search_key_postfix = &key[self.prefix_len as usize..];
                cmp = remaining_key.cmp(search_key_postfix);
            }

            match cmp {
                Ordering::Greater => {
                    upper = mid;
                }
                Ordering::Equal => {
                    return mid;
                }
                Ordering::Less => {
                    lower = mid + 1;
                }
            }
        }
        lower
    }
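
    // Hedged sketch (added for illustration): the same lower-bound invariant
    // on a plain sorted array; the result is the first index whose element is
    // >= the probe. Keys within a leaf are unique, so the early return on
    // Equal above is equivalent to this formulation.
    #[allow(dead_code)]
    const _LOWER_BOUND_SKETCH: () = {
        let xs = [10u16, 20, 30, 40];
        let probe = 25u16;
        let (mut lo, mut hi) = (0usize, xs.len());
        while lo < hi {
            let mid = lo + (hi - lo) / 2;
            if xs[mid] < probe { lo = mid + 1 } else { hi = mid }
        }
        assert!(lo == 2); // xs[2] = 30 is the first element >= 25
    };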

    /// Take a deep breath before you read/change this function.
    ///
    /// Leaf node insert differs from inner node insert in that:
    /// 1. Leaf node values are variable length.
    /// 2. Leaf nodes need to handle duplicate keys, in which case the old value is removed and the new value inserted.
    ///
    /// Returns true if the insert is successful, false if out of space.
    pub(crate) fn insert(
        &mut self,
        key: &[u8],
        value: &[u8],
        op_type: OpType,
        max_fence_len: usize,
    ) -> bool {
        debug_assert!(key.len() as u16 >= self.prefix_len);
        match op_type {
            OpType::Insert | OpType::Cache => {
                debug_assert!(!value.is_empty());
            }
            OpType::Delete | OpType::Phantom => {}
        }

        let post_fix_len = key.len() as u16 - self.prefix_len;
        let val_len = value.len() as u16;
        let kv_len = post_fix_len + val_len;

        let value_count_with_fence = self.meta.meta_count_with_fence();

        let pos = self.lower_bound(key) as usize;

        if pos < value_count_with_fence as usize {
            let prefix_len = self.prefix_len as usize;
            let pos_meta = self.get_kv_meta(pos);
            let pos_key = self.get_remaining_key(pos_meta);
            let search_key_postfix = &key[prefix_len..];
            if pos_key.cmp(search_key_postfix) == Ordering::Equal {
                // The key already exists.
                counter!(LeafInsertDuplicate);
                if op_type == OpType::Delete {
                    let pos_meta = self.get_kv_meta_mut(pos);
                    pos_meta.mark_as_deleted();
                    return true;
                }

                let pos_value = self.get_value(pos_meta);
                let pos_value_len = pos_value.len() as u16;

                if pos_value_len >= val_len {
                    // We are lucky: the old value is at least as large as the new value,
                    // so we just overwrite it in place.
                    unsafe {
                        let pair_ptr = self.data.as_ptr().add(pos_meta.offset as usize) as *mut u8;
                        std::ptr::copy_nonoverlapping(
                            value.as_ptr(),
                            pair_ptr.add(post_fix_len as usize),
                            val_len as usize,
                        );
                    }
                    let pos_meta = self.get_kv_meta_mut(pos);
                    pos_meta.set_value_len(val_len);
                    pos_meta.set_op_type(op_type);
                    return true;
                }

                if self.meta.remaining_size < kv_len {
                    return false;
                }
                assert!(op_type != OpType::Cache);
                let offset = self.current_lowest_offset() - kv_len;
                unsafe {
                    let pair_ptr = self.data.as_ptr().add(offset as usize) as *mut u8;

                    let pos_meta = self.get_kv_meta_mut(pos);
                    pos_meta.set_value_len(val_len);
                    pos_meta.set_op_type(op_type);
                    pos_meta.offset = offset;
                    std::ptr::copy_nonoverlapping(
                        key[self.prefix_len as usize..].as_ptr(),
                        pair_ptr,
                        post_fix_len as usize,
                    );
                    std::ptr::copy_nonoverlapping(
                        value.as_ptr(),
                        pair_ptr.add(post_fix_len as usize),
                        val_len as usize,
                    );
                }
                self.meta.remaining_size -= kv_len;
                return true;
            }
        }

        // The key is not already in the node.
        // For an in-memory page, skipping delete records here could lead to an empty page,
        // which becomes un-evictable.
        if op_type == OpType::Delete && self.is_base_page() {
            // We are deleting a key that is not in the page.
            return true;
        }

        // Check if the node has capacity for the new record, with or without fences
        if self.full_with_fences(
            kv_len + std::mem::size_of::<LeafKVMeta>() as u16,
            max_fence_len,
        ) {
            return false;
        }

        counter!(LeafInsertNew);

        let offset = self.current_lowest_offset() - kv_len;
        let new_meta =
            LeafKVMeta::make_prefixed_meta(offset, val_len, key, self.prefix_len, op_type);

        unsafe {
            let metas_size = std::mem::size_of::<LeafKVMeta>()
                * (self.meta.meta_count_with_fence() - pos as u16) as usize;
            let src_ptr = self
                .data
                .as_ptr()
                .add(pos * std::mem::size_of::<LeafKVMeta>());
            let dest_ptr = self
                .data
                .as_mut_ptr()
                .add((pos + 1) * std::mem::size_of::<LeafKVMeta>());
            std::ptr::copy(src_ptr, dest_ptr, metas_size);

            self.write_initial_kv_meta(pos, new_meta);

            let pair_ptr = self.data.as_mut_ptr().add(offset as usize);
            std::ptr::copy_nonoverlapping(
                key[self.prefix_len as usize..].as_ptr(),
                pair_ptr,
                post_fix_len as usize,
            );
            std::ptr::copy_nonoverlapping(
                value.as_ptr(),
                pair_ptr.add(post_fix_len as usize),
                val_len as usize,
            );
        }

        self.meta.remaining_size -= kv_len + std::mem::size_of::<LeafKVMeta>() as u16;
        self.meta.increment_value_count();
        true
    }

    /// Base pages are leaf pages without a next-level pointer in non-cache-only mode.
    pub(crate) fn is_base_page(&self) -> bool {
        (!self.meta.is_cache_only_leaf()) && self.next_level.is_null()
    }

    /// Returns the splitting key after splitting self.
    /// The new high fence key of self (left-side node) and the low fence
    /// of the new base page (right-side node) are both equal to the splitting key.
    pub fn split(&mut self, sibling: &mut LeafNode, cache_only: bool) -> Vec<u8> {
        if cache_only {
            assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
        } else {
            assert!(self.is_base_page() && self.has_fence());
        }

        let current_count = self.meta.meta_count_without_fence();

        // [new_cur_count..] are moved to the sibling node
        // [..new_cur_count - 1] are kept in self
        // where splitting_key is the key at new_cur_count
        // Also, new_cur_count is in [0..(#non_fence_meta - 1)]
        let (splitting_key, new_cur_count) = self.get_split_key(cache_only);
        let sibling_cnt = current_count - new_cur_count;

        // Now we have to do two things:
        // Copy the second half of the key-value pairs to the new node, setting the correct offsets.
        // Consolidate the key-value pairs in the current node, setting the correct offsets.
        if !cache_only {
            let low_fence_for_right = self.get_kv_meta(FENCE_KEY_CNT + new_cur_count as usize);
            assert!(!low_fence_for_right.is_infinite_low_fence_key());
            let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);

            let low_fence_key = self.get_full_key(low_fence_for_right);
            let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
                vec![]
            } else {
                self.get_full_key(high_fence_for_right)
            };

            sibling.initialize(
                &low_fence_key,
                &high_fence_key,
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                true,
                cache_only,
            );
        } else {
            sibling.initialize(
                &[],
                &[],
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                false,
                cache_only,
            );
        }

        let starting_kv_idx = if cache_only { 0 } else { FENCE_KEY_CNT };

        for i in 0..sibling_cnt {
            let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_idx);
            if kv_meta.op_type() == OpType::Delete {
                // Skip deleted records.
                continue;
            }
            let key = self.get_full_key(kv_meta);
            let value = self.get_value(kv_meta);
            let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
            assert!(insert_rt);
        }

        if !cache_only {
            self.meta
                .set_value_count(new_cur_count + FENCE_KEY_CNT as u16);
        } else {
            self.meta.set_value_count(new_cur_count);
        }

        self.consolidate_after_split(&splitting_key);

        splitting_key
    }

    /// Use the passed-in `merge_split_key` as the splitting key
    /// to divide the base page in two. Also install it as the high
    /// fence key of the left-side node and the low fence key of the
    /// right-side node.
    pub fn split_with_key(
        &mut self,
        sibling: &mut LeafNode,
        merge_split_key: &Vec<u8>,
        cache_only: bool,
    ) {
        if cache_only {
            assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
        } else {
            assert!(self.is_base_page() && self.has_fence());
        }

        let current_count = self.meta.meta_count_without_fence();
        // [new_cur_count..] are moved to the sibling node
        // [..new_cur_count - 1] are kept in self
        // where splitting_key is the key at new_cur_count
        // Also, new_cur_count is in [0..#non_fence_meta]
        let mut new_cur_count = self.get_kv_num_below_key(merge_split_key);
        let sibling_cnt = current_count - new_cur_count;

        // Now we have to do two things:
        // Copy the second half of the key-value pairs to the new node, setting the correct offsets.
        // Consolidate the key-value pairs in the current node, setting the correct offsets.
        if !cache_only {
            let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);

            let low_fence_key = merge_split_key.clone();
            let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
                vec![]
            } else {
                self.get_full_key(high_fence_for_right)
            };

            sibling.initialize(
                &low_fence_key,
                &high_fence_key,
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                true,
                cache_only,
            );
        } else {
            sibling.initialize(
                &[],
                &[],
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                false,
                cache_only,
            );
        }

        let starting_kv_index = if cache_only { 0 } else { FENCE_KEY_CNT };

        for i in 0..sibling_cnt {
            let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_index);
            if kv_meta.op_type() == OpType::Delete {
                // Skip deleted records.
                continue;
            }
            let key = self.get_full_key(kv_meta);
            let value = self.get_value(kv_meta);
            let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
            assert!(insert_rt);
        }

        if !cache_only {
            new_cur_count += FENCE_KEY_CNT as u16;
        }

        self.meta.set_value_count(new_cur_count);
        self.consolidate_after_split(merge_split_key);

        if !cache_only {
            // Assert that the high fence of the left-side node and the low fence of the
            // right-side node are both set to the merge split key
            let left_high_fence = self.get_kv_meta(HIGH_FENCE_IDX);
            let left_high_fence_key = self.get_full_key(left_high_fence);
            let right_low_fence_key = sibling.get_low_fence_full_key();

            assert_eq!(left_high_fence_key, *merge_split_key); // The high fence of the left-side node == splitting key
            assert_eq!(right_low_fence_key, *merge_split_key);
        }
    }

    pub(crate) fn consolidate_inner(
        &mut self,
        new_optype: OpType,
        new_high_fence: Option<&[u8]>,
        skip_tombstone: bool,
        cache_only: bool,
        skip_key: Option<&[u8]>,
    ) {
        let mut pairs = Vec::new();

        for meta in self.meta_iter() {
            if skip_tombstone && meta.op_type() == OpType::Delete {
                // Skip tombstone values.
                continue;
            }

            match skip_key {
                // Skip the record with the skip_key.
                Some(s_k) => {
                    let k = self.get_full_key(meta);
                    let cmp = s_k.cmp(&k);

                    if cmp != Ordering::Equal {
                        pairs.push((k, self.get_value(meta).to_owned(), meta.op_type()));
                    }
                }
                None => {
                    pairs.push((
                        self.get_full_key(meta),
                        self.get_value(meta).to_owned(),
                        meta.op_type(),
                    ));
                }
            }
        }

        let has_fence = self.has_fence();
        let (low_fence, high_fence) = if has_fence {
            let high_fence_key = match new_high_fence {
                None => self.get_high_fence_key(),
                Some(key) => key.to_vec(),
            };
            (self.get_low_fence_key(), high_fence_key)
        } else {
            (vec![], vec![])
        };

        let node_size = self.meta.node_size;

        self.initialize(
            &low_fence,
            &high_fence,
            node_size as usize,
            self.next_level,
            has_fence,
            cache_only,
        );

        for (key, value, op_type) in pairs {
            let rt = if op_type == OpType::Delete {
                self.insert(&key, &value, OpType::Delete, 0)
            } else {
                self.insert(&key, &value, new_optype, 0)
            };
            assert!(rt);
        }
    }
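
    // Usage sketch (illustrative): consolidation is "copy the surviving pairs
    // out, re-initialize the node in place, insert them back". For example, the
    // call that `consolidate_after_merge` below makes is:
    //
    //     node.consolidate_inner(OpType::Cache, None, false,
    //                            node.meta.is_cache_only_leaf(), None);
    //
    // i.e. re-insert every record, rewriting Insert records as Cache records.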

    #[allow(dead_code)]
    pub(crate) fn consolidate(&mut self) {
        self.consolidate_inner(
            OpType::Insert,
            None,
            true,
            self.meta.is_cache_only_leaf(),
            None,
        );
    }

    /// For mini pages only.
    /// After consolidation, every tombstone record is removed, and every insert record becomes a cache record.
    #[allow(dead_code)]
    pub(crate) fn consolidate_after_merge(&mut self) {
        assert!(!self.is_base_page());
        self.consolidate_inner(
            OpType::Cache,
            None,
            false,
            self.meta.is_cache_only_leaf(),
            None,
        );
    }

    /// For base pages in non cache-only mode, or mini pages in cache-only mode.
    /// Note that this operation could empty the page, so the caller needs to ensure it won't lead to an empty mini page in memory.
    /// Re-organizes the key-value pairs to remove the holes in the data.
    /// A delete/split operation leaves holes in the data field, which hurts space efficiency.
    /// The easiest (though not the most efficient) way to do this is to copy every key-value pair out, reset the node state, and insert them back.
    fn consolidate_after_split(&mut self, high_fence: &[u8]) {
        assert!(self.meta.is_cache_only_leaf() || self.is_base_page());
        self.consolidate_inner(
            OpType::Insert,
            Some(high_fence),
            true,
            self.meta.is_cache_only_leaf(),
            None,
        );
    }

    /// For mini pages in cache-only mode only.
    /// Note that this operation could empty the page, so the caller needs to ensure it won't lead to an empty mini page in memory.
    /// Same as consolidate(), except that any record with the given key is dropped.
    pub(crate) fn consolidate_skip_key(&mut self, key: &[u8]) {
        assert!(!self.is_base_page());
        assert!(self.meta.is_cache_only_leaf());
        self.consolidate_inner(
            OpType::Insert,
            None,
            true,
            self.meta.is_cache_only_leaf(),
            Some(key),
        );
    }

    /// Copy a mini page to a new memory location.
    pub(crate) fn copy_initialize_to(
        &self,
        dst_node: *mut LeafNode,
        dst_size: usize,
        discard_cold_cache: bool,
    ) {
        assert!(!self.is_base_page());
        assert!(self.meta.node_size as usize <= dst_size);
        let dst_ref = unsafe { &mut *dst_node };
        let empty = vec![];

        dst_ref.initialize(
            &empty,
            &empty,
            dst_size,
            self.next_level,
            false, // Mini page only, thus no fence.
            self.meta.is_cache_only_leaf(),
        );

        for meta in self.meta_iter() {
            let op = meta.op_type();

            // Skip the cold values.
            // This can't happen in cache-only mode, as all records there are dirty.
            // In non cache-only mode, this won't lead to an empty mini page, as
            // this is only invoked after a record turns read-hot or write-hot.
            if discard_cold_cache && op.is_cache() && !meta.is_referenced() {
                continue;
            }

            let value = self.get_value(meta);
            let rt = dst_ref.insert(&self.get_full_key(meta), value, op, 0);
            assert!(rt);
        }
    }

    /// Initialize a LeafNode.
    pub(crate) fn initialize(
        &mut self,
        low_fence: &[u8],
        high_fence: &[u8],
        node_size: usize,
        next_level: MiniPageNextLevel,
        has_fence: bool,
        cache_only: bool,
    ) {
        if !has_fence {
            assert!(low_fence.is_empty());
            assert!(high_fence.is_empty());

            self.meta = NodeMeta::new(
                LeafNode::max_data_size(node_size) as u16,
                false,
                false,
                node_size as u16,
                cache_only,
            );
            self.prefix_len = 0;
            self.next_level = next_level;
        } else {
            self.meta = NodeMeta::new(
                LeafNode::max_data_size(node_size) as u16, // - max_fence_len as u16, // Reserve space for
                false,
                true,
                node_size as u16,
                cache_only,
            );

            self.prefix_len = common_prefix_len(low_fence, high_fence);
            self.next_level = next_level;
            self.install_fence_key(low_fence, false);
            self.install_fence_key(high_fence, true);
        }
    }

    pub(crate) fn set_split_flag(&mut self) {
        self.meta.set_split_flag();
    }

    pub(crate) fn get_split_flag(&self) -> bool {
        self.meta.get_split_flag()
    }

    pub(crate) fn read_by_key(&self, search_key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
        self.read_by_key_inner(search_key, out_buffer, true)
    }

    /// Read by key.
    #[must_use]
    pub(crate) fn read_by_key_inner(
        &self,
        search_key: &[u8],
        out_buffer: &mut [u8],
        binary_search: bool,
    ) -> LeafReadResult {
        let val_count = self.meta.meta_count_with_fence();
        let pos = if binary_search {
            self.lower_bound(search_key)
        } else {
            self.linear_lower_bound(search_key)
        };

        if pos >= val_count {
            counter!(LeafNotFoundDueToRange);
            return LeafReadResult::NotFound;
        }

        let kv_meta = self.get_kv_meta(pos as usize);
        let target_key = self.get_remaining_key(kv_meta);

        // Mark the record at this position as referenced if it is not already.
        if !kv_meta.is_referenced() {
            kv_meta.mark_as_ref();
        }

        let input_post_key = &search_key[self.prefix_len as usize..];
        let cmp = target_key.cmp(input_post_key);

        if cmp != Ordering::Equal {
            counter!(LeafNotFoundDueToKey);
            LeafReadResult::NotFound
        } else {
            if kv_meta.op_type().is_absent() {
                return LeafReadResult::Deleted;
            }
            let val_len = kv_meta.value_len();
            let val_ref = self.get_value(kv_meta);
            debug_assert_eq!(val_len as usize, val_ref.len());
            out_buffer[..val_len as usize].copy_from_slice(val_ref);
            LeafReadResult::Found(val_len as u32)
        }
    }
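
    // Usage sketch (illustrative; `MAX_VALUE_LEN` is a hypothetical caller-side
    // bound, not defined in this crate): the caller supplies a buffer large
    // enough for the value, and on `Found(n)` the first `n` bytes hold it.
    //
    //     let mut buf = vec![0u8; MAX_VALUE_LEN];
    //     match leaf.read_by_key(b"key", &mut buf) {
    //         LeafReadResult::Found(n) => { /* value is &buf[..n as usize] */ }
    //         LeafReadResult::Deleted | LeafReadResult::NotFound => { /* miss */ }
    //         LeafReadResult::InvalidKey => { /* not returned by this path */ }
    //     }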

    /// Pick the smallest mini-page size class that the record fits in without filling it full.
    /// For Bf-Tree with backend storage, the size is strictly less than the leaf page size (full).
    /// For cache-only Bf-Tree, we allow the mini-page size to reach the leaf page size.
    /// Assumes `page_classes` is in ascending order.
    pub(crate) fn get_chain_size_hint(
        key_len: usize,
        value_len: usize,
        page_classes: &[usize],
        cache_only: bool,
    ) -> usize {
        let mut initial_record_size = key_len + value_len + std::mem::size_of::<LeafKVMeta>();
        initial_record_size += std::mem::size_of::<LeafNode>();

        if let Some(s) = page_classes[0..(page_classes.len() - 1)]
            .iter()
            .position(|x| initial_record_size < *x)
        {
            return page_classes[s];
        } else if cache_only && initial_record_size <= page_classes[page_classes.len() - 1] {
            return page_classes[page_classes.len() - 1];
        }

        panic!(
            "Record size {} plus metadata exceeds the max mini-page size {:?}",
            initial_record_size, page_classes
        );
    }
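
    // Worked example (assumed numbers, for illustration only): with
    // page_classes = [64, 256, 1024, 4096] and a record whose key + value +
    // LeafKVMeta + LeafNode header sum to 300 bytes, the scan over all classes
    // except the last picks 1024, the smallest class with 300 < class. The
    // 4096 class is only reachable in cache-only mode; otherwise a record that
    // large panics, since a mini page must stay below the full leaf size.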

    /// A mini page is upgraded to the next size class up where the record fits in without filling it full.
    /// For Bf-Tree with backend storage, the new size must be smaller than the leaf page size (full).
    /// For cache-only Bf-Tree, we allow the mini-page size to reach the leaf page size.
    /// If not possible, returns None.
    /// Assumes `page_classes` is in ascending order.
    pub(crate) fn new_size_if_upgrade(
        &self,
        incoming_size: usize,
        page_classes: &[usize],
        cache_only: bool,
    ) -> Option<usize> {
        let cur_size = self.meta.node_size as usize;
        let request_size = cur_size + incoming_size;

        if let Some(s) = page_classes[0..(page_classes.len() - 1)]
            .iter()
            .position(|x| request_size < *x)
        {
            if s == 0 {
                panic!("Should not be here");
            }
            return Some(page_classes[s]);
        } else if cache_only && request_size <= page_classes[page_classes.len() - 1] {
            return Some(page_classes[page_classes.len() - 1]);
        }

        None
    }
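
    // Worked example (assumed numbers): with page_classes = [64, 256, 1024, 4096],
    // a 256-byte mini page absorbing a 100-byte record requests 356 bytes and
    // upgrades to 1024. The s == 0 panic guards an impossible state: the current
    // page already occupies at least the smallest class, so the sum can never
    // fit strictly below page_classes[0].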

    /// Currently, freeing a node this way is only valid for base pages; mini pages are freed differently.
    pub(crate) fn free_base_page(node: *mut LeafNode) {
        assert!(unsafe { &*node }.is_base_page());
        let node_size = unsafe { &*node }.meta.node_size as usize;
        let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
        unsafe {
            std::alloc::dealloc(node as *mut u8, layout);
        }
    }

    pub(crate) fn need_actually_merge_to_disk(&self) -> bool {
        for meta in self.meta_iter() {
            if meta.op_type().is_dirty() {
                return true;
            }
        }

        false
    }

    fn estimate_merge_size(&self) -> usize {
        let mut required_size = 0;

        for meta in self.meta_iter() {
            if meta.op_type() == OpType::Insert {
                required_size += (meta.get_key_len() + meta.value_len()) as usize
                    + std::mem::size_of::<LeafKVMeta>();
            }
        }

        required_size
    }

    /// Determine whether the leaf node has space for the requested_size, given the maximum fence length.
    /// This is required because fences could be added to a base page during consolidation.
    pub(crate) fn full_with_fences(&self, requested_size: u16, max_fence_len: usize) -> bool {
        if self.meta.remaining_size < requested_size {
            return true;
        }

        if self.meta.has_fence() {
            let mut empty_data_size = self.meta.remaining_size; // Counting fence keys.

            let low_key_meta = self.get_kv_meta(LOW_FENCE_IDX);
            if !low_key_meta.is_infinite_low_fence_key() {
                empty_data_size += low_key_meta.get_key_len();
            }

            let high_key_meta = self.get_kv_meta(HIGH_FENCE_IDX);
            if !high_key_meta.is_infinite_high_fence_key() {
                empty_data_size += high_key_meta.get_key_len();
            }

            if empty_data_size >= requested_size + max_fence_len as u16 {
                return false;
            }

            return true;
        }

        false
    }
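
    // Worked example (assumed numbers): suppose remaining_size = 100 bytes and
    // the page holds finite fence keys of 8 bytes each. Counting the fence
    // space gives empty_data_size = 116, so a request of 90 bytes with
    // max_fence_len = 20 needs 110 <= 116 and the page is not yet full, while
    // a request of 100 bytes (120 > 116) reports full.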

    pub(crate) fn merge_mini_page(&mut self, mini_page: &LeafNode, max_fence_len: usize) -> bool {
        let size_required = mini_page.estimate_merge_size();

        if self.full_with_fences(size_required as u16, max_fence_len) {
            return false;
        }

        for meta in mini_page.meta_iter() {
            let c_key = mini_page.get_full_key(meta);
            let c_type = meta.op_type();
            if !c_type.is_dirty() {
                // This is important: we don't want to merge cache records, not only for performance but also for correctness.
                // A cached record might be inaccessible (and thus invalid).
                // Consider the case where a scan operation merges the mini page, which triggers a split.
                // The scan operation wants to keep the mini page, but its records are all in cache mode.
                continue;
            }
            let c_value = mini_page.get_value(meta);
            let rt = self.insert(&c_key, c_value, c_type, 0);
            assert!(rt);
        }

        true
    }

    pub(crate) fn get_stats(&self) -> LeafStats {
        let mut keys = Vec::new();
        let mut values = Vec::new();
        let mut op_types = Vec::new();
        let node_size = self.meta.node_size as usize;
        let prefix = self.get_prefix().to_owned();

        for meta in self.meta_iter() {
            let key = self.get_full_key(meta);
            let value = self.get_value(meta);
            keys.push(key);
            values.push(value.to_owned());
            op_types.push(meta.op_type());
        }

        LeafStats {
            keys,
            values,
            op_types,
            prefix,
            base_node: None,
            next_level: self.next_level,
            node_size,
        }
    }

    pub(crate) fn has_fence(&self) -> bool {
        self.meta.has_fence()
    }

    /// Returns an iterator that iterates over all key-value pairs, skipping the fence keys.
    pub(crate) fn meta_iter(&self) -> LeafMetaIter<'_> {
        LeafMetaIter::new(self)
    }
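
    // Usage sketch (illustrative): count the live (non-tombstone) records on a
    // page, skipping the fence slots automatically:
    //
    //     let live = node.meta_iter().filter(|m| !m.op_type().is_absent()).count();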

    fn first_meta_pos_after_fence(&self) -> u16 {
        if self.has_fence() {
            FENCE_KEY_CNT as u16
        } else {
            0
        }
    }

    pub(crate) fn get_record_by_pos_with_bound(
        &self,
        pos: u32,
        out_buffer: &mut [u8],
        return_field: ScanReturnField,
        bound_key: &Option<Vec<u8>>,
    ) -> GetScanRecordByPosResult {
        if pos >= self.meta.meta_count_with_fence() as u32 {
            return GetScanRecordByPosResult::EndOfLeaf;
        }

        let meta = self.get_kv_meta(pos as usize);

        if meta.op_type().is_absent() {
            return GetScanRecordByPosResult::Deleted;
        }

        match return_field {
            ScanReturnField::Value => {
                if let Some(bk) = bound_key {
                    let cmp = self.get_full_key(meta).as_slice().cmp(bk);
                    if cmp == Ordering::Greater {
                        return GetScanRecordByPosResult::BoundKeyExceeded;
                    }
                }

                let value = self.get_value(meta);
                let value_len = meta.value_len() as usize;
                out_buffer[..value_len].copy_from_slice(value);
                GetScanRecordByPosResult::Found(0, value_len as u32)
            }
            ScanReturnField::Key => {
                let full_key = self.get_full_key(meta);

                if let Some(bk) = bound_key {
                    let cmp = full_key.as_slice().cmp(bk);
                    if cmp == Ordering::Greater {
                        return GetScanRecordByPosResult::BoundKeyExceeded;
                    }
                }

                let key_len = full_key.len();
                out_buffer[..key_len].copy_from_slice(&full_key);
                GetScanRecordByPosResult::Found(key_len as u32, 0)
            }
            ScanReturnField::KeyAndValue => {
                let full_key = self.get_full_key(meta);

                if let Some(bk) = bound_key {
                    let cmp = full_key.as_slice().cmp(bk);
                    if cmp == Ordering::Greater {
                        return GetScanRecordByPosResult::BoundKeyExceeded;
                    }
                }

                let key_len = full_key.len();
                let value = self.get_value(meta);
                let value_len = meta.value_len() as usize;

                out_buffer[..key_len].copy_from_slice(&full_key);
                out_buffer[key_len..key_len + value_len].copy_from_slice(value);

                GetScanRecordByPosResult::Found(key_len as u32, value_len as u32)
            }
        }
    }
}

pub(crate) struct LeafMetaIter<'a> {
    node: &'a LeafNode,
    cur: usize,
}

impl LeafMetaIter<'_> {
    fn new(leaf: &LeafNode) -> LeafMetaIter<'_> {
        let cur = leaf.first_meta_pos_after_fence();
        LeafMetaIter {
            node: leaf,
            cur: cur as usize,
        }
    }
}

impl<'a> Iterator for LeafMetaIter<'a> {
    type Item = &'a LeafKVMeta;

    fn next(&mut self) -> Option<Self::Item> {
        if self.cur < self.node.meta.meta_count_with_fence() as usize {
            let rt = self.node.get_kv_meta(self.cur);
            self.cur += 1;
            Some(rt)
        } else {
            None
        }
    }
}

pub(crate) enum GetScanRecordByPosResult {
    EndOfLeaf,
    Deleted,
    Found(u32, u32),  // Length of the returned key and value.
    BoundKeyExceeded, // The key at the pos exceeds a given bound key.
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum LeafReadResult {
    Deleted,
    NotFound,
    Found(u32),
    InvalidKey,
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytemuck::cast_slice;
    use rstest::rstest;

    #[test]
    fn test_op_type_enum() {
        assert_eq!(OpType::Insert as u8, 0);
        assert_eq!(OpType::Delete as u8, 1);
        assert_eq!(OpType::Cache as u8, 2);
        assert_eq!(OpType::Phantom as u8, 3);
    }
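
    // Added check (not in the original suite): the OpType predicates partition
    // the four variants as their definitions state: Insert/Delete are dirty,
    // Cache/Phantom are their clean counterparts, and Delete/Phantom mark a
    // key as absent.
    #[test]
    fn test_op_type_predicates() {
        assert!(OpType::Insert.is_dirty());
        assert!(OpType::Delete.is_dirty());
        assert!(!OpType::Cache.is_dirty());
        assert!(!OpType::Phantom.is_dirty());

        assert!(OpType::Delete.is_absent());
        assert!(OpType::Phantom.is_absent());
        assert!(!OpType::Insert.is_absent());
        assert!(!OpType::Cache.is_absent());

        assert!(OpType::Cache.is_cache());
        assert!(OpType::Phantom.is_cache());
        assert!(!OpType::Insert.is_cache());
        assert!(!OpType::Delete.is_cache());
    }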

    #[test]
    fn test_make_prefixed_meta() {
        let key = [1, 2, 3, 4, 5];
        let prefix_len = 2;
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &key, prefix_len, OpType::Insert);

        assert_eq!(meta.offset, 10);
        assert_eq!(meta.get_key_len(), 5);
        assert_eq!(meta.value_len(), 20);
        assert_eq!(meta.preview_bytes, [3, 4]);
        assert_eq!(meta.op_type(), OpType::Insert);
        assert!(!meta.is_referenced());
    }

    #[test]
    fn test_make_infinite_high_fence_key() {
        let meta = LeafKVMeta::make_infinite_high_fence_key();
        assert_eq!(meta.offset, u16::MAX);
        assert_eq!(meta.get_key_len(), 0);
        assert_eq!(meta.value_len(), 0);
        assert!(meta.is_infinite_high_fence_key());
    }

    #[test]
    fn test_make_infinite_low_fence_key() {
        let meta = LeafKVMeta::make_infinite_low_fence_key();
        assert_eq!(meta.offset, u16::MAX - 1);
        assert_eq!(meta.get_key_len(), 0);
        assert_eq!(meta.value_len(), 0);
        assert!(meta.is_infinite_low_fence_key());
    }
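
    // Added check (not in the original suite): common_prefix_len compares the
    // two fences byte by byte and stops at the first mismatch or at the end of
    // the shorter key, which is what the prefix compression above relies on.
    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"abcde", b"abcxy"), 3);
        assert_eq!(common_prefix_len(b"abc", b"xyz"), 0);
        assert_eq!(common_prefix_len(b"abc", b"abcdef"), 3);
        assert_eq!(common_prefix_len(b"", b"abc"), 0);
    }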

    #[test]
    fn test_value_len() {
        let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert_eq!(meta.value_len(), 20);

        meta.set_value_len(30);
        assert_eq!(meta.value_len(), 30);
    }

    #[test]
    fn test_op_type() {
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Delete);
        assert_eq!(meta.op_type(), OpType::Delete);
    }

    #[test]
    fn test_mark_as_ref_and_clear_ref() {
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert!(!meta.is_referenced());

        meta.mark_as_ref();
        assert!(meta.is_referenced());

        meta.clear_ref();
        assert!(!meta.is_referenced());
    }

    #[test]
    fn test_mark_as_deleted_and_is_deleted() {
        let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert!(!meta.is_deleted());

        meta.mark_as_deleted();
        assert!(meta.is_deleted());
    }

    /// This test verifies that the merge split key divides
    /// the combined records of the base page (self) and its
    /// to-be-merged mini page in half.
    #[rstest]
    #[case(vec![1], vec![2], 2)]
    #[case(vec![2], vec![1], 2)]
    fn test_get_merge_split_key(
        #[case] base_page_values: Vec<usize>,
        #[case] mini_page_values: Vec<usize>,
        #[case] splitting_key: usize,
    ) {
        let base = unsafe { &mut *LeafNode::make_base_page(4096) };
        let mini = unsafe { &mut *LeafNode::make_base_page(4096) }; // Using a base page as a substitute.

        // Insert values into the base page and mini page accordingly.
        for n in &base_page_values {
            let n_slice = std::slice::from_ref(n);
            let key = cast_slice::<usize, u8>(n_slice);
            let value = cast_slice::<usize, u8>(n_slice);

            let rt = base.insert(key, value, OpType::Insert, 2);
            assert!(rt);
        }

        for n in &mini_page_values {
            let n_slice = std::slice::from_ref(n);
            let key = cast_slice::<usize, u8>(n_slice);
            let value = cast_slice::<usize, u8>(n_slice);

            let rt = mini.insert(key, value, OpType::Insert, 2);
            assert!(rt);
        }

        // Find the splitting key.
        let merge_split_key_byte = base.get_merge_split_key(mini);
        let merge_splitting_key = cast_slice::<u8, usize>(&merge_split_key_byte);

        assert_eq!(merge_splitting_key[0], splitting_key);

        LeafNode::free_base_page(base);
        LeafNode::free_base_page(mini);
    }

    /// This test verifies that a base page is correctly split
    /// based on a splitting key, with the left-side node (self)
    /// containing keys less than the key and the right-side node (new)
    /// containing keys greater than or equal to the splitting key.
    #[rstest]
    #[case(vec![1, 2, 3, 4], 3)]
    #[case(vec![1], 2)]
    #[case(vec![2], 2)]
    fn test_split_with_key(#[case] base_page_values: Vec<usize>, #[case] splitting_key: usize) {
        let base = unsafe { &mut *LeafNode::make_base_page(4096) };
        let sibling = unsafe { &mut *LeafNode::make_base_page(4096) };

        // Insert values into the base page.
        for n in &base_page_values {
            let n_slice = std::slice::from_ref(n);
            let key = cast_slice::<usize, u8>(n_slice);
            let value = cast_slice::<usize, u8>(n_slice);

            let rt = base.insert(key, value, OpType::Insert, 2);
            assert!(rt);
        }

        let splitting_key_slice = std::slice::from_ref(&splitting_key);
        let splitting_key_byte_array = cast_slice::<usize, u8>(splitting_key_slice).to_vec();

        // Split the base page using the splitting key.
        base.split_with_key(sibling, &splitting_key_byte_array, false);

        // All values less than the splitting key are in the left-side node (self),
        // while those greater than or equal to the key are in the sibling node.
        let mut out_buffer = vec![0u8; 1024];
        for n in &base_page_values {
            let page: &LeafNode = if *n >= splitting_key { sibling } else { base };

            let n_slice = std::slice::from_ref(n);
            let key = cast_slice::<usize, u8>(n_slice);

            let rt = page.read_by_key(key, &mut out_buffer);

            assert_eq!(rt, LeafReadResult::Found(key.len() as u32)); // Key and value were set to the same bytes.
            assert_eq!(&out_buffer[0..key.len()], key);
        }

        LeafNode::free_base_page(base);
        LeafNode::free_base_page(sibling);
    }
}