Skip to main content

sql_rs/storage/
btree.rs

1use crate::{Result, SqlRsError};
2use std::path::Path;
3use parking_lot::RwLock;
4use serde::{Deserialize, Serialize};
5
6use super::file::FileManager;
7use super::page::{Page, PageId, PageType, PageCache};
8
9const MAX_KEYS_PER_NODE: usize = 64;
10
11#[derive(Debug, Serialize, Deserialize)]
12struct BTreeNode {
13    page_id: PageId,
14    is_leaf: bool,
15    keys: Vec<Vec<u8>>,
16    values: Vec<Vec<u8>>,
17    children: Vec<PageId>,
18}
19
20impl BTreeNode {
21    fn new(page_id: PageId, is_leaf: bool) -> Self {
22        Self {
23            page_id,
24            is_leaf,
25            keys: Vec::new(),
26            values: Vec::new(),
27            children: Vec::new(),
28        }
29    }
30    
31    fn serialize(&self) -> Vec<u8> {
32        bincode::serialize(self).unwrap_or_default()
33    }
34    
35    fn deserialize(data: &[u8]) -> Option<Self> {
36        bincode::deserialize(data).ok()
37    }
38}
39
40pub struct BTree {
41    file_manager: RwLock<FileManager>,
42    cache: RwLock<PageCache>,
43    root_page_id: PageId,
44}
45
46impl BTree {
47    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
48        let file_manager = FileManager::open(path)?;
49        
50        if file_manager.page_count() == 0 {
51            return Err(SqlRsError::Storage("Empty database file".to_string()));
52        }
53        
54        Ok(Self {
55            file_manager: RwLock::new(file_manager),
56            cache: RwLock::new(PageCache::new(100)),
57            root_page_id: 0,
58        })
59    }
60    
61    pub fn create<P: AsRef<Path>>(path: P) -> Result<Self> {
62        let mut file_manager = FileManager::create(path)?;
63        
64        let root_page_id = file_manager.allocate_page();
65        let mut root_page = Page::new(root_page_id, PageType::BTreeLeaf);
66        root_page.write_header();
67        
68        let root_node = BTreeNode::new(root_page_id, true);
69        let serialized = root_node.serialize();
70        root_page.data[1..serialized.len() + 1].copy_from_slice(&serialized);
71        
72        file_manager.write_page(&root_page)?;
73        file_manager.flush()?;
74        
75        Ok(Self {
76            file_manager: RwLock::new(file_manager),
77            cache: RwLock::new(PageCache::new(100)),
78            root_page_id,
79        })
80    }
81    
82    fn read_node(&self, page_id: PageId) -> Result<BTreeNode> {
83        {
84            let cache = self.cache.read();
85            if let Some(page) = cache.get(page_id) {
86                if let Some(node) = BTreeNode::deserialize(&page.data[1..]) {
87                    return Ok(node);
88                }
89            }
90        }
91        
92        let mut file_manager = self.file_manager.write();
93        let page = file_manager.read_page(page_id)?;
94        
95        let node = BTreeNode::deserialize(&page.data[1..])
96            .ok_or_else(|| SqlRsError::Storage("Failed to deserialize node".to_string()))?;
97        
98        self.cache.write().insert(page);
99        
100        Ok(node)
101    }
102    
103    fn write_node(&self, node: &BTreeNode) -> Result<()> {
104        let mut page = Page::new(node.page_id, if node.is_leaf { PageType::BTreeLeaf } else { PageType::BTreeInternal });
105        page.write_header();
106        
107        let serialized = node.serialize();
108        if serialized.len() + 1 > page.data.len() {
109            return Err(SqlRsError::Storage("Node too large for page".to_string()));
110        }
111        
112        page.data[1..serialized.len() + 1].copy_from_slice(&serialized);
113        
114        self.cache.write().insert(page.clone());
115        self.file_manager.write().write_page(&page)?;
116        
117        Ok(())
118    }
119    
120    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
121        self.search_recursive(self.root_page_id, key)
122    }
123    
124    fn search_recursive(&self, page_id: PageId, key: &[u8]) -> Result<Option<Vec<u8>>> {
125        let node = self.read_node(page_id)?;
126        
127        if node.is_leaf {
128            for (i, k) in node.keys.iter().enumerate() {
129                if k.as_slice() == key {
130                    return Ok(Some(node.values[i].clone()));
131                }
132            }
133            Ok(None)
134        } else {
135            let mut child_idx = 0;
136            for (i, k) in node.keys.iter().enumerate() {
137                if key < k.as_slice() {
138                    child_idx = i;
139                    break;
140                }
141                child_idx = i + 1;
142            }
143            
144            if child_idx < node.children.len() {
145                self.search_recursive(node.children[child_idx], key)
146            } else {
147                Ok(None)
148            }
149        }
150    }
151    
152    pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
153        let mut root = self.read_node(self.root_page_id)?;
154        
155        if root.keys.len() >= MAX_KEYS_PER_NODE {
156            let new_root_id = self.file_manager.write().allocate_page();
157            let mut new_root = BTreeNode::new(new_root_id, false);
158            new_root.children.push(self.root_page_id);
159            
160            self.split_child(&mut new_root, 0)?;
161            self.root_page_id = new_root_id;
162            self.write_node(&new_root)?;
163            
164            root = new_root;
165        }
166        
167        self.insert_non_full(root, key, value)?;
168        Ok(())
169    }
170    
171    fn insert_non_full(&self, mut node: BTreeNode, key: &[u8], value: &[u8]) -> Result<()> {
172        if node.is_leaf {
173            let pos = node.keys.iter().position(|k| k.as_slice() >= key).unwrap_or(node.keys.len());
174            
175            if pos < node.keys.len() && node.keys[pos] == key {
176                node.values[pos] = value.to_vec();
177            } else {
178                node.keys.insert(pos, key.to_vec());
179                node.values.insert(pos, value.to_vec());
180            }
181            
182            self.write_node(&node)?;
183        } else {
184            let mut pos = node.keys.iter().position(|k| k.as_slice() > key).unwrap_or(node.keys.len());
185            
186            let child_id = node.children[pos];
187            let child = self.read_node(child_id)?;
188            
189            if child.keys.len() >= MAX_KEYS_PER_NODE {
190                self.split_child(&mut node, pos)?;
191                
192                if key > node.keys[pos].as_slice() {
193                    pos += 1;
194                }
195            }
196            
197            let child_id = node.children[pos];
198            let child = self.read_node(child_id)?;
199            self.insert_non_full(child, key, value)?;
200        }
201        
202        Ok(())
203    }
204    
205    fn split_child(&self, parent: &mut BTreeNode, child_idx: usize) -> Result<()> {
206        let child_id = parent.children[child_idx];
207        let mut child = self.read_node(child_id)?;
208        
209        let mid = child.keys.len() / 2;
210        
211        let new_node_id = self.file_manager.write().allocate_page();
212        let mut new_node = BTreeNode::new(new_node_id, child.is_leaf);
213        
214        if child.is_leaf {
215            new_node.keys = child.keys.split_off(mid);
216            new_node.values = child.values.split_off(mid);
217            
218            let mid_key = new_node.keys[0].clone();
219            parent.keys.insert(child_idx, mid_key);
220        } else {
221            new_node.keys = child.keys.split_off(mid + 1);
222            new_node.values = child.values.split_off(mid + 1);
223            new_node.children = child.children.split_off(mid + 1);
224            
225            let mid_key = child.keys.pop().unwrap();
226            child.values.pop();
227            
228            parent.keys.insert(child_idx, mid_key);
229        }
230        
231        parent.children.insert(child_idx + 1, new_node_id);
232        
233        self.write_node(&child)?;
234        self.write_node(&new_node)?;
235        self.write_node(parent)?;
236        
237        Ok(())
238    }
239    
240    pub fn delete(&mut self, key: &[u8]) -> Result<()> {
241        let mut root = self.read_node(self.root_page_id)?;
242        self.delete_recursive(&mut root, key)?;
243
244        // If root has no keys but has a child, make the child the new root
245        if !root.is_leaf && root.keys.is_empty() {
246            if let Some(&new_root_id) = root.children.first() {
247                self.root_page_id = new_root_id;
248            }
249        }
250
251        Ok(())
252    }
253
254    fn delete_recursive(&self, node: &mut BTreeNode, key: &[u8]) -> Result<bool> {
255        let idx = node.keys.iter().position(|k| k.as_slice() >= key).unwrap_or(node.keys.len());
256
257        if node.is_leaf {
258            if idx < node.keys.len() && node.keys[idx] == key {
259                node.keys.remove(idx);
260                node.values.remove(idx);
261                self.write_node(node)?;
262                return Ok(true);
263            }
264            return Ok(false);
265        }
266
267        // Key is in internal node
268        if idx < node.keys.len() && node.keys[idx] == key {
269            return self.delete_from_internal_node(node, idx);
270        }
271
272        // Key is in child
273        let child_id = node.children[idx];
274        let mut child = self.read_node(child_id)?;
275
276        // Ensure child has enough keys
277        if child.keys.len() <= MAX_KEYS_PER_NODE / 2 {
278            self.ensure_min_keys(node, idx)?;
279            // Reload child and potentially new node index
280            let child_id = node.children[idx];
281            let mut child = self.read_node(child_id)?;
282            return self.delete_recursive(&mut child, key);
283        }
284
285        self.delete_recursive(&mut child, key)
286    }
287
288    fn delete_from_internal_node(&self, node: &mut BTreeNode, idx: usize) -> Result<bool> {
289        let predecessor_child_id = node.children[idx];
290        let predecessor_child = self.read_node(predecessor_child_id)?;
291
292        if predecessor_child.keys.len() > MAX_KEYS_PER_NODE / 2 {
293            // Get predecessor (rightmost key of left child)
294            let pred_key = self.get_predecessor(predecessor_child_id)?;
295
296            node.keys[idx] = pred_key.clone();
297            self.write_node(node)?;
298
299            let mut child = self.read_node(predecessor_child_id)?;
300            return self.delete_recursive(&mut child, &pred_key);
301        }
302
303        let successor_child_id = node.children[idx + 1];
304        let successor_child = self.read_node(successor_child_id)?;
305
306        if successor_child.keys.len() > MAX_KEYS_PER_NODE / 2 {
307            // Get successor (leftmost key of right child)
308            let succ_key = self.get_successor(successor_child_id)?;
309
310            node.keys[idx] = succ_key.clone();
311            self.write_node(node)?;
312
313            let mut child = self.read_node(successor_child_id)?;
314            return self.delete_recursive(&mut child, &succ_key);
315        }
316
317        // Merge both children and delete
318        self.merge_children(node, idx)?;
319        let merged_child_id = node.children[idx];
320        let mut merged_child = self.read_node(merged_child_id)?;
321        let key_to_delete = node.keys[idx].clone();
322        self.delete_recursive(&mut merged_child, &key_to_delete)
323    }
324
325    fn get_predecessor(&self, mut page_id: PageId) -> Result<Vec<u8>> {
326        loop {
327            let node = self.read_node(page_id)?;
328            if node.is_leaf {
329                return Ok(node.keys.last().cloned().unwrap());
330            }
331            page_id = *node.children.last().unwrap();
332        }
333    }
334
335    fn get_successor(&self, mut page_id: PageId) -> Result<Vec<u8>> {
336        loop {
337            let node = self.read_node(page_id)?;
338            if node.is_leaf {
339                return Ok(node.keys.first().cloned().unwrap());
340            }
341            page_id = node.children[0];
342        }
343    }
344
345    fn ensure_min_keys(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
346        let child_id = parent.children[idx];
347        let child = self.read_node(child_id)?;
348
349        if child.keys.len() > MAX_KEYS_PER_NODE / 2 {
350            return Ok(());
351        }
352
353        // Try to borrow from left sibling
354        if idx > 0 {
355            let left_sibling_id = parent.children[idx - 1];
356            let left_sibling = self.read_node(left_sibling_id)?;
357
358            if left_sibling.keys.len() > MAX_KEYS_PER_NODE / 2 {
359                return self.borrow_from_left(parent, idx);
360            }
361        }
362
363        // Try to borrow from right sibling
364        if idx < parent.children.len() - 1 {
365            let right_sibling_id = parent.children[idx + 1];
366            let right_sibling = self.read_node(right_sibling_id)?;
367
368            if right_sibling.keys.len() > MAX_KEYS_PER_NODE / 2 {
369                return self.borrow_from_right(parent, idx);
370            }
371        }
372
373        // Merge with a sibling
374        if idx > 0 {
375            self.merge_children(parent, idx - 1)?;
376        } else {
377            self.merge_children(parent, idx)?;
378        }
379
380        Ok(())
381    }
382
383    fn borrow_from_left(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
384        let child_id = parent.children[idx];
385        let left_sibling_id = parent.children[idx - 1];
386
387        let mut child = self.read_node(child_id)?;
388        let mut left_sibling = self.read_node(left_sibling_id)?;
389
390        // Move key from parent to child
391        child.keys.insert(0, parent.keys[idx - 1].clone());
392        if child.is_leaf {
393            child.values.insert(0, left_sibling.values.pop().unwrap());
394        } else {
395            child.children.insert(0, left_sibling.children.pop().unwrap());
396            child.values.insert(0, left_sibling.values.pop().unwrap());
397        }
398
399        // Move key from left sibling to parent
400        parent.keys[idx - 1] = left_sibling.keys.pop().unwrap();
401
402        self.write_node(&child)?;
403        self.write_node(&left_sibling)?;
404        self.write_node(parent)?;
405
406        Ok(())
407    }
408
409    fn borrow_from_right(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
410        let child_id = parent.children[idx];
411        let right_sibling_id = parent.children[idx + 1];
412
413        let mut child = self.read_node(child_id)?;
414        let mut right_sibling = self.read_node(right_sibling_id)?;
415
416        // Move key from parent to child
417        child.keys.push(parent.keys[idx].clone());
418        if child.is_leaf {
419            child.values.push(right_sibling.values.remove(0));
420        } else {
421            child.children.push(right_sibling.children.remove(0));
422            child.values.push(right_sibling.values.remove(0));
423        }
424
425        // Move key from right sibling to parent
426        parent.keys[idx] = right_sibling.keys.remove(0);
427
428        self.write_node(&child)?;
429        self.write_node(&right_sibling)?;
430        self.write_node(parent)?;
431
432        Ok(())
433    }
434
435    fn merge_children(&self, parent: &mut BTreeNode, idx: usize) -> Result<()> {
436        let left_child_id = parent.children[idx];
437        let right_child_id = parent.children[idx + 1];
438
439        let mut left_child = self.read_node(left_child_id)?;
440        let right_child = self.read_node(right_child_id)?;
441
442        // Move key from parent to left child
443        left_child.keys.push(parent.keys.remove(idx));
444        parent.children.remove(idx + 1);
445
446        // Merge keys, values, and children
447        left_child.keys.extend(right_child.keys);
448        left_child.values.extend(right_child.values);
449        if !left_child.is_leaf {
450            left_child.children.extend(right_child.children);
451        }
452
453        self.write_node(&left_child)?;
454        self.write_node(parent)?;
455
456        Ok(())
457    }
458    
459    pub fn scan(&self, start: &[u8], end: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
460        let mut results = Vec::new();
461        self.scan_recursive(self.root_page_id, start, end, &mut results)?;
462        Ok(results)
463    }
464    
465    fn scan_recursive(&self, page_id: PageId, start: &[u8], end: &[u8], results: &mut Vec<(Vec<u8>, Vec<u8>)>) -> Result<()> {
466        let node = self.read_node(page_id)?;
467        
468        if node.is_leaf {
469            for (i, key) in node.keys.iter().enumerate() {
470                if key.as_slice() >= start && key.as_slice() < end {
471                    results.push((key.clone(), node.values[i].clone()));
472                }
473            }
474        } else {
475            for (i, child_id) in node.children.iter().enumerate() {
476                if i < node.keys.len() {
477                    if node.keys[i].as_slice() >= start {
478                        self.scan_recursive(*child_id, start, end, results)?;
479                    }
480                } else {
481                    self.scan_recursive(*child_id, start, end, results)?;
482                }
483            }
484        }
485        
486        Ok(())
487    }
488    
489    pub fn flush(&mut self) -> Result<()> {
490        self.cache.write().clear();
491        self.file_manager.write().flush()
492    }
493}