1use std::{
2 collections::HashMap,
3 collections::HashSet,
4 fmt, io,
5 path::{Path, PathBuf},
6 sync::{
7 Arc, OnceLock, Weak,
8 atomic::{AtomicU32, AtomicU64, Ordering},
9 },
10};
11
12use parking_lot::{Mutex, RwLock, RwLockReadGuard};
13
14#[cfg(feature = "ffi")]
15mod ffi;
16pub(crate) mod node;
17pub(crate) mod page_store;
18pub(crate) mod store;
19
/// Error codes returned by every fallible operation in this crate.
///
/// The enum is `Copy` and carries no payload: underlying causes (for example the
/// `io::Error` behind [`Error::IoError`]) are discarded on conversion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error {
    /// The requested key or bucket does not exist.
    NotFound,
    /// Stored data failed validation (bad checksum, zero child pointer, ...).
    Corruption,
    /// An input exceeded a size limit (e.g. a key longer than `MAX_KEY_LEN`).
    TooLarge,
    Internal,
    /// A node had no room for an entry; the tree code reacts by splitting.
    NoSpace,
    /// An underlying I/O operation failed.
    IoError,
    Invalid,
    Duplicate,
    Conflict,
}

impl fmt::Display for Error {
    /// Renders the variant name, exactly like the `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{self:?}")
    }
}

impl std::error::Error for Error {}

impl From<io::Error> for Error {
    /// Collapses any I/O failure into [`Error::IoError`]; the source error is dropped.
    fn from(_err: io::Error) -> Self {
        Self::IoError
    }
}
46
/// Crate-wide result type using [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

/// Identifier of a page. The tree code in this file uses `0` as a "no page" sentinel
/// (empty tree root / invalid child pointer).
pub type PageId = u32;

