quill_sql/execution/
mod.rs

1pub mod physical_plan;
2
3use std::sync::Arc;
4
5use crate::catalog::SchemaRef;
6use crate::error::{QuillSQLError, QuillSQLResult};
7use crate::execution::physical_plan::PhysicalPlan;
8use crate::transaction::TransactionManager;
9use crate::{
10    catalog::Catalog,
11    storage::tuple::Tuple,
12    transaction::{LockMode, Transaction},
13    utils::table_ref::TableReference,
14};
15use log::warn;
16use sqlparser::ast::TransactionAccessMode;
17pub trait VolcanoExecutor {
18    fn init(&self, _context: &mut ExecutionContext) -> QuillSQLResult<()> {
19        Ok(())
20    }
21
22    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>>;
23
24    fn output_schema(&self) -> SchemaRef;
25}
26
27pub struct ExecutionContext<'a> {
28    pub catalog: &'a mut Catalog,
29    pub txn: &'a mut Transaction,
30    pub txn_mgr: &'a TransactionManager,
31}
32
33impl<'a> ExecutionContext<'a> {
34    pub fn new(
35        catalog: &'a mut Catalog,
36        txn: &'a mut Transaction,
37        txn_mgr: &'a TransactionManager,
38    ) -> Self {
39        Self {
40            catalog,
41            txn,
42            txn_mgr,
43        }
44    }
45
46    pub fn lock_table(&mut self, table: TableReference, mode: LockMode) -> QuillSQLResult<()> {
47        self.txn_mgr
48            .acquire_table_lock(self.txn, table.clone(), mode)
49            .map_err(|e| QuillSQLError::Execution(format!("lock error: {}", e)))?;
50        Ok(())
51    }
52
53    pub fn lock_row_shared(
54        &mut self,
55        table: &TableReference,
56        rid: crate::storage::page::RecordId,
57        retain: bool,
58    ) -> QuillSQLResult<()> {
59        let acquired =
60            self.txn_mgr
61                .try_acquire_row_lock(self.txn, table.clone(), rid, LockMode::Shared)?;
62        if !acquired {
63            return Err(QuillSQLError::Execution(
64                "failed to acquire shared row lock".to_string(),
65            ));
66        }
67        if retain {
68            self.txn_mgr
69                .record_shared_row_lock(self.txn.id(), table.clone(), rid);
70        } else {
71            // Track transient shared locks so subsequent attempts still go through the lock manager.
72            self.txn_mgr
73                .remove_row_key_marker(self.txn.id(), table, rid);
74        }
75        Ok(())
76    }
77
78    pub fn unlock_row_shared(
79        &mut self,
80        table: &TableReference,
81        rid: crate::storage::page::RecordId,
82    ) -> QuillSQLResult<()> {
83        self.txn_mgr
84            .try_unlock_shared_row(self.txn.id(), table, rid)
85    }
86
87    pub fn lock_row_exclusive(
88        &mut self,
89        table: &TableReference,
90        rid: crate::storage::page::RecordId,
91    ) -> QuillSQLResult<()> {
92        if !self
93            .txn_mgr
94            .try_acquire_row_lock(self.txn, table.clone(), rid, LockMode::Exclusive)?
95        {
96            return Err(QuillSQLError::Execution(
97                "failed to acquire row exclusive lock".to_string(),
98            ));
99        }
100        Ok(())
101    }
102
103    /// Ensure that the current transaction is allowed to perform a write on the given table.
104    pub fn ensure_writable(&self, table: &TableReference, operation: &str) -> QuillSQLResult<()> {
105        if matches!(self.txn.access_mode(), TransactionAccessMode::ReadOnly) {
106            warn!(
107                "read-only txn {} attempted '{}' on {}",
108                self.txn.id(),
109                operation,
110                table.to_log_string()
111            );
112            return Err(QuillSQLError::Execution(format!(
113                "operation '{}' on table {} is not allowed in READ ONLY transaction",
114                operation,
115                table.to_log_string()
116            )));
117        }
118        Ok(())
119    }
120}
121
122pub struct ExecutionEngine<'a> {
123    pub context: ExecutionContext<'a>,
124}
125impl<'a> ExecutionEngine<'a> {
126    pub fn execute(&mut self, plan: Arc<PhysicalPlan>) -> QuillSQLResult<Vec<Tuple>> {
127        plan.init(&mut self.context)?;
128        let mut result = Vec::new();
129        loop {
130            let next_tuple = plan.next(&mut self.context)?;
131            if let Some(tuple) = next_tuple {
132                result.push(tuple);
133            } else {
134                break;
135            }
136        }
137        Ok(result)
138    }
139}