Skip to main content

sql_rs/storage/
btree.rs

1use crate::{Result, SqlRsError};
2use std::path::Path;
3use std::sync::Arc;
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6
7use super::file::FileManager;
8use super::page::{Page, PageId, PageType, PageCache};
9
/// Key-count limit for a single B-tree node.
///
/// The insert path splits any node whose key count has reached this value
/// before descending into it, and the delete path uses half of it as the
/// threshold below which a child is topped up or merged.
const MAX_KEYS_PER_NODE: usize = 64;
11
12#[derive(Debug, Serialize, Deserialize)]
13struct BTreeNode {
14    page_id: PageId,
15    is_leaf: bool,
16    keys: Vec<Vec<u8>>,
17    values: Vec<Vec<u8>>,
18    children: Vec<PageId>,
19}
20
21impl BTreeNode {
22    fn new(page_id: PageId, is_leaf: bool) -> Self {
23        Self {
24            page_id,
25            is_leaf,
26            keys: Vec::new(),
27            values: Vec::new(),
28            children: Vec::new(),
29        }
30    }
31    
32    fn serialize(&self) -> Vec<u8> {
33        bincode::serialize(self).unwrap_or_default()
34    }
35    
36    fn deserialize(data: &[u8]) -> Option<Self> {
37        bincode::deserialize(data).ok()
38    }
39}
40
41pub struct BTree {
42    file_manager: Arc<RwLock<FileManager>>,
43    cache: RwLock<PageCache>,
44    root_page_id: PageId,
45}
46
47impl BTree {
48    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
49        let file_manager = FileManager::open(path)?;
50
51        if file_manager.page_count() == 0 {
52            return Err(SqlRsError::Storage("Empty database file".to_string()));
53        }
54
55        Ok(Self {
56            file_manager: Arc::new(RwLock::new(file_manager)),
57            cache: RwLock::new(PageCache::new(100)),
58            root_page_id: 0,
59        })
60    }
61
62    pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
63        let mut file_manager = FileManager::create(path)?;
64
65        let root_page_id = file_manager.allocate_page();
66        let mut root_page = Page::new(root_page_id, PageType::BTreeLeaf);
67        root_page.write_header();
68
69        let root_node = BTreeNode::new(root_page_id, true);
70        let serialized = root_node.serialize();
71        root_page.data[1..serialized.len() + 1].copy_from_slice(&serialized);
72
73        file_manager.write_page(&root_page)?;
74        file_manager.flush()?;
75
76        Ok(Self {
77            file_manager: Arc::new(RwLock::new(file_manager)),
78            cache: RwLock::new(PageCache::new(100)),
79            root_page_id,
80        })
81    }
82    
83    fn read_node(&self, page_id: PageId) -> Result<BTreeNode> {
84        {
85            let cache = self.cache.read();
86            if let Some(page) = cache.get(page_id) {
87                if let Some(node) = BTreeNode::deserialize(&page.data[1..]) {
88                    return Ok(node);
89                }
90            }
91        }
92        
93        let mut file_manager = self.file_manager.write();
94        let page = file_manager.read_page(page_id)?;
95        
96        let node = BTreeNode::deserialize(&page.data[1..])
97            .ok_or_else(|| SqlRsError::Storage("Failed to deserialize node".to_string()))?;
98        
99        self.cache.write().insert(page);
100        
101        Ok(node)
102    }
103    
104    fn write_node(&self, node: &BTreeNode) -> Result<()> {
105        let mut page = Page::new(node.page_id, if node.is_leaf { PageType::BTreeLeaf } else { PageType::BTreeInternal });
106        page.write_header();
107        
108        let serialized = node.serialize();
109        if serialized.len() + 1 > page.data.len() {
110            return Err(SqlRsError::Storage("Node too large for page".to_string()));
111        }
112        
113        page.data[1..serialized.len() + 1].copy_from_slice(&serialized);
114        
115        self.cache.write().insert(page.clone());
116        self.file_manager.write().write_page(&page)?;
117        
118        Ok(())
119    }
120    
121    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
122        self.search_recursive(self.root_page_id, key)
123    }
124    
125    fn search_recursive(&self, page_id: PageId, key: &[u8]) -> Result<Option<Vec<u8>>> {
126        let node = self.read_node(page_id)?;
127        
128        if node.is_leaf {
129            for (i, k) in node.keys.iter().enumerate() {
130                if k.as_slice() == key {
131                    return Ok(Some(node.values[i].clone()));
132                }
133            }
134            Ok(None)
135        } else {
136            let mut child_idx = 0;
137            for (i, k) in node.keys.iter().enumerate() {
138                if key < k.as_slice() {
139                    child_idx = i;
140                    break;
141                }
142                child_idx = i + 1;
143            }
144            
145            if child_idx < node.children.len() {
146                self.search_recursive(node.children[child_idx], key)
147            } else {
148                Ok(None)
149            }
150        }
151    }
152    
153    pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
154        let mut root = self.read_node(self.root_page_id)?;
155        
156        if root.keys.len() >= MAX_KEYS_PER_NODE {
157            let new_root_id = self.file_manager.write().allocate_page();
158            let mut new_root = BTreeNode::new(new_root_id, false);
159            new_root.children.push(self.root_page_id);
160            
161            self.split_child(&mut new_root, 0)?;
162            self.root_page_id = new_root_id;
163            self.write_node(&new_root)?;
164            
165            root = new_root;
166        }
167        
168        self.insert_non_full(root, key, value)?;
169        Ok(())
170    }
171    
172    fn insert_non_full(&self, mut node: BTreeNode, key: &[u8], value: &[u8]) -> Result<()> {
173        if node.is_leaf {
174            let pos = node.keys.iter().position(|k| k.as_slice() >= key).unwrap_or(node.keys.len());
175            
176            if pos < node.keys.len() && node.keys[pos] == key {
177                node.values[pos] = value.to_vec();
178            } else {
179                node.keys.insert(pos, key.to_vec());
180                node.values.insert(pos, value.to_vec());
181            }
182            
183            self.write_node(&node)?;
184        } else {
185            let mut pos = node.keys.iter().position(|k| k.as_slice() > key).unwrap_or(node.keys.len());
186            
187            let child_id = node.children[pos];
188            let child = self.read_node(child_id)?;
189            
190            if child.keys.len() >= MAX_KEYS_PER_NODE {
191                self.split_child(&mut node, pos)?;
192                
193                if key > node.keys[pos].as_slice() {
194                    pos += 1;
195                }
196            }
197            
198            let child_id = node.children[pos];
199            let child = self.read_node(child_id)?;
200            self.insert_non_full(child, key, value)?;
201        }
202        
203        Ok(())
204    }
205    
206    fn split_child(&self, parent: &mut BTreeNode, child_idx: usize) -> Result<()> {
207        let child_id = parent.children[child_idx];
208        let mut child = self.read_node(child_id)?;
209        
210        let mid = child.keys.len() / 2;
211        
212        let new_node_id = self.file_manager.write().allocate_page();
213        let mut new_node = BTreeNode::new(new_node_id, child.is_leaf);
214        
215        if child.is_leaf {
216            new_node.keys = child.keys.split_off(mid);
217            new_node.values = child.values.split_off(mid);
218            
219            let mid_key = new_node.keys[0].clone();
220            parent.keys.insert(child_idx, mid_key);
221        } else {
222            new_node.keys = child.keys.split_off(mid + 1);
223            new_node.values = child.values.split_off(mid + 1);
224            new_node.children = child.children.split_off(mid + 1);
225            
226            let mid_key = child.keys.pop().unwrap();
227            child.values.pop();
228            
229            parent.keys.insert(child_idx, mid_key);
230        }
231        
232        parent.children.insert(child_idx + 1, new_node_id);
233        
234        self.write_node(&child)?;
235        self.write_node(&new_node)?;
236        self.write_node(parent)?;
237        
238        Ok(())
239    }
240    
241    pub fn delete(&mut self, key: &[u8]) -> Result<()> {
242        let mut root = self.read_node(self.root_page_id)?;
243        self.delete_recursive(&mut root, key)?;
244
245        // If root has no keys but has a child, make the child the new root
246        if !root.is_leaf && root.keys.is_empty() {
247            if let Some(&new_root_id) = root.children.first() {
248                self.root_page_id = new_root_id;
249            }
250        }
251
252        Ok(())
253    }
254
255    fn delete_recursive(&self, node: &mut BTreeNode, key: &[u8]) -> Result<bool> {
256        let idx = node.keys.iter().position(|k| k.as_slice() >= key).unwrap_or(node.keys.len());
257
258        if node.is_leaf {
259            if idx < node.keys.len() && node.keys[idx] == key {
260                node.keys.remove(idx);
261                node.values.remove(idx);
262                self.write_node(node)?;
263                return Ok(true);
264            }
265            return Ok(false);
266        }
267
268        // Key is in internal node
269        if idx < node.keys.len() && node.keys[idx] == key {
270            return self.delete_from_internal_node(node, idx);
271        }
272
273        // Key is in child
274        let child_id = node.children[idx];
275        let mut child = self.read_node(child_id)?;
276
277        // Ensure child has enough keys
278        if child.keys.len() <= MAX_KEYS_PER_NODE / 2 {
279            self.ensure_min_keys(node, idx)?;
280            // Reload child and potentially new node index
281            let child_id = node.children[idx];
282            let mut child = self.read_node(child_id)?;
283            return self.delete_recursive(&mut child, key);
284        }
285
286        self.delete_recursive(&mut child, key)
287    }
288
289    fn delete_from_internal_node(&self, node: &mut BTreeNode, idx: usize) -> Result<bool> {
290        let predecessor_child_id = node.children[idx];
291        let predecessor_child = self.read_node(predecessor_child_id)?;
292
293        if predecessor_child.keys.len() > MAX_KEYS_PER_NODE / 2 {
294            // Get predecessor (rightmost key of left child)
295            let pred_key = self.get_predecessor(predecessor_child_id)?;
296
297            node.keys[idx] = pred_key.clone();
298            self.write_node(node)?;
299
300            let mut child = self.read_node(predecessor_child_id)?;
301            return self.delete_recursive(&mut child, &pred_key);
302        }
303
304        let successor_child_id = node.children[idx + 1];
305        let successor_child = self.read_node(successor_child_id)?;
306
307        if successor_child.keys.len() > MAX_KEYS_PER_NODE / 2 {
308            // Get successor (leftmost key of right child)
309            let succ_key = self.get_successor(successor_child_id)?;
310
311            node.keys[idx] = succ_key.clone();
312            self.write_node(node)?;
313
314            let mut child = self.read_node(successor_child_id)?;
315            return self.delete_recursive(&mut child, &succ_key);
316        }
317
318        // Merge both children and delete
319        self.merge_children(node, idx)?;
320        let merged_child_id = node.children[idx];
321        let mut merged_child = self.read_node(merged_child_id)?;
322        let key_to_delete = node.keys[idx].clone();
323        self.delete_recursive(&mut merged_child, &key_to_delete)
324    }
325
326    fn get_predecessor(&self, mut page_id: PageId) -> Result<Vec<u8>> {
327        loop {
328            let node = self.read_node(page_id)?;
329            if node.is_leaf {
330                return Ok(node.keys.last().cloned().unwrap());
331            }
332            page_id = *node.children.last().unwrap();
333        }
334    }
335
336    fn get_successor(&self, mut page_id: PageId) -> Result<Vec<u8>> {
337        loop {
338            let node = self.read_node(page_id)?;
339            if node.is_leaf {
340                return Ok(node.keys.first().cloned().unwrap());
341            }
342            page_id = node.children[0];
343        }
344    }
345
346    fn ensure_min_keys(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
347        let child_id = parent.children[idx];
348        let child = self.read_node(child_id)?;
349
350        if child.keys.len() > MAX_KEYS_PER_NODE / 2 {
351            return Ok(());
352        }
353
354        // Try to borrow from left sibling
355        if idx > 0 {
356            let left_sibling_id = parent.children[idx - 1];
357            let left_sibling = self.read_node(left_sibling_id)?;
358
359            if left_sibling.keys.len() > MAX_KEYS_PER_NODE / 2 {
360                return self.borrow_from_left(parent, idx);
361            }
362        }
363
364        // Try to borrow from right sibling
365        if idx < parent.children.len() - 1 {
366            let right_sibling_id = parent.children[idx + 1];
367            let right_sibling = self.read_node(right_sibling_id)?;
368
369            if right_sibling.keys.len() > MAX_KEYS_PER_NODE / 2 {
370                return self.borrow_from_right(parent, idx);
371            }
372        }
373
374        // Merge with a sibling
375        if idx > 0 {
376            self.merge_children(parent, idx - 1)?;
377        } else {
378            self.merge_children(parent, idx)?;
379        }
380
381        Ok(())
382    }
383
384    fn borrow_from_left(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
385        let child_id = parent.children[idx];
386        let left_sibling_id = parent.children[idx - 1];
387
388        let mut child = self.read_node(child_id)?;
389        let mut left_sibling = self.read_node(left_sibling_id)?;
390
391        // Move key from parent to child
392        child.keys.insert(0, parent.keys[idx - 1].clone());
393        if child.is_leaf {
394            child.values.insert(0, left_sibling.values.pop().unwrap());
395        } else {
396            child.children.insert(0, left_sibling.children.pop().unwrap());
397            child.values.insert(0, left_sibling.values.pop().unwrap());
398        }
399
400        // Move key from left sibling to parent
401        parent.keys[idx - 1] = left_sibling.keys.pop().unwrap();
402
403        self.write_node(&child)?;
404        self.write_node(&left_sibling)?;
405        self.write_node(parent)?;
406
407        Ok(())
408    }
409
410    fn borrow_from_right(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
411        let child_id = parent.children[idx];
412        let right_sibling_id = parent.children[idx + 1];
413
414        let mut child = self.read_node(child_id)?;
415        let mut right_sibling = self.read_node(right_sibling_id)?;
416
417        // Move key from parent to child
418        child.keys.push(parent.keys[idx].clone());
419        if child.is_leaf {
420            child.values.push(right_sibling.values.remove(0));
421        } else {
422            child.children.push(right_sibling.children.remove(0));
423            child.values.push(right_sibling.values.remove(0));
424        }
425
426        // Move key from right sibling to parent
427        parent.keys[idx] = right_sibling.keys.remove(0);
428
429        self.write_node(&child)?;
430        self.write_node(&right_sibling)?;
431        self.write_node(parent)?;
432
433        Ok(())
434    }
435
436    fn merge_children(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
437        let left_child_id = parent.children[idx];
438        let right_child_id = parent.children[idx + 1];
439
440        let mut left_child = self.read_node(left_child_id)?;
441        let right_child = self.read_node(right_child_id)?;
442
443        // Move key from parent to left child
444        left_child.keys.push(parent.keys.remove(idx));
445        parent.children.remove(idx + 1);
446
447        // Merge keys, values, and children
448        left_child.keys.extend(right_child.keys);
449        left_child.values.extend(right_child.values);
450        if !left_child.is_leaf {
451            left_child.children.extend(right_child.children);
452        }
453
454        self.write_node(&left_child)?;
455        self.write_node(parent)?;
456
457        Ok(())
458    }
459    
460    pub fn scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
461        let mut results = Vec::new();
462        self.scan_recursive(self.root_page_id, start, end, &mut results)?;
463        Ok(results)
464    }
465    
466    fn scan_recursive(&self, page_id: PageId, start: &[u8], end: &[u8], results: &mut Vec<(Vec<u8>, Vec<u8>)>) -> Result<()> {
467        let node = self.read_node(page_id)?;
468        
469        if node.is_leaf {
470            for (i, key) in node.keys.iter().enumerate() {
471                if key.as_slice() >= start && key.as_slice() < end {
472                    results.push((key.clone(), node.values[i].clone()));
473                }
474            }
475        } else {
476            for (i, child_id) in node.children.iter().enumerate() {
477                if i < node.keys.len() {
478                    if node.keys[i].as_slice() >= start {
479                        self.scan_recursive(*child_id, start, end, results)?;
480                    }
481                } else {
482                    self.scan_recursive(*child_id, start, end, results)?;
483                }
484            }
485        }
486        
487        Ok(())
488    }
489    
490    pub fn flush(&mut self) -> Result<()> {
491        self.cache.write().clear();
492        self.file_manager.write().flush()
493    }
494
495    pub fn get_file_manager(&self) -> Arc<RwLock<FileManager>> {
496        Arc::clone(&self.file_manager)
497    }
498}