// bf_tree/range_scan.rs

1// Copyright (c) Microsoft Corporation.
2// Licensed under the MIT license.
3
4use crate::{
5    check_parent, counter,
6    error::TreeError,
7    mini_page_op::{
8        upgrade_to_full_page, LeafEntrySLocked, LeafEntryXLocked, LeafOperations, MergeResult,
9    },
10    nodes::leaf_node::{GetScanRecordByPosResult, MiniPageNextLevel},
11    storage::PageLocation,
12    utils::{inner_lock::ReadGuard, Backoff},
13    BfTree,
14};
15
/// Internal scan failure: the leaf is currently backed by a mini page, which
/// cannot be scanned directly and must be merged into its base page first
/// (see `promote_or_merge_mini_page`).
pub(crate) enum ScanError {
    NeedMergeMiniPage,
}
19
/// Which record fields a scan copies into the caller's output buffer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScanReturnField {
    /// Copy only the key; the reported value length is 0 (see tests).
    Key,
    /// Copy only the value.
    Value,
    /// Copy the key followed by the value.
    KeyAndValue,
}
26
/// Cursor position within the leaf currently being scanned, tagged with the
/// kind of page that backs it.
pub(crate) enum ScanPosition {
    /// Record offset inside a base page.
    Base(u32),
    /// Record offset inside a full page.
    Full(u32),
    // can't scan on mini page.
}

