1use crate::{
5 check_parent, counter,
6 error::TreeError,
7 mini_page_op::{
8 upgrade_to_full_page, LeafEntrySLocked, LeafEntryXLocked, LeafOperations, MergeResult,
9 },
10 nodes::leaf_node::{GetValueByPosResult, MiniPageNextLevel},
11 storage::PageLocation,
12 utils::{inner_lock::ReadGuard, Backoff},
13 BfTree,
14};
15
16pub(crate) enum ScanError {
17 NeedMergeMiniPage,
18}
19
20pub(crate) enum ScanPosition {
21 Base(u32),
22 Full(u32),
23 }
25
26impl ScanPosition {
27 fn move_to_next(&mut self) {
28 match self {
29 ScanPosition::Base(offset) => *offset += 1,
30 ScanPosition::Full(offset) => *offset += 1,
31 }
32 }
33}
34
35enum ScanLock<'b> {
39 S(LeafEntrySLocked<'b>),
40 X(LeafEntryXLocked<'b>),
41}
42
43impl ScanLock<'_> {
44 fn get_value_by_pos(&self, pos: &ScanPosition, out_buffer: &mut [u8]) -> GetValueByPosResult {
45 match self {
46 ScanLock::S(leaf) => leaf.scan_value_by_pos(pos, out_buffer),
47 ScanLock::X(leaf) => leaf.scan_value_by_pos(pos, out_buffer),
48 }
49 }
50
51 fn get_right_sibling(&mut self) -> Vec<u8> {
52 match self {
53 ScanLock::S(leaf) => leaf.get_right_sibling(),
54 ScanLock::X(leaf) => leaf.get_right_sibling(),
55 }
56 }
57}
58
59pub struct ScanIterMut<'a, 'b: 'a> {
60 tree: &'b BfTree,
61 scan_cnt: usize,
62
63 scan_position: ScanPosition,
64
65 leaf_lock: LeafEntryXLocked<'a>,
66}
67
68impl<'b> ScanIterMut<'_, 'b> {
69 pub(crate) fn new(tree: &'b BfTree, start_key: &'b [u8], scan_cnt: usize) -> Self {
70 let backoff = Backoff::new();
71 let mut aggressive_split = false;
72
73 loop {
74 let (scan_pos, lock) = match move_cursor_to_leaf_mut(tree, start_key, aggressive_split)
75 {
76 Ok((pos, lock)) => (pos, lock),
77 Err(TreeError::Locked) => {
78 backoff.spin();
79 continue;
80 }
81 Err(TreeError::NeedRestart) => {
82 aggressive_split = true;
83 backoff.spin();
84 continue;
85 }
86 Err(TreeError::CircularBufferFull) => {
87 _ = tree.evict_from_circular_buffer();
88 aggressive_split = true;
89 continue;
90 }
91 };
92
93 return Self {
94 tree,
95 scan_cnt,
96 scan_position: scan_pos,
97 leaf_lock: lock,
98 };
99 }
100 }
101
102 pub fn next(&mut self, out_buffer: &mut [u8]) -> Option<usize> {
103 if self.scan_cnt == 0 {
104 return None;
105 }
106
107 match self
108 .leaf_lock
109 .scan_value_by_pos(&self.scan_position, out_buffer)
110 {
111 GetValueByPosResult::Deleted => {
112 self.scan_position.move_to_next();
113 self.next(out_buffer)
114 }
115 GetValueByPosResult::Found(len) => {
116 self.scan_position.move_to_next();
117 self.scan_cnt -= 1;
118
119 match self.leaf_lock.get_page_location() {
121 PageLocation::Base(_offset) => {
122 self.leaf_lock.load_base_page_mut();
123 }
124 PageLocation::Full(_) => {
125 }
127 PageLocation::Mini(_) => {
128 unreachable!()
129 }
130 PageLocation::Null => panic!("range_scan next on Null page"),
131 }
132 Some(len as usize)
133 }
134 GetValueByPosResult::EndOfLeaf => {
135 let right_sibling = self.leaf_lock.get_right_sibling();
137
138 if right_sibling.is_empty() {
139 self.scan_cnt = 0;
140 return None;
141 }
142
143 let backoff = Backoff::new();
144
145 let mut aggressive_split = false;
146 loop {
147 let (pos, lock) = match move_cursor_to_leaf_mut(
148 self.tree,
149 &right_sibling,
150 aggressive_split,
151 ) {
152 Ok((pos, lock)) => (pos, lock),
153 Err(TreeError::Locked) => {
154 backoff.spin();
155 continue;
156 }
157 Err(TreeError::CircularBufferFull) => {
158 aggressive_split = true;
162 continue;
163 }
164 Err(TreeError::NeedRestart) => {
165 aggressive_split = true;
166 backoff.spin();
167 continue;
168 }
169 };
170 self.scan_position = pos;
171 self.leaf_lock = lock;
172 break;
173 }
174 self.next(out_buffer)
175 }
176 }
177 }
178}
179
180pub struct ScanIter<'a, 'b: 'a> {
182 tree: &'b BfTree,
183 scan_cnt: usize,
184
185 scan_position: ScanPosition,
186
187 leaf_lock: ScanLock<'a>,
188}
189
190impl<'b> ScanIter<'_, 'b> {
191 pub(crate) fn new(tree: &'b BfTree, start_key: &'b [u8], scan_cnt: usize) -> Self {
192 let backoff = Backoff::new();
193 let mut aggressive_split = false;
194
195 loop {
196 let (scan_pos, lock) = match move_cursor_to_leaf(tree, start_key, aggressive_split) {
197 Ok((pos, lock)) => (pos, lock),
198 Err(TreeError::Locked) => {
199 backoff.spin();
200 continue;
201 }
202 Err(TreeError::NeedRestart) => {
203 aggressive_split = true;
204 backoff.spin();
205 continue;
206 }
207 Err(TreeError::CircularBufferFull) => {
208 _ = tree.evict_from_circular_buffer();
209 aggressive_split = true;
210 continue;
211 }
212 };
213
214 return Self {
215 tree,
216 scan_cnt,
217 scan_position: scan_pos,
218 leaf_lock: lock,
219 };
220 }
221 }
222
223 pub fn next(&mut self, out_buffer: &mut [u8]) -> Option<usize> {
227 if self.scan_cnt == 0 {
228 return None;
229 }
230
231 match self
232 .leaf_lock
233 .get_value_by_pos(&self.scan_position, out_buffer)
234 {
235 GetValueByPosResult::Deleted => {
236 self.scan_position.move_to_next();
237 self.next(out_buffer)
238 }
239 GetValueByPosResult::Found(len) => {
240 self.scan_position.move_to_next();
241 self.scan_cnt -= 1;
242 Some(len as usize)
243 }
244 GetValueByPosResult::EndOfLeaf => {
245 counter!(ScanGoNextLeaf);
247 let right_sibling = self.leaf_lock.get_right_sibling();
248
249 if right_sibling.is_empty() {
250 self.scan_cnt = 0;
251 return None;
252 }
253
254 let backoff = Backoff::new();
255
256 let mut aggressive_split = false;
257 loop {
258 let (pos, lock) =
259 match move_cursor_to_leaf(self.tree, &right_sibling, aggressive_split) {
260 Ok((pos, lock)) => (pos, lock),
261 Err(TreeError::Locked) => {
262 backoff.spin();
263 continue;
264 }
265 Err(TreeError::CircularBufferFull) => {
266 aggressive_split = true;
272 continue;
273 }
274 Err(TreeError::NeedRestart) => {
275 aggressive_split = true;
276 backoff.spin();
277 continue;
278 }
279 };
280 self.scan_position = pos;
281 self.leaf_lock = lock;
282 break;
283 }
284 self.next(out_buffer)
285 }
286 }
287 }
288}
289
290fn promote_or_merge_mini_page<'a>(
291 tree: &'a BfTree,
292 key: &[u8],
293 leaf: &mut LeafEntryXLocked<'a>,
294 parent: ReadGuard<'a>,
295) -> Result<ScanPosition, TreeError> {
296 let page_loc = leaf.get_page_location();
297 match page_loc {
298 PageLocation::Full(_) => {
299 unreachable!()
300 }
301 PageLocation::Base(offset) => {
302 counter!(ScanPromoteBaseToFull);
303 let next_level = MiniPageNextLevel::new(*offset);
305 let base_page_ref = leaf.load_base_page_from_buffer();
306 let pos = base_page_ref.lower_bound(key);
307
308 let full_page_loc = upgrade_to_full_page(&tree.storage, base_page_ref, next_level)?;
309
310 leaf.create_cache_page_loc(full_page_loc);
311
312 Ok(ScanPosition::Base(pos as u32))
313 }
314 PageLocation::Mini(ptr) => {
315 counter!(ScanMergeMiniPage);
316 let mini_page = leaf.load_cache_page_mut(*ptr);
317 let h = tree.storage.begin_dealloc_mini_page(mini_page)?;
319 let merge_result = leaf.try_merge_mini_page(&h, parent, &tree.storage)?;
320
321 match merge_result {
322 MergeResult::NoSplit => {
323 if tree.should_promote_scan_page() {
328 let base_offset = mini_page.next_level;
330 leaf.change_to_base_loc();
331 tree.storage.finish_dealloc_mini_page(h);
332
333 let base_page_ref = leaf.load_base_page_from_buffer();
334 let full_page_loc =
335 upgrade_to_full_page(&tree.storage, base_page_ref, base_offset)?;
336
337 leaf.create_cache_page_loc(full_page_loc);
338 Err(TreeError::NeedRestart)
339 } else {
340 mini_page.consolidate_after_merge();
341 leaf.change_to_base_loc();
342 tree.storage.finish_dealloc_mini_page(h);
343 let base_ref = leaf.load_base_page_from_buffer();
344 let pos = base_ref.lower_bound(key);
345 Ok(ScanPosition::Base(pos as u32))
346 }
347 }
348 MergeResult::MergeAndSplit => {
349 leaf.change_to_base_loc();
351 tree.storage.finish_dealloc_mini_page(h);
352
353 Err(TreeError::NeedRestart)
357 }
358 }
359 }
360 PageLocation::Null => panic!("promote_or_merge_mini_page on Null page"),
361 }
362}
363
364fn move_cursor_to_leaf_mut<'a>(
365 tree: &'a BfTree,
366 key: &[u8],
367 aggressive_split: bool,
368) -> Result<(ScanPosition, LeafEntryXLocked<'a>), TreeError> {
369 let (pid, parent) = tree.traverse_to_leaf(key, aggressive_split)?;
370
371 let mut leaf = tree.mapping_table().get_mut(&pid);
372
373 check_parent!(tree, pid, parent);
374
375 if let Ok(pos) = leaf.get_scan_position(key) {
376 match pos {
377 ScanPosition::Base(_) => {
378 if !tree.should_promote_scan_page() {
379 return Ok((pos, leaf));
380 }
381 }
383 ScanPosition::Full(_) => {
384 return Ok((pos, leaf));
385 }
386 }
387 }
388
389 let v = promote_or_merge_mini_page(tree, key, &mut leaf, parent.unwrap())?;
392 Ok((v, leaf))
393}
394
395fn move_cursor_to_leaf<'a>(
396 tree: &'a BfTree,
397 key: &[u8],
398 aggressive_split: bool,
399) -> Result<(ScanPosition, ScanLock<'a>), TreeError> {
400 let (pid, parent) = tree.traverse_to_leaf(key, aggressive_split)?;
401
402 let mut leaf = tree.mapping_table().get(&pid);
403
404 check_parent!(tree, pid, parent);
405
406 if let Ok(pos) = leaf.get_scan_position(key) {
407 match pos {
408 ScanPosition::Base(_) => {
409 counter!(ScanBasePage);
410 if parent.is_none() || !tree.should_promote_scan_page() {
411 return Ok((pos, ScanLock::S(leaf)));
412 }
413 }
415 ScanPosition::Full(_) => {
416 counter!(ScanFullPage);
417 return Ok((pos, ScanLock::S(leaf)));
418 }
419 }
420 }
421
422 let mut x_leaf = leaf.try_upgrade().map_err(|_e| TreeError::Locked)?;
424
425 let v = promote_or_merge_mini_page(tree, key, &mut x_leaf, parent.unwrap())?;
426 Ok((v, ScanLock::X(x_leaf)))
427}