1use std::cell::UnsafeCell;
5use std::collections::{HashMap, VecDeque};
6use std::mem::{ManuallyDrop, MaybeUninit};
7use std::ops::{Deref, DerefMut};
8use std::panic;
9use std::path::Path;
10use std::path::PathBuf;
11use std::sync::Arc;
12
13#[cfg(not(all(feature = "shuttle", test)))]
14use rand::Rng;
15#[cfg(all(feature = "shuttle", test))]
16use shuttle::rand::Rng;
17
18#[cfg(unix)]
19use std::os::unix::fs::FileExt;
20#[cfg(windows)]
21use std::os::windows::fs::FileExt;
22use std::sync::atomic::AtomicUsize;
23
24#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
25use thread_local::ThreadLocal;
26
27use crate::{
28 circular_buffer::CircularBuffer,
29 error::ConfigError,
30 fs::VfsImpl,
31 mini_page_op::LeafOperations,
32 nodes::{leaf_node::MiniPageNextLevel, LeafNode, INVALID_DISK_OFFSET},
33 nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
34 storage::{make_vfs, LeafStorage, PageLocation, PageTable},
35 sync::atomic::{AtomicBool, AtomicU64, Ordering},
36 utils::{get_rng, inner_lock::ReadGuard, BfsVisitor, NodeInfo},
37 wal::{LogEntry, LogEntryImpl, WriteAheadLog},
38 BfTree, Config, StorageBackend, WalConfig, WalReader,
39};
40
/// Marker bytes stored in the `magic_begin` field of the snapshot metadata.
const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";

/// Marker bytes stored in the `magic_end` field of the snapshot metadata.
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";

