quill_sql/execution/physical_plan/
update.rs1use crate::catalog::{SchemaRef, UPDATE_OUTPUT_SCHEMA_REF};
2use crate::error::{QuillSQLError, QuillSQLResult};
3use crate::execution::{ExecutionContext, VolcanoExecutor};
4use crate::expression::Expr;
5use crate::storage::table_heap::TableIterator;
6use crate::storage::tuple::Tuple;
7use crate::transaction::LockMode;
8use crate::utils::scalar::ScalarValue;
9use crate::utils::table_ref::TableReference;
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicU32, Ordering};
12use std::sync::Mutex;
13
14#[derive(Debug)]
15pub struct PhysicalUpdate {
16 pub table: TableReference,
17 pub table_schema: SchemaRef,
18 pub assignments: HashMap<String, Expr>,
19 pub selection: Option<Expr>,
20
21 update_rows: AtomicU32,
22 table_iterator: Mutex<Option<TableIterator>>,
23}
24
25impl PhysicalUpdate {
26 pub fn new(
27 table: TableReference,
28 table_schema: SchemaRef,
29 assignments: HashMap<String, Expr>,
30 selection: Option<Expr>,
31 ) -> Self {
32 Self {
33 table,
34 table_schema,
35 assignments,
36 selection,
37 update_rows: AtomicU32::new(0),
38 table_iterator: Mutex::new(None),
39 }
40 }
41}
42
43impl VolcanoExecutor for PhysicalUpdate {
44 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
45 self.update_rows.store(0, Ordering::SeqCst);
46 context.ensure_writable(&self.table, "UPDATE")?;
47 let table_heap = context.table_heap(&self.table)?;
48 *self.table_iterator.lock().unwrap() = Some(TableIterator::new(table_heap.clone(), ..));
49 context
50 .lock_table(self.table.clone(), LockMode::IntentionExclusive)
51 .map_err(|_| {
52 QuillSQLError::Execution(format!(
53 "failed to acquire IX lock on table {}",
54 self.table
55 ))
56 })?;
57 Ok(())
58 }
59
60 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
61 let Some(table_iterator) = &mut *self.table_iterator.lock().unwrap() else {
63 return Err(QuillSQLError::Execution(
64 "table iterator not created".to_string(),
65 ));
66 };
67 let table_heap = context.table_heap(&self.table)?;
68
69 loop {
70 if let Some((rid, meta, tuple)) = table_iterator.next()? {
71 if meta.insert_txn_id == context.txn_id() && meta.insert_cid == context.command_id()
74 {
75 continue;
76 }
77
78 if let Some(selection) = &self.selection {
79 if !context.eval_predicate(selection, &tuple)? {
80 continue;
81 }
82 }
83
84 let Some((prev_meta, mut current_tuple)) =
85 context.prepare_row_for_write(&self.table, rid, &table_heap, &meta)?
86 else {
87 continue;
88 };
89 let prev_tuple = current_tuple.clone();
90 let mut eval_tuple = current_tuple.clone();
91
92 for (col_name, value_expr) in self.assignments.iter() {
94 let index = current_tuple.schema.index_of(None, col_name)?;
95 let col_datatype = current_tuple.schema.columns[index].data_type;
96 let new_value = context
100 .eval_expr(value_expr, &eval_tuple)?
101 .cast_to(&col_datatype)?;
102 current_tuple.data[index] = new_value.clone();
103 eval_tuple.data[index] = new_value;
104 }
105 context.apply_update(
106 &self.table,
107 table_heap.clone(),
108 rid,
109 current_tuple.clone(),
110 prev_meta,
111 prev_tuple,
112 )?;
113
114 self.update_rows.fetch_add(1, Ordering::SeqCst);
115 } else {
116 return if self.update_rows.load(Ordering::SeqCst) == 0 {
117 Ok(None)
118 } else {
119 let update_rows = self.update_rows.swap(0, Ordering::SeqCst);
120 Ok(Some(Tuple::new(
121 self.output_schema(),
122 vec![ScalarValue::Int32(Some(update_rows as i32))],
123 )))
124 };
125 }
126 }
127 }
128
129 fn output_schema(&self) -> SchemaRef {
130 UPDATE_OUTPUT_SCHEMA_REF.clone()
131 }
132}
133
134impl std::fmt::Display for PhysicalUpdate {
135 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136 write!(f, "Update")
137 }
138}