1#[cfg(not(all(feature = "shuttle", test)))]
5use rand::Rng;
6
7#[cfg(all(feature = "shuttle", test))]
8use shuttle::rand::Rng;
9
10use cfg_if::cfg_if;
11
12cfg_if! {
13 if #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))] {
14 use crate::metric::{Timer, TlsRecorder, timer::{TimerRecorder, DebugTimerGuard}};
15 use std::cell::UnsafeCell;
16 use thread_local::ThreadLocal;
17 }
18}
19
20use crate::{
21 check_parent,
22 circular_buffer::{CircularBufferMetrics, TombstoneHandle},
23 counter,
24 error::{ConfigError, TreeError},
25 histogram, info,
26 mini_page_op::{upgrade_to_full_page, LeafEntryXLocked, LeafOperations, ReadResult},
27 nodes::{
28 leaf_node::{LeafKVMeta, LeafReadResult, MiniPageNextLevel, OpType},
29 InnerNode, InnerNodeBuilder, LeafNode, PageID, CACHE_LINE_SIZE, DISK_PAGE_SIZE,
30 INNER_NODE_SIZE, MAX_KEY_LEN, MAX_LEAF_PAGE_SIZE, MAX_VALUE_LEN,
31 },
32 range_scan::{ScanIter, ScanIterMut, ScanReturnField},
33 snapshot::{CPRSnapShotMgr, PhaseId},
34 storage::{LeafStorage, PageLocation, PageTable},
35 sync::{
36 atomic::{AtomicU64, Ordering},
37 Arc,
38 },
39 utils::{get_rng, inner_lock::ReadGuard, Backoff, BfsVisitor, NodeInfo},
40 wal::{WriteAheadLog, WriteOp},
41 Config, StorageBackend,
42};
43use std::path::Path;
44
/// Handle to a Bf-Tree index: the encoded root, the leaf storage, and the optional
/// write-ahead log and snapshot manager. Dropping it stops the WAL background job and
/// releases every node (see the `Drop` impl).
///
/// NOTE: fields are dropped in declaration order after `Drop::drop` runs; keep the order
/// stable when editing this struct.
pub struct BfTree {
    /// Encoded root. If `ROOT_IS_LEAF_MASK` is set, the remaining bits are the page-table id
    /// of a leaf root; otherwise they are the address of the root `InnerNode`
    /// (decoded by `get_root_page`).
    pub(crate) root_page_id: AtomicU64,
    /// Leaf storage: owns the page mapping table and the mini-page buffer.
    pub(crate) storage: LeafStorage,
    /// Write-ahead log; `Some` only when the config sets `write_ahead_log`.
    pub(crate) wal: Option<Arc<WriteAheadLog>>,
    /// Shared configuration the tree was built from (validated in `with_config`).
    pub(crate) config: Arc<Config>,
    /// Copy of `Config::write_load_full_page`, passed to every leaf insert.
    pub(crate) write_load_full_page: bool,
    /// Copy of `Config::cache_only`.
    pub(crate) cache_only: bool, pub(crate) mini_page_size_classes: Vec<usize>, pub(crate) snapshot_mgr: Option<Arc<CPRSnapShotMgr>>, #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
    // `mini_page_size_classes`: ascending page sizes from `create_mem_page_size_classes`.
    // `snapshot_mgr`: `Some` only when `Config::use_snapshot` is set.
    // `metrics_recorder`: per-thread timer recorders, compiled in only with the
    // runtime-debug metric features.
    pub metrics_recorder: Option<Arc<ThreadLocal<UnsafeCell<TlsRecorder>>>>, }
58
// SAFETY(review): `BfTree` is declared `Sync` by hand, so the compiler does not check this.
// Presumably some field is not automatically `Send`/`Sync`. For example, `LeafStorage` is
// built from an optional raw `*mut u8` buffer in `with_config`. Thread safety would then
// come from the tree's own latching (`ReadGuard`, locked page-table entries) and atomics.
// NOTE(review): the real invariant is not stated anywhere visible; confirm and document it.
unsafe impl Sync for BfTree {}

