1use crate::{
5 check_parent, counter,
6 error::TreeError,
7 mini_page_op::{
8 upgrade_to_full_page, LeafEntrySLocked, LeafEntryXLocked, LeafOperations, MergeResult,
9 },
10 nodes::leaf_node::{GetScanRecordByPosResult, MiniPageNextLevel},
11 storage::PageLocation,
12 utils::{inner_lock::ReadGuard, Backoff},
13 BfTree,
14};
15
16pub(crate) enum ScanError {
17 NeedMergeMiniPage,
18}
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq)]
21pub enum ScanReturnField {
22 Key,
23 Value,
24 KeyAndValue,
25}
26
27pub(crate) enum ScanPosition {
28 Base(u32),
29 Full(u32),
30 Mini(u32), Null, }
33
34impl ScanPosition {
35 fn move_to_next(&mut self) {
36 match self {
37 ScanPosition::Base(offset) => *offset += 1,
38 ScanPosition::Full(offset) => *offset += 1,
39 ScanPosition::Mini(offset) => *offset += 1,
40 ScanPosition::Null => panic!("move_to_next on Null page"),
41 }
42 }
43}
44
45enum ScanLock<'b> {
49 S(LeafEntrySLocked<'b>),
50 X(LeafEntryXLocked<'b>),
51}
52
53impl ScanLock<'_> {
54 fn get_record_by_pos_with_bound(
55 &self,
56 pos: &ScanPosition,
57 out_buffer: &mut [u8],
58 return_field: ScanReturnField,
59 end_key: &Option<Vec<u8>>,
60 ) -> GetScanRecordByPosResult {
61 match self {
62 ScanLock::S(leaf) => {
63 leaf.scan_record_by_pos_with_bound(pos, out_buffer, return_field, end_key)
64 }
65 ScanLock::X(leaf) => {
66 leaf.scan_record_by_pos_with_bound(pos, out_buffer, return_field, end_key)
67 }
68 }
69 }
70
71 fn get_right_sibling(&mut self) -> Vec<u8> {
72 match self {
73 ScanLock::S(leaf) => leaf.get_right_sibling(),
74 ScanLock::X(leaf) => leaf.get_right_sibling(),
75 }
76 }
77}
78
79pub struct ScanIterMut<'a, 'b: 'a> {
80 tree: &'b BfTree,
81 scan_cnt: usize,
82
83 scan_position: ScanPosition,
84
85 leaf_lock: LeafEntryXLocked<'a>,
86
87 return_field: ScanReturnField,
88
89 end_key: Option<Vec<u8>>,
90}
91
92impl<'b> ScanIterMut<'_, 'b> {
93 pub fn new_with_scan_count(
94 tree: &'b BfTree,
95 start_key: &'b [u8],
96 scan_cnt: usize,
97 return_field: ScanReturnField,
98 ) -> Self {
99 let backoff = Backoff::new();
100 let mut aggressive_split = false;
101
102 loop {
103 let (scan_pos, lock) = match move_cursor_to_leaf_mut(tree, start_key, aggressive_split)
104 {
105 Ok((pos, lock)) => (pos, lock),
106 Err(TreeError::Locked) => {
107 backoff.spin();
108 continue;
109 }
110 Err(TreeError::NeedRestart) => {
111 aggressive_split = true;
112 backoff.spin();
113 continue;
114 }
115 Err(TreeError::CircularBufferFull) => {
116 _ = tree.evict_from_circular_buffer();
117 aggressive_split = true;
118 continue;
119 }
120 };
121
122 return Self {
123 tree,
124 scan_cnt,
125 scan_position: scan_pos,
126 leaf_lock: lock,
127 return_field,
128 end_key: None,
129 };
130 }
131 }
132
133 pub fn new_with_end_key(
134 tree: &'b BfTree,
135 start_key: &'b [u8],
136 end_key: &[u8],
137 return_field: ScanReturnField,
138 ) -> Self {
139 let mut si = Self::new_with_scan_count(tree, start_key, usize::MAX, return_field);
140 si.end_key = Some(end_key.to_vec());
141 si
142 }
143
144 pub fn next(&mut self, out_buffer: &mut [u8]) -> Option<(usize, usize)> {
145 if self.scan_cnt == 0 && self.end_key.is_none() {
146 return None;
147 }
148
149 match self.leaf_lock.scan_record_by_pos_with_bound(
150 &self.scan_position,
151 out_buffer,
152 self.return_field,
153 &self.end_key,
154 ) {
155 GetScanRecordByPosResult::Deleted => {
156 self.scan_position.move_to_next();
157 self.next(out_buffer)
158 }
159 GetScanRecordByPosResult::Found(key_len, value_len) => {
160 self.scan_position.move_to_next();
161 self.scan_cnt -= 1;
162
163 match self.leaf_lock.get_page_location() {
165 PageLocation::Base(_offset) => {
166 self.leaf_lock.load_base_page_mut();
167 }
168 PageLocation::Full(_) => {
169 }
171 PageLocation::Mini(_) => {
172 unreachable!()
173 }
174 PageLocation::Null => panic!("range_scan next on Null page"),
175 }
176 Some((key_len as usize, value_len as usize))
177 }
178 GetScanRecordByPosResult::EndOfLeaf => {
179 let right_sibling = self.leaf_lock.get_right_sibling();
181
182 if right_sibling.is_empty() {
183 self.scan_cnt = 0;
184 return None;
185 }
186
187 let backoff = Backoff::new();
188
189 let mut aggressive_split = false;
190 loop {
191 let (pos, lock) = match move_cursor_to_leaf_mut(
192 self.tree,
193 &right_sibling,
194 aggressive_split,
195 ) {
196 Ok((pos, lock)) => (pos, lock),
197 Err(TreeError::Locked) => {
198 backoff.spin();
199 continue;
200 }
201 Err(TreeError::CircularBufferFull) => {
202 aggressive_split = true;
206 continue;
207 }
208 Err(TreeError::NeedRestart) => {
209 aggressive_split = true;
210 backoff.spin();
211 continue;
212 }
213 };
214 self.scan_position = pos;
215 self.leaf_lock = lock;
216 break;
217 }
218 self.next(out_buffer)
219 }
220 GetScanRecordByPosResult::BoundKeyExceeded => {
221 self.scan_cnt = 0;
222 None
223 }
224 }
225 }
226}
227
228pub struct ScanIter<'a, 'b: 'a> {
230 tree: &'b BfTree,
231 scan_cnt: usize,
232
233 scan_position: ScanPosition,
234
235 leaf_lock: ScanLock<'a>,
236
237 return_field: ScanReturnField,
238
239 end_key: Option<Vec<u8>>,
240
241 next_key: Option<Vec<u8>>,
242}
243
244impl<'b> ScanIter<'_, 'b> {
245 pub fn new_with_scan_count(
246 tree: &'b BfTree,
247 start_key: &[u8],
248 scan_cnt: usize,
249 return_field: ScanReturnField,
250 ) -> Self {
251 let backoff = Backoff::new();
252 let mut aggressive_split = false;
253
254 let mut next_key = if tree.cache_only {
255 Some(Vec::new())
256 } else {
257 None
258 };
259
260 loop {
261 let (scan_pos, lock) =
262 match move_cursor_to_leaf(tree, start_key, aggressive_split, next_key.as_mut()) {
263 Ok((pos, lock)) => (pos, lock),
264 Err(TreeError::Locked) => {
265 backoff.spin();
266 continue;
267 }
268 Err(TreeError::NeedRestart) => {
269 aggressive_split = true;
270 backoff.spin();
271 continue;
272 }
273 Err(TreeError::CircularBufferFull) => {
274 _ = tree.evict_from_circular_buffer();
275 aggressive_split = true;
276 continue;
277 }
278 };
279
280 return Self {
281 tree,
282 scan_cnt,
283 scan_position: scan_pos,
284 leaf_lock: lock,
285 return_field,
286 end_key: None,
287 next_key,
288 };
289 }
290 }
291
292 pub fn new_with_end_key(
293 tree: &'b BfTree,
294 start_key: &[u8],
295 end_key: &[u8],
296 return_field: ScanReturnField,
297 ) -> Self {
298 let mut si = Self::new_with_scan_count(tree, start_key, usize::MAX, return_field);
299 si.end_key = Some(end_key.to_vec());
300 si
301 }
302
303 pub fn next(&mut self, out_buffer: &mut [u8]) -> Option<(usize, usize)> {
308 if self.scan_cnt == 0 && self.end_key.is_none() {
309 return None;
310 }
311
312 match self.leaf_lock.get_record_by_pos_with_bound(
313 &self.scan_position,
314 out_buffer,
315 self.return_field,
316 &self.end_key,
317 ) {
318 GetScanRecordByPosResult::Deleted => {
319 self.scan_position.move_to_next();
320 self.next(out_buffer)
321 }
322 GetScanRecordByPosResult::Found(key_len, value_len) => {
323 self.scan_position.move_to_next();
324 self.scan_cnt -= 1;
325 Some((key_len as usize, value_len as usize))
326 }
327 GetScanRecordByPosResult::EndOfLeaf => {
328 counter!(ScanGoNextLeaf);
330 let right_sibling = if !self.tree.cache_only {
331 self.leaf_lock.get_right_sibling()
332 } else {
333 if let Some(key) = self.next_key.as_ref() {
334 key.clone()
335 } else {
336 panic!("next_key is None in cache_only mode");
337 }
338 };
339
340 if right_sibling.is_empty() {
341 self.scan_cnt = 0;
342 return None;
343 }
344
345 let backoff = Backoff::new();
346
347 let mut aggressive_split = false;
348 loop {
349 let (pos, lock) = match move_cursor_to_leaf(
350 self.tree,
351 &right_sibling,
352 aggressive_split,
353 self.next_key.as_mut(),
354 ) {
355 Ok((pos, lock)) => (pos, lock),
356 Err(TreeError::Locked) => {
357 backoff.spin();
358 continue;
359 }
360 Err(TreeError::CircularBufferFull) => {
361 aggressive_split = true;
367 continue;
368 }
369 Err(TreeError::NeedRestart) => {
370 aggressive_split = true;
371 backoff.spin();
372 continue;
373 }
374 };
375 self.scan_position = pos;
376 self.leaf_lock = lock;
377 break;
378 }
379 self.next(out_buffer)
380 }
381 GetScanRecordByPosResult::BoundKeyExceeded => {
382 self.scan_cnt = 0;
383 None
384 }
385 }
386 }
387}
388
389fn promote_or_merge_mini_page<'a>(
390 tree: &'a BfTree,
391 key: &[u8],
392 leaf: &mut LeafEntryXLocked<'a>,
393 parent: ReadGuard<'a>,
394) -> Result<ScanPosition, TreeError> {
395 let page_loc = leaf.get_page_location();
396 match page_loc {
397 PageLocation::Full(_) => {
398 unreachable!()
399 }
400 PageLocation::Base(offset) => {
401 counter!(ScanPromoteBaseToFull);
402 let next_level = MiniPageNextLevel::new(*offset);
404 let base_page_ref = leaf.load_base_page_from_buffer();
405 let pos = base_page_ref.lower_bound(key);
406
407 if base_page_ref.meta.meta_count_without_fence() > 0 {
409 let full_page_loc = upgrade_to_full_page(&tree.storage, base_page_ref, next_level)?;
410
411 leaf.create_cache_page_loc(full_page_loc);
412
413 Ok(ScanPosition::Full(pos as u32))
414 } else {
415 Ok(ScanPosition::Base(pos as u32))
416 }
417 }
418 PageLocation::Mini(ptr) => {
419 counter!(ScanMergeMiniPage);
420 let mini_page = leaf.load_cache_page_mut(*ptr);
421 let h = tree.storage.begin_dealloc_mini_page(mini_page)?;
423 let merge_result = leaf.try_merge_mini_page(&h, parent, &tree.storage)?;
424
425 match merge_result {
426 MergeResult::NoSplit => {
427 if tree.should_promote_scan_page() {
432 let base_offset = mini_page.next_level;
434 leaf.change_to_base_loc();
435 tree.storage.finish_dealloc_mini_page(h);
436
437 let base_page_ref = leaf.load_base_page_from_buffer();
438 let pos = base_page_ref.lower_bound(key);
439 if base_page_ref.meta.meta_count_without_fence() > 0 {
440 let full_page_loc =
441 upgrade_to_full_page(&tree.storage, base_page_ref, base_offset)?;
442
443 leaf.create_cache_page_loc(full_page_loc);
444 Ok(ScanPosition::Full(pos as u32))
445 } else {
446 Ok(ScanPosition::Base(pos as u32))
447 }
448 } else {
449 leaf.change_to_base_loc();
450 tree.storage.finish_dealloc_mini_page(h);
451 let base_ref = leaf.load_base_page_from_buffer();
452 let pos = base_ref.lower_bound(key);
453 Ok(ScanPosition::Base(pos as u32))
454 }
455 }
456 MergeResult::MergeAndSplit => {
457 leaf.change_to_base_loc();
459 tree.storage.finish_dealloc_mini_page(h);
460
461 Err(TreeError::NeedRestart)
465 }
466 }
467 }
468 PageLocation::Null => panic!("promote_or_merge_mini_page on Null page"),
469 }
470}
471
472fn move_cursor_to_leaf_mut<'a>(
473 tree: &'a BfTree,
474 key: &[u8],
475 aggressive_split: bool,
476) -> Result<(ScanPosition, LeafEntryXLocked<'a>), TreeError> {
477 let (pid, parent) = tree.traverse_to_leaf(key, aggressive_split, false, None)?;
478
479 let mut leaf = tree.mapping_table().get_mut(&pid);
480
481 check_parent!(tree, pid, parent);
482
483 if let Ok(pos) = leaf.get_scan_position(key, false) {
484 match pos {
485 ScanPosition::Base(_) => {
486 if !tree.should_promote_scan_page() {
487 return Ok((pos, leaf));
488 }
489 }
491 ScanPosition::Full(_) => {
492 return Ok((pos, leaf));
493 }
494 _ => {
495 panic!("Scanning mini or null page in ScanIterMut");
496 }
497 }
498 }
499
500 let v = promote_or_merge_mini_page(tree, key, &mut leaf, parent.unwrap())?;
503 Ok((v, leaf))
504}
505
506fn move_cursor_to_leaf<'a>(
507 tree: &'a BfTree,
508 key: &[u8],
509 aggressive_split: bool,
510 mut next_key: Option<&mut Vec<u8>>,
511) -> Result<(ScanPosition, ScanLock<'a>), TreeError> {
512 let (pid, parent) = if tree.cache_only {
513 if let Some(key) = next_key.as_deref_mut() {
514 key.clear();
515 } else {
516 panic!("next_key is None in cache_only mode");
517 }
518 tree.traverse_to_leaf(key, aggressive_split, true, next_key)?
519 } else {
520 tree.traverse_to_leaf(key, aggressive_split, false, None)?
521 };
522
523 let mut leaf = tree.mapping_table().get(&pid);
524
525 check_parent!(tree, pid, parent);
526
527 if let Ok(pos) = leaf.get_scan_position(key, tree.cache_only) {
528 match pos {
529 ScanPosition::Base(_) => {
530 counter!(ScanBasePage);
531 if parent.is_none() || !tree.should_promote_scan_page() {
532 return Ok((pos, ScanLock::S(leaf)));
533 }
534 }
536 ScanPosition::Full(_) => {
537 counter!(ScanFullPage);
538 return Ok((pos, ScanLock::S(leaf)));
539 }
540 ScanPosition::Mini(_) => {
541 assert!(tree.cache_only);
542 counter!(ScanMiniPage);
543 return Ok((pos, ScanLock::S(leaf)));
544 }
545 ScanPosition::Null => {
546 counter!(ScanNullPage);
547 return Ok((pos, ScanLock::S(leaf)));
548 }
549 }
550 }
551
552 let mut x_leaf = leaf
554 .try_upgrade(tree.snapshot_mgr.clone(), pid)
555 .map_err(|_e| TreeError::Locked)?;
556
557 let v = promote_or_merge_mini_page(tree, key, &mut x_leaf, parent.unwrap())?;
558 Ok((v, ScanLock::X(x_leaf)))
559}
560
561#[cfg(test)]
562mod tests {
563 use crate::utils::test_util::install_value_to_buffer;
564 use crate::{BfTree, Config};
565 use crate::{LeafInsertResult, ScanReturnField};
566 use std::mem::size_of;
567
568 #[test]
569 fn test_scan_with_count() {
570 let tree = BfTree::default();
571
572 let key_len: usize = tree.config.max_fence_len / 2;
574 let mut key_buffer = vec![0; key_len / size_of::<usize>()];
575
576 let value_len: usize = 1024; let mut value_buffer = vec![0; value_len / size_of::<usize>()];
578
579 for i in 0..1_000 {
580 let key = install_value_to_buffer(&mut key_buffer, i);
581 let value = install_value_to_buffer(&mut value_buffer, i);
582 if tree.insert(key, value) != LeafInsertResult::Success {
583 panic!("Insert failed");
584 }
585 }
586
587 let mut start_key = install_value_to_buffer(&mut key_buffer, 0);
589 let r = tree.scan_with_count(start_key, 0, ScanReturnField::Value);
590 match r {
591 Err(e) => {
592 assert_eq!(e, crate::ScanIterError::InvalidCount);
593 }
594 _ => {
595 panic!("Should not succeed");
596 }
597 }
598
599 let mut output_buffer = vec![0u8; key_len + value_len];
601 let mut prev_key = vec![0u8; key_len];
602 for _ in 0..9 {
603 start_key = &prev_key.as_slice().as_ref();
604 let mut scan_iter = tree
605 .scan_with_count(start_key, 101, ScanReturnField::KeyAndValue)
606 .expect("Scan failed");
607
608 let mut cnt = 0;
609 while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
610 let scanned_key = &output_buffer[0..kl];
611 assert!(kl == key_len);
612
613 if cnt != 0 {
614 let cmp_res = scanned_key.cmp(&prev_key);
615 if cmp_res == std::cmp::Ordering::Less {
616 panic!("Keys are not in order");
617 }
618 assert_eq!(cmp_res, std::cmp::Ordering::Greater);
619 }
620
621 prev_key[..kl].copy_from_slice(scanned_key);
622
623 assert!(vl == value_len);
624 cnt += 1;
625 }
626 assert!(cnt == 101);
627 }
628
629 start_key = &prev_key.as_slice().as_ref();
631 let mut scan_iter = tree
632 .scan_with_count(start_key, 120, ScanReturnField::Key)
633 .expect("Scan failed");
634 let mut cnt = 0;
635
636 while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
637 let scanned_key = &output_buffer[0..kl];
638 assert!(kl == key_len);
639
640 if cnt != 0 {
641 let cmp_res = scanned_key.cmp(&prev_key);
642 assert_eq!(cmp_res, std::cmp::Ordering::Greater);
643 }
644 prev_key[..kl].copy_from_slice(scanned_key);
645 assert!(vl == 0);
646 cnt += 1;
647 }
648 assert!(cnt == 100);
649 }
650
651 #[test]
652 fn test_scan_with_end_key() {
653 let tree = BfTree::default();
654
655 let key_len: usize = tree.config.max_fence_len / 2;
657 let mut key_buffer = vec![0; key_len / size_of::<usize>()];
658
659 let value_len: usize = 1024; let mut value_buffer = vec![0; value_len / size_of::<usize>()];
661
662 for i in 0..1_000 {
663 let key = install_value_to_buffer(&mut key_buffer, i);
664 let value = install_value_to_buffer(&mut value_buffer, i);
665 if tree.insert(key, value) != LeafInsertResult::Success {
666 panic!("Insert failed");
667 }
668 }
669
670 let mut start_key = install_value_to_buffer(&mut key_buffer, 1);
672 let mut invalid_key_buffer: Vec<usize> = vec![0; key_len / size_of::<usize>() + 1];
673 let mut invalid_key = install_value_to_buffer(&mut invalid_key_buffer, 1);
674
675 let mut r = tree.scan_with_end_key(start_key, invalid_key, ScanReturnField::Value);
676 match r {
677 Err(e) => {
678 assert_eq!(e, crate::ScanIterError::InvalidEndKey);
679 }
680 _ => {
681 panic!("Should not succeed");
682 }
683 }
684
685 invalid_key = install_value_to_buffer(&mut invalid_key_buffer, 0);
686
687 r = tree.scan_with_end_key(invalid_key, start_key, ScanReturnField::Value);
688 match r {
689 Err(e) => {
690 assert_eq!(e, crate::ScanIterError::InvalidStartKey);
691 }
692 _ => {
693 panic!("Should not succeed");
694 }
695 }
696
697 let mut end_key_buffer = vec![0; key_len / size_of::<usize>()];
698 let mut end_key = install_value_to_buffer(&mut end_key_buffer, 0);
699
700 r = tree.scan_with_end_key(start_key, end_key, ScanReturnField::Value);
701 match r {
702 Err(e) => {
703 assert_eq!(e, crate::ScanIterError::InvalidKeyRange);
704 }
705 _ => {
706 panic!("Should not succeed");
707 }
708 }
709
710 start_key = install_value_to_buffer(&mut key_buffer, 0);
711 end_key = install_value_to_buffer(&mut end_key_buffer, 777);
712
713 let mut scan_iter = tree
714 .scan_with_end_key(start_key, end_key, ScanReturnField::Key)
715 .expect("Scan failed");
716 let mut output_buffer = vec![0u8; key_len];
717 let mut prev_key = vec![0u8; key_len];
718 let mut cnt = 0;
719
720 while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
721 let scanned_key = &output_buffer[0..kl];
722 assert!(kl == key_len);
723
724 if cnt != 0 {
725 let cmp_res = scanned_key.cmp(&prev_key);
726 assert_eq!(cmp_res, std::cmp::Ordering::Greater);
727 }
728 prev_key[..kl].copy_from_slice(scanned_key);
729
730 let cmp_res = scanned_key.cmp(end_key);
731 assert!(cmp_res == std::cmp::Ordering::Less || cmp_res == std::cmp::Ordering::Equal);
732
733 assert!(vl == 0);
734 cnt += 1;
735 }
736
737 let cmp_res = prev_key.as_slice().cmp(end_key);
738 assert!(cmp_res == std::cmp::Ordering::Equal);
739 assert!(cnt == 40);
740 }
741
742 fn cache_only_tree(cb_size_byte: Option<usize>) -> BfTree {
743 let mut config = Config::default();
744 config.cache_only(true);
745 if let Some(size) = cb_size_byte {
746 config.cb_size_byte(size);
747 }
748 BfTree::with_config(config, None).expect("Failed to create cache-only BfTree")
749 }
750
751 #[test]
752 fn test_scan_with_count_cache_only() {
753 let tree = cache_only_tree(None);
754 assert!(tree.cache_only);
755
756 let key_len: usize = tree.config.max_fence_len / 2;
758 let mut key_buffer = vec![0; key_len / size_of::<usize>()];
759
760 let value_len: usize = 1024; let mut value_buffer = vec![0; value_len / size_of::<usize>()];
762
763 for i in 0..1_000 {
764 let key = install_value_to_buffer(&mut key_buffer, i);
765 let value = install_value_to_buffer(&mut value_buffer, i);
766 if tree.insert(key, value) != LeafInsertResult::Success {
767 panic!("Insert failed");
768 }
769 }
770
771 let mut start_key = install_value_to_buffer(&mut key_buffer, 0);
773 let r = tree.scan_with_count(start_key, 0, ScanReturnField::Value);
774 match r {
775 Err(e) => {
776 assert_eq!(e, crate::ScanIterError::InvalidCount);
777 }
778 _ => {
779 panic!("Should not succeed");
780 }
781 }
782
783 let mut output_buffer = vec![0u8; key_len + value_len];
785 let mut prev_key = vec![0u8; key_len];
786 for _ in 0..9 {
787 start_key = &prev_key.as_slice().as_ref();
788 let mut scan_iter = tree
789 .scan_with_count(start_key, 101, ScanReturnField::KeyAndValue)
790 .expect("Scan failed");
791
792 let mut cnt = 0;
793 while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
794 let scanned_key = &output_buffer[0..kl];
795 assert!(kl == key_len);
796
797 if cnt != 0 {
798 let cmp_res = scanned_key.cmp(&prev_key);
799 if cmp_res == std::cmp::Ordering::Less {
800 panic!("Keys are not in order");
801 }
802 assert_eq!(cmp_res, std::cmp::Ordering::Greater);
803 }
804
805 prev_key[..kl].copy_from_slice(scanned_key);
806
807 assert!(vl == value_len);
808 cnt += 1;
809 }
810 assert!(cnt == 101);
811 }
812
813 start_key = &prev_key.as_slice().as_ref();
815 let mut scan_iter = tree
816 .scan_with_count(start_key, 120, ScanReturnField::Key)
817 .expect("Scan failed");
818 let mut cnt = 0;
819
820 while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
821 let scanned_key = &output_buffer[0..kl];
822 assert!(kl == key_len);
823
824 if cnt != 0 {
825 let cmp_res = scanned_key.cmp(&prev_key);
826 assert_eq!(cmp_res, std::cmp::Ordering::Greater);
827 }
828 prev_key[..kl].copy_from_slice(scanned_key);
829 assert!(vl == 0);
830 cnt += 1;
831 }
832 assert!(cnt == 100);
833 }
834
835 #[test]
836 fn test_scan_with_end_key_cache_only() {
837 let tree = cache_only_tree(None);
838 assert!(tree.cache_only);
839
840 let key_len: usize = tree.config.max_fence_len / 2;
842 let mut key_buffer = vec![0; key_len / size_of::<usize>()];
843
844 let value_len: usize = 1024; let mut value_buffer = vec![0; value_len / size_of::<usize>()];
846
847 for i in 0..1_000 {
848 let key = install_value_to_buffer(&mut key_buffer, i);
849 let value = install_value_to_buffer(&mut value_buffer, i);
850 if tree.insert(key, value) != LeafInsertResult::Success {
851 panic!("Insert failed");
852 }
853 }
854
855 let start_key = install_value_to_buffer(&mut key_buffer, 1);
857 let mut invalid_key_buffer: Vec<usize> = vec![0; key_len / size_of::<usize>() + 1];
858 let mut invalid_key = install_value_to_buffer(&mut invalid_key_buffer, 1);
859
860 let mut r = tree.scan_with_end_key(start_key, invalid_key, ScanReturnField::Value);
861 match r {
862 Err(e) => {
863 assert_eq!(e, crate::ScanIterError::InvalidEndKey);
864 }
865 _ => {
866 panic!("Should not succeed");
867 }
868 }
869
870 invalid_key = install_value_to_buffer(&mut invalid_key_buffer, 0);
871
872 r = tree.scan_with_end_key(invalid_key, start_key, ScanReturnField::Value);
873 match r {
874 Err(e) => {
875 assert_eq!(e, crate::ScanIterError::InvalidStartKey);
876 }
877 _ => {
878 panic!("Should not succeed");
879 }
880 }
881
882 let mut end_key_buffer = vec![0; key_len / size_of::<usize>()];
883 let mut end_key = install_value_to_buffer(&mut end_key_buffer, 0);
884
885 r = tree.scan_with_end_key(start_key, end_key, ScanReturnField::Value);
886 match r {
887 Err(e) => {
888 assert_eq!(e, crate::ScanIterError::InvalidKeyRange);
889 }
890 _ => {
891 panic!("Should not succeed");
892 }
893 }
894
895 let start_key = install_value_to_buffer(&mut key_buffer, 0);
896 end_key = install_value_to_buffer(&mut end_key_buffer, 777);
897
898 let mut scan_iter = tree
899 .scan_with_end_key(start_key, end_key, ScanReturnField::Key)
900 .expect("Scan failed");
901 let mut output_buffer = vec![0u8; key_len];
902 let mut prev_key = vec![0u8; key_len];
903 let mut cnt = 0;
904
905 while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
906 let scanned_key = &output_buffer[0..kl];
907 assert!(kl == key_len);
908
909 if cnt != 0 {
910 let cmp_res = scanned_key.cmp(&prev_key);
911 assert_eq!(cmp_res, std::cmp::Ordering::Greater);
912 }
913 prev_key[..kl].copy_from_slice(scanned_key);
914
915 let cmp_res = scanned_key.cmp(end_key);
916 assert!(cmp_res == std::cmp::Ordering::Less || cmp_res == std::cmp::Ordering::Equal);
917
918 assert!(vl == 0);
919 cnt += 1;
920 }
921
922 let cmp_res = prev_key.as_slice().cmp(end_key);
923 assert!(cmp_res == std::cmp::Ordering::Equal);
924 assert!(cnt == 40);
925 }
926
927 #[test]
928 fn test_scan_mut_disallowed_in_cache_only() {
929 let tree = cache_only_tree(None);
930 assert!(tree.cache_only);
931
932 let key_len: usize = tree.config.max_fence_len / 2;
933 let mut key_buffer = vec![0; key_len / size_of::<usize>()];
934 let start_key = install_value_to_buffer(&mut key_buffer, 0);
935
936 let mut end_key_buffer = vec![0; key_len / size_of::<usize>()];
937 let end_key = install_value_to_buffer(&mut end_key_buffer, 1);
938
939 let r = tree.scan_mut_with_count(start_key, 10, ScanReturnField::Value);
940 match r {
941 Err(e) => assert_eq!(e, crate::ScanIterError::CacheOnlyMode),
942 _ => panic!("scan_mut_with_count should be disallowed in cache-only mode"),
943 }
944
945 let r = tree.scan_mut_with_end_key(start_key, end_key, ScanReturnField::Value);
946 match r {
947 Err(e) => assert_eq!(e, crate::ScanIterError::CacheOnlyMode),
948 _ => panic!("scan_mut_with_end_key should be disallowed in cache-only mode"),
949 }
950 }
951
952 const SMALL_CB_SIZE: usize = 32 * 1024;
956
957 #[test]
958 fn test_scan_with_count_cache_only_lossy() {
959 let tree = cache_only_tree(Some(SMALL_CB_SIZE));
960 assert!(tree.cache_only);
961
962 let key_len: usize = tree.config.max_fence_len / 2;
963 let mut key_buffer = vec![0; key_len / size_of::<usize>()];
964
965 let value_len: usize = 1024;
969 let mut value_buffer = vec![0; value_len / size_of::<usize>()];
970
971 let total: usize = 10_000;
972 for i in 0..total {
973 let key = install_value_to_buffer(&mut key_buffer, i);
974 let value = install_value_to_buffer(&mut value_buffer, i);
975 if tree.insert(key, value) != LeafInsertResult::Success {
976 panic!("Insert failed");
977 }
978 }
979
980 let start_key = install_value_to_buffer(&mut key_buffer, 0);
982 let mut scan_iter = tree
983 .scan_with_count(start_key, usize::MAX, ScanReturnField::Key)
984 .expect("Scan failed");
985
986 let mut output_buffer = vec![0u8; key_len + value_len];
987 let mut prev_key = vec![0u8; key_len];
988 let mut cnt: usize = 0;
989 while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
990 assert_eq!(kl, key_len);
991 assert_eq!(vl, 0); let scanned_key = &output_buffer[0..kl];
994 if cnt != 0 {
995 let cmp_res = scanned_key.cmp(&prev_key);
996 assert_eq!(
997 cmp_res,
998 std::cmp::Ordering::Greater,
999 "scan must produce strictly increasing keys"
1000 );
1001 }
1002 prev_key[..kl].copy_from_slice(scanned_key);
1003 cnt += 1;
1004 }
1005
1006 assert!(
1008 cnt < total,
1009 "expected evictions to drop some keys, got cnt={} total={}",
1010 cnt,
1011 total
1012 );
1013 assert!(cnt > 0, "scan returned no records");
1015 }
1016
1017 #[test]
1018 fn test_scan_with_end_key_cache_only_lossy() {
1019 let tree = cache_only_tree(Some(SMALL_CB_SIZE));
1020 assert!(tree.cache_only);
1021
1022 let key_len: usize = tree.config.max_fence_len / 2;
1023 let mut key_buffer = vec![0; key_len / size_of::<usize>()];
1024
1025 let value_len: usize = 1024;
1026 let mut value_buffer = vec![0; value_len / size_of::<usize>()];
1027
1028 let total: usize = 5_000;
1029 for i in 0..total {
1030 let key = install_value_to_buffer(&mut key_buffer, i);
1031 let value = install_value_to_buffer(&mut value_buffer, i);
1032 if tree.insert(key, value) != LeafInsertResult::Success {
1033 panic!("Insert failed");
1034 }
1035 }
1036
1037 let start_key = install_value_to_buffer(&mut key_buffer, 0);
1039 let mut end_key_buffer = vec![0; key_len / size_of::<usize>()];
1040 let end_key_idx: usize = 2_000;
1041 let end_key = install_value_to_buffer(&mut end_key_buffer, end_key_idx);
1042
1043 let mut scan_iter = tree
1044 .scan_with_end_key(start_key, end_key, ScanReturnField::Key)
1045 .expect("Scan failed");
1046
1047 let mut output_buffer = vec![0u8; key_len];
1048 let mut prev_key = vec![0u8; key_len];
1049 let mut cnt: usize = 0;
1050 while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
1051 assert_eq!(kl, key_len);
1052 assert_eq!(vl, 0);
1053
1054 let scanned_key = &output_buffer[0..kl];
1055 if cnt != 0 {
1056 let cmp_res = scanned_key.cmp(&prev_key);
1057 assert_eq!(cmp_res, std::cmp::Ordering::Greater);
1058 }
1059 prev_key[..kl].copy_from_slice(scanned_key);
1060
1061 let cmp_res = scanned_key.cmp(end_key);
1063 assert!(cmp_res == std::cmp::Ordering::Less || cmp_res == std::cmp::Ordering::Equal);
1064
1065 cnt += 1;
1066 }
1067
1068 assert!(
1071 cnt <= end_key_idx + 1,
1072 "scan returned more keys ({}) than the upper bound ({})",
1073 cnt,
1074 end_key_idx + 1
1075 );
1076 }
1077
1078 #[test]
1079 fn test_scan_with_value_cache_only_lossy() {
1080 let tree = cache_only_tree(Some(SMALL_CB_SIZE));
1084 assert!(tree.cache_only);
1085
1086 let key_len: usize = tree.config.max_fence_len / 2;
1087 let mut key_buffer = vec![0; key_len / size_of::<usize>()];
1088
1089 let value_len: usize = 1024;
1090 let mut value_buffer = vec![0; value_len / size_of::<usize>()];
1091
1092 let total: usize = 8_000;
1093 for i in 0..total {
1094 let key = install_value_to_buffer(&mut key_buffer, i);
1095 let value = install_value_to_buffer(&mut value_buffer, i);
1096 if tree.insert(key, value) != LeafInsertResult::Success {
1097 panic!("Insert failed");
1098 }
1099 }
1100
1101 let start_key = install_value_to_buffer(&mut key_buffer, 0);
1102 let mut scan_iter = tree
1103 .scan_with_count(start_key, usize::MAX, ScanReturnField::KeyAndValue)
1104 .expect("Scan failed");
1105
1106 let mut output_buffer = vec![0u8; key_len + value_len];
1107 let mut prev_key = vec![0u8; key_len];
1108 let mut cnt: usize = 0;
1109
1110 while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
1111 assert_eq!(kl, key_len);
1112 assert_eq!(vl, value_len);
1113
1114 let scanned_key = &output_buffer[0..kl];
1115 let scanned_value = &output_buffer[kl..kl + vl];
1116
1117 if cnt != 0 {
1118 assert_eq!(scanned_key.cmp(&prev_key), std::cmp::Ordering::Greater);
1119 }
1120 prev_key[..kl].copy_from_slice(scanned_key);
1121
1122 assert_eq!(
1127 &scanned_value[..key_len],
1128 scanned_key,
1129 "scanned value prefix does not match key (key/value out of sync)"
1130 );
1131 for chunk in scanned_value.chunks_exact(key_len) {
1134 assert_eq!(chunk, scanned_key, "value is not a repetition of key");
1135 }
1136
1137 cnt += 1;
1138 }
1139
1140 assert!(cnt > 0, "scan returned no records");
1141 assert!(
1142 cnt < total,
1143 "expected evictions to drop some keys, got cnt={} total={}",
1144 cnt,
1145 total
1146 );
1147 }
1148}