//! Physical DELETE operator — quill_sql/execution/physical_plan/delete.rs

use std::sync::atomic::{AtomicU32, Ordering};
2
3use crate::catalog::{SchemaRef, DELETE_OUTPUT_SCHEMA_REF};
4use crate::error::{QuillSQLError, QuillSQLResult};
5use crate::execution::{ExecutionContext, VolcanoExecutor};
6use crate::expression::Expr;
7use crate::expression::ExprTrait;
8use crate::storage::table_heap::TableIterator;
9use crate::storage::tuple::Tuple;
10use crate::transaction::LockMode;
11use crate::utils::scalar::ScalarValue;
12use crate::utils::table_ref::TableReference;
13
/// Volcano-style physical operator that deletes rows from a table.
///
/// Rows matching `predicate` (all rows when `None`) are removed from
/// `table`; once the scan is exhausted the operator emits a single tuple
/// carrying the number of deleted rows (schema: `DELETE_OUTPUT_SCHEMA_REF`).
#[derive(Debug)]
pub struct PhysicalDelete {
    /// Target table to delete from.
    pub table: TableReference,
    /// Schema of the target table's rows.
    pub table_schema: SchemaRef,
    /// Optional filter; rows for which it evaluates to false/NULL are kept.
    pub predicate: Option<Expr>,
    // Running count of rows deleted so far; reset in `init` and drained
    // (swapped back to 0) when the scan is exhausted in `next`.
    deleted_rows: AtomicU32,
    // Lazily-created heap iterator; populated by `init`, advanced by `next`.
    iterator: std::sync::Mutex<Option<TableIterator>>,
}
22
23impl PhysicalDelete {
24 pub fn new(table: TableReference, table_schema: SchemaRef, predicate: Option<Expr>) -> Self {
25 Self {
26 table,
27 table_schema,
28 predicate,
29 deleted_rows: AtomicU32::new(0),
30 iterator: std::sync::Mutex::new(None),
31 }
32 }
33}
34
impl VolcanoExecutor for PhysicalDelete {
    /// Resets the delete counter, validates writability, takes the table
    /// lock, and positions a fresh full-range iterator over the table heap.
    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
        self.deleted_rows.store(0, Ordering::SeqCst);
        // Reject DELETE on a read-only context before taking any locks.
        context.ensure_writable(&self.table, "DELETE")?;
        // Table-level intention-exclusive lock; each row is additionally
        // locked exclusively in `next` right before it is deleted.
        context.lock_table(self.table.clone(), LockMode::IntentionExclusive)?;
        let table_heap = context.catalog.table_heap(&self.table)?;
        // `..` scans the whole heap; the predicate filters inside `next`.
        *self.iterator.lock().unwrap() = Some(TableIterator::new(table_heap, ..));
        Ok(())
    }

    /// Drives the delete to completion: consumes the scan, deleting every
    /// visible matching row, then emits a single tuple with the row count.
    /// A second call after that returns `Ok(None)`.
    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
        // `init` must have run; otherwise the iterator slot is still empty.
        let Some(iterator) = &mut *self.iterator.lock().unwrap() else {
            return Err(QuillSQLError::Execution(
                "table iterator not created".to_string(),
            ));
        };

        loop {
            let Some((rid, tuple)) = iterator.next()? else {
                // Scan exhausted: drain the counter (swap to 0 so the next
                // call yields None) and surface the count, if any rows died.
                let deleted = self.deleted_rows.swap(0, Ordering::SeqCst);
                if deleted == 0 {
                    return Ok(None);
                }
                return Ok(Some(Tuple::new(
                    self.output_schema(),
                    vec![ScalarValue::Int32(Some(deleted as i32))],
                )));
            };

            // Skip rows for which the predicate is false or NULL.
            if let Some(predicate) = &self.predicate {
                if !predicate.evaluate(&tuple)?.as_boolean()?.unwrap_or(false) {
                    continue;
                }
            }

            let table_heap = context.catalog.table_heap(&self.table)?;
            let meta = table_heap.tuple_meta(rid)?;
            // Visibility check: skip versions this transaction cannot see.
            if !Tuple::is_visible(&meta, context.txn.id()) {
                continue;
            }

            // Row-level exclusive lock before mutating the tuple.
            context.lock_row_exclusive(&self.table, rid)?;

            // Capture the pre-image first; it is needed for index key
            // projection and for the undo record below.
            let old_tuple = table_heap.tuple(rid)?;
            table_heap.delete_tuple(rid, context.txn.id())?;

            // Index maintenance: remove the key from every index whose key
            // columns project from the old tuple (projection failures are
            // skipped best-effort), remembering each removed entry so a
            // rollback can restore it.
            let mut index_entries = Vec::new();
            for index in context.catalog.table_indexes(&self.table)? {
                if let Ok(key) = old_tuple.project_with_schema(index.key_schema.clone()) {
                    index.delete(&key)?;
                    index_entries.push((index.clone(), key));
                }
            }

            // Record old meta, old tuple, and touched index keys so the
            // transaction can undo this delete on abort.
            context
                .txn
                .push_delete_undo(table_heap.clone(), rid, meta, old_tuple, index_entries);
            self.deleted_rows.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// DELETE emits the fixed single-column "rows affected" schema.
    fn output_schema(&self) -> SchemaRef {
        DELETE_OUTPUT_SCHEMA_REF.clone()
    }
}
100
101impl std::fmt::Display for PhysicalDelete {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 write!(f, "Delete")
104 }
105}