1use std::cell::UnsafeCell;
5use std::collections::{HashMap, VecDeque};
6use std::mem::{ManuallyDrop, MaybeUninit};
7use std::ops::{Deref, DerefMut};
8use std::panic;
9use std::path::Path;
10use std::path::PathBuf;
11use std::sync::Arc;
12
13#[cfg(not(all(feature = "shuttle", test)))]
14use rand::Rng;
15#[cfg(all(feature = "shuttle", test))]
16use shuttle::rand::Rng;
17
18#[cfg(unix)]
19use std::os::unix::fs::FileExt;
20#[cfg(windows)]
21use std::os::windows::fs::FileExt;
22
23#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
24use thread_local::ThreadLocal;
25
26use crate::{
27 circular_buffer::CircularBuffer,
28 error::ConfigError,
29 fs::VfsImpl,
30 mini_page_op::LeafOperations,
31 nodes::{leaf_node::MiniPageNextLevel, LeafNode, INVALID_DISK_OFFSET},
32 nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
33 storage::{make_vfs, LeafStorage, PageLocation, PageTable},
34 sync::atomic::{AtomicBool, AtomicU64, Ordering},
35 sync::thread,
36 sync::RwLock,
37 utils::{get_rng, inner_lock::ReadGuard, BfsVisitor, NodeInfo},
38 wal::{LogEntry, LogEntryImpl, WriteAheadLog},
39 BfTree, Config, StorageBackend, WalConfig, WalReader,
40};
41
/// Magic value stored in `BfTreeMeta::magic_begin` of a snapshot file.
const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
/// Magic value stored in `BfTreeMeta::magic_end` of a snapshot file.
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
/// Byte offset at which the snapshot metadata record is written.
const META_DATA_PAGE_OFFSET: usize = 0;

/// Slot id carried by guards that did not reserve a thread slot.
const INVALID_SNAPSHOT_THREAD_ID: usize = usize::MAX;
/// Offset recorded for a mini page of size 0, for which no bytes are written.
const NULL_PAGE_LOCATION_OFFSET: usize = usize::MAX;
/// Local state of an unreserved thread slot. Its phase bits (`0b111`) decode to
/// no `PhaseId`, so it can never equal a real global state.
const INVALID_SNAPSHOT_STATE: u64 = u64::MAX;
/// Version reported by guards created without a snapshot manager.
pub const INVALID_SNAPSHOT_VERSION: u64 = u64::MAX >> 1;
/// Number of thread slots, i.e. the maximum number of concurrently reserved guards.
const DEFAULT_MAX_SNAPSHOT_THREAD_NUM: usize = 64;
/// Bit position of the phase id inside a packed snapshot state word.
const SNAPSHOT_STATE_PHASE_ID_SHIFT: usize = 61;
/// Number of valid phase ids (`PhaseId` variants).
const SNAPSHOT_STATE_PHASE_NUM: u64 = 4;
/// Phase id field: the three most significant bits of a state word.
const SNAPSHOT_STATE_PHASE_ID_MASK: u64 = 0b111 << SNAPSHOT_STATE_PHASE_ID_SHIFT;
/// Version field: every bit below the phase id (the low 61 bits).
const SNAPSHOT_STATE_VERSION_MASK: u64 = !SNAPSHOT_STATE_PHASE_ID_MASK;

/// Phase of the CPR snapshot protocol.
///
/// The global phase cycles Rest -> Prepare -> InProgress -> Sweep -> Rest; the
/// discriminant is the raw value packed into a snapshot state word.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PhaseId {
    Rest = 0,
    Prepare = 1,
    InProgress = 2,
    Sweep = 3,
}

impl PhaseId {
    /// Every phase, indexed by its raw encoding.
    const ALL: [PhaseId; 4] = [
        PhaseId::Rest,
        PhaseId::Prepare,
        PhaseId::InProgress,
        PhaseId::Sweep,
    ];

    /// Decodes a raw phase id.
    ///
    /// # Panics
    /// Panics if `value` is not in `0..4`.
    fn from_raw(value: u64) -> Self {
        usize::try_from(value)
            .ok()
            .and_then(|idx| Self::ALL.get(idx).copied())
            .unwrap_or_else(|| panic!("Invalid phase id: {}", value))
    }

