// bf_tree/snapshot.rs
1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4use std::collections::{HashMap, VecDeque};
5use std::mem::ManuallyDrop;
6use std::ops::{Deref, DerefMut};
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::Arc;
10
11#[cfg(unix)]
12use std::os::unix::fs::FileExt;
13#[cfg(windows)]
14use std::os::windows::fs::FileExt;
15
16#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
17use thread_local::ThreadLocal;
18
19use crate::{
20    circular_buffer::{CircularBuffer, TombstoneHandle},
21    error::ConfigError,
22    fs::VfsImpl,
23    nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
24    range_scan::ScanReturnField,
25    storage::{make_vfs, LeafStorage, PageLocation, PageTable},
26    sync::atomic::AtomicU64,
27    tree::eviction_callback,
28    utils::{inner_lock::ReadGuard, BfsVisitor, NodeInfo},
29    wal::{LogEntry, LogEntryImpl, WriteAheadLog},
30    BfTree, Config, StorageBackend, WalReader,
31};
32
/// Magic markers framing the metadata page; checked on load to detect
/// corruption or a format-version mismatch (see `BfTreeMeta::check_magic`).
const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
/// Fixed disk offset of the `BfTreeMeta` page (first page of the file).
const META_DATA_PAGE_OFFSET: usize = 0;
/// A `Vec<u8>` whose backing buffer is allocated with `SECTOR_SIZE` alignment,
/// as needed for sector-aligned disk I/O.
///
/// `ManuallyDrop` suppresses `Vec`'s own deallocation (which would use a
/// `Layout` with alignment 1); the custom `Drop` impl below frees the buffer
/// with the same sector-aligned layout it was allocated with.
struct SectorAlignedVector {
    inner: ManuallyDrop<Vec<u8>>,
}
40
impl Drop for SectorAlignedVector {
    /// Free the buffer with the same sector-aligned layout `new_zeroed` used.
    ///
    /// NOTE(review): this assumes `capacity()` still equals the originally
    /// allocated size. `DerefMut` hands out `&mut Vec<u8>`, so any caller that
    /// grows the vector would invalidate that assumption — confirm no caller
    /// ever pushes/reserves (in this file they only read/write the slice).
    fn drop(&mut self) {
        let layout =
            std::alloc::Layout::from_size_align(self.inner.capacity(), SECTOR_SIZE).unwrap();
        let ptr = self.inner.as_mut_ptr();
        // SAFETY: the pointer and layout match the `alloc_zeroed` call in
        // `new_zeroed`; `ManuallyDrop` guarantees `Vec` has not freed it.
        unsafe {
            std::alloc::dealloc(ptr, layout);
        }
    }
}
51
52impl SectorAlignedVector {
53    fn new_zeroed(capacity: usize) -> Self {
54        let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
55        let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
56
57        let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
58        Self {
59            inner: ManuallyDrop::new(inner),
60        }
61    }
62}
63
64impl Deref for SectorAlignedVector {
65    type Target = Vec<u8>;
66
67    fn deref(&self) -> &Self::Target {
68        &self.inner
69    }
70}
71
72impl DerefMut for SectorAlignedVector {
73    fn deref_mut(&mut self) -> &mut Self::Target {
74        &mut self.inner
75    }
76}
77
78impl BfTree {
    /// Recovery a Bf-Tree from snapshot and WAL files.
    /// Incomplete function, internal use only
    ///
    /// Loads the snapshot described by `config_file`, then replays every WAL
    /// entry from `wal_file` on top of it.
    ///
    /// NOTE(review): the recovered tree is dropped when this function
    /// returns, so callers cannot use it yet — consistent with "incomplete".
    pub fn recovery(
        config_file: impl AsRef<Path>,
        wal_file: impl AsRef<Path>,
        buffer_ptr: Option<*mut u8>,
    ) {
        let bf_tree_config = Config::new_with_config_file(config_file);
        let bf_tree = BfTree::new_from_snapshot(bf_tree_config, buffer_ptr).unwrap();
        // 4096 is presumably the WAL block size — TODO confirm against WalReader.
        let wal_reader = WalReader::new(wal_file, 4096);

        // Replay every log entry, segment by segment, in log order.
        for seg in wal_reader.segment_iter() {
            for entry in seg.entry_iter() {
                // entry.1 is the raw serialized entry payload.
                let log_entry = LogEntry::read_from_buffer(entry.1);
                match log_entry {
                    LogEntry::Write(op) => {
                        bf_tree.insert(op.key, op.value);
                    }
                    LogEntry::Split(_op) => {
                        todo!("implement split op in wal!")
                    }
                }
            }
        }
    }
104
    /// Instead of creating a new Bf-Tree instance,
    /// it loads a Bf-Tree snapshot file and resume from there.
    ///
    /// If the snapshot file does not exist yet, a fresh empty tree is created
    /// instead (same as [`BfTree::with_config`]).
    pub fn new_from_snapshot(
        bf_tree_config: Config,
        buffer_ptr: Option<*mut u8>,
    ) -> Result<Self, ConfigError> {
        if !bf_tree_config.file_path.exists() {
            // if not already exist, we just create a new empty file at the location.
            return BfTree::with_config(bf_tree_config.clone(), buffer_ptr);
        }

        // Validate the config first
        bf_tree_config.validate()?;

        // Read the metadata page (sector-aligned buffer) from the file head.
        let reader = std::fs::File::open(bf_tree_config.file_path.clone()).unwrap();
        let mut metadata = SectorAlignedVector::new_zeroed(4096);
        #[cfg(unix)]
        {
            // NOTE(review): a short read is not detected here (only I/O
            // errors are) — confirm the first 4096 bytes always exist.
            reader.read_at(&mut metadata, 0).unwrap();
        }
        #[cfg(windows)]
        {
            reader.seek_read(&mut metadata, 0).unwrap();
        }

        // Reinterpret the raw page as `BfTreeMeta` (repr(C), align(512)),
        // then sanity-check magic markers and the recorded file size.
        let bf_meta = unsafe { (metadata.as_ptr() as *const BfTreeMeta).read() };
        bf_meta.check_magic();
        assert_eq!(reader.metadata().unwrap().len(), bf_meta.file_size);

        let config = Arc::new(bf_tree_config);

        let wal = config
            .write_ahead_log
            .as_ref()
            .map(|s| WriteAheadLog::new(s.clone()));

        let vfs = make_vfs(&config.storage_backend, &config.file_path);

        // Reusable sector-aligned buffer for reading one inner node at a time.
        let mut page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);

        // Step 1: reconstruct inner nodes.
        // The snapshot stores a mapping from the *previous run's* in-memory
        // inner-node pointers to disk offsets. Rebuild each node in memory
        // (BFS from the root) and patch child links to the new pointers.
        let mut root_page_id = bf_meta.root_id;
        if root_page_id.is_inner_node_pointer() {
            let inner_mapping: Vec<(*const InnerNode, usize)> =
                read_vec_from_offset(bf_meta.inner_offset, bf_meta.inner_size, &vfs);
            let mut inner_map = HashMap::new();

            for m in inner_mapping {
                inner_map.insert(m.0, m.1);
            }
            // Rebuild the root from its on-disk page.
            let offset = inner_map.get(&root_page_id.as_inner_node()).unwrap();
            vfs.read(*offset, &mut page_buffer);
            let root_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
            root_page_id = PageID::from_pointer(root_page);

            // BFS over inner levels, swapping each stale child pointer for a
            // freshly rebuilt node. Leaf children stay as page IDs.
            let mut inner_resolve_queue = VecDeque::from([root_page]);
            while !inner_resolve_queue.is_empty() {
                let inner_ptr = inner_resolve_queue.pop_front().unwrap();
                let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
                if inner.as_ref().meta.children_is_leaf() {
                    continue;
                }
                for (idx, c) in inner.as_ref().get_child_iter().enumerate() {
                    let offset = inner_map.get(&c.as_inner_node()).unwrap();
                    vfs.read(*offset, &mut page_buffer);
                    let inner_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
                    let inner_id = PageID::from_pointer(inner_page);
                    inner.as_mut().update_at_pos(idx, inner_id);
                    inner_resolve_queue.push_back(inner_page);
                }
            }
        }

        // Step 2: reconstruct leaf mappings.
        // After a snapshot every leaf lives on disk, so each page-table entry
        // points at a base-page offset.
        let leaf_mapping: Vec<(PageID, usize)> =
            read_vec_from_offset(bf_meta.leaf_offset, bf_meta.leaf_size, &vfs);
        let leaf_mapping = leaf_mapping.into_iter().map(|(pid, offset)| {
            let loc = PageLocation::Base(offset);
            (pid, loc)
        });
        let pt = PageTable::new_from_mapping(leaf_mapping, vfs.clone(), config.clone());
        let circular_buffer = CircularBuffer::new(
            config.cb_size_byte,
            config.cb_copy_on_access_ratio,
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            buffer_ptr,
            config.cache_only,
        );

        let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, vfs);

        // A leaf root is flagged in the raw id so readers do not treat it as
        // an inner-node pointer.
        let raw_root_id = if root_page_id.is_id() {
            root_page_id.raw() | Self::ROOT_IS_LEAF_MASK
        } else {
            root_page_id.raw()
        };

        let size_classes = Self::create_mem_page_size_classes(
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            config.cache_only,
        );
        Ok(BfTree {
            storage,
            root_page_id: AtomicU64::new(raw_root_id),
            wal,
            write_load_full_page: true,
            // NOTE(review): hard-coded `false` although `config.cache_only`
            // was honored above — confirm this is intended.
            cache_only: false,
            mini_page_size_classes: size_classes,
            config,
            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
        })
    }
