quill_sql/storage/index/
btree_iterator.rs

1use std::ops::{Bound, RangeBounds};
2use std::sync::Arc;
3
4use crate::buffer::{ReadPageGuard, INVALID_PAGE_ID};
5use crate::config::BTreeConfig;
6use crate::error::QuillSQLResult;
7use crate::storage::codec::BPlusTreeLeafPageCodec;
8use crate::storage::page::RecordId;
9use crate::storage::tuple::Tuple;
10use crate::utils::ring_buffer::RingBuffer;
11use bytes::BytesMut;
12
13use super::btree_index::BPlusTreeIndex;
14
15#[derive(Debug)]
16pub struct TreeIndexIterator {
17    index: Arc<BPlusTreeIndex>,
18    start_bound: Bound<Tuple>,
19    end_bound: Bound<Tuple>,
20    current_guard: Option<ReadPageGuard>,
21    cursor: usize,
22    started: bool,
23    // Byte offset to current cursor's KV within the leaf page
24    kv_offset: usize,
25    // Optional: current leaf bytes when using batch path
26    current_bytes: Option<BytesMut>,
27    // Unified ring buffer for upcoming leaf bytes
28    batch_buf: RingBuffer<BytesMut>,
29    batch_enabled: bool,
30    batch_window: usize,
31}
32
33impl TreeIndexIterator {
34    /// Create a new iterator over a range (reads config from index)
35    pub fn new<R: RangeBounds<Tuple>>(index: Arc<BPlusTreeIndex>, range: R) -> Self {
36        Self::new_with_config(index.clone(), range, index.config)
37    }
38
39    /// Create a new iterator with explicit configuration
40    pub fn new_with_config<R: RangeBounds<Tuple>>(
41        index: Arc<BPlusTreeIndex>,
42        range: R,
43        cfg: BTreeConfig,
44    ) -> Self {
45        let cap = cfg.seq_window.max(1);
46        Self {
47            index,
48            start_bound: range.start_bound().cloned(),
49            end_bound: range.end_bound().cloned(),
50            current_guard: None,
51            cursor: 0,
52            started: false,
53            kv_offset: 0,
54            current_bytes: None,
55            batch_buf: RingBuffer::with_capacity(cap),
56            batch_enabled: cfg.seq_batch_enable,
57            batch_window: cfg.seq_window,
58        }
59    }
60
61    /// Iterate to the next RID in order
62    pub fn next(&mut self) -> QuillSQLResult<Option<RecordId>> {
63        if !self.started {
64            let root_page_id = self.index.get_root_page_id()?;
65            if root_page_id == INVALID_PAGE_ID {
66                return Ok(None);
67            }
68
69            match &self.start_bound {
70                Bound::Included(k) | Bound::Excluded(k) => {
71                    if self.current_guard.is_none() {
72                        let guard = self.index.find_leaf_page_for_iterator(k, root_page_id)?;
73                        let (header, hdr_off) =
74                            BPlusTreeLeafPageCodec::decode_header_only(guard.data())?;
75                        // find initial cursor position
76                        let (full_leaf, _) = BPlusTreeLeafPageCodec::decode(
77                            guard.data(),
78                            self.index.key_schema.clone(),
79                        )?;
80                        self.cursor = full_leaf
81                            .next_closest(k, matches!(self.start_bound, Bound::Included(_)))
82                            .unwrap_or(header.current_size as usize);
83                        // compute initial kv_offset by advancing from header once
84                        let mut off = hdr_off;
85                        for _ in 0..self.cursor {
86                            let (_, new_off) = BPlusTreeLeafPageCodec::decode_kv_at_offset(
87                                guard.data(),
88                                self.index.key_schema.clone(),
89                                off,
90                            )?;
91                            off = new_off;
92                        }
93                        self.kv_offset = off;
94                        self.current_guard = Some(guard);
95                    }
96                }
97                Bound::Unbounded => {
98                    let guard = self.index.find_first_leaf_page()?;
99                    self.current_guard = Some(guard);
100                    self.cursor = 0;
101                    let (_, hdr_off) = BPlusTreeLeafPageCodec::decode_header_only(
102                        self.current_guard.as_ref().unwrap().data(),
103                    )?;
104                    self.kv_offset = hdr_off;
105                }
106            };
107            self.started = true;
108        }
109
110        // Two modes: guard-mode (buffer pool) vs bytes-mode (batch path)
111        if let Some(bytes) = self.current_bytes.as_ref() {
112            // Batch bytes-mode
113            let (h1, _) = BPlusTreeLeafPageCodec::decode_header_only(bytes)?;
114            if self.cursor >= h1.current_size as usize {
115                // advance to next leaf: use batch buffer
116                if let Some(next_bytes) = self.batch_buf.pop() {
117                    // decode header to compute kv_offset start
118                    let (_nh, nh_off) = BPlusTreeLeafPageCodec::decode_header_only(&next_bytes)?;
119                    self.current_bytes = Some(next_bytes);
120                    self.kv_offset = nh_off;
121                    self.cursor = 0;
122                } else if h1.next_page_id != INVALID_PAGE_ID {
123                    // refill batch from next pid synchronously
124                    self.fill_batch_from(h1.next_page_id)?;
125                    if let Some(next_bytes) = self.batch_buf.pop() {
126                        let (_nh, nh_off) =
127                            BPlusTreeLeafPageCodec::decode_header_only(&next_bytes)?;
128                        self.current_bytes = Some(next_bytes);
129                        self.kv_offset = nh_off;
130                        self.cursor = 0;
131                    } else {
132                        self.current_bytes = None;
133                        return Ok(None);
134                    }
135                } else {
136                    self.current_bytes = None;
137                    return Ok(None);
138                }
139                return self.next();
140            }
141            // decode KV at current cursor using cached offset
142            let ((key, rid), new_off) = BPlusTreeLeafPageCodec::decode_kv_at_offset(
143                bytes,
144                self.index.key_schema.clone(),
145                self.kv_offset,
146            )?;
147            let in_range = match &self.end_bound {
148                Bound::Included(end_key) => &key <= end_key,
149                Bound::Excluded(end_key) => &key < end_key,
150                Bound::Unbounded => true,
151            };
152            if in_range {
153                self.cursor += 1;
154                self.kv_offset = new_off;
155                return Ok(Some(rid));
156            }
157        } else if let Some(guard) = self.current_guard.as_ref() {
158            // lightweight OLC on iterator read
159            // Fast path: header-only double-read for OLC
160            let (h1, _) = BPlusTreeLeafPageCodec::decode_header_only(guard.data())?;
161            let v1 = h1.version;
162            if self.cursor >= h1.current_size as usize {
163                let next_page_id = h1.next_page_id;
164                if next_page_id == INVALID_PAGE_ID {
165                    self.current_guard = None;
166                    return Ok(None);
167                }
168                // Switch to batch mode if enabled
169                if self.batch_enabled {
170                    self.fill_batch_from(next_page_id)?;
171                    if let Some(next_bytes) = self.batch_buf.pop() {
172                        let (_nh, nh_off) =
173                            BPlusTreeLeafPageCodec::decode_header_only(&next_bytes)?;
174                        self.current_bytes = Some(next_bytes);
175                        self.current_guard = None; // leave guard-mode
176                        self.kv_offset = nh_off;
177                        self.cursor = 0;
178                        return self.next();
179                    }
180                }
181                // prefetch next-next leaf to warm cache (best-effort)
182                if let Ok((next_g, next_leaf)) = self
183                    .index
184                    .buffer_pool
185                    .fetch_tree_leaf_page(next_page_id, self.index.key_schema.clone())
186                {
187                    if next_leaf.header.next_page_id != INVALID_PAGE_ID {
188                        let _ = self
189                            .index
190                            .buffer_pool
191                            .prefetch_page(next_leaf.header.next_page_id);
192                    }
193                    self.current_guard = Some(next_g);
194                } else {
195                    self.current_guard =
196                        Some(self.index.buffer_pool.fetch_page_read(next_page_id)?);
197                }
198                self.cursor = 0;
199                // reset kv_offset to the start of next leaf
200                let (_nh, nh_off) = BPlusTreeLeafPageCodec::decode_header_only(
201                    self.current_guard.as_ref().unwrap().data(),
202                )?;
203                self.kv_offset = nh_off;
204                return self.next();
205            }
206
207            // decode key/rid at current cursor using cached offset
208            let ((key, rid), new_off) = BPlusTreeLeafPageCodec::decode_kv_at_offset(
209                guard.data(),
210                self.index.key_schema.clone(),
211                self.kv_offset,
212            )?;
213
214            let in_range = match &self.end_bound {
215                Bound::Included(end_key) => &key <= end_key,
216                Bound::Excluded(end_key) => &key < end_key,
217                Bound::Unbounded => true,
218            };
219
220            if in_range {
221                self.cursor += 1;
222                // advance cached offset to next KV
223                self.kv_offset = new_off;
224                // verify version unchanged; otherwise restart iterator lazily
225                let (h2, _) = BPlusTreeLeafPageCodec::decode_header_only(guard.data())?;
226                let v2 = h2.version;
227                if v1 == v2 {
228                    return Ok(Some(rid));
229                } else {
230                    // restart from current key to avoid duplicates
231                    let restart_key = key.clone();
232                    let root = self.index.get_root_page_id()?;
233                    let guard = self.index.find_leaf_page_for_iterator(&restart_key, root)?;
234                    self.current_guard = Some(guard);
235                    // recompute cursor cheaply using header + linear seek
236                    let (hdr2, hdr2_off) = BPlusTreeLeafPageCodec::decode_header_only(
237                        self.current_guard.as_ref().unwrap().data(),
238                    )?;
239                    let mut off2 = hdr2_off;
240                    let mut pos2 = 0usize;
241                    while pos2 < hdr2.current_size as usize {
242                        let ((k2, _), new_off2) = BPlusTreeLeafPageCodec::decode_kv_at_offset(
243                            self.current_guard.as_ref().unwrap().data(),
244                            self.index.key_schema.clone(),
245                            off2,
246                        )?;
247                        if &k2 >= &restart_key {
248                            break;
249                        }
250                        off2 = new_off2;
251                        pos2 += 1;
252                    }
253                    self.cursor = pos2;
254                    // set cached offset for the new cursor
255                    self.kv_offset = off2;
256                    return self.next();
257                }
258            }
259        }
260
261        Ok(None)
262    }
263
264    /// Synchronously fill batch buffer by following the next_page_id chain.
265    fn fill_batch_from(&mut self, mut pid: crate::buffer::PageId) -> QuillSQLResult<()> {
266        while self.batch_buf.len() < self.batch_window && pid != INVALID_PAGE_ID {
267            let guard = self.index.buffer_pool.fetch_page_read(pid)?;
268            let (leaf, _) =
269                BPlusTreeLeafPageCodec::decode(guard.data(), self.index.key_schema.clone())?;
270            let next_pid = leaf.header.next_page_id;
271            let bytes = BytesMut::from(&guard.data()[..]);
272            self.batch_buf.push(bytes);
273            pid = next_pid;
274        }
275        Ok(())
276    }
277}