1use core::panic;
5use std::{alloc::Layout, cmp::Ordering, sync::atomic};
6
7use crate::{
8 circular_buffer::CircularBufferPtr, counter, error::TreeError, range_scan::ScanReturnField,
9 utils::stats::LeafStats,
10};
11
12use super::{node_meta::NodeMeta, FENCE_KEY_CNT};
13
/// Kind of record stored in a leaf slot.
///
/// The discriminant is packed into the top two bits of
/// `LeafKVMeta::op_type_key_len_in_byte`, so the explicit values and
/// `#[repr(u8)]` must not change.
#[repr(u8)]
#[derive(PartialEq, Eq, Debug, Clone, Copy)]
pub(crate) enum OpType {
    /// Live record written by an insert; must be flushed.
    Insert = 0,
    /// Tombstone for a deleted key; must be flushed.
    Delete = 1,
    /// Clean cached copy of a record; safe to evict.
    Cache = 2,
    /// Cached "key is absent" marker; safe to evict.
    Phantom = 3,
}

impl OpType {
    /// True for records carrying changes that still need write-back.
    pub(crate) fn is_dirty(&self) -> bool {
        matches!(self, OpType::Insert | OpType::Delete)
    }

    /// True when the record says the key is logically absent.
    fn is_absent(&self) -> bool {
        matches!(self, OpType::Delete | OpType::Phantom)
    }

    /// True for cache-only states (nothing dirty to flush).
    fn is_cache(&self) -> bool {
        matches!(self, OpType::Cache | OpType::Phantom)
    }
}
47
/// Number of post-prefix key bytes cached inline in each `LeafKVMeta`
/// for fast comparisons.
const PREVIEW_SIZE: usize = 2;

/// Meta-slot index of the low fence key.
const LOW_FENCE_IDX: usize = 0;
/// Meta-slot index of the high fence key.
const HIGH_FENCE_IDX: usize = 1;

/// `op_type_key_len_in_byte` layout: bits 0-13 = key length,
/// bits 14-15 = `OpType` discriminant.
const OP_TYPE_SHIFT: u16 = 14;
const KEY_LEN_MASK: u16 = 0x3F_FF;
/// `ref_value_len_in_byte` layout: bit 15 = reference flag,
/// bits 0-14 = value length.
const REF_BIT_MASK: u16 = 0x80_00;
const VALUE_LEN_MASK: u16 = 0x7F_FF;

/// Length (in bytes) of the longest common prefix of the two fence keys.
///
/// Used to decide how many leading bytes can be stripped from every key
/// stored in a leaf. Returns at most `min(low_fence.len(), high_fence.len())`;
/// callers must ensure that fits in `u16`.
pub(crate) fn common_prefix_len(low_fence: &[u8], high_fence: &[u8]) -> u16 {
    // Idiomatic replacement for the original index loop: pair up bytes and
    // count while they agree.
    low_fence
        .iter()
        .zip(high_fence.iter())
        .take_while(|(a, b)| a == b)
        .count() as u16
}
72
73#[repr(C)]
74pub(crate) struct LeafKVMeta {
75 offset: u16,
76 op_type_key_len_in_byte: u16, ref_value_len_in_byte: std::sync::atomic::AtomicU16, preview_bytes: [u8; PREVIEW_SIZE],
79}
80
81impl LeafKVMeta {
82 pub(crate) fn make_prefixed_meta(
83 offset: u16,
84 value_len: u16,
85 key: &[u8],
86 prefix_len: u16,
87 op_type: OpType,
88 ) -> Self {
89 let mut meta = Self {
90 offset,
91 op_type_key_len_in_byte: key.len() as u16 | ((op_type as u16) << OP_TYPE_SHIFT),
92 ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(value_len),
93 preview_bytes: [0; PREVIEW_SIZE],
94 };
95
96 for i in 0..std::cmp::min(key.len().saturating_sub(prefix_len as usize), PREVIEW_SIZE) {
97 meta.preview_bytes[i] = key[i + prefix_len as usize];
98 }
99
100 meta.clear_ref();
103 meta
104 }
105
106 pub fn make_infinite_high_fence_key() -> Self {
107 assert_eq!(std::mem::size_of::<Self>(), super::KV_META_SIZE);
108
109 Self {
110 offset: u16::MAX,
111 op_type_key_len_in_byte: 0,
112 ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
113 preview_bytes: [0; PREVIEW_SIZE],
114 }
115 }
116
117 pub fn make_infinite_low_fence_key() -> Self {
118 Self {
119 offset: u16::MAX - 1,
120 op_type_key_len_in_byte: 0,
121 ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
122 preview_bytes: [0; PREVIEW_SIZE],
123 }
124 }
125
126 pub fn is_infinite_low_fence_key(&self) -> bool {
127 self.offset == u16::MAX - 1
128 }
129
130 pub fn is_infinite_high_fence_key(&self) -> bool {
131 self.offset == u16::MAX
132 }
133
134 pub fn value_len(&self) -> u16 {
135 self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & VALUE_LEN_MASK
136 }
137
138 pub fn set_value_len(&mut self, value: u16) {
139 let v = self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed);
140 self.ref_value_len_in_byte.store(
141 (v & !VALUE_LEN_MASK) | (value & VALUE_LEN_MASK),
142 atomic::Ordering::Relaxed,
143 );
144 }
145
146 pub fn set_op_type(&mut self, op_type: OpType) {
147 self.op_type_key_len_in_byte = self.get_key_len() | ((op_type as u16) << OP_TYPE_SHIFT);
148 }
149
150 pub fn op_type(&self) -> OpType {
151 let l = self.op_type_key_len_in_byte;
152 let b = l >> OP_TYPE_SHIFT;
153 unsafe { std::mem::transmute(b as u8) }
154 }
155
156 pub fn mark_as_ref(&self) {
157 self.ref_value_len_in_byte
158 .fetch_or(REF_BIT_MASK, atomic::Ordering::Relaxed);
159 }
160
161 pub fn clear_ref(&self) {
162 self.ref_value_len_in_byte
163 .fetch_and(!REF_BIT_MASK, atomic::Ordering::Relaxed);
164 }
165
166 pub fn is_referenced(&self) -> bool {
167 self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & REF_BIT_MASK != 0
168 }
169
170 pub fn mark_as_deleted(&mut self) {
171 self.set_op_type(OpType::Delete);
172 }
173
174 #[allow(dead_code)]
175 pub fn is_deleted(&self) -> bool {
176 self.op_type() == OpType::Delete
177 }
178
179 pub(crate) fn get_offset(&self) -> u16 {
180 self.offset
181 }
182
183 pub(crate) fn get_key_len(&self) -> u16 {
184 self.op_type_key_len_in_byte & KEY_LEN_MASK
185 }
186}
187
/// Link from a mini page to the next-level page, encoded as a byte offset.
///
/// `usize::MAX` is reserved as the null sentinel.
#[derive(Debug, Clone, Copy)]
pub(crate) struct MiniPageNextLevel {
    val: usize,
}

impl MiniPageNextLevel {
    /// Wraps an offset value.
    pub(crate) fn new(val: usize) -> Self {
        Self { val }
    }

    /// Builds the null link (no next level).
    pub(crate) fn new_null() -> Self {
        Self::new(usize::MAX)
    }

    /// Whether this link is the null sentinel.
    pub(crate) fn is_null(&self) -> bool {
        self.val == usize::MAX
    }

    /// Returns the stored offset; panics when called on the null sentinel.
    pub(crate) fn as_offset(&self) -> usize {
        assert!(!self.is_null());
        self.val
    }
}
211
/// Header of a leaf node, followed by a variable-length data area (`data`):
/// `LeafKVMeta` slots grow up from the front while key/value payloads grow
/// down from the back (see `current_lowest_offset`).
#[repr(C)]
pub(crate) struct LeafNode {
    pub(crate) meta: NodeMeta,
    // Length of the key prefix shared by all stored records.
    prefix_len: u16,
    // Link to the next-level page when this is a mini page; null otherwise.
    pub(crate) next_level: MiniPageNextLevel,
    pub(crate) lsn: u64,
    // Zero-sized marker: the node's data area starts at this address.
    data: [u8; 0],
}

