quill_sql/execution/physical_plan/
delete.rs

1use std::sync::atomic::{AtomicU32, Ordering};
2
3use crate::catalog::{SchemaRef, DELETE_OUTPUT_SCHEMA_REF};
4use crate::error::{QuillSQLError, QuillSQLResult};
5use crate::execution::{ExecutionContext, VolcanoExecutor};
6use crate::expression::Expr;
7use crate::expression::ExprTrait;
8use crate::storage::table_heap::TableIterator;
9use crate::storage::tuple::Tuple;
10use crate::transaction::LockMode;
11use crate::utils::scalar::ScalarValue;
12use crate::utils::table_ref::TableReference;
13
14#[derive(Debug)]
15pub struct PhysicalDelete {
16    pub table: TableReference,
17    pub table_schema: SchemaRef,
18    pub predicate: Option<Expr>,
19    deleted_rows: AtomicU32,
20    iterator: std::sync::Mutex<Option<TableIterator>>,
21}
22
23impl PhysicalDelete {
24    pub fn new(table: TableReference, table_schema: SchemaRef, predicate: Option<Expr>) -> Self {
25        Self {
26            table,
27            table_schema,
28            predicate,
29            deleted_rows: AtomicU32::new(0),
30            iterator: std::sync::Mutex::new(None),
31        }
32    }
33}
34
35impl VolcanoExecutor for PhysicalDelete {
36    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
37        self.deleted_rows.store(0, Ordering::SeqCst);
38        context.ensure_writable(&self.table, "DELETE")?;
39        context.lock_table(self.table.clone(), LockMode::IntentionExclusive)?;
40        let table_heap = context.catalog.table_heap(&self.table)?;
41        *self.iterator.lock().unwrap() = Some(TableIterator::new(table_heap, ..));
42        Ok(())
43    }
44
45    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
46        let Some(iterator) = &mut *self.iterator.lock().unwrap() else {
47            return Err(QuillSQLError::Execution(
48                "table iterator not created".to_string(),
49            ));
50        };
51
52        loop {
53            let Some((rid, tuple)) = iterator.next()? else {
54                let deleted = self.deleted_rows.swap(0, Ordering::SeqCst);
55                if deleted == 0 {
56                    return Ok(None);
57                }
58                return Ok(Some(Tuple::new(
59                    self.output_schema(),
60                    vec![ScalarValue::Int32(Some(deleted as i32))],
61                )));
62            };
63
64            if let Some(predicate) = &self.predicate {
65                if !predicate.evaluate(&tuple)?.as_boolean()?.unwrap_or(false) {
66                    continue;
67                }
68            }
69
70            let table_heap = context.catalog.table_heap(&self.table)?;
71            let meta = table_heap.tuple_meta(rid)?;
72            if !Tuple::is_visible(&meta, context.txn.id()) {
73                continue;
74            }
75
76            context.lock_row_exclusive(&self.table, rid)?;
77
78            let old_tuple = table_heap.tuple(rid)?;
79            table_heap.delete_tuple(rid, context.txn.id())?;
80
81            let mut index_entries = Vec::new();
82            for index in context.catalog.table_indexes(&self.table)? {
83                if let Ok(key) = old_tuple.project_with_schema(index.key_schema.clone()) {
84                    index.delete(&key)?;
85                    index_entries.push((index.clone(), key));
86                }
87            }
88
89            context
90                .txn
91                .push_delete_undo(table_heap.clone(), rid, meta, old_tuple, index_entries);
92            self.deleted_rows.fetch_add(1, Ordering::SeqCst);
93        }
94    }
95
96    fn output_schema(&self) -> SchemaRef {
97        DELETE_OUTPUT_SCHEMA_REF.clone()
98    }
99}
100
101impl std::fmt::Display for PhysicalDelete {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        write!(f, "Delete")
104    }
105}