1use crate::{Result, SqlRsError};
2use std::path::Path;
3use std::sync::Arc;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6
7use super::file::FileManager;
8use super::page::{Page, PageId, PageType, PageCache};
9
10const MAX_KEYS_PER_NODE: usize = 64;
11
12#[derive(Debug, Serialize, Deserialize)]
13struct BTreeNode {
14 page_id: PageId,
15 is_leaf: bool,
16 keys: Vec<Vec<u8>>,
17 values: Vec<Vec<u8>>,
18 children: Vec<PageId>,
19}
20
21impl BTreeNode {
22 fn new(page_id: PageId, is_leaf: bool) -> Self {
23 Self {
24 page_id,
25 is_leaf,
26 keys: Vec::new(),
27 values: Vec::new(),
28 children: Vec::new(),
29 }
30 }
31
32 fn serialize(&self) -> Vec<u8> {
33 bincode::serialize(self).unwrap_or_default()
34 }
35
36 fn deserialize(data: &[u8]) -> Option<Self> {
37 bincode::deserialize(data).ok()
38 }
39}
40
41pub struct BTree {
42 file_manager: Arc<RwLock<FileManager>>,
43 cache: RwLock<PageCache>,
44 root_page_id: PageId,
45}
46
47impl BTree {
48 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
49 let file_manager = FileManager::open(path)?;
50
51 if file_manager.page_count() == 0 {
52 return Err(SqlRsError::Storage("Empty database file".to_string()));
53 }
54
55 Ok(Self {
56 file_manager: Arc::new(RwLock::new(file_manager)),
57 cache: RwLock::new(PageCache::new(100)),
58 root_page_id: 0,
59 })
60 }
61
62 pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
63 let mut file_manager = FileManager::create(path)?;
64
65 let root_page_id = file_manager.allocate_page();
66 let mut root_page = Page::new(root_page_id, PageType::BTreeLeaf);
67 root_page.write_header();
68
69 let root_node = BTreeNode::new(root_page_id, true);
70 let serialized = root_node.serialize();
71 root_page.data[1..serialized.len() + 1].copy_from_slice(&serialized);
72
73 file_manager.write_page(&root_page)?;
74 file_manager.flush()?;
75
76 Ok(Self {
77 file_manager: Arc::new(RwLock::new(file_manager)),
78 cache: RwLock::new(PageCache::new(100)),
79 root_page_id,
80 })
81 }
82
83 fn read_node(&self, page_id: PageId) -> Result<BTreeNode> {
84 {
85 let cache = self.cache.read();
86 if let Some(page) = cache.get(page_id) {
87 if let Some(node) = BTreeNode::deserialize(&page.data[1..]) {
88 return Ok(node);
89 }
90 }
91 }
92
93 let mut file_manager = self.file_manager.write();
94 let page = file_manager.read_page(page_id)?;
95
96 let node = BTreeNode::deserialize(&page.data[1..])
97 .ok_or_else(|| SqlRsError::Storage("Failed to deserialize node".to_string()))?;
98
99 self.cache.write().insert(page);
100
101 Ok(node)
102 }
103
104 fn write_node(&self, node: &BTreeNode) -> Result<()> {
105 let mut page = Page::new(node.page_id, if node.is_leaf { PageType::BTreeLeaf } else { PageType::BTreeInternal });
106 page.write_header();
107
108 let serialized = node.serialize();
109 if serialized.len() + 1 > page.data.len() {
110 return Err(SqlRsError::Storage("Node too large for page".to_string()));
111 }
112
113 page.data[1..serialized.len() + 1].copy_from_slice(&serialized);
114
115 self.cache.write().insert(page.clone());
116 self.file_manager.write().write_page(&page)?;
117
118 Ok(())
119 }
120
121 pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
122 self.search_recursive(self.root_page_id, key)
123 }
124
125 fn search_recursive(&self, page_id: PageId, key: &[u8]) -> Result<Option<Vec<u8>>> {
126 let node = self.read_node(page_id)?;
127
128 if node.is_leaf {
129 for (i, k) in node.keys.iter().enumerate() {
130 if k.as_slice() == key {
131 return Ok(Some(node.values[i].clone()));
132 }
133 }
134 Ok(None)
135 } else {
136 let mut child_idx = 0;
137 for (i, k) in node.keys.iter().enumerate() {
138 if key < k.as_slice() {
139 child_idx = i;
140 break;
141 }
142 child_idx = i + 1;
143 }
144
145 if child_idx < node.children.len() {
146 self.search_recursive(node.children[child_idx], key)
147 } else {
148 Ok(None)
149 }
150 }
151 }
152
153 pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
154 let mut root = self.read_node(self.root_page_id)?;
155
156 if root.keys.len() >= MAX_KEYS_PER_NODE {
157 let new_root_id = self.file_manager.write().allocate_page();
158 let mut new_root = BTreeNode::new(new_root_id, false);
159 new_root.children.push(self.root_page_id);
160
161 self.split_child(&mut new_root, 0)?;
162 self.root_page_id = new_root_id;
163 self.write_node(&new_root)?;
164
165 root = new_root;
166 }
167
168 self.insert_non_full(root, key, value)?;
169 Ok(())
170 }
171
172 fn insert_non_full(&self, mut node: BTreeNode, key: &[u8], value: &[u8]) -> Result<()> {
173 if node.is_leaf {
174 let pos = node.keys.iter().position(|k| k.as_slice() >= key).unwrap_or(node.keys.len());
175
176 if pos < node.keys.len() && node.keys[pos] == key {
177 node.values[pos] = value.to_vec();
178 } else {
179 node.keys.insert(pos, key.to_vec());
180 node.values.insert(pos, value.to_vec());
181 }
182
183 self.write_node(&node)?;
184 } else {
185 let mut pos = node.keys.iter().position(|k| k.as_slice() > key).unwrap_or(node.keys.len());
186
187 let child_id = node.children[pos];
188 let child = self.read_node(child_id)?;
189
190 if child.keys.len() >= MAX_KEYS_PER_NODE {
191 self.split_child(&mut node, pos)?;
192
193 if key > node.keys[pos].as_slice() {
194 pos += 1;
195 }
196 }
197
198 let child_id = node.children[pos];
199 let child = self.read_node(child_id)?;
200 self.insert_non_full(child, key, value)?;
201 }
202
203 Ok(())
204 }
205
206 fn split_child(&self, parent: &mut BTreeNode, child_idx: usize) -> Result<()> {
207 let child_id = parent.children[child_idx];
208 let mut child = self.read_node(child_id)?;
209
210 let mid = child.keys.len() / 2;
211
212 let new_node_id = self.file_manager.write().allocate_page();
213 let mut new_node = BTreeNode::new(new_node_id, child.is_leaf);
214
215 if child.is_leaf {
216 new_node.keys = child.keys.split_off(mid);
217 new_node.values = child.values.split_off(mid);
218
219 let mid_key = new_node.keys[0].clone();
220 parent.keys.insert(child_idx, mid_key);
221 } else {
222 new_node.keys = child.keys.split_off(mid + 1);
223 new_node.values = child.values.split_off(mid + 1);
224 new_node.children = child.children.split_off(mid + 1);
225
226 let mid_key = child.keys.pop().unwrap();
227 child.values.pop();
228
229 parent.keys.insert(child_idx, mid_key);
230 }
231
232 parent.children.insert(child_idx + 1, new_node_id);
233
234 self.write_node(&child)?;
235 self.write_node(&new_node)?;
236 self.write_node(parent)?;
237
238 Ok(())
239 }
240
241 pub fn delete(&mut self, key: &[u8]) -> Result<()> {
242 let mut root = self.read_node(self.root_page_id)?;
243 self.delete_recursive(&mut root, key)?;
244
245 if !root.is_leaf && root.keys.is_empty() {
247 if let Some(&new_root_id) = root.children.first() {
248 self.root_page_id = new_root_id;
249 }
250 }
251
252 Ok(())
253 }
254
255 fn delete_recursive(&self, node: &mut BTreeNode, key: &[u8]) -> Result<bool> {
256 let idx = node.keys.iter().position(|k| k.as_slice() >= key).unwrap_or(node.keys.len());
257
258 if node.is_leaf {
259 if idx < node.keys.len() && node.keys[idx] == key {
260 node.keys.remove(idx);
261 node.values.remove(idx);
262 self.write_node(node)?;
263 return Ok(true);
264 }
265 return Ok(false);
266 }
267
268 if idx < node.keys.len() && node.keys[idx] == key {
270 return self.delete_from_internal_node(node, idx);
271 }
272
273 let child_id = node.children[idx];
275 let mut child = self.read_node(child_id)?;
276
277 if child.keys.len() <= MAX_KEYS_PER_NODE / 2 {
279 self.ensure_min_keys(node, idx)?;
280 let child_id = node.children[idx];
282 let mut child = self.read_node(child_id)?;
283 return self.delete_recursive(&mut child, key);
284 }
285
286 self.delete_recursive(&mut child, key)
287 }
288
289 fn delete_from_internal_node(&self, node: &mut BTreeNode, idx: usize) -> Result<bool> {
290 let predecessor_child_id = node.children[idx];
291 let predecessor_child = self.read_node(predecessor_child_id)?;
292
293 if predecessor_child.keys.len() > MAX_KEYS_PER_NODE / 2 {
294 let pred_key = self.get_predecessor(predecessor_child_id)?;
296
297 node.keys[idx] = pred_key.clone();
298 self.write_node(node)?;
299
300 let mut child = self.read_node(predecessor_child_id)?;
301 return self.delete_recursive(&mut child, &pred_key);
302 }
303
304 let successor_child_id = node.children[idx + 1];
305 let successor_child = self.read_node(successor_child_id)?;
306
307 if successor_child.keys.len() > MAX_KEYS_PER_NODE / 2 {
308 let succ_key = self.get_successor(successor_child_id)?;
310
311 node.keys[idx] = succ_key.clone();
312 self.write_node(node)?;
313
314 let mut child = self.read_node(successor_child_id)?;
315 return self.delete_recursive(&mut child, &succ_key);
316 }
317
318 self.merge_children(node, idx)?;
320 let merged_child_id = node.children[idx];
321 let mut merged_child = self.read_node(merged_child_id)?;
322 let key_to_delete = node.keys[idx].clone();
323 self.delete_recursive(&mut merged_child, &key_to_delete)
324 }
325
326 fn get_predecessor(&self, mut page_id: PageId) -> Result<Vec<u8>> {
327 loop {
328 let node = self.read_node(page_id)?;
329 if node.is_leaf {
330 return Ok(node.keys.last().cloned().unwrap());
331 }
332 page_id = *node.children.last().unwrap();
333 }
334 }
335
336 fn get_successor(&self, mut page_id: PageId) -> Result<Vec<u8>> {
337 loop {
338 let node = self.read_node(page_id)?;
339 if node.is_leaf {
340 return Ok(node.keys.first().cloned().unwrap());
341 }
342 page_id = node.children[0];
343 }
344 }
345
346 fn ensure_min_keys(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
347 let child_id = parent.children[idx];
348 let child = self.read_node(child_id)?;
349
350 if child.keys.len() > MAX_KEYS_PER_NODE / 2 {
351 return Ok(());
352 }
353
354 if idx > 0 {
356 let left_sibling_id = parent.children[idx - 1];
357 let left_sibling = self.read_node(left_sibling_id)?;
358
359 if left_sibling.keys.len() > MAX_KEYS_PER_NODE / 2 {
360 return self.borrow_from_left(parent, idx);
361 }
362 }
363
364 if idx < parent.children.len() - 1 {
366 let right_sibling_id = parent.children[idx + 1];
367 let right_sibling = self.read_node(right_sibling_id)?;
368
369 if right_sibling.keys.len() > MAX_KEYS_PER_NODE / 2 {
370 return self.borrow_from_right(parent, idx);
371 }
372 }
373
374 if idx > 0 {
376 self.merge_children(parent, idx - 1)?;
377 } else {
378 self.merge_children(parent, idx)?;
379 }
380
381 Ok(())
382 }
383
384 fn borrow_from_left(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
385 let child_id = parent.children[idx];
386 let left_sibling_id = parent.children[idx - 1];
387
388 let mut child = self.read_node(child_id)?;
389 let mut left_sibling = self.read_node(left_sibling_id)?;
390
391 child.keys.insert(0, parent.keys[idx - 1].clone());
393 if child.is_leaf {
394 child.values.insert(0, left_sibling.values.pop().unwrap());
395 } else {
396 child.children.insert(0, left_sibling.children.pop().unwrap());
397 child.values.insert(0, left_sibling.values.pop().unwrap());
398 }
399
400 parent.keys[idx - 1] = left_sibling.keys.pop().unwrap();
402
403 self.write_node(&child)?;
404 self.write_node(&left_sibling)?;
405 self.write_node(parent)?;
406
407 Ok(())
408 }
409
410 fn borrow_from_right(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
411 let child_id = parent.children[idx];
412 let right_sibling_id = parent.children[idx + 1];
413
414 let mut child = self.read_node(child_id)?;
415 let mut right_sibling = self.read_node(right_sibling_id)?;
416
417 child.keys.push(parent.keys[idx].clone());
419 if child.is_leaf {
420 child.values.push(right_sibling.values.remove(0));
421 } else {
422 child.children.push(right_sibling.children.remove(0));
423 child.values.push(right_sibling.values.remove(0));
424 }
425
426 parent.keys[idx] = right_sibling.keys.remove(0);
428
429 self.write_node(&child)?;
430 self.write_node(&right_sibling)?;
431 self.write_node(parent)?;
432
433 Ok(())
434 }
435
436 fn merge_children(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
437 let left_child_id = parent.children[idx];
438 let right_child_id = parent.children[idx + 1];
439
440 let mut left_child = self.read_node(left_child_id)?;
441 let right_child = self.read_node(right_child_id)?;
442
443 left_child.keys.push(parent.keys.remove(idx));
445 parent.children.remove(idx + 1);
446
447 left_child.keys.extend(right_child.keys);
449 left_child.values.extend(right_child.values);
450 if !left_child.is_leaf {
451 left_child.children.extend(right_child.children);
452 }
453
454 self.write_node(&left_child)?;
455 self.write_node(parent)?;
456
457 Ok(())
458 }
459
460 pub fn scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
461 let mut results = Vec::new();
462 self.scan_recursive(self.root_page_id, start, end, &mut results)?;
463 Ok(results)
464 }
465
466 fn scan_recursive(&self, page_id: PageId, start: &[u8], end: &[u8], results: &mut Vec<(Vec<u8>, Vec<u8>)>) -> Result<()> {
467 let node = self.read_node(page_id)?;
468
469 if node.is_leaf {
470 for (i, key) in node.keys.iter().enumerate() {
471 if key.as_slice() >= start && key.as_slice() < end {
472 results.push((key.clone(), node.values[i].clone()));
473 }
474 }
475 } else {
476 for (i, child_id) in node.children.iter().enumerate() {
477 if i < node.keys.len() {
478 if node.keys[i].as_slice() >= start {
479 self.scan_recursive(*child_id, start, end, results)?;
480 }
481 } else {
482 self.scan_recursive(*child_id, start, end, results)?;
483 }
484 }
485 }
486
487 Ok(())
488 }
489
490 pub fn flush(&mut self) -> Result<()> {
491 self.cache.write().clear();
492 self.file_manager.write().flush()
493 }
494
495 pub fn get_file_manager(&self) -> Arc<RwLock<FileManager>> {
496 Arc::clone(&self.file_manager)
497 }
498}