Skip to main content

citadel_sql/executor/
dml.rs

1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::read_txn::ReadTxn;
7use citadel_txn::write_txn::WriteTxn;
8use rustc_hash::FxHashMap;
9
10use crate::encoding::{encode_composite_key_into, encode_row_into};
11use crate::error::{Result, SqlError};
12use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
13use crate::parser::*;
14use crate::types::*;
15
16use crate::schema::SchemaManager;
17
18use super::compile::CompiledPlan;
19use super::helpers::*;
20use super::CteContext;
21
22/// (before-insert, after-insert, after-update) row-trigger presence, hoisted once.
23fn row_insert_trigger_flags(schema: &SchemaManager, table_name: &str) -> (bool, bool, bool) {
24    use crate::parser::{TriggerEvent, TriggerGranularity, TriggerTiming};
25    let triggers = schema.triggers_for(table_name);
26    let row =
27        |t: &crate::types::TriggerDef| t.enabled && t.granularity == TriggerGranularity::ForEachRow;
28    let ev = |t: &crate::types::TriggerDef, update: bool| {
29        t.events.iter().any(|e| {
30            if update {
31                matches!(e, TriggerEvent::Update(_))
32            } else {
33                matches!(e, TriggerEvent::Insert)
34            }
35        })
36    };
37    (
38        triggers
39            .iter()
40            .any(|t| row(t) && t.timing == TriggerTiming::Before && ev(t, false)),
41        triggers
42            .iter()
43            .any(|t| row(t) && t.timing == TriggerTiming::After && ev(t, false)),
44        triggers
45            .iter()
46            .any(|t| row(t) && t.timing == TriggerTiming::After && ev(t, true)),
47    )
48}
49
50/// Classify an INSERT for cache invalidation: a pure append (single-INTEGER pk,
51/// no conflict) stays append-retainable; anything else hard-invalidates.
52fn mark_insert_dml(
53    schema: &SchemaManager,
54    table_name: &str,
55    on_conflict: bool,
56    single_int_pk: bool,
57    min_inserted_pk: Option<i64>,
58    rows_written: u64,
59) {
60    if on_conflict {
61        schema.mark_dml(table_name);
62    } else if single_int_pk {
63        if let Some(m) = min_inserted_pk {
64            schema.mark_dml_append(table_name, m);
65        }
66    } else if rows_written > 0 {
67        schema.mark_dml(table_name);
68    }
69}
70
71/// Single INTEGER pk - the only shape an ANN plan indexes (and append-retains).
72fn is_single_int_pk(table_schema: &TableSchema) -> bool {
73    table_schema.primary_key_columns.len() == 1
74        && matches!(
75            table_schema.columns[table_schema.primary_key_columns[0] as usize].data_type,
76            DataType::Integer
77        )
78}
79
80pub(super) fn exec_insert(
81    db: &Database,
82    schema: &SchemaManager,
83    stmt: &InsertStmt,
84    params: &[Value],
85) -> Result<ExecutionResult> {
86    let empty_ctes = CteContext::default();
87    let materialized;
88    let stmt = if insert_has_subquery(stmt) {
89        materialized = materialize_insert(stmt, &mut |sub| {
90            exec_subquery_read(db, schema, sub, &empty_ctes)
91        })?;
92        &materialized
93    } else {
94        stmt
95    };
96
97    let lower_name = stmt.table.to_ascii_lowercase();
98    if let Some(view_def) = schema.get_view(&lower_name) {
99        if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
100        {
101            let aliases = view_def.column_aliases.clone();
102            return exec_instead_of_view_insert_auto(
103                db,
104                schema,
105                &lower_name,
106                &aliases,
107                stmt,
108                params,
109            );
110        }
111        return Err(SqlError::CannotModifyView(stmt.table.clone()));
112    }
113    if schema.get_matview(&lower_name).is_some() {
114        return Err(SqlError::CannotModifyView(format!(
115            "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
116            stmt.table
117        )));
118    }
119    let table_schema = schema
120        .get(&lower_name)
121        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
122
123    let insert_columns = if stmt.columns.is_empty() {
124        table_schema
125            .columns
126            .iter()
127            .map(|c| c.name.clone())
128            .collect::<Vec<_>>()
129    } else {
130        stmt.columns
131            .iter()
132            .map(|c| c.to_ascii_lowercase())
133            .collect()
134    };
135
136    let col_indices: Vec<usize> = insert_columns
137        .iter()
138        .map(|name| {
139            table_schema
140                .column_index(name)
141                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
142        })
143        .collect::<Result<_>>()?;
144
145    for &ci in &col_indices {
146        if table_schema.columns[ci].generated_kind.is_some() {
147            return Err(SqlError::CannotInsertIntoGeneratedColumn(
148                table_schema.columns[ci].name.clone(),
149            ));
150        }
151    }
152
153    let defaults: Vec<(usize, &Expr)> = table_schema
154        .columns
155        .iter()
156        .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
157        .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
158        .collect();
159
160    let generated_cols: Vec<(usize, &Expr)> = table_schema
161        .columns
162        .iter()
163        .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
164        .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
165        .collect();
166
167    let has_checks = table_schema.has_checks();
168    let strict = table_schema.is_strict();
169    let row_col_map_for_gen = (!generated_cols.is_empty()).then(|| table_schema.column_map());
170    let check_col_map = has_checks.then(|| table_schema.column_map());
171
172    let select_rows = match &stmt.source {
173        InsertSource::Select(sq) => {
174            let insert_ctes =
175                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
176                    exec_query_body_read(db, schema, body, ctx)
177                })?;
178            let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
179            Some(qr.rows)
180        }
181        InsertSource::Values(_) => None,
182    };
183
184    let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
185        .on_conflict
186        .as_ref()
187        .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
188        .transpose()?;
189
190    let row_col_map = compiled_conflict
191        .as_ref()
192        .map(|_| table_schema.column_map());
193
194    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
195    // DML invalidates the table's persisted ANN segment in the SAME txn
196    // (rollback restores it; commit makes table-changed-but-segment-survives
197    // unrepresentable for this path).
198    if table_schema.has_ann_index() {
199        super::ann_persist::purge_segment(&mut wtx, &table_schema.name)?;
200    }
201    let mut count: u64 = 0;
202    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
203        stmt.returning.as_ref().map(|_| Vec::new());
204
205    let pk_indices = table_schema.pk_indices();
206    let non_pk = table_schema.non_pk_indices();
207    let enc_pos = table_schema.encoding_positions();
208    let phys_count = table_schema.physical_non_pk_count();
209    let mut row = vec![Value::Null; table_schema.columns.len()];
210    let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
211    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
212    let mut key_buf: Vec<u8> = Vec::with_capacity(64);
213    let mut value_buf: Vec<u8> = Vec::with_capacity(256);
214    let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
215
216    let values = match &stmt.source {
217        InsertSource::Values(rows) => Some(rows.as_slice()),
218        InsertSource::Select(_) => None,
219    };
220    let sel_rows = select_rows.as_deref();
221
222    let total = match (values, sel_rows) {
223        (Some(rows), _) => rows.len(),
224        (_, Some(rows)) => rows.len(),
225        _ => 0,
226    };
227
228    if let Some(sel) = sel_rows {
229        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
230            return Err(SqlError::InvalidValue(format!(
231                "INSERT ... SELECT column count mismatch: expected {}, got {}",
232                insert_columns.len(),
233                sel[0].len()
234            )));
235        }
236    }
237
238    let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
239        t.enabled
240            && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
241            && t.events
242                .iter()
243                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
244    });
245    let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
246        Vec::with_capacity(total)
247    } else {
248        Vec::new()
249    };
250
251    if has_insert_statement_triggers {
252        super::triggers::fire_statement_triggers(
253            &mut wtx,
254            schema,
255            &table_schema.name,
256            crate::parser::TriggerTiming::Before,
257            super::triggers::FireEvent::Insert,
258            &table_schema.columns,
259            &[],
260            &[],
261        )?;
262    }
263
264    let plain_insert = compiled_conflict.is_none();
265    let single_int_pk = is_single_int_pk(table_schema);
266    let mut min_inserted_pk: Option<i64> = None;
267    let (has_before_insert_triggers, has_after_insert_triggers, has_after_update_triggers) =
268        row_insert_trigger_flags(schema, &table_schema.name);
269
270    for idx in 0..total {
271        for v in row.iter_mut() {
272            *v = Value::Null;
273        }
274
275        if let Some(value_rows) = values {
276            let value_row = &value_rows[idx];
277            if value_row.len() != insert_columns.len() {
278                return Err(SqlError::InvalidValue(format!(
279                    "expected {} values, got {}",
280                    insert_columns.len(),
281                    value_row.len()
282                )));
283            }
284            for (i, expr) in value_row.iter().enumerate() {
285                let val = if let Expr::Parameter(n) = expr {
286                    params
287                        .get(n - 1)
288                        .cloned()
289                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
290                } else {
291                    eval_const_expr(expr)?
292                };
293                let col_idx = col_indices[i];
294                let col = &table_schema.columns[col_idx];
295                row[col_idx] = if val.is_null() {
296                    Value::Null
297                } else {
298                    coerce_for_column(val, col, strict)?
299                };
300            }
301        } else if let Some(sel) = sel_rows {
302            let sel_row = &sel[idx];
303            for (i, val) in sel_row.iter().enumerate() {
304                let col_idx = col_indices[i];
305                let col = &table_schema.columns[col_idx];
306                row[col_idx] = if val.is_null() {
307                    Value::Null
308                } else {
309                    coerce_for_column(val.clone(), col, strict)?
310                };
311            }
312        }
313
314        for &(pos, def_expr) in &defaults {
315            let val = eval_const_expr(def_expr)?;
316            let col = &table_schema.columns[pos];
317            if !val.is_null() {
318                row[pos] = coerce_for_column(val, col, strict)?;
319            }
320        }
321
322        if let Some(gen_map) = row_col_map_for_gen {
323            for &(pos, gen_expr) in &generated_cols {
324                let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
325                let col = &table_schema.columns[pos];
326                row[pos] = if val.is_null() {
327                    Value::Null
328                } else {
329                    coerce_for_column(val, col, strict)?
330                };
331            }
332        }
333
334        for col in &table_schema.columns {
335            if !col.nullable && row[col.position as usize].is_null() {
336                return Err(SqlError::NotNullViolation(col.name.clone()));
337            }
338        }
339
340        if let Some(col_map) = check_col_map {
341            for col in &table_schema.columns {
342                if let Some(ref check) = col.check_expr {
343                    let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
344                    if !is_truthy(&result) && !result.is_null() {
345                        let name = col.check_name.as_deref().unwrap_or(&col.name);
346                        return Err(SqlError::CheckViolation(name.to_string()));
347                    }
348                }
349            }
350            for tc in &table_schema.check_constraints {
351                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
352                if !is_truthy(&result) && !result.is_null() {
353                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
354                    return Err(SqlError::CheckViolation(name.to_string()));
355                }
356            }
357        }
358
359        for fk in &table_schema.foreign_keys {
360            let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
361            if any_null {
362                continue; // MATCH SIMPLE: skip if any FK col is NULL
363            }
364            let fk_vals: Vec<Value> = fk
365                .columns
366                .iter()
367                .map(|&ci| row[ci as usize].clone())
368                .collect();
369            fk_key_buf.clear();
370            encode_composite_key_into(&fk_vals, &mut fk_key_buf);
371            if fk.deferrable && fk.initially_deferred {
372                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
373                wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
374                    fk_name: name,
375                    foreign_table: fk.foreign_table.as_bytes().to_vec(),
376                    parent_key: fk_key_buf.clone(),
377                });
378                continue;
379            }
380            if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
381                let found = wtx
382                    .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
383                    .map_err(SqlError::Storage)?;
384                if found.is_none() {
385                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
386                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
387                }
388                wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
389            }
390        }
391
392        let proposed_row_for_returning: Option<Vec<Value>> =
393            returning_rows.as_ref().map(|_| row.clone());
394        let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
395            Some(row.clone())
396        } else {
397            None
398        };
399
400        if has_before_insert_triggers {
401            super::triggers::fire_row_triggers(
402                &mut wtx,
403                schema,
404                &table_schema.name,
405                crate::parser::TriggerTiming::Before,
406                super::triggers::FireEvent::Insert,
407                None,
408                Some(row.clone()),
409                &table_schema.columns,
410            )?;
411        }
412
413        for (j, &i) in pk_indices.iter().enumerate() {
414            pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
415        }
416        encode_composite_key_into(&pk_values, &mut key_buf);
417        if plain_insert && single_int_pk {
418            if let Value::Integer(id) = &pk_values[0] {
419                min_inserted_pk = Some(min_inserted_pk.map_or(*id, |m| m.min(*id)));
420            }
421        }
422
423        for (j, &i) in non_pk.iter().enumerate() {
424            let col = &table_schema.columns[i];
425            if matches!(
426                col.generated_kind,
427                Some(crate::parser::GeneratedKind::Virtual)
428            ) {
429                value_values[enc_pos[j] as usize] = Value::Null;
430                row[i] = Value::Null;
431            } else {
432                value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
433            }
434        }
435        encode_row_into(&value_values, &mut value_buf);
436
437        if key_buf.len() > citadel_core::MAX_KEY_SIZE {
438            return Err(SqlError::KeyTooLarge {
439                size: key_buf.len(),
440                max: citadel_core::MAX_KEY_SIZE,
441            });
442        }
443        if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
444            return Err(SqlError::RowTooLarge {
445                size: value_buf.len(),
446                max: citadel_core::MAX_VALUE_SIZE,
447            });
448        }
449
450        match compiled_conflict.as_ref() {
451            None => {
452                let is_new = wtx
453                    .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
454                    .map_err(SqlError::Storage)?;
455                if !is_new {
456                    return Err(SqlError::DuplicateKey);
457                }
458                if !table_schema.indices.is_empty() || has_after_insert_triggers {
459                    for (j, &i) in pk_indices.iter().enumerate() {
460                        row[i] = pk_values[j].clone();
461                    }
462                    for (j, &i) in non_pk.iter().enumerate() {
463                        row[i] =
464                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
465                    }
466                    if !table_schema.indices.is_empty() {
467                        insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
468                    }
469                    if has_after_insert_triggers {
470                        super::triggers::fire_row_triggers(
471                            &mut wtx,
472                            schema,
473                            &table_schema.name,
474                            crate::parser::TriggerTiming::After,
475                            super::triggers::FireEvent::Insert,
476                            None,
477                            Some(row.clone()),
478                            &table_schema.columns,
479                        )?;
480                    }
481                }
482                if let Some(r) = row_for_stmt_trigger.clone() {
483                    stmt_new_rows.push(r);
484                }
485                count += 1;
486                if let Some(buf) = returning_rows.as_mut() {
487                    buf.push((None, proposed_row_for_returning));
488                }
489            }
490            Some(oc) => {
491                let oc_ref: &CompiledOnConflict = oc;
492                let needs_row = upsert_needs_row(oc_ref, table_schema);
493                if needs_row {
494                    for (j, &i) in pk_indices.iter().enumerate() {
495                        row[i] = pk_values[j].clone();
496                    }
497                    for (j, &i) in non_pk.iter().enumerate() {
498                        row[i] =
499                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
500                    }
501                }
502                let outcome = apply_insert_with_conflict(
503                    &mut wtx,
504                    table_schema,
505                    &key_buf,
506                    &value_buf,
507                    &row,
508                    &pk_values,
509                    oc_ref,
510                    row_col_map.unwrap(),
511                    // Trigger dispatch needs the Updated outcome's rows too.
512                    stmt.returning.is_some() || has_after_update_triggers,
513                )?;
514                match outcome {
515                    InsertRowOutcome::Inserted => {
516                        count += 1;
517                        if let Some(buf) = returning_rows.as_mut() {
518                            buf.push((None, proposed_row_for_returning));
519                        }
520                        if let Some(r) = row_for_stmt_trigger.clone() {
521                            stmt_new_rows.push(r);
522                        }
523                        if has_after_insert_triggers {
524                            super::triggers::fire_row_triggers(
525                                &mut wtx,
526                                schema,
527                                &table_schema.name,
528                                crate::parser::TriggerTiming::After,
529                                super::triggers::FireEvent::Insert,
530                                None,
531                                Some(row.clone()),
532                                &table_schema.columns,
533                            )?;
534                        }
535                    }
536                    InsertRowOutcome::Updated { old, new } => {
537                        count += 1;
538                        if let Some(buf) = returning_rows.as_mut() {
539                            buf.push((Some(old.clone()), Some(new.clone())));
540                        }
541                        if has_after_update_triggers {
542                            let changed_cols: Vec<String> = match oc_ref {
543                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
544                                    .iter()
545                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
546                                    .collect(),
547                                _ => Vec::new(),
548                            };
549                            super::triggers::fire_row_triggers(
550                                &mut wtx,
551                                schema,
552                                &table_schema.name,
553                                crate::parser::TriggerTiming::After,
554                                super::triggers::FireEvent::Update {
555                                    changed_columns: &changed_cols,
556                                },
557                                Some(old),
558                                Some(new),
559                                &table_schema.columns,
560                            )?;
561                        }
562                    }
563                    InsertRowOutcome::Skipped => {}
564                }
565            }
566        }
567    }
568
569    if has_insert_statement_triggers {
570        super::triggers::fire_statement_triggers(
571            &mut wtx,
572            schema,
573            &table_schema.name,
574            crate::parser::TriggerTiming::After,
575            super::triggers::FireEvent::Insert,
576            &table_schema.columns,
577            &[],
578            &stmt_new_rows,
579        )?;
580    }
581
582    mark_insert_dml(
583        schema,
584        &table_schema.name,
585        !plain_insert,
586        single_int_pk,
587        min_inserted_pk,
588        count,
589    );
590
591    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
592        let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
593        super::helpers::drain_deferred_fk_checks(&mut wtx)?;
594        wtx.commit().map_err(SqlError::Storage)?;
595        return Ok(ExecutionResult::Query(qr));
596    }
597
598    super::helpers::drain_deferred_fk_checks(&mut wtx)?;
599    wtx.commit().map_err(SqlError::Storage)?;
600    Ok(ExecutionResult::RowsAffected(count))
601}
602
603pub(super) fn has_subquery(expr: &Expr) -> bool {
604    crate::parser::has_subquery(expr)
605}
606
607pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
608    if let Some(ref w) = stmt.where_clause {
609        if has_subquery(w) {
610            return true;
611        }
612    }
613    if let Some(ref h) = stmt.having {
614        if has_subquery(h) {
615            return true;
616        }
617    }
618    for col in &stmt.columns {
619        if let SelectColumn::Expr { expr, .. } = col {
620            if has_subquery(expr) {
621                return true;
622            }
623        }
624    }
625    for ob in &stmt.order_by {
626        if has_subquery(&ob.expr) {
627            return true;
628        }
629    }
630    for gb in &stmt.group_by {
631        if has_subquery(gb) {
632            return true;
633        }
634    }
635    for join in &stmt.joins {
636        if let Some(ref on_expr) = join.on_clause {
637            if has_subquery(on_expr) {
638                return true;
639            }
640        }
641    }
642    false
643}
644
645pub(super) fn materialize_expr(
646    expr: &Expr,
647    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
648) -> Result<Expr> {
649    match expr {
650        Expr::InSubquery {
651            expr: e,
652            subquery,
653            negated,
654        } => {
655            let inner = materialize_expr(e, exec_sub)?;
656            let qr = exec_sub(subquery)?;
657            if !qr.columns.is_empty() && qr.columns.len() != 1 {
658                return Err(SqlError::SubqueryMultipleColumns);
659            }
660            let mut values = rustc_hash::FxHashSet::default();
661            let mut has_null = false;
662            for row in &qr.rows {
663                if row[0].is_null() {
664                    has_null = true;
665                } else {
666                    values.insert(row[0].clone());
667                }
668            }
669            Ok(Expr::InSet {
670                expr: Box::new(inner),
671                values,
672                has_null,
673                negated: *negated,
674            })
675        }
676        Expr::ScalarSubquery(subquery) => {
677            let qr = exec_sub(subquery)?;
678            if qr.rows.len() > 1 {
679                return Err(SqlError::SubqueryMultipleRows);
680            }
681            let val = if qr.rows.is_empty() {
682                Value::Null
683            } else {
684                qr.rows[0][0].clone()
685            };
686            Ok(Expr::Literal(val))
687        }
688        Expr::Exists { subquery, negated } => {
689            let qr = exec_sub(subquery)?;
690            let exists = !qr.rows.is_empty();
691            let result = if *negated { !exists } else { exists };
692            Ok(Expr::Literal(Value::Boolean(result)))
693        }
694        Expr::InList {
695            expr: e,
696            list,
697            negated,
698        } => {
699            let inner = materialize_expr(e, exec_sub)?;
700            let items = list
701                .iter()
702                .map(|item| materialize_expr(item, exec_sub))
703                .collect::<Result<Vec<_>>>()?;
704            Ok(Expr::InList {
705                expr: Box::new(inner),
706                list: items,
707                negated: *negated,
708            })
709        }
710        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
711            left: Box::new(materialize_expr(left, exec_sub)?),
712            op: *op,
713            right: Box::new(materialize_expr(right, exec_sub)?),
714        }),
715        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
716            op: *op,
717            expr: Box::new(materialize_expr(e, exec_sub)?),
718        }),
719        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
720        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
721        Expr::InSet {
722            expr: e,
723            values,
724            has_null,
725            negated,
726        } => Ok(Expr::InSet {
727            expr: Box::new(materialize_expr(e, exec_sub)?),
728            values: values.clone(),
729            has_null: *has_null,
730            negated: *negated,
731        }),
732        Expr::Between {
733            expr: e,
734            low,
735            high,
736            negated,
737        } => Ok(Expr::Between {
738            expr: Box::new(materialize_expr(e, exec_sub)?),
739            low: Box::new(materialize_expr(low, exec_sub)?),
740            high: Box::new(materialize_expr(high, exec_sub)?),
741            negated: *negated,
742        }),
743        Expr::Like {
744            expr: e,
745            pattern,
746            escape,
747            negated,
748        } => {
749            let esc = escape
750                .as_ref()
751                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
752                .transpose()?;
753            Ok(Expr::Like {
754                expr: Box::new(materialize_expr(e, exec_sub)?),
755                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
756                escape: esc,
757                negated: *negated,
758            })
759        }
760        Expr::Case {
761            operand,
762            conditions,
763            else_result,
764        } => {
765            let op = operand
766                .as_ref()
767                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
768                .transpose()?;
769            let conds = conditions
770                .iter()
771                .map(|(c, r)| {
772                    Ok((
773                        materialize_expr(c, exec_sub)?,
774                        materialize_expr(r, exec_sub)?,
775                    ))
776                })
777                .collect::<Result<Vec<_>>>()?;
778            let else_r = else_result
779                .as_ref()
780                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
781                .transpose()?;
782            Ok(Expr::Case {
783                operand: op,
784                conditions: conds,
785                else_result: else_r,
786            })
787        }
788        Expr::Coalesce(args) => {
789            let materialized = args
790                .iter()
791                .map(|a| materialize_expr(a, exec_sub))
792                .collect::<Result<Vec<_>>>()?;
793            Ok(Expr::Coalesce(materialized))
794        }
795        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
796            expr: Box::new(materialize_expr(e, exec_sub)?),
797            data_type: *data_type,
798        }),
799        Expr::Function {
800            name,
801            args,
802            distinct,
803        } => {
804            let materialized = args
805                .iter()
806                .map(|a| materialize_expr(a, exec_sub))
807                .collect::<Result<Vec<_>>>()?;
808            Ok(Expr::Function {
809                name: name.clone(),
810                args: materialized,
811                distinct: *distinct,
812            })
813        }
814        other => Ok(other.clone()),
815    }
816}
817
818pub(super) fn materialize_stmt(
819    stmt: &SelectStmt,
820    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
821) -> Result<SelectStmt> {
822    let where_clause = stmt
823        .where_clause
824        .as_ref()
825        .map(|e| materialize_expr(e, exec_sub))
826        .transpose()?;
827    let having = stmt
828        .having
829        .as_ref()
830        .map(|e| materialize_expr(e, exec_sub))
831        .transpose()?;
832    let columns = stmt
833        .columns
834        .iter()
835        .map(|c| match c {
836            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
837            SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
838            SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
839            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
840                expr: materialize_expr(expr, exec_sub)?,
841                alias: alias.clone(),
842            }),
843        })
844        .collect::<Result<Vec<_>>>()?;
845    let order_by = stmt
846        .order_by
847        .iter()
848        .map(|ob| {
849            Ok(OrderByItem {
850                expr: materialize_expr(&ob.expr, exec_sub)?,
851                descending: ob.descending,
852                nulls_first: ob.nulls_first,
853            })
854        })
855        .collect::<Result<Vec<_>>>()?;
856    let joins = stmt
857        .joins
858        .iter()
859        .map(|j| {
860            let on_clause = j
861                .on_clause
862                .as_ref()
863                .map(|e| materialize_expr(e, exec_sub))
864                .transpose()?;
865            Ok(JoinClause {
866                join_type: j.join_type,
867                table: j.table.clone(),
868                subquery: j.subquery.clone(),
869                on_clause,
870            })
871        })
872        .collect::<Result<Vec<_>>>()?;
873    let group_by = stmt
874        .group_by
875        .iter()
876        .map(|e| materialize_expr(e, exec_sub))
877        .collect::<Result<Vec<_>>>()?;
878    Ok(SelectStmt {
879        columns,
880        from: stmt.from.clone(),
881        from_alias: stmt.from_alias.clone(),
882        from_subquery: stmt.from_subquery.clone(),
883        from_args: stmt.from_args.clone(),
884        from_json_table: stmt.from_json_table.clone(),
885        joins,
886        distinct: stmt.distinct,
887        where_clause,
888        order_by,
889        limit: stmt.limit.clone(),
890        offset: stmt.offset.clone(),
891        group_by,
892        having,
893    })
894}
895
896pub(super) fn exec_subquery_read(
897    db: &Database,
898    schema: &SchemaManager,
899    stmt: &SelectStmt,
900    ctes: &CteContext,
901) -> Result<QueryResult> {
902    let mut rtx = db.begin_read();
903    exec_subquery_with_read(&mut rtx, schema, stmt, ctes)
904}
905
906pub(super) fn exec_subquery_with_read(
907    rtx: &mut ReadTxn<'_>,
908    schema: &SchemaManager,
909    stmt: &SelectStmt,
910    ctes: &CteContext,
911) -> Result<QueryResult> {
912    match super::exec_select_with_read(rtx, schema, stmt, ctes)? {
913        ExecutionResult::Query(qr) => Ok(qr),
914        _ => Ok(QueryResult {
915            columns: vec![],
916            rows: vec![],
917        }),
918    }
919}
920
921pub(super) fn exec_subquery_write(
922    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
923    schema: &SchemaManager,
924    stmt: &SelectStmt,
925    ctes: &CteContext,
926) -> Result<QueryResult> {
927    match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
928        ExecutionResult::Query(qr) => Ok(qr),
929        _ => Ok(QueryResult {
930            columns: vec![],
931            rows: vec![],
932        }),
933    }
934}
935
936pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
937    stmt.where_clause.as_ref().is_some_and(has_subquery)
938        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
939}
940
941pub(super) fn materialize_update(
942    stmt: &UpdateStmt,
943    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
944) -> Result<UpdateStmt> {
945    let where_clause = stmt
946        .where_clause
947        .as_ref()
948        .map(|e| materialize_expr(e, exec_sub))
949        .transpose()?;
950    let assignments = stmt
951        .assignments
952        .iter()
953        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
954        .collect::<Result<Vec<_>>>()?;
955    Ok(UpdateStmt {
956        table: stmt.table.clone(),
957        assignments,
958        where_clause,
959        returning: stmt.returning.clone(),
960    })
961}
962
963pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
964    stmt.where_clause.as_ref().is_some_and(has_subquery)
965}
966
967pub(super) fn materialize_delete(
968    stmt: &DeleteStmt,
969    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
970) -> Result<DeleteStmt> {
971    let where_clause = stmt
972        .where_clause
973        .as_ref()
974        .map(|e| materialize_expr(e, exec_sub))
975        .transpose()?;
976    Ok(DeleteStmt {
977        table: stmt.table.clone(),
978        where_clause,
979        returning: stmt.returning.clone(),
980    })
981}
982
983pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
984    match &stmt.source {
985        InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
986        // SELECT source subqueries are handled by exec_select's correlated/non-correlated paths
987        InsertSource::Select(_) => false,
988    }
989}
990
991pub(super) fn materialize_insert(
992    stmt: &InsertStmt,
993    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
994) -> Result<InsertStmt> {
995    let source = match &stmt.source {
996        InsertSource::Values(rows) => {
997            let mat = rows
998                .iter()
999                .map(|row| {
1000                    row.iter()
1001                        .map(|e| materialize_expr(e, exec_sub))
1002                        .collect::<Result<Vec<_>>>()
1003                })
1004                .collect::<Result<Vec<_>>>()?;
1005            InsertSource::Values(mat)
1006        }
1007        InsertSource::Select(sq) => {
1008            let ctes = sq
1009                .ctes
1010                .iter()
1011                .map(|c| {
1012                    Ok(CteDefinition {
1013                        name: c.name.clone(),
1014                        column_aliases: c.column_aliases.clone(),
1015                        body: materialize_query_body(&c.body, exec_sub)?,
1016                    })
1017                })
1018                .collect::<Result<Vec<_>>>()?;
1019            let body = materialize_query_body(&sq.body, exec_sub)?;
1020            InsertSource::Select(Box::new(SelectQuery {
1021                ctes,
1022                recursive: sq.recursive,
1023                body,
1024            }))
1025        }
1026    };
1027    Ok(InsertStmt {
1028        table: stmt.table.clone(),
1029        columns: stmt.columns.clone(),
1030        source,
1031        on_conflict: stmt.on_conflict.clone(),
1032        returning: stmt.returning.clone(),
1033    })
1034}
1035
1036pub(super) fn materialize_query_body(
1037    body: &QueryBody,
1038    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1039) -> Result<QueryBody> {
1040    match body {
1041        QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
1042            sel, exec_sub,
1043        )?))),
1044        QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
1045            op: comp.op.clone(),
1046            all: comp.all,
1047            left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
1048            right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
1049            order_by: comp.order_by.clone(),
1050            limit: comp.limit.clone(),
1051            offset: comp.offset.clone(),
1052        }))),
1053        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
1054    }
1055}
1056
1057pub(super) fn exec_query_body_with_read(
1058    rtx: &mut ReadTxn<'_>,
1059    schema: &SchemaManager,
1060    body: &QueryBody,
1061    ctes: &CteContext,
1062) -> Result<ExecutionResult> {
1063    match body {
1064        QueryBody::Select(sel) => super::exec_select_with_read(rtx, schema, sel, ctes),
1065        QueryBody::Compound(comp) => exec_compound_select_with_read(rtx, schema, comp, ctes),
1066        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1067            SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1068        ),
1069    }
1070}
1071
1072pub(super) fn exec_query_body_in_txn(
1073    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1074    schema: &SchemaManager,
1075    body: &QueryBody,
1076    ctes: &CteContext,
1077) -> Result<ExecutionResult> {
1078    match body {
1079        QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1080        QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1081        QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1082        QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1083        QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1084    }
1085}
1086
1087pub(super) fn exec_query_body_read(
1088    db: &Database,
1089    schema: &SchemaManager,
1090    body: &QueryBody,
1091    ctes: &CteContext,
1092) -> Result<QueryResult> {
1093    let mut rtx = db.begin_read();
1094    exec_query_body_with_read_qr(&mut rtx, schema, body, ctes)
1095}
1096
1097pub(super) fn exec_query_body_with_read_qr(
1098    rtx: &mut ReadTxn<'_>,
1099    schema: &SchemaManager,
1100    body: &QueryBody,
1101    ctes: &CteContext,
1102) -> Result<QueryResult> {
1103    match exec_query_body_with_read(rtx, schema, body, ctes)? {
1104        ExecutionResult::Query(qr) => Ok(qr),
1105        _ => Ok(QueryResult {
1106            columns: vec![],
1107            rows: vec![],
1108        }),
1109    }
1110}
1111
1112pub(super) fn exec_query_body_write(
1113    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1114    schema: &SchemaManager,
1115    body: &QueryBody,
1116    ctes: &CteContext,
1117) -> Result<QueryResult> {
1118    match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1119        ExecutionResult::Query(qr) => Ok(qr),
1120        _ => Ok(QueryResult {
1121            columns: vec![],
1122            rows: vec![],
1123        }),
1124    }
1125}
1126
1127pub(super) fn exec_compound_select_with_read(
1128    rtx: &mut ReadTxn<'_>,
1129    schema: &SchemaManager,
1130    comp: &CompoundSelect,
1131    ctes: &CteContext,
1132) -> Result<ExecutionResult> {
1133    let left_qr = match exec_query_body_with_read(rtx, schema, &comp.left, ctes)? {
1134        ExecutionResult::Query(qr) => qr,
1135        _ => QueryResult {
1136            columns: vec![],
1137            rows: vec![],
1138        },
1139    };
1140    let right_qr = match exec_query_body_with_read(rtx, schema, &comp.right, ctes)? {
1141        ExecutionResult::Query(qr) => qr,
1142        _ => QueryResult {
1143            columns: vec![],
1144            rows: vec![],
1145        },
1146    };
1147    apply_set_operation(comp, left_qr, right_qr)
1148}
1149
1150pub(super) fn exec_compound_select_in_txn(
1151    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1152    schema: &SchemaManager,
1153    comp: &CompoundSelect,
1154    ctes: &CteContext,
1155) -> Result<ExecutionResult> {
1156    let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1157        ExecutionResult::Query(qr) => qr,
1158        _ => QueryResult {
1159            columns: vec![],
1160            rows: vec![],
1161        },
1162    };
1163    let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1164        ExecutionResult::Query(qr) => qr,
1165        _ => QueryResult {
1166            columns: vec![],
1167            rows: vec![],
1168        },
1169    };
1170    apply_set_operation(comp, left_qr, right_qr)
1171}
1172
1173pub(super) fn apply_set_operation(
1174    comp: &CompoundSelect,
1175    left_qr: QueryResult,
1176    right_qr: QueryResult,
1177) -> Result<ExecutionResult> {
1178    if !left_qr.columns.is_empty()
1179        && !right_qr.columns.is_empty()
1180        && left_qr.columns.len() != right_qr.columns.len()
1181    {
1182        return Err(SqlError::CompoundColumnCountMismatch {
1183            left: left_qr.columns.len(),
1184            right: right_qr.columns.len(),
1185        });
1186    }
1187
1188    let columns = left_qr.columns;
1189
1190    let mut rows = match (&comp.op, comp.all) {
1191        (SetOp::Union, true) => {
1192            let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1193            let mut rows = Vec::with_capacity(total);
1194            rows.extend(left_qr.rows);
1195            rows.extend(right_qr.rows);
1196            rows
1197        }
1198        (SetOp::Union, false) => {
1199            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1200            let mut rows = Vec::new();
1201            for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1202                if !seen.contains(&row) {
1203                    seen.insert(row.clone());
1204                    rows.push(row);
1205                }
1206            }
1207            rows
1208        }
1209        (SetOp::Intersect, true) => {
1210            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1211            for row in &right_qr.rows {
1212                *right_counts.entry(row.clone()).or_insert(0) += 1;
1213            }
1214            let mut rows = Vec::new();
1215            for row in left_qr.rows {
1216                if let Some(count) = right_counts.get_mut(&row) {
1217                    if *count > 0 {
1218                        *count -= 1;
1219                        rows.push(row);
1220                    }
1221                }
1222            }
1223            rows
1224        }
1225        (SetOp::Intersect, false) => {
1226            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1227            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1228            let mut rows = Vec::new();
1229            for row in left_qr.rows {
1230                if right_set.contains(&row) && !seen.contains(&row) {
1231                    seen.insert(row.clone());
1232                    rows.push(row);
1233                }
1234            }
1235            rows
1236        }
1237        (SetOp::Except, true) => {
1238            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1239            for row in &right_qr.rows {
1240                *right_counts.entry(row.clone()).or_insert(0) += 1;
1241            }
1242            let mut rows = Vec::new();
1243            for row in left_qr.rows {
1244                if let Some(count) = right_counts.get_mut(&row) {
1245                    if *count > 0 {
1246                        *count -= 1;
1247                        continue;
1248                    }
1249                }
1250                rows.push(row);
1251            }
1252            rows
1253        }
1254        (SetOp::Except, false) => {
1255            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1256            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1257            let mut rows = Vec::new();
1258            for row in left_qr.rows {
1259                if !right_set.contains(&row) && !seen.contains(&row) {
1260                    seen.insert(row.clone());
1261                    rows.push(row);
1262                }
1263            }
1264            rows
1265        }
1266    };
1267
1268    if !comp.order_by.is_empty() {
1269        let col_defs: Vec<crate::types::ColumnDef> = columns
1270            .iter()
1271            .enumerate()
1272            .map(|(i, name)| crate::types::ColumnDef {
1273                name: name.clone(),
1274                data_type: crate::types::DataType::Null,
1275                nullable: true,
1276                position: i as u16,
1277                default_expr: None,
1278                default_sql: None,
1279                check_expr: None,
1280                check_sql: None,
1281                check_name: None,
1282                is_with_timezone: false,
1283                generated_expr: None,
1284                generated_sql: None,
1285                generated_kind: None,
1286                collation: crate::types::Collation::Binary,
1287            })
1288            .collect();
1289        sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1290    }
1291
1292    if let Some(ref offset_expr) = comp.offset {
1293        let offset = eval_const_int(offset_expr)?.max(0) as usize;
1294        if offset < rows.len() {
1295            rows = rows.split_off(offset);
1296        } else {
1297            rows.clear();
1298        }
1299    }
1300
1301    if let Some(ref limit_expr) = comp.limit {
1302        let limit = eval_const_int(limit_expr)?.max(0) as usize;
1303        rows.truncate(limit);
1304    }
1305
1306    Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1307}
1308
1309struct InsertBufs {
1310    row: Vec<Value>,
1311    pk_values: Vec<Value>,
1312    value_values: Vec<Value>,
1313    key_buf: Vec<u8>,
1314    value_buf: Vec<u8>,
1315    col_indices: Vec<usize>,
1316    fk_key_buf: Vec<u8>,
1317}
1318
1319impl InsertBufs {
1320    fn new() -> Self {
1321        Self {
1322            row: Vec::new(),
1323            pk_values: Vec::new(),
1324            value_values: Vec::new(),
1325            key_buf: Vec::with_capacity(64),
1326            value_buf: Vec::with_capacity(256),
1327            col_indices: Vec::new(),
1328            fk_key_buf: Vec::with_capacity(64),
1329        }
1330    }
1331}
1332
1333thread_local! {
1334    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1335    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1336}
1337
1338fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1339    INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1340        Ok(mut borrowed) => f(&mut borrowed),
1341        Err(_) => {
1342            let mut local = InsertBufs::new();
1343            f(&mut local)
1344        }
1345    })
1346}
1347
1348pub(super) struct UpsertBufs {
1349    old_row: Vec<Value>,
1350    new_row: Vec<Value>,
1351    value_values: Vec<Value>,
1352    new_value_buf: Vec<u8>,
1353}
1354
1355impl UpsertBufs {
1356    pub(super) fn new() -> Self {
1357        Self {
1358            old_row: Vec::new(),
1359            new_row: Vec::new(),
1360            value_values: Vec::new(),
1361            new_value_buf: Vec::with_capacity(256),
1362        }
1363    }
1364}
1365
1366pub fn exec_insert_in_txn(
1367    wtx: &mut WriteTxn<'_>,
1368    schema: &SchemaManager,
1369    stmt: &InsertStmt,
1370    params: &[Value],
1371) -> Result<ExecutionResult> {
1372    with_insert_scratch(|bufs| {
1373        exec_insert_in_txn_impl(
1374            wtx,
1375            schema,
1376            stmt,
1377            params,
1378            bufs,
1379            None,
1380            &CteContext::default(),
1381        )
1382    })
1383}
1384
1385pub(super) fn exec_insert_in_txn_with_ctes(
1386    wtx: &mut WriteTxn<'_>,
1387    schema: &SchemaManager,
1388    stmt: &InsertStmt,
1389    params: &[Value],
1390    outer_ctes: &CteContext,
1391) -> Result<ExecutionResult> {
1392    with_insert_scratch(|bufs| {
1393        exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1394    })
1395}
1396
1397fn exec_insert_in_txn_cached(
1398    wtx: &mut WriteTxn<'_>,
1399    schema: &SchemaManager,
1400    stmt: &InsertStmt,
1401    params: &[Value],
1402    cache: &InsertCache,
1403) -> Result<ExecutionResult> {
1404    with_insert_scratch(|bufs| {
1405        exec_insert_in_txn_impl(
1406            wtx,
1407            schema,
1408            stmt,
1409            params,
1410            bufs,
1411            Some(cache),
1412            &CteContext::default(),
1413        )
1414    })
1415}
1416
1417fn exec_insert_in_txn_impl(
1418    wtx: &mut WriteTxn<'_>,
1419    schema: &SchemaManager,
1420    stmt: &InsertStmt,
1421    params: &[Value],
1422    bufs: &mut InsertBufs,
1423    cache: Option<&InsertCache>,
1424    outer_ctes: &CteContext,
1425) -> Result<ExecutionResult> {
1426    let empty_ctes = CteContext::default();
1427    let materialized;
1428    let has_sub = match cache {
1429        Some(c) => c.has_subquery,
1430        None => insert_has_subquery(stmt),
1431    };
1432    let stmt = if has_sub {
1433        materialized = materialize_insert(stmt, &mut |sub| {
1434            exec_subquery_write(wtx, schema, sub, &empty_ctes)
1435        })?;
1436        &materialized
1437    } else {
1438        stmt
1439    };
1440
1441    let view_lookup_key = stmt.table.to_ascii_lowercase();
1442    if let Some(view_def) = schema.get_view(&view_lookup_key) {
1443        if super::triggers::has_instead_of(
1444            schema,
1445            &view_lookup_key,
1446            super::triggers::FireEvent::Insert,
1447        ) {
1448            let aliases = view_def.column_aliases.clone();
1449            return exec_instead_of_view_insert_in_txn(
1450                wtx,
1451                schema,
1452                &view_lookup_key,
1453                &aliases,
1454                stmt,
1455                params,
1456            );
1457        }
1458        return Err(SqlError::CannotModifyView(stmt.table.clone()));
1459    }
1460    if schema.get_matview(&view_lookup_key).is_some() {
1461        return Err(SqlError::CannotModifyView(format!(
1462            "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
1463            stmt.table
1464        )));
1465    }
1466
1467    let table_schema = schema
1468        .get(&stmt.table)
1469        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1470    if table_schema.has_ann_index() {
1471        super::ann_persist::purge_segment(wtx, &table_schema.name)?;
1472    }
1473
1474    let default_columns;
1475    let insert_columns: &[String] = if stmt.columns.is_empty() {
1476        default_columns = table_schema
1477            .columns
1478            .iter()
1479            .map(|c| c.name.clone())
1480            .collect::<Vec<_>>();
1481        &default_columns
1482    } else {
1483        &stmt.columns
1484    };
1485
1486    bufs.col_indices.clear();
1487    if let Some(c) = cache {
1488        bufs.col_indices.extend_from_slice(&c.col_indices);
1489    } else {
1490        for name in insert_columns {
1491            bufs.col_indices.push(
1492                table_schema
1493                    .column_index(name)
1494                    .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1495            );
1496        }
1497    }
1498
1499    if cache.is_none() {
1500        for &ci in &bufs.col_indices {
1501            if table_schema.columns[ci].generated_kind.is_some() {
1502                return Err(SqlError::CannotInsertIntoGeneratedColumn(
1503                    table_schema.columns[ci].name.clone(),
1504                ));
1505            }
1506        }
1507    }
1508
1509    let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1510    let cached_gen_positions: &[usize];
1511    let cached_gen_fast_evals: &[FastGenEval];
1512    if let Some(c) = cache {
1513        cached_gen_positions = &c.generated_col_positions;
1514        cached_gen_fast_evals = &c.generated_fast_evals;
1515        generated_cols_uncached = Vec::new();
1516    } else {
1517        cached_gen_positions = &[];
1518        cached_gen_fast_evals = &[];
1519        generated_cols_uncached = table_schema
1520            .columns
1521            .iter()
1522            .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1523            .map(|c| {
1524                let expr = c.generated_expr.as_ref().unwrap();
1525                let fe = detect_fast_gen_eval(expr, table_schema);
1526                (c.position as usize, expr, fe)
1527            })
1528            .collect();
1529    }
1530    let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1531    let row_col_map_for_gen: Option<&ColumnMap> = has_gen_cols.then(|| table_schema.column_map());
1532
1533    let any_defaults = match cache {
1534        Some(c) => c.any_defaults,
1535        None => table_schema
1536            .columns
1537            .iter()
1538            .any(|c| c.default_expr.is_some()),
1539    };
1540    let defaults: Vec<(usize, &Expr)> = if any_defaults {
1541        table_schema
1542            .columns
1543            .iter()
1544            .filter(|c| {
1545                c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1546            })
1547            .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1548            .collect()
1549    } else {
1550        Vec::new()
1551    };
1552
1553    let has_checks = match cache {
1554        Some(c) => c.has_checks,
1555        None => table_schema.has_checks(),
1556    };
1557    let check_col_map = if has_checks {
1558        Some(table_schema.column_map())
1559    } else {
1560        None
1561    };
1562
1563    let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1564        &[usize],
1565        &[usize],
1566        &[u16],
1567        usize,
1568        &[u16],
1569    ) = if let Some(c) = cache {
1570        (
1571            &c.pk_indices,
1572            &c.non_pk_indices,
1573            &c.encoding_positions,
1574            c.phys_count,
1575            &c.dropped_non_pk_slots,
1576        )
1577    } else {
1578        (
1579            table_schema.pk_indices(),
1580            table_schema.non_pk_indices(),
1581            table_schema.encoding_positions(),
1582            table_schema.physical_non_pk_count(),
1583            table_schema.dropped_non_pk_slots(),
1584        )
1585    };
1586
1587    bufs.row.resize(table_schema.columns.len(), Value::Null);
1588    bufs.pk_values.resize(pk_indices.len(), Value::Null);
1589    bufs.value_values.resize(phys_count, Value::Null);
1590
1591    let table_bytes = table_schema.name.as_bytes();
1592    let has_fks = !table_schema.foreign_keys.is_empty();
1593    let has_indices = !table_schema.indices.is_empty();
1594    let has_defaults = !defaults.is_empty();
1595
1596    let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1597        (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1598        (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1599        (_, None) => None,
1600    };
1601
1602    let row_col_map: Option<&ColumnMap> = compiled_conflict
1603        .is_some()
1604        .then(|| table_schema.column_map());
1605
1606    let select_rows = match &stmt.source {
1607        InsertSource::Select(sq) => {
1608            let insert_ctes = super::materialize_all_ctes_with_outer(
1609                &sq.ctes,
1610                sq.recursive,
1611                outer_ctes,
1612                &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1613            )?;
1614            let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1615            Some(qr.rows)
1616        }
1617        InsertSource::Values(_) => None,
1618    };
1619
1620    let mut count: u64 = 0;
1621    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1622        stmt.returning.as_ref().map(|_| Vec::new());
1623
1624    let plain_insert = compiled_conflict.is_none();
1625    let single_int_pk = is_single_int_pk(table_schema);
1626    let mut min_inserted_pk: Option<i64> = None;
1627
1628    let values = match &stmt.source {
1629        InsertSource::Values(rows) => Some(rows.as_slice()),
1630        InsertSource::Select(_) => None,
1631    };
1632    let sel_rows = select_rows.as_deref();
1633
1634    let total = match (values, sel_rows) {
1635        (Some(rows), _) => rows.len(),
1636        (_, Some(rows)) => rows.len(),
1637        _ => 0,
1638    };
1639
1640    if let Some(sel) = sel_rows {
1641        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1642            return Err(SqlError::InvalidValue(format!(
1643                "INSERT ... SELECT column count mismatch: expected {}, got {}",
1644                insert_columns.len(),
1645                sel[0].len()
1646            )));
1647        }
1648    }
1649
1650    let has_insert_statement_triggers_impl =
1651        schema.triggers_for(&table_schema.name).iter().any(|t| {
1652            t.enabled
1653                && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1654                && t.events
1655                    .iter()
1656                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1657        });
1658    let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1659        Vec::with_capacity(total)
1660    } else {
1661        Vec::new()
1662    };
1663    if has_insert_statement_triggers_impl {
1664        super::triggers::fire_statement_triggers(
1665            wtx,
1666            schema,
1667            &table_schema.name,
1668            crate::parser::TriggerTiming::Before,
1669            super::triggers::FireEvent::Insert,
1670            &table_schema.columns,
1671            &[],
1672            &[],
1673        )?;
1674    }
1675
1676    let (has_before_insert_triggers, has_after_insert_triggers, has_after_update_triggers) =
1677        row_insert_trigger_flags(schema, &table_schema.name);
1678
1679    let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1680    for idx in 0..total {
1681        if !skip_row_clear {
1682            for v in bufs.row.iter_mut() {
1683                *v = Value::Null;
1684            }
1685        }
1686
1687        if let Some(value_rows) = values {
1688            if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1689                for action in plan {
1690                    match action {
1691                        BindAction::Param {
1692                            param_idx,
1693                            col_idx,
1694                            target,
1695                        } => {
1696                            let v = &params[*param_idx];
1697                            bufs.row[*col_idx] = if v.is_null() {
1698                                Value::Null
1699                            } else if v.data_type() == *target {
1700                                v.clone()
1701                            } else {
1702                                let got = v.data_type();
1703                                v.clone().coerce_into(*target).ok_or_else(|| {
1704                                    SqlError::TypeMismatch {
1705                                        expected: target.to_string(),
1706                                        got: got.to_string(),
1707                                    }
1708                                })?
1709                            };
1710                        }
1711                        BindAction::Literal { value, col_idx } => {
1712                            bufs.row[*col_idx] = value.clone();
1713                        }
1714                    }
1715                }
1716            } else {
1717                let value_row = &value_rows[idx];
1718                if value_row.len() != insert_columns.len() {
1719                    return Err(SqlError::InvalidValue(format!(
1720                        "expected {} values, got {}",
1721                        insert_columns.len(),
1722                        value_row.len()
1723                    )));
1724                }
1725                for (i, expr) in value_row.iter().enumerate() {
1726                    let val = match expr {
1727                        Expr::Parameter(n) => params
1728                            .get(n - 1)
1729                            .cloned()
1730                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1731                        Expr::Literal(v) => v.clone(),
1732                        _ => eval_const_expr(expr)?,
1733                    };
1734                    let col_idx = bufs.col_indices[i];
1735                    let col = &table_schema.columns[col_idx];
1736                    let got_type = val.data_type();
1737                    bufs.row[col_idx] = if val.is_null() {
1738                        Value::Null
1739                    } else {
1740                        val.coerce_into(col.data_type)
1741                            .ok_or_else(|| SqlError::TypeMismatch {
1742                                expected: col.data_type.to_string(),
1743                                got: got_type.to_string(),
1744                            })?
1745                    };
1746                }
1747            }
1748        } else if let Some(sel) = sel_rows {
1749            let sel_row = &sel[idx];
1750            for (i, val) in sel_row.iter().enumerate() {
1751                let col_idx = bufs.col_indices[i];
1752                let col = &table_schema.columns[col_idx];
1753                let got_type = val.data_type();
1754                bufs.row[col_idx] = if val.is_null() {
1755                    Value::Null
1756                } else {
1757                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
1758                        SqlError::TypeMismatch {
1759                            expected: col.data_type.to_string(),
1760                            got: got_type.to_string(),
1761                        }
1762                    })?
1763                };
1764            }
1765        }
1766
1767        if has_defaults {
1768            for &(pos, def_expr) in &defaults {
1769                let val = eval_const_expr(def_expr)?;
1770                let col = &table_schema.columns[pos];
1771                if !val.is_null() {
1772                    let got_type = val.data_type();
1773                    bufs.row[pos] =
1774                        val.coerce_into(col.data_type)
1775                            .ok_or_else(|| SqlError::TypeMismatch {
1776                                expected: col.data_type.to_string(),
1777                                got: got_type.to_string(),
1778                            })?;
1779                }
1780            }
1781        }
1782
1783        if let Some(gen_map) = row_col_map_for_gen {
1784            if cache.is_some() {
1785                for (pos, fast) in cached_gen_positions
1786                    .iter()
1787                    .copied()
1788                    .zip(cached_gen_fast_evals.iter())
1789                {
1790                    let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1791                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1792                    let col = &table_schema.columns[pos];
1793                    bufs.row[pos] = if val.is_null() {
1794                        Value::Null
1795                    } else {
1796                        let got_type = val.data_type();
1797                        val.coerce_into(col.data_type)
1798                            .ok_or_else(|| SqlError::TypeMismatch {
1799                                expected: col.data_type.to_string(),
1800                                got: got_type.to_string(),
1801                            })?
1802                    };
1803                }
1804            } else {
1805                for (pos, gen_expr, fast) in &generated_cols_uncached {
1806                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1807                    let col = &table_schema.columns[*pos];
1808                    bufs.row[*pos] = if val.is_null() {
1809                        Value::Null
1810                    } else {
1811                        let got_type = val.data_type();
1812                        val.coerce_into(col.data_type)
1813                            .ok_or_else(|| SqlError::TypeMismatch {
1814                                expected: col.data_type.to_string(),
1815                                got: got_type.to_string(),
1816                            })?
1817                    };
1818                }
1819            }
1820        }
1821
1822        if let Some(c) = cache {
1823            for &pos in &c.not_null_indices {
1824                if bufs.row[pos as usize].is_null() {
1825                    return Err(SqlError::NotNullViolation(
1826                        table_schema.columns[pos as usize].name.clone(),
1827                    ));
1828                }
1829            }
1830        } else {
1831            for col in &table_schema.columns {
1832                if !col.nullable && bufs.row[col.position as usize].is_null() {
1833                    return Err(SqlError::NotNullViolation(col.name.clone()));
1834                }
1835            }
1836        }
1837
1838        if let Some(col_map) = check_col_map {
1839            for col in &table_schema.columns {
1840                if let Some(ref check) = col.check_expr {
1841                    let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1842                    if !is_truthy(&result) && !result.is_null() {
1843                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1844                        return Err(SqlError::CheckViolation(name.to_string()));
1845                    }
1846                }
1847            }
1848            for tc in &table_schema.check_constraints {
1849                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1850                if !is_truthy(&result) && !result.is_null() {
1851                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1852                    return Err(SqlError::CheckViolation(name.to_string()));
1853                }
1854            }
1855        }
1856
1857        if has_fks {
1858            for fk in &table_schema.foreign_keys {
1859                let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1860                if any_null {
1861                    continue;
1862                }
1863                crate::encoding::encode_composite_key_from_indices(
1864                    &fk.columns,
1865                    &bufs.row,
1866                    &mut bufs.fk_key_buf,
1867                );
1868                if fk.deferrable && fk.initially_deferred {
1869                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1870                    wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1871                        fk_name: name,
1872                        foreign_table: fk.foreign_table.as_bytes().to_vec(),
1873                        parent_key: bufs.fk_key_buf.clone(),
1874                    });
1875                    continue;
1876                }
1877                if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1878                    let found = wtx
1879                        .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1880                        .map_err(SqlError::Storage)?;
1881                    if found.is_none() {
1882                        let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1883                        return Err(SqlError::ForeignKeyViolation(name.to_string()));
1884                    }
1885                    wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1886                }
1887            }
1888        }
1889
1890        let proposed_row_for_returning: Option<Vec<Value>> =
1891            returning_rows.as_ref().map(|_| bufs.row.clone());
1892        let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1893            Some(bufs.row.clone())
1894        } else {
1895            None
1896        };
1897
1898        if has_before_insert_triggers {
1899            super::triggers::fire_row_triggers(
1900                wtx,
1901                schema,
1902                &table_schema.name,
1903                crate::parser::TriggerTiming::Before,
1904                super::triggers::FireEvent::Insert,
1905                None,
1906                Some(bufs.row.clone()),
1907                &table_schema.columns,
1908            )?;
1909        }
1910
1911        for (j, &i) in pk_indices.iter().enumerate() {
1912            bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1913        }
1914        match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1915            true => match bufs.pk_values[0] {
1916                Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1917                _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1918            },
1919            false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1920        }
1921        if plain_insert && single_int_pk {
1922            if let Value::Integer(id) = &bufs.pk_values[0] {
1923                min_inserted_pk = Some(min_inserted_pk.map_or(*id, |m| m.min(*id)));
1924            }
1925        }
1926
1927        for &slot in dropped {
1928            bufs.value_values[slot as usize] = Value::Null;
1929        }
1930        for (j, &i) in non_pk.iter().enumerate() {
1931            let col = &table_schema.columns[i];
1932            if matches!(
1933                col.generated_kind,
1934                Some(crate::parser::GeneratedKind::Virtual)
1935            ) {
1936                bufs.value_values[enc_pos[j] as usize] = Value::Null;
1937                bufs.row[i] = Value::Null;
1938            } else {
1939                bufs.value_values[enc_pos[j] as usize] =
1940                    std::mem::replace(&mut bufs.row[i], Value::Null);
1941            }
1942        }
1943        match cache.and_then(|c| c.row_encoder.as_ref()) {
1944            Some(tmpl) => crate::encoding::encode_row_with_template(
1945                tmpl,
1946                &bufs.value_values,
1947                &mut bufs.value_buf,
1948            )?,
1949            None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1950        }
1951
1952        if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1953            return Err(SqlError::KeyTooLarge {
1954                size: bufs.key_buf.len(),
1955                max: citadel_core::MAX_KEY_SIZE,
1956            });
1957        }
1958        if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1959            return Err(SqlError::RowTooLarge {
1960                size: bufs.value_buf.len(),
1961                max: citadel_core::MAX_VALUE_SIZE,
1962            });
1963        }
1964
1965        match compiled_conflict.as_ref() {
1966            None => {
1967                let is_new = wtx
1968                    .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1969                    .map_err(SqlError::Storage)?;
1970                if !is_new {
1971                    return Err(SqlError::DuplicateKey);
1972                }
1973                if has_indices || has_after_insert_triggers {
1974                    for (j, &i) in pk_indices.iter().enumerate() {
1975                        bufs.row[i] = bufs.pk_values[j].clone();
1976                    }
1977                    for (j, &i) in non_pk.iter().enumerate() {
1978                        bufs.row[i] = std::mem::replace(
1979                            &mut bufs.value_values[enc_pos[j] as usize],
1980                            Value::Null,
1981                        );
1982                    }
1983                    if has_indices {
1984                        insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1985                    }
1986                    if has_after_insert_triggers {
1987                        super::triggers::fire_row_triggers(
1988                            wtx,
1989                            schema,
1990                            &table_schema.name,
1991                            crate::parser::TriggerTiming::After,
1992                            super::triggers::FireEvent::Insert,
1993                            None,
1994                            Some(bufs.row.clone()),
1995                            &table_schema.columns,
1996                        )?;
1997                    }
1998                }
1999                if let Some(r) = row_for_stmt_trigger_impl.clone() {
2000                    stmt_new_rows_impl.push(r);
2001                }
2002                count += 1;
2003                if let Some(buf) = returning_rows.as_mut() {
2004                    buf.push((None, proposed_row_for_returning));
2005                }
2006            }
2007            Some(oc) => {
2008                let oc_ref: &CompiledOnConflict = oc;
2009                let needs_row = upsert_needs_row(oc_ref, table_schema);
2010                if needs_row {
2011                    for (j, &i) in pk_indices.iter().enumerate() {
2012                        bufs.row[i] = bufs.pk_values[j].clone();
2013                    }
2014                    for (j, &i) in non_pk.iter().enumerate() {
2015                        bufs.row[i] = std::mem::replace(
2016                            &mut bufs.value_values[enc_pos[j] as usize],
2017                            Value::Null,
2018                        );
2019                    }
2020                }
2021                let outcome = apply_insert_with_conflict(
2022                    wtx,
2023                    table_schema,
2024                    &bufs.key_buf,
2025                    &bufs.value_buf,
2026                    &bufs.row,
2027                    &bufs.pk_values,
2028                    oc_ref,
2029                    row_col_map.unwrap(),
2030                    // Trigger dispatch needs the Updated outcome's rows too.
2031                    stmt.returning.is_some() || has_after_update_triggers,
2032                )?;
2033                match outcome {
2034                    InsertRowOutcome::Inserted => {
2035                        count += 1;
2036                        if let Some(buf) = returning_rows.as_mut() {
2037                            buf.push((None, proposed_row_for_returning));
2038                        }
2039                        if let Some(r) = row_for_stmt_trigger_impl.clone() {
2040                            stmt_new_rows_impl.push(r);
2041                        }
2042                        if has_after_insert_triggers {
2043                            super::triggers::fire_row_triggers(
2044                                wtx,
2045                                schema,
2046                                &table_schema.name,
2047                                crate::parser::TriggerTiming::After,
2048                                super::triggers::FireEvent::Insert,
2049                                None,
2050                                Some(bufs.row.clone()),
2051                                &table_schema.columns,
2052                            )?;
2053                        }
2054                    }
2055                    InsertRowOutcome::Updated { old, new } => {
2056                        count += 1;
2057                        if let Some(buf) = returning_rows.as_mut() {
2058                            buf.push((Some(old.clone()), Some(new.clone())));
2059                        }
2060                        if has_after_update_triggers {
2061                            let changed_cols: Vec<String> = match oc_ref {
2062                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2063                                    .iter()
2064                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2065                                    .collect(),
2066                                _ => Vec::new(),
2067                            };
2068                            super::triggers::fire_row_triggers(
2069                                wtx,
2070                                schema,
2071                                &table_schema.name,
2072                                crate::parser::TriggerTiming::After,
2073                                super::triggers::FireEvent::Update {
2074                                    changed_columns: &changed_cols,
2075                                },
2076                                Some(old),
2077                                Some(new),
2078                                &table_schema.columns,
2079                            )?;
2080                        }
2081                    }
2082                    InsertRowOutcome::Skipped => {}
2083                }
2084            }
2085        }
2086    }
2087
2088    mark_insert_dml(
2089        schema,
2090        &table_schema.name,
2091        !plain_insert,
2092        single_int_pk,
2093        min_inserted_pk,
2094        count,
2095    );
2096
2097    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2098        if has_insert_statement_triggers_impl {
2099            super::triggers::fire_statement_triggers(
2100                wtx,
2101                schema,
2102                &table_schema.name,
2103                crate::parser::TriggerTiming::After,
2104                super::triggers::FireEvent::Insert,
2105                &table_schema.columns,
2106                &[],
2107                &stmt_new_rows_impl,
2108            )?;
2109        }
2110        return Ok(ExecutionResult::Query(super::helpers::project_returning(
2111            table_schema,
2112            returning_cols,
2113            &rows,
2114        )?));
2115    }
2116
2117    if has_insert_statement_triggers_impl {
2118        super::triggers::fire_statement_triggers(
2119            wtx,
2120            schema,
2121            &table_schema.name,
2122            crate::parser::TriggerTiming::After,
2123            super::triggers::FireEvent::Insert,
2124            &table_schema.columns,
2125            &[],
2126            &stmt_new_rows_impl,
2127        )?;
2128    }
2129
2130    Ok(ExecutionResult::RowsAffected(count))
2131}
2132
2133pub struct CompiledInsert {
2134    table_lower: String,
2135    cached: Option<InsertCache>,
2136}
2137
2138struct InsertCache {
2139    col_indices: Vec<usize>,
2140    has_subquery: bool,
2141    any_defaults: bool,
2142    has_checks: bool,
2143    on_conflict: Option<Arc<CompiledOnConflict>>,
2144    generated_col_positions: Vec<usize>,
2145    generated_fast_evals: Vec<FastGenEval>,
2146    pk_indices: Vec<usize>,
2147    non_pk_indices: Vec<usize>,
2148    encoding_positions: Vec<u16>,
2149    dropped_non_pk_slots: Vec<u16>,
2150    phys_count: usize,
2151    single_int_pk: bool,
2152    not_null_indices: Vec<u16>,
2153    bind_plan: Option<Vec<BindAction>>,
2154    row_fully_overwritten: bool,
2155    row_encoder: Option<crate::encoding::RowTemplate>,
2156    is_trivial_fast: bool,
2157    trivial_fast_program: Option<TrivialFastProgram>,
2158    needs_scoped_params: bool,
2159}
2160
2161#[derive(Clone)]
2162enum BindAction {
2163    Param {
2164        param_idx: usize,
2165        col_idx: usize,
2166        target: DataType,
2167    },
2168    Literal {
2169        value: Value,
2170        col_idx: usize,
2171    },
2172}
2173
2174#[derive(Clone)]
2175struct TrivialFastProgram {
2176    template: Vec<u8>,
2177    ops: Vec<WriteOp>,
2178    pk_param: u8,
2179    fk_checks: Vec<FkCheckSpec>,
2180    index_inserts: Vec<IndexInsertSpec>,
2181    on_dup: DupPolicy,
2182}
2183
2184/// PK-dup policy: Error = plain INSERT, Skip = DO NOTHING, Patch = DO UPDATE.
2185#[derive(Clone)]
2186enum DupPolicy {
2187    Error,
2188    Skip,
2189    Patch(Vec<DoUpdateFastPath>),
2190}
2191
/// A foreign-key existence check encodable straight from bound params.
#[derive(Clone)]
struct FkCheckSpec {
    /// Name of the referenced table, as bytes.
    foreign_table: Vec<u8>,
    /// Bound-parameter index for each foreign-key column, in the foreign
    /// key's column order.
    col_params: Vec<u8>,
}
2198
2199/// A pure-column non-unique secondary index insert encodable from bound params.
2200#[derive(Clone)]
2201struct IndexInsertSpec {
2202    table: Vec<u8>,
2203    key_params: Vec<(u8, crate::types::Collation)>,
2204}
2205
/// One write into a row template's integer hole.
///
/// `off` is the template offset recorded for the destination slot when the
/// template was built (presumably a byte offset — confirm against the
/// template encoder).
#[derive(Clone)]
enum WriteOp {
    /// Write the integer bound to parameter `param_idx`.
    ParamI64 {
        /// Index of the bound parameter to read.
        param_idx: u8,
        /// Destination offset in the template.
        off: u32,
    },
    /// Write a constant that was fully computed at compile time.
    LiteralI64 {
        /// Constant to write.
        value: i64,
        /// Destination offset in the template.
        off: u32,
    },
    /// Generated column defined as the sum of two param-bound columns.
    GenAddParamsI64 {
        /// Parameter bound to the left operand column.
        a_param: u8,
        /// Parameter bound to the right operand column.
        b_param: u8,
        /// Destination offset in the template.
        off: u32,
    },
    /// Generated column defined as `param * mul + add`.
    GenMulAddParamI64 {
        /// Parameter bound to the input column.
        param_idx: u8,
        /// Constant multiplier.
        mul: i64,
        /// Constant addend.
        add: i64,
        /// Destination offset in the template.
        off: u32,
    },
}
2228
2229fn build_trivial_fast_program(
2230    bind_plan: &[BindAction],
2231    phys_count: usize,
2232    non_virtual_pairs: &[(usize, usize)],
2233    generated_col_positions: &[usize],
2234    generated_fast_evals: &[FastGenEval],
2235    ts: &TableSchema,
2236    on_conflict: Option<&CompiledOnConflict>,
2237) -> Option<TrivialFastProgram> {
2238    let columns = &ts.columns;
2239    let pk_col = ts.pk_indices()[0];
2240
2241    // PK-arbiter shapes on index/FK-free tables only; anything else bails.
2242    let on_dup = match on_conflict {
2243        None => DupPolicy::Error,
2244        Some(CompiledOnConflict::DoNothing { target })
2245            if matches!(target, None | Some(ConflictKind::PrimaryKey))
2246                && ts.indices.is_empty()
2247                && ts.foreign_keys.is_empty() =>
2248        {
2249            DupPolicy::Skip
2250        }
2251        Some(CompiledOnConflict::DoUpdate {
2252            target: ConflictKind::PrimaryKey,
2253            where_clause: None,
2254            fast_paths: Some(fps),
2255            ..
2256        }) if ts.indices.is_empty() && ts.foreign_keys.is_empty() && !ts.has_checks() => {
2257            DupPolicy::Patch(fps.clone())
2258        }
2259        _ => return None,
2260    };
2261
2262    let mut col_to_bind: rustc_hash::FxHashMap<usize, &BindAction> = Default::default();
2263    for action in bind_plan {
2264        let col = match action {
2265            BindAction::Param { col_idx, .. } | BindAction::Literal { col_idx, .. } => *col_idx,
2266        };
2267        col_to_bind.insert(col, action);
2268    }
2269
2270    // Freeze literals into the template; int params/generated cols stay holes.
2271    let mut slots: Vec<crate::encoding::TemplateSlot> = (0..phys_count)
2272        .map(|_| crate::encoding::TemplateSlot::Null)
2273        .collect();
2274    for &(col, slot) in non_virtual_pairs {
2275        slots[slot] = match col_to_bind.get(&col) {
2276            Some(BindAction::Literal { value, .. }) => {
2277                crate::encoding::TemplateSlot::Const(value.clone())
2278            }
2279            Some(BindAction::Param { target, .. }) => {
2280                if *target != DataType::Integer {
2281                    return None;
2282                }
2283                crate::encoding::TemplateSlot::IntHole
2284            }
2285            None => {
2286                if columns[col].data_type != DataType::Integer {
2287                    return None;
2288                }
2289                crate::encoding::TemplateSlot::IntHole
2290            }
2291        };
2292    }
2293    let tmpl = crate::encoding::build_row_template(phys_count, &slots);
2294    let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2295        non_virtual_pairs.iter().copied().collect();
2296    let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2297        tmpl.slot_offsets.iter().copied().collect();
2298
2299    let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2300    let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2301    let mut pk_param: Option<u8> = None;
2302    let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2303    let mut not_null_param_indices: Vec<u8> = Vec::new();
2304
2305    for action in bind_plan {
2306        match action {
2307            BindAction::Param {
2308                param_idx,
2309                col_idx,
2310                target,
2311            } => {
2312                if *target != DataType::Integer {
2313                    return None;
2314                }
2315                let pi: u8 = u8::try_from(*param_idx).ok()?;
2316                col_to_param.insert(*col_idx, pi);
2317                if *col_idx == pk_col {
2318                    pk_param = Some(pi);
2319                } else {
2320                    let slot = *col_to_slot.get(col_idx)?;
2321                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2322                    ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2323                    if !columns[*col_idx].nullable {
2324                        not_null_param_indices.push(pi);
2325                    }
2326                }
2327            }
2328            // Already in the template; record ints only for generated-col refs.
2329            BindAction::Literal { value, col_idx } => {
2330                if *col_idx == pk_col {
2331                    return None;
2332                }
2333                if let Value::Integer(v) = value {
2334                    col_to_lit_int.insert(*col_idx, *v);
2335                }
2336            }
2337        }
2338    }
2339
2340    let pk_param = pk_param?;
2341
2342    for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2343        let gen_slot = *col_to_slot.get(&gen_pos)?;
2344        let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2345        let gen_col_nullable = columns[gen_pos].nullable;
2346
2347        match &generated_fast_evals[i] {
2348            FastGenEval::IntColAddCol {
2349                left_idx,
2350                right_idx,
2351            } => {
2352                let a_param = col_to_param.get(left_idx).copied();
2353                let b_param = col_to_param.get(right_idx).copied();
2354                match (a_param, b_param) {
2355                    (Some(ap), Some(bp)) => {
2356                        let deps_safe = gen_col_nullable
2357                            || (not_null_param_indices.contains(&ap)
2358                                && not_null_param_indices.contains(&bp));
2359                        if !deps_safe {
2360                            return None;
2361                        }
2362                        ops.push(WriteOp::GenAddParamsI64 {
2363                            a_param: ap,
2364                            b_param: bp,
2365                            off: gen_off,
2366                        });
2367                    }
2368                    (Some(p), None) => {
2369                        let lit = col_to_lit_int.get(right_idx).copied()?;
2370                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2371                            return None;
2372                        }
2373                        ops.push(WriteOp::GenMulAddParamI64 {
2374                            param_idx: p,
2375                            mul: 1,
2376                            add: lit,
2377                            off: gen_off,
2378                        });
2379                    }
2380                    (None, Some(p)) => {
2381                        let lit = col_to_lit_int.get(left_idx).copied()?;
2382                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2383                            return None;
2384                        }
2385                        ops.push(WriteOp::GenMulAddParamI64 {
2386                            param_idx: p,
2387                            mul: 1,
2388                            add: lit,
2389                            off: gen_off,
2390                        });
2391                    }
2392                    (None, None) => {
2393                        let la = col_to_lit_int.get(left_idx).copied()?;
2394                        let lb = col_to_lit_int.get(right_idx).copied()?;
2395                        ops.push(WriteOp::LiteralI64 {
2396                            value: la.wrapping_add(lb),
2397                            off: gen_off,
2398                        });
2399                    }
2400                }
2401            }
2402            FastGenEval::IntColMulAdd {
2403                col_schema_idx,
2404                mul,
2405                add,
2406            } => {
2407                if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2408                    if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2409                        return None;
2410                    }
2411                    ops.push(WriteOp::GenMulAddParamI64 {
2412                        param_idx: p,
2413                        mul: *mul,
2414                        add: *add,
2415                        off: gen_off,
2416                    });
2417                } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2418                    ops.push(WriteOp::LiteralI64 {
2419                        value: lit.wrapping_mul(*mul).wrapping_add(*add),
2420                        off: gen_off,
2421                    });
2422                } else {
2423                    return None;
2424                }
2425            }
2426            FastGenEval::None => return None,
2427        }
2428    }
2429
2430    let mut fk_checks: Vec<FkCheckSpec> = Vec::with_capacity(ts.foreign_keys.len());
2431    for fk in &ts.foreign_keys {
2432        if fk.deferrable && fk.initially_deferred {
2433            return None;
2434        }
2435        let mut col_params = Vec::with_capacity(fk.columns.len());
2436        for &c in &fk.columns {
2437            col_params.push(col_to_param.get(&(c as usize)).copied()?);
2438        }
2439        fk_checks.push(FkCheckSpec {
2440            foreign_table: fk.foreign_table.as_bytes().to_vec(),
2441            col_params,
2442        });
2443    }
2444
2445    let mut index_inserts: Vec<IndexInsertSpec> = Vec::with_capacity(ts.indices.len());
2446    for idx in &ts.indices {
2447        if idx.unique
2448            || !idx.is_pure_column_index()
2449            || idx.predicate_expr.is_some()
2450            || idx.predicate_sql.is_some()
2451        {
2452            return None;
2453        }
2454        let mut key_params = Vec::with_capacity(idx.keys.len());
2455        for (i, key) in idx.keys.iter().enumerate() {
2456            let crate::types::IndexKey::Column { idx: col_idx, .. } = key else {
2457                return None;
2458            };
2459            key_params.push((
2460                col_to_param.get(&(*col_idx as usize)).copied()?,
2461                idx.collation_at(i),
2462            ));
2463        }
2464        index_inserts.push(IndexInsertSpec {
2465            table: TableSchema::index_table_name(&ts.name, &idx.name),
2466            key_params,
2467        });
2468    }
2469
2470    Some(TrivialFastProgram {
2471        template: tmpl.template,
2472        ops,
2473        pk_param,
2474        fk_checks,
2475        index_inserts,
2476        on_dup,
2477    })
2478}
2479
/// An `ON CONFLICT` clause after its target has been resolved against a table schema.
#[derive(Clone)]
pub(super) enum CompiledOnConflict {
    /// `DO NOTHING`. A `target` of `None` is treated by the executor as matching a
    /// conflict on the primary key or on any unique index.
    DoNothing {
        target: Option<ConflictKind>,
    },
    /// `DO UPDATE SET ...` against one resolved conflict target.
    DoUpdate {
        target: ConflictKind,
        /// `(column index, value expression)` pairs; the index addresses `TableSchema::columns`.
        assignments: Vec<(usize, Expr)>,
        /// Optional filter; a NULL or non-truthy result skips the update.
        where_clause: Option<Expr>,
        /// When present, the assignments expressed as direct patches on the encoded row.
        fast_paths: Option<Vec<DoUpdateFastPath>>,
    },
}
2492
/// A `DO UPDATE` assignment that can be applied by patching the stored row bytes.
#[derive(Clone, Copy)]
pub(super) enum DoUpdateFastPath {
    /// Add `delta` (wrapping) to the INTEGER at physical value slot `phys_idx`;
    /// a NULL slot is left NULL.
    IntAddConst { phys_idx: usize, delta: i64 },
}
2497
/// The constraint an `ON CONFLICT` target resolved to.
#[derive(Clone, Debug)]
pub(super) enum ConflictKind {
    /// The table's primary key.
    PrimaryKey,
    /// A unique index; `index_idx` is its position in `TableSchema::indices`.
    UniqueIndex { index_idx: usize },
}
2503
2504fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2505    match target {
2506        ConflictTarget::Columns(cols) => {
2507            let col_idx_set: Vec<u16> = cols
2508                .iter()
2509                .map(|name| {
2510                    ts.column_index(name)
2511                        .map(|i| i as u16)
2512                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2513                })
2514                .collect::<Result<_>>()?;
2515            let pk_set = ts.primary_key_columns.clone();
2516            if set_equal(&col_idx_set, &pk_set) {
2517                return Ok(ConflictKind::PrimaryKey);
2518            }
2519            for (index_idx, idx) in ts.indices.iter().enumerate() {
2520                if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2521                    return Ok(ConflictKind::UniqueIndex { index_idx });
2522                }
2523            }
2524            Err(SqlError::Plan(
2525                "ON CONFLICT target does not match any unique constraint".into(),
2526            ))
2527        }
2528        ConflictTarget::Constraint(name) => {
2529            let lower = name.to_ascii_lowercase();
2530            for (index_idx, idx) in ts.indices.iter().enumerate() {
2531                if idx.name.eq_ignore_ascii_case(&lower) {
2532                    if idx.unique {
2533                        return Ok(ConflictKind::UniqueIndex { index_idx });
2534                    }
2535                    return Err(SqlError::Plan(format!(
2536                        "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2537                    )));
2538                }
2539            }
2540            Err(SqlError::Plan(format!(
2541                "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2542            )))
2543        }
2544    }
2545}
2546
/// Returns `true` when `a` and `b` hold the same column indices, ignoring order.
///
/// Repeated entries are significant: each value must occur the same number of
/// times on both sides.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    // Produce an ordered copy so the two sides can be compared position by position.
    let ordered = |src: &[u16]| {
        let mut copy = src.to_vec();
        copy.sort_unstable();
        copy
    };
    a.len() == b.len() && ordered(a) == ordered(b)
}
2557
/// Result of inserting one row under an `ON CONFLICT` clause.
pub(super) enum InsertRowOutcome {
    /// A row was written. The `DO UPDATE` paths in this file also return this
    /// variant for an update when the caller did not ask to capture rows.
    Inserted,
    /// An existing row was updated and its before/after images were captured.
    Updated { old: Vec<Value>, new: Vec<Value> },
    /// Nothing was written (`DO NOTHING`, or the `DO UPDATE` filter rejected the row).
    Skipped,
}
2563
/// Inserts one already-encoded row, resolving conflicts according to `on_conflict`.
///
/// `key_buf` / `value_buf` are the encoded primary key and value of the proposed
/// row; `row` and `pk_values` are its decoded forms. `capture_returning` asks the
/// `DO UPDATE` paths to return the old and new row images.
///
/// Strategy, in order:
/// 1. `DO NOTHING` on the primary key (or with no target) for a table without
///    indexes or foreign keys: a single insert-if-absent.
/// 2. `DO UPDATE` on the primary key when `can_fuse_do_update` allows it: one fused upsert.
/// 3. Otherwise: insert-or-fetch on the primary table, then maintain indexes and
///    dispatch on whichever constraint conflicted.
///
/// # Errors
/// * `SqlError::UniqueViolation` when a unique index other than the targeted one conflicts.
/// * `SqlError::DuplicateKey` when the primary key conflicts but is not the target.
/// * `SqlError::Storage` for storage failures, plus anything the update helpers return.
#[allow(clippy::too_many_arguments)]
#[inline]
pub(super) fn apply_insert_with_conflict(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    key_buf: &[u8],
    value_buf: &[u8],
    row: &[Value],
    pk_values: &[Value],
    on_conflict: &CompiledOnConflict,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    let table_bytes = table_schema.name.as_bytes();

    // Fast path 1: the only possible conflict is the primary key and there is no
    // index or foreign-key bookkeeping, so one storage call decides the outcome.
    if let CompiledOnConflict::DoNothing { target } = on_conflict {
        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
            let inserted = wtx
                .table_insert_if_absent(table_bytes, key_buf, value_buf)
                .map_err(SqlError::Storage)?;
            return Ok(if inserted {
                InsertRowOutcome::Inserted
            } else {
                InsertRowOutcome::Skipped
            });
        }
    }

    // Fast path 2: primary-key DO UPDATE that can run as a single fused upsert.
    if let CompiledOnConflict::DoUpdate {
        target: ConflictKind::PrimaryKey,
        assignments,
        where_clause,
        fast_paths,
    } = on_conflict
    {
        if can_fuse_do_update(table_schema, assignments) {
            return apply_do_update_fused(
                wtx,
                table_schema,
                table_bytes,
                key_buf,
                value_buf,
                row,
                assignments,
                where_clause.as_ref(),
                col_map,
                fast_paths.as_deref(),
                capture_returning,
            );
        }
    }

    // General path: write the primary row, or get back the bytes already stored under the key.
    let primary_outcome = wtx
        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
        .map_err(SqlError::Storage)?;

    match primary_outcome {
        citadel_txn::write_txn::InsertOutcome::Inserted => {
            if table_schema.indices.is_empty() {
                return Ok(InsertRowOutcome::Inserted);
            }
            // Index keys written so far, kept so a later conflict can be rolled back.
            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
            match insert_index_entries_or_fetch(
                wtx,
                table_schema,
                row,
                pk_values,
                &mut inserted_keys,
            )? {
                None => Ok(InsertRowOutcome::Inserted),
                Some(conflicting_idx) => {
                    // The clause handles this conflict only if it has no target, or
                    // targets exactly the unique index that collided.
                    let matches_target =
                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
                            || matches!(
                                on_conflict,
                                CompiledOnConflict::DoNothing {
                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
                                } | CompiledOnConflict::DoUpdate {
                                    target: ConflictKind::UniqueIndex { index_idx },
                                    ..
                                } if *index_idx == conflicting_idx
                            );
                    // Roll back the primary row and partial index entries before either
                    // reporting the violation or acting on the existing row.
                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
                    if !matches_target {
                        return Err(SqlError::UniqueViolation(
                            table_schema.indices[conflicting_idx].name.clone(),
                        ));
                    }
                    match on_conflict {
                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
                        CompiledOnConflict::DoUpdate {
                            assignments,
                            where_clause,
                            ..
                        } => {
                            // Locate the row that owns the colliding index entry and update it.
                            let existing_pk =
                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
                            apply_do_update(
                                wtx,
                                table_schema,
                                &existing_pk,
                                row,
                                assignments,
                                where_clause.as_ref(),
                                col_map,
                                capture_returning,
                            )
                        }
                    }
                }
            }
        }
        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
            // Primary-key collision: handled only by a target-less DO NOTHING or a
            // clause that targets the primary key.
            let matches_target = matches!(
                on_conflict,
                CompiledOnConflict::DoNothing { target: None }
                    | CompiledOnConflict::DoNothing {
                        target: Some(ConflictKind::PrimaryKey),
                    }
                    | CompiledOnConflict::DoUpdate {
                        target: ConflictKind::PrimaryKey,
                        ..
                    }
            );
            if !matches_target {
                return Err(SqlError::DuplicateKey);
            }
            match on_conflict {
                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
                CompiledOnConflict::DoUpdate {
                    assignments,
                    where_clause,
                    ..
                } => {
                    // The stored bytes were returned by the insert attempt, so no second read is needed.
                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
                    apply_do_update_with_old_row(
                        wtx,
                        table_schema,
                        key_buf,
                        &old_row,
                        row,
                        assignments,
                        where_clause.as_ref(),
                        col_map,
                        capture_returning,
                    )
                }
            }
        }
    }
}
2716
2717#[inline]
2718fn apply_fast_path_patch(
2719    old_bytes: &[u8],
2720    fast_paths: &[DoUpdateFastPath],
2721) -> Result<UpsertAction> {
2722    UPSERT_SCRATCH.with(|slot| {
2723        let mut bufs = slot.borrow_mut();
2724        bufs.new_value_buf.clear();
2725        bufs.new_value_buf.extend_from_slice(old_bytes);
2726
2727        let mut patch_scratch: Vec<u8> = Vec::new();
2728
2729        for fp in fast_paths {
2730            match fp {
2731                DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2732                    let decoded =
2733                        crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2734                    let old_val = &decoded[0];
2735                    let new_val = match old_val {
2736                        Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2737                        Value::Null => Value::Null,
2738                        _ => {
2739                            return Err(SqlError::TypeMismatch {
2740                                expected: "INTEGER".into(),
2741                                got: old_val.data_type().to_string(),
2742                            });
2743                        }
2744                    };
2745                    if !crate::encoding::patch_column_in_place(
2746                        &mut bufs.new_value_buf,
2747                        *phys_idx,
2748                        &new_val,
2749                    )? {
2750                        patch_scratch.clear();
2751                        crate::encoding::patch_row_column(
2752                            &bufs.new_value_buf,
2753                            *phys_idx,
2754                            &new_val,
2755                            &mut patch_scratch,
2756                        )?;
2757                        std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2758                    }
2759                }
2760            }
2761        }
2762
2763        if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2764            return Err(SqlError::RowTooLarge {
2765                size: bufs.new_value_buf.len(),
2766                max: citadel_core::MAX_VALUE_SIZE,
2767            });
2768        }
2769
2770        Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2771    })
2772}
2773
2774fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2775    if !ts.indices.is_empty() {
2776        return true;
2777    }
2778    match oc {
2779        CompiledOnConflict::DoNothing { .. } => false,
2780        CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2781    }
2782}
2783
2784fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2785    if !ts.indices.is_empty() {
2786        return false;
2787    }
2788    if !ts.foreign_keys.is_empty() {
2789        return false;
2790    }
2791    if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2792        return false;
2793    }
2794    let pk = ts.pk_indices();
2795    !assignments.iter().any(|(ci, _)| pk.contains(ci))
2796}
2797
2798#[allow(clippy::too_many_arguments)]
2799#[inline]
2800fn apply_do_update_fused(
2801    wtx: &mut WriteTxn<'_>,
2802    table_schema: &TableSchema,
2803    table_bytes: &[u8],
2804    key_buf: &[u8],
2805    value_buf: &[u8],
2806    proposed_row: &[Value],
2807    assignments: &[(usize, Expr)],
2808    where_clause: Option<&Expr>,
2809    col_map: &ColumnMap,
2810    fast_paths: Option<&[DoUpdateFastPath]>,
2811    capture_returning: bool,
2812) -> Result<InsertRowOutcome> {
2813    let non_pk = table_schema.non_pk_indices();
2814    let enc_pos = table_schema.encoding_positions();
2815    let phys_count = table_schema.physical_non_pk_count();
2816    let dropped = table_schema.dropped_non_pk_slots();
2817    let has_checks = table_schema.has_checks();
2818    let has_fks = !table_schema.foreign_keys.is_empty();
2819
2820    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2821        std::cell::RefCell::new(None);
2822
2823    let outcome =
2824        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2825            if let Some(fps) = fast_paths {
2826                if !has_checks {
2827                    let action = apply_fast_path_patch(old_bytes, fps)?;
2828                    if capture_returning {
2829                        if let UpsertAction::Replace(ref new_bytes) = action {
2830                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2831                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2832                            *captured.borrow_mut() = Some((old_row, new_row));
2833                        }
2834                    }
2835                    return Ok(action);
2836                }
2837            }
2838            UPSERT_SCRATCH.with(|slot| {
2839                let mut bufs = slot.borrow_mut();
2840                let UpsertBufs {
2841                    old_row,
2842                    new_row,
2843                    value_values,
2844                    new_value_buf,
2845                } = &mut *bufs;
2846
2847                old_row.clear();
2848                old_row.resize(table_schema.columns.len(), Value::Null);
2849                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2850
2851                if let Some(w) = where_clause {
2852                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2853                    let result = eval_expr(w, &ctx)?;
2854                    if result.is_null() || !is_truthy(&result) {
2855                        return Ok(UpsertAction::Skip);
2856                    }
2857                }
2858
2859                new_row.clear();
2860                new_row.extend_from_slice(old_row);
2861                for (col_idx, expr) in assignments {
2862                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2863                    let val = eval_expr(expr, &ctx)?;
2864                    let col = &table_schema.columns[*col_idx];
2865                    new_row[*col_idx] = if val.is_null() {
2866                        Value::Null
2867                    } else {
2868                        let got = val.data_type();
2869                        val.coerce_into(col.data_type)
2870                            .ok_or_else(|| SqlError::TypeMismatch {
2871                                expected: col.data_type.to_string(),
2872                                got: got.to_string(),
2873                            })?
2874                    };
2875                }
2876
2877                for (assigned_idx, _) in assignments {
2878                    let col = &table_schema.columns[*assigned_idx];
2879                    if !col.nullable && new_row[col.position as usize].is_null() {
2880                        return Err(SqlError::NotNullViolation(col.name.clone()));
2881                    }
2882                }
2883                if has_checks {
2884                    for col in &table_schema.columns {
2885                        if let Some(ref check) = col.check_expr {
2886                            let ctx = EvalCtx::new(col_map, new_row);
2887                            let result = eval_expr(check, &ctx)?;
2888                            if !is_truthy(&result) && !result.is_null() {
2889                                let name = col.check_name.as_deref().unwrap_or(&col.name);
2890                                return Err(SqlError::CheckViolation(name.to_string()));
2891                            }
2892                        }
2893                    }
2894                    for tc in &table_schema.check_constraints {
2895                        let ctx = EvalCtx::new(col_map, new_row);
2896                        let result = eval_expr(&tc.expr, &ctx)?;
2897                        if !is_truthy(&result) && !result.is_null() {
2898                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
2899                            return Err(SqlError::CheckViolation(name.to_string()));
2900                        }
2901                    }
2902                }
2903                let _ = has_fks;
2904
2905                value_values.clear();
2906                value_values.resize(phys_count, Value::Null);
2907                for &slot in dropped {
2908                    value_values[slot as usize] = Value::Null;
2909                }
2910                for (j, &i) in non_pk.iter().enumerate() {
2911                    value_values[enc_pos[j] as usize] = new_row[i].clone();
2912                }
2913                new_value_buf.clear();
2914                crate::encoding::encode_row_into(value_values, new_value_buf);
2915
2916                if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2917                    return Err(SqlError::RowTooLarge {
2918                        size: new_value_buf.len(),
2919                        max: citadel_core::MAX_VALUE_SIZE,
2920                    });
2921                }
2922
2923                if capture_returning {
2924                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2925                }
2926                Ok(UpsertAction::Replace(new_value_buf.clone()))
2927            })
2928        })?;
2929
2930    match outcome {
2931        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2932        UpsertOutcome::Updated => {
2933            if capture_returning {
2934                let (old, new) = captured.into_inner().ok_or_else(|| {
2935                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2936                })?;
2937                Ok(InsertRowOutcome::Updated { old, new })
2938            } else {
2939                Ok(InsertRowOutcome::Inserted)
2940            }
2941        }
2942        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2943    }
2944}
2945
2946fn fetch_unique_index_pk(
2947    wtx: &mut WriteTxn<'_>,
2948    table_schema: &TableSchema,
2949    index_idx: usize,
2950    row: &[Value],
2951) -> Result<Vec<u8>> {
2952    let idx = &table_schema.indices[index_idx];
2953    let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2954    let indexed: Vec<Value> = idx
2955        .column_positions_iter()
2956        .map(|col_idx| row[col_idx as usize].clone())
2957        .collect();
2958    let key = crate::encoding::encode_composite_key(&indexed);
2959    let value = wtx
2960        .table_get(&idx_table, &key)
2961        .map_err(SqlError::Storage)?
2962        .ok_or_else(|| {
2963            SqlError::InvalidValue("unique index missing expected collision entry".into())
2964        })?;
2965    Ok(value)
2966}
2967
2968#[allow(clippy::too_many_arguments)]
2969fn apply_do_update(
2970    wtx: &mut WriteTxn<'_>,
2971    table_schema: &TableSchema,
2972    pk_key: &[u8],
2973    proposed_row: &[Value],
2974    assignments: &[(usize, Expr)],
2975    where_clause: Option<&Expr>,
2976    col_map: &ColumnMap,
2977    capture_returning: bool,
2978) -> Result<InsertRowOutcome> {
2979    let old_value = wtx
2980        .table_get(table_schema.name.as_bytes(), pk_key)
2981        .map_err(SqlError::Storage)?
2982        .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2983    let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2984    apply_do_update_with_old_row(
2985        wtx,
2986        table_schema,
2987        pk_key,
2988        &old_row,
2989        proposed_row,
2990        assignments,
2991        where_clause,
2992        col_map,
2993        capture_returning,
2994    )
2995}
2996
/// General `DO UPDATE` implementation for a conflicting row whose decoded contents
/// (`old_row`) and encoded primary key (`old_pk_key`) are already known.
///
/// Steps, in order: evaluate `where_clause`; evaluate the assignments; recompute
/// stored generated columns; enforce NOT NULL on assigned columns, CHECK
/// constraints and foreign keys whose columns changed; re-encode the row; write it
/// (moving it if the primary key changed) and maintain the indexes.
///
/// Returns `Skipped` when the WHERE filter rejects the row, `Updated { old, new }`
/// when `capture_returning` is set, and `Inserted` otherwise.
///
/// # Errors
/// `TypeMismatch`, `NotNullViolation`, `CheckViolation`, `ForeignKeyViolation`,
/// `RowTooLarge`, `DuplicateKey` (new primary key already present),
/// `UniqueViolation`, `Storage`, and errors from expression evaluation.
#[allow(clippy::too_many_arguments)]
fn apply_do_update_with_old_row(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    old_pk_key: &[u8],
    old_row: &[Value],
    proposed_row: &[Value],
    assignments: &[(usize, Expr)],
    where_clause: Option<&Expr>,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    // A NULL or non-truthy WHERE leaves the existing row untouched.
    if let Some(w) = where_clause {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let result = eval_expr(w, &ctx)?;
        if result.is_null() || !is_truthy(&result) {
            return Ok(InsertRowOutcome::Skipped);
        }
    }

    // Every assignment is evaluated against the old row, not the partially updated one.
    let mut new_row = old_row.to_vec();
    for (col_idx, expr) in assignments {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let val = eval_expr(expr, &ctx)?;
        let col = &table_schema.columns[*col_idx];
        new_row[*col_idx] = if val.is_null() {
            Value::Null
        } else {
            let got = val.data_type();
            val.coerce_into(col.data_type)
                .ok_or_else(|| SqlError::TypeMismatch {
                    expected: col.data_type.to_string(),
                    got: got.to_string(),
                })?
        };
    }

    // Stored generated columns are recomputed from the updated row.
    for col in &table_schema.columns {
        if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Stored)
        ) {
            let val = eval_expr(
                col.generated_expr.as_ref().unwrap(),
                &EvalCtx::new(col_map, &new_row),
            )?;
            let pos = col.position as usize;
            new_row[pos] = if val.is_null() {
                if !col.nullable {
                    return Err(SqlError::NotNullViolation(col.name.clone()));
                }
                Value::Null
            } else {
                let got = val.data_type();
                val.coerce_into(col.data_type)
                    .ok_or_else(|| SqlError::TypeMismatch {
                        expected: col.data_type.to_string(),
                        got: got.to_string(),
                    })?
            };
        }
    }

    // The row only moves if a primary-key column was assigned AND its value actually differs.
    let pk_indices = table_schema.pk_indices();
    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);

    // NOT NULL is re-checked for assigned columns only.
    for (assigned_idx, _) in assignments {
        let col = &table_schema.columns[*assigned_idx];
        if !col.nullable && new_row[col.position as usize].is_null() {
            return Err(SqlError::NotNullViolation(col.name.clone()));
        }
    }
    if table_schema.has_checks() {
        // A CHECK passes when its expression is truthy or NULL.
        for col in &table_schema.columns {
            if let Some(ref check) = col.check_expr {
                let ctx = EvalCtx::new(col_map, &new_row);
                let result = eval_expr(check, &ctx)?;
                if !is_truthy(&result) && !result.is_null() {
                    let name = col.check_name.as_deref().unwrap_or(&col.name);
                    return Err(SqlError::CheckViolation(name.to_string()));
                }
            }
        }
        for tc in &table_schema.check_constraints {
            let ctx = EvalCtx::new(col_map, &new_row);
            let result = eval_expr(&tc.expr, &ctx)?;
            if !is_truthy(&result) && !result.is_null() {
                let name = tc.name.as_deref().unwrap_or(&tc.sql);
                return Err(SqlError::CheckViolation(name.to_string()));
            }
        }
    }
    for fk in &table_schema.foreign_keys {
        // Foreign keys whose columns did not change, or that now contain a NULL, are not checked.
        let changed = fk
            .columns
            .iter()
            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
        if !changed {
            continue;
        }
        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
        if any_null {
            continue;
        }
        let fk_vals: Vec<Value> = fk
            .columns
            .iter()
            .map(|&ci| new_row[ci as usize].clone())
            .collect();
        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
        // Initially-deferred constraints are queued on the transaction instead of checked now.
        if fk.deferrable && fk.initially_deferred {
            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
            wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
                fk_name: name,
                foreign_table: fk.foreign_table.as_bytes().to_vec(),
                parent_key: fk_key,
            });
            continue;
        }
        // Parent keys already verified in this transaction skip the lookup.
        if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
            let found = wtx
                .table_get(fk.foreign_table.as_bytes(), &fk_key)
                .map_err(SqlError::Storage)?;
            if found.is_none() {
                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
                return Err(SqlError::ForeignKeyViolation(name.to_string()));
            }
            wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
        }
    }

    // Primary-key values are only materialized when index keys or a row move need them.
    let has_indices = !table_schema.indices.is_empty();
    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
    } else {
        Vec::new()
    };
    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
    } else {
        Vec::new()
    };

    // Lay the non-key columns out in their physical encoding slots.
    let non_pk = table_schema.non_pk_indices();
    let enc_pos = table_schema.encoding_positions();
    let phys_count = table_schema.physical_non_pk_count();
    let dropped = table_schema.dropped_non_pk_slots();
    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
    for &slot in dropped {
        value_values[slot as usize] = Value::Null;
    }
    for (j, &i) in non_pk.iter().enumerate() {
        let col = &table_schema.columns[i];
        // Virtual generated columns are persisted as NULL.
        value_values[enc_pos[j] as usize] = if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Virtual)
        ) {
            Value::Null
        } else {
            new_row[i].clone()
        };
    }
    let mut new_value_buf = Vec::with_capacity(256);
    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);

    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
        return Err(SqlError::RowTooLarge {
            size: new_value_buf.len(),
            max: citadel_core::MAX_VALUE_SIZE,
        });
    }

    if pk_changed {
        // Move the row: insert under the new key first so a collision is detected
        // before the old row is removed.
        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
        let inserted = wtx
            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
            .map_err(SqlError::Storage)?;
        if !inserted {
            return Err(SqlError::DuplicateKey);
        }
        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
            .map_err(SqlError::Storage)?;
        // NOTE(review): every index entry is rewritten here without going through
        // `partial_idx_update_actions` — confirm partial indexes are handled for moved rows.
        for idx in &table_schema.indices {
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            let old_idx_key =
                encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
            wtx.table_delete(&idx_table, &old_idx_key)
                .map_err(SqlError::Storage)?;
            let new_idx_key =
                encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
            let is_new = wtx
                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                .map_err(SqlError::Storage)?;
            // A duplicate unique key is only a violation when no indexed column is NULL.
            if idx.unique && !is_new {
                let any_null = idx
                    .column_positions_iter()
                    .any(|c| new_row[c as usize].is_null());
                if !any_null {
                    return Err(SqlError::UniqueViolation(idx.name.clone()));
                }
            }
        }
    } else {
        // Same key: overwrite the value in place, then touch only the index entries
        // that `partial_idx_update_actions` asks to delete or insert.
        wtx.table_update_sorted(
            table_schema.name.as_bytes(),
            &[(old_pk_key, new_value_buf.as_slice())],
        )
        .map_err(SqlError::Storage)?;
        let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
        for idx in &table_schema.indices {
            let cols_changed = index_columns_changed(idx, old_row, &new_row);
            let (del, ins) = partial_idx_update_actions(
                idx,
                old_row,
                &new_row,
                cols_changed,
                false,
                col_map_partial,
            );
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            if del {
                let old_idx_key =
                    encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
                wtx.table_delete(&idx_table, &old_idx_key)
                    .map_err(SqlError::Storage)?;
            }
            if ins {
                let new_idx_key =
                    encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
                let is_new = wtx
                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                    .map_err(SqlError::Storage)?;
                // A duplicate unique key is only a violation when no indexed column is NULL.
                if idx.unique && !is_new {
                    let any_null = idx
                        .column_positions_iter()
                        .any(|c| new_row[c as usize].is_null());
                    if !any_null {
                        return Err(SqlError::UniqueViolation(idx.name.clone()));
                    }
                }
            }
        }
    }

    // Without RETURNING capture the update is reported as `Inserted`.
    if capture_returning {
        Ok(InsertRowOutcome::Updated {
            old: old_row.to_vec(),
            new: new_row,
        })
    } else {
        Ok(InsertRowOutcome::Inserted)
    }
}
3253
3254fn detect_fast_paths(
3255    ts: &TableSchema,
3256    assignments: &[(usize, Expr)],
3257) -> Option<Vec<DoUpdateFastPath>> {
3258    let non_pk = ts.non_pk_indices();
3259    let enc_pos = ts.encoding_positions();
3260    let mut out = Vec::with_capacity(assignments.len());
3261    for (col_idx, expr) in assignments {
3262        let col = &ts.columns[*col_idx];
3263        if col.data_type != DataType::Integer {
3264            return None;
3265        }
3266        let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3267        let phys_idx = enc_pos[nonpk_order] as usize;
3268
3269        if let Expr::BinaryOp { left, op, right } = expr {
3270            if !matches!(op, BinOp::Add | BinOp::Sub) {
3271                return None;
3272            }
3273            let reads_target =
3274                matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3275            if !reads_target {
3276                return None;
3277            }
3278            if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3279                let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3280                let _ = col_idx;
3281                out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3282                continue;
3283            }
3284            return None;
3285        }
3286        return None;
3287    }
3288    Some(out)
3289}
3290
3291fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3292    let target = oc
3293        .target
3294        .as_ref()
3295        .map(|t| resolve_conflict_target(t, ts))
3296        .transpose()?;
3297    match &oc.action {
3298        OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3299        OnConflictAction::DoUpdate {
3300            assignments,
3301            where_clause,
3302        } => {
3303            let target = target.ok_or_else(|| {
3304                SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3305            })?;
3306            let compiled_assignments: Vec<(usize, Expr)> = assignments
3307                .iter()
3308                .map(|(name, expr)| {
3309                    let col_idx = ts
3310                        .column_index(name)
3311                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3312                    Ok((col_idx, expr.clone()))
3313                })
3314                .collect::<Result<_>>()?;
3315            let fast_paths = if where_clause.is_none() {
3316                detect_fast_paths(ts, &compiled_assignments)
3317            } else {
3318                None
3319            };
3320            Ok(CompiledOnConflict::DoUpdate {
3321                target,
3322                assignments: compiled_assignments,
3323                where_clause: where_clause.clone(),
3324                fast_paths,
3325            })
3326        }
3327    }
3328}
3329
3330/// Caller MUST check `cache.is_trivial_fast` first. `Ok(None)` = a NULL
3331/// param changes the row's cell layout; the caller runs the cached lane.
3332fn exec_insert_trivial_fast(
3333    wtx: &mut WriteTxn<'_>,
3334    table_lower: &str,
3335    cache: &InsertCache,
3336    bufs: &mut InsertBufs,
3337    params: &[Value],
3338) -> Result<Option<ExecutionResult>> {
3339    let prog = cache
3340        .trivial_fast_program
3341        .as_ref()
3342        .expect("trivial fast: program");
3343
3344    match &params[prog.pk_param as usize] {
3345        Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3346        Value::Null => return Ok(None),
3347        _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3348    }
3349
3350    bufs.value_buf.clear();
3351    bufs.value_buf.extend_from_slice(&prog.template);
3352
3353    for op in &prog.ops {
3354        match op {
3355            WriteOp::ParamI64 { param_idx, off } => match &params[*param_idx as usize] {
3356                Value::Integer(v) => {
3357                    let off = *off as usize;
3358                    bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3359                }
3360                Value::Null => return Ok(None),
3361                other => {
3362                    return Err(SqlError::TypeMismatch {
3363                        expected: "Integer".into(),
3364                        got: other.data_type().to_string(),
3365                    });
3366                }
3367            },
3368            WriteOp::LiteralI64 { value, off } => {
3369                let off = *off as usize;
3370                bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3371            }
3372            WriteOp::GenAddParamsI64 {
3373                a_param,
3374                b_param,
3375                off,
3376            } => match (&params[*a_param as usize], &params[*b_param as usize]) {
3377                (Value::Integer(a), Value::Integer(b)) => {
3378                    let off = *off as usize;
3379                    bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3380                }
3381                _ => return Ok(None),
3382            },
3383            WriteOp::GenMulAddParamI64 {
3384                param_idx,
3385                mul,
3386                add,
3387                off,
3388            } => match &params[*param_idx as usize] {
3389                Value::Integer(v) => {
3390                    let r = v.wrapping_mul(*mul).wrapping_add(*add);
3391                    let off = *off as usize;
3392                    bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3393                }
3394                _ => return Ok(None),
3395            },
3396        }
3397    }
3398
3399    for fk in &prog.fk_checks {
3400        if fk.col_params.iter().any(|&p| params[p as usize].is_null()) {
3401            continue;
3402        }
3403        bufs.fk_key_buf.clear();
3404        for &p in &fk.col_params {
3405            crate::encoding::encode_key_value_into(&params[p as usize], &mut bufs.fk_key_buf);
3406        }
3407        if !wtx.fk_check_cached(&fk.foreign_table, &bufs.fk_key_buf) {
3408            let found = wtx
3409                .table_get(&fk.foreign_table, &bufs.fk_key_buf)
3410                .map_err(SqlError::Storage)?;
3411            if found.is_none() {
3412                return Err(SqlError::ForeignKeyViolation(
3413                    String::from_utf8_lossy(&fk.foreign_table).into_owned(),
3414                ));
3415            }
3416            wtx.mark_fk_verified(&fk.foreign_table, &bufs.fk_key_buf);
3417        }
3418    }
3419
3420    if let DupPolicy::Patch(fps) = &prog.on_dup {
3421        let outcome = wtx.table_upsert_with::<_, SqlError>(
3422            table_lower.as_bytes(),
3423            &bufs.key_buf,
3424            &bufs.value_buf,
3425            |old_bytes| apply_fast_path_patch(old_bytes, fps),
3426        )?;
3427        return Ok(Some(match outcome {
3428            UpsertOutcome::Inserted | UpsertOutcome::Updated => ExecutionResult::RowsAffected(1),
3429            UpsertOutcome::Skipped => ExecutionResult::RowsAffected(0),
3430        }));
3431    }
3432
3433    let is_new = wtx
3434        .table_insert_if_absent(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3435        .map_err(SqlError::Storage)?;
3436    if !is_new {
3437        return match &prog.on_dup {
3438            DupPolicy::Error => Err(SqlError::DuplicateKey),
3439            DupPolicy::Skip => Ok(Some(ExecutionResult::RowsAffected(0))),
3440            DupPolicy::Patch(_) => unreachable!("handled above"),
3441        };
3442    }
3443
3444    for idx in &prog.index_inserts {
3445        bufs.fk_key_buf.clear();
3446        for &(p, coll) in &idx.key_params {
3447            crate::encoding::encode_key_value_collated_into(
3448                &params[p as usize],
3449                coll,
3450                &mut bufs.fk_key_buf,
3451            );
3452        }
3453        crate::encoding::encode_key_value_into(
3454            &params[prog.pk_param as usize],
3455            &mut bufs.fk_key_buf,
3456        );
3457        wtx.table_insert_index(&idx.table, &bufs.fk_key_buf, &[])
3458            .map_err(SqlError::Storage)?;
3459    }
3460
3461    Ok(Some(ExecutionResult::RowsAffected(1)))
3462}
3463
3464fn build_bind_plan(
3465    stmt: &InsertStmt,
3466    col_indices: &[usize],
3467    col_data_types: &[DataType],
3468) -> Option<Vec<BindAction>> {
3469    let rows = match &stmt.source {
3470        InsertSource::Values(rows) => rows,
3471        _ => return None,
3472    };
3473    if rows.len() != 1 {
3474        return None;
3475    }
3476    let value_row = &rows[0];
3477    if value_row.len() != col_indices.len() {
3478        return None;
3479    }
3480    let mut plan = Vec::with_capacity(value_row.len());
3481    for (i, expr) in value_row.iter().enumerate() {
3482        let col_idx = col_indices[i];
3483        let target = col_data_types[col_idx];
3484        match expr {
3485            Expr::Parameter(n) => {
3486                if *n == 0 {
3487                    return None;
3488                }
3489                plan.push(BindAction::Param {
3490                    param_idx: n - 1,
3491                    col_idx,
3492                    target,
3493                });
3494            }
3495            Expr::Literal(v) => plan.push(BindAction::Literal {
3496                value: v.clone(),
3497                col_idx,
3498            }),
3499            _ => return None,
3500        }
3501    }
3502    Some(plan)
3503}
3504
3505impl CompiledInsert {
3506    pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3507        let lower = stmt.table.to_ascii_lowercase();
3508        // Matview names resolve to their backing table; only the interpreted
3509        // path raises the modification error.
3510        let cached = if schema.get_matview(&lower).is_some() {
3511            None
3512        } else if let Some(ts) = schema.get(&lower) {
3513            let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3514                ts.columns.iter().map(|c| c.name.as_str()).collect()
3515            } else {
3516                stmt.columns.iter().map(|s| s.as_str()).collect()
3517            };
3518            let mut col_indices = Vec::with_capacity(insert_columns.len());
3519            for name in &insert_columns {
3520                col_indices.push(ts.column_index(name)?);
3521            }
3522            if col_indices
3523                .iter()
3524                .any(|&ci| ts.columns[ci].generated_kind.is_some())
3525            {
3526                return None;
3527            }
3528            let on_conflict = stmt
3529                .on_conflict
3530                .as_ref()
3531                .map(|oc| compile_on_conflict(oc, ts))
3532                .transpose()
3533                .ok()
3534                .flatten()
3535                .map(Arc::new);
3536            let generated_col_positions: Vec<usize> = ts
3537                .columns
3538                .iter()
3539                .enumerate()
3540                .filter_map(|(i, c)| {
3541                    matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3542                        .then_some(i)
3543                })
3544                .collect();
3545            let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3546                .iter()
3547                .map(|&pos| {
3548                    detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3549                })
3550                .collect();
3551            let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3552            let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3553            let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3554            let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3555            let phys_count = ts.physical_non_pk_count();
3556            let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3557            let single_int_pk =
3558                pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3559            let not_null_indices: Vec<u16> = ts
3560                .columns
3561                .iter()
3562                .filter(|c| !c.nullable)
3563                .map(|c| c.position)
3564                .collect();
3565            let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3566            let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3567            let row_fully_overwritten = if any_defaults_flag {
3568                false
3569            } else {
3570                let mut covered: rustc_hash::FxHashSet<usize> =
3571                    col_indices.iter().copied().collect();
3572                covered.extend(generated_col_positions.iter().copied());
3573                for (j, &i) in non_pk_indices.iter().enumerate() {
3574                    let _ = j;
3575                    if matches!(
3576                        ts.columns[i].generated_kind,
3577                        Some(crate::parser::GeneratedKind::Virtual)
3578                    ) {
3579                        covered.insert(i);
3580                    }
3581                }
3582                bind_plan.is_some() && covered.len() == ts.columns.len()
3583            };
3584            let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3585            let mut null_value_slots: Vec<usize> =
3586                dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3587            for (j, &i) in non_pk_indices.iter().enumerate() {
3588                let slot = encoding_positions[j] as usize;
3589                if matches!(
3590                    ts.columns[i].generated_kind,
3591                    Some(crate::parser::GeneratedKind::Virtual)
3592                ) {
3593                    null_value_slots.push(slot);
3594                } else {
3595                    non_virtual_pairs.push((i, slot));
3596                }
3597            }
3598            let row_encoder = {
3599                let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3600                    let col = &ts.columns[i];
3601                    if matches!(
3602                        col.generated_kind,
3603                        Some(crate::parser::GeneratedKind::Virtual)
3604                    ) {
3605                        true
3606                    } else {
3607                        col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3608                    }
3609                });
3610                if all_int_or_null {
3611                    let mut slots: Vec<crate::encoding::TemplateSlot> = (0..phys_count)
3612                        .map(|_| crate::encoding::TemplateSlot::IntHole)
3613                        .collect();
3614                    for &s in &dropped_non_pk_slots {
3615                        slots[s as usize] = crate::encoding::TemplateSlot::Null;
3616                    }
3617                    for (j, &i) in non_pk_indices.iter().enumerate() {
3618                        if matches!(
3619                            ts.columns[i].generated_kind,
3620                            Some(crate::parser::GeneratedKind::Virtual)
3621                        ) {
3622                            slots[encoding_positions[j] as usize] =
3623                                crate::encoding::TemplateSlot::Null;
3624                        }
3625                    }
3626                    Some(crate::encoding::build_row_template(phys_count, &slots))
3627                } else {
3628                    None
3629                }
3630            };
3631            // build_trivial_fast_program rejects any shape it can't compile.
3632            let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3633                && !ts.columns.iter().any(|c| c.default_expr.is_some())
3634                && !ts.has_checks()
3635                && stmt.returning.is_none()
3636                && bind_plan.is_some()
3637                && row_fully_overwritten
3638                && single_int_pk
3639                && !super::triggers::has_insert_triggers(schema, &ts.name)
3640                // A DO UPDATE dup hit fires UPDATE row triggers on the slow path.
3641                && (stmt.on_conflict.is_none()
3642                    || !super::triggers::has_update_triggers(schema, &ts.name))
3643                && generated_fast_evals
3644                    .iter()
3645                    .all(|fe| !matches!(fe, FastGenEval::None));
3646            let trivial_fast_program = if is_trivial_fast_eligible {
3647                build_trivial_fast_program(
3648                    bind_plan.as_ref().unwrap(),
3649                    phys_count,
3650                    &non_virtual_pairs,
3651                    &generated_col_positions,
3652                    &generated_fast_evals,
3653                    ts,
3654                    on_conflict.as_deref(),
3655                )
3656            } else {
3657                None
3658            };
3659            let is_trivial_fast = trivial_fast_program.is_some();
3660            let has_checks = ts.has_checks();
3661            let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3662            let needs_scoped_params = bind_plan.is_none()
3663                || has_checks
3664                || any_defaults
3665                || !generated_col_positions.is_empty()
3666                || on_conflict.is_some()
3667                || stmt.returning.is_some()
3668                || insert_has_subquery(stmt)
3669                || super::helpers::any_partial_index(ts);
3670            Some(InsertCache {
3671                col_indices,
3672                has_subquery: insert_has_subquery(stmt),
3673                any_defaults,
3674                has_checks,
3675                on_conflict,
3676                generated_col_positions,
3677                generated_fast_evals,
3678                pk_indices,
3679                non_pk_indices,
3680                encoding_positions,
3681                dropped_non_pk_slots,
3682                phys_count,
3683                single_int_pk,
3684                not_null_indices,
3685                bind_plan,
3686                row_fully_overwritten,
3687                row_encoder,
3688                is_trivial_fast,
3689                trivial_fast_program,
3690                needs_scoped_params,
3691            })
3692        } else if schema.get_view(&lower).is_some() {
3693            None
3694        } else {
3695            return None;
3696        };
3697        Some(Self {
3698            table_lower: lower,
3699            cached,
3700        })
3701    }
3702}
3703
3704impl CompiledPlan for CompiledInsert {
3705    fn execute(
3706        &self,
3707        db: &Database,
3708        schema: &SchemaManager,
3709        stmt: &Statement,
3710        params: &[Value],
3711        txn: super::compile::ActiveTxnRef<'_, '_>,
3712    ) -> Result<ExecutionResult> {
3713        let ins = match stmt {
3714            Statement::Insert(i) => i,
3715            _ => {
3716                return Err(SqlError::Unsupported(
3717                    "CompiledInsert received non-INSERT statement".into(),
3718                ))
3719            }
3720        };
3721        use super::compile::ActiveTxnRef;
3722        match txn {
3723            ActiveTxnRef::None => exec_insert(db, schema, ins, params),
3724            ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3725                "cannot execute mutating statement inside a read-only transaction".into(),
3726            )),
3727            ActiveTxnRef::Write(outer) => match self.cached.as_ref() {
3728                Some(c) if c.is_trivial_fast => {
3729                    // Patch mutates existing rows: mark like every DO UPDATE path.
3730                    if matches!(
3731                        c.trivial_fast_program.as_ref().map(|p| &p.on_dup),
3732                        Some(DupPolicy::Patch(_))
3733                    ) {
3734                        schema.mark_dml(&self.table_lower);
3735                    }
3736                    match with_insert_scratch(|bufs| {
3737                        exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3738                    })? {
3739                        Some(r) => Ok(r),
3740                        None => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3741                    }
3742                }
3743                Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3744                None => exec_insert_in_txn(outer, schema, ins, params),
3745            },
3746        }
3747    }
3748
3749    fn uses_scoped_params(&self) -> bool {
3750        match self.cached.as_ref() {
3751            Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3752            None => true,
3753        }
3754    }
3755}
3756
3757fn exec_instead_of_view_insert_auto(
3758    db: &Database,
3759    schema: &SchemaManager,
3760    view_name: &str,
3761    aliases: &[String],
3762    stmt: &InsertStmt,
3763    params: &[Value],
3764) -> Result<ExecutionResult> {
3765    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3766    let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3767    wtx.commit().map_err(SqlError::Storage)?;
3768    Ok(r)
3769}
3770
3771fn exec_instead_of_view_insert_in_txn(
3772    wtx: &mut WriteTxn<'_>,
3773    schema: &SchemaManager,
3774    view_name: &str,
3775    aliases: &[String],
3776    stmt: &InsertStmt,
3777    params: &[Value],
3778) -> Result<ExecutionResult> {
3779    // CREATE VIEW without explicit aliases stores an empty vec; derive at runtime.
3780    let resolved_aliases: Vec<String> = if aliases.is_empty() {
3781        derive_view_columns(wtx, schema, view_name)?
3782    } else {
3783        aliases.to_vec()
3784    };
3785    let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3786    let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3787        .iter()
3788        .enumerate()
3789        .map(|(i, name)| (name.to_ascii_lowercase(), i))
3790        .collect();
3791
3792    let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3793        (0..resolved_aliases.len()).collect()
3794    } else {
3795        stmt.columns
3796            .iter()
3797            .map(|c| {
3798                alias_map
3799                    .get(&c.to_ascii_lowercase())
3800                    .copied()
3801                    .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3802            })
3803            .collect::<Result<_>>()?
3804    };
3805
3806    let source_rows: Vec<Vec<Value>> = match &stmt.source {
3807        InsertSource::Values(rows) => {
3808            let mut out = Vec::with_capacity(rows.len());
3809            for row in rows {
3810                if row.len() != target_positions.len() {
3811                    return Err(SqlError::InvalidValue(format!(
3812                        "expected {} values, got {}",
3813                        target_positions.len(),
3814                        row.len()
3815                    )));
3816                }
3817                let mut vals = Vec::with_capacity(row.len());
3818                for expr in row {
3819                    let v = match expr {
3820                        Expr::Parameter(n) => params
3821                            .get(n - 1)
3822                            .cloned()
3823                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3824                        Expr::Literal(v) => v.clone(),
3825                        other => eval_const_expr(other)?,
3826                    };
3827                    vals.push(v);
3828                }
3829                out.push(vals);
3830            }
3831            out
3832        }
3833        InsertSource::Select(sq) => {
3834            let empty_ctes = CteContext::default();
3835            let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3836            qr.rows
3837        }
3838    };
3839
3840    let mut count: u64 = 0;
3841    for row in source_rows {
3842        if row.len() != target_positions.len() {
3843            return Err(SqlError::InvalidValue(format!(
3844                "expected {} values, got {}",
3845                target_positions.len(),
3846                row.len()
3847            )));
3848        }
3849        let mut new_row = vec![Value::Null; resolved_aliases.len()];
3850        for (slot, val) in target_positions.iter().zip(row) {
3851            new_row[*slot] = val;
3852        }
3853        super::triggers::fire_row_triggers(
3854            wtx,
3855            schema,
3856            view_name,
3857            crate::parser::TriggerTiming::InsteadOf,
3858            super::triggers::FireEvent::Insert,
3859            None,
3860            Some(new_row),
3861            &view_cols,
3862        )?;
3863        count += 1;
3864    }
3865    Ok(ExecutionResult::RowsAffected(count))
3866}
3867
3868fn derive_view_columns(
3869    wtx: &mut WriteTxn<'_>,
3870    schema: &SchemaManager,
3871    view_name: &str,
3872) -> Result<Vec<String>> {
3873    use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3874    let sel = SelectStmt {
3875        columns: vec![SelectColumn::AllColumns],
3876        from: view_name.to_string(),
3877        from_alias: None,
3878        from_subquery: None,
3879        from_args: None,
3880        from_json_table: None,
3881        joins: vec![],
3882        distinct: false,
3883        where_clause: None,
3884        order_by: vec![],
3885        limit: Some(Expr::Literal(Value::Integer(1))),
3886        offset: None,
3887        group_by: vec![],
3888        having: None,
3889    };
3890    let sq = SelectQuery {
3891        ctes: vec![],
3892        recursive: false,
3893        body: QueryBody::Select(Box::new(sel)),
3894    };
3895    let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3896    match qr {
3897        ExecutionResult::Query(q) => Ok(q.columns),
3898        _ => Ok(Vec::new()),
3899    }
3900}
3901
3902#[cfg(test)]
3903#[path = "dml_tests.rs"]
3904mod tests;