quill_sql/execution/physical_plan/
index_scan.rs

1//! Index-backed range scan operator with MVCC filtering.
2
3use std::cell::RefCell;
4use std::ops::{Bound, RangeBounds};
5use std::sync::OnceLock;
6
7use super::scan::ScanPrefetch;
8use crate::catalog::SchemaRef;
9use crate::execution::physical_plan::{resolve_table_binding, stream_not_ready};
10use crate::execution::{ExecutionContext, VolcanoExecutor};
11use crate::storage::{
12    engine::{IndexScanRequest, TableBinding, TupleStream},
13    page::{RecordId, TupleMeta},
14};
15use crate::transaction::{IsolationLevel, LockMode};
16use crate::utils::table_ref::TableReference;
17use crate::{error::QuillSQLResult, storage::tuple::Tuple};
18
/// Batch size handed to `ScanPrefetch::new` when a `PhysicalIndexScan` is constructed.
const INDEX_PREFETCH_BATCH: usize = 64;
/// Physical operator that scans a table through one of its indexes over a bounded range.
///
/// The operator is driven through its `VolcanoExecutor` implementation: `init` opens the
/// index stream and `next` yields the rows that pass the transaction's visibility check.
pub struct PhysicalIndexScan {
    /// Table being scanned; used for locking, binding resolution and visibility checks.
    table_ref: TableReference,
    /// Name of the index to scan.
    index_name: String,
    /// Schema reported by `output_schema`.
    table_schema: SchemaRef,
    /// Start bound of the scanned range, captured from the constructor's `range` argument.
    start_bound: Bound<Tuple>,
    /// End bound of the scanned range, captured from the constructor's `range` argument.
    end_bound: Bound<Tuple>,
    /// Open index stream; `None` until `init` installs one. Kept in a `RefCell` because
    /// the executor methods only receive `&self`.
    stream: RefCell<Option<Box<dyn TupleStream>>>,
    /// Buffer of entries pulled from `stream` ahead of consumption by `next`.
    prefetch: ScanPrefetch,
    /// Slot passed to `resolve_table_binding`; presumably caches the resolved binding
    /// after the first `init` (confirm against that helper).
    table_binding: OnceLock<TableBinding>,
}
30
31impl PhysicalIndexScan {
32    pub fn new<R: RangeBounds<Tuple>>(
33        table_ref: TableReference,
34        index_name: String,
35        table_schema: SchemaRef,
36        range: R,
37    ) -> Self {
38        Self {
39            table_ref,
40            index_name,
41            table_schema,
42            start_bound: range.start_bound().cloned(),
43            end_bound: range.end_bound().cloned(),
44            stream: RefCell::new(None),
45            prefetch: ScanPrefetch::new(INDEX_PREFETCH_BATCH),
46            table_binding: OnceLock::new(),
47        }
48    }
49
50    fn refill_buffer(&self) -> QuillSQLResult<bool> {
51        self.prefetch.refill(|limit, out| {
52            let mut stream_guard = self.stream.borrow_mut();
53            let stream = stream_guard
54                .as_mut()
55                .ok_or_else(|| stream_not_ready("IndexScan"))?;
56            for _ in 0..limit {
57                match stream.next()? {
58                    Some(entry) => out.push_back(entry),
59                    None => break,
60                }
61            }
62            Ok(())
63        })
64    }
65
66    fn consume_row(
67        &self,
68        context: &mut ExecutionContext,
69        rid: RecordId,
70        meta: TupleMeta,
71        tuple: Tuple,
72    ) -> QuillSQLResult<Option<Tuple>> {
73        context
74            .txn_ctx_mut()
75            .read_visible_tuple(&self.table_ref, rid, &meta, tuple)
76    }
77}
78
79impl VolcanoExecutor for PhysicalIndexScan {
80    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
81        if matches!(
82            context.txn_ctx().isolation_level(),
83            IsolationLevel::ReadCommitted
84                | IsolationLevel::RepeatableRead
85                | IsolationLevel::Serializable
86        ) {
87            context
88                .txn_ctx_mut()
89                .lock_table(self.table_ref.clone(), LockMode::IntentionShared)?;
90        }
91
92        let binding = resolve_table_binding(&self.table_binding, context, &self.table_ref)?;
93        let request = IndexScanRequest::new(self.start_bound.clone(), self.end_bound.clone());
94        self.stream
95            .replace(Some(binding.index_scan(&self.index_name, request)?));
96
97        self.prefetch.clear();
98
99        Ok(())
100    }
101
102    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
103        loop {
104            if let Some((rid, meta, tuple)) = self.prefetch.pop_front() {
105                if meta.is_deleted {
106                    continue;
107                }
108                if let Some(result) = self.consume_row(context, rid, meta, tuple)? {
109                    return Ok(Some(result));
110                }
111                continue;
112            }
113
114            if !self.refill_buffer()? {
115                return Ok(None);
116            }
117        }
118    }
119
120    fn output_schema(&self) -> SchemaRef {
121        self.table_schema.clone()
122    }
123}
124
125impl std::fmt::Display for PhysicalIndexScan {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        write!(f, "IndexScan: {}", self.index_name)
128    }
129}
130
131impl std::fmt::Debug for PhysicalIndexScan {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        f.debug_struct("PhysicalIndexScan")
134            .field("table_ref", &self.table_ref)
135            .field("index_name", &self.index_name)
136            .field("table_schema", &self.table_schema)
137            .field("start_bound", &self.start_bound)
138            .field("end_bound", &self.end_bound)
139            .field("prefetch", &self.prefetch)
140            .finish()
141    }
142}