// hematite/btree/tree.rs
1//! Generic tree lifecycle, validation, and space-accounting helpers.
2//!
3//! This file contains the control-plane operations around a tree root:
4//! - create a fresh root page;
5//! - open an existing root and validate it is a B-tree page;
6//! - delete or reset an entire tree;
7//! - walk all pages in a tree for validation or space accounting.
8//!
9//! It complements `index.rs`:
10//! - `index.rs` mutates a single tree;
11//! - `tree.rs` manages a tree as a durable object rooted at one page id.
12
13use crate::btree::index::BTreeIndex;
14use crate::btree::node::BTreeNode;
15use crate::btree::value_store::StoredValueLayout;
16use crate::btree::NodeType;
17use crate::error::{HematiteError, Result};
18use crate::storage::overflow::collect_overflow_page_ids;
19use crate::storage::{
20    Page, PageId, Pager, DB_HEADER_PAGE_ID, INVALID_PAGE_ID, STORAGE_METADATA_PAGE_ID,
21};
22use std::collections::HashSet;
23use std::sync::{Arc, Mutex, MutexGuard};
24
/// Control-plane handle for creating, opening, validating, and deleting
/// B-trees stored in a shared [`Pager`].
pub struct BTreeManager {
    // Shared, mutex-guarded pager; the same `Arc` is handed to every
    // `BTreeIndex` produced by `open_tree`.
    storage: Arc<Mutex<Pager>>,
}
28
impl BTreeManager {
    /// Lock the shared pager, converting a poisoned mutex into an
    /// `InternalError` instead of panicking on `unwrap`.
    fn lock_storage(&self) -> Result<MutexGuard<'_, Pager>> {
        self.storage.lock().map_err(|_| {
            HematiteError::InternalError("B-tree manager storage mutex is poisoned".to_string())
        })
    }

    /// Test-only constructor that takes exclusive ownership of a pager.
    #[cfg(test)]
    pub fn new(storage: Pager) -> Self {
        Self {
            storage: Arc::new(Mutex::new(storage)),
        }
    }

    /// Build a manager over a pager that is already shared elsewhere.
    pub fn from_shared_storage(storage: Arc<Mutex<Pager>>) -> Self {
        Self { storage }
    }

    /// Allocate and initialize a new empty tree, returning its root page id.
    pub fn create_tree(&mut self) -> Result<PageId> {
        let mut pager = self.lock_storage()?;
        create_tree_root(&mut pager)
    }

    /// Open an existing tree rooted at `root_page_id`.
    ///
    /// The root page is read and parsed as a `BTreeNode` purely to verify
    /// that it is a valid B-tree page; the parsed node is then discarded and
    /// a `BTreeIndex` sharing this manager's pager is returned.
    pub fn open_tree(&mut self, root_page_id: PageId) -> Result<BTreeIndex> {
        let page = self.lock_storage()?.read_page(root_page_id)?;
        let _node = BTreeNode::from_page(page)?;
        Ok(BTreeIndex::from_shared_storage(
            self.storage.clone(),
            root_page_id,
        ))
    }

    /// Deallocate every node page of the tree rooted at `root_page_id`.
    ///
    /// NOTE(review): overflow pages referenced by leaf values are not
    /// deallocated here (compare `collect_tree_space_stats`, which tracks
    /// them separately) — confirm they are reclaimed elsewhere before
    /// relying on this for full space reclamation.
    pub fn delete_tree(&mut self, root_page_id: PageId) -> Result<()> {
        self.delete_tree_recursive(root_page_id)?;
        Ok(())
    }

    /// Post-order deletion: children are freed before their parent.
    ///
    /// The storage mutex is locked and released per statement (each guard is
    /// a temporary dropped at the end of its statement), so the recursive
    /// calls never self-deadlock on the non-reentrant `Mutex`. There is no
    /// visited-set here, so a corrupted tree containing a cycle would
    /// recurse without bound.
    fn delete_tree_recursive(&mut self, page_id: PageId) -> Result<()> {
        let page = self.lock_storage()?.read_page(page_id)?;
        let node = BTreeNode::from_page(page)?;

        match node.node_type {
            NodeType::Leaf => {
                self.lock_storage()?.deallocate_page(page_id)?;
            }
            NodeType::Internal => {
                for child_page_id in node.children {
                    self.delete_tree_recursive(child_page_id)?;
                }
                self.lock_storage()?.deallocate_page(page_id)?;
            }
        }
        Ok(())
    }

    /// Structurally validate the tree rooted at `root_page_id`.
    ///
    /// Returns `Ok(false)` — not an `Err` — for any structural violation:
    /// a reserved/invalid root id, unsorted or out-of-range keys, wrong
    /// key/child/value counts, a page visited twice, or leaves at different
    /// depths. `Err` is reserved for I/O or deserialization failures.
    pub fn validate_tree(&mut self, root_page_id: PageId) -> Result<bool> {
        // Reserved pages can never be tree roots.
        if root_page_id == INVALID_PAGE_ID
            || root_page_id == DB_HEADER_PAGE_ID
            || root_page_id == STORAGE_METADATA_PAGE_ID
        {
            return Ok(false);
        }

        let mut state = TreeValidationState {
            visited: HashSet::new(),
            leaf_depth: None,
        };

        // The root has no key-range constraints, hence `None` bounds.
        self.validate_node_recursive(root_page_id, None, None, 0, &mut state)
    }

    /// Validate one node and, for internal nodes, recurse into its children
    /// with tightened key bounds.
    ///
    /// `lower_bound`/`upper_bound` are the key range this subtree must stay
    /// within; `depth` is the distance from the root, used to check that all
    /// leaves sit at the same level.
    fn validate_node_recursive(
        &mut self,
        page_id: PageId,
        lower_bound: Option<KeyBound>,
        upper_bound: Option<KeyBound>,
        depth: usize,
        state: &mut TreeValidationState,
    ) -> Result<bool> {
        // Reserved pages must never appear anywhere inside a tree.
        if page_id == INVALID_PAGE_ID
            || page_id == DB_HEADER_PAGE_ID
            || page_id == STORAGE_METADATA_PAGE_ID
        {
            return Ok(false);
        }

        // A repeated page id means a cycle or a shared child — invalid.
        if !state.visited.insert(page_id) {
            return Ok(false);
        }

        let page = self.lock_storage()?.read_page(page_id)?;
        let node = BTreeNode::from_page(page)?;

        // Keys within a node must be strictly increasing.
        for i in 1..node.keys.len() {
            if node.keys[i - 1] >= node.keys[i] {
                return Ok(false);
            }
        }
        // Every key must fall inside the range inherited from the parent.
        for key in &node.keys {
            if let Some(lower) = &lower_bound {
                // Inclusive lower bound permits key == lower.key.
                let below_lower = if lower.inclusive {
                    key.as_bytes() < lower.key.as_slice()
                } else {
                    key.as_bytes() <= lower.key.as_slice()
                };
                if below_lower {
                    return Ok(false);
                }
            }
            if let Some(upper) = &upper_bound {
                // Inclusive upper bound permits key == upper.key.
                let above_upper = if upper.inclusive {
                    key.as_bytes() > upper.key.as_slice()
                } else {
                    key.as_bytes() >= upper.key.as_slice()
                };
                if above_upper {
                    return Ok(false);
                }
            }
        }

        match node.node_type {
            NodeType::Leaf => {
                // Leaves pair each key with exactly one value.
                if node.keys.len() != node.values.len() {
                    return Ok(false);
                }

                // All leaves must sit at the depth of the first leaf seen.
                if let Some(expected_depth) = state.leaf_depth {
                    if expected_depth != depth {
                        return Ok(false);
                    }
                } else {
                    state.leaf_depth = Some(depth);
                }

                Ok(true)
            }
            NodeType::Internal => {
                // Internal nodes: k keys separate k + 1 children, no values.
                if node.children.len() != node.keys.len() + 1 {
                    return Ok(false);
                }
                if !node.values.is_empty() {
                    return Ok(false);
                }

                for (child_index, child_page_id) in node.children.iter().copied().enumerate() {
                    // Child i is bounded below by key[i-1] (inclusive: keys
                    // equal to the separator may live in the right subtree)…
                    let child_lower = if child_index == 0 {
                        lower_bound.clone()
                    } else {
                        Some(KeyBound {
                            key: node.keys[child_index - 1].as_bytes().to_vec(),
                            inclusive: true,
                        })
                    };
                    // …and above by key[i] (exclusive).
                    let child_upper = if child_index == node.keys.len() {
                        upper_bound.clone()
                    } else {
                        Some(KeyBound {
                            key: node.keys[child_index].as_bytes().to_vec(),
                            inclusive: false,
                        })
                    };

                    if !self.validate_node_recursive(
                        child_page_id,
                        child_lower,
                        child_upper,
                        depth + 1,
                        state,
                    )? {
                        return Ok(false);
                    }
                }

                Ok(true)
            }
        }
    }
}
208
209pub fn create_tree_root(pager: &mut Pager) -> Result<PageId> {
210    let root_page_id = pager.allocate_page()?;
211    initialize_empty_tree_root(pager, root_page_id)?;
212    Ok(root_page_id)
213}
214
215pub fn initialize_empty_tree_root(pager: &mut Pager, root_page_id: PageId) -> Result<()> {
216    let mut root_page = Page::new(root_page_id);
217    let root_node = BTreeNode::new_leaf(root_page_id);
218    BTreeNode::to_page(&root_node, &mut root_page)?;
219    pager.write_page(root_page)
220}
221
222pub fn reset_tree_pages(pager: &mut Pager, root_page_id: PageId) -> Result<()> {
223    let mut page_ids = Vec::new();
224    collect_tree_page_ids(pager, root_page_id, &mut page_ids)?;
225    for page_id in page_ids {
226        if page_id != root_page_id {
227            pager.deallocate_page(page_id)?;
228        }
229    }
230    initialize_empty_tree_root(pager, root_page_id)
231}
232
233pub fn collect_tree_page_ids(
234    pager: &mut Pager,
235    page_id: PageId,
236    out: &mut Vec<PageId>,
237) -> Result<()> {
238    out.push(page_id);
239    let page = pager.read_page(page_id)?;
240    let node = BTreeNode::from_page(page)?;
241    if node.node_type == NodeType::Internal {
242        for child_page_id in node.children {
243            collect_tree_page_ids(pager, child_page_id, out)?;
244        }
245    }
246    Ok(())
247}
248
/// Aggregate space accounting for one tree, produced by
/// `collect_tree_space_stats`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TreeSpaceStats {
    /// Every node page id visited (leaves and internals), in visit order.
    pub page_ids: Vec<PageId>,
    /// Every overflow page id referenced by stored values, in visit order.
    pub overflow_page_ids: Vec<PageId>,
    /// Sum of estimated serialized sizes of all node pages.
    pub used_bytes: usize,
    /// Sum of overflow payload lengths reported by the value layouts.
    pub overflow_used_bytes: usize,
    /// Number of leaf node pages.
    pub leaf_pages: usize,
    /// Number of internal node pages.
    pub internal_pages: usize,
}
258
259pub fn collect_tree_space_stats(pager: &mut Pager, root_page_id: PageId) -> Result<TreeSpaceStats> {
260    let mut visited = HashSet::new();
261    let mut stats = TreeSpaceStats::default();
262    collect_tree_space_stats_recursive(pager, root_page_id, &mut visited, &mut stats)?;
263    Ok(stats)
264}
265
266fn collect_tree_space_stats_recursive(
267    pager: &mut Pager,
268    page_id: PageId,
269    visited: &mut HashSet<PageId>,
270    stats: &mut TreeSpaceStats,
271) -> Result<()> {
272    if !visited.insert(page_id) {
273        return Err(crate::error::HematiteError::CorruptedData(format!(
274            "Cycle detected while collecting tree space stats at page {}",
275            page_id
276        )));
277    }
278
279    let page = pager.read_page(page_id)?;
280    let node = BTreeNode::from_page(page)?;
281    stats.page_ids.push(page_id);
282    stats.used_bytes += node.estimate_serialized_size();
283
284    match node.node_type {
285        NodeType::Leaf => {
286            stats.leaf_pages += 1;
287            for value in &node.values {
288                let layout = StoredValueLayout::decode(value.as_bytes())?;
289                if layout.overflow_first_page != INVALID_PAGE_ID {
290                    let overflow_page_ids =
291                        collect_overflow_page_ids(pager, Some(layout.overflow_first_page))?;
292                    stats.overflow_used_bytes += layout.overflow_len();
293                    for overflow_page_id in overflow_page_ids {
294                        if visited.contains(&overflow_page_id)
295                            || stats.overflow_page_ids.contains(&overflow_page_id)
296                        {
297                            return Err(crate::error::HematiteError::CorruptedData(format!(
298                                "Duplicate overflow page {} encountered while collecting tree space stats",
299                                overflow_page_id
300                            )));
301                        }
302                        stats.overflow_page_ids.push(overflow_page_id);
303                    }
304                }
305            }
306        }
307        NodeType::Internal => {
308            stats.internal_pages += 1;
309            for child_page_id in node.children {
310                collect_tree_space_stats_recursive(pager, child_page_id, visited, stats)?;
311            }
312        }
313    }
314
315    Ok(())
316}
317
/// Mutable bookkeeping threaded through `validate_node_recursive`.
#[derive(Debug, Default)]
struct TreeValidationState {
    /// Page ids already visited; a repeat means a cycle or a shared child.
    visited: HashSet<PageId>,
    /// Depth of the first leaf encountered; all other leaves must match it.
    leaf_depth: Option<usize>,
}
323
/// A one-sided key-range bound used while validating key ordering.
#[derive(Debug, Clone)]
struct KeyBound {
    /// Raw key bytes forming the bound.
    key: Vec<u8>,
    /// Whether a key exactly equal to `key` satisfies the bound.
    inclusive: bool,
}