Skip to main content

bf_tree/nodes/
leaf_node.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4use core::panic;
5use std::{alloc::Layout, cmp::Ordering, sync::atomic};
6
7use crate::{
8    circular_buffer::CircularBufferPtr, counter, error::TreeError, range_scan::ScanReturnField,
9    utils::stats::LeafStats,
10};
11
12use super::{node_meta::NodeMeta, FENCE_KEY_CNT};
13
/// Kind of operation recorded for a key/value entry.
///
/// Invariant: a leaf page can only hold `Insert` and `Delete` records,
///            a mini page can hold every variant.
#[repr(u8)]
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub(crate) enum OpType {
    Insert = 0,
    Delete = 1,
    /// The clean version of insert.
    Cache = 2,
    /// The clean version of delete.
    Phantom = 3,
}

impl OpType {
    /// Returns `true` for `Insert` and `Delete`, `false` for their clean
    /// counterparts `Cache` and `Phantom`.
    pub(crate) fn is_dirty(&self) -> bool {
        match *self {
            Self::Cache | Self::Phantom => false,
            Self::Insert | Self::Delete => true,
        }
    }

    /// Returns `true` for the delete-flavoured variants (`Delete`, `Phantom`).
    fn is_absent(&self) -> bool {
        match *self {
            Self::Delete | Self::Phantom => true,
            Self::Insert | Self::Cache => false,
        }
    }

    /// Returns `true` for the clean variants (`Cache`, `Phantom`).
    fn is_cache(&self) -> bool {
        match *self {
            Self::Insert | Self::Delete => false,
            Self::Cache | Self::Phantom => true,
        }
    }
}
47
/// Number of key bytes (taken right after the node prefix) cached inline in each `LeafKVMeta`.
const PREVIEW_SIZE: usize = 2;

/// Meta slot index of the low fence key.
const LOW_FENCE_IDX: usize = 0;
/// Meta slot index of the high fence key.
const HIGH_FENCE_IDX: usize = 1;

/// Bit position of the op type inside `LeafKVMeta::op_type_key_len_in_byte`.
const OP_TYPE_SHIFT: u16 = 14;
const KEY_LEN_MASK: u16 = 0x3F_FF; // lower 14 bits on the key_len

// highest bit on the value_len;
const REF_BIT_MASK: u16 = 0x80_00;
const VALUE_LEN_MASK: u16 = 0x7F_FF; // lower 15 bits on the value_len;
60
/// Counts the leading bytes that `low_fence` and `high_fence` have in common.
///
/// The scan stops at the first differing byte or at the end of the shorter slice.
pub(crate) fn common_prefix_len(low_fence: &[u8], high_fence: &[u8]) -> u16 {
    let mut shared: u16 = 0;
    for (lo, hi) in low_fence.iter().zip(high_fence) {
        if lo != hi {
            break;
        }
        shared += 1;
    }
    shared
}
72
/// Fixed-size descriptor of one key/value record (or fence key) stored in a leaf node.
///
/// `make_infinite_high_fence_key` asserts that the size of this struct equals
/// `super::KV_META_SIZE`.
#[repr(C)]
pub(crate) struct LeafKVMeta {
    /// Byte offset of the stored key bytes, relative to the start of the node's data area.
    /// `u16::MAX` and `u16::MAX - 1` are reserved for the infinite high / low fence keys.
    offset: u16,
    op_type_key_len_in_byte: u16, // Highest 2 bits: op_type, Lower 14 bits: Key length in bytes.
    ref_value_len_in_byte: std::sync::atomic::AtomicU16, // Highest bit: ref, Lower 15 bits: value length in bytes. here we don't use shuttle's AtomicU16, because this is not a real sync point.
    /// Up to `PREVIEW_SIZE` key bytes that follow the node prefix; unused slots stay zero.
    preview_bytes: [u8; PREVIEW_SIZE],
}
80
81impl LeafKVMeta {
82    pub(crate) fn make_prefixed_meta(
83        offset: u16,
84        value_len: u16,
85        key: &[u8],
86        prefix_len: u16,
87        op_type: OpType,
88    ) -> Self {
89        let mut meta = Self {
90            offset,
91            op_type_key_len_in_byte: key.len() as u16 | ((op_type as u16) << OP_TYPE_SHIFT),
92            ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(value_len),
93            preview_bytes: [0; PREVIEW_SIZE],
94        };
95
96        for i in 0..std::cmp::min(key.len().saturating_sub(prefix_len as usize), PREVIEW_SIZE) {
97            meta.preview_bytes[i] = key[i + prefix_len as usize];
98        }
99
100        // The initial value is not referenced, this is important because during the Garbage reclaim of the delta chain,
101        // we will call Insert to reset the states, which calls this function.
102        meta.clear_ref();
103        meta
104    }
105
106    pub fn make_infinite_high_fence_key() -> Self {
107        assert_eq!(std::mem::size_of::<Self>(), super::KV_META_SIZE);
108
109        Self {
110            offset: u16::MAX,
111            op_type_key_len_in_byte: 0,
112            ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
113            preview_bytes: [0; PREVIEW_SIZE],
114        }
115    }
116
117    pub fn make_infinite_low_fence_key() -> Self {
118        Self {
119            offset: u16::MAX - 1,
120            op_type_key_len_in_byte: 0,
121            ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
122            preview_bytes: [0; PREVIEW_SIZE],
123        }
124    }
125
126    pub fn is_infinite_low_fence_key(&self) -> bool {
127        self.offset == u16::MAX - 1
128    }
129
130    pub fn is_infinite_high_fence_key(&self) -> bool {
131        self.offset == u16::MAX
132    }
133
134    pub fn value_len(&self) -> u16 {
135        self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & VALUE_LEN_MASK
136    }
137
138    pub fn set_value_len(&mut self, value: u16) {
139        let v = self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed);
140        self.ref_value_len_in_byte.store(
141            (v & !VALUE_LEN_MASK) | (value & VALUE_LEN_MASK),
142            atomic::Ordering::Relaxed,
143        );
144    }
145
146    pub fn set_op_type(&mut self, op_type: OpType) {
147        self.op_type_key_len_in_byte = self.get_key_len() | ((op_type as u16) << OP_TYPE_SHIFT);
148    }
149
150    pub fn op_type(&self) -> OpType {
151        let l = self.op_type_key_len_in_byte;
152        let b = l >> OP_TYPE_SHIFT;
153        unsafe { std::mem::transmute(b as u8) }
154    }
155
156    pub fn mark_as_ref(&self) {
157        self.ref_value_len_in_byte
158            .fetch_or(REF_BIT_MASK, atomic::Ordering::Relaxed);
159    }
160
161    pub fn clear_ref(&self) {
162        self.ref_value_len_in_byte
163            .fetch_and(!REF_BIT_MASK, atomic::Ordering::Relaxed);
164    }
165
166    pub fn is_referenced(&self) -> bool {
167        self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & REF_BIT_MASK != 0
168    }
169
170    pub fn mark_as_deleted(&mut self) {
171        self.set_op_type(OpType::Delete);
172    }
173
174    #[allow(dead_code)]
175    pub fn is_deleted(&self) -> bool {
176        self.op_type() == OpType::Delete
177    }
178
179    pub(crate) fn get_offset(&self) -> u16 {
180        self.offset
181    }
182
183    pub(crate) fn get_key_len(&self) -> u16 {
184        self.op_type_key_len_in_byte & KEY_LEN_MASK
185    }
186}
187
/// A `usize` next-level value in which `usize::MAX` is reserved as the null marker.
#[derive(Debug, Clone, Copy)]
pub(crate) struct MiniPageNextLevel {
    val: usize,
}

impl MiniPageNextLevel {
    /// Wraps `val` unchanged; passing `usize::MAX` yields a null value.
    pub(crate) fn new(val: usize) -> Self {
        MiniPageNextLevel { val }
    }

    /// Creates the null value.
    pub(crate) fn new_null() -> Self {
        Self::new(usize::MAX)
    }

    /// Whether this is the null value.
    pub(crate) fn is_null(&self) -> bool {
        matches!(self.val, usize::MAX)
    }

    /// Returns the wrapped value.
    ///
    /// # Panics
    /// Panics if the value is null.
    pub(crate) fn as_offset(&self) -> usize {
        assert!(!self.is_null());
        self.val
    }
}
211
/// Fixed header of a leaf node; the node's payload starts at `data`, right after these fields.
///
/// `LeafKVMeta` entries are indexed from the start of the data area (see `get_kv_meta`), while
/// key/value bytes live at the offsets recorded in each meta. New key bytes are placed just
/// below the current lowest offset (see `install_fence_key`).
#[repr(C)]
pub(crate) struct LeafNode {
    pub(crate) meta: NodeMeta,
    /// Number of leading key bytes treated as the node prefix; `get_prefix` reads that many
    /// bytes from the low fence key.
    prefix_len: u16,
    pub(crate) next_level: MiniPageNextLevel,
    pub(crate) lsn: u64,
    snapshot_version: u64,
    /// Zero-sized marker for the start of the variable-length data area.
    data: [u8; 0],
}

