1use crate::{
5 check_parent, counter,
6 error::TreeError,
7 mini_page_op::{
8 upgrade_to_full_page, LeafEntrySLocked, LeafEntryXLocked, LeafOperations, MergeResult,
9 },
10 nodes::leaf_node::{GetScanRecordByPosResult, MiniPageNextLevel},
11 storage::PageLocation,
12 utils::{inner_lock::ReadGuard, Backoff},
13 BfTree,
14};
15
/// Failure modes internal to the scan path.
pub(crate) enum ScanError {
    /// The cursor landed on a mini page, which must be merged/promoted before
    /// a range scan can iterate it.
    /// NOTE(review): no constructor of this variant is visible in this chunk —
    /// presumably returned by the leaf's `get_scan_position`; confirm.
    NeedMergeMiniPage,
}
19
/// Selects which record fields `next` copies into the caller's output buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanReturnField {
    /// Copy only the key; the reported value length is 0 (see tests).
    Key,
    /// Copy only the value.
    Value,
    /// Copy the key followed by the value.
    KeyAndValue,
}
26
/// Cursor position inside the currently locked leaf: a record slot index in
/// either a base page or a (cached) full page.
pub(crate) enum ScanPosition {
    /// Slot index within a base page.
    Base(u32),
    /// Slot index within a full page.
    Full(u32),
}
32
33impl ScanPosition {
34 fn move_to_next(&mut self) {
35 match self {
36 ScanPosition::Base(offset) => *offset += 1,
37 ScanPosition::Full(offset) => *offset += 1,
38 }
39 }
40}
41
/// The leaf lock held by a read-only `ScanIter`: shared (S) on the fast path,
/// upgraded to exclusive (X) when the cursor had to promote or merge a page.
enum ScanLock<'b> {
    S(LeafEntrySLocked<'b>),
    X(LeafEntryXLocked<'b>),
}
49
50impl ScanLock<'_> {
51 fn get_record_by_pos_with_bound(
52 &self,
53 pos: &ScanPosition,
54 out_buffer: &mut [u8],
55 return_field: ScanReturnField,
56 end_key: &Option<Vec<u8>>,
57 ) -> GetScanRecordByPosResult {
58 match self {
59 ScanLock::S(leaf) => {
60 leaf.scan_record_by_pos_with_bound(pos, out_buffer, return_field, end_key)
61 }
62 ScanLock::X(leaf) => {
63 leaf.scan_record_by_pos_with_bound(pos, out_buffer, return_field, end_key)
64 }
65 }
66 }
67
68 fn get_right_sibling(&mut self) -> Vec<u8> {
69 match self {
70 ScanLock::S(leaf) => leaf.get_right_sibling(),
71 ScanLock::X(leaf) => leaf.get_right_sibling(),
72 }
73 }
74}
75
/// Range-scan iterator that holds an exclusive (X) lock on the current leaf.
pub struct ScanIterMut<'a, 'b: 'a> {
    tree: &'b BfTree,
    // Remaining record budget; `usize::MAX` when the scan is end-key bounded.
    scan_cnt: usize,

    // Cursor within the currently locked leaf.
    scan_position: ScanPosition,

    // Exclusive lock on the leaf currently being scanned.
    leaf_lock: LeafEntryXLocked<'a>,

    // Which fields `next` copies into the caller's buffer.
    return_field: ScanReturnField,

    // Upper bound; the scan ends when `BoundKeyExceeded` is reported.
    // `None` for count-bounded scans.
    end_key: Option<Vec<u8>>,
}
88
89impl<'b> ScanIterMut<'_, 'b> {
90 pub fn new_with_scan_count(
91 tree: &'b BfTree,
92 start_key: &'b [u8],
93 scan_cnt: usize,
94 return_field: ScanReturnField,
95 ) -> Self {
96 let backoff = Backoff::new();
97 let mut aggressive_split = false;
98
99 loop {
100 let (scan_pos, lock) = match move_cursor_to_leaf_mut(tree, start_key, aggressive_split)
101 {
102 Ok((pos, lock)) => (pos, lock),
103 Err(TreeError::Locked) => {
104 backoff.spin();
105 continue;
106 }
107 Err(TreeError::NeedRestart) => {
108 aggressive_split = true;
109 backoff.spin();
110 continue;
111 }
112 Err(TreeError::CircularBufferFull) => {
113 _ = tree.evict_from_circular_buffer();
114 aggressive_split = true;
115 continue;
116 }
117 };
118
119 return Self {
120 tree,
121 scan_cnt,
122 scan_position: scan_pos,
123 leaf_lock: lock,
124 return_field,
125 end_key: None,
126 };
127 }
128 }
129
130 pub fn new_with_end_key(
131 tree: &'b BfTree,
132 start_key: &'b [u8],
133 end_key: &[u8],
134 return_field: ScanReturnField,
135 ) -> Self {
136 let mut si = Self::new_with_scan_count(tree, start_key, usize::MAX, return_field);
137 si.end_key = Some(end_key.to_vec());
138 si
139 }
140
141 pub fn next(&mut self, out_buffer: &mut [u8]) -> Option<(usize, usize)> {
142 if self.scan_cnt == 0 && self.end_key.is_none() {
143 return None;
144 }
145
146 match self.leaf_lock.scan_record_by_pos_with_bound(
147 &self.scan_position,
148 out_buffer,
149 self.return_field,
150 &self.end_key,
151 ) {
152 GetScanRecordByPosResult::Deleted => {
153 self.scan_position.move_to_next();
154 self.next(out_buffer)
155 }
156 GetScanRecordByPosResult::Found(key_len, value_len) => {
157 self.scan_position.move_to_next();
158 self.scan_cnt -= 1;
159
160 match self.leaf_lock.get_page_location() {
162 PageLocation::Base(_offset) => {
163 self.leaf_lock.load_base_page_mut();
164 }
165 PageLocation::Full(_) => {
166 }
168 PageLocation::Mini(_) => {
169 unreachable!()
170 }
171 PageLocation::Null => panic!("range_scan next on Null page"),
172 }
173 Some((key_len as usize, value_len as usize))
174 }
175 GetScanRecordByPosResult::EndOfLeaf => {
176 let right_sibling = self.leaf_lock.get_right_sibling();
178
179 if right_sibling.is_empty() {
180 self.scan_cnt = 0;
181 return None;
182 }
183
184 let backoff = Backoff::new();
185
186 let mut aggressive_split = false;
187 loop {
188 let (pos, lock) = match move_cursor_to_leaf_mut(
189 self.tree,
190 &right_sibling,
191 aggressive_split,
192 ) {
193 Ok((pos, lock)) => (pos, lock),
194 Err(TreeError::Locked) => {
195 backoff.spin();
196 continue;
197 }
198 Err(TreeError::CircularBufferFull) => {
199 aggressive_split = true;
203 continue;
204 }
205 Err(TreeError::NeedRestart) => {
206 aggressive_split = true;
207 backoff.spin();
208 continue;
209 }
210 };
211 self.scan_position = pos;
212 self.leaf_lock = lock;
213 break;
214 }
215 self.next(out_buffer)
216 }
217 GetScanRecordByPosResult::BoundKeyExceeded => {
218 self.scan_cnt = 0;
219 None
220 }
221 }
222 }
223}
224
/// Range-scan iterator for the read path; holds a shared lock on the current
/// leaf when possible (see `ScanLock`).
pub struct ScanIter<'a, 'b: 'a> {
    tree: &'b BfTree,
    // Remaining record budget; `usize::MAX` when the scan is end-key bounded.
    scan_cnt: usize,

    // Cursor within the currently locked leaf.
    scan_position: ScanPosition,

    // Shared or (after promotion/merge) exclusive lock on the current leaf.
    leaf_lock: ScanLock<'a>,

    // Which fields `next` copies into the caller's buffer.
    return_field: ScanReturnField,

    // Upper bound; the scan ends when `BoundKeyExceeded` is reported.
    // `None` for count-bounded scans.
    end_key: Option<Vec<u8>>,
}
238
239impl<'b> ScanIter<'_, 'b> {
240 pub fn new_with_scan_count(
241 tree: &'b BfTree,
242 start_key: &[u8],
243 scan_cnt: usize,
244 return_field: ScanReturnField,
245 ) -> Self {
246 let backoff = Backoff::new();
247 let mut aggressive_split = false;
248
249 loop {
250 let (scan_pos, lock) = match move_cursor_to_leaf(tree, start_key, aggressive_split) {
251 Ok((pos, lock)) => (pos, lock),
252 Err(TreeError::Locked) => {
253 backoff.spin();
254 continue;
255 }
256 Err(TreeError::NeedRestart) => {
257 aggressive_split = true;
258 backoff.spin();
259 continue;
260 }
261 Err(TreeError::CircularBufferFull) => {
262 _ = tree.evict_from_circular_buffer();
263 aggressive_split = true;
264 continue;
265 }
266 };
267
268 return Self {
269 tree,
270 scan_cnt,
271 scan_position: scan_pos,
272 leaf_lock: lock,
273 return_field,
274 end_key: None,
275 };
276 }
277 }
278
279 pub fn new_with_end_key(
280 tree: &'b BfTree,
281 start_key: &[u8],
282 end_key: &[u8],
283 return_field: ScanReturnField,
284 ) -> Self {
285 let mut si = Self::new_with_scan_count(tree, start_key, usize::MAX, return_field);
286 si.end_key = Some(end_key.to_vec());
287 si
288 }
289
290 pub fn next(&mut self, out_buffer: &mut [u8]) -> Option<(usize, usize)> {
295 if self.scan_cnt == 0 && self.end_key.is_none() {
296 return None;
297 }
298
299 match self.leaf_lock.get_record_by_pos_with_bound(
300 &self.scan_position,
301 out_buffer,
302 self.return_field,
303 &self.end_key,
304 ) {
305 GetScanRecordByPosResult::Deleted => {
306 self.scan_position.move_to_next();
307 self.next(out_buffer)
308 }
309 GetScanRecordByPosResult::Found(key_len, value_len) => {
310 self.scan_position.move_to_next();
311 self.scan_cnt -= 1;
312 Some((key_len as usize, value_len as usize))
313 }
314 GetScanRecordByPosResult::EndOfLeaf => {
315 counter!(ScanGoNextLeaf);
317 let right_sibling = self.leaf_lock.get_right_sibling();
318
319 if right_sibling.is_empty() {
320 self.scan_cnt = 0;
321 return None;
322 }
323
324 let backoff = Backoff::new();
325
326 let mut aggressive_split = false;
327 loop {
328 let (pos, lock) =
329 match move_cursor_to_leaf(self.tree, &right_sibling, aggressive_split) {
330 Ok((pos, lock)) => (pos, lock),
331 Err(TreeError::Locked) => {
332 backoff.spin();
333 continue;
334 }
335 Err(TreeError::CircularBufferFull) => {
336 aggressive_split = true;
342 continue;
343 }
344 Err(TreeError::NeedRestart) => {
345 aggressive_split = true;
346 backoff.spin();
347 continue;
348 }
349 };
350 self.scan_position = pos;
351 self.leaf_lock = lock;
352 break;
353 }
354 self.next(out_buffer)
355 }
356 GetScanRecordByPosResult::BoundKeyExceeded => {
357 self.scan_cnt = 0;
358 None
359 }
360 }
361 }
362}
363
/// Convert the X-locked leaf into a layout a range scan can iterate: promote a
/// base page into a cached full page, or merge a mini page back into its base
/// page (and possibly promote the result).
///
/// Returns the scan position of `key` in the resulting page. `parent` is the
/// read guard on the parent node, consumed by the merge path.
///
/// Errors: propagates failures from page upgrade/dealloc/merge, and returns
/// `NeedRestart` when the merge split the leaf (the caller must re-descend).
fn promote_or_merge_mini_page<'a>(
    tree: &'a BfTree,
    key: &[u8],
    leaf: &mut LeafEntryXLocked<'a>,
    parent: ReadGuard<'a>,
) -> Result<ScanPosition, TreeError> {
    let page_loc = leaf.get_page_location();
    match page_loc {
        PageLocation::Full(_) => {
            // Callers (move_cursor_to_leaf*) return early when the cursor can
            // already be served from a full page, so this arm is unreachable.
            unreachable!()
        }
        PageLocation::Base(offset) => {
            counter!(ScanPromoteBaseToFull);
            let next_level = MiniPageNextLevel::new(*offset);
            let base_page_ref = leaf.load_base_page_from_buffer();
            let pos = base_page_ref.lower_bound(key);

            // Only promote non-empty pages (record count excludes fences); an
            // empty base page is scanned in place.
            if base_page_ref.meta.meta_count_without_fence() > 0 {
                let full_page_loc = upgrade_to_full_page(&tree.storage, base_page_ref, next_level)?;

                leaf.create_cache_page_loc(full_page_loc);

                Ok(ScanPosition::Full(pos as u32))
            } else {
                Ok(ScanPosition::Base(pos as u32))
            }
        }
        PageLocation::Mini(ptr) => {
            counter!(ScanMergeMiniPage);
            let mini_page = leaf.load_cache_page_mut(*ptr);
            // Two-phase dealloc: `begin_dealloc_mini_page` is called before the
            // merge, and `finish_dealloc_mini_page` only after the leaf's
            // location has been switched away from the mini page below.
            let h = tree.storage.begin_dealloc_mini_page(mini_page)?;
            let merge_result = leaf.try_merge_mini_page(&h, parent, &tree.storage)?;

            match merge_result {
                MergeResult::NoSplit => {
                    if tree.should_promote_scan_page() {
                        // Capture the base offset before the location changes.
                        let base_offset = mini_page.next_level;
                        leaf.change_to_base_loc();
                        tree.storage.finish_dealloc_mini_page(h);

                        let base_page_ref = leaf.load_base_page_from_buffer();
                        let pos = base_page_ref.lower_bound(key);
                        // Same promotion rule as the Base arm above: only a
                        // non-empty merged page is upgraded to a full page.
                        if base_page_ref.meta.meta_count_without_fence() > 0 {
                            let full_page_loc =
                                upgrade_to_full_page(&tree.storage, base_page_ref, base_offset)?;

                            leaf.create_cache_page_loc(full_page_loc);
                            Ok(ScanPosition::Full(pos as u32))
                        } else {
                            Ok(ScanPosition::Base(pos as u32))
                        }
                    } else {
                        // Promotion disabled: finish the merge and scan the
                        // base page directly.
                        leaf.change_to_base_loc();
                        tree.storage.finish_dealloc_mini_page(h);
                        let base_ref = leaf.load_base_page_from_buffer();
                        let pos = base_ref.lower_bound(key);
                        Ok(ScanPosition::Base(pos as u32))
                    }
                }
                MergeResult::MergeAndSplit => {
                    // The merge split the leaf, so the computed position would
                    // be stale; finish the dealloc and ask the caller to
                    // re-descend from the root.
                    leaf.change_to_base_loc();
                    tree.storage.finish_dealloc_mini_page(h);

                    Err(TreeError::NeedRestart)
                }
            }
        }
        PageLocation::Null => panic!("promote_or_merge_mini_page on Null page"),
    }
}
446
447fn move_cursor_to_leaf_mut<'a>(
448 tree: &'a BfTree,
449 key: &[u8],
450 aggressive_split: bool,
451) -> Result<(ScanPosition, LeafEntryXLocked<'a>), TreeError> {
452 let (pid, parent) = tree.traverse_to_leaf(key, aggressive_split)?;
453
454 let mut leaf = tree.mapping_table().get_mut(&pid);
455
456 check_parent!(tree, pid, parent);
457
458 if let Ok(pos) = leaf.get_scan_position(key) {
459 match pos {
460 ScanPosition::Base(_) => {
461 if !tree.should_promote_scan_page() {
462 return Ok((pos, leaf));
463 }
464 }
466 ScanPosition::Full(_) => {
467 return Ok((pos, leaf));
468 }
469 }
470 }
471
472 let v = promote_or_merge_mini_page(tree, key, &mut leaf, parent.unwrap())?;
475 Ok((v, leaf))
476}
477
/// Descend to the leaf covering `key` and return a lock on it plus the scan
/// position of `key` inside it.
///
/// Fast path: a base or full page is scanned under the shared (S) lock.
/// Otherwise the lock is upgraded to exclusive and the page is
/// promoted/merged first via `promote_or_merge_mini_page`.
fn move_cursor_to_leaf<'a>(
    tree: &'a BfTree,
    key: &[u8],
    aggressive_split: bool,
) -> Result<(ScanPosition, ScanLock<'a>), TreeError> {
    let (pid, parent) = tree.traverse_to_leaf(key, aggressive_split)?;

    let mut leaf = tree.mapping_table().get(&pid);

    // Validate that `parent` still points at this leaf.
    // NOTE(review): check_parent!'s failure behavior is defined elsewhere —
    // inferred from its name only.
    check_parent!(tree, pid, parent);

    if let Ok(pos) = leaf.get_scan_position(key) {
        match pos {
            ScanPosition::Base(_) => {
                counter!(ScanBasePage);
                // Promotion needs the parent guard and must be enabled; when
                // either is missing, scan the base page under the S lock as-is.
                if parent.is_none() || !tree.should_promote_scan_page() {
                    return Ok((pos, ScanLock::S(leaf)));
                }
            }
            ScanPosition::Full(_) => {
                counter!(ScanFullPage);
                return Ok((pos, ScanLock::S(leaf)));
            }
        }
    }

    // Mini page or promotion requested: upgrade to an exclusive lock. A failed
    // upgrade surfaces as `Locked` so the caller retries with backoff.
    let mut x_leaf = leaf.try_upgrade().map_err(|_e| TreeError::Locked)?;

    let v = promote_or_merge_mini_page(tree, key, &mut x_leaf, parent.unwrap())?;
    Ok((v, ScanLock::X(x_leaf)))
}
511
#[cfg(test)]
mod tests {
    use crate::utils::test_util::install_value_to_buffer;
    use crate::BfTree;
    use crate::{LeafInsertResult, ScanReturnField};
    use std::mem::size_of;

