reddb_server/storage/btree/
cursor.rs1use super::node::{Node, NodeId};
6use super::tree::BPlusTree;
7use super::version::Snapshot;
8use std::fmt::Debug;
9use std::sync::{RwLock, RwLockReadGuard};
10
11fn recover_read_guard<'a, T>(lock: &'a RwLock<T>) -> RwLockReadGuard<'a, T> {
12 match lock.read() {
13 Ok(guard) => guard,
14 Err(poisoned) => poisoned.into_inner(),
15 }
16}
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20pub enum CursorDirection {
21 Forward,
23 Backward,
25}
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq)]
29pub enum CursorState {
30 BeforeFirst,
32 Valid,
34 AfterLast,
36 Invalid,
38}
39
40pub struct Cursor<'a, K, V>
42where
43 K: Clone + Ord + Debug + Send + Sync,
44 V: Clone + Debug + Send + Sync,
45{
46 tree: &'a BPlusTree<K, V>,
48 snapshot: Snapshot,
50 current_leaf: Option<NodeId>,
52 current_index: usize,
54 direction: CursorDirection,
56 state: CursorState,
58 current_entry: Option<(K, V)>,
60 leaf_cache: Vec<(K, V)>,
66 leaf_cache_idx: usize,
68}
69
70impl<'a, K, V> Cursor<'a, K, V>
71where
72 K: Clone + Ord + Debug + Send + Sync,
73 V: Clone + Debug + Send + Sync,
74{
75 pub fn new(tree: &'a BPlusTree<K, V>, snapshot: Snapshot) -> Self {
77 Self {
78 tree,
79 snapshot,
80 current_leaf: None,
81 current_index: 0,
82 direction: CursorDirection::Forward,
83 state: CursorState::BeforeFirst,
84 current_entry: None,
85 leaf_cache: Vec::new(),
86 leaf_cache_idx: 0,
87 }
88 }
89
90 pub fn at_key(tree: &'a BPlusTree<K, V>, snapshot: Snapshot, key: &K) -> Self {
92 let mut cursor = Self::new(tree, snapshot);
93 cursor.seek(key);
94 cursor
95 }
96
97 pub fn reverse(tree: &'a BPlusTree<K, V>, snapshot: Snapshot) -> Self {
99 Self {
100 tree,
101 snapshot,
102 current_leaf: None,
103 current_index: 0,
104 direction: CursorDirection::Backward,
105 state: CursorState::AfterLast,
106 current_entry: None,
107 leaf_cache: Vec::new(),
108 leaf_cache_idx: 0,
109 }
110 }
111
112 pub fn state(&self) -> CursorState {
114 self.state
115 }
116
117 pub fn is_valid(&self) -> bool {
119 self.state == CursorState::Valid
120 }
121
122 pub fn key(&self) -> Option<&K> {
124 self.current_entry.as_ref().map(|(k, _)| k)
125 }
126
127 pub fn value(&self) -> Option<&V> {
129 self.current_entry.as_ref().map(|(_, v)| v)
130 }
131
132 pub fn entry(&self) -> Option<(&K, &V)> {
134 self.current_entry.as_ref().map(|(k, v)| (k, v))
135 }
136
137 pub fn first(&mut self) -> bool {
139 let first_leaf = match *recover_read_guard(&self.tree.first_leaf) {
141 Some(id) => id,
142 None => {
143 self.state = CursorState::AfterLast;
144 return false;
145 }
146 };
147
148 self.current_leaf = Some(first_leaf);
149 self.current_index = 0;
150 self.direction = CursorDirection::Forward;
151 self.leaf_cache.clear();
153 self.leaf_cache_idx = 0;
154
155 self.load_current()
156 }
157
158 pub fn last(&mut self) -> bool {
160 let mut leaf_id = match *recover_read_guard(&self.tree.first_leaf) {
162 Some(id) => id,
163 None => {
164 self.state = CursorState::BeforeFirst;
165 return false;
166 }
167 };
168
169 while let Some(node) = self.tree.get_node(leaf_id) {
171 let node = recover_read_guard(&node);
172 if let Node::Leaf(leaf) = &*node {
173 match leaf.next {
174 Some(next_id) => {
175 leaf_id = next_id;
176 }
177 None => break,
178 }
179 } else {
180 break;
181 }
182 }
183
184 self.current_leaf = Some(leaf_id);
185
186 if let Some(node) = self.tree.get_node(leaf_id) {
188 let node = recover_read_guard(&node);
189 if let Node::Leaf(leaf) = &*node {
190 self.current_index = leaf.keys.len().saturating_sub(1);
191 }
192 }
193
194 self.direction = CursorDirection::Backward;
195 self.leaf_cache.clear();
198 self.leaf_cache_idx = 0;
199 self.load_current_at(self.current_index)
200 }
201
202 pub fn seek(&mut self, key: &K) -> bool {
204 let leaf_id = match self.find_leaf(key) {
206 Some(id) => id,
207 None => {
208 self.state = CursorState::AfterLast;
209 return false;
210 }
211 };
212
213 self.current_leaf = Some(leaf_id);
214
215 if let Some(node) = self.tree.get_node(leaf_id) {
217 let node = recover_read_guard(&node);
218 if let Node::Leaf(leaf) = &*node {
219 match leaf.keys.binary_search(key) {
220 Ok(i) => self.current_index = i,
221 Err(i) => self.current_index = i,
222 }
223 }
224 }
225
226 self.leaf_cache.clear();
228 self.leaf_cache_idx = 0;
229
230 self.load_current()
231 }
232
233 pub fn next(&mut self) -> bool {
235 match self.state {
236 CursorState::BeforeFirst => self.first(),
237 CursorState::AfterLast | CursorState::Invalid => false,
238 CursorState::Valid => {
239 self.leaf_cache_idx += 1;
241 if self.leaf_cache_idx < self.leaf_cache.len() {
242 let entry = self.leaf_cache[self.leaf_cache_idx].clone();
243 self.current_entry = Some(entry);
244 return true;
245 }
246 self.move_to_next_leaf()
248 }
249 }
250 }
251
252 pub fn prev(&mut self) -> bool {
254 match self.state {
255 CursorState::AfterLast => self.last(),
256 CursorState::BeforeFirst | CursorState::Invalid => false,
257 CursorState::Valid => {
258 if self.current_index == 0 {
259 self.move_to_prev_leaf()
261 } else {
262 self.current_index -= 1;
263 self.leaf_cache.clear();
266 self.leaf_cache_idx = 0;
267 self.load_current_at(self.current_index)
268 }
269 }
270 }
271 }
272
273 fn load_current_at(&mut self, index: usize) -> bool {
278 let leaf_id = match self.current_leaf {
279 Some(id) => id,
280 None => {
281 self.state = CursorState::Invalid;
282 self.current_entry = None;
283 return false;
284 }
285 };
286
287 if let Some(node) = self.tree.get_node(leaf_id) {
288 let node = recover_read_guard(&node);
289 if let Node::Leaf(leaf) = &*node {
290 if index < leaf.keys.len() {
291 if let Some(value) = leaf.entries[index].get(&self.snapshot) {
292 self.current_entry = Some((leaf.keys[index].clone(), value.clone()));
293 self.state = CursorState::Valid;
294 return true;
295 }
296 }
297 }
298 }
299
300 self.state = CursorState::BeforeFirst;
301 self.current_entry = None;
302 false
303 }
304
305 fn find_leaf(&self, key: &K) -> Option<NodeId> {
307 let root_id = (*recover_read_guard(&self.tree.root))?;
308 self.find_leaf_from(root_id, key)
309 }
310
311 fn find_leaf_from(&self, node_id: NodeId, key: &K) -> Option<NodeId> {
312 let node = self.tree.get_node(node_id)?;
313 let node = recover_read_guard(&node);
314
315 match &*node {
316 Node::Internal(internal) => {
317 let child_id = internal.get_child(key)?;
318 drop(node);
319 self.find_leaf_from(child_id, key)
320 }
321 Node::Leaf(_) => Some(node_id),
322 }
323 }
324
325 fn check_bounds(&self) -> bool {
327 if let Some(leaf_id) = self.current_leaf {
328 if let Some(node) = self.tree.get_node(leaf_id) {
329 let node = recover_read_guard(&node);
330 if let Node::Leaf(leaf) = &*node {
331 return self.current_index < leaf.keys.len();
332 }
333 }
334 }
335 false
336 }
337
338 fn fill_leaf_cache(&mut self) -> bool {
344 self.leaf_cache.clear();
345 self.leaf_cache_idx = 0;
346
347 let leaf_id = match self.current_leaf {
348 Some(id) => id,
349 None => return false,
350 };
351
352 if let Some(node) = self.tree.get_node(leaf_id) {
353 let node = recover_read_guard(&node);
354 if let Node::Leaf(leaf) = &*node {
355 for i in self.current_index..leaf.keys.len() {
357 if let Some(value) = leaf.entries[i].get(&self.snapshot) {
358 self.leaf_cache.push((leaf.keys[i].clone(), value.clone()));
359 }
360 }
361 }
362 }
363
364 !self.leaf_cache.is_empty()
365 }
366
367 fn load_current(&mut self) -> bool {
370 if self.leaf_cache_idx < self.leaf_cache.len() {
372 let entry = self.leaf_cache[self.leaf_cache_idx].clone();
373 self.current_entry = Some(entry);
374 self.state = CursorState::Valid;
375 return true;
376 }
377
378 if self.fill_leaf_cache() {
380 let entry = self.leaf_cache[0].clone();
381 self.current_entry = Some(entry);
382 self.state = CursorState::Valid;
383 true
384 } else {
385 self.move_to_next_leaf()
387 }
388 }
389
390 fn move_to_next_leaf(&mut self) -> bool {
392 let leaf_id = match self.current_leaf {
393 Some(id) => id,
394 None => {
395 self.state = CursorState::AfterLast;
396 return false;
397 }
398 };
399
400 let next_leaf = if let Some(node) = self.tree.get_node(leaf_id) {
402 let node = recover_read_guard(&node);
403 if let Node::Leaf(leaf) = &*node {
404 leaf.next
405 } else {
406 None
407 }
408 } else {
409 None
410 };
411
412 match next_leaf {
413 Some(next_id) => {
414 self.current_leaf = Some(next_id);
415 self.current_index = 0;
416 self.leaf_cache.clear();
418 self.leaf_cache_idx = 0;
419 self.load_current()
420 }
421 None => {
422 self.state = CursorState::AfterLast;
423 self.current_entry = None;
424 false
425 }
426 }
427 }
428
429 fn move_to_prev_leaf(&mut self) -> bool {
431 let leaf_id = match self.current_leaf {
432 Some(id) => id,
433 None => {
434 self.state = CursorState::BeforeFirst;
435 return false;
436 }
437 };
438
439 let prev_leaf = if let Some(node) = self.tree.get_node(leaf_id) {
440 let node = recover_read_guard(&node);
441 if let Node::Leaf(leaf) = &*node {
442 leaf.prev
443 } else {
444 None
445 }
446 } else {
447 None
448 };
449
450 match prev_leaf {
451 Some(prev_id) => {
452 self.current_leaf = Some(prev_id);
453 if let Some(node) = self.tree.get_node(prev_id) {
455 let node = recover_read_guard(&node);
456 if let Node::Leaf(leaf) = &*node {
457 self.current_index = leaf.keys.len().saturating_sub(1);
458 }
459 }
460 self.leaf_cache.clear();
461 self.leaf_cache_idx = 0;
462 self.load_current_at(self.current_index)
463 }
464 None => {
465 self.state = CursorState::BeforeFirst;
466 self.current_entry = None;
467 false
468 }
469 }
470 }
471}
472
473impl<'a, K, V> Iterator for Cursor<'a, K, V>
474where
475 K: Clone + Ord + Debug + Send + Sync,
476 V: Clone + Debug + Send + Sync,
477{
478 type Item = (K, V);
479
480 fn next(&mut self) -> Option<Self::Item> {
481 if self.state == CursorState::BeforeFirst {
482 if !self.first() {
483 return None;
484 }
485 self.current_entry.clone()
486 } else if self.next() {
487 self.current_entry.clone()
488 } else {
489 None
490 }
491 }
492}
493
494#[cfg(test)]
495mod tests {
496 use super::*;
497 use crate::storage::btree::{BPlusTree, BTreeConfig};
498
499 fn create_test_tree() -> BPlusTree<i32, String> {
500 use crate::storage::primitives::ids::TxnId;
501 let tree = BPlusTree::new(BTreeConfig::new().with_order(4));
502 for i in 1..=10 {
503 tree.insert(i, format!("v{}", i), TxnId(1));
504 }
505 tree
506 }
507
508 #[test]
509 fn test_cursor_forward() {
510 let tree = create_test_tree();
511 let snapshot = tree.snapshot();
512 let cursor = Cursor::new(&tree, snapshot);
513
514 let results: Vec<_> = cursor.collect();
515 assert_eq!(results.len(), 10);
516 assert_eq!(results[0], (1, "v1".to_string()));
517 assert_eq!(results[9], (10, "v10".to_string()));
518 }
519
520 #[test]
521 fn test_cursor_first_last() {
522 let tree = create_test_tree();
523 let snapshot = tree.snapshot();
524 let mut cursor = Cursor::new(&tree, snapshot);
525
526 assert!(Cursor::first(&mut cursor));
528 assert_eq!(cursor.key(), Some(&1));
529
530 let mut cursor2 = Cursor::new(&tree, tree.snapshot());
532 assert!(Cursor::last(&mut cursor2));
533 assert_eq!(cursor2.key(), Some(&10));
534 }
535
536 #[test]
537 fn test_cursor_seek() {
538 let tree = create_test_tree();
539 let snapshot = tree.snapshot();
540 let mut cursor = Cursor::new(&tree, snapshot);
541
542 assert!(cursor.seek(&5));
544 assert_eq!(cursor.key(), Some(&5));
545
546 assert!(cursor.seek(&7));
548 assert_eq!(cursor.key(), Some(&7));
549 }
550
551 #[test]
552 fn test_cursor_prev() {
553 let tree = create_test_tree();
554 let snapshot = tree.snapshot();
555 let mut cursor = Cursor::new(&tree, snapshot);
556
557 Cursor::last(&mut cursor);
559 assert_eq!(cursor.key(), Some(&10));
560
561 cursor.prev();
563 assert_eq!(cursor.key(), Some(&9));
564
565 cursor.prev();
566 assert_eq!(cursor.key(), Some(&8));
567 }
568
569 #[test]
570 fn test_cursor_empty_tree() {
571 let tree: BPlusTree<i32, String> = BPlusTree::with_default_config();
572 let snapshot = tree.snapshot();
573 let mut cursor = Cursor::new(&tree, snapshot);
574
575 assert!(!cursor.first());
576 assert!(!cursor.is_valid());
577 }
578}