// Compile-time check that the header is exactly 32 bytes.
const _: () = assert!(std::mem::size_of::<LeafNode>() == 32);
223
224impl LeafNode {
225    fn max_data_size(node_size: usize) -> usize {
226        node_size - std::mem::size_of::<LeafNode>()
227    }
228
229    pub(crate) fn initialize_mini_page(
230        ptr: &CircularBufferPtr,
231        node_size: usize,
232        next_level: MiniPageNextLevel,
233        cache_only: bool,
234        snapshot_version: u64,
235    ) {
236        unsafe {
237            Self::init_node_with_fence(
238                ptr.as_ptr(),
239                &[],
240                &[],
241                node_size,
242                next_level,
243                false,
244                cache_only,
245                snapshot_version,
246            )
247        }
248    }
249
250    pub(crate) fn make_base_page(node_size: usize, snapshot_version: u64) -> *mut Self {
251        let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
252        let ptr = unsafe { std::alloc::alloc(layout) };
253        unsafe {
254            Self::init_node_with_fence(
255                ptr,
256                &[],
257                &[],
258                node_size,
259                MiniPageNextLevel::new_null(),
260                true,
261                false,
262                snapshot_version,
263            );
264        }
265        ptr as *mut Self
266    }
267
268    /// After promoting a base page to full page cache, we need to mark every records as cache.
269    pub(crate) fn covert_insert_records_to_cache(&mut self) {
270        let mut cur = self.first_meta_pos_after_fence();
271
272        while cur < self.meta.meta_count_with_fence() {
273            let meta = self.get_kv_meta_mut(cur as usize);
274            let op_type = meta.op_type();
275
276            match op_type {
277                OpType::Insert => {
278                    meta.set_op_type(OpType::Cache);
279                }
280                OpType::Delete => {
281                    meta.set_op_type(OpType::Phantom);
282                }
283                OpType::Cache | OpType::Phantom => {
284                    unreachable!("Base page should not have op type: {:?}", op_type);
285                }
286            };
287
288            cur += 1;
289        }
290    }
291
292    /// When merge full page cache to base page, we need to set clean records to dirty records.
293    pub(crate) fn convert_cache_records_to_insert(&mut self) {
294        let mut cur = self.first_meta_pos_after_fence();
295
296        while cur < self.meta.meta_count_with_fence() {
297            let meta = self.get_kv_meta_mut(cur as usize);
298            let op_type = meta.op_type();
299
300            match op_type {
301                OpType::Cache => {
302                    meta.set_op_type(OpType::Insert);
303                }
304                OpType::Phantom => {
305                    meta.set_op_type(OpType::Delete);
306                }
307                OpType::Insert | OpType::Delete => {
308                    // other types might present.
309                }
310            };
311
312            cur += 1;
313        }
314    }
315
316    #[allow(clippy::too_many_arguments)]
317    unsafe fn init_node_with_fence(
318        ptr: *mut u8,
319        low_fence: &[u8],
320        high_fence: &[u8],
321        node_size: usize,
322        next_level: MiniPageNextLevel,
323        has_fence: bool,
324        cache_only: bool,
325        snapshot_version: u64,
326    ) {
327        let ptr = ptr as *mut Self;
328
329        { &mut *ptr }.initialize(
330            low_fence,
331            high_fence,
332            node_size,
333            next_level,
334            has_fence,
335            cache_only,
336            snapshot_version,
337        );
338    }
339
340    pub(crate) fn get_kv_meta(&self, index: usize) -> &LeafKVMeta {
341        debug_assert!(index < self.meta.meta_count_with_fence() as usize);
342        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
343        unsafe { &*meta_ptr.add(index) }
344    }
345
346    pub(crate) fn get_full_key(&self, meta: &LeafKVMeta) -> Vec<u8> {
347        let prefix = self.get_prefix();
348        let remaining = self.get_remaining_key(meta);
349        [prefix, remaining].concat()
350    }
351
352    /// Get the full key for low fence which is not prefix compressed
353    pub(crate) fn get_low_fence_full_key(&self) -> Vec<u8> {
354        debug_assert!(LOW_FENCE_IDX < self.meta.meta_count_with_fence() as usize);
355        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
356        let meta = unsafe { &*meta_ptr.add(LOW_FENCE_IDX) };
357
358        let key_offset = meta.get_offset();
359        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
360        unsafe {
361            let key = std::slice::from_raw_parts(key_ptr, (meta.get_key_len()) as usize);
362            [key].concat()
363        }
364    }
365
366    pub(crate) fn get_kv_meta_mut(&mut self, index: usize) -> &mut LeafKVMeta {
367        debug_assert!(index < self.meta.meta_count_with_fence() as usize);
368        let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
369        unsafe { meta_ptr.add(index).as_mut().unwrap() }
370    }
371
372    pub(crate) fn write_initial_kv_meta(&mut self, index: usize, meta: LeafKVMeta) {
373        let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
374        unsafe { meta_ptr.add(index).write(meta) };
375    }
376
377    pub(crate) fn get_remaining_key(&self, meta: &LeafKVMeta) -> &[u8] {
378        let key_offset = meta.get_offset();
379        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
380        unsafe {
381            std::slice::from_raw_parts(key_ptr, (meta.get_key_len() - self.prefix_len) as usize)
382        }
383    }
384
385    pub(crate) fn get_prefix(&self) -> &[u8] {
386        let m = self.get_kv_meta(LOW_FENCE_IDX);
387        let key_offset = m.get_offset();
388        unsafe {
389            std::slice::from_raw_parts(
390                self.data.as_ptr().add(key_offset as usize),
391                self.prefix_len as usize,
392            )
393        }
394    }
395
396    pub(crate) fn install_fence_key(&mut self, key: &[u8], is_high_fence: bool) {
397        let loc: u16 = self.meta.meta_count_with_fence();
398        if !is_high_fence {
399            debug_assert!(self.meta.meta_count_with_fence() == LOW_FENCE_IDX as u16);
400        } else {
401            debug_assert!(self.meta.meta_count_with_fence() == HIGH_FENCE_IDX as u16);
402        }
403
404        let cur_low_offset = self.current_lowest_offset();
405
406        self.meta.increment_value_count();
407        self.meta.remaining_size -= std::mem::size_of::<LeafKVMeta>() as u16;
408
409        if key.is_empty() {
410            let fence = if is_high_fence {
411                LeafKVMeta::make_infinite_high_fence_key()
412            } else {
413                LeafKVMeta::make_infinite_low_fence_key()
414            };
415            self.write_initial_kv_meta(loc as usize, fence);
416        } else {
417            let prefix_len = if is_high_fence { self.prefix_len } else { 0 }; // we don't compress low fence.
418
419            let post_fix_len = key.len() as u16 - prefix_len;
420            let val_len = 0u16;
421            let kv_len = post_fix_len + val_len;
422
423            let remaining = self.meta.remaining_size;
424
425            debug_assert!(kv_len <= remaining);
426
427            let offset = cur_low_offset - kv_len;
428
429            let new_meta =
430                LeafKVMeta::make_prefixed_meta(offset, 0, key, prefix_len, OpType::Insert);
431
432            self.write_initial_kv_meta(loc as usize, new_meta);
433
434            unsafe {
435                let start_ptr = self.data.as_mut_ptr().add(offset as usize);
436
437                let key_slice = std::slice::from_raw_parts(
438                    key.as_ptr().add(prefix_len as usize),
439                    post_fix_len as usize,
440                );
441                std::ptr::copy_nonoverlapping(key_slice.as_ptr(), start_ptr, post_fix_len as usize);
442            }
443
444            self.meta.remaining_size -= kv_len;
445        }
446    }
447
448    pub(crate) fn current_lowest_offset(&self) -> u16 {
449        let value_count = self.meta.meta_count_with_fence();
450        let rt =
451            (value_count * std::mem::size_of::<LeafKVMeta>() as u16) + self.meta.remaining_size;
452
453        // Sanity check
454        #[cfg(debug_assertions)]
455        {
456            let mut min_offset = LeafNode::max_data_size(self.meta.node_size as usize) as u16;
457            for i in 0..value_count {
458                let kv_meta = self.get_kv_meta(i as usize);
459                if kv_meta.is_infinite_low_fence_key() || kv_meta.is_infinite_high_fence_key() {
460                    continue;
461                }
462                min_offset = std::cmp::min(min_offset, kv_meta.offset);
463            }
464            assert!(min_offset == rt);
465        }
466        rt
467    }
468
469    pub(crate) fn get_value(&self, meta: &LeafKVMeta) -> &[u8] {
470        let val_offset = meta.get_offset() + meta.get_key_len() - self.prefix_len;
471        let val_ptr = unsafe { self.data.as_ptr().add(val_offset as usize) };
472        unsafe { std::slice::from_raw_parts(val_ptr, meta.value_len() as usize) }
473    }
474
475    /// A good split key has two properties:
476    /// (1). it split the nodes into two roughly equal sizes.
477    /// (2). it should be short, so that parent node search can be faster.
478    ///
479    /// Get the split key is tricky:
480    /// 1. We can't just use the middle key, because the key/value are variable length. We want the key that split the node in size, not in key count.
481    /// 2. We don't try to find small split key here.
482    ///
483    /// Returns the key that split the node into two roughly equal size, and the new node count.
484    /// This is only invoked once for splitting the root node.
485    pub fn get_split_key(&self, cache_only: bool) -> (Vec<u8>, u16) {
486        let mut data_size = 0;
487
488        for meta in self.meta_iter() {
489            let key_len = meta.get_key_len();
490            let value_len = meta.value_len();
491
492            data_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
493        }
494
495        let split_target_size = data_size / 2;
496        let mut cur_size = 0;
497
498        for (i, meta) in self.meta_iter().enumerate() {
499            let key_len = meta.get_key_len();
500            let value_len = meta.value_len();
501
502            cur_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
503
504            // Here we guarantee in cache-only mode, the root node has at least two keys
505            if cache_only && i == 0 {
506                continue;
507            }
508
509            if cur_size >= split_target_size {
510                return (self.get_full_key(meta), i as u16);
511            }
512        }
513        unreachable!();
514    }
515
516    /// Get # of keys strictly smaller than a merge_split_key
517    #[allow(clippy::unused_enumerate_index)]
518    pub fn get_kv_num_below_key(&self, merge_split_key: &Vec<u8>) -> u16 {
519        // Linear search
520        let mut cnt: u16 = 0;
521        for (_, meta) in self.meta_iter().enumerate() {
522            let key = self.get_full_key(meta);
523            let cmp = key.cmp(merge_split_key);
524            // Pick all records from the base page whose key is smaller
525            // than merge_split_key
526            if cmp == std::cmp::Ordering::Less {
527                cnt += 1;
528            } else {
529                break;
530            }
531        }
532        cnt
533    }
534
    /// [Cache-only mode]: Find the splitting key that evenly splits all the records in the
    /// current node and the to-be-inserted record in half. The caller needs to guarantee that
    /// there are at least two records with different keys such that it won't result in an
    /// empty page after split.
    ///
    /// The records are walked in merged order: node records whose key is smaller than `key`,
    /// then the new record, then the remaining node records. Each record contributes
    /// key length + value length + meta size (the new record contributes
    /// `new_record_size` + meta size). The first two records at which the running size reaches
    /// half of the total become split candidates; for each, the imbalance is
    /// `|left - right|`, where `left` is the size of everything before the candidate.
    ///
    /// * `key` - full key of the record about to be inserted.
    /// * `new_record_size` - size of the new record, excluding its meta.
    ///
    /// Returns the first candidate unless a second one exists whose imbalance is not larger,
    /// in which case the second is returned.
    ///
    /// # Panics
    /// Panics if no candidate is found.
    pub fn get_cache_only_insert_split_key(&self, key: &[u8], new_record_size: &u16) -> Vec<u8> {
        // Up to two candidates and their left/right imbalance.
        let mut merge_split_key_1: Option<Vec<u8>> = None;
        let mut merge_split_key_2: Option<Vec<u8>> = None;
        let mut diff_1: i16 = i16::MAX;
        let mut diff_2: i16 = i16::MAX;

        // The total size of all records including the new record to insert
        let mut total_merged_size: u16 = 0;

        for meta in self.meta_iter() {
            let key_len = meta.get_key_len();
            let value_len = meta.value_len();

            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
        }

        total_merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;
        let split_target_size = total_merged_size / 2;

        // Search for the splitting key
        let mut merged_size: u16 = 0;
        let mut self_meta_iter = self.meta_iter();
        let mut self_meta_option = self_meta_iter.next();

        // Go through all records in the base page whose keys are smaller than the new record's key
        if self_meta_option.is_some() {
            let mut cur_base_meta = self_meta_option.unwrap();
            let mut cur_base_key = self.get_full_key(cur_base_meta);
            let mut cmp = cur_base_key.as_slice().cmp(key);

            while cmp == std::cmp::Ordering::Less {
                let key_len = cur_base_meta.get_key_len();
                let value_len = cur_base_meta.value_len();
                merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
                if merged_size >= split_target_size {
                    // Two split key candidates are already found
                    // Stop
                    if merge_split_key_2.is_some() {
                        break;
                    }

                    // `left_side` is the running size before the current record was added.
                    let left_side: u16 = merged_size
                        - key_len
                        - value_len
                        - std::mem::size_of::<LeafKVMeta>() as u16;
                    let right_side = total_merged_size - left_side;

                    if merge_split_key_1.is_none() {
                        merge_split_key_1 = Some(cur_base_key);
                        diff_1 = (left_side as i16 - right_side as i16).abs();
                    } else {
                        merge_split_key_2 = Some(cur_base_key);
                        diff_2 = (left_side as i16 - right_side as i16).abs();
                    }
                }

                self_meta_option = self_meta_iter.next();
                if let Some(meta) = self_meta_option {
                    cur_base_meta = meta;
                    cur_base_key = self.get_full_key(cur_base_meta);
                    cmp = cur_base_key.as_slice().cmp(key);
                } else {
                    break;
                }
            }
        }

        // Count the new key
        if merge_split_key_2.is_none() {
            merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                // Two split key candidates are already found
                // Stop

                let left_side: u16 =
                    merged_size - new_record_size - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(key.to_vec());
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(key.to_vec());
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
        }

        // Go through the rest of records in the base page
        // NOTE(review): a node record whose key equals `key` is not skipped; it ends the first
        // loop and is counted here in addition to the new record — confirm callers expect this.
        while self_meta_option.is_some() {
            let base_meta = self_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                // Return the splitting key
                let cur_base_key = self.get_full_key(base_meta);
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_base_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_base_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
            self_meta_option = self_meta_iter.next();
        }

        // Pick the splitting that achieves the smallest size difference between
        // the two halves
        if merge_split_key_1.is_none() {
            panic!(
                "Fail to find a splitting key for merging mini and base page.{}, {}",
                merged_size, split_target_size
            );
        }

        // On a tie (`diff_1 == diff_2`) the second candidate wins.
        if merge_split_key_2.is_none() || diff_1 < diff_2 {
            return merge_split_key_1.unwrap();
        }

        merge_split_key_2.unwrap()
    }
672
673    /// Find the splitting key that divides the merged records of
674    /// the mini-page and its correponding base page evenly in two
675    /// groups (base pages). Special considerations go to the split-key
676    /// bearing (k,v) pair.
677    /// Caller needs to ensure there are at least two distinct keys among the
678    /// mini page and self.
679    #[allow(clippy::unnecessary_unwrap)]
680    pub(crate) fn get_merge_split_key(&mut self, mini_page: &LeafNode) -> Vec<u8> {
681        let mut merge_split_key_1: Option<Vec<u8>> = None;
682        let mut merge_split_key_2: Option<Vec<u8>> = None;
683        let mut diff_1: i16 = i16::MAX;
684        let mut diff_2: i16 = i16::MAX;
685
686        let mut total_merged_size: u16 = 0;
687        let mut base_meta_iter = self.meta_iter();
688        let mut cur_pos = base_meta_iter.cur;
689        let mut cur_base_meta_option = base_meta_iter.next();
690        let mut duplicate_positions = vec![];
691
692        // Calculate the size of all distinctively merged records through merge-sort
693        for mini_meta in mini_page.meta_iter() {
694            let c_type = mini_meta.op_type();
695            // Skip cached or phantom records as they will not be merged into base pages
696            if !c_type.is_dirty() {
697                continue;
698            }
699
700            let cur_mini_key = mini_page.get_full_key(mini_meta);
701            if cur_base_meta_option.is_some() {
702                let mut cur_base_meta = cur_base_meta_option.unwrap();
703                let mut cur_base_key = self.get_full_key(cur_base_meta);
704                let mut cmp = cur_base_key.cmp(&cur_mini_key);
705
706                // Go through all records from the base page whose key is strictly smaller
707                // than the current record from the mini page
708                while cmp == std::cmp::Ordering::Less {
709                    let key_len = cur_base_meta.get_key_len();
710                    let value_len = cur_base_meta.value_len();
711                    total_merged_size +=
712                        key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
713
714                    cur_pos = base_meta_iter.cur;
715                    cur_base_meta_option = base_meta_iter.next();
716                    if cur_base_meta_option.is_some() {
717                        cur_base_meta = cur_base_meta_option.unwrap();
718                        cur_base_key = self.get_full_key(cur_base_meta);
719                        cmp = cur_base_key.cmp(&cur_mini_key);
720                    } else {
721                        break;
722                    }
723                }
724
725                // If the comparison is equal then advance the base page iterator to avoid counting it
726                // twice
727                if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
728                    // Mark the duplicate entry for deletion
729                    duplicate_positions.push(cur_pos);
730                    cur_pos = base_meta_iter.cur;
731                    cur_base_meta_option = base_meta_iter.next();
732                }
733            }
734
735            let key_len = mini_meta.get_key_len();
736            let value_len = mini_meta.value_len();
737            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
738        }
739
740        // Mini-page records are exhuasted, go through the rest of
741        // records from the base page, if any
742        while cur_base_meta_option.is_some() {
743            let base_meta = cur_base_meta_option.unwrap();
744            let key_len = base_meta.get_key_len();
745            let value_len = base_meta.value_len();
746            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
747
748            cur_base_meta_option = base_meta_iter.next();
749        }
750
751        // The target split size is half of all the merged records
752        let split_target_size = total_merged_size / 2;
753
754        // Merge sort the distinct records from the mini page and the base page
755        // until the size of the sorted records reaches the target split size
756        let mut merged_size: u16 = 0;
757        base_meta_iter = self.meta_iter();
758        cur_base_meta_option = base_meta_iter.next();
759
760        for mini_meta in mini_page.meta_iter() {
761            // Skip cached or phantom records as they will
762            // not take additional space after merging
763            let c_type = mini_meta.op_type();
764            if !c_type.is_dirty() {
765                continue;
766            }
767
768            let cur_mini_key = mini_page.get_full_key(mini_meta);
769            if cur_base_meta_option.is_some() {
770                let mut cur_base_meta = cur_base_meta_option.unwrap();
771                let mut cur_base_key = self.get_full_key(cur_base_meta);
772                let mut cmp = cur_base_key.cmp(&cur_mini_key);
773                // Go through all records from the base page whose key is strictly smaller
774                // than the current record from the mini page
775                while cmp == std::cmp::Ordering::Less {
776                    let key_len = cur_base_meta.get_key_len();
777                    let value_len = cur_base_meta.value_len();
778                    merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
779                    if merged_size >= split_target_size {
780                        // Two split key candidates are already found
781                        // Stop
782                        if merge_split_key_2.is_some() {
783                            break;
784                        }
785
786                        let left_side: u16 = merged_size
787                            - key_len
788                            - value_len
789                            - std::mem::size_of::<LeafKVMeta>() as u16;
790                        let right_side = total_merged_size - left_side;
791
792                        if merge_split_key_1.is_none() {
793                            merge_split_key_1 = Some(cur_base_key);
794                            diff_1 = (left_side as i16 - right_side as i16).abs();
795                        } else {
796                            merge_split_key_2 = Some(cur_base_key);
797                            diff_2 = (left_side as i16 - right_side as i16).abs();
798                        }
799                    }
800
801                    cur_base_meta_option = base_meta_iter.next();
802                    if cur_base_meta_option.is_some() {
803                        cur_base_meta = cur_base_meta_option.unwrap();
804                        cur_base_key = self.get_full_key(cur_base_meta);
805                        cmp = cur_base_key.cmp(&cur_mini_key);
806                    } else {
807                        break;
808                    }
809                }
810
811                // If the comparison is equal then advance the base page iterator to avoid counting it
812                // twice
813                if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
814                    cur_base_meta_option = base_meta_iter.next();
815                }
816            }
817
818            let key_len = mini_meta.get_key_len();
819            let value_len = mini_meta.value_len();
820            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
821
822            if merged_size >= split_target_size {
823                // Two split key candidates are already found
824                // Stop
825                if merge_split_key_2.is_some() {
826                    break;
827                }
828
829                let left_side: u16 =
830                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
831                let right_side = total_merged_size - left_side;
832
833                if merge_split_key_1.is_none() {
834                    merge_split_key_1 = Some(cur_mini_key);
835                    diff_1 = (left_side as i16 - right_side as i16).abs();
836                } else {
837                    merge_split_key_2 = Some(cur_mini_key);
838                    diff_2 = (left_side as i16 - right_side as i16).abs();
839                }
840            }
841        }
842
843        // Mini-page records are exhausted, go through the rest of
844        // records from the base page, if any
845        while cur_base_meta_option.is_some() {
846            let base_meta = cur_base_meta_option.unwrap();
847            let key_len = base_meta.get_key_len();
848            let value_len = base_meta.value_len();
849            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
850
851            if merged_size >= split_target_size {
852                // Return the splitting key
853                let cur_base_key = self.get_full_key(base_meta);
854                if merge_split_key_2.is_some() {
855                    break;
856                }
857
858                let left_side: u16 =
859                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
860                let right_side = total_merged_size - left_side;
861
862                if merge_split_key_1.is_none() {
863                    merge_split_key_1 = Some(cur_base_key);
864                    diff_1 = (left_side as i16 - right_side as i16).abs();
865                } else {
866                    merge_split_key_2 = Some(cur_base_key);
867                    diff_2 = (left_side as i16 - right_side as i16).abs();
868                }
869            }
870            cur_base_meta_option = base_meta_iter.next();
871        }
872
873        // Delete all duplicate entries from the base page
874        for pos in duplicate_positions {
875            let pos_meta = self.get_kv_meta_mut(pos);
876            pos_meta.mark_as_deleted();
877        }
878
879        if merge_split_key_1.is_none() {
880            unreachable!(
881                "Fail to find a splitting key for merging mini and base page.{}, {}",
882                merged_size, split_target_size
883            );
884        }
885
886        // The two split keys must be different
887        if merge_split_key_2.is_some() {
888            let cmp = merge_split_key_1
889                .as_ref()
890                .unwrap()
891                .cmp(merge_split_key_2.as_ref().unwrap());
892            assert_ne!(cmp, std::cmp::Ordering::Equal);
893        }
894
895        let mut splitting_key = if merge_split_key_2.is_none() || diff_1 < diff_2 {
896            merge_split_key_1.as_ref().unwrap()
897        } else {
898            merge_split_key_2.as_ref().unwrap()
899        };
900
901        // The splitting key cannot be the low fence as it leads to invalid page
902        // and duplicate entries in inner node
903        let mut fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
904        if !fence_meta.is_infinite_low_fence_key() {
905            let low_fence_key = self.get_low_fence_key();
906            let cmp = splitting_key.cmp(&low_fence_key);
907            if cmp == std::cmp::Ordering::Equal {
908                splitting_key = merge_split_key_2.as_ref().unwrap();
909            }
910        }
911
912        fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
913        if !fence_meta.is_infinite_high_fence_key() {
914            let high_fence_key = self.get_high_fence_key();
915            let cmp = splitting_key.cmp(&high_fence_key);
916            assert_ne!(cmp, std::cmp::Ordering::Equal);
917        }
918
919        splitting_key.clone()
920    }
921
922    pub fn get_key_to_reach_this_node(&self) -> Vec<u8> {
923        let meta = self.get_kv_meta(self.first_meta_pos_after_fence() as usize);
924        self.get_full_key(meta)
925    }
926
927    /// In cache-only mode, transient state could be observed when one thread is in the middle of
928    /// consolidating the leaf node, while the evicting callback thread is attempting a
929    /// unprotected read. As such, this function guarantees no panic happens during the key read
930    pub fn try_get_key_to_reach_this_node(&self) -> Result<Vec<u8>, TreeError> {
931        assert!(self.meta.is_cache_only_leaf());
932
933        // Get the meta of the first key
934        let index = 0;
935        if index >= self.meta.meta_count_with_fence() as usize {
936            return Err(TreeError::NeedRestart);
937        }
938
939        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
940        let meta = unsafe { &*meta_ptr.add(index) };
941
942        let key_offset = meta.get_offset();
943        let key_len = meta.get_key_len();
944
945        if key_offset + key_len > LeafNode::max_data_size(self.meta.node_size as usize) as u16 {
946            return Err(TreeError::NeedRestart);
947        }
948
949        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
950        let key_slice =
951            unsafe { std::slice::from_raw_parts(key_ptr, (key_len - self.prefix_len) as usize) };
952        Ok(key_slice.to_vec())
953    }
954
955    pub fn get_low_fence_key(&self) -> Vec<u8> {
956        assert!(self.has_fence());
957        let fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
958        if fence_meta.is_infinite_low_fence_key() {
959            vec![]
960        } else {
961            unsafe {
962                let start_ptr = self.data.as_ptr().add(fence_meta.offset as usize);
963                let key_slice =
964                    std::slice::from_raw_parts(start_ptr, fence_meta.get_key_len() as usize);
965                key_slice.to_vec()
966            }
967        }
968    }
969
970    pub fn get_high_fence_key(&self) -> Vec<u8> {
971        assert!(self.has_fence());
972        let fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
973        if fence_meta.is_infinite_high_fence_key() {
974            vec![]
975        } else {
976            self.get_full_key(fence_meta)
977        }
978    }
979
980    /// Compares the input key with the existing key specified by `meta`
981    /// By convention, key_cmp(meta, key) returns the ordering matching the expression meta <operator> key if true.
982    #[inline]
983    pub(crate) fn key_cmp(&self, meta: &LeafKVMeta, key: &[u8]) -> Ordering {
984        let search_key_prefix = &key[(self.prefix_len as usize)..]
985            [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];
986
987        let prefix_key = &meta.preview_bytes[..std::cmp::min(
988            PREVIEW_SIZE,
989            meta.get_key_len() as usize - self.prefix_len as usize,
990        )];
991        let mut cmp = prefix_key.cmp(search_key_prefix);
992
993        // If the prefix matches, compare the full key
994        if cmp == Ordering::Equal {
995            let full_key = self.get_remaining_key(meta);
996            let search_key_postfix = &key[self.prefix_len as usize..];
997            cmp = full_key.cmp(search_key_postfix);
998        }
999        cmp
1000    }
1001
1002    pub(crate) fn linear_lower_bound(&self, key: &[u8]) -> u16 {
1003        debug_assert!(key.len() >= self.prefix_len as usize);
1004
1005        let mut index = self.first_meta_pos_after_fence();
1006
1007        while index < self.meta.meta_count_with_fence() {
1008            let key_meta = self.get_kv_meta(index as usize);
1009
1010            #[cfg(target_arch = "x86_64")]
1011            unsafe {
1012                // For bw-tree-like linear search, we use clflush to simulate pointer chasing.
1013                core::arch::x86_64::_mm_clflush(key_meta as *const LeafKVMeta as *const u8);
1014            }
1015            let cmp = self.key_cmp(key_meta, key);
1016
1017            if cmp != Ordering::Less {
1018                return index;
1019            }
1020
1021            index += 1;
1022        }
1023
1024        index
1025    }
1026
1027    pub(crate) fn lower_bound(&self, key: &[u8]) -> u16 {
1028        let mut lower = self.first_meta_pos_after_fence();
1029        let mut upper = self.meta.meta_count_with_fence();
1030
1031        let search_key_prefix = &key[(self.prefix_len as usize)..]
1032            [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];
1033
1034        debug_assert!(key.len() >= self.prefix_len as usize);
1035
1036        while lower < upper {
1037            let mid = lower + (upper - lower) / 2;
1038            let key_meta = self.get_kv_meta(mid as usize);
1039
1040            let prefix_key = &key_meta.preview_bytes[..std::cmp::min(
1041                PREVIEW_SIZE,
1042                key_meta.get_key_len() as usize - self.prefix_len as usize,
1043            )];
1044            let mut cmp = prefix_key.cmp(search_key_prefix);
1045
1046            // If the prefix matches, compare the full key
1047            if cmp == Ordering::Equal {
1048                let remaining_key = self.get_remaining_key(key_meta);
1049                let search_key_postfix = &key[self.prefix_len as usize..];
1050                cmp = remaining_key.cmp(search_key_postfix);
1051            }
1052
1053            match cmp {
1054                Ordering::Greater => {
1055                    upper = mid;
1056                }
1057                Ordering::Equal => {
1058                    return mid;
1059                }
1060                Ordering::Less => {
1061                    lower = mid + 1;
1062                }
1063            }
1064        }
1065        lower
1066    }
1067
1068    /// Take a deep breath before you read/change this function.
1069    ///
1070    /// Leaf node insert is different from inner node that:
1071    /// 1. Leaf node value is variable length
1072    /// 2. Leaf node need to handle duplicate key, in which case it need to remove the old value and insert the new value.
1073    ///
1074    /// Returns true if the insert is successful, false if out of space.
1075    pub(crate) fn insert(
1076        &mut self,
1077        key: &[u8],
1078        value: &[u8],
1079        op_type: OpType,
1080        max_fence_len: usize,
1081    ) -> bool {
1082        debug_assert!(key.len() as u16 >= self.prefix_len);
1083        match op_type {
1084            OpType::Insert | OpType::Cache => {
1085                debug_assert!(!value.is_empty());
1086            }
1087            OpType::Delete | OpType::Phantom => {}
1088        }
1089
1090        let post_fix_len = key.len() as u16 - self.prefix_len;
1091        let val_len = value.len() as u16;
1092        let kv_len = post_fix_len + val_len;
1093
1094        let value_count_with_fence = self.meta.meta_count_with_fence();
1095
1096        let pos = self.lower_bound(key) as usize;
1097
1098        if pos < value_count_with_fence as usize {
1099            let prefix_len = self.prefix_len as usize;
1100            let pos_meta = self.get_kv_meta(pos);
1101            let pos_key = self.get_remaining_key(pos_meta);
1102            let search_key_postfix = &key[prefix_len..];
1103            if pos_key.cmp(search_key_postfix) == Ordering::Equal {
1104                // The key already exists.
1105                counter!(LeafInsertDuplicate);
1106                if op_type == OpType::Delete {
1107                    let pos_meta = self.get_kv_meta_mut(pos);
1108                    pos_meta.mark_as_deleted();
1109                    return true;
1110                }
1111
1112                let pos_value = self.get_value(pos_meta);
1113                let pos_value_len = pos_value.len() as u16;
1114
1115                if pos_value_len >= val_len {
1116                    // we are lucky, old value is larger than new value. We just overwrite the old value.
1117                    unsafe {
1118                        let pair_ptr = self.data.as_ptr().add(pos_meta.offset as usize) as *mut u8;
1119                        std::ptr::copy_nonoverlapping(
1120                            value.as_ptr(),
1121                            pair_ptr.add(post_fix_len as usize),
1122                            val_len as usize,
1123                        );
1124                    }
1125                    let pos_meta = self.get_kv_meta_mut(pos);
1126                    pos_meta.set_value_len(val_len);
1127                    pos_meta.set_op_type(op_type);
1128                    return true;
1129                }
1130
1131                if self.meta.remaining_size < kv_len {
1132                    return false;
1133                }
1134                assert!(op_type != OpType::Cache);
1135                let offset = self.current_lowest_offset() - kv_len;
1136                unsafe {
1137                    let pair_ptr = self.data.as_ptr().add(offset as usize) as *mut u8;
1138
1139                    let pos_meta = self.get_kv_meta_mut(pos);
1140                    pos_meta.set_value_len(val_len);
1141                    pos_meta.set_op_type(op_type);
1142                    pos_meta.offset = offset;
1143                    std::ptr::copy_nonoverlapping(
1144                        key[self.prefix_len as usize..].as_ptr(),
1145                        pair_ptr,
1146                        post_fix_len as usize,
1147                    );
1148                    std::ptr::copy_nonoverlapping(
1149                        value.as_ptr(),
1150                        pair_ptr.add(post_fix_len as usize),
1151                        val_len as usize,
1152                    );
1153                }
1154                self.meta.remaining_size -= kv_len;
1155                return true;
1156            }
1157        }
1158
1159        // The key is not already in the node.
1160        // For any in-memory page, skipping delete records here could lead to empty page
1161        // which becomes un-evictable.
1162        if op_type == OpType::Delete && (self.is_base_page()) {
1163            // we are deleting a key that is not in the page.
1164            return true;
1165        }
1166
1167        //Check if the node has capacity for the new record with or without fences
1168        if self.full_with_fences(
1169            kv_len + std::mem::size_of::<LeafKVMeta>() as u16,
1170            max_fence_len,
1171        ) {
1172            return false;
1173        }
1174
1175        counter!(LeafInsertNew);
1176
1177        let offset = self.current_lowest_offset() - kv_len;
1178        let new_meta =
1179            LeafKVMeta::make_prefixed_meta(offset, val_len, key, self.prefix_len, op_type);
1180
1181        unsafe {
1182            let metas_size = std::mem::size_of::<LeafKVMeta>()
1183                * (self.meta.meta_count_with_fence() - pos as u16) as usize;
1184            let src_ptr = self
1185                .data
1186                .as_ptr()
1187                .add(pos * std::mem::size_of::<LeafKVMeta>());
1188            let dest_ptr = self
1189                .data
1190                .as_mut_ptr()
1191                .add((pos + 1) * std::mem::size_of::<LeafKVMeta>());
1192            std::ptr::copy(src_ptr, dest_ptr, metas_size);
1193
1194            self.write_initial_kv_meta(pos, new_meta);
1195
1196            let pair_ptr = self.data.as_mut_ptr().add(offset as usize);
1197            std::ptr::copy_nonoverlapping(
1198                key[self.prefix_len as usize..].as_ptr(),
1199                pair_ptr,
1200                post_fix_len as usize,
1201            );
1202            std::ptr::copy_nonoverlapping(
1203                value.as_ptr(),
1204                pair_ptr.add(post_fix_len as usize),
1205                val_len as usize,
1206            );
1207        }
1208
1209        self.meta.remaining_size -= kv_len + std::mem::size_of::<LeafKVMeta>() as u16;
1210        self.meta.increment_value_count();
1211        true
1212    }
1213
1214    /// Base pages are leaf pages without next pointer in non cache-only mode
1215    pub(crate) fn is_base_page(&self) -> bool {
1216        (!self.meta.is_cache_only_leaf()) && self.next_level.is_null()
1217    }
1218
1219    /// Returns the splitting key after splitting self
1220    /// The new high fence key of self (left-side node) and low fence
1221    /// of the new base page (right-side node) equal to the splitting key
1222    pub fn split(
1223        &mut self,
1224        sibling: &mut LeafNode,
1225        cache_only: bool,
1226        snapshot_version: u64,
1227    ) -> Vec<u8> {
1228        if cache_only {
1229            assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
1230        } else {
1231            assert!(self.is_base_page() && self.has_fence());
1232        }
1233
1234        let current_count = self.meta.meta_count_without_fence();
1235
1236        // [new_cur_count..] are moved to the sibling node
1237        // [..new_cur_count - 1] are kept in self
1238        // where splitting_key is the key at new_cur_count
1239        // Also, new_cur_count is in [0..(#non_fence_meta - 1)]
1240        let (splitting_key, new_cur_count) = self.get_split_key(cache_only);
1241        let sibling_cnt = current_count - new_cur_count;
1242
1243        // Now we have to do two things:
1244        // Copy the second half of the key-value pairs to the new node, setting the correct offsets.
1245        // Consolidate the key-value pairs in the current node, setting the correct offsets
1246        if !cache_only {
1247            let low_fence_for_right = self.get_kv_meta(FENCE_KEY_CNT + new_cur_count as usize);
1248            assert!(!low_fence_for_right.is_infinite_low_fence_key());
1249            let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);
1250
1251            let low_fence_key = self.get_full_key(low_fence_for_right);
1252            let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
1253                vec![]
1254            } else {
1255                self.get_full_key(high_fence_for_right)
1256            };
1257
1258            sibling.initialize(
1259                &low_fence_key,
1260                &high_fence_key,
1261                sibling.meta.node_size as usize,
1262                MiniPageNextLevel::new_null(),
1263                true,
1264                cache_only,
1265                snapshot_version,
1266            );
1267        } else {
1268            sibling.initialize(
1269                &[],
1270                &[],
1271                sibling.meta.node_size as usize,
1272                MiniPageNextLevel::new_null(),
1273                false,
1274                cache_only,
1275                snapshot_version,
1276            );
1277        }
1278
1279        let starting_kv_idx = if cache_only { 0 } else { FENCE_KEY_CNT };
1280
1281        for i in 0..sibling_cnt {
1282            let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_idx);
1283            if kv_meta.op_type() == OpType::Delete {
1284                // skip deleted records.
1285                continue;
1286            }
1287            let key = self.get_full_key(kv_meta);
1288            let value = self.get_value(kv_meta);
1289            let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
1290            assert!(insert_rt);
1291        }
1292
1293        if !cache_only {
1294            self.meta
1295                .set_value_count(new_cur_count + FENCE_KEY_CNT as u16);
1296        } else {
1297            self.meta.set_value_count(new_cur_count);
1298        }
1299
1300        self.consolidate_after_split(&splitting_key, snapshot_version);
1301
1302        splitting_key
1303    }
1304
1305    /// Use the passed in `merge_split_key` as the splitting key
1306    /// to divide the base page in two. Also, install it as the high
1307    /// fence key of the left-side node and the low fence key of the
1308    /// right-side node
1309    pub fn split_with_key(
1310        &mut self,
1311        sibling: &mut LeafNode,
1312        merge_split_key: &Vec<u8>,
1313        cache_only: bool,
1314        snapshot_version: u64,
1315    ) {
1316        if cache_only {
1317            assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
1318        } else {
1319            assert!(self.is_base_page() && self.has_fence());
1320        }
1321
1322        let current_count = self.meta.meta_count_without_fence();
1323        // [new_cur_count..] are moved to the sibling node
1324        // [..new_cur_count - 1] are kept in self
1325        // where splitting_key is the key at new_cur_count
1326        // Also, new_cur_count is in [0..#non_fence_meta]
1327        let mut new_cur_count = self.get_kv_num_below_key(merge_split_key);
1328        let sibling_cnt = current_count - new_cur_count;
1329
1330        // Now we have to do two things:
1331        // Copy the second half of the key-value pairs to the new node, setting the correct offsets.
1332        // Consolidate the key-value pairs in the current node, setting the correct offsets.
1333        if !cache_only {
1334            let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);
1335
1336            let low_fence_key = merge_split_key.clone();
1337            let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
1338                vec![]
1339            } else {
1340                self.get_full_key(high_fence_for_right)
1341            };
1342
1343            sibling.initialize(
1344                &low_fence_key,
1345                &high_fence_key,
1346                sibling.meta.node_size as usize,
1347                MiniPageNextLevel::new_null(),
1348                true,
1349                cache_only,
1350                snapshot_version,
1351            );
1352        } else {
1353            sibling.initialize(
1354                &[],
1355                &[],
1356                sibling.meta.node_size as usize,
1357                MiniPageNextLevel::new_null(),
1358                false,
1359                cache_only,
1360                snapshot_version,
1361            );
1362        }
1363
1364        let starting_kv_index = if cache_only { 0 } else { FENCE_KEY_CNT };
1365
1366        for i in 0..sibling_cnt {
1367            let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_index);
1368            if kv_meta.op_type() == OpType::Delete {
1369                // skip deleted records
1370                continue;
1371            }
1372            let key = self.get_full_key(kv_meta);
1373            let value = self.get_value(kv_meta);
1374            let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
1375            assert!(insert_rt);
1376        }
1377
1378        if !cache_only {
1379            new_cur_count += FENCE_KEY_CNT as u16;
1380        }
1381
1382        self.meta.set_value_count(new_cur_count);
1383        self.consolidate_after_split(merge_split_key, snapshot_version);
1384
1385        if !cache_only {
1386            // Assert the high fence of the left side node and the low fence of the right side node
1387            // are both set to the merge split key
1388            let left_high_fence = self.get_kv_meta(HIGH_FENCE_IDX);
1389            let left_high_fence_key = self.get_full_key(left_high_fence);
1390            let right_low_fence_key = sibling.get_low_fence_full_key();
1391
1392            assert_eq!(left_high_fence_key, *merge_split_key); // The high fence of the left side node == splitting key
1393            assert_eq!(right_low_fence_key, *merge_split_key);
1394        }
1395    }
1396
1397    pub(crate) fn consolidate_inner(
1398        &mut self,
1399        new_optype: OpType,
1400        new_high_fence: Option<&[u8]>,
1401        skip_tombstone: bool,
1402        cache_only: bool,
1403        skip_key: Option<&[u8]>,
1404        snapshot_version: u64,
1405    ) {
1406        let mut pairs = Vec::new();
1407
1408        for meta in self.meta_iter() {
1409            if skip_tombstone && meta.op_type() == OpType::Delete {
1410                // Skip tombstone values.
1411                continue;
1412            }
1413
1414            match skip_key {
1415                // Skip the record with the skip_key
1416                Some(s_k) => {
1417                    let k = self.get_full_key(meta);
1418                    let cmp = s_k.cmp(&k);
1419
1420                    if cmp != Ordering::Equal {
1421                        pairs.push((k, self.get_value(meta).to_owned(), meta.op_type()));
1422                    }
1423                }
1424                None => {
1425                    pairs.push((
1426                        self.get_full_key(meta),
1427                        self.get_value(meta).to_owned(),
1428                        meta.op_type(),
1429                    ));
1430                }
1431            }
1432        }
1433
1434        let has_fence = self.has_fence();
1435        let (low_fence, high_fence) = if has_fence {
1436            let high_fence_key = match new_high_fence {
1437                None => self.get_high_fence_key(),
1438                Some(key) => key.to_vec(),
1439            };
1440            (self.get_low_fence_key(), high_fence_key)
1441        } else {
1442            (vec![], vec![])
1443        };
1444
1445        let node_size = self.meta.node_size;
1446
1447        self.initialize(
1448            &low_fence,
1449            &high_fence,
1450            node_size as usize,
1451            self.next_level,
1452            has_fence,
1453            cache_only,
1454            snapshot_version,
1455        );
1456
1457        for (key, value, op_type) in pairs {
1458            let rt = if op_type == OpType::Delete {
1459                self.insert(&key, &value, OpType::Delete, 0)
1460            } else {
1461                self.insert(&key, &value, new_optype, 0)
1462            };
1463            assert!(rt);
1464        }
1465    }
1466
1467    #[allow(dead_code)]
1468    pub(crate) fn consolidate(&mut self, snapshot_version: u64) {
1469        self.consolidate_inner(
1470            OpType::Insert,
1471            None,
1472            true,
1473            self.meta.is_cache_only_leaf(),
1474            None,
1475            snapshot_version,
1476        );
1477    }
1478
1479    /// For mini page only.
1480    /// After consolidation, every tombstone records are removed, every insert records become cache records.
1481    #[allow(dead_code)]
1482    pub(crate) fn consolidate_after_merge(&mut self, snapshot_version: u64) {
1483        assert!(!self.is_base_page());
1484        self.consolidate_inner(
1485            OpType::Cache,
1486            None,
1487            false,
1488            self.meta.is_cache_only_leaf(),
1489            None,
1490            snapshot_version,
1491        );
1492    }
1493
1494    /// For base page in non cache-only mode or mini page in cache-only mode.
1495    /// Note that this operation could empty the page so the caller needs to ensure it won't lead to an empty mini page in memory.
1496    /// Re-organize the key-value pairs to remove the holes in the data.
1497    /// A delete/split operation will leave holes in the data field, which is not good for space efficiency.
1498    /// The easiest (but not most efficient) way to do this is to first populate every key value out, then reset the node state, then insert them back.
1499    fn consolidate_after_split(&mut self, high_fence: &[u8], snapshot_version: u64) {
1500        assert!(self.meta.is_cache_only_leaf() || self.is_base_page());
1501        self.consolidate_inner(
1502            OpType::Insert,
1503            Some(high_fence),
1504            true,
1505            self.meta.is_cache_only_leaf(),
1506            None,
1507            snapshot_version,
1508        );
1509    }
1510
1511    /// For mini page only in cache-only mode only.
1512    /// Note that this operation could empty the page so the caller needs to ensure it won't lead to an empty mini page in memory.
1513    /// Same as consolidate(), except that any record with the given key is dropped
1514    pub(crate) fn consolidate_skip_key(&mut self, key: &[u8], snapshot_version: u64) {
1515        assert!(!self.is_base_page());
1516        assert!(self.meta.is_cache_only_leaf());
1517        self.consolidate_inner(
1518            OpType::Insert,
1519            None,
1520            true,
1521            self.meta.is_cache_only_leaf(),
1522            Some(key),
1523            snapshot_version,
1524        );
1525    }
1526
1527    /// Copy a mini-page to a new memory location
1528    pub(crate) fn copy_initialize_to(
1529        &self,
1530        dst_node: *mut LeafNode,
1531        dst_size: usize,
1532        discard_cold_cache: bool,
1533        snapshot_version: u64,
1534    ) {
1535        assert!(!self.is_base_page());
1536        assert!(self.meta.node_size as usize <= dst_size);
1537        let dst_ref = unsafe { &mut *dst_node };
1538        let empty = vec![];
1539
1540        dst_ref.initialize(
1541            &empty,
1542            &empty,
1543            dst_size,
1544            self.next_level,
1545            false, // Mini-page only, thus no fence
1546            self.meta.is_cache_only_leaf(),
1547            snapshot_version,
1548        );
1549
1550        for meta in self.meta_iter() {
1551            let op = meta.op_type();
1552
1553            // Skip the cold values
1554            // This won't happen in cache-only mode as all records are dirty
1555            // In non cache-only mode, this won't lead to empty mini-page as
1556            // this is invoked after a read-hot or write-hot record.
1557            if discard_cold_cache && op.is_cache() && !meta.is_referenced() {
1558                continue;
1559            }
1560
1561            let value = self.get_value(meta);
1562            let rt = dst_ref.insert(&self.get_full_key(meta), value, op, 0);
1563            assert!(rt);
1564        }
1565    }
1566
1567    /// Initialize a LeafNode
1568    #[allow(clippy::too_many_arguments)]
1569    pub(crate) fn initialize(
1570        &mut self,
1571        low_fence: &[u8],
1572        high_fence: &[u8],
1573        node_size: usize,
1574        next_level: MiniPageNextLevel,
1575        has_fence: bool,
1576        cache_only: bool,
1577        snapshot_version: u64,
1578    ) {
1579        // The initial version of a page is marked as not changed
1580        self.snapshot_version = snapshot_version & Self::LEAF_SNAPSHOT_VERSION_MASK;
1581
1582        if !has_fence {
1583            assert!(low_fence.is_empty());
1584            assert!(high_fence.is_empty());
1585
1586            self.meta = NodeMeta::new(
1587                LeafNode::max_data_size(node_size) as u16,
1588                false,
1589                false,
1590                node_size as u16,
1591                cache_only,
1592            );
1593            self.prefix_len = 0;
1594            self.next_level = next_level;
1595        } else {
1596            self.meta = NodeMeta::new(
1597                LeafNode::max_data_size(node_size) as u16, // - max_fence_len as u16, // Reserve space for
1598                false,
1599                true,
1600                node_size as u16,
1601                cache_only,
1602            );
1603
1604            self.prefix_len = common_prefix_len(low_fence, high_fence);
1605            self.next_level = next_level;
1606            self.install_fence_key(low_fence, false);
1607            self.install_fence_key(high_fence, true);
1608        }
1609    }
1610
1611    pub(crate) fn set_split_flag(&mut self) {
1612        self.meta.set_split_flag();
1613    }
1614
1615    pub(crate) fn get_split_flag(&self) -> bool {
1616        self.meta.get_split_flag()
1617    }
1618
1619    // Highest bit (63) is used as "changed" indicator flag.
1620    // Bits 0-62 hold the actual version value.
1621    const LEAF_SNAPSHOT_VERSION_CHANGED_FLAG: u64 = 0x8000_0000_0000_0000;
1622    const LEAF_SNAPSHOT_VERSION_MASK: u64 = 0x7FFF_FFFF_FFFF_FFFF;
1623
1624    /// Get the snapshot version value without the "is_changed" flag.
1625    pub(crate) fn get_clean_snapshot_version(&self) -> u64 {
1626        self.snapshot_version & Self::LEAF_SNAPSHOT_VERSION_MASK
1627    }
1628
1629    /// Set the snapshot version and mark whether it changed via the highest bit flag.
1630    pub(crate) fn set_snapshot_version(&mut self, snapshot_version: u64, mark_changed: bool) {
1631        let next_version = snapshot_version & Self::LEAF_SNAPSHOT_VERSION_MASK;
1632
1633        if !mark_changed {
1634            self.snapshot_version = next_version;
1635            return;
1636        }
1637
1638        self.snapshot_version = next_version | Self::LEAF_SNAPSHOT_VERSION_CHANGED_FLAG;
1639    }
1640
1641    /// Check whether the snapshot version has changed.
1642    pub(crate) fn is_snapshot_version_changed(&self) -> bool {
1643        (self.snapshot_version & Self::LEAF_SNAPSHOT_VERSION_CHANGED_FLAG) != 0
1644    }
1645
1646    /// Set the "changed" flag without changing the version value.
1647    pub(crate) fn set_snapshot_version_changed_flag(&mut self) {
1648        self.snapshot_version |= Self::LEAF_SNAPSHOT_VERSION_CHANGED_FLAG;
1649    }
1650
1651    /// Clear the "changed" flag without changing the version value.
1652    pub(crate) fn clear_snapshot_version_changed_flag(&mut self) {
1653        self.snapshot_version &= Self::LEAF_SNAPSHOT_VERSION_MASK;
1654    }
1655
1656    pub(crate) fn read_by_key(&self, search_key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
1657        self.read_by_key_inner(search_key, out_buffer, true)
1658    }
1659
1660    /// Read by key.
1661    #[must_use]
1662    pub(crate) fn read_by_key_inner(
1663        &self,
1664        search_key: &[u8],
1665        out_buffer: &mut [u8],
1666        binary_search: bool,
1667    ) -> LeafReadResult {
1668        let val_count = self.meta.meta_count_with_fence();
1669        let pos = if binary_search {
1670            self.lower_bound(search_key)
1671        } else {
1672            self.linear_lower_bound(search_key)
1673        };
1674
1675        if pos >= val_count {
1676            counter!(LeafNotFoundDueToRange);
1677            return LeafReadResult::NotFound;
1678        }
1679
1680        let kv_meta = self.get_kv_meta(pos as usize);
1681        let target_key = self.get_remaining_key(kv_meta);
1682
1683        // If the key is not already referenced, we need to mark it as referenced.
1684        if !kv_meta.is_referenced() {
1685            kv_meta.mark_as_ref();
1686        }
1687
1688        let input_post_key = &search_key[self.prefix_len as usize..];
1689        let cmp = target_key.cmp(input_post_key);
1690
1691        if cmp != Ordering::Equal {
1692            counter!(LeafNotFoundDueToKey);
1693            LeafReadResult::NotFound
1694        } else {
1695            if kv_meta.op_type().is_absent() {
1696                return LeafReadResult::Deleted;
1697            }
1698            let val_len = kv_meta.value_len();
1699            let val_ref = self.get_value(kv_meta);
1700            debug_assert_eq!(val_len as usize, val_ref.len());
1701            out_buffer[..val_len as usize].copy_from_slice(val_ref);
1702            LeafReadResult::Found(val_len as u32)
1703        }
1704    }
1705
1706    /// Pick the smallest mini-page size that the record fits in without filling it full
1707    /// For Bf-Tree with backend storage, the size is strictly less than the leaf page size (full)
1708    /// For cache-only Bf-Tree, we allow the mini-page size to reach leaf page size
1709    /// Assuming page_classes is in ascending order
1710    pub(crate) fn get_chain_size_hint(
1711        key_len: usize,
1712        value_len: usize,
1713        page_classes: &[usize],
1714        cache_only: bool,
1715    ) -> usize {
1716        let mut initial_record_size = key_len + value_len + std::mem::size_of::<LeafKVMeta>();
1717        initial_record_size += std::mem::size_of::<LeafNode>();
1718
1719        if let Some(s) = page_classes[0..(page_classes.len() - 1)]
1720            .iter()
1721            .position(|x| initial_record_size < *x)
1722        {
1723            return page_classes[s];
1724        } else if cache_only && initial_record_size <= page_classes[page_classes.len() - 1] {
1725            return page_classes[page_classes.len() - 1];
1726        }
1727
1728        panic!(
1729            "Record size {} plus metadata exceeds the max mini-page size {:?}",
1730            initial_record_size, page_classes
1731        );
1732    }
1733
1734    /// A mini-page is upgraded to the next size up where the record fits in without filling it full.
1735    /// For Bf-Tree with backend storage, the new size must be smaller than the leaf page size (full)
1736    /// For cache-only Bf-Tree, we allow the mini-page size to reach leaf page size
1737    /// If not possible, return None
1738    /// Assuming page_classes is in ascending order
1739    pub(crate) fn new_size_if_upgrade(
1740        &self,
1741        incoming_size: usize,
1742        page_classes: &[usize],
1743        cache_only: bool,
1744    ) -> Option<usize> {
1745        let cur_size = self.meta.node_size as usize;
1746        let request_size = cur_size + incoming_size;
1747
1748        if let Some(s) = page_classes[0..(page_classes.len() - 1)]
1749            .iter()
1750            .position(|x| request_size < *x)
1751        {
1752            if s == 0 {
1753                panic!("Should not be here");
1754            }
1755            return Some(page_classes[s]);
1756        } else if cache_only && request_size <= page_classes[page_classes.len() - 1] {
1757            return Some(page_classes[page_classes.len() - 1]);
1758        }
1759
1760        None
1761    }
1762
1763    /// Currently free node can only be called with base node. Mini page should be freed differently.
1764    pub(crate) fn free_base_page(node: *mut LeafNode) {
1765        assert!(unsafe { &*node }.is_base_page());
1766        let node_size = unsafe { &*node }.meta.node_size as usize;
1767        let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
1768        unsafe {
1769            std::alloc::dealloc(node as *mut u8, layout);
1770        }
1771    }
1772
1773    pub(crate) fn need_actually_merge_to_disk(&self) -> bool {
1774        for meta in self.meta_iter() {
1775            if meta.op_type().is_dirty() {
1776                return true;
1777            }
1778        }
1779
1780        false
1781    }
1782
1783    fn estimate_merge_size(&self) -> usize {
1784        let mut required_size = 0;
1785
1786        for meta in self.meta_iter() {
1787            if meta.op_type() == OpType::Insert {
1788                required_size += (meta.get_key_len() + meta.value_len()) as usize
1789                    + std::mem::size_of::<LeafKVMeta>();
1790            }
1791        }
1792
1793        required_size
1794    }
1795
1796    /// Determine if the leaf node has space of the requested_size given the full fence length
1797    /// This is required as fences could be added to a base page during consolidation
1798    pub(crate) fn full_with_fences(&self, requested_size: u16, max_fence_len: usize) -> bool {
1799        if self.meta.remaining_size < requested_size {
1800            return true;
1801        }
1802
1803        if self.meta.has_fence() {
1804            let mut empty_data_size = self.meta.remaining_size; // Counting fence keys
1805
1806            let low_key_meta = self.get_kv_meta(LOW_FENCE_IDX);
1807            if !low_key_meta.is_infinite_low_fence_key() {
1808                empty_data_size += low_key_meta.get_key_len();
1809            }
1810
1811            let high_key_meta = self.get_kv_meta(HIGH_FENCE_IDX);
1812            if !high_key_meta.is_infinite_high_fence_key() {
1813                empty_data_size += high_key_meta.get_key_len();
1814            }
1815
1816            if empty_data_size >= requested_size + max_fence_len as u16 {
1817                return false;
1818            }
1819
1820            return true;
1821        }
1822
1823        false
1824    }
1825
1826    pub(crate) fn merge_mini_page(&mut self, mini_page: &LeafNode, max_fence_len: usize) -> bool {
1827        let size_required = mini_page.estimate_merge_size();
1828
1829        if self.full_with_fences(size_required as u16, max_fence_len) {
1830            return false;
1831        }
1832
1833        // Merge the clean snapshot version too.
1834        self.snapshot_version = mini_page.get_clean_snapshot_version();
1835        for meta in mini_page.meta_iter() {
1836            let c_key = mini_page.get_full_key(meta);
1837            let c_type = meta.op_type();
1838            if !c_type.is_dirty() {
1839                // This is important. We don't want to merge cache records, not only for performance but also correctness.
1840                // A cached record might be inaccessible (thus invalid).
1841                // Consider the case where scan operation merged the mini page, which triggers split,
1842                // The scan operation want to keep the mini page (why not), but the records are all in cache mode.
1843                continue;
1844            }
1845            let c_value = mini_page.get_value(meta);
1846            let rt = self.insert(&c_key, c_value, c_type, 0);
1847            assert!(rt);
1848        }
1849
1850        true
1851    }
1852
1853    pub(crate) fn get_stats(&self) -> LeafStats {
1854        let mut keys = Vec::new();
1855        let mut values = Vec::new();
1856        let mut op_types = Vec::new();
1857        let node_size = self.meta.node_size as usize;
1858        let prefix = self.get_prefix().to_owned();
1859
1860        for meta in self.meta_iter() {
1861            let key = self.get_full_key(meta);
1862            let value = self.get_value(meta);
1863            keys.push(key);
1864            values.push(value.to_owned());
1865            op_types.push(meta.op_type());
1866        }
1867
1868        LeafStats {
1869            keys,
1870            values,
1871            op_types,
1872            prefix,
1873            base_node: None,
1874            next_level: self.next_level,
1875            node_size,
1876        }
1877    }
1878
1879    pub(crate) fn has_fence(&self) -> bool {
1880        self.meta.has_fence()
1881    }
1882
1883    /// Returns an iterator that iterates over all key-value pairs, skipping the fence keys.
1884    pub(crate) fn meta_iter(&self) -> LeafMetaIter<'_> {
1885        LeafMetaIter::new(self)
1886    }
1887
1888    fn first_meta_pos_after_fence(&self) -> u16 {
1889        if self.has_fence() {
1890            FENCE_KEY_CNT as u16
1891        } else {
1892            0
1893        }
1894    }
1895
1896    pub(crate) fn get_record_by_pos_with_bound(
1897        &self,
1898        pos: u32,
1899        out_buffer: &mut [u8],
1900        return_field: ScanReturnField,
1901        bound_key: &Option<Vec<u8>>,
1902    ) -> GetScanRecordByPosResult {
1903        if pos >= self.meta.meta_count_with_fence() as u32 {
1904            return GetScanRecordByPosResult::EndOfLeaf;
1905        }
1906
1907        let meta = self.get_kv_meta(pos as usize);
1908
1909        if meta.op_type().is_absent() {
1910            return GetScanRecordByPosResult::Deleted;
1911        }
1912
1913        match return_field {
1914            ScanReturnField::Value => {
1915                if let Some(bk) = bound_key {
1916                    let cmp = self.get_full_key(meta).as_slice().cmp(bk);
1917                    if cmp == Ordering::Greater {
1918                        return GetScanRecordByPosResult::BoundKeyExceeded;
1919                    }
1920                }
1921
1922                let value = self.get_value(meta);
1923                let value_len = meta.value_len() as usize;
1924                out_buffer[..value_len].copy_from_slice(value);
1925                GetScanRecordByPosResult::Found(0, value_len as u32)
1926            }
1927            ScanReturnField::Key => {
1928                let full_key = self.get_full_key(meta);
1929
1930                if let Some(bk) = bound_key {
1931                    let cmp = full_key.as_slice().cmp(bk);
1932                    if cmp == Ordering::Greater {
1933                        return GetScanRecordByPosResult::BoundKeyExceeded;
1934                    }
1935                }
1936
1937                let key_len = full_key.len();
1938                out_buffer[..key_len].copy_from_slice(&full_key);
1939                GetScanRecordByPosResult::Found(key_len as u32, 0)
1940            }
1941            ScanReturnField::KeyAndValue => {
1942                let full_key = self.get_full_key(meta);
1943
1944                if let Some(bk) = bound_key {
1945                    let cmp = full_key.as_slice().cmp(bk);
1946                    if cmp == Ordering::Greater {
1947                        return GetScanRecordByPosResult::BoundKeyExceeded;
1948                    }
1949                }
1950
1951                let key_len = full_key.len();
1952                let value = self.get_value(meta);
1953                let value_len = meta.value_len() as usize;
1954
1955                out_buffer[..key_len].copy_from_slice(&full_key);
1956                out_buffer[key_len..key_len + value_len].copy_from_slice(value);
1957
1958                GetScanRecordByPosResult::Found(key_len as u32, value_len as u32)
1959            }
1960        }
1961    }
1962}
1963
1964pub(crate) struct LeafMetaIter<'a> {
1965    node: &'a LeafNode,
1966    cur: usize,
1967}
1968
1969impl LeafMetaIter<'_> {
1970    fn new(leaf: &LeafNode) -> LeafMetaIter<'_> {
1971        let cur = leaf.first_meta_pos_after_fence();
1972        LeafMetaIter {
1973            node: leaf,
1974            cur: cur as usize,
1975        }
1976    }
1977}
1978
1979impl<'a> Iterator for LeafMetaIter<'a> {
1980    type Item = &'a LeafKVMeta;
1981
1982    fn next(&mut self) -> Option<Self::Item> {
1983        if self.cur < self.node.meta.meta_count_with_fence() as usize {
1984            let rt = self.node.get_kv_meta(self.cur);
1985            self.cur += 1;
1986            Some(rt)
1987        } else {
1988            None
1989        }
1990    }
1991}
1992
/// Outcome of fetching a single record by position during a range scan.
pub(crate) enum GetScanRecordByPosResult {
    /// The requested position is past the last record of the leaf.
    EndOfLeaf,
    /// The record at the position is marked as absent.
    Deleted,
    /// The record was copied out; holds the lengths of the returned key and
    /// value, in that order.
    Found(u32, u32),
    /// The key at the position exceeds the given bound key.
    BoundKeyExceeded,
}
1999
/// Outcome of a point read against a leaf node.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum LeafReadResult {
    /// A record with the key exists but is marked as absent.
    Deleted,
    /// No record with the key exists in the node.
    NotFound,
    /// The value was copied to the output buffer; holds its length in bytes.
    Found(u32),
    /// NOTE(review): presumably signals a key that cannot be looked up; no code
    /// in this part of the file produces it — confirm against the callers.
    InvalidKey,
}
2007
#[cfg(test)]
mod tests {
    use super::*;
    use bytemuck::cast_slice;
    use rstest::rstest;

