1#[cfg(not(all(feature = "shuttle", test)))]
5use rand::Rng;
6
7#[cfg(all(feature = "shuttle", test))]
8use shuttle::rand::Rng;
9
10use cfg_if::cfg_if;
11
12cfg_if! {
13 if #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))] {
14 use crate::metric::{Timer, TlsRecorder, timer::{TimerRecorder, DebugTimerGuard}};
15 use std::cell::UnsafeCell;
16 use thread_local::ThreadLocal;
17 }
18}
19
20use crate::{
21 check_parent,
22 circular_buffer::{CircularBufferMetrics, TombstoneHandle},
23 counter,
24 error::{ConfigError, TreeError},
25 histogram, info,
26 mini_page_op::{upgrade_to_full_page, LeafEntryXLocked, LeafOperations, ReadResult},
27 nodes::{
28 leaf_node::{LeafKVMeta, LeafReadResult, MiniPageNextLevel, OpType},
29 InnerNode, InnerNodeBuilder, LeafNode, PageID, CACHE_LINE_SIZE, DISK_PAGE_SIZE,
30 INNER_NODE_SIZE, MAX_KEY_LEN, MAX_LEAF_PAGE_SIZE, MAX_VALUE_LEN,
31 },
32 range_scan::{ScanIter, ScanIterMut, ScanReturnField},
33 storage::{LeafStorage, PageLocation, PageTable},
34 sync::{
35 atomic::{AtomicU64, Ordering},
36 Arc,
37 },
38 utils::{get_rng, inner_lock::ReadGuard, Backoff, BfsVisitor, NodeInfo},
39 wal::{WriteAheadLog, WriteOp},
40 Config, StorageBackend,
41};
42use std::path::Path;
43
44pub struct BfTree {
46 pub(crate) root_page_id: AtomicU64,
47 pub(crate) storage: LeafStorage,
48 pub(crate) wal: Option<Arc<WriteAheadLog>>,
49 pub(crate) config: Arc<Config>,
50 pub(crate) write_load_full_page: bool,
51 pub(crate) cache_only: bool, pub(crate) mini_page_size_classes: Vec<usize>, #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
54 pub metrics_recorder: Option<Arc<ThreadLocal<UnsafeCell<TlsRecorder>>>>, }
56
// SAFETY-NOTE(review): BfTree holds raw pointers (via PageLocation values in
// the storage layer), which suppress the auto traits. Soundness is assumed to
// rest on the tree's internal synchronization (atomics, page locks, read
// guards) — confirm against the locking protocol in mini_page_op/storage.
unsafe impl Sync for BfTree {}

