1#[cfg(not(all(feature = "shuttle", test)))]
5use rand::Rng;
6
7#[cfg(all(feature = "shuttle", test))]
8use shuttle::rand::Rng;
9
10use cfg_if::cfg_if;
11
12cfg_if! {
13 if #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))] {
14 use crate::metric::{Timer, TlsRecorder, timer::{TimerRecorder, DebugTimerGuard}};
15 use std::cell::UnsafeCell;
16 use thread_local::ThreadLocal;
17 }
18}
19
20use crate::{
21 check_parent,
22 circular_buffer::{CircularBufferMetrics, TombstoneHandle},
23 counter,
24 error::{ConfigError, TreeError},
25 histogram, info,
26 mini_page_op::{upgrade_to_full_page, LeafEntryXLocked, LeafOperations, ReadResult},
27 nodes::{
28 leaf_node::{LeafKVMeta, LeafReadResult, MiniPageNextLevel, OpType},
29 InnerNode, InnerNodeBuilder, LeafNode, PageID, CACHE_LINE_SIZE, DISK_PAGE_SIZE,
30 INNER_NODE_SIZE, MAX_KEY_LEN, MAX_LEAF_PAGE_SIZE, MAX_VALUE_LEN,
31 },
32 range_scan::{ScanIter, ScanIterMut, ScanReturnField},
33 snapshot::CPRSnapShotMgr,
34 storage::{LeafStorage, PageLocation, PageTable},
35 sync::{
36 atomic::{AtomicU64, Ordering},
37 Arc,
38 },
39 utils::{get_rng, inner_lock::ReadGuard, Backoff, BfsVisitor, NodeInfo},
40 wal::{WriteAheadLog, WriteOp},
41 Config, StorageBackend,
42};
43use std::path::Path;
44
/// Top-level handle of the tree: owns the root reference, the leaf storage, and the
/// optional write-ahead log and snapshot manager.
pub struct BfTree {
    /// Encoded reference to the current root. When the `ROOT_IS_LEAF_MASK` bit is set
    /// the remaining bits are a leaf page id; otherwise the value is the address of the
    /// root `InnerNode` (see `get_root_page`).
    pub(crate) root_page_id: AtomicU64,
    /// Leaf-page storage; also provides the page mapping table and mini-page allocation.
    pub(crate) storage: LeafStorage,
    /// Write-ahead log, present only when `Config::write_ahead_log` is configured.
    pub(crate) wal: Option<Arc<WriteAheadLog>>,
    /// Shared configuration this tree was built with.
    pub(crate) config: Arc<Config>,
    /// Copy of `Config::write_load_full_page`, taken at construction time.
    pub(crate) write_load_full_page: bool,
    /// Copy of `Config::cache_only`, taken at construction time.
    pub(crate) cache_only: bool,
    /// In-memory page size classes produced by `create_mem_page_size_classes`.
    pub(crate) mini_page_size_classes: Vec<usize>,
    /// Snapshot manager, present only when `Config::use_snapshot` is set.
    pub(crate) snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
    /// Per-thread metrics recorder; only compiled in with the debug-metrics features.
    #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
    pub metrics_recorder: Option<Arc<ThreadLocal<UnsafeCell<TlsRecorder>>>>,
}
58
// SAFETY: NOTE(review) — the soundness argument is not visible in this part of the file.
// The tree hands out and stores raw node pointers (e.g. `*mut LeafNode`, `*const InnerNode`)
// and, with the metrics features, an `UnsafeCell`-based recorder, which is presumably why
// the auto traits are not derived. Confirm that every shared mutable access goes through
// the page-table entry locks, the inner-node guards, or atomics.
unsafe impl Sync for BfTree {}

