1#[cfg(not(all(feature = "shuttle", test)))]
5use rand::Rng;
6
7#[cfg(all(feature = "shuttle", test))]
8use shuttle::rand::Rng;
9
10use cfg_if::cfg_if;
11
12cfg_if! {
13 if #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))] {
14 use crate::metric::{Timer, TlsRecorder, timer::{TimerRecorder, DebugTimerGuard}};
15 use std::cell::UnsafeCell;
16 use thread_local::ThreadLocal;
17 }
18}
19
20use crate::{
21 check_parent,
22 circular_buffer::{CircularBufferMetrics, TombstoneHandle},
23 counter,
24 error::{ConfigError, TreeError},
25 histogram, info,
26 mini_page_op::{upgrade_to_full_page, LeafEntryXLocked, LeafOperations, ReadResult},
27 nodes::{
28 leaf_node::{LeafKVMeta, LeafReadResult, MiniPageNextLevel, OpType},
29 InnerNode, InnerNodeBuilder, LeafNode, PageID, CACHE_LINE_SIZE, DISK_PAGE_SIZE,
30 INNER_NODE_SIZE, MAX_KEY_LEN, MAX_LEAF_PAGE_SIZE, MAX_VALUE_LEN,
31 },
32 range_scan::{ScanIter, ScanIterMut, ScanReturnField},
33 storage::{LeafStorage, PageLocation, PageTable},
34 sync::{
35 atomic::{AtomicU64, Ordering},
36 Arc,
37 },
38 utils::{get_rng, inner_lock::ReadGuard, Backoff, BfsVisitor, NodeInfo},
39 wal::{WriteAheadLog, WriteOp},
40 Config, StorageBackend,
41};
42use std::path::Path;
43
/// A Bf-Tree: a B-tree variant whose leaves can live as in-memory mini pages,
/// full cached pages, or on-disk base pages.
pub struct BfTree {
    // Packed root word: `ROOT_IS_LEAF_MASK` flags a leaf root; the remaining
    // bits hold either a leaf `PageID` or a raw `InnerNode` pointer
    // (see `get_root_page`).
    pub(crate) root_page_id: AtomicU64,
    // Leaf page storage: circular-buffer cache plus backing VFS.
    pub(crate) storage: LeafStorage,
    // Optional write-ahead log; `None` disables logging.
    pub(crate) wal: Option<Arc<WriteAheadLog>>,
    // Configuration shared with the storage layer.
    pub(crate) config: Arc<Config>,
    // Passed through to leaf insert paths.
    // NOTE(review): exact semantics inferred from the name — confirm whether
    // writes load the full page instead of a mini page when set.
    pub(crate) write_load_full_page: bool,
    // True when the tree runs purely in memory without base pages.
    pub(crate) cache_only: bool,
    // Ascending mini-page size classes, ending at the full leaf page size
    // (built by `create_mem_page_size_classes`).
    pub(crate) mini_page_size_classes: Vec<usize>, #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
    // Per-thread metrics recorders; only present in debug-metrics builds.
    pub metrics_recorder: Option<Arc<ThreadLocal<UnsafeCell<TlsRecorder>>>>, }
56
// SAFETY: BfTree is shared across threads by design; interior access goes
// through atomics (`root_page_id`), mapping-table lock guards, and the storage
// layer. NOTE(review): these impls are load-bearing because the tree holds raw
// page pointers (`PageLocation::Mini`/`Full`, `InnerNode` pointers) — confirm
// every access path synchronizes before trusting them.
unsafe impl Sync for BfTree {}

unsafe impl Send for BfTree {}
60
/// Outcome of a [`BfTree::insert`] call.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum LeafInsertResult {
    /// The record was accepted and written.
    Success,
    /// The key or value violated a size constraint; the message says which.
    InvalidKV(String),
}
66
/// Errors returned when constructing a range-scan iterator.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ScanIterError {
    /// Scans are rejected while the tree runs in cache-only mode.
    CacheOnlyMode,
    /// Start key is empty or exceeds the configured maximum key length.
    InvalidStartKey,
    /// End key is empty or exceeds the configured maximum key length.
    InvalidEndKey,
    /// A scan count of zero was requested.
    InvalidCount,
    /// The start key sorts after the end key.
    InvalidKeyRange,
}
75
impl Drop for BfTree {
    /// Tears the tree down: stops the WAL background job, then walks every
    /// node breadth-first, releasing leaf pages and freeing inner nodes.
    fn drop(&mut self) {
        // Stop WAL background work before dismantling the tree it references.
        if let Some(ref wal) = self.wal {
            wal.stop_background_job();
        }

        let visitor = BfsVisitor::new_all_nodes(self);
        for node_info in visitor {
            match node_info {
                NodeInfo::Leaf { page_id, .. } => {
                    // Leaves are reached through the mapping table; the entry
                    // frees its own cache/base pages.
                    let mut leaf = self.mapping_table().get_mut(&page_id);
                    leaf.dealloc_self(&self.storage, self.cache_only);
                }
                NodeInfo::Inner { ptr, .. } => {
                    if unsafe { &*ptr }.is_valid_disk_offset() {
                        let disk_offset = unsafe { &*ptr }.disk_offset;
                        // Only the in-memory backend gets an explicit offset
                        // deallocation here. NOTE(review): confirm non-Memory
                        // backends reclaim inner-node offsets elsewhere.
                        if self.config.storage_backend == StorageBackend::Memory {
                            self.storage.vfs.dealloc_offset(disk_offset as usize);
                        }
                    }
                    InnerNode::free_node(ptr as *mut InnerNode);
                }
            }
        }
    }
}
103
104impl Default for BfTree {
105 fn default() -> Self {
106 Self::new(":memory:", 1024 * 1024 * 32).unwrap()
107 }
108}
109
impl BfTree {
    /// High bit of `root_page_id`: set when the root is a leaf (remaining bits
    /// are a leaf `PageID`), clear when they are an `InnerNode` pointer.
    pub(crate) const ROOT_IS_LEAF_MASK: u64 = 0x8000_0000_0000_0000;

