1use core::panic;
5use std::{alloc::Layout, cmp::Ordering, sync::atomic};
6
7use crate::{
8 circular_buffer::CircularBufferPtr, counter, error::TreeError, utils::stats::LeafStats,
9};
10
11use super::{node_meta::NodeMeta, FENCE_KEY_CNT};
12
/// Kind of operation a leaf record represents.
///
/// Discriminant values are fixed (`repr(u8)`, bits 14..16 of
/// `LeafKVMeta::op_type_key_len_in_byte`) — do not reorder.
#[repr(u8)]
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub(crate) enum OpType {
    Insert = 0,
    Delete = 1,
    Cache = 2,
    Phantom = 3,
}

impl OpType {
    /// Dirty records (Insert/Delete) must eventually be merged down;
    /// Cache/Phantom are shadow copies of state that already exists below.
    pub(crate) fn is_dirty(&self) -> bool {
        matches!(self, OpType::Insert | OpType::Delete)
    }

    /// True when the record denotes an absent key (a deletion or its cached form).
    fn is_absent(&self) -> bool {
        matches!(self, OpType::Delete | OpType::Phantom)
    }

    /// True for the cached (non-dirty) counterparts: Cache mirrors Insert,
    /// Phantom mirrors Delete.
    fn is_cache(&self) -> bool {
        matches!(self, OpType::Cache | OpType::Phantom)
    }
}
46
/// Number of leading post-prefix key bytes cached inline in each `LeafKVMeta`
/// so most comparisons avoid touching the payload area.
const PREVIEW_SIZE: usize = 2;

/// Meta-slot index of the low fence key.
const LOW_FENCE_IDX: usize = 0;
/// Meta-slot index of the high fence key.
const HIGH_FENCE_IDX: usize = 1;

/// Layout of `LeafKVMeta::op_type_key_len_in_byte`:
/// top 2 bits = `OpType`, low 14 bits = full key length.
const OP_TYPE_SHIFT: u16 = 14;
const KEY_LEN_MASK: u16 = 0x3F_FF;
/// Layout of `LeafKVMeta::ref_value_len_in_byte`:
/// top bit = reference flag, low 15 bits = value length.
const REF_BIT_MASK: u16 = 0x80_00;
const VALUE_LEN_MASK: u16 = 0x7F_FF;

/// Length in bytes of the longest common prefix of the two fence keys.
pub(crate) fn common_prefix_len(low_fence: &[u8], high_fence: &[u8]) -> u16 {
    low_fence
        .iter()
        .zip(high_fence.iter())
        .take_while(|(a, b)| a == b)
        .count() as u16
}
71
/// Per-record metadata slot stored at the front of a leaf node's data area.
#[repr(C)]
pub(crate) struct LeafKVMeta {
    // Byte offset of the key/value payload inside `LeafNode::data`.
    // Sentinel values `u16::MAX` / `u16::MAX - 1` mark infinite fences.
    offset: u16,
    // Top 2 bits (`OP_TYPE_SHIFT`): `OpType`; low 14 bits (`KEY_LEN_MASK`):
    // full key length in bytes (prefix included).
    op_type_key_len_in_byte: u16,
    // Top bit (`REF_BIT_MASK`): reference flag; low 15 bits (`VALUE_LEN_MASK`):
    // value length. Atomic so the flag can be toggled through `&self`.
    ref_value_len_in_byte: std::sync::atomic::AtomicU16,
    // First `PREVIEW_SIZE` bytes of the key after the node prefix, letting
    // comparisons usually skip the payload read.
    preview_bytes: [u8; PREVIEW_SIZE],
}
79
80impl LeafKVMeta {
81 pub(crate) fn make_prefixed_meta(
82 offset: u16,
83 value_len: u16,
84 key: &[u8],
85 prefix_len: u16,
86 op_type: OpType,
87 ) -> Self {
88 let mut meta = Self {
89 offset,
90 op_type_key_len_in_byte: key.len() as u16 | ((op_type as u16) << OP_TYPE_SHIFT),
91 ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(value_len),
92 preview_bytes: [0; PREVIEW_SIZE],
93 };
94
95 for i in 0..std::cmp::min(key.len().saturating_sub(prefix_len as usize), PREVIEW_SIZE) {
96 meta.preview_bytes[i] = key[i + prefix_len as usize];
97 }
98
99 meta.clear_ref();
102 meta
103 }
104
105 pub fn make_infinite_high_fence_key() -> Self {
106 assert_eq!(std::mem::size_of::<Self>(), super::KV_META_SIZE);
107
108 Self {
109 offset: u16::MAX,
110 op_type_key_len_in_byte: 0,
111 ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
112 preview_bytes: [0; PREVIEW_SIZE],
113 }
114 }
115
116 pub fn make_infinite_low_fence_key() -> Self {
117 Self {
118 offset: u16::MAX - 1,
119 op_type_key_len_in_byte: 0,
120 ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
121 preview_bytes: [0; PREVIEW_SIZE],
122 }
123 }
124
125 pub fn is_infinite_low_fence_key(&self) -> bool {
126 self.offset == u16::MAX - 1
127 }
128
129 pub fn is_infinite_high_fence_key(&self) -> bool {
130 self.offset == u16::MAX
131 }
132
133 pub fn value_len(&self) -> u16 {
134 self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & VALUE_LEN_MASK
135 }
136
137 pub fn set_value_len(&mut self, value: u16) {
138 let v = self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed);
139 self.ref_value_len_in_byte.store(
140 (v & !VALUE_LEN_MASK) | (value & VALUE_LEN_MASK),
141 atomic::Ordering::Relaxed,
142 );
143 }
144
145 pub fn set_op_type(&mut self, op_type: OpType) {
146 self.op_type_key_len_in_byte = self.get_key_len() | ((op_type as u16) << OP_TYPE_SHIFT);
147 }
148
149 pub fn op_type(&self) -> OpType {
150 let l = self.op_type_key_len_in_byte;
151 let b = l >> OP_TYPE_SHIFT;
152 unsafe { std::mem::transmute(b as u8) }
153 }
154
155 pub fn mark_as_ref(&self) {
156 self.ref_value_len_in_byte
157 .fetch_or(REF_BIT_MASK, atomic::Ordering::Relaxed);
158 }
159
160 pub fn clear_ref(&self) {
161 self.ref_value_len_in_byte
162 .fetch_and(!REF_BIT_MASK, atomic::Ordering::Relaxed);
163 }
164
165 pub fn is_referenced(&self) -> bool {
166 self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & REF_BIT_MASK != 0
167 }
168
169 pub fn mark_as_deleted(&mut self) {
170 self.set_op_type(OpType::Delete);
171 }
172
173 #[allow(dead_code)]
174 pub fn is_deleted(&self) -> bool {
175 self.op_type() == OpType::Delete
176 }
177
178 pub(crate) fn get_offset(&self) -> u16 {
179 self.offset
180 }
181
182 pub(crate) fn get_key_len(&self) -> u16 {
183 self.op_type_key_len_in_byte & KEY_LEN_MASK
184 }
185}
186
/// Link from a mini page to the page one level below, encoded as a buffer
/// offset; `usize::MAX` is the null sentinel (no next level).
#[derive(Debug, Clone, Copy)]
pub(crate) struct MiniPageNextLevel {
    val: usize,
}

impl MiniPageNextLevel {
    /// Wraps a concrete next-level offset.
    pub(crate) fn new(val: usize) -> Self {
        Self { val }
    }

    /// The null link.
    pub(crate) fn new_null() -> Self {
        Self::new(usize::MAX)
    }

    /// True when this link is the null sentinel.
    pub(crate) fn is_null(&self) -> bool {
        self.val == usize::MAX
    }

    /// Unwraps the offset; panics on a null link.
    pub(crate) fn as_offset(&self) -> usize {
        assert!(!self.is_null());
        self.val
    }
}
210
/// Leaf node header. The fixed header is followed by a variable-length data
/// area (`data`): meta slots (`LeafKVMeta`) grow upward from the front while
/// key/value payloads grow downward from the back.
#[repr(C)]
pub(crate) struct LeafNode {
    pub(crate) meta: NodeMeta,
    // Length of the key prefix shared by all records in this node
    // (derived from the fence keys; see `common_prefix_len`).
    prefix_len: u16,
    // Mini pages: link to the page one level below; null for base pages.
    pub(crate) next_level: MiniPageNextLevel,
    pub(crate) lsn: u64,
    // Zero-sized marker for the start of the variable-length data area.
    data: [u8; 0],
}

