1use std::collections::{HashMap, VecDeque};
5use std::mem::ManuallyDrop;
6use std::ops::{Deref, DerefMut};
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::Arc;
10
11#[cfg(unix)]
12use std::os::unix::fs::FileExt;
13#[cfg(windows)]
14use std::os::windows::fs::FileExt;
15
16#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
17use thread_local::ThreadLocal;
18
19use crate::{
20 circular_buffer::{CircularBuffer, TombstoneHandle},
21 error::ConfigError,
22 fs::VfsImpl,
23 nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
24 range_scan::ScanReturnField,
25 storage::{make_vfs, LeafStorage, PageLocation, PageTable},
26 sync::atomic::AtomicU64,
27 tree::eviction_callback,
28 utils::{inner_lock::ReadGuard, BfsVisitor, NodeInfo},
29 wal::{LogEntry, LogEntryImpl, WriteAheadLog},
30 BfTree, Config, StorageBackend, WalReader,
31};
32
// Magic byte strings framing the on-disk metadata page. Both are verified on
// load (`BfTreeMeta::check_magic`) to detect corruption or a format mismatch.
const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
// The metadata page always lives at the very beginning of the snapshot file.
const META_DATA_PAGE_OFFSET: usize = 0;
36
/// Heap buffer whose allocation is aligned to `SECTOR_SIZE`, suitable for
/// sector-granular disk I/O. The inner `Vec` is wrapped in `ManuallyDrop`
/// because the storage comes from a custom-aligned allocation: the `Drop`
/// impl below frees it with the matching layout instead of letting `Vec`
/// deallocate with its default layout.
struct SectorAlignedVector {
    inner: ManuallyDrop<Vec<u8>>,
}
40
41impl Drop for SectorAlignedVector {
42 fn drop(&mut self) {
43 let layout =
44 std::alloc::Layout::from_size_align(self.inner.capacity(), SECTOR_SIZE).unwrap();
45 let ptr = self.inner.as_mut_ptr();
46 unsafe {
47 std::alloc::dealloc(ptr, layout);
48 }
49 }
50}
51
52impl SectorAlignedVector {
53 fn new_zeroed(capacity: usize) -> Self {
54 let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
55 let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
56
57 let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
58 Self {
59 inner: ManuallyDrop::new(inner),
60 }
61 }
62}
63
64impl Deref for SectorAlignedVector {
65 type Target = Vec<u8>;
66
67 fn deref(&self) -> &Self::Target {
68 &self.inner
69 }
70}
71
72impl DerefMut for SectorAlignedVector {
73 fn deref_mut(&mut self) -> &mut Self::Target {
74 &mut self.inner
75 }
76}
77
impl BfTree {
    /// Rebuild a tree from an on-disk snapshot, then replay the write-ahead
    /// log on top of it.
    ///
    /// NOTE(review): the recovered tree is dropped when this function
    /// returns — nothing is handed back to the caller. Confirm whether this
    /// should return the `BfTree` (or persist a new snapshot) instead.
    pub fn recovery(
        config_file: impl AsRef<Path>,
        wal_file: impl AsRef<Path>,
        buffer_ptr: Option<*mut u8>,
    ) {
        let bf_tree_config = Config::new_with_config_file(config_file);
        let bf_tree = BfTree::new_from_snapshot(bf_tree_config, buffer_ptr).unwrap();
        let wal_reader = WalReader::new(wal_file, 4096);

        // Replay WAL entries segment by segment, in log order.
        for seg in wal_reader.segment_iter() {
            for entry in seg.entry_iter() {
                let log_entry = LogEntry::read_from_buffer(entry.1);
                match log_entry {
                    LogEntry::Write(op) => {
                        bf_tree.insert(op.key, op.value);
                    }
                    LogEntry::Split(_op) => {
                        // Structural (split) log records are not yet replayed.
                        todo!("implement split op in wal!")
                    }
                }
            }
        }
    }