    /// Builds the ascending list of in-memory page size classes.
    ///
    /// Classes grow geometrically: class `i` fits roughly `2^i` minimum-sized
    /// records plus the `LeafNode` header, rounded up to a cache line. The
    /// list always ends with the largest mini-page size and then
    /// `leaf_page_size_in_byte` itself.
    ///
    /// # Panics
    /// Asserts that the configured sizes are mutually consistent (record vs.
    /// page sizes, alignment, max key length, records-per-page bound).
    pub(crate) fn create_mem_page_size_classes(
        min_record_size_in_byte: usize,
        max_record_size_in_byte: usize,
        leaf_page_size_in_byte: usize,
        max_fence_len_in_byte: usize,
        cache_only: bool,
    ) -> Vec<usize> {
        assert!(
            min_record_size_in_byte > 1,
            "cb_min_record_size in config cannot be less than 2"
        );
        assert!(
            min_record_size_in_byte <= max_record_size_in_byte,
            "cb_min_record_size cannot be larger than cb_max_record_size"
        );
        assert!(
            max_fence_len_in_byte > 0,
            "max_fence_len in config cannot be zero"
        );
        assert!(
            max_fence_len_in_byte / 2 < max_record_size_in_byte,
            "max_fence_len/2 cannot be larger than cb_max_record_size"
        );
        assert!(
            leaf_page_size_in_byte <= MAX_LEAF_PAGE_SIZE,
            "leaf_page_size in config cannot be larger than {}",
            MAX_LEAF_PAGE_SIZE
        );
        assert!(
            max_fence_len_in_byte / 2 <= MAX_KEY_LEN,
            "max_key_len in config cannot be larger than {}",
            MAX_KEY_LEN
        );
        assert!(
            leaf_page_size_in_byte / min_record_size_in_byte <= 4096,
            "Maximum number of records per page (leaf_page_size/min_record_size) cannot exceed 2^12.",
        );

        // Disk-backed pages must align to disk pages; cache-only pages only
        // need cache-line alignment.
        if !cache_only {
            assert!(
                leaf_page_size_in_byte.is_multiple_of(DISK_PAGE_SIZE),
                "leaf_page_size in config should be multiple of {}",
                DISK_PAGE_SIZE
            );
        } else {
            assert!(
                leaf_page_size_in_byte.is_multiple_of(CACHE_LINE_SIZE),
                "leaf_page_size in config should be multiple of {}",
                CACHE_LINE_SIZE
            );
        }

        let max_record_size_with_meta = max_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
        let mut max_mini_page_size: usize;

        if cache_only {
            // Reserve room for one max-sized record; round down to a cache line.
            max_mini_page_size = leaf_page_size_in_byte - max_record_size_with_meta;
            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;

            assert!(
                leaf_page_size_in_byte
                    >= 2 * max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
                "cb_max_record_size of config should be <= {}",
                (leaf_page_size_in_byte - std::mem::size_of::<LeafNode>()) / 2
                    - std::mem::size_of::<LeafKVMeta>()
            );
        } else {
            // Disk-backed pages additionally reserve fence-key space plus two
            // metadata slots.
            max_mini_page_size = leaf_page_size_in_byte
                - max_record_size_with_meta
                - max_fence_len_in_byte
                - 2 * std::mem::size_of::<LeafKVMeta>();
            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;

            assert!(
                max_mini_page_size >= max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
                "cb_max_record_size of config should be <= {}",
                max_mini_page_size
                    - std::mem::size_of::<LeafNode>()
                    - std::mem::size_of::<LeafKVMeta>()
            );
        }

        let mut mem_page_size_classes = Vec::new();

        // Generate classes sized for 1, 2, 4, ... minimum-sized records, each
        // rounded up to a cache line, until exceeding the max mini-page size.
        let base: i32 = 2;
        let mut record_num_per_page_exp: u32 = 0;
        let c: usize = min_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();

        let mut size_class =
            base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();

        if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
            size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
        }

        while size_class <= max_mini_page_size {
            // Rounding can make consecutive classes equal; keep the list
            // strictly increasing.
            if mem_page_size_classes.is_empty()
                || (mem_page_size_classes[mem_page_size_classes.len() - 1] < size_class)
            {
                mem_page_size_classes.push(size_class);
            }

            record_num_per_page_exp += 1;
            size_class =
                base.pow(record_num_per_page_exp) as usize * c + std::mem::size_of::<LeafNode>();

            if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
                size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
            }
        }

        if !cache_only {
            assert!(!mem_page_size_classes.is_empty());
        }

        // Always include the largest mini-page size ...
        if mem_page_size_classes.is_empty()
            || mem_page_size_classes[mem_page_size_classes.len() - 1] < max_mini_page_size
        {
            mem_page_size_classes.push(max_mini_page_size);
        }

        // ... and the full leaf page size as the final class.
        if mem_page_size_classes.is_empty()
            || mem_page_size_classes[mem_page_size_classes.len() - 1] < leaf_page_size_in_byte
        {
            mem_page_size_classes.push(leaf_page_size_in_byte);
        }

        if !cache_only {
            assert!(mem_page_size_classes.len() >= 2);
        } else {
            assert!(!mem_page_size_classes.is_empty());
        }

        mem_page_size_classes
    }