impl ScanPosition {
    /// Advance the cursor to the next record slot on the current page.
    fn move_to_next(&mut self) {
        // Both page kinds track the position identically, so one or-pattern
        // arm covers every variant.
        match self {
            ScanPosition::Base(offset) | ScanPosition::Full(offset) => *offset += 1,
        }
    }
}
41
// I think we only need s-lock. but we do x-lock because we can't downgrade a x-lock to s-lock yet.
// implementing the downgrade is more challenging than I thought.
// we currently keep both, but for performance we shouldn't hold the x-lock for too long.
/// Lock held on the leaf a read-only scan is positioned on: shared when the
/// page was already scannable, exclusive when it had to be mutated first
/// (mini-page merge / full-page promotion).
enum ScanLock<'b> {
    S(LeafEntrySLocked<'b>),
    X(LeafEntryXLocked<'b>),
}
49
impl ScanLock<'_> {
    /// Read the record at `pos` into `out_buffer`, honoring the optional
    /// `end_key` upper bound; simply delegates to whichever lock kind is held.
    fn get_record_by_pos_with_bound(
        &self,
        pos: &ScanPosition,
        out_buffer: &mut [u8],
        return_field: ScanReturnField,
        end_key: &Option<Vec<u8>>,
    ) -> GetScanRecordByPosResult {
        match self {
            ScanLock::S(leaf) => {
                leaf.scan_record_by_pos_with_bound(pos, out_buffer, return_field, end_key)
            }
            ScanLock::X(leaf) => {
                leaf.scan_record_by_pos_with_bound(pos, out_buffer, return_field, end_key)
            }
        }
    }

    /// Fence key of the right sibling leaf.  The scan loops treat an empty
    /// result as "this is the rightmost leaf".
    fn get_right_sibling(&mut self) -> Vec<u8> {
        match self {
            ScanLock::S(leaf) => leaf.get_right_sibling(),
            ScanLock::X(leaf) => leaf.get_right_sibling(),
        }
    }
}
75
/// Mutable range-scan iterator: always holds an exclusive lock on the current
/// leaf and marks scanned base pages dirty (see `next`).
pub struct ScanIterMut<'a, 'b: 'a> {
    tree: &'b BfTree,
    // Remaining records to return; `usize::MAX` when bounded by `end_key`.
    scan_cnt: usize,

    // Cursor into the currently locked leaf.
    scan_position: ScanPosition,

    leaf_lock: LeafEntryXLocked<'a>,

    // Which record fields get copied into the caller's buffer.
    return_field: ScanReturnField,

    // Optional scan upper bound; set by `new_with_end_key`.
    end_key: Option<Vec<u8>>,
}
88
impl<'b> ScanIterMut<'_, 'b> {
    /// Create a mutable scan starting at `start_key`, returning at most
    /// `scan_cnt` records.  Loops with backoff until the target leaf can be
    /// x-locked and is in a scannable (base/full page) state.
    pub fn new_with_scan_count(
        tree: &'b BfTree,
        start_key: &'b [u8],
        scan_cnt: usize,
        return_field: ScanReturnField,
    ) -> Self {
        let backoff = Backoff::new();
        let mut aggressive_split = false;

        loop {
            let (scan_pos, lock) = match move_cursor_to_leaf_mut(tree, start_key, aggressive_split)
            {
                Ok((pos, lock)) => (pos, lock),
                Err(TreeError::Locked) => {
                    backoff.spin();
                    continue;
                }
                Err(TreeError::NeedRestart) => {
                    // The tree changed under us (e.g. merge split the base);
                    // retry with aggressive splitting to make progress.
                    aggressive_split = true;
                    backoff.spin();
                    continue;
                }
                Err(TreeError::CircularBufferFull) => {
                    // Safe to evict here: no leaf lock is held yet.
                    _ = tree.evict_from_circular_buffer();
                    aggressive_split = true;
                    continue;
                }
            };

            return Self {
                tree,
                scan_cnt,
                scan_position: scan_pos,
                leaf_lock: lock,
                return_field,
                end_key: None,
            };
        }
    }

    /// Create a mutable scan over the key range ending at `end_key`; the
    /// record count is effectively unbounded (`usize::MAX`).
    pub fn new_with_end_key(
        tree: &'b BfTree,
        start_key: &'b [u8],
        end_key: &[u8],
        return_field: ScanReturnField,
    ) -> Self {
        let mut si = Self::new_with_scan_count(tree, start_key, usize::MAX, return_field);
        si.end_key = Some(end_key.to_vec());
        si
    }

    /// Copy the next record into `out_buffer`, returning `(key_len, value_len)`
    /// or `None` when the scan is exhausted (count reached, bound exceeded, or
    /// past the rightmost leaf).
    ///
    /// NOTE(review): recurses on Deleted/EndOfLeaf; a very long run of deleted
    /// records could grow the stack — consider converting to a loop.
    pub fn next(&mut self, out_buffer: &mut [u8]) -> Option<(usize, usize)> {
        if self.scan_cnt == 0 && self.end_key.is_none() {
            return None;
        }

        match self.leaf_lock.scan_record_by_pos_with_bound(
            &self.scan_position,
            out_buffer,
            self.return_field,
            &self.end_key,
        ) {
            GetScanRecordByPosResult::Deleted => {
                // Skip tombstones.
                self.scan_position.move_to_next();
                self.next(out_buffer)
            }
            GetScanRecordByPosResult::Found(key_len, value_len) => {
                self.scan_position.move_to_next();
                self.scan_cnt -= 1;

                // since we are mut, we need to mark as dirty.
                match self.leaf_lock.get_page_location() {
                    PageLocation::Base(_offset) => {
                        self.leaf_lock.load_base_page_mut();
                    }
                    PageLocation::Full(_) => {
                        // do nothing.
                    }
                    PageLocation::Mini(_) => {
                        // move_cursor_to_leaf_mut merged/promoted any mini
                        // page, so we can never be scanning one here.
                        unreachable!()
                    }
                    PageLocation::Null => panic!("range_scan next on Null page"),
                }
                Some((key_len as usize, value_len as usize))
            }
            GetScanRecordByPosResult::EndOfLeaf => {
                // we need to load next leaf.
                let right_sibling = self.leaf_lock.get_right_sibling();

                if right_sibling.is_empty() {
                    // Rightmost leaf: terminate the scan.
                    self.scan_cnt = 0;
                    return None;
                }

                let backoff = Backoff::new();

                let mut aggressive_split = false;
                loop {
                    let (pos, lock) = match move_cursor_to_leaf_mut(
                        self.tree,
                        &right_sibling,
                        aggressive_split,
                    ) {
                        Ok((pos, lock)) => (pos, lock),
                        Err(TreeError::Locked) => {
                            backoff.spin();
                            continue;
                        }
                        Err(TreeError::CircularBufferFull) => {
                            // We can't call eviction here because we are holding a lock, which may happen to be evicted!
                            // It is safe bc circular buffer full is caused by promoting to full page, which is a performance concern not correctness.
                            //
                            aggressive_split = true;
                            continue;
                        }
                        Err(TreeError::NeedRestart) => {
                            aggressive_split = true;
                            backoff.spin();
                            continue;
                        }
                    };
                    // Hand over: drop the old leaf lock, continue on the sibling.
                    self.scan_position = pos;
                    self.leaf_lock = lock;
                    break;
                }
                self.next(out_buffer)
            }
            GetScanRecordByPosResult::BoundKeyExceeded => {
                // Passed `end_key`; terminate.
                self.scan_cnt = 0;
                None
            }
        }
    }
}
224
/// The scan iterator obtained from [BfTree::scan].
///
/// Read-only counterpart of `ScanIterMut`: prefers a shared (s) lock on the
/// current leaf and only holds an x-lock when the leaf needed a mini-page
/// merge or full-page promotion.
pub struct ScanIter<'a, 'b: 'a> {
    tree: &'b BfTree,
    // Remaining records to return; `usize::MAX` when bounded by `end_key`.
    scan_cnt: usize,

    // Cursor into the currently locked leaf.
    scan_position: ScanPosition,

    leaf_lock: ScanLock<'a>,

    // Which record fields get copied into the caller's buffer.
    return_field: ScanReturnField,

    // Optional scan upper bound; set by `new_with_end_key`.
    end_key: Option<Vec<u8>>,
}
238
impl<'b> ScanIter<'_, 'b> {
    /// Create a read-only scan starting at `start_key`, returning at most
    /// `scan_cnt` records.  Loops with backoff until the target leaf can be
    /// locked and is in a scannable (base/full page) state.
    pub fn new_with_scan_count(
        tree: &'b BfTree,
        start_key: &[u8],
        scan_cnt: usize,
        return_field: ScanReturnField,
    ) -> Self {
        let backoff = Backoff::new();
        let mut aggressive_split = false;

        loop {
            let (scan_pos, lock) = match move_cursor_to_leaf(tree, start_key, aggressive_split) {
                Ok((pos, lock)) => (pos, lock),
                Err(TreeError::Locked) => {
                    backoff.spin();
                    continue;
                }
                Err(TreeError::NeedRestart) => {
                    // The tree changed under us; retry with aggressive splitting.
                    aggressive_split = true;
                    backoff.spin();
                    continue;
                }
                Err(TreeError::CircularBufferFull) => {
                    // Safe to evict here: no leaf lock is held yet.
                    _ = tree.evict_from_circular_buffer();
                    aggressive_split = true;
                    continue;
                }
            };

            return Self {
                tree,
                scan_cnt,
                scan_position: scan_pos,
                leaf_lock: lock,
                return_field,
                end_key: None,
            };
        }
    }

    /// Create a read-only scan over the key range ending at `end_key`; the
    /// record count is effectively unbounded (`usize::MAX`).
    pub fn new_with_end_key(
        tree: &'b BfTree,
        start_key: &[u8],
        end_key: &[u8],
        return_field: ScanReturnField,
    ) -> Self {
        let mut si = Self::new_with_scan_count(tree, start_key, usize::MAX, return_field);
        si.end_key = Some(end_key.to_vec());
        si
    }

    // Here we need to busy loop? Is that safe?
    /// Scan next value into `out_buffer`.
    /// next() terminates if 1) reached the last key. 2) scanned `scan_cnt` records, if set. 3) reached end_key, if set.
    /// Returns the length of the record fields copied into `out_buffer` or None if there is no more value.
    ///
    /// NOTE(review): recurses on Deleted/EndOfLeaf; a very long run of deleted
    /// records could grow the stack — consider converting to a loop.
    pub fn next(&mut self, out_buffer: &mut [u8]) -> Option<(usize, usize)> {
        if self.scan_cnt == 0 && self.end_key.is_none() {
            return None;
        }

        match self.leaf_lock.get_record_by_pos_with_bound(
            &self.scan_position,
            out_buffer,
            self.return_field,
            &self.end_key,
        ) {
            GetScanRecordByPosResult::Deleted => {
                // Skip tombstones.
                self.scan_position.move_to_next();
                self.next(out_buffer)
            }
            GetScanRecordByPosResult::Found(key_len, value_len) => {
                self.scan_position.move_to_next();
                self.scan_cnt -= 1;
                Some((key_len as usize, value_len as usize))
            }
            GetScanRecordByPosResult::EndOfLeaf => {
                // we need to load next leaf.
                counter!(ScanGoNextLeaf);
                let right_sibling = self.leaf_lock.get_right_sibling();

                if right_sibling.is_empty() {
                    // Rightmost leaf: terminate the scan.
                    self.scan_cnt = 0;
                    return None;
                }

                let backoff = Backoff::new();

                let mut aggressive_split = false;
                loop {
                    let (pos, lock) =
                        match move_cursor_to_leaf(self.tree, &right_sibling, aggressive_split) {
                            Ok((pos, lock)) => (pos, lock),
                            Err(TreeError::Locked) => {
                                backoff.spin();
                                continue;
                            }
                            Err(TreeError::CircularBufferFull) => {
                                // We can't call eviction here because we are holding a lock, which may happen to be evicted!
                                // It is safe bc circular buffer full is caused by promoting to full page, which is a performance concern not correctness.
                                //
                                // Should we consider making the below function unsafe? To raise awareness?
                                // _ = self.tree.evict_from_circular_buffer();
                                aggressive_split = true;
                                continue;
                            }
                            Err(TreeError::NeedRestart) => {
                                aggressive_split = true;
                                backoff.spin();
                                continue;
                            }
                        };
                    // Hand over: drop the old leaf lock, continue on the sibling.
                    self.scan_position = pos;
                    self.leaf_lock = lock;
                    break;
                }
                self.next(out_buffer)
            }
            GetScanRecordByPosResult::BoundKeyExceeded => {
                // Passed `end_key`; terminate.
                self.scan_cnt = 0;
                None
            }
        }
    }
}
363
/// Ensure the x-locked leaf is scannable (base or full page) and return the
/// scan position for `key`.
///
/// * `Base`: upgrade to a full (cached) page when non-empty so the scan runs
///   on the cache; position is the base page's lower bound of `key`.
/// * `Mini`: merge the mini page into its base page; afterwards either keep
///   the base page or (with probability) promote it to a full page.  If the
///   merge split the base page, return `TreeError::NeedRestart` because the
///   cursor may now be on the wrong leaf.
/// * `Full`: unreachable — callers only land here when the leaf could not
///   already produce a scan position.
fn promote_or_merge_mini_page<'a>(
    tree: &'a BfTree,
    key: &[u8],
    leaf: &mut LeafEntryXLocked<'a>,
    parent: ReadGuard<'a>,
) -> Result<ScanPosition, TreeError> {
    let page_loc = leaf.get_page_location();
    match page_loc {
        PageLocation::Full(_) => {
            unreachable!()
        }
        PageLocation::Base(offset) => {
            counter!(ScanPromoteBaseToFull);
            // upgrade this page to full page.
            let next_level = MiniPageNextLevel::new(*offset);
            let base_page_ref = leaf.load_base_page_from_buffer();
            let pos = base_page_ref.lower_bound(key);

            // Upgrade only if not empty
            if base_page_ref.meta.meta_count_without_fence() > 0 {
                let full_page_loc = upgrade_to_full_page(&tree.storage, base_page_ref, next_level)?;

                leaf.create_cache_page_loc(full_page_loc);

                Ok(ScanPosition::Full(pos as u32))
            } else {
                Ok(ScanPosition::Base(pos as u32))
            }
        }
        PageLocation::Mini(ptr) => {
            counter!(ScanMergeMiniPage);
            let mini_page = leaf.load_cache_page_mut(*ptr);
            // acquire the handle so that the eviction process will not contend with us.
            let h = tree.storage.begin_dealloc_mini_page(mini_page)?;
            let merge_result = leaf.try_merge_mini_page(&h, parent, &tree.storage)?;

            match merge_result {
                MergeResult::NoSplit => {
                    // if no split, we face two choices:
                    // (1) keep using the merged base page
                    // (2) upgrade this page to full page, so that future scans don't need to load base page.
                    //      this is done with probability to avoid polluting the cache.
                    if tree.should_promote_scan_page() {
                        // upgrade to full page
                        let base_offset = mini_page.next_level;
                        leaf.change_to_base_loc();
                        tree.storage.finish_dealloc_mini_page(h);

                        let base_page_ref = leaf.load_base_page_from_buffer();
                        let pos = base_page_ref.lower_bound(key);
                        // Same as the Base arm: only promote a non-empty page.
                        if base_page_ref.meta.meta_count_without_fence() > 0 {
                            let full_page_loc =
                                upgrade_to_full_page(&tree.storage, base_page_ref, base_offset)?;

                            leaf.create_cache_page_loc(full_page_loc);
                            Ok(ScanPosition::Full(pos as u32))
                        } else {
                            Ok(ScanPosition::Base(pos as u32))
                        }
                    } else {
                        leaf.change_to_base_loc();
                        tree.storage.finish_dealloc_mini_page(h);
                        let base_ref = leaf.load_base_page_from_buffer();
                        let pos = base_ref.lower_bound(key);
                        Ok(ScanPosition::Base(pos as u32))
                    }
                }
                MergeResult::MergeAndSplit => {
                    // if split happens, the mini page contains records that does not belong to us, we need to drop it.
                    leaf.change_to_base_loc();
                    tree.storage.finish_dealloc_mini_page(h);

                    // we need to restart traverse to leaf, because merging splitted the base,
                    // which may cause us to land on the wrong leaf.
                    // retry on this might cause unnecessary IO (dropped the base), but it's rare.
                    Err(TreeError::NeedRestart)
                }
            }
        }
        PageLocation::Null => panic!("promote_or_merge_mini_page on Null page"),
    }
}
446
447fn move_cursor_to_leaf_mut<'a>(
448    tree: &'a BfTree,
449    key: &[u8],
450    aggressive_split: bool,
451) -> Result<(ScanPosition, LeafEntryXLocked<'a>), TreeError> {
452    let (pid, parent) = tree.traverse_to_leaf(key, aggressive_split)?;
453
454    let mut leaf = tree.mapping_table().get_mut(&pid);
455
456    check_parent!(tree, pid, parent);
457
458    if let Ok(pos) = leaf.get_scan_position(key) {
459        match pos {
460            ScanPosition::Base(_) => {
461                if !tree.should_promote_scan_page() {
462                    return Ok((pos, leaf));
463                }
464                // o.w. fall through and upgrade to full page.
465            }
466            ScanPosition::Full(_) => {
467                return Ok((pos, leaf));
468            }
469        }
470    }
471
472    // we need to merge mini page.
473
474    let v = promote_or_merge_mini_page(tree, key, &mut leaf, parent.unwrap())?;
475    Ok((v, leaf))
476}
477
/// Traverse to the leaf containing `key` and return a scan position plus a
/// (usually shared) lock on that leaf.
///
/// If the leaf holds a mini page — or a base page we decided to promote — the
/// s-lock is upgraded to an x-lock and `promote_or_merge_mini_page` runs first.
fn move_cursor_to_leaf<'a>(
    tree: &'a BfTree,
    key: &[u8],
    aggressive_split: bool,
) -> Result<(ScanPosition, ScanLock<'a>), TreeError> {
    let (pid, parent) = tree.traverse_to_leaf(key, aggressive_split)?;

    let mut leaf = tree.mapping_table().get(&pid);

    check_parent!(tree, pid, parent);

    if let Ok(pos) = leaf.get_scan_position(key) {
        match pos {
            ScanPosition::Base(_) => {
                counter!(ScanBasePage);
                // Promotion needs the parent guard (unwrapped below), so stay
                // on the base page when the parent is unavailable.
                if parent.is_none() || !tree.should_promote_scan_page() {
                    return Ok((pos, ScanLock::S(leaf)));
                }
                // o.w. fall through and upgrade to full page.
            }
            ScanPosition::Full(_) => {
                counter!(ScanFullPage);
                return Ok((pos, ScanLock::S(leaf)));
            }
        }
    }

    // we need to merge mini page.
    // NOTE(review): the mini-page path assumes `parent` is Some — confirm
    // traverse_to_leaf guarantees this when the leaf holds a mini page.
    let mut x_leaf = leaf.try_upgrade().map_err(|_e| TreeError::Locked)?;

    let v = promote_or_merge_mini_page(tree, key, &mut x_leaf, parent.unwrap())?;
    Ok((v, ScanLock::X(x_leaf)))
}
511
#[cfg(test)]
mod tests {
    use crate::utils::test_util::install_value_to_buffer;
    use crate::BfTree;
    use crate::{LeafInsertResult, ScanReturnField};
    use std::mem::size_of;