// The header must stay exactly 24 bytes; offsets into `data` depend on it.
const _: () = assert!(std::mem::size_of::<LeafNode>() == 24);
222
223impl LeafNode {
224 fn max_data_size(node_size: usize) -> usize {
225 node_size - std::mem::size_of::<LeafNode>()
226 }
227
    /// Initializes the buffer behind `ptr` as an empty mini page of
    /// `node_size` bytes that overlays `next_level`.
    ///
    /// Mini pages carry no fence keys (`has_fence = false`); `cache_only`
    /// marks pages that may hold only cached (non-dirty) records.
    pub(crate) fn initialize_mini_page(
        ptr: &CircularBufferPtr,
        node_size: usize,
        next_level: MiniPageNextLevel,
        cache_only: bool,
    ) {
        // SAFETY: assumes `ptr` points to a writable, properly aligned buffer
        // of at least `node_size` bytes — TODO confirm CircularBufferPtr's
        // allocation guarantees at the call sites.
        unsafe {
            Self::init_node_with_fence(
                ptr.as_ptr(),
                &[],
                &[],
                node_size,
                next_level,
                false,
                cache_only,
            )
        }
    }
246
247 pub(crate) fn make_base_page(node_size: usize) -> *mut Self {
248 let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
249 let ptr = unsafe { std::alloc::alloc(layout) };
250 unsafe {
251 Self::init_node_with_fence(
252 ptr,
253 &[],
254 &[],
255 node_size,
256 MiniPageNextLevel::new_null(),
257 true,
258 false,
259 );
260 }
261 ptr as *mut Self
262 }
263
264 pub(crate) fn covert_insert_records_to_cache(&mut self) {
266 let mut cur = self.first_meta_pos_after_fence();
267
268 while cur < self.meta.meta_count_with_fence() {
269 let meta = self.get_kv_meta_mut(cur as usize);
270 let op_type = meta.op_type();
271
272 match op_type {
273 OpType::Insert => {
274 meta.set_op_type(OpType::Cache);
275 }
276 OpType::Delete => {
277 meta.set_op_type(OpType::Phantom);
278 }
279 OpType::Cache | OpType::Phantom => {
280 unreachable!("Base page should not have op type: {:?}", op_type);
281 }
282 };
283
284 cur += 1;
285 }
286 }
287
288 pub(crate) fn convert_cache_records_to_insert(&mut self) {
290 let mut cur = self.first_meta_pos_after_fence();
291
292 while cur < self.meta.meta_count_with_fence() {
293 let meta = self.get_kv_meta_mut(cur as usize);
294 let op_type = meta.op_type();
295
296 match op_type {
297 OpType::Cache => {
298 meta.set_op_type(OpType::Insert);
299 }
300 OpType::Phantom => {
301 meta.set_op_type(OpType::Delete);
302 }
303 OpType::Insert | OpType::Delete => {
304 }
306 };
307
308 cur += 1;
309 }
310 }
311
    /// Reinterprets `ptr` as a `LeafNode` and runs `initialize` on it.
    ///
    /// # Safety
    /// `ptr` must point to a writable allocation of at least `node_size`
    /// bytes that is properly aligned for `LeafNode`.
    unsafe fn init_node_with_fence(
        ptr: *mut u8,
        low_fence: &[u8],
        high_fence: &[u8],
        node_size: usize,
        next_level: MiniPageNextLevel,
        has_fence: bool,
        cache_only: bool,
    ) {
        let ptr = ptr as *mut Self;

        { &mut *ptr }.initialize(
            low_fence, high_fence, node_size, next_level, has_fence, cache_only,
        );
    }
327
    /// Shared view of the `index`-th meta slot; slots form a contiguous
    /// `LeafKVMeta` array at the start of the data area.
    pub(crate) fn get_kv_meta(&self, index: usize) -> &LeafKVMeta {
        debug_assert!(index < self.meta.meta_count_with_fence() as usize);
        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
        // SAFETY: `index` addresses a live slot (bounds-checked in debug builds).
        unsafe { &*meta_ptr.add(index) }
    }
333
334 pub(crate) fn get_full_key(&self, meta: &LeafKVMeta) -> Vec<u8> {
335 let prefix = self.get_prefix();
336 let remaining = self.get_remaining_key(meta);
337 [prefix, remaining].concat()
338 }
339
340 pub(crate) fn get_low_fence_full_key(&self) -> Vec<u8> {
342 debug_assert!(LOW_FENCE_IDX < self.meta.meta_count_with_fence() as usize);
343 let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
344 let meta = unsafe { &*meta_ptr.add(LOW_FENCE_IDX) };
345
346 let key_offset = meta.get_offset();
347 let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
348 unsafe {
349 let key = std::slice::from_raw_parts(key_ptr, (meta.get_key_len()) as usize);
350 [key].concat()
351 }
352 }
353
354 pub(crate) fn get_kv_meta_mut(&mut self, index: usize) -> &mut LeafKVMeta {
355 debug_assert!(index < self.meta.meta_count_with_fence() as usize);
356 let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
357 unsafe { meta_ptr.add(index).as_mut().unwrap() }
358 }
359
    /// Writes `meta` into slot `index` without reading or dropping whatever
    /// bytes were there before (slots are uninitialized until first written).
    pub(crate) fn write_initial_kv_meta(&mut self, index: usize, meta: LeafKVMeta) {
        let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
        // SAFETY: caller guarantees `index` addresses a valid slot position
        // inside this node's data area.
        unsafe { meta_ptr.add(index).write(meta) };
    }
364
    /// Key suffix stored for `meta` — the full key minus the node's shared
    /// prefix (only the suffix is stored; `get_key_len` counts the prefix).
    pub(crate) fn get_remaining_key(&self, meta: &LeafKVMeta) -> &[u8] {
        let key_offset = meta.get_offset();
        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
        // SAFETY: offset and length come from a live meta slot describing a
        // span inside this node's data area.
        unsafe {
            std::slice::from_raw_parts(key_ptr, (meta.get_key_len() - self.prefix_len) as usize)
        }
    }
372
    /// The shared key prefix, read from the low fence key's stored bytes
    /// (the low fence is stored in full, so its first `prefix_len` bytes
    /// are the prefix).
    ///
    /// NOTE(review): when the low fence is the infinite sentinel its offset
    /// is `u16::MAX - 1`; assumes `prefix_len` is 0 in that case so nothing
    /// is read — confirm.
    pub(crate) fn get_prefix(&self) -> &[u8] {
        let m = self.get_kv_meta(LOW_FENCE_IDX);
        let key_offset = m.get_offset();
        // SAFETY: the low fence slot's offset points at its stored key,
        // which is at least `prefix_len` bytes long.
        unsafe {
            std::slice::from_raw_parts(
                self.data.as_ptr().add(key_offset as usize),
                self.prefix_len as usize,
            )
        }
    }
383
    /// Installs `key` as a fence key in the next free meta slot.
    ///
    /// Must run on a fresh node, low fence first (slot 0) then high fence
    /// (slot 1) — enforced by the debug asserts. An empty `key` installs the
    /// corresponding infinity sentinel. The low fence is stored in full
    /// (prefix included) so `get_prefix` can read the shared prefix from it;
    /// the high fence is stored prefix-stripped.
    pub(crate) fn install_fence_key(&mut self, key: &[u8], is_high_fence: bool) {
        let loc: u16 = self.meta.meta_count_with_fence();
        if !is_high_fence {
            debug_assert!(self.meta.meta_count_with_fence() == LOW_FENCE_IDX as u16);
        } else {
            debug_assert!(self.meta.meta_count_with_fence() == HIGH_FENCE_IDX as u16);
        }

        let cur_low_offset = self.current_lowest_offset();

        self.meta.increment_value_count();
        self.meta.remaining_size -= std::mem::size_of::<LeafKVMeta>() as u16;

        if key.is_empty() {
            // Empty key = unbounded side: store an infinity sentinel with no
            // payload bytes.
            let fence = if is_high_fence {
                LeafKVMeta::make_infinite_high_fence_key()
            } else {
                LeafKVMeta::make_infinite_low_fence_key()
            };
            self.write_initial_kv_meta(loc as usize, fence);
        } else {
            // The low fence keeps its prefix (stored with prefix_len = 0).
            let prefix_len = if is_high_fence { self.prefix_len } else { 0 };
            let post_fix_len = key.len() as u16 - prefix_len;
            let val_len = 0u16;
            let kv_len = post_fix_len + val_len;

            let remaining = self.meta.remaining_size;

            debug_assert!(kv_len <= remaining);

            // Payloads grow downward from the end of the data area.
            let offset = cur_low_offset - kv_len;

            let new_meta =
                LeafKVMeta::make_prefixed_meta(offset, 0, key, prefix_len, OpType::Insert);

            self.write_initial_kv_meta(loc as usize, new_meta);

            // SAFETY: `offset .. offset + post_fix_len` lies inside the data
            // area (guarded by the debug_assert above).
            unsafe {
                let start_ptr = self.data.as_mut_ptr().add(offset as usize);

                let key_slice = std::slice::from_raw_parts(
                    key.as_ptr().add(prefix_len as usize),
                    post_fix_len as usize,
                );
                std::ptr::copy_nonoverlapping(key_slice.as_ptr(), start_ptr, post_fix_len as usize);
            }

            self.meta.remaining_size -= kv_len;
        }
    }
435
    /// Lowest byte offset currently occupied by any key/value payload —
    /// i.e. where the next payload can end (payloads grow downward).
    ///
    /// Derived arithmetically: data area = slot array + free space +
    /// payloads, so the payload region starts at `slots * slot_size + free`.
    pub(crate) fn current_lowest_offset(&self) -> u16 {
        let value_count = self.meta.meta_count_with_fence();
        let rt =
            (value_count * std::mem::size_of::<LeafKVMeta>() as u16) + self.meta.remaining_size;

        #[cfg(debug_assertions)]
        {
            // Cross-check the arithmetic against the actual minimum offset
            // over all non-sentinel slots.
            let mut min_offset = LeafNode::max_data_size(self.meta.node_size as usize) as u16;
            for i in 0..value_count {
                let kv_meta = self.get_kv_meta(i as usize);
                // Infinity fences store sentinel offsets, not real positions.
                if kv_meta.is_infinite_low_fence_key() || kv_meta.is_infinite_high_fence_key() {
                    continue;
                }
                min_offset = std::cmp::min(min_offset, kv_meta.offset);
            }
            assert!(min_offset == rt);
        }
        rt
    }
456
    /// Value bytes for `meta`; the value is stored immediately after the
    /// key suffix at the record's offset.
    pub(crate) fn get_value(&self, meta: &LeafKVMeta) -> &[u8] {
        // Skip past the stored key suffix (full key length minus prefix).
        let val_offset = meta.get_offset() + meta.get_key_len() - self.prefix_len;
        let val_ptr = unsafe { self.data.as_ptr().add(val_offset as usize) };
        // SAFETY: offset and lengths come from a live meta slot describing
        // a span inside this node's data area.
        unsafe { std::slice::from_raw_parts(val_ptr, meta.value_len() as usize) }
    }
462
463 pub fn get_split_key(&self, cache_only: bool) -> (Vec<u8>, u16) {
474 let mut data_size = 0;
475
476 for meta in self.meta_iter() {
477 let key_len = meta.get_key_len();
478 let value_len = meta.value_len();
479
480 data_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
481 }
482
483 let split_target_size = data_size / 2;
484 let mut cur_size = 0;
485
486 for (i, meta) in self.meta_iter().enumerate() {
487 let key_len = meta.get_key_len();
488 let value_len = meta.value_len();
489
490 cur_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
491
492 if cache_only && i == 0 {
494 continue;
495 }
496
497 if cur_size >= split_target_size {
498 return (self.get_full_key(meta), i as u16);
499 }
500 }
501 unreachable!();
502 }
503
504 #[allow(clippy::unused_enumerate_index)]
506 pub fn get_kv_num_below_key(&self, merge_split_key: &Vec<u8>) -> u16 {
507 let mut cnt: u16 = 0;
509 for (_, meta) in self.meta_iter().enumerate() {
510 let key = self.get_full_key(meta);
511 let cmp = key.cmp(merge_split_key);
512 if cmp == std::cmp::Ordering::Less {
515 cnt += 1;
516 } else {
517 break;
518 }
519 }
520 cnt
521 }
522
    /// Chooses a split key for a cache-only page that cannot fit a new
    /// record (`key`, `new_record_size` payload bytes).
    ///
    /// Walks the records in key order with the new record spliced in at its
    /// sorted position, tracking the first two candidate keys at which the
    /// accumulated left-side size crosses half of the merged total, and
    /// returns whichever yields the smaller left/right imbalance.
    /// NOTE(review): the imbalance math casts u16 sizes to i16; assumes
    /// node sizes keep these below i16::MAX — confirm.
    pub fn get_cache_only_insert_split_key(&self, key: &[u8], new_record_size: &u16) -> Vec<u8> {
        // Up to two candidate split keys and their left/right imbalance.
        let mut merge_split_key_1: Option<Vec<u8>> = None;
        let mut merge_split_key_2: Option<Vec<u8>> = None;
        let mut diff_1: i16 = i16::MAX;
        let mut diff_2: i16 = i16::MAX;

        let mut total_merged_size: u16 = 0;

        // Pass 1: total size once the new record is merged in.
        for meta in self.meta_iter() {
            let key_len = meta.get_key_len();
            let value_len = meta.value_len();

            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
        }

        total_merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;
        let split_target_size = total_merged_size / 2;

        // Pass 2: accumulate sizes in key order and record candidates.
        let mut merged_size: u16 = 0;
        let mut self_meta_iter = self.meta_iter();
        let mut self_meta_option = self_meta_iter.next();

        // 2a: existing records strictly below the new key.
        if self_meta_option.is_some() {
            let mut cur_base_meta = self_meta_option.unwrap();
            let mut cur_base_key = self.get_full_key(cur_base_meta);
            let mut cmp = cur_base_key.as_slice().cmp(key);

            while cmp == std::cmp::Ordering::Less {
                let key_len = cur_base_meta.get_key_len();
                let value_len = cur_base_meta.value_len();
                merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
                if merged_size >= split_target_size {
                    // Two candidates are enough; stop collecting.
                    if merge_split_key_2.is_some() {
                        break;
                    }

                    // Left side excludes the record that crossed the target:
                    // the split key itself goes to the right half.
                    let left_side: u16 = merged_size
                        - key_len
                        - value_len
                        - std::mem::size_of::<LeafKVMeta>() as u16;
                    let right_side = total_merged_size - left_side;

                    if merge_split_key_1.is_none() {
                        merge_split_key_1 = Some(cur_base_key);
                        diff_1 = (left_side as i16 - right_side as i16).abs();
                    } else {
                        merge_split_key_2 = Some(cur_base_key);
                        diff_2 = (left_side as i16 - right_side as i16).abs();
                    }
                }

                self_meta_option = self_meta_iter.next();
                if let Some(meta) = self_meta_option {
                    cur_base_meta = meta;
                    cur_base_key = self.get_full_key(cur_base_meta);
                    cmp = cur_base_key.as_slice().cmp(key);
                } else {
                    break;
                }
            }
        }

        // 2b: the new record itself as a candidate split point.
        if merge_split_key_2.is_none() {
            merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                let left_side: u16 =
                    merged_size - new_record_size - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(key.to_vec());
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(key.to_vec());
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
        }

        // 2c: remaining records at or above the new key.
        while self_meta_option.is_some() {
            let base_meta = self_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                let cur_base_key = self.get_full_key(base_meta);
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_base_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_base_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
            self_meta_option = self_meta_iter.next();
        }

        if merge_split_key_1.is_none() {
            panic!(
                "Fail to find a splitting key for merging mini and base page.{}, {}",
                merged_size, split_target_size
            );
        }

        // Prefer the candidate with the smaller imbalance.
        if merge_split_key_2.is_none() || diff_1 < diff_2 {
            return merge_split_key_1.unwrap();
        }

        merge_split_key_2.unwrap()
    }
660
    /// Picks a split key for merging `mini_page`'s dirty records into this
    /// base page when the combined content will not fit in one node.
    ///
    /// Pass 1 walks both pages in merged key order to compute the merged
    /// total size, remembering base records shadowed by an equal-keyed dirty
    /// mini record (`duplicate_positions`). Pass 2 repeats the walk to find
    /// up to two candidate keys where the left half crosses half the total,
    /// then the shadowed base records are tombstoned and the candidate with
    /// the smaller left/right imbalance is returned — adjusted so it can
    /// never equal the low or high fence.
    ///
    /// NOTE(review): if the best candidate equals the low fence and no
    /// second candidate was found, the `merge_split_key_2.unwrap()` below
    /// panics — confirm that combination is impossible.
    #[allow(clippy::unnecessary_unwrap)]
    pub(crate) fn get_merge_split_key(&mut self, mini_page: &LeafNode) -> Vec<u8> {
        let mut merge_split_key_1: Option<Vec<u8>> = None;
        let mut merge_split_key_2: Option<Vec<u8>> = None;
        let mut diff_1: i16 = i16::MAX;
        let mut diff_2: i16 = i16::MAX;

        let mut total_merged_size: u16 = 0;
        let mut base_meta_iter = self.meta_iter();
        let mut cur_pos = base_meta_iter.cur;
        let mut cur_base_meta_option = base_meta_iter.next();
        let mut duplicate_positions = vec![];

        // Pass 1: merged total size; collect base slots shadowed by dirty
        // mini records with the same key.
        for mini_meta in mini_page.meta_iter() {
            let c_type = mini_meta.op_type();
            // Clean cached records do not take part in the merge.
            if !c_type.is_dirty() {
                continue;
            }

            let cur_mini_key = mini_page.get_full_key(mini_meta);
            if cur_base_meta_option.is_some() {
                let mut cur_base_meta = cur_base_meta_option.unwrap();
                let mut cur_base_key = self.get_full_key(cur_base_meta);
                let mut cmp = cur_base_key.cmp(&cur_mini_key);

                // Consume base records below the current mini key.
                while cmp == std::cmp::Ordering::Less {
                    let key_len = cur_base_meta.get_key_len();
                    let value_len = cur_base_meta.value_len();
                    total_merged_size +=
                        key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

                    cur_pos = base_meta_iter.cur;
                    cur_base_meta_option = base_meta_iter.next();
                    if cur_base_meta_option.is_some() {
                        cur_base_meta = cur_base_meta_option.unwrap();
                        cur_base_key = self.get_full_key(cur_base_meta);
                        cmp = cur_base_key.cmp(&cur_mini_key);
                    } else {
                        break;
                    }
                }

                // Equal keys: the mini record wins; remember the base slot
                // so it can be tombstoned after the walk (its size is NOT
                // counted toward the merged total).
                if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
                    duplicate_positions.push(cur_pos);
                    cur_pos = base_meta_iter.cur;
                    cur_base_meta_option = base_meta_iter.next();
                }
            }

            let key_len = mini_meta.get_key_len();
            let value_len = mini_meta.value_len();
            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
        }

        // Remaining base records above every mini key.
        while cur_base_meta_option.is_some() {
            let base_meta = cur_base_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            cur_base_meta_option = base_meta_iter.next();
        }

        let split_target_size = total_merged_size / 2;

        // Pass 2: same merged walk, now recording candidate split keys.
        let mut merged_size: u16 = 0;
        base_meta_iter = self.meta_iter();
        cur_base_meta_option = base_meta_iter.next();

        for mini_meta in mini_page.meta_iter() {
            let c_type = mini_meta.op_type();
            if !c_type.is_dirty() {
                continue;
            }

            let cur_mini_key = mini_page.get_full_key(mini_meta);
            if cur_base_meta_option.is_some() {
                let mut cur_base_meta = cur_base_meta_option.unwrap();
                let mut cur_base_key = self.get_full_key(cur_base_meta);
                let mut cmp = cur_base_key.cmp(&cur_mini_key);
                while cmp == std::cmp::Ordering::Less {
                    let key_len = cur_base_meta.get_key_len();
                    let value_len = cur_base_meta.value_len();
                    merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
                    if merged_size >= split_target_size {
                        // Two candidates collected; stop recording.
                        if merge_split_key_2.is_some() {
                            break;
                        }

                        // The crossing record itself belongs to the right half.
                        let left_side: u16 = merged_size
                            - key_len
                            - value_len
                            - std::mem::size_of::<LeafKVMeta>() as u16;
                        let right_side = total_merged_size - left_side;

                        if merge_split_key_1.is_none() {
                            merge_split_key_1 = Some(cur_base_key);
                            diff_1 = (left_side as i16 - right_side as i16).abs();
                        } else {
                            merge_split_key_2 = Some(cur_base_key);
                            diff_2 = (left_side as i16 - right_side as i16).abs();
                        }
                    }

                    cur_base_meta_option = base_meta_iter.next();
                    if cur_base_meta_option.is_some() {
                        cur_base_meta = cur_base_meta_option.unwrap();
                        cur_base_key = self.get_full_key(cur_base_meta);
                        cmp = cur_base_key.cmp(&cur_mini_key);
                    } else {
                        break;
                    }
                }

                // Skip the shadowed (equal-keyed) base record, as in pass 1.
                if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
                    cur_base_meta_option = base_meta_iter.next();
                }
            }

            let key_len = mini_meta.get_key_len();
            let value_len = mini_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_mini_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_mini_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
        }

        // Trailing base records above every mini key.
        while cur_base_meta_option.is_some() {
            let base_meta = cur_base_meta_option.unwrap();
            let key_len = base_meta.get_key_len();
            let value_len = base_meta.value_len();
            merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;

            if merged_size >= split_target_size {
                let cur_base_key = self.get_full_key(base_meta);
                if merge_split_key_2.is_some() {
                    break;
                }

                let left_side: u16 =
                    merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
                let right_side = total_merged_size - left_side;

                if merge_split_key_1.is_none() {
                    merge_split_key_1 = Some(cur_base_key);
                    diff_1 = (left_side as i16 - right_side as i16).abs();
                } else {
                    merge_split_key_2 = Some(cur_base_key);
                    diff_2 = (left_side as i16 - right_side as i16).abs();
                }
            }
            cur_base_meta_option = base_meta_iter.next();
        }

        // Tombstone the base records shadowed by dirty mini records.
        for pos in duplicate_positions {
            let pos_meta = self.get_kv_meta_mut(pos);
            pos_meta.mark_as_deleted();
        }

        if merge_split_key_1.is_none() {
            unreachable!(
                "Fail to find a splitting key for merging mini and base page.{}, {}",
                merged_size, split_target_size
            );
        }

        if merge_split_key_2.is_some() {
            let cmp = merge_split_key_1
                .as_ref()
                .unwrap()
                .cmp(merge_split_key_2.as_ref().unwrap());
            assert_ne!(cmp, std::cmp::Ordering::Equal);
        }

        // Pick the candidate with the smaller imbalance.
        let mut splitting_key = if merge_split_key_2.is_none() || diff_1 < diff_2 {
            merge_split_key_1.as_ref().unwrap()
        } else {
            merge_split_key_2.as_ref().unwrap()
        };

        // A split key equal to the low fence would produce an empty left
        // node; fall back to the other candidate (see NOTE above).
        let mut fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
        if !fence_meta.is_infinite_low_fence_key() {
            let low_fence_key = self.get_low_fence_key();
            let cmp = splitting_key.cmp(&low_fence_key);
            if cmp == std::cmp::Ordering::Equal {
                splitting_key = merge_split_key_2.as_ref().unwrap();
            }
        }

        fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
        if !fence_meta.is_infinite_high_fence_key() {
            let high_fence_key = self.get_high_fence_key();
            let cmp = splitting_key.cmp(&high_fence_key);
            assert_ne!(cmp, std::cmp::Ordering::Equal);
        }

        splitting_key.clone()
    }
