// reddb_server/storage/btree/cursor.rs

//! B+ Tree Cursor
//!
//! Iterator for traversing B+ tree entries.
4
5use super::node::{Node, NodeId};
6use super::tree::BPlusTree;
7use super::version::Snapshot;
8use std::fmt::Debug;
9use std::sync::{RwLock, RwLockReadGuard};
10
/// Takes a read guard on `lock`, even if the lock has been poisoned.
///
/// When `read()` reports poisoning, the guard carried inside the poison error
/// is returned instead, so callers always get access to the protected value.
fn recover_read_guard<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
    lock.read()
        .unwrap_or_else(|poison_err| poison_err.into_inner())
}
17
/// Direction a cursor was last set up to travel in.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CursorDirection {
    /// Ascending key order.
    Forward,
    /// Descending key order.
    Backward,
}
26
/// Where a cursor currently stands relative to the entries it iterates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CursorState {
    /// Positioned ahead of the first entry.
    BeforeFirst,
    /// Positioned on an entry that can be read.
    Valid,
    /// Positioned past the last entry.
    AfterLast,
    /// Not usable (invalid or closed).
    Invalid,
}
39
40/// Cursor for iterating over B+ tree entries
41pub struct Cursor<'a, K, V>
42where
43    K: Clone + Ord + Debug + Send + Sync,
44    V: Clone + Debug + Send + Sync,
45{
46    /// Reference to tree
47    tree: &'a BPlusTree<K, V>,
48    /// Snapshot for consistent reads
49    snapshot: Snapshot,
50    /// Current leaf node ID
51    current_leaf: Option<NodeId>,
52    /// Current index in leaf
53    current_index: usize,
54    /// Direction
55    direction: CursorDirection,
56    /// State
57    state: CursorState,
58    /// Cached current key-value
59    current_entry: Option<(K, V)>,
60    /// Pre-fetched visible entries from the current leaf.
61    ///
62    /// Populated once when the cursor lands on a new leaf (via `fill_leaf_cache`).
63    /// Subsequent `next()` calls drain this vec without re-acquiring `nodes.read()`.
64    /// One lock acquisition per leaf instead of one per visible entry.
65    leaf_cache: Vec<(K, V)>,
66    /// Next position to consume from `leaf_cache`.
67    leaf_cache_idx: usize,
68}
69
70impl<'a, K, V> Cursor<'a, K, V>
71where
72    K: Clone + Ord + Debug + Send + Sync,
73    V: Clone + Debug + Send + Sync,
74{
75    /// Create new forward cursor at beginning
76    pub fn new(tree: &'a BPlusTree<K, V>, snapshot: Snapshot) -> Self {
77        Self {
78            tree,
79            snapshot,
80            current_leaf: None,
81            current_index: 0,
82            direction: CursorDirection::Forward,
83            state: CursorState::BeforeFirst,
84            current_entry: None,
85            leaf_cache: Vec::new(),
86            leaf_cache_idx: 0,
87        }
88    }
89
90    /// Create cursor at specific key
91    pub fn at_key(tree: &'a BPlusTree<K, V>, snapshot: Snapshot, key: &K) -> Self {
92        let mut cursor = Self::new(tree, snapshot);
93        cursor.seek(key);
94        cursor
95    }
96
97    /// Create reverse cursor
98    pub fn reverse(tree: &'a BPlusTree<K, V>, snapshot: Snapshot) -> Self {
99        Self {
100            tree,
101            snapshot,
102            current_leaf: None,
103            current_index: 0,
104            direction: CursorDirection::Backward,
105            state: CursorState::AfterLast,
106            current_entry: None,
107            leaf_cache: Vec::new(),
108            leaf_cache_idx: 0,
109        }
110    }
111
112    /// Get current state
113    pub fn state(&self) -> CursorState {
114        self.state
115    }
116
117    /// Check if at valid entry
118    pub fn is_valid(&self) -> bool {
119        self.state == CursorState::Valid
120    }
121
122    /// Get current key
123    pub fn key(&self) -> Option<&K> {
124        self.current_entry.as_ref().map(|(k, _)| k)
125    }
126
127    /// Get current value
128    pub fn value(&self) -> Option<&V> {
129        self.current_entry.as_ref().map(|(_, v)| v)
130    }
131
132    /// Get current key-value pair
133    pub fn entry(&self) -> Option<(&K, &V)> {
134        self.current_entry.as_ref().map(|(k, v)| (k, v))
135    }
136
137    /// Move to first entry
138    pub fn first(&mut self) -> bool {
139        // Find first leaf
140        let first_leaf = match *recover_read_guard(&self.tree.first_leaf) {
141            Some(id) => id,
142            None => {
143                self.state = CursorState::AfterLast;
144                return false;
145            }
146        };
147
148        self.current_leaf = Some(first_leaf);
149        self.current_index = 0;
150        self.direction = CursorDirection::Forward;
151        // Reset cache so fill_leaf_cache() is called for the new leaf
152        self.leaf_cache.clear();
153        self.leaf_cache_idx = 0;
154
155        self.load_current()
156    }
157
158    /// Move to last entry
159    pub fn last(&mut self) -> bool {
160        // Find last leaf by traversing from first
161        let mut leaf_id = match *recover_read_guard(&self.tree.first_leaf) {
162            Some(id) => id,
163            None => {
164                self.state = CursorState::BeforeFirst;
165                return false;
166            }
167        };
168
169        // Walk to last leaf
170        while let Some(node) = self.tree.get_node(leaf_id) {
171            let node = recover_read_guard(&node);
172            if let Node::Leaf(leaf) = &*node {
173                match leaf.next {
174                    Some(next_id) => {
175                        leaf_id = next_id;
176                    }
177                    None => break,
178                }
179            } else {
180                break;
181            }
182        }
183
184        self.current_leaf = Some(leaf_id);
185
186        // Set to last entry in leaf
187        if let Some(node) = self.tree.get_node(leaf_id) {
188            let node = recover_read_guard(&node);
189            if let Node::Leaf(leaf) = &*node {
190                self.current_index = leaf.keys.len().saturating_sub(1);
191            }
192        }
193
194        self.direction = CursorDirection::Backward;
195        // Reset cache — last() positions at a specific index; subsequent prev()
196        // calls use load_current_at() which bypasses the forward cache.
197        self.leaf_cache.clear();
198        self.leaf_cache_idx = 0;
199        self.load_current_at(self.current_index)
200    }
201
202    /// Seek to key (or first key >= key)
203    pub fn seek(&mut self, key: &K) -> bool {
204        // Find leaf containing key
205        let leaf_id = match self.find_leaf(key) {
206            Some(id) => id,
207            None => {
208                self.state = CursorState::AfterLast;
209                return false;
210            }
211        };
212
213        self.current_leaf = Some(leaf_id);
214
215        // Find index in leaf
216        if let Some(node) = self.tree.get_node(leaf_id) {
217            let node = recover_read_guard(&node);
218            if let Node::Leaf(leaf) = &*node {
219                match leaf.keys.binary_search(key) {
220                    Ok(i) => self.current_index = i,
221                    Err(i) => self.current_index = i,
222                }
223            }
224        }
225
226        // Reset cache — fill_leaf_cache respects current_index for seek offset
227        self.leaf_cache.clear();
228        self.leaf_cache_idx = 0;
229
230        self.load_current()
231    }
232
233    /// Move to next entry
234    pub fn next(&mut self) -> bool {
235        match self.state {
236            CursorState::BeforeFirst => self.first(),
237            CursorState::AfterLast | CursorState::Invalid => false,
238            CursorState::Valid => {
239                // Fast path: consume next entry from the pre-fetched leaf cache
240                self.leaf_cache_idx += 1;
241                if self.leaf_cache_idx < self.leaf_cache.len() {
242                    let entry = self.leaf_cache[self.leaf_cache_idx].clone();
243                    self.current_entry = Some(entry);
244                    return true;
245                }
246                // Cache exhausted — move to next leaf
247                self.move_to_next_leaf()
248            }
249        }
250    }
251
252    /// Move to previous entry
253    pub fn prev(&mut self) -> bool {
254        match self.state {
255            CursorState::AfterLast => self.last(),
256            CursorState::BeforeFirst | CursorState::Invalid => false,
257            CursorState::Valid => {
258                if self.current_index == 0 {
259                    // Move to previous leaf
260                    self.move_to_prev_leaf()
261                } else {
262                    self.current_index -= 1;
263                    // Backward traversal bypasses the forward cache — load directly
264                    // from the node at current_index.
265                    self.leaf_cache.clear();
266                    self.leaf_cache_idx = 0;
267                    self.load_current_at(self.current_index)
268                }
269            }
270        }
271    }
272
273    /// Load a single visible entry at `index` in `current_leaf`, without using
274    /// or populating `leaf_cache`.  Used by backward traversal where the cache
275    /// (filled in forward order from seek/first position) doesn't cover earlier
276    /// positions.
277    fn load_current_at(&mut self, index: usize) -> bool {
278        let leaf_id = match self.current_leaf {
279            Some(id) => id,
280            None => {
281                self.state = CursorState::Invalid;
282                self.current_entry = None;
283                return false;
284            }
285        };
286
287        if let Some(node) = self.tree.get_node(leaf_id) {
288            let node = recover_read_guard(&node);
289            if let Node::Leaf(leaf) = &*node {
290                if index < leaf.keys.len() {
291                    if let Some(value) = leaf.entries[index].get(&self.snapshot) {
292                        self.current_entry = Some((leaf.keys[index].clone(), value.clone()));
293                        self.state = CursorState::Valid;
294                        return true;
295                    }
296                }
297            }
298        }
299
300        self.state = CursorState::BeforeFirst;
301        self.current_entry = None;
302        false
303    }
304
305    /// Find leaf for key
306    fn find_leaf(&self, key: &K) -> Option<NodeId> {
307        let root_id = (*recover_read_guard(&self.tree.root))?;
308        self.find_leaf_from(root_id, key)
309    }
310
311    fn find_leaf_from(&self, node_id: NodeId, key: &K) -> Option<NodeId> {
312        let node = self.tree.get_node(node_id)?;
313        let node = recover_read_guard(&node);
314
315        match &*node {
316            Node::Internal(internal) => {
317                let child_id = internal.get_child(key)?;
318                drop(node);
319                self.find_leaf_from(child_id, key)
320            }
321            Node::Leaf(_) => Some(node_id),
322        }
323    }
324
325    /// Check if current index is within bounds
326    fn check_bounds(&self) -> bool {
327        if let Some(leaf_id) = self.current_leaf {
328            if let Some(node) = self.tree.get_node(leaf_id) {
329                let node = recover_read_guard(&node);
330                if let Node::Leaf(leaf) = &*node {
331                    return self.current_index < leaf.keys.len();
332                }
333            }
334        }
335        false
336    }
337
338    /// Populate `leaf_cache` with all visible entries from `current_leaf`.
339    ///
340    /// Acquires `nodes.read()` exactly once for the entire leaf, then releases
341    /// the lock. Subsequent `next()` calls drain the cache without re-locking.
342    /// Returns true if at least one visible entry was found.
343    fn fill_leaf_cache(&mut self) -> bool {
344        self.leaf_cache.clear();
345        self.leaf_cache_idx = 0;
346
347        let leaf_id = match self.current_leaf {
348            Some(id) => id,
349            None => return false,
350        };
351
352        if let Some(node) = self.tree.get_node(leaf_id) {
353            let node = recover_read_guard(&node);
354            if let Node::Leaf(leaf) = &*node {
355                // Skip entries before current_index (used by seek())
356                for i in self.current_index..leaf.keys.len() {
357                    if let Some(value) = leaf.entries[i].get(&self.snapshot) {
358                        self.leaf_cache.push((leaf.keys[i].clone(), value.clone()));
359                    }
360                }
361            }
362        }
363
364        !self.leaf_cache.is_empty()
365    }
366
367    /// Load current entry from leaf — uses `leaf_cache` when available,
368    /// falling back to `fill_leaf_cache()` on first entry of a new leaf.
369    fn load_current(&mut self) -> bool {
370        // If cache is warm and has entries, serve directly (no lock needed)
371        if self.leaf_cache_idx < self.leaf_cache.len() {
372            let entry = self.leaf_cache[self.leaf_cache_idx].clone();
373            self.current_entry = Some(entry);
374            self.state = CursorState::Valid;
375            return true;
376        }
377
378        // Cache empty — fill from current leaf (one lock acquisition)
379        if self.fill_leaf_cache() {
380            let entry = self.leaf_cache[0].clone();
381            self.current_entry = Some(entry);
382            self.state = CursorState::Valid;
383            true
384        } else {
385            // No visible entries in this leaf — move to next
386            self.move_to_next_leaf()
387        }
388    }
389
390    /// Move to next leaf
391    fn move_to_next_leaf(&mut self) -> bool {
392        let leaf_id = match self.current_leaf {
393            Some(id) => id,
394            None => {
395                self.state = CursorState::AfterLast;
396                return false;
397            }
398        };
399
400        // Read next sibling pointer — single lock acquisition, also resets cache
401        let next_leaf = if let Some(node) = self.tree.get_node(leaf_id) {
402            let node = recover_read_guard(&node);
403            if let Node::Leaf(leaf) = &*node {
404                leaf.next
405            } else {
406                None
407            }
408        } else {
409            None
410        };
411
412        match next_leaf {
413            Some(next_id) => {
414                self.current_leaf = Some(next_id);
415                self.current_index = 0;
416                // Clear cache — fill_leaf_cache() will populate it from the new leaf
417                self.leaf_cache.clear();
418                self.leaf_cache_idx = 0;
419                self.load_current()
420            }
421            None => {
422                self.state = CursorState::AfterLast;
423                self.current_entry = None;
424                false
425            }
426        }
427    }
428
429    /// Move to previous leaf
430    fn move_to_prev_leaf(&mut self) -> bool {
431        let leaf_id = match self.current_leaf {
432            Some(id) => id,
433            None => {
434                self.state = CursorState::BeforeFirst;
435                return false;
436            }
437        };
438
439        let prev_leaf = if let Some(node) = self.tree.get_node(leaf_id) {
440            let node = recover_read_guard(&node);
441            if let Node::Leaf(leaf) = &*node {
442                leaf.prev
443            } else {
444                None
445            }
446        } else {
447            None
448        };
449
450        match prev_leaf {
451            Some(prev_id) => {
452                self.current_leaf = Some(prev_id);
453                // Set to last entry in previous leaf
454                if let Some(node) = self.tree.get_node(prev_id) {
455                    let node = recover_read_guard(&node);
456                    if let Node::Leaf(leaf) = &*node {
457                        self.current_index = leaf.keys.len().saturating_sub(1);
458                    }
459                }
460                self.leaf_cache.clear();
461                self.leaf_cache_idx = 0;
462                self.load_current_at(self.current_index)
463            }
464            None => {
465                self.state = CursorState::BeforeFirst;
466                self.current_entry = None;
467                false
468            }
469        }
470    }
471}
472
473impl<'a, K, V> Iterator for Cursor<'a, K, V>
474where
475    K: Clone + Ord + Debug + Send + Sync,
476    V: Clone + Debug + Send + Sync,
477{
478    type Item = (K, V);
479
480    fn next(&mut self) -> Option<Self::Item> {
481        if self.state == CursorState::BeforeFirst {
482            if !self.first() {
483                return None;
484            }
485            self.current_entry.clone()
486        } else if self.next() {
487            self.current_entry.clone()
488        } else {
489            None
490        }
491    }
492}
493
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::btree::{BPlusTree, BTreeConfig};

    /// Builds an order-4 tree holding keys 1..=10 mapped to "v1".."v10",
    /// all inserted under transaction id 1.
    fn create_test_tree() -> BPlusTree<i32, String> {
        use crate::storage::primitives::ids::TxnId;
        let tree = BPlusTree::new(BTreeConfig::new().with_order(4));
        for i in 1..=10 {
            tree.insert(i, format!("v{}", i), TxnId(1));
        }
        tree
    }

    #[test]
    fn test_cursor_forward() {
        let tree = create_test_tree();
        let snapshot = tree.snapshot();
        let cursor = Cursor::new(&tree, snapshot);

        let results: Vec<_> = cursor.collect();
        assert_eq!(results.len(), 10);
        assert_eq!(results[0], (1, "v1".to_string()));
        assert_eq!(results[9], (10, "v10".to_string()));
    }

    #[test]
    fn test_cursor_first_last() {
        let tree = create_test_tree();
        let snapshot = tree.snapshot();
        let mut cursor = Cursor::new(&tree, snapshot);

        // Call the inherent method explicitly so it cannot be confused with
        // an `Iterator` adaptor of a similar name.
        assert!(Cursor::first(&mut cursor));
        assert_eq!(cursor.key(), Some(&1));

        // Same for `last`, which `Iterator` also defines.
        let mut cursor2 = Cursor::new(&tree, tree.snapshot());
        assert!(Cursor::last(&mut cursor2));
        assert_eq!(cursor2.key(), Some(&10));
    }

    #[test]
    fn test_cursor_seek() {
        let tree = create_test_tree();
        let snapshot = tree.snapshot();
        let mut cursor = Cursor::new(&tree, snapshot);

        // Exact hits on stored keys.
        assert!(cursor.seek(&5));
        assert_eq!(cursor.key(), Some(&5));

        assert!(cursor.seek(&7));
        assert_eq!(cursor.key(), Some(&7));

        // Key that is not stored and sorts below every stored key: the
        // cursor must land on the first key >= it.
        assert!(cursor.seek(&0));
        assert_eq!(cursor.key(), Some(&1));

        // Key that sorts above every stored key: nothing to land on.
        assert!(!cursor.seek(&11));
        assert!(!cursor.is_valid());
        assert_eq!(cursor.key(), None);
    }

    #[test]
    fn test_cursor_prev() {
        let tree = create_test_tree();
        let snapshot = tree.snapshot();
        let mut cursor = Cursor::new(&tree, snapshot);

        // Start on the last entry (inherent method, not `Iterator::last`).
        Cursor::last(&mut cursor);
        assert_eq!(cursor.key(), Some(&10));

        // Step backwards twice.
        cursor.prev();
        assert_eq!(cursor.key(), Some(&9));

        cursor.prev();
        assert_eq!(cursor.key(), Some(&8));
    }

    #[test]
    fn test_cursor_empty_tree() {
        let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();
        let snapshot = tree.snapshot();
        let mut cursor = Cursor::new(&tree, snapshot);

        assert!(!cursor.first());
        assert!(!cursor.is_valid());
    }
}