// The header must stay exactly 24 bytes: offset arithmetic into `data`
// (and `max_data_size`) depends on this layout.
const _: () = assert!(std::mem::size_of::<LeafNode>() == 24);
221
222impl LeafNode {
223 fn max_data_size(node_size: usize) -> usize {
224 node_size - std::mem::size_of::<LeafNode>()
225 }
226
    /// Initializes the buffer slot at `ptr` as a mini page: no fence keys,
    /// linked to `next_level`.
    pub(crate) fn initialize_mini_page(
        ptr: &CircularBufferPtr,
        node_size: usize,
        next_level: MiniPageNextLevel,
        cache_only: bool,
    ) {
        unsafe {
            Self::init_node_with_fence(
                ptr.as_ptr(),
                &[],
                &[],
                node_size,
                next_level,
                false, // mini pages carry no fence keys
                cache_only,
            )
        }
    }
245
246 pub(crate) fn make_base_page(node_size: usize) -> *mut Self {
247 let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
248 let ptr = unsafe { std::alloc::alloc(layout) };
249 unsafe {
250 Self::init_node_with_fence(
251 ptr,
252 &[],
253 &[],
254 node_size,
255 MiniPageNextLevel::new_null(),
256 true,
257 false,
258 );
259 }
260 ptr as *mut Self
261 }
262
263 pub(crate) fn covert_insert_records_to_cache(&mut self) {
265 let mut cur = self.first_meta_pos_after_fence();
266
267 while cur < self.meta.meta_count_with_fence() {
268 let meta = self.get_kv_meta_mut(cur as usize);
269 let op_type = meta.op_type();
270
271 match op_type {
272 OpType::Insert => {
273 meta.set_op_type(OpType::Cache);
274 }
275 OpType::Delete => {
276 meta.set_op_type(OpType::Phantom);
277 }
278 OpType::Cache | OpType::Phantom => {
279 unreachable!("Base page should not have op type: {:?}", op_type);
280 }
281 };
282
283 cur += 1;
284 }
285 }
286
287 pub(crate) fn convert_cache_records_to_insert(&mut self) {
289 let mut cur = self.first_meta_pos_after_fence();
290
291 while cur < self.meta.meta_count_with_fence() {
292 let meta = self.get_kv_meta_mut(cur as usize);
293 let op_type = meta.op_type();
294
295 match op_type {
296 OpType::Cache => {
297 meta.set_op_type(OpType::Insert);
298 }
299 OpType::Phantom => {
300 meta.set_op_type(OpType::Delete);
301 }
302 OpType::Insert | OpType::Delete => {
303 }
305 };
306
307 cur += 1;
308 }
309 }
310
311 unsafe fn init_node_with_fence(
312 ptr: *mut u8,
313 low_fence: &[u8],
314 high_fence: &[u8],
315 node_size: usize,
316 next_level: MiniPageNextLevel,
317 has_fence: bool,
318 cache_only: bool,
319 ) {
320 let ptr = ptr as *mut Self;
321
322 { &mut *ptr }.initialize(
323 low_fence, high_fence, node_size, next_level, has_fence, cache_only,
324 );
325 }
326
    /// Shared reference to the `index`-th meta slot (fence slots included).
    /// The slot array starts at the beginning of the data area.
    pub(crate) fn get_kv_meta(&self, index: usize) -> &LeafKVMeta {
        debug_assert!(index < self.meta.meta_count_with_fence() as usize);
        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
        unsafe { &*meta_ptr.add(index) }
    }
332
333 pub(crate) fn get_full_key(&self, meta: &LeafKVMeta) -> Vec<u8> {
334 let prefix = self.get_prefix();
335 let remaining = self.get_remaining_key(meta);
336 [prefix, remaining].concat()
337 }
338
339 pub(crate) fn get_low_fence_full_key(&self) -> Vec<u8> {
341 debug_assert!(LOW_FENCE_IDX < self.meta.meta_count_with_fence() as usize);
342 let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
343 let meta = unsafe { &*meta_ptr.add(LOW_FENCE_IDX) };
344
345 let key_offset = meta.get_offset();
346 let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
347 unsafe {
348 let key = std::slice::from_raw_parts(key_ptr, (meta.get_key_len()) as usize);
349 [key].concat()
350 }
351 }
352
353 pub(crate) fn get_kv_meta_mut(&mut self, index: usize) -> &mut LeafKVMeta {
354 debug_assert!(index < self.meta.meta_count_with_fence() as usize);
355 let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
356 unsafe { meta_ptr.add(index).as_mut().unwrap() }
357 }
358
    /// Writes `meta` into slot `index` without reading or dropping the slot's
    /// previous contents (the slot may be uninitialized or stale).
    pub(crate) fn write_initial_kv_meta(&mut self, index: usize, meta: LeafKVMeta) {
        let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
        unsafe { meta_ptr.add(index).write(meta) };
    }
363
    /// Borrows the stored key suffix for `meta` — the full key minus the
    /// node-wide prefix — straight out of the payload area.
    pub(crate) fn get_remaining_key(&self, meta: &LeafKVMeta) -> &[u8] {
        let key_offset = meta.get_offset();
        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
        unsafe {
            std::slice::from_raw_parts(key_ptr, (meta.get_key_len() - self.prefix_len) as usize)
        }
    }
371
    /// Borrows the node-wide key prefix. Its bytes are the first
    /// `prefix_len` bytes of the low fence key's payload (the low fence is
    /// stored un-prefixed).
    // NOTE(review): when the low fence is the infinite sentinel its `offset`
    // is `u16::MAX - 1`; this presumably coincides with `prefix_len == 0` so
    // the zero-length slice is never materialized from a wild address — confirm.
    pub(crate) fn get_prefix(&self) -> &[u8] {
        let m = self.get_kv_meta(LOW_FENCE_IDX);
        let key_offset = m.get_offset();
        unsafe {
            std::slice::from_raw_parts(
                self.data.as_ptr().add(key_offset as usize),
                self.prefix_len as usize,
            )
        }
    }
382
    /// Installs a fence key during node initialization. Must be called with
    /// the low fence first (slot 0), then the high fence (slot 1). An empty
    /// `key` installs the corresponding infinite-fence sentinel, which has no
    /// payload.
    pub(crate) fn install_fence_key(&mut self, key: &[u8], is_high_fence: bool) {
        let loc: u16 = self.meta.meta_count_with_fence();
        // Fences must be the first two slots written into a fresh node.
        if !is_high_fence {
            debug_assert!(self.meta.meta_count_with_fence() == LOW_FENCE_IDX as u16);
        } else {
            debug_assert!(self.meta.meta_count_with_fence() == HIGH_FENCE_IDX as u16);
        }

        let cur_low_offset = self.current_lowest_offset();

        // A meta slot is consumed whether or not the fence has a payload.
        self.meta.increment_value_count();
        self.meta.remaining_size -= std::mem::size_of::<LeafKVMeta>() as u16;

        if key.is_empty() {
            let fence = if is_high_fence {
                LeafKVMeta::make_infinite_high_fence_key()
            } else {
                LeafKVMeta::make_infinite_low_fence_key()
            };
            self.write_initial_kv_meta(loc as usize, fence);
        } else {
            // The low fence is stored un-prefixed (it supplies the prefix
            // bytes for the whole node); the high fence is prefix-stripped.
            let prefix_len = if is_high_fence { self.prefix_len } else { 0 };
            let post_fix_len = key.len() as u16 - prefix_len;
            let val_len = 0u16; // fences carry no value
            let kv_len = post_fix_len + val_len;

            let remaining = self.meta.remaining_size;

            debug_assert!(kv_len <= remaining);

            // Payloads grow downward from the back of the data area.
            let offset = cur_low_offset - kv_len;

            let new_meta =
                LeafKVMeta::make_prefixed_meta(offset, 0, key, prefix_len, OpType::Insert);

            self.write_initial_kv_meta(loc as usize, new_meta);

            unsafe {
                let start_ptr = self.data.as_mut_ptr().add(offset as usize);

                let key_slice = std::slice::from_raw_parts(
                    key.as_ptr().add(prefix_len as usize),
                    post_fix_len as usize,
                );
                std::ptr::copy_nonoverlapping(key_slice.as_ptr(), start_ptr, post_fix_len as usize);
            }

            self.meta.remaining_size -= kv_len;
        }
    }
434
    /// Lowest payload offset currently in use — the boundary between the
    /// meta-slot array (growing up from 0) and the payload area (growing down
    /// from the end). Computed from bookkeeping: slots + free space.
    pub(crate) fn current_lowest_offset(&self) -> u16 {
        let value_count = self.meta.meta_count_with_fence();
        let rt =
            (value_count * std::mem::size_of::<LeafKVMeta>() as u16) + self.meta.remaining_size;

        // Debug cross-check: the bookkeeping value must equal the minimum
        // offset actually recorded across all non-sentinel meta slots.
        #[cfg(debug_assertions)]
        {
            let mut min_offset = LeafNode::max_data_size(self.meta.node_size as usize) as u16;
            for i in 0..value_count {
                let kv_meta = self.get_kv_meta(i as usize);
                // Infinite fences use sentinel offsets, not real payloads.
                if kv_meta.is_infinite_low_fence_key() || kv_meta.is_infinite_high_fence_key() {
                    continue;
                }
                min_offset = std::cmp::min(min_offset, kv_meta.offset);
            }
            assert!(min_offset == rt);
        }
        rt
    }
455
    /// Borrows the value bytes for `meta`. The value is stored immediately
    /// after the key suffix, so it starts at offset + (key_len - prefix_len).
    pub(crate) fn get_value(&self, meta: &LeafKVMeta) -> &[u8] {
        let val_offset = meta.get_offset() + meta.get_key_len() - self.prefix_len;
        let val_ptr = unsafe { self.data.as_ptr().add(val_offset as usize) };
        unsafe { std::slice::from_raw_parts(val_ptr, meta.value_len() as usize) }
    }
461
    /// Picks the record at which this node should split: the first record
    /// where the cumulative record footprint (key + value + meta slot)
    /// reaches half of the node's total. Returns that record's full key and
    /// its index among the non-fence records.
    pub fn get_split_key(&self, cache_only: bool) -> (Vec<u8>, u16) {
        // First pass: total footprint of all records.
        let mut data_size = 0;

        for meta in self.meta_iter() {
            let key_len = meta.get_key_len();
            let value_len = meta.value_len();

            data_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
        }

        let split_target_size = data_size / 2;
        let mut cur_size = 0;

        // Second pass: find the first record crossing the halfway point.
        for (i, meta) in self.meta_iter().enumerate() {
            let key_len = meta.get_key_len();
            let value_len = meta.value_len();

            cur_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            // Never split a cache-only node before its first record, so the
            // left node keeps at least the key that addresses it
            // (see `get_key_to_reach_this_node`).
            if cache_only && i == 0 {
                continue;
            }

            if cur_size >= split_target_size {
                return (self.get_full_key(meta), i as u16);
            }
        }
        unreachable!();
    }
502
503 #[allow(clippy::unused_enumerate_index)]
505 pub fn get_kv_num_below_key(&self, merge_split_key: &Vec<u8>) -> u16 {
506 let mut cnt: u16 = 0;
508 for (_, meta) in self.meta_iter().enumerate() {
509 let key = self.get_full_key(meta);
510 let cmp = key.cmp(merge_split_key);
511 if cmp == std::cmp::Ordering::Less {
514 cnt += 1;
515 } else {
516 break;
517 }
518 }
519 cnt
520 }
521
    /// Chooses a splitting key for a cache-only node that is about to absorb
    /// a new record for `key` with payload footprint `new_record_size`.
    ///
    /// The merged, sorted sequence (existing records plus the new one) is
    /// simulated: the first two keys at which the running footprint crosses
    /// half of the merged total become candidates, and the one yielding the
    /// smaller left/right size imbalance wins.
    pub fn get_cache_only_insert_split_key(&self, key: &[u8], new_record_size: &u16) -> Vec<u8> {
        // Up to two split candidates and their |left - right| imbalance.
        let mut merge_split_key_1: Option<Vec<u8>> = None;
        let mut merge_split_key_2: Option<Vec<u8>> = None;
        let mut diff_1: i16 = i16::MAX;
        let mut diff_2: i16 = i16::MAX;

        // Total footprint of all records plus the incoming one.
        let mut total_merged_size: u16 = 0;

        for meta in self.meta_iter() {
            let key_len = meta.get_key_len();
            let value_len = meta.value_len();

            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
        }

        total_merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;
        let split_target_size = total_merged_size / 2;

        // Walk the existing records that sort below `key`.
        let mut merged_size: u16 = 0;
        let mut self_meta_iter = self.meta_iter();
        let mut self_meta_option = self_meta_iter.next();

        if self_meta_option.is_some() {
            let mut cur_base_meta = self_meta_option.unwrap();
            let mut cur_base_key = self.get_full_key(cur_base_meta);
            let mut cmp = cur_base_key.as_slice().cmp(key);

            while cmp == std::cmp::Ordering::Less {
                let key_len = cur_base_meta.get_key_len();
                let value_len = cur_base_meta.value_len();
                merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
                if merged_size >= split_target_size {
                    // Only the first two crossing keys are kept as candidates.
                    if merge_split_key_2.is_some() {
                        break;
                    }

                    // Left-node footprint if the split lands before this record
                    // (the record carrying the split key goes to the right).
                    let left_side: u16 = merged_size
                        - key_len
                        - value_len
                        - std::mem::size_of::<LeafKVMeta>() as u16;
                    let right_side = total_merged_size - left_side;

                    if merge_split_key_1.is_none() {
                        merge_split_key_1 = Some(cur_base_key);
                        diff_1 = (left_side as i16 - right_side as i16).abs();
                    } else {
                        merge_split_key_2 = Some(cur_base_key);
                        diff_2 = (left_side as i16 - right_side as i16).abs();
                    }
                }

                self_meta_option = self_meta_iter.next();
                if let Some(meta) = self_meta_option {
                    cur_base_meta = meta;
                    cur_base_key = self.get_full_key(cur_base_meta);
                    cmp = cur_base_key.as_slice().cmp(key);
                } else {
                    break;
                }
            }
        }

        // Account for the incoming record at its sorted position.
        if merge_split_key_2.is_none() {
            merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                let left_side: u16 =
                    merged_size - new_record_size - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(key.to_vec());
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(key.to_vec());
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
        }

        // Continue with the records that sort above `key`.
        while self_meta_option.is_some() {
            let base_meta = self_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                let cur_base_key = self.get_full_key(base_meta);
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_base_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_base_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
            self_meta_option = self_meta_iter.next();
        }

        if merge_split_key_1.is_none() {
            panic!(
                "Fail to find a splitting key for merging mini and base page.{}, {}",
                merged_size, split_target_size
            );
        }

        // Prefer the candidate with the smaller size imbalance.
        if merge_split_key_2.is_none() || diff_1 < diff_2 {
            return merge_split_key_1.unwrap();
        }

        merge_split_key_2.unwrap()
    }
659
    /// Chooses the key at which to split when merging `mini_page`'s dirty
    /// records into this base page.
    ///
    /// Pass 1 computes the merged total footprint; a base record whose key
    /// equals a dirty mini record is shadowed: excluded from the total and
    /// its position remembered for tombstoning. Pass 2 replays the merge
    /// order and records the first two keys where the running footprint
    /// crosses half of the total; the candidate with the smaller left/right
    /// imbalance wins. Finally the winner is validated against the fences.
    #[allow(clippy::unnecessary_unwrap)]
    pub(crate) fn get_merge_split_key(&mut self, mini_page: &LeafNode) -> Vec<u8> {
        let mut merge_split_key_1: Option<Vec<u8>> = None;
        let mut merge_split_key_2: Option<Vec<u8>> = None;
        let mut diff_1: i16 = i16::MAX;
        let mut diff_2: i16 = i16::MAX;

        // ---- Pass 1: total merged footprint + shadowed base positions ----
        let mut total_merged_size: u16 = 0;
        let mut base_meta_iter = self.meta_iter();
        let mut cur_pos = base_meta_iter.cur;
        let mut cur_base_meta_option = base_meta_iter.next();
        let mut duplicate_positions = vec![];

        for mini_meta in mini_page.meta_iter() {
            let c_type = mini_meta.op_type();
            // Only dirty (Insert/Delete) mini records participate in the merge.
            if !c_type.is_dirty() {
                continue;
            }

            let cur_mini_key = mini_page.get_full_key(mini_meta);
            if cur_base_meta_option.is_some() {
                let mut cur_base_meta = cur_base_meta_option.unwrap();
                let mut cur_base_key = self.get_full_key(cur_base_meta);
                let mut cmp = cur_base_key.cmp(&cur_mini_key);

                // Consume base records that sort below the current mini key.
                while cmp == std::cmp::Ordering::Less {
                    let key_len = cur_base_meta.get_key_len();
                    let value_len = cur_base_meta.value_len();
                    total_merged_size +=
                        key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

                    cur_pos = base_meta_iter.cur;
                    cur_base_meta_option = base_meta_iter.next();
                    if cur_base_meta_option.is_some() {
                        cur_base_meta = cur_base_meta_option.unwrap();
                        cur_base_key = self.get_full_key(cur_base_meta);
                        cmp = cur_base_key.cmp(&cur_mini_key);
                    } else {
                        break;
                    }
                }

                // Equal keys: the mini record shadows the base record — skip
                // it (not counted in the total) and remember its slot.
                if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
                    duplicate_positions.push(cur_pos);
                    cur_pos = base_meta_iter.cur;
                    cur_base_meta_option = base_meta_iter.next();
                }
            }

            let key_len = mini_meta.get_key_len();
            let value_len = mini_meta.value_len();
            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
        }

        // Remaining base records above the last mini key.
        while cur_base_meta_option.is_some() {
            let base_meta = cur_base_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            cur_base_meta_option = base_meta_iter.next();
        }

        let split_target_size = total_merged_size / 2;

        // ---- Pass 2: replay the merge order, collecting split candidates ----
        let mut merged_size: u16 = 0;
        base_meta_iter = self.meta_iter();
        cur_base_meta_option = base_meta_iter.next();

        for mini_meta in mini_page.meta_iter() {
            let c_type = mini_meta.op_type();
            if !c_type.is_dirty() {
                continue;
            }

            let cur_mini_key = mini_page.get_full_key(mini_meta);
            if cur_base_meta_option.is_some() {
                let mut cur_base_meta = cur_base_meta_option.unwrap();
                let mut cur_base_key = self.get_full_key(cur_base_meta);
                let mut cmp = cur_base_key.cmp(&cur_mini_key);
                while cmp == std::cmp::Ordering::Less {
                    let key_len = cur_base_meta.get_key_len();
                    let value_len = cur_base_meta.value_len();
                    merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
                    if merged_size >= split_target_size {
                        // Keep at most two candidates.
                        if merge_split_key_2.is_some() {
                            break;
                        }

                        // Left-node footprint if split before this record.
                        let left_side: u16 = merged_size
                            - key_len
                            - value_len
                            - std::mem::size_of::<LeafKVMeta>() as u16;
                        let right_side = total_merged_size - left_side;

                        if merge_split_key_1.is_none() {
                            merge_split_key_1 = Some(cur_base_key);
                            diff_1 = (left_side as i16 - right_side as i16).abs();
                        } else {
                            merge_split_key_2 = Some(cur_base_key);
                            diff_2 = (left_side as i16 - right_side as i16).abs();
                        }
                    }

                    cur_base_meta_option = base_meta_iter.next();
                    if cur_base_meta_option.is_some() {
                        cur_base_meta = cur_base_meta_option.unwrap();
                        cur_base_key = self.get_full_key(cur_base_meta);
                        cmp = cur_base_key.cmp(&cur_mini_key);
                    } else {
                        break;
                    }
                }

                // Skip the shadowed base duplicate (already excluded in pass 1).
                if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
                    cur_base_meta_option = base_meta_iter.next();
                }
            }

            let key_len = mini_meta.get_key_len();
            let value_len = mini_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_mini_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_mini_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
        }

        // Tail of base records after the mini records are exhausted.
        while cur_base_meta_option.is_some() {
            let base_meta = cur_base_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                let cur_base_key = self.get_full_key(base_meta);
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_base_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_base_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
            cur_base_meta_option = base_meta_iter.next();
        }

        // Tombstone the base records shadowed by mini records.
        for pos in duplicate_positions {
            let pos_meta = self.get_kv_meta_mut(pos);
            pos_meta.mark_as_deleted();
        }

        if merge_split_key_1.is_none() {
            unreachable!(
                "Fail to find a splitting key for merging mini and base page.{}, {}",
                merged_size, split_target_size
            );
        }

        if merge_split_key_2.is_some() {
            let cmp = merge_split_key_1
                .as_ref()
                .unwrap()
                .cmp(merge_split_key_2.as_ref().unwrap());
            assert_ne!(cmp, std::cmp::Ordering::Equal);
        }

        // Pick the candidate with the smaller imbalance.
        let mut splitting_key = if merge_split_key_2.is_none() || diff_1 < diff_2 {
            merge_split_key_1.as_ref().unwrap()
        } else {
            merge_split_key_2.as_ref().unwrap()
        };

        // The split key must not equal the low fence; fall back to the second
        // candidate if it does.
        // NOTE(review): this unwrap panics when the low fence collides and no
        // second candidate exists — confirm that case is impossible upstream.
        let mut fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
        if !fence_meta.is_infinite_low_fence_key() {
            let low_fence_key = self.get_low_fence_key();
            let cmp = splitting_key.cmp(&low_fence_key);
            if cmp == std::cmp::Ordering::Equal {
                splitting_key = merge_split_key_2.as_ref().unwrap();
            }
        }

        // A collision with the high fence would produce an empty right node.
        fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
        if !fence_meta.is_infinite_high_fence_key() {
            let high_fence_key = self.get_high_fence_key();
            let cmp = splitting_key.cmp(&high_fence_key);
            assert_ne!(cmp, std::cmp::Ordering::Equal);
        }

        splitting_key.clone()
    }
