quill_sql/execution/
mod.rs1pub mod physical_plan;
2
3use std::sync::Arc;
4
5use crate::catalog::SchemaRef;
6use crate::error::{QuillSQLError, QuillSQLResult};
7use crate::execution::physical_plan::PhysicalPlan;
8use crate::transaction::{CommandId, TransactionManager, TransactionSnapshot};
9use crate::{
10 catalog::Catalog,
11 storage::tuple::Tuple,
12 transaction::{LockMode, Transaction},
13 utils::table_ref::TableReference,
14};
15use log::warn;
16use sqlparser::ast::TransactionAccessMode;
17pub trait VolcanoExecutor {
18 fn init(&self, _context: &mut ExecutionContext) -> QuillSQLResult<()> {
19 Ok(())
20 }
21
22 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>>;
23
24 fn output_schema(&self) -> SchemaRef;
25}
26
27pub struct ExecutionContext<'a> {
28 pub catalog: &'a mut Catalog,
29 pub txn: &'a mut Transaction,
30 pub txn_mgr: &'a TransactionManager,
31 command_id: CommandId,
32 snapshot: TransactionSnapshot,
33}
34
35impl<'a> ExecutionContext<'a> {
36 pub fn new(
37 catalog: &'a mut Catalog,
38 txn: &'a mut Transaction,
39 txn_mgr: &'a TransactionManager,
40 ) -> Self {
41 let command_id = txn.begin_command();
42 let snapshot = txn_mgr.snapshot(txn.id());
43 Self {
44 catalog,
45 txn,
46 txn_mgr,
47 command_id,
48 snapshot,
49 }
50 }
51
52 pub fn command_id(&self) -> CommandId {
53 self.command_id
54 }
55
56 pub fn snapshot(&self) -> &TransactionSnapshot {
57 &self.snapshot
58 }
59
60 pub fn is_visible(&self, meta: &crate::storage::page::TupleMeta) -> bool {
61 self.snapshot.is_visible(meta, self.command_id, |txn_id| {
62 self.txn_mgr.transaction_status(txn_id)
63 })
64 }
65
66 pub fn lock_table(&mut self, table: TableReference, mode: LockMode) -> QuillSQLResult<()> {
67 self.txn_mgr
68 .acquire_table_lock(self.txn, table.clone(), mode)
69 .map_err(|e| QuillSQLError::Execution(format!("lock error: {}", e)))?;
70 Ok(())
71 }
72
73 pub fn lock_row_shared(
74 &mut self,
75 table: &TableReference,
76 rid: crate::storage::page::RecordId,
77 retain: bool,
78 ) -> QuillSQLResult<()> {
79 let acquired =
80 self.txn_mgr
81 .try_acquire_row_lock(self.txn, table.clone(), rid, LockMode::Shared)?;
82 if !acquired {
83 return Err(QuillSQLError::Execution(
84 "failed to acquire shared row lock".to_string(),
85 ));
86 }
87 if retain {
88 self.txn_mgr
89 .record_shared_row_lock(self.txn.id(), table.clone(), rid);
90 } else {
91 self.txn_mgr
93 .remove_row_key_marker(self.txn.id(), table, rid);
94 }
95 Ok(())
96 }
97
98 pub fn unlock_row_shared(
99 &mut self,
100 table: &TableReference,
101 rid: crate::storage::page::RecordId,
102 ) -> QuillSQLResult<()> {
103 self.txn_mgr
104 .try_unlock_shared_row(self.txn.id(), table, rid)
105 }
106
107 pub fn lock_row_exclusive(
108 &mut self,
109 table: &TableReference,
110 rid: crate::storage::page::RecordId,
111 ) -> QuillSQLResult<()> {
112 if !self
113 .txn_mgr
114 .try_acquire_row_lock(self.txn, table.clone(), rid, LockMode::Exclusive)?
115 {
116 return Err(QuillSQLError::Execution(
117 "failed to acquire row exclusive lock".to_string(),
118 ));
119 }
120 Ok(())
121 }
122
123 pub fn ensure_writable(&self, table: &TableReference, operation: &str) -> QuillSQLResult<()> {
125 if matches!(self.txn.access_mode(), TransactionAccessMode::ReadOnly) {
126 warn!(
127 "read-only txn {} attempted '{}' on {}",
128 self.txn.id(),
129 operation,
130 table.to_log_string()
131 );
132 return Err(QuillSQLError::Execution(format!(
133 "operation '{}' on table {} is not allowed in READ ONLY transaction",
134 operation,
135 table.to_log_string()
136 )));
137 }
138 Ok(())
139 }
140}
141
142pub struct ExecutionEngine<'a> {
143 pub context: ExecutionContext<'a>,
144}
145impl<'a> ExecutionEngine<'a> {
146 pub fn execute(&mut self, plan: Arc<PhysicalPlan>) -> QuillSQLResult<Vec<Tuple>> {
147 plan.init(&mut self.context)?;
148 let mut result = Vec::new();
149 loop {
150 let next_tuple = plan.next(&mut self.context)?;
151 if let Some(tuple) = next_tuple {
152 result.push(tuple);
153 } else {
154 break;
155 }
156 }
157 Ok(result)
158 }
159}