    #[test]
    fn test_scan_with_count() {
        let tree = BfTree::default();

        // Size keys relative to the configured fence length so 1_000 inserts
        // spread across multiple leaves.
        let key_len: usize = tree.config.max_fence_len / 2;
        let mut key_buffer = vec![0; key_len / size_of::<usize>()];

        let value_len: usize = 1024;
        let mut value_buffer = vec![0; value_len / size_of::<usize>()];

        for i in 0..1_000 {
            let key = install_value_to_buffer(&mut key_buffer, i);
            let value = install_value_to_buffer(&mut value_buffer, i);
            if tree.insert(key, value) != LeafInsertResult::Success {
                panic!("Insert failed");
            }
        }

        // A scan count of 0 must be rejected up front.
        let mut start_key = install_value_to_buffer(&mut key_buffer, 0);
        let r = tree.scan_with_count(start_key, 0, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidCount);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        let mut output_buffer = vec![0u8; key_len + value_len];
        let mut prev_key = vec![0u8; key_len];

        // Nine rounds of 101 records each: the start bound is inclusive, so
        // every round after the first re-reads the previous round's last key.
        for _ in 0..9 {
            // Was `&prev_key.as_slice().as_ref()` — a redundant `&&[u8]` that
            // only compiled through deref coercion.
            start_key = prev_key.as_slice();
            let mut scan_iter = tree
                .scan_with_count(start_key, 101, ScanReturnField::KeyAndValue)
                .expect("Scan failed");

            let mut cnt = 0;
            while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
                let scanned_key = &output_buffer[0..kl];
                assert!(kl == key_len);

                // Skip the comparison for the first record of a round: it may
                // equal `prev_key` (inclusive start bound).
                if cnt != 0 {
                    let cmp_res = scanned_key.cmp(&prev_key);
                    if cmp_res == std::cmp::Ordering::Less {
                        panic!("Keys are not in order");
                    }
                    assert_eq!(cmp_res, std::cmp::Ordering::Greater);
                }

                prev_key[..kl].copy_from_slice(scanned_key);

                assert!(vl == value_len);
                cnt += 1;
            }
            assert!(cnt == 101);
        }

