vibesql_executor/update/
mod.rs

1//! UPDATE statement execution
2//!
3//! This module provides UPDATE statement execution with the following architecture:
4//!
5//! - `row_selector`: Handles WHERE clause evaluation and primary key index optimization
6//! - `value_updater`: Applies assignment expressions to rows
7//! - `constraints`: Validates NOT NULL, PRIMARY KEY, UNIQUE, and CHECK constraints
8//! - `foreign_keys`: Validates foreign key constraints and child references
9//!
10//! The main `UpdateExecutor` orchestrates these components to implement SQL's two-phase
11//! update semantics: first collect all updates evaluating against original rows, then
12//! apply all updates atomically.
13//!
14//! ## Performance Optimizations
15//!
16//! The executor includes a fast path for single-row primary key updates that:
17//! - Skips trigger checks when no triggers exist for the table
18//! - Avoids schema cloning
19//! - Uses single-pass execution instead of two-phase
20//! - Minimizes allocations
21
22mod constraints;
23mod foreign_keys;
24mod row_selector;
25mod value_updater;
26
27use constraints::ConstraintValidator;
28use foreign_keys::ForeignKeyValidator;
29use row_selector::RowSelector;
30use value_updater::ValueUpdater;
31use vibesql_ast::{BinaryOperator, Expression, UpdateStmt};
32use vibesql_storage::statistics::CostEstimator;
33use vibesql_storage::Database;
34
35use crate::{
36    dml_cost::DmlOptimizer, errors::ExecutorError, evaluator::ExpressionEvaluator,
37    privilege_checker::PrivilegeChecker,
38};
39
40/// Executor for UPDATE statements
41pub struct UpdateExecutor;
42
43impl UpdateExecutor {
44    /// Execute an UPDATE statement
45    ///
46    /// # Arguments
47    ///
48    /// * `stmt` - The UPDATE statement AST node
49    /// * `database` - The database to update
50    ///
51    /// # Returns
52    ///
53    /// Number of rows updated or error
54    ///
55    /// # Examples
56    ///
57    /// ```
58    /// use vibesql_ast::{Assignment, Expression, UpdateStmt};
59    /// use vibesql_catalog::{ColumnSchema, TableSchema};
60    /// use vibesql_executor::UpdateExecutor;
61    /// use vibesql_storage::Database;
62    /// use vibesql_types::{DataType, SqlValue};
63    ///
64    /// let mut db = Database::new();
65    ///
66    /// // Create table
67    /// let schema = TableSchema::new(
68    ///     "employees".to_string(),
69    ///     vec![
70    ///         ColumnSchema::new("id".to_string(), DataType::Integer, false),
71    ///         ColumnSchema::new("salary".to_string(), DataType::Integer, false),
72    ///     ],
73    /// );
74    /// db.create_table(schema).unwrap();
75    ///
76    /// // Insert a row
77    /// db.insert_row(
78    ///     "employees",
79    ///     vibesql_storage::Row::new(vec![SqlValue::Integer(1), SqlValue::Integer(50000)]),
80    /// )
81    /// .unwrap();
82    ///
83    /// // Update salary
84    /// let stmt = UpdateStmt {
85    ///     table_name: "employees".to_string(),
86    ///     assignments: vec![Assignment {
87    ///         column: "salary".to_string(),
88    ///         value: Expression::Literal(SqlValue::Integer(60000)),
89    ///     }],
90    ///     where_clause: None,
91    /// };
92    ///
93    /// let count = UpdateExecutor::execute(&stmt, &mut db).unwrap();
94    /// assert_eq!(count, 1);
95    /// ```
96    pub fn execute(stmt: &UpdateStmt, database: &mut Database) -> Result<usize, ExecutorError> {
97        Self::execute_internal(stmt, database, None, None, None)
98    }
99
100    /// Execute an UPDATE statement with procedural context
101    /// Supports procedural variables in SET and WHERE clauses
102    pub fn execute_with_procedural_context(
103        stmt: &UpdateStmt,
104        database: &mut Database,
105        procedural_context: &crate::procedural::ExecutionContext,
106    ) -> Result<usize, ExecutorError> {
107        Self::execute_internal(stmt, database, None, Some(procedural_context), None)
108    }
109
110    /// Execute an UPDATE statement with trigger context
111    /// This allows UPDATE statements within trigger bodies to reference OLD/NEW pseudo-variables
112    pub fn execute_with_trigger_context(
113        stmt: &UpdateStmt,
114        database: &mut Database,
115        trigger_context: &crate::trigger_execution::TriggerContext,
116    ) -> Result<usize, ExecutorError> {
117        Self::execute_internal(stmt, database, None, None, Some(trigger_context))
118    }
119
120    /// Execute an UPDATE statement with optional pre-fetched schema
121    ///
122    /// This method allows cursor-level schema caching to reduce redundant catalog lookups.
123    /// If schema is provided, skips the catalog lookup step.
124    ///
125    /// # Arguments
126    ///
127    /// * `stmt` - The UPDATE statement AST node
128    /// * `database` - The database to update
129    /// * `schema` - Optional pre-fetched schema (from cursor cache)
130    ///
131    /// # Returns
132    ///
133    /// Number of rows updated or error
134    pub fn execute_with_schema(
135        stmt: &UpdateStmt,
136        database: &mut Database,
137        schema: Option<&vibesql_catalog::TableSchema>,
138    ) -> Result<usize, ExecutorError> {
139        Self::execute_internal(stmt, database, schema, None, None)
140    }
141
142    /// Internal implementation supporting both schema caching, procedural context, and trigger context
143    fn execute_internal(
144        stmt: &UpdateStmt,
145        database: &mut Database,
146        schema: Option<&vibesql_catalog::TableSchema>,
147        procedural_context: Option<&crate::procedural::ExecutionContext>,
148        trigger_context: Option<&crate::trigger_execution::TriggerContext>,
149    ) -> Result<usize, ExecutorError> {
150        // Check UPDATE privilege on the table
151        PrivilegeChecker::check_update(database, &stmt.table_name)?;
152
153        // Step 1: Get table schema - clone it to avoid borrow issues
154        // We need owned schema because we take mutable references to database later
155        let schema_owned: vibesql_catalog::TableSchema = if let Some(s) = schema {
156            s.clone()
157        } else {
158            database
159                .catalog
160                .get_table(&stmt.table_name)
161                .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?
162                .clone()
163        };
164        let schema = &schema_owned;
165
166        // Check if table has UPDATE triggers (check once, use multiple times)
167        let has_triggers = trigger_context.is_none()
168            && database
169                .catalog
170                .get_triggers_for_table(
171                    &stmt.table_name,
172                    Some(vibesql_ast::TriggerEvent::Update(None)),
173                )
174                .next()
175                .is_some();
176
177        // Try fast path for simple single-row PK updates without triggers
178        // Conditions: no triggers, no procedural context, simple WHERE pk = value
179        if !has_triggers && procedural_context.is_none() && trigger_context.is_none() {
180            if let Some(result) = Self::try_fast_path_update(stmt, database, schema)? {
181                return Ok(result);
182            }
183        }
184
185        // Fire BEFORE STATEMENT triggers only if triggers exist
186        if has_triggers {
187            crate::TriggerFirer::execute_before_statement_triggers(
188                database,
189                &stmt.table_name,
190                vibesql_ast::TriggerEvent::Update(None),
191            )?;
192        }
193
194        // Get PK indices without cloning entire schema
195        let pk_indices = schema.get_primary_key_indices();
196
197        // Step 2: Get table from storage (for reading rows)
198        let table = database
199            .get_table(&stmt.table_name)
200            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
201
202        // Step 3: Create expression evaluator with database reference for subquery support
203        //         and optional procedural/trigger context for variable resolution
204        let evaluator = if let Some(ctx) = trigger_context {
205            // Trigger context takes precedence (trigger statements can't have procedural context)
206            ExpressionEvaluator::with_trigger_context(schema, database, ctx)
207        } else if let Some(ctx) = procedural_context {
208            ExpressionEvaluator::with_procedural_context(schema, database, ctx)
209        } else {
210            ExpressionEvaluator::with_database(schema, database)
211        };
212
213        // Step 4: Select rows to update using RowSelector
214        let row_selector = RowSelector::new(schema, &evaluator);
215        let candidate_rows = row_selector.select_rows(table, &stmt.where_clause)?;
216
217        // Estimate DML cost for query analysis and optimization decisions
218        if std::env::var("DML_COST_DEBUG").is_ok() && !candidate_rows.is_empty() {
219            if let Some(index_info) = database.get_table_index_info(&stmt.table_name) {
220                // Get table statistics for cost estimation (use cached if available, or fallback to estimate)
221                let table_stats = table
222                    .get_statistics()
223                    .cloned()
224                    .unwrap_or_else(|| vibesql_storage::TableStatistics::estimate_from_row_count(table.row_count()));
225
226                // Estimate the ratio of indexes affected based on columns being updated
227                // This is a heuristic: assume columns are distributed evenly across indexes
228                let total_columns = schema.columns.len();
229                let changed_columns = stmt.assignments.len();
230                let indexes_affected_ratio = if total_columns > 0 {
231                    (changed_columns as f64 / total_columns as f64).min(1.0)
232                } else {
233                    1.0 // Conservative estimate if no columns
234                };
235
236                let cost_estimator = CostEstimator::default();
237                let estimated_cost = cost_estimator.estimate_update(
238                    candidate_rows.len(),
239                    &table_stats,
240                    &index_info,
241                    indexes_affected_ratio,
242                );
243                eprintln!(
244                    "DML_COST_DEBUG: UPDATE {} rows in {} - estimated_cost: {:.2} (hash_indexes: {}, btree_indexes: {}, columnar: {}, affected_ratio: {:.2})",
245                    candidate_rows.len(),
246                    stmt.table_name,
247                    estimated_cost,
248                    index_info.hash_index_count,
249                    index_info.btree_index_count,
250                    index_info.is_native_columnar,
251                    indexes_affected_ratio
252                );
253            }
254        }
255
256        // Step 5: Create value updater
257        let value_updater = ValueUpdater::new(schema, &evaluator, &stmt.table_name);
258
259        // Step 6: Build list of updates (two-phase execution for SQL semantics)
260        // Each update consists of: (row_index, old_row, new_row, changed_columns, updates_pk)
261        let mut updates: Vec<(
262            usize,
263            vibesql_storage::Row,
264            vibesql_storage::Row,
265            std::collections::HashSet<usize>,
266            bool, // whether PK is being updated
267        )> = Vec::new();
268
269        for (row_index, row) in candidate_rows {
270            // Clear CSE cache before evaluating assignment expressions for this row
271            // to prevent cached column values from previous rows
272            evaluator.clear_cse_cache();
273
274            // Apply assignments to build updated row
275            let (new_row, changed_columns) =
276                value_updater.apply_assignments(&row, &stmt.assignments)?;
277
278            // Check if primary key is being updated
279            let updates_pk = if let Some(ref pk_idx) = pk_indices {
280                stmt.assignments.iter().any(|a| {
281                    let col_index = schema.get_column_index(&a.column).unwrap();
282                    pk_idx.contains(&col_index)
283                })
284            } else {
285                false
286            };
287
288            // Validate all constraints (NOT NULL, PRIMARY KEY, UNIQUE, CHECK)
289            let constraint_validator = ConstraintValidator::new(schema);
290            constraint_validator.validate_row(
291                table,
292                &stmt.table_name,
293                row_index,
294                &new_row,
295                &row,
296            )?;
297
298            // Validate user-defined UNIQUE indexes (CREATE UNIQUE INDEX)
299            constraint_validator.validate_unique_indexes(
300                database,
301                &stmt.table_name,
302                &new_row,
303                &row,
304            )?;
305
306            // Enforce FOREIGN KEY constraints (child table)
307            if !schema.foreign_keys.is_empty() {
308                ForeignKeyValidator::validate_constraints(
309                    database,
310                    &stmt.table_name,
311                    &new_row.values,
312                )?;
313            }
314
315            updates.push((row_index, row.clone(), new_row, changed_columns, updates_pk));
316        }
317
318        // Step 7: Handle CASCADE updates for primary key changes (before triggers)
319        // This must happen after validation but before applying parent updates
320        for (_row_index, old_row, new_row, _changed_columns, updates_pk) in &updates {
321            if *updates_pk {
322                ForeignKeyValidator::check_no_child_references(
323                    database,
324                    &stmt.table_name,
325                    old_row,
326                    new_row,
327                )?;
328            }
329        }
330
331        // Cost-based optimization: Log update cost with indexes_affected_ratio
332        if !updates.is_empty() {
333            // Compute aggregate changed columns across all updates
334            let mut all_changed_columns = std::collections::HashSet::new();
335            for (_, _, _, changed_cols, _) in &updates {
336                all_changed_columns.extend(changed_cols.iter().copied());
337            }
338
339            let optimizer = DmlOptimizer::new(database, &stmt.table_name);
340            let indexes_affected_ratio =
341                optimizer.compute_indexes_affected_ratio(&all_changed_columns, schema);
342            let _update_cost = optimizer.estimate_update_cost(updates.len(), indexes_affected_ratio);
343
344            // Log optimization insight: selective updates (low affected ratio) are much cheaper
345            if std::env::var("DML_COST_DEBUG").is_ok() && indexes_affected_ratio < 1.0 {
346                eprintln!(
347                    "DML_COST_DEBUG: UPDATE on {} - {} rows, {:.0}% indexes affected (selective update optimization)",
348                    stmt.table_name,
349                    updates.len(),
350                    indexes_affected_ratio * 100.0
351                );
352            }
353        }
354
355        // Fire BEFORE UPDATE triggers for all rows (before database mutation)
356        if has_triggers {
357            for (_row_index, old_row, new_row, _changed_columns, _updates_pk) in &updates {
358                crate::TriggerFirer::execute_before_triggers(
359                    database,
360                    &stmt.table_name,
361                    vibesql_ast::TriggerEvent::Update(None),
362                    Some(old_row),
363                    Some(new_row),
364                )?;
365            }
366        }
367
368        // Step 8: Apply all updates (after evaluation phase completes)
369        let update_count = updates.len();
370
371        // Get mutable table reference
372        let table_mut = database
373            .get_table_mut(&stmt.table_name)
374            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
375
376        // Collect the updates first
377        let mut index_updates = Vec::new();
378        for (index, old_row, new_row, changed_columns, _updates_pk) in &updates {
379            table_mut
380                .update_row_selective(*index, new_row.clone(), changed_columns)
381                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
382
383            index_updates.push((*index, old_row.clone(), new_row.clone(), changed_columns.clone()));
384        }
385
386        // Fire AFTER UPDATE triggers for all updated rows
387        if has_triggers {
388            for (_index, old_row, new_row, _changed_columns) in &index_updates {
389                crate::TriggerFirer::execute_after_triggers(
390                    database,
391                    &stmt.table_name,
392                    vibesql_ast::TriggerEvent::Update(None),
393                    Some(old_row),
394                    Some(new_row),
395                )?;
396            }
397        }
398
399        // Now update user-defined indexes after releasing table borrow
400        // Pass changed_columns to skip indexes that don't involve any modified columns
401        for (index, old_row, new_row, changed_columns) in index_updates {
402            database.update_indexes_for_update(&stmt.table_name, &old_row, &new_row, index, Some(&changed_columns));
403        }
404
405        // Invalidate the database-level columnar cache since table data changed.
406        // Note: Table-level cache is invalidated by update_row_fast()/update_row_selective().
407        // Both invalidations are necessary because they manage separate caches:
408        // - Table-level cache: used by Table::scan_columnar() for SIMD filtering
409        // - Database-level cache: used by Database::get_columnar() for cached access
410        if update_count > 0 {
411            database.invalidate_columnar_cache(&stmt.table_name);
412        }
413
414        // Fire AFTER STATEMENT triggers only if triggers exist
415        if has_triggers {
416            crate::TriggerFirer::execute_after_statement_triggers(
417                database,
418                &stmt.table_name,
419                vibesql_ast::TriggerEvent::Update(None),
420            )?;
421        }
422
423        Ok(update_count)
424    }
425
426    /// Try to execute UPDATE via fast path for simple single-row PK updates.
427    /// Returns Some(count) if fast path succeeded, None if we should use normal path.
428    ///
429    /// Fast path conditions:
430    /// - WHERE clause is simple equality on single-column primary key
431    /// - No foreign keys to validate
432    /// - Table has a primary key index
433    fn try_fast_path_update(
434        stmt: &UpdateStmt,
435        database: &mut Database,
436        schema: &vibesql_catalog::TableSchema,
437    ) -> Result<Option<usize>, ExecutorError> {
438        // Check if we have a simple PK lookup in WHERE clause
439        let where_clause = match &stmt.where_clause {
440            Some(vibesql_ast::WhereClause::Condition(expr)) => expr,
441            _ => return Ok(None), // No WHERE or CURRENT OF - use normal path
442        };
443
444        // Extract PK value from WHERE clause
445        let pk_value = match Self::extract_pk_equality(where_clause, schema) {
446            Some(val) => val,
447            None => return Ok(None), // Not a simple PK equality
448        };
449
450        // Get table and check for PK index, look up row index
451        let row_index = {
452            let table = database
453                .get_table(&stmt.table_name)
454                .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
455
456            let pk_index = match table.primary_key_index() {
457                Some(idx) => idx,
458                None => return Ok(None), // No PK index
459            };
460
461            // Look up row by PK
462            match pk_index.get(&pk_value) {
463                Some(&idx) => idx,
464                None => return Ok(Some(0)), // Row not found - 0 rows updated
465            }
466        }; // table borrow ends here
467
468        // SUPER-FAST PATH: All literal assignments to non-indexed, non-PK, non-unique columns
469        // This path avoids ALL row cloning by updating columns in-place
470        // Extended from single-assignment to support multiple assignments (ONEPASS optimization)
471        if let Some(result) = Self::try_super_fast_path(stmt, database, schema, row_index)? {
472            return Ok(Some(result));
473        }
474
475        // Skip fast path if table has foreign keys (need validation)
476        if !schema.foreign_keys.is_empty() {
477            return Ok(None);
478        }
479
480        // Skip fast path if table has unique constraints (need validation)
481        if !schema.unique_constraints.is_empty() {
482            return Ok(None);
483        }
484
485        // Check if we're updating PK columns - if so, check for CASCADE requirements
486        if let Some(ref pk_idx) = schema.get_primary_key_indices() {
487            let updates_pk = stmt.assignments.iter().any(|a| {
488                schema.get_column_index(&a.column).map(|idx| pk_idx.contains(&idx)).unwrap_or(false)
489            });
490            if updates_pk {
491                // Check if ANY table in database has foreign keys (might need CASCADE)
492                let has_any_fks = database.catalog.list_tables().iter().any(|table_name| {
493                    database
494                        .catalog
495                        .get_table(table_name)
496                        .map(|s| !s.foreign_keys.is_empty())
497                        .unwrap_or(false)
498                });
499                if has_any_fks {
500                    return Ok(None); // Use normal path for CASCADE handling
501                }
502            }
503        }
504
505        // Re-borrow table to get the old row
506        let table = database
507            .get_table(&stmt.table_name)
508            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
509        let old_row = table.scan()[row_index].clone();
510
511        // Create evaluator for expression evaluation
512        let evaluator = ExpressionEvaluator::with_database(schema, database);
513
514        // Apply assignments
515        let mut new_row = old_row.clone();
516        let mut changed_columns = std::collections::HashSet::new();
517
518        for assignment in &stmt.assignments {
519            let col_index = schema.get_column_index(&assignment.column).ok_or_else(|| {
520                ExecutorError::ColumnNotFound {
521                    column_name: assignment.column.clone(),
522                    table_name: stmt.table_name.clone(),
523                    searched_tables: vec![stmt.table_name.clone()],
524                    available_columns: schema.columns.iter().map(|c| c.name.clone()).collect(),
525                }
526            })?;
527
528            let new_value = match &assignment.value {
529                vibesql_ast::Expression::Default => {
530                    let column = &schema.columns[col_index];
531                    if let Some(default_expr) = &column.default_value {
532                        match default_expr {
533                            vibesql_ast::Expression::Literal(lit) => lit.clone(),
534                            _ => return Ok(None), // Complex default - use normal path
535                        }
536                    } else {
537                        vibesql_types::SqlValue::Null
538                    }
539                }
540                _ => evaluator.eval(&assignment.value, &old_row)?,
541            };
542
543            new_row
544                .set(col_index, new_value)
545                .map_err(|e| ExecutorError::StorageError(e.to_string()))?;
546            changed_columns.insert(col_index);
547        }
548
549        // Quick constraint validation (NOT NULL only for changed columns)
550        for &col_idx in &changed_columns {
551            let column = &schema.columns[col_idx];
552            if !column.nullable && new_row.values[col_idx] == vibesql_types::SqlValue::Null {
553                return Err(ExecutorError::ConstraintViolation(format!(
554                    "NOT NULL constraint violation: column '{}' cannot be NULL",
555                    column.name
556                )));
557            }
558        }
559
560        // Check PK uniqueness if updating PK columns
561        let pk_indices = schema.get_primary_key_indices();
562        if let Some(ref pk_idx) = pk_indices {
563            let updates_pk = changed_columns.iter().any(|c| pk_idx.contains(c));
564            if updates_pk {
565                // PK is being updated - need to check uniqueness
566                let new_pk: Vec<_> = pk_idx.iter().map(|&i| new_row.values[i].clone()).collect();
567                if let Some(pk_index) = table.primary_key_index() {
568                    if let Some(&existing_idx) = pk_index.get(&new_pk) {
569                        if existing_idx != row_index {
570                            return Err(ExecutorError::ConstraintViolation(format!(
571                                "PRIMARY KEY constraint violation: duplicate key {:?} on {}",
572                                new_pk, stmt.table_name
573                            )));
574                        }
575                    }
576                }
577            }
578        }
579
580        // Update user-defined indexes FIRST (while we still have both row references)
581        // Pass changed_columns to skip indexes that don't involve any modified columns
582        database.update_indexes_for_update(&stmt.table_name, &old_row, &new_row, row_index, Some(&changed_columns));
583
584        // Apply the update directly (transfers ownership of new_row, no clone needed)
585        let table_mut = database
586            .get_table_mut(&stmt.table_name)
587            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
588
589        // Use unchecked variant - row is already validated above
590        table_mut.update_row_unchecked(row_index, new_row, &old_row, &changed_columns);
591
592        Ok(Some(1))
593    }
594
595    /// Try SUPER-FAST path: direct in-place column updates for literal assignments
596    /// to non-indexed, non-PK, non-unique columns.
597    ///
598    /// This is the ONEPASS optimization for single-row updates:
599    /// - Supports multiple assignments (not just single)
600    /// - Validates all columns can be updated in-place
601    /// - No row cloning required
602    ///
603    /// Returns Some(1) if all updates were applied in-place, None if should use normal path.
604    fn try_super_fast_path(
605        stmt: &UpdateStmt,
606        database: &mut Database,
607        schema: &vibesql_catalog::TableSchema,
608        row_index: usize,
609    ) -> Result<Option<usize>, ExecutorError> {
610        // Collect all literal updates that can be done in-place
611        let mut inplace_updates: Vec<(usize, vibesql_types::SqlValue)> = Vec::new();
612
613        let pk_indices = schema.get_primary_key_indices();
614
615        for assignment in &stmt.assignments {
616            // Check if value is a literal (no expression evaluation needed)
617            let new_value = match &assignment.value {
618                vibesql_ast::Expression::Literal(val) => val.clone(),
619                _ => return Ok(None), // Non-literal expression - use normal path
620            };
621
622            let col_index = match schema.get_column_index(&assignment.column) {
623                Some(idx) => idx,
624                None => return Ok(None), // Column not found - let normal path handle error
625            };
626
627            // Check column is not in PK
628            let is_pk_col = pk_indices
629                .as_ref()
630                .map(|pk| pk.contains(&col_index))
631                .unwrap_or(false);
632            if is_pk_col {
633                return Ok(None); // PK update needs full validation
634            }
635
636            // Check column is not in any unique constraint
637            let col_name_upper = assignment.column.to_uppercase();
638            let is_unique_col = schema
639                .unique_constraints
640                .iter()
641                .any(|uc| uc.iter().any(|name| name.to_uppercase() == col_name_upper));
642            if is_unique_col {
643                return Ok(None); // Unique constraint needs validation
644            }
645
646            // Check NOT NULL constraint
647            let column = &schema.columns[col_index];
648            if !column.nullable && new_value == vibesql_types::SqlValue::Null {
649                return Err(ExecutorError::ConstraintViolation(format!(
650                    "NOT NULL constraint violation: column '{}' cannot be NULL",
651                    column.name
652                )));
653            }
654
655            // Check no user-defined indexes on this column
656            if database.has_index_on_column(&stmt.table_name, &assignment.column) {
657                return Ok(None); // Index update needs normal path
658            }
659
660            inplace_updates.push((col_index, new_value));
661        }
662
663        // All checks passed - apply updates in-place
664        if inplace_updates.is_empty() {
665            return Ok(None); // No updates to apply
666        }
667
668        let table_mut = database
669            .get_table_mut(&stmt.table_name)
670            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
671
672        // Apply all column updates in-place (no row cloning!)
673        for (col_index, new_value) in inplace_updates {
674            table_mut.update_column_inplace(row_index, col_index, new_value);
675        }
676
677        Ok(Some(1))
678    }
679
680    /// Extract primary key values from WHERE expression.
681    ///
682    /// Supports:
683    /// - Single-column PK: `pk = value` or `value = pk`
684    /// - Composite PK: `pk1 = val1 AND pk2 = val2` (any order)
685    ///
686    /// Returns Some(pk_values) in PK column order if all PK columns are matched,
687    /// None otherwise.
688    fn extract_pk_equality(
689        expr: &Expression,
690        schema: &vibesql_catalog::TableSchema,
691    ) -> Option<Vec<vibesql_types::SqlValue>> {
692        let pk_indices = schema.get_primary_key_indices()?;
693        if pk_indices.is_empty() {
694            return None;
695        }
696
697        // Collect all column = literal equalities from the expression
698        let mut equalities: std::collections::HashMap<usize, vibesql_types::SqlValue> =
699            std::collections::HashMap::new();
700        Self::collect_pk_equalities(expr, schema, &mut equalities);
701
702        // Check if we have all PK columns and build result in PK order
703        let mut pk_values = Vec::with_capacity(pk_indices.len());
704        for &pk_col in &pk_indices {
705            match equalities.get(&pk_col) {
706                Some(value) => pk_values.push(value.clone()),
707                None => return None, // Missing PK column
708            }
709        }
710
711        Some(pk_values)
712    }
713
714    /// Recursively collect column = literal equalities from WHERE expression
715    fn collect_pk_equalities(
716        expr: &Expression,
717        schema: &vibesql_catalog::TableSchema,
718        equalities: &mut std::collections::HashMap<usize, vibesql_types::SqlValue>,
719    ) {
720        match expr {
721            Expression::BinaryOp { left, op: BinaryOperator::And, right } => {
722                // Recurse into AND branches
723                Self::collect_pk_equalities(left, schema, equalities);
724                Self::collect_pk_equalities(right, schema, equalities);
725            }
726            Expression::Conjunction(exprs) => {
727                // Handle flattened AND chains
728                for e in exprs {
729                    Self::collect_pk_equalities(e, schema, equalities);
730                }
731            }
732            Expression::BinaryOp { left, op: BinaryOperator::Equal, right } => {
733                // Check: column = literal
734                if let (Expression::ColumnRef { column, .. }, Expression::Literal(value)) =
735                    (left.as_ref(), right.as_ref())
736                {
737                    if let Some(col_index) = schema.get_column_index(column) {
738                        equalities.insert(col_index, value.clone());
739                    }
740                }
741                // Check: literal = column
742                else if let (Expression::Literal(value), Expression::ColumnRef { column, .. }) =
743                    (left.as_ref(), right.as_ref())
744                {
745                    if let Some(col_index) = schema.get_column_index(column) {
746                        equalities.insert(col_index, value.clone());
747                    }
748                }
749            }
750            _ => {} // Ignore other expressions
751        }
752    }
753}
754
755/// Execute an UPDATE statement with trigger context
756/// This function is used when executing UPDATE statements within trigger bodies
757/// to support OLD/NEW pseudo-variable references
758pub fn execute_update_with_trigger_context(
759    database: &mut Database,
760    stmt: &UpdateStmt,
761    trigger_context: &crate::trigger_execution::TriggerContext,
762) -> Result<usize, ExecutorError> {
763    UpdateExecutor::execute_with_trigger_context(stmt, database, trigger_context)
764}