    /// Open a tree from the snapshot file named in `bf_tree_config`.
    ///
    /// If the file does not exist, a fresh empty tree is created instead.
    /// Otherwise the metadata page is read, the inner-node graph is
    /// materialized back into memory, and the leaf page table is rebuilt
    /// from the persisted (page-id, offset) mapping.
    pub fn new_from_snapshot(
        bf_tree_config: Config,
        buffer_ptr: Option<*mut u8>,
    ) -> Result<Self, ConfigError> {
        // No snapshot on disk -> fall back to a brand-new tree.
        if !bf_tree_config.file_path.exists() {
            return BfTree::with_config(bf_tree_config.clone(), buffer_ptr);
        }

        bf_tree_config.validate()?;

        let reader = std::fs::File::open(bf_tree_config.file_path.clone()).unwrap();
        // Sector-aligned 4 KiB buffer for the metadata page at offset 0.
        let mut metadata = SectorAlignedVector::new_zeroed(4096);
        #[cfg(unix)]
        {
            // NOTE(review): `read_at` may return a short read; only the error
            // case is checked here. Presumably the file is always at least one
            // page long (`file_size` is asserted below) — confirm.
            reader.read_at(&mut metadata, 0).unwrap();
        }
        #[cfg(windows)]
        {
            reader.seek_read(&mut metadata, 0).unwrap();
        }

        // Reinterpret the raw page as the metadata struct (repr(C) layout is
        // the on-disk format), then validate magic framing and file size.
        let bf_meta = unsafe { (metadata.as_ptr() as *const BfTreeMeta).read() };
        bf_meta.check_magic();
        assert_eq!(reader.metadata().unwrap().len(), bf_meta.file_size);

        let config = Arc::new(bf_tree_config);

        let wal = config
            .write_ahead_log
            .as_ref()
            .map(|s| WriteAheadLog::new(s.clone()));

        let vfs = make_vfs(&config.storage_backend, &config.file_path);

        let mut page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);

        let mut root_page_id = bf_meta.root_id;
        // If the root is an inner node, the snapshot stored a mapping from
        // *old* in-memory node pointers to disk offsets. Rebuild the inner
        // tree breadth-first, patching each child pointer from the old
        // (stale) pointer to the freshly built node.
        if root_page_id.is_inner_node_pointer() {
            let inner_mapping: Vec<(*const InnerNode, usize)> =
                read_vec_from_offset(bf_meta.inner_offset, bf_meta.inner_size, &vfs);
            let mut inner_map = HashMap::new();

            for m in inner_mapping {
                inner_map.insert(m.0, m.1);
            }
            // Load the root inner node first and re-root the tree on it.
            let offset = inner_map.get(&root_page_id.as_inner_node()).unwrap();
            vfs.read(*offset, &mut page_buffer);
            let root_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
            root_page_id = PageID::from_pointer(root_page);

            // BFS over the inner levels; leaves are resolved via the page
            // table, so recursion stops one level above them.
            let mut inner_resolve_queue = VecDeque::from([root_page]);
            while !inner_resolve_queue.is_empty() {
                let inner_ptr = inner_resolve_queue.pop_front().unwrap();
                let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
                if inner.as_ref().meta.children_is_leaf() {
                    continue;
                }
                for (idx, c) in inner.as_ref().get_child_iter().enumerate() {
                    // NOTE(review): unwrap here requires every child to have
                    // been persisted into the inner mapping by `snapshot()`.
                    let offset = inner_map.get(&c.as_inner_node()).unwrap();
                    vfs.read(*offset, &mut page_buffer);
                    let inner_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
                    let inner_id = PageID::from_pointer(inner_page);
                    // Swap the stale child pointer for the rebuilt node.
                    inner.as_mut().update_at_pos(idx, inner_id);
                    inner_resolve_queue.push_back(inner_page);
                }
            }
        }

        // Leaves stay on disk: rebuild the page table pointing every leaf at
        // its persisted base-page offset.
        let leaf_mapping: Vec<(PageID, usize)> =
            read_vec_from_offset(bf_meta.leaf_offset, bf_meta.leaf_size, &vfs);
        let leaf_mapping = leaf_mapping.into_iter().map(|(pid, offset)| {
            let loc = PageLocation::Base(offset);
            (pid, loc)
        });
        let pt = PageTable::new_from_mapping(leaf_mapping, vfs.clone(), config.clone());
        let circular_buffer = CircularBuffer::new(
            config.cb_size_byte,
            config.cb_copy_on_access_ratio,
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            buffer_ptr,
            config.cache_only,
        );

