// quill_sql/execution/physical_plan/update.rs

1use crate::catalog::{SchemaRef, UPDATE_OUTPUT_SCHEMA_REF};
2use crate::error::{QuillSQLError, QuillSQLResult};
3use crate::execution::{ExecutionContext, VolcanoExecutor};
4use crate::expression::Expr;
5use crate::storage::table_heap::TableIterator;
6use crate::storage::tuple::Tuple;
7use crate::transaction::LockMode;
8use crate::utils::scalar::ScalarValue;
9use crate::utils::table_ref::TableReference;
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicU32, Ordering};
12use std::sync::Mutex;
13
14#[derive(Debug)]
15pub struct PhysicalUpdate {
16    pub table: TableReference,
17    pub table_schema: SchemaRef,
18    pub assignments: HashMap<String, Expr>,
19    pub selection: Option<Expr>,
20
21    update_rows: AtomicU32,
22    table_iterator: Mutex<Option<TableIterator>>,
23}
24
25impl PhysicalUpdate {
26    pub fn new(
27        table: TableReference,
28        table_schema: SchemaRef,
29        assignments: HashMap<String, Expr>,
30        selection: Option<Expr>,
31    ) -> Self {
32        Self {
33            table,
34            table_schema,
35            assignments,
36            selection,
37            update_rows: AtomicU32::new(0),
38            table_iterator: Mutex::new(None),
39        }
40    }
41}
42
43impl VolcanoExecutor for PhysicalUpdate {
44    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
45        self.update_rows.store(0, Ordering::SeqCst);
46        context.ensure_writable(&self.table, "UPDATE")?;
47        let table_heap = context.table_heap(&self.table)?;
48        *self.table_iterator.lock().unwrap() = Some(TableIterator::new(table_heap.clone(), ..));
49        context
50            .lock_table(self.table.clone(), LockMode::IntentionExclusive)
51            .map_err(|_| {
52                QuillSQLError::Execution(format!(
53                    "failed to acquire IX lock on table {}",
54                    self.table
55                ))
56            })?;
57        Ok(())
58    }
59
60    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
61        // TODO may scan index
62        let Some(table_iterator) = &mut *self.table_iterator.lock().unwrap() else {
63            return Err(QuillSQLError::Execution(
64                "table iterator not created".to_string(),
65            ));
66        };
67        let table_heap = context.table_heap(&self.table)?;
68
69        loop {
70            if let Some((rid, meta, tuple)) = table_iterator.next()? {
71                // Skip versions that were created by this command so we do not
72                // immediately reprocess the freshly inserted MVCC tuple and loop forever.
73                if meta.insert_txn_id == context.txn_id() && meta.insert_cid == context.command_id()
74                {
75                    continue;
76                }
77
78                if let Some(selection) = &self.selection {
79                    if !context.eval_predicate(selection, &tuple)? {
80                        continue;
81                    }
82                }
83
84                let Some((prev_meta, mut current_tuple)) =
85                    context.prepare_row_for_write(&self.table, rid, &table_heap, &meta)?
86                else {
87                    continue;
88                };
89                let prev_tuple = current_tuple.clone();
90                let mut eval_tuple = current_tuple.clone();
91
92                // update tuple data
93                for (col_name, value_expr) in self.assignments.iter() {
94                    let index = current_tuple.schema.index_of(None, col_name)?;
95                    let col_datatype = current_tuple.schema.columns[index].data_type;
96                    // use the updated value for subsequent expressions in this update
97                    // e.g., SET a = 1, b = a + 1
98                    // should set b to 2
99                    let new_value = context
100                        .eval_expr(value_expr, &eval_tuple)?
101                        .cast_to(&col_datatype)?;
102                    current_tuple.data[index] = new_value.clone();
103                    eval_tuple.data[index] = new_value;
104                }
105                context.apply_update(
106                    &self.table,
107                    table_heap.clone(),
108                    rid,
109                    current_tuple.clone(),
110                    prev_meta,
111                    prev_tuple,
112                )?;
113
114                self.update_rows.fetch_add(1, Ordering::SeqCst);
115            } else {
116                return if self.update_rows.load(Ordering::SeqCst) == 0 {
117                    Ok(None)
118                } else {
119                    let update_rows = self.update_rows.swap(0, Ordering::SeqCst);
120                    Ok(Some(Tuple::new(
121                        self.output_schema(),
122                        vec![ScalarValue::Int32(Some(update_rows as i32))],
123                    )))
124                };
125            }
126        }
127    }
128
129    fn output_schema(&self) -> SchemaRef {
130        UPDATE_OUTPUT_SCHEMA_REF.clone()
131    }
132}
133
134impl std::fmt::Display for PhysicalUpdate {
135    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
136        write!(f, "Update")
137    }
138}