// SAFETY(review): same unverified invariant as the `Sync` impl directly above, applied to
// moving a `BfTree` to another thread.
unsafe impl Send for BfTree {}
62
/// Outcome of [`BfTree::insert`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum LeafInsertResult {
    /// The record was written to the tree.
    Success,
    /// The record was rejected before touching the tree. The message says why: the key is
    /// empty or too long, or the record is too small or too large for the configuration.
    InvalidKV(String),
}
68
/// Reasons a range scan cannot be started.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ScanIterError {
    /// Presumably returned when scanning a cache-only tree. NOTE(review): not produced by
    /// `scan_with_count`; confirm where it is used.
    CacheOnlyMode,
    /// The start key is empty, or longer than `max_fence_len / 2` or `MAX_KEY_LEN`.
    InvalidStartKey,
    /// The end key of a bounded scan is not acceptable (exact rules: TODO confirm).
    InvalidEndKey,
    /// A counted scan was asked for zero records.
    InvalidCount,
    /// The start/end keys do not form a valid range (exact rules: TODO confirm).
    InvalidKeyRange,
}
77
78impl Drop for BfTree {
79 fn drop(&mut self) {
80 if let Some(ref wal) = self.wal {
81 wal.stop_background_job();
82 }
83
84 let visitor = BfsVisitor::new_all_nodes(self);
85 for node_info in visitor {
86 match node_info {
87 NodeInfo::Leaf { page_id, .. } => {
88 let mut leaf = self.mapping_table().get_mut(&page_id);
89 leaf.dealloc_self(&self.storage, self.cache_only);
90 }
91 NodeInfo::Inner { ptr, .. } => {
92 if unsafe { &*ptr }.is_valid_disk_offset() {
93 let disk_offset = unsafe { &*ptr }.disk_offset;
94 if self.config.storage_backend == StorageBackend::Memory {
95 self.storage.vfs.dealloc_offset(disk_offset as usize);
97 }
98 }
99 InnerNode::free_node(ptr as *mut InnerNode);
100 }
101 }
102 }
103 }
104}
105
106impl Default for BfTree {
107 fn default() -> Self {
108 Self::new(":memory:", 1024 * 1024 * 32).unwrap()
109 }
110}
111
112impl BfTree {
113 pub(crate) const ROOT_IS_LEAF_MASK: u64 = 0x8000_0000_0000_0000; pub(crate) fn create_mem_page_size_classes(
135 min_record_size_in_byte: usize,
136 max_record_size_in_byte: usize,
137 leaf_page_size_in_byte: usize,
138 max_fence_len_in_byte: usize,
139 cache_only: bool,
140 ) -> Vec<usize> {
141 assert!(
143 min_record_size_in_byte > 1,
144 "cb_min_record_size in config cannot be less than 2"
145 );
146 assert!(
147 min_record_size_in_byte <= max_record_size_in_byte,
148 "cb_min_record_size cannot be larger than cb_max_record_size"
149 );
150 assert!(
151 max_fence_len_in_byte > 0,
152 "max_fence_len in config cannot be zero"
153 );
154 assert!(
155 max_fence_len_in_byte / 2 < max_record_size_in_byte,
156 "max_fence_len/2 cannot be larger than cb_max_record_size"
157 );
158 assert!(
159 leaf_page_size_in_byte <= MAX_LEAF_PAGE_SIZE,
160 "leaf_page_size in config cannot be larger than {}",
161 MAX_LEAF_PAGE_SIZE
162 );
163 assert!(
164 max_fence_len_in_byte / 2 <= MAX_KEY_LEN,
165 "max_key_len in config cannot be larger than {}",
166 MAX_KEY_LEN
167 );
168 assert!(
169 leaf_page_size_in_byte / min_record_size_in_byte <= 4096,
170 "Maximum number of records per page (leaf_page_size/min_record_size) cannot exceed 2^12.", );
172
173 if !cache_only {
176 assert!(
177 leaf_page_size_in_byte.is_multiple_of(DISK_PAGE_SIZE),
178 "leaf_page_size in config should be multiple of {}",
179 DISK_PAGE_SIZE
180 );
181 } else {
182 assert!(
183 leaf_page_size_in_byte.is_multiple_of(CACHE_LINE_SIZE),
184 "leaf_page_size in config should be multiple of {}",
185 CACHE_LINE_SIZE
186 );
187 }
188
189 let max_record_size_with_meta = max_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
190 let mut max_mini_page_size: usize;
191
192 if cache_only {
193 max_mini_page_size = leaf_page_size_in_byte - max_record_size_with_meta;
195 max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
196
197 assert!(
198 leaf_page_size_in_byte
199 >= 2 * max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
200 "cb_max_record_size of config should be <= {}",
201 (leaf_page_size_in_byte - std::mem::size_of::<LeafNode>()) / 2
202 - std::mem::size_of::<LeafKVMeta>()
203 );
204 } else {
205 max_mini_page_size = leaf_page_size_in_byte
207 - max_record_size_with_meta
208 - max_fence_len_in_byte
209 - 2 * std::mem::size_of::<LeafKVMeta>();
210 max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
211
212 assert!(
213 max_mini_page_size >= max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
214 "cb_max_record_size of config should be <= {}",
215 max_mini_page_size
216 - std::mem::size_of::<LeafNode>()
217 - std::mem::size_of::<LeafKVMeta>()
218 );
219 }
220
221 let mut mem_page_size_classes = Vec::new();
223
224 let base: i32 = 2;
225 let mut record_num_per_page_exp: u32 = 0;
226 let c: usize = min_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
227
228 let mut size_class =
230 base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
231
232 if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
234 size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
235 }
236
237 while size_class <= max_mini_page_size {
238 if mem_page_size_classes.is_empty()
239 || (mem_page_size_classes[mem_page_size_classes.len() - 1] < size_class)
240 {
241 mem_page_size_classes.push(size_class);
242 }
243
244 record_num_per_page_exp += 1;
245 size_class =
246 base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
247
248 if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
249 size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
250 }
251 }
252
253 if !cache_only {
254 assert!(!mem_page_size_classes.is_empty());
255 }
256
257 if mem_page_size_classes.is_empty()
259 || mem_page_size_classes[mem_page_size_classes.len() - 1] < max_mini_page_size
260 {
261 mem_page_size_classes.push(max_mini_page_size);
262 }
263
264 if mem_page_size_classes.is_empty()
266 || mem_page_size_classes[mem_page_size_classes.len() - 1] < leaf_page_size_in_byte
267 {
268 mem_page_size_classes.push(leaf_page_size_in_byte);
269 }
270
271 if !cache_only {
272 assert!(mem_page_size_classes.len() >= 2);
273 } else {
274 assert!(!mem_page_size_classes.is_empty());
275 }
276
277 mem_page_size_classes
278 }
279
280 pub fn new(file_path: impl AsRef<Path>, cache_size_byte: usize) -> Result<Self, ConfigError> {
294 let config = Config::new(file_path, cache_size_byte);
295 Self::with_config(config, None)
296 }
297
298 pub fn new_with_config_file<P: AsRef<Path>>(config_file_path: P) -> Result<Self, ConfigError> {
301 let config = Config::new_with_config_file(config_file_path);
302 Self::with_config(config, None)
303 }
304
305 pub fn with_config(config: Config, buffer_ptr: Option<*mut u8>) -> Result<Self, ConfigError> {
308 config.validate()?;
310
311 let wal = match config.write_ahead_log.as_ref() {
312 Some(wal_config) => {
313 let wal = WriteAheadLog::new(wal_config.clone());
314 Some(wal)
315 }
316 None => None,
317 };
318 let write_load_full = config.write_load_full_page;
319 let config = Arc::new(config);
320
321 if config.cache_only {
323 let snapshot_mgr = if config.use_snapshot {
324 Some(Arc::new(CPRSnapShotMgr::new(config.snapshot_version)))
325 } else {
326 None
327 };
328
329 let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr, snapshot_mgr.clone());
330
331 let snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(snapshot_mgr.clone()) {
334 Ok(guard) => guard,
335 Err(()) => {
336 panic!("Failed to acquire a snapshot guard for root page initialization");
337 }
338 };
339
340 let mini_page_guard = (leaf_storage)
342 .alloc_mini_page(config.leaf_page_size)
343 .expect("Fail to allocate a mini-page as initial root node");
344 LeafNode::initialize_mini_page(
345 &mini_page_guard,
346 config.leaf_page_size,
347 MiniPageNextLevel::new_null(),
348 true,
349 snapshot_guard.snapshot_version(),
350 );
351 let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
352 let mini_loc = PageLocation::Mini(new_mini_ptr);
353
354 let (root_id, root_lock) = leaf_storage
355 .mapping_table()
356 .insert_mini_page_mapping(mini_loc);
357 assert_eq!(root_id.as_id(), 0);
358
359 drop(root_lock);
360 drop(mini_page_guard);
361 let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
362
363 return Ok(Self {
364 root_page_id: AtomicU64::new(root_id),
365 storage: leaf_storage,
366 wal,
367 cache_only: config.cache_only,
368 write_load_full_page: write_load_full,
369 mini_page_size_classes: Self::create_mem_page_size_classes(
370 config.cb_min_record_size,
371 config.cb_max_record_size,
372 config.leaf_page_size,
373 config.max_fence_len,
374 config.cache_only,
375 ),
376 snapshot_mgr,
377 config,
378 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
379 metrics_recorder: Some(Arc::new(ThreadLocal::new())),
380 });
381 }
382
383 let snapshot_mgr = if config.use_snapshot {
384 Some(Arc::new(CPRSnapShotMgr::new(config.snapshot_version)))
385 } else {
386 None
387 };
388
389 let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr, snapshot_mgr.clone());
390
391 let snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(snapshot_mgr.clone()) {
394 Ok(guard) => guard,
395 Err(()) => {
396 panic!("Failed to acquire a snapshot guard for root page initialization");
397 }
398 };
399
400 let (root_id, root_lock) = leaf_storage
401 .mapping_table()
402 .alloc_base_page_mapping(snapshot_guard.snapshot_version());
403 drop(root_lock);
404 assert_eq!(root_id.as_id(), 0);
405
406 let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
407 Ok(Self {
408 root_page_id: AtomicU64::new(root_id),
409 storage: leaf_storage,
410 wal,
411 cache_only: config.cache_only,
412 write_load_full_page: write_load_full,
413 mini_page_size_classes: Self::create_mem_page_size_classes(
414 config.cb_min_record_size,
415 config.cb_max_record_size,
416 config.leaf_page_size,
417 config.max_fence_len,
418 config.cache_only,
419 ),
420 snapshot_mgr,
421 config,
422 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
423 metrics_recorder: Some(Arc::new(ThreadLocal::new())),
424 })
425 }
426
427 pub fn config(&self) -> &Config {
428 &self.config
429 }
430
431 pub fn get_buffer_metrics(&self) -> CircularBufferMetrics {
435 self.storage.get_buffer_metrics()
436 }
437
438 pub(crate) fn get_root_page(&self) -> (PageID, bool) {
440 let root_page_id = self.root_page_id.load(Ordering::Acquire);
441 let root_is_leaf = (root_page_id & Self::ROOT_IS_LEAF_MASK) != 0;
442 let clean = root_page_id & (!Self::ROOT_IS_LEAF_MASK);
443
444 let page_id = if root_is_leaf {
445 PageID::from_id(clean)
446 } else {
447 PageID::from_pointer(clean as *const InnerNode)
448 };
449
450 (page_id, root_is_leaf)
451 }
452
453 pub(crate) fn mapping_table(&self) -> &PageTable {
454 self.storage.mapping_table()
455 }
456
457 pub(crate) fn should_promote_read(&self) -> bool {
458 get_rng().random_range(0..100) < self.config.read_promotion_rate.load(Ordering::Relaxed)
459 }
460
461 pub(crate) fn should_promote_scan_page(&self) -> bool {
462 get_rng().random_range(0..100) < self.config.scan_promotion_rate.load(Ordering::Relaxed)
463 }
464
465 pub fn update_read_promotion_rate(&self, new_rate: usize) {
467 self.config
468 .read_promotion_rate
469 .store(new_rate, Ordering::Relaxed);
470 }
471
    /// Splits the root leaf `cur_page_id` if its split flag is set, and installs a new
    /// inner-node root above the two halves.
    ///
    /// Returns `Ok(true)` after a split and `Ok(false)` when the leaf's split flag is clear.
    ///
    /// # Errors
    /// Returns `TreeError::NeedRestart` in two cases:
    /// - no snapshot guard can be acquired;
    /// - in the snapshot `Prepare` phase, the leaf's clean snapshot version is newer than
    ///   this thread's.
    ///
    /// Errors raised by `check_parent!` are also propagated (TODO confirm which).
    ///
    /// # Panics
    /// - Panics via `unreachable!` if the split flag is set while `parent` is `Some`. Only a
    ///   parentless (root) leaf is split here.
    /// - In cache-only mode, panics if the root is not a mini-page or the sibling mini-page
    ///   cannot be allocated.
    fn try_split_leaf(
        &self,
        cur_page_id: PageID,
        parent: &Option<ReadGuard>,
    ) -> Result<bool, TreeError> {
        debug_assert!(cur_page_id.is_id());

        // Page-table entry of the leaf, taken through `get_mut`. Presumably this is an
        // exclusively locked entry; TODO confirm in PageTable.
        let mut cur_page = self.mapping_table().get_mut(&cur_page_id);

        // NOTE(review): assumes check_parent! returns early with an error when `parent`
        // changed since it was read; see the macro definition.
        check_parent!(self, cur_page_id, parent);

        let should_split = cur_page.get_split_flag();
        if !should_split {
            return Ok(false);
        }

        let snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(self.snapshot_mgr.clone()) {
            Ok(guard) => guard,
            Err(()) => {
                return Err(TreeError::NeedRestart);
            }
        };

        match parent {
            Some(_) => {
                // A non-root leaf with its split flag set is treated as impossible here.
                unreachable!("Leaf node split should not happen here");
            }
            None => {
                // Snapshot bookkeeping on the root page before it is modified.
                if snapshot_guard.is_protected() {
                    let local_thread_snapshot_version = snapshot_guard.snapshot_version();
                    match snapshot_guard.get_local_phase_id() {
                        // Rest: just raise the page's snapshot version to ours if older.
                        PhaseId::Rest => {
                            if self.cache_only {
                                let root_page_loc = cur_page.get_page_location().clone();
                                match root_page_loc {
                                    PageLocation::Mini(ptr) => {
                                        let root_page = cur_page.load_cache_page_mut(ptr);
                                        if root_page.get_clean_snapshot_version()
                                            < local_thread_snapshot_version
                                        {
                                            root_page.set_snapshot_version(
                                                local_thread_snapshot_version,
                                                false,
                                            );
                                        }
                                    }
                                    _ => {
                                        panic!(
                                            "The root node is not a mini-page in cache-only mode"
                                        )
                                    }
                                }
                            } else {
                                let root_page = cur_page.load_base_page_mut();
                                if root_page.get_clean_snapshot_version()
                                    < local_thread_snapshot_version
                                {
                                    root_page
                                        .set_snapshot_version(local_thread_snapshot_version, false);
                                }
                            }
                        }
                        // Prepare: a page already stamped with a newer version means this
                        // thread is behind; restart instead of modifying it.
                        PhaseId::Prepare => {
                            if self.cache_only {
                                let root_page_loc = cur_page.get_page_location().clone();
                                match root_page_loc {
                                    PageLocation::Mini(ptr) => {
                                        let root_page = cur_page.load_cache_page_mut(ptr);
                                        if root_page.get_clean_snapshot_version()
                                            > local_thread_snapshot_version
                                        {
                                            return Err(TreeError::NeedRestart);
                                        }
                                    }
                                    _ => {
                                        panic!(
                                            "The root node is not a mini-page in cache-only mode"
                                        )
                                    }
                                }
                            } else {
                                let root_page = cur_page.load_base_page_mut();
                                if root_page.get_clean_snapshot_version()
                                    > local_thread_snapshot_version
                                {
                                    return Err(TreeError::NeedRestart);
                                }
                            }
                        }
                        // InProgress/Sweep: if the page predates our version, copy its
                        // current bytes into the snapshot first, then stamp it and record
                        // it as the snapshot's root.
                        PhaseId::InProgress | PhaseId::Sweep => {
                            if self.cache_only {
                                let root_page_loc = cur_page.get_page_location().clone();
                                match root_page_loc {
                                    PageLocation::Mini(ptr) => {
                                        let root_page = cur_page.load_cache_page_mut(ptr);
                                        if root_page.get_clean_snapshot_version()
                                            < local_thread_snapshot_version
                                        {
                                            // SAFETY(review): views the page as
                                            // `node_size` raw bytes; assumes `node_size`
                                            // never exceeds the page's allocation. The
                                            // slice is dead before the page is mutated.
                                            let root_page_ptr = unsafe {
                                                std::slice::from_raw_parts(
                                                    root_page as *const LeafNode as *const u8,
                                                    root_page.meta.node_size as usize,
                                                )
                                            };
                                            snapshot_guard.snapshot_mini_page(
                                                cur_page_id,
                                                root_page_ptr,
                                                root_page.meta.node_size as usize,
                                            );
                                            root_page.set_snapshot_version(
                                                local_thread_snapshot_version,
                                                false,
                                            );
                                            snapshot_guard.snapshot_root_page(cur_page_id);
                                        }
                                    }
                                    _ => {
                                        panic!(
                                            "The root node is not a mini-page in cache-only mode"
                                        )
                                    }
                                }
                            } else {
                                let root_page = cur_page.load_base_page_mut();
                                if root_page.get_clean_snapshot_version()
                                    < local_thread_snapshot_version
                                {
                                    // SAFETY(review): same assumption as the mini-page case:
                                    // `node_size` bytes are readable from the page start.
                                    let root_page_ptr = unsafe {
                                        std::slice::from_raw_parts(
                                            root_page as *const LeafNode as *const u8,
                                            root_page.meta.node_size as usize,
                                        )
                                    };
                                    snapshot_guard.snapshot_base_page(
                                        cur_page_id,
                                        root_page_ptr,
                                        root_page.meta.node_size as usize,
                                    );
                                    root_page
                                        .set_snapshot_version(local_thread_snapshot_version, false);
                                    snapshot_guard.snapshot_root_page(cur_page_id);
                                }
                            }
                        }
                    }
                }

                // Cache-only split: the sibling is another leaf-size mini-page, and the
                // new root gets no disk offset.
                if self.cache_only {
                    let mini_page_guard = self
                        .storage
                        .alloc_mini_page(self.config.leaf_page_size)
                        .expect("Fail to allocate a mini-page during root split");
                    LeafNode::initialize_mini_page(
                        &mini_page_guard,
                        self.config.leaf_page_size,
                        MiniPageNextLevel::new_null(),
                        true,
                        snapshot_guard.snapshot_version(),
                    );
                    let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                    let mini_loc = PageLocation::Mini(new_mini_ptr);

                    // `_mini_lock` keeps the sibling's entry held until this function returns.
                    let (sibling_id, _mini_lock) = self
                        .storage
                        .mapping_table()
                        .insert_mini_page_mapping(mini_loc);

                    let cur_page_loc = cur_page.get_page_location().clone();
                    match cur_page_loc {
                        PageLocation::Mini(ptr) => {
                            let cur_mini_page = cur_page.load_cache_page_mut(ptr);
                            // SAFETY(review): `new_mini_ptr` was just initialized above and is
                            // still covered by `mini_page_guard`; no other reference exists.
                            let sibling_page = unsafe { &mut *new_mini_ptr };
                            let split_key = cur_mini_page.split(
                                sibling_page,
                                true,
                                snapshot_guard.snapshot_version(),
                            );

                            // New root: old leaf on the left, sibling after `split_key`.
                            let mut new_root_builder = InnerNodeBuilder::new();
                            new_root_builder
                                .set_left_most_page_id(cur_page_id)
                                .set_children_is_leaf(true)
                                .add_record(split_key, sibling_id);

                            let new_root_ptr =
                                new_root_builder.build(snapshot_guard.snapshot_version());
                            unsafe { (*new_root_ptr).set_root(true) };
                            // Published without ROOT_IS_LEAF_MASK: the root is now inner.
                            self.root_page_id
                                .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                            info!(sibling = sibling_id.raw(), "New root node installed!");

                            debug_assert!(cur_mini_page.meta.meta_count_with_fence() > 0);
                            debug_assert!(sibling_page.meta.meta_count_with_fence() > 0);

                            return Ok(true);
                        }
                        _ => {
                            panic!("The root node is not a mini-page in cache-only mode")
                        }
                    }
                }

                // Disk-backed split: the sibling is a new locked base page, and the new root
                // gets its own disk offset.
                let mut x_page = cur_page;

                let (sibling_id, mut sibling_entry) =
                    self.alloc_base_page_and_lock(snapshot_guard.snapshot_version());

                info!(sibling = sibling_id.raw(), "Splitting root node!");

                let sibling = sibling_entry.load_base_page_mut();

                let leaf_node = x_page.load_base_page_mut();
                let split_key = leaf_node.split(sibling, false, snapshot_guard.snapshot_version());

                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page_id)
                    .set_children_is_leaf(true)
                    .add_record(split_key, sibling_id);

                let new_root_ptr = new_root_builder.build(snapshot_guard.snapshot_version());

                unsafe { (*new_root_ptr).set_root(true) };

                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(sibling = sibling_id.raw(), "New root node installed!");
                Ok(true)
            }
        }
    }
