vibesql_executor/delete/
executor.rs

1//! DELETE statement execution
2
3use vibesql_ast::DeleteStmt;
4use vibesql_catalog::TableIdentifier;
5use vibesql_storage::Database;
6use vibesql_types::SqlValue;
7
8use super::integrity::check_no_child_references;
9use crate::{
10    dml_cost::DmlOptimizer, errors::ExecutorError, evaluator::ExpressionEvaluator,
11    expression_index_maintenance, privilege_checker::PrivilegeChecker,
12    sqlite_schema::is_sqlite_schema_table, truncate_validation::can_use_truncate,
13};
14
/// Executor for DELETE statements.
///
/// This is a stateless unit type: the public entry points (`execute`,
/// `execute_with_procedural_context` and `execute_with_trigger_context`) are associated
/// functions that receive the statement and the target database as arguments.
pub struct DeleteExecutor;
17
18impl DeleteExecutor {
    /// Execute a DELETE statement without any procedural or trigger context.
    ///
    /// This is the plain entry point: it forwards to the shared internal implementation and
    /// passes `None` for both the procedural context and the trigger context.
    ///
    /// # Arguments
    ///
    /// * `stmt` - The DELETE statement AST node
    /// * `database` - The database to delete from
    ///
    /// # Returns
    ///
    /// Number of rows deleted or error
    ///
    /// # Errors
    ///
    /// Returns an [`ExecutorError`] when the delete cannot be carried out, for example when
    /// the target is the read-only `sqlite_master`/`sqlite_schema` table, when the DELETE
    /// privilege check fails, or when the target table does not exist.
    ///
    /// # Examples
    ///
    /// ```
    /// use vibesql_ast::{BinaryOperator, DeleteStmt, Expression, WhereClause};
    /// use vibesql_catalog::{ColumnSchema, TableSchema};
    /// use vibesql_executor::DeleteExecutor;
    /// use vibesql_storage::Database;
    /// use vibesql_types::{DataType, SqlValue};
    ///
    /// let mut db = Database::new();
    ///
    /// // Create table
    /// let schema = TableSchema::new(
    ///     "users".to_string(),
    ///     vec![
    ///         ColumnSchema::new("id".to_string(), DataType::Integer, false),
    ///         ColumnSchema::new(
    ///             "name".to_string(),
    ///             DataType::Varchar { max_length: Some(50) },
    ///             false,
    ///         ),
    ///     ],
    /// );
    /// db.create_table(schema).unwrap();
    ///
    /// // Insert rows
    /// db.insert_row(
    ///     "users",
    ///     vibesql_storage::Row::new(vec![
    ///         SqlValue::Integer(1),
    ///         SqlValue::Varchar(arcstr::ArcStr::from("Alice")),
    ///     ]),
    /// )
    /// .unwrap();
    /// db.insert_row(
    ///     "users",
    ///     vibesql_storage::Row::new(vec![
    ///         SqlValue::Integer(2),
    ///         SqlValue::Varchar(arcstr::ArcStr::from("Bob")),
    ///     ]),
    /// )
    /// .unwrap();
    ///
    /// // Delete specific row
    /// let stmt = DeleteStmt {
    ///     with_clause: None,
    ///     only: false,
    ///     table_name: "users".to_string(),
    ///     quoted: false,
    ///     where_clause: Some(WhereClause::Condition(Expression::BinaryOp {
    ///         left: Box::new(Expression::ColumnRef(vibesql_ast::ColumnIdentifier::simple("id", false))),
    ///         op: BinaryOperator::Equal,
    ///         right: Box::new(Expression::Literal(SqlValue::Integer(1))),
    ///     })),
    ///     order_by: None,
    ///     limit: None,
    ///     offset: None,
    /// };
    ///
    /// let count = DeleteExecutor::execute(&stmt, &mut db).unwrap();
    /// assert_eq!(count, 1);
    /// ```
    pub fn execute(stmt: &DeleteStmt, database: &mut Database) -> Result<usize, ExecutorError> {
        Self::execute_internal(stmt, database, None, None)
    }
95
    /// Execute a DELETE statement with procedural context.
    ///
    /// Supports procedural variables in the WHERE clause: the given context is handed to the
    /// shared internal implementation, which uses it when building the expression evaluator.
    /// No trigger context is supplied on this path.
    ///
    /// # Arguments
    ///
    /// * `stmt` - The DELETE statement AST node
    /// * `database` - The database to delete from
    /// * `procedural_context` - Execution context used to resolve procedural variables
    ///
    /// # Returns
    ///
    /// Number of rows deleted or error
    pub fn execute_with_procedural_context(
        stmt: &DeleteStmt,
        database: &mut Database,
        procedural_context: &crate::procedural::ExecutionContext,
    ) -> Result<usize, ExecutorError> {
        Self::execute_internal(stmt, database, Some(procedural_context), None)
    }
105
    /// Execute a DELETE statement with trigger context.
    ///
    /// This allows DELETE statements within trigger bodies to reference OLD/NEW
    /// pseudo-variables. No procedural context is supplied on this path, and the internal
    /// implementation does not fire statement-level DELETE triggers when a trigger context
    /// is present.
    ///
    /// # Arguments
    ///
    /// * `stmt` - The DELETE statement AST node
    /// * `database` - The database to delete from
    /// * `trigger_context` - Context of the trigger whose body contains this statement
    ///
    /// # Returns
    ///
    /// Number of rows deleted or error
    pub fn execute_with_trigger_context(
        stmt: &DeleteStmt,
        database: &mut Database,
        trigger_context: &crate::trigger_execution::TriggerContext,
    ) -> Result<usize, ExecutorError> {
        Self::execute_internal(stmt, database, None, Some(trigger_context))
    }
115
116    /// Internal implementation supporting procedural context and trigger context
117    fn execute_internal(
118        stmt: &DeleteStmt,
119        database: &mut Database,
120        procedural_context: Option<&crate::procedural::ExecutionContext>,
121        trigger_context: Option<&crate::trigger_execution::TriggerContext>,
122    ) -> Result<usize, ExecutorError> {
123        // Note: stmt.only is currently ignored (treated as false)
124        // ONLY keyword is used in table inheritance to exclude derived tables.
125        // Since table inheritance is not yet implemented, we treat all deletes the same.
126
127        // Check if target is sqlite_master/sqlite_schema (read-only system table)
128        if is_sqlite_schema_table(&stmt.table_name) {
129            return Err(ExecutorError::SqliteSystemTableReadOnly {
130                table_name: stmt.table_name.clone(),
131                operation: "modified".to_string(),
132            });
133        }
134
135        // Check DELETE privilege on the table
136        PrivilegeChecker::check_delete(database, &stmt.table_name)?;
137
138        // Check if target is a VIEW with INSTEAD OF triggers
139        if let Some(view_def) = database.catalog.get_view(&stmt.table_name).cloned() {
140            return execute_delete_on_view(
141                database,
142                stmt,
143                &view_def,
144                procedural_context,
145                trigger_context,
146            );
147        }
148
149        // Use TableIdentifier for SQL:1999 case-sensitive lookups when quoted
150        // Handle schema-qualified table names (e.g., "temp.t1")
151        let table_id = if let Some((schema_part, table_part)) = stmt.table_name.split_once('.') {
152            // Schema-qualified name: schema.table
153            TableIdentifier::qualified(schema_part, false, table_part, stmt.quoted)
154        } else {
155            TableIdentifier::new(&stmt.table_name, stmt.quoted)
156        };
157
158        // Check table exists
159        if !database.catalog.table_exists_by_identifier(&table_id) {
160            return Err(ExecutorError::TableNotFound(stmt.table_name.clone()));
161        }
162
163        // Fast path: DELETE FROM table (no WHERE clause)
164        // Use TRUNCATE-style optimization for 100-1000x performance improvement
165        // Only use truncate if there's no ORDER BY or LIMIT (which would restrict which rows to delete)
166        if stmt.where_clause.is_none()
167            && stmt.order_by.is_none()
168            && stmt.limit.is_none()
169            && can_use_truncate(database, &stmt.table_name)?
170        {
171            return execute_truncate(database, &stmt.table_name);
172        }
173
174        // Step 1: Get schema (clone to avoid borrow issues)
175        let schema = database
176            .catalog
177            .get_table_by_identifier(&table_id)
178            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?
179            .clone();
180
181        // Use canonical table name from schema for all storage operations
182        // This ensures case-sensitive tables (quoted identifiers) are accessed correctly
183        let table_name = &schema.name;
184
185        // Fast path: Single-row PK delete without triggers/FKs
186        // This avoids ExpressionEvaluator creation and row cloning
187        if procedural_context.is_none() && trigger_context.is_none() {
188            if let Some(vibesql_ast::WhereClause::Condition(where_expr)) = &stmt.where_clause {
189                if let Some(pk_values) = Self::extract_primary_key_lookup(where_expr, &schema) {
190                    // Check if we can use the super-fast path (no triggers, no FKs)
191                    let has_triggers = database
192                        .catalog
193                        .get_triggers_for_table(table_name, Some(vibesql_ast::TriggerEvent::Delete))
194                        .next()
195                        .is_some();
196
197                    // Fast check: if this table has no PK, FKs can't reference it
198                    let has_pk = schema.get_primary_key_indices().is_some();
199                    let has_referencing_fks = has_pk
200                        && database.catalog.list_tables().iter().any(|t| {
201                            database
202                                .catalog
203                                .get_table(t)
204                                .map(|s| {
205                                    s.foreign_keys
206                                        .iter()
207                                        .any(|fk| fk.parent_table.eq_ignore_ascii_case(table_name))
208                                })
209                                .unwrap_or(false)
210                        });
211
212                    // Also skip fast path if there are expression indexes that need maintenance
213                    let has_expression_indexes = database.has_expression_indexes(table_name);
214                    if !has_triggers && !has_referencing_fks && !has_expression_indexes {
215                        // Use the fast path - no triggers, no FKs, no expression indexes, single row PK delete
216                        match database.delete_by_pk_fast(table_name, &pk_values) {
217                            Ok(deleted) => {
218                                let count = if deleted { 1 } else { 0 };
219                                // Check all assertions after DELETE completes (SQL:1999 Feature
220                                // F671/F672) This ensures
221                                // database-wide integrity constraints are maintained
222                                crate::advanced_objects::AssertionChecker::check_all_assertions(
223                                    database,
224                                )?;
225                                return Ok(count);
226                            }
227                            Err(_) => {
228                                // Fall through to standard path on error
229                            }
230                        }
231                    }
232                }
233            }
234        }
235
236        // Step 2: Evaluate WHERE clause and collect rows to delete (two-phase execution)
237        // Get table for scanning
238        let table = database
239            .get_table(table_name)
240            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
241
242        // Execute CTEs if present (WITH clause support)
243        let cte_results = if let Some(ref cte_list) = stmt.with_clause {
244            Some(crate::select::cte::execute_ctes(cte_list, |cte_query, prior_ctes| {
245                let cte_executor = crate::SelectExecutor::new_with_cte(database, prior_ctes);
246                cte_executor.execute(cte_query)
247            })?)
248        } else {
249            None
250        };
251
252        // Create evaluator with database reference for subquery support (EXISTS, NOT EXISTS, IN
253        // with subquery, etc.) and optional procedural/trigger context for variable resolution
254        let mut evaluator = if let Some(ctx) = trigger_context {
255            // Trigger context takes precedence (trigger statements can't have procedural context)
256            ExpressionEvaluator::with_trigger_context(&schema, database, ctx)
257        } else if let Some(ctx) = procedural_context {
258            ExpressionEvaluator::with_procedural_context(&schema, database, ctx)
259        } else if let Some(ref cte_ctx) = cte_results {
260            // Use CTE context for WITH clause support
261            ExpressionEvaluator::with_database_and_cte(&schema, database, cte_ctx)
262        } else {
263            ExpressionEvaluator::with_database(&schema, database)
264        };
265
266        // Check once if any DELETE triggers exist for this table (used for fast-path checks)
267        let has_delete_triggers = database
268            .catalog
269            .get_triggers_for_table(table_name, Some(vibesql_ast::TriggerEvent::Delete))
270            .next()
271            .is_some();
272
273        // Validate WHERE clause columns upfront (catches errors even on empty tables)
274        if let Some(vibesql_ast::WhereClause::Condition(where_expr)) = &stmt.where_clause {
275            Self::validate_where_columns(where_expr, &schema, table_name)?;
276        }
277
278        // Find rows to delete and their indices
279        // Try to use primary key index for fast lookup
280        let mut rows_and_indices_to_delete: Vec<(usize, vibesql_storage::Row)> = Vec::new();
281
282        // For ORDER BY in DELETE, we need to skip PK optimization and do a full scan
283        // because we need all matching rows to sort them properly
284        let has_order_by = stmt.order_by.is_some();
285
286        if !has_order_by {
287            if let Some(vibesql_ast::WhereClause::Condition(where_expr)) = &stmt.where_clause {
288                // Try primary key optimization
289                if let Some(pk_values) = Self::extract_primary_key_lookup(where_expr, &schema) {
290                    if let Some(pk_index) = table.primary_key_index() {
291                        if let Some(&row_index) = pk_index.get(&pk_values) {
292                            // Found the row via index - single row to delete
293                            rows_and_indices_to_delete
294                                .push((row_index, table.scan()[row_index].clone()));
295                        }
296                        // If not found, rows_and_indices_to_delete stays empty (no rows to delete)
297                    } else {
298                        // No PK index, fall through to table scan below
299                        Self::collect_rows_with_scan(
300                            table,
301                            &stmt.where_clause,
302                            &mut evaluator,
303                            &mut rows_and_indices_to_delete,
304                        )?;
305                    }
306                } else {
307                    // Can't extract PK lookup, fall through to table scan
308                    Self::collect_rows_with_scan(
309                        table,
310                        &stmt.where_clause,
311                        &mut evaluator,
312                        &mut rows_and_indices_to_delete,
313                    )?;
314                }
315            } else {
316                // No WHERE clause - collect all rows
317                Self::collect_rows_with_scan(
318                    table,
319                    &stmt.where_clause,
320                    &mut evaluator,
321                    &mut rows_and_indices_to_delete,
322                )?;
323            }
324        } else {
325            // ORDER BY present - must do full scan to get all rows for sorting
326            Self::collect_rows_with_scan(
327                table,
328                &stmt.where_clause,
329                &mut evaluator,
330                &mut rows_and_indices_to_delete,
331            )?;
332        }
333
334        // Apply ORDER BY sorting and LIMIT/OFFSET (SQLite extension)
335        if let Some(ref order_by) = stmt.order_by {
336            Self::apply_order_by_and_limit(
337                &mut rows_and_indices_to_delete,
338                order_by,
339                &stmt.limit,
340                &stmt.offset,
341                &schema,
342                &evaluator,
343            )?;
344        }
345
346        // Cost-based optimization: Log delete cost and check for early compaction recommendation
347        let optimizer = DmlOptimizer::new(database, table_name);
348        if optimizer.should_chunk_delete(rows_and_indices_to_delete.len()) {
349            // Log recommendation for potential chunked delete (informational only)
350            // Actual chunked delete would require transaction support to be safe
351            if std::env::var("DML_COST_DEBUG").is_ok() {
352                eprintln!(
353                    "DML_COST_DEBUG: DELETE on {} - {} rows qualifies for chunked delete",
354                    stmt.table_name,
355                    rows_and_indices_to_delete.len()
356                );
357            }
358        }
359        if optimizer.should_trigger_early_compaction() {
360            // Log early compaction recommendation (informational only)
361            // Table compaction is triggered automatically after >50% deleted rows
362            if std::env::var("DML_COST_DEBUG").is_ok() {
363                eprintln!(
364                    "DML_COST_DEBUG: DELETE on {} - early compaction recommended due to high deleted ratio",
365                    stmt.table_name
366                );
367            }
368        }
369
370        // Fire BEFORE STATEMENT triggers only if triggers exist AND we're not inside a trigger
371        // context (Statement-level triggers don't fire for deletes within trigger bodies)
372        if has_delete_triggers && trigger_context.is_none() {
373            crate::TriggerFirer::execute_before_statement_triggers(
374                database,
375                table_name,
376                vibesql_ast::TriggerEvent::Delete,
377            )?;
378        }
379
380        // Step 3: Fire BEFORE DELETE ROW triggers only if triggers exist
381        if has_delete_triggers {
382            for (_, row) in &rows_and_indices_to_delete {
383                crate::TriggerFirer::execute_before_triggers(
384                    database,
385                    table_name,
386                    vibesql_ast::TriggerEvent::Delete,
387                    Some(row),
388                    None,
389                )?;
390            }
391        }
392
393        // Step 4: Handle referential integrity for each row to be deleted
394        // This may CASCADE deletes, SET NULL, or SET DEFAULT in child tables
395        for (_, row) in &rows_and_indices_to_delete {
396            check_no_child_references(database, table_name, row)?;
397        }
398
399        // Extract indices for deletion
400        let mut deleted_indices: Vec<usize> =
401            rows_and_indices_to_delete.iter().map(|(idx, _)| *idx).collect();
402        deleted_indices.sort_unstable();
403
404        // Step 5a: Emit WAL entries and remove entries from user-defined indexes
405        // BEFORE deleting rows (while row indices are still valid and we have old values)
406        // First emit WAL entries for each row (needed for recovery replay)
407        for (idx, row) in &rows_and_indices_to_delete {
408            database.emit_wal_delete(table_name, *idx as u64, row.values.to_vec());
409        }
410
411        // Then use batch method for index updates: O(d + m*log n) vs O(d*m*log n)
412        // where d=deletes, m=indexes
413        let rows_refs: Vec<(usize, &vibesql_storage::Row)> =
414            rows_and_indices_to_delete.iter().map(|(idx, row)| (*idx, row)).collect();
415        database.batch_update_indexes_for_delete(table_name, &rows_refs);
416
417        // Maintain expression indexes for each deleted row
418        for (row_index, row) in &rows_and_indices_to_delete {
419            expression_index_maintenance::maintain_expression_indexes_for_delete(
420                database,
421                table_name,
422                row,
423                *row_index,
424            );
425        }
426
427        // Step 5b: Actually delete the rows using fast path (no table scan needed)
428        let table_mut = database
429            .get_table_mut(table_name)
430            .ok_or_else(|| ExecutorError::TableNotFound(stmt.table_name.clone()))?;
431
432        // Use delete_by_indices_batch for O(d) instead of O(n) where d = deletes
433        // The batch version pre-computes schema lookups for internal hash indexes,
434        // reducing overhead by ~30-40% for multi-row deletes.
435        // User-defined index entries have already been removed by batch_update_indexes_for_delete
436        // above. Note: If >50% of rows are deleted, compaction triggers and row indices
437        // change. When compaction occurs, we must rebuild user-defined indexes.
438        let delete_result = table_mut.delete_by_indices_batch(&deleted_indices);
439
440        // If compaction occurred, rebuild user-defined indexes since all row indices changed
441        if delete_result.compacted {
442            database.rebuild_indexes(table_name);
443            // Expression indexes need special handling (expression evaluation)
444            expression_index_maintenance::rebuild_expression_indexes_after_compaction(
445                database, table_name,
446            );
447        }
448
449        // Invalidate the database-level columnar cache since table data changed.
450        // Note: The table-level cache is already invalidated by delete_by_indices().
451        // Both invalidations are necessary because they manage separate caches:
452        // - Table-level cache: used by Table::scan_columnar() for SIMD filtering
453        // - Database-level cache: used by Database::get_columnar() for cached access
454        if delete_result.deleted_count > 0 {
455            database.invalidate_columnar_cache(table_name);
456        }
457
458        // Step 6: Fire AFTER DELETE ROW triggers only if triggers exist
459        if has_delete_triggers {
460            for (_, row) in &rows_and_indices_to_delete {
461                crate::TriggerFirer::execute_after_triggers(
462                    database,
463                    table_name,
464                    vibesql_ast::TriggerEvent::Delete,
465                    Some(row),
466                    None,
467                )?;
468            }
469        }
470
471        // Fire AFTER STATEMENT triggers only if triggers exist AND we're not inside a trigger
472        // context (Statement-level triggers don't fire for deletes within trigger bodies)
473        if has_delete_triggers && trigger_context.is_none() {
474            crate::TriggerFirer::execute_after_statement_triggers(
475                database,
476                table_name,
477                vibesql_ast::TriggerEvent::Delete,
478            )?;
479        }
480
481        // Check all assertions after DELETE completes (SQL:1999 Feature F671/F672)
482        // This ensures database-wide integrity constraints are maintained
483        crate::advanced_objects::AssertionChecker::check_all_assertions(database)?;
484
485        Ok(delete_result.deleted_count)
486    }
487
488    /// Extract primary key value from WHERE expression if it's a simple equality
489    fn extract_primary_key_lookup(
490        where_expr: &vibesql_ast::Expression,
491        schema: &vibesql_catalog::TableSchema,
492    ) -> Option<Vec<vibesql_types::SqlValue>> {
493        use vibesql_ast::{BinaryOperator, Expression};
494
495        // Only handle simple binary equality operations
496        if let Expression::BinaryOp { left, op: BinaryOperator::Equal, right } = where_expr {
497            // Check if left side is a column reference and right side is a literal
498            if let (Expression::ColumnRef(col_id), Expression::Literal(value)) =
499                (left.as_ref(), right.as_ref())
500            {
501                // Check if this column is the primary key
502                let column = col_id.column_canonical();
503                if let Some(pk_indices) = schema.get_primary_key_indices() {
504                    if let Some(col_index) = schema.get_column_index(column) {
505                        // Only handle single-column primary keys for now
506                        if pk_indices.len() == 1 && pk_indices[0] == col_index {
507                            return Some(vec![value.clone()]);
508                        }
509                    }
510                }
511            }
512
513            // Also check the reverse: literal = column
514            if let (Expression::Literal(value), Expression::ColumnRef(col_id)) =
515                (left.as_ref(), right.as_ref())
516            {
517                let column = col_id.column_canonical();
518                if let Some(pk_indices) = schema.get_primary_key_indices() {
519                    if let Some(col_index) = schema.get_column_index(column) {
520                        if pk_indices.len() == 1 && pk_indices[0] == col_index {
521                            return Some(vec![value.clone()]);
522                        }
523                    }
524                }
525            }
526        }
527
528        None
529    }
530
531    /// Validate that all column references in the WHERE clause exist in the schema
532    /// This catches errors like "DELETE FROM t WHERE nonexistent_col = 5" even on empty tables
533    #[allow(clippy::only_used_in_recursion)] // table_name preserved for future error messages
534    fn validate_where_columns(
535        expr: &vibesql_ast::Expression,
536        schema: &vibesql_catalog::TableSchema,
537        table_name: &str,
538    ) -> Result<(), ExecutorError> {
539        use vibesql_ast::Expression;
540
541        match expr {
542            Expression::ColumnRef(col_id) => {
543                let col_name = col_id.column_canonical();
544                // Check if column exists in schema (case-insensitive)
545                // Also allow ROWID pseudo-column aliases
546                let is_rowid = col_name.eq_ignore_ascii_case("rowid")
547                    || col_name.eq_ignore_ascii_case("_rowid_")
548                    || col_name.eq_ignore_ascii_case("oid");
549                if !is_rowid
550                    && !schema.columns.iter().any(|c| c.name.eq_ignore_ascii_case(col_name))
551                {
552                    return Err(ExecutorError::NoSuchColumn { column_ref: col_name.to_string() });
553                }
554                Ok(())
555            }
556            Expression::BinaryOp { left, right, .. } => {
557                Self::validate_where_columns(left, schema, table_name)?;
558                Self::validate_where_columns(right, schema, table_name)
559            }
560            Expression::UnaryOp { expr, .. } => {
561                Self::validate_where_columns(expr, schema, table_name)
562            }
563            Expression::IsNull { expr, .. } => {
564                Self::validate_where_columns(expr, schema, table_name)
565            }
566            Expression::Between { expr, low, high, .. } => {
567                Self::validate_where_columns(expr, schema, table_name)?;
568                Self::validate_where_columns(low, schema, table_name)?;
569                Self::validate_where_columns(high, schema, table_name)
570            }
571            Expression::InList { expr, values, .. } => {
572                Self::validate_where_columns(expr, schema, table_name)?;
573                for item in values {
574                    Self::validate_where_columns(item, schema, table_name)?;
575                }
576                Ok(())
577            }
578            Expression::Function { args, .. } => {
579                for arg in args {
580                    Self::validate_where_columns(arg, schema, table_name)?;
581                }
582                Ok(())
583            }
584            Expression::AggregateFunction { args, .. } => {
585                for arg in args {
586                    Self::validate_where_columns(arg, schema, table_name)?;
587                }
588                Ok(())
589            }
590            Expression::Case { operand, when_clauses, else_result } => {
591                if let Some(op) = operand {
592                    Self::validate_where_columns(op, schema, table_name)?;
593                }
594                for case_when in when_clauses {
595                    for cond in &case_when.conditions {
596                        Self::validate_where_columns(cond, schema, table_name)?;
597                    }
598                    Self::validate_where_columns(&case_when.result, schema, table_name)?;
599                }
600                if let Some(else_expr) = else_result {
601                    Self::validate_where_columns(else_expr, schema, table_name)?;
602                }
603                Ok(())
604            }
605            // Literals and other expressions that don't reference columns
606            Expression::Literal(_)
607            | Expression::Wildcard
608            | Expression::Placeholder(_)
609            | Expression::NumberedPlaceholder(_)
610            | Expression::NamedPlaceholder(_)
611            | Expression::CurrentDate
612            | Expression::CurrentTime { .. }
613            | Expression::CurrentTimestamp { .. } => Ok(()),
614            // Subqueries have their own scope - don't validate against parent table
615            Expression::ScalarSubquery(_)
616            | Expression::In { .. }
617            | Expression::Exists { .. }
618            | Expression::QuantifiedComparison { .. } => Ok(()),
619            // Other expressions - recurse into children if any
620            _ => Ok(()),
621        }
622    }
623
624    /// Collect rows using table scan (fallback when PK optimization can't be used)
625    fn collect_rows_with_scan(
626        table: &vibesql_storage::Table,
627        where_clause: &Option<vibesql_ast::WhereClause>,
628        evaluator: &mut ExpressionEvaluator,
629        rows_and_indices: &mut Vec<(usize, vibesql_storage::Row)>,
630    ) -> Result<(), ExecutorError> {
631        // Use scan_live() to skip already-deleted rows
632        for (index, row) in table.scan_live() {
633            // Clear CSE cache before evaluating each row to prevent column values
634            // from being incorrectly cached across different rows
635            evaluator.clear_cse_cache();
636
637            // Set row_id for ROWID pseudo-column support (SQLite compatibility)
638            // SQLite uses 1-indexed rowids, so add 1 to the physical index
639            // This allows WHERE rowid = N to work correctly
640            // Use the row's explicit row_id if set, otherwise compute from physical index
641            let row_id = row.row_id.unwrap_or((index + 1) as u64);
642            evaluator.set_row_index(row_id);
643
644            let should_delete = if let Some(ref where_clause) = where_clause {
645                match where_clause {
646                    vibesql_ast::WhereClause::Condition(where_expr) => {
647                        // Propagate errors from eval() - don't silently swallow them
648                        let result = evaluator.eval(where_expr, row)?;
649                        // SQLite treats non-zero numeric values as TRUE
650                        is_truthy(&result)
651                    }
652                    vibesql_ast::WhereClause::CurrentOf(_cursor_name) => {
653                        return Err(ExecutorError::UnsupportedFeature(
654                            "WHERE CURRENT OF cursor is not yet implemented".to_string(),
655                        ));
656                    }
657                }
658            } else {
659                true
660            };
661
662            if should_delete {
663                rows_and_indices.push((index, row.clone()));
664            }
665        }
666
667        Ok(())
668    }
669
670    /// Apply ORDER BY sorting and LIMIT/OFFSET to the collected rows
671    /// This implements the SQLite extension for DELETE with ORDER BY LIMIT
672    fn apply_order_by_and_limit(
673        rows_and_indices: &mut Vec<(usize, vibesql_storage::Row)>,
674        order_by: &[vibesql_ast::OrderByItem],
675        limit: &Option<vibesql_ast::Expression>,
676        offset: &Option<vibesql_ast::Expression>,
677        _schema: &vibesql_catalog::TableSchema,
678        evaluator: &ExpressionEvaluator,
679    ) -> Result<(), ExecutorError> {
680        use vibesql_ast::OrderDirection;
681        use vibesql_types::SqlValue;
682
683        // Sort rows by ORDER BY columns
684        rows_and_indices.sort_by(|a, b| {
685            for item in order_by {
686                // Evaluate the ORDER BY expression for both rows
687                let val_a = evaluator.eval(&item.expr, &a.1).unwrap_or(SqlValue::Null);
688                let val_b = evaluator.eval(&item.expr, &b.1).unwrap_or(SqlValue::Null);
689
690                // Compare values with proper NULL handling
691                // NULLS FIRST: nulls come first (default for DESC)
692                // NULLS LAST: nulls come last (default for ASC)
693                let nulls_first = match item.nulls_order {
694                    Some(vibesql_ast::NullsOrder::First) => true,
695                    Some(vibesql_ast::NullsOrder::Last) => false,
696                    None => matches!(item.direction, OrderDirection::Desc),
697                };
698
699                let cmp = match (&val_a, &val_b) {
700                    (SqlValue::Null, SqlValue::Null) => std::cmp::Ordering::Equal,
701                    (SqlValue::Null, _) => {
702                        if nulls_first {
703                            std::cmp::Ordering::Less
704                        } else {
705                            std::cmp::Ordering::Greater
706                        }
707                    }
708                    (_, SqlValue::Null) => {
709                        if nulls_first {
710                            std::cmp::Ordering::Greater
711                        } else {
712                            std::cmp::Ordering::Less
713                        }
714                    }
715                    _ => val_a.partial_cmp(&val_b).unwrap_or(std::cmp::Ordering::Equal),
716                };
717
718                // Apply direction
719                let cmp = match item.direction {
720                    OrderDirection::Desc => cmp.reverse(),
721                    OrderDirection::Asc => cmp,
722                };
723
724                if cmp != std::cmp::Ordering::Equal {
725                    return cmp;
726                }
727            }
728            std::cmp::Ordering::Equal
729        });
730
731        // Evaluate OFFSET expression if present
732        let offset_val = if let Some(ref offset_expr) = offset {
733            // Evaluate the offset expression without a row context
734            // (it should be a constant or simple expression)
735            let empty_row = vibesql_storage::Row::new(vec![]);
736            match evaluator.eval(offset_expr, &empty_row)? {
737                SqlValue::Integer(n) if n >= 0 => n as usize,
738                SqlValue::Bigint(n) if n >= 0 => n as usize,
739                SqlValue::Null => 0, // NULL offset treated as 0
740                _ => {
741                    return Err(ExecutorError::TypeError(
742                        "OFFSET value must be a non-negative integer".to_string(),
743                    ))
744                }
745            }
746        } else {
747            0
748        };
749
750        // Evaluate LIMIT expression if present
751        let limit_val = if let Some(ref limit_expr) = limit {
752            let empty_row = vibesql_storage::Row::new(vec![]);
753            match evaluator.eval(limit_expr, &empty_row)? {
754                SqlValue::Integer(n) if n >= 0 => Some(n as usize),
755                SqlValue::Bigint(n) if n >= 0 => Some(n as usize),
756                SqlValue::Integer(-1) | SqlValue::Bigint(-1) => None, // -1 means no limit (SQLite extension)
757                SqlValue::Null => None, // NULL limit treated as no limit
758                _ => {
759                    return Err(ExecutorError::TypeError(
760                        "LIMIT value must be a non-negative integer".to_string(),
761                    ))
762                }
763            }
764        } else {
765            None
766        };
767
768        // Apply OFFSET: skip first N rows
769        if offset_val > 0 {
770            if offset_val >= rows_and_indices.len() {
771                rows_and_indices.clear();
772            } else {
773                rows_and_indices.drain(..offset_val);
774            }
775        }
776
777        // Apply LIMIT: keep only first N rows
778        if let Some(limit) = limit_val {
779            rows_and_indices.truncate(limit);
780        }
781
782        Ok(())
783    }
784}
785
786/// Execute TRUNCATE-style fast path for DELETE FROM table (no WHERE)
787///
788/// Clears all rows and indexes in a single operation instead of row-by-row deletion.
789/// Provides 100-1000x performance improvement for full table deletes.
790///
791/// # Safety
792/// Only call this after `can_use_truncate` returns true.
793fn execute_truncate(database: &mut Database, table_name: &str) -> Result<usize, ExecutorError> {
794    let table = database
795        .get_table_mut(table_name)
796        .ok_or_else(|| ExecutorError::TableNotFound(table_name.to_string()))?;
797
798    let row_count = table.row_count();
799
800    // Clear all data at once (O(1) operation)
801    // Note: table.clear() invalidates the table-level columnar cache internally
802    table.clear();
803
804    // Invalidate the database-level columnar cache since table data changed.
805    // Both the table-level (via clear()) and database-level invalidations are
806    // necessary because they manage separate caches at different levels.
807    if row_count > 0 {
808        database.invalidate_columnar_cache(table_name);
809    }
810
811    // Check all assertions after DELETE completes (SQL:1999 Feature F671/F672)
812    // This ensures database-wide integrity constraints are maintained
813    crate::advanced_objects::AssertionChecker::check_all_assertions(database)?;
814
815    Ok(row_count)
816}
817
/// Execute a DELETE statement with trigger context.
///
/// This function is used when executing DELETE statements within trigger bodies
/// to support OLD/NEW pseudo-variable references.
///
/// It is a thin free-function wrapper: the call is forwarded unchanged to
/// `DeleteExecutor::execute_with_trigger_context`. Note that the argument order
/// differs from the delegate (`database` comes first here, `stmt` first there).
///
/// # Arguments
///
/// * `database` - The database the statement operates on
/// * `stmt` - The DELETE statement AST node
/// * `trigger_context` - Context of the trigger whose body contains the statement
///
/// # Returns
///
/// Whatever the delegate returns; presumably the number of rows deleted, or the
/// error that aborted the statement (TODO confirm against the delegate's docs).
pub fn execute_delete_with_trigger_context(
    database: &mut Database,
    stmt: &DeleteStmt,
    trigger_context: &crate::trigger_execution::TriggerContext,
) -> Result<usize, ExecutorError> {
    DeleteExecutor::execute_with_trigger_context(stmt, database, trigger_context)
}
828
829/// Execute DELETE on a VIEW using INSTEAD OF triggers
830///
831/// When deleting from a view, we need to fire INSTEAD OF DELETE triggers
832/// instead of actually deleting data. The triggers typically delete from
833/// the underlying tables.
834fn execute_delete_on_view(
835    database: &mut Database,
836    stmt: &DeleteStmt,
837    view_def: &vibesql_catalog::ViewDefinition,
838    procedural_context: Option<&crate::procedural::ExecutionContext>,
839    trigger_context: Option<&crate::trigger_execution::TriggerContext>,
840) -> Result<usize, ExecutorError> {
841    use vibesql_ast::TriggerTiming;
842
843    // Find INSTEAD OF DELETE triggers for this view
844    let triggers = crate::TriggerFirer::find_triggers(
845        database,
846        &view_def.name,
847        TriggerTiming::InsteadOf,
848        vibesql_ast::TriggerEvent::Delete,
849    );
850
851    if triggers.is_empty() {
852        return Err(ExecutorError::UnsupportedExpression(format!(
853            "Cannot DELETE from view '{}' without INSTEAD OF trigger",
854            view_def.name
855        )));
856    }
857
858    // Build a pseudo-schema for the view
859    let view_schema = build_view_schema(database, view_def)?;
860
861    // Execute the view query to get the rows to potentially delete
862    let select_executor = crate::SelectExecutor::new(database);
863    let all_rows = select_executor.execute_with_columns(&view_def.query)?;
864
865    // Collect rows to delete first, before firing triggers
866    // This avoids borrow conflicts with the evaluator
867    let rows_to_delete: Vec<vibesql_storage::Row> = {
868        // Create evaluator for WHERE clause (if any)
869        let evaluator = if let Some(ctx) = trigger_context {
870            ExpressionEvaluator::with_trigger_context(&view_schema, database, ctx)
871        } else if let Some(ctx) = procedural_context {
872            ExpressionEvaluator::with_procedural_context(&view_schema, database, ctx)
873        } else {
874            ExpressionEvaluator::with_database(&view_schema, database)
875        };
876
877        // Select rows matching WHERE clause
878        let mut collected_rows = Vec::new();
879        for row in &all_rows.rows {
880            let matches = match &stmt.where_clause {
881                Some(vibesql_ast::WhereClause::Condition(expr)) => {
882                    let result = evaluator.eval(expr, row)?;
883                    // SQLite treats non-zero numeric values as TRUE
884                    is_truthy(&result)
885                }
886                None => true, // No WHERE clause - delete all rows
887                Some(vibesql_ast::WhereClause::CurrentOf(_)) => {
888                    return Err(ExecutorError::UnsupportedExpression(
889                        "CURRENT OF not supported for view deletes".to_string(),
890                    ));
891                }
892            };
893
894            if matches {
895                collected_rows.push(row.clone());
896            }
897        }
898        collected_rows
899    }; // evaluator dropped here
900
901    // Now fire triggers (database can be mutably borrowed)
902    let rows_processed = rows_to_delete.len();
903    for old_row in rows_to_delete {
904        for trigger in &triggers {
905            crate::TriggerFirer::execute_trigger(database, trigger, Some(&old_row), None)?;
906        }
907    }
908
909    Ok(rows_processed)
910}
911
912/// Build a pseudo TableSchema from a view definition
913fn build_view_schema(
914    database: &Database,
915    view_def: &vibesql_catalog::ViewDefinition,
916) -> Result<vibesql_catalog::TableSchema, ExecutorError> {
917    // Execute the view's SELECT query to get column names
918    let select_executor = crate::SelectExecutor::new(database);
919    let result = select_executor.execute_with_columns(&view_def.query)?;
920
921    // Use explicit column names if provided, otherwise derive from SELECT
922    let column_names: Vec<String> =
923        if let Some(ref cols) = view_def.columns { cols.clone() } else { result.columns.clone() };
924
925    // Build columns with a generic data type (we just need names for trigger binding)
926    let columns: Vec<vibesql_catalog::ColumnSchema> = column_names
927        .into_iter()
928        .map(|name| {
929            vibesql_catalog::ColumnSchema::new(
930                name,
931                vibesql_types::DataType::Varchar { max_length: None },
932                true,
933            )
934        })
935        .collect();
936
937    Ok(vibesql_catalog::TableSchema::new(view_def.name.clone(), columns))
938}
939
940/// Check if a SqlValue is truthy using SQLite truthiness rules.
941///
942/// SQLite rules:
943/// - `Boolean(true)` → true
944/// - `Boolean(false)` → false
945/// - `Null` → false (NULL is not truthy for WHERE clause purposes)
946/// - `0` and `0.0` → false
947/// - Any other numeric value → true
948/// - Strings → parse leading numeric, non-zero is true, 0 or non-numeric is false
949fn is_truthy(value: &SqlValue) -> bool {
950    use SqlValue::*;
951    match value {
952        Boolean(b) => *b,
953        Null => false, // NULL is not truthy for row selection
954        // Integer types: 0 is false, non-zero is true
955        Integer(n) => *n != 0,
956        Smallint(n) => *n != 0,
957        Bigint(n) => *n != 0,
958        Unsigned(n) => *n != 0,
959        // Floating point types: 0.0 is false, non-zero is true
960        Float(f) => *f != 0.0,
961        Real(f) => *f != 0.0,
962        Double(f) => *f != 0.0,
963        Numeric(f) => *f != 0.0,
964        // String types: SQLite parses leading numeric portion
965        Varchar(s) | Character(s) => crate::evaluator::operators::is_truthy_string(s),
966        // Other types are not truthy (conservative default)
967        _ => false,
968    }
969}