224
    /// Stop the world and take a snapshot of the current state.
    ///
    /// Returns the snapshot file path
    ///
    /// Order matters: first drain every cached page from the circular buffer
    /// to disk, then persist the inner nodes, the leaf page table, and
    /// finally the metadata page, before flushing the VFS.
    pub fn snapshot(&self) -> PathBuf {
        // Evict every cached page; on failure the tombstone handle is handed
        // back to the buffer via `Err`.
        let callback = |h| -> Result<TombstoneHandle, TombstoneHandle> {
            match eviction_callback(&h, self) {
                Ok(_) => Ok(h),
                Err(_e) => Err(h),
            }
        };
        self.storage.circular_buffer.drain(callback);

        // Persist all inner nodes and record (memory pointer -> disk offset)
        // so `new_from_snapshot` can rebuild the child links.
        let root_id = self.get_root_page();
        let mut inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
        let visitor = BfsVisitor::new_inner_only(self);
        for node in visitor {
            match node {
                NodeInfo::Inner { ptr, .. } => {
                    let inner = ReadGuard::try_read(ptr).unwrap();
                    if inner.as_ref().is_valid_disk_offset() {
                        let offset = inner.as_ref().disk_offset as usize;
                        self.storage.vfs.write(offset, inner.as_ref().as_slice());
                        inner_mapping.push((ptr, offset));
                    }
                }
                NodeInfo::Leaf { level, .. } => {
                    // corner case: we might still get a leaf node when the root is leaf...
                    //
                    // When ROOT is leaf, it is in `FORCE` mode, meaning that all data are write to disk.
                    // do don't need to do anything here.
                    assert_eq!(level, 0);
                }
            }
        }
        let (inner_offset, inner_size) = serialize_vec_to_disk(&inner_mapping, &self.storage.vfs);

        // Record every leaf's base-page offset. After the drain above, no
        // page may still live in the circular buffer.
        let mut leaf_mapping = Vec::new();
        let page_table_iter = self.storage.page_table.iter();
        for (entry, pid) in page_table_iter {
            assert!(pid.is_id());
            match entry.try_read().unwrap().as_ref() {
                PageLocation::Base(base) => leaf_mapping.push((pid, *base)),
                PageLocation::Full(_) | PageLocation::Mini(_) => {
                    unreachable!("Circular buffer should already be drained!")
                }
                PageLocation::Null => panic!("Snapshot of Null page"),
            }
        }

        let (leaf_offset, leaf_size) = serialize_vec_to_disk(&leaf_mapping, &self.storage.vfs);

        // The leaf mapping is the last span written, so the file ends right
        // after its sector-aligned extent.
        let file_size = (leaf_offset + align_to_sector_size(leaf_size)) as u64;

        let metadata = BfTreeMeta {
            magic_begin: *BF_TREE_MAGIC_BEGIN,
            root_id: root_id.0,
            inner_offset,
            inner_size,
            leaf_offset,
            leaf_size,
            file_size,
            magic_end: *BF_TREE_MAGIC_END,
        };

        // Metadata goes to the fixed page-0 offset; flush makes the whole
        // snapshot durable.
        self.storage
            .vfs
            .write(META_DATA_PAGE_OFFSET, metadata.as_slice());
        self.storage.vfs.flush();
        self.config.file_path.clone()
    }