719
720 fn alloc_base_page_and_lock(&self, snapshot_version: u64) -> (PageID, LeafEntryXLocked<'_>) {
721 let (pid, base_entry) = self
722 .mapping_table()
723 .alloc_base_page_mapping(snapshot_version);
724
725 (pid, base_entry)
726 }
727
    /// Splits the inner node `cur_page` if its split flag is set.
    ///
    /// - With a parent: a separator key pointing at a new sibling is inserted into the
    ///   write-locked parent, then `cur_page` moves its upper half into that sibling.
    /// - Without a parent: `cur_page` is the root. It is split, demoted from root, and a new
    ///   root over both halves is published in `root_page_id`.
    ///
    /// Return values:
    /// - `(false, parent)` when no split was needed; the guard is handed back unchanged.
    /// - `(true, Some(parent))` after a non-root split; the parent is downgraded back to a
    ///   read guard.
    /// - `(true, None)` after a root split.
    ///
    /// # Errors
    /// Propagates lock errors from `try_read`, `upgrade` and `check_parent!`. Returns
    /// `TreeError::NeedRestart` when:
    /// - no snapshot guard is available;
    /// - in the `Prepare` phase, a touched node's clean snapshot version is newer than this
    ///   thread's;
    /// - the parent cannot take the separator (the parent's split flag is then set).
    fn try_split_inner<'a>(
        &self,
        cur_page: PageID,
        parent: Option<ReadGuard<'a>>,
    ) -> Result<(bool, Option<ReadGuard<'a>>), TreeError> {
        let cur_node = ReadGuard::try_read(cur_page.as_inner_node())?;

        // NOTE(review): assumes check_parent! returns early with an error when `parent`
        // changed since it was read; see the macro definition.
        check_parent!(self, cur_page, parent);

        let should_split = cur_node.as_ref().meta.get_split_flag();

        if !should_split {
            return Ok((false, parent));
        }

        info!(has_parent = parent.is_some(), "split inner node");

        let snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(self.snapshot_mgr.clone()) {
            Ok(guard) => guard,
            Err(()) => {
                return Err(TreeError::NeedRestart);
            }
        };

        match parent {
            Some(p) => {
                // Upgrade both the node and its parent to write locks; a failed upgrade
                // returns the lock error.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;
                let mut x_parent = p.upgrade().map_err(|(_x, e)| e)?;

                if snapshot_guard.is_protected() {
                    let local_thread_snapshot_version = snapshot_guard.snapshot_version();

                    match snapshot_guard.get_local_phase_id() {
                        // Rest: stamp both nodes with our version if they are older.
                        PhaseId::Rest => {
                            if x_parent.as_ref().get_clean_snapshot_version()
                                < local_thread_snapshot_version
                            {
                                x_parent
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);
                            }

                            if x_cur.as_ref().get_clean_snapshot_version()
                                < local_thread_snapshot_version
                            {
                                x_cur
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);
                            }
                        }
                        // Prepare: restart if either node already carries a newer version.
                        PhaseId::Prepare
                            if x_parent.as_ref().get_clean_snapshot_version()
                                > local_thread_snapshot_version
                                || x_cur.as_ref().get_clean_snapshot_version()
                                    > local_thread_snapshot_version =>
                        {
                            return Err(TreeError::NeedRestart);
                        }
                        // InProgress/Sweep: copy each older node into the snapshot before
                        // stamping it. If the parent is the root, also record it as the
                        // snapshot root.
                        PhaseId::InProgress | PhaseId::Sweep => {
                            if x_parent.as_ref().get_clean_snapshot_version()
                                < local_thread_snapshot_version
                            {
                                snapshot_guard.snapshot_inner_node(x_parent.as_ref());
                                x_parent
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);

                                if x_parent.as_ref().is_root() {
                                    let root_id = self.root_page_id.load(Ordering::Acquire);
                                    assert_eq!(
                                        root_id,
                                        PageID::from_pointer(x_parent.as_ref() as *const InnerNode)
                                            .raw()
                                    );

                                    snapshot_guard.snapshot_root_page(PageID::from_pointer(
                                        x_parent.as_ref() as *const InnerNode,
                                    ));
                                }
                            }

                            if x_cur.as_ref().get_clean_snapshot_version()
                                < local_thread_snapshot_version
                            {
                                snapshot_guard.snapshot_inner_node(x_cur.as_ref());
                                x_cur
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);
                            }
                        }
                        _ => {}
                    }
                }

                let split_key = x_cur.as_ref().get_split_key();

                // NOTE(review): this disk offset is allocated before the parent insert
                // below. If that insert fails, the function returns without visibly
                // releasing the offset. Confirm whether InnerNodeBuilder reclaims it.
                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));

                let success = x_parent
                    .as_mut()
                    .insert(&split_key, sibling_builder.get_page_id());
                if !success {
                    // Parent is full: flag it so a later pass splits it first, then retry.
                    x_parent.as_mut().meta.set_split_flag();
                    return Err(TreeError::NeedRestart);
                }

                x_cur
                    .as_mut()
                    .split(&mut sibling_builder, snapshot_guard.snapshot_version());

                sibling_builder.build(snapshot_guard.snapshot_version());

                Ok((true, Some(x_parent.downgrade())))
            }
            None => {
                // No parent: `cur_page` is the root (asserted in the InProgress arm).
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;

                if snapshot_guard.is_protected() {
                    let local_thread_snapshot_version = snapshot_guard.snapshot_version();

                    match snapshot_guard.get_local_phase_id() {
                        PhaseId::Rest
                            if x_cur.as_ref().get_clean_snapshot_version()
                                < local_thread_snapshot_version =>
                        {
                            x_cur
                                .as_mut()
                                .set_snapshot_version(local_thread_snapshot_version);
                        }
                        PhaseId::Prepare
                            if x_cur.as_ref().get_clean_snapshot_version()
                                > local_thread_snapshot_version =>
                        {
                            return Err(TreeError::NeedRestart);
                        }
                        PhaseId::InProgress | PhaseId::Sweep
                            if x_cur.as_ref().get_clean_snapshot_version()
                                < local_thread_snapshot_version =>
                        {
                            snapshot_guard.snapshot_inner_node(x_cur.as_ref());
                            x_cur
                                .as_mut()
                                .set_snapshot_version(local_thread_snapshot_version);

                            let root_id = self.root_page_id.load(Ordering::Acquire);
                            assert!(x_cur.as_ref().is_root());
                            assert_eq!(
                                root_id,
                                PageID::from_pointer(x_cur.as_ref() as *const InnerNode).raw()
                            );

                            snapshot_guard.snapshot_root_page(PageID::from_pointer(
                                x_cur.as_ref() as *const InnerNode
                            ));
                        }
                        _ => {}
                    }
                }

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));
                let sibling_id = sibling_builder.get_page_id();

                let split_key = x_cur
                    .as_mut()
                    .split(&mut sibling_builder, snapshot_guard.snapshot_version());
                x_cur.as_mut().set_root(false);

                // New root: old node on the left, sibling after `split_key`; children are
                // inner nodes.
                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page)
                    .set_children_is_leaf(false)
                    .add_record(split_key, sibling_id);
                sibling_builder.build(snapshot_guard.snapshot_version());
                let new_root_ptr = new_root_builder.build(snapshot_guard.snapshot_version());

                unsafe { (*new_root_ptr).set_root(true) };

                // Hold the new root write-locked while it is published.
                let _x_root = ReadGuard::try_read(new_root_ptr)
                    .unwrap()
                    .upgrade()
                    .unwrap();
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(
                    has_parent = parent.is_some(),
                    cur = cur_page.raw(),
                    "finished split inner node"
                );

                Ok((true, parent))
            }
        }
    }
