Skip to main content

btree_store/
lib.rs

1use std::{
2    collections::HashMap,
3    collections::HashSet,
4    fmt, io,
5    path::{Path, PathBuf},
6    sync::{
7        Arc, OnceLock, Weak,
8        atomic::{AtomicU32, AtomicU64, Ordering},
9    },
10};
11
12use parking_lot::{Mutex, RwLock, RwLockReadGuard};
13
14#[cfg(feature = "ffi")]
15mod ffi;
16pub(crate) mod node;
17pub(crate) mod page_store;
18pub(crate) mod store;
19
/// Error type returned by every fallible operation in this crate.
///
/// The type is `Copy` and carries no payload; conversions from richer error types
/// (such as [`io::Error`]) discard the original details.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error {
    /// The requested entry does not exist (e.g. a lookup on an empty tree).
    NotFound,
    /// Persistent data failed validation, e.g. a meta-page checksum mismatch or a
    /// branch node that points at page 0.
    Corruption,
    /// An input exceeded a size limit, e.g. a key longer than `MAX_KEY_LEN`.
    TooLarge,
    /// Internal failure. NOTE(review): no producer is visible in this file — confirm meaning.
    Internal,
    /// A node has no room for an entry; `Tree::apply_insert` reacts by splitting the node.
    NoSpace,
    /// Any underlying I/O failure (produced by the `From<io::Error>` conversion).
    IoError,
    /// Presumably an invalid argument or state. NOTE(review): producer not visible here.
    Invalid,
    /// Presumably a duplicate entry. NOTE(review): producer not visible here.
    Duplicate,
    /// Presumably a conflicting concurrent change. NOTE(review): producer not visible here.
    Conflict,
}
32
33impl fmt::Display for Error {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        write!(f, "{:?}", self)
36    }
37}
38
39impl std::error::Error for Error {}
40
41impl From<io::Error> for Error {
42    fn from(_: io::Error) -> Self {
43        Error::IoError
44    }
45}
46
/// Crate-wide result alias using [`Error`].
pub type Result<T> = std::result::Result<T, Error>;

/// Identifier of a page. Throughout this file page id 0 doubles as the
/// "no page / empty tree" sentinel.
pub type PageId = u32;

/// Magic number written into every new meta page; its hex digits spell ASCII "cowbtree".
pub const MAGIC: u64 = 0x636f776274726565;
/// On-disk format version written into new meta pages by `MetaNode::new`.
pub const FORMAT_VERSION: u32 = 3;