277
278 pub fn new(file_path: impl AsRef<Path>, cache_size_byte: usize) -> Result<Self, ConfigError> {
292 let config = Config::new(file_path, cache_size_byte);
293 Self::with_config(config, None)
294 }
295
296 pub fn new_with_config_file<P: AsRef<Path>>(config_file_path: P) -> Result<Self, ConfigError> {
299 let config = Config::new_with_config_file(config_file_path);
300 Self::with_config(config, None)
301 }
302
303 pub fn with_config(config: Config, buffer_ptr: Option<*mut u8>) -> Result<Self, ConfigError> {
306 config.validate()?;
308
309 let wal = match config.write_ahead_log.as_ref() {
310 Some(wal_config) => {
311 let wal = WriteAheadLog::new(wal_config.clone());
312 Some(wal)
313 }
314 None => None,
315 };
316 let write_load_full = config.write_load_full_page;
317 let config = Arc::new(config);
318
319 if config.cache_only {
321 let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr);
322
323 let mini_page_guard = (leaf_storage)
325 .alloc_mini_page(config.leaf_page_size)
326 .expect("Fail to allocate a mini-page as initial root node");
327 LeafNode::initialize_mini_page(
328 &mini_page_guard,
329 config.leaf_page_size,
330 MiniPageNextLevel::new_null(),
331 true,
332 );
333 let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
334 let mini_loc = PageLocation::Mini(new_mini_ptr);
335
336 let (root_id, root_lock) = leaf_storage
337 .mapping_table()
338 .insert_mini_page_mapping(mini_loc);
339 assert_eq!(root_id.as_id(), 0);
340
341 drop(root_lock);
342 drop(mini_page_guard);
343 let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
344
345 return Ok(Self {
346 root_page_id: AtomicU64::new(root_id),
347 storage: leaf_storage,
348 wal,
349 cache_only: config.cache_only,
350 write_load_full_page: write_load_full,
351 mini_page_size_classes: Self::create_mem_page_size_classes(
352 config.cb_min_record_size,
353 config.cb_max_record_size,
354 config.leaf_page_size,
355 config.max_fence_len,
356 config.cache_only,
357 ),
358 config,
359 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
360 metrics_recorder: Some(Arc::new(ThreadLocal::new())),
361 });
362 }
363
364 let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr);
365 let (root_id, root_lock) = leaf_storage.mapping_table().alloc_base_page_mapping();
366 drop(root_lock);
367 assert_eq!(root_id.as_id(), 0);
368
369 let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
370 Ok(Self {
371 root_page_id: AtomicU64::new(root_id),
372 storage: leaf_storage,
373 wal,
374 cache_only: config.cache_only,
375 write_load_full_page: write_load_full,
376 mini_page_size_classes: Self::create_mem_page_size_classes(
377 config.cb_min_record_size,
378 config.cb_max_record_size,
379 config.leaf_page_size,
380 config.max_fence_len,
381 config.cache_only,
382 ),
383 config,
384 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
385 metrics_recorder: Some(Arc::new(ThreadLocal::new())),
386 })
387 }
388
389 pub fn config(&self) -> &Config {
390 &self.config
391 }
392
    /// Snapshot of the circular-buffer cache metrics from the storage layer.
    pub fn get_buffer_metrics(&self) -> CircularBufferMetrics {
        self.storage.get_buffer_metrics()
    }
399
400 pub(crate) fn get_root_page(&self) -> (PageID, bool) {
402 let root_page_id = self.root_page_id.load(Ordering::Acquire);
403 let root_is_leaf = (root_page_id & Self::ROOT_IS_LEAF_MASK) != 0;
404 let clean = root_page_id & (!Self::ROOT_IS_LEAF_MASK);
405
406 let page_id = if root_is_leaf {
407 PageID::from_id(clean)
408 } else {
409 PageID::from_pointer(clean as *const InnerNode)
410 };
411
412 (page_id, root_is_leaf)
413 }
414
    /// Shorthand for the storage layer's PageID → page-location table.
    pub(crate) fn mapping_table(&self) -> &PageTable {
        self.storage.mapping_table()
    }
418
    /// Randomized sampling: true with probability `read_promotion_rate`
    /// percent (rate read with Relaxed ordering; exactness is not required).
    pub(crate) fn should_promote_read(&self) -> bool {
        get_rng().gen_range(0..100) < self.config.read_promotion_rate.load(Ordering::Relaxed)
    }
422
    /// Randomized sampling: true with probability `scan_promotion_rate`
    /// percent (rate read with Relaxed ordering).
    pub(crate) fn should_promote_scan_page(&self) -> bool {
        get_rng().gen_range(0..100) < self.config.scan_promotion_rate.load(Ordering::Relaxed)
    }
426
    /// Sets the percentage of base-page reads that get promoted into the
    /// cache. Relaxed store: readers merely sample this value.
    pub fn update_read_promotion_rate(&self, new_rate: usize) {
        self.config
            .read_promotion_rate
            .store(new_rate, Ordering::Relaxed);
    }
433
    /// Splits the leaf `cur_page_id` if its split flag is set.
    ///
    /// Returns `Ok(true)` when a split happened (callers must restart the
    /// traversal) and `Ok(false)` when none was needed. Only the root-leaf
    /// case is handled here; a flagged leaf that has a parent is a logic error
    /// (see the `unreachable!`).
    fn try_split_leaf(
        &self,
        cur_page_id: PageID,
        parent: &Option<ReadGuard>,
    ) -> Result<bool, TreeError> {
        debug_assert!(cur_page_id.is_id());

        // Exclusive-lock the leaf entry, then re-validate the parent version.
        let mut cur_page = self.mapping_table().get_mut(&cur_page_id);

        check_parent!(self, cur_page_id, parent);

        let should_split = cur_page.get_split_flag();
        if !should_split {
            return Ok(false);
        }
        match parent {
            Some(_) => {
                // Non-root leaf splits are handled elsewhere.
                unreachable!("Leaf node split should not happen here");
            }
            None => {
                if self.cache_only {
                    // Cache-only root split: allocate a sibling mini page, move
                    // the upper half of the records into it, and install a new
                    // inner root above both halves.
                    let mini_page_guard = self
                        .storage
                        .alloc_mini_page(self.config.leaf_page_size)
                        .expect("Fail to allocate a mini-page during root split");
                    LeafNode::initialize_mini_page(
                        &mini_page_guard,
                        self.config.leaf_page_size,
                        MiniPageNextLevel::new_null(),
                        true,
                    );
                    let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                    let mini_loc = PageLocation::Mini(new_mini_ptr);

                    let (sibling_id, _mini_lock) = self
                        .storage
                        .mapping_table()
                        .insert_mini_page_mapping(mini_loc);

                    let cur_page_loc = cur_page.get_page_location().clone();
                    match cur_page_loc {
                        PageLocation::Mini(ptr) => {
                            let cur_mini_page = cur_page.load_cache_page_mut(ptr);
                            // SAFETY(review): sibling was just allocated and is
                            // only reachable via the mapping we still hold
                            // locked — confirm no concurrent reader can see it.
                            let sibling_page = unsafe { &mut *new_mini_ptr };
                            let split_key = cur_mini_page.split(sibling_page, true);

                            let mut new_root_builder = InnerNodeBuilder::new();
                            new_root_builder
                                .set_left_most_page_id(cur_page_id)
                                .set_children_is_leaf(true)
                                .add_record(split_key, sibling_id);

                            let new_root_ptr = new_root_builder.build();

                            // Publish the new root; Release pairs with the
                            // Acquire load in `get_root_page`.
                            self.root_page_id
                                .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                            info!(sibling = sibling_id.raw(), "New root node installed!");

                            debug_assert!(cur_mini_page.meta.meta_count_with_fence() > 0);
                            debug_assert!(sibling_page.meta.meta_count_with_fence() > 0);

                            return Ok(true);
                        }
                        _ => {
                            panic!("The root node is not a mini-page in cache-only mode")
                        }
                    }
                }

                // Disk-backed root split: allocate a sibling base page, split
                // into it, then build a new inner root over both children.
                let mut x_page = cur_page;

                let (sibling_id, mut sibling_entry) = self.alloc_base_page_and_lock();

                info!(sibling = sibling_id.raw(), "Splitting root node!");

                let sibling = sibling_entry.load_base_page_mut();

                let leaf_node = x_page.load_base_page_mut();
                let split_key = leaf_node.split(sibling, false);

                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page_id)
                    .set_children_is_leaf(true)
                    .add_record(split_key, sibling_id);

                let new_root_ptr = new_root_builder.build();

                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(sibling = sibling_id.raw(), "New root node installed!");
                Ok(true)
            }
        }
    }