934
    /// Descends from the root to the leaf whose range covers `key`.
    ///
    /// Returns the leaf's page id and the read guard of the inner node it was reached from.
    /// The guard is `None` when the root itself is a leaf.
    ///
    /// When `aggressive_split` is set, each visited node is first offered to
    /// `try_split_leaf` / `try_split_inner`. If a split happens, the walk stops with
    /// `TreeError::NeedRestart` so the caller retries from the new root.
    ///
    /// When `return_high_fence_key` is set, `high_fence` receives the separator key that
    /// follows the chosen child. It is overwritten at every level that has one, so it ends
    /// up holding the deepest such key. Levels where the chosen child is the last one leave
    /// it untouched.
    ///
    /// # Errors
    /// Propagates errors from `ReadGuard::try_read`, `check_parent!` and the split helpers,
    /// and returns `TreeError::NeedRestart` after any split.
    ///
    /// # Panics
    /// Panics if a high fence key has to be written while `high_fence` is `None`.
    pub(crate) fn traverse_to_leaf(
        &self,
        key: &[u8],
        aggressive_split: bool,
        return_high_fence_key: bool,
        mut high_fence: Option<&mut Vec<u8>>,
    ) -> Result<(PageID, Option<ReadGuard<'_>>), TreeError> {
        let (mut cur_page, mut cur_is_leaf) = self.get_root_page();
        let mut parent: Option<ReadGuard> = None;

        loop {
            if aggressive_split {
                if cur_is_leaf
                    && !cur_page.is_inner_node_pointer()
                    && self.try_split_leaf(cur_page, &parent)?
                {
                    return Err(TreeError::NeedRestart);
                } else if !cur_is_leaf {
                    // `parent` is moved into the helper and handed back when nothing split.
                    let (split_success, new_parent) = self.try_split_inner(cur_page, parent)?;
                    if split_success {
                        return Err(TreeError::NeedRestart);
                    } else {
                        parent = new_parent;
                    }
                }
            }

            if cur_is_leaf {
                return Ok((cur_page, parent));
            } else {
                // Read the current inner node, then validate the previous level's guard
                // (NOTE(review): assumes check_parent! returns early on a stale parent).
                let next = ReadGuard::try_read(cur_page.as_inner_node())?;

                check_parent!(self, cur_page, parent);

                let next_node = next.as_ref();
                let next_is_leaf = next_node.meta.children_is_leaf();
                let pos = next_node.lower_bound(key);
                let kv_meta = next_node.get_kv_meta(pos as u16);
                cur_page = next_node.get_value(kv_meta);
                cur_is_leaf = next_is_leaf;

                // The key after the chosen slot bounds the child's range from above.
                if return_high_fence_key
                    && ((pos + 1) as u16) < next_node.meta.meta_count_with_fence()
                {
                    let hk_kv_meta = next_node.get_kv_meta((pos + 1) as u16);
                    let hk = next_node.get_full_key(hk_kv_meta);
                    if let Some(v) = high_fence.as_deref_mut() {
                        v.resize(hk.len(), 0u8);
                        v.copy_from_slice(&hk);
                    } else {
                        panic!("high_fence is None, but return_high_fence_key is true");
                    }
                }

                // This node's guard becomes the parent for the next level.
                parent = Some(next);
            }
        }
    }