/// Process-wide registry of open databases keyed by normalized path.
/// Entries are `Weak`, so the registry by itself never keeps an instance alive.
static BTREE_INSTANCE_REGISTRY: OnceLock<Mutex<HashMap<PathBuf, Weak<BTree>>>> = OnceLock::new();
55
56fn btree_instance_registry() -> &'static Mutex<HashMap<PathBuf, Weak<BTree>>> {
57    BTREE_INSTANCE_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
58}
59
60fn sweep_dead_btree_instances(reg: &mut HashMap<PathBuf, Weak<BTree>>) {
61    reg.retain(|_, weak| weak.strong_count() > 0);
62}
63
/// Produces a stable registry key for a database path.
///
/// An existing path is fully canonicalized. A path that does not exist yet is first
/// made absolute (joined onto the current directory when that can be determined).
/// If its parent directory exists, the parent is canonicalized and the final component
/// re-attached. Otherwise the absolute path is returned unchanged.
fn normalize_db_path(path: &Path) -> PathBuf {
    std::fs::canonicalize(path).unwrap_or_else(|_| {
        let absolute = if path.is_absolute() {
            path.to_path_buf()
        } else {
            std::env::current_dir().map_or_else(|_| path.to_path_buf(), |cwd| cwd.join(path))
        };

        // The file itself is missing (e.g. about to be created); resolve its directory.
        let resolved_parent = absolute
            .parent()
            .and_then(|dir| std::fs::canonicalize(dir).ok());
        if let (Some(dir), Some(name)) = (resolved_parent, absolute.file_name()) {
            return dir.join(name);
        }
        absolute
    })
}
87
/// In-memory image of a database meta page.
///
/// `#[repr(C)]` fixes the field layout because the struct is copied to and from disk as
/// raw bytes (`as_page_slice` / `from_slice`), and its checksum is computed over those
/// bytes. With the current field types (8+4+4+6*4+8+8 = 56 bytes) there is no padding.
#[repr(C)]
#[derive(Clone, Copy, Debug)]
pub struct MetaNode {
    /// Set to [`MAGIC`] by `MetaNode::new`.
    pub magic: u64,
    /// On-disk format version ([`FORMAT_VERSION`] for newly created pages).
    pub format_version: u32,
    /// Page size the database was created with (`PAGE_SIZE` for new pages).
    pub page_size: u32,
    /// Root page of the catalog tree (0 = empty).
    pub catalog_root: PageId,
    /// Root page of the mapping tree (0 = empty).
    pub mapping_root: PageId,
    /// Root page of the reverse mapping tree (0 = empty).
    pub reverse_root: PageId,
    /// Next logical id to hand out; starts at 1. NOTE(review): consumer not visible here.
    pub next_lid: PageId,
    /// Next never-used page id; starts at 2 so the two meta pages are skipped.
    pub next_page_id: PageId,
    /// Root of the free-list structure (0 = none). NOTE(review): format not visible here.
    pub freelist_root: PageId,
    /// Meta sequence number; starts at 1. NOTE(review): presumably used to pick the
    /// newer of the two meta pages — confirm in `store`.
    pub seq: u64,
    /// CRC32C (widened to `u64`) of the struct bytes computed with this field zeroed.
    pub checksum: u64,
}
103
104impl MetaNode {
105    pub fn as_page_slice(&self) -> [u8; PAGE_SIZE] {
106        let mut buf = [0u8; PAGE_SIZE];
107        let src = unsafe {
108            std::slice::from_raw_parts(
109                (self as *const Self) as *const u8,
110                std::mem::size_of::<Self>(),
111            )
112        };
113        buf[..src.len()].copy_from_slice(src);
114        buf
115    }
116
117    pub fn from_slice(x: &[u8]) -> Self {
118        unsafe { std::ptr::read_unaligned(x.as_ptr().cast::<Self>()) }
119    }
120}
121
122impl Default for MetaNode {
123    fn default() -> Self {
124        Self::new()
125    }
126}
127
128impl MetaNode {
129    pub fn new() -> Self {
130        let mut this = Self {
131            magic: MAGIC,
132            format_version: FORMAT_VERSION,
133            page_size: PAGE_SIZE as u32,
134            catalog_root: 0,
135            mapping_root: 0,
136            reverse_root: 0,
137            next_lid: 1,
138            next_page_id: 2, // skip two meta pages
139            freelist_root: 0,
140            seq: 1,
141            checksum: 0,
142        };
143        this.update_checksum();
144        this
145    }
146
147    pub fn update_checksum(&mut self) {
148        self.checksum = 0;
149        let s = unsafe {
150            std::slice::from_raw_parts(
151                (self as *const Self) as *const u8,
152                std::mem::size_of::<Self>(),
153            )
154        };
155        self.checksum = crc32c::crc32c(s) as u64;
156    }
157
158    fn calc_checksum(&self) -> u64 {
159        let mut clone = *self;
160        clone.checksum = 0;
161        let s = unsafe {
162            std::slice::from_raw_parts(
163                (&clone as *const Self) as *const u8,
164                std::mem::size_of::<Self>(),
165            )
166        };
167        crc32c::crc32c(s) as u64
168    }
169
170    pub fn validate(&self) -> Result<()> {
171        // Torn write detection: treat an all-zero meta page as invalid.
172        if self.magic == 0 && self.seq == 0 {
173            return Err(Error::Corruption);
174        }
175        if self.checksum != self.calc_checksum() {
176            return Err(Error::Corruption);
177        }
178        Ok(())
179    }
180}
181
182use crate::{
183    node::{MAX_KEY_LEN, Node, PAGE_SIZE},
184    page_store::{LogicalStore, PageStore, decode_u32_key, encode_u32_key},
185    store::Store,
186};
187
/// One step of a root-to-leaf descent, recorded so a write can rebuild the path
/// bottom-up with copy-on-write.
struct Route {
    /// The shared, unmodified node visited at this level.
    node: Arc<Node>,
    /// Page the node was loaded from; freed once its COW replacement is written.
    page_id: PageId,
    /// Slot index in `node` whose child was followed.
    pos: usize,
}
193
/// Encapsulates page allocation and freeing during a transaction, ensuring COW safety.
///
/// Borrows the operation-local buffers so every page touched by a single `put`/`del`
/// is recorded; `Tree` later merges them (on success) or reclaims them (on failure).
struct TxContext<'a> {
    /// Store pages are allocated from and written to.
    store: &'a dyn PageStore,
    /// Extents scheduled for freeing, as `(first page, page count)`.
    freed: &'a mut Vec<(PageId, u32)>,
    /// Pages allocated during the current operation.
    alloc: &'a mut HashSet<PageId>,
}
200
201impl<'a> TxContext<'a> {
202    fn new(
203        store: &'a dyn PageStore,
204        freed: &'a mut Vec<(PageId, u32)>,
205        alloc: &'a mut HashSet<PageId>,
206    ) -> Self {
207        Self {
208            store,
209            freed,
210            alloc,
211        }
212    }
213
214    fn alloc_page(&mut self) -> Result<PageId> {
215        self.store.alloc_page(self.alloc)
216    }
217
218    fn write_node(&mut self, node: &mut Node) -> Result<PageId> {
219        let pid = self.alloc_page()?;
220        self.store.write_data(&[pid], node.finalize())?;
221        Ok(pid)
222    }
223
224    fn free_page(&mut self, pid: PageId) -> Result<()> {
225        self.store.schedule_free(pid, self.freed)
226    }
227}
228
/// Copy-on-write B-tree handle over a [`PageStore`]; values live in leaf nodes.
///
/// Every mutation writes new copies of the nodes it touches and schedules the old pages
/// for freeing; the root pointer is replaced while `root_page_id`'s write lock is held.
pub struct Tree {
    /// Backing page store (physical or logical).
    store: Arc<dyn PageStore>,
    /// Current root page; 0 means the tree is empty. Shared (`Arc`) with the owner,
    /// which may read or reset it.
    pub root_page_id: Arc<RwLock<PageId>>,
    /// Freed extents `(first page, count)` shared by all trees of one database,
    /// coalesced via `merge_free_extent`.
    pending_free: Arc<RwLock<Vec<(PageId, u32)>>>,
    /// Pages allocated by not-yet-committed operations, shared by all trees of one database.
    pending_alloc: Arc<RwLock<HashSet<PageId>>>,
}
235
236impl Tree {
237    pub fn open(
238        store: Arc<dyn PageStore>,
239        root_page_id: Arc<RwLock<PageId>>,
240        pending_free: Arc<RwLock<Vec<(PageId, u32)>>>,
241        pending_alloc: Arc<RwLock<HashSet<PageId>>>,
242    ) -> Result<Self> {
243        Ok(Self {
244            store,
245            root_page_id,
246            pending_free,
247            pending_alloc,
248        })
249    }
250
251    fn traverse_to_leaf(
252        &self,
253        mut node: Arc<Node>,
254        mut page_id: PageId,
255        key: &[u8],
256    ) -> Result<(Vec<Route>, Arc<Node>, PageId)> {
257        let mut stack = Vec::new();
258        while !node.is_leaf() {
259            let pos = match node.search(key) {
260                Ok(pos) => pos,
261                Err(pos) => pos.saturating_sub(1),
262            };
263            let child_id = node.child_at(pos);
264            if child_id == 0 {
265                return Err(Error::Corruption);
266            }
267            let child_node = self.store.load_node(child_id)?;
268            stack.push(Route { node, page_id, pos });
269            node = child_node;
270            page_id = child_id;
271        }
272        Ok((stack, node, page_id))
273    }
274
275    // put tmp pids to global containers
276    fn merge_pending(&self, freed: Vec<(PageId, u32)>, mut alloc: HashSet<PageId>) {
277        let mut main_free = self.pending_free.write();
278        let mut main_alloc = self.pending_alloc.write();
279
280        for (pid, nr) in freed {
281            if alloc.remove(&pid) {
282                // If it was allocated and freed in the same operation, it's a transient COW node.
283                // It's safe to return it to the store immediately for reuse within this txn.
284                let _ = self.store.free_pages_immediate(pid, nr);
285                continue;
286            }
287            if main_alloc.remove(&pid) {
288                // Previously allocated in this same transaction, now replaced by a newer COW version.
289                let _ = self.store.free_pages_immediate(pid, nr);
290            } else {
291                Self::merge_free_extent(&mut main_free, pid, nr);
292            }
293        }
294        main_alloc.extend(alloc);
295    }
296
297    fn merge_free_extent(free: &mut Vec<(PageId, u32)>, page_id: PageId, nr_pages: u32) {
298        if page_id == 0 || nr_pages == 0 {
299            return;
300        }
301
302        let mut start = page_id as u64;
303        let mut end = start + nr_pages as u64;
304        let mut idx = 0;
305
306        while idx < free.len() && (free[idx].0 as u64) + (free[idx].1 as u64) < start {
307            idx += 1;
308        }
309
310        while idx < free.len() {
311            let (free_start, free_len) = free[idx];
312            let free_start = free_start as u64;
313            let free_end = free_start + free_len as u64;
314            if free_start > end {
315                break;
316            }
317            start = start.min(free_start);
318            end = end.max(free_end);
319            free.remove(idx);
320        }
321
322        free.insert(idx, (start as PageId, (end - start) as u32));
323    }
324
325    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> {
326        if key.len() > MAX_KEY_LEN {
327            return Err(Error::TooLarge);
328        }
329        // use local buffers to reduce lock contention and keep partial changes local until success
330        let mut freed = Vec::new();
331        let mut alloc = HashSet::new();
332
333        let result = self.execute_put(key, value, &mut freed, &mut alloc);
334
335        if result.is_err() {
336            // reclaim pages allocated during this failed operation to prevent leak
337            for pid in alloc {
338                let _ = self.store.free_pages_immediate(pid, 1);
339            }
340            return result;
341        }
342
343        self.merge_pending(freed, alloc);
344        Ok(())
345    }
346
347    fn execute_put(
348        &self,
349        key: &[u8],
350        value: &[u8],
351        freed: &mut Vec<(PageId, u32)>,
352        alloc: &mut HashSet<PageId>,
353    ) -> Result<()> {
354        let mut ctx = TxContext::new(self.store.as_ref(), freed, alloc);
355        let mut root_lock = self.root_page_id.write();
356        let current_root_id = *root_lock;
357
358        // root is empty
359        if current_root_id == 0 {
360            let mut node = Node::new_leaf();
361            node.put(ctx.store, key, value, ctx.freed, ctx.alloc)?;
362            *root_lock = ctx.write_node(&mut node)?;
363            return Ok(());
364        }
365
366        // 1. find target leaf node
367        let root_node = self.store.load_node(current_root_id)?;
368        let (mut stack, leaf_node_arc, leaf_id) =
369            self.traverse_to_leaf(root_node, current_root_id, key)?;
370
371        let mut current_node = (*leaf_node_arc).clone();
372
373        // 2. modify leaf node and get split info (if any)
374        let mut split_info = self.apply_insert(&mut ctx, &mut current_node, key, value)?;
375
376        // write new COW leaf node
377        let mut new_child_id = ctx.write_node(&mut current_node)?;
378        ctx.free_page(leaf_id)?;
379
380        // 3. backtrack up the path, propagating changes and splits
381        while let Some(Route {
382            node: parent_arc,
383            page_id: parent_id,
384            pos,
385        }) = stack.pop()
386        {
387            let mut parent = (*parent_arc).clone();
388
389            // point the current slot to the newly created COW node
390            parent.update_child_page(pos, new_child_id);
391
392            // if the previous child node split, insert the right-hand side (rhs) into the current parent
393            if let Some((sep, mut rhs)) = split_info.take() {
394                let rhs_id = ctx.write_node(&mut rhs)?;
395                // inserting a separator may cause the parent node to split as well
396                split_info =
397                    self.apply_insert(&mut ctx, &mut parent, &sep, &rhs_id.to_le_bytes())?;
398            }
399
400            // write new COW parent node and prepare for the next level up
401            new_child_id = ctx.write_node(&mut parent)?;
402            ctx.free_page(parent_id)?;
403        }
404
405        // 4. handle root node split
406        if let Some((sep, mut rhs)) = split_info {
407            let rhs_id = ctx.write_node(&mut rhs)?;
408            let mut new_root = Node::new_branch();
409            // left child points to the COW version of the old root; its key is always empty in branch nodes
410            new_root.put(
411                ctx.store,
412                &[],
413                &new_child_id.to_le_bytes(),
414                ctx.freed,
415                ctx.alloc,
416            )?;
417            // right child points to the newly split node
418            new_root.put(ctx.store, &sep, &rhs_id.to_le_bytes(), ctx.freed, ctx.alloc)?;
419            *root_lock = ctx.write_node(&mut new_root)?;
420        } else {
421            // root did not split, simply update root pointer
422            *root_lock = new_child_id;
423        }
424
425        Ok(())
426    }
427
428    fn apply_insert(
429        &self,
430        ctx: &mut TxContext,
431        node: &mut Node,
432        key: &[u8],
433        value: &[u8],
434    ) -> Result<Option<(Vec<u8>, Node)>> {
435        match node.put(ctx.store, key, value, ctx.freed, ctx.alloc) {
436            Ok(()) => Ok(None),
437            Err(Error::NoSpace) => {
438                // split to right
439                let (sep, mut rhs) = node.split()?;
440                if key < &sep {
441                    node.put(ctx.store, key, value, ctx.freed, ctx.alloc)?;
442                } else {
443                    rhs.put(ctx.store, key, value, ctx.freed, ctx.alloc)?;
444                }
445                Ok(Some((sep, rhs)))
446            }
447            Err(e) => Err(e),
448        }
449    }
450
451    pub fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
452        let root_id = *self.root_page_id.read();
453        if root_id == 0 {
454            return Err(Error::NotFound);
455        }
456
457        let root_node = self.store.load_node(root_id)?;
458        let mut current = root_node;
459        loop {
460            if current.is_leaf() {
461                return current.get(self.store.as_ref(), key);
462            }
463            let pos = current.child_pos_for_key(key);
464            current = self.store.load_node(current.child_at(pos))?;
465        }
466    }
467
468    pub fn del(&self, key: &[u8]) -> Result<()> {
469        // use local buffers to reduce lock contention and keep partial changes local until success
470        let mut freed = Vec::new();
471        let mut alloc = HashSet::new();
472
473        let result = self.execute_del(key, &mut freed, &mut alloc);
474
475        if result.is_err() {
476            // reclaim pages allocated during this failed operation to prevent leak
477            for pid in alloc {
478                let _ = self.store.free_pages_immediate(pid, 1);
479            }
480            return result;
481        }
482
483        self.merge_pending(freed, alloc);
484        Ok(())
485    }
486
487    fn execute_del(
488        &self,
489        key: &[u8],
490        freed: &mut Vec<(PageId, u32)>,
491        alloc: &mut HashSet<PageId>,
492    ) -> Result<()> {
493        let mut ctx = TxContext::new(self.store.as_ref(), freed, alloc);
494        let mut root_lock = self.root_page_id.write();
495        let current_root_id = *root_lock;
496
497        if current_root_id == 0 {
498            return Err(Error::NotFound);
499        }
500
501        // 1. find target leaf node
502        let root_node = self.store.load_node(current_root_id)?;
503        let (mut stack, leaf_arc, leaf_id) =
504            self.traverse_to_leaf(root_node, current_root_id, key)?;
505
506        let mut current_node = (*leaf_arc).clone();
507        current_node.del(ctx.store, key, ctx.freed)?;
508
509        // 2. handle leaf node changes
510        let mut empty = current_node.is_empty();
511        let mut new_child_id = if !empty {
512            ctx.write_node(&mut current_node)?
513        } else {
514            0
515        };
516        ctx.free_page(leaf_id)?;
517
518        // 3. backtrack up the path, handling parent node updates or shrinks
519        while let Some(Route {
520            node: parent_arc,
521            page_id: parent_id,
522            pos,
523        }) = stack.pop()
524        {
525            let mut parent = (*parent_arc).clone();
526
527            if empty {
528                // if child node became empty, remove corresponding slot from parent
529                // NOTE: merge is triggered when node is empty and thus no elements borrow is required
530                parent.shrink_slot(pos);
531                // special case: if the leftmost child of a branch node is deleted, ensure the new first child has an
532                // empty key
533                if !parent.is_leaf() && pos == 0 && !parent.is_empty() {
534                    parent.slot_at_mut(0).klen = 0;
535                    parent.dirty = true;
536                }
537            } else {
538                // if child node only changed content, update pointer in parent
539                parent.update_child_page(pos, new_child_id);
540            }
541
542            // check if current parent node also becomes empty
543            if parent.is_empty() {
544                empty = true;
545                new_child_id = 0;
546            } else {
547                empty = false;
548                new_child_id = ctx.write_node(&mut parent)?;
549            }
550            ctx.free_page(parent_id)?;
551        }
552
553        // 4. root collapse optimization
554        // if root is a branch node with only one child, elevate child to be the new root
555        if new_child_id != 0 {
556            loop {
557                let node_id = new_child_id;
558                let node = self.store.load_node(node_id)?;
559                if !node.is_leaf() && node.num_children() == 1 {
560                    let child_id = node.child_at(0);
561                    ctx.free_page(node_id)?;
562                    new_child_id = child_id;
563                } else {
564                    break;
565                }
566            }
567        }
568
569        *root_lock = new_child_id;
570        Ok(())
571    }
572
573    pub(crate) fn collect_tree_pages(
574        store: &dyn PageStore,
575        root_id: PageId,
576        freed: &mut Vec<(PageId, u32)>,
577    ) -> Result<()> {
578        if root_id == 0 {
579            return Ok(());
580        }
581
582        let mut stack = vec![root_id];
583
584        while let Some(current_id) = stack.pop() {
585            let node = store.load_node(current_id)?;
586            store.schedule_free(current_id, freed)?;
587
588            for i in 0..node.num_children() {
589                if node.is_leaf() {
590                    let slot = node.slot_at(i);
591                    if !slot.is_inline() {
592                        node.free_slot_pages(store, slot, freed)?;
593                    }
594                } else {
595                    let child_id = node.child_at(i);
596                    if child_id != 0 {
597                        stack.push(child_id);
598                    }
599                }
600            }
601        }
602        Ok(())
603    }
604
605    pub fn iterator(&self) -> TreeIterator {
606        let root_id = *self.root_page_id.read();
607        TreeIterator::new(self.store.clone(), root_id)
608    }
609
610    pub fn iterator_from(&self, key: &[u8]) -> TreeIterator {
611        let root_id = *self.root_page_id.read();
612        TreeIterator::new_from(self.store.clone(), root_id, key)
613    }
614}
615
/// Forward cursor over a tree's key/value pairs in tree order.
///
/// Entries that cannot be read (node load or out-of-line value read failures) are
/// skipped silently by [`TreeIterator::next_ref`].
pub struct TreeIterator {
    /// Store used to load nodes and out-of-line values.
    store: Arc<dyn PageStore>,
    /// Branch nodes on the current path, each with the index of the next child to visit.
    stack: Vec<(Arc<Node>, usize)>,
    /// Leaf being scanned, with the index of the next entry to yield.
    current_leaf: Option<(Arc<Node>, usize)>,
}
621
622impl TreeIterator {
623    fn new(store: Arc<dyn PageStore>, root_id: PageId) -> Self {
624        let mut iter = Self {
625            store,
626            stack: Vec::new(),
627            current_leaf: None,
628        };
629
630        if root_id != 0
631            && let Ok(node) = iter.store.load_node(root_id)
632        {
633            iter.push_node(node);
634        }
635        iter
636    }
637
638    fn new_from(store: Arc<dyn PageStore>, root_id: PageId, key: &[u8]) -> Self {
639        let mut iter = Self {
640            store,
641            stack: Vec::new(),
642            current_leaf: None,
643        };
644
645        if root_id == 0 {
646            return iter;
647        }
648
649        let mut node = match iter.store.load_node(root_id) {
650            Ok(node) => node,
651            Err(_) => return iter,
652        };
653
654        while !node.is_leaf() {
655            let pos = match node.search(key) {
656                Ok(pos) => pos,
657                Err(pos) => pos.saturating_sub(1),
658            };
659            let child_id = node.child_at(pos);
660            if child_id == 0 {
661                return iter;
662            }
663            iter.stack.push((node.clone(), pos + 1));
664            match iter.store.load_node(child_id) {
665                Ok(child) => node = child,
666                Err(_) => return iter,
667            }
668        }
669
670        let leaf_pos = match node.search(key) {
671            Ok(pos) => pos,
672            Err(pos) => pos,
673        };
674        iter.current_leaf = Some((node, leaf_pos));
675        iter
676    }
677
678    fn push_node(&mut self, node: Arc<Node>) {
679        if node.is_leaf() {
680            self.current_leaf = Some((node, 0));
681        } else {
682            self.stack.push((node, 0));
683        }
684    }
685
686    pub fn next_ref(&mut self, key_buf: &mut Vec<u8>, val_buf: &mut Vec<u8>) -> bool {
687        loop {
688            if let Some((leaf, idx)) = self.current_leaf.as_mut() {
689                if *idx < leaf.num_children() {
690                    let slot = leaf.slot_at(*idx);
691
692                    key_buf.clear();
693                    key_buf.extend_from_slice(leaf.key_at(*idx));
694
695                    val_buf.clear();
696                    if slot.is_inline() {
697                        val_buf.extend_from_slice(leaf.value_at(*idx));
698                    } else if let Ok(pages) = leaf.collect_page_ids(self.store.as_ref(), slot) {
699                        val_buf.resize(slot.value_len(), 0);
700                        if self.store.read_data(&pages, val_buf).is_err() {
701                            *idx += 1;
702                            continue;
703                        }
704                    } else {
705                        *idx += 1;
706                        continue;
707                    }
708
709                    *idx += 1;
710                    return true;
711                } else {
712                    self.current_leaf = None;
713                }
714            }
715
716            if let Some((node, idx)) = self.stack.last_mut() {
717                if *idx < node.num_children() {
718                    let child_id = node.child_at(*idx);
719                    *idx += 1;
720                    if let Ok(child_node) = self.store.load_node(child_id) {
721                        self.push_node(child_node);
722                    }
723                } else {
724                    self.stack.pop();
725                }
726            } else {
727                return false;
728            }
729        }
730    }
731}
732
/// Read-write transaction scoped to a single bucket tree.
///
/// The tree is owned; the lifetime `'a` is carried only by a marker.
pub struct Txn<'a> {
    /// Tree of the bucket being modified.
    pub(crate) tree: Tree,
    /// Ties the transaction to lifetime `'a` without storing a borrow.
    pub(crate) _marker: std::marker::PhantomData<&'a ()>,
}
737
738impl<'a> Txn<'a> {
739    pub fn put<K, V>(&mut self, key: K, value: V) -> Result<()>
740    where
741        K: AsRef<[u8]>,
742        V: AsRef<[u8]>,
743    {
744        self.tree.put(key.as_ref(), value.as_ref())
745    }
746
747    pub fn get<K>(&self, key: K) -> Result<Vec<u8>>
748    where
749        K: AsRef<[u8]>,
750    {
751        self.tree.get(key.as_ref())
752    }
753
754    pub fn del<K>(&mut self, key: K) -> Result<()>
755    where
756        K: AsRef<[u8]>,
757    {
758        self.tree.del(key.as_ref())
759    }
760
761    pub fn iter(&self) -> TreeIterator {
762        self.tree.iterator()
763    }
764}
765
/// Immutable view of a tree pinned at a fixed root page.
pub struct ReadOnlyTree {
    /// Store the snapshot's pages are read from.
    store: Arc<dyn PageStore>,
    /// Root page of the snapshot; 0 means the tree is empty.
    root_page_id: PageId,
    /// Root node loaded eagerly; when built via `ReadOnlyTree::new` this is `None`
    /// exactly when `root_page_id == 0`.
    root_node: Option<Arc<Node>>,
}
771
772impl ReadOnlyTree {
773    fn new(store: Arc<dyn PageStore>, root_page_id: PageId) -> Result<Self> {
774        let root_node = if root_page_id == 0 {
775            None
776        } else {
777            Some(store.load_node(root_page_id)?)
778        };
779        Ok(Self {
780            store,
781            root_page_id,
782            root_node,
783        })
784    }
785
786    fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
787        if self.root_page_id == 0 {
788            return Err(Error::NotFound);
789        }
790
791        let mut current = self.root_node.clone().ok_or(Error::NotFound)?;
792        loop {
793            if current.is_leaf() {
794                return current.get(self.store.as_ref(), key);
795            }
796            let pos = current.child_pos_for_key(key);
797            current = self.store.load_node(current.child_at(pos))?;
798        }
799    }
800
801    fn iterator(&self) -> TreeIterator {
802        TreeIterator::new(self.store.clone(), self.root_page_id)
803    }
804}
805
/// Read-only transaction over one bucket snapshot.
///
/// Struct fields drop in declaration order, so `tree` is released before `_guard`.
pub struct ReadOnlyTxn<'a> {
    /// Shared snapshot of the bucket tree.
    pub(crate) tree: Arc<ReadOnlyTree>,
    /// Read guard held for the transaction's lifetime. NOTE(review): presumably a guard on
    /// `BTree::writer_lock` — confirm at the construction site.
    pub(crate) _guard: RwLockReadGuard<'a, ()>,
}
810
811impl<'a> ReadOnlyTxn<'a> {
812    pub fn get<K>(&self, key: K) -> Result<Vec<u8>>
813    where
814        K: AsRef<[u8]>,
815    {
816        self.tree.get(key.as_ref())
817    }
818
819    pub fn iter(&self) -> TreeIterator {
820        self.tree.iterator()
821    }
822}
823
/// Write transaction that can touch several buckets.
///
/// `bucket_roots` records the uncommitted root page of each bucket modified through
/// [`MultiTxn::exec`]; untouched buckets fall back to the root stored in the catalog.
pub struct MultiTxn<'a> {
    /// Database this transaction operates on.
    btree: &'a BTree,
    /// Bucket name → root page id produced by the last successful `exec` on that bucket.
    bucket_roots: HashMap<String, PageId>,
}
828
829impl<'a> MultiTxn<'a> {
830    pub fn exec<F, R>(&mut self, bucket: &str, f: F) -> Result<R>
831    where
832        F: FnOnce(&mut Txn) -> Result<R>,
833    {
834        let name_bytes = bucket.as_bytes();
835
836        let initial_root = if let Some(&root) = self.bucket_roots.get(bucket) {
837            root
838        } else {
839            match self.btree.catalog_tree.get(name_bytes) {
840                Ok(bytes) => BucketMetadata::from_slice(&bytes).root_page_id,
841                Err(Error::NotFound) => 0,
842                Err(e) => return Err(e),
843            }
844        };
845
846        let logical_store_obj: Arc<dyn PageStore> = self.btree.logical_store.clone();
847        let tree = Tree::open(
848            logical_store_obj,
849            Arc::new(RwLock::new(initial_root)),
850            self.btree.pending_free.clone(),
851            self.btree.pending_alloc.clone(),
852        )?;
853
854        let mut txn = Txn {
855            tree,
856            _marker: std::marker::PhantomData,
857        };
858
859        let res = f(&mut txn);
860        if res.is_ok() {
861            let new_root = *txn.tree.root_page_id.read();
862            self.bucket_roots.insert(bucket.to_string(), new_root);
863        }
864        res
865    }
866}
867
868pub struct BucketMetadata {
869    pub(crate) root_page_id: PageId,
870}
871
872impl BucketMetadata {
873    pub fn from_slice(x: &[u8]) -> Self {
874        assert!(x.len() >= std::mem::size_of::<Self>());
875        unsafe { std::ptr::read_unaligned(x.as_ptr().cast::<Self>()) }
876    }
877
878    pub fn as_slice(&self) -> &[u8] {
879        unsafe {
880            std::slice::from_raw_parts(
881                self as *const Self as *const u8,
882                std::mem::size_of::<Self>(),
883            )
884        }
885    }
886}
887
/// Handle to an open database.
///
/// Handles opened in this process for the same normalized path share one registered
/// instance (see `BTree::open`). Each handle keeps that instance alive through
/// `instance_anchor`.
pub struct BTree {
    /// Physical page store backing the database file.
    pub(crate) store: Arc<Store>,
    /// Catalog tree mapping bucket names to serialized [`BucketMetadata`]; it is built on
    /// `logical_store`.
    pub(crate) catalog_tree: Arc<Tree>,
    /// Mapping tree stored directly on the physical store. NOTE(review): presumably maps
    /// logical → physical page ids; confirm in `page_store`.
    pub(crate) mapping_tree: Arc<Tree>,
    /// Reverse mapping tree, also on the physical store. NOTE(review): direction inferred from the name.
    pub(crate) reverse_tree: Arc<Tree>,
    /// Logical page store layered over `store`, `mapping_tree` and `reverse_tree`.
    pub(crate) logical_store: Arc<LogicalStore>,
    /// Freed page extents shared by all trees of this database.
    pub(crate) pending_free: Arc<RwLock<Vec<(PageId, u32)>>>,
    /// Pages allocated by pending operations, shared by all trees of this database.
    pub(crate) pending_alloc: Arc<RwLock<HashSet<PageId>>>,
    /// Reader/writer coordination lock. NOTE(review): protocol defined outside this view.
    pub(crate) writer_lock: Arc<RwLock<()>>,
    /// Catalog root observed when this handle last synchronized with the store.
    pub(crate) start_root_id: Arc<AtomicU32>,
    /// Meta sequence number observed at that same point.
    pub(crate) start_seq: Arc<AtomicU64>,
    /// Cache of bucket name → (root page, u64 stamp).
    pub(crate) bucket_root_cache: Arc<RwLock<BucketRootCache>>,
    /// Cache of bucket name → (root page, u64 stamp, read-only snapshot).
    pub(crate) bucket_tree_cache: Arc<RwLock<BucketTreeCache>>,
    /// Strong reference to the registered shared instance; `None` on that instance itself.
    instance_anchor: Option<Arc<BTree>>,
}
903
/// Bucket name → (root page id, u64 stamp). NOTE(review): the stamp is presumably the meta
/// `seq` the entry was read at — confirm where the cache is populated.
type BucketRootCache = HashMap<Vec<u8>, (PageId, u64)>;
/// Like `BucketRootCache`, but also keeps a ready-to-use read-only tree for that root.
type BucketTreeCache = HashMap<Vec<u8>, (PageId, u64, Arc<ReadOnlyTree>)>;

