supertable_core/
delete.rs1use crate::table::Table;
7use crate::transaction::Transaction;
8use anyhow::Result;
9use arrow::compute::filter_record_batch;
10use datafusion::logical_expr::Expr;
11use datafusion::physical_expr::create_physical_expr;
12use datafusion::physical_expr::execution_props::ExecutionProps;
13
/// Strategy used to apply a row-level delete to a table's files.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum RowLevelStrategy {
    /// Rewrite every affected data file without the deleted rows.
    ///
    /// This is the default strategy.
    #[default]
    CopyOnWrite,
    /// Leave data files untouched and record the deleted rows in a separate
    /// delete file instead.
    MergeOnRead,
}
23
24#[allow(unused)]
26pub struct DeleteBuilder {
27 table: Table,
28 filter: Expr,
29 strategy: RowLevelStrategy,
30}
31
32impl DeleteBuilder {
33 pub fn new(table: Table, filter: Expr) -> Self {
34 Self {
35 table,
36 filter,
37 strategy: RowLevelStrategy::CopyOnWrite,
38 }
39 }
40
41 pub fn with_strategy(mut self, strategy: RowLevelStrategy) -> Self {
42 self.strategy = strategy;
43 self
44 }
45
46 pub async fn execute(self) -> Result<Transaction> {
48 let _snapshot = self
49 .table
50 .metadata
51 .current_snapshot()
52 .ok_or_else(|| anyhow::anyhow!("No current snapshot to delete from"))?;
53
54 match self.strategy {
55 RowLevelStrategy::CopyOnWrite => self.execute_cow().await,
56 RowLevelStrategy::MergeOnRead => self.execute_mor().await,
57 }
58 }
59
60 async fn execute_cow(&self) -> Result<Transaction> {
61 let snapshot = self.table.metadata.current_snapshot().unwrap();
62 let all_files = snapshot.all_data_files(&self.table.storage).await?;
63
64 let schema_ref = self.table.metadata.current_schema().to_arrow_schema_ref();
65
66 let physical_filter = create_physical_expr(
68 &self.filter,
69 &self.table.metadata.current_schema().to_df_schema()?,
70 &ExecutionProps::new(),
71 )?;
72
73 let mut tx = self.table.new_transaction();
74 let reader = crate::reader::TableReader::new(self.table.storage.clone());
75 let writer = crate::writer::TableWriter::new(
76 self.table.storage.clone(),
77 self.table.metadata.location.clone(),
78 schema_ref.clone(),
79 );
80
81 for file in all_files {
82 if file.content != crate::manifest::FileContent::Data {
83 continue;
84 }
85
86 let batches = reader.read_file(&file.file_path).await?;
87 let mut rewritten_batches = Vec::new();
88 let mut any_deleted = false;
89
90 for batch in batches {
91 let filter_result = physical_filter.evaluate(&batch)?;
100 let filter_array = filter_result.into_array(batch.num_rows())?;
101 let filter_boolean = filter_array
102 .as_any()
103 .downcast_ref::<arrow::array::BooleanArray>()
104 .ok_or_else(|| anyhow::anyhow!("Filter must return boolean"))?;
105
106 let keep_mask = arrow::compute::not(filter_boolean)?;
108
109 if filter_boolean.true_count() > 0 {
111 any_deleted = true;
112 }
113
114 let filtered_batch = filter_record_batch(&batch, &keep_mask)?;
115 if filtered_batch.num_rows() > 0 {
116 rewritten_batches.push(filtered_batch);
117 }
118 }
119
120 if any_deleted {
121 tx.delete_file(file.file_path.clone());
122 if !rewritten_batches.is_empty() {
123 let combined_batch =
124 arrow::compute::concat_batches(&schema_ref, &rewritten_batches)?;
125 let file_id = uuid::Uuid::new_v4().to_string();
126 let new_file = writer.write_batch(&combined_batch, &file_id).await?;
127 tx.add_file(new_file);
128 }
129 }
130 }
131
132 Ok(tx)
133 }
134
135 async fn execute_mor(&self) -> Result<Transaction> {
136 let snapshot = self.table.metadata.current_snapshot().unwrap();
141 let schema = self.table.metadata.current_schema();
142
143 let id_field = schema.fields.iter().find(|f| f.id == 1).cloned();
146
147 if id_field.is_none() {
148 return Err(anyhow::anyhow!(
149 "Cannot determine equality field (id=1) for MoR"
150 ));
151 }
152 let id_field = id_field.unwrap();
153
154 let physical_filter = create_physical_expr(
156 &self.filter,
157 &schema.to_df_schema()?,
158 &ExecutionProps::new(),
159 )?;
160
161 let reader = crate::reader::TableReader::new(self.table.storage.clone());
162 let all_files = snapshot.all_data_files(&self.table.storage).await?;
163
164 let mut ids_to_delete = Vec::new();
165
166 for file in all_files {
167 if file.content != crate::manifest::FileContent::Data {
168 continue;
169 }
170 let batches = reader.read_file(&file.file_path).await?;
171 for batch in batches {
172 let filter_result = physical_filter.evaluate(&batch)?;
173 let filter_array = filter_result.into_array(batch.num_rows())?;
174 let filter_boolean = filter_array
175 .as_any()
176 .downcast_ref::<arrow::array::BooleanArray>()
177 .unwrap();
178
179 if filter_boolean.true_count() > 0 {
180 let id_col_idx = schema.fields.iter().position(|f| f.id == 1).unwrap();
182 let id_array = batch.column(id_col_idx);
183 let filtered_ids = arrow::compute::filter(id_array, filter_boolean)?;
185 ids_to_delete.push(filtered_ids);
186 }
187 }
188 }
189
190 if ids_to_delete.is_empty() {
191 return Ok(self.table.new_transaction());
192 }
193
194 let total_ids: Vec<&dyn arrow::array::Array> =
196 ids_to_delete.iter().map(|a| a.as_ref()).collect();
197 let combined_ids = arrow::compute::concat(&total_ids)?;
198
199 let del_schema = std::sync::Arc::new(arrow::datatypes::Schema::new(vec![
201 arrow::datatypes::Field::new(
202 id_field.name.clone(),
203 id_field.field_type.to_arrow_datatype(),
204 false,
205 ),
206 ]));
207
208 let batch =
209 arrow::record_batch::RecordBatch::try_new(del_schema.clone(), vec![combined_ids])?;
210
211 let writer = crate::writer::TableWriter::new(
212 self.table.storage.clone(),
213 self.table.metadata.location.clone(),
214 del_schema,
215 );
216
217 let file_id = uuid::Uuid::new_v4().to_string();
218 let mut data_file = writer
219 .write_batch(&batch, &format!("delete-eq-{}", file_id))
220 .await?;
221 data_file.content = crate::manifest::FileContent::EqualityDeletes;
222
223 let mut tx = self.table.new_transaction();
224 tx.add_file(data_file);
225
226 Ok(tx)
227 }
228}