supertable_core/
update.rs1use crate::delete::RowLevelStrategy;
7use crate::table::Table;
8use crate::transaction::Transaction;
9use anyhow::Result;
10use arrow::array::RecordBatch;
11use datafusion::logical_expr::Expr;
12use datafusion::physical_expr::create_physical_expr;
13use datafusion::physical_expr::execution_props::ExecutionProps;
14use datafusion::prelude::SessionContext;
15use std::collections::HashMap;
16
17pub struct UpdateBuilder {
19 table: Table,
20 filter: Expr,
21 assignments: HashMap<String, Expr>,
22 strategy: RowLevelStrategy,
23}
24
25impl UpdateBuilder {
26 pub fn new(table: Table, filter: Expr) -> Self {
27 Self {
28 table,
29 filter,
30 assignments: HashMap::new(),
31 strategy: RowLevelStrategy::CopyOnWrite,
32 }
33 }
34
35 pub fn set(mut self, column: impl Into<String>, value: Expr) -> Self {
36 self.assignments.insert(column.into(), value);
37 self
38 }
39
40 pub fn with_strategy(mut self, strategy: RowLevelStrategy) -> Self {
41 self.strategy = strategy;
42 self
43 }
44
45 pub async fn execute(self) -> Result<Transaction> {
47 match self.strategy {
48 RowLevelStrategy::CopyOnWrite => self.execute_cow().await,
49 RowLevelStrategy::MergeOnRead => Err(anyhow::anyhow!(
50 "Merge-on-Read for UPDATE is not yet implemented"
51 )),
52 }
53 }
54
55 async fn execute_cow(&self) -> Result<Transaction> {
56 let snapshot = self
57 .table
58 .metadata
59 .current_snapshot()
60 .ok_or_else(|| anyhow::anyhow!("No current snapshot to update"))?;
61 let all_files = snapshot.all_data_files(&self.table.storage).await?;
62
63 let schema_ref = self.table.metadata.current_schema().to_arrow_schema_ref();
64 let df_schema = self.table.metadata.current_schema().to_df_schema()?;
65 let _context = SessionContext::new();
66
67 let physical_filter =
69 create_physical_expr(&self.filter, &df_schema, &ExecutionProps::new())?;
70
71 let mut physical_assignments = HashMap::new();
73 for (col, expr) in &self.assignments {
74 let phys_expr = create_physical_expr(expr, &df_schema, &ExecutionProps::new())?;
75 physical_assignments.insert(col.clone(), phys_expr);
76 }
77
78 let mut tx = self.table.new_transaction();
79 let reader = crate::reader::TableReader::new(self.table.storage.clone());
80 let writer = crate::writer::TableWriter::new(
81 self.table.storage.clone(),
82 self.table.metadata.location.clone(),
83 schema_ref.clone(),
84 );
85
86 for file in all_files {
87 if file.content != crate::manifest::FileContent::Data {
88 continue;
89 }
90
91 let batches = reader.read_file(&file.file_path).await?;
92 let mut rewritten_batches = Vec::new();
93 let mut any_updated = false;
94
95 for batch in batches {
96 let filter_result = physical_filter.evaluate(&batch)?;
97 let filter_array = filter_result.into_array(batch.num_rows())?;
98 let filter_boolean = filter_array
99 .as_any()
100 .downcast_ref::<arrow::array::BooleanArray>()
101 .ok_or_else(|| anyhow::anyhow!("Filter must return boolean"))?;
102
103 if filter_boolean.true_count() > 0 {
104 any_updated = true;
105
106 let mut new_columns = Vec::new();
112 for (i, field) in schema_ref.fields().iter().enumerate() {
113 let original_col = batch.column(i);
114
115 if let Some(phys_assignment) = physical_assignments.get(field.name()) {
116 let assigned_val = phys_assignment.evaluate(&batch)?;
117 let assigned_array = assigned_val.into_array(batch.num_rows())?;
118
119 let updated_col = arrow::compute::kernels::zip::zip(
121 filter_boolean,
122 &assigned_array,
123 original_col,
124 )?;
125 new_columns.push(updated_col);
126 } else {
127 new_columns.push(original_col.clone());
128 }
129 }
130
131 let updated_batch = RecordBatch::try_new(schema_ref.clone(), new_columns)?;
132 rewritten_batches.push(updated_batch);
133 } else {
134 rewritten_batches.push(batch);
135 }
136 }
137
138 if any_updated {
139 tx.delete_file(file.file_path.clone());
140 let combined_batch =
141 arrow::compute::concat_batches(&schema_ref, &rewritten_batches)?;
142 let file_id = uuid::Uuid::new_v4().to_string();
143 let new_file = writer.write_batch(&combined_batch, &file_id).await?;
144 tx.add_file(new_file);
145 }
146 }
147
148 Ok(tx)
149 }
150}