        let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, vfs);

        // A plain page-id root means the root itself is a leaf; tag it so.
        let raw_root_id = if root_page_id.is_id() {
            root_page_id.raw() | Self::ROOT_IS_LEAF_MASK
        } else {
            root_page_id.raw()
        };

        let size_classes = Self::create_mem_page_size_classes(
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            config.cache_only,
        );
        Ok(BfTree {
            storage,
            root_page_id: AtomicU64::new(raw_root_id),
            wal,
            write_load_full_page: true,
            // NOTE(review): hardcoded `false` even when `config.cache_only`
            // is set (the circular buffer above *does* honor the config) —
            // confirm this asymmetry is intentional.
            cache_only: false,
            mini_page_size_classes: size_classes,
            config,
            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
        })
    }

    /// Persist the tree to its backing file and return that file's path.
    ///
    /// Order matters: (1) drain the circular buffer so every leaf lives at a
    /// stable base offset, (2) write inner nodes at their recorded disk
    /// offsets, (3) serialize the inner and leaf mappings, (4) write the
    /// metadata page last and flush.
    pub fn snapshot(&self) -> PathBuf {
        // Evict every cached page; the eviction callback flushes it to disk.
        let callback = |h| -> Result<TombstoneHandle, TombstoneHandle> {
            match eviction_callback(&h, self) {
                Ok(_) => Ok(h),
                Err(_e) => Err(h),
            }
        };
        self.storage.circular_buffer.drain(callback);

        let root_id = self.get_root_page();
        // Map from in-memory inner-node pointer -> disk offset; consumed by
        // `new_from_snapshot` to rebuild the inner tree.
        let mut inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
        let visitor = BfsVisitor::new_inner_only(self);
        for node in visitor {
            match node {
                NodeInfo::Inner { ptr, .. } => {
                    let inner = ReadGuard::try_read(ptr).unwrap();
                    // NOTE(review): inner nodes without a valid disk offset
                    // are silently skipped here, while recovery unwraps on
                    // every child lookup — confirm offsets are always
                    // assigned before snapshotting.
                    if inner.as_ref().is_valid_disk_offset() {
                        let offset = inner.as_ref().disk_offset as usize;
                        self.storage.vfs.write(offset, inner.as_ref().as_slice());
                        inner_mapping.push((ptr, offset));
                    }
                }
                NodeInfo::Leaf { level, .. } => {
                    // Leaves only ever appear at level 0 in this traversal.
                    assert_eq!(level, 0);
                }
            }
        }
        let (inner_offset, inner_size) = serialize_vec_to_disk(&inner_mapping, &self.storage.vfs);

        // After the drain above, every live page must be a base page.
        let mut leaf_mapping = Vec::new();
        let page_table_iter = self.storage.page_table.iter();
        for (entry, pid) in page_table_iter {
            assert!(pid.is_id());
            match entry.try_read().unwrap().as_ref() {
                PageLocation::Base(base) => leaf_mapping.push((pid, *base)),
                PageLocation::Full(_) | PageLocation::Mini(_) => {
                    unreachable!("Circular buffer should already be drained!")
                }
                PageLocation::Null => panic!("Snapshot of Null page"),
            }
        }

        let (leaf_offset, leaf_size) = serialize_vec_to_disk(&leaf_mapping, &self.storage.vfs);

        // The leaf mapping is the last payload written, so the file ends at
        // its sector-aligned tail; recorded for the load-time sanity check.
        let file_size = (leaf_offset + align_to_sector_size(leaf_size)) as u64;

        let metadata = BfTreeMeta {
            magic_begin: *BF_TREE_MAGIC_BEGIN,
            root_id: root_id.0,
            inner_offset,
            inner_size,
            leaf_offset,
            leaf_size,
            file_size,
            magic_end: *BF_TREE_MAGIC_END,
        };

        // Metadata goes in last: a snapshot without a valid metadata page is
        // treated as absent/corrupt by `new_from_snapshot`.
        self.storage
            .vfs
            .write(META_DATA_PAGE_OFFSET, metadata.as_slice());
        self.storage.vfs.flush();
        self.config.file_path.clone()
    }

    /// Copy an in-memory tree into a new disk-backed snapshot file at
    /// `snapshot_path` and return the path of the written snapshot.
    ///
    /// # Panics
    /// Panics if the target file already exists or if `self` is a
    /// cache-only tree (its records cannot be scanned out).
    pub fn snapshot_memory_to_disk(&self, snapshot_path: impl AsRef<Path>) -> PathBuf {
        let snapshot_path = snapshot_path.as_ref();
        assert!(
            !snapshot_path.exists(),
            "snapshot_memory_to_disk: target file already exists: {:?}",
            snapshot_path
        );

        // Clone the source config but retarget it at a real file with the
        // standard (disk) backend.
        let mut disk_config = self.config.as_ref().clone();
        disk_config.storage_backend(StorageBackend::Std);
        disk_config.cache_only(false);
        disk_config.file_path(snapshot_path);

        let disk_tree = BfTree::with_config(disk_config, None)
            .expect("Failed to create disk-backed BfTree for snapshot");

        if self.cache_only {
            panic!("snapshot_memory_to_disk does not support cache_only trees");
        } else {
            // Stream every record from the memory tree into the disk tree.
            Self::copy_records_via_scan(self, &disk_tree);
        }

        // Persist the freshly populated disk tree.
        disk_tree.snapshot()
    }

    /// Copy every record from `src` into `dst` via a full-range scan.
    ///
    /// NOTE(review): the scan buffer is one leaf page; this assumes a single
    /// key+value pair never exceeds `leaf_page_size` — confirm against the
    /// scan contract.
    fn copy_records_via_scan(src: &BfTree, dst: &BfTree) {
        let buf_size = src.config.leaf_page_size;
        let mut scan_buf = vec![0u8; buf_size];

        // Smallest possible key: scan the entire key space.
        let start_key: &[u8] = &[0u8];
        // A failed scan setup (e.g. empty tree) copies nothing.
        let mut scan_iter =
            match src.scan_with_count(start_key, usize::MAX, ScanReturnField::KeyAndValue) {
                Ok(iter) => iter,
                Err(_) => return,
            };

        // Each step writes key then value contiguously into `scan_buf`.
        while let Some((key_len, value_len)) = scan_iter.next(&mut scan_buf) {
            let key = &scan_buf[..key_len];
            let value = &scan_buf[key_len..key_len + value_len];
            dst.insert(key, value);
        }
    }

    /// Load a disk snapshot and materialize it as a purely in-memory tree.
    ///
    /// `memory_config` is used both (with a Std backend) to read the
    /// snapshot and (with a Memory backend) for the returned tree.
    ///
    /// # Panics
    /// Panics if `snapshot_path` does not exist.
    pub fn new_from_snapshot_disk_to_memory(
        snapshot_path: impl AsRef<Path>,
        memory_config: Config,
    ) -> Result<Self, ConfigError> {
        let snapshot_path = snapshot_path.as_ref();
        assert!(
            snapshot_path.exists(),
            "new_from_snapshot_disk_to_memory: snapshot file does not exist: {:?}",
            snapshot_path
        );

        // Open the snapshot with a disk backend so its pages can be read.
        let mut disk_config = memory_config.clone();
        disk_config.storage_backend(StorageBackend::Std);
        disk_config.cache_only(false);
        disk_config.file_path(snapshot_path);

        let disk_tree = BfTree::new_from_snapshot(disk_config, None)?;

        // Build the destination tree fully in memory.
        let mut mem_config = memory_config;
        mem_config.storage_backend(StorageBackend::Memory);
        mem_config.cache_only(false);

        let mem_tree = BfTree::with_config(mem_config, None)?;

        Self::copy_records_via_scan(&disk_tree, &mem_tree);

        Ok(mem_tree)
    }
}
408
/// On-disk metadata page written at `META_DATA_PAGE_OFFSET`.
///
/// `repr(C, align(512))` pins the field layout and sector-aligns the struct;
/// it is (de)serialized by raw pointer cast (`as_slice` / the `read` in
/// `new_from_snapshot`), so the field order below IS the on-disk format —
/// do not reorder fields without bumping the magic version strings.
#[repr(C, align(512))]
struct BfTreeMeta {
    magic_begin: [u8; 16], // BF_TREE_MAGIC_BEGIN, checked on load
    root_id: PageID,       // root page at snapshot time
    inner_offset: usize,   // file offset of the serialized inner-node mapping
    inner_size: usize,     // unpadded byte size of the inner-node mapping
    leaf_offset: usize,    // file offset of the serialized leaf mapping
    leaf_size: usize,      // unpadded byte size of the leaf mapping
    file_size: u64,        // expected total file size (sanity-checked on load)
    magic_end: [u8; 14],   // BF_TREE_MAGIC_END, checked on load
}
// The metadata struct is written as a single page, so it must fit in one.
const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);
424
impl BfTreeMeta {
    /// View the whole struct as raw bytes for writing to disk.
    ///
    /// NOTE(review): the slice spans the entire `repr(C, align(512))` struct
    /// including any padding bytes, which are not guaranteed initialized —
    /// likely benign for a scratch snapshot page, but worth confirming.
    fn as_slice(&self) -> &[u8] {
        let ptr = self as *const Self as *const u8;
        let size = std::mem::size_of::<Self>();
        // SAFETY: `ptr` points to `size` bytes of this live struct, valid
        // for the lifetime of `&self`.
        unsafe { std::slice::from_raw_parts(ptr, size) }
    }

