Skip to main content

citadel_sql/executor/
dml.rs

1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::read_txn::ReadTxn;
7use citadel_txn::write_txn::WriteTxn;
8use rustc_hash::FxHashMap;
9
10use crate::encoding::{encode_composite_key_into, encode_row_into};
11use crate::error::{Result, SqlError};
12use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
13use crate::parser::*;
14use crate::types::*;
15
16use crate::schema::SchemaManager;
17
18use super::compile::CompiledPlan;
19use super::helpers::*;
20use super::CteContext;
21
22/// (before-insert, after-insert, after-update) row-trigger presence, hoisted once.
23fn row_insert_trigger_flags(schema: &SchemaManager, table_name: &str) -> (bool, bool, bool) {
24    use crate::parser::{TriggerEvent, TriggerGranularity, TriggerTiming};
25    let triggers = schema.triggers_for(table_name);
26    let row =
27        |t: &crate::types::TriggerDef| t.enabled && t.granularity == TriggerGranularity::ForEachRow;
28    let ev = |t: &crate::types::TriggerDef, update: bool| {
29        t.events.iter().any(|e| {
30            if update {
31                matches!(e, TriggerEvent::Update(_))
32            } else {
33                matches!(e, TriggerEvent::Insert)
34            }
35        })
36    };
37    (
38        triggers
39            .iter()
40            .any(|t| row(t) && t.timing == TriggerTiming::Before && ev(t, false)),
41        triggers
42            .iter()
43            .any(|t| row(t) && t.timing == TriggerTiming::After && ev(t, false)),
44        triggers
45            .iter()
46            .any(|t| row(t) && t.timing == TriggerTiming::After && ev(t, true)),
47    )
48}
49
50/// Classify an INSERT for cache invalidation: a pure append (single-INTEGER pk,
51/// no conflict) stays append-retainable; anything else hard-invalidates.
52fn mark_insert_dml(
53    schema: &SchemaManager,
54    table_name: &str,
55    on_conflict: bool,
56    single_int_pk: bool,
57    min_inserted_pk: Option<i64>,
58    rows_written: u64,
59) {
60    if on_conflict {
61        schema.mark_dml(table_name);
62    } else if single_int_pk {
63        if let Some(m) = min_inserted_pk {
64            schema.mark_dml_append(table_name, m);
65        }
66    } else if rows_written > 0 {
67        schema.mark_dml(table_name);
68    }
69}
70
71/// Single INTEGER pk - the only shape an ANN plan indexes (and append-retains).
72fn is_single_int_pk(table_schema: &TableSchema) -> bool {
73    table_schema.primary_key_columns.len() == 1
74        && matches!(
75            table_schema.columns[table_schema.primary_key_columns[0] as usize].data_type,
76            DataType::Integer
77        )
78}
79
80pub(super) fn exec_insert(
81    db: &Database,
82    schema: &SchemaManager,
83    stmt: &InsertStmt,
84    params: &[Value],
85) -> Result<ExecutionResult> {
86    let empty_ctes = CteContext::default();
87    let materialized;
88    let stmt = if insert_has_subquery(stmt) {
89        materialized = materialize_insert(stmt, &mut |sub| {
90            exec_subquery_read(db, schema, sub, &empty_ctes)
91        })?;
92        &materialized
93    } else {
94        stmt
95    };
96
97    let lower_name = stmt.table.to_ascii_lowercase();
98    if let Some(view_def) = schema.get_view(&lower_name) {
99        if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
100        {
101            let aliases = view_def.column_aliases.clone();
102            return exec_instead_of_view_insert_auto(
103                db,
104                schema,
105                &lower_name,
106                &aliases,
107                stmt,
108                params,
109            );
110        }
111        return Err(SqlError::CannotModifyView(stmt.table.clone()));
112    }
113    if schema.get_matview(&lower_name).is_some() {
114        return Err(SqlError::CannotModifyView(format!(
115            "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
116            stmt.table
117        )));
118    }
119    let table_schema = schema
120        .get(&lower_name)
121        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
122
123    let insert_columns = if stmt.columns.is_empty() {
124        table_schema
125            .columns
126            .iter()
127            .map(|c| c.name.clone())
128            .collect::<Vec<_>>()
129    } else {
130        stmt.columns
131            .iter()
132            .map(|c| c.to_ascii_lowercase())
133            .collect()
134    };
135
136    let col_indices: Vec<usize> = insert_columns
137        .iter()
138        .map(|name| {
139            table_schema
140                .column_index(name)
141                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
142        })
143        .collect::<Result<_>>()?;
144
145    for &ci in &col_indices {
146        if table_schema.columns[ci].generated_kind.is_some() {
147            return Err(SqlError::CannotInsertIntoGeneratedColumn(
148                table_schema.columns[ci].name.clone(),
149            ));
150        }
151    }
152
153    let defaults: Vec<(usize, &Expr)> = table_schema
154        .columns
155        .iter()
156        .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
157        .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
158        .collect();
159
160    let generated_cols: Vec<(usize, &Expr)> = table_schema
161        .columns
162        .iter()
163        .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
164        .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
165        .collect();
166
167    let has_checks = table_schema.has_checks();
168    let strict = table_schema.is_strict();
169    let row_col_map_for_gen = if !generated_cols.is_empty() {
170        Some(ColumnMap::new(&table_schema.columns))
171    } else {
172        None
173    };
174    let check_col_map = if has_checks {
175        Some(ColumnMap::new(&table_schema.columns))
176    } else {
177        None
178    };
179
180    let select_rows = match &stmt.source {
181        InsertSource::Select(sq) => {
182            let insert_ctes =
183                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
184                    exec_query_body_read(db, schema, body, ctx)
185                })?;
186            let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
187            Some(qr.rows)
188        }
189        InsertSource::Values(_) => None,
190    };
191
192    let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
193        .on_conflict
194        .as_ref()
195        .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
196        .transpose()?;
197
198    let row_col_map = compiled_conflict
199        .as_ref()
200        .map(|_| ColumnMap::new(&table_schema.columns));
201
202    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
203    // DML invalidates the table's persisted ANN segment in the SAME txn
204    // (rollback restores it; commit makes table-changed-but-segment-survives
205    // unrepresentable for this path).
206    super::ann_persist::purge_segment(&mut wtx, &table_schema.name)?;
207    let mut count: u64 = 0;
208    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
209        stmt.returning.as_ref().map(|_| Vec::new());
210
211    let pk_indices = table_schema.pk_indices();
212    let non_pk = table_schema.non_pk_indices();
213    let enc_pos = table_schema.encoding_positions();
214    let phys_count = table_schema.physical_non_pk_count();
215    let mut row = vec![Value::Null; table_schema.columns.len()];
216    let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
217    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
218    let mut key_buf: Vec<u8> = Vec::with_capacity(64);
219    let mut value_buf: Vec<u8> = Vec::with_capacity(256);
220    let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
221
222    let values = match &stmt.source {
223        InsertSource::Values(rows) => Some(rows.as_slice()),
224        InsertSource::Select(_) => None,
225    };
226    let sel_rows = select_rows.as_deref();
227
228    let total = match (values, sel_rows) {
229        (Some(rows), _) => rows.len(),
230        (_, Some(rows)) => rows.len(),
231        _ => 0,
232    };
233
234    if let Some(sel) = sel_rows {
235        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
236            return Err(SqlError::InvalidValue(format!(
237                "INSERT ... SELECT column count mismatch: expected {}, got {}",
238                insert_columns.len(),
239                sel[0].len()
240            )));
241        }
242    }
243
244    let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
245        t.enabled
246            && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
247            && t.events
248                .iter()
249                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
250    });
251    let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
252        Vec::with_capacity(total)
253    } else {
254        Vec::new()
255    };
256
257    if has_insert_statement_triggers {
258        super::triggers::fire_statement_triggers(
259            &mut wtx,
260            schema,
261            &table_schema.name,
262            crate::parser::TriggerTiming::Before,
263            super::triggers::FireEvent::Insert,
264            &table_schema.columns,
265            &[],
266            &[],
267        )?;
268    }
269
270    let plain_insert = compiled_conflict.is_none();
271    let single_int_pk = is_single_int_pk(table_schema);
272    let mut min_inserted_pk: Option<i64> = None;
273    let (has_before_insert_triggers, has_after_insert_triggers, has_after_update_triggers) =
274        row_insert_trigger_flags(schema, &table_schema.name);
275
276    for idx in 0..total {
277        for v in row.iter_mut() {
278            *v = Value::Null;
279        }
280
281        if let Some(value_rows) = values {
282            let value_row = &value_rows[idx];
283            if value_row.len() != insert_columns.len() {
284                return Err(SqlError::InvalidValue(format!(
285                    "expected {} values, got {}",
286                    insert_columns.len(),
287                    value_row.len()
288                )));
289            }
290            for (i, expr) in value_row.iter().enumerate() {
291                let val = if let Expr::Parameter(n) = expr {
292                    params
293                        .get(n - 1)
294                        .cloned()
295                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
296                } else {
297                    eval_const_expr(expr)?
298                };
299                let col_idx = col_indices[i];
300                let col = &table_schema.columns[col_idx];
301                row[col_idx] = if val.is_null() {
302                    Value::Null
303                } else {
304                    coerce_for_column(val, col, strict)?
305                };
306            }
307        } else if let Some(sel) = sel_rows {
308            let sel_row = &sel[idx];
309            for (i, val) in sel_row.iter().enumerate() {
310                let col_idx = col_indices[i];
311                let col = &table_schema.columns[col_idx];
312                row[col_idx] = if val.is_null() {
313                    Value::Null
314                } else {
315                    coerce_for_column(val.clone(), col, strict)?
316                };
317            }
318        }
319
320        for &(pos, def_expr) in &defaults {
321            let val = eval_const_expr(def_expr)?;
322            let col = &table_schema.columns[pos];
323            if !val.is_null() {
324                row[pos] = coerce_for_column(val, col, strict)?;
325            }
326        }
327
328        if let Some(ref gen_map) = row_col_map_for_gen {
329            for &(pos, gen_expr) in &generated_cols {
330                let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
331                let col = &table_schema.columns[pos];
332                row[pos] = if val.is_null() {
333                    Value::Null
334                } else {
335                    coerce_for_column(val, col, strict)?
336                };
337            }
338        }
339
340        for col in &table_schema.columns {
341            if !col.nullable && row[col.position as usize].is_null() {
342                return Err(SqlError::NotNullViolation(col.name.clone()));
343            }
344        }
345
346        if let Some(ref col_map) = check_col_map {
347            for col in &table_schema.columns {
348                if let Some(ref check) = col.check_expr {
349                    let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
350                    if !is_truthy(&result) && !result.is_null() {
351                        let name = col.check_name.as_deref().unwrap_or(&col.name);
352                        return Err(SqlError::CheckViolation(name.to_string()));
353                    }
354                }
355            }
356            for tc in &table_schema.check_constraints {
357                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
358                if !is_truthy(&result) && !result.is_null() {
359                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
360                    return Err(SqlError::CheckViolation(name.to_string()));
361                }
362            }
363        }
364
365        for fk in &table_schema.foreign_keys {
366            let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
367            if any_null {
368                continue; // MATCH SIMPLE: skip if any FK col is NULL
369            }
370            let fk_vals: Vec<Value> = fk
371                .columns
372                .iter()
373                .map(|&ci| row[ci as usize].clone())
374                .collect();
375            fk_key_buf.clear();
376            encode_composite_key_into(&fk_vals, &mut fk_key_buf);
377            if fk.deferrable && fk.initially_deferred {
378                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
379                wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
380                    fk_name: name,
381                    foreign_table: fk.foreign_table.as_bytes().to_vec(),
382                    parent_key: fk_key_buf.clone(),
383                });
384                continue;
385            }
386            if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
387                let found = wtx
388                    .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
389                    .map_err(SqlError::Storage)?;
390                if found.is_none() {
391                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
392                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
393                }
394                wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
395            }
396        }
397
398        let proposed_row_for_returning: Option<Vec<Value>> =
399            returning_rows.as_ref().map(|_| row.clone());
400        let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
401            Some(row.clone())
402        } else {
403            None
404        };
405
406        if has_before_insert_triggers {
407            super::triggers::fire_row_triggers(
408                &mut wtx,
409                schema,
410                &table_schema.name,
411                crate::parser::TriggerTiming::Before,
412                super::triggers::FireEvent::Insert,
413                None,
414                Some(row.clone()),
415                &table_schema.columns,
416            )?;
417        }
418
419        for (j, &i) in pk_indices.iter().enumerate() {
420            pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
421        }
422        encode_composite_key_into(&pk_values, &mut key_buf);
423        if plain_insert && single_int_pk {
424            if let Value::Integer(id) = &pk_values[0] {
425                min_inserted_pk = Some(min_inserted_pk.map_or(*id, |m| m.min(*id)));
426            }
427        }
428
429        for (j, &i) in non_pk.iter().enumerate() {
430            let col = &table_schema.columns[i];
431            if matches!(
432                col.generated_kind,
433                Some(crate::parser::GeneratedKind::Virtual)
434            ) {
435                value_values[enc_pos[j] as usize] = Value::Null;
436                row[i] = Value::Null;
437            } else {
438                value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
439            }
440        }
441        encode_row_into(&value_values, &mut value_buf);
442
443        if key_buf.len() > citadel_core::MAX_KEY_SIZE {
444            return Err(SqlError::KeyTooLarge {
445                size: key_buf.len(),
446                max: citadel_core::MAX_KEY_SIZE,
447            });
448        }
449        if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
450            return Err(SqlError::RowTooLarge {
451                size: value_buf.len(),
452                max: citadel_core::MAX_VALUE_SIZE,
453            });
454        }
455
456        match compiled_conflict.as_ref() {
457            None => {
458                let is_new = wtx
459                    .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
460                    .map_err(SqlError::Storage)?;
461                if !is_new {
462                    return Err(SqlError::DuplicateKey);
463                }
464                if !table_schema.indices.is_empty() || has_after_insert_triggers {
465                    for (j, &i) in pk_indices.iter().enumerate() {
466                        row[i] = pk_values[j].clone();
467                    }
468                    for (j, &i) in non_pk.iter().enumerate() {
469                        row[i] =
470                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
471                    }
472                    if !table_schema.indices.is_empty() {
473                        insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
474                    }
475                    if has_after_insert_triggers {
476                        super::triggers::fire_row_triggers(
477                            &mut wtx,
478                            schema,
479                            &table_schema.name,
480                            crate::parser::TriggerTiming::After,
481                            super::triggers::FireEvent::Insert,
482                            None,
483                            Some(row.clone()),
484                            &table_schema.columns,
485                        )?;
486                    }
487                }
488                if let Some(r) = row_for_stmt_trigger.clone() {
489                    stmt_new_rows.push(r);
490                }
491                count += 1;
492                if let Some(buf) = returning_rows.as_mut() {
493                    buf.push((None, proposed_row_for_returning));
494                }
495            }
496            Some(oc) => {
497                let oc_ref: &CompiledOnConflict = oc;
498                let needs_row = upsert_needs_row(oc_ref, table_schema);
499                if needs_row {
500                    for (j, &i) in pk_indices.iter().enumerate() {
501                        row[i] = pk_values[j].clone();
502                    }
503                    for (j, &i) in non_pk.iter().enumerate() {
504                        row[i] =
505                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
506                    }
507                }
508                let outcome = apply_insert_with_conflict(
509                    &mut wtx,
510                    table_schema,
511                    &key_buf,
512                    &value_buf,
513                    &row,
514                    &pk_values,
515                    oc_ref,
516                    row_col_map.as_ref().unwrap(),
517                    stmt.returning.is_some(),
518                )?;
519                match outcome {
520                    InsertRowOutcome::Inserted => {
521                        count += 1;
522                        if let Some(buf) = returning_rows.as_mut() {
523                            buf.push((None, proposed_row_for_returning));
524                        }
525                        if let Some(r) = row_for_stmt_trigger.clone() {
526                            stmt_new_rows.push(r);
527                        }
528                        if has_after_insert_triggers {
529                            super::triggers::fire_row_triggers(
530                                &mut wtx,
531                                schema,
532                                &table_schema.name,
533                                crate::parser::TriggerTiming::After,
534                                super::triggers::FireEvent::Insert,
535                                None,
536                                Some(row.clone()),
537                                &table_schema.columns,
538                            )?;
539                        }
540                    }
541                    InsertRowOutcome::Updated { old, new } => {
542                        count += 1;
543                        if let Some(buf) = returning_rows.as_mut() {
544                            buf.push((Some(old.clone()), Some(new.clone())));
545                        }
546                        if has_after_update_triggers {
547                            let changed_cols: Vec<String> = match oc_ref {
548                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
549                                    .iter()
550                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
551                                    .collect(),
552                                _ => Vec::new(),
553                            };
554                            super::triggers::fire_row_triggers(
555                                &mut wtx,
556                                schema,
557                                &table_schema.name,
558                                crate::parser::TriggerTiming::After,
559                                super::triggers::FireEvent::Update {
560                                    changed_columns: &changed_cols,
561                                },
562                                Some(old),
563                                Some(new),
564                                &table_schema.columns,
565                            )?;
566                        }
567                    }
568                    InsertRowOutcome::Skipped => {}
569                }
570            }
571        }
572    }
573
574    if has_insert_statement_triggers {
575        super::triggers::fire_statement_triggers(
576            &mut wtx,
577            schema,
578            &table_schema.name,
579            crate::parser::TriggerTiming::After,
580            super::triggers::FireEvent::Insert,
581            &table_schema.columns,
582            &[],
583            &stmt_new_rows,
584        )?;
585    }
586
587    mark_insert_dml(
588        schema,
589        &table_schema.name,
590        !plain_insert,
591        single_int_pk,
592        min_inserted_pk,
593        count,
594    );
595
596    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
597        let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
598        super::helpers::drain_deferred_fk_checks(&mut wtx)?;
599        wtx.commit().map_err(SqlError::Storage)?;
600        return Ok(ExecutionResult::Query(qr));
601    }
602
603    super::helpers::drain_deferred_fk_checks(&mut wtx)?;
604    wtx.commit().map_err(SqlError::Storage)?;
605    Ok(ExecutionResult::RowsAffected(count))
606}
607
608pub(super) fn has_subquery(expr: &Expr) -> bool {
609    crate::parser::has_subquery(expr)
610}
611
612pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
613    if let Some(ref w) = stmt.where_clause {
614        if has_subquery(w) {
615            return true;
616        }
617    }
618    if let Some(ref h) = stmt.having {
619        if has_subquery(h) {
620            return true;
621        }
622    }
623    for col in &stmt.columns {
624        if let SelectColumn::Expr { expr, .. } = col {
625            if has_subquery(expr) {
626                return true;
627            }
628        }
629    }
630    for ob in &stmt.order_by {
631        if has_subquery(&ob.expr) {
632            return true;
633        }
634    }
635    for join in &stmt.joins {
636        if let Some(ref on_expr) = join.on_clause {
637            if has_subquery(on_expr) {
638                return true;
639            }
640        }
641    }
642    false
643}
644
645pub(super) fn materialize_expr(
646    expr: &Expr,
647    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
648) -> Result<Expr> {
649    match expr {
650        Expr::InSubquery {
651            expr: e,
652            subquery,
653            negated,
654        } => {
655            let inner = materialize_expr(e, exec_sub)?;
656            let qr = exec_sub(subquery)?;
657            if !qr.columns.is_empty() && qr.columns.len() != 1 {
658                return Err(SqlError::SubqueryMultipleColumns);
659            }
660            let mut values = rustc_hash::FxHashSet::default();
661            let mut has_null = false;
662            for row in &qr.rows {
663                if row[0].is_null() {
664                    has_null = true;
665                } else {
666                    values.insert(row[0].clone());
667                }
668            }
669            Ok(Expr::InSet {
670                expr: Box::new(inner),
671                values,
672                has_null,
673                negated: *negated,
674            })
675        }
676        Expr::ScalarSubquery(subquery) => {
677            let qr = exec_sub(subquery)?;
678            if qr.rows.len() > 1 {
679                return Err(SqlError::SubqueryMultipleRows);
680            }
681            let val = if qr.rows.is_empty() {
682                Value::Null
683            } else {
684                qr.rows[0][0].clone()
685            };
686            Ok(Expr::Literal(val))
687        }
688        Expr::Exists { subquery, negated } => {
689            let qr = exec_sub(subquery)?;
690            let exists = !qr.rows.is_empty();
691            let result = if *negated { !exists } else { exists };
692            Ok(Expr::Literal(Value::Boolean(result)))
693        }
694        Expr::InList {
695            expr: e,
696            list,
697            negated,
698        } => {
699            let inner = materialize_expr(e, exec_sub)?;
700            let items = list
701                .iter()
702                .map(|item| materialize_expr(item, exec_sub))
703                .collect::<Result<Vec<_>>>()?;
704            Ok(Expr::InList {
705                expr: Box::new(inner),
706                list: items,
707                negated: *negated,
708            })
709        }
710        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
711            left: Box::new(materialize_expr(left, exec_sub)?),
712            op: *op,
713            right: Box::new(materialize_expr(right, exec_sub)?),
714        }),
715        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
716            op: *op,
717            expr: Box::new(materialize_expr(e, exec_sub)?),
718        }),
719        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
720        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
721        Expr::InSet {
722            expr: e,
723            values,
724            has_null,
725            negated,
726        } => Ok(Expr::InSet {
727            expr: Box::new(materialize_expr(e, exec_sub)?),
728            values: values.clone(),
729            has_null: *has_null,
730            negated: *negated,
731        }),
732        Expr::Between {
733            expr: e,
734            low,
735            high,
736            negated,
737        } => Ok(Expr::Between {
738            expr: Box::new(materialize_expr(e, exec_sub)?),
739            low: Box::new(materialize_expr(low, exec_sub)?),
740            high: Box::new(materialize_expr(high, exec_sub)?),
741            negated: *negated,
742        }),
743        Expr::Like {
744            expr: e,
745            pattern,
746            escape,
747            negated,
748        } => {
749            let esc = escape
750                .as_ref()
751                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
752                .transpose()?;
753            Ok(Expr::Like {
754                expr: Box::new(materialize_expr(e, exec_sub)?),
755                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
756                escape: esc,
757                negated: *negated,
758            })
759        }
760        Expr::Case {
761            operand,
762            conditions,
763            else_result,
764        } => {
765            let op = operand
766                .as_ref()
767                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
768                .transpose()?;
769            let conds = conditions
770                .iter()
771                .map(|(c, r)| {
772                    Ok((
773                        materialize_expr(c, exec_sub)?,
774                        materialize_expr(r, exec_sub)?,
775                    ))
776                })
777                .collect::<Result<Vec<_>>>()?;
778            let else_r = else_result
779                .as_ref()
780                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
781                .transpose()?;
782            Ok(Expr::Case {
783                operand: op,
784                conditions: conds,
785                else_result: else_r,
786            })
787        }
788        Expr::Coalesce(args) => {
789            let materialized = args
790                .iter()
791                .map(|a| materialize_expr(a, exec_sub))
792                .collect::<Result<Vec<_>>>()?;
793            Ok(Expr::Coalesce(materialized))
794        }
795        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
796            expr: Box::new(materialize_expr(e, exec_sub)?),
797            data_type: *data_type,
798        }),
799        Expr::Function {
800            name,
801            args,
802            distinct,
803        } => {
804            let materialized = args
805                .iter()
806                .map(|a| materialize_expr(a, exec_sub))
807                .collect::<Result<Vec<_>>>()?;
808            Ok(Expr::Function {
809                name: name.clone(),
810                args: materialized,
811                distinct: *distinct,
812            })
813        }
814        other => Ok(other.clone()),
815    }
816}
817
818pub(super) fn materialize_stmt(
819    stmt: &SelectStmt,
820    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
821) -> Result<SelectStmt> {
822    let where_clause = stmt
823        .where_clause
824        .as_ref()
825        .map(|e| materialize_expr(e, exec_sub))
826        .transpose()?;
827    let having = stmt
828        .having
829        .as_ref()
830        .map(|e| materialize_expr(e, exec_sub))
831        .transpose()?;
832    let columns = stmt
833        .columns
834        .iter()
835        .map(|c| match c {
836            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
837            SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
838            SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
839            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
840                expr: materialize_expr(expr, exec_sub)?,
841                alias: alias.clone(),
842            }),
843        })
844        .collect::<Result<Vec<_>>>()?;
845    let order_by = stmt
846        .order_by
847        .iter()
848        .map(|ob| {
849            Ok(OrderByItem {
850                expr: materialize_expr(&ob.expr, exec_sub)?,
851                descending: ob.descending,
852                nulls_first: ob.nulls_first,
853            })
854        })
855        .collect::<Result<Vec<_>>>()?;
856    let joins = stmt
857        .joins
858        .iter()
859        .map(|j| {
860            let on_clause = j
861                .on_clause
862                .as_ref()
863                .map(|e| materialize_expr(e, exec_sub))
864                .transpose()?;
865            Ok(JoinClause {
866                join_type: j.join_type,
867                table: j.table.clone(),
868                subquery: j.subquery.clone(),
869                on_clause,
870            })
871        })
872        .collect::<Result<Vec<_>>>()?;
873    let group_by = stmt
874        .group_by
875        .iter()
876        .map(|e| materialize_expr(e, exec_sub))
877        .collect::<Result<Vec<_>>>()?;
878    Ok(SelectStmt {
879        columns,
880        from: stmt.from.clone(),
881        from_alias: stmt.from_alias.clone(),
882        from_subquery: stmt.from_subquery.clone(),
883        from_args: stmt.from_args.clone(),
884        from_json_table: stmt.from_json_table.clone(),
885        joins,
886        distinct: stmt.distinct,
887        where_clause,
888        order_by,
889        limit: stmt.limit.clone(),
890        offset: stmt.offset.clone(),
891        group_by,
892        having,
893    })
894}
895
896pub(super) fn exec_subquery_read(
897    db: &Database,
898    schema: &SchemaManager,
899    stmt: &SelectStmt,
900    ctes: &CteContext,
901) -> Result<QueryResult> {
902    let mut rtx = db.begin_read();
903    exec_subquery_with_read(&mut rtx, schema, stmt, ctes)
904}
905
906pub(super) fn exec_subquery_with_read(
907    rtx: &mut ReadTxn<'_>,
908    schema: &SchemaManager,
909    stmt: &SelectStmt,
910    ctes: &CteContext,
911) -> Result<QueryResult> {
912    match super::exec_select_with_read(rtx, schema, stmt, ctes)? {
913        ExecutionResult::Query(qr) => Ok(qr),
914        _ => Ok(QueryResult {
915            columns: vec![],
916            rows: vec![],
917        }),
918    }
919}
920
921pub(super) fn exec_subquery_write(
922    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
923    schema: &SchemaManager,
924    stmt: &SelectStmt,
925    ctes: &CteContext,
926) -> Result<QueryResult> {
927    match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
928        ExecutionResult::Query(qr) => Ok(qr),
929        _ => Ok(QueryResult {
930            columns: vec![],
931            rows: vec![],
932        }),
933    }
934}
935
936pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
937    stmt.where_clause.as_ref().is_some_and(has_subquery)
938        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
939}
940
941pub(super) fn materialize_update(
942    stmt: &UpdateStmt,
943    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
944) -> Result<UpdateStmt> {
945    let where_clause = stmt
946        .where_clause
947        .as_ref()
948        .map(|e| materialize_expr(e, exec_sub))
949        .transpose()?;
950    let assignments = stmt
951        .assignments
952        .iter()
953        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
954        .collect::<Result<Vec<_>>>()?;
955    Ok(UpdateStmt {
956        table: stmt.table.clone(),
957        assignments,
958        where_clause,
959        returning: stmt.returning.clone(),
960    })
961}
962
963pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
964    stmt.where_clause.as_ref().is_some_and(has_subquery)
965}
966
967pub(super) fn materialize_delete(
968    stmt: &DeleteStmt,
969    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
970) -> Result<DeleteStmt> {
971    let where_clause = stmt
972        .where_clause
973        .as_ref()
974        .map(|e| materialize_expr(e, exec_sub))
975        .transpose()?;
976    Ok(DeleteStmt {
977        table: stmt.table.clone(),
978        where_clause,
979        returning: stmt.returning.clone(),
980    })
981}
982
983pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
984    match &stmt.source {
985        InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
986        // SELECT source subqueries are handled by exec_select's correlated/non-correlated paths
987        InsertSource::Select(_) => false,
988    }
989}
990
991pub(super) fn materialize_insert(
992    stmt: &InsertStmt,
993    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
994) -> Result<InsertStmt> {
995    let source = match &stmt.source {
996        InsertSource::Values(rows) => {
997            let mat = rows
998                .iter()
999                .map(|row| {
1000                    row.iter()
1001                        .map(|e| materialize_expr(e, exec_sub))
1002                        .collect::<Result<Vec<_>>>()
1003                })
1004                .collect::<Result<Vec<_>>>()?;
1005            InsertSource::Values(mat)
1006        }
1007        InsertSource::Select(sq) => {
1008            let ctes = sq
1009                .ctes
1010                .iter()
1011                .map(|c| {
1012                    Ok(CteDefinition {
1013                        name: c.name.clone(),
1014                        column_aliases: c.column_aliases.clone(),
1015                        body: materialize_query_body(&c.body, exec_sub)?,
1016                    })
1017                })
1018                .collect::<Result<Vec<_>>>()?;
1019            let body = materialize_query_body(&sq.body, exec_sub)?;
1020            InsertSource::Select(Box::new(SelectQuery {
1021                ctes,
1022                recursive: sq.recursive,
1023                body,
1024            }))
1025        }
1026    };
1027    Ok(InsertStmt {
1028        table: stmt.table.clone(),
1029        columns: stmt.columns.clone(),
1030        source,
1031        on_conflict: stmt.on_conflict.clone(),
1032        returning: stmt.returning.clone(),
1033    })
1034}
1035
1036pub(super) fn materialize_query_body(
1037    body: &QueryBody,
1038    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1039) -> Result<QueryBody> {
1040    match body {
1041        QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
1042            sel, exec_sub,
1043        )?))),
1044        QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
1045            op: comp.op.clone(),
1046            all: comp.all,
1047            left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
1048            right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
1049            order_by: comp.order_by.clone(),
1050            limit: comp.limit.clone(),
1051            offset: comp.offset.clone(),
1052        }))),
1053        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
1054    }
1055}
1056
1057pub(super) fn exec_query_body_with_read(
1058    rtx: &mut ReadTxn<'_>,
1059    schema: &SchemaManager,
1060    body: &QueryBody,
1061    ctes: &CteContext,
1062) -> Result<ExecutionResult> {
1063    match body {
1064        QueryBody::Select(sel) => super::exec_select_with_read(rtx, schema, sel, ctes),
1065        QueryBody::Compound(comp) => exec_compound_select_with_read(rtx, schema, comp, ctes),
1066        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1067            SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1068        ),
1069    }
1070}
1071
1072pub(super) fn exec_query_body_in_txn(
1073    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1074    schema: &SchemaManager,
1075    body: &QueryBody,
1076    ctes: &CteContext,
1077) -> Result<ExecutionResult> {
1078    match body {
1079        QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1080        QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1081        QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1082        QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1083        QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1084    }
1085}
1086
1087pub(super) fn exec_query_body_read(
1088    db: &Database,
1089    schema: &SchemaManager,
1090    body: &QueryBody,
1091    ctes: &CteContext,
1092) -> Result<QueryResult> {
1093    let mut rtx = db.begin_read();
1094    exec_query_body_with_read_qr(&mut rtx, schema, body, ctes)
1095}
1096
1097pub(super) fn exec_query_body_with_read_qr(
1098    rtx: &mut ReadTxn<'_>,
1099    schema: &SchemaManager,
1100    body: &QueryBody,
1101    ctes: &CteContext,
1102) -> Result<QueryResult> {
1103    match exec_query_body_with_read(rtx, schema, body, ctes)? {
1104        ExecutionResult::Query(qr) => Ok(qr),
1105        _ => Ok(QueryResult {
1106            columns: vec![],
1107            rows: vec![],
1108        }),
1109    }
1110}
1111
1112pub(super) fn exec_query_body_write(
1113    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1114    schema: &SchemaManager,
1115    body: &QueryBody,
1116    ctes: &CteContext,
1117) -> Result<QueryResult> {
1118    match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1119        ExecutionResult::Query(qr) => Ok(qr),
1120        _ => Ok(QueryResult {
1121            columns: vec![],
1122            rows: vec![],
1123        }),
1124    }
1125}
1126
1127pub(super) fn exec_compound_select_with_read(
1128    rtx: &mut ReadTxn<'_>,
1129    schema: &SchemaManager,
1130    comp: &CompoundSelect,
1131    ctes: &CteContext,
1132) -> Result<ExecutionResult> {
1133    let left_qr = match exec_query_body_with_read(rtx, schema, &comp.left, ctes)? {
1134        ExecutionResult::Query(qr) => qr,
1135        _ => QueryResult {
1136            columns: vec![],
1137            rows: vec![],
1138        },
1139    };
1140    let right_qr = match exec_query_body_with_read(rtx, schema, &comp.right, ctes)? {
1141        ExecutionResult::Query(qr) => qr,
1142        _ => QueryResult {
1143            columns: vec![],
1144            rows: vec![],
1145        },
1146    };
1147    apply_set_operation(comp, left_qr, right_qr)
1148}
1149
1150pub(super) fn exec_compound_select_in_txn(
1151    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1152    schema: &SchemaManager,
1153    comp: &CompoundSelect,
1154    ctes: &CteContext,
1155) -> Result<ExecutionResult> {
1156    let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1157        ExecutionResult::Query(qr) => qr,
1158        _ => QueryResult {
1159            columns: vec![],
1160            rows: vec![],
1161        },
1162    };
1163    let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1164        ExecutionResult::Query(qr) => qr,
1165        _ => QueryResult {
1166            columns: vec![],
1167            rows: vec![],
1168        },
1169    };
1170    apply_set_operation(comp, left_qr, right_qr)
1171}
1172
1173pub(super) fn apply_set_operation(
1174    comp: &CompoundSelect,
1175    left_qr: QueryResult,
1176    right_qr: QueryResult,
1177) -> Result<ExecutionResult> {
1178    if !left_qr.columns.is_empty()
1179        && !right_qr.columns.is_empty()
1180        && left_qr.columns.len() != right_qr.columns.len()
1181    {
1182        return Err(SqlError::CompoundColumnCountMismatch {
1183            left: left_qr.columns.len(),
1184            right: right_qr.columns.len(),
1185        });
1186    }
1187
1188    let columns = left_qr.columns;
1189
1190    let mut rows = match (&comp.op, comp.all) {
1191        (SetOp::Union, true) => {
1192            let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1193            let mut rows = Vec::with_capacity(total);
1194            rows.extend(left_qr.rows);
1195            rows.extend(right_qr.rows);
1196            rows
1197        }
1198        (SetOp::Union, false) => {
1199            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1200            let mut rows = Vec::new();
1201            for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1202                if !seen.contains(&row) {
1203                    seen.insert(row.clone());
1204                    rows.push(row);
1205                }
1206            }
1207            rows
1208        }
1209        (SetOp::Intersect, true) => {
1210            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1211            for row in &right_qr.rows {
1212                *right_counts.entry(row.clone()).or_insert(0) += 1;
1213            }
1214            let mut rows = Vec::new();
1215            for row in left_qr.rows {
1216                if let Some(count) = right_counts.get_mut(&row) {
1217                    if *count > 0 {
1218                        *count -= 1;
1219                        rows.push(row);
1220                    }
1221                }
1222            }
1223            rows
1224        }
1225        (SetOp::Intersect, false) => {
1226            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1227            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1228            let mut rows = Vec::new();
1229            for row in left_qr.rows {
1230                if right_set.contains(&row) && !seen.contains(&row) {
1231                    seen.insert(row.clone());
1232                    rows.push(row);
1233                }
1234            }
1235            rows
1236        }
1237        (SetOp::Except, true) => {
1238            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1239            for row in &right_qr.rows {
1240                *right_counts.entry(row.clone()).or_insert(0) += 1;
1241            }
1242            let mut rows = Vec::new();
1243            for row in left_qr.rows {
1244                if let Some(count) = right_counts.get_mut(&row) {
1245                    if *count > 0 {
1246                        *count -= 1;
1247                        continue;
1248                    }
1249                }
1250                rows.push(row);
1251            }
1252            rows
1253        }
1254        (SetOp::Except, false) => {
1255            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1256            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1257            let mut rows = Vec::new();
1258            for row in left_qr.rows {
1259                if !right_set.contains(&row) && !seen.contains(&row) {
1260                    seen.insert(row.clone());
1261                    rows.push(row);
1262                }
1263            }
1264            rows
1265        }
1266    };
1267
1268    if !comp.order_by.is_empty() {
1269        let col_defs: Vec<crate::types::ColumnDef> = columns
1270            .iter()
1271            .enumerate()
1272            .map(|(i, name)| crate::types::ColumnDef {
1273                name: name.clone(),
1274                data_type: crate::types::DataType::Null,
1275                nullable: true,
1276                position: i as u16,
1277                default_expr: None,
1278                default_sql: None,
1279                check_expr: None,
1280                check_sql: None,
1281                check_name: None,
1282                is_with_timezone: false,
1283                generated_expr: None,
1284                generated_sql: None,
1285                generated_kind: None,
1286                collation: crate::types::Collation::Binary,
1287            })
1288            .collect();
1289        sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1290    }
1291
1292    if let Some(ref offset_expr) = comp.offset {
1293        let offset = eval_const_int(offset_expr)?.max(0) as usize;
1294        if offset < rows.len() {
1295            rows = rows.split_off(offset);
1296        } else {
1297            rows.clear();
1298        }
1299    }
1300
1301    if let Some(ref limit_expr) = comp.limit {
1302        let limit = eval_const_int(limit_expr)?.max(0) as usize;
1303        rows.truncate(limit);
1304    }
1305
1306    Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1307}
1308
/// Scratch buffers used while executing an INSERT.
///
/// An instance is kept per thread in `INSERT_SCRATCH` and handed out through
/// `with_insert_scratch`, so the vectors can be reused between statements.
struct InsertBufs {
    /// The row being assembled; resized to one slot per table column.
    row: Vec<Value>,
    /// Resized to one slot per primary-key column.
    pk_values: Vec<Value>,
    /// Resized to the table's physical non-primary-key column count.
    value_values: Vec<Value>,
    /// Byte buffer, presumably for the encoded key (usage not shown here).
    key_buf: Vec<u8>,
    /// Byte buffer, presumably for the encoded row value (usage not shown here).
    value_buf: Vec<u8>,
    /// Table column index for each column named by the INSERT.
    col_indices: Vec<usize>,
    /// Byte buffer, presumably for foreign-key lookups (usage not shown here).
    fk_key_buf: Vec<u8>,
}
1318
1319impl InsertBufs {
1320    fn new() -> Self {
1321        Self {
1322            row: Vec::new(),
1323            pk_values: Vec::new(),
1324            value_values: Vec::new(),
1325            key_buf: Vec::with_capacity(64),
1326            value_buf: Vec::with_capacity(256),
1327            col_indices: Vec::new(),
1328            fk_key_buf: Vec::with_capacity(64),
1329        }
1330    }
1331}
1332
thread_local! {
    // Per-thread INSERT scratch buffers; borrowed through `with_insert_scratch`.
    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
    // Per-thread upsert scratch buffers. NOTE(review): the code that borrows
    // this slot is outside this section of the file.
    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
}
1337
1338fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1339    INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1340        Ok(mut borrowed) => f(&mut borrowed),
1341        Err(_) => {
1342            let mut local = InsertBufs::new();
1343            f(&mut local)
1344        }
1345    })
1346}
1347
/// Scratch buffers for upsert handling; one instance is kept per thread in
/// `UPSERT_SCRATCH`.
///
/// NOTE(review): the code that fills these buffers is outside this section,
/// so the field notes below are inferred from names and types only.
pub(super) struct UpsertBufs {
    /// Presumably the existing row that conflicted — TODO confirm.
    old_row: Vec<Value>,
    /// Presumably the row after applying the update — TODO confirm.
    new_row: Vec<Value>,
    /// Presumably the non-key values staged for encoding — TODO confirm.
    value_values: Vec<Value>,
    /// Byte buffer, presumably for the re-encoded row value — TODO confirm.
    new_value_buf: Vec<u8>,
}
1354
1355impl UpsertBufs {
1356    pub(super) fn new() -> Self {
1357        Self {
1358            old_row: Vec::new(),
1359            new_row: Vec::new(),
1360            value_values: Vec::new(),
1361            new_value_buf: Vec::with_capacity(256),
1362        }
1363    }
1364}
1365
1366pub fn exec_insert_in_txn(
1367    wtx: &mut WriteTxn<'_>,
1368    schema: &SchemaManager,
1369    stmt: &InsertStmt,
1370    params: &[Value],
1371) -> Result<ExecutionResult> {
1372    with_insert_scratch(|bufs| {
1373        exec_insert_in_txn_impl(
1374            wtx,
1375            schema,
1376            stmt,
1377            params,
1378            bufs,
1379            None,
1380            &CteContext::default(),
1381        )
1382    })
1383}
1384
1385pub(super) fn exec_insert_in_txn_with_ctes(
1386    wtx: &mut WriteTxn<'_>,
1387    schema: &SchemaManager,
1388    stmt: &InsertStmt,
1389    params: &[Value],
1390    outer_ctes: &CteContext,
1391) -> Result<ExecutionResult> {
1392    with_insert_scratch(|bufs| {
1393        exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1394    })
1395}
1396
1397fn exec_insert_in_txn_cached(
1398    wtx: &mut WriteTxn<'_>,
1399    schema: &SchemaManager,
1400    stmt: &InsertStmt,
1401    params: &[Value],
1402    cache: &InsertCache,
1403) -> Result<ExecutionResult> {
1404    with_insert_scratch(|bufs| {
1405        exec_insert_in_txn_impl(
1406            wtx,
1407            schema,
1408            stmt,
1409            params,
1410            bufs,
1411            Some(cache),
1412            &CteContext::default(),
1413        )
1414    })
1415}
1416
1417fn exec_insert_in_txn_impl(
1418    wtx: &mut WriteTxn<'_>,
1419    schema: &SchemaManager,
1420    stmt: &InsertStmt,
1421    params: &[Value],
1422    bufs: &mut InsertBufs,
1423    cache: Option<&InsertCache>,
1424    outer_ctes: &CteContext,
1425) -> Result<ExecutionResult> {
1426    let empty_ctes = CteContext::default();
1427    let materialized;
1428    let has_sub = match cache {
1429        Some(c) => c.has_subquery,
1430        None => insert_has_subquery(stmt),
1431    };
1432    let stmt = if has_sub {
1433        materialized = materialize_insert(stmt, &mut |sub| {
1434            exec_subquery_write(wtx, schema, sub, &empty_ctes)
1435        })?;
1436        &materialized
1437    } else {
1438        stmt
1439    };
1440
1441    let view_lookup_key = stmt.table.to_ascii_lowercase();
1442    if let Some(view_def) = schema.get_view(&view_lookup_key) {
1443        if super::triggers::has_instead_of(
1444            schema,
1445            &view_lookup_key,
1446            super::triggers::FireEvent::Insert,
1447        ) {
1448            let aliases = view_def.column_aliases.clone();
1449            return exec_instead_of_view_insert_in_txn(
1450                wtx,
1451                schema,
1452                &view_lookup_key,
1453                &aliases,
1454                stmt,
1455                params,
1456            );
1457        }
1458        return Err(SqlError::CannotModifyView(stmt.table.clone()));
1459    }
1460
1461    let table_schema = schema
1462        .get(&stmt.table)
1463        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1464    if table_schema.has_ann_index() {
1465        super::ann_persist::purge_segment(wtx, &table_schema.name)?;
1466    }
1467
1468    let default_columns;
1469    let insert_columns: &[String] = if stmt.columns.is_empty() {
1470        default_columns = table_schema
1471            .columns
1472            .iter()
1473            .map(|c| c.name.clone())
1474            .collect::<Vec<_>>();
1475        &default_columns
1476    } else {
1477        &stmt.columns
1478    };
1479
1480    bufs.col_indices.clear();
1481    if let Some(c) = cache {
1482        bufs.col_indices.extend_from_slice(&c.col_indices);
1483    } else {
1484        for name in insert_columns {
1485            bufs.col_indices.push(
1486                table_schema
1487                    .column_index(name)
1488                    .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1489            );
1490        }
1491    }
1492
1493    if cache.is_none() {
1494        for &ci in &bufs.col_indices {
1495            if table_schema.columns[ci].generated_kind.is_some() {
1496                return Err(SqlError::CannotInsertIntoGeneratedColumn(
1497                    table_schema.columns[ci].name.clone(),
1498                ));
1499            }
1500        }
1501    }
1502
1503    let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1504    let cached_gen_positions: &[usize];
1505    let cached_gen_fast_evals: &[FastGenEval];
1506    if let Some(c) = cache {
1507        cached_gen_positions = &c.generated_col_positions;
1508        cached_gen_fast_evals = &c.generated_fast_evals;
1509        generated_cols_uncached = Vec::new();
1510    } else {
1511        cached_gen_positions = &[];
1512        cached_gen_fast_evals = &[];
1513        generated_cols_uncached = table_schema
1514            .columns
1515            .iter()
1516            .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1517            .map(|c| {
1518                let expr = c.generated_expr.as_ref().unwrap();
1519                let fe = detect_fast_gen_eval(expr, table_schema);
1520                (c.position as usize, expr, fe)
1521            })
1522            .collect();
1523    }
1524    let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1525    let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1526        None
1527    } else {
1528        Some(ColumnMap::new(&table_schema.columns))
1529    };
1530    let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1531        None
1532    } else if let Some(c) = cache {
1533        c.row_col_map.as_ref()
1534    } else {
1535        row_col_map_for_gen_owned.as_ref()
1536    };
1537
1538    let any_defaults = match cache {
1539        Some(c) => c.any_defaults,
1540        None => table_schema
1541            .columns
1542            .iter()
1543            .any(|c| c.default_expr.is_some()),
1544    };
1545    let defaults: Vec<(usize, &Expr)> = if any_defaults {
1546        table_schema
1547            .columns
1548            .iter()
1549            .filter(|c| {
1550                c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1551            })
1552            .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1553            .collect()
1554    } else {
1555        Vec::new()
1556    };
1557
1558    let has_checks = match cache {
1559        Some(c) => c.has_checks,
1560        None => table_schema.has_checks(),
1561    };
1562    let check_col_map = if has_checks {
1563        Some(ColumnMap::new(&table_schema.columns))
1564    } else {
1565        None
1566    };
1567
1568    let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1569        &[usize],
1570        &[usize],
1571        &[u16],
1572        usize,
1573        &[u16],
1574    ) = if let Some(c) = cache {
1575        (
1576            &c.pk_indices,
1577            &c.non_pk_indices,
1578            &c.encoding_positions,
1579            c.phys_count,
1580            &c.dropped_non_pk_slots,
1581        )
1582    } else {
1583        (
1584            table_schema.pk_indices(),
1585            table_schema.non_pk_indices(),
1586            table_schema.encoding_positions(),
1587            table_schema.physical_non_pk_count(),
1588            table_schema.dropped_non_pk_slots(),
1589        )
1590    };
1591
1592    bufs.row.resize(table_schema.columns.len(), Value::Null);
1593    bufs.pk_values.resize(pk_indices.len(), Value::Null);
1594    bufs.value_values.resize(phys_count, Value::Null);
1595
1596    let table_bytes = table_schema.name.as_bytes();
1597    let has_fks = !table_schema.foreign_keys.is_empty();
1598    let has_indices = !table_schema.indices.is_empty();
1599    let has_defaults = !defaults.is_empty();
1600
1601    let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1602        (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1603        (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1604        (_, None) => None,
1605    };
1606
1607    let row_col_map_owned: Option<ColumnMap> =
1608        if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1609            Some(ColumnMap::new(&table_schema.columns))
1610        } else {
1611            None
1612        };
1613    let row_col_map: Option<&ColumnMap> = cache
1614        .and_then(|c| c.row_col_map.as_ref())
1615        .or(row_col_map_owned.as_ref());
1616
1617    let select_rows = match &stmt.source {
1618        InsertSource::Select(sq) => {
1619            let insert_ctes = super::materialize_all_ctes_with_outer(
1620                &sq.ctes,
1621                sq.recursive,
1622                outer_ctes,
1623                &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1624            )?;
1625            let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1626            Some(qr.rows)
1627        }
1628        InsertSource::Values(_) => None,
1629    };
1630
1631    let mut count: u64 = 0;
1632    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1633        stmt.returning.as_ref().map(|_| Vec::new());
1634
1635    let plain_insert = compiled_conflict.is_none();
1636    let single_int_pk = is_single_int_pk(table_schema);
1637    let mut min_inserted_pk: Option<i64> = None;
1638
1639    let values = match &stmt.source {
1640        InsertSource::Values(rows) => Some(rows.as_slice()),
1641        InsertSource::Select(_) => None,
1642    };
1643    let sel_rows = select_rows.as_deref();
1644
1645    let total = match (values, sel_rows) {
1646        (Some(rows), _) => rows.len(),
1647        (_, Some(rows)) => rows.len(),
1648        _ => 0,
1649    };
1650
1651    if let Some(sel) = sel_rows {
1652        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1653            return Err(SqlError::InvalidValue(format!(
1654                "INSERT ... SELECT column count mismatch: expected {}, got {}",
1655                insert_columns.len(),
1656                sel[0].len()
1657            )));
1658        }
1659    }
1660
1661    let has_insert_statement_triggers_impl =
1662        schema.triggers_for(&table_schema.name).iter().any(|t| {
1663            t.enabled
1664                && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1665                && t.events
1666                    .iter()
1667                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1668        });
1669    let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1670        Vec::with_capacity(total)
1671    } else {
1672        Vec::new()
1673    };
1674    if has_insert_statement_triggers_impl {
1675        super::triggers::fire_statement_triggers(
1676            wtx,
1677            schema,
1678            &table_schema.name,
1679            crate::parser::TriggerTiming::Before,
1680            super::triggers::FireEvent::Insert,
1681            &table_schema.columns,
1682            &[],
1683            &[],
1684        )?;
1685    }
1686
1687    let (has_before_insert_triggers, has_after_insert_triggers, has_after_update_triggers) =
1688        row_insert_trigger_flags(schema, &table_schema.name);
1689
1690    let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1691    for idx in 0..total {
1692        if !skip_row_clear {
1693            for v in bufs.row.iter_mut() {
1694                *v = Value::Null;
1695            }
1696        }
1697
1698        if let Some(value_rows) = values {
1699            if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1700                for action in plan {
1701                    match action {
1702                        BindAction::Param {
1703                            param_idx,
1704                            col_idx,
1705                            target,
1706                        } => {
1707                            let v = &params[*param_idx];
1708                            bufs.row[*col_idx] = if v.is_null() {
1709                                Value::Null
1710                            } else if v.data_type() == *target {
1711                                v.clone()
1712                            } else {
1713                                let got = v.data_type();
1714                                v.clone().coerce_into(*target).ok_or_else(|| {
1715                                    SqlError::TypeMismatch {
1716                                        expected: target.to_string(),
1717                                        got: got.to_string(),
1718                                    }
1719                                })?
1720                            };
1721                        }
1722                        BindAction::Literal { value, col_idx } => {
1723                            bufs.row[*col_idx] = value.clone();
1724                        }
1725                    }
1726                }
1727            } else {
1728                let value_row = &value_rows[idx];
1729                if value_row.len() != insert_columns.len() {
1730                    return Err(SqlError::InvalidValue(format!(
1731                        "expected {} values, got {}",
1732                        insert_columns.len(),
1733                        value_row.len()
1734                    )));
1735                }
1736                for (i, expr) in value_row.iter().enumerate() {
1737                    let val = match expr {
1738                        Expr::Parameter(n) => params
1739                            .get(n - 1)
1740                            .cloned()
1741                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1742                        Expr::Literal(v) => v.clone(),
1743                        _ => eval_const_expr(expr)?,
1744                    };
1745                    let col_idx = bufs.col_indices[i];
1746                    let col = &table_schema.columns[col_idx];
1747                    let got_type = val.data_type();
1748                    bufs.row[col_idx] = if val.is_null() {
1749                        Value::Null
1750                    } else {
1751                        val.coerce_into(col.data_type)
1752                            .ok_or_else(|| SqlError::TypeMismatch {
1753                                expected: col.data_type.to_string(),
1754                                got: got_type.to_string(),
1755                            })?
1756                    };
1757                }
1758            }
1759        } else if let Some(sel) = sel_rows {
1760            let sel_row = &sel[idx];
1761            for (i, val) in sel_row.iter().enumerate() {
1762                let col_idx = bufs.col_indices[i];
1763                let col = &table_schema.columns[col_idx];
1764                let got_type = val.data_type();
1765                bufs.row[col_idx] = if val.is_null() {
1766                    Value::Null
1767                } else {
1768                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
1769                        SqlError::TypeMismatch {
1770                            expected: col.data_type.to_string(),
1771                            got: got_type.to_string(),
1772                        }
1773                    })?
1774                };
1775            }
1776        }
1777
1778        if has_defaults {
1779            for &(pos, def_expr) in &defaults {
1780                let val = eval_const_expr(def_expr)?;
1781                let col = &table_schema.columns[pos];
1782                if !val.is_null() {
1783                    let got_type = val.data_type();
1784                    bufs.row[pos] =
1785                        val.coerce_into(col.data_type)
1786                            .ok_or_else(|| SqlError::TypeMismatch {
1787                                expected: col.data_type.to_string(),
1788                                got: got_type.to_string(),
1789                            })?;
1790                }
1791            }
1792        }
1793
1794        if let Some(gen_map) = row_col_map_for_gen {
1795            if cache.is_some() {
1796                for (pos, fast) in cached_gen_positions
1797                    .iter()
1798                    .copied()
1799                    .zip(cached_gen_fast_evals.iter())
1800                {
1801                    let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1802                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1803                    let col = &table_schema.columns[pos];
1804                    bufs.row[pos] = if val.is_null() {
1805                        Value::Null
1806                    } else {
1807                        let got_type = val.data_type();
1808                        val.coerce_into(col.data_type)
1809                            .ok_or_else(|| SqlError::TypeMismatch {
1810                                expected: col.data_type.to_string(),
1811                                got: got_type.to_string(),
1812                            })?
1813                    };
1814                }
1815            } else {
1816                for (pos, gen_expr, fast) in &generated_cols_uncached {
1817                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1818                    let col = &table_schema.columns[*pos];
1819                    bufs.row[*pos] = if val.is_null() {
1820                        Value::Null
1821                    } else {
1822                        let got_type = val.data_type();
1823                        val.coerce_into(col.data_type)
1824                            .ok_or_else(|| SqlError::TypeMismatch {
1825                                expected: col.data_type.to_string(),
1826                                got: got_type.to_string(),
1827                            })?
1828                    };
1829                }
1830            }
1831        }
1832
1833        if let Some(c) = cache {
1834            for &pos in &c.not_null_indices {
1835                if bufs.row[pos as usize].is_null() {
1836                    return Err(SqlError::NotNullViolation(
1837                        table_schema.columns[pos as usize].name.clone(),
1838                    ));
1839                }
1840            }
1841        } else {
1842            for col in &table_schema.columns {
1843                if !col.nullable && bufs.row[col.position as usize].is_null() {
1844                    return Err(SqlError::NotNullViolation(col.name.clone()));
1845                }
1846            }
1847        }
1848
1849        if let Some(ref col_map) = check_col_map {
1850            for col in &table_schema.columns {
1851                if let Some(ref check) = col.check_expr {
1852                    let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1853                    if !is_truthy(&result) && !result.is_null() {
1854                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1855                        return Err(SqlError::CheckViolation(name.to_string()));
1856                    }
1857                }
1858            }
1859            for tc in &table_schema.check_constraints {
1860                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1861                if !is_truthy(&result) && !result.is_null() {
1862                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1863                    return Err(SqlError::CheckViolation(name.to_string()));
1864                }
1865            }
1866        }
1867
1868        if has_fks {
1869            for fk in &table_schema.foreign_keys {
1870                let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1871                if any_null {
1872                    continue;
1873                }
1874                crate::encoding::encode_composite_key_from_indices(
1875                    &fk.columns,
1876                    &bufs.row,
1877                    &mut bufs.fk_key_buf,
1878                );
1879                if fk.deferrable && fk.initially_deferred {
1880                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1881                    wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1882                        fk_name: name,
1883                        foreign_table: fk.foreign_table.as_bytes().to_vec(),
1884                        parent_key: bufs.fk_key_buf.clone(),
1885                    });
1886                    continue;
1887                }
1888                if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1889                    let found = wtx
1890                        .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1891                        .map_err(SqlError::Storage)?;
1892                    if found.is_none() {
1893                        let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1894                        return Err(SqlError::ForeignKeyViolation(name.to_string()));
1895                    }
1896                    wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1897                }
1898            }
1899        }
1900
1901        let proposed_row_for_returning: Option<Vec<Value>> =
1902            returning_rows.as_ref().map(|_| bufs.row.clone());
1903        let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1904            Some(bufs.row.clone())
1905        } else {
1906            None
1907        };
1908
1909        if has_before_insert_triggers {
1910            super::triggers::fire_row_triggers(
1911                wtx,
1912                schema,
1913                &table_schema.name,
1914                crate::parser::TriggerTiming::Before,
1915                super::triggers::FireEvent::Insert,
1916                None,
1917                Some(bufs.row.clone()),
1918                &table_schema.columns,
1919            )?;
1920        }
1921
1922        for (j, &i) in pk_indices.iter().enumerate() {
1923            bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1924        }
1925        match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1926            true => match bufs.pk_values[0] {
1927                Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1928                _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1929            },
1930            false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1931        }
1932        if plain_insert && single_int_pk {
1933            if let Value::Integer(id) = &bufs.pk_values[0] {
1934                min_inserted_pk = Some(min_inserted_pk.map_or(*id, |m| m.min(*id)));
1935            }
1936        }
1937
1938        for &slot in dropped {
1939            bufs.value_values[slot as usize] = Value::Null;
1940        }
1941        for (j, &i) in non_pk.iter().enumerate() {
1942            let col = &table_schema.columns[i];
1943            if matches!(
1944                col.generated_kind,
1945                Some(crate::parser::GeneratedKind::Virtual)
1946            ) {
1947                bufs.value_values[enc_pos[j] as usize] = Value::Null;
1948                bufs.row[i] = Value::Null;
1949            } else {
1950                bufs.value_values[enc_pos[j] as usize] =
1951                    std::mem::replace(&mut bufs.row[i], Value::Null);
1952            }
1953        }
1954        match cache.and_then(|c| c.row_encoder.as_ref()) {
1955            Some(tmpl) => crate::encoding::encode_row_with_template(
1956                tmpl,
1957                &bufs.value_values,
1958                &mut bufs.value_buf,
1959            )?,
1960            None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1961        }
1962
1963        if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1964            return Err(SqlError::KeyTooLarge {
1965                size: bufs.key_buf.len(),
1966                max: citadel_core::MAX_KEY_SIZE,
1967            });
1968        }
1969        if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1970            return Err(SqlError::RowTooLarge {
1971                size: bufs.value_buf.len(),
1972                max: citadel_core::MAX_VALUE_SIZE,
1973            });
1974        }
1975
1976        match compiled_conflict.as_ref() {
1977            None => {
1978                let is_new = wtx
1979                    .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1980                    .map_err(SqlError::Storage)?;
1981                if !is_new {
1982                    return Err(SqlError::DuplicateKey);
1983                }
1984                if has_indices || has_after_insert_triggers {
1985                    for (j, &i) in pk_indices.iter().enumerate() {
1986                        bufs.row[i] = bufs.pk_values[j].clone();
1987                    }
1988                    for (j, &i) in non_pk.iter().enumerate() {
1989                        bufs.row[i] = std::mem::replace(
1990                            &mut bufs.value_values[enc_pos[j] as usize],
1991                            Value::Null,
1992                        );
1993                    }
1994                    if has_indices {
1995                        insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1996                    }
1997                    if has_after_insert_triggers {
1998                        super::triggers::fire_row_triggers(
1999                            wtx,
2000                            schema,
2001                            &table_schema.name,
2002                            crate::parser::TriggerTiming::After,
2003                            super::triggers::FireEvent::Insert,
2004                            None,
2005                            Some(bufs.row.clone()),
2006                            &table_schema.columns,
2007                        )?;
2008                    }
2009                }
2010                if let Some(r) = row_for_stmt_trigger_impl.clone() {
2011                    stmt_new_rows_impl.push(r);
2012                }
2013                count += 1;
2014                if let Some(buf) = returning_rows.as_mut() {
2015                    buf.push((None, proposed_row_for_returning));
2016                }
2017            }
2018            Some(oc) => {
2019                let oc_ref: &CompiledOnConflict = oc;
2020                let needs_row = upsert_needs_row(oc_ref, table_schema);
2021                if needs_row {
2022                    for (j, &i) in pk_indices.iter().enumerate() {
2023                        bufs.row[i] = bufs.pk_values[j].clone();
2024                    }
2025                    for (j, &i) in non_pk.iter().enumerate() {
2026                        bufs.row[i] = std::mem::replace(
2027                            &mut bufs.value_values[enc_pos[j] as usize],
2028                            Value::Null,
2029                        );
2030                    }
2031                }
2032                let outcome = apply_insert_with_conflict(
2033                    wtx,
2034                    table_schema,
2035                    &bufs.key_buf,
2036                    &bufs.value_buf,
2037                    &bufs.row,
2038                    &bufs.pk_values,
2039                    oc_ref,
2040                    row_col_map.unwrap(),
2041                    stmt.returning.is_some(),
2042                )?;
2043                match outcome {
2044                    InsertRowOutcome::Inserted => {
2045                        count += 1;
2046                        if let Some(buf) = returning_rows.as_mut() {
2047                            buf.push((None, proposed_row_for_returning));
2048                        }
2049                        if let Some(r) = row_for_stmt_trigger_impl.clone() {
2050                            stmt_new_rows_impl.push(r);
2051                        }
2052                        if has_after_insert_triggers {
2053                            super::triggers::fire_row_triggers(
2054                                wtx,
2055                                schema,
2056                                &table_schema.name,
2057                                crate::parser::TriggerTiming::After,
2058                                super::triggers::FireEvent::Insert,
2059                                None,
2060                                Some(bufs.row.clone()),
2061                                &table_schema.columns,
2062                            )?;
2063                        }
2064                    }
2065                    InsertRowOutcome::Updated { old, new } => {
2066                        count += 1;
2067                        if let Some(buf) = returning_rows.as_mut() {
2068                            buf.push((Some(old.clone()), Some(new.clone())));
2069                        }
2070                        if has_after_update_triggers {
2071                            let changed_cols: Vec<String> = match oc_ref {
2072                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2073                                    .iter()
2074                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2075                                    .collect(),
2076                                _ => Vec::new(),
2077                            };
2078                            super::triggers::fire_row_triggers(
2079                                wtx,
2080                                schema,
2081                                &table_schema.name,
2082                                crate::parser::TriggerTiming::After,
2083                                super::triggers::FireEvent::Update {
2084                                    changed_columns: &changed_cols,
2085                                },
2086                                Some(old),
2087                                Some(new),
2088                                &table_schema.columns,
2089                            )?;
2090                        }
2091                    }
2092                    InsertRowOutcome::Skipped => {}
2093                }
2094            }
2095        }
2096    }
2097
2098    mark_insert_dml(
2099        schema,
2100        &table_schema.name,
2101        !plain_insert,
2102        single_int_pk,
2103        min_inserted_pk,
2104        count,
2105    );
2106
2107    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2108        if has_insert_statement_triggers_impl {
2109            super::triggers::fire_statement_triggers(
2110                wtx,
2111                schema,
2112                &table_schema.name,
2113                crate::parser::TriggerTiming::After,
2114                super::triggers::FireEvent::Insert,
2115                &table_schema.columns,
2116                &[],
2117                &stmt_new_rows_impl,
2118            )?;
2119        }
2120        return Ok(ExecutionResult::Query(super::helpers::project_returning(
2121            table_schema,
2122            returning_cols,
2123            &rows,
2124        )?));
2125    }
2126
2127    if has_insert_statement_triggers_impl {
2128        super::triggers::fire_statement_triggers(
2129            wtx,
2130            schema,
2131            &table_schema.name,
2132            crate::parser::TriggerTiming::After,
2133            super::triggers::FireEvent::Insert,
2134            &table_schema.columns,
2135            &[],
2136            &stmt_new_rows_impl,
2137        )?;
2138    }
2139
2140    Ok(ExecutionResult::RowsAffected(count))
2141}
2142
2143pub struct CompiledInsert {
2144    table_lower: String,
2145    cached: Option<InsertCache>,
2146}
2147
2148struct InsertCache {
2149    col_indices: Vec<usize>,
2150    has_subquery: bool,
2151    any_defaults: bool,
2152    has_checks: bool,
2153    on_conflict: Option<Arc<CompiledOnConflict>>,
2154    row_col_map: Option<ColumnMap>,
2155    generated_col_positions: Vec<usize>,
2156    generated_fast_evals: Vec<FastGenEval>,
2157    pk_indices: Vec<usize>,
2158    non_pk_indices: Vec<usize>,
2159    encoding_positions: Vec<u16>,
2160    dropped_non_pk_slots: Vec<u16>,
2161    phys_count: usize,
2162    single_int_pk: bool,
2163    not_null_indices: Vec<u16>,
2164    bind_plan: Option<Vec<BindAction>>,
2165    row_fully_overwritten: bool,
2166    row_encoder: Option<crate::encoding::RowTemplate>,
2167    is_trivial_fast: bool,
2168    trivial_fast_program: Option<TrivialFastProgram>,
2169    needs_scoped_params: bool,
2170}
2171
2172#[derive(Clone)]
2173enum BindAction {
2174    Param {
2175        param_idx: usize,
2176        col_idx: usize,
2177        target: DataType,
2178    },
2179    Literal {
2180        value: Value,
2181        col_idx: usize,
2182    },
2183}
2184
2185#[derive(Clone)]
2186struct TrivialFastProgram {
2187    template: Vec<u8>,
2188    ops: Vec<WriteOp>,
2189    pk_param: u8,
2190    not_null_param_indices: Vec<u8>,
2191    fk_checks: Vec<FkCheckSpec>,
2192    index_inserts: Vec<IndexInsertSpec>,
2193    on_dup: DupPolicy,
2194}
2195
/// PK-duplicate policy: plain INSERT errors; `ON CONFLICT DO NOTHING` skips.
#[derive(Clone, Copy, PartialEq, Eq)]
enum DupPolicy {
    /// Chosen when the statement has no `ON CONFLICT` clause.
    Error,
    /// Chosen for a supported `ON CONFLICT DO NOTHING` clause.
    Skip,
}
2202
/// A foreign-key existence check encodable straight from bound params.
#[derive(Clone)]
struct FkCheckSpec {
    /// Name of the referenced table, as bytes.
    foreign_table: Vec<u8>,
    /// Parameter index bound to each foreign-key column, in the key's column order.
    col_params: Vec<u8>,
}
2209
2210/// A pure-column non-unique secondary index insert encodable from bound params.
2211#[derive(Clone)]
2212struct IndexInsertSpec {
2213    table: Vec<u8>,
2214    key_params: Vec<(u8, crate::types::Collation)>,
2215}
2216
/// One patch operation recorded for a [`TrivialFastProgram`].
///
/// `off` is the byte offset of the target slot in the row template. The code
/// that applies these operations is not part of this definition, so the
/// per-variant notes describe only what the builder records.
#[derive(Clone)]
enum WriteOp {
    /// Integer parameter `param_idx` targets the slot at `off`.
    ParamI64 {
        param_idx: u8,
        off: u32,
    },
    /// The fixed integer `value` targets the slot at `off`.
    LiteralI64 {
        value: i64,
        off: u32,
    },
    /// Generated column defined as the sum of two parameter-bound columns.
    /// `bitmap_byte_off` / `bitmap_bit_mask` locate the slot's bit in what is
    /// presumably the row's null bitmap (confirm against the executor).
    GenAddParamsI64 {
        a_param: u8,
        b_param: u8,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
    /// Generated column of the form `param * mul + add`.
    /// `bitmap_byte_off` / `bitmap_bit_mask` as for `GenAddParamsI64`.
    GenMulAddParamI64 {
        param_idx: u8,
        mul: i64,
        add: i64,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
}
2243
2244fn build_trivial_fast_program(
2245    bind_plan: &[BindAction],
2246    phys_count: usize,
2247    non_virtual_pairs: &[(usize, usize)],
2248    generated_col_positions: &[usize],
2249    generated_fast_evals: &[FastGenEval],
2250    ts: &TableSchema,
2251    on_conflict: Option<&CompiledOnConflict>,
2252) -> Option<TrivialFastProgram> {
2253    let columns = &ts.columns;
2254    let pk_col = ts.pk_indices()[0];
2255
2256    // Only DO NOTHING on a PK arbiter can skip dupes; any other shape bails.
2257    let on_dup = match on_conflict {
2258        None => DupPolicy::Error,
2259        Some(CompiledOnConflict::DoNothing { target })
2260            if matches!(target, None | Some(ConflictKind::PrimaryKey))
2261                && ts.indices.is_empty()
2262                && ts.foreign_keys.is_empty() =>
2263        {
2264            DupPolicy::Skip
2265        }
2266        _ => return None,
2267    };
2268
2269    let mut col_to_bind: rustc_hash::FxHashMap<usize, &BindAction> = Default::default();
2270    for action in bind_plan {
2271        let col = match action {
2272            BindAction::Param { col_idx, .. } | BindAction::Literal { col_idx, .. } => *col_idx,
2273        };
2274        col_to_bind.insert(col, action);
2275    }
2276
2277    // Freeze literals into the template; int params/generated cols stay holes.
2278    let mut slots: Vec<crate::encoding::TemplateSlot> = (0..phys_count)
2279        .map(|_| crate::encoding::TemplateSlot::Null)
2280        .collect();
2281    for &(col, slot) in non_virtual_pairs {
2282        slots[slot] = match col_to_bind.get(&col) {
2283            Some(BindAction::Literal { value, .. }) => {
2284                crate::encoding::TemplateSlot::Const(value.clone())
2285            }
2286            Some(BindAction::Param { target, .. }) => {
2287                if *target != DataType::Integer {
2288                    return None;
2289                }
2290                crate::encoding::TemplateSlot::IntHole
2291            }
2292            None => {
2293                if columns[col].data_type != DataType::Integer {
2294                    return None;
2295                }
2296                crate::encoding::TemplateSlot::IntHole
2297            }
2298        };
2299    }
2300    let tmpl = crate::encoding::build_row_template(phys_count, &slots);
2301    let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2302        non_virtual_pairs.iter().copied().collect();
2303    let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2304        tmpl.slot_offsets.iter().copied().collect();
2305
2306    let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2307    let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2308    let mut pk_param: Option<u8> = None;
2309    let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2310    let mut not_null_param_indices: Vec<u8> = Vec::new();
2311
2312    for action in bind_plan {
2313        match action {
2314            BindAction::Param {
2315                param_idx,
2316                col_idx,
2317                target,
2318            } => {
2319                if *target != DataType::Integer {
2320                    return None;
2321                }
2322                let pi: u8 = u8::try_from(*param_idx).ok()?;
2323                col_to_param.insert(*col_idx, pi);
2324                if *col_idx == pk_col {
2325                    pk_param = Some(pi);
2326                } else {
2327                    let slot = *col_to_slot.get(col_idx)?;
2328                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2329                    ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2330                    if !columns[*col_idx].nullable {
2331                        not_null_param_indices.push(pi);
2332                    }
2333                }
2334            }
2335            // Already in the template; record ints only for generated-col refs.
2336            BindAction::Literal { value, col_idx } => {
2337                if *col_idx == pk_col {
2338                    return None;
2339                }
2340                if let Value::Integer(v) = value {
2341                    col_to_lit_int.insert(*col_idx, *v);
2342                }
2343            }
2344        }
2345    }
2346
2347    let pk_param = pk_param?;
2348
2349    for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2350        let gen_slot = *col_to_slot.get(&gen_pos)?;
2351        let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2352        let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
2353        let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
2354        let gen_col_nullable = columns[gen_pos].nullable;
2355
2356        match &generated_fast_evals[i] {
2357            FastGenEval::IntColAddCol {
2358                left_idx,
2359                right_idx,
2360            } => {
2361                let a_param = col_to_param.get(left_idx).copied();
2362                let b_param = col_to_param.get(right_idx).copied();
2363                match (a_param, b_param) {
2364                    (Some(ap), Some(bp)) => {
2365                        let deps_safe = gen_col_nullable
2366                            || (not_null_param_indices.contains(&ap)
2367                                && not_null_param_indices.contains(&bp));
2368                        if !deps_safe {
2369                            return None;
2370                        }
2371                        ops.push(WriteOp::GenAddParamsI64 {
2372                            a_param: ap,
2373                            b_param: bp,
2374                            off: gen_off,
2375                            bitmap_byte_off,
2376                            bitmap_bit_mask,
2377                        });
2378                    }
2379                    (Some(p), None) => {
2380                        let lit = col_to_lit_int.get(right_idx).copied()?;
2381                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2382                            return None;
2383                        }
2384                        ops.push(WriteOp::GenMulAddParamI64 {
2385                            param_idx: p,
2386                            mul: 1,
2387                            add: lit,
2388                            off: gen_off,
2389                            bitmap_byte_off,
2390                            bitmap_bit_mask,
2391                        });
2392                    }
2393                    (None, Some(p)) => {
2394                        let lit = col_to_lit_int.get(left_idx).copied()?;
2395                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2396                            return None;
2397                        }
2398                        ops.push(WriteOp::GenMulAddParamI64 {
2399                            param_idx: p,
2400                            mul: 1,
2401                            add: lit,
2402                            off: gen_off,
2403                            bitmap_byte_off,
2404                            bitmap_bit_mask,
2405                        });
2406                    }
2407                    (None, None) => {
2408                        let la = col_to_lit_int.get(left_idx).copied()?;
2409                        let lb = col_to_lit_int.get(right_idx).copied()?;
2410                        ops.push(WriteOp::LiteralI64 {
2411                            value: la.wrapping_add(lb),
2412                            off: gen_off,
2413                        });
2414                    }
2415                }
2416            }
2417            FastGenEval::IntColMulAdd {
2418                col_schema_idx,
2419                mul,
2420                add,
2421            } => {
2422                if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2423                    if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2424                        return None;
2425                    }
2426                    ops.push(WriteOp::GenMulAddParamI64 {
2427                        param_idx: p,
2428                        mul: *mul,
2429                        add: *add,
2430                        off: gen_off,
2431                        bitmap_byte_off,
2432                        bitmap_bit_mask,
2433                    });
2434                } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2435                    ops.push(WriteOp::LiteralI64 {
2436                        value: lit.wrapping_mul(*mul).wrapping_add(*add),
2437                        off: gen_off,
2438                    });
2439                } else {
2440                    return None;
2441                }
2442            }
2443            FastGenEval::None => return None,
2444        }
2445    }
2446
2447    let mut fk_checks: Vec<FkCheckSpec> = Vec::with_capacity(ts.foreign_keys.len());
2448    for fk in &ts.foreign_keys {
2449        if fk.deferrable && fk.initially_deferred {
2450            return None;
2451        }
2452        let mut col_params = Vec::with_capacity(fk.columns.len());
2453        for &c in &fk.columns {
2454            col_params.push(col_to_param.get(&(c as usize)).copied()?);
2455        }
2456        fk_checks.push(FkCheckSpec {
2457            foreign_table: fk.foreign_table.as_bytes().to_vec(),
2458            col_params,
2459        });
2460    }
2461
2462    let mut index_inserts: Vec<IndexInsertSpec> = Vec::with_capacity(ts.indices.len());
2463    for idx in &ts.indices {
2464        if idx.unique
2465            || !idx.is_pure_column_index()
2466            || idx.predicate_expr.is_some()
2467            || idx.predicate_sql.is_some()
2468        {
2469            return None;
2470        }
2471        let mut key_params = Vec::with_capacity(idx.keys.len());
2472        for (i, key) in idx.keys.iter().enumerate() {
2473            let crate::types::IndexKey::Column { idx: col_idx, .. } = key else {
2474                return None;
2475            };
2476            key_params.push((
2477                col_to_param.get(&(*col_idx as usize)).copied()?,
2478                idx.collation_at(i),
2479            ));
2480        }
2481        index_inserts.push(IndexInsertSpec {
2482            table: TableSchema::index_table_name(&ts.name, &idx.name),
2483            key_params,
2484        });
2485    }
2486
2487    Some(TrivialFastProgram {
2488        template: tmpl.template,
2489        ops,
2490        pk_param,
2491        not_null_param_indices,
2492        fk_checks,
2493        index_inserts,
2494        on_dup,
2495    })
2496}
2497
/// Compiled form of an `ON CONFLICT` clause, consumed by
/// `apply_insert_with_conflict` when an insert collides with an existing row.
#[derive(Clone)]
pub(super) enum CompiledOnConflict {
    /// `DO NOTHING`: a conflict that matches `target` skips the proposed row.
    DoNothing {
        /// Constraint the clause is restricted to. `None` matches a conflict
        /// on the primary key as well as on any unique index.
        target: Option<ConflictKind>,
    },
    /// `DO UPDATE`: a conflict that matches `target` rewrites the existing row.
    DoUpdate {
        /// Constraint whose violation triggers the update.
        target: ConflictKind,
        /// `(column index, value expression)` pairs; the index selects an
        /// entry of `TableSchema::columns`.
        assignments: Vec<(usize, Expr)>,
        /// Optional filter: the update is skipped when it evaluates to NULL
        /// or to a non-truthy value.
        where_clause: Option<Expr>,
        /// Byte-level patches used by the fused upsert path when present and
        /// the table has no CHECK constraints.
        fast_paths: Option<Vec<DoUpdateFastPath>>,
    },
}
2510
/// An assignment that can be applied directly to the encoded row bytes
/// instead of going through expression evaluation.
#[derive(Clone, Copy)]
pub(super) enum DoUpdateFastPath {
    /// Add `delta` (wrapping) to the INTEGER at encoded column position
    /// `phys_idx`; a stored NULL is left as NULL.
    IntAddConst { phys_idx: usize, delta: i64 },
}
2515
/// The constraint an `ON CONFLICT` clause is resolved against.
#[derive(Clone, Debug)]
pub(super) enum ConflictKind {
    /// The table's primary key.
    PrimaryKey,
    /// A unique index; `index_idx` is its position in `TableSchema::indices`.
    UniqueIndex { index_idx: usize },
}
2521
2522fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2523    match target {
2524        ConflictTarget::Columns(cols) => {
2525            let col_idx_set: Vec<u16> = cols
2526                .iter()
2527                .map(|name| {
2528                    ts.column_index(name)
2529                        .map(|i| i as u16)
2530                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2531                })
2532                .collect::<Result<_>>()?;
2533            let pk_set = ts.primary_key_columns.clone();
2534            if set_equal(&col_idx_set, &pk_set) {
2535                return Ok(ConflictKind::PrimaryKey);
2536            }
2537            for (index_idx, idx) in ts.indices.iter().enumerate() {
2538                if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2539                    return Ok(ConflictKind::UniqueIndex { index_idx });
2540                }
2541            }
2542            Err(SqlError::Plan(
2543                "ON CONFLICT target does not match any unique constraint".into(),
2544            ))
2545        }
2546        ConflictTarget::Constraint(name) => {
2547            let lower = name.to_ascii_lowercase();
2548            for (index_idx, idx) in ts.indices.iter().enumerate() {
2549                if idx.name.eq_ignore_ascii_case(&lower) {
2550                    if idx.unique {
2551                        return Ok(ConflictKind::UniqueIndex { index_idx });
2552                    }
2553                    return Err(SqlError::Plan(format!(
2554                        "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2555                    )));
2556                }
2557            }
2558            Err(SqlError::Plan(format!(
2559                "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2560            )))
2561        }
2562    }
2563}
2564
/// Order-insensitive comparison of two column-index lists.
///
/// Duplicates count: the lists are equal only when they contain the same
/// values the same number of times.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    let sorted = |xs: &[u16]| -> Vec<u16> {
        let mut copy = xs.to_vec();
        copy.sort_unstable();
        copy
    };
    // Length check first so mismatched inputs never pay for the sort.
    a.len() == b.len() && sorted(a) == sorted(b)
}
2575
/// Result of applying one proposed row through the `ON CONFLICT` machinery.
pub(super) enum InsertRowOutcome {
    /// The row was written. The `DO UPDATE` paths in this file also return
    /// this for an update when the caller did not ask to capture rows.
    Inserted,
    /// An existing row was updated; carries the decoded row before and after.
    Updated { old: Vec<Value>, new: Vec<Value> },
    /// Nothing was written (`DO NOTHING`, or the `DO UPDATE ... WHERE` filter
    /// rejected the row).
    Skipped,
}
2581
/// Insert one row, resolving primary-key and unique-index collisions as
/// directed by `on_conflict`.
///
/// # Parameters
/// * `key_buf` / `value_buf` - encoded primary key and encoded row value of
///   the proposed row.
/// * `row` - the proposed row as values; used for index maintenance and
///   exposed to `DO UPDATE` expressions through `EvalCtx::with_excluded`.
/// * `pk_values` - primary-key values of the proposed row, forwarded to
///   index-entry insertion.
/// * `col_map` - column map used when evaluating expressions.
/// * `capture_returning` - when true, an update is reported as
///   `InsertRowOutcome::Updated` carrying the old and new rows.
///
/// # Errors
/// * `SqlError::Storage` for storage failures.
/// * `SqlError::UniqueViolation` when a unique index collides and the clause
///   does not target that index.
/// * `SqlError::DuplicateKey` when the primary key collides and the clause
///   does not target the primary key.
/// * Whatever the `DO UPDATE` helpers return (constraint violations, type
///   mismatches, oversized rows).
#[allow(clippy::too_many_arguments)]
#[inline]
pub(super) fn apply_insert_with_conflict(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    key_buf: &[u8],
    value_buf: &[u8],
    row: &[Value],
    pk_values: &[Value],
    on_conflict: &CompiledOnConflict,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    let table_bytes = table_schema.name.as_bytes();

    // Shortcut 1: DO NOTHING that can only ever collide on the primary key
    // (no secondary indexes, no foreign keys) is a single insert-if-absent.
    if let CompiledOnConflict::DoNothing { target } = on_conflict {
        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
            let inserted = wtx
                .table_insert_if_absent(table_bytes, key_buf, value_buf)
                .map_err(SqlError::Storage)?;
            return Ok(if inserted {
                InsertRowOutcome::Inserted
            } else {
                InsertRowOutcome::Skipped
            });
        }
    }

    // Shortcut 2: DO UPDATE on the primary key of a table that qualifies for
    // the fused upsert (see `can_fuse_do_update`).
    if let CompiledOnConflict::DoUpdate {
        target: ConflictKind::PrimaryKey,
        assignments,
        where_clause,
        fast_paths,
    } = on_conflict
    {
        if can_fuse_do_update(table_schema, assignments) {
            return apply_do_update_fused(
                wtx,
                table_schema,
                table_bytes,
                key_buf,
                value_buf,
                row,
                assignments,
                where_clause.as_ref(),
                col_map,
                fast_paths.as_deref(),
                capture_returning,
            );
        }
    }

    // General path: try the primary insert; on collision the stored bytes
    // are handed back.
    let primary_outcome = wtx
        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
        .map_err(SqlError::Storage)?;

    match primary_outcome {
        citadel_txn::write_txn::InsertOutcome::Inserted => {
            if table_schema.indices.is_empty() {
                return Ok(InsertRowOutcome::Inserted);
            }
            // Index keys written so far, kept so a later collision can be undone.
            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
            match insert_index_entries_or_fetch(
                wtx,
                table_schema,
                row,
                pk_values,
                &mut inserted_keys,
            )? {
                None => Ok(InsertRowOutcome::Inserted),
                Some(conflicting_idx) => {
                    // The clause handles this collision if it is an untargeted
                    // DO NOTHING, or if it names exactly the colliding index.
                    let matches_target =
                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
                            || matches!(
                                on_conflict,
                                CompiledOnConflict::DoNothing {
                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
                                } | CompiledOnConflict::DoUpdate {
                                    target: ConflictKind::UniqueIndex { index_idx },
                                    ..
                                } if *index_idx == conflicting_idx
                            );
                    // Undo happens before the error return as well as before
                    // the skip/update handling.
                    // NOTE(review): presumably removes the primary row and the
                    // index entries recorded in `inserted_keys` - confirm
                    // against `undo_partial_insert`.
                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
                    if !matches_target {
                        return Err(SqlError::UniqueViolation(
                            table_schema.indices[conflicting_idx].name.clone(),
                        ));
                    }
                    match on_conflict {
                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
                        CompiledOnConflict::DoUpdate {
                            assignments,
                            where_clause,
                            ..
                        } => {
                            // Find the row that owns the colliding index entry
                            // and update that row instead.
                            let existing_pk =
                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
                            apply_do_update(
                                wtx,
                                table_schema,
                                &existing_pk,
                                row,
                                assignments,
                                where_clause.as_ref(),
                                col_map,
                                capture_returning,
                            )
                        }
                    }
                }
            }
        }
        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
            // Primary-key collision: only an untargeted DO NOTHING or a clause
            // that targets the primary key may handle it.
            let matches_target = matches!(
                on_conflict,
                CompiledOnConflict::DoNothing { target: None }
                    | CompiledOnConflict::DoNothing {
                        target: Some(ConflictKind::PrimaryKey),
                    }
                    | CompiledOnConflict::DoUpdate {
                        target: ConflictKind::PrimaryKey,
                        ..
                    }
            );
            if !matches_target {
                return Err(SqlError::DuplicateKey);
            }
            match on_conflict {
                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
                CompiledOnConflict::DoUpdate {
                    assignments,
                    where_clause,
                    ..
                } => {
                    // The stored bytes came back from the insert attempt, so
                    // no second lookup is needed.
                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
                    apply_do_update_with_old_row(
                        wtx,
                        table_schema,
                        key_buf,
                        &old_row,
                        row,
                        assignments,
                        where_clause.as_ref(),
                        col_map,
                        capture_returning,
                    )
                }
            }
        }
    }
}
2734
2735#[inline]
2736fn apply_fast_path_patch(
2737    old_bytes: &[u8],
2738    fast_paths: &[DoUpdateFastPath],
2739) -> Result<UpsertAction> {
2740    UPSERT_SCRATCH.with(|slot| {
2741        let mut bufs = slot.borrow_mut();
2742        bufs.new_value_buf.clear();
2743        bufs.new_value_buf.extend_from_slice(old_bytes);
2744
2745        let mut patch_scratch: Vec<u8> = Vec::new();
2746
2747        for fp in fast_paths {
2748            match fp {
2749                DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2750                    let decoded =
2751                        crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2752                    let old_val = &decoded[0];
2753                    let new_val = match old_val {
2754                        Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2755                        Value::Null => Value::Null,
2756                        _ => {
2757                            return Err(SqlError::TypeMismatch {
2758                                expected: "INTEGER".into(),
2759                                got: old_val.data_type().to_string(),
2760                            });
2761                        }
2762                    };
2763                    if !crate::encoding::patch_column_in_place(
2764                        &mut bufs.new_value_buf,
2765                        *phys_idx,
2766                        &new_val,
2767                    )? {
2768                        patch_scratch.clear();
2769                        crate::encoding::patch_row_column(
2770                            &bufs.new_value_buf,
2771                            *phys_idx,
2772                            &new_val,
2773                            &mut patch_scratch,
2774                        )?;
2775                        std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2776                    }
2777                }
2778            }
2779        }
2780
2781        if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2782            return Err(SqlError::RowTooLarge {
2783                size: bufs.new_value_buf.len(),
2784                max: citadel_core::MAX_VALUE_SIZE,
2785            });
2786        }
2787
2788        Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2789    })
2790}
2791
2792fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2793    if !ts.indices.is_empty() {
2794        return true;
2795    }
2796    match oc {
2797        CompiledOnConflict::DoNothing { .. } => false,
2798        CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2799    }
2800}
2801
2802fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2803    if !ts.indices.is_empty() {
2804        return false;
2805    }
2806    if !ts.foreign_keys.is_empty() {
2807        return false;
2808    }
2809    if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2810        return false;
2811    }
2812    let pk = ts.pk_indices();
2813    !assignments.iter().any(|(ci, _)| pk.contains(ci))
2814}
2815
/// Primary-key `DO UPDATE` performed through a single `table_upsert_with`
/// call: the closure receives the stored bytes of the colliding row and
/// decides whether to replace them or skip.
///
/// Callers gate this with `can_fuse_do_update`, so the table has no indexes,
/// foreign keys or generated columns and no primary-key column is assigned.
///
/// # Parameters
/// * `table_bytes`, `key_buf`, `value_buf` - table name, encoded key and
///   encoded value of the proposed row.
/// * `proposed_row` - proposed row values, exposed to expressions through
///   `EvalCtx::with_excluded`.
/// * `fast_paths` - when present and the table has no CHECK constraints, the
///   stored bytes are patched directly and neither `where_clause` nor
///   `assignments` is evaluated.
///   NOTE(review): presumably the compiler only emits fast paths when there
///   is no WHERE clause - confirm in `compile_on_conflict`.
/// * `capture_returning` - when true, an update yields
///   `InsertRowOutcome::Updated` with the decoded old and new rows.
///
/// # Errors
/// Propagates storage errors and the `SqlError` values raised inside the
/// closure (type mismatch, NOT NULL / CHECK violation, row too large).
#[allow(clippy::too_many_arguments)]
#[inline]
fn apply_do_update_fused(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    table_bytes: &[u8],
    key_buf: &[u8],
    value_buf: &[u8],
    proposed_row: &[Value],
    assignments: &[(usize, Expr)],
    where_clause: Option<&Expr>,
    col_map: &ColumnMap,
    fast_paths: Option<&[DoUpdateFastPath]>,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    // Schema-derived layout, looked up once outside the closure.
    let non_pk = table_schema.non_pk_indices();
    let enc_pos = table_schema.encoding_positions();
    let phys_count = table_schema.physical_non_pk_count();
    let dropped = table_schema.dropped_non_pk_slots();
    let has_checks = table_schema.has_checks();
    let has_fks = !table_schema.foreign_keys.is_empty();

    // Filled by the closure so the old/new rows can be returned afterwards.
    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
        std::cell::RefCell::new(None);

    let outcome =
        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
            // Fast path: patch the encoded bytes directly. Not used when CHECK
            // constraints exist, because those need the decoded row.
            if let Some(fps) = fast_paths {
                if !has_checks {
                    let action = apply_fast_path_patch(old_bytes, fps)?;
                    if capture_returning {
                        if let UpsertAction::Replace(ref new_bytes) = action {
                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
                            *captured.borrow_mut() = Some((old_row, new_row));
                        }
                    }
                    return Ok(action);
                }
            }
            // General path: decode, evaluate, validate and re-encode using the
            // thread-local scratch buffers.
            UPSERT_SCRATCH.with(|slot| {
                let mut bufs = slot.borrow_mut();
                let UpsertBufs {
                    old_row,
                    new_row,
                    value_values,
                    new_value_buf,
                } = &mut *bufs;

                old_row.clear();
                old_row.resize(table_schema.columns.len(), Value::Null);
                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;

                // DO UPDATE ... WHERE: NULL or non-truthy skips the update.
                if let Some(w) = where_clause {
                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
                    let result = eval_expr(w, &ctx)?;
                    if result.is_null() || !is_truthy(&result) {
                        return Ok(UpsertAction::Skip);
                    }
                }

                // Every assignment is evaluated against the unmodified old
                // row, not against earlier assignments' results.
                new_row.clear();
                new_row.extend_from_slice(old_row);
                for (col_idx, expr) in assignments {
                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
                    let val = eval_expr(expr, &ctx)?;
                    let col = &table_schema.columns[*col_idx];
                    new_row[*col_idx] = if val.is_null() {
                        Value::Null
                    } else {
                        let got = val.data_type();
                        val.coerce_into(col.data_type)
                            .ok_or_else(|| SqlError::TypeMismatch {
                                expected: col.data_type.to_string(),
                                got: got.to_string(),
                            })?
                    };
                }

                // NOT NULL is enforced only for the columns that were assigned.
                for (assigned_idx, _) in assignments {
                    let col = &table_schema.columns[*assigned_idx];
                    if !col.nullable && new_row[col.position as usize].is_null() {
                        return Err(SqlError::NotNullViolation(col.name.clone()));
                    }
                }
                // A CHECK fails only on a non-NULL, non-truthy result; a NULL
                // result passes.
                if has_checks {
                    for col in &table_schema.columns {
                        if let Some(ref check) = col.check_expr {
                            let ctx = EvalCtx::new(col_map, new_row);
                            let result = eval_expr(check, &ctx)?;
                            if !is_truthy(&result) && !result.is_null() {
                                let name = col.check_name.as_deref().unwrap_or(&col.name);
                                return Err(SqlError::CheckViolation(name.to_string()));
                            }
                        }
                    }
                    for tc in &table_schema.check_constraints {
                        let ctx = EvalCtx::new(col_map, new_row);
                        let result = eval_expr(&tc.expr, &ctx)?;
                        if !is_truthy(&result) && !result.is_null() {
                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
                            return Err(SqlError::CheckViolation(name.to_string()));
                        }
                    }
                }
                // `has_fks` is intentionally unused here: no foreign-key
                // validation is performed on this path.
                let _ = has_fks;

                // Lay the non-PK values out at their encoded positions;
                // dropped slots stay NULL.
                value_values.clear();
                value_values.resize(phys_count, Value::Null);
                for &slot in dropped {
                    value_values[slot as usize] = Value::Null;
                }
                for (j, &i) in non_pk.iter().enumerate() {
                    value_values[enc_pos[j] as usize] = new_row[i].clone();
                }
                new_value_buf.clear();
                crate::encoding::encode_row_into(value_values, new_value_buf);

                if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
                    return Err(SqlError::RowTooLarge {
                        size: new_value_buf.len(),
                        max: citadel_core::MAX_VALUE_SIZE,
                    });
                }

                if capture_returning {
                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
                }
                Ok(UpsertAction::Replace(new_value_buf.clone()))
            })
        })?;

    match outcome {
        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
        UpsertOutcome::Updated => {
            if capture_returning {
                let (old, new) = captured.into_inner().ok_or_else(|| {
                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
                })?;
                Ok(InsertRowOutcome::Updated { old, new })
            } else {
                // Without row capture an update is reported as `Inserted`.
                Ok(InsertRowOutcome::Inserted)
            }
        }
        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
    }
}
2963
2964fn fetch_unique_index_pk(
2965    wtx: &mut WriteTxn<'_>,
2966    table_schema: &TableSchema,
2967    index_idx: usize,
2968    row: &[Value],
2969) -> Result<Vec<u8>> {
2970    let idx = &table_schema.indices[index_idx];
2971    let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2972    let indexed: Vec<Value> = idx
2973        .column_positions_iter()
2974        .map(|col_idx| row[col_idx as usize].clone())
2975        .collect();
2976    let key = crate::encoding::encode_composite_key(&indexed);
2977    let value = wtx
2978        .table_get(&idx_table, &key)
2979        .map_err(SqlError::Storage)?
2980        .ok_or_else(|| {
2981            SqlError::InvalidValue("unique index missing expected collision entry".into())
2982        })?;
2983    Ok(value)
2984}
2985
2986#[allow(clippy::too_many_arguments)]
2987fn apply_do_update(
2988    wtx: &mut WriteTxn<'_>,
2989    table_schema: &TableSchema,
2990    pk_key: &[u8],
2991    proposed_row: &[Value],
2992    assignments: &[(usize, Expr)],
2993    where_clause: Option<&Expr>,
2994    col_map: &ColumnMap,
2995    capture_returning: bool,
2996) -> Result<InsertRowOutcome> {
2997    let old_value = wtx
2998        .table_get(table_schema.name.as_bytes(), pk_key)
2999        .map_err(SqlError::Storage)?
3000        .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
3001    let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
3002    apply_do_update_with_old_row(
3003        wtx,
3004        table_schema,
3005        pk_key,
3006        &old_row,
3007        proposed_row,
3008        assignments,
3009        where_clause,
3010        col_map,
3011        capture_returning,
3012    )
3013}
3014
/// General `DO UPDATE` implementation: apply `assignments` to an already
/// decoded existing row, validate the result, and write it back together
/// with its index entries.
///
/// # Parameters
/// * `old_pk_key` - encoded primary key of the existing row.
/// * `old_row` - decoded existing row.
/// * `proposed_row` - the row the INSERT tried to write, exposed to
///   expressions through `EvalCtx::with_excluded`.
/// * `assignments` - `(column index, expression)` pairs.
/// * `where_clause` - optional filter; NULL or non-truthy skips the update.
/// * `capture_returning` - when true the result is
///   `InsertRowOutcome::Updated { old, new }`; otherwise an update is
///   reported as `InsertRowOutcome::Inserted`.
///
/// # Errors
/// `TypeMismatch`, `NotNullViolation`, `CheckViolation`,
/// `ForeignKeyViolation`, `RowTooLarge`, `DuplicateKey` (new primary key
/// already present), `UniqueViolation`, or `Storage`.
#[allow(clippy::too_many_arguments)]
fn apply_do_update_with_old_row(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    old_pk_key: &[u8],
    old_row: &[Value],
    proposed_row: &[Value],
    assignments: &[(usize, Expr)],
    where_clause: Option<&Expr>,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    // DO UPDATE ... WHERE: NULL or non-truthy means "leave the row alone".
    if let Some(w) = where_clause {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let result = eval_expr(w, &ctx)?;
        if result.is_null() || !is_truthy(&result) {
            return Ok(InsertRowOutcome::Skipped);
        }
    }

    // Every assignment is evaluated against the unmodified old row, so one
    // assignment never observes another's result.
    let mut new_row = old_row.to_vec();
    for (col_idx, expr) in assignments {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let val = eval_expr(expr, &ctx)?;
        let col = &table_schema.columns[*col_idx];
        new_row[*col_idx] = if val.is_null() {
            Value::Null
        } else {
            let got = val.data_type();
            val.coerce_into(col.data_type)
                .ok_or_else(|| SqlError::TypeMismatch {
                    expected: col.data_type.to_string(),
                    got: got.to_string(),
                })?
        };
    }

    // Recompute STORED generated columns from the updated row.
    for col in &table_schema.columns {
        if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Stored)
        ) {
            let val = eval_expr(
                col.generated_expr.as_ref().unwrap(),
                &EvalCtx::new(col_map, &new_row),
            )?;
            let pos = col.position as usize;
            new_row[pos] = if val.is_null() {
                if !col.nullable {
                    return Err(SqlError::NotNullViolation(col.name.clone()));
                }
                Value::Null
            } else {
                let got = val.data_type();
                val.coerce_into(col.data_type)
                    .ok_or_else(|| SqlError::TypeMismatch {
                        expected: col.data_type.to_string(),
                        got: got.to_string(),
                    })?
            };
        }
    }

    // The key only counts as changed when a PK column was assigned AND its
    // value actually differs.
    let pk_indices = table_schema.pk_indices();
    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);

    // NOT NULL is enforced only for the columns that were assigned.
    for (assigned_idx, _) in assignments {
        let col = &table_schema.columns[*assigned_idx];
        if !col.nullable && new_row[col.position as usize].is_null() {
            return Err(SqlError::NotNullViolation(col.name.clone()));
        }
    }
    // A CHECK fails only on a non-NULL, non-truthy result; NULL passes.
    if table_schema.has_checks() {
        for col in &table_schema.columns {
            if let Some(ref check) = col.check_expr {
                let ctx = EvalCtx::new(col_map, &new_row);
                let result = eval_expr(check, &ctx)?;
                if !is_truthy(&result) && !result.is_null() {
                    let name = col.check_name.as_deref().unwrap_or(&col.name);
                    return Err(SqlError::CheckViolation(name.to_string()));
                }
            }
        }
        for tc in &table_schema.check_constraints {
            let ctx = EvalCtx::new(col_map, &new_row);
            let result = eval_expr(&tc.expr, &ctx)?;
            if !is_truthy(&result) && !result.is_null() {
                let name = tc.name.as_deref().unwrap_or(&tc.sql);
                return Err(SqlError::CheckViolation(name.to_string()));
            }
        }
    }
    // Foreign keys are re-validated only when one of their columns changed,
    // and never when any of the new key values is NULL.
    for fk in &table_schema.foreign_keys {
        let changed = fk
            .columns
            .iter()
            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
        if !changed {
            continue;
        }
        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
        if any_null {
            continue;
        }
        let fk_vals: Vec<Value> = fk
            .columns
            .iter()
            .map(|&ci| new_row[ci as usize].clone())
            .collect();
        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
        // Initially-deferred constraints are queued on the transaction
        // instead of being checked now.
        if fk.deferrable && fk.initially_deferred {
            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
            wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
                fk_name: name,
                foreign_table: fk.foreign_table.as_bytes().to_vec(),
                parent_key: fk_key,
            });
            continue;
        }
        // Immediate check; a successful parent lookup is recorded so the same
        // key is not looked up again.
        if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
            let found = wtx
                .table_get(fk.foreign_table.as_bytes(), &fk_key)
                .map_err(SqlError::Storage)?;
            if found.is_none() {
                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
                return Err(SqlError::ForeignKeyViolation(name.to_string()));
            }
            wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
        }
    }

    // PK value lists are only needed for index maintenance or a key move.
    let has_indices = !table_schema.indices.is_empty();
    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
    } else {
        Vec::new()
    };
    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
    } else {
        Vec::new()
    };

    // Encode the non-PK columns at their encoded positions. Dropped slots and
    // VIRTUAL generated columns are written as NULL.
    let non_pk = table_schema.non_pk_indices();
    let enc_pos = table_schema.encoding_positions();
    let phys_count = table_schema.physical_non_pk_count();
    let dropped = table_schema.dropped_non_pk_slots();
    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
    for &slot in dropped {
        value_values[slot as usize] = Value::Null;
    }
    for (j, &i) in non_pk.iter().enumerate() {
        let col = &table_schema.columns[i];
        value_values[enc_pos[j] as usize] = if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Virtual)
        ) {
            Value::Null
        } else {
            new_row[i].clone()
        };
    }
    let mut new_value_buf = Vec::with_capacity(256);
    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);

    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
        return Err(SqlError::RowTooLarge {
            size: new_value_buf.len(),
            max: citadel_core::MAX_VALUE_SIZE,
        });
    }

    if pk_changed {
        // Key move: write the row under the new key, remove the old one, and
        // rewrite every index entry.
        // NOTE(review): unlike the branch below, this path does not consult
        // partial-index predicates - confirm partial indexes are handled
        // correctly when the primary key changes.
        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
        let inserted = wtx
            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
            .map_err(SqlError::Storage)?;
        if !inserted {
            return Err(SqlError::DuplicateKey);
        }
        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
            .map_err(SqlError::Storage)?;
        for idx in &table_schema.indices {
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            let old_idx_key =
                encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
            wtx.table_delete(&idx_table, &old_idx_key)
                .map_err(SqlError::Storage)?;
            let new_idx_key =
                encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
            let is_new = wtx
                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                .map_err(SqlError::Storage)?;
            // A collision on a unique index is tolerated when any indexed
            // column is NULL.
            if idx.unique && !is_new {
                let any_null = idx
                    .column_positions_iter()
                    .any(|c| new_row[c as usize].is_null());
                if !any_null {
                    return Err(SqlError::UniqueViolation(idx.name.clone()));
                }
            }
        }
    } else {
        // Same key: overwrite the value in place, then let
        // `partial_idx_update_actions` decide per index whether the old entry
        // must be deleted and/or a new one inserted.
        wtx.table_update_sorted(
            table_schema.name.as_bytes(),
            &[(old_pk_key, new_value_buf.as_slice())],
        )
        .map_err(SqlError::Storage)?;
        // The column map is only built when some index is partial.
        let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
        for idx in &table_schema.indices {
            let cols_changed = index_columns_changed(idx, old_row, &new_row);
            let (del, ins) = partial_idx_update_actions(
                idx,
                old_row,
                &new_row,
                cols_changed,
                false,
                col_map_partial,
            );
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            if del {
                let old_idx_key =
                    encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
                wtx.table_delete(&idx_table, &old_idx_key)
                    .map_err(SqlError::Storage)?;
            }
            if ins {
                let new_idx_key =
                    encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
                let is_new = wtx
                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                    .map_err(SqlError::Storage)?;
                // Same NULL exemption as in the key-move branch.
                if idx.unique && !is_new {
                    let any_null = idx
                        .column_positions_iter()
                        .any(|c| new_row[c as usize].is_null());
                    if !any_null {
                        return Err(SqlError::UniqueViolation(idx.name.clone()));
                    }
                }
            }
        }
    }

    if capture_returning {
        Ok(InsertRowOutcome::Updated {
            old: old_row.to_vec(),
            new: new_row,
        })
    } else {
        // Without row capture an update is reported as `Inserted`.
        Ok(InsertRowOutcome::Inserted)
    }
}
3271
3272fn detect_fast_paths(
3273    ts: &TableSchema,
3274    assignments: &[(usize, Expr)],
3275) -> Option<Vec<DoUpdateFastPath>> {
3276    let non_pk = ts.non_pk_indices();
3277    let enc_pos = ts.encoding_positions();
3278    let mut out = Vec::with_capacity(assignments.len());
3279    for (col_idx, expr) in assignments {
3280        let col = &ts.columns[*col_idx];
3281        if col.data_type != DataType::Integer {
3282            return None;
3283        }
3284        let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3285        let phys_idx = enc_pos[nonpk_order] as usize;
3286
3287        if let Expr::BinaryOp { left, op, right } = expr {
3288            if !matches!(op, BinOp::Add | BinOp::Sub) {
3289                return None;
3290            }
3291            let reads_target =
3292                matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3293            if !reads_target {
3294                return None;
3295            }
3296            if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3297                let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3298                let _ = col_idx;
3299                out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3300                continue;
3301            }
3302            return None;
3303        }
3304        return None;
3305    }
3306    Some(out)
3307}
3308
3309fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3310    let target = oc
3311        .target
3312        .as_ref()
3313        .map(|t| resolve_conflict_target(t, ts))
3314        .transpose()?;
3315    match &oc.action {
3316        OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3317        OnConflictAction::DoUpdate {
3318            assignments,
3319            where_clause,
3320        } => {
3321            let target = target.ok_or_else(|| {
3322                SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3323            })?;
3324            let compiled_assignments: Vec<(usize, Expr)> = assignments
3325                .iter()
3326                .map(|(name, expr)| {
3327                    let col_idx = ts
3328                        .column_index(name)
3329                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3330                    Ok((col_idx, expr.clone()))
3331                })
3332                .collect::<Result<_>>()?;
3333            let fast_paths = if where_clause.is_none() {
3334                detect_fast_paths(ts, &compiled_assignments)
3335            } else {
3336                None
3337            };
3338            Ok(CompiledOnConflict::DoUpdate {
3339                target,
3340                assignments: compiled_assignments,
3341                where_clause: where_clause.clone(),
3342                fast_paths,
3343            })
3344        }
3345    }
3346}
3347
3348/// Caller MUST check `cache.is_trivial_fast` first.
3349fn exec_insert_trivial_fast(
3350    wtx: &mut WriteTxn<'_>,
3351    table_lower: &str,
3352    cache: &InsertCache,
3353    bufs: &mut InsertBufs,
3354    params: &[Value],
3355) -> Result<ExecutionResult> {
3356    let prog = cache
3357        .trivial_fast_program
3358        .as_ref()
3359        .expect("trivial fast: program");
3360
3361    for &p in &prog.not_null_param_indices {
3362        if params[p as usize].is_null() {
3363            return Err(SqlError::NotNullViolation(format!("param@{p}")));
3364        }
3365    }
3366
3367    match &params[prog.pk_param as usize] {
3368        Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3369        _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3370    }
3371
3372    bufs.value_buf.clear();
3373    bufs.value_buf.extend_from_slice(&prog.template);
3374
3375    for op in &prog.ops {
3376        match op {
3377            WriteOp::ParamI64 { param_idx, off } => match &params[*param_idx as usize] {
3378                Value::Integer(v) => {
3379                    let off = *off as usize;
3380                    bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3381                }
3382                other => {
3383                    return Err(SqlError::TypeMismatch {
3384                        expected: "Integer".into(),
3385                        got: other.data_type().to_string(),
3386                    });
3387                }
3388            },
3389            WriteOp::LiteralI64 { value, off } => {
3390                let off = *off as usize;
3391                bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3392            }
3393            WriteOp::GenAddParamsI64 {
3394                a_param,
3395                b_param,
3396                off,
3397                bitmap_byte_off,
3398                bitmap_bit_mask,
3399            } => match (&params[*a_param as usize], &params[*b_param as usize]) {
3400                (Value::Integer(a), Value::Integer(b)) => {
3401                    let off = *off as usize;
3402                    bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3403                }
3404                _ => {
3405                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3406                }
3407            },
3408            WriteOp::GenMulAddParamI64 {
3409                param_idx,
3410                mul,
3411                add,
3412                off,
3413                bitmap_byte_off,
3414                bitmap_bit_mask,
3415            } => match &params[*param_idx as usize] {
3416                Value::Integer(v) => {
3417                    let r = v.wrapping_mul(*mul).wrapping_add(*add);
3418                    let off = *off as usize;
3419                    bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3420                }
3421                _ => {
3422                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3423                }
3424            },
3425        }
3426    }
3427
3428    for fk in &prog.fk_checks {
3429        if fk.col_params.iter().any(|&p| params[p as usize].is_null()) {
3430            continue;
3431        }
3432        bufs.fk_key_buf.clear();
3433        for &p in &fk.col_params {
3434            crate::encoding::encode_key_value_into(&params[p as usize], &mut bufs.fk_key_buf);
3435        }
3436        if !wtx.fk_check_cached(&fk.foreign_table, &bufs.fk_key_buf) {
3437            let found = wtx
3438                .table_get(&fk.foreign_table, &bufs.fk_key_buf)
3439                .map_err(SqlError::Storage)?;
3440            if found.is_none() {
3441                return Err(SqlError::ForeignKeyViolation(
3442                    String::from_utf8_lossy(&fk.foreign_table).into_owned(),
3443                ));
3444            }
3445            wtx.mark_fk_verified(&fk.foreign_table, &bufs.fk_key_buf);
3446        }
3447    }
3448
3449    let is_new = wtx
3450        .table_insert_if_absent(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3451        .map_err(SqlError::Storage)?;
3452    if !is_new {
3453        return match prog.on_dup {
3454            DupPolicy::Error => Err(SqlError::DuplicateKey),
3455            DupPolicy::Skip => Ok(ExecutionResult::RowsAffected(0)),
3456        };
3457    }
3458
3459    for idx in &prog.index_inserts {
3460        bufs.fk_key_buf.clear();
3461        for &(p, coll) in &idx.key_params {
3462            crate::encoding::encode_key_value_collated_into(
3463                &params[p as usize],
3464                coll,
3465                &mut bufs.fk_key_buf,
3466            );
3467        }
3468        crate::encoding::encode_key_value_into(
3469            &params[prog.pk_param as usize],
3470            &mut bufs.fk_key_buf,
3471        );
3472        wtx.table_insert_index(&idx.table, &bufs.fk_key_buf, &[])
3473            .map_err(SqlError::Storage)?;
3474    }
3475
3476    Ok(ExecutionResult::RowsAffected(1))
3477}
3478
3479fn build_bind_plan(
3480    stmt: &InsertStmt,
3481    col_indices: &[usize],
3482    col_data_types: &[DataType],
3483) -> Option<Vec<BindAction>> {
3484    let rows = match &stmt.source {
3485        InsertSource::Values(rows) => rows,
3486        _ => return None,
3487    };
3488    if rows.len() != 1 {
3489        return None;
3490    }
3491    let value_row = &rows[0];
3492    if value_row.len() != col_indices.len() {
3493        return None;
3494    }
3495    let mut plan = Vec::with_capacity(value_row.len());
3496    for (i, expr) in value_row.iter().enumerate() {
3497        let col_idx = col_indices[i];
3498        let target = col_data_types[col_idx];
3499        match expr {
3500            Expr::Parameter(n) => {
3501                if *n == 0 {
3502                    return None;
3503                }
3504                plan.push(BindAction::Param {
3505                    param_idx: n - 1,
3506                    col_idx,
3507                    target,
3508                });
3509            }
3510            Expr::Literal(v) => plan.push(BindAction::Literal {
3511                value: v.clone(),
3512                col_idx,
3513            }),
3514            _ => return None,
3515        }
3516    }
3517    Some(plan)
3518}
3519
3520impl CompiledInsert {
3521    pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3522        let lower = stmt.table.to_ascii_lowercase();
3523        let cached = if let Some(ts) = schema.get(&lower) {
3524            let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3525                ts.columns.iter().map(|c| c.name.as_str()).collect()
3526            } else {
3527                stmt.columns.iter().map(|s| s.as_str()).collect()
3528            };
3529            let mut col_indices = Vec::with_capacity(insert_columns.len());
3530            for name in &insert_columns {
3531                col_indices.push(ts.column_index(name)?);
3532            }
3533            if col_indices
3534                .iter()
3535                .any(|&ci| ts.columns[ci].generated_kind.is_some())
3536            {
3537                return None;
3538            }
3539            let on_conflict = stmt
3540                .on_conflict
3541                .as_ref()
3542                .map(|oc| compile_on_conflict(oc, ts))
3543                .transpose()
3544                .ok()
3545                .flatten()
3546                .map(Arc::new);
3547            let generated_col_positions: Vec<usize> = ts
3548                .columns
3549                .iter()
3550                .enumerate()
3551                .filter_map(|(i, c)| {
3552                    matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3553                        .then_some(i)
3554                })
3555                .collect();
3556            let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3557                .iter()
3558                .map(|&pos| {
3559                    detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3560                })
3561                .collect();
3562            let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3563                Some(ColumnMap::new(&ts.columns))
3564            } else {
3565                None
3566            };
3567            let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3568            let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3569            let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3570            let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3571            let phys_count = ts.physical_non_pk_count();
3572            let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3573            let single_int_pk =
3574                pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3575            let not_null_indices: Vec<u16> = ts
3576                .columns
3577                .iter()
3578                .filter(|c| !c.nullable)
3579                .map(|c| c.position)
3580                .collect();
3581            let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3582            let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3583            let row_fully_overwritten = if any_defaults_flag {
3584                false
3585            } else {
3586                let mut covered: rustc_hash::FxHashSet<usize> =
3587                    col_indices.iter().copied().collect();
3588                covered.extend(generated_col_positions.iter().copied());
3589                for (j, &i) in non_pk_indices.iter().enumerate() {
3590                    let _ = j;
3591                    if matches!(
3592                        ts.columns[i].generated_kind,
3593                        Some(crate::parser::GeneratedKind::Virtual)
3594                    ) {
3595                        covered.insert(i);
3596                    }
3597                }
3598                bind_plan.is_some() && covered.len() == ts.columns.len()
3599            };
3600            let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3601            let mut null_value_slots: Vec<usize> =
3602                dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3603            for (j, &i) in non_pk_indices.iter().enumerate() {
3604                let slot = encoding_positions[j] as usize;
3605                if matches!(
3606                    ts.columns[i].generated_kind,
3607                    Some(crate::parser::GeneratedKind::Virtual)
3608                ) {
3609                    null_value_slots.push(slot);
3610                } else {
3611                    non_virtual_pairs.push((i, slot));
3612                }
3613            }
3614            let row_encoder = {
3615                let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3616                    let col = &ts.columns[i];
3617                    if matches!(
3618                        col.generated_kind,
3619                        Some(crate::parser::GeneratedKind::Virtual)
3620                    ) {
3621                        true
3622                    } else {
3623                        col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3624                    }
3625                });
3626                if all_int_or_null {
3627                    let mut slots: Vec<crate::encoding::TemplateSlot> = (0..phys_count)
3628                        .map(|_| crate::encoding::TemplateSlot::IntHole)
3629                        .collect();
3630                    for &s in &dropped_non_pk_slots {
3631                        slots[s as usize] = crate::encoding::TemplateSlot::Null;
3632                    }
3633                    for (j, &i) in non_pk_indices.iter().enumerate() {
3634                        if matches!(
3635                            ts.columns[i].generated_kind,
3636                            Some(crate::parser::GeneratedKind::Virtual)
3637                        ) {
3638                            slots[encoding_positions[j] as usize] =
3639                                crate::encoding::TemplateSlot::Null;
3640                        }
3641                    }
3642                    Some(crate::encoding::build_row_template(phys_count, &slots))
3643                } else {
3644                    None
3645                }
3646            };
3647            // build_trivial_fast_program rejects any shape it can't compile.
3648            let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3649                && !ts.columns.iter().any(|c| c.default_expr.is_some())
3650                && !ts.has_checks()
3651                && stmt.returning.is_none()
3652                && bind_plan.is_some()
3653                && row_fully_overwritten
3654                && single_int_pk
3655                && generated_fast_evals
3656                    .iter()
3657                    .all(|fe| !matches!(fe, FastGenEval::None));
3658            let trivial_fast_program = if is_trivial_fast_eligible {
3659                build_trivial_fast_program(
3660                    bind_plan.as_ref().unwrap(),
3661                    phys_count,
3662                    &non_virtual_pairs,
3663                    &generated_col_positions,
3664                    &generated_fast_evals,
3665                    ts,
3666                    on_conflict.as_deref(),
3667                )
3668            } else {
3669                None
3670            };
3671            let is_trivial_fast = trivial_fast_program.is_some();
3672            let has_checks = ts.has_checks();
3673            let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3674            let needs_scoped_params = bind_plan.is_none()
3675                || has_checks
3676                || any_defaults
3677                || !generated_col_positions.is_empty()
3678                || on_conflict.is_some()
3679                || stmt.returning.is_some()
3680                || insert_has_subquery(stmt)
3681                || super::helpers::any_partial_index(ts);
3682            Some(InsertCache {
3683                col_indices,
3684                has_subquery: insert_has_subquery(stmt),
3685                any_defaults,
3686                has_checks,
3687                on_conflict,
3688                row_col_map,
3689                generated_col_positions,
3690                generated_fast_evals,
3691                pk_indices,
3692                non_pk_indices,
3693                encoding_positions,
3694                dropped_non_pk_slots,
3695                phys_count,
3696                single_int_pk,
3697                not_null_indices,
3698                bind_plan,
3699                row_fully_overwritten,
3700                row_encoder,
3701                is_trivial_fast,
3702                trivial_fast_program,
3703                needs_scoped_params,
3704            })
3705        } else if schema.get_view(&lower).is_some() {
3706            None
3707        } else {
3708            return None;
3709        };
3710        Some(Self {
3711            table_lower: lower,
3712            cached,
3713        })
3714    }
3715}
3716
3717impl CompiledPlan for CompiledInsert {
3718    fn execute(
3719        &self,
3720        db: &Database,
3721        schema: &SchemaManager,
3722        stmt: &Statement,
3723        params: &[Value],
3724        txn: super::compile::ActiveTxnRef<'_, '_>,
3725    ) -> Result<ExecutionResult> {
3726        let ins = match stmt {
3727            Statement::Insert(i) => i,
3728            _ => {
3729                return Err(SqlError::Unsupported(
3730                    "CompiledInsert received non-INSERT statement".into(),
3731                ))
3732            }
3733        };
3734        use super::compile::ActiveTxnRef;
3735        match txn {
3736            ActiveTxnRef::None => exec_insert(db, schema, ins, params),
3737            ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3738                "cannot execute mutating statement inside a read-only transaction".into(),
3739            )),
3740            ActiveTxnRef::Write(outer) => match self.cached.as_ref() {
3741                Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3742                    exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3743                }),
3744                Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3745                None => exec_insert_in_txn(outer, schema, ins, params),
3746            },
3747        }
3748    }
3749
3750    fn uses_scoped_params(&self) -> bool {
3751        match self.cached.as_ref() {
3752            Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3753            None => true,
3754        }
3755    }
3756}
3757
/// Compiled plan for a `DELETE` statement.
pub struct CompiledDelete {
    /// ASCII-lowercased name of the target table, captured at compile time.
    table_lower: String,
}
3761
3762impl CompiledDelete {
3763    pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3764        let lower = stmt.table.to_ascii_lowercase();
3765        schema.get(&lower)?;
3766        Some(Self { table_lower: lower })
3767    }
3768}
3769
3770impl CompiledPlan for CompiledDelete {
3771    fn execute(
3772        &self,
3773        db: &Database,
3774        schema: &SchemaManager,
3775        stmt: &Statement,
3776        _params: &[Value],
3777        txn: super::compile::ActiveTxnRef<'_, '_>,
3778    ) -> Result<ExecutionResult> {
3779        let del = match stmt {
3780            Statement::Delete(d) => d,
3781            _ => {
3782                return Err(SqlError::Unsupported(
3783                    "CompiledDelete received non-DELETE statement".into(),
3784                ))
3785            }
3786        };
3787        let _ = &self.table_lower;
3788        use super::compile::ActiveTxnRef;
3789        match txn {
3790            ActiveTxnRef::None => super::write::exec_delete(db, schema, del),
3791            ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3792                "cannot execute mutating statement inside a read-only transaction".into(),
3793            )),
3794            ActiveTxnRef::Write(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3795        }
3796    }
3797}
3798
3799fn exec_instead_of_view_insert_auto(
3800    db: &Database,
3801    schema: &SchemaManager,
3802    view_name: &str,
3803    aliases: &[String],
3804    stmt: &InsertStmt,
3805    params: &[Value],
3806) -> Result<ExecutionResult> {
3807    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3808    let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3809    wtx.commit().map_err(SqlError::Storage)?;
3810    Ok(r)
3811}
3812
3813fn exec_instead_of_view_insert_in_txn(
3814    wtx: &mut WriteTxn<'_>,
3815    schema: &SchemaManager,
3816    view_name: &str,
3817    aliases: &[String],
3818    stmt: &InsertStmt,
3819    params: &[Value],
3820) -> Result<ExecutionResult> {
3821    // CREATE VIEW without explicit aliases stores an empty vec; derive at runtime.
3822    let resolved_aliases: Vec<String> = if aliases.is_empty() {
3823        derive_view_columns(wtx, schema, view_name)?
3824    } else {
3825        aliases.to_vec()
3826    };
3827    let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3828    let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3829        .iter()
3830        .enumerate()
3831        .map(|(i, name)| (name.to_ascii_lowercase(), i))
3832        .collect();
3833
3834    let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3835        (0..resolved_aliases.len()).collect()
3836    } else {
3837        stmt.columns
3838            .iter()
3839            .map(|c| {
3840                alias_map
3841                    .get(&c.to_ascii_lowercase())
3842                    .copied()
3843                    .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3844            })
3845            .collect::<Result<_>>()?
3846    };
3847
3848    let source_rows: Vec<Vec<Value>> = match &stmt.source {
3849        InsertSource::Values(rows) => {
3850            let mut out = Vec::with_capacity(rows.len());
3851            for row in rows {
3852                if row.len() != target_positions.len() {
3853                    return Err(SqlError::InvalidValue(format!(
3854                        "expected {} values, got {}",
3855                        target_positions.len(),
3856                        row.len()
3857                    )));
3858                }
3859                let mut vals = Vec::with_capacity(row.len());
3860                for expr in row {
3861                    let v = match expr {
3862                        Expr::Parameter(n) => params
3863                            .get(n - 1)
3864                            .cloned()
3865                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3866                        Expr::Literal(v) => v.clone(),
3867                        other => eval_const_expr(other)?,
3868                    };
3869                    vals.push(v);
3870                }
3871                out.push(vals);
3872            }
3873            out
3874        }
3875        InsertSource::Select(sq) => {
3876            let empty_ctes = CteContext::default();
3877            let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3878            qr.rows
3879        }
3880    };
3881
3882    let mut count: u64 = 0;
3883    for row in source_rows {
3884        if row.len() != target_positions.len() {
3885            return Err(SqlError::InvalidValue(format!(
3886                "expected {} values, got {}",
3887                target_positions.len(),
3888                row.len()
3889            )));
3890        }
3891        let mut new_row = vec![Value::Null; resolved_aliases.len()];
3892        for (slot, val) in target_positions.iter().zip(row) {
3893            new_row[*slot] = val;
3894        }
3895        super::triggers::fire_row_triggers(
3896            wtx,
3897            schema,
3898            view_name,
3899            crate::parser::TriggerTiming::InsteadOf,
3900            super::triggers::FireEvent::Insert,
3901            None,
3902            Some(new_row),
3903            &view_cols,
3904        )?;
3905        count += 1;
3906    }
3907    Ok(ExecutionResult::RowsAffected(count))
3908}
3909
3910fn derive_view_columns(
3911    wtx: &mut WriteTxn<'_>,
3912    schema: &SchemaManager,
3913    view_name: &str,
3914) -> Result<Vec<String>> {
3915    use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3916    let sel = SelectStmt {
3917        columns: vec![SelectColumn::AllColumns],
3918        from: view_name.to_string(),
3919        from_alias: None,
3920        from_subquery: None,
3921        from_args: None,
3922        from_json_table: None,
3923        joins: vec![],
3924        distinct: false,
3925        where_clause: None,
3926        order_by: vec![],
3927        limit: Some(Expr::Literal(Value::Integer(1))),
3928        offset: None,
3929        group_by: vec![],
3930        having: None,
3931    };
3932    let sq = SelectQuery {
3933        ctes: vec![],
3934        recursive: false,
3935        body: QueryBody::Select(Box::new(sel)),
3936    };
3937    let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3938    match qr {
3939        ExecutionResult::Query(q) => Ok(q.columns),
3940        _ => Ok(Vec::new()),
3941    }
3942}
3943
3944#[cfg(test)]
3945#[path = "dml_tests.rs"]
3946mod tests;