/// Compaction relocates every page when the total number of data pages is at or below this threshold.
const COMPACT_SMALL_DATA_THRESHOLD_PAGES: u64 = 1024;
/// Default tail ratio used by compaction when `target_bytes` is zero.
const COMPACT_TAIL_RATIO: f64 = 0.5;
911
/// Summary returned by a compaction pass. NOTE(review): the producer is outside this view;
/// the field meanings below are inferred from their names.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct CompactStats {
    /// Pages relocated by the pass.
    pub moved_pages: u64,
    /// Pages still eligible for relocation after the pass.
    pub remaining_candidates: u64,
}
917
918impl BTree {
919    fn sync_local_snapshot_from_store(&self) {
920        let snapshot = self.store.cached_snapshot();
921        *self.catalog_tree.root_page_id.write() = snapshot.catalog_root;
922        *self.mapping_tree.root_page_id.write() = snapshot.mapping_root;
923        *self.reverse_tree.root_page_id.write() = snapshot.reverse_root;
924        self.start_root_id
925            .store(snapshot.catalog_root, Ordering::Release);
926        self.start_seq.store(snapshot.seq, Ordering::Release);
927    }
928
929    /// Open or create a btree database at the given path.
930    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
931        let path = path.as_ref();
932        let key = normalize_db_path(path);
933
934        if let Some(existing) = {
935            let mut reg = btree_instance_registry().lock();
936            sweep_dead_btree_instances(&mut reg);
937            let upgraded = reg.get(&key).and_then(|w| w.upgrade());
938            if upgraded.is_none() {
939                reg.remove(&key);
940            }
941            upgraded
942        } {
943            let mut handle = existing.as_ref().clone();
944            handle.instance_anchor = Some(existing);
945            handle.sync_local_snapshot_from_store();
946            return Ok(handle);
947        }
948
949        let store = Arc::new(Store::open(path)?);
950        let catalog_root = store.get_catalog_root();
951        let mapping_root = store.get_mapping_root();
952        let reverse_root = store.get_reverse_root();
953        let initial_seq = store.get_seq();
954        let pending_free = Arc::new(RwLock::new(Vec::new()));
955        let pending_alloc = Arc::new(RwLock::new(HashSet::new()));
956        let mapping_root_lock = Arc::new(RwLock::new(mapping_root));
957        let reverse_root_lock = Arc::new(RwLock::new(reverse_root));
958
959        let physical_store: Arc<dyn PageStore> = store.clone();
960        let mapping_tree = Arc::new(Tree::open(
961            physical_store.clone(),
962            mapping_root_lock,
963            pending_free.clone(),
964            pending_alloc.clone(),
965        )?);
966        let reverse_tree = Arc::new(Tree::open(
967            physical_store,
968            reverse_root_lock,
969            pending_free.clone(),
970            pending_alloc.clone(),
971        )?);
972
973        let logical_store = Arc::new(LogicalStore::new(
974            store.clone(),
975            mapping_tree.clone(),
976            reverse_tree.clone(),
977        ));
978        let catalog_tree_root_lock = Arc::new(RwLock::new(catalog_root));
979        let logical_store_obj: Arc<dyn PageStore> = logical_store.clone();
980        let catalog_tree = Arc::new(Tree::open(
981            logical_store_obj,
982            catalog_tree_root_lock,
983            pending_free.clone(),
984            pending_alloc.clone(),
985        )?);
986
987        let instance = Self {
988            store: store.clone(),
989            catalog_tree,
990            mapping_tree,
991            reverse_tree,
992            logical_store,
993            pending_free,
994            pending_alloc,
995            writer_lock: Arc::new(RwLock::new(())),
996            start_root_id: Arc::new(AtomicU32::new(catalog_root)),
997            start_seq: Arc::new(AtomicU64::new(initial_seq)),
998            bucket_root_cache: Arc::new(RwLock::new(HashMap::new())),
999            bucket_tree_cache: Arc::new(RwLock::new(HashMap::new())),
1000            instance_anchor: None,
1001        };
1002        let instance_arc = Arc::new(instance);
1003        {
1004            let mut reg = btree_instance_registry().lock();
1005            sweep_dead_btree_instances(&mut reg);
1006            if let Some(existing) = reg.get(&key).and_then(|w| w.upgrade()) {
1007                let mut handle = existing.as_ref().clone();
1008                handle.instance_anchor = Some(existing);
1009                handle.sync_local_snapshot_from_store();
1010                return Ok(handle);
1011            }
1012            reg.insert(key, Arc::downgrade(&instance_arc));
1013        }
1014
1015        let mut handle = instance_arc.as_ref().clone();
1016        handle.instance_anchor = Some(instance_arc);
1017        Ok(handle)
1018    }
1019
1020    /// Executes a read-write transaction on the specified bucket.
1021    /// Creates the bucket on successful commit if it doesn't exist.
1022    ///
1023    /// The transaction is committed if the closure returns `Ok`, but the commit can still fail
1024    /// (e.g., conflict or I/O error). On failure, changes are rolled back.
1025    /// If the closure returns `Err`, the transaction is rolled back (allocated pages are reclaimed).
1026    ///
1027    /// # Warning
1028    /// Nested calls to `exec` or `view` on the same `BTree` instance are NOT supported
1029    /// and may lead to deadlocks or undefined behavior.
1030    pub fn exec<F, R>(&self, bucket: &str, f: F) -> Result<R>
1031    where
1032        F: FnOnce(&mut Txn) -> Result<R>,
1033    {
1034        let _lock = self.writer_lock.write();
1035
1036        // Auto-refresh to the latest disk state before starting a new transaction.
1037        // This makes the "Session" always start from the freshest data.
1038        self.refresh_internal()?;
1039
1040        // Check if there's an existing bucket
1041        let name_bytes = bucket.as_bytes();
1042        let initial_root = match self.catalog_tree.get(name_bytes) {
1043            Ok(bytes) => BucketMetadata::from_slice(&bytes).root_page_id,
1044            Err(Error::NotFound) => 0,
1045            Err(e) => return Err(e),
1046        };
1047
1048        // Snapshot pending state for rollback
1049        let pre_alloc = self.pending_alloc.read().clone();
1050        let pre_free = self.pending_free.read().clone();
1051        let pre_catalog_root = *self.catalog_tree.root_page_id.read();
1052        let pre_mapping_root = *self.mapping_tree.root_page_id.read();
1053        let pre_reverse_root = *self.reverse_tree.root_page_id.read();
1054
1055        let logical_store_obj: Arc<dyn PageStore> = self.logical_store.clone();
1056        let tree = Tree::open(
1057            logical_store_obj,
1058            Arc::new(RwLock::new(initial_root)),
1059            self.pending_free.clone(),
1060            self.pending_alloc.clone(),
1061        )?;
1062
1063        let mut txn = Txn {
1064            tree,
1065            _marker: std::marker::PhantomData,
1066        };
1067
1068        match f(&mut txn) {
1069            Ok(res) => {
1070                let new_root = *txn.tree.root_page_id.read();
1071                let metadata = BucketMetadata {
1072                    root_page_id: new_root,
1073                };
1074                self.catalog_tree.put(name_bytes, metadata.as_slice())?;
1075                if let Err(e) = self.commit_internal() {
1076                    // Rollback catalog and pages on commit failure (e.g. Conflict)
1077                    *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1078                    *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1079                    *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1080                    self.rollback_pages(&pre_alloc, &pre_free);
1081                    return Err(e);
1082                }
1083                let latest_seq = self.store.get_seq();
1084                self.bucket_root_cache
1085                    .write()
1086                    .insert(name_bytes.to_vec(), (new_root, latest_seq));
1087                Ok(res)
1088            }
1089            Err(e) => {
1090                *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1091                *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1092                *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1093                self.rollback_pages(&pre_alloc, &pre_free);
1094                Err(e)
1095            }
1096        }
1097    }
1098
1099    /// Executes multiple operations across different buckets in a single atomic transaction.
1100    ///
1101    /// This is more efficient than calling `exec` multiple times because on success it performs
1102    /// a single superblock update and disk sync at the end.
1103    pub fn exec_multi<F, R>(&self, f: F) -> Result<R>
1104    where
1105        F: FnOnce(&mut MultiTxn) -> Result<R>,
1106    {
1107        let _lock = self.writer_lock.write();
1108
1109        self.refresh_internal()?;
1110
1111        let pre_alloc = self.pending_alloc.read().clone();
1112        let pre_free = self.pending_free.read().clone();
1113        let pre_catalog_root = *self.catalog_tree.root_page_id.read();
1114        let pre_mapping_root = *self.mapping_tree.root_page_id.read();
1115        let pre_reverse_root = *self.reverse_tree.root_page_id.read();
1116
1117        let mut multi_txn = MultiTxn {
1118            btree: self,
1119            bucket_roots: HashMap::new(),
1120        };
1121
1122        match f(&mut multi_txn) {
1123            Ok(res) => {
1124                let mut updated = Vec::new();
1125                for (name, new_root) in multi_txn.bucket_roots {
1126                    let metadata = BucketMetadata {
1127                        root_page_id: new_root,
1128                    };
1129                    self.catalog_tree
1130                        .put(name.as_bytes(), metadata.as_slice())?;
1131                    updated.push((name.into_bytes(), new_root));
1132                }
1133                if let Err(e) = self.commit_internal() {
1134                    *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1135                    *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1136                    *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1137                    self.rollback_pages(&pre_alloc, &pre_free);
1138                    return Err(e);
1139                }
1140                let latest_seq = self.store.get_seq();
1141                let mut cache = self.bucket_root_cache.write();
1142                for (name, new_root) in updated {
1143                    cache.insert(name, (new_root, latest_seq));
1144                }
1145                Ok(res)
1146            }
1147            Err(e) => {
1148                *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1149                *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1150                *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1151                self.rollback_pages(&pre_alloc, &pre_free);
1152                Err(e)
1153            }
1154        }
1155    }
1156
1157    fn rollback_pages(&self, pre_alloc: &HashSet<PageId>, pre_free: &[(PageId, u32)]) {
1158        let mut alloc = self.pending_alloc.write();
1159        let mut freed_now = Vec::new();
1160        for &pid in alloc.iter() {
1161            if !pre_alloc.contains(&pid) {
1162                freed_now.push(pid);
1163            }
1164        }
1165        for pid in freed_now {
1166            alloc.remove(&pid);
1167            let _ = self.store.free_pages(pid, 1);
1168        }
1169        drop(alloc);
1170
1171        let current_free = self.pending_free.read().clone();
1172        let freed_delta = Self::diff_free_extents(&current_free, pre_free);
1173        for (pid, nr) in freed_delta {
1174            let _ = self.store.unfree_pages(pid, nr);
1175        }
1176
1177        *self.pending_free.write() = pre_free.to_owned();
1178    }
1179
1180    fn diff_free_extents(current: &[(PageId, u32)], base: &[(PageId, u32)]) -> Vec<(PageId, u32)> {
1181        let mut res = Vec::new();
1182        let mut j = 0usize;
1183
1184        for &(cur_start, cur_len) in current {
1185            let cur_start_u64 = cur_start as u64;
1186            let cur_end = cur_start_u64 + cur_len as u64;
1187            let mut start = cur_start_u64;
1188
1189            while j < base.len() && (base[j].0 as u64) + base[j].1 as u64 <= start {
1190                j += 1;
1191            }
1192
1193            let mut k = j;
1194            while k < base.len() {
1195                let (base_start, base_len) = base[k];
1196                let base_start_u64 = base_start as u64;
1197                let base_end = base_start_u64 + base_len as u64;
1198
1199                if base_start_u64 >= cur_end {
1200                    break;
1201                }
1202
1203                if base_start_u64 > start {
1204                    res.push((start as PageId, (base_start_u64 - start) as u32));
1205                }
1206
1207                if base_end >= cur_end {
1208                    start = cur_end;
1209                    break;
1210                }
1211
1212                start = base_end;
1213                k += 1;
1214            }
1215
1216            if start < cur_end {
1217                res.push((start as PageId, (cur_end - start) as u32));
1218            }
1219        }
1220
1221        res
1222    }
1223
1224    fn max_tree_page_id(store: &dyn PageStore, root_id: PageId) -> Result<PageId> {
1225        if root_id == 0 {
1226            return Ok(0);
1227        }
1228
1229        let mut max_pid = root_id;
1230        let mut stack = vec![root_id];
1231
1232        while let Some(pid) = stack.pop() {
1233            if pid > max_pid {
1234                max_pid = pid;
1235            }
1236            let node = store.load_node(pid)?;
1237            if node.is_leaf() {
1238                for i in 0..node.num_children() {
1239                    let slot = node.slot_at(i);
1240                    if !slot.is_inline() {
1241                        let pages = node.collect_page_ids(store, slot)?;
1242                        for page in pages {
1243                            if page > max_pid {
1244                                max_pid = page;
1245                            }
1246                        }
1247                    }
1248                }
1249            } else {
1250                for i in 0..node.num_children() {
1251                    let child = node.child_at(i);
1252                    if child != 0 {
1253                        stack.push(child);
1254                    }
1255                }
1256            }
1257        }
1258
1259        Ok(max_pid)
1260    }
1261
1262    /// Executes a read-only transaction on the specified bucket.
1263    ///
1264    /// # Warning
1265    /// Nested calls to `exec` or `view` on the same `BTree` instance are NOT supported
1266    /// and may lead to deadlocks or undefined behavior.
1267    pub fn view<F, R>(&self, bucket: &str, f: F) -> Result<R>
1268    where
1269        F: FnOnce(&ReadOnlyTxn) -> Result<R>,
1270    {
1271        let lock = self.writer_lock.read();
1272
1273        // For view, we also want the freshest data.
1274        let (mut latest_seq, mut latest_root) = self.store.shared_snapshot();
1275        let seq_changed = latest_seq != self.start_seq.load(Ordering::Acquire);
1276        if seq_changed {
1277            let snapshot = self.store.refresh_sb()?;
1278            // Clear cache to avoid stale reads if version moved
1279            self.store.clear_cache();
1280            self.logical_store.clear_lid_cache();
1281            *self.catalog_tree.root_page_id.write() = snapshot.catalog_root;
1282            *self.mapping_tree.root_page_id.write() = snapshot.mapping_root;
1283            *self.reverse_tree.root_page_id.write() = snapshot.reverse_root;
1284            self.start_root_id
1285                .store(snapshot.catalog_root, Ordering::Release);
1286            self.start_seq.store(snapshot.seq, Ordering::Release);
1287            self.bucket_root_cache.write().clear();
1288            self.bucket_tree_cache.write().clear();
1289            latest_seq = snapshot.seq;
1290            latest_root = snapshot.catalog_root;
1291        }
1292
1293        let name_bytes = bucket.as_bytes();
1294        let cached_tree =
1295            self.bucket_tree_cache
1296                .read()
1297                .get(name_bytes)
1298                .and_then(|(_root, seq, tree)| {
1299                    if *seq == latest_seq {
1300                        Some(tree.clone())
1301                    } else {
1302                        None
1303                    }
1304                });
1305        if let Some(tree) = cached_tree {
1306            let txn = ReadOnlyTxn { tree, _guard: lock };
1307            return f(&txn);
1308        }
1309
1310        let cached_root = self
1311            .bucket_root_cache
1312            .read()
1313            .get(name_bytes)
1314            .and_then(|(root, seq)| {
1315                if *seq == latest_seq {
1316                    Some(*root)
1317                } else {
1318                    None
1319                }
1320            });
1321
1322        let bucket_root = if let Some(root) = cached_root {
1323            root
1324        } else {
1325            // Use the latest root for the catalog lookup
1326            let logical_store_obj: Arc<dyn PageStore> = self.logical_store.clone();
1327            let catalog = Tree::open(
1328                logical_store_obj,
1329                Arc::new(RwLock::new(latest_root)),
1330                Arc::new(RwLock::new(Vec::new())),
1331                Arc::new(RwLock::new(HashSet::new())),
1332            )?;
1333
1334            let metadata_bytes = catalog.get(name_bytes)?;
1335            let metadata = BucketMetadata::from_slice(&metadata_bytes);
1336            let root = metadata.root_page_id;
1337            self.bucket_root_cache
1338                .write()
1339                .insert(name_bytes.to_vec(), (root, latest_seq));
1340            root
1341        };
1342
1343        let logical_store_obj: Arc<dyn PageStore> = self.logical_store.clone();
1344        let tree = Arc::new(ReadOnlyTree::new(logical_store_obj, bucket_root)?);
1345        self.bucket_tree_cache
1346            .write()
1347            .insert(name_bytes.to_vec(), (bucket_root, latest_seq, tree.clone()));
1348
1349        let txn = ReadOnlyTxn { tree, _guard: lock };
1350
1351        f(&txn)
1352    }
1353
1354    /// Delete a bucket by name and persist the change.
1355    pub fn del_bucket<N>(&self, name: N) -> Result<()>
1356    where
1357        N: AsRef<str>,
1358    {
1359        let _lock = self.writer_lock.write();
1360
1361        // ensure we are operating on the latest state
1362        self.refresh_internal()?;
1363
1364        let name_bytes = name.as_ref().as_bytes();
1365        let metadata_bytes = self.catalog_tree.get(name_bytes)?;
1366        let bucket_metadata = BucketMetadata::from_slice(&metadata_bytes);
1367
1368        let mut pages_to_free = Vec::new();
1369        if bucket_metadata.root_page_id != 0 {
1370            Tree::collect_tree_pages(
1371                self.logical_store.as_ref(),
1372                bucket_metadata.root_page_id,
1373                &mut pages_to_free,
1374            )?;
1375        }
1376
1377        self.catalog_tree.del(name_bytes)?;
1378        self.pending_free.write().extend(pages_to_free);
1379        self.commit_internal()
1380    }
1381
1382    fn commit_internal(&self) -> Result<()> {
1383        let start_seq = self.start_seq.load(Ordering::Acquire);
1384        let (latest_seq, _) = self.store.shared_snapshot();
1385        if latest_seq != start_seq {
1386            return Err(Error::Conflict);
1387        }
1388        let snapshot = self.store.cached_snapshot();
1389
1390        let catalog_root = *self.catalog_tree.root_page_id.read();
1391        let mapping_root = *self.mapping_tree.root_page_id.read();
1392        let reverse_root = *self.reverse_tree.root_page_id.read();
1393
1394        let mut freed_lock = self.pending_free.write();
1395        let mut alloc_lock = self.pending_alloc.write();
1396
1397        if freed_lock.is_empty()
1398            && alloc_lock.is_empty()
1399            && snapshot.catalog_root == catalog_root
1400            && snapshot.mapping_root == mapping_root
1401            && snapshot.reverse_root == reverse_root
1402        {
1403            return Ok(());
1404        }
1405
1406        self.store
1407            .commit_roots(catalog_root, mapping_root, reverse_root, &freed_lock)?;
1408        self.store.sync()?;
1409
1410        freed_lock.clear();
1411        alloc_lock.clear();
1412        self.start_root_id.store(catalog_root, Ordering::Release);
1413        self.start_seq
1414            .store(self.store.get_seq(), Ordering::Release);
1415        Ok(())
1416    }
1417
1418    /// Commit pending catalog changes if no conflict is detected.
1419    pub fn commit(&self) -> Result<()> {
1420        let _lock = self.writer_lock.write();
1421        self.commit_internal()
1422    }
1423
1424    fn compact_tail_window(
1425        total_pages: PageId,
1426        target_bytes: u64,
1427    ) -> Option<(PageId, PageId, u64)> {
1428        let usable_pages = total_pages.saturating_sub(2);
1429        if usable_pages == 0 {
1430            return None;
1431        }
1432
1433        let usable_pages_u64 = usable_pages as u64;
1434        let mut target_pages_u64 = if usable_pages_u64 <= COMPACT_SMALL_DATA_THRESHOLD_PAGES {
1435            usable_pages_u64
1436        } else if target_bytes == 0 {
1437            ((usable_pages_u64 as f64) * COMPACT_TAIL_RATIO).ceil() as u64
1438        } else {
1439            target_bytes.saturating_add(PAGE_SIZE as u64 - 1) / PAGE_SIZE as u64
1440        };
1441        if target_pages_u64 == 0 {
1442            return None;
1443        }
1444        if target_pages_u64 > usable_pages_u64 {
1445            target_pages_u64 = usable_pages_u64;
1446        }
1447
1448        let target_pages = target_pages_u64 as PageId;
1449        let tail_start = total_pages.saturating_sub(target_pages).max(2);
1450        Some((tail_start, target_pages, target_pages_u64))
1451    }
1452
1453    fn compact_move_tail(
1454        &self,
1455        total_pages: PageId,
1456        tail_start: PageId,
1457        target_pages_u64: u64,
1458        prealloc: Option<&[PageId]>,
1459    ) -> Result<(u64, u64, usize)> {
1460        let mut moved = 0u64;
1461        let mut total_candidates = 0u64;
1462        let mut prealloc_idx = 0usize;
1463        let mut iter = self.reverse_tree.iterator_from(&encode_u32_key(tail_start));
1464        let mut key_buf = Vec::new();
1465        let mut val_buf = Vec::new();
1466        let physical_store: &dyn PageStore = self.store.as_ref();
1467
1468        while iter.next_ref(&mut key_buf, &mut val_buf) {
1469            let pid = decode_u32_key(&key_buf)?;
1470            if pid < tail_start {
1471                continue;
1472            }
1473            if pid >= total_pages {
1474                break;
1475            }
1476            let lid = decode_u32_key(&val_buf)?;
1477            total_candidates += 1;
1478            if moved >= target_pages_u64 {
1479                continue;
1480            }
1481
1482            let new_pid = if let Some(pids) = prealloc {
1483                if prealloc_idx >= pids.len() {
1484                    return Err(Error::Internal);
1485                }
1486                let pid = pids[prealloc_idx];
1487                prealloc_idx += 1;
1488                pid
1489            } else {
1490                let mut alloc = self.pending_alloc.write();
1491                physical_store.alloc_page(&mut alloc)?
1492            };
1493
1494            let page = self.store.load_page(pid)?;
1495            self.store.write_page(new_pid, &page)?;
1496
1497            let lid_key = encode_u32_key(lid);
1498            let new_pid_key = encode_u32_key(new_pid);
1499            let old_pid_key = encode_u32_key(pid);
1500
1501            self.mapping_tree.put(&lid_key, &new_pid_key)?;
1502            self.reverse_tree.del(&old_pid_key)?;
1503            self.reverse_tree.put(&new_pid_key, &lid_key)?;
1504
1505            {
1506                let mut freed = self.pending_free.write();
1507                physical_store.schedule_free(pid, &mut freed)?;
1508            }
1509
1510            moved += 1;
1511        }
1512
1513        Ok((moved, total_candidates, prealloc_idx))
1514    }
1515
1516    fn compact_tail_live_pages(&self, total_pages: PageId, tail_start: PageId) -> Result<u64> {
1517        let mut total_candidates = 0u64;
1518        let mut iter = self.reverse_tree.iterator_from(&encode_u32_key(tail_start));
1519        let mut key_buf = Vec::new();
1520        let mut val_buf = Vec::new();
1521
1522        while iter.next_ref(&mut key_buf, &mut val_buf) {
1523            let pid = decode_u32_key(&key_buf)?;
1524            if pid < tail_start {
1525                continue;
1526            }
1527            if pid >= total_pages {
1528                break;
1529            }
1530            total_candidates += 1;
1531        }
1532
1533        Ok(total_candidates)
1534    }
1535
1536    fn compact_release_unused_prealloc(&self, prealloc: &[PageId], used: usize) -> Result<()> {
1537        if used >= prealloc.len() {
1538            return Ok(());
1539        }
1540
1541        let unused: Vec<PageId> = prealloc[used..].to_vec();
1542        {
1543            let mut alloc = self.pending_alloc.write();
1544            for pid in &unused {
1545                alloc.remove(pid);
1546            }
1547        }
1548
1549        for pid in unused {
1550            self.store.free_pages(pid, 1)?;
1551        }
1552
1553        Ok(())
1554    }
1555
1556    /// run tail-window compaction
1557    ///
1558    /// target_bytes is the desired amount to reclaim, 0 uses the default ratio
1559    /// this moves live pages out of the tail window and tries to truncate the file
1560    /// if low-address free pages exceed the threshold, the mover allocates only below the tail
1561    pub fn compact(&self, target_bytes: u64) -> Result<CompactStats> {
1562        let _lock = self.writer_lock.write();
1563
1564        self.refresh_internal()?;
1565
1566        let total_pages = self.store.get_next_page_id();
1567        let (tail_start, _target_pages, target_pages_u64) =
1568            if let Some(params) = Self::compact_tail_window(total_pages, target_bytes) {
1569                params
1570            } else {
1571                return Ok(CompactStats {
1572                    moved_pages: 0,
1573                    remaining_candidates: 0,
1574                });
1575            };
1576        if target_pages_u64 == 0 {
1577            return Ok(CompactStats {
1578                moved_pages: 0,
1579                remaining_candidates: 0,
1580            });
1581        }
1582        let total_candidates = self.compact_tail_live_pages(total_pages, tail_start)?;
1583        if total_candidates == 0 {
1584            return Ok(CompactStats {
1585                moved_pages: 0,
1586                remaining_candidates: 0,
1587            });
1588        }
1589        let planned_moves = total_candidates.min(target_pages_u64);
1590        // For default compaction, avoid file growth by requiring all relocated pages
1591        // to be preallocated from low addresses below the compaction tail.
1592        let strict_no_growth = target_bytes == 0;
1593
1594        let pre_alloc = self.pending_alloc.read().clone();
1595        let pre_free = self.pending_free.read().clone();
1596        let pre_catalog_root = *self.catalog_tree.root_page_id.read();
1597        let pre_mapping_root = *self.mapping_tree.root_page_id.read();
1598        let pre_reverse_root = *self.reverse_tree.root_page_id.read();
1599
1600        let mut prealloc = None;
1601        if strict_no_growth && self.store.free_pages_below(tail_start) < planned_moves {
1602            return Ok(CompactStats {
1603                moved_pages: 0,
1604                remaining_candidates: total_candidates,
1605            });
1606        }
1607        if let Some(pids) = self
1608            .store
1609            .alloc_pages_below(tail_start, planned_moves as PageId)?
1610        {
1611            let mut alloc = self.pending_alloc.write();
1612            for pid in &pids {
1613                alloc.insert(*pid);
1614            }
1615            prealloc = Some(pids);
1616        }
1617        if strict_no_growth && prealloc.is_none() {
1618            return Ok(CompactStats {
1619                moved_pages: 0,
1620                remaining_candidates: total_candidates,
1621            });
1622        }
1623        let move_budget = if strict_no_growth {
1624            planned_moves
1625        } else {
1626            target_pages_u64
1627        };
1628
1629        let move_result =
1630            self.compact_move_tail(total_pages, tail_start, move_budget, prealloc.as_deref());
1631
1632        let (moved, _scanned_candidates, used_prealloc) = match move_result {
1633            Ok(res) => res,
1634            Err(e) => {
1635                *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1636                *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1637                *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1638                self.rollback_pages(&pre_alloc, &pre_free);
1639                return Err(e);
1640            }
1641        };
1642
1643        if let Some(prealloc) = prealloc.as_ref()
1644            && let Err(e) = self.compact_release_unused_prealloc(prealloc, used_prealloc)
1645        {
1646            *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1647            *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1648            *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1649            self.rollback_pages(&pre_alloc, &pre_free);
1650            return Err(e);
1651        }
1652
1653        if let Err(e) = self.commit_internal() {
1654            *self.catalog_tree.root_page_id.write() = pre_catalog_root;
1655            *self.mapping_tree.root_page_id.write() = pre_mapping_root;
1656            *self.reverse_tree.root_page_id.write() = pre_reverse_root;
1657            self.rollback_pages(&pre_alloc, &pre_free);
1658            return Err(e);
1659        }
1660
1661        let max_reverse_pid = {
1662            let mut max_pid = 0u32;
1663            let mut iter = self.reverse_tree.iterator();
1664            let mut key_buf = Vec::new();
1665            let mut val_buf = Vec::new();
1666            while iter.next_ref(&mut key_buf, &mut val_buf) {
1667                let pid = decode_u32_key(&key_buf)?;
1668                if pid > max_pid {
1669                    max_pid = pid;
1670                }
1671            }
1672            max_pid
1673        };
1674        let max_mapping_pid =
1675            Self::max_tree_page_id(self.store.as_ref(), *self.mapping_tree.root_page_id.read())?;
1676        let max_reverse_tree_pid =
1677            Self::max_tree_page_id(self.store.as_ref(), *self.reverse_tree.root_page_id.read())?;
1678        let max_freelist_pid = self.store.max_freelist_page_id();
1679        let mut min_end = max_reverse_pid
1680            .max(max_mapping_pid)
1681            .max(max_reverse_tree_pid)
1682            .max(max_freelist_pid)
1683            .saturating_add(1);
1684        if min_end < 2 {
1685            min_end = 2;
1686        }
1687
1688        let _ = self.store.try_truncate_tail_with_floor(min_end)?;
1689        self.start_seq
1690            .store(self.store.get_seq(), Ordering::Release);
1691        self.logical_store.clear_lid_cache();
1692
1693        Ok(CompactStats {
1694            moved_pages: moved,
1695            remaining_candidates: total_candidates.saturating_sub(moved),
1696        })
1697    }
1698
1699    fn refresh_internal(&self) -> Result<()> {
1700        self.pending_free.write().clear();
1701        self.pending_alloc.write().clear();
1702
1703        // Fast path: snapshot version unchanged, so current in-memory roots and caches are valid.
1704        let (latest_seq, _) = self.store.shared_snapshot();
1705        if latest_seq == self.start_seq.load(Ordering::Acquire) {
1706            return Ok(());
1707        }
1708
1709        self.store.clear_cache();
1710        self.logical_store.clear_lid_cache();
1711
1712        let snapshot = self.store.refresh_sb()?;
1713        *self.catalog_tree.root_page_id.write() = snapshot.catalog_root;
1714        *self.mapping_tree.root_page_id.write() = snapshot.mapping_root;
1715        *self.reverse_tree.root_page_id.write() = snapshot.reverse_root;
1716        self.start_root_id
1717            .store(snapshot.catalog_root, Ordering::Release);
1718        self.start_seq.store(snapshot.seq, Ordering::Release);
1719        Ok(())
1720    }
1721
1722    /// Returns all bucket names.
1723    pub fn buckets(&self) -> Result<Vec<String>> {
1724        let _lock = self.writer_lock.read();
1725
1726        // Ensure we see the latest buckets from disk
1727        let snapshot = self.store.refresh_sb()?;
1728        let physical_store: Arc<dyn PageStore> = self.store.clone();
1729        let mapping_tree = Arc::new(Tree::open(
1730            physical_store.clone(),
1731            Arc::new(RwLock::new(snapshot.mapping_root)),
1732            Arc::new(RwLock::new(Vec::new())),
1733            Arc::new(RwLock::new(HashSet::new())),
1734        )?);
1735        let reverse_tree = Arc::new(Tree::open(
1736            physical_store,
1737            Arc::new(RwLock::new(snapshot.reverse_root)),
1738            Arc::new(RwLock::new(Vec::new())),
1739            Arc::new(RwLock::new(HashSet::new())),
1740        )?);
1741        let logical_store = Arc::new(LogicalStore::new(
1742            self.store.clone(),
1743            mapping_tree,
1744            reverse_tree,
1745        ));
1746        let logical_store_obj: Arc<dyn PageStore> = logical_store;
1747        let catalog = Tree::open(
1748            logical_store_obj,
1749            Arc::new(RwLock::new(snapshot.catalog_root)),
1750            Arc::new(RwLock::new(Vec::new())),
1751            Arc::new(RwLock::new(HashSet::new())),
1752        )?;
1753
1754        let mut iter = catalog.iterator();
1755        let mut key_buf = Vec::new();
1756        let mut val_buf = Vec::new();
1757        let mut res = Vec::new();
1758        while iter.next_ref(&mut key_buf, &mut val_buf) {
1759            if let Ok(s) = std::str::from_utf8(&key_buf) {
1760                res.push(s.to_string());
1761            }
1762        }
1763        Ok(res)
1764    }
1765
1766    /// Returns the current transaction sequence number.
1767    /// Useful for monitoring and testing.
1768    #[doc(hidden)]
1769    pub fn current_seq(&self) -> u64 {
1770        self.store.get_seq()
1771    }
1772
1773    /// Returns the number of (allocated, freed) pages currently pending commit in this handle.
1774    /// Useful for monitoring and testing.
1775    #[doc(hidden)]
1776    pub fn pending_pages(&self) -> (usize, usize) {
1777        (
1778            self.pending_alloc.read().len(),
1779            self.pending_free.read().len(),
1780        )
1781    }
1782}
1783
1784impl Clone for BTree {
1785    /// Cloning a BTree handle shares the store, writer lock, and pending page tracking.
1786    fn clone(&self) -> Self {
1787        let catalog_root = *self.catalog_tree.root_page_id.read();
1788        let mapping_root = *self.mapping_tree.root_page_id.read();
1789        let reverse_root = *self.reverse_tree.root_page_id.read();
1790        let start_root_id = self.start_root_id.load(Ordering::Acquire);
1791        let start_seq = self.start_seq.load(Ordering::Acquire);
1792
1793        let physical_store: Arc<dyn PageStore> = self.store.clone();
1794        let mapping_tree = Arc::new(
1795            Tree::open(
1796                physical_store.clone(),
1797                Arc::new(RwLock::new(mapping_root)),
1798                self.pending_free.clone(),
1799                self.pending_alloc.clone(),
1800            )
1801            .expect("failed to clone mapping"),
1802        );
1803        let reverse_tree = Arc::new(
1804            Tree::open(
1805                physical_store,
1806                Arc::new(RwLock::new(reverse_root)),
1807                self.pending_free.clone(),
1808                self.pending_alloc.clone(),
1809            )
1810            .expect("failed to clone reverse"),
1811        );
1812        let logical_store = Arc::new(LogicalStore::new(
1813            self.store.clone(),
1814            mapping_tree.clone(),
1815            reverse_tree.clone(),
1816        ));
1817        let logical_store_obj: Arc<dyn PageStore> = logical_store.clone();
1818        let catalog_tree = Arc::new(
1819            Tree::open(
1820                logical_store_obj,
1821                Arc::new(RwLock::new(catalog_root)),
1822                self.pending_free.clone(),
1823                self.pending_alloc.clone(),
1824            )
1825            .expect("failed to clone catalog"),
1826        );
1827
1828        Self {
1829            store: self.store.clone(),
1830            catalog_tree,
1831            mapping_tree,
1832            reverse_tree,
1833            logical_store,
1834            pending_free: self.pending_free.clone(),
1835            pending_alloc: self.pending_alloc.clone(),
1836            writer_lock: self.writer_lock.clone(),
1837            start_root_id: Arc::new(AtomicU32::new(start_root_id)),
1838            start_seq: Arc::new(AtomicU64::new(start_seq)),
1839            bucket_root_cache: self.bucket_root_cache.clone(),
1840            bucket_tree_cache: self.bucket_tree_cache.clone(),
1841            instance_anchor: self.instance_anchor.clone(),
1842        }
1843    }
1844}