542
543 fn alloc_base_page_and_lock(&self) -> (PageID, LeafEntryXLocked<'_>) {
544 let (pid, base_entry) = self.mapping_table().alloc_base_page_mapping();
545
546 (pid, base_entry)
547 }
548
    /// Splits the inner node `cur_page` if its split flag is set.
    ///
    /// With a parent: upgrades child then parent to exclusive, inserts the new
    /// separator into the parent first (marking the parent for splitting and
    /// restarting when it is full), then moves records. Without a parent the
    /// node is the root: split and install a brand-new root.
    ///
    /// Returns `(split_happened, guard_to_use_as_parent)`.
    fn try_split_inner<'a>(
        &self,
        cur_page: PageID,
        parent: Option<ReadGuard<'a>>,
    ) -> Result<(bool, Option<ReadGuard<'a>>), TreeError> {
        let cur_node = ReadGuard::try_read(cur_page.as_inner_node())?;

        check_parent!(self, cur_page, parent);

        let should_split = cur_node.as_ref().meta.get_split_flag();

        if !should_split {
            return Ok((false, parent));
        }

        info!(has_parent = parent.is_some(), "split inner node");

        match parent {
            Some(p) => {
                // Upgrade child first, then parent; a failed upgrade surfaces
                // as an error and the caller restarts the traversal.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;
                let mut x_parent = p.upgrade().map_err(|(_x, e)| e)?;

                let split_key = x_cur.as_ref().get_split_key();

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));

                // Register the sibling in the parent before moving records so
                // traversals can always find the upper half.
                let success = x_parent
                    .as_mut()
                    .insert(&split_key, sibling_builder.get_page_id());
                if !success {
                    // Parent is full: flag it for splitting and restart.
                    x_parent.as_mut().meta.set_split_flag();
                    return Err(TreeError::NeedRestart);
                }

                x_cur.as_mut().split(&mut sibling_builder);

                sibling_builder.build();

                Ok((true, Some(x_parent.downgrade())))
            }
            None => {
                // Root split: create a sibling and a new root referencing both.
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));
                let sibling_id = sibling_builder.get_page_id();

                let split_key = x_cur.as_mut().split(&mut sibling_builder);

                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page)
                    .set_children_is_leaf(false)
                    .add_record(split_key, sibling_id);
                sibling_builder.build();
                let new_root_ptr = new_root_builder.build();
                // Briefly x-lock the freshly built root before publishing it.
                // NOTE(review): the guard is dropped immediately — confirm
                // this is only for lock/version bookkeeping.
                let _x_root = ReadGuard::try_read(new_root_ptr)
                    .unwrap()
                    .upgrade()
                    .unwrap();
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(
                    has_parent = parent.is_some(),
                    cur = cur_page.raw(),
                    "finished split inner node"
                );

                Ok((true, parent))
            }
        }
    }
624
    /// Descends from the root to the leaf that owns `key`.
    ///
    /// When `aggressive_split` is set, any node on the path whose split flag
    /// is raised is split on the way down; a completed split returns
    /// `NeedRestart` so the caller re-traverses from the (possibly new) root.
    ///
    /// Returns the leaf's `PageID` plus the read guard on its parent inner
    /// node (`None` when the root itself is the leaf); the parent guard lets
    /// callers re-validate the path via `check_parent!`.
    pub(crate) fn traverse_to_leaf(
        &self,
        key: &[u8],
        aggressive_split: bool,
    ) -> Result<(PageID, Option<ReadGuard<'_>>), TreeError> {
        let (mut cur_page, mut cur_is_leaf) = self.get_root_page();
        let mut parent: Option<ReadGuard> = None;

        loop {
            if aggressive_split {
                if cur_is_leaf
                    && !cur_page.is_inner_node_pointer()
                    && self.try_split_leaf(cur_page, &parent)?
                {
                    return Err(TreeError::NeedRestart);
                } else if !cur_is_leaf {
                    let (split_success, new_parent) = self.try_split_inner(cur_page, parent)?;
                    if split_success {
                        return Err(TreeError::NeedRestart);
                    } else {
                        parent = new_parent;
                    }
                }
            }

            if cur_is_leaf {
                return Ok((cur_page, parent));
            } else {
                // Optimistic read of the inner node; the parent guard is
                // re-validated before the lookup result is trusted.
                let next = ReadGuard::try_read(cur_page.as_inner_node())?;

                check_parent!(self, cur_page, parent);

                let next_node = next.as_ref();
                let next_is_leaf = next_node.meta.children_is_leaf();
                let pos = next_node.lower_bound(key);
                let kv_meta = next_node.get_kv_meta(pos as u16);
                cur_page = next_node.get_value(kv_meta);
                cur_is_leaf = next_is_leaf;
                // The child becomes the next level's parent guard.
                parent = Some(next);
            }
        }
    }
