quill_sql/execution/physical_plan/
delete.rs

1use std::sync::atomic::{AtomicU32, Ordering};
2
3use crate::catalog::{SchemaRef, DELETE_OUTPUT_SCHEMA_REF};
4use crate::error::{QuillSQLError, QuillSQLResult};
5use crate::execution::{ExecutionContext, VolcanoExecutor};
6use crate::expression::Expr;
7use crate::storage::table_heap::TableIterator;
8use crate::storage::tuple::Tuple;
9use crate::transaction::LockMode;
10use crate::utils::scalar::ScalarValue;
11use crate::utils::table_ref::TableReference;
12
13#[derive(Debug)]
14pub struct PhysicalDelete {
15    pub table: TableReference,
16    pub table_schema: SchemaRef,
17    pub predicate: Option<Expr>,
18    deleted_rows: AtomicU32,
19    iterator: std::sync::Mutex<Option<TableIterator>>,
20}
21
22impl PhysicalDelete {
23    pub fn new(table: TableReference, table_schema: SchemaRef, predicate: Option<Expr>) -> Self {
24        Self {
25            table,
26            table_schema,
27            predicate,
28            deleted_rows: AtomicU32::new(0),
29            iterator: std::sync::Mutex::new(None),
30        }
31    }
32}
33
34impl VolcanoExecutor for PhysicalDelete {
35    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
36        self.deleted_rows.store(0, Ordering::SeqCst);
37        context.ensure_writable(&self.table, "DELETE")?;
38        context.lock_table(self.table.clone(), LockMode::IntentionExclusive)?;
39        let table_heap = context.table_heap(&self.table)?;
40        *self.iterator.lock().unwrap() = Some(TableIterator::new(table_heap, ..));
41        Ok(())
42    }
43
44    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
45        let Some(iterator) = &mut *self.iterator.lock().unwrap() else {
46            return Err(QuillSQLError::Execution(
47                "table iterator not created".to_string(),
48            ));
49        };
50
51        loop {
52            let Some((rid, meta, tuple)) = iterator.next()? else {
53                let deleted = self.deleted_rows.swap(0, Ordering::SeqCst);
54                if deleted == 0 {
55                    return Ok(None);
56                }
57                return Ok(Some(Tuple::new(
58                    self.output_schema(),
59                    vec![ScalarValue::Int32(Some(deleted as i32))],
60                )));
61            };
62
63            if let Some(predicate) = &self.predicate {
64                if !context.eval_predicate(predicate, &tuple)? {
65                    continue;
66                }
67            }
68
69            let table_heap = context.table_heap(&self.table)?;
70            let Some((current_meta, current_tuple)) =
71                context.prepare_row_for_write(&self.table, rid, &table_heap, &meta)?
72            else {
73                continue;
74            };
75            context.apply_delete(table_heap.clone(), rid, current_meta, current_tuple)?;
76            self.deleted_rows.fetch_add(1, Ordering::SeqCst);
77        }
78    }
79
80    fn output_schema(&self) -> SchemaRef {
81        DELETE_OUTPUT_SCHEMA_REF.clone()
82    }
83}
84
85impl std::fmt::Display for PhysicalDelete {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        write!(f, "Delete")
88    }
89}