quill_sql/execution/
mod.rs

1pub mod physical_plan;
2
3use std::sync::Arc;
4
5use crate::catalog::SchemaRef;
6use crate::error::{QuillSQLError, QuillSQLResult};
7use crate::execution::physical_plan::PhysicalPlan;
8use crate::transaction::{CommandId, TransactionManager, TransactionSnapshot};
9use crate::{
10    catalog::Catalog,
11    storage::tuple::Tuple,
12    transaction::{LockMode, Transaction},
13    utils::table_ref::TableReference,
14};
15use log::warn;
16use sqlparser::ast::TransactionAccessMode;
17pub trait VolcanoExecutor {
18    fn init(&self, _context: &mut ExecutionContext) -> QuillSQLResult<()> {
19        Ok(())
20    }
21
22    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>>;
23
24    fn output_schema(&self) -> SchemaRef;
25}
26
27pub struct ExecutionContext<'a> {
28    pub catalog: &'a mut Catalog,
29    pub txn: &'a mut Transaction,
30    pub txn_mgr: &'a TransactionManager,
31    command_id: CommandId,
32    snapshot: TransactionSnapshot,
33}
34
35impl<'a> ExecutionContext<'a> {
36    pub fn new(
37        catalog: &'a mut Catalog,
38        txn: &'a mut Transaction,
39        txn_mgr: &'a TransactionManager,
40    ) -> Self {
41        let command_id = txn.begin_command();
42        let snapshot = txn_mgr.snapshot(txn.id());
43        Self {
44            catalog,
45            txn,
46            txn_mgr,
47            command_id,
48            snapshot,
49        }
50    }
51
52    pub fn command_id(&self) -> CommandId {
53        self.command_id
54    }
55
56    pub fn snapshot(&self) -> &TransactionSnapshot {
57        &self.snapshot
58    }
59
60    pub fn is_visible(&self, meta: &crate::storage::page::TupleMeta) -> bool {
61        self.snapshot.is_visible(meta, self.command_id, |txn_id| {
62            self.txn_mgr.transaction_status(txn_id)
63        })
64    }
65
66    pub fn lock_table(&mut self, table: TableReference, mode: LockMode) -> QuillSQLResult<()> {
67        self.txn_mgr
68            .acquire_table_lock(self.txn, table.clone(), mode)
69            .map_err(|e| QuillSQLError::Execution(format!("lock error: {}", e)))?;
70        Ok(())
71    }
72
73    pub fn lock_row_shared(
74        &mut self,
75        table: &TableReference,
76        rid: crate::storage::page::RecordId,
77        retain: bool,
78    ) -> QuillSQLResult<()> {
79        let acquired =
80            self.txn_mgr
81                .try_acquire_row_lock(self.txn, table.clone(), rid, LockMode::Shared)?;
82        if !acquired {
83            return Err(QuillSQLError::Execution(
84                "failed to acquire shared row lock".to_string(),
85            ));
86        }
87        if retain {
88            self.txn_mgr
89                .record_shared_row_lock(self.txn.id(), table.clone(), rid);
90        } else {
91            // Track transient shared locks so subsequent attempts still go through the lock manager.
92            self.txn_mgr
93                .remove_row_key_marker(self.txn.id(), table, rid);
94        }
95        Ok(())
96    }
97
98    pub fn unlock_row_shared(
99        &mut self,
100        table: &TableReference,
101        rid: crate::storage::page::RecordId,
102    ) -> QuillSQLResult<()> {
103        self.txn_mgr
104            .try_unlock_shared_row(self.txn.id(), table, rid)
105    }
106
107    pub fn lock_row_exclusive(
108        &mut self,
109        table: &TableReference,
110        rid: crate::storage::page::RecordId,
111    ) -> QuillSQLResult<()> {
112        if !self
113            .txn_mgr
114            .try_acquire_row_lock(self.txn, table.clone(), rid, LockMode::Exclusive)?
115        {
116            return Err(QuillSQLError::Execution(
117                "failed to acquire row exclusive lock".to_string(),
118            ));
119        }
120        Ok(())
121    }
122
123    /// Ensure that the current transaction is allowed to perform a write on the given table.
124    pub fn ensure_writable(&self, table: &TableReference, operation: &str) -> QuillSQLResult<()> {
125        if matches!(self.txn.access_mode(), TransactionAccessMode::ReadOnly) {
126            warn!(
127                "read-only txn {} attempted '{}' on {}",
128                self.txn.id(),
129                operation,
130                table.to_log_string()
131            );
132            return Err(QuillSQLError::Execution(format!(
133                "operation '{}' on table {} is not allowed in READ ONLY transaction",
134                operation,
135                table.to_log_string()
136            )));
137        }
138        Ok(())
139    }
140}
141
142pub struct ExecutionEngine<'a> {
143    pub context: ExecutionContext<'a>,
144}
145impl<'a> ExecutionEngine<'a> {
146    pub fn execute(&mut self, plan: Arc<PhysicalPlan>) -> QuillSQLResult<Vec<Tuple>> {
147        plan.init(&mut self.context)?;
148        let mut result = Vec::new();
149        loop {
150            let next_tuple = plan.next(&mut self.context)?;
151            if let Some(tuple) = next_tuple {
152                result.push(tuple);
153            } else {
154                break;
155            }
156        }
157        Ok(result)
158    }
159}