1use crate::{Result, SqlRsError};
2use std::path::Path;
3use parking_lot::RwLock;
4use serde::{Deserialize, Serialize};
5
6use super::file::FileManager;
7use super::page::{Page, PageId, PageType, PageCache};
8
/// Key-count limit for a node: a node holding this many keys is split before an
/// insert descends into it, and half of this value is the threshold below which
/// deletion rebalances a child.
const MAX_KEYS_PER_NODE: usize = 64;
10
/// In-memory form of one tree node; it is bincode-encoded into a page for storage.
#[derive(Debug, Serialize, Deserialize)]
struct BTreeNode {
    /// Page this node is stored in.
    page_id: PageId,
    /// `true` for leaves; lookups and scans only read `values` from leaf nodes.
    is_leaf: bool,
    /// Keys of this node, searched in ascending byte order by the tree code.
    keys: Vec<Vec<u8>>,
    /// Values of a leaf, indexed by the position of the matching entry in `keys`.
    values: Vec<Vec<u8>>,
    /// Child page ids of an internal node, used to route a key to a subtree.
    children: Vec<PageId>,
}
19
20impl BTreeNode {
21 fn new(page_id: PageId, is_leaf: bool) -> Self {
22 Self {
23 page_id,
24 is_leaf,
25 keys: Vec::new(),
26 values: Vec::new(),
27 children: Vec::new(),
28 }
29 }
30
31 fn serialize(&self) -> Vec<u8> {
32 bincode::serialize(self).unwrap_or_default()
33 }
34
35 fn deserialize(data: &[u8]) -> Option<Self> {
36 bincode::deserialize(data).ok()
37 }
38}
39
/// Page-backed tree mapping byte-string keys to byte-string values.
pub struct BTree {
    /// Backing file; every page read, write and allocation goes through it.
    file_manager: RwLock<FileManager>,
    /// Cache of pages that were recently read or written.
    cache: RwLock<PageCache>,
    /// Page id of the current root node; reassigned when the root splits or collapses.
    root_page_id: PageId,
}
45
46impl BTree {
47 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
48 let file_manager = FileManager::open(path)?;
49
50 if file_manager.page_count() == 0 {
51 return Err(SqlRsError::Storage("Empty database file".to_string()));
52 }
53
54 Ok(Self {
55 file_manager: RwLock::new(file_manager),
56 cache: RwLock::new(PageCache::new(100)),
57 root_page_id: 0,
58 })
59 }
60
61 pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
62 let mut file_manager = FileManager::create(path)?;
63
64 let root_page_id = file_manager.allocate_page();
65 let mut root_page = Page::new(root_page_id, PageType::BTreeLeaf);
66 root_page.write_header();
67
68 let root_node = BTreeNode::new(root_page_id, true);
69 let serialized = root_node.serialize();
70 root_page.data[1..serialized.len() + 1].copy_from_slice(&serialized);
71
72 file_manager.write_page(&root_page)?;
73 file_manager.flush()?;
74
75 Ok(Self {
76 file_manager: RwLock::new(file_manager),
77 cache: RwLock::new(PageCache::new(100)),
78 root_page_id,
79 })
80 }
81
82 fn read_node(&self, page_id: PageId) -> Result<BTreeNode> {
83 {
84 let cache = self.cache.read();
85 if let Some(page) = cache.get(page_id) {
86 if let Some(node) = BTreeNode::deserialize(&page.data[1..]) {
87 return Ok(node);
88 }
89 }
90 }
91
92 let mut file_manager = self.file_manager.write();
93 let page = file_manager.read_page(page_id)?;
94
95 let node = BTreeNode::deserialize(&page.data[1..])
96 .ok_or_else(|| SqlRsError::Storage("Failed to deserialize node".to_string()))?;
97
98 self.cache.write().insert(page);
99
100 Ok(node)
101 }
102
103 fn write_node(&self, node: &BTreeNode) -> Result<()> {
104 let mut page = Page::new(node.page_id, if node.is_leaf { PageType::BTreeLeaf } else { PageType::BTreeInternal });
105 page.write_header();
106
107 let serialized = node.serialize();
108 if serialized.len() + 1 > page.data.len() {
109 return Err(SqlRsError::Storage("Node too large for page".to_string()));
110 }
111
112 page.data[1..serialized.len() + 1].copy_from_slice(&serialized);
113
114 self.cache.write().insert(page.clone());
115 self.file_manager.write().write_page(&page)?;
116
117 Ok(())
118 }
119
    /// Looks up `key`, starting at the current root.
    ///
    /// Returns `Ok(Some(value))` when a leaf holds the key and `Ok(None)` otherwise.
    ///
    /// # Errors
    /// Propagates any error raised while reading nodes.
    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        self.search_recursive(self.root_page_id, key)
    }