// SAFETY-NOTE(review): see the note on the `Sync` impl above; moving the tree
// between threads is assumed safe because all shared state is reached through
// the synchronized mapping table.
unsafe impl Send for BfTree {}
60
/// Outcome of a single [`BfTree::insert`] call.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum LeafInsertResult {
    /// The key/value pair was written.
    Success,
    /// The pair was rejected before any write; the message explains which
    /// configured size limit was violated.
    InvalidKV(String),
}
66
/// Validation errors returned when constructing a range-scan iterator.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ScanIterError {
    /// Scans are not supported in cache-only mode.
    CacheOnlyMode,
    /// Start key is empty or exceeds the configured maximum length.
    InvalidStartKey,
    /// End key is empty or exceeds the configured maximum length.
    InvalidEndKey,
    /// Requested record count is zero.
    InvalidCount,
    /// Start key sorts after the end key.
    InvalidKeyRange,
}
75
impl Drop for BfTree {
    fn drop(&mut self) {
        // Stop WAL background work first so nothing races the teardown below.
        if let Some(ref wal) = self.wal {
            wal.stop_background_job();
        }

        // Walk every node and release its memory/disk resources.
        let visitor = BfsVisitor::new_all_nodes(self);
        for node_info in visitor {
            match node_info {
                NodeInfo::Leaf { page_id, .. } => {
                    let mut leaf = self.mapping_table().get_mut(&page_id);
                    leaf.dealloc_self(&self.storage, self.cache_only);
                }
                NodeInfo::Inner { ptr, .. } => {
                    // Return the inner node's disk allocation, but only when
                    // the in-memory backend owns the offset space.
                    if unsafe { &*ptr }.is_valid_disk_offset() {
                        let disk_offset = unsafe { &*ptr }.disk_offset;
                        if self.config.storage_backend == StorageBackend::Memory {
                            self.storage.vfs.dealloc_offset(disk_offset as usize);
                        }
                    }
                    InnerNode::free_node(ptr as *mut InnerNode);
                }
            }
        }
    }
}
103
impl Default for BfTree {
    /// An in-memory tree (`":memory:"`) with a 32 MiB cache.
    fn default() -> Self {
        Self::new(":memory:", 1024 * 1024 * 32).unwrap()
    }
}
109
110impl BfTree {
111 pub(crate) const ROOT_IS_LEAF_MASK: u64 = 0x8000_0000_0000_0000; pub(crate) fn create_mem_page_size_classes(
133 min_record_size_in_byte: usize,
134 max_record_size_in_byte: usize,
135 leaf_page_size_in_byte: usize,
136 max_fence_len_in_byte: usize,
137 cache_only: bool,
138 ) -> Vec<usize> {
139 assert!(
141 min_record_size_in_byte > 1,
142 "cb_min_record_size in config cannot be less than 2"
143 );
144 assert!(
145 min_record_size_in_byte <= max_record_size_in_byte,
146 "cb_min_record_size cannot be larger than cb_max_record_size"
147 );
148 assert!(
149 max_fence_len_in_byte > 0,
150 "max_fence_len in config cannot be zero"
151 );
152 assert!(
153 max_fence_len_in_byte / 2 < max_record_size_in_byte,
154 "max_fence_len/2 cannot be larger than cb_max_record_size"
155 );
156 assert!(
157 leaf_page_size_in_byte <= MAX_LEAF_PAGE_SIZE,
158 "leaf_page_size in config cannot be larger than {}",
159 MAX_LEAF_PAGE_SIZE
160 );
161 assert!(
162 max_fence_len_in_byte / 2 <= MAX_KEY_LEN,
163 "max_key_len in config cannot be larger than {}",
164 MAX_KEY_LEN
165 );
166 assert!(
167 leaf_page_size_in_byte / min_record_size_in_byte <= 4096,
168 "Maximum number of records per page (leaf_page_size/min_record_size) cannot exceed 2^12.", );
170
171 if !cache_only {
174 assert!(
175 leaf_page_size_in_byte.is_multiple_of(DISK_PAGE_SIZE),
176 "leaf_page_size in config should be multiple of {}",
177 DISK_PAGE_SIZE
178 );
179 } else {
180 assert!(
181 leaf_page_size_in_byte.is_multiple_of(CACHE_LINE_SIZE),
182 "leaf_page_size in config should be multiple of {}",
183 CACHE_LINE_SIZE
184 );
185 }
186
187 let max_record_size_with_meta = max_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
188 let mut max_mini_page_size: usize;
189
190 if cache_only {
191 max_mini_page_size = leaf_page_size_in_byte - max_record_size_with_meta;
193 max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
194
195 assert!(
196 leaf_page_size_in_byte
197 >= 2 * max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
198 "cb_max_record_size of config should be <= {}",
199 (leaf_page_size_in_byte - std::mem::size_of::<LeafNode>()) / 2
200 - std::mem::size_of::<LeafKVMeta>()
201 );
202 } else {
203 max_mini_page_size = leaf_page_size_in_byte
205 - max_record_size_with_meta
206 - max_fence_len_in_byte
207 - 2 * std::mem::size_of::<LeafKVMeta>();
208 max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;
209
210 assert!(
211 max_mini_page_size >= max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
212 "cb_max_record_size of config should be <= {}",
213 max_mini_page_size
214 - std::mem::size_of::<LeafNode>()
215 - std::mem::size_of::<LeafKVMeta>()
216 );
217 }
218
219 let mut mem_page_size_classes = Vec::new();
221
222 let base: i32 = 2;
223 let mut record_num_per_page_exp: u32 = 0;
224 let c: usize = min_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
225
226 let mut size_class =
228 base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
229
230 if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
232 size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
233 }
234
235 while size_class <= max_mini_page_size {
236 if mem_page_size_classes.is_empty()
237 || (mem_page_size_classes[mem_page_size_classes.len() - 1] < size_class)
238 {
239 mem_page_size_classes.push(size_class);
240 }
241
242 record_num_per_page_exp += 1;
243 size_class =
244 base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();
245
246 if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
247 size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
248 }
249 }
250
251 if !cache_only {
252 assert!(!mem_page_size_classes.is_empty());
253 }
254
255 if mem_page_size_classes.is_empty()
257 || mem_page_size_classes[mem_page_size_classes.len() - 1] < max_mini_page_size
258 {
259 mem_page_size_classes.push(max_mini_page_size);
260 }
261
262 if mem_page_size_classes.is_empty()
264 || mem_page_size_classes[mem_page_size_classes.len() - 1] < leaf_page_size_in_byte
265 {
266 mem_page_size_classes.push(leaf_page_size_in_byte);
267 }
268
269 if !cache_only {
270 assert!(mem_page_size_classes.len() >= 2);
271 } else {
272 assert!(!mem_page_size_classes.is_empty());
273 }
274
275 mem_page_size_classes
276 }
277
278 pub fn new(file_path: impl AsRef<Path>, cache_size_byte: usize) -> Result<Self, ConfigError> {
292 let config = Config::new(file_path, cache_size_byte);
293 Self::with_config(config, None)
294 }
295
296 pub fn new_with_config_file<P: AsRef<Path>>(config_file_path: P) -> Result<Self, ConfigError> {
299 let config = Config::new_with_config_file(config_file_path);
300 Self::with_config(config, None)
301 }
302
    /// Builds a tree from an explicit `Config`, optionally over a
    /// caller-provided cache buffer (`buffer_ptr`).
    ///
    /// In cache-only mode the initial root is a freshly allocated mini-page;
    /// otherwise it is a disk-backed base-page mapping. Either way the root
    /// receives page id 0 with the leaf bit set in `root_page_id`.
    ///
    /// # Errors
    /// Returns `ConfigError` when `config.validate()` rejects the settings.
    pub fn with_config(config: Config, buffer_ptr: Option<*mut u8>) -> Result<Self, ConfigError> {
        config.validate()?;

        let wal = match config.write_ahead_log.as_ref() {
            Some(wal_config) => {
                let wal = WriteAheadLog::new(wal_config.clone());
                Some(wal)
            }
            None => None,
        };
        let write_load_full = config.write_load_full_page;
        let config = Arc::new(config);

        if config.cache_only {
            let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr);

            // Root starts as a leaf-page-sized mini-page living purely in
            // the cache buffer.
            let mini_page_guard = (leaf_storage)
                .alloc_mini_page(config.leaf_page_size)
                .expect("Fail to allocate a mini-page as initial root node");
            LeafNode::initialize_mini_page(
                &mini_page_guard,
                config.leaf_page_size,
                MiniPageNextLevel::new_null(),
                true,
            );
            let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
            let mini_loc = PageLocation::Mini(new_mini_ptr);

            let (root_id, root_lock) = leaf_storage
                .mapping_table()
                .insert_mini_page_mapping(mini_loc);
            // The very first mapping must be id 0 — the root.
            assert_eq!(root_id.as_id(), 0);

            drop(root_lock);
            drop(mini_page_guard);
            let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;

            return Ok(Self {
                root_page_id: AtomicU64::new(root_id),
                storage: leaf_storage,
                wal,
                cache_only: config.cache_only,
                write_load_full_page: write_load_full,
                mini_page_size_classes: Self::create_mem_page_size_classes(
                    config.cb_min_record_size,
                    config.cb_max_record_size,
                    config.leaf_page_size,
                    config.max_fence_len,
                    config.cache_only,
                ),
                config,
                #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
                metrics_recorder: Some(Arc::new(ThreadLocal::new())),
            });
        }

        // Disk-backed mode: the root starts as a base-page mapping.
        let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr);
        let (root_id, root_lock) = leaf_storage.mapping_table().alloc_base_page_mapping();
        drop(root_lock);
        assert_eq!(root_id.as_id(), 0);

        let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
        Ok(Self {
            root_page_id: AtomicU64::new(root_id),
            storage: leaf_storage,
            wal,
            cache_only: config.cache_only,
            write_load_full_page: write_load_full,
            mini_page_size_classes: Self::create_mem_page_size_classes(
                config.cb_min_record_size,
                config.cb_max_record_size,
                config.leaf_page_size,
                config.max_fence_len,
                config.cache_only,
            ),
            config,
            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
        })
    }
