// quill_sql/execution/physical_plan/index_scan.rs
use std::collections::VecDeque;
2use std::ops::{Bound, RangeBounds};
3use std::sync::Arc;
4
5use parking_lot::Mutex;
6
7use crate::catalog::SchemaRef;
8use crate::error::QuillSQLError;
9use crate::execution::{ExecutionContext, VolcanoExecutor};
10use crate::storage::index::btree_index::TreeIndexIterator;
11use crate::storage::page::{RecordId, TupleMeta};
12use crate::storage::table_heap::TableHeap;
13use crate::transaction::{IsolationLevel, LockMode};
14use crate::utils::table_ref::TableReference;
15use crate::{error::QuillSQLResult, storage::tuple::Tuple};
16
/// Upper bound on the number of index entries that `refill_buffer` resolves
/// against the table heap in a single call.
const INDEX_PREFETCH_BATCH: usize = 64;

/// Number of deleted-tuple hits `handle_invisible` accumulates before it
/// reports them to the index through `note_potential_garbage` and resets its
/// counter.
const INVISIBLE_THRESHOLD: usize = 2_048;
19
20#[derive(Debug)]
21pub struct PhysicalIndexScan {
22 table_ref: TableReference,
23 index_name: String,
24 table_schema: SchemaRef,
25 start_bound: Bound<Tuple>,
26 end_bound: Bound<Tuple>,
27 iterator: Mutex<Option<TreeIndexIterator>>,
28 table_heap: Mutex<Option<Arc<TableHeap>>>,
29 buffer: Mutex<VecDeque<(RecordId, TupleMeta, Tuple)>>,
30 invisible_hits: Mutex<usize>,
31}
32
33impl PhysicalIndexScan {
34 pub fn new<R: RangeBounds<Tuple>>(
35 table_ref: TableReference,
36 index_name: String,
37 table_schema: SchemaRef,
38 range: R,
39 ) -> Self {
40 Self {
41 table_ref,
42 index_name,
43 table_schema,
44 start_bound: range.start_bound().cloned(),
45 end_bound: range.end_bound().cloned(),
46 iterator: Mutex::new(None),
47 table_heap: Mutex::new(None),
48 buffer: Mutex::new(VecDeque::new()),
49 invisible_hits: Mutex::new(0),
50 }
51 }
52
53 fn refill_buffer(&self) -> QuillSQLResult<bool> {
54 let table_heap = {
55 let guard = self.table_heap.lock();
56 guard
57 .clone()
58 .ok_or_else(|| QuillSQLError::Execution("table heap not initialized".to_string()))?
59 };
60
61 let mut fetched = VecDeque::with_capacity(INDEX_PREFETCH_BATCH);
62 {
63 let mut iter_guard = self.iterator.lock();
64 let iterator = iter_guard.as_mut().ok_or_else(|| {
65 QuillSQLError::Execution("index iterator not created".to_string())
66 })?;
67 for _ in 0..INDEX_PREFETCH_BATCH {
68 match iterator.next()? {
69 Some(rid) => {
70 let (meta, tuple) = table_heap.full_tuple(rid)?;
71 fetched.push_back((rid, meta, tuple));
72 }
73 None => break,
74 }
75 }
76 }
77
78 if fetched.is_empty() {
79 return Ok(false);
80 }
81
82 let mut buffer = self.buffer.lock();
83 buffer.extend(fetched);
84 Ok(true)
85 }
86
87 fn handle_invisible(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
88 let mut cnt = self.invisible_hits.lock();
89 *cnt += 1;
90 if *cnt >= INVISIBLE_THRESHOLD {
91 *cnt = 0;
92 if let Some(index_arc) = context.catalog.index(&self.table_ref, &self.index_name)? {
93 index_arc.note_potential_garbage(INVISIBLE_THRESHOLD);
94 }
95 }
96 Ok(())
97 }
98
99 fn consume_row(
100 &self,
101 context: &mut ExecutionContext,
102 rid: RecordId,
103 meta: TupleMeta,
104 tuple: Tuple,
105 ) -> QuillSQLResult<Option<Tuple>> {
106 context.read_visible_tuple(&self.table_ref, rid, &meta, tuple)
107 }
108}
109
110impl VolcanoExecutor for PhysicalIndexScan {
111 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
112 if matches!(
113 context.txn().isolation_level(),
114 IsolationLevel::ReadCommitted
115 | IsolationLevel::RepeatableRead
116 | IsolationLevel::Serializable
117 ) {
118 context.lock_table(self.table_ref.clone(), LockMode::IntentionShared)?;
119 }
120
121 let table_heap = context.table_heap(&self.table_ref)?;
122 let index = context
123 .catalog
124 .index(&self.table_ref, &self.index_name)?
125 .unwrap();
126
127 {
128 let mut iter_guard = self.iterator.lock();
129 *iter_guard = Some(TreeIndexIterator::new(
130 index,
131 (self.start_bound.clone(), self.end_bound.clone()),
132 ));
133 }
134
135 {
136 let mut heap_guard = self.table_heap.lock();
137 *heap_guard = Some(table_heap);
138 }
139
140 self.buffer.lock().clear();
141 *self.invisible_hits.lock() = 0;
142
143 Ok(())
144 }
145
146 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
147 loop {
148 if let Some((rid, meta, tuple)) = {
149 let mut buffer = self.buffer.lock();
150 buffer.pop_front()
151 } {
152 if meta.is_deleted {
153 self.handle_invisible(context)?;
154 continue;
155 }
156 if let Some(result) = self.consume_row(context, rid, meta, tuple)? {
157 return Ok(Some(result));
158 }
159 continue;
160 }
161
162 if !self.refill_buffer()? {
163 return Ok(None);
164 }
165 }
166 }
167
168 fn output_schema(&self) -> SchemaRef {
169 self.table_schema.clone()
170 }
171}
172
173impl std::fmt::Display for PhysicalIndexScan {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 write!(f, "IndexScan: {}", self.index_name)
176 }
177}