909
910 pub fn get_key_to_reach_this_node(&self) -> Vec<u8> {
911 let meta = self.get_kv_meta(self.first_meta_pos_after_fence() as usize);
912 self.get_full_key(meta)
913 }
914
    /// Best-effort routing key for a cache-only leaf.
    ///
    /// Unlike `get_key_to_reach_this_node`, every load here is
    /// bounds-checked and failure yields `TreeError::NeedRestart` instead
    /// of panicking — apparently intended for optimistic reads where the
    /// node may change underneath us (NOTE(review): confirm the
    /// concurrency model).
    pub fn try_get_key_to_reach_this_node(&self) -> Result<Vec<u8>, TreeError> {
        assert!(self.meta.is_cache_only_leaf());

        // Cache-only leaves have no fence slots, so the first record is 0.
        let index = 0;
        if index >= self.meta.meta_count_with_fence() as usize {
            return Err(TreeError::NeedRestart);
        }

        let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
        let meta = unsafe { &*meta_ptr.add(index) };

        let key_offset = meta.get_offset();
        let key_len = meta.get_key_len();

        // Reject an offset/length pair that would read past the data area
        // (possible if the slot was concurrently rewritten).
        if key_offset + key_len > LeafNode::max_data_size(self.meta.node_size as usize) as u16 {
            return Err(TreeError::NeedRestart);
        }

        let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
        // NOTE(review): returns only the stored suffix; assumes prefix_len
        // is 0 for cache-only leaves — confirm.
        let key_slice =
            unsafe { std::slice::from_raw_parts(key_ptr, (key_len - self.prefix_len) as usize) };
        Ok(key_slice.to_vec())
    }