295
296    /// Snapshot an in-memory Bf-Tree to a file on disk.
297    ///
298    /// This works by scanning all live records from the in-memory tree and inserting
299    /// them one-by-one into a new file-backed tree, then calling [`snapshot`] on it.
300    /// Records are streamed without buffering the entire dataset in memory.
301    ///
302    /// For `cache_only` trees (where scan is not supported), falls back to collecting
303    /// records via BFS traversal before inserting.
304    ///
305    /// Returns the snapshot file path.
306    ///
307    /// # Panics
308    /// Panics if `snapshot_path` already exists.
309    pub fn snapshot_memory_to_disk(&self, snapshot_path: impl AsRef<Path>) -> PathBuf {
310        let snapshot_path = snapshot_path.as_ref();
311        assert!(
312            !snapshot_path.exists(),
313            "snapshot_memory_to_disk: target file already exists: {:?}",
314            snapshot_path
315        );
316
317        // Build a disk-backed config from the current config.
318        let mut disk_config = self.config.as_ref().clone();
319        disk_config.storage_backend(StorageBackend::Std);
320        disk_config.cache_only(false);
321        disk_config.file_path(snapshot_path);
322
323        let disk_tree = BfTree::with_config(disk_config, None)
324            .expect("Failed to create disk-backed BfTree for snapshot");
325
326        if self.cache_only {
327            // cache_only mode does not support scan, so throw an error.
328            panic!("snapshot_memory_to_disk does not support cache_only trees");
329        } else {
330            // Use the scan operator to stream records directly into the disk tree
331            // without buffering the entire dataset in memory.
332            Self::copy_records_via_scan(self, &disk_tree);
333        }
334
335        disk_tree.snapshot()
336    }
337
338    /// Copy all live records from `src` to `dst` using the scan operator.
339    /// This streams one record at a time, avoiding large intermediate allocations.
340    fn copy_records_via_scan(src: &BfTree, dst: &BfTree) {
341        // Allocate a single reusable buffer large enough for any key+value.
342        let buf_size = src.config.leaf_page_size;
343        let mut scan_buf = vec![0u8; buf_size];
344
345        // Start scanning from the smallest possible key (a single 0x00 byte).
346        let start_key: &[u8] = &[0u8];
347        let mut scan_iter =
348            match src.scan_with_count(start_key, usize::MAX, ScanReturnField::KeyAndValue) {
349                Ok(iter) => iter,
350                Err(_) => return, // empty tree or other issue
351            };
352
353        while let Some((key_len, value_len)) = scan_iter.next(&mut scan_buf) {
354            let key = &scan_buf[..key_len];
355            let value = &scan_buf[key_len..key_len + value_len];
356            dst.insert(key, value);
357        }
358    }
359
360    /// Load an on-disk Bf-Tree snapshot and return a new in-memory (cache_only) Bf-Tree
361    /// containing all its records.
362    ///
363    /// The caller provides a `Config` that controls the in-memory tree's parameters
364    /// (circular buffer size, record sizes, leaf page size, etc.).
365    /// `storage_backend` and `cache_only` are overridden automatically.
366    ///
367    /// # Arguments
368    /// * `snapshot_path` – path to an existing snapshot file on disk.
369    /// * `memory_config` – configuration for the resulting in-memory tree.
370    ///
371    /// # Panics
372    /// Panics if the snapshot file does not exist.
373    pub fn new_from_snapshot_disk_to_memory(
374        snapshot_path: impl AsRef<Path>,
375        memory_config: Config,
376    ) -> Result<Self, ConfigError> {
377        let snapshot_path = snapshot_path.as_ref();
378        assert!(
379            snapshot_path.exists(),
380            "new_from_snapshot_disk_to_memory: snapshot file does not exist: {:?}",
381            snapshot_path
382        );
383
384        // Step 1: Open the on-disk snapshot as a normal disk-backed tree.
385        let mut disk_config = memory_config.clone();
386        disk_config.storage_backend(StorageBackend::Std);
387        disk_config.cache_only(false);
388        disk_config.file_path(snapshot_path);
389
390        let disk_tree = BfTree::new_from_snapshot(disk_config, None)?;
391
392        // Step 2: Create the in-memory tree.
393        // Use Memory backend with cache_only=false so that evicted pages
394        // are stored as heap-backed base pages (no data loss).
395        let mut mem_config = memory_config;
396        mem_config.storage_backend(StorageBackend::Memory);
397        mem_config.cache_only(false);
398
399        let mem_tree = BfTree::with_config(mem_config, None)?;
400
401        // Step 3: Stream records from the disk tree into the memory tree via scan.
402        // The disk tree is never cache_only, so scan is always available.
403        Self::copy_records_via_scan(&disk_tree, &mem_tree);
404
405        Ok(mem_tree)
406    }
407}
408
/// We use repr(C) for simplicity, maybe flatbuffer or bincode or even repr(Rust) is better.
/// But we don't care about the space here.
/// I don't want to introduce giant dependencies just for this.
///
/// On-disk metadata page written at `META_DATA_PAGE_OFFSET` by `snapshot` and
/// read back by `new_from_snapshot`. `align(512)` keeps the struct sector
/// aligned so it can be written out directly via `as_slice`.
#[repr(C, align(512))]
struct BfTreeMeta {
    // Must equal `BF_TREE_MAGIC_BEGIN`; verified by `check_magic`.
    magic_begin: [u8; 16],
    // Raw root id: either a leaf page id or an inner-node pointer.
    root_id: PageID,
    // Disk offset / byte size of the serialized inner-node mapping.
    inner_offset: usize,
    inner_size: usize,
    // Disk offset / byte size of the serialized leaf mapping.
    leaf_offset: usize,
    leaf_size: usize,
    // Expected total file size; validated against the real file on load.
    file_size: u64,
    // Must equal `BF_TREE_MAGIC_END`.
    magic_end: [u8; 14],
}
// The metadata must fit in a single disk page, since it occupies the fixed
// first page of the snapshot file.
const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);
424
425impl BfTreeMeta {
426    fn as_slice(&self) -> &[u8] {
427        let ptr = self as *const Self as *const u8;
428        let size = std::mem::size_of::<Self>();
429        unsafe { std::slice::from_raw_parts(ptr, size) }
430    }
431
432    fn check_magic(&self) {
433        assert_eq!(self.magic_begin, *BF_TREE_MAGIC_BEGIN);
434        assert_eq!(self.magic_end, *BF_TREE_MAGIC_END);
435    }
436}
437
438/// Returns starting offset and total size written to disk.
439fn serialize_vec_to_disk<T>(v: &[T], vfs: &Arc<dyn VfsImpl>) -> (usize, usize) {
440    if v.is_empty() {
441        return (0, 0);
442    }
443    let unaligned_ptr = v.as_ptr() as *const u8;
444    let unaligned_size = std::mem::size_of_val(v);
445
446    let aligned_size = align_to_sector_size(unaligned_size);
447    let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
448    unsafe {
449        let aligned_ptr = std::alloc::alloc_zeroed(layout);
450        std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
451        let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
452        let offset = serialize_u8_slice_to_disk(slice, vfs);
453        std::alloc::dealloc(aligned_ptr, layout);
454        (offset, unaligned_size)
455    }
456}
457
458fn read_vec_from_offset<T: Clone>(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<T> {
459    assert!(size > 0);
460    let slice = read_u8_slice_from_disk(offset, size, vfs);
461    let ptr = slice.as_ptr() as *const T;
462    let size = size / std::mem::size_of::<T>();
463    let slice = unsafe { std::slice::from_raw_parts(ptr, size) };
464    slice.to_vec()
465}
466
467fn read_u8_slice_from_disk(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<u8> {
468    let mut res = Vec::new();
469    let mut buffer = vec![0; DISK_PAGE_SIZE];
470    for i in (0..size).step_by(DISK_PAGE_SIZE) {
471        vfs.read(offset + i, &mut buffer); // Read one disk page at a time
472        res.extend_from_slice(&buffer);
473    }
474    res
475}
476
/// Sector size used for aligned allocations and on-disk size round-ups.
const SECTOR_SIZE: usize = 512;
// The mask trick in `align_to_sector_size` is only correct for powers of two;
// fail at compile time if SECTOR_SIZE ever changes to violate that.
const _: () = assert!(SECTOR_SIZE.is_power_of_two());

/// Round `n` up to the next multiple of `SECTOR_SIZE`.
fn align_to_sector_size(n: usize) -> usize {
    (n + SECTOR_SIZE - 1) & !(SECTOR_SIZE - 1)
}
482
483/// Write a slice to disk and return the start offset and page count.
484/// TODO: we should not just return offset and count, because the offset is not necessarily continuos.
485///     We should return a Vec of offsets. But let's keep it simple for fast prototype.
486fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> usize {
487    let mut start_offset = None;
488    for chunk in slice.chunks(DISK_PAGE_SIZE) {
489        let offset = vfs.alloc_offset(DISK_PAGE_SIZE); // Write one disk page at a time
490        if start_offset.is_none() {
491            start_offset = Some(offset);
492        }
493        vfs.write(offset, chunk);
494    }
495    start_offset.unwrap()
496}
497
498#[cfg(test)]
499mod tests {
500    use crate::{
501        nodes::leaf_node::LeafReadResult, utils::test_util::install_value_to_buffer, BfTree, Config,
502    };
503    use rstest::rstest;
504    use std::str::FromStr;
505
    /// Round-trip: insert records, `snapshot()`, drop the tree, reload it via
    /// `new_from_snapshot`, and verify every record is readable.
    #[rstest]
    // NOTE(review): 2408 here is presumably a typo for 2048 — confirm.
    #[case(64, 2408, 8192, 500, "target/test_simple_1.bftree")] // 1 leaf page = 2 disk page
    #[case(64, 2048, 16384, 500, "target/test_simple_2.bftree")] // 1 leaf page = 4 disk page
    #[case(3072, 3072, 16384, 500, "target/test_simple_3.bftree")] // 1 leaf page = 1 disk page, uniform record size
    fn persist_roundtrip_simple(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
        #[case] record_cnt: usize,
        #[case] snapshot_file_path: String,
    ) {
        let tmp_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();

        let mut config = Config::new(&tmp_file_path, leaf_page_size * 16); // Create a CB that can hold 16 full pages
        config.storage_backend(crate::StorageBackend::Std);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;

        let bftree = BfTree::with_config(config.clone(), None).unwrap();

        // Keys are fixed length; each value is the key itself.
        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0; key_len / 8];

        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            bftree.insert(key, key);
        }
        bftree.snapshot();
        drop(bftree);

        // Reload from the snapshot file and verify every record round-trips.
        let bftree = BfTree::new_from_snapshot(config.clone(), None).unwrap();
        let mut out_buffer = vec![0; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let bytes_read = bftree.read(key, &mut out_buffer);

            match bytes_read {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer, key);
                }
                _ => {
                    panic!("Key not found");
                }
            }
        }

        std::fs::remove_file(tmp_file_path).unwrap();
    }