        // Final round: only 100 records remain even though 120 were requested,
        // and ScanReturnField::Key reports a value length of 0.
        start_key = prev_key.as_slice();
        let mut scan_iter = tree
            .scan_with_count(start_key, 120, ScanReturnField::Key)
            .expect("Scan failed");
        let mut cnt = 0;

        while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
            let scanned_key = &output_buffer[0..kl];
            assert!(kl == key_len);

            if cnt != 0 {
                let cmp_res = scanned_key.cmp(&prev_key);
                assert_eq!(cmp_res, std::cmp::Ordering::Greater);
            }
            prev_key[..kl].copy_from_slice(scanned_key);
            assert!(vl == 0);
            cnt += 1;
        }
        assert!(cnt == 100);
    }

    #[test]
    fn test_scan_with_end_key() {
        let tree = BfTree::default();

        let key_len: usize = tree.config.max_fence_len / 2;
        let mut key_buffer = vec![0; key_len / size_of::<usize>()];

        let value_len: usize = 1024;
        let mut value_buffer = vec![0; value_len / size_of::<usize>()];

        for i in 0..1_000 {
            let key = install_value_to_buffer(&mut key_buffer, i);
            let value = install_value_to_buffer(&mut value_buffer, i);
            if tree.insert(key, value) != LeafInsertResult::Success {
                panic!("Insert failed");
            }
        }

        // An over-long end key is rejected.
        let mut start_key = install_value_to_buffer(&mut key_buffer, 1);
        let mut invalid_key_buffer: Vec<usize> = vec![0; key_len / size_of::<usize>() + 1];
        let mut invalid_key = install_value_to_buffer(&mut invalid_key_buffer, 1);

        let mut r = tree.scan_with_end_key(start_key, invalid_key, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidEndKey);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        // An over-long start key is rejected.
        invalid_key = install_value_to_buffer(&mut invalid_key_buffer, 0);

        r = tree.scan_with_end_key(invalid_key, start_key, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidStartKey);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        // end < start is rejected.
        let mut end_key_buffer = vec![0; key_len / size_of::<usize>()];
        let mut end_key = install_value_to_buffer(&mut end_key_buffer, 0);

        r = tree.scan_with_end_key(start_key, end_key, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidKeyRange);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        // Valid range: check ordering and the inclusive end bound.
        start_key = install_value_to_buffer(&mut key_buffer, 0);
        end_key = install_value_to_buffer(&mut end_key_buffer, 777);

        let mut scan_iter = tree
            .scan_with_end_key(start_key, end_key, ScanReturnField::Key)
            .expect("Scan failed");
        let mut output_buffer = vec![0u8; key_len];
        let mut prev_key = vec![0u8; key_len];
        let mut cnt = 0;

        while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
            let scanned_key = &output_buffer[0..kl];
            assert!(kl == key_len);

            if cnt != 0 {
                let cmp_res = scanned_key.cmp(&prev_key);
                assert_eq!(cmp_res, std::cmp::Ordering::Greater);
            }
            prev_key[..kl].copy_from_slice(scanned_key);

            // No returned key may exceed the end key (bound is inclusive).
            let cmp_res = scanned_key.cmp(end_key);
            assert!(cmp_res == std::cmp::Ordering::Less || cmp_res == std::cmp::Ordering::Equal);

            assert!(vl == 0);
            cnt += 1;
        }

        // The last key returned is exactly the end key.
        let cmp_res = prev_key.as_slice().cmp(end_key);
        assert!(cmp_res == std::cmp::Ordering::Equal);
        assert!(cnt == 40);
    }
}