123
124 fn search_recursive(&self, page_id: PageId, key: &[u8]) -> Result<Option<Vec<u8>>> {
125 let node = self.read_node(page_id)?;
126
127 if node.is_leaf {
128 for (i, k) in node.keys.iter().enumerate() {
129 if k.as_slice() == key {
130 return Ok(Some(node.values[i].clone()));
131 }
132 }
133 Ok(None)
134 } else {
135 let mut child_idx = 0;
136 for (i, k) in node.keys.iter().enumerate() {
137 if key < k.as_slice() {
138 child_idx = i;
139 break;
140 }
141 child_idx = i + 1;
142 }
143
144 if child_idx < node.children.len() {
145 self.search_recursive(node.children[child_idx], key)
146 } else {
147 Ok(None)
148 }
149 }
150 }
151
152 pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
153 let mut root = self.read_node(self.root_page_id)?;
154
155 if root.keys.len() >= MAX_KEYS_PER_NODE {
156 let new_root_id = self.file_manager.write().allocate_page();
157 let mut new_root = BTreeNode::new(new_root_id, false);
158 new_root.children.push(self.root_page_id);
159
160 self.split_child(&mut new_root, 0)?;
161 self.root_page_id = new_root_id;
162 self.write_node(&new_root)?;
163
164 root = new_root;
165 }
166
167 self.insert_non_full(root, key, value)?;
168 Ok(())
169 }
170
171 fn insert_non_full(&self, mut node: BTreeNode, key: &[u8], value: &[u8]) -> Result<()> {
172 if node.is_leaf {
173 let pos = node.keys.iter().position(|k| k.as_slice() >= key).unwrap_or(node.keys.len());
174
175 if pos < node.keys.len() && node.keys[pos] == key {
176 node.values[pos] = value.to_vec();
177 } else {
178 node.keys.insert(pos, key.to_vec());
179 node.values.insert(pos, value.to_vec());
180 }
181
182 self.write_node(&node)?;
183 } else {
184 let mut pos = node.keys.iter().position(|k| k.as_slice() > key).unwrap_or(node.keys.len());
185
186 let child_id = node.children[pos];
187 let child = self.read_node(child_id)?;
188
189 if child.keys.len() >= MAX_KEYS_PER_NODE {
190 self.split_child(&mut node, pos)?;
191
192 if key > node.keys[pos].as_slice() {
193 pos += 1;
194 }
195 }
196
197 let child_id = node.children[pos];
198 let child = self.read_node(child_id)?;
199 self.insert_non_full(child, key, value)?;
200 }
201
202 Ok(())
203 }
204
205 fn split_child(&self, parent: &mut BTreeNode, child_idx: usize) -> Result<()> {
206 let child_id = parent.children[child_idx];
207 let mut child = self.read_node(child_id)?;
208
209 let mid = child.keys.len() / 2;
210
211 let new_node_id = self.file_manager.write().allocate_page();
212 let mut new_node = BTreeNode::new(new_node_id, child.is_leaf);
213
214 if child.is_leaf {
215 new_node.keys = child.keys.split_off(mid);
216 new_node.values = child.values.split_off(mid);
217
218 let mid_key = new_node.keys[0].clone();
219 parent.keys.insert(child_idx, mid_key);
220 } else {
221 new_node.keys = child.keys.split_off(mid + 1);
222 new_node.values = child.values.split_off(mid + 1);
223 new_node.children = child.children.split_off(mid + 1);
224
225 let mid_key = child.keys.pop().unwrap();
226 child.values.pop();
227
228 parent.keys.insert(child_idx, mid_key);
229 }
230
231 parent.children.insert(child_idx + 1, new_node_id);
232
233 self.write_node(&child)?;
234 self.write_node(&new_node)?;
235 self.write_node(parent)?;
236
237 Ok(())
238 }
239
240 pub fn delete(&mut self, key: &[u8]) -> Result<()> {
241 let mut root = self.read_node(self.root_page_id)?;
242 self.delete_recursive(&mut root, key)?;
243
244 if !root.is_leaf && root.keys.is_empty() {
246 if let Some(&new_root_id) = root.children.first() {
247 self.root_page_id = new_root_id;
248 }
249 }
250
251 Ok(())
252 }
253
254 fn delete_recursive(&self, node: &mut BTreeNode, key: &[u8]) -> Result<bool> {
255 let idx = node.keys.iter().position(|k| k.as_slice() >= key).unwrap_or(node.keys.len());
256
257 if node.is_leaf {
258 if idx < node.keys.len() && node.keys[idx] == key {
259 node.keys.remove(idx);
260 node.values.remove(idx);
261 self.write_node(node)?;
262 return Ok(true);
263 }
264 return Ok(false);
265 }
266
267 if idx < node.keys.len() && node.keys[idx] == key {
269 return self.delete_from_internal_node(node, idx);
270 }
271
272 let child_id = node.children[idx];
274 let mut child = self.read_node(child_id)?;
275
276 if child.keys.len() <= MAX_KEYS_PER_NODE / 2 {
278 self.ensure_min_keys(node, idx)?;
279 let child_id = node.children[idx];
281 let mut child = self.read_node(child_id)?;
282 return self.delete_recursive(&mut child, key);
283 }
284
285 self.delete_recursive(&mut child, key)
286 }
287
288 fn delete_from_internal_node(&self, node: &mut BTreeNode, idx: usize) -> Result<bool> {
289 let predecessor_child_id = node.children[idx];
290 let predecessor_child = self.read_node(predecessor_child_id)?;
291
292 if predecessor_child.keys.len() > MAX_KEYS_PER_NODE / 2 {
293 let pred_key = self.get_predecessor(predecessor_child_id)?;
295
296 node.keys[idx] = pred_key.clone();
297 self.write_node(node)?;
298
299 let mut child = self.read_node(predecessor_child_id)?;
300 return self.delete_recursive(&mut child, &pred_key);
301 }
302
303 let successor_child_id = node.children[idx + 1];
304 let successor_child = self.read_node(successor_child_id)?;
305
306 if successor_child.keys.len() > MAX_KEYS_PER_NODE / 2 {
307 let succ_key = self.get_successor(successor_child_id)?;
309
310 node.keys[idx] = succ_key.clone();
311 self.write_node(node)?;
312
313 let mut child = self.read_node(successor_child_id)?;
314 return self.delete_recursive(&mut child, &succ_key);
315 }
316
317 self.merge_children(node, idx)?;
319 let merged_child_id = node.children[idx];
320 let mut merged_child = self.read_node(merged_child_id)?;
321 let key_to_delete = node.keys[idx].clone();
322 self.delete_recursive(&mut merged_child, &key_to_delete)
323 }
324
325 fn get_predecessor(&self, mut page_id: PageId) -> Result<Vec<u8>> {
326 loop {
327 let node = self.read_node(page_id)?;
328 if node.is_leaf {
329 return Ok(node.keys.last().cloned().unwrap());
330 }
331 page_id = *node.children.last().unwrap();
332 }
333 }
334
335 fn get_successor(&self, mut page_id: PageId) -> Result<Vec<u8>> {
336 loop {
337 let node = self.read_node(page_id)?;
338 if node.is_leaf {
339 return Ok(node.keys.first().cloned().unwrap());
340 }
341 page_id = node.children[0];
342 }
343 }
344
345 fn ensure_min_keys(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
346 let child_id = parent.children[idx];
347 let child = self.read_node(child_id)?;
348
349 if child.keys.len() > MAX_KEYS_PER_NODE / 2 {
350 return Ok(());
351 }
352
353 if idx > 0 {
355 let left_sibling_id = parent.children[idx - 1];
356 let left_sibling = self.read_node(left_sibling_id)?;
357
358 if left_sibling.keys.len() > MAX_KEYS_PER_NODE / 2 {
359 return self.borrow_from_left(parent, idx);
360 }
361 }
362
363 if idx < parent.children.len() - 1 {
365 let right_sibling_id = parent.children[idx + 1];
366 let right_sibling = self.read_node(right_sibling_id)?;
367
368 if right_sibling.keys.len() > MAX_KEYS_PER_NODE / 2 {
369 return self.borrow_from_right(parent, idx);
370 }
371 }
372
373 if idx > 0 {
375 self.merge_children(parent, idx - 1)?;
376 } else {
377 self.merge_children(parent, idx)?;
378 }
379
380 Ok(())
381 }
382
383 fn borrow_from_left(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
384 let child_id = parent.children[idx];
385 let left_sibling_id = parent.children[idx - 1];
386
387 let mut child = self.read_node(child_id)?;
388 let mut left_sibling = self.read_node(left_sibling_id)?;
389
390 child.keys.insert(0, parent.keys[idx - 1].clone());
392 if child.is_leaf {
393 child.values.insert(0, left_sibling.values.pop().unwrap());
394 } else {
395 child.children.insert(0, left_sibling.children.pop().unwrap());
396 child.values.insert(0, left_sibling.values.pop().unwrap());
397 }
398
399 parent.keys[idx - 1] = left_sibling.keys.pop().unwrap();
401
402 self.write_node(&child)?;
403 self.write_node(&left_sibling)?;
404 self.write_node(parent)?;
405
406 Ok(())
407 }
408
409 fn borrow_from_right(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
410 let child_id = parent.children[idx];
411 let right_sibling_id = parent.children[idx + 1];
412
413 let mut child = self.read_node(child_id)?;
414 let mut right_sibling = self.read_node(right_sibling_id)?;
415
416 child.keys.push(parent.keys[idx].clone());
418 if child.is_leaf {
419 child.values.push(right_sibling.values.remove(0));
420 } else {
421 child.children.push(right_sibling.children.remove(0));
422 child.values.push(right_sibling.values.remove(0));
423 }
424
425 parent.keys[idx] = right_sibling.keys.remove(0);
427
428 self.write_node(&child)?;
429 self.write_node(&right_sibling)?;
430 self.write_node(parent)?;
431
432 Ok(())
433 }
434
435 fn merge_children(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
436 let left_child_id = parent.children[idx];
437 let right_child_id = parent.children[idx + 1];
438
439 let mut left_child = self.read_node(left_child_id)?;
440 let right_child = self.read_node(right_child_id)?;
441
442 left_child.keys.push(parent.keys.remove(idx));
444 parent.children.remove(idx + 1);
445
446 left_child.keys.extend(right_child.keys);
448 left_child.values.extend(right_child.values);
449 if !left_child.is_leaf {
450 left_child.children.extend(right_child.children);
451 }
452
453 self.write_node(&left_child)?;
454 self.write_node(parent)?;
455
456 Ok(())
457 }
458
459 pub fn scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
460 let mut results = Vec::new();
461 self.scan_recursive(self.root_page_id, start, end, &mut results)?;
462 Ok(results)
463 }
464
465 fn scan_recursive(&self, page_id: PageId, start: &[u8], end: &[u8], results: &mut Vec<(Vec<u8>, Vec<u8>)>) -> Result<()> {
466 let node = self.read_node(page_id)?;
467
468 if node.is_leaf {
469 for (i, key) in node.keys.iter().enumerate() {
470 if key.as_slice() >= start && key.as_slice() < end {
471 results.push((key.clone(), node.values[i].clone()));
472 }
473 }
474 } else {
475 for (i, child_id) in node.children.iter().enumerate() {
476 if i < node.keys.len() {
477 if node.keys[i].as_slice() >= start {
478 self.scan_recursive(*child_id, start, end, results)?;
479 }
480 } else {
481 self.scan_recursive(*child_id, start, end, results)?;
482 }
483 }
484 }
485
486 Ok(())
487 }
488
    /// Empties the page cache and then flushes the file manager.
    ///
    /// # Errors
    /// Propagates the error returned by `FileManager::flush`.
    pub fn flush(&mut self) -> Result<()> {
        // Nodes are written to the file manager when they change (see
        // `write_node`), so clearing the cache only drops cached copies.
        self.cache.write().clear();
        self.file_manager.write().flush()
    }
493}