557
    /// Round-trip: populate an in-memory tree, `snapshot_memory_to_disk`,
    /// then reload the file with `new_from_snapshot` and verify all records.
    #[test]
    fn snapshot_memory_to_disk_roundtrip() {
        let snapshot_path = std::path::PathBuf::from_str("target/test_mem_to_disk.bftree").unwrap();
        // Clean up in case a previous run left the file behind
        let _ = std::fs::remove_file(&snapshot_path);

        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;

        // Create an in-memory (non-cache_only) BfTree
        let mut config = Config::new(":memory:", leaf_page_size * 16);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;

        let bftree = BfTree::with_config(config.clone(), None).unwrap();

        // Keys are fixed length; each value is the key itself.
        let key_len: usize = min_record_size / 2;
        let record_cnt = 200;
        let mut key_buffer = vec![0usize; key_len / 8];

        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            bftree.insert(key, key);
        }

        // Snapshot the in-memory tree to disk
        let path = bftree.snapshot_memory_to_disk(&snapshot_path);
        assert_eq!(path, snapshot_path);
        assert!(snapshot_path.exists());
        drop(bftree);

        // Reload the snapshot into a disk-backed tree and verify all records
        let mut disk_config = Config::new(&snapshot_path, leaf_page_size * 16);
        disk_config.storage_backend(crate::StorageBackend::Std);
        disk_config.cb_min_record_size = min_record_size;
        disk_config.cb_max_record_size = max_record_size;
        disk_config.leaf_page_size = leaf_page_size;
        disk_config.max_fence_len = max_record_size;

        let loaded = BfTree::new_from_snapshot(disk_config, None).unwrap();
        let mut out_buffer = vec![0u8; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let result = loaded.read(key, &mut out_buffer);
            match result {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer[..key_len], key);
                }
                other => {
                    panic!("Key {r} not found, got: {:?}", other);
                }
            }
        }

        std::fs::remove_file(snapshot_path).unwrap();
    }
