quill_sql/storage/index/
btree_iterator.rs1use std::ops::{Bound, RangeBounds};
2use std::sync::Arc;
3
4use crate::buffer::{ReadPageGuard, INVALID_PAGE_ID};
5use crate::config::BTreeConfig;
6use crate::error::QuillSQLResult;
7use crate::storage::codec::BPlusTreeLeafPageCodec;
8use crate::storage::page::RecordId;
9use crate::storage::tuple::Tuple;
10use crate::utils::ring_buffer::RingBuffer;
11use bytes::BytesMut;
12
13use super::btree_index::BPlusTreeIndex;
14
/// Range iterator over the leaf level of a [`BPlusTreeIndex`], yielding [`RecordId`]s.
///
/// Leaves are read either through a page guard obtained from the buffer pool or,
/// when sequential batching is enabled, from owned byte copies of leaf pages.
#[derive(Debug)]
pub struct TreeIndexIterator {
    /// The index being scanned.
    index: Arc<BPlusTreeIndex>,
    /// Lower bound of the key range, copied from the caller's range.
    start_bound: Bound<Tuple>,
    /// Upper bound of the key range, copied from the caller's range.
    end_bound: Bound<Tuple>,
    /// Guard on the leaf page currently being read; `None` before the first
    /// positioning, after the leaf chain ends, or while reading a copied snapshot.
    current_guard: Option<ReadPageGuard>,
    /// Position of the next entry to read within the current leaf.
    cursor: usize,
    /// Whether the initial seek to `start_bound` has been performed.
    started: bool,
    /// Byte offset of the next key/value pair within the current leaf's data.
    kv_offset: usize,
    /// Owned copy of the current leaf page while reading in batched mode.
    current_bytes: Option<BytesMut>,
    /// Read-ahead buffer of copied leaf pages, filled in leaf-chain order by
    /// `fill_batch_from`.
    batch_buf: RingBuffer<BytesMut>,
    /// Whether leaf pages may be copied ahead into `batch_buf` (`seq_batch_enable`).
    batch_enabled: bool,
    /// Number of pages `fill_batch_from` fills `batch_buf` up to (`seq_window`).
    batch_window: usize,
}
32
33impl TreeIndexIterator {
34 pub fn new<R: RangeBounds<Tuple>>(index: Arc<BPlusTreeIndex>, range: R) -> Self {
36 Self::new_with_config(index.clone(), range, index.config)
37 }
38
39 pub fn new_with_config<R: RangeBounds<Tuple>>(
41 index: Arc<BPlusTreeIndex>,
42 range: R,
43 cfg: BTreeConfig,
44 ) -> Self {
45 let cap = cfg.seq_window.max(1);
46 Self {
47 index,
48 start_bound: range.start_bound().cloned(),
49 end_bound: range.end_bound().cloned(),
50 current_guard: None,
51 cursor: 0,
52 started: false,
53 kv_offset: 0,
54 current_bytes: None,
55 batch_buf: RingBuffer::with_capacity(cap),
56 batch_enabled: cfg.seq_batch_enable,
57 batch_window: cfg.seq_window,
58 }
59 }
60
61 pub fn next(&mut self) -> QuillSQLResult<Option<RecordId>> {
63 if !self.started {
64 let root_page_id = self.index.get_root_page_id()?;
65 if root_page_id == INVALID_PAGE_ID {
66 return Ok(None);
67 }
68
69 match &self.start_bound {
70 Bound::Included(k) | Bound::Excluded(k) => {
71 if self.current_guard.is_none() {
72 let guard = self.index.find_leaf_page_for_iterator(k, root_page_id)?;
73 let (header, hdr_off) =
74 BPlusTreeLeafPageCodec::decode_header_only(guard.data())?;
75 let (full_leaf, _) = BPlusTreeLeafPageCodec::decode(
77 guard.data(),
78 self.index.key_schema.clone(),
79 )?;
80 self.cursor = full_leaf
81 .next_closest(k, matches!(self.start_bound, Bound::Included(_)))
82 .unwrap_or(header.current_size as usize);
83 let mut off = hdr_off;
85 for _ in 0..self.cursor {
86 let (_, new_off) = BPlusTreeLeafPageCodec::decode_kv_at_offset(
87 guard.data(),
88 self.index.key_schema.clone(),
89 off,
90 )?;
91 off = new_off;
92 }
93 self.kv_offset = off;
94 self.current_guard = Some(guard);
95 }
96 }
97 Bound::Unbounded => {
98 let guard = self.index.find_first_leaf_page()?;
99 self.current_guard = Some(guard);
100 self.cursor = 0;
101 let (_, hdr_off) = BPlusTreeLeafPageCodec::decode_header_only(
102 self.current_guard.as_ref().unwrap().data(),
103 )?;
104 self.kv_offset = hdr_off;
105 }
106 };
107 self.started = true;
108 }
109
110 if let Some(bytes) = self.current_bytes.as_ref() {
112 let (h1, _) = BPlusTreeLeafPageCodec::decode_header_only(bytes)?;
114 if self.cursor >= h1.current_size as usize {
115 if let Some(next_bytes) = self.batch_buf.pop() {
117 let (_nh, nh_off) = BPlusTreeLeafPageCodec::decode_header_only(&next_bytes)?;
119 self.current_bytes = Some(next_bytes);
120 self.kv_offset = nh_off;
121 self.cursor = 0;
122 } else if h1.next_page_id != INVALID_PAGE_ID {
123 self.fill_batch_from(h1.next_page_id)?;
125 if let Some(next_bytes) = self.batch_buf.pop() {
126 let (_nh, nh_off) =
127 BPlusTreeLeafPageCodec::decode_header_only(&next_bytes)?;
128 self.current_bytes = Some(next_bytes);
129 self.kv_offset = nh_off;
130 self.cursor = 0;
131 } else {
132 self.current_bytes = None;
133 return Ok(None);
134 }
135 } else {
136 self.current_bytes = None;
137 return Ok(None);
138 }
139 return self.next();
140 }
141 let ((key, rid), new_off) = BPlusTreeLeafPageCodec::decode_kv_at_offset(
143 bytes,
144 self.index.key_schema.clone(),
145 self.kv_offset,
146 )?;
147 let in_range = match &self.end_bound {
148 Bound::Included(end_key) => &key <= end_key,
149 Bound::Excluded(end_key) => &key < end_key,
150 Bound::Unbounded => true,
151 };
152 if in_range {
153 self.cursor += 1;
154 self.kv_offset = new_off;
155 return Ok(Some(rid));
156 }
157 } else if let Some(guard) = self.current_guard.as_ref() {
158 let (h1, _) = BPlusTreeLeafPageCodec::decode_header_only(guard.data())?;
161 let v1 = h1.version;
162 if self.cursor >= h1.current_size as usize {
163 let next_page_id = h1.next_page_id;
164 if next_page_id == INVALID_PAGE_ID {
165 self.current_guard = None;
166 return Ok(None);
167 }
168 if self.batch_enabled {
170 self.fill_batch_from(next_page_id)?;
171 if let Some(next_bytes) = self.batch_buf.pop() {
172 let (_nh, nh_off) =
173 BPlusTreeLeafPageCodec::decode_header_only(&next_bytes)?;
174 self.current_bytes = Some(next_bytes);
175 self.current_guard = None; self.kv_offset = nh_off;
177 self.cursor = 0;
178 return self.next();
179 }
180 }
181 if let Ok((next_g, next_leaf)) = self
183 .index
184 .buffer_pool
185 .fetch_tree_leaf_page(next_page_id, self.index.key_schema.clone())
186 {
187 if next_leaf.header.next_page_id != INVALID_PAGE_ID {
188 let _ = self
189 .index
190 .buffer_pool
191 .prefetch_page(next_leaf.header.next_page_id);
192 }
193 self.current_guard = Some(next_g);
194 } else {
195 self.current_guard =
196 Some(self.index.buffer_pool.fetch_page_read(next_page_id)?);
197 }
198 self.cursor = 0;
199 let (_nh, nh_off) = BPlusTreeLeafPageCodec::decode_header_only(
201 self.current_guard.as_ref().unwrap().data(),
202 )?;
203 self.kv_offset = nh_off;
204 return self.next();
205 }
206
207 let ((key, rid), new_off) = BPlusTreeLeafPageCodec::decode_kv_at_offset(
209 guard.data(),
210 self.index.key_schema.clone(),
211 self.kv_offset,
212 )?;
213
214 let in_range = match &self.end_bound {
215 Bound::Included(end_key) => &key <= end_key,
216 Bound::Excluded(end_key) => &key < end_key,
217 Bound::Unbounded => true,
218 };
219
220 if in_range {
221 self.cursor += 1;
222 self.kv_offset = new_off;
224 let (h2, _) = BPlusTreeLeafPageCodec::decode_header_only(guard.data())?;
226 let v2 = h2.version;
227 if v1 == v2 {
228 return Ok(Some(rid));
229 } else {
230 let restart_key = key.clone();
232 let root = self.index.get_root_page_id()?;
233 let guard = self.index.find_leaf_page_for_iterator(&restart_key, root)?;
234 self.current_guard = Some(guard);
235 let (hdr2, hdr2_off) = BPlusTreeLeafPageCodec::decode_header_only(
237 self.current_guard.as_ref().unwrap().data(),
238 )?;
239 let mut off2 = hdr2_off;
240 let mut pos2 = 0usize;
241 while pos2 < hdr2.current_size as usize {
242 let ((k2, _), new_off2) = BPlusTreeLeafPageCodec::decode_kv_at_offset(
243 self.current_guard.as_ref().unwrap().data(),
244 self.index.key_schema.clone(),
245 off2,
246 )?;
247 if &k2 >= &restart_key {
248 break;
249 }
250 off2 = new_off2;
251 pos2 += 1;
252 }
253 self.cursor = pos2;
254 self.kv_offset = off2;
256 return self.next();
257 }
258 }
259 }
260
261 Ok(None)
262 }
263
264 fn fill_batch_from(&mut self, mut pid: crate::buffer::PageId) -> QuillSQLResult<()> {
266 while self.batch_buf.len() < self.batch_window && pid != INVALID_PAGE_ID {
267 let guard = self.index.buffer_pool.fetch_page_read(pid)?;
268 let (leaf, _) =
269 BPlusTreeLeafPageCodec::decode(guard.data(), self.index.key_schema.clone())?;
270 let next_pid = leaf.header.next_page_id;
271 let bytes = BytesMut::from(&guard.data()[..]);
272 self.batch_buf.push(bytes);
273 pid = next_pid;
274 }
275 Ok(())
276 }
277}