Skip to main content

bf_tree/
range_scan.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4use crate::{
5    check_parent, counter,
6    error::TreeError,
7    mini_page_op::{
8        upgrade_to_full_page, LeafEntrySLocked, LeafEntryXLocked, LeafOperations, MergeResult,
9    },
10    nodes::leaf_node::{GetScanRecordByPosResult, MiniPageNextLevel},
11    storage::PageLocation,
12    utils::{inner_lock::ReadGuard, Backoff},
13    BfTree,
14};
15
/// Internal error type for range scans.
///
/// It is not constructed anywhere in the visible part of this file.
pub(crate) enum ScanError {
    /// NOTE(review): presumably signals that a mini page must be merged before the scan can
    /// continue — confirm against the code that produces it.
    NeedMergeMiniPage,
}
19
/// Selects which fields of a record a scan copies into the caller's output buffer.
///
/// The value is forwarded unchanged to the leaf's `scan_record_by_pos_with_bound`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanReturnField {
    /// Copy the key (the tests in this file observe a reported value length of 0).
    Key,
    /// Presumably copies only the value — not exercised with a successful scan in this file.
    Value,
    /// Copy both; the tests in this file read the key from the start of the buffer.
    KeyAndValue,
}
26
/// Cursor inside the leaf that is currently being scanned.
///
/// The variant names the kind of page the cursor points into; the payload is the position
/// within that page (for example the `lower_bound` index of the scan's start key).
pub(crate) enum ScanPosition {
    Base(u32),
    Full(u32),
    Mini(u32), // cache-only mode
    Null,      // cache-only mode, evicted mini page
}