    /// Views a single `usize` as its native-endian bytes without any `unsafe`.
    fn usize_as_bytes(n: &usize) -> &[u8] {
        cast_slice::<usize, u8>(std::slice::from_ref(n))
    }

    /// The numeric encoding of every `OpType` variant is pinned.
    #[test]
    fn test_op_type_enum() {
        assert_eq!(OpType::Insert as u8, 0);
        assert_eq!(OpType::Delete as u8, 1);
        assert_eq!(OpType::Cache as u8, 2);
        assert_eq!(OpType::Phantom as u8, 3);
    }

    /// A prefixed meta records offset, lengths, op type and the preview bytes
    /// that follow the prefix; it starts out unreferenced.
    #[test]
    fn test_make_prefixed_meta() {
        let key = [1, 2, 3, 4, 5];
        let prefix_len = 2;
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &key, prefix_len, OpType::Insert);

        assert_eq!(meta.offset, 10);
        assert_eq!(meta.get_key_len(), 5);
        assert_eq!(meta.value_len(), 20);
        assert_eq!(meta.preview_bytes, [3, 4]);
        assert_eq!(meta.op_type(), OpType::Insert);
        assert!(!meta.is_referenced());
    }

    /// The infinite high fence is an empty key stored at offset `u16::MAX`.
    #[test]
    fn test_make_infinite_high_fence_key() {
        let meta = LeafKVMeta::make_infinite_high_fence_key();
        assert_eq!(meta.offset, u16::MAX);
        assert_eq!(meta.get_key_len(), 0);
        assert_eq!(meta.value_len(), 0);
        assert!(meta.is_infinite_high_fence_key());
    }

    /// The infinite low fence is an empty key stored at offset `u16::MAX - 1`.
    #[test]
    fn test_make_infinite_low_fence_key() {
        let meta = LeafKVMeta::make_infinite_low_fence_key();
        assert_eq!(meta.offset, u16::MAX - 1);
        assert_eq!(meta.get_key_len(), 0);
        assert_eq!(meta.value_len(), 0);
        assert!(meta.is_infinite_low_fence_key());
    }

    /// `set_value_len` replaces the value length reported by `value_len`.
    #[test]
    fn test_value_len() {
        let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert_eq!(meta.value_len(), 20);

        meta.set_value_len(30);
        assert_eq!(meta.value_len(), 30);
    }

    /// The op type passed at construction is the one reported back.
    #[test]
    fn test_op_type() {
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Delete);
        assert_eq!(meta.op_type(), OpType::Delete);
    }

    /// The reference bit can be set and cleared through a shared reference.
    #[test]
    fn test_mark_as_ref_and_clear_ref() {
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert!(!meta.is_referenced());

        meta.mark_as_ref();
        assert!(meta.is_referenced());

        meta.clear_ref();
        assert!(!meta.is_referenced());
    }

    /// Marking a record as deleted is observable through `is_deleted`.
    #[test]
    fn test_mark_as_deleted_and_is_deleted() {
        let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert!(!meta.is_deleted());

        meta.mark_as_deleted();
        assert!(meta.is_deleted());
    }

    /// This test verifies that the merge split key divides
    /// the combined records of the base page (self) and its
    /// to-be-merged mini-page in half
    #[rstest]
    #[case(vec![1], vec![2], 2)]
    #[case(vec![2], vec![1], 2)]
    fn test_get_merge_split_key(
        #[case] base_page_values: Vec<usize>,
        #[case] mini_page_values: Vec<usize>,
        #[case] splitting_key: usize,
    ) {
        let base = unsafe {
            &mut *LeafNode::make_base_page(4096, crate::snapshot::INVALID_SNAPSHOT_VERSION)
        };
        let mini = unsafe {
            &mut *LeafNode::make_base_page(4096, crate::snapshot::INVALID_SNAPSHOT_VERSION)
        }; // Using base page as substitute

        // Insert values to base page and mini page accordingly; each number
        // serves as both key and value.
        for n in &base_page_values {
            let bytes = usize_as_bytes(n);
            let rt = base.insert(bytes, bytes, OpType::Insert, 2);
            assert!(rt);
        }

        for n in &mini_page_values {
            let bytes = usize_as_bytes(n);
            let rt = mini.insert(bytes, bytes, OpType::Insert, 2);
            assert!(rt);
        }

        // Find the splitting key
        let merge_split_key_byte = base.get_merge_split_key(mini);
        let merge_splitting_key = cast_slice::<u8, usize>(&merge_split_key_byte);

        assert_eq!(merge_splitting_key[0], splitting_key);

        LeafNode::free_base_page(base);
        LeafNode::free_base_page(mini);
    }

    /// This test verifies that a base page is correctly split
    /// up based on a splitting key with the left side node (self)
    /// containing keys less than the key and right side node (new)
    /// containing keys greater than or equal to the splitting key
    #[rstest]
    #[case(vec![1, 2, 3, 4], 3)]
    #[case(vec![1], 2)]
    #[case(vec![2], 2)]
    fn test_split_with_key(#[case] base_page_values: Vec<usize>, #[case] splitting_key: usize) {
        let base = unsafe {
            &mut *LeafNode::make_base_page(4096, crate::snapshot::INVALID_SNAPSHOT_VERSION)
        };
        let sibling = unsafe {
            &mut *LeafNode::make_base_page(4096, crate::snapshot::INVALID_SNAPSHOT_VERSION)
        };

        // Insert values to base page; each number serves as both key and value.
        for n in &base_page_values {
            let bytes = usize_as_bytes(n);
            let rt = base.insert(bytes, bytes, OpType::Insert, 2);
            assert!(rt);
        }

        let splitting_key_bytes = usize_as_bytes(&splitting_key).to_vec();

        // Split the base page using the splitting key
        base.split_with_key(
            sibling,
            &splitting_key_bytes,
            false,
            crate::snapshot::INVALID_SNAPSHOT_VERSION,
        );

        // All values less than splitting key are in the left side node (self)
        // while those greater than or equal to the key are in the sibling node
        let mut out_buffer = vec![0u8; 1024];
        for n in &base_page_values {
            // Reads only need shared access to whichever page owns the key.
            let page: &LeafNode = if *n >= splitting_key {
                &*sibling
            } else {
                &*base
            };

            let key = usize_as_bytes(n);
            let rt = page.read_by_key(key, &mut out_buffer);

            assert_eq!(rt, LeafReadResult::Found(key.len() as u32)); // key and value were set the same
            assert_eq!(&out_buffer[0..key.len()], key);
        }

        LeafNode::free_base_page(base);
        LeafNode::free_base_page(sibling);
    }
}