908
    /// Full key of the first real (non-fence) record — the key by which this
    /// node is addressed.
    pub fn get_key_to_reach_this_node(&self) -> Vec<u8> {
        let meta = self.get_kv_meta(self.first_meta_pos_after_fence() as usize);
        self.get_full_key(meta)
    }
913
    /// Best-effort variant of `get_key_to_reach_this_node` for cache-only
    /// leaves: instead of asserting on inconsistent state, an out-of-range
    /// count or payload offset yields `NeedRestart` so the caller retries.
    // NOTE(review): returns the stored key bytes without prepending the node
    // prefix; cache-only nodes presumably have an empty prefix — confirm.
    pub fn try_get_key_to_reach_this_node(&self) -> Result<Vec<u8>, TreeError> {
        assert!(self.meta.is_cache_only_leaf());

        // Cache-only leaves carry no fence slots, so the first record is slot 0.
        let index = 0;
        if index >= self.meta.meta_count_with_fence() as usize {
            return Err(TreeError::NeedRestart);
        }

        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
        let meta = unsafe { &*meta_ptr.add(index) };

        let key_offset = meta.get_offset();

        // Validate the payload bounds before dereferencing: a stale/partial
        // meta slot could otherwise point outside the data area.
        if key_offset + meta.get_key_len()
            > LeafNode::max_data_size(self.meta.node_size as usize) as u16
        {
            return Err(TreeError::NeedRestart);
        }

        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
        let key_slice = unsafe {
            std::slice::from_raw_parts(key_ptr, (meta.get_key_len() - self.prefix_len) as usize)
        };
        Ok(key_slice.to_vec())
    }
