quill_sql/execution/physical_plan/
index_scan.rs1use std::cell::RefCell;
4use std::ops::{Bound, RangeBounds};
5use std::sync::OnceLock;
6
7use super::scan::ScanPrefetch;
8use crate::catalog::SchemaRef;
9use crate::execution::physical_plan::{resolve_table_binding, stream_not_ready};
10use crate::execution::{ExecutionContext, VolcanoExecutor};
11use crate::storage::{
12 engine::{IndexScanRequest, TableBinding, TupleStream},
13 page::{RecordId, TupleMeta},
14};
15use crate::transaction::{IsolationLevel, LockMode};
16use crate::utils::table_ref::TableReference;
17use crate::{error::QuillSQLResult, storage::tuple::Tuple};
18
19const INDEX_PREFETCH_BATCH: usize = 64;
20pub struct PhysicalIndexScan {
21 table_ref: TableReference,
22 index_name: String,
23 table_schema: SchemaRef,
24 start_bound: Bound<Tuple>,
25 end_bound: Bound<Tuple>,
26 stream: RefCell<Option<Box<dyn TupleStream>>>,
27 prefetch: ScanPrefetch,
28 table_binding: OnceLock<TableBinding>,
29}
30
31impl PhysicalIndexScan {
32 pub fn new<R: RangeBounds<Tuple>>(
33 table_ref: TableReference,
34 index_name: String,
35 table_schema: SchemaRef,
36 range: R,
37 ) -> Self {
38 Self {
39 table_ref,
40 index_name,
41 table_schema,
42 start_bound: range.start_bound().cloned(),
43 end_bound: range.end_bound().cloned(),
44 stream: RefCell::new(None),
45 prefetch: ScanPrefetch::new(INDEX_PREFETCH_BATCH),
46 table_binding: OnceLock::new(),
47 }
48 }
49
50 fn refill_buffer(&self) -> QuillSQLResult<bool> {
51 self.prefetch.refill(|limit, out| {
52 let mut stream_guard = self.stream.borrow_mut();
53 let stream = stream_guard
54 .as_mut()
55 .ok_or_else(|| stream_not_ready("IndexScan"))?;
56 for _ in 0..limit {
57 match stream.next()? {
58 Some(entry) => out.push_back(entry),
59 None => break,
60 }
61 }
62 Ok(())
63 })
64 }
65
66 fn consume_row(
67 &self,
68 context: &mut ExecutionContext,
69 rid: RecordId,
70 meta: TupleMeta,
71 tuple: Tuple,
72 ) -> QuillSQLResult<Option<Tuple>> {
73 context
74 .txn_ctx_mut()
75 .read_visible_tuple(&self.table_ref, rid, &meta, tuple)
76 }
77}
78
79impl VolcanoExecutor for PhysicalIndexScan {
80 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
81 if matches!(
82 context.txn_ctx().isolation_level(),
83 IsolationLevel::ReadCommitted
84 | IsolationLevel::RepeatableRead
85 | IsolationLevel::Serializable
86 ) {
87 context
88 .txn_ctx_mut()
89 .lock_table(self.table_ref.clone(), LockMode::IntentionShared)?;
90 }
91
92 let binding = resolve_table_binding(&self.table_binding, context, &self.table_ref)?;
93 let request = IndexScanRequest::new(self.start_bound.clone(), self.end_bound.clone());
94 self.stream
95 .replace(Some(binding.index_scan(&self.index_name, request)?));
96
97 self.prefetch.clear();
98
99 Ok(())
100 }
101
102 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
103 loop {
104 if let Some((rid, meta, tuple)) = self.prefetch.pop_front() {
105 if meta.is_deleted {
106 continue;
107 }
108 if let Some(result) = self.consume_row(context, rid, meta, tuple)? {
109 return Ok(Some(result));
110 }
111 continue;
112 }
113
114 if !self.refill_buffer()? {
115 return Ok(None);
116 }
117 }
118 }
119
120 fn output_schema(&self) -> SchemaRef {
121 self.table_schema.clone()
122 }
123}
124
125impl std::fmt::Display for PhysicalIndexScan {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 write!(f, "IndexScan: {}", self.index_name)
128 }
129}
130
131impl std::fmt::Debug for PhysicalIndexScan {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 f.debug_struct("PhysicalIndexScan")
134 .field("table_ref", &self.table_ref)
135 .field("index_name", &self.index_name)
136 .field("table_schema", &self.table_schema)
137 .field("start_bound", &self.start_bound)
138 .field("end_bound", &self.end_bound)
139 .field("prefetch", &self.prefetch)
140 .finish()
141 }
142}