// SAFETY: NOTE(review) — same reasoning as for `Sync` above; confirm that no field is
// tied to the thread that created the tree.
unsafe impl Send for BfTree {}
62
/// Outcome of [`BfTree::insert`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum LeafInsertResult {
    /// The record was written.
    Success,
    /// The key or value failed size validation; the string describes which limit was hit.
    InvalidKV(String),
}
68
/// Reasons a scan iterator could not be created.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ScanIterError {
    /// The tree runs in cache-only mode, where scans are rejected.
    CacheOnlyMode,
    /// The start key is longer than the configured/maximum key length
    /// (the non-`mut` scans also reject an empty start key).
    InvalidStartKey,
    /// The end key is longer than the configured/maximum key length
    /// (`scan_with_end_key` also rejects an empty end key).
    InvalidEndKey,
    /// The requested record count was zero.
    InvalidCount,
    /// The start key sorts after the end key.
    InvalidKeyRange,
}
77
78impl Drop for BfTree {
79 fn drop(&mut self) {
80 if let Some(ref wal) = self.wal {
81 wal.stop_background_job();
82 }
83
84 let visitor = BfsVisitor::new_all_nodes(self);
85 for node_info in visitor {
86 match node_info {
87 NodeInfo::Leaf { page_id, .. } => {
88 let mut leaf = self.mapping_table().get_mut(&page_id);
89 leaf.dealloc_self(&self.storage, self.cache_only);
90 }
91 NodeInfo::Inner { ptr, .. } => {
92 if unsafe { &*ptr }.is_valid_disk_offset() {
93 let disk_offset = unsafe { &*ptr }.disk_offset;
94 if self.config.storage_backend == StorageBackend::Memory {
95 self.storage.vfs.dealloc_offset(disk_offset as usize);
97 }
98 }
99 InnerNode::free_node(ptr as *mut InnerNode);
100 }
101 }
102 }
103 }
104}
105
106impl Default for BfTree {
107 fn default() -> Self {
108 Self::new(":memory:", 1024 * 1024 * 32).unwrap()
109 }
110}
111
112impl BfTree {
113 pub(crate) const ROOT_IS_LEAF_MASK: u64 = 0x8000_0000_0000_0000; pub(crate) fn create_mem_page_size_classes(
135 min_record_size_in_byte: usize,
136 max_record_size_in_byte: usize,
137 leaf_page_size_in_byte: usize,
138 max_fence_len_in_byte: usize,
139 cache_only: bool,
140 ) -> Vec<usize> {
141 assert!(
143 min_record_size_in_byte > 1,
144 "cb_min_record_size in config cannot be less than 2"
145 );
146 assert!(
147 min_record_size_in_byte <= max_record_size_in_byte,
148 "cb_min_record_size cannot be larger than cb_max_record_size"
149 );
150 assert!(
151 max_fence_len_in_byte > 0,
152 "max_fence_len in config cannot be zero"
153 );
154 assert!(
155 max_fence_len_in_byte / 2 < max_record_size_in_byte,
156 "max_fence_len/2 cannot be larger than cb_max_record_size"
157 );
158 assert!(
159 leaf_page_size_in_byte <= MAX_LEAF_PAGE_SIZE,
160 "leaf_page_size in config cannot be larger than {}",
161 MAX_LEAF_PAGE_SIZE
162 );
163 assert!(
164 max_fence_len_in_byte / 2 <= MAX_KEY_LEN,
165 "max_key_len in config cannot be larger than {}",
166 MAX_KEY_LEN
167 );
168 assert!(
169 leaf_page_size_in_byte / min_record_size_in_byte <= 4096,
170 "Maximum number of records per page (leaf_page_size/min_record_size) cannot exceed 2^12.", );
172
173 if !cache_only {
176 assert!(
177 leaf_page_size_in_byte.is_multiple_of(DISK_PAGE_SIZE),
178 "leaf_page_size in config should be multiple of {}",
179 DISK_PAGE_SIZE
180 );
181 } else {
182 assert!(
183 leaf_page_size_in_byte.is_multiple_of(CACHE_LINE_SIZE),
184 "leaf_page_size in config should be multiple of {}",
185 CACHE_LINE_SIZE
186 );
187 }
188
189 let max_record_size_with_meta = max_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
190 let mut max_mini_page_size: usize;
191
192 if cache_only {
193 max_mini_page_size = leaf_page_size_in_byte - max_record_size_with_meta;
195 max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
196
197 assert!(
198 leaf_page_size_in_byte
199 >= 2 * max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
200 "cb_max_record_size of config should be <= {}",
201 (leaf_page_size_in_byte - std::mem::size_of::<LeafNode>()) / 2
202 - std::mem::size_of::<LeafKVMeta>()
203 );
204 } else {
205 max_mini_page_size = leaf_page_size_in_byte
207 - max_record_size_with_meta
208 - max_fence_len_in_byte
209 - 2 * std::mem::size_of::<LeafKVMeta>();
210 max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
211
212 assert!(
213 max_mini_page_size >= max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
214 "cb_max_record_size of config should be <= {}",
215 max_mini_page_size
216 - std::mem::size_of::<LeafNode>()
217 - std::mem::size_of::<LeafKVMeta>()
218 );
219 }
220
221 let mut mem_page_size_classes = Vec::new();
223
224 let base: i32 = 2;
225 let mut record_num_per_page_exp: u32 = 0;
226 let c: usize = min_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
227
228 let mut size_class =
230 base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
231
232 if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
234 size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
235 }
236
237 while size_class <= max_mini_page_size {
238 if mem_page_size_classes.is_empty()
239 || (mem_page_size_classes[mem_page_size_classes.len() - 1] < size_class)
240 {
241 mem_page_size_classes.push(size_class);
242 }
243
244 record_num_per_page_exp += 1;
245 size_class =
246 base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
247
248 if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
249 size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
250 }
251 }
252
253 if !cache_only {
254 assert!(!mem_page_size_classes.is_empty());
255 }
256
257 if mem_page_size_classes.is_empty()
259 || mem_page_size_classes[mem_page_size_classes.len() - 1] < max_mini_page_size
260 {
261 mem_page_size_classes.push(max_mini_page_size);
262 }
263
264 if mem_page_size_classes.is_empty()
266 || mem_page_size_classes[mem_page_size_classes.len() - 1] < leaf_page_size_in_byte
267 {
268 mem_page_size_classes.push(leaf_page_size_in_byte);
269 }
270
271 if !cache_only {
272 assert!(mem_page_size_classes.len() >= 2);
273 } else {
274 assert!(!mem_page_size_classes.is_empty());
275 }
276
277 mem_page_size_classes
278 }
279
280 pub fn new(file_path: impl AsRef<Path>, cache_size_byte: usize) -> Result<Self, ConfigError> {
294 let config = Config::new(file_path, cache_size_byte);
295 Self::with_config(config, None)
296 }
297
298 pub fn new_with_config_file<P: AsRef<Path>>(config_file_path: P) -> Result<Self, ConfigError> {
301 let config = Config::new_with_config_file(config_file_path);
302 Self::with_config(config, None)
303 }
304
305 pub fn with_config(config: Config, buffer_ptr: Option<*mut u8>) -> Result<Self, ConfigError> {
308 config.validate()?;
310
311 let wal = match config.write_ahead_log.as_ref() {
312 Some(wal_config) => {
313 let wal = WriteAheadLog::new(wal_config.clone());
314 Some(wal)
315 }
316 None => None,
317 };
318 let write_load_full = config.write_load_full_page;
319 let config = Arc::new(config);
320
321 if config.cache_only {
323 let snapshot_mgr = if config.use_snapshot {
324 Some(Arc::new(CPRSnapShotMgr::new(
325 &config.snapshot_backend,
326 config.snapshot_version,
327 &config.snapshot_file_path,
328 )))
329 } else {
330 None
331 };
332
333 let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr, snapshot_mgr.clone());
334
335 let _snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(snapshot_mgr.clone()) {
338 Ok(guard) => guard,
339 Err(()) => {
340 panic!("Failed to acquire a snapshot guard for root page initialization");
341 }
342 };
343
344 let mini_page_guard = (leaf_storage)
346 .alloc_mini_page(config.leaf_page_size)
347 .expect("Fail to allocate a mini-page as initial root node");
348 LeafNode::initialize_mini_page(
349 &mini_page_guard,
350 config.leaf_page_size,
351 MiniPageNextLevel::new_null(),
352 true,
353 );
354 let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
355 let mini_loc = PageLocation::Mini(new_mini_ptr);
356
357 let (root_id, root_lock) = leaf_storage
358 .mapping_table()
359 .insert_mini_page_mapping(mini_loc);
360 assert_eq!(root_id.as_id(), 0);
361
362 drop(root_lock);
363 drop(mini_page_guard);
364 let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
365
366 return Ok(Self {
367 root_page_id: AtomicU64::new(root_id),
368 storage: leaf_storage,
369 wal,
370 cache_only: config.cache_only,
371 write_load_full_page: write_load_full,
372 mini_page_size_classes: Self::create_mem_page_size_classes(
373 config.cb_min_record_size,
374 config.cb_max_record_size,
375 config.leaf_page_size,
376 config.max_fence_len,
377 config.cache_only,
378 ),
379 snapshot_mgr,
380 config,
381 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
382 metrics_recorder: Some(Arc::new(ThreadLocal::new())),
383 });
384 }
385
386 let snapshot_mgr = if config.use_snapshot {
387 Some(Arc::new(CPRSnapShotMgr::new(
388 &config.snapshot_backend,
389 config.snapshot_version,
390 &config.snapshot_file_path,
391 )))
392 } else {
393 None
394 };
395
396 let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr, snapshot_mgr.clone());
397
398 let _snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(snapshot_mgr.clone()) {
401 Ok(guard) => guard,
402 Err(()) => {
403 panic!("Failed to acquire a snapshot guard for root page initialization");
404 }
405 };
406
407 let (root_id, root_lock) = leaf_storage.mapping_table().alloc_base_page_mapping();
408 drop(root_lock);
409 assert_eq!(root_id.as_id(), 0);
410
411 let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
412 Ok(Self {
413 root_page_id: AtomicU64::new(root_id),
414 storage: leaf_storage,
415 wal,
416 cache_only: config.cache_only,
417 write_load_full_page: write_load_full,
418 mini_page_size_classes: Self::create_mem_page_size_classes(
419 config.cb_min_record_size,
420 config.cb_max_record_size,
421 config.leaf_page_size,
422 config.max_fence_len,
423 config.cache_only,
424 ),
425 snapshot_mgr,
426 config,
427 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
428 metrics_recorder: Some(Arc::new(ThreadLocal::new())),
429 })
430 }
431
    /// Returns the configuration this tree was created with.
    pub fn config(&self) -> &Config {
        &self.config
    }