943
    /// Low fence key exactly as stored (un-prefixed). An infinite low fence
    /// is represented by an empty vector.
    pub fn get_low_fence_key(&self) -> Vec<u8> {
        assert!(self.has_fence());
        let fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
        if fence_meta.is_infinite_low_fence_key() {
            vec![]
        } else {
            unsafe {
                let start_ptr = self.data.as_ptr().add(fence_meta.offset as usize);
                let key_slice =
                    std::slice::from_raw_parts(start_ptr, fence_meta.get_key_len() as usize);
                key_slice.to_vec()
            }
        }
    }
958
959 pub fn get_high_fence_key(&self) -> Vec<u8> {
960 assert!(self.has_fence());
961 let fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
962 if fence_meta.is_infinite_high_fence_key() {
963 vec![]
964 } else {
965 self.get_full_key(fence_meta)
966 }
967 }
968
    /// Compares the record behind `meta` against the full search `key`.
    /// Fast path: compare the inline preview bytes; only when the previews
    /// tie is the stored key suffix read from the payload area.
    #[inline]
    pub(crate) fn key_cmp(&self, meta: &LeafKVMeta, key: &[u8]) -> Ordering {
        // Post-prefix preview of the search key, capped at PREVIEW_SIZE bytes.
        let search_key_prefix = &key[(self.prefix_len as usize)..]
            [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];

        // The record's preview, capped at its actual post-prefix length.
        let prefix_key = &meta.preview_bytes[..std::cmp::min(
            PREVIEW_SIZE,
            meta.get_key_len() as usize - self.prefix_len as usize,
        )];
        let mut cmp = prefix_key.cmp(search_key_prefix);

        if cmp == Ordering::Equal {
            // Previews tied (or were truncated): compare the full suffixes.
            let full_key = self.get_remaining_key(meta);
            let search_key_postfix = &key[self.prefix_len as usize..];
            cmp = full_key.cmp(search_key_postfix);
        }
        cmp
    }