388
    /// Returns a snapshot of the circular-buffer cache statistics.
    pub fn get_buffer_metrics(&self) -> CircularBufferMetrics {
        self.storage.get_buffer_metrics()
    }
395
396 pub(crate) fn get_root_page(&self) -> (PageID, bool) {
398 let root_page_id = self.root_page_id.load(Ordering::Acquire);
399 let root_is_leaf = (root_page_id & Self::ROOT_IS_LEAF_MASK) != 0;
400 let clean = root_page_id & (!Self::ROOT_IS_LEAF_MASK);
401
402 let page_id = if root_is_leaf {
403 PageID::from_id(clean)
404 } else {
405 PageID::from_pointer(clean as *const InnerNode)
406 };
407
408 (page_id, root_is_leaf)
409 }
410
    /// The page-id → location table shared with the leaf storage.
    pub(crate) fn mapping_table(&self) -> &PageTable {
        self.storage.mapping_table()
    }
414
    /// Randomly decides, with probability `read_promotion_rate` percent,
    /// whether a base-page read should be promoted into the cache.
    pub(crate) fn should_promote_read(&self) -> bool {
        get_rng().gen_range(0..100) < self.config.read_promotion_rate.load(Ordering::Relaxed)
    }
418
    /// Randomly decides, with probability `scan_promotion_rate` percent,
    /// whether a scanned page should be promoted into the cache.
    pub(crate) fn should_promote_scan_page(&self) -> bool {
        get_rng().gen_range(0..100) < self.config.scan_promotion_rate.load(Ordering::Relaxed)
    }
422
    /// Sets the read-promotion probability in percent; values >= 100 make
    /// every base-page read promote.
    pub fn update_read_promotion_rate(&self, new_rate: usize) {
        self.config
            .read_promotion_rate
            .store(new_rate, Ordering::Relaxed);
    }
429
    /// Splits the leaf `cur_page_id` when its split flag is set.
    ///
    /// Only the root-leaf case is handled here (`parent == None`); reaching
    /// this path with a parent is a logic error. Returns `Ok(true)` when a
    /// split happened (the caller must restart its traversal) and
    /// `Ok(false)` when no split was needed.
    fn try_split_leaf(
        &self,
        cur_page_id: PageID,
        parent: &Option<ReadGuard>,
    ) -> Result<bool, TreeError> {
        debug_assert!(cur_page_id.is_id());

        let mut cur_page = self.mapping_table().get_mut(&cur_page_id);

        check_parent!(self, cur_page_id, parent);

        let should_split = cur_page.get_split_flag();
        if !should_split {
            return Ok(false);
        }
        match parent {
            Some(_) => {
                unreachable!("Leaf node split should not happen here");
            }
            None => {
                if self.cache_only {
                    // Cache-only root split: allocate a sibling mini-page,
                    // move the upper half of the records into it, and install
                    // a fresh inner node as the new root.
                    let mini_page_guard = self
                        .storage
                        .alloc_mini_page(self.config.leaf_page_size)
                        .expect("Fail to allocate a mini-page during root split");
                    LeafNode::initialize_mini_page(
                        &mini_page_guard,
                        self.config.leaf_page_size,
                        MiniPageNextLevel::new_null(),
                        true,
                    );
                    let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                    let mini_loc = PageLocation::Mini(new_mini_ptr);

                    let (sibling_id, _mini_lock) = self
                        .storage
                        .mapping_table()
                        .insert_mini_page_mapping(mini_loc);

                    let cur_page_loc = cur_page.get_page_location().clone();
                    match cur_page_loc {
                        PageLocation::Mini(ptr) => {
                            let cur_mini_page = cur_page.load_cache_page_mut(ptr);
                            let sibling_page = unsafe { &mut *new_mini_ptr };
                            let split_key = cur_mini_page.split(sibling_page, true);

                            let mut new_root_builder = InnerNodeBuilder::new();
                            new_root_builder
                                .set_left_most_page_id(cur_page_id)
                                .set_children_is_leaf(true)
                                .add_record(split_key, sibling_id);

                            let new_root_ptr = new_root_builder.build();

                            // Publish the new root; readers observe it via the
                            // Acquire load in get_root_page.
                            self.root_page_id
                                .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                            info!(sibling = sibling_id.raw(), "New root node installed!");

                            debug_assert!(cur_mini_page.meta.meta_count_with_fence() > 0);
                            debug_assert!(sibling_page.meta.meta_count_with_fence() > 0);

                            return Ok(true);
                        }
                        _ => {
                            panic!("The root node is not a mini-page in cache-only mode")
                        }
                    }
                }

                // Disk-backed root split: allocate a sibling base page, split
                // into it, and install a new inner root with a disk offset.
                let mut x_page = cur_page;

                let (sibling_id, mut sibling_entry) = self.alloc_base_page_and_lock();

                info!(sibling = sibling_id.raw(), "Splitting root node!");

                let sibling = sibling_entry.load_base_page_mut();

                let leaf_node = x_page.load_base_page_mut();
                let split_key = leaf_node.split(sibling, false);

                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page_id)
                    .set_children_is_leaf(true)
                    .add_record(split_key, sibling_id);

                let new_root_ptr = new_root_builder.build();

                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(sibling = sibling_id.raw(), "New root node installed!");
                Ok(true)
            }
        }
    }