435
    /// Returns the circular-buffer metrics reported by the underlying leaf storage.
    pub fn get_buffer_metrics(&self) -> CircularBufferMetrics {
        self.storage.get_buffer_metrics()
    }
442
443 pub(crate) fn get_root_page(&self) -> (PageID, bool) {
445 let root_page_id = self.root_page_id.load(Ordering::Acquire);
446 let root_is_leaf = (root_page_id & Self::ROOT_IS_LEAF_MASK) != 0;
447 let clean = root_page_id & (!Self::ROOT_IS_LEAF_MASK);
448
449 let page_id = if root_is_leaf {
450 PageID::from_id(clean)
451 } else {
452 PageID::from_pointer(clean as *const InnerNode)
453 };
454
455 (page_id, root_is_leaf)
456 }
457
    /// Returns the page mapping table owned by the leaf storage.
    pub(crate) fn mapping_table(&self) -> &PageTable {
        self.storage.mapping_table()
    }
461
462 pub(crate) fn should_promote_read(&self) -> bool {
463 get_rng().gen_range(0..100) < self.config.read_promotion_rate.load(Ordering::Relaxed)
464 }
465
466 pub(crate) fn should_promote_scan_page(&self) -> bool {
467 get_rng().gen_range(0..100) < self.config.scan_promotion_rate.load(Ordering::Relaxed)
468 }
469
470 pub fn update_read_promotion_rate(&self, new_rate: usize) {
472 self.config
473 .read_promotion_rate
474 .store(new_rate, Ordering::Relaxed);
475 }
476
    /// Splits the leaf `cur_page_id` if its split flag is set, installing a new inner
    /// root above it.
    ///
    /// Only a leaf without a parent (i.e. the root leaf) is expected to reach the split
    /// path; a flagged leaf that has a parent hits `unreachable!`.
    ///
    /// Returns `Ok(false)` when the split flag is not set and `Ok(true)` once the new
    /// root has been published.
    ///
    /// # Errors
    /// Returns `TreeError::NeedRestart` when the snapshot guard cannot be acquired or
    /// the page's snapshot version is newer than this thread's version in phase 1.
    /// `check_parent!` may also return early (presumably with a restart error when the
    /// parent changed — confirm against the macro definition).
    ///
    /// # Panics
    /// Panics in cache-only mode if the page is not a mini page or a mini page cannot
    /// be allocated for the sibling.
    fn try_split_leaf(
        &self,
        cur_page_id: PageID,
        parent: &Option<ReadGuard>,
    ) -> Result<bool, TreeError> {
        debug_assert!(cur_page_id.is_id());

        // Exclusive access to the leaf entry for the whole operation.
        let mut cur_page = self.mapping_table().get_mut(&cur_page_id);

        check_parent!(self, cur_page_id, parent);

        let should_split = cur_page.get_split_flag();
        if !should_split {
            return Ok(false);
        }

        let snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(self.snapshot_mgr.clone()) {
            Ok(guard) => guard,
            Err(()) => {
                return Err(TreeError::NeedRestart);
            }
        };

        match parent {
            Some(_) => {
                unreachable!("Leaf node split should not happen here");
            }
            None => {
                // Snapshot bookkeeping, only while the guard reports protection.
                if snapshot_guard.is_protected() {
                    let local_thread_snapshot_version =
                        CPRSnapShotMgr::get_snapshot_thread_version();
                    match snapshot_guard.get_local_phase_id() {
                        // Phase 1: refuse to modify a page that already carries a newer
                        // snapshot version than this thread.
                        1 => {
                            if self.cache_only {
                                let root_page_loc = cur_page.get_page_location().clone();
                                match root_page_loc {
                                    PageLocation::Mini(ptr) => {
                                        let root_page = cur_page.load_cache_page_mut(ptr);
                                        if root_page.get_snapshot_version()
                                            > local_thread_snapshot_version
                                        {
                                            return Err(TreeError::NeedRestart);
                                        }
                                    }
                                    _ => {
                                        panic!(
                                            "The root node is not a mini-page in cache-only mode"
                                        )
                                    }
                                }
                            } else {
                                let root_page = cur_page.load_base_page_mut();
                                if root_page.get_snapshot_version() > local_thread_snapshot_version
                                {
                                    return Err(TreeError::NeedRestart);
                                }
                            }
                        }
                        // Phases 2 and 3: if the page is older than this thread's version,
                        // capture its bytes before the split mutates it, stamp the new
                        // version, and record it as the root page of the snapshot.
                        2 | 3 => {
                            if self.cache_only {
                                let root_page_loc = cur_page.get_page_location().clone();
                                match root_page_loc {
                                    PageLocation::Mini(ptr) => {
                                        let root_page = cur_page.load_cache_page_mut(ptr);
                                        if root_page.get_snapshot_version()
                                            < local_thread_snapshot_version
                                        {
                                            // Byte view over the page, `node_size` bytes long.
                                            let root_page_ptr = unsafe {
                                                std::slice::from_raw_parts(
                                                    root_page as *const LeafNode as *const u8,
                                                    root_page.meta.node_size as usize,
                                                )
                                            };
                                            snapshot_guard.snapshot_mini_page(
                                                cur_page_id,
                                                root_page_ptr,
                                                root_page.meta.node_size as usize,
                                            );
                                            root_page.set_snapshot_version(
                                                local_thread_snapshot_version,
                                                false,
                                            );
                                            snapshot_guard.snapshot_root_page(cur_page_id);
                                        }
                                    }
                                    _ => {
                                        panic!(
                                            "The root node is not a mini-page in cache-only mode"
                                        )
                                    }
                                }
                            } else {
                                let root_page = cur_page.load_base_page_mut();
                                if root_page.get_snapshot_version() < local_thread_snapshot_version
                                {
                                    // Byte view over the page, `node_size` bytes long.
                                    let root_page_ptr = unsafe {
                                        std::slice::from_raw_parts(
                                            root_page as *const LeafNode as *const u8,
                                            root_page.meta.node_size as usize,
                                        )
                                    };
                                    snapshot_guard.snapshot_base_page(
                                        cur_page_id,
                                        root_page_ptr,
                                        root_page.meta.node_size as usize,
                                    );
                                    root_page
                                        .set_snapshot_version(local_thread_snapshot_version, false);
                                    snapshot_guard.snapshot_root_page(cur_page_id);
                                }
                            }
                        }
                        _ => {}
                    }
                }

                if self.cache_only {
                    // Cache-only split: the sibling is a new full-size mini page.
                    let mini_page_guard = self
                        .storage
                        .alloc_mini_page(self.config.leaf_page_size)
                        .expect("Fail to allocate a mini-page during root split");
                    LeafNode::initialize_mini_page(
                        &mini_page_guard,
                        self.config.leaf_page_size,
                        MiniPageNextLevel::new_null(),
                        true,
                    );
                    let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                    let mini_loc = PageLocation::Mini(new_mini_ptr);

                    // The sibling's entry lock is held until this function returns.
                    let (sibling_id, _mini_lock) = self
                        .storage
                        .mapping_table()
                        .insert_mini_page_mapping(mini_loc);

                    let cur_page_loc = cur_page.get_page_location().clone();
                    match cur_page_loc {
                        PageLocation::Mini(ptr) => {
                            let cur_mini_page = cur_page.load_cache_page_mut(ptr);
                            let sibling_page = unsafe { &mut *new_mini_ptr };
                            let split_key = cur_mini_page.split(sibling_page, true);

                            // New root: current page as left-most child, sibling keyed by
                            // the split key.
                            let mut new_root_builder = InnerNodeBuilder::new();
                            new_root_builder
                                .set_left_most_page_id(cur_page_id)
                                .set_children_is_leaf(true)
                                .add_record(split_key, sibling_id);

                            let new_root_ptr = new_root_builder.build();
                            unsafe { (*new_root_ptr).set_root(true) };
                            // Publishing a pointer clears the leaf bit of the root id.
                            self.root_page_id
                                .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                            info!(sibling = sibling_id.raw(), "New root node installed!");

                            debug_assert!(cur_mini_page.meta.meta_count_with_fence() > 0);
                            debug_assert!(sibling_page.meta.meta_count_with_fence() > 0);

                            return Ok(true);
                        }
                        _ => {
                            panic!("The root node is not a mini-page in cache-only mode")
                        }
                    }
                }

                // Disk-backed split: the sibling is a new base page.
                let mut x_page = cur_page;

                let (sibling_id, mut sibling_entry) = self.alloc_base_page_and_lock();

                info!(sibling = sibling_id.raw(), "Splitting root node!");

                let sibling = sibling_entry.load_base_page_mut();

                let leaf_node = x_page.load_base_page_mut();
                let split_key = leaf_node.split(sibling, false);

                // Unlike the cache-only path, the new root also gets a disk offset.
                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page_id)
                    .set_children_is_leaf(true)
                    .add_record(split_key, sibling_id);

                let new_root_ptr = new_root_builder.build();

                unsafe { (*new_root_ptr).set_root(true) };

                // Publishing a pointer clears the leaf bit of the root id.
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(sibling = sibling_id.raw(), "New root node installed!");
                Ok(true)
            }
        }
    }
