use crate::table::Table;
use crate::transaction::Transaction;
use anyhow::Result;
use arrow::compute::filter_record_batch;
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RowLevelStrategy {
#[default]
CopyOnWrite,
MergeOnRead,
}
#[allow(unused)]
pub struct DeleteBuilder {
table: Table,
filter: Expr,
strategy: RowLevelStrategy,
}
impl DeleteBuilder {
pub fn new(table: Table, filter: Expr) -> Self {
Self {
table,
filter,
strategy: RowLevelStrategy::CopyOnWrite,
}
}
pub fn with_strategy(mut self, strategy: RowLevelStrategy) -> Self {
self.strategy = strategy;
self
}
pub async fn execute(self) -> Result<Transaction> {
let _snapshot = self
.table
.metadata
.current_snapshot()
.ok_or_else(|| anyhow::anyhow!("No current snapshot to delete from"))?;
match self.strategy {
RowLevelStrategy::CopyOnWrite => self.execute_cow().await,
RowLevelStrategy::MergeOnRead => self.execute_mor().await,
}
}
async fn execute_cow(&self) -> Result<Transaction> {
let snapshot = self.table.metadata.current_snapshot().unwrap();
let all_files = snapshot.all_data_files(&self.table.storage).await?;
let schema_ref = self.table.metadata.current_schema().to_arrow_schema_ref();
let physical_filter = create_physical_expr(
&self.filter,
&self.table.metadata.current_schema().to_df_schema()?,
&ExecutionProps::new(),
)?;
let mut tx = self.table.new_transaction();
let reader = crate::reader::TableReader::new(self.table.storage.clone());
let writer = crate::writer::TableWriter::new(
self.table.storage.clone(),
self.table.metadata.location.clone(),
schema_ref.clone(),
);
for file in all_files {
if file.content != crate::manifest::FileContent::Data {
continue;
}
let batches = reader.read_file(&file.file_path).await?;
let mut rewritten_batches = Vec::new();
let mut any_deleted = false;
for batch in batches {
let filter_result = physical_filter.evaluate(&batch)?;
let filter_array = filter_result.into_array(batch.num_rows())?;
let filter_boolean = filter_array
.as_any()
.downcast_ref::<arrow::array::BooleanArray>()
.ok_or_else(|| anyhow::anyhow!("Filter must return boolean"))?;
let keep_mask = arrow::compute::not(filter_boolean)?;
if filter_boolean.true_count() > 0 {
any_deleted = true;
}
let filtered_batch = filter_record_batch(&batch, &keep_mask)?;
if filtered_batch.num_rows() > 0 {
rewritten_batches.push(filtered_batch);
}
}
if any_deleted {
tx.delete_file(file.file_path.clone());
if !rewritten_batches.is_empty() {
let combined_batch =
arrow::compute::concat_batches(&schema_ref, &rewritten_batches)?;
let file_id = uuid::Uuid::new_v4().to_string();
let new_file = writer.write_batch(&combined_batch, &file_id).await?;
tx.add_file(new_file);
}
}
}
Ok(tx)
}
async fn execute_mor(&self) -> Result<Transaction> {
let snapshot = self.table.metadata.current_snapshot().unwrap();
let schema = self.table.metadata.current_schema();
let id_field = schema.fields.iter().find(|f| f.id == 1).cloned();
if id_field.is_none() {
return Err(anyhow::anyhow!(
"Cannot determine equality field (id=1) for MoR"
));
}
let id_field = id_field.unwrap();
let physical_filter = create_physical_expr(
&self.filter,
&schema.to_df_schema()?,
&ExecutionProps::new(),
)?;
let reader = crate::reader::TableReader::new(self.table.storage.clone());
let all_files = snapshot.all_data_files(&self.table.storage).await?;
let mut ids_to_delete = Vec::new();
for file in all_files {
if file.content != crate::manifest::FileContent::Data {
continue;
}
let batches = reader.read_file(&file.file_path).await?;
for batch in batches {
let filter_result = physical_filter.evaluate(&batch)?;
let filter_array = filter_result.into_array(batch.num_rows())?;
let filter_boolean = filter_array
.as_any()
.downcast_ref::<arrow::array::BooleanArray>()
.unwrap();
if filter_boolean.true_count() > 0 {
let id_col_idx = schema.fields.iter().position(|f| f.id == 1).unwrap();
let id_array = batch.column(id_col_idx);
let filtered_ids = arrow::compute::filter(id_array, filter_boolean)?;
ids_to_delete.push(filtered_ids);
}
}
}
if ids_to_delete.is_empty() {
return Ok(self.table.new_transaction());
}
let total_ids: Vec<&dyn arrow::array::Array> =
ids_to_delete.iter().map(|a| a.as_ref()).collect();
let combined_ids = arrow::compute::concat(&total_ids)?;
let del_schema = std::sync::Arc::new(arrow::datatypes::Schema::new(vec![
arrow::datatypes::Field::new(
id_field.name.clone(),
id_field.field_type.to_arrow_datatype(),
false,
),
]));
let batch =
arrow::record_batch::RecordBatch::try_new(del_schema.clone(), vec![combined_ids])?;
let writer = crate::writer::TableWriter::new(
self.table.storage.clone(),
self.table.metadata.location.clone(),
del_schema,
);
let file_id = uuid::Uuid::new_v4().to_string();
let mut data_file = writer
.write_batch(&batch, &format!("delete-eq-{}", file_id))
.await?;
data_file.content = crate::manifest::FileContent::EqualityDeletes;
let mut tx = self.table.new_transaction();
tx.add_file(data_file);
Ok(tx)
}
}