bf_tree/
snapshot.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4use std::collections::{HashMap, VecDeque};
5use std::mem::ManuallyDrop;
6use std::ops::{Deref, DerefMut};
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::Arc;
10
11#[cfg(unix)]
12use std::os::unix::fs::FileExt;
13#[cfg(windows)]
14use std::os::windows::fs::FileExt;
15
16#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
17use thread_local::ThreadLocal;
18
19use crate::{
20    circular_buffer::{CircularBuffer, TombstoneHandle},
21    fs::VfsImpl,
22    nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
23    storage::{make_vfs, LeafStorage, PageLocation, PageTable},
24    sync::atomic::AtomicU64,
25    tree::eviction_callback,
26    utils::{inner_lock::ReadGuard, BfsVisitor, NodeInfo},
27    wal::{LogEntry, LogEntryImpl, WriteAheadLog},
28    BfTree, Config, WalReader,
29};
30
31const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
32const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
33const META_DATA_PAGE_OFFSET: usize = 0;
34
/// A heap buffer whose allocation is aligned to `SECTOR_SIZE` (512 bytes),
/// as needed for sector-granular disk reads/writes in this module.
///
/// The inner `Vec` is wrapped in `ManuallyDrop` because the memory comes from
/// a custom-aligned `std::alloc` layout; the `Drop` impl below frees it with
/// that same layout instead of letting `Vec` deallocate with its own.
struct SectorAlignedVector {
    // Backing storage; len == capacity == the size passed to `new_zeroed`.
    inner: ManuallyDrop<Vec<u8>>,
}
38
impl Drop for SectorAlignedVector {
    fn drop(&mut self) {
        // Rebuild the exact layout used in `new_zeroed` (same size, same
        // SECTOR_SIZE alignment) so the deallocation matches the allocation.
        let layout =
            std::alloc::Layout::from_size_align(self.inner.capacity(), SECTOR_SIZE).unwrap();
        let ptr = self.inner.as_mut_ptr();
        // SAFETY: `ptr` was allocated in `new_zeroed` with this same layout,
        // and `inner` is `ManuallyDrop`, so the Vec will not also free it.
        // NOTE(review): this only holds if the Vec is never grown/reallocated
        // through `DerefMut` (that would swap in memory owned by Vec's normal
        // allocation path) — current callers only read/write in place; confirm.
        unsafe {
            std::alloc::dealloc(ptr, layout);
        }
    }
}
49
50impl SectorAlignedVector {
51    fn new_zeroed(capacity: usize) -> Self {
52        let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
53        let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
54
55        let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
56        Self {
57            inner: ManuallyDrop::new(inner),
58        }
59    }
60}
61
impl Deref for SectorAlignedVector {
    type Target = Vec<u8>;