942
    /// Low fence key as stored (full key, prefix included), or empty for
    /// the -infinity sentinel.
    pub fn get_low_fence_key(&self) -> Vec<u8> {
        assert!(self.has_fence());
        let fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
        if fence_meta.is_infinite_low_fence_key() {
            vec![]
        } else {
            // The low fence is stored un-truncated (see install_fence_key),
            // so read get_key_len() bytes directly instead of going through
            // get_full_key (which would re-prepend the prefix).
            unsafe {
                let start_ptr = self.data.as_ptr().add(fence_meta.offset as usize);
                let key_slice =
                    std::slice::from_raw_parts(start_ptr, fence_meta.get_key_len() as usize);
                key_slice.to_vec()
            }
        }
    }
957
958 pub fn get_high_fence_key(&self) -> Vec<u8> {
959 assert!(self.has_fence());
960 let fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
961 if fence_meta.is_infinite_high_fence_key() {
962 vec![]
963 } else {
964 self.get_full_key(fence_meta)
965 }
966 }
967
    /// Compares `meta`'s key against `key` (full key, prefix included),
    /// checking the inline preview bytes first so most mismatches avoid
    /// touching the stored payload.
    ///
    /// Panics (slice out of bounds) if `key` is shorter than the node
    /// prefix; callers must guarantee `key.len() >= prefix_len`.
    #[inline]
    pub(crate) fn key_cmp(&self, meta: &LeafKVMeta, key: &[u8]) -> Ordering {
        // Up to PREVIEW_SIZE post-prefix bytes of the search key.
        let search_key_prefix = &key[(self.prefix_len as usize)..]
            [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];

        let prefix_key = &meta.preview_bytes[..std::cmp::min(
            PREVIEW_SIZE,
            meta.get_key_len() as usize - self.prefix_len as usize,
        )];
        let mut cmp = prefix_key.cmp(search_key_prefix);

        // Preview tie: fall back to comparing the full stored suffix.
        if cmp == Ordering::Equal {
            let full_key = self.get_remaining_key(meta);
            let search_key_postfix = &key[self.prefix_len as usize..];
            cmp = full_key.cmp(search_key_postfix);
        }
        cmp
    }
989
    /// Linear-scan variant of `lower_bound`: index of the first record
    /// whose key is >= `key`, or `meta_count_with_fence()` if none.
    pub(crate) fn linear_lower_bound(&self, key: &[u8]) -> u16 {
        debug_assert!(key.len() >= self.prefix_len as usize);

        let mut index = self.first_meta_pos_after_fence();

        while index < self.meta.meta_count_with_fence() {
            let key_meta = self.get_kv_meta(index as usize);

            // NOTE(review): evicting the slot's cache line on every probe is
            // highly unusual in a read path — looks like a measurement or
            // persistent-memory experiment artifact; confirm it is intended,
            // as it defeats CPU caching and slows lookups considerably.
            #[cfg(target_arch = "x86_64")]
            unsafe {
                core::arch::x86_64::_mm_clflush(key_meta as *const LeafKVMeta as *const u8);
            }
            let cmp = self.key_cmp(key_meta, key);

            if cmp != Ordering::Less {
                return index;
            }

            index += 1;
        }

        index
    }