/// Superblock magic number: the ASCII bytes of "cowbtree".
pub const MAGIC: u64 = 0x636f776274726565;
/// On-disk format version written into `MetaNode::format_version`.
pub const FORMAT_VERSION: u32 = 3;
53
54static BTREE_INSTANCE_REGISTRY: OnceLock<Mutex<HashMap<PathBuf, Weak<BTree>>>> = OnceLock::new();
55
56fn btree_instance_registry() -> &'static Mutex<HashMap<PathBuf, Weak<BTree>>> {
57 BTREE_INSTANCE_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
58}
59
60fn sweep_dead_btree_instances(reg: &mut HashMap<PathBuf, Weak<BTree>>) {
61 reg.retain(|_, weak| weak.strong_count() > 0);
62}
63
/// Produces a stable registry key for `path`, so different spellings of the same
/// database file map to the same entry.
///
/// Resolution order:
/// 1. the fully canonicalized path, when the file already exists;
/// 2. otherwise the canonicalized parent directory joined with the file name;
/// 3. otherwise the path made absolute against the current directory (left as-is
///    when the current directory cannot be determined).
fn normalize_db_path(path: &Path) -> PathBuf {
    if let Ok(resolved) = std::fs::canonicalize(path) {
        return resolved;
    }

    let absolute = if path.is_absolute() {
        path.to_path_buf()
    } else {
        std::env::current_dir()
            .map(|cwd| cwd.join(path))
            .unwrap_or_else(|_| path.to_path_buf())
    };

    // The file may not exist yet, but its directory usually does.
    let resolved_parent = absolute
        .parent()
        .and_then(|dir| std::fs::canonicalize(dir).ok());
    if let (Some(dir), Some(name)) = (resolved_parent, absolute.file_name()) {
        return dir.join(name);
    }
    absolute
}
87
88#[repr(C)]
89#[derive(Clone, Copy, Debug)]
90pub struct MetaNode {
91 pub magic: u64,
92 pub format_version: u32,
93 pub page_size: u32,
94 pub catalog_root: PageId,
95 pub mapping_root: PageId,
96 pub reverse_root: PageId,
97 pub next_lid: PageId,
98 pub next_page_id: PageId,
99 pub freelist_root: PageId,
100 pub seq: u64,
101 pub checksum: u64,
102}
103
104impl MetaNode {
105 pub fn as_page_slice(&self) -> [u8; PAGE_SIZE] {
106 let mut buf = [0u8; PAGE_SIZE];
107 let src = unsafe {
108 std::slice::from_raw_parts(
109 (self as *const Self) as *const u8,
110 std::mem::size_of::<Self>(),
111 )
112 };
113 buf[..src.len()].copy_from_slice(src);
114 buf
115 }
116
117 pub fn from_slice(x: &[u8]) -> Self {
118 unsafe { std::ptr::read_unaligned(x.as_ptr().cast::<Self>()) }
119 }
120}
121
122impl Default for MetaNode {
123 fn default() -> Self {
124 Self::new()
125 }
126}
127
128impl MetaNode {
129 pub fn new() -> Self {
130 let mut this = Self {
131 magic: MAGIC,
132 format_version: FORMAT_VERSION,
133 page_size: PAGE_SIZE as u32,
134 catalog_root: 0,
135 mapping_root: 0,
136 reverse_root: 0,
137 next_lid: 1,
138 next_page_id: 2, freelist_root: 0,
140 seq: 1,
141 checksum: 0,
142 };
143 this.update_checksum();
144 this
145 }
146
147 pub fn update_checksum(&mut self) {
148 self.checksum = 0;
149 let s = unsafe {
150 std::slice::from_raw_parts(
151 (self as *const Self) as *const u8,
152 std::mem::size_of::<Self>(),
153 )
154 };
155 self.checksum = crc32c::crc32c(s) as u64;
156 }
157
158 fn calc_checksum(&self) -> u64 {
159 let mut clone = *self;
160 clone.checksum = 0;
161 let s = unsafe {
162 std::slice::from_raw_parts(
163 (&clone as *const Self) as *const u8,
164 std::mem::size_of::<Self>(),
165 )
166 };
167 crc32c::crc32c(s) as u64
168 }
169
170 pub fn validate(&self) -> Result<()> {
171 if self.magic == 0 && self.seq == 0 {
173 return Err(Error::Corruption);
174 }
175 if self.checksum != self.calc_checksum() {
176 return Err(Error::Corruption);
177 }
178 Ok(())
179 }
180}
181
182use crate::{
183 node::{MAX_KEY_LEN, Node, PAGE_SIZE},
184 page_store::{LogicalStore, PageStore, decode_u32_key, encode_u32_key},
185 store::Store,
186};
187
/// One branch-level step of a root-to-leaf descent, recorded by
/// `Tree::traverse_to_leaf` so the path can be rewritten bottom-up.
struct Route {
    /// The branch node that was visited.
    node: Arc<Node>,
    /// Page the branch was loaded from; it is scheduled for freeing when the path is rewritten.
    page_id: PageId,
    /// Index of the child slot that was followed.
    pos: usize,
}
193
194struct TxContext<'a> {
196 store: &'a dyn PageStore,
197 freed: &'a mut Vec<(PageId, u32)>,
198 alloc: &'a mut HashSet<PageId>,
199}
200
201impl<'a> TxContext<'a> {
202 fn new(
203 store: &'a dyn PageStore,
204 freed: &'a mut Vec<(PageId, u32)>,
205 alloc: &'a mut HashSet<PageId>,
206 ) -> Self {
207 Self {
208 store,
209 freed,
210 alloc,
211 }
212 }
213
214 fn alloc_page(&mut self) -> Result<PageId> {
215 self.store.alloc_page(self.alloc)
216 }
217
218 fn write_node(&mut self, node: &mut Node) -> Result<PageId> {
219 let pid = self.alloc_page()?;
220 self.store.write_data(&[pid], node.finalize())?;
221 Ok(pid)
222 }
223
224 fn free_page(&mut self, pid: PageId) -> Result<()> {
225 self.store.schedule_free(pid, self.freed)
226 }
227}
228
229pub struct Tree {
230 store: Arc<dyn PageStore>,
231 pub root_page_id: Arc<RwLock<PageId>>,
232 pending_free: Arc<RwLock<Vec<(PageId, u32)>>>,
233 pending_alloc: Arc<RwLock<HashSet<PageId>>>,
234}
235
236impl Tree {
237 pub fn open(
238 store: Arc<dyn PageStore>,
239 root_page_id: Arc<RwLock<PageId>>,
240 pending_free: Arc<RwLock<Vec<(PageId, u32)>>>,
241 pending_alloc: Arc<RwLock<HashSet<PageId>>>,
242 ) -> Result<Self> {
243 Ok(Self {
244 store,
245 root_page_id,
246 pending_free,
247 pending_alloc,
248 })
249 }
250
251 fn traverse_to_leaf(
252 &self,
253 mut node: Arc<Node>,
254 mut page_id: PageId,
255 key: &[u8],
256 ) -> Result<(Vec<Route>, Arc<Node>, PageId)> {
257 let mut stack = Vec::new();
258 while !node.is_leaf() {
259 let pos = match node.search(key) {
260 Ok(pos) => pos,
261 Err(pos) => pos.saturating_sub(1),
262 };
263 let child_id = node.child_at(pos);
264 if child_id == 0 {
265 return Err(Error::Corruption);
266 }
267 let child_node = self.store.load_node(child_id)?;
268 stack.push(Route { node, page_id, pos });
269 node = child_node;
270 page_id = child_id;
271 }
272 Ok((stack, node, page_id))
273 }
274
275 fn merge_pending(&self, freed: Vec<(PageId, u32)>, mut alloc: HashSet<PageId>) {
277 let mut main_free = self.pending_free.write();
278 let mut main_alloc = self.pending_alloc.write();
279
280 for (pid, nr) in freed {
281 if alloc.remove(&pid) {
282 let _ = self.store.free_pages_immediate(pid, nr);
285 continue;
286 }
287 if main_alloc.remove(&pid) {
288 let _ = self.store.free_pages_immediate(pid, nr);
290 } else {
291 Self::merge_free_extent(&mut main_free, pid, nr);
292 }
293 }
294 main_alloc.extend(alloc);
295 }
296
297 fn merge_free_extent(free: &mut Vec<(PageId, u32)>, page_id: PageId, nr_pages: u32) {
298 if page_id == 0 || nr_pages == 0 {
299 return;
300 }
301
302 let mut start = page_id as u64;
303 let mut end = start + nr_pages as u64;
304 let mut idx = 0;
305
306 while idx < free.len() && (free[idx].0 as u64) + (free[idx].1 as u64) < start {
307 idx += 1;
308 }
309
310 while idx < free.len() {
311 let (free_start, free_len) = free[idx];
312 let free_start = free_start as u64;
313 let free_end = free_start + free_len as u64;
314 if free_start > end {
315 break;
316 }
317 start = start.min(free_start);
318 end = end.max(free_end);
319 free.remove(idx);
320 }
321
322 free.insert(idx, (start as PageId, (end - start) as u32));
323 }
324
325 pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
326 if key.len() > MAX_KEY_LEN {
327 return Err(Error::TooLarge);
328 }
329 let mut freed = Vec::new();
331 let mut alloc = HashSet::new();
332
333 let result = self.execute_put(key, value, &mut freed, &mut alloc);
334
335 if result.is_err() {
336 for pid in alloc {
338 let _ = self.store.free_pages_immediate(pid, 1);
339 }
340 return result;
341 }
342
343 self.merge_pending(freed, alloc);
344 Ok(())
345 }
346
347 fn execute_put(
348 &self,
349 key: &[u8],
350 value: &[u8],
351 freed: &mut Vec<(PageId, u32)>,
352 alloc: &mut HashSet<PageId>,
353 ) -> Result<()> {
354 let mut ctx = TxContext::new(self.store.as_ref(), freed, alloc);
355 let mut root_lock = self.root_page_id.write();
356 let current_root_id = *root_lock;
357
358 if current_root_id == 0 {
360 let mut node = Node::new_leaf();
361 node.put(ctx.store, key, value, ctx.freed, ctx.alloc)?;
362 *root_lock = ctx.write_node(&mut node)?;
363 return Ok(());
364 }
365
366 let root_node = self.store.load_node(current_root_id)?;
368 let (mut stack, leaf_node_arc, leaf_id) =
369 self.traverse_to_leaf(root_node, current_root_id, key)?;
370
371 let mut current_node = (*leaf_node_arc).clone();
372
373 let mut split_info = self.apply_insert(&mut ctx, &mut current_node, key, value)?;
375
376 let mut new_child_id = ctx.write_node(&mut current_node)?;
378 ctx.free_page(leaf_id)?;
379
380 while let Some(Route {
382 node: parent_arc,
383 page_id: parent_id,
384 pos,
385 }) = stack.pop()
386 {
387 let mut parent = (*parent_arc).clone();
388
389 parent.update_child_page(pos, new_child_id);
391
392 if let Some((sep, mut rhs)) = split_info.take() {
394 let rhs_id = ctx.write_node(&mut rhs)?;
395 split_info =
397 self.apply_insert(&mut ctx, &mut parent, &sep, &rhs_id.to_le_bytes())?;
398 }
399
400 new_child_id = ctx.write_node(&mut parent)?;
402 ctx.free_page(parent_id)?;
403 }
404
405 if let Some((sep, mut rhs)) = split_info {
407 let rhs_id = ctx.write_node(&mut rhs)?;
408 let mut new_root = Node::new_branch();
409 new_root.put(
411 ctx.store,
412 &[],
413 &new_child_id.to_le_bytes(),
414 ctx.freed,
415 ctx.alloc,
416 )?;
417 new_root.put(ctx.store, &sep, &rhs_id.to_le_bytes(), ctx.freed, ctx.alloc)?;
419 *root_lock = ctx.write_node(&mut new_root)?;
420 } else {
421 *root_lock = new_child_id;
423 }
424
425 Ok(())
426 }
427
428 fn apply_insert(
429 &self,
430 ctx: &mut TxContext,
431 node: &mut Node,
432 key: &[u8],
433 value: &[u8],
434 ) -> Result<Option<(Vec<u8>, Node)>> {
435 match node.put(ctx.store, key, value, ctx.freed, ctx.alloc) {
436 Ok(()) => Ok(None),
437 Err(Error::NoSpace) => {
438 let (sep, mut rhs) = node.split()?;
440 if key < &sep {
441 node.put(ctx.store, key, value, ctx.freed, ctx.alloc)?;
442 } else {
443 rhs.put(ctx.store, key, value, ctx.freed, ctx.alloc)?;
444 }
445 Ok(Some((sep, rhs)))
446 }
447 Err(e) => Err(e),
448 }
449 }
450
451 pub fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
452 let root_id = *self.root_page_id.read();
453 if root_id == 0 {
454 return Err(Error::NotFound);
455 }
456
457 let root_node = self.store.load_node(root_id)?;
458 let mut current = root_node;
459 loop {
460 if current.is_leaf() {
461 return current.get(self.store.as_ref(), key);
462 }
463 let pos = current.child_pos_for_key(key);
464 current = self.store.load_node(current.child_at(pos))?;
465 }
466 }
467
468 pub fn del(&self, key: &[u8]) -> Result<()> {
469 let mut freed = Vec::new();
471 let mut alloc = HashSet::new();
472
473 let result = self.execute_del(key, &mut freed, &mut alloc);
474
475 if result.is_err() {
476 for pid in alloc {
478 let _ = self.store.free_pages_immediate(pid, 1);
479 }
480 return result;
481 }
482
483 self.merge_pending(freed, alloc);
484 Ok(())
485 }
486
487 fn execute_del(
488 &self,
489 key: &[u8],
490 freed: &mut Vec<(PageId, u32)>,
491 alloc: &mut HashSet<PageId>,
492 ) -> Result<()> {
493 let mut ctx = TxContext::new(self.store.as_ref(), freed, alloc);
494 let mut root_lock = self.root_page_id.write();
495 let current_root_id = *root_lock;
496
497 if current_root_id == 0 {
498 return Err(Error::NotFound);
499 }
500
501 let root_node = self.store.load_node(current_root_id)?;
503 let (mut stack, leaf_arc, leaf_id) =
504 self.traverse_to_leaf(root_node, current_root_id, key)?;
505
506 let mut current_node = (*leaf_arc).clone();
507 current_node.del(ctx.store, key, ctx.freed)?;
508
509 let mut empty = current_node.is_empty();
511 let mut new_child_id = if !empty {
512 ctx.write_node(&mut current_node)?
513 } else {
514 0
515 };
516 ctx.free_page(leaf_id)?;
517
518 while let Some(Route {
520 node: parent_arc,
521 page_id: parent_id,
522 pos,
523 }) = stack.pop()
524 {
525 let mut parent = (*parent_arc).clone();
526
527 if empty {
528 parent.shrink_slot(pos);
531 if !parent.is_leaf() && pos == 0 && !parent.is_empty() {
534 parent.slot_at_mut(0).klen = 0;
535 parent.dirty = true;
536 }
537 } else {
538 parent.update_child_page(pos, new_child_id);
540 }
541
542 if parent.is_empty() {
544 empty = true;
545 new_child_id = 0;
546 } else {
547 empty = false;
548 new_child_id = ctx.write_node(&mut parent)?;
549 }
550 ctx.free_page(parent_id)?;
551 }
552
553 if new_child_id != 0 {
556 loop {
557 let node_id = new_child_id;
558 let node = self.store.load_node(node_id)?;
559 if !node.is_leaf() && node.num_children() == 1 {
560 let child_id = node.child_at(0);
561 ctx.free_page(node_id)?;
562 new_child_id = child_id;
563 } else {
564 break;
565 }
566 }
567 }
568
569 *root_lock = new_child_id;
570 Ok(())
571 }
572
573 pub(crate) fn collect_tree_pages(
574 store: &dyn PageStore,
575 root_id: PageId,
576 freed: &mut Vec<(PageId, u32)>,
577 ) -> Result<()> {
578 if root_id == 0 {
579 return Ok(());
580 }
581
582 let mut stack = vec![root_id];
583
584 while let Some(current_id) = stack.pop() {
585 let node = store.load_node(current_id)?;
586 store.schedule_free(current_id, freed)?;
587
588 for i in 0..node.num_children() {
589 if node.is_leaf() {
590 let slot = node.slot_at(i);
591 if !slot.is_inline() {
592 node.free_slot_pages(store, slot, freed)?;
593 }
594 } else {
595 let child_id = node.child_at(i);
596 if child_id != 0 {
597 stack.push(child_id);
598 }
599 }
600 }
601 }
602 Ok(())
603 }
604
605 pub fn iterator(&self) -> TreeIterator {
606 let root_id = *self.root_page_id.read();
607 TreeIterator::new(self.store.clone(), root_id)
608 }
609
610 pub fn iterator_from(&self, key: &[u8]) -> TreeIterator {
611 let root_id = *self.root_page_id.read();
612 TreeIterator::new_from(self.store.clone(), root_id, key)
613 }
614}
615
616pub struct TreeIterator {
617 store: Arc<dyn PageStore>,
618 stack: Vec<(Arc<Node>, usize)>,
619 current_leaf: Option<(Arc<Node>, usize)>,
620}
621
622impl TreeIterator {
623 fn new(store: Arc<dyn PageStore>, root_id: PageId) -> Self {
624 let mut iter = Self {
625 store,
626 stack: Vec::new(),
627 current_leaf: None,
628 };
629
630 if root_id != 0
631 && let Ok(node) = iter.store.load_node(root_id)
632 {
633 iter.push_node(node);
634 }
635 iter
636 }
637
638 fn new_from(store: Arc<dyn PageStore>, root_id: PageId, key: &[u8]) -> Self {
639 let mut iter = Self {
640 store,
641 stack: Vec::new(),
642 current_leaf: None,
643 };
644
645 if root_id == 0 {
646 return iter;
647 }
648
649 let mut node = match iter.store.load_node(root_id) {
650 Ok(node) => node,
651 Err(_) => return iter,
652 };
653
654 while !node.is_leaf() {
655 let pos = match node.search(key) {
656 Ok(pos) => pos,
657 Err(pos) => pos.saturating_sub(1),
658 };
659 let child_id = node.child_at(pos);
660 if child_id == 0 {
661 return iter;
662 }
663 iter.stack.push((node.clone(), pos + 1));
664 match iter.store.load_node(child_id) {
665 Ok(child) => node = child,
666 Err(_) => return iter,
667 }
668 }
669
670 let leaf_pos = match node.search(key) {
671 Ok(pos) => pos,
672 Err(pos) => pos,
673 };
674 iter.current_leaf = Some((node, leaf_pos));
675 iter
676 }
677
678 fn push_node(&mut self, node: Arc<Node>) {
679 if node.is_leaf() {
680 self.current_leaf = Some((node, 0));
681 } else {
682 self.stack.push((node, 0));
683 }
684 }
685
686 pub fn next_ref(&mut self, key_buf: &mut Vec<u8>, val_buf: &mut Vec<u8>) -> bool {
687 loop {
688 if let Some((leaf, idx)) = self.current_leaf.as_mut() {
689 if *idx < leaf.num_children() {
690 let slot = leaf.slot_at(*idx);
691
692 key_buf.clear();
693 key_buf.extend_from_slice(leaf.key_at(*idx));
694
695 val_buf.clear();
696 if slot.is_inline() {
697 val_buf.extend_from_slice(leaf.value_at(*idx));
698 } else if let Ok(pages) = leaf.collect_page_ids(self.store.as_ref(), slot) {
699 val_buf.resize(slot.value_len(), 0);
700 if self.store.read_data(&pages, val_buf).is_err() {
701 *idx += 1;
702 continue;
703 }
704 } else {
705 *idx += 1;
706 continue;
707 }
708
709 *idx += 1;
710 return true;
711 } else {
712 self.current_leaf = None;
713 }
714 }
715
716 if let Some((node, idx)) = self.stack.last_mut() {
717 if *idx < node.num_children() {
718 let child_id = node.child_at(*idx);
719 *idx += 1;
720 if let Ok(child_node) = self.store.load_node(child_id) {
721 self.push_node(child_node);
722 }
723 } else {
724 self.stack.pop();
725 }
726 } else {
727 return false;
728 }
729 }
730 }
731}
732
733pub struct Txn<'a> {
734 pub(crate) tree: Tree,
735 pub(crate) _marker: std::marker::PhantomData<&'a ()>,
736}
737
738impl<'a> Txn<'a> {
739 pub fn put<K, V>(&mut self, key: K, value: V) -> Result<()>
740 where
741 K: AsRef<[u8]>,
742 V: AsRef<[u8]>,
743 {
744 self.tree.put(key.as_ref(), value.as_ref())
745 }
746
747 pub fn get<K>(&self, key: K) -> Result<Vec<u8>>
748 where
749 K: AsRef<[u8]>,
750 {
751 self.tree.get(key.as_ref())
752 }
753
754 pub fn del<K>(&mut self, key: K) -> Result<()>
755 where
756 K: AsRef<[u8]>,
757 {
758 self.tree.del(key.as_ref())
759 }
760
761 pub fn iter(&self) -> TreeIterator {
762 self.tree.iterator()
763 }
764}
765
766pub struct ReadOnlyTree {
767 store: Arc<dyn PageStore>,
768 root_page_id: PageId,
769 root_node: Option<Arc<Node>>,
770}
771
772impl ReadOnlyTree {
773 fn new(store: Arc<dyn PageStore>, root_page_id: PageId) -> Result<Self> {
774 let root_node = if root_page_id == 0 {
775 None
776 } else {
777 Some(store.load_node(root_page_id)?)
778 };
779 Ok(Self {
780 store,
781 root_page_id,
782 root_node,
783 })
784 }
785
786 fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
787 if self.root_page_id == 0 {
788 return Err(Error::NotFound);
789 }
790
791 let mut current = self.root_node.clone().ok_or(Error::NotFound)?;
792 loop {
793 if current.is_leaf() {
794 return current.get(self.store.as_ref(), key);
795 }
796 let pos = current.child_pos_for_key(key);
797 current = self.store.load_node(current.child_at(pos))?;
798 }
799 }
800
801 fn iterator(&self) -> TreeIterator {
802 TreeIterator::new(self.store.clone(), self.root_page_id)
803 }
804}
805
806pub struct ReadOnlyTxn<'a> {
807 pub(crate) tree: Arc<ReadOnlyTree>,
808 pub(crate) _guard: RwLockReadGuard<'a, ()>,
809}
810
811impl<'a> ReadOnlyTxn<'a> {
812 pub fn get<K>(&self, key: K) -> Result<Vec<u8>>
813 where
814 K: AsRef<[u8]>,
815 {
816 self.tree.get(key.as_ref())
817 }
818
819 pub fn iter(&self) -> TreeIterator {
820 self.tree.iterator()
821 }
822}
823
824pub struct MultiTxn<'a> {
825 btree: &'a BTree,
826 bucket_roots: HashMap<String, PageId>,
827}
828
829impl<'a> MultiTxn<'a> {
830 pub fn exec<F, R>(&mut self, bucket: &str, f: F) -> Result<R>
831 where
832 F: FnOnce(&mut Txn) -> Result<R>,
833 {
834 let name_bytes = bucket.as_bytes();
835
836 let initial_root = if let Some(&root) = self.bucket_roots.get(bucket) {
837 root
838 } else {
839 match self.btree.catalog_tree.get(name_bytes) {
840 Ok(bytes) => BucketMetadata::from_slice(&bytes).root_page_id,
841 Err(Error::NotFound) => 0,
842 Err(e) => return Err(e),
843 }
844 };
845
846 let logical_store_obj: Arc<dyn PageStore> = self.btree.logical_store.clone();
847 let tree = Tree::open(
848 logical_store_obj,
849 Arc::new(RwLock::new(initial_root)),
850 self.btree.pending_free.clone(),
851 self.btree.pending_alloc.clone(),
852 )?;
853
854 let mut txn = Txn {
855 tree,
856 _marker: std::marker::PhantomData,
857 };
858
859 let res = f(&mut txn);
860 if res.is_ok() {
861 let new_root = *txn.tree.root_page_id.read();
862 self.bucket_roots.insert(bucket.to_string(), new_root);
863 }
864 res
865 }
866}
867
868pub struct BucketMetadata {
869 pub(crate) root_page_id: PageId,
870}
871
872impl BucketMetadata {
873 pub fn from_slice(x: &[u8]) -> Self {
874 assert!(x.len() >= std::mem::size_of::<Self>());
875 unsafe { std::ptr::read_unaligned(x.as_ptr().cast::<Self>()) }
876 }
877
878 pub fn as_slice(&self) -> &[u8] {
879 unsafe {
880 std::slice::from_raw_parts(
881 self as *const Self as *const u8,
882 std::mem::size_of::<Self>(),
883 )
884 }
885 }
886}
887
/// Handle to an open database file.
///
/// Handles opened for the same normalized path share one underlying instance through
/// the process-wide registry. Every shared field is an `Arc`, so handles cloned from
/// that instance observe the same stores, trees, locks and caches (assuming `Clone`
/// clones the `Arc`s; the impl is not in view).
pub struct BTree {
    /// Physical page store backing the database file.
    pub(crate) store: Arc<Store>,
    /// Tree mapping bucket names to serialized `BucketMetadata`.
    pub(crate) catalog_tree: Arc<Tree>,
    /// Tree over the physical store given to `LogicalStore`
    /// (presumably logical id -> physical page; confirm in `page_store`).
    pub(crate) mapping_tree: Arc<Tree>,
    /// Companion tree to `mapping_tree` given to `LogicalStore`
    /// (presumably physical page -> logical id; confirm in `page_store`).
    pub(crate) reverse_tree: Arc<Tree>,
    /// Page store used by the catalog and bucket trees, layered on `store`.
    pub(crate) logical_store: Arc<LogicalStore>,
    /// Extents freed by uncommitted operations, shared with every tree of this instance.
    pub(crate) pending_free: Arc<RwLock<Vec<(PageId, u32)>>>,
    /// Pages allocated by uncommitted operations, shared with every tree of this instance.
    pub(crate) pending_alloc: Arc<RwLock<HashSet<PageId>>>,
    /// Writers (`exec`, `exec_multi`, `del_bucket`) take this exclusively; `view` holds it shared.
    pub(crate) writer_lock: Arc<RwLock<()>>,
    /// Catalog root recorded at the last snapshot sync.
    pub(crate) start_root_id: Arc<AtomicU32>,
    /// Store sequence number recorded at the last snapshot sync; `view` compares the
    /// store's current sequence against it to decide whether to reload.
    pub(crate) start_seq: Arc<AtomicU64>,
    /// Bucket name -> (root page, store seq at which it was recorded).
    pub(crate) bucket_root_cache: Arc<RwLock<BucketRootCache>>,
    /// Bucket name -> (root page, store seq, read-only tree) reused by `view`.
    pub(crate) bucket_tree_cache: Arc<RwLock<BucketTreeCache>>,
    /// Strong reference that keeps the registry's shared instance alive while this handle
    /// exists; `None` on the shared instance itself.
    instance_anchor: Option<Arc<BTree>>,
}
903
/// Bucket name -> (bucket root page, store seq at which the root was recorded).
type BucketRootCache = HashMap<Vec<u8>, (PageId, u64)>;
/// Bucket name -> (bucket root page, store seq, read-only tree opened at that root).
type BucketTreeCache = HashMap<Vec<u8>, (PageId, u64, Arc<ReadOnlyTree>)>;