538
539 fn alloc_base_page_and_lock(&self) -> (PageID, LeafEntryXLocked<'_>) {
540 let (pid, base_entry) = self.mapping_table().alloc_base_page_mapping();
541
542 (pid, base_entry)
543 }
544
    /// Splits the inner node `cur_page` when its split flag is set.
    ///
    /// With a parent, the split key is inserted into the parent (marking the
    /// parent for splitting and restarting when it is full). Without a
    /// parent the node is the root, so a new root is built above it. Returns
    /// whether a split happened plus the guard the caller should continue
    /// its descent with.
    fn try_split_inner<'a>(
        &self,
        cur_page: PageID,
        parent: Option<ReadGuard<'a>>,
    ) -> Result<(bool, Option<ReadGuard<'a>>), TreeError> {
        let cur_node = ReadGuard::try_read(cur_page.as_inner_node())?;

        check_parent!(self, cur_page, parent);

        let should_split = cur_node.as_ref().meta.get_split_flag();

        if !should_split {
            return Ok((false, parent));
        }

        info!(has_parent = parent.is_some(), "split inner node");

        match parent {
            Some(p) => {
                // Acquire exclusive locks on child then parent before
                // mutating either.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;
                let mut x_parent = p.upgrade().map_err(|(_x, e)| e)?;

                let split_key = x_cur.as_ref().get_split_key();

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));

                let success = x_parent
                    .as_mut()
                    .insert(&split_key, sibling_builder.get_page_id());
                if !success {
                    // Parent is full: flag it for splitting and restart the
                    // traversal so it gets split first.
                    x_parent.as_mut().meta.set_split_flag();
                    return Err(TreeError::NeedRestart);
                }

                x_cur.as_mut().split(&mut sibling_builder);

                sibling_builder.build();

                Ok((true, Some(x_parent.downgrade())))
            }
            None => {
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));
                let sibling_id = sibling_builder.get_page_id();

                let split_key = x_cur.as_mut().split(&mut sibling_builder);

                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page)
                    .set_children_is_leaf(false)
                    .add_record(split_key, sibling_id);
                sibling_builder.build();
                let new_root_ptr = new_root_builder.build();
                // Hold the new root exclusively while publishing it so no
                // reader can race its installation.
                let _x_root = ReadGuard::try_read(new_root_ptr)
                    .unwrap()
                    .upgrade()
                    .unwrap();
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(
                    has_parent = parent.is_some(),
                    cur = cur_page.raw(),
                    "finished split inner node"
                );

                Ok((true, parent))
            }
        }
    }
620
    /// Walks from the root to the leaf responsible for `key`, returning its
    /// `PageID` and a read guard on its parent (`None` when the root itself
    /// is the leaf).
    ///
    /// With `aggressive_split`, any node whose split flag is raised is split
    /// on the way down; a successful split returns
    /// `Err(TreeError::NeedRestart)` so the caller restarts the traversal.
    pub(crate) fn traverse_to_leaf(
        &self,
        key: &[u8],
        aggressive_split: bool,
    ) -> Result<(PageID, Option<ReadGuard<'_>>), TreeError> {
        let (mut cur_page, mut cur_is_leaf) = self.get_root_page();
        let mut parent: Option<ReadGuard> = None;

        loop {
            if aggressive_split {
                if cur_is_leaf
                    && !cur_page.is_inner_node_pointer()
                    && self.try_split_leaf(cur_page, &parent)?
                {
                    return Err(TreeError::NeedRestart);
                } else if !cur_is_leaf {
                    let (split_success, new_parent) = self.try_split_inner(cur_page, parent)?;
                    if split_success {
                        return Err(TreeError::NeedRestart);
                    } else {
                        parent = new_parent;
                    }
                }
            }

            if cur_is_leaf {
                return Ok((cur_page, parent));
            } else {
                // Descend one level: find the child whose key range covers
                // `key` in the current inner node.
                let next = ReadGuard::try_read(cur_page.as_inner_node())?;

                check_parent!(self, cur_page, parent);

                let next_node = next.as_ref();
                let next_is_leaf = next_node.meta.children_is_leaf();
                let pos = next_node.lower_bound(key);
                let kv_meta = next_node.get_kv_meta(pos as u16);
                cur_page = next_node.get_value(kv_meta);
                cur_is_leaf = next_is_leaf;
                parent = Some(next);
            }
        }
    }
