Skip to main content

reddb_server/storage/btree/
tree.rs

1//! B+ Tree Implementation
2//!
3//! Concurrent B+ tree with MVCC support.
4
5use super::node::{InternalNode, LeafEntry, LeafNode, Node, NodeId, NodeType};
6use super::version::{next_timestamp, ActiveTransaction, Snapshot, Timestamp, TxnId};
7use std::collections::HashMap;
8use std::fmt::Debug;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
11
12fn recover_read_guard<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
13    match lock.read() {
14        Ok(guard) => guard,
15        Err(poisoned) => poisoned.into_inner(),
16    }
17}
18
19fn recover_write_guard<'a, T>(lock: &'a RwLock<T>) -> RwLockWriteGuard<'a, T> {
20    match lock.write() {
21        Ok(guard) => guard,
22        Err(poisoned) => poisoned.into_inner(),
23    }
24}
25
26/// B+ Tree configuration
27#[derive(Debug, Clone)]
28pub struct BTreeConfig {
29    /// Order (maximum children per node)
30    pub order: usize,
31    /// Enable MVCC
32    pub mvcc_enabled: bool,
33    /// GC watermark age (timestamps older than this are eligible for GC)
34    pub gc_watermark_age: Timestamp,
35}
36
37impl BTreeConfig {
38    /// Create default config
39    pub fn new() -> Self {
40        Self {
41            order: 128,
42            mvcc_enabled: true,
43            gc_watermark_age: Timestamp(1000),
44        }
45    }
46
47    /// Set order
48    pub fn with_order(mut self, order: usize) -> Self {
49        self.order = order.max(4); // Minimum order 4
50        self
51    }
52
53    /// Enable/disable MVCC
54    pub fn with_mvcc(mut self, enabled: bool) -> Self {
55        self.mvcc_enabled = enabled;
56        self
57    }
58}
59
60impl Default for BTreeConfig {
61    fn default() -> Self {
62        Self::new()
63    }
64}
65
66/// B+ Tree statistics
67#[derive(Debug, Clone, Default)]
68pub struct BTreeStats {
69    /// Number of entries
70    pub entries: u64,
71    /// Number of nodes
72    pub nodes: u64,
73    /// Number of internal nodes
74    pub internal_nodes: u64,
75    /// Number of leaf nodes
76    pub leaf_nodes: u64,
77    /// Tree height
78    pub height: u32,
79    /// Total versions (MVCC)
80    pub versions: u64,
81    /// Insert count
82    pub inserts: u64,
83    /// Update count
84    pub updates: u64,
85    /// Delete count
86    pub deletes: u64,
87    /// Get count
88    pub gets: u64,
89    /// Range scan count
90    pub range_scans: u64,
91}
92
93/// B+ Tree with MVCC
94pub struct BPlusTree<K, V>
95where
96    K: Clone + Ord + Debug + Send + Sync,
97    V: Clone + Debug + Send + Sync,
98{
99    /// Configuration
100    config: BTreeConfig,
101    /// Root node ID
102    pub(crate) root: RwLock<Option<NodeId>>,
103    /// Node storage
104    nodes: RwLock<HashMap<NodeId, Arc<RwLock<Node<K, V>>>>>,
105    /// First leaf (for full scans)
106    pub(crate) first_leaf: RwLock<Option<NodeId>>,
107    /// Statistics
108    stats: RwLock<BTreeStats>,
109    /// Lock-free hot-path counters — avoids serializing concurrent reads through
110    /// a write lock just to increment a counter.
111    gets_counter: AtomicU64,
112    range_scans_counter: AtomicU64,
113    /// Active transactions
114    active_txns: RwLock<HashMap<TxnId, ActiveTransaction>>,
115    /// Next transaction ID
116    next_txn_id: RwLock<TxnId>,
117}
118
119impl<K, V> BPlusTree<K, V>
120where
121    K: Clone + Ord + Debug + Send + Sync,
122    V: Clone + Debug + Send + Sync,
123{
124    /// Create new empty tree
125    pub fn new(config: BTreeConfig) -> Self {
126        Self {
127            config,
128            root: RwLock::new(None),
129            nodes: RwLock::new(HashMap::new()),
130            first_leaf: RwLock::new(None),
131            stats: RwLock::new(BTreeStats::default()),
132            gets_counter: AtomicU64::new(0),
133            range_scans_counter: AtomicU64::new(0),
134            active_txns: RwLock::new(HashMap::new()),
135            next_txn_id: RwLock::new(TxnId(1)),
136        }
137    }
138
139    /// Create with default config
140    pub fn with_default_config() -> Self {
141        Self::new(BTreeConfig::default())
142    }
143
144    /// Get configuration
145    pub fn config(&self) -> &BTreeConfig {
146        &self.config
147    }
148
149    /// Get statistics
150    pub fn stats(&self) -> BTreeStats {
151        let mut s = recover_read_guard(&self.stats).clone();
152        s.gets = self.gets_counter.load(Ordering::Relaxed);
153        s.range_scans = self.range_scans_counter.load(Ordering::Relaxed);
154        s
155    }
156
157    /// Check if tree is empty
158    pub fn is_empty(&self) -> bool {
159        recover_read_guard(&self.root).is_none()
160    }
161
162    /// Get number of entries
163    pub fn len(&self) -> usize {
164        recover_read_guard(&self.stats).entries as usize
165    }
166
167    // =========================================
168    // Transaction Management
169    // =========================================
170
171    /// Begin a new transaction
172    pub fn begin_transaction(&self) -> TxnId {
173        let mut next_id = recover_write_guard(&self.next_txn_id);
174        let txn_id = *next_id;
175        *next_id += 1;
176
177        // Get list of active transactions
178        let active_txns = recover_read_guard(&self.active_txns);
179        let active_list: Vec<TxnId> = active_txns.keys().copied().collect();
180        drop(active_txns);
181
182        // Create new transaction
183        let txn = ActiveTransaction::new(txn_id, active_list);
184
185        recover_write_guard(&self.active_txns).insert(txn_id, txn);
186
187        txn_id
188    }
189
190    /// Commit a transaction
191    pub fn commit_transaction(&self, txn_id: TxnId) -> bool {
192        let mut active = recover_write_guard(&self.active_txns);
193        if let Some(mut txn) = active.remove(&txn_id) {
194            txn.commit();
195            true
196        } else {
197            false
198        }
199    }
200
201    /// Abort a transaction
202    pub fn abort_transaction(&self, txn_id: TxnId) -> bool {
203        let mut active = recover_write_guard(&self.active_txns);
204        if let Some(mut txn) = active.remove(&txn_id) {
205            txn.abort();
206            true
207        } else {
208            false
209        }
210    }
211
212    /// Get snapshot for transaction
213    pub fn get_snapshot(&self, txn_id: TxnId) -> Option<Snapshot> {
214        let active = recover_read_guard(&self.active_txns);
215        active.get(&txn_id).map(|txn| txn.snapshot.clone())
216    }
217
218    /// Create a read-only snapshot at current time
219    pub fn snapshot(&self) -> Snapshot {
220        Snapshot::new(TxnId::ZERO, next_timestamp())
221    }
222
223    // =========================================
224    // Node Management
225    // =========================================
226
227    /// Get node by ID
228    pub(crate) fn get_node(&self, id: NodeId) -> Option<Arc<RwLock<Node<K, V>>>> {
229        recover_read_guard(&self.nodes).get(&id).cloned()
230    }
231
232    /// Store node
233    fn store_node(&self, node: Node<K, V>) -> NodeId {
234        let id = node.id();
235        let arc = Arc::new(RwLock::new(node));
236        let node_type = recover_read_guard(&arc).node_type();
237        recover_write_guard(&self.nodes).insert(id, Arc::clone(&arc));
238
239        let mut stats = recover_write_guard(&self.stats);
240        stats.nodes += 1;
241        match node_type {
242            NodeType::Internal => stats.internal_nodes += 1,
243            NodeType::Leaf => stats.leaf_nodes += 1,
244        }
245
246        id
247    }
248
249    /// Remove node
250    fn remove_node(&self, id: NodeId) {
251        if let Some(node) = recover_write_guard(&self.nodes).remove(&id) {
252            let mut stats = recover_write_guard(&self.stats);
253            stats.nodes -= 1;
254            match recover_read_guard(&node).node_type() {
255                NodeType::Internal => stats.internal_nodes -= 1,
256                NodeType::Leaf => stats.leaf_nodes -= 1,
257            }
258        }
259    }
260
261    // =========================================
262    // Read Operations
263    // =========================================
264
265    /// Get value for key
266    pub fn get(&self, key: &K, snapshot: &Snapshot) -> Option<V> {
267        self.gets_counter.fetch_add(1, Ordering::Relaxed);
268
269        let root_id = *recover_read_guard(&self.root);
270        let root_id = root_id?;
271
272        self.get_from_node(root_id, key, snapshot)
273    }
274
275    /// Get value starting from node
276    fn get_from_node(&self, node_id: NodeId, key: &K, snapshot: &Snapshot) -> Option<V> {
277        let node = self.get_node(node_id)?;
278        let node = recover_read_guard(&node);
279
280        match &*node {
281            Node::Internal(internal) => {
282                let child_id = internal.get_child(key)?;
283                drop(node);
284                self.get_from_node(child_id, key, snapshot)
285            }
286            Node::Leaf(leaf) => leaf.get(key, snapshot).cloned(),
287        }
288    }
289
290    /// Check if key exists
291    pub fn contains(&self, key: &K, snapshot: &Snapshot) -> bool {
292        self.get(key, snapshot).is_some()
293    }
294
295    /// Get range of values
296    pub fn range(&self, start: Option<&K>, end: Option<&K>, snapshot: &Snapshot) -> Vec<(K, V)> {
297        self.range_scans_counter.fetch_add(1, Ordering::Relaxed);
298
299        let mut results = Vec::new();
300
301        // Find starting leaf
302        let start_leaf_id = if let Some(start_key) = start {
303            self.find_leaf(start_key)
304        } else {
305            *recover_read_guard(&self.first_leaf)
306        };
307
308        let Some(mut leaf_id) = start_leaf_id else {
309            return results;
310        };
311
312        // Traverse leaves
313        loop {
314            let node = match self.get_node(leaf_id) {
315                Some(n) => n,
316                None => break,
317            };
318
319            let node = recover_read_guard(&node);
320            let leaf = match &*node {
321                Node::Leaf(l) => l,
322                _ => break,
323            };
324
325            // Collect entries from this leaf
326            for (key, value) in leaf.range(start, end, snapshot) {
327                // Check end condition
328                if let Some(end_key) = end {
329                    if key >= end_key {
330                        return results;
331                    }
332                }
333                results.push((key.clone(), value.clone()));
334            }
335
336            // Move to next leaf
337            leaf_id = match leaf.next {
338                Some(id) => id,
339                None => break,
340            };
341        }
342
343        results
344    }
345
346    /// Find leaf node for key
347    fn find_leaf(&self, key: &K) -> Option<NodeId> {
348        let root_id = *recover_read_guard(&self.root);
349        let root_id = root_id?;
350
351        self.find_leaf_from_node(root_id, key)
352    }
353
354    /// Find leaf starting from node
355    fn find_leaf_from_node(&self, node_id: NodeId, key: &K) -> Option<NodeId> {
356        let node = self.get_node(node_id)?;
357        let node = recover_read_guard(&node);
358
359        match &*node {
360            Node::Internal(internal) => {
361                let child_id = internal.get_child(key)?;
362                drop(node);
363                self.find_leaf_from_node(child_id, key)
364            }
365            Node::Leaf(_) => Some(node_id),
366        }
367    }
368
369    // =========================================
370    // Write Operations
371    // =========================================
372
373    /// Insert key-value pair
374    pub fn insert(&self, key: K, value: V, txn_id: TxnId) -> bool {
375        let timestamp = next_timestamp();
376        self.insert_with_timestamp(key, value, txn_id, timestamp)
377    }
378
379    /// Insert with explicit timestamp
380    fn insert_with_timestamp(&self, key: K, value: V, txn_id: TxnId, timestamp: Timestamp) -> bool {
381        let mut root_lock = recover_write_guard(&self.root);
382
383        if root_lock.is_none() {
384            // Create first leaf
385            let mut leaf = LeafNode::new();
386            leaf.insert(key, value, txn_id, timestamp);
387            let leaf_id = self.store_node(Node::Leaf(leaf));
388            *root_lock = Some(leaf_id);
389            *recover_write_guard(&self.first_leaf) = Some(leaf_id);
390
391            let mut stats = recover_write_guard(&self.stats);
392            stats.entries += 1;
393            stats.inserts += 1;
394            stats.height = 1;
395
396            return true;
397        }
398
399        let Some(root_id) = *root_lock else {
400            return false;
401        };
402        drop(root_lock);
403
404        // Insert into tree
405        match self.insert_recursive(root_id, key.clone(), value, txn_id, timestamp) {
406            InsertResult::Done(is_new) => {
407                let mut stats = recover_write_guard(&self.stats);
408                if is_new {
409                    stats.entries += 1;
410                    stats.inserts += 1;
411                } else {
412                    stats.updates += 1;
413                }
414                is_new
415            }
416            InsertResult::Split(median, right_id) => {
417                // Root split - create new root
418                let mut new_root = InternalNode::new();
419                new_root.children.push(root_id);
420                new_root.insert(median, right_id);
421
422                let new_root_id = self.store_node(Node::Internal(new_root));
423                *recover_write_guard(&self.root) = Some(new_root_id);
424
425                let mut stats = recover_write_guard(&self.stats);
426                stats.entries += 1;
427                stats.inserts += 1;
428                stats.height += 1;
429
430                true
431            }
432        }
433    }
434
435    /// Recursive insert
436    fn insert_recursive(
437        &self,
438        node_id: NodeId,
439        key: K,
440        value: V,
441        txn_id: TxnId,
442        timestamp: Timestamp,
443    ) -> InsertResult<K> {
444        let Some(node) = self.get_node(node_id) else {
445            return InsertResult::Done(false);
446        };
447        let mut node = recover_write_guard(&node);
448
449        match &mut *node {
450            Node::Internal(internal) => {
451                let child_idx = internal.find_child_index(&key);
452                let child_id = internal.children[child_idx];
453                drop(node);
454
455                match self.insert_recursive(child_id, key, value, txn_id, timestamp) {
456                    InsertResult::Done(is_new) => InsertResult::Done(is_new),
457                    InsertResult::Split(median, right_child) => {
458                        let Some(node) = self.get_node(node_id) else {
459                            return InsertResult::Done(false);
460                        };
461                        let mut node = recover_write_guard(&node);
462                        let internal = match &mut *node {
463                            Node::Internal(internal) => internal,
464                            Node::Leaf(_) => return InsertResult::Done(false),
465                        };
466
467                        internal.insert(median, right_child);
468
469                        if internal.keys.len() >= self.config.order - 1 {
470                            // Need to split
471                            let (new_median, right) = internal.split();
472                            let right_id = self.store_node(Node::Internal(right));
473                            InsertResult::Split(new_median, right_id)
474                        } else {
475                            InsertResult::Done(true)
476                        }
477                    }
478                }
479            }
480            Node::Leaf(leaf) => {
481                let is_new = leaf.insert(key.clone(), value, txn_id, timestamp);
482
483                if leaf.keys.len() >= self.config.order - 1 {
484                    // Need to split
485                    let (median, right) = leaf.split();
486                    let right_id = self.store_node(Node::Leaf(right));
487                    InsertResult::Split(median, right_id)
488                } else {
489                    InsertResult::Done(is_new)
490                }
491            }
492        }
493    }
494
495    /// Delete key
496    pub fn delete(&self, key: &K, txn_id: TxnId) -> bool {
497        let timestamp = next_timestamp();
498        self.delete_with_timestamp(key, txn_id, timestamp)
499    }
500
501    /// Delete with explicit timestamp
502    fn delete_with_timestamp(&self, key: &K, txn_id: TxnId, timestamp: Timestamp) -> bool {
503        let root_id = match *recover_read_guard(&self.root) {
504            Some(id) => id,
505            None => return false,
506        };
507
508        // Find leaf with key
509        let leaf_id = match self.find_leaf(key) {
510            Some(id) => id,
511            None => return false,
512        };
513
514        // Mark as deleted (MVCC soft delete)
515        let Some(node) = self.get_node(leaf_id) else {
516            return false;
517        };
518        let mut node = recover_write_guard(&node);
519
520        if let Node::Leaf(leaf) = &mut *node {
521            if leaf.delete(key, txn_id, timestamp) {
522                recover_write_guard(&self.stats).deletes += 1;
523                return true;
524            }
525        }
526
527        false
528    }
529
530    // =========================================
531    // Utility Operations
532    // =========================================
533
534    pub(crate) fn compact_deleted_entries(&self, watermark: Timestamp) -> usize {
535        let mut kept_entries: Vec<LeafEntry<K, V>> = Vec::new();
536        let mut removed = 0usize;
537
538        let mut leaf_id = *recover_read_guard(&self.first_leaf);
539        while let Some(id) = leaf_id {
540            let node = match self.get_node(id) {
541                Some(node) => node,
542                None => break,
543            };
544            let node = recover_read_guard(&node);
545            if let Node::Leaf(leaf) = &*node {
546                for entry in &leaf.entries {
547                    if Self::entry_purgeable(entry, watermark) {
548                        removed += 1;
549                    } else {
550                        kept_entries.push(entry.clone());
551                    }
552                }
553                leaf_id = leaf.next;
554            } else {
555                break;
556            }
557        }
558
559        if removed == 0 {
560            return 0;
561        }
562
563        self.rebuild_from_entries(kept_entries);
564        removed
565    }
566
567    fn entry_purgeable(entry: &LeafEntry<K, V>, watermark: Timestamp) -> bool {
568        if !entry.versions.is_all_deleted() {
569            return false;
570        }
571
572        entry
573            .versions
574            .head()
575            .map(|version| version.created_at <= watermark)
576            .unwrap_or(false)
577    }
578
579    fn rebuild_from_entries(&self, entries: Vec<LeafEntry<K, V>>) {
580        let mut new_nodes: HashMap<NodeId, Arc<RwLock<Node<K, V>>>> = HashMap::new();
581        let mut leaf_nodes = Vec::new();
582
583        let max_leaf_keys = self.config.order.saturating_sub(1).max(1);
584
585        for chunk in entries.chunks(max_leaf_keys) {
586            let mut leaf = LeafNode::new();
587            for entry in chunk {
588                leaf.keys.push(entry.key.clone());
589                leaf.entries.push(entry.clone());
590            }
591            leaf_nodes.push(leaf);
592        }
593
594        for i in 0..leaf_nodes.len() {
595            if i > 0 {
596                let prev_id = leaf_nodes[i - 1].id;
597                leaf_nodes[i].prev = Some(prev_id);
598            }
599            if i + 1 < leaf_nodes.len() {
600                let next_id = leaf_nodes[i + 1].id;
601                leaf_nodes[i].next = Some(next_id);
602            }
603        }
604
605        let mut current_level: Vec<NodeId> = Vec::new();
606        for leaf in leaf_nodes {
607            let id = leaf.id;
608            new_nodes.insert(id, Arc::new(RwLock::new(Node::Leaf(leaf))));
609            current_level.push(id);
610        }
611
612        let mut height = if current_level.is_empty() { 0 } else { 1 };
613        let max_children = self.config.order.max(2);
614
615        while current_level.len() > 1 {
616            let mut next_level = Vec::new();
617
618            for chunk in current_level.chunks(max_children) {
619                let mut internal = InternalNode::new();
620
621                for (idx, child_id) in chunk.iter().enumerate() {
622                    internal.children.push(*child_id);
623                    if idx > 0 {
624                        let key = Self::node_min_key(&new_nodes, *child_id);
625                        internal.keys.push(key);
626                    }
627                }
628
629                let id = internal.id;
630                new_nodes.insert(id, Arc::new(RwLock::new(Node::Internal(internal))));
631                next_level.push(id);
632            }
633
634            current_level = next_level;
635            height += 1;
636        }
637
638        let root_id = current_level.first().copied();
639
640        *recover_write_guard(&self.root) = root_id;
641        *recover_write_guard(&self.first_leaf) = root_id.and_then(|id| {
642            let node = new_nodes.get(&id)?;
643            let node = recover_read_guard(node);
644            match &*node {
645                Node::Leaf(_) => Some(id),
646                Node::Internal(_) => {
647                    let mut current = id;
648                    loop {
649                        let node = new_nodes.get(&current)?;
650                        let node = recover_read_guard(node);
651                        match &*node {
652                            Node::Leaf(_) => return Some(current),
653                            Node::Internal(internal) => {
654                                if let Some(&child) = internal.children.first() {
655                                    current = child;
656                                } else {
657                                    return None;
658                                }
659                            }
660                        }
661                    }
662                }
663            }
664        });
665
666        *recover_write_guard(&self.nodes) = new_nodes;
667
668        let leaf_count = if let Some(first_leaf) = *recover_read_guard(&self.first_leaf) {
669            let nodes = recover_read_guard(&self.nodes);
670            let mut count = 0u64;
671            let mut leaf_id = Some(first_leaf);
672            while let Some(id) = leaf_id {
673                let node = nodes.get(&id).cloned();
674                if let Some(node) = node {
675                    let node = recover_read_guard(&node);
676                    if let Node::Leaf(leaf) = &*node {
677                        count += 1;
678                        leaf_id = leaf.next;
679                    } else {
680                        break;
681                    }
682                } else {
683                    break;
684                }
685            }
686            count
687        } else {
688            0
689        };
690
691        let mut stats = recover_write_guard(&self.stats);
692        stats.entries = entries.len() as u64;
693        stats.nodes = recover_read_guard(&self.nodes).len() as u64;
694        stats.leaf_nodes = leaf_count;
695        stats.internal_nodes = stats.nodes.saturating_sub(leaf_count);
696        stats.height = height as u32;
697    }
698
699    fn node_min_key(nodes: &HashMap<NodeId, Arc<RwLock<Node<K, V>>>>, node_id: NodeId) -> K {
700        let node = nodes.get(&node_id).expect("node missing");
701        let node = recover_read_guard(node);
702        match &*node {
703            Node::Leaf(leaf) => leaf.keys.first().expect("leaf empty").clone(),
704            Node::Internal(internal) => {
705                let child = *internal.children.first().expect("internal empty");
706                drop(node);
707                Self::node_min_key(nodes, child)
708            }
709        }
710    }
711
712    /// Clear all entries
713    pub fn clear(&self) {
714        *recover_write_guard(&self.root) = None;
715        *recover_write_guard(&self.first_leaf) = None;
716        recover_write_guard(&self.nodes).clear();
717        *recover_write_guard(&self.stats) = BTreeStats::default();
718    }
719
720    /// Get tree height
721    pub fn height(&self) -> u32 {
722        let root_id = match *recover_read_guard(&self.root) {
723            Some(id) => id,
724            None => return 0,
725        };
726
727        self.height_from_node(root_id)
728    }
729
730    /// Calculate height from node
731    fn height_from_node(&self, node_id: NodeId) -> u32 {
732        let node = match self.get_node(node_id) {
733            Some(n) => n,
734            None => return 0,
735        };
736
737        let node = recover_read_guard(&node);
738
739        match &*node {
740            Node::Leaf(_) => 1,
741            Node::Internal(internal) => {
742                if let Some(&first_child) = internal.children.first() {
743                    drop(node);
744                    1 + self.height_from_node(first_child)
745                } else {
746                    1
747                }
748            }
749        }
750    }
751
752    /// Collect all keys (for debugging)
753    pub fn all_keys(&self, snapshot: &Snapshot) -> Vec<K> {
754        self.range(None, None, snapshot)
755            .into_iter()
756            .map(|(k, _)| k)
757            .collect()
758    }
759}
760
761impl<K, V> Default for BPlusTree<K, V>
762where
763    K: Clone + Ord + Debug + Send + Sync,
764    V: Clone + Debug + Send + Sync,
765{
766    fn default() -> Self {
767        Self::with_default_config()
768    }
769}
770
771/// Result of insert operation
772enum InsertResult<K> {
773    /// Insert completed
774    Done(bool),
775    /// Node split occurred (median key, right node ID)
776    Split(K, NodeId),
777}
778
779#[cfg(test)]
780mod tests {
781    use super::*;
782
783    #[test]
784    fn test_empty_tree() {
785        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();
786        assert!(tree.is_empty());
787        assert_eq!(tree.len(), 0);
788        assert_eq!(tree.height(), 0);
789    }
790
791    #[test]
792    fn test_insert_single() {
793        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();
794
795        tree.insert(10, "ten".to_string(), TxnId(1));
796
797        // Snapshot taken AFTER insert to see the data (MVCC semantics)
798        let snapshot = tree.snapshot();
799        assert!(!tree.is_empty());
800        assert_eq!(tree.len(), 1);
801        assert_eq!(tree.get(&10, &snapshot), Some("ten".to_string()));
802    }
803
804    #[test]
805    fn test_insert_multiple() {
806        let tree: BPlusTree<i32, String> = BPlusTree::new(BTreeConfig::new().with_order(4));
807
808        for i in 1..=10 {
809            tree.insert(i, format!("v{}", i), TxnId(1));
810        }
811
812        // Snapshot taken AFTER inserts to see all data (MVCC semantics)
813        let snapshot = tree.snapshot();
814        assert_eq!(tree.len(), 10);
815
816        for i in 1..=10 {
817            assert_eq!(tree.get(&i, &snapshot), Some(format!("v{}", i)));
818        }
819    }
820
821    #[test]
822    fn test_insert_causes_split() {
823        let tree: BPlusTree<i32, String> = BPlusTree::new(BTreeConfig::new().with_order(4));
824
825        // Insert enough to cause splits
826        for i in 1..=20 {
827            tree.insert(i, format!("v{}", i), TxnId(1));
828        }
829
830        // Snapshot taken AFTER inserts to see all data (MVCC semantics)
831        let snapshot = tree.snapshot();
832        assert_eq!(tree.len(), 20);
833        assert!(tree.height() > 1);
834
835        // All values still accessible
836        for i in 1..=20 {
837            assert_eq!(tree.get(&i, &snapshot), Some(format!("v{}", i)));
838        }
839    }
840
841    #[test]
842    fn test_update() {
843        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();
844
845        tree.insert(10, "v1".to_string(), TxnId(1));
846        // Snapshot AFTER first insert
847        let snapshot1 = tree.snapshot();
848        assert_eq!(tree.get(&10, &snapshot1), Some("v1".to_string()));
849
850        tree.insert(10, "v2".to_string(), TxnId(2));
851        // Snapshot AFTER update
852        let snapshot2 = tree.snapshot();
853        assert_eq!(tree.get(&10, &snapshot2), Some("v2".to_string()));
854
855        // Still only one entry
856        assert_eq!(tree.len(), 1);
857    }
858
859    #[test]
860    fn test_delete() {
861        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();
862
863        tree.insert(10, "ten".to_string(), TxnId(1));
864        // Snapshot AFTER insert
865        let snapshot1 = tree.snapshot();
866        assert!(tree.contains(&10, &snapshot1));
867
868        tree.delete(&10, TxnId(2));
869        // Snapshot AFTER delete
870        let snapshot2 = tree.snapshot();
871        assert!(!tree.contains(&10, &snapshot2));
872    }
873
874    #[test]
875    fn test_range() {
876        let tree: BPlusTree<i32, String> = BPlusTree::new(BTreeConfig::new().with_order(4));
877
878        for i in 1..=100 {
879            tree.insert(i, format!("v{}", i), TxnId(1));
880        }
881
882        // Snapshot AFTER inserts (MVCC semantics)
883        let snapshot = tree.snapshot();
884
885        // Range 25..75
886        let results = tree.range(Some(&25), Some(&75), &snapshot);
887        assert_eq!(results.len(), 50); // 25..74 inclusive
888
889        // First and last
890        assert_eq!(results.first().unwrap().0, 25);
891        assert_eq!(results.last().unwrap().0, 74);
892    }
893
894    #[test]
895    fn test_range_full() {
896        let tree: BPlusTree<i32, String> = BPlusTree::new(BTreeConfig::new().with_order(4));
897
898        for i in 1..=10 {
899            tree.insert(i, format!("v{}", i), TxnId(1));
900        }
901
902        // Snapshot AFTER inserts (MVCC semantics)
903        let snapshot = tree.snapshot();
904        let results = tree.range(None, None, &snapshot);
905        assert_eq!(results.len(), 10);
906    }
907
908    #[test]
909    fn test_transactions() {
910        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();
911
912        // Begin transaction
913        let txn1 = tree.begin_transaction();
914        assert!(tree.get_snapshot(txn1).is_some());
915
916        // Insert in transaction
917        tree.insert(10, "ten".to_string(), txn1);
918
919        // Commit
920        assert!(tree.commit_transaction(txn1));
921        assert!(tree.get_snapshot(txn1).is_none());
922    }
923
924    #[test]
925    fn test_abort_transaction() {
926        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();
927
928        let txn1 = tree.begin_transaction();
929        tree.insert(10, "ten".to_string(), txn1);
930
931        // Abort
932        assert!(tree.abort_transaction(txn1));
933        assert!(tree.get_snapshot(txn1).is_none());
934    }
935
936    #[test]
937    fn test_clear() {
938        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();
939
940        for i in 1..=10 {
941            tree.insert(i, format!("v{}", i), TxnId(1));
942        }
943
944        assert!(!tree.is_empty());
945
946        tree.clear();
947        assert!(tree.is_empty());
948        assert_eq!(tree.len(), 0);
949    }
950
951    #[test]
952    fn test_all_keys() {
953        let tree: BPlusTree<i32, String> = BPlusTree::new(BTreeConfig::new().with_order(4));
954
955        for i in [5, 2, 8, 1, 9, 3, 7, 4, 6, 10] {
956            tree.insert(i, format!("v{}", i), TxnId(1));
957        }
958
959        // Snapshot AFTER inserts (MVCC semantics)
960        let snapshot = tree.snapshot();
961        let keys = tree.all_keys(&snapshot);
962        assert_eq!(keys, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
963    }
964
965    #[test]
966    fn test_stats() {
967        let tree: BPlusTree<i32, String> = BPlusTree::new(BTreeConfig::new().with_order(4));
968        let snapshot = tree.snapshot();
969
970        for i in 1..=10 {
971            tree.insert(i, format!("v{}", i), TxnId(1));
972        }
973
974        tree.get(&5, &snapshot);
975        tree.delete(&3, TxnId(2));
976
977        let stats = tree.stats();
978        assert_eq!(stats.inserts, 10);
979        assert_eq!(stats.gets, 1);
980        assert_eq!(stats.deletes, 1);
981    }
982}