Skip to main content

reddb_server/storage/btree/
node.rs

//! B+ tree node structures.
//!
//! Defines the internal and leaf node types used by the B+ tree.
4
5use super::version::{Snapshot, Timestamp, TxnId, VersionChain};
6use std::fmt::Debug;
7use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
8
/// Identifier assigned to a B+ tree node.
pub type NodeId = u64;

/// Process-wide counter backing [`next_node_id`]; it starts at 1.
static NODE_ID_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Hands out a fresh node ID.
///
/// The counter is incremented atomically and the value it held *before* the
/// increment is returned, so successive calls yield consecutive IDs.
pub fn next_node_id() -> NodeId {
    const STEP: u64 = 1;
    NODE_ID_COUNTER.fetch_add(STEP, AtomicOrdering::SeqCst)
}
19
/// Discriminates the two kinds of B+ tree node.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum NodeType {
    /// Routing node: keys plus child pointers.
    Internal,
    /// Data node: keys plus values.
    Leaf,
}
28
29/// Generic B+ tree node
30#[derive(Debug)]
31pub enum Node<K, V>
32where
33    K: Clone + Ord + Debug,
34    V: Clone + Debug,
35{
36    /// Internal node
37    Internal(InternalNode<K, V>),
38    /// Leaf node
39    Leaf(LeafNode<K, V>),
40}
41
42impl<K, V> Node<K, V>
43where
44    K: Clone + Ord + Debug,
45    V: Clone + Debug,
46{
47    /// Get node ID
48    pub fn id(&self) -> NodeId {
49        match self {
50            Node::Internal(n) => n.id,
51            Node::Leaf(n) => n.id,
52        }
53    }
54
55    /// Get node type
56    pub fn node_type(&self) -> NodeType {
57        match self {
58            Node::Internal(_) => NodeType::Internal,
59            Node::Leaf(_) => NodeType::Leaf,
60        }
61    }
62
63    /// Check if node is leaf
64    pub fn is_leaf(&self) -> bool {
65        matches!(self, Node::Leaf(_))
66    }
67
68    /// Get number of keys
69    pub fn len(&self) -> usize {
70        match self {
71            Node::Internal(n) => n.keys.len(),
72            Node::Leaf(n) => n.keys.len(),
73        }
74    }
75
76    /// Check if empty
77    pub fn is_empty(&self) -> bool {
78        self.len() == 0
79    }
80
81    /// Check if node is full
82    pub fn is_full(&self, order: usize) -> bool {
83        self.len() >= order - 1
84    }
85
86    /// Check if node has minimum keys
87    pub fn has_min_keys(&self, order: usize) -> bool {
88        let min = (order - 1) / 2;
89        self.len() >= min
90    }
91
92    /// Get minimum key
93    pub fn min_key(&self) -> Option<&K> {
94        match self {
95            Node::Internal(n) => n.keys.first(),
96            Node::Leaf(n) => n.keys.first(),
97        }
98    }
99
100    /// Get maximum key
101    pub fn max_key(&self) -> Option<&K> {
102        match self {
103            Node::Internal(n) => n.keys.last(),
104            Node::Leaf(n) => n.keys.last(),
105        }
106    }
107
108    /// Get as internal node
109    pub fn as_internal(&self) -> Option<&InternalNode<K, V>> {
110        match self {
111            Node::Internal(n) => Some(n),
112            _ => None,
113        }
114    }
115
116    /// Get as internal node mutable
117    pub fn as_internal_mut(&mut self) -> Option<&mut InternalNode<K, V>> {
118        match self {
119            Node::Internal(n) => Some(n),
120            _ => None,
121        }
122    }
123
124    /// Get as leaf node
125    pub fn as_leaf(&self) -> Option<&LeafNode<K, V>> {
126        match self {
127            Node::Leaf(n) => Some(n),
128            _ => None,
129        }
130    }
131
132    /// Get as leaf node mutable
133    pub fn as_leaf_mut(&mut self) -> Option<&mut LeafNode<K, V>> {
134        match self {
135            Node::Leaf(n) => Some(n),
136            _ => None,
137        }
138    }
139}
140
141/// Internal node (keys + child pointers)
142#[derive(Debug)]
143pub struct InternalNode<K, V>
144where
145    K: Clone + Ord + Debug,
146    V: Clone + Debug,
147{
148    /// Node ID
149    pub id: NodeId,
150    /// Keys (separator keys)
151    pub keys: Vec<K>,
152    /// Child node IDs (len = keys.len() + 1)
153    pub children: Vec<NodeId>,
154    /// Phantom data for V
155    _phantom: std::marker::PhantomData<V>,
156}
157
158impl<K, V> InternalNode<K, V>
159where
160    K: Clone + Ord + Debug,
161    V: Clone + Debug,
162{
163    /// Create new internal node
164    pub fn new() -> Self {
165        Self {
166            id: next_node_id(),
167            keys: Vec::new(),
168            children: Vec::new(),
169            _phantom: std::marker::PhantomData,
170        }
171    }
172
173    /// Create with initial child
174    pub fn with_child(child_id: NodeId) -> Self {
175        let mut node = Self::new();
176        node.children.push(child_id);
177        node
178    }
179
180    /// Find child index for key
181    pub fn find_child_index(&self, key: &K) -> usize {
182        match self.keys.binary_search(key) {
183            Ok(i) => i + 1,
184            Err(i) => i,
185        }
186    }
187
188    /// Get child for key
189    pub fn get_child(&self, key: &K) -> Option<NodeId> {
190        let idx = self.find_child_index(key);
191        self.children.get(idx).copied()
192    }
193
194    /// Insert key and child at position
195    pub fn insert_at(&mut self, idx: usize, key: K, right_child: NodeId) {
196        self.keys.insert(idx, key);
197        self.children.insert(idx + 1, right_child);
198    }
199
200    /// Insert key and child in sorted order
201    pub fn insert(&mut self, key: K, right_child: NodeId) {
202        let idx = match self.keys.binary_search(&key) {
203            Ok(i) | Err(i) => i,
204        };
205        self.insert_at(idx, key, right_child);
206    }
207
208    /// Remove key at index
209    pub fn remove_at(&mut self, idx: usize) -> (K, NodeId) {
210        let key = self.keys.remove(idx);
211        let child = self.children.remove(idx + 1);
212        (key, child)
213    }
214
215    /// Split node at middle
216    pub fn split(&mut self) -> (K, Self) {
217        let mid = self.keys.len() / 2;
218
219        // Middle key goes up to parent
220        let median_key = self.keys.remove(mid);
221
222        // Right half becomes new node
223        let right_keys: Vec<K> = self.keys.drain(mid..).collect();
224        let right_children: Vec<NodeId> = self.children.drain(mid + 1..).collect();
225
226        let mut right = Self::new();
227        right.keys = right_keys;
228        right.children = right_children;
229
230        (median_key, right)
231    }
232
233    /// Merge with sibling
234    pub fn merge(&mut self, separator: K, right: &mut Self) {
235        self.keys.push(separator);
236        self.keys.append(&mut right.keys);
237        self.children.append(&mut right.children);
238    }
239
240    /// Borrow from left sibling
241    ///
242    /// # Invariant
243    ///
244    /// Caller must have verified that `left.keys.len() > MIN_KEYS` before
245    /// invoking this function (see rebalance logic in `tree.rs`). Otherwise
246    /// borrowing would violate the B-tree invariant on the sibling.
247    pub fn borrow_from_left(&mut self, left: &mut Self, parent_key: &K) -> K {
248        // Get rightmost key from left sibling
249        let borrowed_key = left
250            .keys
251            .pop()
252            .expect("invariant: borrow_from_left requires left.keys non-empty");
253        let borrowed_child = left
254            .children
255            .pop()
256            .expect("invariant: internal node has children.len() == keys.len() + 1");
257
258        // Parent key comes down
259        self.keys.insert(0, parent_key.clone());
260        self.children.insert(0, borrowed_child);
261
262        // Borrowed key goes up to parent
263        borrowed_key
264    }
265
266    /// Borrow from right sibling
267    pub fn borrow_from_right(&mut self, right: &mut Self, parent_key: &K) -> K {
268        // Get leftmost key from right sibling
269        let borrowed_key = right.keys.remove(0);
270        let borrowed_child = right.children.remove(0);
271
272        // Parent key comes down
273        self.keys.push(parent_key.clone());
274        self.children.push(borrowed_child);
275
276        // Borrowed key goes up to parent
277        borrowed_key
278    }
279}
280
281impl<K, V> Default for InternalNode<K, V>
282where
283    K: Clone + Ord + Debug,
284    V: Clone + Debug,
285{
286    fn default() -> Self {
287        Self::new()
288    }
289}
290
291/// Leaf node entry with MVCC versions
292#[derive(Debug, Clone)]
293pub struct LeafEntry<K, V>
294where
295    K: Clone + Ord + Debug,
296    V: Clone + Debug,
297{
298    /// Key
299    pub key: K,
300    /// Version chain for the value
301    pub versions: VersionChain<V>,
302}
303
304impl<K, V> LeafEntry<K, V>
305where
306    K: Clone + Ord + Debug,
307    V: Clone + Debug,
308{
309    /// Create new entry
310    pub fn new(key: K, value: V, txn_id: TxnId, timestamp: Timestamp) -> Self {
311        Self {
312            key,
313            versions: VersionChain::with_value(value, txn_id, timestamp),
314        }
315    }
316
317    /// Get value for snapshot
318    pub fn get(&self, snapshot: &Snapshot) -> Option<&V> {
319        self.versions.get(snapshot)
320    }
321
322    /// Update value
323    pub fn update(&mut self, value: V, txn_id: TxnId, timestamp: Timestamp) {
324        self.versions.update(value, txn_id, timestamp);
325    }
326
327    /// Delete entry
328    pub fn delete(&mut self, txn_id: TxnId, timestamp: Timestamp) {
329        self.versions.delete(txn_id, timestamp);
330    }
331}
332
333/// Leaf node (keys + versioned values)
334#[derive(Debug)]
335pub struct LeafNode<K, V>
336where
337    K: Clone + Ord + Debug,
338    V: Clone + Debug,
339{
340    /// Node ID
341    pub id: NodeId,
342    /// Keys
343    pub keys: Vec<K>,
344    /// Versioned values (parallel to keys)
345    pub entries: Vec<LeafEntry<K, V>>,
346    /// Next leaf (for range scans)
347    pub next: Option<NodeId>,
348    /// Previous leaf (for reverse scans)
349    pub prev: Option<NodeId>,
350}
351
352impl<K, V> LeafNode<K, V>
353where
354    K: Clone + Ord + Debug,
355    V: Clone + Debug,
356{
357    /// Create new leaf node
358    pub fn new() -> Self {
359        Self {
360            id: next_node_id(),
361            keys: Vec::new(),
362            entries: Vec::new(),
363            next: None,
364            prev: None,
365        }
366    }
367
368    /// Find index for key
369    pub fn find_index(&self, key: &K) -> Result<usize, usize> {
370        self.keys.binary_search(key)
371    }
372
373    /// Get value for key
374    pub fn get(&self, key: &K, snapshot: &Snapshot) -> Option<&V> {
375        match self.find_index(key) {
376            Ok(idx) => self.entries[idx].get(snapshot),
377            Err(_) => None,
378        }
379    }
380
381    /// Check if key exists
382    pub fn contains(&self, key: &K, snapshot: &Snapshot) -> bool {
383        self.get(key, snapshot).is_some()
384    }
385
386    /// Insert key-value pair
387    pub fn insert(&mut self, key: K, value: V, txn_id: TxnId, timestamp: Timestamp) -> bool {
388        match self.find_index(&key) {
389            Ok(idx) => {
390                // Key exists, update
391                self.entries[idx].update(value, txn_id, timestamp);
392                false // No new key added
393            }
394            Err(idx) => {
395                // New key
396                let entry = LeafEntry::new(key.clone(), value, txn_id, timestamp);
397                self.keys.insert(idx, key);
398                self.entries.insert(idx, entry);
399                true // New key added
400            }
401        }
402    }
403
404    /// Delete key
405    pub fn delete(&mut self, key: &K, txn_id: TxnId, timestamp: Timestamp) -> bool {
406        match self.find_index(key) {
407            Ok(idx) => {
408                self.entries[idx].delete(txn_id, timestamp);
409                true
410            }
411            Err(_) => false,
412        }
413    }
414
415    /// Split node at middle
416    pub fn split(&mut self) -> (K, Self) {
417        let mid = self.keys.len() / 2;
418
419        // Right half becomes new node
420        let right_keys: Vec<K> = self.keys.drain(mid..).collect();
421        let right_entries: Vec<LeafEntry<K, V>> = self.entries.drain(mid..).collect();
422
423        let mut right = Self::new();
424        right.keys = right_keys.clone();
425        right.entries = right_entries;
426
427        // First key of right node is the separator
428        let separator = right.keys[0].clone();
429
430        // Link siblings
431        right.next = self.next;
432        right.prev = Some(self.id);
433        self.next = Some(right.id);
434
435        (separator, right)
436    }
437
438    /// Merge with right sibling
439    pub fn merge(&mut self, right: &mut Self) {
440        self.keys.append(&mut right.keys);
441        self.entries.append(&mut right.entries);
442        self.next = right.next;
443    }
444
445    /// Borrow from left sibling
446    ///
447    /// # Invariant
448    ///
449    /// Caller must have verified that `left.keys.len() > MIN_KEYS` before
450    /// invoking this function. In leaf nodes `keys.len() == entries.len()`
451    /// always, so one invariant check covers both pops.
452    pub fn borrow_from_left(&mut self, left: &mut Self) -> K {
453        let borrowed_key = left
454            .keys
455            .pop()
456            .expect("invariant: borrow_from_left requires left.keys non-empty");
457        let borrowed_entry = left
458            .entries
459            .pop()
460            .expect("invariant: leaf node has keys.len() == entries.len()");
461
462        self.keys.insert(0, borrowed_key.clone());
463        self.entries.insert(0, borrowed_entry);
464
465        self.keys[0].clone()
466    }
467
468    /// Borrow from right sibling
469    pub fn borrow_from_right(&mut self, right: &mut Self) -> K {
470        let borrowed_key = right.keys.remove(0);
471        let borrowed_entry = right.entries.remove(0);
472
473        self.keys.push(borrowed_key);
474        self.entries.push(borrowed_entry);
475
476        right.keys[0].clone()
477    }
478
479    /// Get all keys in range
480    pub fn range<'a>(
481        &'a self,
482        start: Option<&K>,
483        end: Option<&K>,
484        snapshot: &'a Snapshot,
485    ) -> impl Iterator<Item = (&'a K, &'a V)> {
486        let start_idx = match start {
487            Some(k) => match self.find_index(k) {
488                Ok(i) => i,
489                Err(i) => i,
490            },
491            None => 0,
492        };
493
494        let end_idx = match end {
495            Some(k) => match self.find_index(k) {
496                Ok(i) => i + 1,
497                Err(i) => i,
498            },
499            None => self.keys.len(),
500        };
501
502        self.keys[start_idx..end_idx]
503            .iter()
504            .zip(self.entries[start_idx..end_idx].iter())
505            .filter_map(move |(key, entry)| entry.get(snapshot).map(|v| (key, v)))
506    }
507
508    /// Garbage collect old versions
509    pub fn gc(&mut self, watermark: Timestamp) -> usize {
510        let mut removed = 0;
511        for entry in &mut self.entries {
512            removed += entry.versions.gc(watermark);
513        }
514        removed
515    }
516}
517
518impl<K, V> Default for LeafNode<K, V>
519where
520    K: Clone + Ord + Debug,
521    V: Clone + Debug,
522{
523    fn default() -> Self {
524        Self::new()
525    }
526}
527
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an internal node whose leftmost child is 100 and whose
    /// separators `10, 20, ..` carry the right children `101, 102, ..`.
    fn internal_with_separators(count: i32) -> InternalNode<i32, String> {
        let mut node = InternalNode::new();
        node.children.push(100);
        for i in 1..=count {
            node.insert(i * 10, 100 + i as u64);
        }
        node
    }

    #[test]
    fn test_internal_node_basic() {
        let mut node: InternalNode<i32, String> = InternalNode::new();
        assert!(node.keys.is_empty());
        assert!(node.children.is_empty());

        // Add a child first
        node.children.push(100);

        // Insert separator keys with children
        node.insert(5, 101);
        node.insert(10, 102);
        node.insert(15, 103);

        assert_eq!(node.keys, vec![5, 10, 15]);
        assert_eq!(node.children, vec![100, 101, 102, 103]);
    }

    #[test]
    fn test_internal_node_find_child() {
        let node = internal_with_separators(3);

        // Key < 10 goes to child 0
        assert_eq!(node.find_child_index(&5), 0);
        // Key = 10 goes to child 1
        assert_eq!(node.find_child_index(&10), 1);
        // Key 15 (10 < x < 20) goes to child 1
        assert_eq!(node.find_child_index(&15), 1);
        // Key equal to the last separator goes to the last child
        assert_eq!(node.find_child_index(&30), 3);
        // Key > 30 goes to child 3
        assert_eq!(node.find_child_index(&35), 3);
    }

    #[test]
    fn test_internal_node_split() {
        let mut node = internal_with_separators(6);

        let (median, right) = node.split();

        // Median goes up and appears in neither half
        assert_eq!(median, 40);
        assert_eq!(node.keys, vec![10, 20, 30]);
        assert_eq!(node.children, vec![100, 101, 102, 103]);
        assert_eq!(right.keys, vec![50, 60]);
        assert_eq!(right.children, vec![104, 105, 106]);
    }

    #[test]
    fn test_internal_node_split_then_merge() {
        let mut node = internal_with_separators(6);

        let (median, mut right) = node.split();
        node.merge(median, &mut right);

        // Merging with the promoted key restores the original node
        assert_eq!(node.keys, vec![10, 20, 30, 40, 50, 60]);
        assert_eq!(node.children, vec![100, 101, 102, 103, 104, 105, 106]);
        assert!(right.keys.is_empty());
        assert!(right.children.is_empty());
    }

    #[test]
    fn test_leaf_node_basic() {
        let mut node: LeafNode<i32, String> = LeafNode::new();
        let snapshot = Snapshot::new(TxnId::ZERO, Timestamp(100));

        assert!(node.keys.is_empty());

        // Insert values
        node.insert(10, "ten".to_string(), TxnId(1), Timestamp(1));
        node.insert(20, "twenty".to_string(), TxnId(1), Timestamp(2));
        node.insert(5, "five".to_string(), TxnId(1), Timestamp(3));

        assert_eq!(node.keys, vec![5, 10, 20]);
        assert_eq!(node.entries.len(), node.keys.len());
        assert_eq!(node.get(&10, &snapshot), Some(&"ten".to_string()));
        assert_eq!(node.get(&15, &snapshot), None);
    }

    #[test]
    fn test_leaf_node_update() {
        let mut node: LeafNode<i32, String> = LeafNode::new();
        let snapshot = Snapshot::new(TxnId::ZERO, Timestamp(100));

        assert!(node.insert(10, "v1".to_string(), TxnId(1), Timestamp(1)));
        assert_eq!(node.get(&10, &snapshot), Some(&"v1".to_string()));

        // Update same key: no new key is added
        assert!(!node.insert(10, "v2".to_string(), TxnId(2), Timestamp(2)));
        assert_eq!(node.get(&10, &snapshot), Some(&"v2".to_string()));

        // Only one key
        assert_eq!(node.keys.len(), 1);
    }

    #[test]
    fn test_leaf_node_delete() {
        let mut node: LeafNode<i32, String> = LeafNode::new();
        let snapshot = Snapshot::new(TxnId::ZERO, Timestamp(100));

        node.insert(10, "ten".to_string(), TxnId(1), Timestamp(1));
        assert!(node.contains(&10, &snapshot));

        assert!(node.delete(&10, TxnId(2), Timestamp(2)));
        assert!(!node.contains(&10, &snapshot));

        // Deleting an unknown key reports that nothing was found
        assert!(!node.delete(&99, TxnId(2), Timestamp(2)));
    }

    #[test]
    fn test_leaf_node_split() {
        let mut node: LeafNode<i32, String> = LeafNode::new();

        for i in 1..=6 {
            node.insert(i * 10, format!("v{}", i), TxnId(1), Timestamp(i as u64));
        }

        let (separator, right) = node.split();

        // Keys are split in half and entries stay aligned with them
        assert_eq!(node.keys, vec![10, 20, 30]);
        assert_eq!(right.keys, vec![40, 50, 60]);
        assert_eq!(node.entries.len(), node.keys.len());
        assert_eq!(right.entries.len(), right.keys.len());

        // Separator is first key of right
        assert_eq!(separator, right.keys[0]);

        // Siblings are linked
        assert_eq!(node.next, Some(right.id));
        assert_eq!(right.prev, Some(node.id));
        assert_eq!(right.next, None);
    }

    #[test]
    fn test_leaf_node_range() {
        let mut node: LeafNode<i32, String> = LeafNode::new();
        let snapshot = Snapshot::new(TxnId::ZERO, Timestamp(100));

        for i in 1..=10 {
            node.insert(i * 10, format!("v{}", i), TxnId(1), Timestamp(i as u64));
        }

        // Range 25..75
        let results: Vec<_> = node.range(Some(&25), Some(&75), &snapshot).collect();
        assert_eq!(results.len(), 5); // 30, 40, 50, 60, 70

        let keys: Vec<i32> = results.iter().map(|(k, _)| **k).collect();
        assert_eq!(keys, vec![30, 40, 50, 60, 70]);
    }

    #[test]
    fn test_node_enum() {
        let leaf: Node<i32, String> = Node::Leaf(LeafNode::new());
        assert!(leaf.is_leaf());
        assert_eq!(leaf.node_type(), NodeType::Leaf);
        assert!(leaf.as_leaf().is_some());
        assert!(leaf.as_internal().is_none());

        let internal: Node<i32, String> = Node::Internal(InternalNode::new());
        assert!(!internal.is_leaf());
        assert_eq!(internal.node_type(), NodeType::Internal);
        assert!(internal.as_internal().is_some());
        assert!(internal.as_leaf().is_none());
    }
}