663
    /// Applies `write_op` (insert, delete, or cache-promotion) to the leaf
    /// that owns `write_op.key`.
    ///
    /// Traverses to the target leaf and dispatches on its page location. A
    /// `Null` location only exists in cache-only mode and lazily
    /// materializes a mini-page for the first write. Errors bubble up so the
    /// caller can retry (`NeedRestart`, `Locked`) or evict
    /// (`CircularBufferFull`).
    fn write_inner(&self, write_op: WriteOp, aggressive_split: bool) -> Result<(), TreeError> {
        let (pid, parent) = self.traverse_to_leaf(write_op.key, aggressive_split)?;

        let mut leaf_entry = self.mapping_table().get_mut(&pid);

        check_parent!(self, pid, parent);

        let page_loc = leaf_entry.get_page_location();
        match page_loc {
            PageLocation::Null => {
                if !self.cache_only {
                    panic!("Found an Null page in non cache-only mode");
                }

                // Deleting from a page that was never materialized is a no-op.
                if write_op.op_type == OpType::Delete {
                    return Ok(());
                }

                // First write to this leaf: create a mini-page sized for the
                // record being written.
                let mini_page_size = LeafNode::get_chain_size_hint(
                    write_op.key.len(),
                    write_op.value.len(),
                    &self.mini_page_size_classes,
                    self.cache_only,
                );
                let mini_page_guard = self.storage.alloc_mini_page(mini_page_size)?;
                LeafNode::initialize_mini_page(
                    &mini_page_guard,
                    mini_page_size,
                    MiniPageNextLevel::new_null(),
                    true,
                );
                let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                let mini_loc = PageLocation::Mini(new_mini_ptr);

                leaf_entry.create_cache_page_loc(mini_loc);

                let mini_page_ref = leaf_entry.load_cache_page_mut(new_mini_ptr);
                let insert_success =
                    mini_page_ref.insert(write_op.key, write_op.value, write_op.op_type, 0);
                // The mini-page was sized for this record, so the insert
                // cannot fail.
                assert!(insert_success);

                debug_assert!(mini_page_ref.meta.meta_count_with_fence() > 0);
                counter!(InsertCreatedMiniPage);
            }
            _ => {
                leaf_entry.insert(
                    write_op.key,
                    write_op.value,
                    parent,
                    write_op.op_type,
                    &self.storage,
                    &self.write_load_full_page,
                    &self.cache_only,
                    &self.mini_page_size_classes,
                )?;

                // Keep freshly written pages away from the eviction head.
                if leaf_entry.cache_page_about_to_evict(&self.storage) {
                    _ = leaf_entry.move_cache_page_to_tail(&self.storage);
                }

                if let Some(wal) = &self.wal {
                    let lsn = wal.append_and_wait(&write_op, leaf_entry.get_disk_offset());
                    leaf_entry.update_lsn(lsn);
                }
            }
        }

        Ok(())
    }
735
736 pub(crate) fn evict_from_circular_buffer(&self) -> Result<usize, TreeError> {
738 const TARGET_EVICT_SIZE: usize = 1024;
742 let mut evicted = 0;
743
744 let mut retry_cnt = 0;
746
747 while evicted < TARGET_EVICT_SIZE && retry_cnt < 10 {
748 let n = self
749 .storage
750 .evict_from_buffer(|mini_page_handle: &TombstoneHandle| {
751 eviction_callback(mini_page_handle, self)
752 })?;
753 evicted += n as usize;
754 retry_cnt += 1;
755 }
756 info!("stopped evict from circular buffer");
757 Ok(evicted)
758 }
759
    /// Inserts (or overwrites) `key` → `value`.
    ///
    /// Validates the key/value sizes against the configured limits, then
    /// retries `write_inner` until it succeeds: splitting aggressively after
    /// a restart, evicting from the circular buffer when it is full, and
    /// backing off on lock contention.
    pub fn insert(&self, key: &[u8], value: &[u8]) -> LeafInsertResult {
        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
            return LeafInsertResult::InvalidKV(format!("Key too large {}", key.len()));
        }

        if key.is_empty() {
            return LeafInsertResult::InvalidKV(format!(
                "Key too small {}, at least one byte",
                key.len()
            ));
        }

        if value.len() > MAX_VALUE_LEN || key.len() + value.len() > self.config.cb_max_record_size {
            return LeafInsertResult::InvalidKV(format!(
                "Record too large {}, {}, please adjust cb_max_record_size in config",
                key.len(),
                value.len()
            ));
        }

        if key.len() + value.len() < self.config.cb_min_record_size {
            return LeafInsertResult::InvalidKV(format!(
                "Record too small {}, {}, please adjust cb_min_record_size in config",
                key.len(),
                value.len()
            ));
        }

        let backoff = Backoff::new();
        let mut aggressive_split = false;

        counter!(Insert);
        info!(key_len = key.len(), value_len = value.len(), "insert");

        loop {
            let result = self.write_inner(WriteOp::make_insert(key, value), aggressive_split);
            match result {
                Ok(_) => return LeafInsertResult::Success,
                Err(TreeError::NeedRestart) => {
                    // Under shuttle tests, yield so the scheduler can
                    // explore other interleavings.
                    #[cfg(all(feature = "shuttle", test))]
                    {
                        shuttle::thread::yield_now();
                    }
                    counter!(InsertNeedRestart);
                    aggressive_split = true;
                }
                Err(TreeError::CircularBufferFull) => {
                    info!("insert failed, started evict from circular buffer");
                    aggressive_split = true;
                    counter!(InsertCircularBufferFull);
                    // Best-effort eviction; retry regardless of its outcome.
                    _ = self.evict_from_circular_buffer();
                    continue;
                }
                Err(TreeError::Locked) => {
                    counter!(InsertLocked);
                    backoff.snooze();
                }
            }
        }
    }