impl ScanPosition {
    /// Advances the cursor by one position.
    ///
    /// # Panics
    /// Panics when called on [`ScanPosition::Null`], which has no position to advance.
    fn move_to_next(&mut self) {
        let slot = match self {
            ScanPosition::Base(slot) | ScanPosition::Full(slot) | ScanPosition::Mini(slot) => slot,
            ScanPosition::Null => panic!("move_to_next on Null page"),
        };
        *slot += 1;
    }
}
44
// A shared (S) lock should be all a read-only scan needs. The exclusive (X) variant exists
// because an X-lock cannot be downgraded to an S-lock yet, and implementing the downgrade is
// more challenging than expected.
// Both are kept for now, but for performance the X-lock should not be held for too long.
/// The leaf guard held by a read-only scan.
enum ScanLock<'b> {
    /// Shared guard: returned by `move_cursor_to_leaf` when the leaf can be scanned as it is.
    S(LeafEntrySLocked<'b>),
    /// Exclusive guard: returned by `move_cursor_to_leaf` after it had to upgrade the lock to
    /// promote the page or merge its mini page.
    X(LeafEntryXLocked<'b>),
}
52
53impl ScanLock<'_> {
54    fn get_record_by_pos_with_bound(
55        &self,
56        pos: &ScanPosition,
57        out_buffer: &mut [u8],
58        return_field: ScanReturnField,
59        end_key: &Option<Vec<u8>>,
60    ) -> GetScanRecordByPosResult {
61        match self {
62            ScanLock::S(leaf) => {
63                leaf.scan_record_by_pos_with_bound(pos, out_buffer, return_field, end_key)
64            }
65            ScanLock::X(leaf) => {
66                leaf.scan_record_by_pos_with_bound(pos, out_buffer, return_field, end_key)
67            }
68        }
69    }
70
71    fn get_right_sibling(&mut self) -> Vec<u8> {
72        match self {
73            ScanLock::S(leaf) => leaf.get_right_sibling(),
74            ScanLock::X(leaf) => leaf.get_right_sibling(),
75        }
76    }
77}
78
/// Scan iterator that holds an exclusive (X) lock on the leaf it is currently reading.
pub struct ScanIterMut<'a, 'b: 'a> {
    /// Tree being scanned.
    tree: &'b BfTree,
    /// Number of records that may still be returned; it is also set to 0 once the scan ends.
    scan_cnt: usize,

    /// Cursor inside the currently locked leaf.
    scan_position: ScanPosition,

    /// Exclusive guard on the current leaf; replaced when the scan moves to the next leaf.
    leaf_lock: LeafEntryXLocked<'a>,

    /// Which record fields are copied into the caller's buffer.
    return_field: ScanReturnField,

    /// End bound handed to the leaf on every read; `None` for count-limited scans.
    end_key: Option<Vec<u8>>,
}
91
92impl<'b> ScanIterMut<'_, 'b> {
93    pub fn new_with_scan_count(
94        tree: &'b BfTree,
95        start_key: &'b [u8],
96        scan_cnt: usize,
97        return_field: ScanReturnField,
98    ) -> Self {
99        let backoff = Backoff::new();
100        let mut aggressive_split = false;
101
102        loop {
103            let (scan_pos, lock) = match move_cursor_to_leaf_mut(tree, start_key, aggressive_split)
104            {
105                Ok((pos, lock)) => (pos, lock),
106                Err(TreeError::Locked) => {
107                    backoff.spin();
108                    continue;
109                }
110                Err(TreeError::NeedRestart) => {
111                    aggressive_split = true;
112                    backoff.spin();
113                    continue;
114                }
115                Err(TreeError::CircularBufferFull) => {
116                    _ = tree.evict_from_circular_buffer();
117                    aggressive_split = true;
118                    continue;
119                }
120            };
121
122            return Self {
123                tree,
124                scan_cnt,
125                scan_position: scan_pos,
126                leaf_lock: lock,
127                return_field,
128                end_key: None,
129            };
130        }
131    }
132
133    pub fn new_with_end_key(
134        tree: &'b BfTree,
135        start_key: &'b [u8],
136        end_key: &[u8],
137        return_field: ScanReturnField,
138    ) -> Self {
139        let mut si = Self::new_with_scan_count(tree, start_key, usize::MAX, return_field);
140        si.end_key = Some(end_key.to_vec());
141        si
142    }
143
144    pub fn next(&mut self, out_buffer: &mut [u8]) -> Option<(usize, usize)> {
145        if self.scan_cnt == 0 && self.end_key.is_none() {
146            return None;
147        }
148
149        match self.leaf_lock.scan_record_by_pos_with_bound(
150            &self.scan_position,
151            out_buffer,
152            self.return_field,
153            &self.end_key,
154        ) {
155            GetScanRecordByPosResult::Deleted => {
156                self.scan_position.move_to_next();
157                self.next(out_buffer)
158            }
159            GetScanRecordByPosResult::Found(key_len, value_len) => {
160                self.scan_position.move_to_next();
161                self.scan_cnt -= 1;
162
163                // since we are mut, we need to mark as dirty.
164                match self.leaf_lock.get_page_location() {
165                    PageLocation::Base(_offset) => {
166                        self.leaf_lock.load_base_page_mut();
167                    }
168                    PageLocation::Full(_) => {
169                        // do nothing.
170                    }
171                    PageLocation::Mini(_) => {
172                        unreachable!()
173                    }
174                    PageLocation::Null => panic!("range_scan next on Null page"),
175                }
176                Some((key_len as usize, value_len as usize))
177            }
178            GetScanRecordByPosResult::EndOfLeaf => {
179                // we need to load next leaf.
180                let right_sibling = self.leaf_lock.get_right_sibling();
181
182                if right_sibling.is_empty() {
183                    self.scan_cnt = 0;
184                    return None;
185                }
186
187                let backoff = Backoff::new();
188
189                let mut aggressive_split = false;
190                loop {
191                    let (pos, lock) = match move_cursor_to_leaf_mut(
192                        self.tree,
193                        &right_sibling,
194                        aggressive_split,
195                    ) {
196                        Ok((pos, lock)) => (pos, lock),
197                        Err(TreeError::Locked) => {
198                            backoff.spin();
199                            continue;
200                        }
201                        Err(TreeError::CircularBufferFull) => {
202                            // We can't call eviction here because we are holding a lock, which may happened to be evicted!
203                            // It is safe bc circular buffer full is caused by promoting to full page, which is a performance concern not correctness.
204                            //
205                            aggressive_split = true;
206                            continue;
207                        }
208                        Err(TreeError::NeedRestart) => {
209                            aggressive_split = true;
210                            backoff.spin();
211                            continue;
212                        }
213                    };
214                    self.scan_position = pos;
215                    self.leaf_lock = lock;
216                    break;
217                }
218                self.next(out_buffer)
219            }
220            GetScanRecordByPosResult::BoundKeyExceeded => {
221                self.scan_cnt = 0;
222                None
223            }
224        }
225    }
226}
227
/// The scan iterator obtained from [BfTree::scan].
pub struct ScanIter<'a, 'b: 'a> {
    /// Tree being scanned.
    tree: &'b BfTree,
    /// Number of records that may still be returned; it is also set to 0 once the scan ends.
    scan_cnt: usize,

    /// Cursor inside the currently locked leaf.
    scan_position: ScanPosition,

    /// Guard on the current leaf (shared, or exclusive if the page had to be promoted/merged).
    leaf_lock: ScanLock<'a>,

    /// Which record fields are copied into the caller's buffer.
    return_field: ScanReturnField,

    /// End bound handed to the leaf on every read; `None` for count-limited scans.
    end_key: Option<Vec<u8>>,

    /// Cache-only mode only: buffer passed to the tree traversal and used instead of the leaf's
    /// right-sibling key when moving to the next leaf. `None` when the tree is not cache-only.
    next_key: Option<Vec<u8>>,
}
243
244impl<'b> ScanIter<'_, 'b> {
245    pub fn new_with_scan_count(
246        tree: &'b BfTree,
247        start_key: &[u8],
248        scan_cnt: usize,
249        return_field: ScanReturnField,
250    ) -> Self {
251        let backoff = Backoff::new();
252        let mut aggressive_split = false;
253
254        let mut next_key = if tree.cache_only {
255            Some(Vec::new())
256        } else {
257            None
258        };
259
260        loop {
261            let (scan_pos, lock) =
262                match move_cursor_to_leaf(tree, start_key, aggressive_split, next_key.as_mut()) {
263                    Ok((pos, lock)) => (pos, lock),
264                    Err(TreeError::Locked) => {
265                        backoff.spin();
266                        continue;
267                    }
268                    Err(TreeError::NeedRestart) => {
269                        aggressive_split = true;
270                        backoff.spin();
271                        continue;
272                    }
273                    Err(TreeError::CircularBufferFull) => {
274                        _ = tree.evict_from_circular_buffer();
275                        aggressive_split = true;
276                        continue;
277                    }
278                };
279
280            return Self {
281                tree,
282                scan_cnt,
283                scan_position: scan_pos,
284                leaf_lock: lock,
285                return_field,
286                end_key: None,
287                next_key,
288            };
289        }
290    }
291
292    pub fn new_with_end_key(
293        tree: &'b BfTree,
294        start_key: &[u8],
295        end_key: &[u8],
296        return_field: ScanReturnField,
297    ) -> Self {
298        let mut si = Self::new_with_scan_count(tree, start_key, usize::MAX, return_field);
299        si.end_key = Some(end_key.to_vec());
300        si
301    }
302
303    // Here we need to busy loop? Is that safe?
304    /// Scan next value into `out_buffer`.
305    /// next() terminates if 1) reached the last key. 2) scanned `scan_cnt` records, if set. 3) reached end_key, if set.
306    /// Returns the length of the record fields copied into `out_buffer` or None if there is no more value.
307    pub fn next(&mut self, out_buffer: &mut [u8]) -> Option<(usize, usize)> {
308        if self.scan_cnt == 0 && self.end_key.is_none() {
309            return None;
310        }
311
312        match self.leaf_lock.get_record_by_pos_with_bound(
313            &self.scan_position,
314            out_buffer,
315            self.return_field,
316            &self.end_key,
317        ) {
318            GetScanRecordByPosResult::Deleted => {
319                self.scan_position.move_to_next();
320                self.next(out_buffer)
321            }
322            GetScanRecordByPosResult::Found(key_len, value_len) => {
323                self.scan_position.move_to_next();
324                self.scan_cnt -= 1;
325                Some((key_len as usize, value_len as usize))
326            }
327            GetScanRecordByPosResult::EndOfLeaf => {
328                // we need to load next leaf.
329                counter!(ScanGoNextLeaf);
330                let right_sibling = if !self.tree.cache_only {
331                    self.leaf_lock.get_right_sibling()
332                } else {
333                    if let Some(key) = self.next_key.as_ref() {
334                        key.clone()
335                    } else {
336                        panic!("next_key is None in cache_only mode");
337                    }
338                };
339
340                if right_sibling.is_empty() {
341                    self.scan_cnt = 0;
342                    return None;
343                }
344
345                let backoff = Backoff::new();
346
347                let mut aggressive_split = false;
348                loop {
349                    let (pos, lock) = match move_cursor_to_leaf(
350                        self.tree,
351                        &right_sibling,
352                        aggressive_split,
353                        self.next_key.as_mut(),
354                    ) {
355                        Ok((pos, lock)) => (pos, lock),
356                        Err(TreeError::Locked) => {
357                            backoff.spin();
358                            continue;
359                        }
360                        Err(TreeError::CircularBufferFull) => {
361                            // We can't call eviction here because we are holding a lock, which may happened to be evicted!
362                            // It is safe bc circular buffer full is caused by promoting to full page, which is a performance concern not correctness.
363                            //
364                            // Should we consider making the below function to be unsafe? To arise the awareness?
365                            // _ = self.tree.evict_from_circular_buffer();
366                            aggressive_split = true;
367                            continue;
368                        }
369                        Err(TreeError::NeedRestart) => {
370                            aggressive_split = true;
371                            backoff.spin();
372                            continue;
373                        }
374                    };
375                    self.scan_position = pos;
376                    self.leaf_lock = lock;
377                    break;
378                }
379                self.next(out_buffer)
380            }
381            GetScanRecordByPosResult::BoundKeyExceeded => {
382                self.scan_cnt = 0;
383                None
384            }
385        }
386    }
387}
388
/// Prepares an X-locked `leaf` for scanning from `key` and returns the resulting cursor.
///
/// What happens depends on the leaf's current page location:
/// * `Base` — the base page is loaded and, unless it holds no records, upgraded to a full page.
/// * `Mini` — the mini page is merged into its base page. If the merge did not split, the scan
///   either stays on the base page or the page is promoted to a full page, as decided by
///   `tree.should_promote_scan_page()`.
/// * `Full` is treated as unreachable and `Null` panics.
///
/// The returned cursor is the `lower_bound` of `key` in the page that will be scanned.
///
/// # Errors
/// Propagates errors from `upgrade_to_full_page`, `begin_dealloc_mini_page` and
/// `try_merge_mini_page`, and returns `TreeError::NeedRestart` when the merge split the base
/// page.
fn promote_or_merge_mini_page<'a>(
    tree: &'a BfTree,
    key: &[u8],
    leaf: &mut LeafEntryXLocked<'a>,
    parent: ReadGuard<'a>,
) -> Result<ScanPosition, TreeError> {
    let page_loc = leaf.get_page_location();
    match page_loc {
        PageLocation::Full(_) => {
            unreachable!()
        }
        PageLocation::Base(offset) => {
            counter!(ScanPromoteBaseToFull);
            // upgrade this page to full page.
            let next_level = MiniPageNextLevel::new(*offset);
            let base_page_ref = leaf.load_base_page_from_buffer();
            let pos = base_page_ref.lower_bound(key);

            // Upgrade only if not empty
            if base_page_ref.meta.meta_count_without_fence() > 0 {
                let full_page_loc = upgrade_to_full_page(&tree.storage, base_page_ref, next_level)?;

                leaf.create_cache_page_loc(full_page_loc);

                Ok(ScanPosition::Full(pos as u32))
            } else {
                Ok(ScanPosition::Base(pos as u32))
            }
        }
        PageLocation::Mini(ptr) => {
            counter!(ScanMergeMiniPage);
            let mini_page = leaf.load_cache_page_mut(*ptr);
            // acquire the handle so that the eviction process will not contend with us.
            let h = tree.storage.begin_dealloc_mini_page(mini_page)?;
            let merge_result = leaf.try_merge_mini_page(&h, parent, &tree.storage)?;

            match merge_result {
                MergeResult::NoSplit => {
                    // if no split, we face two choices:
                    // (1) keep using the merged base page
                    // (2) upgrade this page to full page, so that future scans don't need to load base page.
                    //      this is done with probability to avoid polluting the cache.
                    if tree.should_promote_scan_page() {
                        // upgrade to full page
                        let base_offset = mini_page.next_level;
                        leaf.change_to_base_loc();
                        tree.storage.finish_dealloc_mini_page(h);

                        let base_page_ref = leaf.load_base_page_from_buffer();
                        let pos = base_page_ref.lower_bound(key);
                        // Same rule as the `Base` branch: an empty page is not upgraded.
                        if base_page_ref.meta.meta_count_without_fence() > 0 {
                            let full_page_loc =
                                upgrade_to_full_page(&tree.storage, base_page_ref, base_offset)?;

                            leaf.create_cache_page_loc(full_page_loc);
                            Ok(ScanPosition::Full(pos as u32))
                        } else {
                            Ok(ScanPosition::Base(pos as u32))
                        }
                    } else {
                        leaf.change_to_base_loc();
                        tree.storage.finish_dealloc_mini_page(h);
                        let base_ref = leaf.load_base_page_from_buffer();
                        let pos = base_ref.lower_bound(key);
                        Ok(ScanPosition::Base(pos as u32))
                    }
                }
                MergeResult::MergeAndSplit => {
                    // if a split happens, the mini page contains records that do not belong to us, so we need to drop it.
                    leaf.change_to_base_loc();
                    tree.storage.finish_dealloc_mini_page(h);

                    // we need to restart the traversal to the leaf, because merging split the base page,
                    // which may cause us to land on the wrong leaf.
                    // retrying might cause unnecessary IO (we dropped the base), but it's rare.
                    Err(TreeError::NeedRestart)
                }
            }
        }
        PageLocation::Null => panic!("promote_or_merge_mini_page on Null page"),
    }
}
471
472fn move_cursor_to_leaf_mut<'a>(
473    tree: &'a BfTree,
474    key: &[u8],
475    aggressive_split: bool,
476) -> Result<(ScanPosition, LeafEntryXLocked<'a>), TreeError> {
477    let (pid, parent) = tree.traverse_to_leaf(key, aggressive_split, false, None)?;
478
479    let mut leaf = tree.mapping_table().get_mut(&pid);
480
481    check_parent!(tree, pid, parent);
482
483    if let Ok(pos) = leaf.get_scan_position(key, false) {
484        match pos {
485            ScanPosition::Base(_) => {
486                if !tree.should_promote_scan_page() {
487                    return Ok((pos, leaf));
488                }
489                // o.w. fall through and upgrade to full page.
490            }
491            ScanPosition::Full(_) => {
492                return Ok((pos, leaf));
493            }
494            _ => {
495                panic!("Scanning mini or null page in ScanIterMut");
496            }
497        }
498    }
499
500    // we need to merge mini page.
501
502    let v = promote_or_merge_mini_page(tree, key, &mut leaf, parent.unwrap())?;
503    Ok((v, leaf))
504}
505
506fn move_cursor_to_leaf<'a>(
507    tree: &'a BfTree,
508    key: &[u8],
509    aggressive_split: bool,
510    mut next_key: Option<&mut Vec<u8>>,
511) -> Result<(ScanPosition, ScanLock<'a>), TreeError> {
512    let (pid, parent) = if tree.cache_only {
513        if let Some(key) = next_key.as_deref_mut() {
514            key.clear();
515        } else {
516            panic!("next_key is None in cache_only mode");
517        }
518        tree.traverse_to_leaf(key, aggressive_split, true, next_key)?
519    } else {
520        tree.traverse_to_leaf(key, aggressive_split, false, None)?
521    };
522
523    let mut leaf = tree.mapping_table().get(&pid);
524
525    check_parent!(tree, pid, parent);
526
527    if let Ok(pos) = leaf.get_scan_position(key, tree.cache_only) {
528        match pos {
529            ScanPosition::Base(_) => {
530                counter!(ScanBasePage);
531                if parent.is_none() || !tree.should_promote_scan_page() {
532                    return Ok((pos, ScanLock::S(leaf)));
533                }
534                // o.w. fall through and upgrade to full page.
535            }
536            ScanPosition::Full(_) => {
537                counter!(ScanFullPage);
538                return Ok((pos, ScanLock::S(leaf)));
539            }
540            ScanPosition::Mini(_) => {
541                assert!(tree.cache_only);
542                counter!(ScanMiniPage);
543                return Ok((pos, ScanLock::S(leaf)));
544            }
545            ScanPosition::Null => {
546                counter!(ScanNullPage);
547                return Ok((pos, ScanLock::S(leaf)));
548            }
549        }
550    }
551
552    // we need to merge mini page.
553    let mut x_leaf = leaf
554        .try_upgrade(tree.snapshot_mgr.clone(), pid)
555        .map_err(|_e| TreeError::Locked)?;
556
557    let v = promote_or_merge_mini_page(tree, key, &mut x_leaf, parent.unwrap())?;
558    Ok((v, ScanLock::X(x_leaf)))
559}
560
561#[cfg(test)]
562mod tests {
563    use crate::utils::test_util::install_value_to_buffer;
564    use crate::{BfTree, Config};
565    use crate::{LeafInsertResult, ScanReturnField};
566    use std::mem::size_of;
567
    /// Count-limited scans on a default tree: a count of 0 is rejected, and repeated scans
    /// return keys in strictly increasing order with the expected key/value lengths.
    #[test]
    fn test_scan_with_count() {
        let tree = BfTree::default();

        // Insert 1000 keys built from the integers 0..1000
        let key_len: usize = tree.config.max_fence_len / 2;
        let mut key_buffer = vec![0; key_len / size_of::<usize>()];

        let value_len: usize = 1024; // 1KB long values
        let mut value_buffer = vec![0; value_len / size_of::<usize>()];

        for i in 0..1_000 {
            let key = install_value_to_buffer(&mut key_buffer, i);
            let value = install_value_to_buffer(&mut value_buffer, i);
            if tree.insert(key, value) != LeafInsertResult::Success {
                panic!("Insert failed");
            }
        }

        // Scan with invalid count
        let mut start_key = install_value_to_buffer(&mut key_buffer, 0);
        let r = tree.scan_with_count(start_key, 0, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidCount);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        // Scan 9 times, 101 records each. Every scan starts at the last key returned by the
        // previous one; presumably that key is returned again, so each scan adds 100 new keys.
        let mut output_buffer = vec![0u8; key_len + value_len];
        let mut prev_key = vec![0u8; key_len];
        for _ in 0..9 {
            start_key = &prev_key.as_slice().as_ref();
            let mut scan_iter = tree
                .scan_with_count(start_key, 101, ScanReturnField::KeyAndValue)
                .expect("Scan failed");

            let mut cnt = 0;
            while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
                let scanned_key = &output_buffer[0..kl];
                assert!(kl == key_len);

                // The first record of a scan is not compared against `prev_key`.
                if cnt != 0 {
                    let cmp_res = scanned_key.cmp(&prev_key);
                    if cmp_res == std::cmp::Ordering::Less {
                        panic!("Keys are not in order");
                    }
                    assert_eq!(cmp_res, std::cmp::Ordering::Greater);
                }

                prev_key[..kl].copy_from_slice(scanned_key);

                assert!(vl == value_len);
                cnt += 1;
            }
            assert!(cnt == 101);
        }

        // Ask for 120 records; only 100 are expected to remain from the last start key
        start_key = &prev_key.as_slice().as_ref();
        let mut scan_iter = tree
            .scan_with_count(start_key, 120, ScanReturnField::Key)
            .expect("Scan failed");
        let mut cnt = 0;

        while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
            let scanned_key = &output_buffer[0..kl];
            assert!(kl == key_len);

            if cnt != 0 {
                let cmp_res = scanned_key.cmp(&prev_key);
                assert_eq!(cmp_res, std::cmp::Ordering::Greater);
            }
            prev_key[..kl].copy_from_slice(scanned_key);
            // Key-only scans report a value length of 0.
            assert!(vl == 0);
            cnt += 1;
        }
        assert!(cnt == 100);
    }