    /// Borrow the aligned buffer as a plain `Vec<u8>` for reads.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for SectorAlignedVector {
    /// Mutable access to the buffer contents (used as a read target for VFS I/O).
    ///
    /// NOTE(review): callers must not grow/shrink the Vec through this handle —
    /// a reallocation would break the custom-layout `dealloc` in `Drop`.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
75
76impl BfTree {
    /// Rebuild a Bf-Tree by loading the latest snapshot and replaying the
    /// write-ahead log on top of it.
    ///
    /// `config_file` points at the serialized `Config`, `wal_file` at the WAL
    /// to replay (read in 4096-byte segments), and `buffer_ptr` optionally
    /// supplies pre-allocated circular-buffer memory.
    ///
    /// # Panics
    /// Panics (`todo!`) if the WAL contains a `Split` entry — split replay is
    /// not implemented yet.
    ///
    /// NOTE(review): the recovered tree is dropped when this function
    /// returns — nothing is handed back to the caller. Confirm this is
    /// intentional (recovery purely for side effects) or consider returning
    /// the `BfTree`.
    pub fn recovery(
        config_file: impl AsRef<Path>,
        wal_file: impl AsRef<Path>,
        buffer_ptr: Option<*mut u8>,
    ) {
        let bf_tree_config = Config::new_with_config_file(config_file);
        let bf_tree = BfTree::new_from_snapshot(bf_tree_config, buffer_ptr);
        let wal_reader = WalReader::new(wal_file, 4096);

        // Replay every WAL entry, in segment order, against the snapshot state.
        for seg in wal_reader.segment_iter() {
            for entry in seg.entry_iter() {
                // entry.1 holds the serialized entry bytes.
                let log_entry = LogEntry::read_from_buffer(entry.1);
                match log_entry {
                    LogEntry::Write(op) => {
                        bf_tree.insert(op.key, op.value);
                    }
                    LogEntry::Split(_op) => {
                        todo!("implement split op in wal!")
                    }
                }
            }
        }
    }
100
    /// Instead of creating a new Bf-Tree instance,
    /// it loads a Bf-Tree snapshot file and resumes from there.
    ///
    /// If the snapshot file does not exist yet, an empty tree is created at
    /// that path instead. Otherwise the metadata page at offset 0 is read and
    /// validated, the inner-node tree is rebuilt (Step 1), and the leaf page
    /// table is restored from the serialized (PageID, offset) pairs (Step 2).
    pub fn new_from_snapshot(bf_tree_config: Config, buffer_ptr: Option<*mut u8>) -> Self {
        if !bf_tree_config.file_path.exists() {
            // if not already exist, we just create a new empty file at the location.
            return BfTree::with_config(bf_tree_config.clone(), buffer_ptr);
        }

        // Read the 4 KiB metadata page at offset 0 into a sector-aligned
        // buffer (positioned reads, no shared file cursor).
        let reader = std::fs::File::open(bf_tree_config.file_path.clone()).unwrap();
        let mut metadata = SectorAlignedVector::new_zeroed(4096);
        #[cfg(unix)]
        {
            reader.read_at(&mut metadata, 0).unwrap();
        }
        #[cfg(windows)]
        {
            reader.seek_read(&mut metadata, 0).unwrap();
        }

        // Reinterpret the page as `BfTreeMeta` (repr(C), align(512)), then
        // validate the magic markers and the recorded file size.
        let bf_meta = unsafe { (metadata.as_ptr() as *const BfTreeMeta).read() };
        bf_meta.check_magic();
        assert_eq!(reader.metadata().unwrap().len(), bf_meta.file_size);

        let config = Arc::new(bf_tree_config);

        let wal = config
            .write_ahead_log
            .as_ref()
            .map(|s| WriteAheadLog::new(s.clone()));

        let vfs = make_vfs(&config.storage_backend, &config.file_path);

        let mut page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);

        // Step 1: reconstruct inner nodes.
        //
        // The snapshot stored a mapping from the *old* in-memory inner-node
        // pointers to disk offsets; child links inside serialized nodes still
        // hold those old pointers, so each child is looked up in the map,
        // reloaded from disk, and the link patched to the freshly built node.
        let mut root_page_id = bf_meta.root_id;
        if root_page_id.is_inner_node_pointer() {
            let inner_mapping: Vec<(*const InnerNode, usize)> =
                read_vec_from_offset(bf_meta.inner_offset, bf_meta.inner_size, &vfs);
            let mut inner_map = HashMap::new();

            for m in inner_mapping {
                inner_map.insert(m.0, m.1);
            }
            let offset = inner_map.get(&root_page_id.as_inner_node()).unwrap();
            vfs.read(*offset, &mut page_buffer);
            let root_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
            root_page_id = PageID::from_pointer(root_page);

            // BFS from the root, rebuilding children level by level.
            let mut inner_resolve_queue = VecDeque::from([root_page]);
            while !inner_resolve_queue.is_empty() {
                let inner_ptr = inner_resolve_queue.pop_front().unwrap();
                let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
                if inner.as_ref().meta.children_is_leaf() {
                    // Children are leaves; their locations are restored in Step 2.
                    continue;
                }
                for (idx, c) in inner.as_ref().get_child_iter().enumerate() {
                    let offset = inner_map.get(&c.as_inner_node()).unwrap();
                    vfs.read(*offset, &mut page_buffer);
                    let inner_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
                    let inner_id = PageID::from_pointer(inner_page);
                    // Replace the stale pre-snapshot pointer with the new one.
                    inner.as_mut().update_at_pos(idx, inner_id);
                    inner_resolve_queue.push_back(inner_page);
                }
            }
        }

        // Step 2: reconstruct leaf mappings.
        // Every leaf was drained to a `Base` location at snapshot time, so the
        // page table can be rebuilt purely from (PageID, disk offset) pairs.
        let leaf_mapping: Vec<(PageID, usize)> =
            read_vec_from_offset(bf_meta.leaf_offset, bf_meta.leaf_size, &vfs);
        let leaf_mapping = leaf_mapping.into_iter().map(|(pid, offset)| {
            let loc = PageLocation::Base(offset);
            (pid, loc)
        });
        let pt = PageTable::new_from_mapping(leaf_mapping, vfs.clone(), config.clone());
        let circular_buffer = CircularBuffer::new(
            config.cb_size_byte,
            config.cb_copy_on_access_ratio,
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            buffer_ptr,
            config.cache_only,
        );

        let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, vfs);