    /// End-to-end scan by count: error on count 0, ordered batched scans, and
    /// key-only scans reporting a value length of 0.
    #[test]
    fn test_scan_with_count() {
        let tree = BfTree::default();

        // Insert 1000 consecutive keys
        let key_len: usize = tree.config.max_fence_len / 2;
        let mut key_buffer = vec![0; key_len / size_of::<usize>()];

        let value_len: usize = 1024; // 1KB long values
        let mut value_buffer = vec![0; value_len / size_of::<usize>()];

        for i in 0..1_000 {
            let key = install_value_to_buffer(&mut key_buffer, i);
            let value = install_value_to_buffer(&mut value_buffer, i);
            if tree.insert(key, value) != LeafInsertResult::Success {
                panic!("Insert failed");
            }
        }

        // Scan with invalid count
        let mut start_key = install_value_to_buffer(&mut key_buffer, 0);
        let r = tree.scan_with_count(start_key, 0, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidCount);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        // Scan 100 at a time for 9 times
        // (101 per batch: each batch restarts at the previous batch's last
        // key, which is returned again, so 101 yields 100 new records.)
        let mut output_buffer = vec![0u8; key_len + value_len];
        let mut prev_key = vec![0u8; key_len];
        for _ in 0..9 {
            // NOTE(review): `&prev_key.as_slice().as_ref()` is a double
            // reference that deref-coerces back to `&[u8]`; plain
            // `prev_key.as_slice()` would be clearer.
            start_key = &prev_key.as_slice().as_ref();
            let mut scan_iter = tree
                .scan_with_count(start_key, 101, ScanReturnField::KeyAndValue)
                .expect("Scan failed");

            let mut cnt = 0;
            while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
                let scanned_key = &output_buffer[0..kl];
                assert!(kl == key_len);

                if cnt != 0 {
                    // Keys must be strictly increasing within a batch.
                    let cmp_res = scanned_key.cmp(&prev_key);
                    if cmp_res == std::cmp::Ordering::Less {
                        panic!("Keys are not in order");
                    }
                    assert_eq!(cmp_res, std::cmp::Ordering::Greater);
                }

                prev_key[..kl].copy_from_slice(scanned_key);

                assert!(vl == value_len);
                cnt += 1;
            }
            assert!(cnt == 101);
        }

