1use std::collections::{HashMap, VecDeque};
5use std::mem::ManuallyDrop;
6use std::ops::{Deref, DerefMut};
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::Arc;
10
11#[cfg(unix)]
12use std::os::unix::fs::FileExt;
13#[cfg(windows)]
14use std::os::windows::fs::FileExt;
15
16#[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
17use thread_local::ThreadLocal;
18
19use crate::{
20 circular_buffer::{CircularBuffer, TombstoneHandle},
21 fs::VfsImpl,
22 nodes::{InnerNode, InnerNodeBuilder, PageID, DISK_PAGE_SIZE, INNER_NODE_SIZE},
23 storage::{make_vfs, LeafStorage, PageLocation, PageTable},
24 sync::atomic::AtomicU64,
25 tree::eviction_callback,
26 utils::{inner_lock::ReadGuard, BfsVisitor, NodeInfo},
27 wal::{LogEntry, LogEntryImpl, WriteAheadLog},
28 BfTree, Config, WalReader,
29};
30
/// Sentinel bytes bracketing the snapshot metadata header (`BfTreeMeta`);
/// `check_magic` verifies both on load to reject foreign/corrupt files.
const BF_TREE_MAGIC_BEGIN: &[u8; 16] = b"BF-TREE-V0-BEGIN";
const BF_TREE_MAGIC_END: &[u8; 14] = b"BF-TREE-V0-END";
/// The metadata header always lives in the first page of the snapshot file.
const META_DATA_PAGE_OFFSET: usize = 0;
34
/// A heap byte buffer allocated with `SECTOR_SIZE` alignment, suitable for
/// sector-granular file I/O.
///
/// The inner `Vec` is wrapped in `ManuallyDrop` because the backing memory was
/// allocated with a `SECTOR_SIZE`-aligned layout: it must be freed with that
/// same layout (see the `Drop` impl below), not through `Vec`'s own drop,
/// which would deallocate with `u8`'s 1-byte alignment.
struct SectorAlignedVector {
    inner: ManuallyDrop<Vec<u8>>,
}
38
impl Drop for SectorAlignedVector {
    /// Free the buffer with the same sector-aligned layout it was allocated
    /// with in `new_zeroed`.
    fn drop(&mut self) {
        let layout =
            std::alloc::Layout::from_size_align(self.inner.capacity(), SECTOR_SIZE).unwrap();
        let ptr = self.inner.as_mut_ptr();
        // SAFETY: `ptr` was returned by `alloc_zeroed` with exactly this
        // layout (size == capacity, SECTOR_SIZE alignment), and the Vec never
        // drops it itself thanks to ManuallyDrop.
        // NOTE(review): this relies on no caller ever growing the Vec through
        // `DerefMut` (a reallocation would change ptr/capacity and make this
        // dealloc unsound) — verify all users treat it as a fixed-size buffer.
        unsafe {
            std::alloc::dealloc(ptr, layout);
        }
    }
}
49
50impl SectorAlignedVector {
51 fn new_zeroed(capacity: usize) -> Self {
52 let layout = std::alloc::Layout::from_size_align(capacity, SECTOR_SIZE).unwrap();
53 let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
54
55 let inner = unsafe { Vec::from_raw_parts(ptr, capacity, capacity) };
56 Self {
57 inner: ManuallyDrop::new(inner),
58 }
59 }
60}
61
62impl Deref for SectorAlignedVector {
63 type Target = Vec<u8>;
64
65 fn deref(&self) -> &Self::Target {
66 &self.inner
67 }
68}
69
70impl DerefMut for SectorAlignedVector {
71 fn deref_mut(&mut self) -> &mut Self::Target {
72 &mut self.inner
73 }
74}
75
impl BfTree {
    /// Rebuild a tree from its on-disk snapshot (located via `config_file`)
    /// and then replay the write-ahead log in `wal_file` on top of it.
    ///
    /// `buffer_ptr` is forwarded to the circular buffer constructor as an
    /// optional pre-allocated memory region.
    ///
    /// NOTE(review): the recovered tree is dropped at the end of this
    /// function — it is never returned to the caller. Confirm whether this is
    /// intentional (e.g. recovery-for-side-effects) or whether it should
    /// return the `BfTree`.
    pub fn recovery(
        config_file: impl AsRef<Path>,
        wal_file: impl AsRef<Path>,
        buffer_ptr: Option<*mut u8>,
    ) {
        let bf_tree_config = Config::new_with_config_file(config_file);
        let bf_tree = BfTree::new_from_snapshot(bf_tree_config, buffer_ptr);
        let wal_reader = WalReader::new(wal_file, 4096);

        // Re-apply every logged operation, segment by segment, in log order.
        for seg in wal_reader.segment_iter() {
            for entry in seg.entry_iter() {
                let log_entry = LogEntry::read_from_buffer(entry.1);
                match log_entry {
                    LogEntry::Write(op) => {
                        bf_tree.insert(op.key, op.value);
                    }
                    LogEntry::Split(_op) => {
                        // Split entries are not handled yet; replay aborts here.
                        todo!("implement split op in wal!")
                    }
                }
            }
        }
    }

