quill_sql/execution/
mod.rs

1pub mod physical_plan;
2
3use std::sync::Arc;
4
5use crate::catalog::{Schema, SchemaRef};
6use crate::error::{QuillSQLError, QuillSQLResult};
7use crate::execution::physical_plan::PhysicalPlan;
8use crate::expression::{Expr, ExprTrait};
9use crate::storage::{
10    engine::StorageEngine,
11    index::btree_index::BPlusTreeIndex,
12    page::{RecordId, TupleMeta},
13    table_heap::TableHeap,
14    tuple::Tuple,
15};
16use crate::transaction::{
17    CommandId, IsolationLevel, Transaction, TransactionManager, TransactionSnapshot, TxnRuntime,
18};
19use crate::utils::scalar::ScalarValue;
20use crate::{catalog::Catalog, transaction::LockMode, utils::table_ref::TableReference};
21use log::warn;
22use sqlparser::ast::TransactionAccessMode;
23pub trait VolcanoExecutor {
24    fn init(&self, _context: &mut ExecutionContext) -> QuillSQLResult<()> {
25        Ok(())
26    }
27
28    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>>;
29
30    fn output_schema(&self) -> SchemaRef;
31}
32
33/// Shared state threaded through every physical operator during execution.
34/// Exposes MVCC helpers, storage access, expression evaluation and DDL utilities.
35pub struct ExecutionContext<'a> {
36    /// Mutable reference to the global catalog (schema + metadata).
37    pub catalog: &'a mut Catalog,
38    /// Pluggable storage engine used for heap/index access.
39    storage: Arc<dyn StorageEngine>,
40    /// Transaction runtime wrapper (snapshot, locks, undo tracking).
41    txn: TxnRuntime<'a>,
42}
43
44impl<'a> ExecutionContext<'a> {
45    pub fn new(
46        catalog: &'a mut Catalog,
47        txn: &'a mut Transaction,
48        txn_mgr: &'a TransactionManager,
49        storage: Arc<dyn StorageEngine>,
50    ) -> Self {
51        let runtime = TxnRuntime::new(txn_mgr, txn);
52        Self {
53            catalog,
54            storage,
55            txn: runtime,
56        }
57    }
58
59    pub fn command_id(&self) -> CommandId {
60        self.txn.command_id()
61    }
62
63    pub fn snapshot(&self) -> &TransactionSnapshot {
64        self.txn.snapshot()
65    }
66
67    pub fn is_visible(&self, meta: &crate::storage::page::TupleMeta) -> bool {
68        self.txn.is_visible(meta)
69    }
70
71    /// Perform MVCC visibility checks (and shared locks) based on the current
72    /// isolation level, returning the tuple only if it is visible.
73    pub fn read_visible_tuple(
74        &mut self,
75        table: &TableReference,
76        rid: crate::storage::page::RecordId,
77        meta: &crate::storage::page::TupleMeta,
78        tuple: Tuple,
79    ) -> QuillSQLResult<Option<Tuple>> {
80        match self.txn().isolation_level() {
81            IsolationLevel::ReadUncommitted => {
82                if self.is_visible(meta) {
83                    Ok(Some(tuple))
84                } else {
85                    Ok(None)
86                }
87            }
88            IsolationLevel::ReadCommitted => {
89                self.lock_row_shared(table, rid, false)?;
90                let visible = self.is_visible(meta);
91                self.unlock_row_shared(table, rid)?;
92                if visible {
93                    Ok(Some(tuple))
94                } else {
95                    Ok(None)
96                }
97            }
98            IsolationLevel::RepeatableRead | IsolationLevel::Serializable => {
99                self.lock_row_shared(table, rid, true)?;
100                let visible = self.is_visible(meta);
101                self.unlock_row_shared(table, rid)?;
102                if visible {
103                    Ok(Some(tuple))
104                } else {
105                    Ok(None)
106                }
107            }
108        }
109    }
110
111    /// Evaluate an expression expected to produce a boolean result.
112    pub fn eval_predicate(&self, expr: &Expr, tuple: &Tuple) -> QuillSQLResult<bool> {
113        match expr.evaluate(tuple)? {
114            ScalarValue::Boolean(Some(v)) => Ok(v),
115            ScalarValue::Boolean(None) => Ok(false),
116            other => Err(QuillSQLError::Execution(format!(
117                "predicate value must be boolean, got {}",
118                other
119            ))),
120        }
121    }
122
123    /// Evaluate an arbitrary scalar expression.
124    pub fn eval_expr(&self, expr: &Expr, tuple: &Tuple) -> QuillSQLResult<ScalarValue> {
125        expr.evaluate(tuple)
126    }
127
128    /// Insert a tuple, update all indexes, and register undo state.
129    pub fn insert_tuple_with_indexes(
130        &mut self,
131        table: &TableReference,
132        tuple: &Tuple,
133    ) -> QuillSQLResult<()> {
134        let (table_heap, rid) = self.storage.mvcc_insert(
135            self.catalog,
136            table,
137            tuple,
138            self.txn_id(),
139            self.command_id(),
140        )?;
141
142        let mut index_links = Vec::new();
143        for index in self.table_indexes(table)? {
144            if let Ok(key_tuple) = tuple.project_with_schema(index.key_schema.clone()) {
145                index.insert(&key_tuple, rid)?;
146                index_links.push((index.clone(), key_tuple));
147            }
148        }
149
150        self.txn_mut()
151            .push_insert_undo(table_heap, rid, index_links);
152        Ok(())
153    }
154
155    /// Mark a row deleted via MVCC and enqueue undo data.
156    pub fn apply_delete(
157        &mut self,
158        table_heap: Arc<TableHeap>,
159        rid: RecordId,
160        prev_meta: TupleMeta,
161        prev_tuple: Tuple,
162    ) -> QuillSQLResult<()> {
163        self.storage
164            .mvcc_delete(&table_heap, rid, self.txn_id(), self.command_id())?;
165        self.txn_mut()
166            .push_delete_undo(table_heap, rid, prev_meta, prev_tuple);
167        Ok(())
168    }
169
170    /// Create a new MVCC version, update indexes, and log undo.
171    pub fn apply_update(
172        &mut self,
173        table: &TableReference,
174        table_heap: Arc<TableHeap>,
175        rid: RecordId,
176        new_tuple: Tuple,
177        prev_meta: TupleMeta,
178        prev_tuple: Tuple,
179    ) -> QuillSQLResult<()> {
180        let (new_rid, _) = self.storage.mvcc_update(
181            &table_heap,
182            rid,
183            new_tuple.clone(),
184            self.txn_id(),
185            self.command_id(),
186        )?;
187
188        let mut new_keys = Vec::new();
189        for index in self.table_indexes(table)? {
190            if let Ok(new_key_tuple) = new_tuple.project_with_schema(index.key_schema.clone()) {
191                index.insert(&new_key_tuple, new_rid)?;
192                new_keys.push((index.clone(), new_key_tuple));
193            }
194        }
195
196        self.txn_mut()
197            .push_update_undo(table_heap, rid, new_rid, prev_meta, prev_tuple, new_keys);
198        Ok(())
199    }
200
201    /// Acquire an exclusive lock and re-check visibility before mutating a row.
202    /// Returns the current tuple version if it is still visible.
203    pub fn prepare_row_for_write(
204        &mut self,
205        table: &TableReference,
206        rid: crate::storage::page::RecordId,
207        table_heap: &Arc<TableHeap>,
208        observed_meta: &crate::storage::page::TupleMeta,
209    ) -> QuillSQLResult<Option<(crate::storage::page::TupleMeta, Tuple)>> {
210        if !self.is_visible(observed_meta) {
211            return Ok(None);
212        }
213        self.lock_row_exclusive(table, rid)?;
214        let (current_meta, current_tuple) = table_heap.full_tuple(rid)?;
215        if !self.is_visible(&current_meta) {
216            self.unlock_row(table, rid);
217            return Ok(None);
218        }
219        Ok(Some((current_meta, current_tuple)))
220    }
221
222    pub fn lock_table(&mut self, table: TableReference, mode: LockMode) -> QuillSQLResult<()> {
223        self.txn.lock_table(table, mode)
224    }
225
226    pub fn lock_row_shared(
227        &mut self,
228        table: &TableReference,
229        rid: crate::storage::page::RecordId,
230        retain: bool,
231    ) -> QuillSQLResult<()> {
232        let acquired = self
233            .txn
234            .try_lock_row(table.clone(), rid, LockMode::Shared)?;
235        if !acquired {
236            return Err(QuillSQLError::Execution(
237                "failed to acquire shared row lock".to_string(),
238            ));
239        }
240        if retain {
241            self.txn.record_shared_row_lock(table.clone(), rid);
242        } else {
243            // Track transient shared locks so subsequent attempts still go through the lock manager.
244            self.txn.remove_row_key_marker(table, rid);
245        }
246        Ok(())
247    }
248
249    pub fn unlock_row_shared(
250        &mut self,
251        table: &TableReference,
252        rid: crate::storage::page::RecordId,
253    ) -> QuillSQLResult<()> {
254        self.txn.try_unlock_shared_row(table, rid)
255    }
256
257    pub fn lock_row_exclusive(
258        &mut self,
259        table: &TableReference,
260        rid: crate::storage::page::RecordId,
261    ) -> QuillSQLResult<()> {
262        if !self
263            .txn
264            .try_lock_row(table.clone(), rid, LockMode::Exclusive)?
265        {
266            return Err(QuillSQLError::Execution(
267                "failed to acquire row exclusive lock".to_string(),
268            ));
269        }
270        Ok(())
271    }
272
273    /// Ensure that the current transaction is allowed to perform a write on the given table.
274    pub fn ensure_writable(&self, table: &TableReference, operation: &str) -> QuillSQLResult<()> {
275        if matches!(
276            self.txn.transaction().access_mode(),
277            TransactionAccessMode::ReadOnly
278        ) {
279            warn!(
280                "read-only txn {} attempted '{}' on {}",
281                self.txn.id(),
282                operation,
283                table.to_log_string()
284            );
285            return Err(QuillSQLError::Execution(format!(
286                "operation '{}' on table {} is not allowed in READ ONLY transaction",
287                operation,
288                table.to_log_string()
289            )));
290        }
291        Ok(())
292    }
293
294    pub fn txn(&self) -> &Transaction {
295        self.txn.transaction()
296    }
297
298    pub fn txn_mut(&mut self) -> &mut Transaction {
299        self.txn.transaction_mut()
300    }
301
302    pub fn txn_manager(&self) -> &TransactionManager {
303        self.txn.manager()
304    }
305
306    pub fn txn_runtime(&self) -> &TxnRuntime<'a> {
307        &self.txn
308    }
309
310    pub fn txn_id(&self) -> crate::transaction::TransactionId {
311        self.txn.id()
312    }
313
314    pub fn unlock_row(&self, table: &TableReference, rid: crate::storage::page::RecordId) {
315        self.txn.unlock_row(table, rid);
316    }
317
318    /// Look up the table heap through the storage engine.
319    pub fn table_heap(&self, table: &TableReference) -> QuillSQLResult<Arc<TableHeap>> {
320        self.storage.table_heap(self.catalog, table)
321    }
322
323    /// Fetch all indexes defined on a table.
324    pub fn table_indexes(
325        &self,
326        table: &TableReference,
327    ) -> QuillSQLResult<Vec<Arc<BPlusTreeIndex>>> {
328        self.storage.table_indexes(self.catalog, table)
329    }
330
331    /// Non-allocating helper used by DDL to test for table existence.
332    pub fn try_table_heap(&self, table: &TableReference) -> Option<Arc<TableHeap>> {
333        self.catalog.try_table_heap(table)
334    }
335
336    /// Create a table (used by the CREATE TABLE physical operator).
337    pub fn create_table(
338        &mut self,
339        table: TableReference,
340        schema: Arc<Schema>,
341    ) -> QuillSQLResult<()> {
342        self.catalog.create_table(table, schema).map(|_| ())
343    }
344
345    /// Drop a table, returning whether it existed.
346    pub fn drop_table(&mut self, table: &TableReference) -> QuillSQLResult<bool> {
347        self.catalog.drop_table(table)
348    }
349
350    /// Create an index (used by CREATE INDEX).
351    pub fn create_index(
352        &mut self,
353        name: String,
354        table: &TableReference,
355        key_schema: Arc<Schema>,
356    ) -> QuillSQLResult<()> {
357        self.catalog
358            .create_index(name, table, key_schema)
359            .map(|_| ())
360    }
361
362    /// Drop an index, returning whether it existed.
363    pub fn drop_index(&mut self, table: &TableReference, name: &str) -> QuillSQLResult<bool> {
364        self.catalog.drop_index(table, name)
365    }
366
367    /// Resolve the table that owns `catalog.schema.index`.
368    pub fn find_index_owner(
369        &self,
370        catalog: Option<&str>,
371        schema: Option<&str>,
372        name: &str,
373    ) -> Option<TableReference> {
374        self.catalog.find_index_owner(catalog, schema, name)
375    }
376}
377
378pub struct ExecutionEngine<'a> {
379    pub context: ExecutionContext<'a>,
380}
381impl<'a> ExecutionEngine<'a> {
382    pub fn execute(&mut self, plan: Arc<PhysicalPlan>) -> QuillSQLResult<Vec<Tuple>> {
383        plan.init(&mut self.context)?;
384        let mut result = Vec::new();
385        loop {
386            let next_tuple = plan.next(&mut self.context)?;
387            if let Some(tuple) = next_tuple {
388                result.push(tuple);
389            } else {
390                break;
391            }
392        }
393        Ok(result)
394    }
395}