667
    /// Routes a single write (insert/delete/cache) to the owning leaf.
    ///
    /// A `Null` leaf location (cache-only mode, after eviction) lazily gets a
    /// fresh mini page sized for the record; deletes against `Null` are no-ops.
    /// All other locations delegate to the leaf entry's `insert`, then append
    /// to the WAL when one is configured.
    fn write_inner(&self, write_op: WriteOp, aggressive_split: bool) -> Result<(), TreeError> {
        let (pid, parent) = self.traverse_to_leaf(write_op.key, aggressive_split)?;

        let mut leaf_entry = self.mapping_table().get_mut(&pid);

        check_parent!(self, pid, parent);

        let page_loc = leaf_entry.get_page_location();
        match page_loc {
            PageLocation::Null => {
                if !self.cache_only {
                    panic!("Found an Null page in non cache-only mode");
                }

                // Nothing cached and nothing on disk: deleting is a no-op.
                if write_op.op_type == OpType::Delete {
                    return Ok(());
                }

                // Materialize a mini page just big enough for this record.
                let mini_page_size = LeafNode::get_chain_size_hint(
                    write_op.key.len(),
                    write_op.value.len(),
                    &self.mini_page_size_classes,
                    self.cache_only,
                );
                let mini_page_guard = self.storage.alloc_mini_page(mini_page_size)?;
                LeafNode::initialize_mini_page(
                    &mini_page_guard,
                    mini_page_size,
                    MiniPageNextLevel::new_null(),
                    true,
                );
                let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                let mini_loc = PageLocation::Mini(new_mini_ptr);

                leaf_entry.create_cache_page_loc(mini_loc);

                let mini_page_ref = leaf_entry.load_cache_page_mut(new_mini_ptr);
                let insert_success =
                    mini_page_ref.insert(write_op.key, write_op.value, write_op.op_type, 0);
                // The page was sized for exactly this record, so the insert
                // must succeed.
                assert!(insert_success);

                debug_assert!(mini_page_ref.meta.meta_count_with_fence() > 0);
                counter!(InsertCreatedMiniPage);
            }
            _ => {
                leaf_entry.insert(
                    write_op.key,
                    write_op.value,
                    parent,
                    write_op.op_type,
                    &self.storage,
                    &self.write_load_full_page,
                    &self.cache_only,
                    &self.mini_page_size_classes,
                )?;

                // Keep the freshly written (hot) page away from the eviction
                // head of the circular buffer.
                if leaf_entry.cache_page_about_to_evict(&self.storage) {
                    _ = leaf_entry.move_cache_page_to_tail(&self.storage);
                }

                if let Some(wal) = &self.wal {
                    let lsn = wal.append_and_wait(&write_op, leaf_entry.get_disk_offset());
                    leaf_entry.update_lsn(lsn);
                }
            }
        }

        Ok(())
    }
739
    /// Drives buffer eviction: repeatedly asks the storage layer to evict,
    /// handing each victim page to [`eviction_callback`].
    ///
    /// Stops after accumulating `TARGET_EVICT_SIZE` units or 10 rounds,
    /// whichever comes first. NOTE(review): the unit of the returned count
    /// (pages vs. bytes) is whatever `evict_from_buffer` reports — confirm.
    pub(crate) fn evict_from_circular_buffer(&self) -> Result<usize, TreeError> {
        const TARGET_EVICT_SIZE: usize = 1024;
        let mut evicted = 0;

        let mut retry_cnt = 0;

        while evicted < TARGET_EVICT_SIZE && retry_cnt < 10 {
            let n = self
                .storage
                .evict_from_buffer(|mini_page_handle: &TombstoneHandle| {
                    eviction_callback(mini_page_handle, self)
                })?;
            evicted += n as usize;
            retry_cnt += 1;
        }
        info!("stopped evict from circular buffer");
        Ok(evicted)
    }
763
    /// Inserts (or overwrites) `key` → `value`.
    ///
    /// Validation performed up front:
    /// * key length in `1..=min(max_fence_len / 2, MAX_KEY_LEN)`
    /// * value length `<= MAX_VALUE_LEN`
    /// * total record size within `[cb_min_record_size, cb_max_record_size]`
    ///
    /// Retries internally on `NeedRestart` (structural change), on
    /// `CircularBufferFull` (after evicting), and on `Locked` (after backoff),
    /// so a `Success` return means the write actually landed.
    pub fn insert(&self, key: &[u8], value: &[u8]) -> LeafInsertResult {
        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
            return LeafInsertResult::InvalidKV(format!("Key too large {}", key.len()));
        }

        if key.is_empty() {
            return LeafInsertResult::InvalidKV(format!(
                "Key too small {}, at least one byte",
                key.len()
            ));
        }

        if value.len() > MAX_VALUE_LEN || key.len() + value.len() > self.config.cb_max_record_size {
            return LeafInsertResult::InvalidKV(format!(
                "Record too large {}, {}, please adjust cb_max_record_size in config",
                key.len(),
                value.len()
            ));
        }

        if key.len() + value.len() < self.config.cb_min_record_size {
            return LeafInsertResult::InvalidKV(format!(
                "Record too small {}, {}, please adjust cb_min_record_size in config",
                key.len(),
                value.len()
            ));
        }

        let backoff = Backoff::new();
        // Once any attempt fails, later attempts split flagged nodes eagerly.
        let mut aggressive_split = false;

        counter!(Insert);
        info!(key_len = key.len(), value_len = value.len(), "insert");

        loop {
            let result = self.write_inner(WriteOp::make_insert(key, value), aggressive_split);
            match result {
                Ok(_) => return LeafInsertResult::Success,
                Err(TreeError::NeedRestart) => {
                    // Under shuttle testing, yield to widen the interleaving
                    // space explored by the scheduler.
                    #[cfg(all(feature = "shuttle", test))]
                    {
                        shuttle::thread::yield_now();
                    }
                    counter!(InsertNeedRestart);
                    aggressive_split = true;
                }
                Err(TreeError::CircularBufferFull) => {
                    info!("insert failed, started evict from circular buffer");
                    aggressive_split = true;
                    counter!(InsertCircularBufferFull);
                    // Best-effort eviction; the retry decides whether it helped.
                    _ = self.evict_from_circular_buffer();
                    continue;
                }
                Err(TreeError::Locked) => {
                    counter!(InsertLocked);
                    backoff.snooze();
                }
            }
        }
    }