686
687 fn alloc_base_page_and_lock(&self) -> (PageID, LeafEntryXLocked<'_>) {
688 let (pid, base_entry) = self.mapping_table().alloc_base_page_mapping();
689
690 (pid, base_entry)
691 }
692
    /// Splits the inner node `cur_page` if its split flag is set.
    ///
    /// With a parent, the split key is inserted into the parent and a sibling takes over
    /// part of the node; without a parent (the node is the root), a new root is created
    /// above the node and its sibling.
    ///
    /// Returns `(false, parent)` when no split was needed, otherwise `(true, guard)` where
    /// `guard` is the parent downgraded back to a read guard, or `None` for a root split.
    ///
    /// # Errors
    /// Returns `TreeError::NeedRestart` when the snapshot guard cannot be acquired, a
    /// phase-1 snapshot version check fails, or the parent has no room for the split key
    /// (in which case the parent's split flag is set first). Errors from acquiring or
    /// upgrading the node guards are propagated, and `check_parent!` may return early
    /// (presumably when the parent changed — confirm against the macro definition).
    fn try_split_inner<'a>(
        &self,
        cur_page: PageID,
        parent: Option<ReadGuard<'a>>,
    ) -> Result<(bool, Option<ReadGuard<'a>>), TreeError> {
        let cur_node = ReadGuard::try_read(cur_page.as_inner_node())?;

        check_parent!(self, cur_page, parent);

        let should_split = cur_node.as_ref().meta.get_split_flag();

        if !should_split {
            return Ok((false, parent));
        }

        info!(has_parent = parent.is_some(), "split inner node");

        let snapshot_guard = match CPRSnapShotMgr::get_snapshot_guard(self.snapshot_mgr.clone()) {
            Ok(guard) => guard,
            Err(()) => {
                return Err(TreeError::NeedRestart);
            }
        };

        match parent {
            Some(p) => {
                // Both the node and its parent are modified, so both guards are upgraded.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;
                let mut x_parent = p.upgrade().map_err(|(_x, e)| e)?;

                if snapshot_guard.is_protected() {
                    let local_thread_snapshot_version =
                        CPRSnapShotMgr::get_snapshot_thread_version();

                    match snapshot_guard.get_local_phase_id() {
                        // Phase 1: restart if either node already carries a newer
                        // snapshot version than this thread.
                        1 => {
                            if x_parent.as_ref().get_snapshot_version()
                                > local_thread_snapshot_version
                                || x_cur.as_ref().get_snapshot_version()
                                    > local_thread_snapshot_version
                            {
                                return Err(TreeError::NeedRestart);
                            }
                        }
                        // Phases 2 and 3: capture each node that is older than this
                        // thread's version before modifying it, then stamp the version.
                        2 | 3 => {
                            if x_parent.as_ref().get_snapshot_version()
                                < local_thread_snapshot_version
                            {
                                snapshot_guard.snapshot_inner_node(x_parent.as_ref());
                                x_parent
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);

                                // A parent that is the root is also recorded as the
                                // snapshot's root page.
                                if x_parent.as_ref().is_root() {
                                    let root_id = self.root_page_id.load(Ordering::Acquire);
                                    assert_eq!(
                                        root_id,
                                        PageID::from_pointer(x_parent.as_ref() as *const InnerNode)
                                            .raw()
                                    );

                                    snapshot_guard.snapshot_root_page(PageID::from_pointer(
                                        x_parent.as_ref() as *const InnerNode,
                                    ));
                                }
                            }

                            if x_cur.as_ref().get_snapshot_version() < local_thread_snapshot_version
                            {
                                snapshot_guard.snapshot_inner_node(x_cur.as_ref());
                                x_cur
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);
                            }
                        }
                        _ => {}
                    }
                }

                let split_key = x_cur.as_ref().get_split_key();

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));

                // Register the sibling in the parent first; if the parent is full, flag
                // it for splitting and restart without touching the current node.
                let success = x_parent
                    .as_mut()
                    .insert(&split_key, sibling_builder.get_page_id());
                if !success {
                    x_parent.as_mut().meta.set_split_flag();
                    return Err(TreeError::NeedRestart);
                }

                x_cur.as_mut().split(&mut sibling_builder);

                sibling_builder.build();

                Ok((true, Some(x_parent.downgrade())))
            }
            None => {
                // No parent: the node being split is the root.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;

                if snapshot_guard.is_protected() {
                    let local_thread_snapshot_version =
                        CPRSnapShotMgr::get_snapshot_thread_version();

                    match snapshot_guard.get_local_phase_id() {
                        // Phase 1: restart if the node already carries a newer version.
                        1 => {
                            if x_cur.as_ref().get_snapshot_version() > local_thread_snapshot_version
                            {
                                return Err(TreeError::NeedRestart);
                            }
                        }
                        // Phases 2 and 3: capture the old root before it is modified and
                        // record it as the snapshot's root page.
                        2 | 3 => {
                            if x_cur.as_ref().get_snapshot_version() < local_thread_snapshot_version
                            {
                                snapshot_guard.snapshot_inner_node(x_cur.as_ref());
                                x_cur
                                    .as_mut()
                                    .set_snapshot_version(local_thread_snapshot_version);

                                let root_id = self.root_page_id.load(Ordering::Acquire);
                                assert!(x_cur.as_ref().is_root());
                                assert_eq!(
                                    root_id,
                                    PageID::from_pointer(x_cur.as_ref() as *const InnerNode).raw()
                                );

                                snapshot_guard.snapshot_root_page(PageID::from_pointer(
                                    x_cur.as_ref() as *const InnerNode,
                                ));
                            }
                        }
                        _ => {}
                    }
                }

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));
                let sibling_id = sibling_builder.get_page_id();

                let split_key = x_cur.as_mut().split(&mut sibling_builder);
                x_cur.as_mut().set_root(false);

                // New root: old root as left-most child, sibling keyed by the split key.
                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page)
                    .set_children_is_leaf(false)
                    .add_record(split_key, sibling_id);
                sibling_builder.build();
                let new_root_ptr = new_root_builder.build();

                unsafe { (*new_root_ptr).set_root(true) };

                // Keep the new root exclusively locked while it is published; the guard
                // is released at the end of this arm.
                let _x_root = ReadGuard::try_read(new_root_ptr)
                    .unwrap()
                    .upgrade()
                    .unwrap();
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(
                    has_parent = parent.is_some(),
                    cur = cur_page.raw(),
                    "finished split inner node"
                );

                Ok((true, parent))
            }
        }
    }