    /// Encodes this phase as its raw id (`0..4`).
    fn as_raw(self) -> u64 {
        self as u64
    }
}
85
/// Coordinates CPR (concurrent prefix recovery) snapshots of a `BfTree`.
///
/// Worker threads reserve a thread slot through a `CPRSnapshotGuard`, record the
/// pages they copy into their slot's mapping vectors, and the snapshot thread
/// (`snapshot`) drives the global phase machine and merges everything in
/// `finalize`.
pub struct CPRSnapShotMgr {
    /// Packed (phase id, version) word; see `new_snapshot_state`.
    global_state: AtomicU64,
    /// `true` while the slot is reserved (claimed by CAS in `reserve_thread_slot`).
    thread_slots: [AtomicBool; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
    /// Global state observed by each slot's holder when it reserved the slot;
    /// `INVALID_SNAPSHOT_STATE` while the slot is free.
    thread_local_states: [AtomicU64; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
    /// Per slot: (inner node pointer, offset in the snapshot VFS).
    thread_local_inner_mappings:
        UnsafeCell<[Vec<(*const InnerNode, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
    /// Per slot: (leaf page id, offset of its base-page copy).
    thread_local_base_mappings: UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
    /// Per slot: (leaf page id, offset of its mini-page copy or `NULL_PAGE_LOCATION_OFFSET`).
    thread_local_mini_mappings: UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
    /// Per slot: (leaf page id, mini-page byte size); kept index-parallel to
    /// `thread_local_mini_mappings`.
    thread_local_mini_size_mappings:
        UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
    /// Raw `PageID` of the snapshot's root; `0` means "not recorded yet".
    root_id: AtomicU64,
    /// While `true`, `reserve_thread_slot` refuses new reservations (set by `sweep`).
    pause_snapshot: AtomicBool,
    /// Destination of `snapshot_page` writes: an in-memory VFS outside a snapshot,
    /// swapped for the snapshot file's VFS by `snapshot` and swapped back after
    /// `finalize`.
    vfs: RwLock<Arc<dyn VfsImpl>>,
    /// Set for the whole duration of `snapshot`; rejects overlapping snapshots.
    snapshot_in_progress: AtomicBool,
}
121
// SAFETY: `CPRSnapShotMgr` is not automatically `Send`/`Sync`. It stores raw
// `*const InnerNode` pointers, and its `UnsafeCell`-wrapped per-slot vectors are
// mutated through `&self`. Sharing it relies on two conditions:
// * each per-slot vector is pushed to only by the thread that won the
//   `thread_slots[tid]` CAS in `reserve_thread_slot` (that thread's
//   `CPRSnapshotGuard` carries the slot id);
// * `finalize`/`reset`, which touch every slot, run only when no guard holder
//   can push concurrently.
// NOTE(review): neither condition is enforced by the type system. The second
// depends on what guard holders do during the final `Rest` phase — confirm.
unsafe impl Sync for CPRSnapShotMgr {}

// SAFETY: same reasoning as the `Sync` impl directly above.
unsafe impl Send for CPRSnapShotMgr {}
125
/// Per-operation handle that holds (at most) one reserved thread slot of a
/// `CPRSnapShotMgr` and releases it on drop.
pub struct CPRSnapshotGuard {
    /// `None` for an unprotected guard (no manager was supplied).
    snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
    /// Reserved slot, or `INVALID_SNAPSHOT_THREAD_ID` when unprotected.
    thread_slot_id: usize,
    /// Version bits of the global state observed at reservation time.
    snapshot_version: u64,
    /// Phase of the global state observed at reservation time.
    phase_id: PhaseId,
}
133
134impl CPRSnapshotGuard {
135 pub fn new(snapshot_mgr: Option<Arc<CPRSnapShotMgr>>) -> Result<Self, ()> {
136 match snapshot_mgr {
137 None => Ok(Self {
138 snapshot_mgr: None,
139 thread_slot_id: INVALID_SNAPSHOT_THREAD_ID,
140 snapshot_version: INVALID_SNAPSHOT_VERSION,
141 phase_id: PhaseId::Rest,
142 }),
143 Some(ref mgr) => {
144 let (thread_slot_id, snapshot_version, phase_id) = mgr.reserve_thread_slot()?;
145
146 Ok(Self {
147 snapshot_mgr: Some(mgr.clone()),
148 thread_slot_id,
149 snapshot_version,
150 phase_id,
151 })
152 }
153 }
154 }
155
156 pub fn snapshot_version(&self) -> u64 {
158 self.snapshot_version
159 }
160
161 pub fn get_local_phase_id(&self) -> PhaseId {
163 self.phase_id
164 }
165
166 pub fn is_protected(&self) -> bool {
168 self.thread_slot_id != INVALID_SNAPSHOT_THREAD_ID
169 }
170
171 pub fn snapshot_base_page(&self, id: PageID, ptr: &[u8], size: usize) {
172 self.snapshot_mgr
173 .as_ref()
174 .unwrap()
175 .snapshot_base_page(id, ptr, size, self.thread_slot_id);
176 }
177
178 pub fn snapshot_mini_page(&self, id: PageID, ptr: &[u8], size: usize) {
179 self.snapshot_mgr
180 .as_ref()
181 .unwrap()
182 .snapshot_mini_page(id, ptr, size, self.thread_slot_id);
183 }
184
185 pub fn snapshot_inner_node(&self, ptr: *const InnerNode) {
186 self.snapshot_mgr
187 .as_ref()
188 .unwrap()
189 .snapshot_inner_node(ptr, self.thread_slot_id);
190 }
191
192 pub fn snapshot_root_page(&self, root_id: PageID) {
193 self.snapshot_mgr
194 .as_ref()
195 .unwrap()
196 .snapshot_root_page(root_id);
197 }
198}
199
200impl Drop for CPRSnapshotGuard {
201 fn drop(&mut self) {
202 if let Some(ref mgr) = self.snapshot_mgr {
203 mgr.release_thread_slot(self.thread_slot_id);
204 }
205 }
206}
207
208impl CPRSnapShotMgr {
209 pub fn are_all_threads_in_next_version(&self) -> bool {
210 if !self.snapshot_in_progress.load(Ordering::Acquire) {
211 false
212 } else {
213 let global_phase_id = self.get_global_phase_id();
214 global_phase_id == PhaseId::Sweep || global_phase_id == PhaseId::Rest
215 }
216 }
217
218 pub fn new(version: u64) -> Self {
220 let vfs: Arc<dyn VfsImpl> = make_vfs(&StorageBackend::Memory, ":memory:");
221 Self {
222 global_state: AtomicU64::new(Self::new_snapshot_state(0, version)), thread_slots: [const { AtomicBool::new(false) }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
224 thread_local_states: [const { AtomicU64::new(INVALID_SNAPSHOT_STATE) };
225 DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
226 thread_local_inner_mappings: UnsafeCell::new(
227 [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
228 ),
229 thread_local_base_mappings: UnsafeCell::new(
230 [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
231 ),
232 thread_local_mini_mappings: UnsafeCell::new(
233 [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
234 ),
235 thread_local_mini_size_mappings: UnsafeCell::new(
236 [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
237 ),
238 root_id: AtomicU64::new(0),
239 pause_snapshot: AtomicBool::new(false),
240 vfs: RwLock::new(vfs),
241 snapshot_in_progress: AtomicBool::new(false),
242 }
243 }
244
245 fn reset(&self) {
247 let local_inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
248 let local_mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
249 let local_base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
250 let local_mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
251 for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
253 local_inner_mappings[thread_slot_id] = Vec::new();
254 local_mini_mappings[thread_slot_id] = Vec::new();
255 local_base_mappings[thread_slot_id] = Vec::new();
256 local_mini_size_mappings[thread_slot_id] = Vec::new();
257 }
258
259 self.root_id.store(0, Ordering::Release);
260 }
261
262 pub fn new_snapshot_state(phase_id: u64, version: u64) -> u64 {
263 assert!(
264 phase_id < SNAPSHOT_STATE_PHASE_NUM,
265 "Phase id must be less than {}",
266 SNAPSHOT_STATE_PHASE_NUM
267 );
268 assert!(
269 version < (1 << SNAPSHOT_STATE_PHASE_ID_SHIFT),
270 "Version must be less than 2^61"
271 );
272 (phase_id << SNAPSHOT_STATE_PHASE_ID_SHIFT) | version
273 }
274
275 fn get_global_version(&self) -> u64 {
276 self.global_state.load(Ordering::Acquire) & SNAPSHOT_STATE_VERSION_MASK
277 }
278
279 fn get_global_phase_id(&self) -> PhaseId {
280 PhaseId::from_raw(
281 (self.global_state.load(Ordering::Acquire) & SNAPSHOT_STATE_PHASE_ID_MASK)
282 >> SNAPSHOT_STATE_PHASE_ID_SHIFT,
283 )
284 }
285
286 fn get_local_state(&self, thread_slot_id: &usize) -> u64 {
288 self.thread_local_states[*thread_slot_id].load(Ordering::Acquire)
289 }
290
291 fn set_local_state(&self, thread_slot_id: &usize, state: u64) {
292 let current_state = self.get_local_state(thread_slot_id);
294 if current_state == state {
295 return;
296 }
297
298 self.thread_local_states[*thread_slot_id].store(state, Ordering::Release);
299 }
300
301 fn advance_global_state(&self) -> u64 {
303 let phase_id = self.get_global_phase_id();
304 let version = self.get_global_version();
305
306 match phase_id {
307 PhaseId::Rest => {
308 let new_state = Self::new_snapshot_state(PhaseId::Prepare.as_raw(), version);
310 self.global_state.store(new_state, Ordering::Release);
311 new_state
312 }
313 PhaseId::Prepare => {
314 let new_state = Self::new_snapshot_state(PhaseId::InProgress.as_raw(), version + 1);
316 self.global_state.store(new_state, Ordering::Release);
317 new_state
318 }
319 PhaseId::InProgress => {
320 let new_state = Self::new_snapshot_state(PhaseId::Sweep.as_raw(), version);
322 self.global_state.store(new_state, Ordering::Release);
323 new_state
324 }
325 PhaseId::Sweep => {
326 let new_state = Self::new_snapshot_state(PhaseId::Rest.as_raw(), version);
328 self.global_state.store(new_state, Ordering::Release);
329 new_state
330 }
331 }
332 }
333
334 fn check_if_phase_completed(&self, target_state: u64) -> bool {
337 for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
339 let local_state = self.thread_local_states[thread_slot_id].load(Ordering::Acquire);
340 if local_state != INVALID_SNAPSHOT_STATE && local_state != target_state {
341 return false;
342 }
343 }
344 true
345 }
346
347 pub fn reserve_thread_slot(&self) -> Result<(usize, u64, PhaseId), ()> {
351 if self.pause_snapshot.load(Ordering::Acquire) {
352 return Err(());
353 }
354
355 let start = get_rng().random_range(0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM);
356 let end = 2 * DEFAULT_MAX_SNAPSHOT_THREAD_NUM;
357
358 for i in start..end {
359 let tid = i % DEFAULT_MAX_SNAPSHOT_THREAD_NUM;
360 if self.thread_slots[tid]
361 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
362 .is_ok()
363 {
364 let global_state = self.global_state.load(Ordering::Acquire);
366 self.set_local_state(&tid, global_state);
367
368 if self.get_local_state(&tid) != self.global_state.load(Ordering::Acquire)
378 || self.pause_snapshot.load(Ordering::Acquire)
379 {
380 self.set_local_state(&tid, INVALID_SNAPSHOT_STATE);
381 assert!(self.thread_slots[tid]
382 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
383 .is_ok());
384 return Err(());
385 } else {
386 let version = global_state & SNAPSHOT_STATE_VERSION_MASK;
387 let phase_id = PhaseId::from_raw(
388 (global_state & SNAPSHOT_STATE_PHASE_ID_MASK)
389 >> SNAPSHOT_STATE_PHASE_ID_SHIFT,
390 );
391 return Ok((tid, version, phase_id));
392 }
393 }
394 }
395 Err(())
396 }
397
398 pub fn release_thread_slot(&self, thread_slot_id: usize) {
400 self.set_local_state(&thread_slot_id, INVALID_SNAPSHOT_STATE);
401 self.thread_slots[thread_slot_id].store(false, Ordering::Release);
402 }
403
404 pub fn get_snapshot_guard(
405 snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
406 ) -> Result<CPRSnapshotGuard, ()> {
407 CPRSnapshotGuard::new(snapshot_mgr)
408 }
409
410 pub fn snapshot_page(&self, ptr: &[u8], size: usize) -> usize {
416 let vfs = self.vfs.read().unwrap().clone();
418 let offset = vfs.alloc_offset(size);
419
420 vfs.write(offset, ptr);
422
423 offset
425 }
426
427 pub fn snapshot_inner_node(&self, ptr: *const InnerNode, thread_slot_id: usize) {
428 let offset = unsafe { self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE) };
429 let inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
430
431 inner_mappings[thread_slot_id].push((ptr, offset));
432 }
433
434 pub fn snapshot_mini_page(&self, id: PageID, ptr: &[u8], size: usize, thread_slot_id: usize) {
435 let offset = if size != 0 {
436 self.snapshot_page(ptr, size)
437 } else {
438 NULL_PAGE_LOCATION_OFFSET
440 };
441
442 let mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
443 mini_mappings[thread_slot_id].push((id, offset));
444
445 let mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
446 mini_size_mappings[thread_slot_id].push((id, size));
447
448 assert!(mini_mappings[thread_slot_id].len() == mini_size_mappings[thread_slot_id].len());
449 }
450
451 pub fn snapshot_base_page(&self, id: PageID, ptr: &[u8], size: usize, thread_slot_id: usize) {
452 let offset = self.snapshot_page(ptr, size);
453
454 let base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
455 base_mappings[thread_slot_id].push((id, offset));
456 }
457
458 pub fn snapshot_root_page(&self, root_id: PageID) {
459 let cur_root_id = self.root_id.load(Ordering::Acquire);
460 if cur_root_id != 0 {
461 assert_eq!(cur_root_id, root_id.raw());
462 }
463
464 self.root_id.store(root_id.raw(), Ordering::Release);
465 }
466
    /// Performs the Sweep-phase work of a snapshot whose target version is `version`.
    ///
    /// 1. Sets `pause_snapshot` so `reserve_thread_slot` refuses new guards. Then
    ///    polls, sleeping one second between checks, until no thread slot is
    ///    reserved.
    /// 2. With no guard held, copies the root page if its clean snapshot version
    ///    is below `version` and records it via `snapshot_root_page`. Then walks
    ///    the inner nodes breadth-first and copies each one whose clean snapshot
    ///    version is below `version`.
    /// 3. Clears `pause_snapshot` and walks the page table, copying each leaf
    ///    whose clean snapshot version is below `version`. A leaf may be a base,
    ///    full or mini page; for a mini page outside cache-only mode, its backing
    ///    base page is copied too. Null pages (cache-only) are recorded with
    ///    `NULL_PAGE_LOCATION_OFFSET` and size 0.
    ///
    /// The `(key, offset)` pairs are appended to the given vectors. They can
    /// contain duplicates — e.g. the root may be recorded by step 2 and again by a
    /// later walk — and `finalize` keeps the first entry per key.
    ///
    /// Returns the number of page-table entries visited.
    ///
    /// # Panics
    /// Panics in any of these cases:
    /// * the root leaf is in a location other than `Base` or `Mini`;
    /// * a `Mini` root or a `Null` leaf appears outside cache-only mode;
    /// * a page-table entry is not an id;
    /// * the base page behind a mini page was already snapshotted at `version`.
    fn sweep(
        &self,
        tree: &BfTree,
        version: u64,
        inner_mapping: &mut Vec<(*const InnerNode, usize)>,
        mini_mapping: &mut Vec<(PageID, usize)>,
        mini_size_mapping: &mut Vec<(PageID, usize)>,
        base_mapping: &mut Vec<(PageID, usize)>,
    ) -> usize {
        // Refuse new guards; existing holders are waited out below.
        self.pause_snapshot.store(true, Ordering::Release);
        loop {
            // INVALID_SNAPSHOT_STATE as the target means "every slot is free".
            if self.check_if_phase_completed(INVALID_SNAPSHOT_STATE) {
                // Root step: retried until the root's read lock is obtained.
                loop {
                    let root_id = tree.get_root_page();
                    let rid = root_id.0;
                    // `root_id.1` is true when the root is a leaf page.
                    if root_id.1 {
                        let mut leaf = tree.mapping_table().get(&rid);
                        let page_loc = leaf.get_page_location();

                        match page_loc {
                            PageLocation::Base(offset) => {
                                let base_ref = leaf.load_base_page(*offset);
                                if base_ref.get_clean_snapshot_version() < version {
                                    let base_ptr = unsafe {
                                        std::slice::from_raw_parts(
                                            base_ref as *const LeafNode as *const u8,
                                            base_ref.meta.node_size as usize,
                                        )
                                    };
                                    let offset = self
                                        .snapshot_page(base_ptr, base_ref.meta.node_size as usize);
                                    base_mapping.push((rid, offset));
                                    self.snapshot_root_page(rid);
                                }
                            }
                            PageLocation::Mini(ptr) => {
                                // A mini-page root is only expected in cache-only mode.
                                assert!(tree.cache_only);

                                let mini_ref = leaf.load_cache_page(*ptr);
                                if mini_ref.get_clean_snapshot_version() < version {
                                    let mini_ptr = unsafe {
                                        std::slice::from_raw_parts(
                                            mini_ref as *const LeafNode as *const u8,
                                            mini_ref.meta.node_size as usize,
                                        )
                                    };
                                    let offset = self
                                        .snapshot_page(mini_ptr, mini_ref.meta.node_size as usize);
                                    mini_mapping.push((rid, offset));
                                    mini_size_mapping.push((rid, mini_ref.meta.node_size as usize));
                                    self.snapshot_root_page(rid);
                                }
                            }
                            _ => {
                                // NOTE(review): a `Full` root page also lands here — confirm
                                // that state cannot occur for a root leaf.
                                panic!("Unexpected page location for root page: {:?}", page_loc);
                            }
                        }

                        break;
                    } else {
                        let ptr = rid.as_inner_node();
                        let inner = match ReadGuard::try_read(ptr) {
                            Ok(inner) => inner,
                            // Could not take the read lock: re-read the root and retry.
                            Err(_) => continue,
                        };

                        if inner.as_ref().get_clean_snapshot_version() < version {
                            let offset =
                                unsafe { self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE) };
                            inner_mapping.push((ptr, offset));
                            self.snapshot_root_page(rid);
                        }
                        break;
                    }
                }

                // Breadth-first walk over inner nodes only.
                let visitor = BfsVisitor::new_inner_only(tree);
                for node in visitor {
                    // Retry each node until its read lock is obtained.
                    loop {
                        match node {
                            NodeInfo::Inner { ptr, .. } => {
                                let inner = match ReadGuard::try_read(ptr) {
                                    Ok(inner) => inner,
                                    Err(_) => continue,
                                };

                                if inner.as_ref().get_clean_snapshot_version() < version {
                                    let offset = unsafe {
                                        self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE)
                                    };
                                    inner_mapping.push((ptr, offset));
                                }

                                break;
                            }
                            NodeInfo::Leaf { level, .. } => {
                                assert_eq!(level, 0);
                                break;
                            }
                        }
                    }
                }

                break;
            }
            thread::sleep(std::time::Duration::from_secs(1));

            #[cfg(all(feature = "shuttle", test))]
            shuttle::thread::yield_now();
        }

        // Guards may be reserved again; leaves below are read via the mapping table.
        self.pause_snapshot.store(false, Ordering::Release);

        let page_table_iter = tree.storage.page_table.iter();
        let mut enumerate_leaf_count = 0;

        for (_, pid) in page_table_iter {
            assert!(pid.is_id());

            let mut leaf = tree.mapping_table().get(&pid);
            let page_loc = leaf.get_page_location().clone();
            enumerate_leaf_count += 1;

            match page_loc {
                PageLocation::Base(offset) => {
                    let base_ref = leaf.load_base_page(offset);
                    if base_ref.get_clean_snapshot_version() < version {
                        let base_ptr = unsafe {
                            std::slice::from_raw_parts(
                                base_ref as *const LeafNode as *const u8,
                                base_ref.meta.node_size as usize,
                            )
                        };
                        let new_offset =
                            self.snapshot_page(base_ptr, base_ref.meta.node_size as usize);
                        base_mapping.push((pid, new_offset));
                    }
                }
                PageLocation::Full(ptr) => {
                    // A full page is recorded as this leaf's base page.
                    let full_ref = leaf.load_cache_page(ptr);
                    if full_ref.get_clean_snapshot_version() < version {
                        // `next_level` is nulled in place while the bytes are copied, so
                        // the snapshot copy does not carry it, and restored afterwards.
                        // NOTE(review): `full_page` is a `&mut` to the same node that
                        // `full_ref` still borrows and uses below — confirm this aliasing
                        // is sound. Also confirm whether the lock taken by
                        // `mapping_table().get` keeps concurrent readers from observing
                        // the temporary null link.
                        let next_level = full_ref.next_level;
                        let full_page = unsafe { &mut *ptr };
                        full_page.next_level = MiniPageNextLevel::new_null();
                        let full_ptr = unsafe {
                            std::slice::from_raw_parts(
                                full_ref as *const LeafNode as *const u8,
                                full_ref.meta.node_size as usize,
                            )
                        };
                        let offset = self.snapshot_page(full_ptr, full_ref.meta.node_size as usize);
                        full_page.next_level = next_level;
                        base_mapping.push((pid, offset));
                    }
                }
                PageLocation::Mini(ptr) => {
                    let mini_ref = leaf.load_cache_page(ptr);
                    if mini_ref.get_clean_snapshot_version() < version {
                        let mini_ptr = unsafe {
                            std::slice::from_raw_parts(
                                mini_ref as *const LeafNode as *const u8,
                                mini_ref.meta.node_size as usize,
                            )
                        };
                        let offset = self.snapshot_page(mini_ptr, mini_ref.meta.node_size as usize);
                        mini_mapping.push((pid, offset));
                        mini_size_mapping.push((pid, mini_ref.meta.node_size as usize));

                        // Outside cache-only mode a mini page is backed by a base page,
                        // which is copied as well.
                        if !tree.cache_only {
                            let base_ref = leaf.load_base_page(mini_ref.next_level.as_offset());
                            assert!(base_ref.get_clean_snapshot_version() < version);
                            let base_ptr = unsafe {
                                std::slice::from_raw_parts(
                                    base_ref as *const LeafNode as *const u8,
                                    base_ref.meta.node_size as usize,
                                )
                            };
                            let offset =
                                self.snapshot_page(base_ptr, base_ref.meta.node_size as usize);
                            base_mapping.push((pid, offset));
                        }
                    }
                }
                PageLocation::Null => {
                    // Empty leaf in cache-only mode: recorded without writing any bytes.
                    assert!(tree.cache_only);
                    mini_mapping.push((pid, NULL_PAGE_LOCATION_OFFSET));
                    mini_size_mapping.push((pid, 0));
                }
            }
        }

        enumerate_leaf_count
    }
696
697 #[allow(clippy::too_many_arguments)]
699 fn finalize(
700 &self,
701 snapshot_version: u64,
702 inner_mapping: &mut [(*const InnerNode, usize)],
703 mini_mapping: &mut [(PageID, usize)],
704 mini_size_mapping: &mut [(PageID, usize)],
705 base_mapping: &mut [(PageID, usize)],
706 leaf_count_upper: usize,
707 config: Arc<Config>,
708 ) {
709 let mut inner_mapping_unique: HashMap<*const InnerNode, usize> = HashMap::new();
711 let mut mini_mapping_unique: HashMap<PageID, usize> = HashMap::new();
712 let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
713 let mut base_mapping_unique: HashMap<PageID, usize> = HashMap::new();
714
715 let local_inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
716 let local_mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
717 let local_mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
718 let local_base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
719
720 for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
721 let entry_num = local_mini_mappings[thread_slot_id].len();
722 assert!(
723 local_mini_mappings[thread_slot_id].len()
724 == local_mini_size_mappings[thread_slot_id].len()
725 );
726 for i in 0..entry_num {
727 assert!(
728 local_mini_mappings[thread_slot_id][i].0
729 == local_mini_size_mappings[thread_slot_id][i].0
730 );
731
732 if local_mini_mappings[thread_slot_id][i].1 == NULL_PAGE_LOCATION_OFFSET {
733 assert_eq!(local_mini_size_mappings[thread_slot_id][i].1, 0);
734 } else {
735 assert!(local_mini_size_mappings[thread_slot_id][i].1 > 0);
736 }
737
738 if let std::collections::hash_map::Entry::Vacant(e) =
739 mini_mapping_unique.entry(local_mini_mappings[thread_slot_id][i].0)
740 {
741 e.insert(local_mini_mappings[thread_slot_id][i].1);
742 mini_size_mapping_unique.insert(
743 local_mini_size_mappings[thread_slot_id][i].0,
744 local_mini_size_mappings[thread_slot_id][i].1,
745 );
746 assert!(mini_mapping_unique.len() == mini_size_mapping_unique.len());
747 }
748 }
749 assert_eq!(local_mini_mappings[thread_slot_id].len(), entry_num);
750 assert_eq!(local_mini_size_mappings[thread_slot_id].len(), entry_num);
751 }
752
753 for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
755 for m in local_inner_mappings[thread_slot_id].iter() {
756 inner_mapping_unique.entry(m.0).or_insert(m.1);
757 }
758
759 for m in local_base_mappings[thread_slot_id].iter() {
760 base_mapping_unique.entry(m.0).or_insert(m.1);
761 }
762 }
763
764 assert!(mini_mapping_unique.len() == mini_size_mapping_unique.len());
766 for (k, v) in mini_mapping_unique.iter() {
767 assert!(mini_size_mapping_unique.contains_key(k));
768 if *v == NULL_PAGE_LOCATION_OFFSET {
769 assert_eq!(mini_size_mapping_unique.get(k).copied().unwrap(), 0);
770 } else {
771 assert!(mini_size_mapping_unique.get(k).copied().unwrap() > 0);
772 }
773 }
774
775 for (k, v) in inner_mapping.iter() {
777 if !inner_mapping_unique.contains_key(k) {
778 inner_mapping_unique.insert(*k, *v);
779 }
780 }
781 for (k, v) in mini_mapping.iter() {
782 if !mini_mapping_unique.contains_key(k) {
783 mini_mapping_unique.insert(*k, *v);
784 }
785 }
786 for (k, v) in mini_size_mapping.iter() {
787 if !mini_size_mapping_unique.contains_key(k) {
788 mini_size_mapping_unique.insert(*k, *v);
789 } else {
790 if !config.cache_only {
791 assert!(*v == mini_size_mapping_unique.get(k).copied().unwrap());
792 }
793 }
794 }
795 for (k, v) in base_mapping.iter() {
796 if !base_mapping_unique.contains_key(k) {
797 base_mapping_unique.insert(*k, *v);
798 }
799 }
800
801 assert!(mini_mapping_unique.len() == mini_size_mapping_unique.len());
803 for (k, v) in mini_mapping_unique.iter() {
804 assert!(mini_size_mapping_unique.contains_key(k));
805 if *v == NULL_PAGE_LOCATION_OFFSET {
806 assert_eq!(mini_size_mapping_unique.get(k).copied().unwrap(), 0);
807 } else {
808 assert!(mini_size_mapping_unique.get(k).copied().unwrap() > 0);
809 }
810 }
811
812 if config.cache_only {
813 assert!(base_mapping_unique.is_empty());
814 } else {
815 assert!(mini_mapping_unique.len() <= base_mapping_unique.len());
816 }
817
818 let mut final_inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
820 for (k, v) in inner_mapping_unique.into_iter() {
821 final_inner_mapping.push((k, v));
822 }
823
824 let mut sorted_base_mapping_uninit: Vec<MaybeUninit<(PageID, usize)>> =
828 Vec::with_capacity(base_mapping_unique.len());
829 unsafe {
830 sorted_base_mapping_uninit.set_len(base_mapping_unique.len());
831 }
832 let mut sorted_base_mapping_init = vec![false; base_mapping_unique.len()];
833
834 for (k, v) in base_mapping_unique.iter() {
835 assert!(k.is_id());
836 let offset = k.as_id();
837 assert!((offset as usize) < sorted_base_mapping_uninit.len());
838 sorted_base_mapping_init[offset as usize] = true;
839 sorted_base_mapping_uninit[offset as usize].write((*k, *v));
840 }
841 let final_sorted_base_mapping: Vec<(PageID, usize)> = if !config.cache_only {
842 assert_eq!(
843 base_mapping_unique.len(),
844 sorted_base_mapping_init.iter().filter(|&&b| b).count()
845 );
846 unsafe {
847 std::mem::transmute::<
848 std::vec::Vec<std::mem::MaybeUninit<(PageID, usize)>>,
849 std::vec::Vec<(PageID, usize)>,
850 >(sorted_base_mapping_uninit)
851 }
852 } else {
853 Vec::new()
854 };
855
856 let mut final_mini_mapping: Vec<(PageID, usize)> = Vec::new();
858 for (k, v) in mini_mapping_unique.into_iter() {
859 final_mini_mapping.push((k, v));
860 }
861
862 let mut final_mini_size_mapping: Vec<(PageID, usize)> = Vec::new();
863 for (k, v) in mini_size_mapping_unique.into_iter() {
864 final_mini_size_mapping.push((k, v));
865 }
866
867 let leaf_page_num = if config.cache_only {
868 leaf_count_upper
876 } else {
877 assert!(leaf_count_upper >= final_sorted_base_mapping.len());
878 final_sorted_base_mapping.len()
879 };
880
881 let mut file_size = std::mem::size_of::<BfTreeMeta>() as u64;
882
883 let (inner_offset, inner_size) =
885 serialize_vec_to_disk(&final_inner_mapping, &self.vfs.read().unwrap());
886
887 if inner_offset != 0 {
888 file_size = (inner_offset + align_to_sector_size(inner_size)) as u64;
889 }
890
891 let (mini_offset, mini_size) =
892 serialize_vec_to_disk(&final_mini_mapping, &self.vfs.read().unwrap());
893
894 if mini_offset != 0 {
895 file_size = (mini_offset + align_to_sector_size(mini_size)) as u64;
896 }
897
898 let (mini_size_offset, mini_size_size) =
899 serialize_vec_to_disk(&final_mini_size_mapping, &self.vfs.read().unwrap());
900
901 if mini_size_offset != 0 {
902 file_size = (mini_size_offset + align_to_sector_size(mini_size_size)) as u64;
903 }
904
905 let (base_offset, base_size) =
906 serialize_vec_to_disk(&final_sorted_base_mapping, &self.vfs.read().unwrap());
907
908 if base_offset != 0 {
909 file_size = (base_offset + align_to_sector_size(base_size)) as u64;
910 }
911
912 let metadata = BfTreeMeta {
914 magic_begin: *BF_TREE_MAGIC_BEGIN,
915 root_id: unsafe { PageID::from_raw(self.root_id.load(Ordering::Acquire)) },
916 inner_offset,
917 inner_size,
918 mini_offset,
919 mini_size,
920 mini_size_offset,
921 mini_size_size,
922 base_offset,
923 base_size,
924 file_size,
925 leaf_page_num,
926 snapshot_version,
927 cache_only: config.cache_only,
928 cb_size_byte: config.cb_size_byte,
929 read_promotion_rate: config.read_promotion_rate.load(Ordering::Relaxed),
930 scan_promotion_rate: config.scan_promotion_rate.load(Ordering::Relaxed),
931 cb_min_record_size: config.cb_min_record_size,
932 cb_max_record_size: config.cb_max_record_size,
933 leaf_page_size: config.leaf_page_size,
934 cb_max_key_len: config.cb_max_key_len,
935 max_fence_len: config.max_fence_len,
936 cb_copy_on_access_ratio: config.cb_copy_on_access_ratio,
937 read_record_cache: config.read_record_cache,
938 max_mini_page_size: config.max_mini_page_size,
939 mini_page_binary_search: config.mini_page_binary_search,
940 write_load_full_page: config.write_load_full_page,
941 magic_end: *BF_TREE_MAGIC_END,
942 };
943
944 let vfs = self.vfs.read().unwrap();
945 vfs.write(META_DATA_PAGE_OFFSET, metadata.as_slice());
946 vfs.flush();
947
948 self.reset();
949 }
950
    /// Takes a CPR snapshot of `tree` into `snapshot_file_path`.
    ///
    /// 1. Rejects overlapping snapshots via `snapshot_in_progress`. If another
    ///    snapshot is running, it prints a message and returns.
    /// 2. Installs a VFS for the snapshot file, using
    ///    `tree.config.snapshot_backend`, and resets it.
    /// 3. Drives the global phase machine Rest -> Prepare -> InProgress -> Sweep
    ///    -> Rest. It advances only once every reserved slot has published the
    ///    current global state, polling once per second. The version is bumped
    ///    on Prepare -> InProgress.
    /// 4. In `Sweep` it runs `sweep` for version `snapshot_version + 1`.
    /// 5. Back in `Rest` it runs `finalize` and swaps the in-memory VFS back in.
    ///
    /// # Panics
    /// Panics if the global phase is not `Rest` on entry, or if any phase or
    /// version assertion fails during the transitions.
    pub fn snapshot(&self, tree: &BfTree, snapshot_file_path: impl AsRef<Path>) {
        if self
            .snapshot_in_progress
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
            .is_err()
        {
            println!("Another snapshot is in progress, skipping this snapshot request.");
            return;
        }

        // Redirect `snapshot_page` writes to the snapshot file.
        let mut vfs_guard = self.vfs.write().unwrap();
        let snapshot_vfs = make_vfs(&tree.config.snapshot_backend, snapshot_file_path);
        let old_vfs = std::mem::replace(&mut *vfs_guard, snapshot_vfs);
        drop(old_vfs);

        vfs_guard.reset();

        drop(vfs_guard);

        // Pages collected by `sweep`; merged with the per-slot tables in `finalize`.
        let mut sweep_inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
        let mut sweep_mini_mapping: Vec<(PageID, usize)> = Vec::new();
        let mut sweep_mini_size_mapping: Vec<(PageID, usize)> = Vec::new();
        let mut sweep_base_mapping: Vec<(PageID, usize)> = Vec::new();

        let mut current_global_phase_id = self.get_global_phase_id();
        assert_eq!(current_global_phase_id, PhaseId::Rest);

        // Version the snapshot is tagged with; pages are swept for version + 1.
        let snapshot_version = self.get_global_version();

        let mut current_global_state = self.advance_global_state();
        current_global_phase_id = self.get_global_phase_id();
        assert_eq!(current_global_phase_id, PhaseId::Prepare);
        assert_eq!(snapshot_version, self.get_global_version());

        let mut leaf_node_count_upper_bound = 0;
        loop {
            // Advance only once every reserved slot has published the current state.
            if self.check_if_phase_completed(current_global_state) {
                match current_global_phase_id {
                    PhaseId::Rest => {
                        self.finalize(
                            snapshot_version,
                            &mut sweep_inner_mapping,
                            &mut sweep_mini_mapping,
                            &mut sweep_mini_size_mapping,
                            &mut sweep_base_mapping,
                            leaf_node_count_upper_bound,
                            tree.config.clone(),
                        );

                        // Restore the in-memory VFS for use outside a snapshot.
                        let mut vfs_guard = self.vfs.write().unwrap();
                        let snapshot_vfs = make_vfs(&StorageBackend::Memory, ":memory:");
                        let old_vfs = std::mem::replace(&mut *vfs_guard, snapshot_vfs);
                        drop(old_vfs);
                        drop(vfs_guard);

                        break;
                    }
                    PhaseId::Prepare => {
                        // This transition bumps the global version.
                        current_global_state = self.advance_global_state();
                        current_global_phase_id = self.get_global_phase_id();
                        assert_eq!(current_global_phase_id, PhaseId::InProgress);
                        assert_eq!(snapshot_version + 1, self.get_global_version());
                    }
                    PhaseId::InProgress => {
                        current_global_state = self.advance_global_state();
                        current_global_phase_id = self.get_global_phase_id();
                        assert_eq!(current_global_phase_id, PhaseId::Sweep);
                        assert_eq!(snapshot_version + 1, self.get_global_version());
                    }
                    PhaseId::Sweep => {
                        leaf_node_count_upper_bound = self.sweep(
                            tree,
                            snapshot_version + 1,
                            &mut sweep_inner_mapping,
                            &mut sweep_mini_mapping,
                            &mut sweep_mini_size_mapping,
                            &mut sweep_base_mapping,
                        );

                        current_global_state = self.advance_global_state();
                        current_global_phase_id = self.get_global_phase_id();
                        assert_eq!(current_global_phase_id, PhaseId::Rest);
                        assert_eq!(snapshot_version + 1, self.get_global_version());
                    }
                }
            }

            // Poll interval; note this also runs right after a successful transition.
            thread::sleep(std::time::Duration::from_secs(1));

            #[cfg(all(feature = "shuttle", test))]
            shuttle::thread::yield_now();
        }

        assert!(self
            .snapshot_in_progress
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok());
    }
1073
1074 pub fn new_from_snapshot(
1088 recovery_snapshot_file_path: impl AsRef<Path>, use_snapshot: bool,
1090 buffer_ptr: Option<*mut u8>,
1091 buffer_size: Option<usize>, wal_config: Option<Arc<WalConfig>>,
1093 ) -> Result<BfTree, ConfigError> {
1094 if !recovery_snapshot_file_path.as_ref().exists() {
1096 return Err(ConfigError::SnapshotFileInvalid(
1098 "Not found ".to_string() + recovery_snapshot_file_path.as_ref().to_str().unwrap(),
1099 ));
1100 }
1101
1102 let wal = wal_config.as_ref().map(|s| WriteAheadLog::new(s.clone()));
1104
1105 let reader = std::fs::File::open(recovery_snapshot_file_path.as_ref()).unwrap();
1107 let mut metadata = SectorAlignedVector::new_zeroed(DISK_PAGE_SIZE); #[cfg(unix)]
1109 {
1110 reader.read_at(&mut metadata, 0).unwrap();
1111 }
1112 #[cfg(windows)]
1113 {
1114 reader.seek_read(&mut metadata, 0).unwrap();
1115 }
1116
1117 let bf_meta = unsafe { (metadata.as_ptr() as *const BfTreeMeta).read() };
1118 bf_meta.check_magic();
1119 assert_eq!(reader.metadata().unwrap().len(), bf_meta.file_size);
1120
1121 let mut bf_tree_config = Config::new_from_snapshot(&bf_meta);
1122
1123 let recovery_snapshot_file_backend = StorageBackend::Std; if !bf_tree_config.cache_only {
1125 bf_tree_config.file_path(recovery_snapshot_file_path.as_ref());
1126 bf_tree_config.storage_backend = recovery_snapshot_file_backend.clone();
1128 } else {
1129 bf_tree_config.storage_backend = StorageBackend::Memory;
1130 }
1131 bf_tree_config.use_snapshot = use_snapshot;
1132
1133 bf_tree_config.snapshot_backend = StorageBackend::Std; let snapshot_mgr = if bf_tree_config.use_snapshot {
1136 Some(Arc::new(CPRSnapShotMgr::new(
1137 bf_tree_config.snapshot_version,
1138 )))
1139 } else {
1140 None
1141 };
1142
1143 if let Some(size) = buffer_size {
1144 bf_tree_config.cb_size_byte = size
1145 }
1146
1147 let size_classes = BfTree::create_mem_page_size_classes(
1148 bf_tree_config.cb_min_record_size,
1149 bf_tree_config.cb_max_record_size,
1150 bf_tree_config.leaf_page_size,
1151 bf_tree_config.max_fence_len,
1152 bf_tree_config.cache_only,
1153 );
1154
1155 bf_tree_config.write_ahead_log = wal_config.clone();
1156 bf_tree_config.validate()?;
1157
1158 let config = Arc::new(bf_tree_config);
1159
1160 let recovery_snapshot_vfs = make_vfs(
1162 &recovery_snapshot_file_backend,
1163 recovery_snapshot_file_path.as_ref(),
1164 );
1165
1166 let mut root_page_id = bf_meta.root_id;
1168 let mut inner_node_page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);
1169 if root_page_id.is_inner_node_pointer() {
1170 let inner_mapping: Vec<(*const InnerNode, usize)> = read_vec_from_offset(
1171 bf_meta.inner_offset,
1172 bf_meta.inner_size,
1173 &recovery_snapshot_vfs,
1174 );
1175
1176 let mut root_cnt = 0;
1178 for (_ptr, offset) in &inner_mapping {
1179 recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
1180 let inner_node = InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
1181 if unsafe { (*inner_node).is_root() } {
1182 println!("Here");
1183 root_cnt += 1;
1184 }
1185 }
1186 assert_eq!(root_cnt, 1, "Root count in inner mapping: {}", root_cnt);
1187
1188 let mut inner_map = HashMap::new();
1189
1190 for m in inner_mapping {
1191 inner_map.insert(m.0, m.1);
1192 }
1193 let offset = inner_map.get(&root_page_id.as_inner_node()).unwrap();
1194 recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
1195 let root_page = InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
1196
1197 unsafe {
1199 (*root_page).set_disk_offset(INVALID_DISK_OFFSET as u64);
1200 }
1201 unsafe {
1203 (*root_page).set_root(true);
1204 }
1205 root_page_id = PageID::from_pointer(root_page);
1206
1207 let mut inner_resolve_queue = VecDeque::from([root_page]);
1208 while !inner_resolve_queue.is_empty() {
1209 let inner_ptr = inner_resolve_queue.pop_front().unwrap();
1210 let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
1211 if inner.as_ref().meta.children_is_leaf() {
1212 continue;
1213 }
1214 for (idx, c) in inner.as_ref().get_child_iter().enumerate() {
1215 let offset = inner_map.get(&c.as_inner_node()).unwrap();
1216 recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
1217 let inner_page =
1218 InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
1219 unsafe {
1220 (*inner_page).set_disk_offset(INVALID_DISK_OFFSET as u64);
1221 }
1222 let inner_id = PageID::from_pointer(inner_page);
1223 inner.as_mut().update_at_pos(idx, inner_id);
1224 inner_resolve_queue.push_back(inner_page);
1225 }
1226 }
1227 }
1228
1229 let raw_root_id = if root_page_id.is_id() {
1230 root_page_id.raw() | BfTree::ROOT_IS_LEAF_MASK
1231 } else {
1232 root_page_id.raw()
1233 };
1234
1235 if !bf_meta.cache_only {
1238 let base_mapping: Vec<(PageID, usize)> = if bf_meta.base_size > 0 {
1240 read_vec_from_offset(
1241 bf_meta.base_offset,
1242 bf_meta.base_size,
1243 &recovery_snapshot_vfs,
1244 )
1245 } else {
1246 Vec::new()
1247 };
1248
1249 let base_page_loc_mapping = base_mapping.into_iter().map(|(pid, offset)| {
1250 let loc = PageLocation::Base(offset);
1251 (pid, loc)
1252 });
1253
1254 let pt = PageTable::new_from_mapping(
1256 base_page_loc_mapping,
1257 recovery_snapshot_vfs.clone(),
1258 config.clone(),
1259 snapshot_mgr.clone(),
1260 );
1261
1262 let circular_buffer = CircularBuffer::new(
1263 config.cb_size_byte,
1264 config.cb_copy_on_access_ratio,
1265 config.cb_min_record_size,
1266 config.cb_max_record_size,
1267 config.leaf_page_size,
1268 config.max_fence_len,
1269 buffer_ptr,
1270 config.cache_only,
1271 );
1272
1273 let mini_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size > 0 {
1275 read_vec_from_offset(
1276 bf_meta.mini_offset,
1277 bf_meta.mini_size,
1278 &recovery_snapshot_vfs,
1279 )
1280 } else {
1281 Vec::new()
1282 };
1283
1284 let mini_size_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size_size > 0 {
1285 read_vec_from_offset(
1286 bf_meta.mini_size_offset,
1287 bf_meta.mini_size_size,
1288 &recovery_snapshot_vfs,
1289 )
1290 } else {
1291 Vec::new()
1292 };
1293
1294 let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
1295 for (pid, size) in mini_size_mapping {
1296 mini_size_mapping_unique.insert(pid, size);
1297 }
1298
1299 let storage =
1301 LeafStorage::new_inner(config.clone(), pt, circular_buffer, recovery_snapshot_vfs);
1302
1303 for (pid, offset) in &mini_mapping {
1304 let mini_size = *mini_size_mapping_unique.get(pid).unwrap();
1305
1306 let mini_page_guard = match storage.alloc_mini_page(mini_size) {
1308 Ok(mini_page_ptr) => mini_page_ptr,
1309 Err(_) => {
1310 return Err(ConfigError::CircularBufferSize("buffer size set too small. Consider increasing it or not specifying at all".to_string()));
1311 }
1312 };
1313
1314 let mut page_buffer = SectorAlignedVector::new_zeroed(mini_size);
1316 storage.vfs.read(*offset, &mut page_buffer);
1317 unsafe {
1318 std::ptr::copy_nonoverlapping(
1319 page_buffer.as_ptr(),
1320 mini_page_guard.as_ptr(),
1321 mini_size,
1322 );
1323 }
1324
1325 let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
1327 let mini_page = unsafe { &mut *new_mini_ptr };
1328
1329 let mut base_page = storage.page_table.get_mut(pid);
1330 let page_loc = base_page.get_page_location().clone();
1331 match page_loc {
1332 PageLocation::Base(off) => {
1333 mini_page.next_level = MiniPageNextLevel::new(off);
1334 }
1335 _ => {
1336 panic!("Unexpected page location for base page");
1337 }
1338 }
1339 let mini_loc = PageLocation::Mini(new_mini_ptr);
1340
1341 base_page.create_cache_page_loc(mini_loc);
1343 }
1344
1345 Ok(BfTree {
1346 storage,
1347 root_page_id: AtomicU64::new(raw_root_id),
1348 wal,
1349 write_load_full_page: config.write_load_full_page,
1350 cache_only: false,
1351 mini_page_size_classes: size_classes,
1352 snapshot_mgr,
1353 config,
1354 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1355 metrics_recorder: Some(Arc::new(ThreadLocal::new())),
1356 })
1357 } else {
1358 let mini_mapping_unallocated: Vec<(PageID, PageLocation)> = (0..bf_meta.leaf_page_num)
1360 .map(|pid| (PageID::from_id(pid as u64), PageLocation::Null))
1361 .collect();
1362
1363 let mini_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size > 0 {
1365 read_vec_from_offset(
1366 bf_meta.mini_offset,
1367 bf_meta.mini_size,
1368 &recovery_snapshot_vfs,
1369 )
1370 } else {
1371 Vec::new()
1372 };
1373
1374 let mini_size_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size_size > 0 {
1375 read_vec_from_offset(
1376 bf_meta.mini_size_offset,
1377 bf_meta.mini_size_size,
1378 &recovery_snapshot_vfs,
1379 )
1380 } else {
1381 Vec::new()
1382 };
1383
1384 let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
1385 for (pid, size) in mini_size_mapping {
1386 mini_size_mapping_unique.insert(pid, size);
1387 }
1388
1389 let storage_vfs = make_vfs(&config.storage_backend, PathBuf::new());
1391
1392 let pt = PageTable::new_from_mapping(
1393 mini_mapping_unallocated.into_iter(),
1394 storage_vfs.clone(),
1395 config.clone(),
1396 snapshot_mgr.clone(),
1397 );
1398
1399 let circular_buffer = CircularBuffer::new(
1400 config.cb_size_byte,
1401 config.cb_copy_on_access_ratio,
1402 config.cb_min_record_size,
1403 config.cb_max_record_size,
1404 config.leaf_page_size,
1405 config.max_fence_len,
1406 buffer_ptr,
1407 config.cache_only,
1408 );
1409
1410 let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, storage_vfs);
1412
1413 for (pid, offset) in &mini_mapping {
1414 let mini_size = *mini_size_mapping_unique.get(pid).unwrap();
1415
1416 if *offset == NULL_PAGE_LOCATION_OFFSET {
1418 continue;
1419 }
1420
1421 let mini_page_guard = match storage.alloc_mini_page(mini_size) {
1423 Ok(mini_page_ptr) => mini_page_ptr,
1424 Err(_) => {
1425 panic!("Please increase cb_size_byte in config");
1426 }
1427 };
1428
1429 let mut page_buffer = SectorAlignedVector::new_zeroed(mini_size);
1431 recovery_snapshot_vfs.read(*offset, &mut page_buffer);
1432 unsafe {
1433 std::ptr::copy_nonoverlapping(
1434 page_buffer.as_ptr(),
1435 mini_page_guard.as_ptr(),
1436 mini_size,
1437 );
1438 }
1439
1440 let mini_page_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
1442 let mini_page = unsafe { &mut *mini_page_ptr };
1443 mini_page.next_level = MiniPageNextLevel::new_null();
1444
1445 let mut null_page = storage.page_table.get_mut(pid);
1447 let page_loc = null_page.get_page_location().clone();
1448 match page_loc {
1449 PageLocation::Null => {
1450 let mini_loc = PageLocation::Mini(mini_page_ptr);
1451 null_page.create_cache_page_loc(mini_loc);
1452 }
1453 _ => {
1454 panic!("Unexpected page location for null page");
1455 }
1456 }
1457 }
1458 Ok(BfTree {
1459 storage,
1460 root_page_id: AtomicU64::new(raw_root_id),
1461 wal,
1462 write_load_full_page: config.write_load_full_page,
1463 cache_only: true,
1464 mini_page_size_classes: size_classes,
1465 snapshot_mgr,
1466 config,
1467 #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1468 metrics_recorder: Some(Arc::new(ThreadLocal::new())),
1469 })
1470 }
1471 }
1472}
1473
1474impl BfTree {
1475 pub fn recovery(
1478 recovery_snapshot_file_path: PathBuf, wal_file: impl AsRef<Path>,
1480 use_snapshot: bool,
1481 buffer_ptr: Option<*mut u8>,
1482 buffer_size: Option<usize>,
1483 wal: Option<Arc<WalConfig>>,
1484 ) {
1485 let bf_tree = BfTree::new_from_cpr_snapshot(
1486 recovery_snapshot_file_path,
1487 use_snapshot,
1488 buffer_ptr,
1489 buffer_size,
1490 wal,
1491 )
1492 .unwrap();
1493 let wal_reader = WalReader::new(wal_file, 4096);
1494
1495 for seg in wal_reader.segment_iter() {
1496 for entry in seg.entry_iter() {
1497 let log_entry = LogEntry::read_from_buffer(entry.1);
1498 match log_entry {
1499 LogEntry::Write(op) => {
1500 bf_tree.insert(op.key, op.value);
1501 }
1502 LogEntry::Split(_op) => {
1503 todo!("implement split op in wal!")
1504 }
1505 }
1506 }
1507 }
1508 }
1509
1510 pub fn cpr_snapshot(&self, snapshot_file_path: impl AsRef<Path>) {
1512 if !self.config.use_snapshot {
1513 panic!("Snapshots are not enabled in the configuration");
1514 }
1515
1516 let snpshot_mgr = self.snapshot_mgr.clone().unwrap();
1517 snpshot_mgr.snapshot(self, snapshot_file_path);
1518 }
1519
1520 pub fn new_from_cpr_snapshot(
1522 recovery_snapshot_file_path: impl AsRef<Path>, use_snapshot: bool,
1524 buffer_ptr: Option<*mut u8>,
1525 buffer_size: Option<usize>,
1526 wal: Option<Arc<WalConfig>>,
1527 ) -> Result<BfTree, ConfigError> {
1528 CPRSnapShotMgr::new_from_snapshot(
1529 recovery_snapshot_file_path,
1530 use_snapshot,
1531 buffer_ptr,
1532 buffer_size,
1533 wal,
1534 )
1535 }
1536
1537 pub fn are_all_threads_in_next_snapshot_version(&self) -> bool {
1539 if let Some(snapshot_mgr) = &self.snapshot_mgr {
1540 return snapshot_mgr.are_all_threads_in_next_version();
1541 }
1542 false
1543 }
1544}
1545
/// Zero-initialised byte buffer whose start address is aligned to `SECTOR_SIZE`, exposed as a
/// `Vec<u8>` through `Deref`/`DerefMut`.
///
/// The `Vec` is wrapped in `ManuallyDrop` because its memory was allocated with `SECTOR_SIZE`
/// alignment rather than the 1-byte alignment `Vec<u8>` assumes. The custom `Drop` impl frees
/// it with the matching layout (`capacity`, `SECTOR_SIZE`) instead of letting `Vec` free it.
///
/// NOTE(review): `DerefMut` hands out `&mut Vec<u8>`. Any operation that reallocates the vector
/// (e.g. `push` past capacity, `shrink_to_fit`) would go through `Vec`'s own layout and is
/// unsound together with this type's `Drop`. Treat it strictly as a fixed-size buffer.
struct SectorAlignedVector {
    // Owns the aligned allocation; never dropped by `Vec` itself (see `Drop` impl).
    inner: ManuallyDrop<Vec<u8>>,
}
1549
1550impl Drop for SectorAlignedVector {
1551 fn drop(&mut self) {
1552 let layout =
1553 std::alloc::Layout::from_size_align(self.inner.capacity(), SECTOR_SIZE).unwrap();
1554 let ptr = self.inner.as_mut_ptr();
1555 unsafe {
1556 std::alloc::dealloc(ptr, layout);
1557 }
1558 }
1559}
1560
1561impl SectorAlignedVector {
1562 fn new_zeroed(capacity: usize) -> Self {
1563 let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
1564 let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
1565
1566 let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
1567 Self {
1568 inner: ManuallyDrop::new(inner),
1569 }
1570 }
1571}
1572
1573impl Deref for SectorAlignedVector {
1574 type Target = Vec<u8>;
1575
1576 fn deref(&self) -> &Self::Target {
1577 &self.inner
1578 }
1579}
1580
1581impl DerefMut for SectorAlignedVector {
1582 fn deref_mut(&mut self) -> &mut Self::Target {
1583 &mut self.inner
1584 }
1585}
1586
/// Fixed-size header of a CPR snapshot file.
///
/// Snapshot recovery reads the first `DISK_PAGE_SIZE` bytes of the file (offset 0) and
/// reinterprets them directly as this struct. `as_slice` exposes the struct's raw bytes. As a
/// result, the `repr(C)` field order and field types *are* the on-disk format: reordering,
/// resizing or inserting fields breaks compatibility with existing snapshot files.
///
/// Each `*_offset` / `*_size` pair locates a vector decoded with `read_vec_from_offset`. The
/// offset is a byte offset in the snapshot file; the size is the encoded length in bytes, not
/// an element count.
///
/// NOTE(review): the `bool` fields are read from raw file bytes. Any byte other than 0/1 makes
/// that read undefined behaviour, so a corrupted file is not reliably rejected.
#[repr(C, align(512))]
pub(crate) struct BfTreeMeta {
    /// Must equal `BF_TREE_MAGIC_BEGIN` (enforced by `check_magic`).
    magic_begin: [u8; 16],
    /// Root of the tree at snapshot time. When it is an inner-node pointer, that pre-snapshot
    /// pointer value is only used as a lookup key into the inner-node mapping during recovery.
    root_id: PageID,
    /// Encoded `Vec<(*const InnerNode, usize)>`: pre-snapshot inner-node pointer -> file offset
    /// of that node's `INNER_NODE_SIZE`-byte image.
    inner_offset: usize,
    inner_size: usize,
    /// Encoded `Vec<(PageID, usize)>`: leaf page id -> file offset of its mini-page image
    /// (entries equal to `NULL_PAGE_LOCATION_OFFSET` are skipped by the cache-only path).
    mini_offset: usize,
    mini_size: usize,
    /// Encoded `Vec<(PageID, usize)>`: leaf page id -> mini-page size in bytes.
    mini_size_offset: usize,
    mini_size_size: usize,
    /// Encoded `Vec<(PageID, usize)>`: leaf page id -> base-page offset; only read when
    /// `cache_only` is false.
    base_offset: usize,
    base_size: usize,
    /// Expected total length of the snapshot file; recovery asserts the file length matches.
    file_size: u64,
    /// Number of leaf page ids (`0..leaf_page_num`) pre-created by the cache-only recovery path.
    leaf_page_num: usize,
    // The `pub(crate)` fields below look like configuration captured at snapshot time,
    // presumably consumed by `Config::new_from_snapshot` -- NOTE(review): confirm there.
    pub(crate) cb_size_byte: usize,
    pub(crate) snapshot_version: u64,
    /// Selects the recovery path: `false` rebuilds a file-backed tree with base pages, `true`
    /// rebuilds a tree with `cache_only: true`.
    pub(crate) cache_only: bool,
    pub(crate) read_promotion_rate: usize,
    pub(crate) scan_promotion_rate: usize,
    pub(crate) cb_min_record_size: usize,
    pub(crate) cb_max_record_size: usize,
    pub(crate) leaf_page_size: usize,
    pub(crate) cb_max_key_len: usize,
    pub(crate) max_fence_len: usize,
    pub(crate) cb_copy_on_access_ratio: f64,
    pub(crate) read_record_cache: bool,
    pub(crate) max_mini_page_size: usize,
    pub(crate) mini_page_binary_search: bool,
    pub(crate) write_load_full_page: bool,
    /// Must equal `BF_TREE_MAGIC_END` (enforced by `check_magic`).
    magic_end: [u8; 14],
}
// Recovery reads the header from a single `DISK_PAGE_SIZE` buffer, so it must fit in one page.
const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);
1624
1625impl BfTreeMeta {
1626 fn as_slice(&self) -> &[u8] {
1627 let ptr = self as *const Self as *const u8;
1628 let size = std::mem::size_of::<Self>();
1629 unsafe { std::slice::from_raw_parts(ptr, size) }
1630 }
1631
1632 fn check_magic(&self) {
1633 assert_eq!(self.magic_begin, *BF_TREE_MAGIC_BEGIN);
1634 assert_eq!(self.magic_end, *BF_TREE_MAGIC_END);
1635 }
1636}
1637
1638fn serialize_vec_to_disk<T>(v: &[T], vfs: &Arc<dyn VfsImpl>) -> (usize, usize) {
1640 if v.is_empty() {
1641 return (0, 0);
1642 }
1643 let unaligned_ptr = v.as_ptr() as *const u8;
1644 let unaligned_size = std::mem::size_of_val(v);
1645
1646 let aligned_size = align_to_sector_size(unaligned_size);
1647 let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
1648 unsafe {
1649 let aligned_ptr = std::alloc::alloc_zeroed(layout);
1650 std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
1651 let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
1652 let offset = serialize_u8_slice_to_disk(slice, vfs);
1653 std::alloc::dealloc(aligned_ptr, layout);
1654 (offset, unaligned_size)
1655 }
1656}
1657
1658fn read_vec_from_offset<T: Clone>(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<T> {
1659 assert!(size > 0);
1660 let slice = read_u8_slice_from_disk(offset, size, vfs);
1661 let ptr = slice.as_ptr() as *const T;
1662 let size = size / std::mem::size_of::<T>();
1663 let slice = unsafe { std::slice::from_raw_parts(ptr, size) };
1664 slice.to_vec()
1665}
1666
1667fn read_u8_slice_from_disk(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<u8> {
1668 let mut res = Vec::new();
1669 let mut buffer = vec![0; DISK_PAGE_SIZE];
1670 for i in (0..size).step_by(DISK_PAGE_SIZE) {
1671 vfs.read(offset + i, &mut buffer); res.extend_from_slice(&buffer);
1673 }
1674 res
1675}
1676
/// Sector size in bytes; also the alignment used by `SectorAlignedVector` buffers.
const SECTOR_SIZE: usize = 512;

// The mask arithmetic in `align_to_sector_size` is only correct for a power-of-two size.
const _: () = assert!(SECTOR_SIZE.is_power_of_two());

/// Rounds `n` up to the next multiple of `SECTOR_SIZE`. Values already on a boundary,
/// including 0, are returned unchanged.
fn align_to_sector_size(n: usize) -> usize {
    let rounded_up = n + SECTOR_SIZE - 1;
    let low_bits = SECTOR_SIZE - 1;
    rounded_up & !low_bits
}
1682
1683fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> usize {
1687 let mut start_offset = None;
1688 for chunk in slice.chunks(DISK_PAGE_SIZE) {
1689 let offset = vfs.alloc_offset(DISK_PAGE_SIZE); if start_offset.is_none() {
1691 start_offset = Some(offset);
1692 }
1693 vfs.write(offset, chunk);
1694 }
1695 start_offset.unwrap()
1696}
1697
#[cfg(test)]
mod tests {
    use crate::{nodes::leaf_node::LeafReadResult, sync::thread, BfTree, Config};
    #[cfg(feature = "shuttle")]
    use std::path::PathBuf;
    use std::str::FromStr;
    use std::sync::atomic::Ordering;
    use std::sync::{atomic::AtomicBool, Arc};

    /// CPR snapshots on the file-backed (`Std`) backend under concurrent inserts.
    ///
    /// `num_threads` writers insert increasing keys `(thread_id, r, r, ...)` until told to
    /// stop. Meanwhile the main thread takes `snapshot_num` snapshots about 5 s apart. The
    /// last snapshot is then recovered and checked with `check_prefix = true`.
    ///
    /// Deliberately installs no panic hook: hooks are process-wide and would affect every
    /// other test running in the same binary.
    #[test]
    fn cpr_snapshot_disk() {
        let min_record_size: usize = 64;
        let max_record_size: usize = 2408;
        let leaf_page_size: usize = 8192;
        let snapshot_num: usize = 10;
        let num_threads: usize = 4;
        let file_path: String = "target/test_simple.bftree".to_string();
        let snapshot_file_path: String = "target/test_simple_snapshot.bftree".to_string();

        let tmp_file_path = std::path::PathBuf::from_str(&file_path).unwrap();
        let tmp_snapshot_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();

        let mut config = Config::new(&tmp_file_path, 128 * 1024);
        config.storage_backend(crate::StorageBackend::Std);
        config.cb_min_record_size = min_record_size + 2 * std::mem::size_of::<usize>();
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = min_record_size + 2 * std::mem::size_of::<usize>();
        config.use_snapshot(true);

        let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
        let finish = Arc::new(AtomicBool::new(false));

        let handles: Vec<_> = (0..num_threads)
            .map(|i| {
                let finish_clone = finish.clone();
                let bftree_clone = bftree.clone();

                thread::spawn(move || {
                    let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
                    assert!(key_len * 2 <= max_record_size);
                    let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];

                    let mut r: usize = 0;
                    while !finish_clone.load(Ordering::Relaxed) {
                        key_buffer.fill(r);
                        key_buffer[0] = i;

                        match bftree_clone.insert(
                            bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                            bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                        ) {
                            crate::LeafInsertResult::Success => {}
                            _ => {
                                panic!("Insert failed");
                            }
                        }
                        r += 1;
                    }
                    r
                })
            })
            .collect();

        thread::sleep(std::time::Duration::from_secs(5));
        for _ in 0..snapshot_num {
            // Each snapshot is written to a fresh file.
            let _ = std::fs::remove_file(&tmp_snapshot_file_path);
            bftree.cpr_snapshot(&tmp_snapshot_file_path);
            thread::sleep(std::time::Duration::from_secs(5));
        }

        let mut rs = vec![0usize; num_threads];
        finish.store(true, Ordering::Relaxed);
        for (i, h) in handles.into_iter().enumerate() {
            let r = h.join().unwrap();
            rs[i] = r;
        }

        verify_snapshot_recovery(
            &tmp_snapshot_file_path,
            num_threads,
            min_record_size,
            &rs,
            true,
        );

        std::fs::remove_file(tmp_file_path).unwrap();
        std::fs::remove_file(tmp_snapshot_file_path).unwrap();
    }

    /// CPR snapshot of a cache-only (memory backend) tree under concurrent inserts.
    ///
    /// One snapshot is taken mid-run; recovery is checked without the prefix property.
    #[test]
    fn cpr_snapshot_cache_only() {
        let min_record_size: usize = 64;
        let max_record_size: usize = 2408;
        let leaf_page_size: usize = 8192;
        let num_threads: usize = 4;

        let snapshot_file_path: String =
            "target/test_simple_cache_only_snapshot.bftree".to_string();
        let tmp_snapshot_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();

        let mut config = Config::default();
        config.storage_backend(crate::StorageBackend::Memory);
        config.file_path(":memory:");
        config.cache_only = true;
        config.cb_size_byte(1024 * 1024 * 1024);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;
        config.use_snapshot(true);

        let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
        let finish = Arc::new(AtomicBool::new(false));

        let handles: Vec<_> = (0..num_threads)
            .map(|i| {
                let finish_clone = finish.clone();
                let bftree_clone = bftree.clone();

                thread::spawn(move || {
                    let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
                    assert!(key_len * 2 <= max_record_size);
                    let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];

                    let mut r: usize = 0;
                    while !finish_clone.load(Ordering::Relaxed) {
                        key_buffer.fill(r);
                        key_buffer[0] = i;

                        match bftree_clone.insert(
                            bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                            bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                        ) {
                            crate::LeafInsertResult::Success => {}
                            _ => {
                                panic!("Insert failed");
                            }
                        }
                        r += 1;
                    }
                    r
                })
            })
            .collect();

        thread::sleep(std::time::Duration::from_secs(5));
        // Same as `cpr_snapshot_disk`: never snapshot over a file left by an earlier run.
        let _ = std::fs::remove_file(&tmp_snapshot_file_path);
        bftree.cpr_snapshot(&tmp_snapshot_file_path);
        thread::sleep(std::time::Duration::from_secs(5));

        let mut rs = vec![0usize; num_threads];
        finish.store(true, Ordering::Relaxed);
        for (i, h) in handles.into_iter().enumerate() {
            let r = h.join().unwrap();
            rs[i] = r;
        }

        verify_snapshot_recovery(
            &tmp_snapshot_file_path,
            num_threads,
            min_record_size,
            &rs,
            false,
        );

        std::fs::remove_file(tmp_snapshot_file_path).unwrap();
    }

    /// Recovers a tree from `snapshot_file` and checks the records each writer thread inserted.
    ///
    /// Recovery uses `use_snapshot = false`, default buffers and no WAL. For thread `i` it looks
    /// up keys `(i, r, r, ...)` for `r` in `0..records_num_per_threads[i]`. Every found value
    /// must equal its key. When `check_prefix` is set, the found records must form a prefix: a
    /// record found after the first missing one panics with a diagnostic listing nearby gaps.
    fn verify_snapshot_recovery(
        snapshot_file: impl AsRef<std::path::Path>,
        num_threads: usize,
        min_record_size: usize,
        records_num_per_threads: &[usize],
        check_prefix: bool,
    ) {
        let bftree = BfTree::new_from_cpr_snapshot(snapshot_file, false, None, None, None)
            .expect("fail to recover from snapshot");

        let mut rs_captured = vec![0usize; num_threads];
        for i in 0..num_threads {
            let record_num = records_num_per_threads[i];

            let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
            let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];
            let mut res_buffer = vec![0u8; key_len];
            let mut not_included = false;
            let mut first_gap_record: Option<usize> = None;

            for r in 0..record_num {
                key_buffer.fill(r);
                key_buffer[0] = i;

                match bftree.read(
                    bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                    &mut res_buffer,
                ) {
                    LeafReadResult::Found(v) => {
                        if check_prefix && not_included {
                            // Collect context around the violation before failing.
                            let mut gaps = Vec::new();
                            let mut found_after = Vec::new();
                            let gap_start = first_gap_record.unwrap();
                            let scan_end = std::cmp::min(r + 50, record_num);
                            for scan_r in gap_start..scan_end {
                                key_buffer.fill(scan_r);
                                key_buffer[0] = i;
                                match bftree.read(
                                    bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                                    &mut res_buffer,
                                ) {
                                    LeafReadResult::Found(_) => {
                                        found_after.push(scan_r);
                                    }
                                    LeafReadResult::NotFound => {
                                        gaps.push(scan_r);
                                    }
                                    _ => {}
                                }
                            }
                            panic!(
                                "PREFIX VIOLATION: thread={}, first_gap_at={}, found_record_after_gap={}, \
                                 total_captured_before_gap={}, total_records={}\n\
                                 Gaps in [{}, {}): {:?}\n\
                                 Found in [{}, {}): {:?}",
                                i, gap_start, r, rs_captured[i], record_num,
                                gap_start, scan_end, &gaps[..std::cmp::min(gaps.len(), 20)],
                                gap_start, scan_end, &found_after[..std::cmp::min(found_after.len(), 20)],
                            );
                        }
                        assert_eq!(v as usize, key_len);
                        assert_eq!(
                            &res_buffer,
                            bytemuck::must_cast_slice::<usize, u8>(&key_buffer)
                        );
                        rs_captured[i] += 1;
                    }
                    LeafReadResult::NotFound => {
                        if !not_included {
                            not_included = true;
                            first_gap_record = Some(r);
                        }
                    }
                    _ => {
                        panic!("Unexpected read result")
                    }
                }
            }

            assert!(rs_captured[i] <= record_num);
            println!("Total inserted records for thread {}: {}", i, record_num);
            println!(
                "Hit ratio for thread {}: {}",
                i,
                rs_captured[i] as f64 / record_num as f64
            );
        }
    }

    /// One shuttle iteration: two rounds of bounded concurrent inserts into a cache-only tree,
    /// each racing a snapshot thread, followed by a recovery check.
    #[cfg(feature = "shuttle")]
    fn shuttle_cpr_snapshot_cache_only_inner(iter: usize) {
        let min_record_size: usize = 64;
        let max_record_size: usize = 2408;
        let leaf_page_size: usize = 8192;
        let num_threads: usize = 4;
        let inserts_per_thread: usize = 1_000;
        let snapshot_file_path: String = format!(
            "target/shuttle_cpr_snapshot_cache_only_{}_{}.bftree",
            std::process::id(),
            iter,
        );
        let tmp_snapshot_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();

        let mut config = Config::default();
        config.storage_backend(crate::StorageBackend::Memory);
        config.file_path(":memory:");
        config.cache_only = true;
        config.cb_size_byte(1024 * 1024 * 1024);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;
        config.use_snapshot(true);

        let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
        let mut rs = vec![0usize; num_threads];

        for j in 0..2 {
            let handles: Vec<_> = (0..num_threads)
                .map(|i| {
                    let bftree_clone = bftree.clone();
                    let start_id = j * inserts_per_thread;
                    let end_id = start_id + inserts_per_thread;
                    thread::spawn(move || {
                        let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
                        assert!(key_len * 2 <= max_record_size);
                        let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];

                        for r in start_id..end_id {
                            key_buffer.fill(r);
                            key_buffer[0] = i;

                            match bftree_clone.insert(
                                bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                                bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                            ) {
                                crate::LeafInsertResult::Success => {}
                                _ => {
                                    panic!("Insert failed");
                                }
                            }
                        }
                        inserts_per_thread
                    })
                })
                .collect();

            // Snapshot into a fresh file each round (the previous round was already verified).
            let _ = std::fs::remove_file(&tmp_snapshot_file_path);
            let bftree_for_snap = bftree.clone();
            let snap_path = tmp_snapshot_file_path.clone();
            let snap_handle = thread::spawn(move || {
                bftree_for_snap.cpr_snapshot(&snap_path);
            });

            for (i, h) in handles.into_iter().enumerate() {
                let r = h.join().unwrap();
                rs[i] += r;
            }

            snap_handle.join().unwrap();
            verify_snapshot_recovery(
                &tmp_snapshot_file_path,
                num_threads,
                min_record_size,
                &rs,
                false,
            );
        }

        let _ = std::fs::remove_file(tmp_snapshot_file_path);
    }

    /// Runs the cache-only snapshot scenario under shuttle's PCT schedulers.
    #[cfg(feature = "shuttle")]
    #[test]
    fn shuttle_cpr_snapshot_cache_only() {
        use std::sync::atomic::AtomicUsize;

        // Gives every shuttle iteration a distinct snapshot file name.
        static ITER: AtomicUsize = AtomicUsize::new(0);

        let mut shuttle_config = shuttle::Config::default();
        shuttle_config.max_steps = shuttle::MaxSteps::None;
        shuttle_config.stack_size = 1024 * 1024 * 1024;
        shuttle_config.failure_persistence =
            shuttle::FailurePersistence::File(Some(PathBuf::from_str("target").unwrap()));

        let mut runner = shuttle::PortfolioRunner::new(true, shuttle_config);
        let available_cores = std::thread::available_parallelism().unwrap().get().min(4);
        for _ in 0..available_cores {
            runner.add(shuttle::scheduler::PctScheduler::new(5, 400));
        }

        runner.run(|| {
            let iter = ITER.fetch_add(1, Ordering::Relaxed);
            shuttle_cpr_snapshot_cache_only_inner(iter);
            eprintln!("Completed shuttle iteration {}", iter);
        });
    }

    /// One shuttle iteration: three rounds of bounded concurrent inserts into a file-backed
    /// tree, each racing a snapshot thread, followed by a prefix-checked recovery.
    #[cfg(feature = "shuttle")]
    fn shuttle_cpr_snapshot_disk_inner(iter: usize) {
        let min_record_size: usize = 64;
        let max_record_size: usize = 2408;
        let leaf_page_size: usize = 8192;
        let num_threads: usize = 4;
        let inserts_per_thread: usize = 500;

        let file_path: String = format!(
            "target/shuttle_cpr_snapshot_disk_{}_{}.bftree",
            std::process::id(),
            iter,
        );
        let snapshot_file_path: String = format!(
            "target/shuttle_cpr_snapshot_disk_{}_{}_snap.bftree",
            std::process::id(),
            iter,
        );
        let tmp_file_path = std::path::PathBuf::from_str(&file_path).unwrap();
        let tmp_snapshot_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();

        let mut config = Config::new(&tmp_file_path, 128 * 1024);
        config.storage_backend(crate::StorageBackend::Std);
        config.cb_min_record_size = min_record_size + 2 * std::mem::size_of::<usize>();
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = min_record_size + 2 * std::mem::size_of::<usize>();
        config.use_snapshot(true);

        let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
        let mut rs = vec![0usize; num_threads];
        for j in 0..3 {
            let handles: Vec<_> = (0..num_threads)
                .map(|i| {
                    let bftree_clone = bftree.clone();
                    let start_id = j * inserts_per_thread;
                    let end_id = start_id + inserts_per_thread;
                    thread::spawn(move || {
                        let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
                        assert!(key_len * 2 <= max_record_size);
                        let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];

                        for r in start_id..end_id {
                            key_buffer.fill(r);
                            key_buffer[0] = i;

                            match bftree_clone.insert(
                                bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                                bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                            ) {
                                crate::LeafInsertResult::Success => {}
                                _ => {
                                    panic!("Insert failed");
                                }
                            }
                        }
                        inserts_per_thread
                    })
                })
                .collect();

            // Snapshot into a fresh file each round (the previous round was already verified).
            let _ = std::fs::remove_file(&tmp_snapshot_file_path);
            let bftree_for_snap = bftree.clone();
            let snap_path = tmp_snapshot_file_path.clone();
            let snap_handle = thread::spawn(move || {
                bftree_for_snap.cpr_snapshot(&snap_path);
            });

            for (i, h) in handles.into_iter().enumerate() {
                let r = h.join().unwrap();
                rs[i] += r;
            }

            snap_handle.join().unwrap();

            verify_snapshot_recovery(
                &tmp_snapshot_file_path,
                num_threads,
                min_record_size,
                &rs,
                true,
            );
        }
        let _ = std::fs::remove_file(tmp_file_path);
        let _ = std::fs::remove_file(tmp_snapshot_file_path);
    }

    /// Runs the file-backed snapshot scenario under shuttle's PCT schedulers. Failing schedules
    /// are persisted under `target/` for `shuttle_replay`.
    #[cfg(feature = "shuttle")]
    #[test]
    fn shuttle_cpr_snapshot_disk() {
        use std::sync::atomic::AtomicUsize;

        // Gives every shuttle iteration distinct tree/snapshot file names.
        static ITER: AtomicUsize = AtomicUsize::new(0);

        let mut shuttle_config = shuttle::Config::default();
        shuttle_config.max_steps = shuttle::MaxSteps::None;
        shuttle_config.stack_size = 4 * 1024 * 1024;
        shuttle_config.failure_persistence =
            shuttle::FailurePersistence::File(Some(PathBuf::from_str("target").unwrap()));

        let mut runner = shuttle::PortfolioRunner::new(true, shuttle_config);
        let available_cores = std::thread::available_parallelism().unwrap().get().min(4);
        for _ in 0..available_cores {
            runner.add(shuttle::scheduler::PctScheduler::new(5, 400));
        }

        runner.run(|| {
            let iter = ITER.fetch_add(1, Ordering::Relaxed);
            shuttle_cpr_snapshot_disk_inner(iter);
        });
    }

    /// Replays a persisted failing schedule of `shuttle_cpr_snapshot_disk`, if one exists.
    #[cfg(feature = "shuttle")]
    #[test]
    fn shuttle_replay() {
        let schedule_path = "target/schedule000.txt";
        if !std::path::Path::new(schedule_path).exists() {
            eprintln!("No schedule file at {schedule_path}; run shuttle_cpr_snapshot_disk to generate one on failure.");
            return;
        }

        tracing_subscriber::fmt()
            .with_ansi(true)
            .with_thread_names(false)
            .with_target(false)
            .init();

        shuttle::replay_from_file(|| shuttle_cpr_snapshot_disk_inner(0), schedule_path);
    }
}