844
    /// Looks up `key`, copying the value into `out_buffer`.
    ///
    /// Empty or oversized keys return `InvalidKey` without touching the tree.
    /// Retries internally on transient errors; when a read promotion fills
    /// the circular buffer, eviction runs and the read is retried.
    pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
            return LeafReadResult::InvalidKey;
        }

        if key.is_empty() {
            return LeafReadResult::InvalidKey;
        }

        let backoff = Backoff::new();

        info!(key_len = key.len(), "read");
        counter!(Read);
        let mut aggressive_split = false;

        // Per-call timing guard; compiled only with the metrics features.
        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
        let mut debug_timer = DebugTimerGuard::new(Timer::Read, self.metrics_recorder.clone());

        loop {
            let result = self.read_inner(key, out_buffer, aggressive_split);
            match result {
                Ok(v) => {
                    #[cfg(any(
                        feature = "metrics-rt-debug-all",
                        feature = "metrics-rt-debug-timer"
                    ))]
                    debug_timer.end();

                    return v;
                }
                Err(TreeError::CircularBufferFull) => {
                    info!("read promotion failed, started evict from circular buffer");
                    aggressive_split = true;
                    // Retry regardless of whether eviction itself succeeded.
                    match self.evict_from_circular_buffer() {
                        Ok(_) => continue,
                        Err(_) => continue,
                    };
                }
                Err(_) => {
                    backoff.spin();
                    aggressive_split = true;
                }
            }
        }
    }
911
912 pub fn delete(&self, key: &[u8]) {
926 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
928 return;
929 }
930
931 if key.is_empty() {
933 return;
934 }
935
936 let backoff = Backoff::new();
937
938 info!(key_len = key.len(), "delete");
939
940 let mut aggressive_split = false;
941
942 loop {
943 let result = self.write_inner(WriteOp::make_delete(key), aggressive_split);
944 match result {
945 Ok(_) => return,
946 Err(TreeError::CircularBufferFull) => {
947 info!("delete failed, started evict from circular buffer");
948 aggressive_split = true;
949 match self.evict_from_circular_buffer() {
950 Ok(_) => continue,
951 Err(_) => continue,
952 };
953 }
954 Err(_) => {
955 aggressive_split = true;
956 backoff.spin();
957 }
958 }
959 }
960 }
961
962 pub fn scan_with_count<'a>(
965 &'a self,
966 key: &[u8],
967 cnt: usize,
968 return_field: ScanReturnField,
969 ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
970 if self.cache_only {
972 return Err(ScanIterError::CacheOnlyMode);
973 }
974
975 if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
977 return Err(ScanIterError::InvalidStartKey);
978 }
979
980 if key.is_empty() {
982 return Err(ScanIterError::InvalidStartKey);
983 }
984
985 if cnt == 0 {
987 return Err(ScanIterError::InvalidCount);
988 }
989
990 Ok(ScanIter::new_with_scan_count(self, key, cnt, return_field))
991 }
992
993 pub fn scan_with_end_key<'a>(
994 &'a self,
995 start_key: &[u8],
996 end_key: &[u8],
997 return_field: ScanReturnField,
998 ) -> Result<ScanIter<'a, 'a>, ScanIterError> {
999 if self.cache_only {
1001 return Err(ScanIterError::CacheOnlyMode);
1002 }
1003
1004 if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
1006 return Err(ScanIterError::InvalidStartKey);
1007 }
1008
1009 if start_key.is_empty() {
1011 return Err(ScanIterError::InvalidStartKey);
1012 }
1013
1014 if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
1016 return Err(ScanIterError::InvalidEndKey);
1017 }
1018
1019 if end_key.is_empty() {
1021 return Err(ScanIterError::InvalidEndKey);
1022 }
1023
1024 let cmp = start_key.cmp(end_key);
1026 if cmp == std::cmp::Ordering::Greater {
1027 return Err(ScanIterError::InvalidKeyRange);
1028 }
1029
1030 Ok(ScanIter::new_with_end_key(
1031 self,
1032 start_key,
1033 end_key,
1034 return_field,
1035 ))
1036 }
1037
    /// Mutable variant of [`Self::scan_with_count`] (hidden, internal use).
    #[doc(hidden)]
    pub fn scan_mut_with_count<'a>(
        &'a self,
        key: &'a [u8],
        cnt: usize,
        return_field: ScanReturnField,
    ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
        if self.cache_only {
            return Err(ScanIterError::CacheOnlyMode);
        }

        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
            return Err(ScanIterError::InvalidStartKey);
        }
        // NOTE(review): unlike `scan_with_count`, an empty start key is not
        // rejected here — confirm whether that asymmetry is intentional.

        if cnt == 0 {
            return Err(ScanIterError::InvalidCount);
        }

        Ok(ScanIterMut::new_with_scan_count(
            self,
            key,
            cnt,
            return_field,
        ))
    }
1067
    /// Mutable variant of [`Self::scan_with_end_key`] (hidden, internal use).
    #[doc(hidden)]
    pub fn scan_mut_with_end_key<'a>(
        &'a self,
        start_key: &'a [u8],
        end_key: &'a [u8],
        return_field: ScanReturnField,
    ) -> Result<ScanIterMut<'a, 'a>, ScanIterError> {
        if self.cache_only {
            return Err(ScanIterError::CacheOnlyMode);
        }

        if start_key.len() > self.config.max_fence_len / 2 || start_key.len() > MAX_KEY_LEN {
            return Err(ScanIterError::InvalidStartKey);
        }

        if end_key.len() > self.config.max_fence_len / 2 || end_key.len() > MAX_KEY_LEN {
            return Err(ScanIterError::InvalidEndKey);
        }
        // NOTE(review): unlike `scan_with_end_key`, empty keys and an inverted
        // range (start > end) are not rejected here — confirm intent.

        Ok(ScanIterMut::new_with_end_key(
            self,
            start_key,
            end_key,
            return_field,
        ))
    }