994
    /// Applies one write (`write_op`, an insert or a delete) to the leaf covering
    /// `write_op.key`.
    ///
    /// The leaf is located with `traverse_to_leaf` (splitting eagerly when
    /// `aggressive_split` is set), and its page-table entry is taken with `get_mut`. Then:
    ///
    /// - `PageLocation::Null` (only legal in cache-only mode):
    ///   - a delete is a no-op;
    ///   - an insert allocates a mini-page sized by `LeafNode::get_chain_size_hint`,
    ///     installs it for this page id, and writes the record into it.
    /// - Any other location:
    ///   - the entry's `insert` performs the write, consuming the parent guard;
    ///   - `move_cache_page_to_tail` is called (result ignored) when
    ///     `cache_page_about_to_evict` reports true;
    ///   - if a WAL is configured, the operation is appended and the returned LSN is stored
    ///     on the entry.
    ///
    /// # Errors
    /// Propagates errors from traversal, `check_parent!`, `alloc_mini_page` and the leaf
    /// insert. Returns `TreeError::NeedRestart` when no snapshot guard is available.
    ///
    /// # Panics
    /// Panics on a `Null` page outside cache-only mode, or if the record does not fit into
    /// the freshly allocated mini-page.
    fn write_inner(&self, write_op: WriteOp, aggressive_split: bool) -> Result<(), TreeError> {
        let (pid, parent) = self.traverse_to_leaf(write_op.key, aggressive_split, false, None)?;

        let mut leaf_entry = self.mapping_table().get_mut(&pid);

        // Re-validate the parent now that the leaf entry is held
        // (NOTE(review): assumes check_parent! returns early on a stale parent).
        check_parent!(self, pid, parent);

        let page_loc = leaf_entry.get_page_location();
        match page_loc {
            PageLocation::Null => {
                if !self.cache_only {
                    panic!("Found an Null page in non cache-only mode");
                }

                // Nothing is stored for this page, so there is nothing to delete.
                if write_op.op_type == OpType::Delete {
                    return Ok(());
                }

                let mini_page_size = LeafNode::get_chain_size_hint(
                    write_op.key.len(),
                    write_op.value.len(),
                    &self.mini_page_size_classes,
                    self.cache_only,
                );

                let snapshot_guard =
                    match CPRSnapShotMgr::get_snapshot_guard(self.snapshot_mgr.clone()) {
                        Ok(guard) => guard,
                        Err(()) => {
                            return Err(TreeError::NeedRestart);
                        }
                    };

                let mini_page_guard = self.storage.alloc_mini_page(mini_page_size)?;
                LeafNode::initialize_mini_page(
                    &mini_page_guard,
                    mini_page_size,
                    MiniPageNextLevel::new_null(),
                    true,
                    snapshot_guard.snapshot_version(),
                );
                let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                let mini_loc = PageLocation::Mini(new_mini_ptr);

                leaf_entry.create_cache_page_loc(mini_loc);

                // NOTE(review): the last argument (0) is presumably the record's LSN. This
                // path never appends to the WAL; presumably the WAL is unused in
                // cache-only mode. Confirm.
                let mini_page_ref = leaf_entry.load_cache_page_mut(new_mini_ptr);
                let insert_success =
                    mini_page_ref.insert(write_op.key, write_op.value, write_op.op_type, 0);
                assert!(insert_success);

                debug_assert!(mini_page_ref.meta.meta_count_with_fence() > 0);
                counter!(InsertCreatedMiniPage);
            }
            _ => {
                leaf_entry.insert(
                    write_op.key,
                    write_op.value,
                    parent,
                    write_op.op_type,
                    &self.storage,
                    &self.write_load_full_page,
                    &self.cache_only,
                    &self.mini_page_size_classes,
                )?;

                // Best effort: a failed move is deliberately ignored.
                if leaf_entry.cache_page_about_to_evict(&self.storage) {
                    _ = leaf_entry.move_cache_page_to_tail(&self.storage);
                }

                if let Some(wal) = &self.wal {
                    let lsn = wal.append_and_wait(&write_op, leaf_entry.get_disk_offset());
                    leaf_entry.update_lsn(lsn);
                }
            }
        }

        Ok(())
    }
