use std::collections::{HashMap, VecDeque};
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::Arc;

#[cfg(unix)]
use std::os::unix::fs::FileExt;
#[cfg(windows)]
use std::os::windows::fs::FileExt;

#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
use thread_local::ThreadLocal;

use crate::{
    circular_buffer::{CircularBuffer, TombstoneHandle},
    error::ConfigError,
    fs::VfsImpl,
    nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
    storage::{make_vfs, LeafStorage, PageLocation, PageTable},
    sync::atomic::AtomicU64,
    tree::eviction_callback,
    utils::{inner_lock::ReadGuard, BfsVisitor, NodeInfo},
    wal::{LogEntry, LogEntryImpl, WriteAheadLog},
    BfTree, Config, WalReader,
};

const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
const META_DATA_PAGE_OFFSET: usize = 0;

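/// A heap buffer whose allocation is aligned to `SECTOR_SIZE`, suitable for
/// sector-granular file I/O. The `Vec` is kept in `ManuallyDrop` because it
/// was built over a custom-aligned allocation: `Drop` below frees the memory
/// with the matching `Layout` instead of the default `Vec` destructor.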
struct SectorAlignedVector {
    inner: ManuallyDrop<Vec<u8>>,
}

impl Drop for SectorAlignedVector {
    fn drop(&mut self) {
        let layout =
            std::alloc::Layout::from_size_align(self.inner.capacity(), SECTOR_SIZE).unwrap();
        let ptr = self.inner.as_mut_ptr();
        unsafe {
            std::alloc::dealloc(ptr, layout);
        }
    }
}

impl SectorAlignedVector {
    fn new_zeroed(capacity: usize) -> Self {
        let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
        let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
        // A null pointer here means allocation failure; passing it to
        // `Vec::from_raw_parts` would be undefined behavior, so fail loudly.
        assert!(!ptr.is_null(), "sector-aligned allocation failed");

        let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
        Self {
            inner: ManuallyDrop::new(inner),
        }
    }
}

impl Deref for SectorAlignedVector {
    type Target = Vec<u8>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for SectorAlignedVector {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

impl BfTree {
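    /// Restores a tree from its on-disk snapshot, then replays the
    /// write-ahead log on top of it so that writes issued after the snapshot
    /// are recovered. Returns the rebuilt tree; dropping the result would
    /// discard the replayed, in-memory-only writes.
    ///
    /// Illustrative call sequence (file names are placeholders):
    ///
    /// ```ignore
    /// let tree = BfTree::recovery("bf_tree.config", "bf_tree.wal", None);
    /// ```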
    pub fn recovery(
        config_file: impl AsRef<Path>,
        wal_file: impl AsRef<Path>,
        buffer_ptr: Option<*mut u8>,
    ) -> Self {
        let bf_tree_config = Config::new_with_config_file(config_file);
        let bf_tree = BfTree::new_from_snapshot(bf_tree_config, buffer_ptr).unwrap();
        let wal_reader = WalReader::new(wal_file, 4096);

        for seg in wal_reader.segment_iter() {
            for entry in seg.entry_iter() {
                let log_entry = LogEntry::read_from_buffer(entry.1);
                match log_entry {
                    LogEntry::Write(op) => {
                        bf_tree.insert(op.key, op.value);
                    }
                    LogEntry::Split(_op) => {
                        todo!("implement split op in wal!")
                    }
                }
            }
        }
        // Return the recovered tree; dropping it here would throw away the
        // replayed writes, which only live in memory at this point.
        bf_tree
    }

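    /// Opens an existing snapshot file and reconstructs the in-memory tree:
    /// the metadata page is read and verified first, inner nodes are then
    /// rebuilt breadth-first (turning saved disk offsets back into memory
    /// pointers), and the leaf page table is repopulated with base-page
    /// locations. Falls back to creating a fresh tree when no snapshot file
    /// exists.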
    pub fn new_from_snapshot(
        bf_tree_config: Config,
        buffer_ptr: Option<*mut u8>,
    ) -> Result<Self, ConfigError> {
        if !bf_tree_config.file_path.exists() {
            return BfTree::with_config(bf_tree_config, buffer_ptr);
        }

        bf_tree_config.validate()?;

        let reader = std::fs::File::open(&bf_tree_config.file_path).unwrap();
        let mut metadata = SectorAlignedVector::new_zeroed(4096);
        #[cfg(unix)]
        {
            reader.read_at(&mut metadata, 0).unwrap();
        }
        #[cfg(windows)]
        {
            reader.seek_read(&mut metadata, 0).unwrap();
        }

        let bf_meta = unsafe { (metadata.as_ptr() as *const BfTreeMeta).read() };
        bf_meta.check_magic();
        assert_eq!(reader.metadata().unwrap().len(), bf_meta.file_size);

        let config = Arc::new(bf_tree_config);

        let wal = config
            .write_ahead_log
            .as_ref()
            .map(|s| WriteAheadLog::new(s.clone()));

        let vfs = make_vfs(&config.storage_backend, &config.file_path);

        let mut page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);

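        // The snapshot keys each serialized inner node by its old in-memory
        // pointer, and child slots still hold those stale pointers. Walk the
        // tree breadth-first from the root, reload every inner child from its
        // saved offset, and patch the slot to point at the new allocation.
        // Leaf children are real page ids, not pointers, so they are skipped.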
        let mut root_page_id = bf_meta.root_id;
        if root_page_id.is_inner_node_pointer() {
            let inner_mapping: Vec<(*const InnerNode, usize)> =
                read_vec_from_offset(bf_meta.inner_offset, bf_meta.inner_size, &vfs);
            let inner_map: HashMap<_, _> = inner_mapping.into_iter().collect();

            let offset = inner_map.get(&root_page_id.as_inner_node()).unwrap();
            vfs.read(*offset, &mut page_buffer);
            let root_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
            root_page_id = PageID::from_pointer(root_page);

            let mut inner_resolve_queue = VecDeque::from([root_page]);
            while let Some(inner_ptr) = inner_resolve_queue.pop_front() {
                let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
                if inner.as_ref().meta.children_is_leaf() {
                    continue;
                }
                for (idx, c) in inner.as_ref().get_child_iter().enumerate() {
                    let offset = inner_map.get(&c.as_inner_node()).unwrap();
                    vfs.read(*offset, &mut page_buffer);
                    let inner_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
                    let inner_id = PageID::from_pointer(inner_page);
                    inner.as_mut().update_at_pos(idx, inner_id);
                    inner_resolve_queue.push_back(inner_page);
                }
            }
        }

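        // Leaves are not rebuilt eagerly: the page table only records the
        // on-disk base location of every leaf, and leaf contents are read
        // back on demand.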
        let leaf_mapping: Vec<(PageID, usize)> =
            read_vec_from_offset(bf_meta.leaf_offset, bf_meta.leaf_size, &vfs);
        let leaf_mapping = leaf_mapping
            .into_iter()
            .map(|(pid, offset)| (pid, PageLocation::Base(offset)));
        let pt = PageTable::new_from_mapping(leaf_mapping, vfs.clone(), config.clone());
        let circular_buffer = CircularBuffer::new(
            config.cb_size_byte,
            config.cb_copy_on_access_ratio,
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            buffer_ptr,
            config.cache_only,
        );

        let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, vfs);

        let raw_root_id = if root_page_id.is_id() {
            root_page_id.raw() | Self::ROOT_IS_LEAF_MASK
        } else {
            root_page_id.raw()
        };

        let size_classes = Self::create_mem_page_size_classes(
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            config.cache_only,
        );
        Ok(BfTree {
            storage,
            root_page_id: AtomicU64::new(raw_root_id),
            wal,
            write_load_full_page: true,
            cache_only: false,
            mini_page_size_classes: size_classes,
            config,
            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
        })
    }

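    /// Writes a consistent snapshot of the tree to the backing file and
    /// returns the file's path. The circular buffer is drained first so that
    /// every leaf has a durable base page; the inner nodes, the leaf page
    /// table, and finally the metadata page at offset 0 are then written out.
    ///
    /// Illustrative round trip (see `persist_roundtrip_simple` below):
    ///
    /// ```ignore
    /// let path = tree.snapshot();
    /// drop(tree);
    /// let reopened = BfTree::new_from_snapshot(config, None).unwrap();
    /// ```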
    pub fn snapshot(&self) -> PathBuf {
        let callback = |h| -> Result<TombstoneHandle, TombstoneHandle> {
            match eviction_callback(&h, self) {
                Ok(_) => Ok(h),
                Err(_e) => Err(h),
            }
        };
        self.storage.circular_buffer.drain(callback);

        let root_id = self.get_root_page();
        let mut inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
        let visitor = BfsVisitor::new_inner_only(self);
        for node in visitor {
            match node {
                NodeInfo::Inner { ptr, .. } => {
                    let inner = ReadGuard::try_read(ptr).unwrap();
                    if inner.as_ref().is_valid_disk_offset() {
                        let offset = inner.as_ref().disk_offset as usize;
                        self.storage.vfs.write(offset, inner.as_ref().as_slice());
                        inner_mapping.push((ptr, offset));
                    }
                }
                NodeInfo::Leaf { level, .. } => {
                    assert_eq!(level, 0);
                }
            }
        }
        let (inner_offset, inner_size) = serialize_vec_to_disk(&inner_mapping, &self.storage.vfs);

        let mut leaf_mapping = Vec::new();
        let page_table_iter = self.storage.page_table.iter();
        for (entry, pid) in page_table_iter {
            assert!(pid.is_id());
            match entry.try_read().unwrap().as_ref() {
                PageLocation::Base(base) => leaf_mapping.push((pid, *base)),
                PageLocation::Full(_) | PageLocation::Mini(_) => {
                    unreachable!("Circular buffer should already be drained!")
                }
                PageLocation::Null => panic!("Snapshot of Null page"),
            }
        }

        let (leaf_offset, leaf_size) = serialize_vec_to_disk(&leaf_mapping, &self.storage.vfs);

        let file_size = (leaf_offset + align_to_sector_size(leaf_size)) as u64;

        let metadata = BfTreeMeta {
            magic_begin: *BF_TREE_MAGIC_BEGIN,
            root_id: root_id.0,
            inner_offset,
            inner_size,
            leaf_offset,
            leaf_size,
            file_size,
            magic_end: *BF_TREE_MAGIC_END,
        };

        self.storage
            .vfs
            .write(META_DATA_PAGE_OFFSET, metadata.as_slice());
        self.storage.vfs.flush();
        self.config.file_path.clone()
    }
}

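/// On-disk metadata page, stored at `META_DATA_PAGE_OFFSET` (offset 0).
/// `repr(C, align(512))` fixes the field layout and keeps the struct
/// sector-aligned; the magic strings at both ends guard against reading a
/// torn or foreign page. The compile-time assertion below keeps the struct
/// within a single disk page.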
#[repr(C, align(512))]
struct BfTreeMeta {
    magic_begin: [u8; 16],
    root_id: PageID,
    inner_offset: usize,
    inner_size: usize,
    leaf_offset: usize,
    leaf_size: usize,
    file_size: u64,
    magic_end: [u8; 14],
}
const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);

impl BfTreeMeta {
    fn as_slice(&self) -> &[u8] {
        let ptr = self as *const Self as *const u8;
        let size = std::mem::size_of::<Self>();
        unsafe { std::slice::from_raw_parts(ptr, size) }
    }

    fn check_magic(&self) {
        assert_eq!(self.magic_begin, *BF_TREE_MAGIC_BEGIN);
        assert_eq!(self.magic_end, *BF_TREE_MAGIC_END);
    }
}

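/// Copies `v` into a zero-padded, sector-aligned scratch buffer and writes it
/// to newly allocated disk pages. Returns `(start_offset, byte_len)`, where
/// `byte_len` is the exact (unpadded) size of `v`. `T` must be plain old
/// data, since its bytes are written and later reread verbatim.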
fn serialize_vec_to_disk<T>(v: &[T], vfs: &Arc<dyn VfsImpl>) -> (usize, usize) {
    if v.is_empty() {
        return (0, 0);
    }
    let unaligned_ptr = v.as_ptr() as *const u8;
    let unaligned_size = std::mem::size_of_val(v);

    let aligned_size = align_to_sector_size(unaligned_size);
    let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
    unsafe {
        let aligned_ptr = std::alloc::alloc_zeroed(layout);
        assert!(!aligned_ptr.is_null(), "sector-aligned allocation failed");
        std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
        let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
        let offset = serialize_u8_slice_to_disk(slice, vfs);
        std::alloc::dealloc(aligned_ptr, layout);
        (offset, unaligned_size)
    }
}

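/// Inverse of `serialize_vec_to_disk`: reads `size` bytes at `offset` and
/// reinterprets them as a `Vec<T>`. `size` is the unpadded byte length
/// recorded in the metadata page and must be a multiple of `size_of::<T>()`.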
fn read_vec_from_offset<T>(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<T> {
    assert!(size > 0);
    assert_eq!(size % std::mem::size_of::<T>(), 0);
    let bytes = read_u8_slice_from_disk(offset, size, vfs);
    let len = size / std::mem::size_of::<T>();
    // Copy into a fresh `Vec<T>` instead of casting the `u8` pointer, so the
    // elements are correctly aligned for `T`.
    let mut out: Vec<T> = Vec::with_capacity(len);
    unsafe {
        std::ptr::copy_nonoverlapping(bytes.as_ptr(), out.as_mut_ptr() as *mut u8, size);
        out.set_len(len);
    }
    out
}

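/// Reads `size` bytes starting at `offset`, one `DISK_PAGE_SIZE` page at a
/// time. The serialized region is always allocated in whole pages, so
/// over-reading the final page stays within the region; the result is
/// trimmed to exactly `size` bytes.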
fn read_u8_slice_from_disk(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<u8> {
    let mut res = Vec::new();
    let mut buffer = vec![0; DISK_PAGE_SIZE];
    for i in (0..size).step_by(DISK_PAGE_SIZE) {
        vfs.read(offset + i, &mut buffer);
        res.extend_from_slice(&buffer);
    }
    // The final page may extend past `size`; trim to the requested length.
    res.truncate(size);
    res
}

const SECTOR_SIZE: usize = 512;

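/// Rounds `n` up to the next multiple of `SECTOR_SIZE`. The bit trick relies
/// on `SECTOR_SIZE` being a power of two, e.g. `align_to_sector_size(512) ==
/// 512` and `align_to_sector_size(513) == 1024`.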
fn align_to_sector_size(n: usize) -> usize {
    (n + SECTOR_SIZE - 1) & !(SECTOR_SIZE - 1)
}

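/// Writes `slice` to freshly allocated disk pages and returns the offset of
/// the first page. A full page is allocated even for a short trailing chunk,
/// and consecutive `alloc_offset` calls are assumed to return adjacent pages,
/// since `read_u8_slice_from_disk` walks the region linearly.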
fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> usize {
    let mut start_offset = None;
    for chunk in slice.chunks(DISK_PAGE_SIZE) {
        let offset = vfs.alloc_offset(DISK_PAGE_SIZE);
        if start_offset.is_none() {
            start_offset = Some(offset);
        }
        vfs.write(offset, chunk);
    }
    start_offset.unwrap()
}

#[cfg(test)]
mod tests {
    use crate::{
        nodes::leaf_node::LeafReadResult, utils::test_util::install_value_to_buffer, BfTree, Config,
    };
    use rstest::rstest;
    use std::str::FromStr;

    #[rstest]
    #[case(64, 2408, 8192, 500, "target/test_simple_1.bftree")]
    #[case(64, 2048, 16384, 500, "target/test_simple_2.bftree")]
    #[case(3072, 3072, 16384, 500, "target/test_simple_3.bftree")]
    fn persist_roundtrip_simple(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
        #[case] record_cnt: usize,
        #[case] snapshot_file_path: String,
    ) {
        let tmp_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();

        let mut config = Config::new(&tmp_file_path, leaf_page_size * 16);
        config.storage_backend(crate::StorageBackend::Std);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;

        let bftree = BfTree::with_config(config.clone(), None).unwrap();

        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0; key_len / 8];

        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            bftree.insert(key, key);
        }
        bftree.snapshot();
        drop(bftree);

        let bftree = BfTree::new_from_snapshot(config.clone(), None).unwrap();
        let mut out_buffer = vec![0; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let bytes_read = bftree.read(key, &mut out_buffer);

            match bytes_read {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer, key);
                }
                _ => {
                    panic!("Key not found");
                }
            }
        }

        std::fs::remove_file(tmp_file_path).unwrap();
    }
}