1097
    /// One read attempt: traverse, lock the leaf for reading, look up `key`.
    ///
    /// Cache hits (`Mini`/`Full`) may opportunistically move the page to the
    /// buffer tail when it is about to be evicted. Base-page hits may be
    /// promoted into the cache: either the single record (when
    /// `read_record_cache` is set) or the whole page.
    fn read_inner(
        &self,
        key: &[u8],
        out_buffer: &mut [u8],
        aggressive_split: bool,
    ) -> Result<LeafReadResult, TreeError> {
        let (node, parent) = self.traverse_to_leaf(key, aggressive_split)?;

        let mut leaf = self.mapping_table().get(&node);

        check_parent!(self, node, parent);

        let out = leaf.read(
            key,
            out_buffer,
            self.config.mini_page_binary_search,
            self.cache_only,
        );
        match out {
            ReadResult::Mini(r) | ReadResult::Full(r) => {
                // A hot page near the eviction head gets moved to the tail,
                // but never at the cost of blocking the read.
                if leaf.cache_page_about_to_evict(&self.storage) {
                    let mut x_leaf = match leaf.try_upgrade() {
                        Ok(v) => v,
                        Err(_) => return Ok(r),
                    };
                    _ = x_leaf.move_cache_page_to_tail(&self.storage);
                }

                Ok(r)
            }

            ReadResult::Base(r) => {
                counter!(BasePageRead);

                if self.cache_only {
                    panic!("Attempt to read a base page while in cache-only mode.");
                }

                // Only successful lookups are promotion candidates.
                let v = match r {
                    LeafReadResult::Found(v) => v,
                    _ => return Ok(r),
                };

                // Promotion is sampled and requires a parent for validation.
                if parent.is_none() || !self.should_promote_read() {
                    return Ok(r);
                }

                // Best-effort: if the leaf is contended, just return the hit.
                let mut x_leaf = match leaf.try_upgrade() {
                    Ok(x) => x,
                    Err(_) => {
                        return Ok(r);
                    }
                };

                if self.config.read_record_cache {
                    // Promote just this record into the cache as an
                    // OpType::Cache insert.
                    let out = x_leaf.insert(
                        key,
                        &out_buffer[0..v as usize],
                        parent,
                        OpType::Cache,
                        &self.storage,
                        &self.write_load_full_page,
                        &self.cache_only,
                        &self.mini_page_size_classes,
                    );

                    match out {
                        Ok(_) => {
                            counter!(ReadPromotionOk);
                            Ok(r)
                        }
                        Err(TreeError::Locked) => {
                            // Promotion failed but the read itself succeeded.
                            counter!(ReadPromotionFailed);
                            Ok(r)
                        }
                        Err(TreeError::CircularBufferFull) => {
                            counter!(ReadPromotionFailed);
                            Err(TreeError::CircularBufferFull)
                        }
                        Err(TreeError::NeedRestart) => {
                            counter!(ReadPromotionFailed);
                            Err(TreeError::NeedRestart)
                        }
                    }
                } else {
                    // Promote the whole base page into the cache.
                    match self.upgrade_to_full_page(x_leaf, parent.unwrap()) {
                        Ok(_) | Err(TreeError::Locked) => Ok(r),
                        Err(e) => Err(e),
                    }
                }
            }
            ReadResult::None => Ok(LeafReadResult::NotFound),
        }
    }
