#[cfg(not(all(feature = "shuttle", test)))]
use rand::Rng;

#[cfg(all(feature = "shuttle", test))]
use shuttle::rand::Rng;

use cfg_if::cfg_if;

cfg_if! {
    if #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))] {
        use crate::metric::{Timer, TlsRecorder, timer::{TimerRecorder, DebugTimerGuard}};
        use std::cell::UnsafeCell;
        use thread_local::ThreadLocal;
    }
}

use crate::{
    check_parent,
    circular_buffer::{CircularBufferMetrics, TombstoneHandle},
    counter,
    error::TreeError,
    histogram, info,
    mini_page_op::{upgrade_to_full_page, LeafEntryXLocked, LeafOperations, ReadResult},
    nodes::{
        leaf_node::{LeafKVMeta, LeafReadResult, MiniPageNextLevel, OpType},
        InnerNode, InnerNodeBuilder, LeafNode, PageID, CACHE_LINE_SIZE, DISK_PAGE_SIZE,
        INNER_NODE_SIZE, MAX_KEY_LEN, MAX_LEAF_PAGE_SIZE, MAX_VALUE_LEN,
    },
    range_scan::{ScanIter, ScanIterMut},
    storage::{LeafStorage, PageLocation, PageTable},
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    utils::{get_rng, inner_lock::ReadGuard, Backoff, BfsVisitor, NodeInfo},
    wal::{WriteAheadLog, WriteOp},
    Config, StorageBackend,
};
use std::path::Path;

pub struct BfTree {
    pub(crate) root_page_id: AtomicU64,
    pub(crate) storage: LeafStorage,
    pub(crate) wal: Option<Arc<WriteAheadLog>>,
    pub(crate) config: Arc<Config>,
    pub(crate) write_load_full_page: bool,
    pub(crate) cache_only: bool,
    pub(crate) mini_page_size_classes: Vec<usize>,
    #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
    pub metrics_recorder: Option<Arc<ThreadLocal<UnsafeCell<TlsRecorder>>>>,
}
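// `BfTree` is shared across threads and relies on its own page-locking
// protocol for safety; the auto traits are suppressed (presumably by raw page
// pointers reachable through `LeafStorage`), so Send/Sync are asserted
// manually here.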
unsafe impl Sync for BfTree {}

unsafe impl Send for BfTree {}

pub enum LeafInsertResult {
    Success,
    InvalidKV(String),
}

impl Drop for BfTree {
    fn drop(&mut self) {
        if let Some(ref wal) = self.wal {
            wal.stop_background_job();
        }

        let visitor = BfsVisitor::new_all_nodes(self);
        for node_info in visitor {
            match node_info {
                NodeInfo::Leaf { page_id, .. } => {
                    let mut leaf = self.mapping_table().get_mut(&page_id);
                    leaf.dealloc_self(&self.storage, self.cache_only);
                }
                NodeInfo::Inner { ptr, .. } => {
                    if unsafe { &*ptr }.is_valid_disk_offset() {
                        let disk_offset = unsafe { &*ptr }.disk_offset;
                        if self.config.storage_backend == StorageBackend::Memory {
                            self.storage.vfs.dealloc_offset(disk_offset as usize);
                        }
                    }
                    InnerNode::free_node(ptr as *mut InnerNode);
                }
            }
        }
    }
}

impl Default for BfTree {
    fn default() -> Self {
        Self::new(":memory:", 1024 * 1024 * 32)
    }
}

impl BfTree {
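    /// High bit of `root_page_id`: when set, the root is a leaf and the
    /// remaining bits are a leaf `PageID`; when clear, they hold a raw
    /// `InnerNode` pointer (see `get_root_page`).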
    pub(crate) const ROOT_IS_LEAF_MASK: u64 = 0x8000_0000_0000_0000;

    pub(crate) fn create_mem_page_size_classes(
        min_record_size_in_byte: usize,
        max_record_size_in_byte: usize,
        leaf_page_size_in_byte: usize,
        max_fence_len_in_byte: usize,
        cache_only: bool,
    ) -> Vec<usize> {
        assert!(
            min_record_size_in_byte > 1,
            "cb_min_record_size in config cannot be less than 2"
        );
        assert!(
            min_record_size_in_byte <= max_record_size_in_byte,
            "cb_min_record_size cannot be larger than cb_max_record_size"
        );
        assert!(
            max_fence_len_in_byte > 0,
            "max_fence_len in config cannot be zero"
        );
        assert!(
            max_fence_len_in_byte / 2 < max_record_size_in_byte,
            "max_fence_len/2 cannot be larger than cb_max_record_size"
        );
        assert!(
            leaf_page_size_in_byte <= MAX_LEAF_PAGE_SIZE,
            "leaf_page_size in config cannot be larger than {}",
            MAX_LEAF_PAGE_SIZE
        );
        assert!(
            max_fence_len_in_byte / 2 <= MAX_KEY_LEN,
            "max_key_len in config cannot be larger than {}",
            MAX_KEY_LEN
        );

        if !cache_only {
            assert!(
                leaf_page_size_in_byte.is_multiple_of(DISK_PAGE_SIZE),
                "leaf_page_size in config should be a multiple of {}",
                DISK_PAGE_SIZE
            );
        } else {
            assert!(
                leaf_page_size_in_byte.is_multiple_of(CACHE_LINE_SIZE),
                "leaf_page_size in config should be a multiple of {}",
                CACHE_LINE_SIZE
            );
        }

        let max_record_size_with_meta = max_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();
        let mut max_mini_page_size: usize;

        if cache_only {
            max_mini_page_size = leaf_page_size_in_byte - max_record_size_with_meta;
            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;

            assert!(
                leaf_page_size_in_byte
                    >= 2 * max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
                "cb_max_record_size of config should be <= {}",
                (leaf_page_size_in_byte - std::mem::size_of::<LeafNode>()) / 2
                    - std::mem::size_of::<LeafKVMeta>()
            );
        } else {
            max_mini_page_size = leaf_page_size_in_byte
                - max_record_size_with_meta
                - max_fence_len_in_byte
                - 2 * std::mem::size_of::<LeafKVMeta>();
            max_mini_page_size = (max_mini_page_size / CACHE_LINE_SIZE) * CACHE_LINE_SIZE;

            assert!(
                max_mini_page_size >= max_record_size_with_meta + std::mem::size_of::<LeafNode>(),
                "cb_max_record_size of config should be <= {}",
                max_mini_page_size
                    - std::mem::size_of::<LeafNode>()
                    - std::mem::size_of::<LeafKVMeta>()
            );
        }

        let mut mem_page_size_classes = Vec::new();

        let base: i32 = 2;
        let mut x: u32 = 0;
        let c: usize = min_record_size_in_byte + std::mem::size_of::<LeafKVMeta>();

        let mut size_class = base.pow(x) as usize * c + std::mem::size_of::<LeafNode>();

        if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
            size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
        }

        while size_class <= max_mini_page_size {
            mem_page_size_classes.push(size_class);

            x += 1;
            size_class = base.pow(x) as usize * c + std::mem::size_of::<LeafNode>();

            if !size_class.is_multiple_of(CACHE_LINE_SIZE) {
                size_class = (size_class / CACHE_LINE_SIZE + 1) * CACHE_LINE_SIZE;
            }
        }

        if !cache_only {
            assert!(x > 0);
        }

        if x == 0 || mem_page_size_classes[(x - 1) as usize] < max_mini_page_size {
            mem_page_size_classes.push(max_mini_page_size);
            x += 1;
        }

        if x == 0 || mem_page_size_classes[(x - 1) as usize] < leaf_page_size_in_byte {
            mem_page_size_classes.push(leaf_page_size_in_byte);
            x += 1;
        }

        if !cache_only {
            assert!(x >= 2);
        } else {
            assert!(x >= 1);
        }

        mem_page_size_classes
    }
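    /// Opens (or creates) a tree at `file_path` with a cache budget of
    /// `cache_size_byte` bytes. A minimal usage sketch, not compiled here
    /// (it assumes the crate is exposed as `bf_tree`; `":memory:"` selects
    /// the in-memory backend, as `Default` does):
    ///
    /// ```ignore
    /// use bf_tree::BfTree;
    ///
    /// let tree = BfTree::new(":memory:", 32 * 1024 * 1024);
    /// tree.insert(b"hello", b"world");
    /// ```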
    pub fn new(file_path: impl AsRef<Path>, cache_size_byte: usize) -> Self {
        let config = Config::new(file_path, cache_size_byte);
        Self::with_config(config, None)
    }

    pub fn new_with_config_file<P: AsRef<Path>>(config_file_path: P) -> Self {
        let config = Config::new_with_config_file(config_file_path);
        Self::with_config(config, None)
    }

    pub fn with_config(config: Config, buffer_ptr: Option<*mut u8>) -> Self {
        let wal = match config.write_ahead_log.as_ref() {
            Some(wal_config) => {
                let wal = WriteAheadLog::new(wal_config.clone());
                Some(wal)
            }
            None => None,
        };
        let write_load_full = config.write_load_full_page;
        let config = Arc::new(config);
        if config.cache_only {
            assert!(
                config.cb_size_byte >= 4 * config.leaf_page_size,
                "In cache-only mode, cb_size_byte should be at least 4 times leaf_page_size"
            );
        } else {
            assert!(
                config.cb_size_byte >= 2 * config.leaf_page_size,
                "In non-cache-only mode, cb_size_byte should be at least 2 times leaf_page_size"
            );
        }
        if config.cache_only {
            let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr);
            let mini_page_guard = leaf_storage
                .alloc_mini_page(config.leaf_page_size)
                .expect("Failed to allocate a mini-page as the initial root node");
            LeafNode::initialize_mini_page(
                &mini_page_guard,
                config.leaf_page_size,
                MiniPageNextLevel::new_null(),
                true,
            );
            let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
            let mini_loc = PageLocation::Mini(new_mini_ptr);

            let (root_id, root_lock) = leaf_storage
                .mapping_table()
                .insert_mini_page_mapping(mini_loc);
            assert_eq!(root_id.as_id(), 0);

            drop(root_lock);
            drop(mini_page_guard);
            let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;

            return Self {
                root_page_id: AtomicU64::new(root_id),
                storage: leaf_storage,
                wal,
                cache_only: config.cache_only,
                write_load_full_page: write_load_full,
                mini_page_size_classes: Self::create_mem_page_size_classes(
                    config.cb_min_record_size,
                    config.cb_max_record_size,
                    config.leaf_page_size,
                    config.max_fence_len,
                    config.cache_only,
                ),
                config,
                #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
                metrics_recorder: Some(Arc::new(ThreadLocal::new())),
            };
        }

        let leaf_storage = LeafStorage::new(config.clone(), buffer_ptr);
        let (root_id, root_lock) = leaf_storage.mapping_table().alloc_base_page_mapping();
        drop(root_lock);
        assert_eq!(root_id.as_id(), 0);

        let root_id = root_id.raw() | Self::ROOT_IS_LEAF_MASK;
        Self {
            root_page_id: AtomicU64::new(root_id),
            storage: leaf_storage,
            wal,
            cache_only: config.cache_only,
            write_load_full_page: write_load_full,
            mini_page_size_classes: Self::create_mem_page_size_classes(
                config.cb_min_record_size,
                config.cb_max_record_size,
                config.leaf_page_size,
                config.max_fence_len,
                config.cache_only,
            ),
            config,
            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
        }
    }

    pub fn get_buffer_metrics(&self) -> CircularBufferMetrics {
        self.storage.get_buffer_metrics()
    }

    pub(crate) fn get_root_page(&self) -> (PageID, bool) {
        let root_page_id = self.root_page_id.load(Ordering::Acquire);
        let root_is_leaf = (root_page_id & Self::ROOT_IS_LEAF_MASK) != 0;
        let clean = root_page_id & (!Self::ROOT_IS_LEAF_MASK);

        let page_id = if root_is_leaf {
            PageID::from_id(clean)
        } else {
            PageID::from_pointer(clean as *const InnerNode)
        };

        (page_id, root_is_leaf)
    }

    pub(crate) fn mapping_table(&self) -> &PageTable {
        self.storage.mapping_table()
    }
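    // Promotion decisions are sampled: a uniform draw in [0, 100) compared
    // against the configured percentage makes roughly `rate`% of reads (or
    // scans) promote their page.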
    pub(crate) fn should_promote_read(&self) -> bool {
        get_rng().gen_range(0..100) < self.config.read_promotion_rate.load(Ordering::Relaxed)
    }

    pub(crate) fn should_promote_scan_page(&self) -> bool {
        get_rng().gen_range(0..100) < self.config.scan_promotion_rate.load(Ordering::Relaxed)
    }

    pub fn update_read_promotion_rate(&self, new_rate: usize) {
        self.config
            .read_promotion_rate
            .store(new_rate, Ordering::Relaxed);
    }

    fn try_split_leaf(
        &self,
        cur_page_id: PageID,
        parent: &Option<ReadGuard>,
    ) -> Result<bool, TreeError> {
        debug_assert!(cur_page_id.is_id());

        let mut cur_page = self.mapping_table().get_mut(&cur_page_id);

        check_parent!(self, cur_page_id, parent);

        let should_split = cur_page.get_split_flag();
        if !should_split {
            return Ok(false);
        }
        match parent {
            Some(_) => {
                unreachable!("Leaf node split should not happen here");
            }
            None => {
                if self.cache_only {
                    let mini_page_guard = self
                        .storage
                        .alloc_mini_page(self.config.leaf_page_size)
                        .expect("Failed to allocate a mini-page during root split");
                    LeafNode::initialize_mini_page(
                        &mini_page_guard,
                        self.config.leaf_page_size,
                        MiniPageNextLevel::new_null(),
                        true,
                    );
                    let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                    let mini_loc = PageLocation::Mini(new_mini_ptr);

                    let (sibling_id, _mini_lock) = self
                        .storage
                        .mapping_table()
                        .insert_mini_page_mapping(mini_loc);

                    let cur_page_loc = cur_page.get_page_location().clone();
                    match cur_page_loc {
                        PageLocation::Mini(ptr) => {
                            let cur_mini_page = cur_page.load_cache_page_mut(ptr);
                            let sibling_page = unsafe { &mut *new_mini_ptr };
                            let split_key = cur_mini_page.split(sibling_page, true);

                            let mut new_root_builder = InnerNodeBuilder::new();
                            new_root_builder
                                .set_left_most_page_id(cur_page_id)
                                .set_children_is_leaf(true)
                                .add_record(split_key, sibling_id);

                            let new_root_ptr = new_root_builder.build();

                            self.root_page_id
                                .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                            info!(sibling = sibling_id.raw(), "New root node installed!");

                            debug_assert!(cur_mini_page.meta.meta_count_with_fence() > 0);
                            debug_assert!(sibling_page.meta.meta_count_with_fence() > 0);

                            return Ok(true);
                        }
                        _ => {
                            panic!("The root node is not a mini-page in cache-only mode")
                        }
                    }
                }

                let mut x_page = cur_page;

                let (sibling_id, mut sibling_entry) = self.alloc_base_page_and_lock();

                info!(sibling = sibling_id.raw(), "Splitting root node!");

                let sibling = sibling_entry.load_base_page_mut();

                let leaf_node = x_page.load_base_page_mut();
                let split_key = leaf_node.split(sibling, false);

                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page_id)
                    .set_children_is_leaf(true)
                    .add_record(split_key, sibling_id);

                let new_root_ptr = new_root_builder.build();

                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(sibling = sibling_id.raw(), "New root node installed!");
                Ok(true)
            }
        }
    }

    fn alloc_base_page_and_lock(&self) -> (PageID, LeafEntryXLocked<'_>) {
        let (pid, base_entry) = self.mapping_table().alloc_base_page_mapping();

        (pid, base_entry)
    }

    fn try_split_inner<'a>(
        &self,
        cur_page: PageID,
        parent: Option<ReadGuard<'a>>,
    ) -> Result<(bool, Option<ReadGuard<'a>>), TreeError> {
        let cur_node = ReadGuard::try_read(cur_page.as_inner_node())?;

        check_parent!(self, cur_page, parent);

        let should_split = cur_node.as_ref().meta.get_split_flag();

        if !should_split {
            return Ok((false, parent));
        }

        info!(has_parent = parent.is_some(), "split inner node");

        match parent {
            Some(p) => {
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;
                let mut x_parent = p.upgrade().map_err(|(_x, e)| e)?;

                let split_key = x_cur.as_ref().get_split_key();

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));

                let success = x_parent
                    .as_mut()
                    .insert(&split_key, sibling_builder.get_page_id());
                if !success {
                    x_parent.as_mut().meta.set_split_flag();
                    return Err(TreeError::NeedRestart);
                }

                x_cur.as_mut().split(&mut sibling_builder);

                sibling_builder.build();

                Ok((true, Some(x_parent.downgrade())))
            }
            None => {
                let mut x_cur = cur_node.upgrade().map_err(|(_x, e)| e)?;

                let mut sibling_builder = InnerNodeBuilder::new();
                sibling_builder.set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE));
                let sibling_id = sibling_builder.get_page_id();

                let split_key = x_cur.as_mut().split(&mut sibling_builder);

                let mut new_root_builder = InnerNodeBuilder::new();
                new_root_builder
                    .set_disk_offset(self.storage.alloc_disk_offset(INNER_NODE_SIZE))
                    .set_left_most_page_id(cur_page)
                    .set_children_is_leaf(false)
                    .add_record(split_key, sibling_id);
                sibling_builder.build();
                let new_root_ptr = new_root_builder.build();
                let _x_root = ReadGuard::try_read(new_root_ptr)
                    .unwrap()
                    .upgrade()
                    .unwrap();
                self.root_page_id
                    .store(PageID::from_pointer(new_root_ptr).raw(), Ordering::Release);

                info!(
                    has_parent = parent.is_some(),
                    cur = cur_page.raw(),
                    "finished split inner node"
                );

                Ok((true, parent))
            }
        }
    }
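    /// Descends from the root to the leaf responsible for `key`, returning the
    /// leaf's page ID plus an optimistic read guard on its parent. With
    /// `aggressive_split` set, nodes flagged for splitting are split on the
    /// way down, surfacing as `NeedRestart` so the caller retries.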
    pub(crate) fn traverse_to_leaf(
        &self,
        key: &[u8],
        aggressive_split: bool,
    ) -> Result<(PageID, Option<ReadGuard<'_>>), TreeError> {
        let (mut cur_page, mut cur_is_leaf) = self.get_root_page();
        let mut parent: Option<ReadGuard> = None;

        loop {
            if aggressive_split {
                if cur_is_leaf
                    && !cur_page.is_inner_node_pointer()
                    && self.try_split_leaf(cur_page, &parent)?
                {
                    return Err(TreeError::NeedRestart);
                } else if !cur_is_leaf {
                    let (split_success, new_parent) = self.try_split_inner(cur_page, parent)?;
                    if split_success {
                        return Err(TreeError::NeedRestart);
                    } else {
                        parent = new_parent;
                    }
                }
            }

            if cur_is_leaf {
                return Ok((cur_page, parent));
            } else {
                let next = ReadGuard::try_read(cur_page.as_inner_node())?;

                check_parent!(self, cur_page, parent);

                let next_node = next.as_ref();
                let next_is_leaf = next_node.meta.children_is_leaf();
                let pos = next_node.lower_bound(key);
                let kv_meta = next_node.get_kv_meta(pos as u16);
                cur_page = next_node.get_value(kv_meta);
                cur_is_leaf = next_is_leaf;
                parent = Some(next);
            }
        }
    }

    fn write_inner(&self, write_op: WriteOp, aggressive_split: bool) -> Result<(), TreeError> {
        let (pid, parent) = self.traverse_to_leaf(write_op.key, aggressive_split)?;

        let mut leaf_entry = self.mapping_table().get_mut(&pid);

        check_parent!(self, pid, parent);

        let page_loc = leaf_entry.get_page_location();
        match page_loc {
            PageLocation::Null => {
                if !self.cache_only {
                    panic!("Found a Null page in non-cache-only mode");
                }
                if write_op.op_type == OpType::Delete {
                    return Ok(());
                }

                let mini_page_size = LeafNode::get_chain_size_hint(
                    write_op.key.len(),
                    write_op.value.len(),
                    &self.mini_page_size_classes,
                    self.cache_only,
                );
                let mini_page_guard = self.storage.alloc_mini_page(mini_page_size)?;
                LeafNode::initialize_mini_page(
                    &mini_page_guard,
                    mini_page_size,
                    MiniPageNextLevel::new_null(),
                    true,
                );
                let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
                let mini_loc = PageLocation::Mini(new_mini_ptr);

                leaf_entry.create_cache_page_loc(mini_loc);

                let mini_page_ref = leaf_entry.load_cache_page_mut(new_mini_ptr);
                let insert_success =
                    mini_page_ref.insert(write_op.key, write_op.value, write_op.op_type, 0);
                assert!(insert_success);

                debug_assert!(mini_page_ref.meta.meta_count_with_fence() > 0);
                counter!(InsertCreatedMiniPage);
            }
            _ => {
                leaf_entry.insert(
                    write_op.key,
                    write_op.value,
                    parent,
                    write_op.op_type,
                    &self.storage,
                    &self.write_load_full_page,
                    &self.cache_only,
                    &self.mini_page_size_classes,
                )?;

                if leaf_entry.cache_page_about_to_evict(&self.storage) {
                    _ = leaf_entry.move_cache_page_to_tail(&self.storage);
                }

                if let Some(wal) = &self.wal {
                    let lsn = wal.append_and_wait(&write_op, leaf_entry.get_disk_offset());
                    leaf_entry.update_lsn(lsn);
                }
            }
        }

        Ok(())
    }
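    /// Drives eviction on the circular buffer until `TARGET_EVICT_SIZE` is
    /// reached or 10 rounds have run, returning the total amount evicted as
    /// reported by `evict_from_buffer`.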
    pub(crate) fn evict_from_circular_buffer(&self) -> Result<usize, TreeError> {
        const TARGET_EVICT_SIZE: usize = 1024;
        let mut evicted = 0;

        let mut retry_cnt = 0;

        while evicted < TARGET_EVICT_SIZE && retry_cnt < 10 {
            let n = self
                .storage
                .evict_from_buffer(|mini_page_handle: &TombstoneHandle| {
                    eviction_callback(mini_page_handle, self)
                })?;
            evicted += n as usize;
            retry_cnt += 1;
        }
        info!("stopped evicting from the circular buffer");
        Ok(evicted)
    }
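    /// Inserts `key`/`value`, retrying internally on restarts, lock conflicts,
    /// and buffer-full conditions; oversized keys or records are rejected up
    /// front. A sketch of checking the result (hedged: it assumes
    /// `LeafInsertResult` is re-exported at the crate root as `bf_tree`):
    ///
    /// ```ignore
    /// match tree.insert(b"k", b"v") {
    ///     bf_tree::LeafInsertResult::Success => {}
    ///     bf_tree::LeafInsertResult::InvalidKV(msg) => eprintln!("rejected: {msg}"),
    /// }
    /// ```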
    pub fn insert(&self, key: &[u8], value: &[u8]) -> LeafInsertResult {
        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
            return LeafInsertResult::InvalidKV(format!("Key too large: {}", key.len()));
        }

        if value.len() > MAX_VALUE_LEN || key.len() + value.len() > self.config.cb_max_record_size {
            return LeafInsertResult::InvalidKV(format!(
                "Record too large: key {}, value {}",
                key.len(),
                value.len()
            ));
        }

        let backoff = Backoff::new();
        let mut aggressive_split = false;

        counter!(Insert);
        info!(key_len = key.len(), value_len = value.len(), "insert");

        loop {
            let result = self.write_inner(WriteOp::make_insert(key, value), aggressive_split);
            match result {
                Ok(_) => return LeafInsertResult::Success,
                Err(TreeError::NeedRestart) => {
                    #[cfg(all(feature = "shuttle", test))]
                    {
                        shuttle::thread::yield_now();
                    }
                    counter!(InsertNeedRestart);
                    aggressive_split = true;
                }
                Err(TreeError::CircularBufferFull) => {
                    info!("insert failed, evicting from the circular buffer");
                    aggressive_split = true;
                    counter!(InsertCircularBufferFull);
                    _ = self.evict_from_circular_buffer();
                    continue;
                }
                Err(TreeError::Locked) => {
                    counter!(InsertLocked);
                    backoff.snooze();
                }
            }
        }
    }
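    /// Looks up `key` and copies the value into `out_buffer`. On a hit,
    /// `LeafReadResult::Found(n)` reports the number of bytes written (this is
    /// how `read_inner` slices the buffer for read promotion). A sketch
    /// (hedged: assumes `LeafReadResult` is re-exported at the crate root):
    ///
    /// ```ignore
    /// let mut buf = vec![0u8; 256];
    /// if let bf_tree::LeafReadResult::Found(n) = tree.read(b"hello", &mut buf) {
    ///     assert_eq!(&buf[..n as usize], b"world");
    /// }
    /// ```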
    pub fn read(&self, key: &[u8], out_buffer: &mut [u8]) -> LeafReadResult {
        if key.len() > self.config.max_fence_len / 2 || key.len() > MAX_KEY_LEN {
            return LeafReadResult::InvalidKey;
        }

        let backoff = Backoff::new();

        info!(key_len = key.len(), "read");
        counter!(Read);
        let mut aggressive_split = false;

        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
        let mut debug_timer = DebugTimerGuard::new(Timer::Read, self.metrics_recorder.clone());

        loop {
            let result = self.read_inner(key, out_buffer, aggressive_split);
            match result {
                Ok(v) => {
                    #[cfg(any(
                        feature = "metrics-rt-debug-all",
                        feature = "metrics-rt-debug-timer"
                    ))]
                    debug_timer.end();

                    return v;
                }
                Err(TreeError::CircularBufferFull) => {
                    info!("read promotion failed, evicting from the circular buffer");
                    aggressive_split = true;
                    _ = self.evict_from_circular_buffer();
                    continue;
                }
                Err(_) => {
                    backoff.spin();
                    aggressive_split = true;
                }
            }
        }
    }
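    /// Deletes `key`, retrying like `insert` does. Oversized keys are ignored:
    /// they can never have been inserted, so there is nothing to delete.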
    pub fn delete(&self, key: &[u8]) {
        if key.len() > self.config.max_fence_len / 2 {
            return;
        }

        let backoff = Backoff::new();

        info!(key_len = key.len(), "delete");

        let mut aggressive_split = false;

        loop {
            let result = self.write_inner(WriteOp::make_delete(key), aggressive_split);
            match result {
                Ok(_) => return,
                Err(TreeError::CircularBufferFull) => {
                    info!("delete failed, evicting from the circular buffer");
                    aggressive_split = true;
                    _ = self.evict_from_circular_buffer();
                    continue;
                }
                Err(_) => {
                    aggressive_split = true;
                    backoff.spin();
                }
            }
        }
    }
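    /// Returns an iterator over at most `cnt` records starting from `key`
    /// (see `ScanIter` for the exact item type and ordering guarantees).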
    pub fn scan<'a>(&'a self, key: &'a [u8], cnt: usize) -> ScanIter<'a, 'a> {
        ScanIter::new(self, key, cnt)
    }

    #[doc(hidden)]
    pub fn scan_mut<'a>(&'a self, key: &'a [u8], cnt: usize) -> ScanIterMut<'a, 'a> {
        ScanIterMut::new(self, key, cnt)
    }
    fn read_inner(
        &self,
        key: &[u8],
        out_buffer: &mut [u8],
        aggressive_split: bool,
    ) -> Result<LeafReadResult, TreeError> {
        let (node, parent) = self.traverse_to_leaf(key, aggressive_split)?;

        let mut leaf = self.mapping_table().get(&node);

        check_parent!(self, node, parent);

        let out = leaf.read(
            key,
            out_buffer,
            self.config.mini_page_binary_search,
            self.cache_only,
        );
        match out {
            ReadResult::Mini(r) | ReadResult::Full(r) => {
                if leaf.cache_page_about_to_evict(&self.storage) {
                    let mut x_leaf = match leaf.try_upgrade() {
                        Ok(v) => v,
                        Err(_) => return Ok(r),
                    };
                    _ = x_leaf.move_cache_page_to_tail(&self.storage);
                }

                Ok(r)
            }

            ReadResult::Base(r) => {
                counter!(BasePageRead);

                if self.cache_only {
                    panic!("Attempt to read a base page while in cache-only mode.");
                }

                let v = match r {
                    LeafReadResult::Found(v) => v,
                    _ => return Ok(r),
                };

                if parent.is_none() || !self.should_promote_read() {
                    return Ok(r);
                }

                let mut x_leaf = match leaf.try_upgrade() {
                    Ok(x) => x,
                    Err(_) => {
                        return Ok(r);
                    }
                };

                if self.config.read_record_cache {
                    let out = x_leaf.insert(
                        key,
                        &out_buffer[0..v as usize],
                        parent,
                        OpType::Cache,
                        &self.storage,
                        &self.write_load_full_page,
                        &self.cache_only,
                        &self.mini_page_size_classes,
                    );

                    match out {
                        Ok(_) => {
                            counter!(ReadPromotionOk);
                            Ok(r)
                        }
                        Err(TreeError::Locked) => {
                            counter!(ReadPromotionFailed);
                            Ok(r)
                        }
                        Err(TreeError::CircularBufferFull) => {
                            counter!(ReadPromotionFailed);
                            Err(TreeError::CircularBufferFull)
                        }
                        Err(TreeError::NeedRestart) => {
                            counter!(ReadPromotionFailed);
                            Err(TreeError::NeedRestart)
                        }
                    }
                } else {
                    match self.upgrade_to_full_page(x_leaf, parent.unwrap()) {
                        Ok(_) | Err(TreeError::Locked) => Ok(r),
                        Err(e) => Err(e),
                    }
                }
            }
            ReadResult::None => Ok(LeafReadResult::NotFound),
        }
    }
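    /// Promotes the leaf behind `x_leaf` to a full in-buffer page: a mini page
    /// is first merged back into its base page, a base page is loaded as-is,
    /// and an existing full page is returned unchanged.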
    fn upgrade_to_full_page<'a>(
        &'a self,
        mut x_leaf: LeafEntryXLocked<'a>,
        parent: ReadGuard<'a>,
    ) -> Result<LeafEntryXLocked<'a>, TreeError> {
        let page_loc = x_leaf.get_page_location().clone();
        match page_loc {
            PageLocation::Mini(ptr) => {
                let mini_page = x_leaf.load_cache_page_mut(ptr);
                let h = self.storage.begin_dealloc_mini_page(mini_page)?;
                let _merge_result = x_leaf.try_merge_mini_page(&h, parent, &self.storage)?;
                let base_offset = mini_page.next_level;
                x_leaf.change_to_base_loc();
                self.storage.finish_dealloc_mini_page(h);

                let base_page_ref = x_leaf.load_base_page_from_buffer();
                let full_page_loc =
                    upgrade_to_full_page(&self.storage, base_page_ref, base_offset)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Full(_ptr) => Ok(x_leaf),
            PageLocation::Base(offset) => {
                let base_page_ref = x_leaf.load_base_page(offset);
                let next_level = MiniPageNextLevel::new(offset);
                let full_page_loc = upgrade_to_full_page(&self.storage, base_page_ref, next_level)?;
                x_leaf.create_cache_page_loc(full_page_loc);
                Ok(x_leaf)
            }
            PageLocation::Null => panic!("upgrade_to_full_page on Null page"),
        }
    }
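    /// Drains the per-thread timer recorders into one JSON report and installs
    /// a fresh recorder set. Returns `None` unless a `metrics-rt-debug-*`
    /// feature is enabled. Takes `&mut self` because the recorders are
    /// unwrapped from their `Arc`, which requires exclusive access.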
    pub fn get_metrics(&mut self) -> Option<serde_json::Value> {
        #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
        {
            let recorder = self.metrics_recorder.take();
            match recorder {
                Some(r) => {
                    let recorders = Arc::try_unwrap(r).expect(
                        "Failed to obtain the recorders of bf-tree; make sure no other references exist.",
                    );
                    let mut timer_accumulated = TimerRecorder::default();

                    for r in recorders {
                        let t = unsafe { &*r.get() };

                        timer_accumulated += t.timers.clone();
                    }

                    let output = serde_json::json!({
                        "Timers": timer_accumulated,
                    });

                    self.metrics_recorder = Some(Arc::new(ThreadLocal::new()));

                    Some(output)
                }
                None => None,
            }
        }
        #[cfg(not(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer")))]
        {
            None
        }
    }
}
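/// Physical bytes a record occupies inside a leaf page: key + value + the
/// per-record `LeafKVMeta` header.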
pub(crate) fn key_value_physical_size(key: &[u8], value: &[u8]) -> usize {
    let key_size = key.len();
    let value_size = value.len();
    let meta_size = crate::nodes::KV_META_SIZE;
    key_size + value_size + meta_size
}
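/// Called by the circular buffer when it wants to reclaim `mini_page_handle`:
/// re-traverses to the owning leaf, verifies the mapping still points at this
/// page, and merges it down (to the base page, or to Null in cache-only mode).
/// Returns `NeedRestart` when the tree has moved underneath the eviction.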
pub(crate) fn eviction_callback(
    mini_page_handle: &TombstoneHandle,
    tree: &BfTree,
) -> Result<(), TreeError> {
    let mini_page = mini_page_handle.ptr as *mut LeafNode;
    let key_to_this_page = if tree.cache_only {
        unsafe { &*mini_page }.try_get_key_to_reach_this_node()?
    } else {
        unsafe { &*mini_page }.get_key_to_reach_this_node()
    };

    let (pid, parent) = tree.traverse_to_leaf(&key_to_this_page, true)?;
    info!(
        pid = pid.raw(),
        "starting to merge mini page in eviction callback"
    );

    let mut leaf_entry = tree.mapping_table().get_mut(&pid);

    histogram!(EvictNodeSize, unsafe { &*mini_page }.meta.node_size as u64);

    match leaf_entry.get_page_location() {
        PageLocation::Mini(ptr) => {
            if *ptr != mini_page {
                return Err(TreeError::NeedRestart);
            }

            let parent = parent.expect("Mini page must have a parent");
            parent.check_version()?;

            if tree.cache_only {
                leaf_entry.change_to_null_loc();
            } else {
                leaf_entry.try_merge_mini_page(mini_page_handle, parent, &tree.storage)?;
                leaf_entry.change_to_base_loc();
            }

            Ok(())
        }

        PageLocation::Full(ptr) => {
            if *ptr != mini_page {
                return Err(TreeError::NeedRestart);
            }

            leaf_entry.merge_full_page(mini_page_handle);
            Ok(())
        }

        PageLocation::Base(_offset) => Err(TreeError::NeedRestart),

        PageLocation::Null => Err(TreeError::NeedRestart),
    }
}
#[cfg(test)]
mod tests {
    use crate::BfTree;

    #[test]
    fn test_mini_page_size_classes() {
        let mut size_classes = BfTree::create_mem_page_size_classes(48, 1952, 4096, 64, false);
        assert_eq!(
            size_classes,
            vec![128, 192, 256, 512, 960, 1856, 2048, 4096]
        );

        size_classes = BfTree::create_mem_page_size_classes(1548, 1548, 3136, 64, true);
        assert_eq!(size_classes, vec![1536, 3136]);

        size_classes = BfTree::create_mem_page_size_classes(48, 3072, 12288, 64, false);
        assert_eq!(
            size_classes,
            vec![128, 192, 256, 512, 960, 1856, 3648, 7232, 9088, 12288]
        );
    }
}