873
    /// Walks from the root down to the leaf responsible for `key`.
    ///
    /// Returns the leaf's page id together with a read guard on its parent inner node
    /// (`None` when the root itself is the leaf).
    ///
    /// With `aggressive_split` set, every visited node whose split flag is set is split
    /// on the way down; after a successful split the traversal is abandoned with
    /// `TreeError::NeedRestart` so the caller retries from the new structure.
    ///
    /// # Errors
    /// Propagates errors from acquiring inner-node read guards and from the split
    /// helpers; `check_parent!` may also return early (presumably when the parent
    /// changed — confirm against the macro definition).
    pub(crate) fn traverse_to_leaf(
        &self,
        key: &[u8],
        aggressive_split: bool,
    ) -> Result<(PageID, Option<ReadGuard<'_>>), TreeError> {
        let (mut cur_page, mut cur_is_leaf) = self.get_root_page();
        let mut parent: Option<ReadGuard> = None;

        loop {
            if aggressive_split {
                if cur_is_leaf
                    && !cur_page.is_inner_node_pointer()
                    && self.try_split_leaf(cur_page, &parent)?
                {
                    return Err(TreeError::NeedRestart);
                } else if !cur_is_leaf {
                    // `try_split_inner` takes ownership of the parent guard and hands it
                    // back when no split happened.
                    let (split_success, new_parent) = self.try_split_inner(cur_page, parent)?;
                    if split_success {
                        return Err(TreeError::NeedRestart);
                    } else {
                        parent = new_parent;
                    }
                }
            }

            if cur_is_leaf {
                return Ok((cur_page, parent));
            } else {
                let next = ReadGuard::try_read(cur_page.as_inner_node())?;

                check_parent!(self, cur_page, parent);

                // Pick the child covering `key` and descend; the node just read becomes
                // the parent for the next level.
                let next_node = next.as_ref();
                let next_is_leaf = next_node.meta.children_is_leaf();
                let pos = next_node.lower_bound(key);
                let kv_meta = next_node.get_kv_meta(pos as u16);
                cur_page = next_node.get_value(kv_meta);
                cur_is_leaf = next_is_leaf;
                parent = Some(next);
            }
        }
    }
916
    /// Performs one attempt at applying `write_op` (insert or delete) to the tree.
    ///
    /// Locates the target leaf, locks its mapping-table entry exclusively and applies
    /// the operation. A `Null` page location (only legal in cache-only mode) gets a
    /// fresh mini page for inserts and is a no-op for deletes. For any other location
    /// the entry's own `insert` is used and, when a WAL is configured, the operation is
    /// appended to it and the returned LSN is stored on the entry.
    ///
    /// # Errors
    /// Propagates errors from the traversal, from mini-page allocation and from the
    /// leaf insert; returns `TreeError::NeedRestart` when the snapshot guard cannot be
    /// acquired. `check_parent!` may also return early (presumably when the parent
    /// changed — confirm against the macro definition).
    ///
    /// # Panics
    /// Panics if a `Null` page location is found outside cache-only mode, or if the
    /// insert into a freshly created mini page fails.
    fn write_inner(&self, write_op: WriteOp, aggressive_split: bool) -> Result<(), TreeError> {
        let (pid, parent) = self.traverse_to_leaf(write_op.key, aggressive_split)?;

        let mut leaf_entry = self.mapping_table().get_mut(&pid);

        check_parent!(self, pid, parent);

        let page_loc = leaf_entry.get_page_location();
        match page_loc {
            PageLocation::Null => {
                if !self.cache_only {
                    panic!("Found an Null page in non cache-only mode");
                }

                // Nothing is stored for this page, so there is nothing to delete.
                if write_op.op_type == OpType::Delete {
                    return Ok(());
                }

                let mini_page_size = LeafNode::get_chain_size_hint(
                    write_op.key.len(),
                    write_op.value.len(),
                    &self.mini_page_size_classes,
                    self.cache_only,
                );

                let snapshot_guard =
                    match CPRSnapShotMgr::get_snapshot_guard(self.snapshot_mgr.clone()) {
                        Ok(guard) => guard,
                        Err(()) => {
                            return Err(TreeError::NeedRestart);
                        }
                    };

                // While protected, the page is recorded in the snapshot with an empty
                // payload before the new mini page is created.
                if snapshot_guard.is_protected() {
                    snapshot_guard.snapshot_mini_page(pid, &[], 0);
                }

                let mini_page_guard = self.storage.alloc_mini_page(mini_page_size)?;
                LeafNode::initialize_mini_page(
                    &mini_page_guard,
                    mini_page_size,
                    MiniPageNextLevel::new_null(),
                    true,
                );
                let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                let mini_loc = PageLocation::Mini(new_mini_ptr);

                leaf_entry.create_cache_page_loc(mini_loc);

                let mini_page_ref = leaf_entry.load_cache_page_mut(new_mini_ptr);
                let insert_success =
                    mini_page_ref.insert(write_op.key, write_op.value, write_op.op_type, 0);
                assert!(insert_success);

                debug_assert!(mini_page_ref.meta.meta_count_with_fence() > 0);
                counter!(InsertCreatedMiniPage);
            }
            _ => {
                leaf_entry.insert(
                    write_op.key,
                    write_op.value,
                    parent,
                    write_op.op_type,
                    &self.storage,
                    &self.write_load_full_page,
                    &self.cache_only,
                    &self.mini_page_size_classes,
                )?;

                // Best effort: a failure to move the page to the tail is ignored.
                if leaf_entry.cache_page_about_to_evict(&self.storage) {
                    _ = leaf_entry.move_cache_page_to_tail(&self.storage);
                }

                if let Some(wal) = &self.wal {
                    let lsn = wal.append_and_wait(&write_op, leaf_entry.get_disk_offset());
                    leaf_entry.update_lsn(lsn);
                }
            }
        }

        Ok(())
    }