1199
    /// Replaces the leaf's cache location with a full in-memory page.
    ///
    /// * `Mini`: merge the mini page back (tombstoning it), fall back to the
    ///   base location, then load a full page from the base page.
    /// * `Base`: load a full page directly from the base offset.
    /// * `Full`: already upgraded; returned unchanged.
    ///
    /// # Panics
    /// Panics on a `Null` location.
    fn upgrade_to_full_page<'a>(
        &'a self,
        mut x_leaf: LeafEntryXLocked<'a>,
        parent: ReadGuard<'a>,
    ) -> Result<LeafEntryXLocked<'a>, TreeError> {
        let page_loc = x_leaf.get_page_location().clone();
        match page_loc {
            PageLocation::Mini(ptr) => {
                let mini_page = x_leaf.load_cache_page_mut(ptr);
                // Tombstone the mini page in the buffer first so eviction
                // cannot race the merge below.
                let h = self.storage.begin_dealloc_mini_page(mini_page)?;
                let _merge_result = x_leaf.try_merge_mini_page(&h, parent, &self.storage)?;
                let base_offset = mini_page.next_level;
                x_leaf.change_to_base_loc();
                self.storage.finish_dealloc_mini_page(h);

                let base_page_ref = x_leaf.load_base_page_from_buffer();
                let full_page_loc =
                    upgrade_to_full_page(&self.storage, base_page_ref, base_offset)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Full(_ptr) => Ok(x_leaf),
            PageLocation::Base(offset) => {
                let base_page_ref = x_leaf.load_base_page(offset);
                let next_level = MiniPageNextLevel::new(offset);
                let full_page_loc = upgrade_to_full_page(&self.storage, base_page_ref, next_level)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Null => panic!("upgrade_to_full_page on Null page"),
        }
    }
1232
    /// Drains accumulated runtime metrics into a JSON value and resets the
    /// per-thread recorders. Returns `None` when the metrics features are off
    /// or the recorder has already been taken.
    ///
    /// # Panics
    /// Panics if any other `Arc` reference to the recorder set is still held.
    pub fn get_metrics(&mut self) -> Option<serde_json::Value> {
        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
        {
            let recorder = self.metrics_recorder.take();
            match recorder {
                Some(r) => {
                    let recorders = Arc::try_unwrap(r).expect("Failed to obtain the recorders of bf-tree, please make sure no other references exist.");
                    let mut timer_accumulated = TimerRecorder::default();

                    // Fold every thread's timers into one accumulator.
                    for r in recorders {
                        // SAFETY(review): `try_unwrap` just proved exclusive
                        // ownership of the ThreadLocal, so no thread can be
                        // writing through the UnsafeCell — confirm recorder
                        // threads have quiesced by this point.
                        let t = unsafe { &*r.get() };

                        timer_accumulated += t.timers.clone();
                    }

                    let output = serde_json::json!({
                        "Timers": timer_accumulated,
                    });

                    // Install a fresh recorder set for subsequent runs.
                    self.metrics_recorder = Some(Arc::new(ThreadLocal::new()));

                    Some(output)
                }
                None => None,
            }
        }
        #[cfg(not(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer")))]
        {
            None
        }
    }
1267}
1268
1269pub(crate) fn key_value_physical_size(key: &[u8], value: &[u8]) -> usize {
1270 let key_size = key.len();
1271 let value_size = value.len();
1272 let meta_size = crate::nodes::KV_META_SIZE;
1273 key_size + value_size + meta_size
1274}
1275
/// Callback invoked by the circular buffer when it wants to reclaim the page
/// behind `mini_page_handle`.
///
/// Re-locates the page by key (the raw pointer alone is not proof of
/// identity), verifies the mapping still points at exactly this pointer, then
/// retires it: a `Mini` page is merged into its base page (or dropped to
/// `Null` in cache-only mode); a `Full` page is merged back; any other
/// location means the mapping changed underneath us → `NeedRestart`.
pub(crate) fn eviction_callback(
    mini_page_handle: &TombstoneHandle,
    tree: &BfTree,
) -> Result<(), TreeError> {
    let mini_page = mini_page_handle.ptr as *mut LeafNode;
    // SAFETY(review): the tombstone handle is assumed to keep the page memory
    // valid for the duration of this callback — confirm the buffer contract.
    let key_to_this_page = if tree.cache_only {
        unsafe { &*mini_page }.try_get_key_to_reach_this_node()?
    } else {
        unsafe { &*mini_page }.get_key_to_reach_this_node()
    };

    let (pid, parent) = tree.traverse_to_leaf(&key_to_this_page, true)?;
    info!(
        pid = pid.raw(),
        "starting to merge mini page in eviction call back"
    );

    let mut leaf_entry = tree.mapping_table().get_mut(&pid);

    histogram!(EvictNodeSize, unsafe { &*mini_page }.meta.node_size as u64);

    match leaf_entry.get_page_location() {
        PageLocation::Mini(ptr) => {
            {
                // The mapping may have been retargeted since eviction chose
                // this victim; only proceed on an exact pointer match.
                if *ptr != mini_page {
                    return Err(TreeError::NeedRestart);
                }
            }

            let parent = parent.expect("Mini page must have a parent");
            parent.check_version()?;

            if tree.cache_only {
                // No base page exists to merge into; the entry simply loses
                // its cached location.
                leaf_entry.change_to_null_loc();
            } else {
                leaf_entry.try_merge_mini_page(mini_page_handle, parent, &tree.storage)?;
                leaf_entry.change_to_base_loc();
            }

            Ok(())
        }

        PageLocation::Full(ptr) => {
            if *ptr != mini_page {
                return Err(TreeError::NeedRestart);
            }

            leaf_entry.merge_full_page(mini_page_handle);
            Ok(())
        }

        PageLocation::Base(_offset) => Err(TreeError::NeedRestart),

        PageLocation::Null => Err(TreeError::NeedRestart),
    }
}
1344
#[cfg(test)]
mod tests {
    use crate::error::ConfigError;
    use crate::BfTree;

    #[test]
    fn test_mini_page_size_classes() {
        let mut size_classes = BfTree::create_mem_page_size_classes(48, 1952, 4096, 64, false);
        assert_eq!(
            size_classes,
            vec![128, 192, 256, 512, 960, 1856, 2048, 4096]
        );

        size_classes = BfTree::create_mem_page_size_classes(1548, 1548, 3136, 64, true);
        assert_eq!(size_classes, vec![1536, 3136]);

        size_classes = BfTree::create_mem_page_size_classes(48, 3072, 12288, 64, false);
        assert_eq!(
            size_classes,
            vec![128, 192, 256, 512, 960, 1856, 3648, 7232, 9088, 12288]
        );

        size_classes = BfTree::create_mem_page_size_classes(4, 1952, 4096, 32, false);
        assert_eq!(size_classes, vec![64, 128, 256, 448, 832, 1600, 2048, 4096]);
    }

    /// Each case builds a config with one invalid setting and asserts that
    /// `BfTree::with_config` rejects it with the matching `ConfigError`
    /// variant. `matches!` replaces the previous hand-rolled
    /// `if let Err … match … else panic` boilerplate, whose panic messages
    /// also named nonexistent variants (e.g. "InvalidMinimumRecordSize").
    #[test]
    fn test_invalid_config_to_build_bf_tree() {
        // Circular-buffer minimum record size too small.
        let mut config = crate::Config::default();
        config.cb_min_record_size(4);
        config.leaf_page_size(32 * 1024);
        assert!(matches!(
            BfTree::with_config(config, None),
            Err(ConfigError::MinimumRecordSize(_))
        ));

        // Circular-buffer maximum record size too large.
        let mut config = crate::Config::default();
        config.cb_max_record_size(64 * 1024);
        assert!(matches!(
            BfTree::with_config(config, None),
            Err(ConfigError::MaximumRecordSize(_))
        ));

        // 4050 is not an acceptable leaf page size.
        let mut config = crate::Config::default();
        config.leaf_page_size(4050);
        assert!(matches!(
            BfTree::with_config(config, None),
            Err(ConfigError::LeafPageSize(_))
        ));

        // Circular buffer no larger than a single leaf page.
        let mut config = crate::Config::default();
        config.leaf_page_size(16 * 1024);
        config.cb_size_byte(16 * 1024);
        assert!(matches!(
            BfTree::with_config(config, None),
            Err(ConfigError::CircularBufferSize(_))
        ));

        // Circular buffer still too small relative to the default leaf page.
        let mut config = crate::Config::default();
        config.cb_size_byte(20 * 1024);
        assert!(matches!(
            BfTree::with_config(config, None),
            Err(ConfigError::CircularBufferSize(_))
        ));

        // Cache-only mode with an undersized circular buffer.
        let mut config = crate::Config::default();
        config.cache_only(true);
        config.cb_size_byte(2 * 4096);
        assert!(matches!(
            BfTree::with_config(config, None),
            Err(ConfigError::CircularBufferSize(_))
        ));
    }
}