1014
    /// Binary search for the first slot whose key is >= `key`; returns its
    /// index, or `meta_count_with_fence()` when all keys are smaller.
    ///
    /// Compares the inline preview bytes first and only reads the stored
    /// suffix on a preview tie. Requires `key.len() >= prefix_len` (the
    /// slice below panics otherwise; also debug-asserted).
    pub(crate) fn lower_bound(&self, key: &[u8]) -> u16 {
        let mut lower = self.first_meta_pos_after_fence();
        let mut upper = self.meta.meta_count_with_fence();

        // Hoisted out of the loop: preview bytes of the search key.
        let search_key_prefix = &key[(self.prefix_len as usize)..]
            [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];

        debug_assert!(key.len() >= self.prefix_len as usize);

        while lower < upper {
            let mid = lower + (upper - lower) / 2;
            let key_meta = self.get_kv_meta(mid as usize);

            let prefix_key = &key_meta.preview_bytes[..std::cmp::min(
                PREVIEW_SIZE,
                key_meta.get_key_len() as usize - self.prefix_len as usize,
            )];
            let mut cmp = prefix_key.cmp(search_key_prefix);

            // Preview tie: compare the stored suffixes.
            if cmp == Ordering::Equal {
                let remaining_key = self.get_remaining_key(key_meta);
                let search_key_postfix = &key[self.prefix_len as usize..];
                cmp = remaining_key.cmp(search_key_postfix);
            }

            match cmp {
                Ordering::Greater => {
                    upper = mid;
                }
                Ordering::Equal => {
                    // Exact match: keys are unique, so return immediately.
                    return mid;
                }
                Ordering::Less => {
                    lower = mid + 1;
                }
            }
        }
        lower
    }
1055
    /// Inserts, updates, or (tomb)deletes `key` with the given op type.
    ///
    /// Returns `false` when the node lacks space for the operation (caller
    /// must split or consolidate first). Cases:
    /// 1. Key already present: `Delete` just marks the tombstone; otherwise
    ///    the value is overwritten in place when it fits, or the record's
    ///    payload is re-appended at a fresh offset (old bytes become dead
    ///    space until consolidation).
    /// 2. `Delete` of a key absent from a base page is a no-op success — a
    ///    base page has no lower level (`next_level` is null), so there is
    ///    nothing for a tombstone to mask.
    /// 3. Otherwise a new record is spliced in at its sorted slot position.
    pub(crate) fn insert(
        &mut self,
        key: &[u8],
        value: &[u8],
        op_type: OpType,
        max_fence_len: usize,
    ) -> bool {
        debug_assert!(key.len() as u16 >= self.prefix_len);
        match op_type {
            OpType::Insert | OpType::Cache => {
                // Present-key records must carry a value.
                debug_assert!(!value.is_empty());
            }
            OpType::Delete | OpType::Phantom => {}
        }

        // Only the post-prefix key suffix is stored.
        let post_fix_len = key.len() as u16 - self.prefix_len;
        let val_len = value.len() as u16;
        let kv_len = post_fix_len + val_len;

        let value_count_with_fence = self.meta.meta_count_with_fence();

        let pos = self.lower_bound(key) as usize;

        // Case 1: key already present at `pos`.
        if pos < value_count_with_fence as usize {
            let prefix_len = self.prefix_len as usize;
            let pos_meta = self.get_kv_meta(pos);
            let pos_key = self.get_remaining_key(pos_meta);
            let search_key_postfix = &key[prefix_len..];
            if pos_key.cmp(search_key_postfix) == Ordering::Equal {
                counter!(LeafInsertDuplicate);
                if op_type == OpType::Delete {
                    let pos_meta = self.get_kv_meta_mut(pos);
                    pos_meta.mark_as_deleted();
                    return true;
                }

                let pos_value = self.get_value(pos_meta);
                let pos_value_len = pos_value.len() as u16;

                // New value fits in the old slot: overwrite in place.
                if pos_value_len >= val_len {
                    // SAFETY: writing val_len <= pos_value_len bytes into the
                    // record's existing value region.
                    unsafe {
                        let pair_ptr = self.data.as_ptr().add(pos_meta.offset as usize) as *mut u8;
                        std::ptr::copy_nonoverlapping(
                            value.as_ptr(),
                            pair_ptr.add(post_fix_len as usize),
                            val_len as usize,
                        );
                    }
                    let pos_meta = self.get_kv_meta_mut(pos);
                    pos_meta.set_value_len(val_len);
                    pos_meta.set_op_type(op_type);
                    return true;
                }

                // Larger value: needs a fresh payload region.
                if self.meta.remaining_size < kv_len {
                    return false;
                }
                assert!(op_type != OpType::Cache);
                let offset = self.current_lowest_offset() - kv_len;
                // SAFETY: `offset .. offset + kv_len` lies in the free gap
                // between the slot array and the existing payloads.
                unsafe {
                    let pair_ptr = self.data.as_ptr().add(offset as usize) as *mut u8;

                    let pos_meta = self.get_kv_meta_mut(pos);
                    pos_meta.set_value_len(val_len);
                    pos_meta.set_op_type(op_type);
                    // Repoint the slot; the old payload becomes dead space.
                    pos_meta.offset = offset;
                    std::ptr::copy_nonoverlapping(
                        key[self.prefix_len as usize..].as_ptr(),
                        pair_ptr,
                        post_fix_len as usize,
                    );
                    std::ptr::copy_nonoverlapping(
                        value.as_ptr(),
                        pair_ptr.add(post_fix_len as usize),
                        val_len as usize,
                    );
                }
                self.meta.remaining_size -= kv_len;
                return true;
            }
        }

        // Case 2: deleting a key a base page does not have needs no tombstone.
        if op_type == OpType::Delete && (self.is_base_page()) {
            return true;
        }

        // Case 3: new record; reject when it (plus fence headroom) won't fit.
        if self.full_with_fences(
            kv_len + std::mem::size_of::<LeafKVMeta>() as u16,
            max_fence_len,
        ) {
            return false;
        }

        counter!(LeafInsertNew);

        let offset = self.current_lowest_offset() - kv_len;
        let new_meta =
            LeafKVMeta::make_prefixed_meta(offset, val_len, key, self.prefix_len, op_type);

        // SAFETY: shifts the slot array tail one slot right (regions may
        // overlap, hence `copy`), then writes the new slot and its payload
        // into space the size checks above reserved.
        unsafe {
            let metas_size = std::mem::size_of::<LeafKVMeta>()
                * (self.meta.meta_count_with_fence() - pos as u16) as usize;
            let src_ptr = self
                .data
                .as_ptr()
                .add(pos * std::mem::size_of::<LeafKVMeta>());
            let dest_ptr = self
                .data
                .as_mut_ptr()
                .add((pos + 1) * std::mem::size_of::<LeafKVMeta>());
            std::ptr::copy(src_ptr, dest_ptr, metas_size);

            self.write_initial_kv_meta(pos, new_meta);

            let pair_ptr = self.data.as_mut_ptr().add(offset as usize);
            std::ptr::copy_nonoverlapping(
                key[self.prefix_len as usize..].as_ptr(),
                pair_ptr,
                post_fix_len as usize,
            );
            std::ptr::copy_nonoverlapping(
                value.as_ptr(),
                pair_ptr.add(post_fix_len as usize),
                val_len as usize,
            );
        }

        self.meta.remaining_size -= kv_len + std::mem::size_of::<LeafKVMeta>() as u16;
        self.meta.increment_value_count();
        true
    }
1201
1202 pub(crate) fn is_base_page(&self) -> bool {
1204 (!self.meta.is_cache_only_leaf()) && self.next_level.is_null()
1205 }
1206
    /// Splits this leaf in half, moving the upper portion of its records into
    /// `sibling`, and returns the splitting key (first key owned by the sibling).
    ///
    /// `cache_only` selects between the two leaf flavors: cache-only leaves
    /// carry no fence keys, base pages must have them.
    pub fn split(&mut self, sibling: &mut LeafNode, cache_only: bool) -> Vec<u8> {
        if cache_only {
            assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
        } else {
            assert!(self.is_base_page() && self.has_fence());
        }

        let current_count = self.meta.meta_count_without_fence();

        // `new_cur_count` records stay in `self`; the rest move to the sibling.
        let (splitting_key, new_cur_count) = self.get_split_key(cache_only);
        let sibling_cnt = current_count - new_cur_count;

        if !cache_only {
            // The sibling's low fence is the first record it will own;
            // its high fence is inherited from this node.
            let low_fence_for_right = self.get_kv_meta(FENCE_KEY_CNT + new_cur_count as usize);
            assert!(!low_fence_for_right.is_infinite_low_fence_key());
            let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);

            let low_fence_key = self.get_full_key(low_fence_for_right);
            // An infinite high fence is represented by an empty key.
            let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
                vec![]
            } else {
                self.get_full_key(high_fence_for_right)
            };

            sibling.initialize(
                &low_fence_key,
                &high_fence_key,
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                true,
                cache_only,
            );
        } else {
            // Cache-only siblings are fence-less.
            sibling.initialize(
                &[],
                &[],
                sibling.meta.node_size as usize,
                MiniPageNextLevel::new_null(),
                false,
                cache_only,
            );
        }

        let starting_kv_idx = if cache_only { 0 } else { FENCE_KEY_CNT };

        // Copy the upper records into the sibling, dropping tombstones.
        for i in 0..sibling_cnt {
            let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_idx);
            if kv_meta.op_type() == OpType::Delete {
                continue;
            }
            let key = self.get_full_key(kv_meta);
            let value = self.get_value(kv_meta);
            let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
            assert!(insert_rt);
        }

        // Truncate this node's record count (fence slots are counted for
        // fenced nodes), then rebuild it with the split key as new high fence.
        if !cache_only {
            self.meta
                .set_value_count(new_cur_count + FENCE_KEY_CNT as u16);
        } else {
            self.meta.set_value_count(new_cur_count);
        }

        self.consolidate_after_split(&splitting_key);

        splitting_key
    }
