Skip to main content

supertable_core/
delete.rs

1//! # SuperTable Row-Level Deletes
2//!
3//! This module provides tools for performing row-level delete operations.
4//! It supports both Copy-on-Write (CoW) and Merge-on-Read (MoR) strategies.
5
6use crate::table::Table;
7use crate::transaction::Transaction;
8use anyhow::Result;
9use arrow::compute::filter_record_batch;
10use datafusion::logical_expr::Expr;
11use datafusion::physical_expr::create_physical_expr;
12use datafusion::physical_expr::execution_props::ExecutionProps;
13
/// Selects how a row-level operation is materialised.
///
/// The default is [`RowLevelStrategy::CopyOnWrite`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum RowLevelStrategy {
    /// Affected data files are rewritten in full.
    #[default]
    CopyOnWrite,
    /// Delete files are written and merged with the data at read time.
    MergeOnRead,
}
23
24/// Builder for DELETE operations.
25#[allow(unused)]
26pub struct DeleteBuilder {
27    table: Table,
28    filter: Expr,
29    strategy: RowLevelStrategy,
30}
31
32impl DeleteBuilder {
33    pub fn new(table: Table, filter: Expr) -> Self {
34        Self {
35            table,
36            filter,
37            strategy: RowLevelStrategy::CopyOnWrite,
38        }
39    }
40
41    pub fn with_strategy(mut self, strategy: RowLevelStrategy) -> Self {
42        self.strategy = strategy;
43        self
44    }
45
46    /// Executes the delete operation and returns a Transaction.
47    pub async fn execute(self) -> Result<Transaction> {
48        let _snapshot = self
49            .table
50            .metadata
51            .current_snapshot()
52            .ok_or_else(|| anyhow::anyhow!("No current snapshot to delete from"))?;
53
54        match self.strategy {
55            RowLevelStrategy::CopyOnWrite => self.execute_cow().await,
56            RowLevelStrategy::MergeOnRead => self.execute_mor().await,
57        }
58    }
59
60    async fn execute_cow(&self) -> Result<Transaction> {
61        let snapshot = self.table.metadata.current_snapshot().unwrap();
62        let all_files = snapshot.all_data_files(&self.table.storage).await?;
63
64        let schema_ref = self.table.metadata.current_schema().to_arrow_schema_ref();
65
66        // Create physical expression from the logical filter
67        let physical_filter = create_physical_expr(
68            &self.filter,
69            &self.table.metadata.current_schema().to_df_schema()?,
70            &ExecutionProps::new(),
71        )?;
72
73        let mut tx = self.table.new_transaction();
74        let reader = crate::reader::TableReader::new(self.table.storage.clone());
75        let writer = crate::writer::TableWriter::new(
76            self.table.storage.clone(),
77            self.table.metadata.location.clone(),
78            schema_ref.clone(),
79        );
80
81        for file in all_files {
82            if file.content != crate::manifest::FileContent::Data {
83                continue;
84            }
85
86            let batches = reader.read_file(&file.file_path).await?;
87            let mut rewritten_batches = Vec::new();
88            let mut any_deleted = false;
89
90            for batch in batches {
91                // Evaluate filter on the batch
92                // Note: The filter expression in 'DELETE FROM table WHERE filter'
93                // specifies which rows to DELETE.
94                // However, filter_record_batch keeps rows where the result is TRUE.
95                // So we actually need to keep rows where the filter is FALSE or NULL.
96                // For simplicity in this implementation, we'll invert the filter if possible,
97                // or just handle the selection here.
98
99                let filter_result = physical_filter.evaluate(&batch)?;
100                let filter_array = filter_result.into_array(batch.num_rows())?;
101                let filter_boolean = filter_array
102                    .as_any()
103                    .downcast_ref::<arrow::array::BooleanArray>()
104                    .ok_or_else(|| anyhow::anyhow!("Filter must return boolean"))?;
105
106                // Invert the filter: keep rows where filter is FALSE
107                let keep_mask = arrow::compute::not(filter_boolean)?;
108
109                // Check if any rows were deleted (where filter was TRUE)
110                if filter_boolean.true_count() > 0 {
111                    any_deleted = true;
112                }
113
114                let filtered_batch = filter_record_batch(&batch, &keep_mask)?;
115                if filtered_batch.num_rows() > 0 {
116                    rewritten_batches.push(filtered_batch);
117                }
118            }
119
120            if any_deleted {
121                tx.delete_file(file.file_path.clone());
122                if !rewritten_batches.is_empty() {
123                    let combined_batch =
124                        arrow::compute::concat_batches(&schema_ref, &rewritten_batches)?;
125                    let file_id = uuid::Uuid::new_v4().to_string();
126                    let new_file = writer.write_batch(&combined_batch, &file_id).await?;
127                    tx.add_file(new_file);
128                }
129            }
130        }
131
132        Ok(tx)
133    }
134
135    async fn execute_mor(&self) -> Result<Transaction> {
136        // For this prototype, we'll try to extract equality constraints from the filter
137        // and write an Equality Delete file.
138        // E.g., if filter is "id = 5", we write a parquet file with column "id" and value 5.
139
140        let snapshot = self.table.metadata.current_snapshot().unwrap();
141        let schema = self.table.metadata.current_schema();
142
143        // Find identifier column (assuming first column 'id' or similar for now, or use PK if defined)
144        // Todo: Use Table Identifier Fields. For now, picking field_id 1.
145        let id_field = schema.fields.iter().find(|f| f.id == 1).cloned();
146
147        if id_field.is_none() {
148            return Err(anyhow::anyhow!(
149                "Cannot determine equality field (id=1) for MoR"
150            ));
151        }
152        let id_field = id_field.unwrap();
153
154        // Scan to find IDs to delete
155        let physical_filter = create_physical_expr(
156            &self.filter,
157            &schema.to_df_schema()?,
158            &ExecutionProps::new(),
159        )?;
160
161        let reader = crate::reader::TableReader::new(self.table.storage.clone());
162        let all_files = snapshot.all_data_files(&self.table.storage).await?;
163
164        let mut ids_to_delete = Vec::new();
165
166        for file in all_files {
167            if file.content != crate::manifest::FileContent::Data {
168                continue;
169            }
170            let batches = reader.read_file(&file.file_path).await?;
171            for batch in batches {
172                let filter_result = physical_filter.evaluate(&batch)?;
173                let filter_array = filter_result.into_array(batch.num_rows())?;
174                let filter_boolean = filter_array
175                    .as_any()
176                    .downcast_ref::<arrow::array::BooleanArray>()
177                    .unwrap();
178
179                if filter_boolean.true_count() > 0 {
180                    // Extract IDs
181                    let id_col_idx = schema.fields.iter().position(|f| f.id == 1).unwrap();
182                    let id_array = batch.column(id_col_idx);
183                    // Filter ids to keep only those that matched the delete filter
184                    let filtered_ids = arrow::compute::filter(id_array, filter_boolean)?;
185                    ids_to_delete.push(filtered_ids);
186                }
187            }
188        }
189
190        if ids_to_delete.is_empty() {
191            return Ok(self.table.new_transaction());
192        }
193
194        // Concatenate all IDs
195        let total_ids: Vec<&dyn arrow::array::Array> =
196            ids_to_delete.iter().map(|a| a.as_ref()).collect();
197        let combined_ids = arrow::compute::concat(&total_ids)?;
198
199        // Create Equality Delete File Batch
200        let del_schema = std::sync::Arc::new(arrow::datatypes::Schema::new(vec![
201            arrow::datatypes::Field::new(
202                id_field.name.clone(),
203                id_field.field_type.to_arrow_datatype(),
204                false,
205            ),
206        ]));
207
208        let batch =
209            arrow::record_batch::RecordBatch::try_new(del_schema.clone(), vec![combined_ids])?;
210
211        let writer = crate::writer::TableWriter::new(
212            self.table.storage.clone(),
213            self.table.metadata.location.clone(),
214            del_schema,
215        );
216
217        let file_id = uuid::Uuid::new_v4().to_string();
218        let mut data_file = writer
219            .write_batch(&batch, &format!("delete-eq-{}", file_id))
220            .await?;
221        data_file.content = crate::manifest::FileContent::EqualityDeletes;
222
223        let mut tx = self.table.new_transaction();
224        tx.add_file(data_file);
225
226        Ok(tx)
227    }
228}