990
    /// Linear-scan variant of `lower_bound`: index of the first non-fence
    /// record whose key is >= `key` (the meta count if none is).
    pub(crate) fn linear_lower_bound(&self, key: &[u8]) -> u16 {
        debug_assert!(key.len() >= self.prefix_len as usize);

        let mut index = self.first_meta_pos_after_fence();

        while index < self.meta.meta_count_with_fence() {
            let key_meta = self.get_kv_meta(index as usize);

            #[cfg(target_arch = "x86_64")]
            unsafe {
                // NOTE(review): deliberately evicts the meta slot's cache line
                // before each comparison — presumably to force/measure cold
                // cache behavior; confirm this is intended in production builds.
                core::arch::x86_64::_mm_clflush(key_meta as *const LeafKVMeta as *const u8);
            }
            let cmp = self.key_cmp(key_meta, key);

            if cmp != Ordering::Less {
                return index;
            }

            index += 1;
        }

        index
    }
1015
    /// Binary search over the non-fence records: returns the index of an
    /// exact match, or of the first record whose key is >= `key`
    /// (the meta count if all records are smaller).
    pub(crate) fn lower_bound(&self, key: &[u8]) -> u16 {
        let mut lower = self.first_meta_pos_after_fence();
        let mut upper = self.meta.meta_count_with_fence();

        // Post-prefix preview of the search key for the fast comparison path.
        let search_key_prefix = &key[(self.prefix_len as usize)..]
            [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];

        debug_assert!(key.len() >= self.prefix_len as usize);

        while lower < upper {
            let mid = lower + (upper - lower) / 2;
            let key_meta = self.get_kv_meta(mid as usize);

            // Compare inline preview bytes first to avoid a payload read.
            let prefix_key = &key_meta.preview_bytes[..std::cmp::min(
                PREVIEW_SIZE,
                key_meta.get_key_len() as usize - self.prefix_len as usize,
            )];
            let mut cmp = prefix_key.cmp(search_key_prefix);

            if cmp == Ordering::Equal {
                // Previews tied: fall back to the stored key suffix.
                let remaining_key = self.get_remaining_key(key_meta);
                let search_key_postfix = &key[self.prefix_len as usize..];
                cmp = remaining_key.cmp(search_key_postfix);
            }

            match cmp {
                Ordering::Greater => {
                    upper = mid;
                }
                Ordering::Equal => {
                    return mid;
                }
                Ordering::Less => {
                    lower = mid + 1;
                }
            }
        }
        lower
    }
1056
    /// Inserts, updates, or deletes `key` in this node. Returns `false` when
    /// the node lacks space for the operation (caller must split/consolidate).
    ///
    /// `max_fence_len` is passed through to `full_with_fences` to reserve
    /// room that a later fence installation may need.
    pub(crate) fn insert(
        &mut self,
        key: &[u8],
        value: &[u8],
        op_type: OpType,
        max_fence_len: usize,
    ) -> bool {
        debug_assert!(key.len() as u16 >= self.prefix_len);
        match op_type {
            // Present records must carry a value; tombstones need not.
            OpType::Insert | OpType::Cache => {
                debug_assert!(!value.is_empty());
            }
            OpType::Delete | OpType::Phantom => {}
        }

        let post_fix_len = key.len() as u16 - self.prefix_len;
        let val_len = value.len() as u16;
        let kv_len = post_fix_len + val_len;

        let value_count_with_fence = self.meta.meta_count_with_fence();

        let pos = self.lower_bound(key) as usize;

        if pos < value_count_with_fence as usize {
            let prefix_len = self.prefix_len as usize;
            let pos_meta = self.get_kv_meta(pos);
            let pos_key = self.get_remaining_key(pos_meta);
            let search_key_postfix = &key[prefix_len..];
            // Exact match: update the existing record in place.
            if pos_key.cmp(search_key_postfix) == Ordering::Equal {
                counter!(LeafInsertDuplicate);
                // Deleting an existing record only flips its op type; the
                // payload bytes are left behind until consolidation.
                if op_type == OpType::Delete {
                    let pos_meta = self.get_kv_meta_mut(pos);
                    pos_meta.mark_as_deleted();
                    return true;
                }

                let pos_value = self.get_value(pos_meta);
                let pos_value_len = pos_value.len() as u16;

                // New value fits in the old slot: overwrite in place.
                if pos_value_len >= val_len {
                    unsafe {
                        let pair_ptr = self.data.as_ptr().add(pos_meta.offset as usize) as *mut u8;
                        std::ptr::copy_nonoverlapping(
                            value.as_ptr(),
                            pair_ptr.add(post_fix_len as usize),
                            val_len as usize,
                        );
                    }
                    let pos_meta = self.get_kv_meta_mut(pos);
                    pos_meta.set_value_len(val_len);
                    pos_meta.set_op_type(op_type);
                    return true;
                }

                // Value grew: write a fresh key+value pair at the payload
                // frontier and repoint the existing meta slot at it. The old
                // payload becomes dead space until consolidation.
                if self.meta.remaining_size < kv_len {
                    return false;
                }
                assert!(op_type != OpType::Cache);
                let offset = self.current_lowest_offset() - kv_len;
                unsafe {
                    let pair_ptr = self.data.as_ptr().add(offset as usize) as *mut u8;

                    let pos_meta = self.get_kv_meta_mut(pos);
                    pos_meta.set_value_len(val_len);
                    pos_meta.set_op_type(op_type);
                    pos_meta.offset = offset;
                    std::ptr::copy_nonoverlapping(
                        key[self.prefix_len as usize..].as_ptr(),
                        pair_ptr,
                        post_fix_len as usize,
                    );
                    std::ptr::copy_nonoverlapping(
                        value.as_ptr(),
                        pair_ptr.add(post_fix_len as usize),
                        val_len as usize,
                    );
                }
                self.meta.remaining_size -= kv_len;
                return true;
            }
        }

        // Deleting a key that is absent from a base page needs no tombstone:
        // the lookup above found no match and there is no lower level.
        if op_type == OpType::Delete && (self.is_base_page()) {
            return true;
        }

        // Not enough room for the record plus reserved fence space.
        if self.full_with_fences(
            kv_len + std::mem::size_of::<LeafKVMeta>() as u16,
            max_fence_len,
        ) {
            return false;
        }

        counter!(LeafInsertNew);

        let offset = self.current_lowest_offset() - kv_len;
        let new_meta =
            LeafKVMeta::make_prefixed_meta(offset, val_len, key, self.prefix_len, op_type);

        unsafe {
            // Shift the meta slots at `pos..` one slot to the right to keep
            // the slot array key-ordered (regions overlap: use `copy`).
            let metas_size = std::mem::size_of::<LeafKVMeta>()
                * (self.meta.meta_count_with_fence() - pos as u16) as usize;
            let src_ptr = self
                .data
                .as_ptr()
                .add(pos * std::mem::size_of::<LeafKVMeta>());
            let dest_ptr = self
                .data
                .as_mut_ptr()
                .add((pos + 1) * std::mem::size_of::<LeafKVMeta>());
            std::ptr::copy(src_ptr, dest_ptr, metas_size);

            self.write_initial_kv_meta(pos, new_meta);

            // Write the key suffix followed by the value at the new offset.
            let pair_ptr = self.data.as_mut_ptr().add(offset as usize);
            std::ptr::copy_nonoverlapping(
                key[self.prefix_len as usize..].as_ptr(),
                pair_ptr,
                post_fix_len as usize,
            );
            std::ptr::copy_nonoverlapping(
                value.as_ptr(),
                pair_ptr.add(post_fix_len as usize),
                val_len as usize,
            );
        }

        self.meta.remaining_size -= kv_len + std::mem::size_of::<LeafKVMeta>() as u16;
        self.meta.increment_value_count();
        true
    }
1202
    /// A base page is a non-cache-only leaf with no next level beneath it.
    pub(crate) fn is_base_page(&self) -> bool {
        (!self.meta.is_cache_only_leaf()) && self.next_level.is_null()
    }