        // If the root is a leaf (plain id), tag its raw value so readers do
        // not treat it as an inner-node pointer.
        let raw_root_id = if root_page_id.is_id() {
            root_page_id.raw() | Self::ROOT_IS_LEAF_MASK
        } else {
            root_page_id.raw()
        };

        let size_classes = Self::create_mem_page_size_classes(
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            config.cache_only,
        );
        BfTree {
            storage,
            root_page_id: AtomicU64::new(raw_root_id),
            wal,
            write_load_full_page: true,
            // NOTE(review): hardcoded false here even though
            // `config.cache_only` was passed to the circular buffer above —
            // confirm a snapshot-restored tree can never be cache-only.
            cache_only: false,
            mini_page_size_classes: size_classes,
            config,
            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
        }
    }
214
    /// Stop the world and take a snapshot of the current state.
    ///
    /// Drains the circular buffer so every leaf has a durable `Base` location,
    /// writes each inner node at its recorded disk offset, serializes the
    /// (inner-pointer -> offset) and (leaf PageID -> offset) mappings, and
    /// finally writes the metadata page at offset 0 and flushes.
    ///
    /// Returns the snapshot file path
    pub fn snapshot(&self) -> PathBuf {
        // Evict every circular-buffer entry to disk via the normal eviction
        // path; entries that fail eviction are returned as Err to the drain.
        let callback = |h| -> Result<TombstoneHandle, TombstoneHandle> {
            match eviction_callback(&h, self) {
                Ok(_) => Ok(h),
                Err(_e) => Err(h),
            }
        };
        self.storage.circular_buffer.drain(callback);

        let root_id = self.get_root_page();
        // Persist inner nodes (BFS over inner levels only), remembering where
        // each in-memory node pointer landed on disk so recovery can patch
        // child links back up.
        let mut inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
        let visitor = BfsVisitor::new_inner_only(self);
        for node in visitor {
            match node {
                NodeInfo::Inner { ptr, .. } => {
                    let inner = ReadGuard::try_read(ptr).unwrap();
                    // NOTE(review): nodes without a valid disk offset are
                    // silently skipped and get no mapping entry — confirm the
                    // drain above guarantees every reachable inner node has one.
                    if inner.as_ref().is_valid_disk_offset() {
                        let offset = inner.as_ref().disk_offset as usize;
                        self.storage.vfs.write(offset, inner.as_ref().as_slice());
                        inner_mapping.push((ptr, offset));
                    }
                }
                NodeInfo::Leaf { level, .. } => {
                    // corner case: we might still get a leaf node when the root is leaf...
                    //
                    // When ROOT is leaf, it is in `FORCE` mode, meaning that all data are written to disk,
                    // so we don't need to do anything here.
                    assert_eq!(level, 0);
                }
            }
        }
        let (inner_offset, inner_size) = serialize_vec_to_disk(&inner_mapping, &self.storage.vfs);

        // After the drain, every page-table entry must be a Base (on-disk)
        // location; anything else means the drain missed an entry.
        let mut leaf_mapping = Vec::new();
        let page_table_iter = self.storage.page_table.iter();
        for (entry, pid) in page_table_iter {
            assert!(pid.is_id());
            match entry.try_read().unwrap().as_ref() {
                PageLocation::Base(base) => leaf_mapping.push((pid, *base)),
                PageLocation::Full(_) | PageLocation::Mini(_) => {
                    unreachable!("Circular buffer should already be drained!")
                }
                PageLocation::Null => panic!("Snapshot of Null page"),
            }
        }

        let (leaf_offset, leaf_size) = serialize_vec_to_disk(&leaf_mapping, &self.storage.vfs);

        // The leaf mapping is the last allocation, so the file ends at its
        // sector-aligned end.
        let file_size = (leaf_offset + align_to_sector_size(leaf_size)) as u64;

        let metadata = BfTreeMeta {
            magic_begin: *BF_TREE_MAGIC_BEGIN,
            root_id: root_id.0,
            inner_offset,
            inner_size,
            leaf_offset,
            leaf_size,
            file_size,
            magic_end: *BF_TREE_MAGIC_END,
        };

        // Metadata page goes in last; NOTE(review): ordering vs the earlier
        // writes is only as strong as the VFS flush semantics — confirm crash
        // consistency expectations.
        self.storage
            .vfs
            .write(META_DATA_PAGE_OFFSET, metadata.as_slice());
        self.storage.vfs.flush();
        self.config.file_path.clone()
    }