        // Scan 120 for the last 100 keys
        start_key = &prev_key.as_slice().as_ref();
        let mut scan_iter = tree
            .scan_with_count(start_key, 120, ScanReturnField::Key)
            .expect("Scan failed");
        let mut cnt = 0;

        while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
            let scanned_key = &output_buffer[0..kl];
            assert!(kl == key_len);

            if cnt != 0 {
                let cmp_res = scanned_key.cmp(&prev_key);
                assert_eq!(cmp_res, std::cmp::Ordering::Greater);
            }
            prev_key[..kl].copy_from_slice(scanned_key);
            // Key-only scans report a value length of 0.
            assert!(vl == 0);
            cnt += 1;
        }
        // Fewer than 120 remain; the scan stops at the rightmost leaf.
        assert!(cnt == 100);
    }

    /// End-to-end scan by end key: key-length/range validation errors, and an
    /// inclusive bounded scan that ends exactly on `end_key`.
    #[test]
    fn test_scan_with_end_key() {
        let tree = BfTree::default();

        // Insert 1000 consecutive keys
        let key_len: usize = tree.config.max_fence_len / 2;
        let mut key_buffer = vec![0; key_len / size_of::<usize>()];

        let value_len: usize = 1024; // 1KB long values
        let mut value_buffer = vec![0; value_len / size_of::<usize>()];

        for i in 0..1_000 {
            let key = install_value_to_buffer(&mut key_buffer, i);
            let value = install_value_to_buffer(&mut value_buffer, i);
            if tree.insert(key, value) != LeafInsertResult::Success {
                panic!("Insert failed");
            }
        }

        // Scan with invalid keys
        // (invalid_key is one usize longer than the maximum fence key length.)
        let mut start_key = install_value_to_buffer(&mut key_buffer, 1);
        let mut invalid_key_buffer: Vec<usize> = vec![0; key_len / size_of::<usize>() + 1];
        let mut invalid_key = install_value_to_buffer(&mut invalid_key_buffer, 1);

        let mut r = tree.scan_with_end_key(start_key, invalid_key, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidEndKey);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        invalid_key = install_value_to_buffer(&mut invalid_key_buffer, 0);

        r = tree.scan_with_end_key(invalid_key, start_key, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidStartKey);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        // end_key < start_key must be rejected.
        let mut end_key_buffer = vec![0; key_len / size_of::<usize>()];
        let mut end_key = install_value_to_buffer(&mut end_key_buffer, 0);

        r = tree.scan_with_end_key(start_key, end_key, ScanReturnField::Value);
        match r {
            Err(e) => {
                assert_eq!(e, crate::ScanIterError::InvalidKeyRange);
            }
            _ => {
                panic!("Should not succeed");
            }
        }

        start_key = install_value_to_buffer(&mut key_buffer, 0);
        end_key = install_value_to_buffer(&mut end_key_buffer, 777);

        let mut scan_iter = tree
            .scan_with_end_key(start_key, end_key, ScanReturnField::Key)
            .expect("Scan failed");
        let mut output_buffer = vec![0u8; key_len];
        let mut prev_key = vec![0u8; key_len];
        let mut cnt = 0;

        while let Some((kl, vl)) = scan_iter.next(&mut output_buffer) {
            let scanned_key = &output_buffer[0..kl];
            assert!(kl == key_len);

            if cnt != 0 {
                let cmp_res = scanned_key.cmp(&prev_key);
                assert_eq!(cmp_res, std::cmp::Ordering::Greater);
            }
            prev_key[..kl].copy_from_slice(scanned_key);

            // Every returned key is <= end_key (the bound is inclusive).
            let cmp_res = scanned_key.cmp(end_key);
            assert!(cmp_res == std::cmp::Ordering::Less || cmp_res == std::cmp::Ordering::Equal);

            assert!(vl == 0);
            cnt += 1;
        }

        // The scan ends exactly on end_key.
        let cmp_res = prev_key.as_slice().cmp(end_key);
        assert!(cmp_res == std::cmp::Ordering::Equal);
        // NOTE(review): 40 reflects the lexicographic ordering of keys
        // produced by install_value_to_buffer up to key(777) — confirm against
        // its byte layout.
        assert!(cnt == 40);
    }
}