1use core::panic;
5use std::{alloc::Layout, cmp::Ordering, sync::atomic};
6
7use crate::{
8 circular_buffer::CircularBufferPtr, counter, error::TreeError, range_scan::ScanReturnField,
9 utils::stats::LeafStats,
10};
11
12use super::{node_meta::NodeMeta, FENCE_KEY_CNT};
13
14#[repr(u8)]
17#[derive(PartialEq, Eq, Debug, Clone, Copy)]
18pub(crate) enum OpType {
19 Insert = 0,
20 Delete = 1,
21 Cache = 2, Phantom = 3, }
24
25impl OpType {
26 pub(crate) fn is_dirty(&self) -> bool {
27 match self {
28 OpType::Insert | OpType::Delete => true,
29 OpType::Cache | OpType::Phantom => false,
30 }
31 }
32
33 fn is_absent(&self) -> bool {
34 match self {
35 OpType::Insert | OpType::Cache => false,
36 OpType::Phantom | OpType::Delete => true,
37 }
38 }
39
40 fn is_cache(&self) -> bool {
41 match self {
42 OpType::Cache | OpType::Phantom => true,
43 OpType::Insert | OpType::Delete => false,
44 }
45 }
46}
47
48const PREVIEW_SIZE: usize = 2;
49
50const LOW_FENCE_IDX: usize = 0;
51const HIGH_FENCE_IDX: usize = 1;
52
53const OP_TYPE_SHIFT: u16 = 14;
54const KEY_LEN_MASK: u16 = 0x3F_FF; const REF_BIT_MASK: u16 = 0x80_00;
58const VALUE_LEN_MASK: u16 = 0x7F_FF; pub(crate) fn common_prefix_len(low_fence: &[u8], high_fence: &[u8]) -> u16 {
62 let mut prefix_len = 0;
63 for i in 0..std::cmp::min(low_fence.len(), high_fence.len()) {
64 if low_fence[i] == high_fence[i] {
65 prefix_len += 1;
66 } else {
67 break;
68 }
69 }
70 prefix_len
71}
72
73#[repr(C)]
74pub(crate) struct LeafKVMeta {
75 offset: u16,
76 op_type_key_len_in_byte: u16, ref_value_len_in_byte: std::sync::atomic::AtomicU16, preview_bytes: [u8; PREVIEW_SIZE],
79}
80
81impl LeafKVMeta {
82 pub(crate) fn make_prefixed_meta(
83 offset: u16,
84 value_len: u16,
85 key: &[u8],
86 prefix_len: u16,
87 op_type: OpType,
88 ) -> Self {
89 let mut meta = Self {
90 offset,
91 op_type_key_len_in_byte: key.len() as u16 | ((op_type as u16) << OP_TYPE_SHIFT),
92 ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(value_len),
93 preview_bytes: [0; PREVIEW_SIZE],
94 };
95
96 for i in 0..std::cmp::min(key.len().saturating_sub(prefix_len as usize), PREVIEW_SIZE) {
97 meta.preview_bytes[i] = key[i + prefix_len as usize];
98 }
99
100 meta.clear_ref();
103 meta
104 }
105
106 pub fn make_infinite_high_fence_key() -> Self {
107 assert_eq!(std::mem::size_of::<Self>(), super::KV_META_SIZE);
108
109 Self {
110 offset: u16::MAX,
111 op_type_key_len_in_byte: 0,
112 ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
113 preview_bytes: [0; PREVIEW_SIZE],
114 }
115 }
116
117 pub fn make_infinite_low_fence_key() -> Self {
118 Self {
119 offset: u16::MAX - 1,
120 op_type_key_len_in_byte: 0,
121 ref_value_len_in_byte: std::sync::atomic::AtomicU16::new(0),
122 preview_bytes: [0; PREVIEW_SIZE],
123 }
124 }
125
126 pub fn is_infinite_low_fence_key(&self) -> bool {
127 self.offset == u16::MAX - 1
128 }
129
130 pub fn is_infinite_high_fence_key(&self) -> bool {
131 self.offset == u16::MAX
132 }
133
134 pub fn value_len(&self) -> u16 {
135 self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & VALUE_LEN_MASK
136 }
137
138 pub fn set_value_len(&mut self, value: u16) {
139 let v = self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed);
140 self.ref_value_len_in_byte.store(
141 (v & !VALUE_LEN_MASK) | (value & VALUE_LEN_MASK),
142 atomic::Ordering::Relaxed,
143 );
144 }
145
146 pub fn set_op_type(&mut self, op_type: OpType) {
147 self.op_type_key_len_in_byte = self.get_key_len() | ((op_type as u16) << OP_TYPE_SHIFT);
148 }
149
150 pub fn op_type(&self) -> OpType {
151 let l = self.op_type_key_len_in_byte;
152 let b = l >> OP_TYPE_SHIFT;
153 unsafe { std::mem::transmute(b as u8) }
154 }
155
156 pub fn mark_as_ref(&self) {
157 self.ref_value_len_in_byte
158 .fetch_or(REF_BIT_MASK, atomic::Ordering::Relaxed);
159 }
160
161 pub fn clear_ref(&self) {
162 self.ref_value_len_in_byte
163 .fetch_and(!REF_BIT_MASK, atomic::Ordering::Relaxed);
164 }
165
166 pub fn is_referenced(&self) -> bool {
167 self.ref_value_len_in_byte.load(atomic::Ordering::Relaxed) & REF_BIT_MASK != 0
168 }
169
170 pub fn mark_as_deleted(&mut self) {
171 self.set_op_type(OpType::Delete);
172 }
173
174 #[allow(dead_code)]
175 pub fn is_deleted(&self) -> bool {
176 self.op_type() == OpType::Delete
177 }
178
179 pub(crate) fn get_offset(&self) -> u16 {
180 self.offset
181 }
182
183 pub(crate) fn get_key_len(&self) -> u16 {
184 self.op_type_key_len_in_byte & KEY_LEN_MASK
185 }
186}
187
188#[derive(Debug, Clone, Copy)]
189pub(crate) struct MiniPageNextLevel {
190 val: usize,
191}
192
193impl MiniPageNextLevel {
194 pub(crate) fn new(val: usize) -> Self {
195 Self { val }
196 }
197
198 pub(crate) fn as_offset(&self) -> usize {
199 assert!(!self.is_null());
200 self.val
201 }
202
203 pub(crate) fn new_null() -> Self {
204 Self { val: usize::MAX }
205 }
206
207 pub(crate) fn is_null(&self) -> bool {
208 self.val == usize::MAX
209 }
210}
211
212#[repr(C)]
213pub(crate) struct LeafNode {
214 pub(crate) meta: NodeMeta,
215 prefix_len: u16,
216 pub(crate) next_level: MiniPageNextLevel,
217 pub(crate) lsn: u64,
218 snapshot_version: u64,
219 data: [u8; 0],
220}
221
222const _: () = assert!(std::mem::size_of::<LeafNode>() == 32);
223
224impl LeafNode {
225 fn max_data_size(node_size: usize) -> usize {
226 node_size - std::mem::size_of::<LeafNode>()
227 }
228
229 pub(crate) fn initialize_mini_page(
230 ptr: &CircularBufferPtr,
231 node_size: usize,
232 next_level: MiniPageNextLevel,
233 cache_only: bool,
234 snapshot_version: u64,
235 ) {
236 unsafe {
237 Self::init_node_with_fence(
238 ptr.as_ptr(),
239 &[],
240 &[],
241 node_size,
242 next_level,
243 false,
244 cache_only,
245 snapshot_version,
246 )
247 }
248 }
249
250 pub(crate) fn make_base_page(node_size: usize, snapshot_version: u64) -> *mut Self {
251 let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
252 let ptr = unsafe { std::alloc::alloc(layout) };
253 unsafe {
254 Self::init_node_with_fence(
255 ptr,
256 &[],
257 &[],
258 node_size,
259 MiniPageNextLevel::new_null(),
260 true,
261 false,
262 snapshot_version,
263 );
264 }
265 ptr as *mut Self
266 }
267
268 pub(crate) fn covert_insert_records_to_cache(&mut self) {
270 let mut cur = self.first_meta_pos_after_fence();
271
272 while cur < self.meta.meta_count_with_fence() {
273 let meta = self.get_kv_meta_mut(cur as usize);
274 let op_type = meta.op_type();
275
276 match op_type {
277 OpType::Insert => {
278 meta.set_op_type(OpType::Cache);
279 }
280 OpType::Delete => {
281 meta.set_op_type(OpType::Phantom);
282 }
283 OpType::Cache | OpType::Phantom => {
284 unreachable!("Base page should not have op type: {:?}", op_type);
285 }
286 };
287
288 cur += 1;
289 }
290 }
291
292 pub(crate) fn convert_cache_records_to_insert(&mut self) {
294 let mut cur = self.first_meta_pos_after_fence();
295
296 while cur < self.meta.meta_count_with_fence() {
297 let meta = self.get_kv_meta_mut(cur as usize);
298 let op_type = meta.op_type();
299
300 match op_type {
301 OpType::Cache => {
302 meta.set_op_type(OpType::Insert);
303 }
304 OpType::Phantom => {
305 meta.set_op_type(OpType::Delete);
306 }
307 OpType::Insert | OpType::Delete => {
308 }
310 };
311
312 cur += 1;
313 }
314 }
315
316 #[allow(clippy::too_many_arguments)]
317 unsafe fn init_node_with_fence(
318 ptr: *mut u8,
319 low_fence: &[u8],
320 high_fence: &[u8],
321 node_size: usize,
322 next_level: MiniPageNextLevel,
323 has_fence: bool,
324 cache_only: bool,
325 snapshot_version: u64,
326 ) {
327 let ptr = ptr as *mut Self;
328
329 { &mut *ptr }.initialize(
330 low_fence,
331 high_fence,
332 node_size,
333 next_level,
334 has_fence,
335 cache_only,
336 snapshot_version,
337 );
338 }
339
340 pub(crate) fn get_kv_meta(&self, index: usize) -> &LeafKVMeta {
341 debug_assert!(index < self.meta.meta_count_with_fence() as usize);
342 let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
343 unsafe { &*meta_ptr.add(index) }
344 }
345
346 pub(crate) fn get_full_key(&self, meta: &LeafKVMeta) -> Vec<u8> {
347 let prefix = self.get_prefix();
348 let remaining = self.get_remaining_key(meta);
349 [prefix, remaining].concat()
350 }
351
352 pub(crate) fn get_low_fence_full_key(&self) -> Vec<u8> {
354 debug_assert!(LOW_FENCE_IDX < self.meta.meta_count_with_fence() as usize);
355 let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
356 let meta = unsafe { &*meta_ptr.add(LOW_FENCE_IDX) };
357
358 let key_offset = meta.get_offset();
359 let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
360 unsafe {
361 let key = std::slice::from_raw_parts(key_ptr, (meta.get_key_len()) as usize);
362 [key].concat()
363 }
364 }
365
366 pub(crate) fn get_kv_meta_mut(&mut self, index: usize) -> &mut LeafKVMeta {
367 debug_assert!(index < self.meta.meta_count_with_fence() as usize);
368 let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
369 unsafe { meta_ptr.add(index).as_mut().unwrap() }
370 }
371
372 pub(crate) fn write_initial_kv_meta(&mut self, index: usize, meta: LeafKVMeta) {
373 let meta_ptr = self.data.as_mut_ptr() as *mut LeafKVMeta;
374 unsafe { meta_ptr.add(index).write(meta) };
375 }
376
377 pub(crate) fn get_remaining_key(&self, meta: &LeafKVMeta) -> &[u8] {
378 let key_offset = meta.get_offset();
379 let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
380 unsafe {
381 std::slice::from_raw_parts(key_ptr, (meta.get_key_len() - self.prefix_len) as usize)
382 }
383 }
384
385 pub(crate) fn get_prefix(&self) -> &[u8] {
386 let m = self.get_kv_meta(LOW_FENCE_IDX);
387 let key_offset = m.get_offset();
388 unsafe {
389 std::slice::from_raw_parts(
390 self.data.as_ptr().add(key_offset as usize),
391 self.prefix_len as usize,
392 )
393 }
394 }
395
396 pub(crate) fn install_fence_key(&mut self, key: &[u8], is_high_fence: bool) {
397 let loc: u16 = self.meta.meta_count_with_fence();
398 if !is_high_fence {
399 debug_assert!(self.meta.meta_count_with_fence() == LOW_FENCE_IDX as u16);
400 } else {
401 debug_assert!(self.meta.meta_count_with_fence() == HIGH_FENCE_IDX as u16);
402 }
403
404 let cur_low_offset = self.current_lowest_offset();
405
406 self.meta.increment_value_count();
407 self.meta.remaining_size -= std::mem::size_of::<LeafKVMeta>() as u16;
408
409 if key.is_empty() {
410 let fence = if is_high_fence {
411 LeafKVMeta::make_infinite_high_fence_key()
412 } else {
413 LeafKVMeta::make_infinite_low_fence_key()
414 };
415 self.write_initial_kv_meta(loc as usize, fence);
416 } else {
417 let prefix_len = if is_high_fence { self.prefix_len } else { 0 }; let post_fix_len = key.len() as u16 - prefix_len;
420 let val_len = 0u16;
421 let kv_len = post_fix_len + val_len;
422
423 let remaining = self.meta.remaining_size;
424
425 debug_assert!(kv_len <= remaining);
426
427 let offset = cur_low_offset - kv_len;
428
429 let new_meta =
430 LeafKVMeta::make_prefixed_meta(offset, 0, key, prefix_len, OpType::Insert);
431
432 self.write_initial_kv_meta(loc as usize, new_meta);
433
434 unsafe {
435 let start_ptr = self.data.as_mut_ptr().add(offset as usize);
436
437 let key_slice = std::slice::from_raw_parts(
438 key.as_ptr().add(prefix_len as usize),
439 post_fix_len as usize,
440 );
441 std::ptr::copy_nonoverlapping(key_slice.as_ptr(), start_ptr, post_fix_len as usize);
442 }
443
444 self.meta.remaining_size -= kv_len;
445 }
446 }
447
448 pub(crate) fn current_lowest_offset(&self) -> u16 {
449 let value_count = self.meta.meta_count_with_fence();
450 let rt =
451 (value_count * std::mem::size_of::<LeafKVMeta>() as u16) + self.meta.remaining_size;
452
453 #[cfg(debug_assertions)]
455 {
456 let mut min_offset = LeafNode::max_data_size(self.meta.node_size as usize) as u16;
457 for i in 0..value_count {
458 let kv_meta = self.get_kv_meta(i as usize);
459 if kv_meta.is_infinite_low_fence_key() || kv_meta.is_infinite_high_fence_key() {
460 continue;
461 }
462 min_offset = std::cmp::min(min_offset, kv_meta.offset);
463 }
464 assert!(min_offset == rt);
465 }
466 rt
467 }
468
469 pub(crate) fn get_value(&self, meta: &LeafKVMeta) -> &[u8] {
470 let val_offset = meta.get_offset() + meta.get_key_len() - self.prefix_len;
471 let val_ptr = unsafe { self.data.as_ptr().add(val_offset as usize) };
472 unsafe { std::slice::from_raw_parts(val_ptr, meta.value_len() as usize) }
473 }
474
475 pub fn get_split_key(&self, cache_only: bool) -> (Vec<u8>, u16) {
486 let mut data_size = 0;
487
488 for meta in self.meta_iter() {
489 let key_len = meta.get_key_len();
490 let value_len = meta.value_len();
491
492 data_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
493 }
494
495 let split_target_size = data_size / 2;
496 let mut cur_size = 0;
497
498 for (i, meta) in self.meta_iter().enumerate() {
499 let key_len = meta.get_key_len();
500 let value_len = meta.value_len();
501
502 cur_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
503
504 if cache_only && i == 0 {
506 continue;
507 }
508
509 if cur_size >= split_target_size {
510 return (self.get_full_key(meta), i as u16);
511 }
512 }
513 unreachable!();
514 }
515
516 #[allow(clippy::unused_enumerate_index)]
518 pub fn get_kv_num_below_key(&self, merge_split_key: &Vec<u8>) -> u16 {
519 let mut cnt: u16 = 0;
521 for (_, meta) in self.meta_iter().enumerate() {
522 let key = self.get_full_key(meta);
523 let cmp = key.cmp(merge_split_key);
524 if cmp == std::cmp::Ordering::Less {
527 cnt += 1;
528 } else {
529 break;
530 }
531 }
532 cnt
533 }
534
535 pub fn get_cache_only_insert_split_key(&self, key: &[u8], new_record_size: &u16) -> Vec<u8> {
540 let mut merge_split_key_1: Option<Vec<u8>> = None;
541 let mut merge_split_key_2: Option<Vec<u8>> = None;
542 let mut diff_1: i16 = i16::MAX;
543 let mut diff_2: i16 = i16::MAX;
544
545 let mut total_merged_size: u16 = 0;
547
548 for meta in self.meta_iter() {
549 let key_len = meta.get_key_len();
550 let value_len = meta.value_len();
551
552 total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
553 }
554
555 total_merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;
556 let split_target_size = total_merged_size / 2;
557
558 let mut merged_size: u16 = 0;
560 let mut self_meta_iter = self.meta_iter();
561 let mut self_meta_option = self_meta_iter.next();
562
563 if self_meta_option.is_some() {
565 let mut cur_base_meta = self_meta_option.unwrap();
566 let mut cur_base_key = self.get_full_key(cur_base_meta);
567 let mut cmp = cur_base_key.as_slice().cmp(key);
568
569 while cmp == std::cmp::Ordering::Less {
570 let key_len = cur_base_meta.get_key_len();
571 let value_len = cur_base_meta.value_len();
572 merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
573 if merged_size >= split_target_size {
574 if merge_split_key_2.is_some() {
577 break;
578 }
579
580 let left_side: u16 = merged_size
581 - key_len
582 - value_len
583 - std::mem::size_of::<LeafKVMeta>() as u16;
584 let right_side = total_merged_size - left_side;
585
586 if merge_split_key_1.is_none() {
587 merge_split_key_1 = Some(cur_base_key);
588 diff_1 = (left_side as i16 - right_side as i16).abs();
589 } else {
590 merge_split_key_2 = Some(cur_base_key);
591 diff_2 = (left_side as i16 - right_side as i16).abs();
592 }
593 }
594
595 self_meta_option = self_meta_iter.next();
596 if let Some(meta) = self_meta_option {
597 cur_base_meta = meta;
598 cur_base_key = self.get_full_key(cur_base_meta);
599 cmp = cur_base_key.as_slice().cmp(key);
600 } else {
601 break;
602 }
603 }
604 }
605
606 if merge_split_key_2.is_none() {
608 merged_size += new_record_size + std::mem::size_of::<LeafKVMeta>() as u16;
609
610 if merged_size >= split_target_size {
611 let left_side: u16 =
615 merged_size - new_record_size - std::mem::size_of::<LeafKVMeta>() as u16;
616 let right_side = total_merged_size - left_side;
617
618 if merge_split_key_1.is_none() {
619 merge_split_key_1 = Some(key.to_vec());
620 diff_1 = (left_side as i16 - right_side as i16).abs();
621 } else {
622 merge_split_key_2 = Some(key.to_vec());
623 diff_2 = (left_side as i16 - right_side as i16).abs();
624 }
625 }
626 }
627
628 while self_meta_option.is_some() {
630 let base_meta = self_meta_option.unwrap();
631 let key_len = base_meta.get_key_len();
632 let value_len = base_meta.value_len();
633 merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
634
635 if merged_size >= split_target_size {
636 let cur_base_key = self.get_full_key(base_meta);
638 if merge_split_key_2.is_some() {
639 break;
640 }
641
642 let left_side: u16 =
643 merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
644 let right_side = total_merged_size - left_side;
645
646 if merge_split_key_1.is_none() {
647 merge_split_key_1 = Some(cur_base_key);
648 diff_1 = (left_side as i16 - right_side as i16).abs();
649 } else {
650 merge_split_key_2 = Some(cur_base_key);
651 diff_2 = (left_side as i16 - right_side as i16).abs();
652 }
653 }
654 self_meta_option = self_meta_iter.next();
655 }
656
657 if merge_split_key_1.is_none() {
660 panic!(
661 "Fail to find a splitting key for merging mini and base page.{}, {}",
662 merged_size, split_target_size
663 );
664 }
665
666 if merge_split_key_2.is_none() || diff_1 < diff_2 {
667 return merge_split_key_1.unwrap();
668 }
669
670 merge_split_key_2.unwrap()
671 }
672
673 #[allow(clippy::unnecessary_unwrap)]
680 pub(crate) fn get_merge_split_key(&mut self, mini_page: &LeafNode) -> Vec<u8> {
681 let mut merge_split_key_1: Option<Vec<u8>> = None;
682 let mut merge_split_key_2: Option<Vec<u8>> = None;
683 let mut diff_1: i16 = i16::MAX;
684 let mut diff_2: i16 = i16::MAX;
685
686 let mut total_merged_size: u16 = 0;
687 let mut base_meta_iter = self.meta_iter();
688 let mut cur_pos = base_meta_iter.cur;
689 let mut cur_base_meta_option = base_meta_iter.next();
690 let mut duplicate_positions = vec![];
691
692 for mini_meta in mini_page.meta_iter() {
694 let c_type = mini_meta.op_type();
695 if !c_type.is_dirty() {
697 continue;
698 }
699
700 let cur_mini_key = mini_page.get_full_key(mini_meta);
701 if cur_base_meta_option.is_some() {
702 let mut cur_base_meta = cur_base_meta_option.unwrap();
703 let mut cur_base_key = self.get_full_key(cur_base_meta);
704 let mut cmp = cur_base_key.cmp(&cur_mini_key);
705
706 while cmp == std::cmp::Ordering::Less {
709 let key_len = cur_base_meta.get_key_len();
710 let value_len = cur_base_meta.value_len();
711 total_merged_size +=
712 key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
713
714 cur_pos = base_meta_iter.cur;
715 cur_base_meta_option = base_meta_iter.next();
716 if cur_base_meta_option.is_some() {
717 cur_base_meta = cur_base_meta_option.unwrap();
718 cur_base_key = self.get_full_key(cur_base_meta);
719 cmp = cur_base_key.cmp(&cur_mini_key);
720 } else {
721 break;
722 }
723 }
724
725 if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
728 duplicate_positions.push(cur_pos);
730 cur_pos = base_meta_iter.cur;
731 cur_base_meta_option = base_meta_iter.next();
732 }
733 }
734
735 let key_len = mini_meta.get_key_len();
736 let value_len = mini_meta.value_len();
737 total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
738 }
739
740 while cur_base_meta_option.is_some() {
743 let base_meta = cur_base_meta_option.unwrap();
744 let key_len = base_meta.get_key_len();
745 let value_len = base_meta.value_len();
746 total_merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
747
748 cur_base_meta_option = base_meta_iter.next();
749 }
750
751 let split_target_size = total_merged_size / 2;
753
754 let mut merged_size: u16 = 0;
757 base_meta_iter = self.meta_iter();
758 cur_base_meta_option = base_meta_iter.next();
759
760 for mini_meta in mini_page.meta_iter() {
761 let c_type = mini_meta.op_type();
764 if !c_type.is_dirty() {
765 continue;
766 }
767
768 let cur_mini_key = mini_page.get_full_key(mini_meta);
769 if cur_base_meta_option.is_some() {
770 let mut cur_base_meta = cur_base_meta_option.unwrap();
771 let mut cur_base_key = self.get_full_key(cur_base_meta);
772 let mut cmp = cur_base_key.cmp(&cur_mini_key);
773 while cmp == std::cmp::Ordering::Less {
776 let key_len = cur_base_meta.get_key_len();
777 let value_len = cur_base_meta.value_len();
778 merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
779 if merged_size >= split_target_size {
780 if merge_split_key_2.is_some() {
783 break;
784 }
785
786 let left_side: u16 = merged_size
787 - key_len
788 - value_len
789 - std::mem::size_of::<LeafKVMeta>() as u16;
790 let right_side = total_merged_size - left_side;
791
792 if merge_split_key_1.is_none() {
793 merge_split_key_1 = Some(cur_base_key);
794 diff_1 = (left_side as i16 - right_side as i16).abs();
795 } else {
796 merge_split_key_2 = Some(cur_base_key);
797 diff_2 = (left_side as i16 - right_side as i16).abs();
798 }
799 }
800
801 cur_base_meta_option = base_meta_iter.next();
802 if cur_base_meta_option.is_some() {
803 cur_base_meta = cur_base_meta_option.unwrap();
804 cur_base_key = self.get_full_key(cur_base_meta);
805 cmp = cur_base_key.cmp(&cur_mini_key);
806 } else {
807 break;
808 }
809 }
810
811 if cmp == std::cmp::Ordering::Equal && cur_base_meta_option.is_some() {
814 cur_base_meta_option = base_meta_iter.next();
815 }
816 }
817
818 let key_len = mini_meta.get_key_len();
819 let value_len = mini_meta.value_len();
820 merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
821
822 if merged_size >= split_target_size {
823 if merge_split_key_2.is_some() {
826 break;
827 }
828
829 let left_side: u16 =
830 merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
831 let right_side = total_merged_size - left_side;
832
833 if merge_split_key_1.is_none() {
834 merge_split_key_1 = Some(cur_mini_key);
835 diff_1 = (left_side as i16 - right_side as i16).abs();
836 } else {
837 merge_split_key_2 = Some(cur_mini_key);
838 diff_2 = (left_side as i16 - right_side as i16).abs();
839 }
840 }
841 }
842
843 while cur_base_meta_option.is_some() {
846 let base_meta = cur_base_meta_option.unwrap();
847 let key_len = base_meta.get_key_len();
848 let value_len = base_meta.value_len();
849 merged_size += key_len + value_len + std::mem::size_of::<LeafKVMeta>() as u16;
850
851 if merged_size >= split_target_size {
852 let cur_base_key = self.get_full_key(base_meta);
854 if merge_split_key_2.is_some() {
855 break;
856 }
857
858 let left_side: u16 =
859 merged_size - key_len - value_len - std::mem::size_of::<LeafKVMeta>() as u16;
860 let right_side = total_merged_size - left_side;
861
862 if merge_split_key_1.is_none() {
863 merge_split_key_1 = Some(cur_base_key);
864 diff_1 = (left_side as i16 - right_side as i16).abs();
865 } else {
866 merge_split_key_2 = Some(cur_base_key);
867 diff_2 = (left_side as i16 - right_side as i16).abs();
868 }
869 }
870 cur_base_meta_option = base_meta_iter.next();
871 }
872
873 for pos in duplicate_positions {
875 let pos_meta = self.get_kv_meta_mut(pos);
876 pos_meta.mark_as_deleted();
877 }
878
879 if merge_split_key_1.is_none() {
880 unreachable!(
881 "Fail to find a splitting key for merging mini and base page.{}, {}",
882 merged_size, split_target_size
883 );
884 }
885
886 if merge_split_key_2.is_some() {
888 let cmp = merge_split_key_1
889 .as_ref()
890 .unwrap()
891 .cmp(merge_split_key_2.as_ref().unwrap());
892 assert_ne!(cmp, std::cmp::Ordering::Equal);
893 }
894
895 let mut splitting_key = if merge_split_key_2.is_none() || diff_1 < diff_2 {
896 merge_split_key_1.as_ref().unwrap()
897 } else {
898 merge_split_key_2.as_ref().unwrap()
899 };
900
901 let mut fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
904 if !fence_meta.is_infinite_low_fence_key() {
905 let low_fence_key = self.get_low_fence_key();
906 let cmp = splitting_key.cmp(&low_fence_key);
907 if cmp == std::cmp::Ordering::Equal {
908 splitting_key = merge_split_key_2.as_ref().unwrap();
909 }
910 }
911
912 fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
913 if !fence_meta.is_infinite_high_fence_key() {
914 let high_fence_key = self.get_high_fence_key();
915 let cmp = splitting_key.cmp(&high_fence_key);
916 assert_ne!(cmp, std::cmp::Ordering::Equal);
917 }
918
919 splitting_key.clone()
920 }
921
922 pub fn get_key_to_reach_this_node(&self) -> Vec<u8> {
923 let meta = self.get_kv_meta(self.first_meta_pos_after_fence() as usize);
924 self.get_full_key(meta)
925 }
926
927 pub fn try_get_key_to_reach_this_node(&self) -> Result<Vec<u8>, TreeError> {
931 assert!(self.meta.is_cache_only_leaf());
932
933 let index = 0;
935 if index >= self.meta.meta_count_with_fence() as usize {
936 return Err(TreeError::NeedRestart);
937 }
938
939 let meta_ptr = self.data.as_ptr() as *const LeafKVMeta;
940 let meta = unsafe { &*meta_ptr.add(index) };
941
942 let key_offset = meta.get_offset();
943 let key_len = meta.get_key_len();
944
945 if key_offset + key_len > LeafNode::max_data_size(self.meta.node_size as usize) as u16 {
946 return Err(TreeError::NeedRestart);
947 }
948
949 let key_ptr = unsafe { self.data.as_ptr().add(key_offset as usize) };
950 let key_slice =
951 unsafe { std::slice::from_raw_parts(key_ptr, (key_len - self.prefix_len) as usize) };
952 Ok(key_slice.to_vec())
953 }
954
955 pub fn get_low_fence_key(&self) -> Vec<u8> {
956 assert!(self.has_fence());
957 let fence_meta = self.get_kv_meta(LOW_FENCE_IDX);
958 if fence_meta.is_infinite_low_fence_key() {
959 vec![]
960 } else {
961 unsafe {
962 let start_ptr = self.data.as_ptr().add(fence_meta.offset as usize);
963 let key_slice =
964 std::slice::from_raw_parts(start_ptr, fence_meta.get_key_len() as usize);
965 key_slice.to_vec()
966 }
967 }
968 }
969
970 pub fn get_high_fence_key(&self) -> Vec<u8> {
971 assert!(self.has_fence());
972 let fence_meta = self.get_kv_meta(HIGH_FENCE_IDX);
973 if fence_meta.is_infinite_high_fence_key() {
974 vec![]
975 } else {
976 self.get_full_key(fence_meta)
977 }
978 }
979
980 #[inline]
983 pub(crate) fn key_cmp(&self, meta: &LeafKVMeta, key: &[u8]) -> Ordering {
984 let search_key_prefix = &key[(self.prefix_len as usize)..]
985 [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];
986
987 let prefix_key = &meta.preview_bytes[..std::cmp::min(
988 PREVIEW_SIZE,
989 meta.get_key_len() as usize - self.prefix_len as usize,
990 )];
991 let mut cmp = prefix_key.cmp(search_key_prefix);
992
993 if cmp == Ordering::Equal {
995 let full_key = self.get_remaining_key(meta);
996 let search_key_postfix = &key[self.prefix_len as usize..];
997 cmp = full_key.cmp(search_key_postfix);
998 }
999 cmp
1000 }
1001
1002 pub(crate) fn linear_lower_bound(&self, key: &[u8]) -> u16 {
1003 debug_assert!(key.len() >= self.prefix_len as usize);
1004
1005 let mut index = self.first_meta_pos_after_fence();
1006
1007 while index < self.meta.meta_count_with_fence() {
1008 let key_meta = self.get_kv_meta(index as usize);
1009
1010 #[cfg(target_arch = "x86_64")]
1011 unsafe {
1012 core::arch::x86_64::_mm_clflush(key_meta as *const LeafKVMeta as *const u8);
1014 }
1015 let cmp = self.key_cmp(key_meta, key);
1016
1017 if cmp != Ordering::Less {
1018 return index;
1019 }
1020
1021 index += 1;
1022 }
1023
1024 index
1025 }
1026
1027 pub(crate) fn lower_bound(&self, key: &[u8]) -> u16 {
1028 let mut lower = self.first_meta_pos_after_fence();
1029 let mut upper = self.meta.meta_count_with_fence();
1030
1031 let search_key_prefix = &key[(self.prefix_len as usize)..]
1032 [..std::cmp::min(key.len() - self.prefix_len as usize, PREVIEW_SIZE)];
1033
1034 debug_assert!(key.len() >= self.prefix_len as usize);
1035
1036 while lower < upper {
1037 let mid = lower + (upper - lower) / 2;
1038 let key_meta = self.get_kv_meta(mid as usize);
1039
1040 let prefix_key = &key_meta.preview_bytes[..std::cmp::min(
1041 PREVIEW_SIZE,
1042 key_meta.get_key_len() as usize - self.prefix_len as usize,
1043 )];
1044 let mut cmp = prefix_key.cmp(search_key_prefix);
1045
1046 if cmp == Ordering::Equal {
1048 let remaining_key = self.get_remaining_key(key_meta);
1049 let search_key_postfix = &key[self.prefix_len as usize..];
1050 cmp = remaining_key.cmp(search_key_postfix);
1051 }
1052
1053 match cmp {
1054 Ordering::Greater => {
1055 upper = mid;
1056 }
1057 Ordering::Equal => {
1058 return mid;
1059 }
1060 Ordering::Less => {
1061 lower = mid + 1;
1062 }
1063 }
1064 }
1065 lower
1066 }
1067
1068 pub(crate) fn insert(
1076 &mut self,
1077 key: &[u8],
1078 value: &[u8],
1079 op_type: OpType,
1080 max_fence_len: usize,
1081 ) -> bool {
1082 debug_assert!(key.len() as u16 >= self.prefix_len);
1083 match op_type {
1084 OpType::Insert | OpType::Cache => {
1085 debug_assert!(!value.is_empty());
1086 }
1087 OpType::Delete | OpType::Phantom => {}
1088 }
1089
1090 let post_fix_len = key.len() as u16 - self.prefix_len;
1091 let val_len = value.len() as u16;
1092 let kv_len = post_fix_len + val_len;
1093
1094 let value_count_with_fence = self.meta.meta_count_with_fence();
1095
1096 let pos = self.lower_bound(key) as usize;
1097
1098 if pos < value_count_with_fence as usize {
1099 let prefix_len = self.prefix_len as usize;
1100 let pos_meta = self.get_kv_meta(pos);
1101 let pos_key = self.get_remaining_key(pos_meta);
1102 let search_key_postfix = &key[prefix_len..];
1103 if pos_key.cmp(search_key_postfix) == Ordering::Equal {
1104 counter!(LeafInsertDuplicate);
1106 if op_type == OpType::Delete {
1107 let pos_meta = self.get_kv_meta_mut(pos);
1108 pos_meta.mark_as_deleted();
1109 return true;
1110 }
1111
1112 let pos_value = self.get_value(pos_meta);
1113 let pos_value_len = pos_value.len() as u16;
1114
1115 if pos_value_len >= val_len {
1116 unsafe {
1118 let pair_ptr = self.data.as_ptr().add(pos_meta.offset as usize) as *mut u8;
1119 std::ptr::copy_nonoverlapping(
1120 value.as_ptr(),
1121 pair_ptr.add(post_fix_len as usize),
1122 val_len as usize,
1123 );
1124 }
1125 let pos_meta = self.get_kv_meta_mut(pos);
1126 pos_meta.set_value_len(val_len);
1127 pos_meta.set_op_type(op_type);
1128 return true;
1129 }
1130
1131 if self.meta.remaining_size < kv_len {
1132 return false;
1133 }
1134 assert!(op_type != OpType::Cache);
1135 let offset = self.current_lowest_offset() - kv_len;
1136 unsafe {
1137 let pair_ptr = self.data.as_ptr().add(offset as usize) as *mut u8;
1138
1139 let pos_meta = self.get_kv_meta_mut(pos);
1140 pos_meta.set_value_len(val_len);
1141 pos_meta.set_op_type(op_type);
1142 pos_meta.offset = offset;
1143 std::ptr::copy_nonoverlapping(
1144 key[self.prefix_len as usize..].as_ptr(),
1145 pair_ptr,
1146 post_fix_len as usize,
1147 );
1148 std::ptr::copy_nonoverlapping(
1149 value.as_ptr(),
1150 pair_ptr.add(post_fix_len as usize),
1151 val_len as usize,
1152 );
1153 }
1154 self.meta.remaining_size -= kv_len;
1155 return true;
1156 }
1157 }
1158
1159 if op_type == OpType::Delete && (self.is_base_page()) {
1163 return true;
1165 }
1166
1167 if self.full_with_fences(
1169 kv_len + std::mem::size_of::<LeafKVMeta>() as u16,
1170 max_fence_len,
1171 ) {
1172 return false;
1173 }
1174
1175 counter!(LeafInsertNew);
1176
1177 let offset = self.current_lowest_offset() - kv_len;
1178 let new_meta =
1179 LeafKVMeta::make_prefixed_meta(offset, val_len, key, self.prefix_len, op_type);
1180
1181 unsafe {
1182 let metas_size = std::mem::size_of::<LeafKVMeta>()
1183 * (self.meta.meta_count_with_fence() - pos as u16) as usize;
1184 let src_ptr = self
1185 .data
1186 .as_ptr()
1187 .add(pos * std::mem::size_of::<LeafKVMeta>());
1188 let dest_ptr = self
1189 .data
1190 .as_mut_ptr()
1191 .add((pos + 1) * std::mem::size_of::<LeafKVMeta>());
1192 std::ptr::copy(src_ptr, dest_ptr, metas_size);
1193
1194 self.write_initial_kv_meta(pos, new_meta);
1195
1196 let pair_ptr = self.data.as_mut_ptr().add(offset as usize);
1197 std::ptr::copy_nonoverlapping(
1198 key[self.prefix_len as usize..].as_ptr(),
1199 pair_ptr,
1200 post_fix_len as usize,
1201 );
1202 std::ptr::copy_nonoverlapping(
1203 value.as_ptr(),
1204 pair_ptr.add(post_fix_len as usize),
1205 val_len as usize,
1206 );
1207 }
1208
1209 self.meta.remaining_size -= kv_len + std::mem::size_of::<LeafKVMeta>() as u16;
1210 self.meta.increment_value_count();
1211 true
1212 }
1213
1214 pub(crate) fn is_base_page(&self) -> bool {
1216 (!self.meta.is_cache_only_leaf()) && self.next_level.is_null()
1217 }
1218
1219 pub fn split(
1223 &mut self,
1224 sibling: &mut LeafNode,
1225 cache_only: bool,
1226 snapshot_version: u64,
1227 ) -> Vec<u8> {
1228 if cache_only {
1229 assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
1230 } else {
1231 assert!(self.is_base_page() && self.has_fence());
1232 }
1233
1234 let current_count = self.meta.meta_count_without_fence();
1235
1236 let (splitting_key, new_cur_count) = self.get_split_key(cache_only);
1241 let sibling_cnt = current_count - new_cur_count;
1242
1243 if !cache_only {
1247 let low_fence_for_right = self.get_kv_meta(FENCE_KEY_CNT + new_cur_count as usize);
1248 assert!(!low_fence_for_right.is_infinite_low_fence_key());
1249 let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);
1250
1251 let low_fence_key = self.get_full_key(low_fence_for_right);
1252 let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
1253 vec![]
1254 } else {
1255 self.get_full_key(high_fence_for_right)
1256 };
1257
1258 sibling.initialize(
1259 &low_fence_key,
1260 &high_fence_key,
1261 sibling.meta.node_size as usize,
1262 MiniPageNextLevel::new_null(),
1263 true,
1264 cache_only,
1265 snapshot_version,
1266 );
1267 } else {
1268 sibling.initialize(
1269 &[],
1270 &[],
1271 sibling.meta.node_size as usize,
1272 MiniPageNextLevel::new_null(),
1273 false,
1274 cache_only,
1275 snapshot_version,
1276 );
1277 }
1278
1279 let starting_kv_idx = if cache_only { 0 } else { FENCE_KEY_CNT };
1280
1281 for i in 0..sibling_cnt {
1282 let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_idx);
1283 if kv_meta.op_type() == OpType::Delete {
1284 continue;
1286 }
1287 let key = self.get_full_key(kv_meta);
1288 let value = self.get_value(kv_meta);
1289 let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
1290 assert!(insert_rt);
1291 }
1292
1293 if !cache_only {
1294 self.meta
1295 .set_value_count(new_cur_count + FENCE_KEY_CNT as u16);
1296 } else {
1297 self.meta.set_value_count(new_cur_count);
1298 }
1299
1300 self.consolidate_after_split(&splitting_key, snapshot_version);
1301
1302 splitting_key
1303 }
1304
1305 pub fn split_with_key(
1310 &mut self,
1311 sibling: &mut LeafNode,
1312 merge_split_key: &Vec<u8>,
1313 cache_only: bool,
1314 snapshot_version: u64,
1315 ) {
1316 if cache_only {
1317 assert!(self.meta.is_cache_only_leaf() && !self.has_fence());
1318 } else {
1319 assert!(self.is_base_page() && self.has_fence());
1320 }
1321
1322 let current_count = self.meta.meta_count_without_fence();
1323 let mut new_cur_count = self.get_kv_num_below_key(merge_split_key);
1328 let sibling_cnt = current_count - new_cur_count;
1329
1330 if !cache_only {
1334 let high_fence_for_right = self.get_kv_meta(HIGH_FENCE_IDX);
1335
1336 let low_fence_key = merge_split_key.clone();
1337 let high_fence_key = if high_fence_for_right.is_infinite_high_fence_key() {
1338 vec![]
1339 } else {
1340 self.get_full_key(high_fence_for_right)
1341 };
1342
1343 sibling.initialize(
1344 &low_fence_key,
1345 &high_fence_key,
1346 sibling.meta.node_size as usize,
1347 MiniPageNextLevel::new_null(),
1348 true,
1349 cache_only,
1350 snapshot_version,
1351 );
1352 } else {
1353 sibling.initialize(
1354 &[],
1355 &[],
1356 sibling.meta.node_size as usize,
1357 MiniPageNextLevel::new_null(),
1358 false,
1359 cache_only,
1360 snapshot_version,
1361 );
1362 }
1363
1364 let starting_kv_index = if cache_only { 0 } else { FENCE_KEY_CNT };
1365
1366 for i in 0..sibling_cnt {
1367 let kv_meta = self.get_kv_meta((new_cur_count + i) as usize + starting_kv_index);
1368 if kv_meta.op_type() == OpType::Delete {
1369 continue;
1371 }
1372 let key = self.get_full_key(kv_meta);
1373 let value = self.get_value(kv_meta);
1374 let insert_rt = sibling.insert(&key, value, OpType::Insert, 0);
1375 assert!(insert_rt);
1376 }
1377
1378 if !cache_only {
1379 new_cur_count += FENCE_KEY_CNT as u16;
1380 }
1381
1382 self.meta.set_value_count(new_cur_count);
1383 self.consolidate_after_split(merge_split_key, snapshot_version);
1384
1385 if !cache_only {
1386 let left_high_fence = self.get_kv_meta(HIGH_FENCE_IDX);
1389 let left_high_fence_key = self.get_full_key(left_high_fence);
1390 let right_low_fence_key = sibling.get_low_fence_full_key();
1391
1392 assert_eq!(left_high_fence_key, *merge_split_key); assert_eq!(right_low_fence_key, *merge_split_key);
1394 }
1395 }
1396
1397 pub(crate) fn consolidate_inner(
1398 &mut self,
1399 new_optype: OpType,
1400 new_high_fence: Option<&[u8]>,
1401 skip_tombstone: bool,
1402 cache_only: bool,
1403 skip_key: Option<&[u8]>,
1404 snapshot_version: u64,
1405 ) {
1406 let mut pairs = Vec::new();
1407
1408 for meta in self.meta_iter() {
1409 if skip_tombstone && meta.op_type() == OpType::Delete {
1410 continue;
1412 }
1413
1414 match skip_key {
1415 Some(s_k) => {
1417 let k = self.get_full_key(meta);
1418 let cmp = s_k.cmp(&k);
1419
1420 if cmp != Ordering::Equal {
1421 pairs.push((k, self.get_value(meta).to_owned(), meta.op_type()));
1422 }
1423 }
1424 None => {
1425 pairs.push((
1426 self.get_full_key(meta),
1427 self.get_value(meta).to_owned(),
1428 meta.op_type(),
1429 ));
1430 }
1431 }
1432 }
1433
1434 let has_fence = self.has_fence();
1435 let (low_fence, high_fence) = if has_fence {
1436 let high_fence_key = match new_high_fence {
1437 None => self.get_high_fence_key(),
1438 Some(key) => key.to_vec(),
1439 };
1440 (self.get_low_fence_key(), high_fence_key)
1441 } else {
1442 (vec![], vec![])
1443 };
1444
1445 let node_size = self.meta.node_size;
1446
1447 self.initialize(
1448 &low_fence,
1449 &high_fence,
1450 node_size as usize,
1451 self.next_level,
1452 has_fence,
1453 cache_only,
1454 snapshot_version,
1455 );
1456
1457 for (key, value, op_type) in pairs {
1458 let rt = if op_type == OpType::Delete {
1459 self.insert(&key, &value, OpType::Delete, 0)
1460 } else {
1461 self.insert(&key, &value, new_optype, 0)
1462 };
1463 assert!(rt);
1464 }
1465 }
1466
1467 #[allow(dead_code)]
1468 pub(crate) fn consolidate(&mut self, snapshot_version: u64) {
1469 self.consolidate_inner(
1470 OpType::Insert,
1471 None,
1472 true,
1473 self.meta.is_cache_only_leaf(),
1474 None,
1475 snapshot_version,
1476 );
1477 }
1478
1479 #[allow(dead_code)]
1482 pub(crate) fn consolidate_after_merge(&mut self, snapshot_version: u64) {
1483 assert!(!self.is_base_page());
1484 self.consolidate_inner(
1485 OpType::Cache,
1486 None,
1487 false,
1488 self.meta.is_cache_only_leaf(),
1489 None,
1490 snapshot_version,
1491 );
1492 }
1493
1494 fn consolidate_after_split(&mut self, high_fence: &[u8], snapshot_version: u64) {
1500 assert!(self.meta.is_cache_only_leaf() || self.is_base_page());
1501 self.consolidate_inner(
1502 OpType::Insert,
1503 Some(high_fence),
1504 true,
1505 self.meta.is_cache_only_leaf(),
1506 None,
1507 snapshot_version,
1508 );
1509 }
1510
1511 pub(crate) fn consolidate_skip_key(&mut self, key: &[u8], snapshot_version: u64) {
1515 assert!(!self.is_base_page());
1516 assert!(self.meta.is_cache_only_leaf());
1517 self.consolidate_inner(
1518 OpType::Insert,
1519 None,
1520 true,
1521 self.meta.is_cache_only_leaf(),
1522 Some(key),
1523 snapshot_version,
1524 );
1525 }
1526
1527 pub(crate) fn copy_initialize_to(
1529 &self,
1530 dst_node: *mut LeafNode,
1531 dst_size: usize,
1532 discard_cold_cache: bool,
1533 snapshot_version: u64,
1534 ) {
1535 assert!(!self.is_base_page());
1536 assert!(self.meta.node_size as usize <= dst_size);
1537 let dst_ref = unsafe { &mut *dst_node };
1538 let empty = vec![];
1539
1540 dst_ref.initialize(
1541 &empty,
1542 &empty,
1543 dst_size,
1544 self.next_level,
1545 false, self.meta.is_cache_only_leaf(),
1547 snapshot_version,
1548 );
1549
1550 for meta in self.meta_iter() {
1551 let op = meta.op_type();
1552
1553 if discard_cold_cache && op.is_cache() && !meta.is_referenced() {
1558 continue;
1559 }
1560
1561 let value = self.get_value(meta);
1562 let rt = dst_ref.insert(&self.get_full_key(meta), value, op, 0);
1563 assert!(rt);
1564 }
1565 }
1566
1567 #[allow(clippy::too_many_arguments)]
1569 pub(crate) fn initialize(
1570 &mut self,
1571 low_fence: &[u8],
1572 high_fence: &[u8],
1573 node_size: usize,
1574 next_level: MiniPageNextLevel,
1575 has_fence: bool,
1576 cache_only: bool,
1577 snapshot_version: u64,
1578 ) {
1579 self.snapshot_version = snapshot_version & Self::LEAF_SNAPSHOT_VERSION_MASK;
1581
1582 if !has_fence {
1583 assert!(low_fence.is_empty());
1584 assert!(high_fence.is_empty());
1585
1586 self.meta = NodeMeta::new(
1587 LeafNode::max_data_size(node_size) as u16,
1588 false,
1589 false,
1590 node_size as u16,
1591 cache_only,
1592 );
1593 self.prefix_len = 0;
1594 self.next_level = next_level;
1595 } else {
1596 self.meta = NodeMeta::new(
1597 LeafNode::max_data_size(node_size) as u16, false,
1599 true,
1600 node_size as u16,
1601 cache_only,
1602 );
1603
1604 self.prefix_len = common_prefix_len(low_fence, high_fence);
1605 self.next_level = next_level;
1606 self.install_fence_key(low_fence, false);
1607 self.install_fence_key(high_fence, true);
1608 }
1609 }
1610
1611 pub(crate) fn set_split_flag(&mut self) {
1612 self.meta.set_split_flag();
1613 }
1614
1615 pub(crate) fn get_split_flag(&self) -> bool {
1616 self.meta.get_split_flag()
1617 }
1618
1619 const LEAF_SNAPSHOT_VERSION_CHANGED_FLAG: u64 = 0x8000_0000_0000_0000;
1622 const LEAF_SNAPSHOT_VERSION_MASK: u64 = 0x7FFF_FFFF_FFFF_FFFF;
1623
1624 pub(crate) fn get_clean_snapshot_version(&self) -> u64 {
1626 self.snapshot_version & Self::LEAF_SNAPSHOT_VERSION_MASK
1627 }
1628
1629 pub(crate) fn set_snapshot_version(&mut self, snapshot_version: u64, mark_changed: bool) {
1631 let next_version = snapshot_version & Self::LEAF_SNAPSHOT_VERSION_MASK;
1632
1633 if !mark_changed {
1634 self.snapshot_version = next_version;
1635 return;
1636 }
1637
1638 self.snapshot_version = next_version | Self::LEAF_SNAPSHOT_VERSION_CHANGED_FLAG;
1639 }
1640
1641 pub(crate) fn is_snapshot_version_changed(&self) -> bool {
1643 (self.snapshot_version & Self::LEAF_SNAPSHOT_VERSION_CHANGED_FLAG) != 0
1644 }
1645
1646 pub(crate) fn set_snapshot_version_changed_flag(&mut self) {
1648 self.snapshot_version |= Self::LEAF_SNAPSHOT_VERSION_CHANGED_FLAG;
1649 }
1650
1651 pub(crate) fn clear_snapshot_version_changed_flag(&mut self) {
1653 self.snapshot_version &= Self::LEAF_SNAPSHOT_VERSION_MASK;
1654 }
1655
1656 pub(crate) fn read_by_key(&self, search_key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
1657 self.read_by_key_inner(search_key, out_buffer, true)
1658 }
1659
1660 #[must_use]
1662 pub(crate) fn read_by_key_inner(
1663 &self,
1664 search_key: &[u8],
1665 out_buffer: &mut [u8],
1666 binary_search: bool,
1667 ) -> LeafReadResult {
1668 let val_count = self.meta.meta_count_with_fence();
1669 let pos = if binary_search {
1670 self.lower_bound(search_key)
1671 } else {
1672 self.linear_lower_bound(search_key)
1673 };
1674
1675 if pos >= val_count {
1676 counter!(LeafNotFoundDueToRange);
1677 return LeafReadResult::NotFound;
1678 }
1679
1680 let kv_meta = self.get_kv_meta(pos as usize);
1681 let target_key = self.get_remaining_key(kv_meta);
1682
1683 if !kv_meta.is_referenced() {
1685 kv_meta.mark_as_ref();
1686 }
1687
1688 let input_post_key = &search_key[self.prefix_len as usize..];
1689 let cmp = target_key.cmp(input_post_key);
1690
1691 if cmp != Ordering::Equal {
1692 counter!(LeafNotFoundDueToKey);
1693 LeafReadResult::NotFound
1694 } else {
1695 if kv_meta.op_type().is_absent() {
1696 return LeafReadResult::Deleted;
1697 }
1698 let val_len = kv_meta.value_len();
1699 let val_ref = self.get_value(kv_meta);
1700 debug_assert_eq!(val_len as usize, val_ref.len());
1701 out_buffer[..val_len as usize].copy_from_slice(val_ref);
1702 LeafReadResult::Found(val_len as u32)
1703 }
1704 }
1705
1706 pub(crate) fn get_chain_size_hint(
1711 key_len: usize,
1712 value_len: usize,
1713 page_classes: &[usize],
1714 cache_only: bool,
1715 ) -> usize {
1716 let mut initial_record_size = key_len + value_len + std::mem::size_of::<LeafKVMeta>();
1717 initial_record_size += std::mem::size_of::<LeafNode>();
1718
1719 if let Some(s) = page_classes[0..(page_classes.len() - 1)]
1720 .iter()
1721 .position(|x| initial_record_size < *x)
1722 {
1723 return page_classes[s];
1724 } else if cache_only && initial_record_size <= page_classes[page_classes.len() - 1] {
1725 return page_classes[page_classes.len() - 1];
1726 }
1727
1728 panic!(
1729 "Record size {} plus metadata exceeds the max mini-page size {:?}",
1730 initial_record_size, page_classes
1731 );
1732 }
1733
1734 pub(crate) fn new_size_if_upgrade(
1740 &self,
1741 incoming_size: usize,
1742 page_classes: &[usize],
1743 cache_only: bool,
1744 ) -> Option<usize> {
1745 let cur_size = self.meta.node_size as usize;
1746 let request_size = cur_size + incoming_size;
1747
1748 if let Some(s) = page_classes[0..(page_classes.len() - 1)]
1749 .iter()
1750 .position(|x| request_size < *x)
1751 {
1752 if s == 0 {
1753 panic!("Should not be here");
1754 }
1755 return Some(page_classes[s]);
1756 } else if cache_only && request_size <= page_classes[page_classes.len() - 1] {
1757 return Some(page_classes[page_classes.len() - 1]);
1758 }
1759
1760 None
1761 }
1762
1763 pub(crate) fn free_base_page(node: *mut LeafNode) {
1765 assert!(unsafe { &*node }.is_base_page());
1766 let node_size = unsafe { &*node }.meta.node_size as usize;
1767 let layout = Layout::from_size_align(node_size, std::mem::align_of::<LeafNode>()).unwrap();
1768 unsafe {
1769 std::alloc::dealloc(node as *mut u8, layout);
1770 }
1771 }
1772
1773 pub(crate) fn need_actually_merge_to_disk(&self) -> bool {
1774 for meta in self.meta_iter() {
1775 if meta.op_type().is_dirty() {
1776 return true;
1777 }
1778 }
1779
1780 false
1781 }
1782
1783 fn estimate_merge_size(&self) -> usize {
1784 let mut required_size = 0;
1785
1786 for meta in self.meta_iter() {
1787 if meta.op_type() == OpType::Insert {
1788 required_size += (meta.get_key_len() + meta.value_len()) as usize
1789 + std::mem::size_of::<LeafKVMeta>();
1790 }
1791 }
1792
1793 required_size
1794 }
1795
1796 pub(crate) fn full_with_fences(&self, requested_size: u16, max_fence_len: usize) -> bool {
1799 if self.meta.remaining_size < requested_size {
1800 return true;
1801 }
1802
1803 if self.meta.has_fence() {
1804 let mut empty_data_size = self.meta.remaining_size; let low_key_meta = self.get_kv_meta(LOW_FENCE_IDX);
1807 if !low_key_meta.is_infinite_low_fence_key() {
1808 empty_data_size += low_key_meta.get_key_len();
1809 }
1810
1811 let high_key_meta = self.get_kv_meta(HIGH_FENCE_IDX);
1812 if !high_key_meta.is_infinite_high_fence_key() {
1813 empty_data_size += high_key_meta.get_key_len();
1814 }
1815
1816 if empty_data_size >= requested_size + max_fence_len as u16 {
1817 return false;
1818 }
1819
1820 return true;
1821 }
1822
1823 false
1824 }
1825
1826 pub(crate) fn merge_mini_page(&mut self, mini_page: &LeafNode, max_fence_len: usize) -> bool {
1827 let size_required = mini_page.estimate_merge_size();
1828
1829 if self.full_with_fences(size_required as u16, max_fence_len) {
1830 return false;
1831 }
1832
1833 self.snapshot_version = mini_page.get_clean_snapshot_version();
1835 for meta in mini_page.meta_iter() {
1836 let c_key = mini_page.get_full_key(meta);
1837 let c_type = meta.op_type();
1838 if !c_type.is_dirty() {
1839 continue;
1844 }
1845 let c_value = mini_page.get_value(meta);
1846 let rt = self.insert(&c_key, c_value, c_type, 0);
1847 assert!(rt);
1848 }
1849
1850 true
1851 }
1852
1853 pub(crate) fn get_stats(&self) -> LeafStats {
1854 let mut keys = Vec::new();
1855 let mut values = Vec::new();
1856 let mut op_types = Vec::new();
1857 let node_size = self.meta.node_size as usize;
1858 let prefix = self.get_prefix().to_owned();
1859
1860 for meta in self.meta_iter() {
1861 let key = self.get_full_key(meta);
1862 let value = self.get_value(meta);
1863 keys.push(key);
1864 values.push(value.to_owned());
1865 op_types.push(meta.op_type());
1866 }
1867
1868 LeafStats {
1869 keys,
1870 values,
1871 op_types,
1872 prefix,
1873 base_node: None,
1874 next_level: self.next_level,
1875 node_size,
1876 }
1877 }
1878
1879 pub(crate) fn has_fence(&self) -> bool {
1880 self.meta.has_fence()
1881 }
1882
1883 pub(crate) fn meta_iter(&self) -> LeafMetaIter<'_> {
1885 LeafMetaIter::new(self)
1886 }
1887
1888 fn first_meta_pos_after_fence(&self) -> u16 {
1889 if self.has_fence() {
1890 FENCE_KEY_CNT as u16
1891 } else {
1892 0
1893 }
1894 }
1895
1896 pub(crate) fn get_record_by_pos_with_bound(
1897 &self,
1898 pos: u32,
1899 out_buffer: &mut [u8],
1900 return_field: ScanReturnField,
1901 bound_key: &Option<Vec<u8>>,
1902 ) -> GetScanRecordByPosResult {
1903 if pos >= self.meta.meta_count_with_fence() as u32 {
1904 return GetScanRecordByPosResult::EndOfLeaf;
1905 }
1906
1907 let meta = self.get_kv_meta(pos as usize);
1908
1909 if meta.op_type().is_absent() {
1910 return GetScanRecordByPosResult::Deleted;
1911 }
1912
1913 match return_field {
1914 ScanReturnField::Value => {
1915 if let Some(bk) = bound_key {
1916 let cmp = self.get_full_key(meta).as_slice().cmp(bk);
1917 if cmp == Ordering::Greater {
1918 return GetScanRecordByPosResult::BoundKeyExceeded;
1919 }
1920 }
1921
1922 let value = self.get_value(meta);
1923 let value_len = meta.value_len() as usize;
1924 out_buffer[..value_len].copy_from_slice(value);
1925 GetScanRecordByPosResult::Found(0, value_len as u32)
1926 }
1927 ScanReturnField::Key => {
1928 let full_key = self.get_full_key(meta);
1929
1930 if let Some(bk) = bound_key {
1931 let cmp = full_key.as_slice().cmp(bk);
1932 if cmp == Ordering::Greater {
1933 return GetScanRecordByPosResult::BoundKeyExceeded;
1934 }
1935 }
1936
1937 let key_len = full_key.len();
1938 out_buffer[..key_len].copy_from_slice(&full_key);
1939 GetScanRecordByPosResult::Found(key_len as u32, 0)
1940 }
1941 ScanReturnField::KeyAndValue => {
1942 let full_key = self.get_full_key(meta);
1943
1944 if let Some(bk) = bound_key {
1945 let cmp = full_key.as_slice().cmp(bk);
1946 if cmp == Ordering::Greater {
1947 return GetScanRecordByPosResult::BoundKeyExceeded;
1948 }
1949 }
1950
1951 let key_len = full_key.len();
1952 let value = self.get_value(meta);
1953 let value_len = meta.value_len() as usize;
1954
1955 out_buffer[..key_len].copy_from_slice(&full_key);
1956 out_buffer[key_len..key_len + value_len].copy_from_slice(value);
1957
1958 GetScanRecordByPosResult::Found(key_len as u32, value_len as u32)
1959 }
1960 }
1961 }
1962}
1963
1964pub(crate) struct LeafMetaIter<'a> {
1965 node: &'a LeafNode,
1966 cur: usize,
1967}
1968
1969impl LeafMetaIter<'_> {
1970 fn new(leaf: &LeafNode) -> LeafMetaIter<'_> {
1971 let cur = leaf.first_meta_pos_after_fence();
1972 LeafMetaIter {
1973 node: leaf,
1974 cur: cur as usize,
1975 }
1976 }
1977}
1978
1979impl<'a> Iterator for LeafMetaIter<'a> {
1980 type Item = &'a LeafKVMeta;
1981
1982 fn next(&mut self) -> Option<Self::Item> {
1983 if self.cur < self.node.meta.meta_count_with_fence() as usize {
1984 let rt = self.node.get_kv_meta(self.cur);
1985 self.cur += 1;
1986 Some(rt)
1987 } else {
1988 None
1989 }
1990 }
1991}
1992
1993pub(crate) enum GetScanRecordByPosResult {
1994 EndOfLeaf,
1995 Deleted,
1996 Found(u32, u32), BoundKeyExceeded, }
1999
2000#[derive(Debug, PartialEq, Eq, Clone)]
2001pub enum LeafReadResult {
2002 Deleted,
2003 NotFound,
2004 Found(u32),
2005 InvalidKey,
2006}
2007
2008#[cfg(test)]
2009mod tests {
2010 use super::*;
2011 use bytemuck::cast_slice;
2012 use rstest::rstest;
2013
2014 #[test]
2015 fn test_op_type_enum() {
2016 assert_eq!(OpType::Insert as u8, 0);
2017 assert_eq!(OpType::Delete as u8, 1);
2018 assert_eq!(OpType::Cache as u8, 2);
2019 }
2020
2021 #[test]
2022 fn test_make_prefixed_meta() {
2023 let key = [1, 2, 3, 4, 5];
2024 let prefix_len = 2;
2025 let meta = LeafKVMeta::make_prefixed_meta(10, 20, &key, prefix_len, OpType::Insert);
2026
2027 assert_eq!(meta.offset, 10);
2028 assert_eq!(meta.get_key_len(), 5);
2029 assert_eq!(meta.value_len(), 20);
2030 assert_eq!(meta.preview_bytes, [3, 4]);
2031 assert_eq!(meta.op_type(), OpType::Insert);
2032 assert!(!meta.is_referenced());
2033 }
2034
2035 #[test]
2036 fn test_make_infinite_high_fence_key() {
2037 let meta = LeafKVMeta::make_infinite_high_fence_key();
2038 assert_eq!(meta.offset, u16::MAX);
2039 assert_eq!(meta.get_key_len(), 0);
2040 assert_eq!(meta.value_len(), 0);
2041 assert!(meta.is_infinite_high_fence_key());
2042 }
2043
2044 #[test]
2045 fn test_make_infinite_low_fence_key() {
2046 let meta = LeafKVMeta::make_infinite_low_fence_key();
2047 assert_eq!(meta.offset, u16::MAX - 1);
2048 assert_eq!(meta.get_key_len(), 0);
2049 assert_eq!(meta.value_len(), 0);
2050 assert!(meta.is_infinite_low_fence_key());
2051 }
2052
2053 #[test]
2054 fn test_value_len() {
2055 let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
2056 assert_eq!(meta.value_len(), 20);
2057
2058 meta.set_value_len(30);
2059 assert_eq!(meta.value_len(), 30);
2060 }
2061
2062 #[test]
2063 fn test_op_type() {
2064 let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Delete);
2065 assert_eq!(meta.op_type(), OpType::Delete);
2066 }
2067
2068 #[test]
2069 fn test_mark_as_ref_and_clear_ref() {
2070 let meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
2071 assert!(!meta.is_referenced());
2072
2073 meta.mark_as_ref();
2074 assert!(meta.is_referenced());
2075
2076 meta.clear_ref();
2077 assert!(!meta.is_referenced());
2078 }
2079
2080 #[test]
2081 fn test_mark_as_deleted_and_is_deleted() {
2082 let mut meta = LeafKVMeta::make_prefixed_meta(10, 20, &[1, 2, 3], 0, OpType::Insert);
2083 assert!(!meta.is_deleted());
2084
2085 meta.mark_as_deleted();
2086 assert!(meta.is_deleted());
2087 }
2088
2089 #[rstest]
2093 #[case(vec![1], vec![2], 2)]
2094 #[case(vec![2], vec![1], 2)]
2095 fn test_get_merge_split_key(
2096 #[case] base_page_values: Vec<usize>,
2097 #[case] mini_page_values: Vec<usize>,
2098 #[case] splitting_key: usize,
2099 ) {
2100 let base = unsafe {
2101 &mut *LeafNode::make_base_page(4096, crate::snapshot::INVALID_SNAPSHOT_VERSION)
2102 };
2103 let mini = unsafe {
2104 &mut *LeafNode::make_base_page(4096, crate::snapshot::INVALID_SNAPSHOT_VERSION)
2105 }; for i in 0..base_page_values.len() {
2109 let n = &base_page_values[i];
2110 let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
2111 let key = cast_slice::<usize, u8>(n_slice);
2112 let value = cast_slice::<usize, u8>(n_slice);
2113
2114 let rt = base.insert(key, value, OpType::Insert, 2);
2115 assert!(rt);
2116 }
2117
2118 for i in 0..mini_page_values.len() {
2119 let n = &mini_page_values[i];
2120 let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
2121 let key = cast_slice::<usize, u8>(n_slice);
2122 let value = cast_slice::<usize, u8>(n_slice);
2123
2124 let rt = mini.insert(key, value, OpType::Insert, 2);
2125 assert!(rt);
2126 }
2127
2128 let merge_split_key_byte = base.get_merge_split_key(mini);
2130 let merge_splitting_key = cast_slice::<u8, usize>(&merge_split_key_byte);
2131
2132 assert_eq!(merge_splitting_key[0], splitting_key);
2133
2134 LeafNode::free_base_page(base);
2135 LeafNode::free_base_page(mini);
2136 }
2137
2138 #[rstest]
2143 #[case(vec![1, 2, 3, 4], 3)]
2144 #[case(vec![1], 2)]
2145 #[case(vec![2], 2)]
2146 fn test_split_with_key(#[case] base_page_values: Vec<usize>, #[case] splitting_key: usize) {
2147 let base = unsafe {
2148 &mut *LeafNode::make_base_page(4096, crate::snapshot::INVALID_SNAPSHOT_VERSION)
2149 };
2150 let sibling = unsafe {
2151 &mut *LeafNode::make_base_page(4096, crate::snapshot::INVALID_SNAPSHOT_VERSION)
2152 };
2153
2154 for i in 0..base_page_values.len() {
2156 let n = &base_page_values[i];
2157 let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
2158 let key = cast_slice::<usize, u8>(n_slice);
2159 let value = cast_slice::<usize, u8>(n_slice);
2160
2161 let rt = base.insert(key, value, OpType::Insert, 2);
2162 assert!(rt);
2163 }
2164
2165 let splitting_key_ptr = &splitting_key;
2166 let splitting_key_slice =
2167 unsafe { std::slice::from_raw_parts(splitting_key_ptr as *const usize, 1) };
2168 let splitting_key_byte_arrary = cast_slice::<usize, u8>(splitting_key_slice).to_vec();
2169
2170 base.split_with_key(
2172 sibling,
2173 &splitting_key_byte_arrary,
2174 false,
2175 crate::snapshot::INVALID_SNAPSHOT_VERSION,
2176 );
2177
2178 let mut out_buffer = vec![0u8; 1024];
2181 for i in 0..base_page_values.len() {
2182 let mut page = &mut *base;
2183 if base_page_values[i] >= splitting_key {
2184 page = &mut *sibling;
2185 }
2186
2187 let n = &base_page_values[i];
2188 let n_slice = unsafe { std::slice::from_raw_parts(n as *const usize, 1) };
2189 let key = cast_slice::<usize, u8>(n_slice);
2190
2191 let rt = page.read_by_key(key, &mut out_buffer);
2192
2193 assert_eq!(rt, LeafReadResult::Found(key.len() as u32)); assert_eq!(&out_buffer[0..key.len()], key);
2195 }
2196
2197 LeafNode::free_base_page(base);
2198 LeafNode::free_base_page(sibling);
2199 }
2200}