285}
286
/// We use repr(C) for simplicity, maybe flatbuffer or bincode or even repr(Rust) is better.
/// But we don't care about the space here.
/// I don't want to introduce giant dependencies just for this.
#[repr(C, align(512))]
struct BfTreeMeta {
    // Must equal BF_TREE_MAGIC_BEGIN; guards against reading a non-snapshot file.
    magic_begin: [u8; 16],
    // Root of the tree at snapshot time (inner-node pointer or leaf id).
    root_id: PageID,
    // Disk offset / exact byte length of the serialized inner-node mapping.
    inner_offset: usize,
    inner_size: usize,
    // Disk offset / exact byte length of the serialized leaf mapping.
    leaf_offset: usize,
    leaf_size: usize,
    // Total snapshot file size; checked against fs metadata on load.
    file_size: u64,
    // Must equal BF_TREE_MAGIC_END; checked together with magic_begin.
    magic_end: [u8; 14],
}
// The metadata struct must fit in the single disk page reserved at offset 0.
const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);
302
303impl BfTreeMeta {
304    fn as_slice(&self) -> &[u8] {
305        let ptr = self as *const Self as *const u8;
306        let size = std::mem::size_of::<Self>();
307        unsafe { std::slice::from_raw_parts(ptr, size) }
308    }
309
310    fn check_magic(&self) {
311        assert_eq!(self.magic_begin, *BF_TREE_MAGIC_BEGIN);
312        assert_eq!(self.magic_end, *BF_TREE_MAGIC_END);
313    }
314}
315
316/// Returns starting offset and total size written to disk.
317fn serialize_vec_to_disk<T>(v: &[T], vfs: &Arc<dyn VfsImpl>) -> (usize, usize) {
318    if v.is_empty() {
319        return (0, 0);
320    }
321    let unaligned_ptr = v.as_ptr() as *const u8;
322    let unaligned_size = std::mem::size_of_val(v);
323
324    let aligned_size = align_to_sector_size(unaligned_size);
325    let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
326    unsafe {
327        let aligned_ptr = std::alloc::alloc_zeroed(layout);
328        std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
329        let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
330        let offset = serialize_u8_slice_to_disk(slice, vfs);
331        std::alloc::dealloc(aligned_ptr, layout);
332        (offset, unaligned_size)
333    }
334}
335
336fn read_vec_from_offset<T: Clone>(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<T> {
337    assert!(size > 0);
338    let slice = read_u8_slice_from_disk(offset, size, vfs);
339    let ptr = slice.as_ptr() as *const T;
340    let size = size / std::mem::size_of::<T>();
341    let slice = unsafe { std::slice::from_raw_parts(ptr, size) };
342    slice.to_vec()
343}
344
345fn read_u8_slice_from_disk(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<u8> {
346    let mut res = Vec::new();
347    let mut buffer = vec![0; DISK_PAGE_SIZE];
348    for i in (0..size).step_by(DISK_PAGE_SIZE) {
349        vfs.read(offset + i, &mut buffer); // Read one disk page at a time
350        res.extend_from_slice(&buffer);
351    }
352    res
353}
354
const SECTOR_SIZE: usize = 512;

/// Round `n` up to the next multiple of `SECTOR_SIZE`.
///
/// Disk I/O in this module is done in sector-aligned units, so every byte
/// count that reaches the allocator or the VFS layer is padded with this.
fn align_to_sector_size(n: usize) -> usize {
    let sectors = (n + SECTOR_SIZE - 1) / SECTOR_SIZE;
    sectors * SECTOR_SIZE
}
360
/// Write a byte slice to disk one `DISK_PAGE_SIZE` chunk at a time and return
/// the disk offset of the first chunk.
///
/// TODO: we should not just return the start offset, because the allocated
///     offsets are not necessarily contiguous. We should return a Vec of
///     offsets. But let's keep it simple for fast prototype.
///
/// Panics if `slice` is empty (no offset is ever allocated); the only caller,
/// `serialize_vec_to_disk`, guards against that.
fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> usize {
    let mut start_offset = None;
    for chunk in slice.chunks(DISK_PAGE_SIZE) {
        let offset = vfs.alloc_offset(DISK_PAGE_SIZE); // Write one disk page at a time
        if start_offset.is_none() {
            start_offset = Some(offset);
        }
        vfs.write(offset, chunk);
    }
    start_offset.unwrap()
}
375
#[cfg(test)]
mod tests {
    use crate::{nodes::leaf_node::LeafReadResult, BfTree, Config};
    use rstest::rstest;
    use std::str::FromStr;

    /// Fill `buffer` with `key_id` in every slot and reinterpret it as a byte
    /// slice, producing a deterministic key/value payload of `len() * 8` bytes.
    pub(crate) fn install_value_to_buffer(buffer: &mut [usize], key_id: usize) -> &[u8] {
        for i in buffer.iter_mut() {
            *i = key_id;
        }

        // SAFETY: a &[usize] is validly readable as &[u8]; the 8x factor
        // assumes 8-byte usize (64-bit targets).
        unsafe {
            let ptr = buffer.as_ptr();
            std::slice::from_raw_parts(ptr as *const u8, buffer.len() * 8)
        }
    }

    // Insert `record_cnt` records, snapshot, reopen from the snapshot file,
    // and verify every record reads back intact.
    #[rstest]
    #[case(64, 2408, 8192, 500, "target/test_simple_1.bftree")] // 1 leaf page = 2 disk page; NOTE(review): 2408 (vs 2048 below) — confirm not a typo
    #[case(64, 2048, 16384, 500, "target/test_simple_2.bftree")] // 1 leaf page = 4 disk page
    #[case(3072, 3072, 16384, 500, "target/test_simple_3.bftree")] // 1 leaf page = 1 disk page, uniform record size
    fn persist_roundtrip_simple(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
        #[case] record_cnt: usize,
        #[case] snapshot_file_path: String,
    ) {
        let tmp_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();

        let mut config = Config::new(&tmp_file_path, leaf_page_size * 16); // Create a CB that can hold 16 full pages
        config.storage_backend(crate::StorageBackend::Std);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;

        let bftree = BfTree::with_config(config.clone(), None);

        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0; key_len / 8];

        // value == key for easy round-trip verification.
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            bftree.insert(key, key);
        }
        bftree.snapshot();
        drop(bftree);

        // Reopen from the snapshot file and verify every record round-trips.
        let bftree = BfTree::new_from_snapshot(config.clone(), None);
        let mut out_buffer = vec![0; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let bytes_read = bftree.read(key, &mut out_buffer);

            match bytes_read {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer, key);
                }
                _ => {
                    panic!("Key not found");
                }
            }
        }

        std::fs::remove_file(tmp_file_path).unwrap();
    }
}
444}