1207
    /// Splits this node: moves the upper portion of the records into
    /// `sibling` and returns the splitting key. Base-page siblings receive
    /// proper fence keys; cache-only siblings receive none.
    pub fn split(&mut self, sibling: &mut LeafNode, cache_only: bool) -> Vec<u8> {
        if cache_only {
            assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
        } else {
            assert!(self.is_base_page() && self.has_fence());
        }

        let current_count = self.meta.meta_count_without_fence();

        // `new_cur_count` records stay here; the rest move to the sibling.
        let (splitting_key, new_cur_count) = self.get_split_key(cache_only);
        let sibling_cnt = current_count - new_cur_count;

        if !cache_only {
            // The sibling covers [record at split position, our high fence).
            let low_fence_for_right = self.get_kv_meta(FENCE_KEY_CNT + new_cur_count as usize);
            assert!(!low_fence_for_right.is_infinite_low_fence_key());
            let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);

            let low_fence_key = self.get_full_key(low_fence_for_right);
            let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
                vec![]
            } else {
                self.get_full_key(high_fence_for_right)
            };

            sibling.initialize(
                &low_fence_key,
                &high_fence_key,
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                true,
                cache_only,
            );
        } else {
            sibling.initialize(
                &[],
                &[],
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                false,
                cache_only,
            );
        }

        // When fences exist they occupy the first FENCE_KEY_CNT meta slots.
        let starting_kv_idx = if cache_only { 0 } else { FENCE_KEY_CNT };

        // Re-insert the upper records into the sibling; delete tombstones are
        // dropped rather than copied.
        for i in 0..sibling_cnt {
            let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_idx);
            if kv_meta.op_type() == OpType::Delete {
                continue;
            }
            let key = self.get_full_key(kv_meta);
            let value = self.get_value(kv_meta);
            let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
            assert!(insert_rt);
        }

        // Truncate this node to the surviving lower portion...
        if !cache_only {
            self.meta
                .set_value_count(new_cur_count + FENCE_KEY_CNT as u16);
        } else {
            self.meta.set_value_count(new_cur_count);
        }

        // ...and compact the remaining records around the new boundary.
        self.consolidate_after_split(&splitting_key);

        splitting_key
    }
1286
    /// Splits this leaf at the caller-provided `merge_split_key`, moving the
    /// entries at or above it into `sibling`.
    ///
    /// Unlike [`Self::split`], the split point is dictated externally (e.g.
    /// by a merge). Post-condition for base pages: this node's high fence and
    /// the sibling's low fence both equal `merge_split_key`.
    pub fn split_with_key(
        &mut self,
        sibling: &mut LeafNode,
        merge_split_key: &Vec<u8>,
        cache_only: bool,
    ) {
        if cache_only {
            assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
        } else {
            assert!(self.is_base_page() && self.has_fence());
        }

        let current_count = self.meta.meta_count_without_fence();
        // Entries strictly below the split key stay in this node.
        let mut new_cur_count = self.get_kv_num_below_key(merge_split_key);
        let sibling_cnt = current_count - new_cur_count;

        if !cache_only {
            // The sibling's low fence is the split key itself; its high fence
            // is inherited from this node.
            let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);

            let low_fence_key = merge_split_key.clone();
            let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
                // An infinite fence is encoded as an empty key.
                vec![]
            } else {
                self.get_full_key(high_fence_for_right)
            };

            sibling.initialize(
                &low_fence_key,
                &high_fence_key,
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                true,
                cache_only,
            );
        } else {
            // Cache-only siblings are fenceless.
            sibling.initialize(
                &[],
                &[],
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                false,
                cache_only,
            );
        }

        // Fence slots (if any) precede the data slots.
        let starting_kv_index = if cache_only { 0 } else { FENCE_KEY_CNT };

        for i in 0..sibling_cnt {
            let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_index);
            if kv_meta.op_type() == OpType::Delete {
                // Tombstones are dropped while copying to the sibling.
                continue;
            }
            let key = self.get_full_key(kv_meta);
            let value = self.get_value(kv_meta);
            let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
            assert!(insert_rt);
        }

        if !cache_only {
            // The retained count must include the fence slots.
            new_cur_count += FENCE_KEY_CNT as u16;
        }

        self.meta.set_value_count(new_cur_count);
        // Compact the surviving entries; the split key becomes our new high
        // fence.
        self.consolidate_after_split(merge_split_key);

        if !cache_only {
            // Post-condition: both nodes agree on the boundary key.
            let left_high_fence = self.get_kv_meta(HIGH_FENCE_IDX);
            let left_high_fence_key = self.get_full_key(left_high_fence);
            let right_low_fence_key = sibling.get_low_fence_full_key();

            assert_eq!(left_high_fence_key, *merge_split_key);
            assert_eq!(right_low_fence_key, *merge_split_key);
        }
    }
1375
1376 pub(crate) fn consolidate_inner(
1377 &mut self,
1378 new_optype: OpType,
1379 new_high_fence: Option<&[u8]>,
1380 skip_tombstone: bool,
1381 cache_only: bool,
1382 skip_key: Option<&[u8]>,
1383 ) {
1384 let mut pairs = Vec::new();
1385
1386 for meta in self.meta_iter() {
1387 if skip_tombstone && meta.op_type() == OpType::Delete {
1388 continue;
1390 }
1391
1392 match skip_key {
1393 Some(s_k) => {
1395 let k = self.get_full_key(meta);
1396 let cmp = s_k.cmp(&k);
1397
1398 if cmp != Ordering::Equal {
1399 pairs.push((k, self.get_value(meta).to_owned(), meta.op_type()));
1400 }
1401 }
1402 None => {
1403 pairs.push((
1404 self.get_full_key(meta),
1405 self.get_value(meta).to_owned(),
1406 meta.op_type(),
1407 ));
1408 }
1409 }
1410 }
1411
1412 let has_fence = self.has_fence();
1413 let (low_fence, high_fence) = if has_fence {
1414 let high_fence_key = match new_high_fence {
1415 None => self.get_high_fence_key(),
1416 Some(key) => key.to_vec(),
1417 };
1418 (self.get_low_fence_key(), high_fence_key)
1419 } else {
1420 (vec![], vec![])
1421 };
1422
1423 let node_size = self.meta.node_size;
1424
1425 self.initialize(
1426 &low_fence,
1427 &high_fence,
1428 node_size as usize,
1429 self.next_level,
1430 has_fence,
1431 cache_only,
1432 );
1433
1434 for (key, value, op_type) in pairs {
1435 let rt = if op_type == OpType::Delete {
1436 self.insert(&key, &value, OpType::Delete, 0)
1437 } else {
1438 self.insert(&key, &value, new_optype, 0)
1439 };
1440 assert!(rt);
1441 }
1442 }
1443
1444 #[allow(dead_code)]
1445 pub(crate) fn consolidate(&mut self) {
1446 self.consolidate_inner(
1447 OpType::Insert,
1448 None,
1449 true,
1450 self.meta.is_cache_only_leaf(),
1451 None,
1452 );
1453 }
1454
1455 pub(crate) fn consolidate_after_merge(&mut self) {
1458 assert!(!self.is_base_page());
1459 self.consolidate_inner(
1460 OpType::Cache,
1461 None,
1462 false,
1463 self.meta.is_cache_only_leaf(),
1464 None,
1465 );
1466 }
1467
1468 fn consolidate_after_split(&mut self, high_fence: &[u8]) {
1472 assert!(self.meta.is_cache_only_leaf() || self.is_base_page());
1473 self.consolidate_inner(
1474 OpType::Insert,
1475 Some(high_fence),
1476 true,
1477 self.meta.is_cache_only_leaf(),
1478 None,
1479 );
1480 }
1481
1482 pub(crate) fn consolidate_skip_key(&mut self, key: &[u8]) {
1484 self.consolidate_inner(
1485 OpType::Insert,
1486 None,
1487 true,
1488 self.meta.is_cache_only_leaf(),
1489 Some(key),
1490 );
1491 }
1492
1493 pub(crate) fn copy_initialize_to(
1495 &self,
1496 dst_node: *mut LeafNode,
1497 dst_size: usize,
1498 discard_cold_cache: bool,
1499 ) {
1500 assert!(!self.is_base_page());
1501 assert!(self.meta.node_size as usize <= dst_size);
1502 let dst_ref = unsafe { &mut *dst_node };
1503 let empty = vec![];
1504
1505 dst_ref.initialize(
1506 &empty,
1507 &empty,
1508 dst_size,
1509 self.next_level,
1510 false, self.meta.is_cache_only_leaf(),
1512 );
1513
1514 for meta in self.meta_iter() {
1515 let op = meta.op_type();
1516
1517 if discard_cold_cache && op.is_cache() && !meta.is_referenced() {
1522 continue;
1523 }
1524
1525 let value = self.get_value(meta);
1526 let rt = dst_ref.insert(&self.get_full_key(meta), value, op, 0);
1527 assert!(rt);
1528 }
1529 }
1530
1531 pub(crate) fn initialize(
1533 &mut self,
1534 low_fence: &[u8],
1535 high_fence: &[u8],
1536 node_size: usize,
1537 next_level: MiniPageNextLevel,
1538 has_fence: bool,
1539 cache_only: bool,
1540 ) {
1541 if !has_fence {
1542 assert!(low_fence.is_empty());
1543 assert!(high_fence.is_empty());
1544
1545 self.meta = NodeMeta::new(
1546 LeafNode::max_data_size(node_size) as u16,
1547 false,
1548 false,
1549 node_size as u16,
1550 cache_only,
1551 );
1552 self.prefix_len = 0;
1553 self.next_level = next_level;
1554 } else {
1555 self.meta = NodeMeta::new(
1556 LeafNode::max_data_size(node_size) as u16, false,
1558 true,
1559 node_size as u16,
1560 cache_only,
1561 );
1562
1563 self.prefix_len = common_prefix_len(low_fence, high_fence);
1564 self.next_level = next_level;
1565 self.install_fence_key(low_fence, false);
1566 self.install_fence_key(high_fence, true);
1567 }
1568 }
1569
    /// Sets the split flag on this node (delegates to the node metadata).
    pub(crate) fn set_split_flag(&mut self) {
        self.meta.set_split_flag();
    }
