// quill_sql/execution/physical_plan/update.rs
use crate::catalog::{SchemaRef, UPDATE_OUTPUT_SCHEMA_REF};
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::execution::{ExecutionContext, VolcanoExecutor};
use crate::expression::{Expr, ExprTrait};
use crate::storage::table_heap::TableIterator;
use crate::storage::tuple::{Tuple, EMPTY_TUPLE};
use crate::transaction::LockMode;
use crate::utils::scalar::ScalarValue;
use crate::utils::table_ref::TableReference;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Mutex;
14
15#[derive(Debug)]
16pub struct PhysicalUpdate {
17 pub table: TableReference,
18 pub table_schema: SchemaRef,
19 pub assignments: HashMap<String, Expr>,
20 pub selection: Option<Expr>,
21
22 update_rows: AtomicU32,
23 table_iterator: Mutex<Option<TableIterator>>,
24}
25
26impl PhysicalUpdate {
27 pub fn new(
28 table: TableReference,
29 table_schema: SchemaRef,
30 assignments: HashMap<String, Expr>,
31 selection: Option<Expr>,
32 ) -> Self {
33 Self {
34 table,
35 table_schema,
36 assignments,
37 selection,
38 update_rows: AtomicU32::new(0),
39 table_iterator: Mutex::new(None),
40 }
41 }
42}
43
44impl VolcanoExecutor for PhysicalUpdate {
45 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
46 self.update_rows.store(0, Ordering::SeqCst);
47 context.ensure_writable(&self.table, "UPDATE")?;
48 let table_heap = context.catalog.table_heap(&self.table)?;
49 *self.table_iterator.lock().unwrap() = Some(TableIterator::new(table_heap.clone(), ..));
50 context
51 .txn_mgr
52 .acquire_table_lock(
53 context.txn,
54 self.table.clone(),
55 LockMode::IntentionExclusive,
56 )
57 .map_err(|_| {
58 QuillSQLError::Execution(format!(
59 "failed to acquire IX lock on table {}",
60 self.table
61 ))
62 })?;
63 Ok(())
64 }
65
66 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
67 let Some(table_iterator) = &mut *self.table_iterator.lock().unwrap() else {
69 return Err(QuillSQLError::Execution(
70 "table iterator not created".to_string(),
71 ));
72 };
73 let table_heap = context.catalog.table_heap(&self.table)?;
74
75 loop {
76 if let Some((rid, mut tuple)) = table_iterator.next()? {
77 if let Some(selection) = &self.selection {
78 if !selection.evaluate(&tuple)?.as_boolean()?.unwrap_or(false) {
79 continue;
80 }
81 }
82
83 let meta = table_heap.tuple_meta(rid)?;
84 if !Tuple::is_visible(&meta, context.txn.id()) {
85 continue;
86 }
87
88 context.lock_row_exclusive(&self.table, rid)?;
89
90 let prev_tuple = tuple.clone();
91 let prev_meta = meta;
92
93 for (col_name, value_expr) in self.assignments.iter() {
95 let index = tuple.schema.index_of(None, col_name)?;
96 let col_datatype = tuple.schema.columns[index].data_type;
97 let new_value = value_expr.evaluate(&EMPTY_TUPLE)?.cast_to(&col_datatype)?;
98 tuple.data[index] = new_value;
99 }
100 table_heap.update_tuple(rid, tuple.clone())?;
101
102 let mut new_keys = Vec::new();
103 let mut old_keys = Vec::new();
104 let changed_cols: HashSet<String> = self
105 .assignments
106 .keys()
107 .map(|s| s.to_ascii_lowercase())
108 .collect();
109 let indexes = context.catalog.table_indexes(&self.table)?;
110 for index in indexes {
111 let affected = index
113 .key_schema
114 .columns
115 .iter()
116 .any(|c| changed_cols.contains(&c.name.to_ascii_lowercase()));
117 if !affected {
118 continue;
119 }
120
121 let old_key = prev_tuple
122 .project_with_schema(index.key_schema.clone())
123 .ok();
124 let new_key = tuple.project_with_schema(index.key_schema.clone()).ok();
125
126 if let (Some(ref ok), Some(ref nk)) = (&old_key, &new_key) {
128 if ok.data == nk.data {
129 continue;
130 }
131 }
132
133 if let Some(old_key_tuple) = old_key {
134 index.delete(&old_key_tuple)?;
135 old_keys.push((index.clone(), old_key_tuple));
136 }
137 if let Some(new_key_tuple) = new_key {
138 index.insert(&new_key_tuple, rid)?;
139 new_keys.push((index.clone(), new_key_tuple));
140 }
141 }
142
143 context.txn.push_update_undo(
144 table_heap.clone(),
145 rid,
146 prev_meta,
147 prev_tuple,
148 new_keys,
149 old_keys,
150 );
151
152 self.update_rows.fetch_add(1, Ordering::SeqCst);
153 } else {
154 return if self.update_rows.load(Ordering::SeqCst) == 0 {
155 Ok(None)
156 } else {
157 let update_rows = self.update_rows.swap(0, Ordering::SeqCst);
158 Ok(Some(Tuple::new(
159 self.output_schema(),
160 vec![ScalarValue::Int32(Some(update_rows as i32))],
161 )))
162 };
163 }
164 }
165 }
166
167 fn output_schema(&self) -> SchemaRef {
168 UPDATE_OUTPUT_SCHEMA_REF.clone()
169 }
170}
171
172impl std::fmt::Display for PhysicalUpdate {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 write!(f, "Update")
175 }
176}