1077
1078 pub(crate) fn evict_from_circular_buffer(&self) -> Result<usize, TreeError> {
1080 const TARGET_EVICT_SIZE: usize = 1024;
1084 let mut evicted = 0;
1085
1086 let mut retry_cnt = 0;
1088
1089 while evicted < TARGET_EVICT_SIZE && retry_cnt < 10 {
1090 let n = self
1091 .storage
1092 .evict_from_buffer(|mini_page_handle: &TombstoneHandle| {
1093 eviction_callback(mini_page_handle, self)
1094 })?;
1095 evicted += n as usize;
1096 retry_cnt += 1;
1097 }
1098 info!("stopped evict from circular buffer");
1099 Ok(evicted)
1100 }
1101
1102 pub fn insert(&self, key: &[u8], value: &[u8]) -> LeafInsertResult {
1119 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1121 return LeafInsertResult::InvalidKV(format!("Key too large {}", key.len()));
1122 }
1123
1124 if key.is_empty() {
1126 return LeafInsertResult::InvalidKV(format!(
1127 "Key too small {}, at least one byte",
1128 key.len()
1129 ));
1130 }
1131
1132 if value.len() > MAX_VALUE_LEN || key.len() + value.len() > self.config.cb_max_record_size {
1134 return LeafInsertResult::InvalidKV(format!(
1135 "Record too large {}, {}, please adjust cb_max_record_size in config",
1136 key.len(),
1137 value.len()
1138 ));
1139 }
1140
1141 if key.len() + value.len() < self.config.cb_min_record_size {
1143 return LeafInsertResult::InvalidKV(format!(
1144 "Record too small {}, {}, please adjust cb_min_record_size in config",
1145 key.len(),
1146 value.len()
1147 ));
1148 }
1149
1150 let backoff = Backoff::new();
1151 let mut aggressive_split = false;
1152
1153 counter!(Insert);
1154 info!(key_len = key.len(), value_len = value.len(), "insert");
1155
1156 loop {
1157 let result = self.write_inner(WriteOp::make_insert(key, value), aggressive_split);
1158 match result {
1159 Ok(_) => return LeafInsertResult::Success,
1160 Err(TreeError::NeedRestart) => {
1161 #[cfg(all(feature = "shuttle", test))]
1162 {
1163 shuttle::thread::yield_now();
1164 }
1165 counter!(InsertNeedRestart);
1166 aggressive_split = true;
1167 }
1168 Err(TreeError::CircularBufferFull) => {
1169 info!("insert failed, started evict from circular buffer");
1170 aggressive_split = true;
1171 counter!(InsertCircularBufferFull);
1172 _ = self.evict_from_circular_buffer();
1173 continue;
1174 }
1175 Err(TreeError::Locked) => {
1176 counter!(InsertLocked);
1177 backoff.snooze();
1178 }
1179 }
1180 }
1181 }
1182
1183 pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
1203 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1205 return LeafReadResult::InvalidKey;
1206 }
1207
1208 if key.is_empty() {
1210 return LeafReadResult::InvalidKey;
1211 }
1212
1213 let backoff = Backoff::new();
1214
1215 info!(key_len = key.len(), "read");
1216 counter!(Read);
1217 let mut aggressive_split = false;
1218
1219 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1220 let mut debug_timer = DebugTimerGuard::new(Timer::Read, self.metrics_recorder.clone());
1221
1222 loop {
1223 let result = self.read_inner(key, out_buffer, aggressive_split);
1224 match result {
1225 Ok(v) => {
1226 #[cfg(any(
1227 feature = "metrics-rt-debug-all",
1228 feature = "metrics-rt-debug-timer"
1229 ))]
1230 debug_timer.end();
1231
1232 return v;
1233 }
1234 Err(TreeError::CircularBufferFull) => {
1235 info!("read promotion failed, started evict from circular buffer");
1236 aggressive_split = true;
1237 match self.evict_from_circular_buffer() {
1238 Ok(_) => continue,
1239 Err(_) => continue,
1240 };
1241 }
1242 Err(_) => {
1243 backoff.spin();
1244 aggressive_split = true;
1245 }
1246 }
1247 }
1248 }
1249
1250 pub fn delete(&self, key: &[u8]) {
1264 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1266 return;
1267 }
1268
1269 if key.is_empty() {
1271 return;
1272 }
1273
1274 let backoff = Backoff::new();
1275
1276 info!(key_len = key.len(), "delete");
1277
1278 let mut aggressive_split = false;
1279
1280 loop {
1281 let result = self.write_inner(WriteOp::make_delete(key), aggressive_split);
1282 match result {
1283 Ok(_) => return,
1284 Err(TreeError::CircularBufferFull) => {
1285 info!("delete failed, started evict from circular buffer");
1286 aggressive_split = true;
1287 match self.evict_from_circular_buffer() {
1288 Ok(_) => continue,
1289 Err(_) => continue,
1290 };
1291 }
1292 Err(_) => {
1293 aggressive_split = true;
1294 backoff.spin();
1295 }
1296 }
1297 }
1298 }
1299
1300 pub fn scan_with_count<'a>(
1303 &'a self,
1304 key: &[u8],
1305 cnt: usize,
1306 return_field: ScanReturnField,
1307 ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
1308 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1310 return Err(ScanIterError::InvalidStartKey);
1311 }
1312
1313 if key.is_empty() {
1315 return Err(ScanIterError::InvalidStartKey);
1316 }
1317
1318 if cnt == 0 {
1320 return Err(ScanIterError::InvalidCount);
1321 }
1322
1323 Ok(ScanIter::new_with_scan_count(self, key, cnt, return_field))
1324 }
1325
1326 pub fn scan_with_end_key<'a>(
1329 &'a self,
1330 start_key: &[u8],
1331 end_key: &[u8],
1332 return_field: ScanReturnField,
1333 ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
1334 if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1336 return Err(ScanIterError::InvalidStartKey);
1337 }
1338
1339 if start_key.is_empty() {
1341 return Err(ScanIterError::InvalidStartKey);
1342 }
1343
1344 if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1346 return Err(ScanIterError::InvalidEndKey);
1347 }
1348
1349 if end_key.is_empty() {
1351 return Err(ScanIterError::InvalidEndKey);
1352 }
1353
1354 let cmp = start_key.cmp(end_key);
1356 if cmp == std::cmp::Ordering::Greater {
1357 return Err(ScanIterError::InvalidKeyRange);
1358 }
1359
1360 Ok(ScanIter::new_with_end_key(
1361 self,
1362 start_key,
1363 end_key,
1364 return_field,
1365 ))
1366 }
1367
1368 #[doc(hidden)]
1369 pub fn scan_mut_with_count<'a>(
1370 &'a self,
1371 key: &'a [u8],
1372 cnt: usize,
1373 return_field: ScanReturnField,
1374 ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1375 if self.cache_only {
1377 return Err(ScanIterError::CacheOnlyMode);
1378 }
1379
1380 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1382 return Err(ScanIterError::InvalidStartKey);
1383 }
1384
1385 if cnt == 0 {
1387 return Err(ScanIterError::InvalidCount);
1388 }
1389
1390 Ok(ScanIterMut::new_with_scan_count(
1391 self,
1392 key,
1393 cnt,
1394 return_field,
1395 ))
1396 }
1397
1398 #[doc(hidden)]
1399 pub fn scan_mut_with_end_key<'a>(
1400 &'a self,
1401 start_key: &'a [u8],
1402 end_key: &'a [u8],
1403 return_field: ScanReturnField,
1404 ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1405 if self.cache_only {
1407 return Err(ScanIterError::CacheOnlyMode);
1408 }
1409
1410 if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1412 return Err(ScanIterError::InvalidStartKey);
1413 }
1414
1415 if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1417 return Err(ScanIterError::InvalidEndKey);
1418 }
1419
1420 Ok(ScanIterMut::new_with_end_key(
1421 self,
1422 start_key,
1423 end_key,
1424 return_field,
1425 ))
1426 }
1427
    /// Performs a single read attempt for `key`, copying any found value into `out_buffer`.
    ///
    /// Errors are returned rather than retried; `BfTree::read` loops on this
    /// function. Besides the lookup itself, this may:
    /// * move a mini/full cache page that `cache_page_about_to_evict` reports as
    ///   close to eviction, via `move_cache_page_to_tail`;
    /// * promote a value found on a base page into the cache. This happens either as
    ///   a single `OpType::Cache` record (when `config.read_record_cache` is set) or
    ///   by upgrading the whole leaf to a full page.
    ///
    /// Promotion only happens when the leaf has a parent guard, `should_promote_read()`
    /// returns true, and the exclusive upgrade of the leaf succeeds.
    ///
    /// # Panics
    ///
    /// Panics if a base page is read while the tree is in cache-only mode.
    fn read_inner(
        &self,
        key: &[u8],
        out_buffer: &mut [u8],
        aggressive_split: bool,
    ) -> Result<LeafReadResult, TreeError> {
        // `parent` is an optional read guard on the parent node (it is later passed
        // where a `ReadGuard` is expected). NOTE(review): presumably `None` when the
        // leaf is the root — confirm against `traverse_to_leaf`.
        let (node, parent) = self.traverse_to_leaf(key, aggressive_split, false, None)?;

        let mut leaf = self.mapping_table().get(&node);

        // NOTE(review): `check_parent!` is defined elsewhere; presumably it re-validates
        // `parent` now that the leaf entry is held.
        check_parent!(self, node, parent);

        let out = leaf.read(
            key,
            out_buffer,
            self.config.mini_page_binary_search,
            self.cache_only,
        );
        match out {
            ReadResult::Mini(r) | ReadResult::Full(r) => {
                // Best-effort refresh of a cache page that is about to be evicted. Failing
                // to get the exclusive lock, or an error from the move, is ignored; the
                // read result is still returned.
                if leaf.cache_page_about_to_evict(&self.storage) {
                    let mut x_leaf = match leaf.try_upgrade(self.snapshot_mgr.clone(), node) {
                        Ok(v) => v,
                        Err(_) => return Ok(r),
                    };
                    _ = x_leaf.move_cache_page_to_tail(&self.storage);
                }

                Ok(r)
            }

            ReadResult::Base(r) => {
                counter!(BasePageRead);

                if self.cache_only {
                    panic!("Attempt to read a base page while in cache-only mode.");
                }

                // Only successful lookups are promoted. `v` is used below as the
                // length of the value bytes written into `out_buffer`.
                let v = match r {
                    LeafReadResult::Found(v) => v,
                    _ => return Ok(r),
                };

                // Promotion needs a parent guard and must be allowed by the promotion policy.
                if parent.is_none() || !self.should_promote_read() {
                    return Ok(r);
                }

                // If another holder prevents the exclusive upgrade, skip promotion
                // instead of retrying; the value has already been read.
                let mut x_leaf = match leaf.try_upgrade(self.snapshot_mgr.clone(), node) {
                    Ok(x) => x,
                    Err(_) => {
                        return Ok(r);
                    }
                };

                if self.config.read_record_cache {
                    // Cache just this record in the leaf's cache page.
                    let out = x_leaf.insert(
                        key,
                        &out_buffer[0..v as usize],
                        parent,
                        OpType::Cache,
                        &self.storage,
                        &self.write_load_full_page,
                        &self.cache_only,
                        &self.mini_page_size_classes,
                    );

                    match out {
                        Ok(_) => {
                            counter!(ReadPromotionOk);
                            Ok(r)
                        }
                        // Contention during promotion is not an error for the read itself.
                        Err(TreeError::Locked) => {
                            counter!(ReadPromotionFailed);
                            Ok(r)
                        }
                        // Propagated so the caller can evict from the circular buffer and retry.
                        Err(TreeError::CircularBufferFull) => {
                            counter!(ReadPromotionFailed);
                            Err(TreeError::CircularBufferFull)
                        }
                        // Propagated so the caller restarts the whole read.
                        Err(TreeError::NeedRestart) => {
                            counter!(ReadPromotionFailed);
                            Err(TreeError::NeedRestart)
                        }
                    }
                } else {
                    // Upgrade the whole leaf to a full page. `parent` is `Some` here because
                    // of the `parent.is_none()` check above. Lock contention is ignored.
                    match self.upgrade_to_full_page(x_leaf, parent.unwrap()) {
                        Ok(_) | Err(TreeError::Locked) => Ok(r),
                        Err(e) => Err(e),
                    }
                }
            }
            ReadResult::None => Ok(LeafReadResult::NotFound),
        }
    }