650
    /// End-key-bounded scans on a default tree: invalid start/end keys and an invalid range are
    /// rejected, and a valid scan returns increasing keys up to and including the end key.
    #[test]
    fn test_scan_with_end_key() {
        let tree = BfTree::default();

        // Insert 1000 keys built from the integers 0..1000
        let key_len: usize = tree.config.max_fence_len / 2;
        let mut key_buffer = vec![0; key_len / size_of::<usize>()];

        let value_len: usize = 1024; // 1KB long values
        let mut value_buffer = vec![0; value_len / size_of::<usize>()];

        for i in 0..1_000 {
            let key = install_value_to_buffer(&mut key_buffer, i);
            let value = install_value_to_buffer(&mut value_buffer, i);
            if tree.insert(key, value) != LeafInsertResult::Success {
                panic!("Insert failed");
            }
        }

        // Scan with invalid keys: the invalid key buffer is one `usize` longer than a regular key
        let mut start_key = install_value_to_buffer(&mut key_buffer, 1);
        let mut invalid_key_buffer: Vec<usize> = vec![0; key_len / size_of::<usize>() + 1];
        let mut invalid_key = install_value_to_buffer(&mut invalid_key_buffer, 1);

        // Over-long key used as end key
        let mut r = tree.scan_with_end_key(start_key, invalid_key, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidEndKey);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        invalid_key = install_value_to_buffer(&mut invalid_key_buffer, 0);

        // Over-long key used as start key
        r = tree.scan_with_end_key(invalid_key, start_key, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidStartKey);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        let mut end_key_buffer = vec![0; key_len / size_of::<usize>()];
        let mut end_key = install_value_to_buffer(&mut end_key_buffer, 0);

        // Start key built from 1, end key built from 0: rejected as an invalid range
        r = tree.scan_with_end_key(start_key, end_key, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidKeyRange);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        start_key = install_value_to_buffer(&mut key_buffer, 0);
        end_key = install_value_to_buffer(&mut end_key_buffer, 777);

        let mut scan_iter = tree
            .scan_with_end_key(start_key, end_key, ScanReturnField::Key)
            .expect("Scan failed");
        let mut output_buffer = vec![0u8; key_len];
        let mut prev_key = vec![0u8; key_len];
        let mut cnt = 0;

        while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
            let scanned_key = &output_buffer[0..kl];
            assert!(kl == key_len);

            if cnt != 0 {
                let cmp_res = scanned_key.cmp(&prev_key);
                assert_eq!(cmp_res, std::cmp::Ordering::Greater);
            }
            prev_key[..kl].copy_from_slice(scanned_key);

            // No returned key may be ordered after the end key.
            let cmp_res = scanned_key.cmp(end_key);
            assert!(cmp_res == std::cmp::Ordering::Less || cmp_res == std::cmp::Ordering::Equal);

            assert!(vl == 0);
            cnt += 1;
        }

        // The last returned key is the end key itself, i.e. the bound is inclusive.
        let cmp_res = prev_key.as_slice().cmp(end_key);
        assert!(cmp_res == std::cmp::Ordering::Equal);
        // NOTE(review): 40 presumably follows from the byte layout `install_value_to_buffer`
        // produces, since keys are compared bytewise rather than numerically — confirm.
        assert!(cnt == 40);
    }
