Skip to main content

bf_tree/
snapshot.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4use std::cell::UnsafeCell;
5use std::collections::{HashMap, VecDeque};
6use std::mem::{ManuallyDrop, MaybeUninit};
7use std::ops::{Deref, DerefMut};
8use std::panic;
9use std::path::Path;
10use std::path::PathBuf;
11use std::sync::Arc;
12
13#[cfg(not(all(feature = "shuttle", test)))]
14use rand::Rng;
15#[cfg(all(feature = "shuttle", test))]
16use shuttle::rand::Rng;
17
18#[cfg(unix)]
19use std::os::unix::fs::FileExt;
20#[cfg(windows)]
21use std::os::windows::fs::FileExt;
22use std::sync::atomic::AtomicUsize;
23
24#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
25use thread_local::ThreadLocal;
26
27use crate::{
28    circular_buffer::CircularBuffer,
29    error::ConfigError,
30    fs::VfsImpl,
31    mini_page_op::LeafOperations,
32    nodes::{leaf_node::MiniPageNextLevel, LeafNode, INVALID_DISK_OFFSET},
33    nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
34    storage::{make_vfs, LeafStorage, PageLocation, PageTable},
35    sync::atomic::{AtomicBool, AtomicU64, Ordering},
36    utils::{get_rng, inner_lock::ReadGuard, BfsVisitor, NodeInfo},
37    wal::{LogEntry, LogEntryImpl, WriteAheadLog},
38    BfTree, Config, StorageBackend, WalConfig, WalReader,
39};
40
/// Magic byte string `BF-TREE-V0-BEGIN` (16 bytes).
// NOTE(review): presumably marks the start of a snapshot file — usage is outside this view.
const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
/// Magic byte string `BF-TREE-V0-END` (14 bytes).
// NOTE(review): presumably marks the end of a snapshot file — usage is outside this view.
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
const META_DATA_PAGE_OFFSET: usize = 0;