1573
    /// Returns whether the split flag is set (delegates to the node metadata).
    pub(crate) fn get_split_flag(&self) -> bool {
        self.meta.get_split_flag()
    }
1577
1578 pub(crate) fn read_by_key(&self, search_key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
1579 self.read_by_key_inner(search_key, out_buffer, true)
1580 }
1581
1582 #[must_use]
1584 pub(crate) fn read_by_key_inner(
1585 &self,
1586 search_key: &[u8],
1587 out_buffer: &mut [u8],
1588 binary_search: bool,
1589 ) -> LeafReadResult {
1590 let val_count = self.meta.meta_count_with_fence();
1591 let pos = if binary_search {
1592 self.lower_bound(search_key)
1593 } else {
1594 self.linear_lower_bound(search_key)
1595 };
1596
1597 if pos >= val_count {
1598 counter!(LeafNotFoundDueToRange);
1599 return LeafReadResult::NotFound;
1600 }
1601
1602 let kv_meta = self.get_kv_meta(pos as usize);
1603 let target_key = self.get_remaining_key(kv_meta);
1604
1605 if !kv_meta.is_referenced() {
1607 kv_meta.mark_as_ref();
1608 }
1609
1610 let input_post_key = &search_key[self.prefix_len as usize..];
1611 let cmp = target_key.cmp(input_post_key);
1612
1613 if cmp != Ordering::Equal {
1614 counter!(LeafNotFoundDueToKey);
1615 LeafReadResult::NotFound
1616 } else {
1617 if kv_meta.op_type().is_absent() {
1618 return LeafReadResult::Deleted;
1619 }
1620 let val_len = kv_meta.value_len();
1621 let val_ref = self.get_value(kv_meta);
1622 debug_assert_eq!(val_len as usize, val_ref.len());
1623 out_buffer[..val_len as usize].copy_from_slice(val_ref);
1624 LeafReadResult::Found(val_len as u32)
1625 }
1626 }
1627
1628 pub(crate) fn get_chain_size_hint(
1633 key_len: usize,
1634 value_len: usize,
1635 page_classes: &[usize],
1636 cache_only: bool,
1637 ) -> usize {
1638 let mut initial_record_size = key_len + value_len + std::mem::size_of::<LeafKVMeta>();
1639 initial_record_size += std::mem::size_of::<LeafNode>();
1640
1641 if let Some(s) = page_classes[0..(page_classes.len() - 1)]
1642 .iter()
1643 .position(|x| initial_record_size < *x)
1644 {
1645 return page_classes[s];
1646 } else if cache_only && initial_record_size <= page_classes[page_classes.len() - 1] {
1647 return page_classes[page_classes.len() - 1];
1648 }
1649
1650 panic!(
1651 "Record size {} plus metadata exceeds the max mini-page size {:?}",
1652 initial_record_size, page_classes
1653 );
1654 }
1655
1656 pub(crate) fn new_size_if_upgrade(
1662 &self,
1663 incoming_size: usize,
1664 page_classes: &[usize],
1665 cache_only: bool,
1666 ) -> Option<usize> {
1667 let cur_size = self.meta.node_size as usize;
1668 let request_size = cur_size + incoming_size;
1669
1670 if let Some(s) = page_classes[0..(page_classes.len() - 1)]
1671 .iter()
1672 .position(|x| request_size < *x)
1673 {
1674 if s == 0 {
1675 panic!("Should not be here");
1676 }
1677 return Some(page_classes[s]);
1678 } else if cache_only && request_size <= page_classes[page_classes.len() - 1] {
1679 return Some(page_classes[page_classes.len() - 1]);
1680 }
1681
1682 None
1683 }
1684
1685 pub(crate) fn free_base_page(node: *mut LeafNode) {
1687 assert!(unsafe { &*node }.is_base_page());
1688 let node_size = unsafe { &*node }.meta.node_size as usize;
1689 let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
1690 unsafe {
1691 std::alloc::dealloc(node as *mut u8, layout);
1692 }
1693 }
1694
1695 pub(crate) fn need_actually_merge_to_disk(&self) -> bool {
1696 for meta in self.meta_iter() {
1697 if meta.op_type().is_dirty() {
1698 return true;
1699 }
1700 }
1701
1702 false
1703 }
1704
1705 fn estimate_merge_size(&self) -> usize {
1706 let mut required_size = 0;
1707
1708 for meta in self.meta_iter() {
1709 if meta.op_type() == OpType::Insert {
1710 required_size += (meta.get_key_len() + meta.value_len()) as usize
1711 + std::mem::size_of::<LeafKVMeta>();
1712 }
1713 }
1714
1715 required_size
1716 }
1717
1718 pub(crate) fn full_with_fences(&self, requested_size: u16, max_fence_len: usize) -> bool {
1721 if self.meta.remaining_size < requested_size {
1722 return true;
1723 }
1724
1725 if self.meta.has_fence() {
1726 let mut empty_data_size = self.meta.remaining_size; let low_key_meta = self.get_kv_meta(LOW_FENCE_IDX);
1729 if !low_key_meta.is_infinite_low_fence_key() {
1730 empty_data_size += low_key_meta.get_key_len();
1731 }
1732
1733 let high_key_meta = self.get_kv_meta(HIGH_FENCE_IDX);
1734 if !high_key_meta.is_infinite_high_fence_key() {
1735 empty_data_size += high_key_meta.get_key_len();
1736 }
1737
1738 if empty_data_size >= requested_size + max_fence_len as u16 {
1739 return false;
1740 }
1741
1742 return true;
1743 }
1744
1745 false
1746 }
1747
1748 pub(crate) fn merge_mini_page(&mut self, mini_page: &LeafNode, max_fence_len: usize) -> bool {
1749 let size_required = mini_page.estimate_merge_size();
1750
1751 if self.full_with_fences(size_required as u16, max_fence_len) {
1752 return false;
1753 }
1754
1755 for meta in mini_page.meta_iter() {
1756 let c_key = mini_page.get_full_key(meta);
1757 let c_type = meta.op_type();
1758 if !c_type.is_dirty() {
1759 continue;
1764 }
1765 let c_value = mini_page.get_value(meta);
1766 let rt = self.insert(&c_key, c_value, c_type, 0);
1767 assert!(rt);
1768 }
1769
1770 true
1771 }
1772
1773 pub(crate) fn get_stats(&self) -> LeafStats {
1774 let mut keys = Vec::new();
1775 let mut values = Vec::new();
1776 let mut op_types = Vec::new();
1777 let node_size = self.meta.node_size as usize;
1778 let prefix = self.get_prefix().to_owned();
1779
1780 for meta in self.meta_iter() {
1781 let key = self.get_full_key(meta);
1782 let value = self.get_value(meta);
1783 keys.push(key);
1784 values.push(value.to_owned());
1785 op_types.push(meta.op_type());
1786 }
1787
1788 LeafStats {
1789 keys,
1790 values,
1791 op_types,
1792 prefix,
1793 base_node: None,
1794 next_level: self.next_level,
1795 node_size,
1796 }
1797 }
1798
    /// Whether this node carries fence keys (delegates to the node metadata).
    pub(crate) fn has_fence(&self) -> bool {
        self.meta.has_fence()
    }