    /// Construct a `BfTree` from the snapshot file referenced by
    /// `bf_tree_config.file_path`, or a fresh tree if no snapshot exists.
    ///
    /// Load sequence: read and validate the metadata page, rebuild the inner
    /// node hierarchy (translating snapshotted node pointers back into live
    /// in-memory pointers), then rebuild the leaf page table from the
    /// persisted (PageID, offset) mapping.
    pub fn new_from_snapshot(bf_tree_config: Config, buffer_ptr: Option<*mut u8>) -> Self {
        // No snapshot on disk: fall back to a brand-new tree.
        if !bf_tree_config.file_path.exists() {
            return BfTree::with_config(bf_tree_config.clone(), buffer_ptr);
        }

        let reader = std::fs::File::open(bf_tree_config.file_path.clone()).unwrap();
        // Sector-aligned scratch page for the metadata header.
        // NOTE(review): read_at/seek_read may return a short read; the byte
        // count is not checked here — confirm the first 4096 bytes are always
        // available (file_size is validated just below, which mitigates this).
        let mut metadata = SectorAlignedVector::new_zeroed(4096);
        #[cfg(unix)]
        {
            reader.read_at(&mut metadata, 0).unwrap();
        }
        #[cfg(windows)]
        {
            reader.seek_read(&mut metadata, 0).unwrap();
        }

        // SAFETY-relevant: reinterpret the first page as BfTreeMeta
        // (repr(C, align(512)); the buffer is 512-aligned by construction).
        let bf_meta = unsafe { (metadata.as_ptr() as *const BfTreeMeta).read() };
        bf_meta.check_magic();
        // A size mismatch means a truncated or partially written snapshot.
        assert_eq!(reader.metadata().unwrap().len(), bf_meta.file_size);

        let config = Arc::new(bf_tree_config);

        let wal = config
            .write_ahead_log
            .as_ref()
            .map(|s| WriteAheadLog::new(s.clone()));

        let vfs = make_vfs(&config.storage_backend, &config.file_path);

        // Reusable sector-aligned buffer for reading one inner node at a time.
        let mut page_buffer = SectorAlignedVector::new_zeroed(INNER_NODE_SIZE);

        let mut root_page_id = bf_meta.root_id;
        // A pointer-typed root means the tree had inner nodes at snapshot
        // time; rebuild them. (Otherwise the root is a leaf page id and the
        // page table alone is enough.)
        if root_page_id.is_inner_node_pointer() {
            // The snapshot maps each *old* in-memory node pointer (used only
            // as an opaque key) to its on-disk offset.
            let inner_mapping: Vec<(*const InnerNode, usize)> =
                read_vec_from_offset(bf_meta.inner_offset, bf_meta.inner_size, &vfs);
            let mut inner_map = HashMap::new();

            for m in inner_mapping {
                inner_map.insert(m.0, m.1);
            }
            // Materialize the root node, then re-point the root id at the new
            // in-memory allocation.
            let offset = inner_map.get(&root_page_id.as_inner_node()).unwrap();
            vfs.read(*offset, &mut page_buffer);
            let root_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
            root_page_id = PageID::from_pointer(root_page);

            // BFS over the rebuilt nodes: each child slot still holds the old
            // snapshot pointer, so load that child, rebuild it, and patch the
            // slot with the new pointer before descending further.
            let mut inner_resolve_queue = VecDeque::from([root_page]);
            while !inner_resolve_queue.is_empty() {
                let inner_ptr = inner_resolve_queue.pop_front().unwrap();
                let mut inner = ReadGuard::try_read(inner_ptr).unwrap().upgrade().unwrap();
                // Nodes whose children are leaves keep their PageIDs as-is.
                if inner.as_ref().meta.children_is_leaf() {
                    continue;
                }
                for (idx, c) in inner.as_ref().get_child_iter().enumerate() {
                    let offset = inner_map.get(&c.as_inner_node()).unwrap();
                    vfs.read(*offset, &mut page_buffer);
                    let inner_page = InnerNodeBuilder::new().build_from_slice(&page_buffer);
                    let inner_id = PageID::from_pointer(inner_page);
                    inner.as_mut().update_at_pos(idx, inner_id);
                    inner_resolve_queue.push_back(inner_page);
                }
            }
        }

        // Leaves stay on disk: each persisted (PageID, offset) pair becomes a
        // PageLocation::Base entry in the page table.
        let leaf_mapping: Vec<(PageID, usize)> =
            read_vec_from_offset(bf_meta.leaf_offset, bf_meta.leaf_size, &vfs);
        let leaf_mapping = leaf_mapping.into_iter().map(|(pid, offset)| {
            let loc = PageLocation::Base(offset);
            (pid, loc)
        });
        let pt = PageTable::new_from_mapping(leaf_mapping, vfs.clone(), config.clone());
        let circular_buffer = CircularBuffer::new(
            config.cb_size_byte,
            config.cb_copy_on_access_ratio,
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            buffer_ptr,
            config.cache_only,
        );

        let storage = LeafStorage::new_inner(config.clone(), pt, circular_buffer, vfs);

        // If the root is a plain page id the tree is a single leaf; tag the
        // raw id with the leaf mask expected by the rest of the tree code.
        let raw_root_id = if root_page_id.is_id() {
            root_page_id.raw() | Self::ROOT_IS_LEAF_MASK
        } else {
            root_page_id.raw()
        };

        let size_classes = Self::create_mem_page_size_classes(
            config.cb_min_record_size,
            config.cb_max_record_size,
            config.leaf_page_size,
            config.max_fence_len,
            config.cache_only,
        );
        BfTree {
            storage,
            root_page_id: AtomicU64::new(raw_root_id),
            wal,
            write_load_full_page: true,
            // NOTE(review): `cache_only` is hard-coded to false here even
            // though `config.cache_only` was passed to the circular buffer
            // above — confirm a cache-only tree is never snapshotted/restored.
            cache_only: false,
            mini_page_size_classes: size_classes,
            config,
            #[cfg(any(feature = "metrics-rt-debug-all", feature = "metrics-rt-debug-timer"))]
            metrics_recorder: Some(Arc::new(ThreadLocal::new())),
        }
    }

