bf_tree/snapshot.rs

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

use std::collections::{HashMap, VecDeque};
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::Arc;

#[cfg(unix)]
use std::os::unix::fs::FileExt;
#[cfg(windows)]
use std::os::windows::fs::FileExt;

#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
use thread_local::ThreadLocal;

use crate::{
    circular_buffer::{CircularBuffer, TombstoneHandle},
    error::ConfigError,
    fs::VfsImpl,
    nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
    storage::{make_vfs, LeafStorage, PageLocation, PageTable},
    sync::atomic::AtomicU64,
    tree::eviction_callback,
    utils::{inner_lock::ReadGuard, BfsVisitor, NodeInfo},
    wal::{LogEntry, LogEntryImpl, WriteAheadLog},
    BfTree, Config, WalReader,
};

const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
const META_DATA_PAGE_OFFSET: usize = 0;

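/// A heap buffer whose allocation is aligned to `SECTOR_SIZE`, so it can be
/// handed to I/O paths that expect sector-aligned buffers (e.g. direct I/O).
/// `ManuallyDrop` lets our `Drop` impl deallocate with the same alignment the
/// buffer was allocated with, instead of letting `Vec` use its default layout.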
struct SectorAlignedVector {
    inner: ManuallyDrop<Vec<u8>>,
}

impl Drop for SectorAlignedVector {
    fn drop(&mut self) {
        let layout =
            std::alloc::Layout::from_size_align(self.inner.capacity(), SECTOR_SIZE).unwrap();
        let ptr = self.inner.as_mut_ptr();
        unsafe {
            std::alloc::dealloc(ptr, layout);
        }
    }
}

impl SectorAlignedVector {
    fn new_zeroed(capacity: usize) -> Self {
        let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
        let ptr = unsafe { std::alloc::alloc_zeroed(layout) };

        let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
        Self {
            inner: ManuallyDrop::new(inner),
        }
    }
}

impl Deref for SectorAlignedVector {
    type Target = Vec<u8>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for SectorAlignedVector {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

impl BfTree {
    /// Recover a Bf-Tree from a snapshot file and a WAL file.
    /// Incomplete; for internal use only.
    pub fn recovery(
        config_file: impl AsRef<Path>,
        wal_file: impl AsRef<Path>,
        buffer_ptr: Option<*mut u8>,
    ) {
        let bf_tree_config = Config::new_with_config_file(config_file);
        let bf_tree = BfTree::new_from_snapshot(bf_tree_config, buffer_ptr).unwrap();
        let wal_reader = WalReader::new(wal_file, 4096);

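        // Replay the WAL on top of the recovered snapshot: `Write` entries are
        // simply re-inserted; `Split` entries are structural and not handled yet.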
        for seg in wal_reader.segment_iter() {
            for entry in seg.entry_iter() {
                let log_entry = LogEntry::read_from_buffer(entry.1);
                match log_entry {
                    LogEntry::Write(op) => {
                        bf_tree.insert(op.key, op.value);
                    }
                    LogEntry::Split(_op) => {
                        todo!("implement split op in wal!")
                    }
                }
            }
        }
    }

    /// Instead of creating a new Bf-Tree instance, this loads a Bf-Tree
    /// snapshot file and resumes from there.
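    ///
    /// A minimal round-trip sketch (paths, sizes, and the key/value literals
    /// are illustrative, not taken from real usage):
    ///
    /// ```ignore
    /// use bf_tree::{BfTree, Config};
    ///
    /// let path = std::path::PathBuf::from("target/example.bftree");
    /// let config = Config::new(&path, 16 * 1024 * 1024);
    /// let tree = BfTree::with_config(config.clone(), None).unwrap();
    /// tree.insert(&b"key"[..], &b"value"[..]);
    /// tree.snapshot();
    /// drop(tree);
    /// // Resume from the snapshot instead of starting empty:
    /// let tree = BfTree::new_from_snapshot(config, None).unwrap();
    /// ```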
    pub fn new_from_snapshot(
        bf_tree_config: Config,
        buffer_ptr: Option<*mut u8>,
    ) -> Result<Self, ConfigError> {
        if !bf_tree_config.file_path.exists() {
            // If no snapshot file exists yet, just create a new, empty tree at that location.
            return BfTree::with_config(bf_tree_config.clone(), buffer_ptr);
        }

        // Validate the config first
        bf_tree_config.validate()?;

        let reader = std::fs::File::open(bf_tree_config.file_path.clone()).unwrap();
        let mut metadata = SectorAlignedVector::new_zeroed(4096);
        #[cfg(unix)]
        {
            reader.read_at(&mut metadata, 0).unwrap();
        }
        #[cfg(windows)]
        {
            reader.seek_read(&mut metadata, 0).unwrap();
        }

        let bf_meta = unsafe { (metadata.as_ptr() as *const BfTreeMeta).read() };
        bf_meta.check_magic();
        assert_eq!(reader.metadata().unwrap().len(), bf_meta.file_size);

        let config = Arc::new(bf_tree_config);

        let wal = config
            .write_ahead_log
            .as_ref()
            .map(|s| WriteAheadLog::new(s.clone()));

        let vfs = make_vfs(&config.storage_backend, &config.file_path);

        let mut page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);

        // Step 1: reconstruct inner nodes.
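        //
        // The snapshot stores, for each inner node, its old in-memory pointer
        // together with the disk offset it was written to. The stale pointer
        // values only serve as stable keys here: we walk the tree top-down,
        // load each child from its recorded offset, and swizzle the stale
        // pointer back to a freshly allocated node.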
        let mut root_page_id = bf_meta.root_id;
        if root_page_id.is_inner_node_pointer() {
            let inner_mapping: Vec<(*const InnerNode, usize)> =
                read_vec_from_offset(bf_meta.inner_offset, bf_meta.inner_size, &vfs);
            let mut inner_map = HashMap::new();

            for m in inner_mapping {
                inner_map.insert(m.0, m.1);
            }
            let offset = inner_map.get(&root_page_id.as_inner_node()).unwrap();
            vfs.read(*offset, &mut page_buffer);
            let root_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
            root_page_id = PageID::from_pointer(root_page);

            let mut inner_resolve_queue = VecDeque::from([root_page]);
            while let Some(inner_ptr) = inner_resolve_queue.pop_front() {
                let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
                if inner.as_ref().meta.children_is_leaf() {
                    continue;
                }
                for (idx, c) in inner.as_ref().get_child_iter().enumerate() {
                    let offset = inner_map.get(&c.as_inner_node()).unwrap();
                    vfs.read(*offset, &mut page_buffer);
                    let inner_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
                    let inner_id = PageID::from_pointer(inner_page);
                    inner.as_mut().update_at_pos(idx, inner_id);
                    inner_resolve_queue.push_back(inner_page);
                }
            }
        }

        // Step 2: reconstruct leaf mappings.
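        //
        // Leaf pages stay on disk; we only rebuild the page table that maps
        // each leaf `PageID` to its base offset in the file.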
        let leaf_mapping: Vec<(PageID, usize)> =
            read_vec_from_offset(bf_meta.leaf_offset, bf_meta.leaf_size, &vfs);
        let leaf_mapping = leaf_mapping.into_iter().map(|(pid, offset)| {
            let loc = PageLocation::Base(offset);
            (pid, loc)
        });
        let pt = PageTable::new_from_mapping(leaf_mapping, vfs.clone(), config.clone());
        let circular_buffer = CircularBuffer::new(
            config.cb_size_byte,
            config.cb_copy_on_access_ratio,
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            buffer_ptr,
            config.cache_only,
        );

        let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, vfs);

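        // If the root is still a plain `PageID` (i.e. the whole tree is a
        // single leaf), tag the raw id so readers know the root is a leaf.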
        let raw_root_id = if root_page_id.is_id() {
            root_page_id.raw() | Self::ROOT_IS_LEAF_MASK
        } else {
            root_page_id.raw()
        };

        let size_classes = Self::create_mem_page_size_classes(
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            config.cache_only,
        );
        Ok(BfTree {
            storage,
            root_page_id: AtomicU64::new(raw_root_id),
            wal,
            write_load_full_page: true,
            cache_only: false,
            mini_page_size_classes: size_classes,
            config,
            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
        })
    }

    /// Stop the world and take a snapshot of the current state.
    ///
    /// Returns the snapshot file path.
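    ///
    /// The snapshot proceeds in three phases: drain the circular buffer so
    /// every record lands on disk, write out all inner nodes, then persist
    /// the leaf page table and a metadata page at offset 0.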
    pub fn snapshot(&self) -> PathBuf {
        let callback = |h| -> Result<TombstoneHandle, TombstoneHandle> {
            match eviction_callback(&h, self) {
                Ok(_) => Ok(h),
                Err(_e) => Err(h),
            }
        };
        self.storage.circular_buffer.drain(callback);

        let root_id = self.get_root_page();
        let mut inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
        let visitor = BfsVisitor::new_inner_only(self);
        for node in visitor {
            match node {
                NodeInfo::Inner { ptr, .. } => {
                    let inner = ReadGuard::try_read(ptr).unwrap();
                    if inner.as_ref().is_valid_disk_offset() {
                        let offset = inner.as_ref().disk_offset as usize;
                        self.storage.vfs.write(offset, inner.as_ref().as_slice());
                        inner_mapping.push((ptr, offset));
                    }
                }
                NodeInfo::Leaf { level, .. } => {
                    // Corner case: we can still see a leaf node here, when the root itself is a leaf.
                    //
                    // When the root is a leaf it runs in `FORCE` mode, meaning all data is already
                    // written to disk, so there is nothing to do here.
                    assert_eq!(level, 0);
                }
            }
        }
        let (inner_offset, inner_size) = serialize_vec_to_disk(&inner_mapping, &self.storage.vfs);

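        // Every leaf should have been evicted to its on-disk base location by
        // the drain above; any page still cached in the buffer would be a bug.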
        let mut leaf_mapping = Vec::new();
        let page_table_iter = self.storage.page_table.iter();
        for (entry, pid) in page_table_iter {
            assert!(pid.is_id());
            match entry.try_read().unwrap().as_ref() {
                PageLocation::Base(base) => leaf_mapping.push((pid, *base)),
                PageLocation::Full(_) | PageLocation::Mini(_) => {
                    unreachable!("Circular buffer should already be drained!")
                }
                PageLocation::Null => panic!("Snapshot of Null page"),
            }
        }

        let (leaf_offset, leaf_size) = serialize_vec_to_disk(&leaf_mapping, &self.storage.vfs);

        let file_size = (leaf_offset + align_to_sector_size(leaf_size)) as u64;

        let metadata = BfTreeMeta {
            magic_begin: *BF_TREE_MAGIC_BEGIN,
            root_id: root_id.0,
            inner_offset,
            inner_size,
            leaf_offset,
            leaf_size,
            file_size,
            magic_end: *BF_TREE_MAGIC_END,
        };

        self.storage
            .vfs
            .write(META_DATA_PAGE_OFFSET, metadata.as_slice());
        self.storage.vfs.flush();
        self.config.file_path.clone()
    }
}

/// We use repr(C) for simplicity; flatbuffers, bincode, or even repr(Rust) might be
/// better, but we don't care about space here, and I don't want to pull in giant
/// dependencies just for this.
#[repr(C, align(512))]
struct BfTreeMeta {
    magic_begin: [u8; 16],
    root_id: PageID,
    inner_offset: usize,
    inner_size: usize,
    leaf_offset: usize,
    leaf_size: usize,
    file_size: u64,
    magic_end: [u8; 14],
}
const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);

impl BfTreeMeta {
    fn as_slice(&self) -> &[u8] {
        let ptr = self as *const Self as *const u8;
        let size = std::mem::size_of::<Self>();
        unsafe { std::slice::from_raw_parts(ptr, size) }
    }

    fn check_magic(&self) {
        assert_eq!(self.magic_begin, *BF_TREE_MAGIC_BEGIN);
        assert_eq!(self.magic_end, *BF_TREE_MAGIC_END);
    }
}

/// Returns the starting offset and the (unaligned) byte size of the data written.
/// `T` must be plain old data: the bytes are copied out and later read back verbatim.
fn serialize_vec_to_disk<T>(v: &[T], vfs: &Arc<dyn VfsImpl>) -> (usize, usize) {
    if v.is_empty() {
        return (0, 0);
    }
    let unaligned_ptr = v.as_ptr() as *const u8;
    let unaligned_size = std::mem::size_of_val(v);

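    // Copy into a zeroed, sector-aligned scratch buffer so the write path
    // always sees sector-aligned memory and length.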
    let aligned_size = align_to_sector_size(unaligned_size);
    let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
    unsafe {
        let aligned_ptr = std::alloc::alloc_zeroed(layout);
        std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
        let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
        let offset = serialize_u8_slice_to_disk(slice, vfs);
        std::alloc::dealloc(aligned_ptr, layout);
        (offset, unaligned_size)
    }
}

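/// Inverse of `serialize_vec_to_disk`: read `size` bytes starting at `offset`
/// and reinterpret them as a `Vec<T>`.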
fn read_vec_from_offset<T: Clone>(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<T> {
    assert!(size > 0);
    let slice = read_u8_slice_from_disk(offset, size, vfs);
    let ptr = slice.as_ptr() as *const T;
    // The byte buffer's allocation must happen to satisfy T's alignment for the cast below.
    debug_assert_eq!(ptr as usize % std::mem::align_of::<T>(), 0);
    let len = size / std::mem::size_of::<T>();
    let slice = unsafe { std::slice::from_raw_parts(ptr, len) };
    slice.to_vec()
}

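/// Read `size` bytes starting at `offset`, one disk page at a time;
/// the last page is over-read and the excess tail truncated.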
fn read_u8_slice_from_disk(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<u8> {
    let mut res = Vec::new();
    let mut buffer = vec![0; DISK_PAGE_SIZE];
    for i in (0..size).step_by(DISK_PAGE_SIZE) {
        vfs.read(offset + i, &mut buffer); // Read one disk page at a time
        res.extend_from_slice(&buffer);
    }
    res.truncate(size); // Drop the over-read tail of the last page
    res
}

const SECTOR_SIZE: usize = 512;

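const _: () = assert!(SECTOR_SIZE.is_power_of_two()); // the mask trick below relies on this

/// Round `n` up to the next multiple of `SECTOR_SIZE`,
/// e.g. `align_to_sector_size(1) == 512` and `align_to_sector_size(513) == 1024`.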
fn align_to_sector_size(n: usize) -> usize {
    (n + SECTOR_SIZE - 1) & !(SECTOR_SIZE - 1)
}

/// Write a slice to disk, one disk page at a time, and return the start offset.
/// TODO: returning a single offset assumes the allocated offsets are contiguous,
///     which is not guaranteed; we should return a Vec of offsets.
///     But let's keep it simple for a fast prototype.
fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> usize {
    let mut start_offset = None;
    for chunk in slice.chunks(DISK_PAGE_SIZE) {
        let offset = vfs.alloc_offset(DISK_PAGE_SIZE); // Write one disk page at a time
        if start_offset.is_none() {
            start_offset = Some(offset);
        }
        vfs.write(offset, chunk);
    }
    start_offset.unwrap()
}

#[cfg(test)]
mod tests {
    use crate::{
        nodes::leaf_node::LeafReadResult, utils::test_util::install_value_to_buffer, BfTree, Config,
    };
    use rstest::rstest;
    use std::str::FromStr;

    #[rstest]
    #[case(64, 2408, 8192, 500, "target/test_simple_1.bftree")] // 1 leaf page = 2 disk pages
    #[case(64, 2048, 16384, 500, "target/test_simple_2.bftree")] // 1 leaf page = 4 disk pages
    #[case(3072, 3072, 16384, 500, "target/test_simple_3.bftree")] // 1 leaf page = 1 disk page, uniform record size
    fn persist_roundtrip_simple(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
        #[case] record_cnt: usize,
        #[case] snapshot_file_path: String,
    ) {
        let tmp_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();

        let mut config = Config::new(&tmp_file_path, leaf_page_size * 16); // Create a CB that can hold 16 full pages
        config.storage_backend(crate::StorageBackend::Std);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;

        let bftree = BfTree::with_config(config.clone(), None).unwrap();

        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0; key_len / 8];

        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            bftree.insert(key, key);
        }
        bftree.snapshot();
        drop(bftree);

        let bftree = BfTree::new_from_snapshot(config.clone(), None).unwrap();
        let mut out_buffer = vec![0; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let bytes_read = bftree.read(key, &mut out_buffer);

            match bytes_read {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer, key);
                }
                _ => {
                    panic!("Key not found");
                }
            }
        }

        std::fs::remove_file(tmp_file_path).unwrap();
    }
}
444}