840
    /// Looks up `key`, copying the value into `out_buffer`.
    ///
    /// Returns `InvalidKey` for an empty or oversized key. Retries
    /// internally on transient errors, evicting from the circular buffer
    /// when read promotion finds it full.
    pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
            return LeafReadResult::InvalidKey;
        }

        if key.is_empty() {
            return LeafReadResult::InvalidKey;
        }

        let backoff = Backoff::new();

        info!(key_len = key.len(), "read");
        counter!(Read);
        let mut aggressive_split = false;

        // Times the whole read (including retries) in debug-metrics builds.
        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
        let mut debug_timer = DebugTimerGuard::new(Timer::Read, self.metrics_recorder.clone());

        loop {
            let result = self.read_inner(key, out_buffer, aggressive_split);
            match result {
                Ok(v) => {
                    #[cfg(any(
                        feature = "metrics-rt-debug-all",
                        feature = "metrics-rt-debug-timer"
                    ))]
                    debug_timer.end();

                    return v;
                }
                Err(TreeError::CircularBufferFull) => {
                    info!("read promotion failed, started evict from circular buffer");
                    aggressive_split = true;
                    // Best-effort eviction; retry either way.
                    match self.evict_from_circular_buffer() {
                        Ok(_) => continue,
                        Err(_) => continue,
                    };
                }
                Err(_) => {
                    backoff.spin();
                    aggressive_split = true;
                }
            }
        }
    }
907
908 pub fn delete(&self, key: &[u8]) {
922 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
924 return;
925 }
926
927 if key.is_empty() {
929 return;
930 }
931
932 let backoff = Backoff::new();
933
934 info!(key_len = key.len(), "delete");
935
936 let mut aggressive_split = false;
937
938 loop {
939 let result = self.write_inner(WriteOp::make_delete(key), aggressive_split);
940 match result {
941 Ok(_) => return,
942 Err(TreeError::CircularBufferFull) => {
943 info!("delete failed, started evict from circular buffer");
944 aggressive_split = true;
945 match self.evict_from_circular_buffer() {
946 Ok(_) => continue,
947 Err(_) => continue,
948 };
949 }
950 Err(_) => {
951 aggressive_split = true;
952 backoff.spin();
953 }
954 }
955 }
956 }
957
958 pub fn scan_with_count<'a>(
961 &'a self,
962 key: &[u8],
963 cnt: usize,
964 return_field: ScanReturnField,
965 ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
966 if self.cache_only {
968 return Err(ScanIterError::CacheOnlyMode);
969 }
970
971 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
973 return Err(ScanIterError::InvalidStartKey);
974 }
975
976 if key.is_empty() {
978 return Err(ScanIterError::InvalidStartKey);
979 }
980
981 if cnt == 0 {
983 return Err(ScanIterError::InvalidCount);
984 }
985
986 Ok(ScanIter::new_with_scan_count(self, key, cnt, return_field))
987 }
988
989 pub fn scan_with_end_key<'a>(
990 &'a self,
991 start_key: &[u8],
992 end_key: &[u8],
993 return_field: ScanReturnField,
994 ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
995 if self.cache_only {
997 return Err(ScanIterError::CacheOnlyMode);
998 }
999
1000 if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1002 return Err(ScanIterError::InvalidStartKey);
1003 }
1004
1005 if start_key.is_empty() {
1007 return Err(ScanIterError::InvalidStartKey);
1008 }
1009
1010 if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1012 return Err(ScanIterError::InvalidEndKey);
1013 }
1014
1015 if end_key.is_empty() {
1017 return Err(ScanIterError::InvalidEndKey);
1018 }
1019
1020 let cmp = start_key.cmp(end_key);
1022 if cmp == std::cmp::Ordering::Greater {
1023 return Err(ScanIterError::InvalidKeyRange);
1024 }
1025
1026 Ok(ScanIter::new_with_end_key(
1027 self,
1028 start_key,
1029 end_key,
1030 return_field,
1031 ))
1032 }
1033
1034 #[doc(hidden)]
1035 pub fn scan_mut_with_count<'a>(
1036 &'a self,
1037 key: &'a [u8],
1038 cnt: usize,
1039 return_field: ScanReturnField,
1040 ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1041 if self.cache_only {
1043 return Err(ScanIterError::CacheOnlyMode);
1044 }
1045
1046 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
1048 return Err(ScanIterError::InvalidStartKey);
1049 }
1050
1051 if cnt == 0 {
1053 return Err(ScanIterError::InvalidCount);
1054 }
1055
1056 Ok(ScanIterMut::new_with_scan_count(
1057 self,
1058 key,
1059 cnt,
1060 return_field,
1061 ))
1062 }
1063
1064 #[doc(hidden)]
1065 pub fn scan_mut_with_end_key<'a>(
1066 &'a self,
1067 start_key: &'a [u8],
1068 end_key: &'a [u8],
1069 return_field: ScanReturnField,
1070 ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
1071 if self.cache_only {
1073 return Err(ScanIterError::CacheOnlyMode);
1074 }
1075
1076 if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1078 return Err(ScanIterError::InvalidStartKey);
1079 }
1080
1081 if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1083 return Err(ScanIterError::InvalidEndKey);
1084 }
1085
1086 Ok(ScanIterMut::new_with_end_key(
1087 self,
1088 start_key,
1089 end_key,
1090 return_field,
1091 ))
1092 }
1093
    /// Single read attempt: traverse to the leaf owning `key` and read from
    /// its current page representation.
    ///
    /// On a cached (mini/full) hit, the page is moved to the buffer tail
    /// when it is about to be evicted. On a base-page hit the record may be
    /// promoted into the cache — either as a single cached record or by
    /// upgrading the whole page — depending on configuration and the
    /// promotion dice roll.
    fn read_inner(
        &self,
        key: &[u8],
        out_buffer: &mut [u8],
        aggressive_split: bool,
    ) -> Result<LeafReadResult, TreeError> {
        let (node, parent) = self.traverse_to_leaf(key, aggressive_split)?;

        let mut leaf = self.mapping_table().get(&node);

        check_parent!(self, node, parent);

        let out = leaf.read(
            key,
            out_buffer,
            self.config.mini_page_binary_search,
            self.cache_only,
        );
        match out {
            ReadResult::Mini(r) | ReadResult::Full(r) => {
                if leaf.cache_page_about_to_evict(&self.storage) {
                    // Best effort: if the exclusive upgrade fails, just
                    // return the value we already read.
                    let mut x_leaf = match leaf.try_upgrade() {
                        Ok(v) => v,
                        Err(_) => return Ok(r),
                    };
                    _ = x_leaf.move_cache_page_to_tail(&self.storage);
                }

                Ok(r)
            }

            ReadResult::Base(r) => {
                counter!(BasePageRead);

                if self.cache_only {
                    panic!("Attempt to read a base page while in cache-only mode.");
                }

                let v = match r {
                    LeafReadResult::Found(v) => v,
                    _ => return Ok(r),
                };

                // Only promote occasionally, and never for a root leaf
                // (which has no parent guard to validate against).
                if parent.is_none() || !self.should_promote_read() {
                    return Ok(r);
                }

                let mut x_leaf = match leaf.try_upgrade() {
                    Ok(x) => x,
                    Err(_) => {
                        return Ok(r);
                    }
                };

                if self.config.read_record_cache {
                    // Promote just this record into a mini-page via a Cache
                    // write op.
                    let out = x_leaf.insert(
                        key,
                        &out_buffer[0..v as usize],
                        parent,
                        OpType::Cache,
                        &self.storage,
                        &self.write_load_full_page,
                        &self.cache_only,
                        &self.mini_page_size_classes,
                    );

                    match out {
                        Ok(_) => {
                            counter!(ReadPromotionOk);
                            Ok(r)
                        }
                        Err(TreeError::Locked) => {
                            // Promotion is opportunistic; the read itself
                            // already succeeded.
                            counter!(ReadPromotionFailed);
                            Ok(r)
                        }
                        Err(TreeError::CircularBufferFull) => {
                            counter!(ReadPromotionFailed);
                            Err(TreeError::CircularBufferFull)
                        }
                        Err(TreeError::NeedRestart) => {
                            counter!(ReadPromotionFailed);
                            Err(TreeError::NeedRestart)
                        }
                    }
                } else {
                    // Promote the whole base page into the cache.
                    match self.upgrade_to_full_page(x_leaf, parent.unwrap()) {
                        Ok(_) | Err(TreeError::Locked) => Ok(r),
                        Err(e) => Err(e),
                    }
                }
            }
            ReadResult::None => Ok(LeafReadResult::NotFound),
        }
    }