    /// Persist the whole tree to the backing file and return its path.
    ///
    /// Steps: drain the circular buffer (flushing cached records through the
    /// eviction path), write every inner node at its recorded disk offset,
    /// serialize the inner (pointer -> offset) and leaf (PageID -> offset)
    /// mappings, and finally write the `BfTreeMeta` header at offset 0.
    ///
    /// NOTE(review): this assumes no concurrent writers while snapshotting;
    /// there is no global lock taken here — confirm the caller guarantees
    /// quiescence.
    pub fn snapshot(&self) -> PathBuf {
        // Eviction callback: on success the handle is consumed (Ok), on
        // failure it is handed back so the drain can retry/abort.
        let callback = |h| -> Result<TombstoneHandle, TombstoneHandle> {
            match eviction_callback(&h, self) {
                Ok(_) => Ok(h),
                Err(_e) => Err(h),
            }
        };
        self.storage.circular_buffer.drain(callback);

        let root_id = self.get_root_page();
        // Pairs of (live node pointer, disk offset) — the pointer is stored
        // as an opaque key that new_from_snapshot uses to re-link children.
        let mut inner_mapping: Vec<(*const InnerNode, usize)> = Vec::new();
        let visitor = BfsVisitor::new_inner_only(self);
        for node in visitor {
            match node {
                NodeInfo::Inner { ptr, .. } => {
                    let inner = ReadGuard::try_read(ptr).unwrap();
                    // Only nodes with a valid assigned disk offset are
                    // persisted; others are silently skipped.
                    if inner.as_ref().is_valid_disk_offset() {
                        let offset = inner.as_ref().disk_offset as usize;
                        self.storage.vfs.write(offset, inner.as_ref().as_slice());
                        inner_mapping.push((ptr, offset));
                    }
                }
                NodeInfo::Leaf { level, .. } => {
                    // The inner-only visitor should surface leaves only at
                    // level 0.
                    assert_eq!(level, 0);
                }
            }
        }
        let (inner_offset, inner_size) = serialize_vec_to_disk(&inner_mapping, &self.storage.vfs);

        // Every leaf must already be at a stable Base location: the drain
        // above moved all cached (Full/Mini) pages back to disk.
        let mut leaf_mapping = Vec::new();
        let page_table_iter = self.storage.page_table.iter();
        for (entry, pid) in page_table_iter {
            assert!(pid.is_id());
            match entry.try_read().unwrap().as_ref() {
                PageLocation::Base(base) => leaf_mapping.push((pid, *base)),
                PageLocation::Full(_) | PageLocation::Mini(_) => {
                    unreachable!("Circular buffer should already be drained!")
                }
                PageLocation::Null => panic!("Snapshot of Null page"),
            }
        }

        let (leaf_offset, leaf_size) = serialize_vec_to_disk(&leaf_mapping, &self.storage.vfs);

        // The leaf mapping is the last thing written, so the file ends at its
        // sector-aligned end; recorded for validation at load time.
        let file_size = (leaf_offset + align_to_sector_size(leaf_size)) as u64;

        let metadata = BfTreeMeta {
            magic_begin: *BF_TREE_MAGIC_BEGIN,
            root_id: root_id.0,
            inner_offset,
            inner_size,
            leaf_offset,
            leaf_size,
            file_size,
            magic_end: *BF_TREE_MAGIC_END,
        };

        // Header goes in last: a crash before this point leaves the old
        // header (and its magic/file_size checks) to reject the partial file.
        self.storage
            .vfs
            .write(META_DATA_PAGE_OFFSET, metadata.as_slice());
        self.storage.vfs.flush();
        self.config.file_path.clone()
    }
}
286
/// On-disk snapshot header, written at `META_DATA_PAGE_OFFSET` (offset 0).
///
/// `repr(C, align(512))` fixes the field layout and pads the struct to a
/// whole sector so it can be read/written with sector-aligned I/O and
/// reinterpreted from a raw buffer on load.
#[repr(C, align(512))]
struct BfTreeMeta {
    /// Sentinel; must equal `BF_TREE_MAGIC_BEGIN`.
    magic_begin: [u8; 16],
    /// Root of the tree at snapshot time (raw `PageID` bits).
    root_id: PageID,
    /// File offset of the serialized inner-node (pointer -> offset) mapping.
    inner_offset: usize,
    /// Unpadded byte length of the inner-node mapping.
    inner_size: usize,
    /// File offset of the serialized leaf (PageID -> offset) mapping.
    leaf_offset: usize,
    /// Unpadded byte length of the leaf mapping.
    leaf_size: usize,
    /// Expected total snapshot file size; validated on load.
    file_size: u64,
    /// Sentinel; must equal `BF_TREE_MAGIC_END`.
    magic_end: [u8; 14],
}
// The metadata header must fit within a single disk page.
const _: () = assert!(std::mem::size_of::<BfTreeMeta>() <= DISK_PAGE_SIZE);
302
impl BfTreeMeta {
    /// View the header as raw bytes (the full padded 512-byte struct) for
    /// writing to disk.
    ///
    /// NOTE(review): `size_of::<Self>()` includes struct padding, so this
    /// slice exposes padding bytes whose values are not guaranteed to be
    /// initialized — consider zero-initializing the struct before writing.
    fn as_slice(&self) -> &[u8] {
        let ptr = self as *const Self as *const u8;
        let size = std::mem::size_of::<Self>();
        unsafe { std::slice::from_raw_parts(ptr, size) }
    }