1285
1286 pub fn split_with_key(
1291 &mut self,
1292 sibling: &mut LeafNode,
1293 merge_split_key: &Vec<u8>,
1294 cache_only: bool,
1295 ) {
1296 if cache_only {
1297 assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
1298 } else {
1299 assert!(self.is_base_page() && self.has_fence());
1300 }
1301
1302 let current_count = self.meta.meta_count_without_fence();
1303 let mut new_cur_count = self.get_kv_num_below_key(merge_split_key);
1308 let sibling_cnt = current_count - new_cur_count;
1309
1310 if !cache_only {
1314 let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);
1315
1316 let low_fence_key = merge_split_key.clone();
1317 let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
1318 vec![]
1319 } else {
1320 self.get_full_key(high_fence_for_right)
1321 };
1322
1323 sibling.initialize(
1324 &low_fence_key,
1325 &high_fence_key,
1326 sibling.meta.node_size as usize,
1327 MiniPageNextLevel::new_null(),
1328 true,
1329 cache_only,
1330 );
1331 } else {
1332 sibling.initialize(
1333 &[],
1334 &[],
1335 sibling.meta.node_size as usize,
1336 MiniPageNextLevel::new_null(),
1337 false,
1338 cache_only,
1339 );
1340 }
1341
1342 let starting_kv_index = if cache_only { 0 } else { FENCE_KEY_CNT };
1343
1344 for i in 0..sibling_cnt {
1345 let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_index);
1346 if kv_meta.op_type() == OpType::Delete {
1347 continue;
1349 }
1350 let key = self.get_full_key(kv_meta);
1351 let value = self.get_value(kv_meta);
1352 let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
1353 assert!(insert_rt);
1354 }
1355
1356 if !cache_only {
1357 new_cur_count += FENCE_KEY_CNT as u16;
1358 }
1359
1360 self.meta.set_value_count(new_cur_count);
1361 self.consolidate_after_split(merge_split_key);
1362
1363 if !cache_only {
1364 let left_high_fence = self.get_kv_meta(HIGH_FENCE_IDX);
1367 let left_high_fence_key = self.get_full_key(left_high_fence);
1368 let right_low_fence_key = sibling.get_low_fence_full_key();
1369
1370 assert_eq!(left_high_fence_key, *merge_split_key); assert_eq!(right_low_fence_key, *merge_split_key);
1372 }
1373 }
1374
1375 pub(crate) fn consolidate_inner(
1376 &mut self,
1377 new_optype: OpType,
1378 new_high_fence: Option<&[u8]>,
1379 skip_tombstone: bool,
1380 cache_only: bool,
1381 skip_key: Option<&[u8]>,
1382 ) {
1383 let mut pairs = Vec::new();
1384
1385 for meta in self.meta_iter() {
1386 if skip_tombstone && meta.op_type() == OpType::Delete {
1387 continue;
1389 }
1390
1391 match skip_key {
1392 Some(s_k) => {
1394 let k = self.get_full_key(meta);
1395 let cmp = s_k.cmp(&k);
1396
1397 if cmp != Ordering::Equal {
1398 pairs.push((k, self.get_value(meta).to_owned(), meta.op_type()));
1399 }
1400 }
1401 None => {
1402 pairs.push((
1403 self.get_full_key(meta),
1404 self.get_value(meta).to_owned(),
1405 meta.op_type(),
1406 ));
1407 }
1408 }
1409 }
1410
1411 let has_fence = self.has_fence();
1412 let (low_fence, high_fence) = if has_fence {
1413 let high_fence_key = match new_high_fence {
1414 None => self.get_high_fence_key(),
1415 Some(key) => key.to_vec(),
1416 };
1417 (self.get_low_fence_key(), high_fence_key)
1418 } else {
1419 (vec![], vec![])
1420 };
1421
1422 let node_size = self.meta.node_size;
1423
1424 self.initialize(
1425 &low_fence,
1426 &high_fence,
1427 node_size as usize,
1428 self.next_level,
1429 has_fence,
1430 cache_only,
1431 );
1432
1433 for (key, value, op_type) in pairs {
1434 let rt = if op_type == OpType::Delete {
1435 self.insert(&key, &value, OpType::Delete, 0)
1436 } else {
1437 self.insert(&key, &value, new_optype, 0)
1438 };
1439 assert!(rt);
1440 }
1441 }
1442
1443 #[allow(dead_code)]
1444 pub(crate) fn consolidate(&mut self) {
1445 self.consolidate_inner(
1446 OpType::Insert,
1447 None,
1448 true,
1449 self.meta.is_cache_only_leaf(),
1450 None,
1451 );
1452 }
1453
1454 #[allow(dead_code)]
1457 pub(crate) fn consolidate_after_merge(&mut self) {
1458 assert!(!self.is_base_page());
1459 self.consolidate_inner(
1460 OpType::Cache,
1461 None,
1462 false,
1463 self.meta.is_cache_only_leaf(),
1464 None,
1465 );
1466 }
1467
1468 fn consolidate_after_split(&mut self, high_fence: &[u8]) {
1474 assert!(self.meta.is_cache_only_leaf() || self.is_base_page());
1475 self.consolidate_inner(
1476 OpType::Insert,
1477 Some(high_fence),
1478 true,
1479 self.meta.is_cache_only_leaf(),
1480 None,
1481 );
1482 }
1483
1484 pub(crate) fn consolidate_skip_key(&mut self, key: &[u8]) {
1488 assert!(!self.is_base_page());
1489 assert!(self.meta.is_cache_only_leaf());
1490 self.consolidate_inner(
1491 OpType::Insert,
1492 None,
1493 true,
1494 self.meta.is_cache_only_leaf(),
1495 Some(key),
1496 );
1497 }
1498
1499 pub(crate) fn copy_initialize_to(
1501 &self,
1502 dst_node: *mut LeafNode,
1503 dst_size: usize,
1504 discard_cold_cache: bool,
1505 ) {
1506 assert!(!self.is_base_page());
1507 assert!(self.meta.node_size as usize <= dst_size);
1508 let dst_ref = unsafe { &mut *dst_node };
1509 let empty = vec![];
1510
1511 dst_ref.initialize(
1512 &empty,
1513 &empty,
1514 dst_size,
1515 self.next_level,
1516 false, self.meta.is_cache_only_leaf(),
1518 );
1519
1520 for meta in self.meta_iter() {
1521 let op = meta.op_type();
1522
1523 if discard_cold_cache && op.is_cache() && !meta.is_referenced() {
1528 continue;
1529 }
1530
1531 let value = self.get_value(meta);
1532 let rt = dst_ref.insert(&self.get_full_key(meta), value, op, 0);
1533 assert!(rt);
1534 }
1535 }
1536
1537 pub(crate) fn initialize(
1539 &mut self,
1540 low_fence: &[u8],
1541 high_fence: &[u8],
1542 node_size: usize,
1543 next_level: MiniPageNextLevel,
1544 has_fence: bool,
1545 cache_only: bool,
1546 ) {
1547 if !has_fence {
1548 assert!(low_fence.is_empty());
1549 assert!(high_fence.is_empty());
1550
1551 self.meta = NodeMeta::new(
1552 LeafNode::max_data_size(node_size) as u16,
1553 false,
1554 false,
1555 node_size as u16,
1556 cache_only,
1557 );
1558 self.prefix_len = 0;
1559 self.next_level = next_level;
1560 } else {
1561 self.meta = NodeMeta::new(
1562 LeafNode::max_data_size(node_size) as u16, false,
1564 true,
1565 node_size as u16,
1566 cache_only,
1567 );
1568
1569 self.prefix_len = common_prefix_len(low_fence, high_fence);
1570 self.next_level = next_level;
1571 self.install_fence_key(low_fence, false);
1572 self.install_fence_key(high_fence, true);
1573 }
1574 }
1575
    /// Marks this node as undergoing a split (delegates to the node metadata).
    pub(crate) fn set_split_flag(&mut self) {
        self.meta.set_split_flag();
    }