    /// Panic unless both magic framing strings match, i.e. the page read
    /// from disk is a valid V0 snapshot header.
    fn check_magic(&self) {
        assert_eq!(self.magic_begin, *BF_TREE_MAGIC_BEGIN);
        assert_eq!(self.magic_end, *BF_TREE_MAGIC_END);
    }
}
437
438fn serialize_vec_to_disk<T>(v: &[T], vfs: &Arc<dyn VfsImpl>) -> (usize, usize) {
440 if v.is_empty() {
441 return (0, 0);
442 }
443 let unaligned_ptr = v.as_ptr() as *const u8;
444 let unaligned_size = std::mem::size_of_val(v);
445
446 let aligned_size = align_to_sector_size(unaligned_size);
447 let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
448 unsafe {
449 let aligned_ptr = std::alloc::alloc_zeroed(layout);
450 std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
451 let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
452 let offset = serialize_u8_slice_to_disk(slice, vfs);
453 std::alloc::dealloc(aligned_ptr, layout);
454 (offset, unaligned_size)
455 }
456}
457
458fn read_vec_from_offset<T: Clone>(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<T> {
459 assert!(size > 0);
460 let slice = read_u8_slice_from_disk(offset, size, vfs);
461 let ptr = slice.as_ptr() as *const T;
462 let size = size / std::mem::size_of::<T>();
463 let slice = unsafe { std::slice::from_raw_parts(ptr, size) };
464 slice.to_vec()
465}
466
467fn read_u8_slice_from_disk(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<u8> {
468 let mut res = Vec::new();
469 let mut buffer = vec![0; DISK_PAGE_SIZE];
470 for i in (0..size).step_by(DISK_PAGE_SIZE) {
471 vfs.read(offset + i, &mut buffer); res.extend_from_slice(&buffer);
473 }
474 res
475}
476
/// Alignment granularity (bytes) for direct-I/O buffers and disk payloads.
const SECTOR_SIZE: usize = 512;

/// Round `n` up to the next multiple of `SECTOR_SIZE` (a power of two, so
/// the round-up can be done with a mask).
fn align_to_sector_size(n: usize) -> usize {
    let mask = SECTOR_SIZE - 1;
    (n + mask) & !mask
}
482
483fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> usize {
487 let mut start_offset = None;
488 for chunk in slice.chunks(DISK_PAGE_SIZE) {
489 let offset = vfs.alloc_offset(DISK_PAGE_SIZE); if start_offset.is_none() {
491 start_offset = Some(offset);
492 }
493 vfs.write(offset, chunk);
494 }
495 start_offset.unwrap()
496}
497
#[cfg(test)]
mod tests {
    use crate::{
        nodes::leaf_node::LeafReadResult, utils::test_util::install_value_to_buffer, BfTree, Config,
    };
    use rstest::rstest;
    use std::str::FromStr;

    // Round trip: insert records, `snapshot()`, reopen via `new_from_snapshot`,
    // and verify every record reads back intact.
    #[rstest]
    // NOTE(review): `2408` below looks like a typo for `2048` — confirm.
    #[case(64, 2408, 8192, 500, "target/test_simple_1.bftree")]
    #[case(64, 2048, 16384, 500, "target/test_simple_2.bftree")]
    #[case(3072, 3072, 16384, 500, "target/test_simple_3.bftree")]
    fn persist_roundtrip_simple(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
        #[case] record_cnt: usize,
        #[case] snapshot_file_path: String,
    ) {
        let tmp_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();

        // Disk-backed tree with a buffer 16 leaf pages wide.
        let mut config = Config::new(&tmp_file_path, leaf_page_size * 16);
        config.storage_backend(crate::StorageBackend::Std);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;

        let bftree = BfTree::with_config(config.clone(), None).unwrap();

        // Keys are half a minimum record; the value is the key itself.
        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0; key_len / 8];

        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            bftree.insert(key, key);
        }
        // Persist, drop, and reload from the snapshot file.
        bftree.snapshot();
        drop(bftree);

        let bftree = BfTree::new_from_snapshot(config.clone(), None).unwrap();
        let mut out_buffer = vec![0; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let bytes_read = bftree.read(key, &mut out_buffer);

            match bytes_read {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer, key);
                }
                _ => {
                    panic!("Key not found");
                }
            }
        }

        std::fs::remove_file(tmp_file_path).unwrap();
    }

    // In-memory tree -> `snapshot_memory_to_disk` -> reload from disk and
    // verify every record.
    #[test]
    fn snapshot_memory_to_disk_roundtrip() {
        let snapshot_path = std::path::PathBuf::from_str("target/test_mem_to_disk.bftree").unwrap();
        // Best-effort cleanup of a leftover file from a previous run.
        let _ = std::fs::remove_file(&snapshot_path);

        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;

        // Source tree lives purely in memory.
        let mut config = Config::new(":memory:", leaf_page_size * 16);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;

        let bftree = BfTree::with_config(config.clone(), None).unwrap();

        let key_len: usize = min_record_size / 2;
        let record_cnt = 200;
        let mut key_buffer = vec![0usize; key_len / 8];

        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            bftree.insert(key, key);
        }

        // Copy to disk; the returned path must be the requested target.
        let path = bftree.snapshot_memory_to_disk(&snapshot_path);
        assert_eq!(path, snapshot_path);
        assert!(snapshot_path.exists());
        drop(bftree);

        // Reopen the on-disk snapshot with a matching disk config.
        let mut disk_config = Config::new(&snapshot_path, leaf_page_size * 16);
        disk_config.storage_backend(crate::StorageBackend::Std);
        disk_config.cb_min_record_size = min_record_size;
        disk_config.cb_max_record_size = max_record_size;
        disk_config.leaf_page_size = leaf_page_size;
        disk_config.max_fence_len = max_record_size;

        let loaded = BfTree::new_from_snapshot(disk_config, None).unwrap();
        let mut out_buffer = vec![0u8; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let result = loaded.read(key, &mut out_buffer);
            match result {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer[..key_len], key);
                }
                other => {
                    panic!("Key {r} not found, got: {:?}", other);
                }
            }
        }

        std::fs::remove_file(snapshot_path).unwrap();
    }

    // Disk snapshot -> `new_from_snapshot_disk_to_memory` -> verify every
    // record from the purely in-memory tree.
    #[test]
    fn snapshot_disk_to_memory_roundtrip() {
        let snapshot_path = std::path::PathBuf::from_str("target/test_disk_to_mem.bftree").unwrap();
        let _ = std::fs::remove_file(&snapshot_path);

        let min_record_size: usize = 64;
        let max_record_size: usize = 2048;
        let leaf_page_size: usize = 8192;
        let record_cnt: usize = 500;

        // Scope the disk tree so it is dropped before the snapshot is read.
        {
            let mut config = Config::new(&snapshot_path, leaf_page_size * 16);
            config.storage_backend(crate::StorageBackend::Std);
            config.cb_min_record_size = min_record_size;
            config.cb_max_record_size = max_record_size;
            config.leaf_page_size = leaf_page_size;
            config.max_fence_len = max_record_size;

            let tree = BfTree::with_config(config, None).unwrap();
            let key_len = min_record_size / 2;
            let mut key_buffer = vec![0usize; key_len / 8];

            for r in 0..record_cnt {
                let key = install_value_to_buffer(&mut key_buffer, r);
                tree.insert(key, key);
            }
            tree.snapshot();
        }

        // Memory config for the destination tree.
        let mut mem_config = Config::new(":memory:", leaf_page_size * 16);
        mem_config.cb_min_record_size = min_record_size;
        mem_config.cb_max_record_size = max_record_size;
        mem_config.leaf_page_size = leaf_page_size;
        mem_config.max_fence_len = max_record_size;

        let mem_tree =
            BfTree::new_from_snapshot_disk_to_memory(&snapshot_path, mem_config).unwrap();

        let key_len = min_record_size / 2;
        let mut key_buffer = vec![0usize; key_len / 8];
        let mut out_buffer = vec![0u8; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let result = mem_tree.read(key, &mut out_buffer);
            match result {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer[..key_len], key);
                }
                other => {
                    panic!("Key {r} not found, got: {:?}", other);
                }
            }
        }

        std::fs::remove_file(snapshot_path).unwrap();
    }
}