    /// Panic unless both magic sentinels match, i.e. the buffer really holds
    /// a BF-Tree V0 snapshot header.
    fn check_magic(&self) {
        assert_eq!(self.magic_begin, *BF_TREE_MAGIC_BEGIN);
        assert_eq!(self.magic_end, *BF_TREE_MAGIC_END);
    }
}
315
316fn serialize_vec_to_disk<T>(v: &[T], vfs: &Arc<dyn VfsImpl>) -> (usize, usize) {
318 if v.is_empty() {
319 return (0, 0);
320 }
321 let unaligned_ptr = v.as_ptr() as *const u8;
322 let unaligned_size = std::mem::size_of_val(v);
323
324 let aligned_size = align_to_sector_size(unaligned_size);
325 let layout = std::alloc::Layout::from_size_align(aligned_size, SECTOR_SIZE).unwrap();
326 unsafe {
327 let aligned_ptr = std::alloc::alloc_zeroed(layout);
328 std::ptr::copy_nonoverlapping(unaligned_ptr, aligned_ptr, unaligned_size);
329 let slice = std::slice::from_raw_parts(aligned_ptr, aligned_size);
330 let offset = serialize_u8_slice_to_disk(slice, vfs);
331 std::alloc::dealloc(aligned_ptr, layout);
332 (offset, unaligned_size)
333 }
334}
335
336fn read_vec_from_offset<T: Clone>(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<T> {
337 assert!(size > 0);
338 let slice = read_u8_slice_from_disk(offset, size, vfs);
339 let ptr = slice.as_ptr() as *const T;
340 let size = size / std::mem::size_of::<T>();
341 let slice = unsafe { std::slice::from_raw_parts(ptr, size) };
342 slice.to_vec()
343}
344
345fn read_u8_slice_from_disk(offset: usize, size: usize, vfs: &Arc<dyn VfsImpl>) -> Vec<u8> {
346 let mut res = Vec::new();
347 let mut buffer = vec![0; DISK_PAGE_SIZE];
348 for i in (0..size).step_by(DISK_PAGE_SIZE) {
349 vfs.read(offset + i, &mut buffer); res.extend_from_slice(&buffer);
351 }
352 res
353}
354
/// Alignment granularity for direct-I/O buffers and on-disk layouts.
const SECTOR_SIZE: usize = 512;

/// Round `n` up to the next multiple of `SECTOR_SIZE`.
fn align_to_sector_size(n: usize) -> usize {
    let sectors = (n + SECTOR_SIZE - 1) / SECTOR_SIZE;
    sectors * SECTOR_SIZE
}
360
361fn serialize_u8_slice_to_disk(slice: &[u8], vfs: &Arc<dyn VfsImpl>) -> usize {
365 let mut start_offset = None;
366 for chunk in slice.chunks(DISK_PAGE_SIZE) {
367 let offset = vfs.alloc_offset(DISK_PAGE_SIZE); if start_offset.is_none() {
369 start_offset = Some(offset);
370 }
371 vfs.write(offset, chunk);
372 }
373 start_offset.unwrap()
374}
375
#[cfg(test)]
mod tests {
    use crate::{nodes::leaf_node::LeafReadResult, BfTree, Config};
    use rstest::rstest;
    use std::str::FromStr;

    /// Fill `buffer` with `key_id` in every slot and reinterpret it as a byte
    /// slice, producing a deterministic key/value payload for a given id.
    pub(crate) fn install_value_to_buffer(buffer: &mut [usize], key_id: usize) -> &[u8] {
        for i in buffer.iter_mut() {
            *i = key_id;
        }

        // SAFETY: the slice covers exactly the bytes of `buffer`; use
        // size_of::<usize>() instead of a hard-coded 8 so the byte length is
        // correct on targets where usize is not 8 bytes wide.
        unsafe {
            let ptr = buffer.as_ptr();
            std::slice::from_raw_parts(
                ptr as *const u8,
                buffer.len() * std::mem::size_of::<usize>(),
            )
        }
    }

    /// Insert `record_cnt` records, snapshot, reload from the snapshot, and
    /// verify every record reads back intact.
    #[rstest]
    #[case(64, 2408, 8192, 500, "target/test_simple_1.bftree")]
    #[case(64, 2048, 16384, 500, "target/test_simple_2.bftree")]
    #[case(3072, 3072, 16384, 500, "target/test_simple_3.bftree")]
    fn persist_roundtrip_simple(
        #[case] min_record_size: usize,
        #[case] max_record_size: usize,
        #[case] leaf_page_size: usize,
        #[case] record_cnt: usize,
        #[case] snapshot_file_path: String,
    ) {
        let tmp_file_path = std::path::PathBuf::from_str(&snapshot_file_path).unwrap();

        let mut config = Config::new(&tmp_file_path, leaf_page_size * 16);
        config.storage_backend(crate::StorageBackend::Std);
        config.cb_min_record_size = min_record_size;
        config.cb_max_record_size = max_record_size;
        config.leaf_page_size = leaf_page_size;
        config.max_fence_len = max_record_size;

        let bftree = BfTree::with_config(config.clone(), None);

        // Keys (and values) are key_len bytes of the record id repeated.
        let key_len: usize = min_record_size / 2;
        let mut key_buffer = vec![0; key_len / std::mem::size_of::<usize>()];

        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            bftree.insert(key, key);
        }
        bftree.snapshot();
        drop(bftree);

        // Reload from the snapshot and verify every record round-tripped.
        let bftree = BfTree::new_from_snapshot(config.clone(), None);
        let mut out_buffer = vec![0; key_len];
        for r in 0..record_cnt {
            let key = install_value_to_buffer(&mut key_buffer, r);
            let bytes_read = bftree.read(key, &mut out_buffer);

            match bytes_read {
                LeafReadResult::Found(v) => {
                    assert_eq!(v as usize, key_len);
                    assert_eq!(&out_buffer, key);
                }
                _ => {
                    panic!("Key not found");
                }
            }
        }

        std::fs::remove_file(tmp_file_path).unwrap();
    }
}
444}