quill_sql/execution/physical_plan/
index_scan.rs

1use std::collections::VecDeque;
2use std::ops::{Bound, RangeBounds};
3use std::sync::Arc;
4
5use parking_lot::Mutex;
6
7use crate::catalog::SchemaRef;
8use crate::error::QuillSQLError;
9use crate::execution::{ExecutionContext, VolcanoExecutor};
10use crate::storage::index::btree_index::TreeIndexIterator;
11use crate::storage::page::{RecordId, TupleMeta};
12use crate::storage::table_heap::TableHeap;
13use crate::transaction::{IsolationLevel, LockMode};
14use crate::utils::table_ref::TableReference;
15use crate::{error::QuillSQLResult, storage::tuple::Tuple};
16
/// Upper limit on the number of index entries resolved against the table heap
/// in a single buffer refill.
const INDEX_PREFETCH_BATCH: usize = 64;

/// Number of deleted entries the scan tolerates before it reports potential
/// garbage to the index and restarts its count.
const INVISIBLE_THRESHOLD: usize = 2048;
19
20#[derive(Debug)]
21pub struct PhysicalIndexScan {
22    table_ref: TableReference,
23    index_name: String,
24    table_schema: SchemaRef,
25    start_bound: Bound<Tuple>,
26    end_bound: Bound<Tuple>,
27    iterator: Mutex<Option<TreeIndexIterator>>,
28    table_heap: Mutex<Option<Arc<TableHeap>>>,
29    buffer: Mutex<VecDeque<(RecordId, TupleMeta, Tuple)>>,
30    invisible_hits: Mutex<usize>,
31}
32
33impl PhysicalIndexScan {
34    pub fn new<R: RangeBounds<Tuple>>(
35        table_ref: TableReference,
36        index_name: String,
37        table_schema: SchemaRef,
38        range: R,
39    ) -> Self {
40        Self {
41            table_ref,
42            index_name,
43            table_schema,
44            start_bound: range.start_bound().cloned(),
45            end_bound: range.end_bound().cloned(),
46            iterator: Mutex::new(None),
47            table_heap: Mutex::new(None),
48            buffer: Mutex::new(VecDeque::new()),
49            invisible_hits: Mutex::new(0),
50        }
51    }
52
53    fn refill_buffer(&self) -> QuillSQLResult<bool> {
54        let table_heap = {
55            let guard = self.table_heap.lock();
56            guard
57                .clone()
58                .ok_or_else(|| QuillSQLError::Execution("table heap not initialized".to_string()))?
59        };
60
61        let mut fetched = VecDeque::with_capacity(INDEX_PREFETCH_BATCH);
62        {
63            let mut iter_guard = self.iterator.lock();
64            let iterator = iter_guard.as_mut().ok_or_else(|| {
65                QuillSQLError::Execution("index iterator not created".to_string())
66            })?;
67            for _ in 0..INDEX_PREFETCH_BATCH {
68                match iterator.next()? {
69                    Some(rid) => {
70                        let (meta, tuple) = table_heap.full_tuple(rid)?;
71                        fetched.push_back((rid, meta, tuple));
72                    }
73                    None => break,
74                }
75            }
76        }
77
78        if fetched.is_empty() {
79            return Ok(false);
80        }
81
82        let mut buffer = self.buffer.lock();
83        buffer.extend(fetched);
84        Ok(true)
85    }
86
87    fn handle_invisible(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
88        let mut cnt = self.invisible_hits.lock();
89        *cnt += 1;
90        if *cnt >= INVISIBLE_THRESHOLD {
91            *cnt = 0;
92            if let Some(index_arc) = context.catalog.index(&self.table_ref, &self.index_name)? {
93                index_arc.note_potential_garbage(INVISIBLE_THRESHOLD);
94            }
95        }
96        Ok(())
97    }
98
99    fn consume_row(
100        &self,
101        context: &mut ExecutionContext,
102        rid: RecordId,
103        meta: TupleMeta,
104        tuple: Tuple,
105    ) -> QuillSQLResult<Option<Tuple>> {
106        context.read_visible_tuple(&self.table_ref, rid, &meta, tuple)
107    }
108}
109
110impl VolcanoExecutor for PhysicalIndexScan {
111    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
112        if matches!(
113            context.txn().isolation_level(),
114            IsolationLevel::ReadCommitted
115                | IsolationLevel::RepeatableRead
116                | IsolationLevel::Serializable
117        ) {
118            context.lock_table(self.table_ref.clone(), LockMode::IntentionShared)?;
119        }
120
121        let table_heap = context.table_heap(&self.table_ref)?;
122        let index = context
123            .catalog
124            .index(&self.table_ref, &self.index_name)?
125            .unwrap();
126
127        {
128            let mut iter_guard = self.iterator.lock();
129            *iter_guard = Some(TreeIndexIterator::new(
130                index,
131                (self.start_bound.clone(), self.end_bound.clone()),
132            ));
133        }
134
135        {
136            let mut heap_guard = self.table_heap.lock();
137            *heap_guard = Some(table_heap);
138        }
139
140        self.buffer.lock().clear();
141        *self.invisible_hits.lock() = 0;
142
143        Ok(())
144    }
145
146    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
147        loop {
148            if let Some((rid, meta, tuple)) = {
149                let mut buffer = self.buffer.lock();
150                buffer.pop_front()
151            } {
152                if meta.is_deleted {
153                    self.handle_invisible(context)?;
154                    continue;
155                }
156                if let Some(result) = self.consume_row(context, rid, meta, tuple)? {
157                    return Ok(Some(result));
158                }
159                continue;
160            }
161
162            if !self.refill_buffer()? {
163                return Ok(None);
164            }
165        }
166    }
167
168    fn output_schema(&self) -> SchemaRef {
169        self.table_schema.clone()
170    }
171}
172
173impl std::fmt::Display for PhysicalIndexScan {
174    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        write!(f, "IndexScan: {}", self.index_name)
176    }
177}