741
    /// Builds a cache-only `BfTree` from the default config for the tests below.
    ///
    /// When `cb_size_byte` is `Some`, it is applied through `Config::cb_size_byte`; otherwise
    /// the config's default is kept. Panics if the tree cannot be created.
    fn cache_only_tree(cb_size_byte: Option<usize>) -> BfTree {
        let mut config = Config::default();
        config.cache_only(true);
        if let Some(size) = cb_size_byte {
            config.cb_size_byte(size);
        }
        BfTree::with_config(config, None).expect("Failed to create cache-only BfTree")
    }
750
    /// Same scenario as `test_scan_with_count`, run against a cache-only tree.
    #[test]
    fn test_scan_with_count_cache_only() {
        let tree = cache_only_tree(None);
        assert!(tree.cache_only);

        // Insert 1000 keys built from the integers 0..1000
        let key_len: usize = tree.config.max_fence_len / 2;
        let mut key_buffer = vec![0; key_len / size_of::<usize>()];

        let value_len: usize = 1024; // 1KB long values
        let mut value_buffer = vec![0; value_len / size_of::<usize>()];

        for i in 0..1_000 {
            let key = install_value_to_buffer(&mut key_buffer, i);
            let value = install_value_to_buffer(&mut value_buffer, i);
            if tree.insert(key, value) != LeafInsertResult::Success {
                panic!("Insert failed");
            }
        }

        // Scan with invalid count
        let mut start_key = install_value_to_buffer(&mut key_buffer, 0);
        let r = tree.scan_with_count(start_key, 0, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidCount);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        // Scan 9 times, 101 records each. Every scan starts at the last key returned by the
        // previous one; presumably that key is returned again, so each scan adds 100 new keys.
        let mut output_buffer = vec![0u8; key_len + value_len];
        let mut prev_key = vec![0u8; key_len];
        for _ in 0..9 {
            start_key = &prev_key.as_slice().as_ref();
            let mut scan_iter = tree
                .scan_with_count(start_key, 101, ScanReturnField::KeyAndValue)
                .expect("Scan failed");

            let mut cnt = 0;
            while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
                let scanned_key = &output_buffer[0..kl];
                assert!(kl == key_len);

                // The first record of a scan is not compared against `prev_key`.
                if cnt != 0 {
                    let cmp_res = scanned_key.cmp(&prev_key);
                    if cmp_res == std::cmp::Ordering::Less {
                        panic!("Keys are not in order");
                    }
                    assert_eq!(cmp_res, std::cmp::Ordering::Greater);
                }

                prev_key[..kl].copy_from_slice(scanned_key);

                assert!(vl == value_len);
                cnt += 1;
            }
            assert!(cnt == 101);
        }

        // Ask for 120 records; only 100 are expected to remain from the last start key
        start_key = &prev_key.as_slice().as_ref();
        let mut scan_iter = tree
            .scan_with_count(start_key, 120, ScanReturnField::Key)
            .expect("Scan failed");
        let mut cnt = 0;

        while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
            let scanned_key = &output_buffer[0..kl];
            assert!(kl == key_len);

            if cnt != 0 {
                let cmp_res = scanned_key.cmp(&prev_key);
                assert_eq!(cmp_res, std::cmp::Ordering::Greater);
            }
            prev_key[..kl].copy_from_slice(scanned_key);
            // Key-only scans report a value length of 0.
            assert!(vl == 0);
            cnt += 1;
        }
        assert!(cnt == 100);
    }
