Skip to main content

bf_tree/
snapshot.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4use std::cell::UnsafeCell;
5use std::collections::{HashMap, VecDeque};
6use std::mem::{ManuallyDrop, MaybeUninit};
7use std::ops::{Deref, DerefMut};
8use std::panic;
9use std::path::Path;
10use std::path::PathBuf;
11use std::sync::Arc;
12
13#[cfg(not(all(feature = "shuttle", test)))]
14use rand::Rng;
15#[cfg(all(feature = "shuttle", test))]
16use shuttle::rand::Rng;
17
18#[cfg(unix)]
19use std::os::unix::fs::FileExt;
20#[cfg(windows)]
21use std::os::windows::fs::FileExt;
22
23#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
24use thread_local::ThreadLocal;
25
26use crate::{
27    circular_buffer::CircularBuffer,
28    error::ConfigError,
29    fs::VfsImpl,
30    mini_page_op::LeafOperations,
31    nodes::{leaf_node::MiniPageNextLevel, LeafNode, INVALID_DISK_OFFSET},
32    nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
33    storage::{make_vfs, LeafStorage, PageLocation, PageTable},
34    sync::atomic::{AtomicBool, AtomicU64, Ordering},
35    sync::thread,
36    sync::RwLock,
37    utils::{get_rng, inner_lock::ReadGuard, BfsVisitor, NodeInfo},
38    wal::{LogEntry, LogEntryImpl, WriteAheadLog},
39    BfTree, Config, StorageBackend, WalConfig, WalReader,
40};
41
/// Magic marker opening a v0 Bf-Tree snapshot image.
const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
/// Magic marker closing a v0 Bf-Tree snapshot image.
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
/// Offset of the metadata page inside the snapshot file.
const META_DATA_PAGE_OFFSET: usize = 0;

/// Sentinel slot id carried by guards that do not own a thread slot.
const INVALID_SNAPSHOT_THREAD_ID: usize = usize::MAX;
/// Sentinel file offset recorded for a `Null` page (cache-only mode).
const NULL_PAGE_LOCATION_OFFSET: usize = usize::MAX;
/// Sentinel local state: the thread slot is vacant / not participating.
const INVALID_SNAPSHOT_STATE: u64 = u64::MAX;
/// Sentinel snapshot version (every bit except the most significant one set).
pub const INVALID_SNAPSHOT_VERSION: u64 = u64::MAX >> 1;
/// Maximum number of threads that may concurrently hold a snapshot slot.
const DEFAULT_MAX_SNAPSHOT_THREAD_NUM: usize = 64;

// Packed snapshot state word: | phase id: top 3 bits | version: low 61 bits |
const SNAPSHOT_STATE_PHASE_ID_SHIFT: usize = 61;
/// Number of phases in use (Rest, Prepare, InProgress, Sweep).
const SNAPSHOT_STATE_PHASE_NUM: u64 = 4;
/// Selects the 3-bit phase-id field (room for up to 8 phases).
const SNAPSHOT_STATE_PHASE_ID_MASK: u64 = 0b111u64 << SNAPSHOT_STATE_PHASE_ID_SHIFT;
/// Selects the 61-bit version field, i.e. every bit outside the phase-id field.
const SNAPSHOT_STATE_VERSION_MASK: u64 = !SNAPSHOT_STATE_PHASE_ID_MASK;
55
/// Snapshot phase identifier.
///
/// The raw encoding used in the packed snapshot state word is the variant's
/// position in this declaration (`Rest` = 0 … `Sweep` = 3).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PhaseId {
    Rest,
    Prepare,
    InProgress,
    Sweep,
}

impl PhaseId {
    /// Decodes a raw phase value taken from a snapshot state word.
    ///
    /// # Panics
    /// Panics if `value` is not one of the four defined phases.
    fn from_raw(value: u64) -> Self {
        // Indexed by raw encoding; must stay in declaration order.
        const BY_RAW: [PhaseId; 4] = [
            PhaseId::Rest,
            PhaseId::Prepare,
            PhaseId::InProgress,
            PhaseId::Sweep,
        ];

        usize::try_from(value)
            .ok()
            .and_then(|idx| BY_RAW.get(idx).copied())
            .unwrap_or_else(|| panic!("Invalid phase id: {}", value))
    }