1579
    /// Returns whether the split flag is set on this node's metadata.
    pub(crate) fn get_split_flag(&self) -> bool {
        self.meta.get_split_flag()
    }
1583
    /// Point lookup using binary search; copies the value into `out_buffer`
    /// on a hit. See [`Self::read_by_key_inner`] for the result semantics.
    pub(crate) fn read_by_key(&self, search_key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
        self.read_by_key_inner(search_key, out_buffer, true)
    }
1587
1588 #[must_use]
1590 pub(crate) fn read_by_key_inner(
1591 &self,
1592 search_key: &[u8],
1593 out_buffer: &mut [u8],
1594 binary_search: bool,
1595 ) -> LeafReadResult {
1596 let val_count = self.meta.meta_count_with_fence();
1597 let pos = if binary_search {
1598 self.lower_bound(search_key)
1599 } else {
1600 self.linear_lower_bound(search_key)
1601 };
1602
1603 if pos >= val_count {
1604 counter!(LeafNotFoundDueToRange);
1605 return LeafReadResult::NotFound;
1606 }
1607
1608 let kv_meta = self.get_kv_meta(pos as usize);
1609 let target_key = self.get_remaining_key(kv_meta);
1610
1611 if !kv_meta.is_referenced() {
1613 kv_meta.mark_as_ref();
1614 }
1615
1616 let input_post_key = &search_key[self.prefix_len as usize..];
1617 let cmp = target_key.cmp(input_post_key);
1618
1619 if cmp != Ordering::Equal {
1620 counter!(LeafNotFoundDueToKey);
1621 LeafReadResult::NotFound
1622 } else {
1623 if kv_meta.op_type().is_absent() {
1624 return LeafReadResult::Deleted;
1625 }
1626 let val_len = kv_meta.value_len();
1627 let val_ref = self.get_value(kv_meta);
1628 debug_assert_eq!(val_len as usize, val_ref.len());
1629 out_buffer[..val_len as usize].copy_from_slice(val_ref);
1630 LeafReadResult::Found(val_len as u32)
1631 }
1632 }
1633
1634 pub(crate) fn get_chain_size_hint(
1639 key_len: usize,
1640 value_len: usize,
1641 page_classes: &[usize],
1642 cache_only: bool,
1643 ) -> usize {
1644 let mut initial_record_size = key_len + value_len + std::mem::size_of::<LeafKVMeta>();
1645 initial_record_size += std::mem::size_of::<LeafNode>();
1646
1647 if let Some(s) = page_classes[0..(page_classes.len() - 1)]
1648 .iter()
1649 .position(|x| initial_record_size < *x)
1650 {
1651 return page_classes[s];
1652 } else if cache_only && initial_record_size <= page_classes[page_classes.len() - 1] {
1653 return page_classes[page_classes.len() - 1];
1654 }
1655
1656 panic!(
1657 "Record size {} plus metadata exceeds the max mini-page size {:?}",
1658 initial_record_size, page_classes
1659 );
1660 }
1661
1662 pub(crate) fn new_size_if_upgrade(
1668 &self,
1669 incoming_size: usize,
1670 page_classes: &[usize],
1671 cache_only: bool,
1672 ) -> Option<usize> {
1673 let cur_size = self.meta.node_size as usize;
1674 let request_size = cur_size + incoming_size;
1675
1676 if let Some(s) = page_classes[0..(page_classes.len() - 1)]
1677 .iter()
1678 .position(|x| request_size < *x)
1679 {
1680 if s == 0 {
1681 panic!("Should not be here");
1682 }
1683 return Some(page_classes[s]);
1684 } else if cache_only && request_size <= page_classes[page_classes.len() - 1] {
1685 return Some(page_classes[page_classes.len() - 1]);
1686 }
1687
1688 None
1689 }
1690
1691 pub(crate) fn free_base_page(node: *mut LeafNode) {
1693 assert!(unsafe { &*node }.is_base_page());
1694 let node_size = unsafe { &*node }.meta.node_size as usize;
1695 let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
1696 unsafe {
1697 std::alloc::dealloc(node as *mut u8, layout);
1698 }
1699 }
1700
1701 pub(crate) fn need_actually_merge_to_disk(&self) -> bool {
1702 for meta in self.meta_iter() {
1703 if meta.op_type().is_dirty() {
1704 return true;
1705 }
1706 }
1707
1708 false
1709 }
1710
1711 fn estimate_merge_size(&self) -> usize {
1712 let mut required_size = 0;
1713
1714 for meta in self.meta_iter() {
1715 if meta.op_type() == OpType::Insert {
1716 required_size += (meta.get_key_len() + meta.value_len()) as usize
1717 + std::mem::size_of::<LeafKVMeta>();
1718 }
1719 }
1720
1721 required_size
1722 }
1723
1724 pub(crate) fn full_with_fences(&self, requested_size: u16, max_fence_len: usize) -> bool {
1727 if self.meta.remaining_size < requested_size {
1728 return true;
1729 }
1730
1731 if self.meta.has_fence() {
1732 let mut empty_data_size = self.meta.remaining_size; let low_key_meta = self.get_kv_meta(LOW_FENCE_IDX);
1735 if !low_key_meta.is_infinite_low_fence_key() {
1736 empty_data_size += low_key_meta.get_key_len();
1737 }
1738
1739 let high_key_meta = self.get_kv_meta(HIGH_FENCE_IDX);
1740 if !high_key_meta.is_infinite_high_fence_key() {
1741 empty_data_size += high_key_meta.get_key_len();
1742 }
1743
1744 if empty_data_size >= requested_size + max_fence_len as u16 {
1745 return false;
1746 }
1747
1748 return true;
1749 }
1750
1751 false
1752 }
1753
1754 pub(crate) fn merge_mini_page(&mut self, mini_page: &LeafNode, max_fence_len: usize) -> bool {
1755 let size_required = mini_page.estimate_merge_size();
1756
1757 if self.full_with_fences(size_required as u16, max_fence_len) {
1758 return false;
1759 }
1760
1761 for meta in mini_page.meta_iter() {
1762 let c_key = mini_page.get_full_key(meta);
1763 let c_type = meta.op_type();
1764 if !c_type.is_dirty() {
1765 continue;
1770 }
1771 let c_value = mini_page.get_value(meta);
1772 let rt = self.insert(&c_key, c_value, c_type, 0);
1773 assert!(rt);
1774 }
1775
1776 true
1777 }
1778
1779 pub(crate) fn get_stats(&self) -> LeafStats {
1780 let mut keys = Vec::new();
1781 let mut values = Vec::new();
1782 let mut op_types = Vec::new();
1783 let node_size = self.meta.node_size as usize;
1784 let prefix = self.get_prefix().to_owned();
1785
1786 for meta in self.meta_iter() {
1787 let key = self.get_full_key(meta);
1788 let value = self.get_value(meta);
1789 keys.push(key);
1790 values.push(value.to_owned());
1791 op_types.push(meta.op_type());
1792 }
1793
1794 LeafStats {
1795 keys,
1796 values,
1797 op_types,
1798 prefix,
1799 base_node: None,
1800 next_level: self.next_level,
1801 node_size,
1802 }
1803 }
1804
    /// Returns whether this node carries fence keys (delegates to metadata).
    pub(crate) fn has_fence(&self) -> bool {
        self.meta.has_fence()
    }
1808
    /// Iterates over the KV metadata of user records, skipping fence slots.
    pub(crate) fn meta_iter(&self) -> LeafMetaIter<'_> {
        LeafMetaIter::new(self)
    }