1529
    /// Converts the exclusively locked leaf `x_leaf` into a full cache page and
    /// returns the (still locked) entry.
    ///
    /// * `Mini` — the mini page is merged via `try_merge_mini_page` (using `parent`).
    ///   The entry is then switched to its base location, the mini page's dealloc is
    ///   finished, and the base page is upgraded to a full page.
    /// * `Full` — already a full page; returned unchanged.
    /// * `Base` — the base page at `offset` is loaded and upgraded to a full page.
    ///
    /// # Errors
    ///
    /// Propagates errors from `begin_dealloc_mini_page`, `try_merge_mini_page`
    /// and the free function `upgrade_to_full_page`.
    ///
    /// # Panics
    ///
    /// Panics if the leaf's page location is `Null`.
    fn upgrade_to_full_page<'a>(
        &'a self,
        mut x_leaf: LeafEntryXLocked<'a>,
        parent: ReadGuard<'a>,
    ) -> Result<LeafEntryXLocked<'a>, TreeError> {
        // Clone the location so `x_leaf` is not borrowed while the arms below mutate it.
        let page_loc = x_leaf.get_page_location().clone();
        match page_loc {
            PageLocation::Mini(ptr) => {
                let mini_page = x_leaf.load_cache_page_mut(ptr);
                let h = self.storage.begin_dealloc_mini_page(mini_page)?;
                // NOTE(review): if the merge fails, `?` returns with `h` dropped without
                // `finish_dealloc_mini_page` — confirm the handle's drop semantics make
                // this safe.
                let _merge_result = x_leaf.try_merge_mini_page(&h, parent, &self.storage)?;
                // `next_level` is read before `finish_dealloc_mini_page`; presumably the
                // mini page memory cannot be reused until the dealloc is finished.
                let base_offset = mini_page.next_level;
                x_leaf.change_to_base_loc();
                self.storage.finish_dealloc_mini_page(h);

                let base_page_ref = x_leaf.load_base_page_from_buffer();
                // This unqualified call resolves to the free function imported from
                // `mini_page_op`, not to this method.
                let full_page_loc =
                    upgrade_to_full_page(&self.storage, base_page_ref, base_offset)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Full(_ptr) => Ok(x_leaf),
            PageLocation::Base(offset) => {
                let base_page_ref = x_leaf.load_base_page(offset);
                let next_level = MiniPageNextLevel::new(offset);
                let full_page_loc = upgrade_to_full_page(&self.storage, base_page_ref, next_level)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Null => panic!("upgrade_to_full_page on Null page"),
        }
    }
1562
1563 pub fn get_metrics(&mut self) -> Option<serde_json::Value> {
1566 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1567 {
1568 let recorder = self.metrics_recorder.take();
1569 match recorder {
1570 Some(r) => {
1571 let recorders = Arc::try_unwrap(r).expect("Failed to obtain the recorders of bf-tree, please make sure no other references exist.");
1572 let mut timer_accumulated = TimerRecorder::default();
1573
1574 for r in recorders {
1576 let t = unsafe { &*r.get() };
1577
1578 timer_accumulated += t.timers.clone();
1579 }
1580
1581 let output = serde_json::json!({
1582 "Timers": timer_accumulated,
1583 });
1584
1585 self.metrics_recorder = Some(Arc::new(ThreadLocal::new()));
1586
1587 Some(output)
1588 }
1589 None => None,
1590 }
1591 }
1592 #[cfg(not(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer")))]
1593 {
1594 None
1595 }
1596 }
1597}
1598
1599pub(crate) fn key_value_physical_size(key: &[u8], value: &[u8]) -> usize {
1600 let key_size = key.len();
1601 let value_size = value.len();
1602 let meta_size = crate::nodes::KV_META_SIZE;
1603 key_size + value_size + meta_size
1604}
1605
/// Evicts the cache page referenced by `mini_page_handle` from the leaf that owns it.
///
/// The owning leaf is found by re-traversing the tree with the key obtained from
/// the page itself (`get_key_to_reach_this_node`, or its fallible `try_` variant in
/// cache-only mode). If the leaf's mapping-table entry still points at this exact page:
/// * a mini page is merged via `try_merge_mini_page` and the entry is switched to its
///   base location. In cache-only mode the entry is instead switched to a null
///   location without any merge.
/// * a full page is handed to `merge_full_page`.
///
/// # Errors
///
/// Returns `TreeError::NeedRestart` when the entry no longer points at this page,
/// or is on a `Base`/`Null` location. Also propagates errors from key extraction
/// (cache-only mode), traversal, the parent version check, and the mini-page merge.
///
/// # Panics
///
/// Panics if the entry is on a mini page but traversal returned no parent guard.
pub(crate) fn eviction_callback(
    mini_page_handle: &TombstoneHandle,
    tree: &BfTree,
) -> Result<(), TreeError> {
    // NOTE(review): every `unsafe { &*mini_page }` below assumes `mini_page_handle.ptr`
    // points to a live `LeafNode` for the whole call — confirm this is guaranteed by the
    // circular buffer's tombstone protocol.
    let mini_page = mini_page_handle.ptr as *mut LeafNode;
    let key_to_this_page = if tree.cache_only {
        unsafe { &*mini_page }.try_get_key_to_reach_this_node()?
    } else {
        unsafe { &*mini_page }.get_key_to_reach_this_node()
    };

    // Second argument is `aggressive_split` (same argument order as in `read_inner`).
    let (pid, parent) = tree.traverse_to_leaf(&key_to_this_page, true, false, None)?;
    info!(
        pid = pid.raw(),
        "starting to merge mini page in eviction call back"
    );

    let mut leaf_entry = tree.mapping_table().get_mut(&pid);

    histogram!(EvictNodeSize, unsafe { &*mini_page }.meta.node_size as u64);

    match leaf_entry.get_page_location() {
        PageLocation::Mini(ptr) => {
            {
                // The leaf now references a different mini page; this eviction is stale.
                if *ptr != mini_page {
                    return Err(TreeError::NeedRestart);
                }
            }

            let parent = parent.expect("Mini page must have a parent");
            parent.check_version()?;

            if tree.cache_only {
                // Cache-only mode: the entry is dropped to `Null` without merging.
                leaf_entry.change_to_null_loc();
            } else {
                leaf_entry.try_merge_mini_page(mini_page_handle, parent, &tree.storage)?;
                leaf_entry.change_to_base_loc();
            }

            Ok(())
        }

        PageLocation::Full(ptr) => {
            // The leaf now references a different full page; this eviction is stale.
            if *ptr != mini_page {
                return Err(TreeError::NeedRestart);
            }

            leaf_entry.merge_full_page(mini_page_handle);
            Ok(())
        }

        // The leaf no longer points at a cache page, so this page cannot be its owner.
        PageLocation::Base(_offset) => Err(TreeError::NeedRestart),

        PageLocation::Null => Err(TreeError::NeedRestart),
    }
}
1674
#[cfg(test)]
mod tests {
    use crate::error::ConfigError;
    use crate::BfTree;

