Skip to main content

supertable_core/
update.rs

1//! # SuperTable Row-Level Updates
2//!
3//! This module provides tools for performing row-level update operations.
4//! It currently supports the Copy-on-Write (CoW) strategy.
5
6use crate::delete::RowLevelStrategy;
7use crate::table::Table;
8use crate::transaction::Transaction;
9use anyhow::Result;
10use arrow::array::RecordBatch;
11use datafusion::logical_expr::Expr;
12use datafusion::physical_expr::create_physical_expr;
13use datafusion::physical_expr::execution_props::ExecutionProps;
14use datafusion::prelude::SessionContext;
15use std::collections::HashMap;
16
17/// Builder for UPDATE operations.
18pub struct UpdateBuilder {
19    table: Table,
20    filter: Expr,
21    assignments: HashMap<String, Expr>,
22    strategy: RowLevelStrategy,
23}
24
25impl UpdateBuilder {
26    pub fn new(table: Table, filter: Expr) -> Self {
27        Self {
28            table,
29            filter,
30            assignments: HashMap::new(),
31            strategy: RowLevelStrategy::CopyOnWrite,
32        }
33    }
34
35    pub fn set(mut self, column: impl Into<String>, value: Expr) -> Self {
36        self.assignments.insert(column.into(), value);
37        self
38    }
39
40    pub fn with_strategy(mut self, strategy: RowLevelStrategy) -> Self {
41        self.strategy = strategy;
42        self
43    }
44
45    /// Executes the update operation and returns a Transaction.
46    pub async fn execute(self) -> Result<Transaction> {
47        match self.strategy {
48            RowLevelStrategy::CopyOnWrite => self.execute_cow().await,
49            RowLevelStrategy::MergeOnRead => Err(anyhow::anyhow!(
50                "Merge-on-Read for UPDATE is not yet implemented"
51            )),
52        }
53    }
54
55    async fn execute_cow(&self) -> Result<Transaction> {
56        let snapshot = self
57            .table
58            .metadata
59            .current_snapshot()
60            .ok_or_else(|| anyhow::anyhow!("No current snapshot to update"))?;
61        let all_files = snapshot.all_data_files(&self.table.storage).await?;
62
63        let schema_ref = self.table.metadata.current_schema().to_arrow_schema_ref();
64        let df_schema = self.table.metadata.current_schema().to_df_schema()?;
65        let _context = SessionContext::new();
66
67        // Create physical expression for the filter
68        let physical_filter =
69            create_physical_expr(&self.filter, &df_schema, &ExecutionProps::new())?;
70
71        // Create physical expressions for each assignment
72        let mut physical_assignments = HashMap::new();
73        for (col, expr) in &self.assignments {
74            let phys_expr = create_physical_expr(expr, &df_schema, &ExecutionProps::new())?;
75            physical_assignments.insert(col.clone(), phys_expr);
76        }
77
78        let mut tx = self.table.new_transaction();
79        let reader = crate::reader::TableReader::new(self.table.storage.clone());
80        let writer = crate::writer::TableWriter::new(
81            self.table.storage.clone(),
82            self.table.metadata.location.clone(),
83            schema_ref.clone(),
84        );
85
86        for file in all_files {
87            if file.content != crate::manifest::FileContent::Data {
88                continue;
89            }
90
91            let batches = reader.read_file(&file.file_path).await?;
92            let mut rewritten_batches = Vec::new();
93            let mut any_updated = false;
94
95            for batch in batches {
96                let filter_result = physical_filter.evaluate(&batch)?;
97                let filter_array = filter_result.into_array(batch.num_rows())?;
98                let filter_boolean = filter_array
99                    .as_any()
100                    .downcast_ref::<arrow::array::BooleanArray>()
101                    .ok_or_else(|| anyhow::anyhow!("Filter must return boolean"))?;
102
103                if filter_boolean.true_count() > 0 {
104                    any_updated = true;
105
106                    // For matching rows, apply assignments.
107                    // This is complex to do row-by-row on Arrow arrays.
108                    // A better way is to evaluate the assignment expr on the whole batch,
109                    // then use the filter_boolean to 'zip' (choose) between old and new values.
110
111                    let mut new_columns = Vec::new();
112                    for (i, field) in schema_ref.fields().iter().enumerate() {
113                        let original_col = batch.column(i);
114
115                        if let Some(phys_assignment) = physical_assignments.get(field.name()) {
116                            let assigned_val = phys_assignment.evaluate(&batch)?;
117                            let assigned_array = assigned_val.into_array(batch.num_rows())?;
118
119                            // Zip: if filter is true, use assigned_array, else use original_col
120                            let updated_col = arrow::compute::kernels::zip::zip(
121                                filter_boolean,
122                                &assigned_array,
123                                original_col,
124                            )?;
125                            new_columns.push(updated_col);
126                        } else {
127                            new_columns.push(original_col.clone());
128                        }
129                    }
130
131                    let updated_batch = RecordBatch::try_new(schema_ref.clone(), new_columns)?;
132                    rewritten_batches.push(updated_batch);
133                } else {
134                    rewritten_batches.push(batch);
135                }
136            }
137
138            if any_updated {
139                tx.delete_file(file.file_path.clone());
140                let combined_batch =
141                    arrow::compute::concat_batches(&schema_ref, &rewritten_batches)?;
142                let file_id = uuid::Uuid::new_v4().to_string();
143                let new_file = writer.write_batch(&combined_batch, &file_id).await?;
144                tx.add_file(new_file);
145            }
146        }
147
148        Ok(tx)
149    }
150}