1pub mod physical_plan;
2
3use std::sync::Arc;
4
5use crate::catalog::{Schema, SchemaRef};
6use crate::error::{QuillSQLError, QuillSQLResult};
7use crate::execution::physical_plan::PhysicalPlan;
8use crate::expression::{Expr, ExprTrait};
9use crate::storage::{
10 engine::StorageEngine,
11 index::btree_index::BPlusTreeIndex,
12 page::{RecordId, TupleMeta},
13 table_heap::TableHeap,
14 tuple::Tuple,
15};
16use crate::transaction::{
17 CommandId, IsolationLevel, Transaction, TransactionManager, TransactionSnapshot, TxnRuntime,
18};
19use crate::utils::scalar::ScalarValue;
20use crate::{catalog::Catalog, transaction::LockMode, utils::table_ref::TableReference};
21use log::warn;
22use sqlparser::ast::TransactionAccessMode;
23pub trait VolcanoExecutor {
24 fn init(&self, _context: &mut ExecutionContext) -> QuillSQLResult<()> {
25 Ok(())
26 }
27
28 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>>;
29
30 fn output_schema(&self) -> SchemaRef;
31}
32
33pub struct ExecutionContext<'a> {
36 pub catalog: &'a mut Catalog,
38 storage: Arc<dyn StorageEngine>,
40 txn: TxnRuntime<'a>,
42}
43
44impl<'a> ExecutionContext<'a> {
45 pub fn new(
46 catalog: &'a mut Catalog,
47 txn: &'a mut Transaction,
48 txn_mgr: &'a TransactionManager,
49 storage: Arc<dyn StorageEngine>,
50 ) -> Self {
51 let runtime = TxnRuntime::new(txn_mgr, txn);
52 Self {
53 catalog,
54 storage,
55 txn: runtime,
56 }
57 }
58
59 pub fn command_id(&self) -> CommandId {
60 self.txn.command_id()
61 }
62
63 pub fn snapshot(&self) -> &TransactionSnapshot {
64 self.txn.snapshot()
65 }
66
67 pub fn is_visible(&self, meta: &crate::storage::page::TupleMeta) -> bool {
68 self.txn.is_visible(meta)
69 }
70
71 pub fn read_visible_tuple(
74 &mut self,
75 table: &TableReference,
76 rid: crate::storage::page::RecordId,
77 meta: &crate::storage::page::TupleMeta,
78 tuple: Tuple,
79 ) -> QuillSQLResult<Option<Tuple>> {
80 match self.txn().isolation_level() {
81 IsolationLevel::ReadUncommitted => {
82 if self.is_visible(meta) {
83 Ok(Some(tuple))
84 } else {
85 Ok(None)
86 }
87 }
88 IsolationLevel::ReadCommitted => {
89 self.lock_row_shared(table, rid, false)?;
90 let visible = self.is_visible(meta);
91 self.unlock_row_shared(table, rid)?;
92 if visible {
93 Ok(Some(tuple))
94 } else {
95 Ok(None)
96 }
97 }
98 IsolationLevel::RepeatableRead | IsolationLevel::Serializable => {
99 self.lock_row_shared(table, rid, true)?;
100 let visible = self.is_visible(meta);
101 self.unlock_row_shared(table, rid)?;
102 if visible {
103 Ok(Some(tuple))
104 } else {
105 Ok(None)
106 }
107 }
108 }
109 }
110
111 pub fn eval_predicate(&self, expr: &Expr, tuple: &Tuple) -> QuillSQLResult<bool> {
113 match expr.evaluate(tuple)? {
114 ScalarValue::Boolean(Some(v)) => Ok(v),
115 ScalarValue::Boolean(None) => Ok(false),
116 other => Err(QuillSQLError::Execution(format!(
117 "predicate value must be boolean, got {}",
118 other
119 ))),
120 }
121 }
122
123 pub fn eval_expr(&self, expr: &Expr, tuple: &Tuple) -> QuillSQLResult<ScalarValue> {
125 expr.evaluate(tuple)
126 }
127
128 pub fn insert_tuple_with_indexes(
130 &mut self,
131 table: &TableReference,
132 tuple: &Tuple,
133 ) -> QuillSQLResult<()> {
134 let (table_heap, rid) = self.storage.mvcc_insert(
135 self.catalog,
136 table,
137 tuple,
138 self.txn_id(),
139 self.command_id(),
140 )?;
141
142 let mut index_links = Vec::new();
143 for index in self.table_indexes(table)? {
144 if let Ok(key_tuple) = tuple.project_with_schema(index.key_schema.clone()) {
145 index.insert(&key_tuple, rid)?;
146 index_links.push((index.clone(), key_tuple));
147 }
148 }
149
150 self.txn_mut()
151 .push_insert_undo(table_heap, rid, index_links);
152 Ok(())
153 }
154
155 pub fn apply_delete(
157 &mut self,
158 table_heap: Arc<TableHeap>,
159 rid: RecordId,
160 prev_meta: TupleMeta,
161 prev_tuple: Tuple,
162 ) -> QuillSQLResult<()> {
163 self.storage
164 .mvcc_delete(&table_heap, rid, self.txn_id(), self.command_id())?;
165 self.txn_mut()
166 .push_delete_undo(table_heap, rid, prev_meta, prev_tuple);
167 Ok(())
168 }
169
170 pub fn apply_update(
172 &mut self,
173 table: &TableReference,
174 table_heap: Arc<TableHeap>,
175 rid: RecordId,
176 new_tuple: Tuple,
177 prev_meta: TupleMeta,
178 prev_tuple: Tuple,
179 ) -> QuillSQLResult<()> {
180 let (new_rid, _) = self.storage.mvcc_update(
181 &table_heap,
182 rid,
183 new_tuple.clone(),
184 self.txn_id(),
185 self.command_id(),
186 )?;
187
188 let mut new_keys = Vec::new();
189 for index in self.table_indexes(table)? {
190 if let Ok(new_key_tuple) = new_tuple.project_with_schema(index.key_schema.clone()) {
191 index.insert(&new_key_tuple, new_rid)?;
192 new_keys.push((index.clone(), new_key_tuple));
193 }
194 }
195
196 self.txn_mut()
197 .push_update_undo(table_heap, rid, new_rid, prev_meta, prev_tuple, new_keys);
198 Ok(())
199 }
200
201 pub fn prepare_row_for_write(
204 &mut self,
205 table: &TableReference,
206 rid: crate::storage::page::RecordId,
207 table_heap: &Arc<TableHeap>,
208 observed_meta: &crate::storage::page::TupleMeta,
209 ) -> QuillSQLResult<Option<(crate::storage::page::TupleMeta, Tuple)>> {
210 if !self.is_visible(observed_meta) {
211 return Ok(None);
212 }
213 self.lock_row_exclusive(table, rid)?;
214 let (current_meta, current_tuple) = table_heap.full_tuple(rid)?;
215 if !self.is_visible(¤t_meta) {
216 self.unlock_row(table, rid);
217 return Ok(None);
218 }
219 Ok(Some((current_meta, current_tuple)))
220 }
221
222 pub fn lock_table(&mut self, table: TableReference, mode: LockMode) -> QuillSQLResult<()> {
223 self.txn.lock_table(table, mode)
224 }
225
226 pub fn lock_row_shared(
227 &mut self,
228 table: &TableReference,
229 rid: crate::storage::page::RecordId,
230 retain: bool,
231 ) -> QuillSQLResult<()> {
232 let acquired = self
233 .txn
234 .try_lock_row(table.clone(), rid, LockMode::Shared)?;
235 if !acquired {
236 return Err(QuillSQLError::Execution(
237 "failed to acquire shared row lock".to_string(),
238 ));
239 }
240 if retain {
241 self.txn.record_shared_row_lock(table.clone(), rid);
242 } else {
243 self.txn.remove_row_key_marker(table, rid);
245 }
246 Ok(())
247 }
248
249 pub fn unlock_row_shared(
250 &mut self,
251 table: &TableReference,
252 rid: crate::storage::page::RecordId,
253 ) -> QuillSQLResult<()> {
254 self.txn.try_unlock_shared_row(table, rid)
255 }
256
257 pub fn lock_row_exclusive(
258 &mut self,
259 table: &TableReference,
260 rid: crate::storage::page::RecordId,
261 ) -> QuillSQLResult<()> {
262 if !self
263 .txn
264 .try_lock_row(table.clone(), rid, LockMode::Exclusive)?
265 {
266 return Err(QuillSQLError::Execution(
267 "failed to acquire row exclusive lock".to_string(),
268 ));
269 }
270 Ok(())
271 }
272
273 pub fn ensure_writable(&self, table: &TableReference, operation: &str) -> QuillSQLResult<()> {
275 if matches!(
276 self.txn.transaction().access_mode(),
277 TransactionAccessMode::ReadOnly
278 ) {
279 warn!(
280 "read-only txn {} attempted '{}' on {}",
281 self.txn.id(),
282 operation,
283 table.to_log_string()
284 );
285 return Err(QuillSQLError::Execution(format!(
286 "operation '{}' on table {} is not allowed in READ ONLY transaction",
287 operation,
288 table.to_log_string()
289 )));
290 }
291 Ok(())
292 }
293
294 pub fn txn(&self) -> &Transaction {
295 self.txn.transaction()
296 }
297
298 pub fn txn_mut(&mut self) -> &mut Transaction {
299 self.txn.transaction_mut()
300 }
301
302 pub fn txn_manager(&self) -> &TransactionManager {
303 self.txn.manager()
304 }
305
306 pub fn txn_runtime(&self) -> &TxnRuntime<'a> {
307 &self.txn
308 }
309
310 pub fn txn_id(&self) -> crate::transaction::TransactionId {
311 self.txn.id()
312 }
313
314 pub fn unlock_row(&self, table: &TableReference, rid: crate::storage::page::RecordId) {
315 self.txn.unlock_row(table, rid);
316 }
317
318 pub fn table_heap(&self, table: &TableReference) -> QuillSQLResult<Arc<TableHeap>> {
320 self.storage.table_heap(self.catalog, table)
321 }
322
323 pub fn table_indexes(
325 &self,
326 table: &TableReference,
327 ) -> QuillSQLResult<Vec<Arc<BPlusTreeIndex>>> {
328 self.storage.table_indexes(self.catalog, table)
329 }
330
331 pub fn try_table_heap(&self, table: &TableReference) -> Option<Arc<TableHeap>> {
333 self.catalog.try_table_heap(table)
334 }
335
336 pub fn create_table(
338 &mut self,
339 table: TableReference,
340 schema: Arc<Schema>,
341 ) -> QuillSQLResult<()> {
342 self.catalog.create_table(table, schema).map(|_| ())
343 }
344
345 pub fn drop_table(&mut self, table: &TableReference) -> QuillSQLResult<bool> {
347 self.catalog.drop_table(table)
348 }
349
350 pub fn create_index(
352 &mut self,
353 name: String,
354 table: &TableReference,
355 key_schema: Arc<Schema>,
356 ) -> QuillSQLResult<()> {
357 self.catalog
358 .create_index(name, table, key_schema)
359 .map(|_| ())
360 }
361
362 pub fn drop_index(&mut self, table: &TableReference, name: &str) -> QuillSQLResult<bool> {
364 self.catalog.drop_index(table, name)
365 }
366
367 pub fn find_index_owner(
369 &self,
370 catalog: Option<&str>,
371 schema: Option<&str>,
372 name: &str,
373 ) -> Option<TableReference> {
374 self.catalog.find_index_owner(catalog, schema, name)
375 }
376}
377
378pub struct ExecutionEngine<'a> {
379 pub context: ExecutionContext<'a>,
380}
381impl<'a> ExecutionEngine<'a> {
382 pub fn execute(&mut self, plan: Arc<PhysicalPlan>) -> QuillSQLResult<Vec<Tuple>> {
383 plan.init(&mut self.context)?;
384 let mut result = Vec::new();
385 loop {
386 let next_tuple = plan.next(&mut self.context)?;
387 if let Some(tuple) = next_tuple {
388 result.push(tuple);
389 } else {
390 break;
391 }
392 }
393 Ok(result)
394 }
395}