Skip to main content

citadel_sql/executor/
write.rs

1use std::collections::HashMap;
2
3use citadel::Database;
4
5use crate::encoding::{
6    decode_column_raw, decode_column_with_offset, decode_composite_key, decode_pk_integer,
7    encode_composite_key, encode_row, patch_at_offset, patch_column_in_place, patch_row_column,
8};
9use crate::error::{Result, SqlError};
10use crate::eval::{eval_expr, is_truthy, ColumnMap};
11use crate::parser::*;
12use crate::schema::SchemaManager;
13use crate::types::*;
14
15use super::correlated::*;
16use super::dml::*;
17use super::helpers::*;
18use super::scan::*;
19use super::select::*;
20use super::view::*;
21use super::CteContext;
22
23// ── Compiled UPDATE plan cache ──────────────────────────────────────
24
25pub struct UpdateBufs {
26    partial_row: Vec<Value>,
27    patch_buf: Vec<u8>,
28    offsets: Vec<usize>,
29}
30
31impl Default for UpdateBufs {
32    fn default() -> Self {
33        Self {
34            partial_row: Vec::new(),
35            patch_buf: Vec::with_capacity(256),
36            offsets: Vec::new(),
37        }
38    }
39}
40
41impl UpdateBufs {
42    pub fn new() -> Self {
43        Self::default()
44    }
45}
46
/// Outcome of analysing an UPDATE statement once (see `compile_update`), so
/// that `exec_update_compiled` can pick an execution strategy without
/// repeating the analysis.
pub struct CompiledUpdate {
    /// ASCII-lowercased copy of the statement's table name.
    table_name_lower: String,
    /// True when the name resolved to a view; execution then fails with
    /// `SqlError::CannotModifyView`.
    is_view: bool,
    /// True when the WHERE clause was detected as correlated.
    has_correlated_where: bool,
    /// True when `update_has_subquery` reported a subquery in the statement.
    has_subquery: bool,
    /// True when the table has no FKs (own or child), no indices, no CHECK
    /// constraints, and no assignment touches a primary-key column.
    can_fast_path: bool,
    /// Precomputed fast-path data; populated only when `can_fast_path` is true.
    fast: Option<CompiledFastPath>,
}
55
/// Everything the fused scan-and-patch UPDATE path needs, computed once at
/// compile time.
struct CompiledFastPath {
    /// Number of primary-key columns in the table.
    num_pk_cols: usize,
    /// Total number of columns in the table schema (size of the scratch row).
    num_columns: usize,
    /// True when the primary key is exactly one `DataType::Integer` column.
    single_int_pk: bool,
    /// One entry per assignment, in statement order.
    targets: Vec<CompiledTarget>,
    /// Plan produced by `plan_select` for the statement's WHERE clause.
    scan_plan: crate::planner::ScanPlan,
    /// Copy of `pk_indices()`; used to place decoded key values into the
    /// scratch row.
    pk_idx_cache: Vec<usize>,
    /// Column map over the table's columns, passed to `eval_expr`.
    col_map: ColumnMap,
    /// Range bounds as raw `i64`s. `Some` only for a single-integer PK with a
    /// `PkRangeScan` plan whose bounds are all `Value::Integer`.
    range_bounds_i64: Option<Vec<(BinOp, i64)>>,
}
66
/// Shortcut classification of an assignment expression, produced by
/// `detect_fast_eval`. The arithmetic variants apply only when the column's
/// current value is an integer; otherwise the executor calls `eval_expr`.
enum FastEval {
    /// No shortcut: evaluate the expression with `eval_expr`.
    None,
    /// `col + n` or `n + col`, where `col` is the assigned column.
    IntAdd(i64),
    /// `col - n`, where `col` is the assigned column.
    IntSub(i64),
    /// `col * n` or `n * col`, where `col` is the assigned column.
    IntMul(i64),
    /// Assignment of the integer literal `n`.
    IntSet(i64),
}
74
/// A single `SET column = expr` assignment, resolved against the table schema.
struct CompiledTarget {
    /// Index of the column in the table schema's column list.
    schema_idx: usize,
    /// Encoding position of the column among the non-PK columns; passed to the
    /// decode/patch helpers that operate on the stored row bytes.
    phys_idx: usize,
    /// Clone of the assignment's right-hand-side expression.
    expr: Expr,
    /// Clone of the column definition (consulted for nullability and type).
    col: ColumnDef,
    /// Shortcut detected for `expr`, if any.
    fast_eval: FastEval,
}
82
83fn detect_fast_eval(expr: &Expr, col_name: &str) -> FastEval {
84    let lower = col_name.to_ascii_lowercase();
85    match expr {
86        Expr::Literal(Value::Integer(n)) => FastEval::IntSet(*n),
87        Expr::BinaryOp { left, op, right } => {
88            let col_match =
89                |e: &Expr| matches!(e, Expr::Column(c) if c.to_ascii_lowercase() == lower);
90            let int_lit = |e: &Expr| match e {
91                Expr::Literal(Value::Integer(n)) => Some(*n),
92                _ => None,
93            };
94            if col_match(left) {
95                if let Some(n) = int_lit(right) {
96                    return match op {
97                        BinOp::Add => FastEval::IntAdd(n),
98                        BinOp::Sub => FastEval::IntSub(n),
99                        BinOp::Mul => FastEval::IntMul(n),
100                        _ => FastEval::None,
101                    };
102                }
103            }
104            if col_match(right) {
105                if let Some(n) = int_lit(left) {
106                    return match op {
107                        BinOp::Add => FastEval::IntAdd(n),
108                        BinOp::Mul => FastEval::IntMul(n),
109                        _ => FastEval::None,
110                    };
111                }
112            }
113            FastEval::None
114        }
115        _ => FastEval::None,
116    }
117}
118
119pub fn compile_update(schema: &SchemaManager, stmt: &UpdateStmt) -> Result<CompiledUpdate> {
120    let table_name_lower = stmt.table.to_ascii_lowercase();
121    let is_view = schema.get_view(&table_name_lower).is_some();
122    if is_view {
123        return Ok(CompiledUpdate {
124            table_name_lower,
125            is_view: true,
126            has_correlated_where: false,
127            has_subquery: false,
128            can_fast_path: false,
129            fast: None,
130        });
131    }
132
133    let table_schema = schema
134        .get(&table_name_lower)
135        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
136
137    let corr_ctx = CorrelationCtx {
138        outer_schema: table_schema,
139        outer_alias: None,
140    };
141    let has_correlated = has_correlated_where(&stmt.where_clause, &corr_ctx, schema);
142    let has_sub = update_has_subquery(stmt);
143
144    if has_correlated || has_sub {
145        return Ok(CompiledUpdate {
146            table_name_lower,
147            is_view: false,
148            has_correlated_where: has_correlated,
149            has_subquery: has_sub,
150            can_fast_path: false,
151            fast: None,
152        });
153    }
154
155    let pk_indices = table_schema.pk_indices();
156    let pk_changed_by_set = stmt.assignments.iter().any(|(col_name, _)| {
157        table_schema
158            .column_index(col_name)
159            .is_some_and(|idx| table_schema.primary_key_columns.contains(&(idx as u16)))
160    });
161    let has_fk = !table_schema.foreign_keys.is_empty();
162    let has_indices = !table_schema.indices.is_empty();
163    let has_child_fk = !schema.child_fks_for(&table_name_lower).is_empty();
164    let can_fast_path = !pk_changed_by_set
165        && !has_fk
166        && !has_indices
167        && !has_child_fk
168        && !table_schema.has_checks();
169
170    let fast = if can_fast_path {
171        let non_pk = table_schema.non_pk_indices();
172        let enc_pos = table_schema.encoding_positions();
173        let num_pk_cols = table_schema.primary_key_columns.len();
174
175        let mut targets = Vec::with_capacity(stmt.assignments.len());
176        for (col_name, expr) in &stmt.assignments {
177            let schema_idx = table_schema
178                .column_index(col_name)
179                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
180            let nonpk_order = non_pk
181                .iter()
182                .position(|&i| i == schema_idx)
183                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
184            let phys_idx = enc_pos[nonpk_order] as usize;
185            let fast_eval = detect_fast_eval(expr, col_name);
186            targets.push(CompiledTarget {
187                schema_idx,
188                phys_idx,
189                expr: expr.clone(),
190                col: table_schema.columns[schema_idx].clone(),
191                fast_eval,
192            });
193        }
194
195        let plan = crate::planner::plan_select(table_schema, &stmt.where_clause);
196        let single_int_pk = num_pk_cols == 1
197            && table_schema.columns[table_schema.primary_key_columns[0] as usize].data_type
198                == DataType::Integer;
199
200        let range_bounds_i64 = if single_int_pk {
201            if let crate::planner::ScanPlan::PkRangeScan {
202                ref range_conds, ..
203            } = plan
204            {
205                let bounds: Vec<(BinOp, i64)> = range_conds
206                    .iter()
207                    .filter_map(|(op, val)| match val {
208                        Value::Integer(i) => Some((*op, *i)),
209                        _ => None,
210                    })
211                    .collect();
212                if bounds.len() == range_conds.len() {
213                    Some(bounds)
214                } else {
215                    None
216                }
217            } else {
218                None
219            }
220        } else {
221            None
222        };
223
224        Some(CompiledFastPath {
225            num_pk_cols,
226            num_columns: table_schema.columns.len(),
227            single_int_pk,
228            targets,
229            scan_plan: plan,
230            pk_idx_cache: pk_indices.to_vec(),
231            col_map: ColumnMap::new(&table_schema.columns),
232            range_bounds_i64,
233        })
234    } else {
235        None
236    };
237
238    Ok(CompiledUpdate {
239        table_name_lower,
240        is_view: false,
241        has_correlated_where: false,
242        has_subquery: false,
243        can_fast_path,
244        fast,
245    })
246}
247
248pub fn exec_update_compiled(
249    db: &Database,
250    schema: &SchemaManager,
251    stmt: &UpdateStmt,
252    compiled: &CompiledUpdate,
253    bufs: &mut UpdateBufs,
254) -> Result<ExecutionResult> {
255    if compiled.is_view {
256        return Err(SqlError::CannotModifyView(stmt.table.clone()));
257    }
258    if compiled.has_correlated_where || compiled.has_subquery || !compiled.can_fast_path {
259        return exec_update(db, schema, stmt);
260    }
261
262    let fast = compiled.fast.as_ref().unwrap();
263    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
264
265    if let crate::planner::ScanPlan::PkRangeScan {
266        ref start_key,
267        ref range_conds,
268        ..
269    } = fast.scan_plan
270    {
271        bufs.partial_row.clear();
272        bufs.partial_row.resize(fast.num_columns, Value::Null);
273        bufs.offsets.clear();
274        bufs.offsets.resize(fast.targets.len(), usize::MAX);
275
276        let count = wtx.table_update_range(
277            compiled.table_name_lower.as_bytes(),
278            start_key,
279            |key, value| {
280                if let Some(ref bounds) = fast.range_bounds_i64 {
281                    let pk = decode_pk_integer(key)?;
282                    for &(op, bound) in bounds {
283                        match op {
284                            BinOp::Lt if pk >= bound => return Ok(None),
285                            BinOp::LtEq if pk > bound => return Ok(None),
286                            BinOp::Gt if pk <= bound => return Ok(Some(false)),
287                            BinOp::GtEq if pk < bound => return Ok(Some(false)),
288                            _ => {}
289                        }
290                    }
291                    bufs.partial_row[fast.pk_idx_cache[0]] = Value::Integer(pk);
292                } else if fast.single_int_pk {
293                    let pk = decode_pk_integer(key)?;
294                    let pk_val = Value::Integer(pk);
295                    for (op, bound) in range_conds {
296                        match op {
297                            BinOp::Lt if &pk_val >= bound => return Ok(None),
298                            BinOp::LtEq if &pk_val > bound => return Ok(None),
299                            BinOp::Gt if &pk_val <= bound => return Ok(Some(false)),
300                            BinOp::GtEq if &pk_val < bound => return Ok(Some(false)),
301                            _ => {}
302                        }
303                    }
304                    bufs.partial_row[fast.pk_idx_cache[0]] = pk_val;
305                } else {
306                    let pk_vals = decode_composite_key(key, fast.num_pk_cols)?;
307                    for (op, bound) in range_conds {
308                        match op {
309                            BinOp::Lt if &pk_vals[0] >= bound => return Ok(None),
310                            BinOp::LtEq if &pk_vals[0] > bound => return Ok(None),
311                            BinOp::Gt if &pk_vals[0] <= bound => return Ok(Some(false)),
312                            BinOp::GtEq if &pk_vals[0] < bound => return Ok(Some(false)),
313                            _ => {}
314                        }
315                    }
316                    for (i, &pi) in fast.pk_idx_cache.iter().enumerate() {
317                        bufs.partial_row[pi] = pk_vals[i].clone();
318                    }
319                }
320                for (i, target) in fast.targets.iter().enumerate() {
321                    let (raw, off) = decode_column_with_offset(value, target.phys_idx)?;
322                    bufs.partial_row[target.schema_idx] = raw.to_value();
323                    bufs.offsets[i] = off;
324                }
325                for (i, target) in fast.targets.iter().enumerate() {
326                    let new_val = match target.fast_eval {
327                        FastEval::IntAdd(n) => {
328                            if let Value::Integer(v) = bufs.partial_row[target.schema_idx] {
329                                Value::Integer(v.wrapping_add(n))
330                            } else {
331                                eval_expr(&target.expr, &fast.col_map, &bufs.partial_row)?
332                            }
333                        }
334                        FastEval::IntSub(n) => {
335                            if let Value::Integer(v) = bufs.partial_row[target.schema_idx] {
336                                Value::Integer(v.wrapping_sub(n))
337                            } else {
338                                eval_expr(&target.expr, &fast.col_map, &bufs.partial_row)?
339                            }
340                        }
341                        FastEval::IntMul(n) => {
342                            if let Value::Integer(v) = bufs.partial_row[target.schema_idx] {
343                                Value::Integer(v.wrapping_mul(n))
344                            } else {
345                                eval_expr(&target.expr, &fast.col_map, &bufs.partial_row)?
346                            }
347                        }
348                        FastEval::IntSet(n) => Value::Integer(n),
349                        FastEval::None => {
350                            eval_expr(&target.expr, &fast.col_map, &bufs.partial_row)?
351                        }
352                    };
353                    let coerced = if new_val.is_null() {
354                        if !target.col.nullable {
355                            return Err(SqlError::NotNullViolation(target.col.name.clone()));
356                        }
357                        Value::Null
358                    } else {
359                        let got_type = new_val.data_type();
360                        new_val.coerce_into(target.col.data_type).ok_or_else(|| {
361                            SqlError::TypeMismatch {
362                                expected: target.col.data_type.to_string(),
363                                got: got_type.to_string(),
364                            }
365                        })?
366                    };
367                    if !patch_at_offset(value, bufs.offsets[i], &coerced)?
368                        && !patch_column_in_place(value, target.phys_idx, &coerced)?
369                    {
370                        patch_row_column(value, target.phys_idx, &coerced, &mut bufs.patch_buf)?;
371                        value[..bufs.patch_buf.len()].copy_from_slice(&bufs.patch_buf);
372                        for off in bufs.offsets.iter_mut().skip(i + 1) {
373                            *off = usize::MAX;
374                        }
375                    }
376                }
377                Ok(Some(true))
378            },
379        )?;
380
381        wtx.commit().map_err(SqlError::Storage)?;
382        return Ok(ExecutionResult::RowsAffected(count));
383    }
384
385    // PkLookup / SeqScan — fall back to full exec_update for now
386    drop(wtx);
387    exec_update(db, schema, stmt)
388}
389
390// ── UPDATE / DELETE execution ───────────────────────────────────────
391
392pub(super) fn exec_update(
393    db: &Database,
394    schema: &SchemaManager,
395    stmt: &UpdateStmt,
396) -> Result<ExecutionResult> {
397    let lower_name = stmt.table.to_ascii_lowercase();
398    if schema.get_view(&lower_name).is_some() {
399        return Err(SqlError::CannotModifyView(stmt.table.clone()));
400    }
401    let table_schema = schema
402        .get(&lower_name)
403        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
404
405    // Correlated subquery in UPDATE WHERE — check BEFORE materialization
406    let corr_ctx = CorrelationCtx {
407        outer_schema: table_schema,
408        outer_alias: None,
409    };
410    if has_correlated_where(&stmt.where_clause, &corr_ctx, schema) {
411        let select_stmt = SelectStmt {
412            columns: vec![SelectColumn::AllColumns],
413            from: stmt.table.clone(),
414            from_alias: None,
415            joins: vec![],
416            distinct: false,
417            where_clause: stmt.where_clause.clone(),
418            order_by: vec![],
419            limit: None,
420            offset: None,
421            group_by: vec![],
422            having: None,
423        };
424        let (mut rows, _) = collect_rows_read(db, table_schema, &None, None)?;
425        let remaining =
426            handle_correlated_where_read(db, schema, &select_stmt, &corr_ctx, &mut rows)?;
427
428        if let Some(ref w) = remaining {
429            let col_map = ColumnMap::new(&table_schema.columns);
430            rows.retain(|row| match eval_expr(w, &col_map, row) {
431                Ok(val) => is_truthy(&val),
432                Err(_) => false,
433            });
434        }
435
436        let pk_indices = table_schema.pk_indices();
437        let pk_values: Vec<Value> = rows.iter().map(|row| row[pk_indices[0]].clone()).collect();
438        let pk_col = &table_schema.columns[pk_indices[0]].name;
439        let in_set: std::collections::HashSet<Value> = pk_values.into_iter().collect();
440        let new_where = if in_set.is_empty() {
441            Some(Expr::Literal(Value::Boolean(false)))
442        } else {
443            Some(Expr::InSet {
444                expr: Box::new(Expr::Column(pk_col.clone())),
445                values: in_set,
446                has_null: false,
447                negated: false,
448            })
449        };
450
451        let rewritten = UpdateStmt {
452            table: stmt.table.clone(),
453            assignments: stmt.assignments.clone(),
454            where_clause: new_where,
455        };
456        return exec_update(db, schema, &rewritten);
457    }
458
459    let materialized;
460    let stmt = if update_has_subquery(stmt) {
461        materialized = materialize_update(stmt, &mut |sub| {
462            exec_subquery_read(db, schema, sub, &HashMap::new())
463        })?;
464        &materialized
465    } else {
466        stmt
467    };
468
469    let col_map = ColumnMap::new(&table_schema.columns);
470    let pk_indices = table_schema.pk_indices();
471
472    let pk_changed_by_set = stmt.assignments.iter().any(|(col_name, _)| {
473        table_schema
474            .column_index(col_name)
475            .is_some_and(|idx| table_schema.primary_key_columns.contains(&(idx as u16)))
476    });
477
478    // Fast path: no FK, no indices, no PK change → raw-byte scan + patch
479    let has_fk = !table_schema.foreign_keys.is_empty();
480    let has_indices = !table_schema.indices.is_empty();
481    let has_child_fk = !schema.child_fks_for(&lower_name).is_empty();
482    if !pk_changed_by_set && !has_fk && !has_indices && !has_child_fk && !table_schema.has_checks()
483    {
484        let non_pk = table_schema.non_pk_indices();
485        let enc_pos = table_schema.encoding_positions();
486        let num_pk_cols = table_schema.primary_key_columns.len();
487
488        struct AssignTarget {
489            schema_idx: usize,
490            phys_idx: usize,
491            expr: Expr,
492            col: ColumnDef,
493        }
494        let mut targets: Vec<AssignTarget> = Vec::with_capacity(stmt.assignments.len());
495        for (col_name, expr) in &stmt.assignments {
496            let schema_idx = table_schema
497                .column_index(col_name)
498                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
499            let nonpk_order = non_pk
500                .iter()
501                .position(|&i| i == schema_idx)
502                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
503            let phys_idx = enc_pos[nonpk_order] as usize;
504            targets.push(AssignTarget {
505                schema_idx,
506                phys_idx,
507                expr: expr.clone(),
508                col: table_schema.columns[schema_idx].clone(),
509            });
510        }
511
512        let plan = crate::planner::plan_select(table_schema, &stmt.where_clause);
513        let single_int_pk = num_pk_cols == 1
514            && table_schema.columns[table_schema.primary_key_columns[0] as usize].data_type
515                == DataType::Integer;
516
517        let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
518
519        // Fused PkRangeScan: scan + patch in a single leaf pass, zero allocs
520        if let crate::planner::ScanPlan::PkRangeScan {
521            ref start_key,
522            ref range_conds,
523            ..
524        } = plan
525        {
526            let range_conds = range_conds.clone();
527            let mut partial_row = vec![Value::Null; table_schema.columns.len()];
528            let pk_idx_cache = table_schema.pk_indices().to_vec();
529            let mut patch_buf: Vec<u8> = Vec::with_capacity(256);
530
531            let count =
532                wtx.table_update_range(lower_name.as_bytes(), start_key, |key, value| {
533                    // Range check: None = stop, Some(false) = skip, fall through = in range
534                    if single_int_pk {
535                        let pk_int = Value::Integer(decode_pk_integer(key)?);
536                        for (op, bound) in &range_conds {
537                            match op {
538                                BinOp::Lt if &pk_int >= bound => return Ok(None),
539                                BinOp::LtEq if &pk_int > bound => return Ok(None),
540                                BinOp::Gt if &pk_int <= bound => return Ok(Some(false)),
541                                BinOp::GtEq if &pk_int < bound => return Ok(Some(false)),
542                                _ => {}
543                            }
544                        }
545                    } else {
546                        let pk_vals = decode_composite_key(key, num_pk_cols)?;
547                        for (op, bound) in &range_conds {
548                            match op {
549                                BinOp::Lt if &pk_vals[0] >= bound => return Ok(None),
550                                BinOp::LtEq if &pk_vals[0] > bound => return Ok(None),
551                                BinOp::Gt if &pk_vals[0] <= bound => return Ok(Some(false)),
552                                BinOp::GtEq if &pk_vals[0] < bound => return Ok(Some(false)),
553                                _ => {}
554                            }
555                        }
556                    }
557
558                    if single_int_pk {
559                        partial_row[pk_idx_cache[0]] = Value::Integer(decode_pk_integer(key)?);
560                    } else {
561                        let pk_vals = decode_composite_key(key, num_pk_cols)?;
562                        for (i, &pi) in pk_idx_cache.iter().enumerate() {
563                            partial_row[pi] = pk_vals[i].clone();
564                        }
565                    }
566                    for target in &targets {
567                        partial_row[target.schema_idx] =
568                            decode_column_raw(value, target.phys_idx)?.to_value();
569                    }
570                    // Eval + patch directly in the leaf cell's value bytes
571                    for target in &targets {
572                        let new_val = eval_expr(&target.expr, &col_map, &partial_row)?;
573                        let coerced = if new_val.is_null() {
574                            if !target.col.nullable {
575                                return Err(SqlError::NotNullViolation(target.col.name.clone()));
576                            }
577                            Value::Null
578                        } else {
579                            let got_type = new_val.data_type();
580                            new_val.coerce_into(target.col.data_type).ok_or_else(|| {
581                                SqlError::TypeMismatch {
582                                    expected: target.col.data_type.to_string(),
583                                    got: got_type.to_string(),
584                                }
585                            })?
586                        };
587                        if !patch_column_in_place(value, target.phys_idx, &coerced)? {
588                            patch_row_column(value, target.phys_idx, &coerced, &mut patch_buf)?;
589                            value[..patch_buf.len()].copy_from_slice(&patch_buf);
590                        }
591                    }
592                    Ok(Some(true))
593                })?;
594
595            wtx.commit().map_err(SqlError::Storage)?;
596            return Ok(ExecutionResult::RowsAffected(count));
597        }
598
599        // Collect-then-write path for PkLookup and SeqScan
600        let mut kv_pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
601        {
602            match &plan {
603                crate::planner::ScanPlan::PkLookup { pk_values } => {
604                    let key = crate::encoding::encode_composite_key(pk_values);
605                    if let Some(value) = wtx
606                        .table_get(lower_name.as_bytes(), &key)
607                        .map_err(SqlError::Storage)?
608                    {
609                        kv_pairs.push((key, value));
610                    }
611                }
612                _ => {
613                    wtx.table_for_each(lower_name.as_bytes(), |key, value| {
614                        kv_pairs.push((key.to_vec(), value.to_vec()));
615                        Ok(())
616                    })
617                    .map_err(SqlError::Storage)?;
618                }
619            }
620        }
621
622        let mut patch_buf: Vec<u8> = Vec::with_capacity(256);
623        let mut partial_row = vec![Value::Null; table_schema.columns.len()];
624        let pk_idx_cache = table_schema.pk_indices().to_vec();
625        let mut patched: Vec<(Vec<u8>, Vec<u8>)> = Vec::with_capacity(kv_pairs.len());
626
627        for (key, raw_value) in &mut kv_pairs {
628            if matches!(plan, crate::planner::ScanPlan::SeqScan) {
629                if let Some(ref w) = stmt.where_clause {
630                    let row = decode_full_row(table_schema, key, raw_value)?;
631                    if !eval_expr(w, &col_map, &row).is_ok_and(|v| is_truthy(&v)) {
632                        continue;
633                    }
634                }
635            }
636            if single_int_pk {
637                partial_row[pk_idx_cache[0]] = Value::Integer(decode_pk_integer(key)?);
638            } else {
639                let pk_vals = decode_composite_key(key, num_pk_cols)?;
640                for (i, &pi) in pk_idx_cache.iter().enumerate() {
641                    partial_row[pi] = pk_vals[i].clone();
642                }
643            }
644            for target in &targets {
645                partial_row[target.schema_idx] =
646                    decode_column_raw(raw_value, target.phys_idx)?.to_value();
647            }
648            for target in &targets {
649                let new_val = eval_expr(&target.expr, &col_map, &partial_row)?;
650                let coerced = if new_val.is_null() {
651                    if !target.col.nullable {
652                        return Err(SqlError::NotNullViolation(target.col.name.clone()));
653                    }
654                    Value::Null
655                } else {
656                    let got_type = new_val.data_type();
657                    new_val.coerce_into(target.col.data_type).ok_or_else(|| {
658                        SqlError::TypeMismatch {
659                            expected: target.col.data_type.to_string(),
660                            got: got_type.to_string(),
661                        }
662                    })?
663                };
664                if !patch_column_in_place(raw_value, target.phys_idx, &coerced)? {
665                    patch_row_column(raw_value, target.phys_idx, &coerced, &mut patch_buf)?;
666                    std::mem::swap(raw_value, &mut patch_buf);
667                }
668            }
669            patched.push((std::mem::take(key), std::mem::take(raw_value)));
670        }
671
672        if !patched.is_empty() {
673            let refs: Vec<(&[u8], &[u8])> = patched
674                .iter()
675                .map(|(k, v)| (k.as_slice(), v.as_slice()))
676                .collect();
677            wtx.table_update_sorted(lower_name.as_bytes(), &refs)
678                .map_err(SqlError::Storage)?;
679        }
680        let count = patched.len() as u64;
681        wtx.commit().map_err(SqlError::Storage)?;
682        return Ok(ExecutionResult::RowsAffected(count));
683    }
684
685    // Slow path: has FK/indices/PK changes — materialize all changes for validation
686    let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
687    let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
688        .into_iter()
689        .filter(|(_, row)| match &stmt.where_clause {
690            Some(where_expr) => eval_expr(where_expr, &col_map, row).is_ok_and(|v| is_truthy(&v)),
691            None => true,
692        })
693        .collect();
694
695    if matching_rows.is_empty() {
696        return Ok(ExecutionResult::RowsAffected(0));
697    }
698
699    struct UpdateChange {
700        old_key: Vec<u8>,
701        new_key: Vec<u8>,
702        new_value: Vec<u8>,
703        pk_changed: bool,
704        old_row: Vec<Value>,
705        new_row: Vec<Value>,
706    }
707
708    let mut changes: Vec<UpdateChange> = Vec::new();
709
710    for (old_key, row) in &matching_rows {
711        let mut new_row = row.clone();
712
713        let mut evaluated: Vec<(usize, Value)> = Vec::with_capacity(stmt.assignments.len());
714        for (col_name, expr) in &stmt.assignments {
715            let col_idx = table_schema
716                .column_index(col_name)
717                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
718            let new_val = eval_expr(expr, &col_map, row)?;
719            let col = &table_schema.columns[col_idx];
720
721            let got_type = new_val.data_type();
722            let coerced = if new_val.is_null() {
723                if !col.nullable {
724                    return Err(SqlError::NotNullViolation(col.name.clone()));
725                }
726                Value::Null
727            } else {
728                new_val
729                    .coerce_into(col.data_type)
730                    .ok_or_else(|| SqlError::TypeMismatch {
731                        expected: col.data_type.to_string(),
732                        got: got_type.to_string(),
733                    })?
734            };
735
736            evaluated.push((col_idx, coerced));
737        }
738
739        for (col_idx, coerced) in evaluated {
740            new_row[col_idx] = coerced;
741        }
742
743        if table_schema.has_checks() {
744            for col in &table_schema.columns {
745                if let Some(ref check) = col.check_expr {
746                    let result = eval_expr(check, &col_map, &new_row)?;
747                    if !is_truthy(&result) && !result.is_null() {
748                        let name = col.check_name.as_deref().unwrap_or(&col.name);
749                        return Err(SqlError::CheckViolation(name.to_string()));
750                    }
751                }
752            }
753            for tc in &table_schema.check_constraints {
754                let result = eval_expr(&tc.expr, &col_map, &new_row)?;
755                if !is_truthy(&result) && !result.is_null() {
756                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
757                    return Err(SqlError::CheckViolation(name.to_string()));
758                }
759            }
760        }
761
762        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
763        let new_key = encode_composite_key(&pk_values);
764
765        let non_pk = table_schema.non_pk_indices();
766        let enc_pos = table_schema.encoding_positions();
767        let phys_count = table_schema.physical_non_pk_count();
768        let mut value_values = vec![Value::Null; phys_count];
769        for (j, &i) in non_pk.iter().enumerate() {
770            value_values[enc_pos[j] as usize] = new_row[i].clone();
771        }
772        let new_value = encode_row(&value_values);
773
774        changes.push(UpdateChange {
775            old_key: old_key.clone(),
776            new_key,
777            new_value,
778            pk_changed: pk_changed_by_set,
779            old_row: row.clone(),
780            new_row,
781        });
782    }
783
784    {
785        use std::collections::HashSet;
786        let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
787        for c in &changes {
788            if c.pk_changed && c.new_key != c.old_key && !new_keys.insert(c.new_key.clone()) {
789                return Err(SqlError::DuplicateKey);
790            }
791        }
792    }
793
794    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
795
796    // FK child-side: validate new FK values exist in parent
797    if !table_schema.foreign_keys.is_empty() {
798        for c in &changes {
799            for fk in &table_schema.foreign_keys {
800                let fk_changed = fk
801                    .columns
802                    .iter()
803                    .any(|&ci| c.old_row[ci as usize] != c.new_row[ci as usize]);
804                if !fk_changed {
805                    continue;
806                }
807                let any_null = fk
808                    .columns
809                    .iter()
810                    .any(|&ci| c.new_row[ci as usize].is_null());
811                if any_null {
812                    continue;
813                }
814                let fk_vals: Vec<Value> = fk
815                    .columns
816                    .iter()
817                    .map(|&ci| c.new_row[ci as usize].clone())
818                    .collect();
819                let fk_key = encode_composite_key(&fk_vals);
820                let found = wtx
821                    .table_get(fk.foreign_table.as_bytes(), &fk_key)
822                    .map_err(SqlError::Storage)?;
823                if found.is_none() {
824                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
825                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
826                }
827            }
828        }
829    }
830
831    // FK parent-side: if PK changed, check no child references old PK
832    let child_fks = schema.child_fks_for(&lower_name);
833    if !child_fks.is_empty() {
834        for c in &changes {
835            if !c.pk_changed {
836                continue;
837            }
838            let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
839            let old_pk_key = encode_composite_key(&old_pk);
840            for &(child_table, fk) in &child_fks {
841                let child_schema = schema.get(child_table).unwrap();
842                let fk_idx = child_schema
843                    .indices
844                    .iter()
845                    .find(|idx| idx.columns == fk.columns);
846                if let Some(idx) = fk_idx {
847                    let idx_table = TableSchema::index_table_name(child_table, &idx.name);
848                    let mut has_child = false;
849                    wtx.table_scan_from(&idx_table, &old_pk_key, |key, _| {
850                        if key.starts_with(&old_pk_key) {
851                            has_child = true;
852                            Ok(false) // stop scanning
853                        } else {
854                            Ok(false) // past prefix, stop
855                        }
856                    })
857                    .map_err(SqlError::Storage)?;
858                    if has_child {
859                        return Err(SqlError::ForeignKeyViolation(format!(
860                            "cannot update PK in '{}': referenced by '{}'",
861                            lower_name, child_table
862                        )));
863                    }
864                }
865            }
866        }
867    }
868
869    for c in &changes {
870        let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
871
872        for idx in &table_schema.indices {
873            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
874                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
875                let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
876                wtx.table_delete(&idx_table, &old_idx_key)
877                    .map_err(SqlError::Storage)?;
878            }
879        }
880
881        if c.pk_changed {
882            wtx.table_delete(lower_name.as_bytes(), &c.old_key)
883                .map_err(SqlError::Storage)?;
884        }
885    }
886
887    for c in &changes {
888        let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
889
890        if c.pk_changed {
891            let is_new = wtx
892                .table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
893                .map_err(SqlError::Storage)?;
894            if !is_new {
895                return Err(SqlError::DuplicateKey);
896            }
897        } else {
898            wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
899                .map_err(SqlError::Storage)?;
900        }
901
902        for idx in &table_schema.indices {
903            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
904                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
905                let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
906                let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
907                let is_new = wtx
908                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
909                    .map_err(SqlError::Storage)?;
910                if idx.unique && !is_new {
911                    let indexed_values: Vec<Value> = idx
912                        .columns
913                        .iter()
914                        .map(|&col_idx| c.new_row[col_idx as usize].clone())
915                        .collect();
916                    let any_null = indexed_values.iter().any(|v| v.is_null());
917                    if !any_null {
918                        return Err(SqlError::UniqueViolation(idx.name.clone()));
919                    }
920                }
921            }
922        }
923    }
924
925    let count = changes.len() as u64;
926    wtx.commit().map_err(SqlError::Storage)?;
927    Ok(ExecutionResult::RowsAffected(count))
928}
929
930pub(super) fn exec_delete(
931    db: &Database,
932    schema: &SchemaManager,
933    stmt: &DeleteStmt,
934) -> Result<ExecutionResult> {
935    let lower_name = stmt.table.to_ascii_lowercase();
936    if schema.get_view(&lower_name).is_some() {
937        return Err(SqlError::CannotModifyView(stmt.table.clone()));
938    }
939    let table_schema = schema
940        .get(&lower_name)
941        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
942
943    let corr_ctx = CorrelationCtx {
944        outer_schema: table_schema,
945        outer_alias: None,
946    };
947    if has_correlated_where(&stmt.where_clause, &corr_ctx, schema) {
948        let select_stmt = SelectStmt {
949            columns: vec![SelectColumn::AllColumns],
950            from: stmt.table.clone(),
951            from_alias: None,
952            joins: vec![],
953            distinct: false,
954            where_clause: stmt.where_clause.clone(),
955            order_by: vec![],
956            limit: None,
957            offset: None,
958            group_by: vec![],
959            having: None,
960        };
961        let (mut rows, _) = collect_rows_read(db, table_schema, &None, None)?;
962        let remaining =
963            handle_correlated_where_read(db, schema, &select_stmt, &corr_ctx, &mut rows)?;
964
965        if let Some(ref w) = remaining {
966            let col_map = ColumnMap::new(&table_schema.columns);
967            rows.retain(|row| match eval_expr(w, &col_map, row) {
968                Ok(val) => is_truthy(&val),
969                Err(_) => false,
970            });
971        }
972
973        let pk_indices = table_schema.pk_indices();
974        let pk_values: Vec<Value> = rows.iter().map(|row| row[pk_indices[0]].clone()).collect();
975        let pk_col = &table_schema.columns[pk_indices[0]].name;
976        let in_set: std::collections::HashSet<Value> = pk_values.into_iter().collect();
977        let new_where = if in_set.is_empty() {
978            Some(Expr::Literal(Value::Boolean(false)))
979        } else {
980            Some(Expr::InSet {
981                expr: Box::new(Expr::Column(pk_col.clone())),
982                values: in_set,
983                has_null: false,
984                negated: false,
985            })
986        };
987
988        let rewritten = DeleteStmt {
989            table: stmt.table.clone(),
990            where_clause: new_where,
991        };
992        return exec_delete(db, schema, &rewritten);
993    }
994
995    let materialized;
996    let stmt = if delete_has_subquery(stmt) {
997        materialized = materialize_delete(stmt, &mut |sub| {
998            exec_subquery_read(db, schema, sub, &HashMap::new())
999        })?;
1000        &materialized
1001    } else {
1002        stmt
1003    };
1004
1005    let col_map = ColumnMap::new(&table_schema.columns);
1006    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
1007    let all_candidates = collect_keyed_rows_write(&mut wtx, table_schema, &stmt.where_clause)?;
1008    let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
1009        .into_iter()
1010        .filter(|(_, row)| match &stmt.where_clause {
1011            Some(where_expr) => match eval_expr(where_expr, &col_map, row) {
1012                Ok(val) => is_truthy(&val),
1013                Err(_) => false,
1014            },
1015            None => true,
1016        })
1017        .collect();
1018
1019    if rows_to_delete.is_empty() {
1020        wtx.commit().map_err(SqlError::Storage)?;
1021        return Ok(ExecutionResult::RowsAffected(0));
1022    }
1023
1024    let pk_indices = table_schema.pk_indices();
1025
1026    // FK parent-side: check no child rows reference deleted PKs
1027    let child_fks = schema.child_fks_for(&lower_name);
1028    if !child_fks.is_empty() {
1029        for (_key, row) in &rows_to_delete {
1030            let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
1031            let pk_key = encode_composite_key(&pk_values);
1032            for &(child_table, fk) in &child_fks {
1033                let child_schema = schema.get(child_table).unwrap();
1034                let fk_idx = child_schema
1035                    .indices
1036                    .iter()
1037                    .find(|idx| idx.columns == fk.columns);
1038                if let Some(idx) = fk_idx {
1039                    let idx_table = TableSchema::index_table_name(child_table, &idx.name);
1040                    let mut has_child = false;
1041                    wtx.table_scan_from(&idx_table, &pk_key, |key, _| {
1042                        if key.starts_with(&pk_key) {
1043                            has_child = true;
1044                            Ok(false)
1045                        } else {
1046                            Ok(false)
1047                        }
1048                    })
1049                    .map_err(SqlError::Storage)?;
1050                    if has_child {
1051                        return Err(SqlError::ForeignKeyViolation(format!(
1052                            "cannot delete from '{}': referenced by '{}'",
1053                            lower_name, child_table
1054                        )));
1055                    }
1056                }
1057            }
1058        }
1059    }
1060
1061    for (key, row) in &rows_to_delete {
1062        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
1063        delete_index_entries(&mut wtx, table_schema, row, &pk_values)?;
1064        wtx.table_delete(lower_name.as_bytes(), key)
1065            .map_err(SqlError::Storage)?;
1066    }
1067    let count = rows_to_delete.len() as u64;
1068    wtx.commit().map_err(SqlError::Storage)?;
1069    Ok(ExecutionResult::RowsAffected(count))
1070}
1071
1072pub(super) fn exec_select_in_txn(
1073    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1074    schema: &SchemaManager,
1075    stmt: &SelectStmt,
1076    ctes: &CteContext,
1077) -> Result<ExecutionResult> {
1078    if stmt.from.is_empty() {
1079        let materialized;
1080        let stmt = if stmt_has_subquery(stmt) {
1081            materialized =
1082                materialize_stmt(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub, ctes))?;
1083            &materialized
1084        } else {
1085            stmt
1086        };
1087        return super::exec_select_no_from(stmt);
1088    }
1089
1090    let lower_name = stmt.from.to_ascii_lowercase();
1091
1092    if let Some(cte_result) = ctes.get(&lower_name) {
1093        if stmt.joins.is_empty() {
1094            return super::exec_select_from_cte(cte_result, stmt, &mut |sub| {
1095                exec_subquery_write(wtx, schema, sub, ctes)
1096            });
1097        } else {
1098            return super::exec_select_join_with_ctes(stmt, ctes, &mut |name| {
1099                super::scan_table_write(wtx, schema, name)
1100            });
1101        }
1102    }
1103
1104    if !ctes.is_empty()
1105        && stmt
1106            .joins
1107            .iter()
1108            .any(|j| ctes.contains_key(&j.table.name.to_ascii_lowercase()))
1109    {
1110        return super::exec_select_join_with_ctes(stmt, ctes, &mut |name| {
1111            super::scan_table_write_or_view(wtx, schema, name)
1112        });
1113    }
1114
1115    // ── View resolution (in-txn) ────────────────────────────────────
1116    if let Some(view_def) = schema.get_view(&lower_name) {
1117        if let Some(fused) = try_fuse_view(stmt, schema, view_def)? {
1118            return super::exec_select_in_txn(wtx, schema, &fused, ctes);
1119        }
1120        let view_qr = exec_view_write(wtx, schema, view_def)?;
1121        if stmt.joins.is_empty() {
1122            return super::exec_select_from_cte(&view_qr, stmt, &mut |sub| {
1123                exec_subquery_write(wtx, schema, sub, ctes)
1124            });
1125        } else {
1126            let mut view_ctes = ctes.clone();
1127            view_ctes.insert(lower_name.clone(), view_qr);
1128            return super::exec_select_join_with_ctes(stmt, &view_ctes, &mut |name| {
1129                super::scan_table_write_or_view(wtx, schema, name)
1130            });
1131        }
1132    }
1133
1134    let any_join_view = stmt.joins.iter().any(|j| {
1135        schema
1136            .get_view(&j.table.name.to_ascii_lowercase())
1137            .is_some()
1138    });
1139    if any_join_view {
1140        let mut view_ctes = ctes.clone();
1141        for j in &stmt.joins {
1142            let jname = j.table.name.to_ascii_lowercase();
1143            if let Some(vd) = schema.get_view(&jname) {
1144                if let std::collections::hash_map::Entry::Vacant(e) = view_ctes.entry(jname) {
1145                    let vqr = exec_view_write(wtx, schema, vd)?;
1146                    e.insert(vqr);
1147                }
1148            }
1149        }
1150        return super::exec_select_join_with_ctes(stmt, &view_ctes, &mut |name| {
1151            super::scan_table_write(wtx, schema, name)
1152        });
1153    }
1154
1155    if !stmt.joins.is_empty() {
1156        return super::exec_select_join_in_txn(wtx, schema, stmt);
1157    }
1158
1159    let lower_name = stmt.from.to_ascii_lowercase();
1160    let table_schema = schema
1161        .get(&lower_name)
1162        .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
1163
1164    // Correlated subquery handling (in-txn)
1165    let corr_ctx = CorrelationCtx {
1166        outer_schema: table_schema,
1167        outer_alias: stmt.from_alias.as_deref(),
1168    };
1169    if has_correlated_where(&stmt.where_clause, &corr_ctx, schema) {
1170        let (mut rows, _) = collect_rows_write(wtx, table_schema, &None, None)?;
1171        let remaining_where =
1172            handle_correlated_where_write(wtx, schema, stmt, &corr_ctx, &mut rows)?;
1173        let clean_stmt = SelectStmt {
1174            where_clause: remaining_where,
1175            columns: stmt.columns.clone(),
1176            from: stmt.from.clone(),
1177            from_alias: stmt.from_alias.clone(),
1178            joins: stmt.joins.clone(),
1179            distinct: stmt.distinct,
1180            order_by: stmt.order_by.clone(),
1181            limit: stmt.limit.clone(),
1182            offset: stmt.offset.clone(),
1183            group_by: stmt.group_by.clone(),
1184            having: stmt.having.clone(),
1185        };
1186        let final_stmt;
1187        let s = if stmt_has_subquery(&clean_stmt) {
1188            final_stmt = materialize_stmt(&clean_stmt, &mut |sub| {
1189                exec_subquery_write(wtx, schema, sub, ctes)
1190            })?;
1191            &final_stmt
1192        } else {
1193            &clean_stmt
1194        };
1195        return super::process_select(&table_schema.columns, rows, s, false);
1196    }
1197
1198    let materialized;
1199    let stmt = if stmt_has_subquery(stmt) {
1200        materialized =
1201            materialize_stmt(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub, ctes))?;
1202        &materialized
1203    } else {
1204        stmt
1205    };
1206
1207    if let Some(result) = try_count_star_shortcut(stmt, || {
1208        wtx.table_entry_count(lower_name.as_bytes())
1209            .map_err(SqlError::Storage)
1210    })? {
1211        return Ok(result);
1212    }
1213
1214    if let Some(plan) = StreamAggPlan::try_new(stmt, table_schema)? {
1215        let mut states: Vec<AggState> = plan.ops.iter().map(|(op, _)| AggState::new(op)).collect();
1216        let mut scan_err: Option<SqlError> = None;
1217        if stmt.where_clause.is_none() {
1218            wtx.table_scan_from(lower_name.as_bytes(), b"", |key, value| {
1219                Ok(plan.feed_row_raw(key, value, &mut states, &mut scan_err))
1220            })
1221            .map_err(SqlError::Storage)?;
1222        } else {
1223            let col_map = ColumnMap::new(&table_schema.columns);
1224            wtx.table_scan_from(lower_name.as_bytes(), b"", |key, value| {
1225                Ok(plan.feed_row(
1226                    key,
1227                    value,
1228                    table_schema,
1229                    &col_map,
1230                    &stmt.where_clause,
1231                    &mut states,
1232                    &mut scan_err,
1233                ))
1234            })
1235            .map_err(SqlError::Storage)?;
1236        }
1237        if let Some(e) = scan_err {
1238            return Err(e);
1239        }
1240        return Ok(plan.finish(states));
1241    }
1242
1243    if let Some(plan) = StreamGroupByPlan::try_new(stmt, table_schema)? {
1244        let lower = lower_name.clone();
1245        return plan.execute_scan(|cb| {
1246            wtx.table_scan_from(lower.as_bytes(), b"", |key, value| Ok(cb(key, value)))
1247        });
1248    }
1249
1250    if let Some(plan) = TopKScanPlan::try_new(stmt, table_schema)? {
1251        let lower = lower_name.clone();
1252        return plan.execute_scan(table_schema, stmt, |cb| {
1253            wtx.table_scan_from(lower.as_bytes(), b"", |key, value| Ok(cb(key, value)))
1254        });
1255    }
1256
1257    let scan_limit = compute_scan_limit(stmt);
1258    let (rows, predicate_applied) =
1259        collect_rows_write(wtx, table_schema, &stmt.where_clause, scan_limit)?;
1260    super::process_select(&table_schema.columns, rows, stmt, predicate_applied)
1261}
1262
1263pub(super) fn exec_update_in_txn(
1264    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1265    schema: &SchemaManager,
1266    stmt: &UpdateStmt,
1267) -> Result<ExecutionResult> {
1268    let materialized;
1269    let stmt = if update_has_subquery(stmt) {
1270        materialized = materialize_update(stmt, &mut |sub| {
1271            exec_subquery_write(wtx, schema, sub, &HashMap::new())
1272        })?;
1273        &materialized
1274    } else {
1275        stmt
1276    };
1277
1278    let lower_name = stmt.table.to_ascii_lowercase();
1279    let table_schema = schema
1280        .get(&lower_name)
1281        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1282
1283    let col_map = ColumnMap::new(&table_schema.columns);
1284    let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
1285    let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
1286        .into_iter()
1287        .filter(|(_, row)| match &stmt.where_clause {
1288            Some(where_expr) => match eval_expr(where_expr, &col_map, row) {
1289                Ok(val) => is_truthy(&val),
1290                Err(_) => false,
1291            },
1292            None => true,
1293        })
1294        .collect();
1295
1296    if matching_rows.is_empty() {
1297        return Ok(ExecutionResult::RowsAffected(0));
1298    }
1299
1300    struct UpdateChange {
1301        old_key: Vec<u8>,
1302        new_key: Vec<u8>,
1303        new_value: Vec<u8>,
1304        pk_changed: bool,
1305        old_row: Vec<Value>,
1306        new_row: Vec<Value>,
1307    }
1308
1309    let pk_indices = table_schema.pk_indices();
1310    let mut changes: Vec<UpdateChange> = Vec::new();
1311
1312    for (old_key, row) in &matching_rows {
1313        let mut new_row = row.clone();
1314        let mut pk_changed = false;
1315
1316        // Evaluate all SET expressions against the original row (SQL standard).
1317        let mut evaluated: Vec<(usize, Value)> = Vec::with_capacity(stmt.assignments.len());
1318        for (col_name, expr) in &stmt.assignments {
1319            let col_idx = table_schema
1320                .column_index(col_name)
1321                .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
1322            let new_val = eval_expr(expr, &col_map, row)?;
1323            let col = &table_schema.columns[col_idx];
1324
1325            let got_type = new_val.data_type();
1326            let coerced = if new_val.is_null() {
1327                if !col.nullable {
1328                    return Err(SqlError::NotNullViolation(col.name.clone()));
1329                }
1330                Value::Null
1331            } else {
1332                new_val
1333                    .coerce_into(col.data_type)
1334                    .ok_or_else(|| SqlError::TypeMismatch {
1335                        expected: col.data_type.to_string(),
1336                        got: got_type.to_string(),
1337                    })?
1338            };
1339
1340            evaluated.push((col_idx, coerced));
1341        }
1342
1343        for (col_idx, coerced) in evaluated {
1344            if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
1345                pk_changed = true;
1346            }
1347            new_row[col_idx] = coerced;
1348        }
1349
1350        if table_schema.has_checks() {
1351            for col in &table_schema.columns {
1352                if let Some(ref check) = col.check_expr {
1353                    let result = eval_expr(check, &col_map, &new_row)?;
1354                    if !is_truthy(&result) && !result.is_null() {
1355                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1356                        return Err(SqlError::CheckViolation(name.to_string()));
1357                    }
1358                }
1359            }
1360            for tc in &table_schema.check_constraints {
1361                let result = eval_expr(&tc.expr, &col_map, &new_row)?;
1362                if !is_truthy(&result) && !result.is_null() {
1363                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1364                    return Err(SqlError::CheckViolation(name.to_string()));
1365                }
1366            }
1367        }
1368
1369        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
1370        let new_key = encode_composite_key(&pk_values);
1371
1372        let non_pk = table_schema.non_pk_indices();
1373        let enc_pos = table_schema.encoding_positions();
1374        let phys_count = table_schema.physical_non_pk_count();
1375        let mut value_values = vec![Value::Null; phys_count];
1376        for (j, &i) in non_pk.iter().enumerate() {
1377            value_values[enc_pos[j] as usize] = new_row[i].clone();
1378        }
1379        let new_value = encode_row(&value_values);
1380
1381        changes.push(UpdateChange {
1382            old_key: old_key.clone(),
1383            new_key,
1384            new_value,
1385            pk_changed,
1386            old_row: row.clone(),
1387            new_row,
1388        });
1389    }
1390
1391    {
1392        use std::collections::HashSet;
1393        let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
1394        for c in &changes {
1395            if c.pk_changed && c.new_key != c.old_key && !new_keys.insert(c.new_key.clone()) {
1396                return Err(SqlError::DuplicateKey);
1397            }
1398        }
1399    }
1400
1401    // FK child-side: validate new FK values exist in parent
1402    if !table_schema.foreign_keys.is_empty() {
1403        for c in &changes {
1404            for fk in &table_schema.foreign_keys {
1405                let fk_changed = fk
1406                    .columns
1407                    .iter()
1408                    .any(|&ci| c.old_row[ci as usize] != c.new_row[ci as usize]);
1409                if !fk_changed {
1410                    continue;
1411                }
1412                let any_null = fk
1413                    .columns
1414                    .iter()
1415                    .any(|&ci| c.new_row[ci as usize].is_null());
1416                if any_null {
1417                    continue;
1418                }
1419                let fk_vals: Vec<Value> = fk
1420                    .columns
1421                    .iter()
1422                    .map(|&ci| c.new_row[ci as usize].clone())
1423                    .collect();
1424                let fk_key = encode_composite_key(&fk_vals);
1425                let found = wtx
1426                    .table_get(fk.foreign_table.as_bytes(), &fk_key)
1427                    .map_err(SqlError::Storage)?;
1428                if found.is_none() {
1429                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1430                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
1431                }
1432            }
1433        }
1434    }
1435
1436    // FK parent-side: if PK changed, check no child references old PK
1437    let child_fks = schema.child_fks_for(&lower_name);
1438    if !child_fks.is_empty() {
1439        for c in &changes {
1440            if !c.pk_changed {
1441                continue;
1442            }
1443            let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
1444            let old_pk_key = encode_composite_key(&old_pk);
1445            for &(child_table, fk) in &child_fks {
1446                let child_schema = schema.get(child_table).unwrap();
1447                let fk_idx = child_schema
1448                    .indices
1449                    .iter()
1450                    .find(|idx| idx.columns == fk.columns);
1451                if let Some(idx) = fk_idx {
1452                    let idx_table = TableSchema::index_table_name(child_table, &idx.name);
1453                    let mut has_child = false;
1454                    wtx.table_scan_from(&idx_table, &old_pk_key, |key, _| {
1455                        if key.starts_with(&old_pk_key) {
1456                            has_child = true;
1457                            Ok(false)
1458                        } else {
1459                            Ok(false)
1460                        }
1461                    })
1462                    .map_err(SqlError::Storage)?;
1463                    if has_child {
1464                        return Err(SqlError::ForeignKeyViolation(format!(
1465                            "cannot update PK in '{}': referenced by '{}'",
1466                            lower_name, child_table
1467                        )));
1468                    }
1469                }
1470            }
1471        }
1472    }
1473
1474    for c in &changes {
1475        let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
1476
1477        for idx in &table_schema.indices {
1478            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
1479                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
1480                let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
1481                wtx.table_delete(&idx_table, &old_idx_key)
1482                    .map_err(SqlError::Storage)?;
1483            }
1484        }
1485
1486        if c.pk_changed {
1487            wtx.table_delete(lower_name.as_bytes(), &c.old_key)
1488                .map_err(SqlError::Storage)?;
1489        }
1490    }
1491
1492    for c in &changes {
1493        let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
1494
1495        if c.pk_changed {
1496            let is_new = wtx
1497                .table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
1498                .map_err(SqlError::Storage)?;
1499            if !is_new {
1500                return Err(SqlError::DuplicateKey);
1501            }
1502        } else {
1503            wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
1504                .map_err(SqlError::Storage)?;
1505        }
1506
1507        for idx in &table_schema.indices {
1508            if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
1509                let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
1510                let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
1511                let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
1512                let is_new = wtx
1513                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
1514                    .map_err(SqlError::Storage)?;
1515                if idx.unique && !is_new {
1516                    let indexed_values: Vec<Value> = idx
1517                        .columns
1518                        .iter()
1519                        .map(|&col_idx| c.new_row[col_idx as usize].clone())
1520                        .collect();
1521                    let any_null = indexed_values.iter().any(|v| v.is_null());
1522                    if !any_null {
1523                        return Err(SqlError::UniqueViolation(idx.name.clone()));
1524                    }
1525                }
1526            }
1527        }
1528    }
1529
1530    let count = changes.len() as u64;
1531    Ok(ExecutionResult::RowsAffected(count))
1532}
1533
1534pub(super) fn exec_delete_in_txn(
1535    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1536    schema: &SchemaManager,
1537    stmt: &DeleteStmt,
1538) -> Result<ExecutionResult> {
1539    let materialized;
1540    let stmt = if delete_has_subquery(stmt) {
1541        materialized = materialize_delete(stmt, &mut |sub| {
1542            exec_subquery_write(wtx, schema, sub, &HashMap::new())
1543        })?;
1544        &materialized
1545    } else {
1546        stmt
1547    };
1548
1549    let lower_name = stmt.table.to_ascii_lowercase();
1550    let table_schema = schema
1551        .get(&lower_name)
1552        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1553
1554    let col_map = ColumnMap::new(&table_schema.columns);
1555    let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
1556    let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
1557        .into_iter()
1558        .filter(|(_, row)| match &stmt.where_clause {
1559            Some(where_expr) => match eval_expr(where_expr, &col_map, row) {
1560                Ok(val) => is_truthy(&val),
1561                Err(_) => false,
1562            },
1563            None => true,
1564        })
1565        .collect();
1566
1567    if rows_to_delete.is_empty() {
1568        return Ok(ExecutionResult::RowsAffected(0));
1569    }
1570
1571    let pk_indices = table_schema.pk_indices();
1572
1573    // FK parent-side: check no child rows reference deleted PKs
1574    let child_fks = schema.child_fks_for(&lower_name);
1575    if !child_fks.is_empty() {
1576        for (_key, row) in &rows_to_delete {
1577            let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
1578            let pk_key = encode_composite_key(&pk_values);
1579            for &(child_table, fk) in &child_fks {
1580                let child_schema = schema.get(child_table).unwrap();
1581                let fk_idx = child_schema
1582                    .indices
1583                    .iter()
1584                    .find(|idx| idx.columns == fk.columns);
1585                if let Some(idx) = fk_idx {
1586                    let idx_table = TableSchema::index_table_name(child_table, &idx.name);
1587                    let mut has_child = false;
1588                    wtx.table_scan_from(&idx_table, &pk_key, |key, _| {
1589                        if key.starts_with(&pk_key) {
1590                            has_child = true;
1591                            Ok(false)
1592                        } else {
1593                            Ok(false)
1594                        }
1595                    })
1596                    .map_err(SqlError::Storage)?;
1597                    if has_child {
1598                        return Err(SqlError::ForeignKeyViolation(format!(
1599                            "cannot delete from '{}': referenced by '{}'",
1600                            lower_name, child_table
1601                        )));
1602                    }
1603                }
1604            }
1605        }
1606    }
1607
1608    for (key, row) in &rows_to_delete {
1609        let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
1610        delete_index_entries(wtx, table_schema, row, &pk_values)?;
1611        wtx.table_delete(lower_name.as_bytes(), key)
1612            .map_err(SqlError::Storage)?;
1613    }
1614    let count = rows_to_delete.len() as u64;
1615    Ok(ExecutionResult::RowsAffected(count))
1616}