quill_sql/execution/
mod.rs1pub mod physical_plan;
2
3use std::sync::Arc;
4
5use crate::catalog::SchemaRef;
6use crate::error::{QuillSQLError, QuillSQLResult};
7use crate::execution::physical_plan::PhysicalPlan;
8use crate::transaction::TransactionManager;
9use crate::{
10 catalog::Catalog,
11 storage::tuple::Tuple,
12 transaction::{LockMode, Transaction},
13 utils::table_ref::TableReference,
14};
15use log::warn;
16use sqlparser::ast::TransactionAccessMode;
17pub trait VolcanoExecutor {
18 fn init(&self, _context: &mut ExecutionContext) -> QuillSQLResult<()> {
19 Ok(())
20 }
21
22 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>>;
23
24 fn output_schema(&self) -> SchemaRef;
25}
26
27pub struct ExecutionContext<'a> {
28 pub catalog: &'a mut Catalog,
29 pub txn: &'a mut Transaction,
30 pub txn_mgr: &'a TransactionManager,
31}
32
33impl<'a> ExecutionContext<'a> {
34 pub fn new(
35 catalog: &'a mut Catalog,
36 txn: &'a mut Transaction,
37 txn_mgr: &'a TransactionManager,
38 ) -> Self {
39 Self {
40 catalog,
41 txn,
42 txn_mgr,
43 }
44 }
45
46 pub fn lock_table(&mut self, table: TableReference, mode: LockMode) -> QuillSQLResult<()> {
47 self.txn_mgr
48 .acquire_table_lock(self.txn, table.clone(), mode)
49 .map_err(|e| QuillSQLError::Execution(format!("lock error: {}", e)))?;
50 Ok(())
51 }
52
53 pub fn lock_row_shared(
54 &mut self,
55 table: &TableReference,
56 rid: crate::storage::page::RecordId,
57 retain: bool,
58 ) -> QuillSQLResult<()> {
59 let acquired =
60 self.txn_mgr
61 .try_acquire_row_lock(self.txn, table.clone(), rid, LockMode::Shared)?;
62 if !acquired {
63 return Err(QuillSQLError::Execution(
64 "failed to acquire shared row lock".to_string(),
65 ));
66 }
67 if retain {
68 self.txn_mgr
69 .record_shared_row_lock(self.txn.id(), table.clone(), rid);
70 } else {
71 self.txn_mgr
73 .remove_row_key_marker(self.txn.id(), table, rid);
74 }
75 Ok(())
76 }
77
78 pub fn unlock_row_shared(
79 &mut self,
80 table: &TableReference,
81 rid: crate::storage::page::RecordId,
82 ) -> QuillSQLResult<()> {
83 self.txn_mgr
84 .try_unlock_shared_row(self.txn.id(), table, rid)
85 }
86
87 pub fn lock_row_exclusive(
88 &mut self,
89 table: &TableReference,
90 rid: crate::storage::page::RecordId,
91 ) -> QuillSQLResult<()> {
92 if !self
93 .txn_mgr
94 .try_acquire_row_lock(self.txn, table.clone(), rid, LockMode::Exclusive)?
95 {
96 return Err(QuillSQLError::Execution(
97 "failed to acquire row exclusive lock".to_string(),
98 ));
99 }
100 Ok(())
101 }
102
103 pub fn ensure_writable(&self, table: &TableReference, operation: &str) -> QuillSQLResult<()> {
105 if matches!(self.txn.access_mode(), TransactionAccessMode::ReadOnly) {
106 warn!(
107 "read-only txn {} attempted '{}' on {}",
108 self.txn.id(),
109 operation,
110 table.to_log_string()
111 );
112 return Err(QuillSQLError::Execution(format!(
113 "operation '{}' on table {} is not allowed in READ ONLY transaction",
114 operation,
115 table.to_log_string()
116 )));
117 }
118 Ok(())
119 }
120}
121
122pub struct ExecutionEngine<'a> {
123 pub context: ExecutionContext<'a>,
124}
125impl<'a> ExecutionEngine<'a> {
126 pub fn execute(&mut self, plan: Arc<PhysicalPlan>) -> QuillSQLResult<Vec<Tuple>> {
127 plan.init(&mut self.context)?;
128 let mut result = Vec::new();
129 loop {
130 let next_tuple = plan.next(&mut self.context)?;
131 if let Some(tuple) = next_tuple {
132 result.push(tuple);
133 } else {
134 break;
135 }
136 }
137 Ok(result)
138 }
139}