1use core::panic;
5use std::{alloc::Layout, cmp::Ordering, sync::atomic};
6
7use crate::{
8 circular_buffer::CircularBufferPtr, counter, error::TreeError, range_scan::ScanReturnField,
9 snapshot::CPRSnapShotMgr, snapshot::INVALID_SNAPSHOT_VERSION, utils::stats::LeafStats,
10};
11
12use super::{node_meta::NodeMeta, FENCE_KEY_CNT};
13
14#[repr(u8)]
17#[derive(PartialEq, Eq, Debug, Clone, Copy)]
18pub(crate) enum OpType {
19 Insert = 0,
20 Delete = 1,
21 Cache = 2, Phantom = 3, }
24
25impl OpType {
26 pub(crate) fn is_dirty(&self) -> bool {
27 match self {
28 OpType::Insert | OpType::Delete => true,
29 OpType::Cache | OpType::Phantom => false,
30 }
31 }
32
33 fn is_absent(&self) -> bool {
34 match self {
35 OpType::Insert | OpType::Cache => false,
36 OpType::Phantom | OpType::Delete => true,
37 }
38 }
39
40 fn is_cache(&self) -> bool {
41 match self {
42 OpType::Cache | OpType::Phantom => true,
43 OpType::Insert | OpType::Delete => false,
44 }
45 }
46}
47
48const PREVIEW_SIZE: usize = 2;
49
50const LOW_FENCE_IDX: usize = 0;
51const HIGH_FENCE_IDX: usize = 1;
52
53const OP_TYPE_SHIFT: u16 = 14;
54const KEY_LEN_MASK: u16 = 0x3F_FF; const REF_BIT_MASK: u16 = 0x80_00;
58const VALUE_LEN_MASK: u16 = 0x7F_FF; pub(crate) fn common_prefix_len(low_fence: &[u8], high_fence: &[u8]) -> u16 {
62 let mut prefix_len = 0;
63 for i in 0..std::cmp::min(low_fence.len(), high_fence.len()) {
64 if low_fence[i] == high_fence[i] {
65 prefix_len += 1;
66 } else {
67 break;
68 }
69 }
70 prefix_len
71}
72
73#[repr(C)]
74pub(crate) struct LeafKVMeta {
75 offset: u16,
76 op_type_key_len_in_byte: u16, ref_value_len_in_byte: std::sync::atomic::AtomicU16, preview_bytes: [u8; PREVIEW_SIZE],
79}
80
81impl LeafKVMeta {
82 pub(crate) fn make_prefixed_meta(
83 offset: u16,
84 value_len: u16,
85 key: &[u8],
86 prefix_len: u16,
87 op_type: OpType,
88 ) -> Self {
89 let mut meta = Self {
90 offset,
91 op_type_key_len_in_byte: key.len() as u16 | ((op_type as u16) << OP_TYPE_SHIFT),
92 ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(value_len),
93 preview_bytes: [0; PREVIEW_SIZE],
94 };
95
96 for i in 0..std::cmp::min(key.len().saturating_sub(prefix_len as usize), PREVIEW_SIZE) {
97 meta.preview_bytes[i] = key[i + prefix_len as usize];
98 }
99
100 meta.clear_ref();
103 meta
104 }
105
106 pub fn make_infinite_high_fence_key() -> Self {
107 assert_eq!(std::mem::size_of::<Self>(), super::KV_META_SIZE);
108
109 Self {
110 offset: u16::MAX,
111 op_type_key_len_in_byte: 0,
112 ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
113 preview_bytes: [0; PREVIEW_SIZE],
114 }
115 }
116
117 pub fn make_infinite_low_fence_key() -> Self {
118 Self {
119 offset: u16::MAX - 1,
120 op_type_key_len_in_byte: 0,
121 ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
122 preview_bytes: [0; PREVIEW_SIZE],
123 }
124 }
125
126 pub fn is_infinite_low_fence_key(&self) -> bool {
127 self.offset == u16::MAX - 1
128 }
129
130 pub fn is_infinite_high_fence_key(&self) -> bool {
131 self.offset == u16::MAX
132 }
133
134 pub fn value_len(&self) -> u16 {
135 self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & VALUE_LEN_MASK
136 }
137
138 pub fn set_value_len(&mut self, value: u16) {
139 let v = self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed);
140 self.ref_value_len_in_byte.store(
141 (v & !VALUE_LEN_MASK) | (value & VALUE_LEN_MASK),
142 atomic::Ordering::Relaxed,
143 );
144 }
145
146 pub fn set_op_type(&mut self, op_type: OpType) {
147 self.op_type_key_len_in_byte = self.get_key_len() | ((op_type as u16) << OP_TYPE_SHIFT);
148 }
149
150 pub fn op_type(&self) -> OpType {
151 let l = self.op_type_key_len_in_byte;
152 let b = l >> OP_TYPE_SHIFT;
153 unsafe { std::mem::transmute(b as u8) }
154 }
155
156 pub fn mark_as_ref(&self) {
157 self.ref_value_len_in_byte
158 .fetch_or(REF_BIT_MASK, atomic::Ordering::Relaxed);
159 }
160
161 pub fn clear_ref(&self) {
162 self.ref_value_len_in_byte
163 .fetch_and(!REF_BIT_MASK, atomic::Ordering::Relaxed);
164 }
165
166 pub fn is_referenced(&self) -> bool {
167 self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & REF_BIT_MASK != 0
168 }
169
170 pub fn mark_as_deleted(&mut self) {
171 self.set_op_type(OpType::Delete);
172 }
173
174 #[allow(dead_code)]
175 pub fn is_deleted(&self) -> bool {
176 self.op_type() == OpType::Delete
177 }
178
179 pub(crate) fn get_offset(&self) -> u16 {
180 self.offset
181 }
182
183 pub(crate) fn get_key_len(&self) -> u16 {
184 self.op_type_key_len_in_byte & KEY_LEN_MASK
185 }
186}
187
188#[derive(Debug, Clone, Copy)]
189pub(crate) struct MiniPageNextLevel {
190 val: usize,
191}
192
193impl MiniPageNextLevel {
194 pub(crate) fn new(val: usize) -> Self {
195 Self { val }
196 }
197
198 pub(crate) fn as_offset(&self) -> usize {
199 assert!(!self.is_null());
200 self.val
201 }
202
203 pub(crate) fn new_null() -> Self {
204 Self { val: usize::MAX }
205 }
206
207 pub(crate) fn is_null(&self) -> bool {
208 self.val == usize::MAX
209 }
210}
211
212#[repr(C)]
213pub(crate) struct LeafNode {
214 pub(crate) meta: NodeMeta,
215 prefix_len: u16,
216 pub(crate) next_level: MiniPageNextLevel,
217 pub(crate) lsn: u64,
218 snapshot_version: u64,
219 data: [u8; 0],
220}
221
222const _: () = assert!(std::mem::size_of::<LeafNode>() == 32);
223
224impl LeafNode {
225 fn max_data_size(node_size: usize) -> usize {
226 node_size - std::mem::size_of::<LeafNode>()
227 }
228
229 pub(crate) fn initialize_mini_page(
230 ptr: &CircularBufferPtr,
231 node_size: usize,
232 next_level: MiniPageNextLevel,
233 cache_only: bool,
234 ) {
235 unsafe {
236 Self::init_node_with_fence(
237 ptr.as_ptr(),
238 &[],
239 &[],
240 node_size,
241 next_level,
242 false,
243 cache_only,
244 )
245 }
246 }
247
248 pub(crate) fn make_base_page(node_size: usize) -> *mut Self {
249 let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
250 let ptr = unsafe { std::alloc::alloc(layout) };
251 unsafe {
252 Self::init_node_with_fence(
253 ptr,
254 &[],
255 &[],
256 node_size,
257 MiniPageNextLevel::new_null(),
258 true,
259 false,
260 );
261 }
262 ptr as *mut Self
263 }
264
265 pub(crate) fn covert_insert_records_to_cache(&mut self) {
267 let mut cur = self.first_meta_pos_after_fence();
268
269 while cur < self.meta.meta_count_with_fence() {
270 let meta = self.get_kv_meta_mut(cur as usize);
271 let op_type = meta.op_type();
272
273 match op_type {
274 OpType::Insert => {
275 meta.set_op_type(OpType::Cache);
276 }
277 OpType::Delete => {
278 meta.set_op_type(OpType::Phantom);
279 }
280 OpType::Cache | OpType::Phantom => {
281 unreachable!("Base page should not have op type: {:?}", op_type);
282 }
283 };
284
285 cur += 1;
286 }
287 }
288
289 pub(crate) fn convert_cache_records_to_insert(&mut self) {
291 let mut cur = self.first_meta_pos_after_fence();
292
293 while cur < self.meta.meta_count_with_fence() {
294 let meta = self.get_kv_meta_mut(cur as usize);
295 let op_type = meta.op_type();
296
297 match op_type {
298 OpType::Cache => {
299 meta.set_op_type(OpType::Insert);
300 }
301 OpType::Phantom => {
302 meta.set_op_type(OpType::Delete);
303 }
304 OpType::Insert | OpType::Delete => {
305 }
307 };
308
309 cur += 1;
310 }
311 }
312
313 unsafe fn init_node_with_fence(
314 ptr: *mut u8,
315 low_fence: &[u8],
316 high_fence: &[u8],
317 node_size: usize,
318 next_level: MiniPageNextLevel,
319 has_fence: bool,
320 cache_only: bool,
321 ) {
322 let ptr = ptr as *mut Self;
323
324 { &mut *ptr }.initialize(
325 low_fence, high_fence, node_size, next_level, has_fence, cache_only,
326 );
327 }
328
329 pub(crate) fn get_kv_meta(&self, index: usize) -> &LeafKVMeta {
330 debug_assert!(index < self.meta.meta_count_with_fence() as usize);
331 let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
332 unsafe { &*meta_ptr.add(index) }
333 }
334
335 pub(crate) fn get_full_key(&self, meta: &LeafKVMeta) -> Vec<u8> {
336 let prefix = self.get_prefix();
337 let remaining = self.get_remaining_key(meta);
338 [prefix, remaining].concat()
339 }
340
341 pub(crate) fn get_low_fence_full_key(&self) -> Vec<u8> {
343 debug_assert!(LOW_FENCE_IDX < self.meta.meta_count_with_fence() as usize);
344 let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
345 let meta = unsafe { &*meta_ptr.add(LOW_FENCE_IDX) };
346
347 let key_offset = meta.get_offset();
348 let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
349 unsafe {
350 let key = std::slice::from_raw_parts(key_ptr, (meta.get_key_len()) as usize);
351 [key].concat()
352 }
353 }
354
355 pub(crate) fn get_kv_meta_mut(&mut self, index: usize) -> &mut LeafKVMeta {
356 debug_assert!(index < self.meta.meta_count_with_fence() as usize);
357 let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
358 unsafe { meta_ptr.add(index).as_mut().unwrap() }
359 }
360
361 pub(crate) fn write_initial_kv_meta(&mut self, index: usize, meta: LeafKVMeta) {
362 let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
363 unsafe { meta_ptr.add(index).write(meta) };
364 }
365
366 pub(crate) fn get_remaining_key(&self, meta: &LeafKVMeta) -> &[u8] {
367 let key_offset = meta.get_offset();
368 let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
369 unsafe {
370 std::slice::from_raw_parts(key_ptr, (meta.get_key_len() - self.prefix_len) as usize)
371 }
372 }
373
374 pub(crate) fn get_prefix(&self) -> &[u8] {
375 let m = self.get_kv_meta(LOW_FENCE_IDX);
376 let key_offset = m.get_offset();
377 unsafe {
378 std::slice::from_raw_parts(
379 self.data.as_ptr().add(key_offset as usize),
380 self.prefix_len as usize,
381 )
382 }
383 }
384
385 pub(crate) fn install_fence_key(&mut self, key: &[u8], is_high_fence: bool) {
386 let loc: u16 = self.meta.meta_count_with_fence();
387 if !is_high_fence {
388 debug_assert!(self.meta.meta_count_with_fence() == LOW_FENCE_IDX as u16);
389 } else {
390 debug_assert!(self.meta.meta_count_with_fence() == HIGH_FENCE_IDX as u16);
391 }
392
393 let cur_low_offset = self.current_lowest_offset();
394
395 self.meta.increment_value_count();
396 self.meta.remaining_size -= std::mem::size_of::<LeafKVMeta>() as u16;
397
398 if key.is_empty() {
399 let fence = if is_high_fence {
400 LeafKVMeta::make_infinite_high_fence_key()
401 } else {
402 LeafKVMeta::make_infinite_low_fence_key()
403 };
404 self.write_initial_kv_meta(loc as usize, fence);
405 } else {
406 let prefix_len = if is_high_fence { self.prefix_len } else { 0 }; let post_fix_len = key.len() as u16 - prefix_len;
409 let val_len = 0u16;
410 let kv_len = post_fix_len + val_len;
411
412 let remaining = self.meta.remaining_size;
413
414 debug_assert!(kv_len <= remaining);
415
416 let offset = cur_low_offset - kv_len;
417
418 let new_meta =
419 LeafKVMeta::make_prefixed_meta(offset, 0, key, prefix_len, OpType::Insert);
420
421 self.write_initial_kv_meta(loc as usize, new_meta);
422
423 unsafe {
424 let start_ptr = self.data.as_mut_ptr().add(offset as usize);
425
426 let key_slice = std::slice::from_raw_parts(
427 key.as_ptr().add(prefix_len as usize),
428 post_fix_len as usize,
429 );
430 std::ptr::copy_nonoverlapping(key_slice.as_ptr(), start_ptr, post_fix_len as usize);
431 }
432
433 self.meta.remaining_size -= kv_len;
434 }
435 }
436
437 pub(crate) fn current_lowest_offset(&self) -> u16 {
438 let value_count = self.meta.meta_count_with_fence();
439 let rt =
440 (value_count * std::mem::size_of::<LeafKVMeta>() as u16) + self.meta.remaining_size;
441
442 #[cfg(debug_assertions)]
444 {
445 let mut min_offset = LeafNode::max_data_size(self.meta.node_size as usize) as u16;
446 for i in 0..value_count {
447 let kv_meta = self.get_kv_meta(i as usize);
448 if kv_meta.is_infinite_low_fence_key() || kv_meta.is_infinite_high_fence_key() {
449 continue;
450 }
451 min_offset = std::cmp::min(min_offset, kv_meta.offset);
452 }
453 assert!(min_offset == rt);
454 }
455 rt
456 }
457
458 pub(crate) fn get_value(&self, meta: &LeafKVMeta) -> &[u8] {
459 let val_offset = meta.get_offset() + meta.get_key_len() - self.prefix_len;
460 let val_ptr = unsafe { self.data.as_ptr().add(val_offset as usize) };
461 unsafe { std::slice::from_raw_parts(val_ptr, meta.value_len() as usize) }
462 }
463
464 pub fn get_split_key(&self, cache_only: bool) -> (Vec<u8>, u16) {
475 let mut data_size = 0;
476
477 for meta in self.meta_iter() {
478 let key_len = meta.get_key_len();
479 let value_len = meta.value_len();
480
481 data_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
482 }
483
484 let split_target_size = data_size / 2;
485 let mut cur_size = 0;
486
487 for (i, meta) in self.meta_iter().enumerate() {
488 let key_len = meta.get_key_len();
489 let value_len = meta.value_len();
490
491 cur_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
492
493 if cache_only && i == 0 {
495 continue;
496 }
497
498 if cur_size >= split_target_size {
499 return (self.get_full_key(meta), i as u16);
500 }
501 }
502 unreachable!();
503 }
504
505 #[allow(clippy::unused_enumerate_index)]
507 pub fn get_kv_num_below_key(&self, merge_split_key: &Vec<u8>) -> u16 {
508 let mut cnt: u16 = 0;
510 for (_, meta) in self.meta_iter().enumerate() {
511 let key = self.get_full_key(meta);
512 let cmp = key.cmp(merge_split_key);
513 if cmp == std::cmp::Ordering::Less {
516 cnt += 1;
517 } else {
518 break;
519 }
520 }
521 cnt
522 }
523
524 pub fn get_cache_only_insert_split_key(&self, key: &[u8], new_record_size: &u16) -> Vec<u8> {
529 let mut merge_split_key_1: Option<Vec<u8>> = None;
530 let mut merge_split_key_2: Option<Vec<u8>> = None;
531 let mut diff_1: i16 = i16::MAX;
532 let mut diff_2: i16 = i16::MAX;
533
534 let mut total_merged_size: u16 = 0;
536
537 for meta in self.meta_iter() {
538 let key_len = meta.get_key_len();
539 let value_len = meta.value_len();
540
541 total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
542 }
543
544 total_merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;
545 let split_target_size = total_merged_size / 2;
546
547 let mut merged_size: u16 = 0;
549 let mut self_meta_iter = self.meta_iter();
550 let mut self_meta_option = self_meta_iter.next();
551
552 if self_meta_option.is_some() {
554 let mut cur_base_meta = self_meta_option.unwrap();
555 let mut cur_base_key = self.get_full_key(cur_base_meta);
556 let mut cmp = cur_base_key.as_slice().cmp(key);
557
558 while cmp == std::cmp::Ordering::Less {
559 let key_len = cur_base_meta.get_key_len();
560 let value_len = cur_base_meta.value_len();
561 merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
562 if merged_size >= split_target_size {
563 if merge_split_key_2.is_some() {
566 break;
567 }
568
569 let left_side: u16 = merged_size
570 - key_len
571 - value_len
572 - std::mem::size_of::<LeafKVMeta>() as u16;
573 let right_side = total_merged_size - left_side;
574
575 if merge_split_key_1.is_none() {
576 merge_split_key_1 = Some(cur_base_key);
577 diff_1 = (left_side as i16 - right_side as i16).abs();
578 } else {
579 merge_split_key_2 = Some(cur_base_key);
580 diff_2 = (left_side as i16 - right_side as i16).abs();
581 }
582 }
583
584 self_meta_option = self_meta_iter.next();
585 if let Some(meta) = self_meta_option {
586 cur_base_meta = meta;
587 cur_base_key = self.get_full_key(cur_base_meta);
588 cmp = cur_base_key.as_slice().cmp(key);
589 } else {
590 break;
591 }
592 }
593 }
594
595 if merge_split_key_2.is_none() {
597 merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;
598
599 if merged_size >= split_target_size {
600 let left_side: u16 =
604 merged_size - new_record_size - std::mem::size_of::<LeafKVMeta>() as u16;
605 let right_side = total_merged_size - left_side;
606
607 if merge_split_key_1.is_none() {
608 merge_split_key_1 = Some(key.to_vec());
609 diff_1 = (left_side as i16 - right_side as i16).abs();
610 } else {
611 merge_split_key_2 = Some(key.to_vec());
612 diff_2 = (left_side as i16 - right_side as i16).abs();
613 }
614 }
615 }
616
617 while self_meta_option.is_some() {
619 let base_meta = self_meta_option.unwrap();
620 let key_len = base_meta.get_key_len();
621 let value_len = base_meta.value_len();
622 merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
623
624 if merged_size >= split_target_size {
625 let cur_base_key = self.get_full_key(base_meta);
627 if merge_split_key_2.is_some() {
628 break;
629 }
630
631 let left_side: u16 =
632 merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
633 let right_side = total_merged_size - left_side;
634
635 if merge_split_key_1.is_none() {
636 merge_split_key_1 = Some(cur_base_key);
637 diff_1 = (left_side as i16 - right_side as i16).abs();
638 } else {
639 merge_split_key_2 = Some(cur_base_key);
640 diff_2 = (left_side as i16 - right_side as i16).abs();
641 }
642 }
643 self_meta_option = self_meta_iter.next();
644 }
645
646 if merge_split_key_1.is_none() {
649 panic!(
650 "Fail to find a splitting key for merging mini and base page.{}, {}",
651 merged_size, split_target_size
652 );
653 }
654
655 if merge_split_key_2.is_none() || diff_1 < diff_2 {
656 return merge_split_key_1.unwrap();
657 }
658
659 merge_split_key_2.unwrap()
660 }
661
662 #[allow(clippy::unnecessary_unwrap)]
669 pub(crate) fn get_merge_split_key(&mut self, mini_page: &LeafNode) -> Vec<u8> {
670 let mut merge_split_key_1: Option<Vec<u8>> = None;
671 let mut merge_split_key_2: Option<Vec<u8>> = None;
672 let mut diff_1: i16 = i16::MAX;
673 let mut diff_2: i16 = i16::MAX;
674
675 let mut total_merged_size: u16 = 0;
676 let mut base_meta_iter = self.meta_iter();
677 let mut cur_pos = base_meta_iter.cur;
678 let mut cur_base_meta_option = base_meta_iter.next();
679 let mut duplicate_positions = vec![];
680
681 for mini_meta in mini_page.meta_iter() {
683 let c_type = mini_meta.op_type();
684 if !c_type.is_dirty() {
686 continue;
687 }
688
689 let cur_mini_key = mini_page.get_full_key(mini_meta);
690 if cur_base_meta_option.is_some() {
691 let mut cur_base_meta = cur_base_meta_option.unwrap();
692 let mut cur_base_key = self.get_full_key(cur_base_meta);
693 let mut cmp = cur_base_key.cmp(&cur_mini_key);
694
695 while cmp == std::cmp::Ordering::Less {
698 let key_len = cur_base_meta.get_key_len();
699 let value_len = cur_base_meta.value_len();
700 total_merged_size +=
701 key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
702
703 cur_pos = base_meta_iter.cur;
704 cur_base_meta_option = base_meta_iter.next();
705 if cur_base_meta_option.is_some() {
706 cur_base_meta = cur_base_meta_option.unwrap();
707 cur_base_key = self.get_full_key(cur_base_meta);
708 cmp = cur_base_key.cmp(&cur_mini_key);
709 } else {
710 break;
711 }
712 }
713
714 if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
717 duplicate_positions.push(cur_pos);
719 cur_pos = base_meta_iter.cur;
720 cur_base_meta_option = base_meta_iter.next();
721 }
722 }
723
724 let key_len = mini_meta.get_key_len();
725 let value_len = mini_meta.value_len();
726 total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
727 }
728
729 while cur_base_meta_option.is_some() {
732 let base_meta = cur_base_meta_option.unwrap();
733 let key_len = base_meta.get_key_len();
734 let value_len = base_meta.value_len();
735 total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
736
737 cur_base_meta_option = base_meta_iter.next();
738 }
739
740 let split_target_size = total_merged_size / 2;
742
743 let mut merged_size: u16 = 0;
746 base_meta_iter = self.meta_iter();
747 cur_base_meta_option = base_meta_iter.next();
748
749 for mini_meta in mini_page.meta_iter() {
750 let c_type = mini_meta.op_type();
753 if !c_type.is_dirty() {
754 continue;
755 }
756
757 let cur_mini_key = mini_page.get_full_key(mini_meta);
758 if cur_base_meta_option.is_some() {
759 let mut cur_base_meta = cur_base_meta_option.unwrap();
760 let mut cur_base_key = self.get_full_key(cur_base_meta);
761 let mut cmp = cur_base_key.cmp(&cur_mini_key);
762 while cmp == std::cmp::Ordering::Less {
765 let key_len = cur_base_meta.get_key_len();
766 let value_len = cur_base_meta.value_len();
767 merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
768 if merged_size >= split_target_size {
769 if merge_split_key_2.is_some() {
772 break;
773 }
774
775 let left_side: u16 = merged_size
776 - key_len
777 - value_len
778 - std::mem::size_of::<LeafKVMeta>() as u16;
779 let right_side = total_merged_size - left_side;
780
781 if merge_split_key_1.is_none() {
782 merge_split_key_1 = Some(cur_base_key);
783 diff_1 = (left_side as i16 - right_side as i16).abs();
784 } else {
785 merge_split_key_2 = Some(cur_base_key);
786 diff_2 = (left_side as i16 - right_side as i16).abs();
787 }
788 }
789
790 cur_base_meta_option = base_meta_iter.next();
791 if cur_base_meta_option.is_some() {
792 cur_base_meta = cur_base_meta_option.unwrap();
793 cur_base_key = self.get_full_key(cur_base_meta);
794 cmp = cur_base_key.cmp(&cur_mini_key);
795 } else {
796 break;
797 }
798 }
799
800 if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
803 cur_base_meta_option = base_meta_iter.next();
804 }
805 }
806
807 let key_len = mini_meta.get_key_len();
808 let value_len = mini_meta.value_len();
809 merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
810
811 if merged_size >= split_target_size {
812 if merge_split_key_2.is_some() {
815 break;
816 }
817
818 let left_side: u16 =
819 merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
820 let right_side = total_merged_size - left_side;
821
822 if merge_split_key_1.is_none() {
823 merge_split_key_1 = Some(cur_mini_key);
824 diff_1 = (left_side as i16 - right_side as i16).abs();
825 } else {
826 merge_split_key_2 = Some(cur_mini_key);
827 diff_2 = (left_side as i16 - right_side as i16).abs();
828 }
829 }
830 }
831
832 while cur_base_meta_option.is_some() {
835 let base_meta = cur_base_meta_option.unwrap();
836 let key_len = base_meta.get_key_len();
837 let value_len = base_meta.value_len();
838 merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
839
840 if merged_size >= split_target_size {
841 let cur_base_key = self.get_full_key(base_meta);
843 if merge_split_key_2.is_some() {
844 break;
845 }
846
847 let left_side: u16 =
848 merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
849 let right_side = total_merged_size - left_side;
850
851 if merge_split_key_1.is_none() {
852 merge_split_key_1 = Some(cur_base_key);
853 diff_1 = (left_side as i16 - right_side as i16).abs();
854 } else {
855 merge_split_key_2 = Some(cur_base_key);
856 diff_2 = (left_side as i16 - right_side as i16).abs();
857 }
858 }
859 cur_base_meta_option = base_meta_iter.next();
860 }
861
862 for pos in duplicate_positions {
864 let pos_meta = self.get_kv_meta_mut(pos);
865 pos_meta.mark_as_deleted();
866 }
867
868 if merge_split_key_1.is_none() {
869 unreachable!(
870 "Fail to find a splitting key for merging mini and base page.{}, {}",
871 merged_size, split_target_size
872 );
873 }
874
875 if merge_split_key_2.is_some() {
877 let cmp = merge_split_key_1
878 .as_ref()
879 .unwrap()
880 .cmp(merge_split_key_2.as_ref().unwrap());
881 assert_ne!(cmp, std::cmp::Ordering::Equal);
882 }
883
884 let mut splitting_key = if merge_split_key_2.is_none() || diff_1 < diff_2 {
885 merge_split_key_1.as_ref().unwrap()
886 } else {
887 merge_split_key_2.as_ref().unwrap()
888 };
889
890 let mut fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
893 if !fence_meta.is_infinite_low_fence_key() {
894 let low_fence_key = self.get_low_fence_key();
895 let cmp = splitting_key.cmp(&low_fence_key);
896 if cmp == std::cmp::Ordering::Equal {
897 splitting_key = merge_split_key_2.as_ref().unwrap();
898 }
899 }
900
901 fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
902 if !fence_meta.is_infinite_high_fence_key() {
903 let high_fence_key = self.get_high_fence_key();
904 let cmp = splitting_key.cmp(&high_fence_key);
905 assert_ne!(cmp, std::cmp::Ordering::Equal);
906 }
907
908 splitting_key.clone()
909 }
910
911 pub fn get_key_to_reach_this_node(&self) -> Vec<u8> {
912 let meta = self.get_kv_meta(self.first_meta_pos_after_fence() as usize);
913 self.get_full_key(meta)
914 }
915
916 pub fn try_get_key_to_reach_this_node(&self) -> Result<Vec<u8>, TreeError> {
920 assert!(self.meta.is_cache_only_leaf());
921
922 let index = 0;
924 if index >= self.meta.meta_count_with_fence() as usize {
925 return Err(TreeError::NeedRestart);
926 }
927
928 let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
929 let meta = unsafe { &*meta_ptr.add(index) };
930
931 let key_offset = meta.get_offset();
932 let key_len = meta.get_key_len();
933
934 if key_offset + key_len > LeafNode::max_data_size(self.meta.node_size as usize) as u16 {
935 return Err(TreeError::NeedRestart);
936 }
937
938 let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
939 let key_slice =
940 unsafe { std::slice::from_raw_parts(key_ptr, (key_len - self.prefix_len) as usize) };
941 Ok(key_slice.to_vec())
942 }
943
944 pub fn get_low_fence_key(&self) -> Vec<u8> {
945 assert!(self.has_fence());
946 let fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
947 if fence_meta.is_infinite_low_fence_key() {
948 vec![]
949 } else {
950 unsafe {
951 let start_ptr = self.data.as_ptr().add(fence_meta.offset as usize);
952 let key_slice =
953 std::slice::from_raw_parts(start_ptr, fence_meta.get_key_len() as usize);
954 key_slice.to_vec()
955 }
956 }
957 }
958
959 pub fn get_high_fence_key(&self) -> Vec<u8> {
960 assert!(self.has_fence());
961 let fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
962 if fence_meta.is_infinite_high_fence_key() {
963 vec![]
964 } else {
965 self.get_full_key(fence_meta)
966 }
967 }
968
969 #[inline]
972 pub(crate) fn key_cmp(&self, meta: &LeafKVMeta, key: &[u8]) -> Ordering {
973 let search_key_prefix = &key[(self.prefix_len as usize)..]
974 [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];
975
976 let prefix_key = &meta.preview_bytes[..std::cmp::min(
977 PREVIEW_SIZE,
978 meta.get_key_len() as usize - self.prefix_len as usize,
979 )];
980 let mut cmp = prefix_key.cmp(search_key_prefix);
981
982 if cmp == Ordering::Equal {
984 let full_key = self.get_remaining_key(meta);
985 let search_key_postfix = &key[self.prefix_len as usize..];
986 cmp = full_key.cmp(search_key_postfix);
987 }
988 cmp
989 }
990
991 pub(crate) fn linear_lower_bound(&self, key: &[u8]) -> u16 {
992 debug_assert!(key.len() >= self.prefix_len as usize);
993
994 let mut index = self.first_meta_pos_after_fence();
995
996 while index < self.meta.meta_count_with_fence() {
997 let key_meta = self.get_kv_meta(index as usize);
998
999 #[cfg(target_arch = "x86_64")]
1000 unsafe {
1001 core::arch::x86_64::_mm_clflush(key_meta as *const LeafKVMeta as *const u8);
1003 }
1004 let cmp = self.key_cmp(key_meta, key);
1005
1006 if cmp != Ordering::Less {
1007 return index;
1008 }
1009
1010 index += 1;
1011 }
1012
1013 index
1014 }
1015
1016 pub(crate) fn lower_bound(&self, key: &[u8]) -> u16 {
1017 let mut lower = self.first_meta_pos_after_fence();
1018 let mut upper = self.meta.meta_count_with_fence();
1019
1020 let search_key_prefix = &key[(self.prefix_len as usize)..]
1021 [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];
1022
1023 debug_assert!(key.len() >= self.prefix_len as usize);
1024
1025 while lower < upper {
1026 let mid = lower + (upper - lower) / 2;
1027 let key_meta = self.get_kv_meta(mid as usize);
1028
1029 let prefix_key = &key_meta.preview_bytes[..std::cmp::min(
1030 PREVIEW_SIZE,
1031 key_meta.get_key_len() as usize - self.prefix_len as usize,
1032 )];
1033 let mut cmp = prefix_key.cmp(search_key_prefix);
1034
1035 if cmp == Ordering::Equal {
1037 let remaining_key = self.get_remaining_key(key_meta);
1038 let search_key_postfix = &key[self.prefix_len as usize..];
1039 cmp = remaining_key.cmp(search_key_postfix);
1040 }
1041
1042 match cmp {
1043 Ordering::Greater => {
1044 upper = mid;
1045 }
1046 Ordering::Equal => {
1047 return mid;
1048 }
1049 Ordering::Less => {
1050 lower = mid + 1;
1051 }
1052 }
1053 }
1054 lower
1055 }
1056
1057 pub(crate) fn insert(
1065 &mut self,
1066 key: &[u8],
1067 value: &[u8],
1068 op_type: OpType,
1069 max_fence_len: usize,
1070 ) -> bool {
1071 debug_assert!(key.len() as u16 >= self.prefix_len);
1072 match op_type {
1073 OpType::Insert | OpType::Cache => {
1074 debug_assert!(!value.is_empty());
1075 }
1076 OpType::Delete | OpType::Phantom => {}
1077 }
1078
1079 let post_fix_len = key.len() as u16 - self.prefix_len;
1080 let val_len = value.len() as u16;
1081 let kv_len = post_fix_len + val_len;
1082
1083 let value_count_with_fence = self.meta.meta_count_with_fence();
1084
1085 let pos = self.lower_bound(key) as usize;
1086
1087 if pos < value_count_with_fence as usize {
1088 let prefix_len = self.prefix_len as usize;
1089 let pos_meta = self.get_kv_meta(pos);
1090 let pos_key = self.get_remaining_key(pos_meta);
1091 let search_key_postfix = &key[prefix_len..];
1092 if pos_key.cmp(search_key_postfix) == Ordering::Equal {
1093 counter!(LeafInsertDuplicate);
1095 if op_type == OpType::Delete {
1096 let pos_meta = self.get_kv_meta_mut(pos);
1097 pos_meta.mark_as_deleted();
1098 return true;
1099 }
1100
1101 let pos_value = self.get_value(pos_meta);
1102 let pos_value_len = pos_value.len() as u16;
1103
1104 if pos_value_len >= val_len {
1105 unsafe {
1107 let pair_ptr = self.data.as_ptr().add(pos_meta.offset as usize) as *mut u8;
1108 std::ptr::copy_nonoverlapping(
1109 value.as_ptr(),
1110 pair_ptr.add(post_fix_len as usize),
1111 val_len as usize,
1112 );
1113 }
1114 let pos_meta = self.get_kv_meta_mut(pos);
1115 pos_meta.set_value_len(val_len);
1116 pos_meta.set_op_type(op_type);
1117 return true;
1118 }
1119
1120 if self.meta.remaining_size < kv_len {
1121 return false;
1122 }
1123 assert!(op_type != OpType::Cache);
1124 let offset = self.current_lowest_offset() - kv_len;
1125 unsafe {
1126 let pair_ptr = self.data.as_ptr().add(offset as usize) as *mut u8;
1127
1128 let pos_meta = self.get_kv_meta_mut(pos);
1129 pos_meta.set_value_len(val_len);
1130 pos_meta.set_op_type(op_type);
1131 pos_meta.offset = offset;
1132 std::ptr::copy_nonoverlapping(
1133 key[self.prefix_len as usize..].as_ptr(),
1134 pair_ptr,
1135 post_fix_len as usize,
1136 );
1137 std::ptr::copy_nonoverlapping(
1138 value.as_ptr(),
1139 pair_ptr.add(post_fix_len as usize),
1140 val_len as usize,
1141 );
1142 }
1143 self.meta.remaining_size -= kv_len;
1144 return true;
1145 }
1146 }
1147
1148 if op_type == OpType::Delete && (self.is_base_page()) {
1152 return true;
1154 }
1155
1156 if self.full_with_fences(
1158 kv_len + std::mem::size_of::<LeafKVMeta>() as u16,
1159 max_fence_len,
1160 ) {
1161 return false;
1162 }
1163
1164 counter!(LeafInsertNew);
1165
1166 let offset = self.current_lowest_offset() - kv_len;
1167 let new_meta =
1168 LeafKVMeta::make_prefixed_meta(offset, val_len, key, self.prefix_len, op_type);
1169
1170 unsafe {
1171 let metas_size = std::mem::size_of::<LeafKVMeta>()
1172 * (self.meta.meta_count_with_fence() - pos as u16) as usize;
1173 let src_ptr = self
1174 .data
1175 .as_ptr()
1176 .add(pos * std::mem::size_of::<LeafKVMeta>());
1177 let dest_ptr = self
1178 .data
1179 .as_mut_ptr()
1180 .add((pos + 1) * std::mem::size_of::<LeafKVMeta>());
1181 std::ptr::copy(src_ptr, dest_ptr, metas_size);
1182
1183 self.write_initial_kv_meta(pos, new_meta);
1184
1185 let pair_ptr = self.data.as_mut_ptr().add(offset as usize);
1186 std::ptr::copy_nonoverlapping(
1187 key[self.prefix_len as usize..].as_ptr(),
1188 pair_ptr,
1189 post_fix_len as usize,
1190 );
1191 std::ptr::copy_nonoverlapping(
1192 value.as_ptr(),
1193 pair_ptr.add(post_fix_len as usize),
1194 val_len as usize,
1195 );
1196 }
1197
1198 self.meta.remaining_size -= kv_len + std::mem::size_of::<LeafKVMeta>() as u16;
1199 self.meta.increment_value_count();
1200 true
1201 }
1202
1203 pub(crate) fn is_base_page(&self) -> bool {
1205 (!self.meta.is_cache_only_leaf()) && self.next_level.is_null()
1206 }
1207
1208 pub fn split(&mut self, sibling: &mut LeafNode, cache_only: bool) -> Vec<u8> {
1212 if cache_only {
1213 assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
1214 } else {
1215 assert!(self.is_base_page() && self.has_fence());
1216 }
1217
1218 let current_count = self.meta.meta_count_without_fence();
1219
1220 let (splitting_key, new_cur_count) = self.get_split_key(cache_only);
1225 let sibling_cnt = current_count - new_cur_count;
1226
1227 if !cache_only {
1231 let low_fence_for_right = self.get_kv_meta(FENCE_KEY_CNT + new_cur_count as usize);
1232 assert!(!low_fence_for_right.is_infinite_low_fence_key());
1233 let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);
1234
1235 let low_fence_key = self.get_full_key(low_fence_for_right);
1236 let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
1237 vec![]
1238 } else {
1239 self.get_full_key(high_fence_for_right)
1240 };
1241
1242 sibling.initialize(
1243 &low_fence_key,
1244 &high_fence_key,
1245 sibling.meta.node_size as usize,
1246 MiniPageNextLevel::new_null(),
1247 true,
1248 cache_only,
1249 );
1250 } else {
1251 sibling.initialize(
1252 &[],
1253 &[],
1254 sibling.meta.node_size as usize,
1255 MiniPageNextLevel::new_null(),
1256 false,
1257 cache_only,
1258 );
1259 }
1260
1261 let starting_kv_idx = if cache_only { 0 } else { FENCE_KEY_CNT };
1262
1263 for i in 0..sibling_cnt {
1264 let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_idx);
1265 if kv_meta.op_type() == OpType::Delete {
1266 continue;
1268 }
1269 let key = self.get_full_key(kv_meta);
1270 let value = self.get_value(kv_meta);
1271 let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
1272 assert!(insert_rt);
1273 }
1274
1275 if !cache_only {
1276 self.meta
1277 .set_value_count(new_cur_count + FENCE_KEY_CNT as u16);
1278 } else {
1279 self.meta.set_value_count(new_cur_count);
1280 }
1281
1282 self.consolidate_after_split(&splitting_key);
1283
1284 splitting_key
1285 }
1286
1287 pub fn split_with_key(
1292 &mut self,
1293 sibling: &mut LeafNode,
1294 merge_split_key: &Vec<u8>,
1295 cache_only: bool,
1296 ) {
1297 if cache_only {
1298 assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
1299 } else {
1300 assert!(self.is_base_page() && self.has_fence());
1301 }
1302
1303 let current_count = self.meta.meta_count_without_fence();
1304 let mut new_cur_count = self.get_kv_num_below_key(merge_split_key);
1309 let sibling_cnt = current_count - new_cur_count;
1310
1311 if !cache_only {
1315 let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);
1316
1317 let low_fence_key = merge_split_key.clone();
1318 let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
1319 vec![]
1320 } else {
1321 self.get_full_key(high_fence_for_right)
1322 };
1323
1324 sibling.initialize(
1325 &low_fence_key,
1326 &high_fence_key,
1327 sibling.meta.node_size as usize,
1328 MiniPageNextLevel::new_null(),
1329 true,
1330 cache_only,
1331 );
1332 } else {
1333 sibling.initialize(
1334 &[],
1335 &[],
1336 sibling.meta.node_size as usize,
1337 MiniPageNextLevel::new_null(),
1338 false,
1339 cache_only,
1340 );
1341 }
1342
1343 let starting_kv_index = if cache_only { 0 } else { FENCE_KEY_CNT };
1344
1345 for i in 0..sibling_cnt {
1346 let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_index);
1347 if kv_meta.op_type() == OpType::Delete {
1348 continue;
1350 }
1351 let key = self.get_full_key(kv_meta);
1352 let value = self.get_value(kv_meta);
1353 let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
1354 assert!(insert_rt);
1355 }
1356
1357 if !cache_only {
1358 new_cur_count += FENCE_KEY_CNT as u16;
1359 }
1360
1361 self.meta.set_value_count(new_cur_count);
1362 self.consolidate_after_split(merge_split_key);
1363
1364 if !cache_only {
1365 let left_high_fence = self.get_kv_meta(HIGH_FENCE_IDX);
1368 let left_high_fence_key = self.get_full_key(left_high_fence);
1369 let right_low_fence_key = sibling.get_low_fence_full_key();
1370
1371 assert_eq!(left_high_fence_key, *merge_split_key); assert_eq!(right_low_fence_key, *merge_split_key);
1373 }
1374 }
1375
1376 pub(crate) fn consolidate_inner(
1377 &mut self,
1378 new_optype: OpType,
1379 new_high_fence: Option<&[u8]>,
1380 skip_tombstone: bool,
1381 cache_only: bool,
1382 skip_key: Option<&[u8]>,
1383 ) {
1384 let mut pairs = Vec::new();
1385
1386 for meta in self.meta_iter() {
1387 if skip_tombstone && meta.op_type() == OpType::Delete {
1388 continue;
1390 }
1391
1392 match skip_key {
1393 Some(s_k) => {
1395 let k = self.get_full_key(meta);
1396 let cmp = s_k.cmp(&k);
1397
1398 if cmp != Ordering::Equal {
1399 pairs.push((k, self.get_value(meta).to_owned(), meta.op_type()));
1400 }
1401 }
1402 None => {
1403 pairs.push((
1404 self.get_full_key(meta),
1405 self.get_value(meta).to_owned(),
1406 meta.op_type(),
1407 ));
1408 }
1409 }
1410 }
1411
1412 let has_fence = self.has_fence();
1413 let (low_fence, high_fence) = if has_fence {
1414 let high_fence_key = match new_high_fence {
1415 None => self.get_high_fence_key(),
1416 Some(key) => key.to_vec(),
1417 };
1418 (self.get_low_fence_key(), high_fence_key)
1419 } else {
1420 (vec![], vec![])
1421 };
1422
1423 let node_size = self.meta.node_size;
1424
1425 self.initialize(
1426 &low_fence,
1427 &high_fence,
1428 node_size as usize,
1429 self.next_level,
1430 has_fence,
1431 cache_only,
1432 );
1433
1434 for (key, value, op_type) in pairs {
1435 let rt = if op_type == OpType::Delete {
1436 self.insert(&key, &value, OpType::Delete, 0)
1437 } else {
1438 self.insert(&key, &value, new_optype, 0)
1439 };
1440 assert!(rt);
1441 }
1442 }
1443
1444 #[allow(dead_code)]
1445 pub(crate) fn consolidate(&mut self) {
1446 self.consolidate_inner(
1447 OpType::Insert,
1448 None,
1449 true,
1450 self.meta.is_cache_only_leaf(),
1451 None,
1452 );
1453 }
1454
1455 #[allow(dead_code)]
1458 pub(crate) fn consolidate_after_merge(&mut self) {
1459 assert!(!self.is_base_page());
1460 self.consolidate_inner(
1461 OpType::Cache,
1462 None,
1463 false,
1464 self.meta.is_cache_only_leaf(),
1465 None,
1466 );
1467 }
1468
1469 fn consolidate_after_split(&mut self, high_fence: &[u8]) {
1475 assert!(self.meta.is_cache_only_leaf() || self.is_base_page());
1476 self.consolidate_inner(
1477 OpType::Insert,
1478 Some(high_fence),
1479 true,
1480 self.meta.is_cache_only_leaf(),
1481 None,
1482 );
1483 }
1484
1485 pub(crate) fn consolidate_skip_key(&mut self, key: &[u8]) {
1489 assert!(!self.is_base_page());
1490 assert!(self.meta.is_cache_only_leaf());
1491 self.consolidate_inner(
1492 OpType::Insert,
1493 None,
1494 true,
1495 self.meta.is_cache_only_leaf(),
1496 Some(key),
1497 );
1498 }
1499
1500 pub(crate) fn copy_initialize_to(
1502 &self,
1503 dst_node: *mut LeafNode,
1504 dst_size: usize,
1505 discard_cold_cache: bool,
1506 ) {
1507 assert!(!self.is_base_page());
1508 assert!(self.meta.node_size as usize <= dst_size);
1509 let dst_ref = unsafe { &mut *dst_node };
1510 let empty = vec![];
1511
1512 dst_ref.initialize(
1513 &empty,
1514 &empty,
1515 dst_size,
1516 self.next_level,
1517 false, self.meta.is_cache_only_leaf(),
1519 );
1520
1521 for meta in self.meta_iter() {
1522 let op = meta.op_type();
1523
1524 if discard_cold_cache && op.is_cache() && !meta.is_referenced() {
1529 continue;
1530 }
1531
1532 let value = self.get_value(meta);
1533 let rt = dst_ref.insert(&self.get_full_key(meta), value, op, 0);
1534 assert!(rt);
1535 }
1536 }
1537
1538 pub(crate) fn initialize(
1540 &mut self,
1541 low_fence: &[u8],
1542 high_fence: &[u8],
1543 node_size: usize,
1544 next_level: MiniPageNextLevel,
1545 has_fence: bool,
1546 cache_only: bool,
1547 ) {
1548 let snapshot_version = match CPRSnapShotMgr::get_snapshot_thread_id() {
1549 Ok(_) => CPRSnapShotMgr::get_snapshot_thread_version(),
1550 Err(_) => INVALID_SNAPSHOT_VERSION,
1551 };
1552
1553 self.snapshot_version = snapshot_version & Self::SNAPSHOT_VERSION_MASK;
1555
1556 if !has_fence {
1557 assert!(low_fence.is_empty());
1558 assert!(high_fence.is_empty());
1559
1560 self.meta = NodeMeta::new(
1561 LeafNode::max_data_size(node_size) as u16,
1562 false,
1563 false,
1564 node_size as u16,
1565 cache_only,
1566 );
1567 self.prefix_len = 0;
1568 self.next_level = next_level;
1569 } else {
1570 self.meta = NodeMeta::new(
1571 LeafNode::max_data_size(node_size) as u16, false,
1573 true,
1574 node_size as u16,
1575 cache_only,
1576 );
1577
1578 self.prefix_len = common_prefix_len(low_fence, high_fence);
1579 self.next_level = next_level;
1580 self.install_fence_key(low_fence, false);
1581 self.install_fence_key(high_fence, true);
1582 }
1583 }
1584
1585 pub(crate) fn set_split_flag(&mut self) {
1586 self.meta.set_split_flag();
1587 }
1588
1589 pub(crate) fn get_split_flag(&self) -> bool {
1590 self.meta.get_split_flag()
1591 }
1592
1593 const SNAPSHOT_VERSION_CHANGED_FLAG: u64 = 0x8000_0000_0000_0000;
1596 const SNAPSHOT_VERSION_MASK: u64 = 0x7FFF_FFFF_FFFF_FFFF;
1597
1598 pub(crate) fn get_snapshot_version(&self) -> u64 {
1600 self.snapshot_version & Self::SNAPSHOT_VERSION_MASK
1601 }
1602
1603 pub(crate) fn set_snapshot_version(&mut self, snapshot_version: u64, mark_changed: bool) {
1605 let next_version = snapshot_version & Self::SNAPSHOT_VERSION_MASK;
1606
1607 if !mark_changed {
1608 self.snapshot_version = next_version;
1609 return;
1610 }
1611
1612 let current_version = self.get_snapshot_version();
1613
1614 if next_version != current_version {
1616 self.snapshot_version = next_version | Self::SNAPSHOT_VERSION_CHANGED_FLAG;
1617 }
1618 }
1619
1620 pub(crate) fn is_snapshot_version_changed(&self) -> bool {
1622 (self.snapshot_version & Self::SNAPSHOT_VERSION_CHANGED_FLAG) != 0
1623 }
1624
1625 pub(crate) fn set_snapshot_version_changed_flag(&mut self) {
1627 self.snapshot_version |= Self::SNAPSHOT_VERSION_CHANGED_FLAG;
1628 }
1629
1630 pub(crate) fn read_by_key(&self, search_key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
1631 self.read_by_key_inner(search_key, out_buffer, true)
1632 }
1633
1634 #[must_use]
1636 pub(crate) fn read_by_key_inner(
1637 &self,
1638 search_key: &[u8],
1639 out_buffer: &mut [u8],
1640 binary_search: bool,
1641 ) -> LeafReadResult {
1642 let val_count = self.meta.meta_count_with_fence();
1643 let pos = if binary_search {
1644 self.lower_bound(search_key)
1645 } else {
1646 self.linear_lower_bound(search_key)
1647 };
1648
1649 if pos >= val_count {
1650 counter!(LeafNotFoundDueToRange);
1651 return LeafReadResult::NotFound;
1652 }
1653
1654 let kv_meta = self.get_kv_meta(pos as usize);
1655 let target_key = self.get_remaining_key(kv_meta);
1656
1657 if !kv_meta.is_referenced() {
1659 kv_meta.mark_as_ref();
1660 }
1661
1662 let input_post_key = &search_key[self.prefix_len as usize..];
1663 let cmp = target_key.cmp(input_post_key);
1664
1665 if cmp != Ordering::Equal {
1666 counter!(LeafNotFoundDueToKey);
1667 LeafReadResult::NotFound
1668 } else {
1669 if kv_meta.op_type().is_absent() {
1670 return LeafReadResult::Deleted;
1671 }
1672 let val_len = kv_meta.value_len();
1673 let val_ref = self.get_value(kv_meta);
1674 debug_assert_eq!(val_len as usize, val_ref.len());
1675 out_buffer[..val_len as usize].copy_from_slice(val_ref);
1676 LeafReadResult::Found(val_len as u32)
1677 }
1678 }
1679
1680 pub(crate) fn get_chain_size_hint(
1685 key_len: usize,
1686 value_len: usize,
1687 page_classes: &[usize],
1688 cache_only: bool,
1689 ) -> usize {
1690 let mut initial_record_size = key_len + value_len + std::mem::size_of::<LeafKVMeta>();
1691 initial_record_size += std::mem::size_of::<LeafNode>();
1692
1693 if let Some(s) = page_classes[0..(page_classes.len() - 1)]
1694 .iter()
1695 .position(|x| initial_record_size < *x)
1696 {
1697 return page_classes[s];
1698 } else if cache_only && initial_record_size <= page_classes[page_classes.len() - 1] {
1699 return page_classes[page_classes.len() - 1];
1700 }
1701
1702 panic!(
1703 "Record size {} plus metadata exceeds the max mini-page size {:?}",
1704 initial_record_size, page_classes
1705 );
1706 }
1707
1708 pub(crate) fn new_size_if_upgrade(
1714 &self,
1715 incoming_size: usize,
1716 page_classes: &[usize],
1717 cache_only: bool,
1718 ) -> Option<usize> {
1719 let cur_size = self.meta.node_size as usize;
1720 let request_size = cur_size + incoming_size;
1721
1722 if let Some(s) = page_classes[0..(page_classes.len() - 1)]
1723 .iter()
1724 .position(|x| request_size < *x)
1725 {
1726 if s == 0 {
1727 panic!("Should not be here");
1728 }
1729 return Some(page_classes[s]);
1730 } else if cache_only && request_size <= page_classes[page_classes.len() - 1] {
1731 return Some(page_classes[page_classes.len() - 1]);
1732 }
1733
1734 None
1735 }
1736
1737 pub(crate) fn free_base_page(node: *mut LeafNode) {
1739 assert!(unsafe { &*node }.is_base_page());
1740 let node_size = unsafe { &*node }.meta.node_size as usize;
1741 let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
1742 unsafe {
1743 std::alloc::dealloc(node as *mut u8, layout);
1744 }
1745 }
1746
1747 pub(crate) fn need_actually_merge_to_disk(&self) -> bool {
1748 for meta in self.meta_iter() {
1749 if meta.op_type().is_dirty() {
1750 return true;
1751 }
1752 }
1753
1754 false
1755 }
1756
1757 fn estimate_merge_size(&self) -> usize {
1758 let mut required_size = 0;
1759
1760 for meta in self.meta_iter() {
1761 if meta.op_type() == OpType::Insert {
1762 required_size += (meta.get_key_len() + meta.value_len()) as usize
1763 + std::mem::size_of::<LeafKVMeta>();
1764 }
1765 }
1766
1767 required_size
1768 }
1769
1770 pub(crate) fn full_with_fences(&self, requested_size: u16, max_fence_len: usize) -> bool {
1773 if self.meta.remaining_size < requested_size {
1774 return true;
1775 }
1776
1777 if self.meta.has_fence() {
1778 let mut empty_data_size = self.meta.remaining_size; let low_key_meta = self.get_kv_meta(LOW_FENCE_IDX);
1781 if !low_key_meta.is_infinite_low_fence_key() {
1782 empty_data_size += low_key_meta.get_key_len();
1783 }
1784
1785 let high_key_meta = self.get_kv_meta(HIGH_FENCE_IDX);
1786 if !high_key_meta.is_infinite_high_fence_key() {
1787 empty_data_size += high_key_meta.get_key_len();
1788 }
1789
1790 if empty_data_size >= requested_size + max_fence_len as u16 {
1791 return false;
1792 }
1793
1794 return true;
1795 }
1796
1797 false
1798 }
1799
1800 pub(crate) fn merge_mini_page(&mut self, mini_page: &LeafNode, max_fence_len: usize) -> bool {
1801 let size_required = mini_page.estimate_merge_size();
1802
1803 if self.full_with_fences(size_required as u16, max_fence_len) {
1804 return false;
1805 }
1806
1807 self.snapshot_version = mini_page.get_snapshot_version();
1809 for meta in mini_page.meta_iter() {
1810 let c_key = mini_page.get_full_key(meta);
1811 let c_type = meta.op_type();
1812 if !c_type.is_dirty() {
1813 continue;
1818 }
1819 let c_value = mini_page.get_value(meta);
1820 let rt = self.insert(&c_key, c_value, c_type, 0);
1821 assert!(rt);
1822 }
1823
1824 true
1825 }
1826
1827 pub(crate) fn get_stats(&self) -> LeafStats {
1828 let mut keys = Vec::new();
1829 let mut values = Vec::new();
1830 let mut op_types = Vec::new();
1831 let node_size = self.meta.node_size as usize;
1832 let prefix = self.get_prefix().to_owned();
1833
1834 for meta in self.meta_iter() {
1835 let key = self.get_full_key(meta);
1836 let value = self.get_value(meta);
1837 keys.push(key);
1838 values.push(value.to_owned());
1839 op_types.push(meta.op_type());
1840 }
1841
1842 LeafStats {
1843 keys,
1844 values,
1845 op_types,
1846 prefix,
1847 base_node: None,
1848 next_level: self.next_level,
1849 node_size,
1850 }
1851 }
1852
1853 pub(crate) fn has_fence(&self) -> bool {
1854 self.meta.has_fence()
1855 }
1856
1857 pub(crate) fn meta_iter(&self) -> LeafMetaIter<'_> {
1859 LeafMetaIter::new(self)
1860 }
1861
1862 fn first_meta_pos_after_fence(&self) -> u16 {
1863 if self.has_fence() {
1864 FENCE_KEY_CNT as u16
1865 } else {
1866 0
1867 }
1868 }
1869
1870 pub(crate) fn get_record_by_pos_with_bound(
1871 &self,
1872 pos: u32,
1873 out_buffer: &mut [u8],
1874 return_field: ScanReturnField,
1875 bound_key: &Option<Vec<u8>>,
1876 ) -> GetScanRecordByPosResult {
1877 if pos >= self.meta.meta_count_with_fence() as u32 {
1878 return GetScanRecordByPosResult::EndOfLeaf;
1879 }
1880
1881 let meta = self.get_kv_meta(pos as usize);
1882
1883 if meta.op_type().is_absent() {
1884 return GetScanRecordByPosResult::Deleted;
1885 }
1886
1887 match return_field {
1888 ScanReturnField::Value => {
1889 if let Some(bk) = bound_key {
1890 let cmp = self.get_full_key(meta).as_slice().cmp(bk);
1891 if cmp == Ordering::Greater {
1892 return GetScanRecordByPosResult::BoundKeyExceeded;
1893 }
1894 }
1895
1896 let value = self.get_value(meta);
1897 let value_len = meta.value_len() as usize;
1898 out_buffer[..value_len].copy_from_slice(value);
1899 GetScanRecordByPosResult::Found(0, value_len as u32)
1900 }
1901 ScanReturnField::Key => {
1902 let full_key = self.get_full_key(meta);
1903
1904 if let Some(bk) = bound_key {
1905 let cmp = full_key.as_slice().cmp(bk);
1906 if cmp == Ordering::Greater {
1907 return GetScanRecordByPosResult::BoundKeyExceeded;
1908 }
1909 }
1910
1911 let key_len = full_key.len();
1912 out_buffer[..key_len].copy_from_slice(&full_key);
1913 GetScanRecordByPosResult::Found(key_len as u32, 0)
1914 }
1915 ScanReturnField::KeyAndValue => {
1916 let full_key = self.get_full_key(meta);
1917
1918 if let Some(bk) = bound_key {
1919 let cmp = full_key.as_slice().cmp(bk);
1920 if cmp == Ordering::Greater {
1921 return GetScanRecordByPosResult::BoundKeyExceeded;
1922 }
1923 }
1924
1925 let key_len = full_key.len();
1926 let value = self.get_value(meta);
1927 let value_len = meta.value_len() as usize;
1928
1929 out_buffer[..key_len].copy_from_slice(&full_key);
1930 out_buffer[key_len..key_len + value_len].copy_from_slice(value);
1931
1932 GetScanRecordByPosResult::Found(key_len as u32, value_len as u32)
1933 }
1934 }
1935 }
1936}
1937
1938pub(crate) struct LeafMetaIter<'a> {
1939 node: &'a LeafNode,
1940 cur: usize,
1941}
1942
1943impl LeafMetaIter<'_> {
1944 fn new(leaf: &LeafNode) -> LeafMetaIter<'_> {
1945 let cur = leaf.first_meta_pos_after_fence();
1946 LeafMetaIter {
1947 node: leaf,
1948 cur: cur as usize,
1949 }
1950 }
1951}
1952
1953impl<'a> Iterator for LeafMetaIter<'a> {
1954 type Item = &'a LeafKVMeta;
1955
1956 fn next(&mut self) -> Option<Self::Item> {
1957 if self.cur < self.node.meta.meta_count_with_fence() as usize {
1958 let rt = self.node.get_kv_meta(self.cur);
1959 self.cur += 1;
1960 Some(rt)
1961 } else {
1962 None
1963 }
1964 }
1965}
1966
1967pub(crate) enum GetScanRecordByPosResult {
1968 EndOfLeaf,
1969 Deleted,
1970 Found(u32, u32), BoundKeyExceeded, }
1973
1974#[derive(Debug, PartialEq, Eq, Clone)]
1975pub enum LeafReadResult {
1976 Deleted,
1977 NotFound,
1978 Found(u32),
1979 InvalidKey,
1980}
1981
1982#[cfg(test)]
1983mod tests {
1984 use super::*;
1985 use bytemuck::cast_slice;
1986 use rstest::rstest;
1987
1988 #[test]
1989 fn test_op_type_enum() {
1990 assert_eq!(OpType::Insert as u8, 0);
1991 assert_eq!(OpType::Delete as u8, 1);
1992 assert_eq!(OpType::Cache as u8, 2);
1993 }
1994
1995 #[test]
1996 fn test_make_prefixed_meta() {
1997 let key = [1, 2, 3, 4, 5];
1998 let prefix_len = 2;
1999 let meta = LeafKVMeta::make_prefixed_meta(10, 20, &key, prefix_len, OpType::Insert);
2000
2001 assert_eq!(meta.offset, 10);
2002 assert_eq!(meta.get_key_len(), 5);
2003 assert_eq!(meta.value_len(), 20);
2004 assert_eq!(meta.preview_bytes, [3, 4]);
2005 assert_eq!(meta.op_type(), OpType::Insert);
2006 assert!(!meta.is_referenced());
2007 }
2008
2009 #[test]
2010 fn test_make_infinite_high_fence_key() {
2011 let meta = LeafKVMeta::make_infinite_high_fence_key();
2012 assert_eq!(meta.offset, u16::MAX);
2013 assert_eq!(meta.get_key_len(), 0);
2014 assert_eq!(meta.value_len(), 0);
2015 assert!(meta.is_infinite_high_fence_key());
2016 }
2017
2018 #[test]
2019 fn test_make_infinite_low_fence_key() {
2020 let meta = LeafKVMeta::make_infinite_low_fence_key();
2021 assert_eq!(meta.offset, u16::MAX - 1);
2022 assert_eq!(meta.get_key_len(), 0);
2023 assert_eq!(meta.value_len(), 0);
2024 assert!(meta.is_infinite_low_fence_key());
2025 }
2026
2027 #[test]
2028 fn test_value_len() {
2029 let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
2030 assert_eq!(meta.value_len(), 20);
2031
2032 meta.set_value_len(30);
2033 assert_eq!(meta.value_len(), 30);
2034 }
2035
2036 #[test]
2037 fn test_op_type() {
2038 let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Delete);
2039 assert_eq!(meta.op_type(), OpType::Delete);
2040 }
2041
2042 #[test]
2043 fn test_mark_as_ref_and_clear_ref() {
2044 let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
2045 assert!(!meta.is_referenced());
2046
2047 meta.mark_as_ref();
2048 assert!(meta.is_referenced());
2049
2050 meta.clear_ref();
2051 assert!(!meta.is_referenced());
2052 }
2053
2054 #[test]
2055 fn test_mark_as_deleted_and_is_deleted() {
2056 let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
2057 assert!(!meta.is_deleted());
2058
2059 meta.mark_as_deleted();
2060 assert!(meta.is_deleted());
2061 }
2062
2063 #[rstest]
2067 #[case(vec![1], vec![2], 2)]
2068 #[case(vec![2], vec![1], 2)]
2069 fn test_get_merge_split_key(
2070 #[case] base_page_values: Vec<usize>,
2071 #[case] mini_page_values: Vec<usize>,
2072 #[case] splitting_key: usize,
2073 ) {
2074 let base = unsafe { &mut *LeafNode::make_base_page(4096) };
2075 let mini = unsafe { &mut *LeafNode::make_base_page(4096) }; for i in 0..base_page_values.len() {
2079 let n = &base_page_values[i];
2080 let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
2081 let key = cast_slice::<usize, u8>(n_slice);
2082 let value = cast_slice::<usize, u8>(n_slice);
2083
2084 let rt = base.insert(key, value, OpType::Insert, 2);
2085 assert!(rt);
2086 }
2087
2088 for i in 0..mini_page_values.len() {
2089 let n = &mini_page_values[i];
2090 let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
2091 let key = cast_slice::<usize, u8>(n_slice);
2092 let value = cast_slice::<usize, u8>(n_slice);
2093
2094 let rt = mini.insert(key, value, OpType::Insert, 2);
2095 assert!(rt);
2096 }
2097
2098 let merge_split_key_byte = base.get_merge_split_key(mini);
2100 let merge_splitting_key = cast_slice::<u8, usize>(&merge_split_key_byte);
2101
2102 assert_eq!(merge_splitting_key[0], splitting_key);
2103
2104 LeafNode::free_base_page(base);
2105 LeafNode::free_base_page(mini);
2106 }
2107
2108 #[rstest]
2113 #[case(vec![1, 2, 3, 4], 3)]
2114 #[case(vec![1], 2)]
2115 #[case(vec![2], 2)]
2116 fn test_split_with_key(#[case] base_page_values: Vec<usize>, #[case] splitting_key: usize) {
2117 let base = unsafe { &mut *LeafNode::make_base_page(4096) };
2118 let sibling = unsafe { &mut *LeafNode::make_base_page(4096) };
2119
2120 for i in 0..base_page_values.len() {
2122 let n = &base_page_values[i];
2123 let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
2124 let key = cast_slice::<usize, u8>(n_slice);
2125 let value = cast_slice::<usize, u8>(n_slice);
2126
2127 let rt = base.insert(key, value, OpType::Insert, 2);
2128 assert!(rt);
2129 }
2130
2131 let splitting_key_ptr = &splitting_key;
2132 let splitting_key_slice =
2133 unsafe { std::slice::from_raw_parts(splitting_key_ptr as *const usize, 1) };
2134 let splitting_key_byte_arrary = cast_slice::<usize, u8>(splitting_key_slice).to_vec();
2135
2136 base.split_with_key(sibling, &splitting_key_byte_arrary, false);
2138
2139 let mut out_buffer = vec![0u8; 1024];
2142 for i in 0..base_page_values.len() {
2143 let mut page = &mut *base;
2144 if base_page_values[i] >= splitting_key {
2145 page = &mut *sibling;
2146 }
2147
2148 let n = &base_page_values[i];
2149 let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
2150 let key = cast_slice::<usize, u8>(n_slice);
2151
2152 let rt = page.read_by_key(key, &mut out_buffer);
2153
2154 assert_eq!(rt, LeafReadResult::Found(key.len() as u32)); assert_eq!(&out_buffer[0..key.len()], key);
2156 }
2157
2158 LeafNode::free_base_page(base);
2159 LeafNode::free_base_page(sibling);
2160 }
2161}