/// Invalid thread slot id.
const INVALID_SNAPSHOT_THREAD_ID: usize = usize::MAX;
/// Special page location offset for a Null page.
const NULL_PAGE_LOCATION_OFFSET: usize = usize::MAX;
/// Invalid snapshot state.
const INVALID_SNAPSHOT_STATE: u64 = u64::MAX;
/// Invalid snapshot version.
pub const INVALID_SNAPSHOT_VERSION: u64 = u64::MAX;
/// Maximum number of concurrent threads in Bf-tree, if snapshot is enabled.
const DEFAULT_MAX_SNAPSHOT_THREAD_NUM: usize = 256;
/// Number of bits to shift to reach the phase id inside a snapshot state.
const SNAPSHOT_STATE_PHASE_ID_SHIFT: usize = 61;
/// There are 4 snapshot phases.
const SNAPSHOT_STATE_PHASE_NUM: u64 = 4;
/// Least significant 61 bits of a snapshot state: the version number.
const SNAPSHOT_STATE_VERSION_MASK: u64 = (1u64 << SNAPSHOT_STATE_PHASE_ID_SHIFT) - 1;
/// Most significant 3 bits of a snapshot state: the phase id (allowing up to 8 phases).
const SNAPSHOT_STATE_PHASE_ID_MASK: u64 = !SNAPSHOT_STATE_VERSION_MASK;
54
/// A simplified CPR snapshot manager for a Bf-Tree.
///
/// For details, see the original CPR paper.
/// Compared to the original CPR paper, the following simplifications were made:
/// 1. No epoch framework; a global state machine only.
/// 2. No 2PL, and partial execution of read/upsert/delete in one version is allowed.
///    E.g., a bf-tree upsert operation requires a series of mini-transactional modifications
///    m_0..m_n. We do not lock all m(s) beforehand. Also, we allow the operation to restart if at
///    m_i CPR detects a version inconsistency (a PREPARE/v thread seeing a (v+1) record). This is
///    OK for bf-tree because all m(s) are independent, even though an m_i could involve multiple
///    page modifications.
pub struct CPRSnapShotMgr {
    // Global snapshot state, packed into one word:
    // | 3 bits   |     61 bits    |
    // | phase id | version number |
    global_state: AtomicU64,
    // One flag per thread slot: false means the slot is vacant, true means occupied.
    thread_slots: [AtomicBool; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
    // Per-slot snapshot state, same packing as `global_state`;
    // `INVALID_SNAPSHOT_STATE` while the slot holds no state.
    thread_local_states: [AtomicU64; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
    // Mappings of base/mini/inner nodes to their corresponding snapshot file offsets.
    // Each user thread updates its own mappings asynchronously and they get merged in the end.
    thread_local_inner_mappings:
        UnsafeCell<[Vec<(*const InnerNode, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
    thread_local_base_mappings: UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
    thread_local_mini_mappings: UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
    // Per-slot (page id, size) pairs recorded alongside every mini-page mapping.
    thread_local_mini_size_mappings:
        UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
    root_id: AtomicU64, // Page ID of the root node.
    // While true, `reserve_thread_slot` refuses to hand out slots.
    pause_snapshot: AtomicBool,
    // The physical file of the snapshot.
    vfs: Arc<dyn VfsImpl>,
    // Ensures only one snapshot is in progress at a time.
    snapshot_in_progress: AtomicBool,
}
88
// NOTE(review): `CPRSnapShotMgr` contains `UnsafeCell` arrays and raw `*const InnerNode`
// pointers, so it is neither `Sync` nor `Send` automatically. These impls presumably rely on
// every thread only touching the mapping slot indexed by its own snapshot thread id — confirm
// and record the invariant as a `// SAFETY:` comment.
unsafe impl Sync for CPRSnapShotMgr {}

unsafe impl Send for CPRSnapShotMgr {}
92
/// Within the lifetime of this guard, bf-tree transactions are protected by CPR logic.
///
/// Dropping the guard releases the thread slot it reserved, if any.
pub struct CPRSnapshotGuard {
    // Manager the slot was reserved from; `None` when snapshotting is not in use.
    snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
    // Reserved slot id, or `INVALID_SNAPSHOT_THREAD_ID` when there is no manager.
    thread_slot_id: usize,
}
98
99impl CPRSnapshotGuard {
100    pub fn new(snapshot_mgr: Option<Arc<CPRSnapShotMgr>>) -> Result<Self, ()> {
101        match snapshot_mgr {
102            None => Ok(Self {
103                snapshot_mgr: None,
104                thread_slot_id: INVALID_SNAPSHOT_THREAD_ID,
105            }),
106            Some(ref mgr) => {
107                let thread_slot_id = mgr
108                    .reserve_thread_slot()
109                    .unwrap_or(INVALID_SNAPSHOT_THREAD_ID);
110                if thread_slot_id == INVALID_SNAPSHOT_THREAD_ID {
111                    return Err(());
112                }
113
114                assert_eq!(
115                    thread_slot_id,
116                    CPRSnapShotMgr::get_snapshot_thread_id().unwrap()
117                );
118                assert_eq!(
119                    CPRSnapShotMgr::get_snapshot_thread_version(),
120                    mgr.get_local_version(&thread_slot_id)
121                );
122
123                Ok(Self {
124                    snapshot_mgr: Some(mgr.clone()),
125                    thread_slot_id,
126                })
127            }
128        }
129    }
130
131    /// Returns true if the snapshot guard has a valid thread slot id.
132    pub fn is_protected(&self) -> bool {
133        self.thread_slot_id != INVALID_SNAPSHOT_THREAD_ID
134    }
135
136    pub fn get_local_phase_id(&self) -> u64 {
137        self.snapshot_mgr
138            .as_ref()
139            .unwrap()
140            .get_local_phase_id(&self.thread_slot_id)
141    }
142
143    pub fn snapshot_base_page(&self, id: PageID, ptr: &[u8], size: usize) {
144        self.snapshot_mgr
145            .as_ref()
146            .unwrap()
147            .snapshot_base_page(id, ptr, size);
148    }
149
150    pub fn snapshot_mini_page(&self, id: PageID, ptr: &[u8], size: usize) {
151        self.snapshot_mgr
152            .as_ref()
153            .unwrap()
154            .snapshot_mini_page(id, ptr, size);
155    }
156
157    pub fn snapshot_inner_node(&self, ptr: *const InnerNode) {
158        self.snapshot_mgr.as_ref().unwrap().snapshot_inner_node(ptr);
159    }
160
161    pub fn snapshot_root_page(&self, root_id: PageID) {
162        self.snapshot_mgr
163            .as_ref()
164            .unwrap()
165            .snapshot_root_page(root_id);
166    }
167}
168
169impl Drop for CPRSnapshotGuard {
170    fn drop(&mut self) {
171        if let Some(ref mgr) = self.snapshot_mgr {
172            mgr.release_thread_slot(self.thread_slot_id);
173        }
174    }
175}
176
177impl CPRSnapShotMgr {
178    //TODO: version id wraps around. Need to handle it
179
    // Each user thread is assigned a unique snapshot thread id and local snapshot version
    // for each bf-tree transaction (node split/insert/create), if snapshot is enabled.
    // Both start out as the corresponding INVALID_* sentinel.
    thread_local! {
        static SNAPSHOT_THREAD_ID: AtomicUsize = const { AtomicUsize::new(INVALID_SNAPSHOT_THREAD_ID) };
        // For a snapshot thread with a valid snapshot thread id, its version is already contained
        // in `thread_local_states`, but we keep an extra copy here for easier access.
        static SNAPSHOT_THREAD_VERSION: AtomicU64 = const { AtomicU64::new(INVALID_SNAPSHOT_VERSION) };
    }
188
189    pub fn are_all_threads_in_next_version(&self) -> bool {
190        if !self.snapshot_in_progress.load(Ordering::Acquire) {
191            false
192        } else {
193            let global_phase_id = self.get_global_phase_id();
194            global_phase_id == 3 || global_phase_id == 0
195        }
196    }
197
198    pub fn get_snapshot_thread_id() -> Result<usize, ()> {
199        let tid = Self::SNAPSHOT_THREAD_ID.with(|id| id.load(Ordering::Relaxed));
200        if tid == INVALID_SNAPSHOT_THREAD_ID {
201            Err(())
202        } else {
203            Ok(tid)
204        }
205    }
206
207    pub fn set_snapshot_thread_tls(tid: usize, version: u64) {
208        Self::SNAPSHOT_THREAD_ID.with(|id| id.store(tid, Ordering::Relaxed));
209        Self::SNAPSHOT_THREAD_VERSION.with(|ver| ver.store(version, Ordering::Relaxed));
210    }
211
212    pub fn get_snapshot_thread_version() -> u64 {
213        Self::SNAPSHOT_THREAD_VERSION.with(|ver| ver.load(Ordering::Relaxed))
214    }
215
216    /// Initialize a new snapshot instance.
217    pub fn new(storage_backend: &StorageBackend, version: u64, file_path: &PathBuf) -> Self {
218        let vfs: Arc<dyn VfsImpl> = make_vfs(storage_backend, file_path);
219        Self {
220            global_state: AtomicU64::new(Self::new_snapshot_state(0, version)), // Initial state: (REST, version)
221            thread_slots: [const { AtomicBool::new(false) }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
222            thread_local_states: [const { AtomicU64::new(INVALID_SNAPSHOT_STATE) };
223                DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
224            thread_local_inner_mappings: UnsafeCell::new(
225                [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
226            ),
227            thread_local_base_mappings: UnsafeCell::new(
228                [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
229            ),
230            thread_local_mini_mappings: UnsafeCell::new(
231                [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
232            ),
233            thread_local_mini_size_mappings: UnsafeCell::new(
234                [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
235            ),
236            root_id: AtomicU64::new(0),
237            pause_snapshot: AtomicBool::new(false),
238            vfs,
239            snapshot_in_progress: AtomicBool::new(false),
240        }
241    }
242
243    /// Reset the snapshot
244    fn reset(&self) {
245        let local_inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
246        let local_mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
247        let local_base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
248        let local_mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
249        // De-duplicate mappings
250        for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
251            local_inner_mappings[thread_slot_id].clear();
252            local_mini_mappings[thread_slot_id].clear();
253            local_base_mappings[thread_slot_id].clear();
254            local_mini_size_mappings[thread_slot_id].clear();
255        }
256    }
257
258    pub fn new_snapshot_state(phase_id: u64, version: u64) -> u64 {
259        assert!(
260            phase_id < SNAPSHOT_STATE_PHASE_NUM,
261            "Phase id must be less than {}",
262            SNAPSHOT_STATE_PHASE_NUM
263        );
264        assert!(
265            version < (1 << SNAPSHOT_STATE_PHASE_ID_SHIFT),
266            "Version must be less than 2^61"
267        );
268        (phase_id << SNAPSHOT_STATE_PHASE_ID_SHIFT) | version
269    }
270
271    fn get_global_version(&self) -> u64 {
272        self.global_state.load(Ordering::Acquire) & SNAPSHOT_STATE_VERSION_MASK
273    }
274
275    fn get_global_phase_id(&self) -> u64 {
276        (self.global_state.load(Ordering::Acquire) & SNAPSHOT_STATE_PHASE_ID_MASK)
277            >> SNAPSHOT_STATE_PHASE_ID_SHIFT
278    }
279
280    /// Retrieve the local state of a thread specified by its slot id.
281    fn get_local_state(&self, thread_slot_id: &usize) -> u64 {
282        self.thread_local_states[*thread_slot_id].load(Ordering::Acquire)
283    }
284
285    fn set_local_state(&self, thread_slot_id: &usize, state: u64) {
286        // Don't dirty the existing state if it's the same.
287        let current_state = self.get_local_state(thread_slot_id);
288        if current_state == state {
289            return;
290        }
291
292        self.thread_local_states[*thread_slot_id].store(state, Ordering::Release);
293    }
294
295    /// Retrieve the local version of a thread specified by its slot id.
296    pub fn get_local_version(&self, thread_slot_id: &usize) -> u64 {
297        let state = self.get_local_state(thread_slot_id);
298        state & SNAPSHOT_STATE_VERSION_MASK
299    }
300
301    /// Retrieve the local phase of a thread specified by its slot id.
302    pub fn get_local_phase_id(&self, thread_slot_id: &usize) -> u64 {
303        let state = self.get_local_state(thread_slot_id);
304        (state & SNAPSHOT_STATE_PHASE_ID_MASK) >> SNAPSHOT_STATE_PHASE_ID_SHIFT
305    }
306
307    /// Move the global snapshot stable to the next one.
308    fn advance_global_state(&self) -> u64 {
309        let phase_id = self.get_global_phase_id();
310        let version = self.get_global_version();
311
312        match phase_id {
313            0 => {
314                // (REST, v) -> (PREPARE, v)
315                let new_state = Self::new_snapshot_state(1, version);
316                self.global_state.store(new_state, Ordering::Release);
317                new_state
318            }
319            1 => {
320                // (PREPARE, v) -> (IN_PROGRESS, v + 1)
321                let new_state = Self::new_snapshot_state(2, version + 1);
322                self.global_state.store(new_state, Ordering::Release);
323                new_state
324            }
325            2 => {
326                // (IN_PROGRESS, v + 1) -> (SWEEPING, v + 1)
327                let new_state = Self::new_snapshot_state(3, version);
328                self.global_state.store(new_state, Ordering::Release);
329                new_state
330            }
331            3 => {
332                // (SWEEPING, v + 1) -> (REST, v + 1)
333                let new_state = Self::new_snapshot_state(0, version);
334                self.global_state.store(new_state, Ordering::Release);
335                new_state
336            }
337            _ => {
338                panic!("Invalid global phase id: {}", phase_id);
339            }
340        }
341    }
342
343    /// If all thread local states are either invalid or equal to the target state, return true.
344    /// This can only be invoked after the global state has advanced to the target_state.
345    fn check_if_phase_completed(&self, target_state: u64) -> bool {
346        // Checking all thread local states is sufficient because of the guarantee in `reserve_thread_slot`
347        for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
348            let local_state = self.thread_local_states[thread_slot_id].load(Ordering::Acquire);
349            if local_state != INVALID_SNAPSHOT_STATE && local_state != target_state {
350                return false;
351            }
352        }
353        true
354    }
355
356    /// Obtain an unique thread slot id for the caller thread.
357    /// Guarantee that any local state assigned after the global state advances to the next one,
358    /// will either be reversed without further action or in the new state.
359    pub fn reserve_thread_slot(&self) -> Result<usize, ()> {
360        if self.pause_snapshot.load(Ordering::Acquire) {
361            return Err(());
362        }
363
364        let start = get_rng().gen_range(0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM);
365        let end = 2 * DEFAULT_MAX_SNAPSHOT_THREAD_NUM;
366
367        for i in start..end {
368            let tid = i % DEFAULT_MAX_SNAPSHOT_THREAD_NUM;
369            if self.thread_slots[tid]
370                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
371                .is_ok()
372            {
373                // Set the caller thread's snapshot local state to the global state
374                let global_state = self.global_state.load(Ordering::Acquire);
375                self.set_local_state(&tid, global_state);
376
377                // If the global state has changed, reset
378                // This is to guarantee that as soon as global state rolls to the next one,
379                // all new local states will either be reversed without further action or in the new state.
380                // Otherwise something bad could happen as described below:
381                // T1: state = global phase 'x'
382                // Mgr: global phase <- 'x + 1'
383                // Mrgr: all threads in phase 'x + 1' or invalid -> Execute 'x + 1' action
384                // T1: local state = state <- Inconsistency with global state
385                if self.get_local_state(&tid) != self.global_state.load(Ordering::Acquire) {
386                    self.set_local_state(&tid, INVALID_SNAPSHOT_STATE);
387                    assert!(self.thread_slots[tid]
388                        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
389                        .is_ok());
390                    return Err(());
391                } else {
392                    // Thread's TLS data must be empty before setting it
393                    assert!(Self::get_snapshot_thread_id().is_err());
394
395                    // Set the caller thread's snapshot thread id and version in TLS.
396                    Self::set_snapshot_thread_tls(tid, global_state & SNAPSHOT_STATE_VERSION_MASK);
397                    return Ok(tid);
398                }
399            }
400        }
401        Err(())
402    }
403
404    /// Free up the thread slot specified by the given thread slot id.
405    /// Reset the thread's TLS
406    pub fn release_thread_slot(&self, thread_slot_id: usize) {
407        self.set_local_state(&thread_slot_id, INVALID_SNAPSHOT_STATE);
408        self.thread_slots[thread_slot_id].store(false, Ordering::Release);
409
410        Self::set_snapshot_thread_tls(INVALID_SNAPSHOT_THREAD_ID, INVALID_SNAPSHOT_VERSION);
411    }
412
    /// Creates a [`CPRSnapshotGuard`] for the calling thread by delegating to
    /// [`CPRSnapshotGuard::new`].
    ///
    /// # Errors
    /// Returns `Err(())` when `CPRSnapshotGuard::new` fails, i.e. when a manager is given but no
    /// thread slot could be reserved on it.
    pub fn get_snapshot_guard(
        snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
    ) -> Result<CPRSnapshotGuard, ()> {
        CPRSnapshotGuard::new(snapshot_mgr)
    }
418
419    /// Snapshot a page to the current snapshot file and return its offset in the file.
420    /// The invoker needs to guarantee that the page to be copied is xlocked
421    /// throughout the lifetime of this function.
422    /// Also the invoker needs to guarantee proper alignment of ptr which could be required
423    /// by the underlying vfs. (E.g., io_uring_vfs requires 512B alignment)
424    pub fn snapshot_page(&self, ptr: &[u8], size: usize) -> usize {
425        // Allocate space in the snapshot file
426        let offset = self.vfs.alloc_offset(size);
427
428        // Copy the page (ptr) to the new space
429        self.vfs.write(offset, ptr);
430
431        // Return the offset
432        offset
433    }
434
435    pub fn snapshot_inner_node(&self, ptr: *const InnerNode) {
436        let offset = unsafe { self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE) };
437        let inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
438
439        let tid = Self::get_snapshot_thread_id()
440            .expect("Snapshot of page triggered by unregistered thread.");
441        inner_mappings[tid].push((ptr, offset));
442    }
443
444    pub fn snapshot_mini_page(&self, id: PageID, ptr: &[u8], size: usize) {
445        let offset = if size != 0 {
446            self.snapshot_page(ptr, size)
447        } else {
448            // cache-only mode, NULL page
449            NULL_PAGE_LOCATION_OFFSET
450        };
451
452        let mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
453        let tid = Self::get_snapshot_thread_id()
454            .expect("Snapshot of page triggered by unregistered thread.");
455        mini_mappings[tid].push((id, offset));
456
457        let mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
458        mini_size_mappings[tid].push((id, size));
459    }
460
461    pub fn snapshot_base_page(&self, id: PageID, ptr: &[u8], size: usize) {
462        let offset = self.snapshot_page(ptr, size);
463
464        let base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
465        let tid = Self::get_snapshot_thread_id()
466            .expect("Snapshot of page triggered by unregistereds thread.");
467        base_mappings[tid].push((id, offset));
468    }
469
470    pub fn snapshot_root_page(&self, root_id: PageID) {
471        self.root_id.store(root_id.raw(), Ordering::Release);
472    }
473
474    /// Sweep through all data pages and take snapshots of those
475    /// whose version is less than the passed-in snapshot version.
476    fn sweep(
477        &self,
478        tree: &BfTree,
479        version: u64,
480        inner_mapping: &mut Vec<(*const InnerNode, usize)>,
481        mini_mapping: &mut Vec<(PageID, usize)>,
482        mini_size_mapping: &mut Vec<(PageID, usize)>,
483        base_mapping: &mut Vec<(PageID, usize)>,
484    ) -> usize {
485        // There is no page table for inner nodes including the root, and each inner node's information is only
486        // saved in the tree structure itself. As a result, we need to traverse the tree to find all inner nodes.
487        // However, without blocking inner node splitting, the tree structure could change concurrently while we
488        // are BFS/DFSing the tree, making it difficult to enumerate all inner nodes to build inner node mappings.
489        // As such we freeze the tree structure temporarily using a 3-phase approach.
490        // Phase 1: Block all snapshot id reservation, drain the thread table.
491        // Phase 2: Traverse the tree and take snapshots of inner nodes whose version is < 'version'.
492        // Phase 3: Unblock snapshot id reservation.
493        // Given that there are usually a very limited number of inner nodes, user workload should be affected minimally.
494        // TODO: A complete block-free approach to scan all inner nodes.
495        self.pause_snapshot.store(true, Ordering::Release);
496        loop {
497            if self.check_if_phase_completed(INVALID_SNAPSHOT_STATE) {
498                // Upon reaching here, no user threads can obtain a snapshot id nor making changes to the tree structure.
499                // We sweep the root node first
500                loop {
501                    let root_id = tree.get_root_page();
502                    let rid = root_id.0;
503                    if root_id.1 {
504                        // Leaf
505                        let mut leaf = tree.mapping_table().get(&rid);
506                        let page_loc = leaf.get_page_location();
507
508                        match page_loc {
509                            PageLocation::Base(offset) => {
510                                let base_ref = leaf.load_base_page(*offset);
511                                if base_ref.get_snapshot_version() < version {
512                                    let base_ptr = unsafe {
513                                        std::slice::from_raw_parts(
514                                            base_ref as *const LeafNode as *const u8,
515                                            base_ref.meta.node_size as usize,
516                                        )
517                                    };
518                                    let offset = self
519                                        .snapshot_page(base_ptr, base_ref.meta.node_size as usize);
520                                    base_mapping.push((rid, offset));
521                                    self.snapshot_root_page(rid);
522                                }
523                            }
524                            PageLocation::Mini(ptr) => {
525                                // Root page is a mini page only in cache-only mode.
526                                assert!(tree.cache_only);
527
528                                let mini_ref = leaf.load_cache_page(*ptr);
529                                if mini_ref.get_snapshot_version() < version {
530                                    let mini_ptr = unsafe {
531                                        std::slice::from_raw_parts(
532                                            mini_ref as *const LeafNode as *const u8,
533                                            mini_ref.meta.node_size as usize,
534                                        )
535                                    };
536                                    let offset = self
537                                        .snapshot_page(mini_ptr, mini_ref.meta.node_size as usize);
538                                    mini_mapping.push((rid, offset));
539                                    mini_size_mapping.push((rid, mini_ref.meta.node_size as usize));
540                                    self.snapshot_root_page(rid);
541                                }
542                            }
543                            _ => {
544                                panic!("Unexpected page location for root page: {:?}", page_loc);
545                            }
546                        }
547
548                        break;
549                    } else {
550                        // Inner
551                        let ptr = rid.as_inner_node();
552                        // No need for WriteGuard as the tree structured is frozen and there are no active writers.
553                        let inner = match ReadGuard::try_read(ptr) {
554                            Ok(inner) => inner,
555                            Err(_) => continue,
556                        };
557
558                        if inner.as_ref().get_snapshot_version() < version {
559                            let offset =
560                                unsafe { self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE) };
561                            inner_mapping.push((ptr, offset));
562                            self.snapshot_root_page(rid);
563                        }
564                        break;
565                    }
566                }
567
568                // Inner nodes
569                let visitor = BfsVisitor::new_inner_only(tree);
570                for node in visitor {
571                    loop {
572                        match node {
573                            NodeInfo::Inner { ptr, .. } => {
574                                // No need for WriteGuard as the tree structured is frozen and there are no active writers.
575                                let inner = match ReadGuard::try_read(ptr) {
576                                    Ok(inner) => inner,
577                                    Err(_) => continue,
578                                };
579
580                                if inner.as_ref().get_snapshot_version() < version {
581                                    let offset = unsafe {
582                                        self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE)
583                                    };
584                                    inner_mapping.push((ptr, offset));
585                                }
586
587                                break;
588                            }
589                            NodeInfo::Leaf { level, .. } => {
590                                // This should have been captured by the case when root node is a leaf
591                                assert_eq!(level, 0);
592                                break;
593                            }
594                        }
595                    }
596                }
597
598                break;
599            }
600            // At most wasting 1 second per state transition.
601            std::thread::sleep(std::time::Duration::from_secs(1));
602        }
603        // Resume workload
604        self.pause_snapshot.store(false, Ordering::Release);
605
606        // In SWEEPING phase, there will be no new disk pages with v < 'version' being created. As such,
607        // a sequential sweep of the page table is sufficient to capture all data pages with v < 'version'.
608        let page_table_iter = tree.storage.page_table.iter();
609        let mut enumerate_leaf_count = 0;
610
611        for (_, pid) in page_table_iter {
612            assert!(pid.is_id());
613
614            // A reader lock is enough
615            let mut leaf = tree.mapping_table().get(&pid);
616            let page_loc = leaf.get_page_location();
617            enumerate_leaf_count += 1;
618
619            match page_loc {
620                PageLocation::Base(offset) => {
621                    let base_ref = leaf.load_base_page(*offset);
622                    if base_ref.get_snapshot_version() < version {
623                        let base_ptr = unsafe {
624                            std::slice::from_raw_parts(
625                                base_ref as *const LeafNode as *const u8,
626                                base_ref.meta.node_size as usize,
627                            )
628                        };
629                        let offset = self.snapshot_page(base_ptr, base_ref.meta.node_size as usize);
630                        base_mapping.push((pid, offset));
631                    }
632                }
633                PageLocation::Full(ptr) => {
634                    // We snapshot Full page as a disk page to reduce some complexity as they are equivalent.
635                    let full_ref = leaf.load_cache_page(*ptr);
636                    if full_ref.get_snapshot_version() < version {
637                        let full_ptr = unsafe {
638                            std::slice::from_raw_parts(
639                                full_ref as *const LeafNode as *const u8,
640                                full_ref.meta.node_size as usize,
641                            )
642                        };
643                        let offset = self.snapshot_page(full_ptr, full_ref.meta.node_size as usize);
644                        base_mapping.push((pid, offset));
645                    }
646                }
647                PageLocation::Mini(ptr) => {
648                    let mini_ref = leaf.load_cache_page(*ptr);
649                    if mini_ref.get_snapshot_version() < version {
650                        let mini_ptr = unsafe {
651                            std::slice::from_raw_parts(
652                                mini_ref as *const LeafNode as *const u8,
653                                mini_ref.meta.node_size as usize,
654                            )
655                        };
656                        let offset = self.snapshot_page(mini_ptr, mini_ref.meta.node_size as usize);
657                        mini_mapping.push((pid, offset));
658                        mini_size_mapping.push((pid, mini_ref.meta.node_size as usize));
659
660                        if !tree.cache_only {
661                            // In disk-mode, the base page of mini-page is part of the snapshot as well.
662                            let base_ref = leaf.load_base_page(mini_ref.next_level.as_offset());
663                            assert!(base_ref.get_snapshot_version() < version); // disk page's version should never be greater than its mini-page's.
664
665                            let base_ptr = unsafe {
666                                std::slice::from_raw_parts(
667                                    base_ref as *const LeafNode as *const u8,
668                                    base_ref.meta.node_size as usize,
669                                )
670                            };
671                            let offset =
672                                self.snapshot_page(base_ptr, base_ref.meta.node_size as usize);
673                            base_mapping.push((pid, offset));
674                        }
675                    }
676                }
677                PageLocation::Null => {
678                    assert!(tree.cache_only);
679                    // In cache-only mode, an entry in page table could be Null when the corresponding mini-page is evicted.
680                    // The Null page is also snapshotted with a special marker.
681                    // Note that, the underlying assumption here is that Null page is always of an older version which may not be true.
                    // To reconcile this with the CPR semantics, we say that any data page in cache-only mode could be either Null or a valid page.
683                    // TODO: version the Null pages.
684                    mini_mapping.push((pid, NULL_PAGE_LOCATION_OFFSET)); // Special marker
685                    mini_size_mapping.push((pid, 0));
686                }
687            }
688        }
689
690        enumerate_leaf_count
691    }
692
693    /// Finalize the snapshot file and reset the snapshotmgr's internal data.
694    #[allow(clippy::too_many_arguments)]
695    fn finalize(
696        &self,
697        snapshot_version: u64,
698        inner_mapping: &mut [(*const InnerNode, usize)],
699        mini_mapping: &mut [(PageID, usize)],
700        mini_size_mapping: &mut [(PageID, usize)],
701        base_mapping: &mut [(PageID, usize)],
702        leaf_count_upper: usize,
703        config: Arc<Config>,
704    ) {
705        // There could be duplicate leaf/inner node mappings and we use a hash map to de-duplicate them.
706        let mut inner_mapping_unique: HashMap<*const InnerNode, usize> = HashMap::new();
707        let mut mini_mapping_unique: HashMap<PageID, usize> = HashMap::new();
708        let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
709        let mut base_mapping_unique: HashMap<PageID, usize> = HashMap::new();
710
711        let local_inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
712        let local_mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
713        let local_mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
714        let local_base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
715
716        // De-duplicate mappings among snapshot threads
717        for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
718            for m in local_inner_mappings[thread_slot_id].iter() {
719                inner_mapping_unique.entry(m.0).or_insert(m.1);
720            }
721            for m in local_mini_mappings[thread_slot_id].iter() {
722                mini_mapping_unique.entry(m.0).or_insert(m.1);
723            }
724            for m in local_mini_size_mappings[thread_slot_id].iter() {
725                if let std::collections::hash_map::Entry::Vacant(e) =
726                    mini_size_mapping_unique.entry(m.0)
727                {
728                    e.insert(m.1);
729                } else {
730                    // In cache-only mode, given that NULL page has no snapshot version, a page
731                    // could be snapshotted multiple times with either a valid page or a Null page.
732                    // In disk mode, assert that the same page is never snapshotted with different
733                    // content
734                    if !config.cache_only {
735                        assert!(m.1 == mini_size_mapping_unique.get(&m.0).copied().unwrap());
736                    }
737                }
738            }
739
740            for m in local_base_mappings[thread_slot_id].iter() {
741                base_mapping_unique.entry(m.0).or_insert(m.1);
742            }
743        }
744
745        // De-duplicate with the sweep mappings
746        for (k, v) in inner_mapping.iter() {
747            if !inner_mapping_unique.contains_key(k) {
748                inner_mapping_unique.insert(*k, *v);
749            }
750        }
751        for (k, v) in mini_mapping.iter() {
752            if !mini_mapping_unique.contains_key(k) {
753                mini_mapping_unique.insert(*k, *v);
754            }
755        }
756        for (k, v) in mini_size_mapping.iter() {
757            if !mini_size_mapping_unique.contains_key(k) {
758                mini_size_mapping_unique.insert(*k, *v);
759            } else {
760                if !config.cache_only {
761                    assert!(*v == mini_size_mapping_unique.get(k).copied().unwrap());
762                }
763            }
764        }
765        for (k, v) in base_mapping.iter() {
766            if !base_mapping_unique.contains_key(k) {
767                base_mapping_unique.insert(*k, *v);
768            }
769        }
770
771        // Sanity checks
772        assert!(mini_mapping_unique.len() == mini_size_mapping_unique.len());
773        for (k, _) in mini_mapping_unique.iter() {
774            assert!(mini_size_mapping_unique.contains_key(k));
775        }
776
777        if config.cache_only {
778            assert!(base_mapping_unique.is_empty());
779        } else {
780            assert!(mini_mapping_unique.len() <= base_mapping_unique.len());
781        }
782
783        // Finalize the inner node mappings of the snapshot
784        let mut final_inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
785        for (k, v) in inner_mapping_unique.into_iter() {
786            final_inner_mapping.push((k, v));
787        }
788
789        // Finalize the base leaf node mappings of the snapshot, disk-mode only
790        // Sort the base leaf node mappings by PageID in ascending order
791        // which is required for page table initialization.
792        let mut sorted_base_mapping_uninit: Vec<MaybeUninit<(PageID, usize)>> =
793            Vec::with_capacity(base_mapping_unique.len());
794        unsafe {
795            sorted_base_mapping_uninit.set_len(base_mapping_unique.len());
796        }
797        let mut sorted_base_mapping_init = vec![false; base_mapping_unique.len()];
798
799        for (k, v) in base_mapping_unique.iter() {
800            assert!(k.is_id());
801            let offset = k.as_id();
802            assert!((offset as usize) < sorted_base_mapping_uninit.len());
803            sorted_base_mapping_init[offset as usize] = true;
804            sorted_base_mapping_uninit[offset as usize].write((*k, *v));
805        }
806        let final_sorted_base_mapping: Vec<(PageID, usize)> = if !config.cache_only {
807            assert_eq!(
808                base_mapping_unique.len(),
809                sorted_base_mapping_init.iter().filter(|&&b| b).count()
810            );
811            unsafe {
812                std::mem::transmute::<
813                    std::vec::Vec<std::mem::MaybeUninit<(PageID, usize)>>,
814                    std::vec::Vec<(PageID, usize)>,
815                >(sorted_base_mapping_uninit)
816            }
817        } else {
818            Vec::new()
819        };
820
821        // Finalize mini-page leaf node mappings of the snapshot
822        let mut final_mini_mapping: Vec<(PageID, usize)> = Vec::new();
823        for (k, v) in mini_mapping_unique.into_iter() {
824            final_mini_mapping.push((k, v));
825        }
826
827        let mut final_mini_size_mapping: Vec<(PageID, usize)> = Vec::new();
828        for (k, v) in mini_size_mapping_unique.into_iter() {
829            final_mini_size_mapping.push((k, v));
830        }
831
832        let leaf_page_num = if config.cache_only {
833            // For a pagelocation that's NULL (evicted), we don't know its version
834            // As such, we assume they are of the snapshot version and should be included.
835            // A few slots in page table mnight be wasted due to false positive.
836            // TODO: Be precise which requires some more metadata. Ideas:
837            // 1) Count number of child leaf nodes in snapshotted inner nodes
838            // 2) Put version in Null PageLocation.
839            // 3) Compact the snapshot file
840            leaf_count_upper
841        } else {
842            assert!(leaf_count_upper >= final_sorted_base_mapping.len());
843            final_sorted_base_mapping.len()
844        };
845
846        let mut file_size = std::mem::size_of::<BfTreeMeta>() as u64;
847
848        // Flush various mappings into the snapshot file
849        let (inner_offset, inner_size) = serialize_vec_to_disk(&final_inner_mapping, &self.vfs);
850
851        if inner_offset != 0 {
852            file_size = (inner_offset + align_to_sector_size(inner_size)) as u64;
853        }
854
855        let (mini_offset, mini_size) = serialize_vec_to_disk(&final_mini_mapping, &self.vfs);
856
857        if mini_offset != 0 {
858            file_size = (mini_offset + align_to_sector_size(mini_size)) as u64;
859        }
860
861        let (mini_size_offset, mini_size_size) =
862            serialize_vec_to_disk(&final_mini_size_mapping, &self.vfs);
863
864        if mini_size_offset != 0 {
865            file_size = (mini_size_offset + align_to_sector_size(mini_size_size)) as u64;
866        }
867
868        let (base_offset, base_size) = serialize_vec_to_disk(&final_sorted_base_mapping, &self.vfs);
869
870        if base_offset != 0 {
871            file_size = (base_offset + align_to_sector_size(base_size)) as u64;
872        }
873
874        // Write the header to the first disk page of the snapshot file
875        let metadata = BfTreeMeta {
876            magic_begin: *BF_TREE_MAGIC_BEGIN,
877            root_id: unsafe { PageID::from_raw(self.root_id.load(Ordering::Acquire)) },
878            inner_offset,
879            inner_size,
880            mini_offset,
881            mini_size,
882            mini_size_offset,
883            mini_size_size,
884            base_offset,
885            base_size,
886            file_size,
887            leaf_page_num,
888            snapshot_version,
889            cache_only: config.cache_only,
890            cb_size_byte: config.cb_size_byte,
891            read_promotion_rate: config.read_promotion_rate.load(Ordering::Relaxed),
892            scan_promotion_rate: config.scan_promotion_rate.load(Ordering::Relaxed),
893            cb_min_record_size: config.cb_min_record_size,
894            cb_max_record_size: config.cb_max_record_size,
895            leaf_page_size: config.leaf_page_size,
896            cb_max_key_len: config.cb_max_key_len,
897            max_fence_len: config.max_fence_len,
898            cb_copy_on_access_ratio: config.cb_copy_on_access_ratio,
899            read_record_cache: config.read_record_cache,
900            max_mini_page_size: config.max_mini_page_size,
901            mini_page_binary_search: config.mini_page_binary_search,
902            write_load_full_page: config.write_load_full_page,
903            magic_end: *BF_TREE_MAGIC_END,
904        };
905
906        self.vfs.write(META_DATA_PAGE_OFFSET, metadata.as_slice());
907        self.vfs.flush();
908
909        self.reset();
910    }
911
912    /// Take a CPR snapshot of a Bf-Tree
913    pub fn snapshot(&self, tree: &BfTree) {
914        // Allowing only one active snapshot at a time.
915        if self
916            .snapshot_in_progress
917            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
918            .is_err()
919        {
920            return;
921        }
922
923        // Clear previous snapshots from the snapshot file
924        self.vfs.reset();
925
926        // Initialize a inner node and leaf node mapping for the sweep
927        let mut sweep_inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
928        let mut sweep_mini_mapping: Vec<(PageID, usize)> = Vec::new();
929        let mut sweep_mini_size_mapping: Vec<(PageID, usize)> = Vec::new();
930        let mut sweep_base_mapping: Vec<(PageID, usize)> = Vec::new();
931
932        // At the beginning, the global phase id must be 0 (REST).
933        let mut current_global_phase_id = self.get_global_phase_id();
934        assert_eq!(current_global_phase_id, 0);
935
936        // Version of the ongoing snapshot is set
937        let snapshot_version = self.get_global_version();
938
939        // Immediately move the global state to (1 (PREPARE), snapshot_version)
940        let mut current_global_state = self.advance_global_state();
941        current_global_phase_id = self.get_global_phase_id();
942        assert_eq!(current_global_phase_id, 1);
943        assert_eq!(snapshot_version, self.get_global_version());
944
945        let mut leaf_node_count_upper_bound = 0; // Indicate the total number of leaf nodes in the captured snapshot.
946
947        loop {
948            if self.check_if_phase_completed(current_global_state) {
949                match current_global_phase_id {
950                    // REST
951                    0 => {
952                        // Upon reaching here, all user threads are in (REST, snapshot_version + 1).
953                        // All snapshots of pages of snapshot_version are done, and no more snapshot operations
954                        // neither. As such, we can safely finalize the snapshot by writing out the
955                        // metadata and page mappings.
956                        self.finalize(
957                            snapshot_version,
958                            &mut sweep_inner_mapping,
959                            &mut sweep_mini_mapping,
960                            &mut sweep_mini_size_mapping,
961                            &mut sweep_base_mapping,
962                            leaf_node_count_upper_bound,
963                            tree.config.clone(),
964                        );
965
966                        // The snapshot is done.
967                        break;
968                    }
969                    // PREPARE
970                    1 => {
971                        current_global_state = self.advance_global_state();
972                        current_global_phase_id = self.get_global_phase_id();
973                        assert_eq!(current_global_phase_id, 2);
974                        assert_eq!(snapshot_version + 1, self.get_global_version());
975                    }
976                    // IN_PROGRESS
977                    2 => {
978                        current_global_state = self.advance_global_state();
979                        current_global_phase_id = self.get_global_phase_id();
980                        assert_eq!(current_global_phase_id, 3);
981                        assert_eq!(snapshot_version + 1, self.get_global_version());
982                    }
983                    // SWEEPING
984                    3 => {
985                        // Upon reaching here, there are no user threads with snapshot_version anymore.
986                        // Sweep through and snapshot all data pages whose version is less than (snapshot_version + 1)
987                        // and build inner node and leaf node mapping for those pages.
988                        // Upon completion, all pages with version less than (snapshot_version + 1) should have been captured in the snapshot.
989                        // Note that, there could be duplicate snapshots during and after the sweep as user threads in 'SWEEPING'
990                        // state keeps taking snapshots of pages even if they have been captured by the sweep as the sweep does not
991                        // alter page versions. De-duplication is needed when finalizing the snapshot file.
992                        leaf_node_count_upper_bound = self.sweep(
993                            tree,
994                            snapshot_version + 1,
995                            &mut sweep_inner_mapping,
996                            &mut sweep_mini_mapping,
997                            &mut sweep_mini_size_mapping,
998                            &mut sweep_base_mapping,
999                        );
1000
1001                        current_global_state = self.advance_global_state();
1002                        current_global_phase_id = self.get_global_phase_id();
1003                        assert_eq!(current_global_phase_id, 0);
1004                        assert_eq!(snapshot_version + 1, self.get_global_version());
1005                    }
1006                    _ => {
1007                        panic!("Invalid global phase id: {}", current_global_phase_id);
1008                    }
1009                }
1010            }
1011
1012            // At most wasting 1 second per state transition.
1013            std::thread::sleep(std::time::Duration::from_secs(1));
1014        }
1015
1016        assert!(self
1017            .snapshot_in_progress
1018            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
1019            .is_ok());
1020    }
1021
1022    /// Recover a Bf-tree from an existing snapshot file at recovery_snapshot_file_path.
1023    ///
1024    /// Most configuration params are directly retrieved from the snapshot file
1025    /// with the following ones for the caller to specify:
1026    /// 1. use_snapshot: whether the new tree will also support snapshot.
1027    /// 2. new_snapshot_file_path: If use_snapshot, then provide the path of the snapshot file of the newly recovered Bf-tree.
1028    ///    Note that, this path must be different from the recovery_snapshot_file_path.
1029    /// 3. buffer_ptr: optional pointer to a pre-allocated buffer for the newly recovered Bf-tree
1030    /// 4. buffer_size optional override of the buffer size retrieved from the snapshot file.
1031    ///    Note that, if the newly specified value is smaller than the one retrieved from the snapshot file,
1032    ///    then failure to restore a tree might happen. This is because during recovery, the memory cache pages
1033    ///    of the tree at the moment of snapshot are reinstated into memory too.
1034    /// 5. wal_config: optional write-ahead log configuration for the newly recovered Bf-tree
1035    pub fn new_from_snapshot(
1036        recovery_snapshot_file_path: PathBuf, //  The snapshot file to recover from
1037        use_snapshot: bool,
1038        new_snapshot_file_path: Option<PathBuf>, // The snapshot file of the newly recovered Bf-tree
1039        buffer_ptr: Option<*mut u8>,
1040        buffer_size: Option<usize>, // buffer size of the newly created Bf-tree
1041        wal_config: Option<Arc<WalConfig>>,
1042    ) -> Result<BfTree, ConfigError> {
1043        // Check the recovery file is valid
1044        if !recovery_snapshot_file_path.exists() {
            // The snapshot file must already exist; recovery cannot proceed without it, so report an error.
1046            return Err(ConfigError::SnapshotFileInvalid(
1047                "Not found ".to_string()
1048                    + &recovery_snapshot_file_path
1049                        .clone()
1050                        .into_os_string()
1051                        .into_string()
1052                        .unwrap(),
1053            ));
1054        }
1055
1056        // Create WAL, if specified
1057        let wal = wal_config.as_ref().map(|s| WriteAheadLog::new(s.clone()));
1058
1059        // Retrieve the header of the snapshot file and construct a valid config for the to-be-recovered Bf-tree
1060        let reader = std::fs::File::open(recovery_snapshot_file_path.clone()).unwrap();
1061        let mut metadata = SectorAlignedVector::new_zeroed(DISK_PAGE_SIZE); // Metadata is at most one disk page in size
1062        #[cfg(unix)]
1063        {
1064            reader.read_at(&mut metadata, 0).unwrap();
1065        }
1066        #[cfg(windows)]
1067        {
1068            reader.seek_read(&mut metadata, 0).unwrap();
1069        }
1070
1071        let bf_meta = unsafe { (metadata.as_ptr() as *const BfTreeMeta).read() };
1072        bf_meta.check_magic();
1073        assert_eq!(reader.metadata().unwrap().len(), bf_meta.file_size);
1074
1075        let mut bf_tree_config = Config::new_from_snapshot(&bf_meta);
1076
1077        let recovery_snapshot_file_backend = StorageBackend::Std; // TODO, recover storage backend from snapshot file
1078        if !bf_tree_config.cache_only {
1079            bf_tree_config.file_path(recovery_snapshot_file_path.clone());
1080            // The storage backend should use the same vfs system as the recovery snapshot file
1081            bf_tree_config.storage_backend = recovery_snapshot_file_backend.clone();
1082        } else {
1083            bf_tree_config.storage_backend = StorageBackend::Memory;
1084        }
1085        bf_tree_config.use_snapshot = use_snapshot;
1086
1087        bf_tree_config.snapshot_backend = StorageBackend::Std; // TODO, allow user chosen snapshot backend
1088
1089        bf_tree_config.snapshot_file_path = match new_snapshot_file_path {
1090            Some(path) => path,
1091            None => PathBuf::new(),
1092        };
1093
1094        let snapshot_mgr = if bf_tree_config.use_snapshot {
1095            Some(Arc::new(CPRSnapShotMgr::new(
1096                &bf_tree_config.snapshot_backend,
1097                bf_tree_config.snapshot_version,
1098                &bf_tree_config.snapshot_file_path,
1099            )))
1100        } else {
1101            None
1102        };
1103
1104        if let Some(size) = buffer_size {
1105            bf_tree_config.cb_size_byte = size
1106        }
1107
1108        let size_classes = BfTree::create_mem_page_size_classes(
1109            bf_tree_config.cb_min_record_size,
1110            bf_tree_config.cb_max_record_size,
1111            bf_tree_config.leaf_page_size,
1112            bf_tree_config.max_fence_len,
1113            bf_tree_config.cache_only,
1114        );
1115
1116        bf_tree_config.write_ahead_log = wal_config.clone();
1117        bf_tree_config.validate()?;
1118
1119        let config = Arc::new(bf_tree_config);
1120
1121        // Start Bf-Tree re-construction using recovery_snapshot_file_path and the config
1122        let recovery_snapshot_vfs = make_vfs(
1123            &recovery_snapshot_file_backend,
1124            recovery_snapshot_file_path.clone(),
1125        );
1126
1127        // Step 1: reconstruct inner nodes.
1128        let mut root_page_id = bf_meta.root_id;
1129        let mut inner_node_page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);
1130        if root_page_id.is_inner_node_pointer() {
1131            let inner_mapping: Vec<(*const InnerNode, usize)> = read_vec_from_offset(
1132                bf_meta.inner_offset,
1133                bf_meta.inner_size,
1134                &recovery_snapshot_vfs,
1135            );
1136            let mut inner_map = HashMap::new();
1137
1138            for m in inner_mapping {
1139                inner_map.insert(m.0, m.1);
1140            }
1141            let offset = inner_map.get(&root_page_id.as_inner_node()).unwrap();
1142            recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
1143            let root_page = InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
1144
1145            // No need for disk offset of a inner node.
1146            unsafe {
1147                (*root_page).set_disk_offset(INVALID_DISK_OFFSET as u64);
1148            }
1149            // Mark the root node
1150            unsafe {
1151                (*root_page).set_root(true);
1152            }
1153            root_page_id = PageID::from_pointer(root_page);
1154
1155            let mut inner_resolve_queue = VecDeque::from([root_page]);
1156            while !inner_resolve_queue.is_empty() {
1157                let inner_ptr = inner_resolve_queue.pop_front().unwrap();
1158                let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
1159                if inner.as_ref().meta.children_is_leaf() {
1160                    continue;
1161                }
1162                for (idx, c) in inner.as_ref().get_child_iter().enumerate() {
1163                    let offset = inner_map.get(&c.as_inner_node()).unwrap();
1164                    recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
1165                    let inner_page =
1166                        InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
1167                    unsafe {
1168                        (*inner_page).set_disk_offset(INVALID_DISK_OFFSET as u64);
1169                    }
1170                    let inner_id = PageID::from_pointer(inner_page);
1171                    inner.as_mut().update_at_pos(idx, inner_id);
1172                    inner_resolve_queue.push_back(inner_page);
1173                }
1174            }
1175        }
1176
1177        let raw_root_id = if root_page_id.is_id() {
1178            root_page_id.raw() | BfTree::ROOT_IS_LEAF_MASK
1179        } else {
1180            root_page_id.raw()
1181        };
1182
1183        // Step 2. Reconstruct the page table and leaf pages.
1184        // Here we differentiate cache-only from disk-backed Bf-trees as their recovery process differs.
1185        if !bf_meta.cache_only {
1186            // For disk backed bf-tree, we reconstruct the page table using base pages.
1187            let base_mapping: Vec<(PageID, usize)> = if bf_meta.base_size > 0 {
1188                read_vec_from_offset(
1189                    bf_meta.base_offset,
1190                    bf_meta.base_size,
1191                    &recovery_snapshot_vfs,
1192                )
1193            } else {
1194                Vec::new()
1195            };
1196
1197            let base_page_loc_mapping = base_mapping.into_iter().map(|(pid, offset)| {
1198                let loc = PageLocation::Base(offset);
1199                (pid, loc)
1200            });
1201
1202            // The file system of the newly constructed Bf-tree is the recovery_snapshot_file
1203            let pt = PageTable::new_from_mapping(
1204                base_page_loc_mapping,
1205                recovery_snapshot_vfs.clone(),
1206                config.clone(),
1207                snapshot_mgr.clone(),
1208            );
1209
1210            let circular_buffer = CircularBuffer::new(
1211                config.cb_size_byte,
1212                config.cb_copy_on_access_ratio,
1213                config.cb_min_record_size,
1214                config.cb_max_record_size,
1215                config.leaf_page_size,
1216                config.max_fence_len,
1217                buffer_ptr,
1218                config.cache_only,
1219            );
1220
1221            // Next, we restore the mini-pages and wire them to the corresponding base pages and update the page table.
1222            let mini_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size > 0 {
1223                read_vec_from_offset(
1224                    bf_meta.mini_offset,
1225                    bf_meta.mini_size,
1226                    &recovery_snapshot_vfs,
1227                )
1228            } else {
1229                Vec::new()
1230            };
1231
1232            let mini_size_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size_size > 0 {
1233                read_vec_from_offset(
1234                    bf_meta.mini_size_offset,
1235                    bf_meta.mini_size_size,
1236                    &recovery_snapshot_vfs,
1237                )
1238            } else {
1239                Vec::new()
1240            };
1241
1242            let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
1243            for (pid, size) in mini_size_mapping {
1244                mini_size_mapping_unique.insert(pid, size);
1245            }
1246
1247            // Create the storage system first before allocating mini-pages.
1248            let storage =
1249                LeafStorage::new_inner(config.clone(), pt, circular_buffer, recovery_snapshot_vfs);
1250
1251            for (pid, offset) in &mini_mapping {
1252                let mini_size = *mini_size_mapping_unique.get(pid).unwrap();
1253
1254                // Allocate space in memory for new mini-page
1255                let mini_page_guard = match storage.alloc_mini_page(mini_size) {
1256                    Ok(mini_page_ptr) => mini_page_ptr,
1257                    Err(_) => {
1258                        return Err(ConfigError::CircularBufferSize("buffer size set too small. Consider increasing it or not specifying at all".to_string()));
1259                    }
1260                };
1261
1262                // Copy mini-page from snapshot file to the newly allocated mini-page
1263                let mut page_buffer = SectorAlignedVector::new_zeroed(mini_size);
1264                storage.vfs.read(*offset, &mut page_buffer);
1265                unsafe {
1266                    std::ptr::copy_nonoverlapping(
1267                        page_buffer.as_ptr(),
1268                        mini_page_guard.as_ptr(),
1269                        mini_size,
1270                    );
1271                }
1272
1273                // Connect the new mini-page to the corresponding base page in page table
1274                let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
1275                let mini_page = unsafe { &mut *new_mini_ptr };
1276
1277                let mut base_page = storage.page_table.get_mut(pid);
1278                let page_loc = base_page.get_page_location().clone();
1279                match page_loc {
1280                    PageLocation::Base(off) => {
1281                        mini_page.next_level = MiniPageNextLevel::new(off);
1282                    }
1283                    _ => {
1284                        panic!("Unexpected page location for base page");
1285                    }
1286                }
1287                let mini_loc = PageLocation::Mini(new_mini_ptr);
1288
1289                // Replace the base page with the mini-page in the page table
1290                base_page.create_cache_page_loc(mini_loc);
1291            }
1292
1293            Ok(BfTree {
1294                storage,
1295                root_page_id: AtomicU64::new(raw_root_id),
1296                wal,
1297                write_load_full_page: config.write_load_full_page,
1298                cache_only: false,
1299                mini_page_size_classes: size_classes,
1300                snapshot_mgr,
1301                config,
1302                #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1303                metrics_recorder: Some(Arc::new(ThreadLocal::new())),
1304            })
1305        } else {
1306            // For cache-only mode, we create a new page table with NULL pages
1307            let mini_mapping_unallocated: Vec<(PageID, PageLocation)> = (0..bf_meta.leaf_page_num)
1308                .map(|pid| (PageID::from_id(pid as u64), PageLocation::Null))
1309                .collect();
1310
1311            // Then we restore the mini-pages and replace the corresponding Null pages and update the page table.
1312            let mini_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size > 0 {
1313                read_vec_from_offset(
1314                    bf_meta.mini_offset,
1315                    bf_meta.mini_size,
1316                    &recovery_snapshot_vfs,
1317                )
1318            } else {
1319                Vec::new()
1320            };
1321
1322            let mini_size_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size_size > 0 {
1323                read_vec_from_offset(
1324                    bf_meta.mini_size_offset,
1325                    bf_meta.mini_size_size,
1326                    &recovery_snapshot_vfs,
1327                )
1328            } else {
1329                Vec::new()
1330            };
1331
1332            let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
1333            for (pid, size) in mini_size_mapping {
1334                mini_size_mapping_unique.insert(pid, size);
1335            }
1336
1337            // For cache-only mode, the file system of the new bf-tree is memory-based
1338            let storage_vfs = make_vfs(&config.storage_backend, PathBuf::new());
1339
1340            let pt = PageTable::new_from_mapping(
1341                mini_mapping_unallocated.into_iter(),
1342                storage_vfs.clone(),
1343                config.clone(),
1344                snapshot_mgr.clone(),
1345            );
1346
1347            let circular_buffer = CircularBuffer::new(
1348                config.cb_size_byte,
1349                config.cb_copy_on_access_ratio,
1350                config.cb_min_record_size,
1351                config.cb_max_record_size,
1352                config.leaf_page_size,
1353                config.max_fence_len,
1354                buffer_ptr,
1355                config.cache_only,
1356            );
1357
1358            // Create a memory-based storage system before allocating mini-pages.
1359            let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, storage_vfs);
1360
1361            for (pid, offset) in &mini_mapping {
1362                let mini_size = *mini_size_mapping_unique.get(pid).unwrap();
1363
1364                // Skip over null pages as the default is Null in the page table already
1365                if *offset == NULL_PAGE_LOCATION_OFFSET {
1366                    continue;
1367                }
1368
1369                // Allocate memory for a new mini-page in storage
1370                let mini_page_guard = match storage.alloc_mini_page(mini_size) {
1371                    Ok(mini_page_ptr) => mini_page_ptr,
1372                    Err(_) => {
1373                        panic!("Please increase cb_size_byte in config");
1374                    }
1375                };
1376
1377                // Copy mini-page from snapshot file to the newly allocated space.
1378                let mut page_buffer = SectorAlignedVector::new_zeroed(mini_size);
1379                recovery_snapshot_vfs.read(*offset, &mut page_buffer);
1380                unsafe {
1381                    std::ptr::copy_nonoverlapping(
1382                        page_buffer.as_ptr(),
1383                        mini_page_guard.as_ptr(),
1384                        mini_size,
1385                    );
1386                }
1387
1388                // Set its next level to oblivion
1389                let mini_page_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
1390                let mini_page = unsafe { &mut *mini_page_ptr };
1391                mini_page.next_level = MiniPageNextLevel::new_null();
1392
1393                // Update the corresponding page location
1394                let mut null_page = storage.page_table.get_mut(pid);
1395                let page_loc = null_page.get_page_location().clone();
1396                match page_loc {
1397                    PageLocation::Null => {
1398                        let mini_loc = PageLocation::Mini(mini_page_ptr);
1399                        null_page.create_cache_page_loc(mini_loc);
1400                    }
1401                    _ => {
1402                        panic!("Unexpected page location for null page");
1403                    }
1404                }
1405            }
1406            Ok(BfTree {
1407                storage,
1408                root_page_id: AtomicU64::new(raw_root_id),
1409                wal,
1410                write_load_full_page: config.write_load_full_page,
1411                cache_only: true,
1412                mini_page_size_classes: size_classes,
1413                snapshot_mgr,
1414                config,
1415                #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1416                metrics_recorder: Some(Arc::new(ThreadLocal::new())),
1417            })
1418        }
1419    }
1420}
1421
1422impl BfTree {
1423    /// Recovery a Bf-Tree from a cpr snapshot and WAL files.
1424    /// Incomplete function, internal use only
1425    pub fn recovery(
1426        recovery_snapshot_file_path: PathBuf, //  The snapshot file to recover from
1427        wal_file: impl AsRef<Path>,
1428        use_snapshot: bool,
1429        new_snapshot_file_path: Option<PathBuf>, // The snapshot file of the newly recovered Bf-tree
1430        buffer_ptr: Option<*mut u8>,
1431        buffer_size: Option<usize>,
1432        wal: Option<Arc<WalConfig>>,
1433    ) {
1434        let bf_tree = BfTree::new_from_cpr_snapshot(
1435            recovery_snapshot_file_path,
1436            use_snapshot,
1437            new_snapshot_file_path,
1438            buffer_ptr,
1439            buffer_size,
1440            wal,
1441        )
1442        .unwrap();
1443        let wal_reader = WalReader::new(wal_file, 4096);
1444
1445        for seg in wal_reader.segment_iter() {
1446            for entry in seg.entry_iter() {
1447                let log_entry = LogEntry::read_from_buffer(entry.1);
1448                match log_entry {
1449                    LogEntry::Write(op) => {
1450                        bf_tree.insert(op.key, op.value);
1451                    }
1452                    LogEntry::Split(_op) => {
1453                        todo!("implement split op in wal!")
1454                    }
1455                }
1456            }
1457        }
1458    }
1459
1460    /// Take a new CPR snapshot
1461    pub fn cpr_snapshot(&self) {
1462        if !self.config.use_snapshot {
1463            panic!("Snapshots are not enabled in the configuration");
1464        }
1465
1466        let snpshot_mgr = self.snapshot_mgr.clone().unwrap();
1467        snpshot_mgr.snapshot(self);
1468    }
1469
1470    /// Recover a BfTree from a CPR snapshot
1471    pub fn new_from_cpr_snapshot(
1472        recovery_snapshot_file_path: PathBuf, //  The snapshot file to recover from
1473        use_snapshot: bool,
1474        new_snapshot_file_path: Option<PathBuf>, // The snapshot file of the newly recovered Bf-tree
1475        buffer_ptr: Option<*mut u8>,
1476        buffer_size: Option<usize>,
1477        wal: Option<Arc<WalConfig>>,
1478    ) -> Result<BfTree, ConfigError> {
1479        CPRSnapShotMgr::new_from_snapshot(
1480            recovery_snapshot_file_path,
1481            use_snapshot,
1482            new_snapshot_file_path,
1483            buffer_ptr,
1484            buffer_size,
1485            wal,
1486        )
1487    }
1488
1489    /// Check if all threads are running in the next version of the current snapshot
1490    pub fn are_all_threads_in_next_version(&self) -> bool {
1491        let snpshot_mgr = self.snapshot_mgr.clone().unwrap();
1492        snpshot_mgr.are_all_threads_in_next_version()
1493    }
1494}
1495
1496struct SectorAlignedVector {
1497    inner: ManuallyDrop<Vec<u8>>,
1498}
1499
1500impl Drop for SectorAlignedVector {
1501    fn drop(&mut self) {
1502        let layout =
1503            std::alloc::Layout::from_size_align(self.inner.capacity(), SECTOR_SIZE).unwrap();
1504        let ptr = self.inner.as_mut_ptr();
1505        unsafe {
1506            std::alloc::dealloc(ptr, layout);
1507        }
1508    }
1509}
1510
1511impl SectorAlignedVector {
1512    fn new_zeroed(capacity: usize) -> Self {
1513        let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
1514        let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
1515
1516        let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
1517        Self {
1518            inner: ManuallyDrop::new(inner),
1519        }
1520    }
1521}
1522
1523impl Deref for SectorAlignedVector {
1524    type Target = Vec<u8>;
1525
1526    fn deref(&self) -> &Self::Target {
1527        &self.inner
1528    }
1529}
1530
1531impl DerefMut for SectorAlignedVector {
1532    fn deref_mut(&mut self) -> &mut Self::Target {
1533        &mut self.inner
1534    }
1535}
1536
1537/// We use repr(C) for simplicity, maybe flatbuffer or bincode or even repr(Rust) is better.
1538/// But we don't care about the space here.
1539/// I don't want to introduce giant dependencies just for this.
1540#[repr(C, align(512))]
1541pub(crate) struct BfTreeMeta {
1542    magic_begin: [u8; 16],
1543    // Snapshot file metadata
1544    root_id: PageID,
1545    inner_offset: usize,
1546    inner_size: usize,
1547    mini_offset: usize,
1548    mini_size: usize,
1549    mini_size_offset: usize,
1550    mini_size_size: usize,
1551    base_offset: usize,
1552    base_size: usize,
1553    file_size: u64,
1554    leaf_page_num: usize,
1555    // Bf-tree configuration of the snapshot
1556    pub(crate) cb_size_byte: usize,
1557    pub(crate) snapshot_version: u64,
1558    pub(crate) cache_only: bool,
1559    pub(crate) read_promotion_rate: usize,
1560    pub(crate) scan_promotion_rate: usize,
1561    pub(crate) cb_min_record_size: usize,
1562    pub(crate) cb_max_record_size: usize,
1563    pub(crate) leaf_page_size: usize,
1564    pub(crate) cb_max_key_len: usize,
1565    pub(crate) max_fence_len: usize,
1566    pub(crate) cb_copy_on_access_ratio: f64,
1567    pub(crate) read_record_cache: bool,
1568    pub(crate) max_mini_page_size: usize,
1569    pub(crate) mini_page_binary_search: bool,
1570    pub(crate) write_load_full_page: bool,
1571    magic_end: [u8; 14],
1572}
1573const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);
1574
1575impl BfTreeMeta {
1576    fn as_slice(&self) -> &[u8] {
1577        let ptr = self as *const Self as *const u8;
1578        let size = std::mem::size_of::<Self>();
1579        unsafe { std::slice::from_raw_parts(ptr, size) }
1580    }
1581
1582    fn check_magic(&self) {
1583        assert_eq!(self.magic_begin, *BF_TREE_MAGIC_BEGIN);
1584        assert_eq!(self.magic_end, *BF_TREE_MAGIC_END);
1585    }
1586}
1587
1588/// Returns starting offset and total size written to disk.
1589fn serialize_vec_to_disk<T>(v: &[T], vfs: &Arc<dyn VfsImpl>) -> (usize, usize) {
1590    if v.is_empty() {
1591        return (0, 0);
1592    }
1593    let unaligned_ptr = v.as_ptr() as *const u8;
1594    let unaligned_size = std::mem::size_of_val(v);
1595
1596    let aligned_size = align_to_sector_size(unaligned_size);
1597    let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
1598    unsafe {
1599        let aligned_ptr = std::alloc::alloc_zeroed(layout);
1600        std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
1601        let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
1602        let offset = serialize_u8_slice_to_disk(slice, vfs);
1603        std::alloc::dealloc(aligned_ptr, layout);
1604        (offset, unaligned_size)
1605    }
1606}
1607
1608fn read_vec_from_offset<T: Clone>(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<T> {
1609    assert!(size > 0);
1610    let slice = read_u8_slice_from_disk(offset, size, vfs);
1611    let ptr = slice.as_ptr() as *const T;
1612    let size = size / std::mem::size_of::<T>();
1613    let slice = unsafe { std::slice::from_raw_parts(ptr, size) };
1614    slice.to_vec()
1615}
1616
1617fn read_u8_slice_from_disk(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<u8> {
1618    let mut res = Vec::new();
1619    let mut buffer = vec![0; DISK_PAGE_SIZE];
1620    for i in (0..size).step_by(DISK_PAGE_SIZE) {
1621        vfs.read(offset + i, &mut buffer); // Read one disk page at a time
1622        res.extend_from_slice(&buffer);
1623    }
1624    res
1625}
1626
/// Alignment, in bytes, used for buffers and sizes that are written to disk.
const SECTOR_SIZE: usize = 512;

/// Rounds `n` up to the nearest multiple of `SECTOR_SIZE` (multiples map to themselves).
fn align_to_sector_size(n: usize) -> usize {
    let sectors = (n + (SECTOR_SIZE - 1)) / SECTOR_SIZE;
    sectors * SECTOR_SIZE
}
1632
1633/// Write a slice to disk and return the start offset and page count.
1634/// TODO: we should not just return offset and count, because the offset is not necessarily continuos.
1635///     We should return a Vec of offsets. But let's keep it simple for fast prototype.
1636fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> usize {
1637    let mut start_offset = None;
1638    for chunk in slice.chunks(DISK_PAGE_SIZE) {
1639        let offset = vfs.alloc_offset(DISK_PAGE_SIZE); // Write one disk page at a time
1640        if start_offset.is_none() {
1641            start_offset = Some(offset);
1642        }
1643        vfs.write(offset, chunk);
1644    }
1645    start_offset.unwrap()
1646}
1647
#[cfg(test)]
mod tests {
    use crate::{nodes::leaf_node::LeafReadResult, BfTree, Config};
    use std::path::{Path, PathBuf};
    use std::sync::atomic::Ordering;
    use std::sync::{atomic::AtomicBool, Arc};
    use std::thread;
    use std::time::Duration;

    const MIN_RECORD_SIZE: usize = 64;
    const MAX_RECORD_SIZE: usize = 2408;
    const LEAF_PAGE_SIZE: usize = 8192;
    const NUM_THREADS: usize = 4;

    /// Byte length of every key (and value) written by the writer threads.
    const KEY_LEN: usize = MIN_RECORD_SIZE / 2 + std::mem::size_of::<usize>();
    /// The same length expressed in `usize` words.
    const KEY_WORDS: usize = KEY_LEN / std::mem::size_of::<usize>();
    // A record is key + value, both `KEY_LEN` bytes long.
    const _: () = assert!(KEY_LEN * 2 <= MAX_RECORD_SIZE);

    /// How long writers run before, between, and after snapshots.
    const WRITE_WINDOW: Duration = Duration::from_secs(5);

    /// Fills `key` with the record sequence number and tags word 0 with the writer id.
    fn fill_key(key: &mut [usize], writer: usize, seq: usize) {
        key.fill(seq);
        key[0] = writer;
    }

    /// Applies the record-size limits and snapshot settings shared by both tests.
    fn configure_for_snapshot_test(config: &mut Config, snapshot_file: &Path) {
        config.cb_min_record_size = MIN_RECORD_SIZE;
        config.cb_max_record_size = MAX_RECORD_SIZE;
        config.leaf_page_size = LEAF_PAGE_SIZE;
        config.max_fence_len = MAX_RECORD_SIZE;
        config.snapshot_file_path = snapshot_file.to_path_buf();
        config.use_snapshot(true);
    }

    /// Starts `NUM_THREADS` writers that insert records `0, 1, 2, ...` (key == value)
    /// until `finish` is set. Each thread returns the number of records it inserted.
    fn spawn_writers(
        bftree: &Arc<BfTree>,
        finish: &Arc<AtomicBool>,
    ) -> Vec<thread::JoinHandle<usize>> {
        (0..NUM_THREADS)
            .map(|writer| {
                let finish = Arc::clone(finish);
                let bftree = Arc::clone(bftree);

                thread::spawn(move || {
                    let mut key_buffer = vec![0usize; KEY_WORDS];
                    let mut inserted: usize = 0;

                    while !finish.load(Ordering::Relaxed) {
                        fill_key(&mut key_buffer, writer, inserted);
                        let key_bytes = bytemuck::must_cast_slice::<usize, u8>(&key_buffer);

                        match bftree.insert(key_bytes, key_bytes) {
                            crate::LeafInsertResult::Success => {}
                            _ => {
                                panic!("Insert failed");
                            }
                        }
                        inserted += 1;
                    }
                    inserted
                })
            })
            .collect()
    }

    /// Signals all writers to stop and collects how many records each one inserted.
    fn stop_writers(finish: &AtomicBool, handles: Vec<thread::JoinHandle<usize>>) -> Vec<usize> {
        finish.store(true, Ordering::Relaxed);
        handles
            .into_iter()
            .map(|handle| handle.join().unwrap())
            .collect()
    }

    /// Rebuilds a tree from `snapshot_file`, directing its own snapshots to `new_snapshot_file`.
    fn recover_from_snapshot(snapshot_file: &Path, new_snapshot_file: &Path) -> BfTree {
        BfTree::new_from_cpr_snapshot(
            snapshot_file.to_path_buf(),
            true,
            Some(new_snapshot_file.to_path_buf()),
            None,
            None,
            None,
        )
        .unwrap()
    }

    /// Checks that, for every writer, the recovered tree holds a strict prefix of the
    /// records that writer inserted: once a record is missing no later record may be
    /// present, and at least one record must be missing.
    fn assert_strict_prefix_recovered(recovered: &BfTree, inserted_per_writer: &[usize]) {
        for (writer, &record_num) in inserted_per_writer.iter().enumerate() {
            let mut key_buffer = vec![0usize; KEY_WORDS];
            let mut res_buffer = vec![0u8; KEY_LEN];
            // Sequence number of the first record absent from the snapshot, if any.
            let mut first_missing: Option<usize> = None;

            for r in 0..record_num {
                fill_key(&mut key_buffer, writer, r);

                match recovered.read(
                    bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
                    &mut res_buffer,
                ) {
                    LeafReadResult::Found(v) => {
                        if first_missing.is_some() {
                            panic!(
                                "Found a record after the prefix for writer thread {}",
                                writer
                            );
                        }
                        assert_eq!(v as usize, KEY_LEN);
                        assert_eq!(
                            &res_buffer,
                            bytemuck::must_cast_slice::<usize, u8>(&key_buffer)
                        );
                    }
                    LeafReadResult::NotFound => {
                        if first_missing.is_none() {
                            first_missing = Some(r);
                        }
                    }
                    _ => {
                        panic!("Unexpected read result")
                    }
                }
            }

            // There must be some records that were not included in the snapshot
            let captured = first_missing.unwrap_or(record_num);
            assert!(captured < record_num)
        }
    }

    /// Multiple writer threads write to a BfTree in parallel while a separate thread takes multiple snapshots.
    /// A new BfTree recovered from the snapshot should contain a prefix of all the inserts from each writer thread.
    /// A snapshot taken later should cover the previous snapshots.
    #[test]
    fn cpr_snapshot_disk() {
        let snapshot_num: usize = 2;
        let tree_file = PathBuf::from("target/test_simple.bftree");
        let snapshot_file = PathBuf::from("target/test_simple_snapshot.bftree");
        let new_snapshot_file = PathBuf::from("target/test_simple_new_snapshot.bftree");

        let mut config = Config::new(&tree_file, 128 * 1024); // 128KB buffer pool. insert/split/eviction all triggered
        config.storage_backend(crate::StorageBackend::Std);
        configure_for_snapshot_test(&mut config, &snapshot_file);

        let bftree = Arc::new(BfTree::with_config(config, None).unwrap());
        let finish = Arc::new(AtomicBool::new(false));
        let handles = spawn_writers(&bftree, &finish);

        thread::sleep(WRITE_WINDOW);
        for _ in 0..snapshot_num {
            // take a snapshot
            bftree.cpr_snapshot();
            thread::sleep(WRITE_WINDOW);
        }

        // Stop all writer threads
        let inserted_per_writer = stop_writers(&finish, handles);
        drop(bftree);

        // Recover from the snapshot and check invariants
        let recovered = recover_from_snapshot(&snapshot_file, &new_snapshot_file);
        assert_strict_prefix_recovered(&recovered, &inserted_per_writer);

        std::fs::remove_file(tree_file).unwrap();
        std::fs::remove_file(new_snapshot_file).unwrap();
        std::fs::remove_file(snapshot_file).unwrap();
    }

    /// Same scenario as `cpr_snapshot_disk`, but for a cache-only tree on the in-memory
    /// storage backend and with a single snapshot.
    #[test]
    fn cpr_snapshot_cache_only() {
        let snapshot_file = PathBuf::from("target/test_simple_cache_only_snapshot.bftree");
        let new_snapshot_file = PathBuf::from("target/test_simple_cache_only_new_snapshot.bftree");

        let mut config = Config::default();
        config.storage_backend(crate::StorageBackend::Memory);
        config.file_path(":memory:");
        config.cache_only = true;
        config.cb_size_byte(1024 * 1024 * 1024); // 1 GiB circular buffer
        configure_for_snapshot_test(&mut config, &snapshot_file);

        let bftree = Arc::new(BfTree::with_config(config, None).unwrap());
        let finish = Arc::new(AtomicBool::new(false));
        let handles = spawn_writers(&bftree, &finish);

        thread::sleep(WRITE_WINDOW);
        // take a snapshot
        bftree.cpr_snapshot();
        thread::sleep(WRITE_WINDOW);

        // Stop all writer threads
        let inserted_per_writer = stop_writers(&finish, handles);
        drop(bftree);

        // Recover from the snapshot and check invariants
        let recovered = recover_from_snapshot(&snapshot_file, &new_snapshot_file);
        assert_strict_prefix_recovered(&recovered, &inserted_per_writer);

        std::fs::remove_file(snapshot_file).unwrap();
        std::fs::remove_file(new_snapshot_file).unwrap();
    }
}