    /// Encodes this phase as its raw value (0..=3), i.e. its implicit discriminant.
    fn as_raw(self) -> u64 {
        self as u64
    }
}
85
/// A simplified CPR snapshot of a Bf-Tree.
/// For details, see the original CPR paper.
/// When compared to the original CPR paper, the following simplifications were made:
/// 1. No epoch framework, global state machine only.
/// 2. No 2PL, and partial execution of a read/upsert/delete within one version is allowed.
///    E.g., a bf-tree upsert operation requires a series of mini-transactional modifications m_0..m_n.
///    We do not lock all m(s) beforehand. Also, we allow the operation to restart if at m_i CPR detects
///    a version inconsistency (PREPARE/v thread seeing a (v+1) record). This is OK for bf-tree
///    because all m(s) are independent even though a m_i could involve multiple page modifications.
///
/// The global state cycles REST/v -> PREPARE/v -> IN_PROGRESS/(v+1) -> SWEEP/(v+1) -> REST/(v+1)
/// (see `advance_global_state`). Participating threads claim one of the fixed thread slots via
/// `reserve_thread_slot` and publish the global state they observed as their local state.
pub struct CPRSnapShotMgr {
    // Snapshot state
    // | 3 bits   |     61 bits    |
    // | phase id | version number |
    global_state: AtomicU64,
    // False means the thread slot is vacant while True means occupied
    thread_slots: [AtomicBool; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
    // Thread-level snapshot state: the global state observed when the slot was reserved,
    // or INVALID_SNAPSHOT_STATE while the slot is vacant.
    thread_local_states: [AtomicU64; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
    // Mappings of base/mini/inner nodes to their corresponding snapshot file offsets.
    // Each user thread updates its own mappings asynchronously and they get merged in the end.
    // Indexed by thread slot id: the `snapshot_*` methods only push to the caller's own slot,
    // while `reset` rewrites every slot.
    thread_local_inner_mappings:
        UnsafeCell<[Vec<(*const InnerNode, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
    thread_local_base_mappings: UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
    thread_local_mini_mappings: UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
    // (PageID, mini-page size in bytes); kept in lockstep with `thread_local_mini_mappings`.
    // A size of 0 marks a Null page (cache-only mode).
    thread_local_mini_size_mappings:
        UnsafeCell<[Vec<(PageID, usize)>; DEFAULT_MAX_SNAPSHOT_THREAD_NUM]>,
    root_id: AtomicU64, // Raw page ID of the root node; 0 means "not recorded yet".
    // When set, `reserve_thread_slot` refuses new slots (used by `sweep` to freeze the tree).
    pause_snapshot: AtomicBool,
    // The physical file of snapshot. Wrapped in RwLock so the underlying
    // vfs can be swapped when taking a new snapshot.
    // Arc<Box<dyn VfsImpl>> is another option.
    vfs: RwLock<Arc<dyn VfsImpl>>,
    // Ensuring only one snapshot is in progress at a time.
    snapshot_in_progress: AtomicBool,
}

// SAFETY (review-level rationale, confirm): the only members that are not automatically
// `Sync` are the `UnsafeCell` mapping arrays. The `snapshot_*` methods only touch the entry at
// the caller's own `thread_slot_id`, and a slot id is held by at most one guard at a time
// thanks to the compare-exchange in `reserve_thread_slot`. `reset` rewrites every slot and so
// must never overlap with live guards.
unsafe impl Sync for CPRSnapShotMgr {}

// SAFETY: needed because of the raw `*const InnerNode` values stored in the inner mappings;
// in the visible code they are only stored and used as addresses/keys.
// NOTE(review): confirm no code elsewhere dereferences them on another thread unsafely.
unsafe impl Send for CPRSnapShotMgr {}
125
126/// Within the life time of this guard, bf-tree transactions are protected by CPR logic.
127pub struct CPRSnapshotGuard {
128    snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
129    thread_slot_id: usize,
130    snapshot_version: u64,
131    phase_id: PhaseId,
132}
133
134impl CPRSnapshotGuard {
135    pub fn new(snapshot_mgr: Option<Arc<CPRSnapShotMgr>>) -> Result<Self, ()> {
136        match snapshot_mgr {
137            None => Ok(Self {
138                snapshot_mgr: None,
139                thread_slot_id: INVALID_SNAPSHOT_THREAD_ID,
140                snapshot_version: INVALID_SNAPSHOT_VERSION,
141                phase_id: PhaseId::Rest,
142            }),
143            Some(ref mgr) => {
144                let (thread_slot_id, snapshot_version, phase_id) = mgr.reserve_thread_slot()?;
145
146                Ok(Self {
147                    snapshot_mgr: Some(mgr.clone()),
148                    thread_slot_id,
149                    snapshot_version,
150                    phase_id,
151                })
152            }
153        }
154    }
155
156    /// Returns the snapshot version for this thread's transaction.
157    pub fn snapshot_version(&self) -> u64 {
158        self.snapshot_version
159    }
160
161    /// Returns the phase id at the time the guard was acquired.
162    pub fn get_local_phase_id(&self) -> PhaseId {
163        self.phase_id
164    }
165
166    /// Returns true if the snapshot guard has a valid thread slot id.
167    pub fn is_protected(&self) -> bool {
168        self.thread_slot_id != INVALID_SNAPSHOT_THREAD_ID
169    }
170
171    pub fn snapshot_base_page(&self, id: PageID, ptr: &[u8], size: usize) {
172        self.snapshot_mgr
173            .as_ref()
174            .unwrap()
175            .snapshot_base_page(id, ptr, size, self.thread_slot_id);
176    }
177
178    pub fn snapshot_mini_page(&self, id: PageID, ptr: &[u8], size: usize) {
179        self.snapshot_mgr
180            .as_ref()
181            .unwrap()
182            .snapshot_mini_page(id, ptr, size, self.thread_slot_id);
183    }
184
185    pub fn snapshot_inner_node(&self, ptr: *const InnerNode) {
186        self.snapshot_mgr
187            .as_ref()
188            .unwrap()
189            .snapshot_inner_node(ptr, self.thread_slot_id);
190    }
191
192    pub fn snapshot_root_page(&self, root_id: PageID) {
193        self.snapshot_mgr
194            .as_ref()
195            .unwrap()
196            .snapshot_root_page(root_id);
197    }
198}
199
200impl Drop for CPRSnapshotGuard {
201    fn drop(&mut self) {
202        if let Some(ref mgr) = self.snapshot_mgr {
203            mgr.release_thread_slot(self.thread_slot_id);
204        }
205    }
206}
207
208impl CPRSnapShotMgr {
209    pub fn are_all_threads_in_next_version(&self) -> bool {
210        if !self.snapshot_in_progress.load(Ordering::Acquire) {
211            false
212        } else {
213            let global_phase_id = self.get_global_phase_id();
214            global_phase_id == PhaseId::Sweep || global_phase_id == PhaseId::Rest
215        }
216    }
217
218    /// Initialize a new snapshot instance.
219    pub fn new(version: u64) -> Self {
220        let vfs: Arc<dyn VfsImpl> = make_vfs(&StorageBackend::Memory, ":memory:");
221        Self {
222            global_state: AtomicU64::new(Self::new_snapshot_state(0, version)), // Initial state: (REST, version)
223            thread_slots: [const { AtomicBool::new(false) }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
224            thread_local_states: [const { AtomicU64::new(INVALID_SNAPSHOT_STATE) };
225                DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
226            thread_local_inner_mappings: UnsafeCell::new(
227                [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
228            ),
229            thread_local_base_mappings: UnsafeCell::new(
230                [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
231            ),
232            thread_local_mini_mappings: UnsafeCell::new(
233                [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
234            ),
235            thread_local_mini_size_mappings: UnsafeCell::new(
236                [const { Vec::new() }; DEFAULT_MAX_SNAPSHOT_THREAD_NUM],
237            ),
238            root_id: AtomicU64::new(0),
239            pause_snapshot: AtomicBool::new(false),
240            vfs: RwLock::new(vfs),
241            snapshot_in_progress: AtomicBool::new(false),
242        }
243    }
244
245    /// Reset the snapshot
246    fn reset(&self) {
247        let local_inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
248        let local_mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
249        let local_base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
250        let local_mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
251        // De-duplicate mappings
252        for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
253            local_inner_mappings[thread_slot_id] = Vec::new();
254            local_mini_mappings[thread_slot_id] = Vec::new();
255            local_base_mappings[thread_slot_id] = Vec::new();
256            local_mini_size_mappings[thread_slot_id] = Vec::new();
257        }
258
259        self.root_id.store(0, Ordering::Release);
260    }
261
262    pub fn new_snapshot_state(phase_id: u64, version: u64) -> u64 {
263        assert!(
264            phase_id < SNAPSHOT_STATE_PHASE_NUM,
265            "Phase id must be less than {}",
266            SNAPSHOT_STATE_PHASE_NUM
267        );
268        assert!(
269            version < (1 << SNAPSHOT_STATE_PHASE_ID_SHIFT),
270            "Version must be less than 2^61"
271        );
272        (phase_id << SNAPSHOT_STATE_PHASE_ID_SHIFT) | version
273    }
274
275    fn get_global_version(&self) -> u64 {
276        self.global_state.load(Ordering::Acquire) & SNAPSHOT_STATE_VERSION_MASK
277    }
278
279    fn get_global_phase_id(&self) -> PhaseId {
280        PhaseId::from_raw(
281            (self.global_state.load(Ordering::Acquire) & SNAPSHOT_STATE_PHASE_ID_MASK)
282                >> SNAPSHOT_STATE_PHASE_ID_SHIFT,
283        )
284    }
285
286    /// Retrieve the local state of a thread specified by its slot id.
287    fn get_local_state(&self, thread_slot_id: &usize) -> u64 {
288        self.thread_local_states[*thread_slot_id].load(Ordering::Acquire)
289    }
290
291    fn set_local_state(&self, thread_slot_id: &usize, state: u64) {
292        // Don't dirty the existing state if it's the same.
293        let current_state = self.get_local_state(thread_slot_id);
294        if current_state == state {
295            return;
296        }
297
298        self.thread_local_states[*thread_slot_id].store(state, Ordering::Release);
299    }
300
301    /// Move the global snapshot stable to the next one.
302    fn advance_global_state(&self) -> u64 {
303        let phase_id = self.get_global_phase_id();
304        let version = self.get_global_version();
305
306        match phase_id {
307            PhaseId::Rest => {
308                // (REST, v) -> (PREPARE, v)
309                let new_state = Self::new_snapshot_state(PhaseId::Prepare.as_raw(), version);
310                self.global_state.store(new_state, Ordering::Release);
311                new_state
312            }
313            PhaseId::Prepare => {
314                // (PREPARE, v) -> (IN_PROGRESS, v + 1)
315                let new_state = Self::new_snapshot_state(PhaseId::InProgress.as_raw(), version + 1);
316                self.global_state.store(new_state, Ordering::Release);
317                new_state
318            }
319            PhaseId::InProgress => {
320                // (IN_PROGRESS, v + 1) -> (SWEEPING, v + 1)
321                let new_state = Self::new_snapshot_state(PhaseId::Sweep.as_raw(), version);
322                self.global_state.store(new_state, Ordering::Release);
323                new_state
324            }
325            PhaseId::Sweep => {
326                // (SWEEPING, v + 1) -> (REST, v + 1)
327                let new_state = Self::new_snapshot_state(PhaseId::Rest.as_raw(), version);
328                self.global_state.store(new_state, Ordering::Release);
329                new_state
330            }
331        }
332    }
333
334    /// If all thread local states are either invalid or equal to the target state, return true.
335    /// This can only be invoked after the global state has advanced to the target_state.
336    fn check_if_phase_completed(&self, target_state: u64) -> bool {
337        // Checking all thread local states is sufficient because of the guarantee in `reserve_thread_slot`
338        for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
339            let local_state = self.thread_local_states[thread_slot_id].load(Ordering::Acquire);
340            if local_state != INVALID_SNAPSHOT_STATE && local_state != target_state {
341                return false;
342            }
343        }
344        true
345    }
346
347    /// Obtain an unique thread slot id for the caller thread.
348    /// Guarantee that any local state assigned after the global state advances to the next one,
349    /// will either be reversed without further action or in the new state.
350    pub fn reserve_thread_slot(&self) -> Result<(usize, u64, PhaseId), ()> {
351        if self.pause_snapshot.load(Ordering::Acquire) {
352            return Err(());
353        }
354
355        let start = get_rng().random_range(0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM);
356        let end = 2 * DEFAULT_MAX_SNAPSHOT_THREAD_NUM;
357
358        for i in start..end {
359            let tid = i % DEFAULT_MAX_SNAPSHOT_THREAD_NUM;
360            if self.thread_slots[tid]
361                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
362                .is_ok()
363            {
364                // Set the caller thread's snapshot local state to the global state
365                let global_state = self.global_state.load(Ordering::Acquire);
366                self.set_local_state(&tid, global_state);
367
368                // If the global state has changed or a pause is requested after setting the local state, reset
369                // This is to guarantee that as soon as global state rolls to the next one,
370                // all new local states will either be reversed without further action or in the new state.
371                // Otherwise something bad could happen as described below:
372                // T1: state = global phase 'x'
373                // Mgr: global phase <- 'x + 1'
374                // Mrgr: all threads in phase 'x + 1' or invalid -> Execute 'x + 1' action
375                // T1: local state = state <- Inconsistency with global state
376                // Similar case for the pause_snapshot flag.
377                if self.get_local_state(&tid) != self.global_state.load(Ordering::Acquire)
378                    || self.pause_snapshot.load(Ordering::Acquire)
379                {
380                    self.set_local_state(&tid, INVALID_SNAPSHOT_STATE);
381                    assert!(self.thread_slots[tid]
382                        .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
383                        .is_ok());
384                    return Err(());
385                } else {
386                    let version = global_state & SNAPSHOT_STATE_VERSION_MASK;
387                    let phase_id = PhaseId::from_raw(
388                        (global_state & SNAPSHOT_STATE_PHASE_ID_MASK)
389                            >> SNAPSHOT_STATE_PHASE_ID_SHIFT,
390                    );
391                    return Ok((tid, version, phase_id));
392                }
393            }
394        }
395        Err(())
396    }
397
398    /// Free up the thread slot specified by the given thread slot id.
399    pub fn release_thread_slot(&self, thread_slot_id: usize) {
400        self.set_local_state(&thread_slot_id, INVALID_SNAPSHOT_STATE);
401        self.thread_slots[thread_slot_id].store(false, Ordering::Release);
402    }
403
404    pub fn get_snapshot_guard(
405        snapshot_mgr: Option<Arc<CPRSnapShotMgr>>,
406    ) -> Result<CPRSnapshotGuard, ()> {
407        CPRSnapshotGuard::new(snapshot_mgr)
408    }
409
410    /// Snapshot a page to the current snapshot file and return its offset in the file.
411    /// The invoker needs to guarantee that the page to be copied is xlocked
412    /// throughout the lifetime of this function.
413    /// Also the invoker needs to guarantee proper alignment of ptr which could be required
414    /// by the underlying vfs. (E.g., io_uring_vfs requires 512B alignment)
415    pub fn snapshot_page(&self, ptr: &[u8], size: usize) -> usize {
416        // Allocate space in the snapshot file
417        let vfs = self.vfs.read().unwrap().clone();
418        let offset = vfs.alloc_offset(size);
419
420        // Copy the page (ptr) to the new space
421        vfs.write(offset, ptr);
422
423        // Return the offset
424        offset
425    }
426
427    pub fn snapshot_inner_node(&self, ptr: *const InnerNode, thread_slot_id: usize) {
428        let offset = unsafe { self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE) };
429        let inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
430
431        inner_mappings[thread_slot_id].push((ptr, offset));
432    }
433
434    pub fn snapshot_mini_page(&self, id: PageID, ptr: &[u8], size: usize, thread_slot_id: usize) {
435        let offset = if size != 0 {
436            self.snapshot_page(ptr, size)
437        } else {
438            // cache-only mode, NULL page
439            NULL_PAGE_LOCATION_OFFSET
440        };
441
442        let mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
443        mini_mappings[thread_slot_id].push((id, offset));
444
445        let mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
446        mini_size_mappings[thread_slot_id].push((id, size));
447
448        assert!(mini_mappings[thread_slot_id].len() == mini_size_mappings[thread_slot_id].len());
449    }
450
451    pub fn snapshot_base_page(&self, id: PageID, ptr: &[u8], size: usize, thread_slot_id: usize) {
452        let offset = self.snapshot_page(ptr, size);
453
454        let base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
455        base_mappings[thread_slot_id].push((id, offset));
456    }
457
458    pub fn snapshot_root_page(&self, root_id: PageID) {
459        let cur_root_id = self.root_id.load(Ordering::Acquire);
460        if cur_root_id != 0 {
461            assert_eq!(cur_root_id, root_id.raw());
462        }
463
464        self.root_id.store(root_id.raw(), Ordering::Release);
465    }
466
    /// Sweep through all data pages and take snapshots of those
    /// whose version is less than the passed-in snapshot version.
    ///
    /// Runs in two stages:
    /// 1. Sets `pause_snapshot` and polls (sleeping one second between checks) until every
    ///    thread slot's local state is `INVALID_SNAPSHOT_STATE`. Then it snapshots the root
    ///    and every inner node yielded by `BfsVisitor::new_inner_only` whose clean-snapshot
    ///    version is below `version`, and finally clears `pause_snapshot`.
    /// 2. Walks the storage page table and snapshots each leaf below `version`:
    ///    - `Base` and `Full` pages go to `base_mapping`.
    ///    - `Mini` pages go to `mini_mapping`/`mini_size_mapping`, plus their base page in
    ///      disk mode.
    ///    - `Null` pages (cache-only mode) get `NULL_PAGE_LOCATION_OFFSET` with size 0.
    ///
    /// The mapping vectors are only appended to and may end up with duplicate entries
    /// (e.g. a leaf root can be recorded in both stages); `finalize` de-duplicates them.
    ///
    /// Returns the number of page-table entries visited in stage 2.
    ///
    /// # Panics
    /// Panics if:
    /// - the root leaf is in a location other than `Base`/`Mini`;
    /// - a `Mini` root or a `Null` page is seen outside cache-only mode;
    /// - a page-table entry is not an id;
    /// - a mini-page's base page is not below `version`.
    fn sweep(
        &self,
        tree: &BfTree,
        version: u64,
        inner_mapping: &mut Vec<(*const InnerNode, usize)>,
        mini_mapping: &mut Vec<(PageID, usize)>,
        mini_size_mapping: &mut Vec<(PageID, usize)>,
        base_mapping: &mut Vec<(PageID, usize)>,
    ) -> usize {
        // There is no page table for inner nodes including the root, and each inner node's information is only
        // saved in the tree structure itself. As a result, we need to traverse the tree to find all inner nodes.
        // However, without blocking inner node splitting, the tree structure could change concurrently while we
        // are BFS/DFSing the tree, making it difficult to enumerate all inner nodes to build inner node mappings.
        // As such we freeze the tree structure temporarily using a 3-phase approach.
        // Phase 1: Block all snapshot id reservation, drain the thread table.
        // Phase 2: Traverse the tree and take snapshots of inner nodes whose version is < 'version'.
        // Phase 3: Unblock snapshot id reservation.
        // Given that there are usually a very limited number of inner nodes, user workload should be affected minimally.
        // TODO: A complete block-free approach to scan all inner nodes.
        self.pause_snapshot.store(true, Ordering::Release);
        loop {
            if self.check_if_phase_completed(INVALID_SNAPSHOT_STATE) {
                // Upon reaching here, no user threads can obtain a snapshot id nor make changes to the tree structure.
                // We sweep the root node first
                loop {
                    let root_id = tree.get_root_page();
                    let rid = root_id.0;
                    // `root_id.1` is true when the root is a leaf.
                    if root_id.1 {
                        // Leaf
                        let mut leaf = tree.mapping_table().get(&rid);
                        let page_loc = leaf.get_page_location();

                        match page_loc {
                            PageLocation::Base(offset) => {
                                let base_ref = leaf.load_base_page(*offset);
                                if base_ref.get_clean_snapshot_version() < version {
                                    let base_ptr = unsafe {
                                        std::slice::from_raw_parts(
                                            base_ref as *const LeafNode as *const u8,
                                            base_ref.meta.node_size as usize,
                                        )
                                    };
                                    let offset = self
                                        .snapshot_page(base_ptr, base_ref.meta.node_size as usize);
                                    base_mapping.push((rid, offset));
                                    self.snapshot_root_page(rid);
                                }
                            }
                            PageLocation::Mini(ptr) => {
                                // Root page is a mini page only in cache-only mode.
                                assert!(tree.cache_only);

                                let mini_ref = leaf.load_cache_page(*ptr);
                                if mini_ref.get_clean_snapshot_version() < version {
                                    let mini_ptr = unsafe {
                                        std::slice::from_raw_parts(
                                            mini_ref as *const LeafNode as *const u8,
                                            mini_ref.meta.node_size as usize,
                                        )
                                    };
                                    let offset = self
                                        .snapshot_page(mini_ptr, mini_ref.meta.node_size as usize);
                                    mini_mapping.push((rid, offset));
                                    mini_size_mapping.push((rid, mini_ref.meta.node_size as usize));
                                    self.snapshot_root_page(rid);
                                }
                            }
                            // NOTE(review): a root leaf in `Full` or `Null` location panics here —
                            // confirm those locations cannot occur for the root.
                            _ => {
                                panic!("Unexpected page location for root page: {:?}", page_loc);
                            }
                        }

                        break;
                    } else {
                        // Inner
                        let ptr = rid.as_inner_node();
                        // No need for WriteGuard as the tree structure is frozen and there are no active writers.
                        // If the read lock cannot be taken, re-read the root and retry (busy-spin).
                        let inner = match ReadGuard::try_read(ptr) {
                            Ok(inner) => inner,
                            Err(_) => continue,
                        };

                        if inner.as_ref().get_clean_snapshot_version() < version {
                            let offset =
                                unsafe { self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE) };
                            inner_mapping.push((ptr, offset));
                            self.snapshot_root_page(rid);
                        }
                        break;
                    }
                }

                // Inner nodes
                let visitor = BfsVisitor::new_inner_only(tree);
                for node in visitor {
                    loop {
                        match node {
                            NodeInfo::Inner { ptr, .. } => {
                                // No need for WriteGuard as the tree structure is frozen and there are no active writers.
                                // Retry the same node until its read lock is obtained (busy-spin).
                                let inner = match ReadGuard::try_read(ptr) {
                                    Ok(inner) => inner,
                                    Err(_) => continue,
                                };

                                if inner.as_ref().get_clean_snapshot_version() < version {
                                    let offset = unsafe {
                                        self.snapshot_page((&*ptr).as_slice(), INNER_NODE_SIZE)
                                    };
                                    inner_mapping.push((ptr, offset));
                                }

                                break;
                            }
                            NodeInfo::Leaf { level, .. } => {
                                // This should have been captured by the case when root node is a leaf
                                assert_eq!(level, 0);
                                break;
                            }
                        }
                    }
                }

                break;
            }
            // At most wasting 1 second per state transition.
            thread::sleep(std::time::Duration::from_secs(1));

            #[cfg(all(feature = "shuttle", test))]
            shuttle::thread::yield_now();
        }

        // Resume workload
        self.pause_snapshot.store(false, Ordering::Release);

        // In SWEEPING phase, there will be no new disk pages with v < 'version' being created. As such,
        // a sequential sweep of the page table is sufficient to capture all data pages with v < 'version'.
        let page_table_iter = tree.storage.page_table.iter();
        let mut enumerate_leaf_count = 0;

        for (_, pid) in page_table_iter {
            assert!(pid.is_id());

            // A reader lock is enough
            let mut leaf = tree.mapping_table().get(&pid);
            let page_loc = leaf.get_page_location().clone();
            enumerate_leaf_count += 1;

            match page_loc {
                PageLocation::Base(offset) => {
                    let base_ref = leaf.load_base_page(offset);
                    if base_ref.get_clean_snapshot_version() < version {
                        let base_ptr = unsafe {
                            std::slice::from_raw_parts(
                                base_ref as *const LeafNode as *const u8,
                                base_ref.meta.node_size as usize,
                            )
                        };
                        let new_offset =
                            self.snapshot_page(base_ptr, base_ref.meta.node_size as usize);
                        base_mapping.push((pid, new_offset));
                    }
                }
                PageLocation::Full(ptr) => {
                    // We snapshot Full page as a disk page to reduce some complexity as they are equivalent.
                    let full_ref = leaf.load_cache_page(ptr);
                    if full_ref.get_clean_snapshot_version() < version {
                        // Temporarily change the next level to null for snapshotting,
                        // and restore it afterwards.
                        // NOTE(review): `full_page` is a `&mut` that aliases `full_ref` and is
                        // written while only what the comment above calls a reader lock is held.
                        // If other readers can access this page concurrently, this is a data race.
                        // Confirm the guard returned by `mapping_table().get` gives exclusive access.
                        let next_level = full_ref.next_level;
                        let full_page = unsafe { &mut *ptr };
                        full_page.next_level = MiniPageNextLevel::new_null();
                        let full_ptr = unsafe {
                            std::slice::from_raw_parts(
                                full_ref as *const LeafNode as *const u8,
                                full_ref.meta.node_size as usize,
                            )
                        };
                        let offset = self.snapshot_page(full_ptr, full_ref.meta.node_size as usize);
                        full_page.next_level = next_level;
                        base_mapping.push((pid, offset));
                    }
                }
                PageLocation::Mini(ptr) => {
                    let mini_ref = leaf.load_cache_page(ptr);
                    if mini_ref.get_clean_snapshot_version() < version {
                        let mini_ptr = unsafe {
                            std::slice::from_raw_parts(
                                mini_ref as *const LeafNode as *const u8,
                                mini_ref.meta.node_size as usize,
                            )
                        };
                        let offset = self.snapshot_page(mini_ptr, mini_ref.meta.node_size as usize);
                        mini_mapping.push((pid, offset));
                        mini_size_mapping.push((pid, mini_ref.meta.node_size as usize));

                        if !tree.cache_only {
                            // In disk-mode, the base page of mini-page is part of the snapshot as well.
                            let base_ref = leaf.load_base_page(mini_ref.next_level.as_offset());
                            assert!(base_ref.get_clean_snapshot_version() < version); // disk page's version should never be greater than its mini-page's.

                            let base_ptr = unsafe {
                                std::slice::from_raw_parts(
                                    base_ref as *const LeafNode as *const u8,
                                    base_ref.meta.node_size as usize,
                                )
                            };
                            let offset =
                                self.snapshot_page(base_ptr, base_ref.meta.node_size as usize);
                            base_mapping.push((pid, offset));
                        }
                    }
                }
                PageLocation::Null => {
                    assert!(tree.cache_only);
                    // In cache-only mode, an entry in page table could be Null when the corresponding mini-page is evicted.
                    // The Null page is also snapshotted with a special marker.
                    // Note that the underlying assumption here is that a Null page is always of an older version, which may not be true.
                    // To reconcile with the CPR semantics, we say that any data page in cache-only mode could be either Null or a valid page.
                    // TODO: version the Null pages.
                    mini_mapping.push((pid, NULL_PAGE_LOCATION_OFFSET)); // Special marker
                    mini_size_mapping.push((pid, 0));
                }
            }
        }

        enumerate_leaf_count
    }
696
697    /// Finalize the snapshot file and reset the snapshotmgr's internal data.
698    #[allow(clippy::too_many_arguments)]
699    fn finalize(
700        &self,
701        snapshot_version: u64,
702        inner_mapping: &mut [(*const InnerNode, usize)],
703        mini_mapping: &mut [(PageID, usize)],
704        mini_size_mapping: &mut [(PageID, usize)],
705        base_mapping: &mut [(PageID, usize)],
706        leaf_count_upper: usize,
707        config: Arc<Config>,
708    ) {
709        // There could be duplicate leaf/inner node mappings and we use a hash map to de-duplicate them.
710        let mut inner_mapping_unique: HashMap<*const InnerNode, usize> = HashMap::new();
711        let mut mini_mapping_unique: HashMap<PageID, usize> = HashMap::new();
712        let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
713        let mut base_mapping_unique: HashMap<PageID, usize> = HashMap::new();
714
715        let local_inner_mappings = unsafe { &mut *self.thread_local_inner_mappings.get() };
716        let local_mini_mappings = unsafe { &mut *self.thread_local_mini_mappings.get() };
717        let local_mini_size_mappings = unsafe { &mut *self.thread_local_mini_size_mappings.get() };
718        let local_base_mappings = unsafe { &mut *self.thread_local_base_mappings.get() };
719
720        for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
721            let entry_num = local_mini_mappings[thread_slot_id].len();
722            assert!(
723                local_mini_mappings[thread_slot_id].len()
724                    == local_mini_size_mappings[thread_slot_id].len()
725            );
726            for i in 0..entry_num {
727                assert!(
728                    local_mini_mappings[thread_slot_id][i].0
729                        == local_mini_size_mappings[thread_slot_id][i].0
730                );
731
732                if local_mini_mappings[thread_slot_id][i].1 == NULL_PAGE_LOCATION_OFFSET {
733                    assert_eq!(local_mini_size_mappings[thread_slot_id][i].1, 0);
734                } else {
735                    assert!(local_mini_size_mappings[thread_slot_id][i].1 > 0);
736                }
737
738                if let std::collections::hash_map::Entry::Vacant(e) =
739                    mini_mapping_unique.entry(local_mini_mappings[thread_slot_id][i].0)
740                {
741                    e.insert(local_mini_mappings[thread_slot_id][i].1);
742                    mini_size_mapping_unique.insert(
743                        local_mini_size_mappings[thread_slot_id][i].0,
744                        local_mini_size_mappings[thread_slot_id][i].1,
745                    );
746                    assert!(mini_mapping_unique.len() == mini_size_mapping_unique.len());
747                }
748            }
749            assert_eq!(local_mini_mappings[thread_slot_id].len(), entry_num);
750            assert_eq!(local_mini_size_mappings[thread_slot_id].len(), entry_num);
751        }
752
753        // De-duplicate mappings among snapshot threads
754        for thread_slot_id in 0..DEFAULT_MAX_SNAPSHOT_THREAD_NUM {
755            for m in local_inner_mappings[thread_slot_id].iter() {
756                inner_mapping_unique.entry(m.0).or_insert(m.1);
757            }
758
759            for m in local_base_mappings[thread_slot_id].iter() {
760                base_mapping_unique.entry(m.0).or_insert(m.1);
761            }
762        }
763
764        // Sanity checks
765        assert!(mini_mapping_unique.len() == mini_size_mapping_unique.len());
766        for (k, v) in mini_mapping_unique.iter() {
767            assert!(mini_size_mapping_unique.contains_key(k));
768            if *v == NULL_PAGE_LOCATION_OFFSET {
769                assert_eq!(mini_size_mapping_unique.get(k).copied().unwrap(), 0);
770            } else {
771                assert!(mini_size_mapping_unique.get(k).copied().unwrap() > 0);
772            }
773        }
774
775        // De-duplicate with the sweep mappings
776        for (k, v) in inner_mapping.iter() {
777            if !inner_mapping_unique.contains_key(k) {
778                inner_mapping_unique.insert(*k, *v);
779            }
780        }
781        for (k, v) in mini_mapping.iter() {
782            if !mini_mapping_unique.contains_key(k) {
783                mini_mapping_unique.insert(*k, *v);
784            }
785        }
786        for (k, v) in mini_size_mapping.iter() {
787            if !mini_size_mapping_unique.contains_key(k) {
788                mini_size_mapping_unique.insert(*k, *v);
789            } else {
790                if !config.cache_only {
791                    assert!(*v == mini_size_mapping_unique.get(k).copied().unwrap());
792                }
793            }
794        }
795        for (k, v) in base_mapping.iter() {
796            if !base_mapping_unique.contains_key(k) {
797                base_mapping_unique.insert(*k, *v);
798            }
799        }
800
801        // Sanity checks
802        assert!(mini_mapping_unique.len() == mini_size_mapping_unique.len());
803        for (k, v) in mini_mapping_unique.iter() {
804            assert!(mini_size_mapping_unique.contains_key(k));
805            if *v == NULL_PAGE_LOCATION_OFFSET {
806                assert_eq!(mini_size_mapping_unique.get(k).copied().unwrap(), 0);
807            } else {
808                assert!(mini_size_mapping_unique.get(k).copied().unwrap() > 0);
809            }
810        }
811
812        if config.cache_only {
813            assert!(base_mapping_unique.is_empty());
814        } else {
815            assert!(mini_mapping_unique.len() <= base_mapping_unique.len());
816        }
817
818        // Finalize the inner node mappings of the snapshot
819        let mut final_inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
820        for (k, v) in inner_mapping_unique.into_iter() {
821            final_inner_mapping.push((k, v));
822        }
823
824        // Finalize the base leaf node mappings of the snapshot, disk-mode only
825        // Sort the base leaf node mappings by PageID in ascending order
826        // which is required for page table initialization.
827        let mut sorted_base_mapping_uninit: Vec<MaybeUninit<(PageID, usize)>> =
828            Vec::with_capacity(base_mapping_unique.len());
829        unsafe {
830            sorted_base_mapping_uninit.set_len(base_mapping_unique.len());
831        }
832        let mut sorted_base_mapping_init = vec![false; base_mapping_unique.len()];
833
834        for (k, v) in base_mapping_unique.iter() {
835            assert!(k.is_id());
836            let offset = k.as_id();
837            assert!((offset as usize) < sorted_base_mapping_uninit.len());
838            sorted_base_mapping_init[offset as usize] = true;
839            sorted_base_mapping_uninit[offset as usize].write((*k, *v));
840        }
841        let final_sorted_base_mapping: Vec<(PageID, usize)> = if !config.cache_only {
842            assert_eq!(
843                base_mapping_unique.len(),
844                sorted_base_mapping_init.iter().filter(|&&b| b).count()
845            );
846            unsafe {
847                std::mem::transmute::<
848                    std::vec::Vec<std::mem::MaybeUninit<(PageID, usize)>>,
849                    std::vec::Vec<(PageID, usize)>,
850                >(sorted_base_mapping_uninit)
851            }
852        } else {
853            Vec::new()
854        };
855
856        // Finalize mini-page leaf node mappings of the snapshot
857        let mut final_mini_mapping: Vec<(PageID, usize)> = Vec::new();
858        for (k, v) in mini_mapping_unique.into_iter() {
859            final_mini_mapping.push((k, v));
860        }
861
862        let mut final_mini_size_mapping: Vec<(PageID, usize)> = Vec::new();
863        for (k, v) in mini_size_mapping_unique.into_iter() {
864            final_mini_size_mapping.push((k, v));
865        }
866
867        let leaf_page_num = if config.cache_only {
868            // For a pagelocation that's NULL (evicted), we don't know its version
869            // As such, we assume they are of the snapshot version and should be included.
870            // A few slots in page table mnight be wasted due to false positive.
871            // TODO: Be precise which requires some more metadata. Ideas:
872            // 1) Count number of child leaf nodes in snapshotted inner nodes
873            // 2) Put version in Null PageLocation.
874            // 3) Compact the snapshot file
875            leaf_count_upper
876        } else {
877            assert!(leaf_count_upper >= final_sorted_base_mapping.len());
878            final_sorted_base_mapping.len()
879        };
880
881        let mut file_size = std::mem::size_of::<BfTreeMeta>() as u64;
882
883        // Flush various mappings into the snapshot file
884        let (inner_offset, inner_size) =
885            serialize_vec_to_disk(&final_inner_mapping, &self.vfs.read().unwrap());
886
887        if inner_offset != 0 {
888            file_size = (inner_offset + align_to_sector_size(inner_size)) as u64;
889        }
890
891        let (mini_offset, mini_size) =
892            serialize_vec_to_disk(&final_mini_mapping, &self.vfs.read().unwrap());
893
894        if mini_offset != 0 {
895            file_size = (mini_offset + align_to_sector_size(mini_size)) as u64;
896        }
897
898        let (mini_size_offset, mini_size_size) =
899            serialize_vec_to_disk(&final_mini_size_mapping, &self.vfs.read().unwrap());
900
901        if mini_size_offset != 0 {
902            file_size = (mini_size_offset + align_to_sector_size(mini_size_size)) as u64;
903        }
904
905        let (base_offset, base_size) =
906            serialize_vec_to_disk(&final_sorted_base_mapping, &self.vfs.read().unwrap());
907
908        if base_offset != 0 {
909            file_size = (base_offset + align_to_sector_size(base_size)) as u64;
910        }
911
912        // Write the header to the first disk page of the snapshot file
913        let metadata = BfTreeMeta {
914            magic_begin: *BF_TREE_MAGIC_BEGIN,
915            root_id: unsafe { PageID::from_raw(self.root_id.load(Ordering::Acquire)) },
916            inner_offset,
917            inner_size,
918            mini_offset,
919            mini_size,
920            mini_size_offset,
921            mini_size_size,
922            base_offset,
923            base_size,
924            file_size,
925            leaf_page_num,
926            snapshot_version,
927            cache_only: config.cache_only,
928            cb_size_byte: config.cb_size_byte,
929            read_promotion_rate: config.read_promotion_rate.load(Ordering::Relaxed),
930            scan_promotion_rate: config.scan_promotion_rate.load(Ordering::Relaxed),
931            cb_min_record_size: config.cb_min_record_size,
932            cb_max_record_size: config.cb_max_record_size,
933            leaf_page_size: config.leaf_page_size,
934            cb_max_key_len: config.cb_max_key_len,
935            max_fence_len: config.max_fence_len,
936            cb_copy_on_access_ratio: config.cb_copy_on_access_ratio,
937            read_record_cache: config.read_record_cache,
938            max_mini_page_size: config.max_mini_page_size,
939            mini_page_binary_search: config.mini_page_binary_search,
940            write_load_full_page: config.write_load_full_page,
941            magic_end: *BF_TREE_MAGIC_END,
942        };
943
944        let vfs = self.vfs.read().unwrap();
945        vfs.write(META_DATA_PAGE_OFFSET, metadata.as_slice());
946        vfs.flush();
947
948        self.reset();
949    }
950
951    /// Take a CPR snapshot of a Bf-Tree
952    pub fn snapshot(&self, tree: &BfTree, snapshot_file_path: impl AsRef<Path>) {
953        // Allowing only one active snapshot at a time.
954        if self
955            .snapshot_in_progress
956            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
957            .is_err()
958        {
959            println!("Another snapshot is in progress, skipping this snapshot request.");
960            return;
961        }
962
963        // Create a vfs for the snapshot
964        let mut vfs_guard = self.vfs.write().unwrap();
965        let snapshot_vfs = make_vfs(&tree.config.snapshot_backend, snapshot_file_path);
966        let old_vfs = std::mem::replace(&mut *vfs_guard, snapshot_vfs);
967        drop(old_vfs);
968
969        // Reset the snapshot vfs before use
970        vfs_guard.reset();
971
972        // Drop the guard
973        drop(vfs_guard);
974
975        // Initialize a inner node and leaf node mapping for the sweep
976        let mut sweep_inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
977        let mut sweep_mini_mapping: Vec<(PageID, usize)> = Vec::new();
978        let mut sweep_mini_size_mapping: Vec<(PageID, usize)> = Vec::new();
979        let mut sweep_base_mapping: Vec<(PageID, usize)> = Vec::new();
980
981        // At the beginning, the global phase id must be 0 (REST).
982        let mut current_global_phase_id = self.get_global_phase_id();
983        assert_eq!(current_global_phase_id, PhaseId::Rest);
984
985        // Version of the ongoing snapshot is set
986        let snapshot_version = self.get_global_version();
987
988        // Immediately move the global state to (1 (PREPARE), snapshot_version)
989        let mut current_global_state = self.advance_global_state();
990        current_global_phase_id = self.get_global_phase_id();
991        assert_eq!(current_global_phase_id, PhaseId::Prepare);
992        assert_eq!(snapshot_version, self.get_global_version());
993
994        let mut leaf_node_count_upper_bound = 0; // Indicate the total number of leaf nodes in the captured snapshot.
995
996        loop {
997            if self.check_if_phase_completed(current_global_state) {
998                match current_global_phase_id {
999                    PhaseId::Rest => {
1000                        // Upon reaching here, all user threads are in (REST, snapshot_version + 1).
1001                        // All snapshots of pages of snapshot_version are done, and no more snapshot operations
1002                        // neither. As such, we can safely finalize the snapshot by writing out the
1003                        // metadata and page mappings.
1004                        self.finalize(
1005                            snapshot_version,
1006                            &mut sweep_inner_mapping,
1007                            &mut sweep_mini_mapping,
1008                            &mut sweep_mini_size_mapping,
1009                            &mut sweep_base_mapping,
1010                            leaf_node_count_upper_bound,
1011                            tree.config.clone(),
1012                        );
1013
1014                        // Close the snapshot vfs
1015                        let mut vfs_guard = self.vfs.write().unwrap();
1016                        let snapshot_vfs = make_vfs(&StorageBackend::Memory, ":memory:");
1017                        let old_vfs = std::mem::replace(&mut *vfs_guard, snapshot_vfs);
1018                        drop(old_vfs);
1019                        drop(vfs_guard);
1020
1021                        // The snapshot is done.
1022                        break;
1023                    }
1024                    PhaseId::Prepare => {
1025                        current_global_state = self.advance_global_state();
1026                        current_global_phase_id = self.get_global_phase_id();
1027                        assert_eq!(current_global_phase_id, PhaseId::InProgress);
1028                        assert_eq!(snapshot_version + 1, self.get_global_version());
1029                    }
1030                    PhaseId::InProgress => {
1031                        current_global_state = self.advance_global_state();
1032                        current_global_phase_id = self.get_global_phase_id();
1033                        assert_eq!(current_global_phase_id, PhaseId::Sweep);
1034                        assert_eq!(snapshot_version + 1, self.get_global_version());
1035                    }
1036                    PhaseId::Sweep => {
1037                        // Upon reaching here, there are no user threads with snapshot_version anymore.
1038                        // Sweep through and snapshot all data pages whose version is less than (snapshot_version + 1)
1039                        // and build inner node and leaf node mapping for those pages.
1040                        // Upon completion, all pages with version less than (snapshot_version + 1) should have been captured in the snapshot.
1041                        // Note that, there could be duplicate snapshots during and after the sweep as user threads in 'SWEEPING'
1042                        // state keeps taking snapshots of pages even if they have been captured by the sweep as the sweep does not
1043                        // alter page versions. De-duplication is needed when finalizing the snapshot file.
1044                        leaf_node_count_upper_bound = self.sweep(
1045                            tree,
1046                            snapshot_version + 1,
1047                            &mut sweep_inner_mapping,
1048                            &mut sweep_mini_mapping,
1049                            &mut sweep_mini_size_mapping,
1050                            &mut sweep_base_mapping,
1051                        );
1052
1053                        current_global_state = self.advance_global_state();
1054                        current_global_phase_id = self.get_global_phase_id();
1055                        assert_eq!(current_global_phase_id, PhaseId::Rest);
1056                        assert_eq!(snapshot_version + 1, self.get_global_version());
1057                    }
1058                }
1059            }
1060
1061            // At most wasting 1 second per state transition.
1062            thread::sleep(std::time::Duration::from_secs(1));
1063
1064            #[cfg(all(feature = "shuttle", test))]
1065            shuttle::thread::yield_now();
1066        }
1067
1068        assert!(self
1069            .snapshot_in_progress
1070            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Relaxed)
1071            .is_ok());
1072    }
1073
1074    /// Recover a Bf-tree from an existing snapshot file at recovery_snapshot_file_path.
1075    ///
1076    /// Most configuration params are directly retrieved from the snapshot file
1077    /// with the following ones for the caller to specify:
1078    /// 1. use_snapshot: whether the new tree will also support snapshot.
1079    /// 2. new_snapshot_file_path: If use_snapshot, then provide the path of the snapshot file of the newly recovered Bf-tree.
1080    ///    Note that, this path must be different from the recovery_snapshot_file_path.
1081    /// 3. buffer_ptr: optional pointer to a pre-allocated buffer for the newly recovered Bf-tree
1082    /// 4. buffer_size optional override of the buffer size retrieved from the snapshot file.
1083    ///    Note that, if the newly specified value is smaller than the one retrieved from the snapshot file,
1084    ///    then failure to restore a tree might happen. This is because during recovery, the memory cache pages
1085    ///    of the tree at the moment of snapshot are reinstated into memory too.
1086    /// 5. wal_config: optional write-ahead log configuration for the newly recovered Bf-tree
1087    pub fn new_from_snapshot(
1088        recovery_snapshot_file_path: impl AsRef<Path>, //  The snapshot file to recover from
1089        use_snapshot: bool,
1090        buffer_ptr: Option<*mut u8>,
1091        buffer_size: Option<usize>, // buffer size of the newly created Bf-tree
1092        wal_config: Option<Arc<WalConfig>>,
1093    ) -> Result<BfTree, ConfigError> {
1094        // Check the recovery file is valid
1095        if !recovery_snapshot_file_path.as_ref().exists() {
1096            // if not already exist, we just create a new empty file at the location.
1097            return Err(ConfigError::SnapshotFileInvalid(
1098                "Not found ".to_string() + recovery_snapshot_file_path.as_ref().to_str().unwrap(),
1099            ));
1100        }
1101
1102        // Create WAL, if specified
1103        let wal = wal_config.as_ref().map(|s| WriteAheadLog::new(s.clone()));
1104
1105        // Retrieve the header of the snapshot file and construct a valid config for the to-be-recovered Bf-tree
1106        let reader = std::fs::File::open(recovery_snapshot_file_path.as_ref()).unwrap();
1107        let mut metadata = SectorAlignedVector::new_zeroed(DISK_PAGE_SIZE); // Metadata is at most one disk page in size
1108        #[cfg(unix)]
1109        {
1110            reader.read_at(&mut metadata, 0).unwrap();
1111        }
1112        #[cfg(windows)]
1113        {
1114            reader.seek_read(&mut metadata, 0).unwrap();
1115        }
1116
1117        let bf_meta = unsafe { (metadata.as_ptr() as *const BfTreeMeta).read() };
1118        bf_meta.check_magic();
1119        assert_eq!(reader.metadata().unwrap().len(), bf_meta.file_size);
1120
1121        let mut bf_tree_config = Config::new_from_snapshot(&bf_meta);
1122
1123        let recovery_snapshot_file_backend = StorageBackend::Std; // TODO, recover storage backend from snapshot file
1124        if !bf_tree_config.cache_only {
1125            bf_tree_config.file_path(recovery_snapshot_file_path.as_ref());
1126            // The storage backend should use the same vfs system as the recovery snapshot file
1127            bf_tree_config.storage_backend = recovery_snapshot_file_backend.clone();
1128        } else {
1129            bf_tree_config.storage_backend = StorageBackend::Memory;
1130        }
1131        bf_tree_config.use_snapshot = use_snapshot;
1132
1133        bf_tree_config.snapshot_backend = StorageBackend::Std; // TODO, allow user chosen snapshot backend
1134
1135        let snapshot_mgr = if bf_tree_config.use_snapshot {
1136            Some(Arc::new(CPRSnapShotMgr::new(
1137                bf_tree_config.snapshot_version,
1138            )))
1139        } else {
1140            None
1141        };
1142
1143        if let Some(size) = buffer_size {
1144            bf_tree_config.cb_size_byte = size
1145        }
1146
1147        let size_classes = BfTree::create_mem_page_size_classes(
1148            bf_tree_config.cb_min_record_size,
1149            bf_tree_config.cb_max_record_size,
1150            bf_tree_config.leaf_page_size,
1151            bf_tree_config.max_fence_len,
1152            bf_tree_config.cache_only,
1153        );
1154
1155        bf_tree_config.write_ahead_log = wal_config.clone();
1156        bf_tree_config.validate()?;
1157
1158        let config = Arc::new(bf_tree_config);
1159
1160        // Start Bf-Tree re-construction using recovery_snapshot_file_path and the config
1161        let recovery_snapshot_vfs = make_vfs(
1162            &recovery_snapshot_file_backend,
1163            recovery_snapshot_file_path.as_ref(),
1164        );
1165
1166        // Step 1: reconstruct inner nodes.
1167        let mut root_page_id = bf_meta.root_id;
1168        let mut inner_node_page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);
1169        if root_page_id.is_inner_node_pointer() {
1170            let inner_mapping: Vec<(*const InnerNode, usize)> = read_vec_from_offset(
1171                bf_meta.inner_offset,
1172                bf_meta.inner_size,
1173                &recovery_snapshot_vfs,
1174            );
1175
1176            // Sanity check on root nodes
1177            let mut root_cnt = 0;
1178            for (_ptr, offset) in &inner_mapping {
1179                recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
1180                let inner_node = InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
1181                if unsafe { (*inner_node).is_root() } {
1182                    root_cnt += 1;
1183                }
1184                InnerNode::free_node(inner_node);
1185            }
1186            assert_eq!(root_cnt, 1, "Root count in inner mapping: {}", root_cnt);
1187
1188            let mut inner_map = HashMap::new();
1189
1190            for m in inner_mapping {
1191                inner_map.insert(m.0, m.1);
1192            }
1193            let offset = inner_map.get(&root_page_id.as_inner_node()).unwrap();
1194            recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
1195            let root_page = InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
1196
1197            // No need for disk offset of a inner node.
1198            unsafe {
1199                (*root_page).set_disk_offset(INVALID_DISK_OFFSET as u64);
1200            }
1201            // Mark the root node
1202            unsafe {
1203                (*root_page).set_root(true);
1204            }
1205            root_page_id = PageID::from_pointer(root_page);
1206
1207            let mut inner_resolve_queue = VecDeque::from([root_page]);
1208            while !inner_resolve_queue.is_empty() {
1209                let inner_ptr = inner_resolve_queue.pop_front().unwrap();
1210                let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
1211                if inner.as_ref().meta.children_is_leaf() {
1212                    continue;
1213                }
1214                for (idx, c) in inner.as_ref().get_child_iter().enumerate() {
1215                    let offset = inner_map.get(&c.as_inner_node()).unwrap();
1216                    recovery_snapshot_vfs.read(*offset, &mut inner_node_page_buffer);
1217                    let inner_page =
1218                        InnerNodeBuilder::new().build_from_slice(&inner_node_page_buffer);
1219                    unsafe {
1220                        (*inner_page).set_disk_offset(INVALID_DISK_OFFSET as u64);
1221                    }
1222                    let inner_id = PageID::from_pointer(inner_page);
1223                    inner.as_mut().update_at_pos(idx, inner_id);
1224                    inner_resolve_queue.push_back(inner_page);
1225                }
1226            }
1227        }
1228
1229        let raw_root_id = if root_page_id.is_id() {
1230            root_page_id.raw() | BfTree::ROOT_IS_LEAF_MASK
1231        } else {
1232            root_page_id.raw()
1233        };
1234
1235        // Step 2. Reconstruct the page table and leaf pages.
1236        // Here we differentiate cache-only from disk-backed Bf-trees as their recovery process differs.
1237        if !bf_meta.cache_only {
1238            // For disk backed bf-tree, we reconstruct the page table using base pages.
1239            let base_mapping: Vec<(PageID, usize)> = if bf_meta.base_size > 0 {
1240                read_vec_from_offset(
1241                    bf_meta.base_offset,
1242                    bf_meta.base_size,
1243                    &recovery_snapshot_vfs,
1244                )
1245            } else {
1246                Vec::new()
1247            };
1248
1249            let base_page_loc_mapping = base_mapping.into_iter().map(|(pid, offset)| {
1250                let loc = PageLocation::Base(offset);
1251                (pid, loc)
1252            });
1253
1254            // The file system of the newly constructed Bf-tree is the recovery_snapshot_file
1255            let pt = PageTable::new_from_mapping(
1256                base_page_loc_mapping,
1257                recovery_snapshot_vfs.clone(),
1258                config.clone(),
1259                snapshot_mgr.clone(),
1260            );
1261
1262            let circular_buffer = CircularBuffer::new(
1263                config.cb_size_byte,
1264                config.cb_copy_on_access_ratio,
1265                config.cb_min_record_size,
1266                config.cb_max_record_size,
1267                config.leaf_page_size,
1268                config.max_fence_len,
1269                buffer_ptr,
1270                config.cache_only,
1271            );
1272
1273            // Next, we restore the mini-pages and wire them to the corresponding base pages and update the page table.
1274            let mini_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size > 0 {
1275                read_vec_from_offset(
1276                    bf_meta.mini_offset,
1277                    bf_meta.mini_size,
1278                    &recovery_snapshot_vfs,
1279                )
1280            } else {
1281                Vec::new()
1282            };
1283
1284            let mini_size_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size_size > 0 {
1285                read_vec_from_offset(
1286                    bf_meta.mini_size_offset,
1287                    bf_meta.mini_size_size,
1288                    &recovery_snapshot_vfs,
1289                )
1290            } else {
1291                Vec::new()
1292            };
1293
1294            let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
1295            for (pid, size) in mini_size_mapping {
1296                mini_size_mapping_unique.insert(pid, size);
1297            }
1298
1299            // Create the storage system first before allocating mini-pages.
1300            let storage =
1301                LeafStorage::new_inner(config.clone(), pt, circular_buffer, recovery_snapshot_vfs);
1302
1303            for (pid, offset) in &mini_mapping {
1304                let mini_size = *mini_size_mapping_unique.get(pid).unwrap();
1305
1306                // Allocate space in memory for new mini-page
1307                let mini_page_guard = match storage.alloc_mini_page(mini_size) {
1308                    Ok(mini_page_ptr) => mini_page_ptr,
1309                    Err(_) => {
1310                        return Err(ConfigError::CircularBufferSize("buffer size set too small. Consider increasing it or not specifying at all".to_string()));
1311                    }
1312                };
1313
1314                // Copy mini-page from snapshot file to the newly allocated mini-page
1315                let mut page_buffer = SectorAlignedVector::new_zeroed(mini_size);
1316                storage.vfs.read(*offset, &mut page_buffer);
1317                unsafe {
1318                    std::ptr::copy_nonoverlapping(
1319                        page_buffer.as_ptr(),
1320                        mini_page_guard.as_ptr(),
1321                        mini_size,
1322                    );
1323                }
1324
1325                // Connect the new mini-page to the corresponding base page in page table
1326                let new_mini_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
1327                let mini_page = unsafe { &mut *new_mini_ptr };
1328
1329                let mut base_page = storage.page_table.get_mut(pid);
1330                let page_loc = base_page.get_page_location().clone();
1331                match page_loc {
1332                    PageLocation::Base(off) => {
1333                        mini_page.next_level = MiniPageNextLevel::new(off);
1334                    }
1335                    _ => {
1336                        panic!("Unexpected page location for base page");
1337                    }
1338                }
1339                let mini_loc = PageLocation::Mini(new_mini_ptr);
1340
1341                // Replace the base page with the mini-page in the page table
1342                base_page.create_cache_page_loc(mini_loc);
1343            }
1344
1345            Ok(BfTree {
1346                storage,
1347                root_page_id: AtomicU64::new(raw_root_id),
1348                wal,
1349                write_load_full_page: config.write_load_full_page,
1350                cache_only: false,
1351                mini_page_size_classes: size_classes,
1352                snapshot_mgr,
1353                config,
1354                #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1355                metrics_recorder: Some(Arc::new(ThreadLocal::new())),
1356            })
1357        } else {
1358            // For cache-only mode, we create a new page table with NULL pages
1359            let mini_mapping_unallocated: Vec<(PageID, PageLocation)> = (0..bf_meta.leaf_page_num)
1360                .map(|pid| (PageID::from_id(pid as u64), PageLocation::Null))
1361                .collect();
1362
1363            // Then we restore the mini-pages and replace the corresponding Null pages and update the page table.
1364            let mini_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size > 0 {
1365                read_vec_from_offset(
1366                    bf_meta.mini_offset,
1367                    bf_meta.mini_size,
1368                    &recovery_snapshot_vfs,
1369                )
1370            } else {
1371                Vec::new()
1372            };
1373
1374            let mini_size_mapping: Vec<(PageID, usize)> = if bf_meta.mini_size_size > 0 {
1375                read_vec_from_offset(
1376                    bf_meta.mini_size_offset,
1377                    bf_meta.mini_size_size,
1378                    &recovery_snapshot_vfs,
1379                )
1380            } else {
1381                Vec::new()
1382            };
1383
1384            let mut mini_size_mapping_unique: HashMap<PageID, usize> = HashMap::new();
1385            for (pid, size) in mini_size_mapping {
1386                mini_size_mapping_unique.insert(pid, size);
1387            }
1388
1389            // For cache-only mode, the file system of the new bf-tree is memory-based
1390            let storage_vfs = make_vfs(&config.storage_backend, PathBuf::new());
1391
1392            let pt = PageTable::new_from_mapping(
1393                mini_mapping_unallocated.into_iter(),
1394                storage_vfs.clone(),
1395                config.clone(),
1396                snapshot_mgr.clone(),
1397            );
1398
1399            let circular_buffer = CircularBuffer::new(
1400                config.cb_size_byte,
1401                config.cb_copy_on_access_ratio,
1402                config.cb_min_record_size,
1403                config.cb_max_record_size,
1404                config.leaf_page_size,
1405                config.max_fence_len,
1406                buffer_ptr,
1407                config.cache_only,
1408            );
1409
1410            // Create a memory-based storage system before allocating mini-pages.
1411            let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, storage_vfs);
1412
1413            for (pid, offset) in &mini_mapping {
1414                let mini_size = *mini_size_mapping_unique.get(pid).unwrap();
1415
1416                // Skip over null pages as the default is Null in the page table already
1417                if *offset == NULL_PAGE_LOCATION_OFFSET {
1418                    continue;
1419                }
1420
1421                // Allocate memory for a new mini-page in storage
1422                let mini_page_guard = match storage.alloc_mini_page(mini_size) {
1423                    Ok(mini_page_ptr) => mini_page_ptr,
1424                    Err(_) => {
1425                        panic!("Please increase cb_size_byte in config");
1426                    }
1427                };
1428
1429                // Copy mini-page from snapshot file to the newly allocated space.
1430                let mut page_buffer = SectorAlignedVector::new_zeroed(mini_size);
1431                recovery_snapshot_vfs.read(*offset, &mut page_buffer);
1432                unsafe {
1433                    std::ptr::copy_nonoverlapping(
1434                        page_buffer.as_ptr(),
1435                        mini_page_guard.as_ptr(),
1436                        mini_size,
1437                    );
1438                }
1439
1440                // Set its next level to oblivion
1441                let mini_page_ptr = mini_page_guard.as_ptr() as *mut LeafNode;
1442                let mini_page = unsafe { &mut *mini_page_ptr };
1443                mini_page.next_level = MiniPageNextLevel::new_null();
1444
1445                // Update the corresponding page location
1446                let mut null_page = storage.page_table.get_mut(pid);
1447                let page_loc = null_page.get_page_location().clone();
1448                match page_loc {
1449                    PageLocation::Null => {
1450                        let mini_loc = PageLocation::Mini(mini_page_ptr);
1451                        null_page.create_cache_page_loc(mini_loc);
1452                    }
1453                    _ => {
1454                        panic!("Unexpected page location for null page");
1455                    }
1456                }
1457            }
1458            Ok(BfTree {
1459                storage,
1460                root_page_id: AtomicU64::new(raw_root_id),
1461                wal,
1462                write_load_full_page: config.write_load_full_page,
1463                cache_only: true,
1464                mini_page_size_classes: size_classes,
1465                snapshot_mgr,
1466                config,
1467                #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
1468                metrics_recorder: Some(Arc::new(ThreadLocal::new())),
1469            })
1470        }
1471    }
1472}
1473
1474impl BfTree {
1475    /// Recovery a Bf-Tree from a cpr snapshot and WAL files.
1476    /// Incomplete function, internal use only
1477    pub fn recovery(
1478        recovery_snapshot_file_path: PathBuf, //  The snapshot file to recover from
1479        wal_file: impl AsRef<Path>,
1480        use_snapshot: bool,
1481        buffer_ptr: Option<*mut u8>,
1482        buffer_size: Option<usize>,
1483        wal: Option<Arc<WalConfig>>,
1484    ) {
1485        let bf_tree = BfTree::new_from_cpr_snapshot(
1486            recovery_snapshot_file_path,
1487            use_snapshot,
1488            buffer_ptr,
1489            buffer_size,
1490            wal,
1491        )
1492        .unwrap();
1493        let wal_reader = WalReader::new(wal_file, 4096);
1494
1495        for seg in wal_reader.segment_iter() {
1496            for entry in seg.entry_iter() {
1497                let log_entry = LogEntry::read_from_buffer(entry.1);
1498                match log_entry {
1499                    LogEntry::Write(op) => {
1500                        bf_tree.insert(op.key, op.value);
1501                    }
1502                    LogEntry::Split(_op) => {
1503                        todo!("implement split op in wal!")
1504                    }
1505                }
1506            }
1507        }
1508    }
1509
1510    /// Take a new CPR snapshot
1511    pub fn cpr_snapshot(&self, snapshot_file_path: impl AsRef<Path>) {
1512        if !self.config.use_snapshot {
1513            panic!("Snapshots are not enabled in the configuration");
1514        }
1515
1516        let snpshot_mgr = self.snapshot_mgr.clone().unwrap();
1517        snpshot_mgr.snapshot(self, snapshot_file_path);
1518    }
1519
1520    /// Recover a BfTree from a CPR snapshot
1521    pub fn new_from_cpr_snapshot(
1522        recovery_snapshot_file_path: impl AsRef<Path>, //  The snapshot file to recover from
1523        use_snapshot: bool,
1524        buffer_ptr: Option<*mut u8>,
1525        buffer_size: Option<usize>,
1526        wal: Option<Arc<WalConfig>>,
1527    ) -> Result<BfTree, ConfigError> {
1528        CPRSnapShotMgr::new_from_snapshot(
1529            recovery_snapshot_file_path,
1530            use_snapshot,
1531            buffer_ptr,
1532            buffer_size,
1533            wal,
1534        )
1535    }
1536
1537    /// Check if all threads are running in the next version of the current snapshot
1538    pub fn are_all_threads_in_next_snapshot_version(&self) -> bool {
1539        if let Some(snapshot_mgr) = &self.snapshot_mgr {
1540            return snapshot_mgr.are_all_threads_in_next_version();
1541        }
1542        false
1543    }
1544}
1545
1546struct SectorAlignedVector {
1547    inner: ManuallyDrop<Vec<u8>>,
1548}
1549
1550impl Drop for SectorAlignedVector {
1551    fn drop(&mut self) {
1552        let layout =
1553            std::alloc::Layout::from_size_align(self.inner.capacity(), SECTOR_SIZE).unwrap();
1554        let ptr = self.inner.as_mut_ptr();
1555        unsafe {
1556            std::alloc::dealloc(ptr, layout);
1557        }
1558    }
1559}
1560
1561impl SectorAlignedVector {
1562    fn new_zeroed(capacity: usize) -> Self {
1563        let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
1564        let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
1565
1566        let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
1567        Self {
1568            inner: ManuallyDrop::new(inner),
1569        }
1570    }
1571}
1572
1573impl Deref for SectorAlignedVector {
1574    type Target = Vec<u8>;
1575
1576    fn deref(&self) -> &Self::Target {
1577        &self.inner
1578    }
1579}
1580
1581impl DerefMut for SectorAlignedVector {
1582    fn deref_mut(&mut self) -> &mut Self::Target {
1583        &mut self.inner
1584    }
1585}
1586
/// Fixed-layout metadata header stored in a Bf-Tree snapshot file.
///
/// The struct is `#[repr(C)]` so its raw in-memory bytes can be written to and read back
/// from disk directly. A serialization format such as flatbuffers or bincode would be more
/// robust, but compactness does not matter here and a large dependency is not worth it for
/// a single page. `align(512)` matches the module's `SECTOR_SIZE`, and the static assertion
/// right after the struct guarantees the header fits in one `DISK_PAGE_SIZE` page.
///
/// NOTE(review): the field layout *is* the on-disk format, so reordering, retyping, adding
/// or removing fields presumably makes existing snapshot files unreadable — confirm before
/// changing it.
#[repr(C, align(512))]
pub(crate) struct BfTreeMeta {
    // Must equal `BF_TREE_MAGIC_BEGIN` (asserted by `check_magic`).
    magic_begin: [u8; 16],
    // Snapshot file metadata.
    // Each `*_offset` / `*_size` pair locates a section of the snapshot file: a start offset
    // and a length in bytes, with a size of 0 meaning the section is absent (confirmed for
    // the two mini-page sections; presumably the same for the others).
    // Presumably the id of the tree's root page — TODO confirm at the write site.
    root_id: PageID,
    // Presumably the serialized inner nodes — TODO confirm.
    inner_offset: usize,
    inner_size: usize,
    // `(PageID, usize)` pairs mapping a leaf page id to the snapshot-file offset of its saved
    // mini-page; cache-only snapshots use `NULL_PAGE_LOCATION_OFFSET` for "no mini-page".
    mini_offset: usize,
    mini_size: usize,
    // `(PageID, usize)` pairs giving the size in bytes of each saved mini-page.
    mini_size_offset: usize,
    mini_size_size: usize,
    // Presumably the base-page mapping — TODO confirm.
    base_offset: usize,
    base_size: usize,
    // NOTE(review): presumably the total snapshot file size in bytes; confirm.
    file_size: u64,
    // Number of leaf page ids; cache-only recovery rebuilds the page table with one `Null`
    // entry for each id in `0..leaf_page_num`.
    leaf_page_num: usize,
    // Bf-tree configuration captured when the snapshot was taken; presumably mirrors the
    // `Config` fields of the same names — confirm.
    pub(crate) cb_size_byte: usize,
    pub(crate) snapshot_version: u64,
    pub(crate) cache_only: bool,
    pub(crate) read_promotion_rate: usize,
    pub(crate) scan_promotion_rate: usize,
    pub(crate) cb_min_record_size: usize,
    pub(crate) cb_max_record_size: usize,
    pub(crate) leaf_page_size: usize,
    pub(crate) cb_max_key_len: usize,
    pub(crate) max_fence_len: usize,
    pub(crate) cb_copy_on_access_ratio: f64,
    pub(crate) read_record_cache: bool,
    pub(crate) max_mini_page_size: usize,
    pub(crate) mini_page_binary_search: bool,
    pub(crate) write_load_full_page: bool,
    // Must equal `BF_TREE_MAGIC_END` (asserted by `check_magic`).
    magic_end: [u8; 14],
}
// Compile-time guarantee that the header fits in a single disk page.
const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);
1624
1625impl BfTreeMeta {
1626    fn as_slice(&self) -> &[u8] {
1627        let ptr = self as *const Self as *const u8;
1628        let size = std::mem::size_of::<Self>();
1629        unsafe { std::slice::from_raw_parts(ptr, size) }
1630    }
1631
1632    fn check_magic(&self) {
1633        assert_eq!(self.magic_begin, *BF_TREE_MAGIC_BEGIN);
1634        assert_eq!(self.magic_end, *BF_TREE_MAGIC_END);
1635    }
1636}
1637
1638/// Returns starting offset and total size written to disk.
1639fn serialize_vec_to_disk<T>(v: &[T], vfs: &Arc<dyn VfsImpl>) -> (usize, usize) {
1640    if v.is_empty() {
1641        return (0, 0);
1642    }
1643    let unaligned_ptr = v.as_ptr() as *const u8;
1644    let unaligned_size = std::mem::size_of_val(v);
1645
1646    let aligned_size = align_to_sector_size(unaligned_size);
1647    let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
1648    unsafe {
1649        let aligned_ptr = std::alloc::alloc_zeroed(layout);
1650        std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
1651        let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
1652        let offset = serialize_u8_slice_to_disk(slice, vfs);
1653        std::alloc::dealloc(aligned_ptr, layout);
1654        (offset, unaligned_size)
1655    }
1656}
1657
1658fn read_vec_from_offset<T: Clone>(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<T> {
1659    assert!(size > 0);
1660    let slice = read_u8_slice_from_disk(offset, size, vfs);
1661    let ptr = slice.as_ptr() as *const T;
1662    let size = size / std::mem::size_of::<T>();
1663    let slice = unsafe { std::slice::from_raw_parts(ptr, size) };
1664    slice.to_vec()
1665}
1666
1667fn read_u8_slice_from_disk(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<u8> {
1668    let mut res = Vec::new();
1669    let mut buffer = vec![0; DISK_PAGE_SIZE];
1670    for i in (0..size).step_by(DISK_PAGE_SIZE) {
1671        vfs.read(offset + i, &mut buffer); // Read one disk page at a time
1672        res.extend_from_slice(&buffer);
1673    }
1674    res
1675}
1676
/// Alignment and size granularity, in bytes, of the sector-aligned buffers in this module.
const SECTOR_SIZE: usize = 512;

/// Rounds `n` up to the nearest multiple of `SECTOR_SIZE`; `0` stays `0`.
fn align_to_sector_size(n: usize) -> usize {
    match n % SECTOR_SIZE {
        0 => n,
        rem => n + (SECTOR_SIZE - rem),
    }
}
1682
1683/// Write a slice to disk and return the start offset and page count.
1684/// TODO: we should not just return offset and count, because the offset is not necessarily continuos.
1685///     We should return a Vec of offsets. But let's keep it simple for fast prototype.
1686fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> usize {
1687    let mut start_offset = None;
1688    for chunk in slice.chunks(DISK_PAGE_SIZE) {
1689        let offset = vfs.alloc_offset(DISK_PAGE_SIZE); // Write one disk page at a time
1690        if start_offset.is_none() {
1691            start_offset = Some(offset);
1692        }
1693        vfs.write(offset, chunk);
1694    }
1695    start_offset.unwrap()
1696}
1697
1698#[cfg(test)]
1699mod tests {
1700    use crate::{nodes::leaf_node::LeafReadResult, sync::thread, BfTree, Config};
1701    use std::panic;
1702    #[cfg(feature = "shuttle")]
1703    use std::path::PathBuf;
1704    use std::str::FromStr;
1705    use std::sync::atomic::Ordering;
1706    use std::sync::{atomic::AtomicBool, Arc};
1707
1708    /// Multiple writer threads write to a BfTree in parallel while a separate thread taking multiple snapshots
1709    /// A new BfTree recovered from the snapshot should contain a prefix of all the inserts from each writer thread.
1710    /// A snapshot taken later should cover the previous snapshots.
1711    #[test]
1712    fn cpr_snapshot_disk() {
1713        // Install a panic hook that triggers the just-in-time debugger (e.g. VS debugger)
1714        // so we can inspect the state at the point of failure.
1715        panic::set_hook(Box::new(|info| {
1716            eprintln!("PANIC: {info}");
1717            unsafe { std::arch::asm!("int 3") };
1718        }));
1719
1720        let min_record_size: usize = 64;
1721        let max_record_size: usize = 2408;
1722        let leaf_page_size: usize = 8192;
1723        let snapshot_num: usize = 10;
1724        let num_threads: usize = 4;
1725        let file_path: String = "target/test_simple.bftree".to_string();
1726        let snapshot_file_path: String = "target/test_simple_snapshot.bftree".to_string();
1727
1728        let tmp_file_path = std::path::PathBuf::from_str(&file_path).unwrap();
1729        let tmp_snapshot_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();
1730
1731        let mut config = Config::new(&tmp_file_path, 128 * 1024); // 128KB buffer pool. insert/split/eviction all triggered
1732        config.storage_backend(crate::StorageBackend::Std);
1733        config.cb_min_record_size = min_record_size + 2 * std::mem::size_of::<usize>();
1734        config.cb_max_record_size = max_record_size;
1735        config.leaf_page_size = leaf_page_size;
1736        config.max_fence_len = min_record_size + 2 * std::mem::size_of::<usize>();
1737        config.use_snapshot(true);
1738
1739        let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
1740        let finish = Arc::new(AtomicBool::new(false));
1741
1742        let handles: Vec<_> = (0..num_threads)
1743            .map(|i| {
1744                let finish_clone = finish.clone();
1745                let bftree_clone = bftree.clone();
1746
1747                thread::spawn(move || {
1748                    let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
1749                    assert!(key_len * 2 <= max_record_size);
1750                    let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];
1751
1752                    let mut r: usize = 0;
1753                    while !finish_clone.load(Ordering::Relaxed) {
1754                        key_buffer.fill(r);
1755                        key_buffer[0] = i;
1756
1757                        match bftree_clone.insert(
1758                            bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
1759                            bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
1760                        ) {
1761                            crate::LeafInsertResult::Success => {}
1762                            _ => {
1763                                panic!("Insert failed");
1764                            }
1765                        }
1766                        r += 1;
1767                    }
1768                    r
1769                })
1770            })
1771            .collect();
1772
1773        thread::sleep(std::time::Duration::from_secs(5));
1774        for _ in 0..snapshot_num {
1775            // take a snapshot
1776            let _ = std::fs::remove_file(&tmp_snapshot_file_path);
1777            bftree.cpr_snapshot(&tmp_snapshot_file_path);
1778            thread::sleep(std::time::Duration::from_secs(5));
1779        }
1780
1781        // Stop all writer threads
1782        let mut rs = vec![0usize; num_threads];
1783        finish.store(true, Ordering::Relaxed);
1784        for (i, h) in handles.into_iter().enumerate() {
1785            let r = h.join().unwrap();
1786            rs[i] = r;
1787        }
1788
1789        verify_snapshot_recovery(
1790            &tmp_snapshot_file_path,
1791            num_threads,
1792            min_record_size,
1793            &rs,
1794            true,
1795        );
1796
1797        std::fs::remove_file(tmp_file_path).unwrap();
1798        std::fs::remove_file(tmp_snapshot_file_path).unwrap();
1799    }
1800
1801    /// Testing snapshot for cache-only mode with std::thread
1802    #[test]
1803    fn cpr_snapshot_cache_only() {
1804        let min_record_size: usize = 64;
1805        let max_record_size: usize = 2408;
1806        let leaf_page_size: usize = 8192;
1807        let num_threads: usize = 4;
1808
1809        let snapshot_file_path: String =
1810            "target/test_simple_cache_only_snapshot.bftree".to_string();
1811        let tmp_snapshot_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();
1812
1813        let mut config = Config::default(); // Creat a CB that can hold 16 full pages
1814        config.storage_backend(crate::StorageBackend::Memory);
1815        config.file_path(":memory:");
1816        config.cache_only = true;
1817        config.cb_size_byte(1024 * 1024 * 1024);
1818        config.cb_min_record_size = min_record_size;
1819        config.cb_max_record_size = max_record_size;
1820        config.leaf_page_size = leaf_page_size;
1821        config.max_fence_len = max_record_size;
1822        config.use_snapshot(true);
1823
1824        let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
1825        let finish = Arc::new(AtomicBool::new(false));
1826
1827        let handles: Vec<_> = (0..num_threads)
1828            .map(|i| {
1829                let finish_clone = finish.clone();
1830                let bftree_clone = bftree.clone();
1831
1832                thread::spawn(move || {
1833                    let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
1834                    assert!(key_len * 2 <= max_record_size);
1835                    let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];
1836
1837                    let mut r: usize = 0;
1838                    while !finish_clone.load(Ordering::Relaxed) {
1839                        key_buffer.fill(r);
1840                        key_buffer[0] = i;
1841
1842                        match bftree_clone.insert(
1843                            bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
1844                            bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
1845                        ) {
1846                            crate::LeafInsertResult::Success => {}
1847                            _ => {
1848                                panic!("Insert failed");
1849                            }
1850                        }
1851                        r += 1;
1852                    }
1853                    r
1854                })
1855            })
1856            .collect();
1857
1858        thread::sleep(std::time::Duration::from_secs(5));
1859        // take a snapshot
1860        bftree.cpr_snapshot(&tmp_snapshot_file_path);
1861        thread::sleep(std::time::Duration::from_secs(5));
1862
1863        // Stop all writer threads
1864        let mut rs = vec![0usize; num_threads];
1865        finish.store(true, Ordering::Relaxed);
1866        for (i, h) in handles.into_iter().enumerate() {
1867            let r = h.join().unwrap();
1868            rs[i] = r;
1869        }
1870
1871        verify_snapshot_recovery(
1872            &tmp_snapshot_file_path,
1873            num_threads,
1874            min_record_size,
1875            &rs,
1876            false,
1877        );
1878
1879        std::fs::remove_file(tmp_snapshot_file_path).unwrap();
1880    }
1881
1882    fn verify_snapshot_recovery(
1883        snapshot_file: impl AsRef<std::path::Path>,
1884        num_threads: usize,
1885        min_record_size: usize,
1886        records_num_per_threads: &Vec<usize>,
1887        check_prefix: bool,
1888    ) {
1889        let bftree = BfTree::new_from_cpr_snapshot(snapshot_file, false, None, None, None)
1890            .expect("fail to recover from snapshot");
1891
1892        let mut rs_captured = vec![0usize; num_threads];
1893        for i in 0..num_threads {
1894            let record_num = records_num_per_threads[i];
1895
1896            let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
1897            let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];
1898            let mut res_buffer = vec![0u8; key_len];
1899            let mut not_included = false;
1900            let mut first_gap_record: Option<usize> = None;
1901
1902            for r in 0..record_num {
1903                key_buffer.fill(r);
1904                key_buffer[0] = i;
1905
1906                match bftree.read(
1907                    bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
1908                    &mut res_buffer,
1909                ) {
1910                    LeafReadResult::Found(v) => {
1911                        if check_prefix && not_included {
1912                            // Gather diagnostic info: scan forward to find all gaps
1913                            let mut gaps = Vec::new();
1914                            let mut found_after = Vec::new();
1915                            let gap_start = first_gap_record.unwrap();
1916                            // Collect all gaps in the range [gap_start..r+50]
1917                            let scan_end = std::cmp::min(r + 50, record_num);
1918                            for scan_r in gap_start..scan_end {
1919                                key_buffer.fill(scan_r);
1920                                key_buffer[0] = i;
1921                                match bftree.read(
1922                                    bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
1923                                    &mut res_buffer,
1924                                ) {
1925                                    LeafReadResult::Found(_) => {
1926                                        found_after.push(scan_r);
1927                                    }
1928                                    LeafReadResult::NotFound => {
1929                                        gaps.push(scan_r);
1930                                    }
1931                                    _ => {}
1932                                }
1933                            }
1934                            panic!(
1935                                "PREFIX VIOLATION: thread={}, first_gap_at={}, found_record_after_gap={}, \
1936                                 total_captured_before_gap={}, total_records={}\n\
1937                                 Gaps in [{}, {}): {:?}\n\
1938                                 Found in [{}, {}): {:?}",
1939                                i, gap_start, r, rs_captured[i], record_num,
1940                                gap_start, scan_end, &gaps[..std::cmp::min(gaps.len(), 20)],
1941                                gap_start, scan_end, &found_after[..std::cmp::min(found_after.len(), 20)],
1942                            );
1943                        }
1944                        assert_eq!(v as usize, key_len);
1945                        assert_eq!(
1946                            &res_buffer,
1947                            bytemuck::must_cast_slice::<usize, u8>(&key_buffer)
1948                        );
1949                        rs_captured[i] += 1;
1950                    }
1951                    LeafReadResult::NotFound => {
1952                        if !not_included {
1953                            not_included = true;
1954                            first_gap_record = Some(r);
1955                        }
1956                    }
1957                    _ => {
1958                        panic!("Unexpected read result")
1959                    }
1960                }
1961            }
1962
1963            assert!(rs_captured[i] <= record_num);
1964            println!("Total inserted records for thread {}: {}", i, record_num);
1965            println!(
1966                "Hit ratio for thread {}: {}",
1967                i,
1968                rs_captured[i] as f64 / record_num as f64
1969            );
1970        }
1971    }
1972
1973    /// Inner body for the cache-only CPR snapshot test, parameterized by an
1974    /// iteration id so that concurrent shuttle replicas (and successive shuttle
1975    /// iterations) do not collide on the snapshot file path.
1976    #[cfg(feature = "shuttle")]
1977    fn shuttle_cpr_snapshot_cache_only_inner(iter: usize) {
1978        let min_record_size: usize = 64;
1979        let max_record_size: usize = 2408;
1980        let leaf_page_size: usize = 8192;
1981        let num_threads: usize = 4;
1982        // Bounded number of inserts per writer so shuttle iterations finish quickly.
1983        let inserts_per_thread: usize = 1_000; // 1K inserts per thread
1984
1985        let snapshot_file_path: String = format!(
1986            "target/shuttle_cpr_snapshot_cache_only_{}_{}.bftree",
1987            std::process::id(),
1988            iter,
1989        );
1990        let tmp_snapshot_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();
1991
1992        let mut config = Config::default();
1993        config.storage_backend(crate::StorageBackend::Memory);
1994        config.file_path(":memory:");
1995        config.cache_only = true;
1996        // Use a buffer sufficient for the test data. 128KB is more than enough
1997        // for 16 small records and avoids allocating 1GB per shuttle iteration.
1998        config.cb_size_byte(1024 * 1024 * 1024);
1999        config.cb_min_record_size = min_record_size;
2000        config.cb_max_record_size = max_record_size;
2001        config.leaf_page_size = leaf_page_size;
2002        config.max_fence_len = max_record_size;
2003        config.use_snapshot(true);
2004
2005        let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
2006        let mut rs = vec![0usize; num_threads];
2007
2008        for j in 0..2 {
2009            let handles: Vec<_> = (0..num_threads)
2010                .map(|i| {
2011                    let bftree_clone = bftree.clone();
2012                    let start_id = j * inserts_per_thread;
2013                    let end_id = start_id + inserts_per_thread;
2014                    thread::spawn(move || {
2015                        let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
2016                        assert!(key_len * 2 <= max_record_size);
2017                        let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];
2018
2019                        for r in start_id..end_id {
2020                            key_buffer.fill(r);
2021                            key_buffer[0] = i;
2022
2023                            match bftree_clone.insert(
2024                                bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
2025                                bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
2026                            ) {
2027                                crate::LeafInsertResult::Success => {}
2028                                _ => {
2029                                    panic!("Insert failed");
2030                                }
2031                            }
2032                        }
2033                        inserts_per_thread
2034                    })
2035                })
2036                .collect();
2037
2038            // Snapshot thread: takes the snapshot concurrently with the writers.
2039            let bftree_for_snap = bftree.clone();
2040            let snap_path = tmp_snapshot_file_path.clone();
2041            let snap_handle = thread::spawn(move || {
2042                bftree_for_snap.cpr_snapshot(&snap_path);
2043            });
2044
2045            for (i, h) in handles.into_iter().enumerate() {
2046                let r = h.join().unwrap();
2047                rs[i] += r;
2048            }
2049
2050            // Verify the snapshot taken is valid
2051            snap_handle.join().unwrap();
2052            let snap_path = tmp_snapshot_file_path.clone();
2053            verify_snapshot_recovery(&snap_path, num_threads, min_record_size, &rs, false);
2054        }
2055
2056        let _ = std::fs::remove_file(tmp_snapshot_file_path);
2057    }
2058
2059    /// Testing
2060    #[cfg(feature = "shuttle")]
2061    #[test]
2062    fn shuttle_cpr_snapshot_cache_only() {
2063        use std::sync::atomic::AtomicUsize;
2064
2065        // Unique iteration id so portfolio replicas / successive iterations do
2066        // not collide on the snapshot file path.
2067        static ITER: AtomicUsize = AtomicUsize::new(0);
2068
2069        let mut shuttle_config = shuttle::Config::default();
2070        //shuttle_config.max_steps = shuttle::MaxSteps::FailAfter(100_000);
2071        shuttle_config.max_steps = shuttle::MaxSteps::None;
2072        shuttle_config.stack_size = 1024 * 1024 * 1024; // 1GB — default 32KB overflows with deep tree ops
2073        shuttle_config.failure_persistence =
2074            shuttle::FailurePersistence::File(Some(PathBuf::from_str("target").unwrap()));
2075
2076        let mut runner = shuttle::PortfolioRunner::new(true, shuttle_config);
2077        let available_cores = std::thread::available_parallelism().unwrap().get().min(4);
2078        for _ in 0..available_cores {
2079            runner.add(shuttle::scheduler::PctScheduler::new(10, 1000));
2080        }
2081
2082        runner.run(|| {
2083            let iter = ITER.fetch_add(1, Ordering::Relaxed);
2084            shuttle_cpr_snapshot_cache_only_inner(iter);
2085            eprintln!("Completed shuttle iteration {}", iter);
2086        });
2087    }
2088
2089    /// Inner body for the disk CPR snapshot shuttle test, parameterized by an
2090    /// iteration id so that concurrent shuttle replicas (and successive shuttle
2091    /// iterations) do not collide on file paths.
2092    #[cfg(feature = "shuttle")]
2093    fn shuttle_cpr_snapshot_disk_inner(iter: usize) {
2094        let min_record_size: usize = 64;
2095        let max_record_size: usize = 2408;
2096        let leaf_page_size: usize = 8192;
2097        let num_threads: usize = 4;
2098        // 500 inserts/thread × 4 threads = 2000 records/round.
2099        // With 128KB buffer (~1600 record capacity), this triggers eviction
2100        // while staying within shuttle coroutine stack limits.
2101        let inserts_per_thread: usize = 500;
2102
2103        let file_path: String = format!(
2104            "target/shuttle_cpr_snapshot_disk_{}_{}.bftree",
2105            std::process::id(),
2106            iter,
2107        );
2108        let snapshot_file_path: String = format!(
2109            "target/shuttle_cpr_snapshot_disk_{}_{}_snap.bftree",
2110            std::process::id(),
2111            iter,
2112        );
2113        let tmp_file_path = std::path::PathBuf::from_str(&file_path).unwrap();
2114        let tmp_snapshot_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();
2115
2116        let mut config = Config::new(&tmp_file_path, 128 * 1024); // 128KB buffer, triggers eviction
2117        config.storage_backend(crate::StorageBackend::Std);
2118        config.cb_min_record_size = min_record_size + 2 * std::mem::size_of::<usize>();
2119        config.cb_max_record_size = max_record_size;
2120        config.leaf_page_size = leaf_page_size;
2121        config.max_fence_len = min_record_size + 2 * std::mem::size_of::<usize>();
2122        config.use_snapshot(true);
2123
2124        let bftree = Arc::new(BfTree::with_config(config.clone(), None).unwrap());
2125        let mut rs = vec![0usize; num_threads];
2126        for j in 0..3 {
2127            let handles: Vec<_> = (0..num_threads)
2128                .map(|i| {
2129                    let bftree_clone = bftree.clone();
2130                    // Always use key range 0..inserts_per_thread so shuttle_replay
2131                    // can reproduce any failing iteration with iter=0.
2132                    let start_id = j * inserts_per_thread;
2133                    let end_id = start_id + inserts_per_thread;
2134                    thread::spawn(move || {
2135                        let key_len: usize = min_record_size / 2 + std::mem::size_of::<usize>();
2136                        assert!(key_len * 2 <= max_record_size);
2137                        let mut key_buffer = vec![0usize; key_len / std::mem::size_of::<usize>()];
2138
2139                        for r in start_id..end_id {
2140                            key_buffer.fill(r);
2141                            key_buffer[0] = i;
2142
2143                            match bftree_clone.insert(
2144                                bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
2145                                bytemuck::must_cast_slice::<usize, u8>(&key_buffer),
2146                            ) {
2147                                crate::LeafInsertResult::Success => {}
2148                                _ => {
2149                                    panic!("Insert failed");
2150                                }
2151                            }
2152                        }
2153                        inserts_per_thread
2154                    })
2155                })
2156                .collect();
2157
2158            // Snapshot thread: takes the snapshot concurrently with the writers.
2159            let bftree_for_snap = bftree.clone();
2160            let snap_path = tmp_snapshot_file_path.clone();
2161            let snap_handle = thread::spawn(move || {
2162                bftree_for_snap.cpr_snapshot(&snap_path);
2163            });
2164
2165            for (i, h) in handles.into_iter().enumerate() {
2166                let r = h.join().unwrap();
2167                rs[i] += r;
2168            }
2169
2170            snap_handle.join().unwrap();
2171
2172            // Recover from the snapshot and verify invariants
2173            verify_snapshot_recovery(
2174                &tmp_snapshot_file_path,
2175                num_threads,
2176                min_record_size,
2177                &rs,
2178                true,
2179            );
2180        }
2181        let _ = std::fs::remove_file(tmp_file_path);
2182        let _ = std::fs::remove_file(tmp_snapshot_file_path);
2183    }
2184
2185    #[cfg(feature = "shuttle")]
2186    #[test]
2187    fn shuttle_cpr_snapshot_disk() {
2188        use std::sync::atomic::AtomicUsize;
2189
2190        // Unique iteration id so portfolio replicas / successive iterations do
2191        // not collide on file paths.
2192        static ITER: AtomicUsize = AtomicUsize::new(0);
2193
2194        let mut shuttle_config = shuttle::Config::default();
2195        shuttle_config.max_steps = shuttle::MaxSteps::None;
2196        // Default shuttle stack is 32KB which is too small for deep B-tree
2197        // operations with eviction + file I/O. Increase to avoid stack overflow
2198        // that manifests as STATUS_HEAP_CORRUPTION on Windows.
2199        shuttle_config.stack_size = 4 * 1024 * 1024; // 4MB
2200        shuttle_config.failure_persistence =
2201            shuttle::FailurePersistence::File(Some(PathBuf::from_str("target").unwrap()));
2202
2203        let mut runner = shuttle::PortfolioRunner::new(true, shuttle_config);
2204        let available_cores = std::thread::available_parallelism().unwrap().get().min(4);
2205        for _ in 0..available_cores {
2206            runner.add(shuttle::scheduler::PctScheduler::new(10, 100));
2207        }
2208
2209        runner.run(|| {
2210            let iter = ITER.fetch_add(1, Ordering::Relaxed);
2211            shuttle_cpr_snapshot_disk_inner(iter);
2212        });
2213    }
2214
2215    #[cfg(feature = "shuttle")]
2216    #[test]
2217    fn shuttle_replay() {
2218        let schedule_path = "target/schedule000.txt";
2219        if !std::path::Path::new(schedule_path).exists() {
2220            eprintln!("No schedule file at {schedule_path}; run shuttle_cpr_snapshot_disk to generate one on failure.");
2221            return;
2222        }
2223
2224        // install global collector configured based on RUST_LOG env var.
2225        tracing_subscriber::fmt()
2226            .with_ansi(true)
2227            .with_thread_names(false)
2228            .with_target(false)
2229            .init();
2230
2231        shuttle::replay_from_file(|| shuttle_cpr_snapshot_disk_inner(0), schedule_path);
2232    }
2233}