    /// Asserts that building a tree from `config` fails with an error accepted by
    /// `is_expected`. Panics with `mismatch_msg` if a different error is returned,
    /// and with "Expected error but got Ok" if construction succeeds.
    fn assert_rejected(
        config: &crate::Config,
        is_expected: impl Fn(&ConfigError) -> bool,
        mismatch_msg: &str,
    ) {
        match BfTree::with_config(config.clone(), None) {
            Err(e) if is_expected(&e) => {}
            Err(_) => panic!("{}", mismatch_msg),
            Ok(_) => panic!("Expected error but got Ok"),
        }
    }

    #[test]
    fn test_mini_page_size_classes() {
        assert_eq!(
            BfTree::create_mem_page_size_classes(48, 1952, 4096, 64, false),
            vec![128, 192, 256, 512, 960, 1856, 2048, 4096]
        );
        assert_eq!(
            BfTree::create_mem_page_size_classes(1544, 1544, 3136, 64, true),
            vec![1536, 3136]
        );
        assert_eq!(
            BfTree::create_mem_page_size_classes(48, 3072, 12288, 64, false),
            vec![128, 192, 256, 512, 960, 1856, 3648, 7232, 9088, 12288]
        );
        assert_eq!(
            BfTree::create_mem_page_size_classes(4, 1952, 4096, 32, false),
            vec![64, 128, 256, 448, 832, 1600, 2048, 4096]
        );
    }

    #[test]
    fn test_invalid_config_to_build_bf_tree() {
        // cb_min_record_size(4) with 32 KiB leaf pages must be rejected.
        {
            let mut config = crate::Config::default();
            config.cb_min_record_size(4);
            config.leaf_page_size(32 * 1024);
            assert_rejected(
                &config,
                |e| matches!(e, ConfigError::MinimumRecordSize(_)),
                "Expected InvalidMinimumRecordSize error",
            );
        }

        // cb_max_record_size(64 KiB) on the default config must be rejected.
        {
            let mut config = crate::Config::default();
            config.cb_max_record_size(64 * 1024);
            assert_rejected(
                &config,
                |e| matches!(e, ConfigError::MaximumRecordSize(_)),
                "Expected InvalidMaximumRecordSize error",
            );
        }

        // A leaf page size of 4050 bytes must be rejected.
        {
            let mut config = crate::Config::default();
            config.leaf_page_size(4050);
            assert_rejected(
                &config,
                |e| matches!(e, ConfigError::LeafPageSize(_)),
                "Expected InvalidLeafPageSize error",
            );
        }

        // A 16 KiB circular buffer with 16 KiB leaf pages must be rejected.
        {
            let mut config = crate::Config::default();
            config.leaf_page_size(16 * 1024);
            config.cb_size_byte(16 * 1024);
            assert_rejected(
                &config,
                |e| matches!(e, ConfigError::CircularBufferSize(_)),
                "Expected InvalidCircularBufferSize error",
            );
        }

        // A 20 KiB circular buffer on the default config must be rejected.
        {
            let mut config = crate::Config::default();
            config.cb_size_byte(20 * 1024);
            assert_rejected(
                &config,
                |e| matches!(e, ConfigError::CircularBufferSize(_)),
                "Expected InvalidCircularBufferSize error",
            );
        }

        // Cache-only mode with an 8 KiB circular buffer must be rejected.
        {
            let mut config = crate::Config::default();
            config.cache_only(true);
            config.cb_size_byte(2 * 4096);
            assert_rejected(
                &config,
                |e| matches!(e, ConfigError::CircularBufferSize(_)),
                "Expected InvalidCircularBufferSize error",
            );
        }
    }
}