quill_sql/execution/physical_plan/
update.rs

1//! UPDATE operator that rewrites tuples and keeps heap/index metadata in sync.
2
3use crate::catalog::{SchemaRef, UPDATE_OUTPUT_SCHEMA_REF};
4use crate::error::{QuillSQLError, QuillSQLResult};
5use crate::execution::physical_plan::{resolve_table_binding, stream_not_ready};
6use crate::execution::{ExecutionContext, VolcanoExecutor};
7use crate::expression::Expr;
8use crate::storage::{
9    engine::{TableBinding, TupleStream},
10    tuple::Tuple,
11};
12use crate::transaction::LockMode;
13use crate::utils::scalar::ScalarValue;
14use crate::utils::table_ref::TableReference;
15use std::cell::RefCell;
16use std::collections::HashMap;
17use std::sync::atomic::{AtomicU32, Ordering};
18use std::sync::OnceLock;
19
20pub struct PhysicalUpdate {
21    pub table: TableReference,
22    pub table_schema: SchemaRef,
23    pub assignments: HashMap<String, Expr>,
24    pub selection: Option<Expr>,
25
26    update_rows: AtomicU32,
27    table_iterator: RefCell<Option<Box<dyn TupleStream>>>,
28    table_binding: OnceLock<TableBinding>,
29}
30
31impl PhysicalUpdate {
32    pub fn new(
33        table: TableReference,
34        table_schema: SchemaRef,
35        assignments: HashMap<String, Expr>,
36        selection: Option<Expr>,
37    ) -> Self {
38        Self {
39            table,
40            table_schema,
41            assignments,
42            selection,
43            update_rows: AtomicU32::new(0),
44            table_iterator: RefCell::new(None),
45            table_binding: OnceLock::new(),
46        }
47    }
48}
49
50impl VolcanoExecutor for PhysicalUpdate {
51    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
52        self.update_rows.store(0, Ordering::SeqCst);
53        context.txn_ctx().ensure_writable(&self.table, "UPDATE")?;
54        let binding = resolve_table_binding(&self.table_binding, context, &self.table)?;
55        let stream = binding.scan()?;
56        self.table_iterator.replace(Some(stream));
57        context
58            .txn_ctx_mut()
59            .lock_table(self.table.clone(), LockMode::IntentionExclusive)
60            .map_err(|_| {
61                QuillSQLError::Execution(format!(
62                    "failed to acquire IX lock on table {}",
63                    self.table
64                ))
65            })?;
66        Ok(())
67    }
68
69    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
70        // TODO may scan index
71        loop {
72            let next_entry = {
73                let mut guard = self.table_iterator.borrow_mut();
74                let stream = guard.as_mut().ok_or_else(|| stream_not_ready("Update"))?;
75                stream.next()?
76            };
77            if let Some((rid, meta, tuple)) = next_entry {
78                // Skip versions that were created by this command so we do not
79                // immediately reprocess the freshly inserted MVCC tuple and loop forever.
80                if meta.insert_txn_id == context.txn_ctx().txn_id()
81                    && meta.insert_cid == context.txn_ctx().command_id()
82                {
83                    continue;
84                }
85
86                if let Some(selection) = &self.selection {
87                    if !context.eval_predicate(selection, &tuple)? {
88                        continue;
89                    }
90                }
91
92                let binding = resolve_table_binding(&self.table_binding, context, &self.table)?;
93                let Some((prev_meta, mut current_tuple)) =
94                    binding.prepare_row_for_write(context.txn_ctx_mut(), rid, &meta)?
95                else {
96                    continue;
97                };
98                let prev_tuple = current_tuple.clone();
99                let mut eval_tuple = current_tuple.clone();
100
101                // update tuple data
102                for (col_name, value_expr) in self.assignments.iter() {
103                    let index = current_tuple.schema.index_of(None, col_name)?;
104                    let col_datatype = current_tuple.schema.columns[index].data_type;
105                    // use the updated value for subsequent expressions in this update
106                    // e.g., SET a = 1, b = a + 1
107                    // should set b to 2
108                    let new_value = context
109                        .eval_expr(value_expr, &eval_tuple)?
110                        .cast_to(&col_datatype)?;
111                    current_tuple.data[index] = new_value.clone();
112                    eval_tuple.data[index] = new_value;
113                }
114                binding.update(
115                    context.txn_ctx_mut(),
116                    rid,
117                    current_tuple.clone(),
118                    prev_meta,
119                    prev_tuple,
120                )?;
121
122                self.update_rows.fetch_add(1, Ordering::SeqCst);
123            } else {
124                return if self.update_rows.load(Ordering::SeqCst) == 0 {
125                    Ok(None)
126                } else {
127                    let update_rows = self.update_rows.swap(0, Ordering::SeqCst);
128                    Ok(Some(Tuple::new(
129                        self.output_schema(),
130                        vec![ScalarValue::Int32(Some(update_rows as i32))],
131                    )))
132                };
133            }
134        }
135    }
136
137    fn output_schema(&self) -> SchemaRef {
138        UPDATE_OUTPUT_SCHEMA_REF.clone()
139    }
140}
141
142impl std::fmt::Display for PhysicalUpdate {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        write!(f, "Update")
145    }
146}
147
148impl std::fmt::Debug for PhysicalUpdate {
149    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
150        f.debug_struct("PhysicalUpdate")
151            .field("table", &self.table)
152            .field("table_schema", &self.table_schema)
153            .field("assignments", &self.assignments)
154            .field("selection", &self.selection)
155            .field("update_rows", &self.update_rows)
156            .finish()
157    }
158}