1006
1007 pub(crate) fn evict_from_circular_buffer(&self) -> Result<usize, TreeError> {
1009 const TARGET_EVICT_SIZE: usize = 1024;
1013 let mut evicted = 0;
1014
1015 let mut retry_cnt = 0;
1017
1018 while evicted < TARGET_EVICT_SIZE && retry_cnt < 10 {
1019 let n = self
1020 .storage
1021 .evict_from_buffer(|mini_page_handle: &TombstoneHandle| {
1022 eviction_callback(mini_page_handle, self)
1023 })?;
1024 evicted += n as usize;
1025 retry_cnt += 1;
1026 }
1027 info!("stopped evict from circular buffer");
1028 Ok(evicted)
1029 }
1030
1031 pub fn insert(&self, key: &[u8], value: &[u8]) -> LeafInsertResult {
1048 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1050 return LeafInsertResult::InvalidKV(format!("Key too large {}", key.len()));
1051 }
1052
1053 if key.is_empty() {
1055 return LeafInsertResult::InvalidKV(format!(
1056 "Key too small {}, at least one byte",
1057 key.len()
1058 ));
1059 }
1060
1061 if value.len() > MAX_VALUE_LEN || key.len() + value.len() > self.config.cb_max_record_size {
1063 return LeafInsertResult::InvalidKV(format!(
1064 "Record too large {}, {}, please adjust cb_max_record_size in config",
1065 key.len(),
1066 value.len()
1067 ));
1068 }
1069
1070 if key.len() + value.len() < self.config.cb_min_record_size {
1072 return LeafInsertResult::InvalidKV(format!(
1073 "Record too small {}, {}, please adjust cb_min_record_size in config",
1074 key.len(),
1075 value.len()
1076 ));
1077 }
1078
1079 let backoff = Backoff::new();
1080 let mut aggressive_split = false;
1081
1082 counter!(Insert);
1083 info!(key_len = key.len(), value_len = value.len(), "insert");
1084
1085 loop {
1086 let result = self.write_inner(WriteOp::make_insert(key, value), aggressive_split);
1087 match result {
1088 Ok(_) => return LeafInsertResult::Success,
1089 Err(TreeError::NeedRestart) => {
1090 #[cfg(all(feature = "shuttle", test))]
1091 {
1092 shuttle::thread::yield_now();
1093 }
1094 counter!(InsertNeedRestart);
1095 aggressive_split = true;
1096 }
1097 Err(TreeError::CircularBufferFull) => {
1098 info!("insert failed, started evict from circular buffer");
1099 aggressive_split = true;
1100 counter!(InsertCircularBufferFull);
1101 _ = self.evict_from_circular_buffer();
1102 continue;
1103 }
1104 Err(TreeError::Locked) => {
1105 counter!(InsertLocked);
1106 backoff.snooze();
1107 }
1108 }
1109 }
1110 }
1111
1112 pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
1132 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1134 return LeafReadResult::InvalidKey;
1135 }
1136
1137 if key.is_empty() {
1139 return LeafReadResult::InvalidKey;
1140 }
1141
1142 let backoff = Backoff::new();
1143
1144 info!(key_len = key.len(), "read");
1145 counter!(Read);
1146 let mut aggressive_split = false;
1147
1148 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1149 let mut debug_timer = DebugTimerGuard::new(Timer::Read, self.metrics_recorder.clone());
1150
1151 loop {
1152 let result = self.read_inner(key, out_buffer, aggressive_split);
1153 match result {
1154 Ok(v) => {
1155 #[cfg(any(
1156 feature = "metrics-rt-debug-all",
1157 feature = "metrics-rt-debug-timer"
1158 ))]
1159 debug_timer.end();
1160
1161 return v;
1162 }
1163 Err(TreeError::CircularBufferFull) => {
1164 info!("read promotion failed, started evict from circular buffer");
1165 aggressive_split = true;
1166 match self.evict_from_circular_buffer() {
1167 Ok(_) => continue,
1168 Err(_) => continue,
1169 };
1170 }
1171 Err(_) => {
1172 backoff.spin();
1173 aggressive_split = true;
1174 }
1175 }
1176 }
1177 }
1178
1179 pub fn delete(&self, key: &[u8]) {
1193 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1195 return;
1196 }
1197
1198 if key.is_empty() {
1200 return;
1201 }
1202
1203 let backoff = Backoff::new();
1204
1205 info!(key_len = key.len(), "delete");
1206
1207 let mut aggressive_split = false;
1208
1209 loop {
1210 let result = self.write_inner(WriteOp::make_delete(key), aggressive_split);
1211 match result {
1212 Ok(_) => return,
1213 Err(TreeError::CircularBufferFull) => {
1214 info!("delete failed, started evict from circular buffer");
1215 aggressive_split = true;
1216 match self.evict_from_circular_buffer() {
1217 Ok(_) => continue,
1218 Err(_) => continue,
1219 };
1220 }
1221 Err(_) => {
1222 aggressive_split = true;
1223 backoff.spin();
1224 }
1225 }
1226 }
1227 }
1228
1229 pub fn scan_with_count<'a>(
1232 &'a self,
1233 key: &[u8],
1234 cnt: usize,
1235 return_field: ScanReturnField,
1236 ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
1237 if self.cache_only {
1239 return Err(ScanIterError::CacheOnlyMode);
1240 }
1241
1242 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1244 return Err(ScanIterError::InvalidStartKey);
1245 }
1246
1247 if key.is_empty() {
1249 return Err(ScanIterError::InvalidStartKey);
1250 }
1251
1252 if cnt == 0 {
1254 return Err(ScanIterError::InvalidCount);
1255 }
1256
1257 Ok(ScanIter::new_with_scan_count(self, key, cnt, return_field))
1258 }
1259
1260 pub fn scan_with_end_key<'a>(
1261 &'a self,
1262 start_key: &[u8],
1263 end_key: &[u8],
1264 return_field: ScanReturnField,
1265 ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
1266 if self.cache_only {
1268 return Err(ScanIterError::CacheOnlyMode);
1269 }
1270
1271 if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1273 return Err(ScanIterError::InvalidStartKey);
1274 }
1275
1276 if start_key.is_empty() {
1278 return Err(ScanIterError::InvalidStartKey);
1279 }
1280
1281 if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1283 return Err(ScanIterError::InvalidEndKey);
1284 }
1285
1286 if end_key.is_empty() {
1288 return Err(ScanIterError::InvalidEndKey);
1289 }
1290
1291 let cmp = start_key.cmp(end_key);
1293 if cmp == std::cmp::Ordering::Greater {
1294 return Err(ScanIterError::InvalidKeyRange);
1295 }
1296
1297 Ok(ScanIter::new_with_end_key(
1298 self,
1299 start_key,
1300 end_key,
1301 return_field,
1302 ))
1303 }
1304
1305 #[doc(hidden)]
1306 pub fn scan_mut_with_count<'a>(
1307 &'a self,
1308 key: &'a [u8],
1309 cnt: usize,
1310 return_field: ScanReturnField,
1311 ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1312 if self.cache_only {
1314 return Err(ScanIterError::CacheOnlyMode);
1315 }
1316
1317 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1319 return Err(ScanIterError::InvalidStartKey);
1320 }
1321
1322 if cnt == 0 {
1324 return Err(ScanIterError::InvalidCount);
1325 }
1326
1327 Ok(ScanIterMut::new_with_scan_count(
1328 self,
1329 key,
1330 cnt,
1331 return_field,
1332 ))
1333 }
1334
1335 #[doc(hidden)]
1336 pub fn scan_mut_with_end_key<'a>(
1337 &'a self,
1338 start_key: &'a [u8],
1339 end_key: &'a [u8],
1340 return_field: ScanReturnField,
1341 ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1342 if self.cache_only {
1344 return Err(ScanIterError::CacheOnlyMode);
1345 }
1346
1347 if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1349 return Err(ScanIterError::InvalidStartKey);
1350 }
1351
1352 if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1354 return Err(ScanIterError::InvalidEndKey);
1355 }
1356
1357 Ok(ScanIterMut::new_with_end_key(
1358 self,
1359 start_key,
1360 end_key,
1361 return_field,
1362 ))
1363 }
1364
1365 fn read_inner(
1366 &self,
1367 key: &[u8],
1368 out_buffer: &mut [u8],
1369 aggressive_split: bool,
1370 ) -> Result<LeafReadResult, TreeError> {
1371 let (node, parent) = self.traverse_to_leaf(key, aggressive_split)?;
1372
1373 let mut leaf = self.mapping_table().get(&node);
1374
1375 check_parent!(self, node, parent);
1376
1377 let out = leaf.read(
1378 key,
1379 out_buffer,
1380 self.config.mini_page_binary_search,
1381 self.cache_only,
1382 );
1383 match out {
1384 ReadResult::Mini(r) | ReadResult::Full(r) => {
1385 if leaf.cache_page_about_to_evict(&self.storage) {
1386 let mut x_leaf = match leaf.try_upgrade(self.snapshot_mgr.clone(), node) {
1387 Ok(v) => v,
1388 Err(_) => return Ok(r),
1389 };
1390 _ = x_leaf.move_cache_page_to_tail(&self.storage);
1392 }
1393
1394 Ok(r)
1395 }
1396
1397 ReadResult::Base(r) => {
1398 counter!(BasePageRead);
1399
1400 if self.cache_only {
1402 panic!("Attempt to read a base page while in cache-only mode.");
1403 }
1404
1405 let v = match r {
1406 LeafReadResult::Found(v) => v,
1407 _ => return Ok(r),
1408 };
1409
1410 if parent.is_none() || !self.should_promote_read() {
1411 return Ok(r);
1412 }
1413
1414 let mut x_leaf = match leaf.try_upgrade(self.snapshot_mgr.clone(), node) {
1415 Ok(x) => x,
1416 Err(_) => {
1417 return Ok(r);
1418 }
1419 };
1420
1421 if self.config.read_record_cache {
1422 let out = x_leaf.insert(
1426 key,
1427 &out_buffer[0..v as usize],
1428 parent,
1429 OpType::Cache,
1430 &self.storage,
1431 &self.write_load_full_page,
1432 &self.cache_only,
1433 &self.mini_page_size_classes,
1434 );
1435
1436 match out {
1437 Ok(_) => {
1438 counter!(ReadPromotionOk);
1439 Ok(r)
1440 }
1441 Err(TreeError::Locked) => {
1442 counter!(ReadPromotionFailed);
1444 Ok(r)
1445 }
1446 Err(TreeError::CircularBufferFull) => {
1447 counter!(ReadPromotionFailed);
1448 Err(TreeError::CircularBufferFull)
1449 }
1450 Err(TreeError::NeedRestart) => {
1451 counter!(ReadPromotionFailed);
1453 Err(TreeError::NeedRestart)
1454 }
1455 }
1456 } else {
1457 match self.upgrade_to_full_page(x_leaf, parent.unwrap()) {
1458 Ok(_) | Err(TreeError::Locked) => Ok(r),
1459 Err(e) => Err(e),
1460 }
1461 }
1462 }
1463 ReadResult::None => Ok(LeafReadResult::NotFound),
1464 }
1465 }
1466
1467 fn upgrade_to_full_page<'a>(
1468 &'a self,
1469 mut x_leaf: LeafEntryXLocked<'a>,
1470 parent: ReadGuard<'a>,
1471 ) -> Result<LeafEntryXLocked<'a>, TreeError> {
1472 let page_loc = x_leaf.get_page_location().clone();
1473 match page_loc {
1474 PageLocation::Mini(ptr) => {
1475 let mini_page = x_leaf.load_cache_page_mut(ptr);
1476 let h = self.storage.begin_dealloc_mini_page(mini_page)?;
1477 let _merge_result = x_leaf.try_merge_mini_page(&h, parent, &self.storage)?;
1478 let base_offset = mini_page.next_level;
1479 x_leaf.change_to_base_loc();
1480 self.storage.finish_dealloc_mini_page(h);
1481
1482 let base_page_ref = x_leaf.load_base_page_from_buffer();
1483 let full_page_loc =
1484 upgrade_to_full_page(&self.storage, base_page_ref, base_offset)?;
1485 x_leaf.create_cache_page_loc(full_page_loc);
1486 Ok(x_leaf)
1487 }
1488 PageLocation::Full(_ptr) => Ok(x_leaf),
1489 PageLocation::Base(offset) => {
1490 let base_page_ref = x_leaf.load_base_page(offset);
1491 let next_level = MiniPageNextLevel::new(offset);
1492 let full_page_loc = upgrade_to_full_page(&self.storage, base_page_ref, next_level)?;
1493 x_leaf.create_cache_page_loc(full_page_loc);
1494 Ok(x_leaf)
1495 }
1496 PageLocation::Null => panic!("upgrade_to_full_page on Null page"),
1497 }
1498 }
1499
1500 pub fn get_metrics(&mut self) -> Option<serde_json::Value> {
1503 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1504 {
1505 let recorder = self.metrics_recorder.take();
1506 match recorder {
1507 Some(r) => {
1508 let recorders = Arc::try_unwrap(r).expect("Failed to obtain the recorders of bf-tree, please make sure no other references exist.");
1509 let mut timer_accumulated = TimerRecorder::default();
1510
1511 for r in recorders {
1513 let t = unsafe { &*r.get() };
1514
1515 timer_accumulated += t.timers.clone();
1516 }
1517
1518 let output = serde_json::json!({
1519 "Timers": timer_accumulated,
1520 });
1521
1522 self.metrics_recorder = Some(Arc::new(ThreadLocal::new()));
1523
1524 Some(output)
1525 }
1526 None => None,
1527 }
1528 }
1529 #[cfg(not(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer")))]
1530 {
1531 None
1532 }
1533 }
1534}
1535
1536pub(crate) fn key_value_physical_size(key: &[u8], value: &[u8]) -> usize {
1537 let key_size = key.len();
1538 let value_size = value.len();
1539 let meta_size = crate::nodes::KV_META_SIZE;
1540 key_size + value_size + meta_size
1541}
1542
1543pub(crate) fn eviction_callback(
1544 mini_page_handle: &TombstoneHandle,
1545 tree: &BfTree,
1546) -> Result<(), TreeError> {
1547 let mini_page = mini_page_handle.ptr as *mut LeafNode;
1548 let key_to_this_page = if tree.cache_only {
1549 unsafe { &*mini_page }.try_get_key_to_reach_this_node()?
1550 } else {
1551 unsafe { &*mini_page }.get_key_to_reach_this_node()
1552 };
1553
1554 let (pid, parent) = tree.traverse_to_leaf(&key_to_this_page, true)?;
1556 info!(
1557 pid = pid.raw(),
1558 "starting to merge mini page in eviction call back"
1559 );
1560
1561 let mut leaf_entry = tree.mapping_table().get_mut(&pid);
1562
1563 histogram!(EvictNodeSize, unsafe { &*mini_page }.meta.node_size as u64);
1564
1565 match leaf_entry.get_page_location() {
1566 PageLocation::Mini(ptr) => {
1567 {
1568 if *ptr != mini_page {
1575 return Err(TreeError::NeedRestart);
1576 }
1577 }
1578
1579 let parent = parent.expect("Mini page must have a parent");
1580 parent.check_version()?;
1581
1582 if tree.cache_only {
1585 leaf_entry.change_to_null_loc();
1586 } else {
1587 leaf_entry.try_merge_mini_page(mini_page_handle, parent, &tree.storage)?;
1588 leaf_entry.change_to_base_loc();
1589 }
1591
1592 Ok(())
1593 }
1594
1595 PageLocation::Full(ptr) => {
1596 if *ptr != mini_page {
1597 return Err(TreeError::NeedRestart);
1598 }
1599
1600 leaf_entry.merge_full_page(mini_page_handle);
1601 Ok(())
1602 }
1603
1604 PageLocation::Base(_offset) => Err(TreeError::NeedRestart),
1606
1607 PageLocation::Null => Err(TreeError::NeedRestart),
1609 }
1610}
1611
#[cfg(test)]
mod tests {
    use crate::error::ConfigError;
    use crate::BfTree;