1195
    /// Replaces the leaf's current representation with a cached full page.
    ///
    /// A mini-page is first merged back into its base page (and its buffer
    /// slot deallocated) before the base page is loaded as a full cache
    /// page. A leaf already at `Full` is returned unchanged; `Null` should
    /// be unreachable on this path and panics.
    fn upgrade_to_full_page<'a>(
        &'a self,
        mut x_leaf: LeafEntryXLocked<'a>,
        parent: ReadGuard<'a>,
    ) -> Result<LeafEntryXLocked<'a>, TreeError> {
        let page_loc = x_leaf.get_page_location().clone();
        match page_loc {
            PageLocation::Mini(ptr) => {
                // Merge the mini-page into its base page, then drop it from
                // the circular buffer before loading the full page.
                let mini_page = x_leaf.load_cache_page_mut(ptr);
                let h = self.storage.begin_dealloc_mini_page(mini_page)?;
                let _merge_result = x_leaf.try_merge_mini_page(&h, parent, &self.storage)?;
                let base_offset = mini_page.next_level;
                x_leaf.change_to_base_loc();
                self.storage.finish_dealloc_mini_page(h);

                let base_page_ref = x_leaf.load_base_page_from_buffer();
                let full_page_loc =
                    upgrade_to_full_page(&self.storage, base_page_ref, base_offset)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Full(_ptr) => Ok(x_leaf),
            PageLocation::Base(offset) => {
                let base_page_ref = x_leaf.load_base_page(offset);
                let next_level = MiniPageNextLevel::new(offset);
                let full_page_loc = upgrade_to_full_page(&self.storage, base_page_ref, next_level)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Null => panic!("upgrade_to_full_page on Null page"),
        }
    }
1228
    /// Drains and aggregates the per-thread debug timers into a JSON value,
    /// then installs a fresh recorder set.
    ///
    /// Returns `None` when the metrics features are disabled or the
    /// recorder was already taken.
    ///
    /// # Panics
    /// Panics when other references to the recorder `Arc` still exist.
    pub fn get_metrics(&mut self) -> Option<serde_json::Value> {
        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
        {
            let recorder = self.metrics_recorder.take();
            match recorder {
                Some(r) => {
                    let recorders = Arc::try_unwrap(r).expect("Failed to obtain the recorders of bf-tree, please make sure no other references exist.");
                    let mut timer_accumulated = TimerRecorder::default();

                    for r in recorders {
                        // SAFETY-NOTE(review): we hold the sole Arc here, so
                        // no other thread should be writing through these
                        // cells — relies on all worker threads being done.
                        let t = unsafe { &*r.get() };

                        timer_accumulated += t.timers.clone();
                    }

                    let output = serde_json::json!({
                        "Timers": timer_accumulated,
                    });

                    // Re-arm metric collection for subsequent operations.
                    self.metrics_recorder = Some(Arc::new(ThreadLocal::new()));

                    Some(output)
                }
                None => None,
            }
        }
        #[cfg(not(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer")))]
        {
            None
        }
    }
1263}
1264
1265pub(crate) fn key_value_physical_size(key: &[u8], value: &[u8]) -> usize {
1266 let key_size = key.len();
1267 let value_size = value.len();
1268 let meta_size = crate::nodes::KV_META_SIZE;
1269 key_size + value_size + meta_size
1270}
1271
/// Callback run by the circular buffer to evict one cached page.
///
/// Re-traverses the tree with the page's own fence key to find its current
/// owner, then merges/detaches the page according to the mapping entry's
/// location. Returns `NeedRestart` when the entry no longer points at the
/// page being evicted (it moved or was replaced concurrently).
pub(crate) fn eviction_callback(
    mini_page_handle: &TombstoneHandle,
    tree: &BfTree,
) -> Result<(), TreeError> {
    let mini_page = mini_page_handle.ptr as *mut LeafNode;
    // The fallible variant is needed in cache-only mode — TODO confirm why
    // key extraction can fail only there.
    let key_to_this_page = if tree.cache_only {
        unsafe { &*mini_page }.try_get_key_to_reach_this_node()?
    } else {
        unsafe { &*mini_page }.get_key_to_reach_this_node()
    };

    let (pid, parent) = tree.traverse_to_leaf(&key_to_this_page, true)?;
    info!(
        pid = pid.raw(),
        "starting to merge mini page in eviction call back"
    );

    let mut leaf_entry = tree.mapping_table().get_mut(&pid);

    histogram!(EvictNodeSize, unsafe { &*mini_page }.meta.node_size as u64);

    match leaf_entry.get_page_location() {
        PageLocation::Mini(ptr) => {
            {
                // The mapping entry may already point at a different
                // mini-page; evicting that one would be wrong.
                if *ptr != mini_page {
                    return Err(TreeError::NeedRestart);
                }
            }

            let parent = parent.expect("Mini page must have a parent");
            parent.check_version()?;

            if tree.cache_only {
                // No base page to merge into: drop the cached data entirely.
                leaf_entry.change_to_null_loc();
            } else {
                leaf_entry.try_merge_mini_page(mini_page_handle, parent, &tree.storage)?;
                leaf_entry.change_to_base_loc();
            }

            Ok(())
        }

        PageLocation::Full(ptr) => {
            if *ptr != mini_page {
                return Err(TreeError::NeedRestart);
            }

            leaf_entry.merge_full_page(mini_page_handle);
            Ok(())
        }

        PageLocation::Base(_offset) => Err(TreeError::NeedRestart),

        PageLocation::Null => Err(TreeError::NeedRestart),
    }
}
1340
#[cfg(test)]
mod tests {
    use crate::error::ConfigError;
    use crate::BfTree;