1813
1814 fn first_meta_pos_after_fence(&self) -> u16 {
1815 if self.has_fence() {
1816 FENCE_KEY_CNT as u16
1817 } else {
1818 0
1819 }
1820 }
1821
1822 pub(crate) fn get_record_by_pos_with_bound(
1823 &self,
1824 pos: u32,
1825 out_buffer: &mut [u8],
1826 return_field: ScanReturnField,
1827 bound_key: &Option<Vec<u8>>,
1828 ) -> GetScanRecordByPosResult {
1829 if pos >= self.meta.meta_count_with_fence() as u32 {
1830 return GetScanRecordByPosResult::EndOfLeaf;
1831 }
1832
1833 let meta = self.get_kv_meta(pos as usize);
1834
1835 if meta.op_type().is_absent() {
1836 return GetScanRecordByPosResult::Deleted;
1837 }
1838
1839 match return_field {
1840 ScanReturnField::Value => {
1841 if let Some(bk) = bound_key {
1842 let cmp = self.get_full_key(meta).as_slice().cmp(bk);
1843 if cmp == Ordering::Greater {
1844 return GetScanRecordByPosResult::BoundKeyExceeded;
1845 }
1846 }
1847
1848 let value = self.get_value(meta);
1849 let value_len = meta.value_len() as usize;
1850 out_buffer[..value_len].copy_from_slice(value);
1851 GetScanRecordByPosResult::Found(0, value_len as u32)
1852 }
1853 ScanReturnField::Key => {
1854 let full_key = self.get_full_key(meta);
1855
1856 if let Some(bk) = bound_key {
1857 let cmp = full_key.as_slice().cmp(bk);
1858 if cmp == Ordering::Greater {
1859 return GetScanRecordByPosResult::BoundKeyExceeded;
1860 }
1861 }
1862
1863 let key_len = full_key.len();
1864 out_buffer[..key_len].copy_from_slice(&full_key);
1865 GetScanRecordByPosResult::Found(key_len as u32, 0)
1866 }
1867 ScanReturnField::KeyAndValue => {
1868 let full_key = self.get_full_key(meta);
1869
1870 if let Some(bk) = bound_key {
1871 let cmp = full_key.as_slice().cmp(bk);
1872 if cmp == Ordering::Greater {
1873 return GetScanRecordByPosResult::BoundKeyExceeded;
1874 }
1875 }
1876
1877 let key_len = full_key.len();
1878 let value = self.get_value(meta);
1879 let value_len = meta.value_len() as usize;
1880
1881 out_buffer[..key_len].copy_from_slice(&full_key);
1882 out_buffer[key_len..key_len + value_len].copy_from_slice(value);
1883
1884 GetScanRecordByPosResult::Found(key_len as u32, value_len as u32)
1885 }
1886 }
1887 }
1888}
1889
/// Iterator over a leaf's user-record metadata slots, skipping fence keys.
pub(crate) struct LeafMetaIter<'a> {
    // The leaf being iterated.
    node: &'a LeafNode,
    // Index of the next metadata slot to yield.
    cur: usize,
}
1894
1895impl LeafMetaIter<'_> {
1896 fn new(leaf: &LeafNode) -> LeafMetaIter<'_> {
1897 let cur = leaf.first_meta_pos_after_fence();
1898 LeafMetaIter {
1899 node: leaf,
1900 cur: cur as usize,
1901 }
1902 }
1903}
1904
1905impl<'a> Iterator for LeafMetaIter<'a> {
1906 type Item = &'a LeafKVMeta;
1907
1908 fn next(&mut self) -> Option<Self::Item> {
1909 if self.cur < self.node.meta.meta_count_with_fence() as usize {
1910 let rt = self.node.get_kv_meta(self.cur);
1911 self.cur += 1;
1912 Some(rt)
1913 } else {
1914 None
1915 }
1916 }
1917}
1918
/// Outcome of `LeafNode::get_record_by_pos_with_bound`.
pub(crate) enum GetScanRecordByPosResult {
    /// The requested position is past the last record of this leaf.
    EndOfLeaf,
    /// The record at this position is absent (Delete/Phantom op type).
    Deleted,
    /// Record copied into the output buffer: `(key_len, value_len)` bytes.
    Found(u32, u32),
    /// The record's full key exceeds the supplied bound key.
    BoundKeyExceeded,
}
1925
/// Outcome of a point lookup on a leaf node.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum LeafReadResult {
    /// The key matched but its latest op marks it absent (tombstone).
    Deleted,
    /// The key is not present in this leaf.
    NotFound,
    /// Value copied into the caller's buffer; payload is its length in bytes.
    Found(u32),
    /// The supplied key was rejected as invalid.
    /// NOTE(review): not produced in this portion of the file — confirm callers.
    InvalidKey,
}
1933
#[cfg(test)]
mod tests {
    use super::*;
    use bytemuck::cast_slice;
    use rstest::rstest;

    // The discriminant values are part of the on-page format; pin them.
    #[test]
    fn test_op_type_enum() {
        assert_eq!(OpType::Insert as u8, 0);
        assert_eq!(OpType::Delete as u8, 1);
        assert_eq!(OpType::Cache as u8, 2);
    }

    // A prefixed meta stores the full key length but previews the bytes
    // that follow the prefix.
    #[test]
    fn test_make_prefixed_meta() {
        let key = [1, 2, 3, 4, 5];
        let prefix_len = 2;
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &key, prefix_len, OpType::Insert);

        assert_eq!(meta.offset, 10);
        assert_eq!(meta.get_key_len(), 5);
        assert_eq!(meta.value_len(), 20);
        // Preview starts after the 2-byte prefix.
        assert_eq!(meta.preview_bytes, [3, 4]);
        assert_eq!(meta.op_type(), OpType::Insert);
        assert!(!meta.is_referenced());
    }

    // Infinite fences are encoded via sentinel offsets (u16::MAX family).
    #[test]
    fn test_make_infinite_high_fence_key() {
        let meta = LeafKVMeta::make_infinite_high_fence_key();
        assert_eq!(meta.offset, u16::MAX);
        assert_eq!(meta.get_key_len(), 0);
        assert_eq!(meta.value_len(), 0);
        assert!(meta.is_infinite_high_fence_key());
    }

    #[test]
    fn test_make_infinite_low_fence_key() {
        let meta = LeafKVMeta::make_infinite_low_fence_key();
        assert_eq!(meta.offset, u16::MAX - 1);
        assert_eq!(meta.get_key_len(), 0);
        assert_eq!(meta.value_len(), 0);
        assert!(meta.is_infinite_low_fence_key());
    }

    #[test]
    fn test_value_len() {
        let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert_eq!(meta.value_len(), 20);

        meta.set_value_len(30);
        assert_eq!(meta.value_len(), 30);
    }

    #[test]
    fn test_op_type() {
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Delete);
        assert_eq!(meta.op_type(), OpType::Delete);
    }

    // The reference bit uses interior mutability (&self methods suffice).
    #[test]
    fn test_mark_as_ref_and_clear_ref() {
        let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert!(!meta.is_referenced());

        meta.mark_as_ref();
        assert!(meta.is_referenced());

        meta.clear_ref();
        assert!(!meta.is_referenced());
    }

    #[test]
    fn test_mark_as_deleted_and_is_deleted() {
        let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
        assert!(!meta.is_deleted());

        meta.mark_as_deleted();
        assert!(meta.is_deleted());
    }

    // Regardless of which page holds the larger key, the merge-split key
    // should be the separator between the two key sets.
    #[rstest]
    #[case(vec![1], vec![2], 2)]
    #[case(vec![2], vec![1], 2)]
    fn test_get_merge_split_key(
        #[case] base_page_values: Vec<usize>,
        #[case] mini_page_values: Vec<usize>,
        #[case] splitting_key: usize,
    ) {
        let base = unsafe { &mut *LeafNode::make_base_page(4096) };
        let mini = unsafe { &mut *LeafNode::make_base_page(4096) };
        for i in 0..base_page_values.len() {
            let n = &base_page_values[i];
            // Reinterpret the usize as its raw bytes for use as key and value.
            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
            let key = cast_slice::<usize, u8>(n_slice);
            let value = cast_slice::<usize, u8>(n_slice);

            let rt = base.insert(key, value, OpType::Insert, 2);
            assert!(rt);
        }

        for i in 0..mini_page_values.len() {
            let n = &mini_page_values[i];
            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
            let key = cast_slice::<usize, u8>(n_slice);
            let value = cast_slice::<usize, u8>(n_slice);

            let rt = mini.insert(key, value, OpType::Insert, 2);
            assert!(rt);
        }

        let merge_split_key_byte = base.get_merge_split_key(mini);
        let merge_splitting_key = cast_slice::<u8, usize>(&merge_split_key_byte);

        assert_eq!(merge_splitting_key[0], splitting_key);

        LeafNode::free_base_page(base);
        LeafNode::free_base_page(mini);
    }

    // After split_with_key, records below the split key stay on the base
    // page and records at or above it live on the sibling.
    #[rstest]
    #[case(vec![1, 2, 3, 4], 3)]
    #[case(vec![1], 2)]
    #[case(vec![2], 2)]
    fn test_split_with_key(#[case] base_page_values: Vec<usize>, #[case] splitting_key: usize) {
        let base = unsafe { &mut *LeafNode::make_base_page(4096) };
        let sibling = unsafe { &mut *LeafNode::make_base_page(4096) };

        for i in 0..base_page_values.len() {
            let n = &base_page_values[i];
            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
            let key = cast_slice::<usize, u8>(n_slice);
            let value = cast_slice::<usize, u8>(n_slice);

            let rt = base.insert(key, value, OpType::Insert, 2);
            assert!(rt);
        }

        let splitting_key_ptr = &splitting_key;
        let splitting_key_slice =
            unsafe { std::slice::from_raw_parts(splitting_key_ptr as *const usize, 1) };
        let splitting_key_byte_arrary = cast_slice::<usize, u8>(splitting_key_slice).to_vec();

        base.split_with_key(sibling, &splitting_key_byte_arrary, false);

        let mut out_buffer = vec![0u8; 1024];
        for i in 0..base_page_values.len() {
            // Pick which page should now own this key.
            let mut page = &mut *base;
            if base_page_values[i] >= splitting_key {
                page = &mut *sibling;
            }

            let n = &base_page_values[i];
            let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
            let key = cast_slice::<usize, u8>(n_slice);

            let rt = page.read_by_key(key, &mut out_buffer);

            // Value was inserted equal to the key bytes.
            assert_eq!(rt, LeafReadResult::Found(key.len() as u32));
            assert_eq!(&out_buffer[0..key.len()], key);
        }

        LeafNode::free_base_page(base);
        LeafNode::free_base_page(sibling);
    }
}