    /// Tries to build a tree from `config` and requires the attempt to fail with
    /// an error accepted by `is_expected`.
    ///
    /// Panics with `wrong_error_msg` on any other error, and with a fixed
    /// message when construction unexpectedly succeeds.
    fn assert_config_rejected(
        config: &crate::Config,
        is_expected: fn(&ConfigError) -> bool,
        wrong_error_msg: &str,
    ) {
        match BfTree::with_config(config.clone(), None) {
            Err(e) if is_expected(&e) => {}
            Err(_) => panic!("{}", wrong_error_msg),
            Ok(_) => panic!("Expected error but got Ok"),
        }
    }

    /// Pins the size classes produced for four parameter combinations.
    #[test]
    fn test_mini_page_size_classes() {
        assert_eq!(
            BfTree::create_mem_page_size_classes(48, 1952, 4096, 64, false),
            vec![128, 192, 256, 512, 960, 1856, 2048, 4096]
        );

        assert_eq!(
            BfTree::create_mem_page_size_classes(1544, 1544, 3136, 64, true),
            vec![1536, 3136]
        );

        assert_eq!(
            BfTree::create_mem_page_size_classes(48, 3072, 12288, 64, false),
            vec![128, 192, 256, 512, 960, 1856, 3648, 7232, 9088, 12288]
        );

        assert_eq!(
            BfTree::create_mem_page_size_classes(4, 1952, 4096, 32, false),
            vec![64, 128, 256, 448, 832, 1600, 2048, 4096]
        );
    }