618
    /// Round-trip: snapshot a disk-backed tree, load it with
    /// `new_from_snapshot_disk_to_memory`, and verify all records in memory.
    #[test]
    fn snapshot_disk_to_memory_roundtrip() {
        let snapshot_path = std::path::PathBuf::from_str("target/test_disk_to_mem.bftree").unwrap();
        let _ = std::fs::remove_file(&snapshot_path);

        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;
        let record_cnt: usize = 500;

        // Step 1: Build a disk-backed tree, populate it, snapshot to disk.
        {
            let mut config = Config::new(&snapshot_path, leaf_page_size * 16);
            config.storage_backend(crate::StorageBackend::Std);
            config.cb_min_record_size = min_record_size;
            config.cb_max_record_size = max_record_size;
            config.leaf_page_size = leaf_page_size;
            config.max_fence_len = max_record_size;

            let tree = BfTree::with_config(config, None).unwrap();
            let key_len = min_record_size / 2;
            let mut key_buffer = vec![0usize; key_len / 8];

            for r in 0..record_cnt {
                let key = install_value_to_buffer(&mut key_buffer, r);
                tree.insert(key, key);
            }
            tree.snapshot();
        }

        // Step 2: Load the snapshot into an in-memory tree
        // (Memory backend; cache_only is overridden internally).
        let mut mem_config = Config::new(":memory:", leaf_page_size * 16);
        mem_config.cb_min_record_size = min_record_size;
        mem_config.cb_max_record_size = max_record_size;
        mem_config.leaf_page_size = leaf_page_size;
        mem_config.max_fence_len = max_record_size;

        let mem_tree =
            BfTree::new_from_snapshot_disk_to_memory(&snapshot_path, mem_config).unwrap();

        // Step 3: Verify all records are present.
        let key_len = min_record_size / 2;
        let mut key_buffer = vec![0usize; key_len / 8];
        let mut out_buffer = vec![0u8; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let result = mem_tree.read(key, &mut out_buffer);
            match result {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer[..key_len], key);
                }
                other => {
                    panic!("Key {r} not found, got: {:?}", other);
                }
            }
        }

        std::fs::remove_file(snapshot_path).unwrap();
    }
679}