quill_sql/execution/physical_plan/
update.rs1use crate::catalog::{SchemaRef, UPDATE_OUTPUT_SCHEMA_REF};
4use crate::error::{QuillSQLError, QuillSQLResult};
5use crate::execution::physical_plan::{resolve_table_binding, stream_not_ready};
6use crate::execution::{ExecutionContext, VolcanoExecutor};
7use crate::expression::Expr;
8use crate::storage::{
9 engine::{TableBinding, TupleStream},
10 tuple::Tuple,
11};
12use crate::transaction::LockMode;
13use crate::utils::scalar::ScalarValue;
14use crate::utils::table_ref::TableReference;
15use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::sync::OnceLock;
19
20pub struct PhysicalUpdate {
21 pub table: TableReference,
22 pub table_schema: SchemaRef,
23 pub assignments: HashMap<String, Expr>,
24 pub selection: Option<Expr>,
25
26 update_rows: AtomicU32,
27 table_iterator: RefCell<Option<Box<dyn TupleStream>>>,
28 table_binding: OnceLock<TableBinding>,
29}
30
31impl PhysicalUpdate {
32 pub fn new(
33 table: TableReference,
34 table_schema: SchemaRef,
35 assignments: HashMap<String, Expr>,
36 selection: Option<Expr>,
37 ) -> Self {
38 Self {
39 table,
40 table_schema,
41 assignments,
42 selection,
43 update_rows: AtomicU32::new(0),
44 table_iterator: RefCell::new(None),
45 table_binding: OnceLock::new(),
46 }
47 }
48}
49
50impl VolcanoExecutor for PhysicalUpdate {
51 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
52 self.update_rows.store(0, Ordering::SeqCst);
53 context.txn_ctx().ensure_writable(&self.table, "UPDATE")?;
54 let binding = resolve_table_binding(&self.table_binding, context, &self.table)?;
55 let stream = binding.scan()?;
56 self.table_iterator.replace(Some(stream));
57 context
58 .txn_ctx_mut()
59 .lock_table(self.table.clone(), LockMode::IntentionExclusive)
60 .map_err(|_| {
61 QuillSQLError::Execution(format!(
62 "failed to acquire IX lock on table {}",
63 self.table
64 ))
65 })?;
66 Ok(())
67 }
68
69 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
70 loop {
72 let next_entry = {
73 let mut guard = self.table_iterator.borrow_mut();
74 let stream = guard.as_mut().ok_or_else(|| stream_not_ready("Update"))?;
75 stream.next()?
76 };
77 if let Some((rid, meta, tuple)) = next_entry {
78 if meta.insert_txn_id == context.txn_ctx().txn_id()
81 && meta.insert_cid == context.txn_ctx().command_id()
82 {
83 continue;
84 }
85
86 if let Some(selection) = &self.selection {
87 if !context.eval_predicate(selection, &tuple)? {
88 continue;
89 }
90 }
91
92 let binding = resolve_table_binding(&self.table_binding, context, &self.table)?;
93 let Some((prev_meta, mut current_tuple)) =
94 binding.prepare_row_for_write(context.txn_ctx_mut(), rid, &meta)?
95 else {
96 continue;
97 };
98 let prev_tuple = current_tuple.clone();
99 let mut eval_tuple = current_tuple.clone();
100
101 for (col_name, value_expr) in self.assignments.iter() {
103 let index = current_tuple.schema.index_of(None, col_name)?;
104 let col_datatype = current_tuple.schema.columns[index].data_type;
105 let new_value = context
109 .eval_expr(value_expr, &eval_tuple)?
110 .cast_to(&col_datatype)?;
111 current_tuple.data[index] = new_value.clone();
112 eval_tuple.data[index] = new_value;
113 }
114 binding.update(
115 context.txn_ctx_mut(),
116 rid,
117 current_tuple.clone(),
118 prev_meta,
119 prev_tuple,
120 )?;
121
122 self.update_rows.fetch_add(1, Ordering::SeqCst);
123 } else {
124 return if self.update_rows.load(Ordering::SeqCst) == 0 {
125 Ok(None)
126 } else {
127 let update_rows = self.update_rows.swap(0, Ordering::SeqCst);
128 Ok(Some(Tuple::new(
129 self.output_schema(),
130 vec![ScalarValue::Int32(Some(update_rows as i32))],
131 )))
132 };
133 }
134 }
135 }
136
137 fn output_schema(&self) -> SchemaRef {
138 UPDATE_OUTPUT_SCHEMA_REF.clone()
139 }
140}
141
142impl std::fmt::Display for PhysicalUpdate {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 write!(f, "Update")
145 }
146}
147
148impl std::fmt::Debug for PhysicalUpdate {
149 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150 f.debug_struct("PhysicalUpdate")
151 .field("table", &self.table)
152 .field("table_schema", &self.table_schema)
153 .field("assignments", &self.assignments)
154 .field("selection", &self.selection)
155 .field("update_rows", &self.update_rows)
156 .finish()
157 }
158}