    /// Each scenario starts from the default config, changes a few settings and
    /// expects `BfTree::with_config` to reject the result with a specific error.
    #[test]
    fn test_invalid_config_to_build_bf_tree() {
        // Minimum record size of 4 with 32 KiB leaf pages.
        let mut config = crate::Config::default();
        config.cb_min_record_size(4);
        config.leaf_page_size(32 * 1024);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::MinimumRecordSize(_)),
            "Expected InvalidMinimumRecordSize error",
        );

        // Maximum record size of 64 KiB.
        let mut config = crate::Config::default();
        config.cb_max_record_size(64 * 1024);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::MaximumRecordSize(_)),
            "Expected InvalidMaximumRecordSize error",
        );

        // Leaf page size of 4050 bytes.
        let mut config = crate::Config::default();
        config.leaf_page_size(4050);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::LeafPageSize(_)),
            "Expected InvalidLeafPageSize error",
        );

        // Circular buffer exactly as large as one 16 KiB leaf page.
        let mut config = crate::Config::default();
        config.leaf_page_size(16 * 1024);
        config.cb_size_byte(16 * 1024);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::CircularBufferSize(_)),
            "Expected InvalidCircularBufferSize error",
        );

        // Circular buffer of 20 KiB with otherwise default settings.
        let mut config = crate::Config::default();
        config.cb_size_byte(20 * 1024);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::CircularBufferSize(_)),
            "Expected InvalidCircularBufferSize error",
        );

        // Cache-only mode with a circular buffer of 2 * 4096 bytes.
        let mut config = crate::Config::default();
        config.cache_only(true);
        config.cb_size_byte(2 * 4096);
        assert_config_rejected(
            &config,
            |e| matches!(e, ConfigError::CircularBufferSize(_)),
            "Expected InvalidCircularBufferSize error",
        );
    }
}