834
835    #[test]
836    fn test_scan_with_end_key_cache_only() {
837        let tree = cache_only_tree(None);
838        assert!(tree.cache_only);
839
840        // Insert 1000 consecutive keys
841        let key_len: usize = tree.config.max_fence_len / 2;
842        let mut key_buffer = vec![0; key_len / size_of::<usize>()];
843
844        let value_len: usize = 1024; // 1KB long values
845        let mut value_buffer = vec![0; value_len / size_of::<usize>()];
846
847        for i in 0..1_000 {
848            let key = install_value_to_buffer(&mut key_buffer, i);
849            let value = install_value_to_buffer(&mut value_buffer, i);
850            if tree.insert(key, value) != LeafInsertResult::Success {
851                panic!("Insert failed");
852            }
853        }
854
855        // Scan with invalid keys
856        let start_key = install_value_to_buffer(&mut key_buffer, 1);
857        let mut invalid_key_buffer: Vec<usize> = vec![0; key_len / size_of::<usize>() + 1];
858        let mut invalid_key = install_value_to_buffer(&mut invalid_key_buffer, 1);
859
860        let mut r = tree.scan_with_end_key(start_key, invalid_key, ScanReturnField::Value);
861        match r {
862            Err(e) => {
863                assert_eq!(e, crate::ScanIterError::InvalidEndKey);
864            }
865            _ => {
866                panic!("Should not succeed");
867            }
868        }
869
870        invalid_key = install_value_to_buffer(&mut invalid_key_buffer, 0);
871
872        r = tree.scan_with_end_key(invalid_key, start_key, ScanReturnField::Value);
873        match r {
874            Err(e) => {
875                assert_eq!(e, crate::ScanIterError::InvalidStartKey);
876            }
877            _ => {
878                panic!("Should not succeed");
879            }
880        }
881
882        let mut end_key_buffer = vec![0; key_len / size_of::<usize>()];
883        let mut end_key = install_value_to_buffer(&mut end_key_buffer, 0);
884
885        r = tree.scan_with_end_key(start_key, end_key, ScanReturnField::Value);
886        match r {
887            Err(e) => {
888                assert_eq!(e, crate::ScanIterError::InvalidKeyRange);
889            }
890            _ => {
891                panic!("Should not succeed");
892            }
893        }
894
895        let start_key = install_value_to_buffer(&mut key_buffer, 0);
896        end_key = install_value_to_buffer(&mut end_key_buffer, 777);
897
898        let mut scan_iter = tree
899            .scan_with_end_key(start_key, end_key, ScanReturnField::Key)
900            .expect("Scan failed");
901        let mut output_buffer = vec![0u8; key_len];
902        let mut prev_key = vec![0u8; key_len];
903        let mut cnt = 0;
904
905        while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
906            let scanned_key = &output_buffer[0..kl];
907            assert!(kl == key_len);
908
909            if cnt != 0 {
910                let cmp_res = scanned_key.cmp(&prev_key);
911                assert_eq!(cmp_res, std::cmp::Ordering::Greater);
912            }
913            prev_key[..kl].copy_from_slice(scanned_key);
914
915            let cmp_res = scanned_key.cmp(end_key);
916            assert!(cmp_res == std::cmp::Ordering::Less || cmp_res == std::cmp::Ordering::Equal);
917
918            assert!(vl == 0);
919            cnt += 1;
920        }
921
922        let cmp_res = prev_key.as_slice().cmp(end_key);
923        assert!(cmp_res == std::cmp::Ordering::Equal);
924        assert!(cnt == 40);
925    }
926
927    #[test]
928    fn test_scan_mut_disallowed_in_cache_only() {
929        let tree = cache_only_tree(None);
930        assert!(tree.cache_only);
931
932        let key_len: usize = tree.config.max_fence_len / 2;
933        let mut key_buffer = vec![0; key_len / size_of::<usize>()];
934        let start_key = install_value_to_buffer(&mut key_buffer, 0);
935
936        let mut end_key_buffer = vec![0; key_len / size_of::<usize>()];
937        let end_key = install_value_to_buffer(&mut end_key_buffer, 1);
938
939        let r = tree.scan_mut_with_count(start_key, 10, ScanReturnField::Value);
940        match r {
941            Err(e) => assert_eq!(e, crate::ScanIterError::CacheOnlyMode),
942            _ => panic!("scan_mut_with_count should be disallowed in cache-only mode"),
943        }
944
945        let r = tree.scan_mut_with_end_key(start_key, end_key, ScanReturnField::Value);
946        match r {
947            Err(e) => assert_eq!(e, crate::ScanIterError::CacheOnlyMode),
948            _ => panic!("scan_mut_with_end_key should be disallowed in cache-only mode"),
949        }
950    }
951
952    /// A small circular buffer (32 KiB cache, 4 KiB leaf pages -> only a
953    /// handful of leaves fit) so that inserting the test workload forces
954    /// evictions (Null pages) during scan.
955    const SMALL_CB_SIZE: usize = 32 * 1024;
956
957    #[test]
958    fn test_scan_with_count_cache_only_lossy() {
959        let tree = cache_only_tree(Some(SMALL_CB_SIZE));
960        assert!(tree.cache_only);
961
962        let key_len: usize = tree.config.max_fence_len / 2;
963        let mut key_buffer = vec![0; key_len / size_of::<usize>()];
964
965        // Use ~1 KiB values so the workload (10K records ~ 10 MiB) is
966        // dramatically larger than the 32 KiB circular buffer; evictions
967        // are guaranteed.
968        let value_len: usize = 1024;
969        let mut value_buffer = vec![0; value_len / size_of::<usize>()];
970
971        let total: usize = 10_000;
972        for i in 0..total {
973            let key = install_value_to_buffer(&mut key_buffer, i);
974            let value = install_value_to_buffer(&mut value_buffer, i);
975            if tree.insert(key, value) != LeafInsertResult::Success {
976                panic!("Insert failed");
977            }
978        }
979
980        // Scan everything from the smallest key.
981        let start_key = install_value_to_buffer(&mut key_buffer, 0);
982        let mut scan_iter = tree
983            .scan_with_count(start_key, usize::MAX, ScanReturnField::Key)
984            .expect("Scan failed");
985
986        let mut output_buffer = vec![0u8; key_len + value_len];
987        let mut prev_key = vec![0u8; key_len];
988        let mut cnt: usize = 0;
989        while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
990            assert_eq!(kl, key_len);
991            assert_eq!(vl, 0); // ScanReturnField::Key
992
993            let scanned_key = &output_buffer[0..kl];
994            if cnt != 0 {
995                let cmp_res = scanned_key.cmp(&prev_key);
996                assert_eq!(
997                    cmp_res,
998                    std::cmp::Ordering::Greater,
999                    "scan must produce strictly increasing keys"
1000                );
1001            }
1002            prev_key[..kl].copy_from_slice(scanned_key);
1003            cnt += 1;
1004        }
1005
1006        // Some pages were evicted, so we should get strictly fewer than total.
1007        assert!(
1008            cnt < total,
1009            "expected evictions to drop some keys, got cnt={} total={}",
1010            cnt,
1011            total
1012        );
1013        // But the remaining cached data should still produce a non-trivial scan.
1014        assert!(cnt > 0, "scan returned no records");
1015    }
1016
1017    #[test]
1018    fn test_scan_with_end_key_cache_only_lossy() {
1019        let tree = cache_only_tree(Some(SMALL_CB_SIZE));
1020        assert!(tree.cache_only);
1021
1022        let key_len: usize = tree.config.max_fence_len / 2;
1023        let mut key_buffer = vec![0; key_len / size_of::<usize>()];
1024
1025        let value_len: usize = 1024;
1026        let mut value_buffer = vec![0; value_len / size_of::<usize>()];
1027
1028        let total: usize = 5_000;
1029        for i in 0..total {
1030            let key = install_value_to_buffer(&mut key_buffer, i);
1031            let value = install_value_to_buffer(&mut value_buffer, i);
1032            if tree.insert(key, value) != LeafInsertResult::Success {
1033                panic!("Insert failed");
1034            }
1035        }
1036
1037        // Bounded range scan; bound is well within the inserted key space.
1038        let start_key = install_value_to_buffer(&mut key_buffer, 0);
1039        let mut end_key_buffer = vec![0; key_len / size_of::<usize>()];
1040        let end_key_idx: usize = 2_000;
1041        let end_key = install_value_to_buffer(&mut end_key_buffer, end_key_idx);
1042
1043        let mut scan_iter = tree
1044            .scan_with_end_key(start_key, end_key, ScanReturnField::Key)
1045            .expect("Scan failed");
1046
1047        let mut output_buffer = vec![0u8; key_len];
1048        let mut prev_key = vec![0u8; key_len];
1049        let mut cnt: usize = 0;
1050        while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
1051            assert_eq!(kl, key_len);
1052            assert_eq!(vl, 0);
1053
1054            let scanned_key = &output_buffer[0..kl];
1055            if cnt != 0 {
1056                let cmp_res = scanned_key.cmp(&prev_key);
1057                assert_eq!(cmp_res, std::cmp::Ordering::Greater);
1058            }
1059            prev_key[..kl].copy_from_slice(scanned_key);
1060
1061            // Every scanned key must be <= end_key.
1062            let cmp_res = scanned_key.cmp(end_key);
1063            assert!(cmp_res == std::cmp::Ordering::Less || cmp_res == std::cmp::Ordering::Equal);
1064
1065            cnt += 1;
1066        }
1067
1068        // Even with eviction we expect at least some keys, and we should
1069        // never see more than the bound implies.
1070        assert!(
1071            cnt <= end_key_idx + 1,
1072            "scan returned more keys ({}) than the upper bound ({})",
1073            cnt,
1074            end_key_idx + 1
1075        );
1076    }
1077
1078    #[test]
1079    fn test_scan_with_value_cache_only_lossy() {
1080        // Verifies that scanned (key, value) pairs are self-consistent (the
1081        // returned value matches the encoding of the returned key) even when
1082        // pages are being evicted, and that scan tolerates Null pages.
1083        let tree = cache_only_tree(Some(SMALL_CB_SIZE));
1084        assert!(tree.cache_only);
1085
1086        let key_len: usize = tree.config.max_fence_len / 2;
1087        let mut key_buffer = vec![0; key_len / size_of::<usize>()];
1088
1089        let value_len: usize = 1024;
1090        let mut value_buffer = vec![0; value_len / size_of::<usize>()];
1091
1092        let total: usize = 8_000;
1093        for i in 0..total {
1094            let key = install_value_to_buffer(&mut key_buffer, i);
1095            let value = install_value_to_buffer(&mut value_buffer, i);
1096            if tree.insert(key, value) != LeafInsertResult::Success {
1097                panic!("Insert failed");
1098            }
1099        }
1100
1101        let start_key = install_value_to_buffer(&mut key_buffer, 0);
1102        let mut scan_iter = tree
1103            .scan_with_count(start_key, usize::MAX, ScanReturnField::KeyAndValue)
1104            .expect("Scan failed");
1105
1106        let mut output_buffer = vec![0u8; key_len + value_len];
1107        let mut prev_key = vec![0u8; key_len];
1108        let mut cnt: usize = 0;
1109
1110        while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
1111            assert_eq!(kl, key_len);
1112            assert_eq!(vl, value_len);
1113
1114            let scanned_key = &output_buffer[0..kl];
1115            let scanned_value = &output_buffer[kl..kl + vl];
1116
1117            if cnt != 0 {
1118                assert_eq!(scanned_key.cmp(&prev_key), std::cmp::Ordering::Greater);
1119            }
1120            prev_key[..kl].copy_from_slice(scanned_key);
1121
1122            // `install_value_to_buffer` fills the entire buffer by repeating
1123            // `murmur64(id)`; for the same id, the first `key_len` bytes of
1124            // the value buffer equal the entire key buffer. Use this to
1125            // verify the (key, value) pair is self-consistent.
1126            assert_eq!(
1127                &scanned_value[..key_len],
1128                scanned_key,
1129                "scanned value prefix does not match key (key/value out of sync)"
1130            );
1131            // The value is also a repetition of the same word: verify every
1132            // chunk of `key_len` bytes within the value matches the key.
1133            for chunk in scanned_value.chunks_exact(key_len) {
1134                assert_eq!(chunk, scanned_key, "value is not a repetition of key");
1135            }
1136
1137            cnt += 1;
1138        }
1139
1140        assert!(cnt > 0, "scan returned no records");
1141        assert!(
1142            cnt < total,
1143            "expected evictions to drop some keys, got cnt={} total={}",
1144            cnt,
1145            total
1146        );
1147    }
1148}