/// Offset inside the snapshot file at which the metadata record is written.
const META_DATA_PAGE_OFFSET: usize = 0;
44
45const INVALID_SNAPSHOT_THREAD_ID: usize = usize::MAX; const NULL_PAGE_LOCATION_OFFSET: usize = usize::MAX; const INVALID_SNAPSHOT_STATE: u64 = u64::MAX; pub const INVALID_SNAPSHOT_VERSION: u64 = u64::MAX; const DEFAULT_MAX_SNAPSHOT_THREAD_NUM: usize = 256; const SNAPSHOT_STATE_PHASE_ID_SHIFT: usize = 61; const SNAPSHOT_STATE_PHASE_NUM: u64 = 4; const SNAPSHOT_STATE_PHASE_ID_MASK: u64 = 0b111 << SNAPSHOT_STATE_PHASE_ID_SHIFT; const SNAPSHOT_STATE_VERSION_MASK: u64 = (1 << SNAPSHOT_STATE_PHASE_ID_SHIFT) - 1; pub struct CPRSnapShotMgr {
65 global_state: AtomicU64,
69 thread_slots: [AtomicBool; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
71 thread_local_states: [AtomicU64; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
73 thread_local_inner_mappings:
76 UnsafeCell<[Vec<(*const InnerNode, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
77 thread_local_base_mappings: UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
78 thread_local_mini_mappings: UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
79 thread_local_mini_size_mappings:
80 UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
81 root_id: AtomicU64, pause_snapshot: AtomicBool,
83 vfs: Arc<dyn VfsImpl>,
85 snapshot_in_progress: AtomicBool,
87}
88
89unsafe impl Sync for CPRSnapShotMgr {}
90
91unsafe impl Send for CPRSnapShotMgr {}
92
93pub struct CPRSnapshotGuard {
95 snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
96 thread_slot_id: usize,
97}
98
99impl CPRSnapshotGuard {
100 pub fn new(snapshot_mgr: Option<Arc<CPRSnapShotMgr>>) -> Result<Self, ()> {
101 match snapshot_mgr {
102 None => Ok(Self {
103 snapshot_mgr: None,
104 thread_slot_id: INVALID_SNAPSHOT_THREAD_ID,
105 }),
106 Some(ref mgr) => {
107 let thread_slot_id = mgr
108 .reserve_thread_slot()
109 .unwrap_or(INVALID_SNAPSHOT_THREAD_ID);
110 if thread_slot_id == INVALID_SNAPSHOT_THREAD_ID {
111 return Err(());
112 }
113
114 assert_eq!(
115 thread_slot_id,
116 CPRSnapShotMgr::get_snapshot_thread_id().unwrap()
117 );
118 assert_eq!(
119 CPRSnapShotMgr::get_snapshot_thread_version(),
120 mgr.get_local_version(&thread_slot_id)
121 );
122
123 Ok(Self {
124 snapshot_mgr: Some(mgr.clone()),
125 thread_slot_id,
126 })
127 }
128 }
129 }
130
131 pub fn is_protected(&self) -> bool {
133 self.thread_slot_id != INVALID_SNAPSHOT_THREAD_ID
134 }
135
136 pub fn get_local_phase_id(&self) -> u64 {
137 self.snapshot_mgr
138 .as_ref()
139 .unwrap()
140 .get_local_phase_id(&self.thread_slot_id)
141 }
142
143 pub fn snapshot_base_page(&self, id: PageID, ptr: &[u8], size: usize) {
144 self.snapshot_mgr
145 .as_ref()
146 .unwrap()
147 .snapshot_base_page(id, ptr, size);
148 }
149
150 pub fn snapshot_mini_page(&self, id: PageID, ptr: &[u8], size: usize) {
151 self.snapshot_mgr
152 .as_ref()
153 .unwrap()
154 .snapshot_mini_page(id, ptr, size);
155 }
156
157 pub fn snapshot_inner_node(&self, ptr: *const InnerNode) {
158 self.snapshot_mgr.as_ref().unwrap().snapshot_inner_node(ptr);
159 }
160
161 pub fn snapshot_root_page(&self, root_id: PageID) {
162 self.snapshot_mgr
163 .as_ref()
164 .unwrap()
165 .snapshot_root_page(root_id);
166 }
167}
168
169impl Drop for CPRSnapshotGuard {
170 fn drop(&mut self) {
171 if let Some(ref mgr) = self.snapshot_mgr {
172 mgr.release_thread_slot(self.thread_slot_id);
173 }
174 }
175}
176
177impl CPRSnapShotMgr {
178 thread_local! {
183 static SNAPSHOT_THREAD_ID: AtomicUsize = const { AtomicUsize::new(INVALID_SNAPSHOT_THREAD_ID) };
184 static SNAPSHOT_THREAD_VERSION: AtomicU64 = const { AtomicU64::new(INVALID_SNAPSHOT_VERSION) };
187 }
188
189 pub fn are_all_threads_in_next_version(&self) -> bool {
190 if !self.snapshot_in_progress.load(Ordering::Acquire) {
191 false
192 } else {
193 let global_phase_id = self.get_global_phase_id();
194 global_phase_id == 3 || global_phase_id == 0
195 }
196 }
197
198 pub fn get_snapshot_thread_id() -> Result<usize, ()> {
199 let tid = Self::SNAPSHOT_THREAD_ID.with(|id| id.load(Ordering::Relaxed));
200 if tid == INVALID_SNAPSHOT_THREAD_ID {
201 Err(())
202 } else {
203 Ok(tid)
204 }
205 }
206
207 pub fn set_snapshot_thread_tls(tid: usize, version: u64) {
208 Self::SNAPSHOT_THREAD_ID.with(|id| id.store(tid, Ordering::Relaxed));
209 Self::SNAPSHOT_THREAD_VERSION.with(|ver| ver.store(version, Ordering::Relaxed));
210 }
211
212 pub fn get_snapshot_thread_version() -> u64 {
213 Self::SNAPSHOT_THREAD_VERSION.with(|ver| ver.load(Ordering::Relaxed))
214 }
215
216 pub fn new(storage_backend: &StorageBackend, version: u64, file_path: &PathBuf) -> Self {
218 let vfs: Arc<dyn VfsImpl> = make_vfs(storage_backend, file_path);
219 Self {
220 global_state: AtomicU64::new(Self::new_snapshot_state(0, version)), thread_slots: [const { AtomicBool::new(false) }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
222 thread_local_states: [const { AtomicU64::new(INVALID_SNAPSHOT_STATE) };
223 DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
224 thread_local_inner_mappings: UnsafeCell::new(
225 [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
226 ),
227 thread_local_base_mappings: UnsafeCell::new(
228 [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
229 ),
230 thread_local_mini_mappings: UnsafeCell::new(
231 [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
232 ),
233 thread_local_mini_size_mappings: UnsafeCell::new(
234 [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
235 ),
236 root_id: AtomicU64::new(0),
237 pause_snapshot: AtomicBool::new(false),
238 vfs,
239 snapshot_in_progress: AtomicBool::new(false),
240 }
241 }
242
243 fn reset(&self) {
245 let local_inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
246 let local_mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
247 let local_base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
248 let local_mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
249 for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
251 local_inner_mappings[thread_slot_id].clear();
252 local_mini_mappings[thread_slot_id].clear();
253 local_base_mappings[thread_slot_id].clear();
254 local_mini_size_mappings[thread_slot_id].clear();
255 }
256 }
257
258 pub fn new_snapshot_state(phase_id: u64, version: u64) -> u64 {
259 assert!(
260 phase_id < SNAPSHOT_STATE_PHASE_NUM,
261 "Phase id must be less than {}",
262 SNAPSHOT_STATE_PHASE_NUM
263 );
264 assert!(
265 version < (1 << SNAPSHOT_STATE_PHASE_ID_SHIFT),
266 "Version must be less than 2^61"
267 );
268 (phase_id << SNAPSHOT_STATE_PHASE_ID_SHIFT) | version
269 }
270
271 fn get_global_version(&self) -> u64 {
272 self.global_state.load(Ordering::Acquire) & SNAPSHOT_STATE_VERSION_MASK
273 }
274
275 fn get_global_phase_id(&self) -> u64 {
276 (self.global_state.load(Ordering::Acquire) & SNAPSHOT_STATE_PHASE_ID_MASK)
277 >> SNAPSHOT_STATE_PHASE_ID_SHIFT
278 }
279
280 fn get_local_state(&self, thread_slot_id: &usize) -> u64 {
282 self.thread_local_states[*thread_slot_id].load(Ordering::Acquire)
283 }
284
285 fn set_local_state(&self, thread_slot_id: &usize, state: u64) {
286 let current_state = self.get_local_state(thread_slot_id);
288 if current_state == state {
289 return;
290 }
291
292 self.thread_local_states[*thread_slot_id].store(state, Ordering::Release);
293 }
294
295 pub fn get_local_version(&self, thread_slot_id: &usize) -> u64 {
297 let state = self.get_local_state(thread_slot_id);
298 state & SNAPSHOT_STATE_VERSION_MASK
299 }
300
301 pub fn get_local_phase_id(&self, thread_slot_id: &usize) -> u64 {
303 let state = self.get_local_state(thread_slot_id);
304 (state & SNAPSHOT_STATE_PHASE_ID_MASK) >> SNAPSHOT_STATE_PHASE_ID_SHIFT
305 }
306
307 fn advance_global_state(&self) -> u64 {
309 let phase_id = self.get_global_phase_id();
310 let version = self.get_global_version();
311
312 match phase_id {
313 0 => {
314 let new_state = Self::new_snapshot_state(1, version);
316 self.global_state.store(new_state, Ordering::Release);
317 new_state
318 }
319 1 => {
320 let new_state = Self::new_snapshot_state(2, version + 1);
322 self.global_state.store(new_state, Ordering::Release);
323 new_state
324 }
325 2 => {
326 let new_state = Self::new_snapshot_state(3, version);
328 self.global_state.store(new_state, Ordering::Release);
329 new_state
330 }
331 3 => {
332 let new_state = Self::new_snapshot_state(0, version);
334 self.global_state.store(new_state, Ordering::Release);
335 new_state
336 }
337 _ => {
338 panic!("Invalid global phase id: {}", phase_id);
339 }
340 }
341 }
342
343 fn check_if_phase_completed(&self, target_state: u64) -> bool {
346 for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
348 let local_state = self.thread_local_states[thread_slot_id].load(Ordering::Acquire);
349 if local_state != INVALID_SNAPSHOT_STATE && local_state != target_state {
350 return false;
351 }
352 }
353 true
354 }
355
356 pub fn reserve_thread_slot(&self) -> Result<usize, ()> {
360 if self.pause_snapshot.load(Ordering::Acquire) {
361 return Err(());
362 }
363
364 let start = get_rng().random_range(0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM);
365 let end = 2 * DEFAULT_MAX_SNAPSHOT_THREAD_NUM;
366
367 for i in start..end {
368 let tid = i % DEFAULT_MAX_SNAPSHOT_THREAD_NUM;
369 if self.thread_slots[tid]
370 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
371 .is_ok()
372 {
373 let global_state = self.global_state.load(Ordering::Acquire);
375 self.set_local_state(&tid, global_state);
376
377 if self.get_local_state(&tid) != self.global_state.load(Ordering::Acquire) {
386 self.set_local_state(&tid, INVALID_SNAPSHOT_STATE);
387 assert!(self.thread_slots[tid]
388 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
389 .is_ok());
390 return Err(());
391 } else {
392 assert!(Self::get_snapshot_thread_id().is_err());
394
395 Self::set_snapshot_thread_tls(tid, global_state & SNAPSHOT_STATE_VERSION_MASK);
397 return Ok(tid);
398 }
399 }
400 }
401 Err(())
402 }
403
404 pub fn release_thread_slot(&self, thread_slot_id: usize) {
407 self.set_local_state(&thread_slot_id, INVALID_SNAPSHOT_STATE);
408 self.thread_slots[thread_slot_id].store(false, Ordering::Release);
409
410 Self::set_snapshot_thread_tls(INVALID_SNAPSHOT_THREAD_ID, INVALID_SNAPSHOT_VERSION);
411 }
412
413 pub fn get_snapshot_guard(
414 snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
415 ) -> Result<CPRSnapshotGuard, ()> {
416 CPRSnapshotGuard::new(snapshot_mgr)
417 }
418
419 pub fn snapshot_page(&self, ptr: &[u8], size: usize) -> usize {
425 let offset = self.vfs.alloc_offset(size);
427
428 self.vfs.write(offset, ptr);
430
431 offset
433 }
434
435 pub fn snapshot_inner_node(&self, ptr: *const InnerNode) {
436 let offset = unsafe { self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE) };
437 let inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
438
439 let tid = Self::get_snapshot_thread_id()
440 .expect("Snapshot of page triggered by unregistered thread.");
441 inner_mappings[tid].push((ptr, offset));
442 }
443
444 pub fn snapshot_mini_page(&self, id: PageID, ptr: &[u8], size: usize) {
445 let offset = if size != 0 {
446 self.snapshot_page(ptr, size)
447 } else {
448 NULL_PAGE_LOCATION_OFFSET
450 };
451
452 let mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
453 let tid = Self::get_snapshot_thread_id()
454 .expect("Snapshot of page triggered by unregistered thread.");
455 mini_mappings[tid].push((id, offset));
456
457 let mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
458 mini_size_mappings[tid].push((id, size));
459 }
460
461 pub fn snapshot_base_page(&self, id: PageID, ptr: &[u8], size: usize) {
462 let offset = self.snapshot_page(ptr, size);
463
464 let base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
465 let tid = Self::get_snapshot_thread_id()
466 .expect("Snapshot of page triggered by unregistereds thread.");
467 base_mappings[tid].push((id, offset));
468 }
469
470 pub fn snapshot_root_page(&self, root_id: PageID) {
471 self.root_id.store(root_id.raw(), Ordering::Release);
472 }
473
474 fn sweep(
477 &self,
478 tree: &BfTree,
479 version: u64,
480 inner_mapping: &mut Vec<(*const InnerNode, usize)>,
481 mini_mapping: &mut Vec<(PageID, usize)>,
482 mini_size_mapping: &mut Vec<(PageID, usize)>,
483 base_mapping: &mut Vec<(PageID, usize)>,
484 ) -> usize {
485 self.pause_snapshot.store(true, Ordering::Release);
496 loop {
497 if self.check_if_phase_completed(INVALID_SNAPSHOT_STATE) {
498 loop {
501 let root_id = tree.get_root_page();
502 let rid = root_id.0;
503 if root_id.1 {
504 let mut leaf = tree.mapping_table().get(&rid);
506 let page_loc = leaf.get_page_location();
507
508 match page_loc {
509 PageLocation::Base(offset) => {
510 let base_ref = leaf.load_base_page(*offset);
511 if base_ref.get_snapshot_version() < version {
512 let base_ptr = unsafe {
513 std::slice::from_raw_parts(
514 base_ref as *const LeafNode as *const u8,
515 base_ref.meta.node_size as usize,
516 )
517 };
518 let offset = self
519 .snapshot_page(base_ptr, base_ref.meta.node_size as usize);
520 base_mapping.push((rid, offset));
521 self.snapshot_root_page(rid);
522 }
523 }
524 PageLocation::Mini(ptr) => {
525 assert!(tree.cache_only);
527
528 let mini_ref = leaf.load_cache_page(*ptr);
529 if mini_ref.get_snapshot_version() < version {
530 let mini_ptr = unsafe {
531 std::slice::from_raw_parts(
532 mini_ref as *const LeafNode as *const u8,
533 mini_ref.meta.node_size as usize,
534 )
535 };
536 let offset = self
537 .snapshot_page(mini_ptr, mini_ref.meta.node_size as usize);
538 mini_mapping.push((rid, offset));
539 mini_size_mapping.push((rid, mini_ref.meta.node_size as usize));
540 self.snapshot_root_page(rid);
541 }
542 }
543 _ => {
544 panic!("Unexpected page location for root page: {:?}", page_loc);
545 }
546 }
547
548 break;
549 } else {
550 let ptr = rid.as_inner_node();
552 let inner = match ReadGuard::try_read(ptr) {
554 Ok(inner) => inner,
555 Err(_) => continue,
556 };
557
558 if inner.as_ref().get_snapshot_version() < version {
559 let offset =
560 unsafe { self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE) };
561 inner_mapping.push((ptr, offset));
562 self.snapshot_root_page(rid);
563 }
564 break;
565 }
566 }
567
568 let visitor = BfsVisitor::new_inner_only(tree);
570 for node in visitor {
571 loop {
572 match node {
573 NodeInfo::Inner { ptr, .. } => {
574 let inner = match ReadGuard::try_read(ptr) {
576 Ok(inner) => inner,
577 Err(_) => continue,
578 };
579
580 if inner.as_ref().get_snapshot_version() < version {
581 let offset = unsafe {
582 self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE)
583 };
584 inner_mapping.push((ptr, offset));
585 }
586
587 break;
588 }
589 NodeInfo::Leaf { level, .. } => {
590 assert_eq!(level, 0);
592 break;
593 }
594 }
595 }
596 }
597
598 break;
599 }
600 std::thread::sleep(std::time::Duration::from_secs(1));
602 }
603 self.pause_snapshot.store(false, Ordering::Release);
605
606 let page_table_iter = tree.storage.page_table.iter();
609 let mut enumerate_leaf_count = 0;
610
611 for (_, pid) in page_table_iter {
612 assert!(pid.is_id());
613
614 let mut leaf = tree.mapping_table().get(&pid);
616 let page_loc = leaf.get_page_location();
617 enumerate_leaf_count += 1;
618
619 match page_loc {
620 PageLocation::Base(offset) => {
621 let base_ref = leaf.load_base_page(*offset);
622 if base_ref.get_snapshot_version() < version {
623 let base_ptr = unsafe {
624 std::slice::from_raw_parts(
625 base_ref as *const LeafNode as *const u8,
626 base_ref.meta.node_size as usize,
627 )
628 };
629 let offset = self.snapshot_page(base_ptr, base_ref.meta.node_size as usize);
630 base_mapping.push((pid, offset));
631 }
632 }
633 PageLocation::Full(ptr) => {
634 let full_ref = leaf.load_cache_page(*ptr);
636 if full_ref.get_snapshot_version() < version {
637 let full_ptr = unsafe {
638 std::slice::from_raw_parts(
639 full_ref as *const LeafNode as *const u8,
640 full_ref.meta.node_size as usize,
641 )
642 };
643 let offset = self.snapshot_page(full_ptr, full_ref.meta.node_size as usize);
644 base_mapping.push((pid, offset));
645 }
646 }
647 PageLocation::Mini(ptr) => {
648 let mini_ref = leaf.load_cache_page(*ptr);
649 if mini_ref.get_snapshot_version() < version {
650 let mini_ptr = unsafe {
651 std::slice::from_raw_parts(
652 mini_ref as *const LeafNode as *const u8,
653 mini_ref.meta.node_size as usize,
654 )
655 };
656 let offset = self.snapshot_page(mini_ptr, mini_ref.meta.node_size as usize);
657 mini_mapping.push((pid, offset));
658 mini_size_mapping.push((pid, mini_ref.meta.node_size as usize));
659
660 if !tree.cache_only {
661 let base_ref = leaf.load_base_page(mini_ref.next_level.as_offset());
663 assert!(base_ref.get_snapshot_version() < version); let base_ptr = unsafe {
666 std::slice::from_raw_parts(
667 base_ref as *const LeafNode as *const u8,
668 base_ref.meta.node_size as usize,
669 )
670 };
671 let offset =
672 self.snapshot_page(base_ptr, base_ref.meta.node_size as usize);
673 base_mapping.push((pid, offset));
674 }
675 }
676 }
677 PageLocation::Null => {
678 assert!(tree.cache_only);
679 mini_mapping.push((pid, NULL_PAGE_LOCATION_OFFSET)); mini_size_mapping.push((pid, 0));
686 }
687 }
688 }
689
690 enumerate_leaf_count
691 }
692
693 #[allow(clippy::too_many_arguments)]
695 fn finalize(
696 &self,
697 snapshot_version: u64,
698 inner_mapping: &mut [(*const InnerNode, usize)],
699 mini_mapping: &mut [(PageID, usize)],
700 mini_size_mapping: &mut [(PageID, usize)],
701 base_mapping: &mut [(PageID, usize)],
702 leaf_count_upper: usize,
703 config: Arc<Config>,
704 ) {
705 let mut inner_mapping_unique: HashMap<*const InnerNode, usize> = HashMap::new();
707 let mut mini_mapping_unique: HashMap<PageID, usize> = HashMap::new();
708 let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
709 let mut base_mapping_unique: HashMap<PageID, usize> = HashMap::new();
710
711 let local_inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
712 let local_mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
713 let local_mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
714 let local_base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
715
716 for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
718 for m in local_inner_mappings[thread_slot_id].iter() {
719 inner_mapping_unique.entry(m.0).or_insert(m.1);
720 }
721 for m in local_mini_mappings[thread_slot_id].iter() {
722 mini_mapping_unique.entry(m.0).or_insert(m.1);
723 }
724 for m in local_mini_size_mappings[thread_slot_id].iter() {
725 if let std::collections::hash_map::Entry::Vacant(e) =
726 mini_size_mapping_unique.entry(m.0)
727 {
728 e.insert(m.1);
729 } else {
730 if !config.cache_only {
735 assert!(m.1 == mini_size_mapping_unique.get(&m.0).copied().unwrap());
736 }
737 }
738 }
739
740 for m in local_base_mappings[thread_slot_id].iter() {
741 base_mapping_unique.entry(m.0).or_insert(m.1);
742 }
743 }
744
745 for (k, v) in inner_mapping.iter() {
747 if !inner_mapping_unique.contains_key(k) {
748 inner_mapping_unique.insert(*k, *v);
749 }
750 }
751 for (k, v) in mini_mapping.iter() {
752 if !mini_mapping_unique.contains_key(k) {
753 mini_mapping_unique.insert(*k, *v);
754 }
755 }
756 for (k, v) in mini_size_mapping.iter() {
757 if !mini_size_mapping_unique.contains_key(k) {
758 mini_size_mapping_unique.insert(*k, *v);
759 } else {
760 if !config.cache_only {
761 assert!(*v == mini_size_mapping_unique.get(k).copied().unwrap());
762 }
763 }
764 }
765 for (k, v) in base_mapping.iter() {
766 if !base_mapping_unique.contains_key(k) {
767 base_mapping_unique.insert(*k, *v);
768 }
769 }
770
771 assert!(mini_mapping_unique.len() == mini_size_mapping_unique.len());
773 for (k, _) in mini_mapping_unique.iter() {
774 assert!(mini_size_mapping_unique.contains_key(k));
775 }
776
777 if config.cache_only {
778 assert!(base_mapping_unique.is_empty());
779 } else {
780 assert!(mini_mapping_unique.len() <= base_mapping_unique.len());
781 }
782
783 let mut final_inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
785 for (k, v) in inner_mapping_unique.into_iter() {
786 final_inner_mapping.push((k, v));
787 }
788
789 let mut sorted_base_mapping_uninit: Vec<MaybeUninit<(PageID, usize)>> =
793 Vec::with_capacity(base_mapping_unique.len());
794 unsafe {
795 sorted_base_mapping_uninit.set_len(base_mapping_unique.len());
796 }
797 let mut sorted_base_mapping_init = vec![false; base_mapping_unique.len()];
798
799 for (k, v) in base_mapping_unique.iter() {
800 assert!(k.is_id());
801 let offset = k.as_id();
802 assert!((offset as usize) < sorted_base_mapping_uninit.len());
803 sorted_base_mapping_init[offset as usize] = true;
804 sorted_base_mapping_uninit[offset as usize].write((*k, *v));
805 }
806 let final_sorted_base_mapping: Vec<(PageID, usize)> = if !config.cache_only {
807 assert_eq!(
808 base_mapping_unique.len(),
809 sorted_base_mapping_init.iter().filter(|&&b| b).count()
810 );
811 unsafe {
812 std::mem::transmute::<
813 std::vec::Vec<std::mem::MaybeUninit<(PageID, usize)>>,
814 std::vec::Vec<(PageID, usize)>,
815 >(sorted_base_mapping_uninit)
816 }
817 } else {
818 Vec::new()
819 };
820
821 let mut final_mini_mapping: Vec<(PageID, usize)> = Vec::new();
823 for (k, v) in mini_mapping_unique.into_iter() {
824 final_mini_mapping.push((k, v));
825 }
826
827 let mut final_mini_size_mapping: Vec<(PageID, usize)> = Vec::new();
828 for (k, v) in mini_size_mapping_unique.into_iter() {
829 final_mini_size_mapping.push((k, v));
830 }
831
832 let leaf_page_num = if config.cache_only {
833 leaf_count_upper
841 } else {
842 assert!(leaf_count_upper >= final_sorted_base_mapping.len());
843 final_sorted_base_mapping.len()
844 };
845
846 let mut file_size = std::mem::size_of::<BfTreeMeta>() as u64;
847
848 let (inner_offset, inner_size) = serialize_vec_to_disk(&final_inner_mapping, &self.vfs);
850
851 if inner_offset != 0 {
852 file_size = (inner_offset + align_to_sector_size(inner_size)) as u64;
853 }
854
855 let (mini_offset, mini_size) = serialize_vec_to_disk(&final_mini_mapping, &self.vfs);
856
857 if mini_offset != 0 {
858 file_size = (mini_offset + align_to_sector_size(mini_size)) as u64;
859 }
860
861 let (mini_size_offset, mini_size_size) =
862 serialize_vec_to_disk(&final_mini_size_mapping, &self.vfs);
863
864 if mini_size_offset != 0 {
865 file_size = (mini_size_offset + align_to_sector_size(mini_size_size)) as u64;
866 }
867
868 let (base_offset, base_size) = serialize_vec_to_disk(&final_sorted_base_mapping, &self.vfs);
869
870 if base_offset != 0 {
871 file_size = (base_offset + align_to_sector_size(base_size)) as u64;
872 }
873
874 let metadata = BfTreeMeta {
876 magic_begin: *BF_TREE_MAGIC_BEGIN,
877 root_id: unsafe { PageID::from_raw(self.root_id.load(Ordering::Acquire)) },
878 inner_offset,
879 inner_size,
880 mini_offset,
881 mini_size,
882 mini_size_offset,
883 mini_size_size,
884 base_offset,
885 base_size,
886 file_size,
887 leaf_page_num,
888 snapshot_version,
889 cache_only: config.cache_only,
890 cb_size_byte: config.cb_size_byte,
891 read_promotion_rate: config.read_promotion_rate.load(Ordering::Relaxed),
892 scan_promotion_rate: config.scan_promotion_rate.load(Ordering::Relaxed),
893 cb_min_record_size: config.cb_min_record_size,
894 cb_max_record_size: config.cb_max_record_size,
895 leaf_page_size: config.leaf_page_size,
896 cb_max_key_len: config.cb_max_key_len,
897 max_fence_len: config.max_fence_len,
898 cb_copy_on_access_ratio: config.cb_copy_on_access_ratio,
899 read_record_cache: config.read_record_cache,
900 max_mini_page_size: config.max_mini_page_size,
901 mini_page_binary_search: config.mini_page_binary_search,
902 write_load_full_page: config.write_load_full_page,
903 magic_end: *BF_TREE_MAGIC_END,
904 };
905
906 self.vfs.write(META_DATA_PAGE_OFFSET, metadata.as_slice());
907 self.vfs.flush();
908
909 self.reset();
910 }
911
912 pub fn snapshot(&self, tree: &BfTree) {
914 if self
916 .snapshot_in_progress
917 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
918 .is_err()
919 {
920 return;
921 }
922
923 self.vfs.reset();
925
926 let mut sweep_inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
928 let mut sweep_mini_mapping: Vec<(PageID, usize)> = Vec::new();
929 let mut sweep_mini_size_mapping: Vec<(PageID, usize)> = Vec::new();
930 let mut sweep_base_mapping: Vec<(PageID, usize)> = Vec::new();
931
932 let mut current_global_phase_id = self.get_global_phase_id();
934 assert_eq!(current_global_phase_id, 0);
935
936 let snapshot_version = self.get_global_version();
938
939 let mut current_global_state = self.advance_global_state();
941 current_global_phase_id = self.get_global_phase_id();
942 assert_eq!(current_global_phase_id, 1);
943 assert_eq!(snapshot_version, self.get_global_version());
944
945 let mut leaf_node_count_upper_bound = 0; loop {
948 if self.check_if_phase_completed(current_global_state) {
949 match current_global_phase_id {
950 0 => {
952 self.finalize(
957 snapshot_version,
958 &mut sweep_inner_mapping,
959 &mut sweep_mini_mapping,
960 &mut sweep_mini_size_mapping,
961 &mut sweep_base_mapping,
962 leaf_node_count_upper_bound,
963 tree.config.clone(),
964 );
965
966 break;
968 }
969 1 => {
971 current_global_state = self.advance_global_state();
972 current_global_phase_id = self.get_global_phase_id();
973 assert_eq!(current_global_phase_id, 2);
974 assert_eq!(snapshot_version + 1, self.get_global_version());
975 }
976 2 => {
978 current_global_state = self.advance_global_state();
979 current_global_phase_id = self.get_global_phase_id();
980 assert_eq!(current_global_phase_id, 3);
981 assert_eq!(snapshot_version + 1, self.get_global_version());
982 }
983 3 => {
985 leaf_node_count_upper_bound = self.sweep(
993 tree,
994 snapshot_version + 1,
995 &mut sweep_inner_mapping,
996 &mut sweep_mini_mapping,
997 &mut sweep_mini_size_mapping,
998 &mut sweep_base_mapping,
999 );
1000
1001 current_global_state = self.advance_global_state();
1002 current_global_phase_id = self.get_global_phase_id();
1003 assert_eq!(current_global_phase_id, 0);
1004 assert_eq!(snapshot_version + 1, self.get_global_version());
1005 }
1006 _ => {
1007 panic!("Invalid global phase id: {}", current_global_phase_id);
1008 }
1009 }
1010 }
1011
1012 std::thread::sleep(std::time::Duration::from_secs(1));
1014 }
1015
1016 assert!(self
1017 .snapshot_in_progress
1018 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
1019 .is_ok());
1020 }
1021
1022 pub fn new_from_snapshot(
1036 recovery_snapshot_file_path: PathBuf, use_snapshot: bool,
1038 new_snapshot_file_path: Option<PathBuf>, buffer_ptr: Option<*mut u8>,
1040 buffer_size: Option<usize>, wal_config: Option<Arc<WalConfig>>,
1042 ) -> Result<BfTree, ConfigError> {
1043 if !recovery_snapshot_file_path.exists() {
1045 return Err(ConfigError::SnapshotFileInvalid(
1047 "Not found ".to_string()
1048 + &recovery_snapshot_file_path
1049 .clone()
1050 .into_os_string()
1051 .into_string()
1052 .unwrap(),
1053 ));
1054 }
1055
1056 let wal = wal_config.as_ref().map(|s| WriteAheadLog::new(s.clone()));
1058
1059 let reader = std::fs::File::open(recovery_snapshot_file_path.clone()).unwrap();
1061 let mut metadata = SectorAlignedVector::new_zeroed(DISK_PAGE_SIZE); #[cfg(unix)]
1063 {
1064 reader.read_at(&mut metadata, 0).unwrap();
1065 }
1066 #[cfg(windows)]
1067 {
1068 reader.seek_read(&mut metadata, 0).unwrap();
1069 }
1070
1071 let bf_meta = unsafe { (metadata.as_ptr() as *const BfTreeMeta).read() };
1072 bf_meta.check_magic();
1073 assert_eq!(reader.metadata().unwrap().len(), bf_meta.file_size);
1074
1075 let mut bf_tree_config = Config::new_from_snapshot(&bf_meta);
1076
1077 let recovery_snapshot_file_backend = StorageBackend::Std; if !bf_tree_config.cache_only {
1079 bf_tree_config.file_path(recovery_snapshot_file_path.clone());
1080 bf_tree_config.storage_backend = recovery_snapshot_file_backend.clone();
1082 } else {
1083 bf_tree_config.storage_backend = StorageBackend::Memory;
1084 }
1085 bf_tree_config.use_snapshot = use_snapshot;
1086
1087 bf_tree_config.snapshot_backend = StorageBackend::Std; bf_tree_config.snapshot_file_path = match new_snapshot_file_path {
1090 Some(path) => path,
1091 None => PathBuf::new(),
1092 };
1093
1094 let snapshot_mgr = if bf_tree_config.use_snapshot {
1095 Some(Arc::new(CPRSnapShotMgr::new(
1096 &bf_tree_config.snapshot_backend,
1097 bf_tree_config.snapshot_version,
1098 &bf_tree_config.snapshot_file_path,
1099 )))
1100 } else {
1101 None
1102 };
1103
1104 if let Some(size) = buffer_size {
1105 bf_tree_config.cb_size_byte = size
1106 }
1107
1108 let size_classes = BfTree::create_mem_page_size_classes(
1109 bf_tree_config.cb_min_record_size,
1110 bf_tree_config.cb_max_record_size,
1111 bf_tree_config.leaf_page_size,
1112 bf_tree_config.max_fence_len,
1113 bf_tree_config.cache_only,
1114 );
1115
1116 bf_tree_config.write_ahead_log = wal_config.clone();
1117 bf_tree_config.validate()?;
1118
1119 let config = Arc::new(bf_tree_config);
1120
1121 let recovery_snapshot_vfs = make_vfs(
1123 &recovery_snapshot_file_backend,
1124 recovery_snapshot_file_path.clone(),
1125 );
1126
1127 let mut root_page_id = bf_meta.root_id;
1129 let mut inner_node_page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);
1130 if root_page_id.is_inner_node_pointer() {
1131 let inner_mapping: Vec<(*const InnerNode, usize)> = read_vec_from_offset(
1132 bf_meta.inner_offset,
1133 bf_meta.inner_size,
1134 &recovery_snapshot_vfs,
1135 );
1136 let mut inner_map = HashMap::new();
1137
1138 for m in inner_mapping {
1139 inner_map.insert(m.0, m.1);
1140 }
1141 let offset = inner_map.get(&root_page_id.as_inner_node()).unwrap();
1142 recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
1143 let root_page = InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
1144
1145 unsafe {
1147 (*root_page).set_disk_offset(INVALID_DISK_OFFSET as u64);
1148 }
1149 unsafe {
1151 (*root_page).set_root(true);
1152 }
1153 root_page_id = PageID::from_pointer(root_page);
1154
1155 let mut inner_resolve_queue = VecDeque::from([root_page]);
1156 while !inner_resolve_queue.is_empty() {
1157 let inner_ptr = inner_resolve_queue.pop_front().unwrap();
1158 let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
1159 if inner.as_ref().meta.children_is_leaf() {
1160 continue;
1161 }
1162 for (idx, c) in inner.as_ref().get_child_iter().enumerate() {
1163 let offset = inner_map.get(&c.as_inner_node()).unwrap();
1164 recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
1165 let inner_page =
1166 InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
1167 unsafe {
1168 (*inner_page).set_disk_offset(INVALID_DISK_OFFSET as u64);
1169 }
1170 let inner_id = PageID::from_pointer(inner_page);
1171 inner.as_mut().update_at_pos(idx, inner_id);
1172 inner_resolve_queue.push_back(inner_page);
1173 }
1174 }
1175 }
1176
1177 let raw_root_id = if root_page_id.is_id() {
1178 root_page_id.raw() | BfTree::ROOT_IS_LEAF_MASK
1179 } else {
1180 root_page_id.raw()
1181 };
1182
1183 if !bf_meta.cache_only {
1186 let base_mapping: Vec<(PageID, usize)> = if bf_meta.base_size > 0 {
1188 read_vec_from_offset(
1189 bf_meta.base_offset,
1190 bf_meta.base_size,
1191 &recovery_snapshot_vfs,
1192 )
1193 } else {
1194 Vec::new()
1195 };
1196
1197 let base_page_loc_mapping = base_mapping.into_iter().map(|(pid, offset)| {
1198 let loc = PageLocation::Base(offset);
1199 (pid, loc)
1200 });
1201
1202 let pt = PageTable::new_from_mapping(
1204 base_page_loc_mapping,
1205 recovery_snapshot_vfs.clone(),
1206 config.clone(),
1207 snapshot_mgr.clone(),
1208 );
1209
1210 let circular_buffer = CircularBuffer::new(
1211 config.cb_size_byte,
1212 config.cb_copy_on_access_ratio,
1213 config.cb_min_record_size,
1214 config.cb_max_record_size,
1215 config.leaf_page_size,
1216 config.max_fence_len,
1217 buffer_ptr,
1218 config.cache_only,
1219 );
1220
1221 let mini_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size > 0 {
1223 read_vec_from_offset(
1224 bf_meta.mini_offset,
1225 bf_meta.mini_size,
1226 &recovery_snapshot_vfs,
1227 )
1228 } else {
1229 Vec::new()
1230 };
1231
1232 let mini_size_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size_size > 0 {
1233 read_vec_from_offset(
1234 bf_meta.mini_size_offset,
1235 bf_meta.mini_size_size,
1236 &recovery_snapshot_vfs,
1237 )
1238 } else {
1239 Vec::new()
1240 };
1241
1242 let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
1243 for (pid, size) in mini_size_mapping {
1244 mini_size_mapping_unique.insert(pid, size);
1245 }
1246
1247 let storage =
1249 LeafStorage::new_inner(config.clone(), pt, circular_buffer, recovery_snapshot_vfs);
1250
1251 for (pid, offset) in &mini_mapping {
1252 let mini_size = *mini_size_mapping_unique.get(pid).unwrap();
1253
1254 let mini_page_guard = match storage.alloc_mini_page(mini_size) {
1256 Ok(mini_page_ptr) => mini_page_ptr,
1257 Err(_) => {
1258 return Err(ConfigError::CircularBufferSize("buffer size set too small. Consider increasing it or not specifying at all".to_string()));
1259 }
1260 };
1261
1262 let mut page_buffer = SectorAlignedVector::new_zeroed(mini_size);
1264 storage.vfs.read(*offset, &mut page_buffer);
1265 unsafe {
1266 std::ptr::copy_nonoverlapping(
1267 page_buffer.as_ptr(),
1268 mini_page_guard.as_ptr(),
1269 mini_size,
1270 );
1271 }
1272
1273 let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
1275 let mini_page = unsafe { &mut *new_mini_ptr };
1276
1277 let mut base_page = storage.page_table.get_mut(pid);
1278 let page_loc = base_page.get_page_location().clone();
1279 match page_loc {
1280 PageLocation::Base(off) => {
1281 mini_page.next_level = MiniPageNextLevel::new(off);
1282 }
1283 _ => {
1284 panic!("Unexpected page location for base page");
1285 }
1286 }
1287 let mini_loc = PageLocation::Mini(new_mini_ptr);
1288
1289 base_page.create_cache_page_loc(mini_loc);
1291 }
1292
1293 Ok(BfTree {
1294 storage,
1295 root_page_id: AtomicU64::new(raw_root_id),
1296 wal,
1297 write_load_full_page: config.write_load_full_page,
1298 cache_only: false,
1299 mini_page_size_classes: size_classes,
1300 snapshot_mgr,
1301 config,
1302 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1303 metrics_recorder: Some(Arc::new(ThreadLocal::new())),
1304 })
1305 } else {
1306 let mini_mapping_unallocated: Vec<(PageID, PageLocation)> = (0..bf_meta.leaf_page_num)
1308 .map(|pid| (PageID::from_id(pid as u64), PageLocation::Null))
1309 .collect();
1310
1311 let mini_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size > 0 {
1313 read_vec_from_offset(
1314 bf_meta.mini_offset,
1315 bf_meta.mini_size,
1316 &recovery_snapshot_vfs,
1317 )
1318 } else {
1319 Vec::new()
1320 };
1321
1322 let mini_size_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size_size > 0 {
1323 read_vec_from_offset(
1324 bf_meta.mini_size_offset,
1325 bf_meta.mini_size_size,
1326 &recovery_snapshot_vfs,
1327 )
1328 } else {
1329 Vec::new()
1330 };
1331
1332 let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
1333 for (pid, size) in mini_size_mapping {
1334 mini_size_mapping_unique.insert(pid, size);
1335 }
1336
1337 let storage_vfs = make_vfs(&config.storage_backend, PathBuf::new());
1339
1340 let pt = PageTable::new_from_mapping(
1341 mini_mapping_unallocated.into_iter(),
1342 storage_vfs.clone(),
1343 config.clone(),
1344 snapshot_mgr.clone(),
1345 );
1346
1347 let circular_buffer = CircularBuffer::new(
1348 config.cb_size_byte,
1349 config.cb_copy_on_access_ratio,
1350 config.cb_min_record_size,
1351 config.cb_max_record_size,
1352 config.leaf_page_size,
1353 config.max_fence_len,
1354 buffer_ptr,
1355 config.cache_only,
1356 );
1357
1358 let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, storage_vfs);
1360
1361 for (pid, offset) in &mini_mapping {
1362 let mini_size = *mini_size_mapping_unique.get(pid).unwrap();
1363
1364 if *offset == NULL_PAGE_LOCATION_OFFSET {
1366 continue;
1367 }
1368
1369 let mini_page_guard = match storage.alloc_mini_page(mini_size) {
1371 Ok(mini_page_ptr) => mini_page_ptr,
1372 Err(_) => {
1373 panic!("Please increase cb_size_byte in config");
1374 }
1375 };
1376
1377 let mut page_buffer = SectorAlignedVector::new_zeroed(mini_size);
1379 recovery_snapshot_vfs.read(*offset, &mut page_buffer);
1380 unsafe {
1381 std::ptr::copy_nonoverlapping(
1382 page_buffer.as_ptr(),
1383 mini_page_guard.as_ptr(),
1384 mini_size,
1385 );
1386 }
1387
1388 let mini_page_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
1390 let mini_page = unsafe { &mut *mini_page_ptr };
1391 mini_page.next_level = MiniPageNextLevel::new_null();
1392
1393 let mut null_page = storage.page_table.get_mut(pid);
1395 let page_loc = null_page.get_page_location().clone();
1396 match page_loc {
1397 PageLocation::Null => {
1398 let mini_loc = PageLocation::Mini(mini_page_ptr);
1399 null_page.create_cache_page_loc(mini_loc);
1400 }
1401 _ => {
1402 panic!("Unexpected page location for null page");
1403 }
1404 }
1405 }
1406 Ok(BfTree {
1407 storage,
1408 root_page_id: AtomicU64::new(raw_root_id),
1409 wal,
1410 write_load_full_page: config.write_load_full_page,
1411 cache_only: true,
1412 mini_page_size_classes: size_classes,
1413 snapshot_mgr,
1414 config,
1415 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1416 metrics_recorder: Some(Arc::new(ThreadLocal::new())),
1417 })
1418 }
1419 }
1420}
1421
1422impl BfTree {
1423 pub fn recovery(
1426 recovery_snapshot_file_path: PathBuf, wal_file: impl AsRef<Path>,
1428 use_snapshot: bool,
1429 new_snapshot_file_path: Option<PathBuf>, buffer_ptr: Option<*mut u8>,
1431 buffer_size: Option<usize>,
1432 wal: Option<Arc<WalConfig>>,
1433 ) {
1434 let bf_tree = BfTree::new_from_cpr_snapshot(
1435 recovery_snapshot_file_path,
1436 use_snapshot,
1437 new_snapshot_file_path,
1438 buffer_ptr,
1439 buffer_size,
1440 wal,
1441 )
1442 .unwrap();
1443 let wal_reader = WalReader::new(wal_file, 4096);
1444
1445 for seg in wal_reader.segment_iter() {
1446 for entry in seg.entry_iter() {
1447 let log_entry = LogEntry::read_from_buffer(entry.1);
1448 match log_entry {
1449 LogEntry::Write(op) => {
1450 bf_tree.insert(op.key, op.value);
1451 }
1452 LogEntry::Split(_op) => {
1453 todo!("implement split op in wal!")
1454 }
1455 }
1456 }
1457 }
1458 }
1459
1460 pub fn cpr_snapshot(&self) {
1462 if !self.config.use_snapshot {
1463 panic!("Snapshots are not enabled in the configuration");
1464 }
1465
1466 let snpshot_mgr = self.snapshot_mgr.clone().unwrap();
1467 snpshot_mgr.snapshot(self);
1468 }
1469
1470 pub fn new_from_cpr_snapshot(
1472 recovery_snapshot_file_path: PathBuf, use_snapshot: bool,
1474 new_snapshot_file_path: Option<PathBuf>, buffer_ptr: Option<*mut u8>,
1476 buffer_size: Option<usize>,
1477 wal: Option<Arc<WalConfig>>,
1478 ) -> Result<BfTree, ConfigError> {
1479 CPRSnapShotMgr::new_from_snapshot(
1480 recovery_snapshot_file_path,
1481 use_snapshot,
1482 new_snapshot_file_path,
1483 buffer_ptr,
1484 buffer_size,
1485 wal,
1486 )
1487 }
1488
1489 pub fn are_all_threads_in_next_version(&self) -> bool {
1491 let snpshot_mgr = self.snapshot_mgr.clone().unwrap();
1492 snpshot_mgr.are_all_threads_in_next_version()
1493 }
1494}
1495
1496struct SectorAlignedVector {
1497 inner: ManuallyDrop<Vec<u8>>,
1498}
1499
1500impl Drop for SectorAlignedVector {
1501 fn drop(&mut self) {
1502 let layout =
1503 std::alloc::Layout::from_size_align(self.inner.capacity(), SECTOR_SIZE).unwrap();
1504 let ptr = self.inner.as_mut_ptr();
1505 unsafe {
1506 std::alloc::dealloc(ptr, layout);
1507 }
1508 }
1509}
1510
1511impl SectorAlignedVector {
1512 fn new_zeroed(capacity: usize) -> Self {
1513 let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
1514 let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
1515
1516 let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
1517 Self {
1518 inner: ManuallyDrop::new(inner),
1519 }
1520 }
1521}
1522
1523impl Deref for SectorAlignedVector {
1524 type Target = Vec<u8>;
1525
1526 fn deref(&self) -> &Self::Target {
1527 &self.inner
1528 }
1529}
1530
1531impl DerefMut for SectorAlignedVector {
1532 fn deref_mut(&mut self) -> &mut Self::Target {
1533 &mut self.inner
1534 }
1535}
1536
1537#[repr(C, align(512))]
1541pub(crate) struct BfTreeMeta {
1542 magic_begin: [u8; 16],
1543 root_id: PageID,
1545 inner_offset: usize,
1546 inner_size: usize,
1547 mini_offset: usize,
1548 mini_size: usize,
1549 mini_size_offset: usize,
1550 mini_size_size: usize,
1551 base_offset: usize,
1552 base_size: usize,
1553 file_size: u64,
1554 leaf_page_num: usize,
1555 pub(crate) cb_size_byte: usize,
1557 pub(crate) snapshot_version: u64,
1558 pub(crate) cache_only: bool,
1559 pub(crate) read_promotion_rate: usize,
1560 pub(crate) scan_promotion_rate: usize,
1561 pub(crate) cb_min_record_size: usize,
1562 pub(crate) cb_max_record_size: usize,
1563 pub(crate) leaf_page_size: usize,
1564 pub(crate) cb_max_key_len: usize,
1565 pub(crate) max_fence_len: usize,
1566 pub(crate) cb_copy_on_access_ratio: f64,
1567 pub(crate) read_record_cache: bool,
1568 pub(crate) max_mini_page_size: usize,
1569 pub(crate) mini_page_binary_search: bool,
1570 pub(crate) write_load_full_page: bool,
1571 magic_end: [u8; 14],
1572}
1573const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);
1574
1575impl BfTreeMeta {
1576 fn as_slice(&self) -> &[u8] {
1577 let ptr = self as *const Self as *const u8;
1578 let size = std::mem::size_of::<Self>();
1579 unsafe { std::slice::from_raw_parts(ptr, size) }
1580 }
1581
1582 fn check_magic(&self) {
1583 assert_eq!(self.magic_begin, *BF_TREE_MAGIC_BEGIN);
1584 assert_eq!(self.magic_end, *BF_TREE_MAGIC_END);
1585 }
1586}
1587
1588fn serialize_vec_to_disk<T>(v: &[T], vfs: &Arc<dyn VfsImpl>) -> (usize, usize) {
1590 if v.is_empty() {
1591 return (0, 0);
1592 }
1593 let unaligned_ptr = v.as_ptr() as *const u8;
1594 let unaligned_size = std::mem::size_of_val(v);
1595
1596 let aligned_size = align_to_sector_size(unaligned_size);
1597 let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
1598 unsafe {
1599 let aligned_ptr = std::alloc::alloc_zeroed(layout);
1600 std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
1601 let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
1602 let offset = serialize_u8_slice_to_disk(slice, vfs);
1603 std::alloc::dealloc(aligned_ptr, layout);
1604 (offset, unaligned_size)
1605 }
1606}
1607
1608fn read_vec_from_offset<T: Clone>(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<T> {
1609 assert!(size > 0);
1610 let slice = read_u8_slice_from_disk(offset, size, vfs);
1611 let ptr = slice.as_ptr() as *const T;
1612 let size = size / std::mem::size_of::<T>();
1613 let slice = unsafe { std::slice::from_raw_parts(ptr, size) };
1614 slice.to_vec()
1615}
1616
1617fn read_u8_slice_from_disk(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<u8> {
1618 let mut res = Vec::new();
1619 let mut buffer = vec![0; DISK_PAGE_SIZE];
1620 for i in (0..size).step_by(DISK_PAGE_SIZE) {
1621 vfs.read(offset + i, &mut buffer); res.extend_from_slice(&buffer);
1623 }
1624 res
1625}
1626
/// Alignment, in bytes, used for buffers and payload sizes handed to the storage layer.
const SECTOR_SIZE: usize = 512;

// The rounding in `align_to_sector_size` masks with `SECTOR_SIZE - 1`, which is only
// correct for powers of two.
const _: () = assert!(SECTOR_SIZE.is_power_of_two());

/// Rounds `n` up to the nearest multiple of `SECTOR_SIZE`.
///
/// Values that are already a multiple (including zero) are returned unchanged.
///
/// # Panics
///
/// Panics if the rounded value would not fit in a `usize`, instead of silently
/// wrapping around to a small value in builds without overflow checks.
fn align_to_sector_size(n: usize) -> usize {
    let bumped = n
        .checked_add(SECTOR_SIZE - 1)
        .expect("size overflows usize when aligned to the sector size");
    bumped & !(SECTOR_SIZE - 1)
}
1632
1633fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> usize {
1637 let mut start_offset = None;
1638 for chunk in slice.chunks(DISK_PAGE_SIZE) {
1639 let offset = vfs.alloc_offset(DISK_PAGE_SIZE); if start_offset.is_none() {
1641 start_offset = Some(offset);
1642 }
1643 vfs.write(offset, chunk);
1644 }
1645 start_offset.unwrap()
1646}
1647
#[cfg(test)]
mod tests {
    use crate::{nodes::leaf_node::LeafReadResult, BfTree, Config};
    use std::panic;
    use std::str::FromStr;
    use std::sync::atomic::Ordering;
    use std::sync::{atomic::AtomicBool, Arc};
    use std::thread;

    /// Byte length of every key (and value) written by the writer threads.
    fn record_key_len(min_record_size: usize) -> usize {
        min_record_size / 2 + std::mem::size_of::<usize>()
    }

    /// Starts `num_threads` writers that insert records until `finish` is set.
    ///
    /// Writer `i` inserts records `0, 1, 2, ...`; record `r` is a buffer of `usize`
    /// words all equal to `r` except the first, which holds `i`, used as both key and
    /// value. Each thread returns how many records it inserted.
    fn spawn_writers(
        bftree: &Arc<BfTree>,
        finish: &Arc<AtomicBool>,
        num_threads: usize,
        min_record_size: usize,
        max_record_size: usize,
    ) -> Vec<thread::JoinHandle<usize>> {
        let mut handles = Vec::with_capacity(num_threads);
        for i in 0..num_threads {
            let stop = Arc::clone(finish);
            let tree = Arc::clone(bftree);

            handles.push(thread::spawn(move || {
                let key_len: usize = record_key_len(min_record_size);
                assert!(key_len * 2 <= max_record_size);
                let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];

                let mut r: usize = 0;
                loop {
                    if stop.load(Ordering::Relaxed) {
                        break r;
                    }
                    key_buffer.fill(r);
                    key_buffer[0] = i;

                    let record = bytemuck::must_cast_slice::<usize, u8>(&key_buffer);
                    if !matches!(
                        tree.insert(record, record),
                        crate::LeafInsertResult::Success
                    ) {
                        panic!("Insert failed");
                    }
                    r += 1;
                }
            }));
        }
        handles
    }

    /// Sleeps, then takes `snapshot_num` snapshots with a pause after each one, so the
    /// writers keep inserting before, between and after the snapshots.
    fn snapshot_with_pauses(bftree: &BfTree, snapshot_num: usize) {
        let pause = std::time::Duration::from_secs(5);
        thread::sleep(pause);
        for _ in 0..snapshot_num {
            bftree.cpr_snapshot();
            thread::sleep(pause);
        }
    }

    /// Signals the writers to stop and collects the record count of each, in spawn order.
    fn stop_writers(finish: &AtomicBool, handles: Vec<thread::JoinHandle<usize>>) -> Vec<usize> {
        finish.store(true, Ordering::Relaxed);
        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    }

    /// Checks that, for every writer, the recovered tree holds a strict prefix of the
    /// records that writer inserted: once a record is missing no later record may be
    /// present, and at least one record must be missing.
    fn assert_strict_prefix_recovered(bftree: &BfTree, rs: &[usize], min_record_size: usize) {
        let key_len: usize = record_key_len(min_record_size);
        let mut rs_captured = vec![0usize; rs.len()];

        for (i, &record_num) in rs.iter().enumerate() {
            let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];
            let mut res_buffer = vec![0u8; key_len];
            let mut first_missing: Option<usize> = None;

            for r in 0..record_num {
                key_buffer.fill(r);
                key_buffer[0] = i;

                match bftree.read(
                    bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                    &mut res_buffer,
                ) {
                    LeafReadResult::Found(v) => {
                        if first_missing.is_some() {
                            panic!("Found a record after the prefix for writer thread {}", i);
                        }
                        assert_eq!(v as usize, key_len);
                        assert_eq!(
                            &res_buffer,
                            bytemuck::must_cast_slice::<usize, u8>(&key_buffer)
                        );
                    }
                    LeafReadResult::NotFound => {
                        if first_missing.is_none() {
                            first_missing = Some(r);
                        }
                    }
                    _ => {
                        panic!("Unexpected read result")
                    }
                }
            }

            // With nothing missing, the whole record range counts as captured.
            rs_captured[i] = first_missing.unwrap_or(record_num);

            assert!(rs_captured[i] < record_num)
        }
    }

    /// Disk-backed tree: concurrent writers, two snapshots, then recovery from the
    /// snapshot file and a prefix check.
    #[test]
    fn cpr_snapshot_disk() {
        let min_record_size: usize = 64;
        let max_record_size: usize = 2408;
        let leaf_page_size: usize = 8192;
        let snapshot_num: usize = 2;
        let num_threads: usize = 4;

        let tmp_file_path = std::path::PathBuf::from_str("target/test_simple.bftree").unwrap();
        let tmp_snapshot_file_path =
            std::path::PathBuf::from_str("target/test_simple_snapshot.bftree").unwrap();
        let tmp_new_snapshot_file_path =
            std::path::PathBuf::from_str("target/test_simple_new_snapshot.bftree").unwrap();

        let mut config = Config::new(&tmp_file_path, 128 * 1024);
        config.storage_backend(crate::StorageBackend::Std);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;
        config.snapshot_file_path = tmp_snapshot_file_path.clone();
        config.use_snapshot(true);

        let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
        let finish = Arc::new(AtomicBool::new(false));

        let handles = spawn_writers(
            &bftree,
            &finish,
            num_threads,
            min_record_size,
            max_record_size,
        );
        snapshot_with_pauses(&bftree, snapshot_num);
        let rs = stop_writers(&finish, handles);

        drop(bftree);

        let bftree = BfTree::new_from_cpr_snapshot(
            tmp_snapshot_file_path.clone(),
            true,
            Some(tmp_new_snapshot_file_path.clone()),
            None,
            None,
            None,
        )
        .unwrap();

        assert_strict_prefix_recovered(&bftree, &rs, min_record_size);

        std::fs::remove_file(tmp_file_path).unwrap();
        std::fs::remove_file(tmp_new_snapshot_file_path).unwrap();
        std::fs::remove_file(tmp_snapshot_file_path).unwrap();
    }

    /// Cache-only (in-memory backend) tree: concurrent writers, one snapshot, then
    /// recovery from the snapshot file and a prefix check.
    #[test]
    fn cpr_snapshot_cache_only() {
        let min_record_size: usize = 64;
        let max_record_size: usize = 2408;
        let leaf_page_size: usize = 8192;
        let num_threads: usize = 4;

        let tmp_snapshot_file_path =
            std::path::PathBuf::from_str("target/test_simple_cache_only_snapshot.bftree").unwrap();
        let tmp_new_snapshot_file_path =
            std::path::PathBuf::from_str("target/test_simple_cache_only_new_snapshot.bftree")
                .unwrap();

        let mut config = Config::default();
        config.storage_backend(crate::StorageBackend::Memory);
        config.file_path(":memory:");
        config.cache_only = true;
        config.cb_size_byte(1024 * 1024 * 1024);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;
        config.snapshot_file_path = tmp_snapshot_file_path.clone();
        config.use_snapshot(true);

        let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
        let finish = Arc::new(AtomicBool::new(false));

        let handles = spawn_writers(
            &bftree,
            &finish,
            num_threads,
            min_record_size,
            max_record_size,
        );
        snapshot_with_pauses(&bftree, 1);
        let rs = stop_writers(&finish, handles);

        drop(bftree);

        let bftree = BfTree::new_from_cpr_snapshot(
            tmp_snapshot_file_path.clone(),
            true,
            Some(tmp_new_snapshot_file_path.clone()),
            None,
            None,
            None,
        )
        .unwrap();

        assert_strict_prefix_recovered(&bftree, &rs, min_record_size);

        std::fs::remove_file(tmp_snapshot_file_path).unwrap();
        std::fs::remove_file(tmp_new_snapshot_file_path).unwrap();
    }
}