    /// Checks the generated mini-page size-class ladders for several
    /// (min, mid, max, alignment, fixed) parameter combinations.
    #[test]
    fn test_mini_page_size_classes() {
        let mut size_classes = BfTree::create_mem_page_size_classes(48, 1952, 4096, 64, false);
        assert_eq!(
            size_classes,
            vec![128, 192, 256, 512, 960, 1856, 2048, 4096]
        );

        size_classes = BfTree::create_mem_page_size_classes(1548, 1548, 3136, 64, true);
        assert_eq!(size_classes, vec![1536, 3136]);

        size_classes = BfTree::create_mem_page_size_classes(48, 3072, 12288, 64, false);
        assert_eq!(
            size_classes,
            vec![128, 192, 256, 512, 960, 1856, 3648, 7232, 9088, 12288]
        );

        size_classes = BfTree::create_mem_page_size_classes(4, 1952, 4096, 32, false);
        assert_eq!(size_classes, vec![64, 128, 256, 448, 832, 1600, 2048, 4096]);
    }

    /// Each case builds a deliberately invalid `Config` and verifies that
    /// `BfTree::with_config` rejects it with the matching `ConfigError`
    /// variant. `config` is moved into each call and reinitialized for the
    /// next case, so no clones are needed.
    #[test]
    fn test_invalid_config_to_build_bf_tree() {
        // Record size below the circular buffer's minimum.
        let mut config = crate::Config::default();
        config.cb_min_record_size(4);
        config.leaf_page_size(32 * 1024);
        match BfTree::with_config(config, None) {
            Err(ConfigError::MinimumRecordSize(_)) => {}
            Err(_) => panic!("Expected MinimumRecordSize error"),
            Ok(_) => panic!("Expected error but got Ok"),
        }

        // Record size above the circular buffer's maximum.
        config = crate::Config::default();
        config.cb_max_record_size(64 * 1024);
        match BfTree::with_config(config, None) {
            Err(ConfigError::MaximumRecordSize(_)) => {}
            Err(_) => panic!("Expected MaximumRecordSize error"),
            Ok(_) => panic!("Expected error but got Ok"),
        }

        // Leaf page size that is not properly aligned/sized.
        config = crate::Config::default();
        config.leaf_page_size(4050);
        match BfTree::with_config(config, None) {
            Err(ConfigError::LeafPageSize(_)) => {}
            Err(_) => panic!("Expected LeafPageSize error"),
            Ok(_) => panic!("Expected error but got Ok"),
        }

        // Circular buffer too small relative to the leaf page size.
        config = crate::Config::default();
        config.leaf_page_size(16 * 1024);
        config.cb_size_byte(16 * 1024);
        match BfTree::with_config(config, None) {
            Err(ConfigError::CircularBufferSize(_)) => {}
            Err(_) => panic!("Expected CircularBufferSize error"),
            Ok(_) => panic!("Expected error but got Ok"),
        }

        // Circular buffer size that is not a valid multiple.
        config = crate::Config::default();
        config.cb_size_byte(20 * 1024);
        match BfTree::with_config(config, None) {
            Err(ConfigError::CircularBufferSize(_)) => {}
            Err(_) => panic!("Expected CircularBufferSize error"),
            Ok(_) => panic!("Expected error but got Ok"),
        }

        // Cache-only mode with an undersized circular buffer.
        config = crate::Config::default();
        config.cache_only(true);
        config.cb_size_byte(2 * 4096);
        match BfTree::with_config(config, None) {
            Err(ConfigError::CircularBufferSize(_)) => {}
            Err(_) => panic!("Expected CircularBufferSize error"),
            Ok(_) => panic!("Expected error but got Ok"),
        }
    }
}