// Compaction tuning knobs; the code that reads them is outside this section.
const COMPACT_SMALL_DATA_THRESHOLD_PAGES: u64 = 1024;
const COMPACT_TAIL_RATIO: f64 = 0.5;

/// Counters describing a compaction run (presumably returned by a compaction routine
/// outside this section).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CompactStats {
    pub moved_pages: u64,
    pub remaining_candidates: u64,
}
917
918impl BTree {
919 fn sync_local_snapshot_from_store(&self) {
920 let snapshot = self.store.cached_snapshot();
921 *self.catalog_tree.root_page_id.write() = snapshot.catalog_root;
922 *self.mapping_tree.root_page_id.write() = snapshot.mapping_root;
923 *self.reverse_tree.root_page_id.write() = snapshot.reverse_root;
924 self.start_root_id
925 .store(snapshot.catalog_root, Ordering::Release);
926 self.start_seq.store(snapshot.seq, Ordering::Release);
927 }
928
929 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
931 let path = path.as_ref();
932 let key = normalize_db_path(path);
933
934 if let Some(existing) = {
935 let mut reg = btree_instance_registry().lock();
936 sweep_dead_btree_instances(&mut reg);
937 let upgraded = reg.get(&key).and_then(|w| w.upgrade());
938 if upgraded.is_none() {
939 reg.remove(&key);
940 }
941 upgraded
942 } {
943 let mut handle = existing.as_ref().clone();
944 handle.instance_anchor = Some(existing);
945 handle.sync_local_snapshot_from_store();
946 return Ok(handle);
947 }
948
949 let store = Arc::new(Store::open(path)?);
950 let catalog_root = store.get_catalog_root();
951 let mapping_root = store.get_mapping_root();
952 let reverse_root = store.get_reverse_root();
953 let initial_seq = store.get_seq();
954 let pending_free = Arc::new(RwLock::new(Vec::new()));
955 let pending_alloc = Arc::new(RwLock::new(HashSet::new()));
956 let mapping_root_lock = Arc::new(RwLock::new(mapping_root));
957 let reverse_root_lock = Arc::new(RwLock::new(reverse_root));
958
959 let physical_store: Arc<dyn PageStore> = store.clone();
960 let mapping_tree = Arc::new(Tree::open(
961 physical_store.clone(),
962 mapping_root_lock,
963 pending_free.clone(),
964 pending_alloc.clone(),
965 )?);
966 let reverse_tree = Arc::new(Tree::open(
967 physical_store,
968 reverse_root_lock,
969 pending_free.clone(),
970 pending_alloc.clone(),
971 )?);
972
973 let logical_store = Arc::new(LogicalStore::new(
974 store.clone(),
975 mapping_tree.clone(),
976 reverse_tree.clone(),
977 ));
978 let catalog_tree_root_lock = Arc::new(RwLock::new(catalog_root));
979 let logical_store_obj: Arc<dyn PageStore> = logical_store.clone();
980 let catalog_tree = Arc::new(Tree::open(
981 logical_store_obj,
982 catalog_tree_root_lock,
983 pending_free.clone(),
984 pending_alloc.clone(),
985 )?);
986
987 let instance = Self {
988 store: store.clone(),
989 catalog_tree,
990 mapping_tree,
991 reverse_tree,
992 logical_store,
993 pending_free,
994 pending_alloc,
995 writer_lock: Arc::new(RwLock::new(())),
996 start_root_id: Arc::new(AtomicU32::new(catalog_root)),
997 start_seq: Arc::new(AtomicU64::new(initial_seq)),
998 bucket_root_cache: Arc::new(RwLock::new(HashMap::new())),
999 bucket_tree_cache: Arc::new(RwLock::new(HashMap::new())),
1000 instance_anchor: None,
1001 };
1002 let instance_arc = Arc::new(instance);
1003 {
1004 let mut reg = btree_instance_registry().lock();
1005 sweep_dead_btree_instances(&mut reg);
1006 if let Some(existing) = reg.get(&key).and_then(|w| w.upgrade()) {
1007 let mut handle = existing.as_ref().clone();
1008 handle.instance_anchor = Some(existing);
1009 handle.sync_local_snapshot_from_store();
1010 return Ok(handle);
1011 }
1012 reg.insert(key, Arc::downgrade(&instance_arc));
1013 }
1014
1015 let mut handle = instance_arc.as_ref().clone();
1016 handle.instance_anchor = Some(instance_arc);
1017 Ok(handle)
1018 }
1019
1020 pub fn exec<F, R>(&self, bucket: &str, f: F) -> Result<R>
1031 where
1032 F: FnOnce(&mut Txn) -> Result<R>,
1033 {
1034 let _lock = self.writer_lock.write();
1035
1036 self.refresh_internal()?;
1039
1040 let name_bytes = bucket.as_bytes();
1042 let initial_root = match self.catalog_tree.get(name_bytes) {
1043 Ok(bytes) => BucketMetadata::from_slice(&bytes).root_page_id,
1044 Err(Error::NotFound) => 0,
1045 Err(e) => return Err(e),
1046 };
1047
1048 let pre_alloc = self.pending_alloc.read().clone();
1050 let pre_free = self.pending_free.read().clone();
1051 let pre_catalog_root = *self.catalog_tree.root_page_id.read();
1052 let pre_mapping_root = *self.mapping_tree.root_page_id.read();
1053 let pre_reverse_root = *self.reverse_tree.root_page_id.read();
1054
1055 let logical_store_obj: Arc<dyn PageStore> = self.logical_store.clone();
1056 let tree = Tree::open(
1057 logical_store_obj,
1058 Arc::new(RwLock::new(initial_root)),
1059 self.pending_free.clone(),
1060 self.pending_alloc.clone(),
1061 )?;
1062
1063 let mut txn = Txn {
1064 tree,
1065 _marker: std::marker::PhantomData,
1066 };
1067
1068 match f(&mut txn) {
1069 Ok(res) => {
1070 let new_root = *txn.tree.root_page_id.read();
1071 let metadata = BucketMetadata {
1072 root_page_id: new_root,
1073 };
1074 self.catalog_tree.put(name_bytes, metadata.as_slice())?;
1075 if let Err(e) = self.commit_internal() {
1076 *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1078 *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1079 *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1080 self.rollback_pages(&pre_alloc, &pre_free);
1081 return Err(e);
1082 }
1083 let latest_seq = self.store.get_seq();
1084 self.bucket_root_cache
1085 .write()
1086 .insert(name_bytes.to_vec(), (new_root, latest_seq));
1087 Ok(res)
1088 }
1089 Err(e) => {
1090 *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1091 *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1092 *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1093 self.rollback_pages(&pre_alloc, &pre_free);
1094 Err(e)
1095 }
1096 }
1097 }
1098
1099 pub fn exec_multi<F, R>(&self, f: F) -> Result<R>
1104 where
1105 F: FnOnce(&mut MultiTxn) -> Result<R>,
1106 {
1107 let _lock = self.writer_lock.write();
1108
1109 self.refresh_internal()?;
1110
1111 let pre_alloc = self.pending_alloc.read().clone();
1112 let pre_free = self.pending_free.read().clone();
1113 let pre_catalog_root = *self.catalog_tree.root_page_id.read();
1114 let pre_mapping_root = *self.mapping_tree.root_page_id.read();
1115 let pre_reverse_root = *self.reverse_tree.root_page_id.read();
1116
1117 let mut multi_txn = MultiTxn {
1118 btree: self,
1119 bucket_roots: HashMap::new(),
1120 };
1121
1122 match f(&mut multi_txn) {
1123 Ok(res) => {
1124 let mut updated = Vec::new();
1125 for (name, new_root) in multi_txn.bucket_roots {
1126 let metadata = BucketMetadata {
1127 root_page_id: new_root,
1128 };
1129 self.catalog_tree
1130 .put(name.as_bytes(), metadata.as_slice())?;
1131 updated.push((name.into_bytes(), new_root));
1132 }
1133 if let Err(e) = self.commit_internal() {
1134 *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1135 *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1136 *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1137 self.rollback_pages(&pre_alloc, &pre_free);
1138 return Err(e);
1139 }
1140 let latest_seq = self.store.get_seq();
1141 let mut cache = self.bucket_root_cache.write();
1142 for (name, new_root) in updated {
1143 cache.insert(name, (new_root, latest_seq));
1144 }
1145 Ok(res)
1146 }
1147 Err(e) => {
1148 *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1149 *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1150 *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1151 self.rollback_pages(&pre_alloc, &pre_free);
1152 Err(e)
1153 }
1154 }
1155 }
1156
1157 fn rollback_pages(&self, pre_alloc: &HashSet<PageId>, pre_free: &[(PageId, u32)]) {
1158 let mut alloc = self.pending_alloc.write();
1159 let mut freed_now = Vec::new();
1160 for &pid in alloc.iter() {
1161 if !pre_alloc.contains(&pid) {
1162 freed_now.push(pid);
1163 }
1164 }
1165 for pid in freed_now {
1166 alloc.remove(&pid);
1167 let _ = self.store.free_pages(pid, 1);
1168 }
1169 drop(alloc);
1170
1171 let current_free = self.pending_free.read().clone();
1172 let freed_delta = Self::diff_free_extents(¤t_free, pre_free);
1173 for (pid, nr) in freed_delta {
1174 let _ = self.store.unfree_pages(pid, nr);
1175 }
1176
1177 *self.pending_free.write() = pre_free.to_owned();
1178 }
1179
1180 fn diff_free_extents(current: &[(PageId, u32)], base: &[(PageId, u32)]) -> Vec<(PageId, u32)> {
1181 let mut res = Vec::new();
1182 let mut j = 0usize;
1183
1184 for &(cur_start, cur_len) in current {
1185 let cur_start_u64 = cur_start as u64;
1186 let cur_end = cur_start_u64 + cur_len as u64;
1187 let mut start = cur_start_u64;
1188
1189 while j < base.len() && (base[j].0 as u64) + base[j].1 as u64 <= start {
1190 j += 1;
1191 }
1192
1193 let mut k = j;
1194 while k < base.len() {
1195 let (base_start, base_len) = base[k];
1196 let base_start_u64 = base_start as u64;
1197 let base_end = base_start_u64 + base_len as u64;
1198
1199 if base_start_u64 >= cur_end {
1200 break;
1201 }
1202
1203 if base_start_u64 > start {
1204 res.push((start as PageId, (base_start_u64 - start) as u32));
1205 }
1206
1207 if base_end >= cur_end {
1208 start = cur_end;
1209 break;
1210 }
1211
1212 start = base_end;
1213 k += 1;
1214 }
1215
1216 if start < cur_end {
1217 res.push((start as PageId, (cur_end - start) as u32));
1218 }
1219 }
1220
1221 res
1222 }
1223
1224 fn max_tree_page_id(store: &dyn PageStore, root_id: PageId) -> Result<PageId> {
1225 if root_id == 0 {
1226 return Ok(0);
1227 }
1228
1229 let mut max_pid = root_id;
1230 let mut stack = vec![root_id];
1231
1232 while let Some(pid) = stack.pop() {
1233 if pid > max_pid {
1234 max_pid = pid;
1235 }
1236 let node = store.load_node(pid)?;
1237 if node.is_leaf() {
1238 for i in 0..node.num_children() {
1239 let slot = node.slot_at(i);
1240 if !slot.is_inline() {
1241 let pages = node.collect_page_ids(store, slot)?;
1242 for page in pages {
1243 if page > max_pid {
1244 max_pid = page;
1245 }
1246 }
1247 }
1248 }
1249 } else {
1250 for i in 0..node.num_children() {
1251 let child = node.child_at(i);
1252 if child != 0 {
1253 stack.push(child);
1254 }
1255 }
1256 }
1257 }
1258
1259 Ok(max_pid)
1260 }
1261
1262 pub fn view<F, R>(&self, bucket: &str, f: F) -> Result<R>
1268 where
1269 F: FnOnce(&ReadOnlyTxn) -> Result<R>,
1270 {
1271 let lock = self.writer_lock.read();
1272
1273 let (mut latest_seq, mut latest_root) = self.store.shared_snapshot();
1275 let seq_changed = latest_seq != self.start_seq.load(Ordering::Acquire);
1276 if seq_changed {
1277 let snapshot = self.store.refresh_sb()?;
1278 self.store.clear_cache();
1280 self.logical_store.clear_lid_cache();
1281 *self.catalog_tree.root_page_id.write() = snapshot.catalog_root;
1282 *self.mapping_tree.root_page_id.write() = snapshot.mapping_root;
1283 *self.reverse_tree.root_page_id.write() = snapshot.reverse_root;
1284 self.start_root_id
1285 .store(snapshot.catalog_root, Ordering::Release);
1286 self.start_seq.store(snapshot.seq, Ordering::Release);
1287 self.bucket_root_cache.write().clear();
1288 self.bucket_tree_cache.write().clear();
1289 latest_seq = snapshot.seq;
1290 latest_root = snapshot.catalog_root;
1291 }
1292
1293 let name_bytes = bucket.as_bytes();
1294 let cached_tree =
1295 self.bucket_tree_cache
1296 .read()
1297 .get(name_bytes)
1298 .and_then(|(_root, seq, tree)| {
1299 if *seq == latest_seq {
1300 Some(tree.clone())
1301 } else {
1302 None
1303 }
1304 });
1305 if let Some(tree) = cached_tree {
1306 let txn = ReadOnlyTxn { tree, _guard: lock };
1307 return f(&txn);
1308 }
1309
1310 let cached_root = self
1311 .bucket_root_cache
1312 .read()
1313 .get(name_bytes)
1314 .and_then(|(root, seq)| {
1315 if *seq == latest_seq {
1316 Some(*root)
1317 } else {
1318 None
1319 }
1320 });
1321
1322 let bucket_root = if let Some(root) = cached_root {
1323 root
1324 } else {
1325 let logical_store_obj: Arc<dyn PageStore> = self.logical_store.clone();
1327 let catalog = Tree::open(
1328 logical_store_obj,
1329 Arc::new(RwLock::new(latest_root)),
1330 Arc::new(RwLock::new(Vec::new())),
1331 Arc::new(RwLock::new(HashSet::new())),
1332 )?;
1333
1334 let metadata_bytes = catalog.get(name_bytes)?;
1335 let metadata = BucketMetadata::from_slice(&metadata_bytes);
1336 let root = metadata.root_page_id;
1337 self.bucket_root_cache
1338 .write()
1339 .insert(name_bytes.to_vec(), (root, latest_seq));
1340 root
1341 };
1342
1343 let logical_store_obj: Arc<dyn PageStore> = self.logical_store.clone();
1344 let tree = Arc::new(ReadOnlyTree::new(logical_store_obj, bucket_root)?);
1345 self.bucket_tree_cache
1346 .write()
1347 .insert(name_bytes.to_vec(), (bucket_root, latest_seq, tree.clone()));
1348
1349 let txn = ReadOnlyTxn { tree, _guard: lock };
1350
1351 f(&txn)
1352 }
1353
1354 pub fn del_bucket<N>(&self, name: N) -> Result<()>
1356 where
1357 N: AsRef<str>,
1358 {
1359 let _lock = self.writer_lock.write();
1360
1361 self.refresh_internal()?;
1363
1364 let name_bytes = name.as_ref().as_bytes();
1365 let metadata_bytes = self.catalog_tree.get(name_bytes)?;
1366 let bucket_metadata = BucketMetadata::from_slice(&metadata_bytes);
1367
1368 let mut pages_to_free = Vec::new();
1369 if bucket_metadata.root_page_id != 0 {
1370 Tree::collect_tree_pages(
1371 self.logical_store.as_ref(),
1372 bucket_metadata.root_page_id,
1373 &mut pages_to_free,
1374 )?;
1375 }
1376
1377 self.catalog_tree.del(name_bytes)?;
1378 self.pending_free.write().extend(pages_to_free);
1379 self.commit_internal()
1380 }
1381
1382 fn commit_internal(&self) -> Result<()> {
1383 let start_seq = self.start_seq.load(Ordering::Acquire);
1384 let (latest_seq, _) = self.store.shared_snapshot();
1385 if latest_seq != start_seq {
1386 return Err(Error::Conflict);
1387 }
1388 let snapshot = self.store.cached_snapshot();
1389
1390 let catalog_root = *self.catalog_tree.root_page_id.read();
1391 let mapping_root = *self.mapping_tree.root_page_id.read();
1392 let reverse_root = *self.reverse_tree.root_page_id.read();
1393
1394 let mut freed_lock = self.pending_free.write();
1395 let mut alloc_lock = self.pending_alloc.write();
1396
1397 if freed_lock.is_empty()
1398 && alloc_lock.is_empty()
1399 && snapshot.catalog_root == catalog_root
1400 && snapshot.mapping_root == mapping_root
1401 && snapshot.reverse_root == reverse_root
1402 {
1403 return Ok(());
1404 }
1405
1406 self.store
1407 .commit_roots(catalog_root, mapping_root, reverse_root, &freed_lock)?;
1408 self.store.sync()?;
1409
1410 freed_lock.clear();
1411 alloc_lock.clear();
1412 self.start_root_id.store(catalog_root, Ordering::Release);
1413 self.start_seq
1414 .store(self.store.get_seq(), Ordering::Release);
1415 Ok(())
1416 }
1417
1418 pub fn commit(&self) -> Result<()> {
1420 let _lock = self.writer_lock.write();
1421 self.commit_internal()
1422 }
1423
1424 fn compact_tail_window(
1425 total_pages: PageId,
1426 target_bytes: u64,
1427 ) -> Option<(PageId, PageId, u64)> {
1428 let usable_pages = total_pages.saturating_sub(2);
1429 if usable_pages == 0 {
1430 return None;
1431 }
1432
1433 let usable_pages_u64 = usable_pages as u64;
1434 let mut target_pages_u64 = if usable_pages_u64 <= COMPACT_SMALL_DATA_THRESHOLD_PAGES {
1435 usable_pages_u64
1436 } else if target_bytes == 0 {
1437 ((usable_pages_u64 as f64) * COMPACT_TAIL_RATIO).ceil() as u64
1438 } else {
1439 target_bytes.saturating_add(PAGE_SIZE as u64 - 1) / PAGE_SIZE as u64
1440 };
1441 if target_pages_u64 == 0 {
1442 return None;
1443 }
1444 if target_pages_u64 > usable_pages_u64 {
1445 target_pages_u64 = usable_pages_u64;
1446 }
1447
1448 let target_pages = target_pages_u64 as PageId;
1449 let tail_start = total_pages.saturating_sub(target_pages).max(2);
1450 Some((tail_start, target_pages, target_pages_u64))
1451 }
1452
1453 fn compact_move_tail(
1454 &self,
1455 total_pages: PageId,
1456 tail_start: PageId,
1457 target_pages_u64: u64,
1458 prealloc: Option<&[PageId]>,
1459 ) -> Result<(u64, u64, usize)> {
1460 let mut moved = 0u64;
1461 let mut total_candidates = 0u64;
1462 let mut prealloc_idx = 0usize;
1463 let mut iter = self.reverse_tree.iterator_from(&encode_u32_key(tail_start));
1464 let mut key_buf = Vec::new();
1465 let mut val_buf = Vec::new();
1466 let physical_store: &dyn PageStore = self.store.as_ref();
1467
1468 while iter.next_ref(&mut key_buf, &mut val_buf) {
1469 let pid = decode_u32_key(&key_buf)?;
1470 if pid < tail_start {
1471 continue;
1472 }
1473 if pid >= total_pages {
1474 break;
1475 }
1476 let lid = decode_u32_key(&val_buf)?;
1477 total_candidates += 1;
1478 if moved >= target_pages_u64 {
1479 continue;
1480 }
1481
1482 let new_pid = if let Some(pids) = prealloc {
1483 if prealloc_idx >= pids.len() {
1484 return Err(Error::Internal);
1485 }
1486 let pid = pids[prealloc_idx];
1487 prealloc_idx += 1;
1488 pid
1489 } else {
1490 let mut alloc = self.pending_alloc.write();
1491 physical_store.alloc_page(&mut alloc)?
1492 };
1493
1494 let page = self.store.load_page(pid)?;
1495 self.store.write_page(new_pid, &page)?;
1496
1497 let lid_key = encode_u32_key(lid);
1498 let new_pid_key = encode_u32_key(new_pid);
1499 let old_pid_key = encode_u32_key(pid);
1500
1501 self.mapping_tree.put(&lid_key, &new_pid_key)?;
1502 self.reverse_tree.del(&old_pid_key)?;
1503 self.reverse_tree.put(&new_pid_key, &lid_key)?;
1504
1505 {
1506 let mut freed = self.pending_free.write();
1507 physical_store.schedule_free(pid, &mut freed)?;
1508 }
1509
1510 moved += 1;
1511 }
1512
1513 Ok((moved, total_candidates, prealloc_idx))
1514 }
1515
1516 fn compact_tail_live_pages(&self, total_pages: PageId, tail_start: PageId) -> Result<u64> {
1517 let mut total_candidates = 0u64;
1518 let mut iter = self.reverse_tree.iterator_from(&encode_u32_key(tail_start));
1519 let mut key_buf = Vec::new();
1520 let mut val_buf = Vec::new();
1521
1522 while iter.next_ref(&mut key_buf, &mut val_buf) {
1523 let pid = decode_u32_key(&key_buf)?;
1524 if pid < tail_start {
1525 continue;
1526 }
1527 if pid >= total_pages {
1528 break;
1529 }
1530 total_candidates += 1;
1531 }
1532
1533 Ok(total_candidates)
1534 }
1535
1536 fn compact_release_unused_prealloc(&self, prealloc: &[PageId], used: usize) -> Result<()> {
1537 if used >= prealloc.len() {
1538 return Ok(());
1539 }
1540
1541 let unused: Vec<PageId> = prealloc[used..].to_vec();
1542 {
1543 let mut alloc = self.pending_alloc.write();
1544 for pid in &unused {
1545 alloc.remove(pid);
1546 }
1547 }
1548
1549 for pid in unused {
1550 self.store.free_pages(pid, 1)?;
1551 }
1552
1553 Ok(())
1554 }
1555
1556 pub fn compact(&self, target_bytes: u64) -> Result<CompactStats> {
1562 let _lock = self.writer_lock.write();
1563
1564 self.refresh_internal()?;
1565
1566 let total_pages = self.store.get_next_page_id();
1567 let (tail_start, _target_pages, target_pages_u64) =
1568 if let Some(params) = Self::compact_tail_window(total_pages, target_bytes) {
1569 params
1570 } else {
1571 return Ok(CompactStats {
1572 moved_pages: 0,
1573 remaining_candidates: 0,
1574 });
1575 };
1576 if target_pages_u64 == 0 {
1577 return Ok(CompactStats {
1578 moved_pages: 0,
1579 remaining_candidates: 0,
1580 });
1581 }
1582 let total_candidates = self.compact_tail_live_pages(total_pages, tail_start)?;
1583 if total_candidates == 0 {
1584 return Ok(CompactStats {
1585 moved_pages: 0,
1586 remaining_candidates: 0,
1587 });
1588 }
1589 let planned_moves = total_candidates.min(target_pages_u64);
1590 let strict_no_growth = target_bytes == 0;
1593
1594 let pre_alloc = self.pending_alloc.read().clone();
1595 let pre_free = self.pending_free.read().clone();
1596 let pre_catalog_root = *self.catalog_tree.root_page_id.read();
1597 let pre_mapping_root = *self.mapping_tree.root_page_id.read();
1598 let pre_reverse_root = *self.reverse_tree.root_page_id.read();
1599
1600 let mut prealloc = None;
1601 if strict_no_growth && self.store.free_pages_below(tail_start) < planned_moves {
1602 return Ok(CompactStats {
1603 moved_pages: 0,
1604 remaining_candidates: total_candidates,
1605 });
1606 }
1607 if let Some(pids) = self
1608 .store
1609 .alloc_pages_below(tail_start, planned_moves as PageId)?
1610 {
1611 let mut alloc = self.pending_alloc.write();
1612 for pid in &pids {
1613 alloc.insert(*pid);
1614 }
1615 prealloc = Some(pids);
1616 }
1617 if strict_no_growth && prealloc.is_none() {
1618 return Ok(CompactStats {
1619 moved_pages: 0,
1620 remaining_candidates: total_candidates,
1621 });
1622 }
1623 let move_budget = if strict_no_growth {
1624 planned_moves
1625 } else {
1626 target_pages_u64
1627 };
1628
1629 let move_result =
1630 self.compact_move_tail(total_pages, tail_start, move_budget, prealloc.as_deref());
1631
1632 let (moved, _scanned_candidates, used_prealloc) = match move_result {
1633 Ok(res) => res,
1634 Err(e) => {
1635 *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1636 *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1637 *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1638 self.rollback_pages(&pre_alloc, &pre_free);
1639 return Err(e);
1640 }
1641 };
1642
1643 if let Some(prealloc) = prealloc.as_ref()
1644 && let Err(e) = self.compact_release_unused_prealloc(prealloc, used_prealloc)
1645 {
1646 *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1647 *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1648 *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1649 self.rollback_pages(&pre_alloc, &pre_free);
1650 return Err(e);
1651 }
1652
1653 if let Err(e) = self.commit_internal() {
1654 *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1655 *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1656 *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1657 self.rollback_pages(&pre_alloc, &pre_free);
1658 return Err(e);
1659 }
1660
1661 let max_reverse_pid = {
1662 let mut max_pid = 0u32;
1663 let mut iter = self.reverse_tree.iterator();
1664 let mut key_buf = Vec::new();
1665 let mut val_buf = Vec::new();
1666 while iter.next_ref(&mut key_buf, &mut val_buf) {
1667 let pid = decode_u32_key(&key_buf)?;
1668 if pid > max_pid {
1669 max_pid = pid;
1670 }
1671 }
1672 max_pid
1673 };
1674 let max_mapping_pid =
1675 Self::max_tree_page_id(self.store.as_ref(), *self.mapping_tree.root_page_id.read())?;
1676 let max_reverse_tree_pid =
1677 Self::max_tree_page_id(self.store.as_ref(), *self.reverse_tree.root_page_id.read())?;
1678 let max_freelist_pid = self.store.max_freelist_page_id();
1679 let mut min_end = max_reverse_pid
1680 .max(max_mapping_pid)
1681 .max(max_reverse_tree_pid)
1682 .max(max_freelist_pid)
1683 .saturating_add(1);
1684 if min_end < 2 {
1685 min_end = 2;
1686 }
1687
1688 let _ = self.store.try_truncate_tail_with_floor(min_end)?;
1689 self.start_seq
1690 .store(self.store.get_seq(), Ordering::Release);
1691 self.logical_store.clear_lid_cache();
1692
1693 Ok(CompactStats {
1694 moved_pages: moved,
1695 remaining_candidates: total_candidates.saturating_sub(moved),
1696 })
1697 }
1698
1699 fn refresh_internal(&self) -> Result<()> {
1700 self.pending_free.write().clear();
1701 self.pending_alloc.write().clear();
1702
1703 let (latest_seq, _) = self.store.shared_snapshot();
1705 if latest_seq == self.start_seq.load(Ordering::Acquire) {
1706 return Ok(());
1707 }
1708
1709 self.store.clear_cache();
1710 self.logical_store.clear_lid_cache();
1711
1712 let snapshot = self.store.refresh_sb()?;
1713 *self.catalog_tree.root_page_id.write() = snapshot.catalog_root;
1714 *self.mapping_tree.root_page_id.write() = snapshot.mapping_root;
1715 *self.reverse_tree.root_page_id.write() = snapshot.reverse_root;
1716 self.start_root_id
1717 .store(snapshot.catalog_root, Ordering::Release);
1718 self.start_seq.store(snapshot.seq, Ordering::Release);
1719 Ok(())
1720 }
1721
1722 pub fn buckets(&self) -> Result<Vec<String>> {
1724 let _lock = self.writer_lock.read();
1725
1726 let snapshot = self.store.refresh_sb()?;
1728 let physical_store: Arc<dyn PageStore> = self.store.clone();
1729 let mapping_tree = Arc::new(Tree::open(
1730 physical_store.clone(),
1731 Arc::new(RwLock::new(snapshot.mapping_root)),
1732 Arc::new(RwLock::new(Vec::new())),
1733 Arc::new(RwLock::new(HashSet::new())),
1734 )?);
1735 let reverse_tree = Arc::new(Tree::open(
1736 physical_store,
1737 Arc::new(RwLock::new(snapshot.reverse_root)),
1738 Arc::new(RwLock::new(Vec::new())),
1739 Arc::new(RwLock::new(HashSet::new())),
1740 )?);
1741 let logical_store = Arc::new(LogicalStore::new(
1742 self.store.clone(),
1743 mapping_tree,
1744 reverse_tree,
1745 ));
1746 let logical_store_obj: Arc<dyn PageStore> = logical_store;
1747 let catalog = Tree::open(
1748 logical_store_obj,
1749 Arc::new(RwLock::new(snapshot.catalog_root)),
1750 Arc::new(RwLock::new(Vec::new())),
1751 Arc::new(RwLock::new(HashSet::new())),
1752 )?;
1753
1754 let mut iter = catalog.iterator();
1755 let mut key_buf = Vec::new();
1756 let mut val_buf = Vec::new();
1757 let mut res = Vec::new();
1758 while iter.next_ref(&mut key_buf, &mut val_buf) {
1759 if let Ok(s) = std::str::from_utf8(&key_buf) {
1760 res.push(s.to_string());
1761 }
1762 }
1763 Ok(res)
1764 }
1765
1766 #[doc(hidden)]
1769 pub fn current_seq(&self) -> u64 {
1770 self.store.get_seq()
1771 }
1772
1773 #[doc(hidden)]
1776 pub fn pending_pages(&self) -> (usize, usize) {
1777 (
1778 self.pending_alloc.read().len(),
1779 self.pending_free.read().len(),
1780 )
1781 }
1782}
1783
1784impl Clone for BTree {
1785 fn clone(&self) -> Self {
1787 let catalog_root = *self.catalog_tree.root_page_id.read();
1788 let mapping_root = *self.mapping_tree.root_page_id.read();
1789 let reverse_root = *self.reverse_tree.root_page_id.read();
1790 let start_root_id = self.start_root_id.load(Ordering::Acquire);
1791 let start_seq = self.start_seq.load(Ordering::Acquire);
1792
1793 let physical_store: Arc<dyn PageStore> = self.store.clone();
1794 let mapping_tree = Arc::new(
1795 Tree::open(
1796 physical_store.clone(),
1797 Arc::new(RwLock::new(mapping_root)),
1798 self.pending_free.clone(),
1799 self.pending_alloc.clone(),
1800 )
1801 .expect("failed to clone mapping"),
1802 );
1803 let reverse_tree = Arc::new(
1804 Tree::open(
1805 physical_store,
1806 Arc::new(RwLock::new(reverse_root)),
1807 self.pending_free.clone(),
1808 self.pending_alloc.clone(),
1809 )
1810 .expect("failed to clone reverse"),
1811 );
1812 let logical_store = Arc::new(LogicalStore::new(
1813 self.store.clone(),
1814 mapping_tree.clone(),
1815 reverse_tree.clone(),
1816 ));
1817 let logical_store_obj: Arc<dyn PageStore> = logical_store.clone();
1818 let catalog_tree = Arc::new(
1819 Tree::open(
1820 logical_store_obj,
1821 Arc::new(RwLock::new(catalog_root)),
1822 self.pending_free.clone(),
1823 self.pending_alloc.clone(),
1824 )
1825 .expect("failed to clone catalog"),
1826 );
1827
1828 Self {
1829 store: self.store.clone(),
1830 catalog_tree,
1831 mapping_tree,
1832 reverse_tree,
1833 logical_store,
1834 pending_free: self.pending_free.clone(),
1835 pending_alloc: self.pending_alloc.clone(),
1836 writer_lock: self.writer_lock.clone(),
1837 start_root_id: Arc::new(AtomicU32::new(start_root_id)),
1838 start_seq: Arc::new(AtomicU64::new(start_seq)),
1839 bucket_root_cache: self.bucket_root_cache.clone(),
1840 bucket_tree_cache: self.bucket_tree_cache.clone(),
1841 instance_anchor: self.instance_anchor.clone(),
1842 }
1843 }
1844}