1802
    /// Iterator over this node's KV metadata entries, starting after any
    /// fence-key slots.
    pub(crate) fn meta_iter(&self) -> LeafMetaIter<'_> {
        LeafMetaIter::new(self)
    }
1807
1808 fn first_meta_pos_after_fence(&self) -> u16 {
1809 if self.has_fence() {
1810 FENCE_KEY_CNT as u16
1811 } else {
1812 0
1813 }
1814 }
1815
1816 pub(crate) fn get_value_by_pos(&self, pos: u32, out_buffer: &mut [u8]) -> GetValueByPosResult {
1817 if pos >= self.meta.meta_count_with_fence() as u32 {
1818 return GetValueByPosResult::EndOfLeaf;
1819 }
1820
1821 let meta = self.get_kv_meta(pos as usize);
1822
1823 if meta.op_type().is_absent() {
1824 return GetValueByPosResult::Deleted;
1825 }
1826
1827 let value = self.get_value(meta);
1828 let value_len = meta.value_len() as usize;
1829 out_buffer[..value_len].copy_from_slice(value);
1830 GetValueByPosResult::Found(value_len as u32)
1831 }
1832}
1833
/// Forward iterator over a leaf node's KV metadata entries.
pub(crate) struct LeafMetaIter<'a> {
    // Leaf whose metadata slots are being walked.
    node: &'a LeafNode,
    // Index of the next slot to yield.
    cur: usize,
}
1838
1839impl LeafMetaIter<'_> {
1840 fn new(leaf: &LeafNode) -> LeafMetaIter<'_> {
1841 let cur = leaf.first_meta_pos_after_fence();
1842 LeafMetaIter {
1843 node: leaf,
1844 cur: cur as usize,
1845 }
1846 }
1847}
1848
1849impl<'a> Iterator for LeafMetaIter<'a> {
1850 type Item = &'a LeafKVMeta;
1851
1852 fn next(&mut self) -> Option<Self::Item> {
1853 if self.cur < self.node.meta.meta_count_with_fence() as usize {
1854 let rt = self.node.get_kv_meta(self.cur);
1855 self.cur += 1;
1856 Some(rt)
1857 } else {
1858 None
1859 }
1860 }
1861}
1862
/// Outcome of a positional value lookup (`LeafNode::get_value_by_pos`).
pub(crate) enum GetValueByPosResult {
    /// Position is past the last metadata slot.
    EndOfLeaf,
    /// Slot holds an absent (tombstone/phantom) entry with no readable value.
    Deleted,
    /// Value copied into the output buffer; payload is its length in bytes.
    Found(u32),
}
1868
/// Outcome of a key lookup on a leaf node.
#[derive(PartialEq, Eq, Debug)]
pub enum LeafReadResult {
    /// Key matched but the entry is marked absent (deleted/phantom).
    Deleted,
    /// Key is not present in this node (out of range or no match).
    NotFound,
    /// Value copied into the output buffer; payload is its length in bytes.
    Found(u32),
    /// Key rejected as unusable. NOTE(review): not produced in this chunk —
    /// semantics are defined by callers; confirm before relying on it.
    InvalidKey,
}
1876
#[cfg(test)]
mod tests {
    use super::*;
    use bytemuck::cast_slice;
    use rstest::rstest;

    /// Reinterprets a single `usize` as its raw bytes — a fixed-width
    /// key/value for the node tests. Replaces the previous
    /// `unsafe { slice::from_raw_parts(n, 1) }` with the safe equivalent.
    fn as_bytes(n: &usize) -> &[u8] {
        cast_slice::<usize, u8>(std::slice::from_ref(n))
    }

    #[test]
    fn test_op_type_enum() {
        assert_eq!(OpType::Insert as u8, 0);
        assert_eq!(OpType::Delete as u8, 1);
        assert_eq!(OpType::Cache as u8, 2);
        // Phantom completes the 2-bit op-type encoding.
        assert_eq!(OpType::Phantom as u8, 3);
    }

    #[test]
    fn test_make_prefixed_meta() {
        let key = [1, 2, 3, 4, 5];
        let prefix_len = 2;
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &key, prefix_len, OpType::Insert);

        assert_eq!(meta.offset, 10);
        // The recorded key length is the full key length, not the suffix.
        assert_eq!(meta.get_key_len(), 5);
        assert_eq!(meta.value_len(), 20);
        // Preview bytes are taken from the key after the shared prefix.
        assert_eq!(meta.preview_bytes, [3, 4]);
        assert_eq!(meta.op_type(), OpType::Insert);
        assert!(!meta.is_referenced());
    }

    #[test]
    fn test_make_infinite_high_fence_key() {
        let meta = LeafKVMeta::make_infinite_high_fence_key();
        assert_eq!(meta.offset, u16::MAX);
        assert_eq!(meta.get_key_len(), 0);
        assert_eq!(meta.value_len(), 0);
        assert!(meta.is_infinite_high_fence_key());
    }

    #[test]
    fn test_make_infinite_low_fence_key() {
        let meta = LeafKVMeta::make_infinite_low_fence_key();
        assert_eq!(meta.offset, u16::MAX - 1);
        assert_eq!(meta.get_key_len(), 0);
        assert_eq!(meta.value_len(), 0);
        assert!(meta.is_infinite_low_fence_key());
    }

    #[test]
    fn test_value_len() {
        let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert_eq!(meta.value_len(), 20);

        meta.set_value_len(30);
        assert_eq!(meta.value_len(), 30);
    }

    #[test]
    fn test_op_type() {
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Delete);
        assert_eq!(meta.op_type(), OpType::Delete);
    }

    #[test]
    fn test_mark_as_ref_and_clear_ref() {
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert!(!meta.is_referenced());

        meta.mark_as_ref();
        assert!(meta.is_referenced());

        meta.clear_ref();
        assert!(!meta.is_referenced());
    }

    #[test]
    fn test_mark_as_deleted_and_is_deleted() {
        let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert!(!meta.is_deleted());

        meta.mark_as_deleted();
        assert!(meta.is_deleted());
    }

    #[rstest]
    #[case(vec![1], vec![2], 2)]
    #[case(vec![2], vec![1], 2)]
    fn test_get_merge_split_key(
        #[case] base_page_values: Vec<usize>,
        #[case] mini_page_values: Vec<usize>,
        #[case] splitting_key: usize,
    ) {
        let base = unsafe { &mut *LeafNode::make_base_page(4096) };
        let mini = unsafe { &mut *LeafNode::make_base_page(4096) };

        // Keys double as values so the contents are easy to verify.
        for n in &base_page_values {
            let bytes = as_bytes(n);
            assert!(base.insert(bytes, bytes, OpType::Insert, 2));
        }
        for n in &mini_page_values {
            let bytes = as_bytes(n);
            assert!(mini.insert(bytes, bytes, OpType::Insert, 2));
        }

        let merge_split_key_bytes = base.get_merge_split_key(mini);
        let merge_splitting_key = cast_slice::<u8, usize>(&merge_split_key_bytes);
        assert_eq!(merge_splitting_key[0], splitting_key);

        LeafNode::free_base_page(base);
        LeafNode::free_base_page(mini);
    }

    #[rstest]
    #[case(vec![1, 2, 3, 4], 3)]
    #[case(vec![1], 2)]
    #[case(vec![2], 2)]
    fn test_split_with_key(#[case] base_page_values: Vec<usize>, #[case] splitting_key: usize) {
        let base = unsafe { &mut *LeafNode::make_base_page(4096) };
        let sibling = unsafe { &mut *LeafNode::make_base_page(4096) };

        for n in &base_page_values {
            let bytes = as_bytes(n);
            assert!(base.insert(bytes, bytes, OpType::Insert, 2));
        }

        let splitting_key_bytes = as_bytes(&splitting_key).to_vec();
        base.split_with_key(sibling, &splitting_key_bytes, false);

        // Every key must land on the correct side of the split point and
        // still be readable afterwards.
        let mut out_buffer = vec![0u8; 1024];
        for n in &base_page_values {
            let page: &LeafNode = if *n >= splitting_key { &*sibling } else { &*base };

            let key = as_bytes(n);
            let rt = page.read_by_key(key, &mut out_buffer);

            assert_eq!(rt, LeafReadResult::Found(key.len() as u32));
            assert_eq!(&out_buffer[0..key.len()], key);
        }

        LeafNode::free_base_page(base);
        LeafNode::free_base_page(sibling);
    }
}