1use crate::btree::index::BTreeIndex;
14use crate::btree::node::BTreeNode;
15use crate::btree::value_store::StoredValueLayout;
16use crate::btree::NodeType;
17use crate::error::{HematiteError, Result};
18use crate::storage::overflow::collect_overflow_page_ids;
19use crate::storage::{
20 Page, PageId, Pager, DB_HEADER_PAGE_ID, INVALID_PAGE_ID, STORAGE_METADATA_PAGE_ID,
21};
22use std::collections::HashSet;
23use std::sync::{Arc, Mutex, MutexGuard};
24
25pub struct BTreeManager {
26 storage: Arc<Mutex<Pager>>,
27}
28
29impl BTreeManager {
30 fn lock_storage(&self) -> Result<MutexGuard<'_, Pager>> {
31 self.storage.lock().map_err(|_| {
32 HematiteError::InternalError("B-tree manager storage mutex is poisoned".to_string())
33 })
34 }
35
36 #[cfg(test)]
37 pub fn new(storage: Pager) -> Self {
38 Self {
39 storage: Arc::new(Mutex::new(storage)),
40 }
41 }
42
43 pub fn from_shared_storage(storage: Arc<Mutex<Pager>>) -> Self {
44 Self { storage }
45 }
46
47 pub fn create_tree(&mut self) -> Result<PageId> {
48 let mut pager = self.lock_storage()?;
49 create_tree_root(&mut pager)
50 }
51
52 pub fn open_tree(&mut self, root_page_id: PageId) -> Result<BTreeIndex> {
53 let page = self.lock_storage()?.read_page(root_page_id)?;
54 let _node = BTreeNode::from_page(page)?;
55 Ok(BTreeIndex::from_shared_storage(
56 self.storage.clone(),
57 root_page_id,
58 ))
59 }
60
61 pub fn delete_tree(&mut self, root_page_id: PageId) -> Result<()> {
62 self.delete_tree_recursive(root_page_id)?;
63 Ok(())
64 }
65
66 fn delete_tree_recursive(&mut self, page_id: PageId) -> Result<()> {
67 let page = self.lock_storage()?.read_page(page_id)?;
68 let node = BTreeNode::from_page(page)?;
69
70 match node.node_type {
71 NodeType::Leaf => {
72 self.lock_storage()?.deallocate_page(page_id)?;
73 }
74 NodeType::Internal => {
75 for child_page_id in node.children {
76 self.delete_tree_recursive(child_page_id)?;
77 }
78 self.lock_storage()?.deallocate_page(page_id)?;
79 }
80 }
81 Ok(())
82 }
83
84 pub fn validate_tree(&mut self, root_page_id: PageId) -> Result<bool> {
85 if root_page_id == INVALID_PAGE_ID
86 || root_page_id == DB_HEADER_PAGE_ID
87 || root_page_id == STORAGE_METADATA_PAGE_ID
88 {
89 return Ok(false);
90 }
91
92 let mut state = TreeValidationState {
93 visited: HashSet::new(),
94 leaf_depth: None,
95 };
96
97 self.validate_node_recursive(root_page_id, None, None, 0, &mut state)
98 }
99
100 fn validate_node_recursive(
101 &mut self,
102 page_id: PageId,
103 lower_bound: Option<KeyBound>,
104 upper_bound: Option<KeyBound>,
105 depth: usize,
106 state: &mut TreeValidationState,
107 ) -> Result<bool> {
108 if page_id == INVALID_PAGE_ID
109 || page_id == DB_HEADER_PAGE_ID
110 || page_id == STORAGE_METADATA_PAGE_ID
111 {
112 return Ok(false);
113 }
114
115 if !state.visited.insert(page_id) {
116 return Ok(false);
117 }
118
119 let page = self.lock_storage()?.read_page(page_id)?;
120 let node = BTreeNode::from_page(page)?;
121
122 for i in 1..node.keys.len() {
123 if node.keys[i - 1] >= node.keys[i] {
124 return Ok(false);
125 }
126 }
127 for key in &node.keys {
128 if let Some(lower) = &lower_bound {
129 let below_lower = if lower.inclusive {
130 key.as_bytes() < lower.key.as_slice()
131 } else {
132 key.as_bytes() <= lower.key.as_slice()
133 };
134 if below_lower {
135 return Ok(false);
136 }
137 }
138 if let Some(upper) = &upper_bound {
139 let above_upper = if upper.inclusive {
140 key.as_bytes() > upper.key.as_slice()
141 } else {
142 key.as_bytes() >= upper.key.as_slice()
143 };
144 if above_upper {
145 return Ok(false);
146 }
147 }
148 }
149
150 match node.node_type {
151 NodeType::Leaf => {
152 if node.keys.len() != node.values.len() {
153 return Ok(false);
154 }
155
156 if let Some(expected_depth) = state.leaf_depth {
157 if expected_depth != depth {
158 return Ok(false);
159 }
160 } else {
161 state.leaf_depth = Some(depth);
162 }
163
164 Ok(true)
165 }
166 NodeType::Internal => {
167 if node.children.len() != node.keys.len() + 1 {
168 return Ok(false);
169 }
170 if !node.values.is_empty() {
171 return Ok(false);
172 }
173
174 for (child_index, child_page_id) in node.children.iter().copied().enumerate() {
175 let child_lower = if child_index == 0 {
176 lower_bound.clone()
177 } else {
178 Some(KeyBound {
179 key: node.keys[child_index - 1].as_bytes().to_vec(),
180 inclusive: true,
181 })
182 };
183 let child_upper = if child_index == node.keys.len() {
184 upper_bound.clone()
185 } else {
186 Some(KeyBound {
187 key: node.keys[child_index].as_bytes().to_vec(),
188 inclusive: false,
189 })
190 };
191
192 if !self.validate_node_recursive(
193 child_page_id,
194 child_lower,
195 child_upper,
196 depth + 1,
197 state,
198 )? {
199 return Ok(false);
200 }
201 }
202
203 Ok(true)
204 }
205 }
206 }
207}
208
209pub fn create_tree_root(pager: &mut Pager) -> Result<PageId> {
210 let root_page_id = pager.allocate_page()?;
211 initialize_empty_tree_root(pager, root_page_id)?;
212 Ok(root_page_id)
213}
214
215pub fn initialize_empty_tree_root(pager: &mut Pager, root_page_id: PageId) -> Result<()> {
216 let mut root_page = Page::new(root_page_id);
217 let root_node = BTreeNode::new_leaf(root_page_id);
218 BTreeNode::to_page(&root_node, &mut root_page)?;
219 pager.write_page(root_page)
220}
221
222pub fn reset_tree_pages(pager: &mut Pager, root_page_id: PageId) -> Result<()> {
223 let mut page_ids = Vec::new();
224 collect_tree_page_ids(pager, root_page_id, &mut page_ids)?;
225 for page_id in page_ids {
226 if page_id != root_page_id {
227 pager.deallocate_page(page_id)?;
228 }
229 }
230 initialize_empty_tree_root(pager, root_page_id)
231}
232
233pub fn collect_tree_page_ids(
234 pager: &mut Pager,
235 page_id: PageId,
236 out: &mut Vec<PageId>,
237) -> Result<()> {
238 out.push(page_id);
239 let page = pager.read_page(page_id)?;
240 let node = BTreeNode::from_page(page)?;
241 if node.node_type == NodeType::Internal {
242 for child_page_id in node.children {
243 collect_tree_page_ids(pager, child_page_id, out)?;
244 }
245 }
246 Ok(())
247}
248
249#[derive(Debug, Clone, Default, PartialEq, Eq)]
250pub struct TreeSpaceStats {
251 pub page_ids: Vec<PageId>,
252 pub overflow_page_ids: Vec<PageId>,
253 pub used_bytes: usize,
254 pub overflow_used_bytes: usize,
255 pub leaf_pages: usize,
256 pub internal_pages: usize,
257}
258
259pub fn collect_tree_space_stats(pager: &mut Pager, root_page_id: PageId) -> Result<TreeSpaceStats> {
260 let mut visited = HashSet::new();
261 let mut stats = TreeSpaceStats::default();
262 collect_tree_space_stats_recursive(pager, root_page_id, &mut visited, &mut stats)?;
263 Ok(stats)
264}
265
266fn collect_tree_space_stats_recursive(
267 pager: &mut Pager,
268 page_id: PageId,
269 visited: &mut HashSet<PageId>,
270 stats: &mut TreeSpaceStats,
271) -> Result<()> {
272 if !visited.insert(page_id) {
273 return Err(crate::error::HematiteError::CorruptedData(format!(
274 "Cycle detected while collecting tree space stats at page {}",
275 page_id
276 )));
277 }
278
279 let page = pager.read_page(page_id)?;
280 let node = BTreeNode::from_page(page)?;
281 stats.page_ids.push(page_id);
282 stats.used_bytes += node.estimate_serialized_size();
283
284 match node.node_type {
285 NodeType::Leaf => {
286 stats.leaf_pages += 1;
287 for value in &node.values {
288 let layout = StoredValueLayout::decode(value.as_bytes())?;
289 if layout.overflow_first_page != INVALID_PAGE_ID {
290 let overflow_page_ids =
291 collect_overflow_page_ids(pager, Some(layout.overflow_first_page))?;
292 stats.overflow_used_bytes += layout.overflow_len();
293 for overflow_page_id in overflow_page_ids {
294 if visited.contains(&overflow_page_id)
295 || stats.overflow_page_ids.contains(&overflow_page_id)
296 {
297 return Err(crate::error::HematiteError::CorruptedData(format!(
298 "Duplicate overflow page {} encountered while collecting tree space stats",
299 overflow_page_id
300 )));
301 }
302 stats.overflow_page_ids.push(overflow_page_id);
303 }
304 }
305 }
306 }
307 NodeType::Internal => {
308 stats.internal_pages += 1;
309 for child_page_id in node.children {
310 collect_tree_space_stats_recursive(pager, child_page_id, visited, stats)?;
311 }
312 }
313 }
314
315 Ok(())
316}
317
318#[derive(Debug, Default)]
319struct TreeValidationState {
320 visited: HashSet<PageId>,
321 leaf_depth: Option<usize>,
322}
323
/// One side of the key range a subtree is allowed to contain.
#[derive(Debug, Clone)]
struct KeyBound {
    /// Raw bytes of the bounding key.
    key: Vec<u8>,
    /// Whether a key equal to `key` is still considered inside the range.
    inclusive: bool,
}