quill_sql/execution/physical_plan/
update.rs

1use crate::catalog::{SchemaRef, UPDATE_OUTPUT_SCHEMA_REF};
2use crate::error::{QuillSQLError, QuillSQLResult};
3use crate::execution::{ExecutionContext, VolcanoExecutor};
4use crate::expression::{Expr, ExprTrait};
5use crate::storage::table_heap::TableIterator;
6use crate::storage::tuple::{Tuple, EMPTY_TUPLE};
7use crate::transaction::LockMode;
8use crate::utils::scalar::ScalarValue;
9use crate::utils::table_ref::TableReference;
10use std::collections::HashMap;
11use std::collections::HashSet;
12use std::sync::atomic::{AtomicU32, Ordering};
13use std::sync::Mutex;
14
15#[derive(Debug)]
16pub struct PhysicalUpdate {
17    pub table: TableReference,
18    pub table_schema: SchemaRef,
19    pub assignments: HashMap<String, Expr>,
20    pub selection: Option<Expr>,
21
22    update_rows: AtomicU32,
23    table_iterator: Mutex<Option<TableIterator>>,
24}
25
26impl PhysicalUpdate {
27    pub fn new(
28        table: TableReference,
29        table_schema: SchemaRef,
30        assignments: HashMap<String, Expr>,
31        selection: Option<Expr>,
32    ) -> Self {
33        Self {
34            table,
35            table_schema,
36            assignments,
37            selection,
38            update_rows: AtomicU32::new(0),
39            table_iterator: Mutex::new(None),
40        }
41    }
42}
43
44impl VolcanoExecutor for PhysicalUpdate {
45    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
46        self.update_rows.store(0, Ordering::SeqCst);
47        context.ensure_writable(&self.table, "UPDATE")?;
48        let table_heap = context.catalog.table_heap(&self.table)?;
49        *self.table_iterator.lock().unwrap() = Some(TableIterator::new(table_heap.clone(), ..));
50        context
51            .txn_mgr
52            .acquire_table_lock(
53                context.txn,
54                self.table.clone(),
55                LockMode::IntentionExclusive,
56            )
57            .map_err(|_| {
58                QuillSQLError::Execution(format!(
59                    "failed to acquire IX lock on table {}",
60                    self.table
61                ))
62            })?;
63        Ok(())
64    }
65
66    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
67        // TODO may scan index
68        let Some(table_iterator) = &mut *self.table_iterator.lock().unwrap() else {
69            return Err(QuillSQLError::Execution(
70                "table iterator not created".to_string(),
71            ));
72        };
73        let table_heap = context.catalog.table_heap(&self.table)?;
74
75        loop {
76            if let Some((rid, mut tuple)) = table_iterator.next()? {
77                if let Some(selection) = &self.selection {
78                    if !selection.evaluate(&tuple)?.as_boolean()?.unwrap_or(false) {
79                        continue;
80                    }
81                }
82
83                let meta = table_heap.tuple_meta(rid)?;
84                if !Tuple::is_visible(&meta, context.txn.id()) {
85                    continue;
86                }
87
88                context.lock_row_exclusive(&self.table, rid)?;
89
90                let prev_tuple = tuple.clone();
91                let prev_meta = meta;
92
93                // update tuple data
94                for (col_name, value_expr) in self.assignments.iter() {
95                    let index = tuple.schema.index_of(None, col_name)?;
96                    let col_datatype = tuple.schema.columns[index].data_type;
97                    let new_value = value_expr.evaluate(&EMPTY_TUPLE)?.cast_to(&col_datatype)?;
98                    tuple.data[index] = new_value;
99                }
100                table_heap.update_tuple(rid, tuple.clone())?;
101
102                let mut new_keys = Vec::new();
103                let mut old_keys = Vec::new();
104                let changed_cols: HashSet<String> = self
105                    .assignments
106                    .keys()
107                    .map(|s| s.to_ascii_lowercase())
108                    .collect();
109                let indexes = context.catalog.table_indexes(&self.table)?;
110                for index in indexes {
111                    // Skip indexes whose key columns are not affected by this update
112                    let affected = index
113                        .key_schema
114                        .columns
115                        .iter()
116                        .any(|c| changed_cols.contains(&c.name.to_ascii_lowercase()));
117                    if !affected {
118                        continue;
119                    }
120
121                    let old_key = prev_tuple
122                        .project_with_schema(index.key_schema.clone())
123                        .ok();
124                    let new_key = tuple.project_with_schema(index.key_schema.clone()).ok();
125
126                    // Skip maintenance when key values are unchanged
127                    if let (Some(ref ok), Some(ref nk)) = (&old_key, &new_key) {
128                        if ok.data == nk.data {
129                            continue;
130                        }
131                    }
132
133                    if let Some(old_key_tuple) = old_key {
134                        index.delete(&old_key_tuple)?;
135                        old_keys.push((index.clone(), old_key_tuple));
136                    }
137                    if let Some(new_key_tuple) = new_key {
138                        index.insert(&new_key_tuple, rid)?;
139                        new_keys.push((index.clone(), new_key_tuple));
140                    }
141                }
142
143                context.txn.push_update_undo(
144                    table_heap.clone(),
145                    rid,
146                    prev_meta,
147                    prev_tuple,
148                    new_keys,
149                    old_keys,
150                );
151
152                self.update_rows.fetch_add(1, Ordering::SeqCst);
153            } else {
154                return if self.update_rows.load(Ordering::SeqCst) == 0 {
155                    Ok(None)
156                } else {
157                    let update_rows = self.update_rows.swap(0, Ordering::SeqCst);
158                    Ok(Some(Tuple::new(
159                        self.output_schema(),
160                        vec![ScalarValue::Int32(Some(update_rows as i32))],
161                    )))
162                };
163            }
164        }
165    }
166
167    fn output_schema(&self) -> SchemaRef {
168        UPDATE_OUTPUT_SCHEMA_REF.clone()
169    }
170}
171
172impl std::fmt::Display for PhysicalUpdate {
173    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        write!(f, "Update")
175    }
176}