Skip to main content

quill_sql/execution/physical_plan/
delete.rs

1//! DELETE operator that scans a table and removes visible tuples.
2
3use std::cell::RefCell;
4use std::sync::atomic::{AtomicU32, Ordering};
5use std::sync::OnceLock;
6
7use crate::catalog::{SchemaRef, DELETE_OUTPUT_SCHEMA_REF};
8use crate::error::QuillSQLResult;
9use crate::execution::physical_plan::{resolve_table_binding, stream_not_ready};
10use crate::execution::{ExecutionContext, VolcanoExecutor};
11use crate::expression::Expr;
12use crate::storage::{
13    engine::{TableBinding, TupleStream},
14    tuple::Tuple,
15};
16use crate::transaction::LockMode;
17use crate::utils::scalar::ScalarValue;
18use crate::utils::table_ref::TableReference;
19
20pub struct PhysicalDelete {
21    pub table: TableReference,
22    pub table_schema: SchemaRef,
23    pub predicate: Option<Expr>,
24    deleted_rows: AtomicU32,
25    iterator: RefCell<Option<Box<dyn TupleStream>>>,
26    table_binding: OnceLock<TableBinding>,
27}
28
29impl PhysicalDelete {
30    pub fn new(table: TableReference, table_schema: SchemaRef, predicate: Option<Expr>) -> Self {
31        Self {
32            table,
33            table_schema,
34            predicate,
35            deleted_rows: AtomicU32::new(0),
36            iterator: RefCell::new(None),
37            table_binding: OnceLock::new(),
38        }
39    }
40}
41
42impl VolcanoExecutor for PhysicalDelete {
43    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
44        self.deleted_rows.store(0, Ordering::SeqCst);
45        context.txn_ctx().ensure_writable(&self.table, "DELETE")?;
46        context
47            .txn_ctx_mut()
48            .lock_table(self.table.clone(), LockMode::IntentionExclusive)?;
49        let binding = resolve_table_binding(&self.table_binding, context, &self.table)?;
50        let stream = binding.scan()?;
51        self.iterator.replace(Some(stream));
52        Ok(())
53    }
54
55    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
56        loop {
57            let next_entry = {
58                let mut guard = self.iterator.borrow_mut();
59                let stream = guard.as_mut().ok_or_else(|| stream_not_ready("Delete"))?;
60                stream.next()?
61            };
62            let Some((rid, meta, tuple)) = next_entry else {
63                let deleted = self.deleted_rows.swap(0, Ordering::SeqCst);
64                if deleted == 0 {
65                    return Ok(None);
66                }
67                return Ok(Some(Tuple::new(
68                    self.output_schema(),
69                    vec![ScalarValue::Int32(Some(deleted as i32))],
70                )));
71            };
72
73            if let Some(predicate) = &self.predicate {
74                if !context.eval_predicate(predicate, &tuple)? {
75                    continue;
76                }
77            }
78
79            let binding = resolve_table_binding(&self.table_binding, context, &self.table)?;
80            let Some((current_meta, current_tuple)) =
81                binding.prepare_row_for_write(context.txn_ctx_mut(), rid, &meta)?
82            else {
83                continue;
84            };
85            binding.delete(context.txn_ctx_mut(), rid, current_meta, current_tuple)?;
86            self.deleted_rows.fetch_add(1, Ordering::SeqCst);
87        }
88    }
89
90    fn output_schema(&self) -> SchemaRef {
91        DELETE_OUTPUT_SCHEMA_REF.clone()
92    }
93}
94
95impl std::fmt::Display for PhysicalDelete {
96    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97        write!(f, "Delete")
98    }
99}
100
101impl std::fmt::Debug for PhysicalDelete {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct("PhysicalDelete")
104            .field("table", &self.table)
105            .field("table_schema", &self.table_schema)
106            .field("predicate", &self.predicate)
107            .field("deleted_rows", &self.deleted_rows)
108            .finish()
109    }
110}