Skip to main content

citadel_sql/executor/
dml.rs

1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::write_txn::WriteTxn;
7use rustc_hash::FxHashMap;
8
9use crate::encoding::{encode_composite_key_into, encode_row_into};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
12use crate::parser::*;
13use crate::types::*;
14
15use crate::schema::SchemaManager;
16
17use super::compile::CompiledPlan;
18use super::helpers::*;
19use super::CteContext;
20
21pub(super) fn exec_insert(
22    db: &Database,
23    schema: &SchemaManager,
24    stmt: &InsertStmt,
25    params: &[Value],
26) -> Result<ExecutionResult> {
27    let empty_ctes = CteContext::default();
28    let materialized;
29    let stmt = if insert_has_subquery(stmt) {
30        materialized = materialize_insert(stmt, &mut |sub| {
31            exec_subquery_read(db, schema, sub, &empty_ctes)
32        })?;
33        &materialized
34    } else {
35        stmt
36    };
37
38    let lower_name = stmt.table.to_ascii_lowercase();
39    if let Some(view_def) = schema.get_view(&lower_name) {
40        if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
41        {
42            let aliases = view_def.column_aliases.clone();
43            return exec_instead_of_view_insert_auto(
44                db,
45                schema,
46                &lower_name,
47                &aliases,
48                stmt,
49                params,
50            );
51        }
52        return Err(SqlError::CannotModifyView(stmt.table.clone()));
53    }
54    if schema.get_matview(&lower_name).is_some() {
55        return Err(SqlError::CannotModifyView(format!(
56            "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
57            stmt.table
58        )));
59    }
60    let table_schema = schema
61        .get(&lower_name)
62        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
63
64    let insert_columns = if stmt.columns.is_empty() {
65        table_schema
66            .columns
67            .iter()
68            .map(|c| c.name.clone())
69            .collect::<Vec<_>>()
70    } else {
71        stmt.columns
72            .iter()
73            .map(|c| c.to_ascii_lowercase())
74            .collect()
75    };
76
77    let col_indices: Vec<usize> = insert_columns
78        .iter()
79        .map(|name| {
80            table_schema
81                .column_index(name)
82                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
83        })
84        .collect::<Result<_>>()?;
85
86    for &ci in &col_indices {
87        if table_schema.columns[ci].generated_kind.is_some() {
88            return Err(SqlError::CannotInsertIntoGeneratedColumn(
89                table_schema.columns[ci].name.clone(),
90            ));
91        }
92    }
93
94    let defaults: Vec<(usize, &Expr)> = table_schema
95        .columns
96        .iter()
97        .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
98        .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
99        .collect();
100
101    let generated_cols: Vec<(usize, &Expr)> = table_schema
102        .columns
103        .iter()
104        .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
105        .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
106        .collect();
107
108    let has_checks = table_schema.has_checks();
109    let strict = table_schema.is_strict();
110    let row_col_map_for_gen = if !generated_cols.is_empty() {
111        Some(ColumnMap::new(&table_schema.columns))
112    } else {
113        None
114    };
115    let check_col_map = if has_checks {
116        Some(ColumnMap::new(&table_schema.columns))
117    } else {
118        None
119    };
120
121    let select_rows = match &stmt.source {
122        InsertSource::Select(sq) => {
123            let insert_ctes =
124                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
125                    exec_query_body_read(db, schema, body, ctx)
126                })?;
127            let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
128            Some(qr.rows)
129        }
130        InsertSource::Values(_) => None,
131    };
132
133    let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
134        .on_conflict
135        .as_ref()
136        .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
137        .transpose()?;
138
139    let row_col_map = compiled_conflict
140        .as_ref()
141        .map(|_| ColumnMap::new(&table_schema.columns));
142
143    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
144    let mut count: u64 = 0;
145    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
146        stmt.returning.as_ref().map(|_| Vec::new());
147
148    let pk_indices = table_schema.pk_indices();
149    let non_pk = table_schema.non_pk_indices();
150    let enc_pos = table_schema.encoding_positions();
151    let phys_count = table_schema.physical_non_pk_count();
152    let mut row = vec![Value::Null; table_schema.columns.len()];
153    let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
154    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
155    let mut key_buf: Vec<u8> = Vec::with_capacity(64);
156    let mut value_buf: Vec<u8> = Vec::with_capacity(256);
157    let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
158
159    let values = match &stmt.source {
160        InsertSource::Values(rows) => Some(rows.as_slice()),
161        InsertSource::Select(_) => None,
162    };
163    let sel_rows = select_rows.as_deref();
164
165    let total = match (values, sel_rows) {
166        (Some(rows), _) => rows.len(),
167        (_, Some(rows)) => rows.len(),
168        _ => 0,
169    };
170
171    if let Some(sel) = sel_rows {
172        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
173            return Err(SqlError::InvalidValue(format!(
174                "INSERT ... SELECT column count mismatch: expected {}, got {}",
175                insert_columns.len(),
176                sel[0].len()
177            )));
178        }
179    }
180
181    let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
182        t.enabled
183            && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
184            && t.events
185                .iter()
186                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
187    });
188    let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
189        Vec::with_capacity(total)
190    } else {
191        Vec::new()
192    };
193
194    if has_insert_statement_triggers {
195        super::triggers::fire_statement_triggers(
196            &mut wtx,
197            schema,
198            &table_schema.name,
199            crate::parser::TriggerTiming::Before,
200            super::triggers::FireEvent::Insert,
201            &table_schema.columns,
202            &[],
203            &[],
204        )?;
205    }
206
207    for idx in 0..total {
208        for v in row.iter_mut() {
209            *v = Value::Null;
210        }
211
212        if let Some(value_rows) = values {
213            let value_row = &value_rows[idx];
214            if value_row.len() != insert_columns.len() {
215                return Err(SqlError::InvalidValue(format!(
216                    "expected {} values, got {}",
217                    insert_columns.len(),
218                    value_row.len()
219                )));
220            }
221            for (i, expr) in value_row.iter().enumerate() {
222                let val = if let Expr::Parameter(n) = expr {
223                    params
224                        .get(n - 1)
225                        .cloned()
226                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
227                } else {
228                    eval_const_expr(expr)?
229                };
230                let col_idx = col_indices[i];
231                let col = &table_schema.columns[col_idx];
232                row[col_idx] = if val.is_null() {
233                    Value::Null
234                } else {
235                    coerce_for_column(val, col, strict)?
236                };
237            }
238        } else if let Some(sel) = sel_rows {
239            let sel_row = &sel[idx];
240            for (i, val) in sel_row.iter().enumerate() {
241                let col_idx = col_indices[i];
242                let col = &table_schema.columns[col_idx];
243                row[col_idx] = if val.is_null() {
244                    Value::Null
245                } else {
246                    coerce_for_column(val.clone(), col, strict)?
247                };
248            }
249        }
250
251        for &(pos, def_expr) in &defaults {
252            let val = eval_const_expr(def_expr)?;
253            let col = &table_schema.columns[pos];
254            if !val.is_null() {
255                row[pos] = coerce_for_column(val, col, strict)?;
256            }
257        }
258
259        if let Some(ref gen_map) = row_col_map_for_gen {
260            for &(pos, gen_expr) in &generated_cols {
261                let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
262                let col = &table_schema.columns[pos];
263                row[pos] = if val.is_null() {
264                    Value::Null
265                } else {
266                    coerce_for_column(val, col, strict)?
267                };
268            }
269        }
270
271        for col in &table_schema.columns {
272            if !col.nullable && row[col.position as usize].is_null() {
273                return Err(SqlError::NotNullViolation(col.name.clone()));
274            }
275        }
276
277        if let Some(ref col_map) = check_col_map {
278            for col in &table_schema.columns {
279                if let Some(ref check) = col.check_expr {
280                    let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
281                    if !is_truthy(&result) && !result.is_null() {
282                        let name = col.check_name.as_deref().unwrap_or(&col.name);
283                        return Err(SqlError::CheckViolation(name.to_string()));
284                    }
285                }
286            }
287            for tc in &table_schema.check_constraints {
288                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
289                if !is_truthy(&result) && !result.is_null() {
290                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
291                    return Err(SqlError::CheckViolation(name.to_string()));
292                }
293            }
294        }
295
296        for fk in &table_schema.foreign_keys {
297            let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
298            if any_null {
299                continue; // MATCH SIMPLE: skip if any FK col is NULL
300            }
301            let fk_vals: Vec<Value> = fk
302                .columns
303                .iter()
304                .map(|&ci| row[ci as usize].clone())
305                .collect();
306            fk_key_buf.clear();
307            encode_composite_key_into(&fk_vals, &mut fk_key_buf);
308            if fk.deferrable && fk.initially_deferred {
309                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
310                wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
311                    fk_name: name,
312                    foreign_table: fk.foreign_table.as_bytes().to_vec(),
313                    parent_key: fk_key_buf.clone(),
314                });
315                continue;
316            }
317            if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
318                let found = wtx
319                    .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
320                    .map_err(SqlError::Storage)?;
321                if found.is_none() {
322                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
323                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
324                }
325                wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
326            }
327        }
328
329        let proposed_row_for_returning: Option<Vec<Value>> =
330            returning_rows.as_ref().map(|_| row.clone());
331        let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
332            Some(row.clone())
333        } else {
334            None
335        };
336
337        let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
338            t.enabled
339                && t.timing == crate::parser::TriggerTiming::Before
340                && t.granularity == crate::parser::TriggerGranularity::ForEachRow
341                && t.events
342                    .iter()
343                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
344        });
345        if has_before_insert_triggers {
346            super::triggers::fire_row_triggers(
347                &mut wtx,
348                schema,
349                &table_schema.name,
350                crate::parser::TriggerTiming::Before,
351                super::triggers::FireEvent::Insert,
352                None,
353                Some(row.clone()),
354                &table_schema.columns,
355            )?;
356        }
357
358        for (j, &i) in pk_indices.iter().enumerate() {
359            pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
360        }
361        encode_composite_key_into(&pk_values, &mut key_buf);
362
363        for (j, &i) in non_pk.iter().enumerate() {
364            let col = &table_schema.columns[i];
365            if matches!(
366                col.generated_kind,
367                Some(crate::parser::GeneratedKind::Virtual)
368            ) {
369                value_values[enc_pos[j] as usize] = Value::Null;
370                row[i] = Value::Null;
371            } else {
372                value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
373            }
374        }
375        encode_row_into(&value_values, &mut value_buf);
376
377        if key_buf.len() > citadel_core::MAX_KEY_SIZE {
378            return Err(SqlError::KeyTooLarge {
379                size: key_buf.len(),
380                max: citadel_core::MAX_KEY_SIZE,
381            });
382        }
383        if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
384            return Err(SqlError::RowTooLarge {
385                size: value_buf.len(),
386                max: citadel_core::MAX_VALUE_SIZE,
387            });
388        }
389
390        match compiled_conflict.as_ref() {
391            None => {
392                let is_new = wtx
393                    .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
394                    .map_err(SqlError::Storage)?;
395                if !is_new {
396                    return Err(SqlError::DuplicateKey);
397                }
398                let has_after_insert_triggers =
399                    schema.triggers_for(&table_schema.name).iter().any(|t| {
400                        t.enabled
401                            && t.timing == crate::parser::TriggerTiming::After
402                            && t.granularity == crate::parser::TriggerGranularity::ForEachRow
403                            && t.events
404                                .iter()
405                                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
406                    });
407                if !table_schema.indices.is_empty() || has_after_insert_triggers {
408                    for (j, &i) in pk_indices.iter().enumerate() {
409                        row[i] = pk_values[j].clone();
410                    }
411                    for (j, &i) in non_pk.iter().enumerate() {
412                        row[i] =
413                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
414                    }
415                    if !table_schema.indices.is_empty() {
416                        insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
417                    }
418                    if has_after_insert_triggers {
419                        super::triggers::fire_row_triggers(
420                            &mut wtx,
421                            schema,
422                            &table_schema.name,
423                            crate::parser::TriggerTiming::After,
424                            super::triggers::FireEvent::Insert,
425                            None,
426                            Some(row.clone()),
427                            &table_schema.columns,
428                        )?;
429                    }
430                }
431                if let Some(r) = row_for_stmt_trigger.clone() {
432                    stmt_new_rows.push(r);
433                }
434                count += 1;
435                if let Some(buf) = returning_rows.as_mut() {
436                    buf.push((None, proposed_row_for_returning));
437                }
438            }
439            Some(oc) => {
440                let oc_ref: &CompiledOnConflict = oc;
441                let needs_row = upsert_needs_row(oc_ref, table_schema);
442                if needs_row {
443                    for (j, &i) in pk_indices.iter().enumerate() {
444                        row[i] = pk_values[j].clone();
445                    }
446                    for (j, &i) in non_pk.iter().enumerate() {
447                        row[i] =
448                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
449                    }
450                }
451                let outcome = apply_insert_with_conflict(
452                    &mut wtx,
453                    table_schema,
454                    &key_buf,
455                    &value_buf,
456                    &row,
457                    &pk_values,
458                    oc_ref,
459                    row_col_map.as_ref().unwrap(),
460                    stmt.returning.is_some(),
461                )?;
462                match outcome {
463                    InsertRowOutcome::Inserted => {
464                        count += 1;
465                        if let Some(buf) = returning_rows.as_mut() {
466                            buf.push((None, proposed_row_for_returning));
467                        }
468                        if let Some(r) = row_for_stmt_trigger.clone() {
469                            stmt_new_rows.push(r);
470                        }
471                        let has_after_insert_triggers =
472                            schema.triggers_for(&table_schema.name).iter().any(|t| {
473                                t.enabled
474                                    && t.timing == crate::parser::TriggerTiming::After
475                                    && t.granularity
476                                        == crate::parser::TriggerGranularity::ForEachRow
477                                    && t.events
478                                        .iter()
479                                        .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
480                            });
481                        if has_after_insert_triggers {
482                            super::triggers::fire_row_triggers(
483                                &mut wtx,
484                                schema,
485                                &table_schema.name,
486                                crate::parser::TriggerTiming::After,
487                                super::triggers::FireEvent::Insert,
488                                None,
489                                Some(row.clone()),
490                                &table_schema.columns,
491                            )?;
492                        }
493                    }
494                    InsertRowOutcome::Updated { old, new } => {
495                        count += 1;
496                        if let Some(buf) = returning_rows.as_mut() {
497                            buf.push((Some(old.clone()), Some(new.clone())));
498                        }
499                        let has_after_update_triggers =
500                            schema.triggers_for(&table_schema.name).iter().any(|t| {
501                                t.enabled
502                                    && t.timing == crate::parser::TriggerTiming::After
503                                    && t.granularity
504                                        == crate::parser::TriggerGranularity::ForEachRow
505                                    && t.events.iter().any(|e| {
506                                        matches!(e, crate::parser::TriggerEvent::Update(_))
507                                    })
508                            });
509                        if has_after_update_triggers {
510                            let changed_cols: Vec<String> = match oc_ref {
511                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
512                                    .iter()
513                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
514                                    .collect(),
515                                _ => Vec::new(),
516                            };
517                            super::triggers::fire_row_triggers(
518                                &mut wtx,
519                                schema,
520                                &table_schema.name,
521                                crate::parser::TriggerTiming::After,
522                                super::triggers::FireEvent::Update {
523                                    changed_columns: &changed_cols,
524                                },
525                                Some(old),
526                                Some(new),
527                                &table_schema.columns,
528                            )?;
529                        }
530                    }
531                    InsertRowOutcome::Skipped => {}
532                }
533            }
534        }
535    }
536
537    if has_insert_statement_triggers {
538        super::triggers::fire_statement_triggers(
539            &mut wtx,
540            schema,
541            &table_schema.name,
542            crate::parser::TriggerTiming::After,
543            super::triggers::FireEvent::Insert,
544            &table_schema.columns,
545            &[],
546            &stmt_new_rows,
547        )?;
548    }
549
550    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
551        let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
552        super::helpers::drain_deferred_fk_checks(&mut wtx)?;
553        wtx.commit().map_err(SqlError::Storage)?;
554        return Ok(ExecutionResult::Query(qr));
555    }
556
557    super::helpers::drain_deferred_fk_checks(&mut wtx)?;
558    wtx.commit().map_err(SqlError::Storage)?;
559    Ok(ExecutionResult::RowsAffected(count))
560}
561
562pub(super) fn has_subquery(expr: &Expr) -> bool {
563    crate::parser::has_subquery(expr)
564}
565
566pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
567    if let Some(ref w) = stmt.where_clause {
568        if has_subquery(w) {
569            return true;
570        }
571    }
572    if let Some(ref h) = stmt.having {
573        if has_subquery(h) {
574            return true;
575        }
576    }
577    for col in &stmt.columns {
578        if let SelectColumn::Expr { expr, .. } = col {
579            if has_subquery(expr) {
580                return true;
581            }
582        }
583    }
584    for ob in &stmt.order_by {
585        if has_subquery(&ob.expr) {
586            return true;
587        }
588    }
589    for join in &stmt.joins {
590        if let Some(ref on_expr) = join.on_clause {
591            if has_subquery(on_expr) {
592                return true;
593            }
594        }
595    }
596    false
597}
598
599pub(super) fn materialize_expr(
600    expr: &Expr,
601    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
602) -> Result<Expr> {
603    match expr {
604        Expr::InSubquery {
605            expr: e,
606            subquery,
607            negated,
608        } => {
609            let inner = materialize_expr(e, exec_sub)?;
610            let qr = exec_sub(subquery)?;
611            if !qr.columns.is_empty() && qr.columns.len() != 1 {
612                return Err(SqlError::SubqueryMultipleColumns);
613            }
614            let mut values = rustc_hash::FxHashSet::default();
615            let mut has_null = false;
616            for row in &qr.rows {
617                if row[0].is_null() {
618                    has_null = true;
619                } else {
620                    values.insert(row[0].clone());
621                }
622            }
623            Ok(Expr::InSet {
624                expr: Box::new(inner),
625                values,
626                has_null,
627                negated: *negated,
628            })
629        }
630        Expr::ScalarSubquery(subquery) => {
631            let qr = exec_sub(subquery)?;
632            if qr.rows.len() > 1 {
633                return Err(SqlError::SubqueryMultipleRows);
634            }
635            let val = if qr.rows.is_empty() {
636                Value::Null
637            } else {
638                qr.rows[0][0].clone()
639            };
640            Ok(Expr::Literal(val))
641        }
642        Expr::Exists { subquery, negated } => {
643            let qr = exec_sub(subquery)?;
644            let exists = !qr.rows.is_empty();
645            let result = if *negated { !exists } else { exists };
646            Ok(Expr::Literal(Value::Boolean(result)))
647        }
648        Expr::InList {
649            expr: e,
650            list,
651            negated,
652        } => {
653            let inner = materialize_expr(e, exec_sub)?;
654            let items = list
655                .iter()
656                .map(|item| materialize_expr(item, exec_sub))
657                .collect::<Result<Vec<_>>>()?;
658            Ok(Expr::InList {
659                expr: Box::new(inner),
660                list: items,
661                negated: *negated,
662            })
663        }
664        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
665            left: Box::new(materialize_expr(left, exec_sub)?),
666            op: *op,
667            right: Box::new(materialize_expr(right, exec_sub)?),
668        }),
669        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
670            op: *op,
671            expr: Box::new(materialize_expr(e, exec_sub)?),
672        }),
673        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
674        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
675        Expr::InSet {
676            expr: e,
677            values,
678            has_null,
679            negated,
680        } => Ok(Expr::InSet {
681            expr: Box::new(materialize_expr(e, exec_sub)?),
682            values: values.clone(),
683            has_null: *has_null,
684            negated: *negated,
685        }),
686        Expr::Between {
687            expr: e,
688            low,
689            high,
690            negated,
691        } => Ok(Expr::Between {
692            expr: Box::new(materialize_expr(e, exec_sub)?),
693            low: Box::new(materialize_expr(low, exec_sub)?),
694            high: Box::new(materialize_expr(high, exec_sub)?),
695            negated: *negated,
696        }),
697        Expr::Like {
698            expr: e,
699            pattern,
700            escape,
701            negated,
702        } => {
703            let esc = escape
704                .as_ref()
705                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
706                .transpose()?;
707            Ok(Expr::Like {
708                expr: Box::new(materialize_expr(e, exec_sub)?),
709                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
710                escape: esc,
711                negated: *negated,
712            })
713        }
714        Expr::Case {
715            operand,
716            conditions,
717            else_result,
718        } => {
719            let op = operand
720                .as_ref()
721                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
722                .transpose()?;
723            let conds = conditions
724                .iter()
725                .map(|(c, r)| {
726                    Ok((
727                        materialize_expr(c, exec_sub)?,
728                        materialize_expr(r, exec_sub)?,
729                    ))
730                })
731                .collect::<Result<Vec<_>>>()?;
732            let else_r = else_result
733                .as_ref()
734                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
735                .transpose()?;
736            Ok(Expr::Case {
737                operand: op,
738                conditions: conds,
739                else_result: else_r,
740            })
741        }
742        Expr::Coalesce(args) => {
743            let materialized = args
744                .iter()
745                .map(|a| materialize_expr(a, exec_sub))
746                .collect::<Result<Vec<_>>>()?;
747            Ok(Expr::Coalesce(materialized))
748        }
749        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
750            expr: Box::new(materialize_expr(e, exec_sub)?),
751            data_type: *data_type,
752        }),
753        Expr::Function {
754            name,
755            args,
756            distinct,
757        } => {
758            let materialized = args
759                .iter()
760                .map(|a| materialize_expr(a, exec_sub))
761                .collect::<Result<Vec<_>>>()?;
762            Ok(Expr::Function {
763                name: name.clone(),
764                args: materialized,
765                distinct: *distinct,
766            })
767        }
768        other => Ok(other.clone()),
769    }
770}
771
772pub(super) fn materialize_stmt(
773    stmt: &SelectStmt,
774    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
775) -> Result<SelectStmt> {
776    let where_clause = stmt
777        .where_clause
778        .as_ref()
779        .map(|e| materialize_expr(e, exec_sub))
780        .transpose()?;
781    let having = stmt
782        .having
783        .as_ref()
784        .map(|e| materialize_expr(e, exec_sub))
785        .transpose()?;
786    let columns = stmt
787        .columns
788        .iter()
789        .map(|c| match c {
790            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
791            SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
792            SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
793            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
794                expr: materialize_expr(expr, exec_sub)?,
795                alias: alias.clone(),
796            }),
797        })
798        .collect::<Result<Vec<_>>>()?;
799    let order_by = stmt
800        .order_by
801        .iter()
802        .map(|ob| {
803            Ok(OrderByItem {
804                expr: materialize_expr(&ob.expr, exec_sub)?,
805                descending: ob.descending,
806                nulls_first: ob.nulls_first,
807            })
808        })
809        .collect::<Result<Vec<_>>>()?;
810    let joins = stmt
811        .joins
812        .iter()
813        .map(|j| {
814            let on_clause = j
815                .on_clause
816                .as_ref()
817                .map(|e| materialize_expr(e, exec_sub))
818                .transpose()?;
819            Ok(JoinClause {
820                join_type: j.join_type,
821                table: j.table.clone(),
822                subquery: j.subquery.clone(),
823                on_clause,
824            })
825        })
826        .collect::<Result<Vec<_>>>()?;
827    let group_by = stmt
828        .group_by
829        .iter()
830        .map(|e| materialize_expr(e, exec_sub))
831        .collect::<Result<Vec<_>>>()?;
832    Ok(SelectStmt {
833        columns,
834        from: stmt.from.clone(),
835        from_alias: stmt.from_alias.clone(),
836        from_subquery: stmt.from_subquery.clone(),
837        from_args: stmt.from_args.clone(),
838        from_json_table: stmt.from_json_table.clone(),
839        joins,
840        distinct: stmt.distinct,
841        where_clause,
842        order_by,
843        limit: stmt.limit.clone(),
844        offset: stmt.offset.clone(),
845        group_by,
846        having,
847    })
848}
849
850pub(super) fn exec_subquery_read(
851    db: &Database,
852    schema: &SchemaManager,
853    stmt: &SelectStmt,
854    ctes: &CteContext,
855) -> Result<QueryResult> {
856    match super::exec_select(db, schema, stmt, ctes)? {
857        ExecutionResult::Query(qr) => Ok(qr),
858        _ => Ok(QueryResult {
859            columns: vec![],
860            rows: vec![],
861        }),
862    }
863}
864
865pub(super) fn exec_subquery_write(
866    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
867    schema: &SchemaManager,
868    stmt: &SelectStmt,
869    ctes: &CteContext,
870) -> Result<QueryResult> {
871    match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
872        ExecutionResult::Query(qr) => Ok(qr),
873        _ => Ok(QueryResult {
874            columns: vec![],
875            rows: vec![],
876        }),
877    }
878}
879
880pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
881    stmt.where_clause.as_ref().is_some_and(has_subquery)
882        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
883}
884
885pub(super) fn materialize_update(
886    stmt: &UpdateStmt,
887    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
888) -> Result<UpdateStmt> {
889    let where_clause = stmt
890        .where_clause
891        .as_ref()
892        .map(|e| materialize_expr(e, exec_sub))
893        .transpose()?;
894    let assignments = stmt
895        .assignments
896        .iter()
897        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
898        .collect::<Result<Vec<_>>>()?;
899    Ok(UpdateStmt {
900        table: stmt.table.clone(),
901        assignments,
902        where_clause,
903        returning: stmt.returning.clone(),
904    })
905}
906
907pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
908    stmt.where_clause.as_ref().is_some_and(has_subquery)
909}
910
911pub(super) fn materialize_delete(
912    stmt: &DeleteStmt,
913    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
914) -> Result<DeleteStmt> {
915    let where_clause = stmt
916        .where_clause
917        .as_ref()
918        .map(|e| materialize_expr(e, exec_sub))
919        .transpose()?;
920    Ok(DeleteStmt {
921        table: stmt.table.clone(),
922        where_clause,
923        returning: stmt.returning.clone(),
924    })
925}
926
927pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
928    match &stmt.source {
929        InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
930        // SELECT source subqueries are handled by exec_select's correlated/non-correlated paths
931        InsertSource::Select(_) => false,
932    }
933}
934
935pub(super) fn materialize_insert(
936    stmt: &InsertStmt,
937    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
938) -> Result<InsertStmt> {
939    let source = match &stmt.source {
940        InsertSource::Values(rows) => {
941            let mat = rows
942                .iter()
943                .map(|row| {
944                    row.iter()
945                        .map(|e| materialize_expr(e, exec_sub))
946                        .collect::<Result<Vec<_>>>()
947                })
948                .collect::<Result<Vec<_>>>()?;
949            InsertSource::Values(mat)
950        }
951        InsertSource::Select(sq) => {
952            let ctes = sq
953                .ctes
954                .iter()
955                .map(|c| {
956                    Ok(CteDefinition {
957                        name: c.name.clone(),
958                        column_aliases: c.column_aliases.clone(),
959                        body: materialize_query_body(&c.body, exec_sub)?,
960                    })
961                })
962                .collect::<Result<Vec<_>>>()?;
963            let body = materialize_query_body(&sq.body, exec_sub)?;
964            InsertSource::Select(Box::new(SelectQuery {
965                ctes,
966                recursive: sq.recursive,
967                body,
968            }))
969        }
970    };
971    Ok(InsertStmt {
972        table: stmt.table.clone(),
973        columns: stmt.columns.clone(),
974        source,
975        on_conflict: stmt.on_conflict.clone(),
976        returning: stmt.returning.clone(),
977    })
978}
979
980pub(super) fn materialize_query_body(
981    body: &QueryBody,
982    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
983) -> Result<QueryBody> {
984    match body {
985        QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
986            sel, exec_sub,
987        )?))),
988        QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
989            op: comp.op.clone(),
990            all: comp.all,
991            left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
992            right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
993            order_by: comp.order_by.clone(),
994            limit: comp.limit.clone(),
995            offset: comp.offset.clone(),
996        }))),
997        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
998    }
999}
1000
1001pub(super) fn exec_query_body(
1002    db: &Database,
1003    schema: &SchemaManager,
1004    body: &QueryBody,
1005    ctes: &CteContext,
1006) -> Result<ExecutionResult> {
1007    match body {
1008        QueryBody::Select(sel) => super::exec_select(db, schema, sel, ctes),
1009        QueryBody::Compound(comp) => exec_compound_select(db, schema, comp, ctes),
1010        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1011            SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1012        ),
1013    }
1014}
1015
1016pub(super) fn exec_query_body_in_txn(
1017    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1018    schema: &SchemaManager,
1019    body: &QueryBody,
1020    ctes: &CteContext,
1021) -> Result<ExecutionResult> {
1022    match body {
1023        QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1024        QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1025        QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1026        QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1027        QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1028    }
1029}
1030
1031pub(super) fn exec_query_body_read(
1032    db: &Database,
1033    schema: &SchemaManager,
1034    body: &QueryBody,
1035    ctes: &CteContext,
1036) -> Result<QueryResult> {
1037    match exec_query_body(db, schema, body, ctes)? {
1038        ExecutionResult::Query(qr) => Ok(qr),
1039        _ => Ok(QueryResult {
1040            columns: vec![],
1041            rows: vec![],
1042        }),
1043    }
1044}
1045
1046pub(super) fn exec_query_body_write(
1047    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1048    schema: &SchemaManager,
1049    body: &QueryBody,
1050    ctes: &CteContext,
1051) -> Result<QueryResult> {
1052    match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1053        ExecutionResult::Query(qr) => Ok(qr),
1054        _ => Ok(QueryResult {
1055            columns: vec![],
1056            rows: vec![],
1057        }),
1058    }
1059}
1060
1061pub(super) fn exec_compound_select(
1062    db: &Database,
1063    schema: &SchemaManager,
1064    comp: &CompoundSelect,
1065    ctes: &CteContext,
1066) -> Result<ExecutionResult> {
1067    let left_qr = match exec_query_body(db, schema, &comp.left, ctes)? {
1068        ExecutionResult::Query(qr) => qr,
1069        _ => QueryResult {
1070            columns: vec![],
1071            rows: vec![],
1072        },
1073    };
1074    let right_qr = match exec_query_body(db, schema, &comp.right, ctes)? {
1075        ExecutionResult::Query(qr) => qr,
1076        _ => QueryResult {
1077            columns: vec![],
1078            rows: vec![],
1079        },
1080    };
1081    apply_set_operation(comp, left_qr, right_qr)
1082}
1083
1084pub(super) fn exec_compound_select_in_txn(
1085    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1086    schema: &SchemaManager,
1087    comp: &CompoundSelect,
1088    ctes: &CteContext,
1089) -> Result<ExecutionResult> {
1090    let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1091        ExecutionResult::Query(qr) => qr,
1092        _ => QueryResult {
1093            columns: vec![],
1094            rows: vec![],
1095        },
1096    };
1097    let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1098        ExecutionResult::Query(qr) => qr,
1099        _ => QueryResult {
1100            columns: vec![],
1101            rows: vec![],
1102        },
1103    };
1104    apply_set_operation(comp, left_qr, right_qr)
1105}
1106
1107pub(super) fn apply_set_operation(
1108    comp: &CompoundSelect,
1109    left_qr: QueryResult,
1110    right_qr: QueryResult,
1111) -> Result<ExecutionResult> {
1112    if !left_qr.columns.is_empty()
1113        && !right_qr.columns.is_empty()
1114        && left_qr.columns.len() != right_qr.columns.len()
1115    {
1116        return Err(SqlError::CompoundColumnCountMismatch {
1117            left: left_qr.columns.len(),
1118            right: right_qr.columns.len(),
1119        });
1120    }
1121
1122    let columns = left_qr.columns;
1123
1124    let mut rows = match (&comp.op, comp.all) {
1125        (SetOp::Union, true) => {
1126            let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1127            let mut rows = Vec::with_capacity(total);
1128            rows.extend(left_qr.rows);
1129            rows.extend(right_qr.rows);
1130            rows
1131        }
1132        (SetOp::Union, false) => {
1133            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1134            let mut rows = Vec::new();
1135            for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1136                if !seen.contains(&row) {
1137                    seen.insert(row.clone());
1138                    rows.push(row);
1139                }
1140            }
1141            rows
1142        }
1143        (SetOp::Intersect, true) => {
1144            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1145            for row in &right_qr.rows {
1146                *right_counts.entry(row.clone()).or_insert(0) += 1;
1147            }
1148            let mut rows = Vec::new();
1149            for row in left_qr.rows {
1150                if let Some(count) = right_counts.get_mut(&row) {
1151                    if *count > 0 {
1152                        *count -= 1;
1153                        rows.push(row);
1154                    }
1155                }
1156            }
1157            rows
1158        }
1159        (SetOp::Intersect, false) => {
1160            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1161            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1162            let mut rows = Vec::new();
1163            for row in left_qr.rows {
1164                if right_set.contains(&row) && !seen.contains(&row) {
1165                    seen.insert(row.clone());
1166                    rows.push(row);
1167                }
1168            }
1169            rows
1170        }
1171        (SetOp::Except, true) => {
1172            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1173            for row in &right_qr.rows {
1174                *right_counts.entry(row.clone()).or_insert(0) += 1;
1175            }
1176            let mut rows = Vec::new();
1177            for row in left_qr.rows {
1178                if let Some(count) = right_counts.get_mut(&row) {
1179                    if *count > 0 {
1180                        *count -= 1;
1181                        continue;
1182                    }
1183                }
1184                rows.push(row);
1185            }
1186            rows
1187        }
1188        (SetOp::Except, false) => {
1189            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1190            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1191            let mut rows = Vec::new();
1192            for row in left_qr.rows {
1193                if !right_set.contains(&row) && !seen.contains(&row) {
1194                    seen.insert(row.clone());
1195                    rows.push(row);
1196                }
1197            }
1198            rows
1199        }
1200    };
1201
1202    if !comp.order_by.is_empty() {
1203        let col_defs: Vec<crate::types::ColumnDef> = columns
1204            .iter()
1205            .enumerate()
1206            .map(|(i, name)| crate::types::ColumnDef {
1207                name: name.clone(),
1208                data_type: crate::types::DataType::Null,
1209                nullable: true,
1210                position: i as u16,
1211                default_expr: None,
1212                default_sql: None,
1213                check_expr: None,
1214                check_sql: None,
1215                check_name: None,
1216                is_with_timezone: false,
1217                generated_expr: None,
1218                generated_sql: None,
1219                generated_kind: None,
1220                collation: crate::types::Collation::Binary,
1221            })
1222            .collect();
1223        sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1224    }
1225
1226    if let Some(ref offset_expr) = comp.offset {
1227        let offset = eval_const_int(offset_expr)?.max(0) as usize;
1228        if offset < rows.len() {
1229            rows = rows.split_off(offset);
1230        } else {
1231            rows.clear();
1232        }
1233    }
1234
1235    if let Some(ref limit_expr) = comp.limit {
1236        let limit = eval_const_int(limit_expr)?.max(0) as usize;
1237        rows.truncate(limit);
1238    }
1239
1240    Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1241}
1242
1243struct InsertBufs {
1244    row: Vec<Value>,
1245    pk_values: Vec<Value>,
1246    value_values: Vec<Value>,
1247    key_buf: Vec<u8>,
1248    value_buf: Vec<u8>,
1249    col_indices: Vec<usize>,
1250    fk_key_buf: Vec<u8>,
1251}
1252
1253impl InsertBufs {
1254    fn new() -> Self {
1255        Self {
1256            row: Vec::new(),
1257            pk_values: Vec::new(),
1258            value_values: Vec::new(),
1259            key_buf: Vec::with_capacity(64),
1260            value_buf: Vec::with_capacity(256),
1261            col_indices: Vec::new(),
1262            fk_key_buf: Vec::with_capacity(64),
1263        }
1264    }
1265}
1266
1267thread_local! {
1268    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1269    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1270}
1271
1272fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1273    INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1274        Ok(mut borrowed) => f(&mut borrowed),
1275        Err(_) => {
1276            let mut local = InsertBufs::new();
1277            f(&mut local)
1278        }
1279    })
1280}
1281
1282pub(super) struct UpsertBufs {
1283    old_row: Vec<Value>,
1284    new_row: Vec<Value>,
1285    value_values: Vec<Value>,
1286    new_value_buf: Vec<u8>,
1287}
1288
1289impl UpsertBufs {
1290    pub(super) fn new() -> Self {
1291        Self {
1292            old_row: Vec::new(),
1293            new_row: Vec::new(),
1294            value_values: Vec::new(),
1295            new_value_buf: Vec::with_capacity(256),
1296        }
1297    }
1298}
1299
1300pub fn exec_insert_in_txn(
1301    wtx: &mut WriteTxn<'_>,
1302    schema: &SchemaManager,
1303    stmt: &InsertStmt,
1304    params: &[Value],
1305) -> Result<ExecutionResult> {
1306    with_insert_scratch(|bufs| {
1307        exec_insert_in_txn_impl(
1308            wtx,
1309            schema,
1310            stmt,
1311            params,
1312            bufs,
1313            None,
1314            &CteContext::default(),
1315        )
1316    })
1317}
1318
1319pub(super) fn exec_insert_in_txn_with_ctes(
1320    wtx: &mut WriteTxn<'_>,
1321    schema: &SchemaManager,
1322    stmt: &InsertStmt,
1323    params: &[Value],
1324    outer_ctes: &CteContext,
1325) -> Result<ExecutionResult> {
1326    with_insert_scratch(|bufs| {
1327        exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1328    })
1329}
1330
1331fn exec_insert_in_txn_cached(
1332    wtx: &mut WriteTxn<'_>,
1333    schema: &SchemaManager,
1334    stmt: &InsertStmt,
1335    params: &[Value],
1336    cache: &InsertCache,
1337) -> Result<ExecutionResult> {
1338    with_insert_scratch(|bufs| {
1339        exec_insert_in_txn_impl(
1340            wtx,
1341            schema,
1342            stmt,
1343            params,
1344            bufs,
1345            Some(cache),
1346            &CteContext::default(),
1347        )
1348    })
1349}
1350
1351fn exec_insert_in_txn_impl(
1352    wtx: &mut WriteTxn<'_>,
1353    schema: &SchemaManager,
1354    stmt: &InsertStmt,
1355    params: &[Value],
1356    bufs: &mut InsertBufs,
1357    cache: Option<&InsertCache>,
1358    outer_ctes: &CteContext,
1359) -> Result<ExecutionResult> {
1360    let empty_ctes = CteContext::default();
1361    let materialized;
1362    let has_sub = match cache {
1363        Some(c) => c.has_subquery,
1364        None => insert_has_subquery(stmt),
1365    };
1366    let stmt = if has_sub {
1367        materialized = materialize_insert(stmt, &mut |sub| {
1368            exec_subquery_write(wtx, schema, sub, &empty_ctes)
1369        })?;
1370        &materialized
1371    } else {
1372        stmt
1373    };
1374
1375    let view_lookup_key = stmt.table.to_ascii_lowercase();
1376    if let Some(view_def) = schema.get_view(&view_lookup_key) {
1377        if super::triggers::has_instead_of(
1378            schema,
1379            &view_lookup_key,
1380            super::triggers::FireEvent::Insert,
1381        ) {
1382            let aliases = view_def.column_aliases.clone();
1383            return exec_instead_of_view_insert_in_txn(
1384                wtx,
1385                schema,
1386                &view_lookup_key,
1387                &aliases,
1388                stmt,
1389                params,
1390            );
1391        }
1392        return Err(SqlError::CannotModifyView(stmt.table.clone()));
1393    }
1394
1395    let table_schema = schema
1396        .get(&stmt.table)
1397        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1398
1399    let default_columns;
1400    let insert_columns: &[String] = if stmt.columns.is_empty() {
1401        default_columns = table_schema
1402            .columns
1403            .iter()
1404            .map(|c| c.name.clone())
1405            .collect::<Vec<_>>();
1406        &default_columns
1407    } else {
1408        &stmt.columns
1409    };
1410
1411    bufs.col_indices.clear();
1412    if let Some(c) = cache {
1413        bufs.col_indices.extend_from_slice(&c.col_indices);
1414    } else {
1415        for name in insert_columns {
1416            bufs.col_indices.push(
1417                table_schema
1418                    .column_index(name)
1419                    .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1420            );
1421        }
1422    }
1423
1424    if cache.is_none() {
1425        for &ci in &bufs.col_indices {
1426            if table_schema.columns[ci].generated_kind.is_some() {
1427                return Err(SqlError::CannotInsertIntoGeneratedColumn(
1428                    table_schema.columns[ci].name.clone(),
1429                ));
1430            }
1431        }
1432    }
1433
1434    let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1435    let cached_gen_positions: &[usize];
1436    let cached_gen_fast_evals: &[FastGenEval];
1437    if let Some(c) = cache {
1438        cached_gen_positions = &c.generated_col_positions;
1439        cached_gen_fast_evals = &c.generated_fast_evals;
1440        generated_cols_uncached = Vec::new();
1441    } else {
1442        cached_gen_positions = &[];
1443        cached_gen_fast_evals = &[];
1444        generated_cols_uncached = table_schema
1445            .columns
1446            .iter()
1447            .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1448            .map(|c| {
1449                let expr = c.generated_expr.as_ref().unwrap();
1450                let fe = detect_fast_gen_eval(expr, table_schema);
1451                (c.position as usize, expr, fe)
1452            })
1453            .collect();
1454    }
1455    let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1456    let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1457        None
1458    } else {
1459        Some(ColumnMap::new(&table_schema.columns))
1460    };
1461    let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1462        None
1463    } else if let Some(c) = cache {
1464        c.row_col_map.as_ref()
1465    } else {
1466        row_col_map_for_gen_owned.as_ref()
1467    };
1468
1469    let any_defaults = match cache {
1470        Some(c) => c.any_defaults,
1471        None => table_schema
1472            .columns
1473            .iter()
1474            .any(|c| c.default_expr.is_some()),
1475    };
1476    let defaults: Vec<(usize, &Expr)> = if any_defaults {
1477        table_schema
1478            .columns
1479            .iter()
1480            .filter(|c| {
1481                c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1482            })
1483            .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1484            .collect()
1485    } else {
1486        Vec::new()
1487    };
1488
1489    let has_checks = match cache {
1490        Some(c) => c.has_checks,
1491        None => table_schema.has_checks(),
1492    };
1493    let check_col_map = if has_checks {
1494        Some(ColumnMap::new(&table_schema.columns))
1495    } else {
1496        None
1497    };
1498
1499    let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1500        &[usize],
1501        &[usize],
1502        &[u16],
1503        usize,
1504        &[u16],
1505    ) = if let Some(c) = cache {
1506        (
1507            &c.pk_indices,
1508            &c.non_pk_indices,
1509            &c.encoding_positions,
1510            c.phys_count,
1511            &c.dropped_non_pk_slots,
1512        )
1513    } else {
1514        (
1515            table_schema.pk_indices(),
1516            table_schema.non_pk_indices(),
1517            table_schema.encoding_positions(),
1518            table_schema.physical_non_pk_count(),
1519            table_schema.dropped_non_pk_slots(),
1520        )
1521    };
1522
1523    bufs.row.resize(table_schema.columns.len(), Value::Null);
1524    bufs.pk_values.resize(pk_indices.len(), Value::Null);
1525    bufs.value_values.resize(phys_count, Value::Null);
1526
1527    let table_bytes = table_schema.name.as_bytes();
1528    let has_fks = !table_schema.foreign_keys.is_empty();
1529    let has_indices = !table_schema.indices.is_empty();
1530    let has_defaults = !defaults.is_empty();
1531
1532    let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1533        (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1534        (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1535        (_, None) => None,
1536    };
1537
1538    let row_col_map_owned: Option<ColumnMap> =
1539        if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1540            Some(ColumnMap::new(&table_schema.columns))
1541        } else {
1542            None
1543        };
1544    let row_col_map: Option<&ColumnMap> = cache
1545        .and_then(|c| c.row_col_map.as_ref())
1546        .or(row_col_map_owned.as_ref());
1547
1548    let select_rows = match &stmt.source {
1549        InsertSource::Select(sq) => {
1550            let insert_ctes = super::materialize_all_ctes_with_outer(
1551                &sq.ctes,
1552                sq.recursive,
1553                outer_ctes,
1554                &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1555            )?;
1556            let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1557            Some(qr.rows)
1558        }
1559        InsertSource::Values(_) => None,
1560    };
1561
1562    let mut count: u64 = 0;
1563    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1564        stmt.returning.as_ref().map(|_| Vec::new());
1565
1566    let values = match &stmt.source {
1567        InsertSource::Values(rows) => Some(rows.as_slice()),
1568        InsertSource::Select(_) => None,
1569    };
1570    let sel_rows = select_rows.as_deref();
1571
1572    let total = match (values, sel_rows) {
1573        (Some(rows), _) => rows.len(),
1574        (_, Some(rows)) => rows.len(),
1575        _ => 0,
1576    };
1577
1578    if let Some(sel) = sel_rows {
1579        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1580            return Err(SqlError::InvalidValue(format!(
1581                "INSERT ... SELECT column count mismatch: expected {}, got {}",
1582                insert_columns.len(),
1583                sel[0].len()
1584            )));
1585        }
1586    }
1587
1588    let has_insert_statement_triggers_impl =
1589        schema.triggers_for(&table_schema.name).iter().any(|t| {
1590            t.enabled
1591                && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1592                && t.events
1593                    .iter()
1594                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1595        });
1596    let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1597        Vec::with_capacity(total)
1598    } else {
1599        Vec::new()
1600    };
1601    if has_insert_statement_triggers_impl {
1602        super::triggers::fire_statement_triggers(
1603            wtx,
1604            schema,
1605            &table_schema.name,
1606            crate::parser::TriggerTiming::Before,
1607            super::triggers::FireEvent::Insert,
1608            &table_schema.columns,
1609            &[],
1610            &[],
1611        )?;
1612    }
1613
1614    let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1615    for idx in 0..total {
1616        if !skip_row_clear {
1617            for v in bufs.row.iter_mut() {
1618                *v = Value::Null;
1619            }
1620        }
1621
1622        if let Some(value_rows) = values {
1623            if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1624                for action in plan {
1625                    match action {
1626                        BindAction::Param {
1627                            param_idx,
1628                            col_idx,
1629                            target,
1630                        } => {
1631                            let v = &params[*param_idx];
1632                            bufs.row[*col_idx] = if v.is_null() {
1633                                Value::Null
1634                            } else if v.data_type() == *target {
1635                                v.clone()
1636                            } else {
1637                                let got = v.data_type();
1638                                v.clone().coerce_into(*target).ok_or_else(|| {
1639                                    SqlError::TypeMismatch {
1640                                        expected: target.to_string(),
1641                                        got: got.to_string(),
1642                                    }
1643                                })?
1644                            };
1645                        }
1646                        BindAction::Literal { value, col_idx } => {
1647                            bufs.row[*col_idx] = value.clone();
1648                        }
1649                    }
1650                }
1651            } else {
1652                let value_row = &value_rows[idx];
1653                if value_row.len() != insert_columns.len() {
1654                    return Err(SqlError::InvalidValue(format!(
1655                        "expected {} values, got {}",
1656                        insert_columns.len(),
1657                        value_row.len()
1658                    )));
1659                }
1660                for (i, expr) in value_row.iter().enumerate() {
1661                    let val = match expr {
1662                        Expr::Parameter(n) => params
1663                            .get(n - 1)
1664                            .cloned()
1665                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1666                        Expr::Literal(v) => v.clone(),
1667                        _ => eval_const_expr(expr)?,
1668                    };
1669                    let col_idx = bufs.col_indices[i];
1670                    let col = &table_schema.columns[col_idx];
1671                    let got_type = val.data_type();
1672                    bufs.row[col_idx] = if val.is_null() {
1673                        Value::Null
1674                    } else {
1675                        val.coerce_into(col.data_type)
1676                            .ok_or_else(|| SqlError::TypeMismatch {
1677                                expected: col.data_type.to_string(),
1678                                got: got_type.to_string(),
1679                            })?
1680                    };
1681                }
1682            }
1683        } else if let Some(sel) = sel_rows {
1684            let sel_row = &sel[idx];
1685            for (i, val) in sel_row.iter().enumerate() {
1686                let col_idx = bufs.col_indices[i];
1687                let col = &table_schema.columns[col_idx];
1688                let got_type = val.data_type();
1689                bufs.row[col_idx] = if val.is_null() {
1690                    Value::Null
1691                } else {
1692                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
1693                        SqlError::TypeMismatch {
1694                            expected: col.data_type.to_string(),
1695                            got: got_type.to_string(),
1696                        }
1697                    })?
1698                };
1699            }
1700        }
1701
1702        if has_defaults {
1703            for &(pos, def_expr) in &defaults {
1704                let val = eval_const_expr(def_expr)?;
1705                let col = &table_schema.columns[pos];
1706                if !val.is_null() {
1707                    let got_type = val.data_type();
1708                    bufs.row[pos] =
1709                        val.coerce_into(col.data_type)
1710                            .ok_or_else(|| SqlError::TypeMismatch {
1711                                expected: col.data_type.to_string(),
1712                                got: got_type.to_string(),
1713                            })?;
1714                }
1715            }
1716        }
1717
1718        if let Some(gen_map) = row_col_map_for_gen {
1719            if cache.is_some() {
1720                for (pos, fast) in cached_gen_positions
1721                    .iter()
1722                    .copied()
1723                    .zip(cached_gen_fast_evals.iter())
1724                {
1725                    let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1726                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1727                    let col = &table_schema.columns[pos];
1728                    bufs.row[pos] = if val.is_null() {
1729                        Value::Null
1730                    } else {
1731                        let got_type = val.data_type();
1732                        val.coerce_into(col.data_type)
1733                            .ok_or_else(|| SqlError::TypeMismatch {
1734                                expected: col.data_type.to_string(),
1735                                got: got_type.to_string(),
1736                            })?
1737                    };
1738                }
1739            } else {
1740                for (pos, gen_expr, fast) in &generated_cols_uncached {
1741                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1742                    let col = &table_schema.columns[*pos];
1743                    bufs.row[*pos] = if val.is_null() {
1744                        Value::Null
1745                    } else {
1746                        let got_type = val.data_type();
1747                        val.coerce_into(col.data_type)
1748                            .ok_or_else(|| SqlError::TypeMismatch {
1749                                expected: col.data_type.to_string(),
1750                                got: got_type.to_string(),
1751                            })?
1752                    };
1753                }
1754            }
1755        }
1756
1757        if let Some(c) = cache {
1758            for &pos in &c.not_null_indices {
1759                if bufs.row[pos as usize].is_null() {
1760                    return Err(SqlError::NotNullViolation(
1761                        table_schema.columns[pos as usize].name.clone(),
1762                    ));
1763                }
1764            }
1765        } else {
1766            for col in &table_schema.columns {
1767                if !col.nullable && bufs.row[col.position as usize].is_null() {
1768                    return Err(SqlError::NotNullViolation(col.name.clone()));
1769                }
1770            }
1771        }
1772
1773        if let Some(ref col_map) = check_col_map {
1774            for col in &table_schema.columns {
1775                if let Some(ref check) = col.check_expr {
1776                    let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1777                    if !is_truthy(&result) && !result.is_null() {
1778                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1779                        return Err(SqlError::CheckViolation(name.to_string()));
1780                    }
1781                }
1782            }
1783            for tc in &table_schema.check_constraints {
1784                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1785                if !is_truthy(&result) && !result.is_null() {
1786                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1787                    return Err(SqlError::CheckViolation(name.to_string()));
1788                }
1789            }
1790        }
1791
1792        if has_fks {
1793            for fk in &table_schema.foreign_keys {
1794                let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1795                if any_null {
1796                    continue;
1797                }
1798                crate::encoding::encode_composite_key_from_indices(
1799                    &fk.columns,
1800                    &bufs.row,
1801                    &mut bufs.fk_key_buf,
1802                );
1803                if fk.deferrable && fk.initially_deferred {
1804                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1805                    wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1806                        fk_name: name,
1807                        foreign_table: fk.foreign_table.as_bytes().to_vec(),
1808                        parent_key: bufs.fk_key_buf.clone(),
1809                    });
1810                    continue;
1811                }
1812                if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1813                    let found = wtx
1814                        .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1815                        .map_err(SqlError::Storage)?;
1816                    if found.is_none() {
1817                        let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1818                        return Err(SqlError::ForeignKeyViolation(name.to_string()));
1819                    }
1820                    wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1821                }
1822            }
1823        }
1824
1825        let proposed_row_for_returning: Option<Vec<Value>> =
1826            returning_rows.as_ref().map(|_| bufs.row.clone());
1827        let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1828            Some(bufs.row.clone())
1829        } else {
1830            None
1831        };
1832
1833        let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
1834            t.enabled
1835                && t.timing == crate::parser::TriggerTiming::Before
1836                && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1837                && t.events
1838                    .iter()
1839                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1840        });
1841        if has_before_insert_triggers {
1842            super::triggers::fire_row_triggers(
1843                wtx,
1844                schema,
1845                &table_schema.name,
1846                crate::parser::TriggerTiming::Before,
1847                super::triggers::FireEvent::Insert,
1848                None,
1849                Some(bufs.row.clone()),
1850                &table_schema.columns,
1851            )?;
1852        }
1853
1854        for (j, &i) in pk_indices.iter().enumerate() {
1855            bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1856        }
1857        match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1858            true => match bufs.pk_values[0] {
1859                Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1860                _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1861            },
1862            false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1863        }
1864
1865        for &slot in dropped {
1866            bufs.value_values[slot as usize] = Value::Null;
1867        }
1868        for (j, &i) in non_pk.iter().enumerate() {
1869            let col = &table_schema.columns[i];
1870            if matches!(
1871                col.generated_kind,
1872                Some(crate::parser::GeneratedKind::Virtual)
1873            ) {
1874                bufs.value_values[enc_pos[j] as usize] = Value::Null;
1875                bufs.row[i] = Value::Null;
1876            } else {
1877                bufs.value_values[enc_pos[j] as usize] =
1878                    std::mem::replace(&mut bufs.row[i], Value::Null);
1879            }
1880        }
1881        match cache.and_then(|c| c.row_encoder.as_ref()) {
1882            Some(tmpl) => crate::encoding::encode_int_row_with_template(
1883                tmpl,
1884                &bufs.value_values,
1885                &mut bufs.value_buf,
1886            )?,
1887            None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1888        }
1889
1890        if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1891            return Err(SqlError::KeyTooLarge {
1892                size: bufs.key_buf.len(),
1893                max: citadel_core::MAX_KEY_SIZE,
1894            });
1895        }
1896        if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1897            return Err(SqlError::RowTooLarge {
1898                size: bufs.value_buf.len(),
1899                max: citadel_core::MAX_VALUE_SIZE,
1900            });
1901        }
1902
1903        match compiled_conflict.as_ref() {
1904            None => {
1905                let is_new = wtx
1906                    .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1907                    .map_err(SqlError::Storage)?;
1908                if !is_new {
1909                    return Err(SqlError::DuplicateKey);
1910                }
1911                let has_after_insert_triggers =
1912                    schema.triggers_for(&table_schema.name).iter().any(|t| {
1913                        t.enabled
1914                            && t.timing == crate::parser::TriggerTiming::After
1915                            && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1916                            && t.events
1917                                .iter()
1918                                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1919                    });
1920                if has_indices || has_after_insert_triggers {
1921                    for (j, &i) in pk_indices.iter().enumerate() {
1922                        bufs.row[i] = bufs.pk_values[j].clone();
1923                    }
1924                    for (j, &i) in non_pk.iter().enumerate() {
1925                        bufs.row[i] = std::mem::replace(
1926                            &mut bufs.value_values[enc_pos[j] as usize],
1927                            Value::Null,
1928                        );
1929                    }
1930                    if has_indices {
1931                        insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1932                    }
1933                    if has_after_insert_triggers {
1934                        super::triggers::fire_row_triggers(
1935                            wtx,
1936                            schema,
1937                            &table_schema.name,
1938                            crate::parser::TriggerTiming::After,
1939                            super::triggers::FireEvent::Insert,
1940                            None,
1941                            Some(bufs.row.clone()),
1942                            &table_schema.columns,
1943                        )?;
1944                    }
1945                }
1946                if let Some(r) = row_for_stmt_trigger_impl.clone() {
1947                    stmt_new_rows_impl.push(r);
1948                }
1949                count += 1;
1950                if let Some(buf) = returning_rows.as_mut() {
1951                    buf.push((None, proposed_row_for_returning));
1952                }
1953            }
1954            Some(oc) => {
1955                let oc_ref: &CompiledOnConflict = oc;
1956                let needs_row = upsert_needs_row(oc_ref, table_schema);
1957                if needs_row {
1958                    for (j, &i) in pk_indices.iter().enumerate() {
1959                        bufs.row[i] = bufs.pk_values[j].clone();
1960                    }
1961                    for (j, &i) in non_pk.iter().enumerate() {
1962                        bufs.row[i] = std::mem::replace(
1963                            &mut bufs.value_values[enc_pos[j] as usize],
1964                            Value::Null,
1965                        );
1966                    }
1967                }
1968                let outcome = apply_insert_with_conflict(
1969                    wtx,
1970                    table_schema,
1971                    &bufs.key_buf,
1972                    &bufs.value_buf,
1973                    &bufs.row,
1974                    &bufs.pk_values,
1975                    oc_ref,
1976                    row_col_map.unwrap(),
1977                    stmt.returning.is_some(),
1978                )?;
1979                match outcome {
1980                    InsertRowOutcome::Inserted => {
1981                        count += 1;
1982                        if let Some(buf) = returning_rows.as_mut() {
1983                            buf.push((None, proposed_row_for_returning));
1984                        }
1985                        if let Some(r) = row_for_stmt_trigger_impl.clone() {
1986                            stmt_new_rows_impl.push(r);
1987                        }
1988                        let has_after_insert_triggers =
1989                            schema.triggers_for(&table_schema.name).iter().any(|t| {
1990                                t.enabled
1991                                    && t.timing == crate::parser::TriggerTiming::After
1992                                    && t.granularity
1993                                        == crate::parser::TriggerGranularity::ForEachRow
1994                                    && t.events
1995                                        .iter()
1996                                        .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1997                            });
1998                        if has_after_insert_triggers {
1999                            super::triggers::fire_row_triggers(
2000                                wtx,
2001                                schema,
2002                                &table_schema.name,
2003                                crate::parser::TriggerTiming::After,
2004                                super::triggers::FireEvent::Insert,
2005                                None,
2006                                Some(bufs.row.clone()),
2007                                &table_schema.columns,
2008                            )?;
2009                        }
2010                    }
2011                    InsertRowOutcome::Updated { old, new } => {
2012                        count += 1;
2013                        if let Some(buf) = returning_rows.as_mut() {
2014                            buf.push((Some(old.clone()), Some(new.clone())));
2015                        }
2016                        let has_after_update_triggers =
2017                            schema.triggers_for(&table_schema.name).iter().any(|t| {
2018                                t.enabled
2019                                    && t.timing == crate::parser::TriggerTiming::After
2020                                    && t.granularity
2021                                        == crate::parser::TriggerGranularity::ForEachRow
2022                                    && t.events.iter().any(|e| {
2023                                        matches!(e, crate::parser::TriggerEvent::Update(_))
2024                                    })
2025                            });
2026                        if has_after_update_triggers {
2027                            let changed_cols: Vec<String> = match oc_ref {
2028                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2029                                    .iter()
2030                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2031                                    .collect(),
2032                                _ => Vec::new(),
2033                            };
2034                            super::triggers::fire_row_triggers(
2035                                wtx,
2036                                schema,
2037                                &table_schema.name,
2038                                crate::parser::TriggerTiming::After,
2039                                super::triggers::FireEvent::Update {
2040                                    changed_columns: &changed_cols,
2041                                },
2042                                Some(old),
2043                                Some(new),
2044                                &table_schema.columns,
2045                            )?;
2046                        }
2047                    }
2048                    InsertRowOutcome::Skipped => {}
2049                }
2050            }
2051        }
2052    }
2053
2054    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2055        if has_insert_statement_triggers_impl {
2056            super::triggers::fire_statement_triggers(
2057                wtx,
2058                schema,
2059                &table_schema.name,
2060                crate::parser::TriggerTiming::After,
2061                super::triggers::FireEvent::Insert,
2062                &table_schema.columns,
2063                &[],
2064                &stmt_new_rows_impl,
2065            )?;
2066        }
2067        return Ok(ExecutionResult::Query(super::helpers::project_returning(
2068            table_schema,
2069            returning_cols,
2070            &rows,
2071        )?));
2072    }
2073
2074    if has_insert_statement_triggers_impl {
2075        super::triggers::fire_statement_triggers(
2076            wtx,
2077            schema,
2078            &table_schema.name,
2079            crate::parser::TriggerTiming::After,
2080            super::triggers::FireEvent::Insert,
2081            &table_schema.columns,
2082            &[],
2083            &stmt_new_rows_impl,
2084        )?;
2085    }
2086
2087    Ok(ExecutionResult::RowsAffected(count))
2088}
2089
2090pub struct CompiledInsert {
2091    table_lower: String,
2092    cached: Option<InsertCache>,
2093}
2094
2095struct InsertCache {
2096    col_indices: Vec<usize>,
2097    has_subquery: bool,
2098    any_defaults: bool,
2099    has_checks: bool,
2100    on_conflict: Option<Arc<CompiledOnConflict>>,
2101    row_col_map: Option<ColumnMap>,
2102    generated_col_positions: Vec<usize>,
2103    generated_fast_evals: Vec<FastGenEval>,
2104    pk_indices: Vec<usize>,
2105    non_pk_indices: Vec<usize>,
2106    encoding_positions: Vec<u16>,
2107    dropped_non_pk_slots: Vec<u16>,
2108    phys_count: usize,
2109    single_int_pk: bool,
2110    not_null_indices: Vec<u16>,
2111    bind_plan: Option<Vec<BindAction>>,
2112    row_fully_overwritten: bool,
2113    row_encoder: Option<crate::encoding::IntRowTemplate>,
2114    is_trivial_fast: bool,
2115    trivial_fast_program: Option<TrivialFastProgram>,
2116    needs_scoped_params: bool,
2117}
2118
2119#[derive(Clone)]
2120enum BindAction {
2121    Param {
2122        param_idx: usize,
2123        col_idx: usize,
2124        target: DataType,
2125    },
2126    Literal {
2127        value: Value,
2128        col_idx: usize,
2129    },
2130}
2131
2132#[derive(Clone)]
2133struct TrivialFastProgram {
2134    template: Vec<u8>,
2135    ops: Vec<WriteOp>,
2136    pk_param: u8,
2137    not_null_param_indices: Vec<u8>,
2138}
2139
2140#[derive(Clone)]
2141enum WriteOp {
2142    ParamI64 {
2143        param_idx: u8,
2144        off: u32,
2145    },
2146    LiteralI64 {
2147        value: i64,
2148        off: u32,
2149    },
2150    GenAddParamsI64 {
2151        a_param: u8,
2152        b_param: u8,
2153        off: u32,
2154        bitmap_byte_off: u32,
2155        bitmap_bit_mask: u8,
2156    },
2157    GenMulAddParamI64 {
2158        param_idx: u8,
2159        mul: i64,
2160        add: i64,
2161        off: u32,
2162        bitmap_byte_off: u32,
2163        bitmap_bit_mask: u8,
2164    },
2165}
2166
2167fn build_trivial_fast_program(
2168    bind_plan: &[BindAction],
2169    row_encoder: &crate::encoding::IntRowTemplate,
2170    non_virtual_pairs: &[(usize, usize)],
2171    generated_col_positions: &[usize],
2172    generated_fast_evals: &[FastGenEval],
2173    pk_indices: &[usize],
2174    columns: &[crate::types::ColumnDef],
2175) -> Option<TrivialFastProgram> {
2176    let pk_col = pk_indices[0];
2177    let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2178        non_virtual_pairs.iter().copied().collect();
2179    let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2180        row_encoder.slot_offsets.iter().copied().collect();
2181
2182    let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2183    let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2184    let mut pk_param: Option<u8> = None;
2185    let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2186    let mut not_null_param_indices: Vec<u8> = Vec::new();
2187
2188    for action in bind_plan {
2189        match action {
2190            BindAction::Param {
2191                param_idx,
2192                col_idx,
2193                target,
2194            } => {
2195                if *target != DataType::Integer {
2196                    return None;
2197                }
2198                let pi: u8 = u8::try_from(*param_idx).ok()?;
2199                col_to_param.insert(*col_idx, pi);
2200                if *col_idx == pk_col {
2201                    pk_param = Some(pi);
2202                } else {
2203                    let slot = *col_to_slot.get(col_idx)?;
2204                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2205                    ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2206                    if !columns[*col_idx].nullable {
2207                        not_null_param_indices.push(pi);
2208                    }
2209                }
2210            }
2211            BindAction::Literal { value, col_idx } => match value {
2212                Value::Integer(v) => {
2213                    col_to_lit_int.insert(*col_idx, *v);
2214                    if *col_idx == pk_col {
2215                        return None;
2216                    }
2217                    let slot = *col_to_slot.get(col_idx)?;
2218                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2219                    ops.push(WriteOp::LiteralI64 { value: *v, off });
2220                }
2221                _ => return None,
2222            },
2223        }
2224    }
2225
2226    let pk_param = pk_param?;
2227
2228    for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2229        let gen_slot = *col_to_slot.get(&gen_pos)?;
2230        let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2231        let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
2232        let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
2233        let gen_col_nullable = columns[gen_pos].nullable;
2234
2235        match &generated_fast_evals[i] {
2236            FastGenEval::IntColAddCol {
2237                left_idx,
2238                right_idx,
2239            } => {
2240                let a_param = col_to_param.get(left_idx).copied();
2241                let b_param = col_to_param.get(right_idx).copied();
2242                match (a_param, b_param) {
2243                    (Some(ap), Some(bp)) => {
2244                        let deps_safe = gen_col_nullable
2245                            || (not_null_param_indices.contains(&ap)
2246                                && not_null_param_indices.contains(&bp));
2247                        if !deps_safe {
2248                            return None;
2249                        }
2250                        ops.push(WriteOp::GenAddParamsI64 {
2251                            a_param: ap,
2252                            b_param: bp,
2253                            off: gen_off,
2254                            bitmap_byte_off,
2255                            bitmap_bit_mask,
2256                        });
2257                    }
2258                    (Some(p), None) => {
2259                        let lit = col_to_lit_int.get(right_idx).copied()?;
2260                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2261                            return None;
2262                        }
2263                        ops.push(WriteOp::GenMulAddParamI64 {
2264                            param_idx: p,
2265                            mul: 1,
2266                            add: lit,
2267                            off: gen_off,
2268                            bitmap_byte_off,
2269                            bitmap_bit_mask,
2270                        });
2271                    }
2272                    (None, Some(p)) => {
2273                        let lit = col_to_lit_int.get(left_idx).copied()?;
2274                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2275                            return None;
2276                        }
2277                        ops.push(WriteOp::GenMulAddParamI64 {
2278                            param_idx: p,
2279                            mul: 1,
2280                            add: lit,
2281                            off: gen_off,
2282                            bitmap_byte_off,
2283                            bitmap_bit_mask,
2284                        });
2285                    }
2286                    (None, None) => {
2287                        let la = col_to_lit_int.get(left_idx).copied()?;
2288                        let lb = col_to_lit_int.get(right_idx).copied()?;
2289                        ops.push(WriteOp::LiteralI64 {
2290                            value: la.wrapping_add(lb),
2291                            off: gen_off,
2292                        });
2293                    }
2294                }
2295            }
2296            FastGenEval::IntColMulAdd {
2297                col_schema_idx,
2298                mul,
2299                add,
2300            } => {
2301                if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2302                    if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2303                        return None;
2304                    }
2305                    ops.push(WriteOp::GenMulAddParamI64 {
2306                        param_idx: p,
2307                        mul: *mul,
2308                        add: *add,
2309                        off: gen_off,
2310                        bitmap_byte_off,
2311                        bitmap_bit_mask,
2312                    });
2313                } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2314                    ops.push(WriteOp::LiteralI64 {
2315                        value: lit.wrapping_mul(*mul).wrapping_add(*add),
2316                        off: gen_off,
2317                    });
2318                } else {
2319                    return None;
2320                }
2321            }
2322            FastGenEval::None => return None,
2323        }
2324    }
2325
2326    Some(TrivialFastProgram {
2327        template: row_encoder.template.clone(),
2328        ops,
2329        pk_param,
2330        not_null_param_indices,
2331    })
2332}
2333
2334#[derive(Clone)]
2335pub(super) enum CompiledOnConflict {
2336    DoNothing {
2337        target: Option<ConflictKind>,
2338    },
2339    DoUpdate {
2340        target: ConflictKind,
2341        assignments: Vec<(usize, Expr)>,
2342        where_clause: Option<Expr>,
2343        fast_paths: Option<Vec<DoUpdateFastPath>>,
2344    },
2345}
2346
2347#[derive(Clone, Copy)]
2348pub(super) enum DoUpdateFastPath {
2349    IntAddConst { phys_idx: usize, delta: i64 },
2350}
2351
2352#[derive(Clone, Debug)]
2353pub(super) enum ConflictKind {
2354    PrimaryKey,
2355    UniqueIndex { index_idx: usize },
2356}
2357
2358fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2359    match target {
2360        ConflictTarget::Columns(cols) => {
2361            let col_idx_set: Vec<u16> = cols
2362                .iter()
2363                .map(|name| {
2364                    ts.column_index(name)
2365                        .map(|i| i as u16)
2366                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2367                })
2368                .collect::<Result<_>>()?;
2369            let pk_set = ts.primary_key_columns.clone();
2370            if set_equal(&col_idx_set, &pk_set) {
2371                return Ok(ConflictKind::PrimaryKey);
2372            }
2373            for (index_idx, idx) in ts.indices.iter().enumerate() {
2374                if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2375                    return Ok(ConflictKind::UniqueIndex { index_idx });
2376                }
2377            }
2378            Err(SqlError::Plan(
2379                "ON CONFLICT target does not match any unique constraint".into(),
2380            ))
2381        }
2382        ConflictTarget::Constraint(name) => {
2383            let lower = name.to_ascii_lowercase();
2384            for (index_idx, idx) in ts.indices.iter().enumerate() {
2385                if idx.name.eq_ignore_ascii_case(&lower) {
2386                    if idx.unique {
2387                        return Ok(ConflictKind::UniqueIndex { index_idx });
2388                    }
2389                    return Err(SqlError::Plan(format!(
2390                        "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2391                    )));
2392                }
2393            }
2394            Err(SqlError::Plan(format!(
2395                "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2396            )))
2397        }
2398    }
2399}
2400
2401fn set_equal(a: &[u16], b: &[u16]) -> bool {
2402    if a.len() != b.len() {
2403        return false;
2404    }
2405    let mut a_sorted = a.to_vec();
2406    let mut b_sorted = b.to_vec();
2407    a_sorted.sort_unstable();
2408    b_sorted.sort_unstable();
2409    a_sorted == b_sorted
2410}
2411
2412pub(super) enum InsertRowOutcome {
2413    Inserted,
2414    Updated { old: Vec<Value>, new: Vec<Value> },
2415    Skipped,
2416}
2417
2418#[allow(clippy::too_many_arguments)]
2419#[inline]
2420pub(super) fn apply_insert_with_conflict(
2421    wtx: &mut WriteTxn<'_>,
2422    table_schema: &TableSchema,
2423    key_buf: &[u8],
2424    value_buf: &[u8],
2425    row: &[Value],
2426    pk_values: &[Value],
2427    on_conflict: &CompiledOnConflict,
2428    col_map: &ColumnMap,
2429    capture_returning: bool,
2430) -> Result<InsertRowOutcome> {
2431    let table_bytes = table_schema.name.as_bytes();
2432
2433    if let CompiledOnConflict::DoNothing { target } = on_conflict {
2434        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2435        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2436            let inserted = wtx
2437                .table_insert_if_absent(table_bytes, key_buf, value_buf)
2438                .map_err(SqlError::Storage)?;
2439            return Ok(if inserted {
2440                InsertRowOutcome::Inserted
2441            } else {
2442                InsertRowOutcome::Skipped
2443            });
2444        }
2445    }
2446
2447    if let CompiledOnConflict::DoUpdate {
2448        target: ConflictKind::PrimaryKey,
2449        assignments,
2450        where_clause,
2451        fast_paths,
2452    } = on_conflict
2453    {
2454        if can_fuse_do_update(table_schema, assignments) {
2455            return apply_do_update_fused(
2456                wtx,
2457                table_schema,
2458                table_bytes,
2459                key_buf,
2460                value_buf,
2461                row,
2462                assignments,
2463                where_clause.as_ref(),
2464                col_map,
2465                fast_paths.as_deref(),
2466                capture_returning,
2467            );
2468        }
2469    }
2470
2471    let primary_outcome = wtx
2472        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2473        .map_err(SqlError::Storage)?;
2474
2475    match primary_outcome {
2476        citadel_txn::write_txn::InsertOutcome::Inserted => {
2477            if table_schema.indices.is_empty() {
2478                return Ok(InsertRowOutcome::Inserted);
2479            }
2480            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2481            match insert_index_entries_or_fetch(
2482                wtx,
2483                table_schema,
2484                row,
2485                pk_values,
2486                &mut inserted_keys,
2487            )? {
2488                None => Ok(InsertRowOutcome::Inserted),
2489                Some(conflicting_idx) => {
2490                    let matches_target =
2491                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2492                            || matches!(
2493                                on_conflict,
2494                                CompiledOnConflict::DoNothing {
2495                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
2496                                } | CompiledOnConflict::DoUpdate {
2497                                    target: ConflictKind::UniqueIndex { index_idx },
2498                                    ..
2499                                } if *index_idx == conflicting_idx
2500                            );
2501                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2502                    if !matches_target {
2503                        return Err(SqlError::UniqueViolation(
2504                            table_schema.indices[conflicting_idx].name.clone(),
2505                        ));
2506                    }
2507                    match on_conflict {
2508                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2509                        CompiledOnConflict::DoUpdate {
2510                            assignments,
2511                            where_clause,
2512                            ..
2513                        } => {
2514                            let existing_pk =
2515                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2516                            apply_do_update(
2517                                wtx,
2518                                table_schema,
2519                                &existing_pk,
2520                                row,
2521                                assignments,
2522                                where_clause.as_ref(),
2523                                col_map,
2524                                capture_returning,
2525                            )
2526                        }
2527                    }
2528                }
2529            }
2530        }
2531        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2532            let matches_target = matches!(
2533                on_conflict,
2534                CompiledOnConflict::DoNothing { target: None }
2535                    | CompiledOnConflict::DoNothing {
2536                        target: Some(ConflictKind::PrimaryKey),
2537                    }
2538                    | CompiledOnConflict::DoUpdate {
2539                        target: ConflictKind::PrimaryKey,
2540                        ..
2541                    }
2542            );
2543            if !matches_target {
2544                return Err(SqlError::DuplicateKey);
2545            }
2546            match on_conflict {
2547                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2548                CompiledOnConflict::DoUpdate {
2549                    assignments,
2550                    where_clause,
2551                    ..
2552                } => {
2553                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2554                    apply_do_update_with_old_row(
2555                        wtx,
2556                        table_schema,
2557                        key_buf,
2558                        &old_row,
2559                        row,
2560                        assignments,
2561                        where_clause.as_ref(),
2562                        col_map,
2563                        capture_returning,
2564                    )
2565                }
2566            }
2567        }
2568    }
2569}
2570
2571#[inline]
2572fn apply_fast_path_patch(
2573    old_bytes: &[u8],
2574    fast_paths: &[DoUpdateFastPath],
2575) -> Result<UpsertAction> {
2576    UPSERT_SCRATCH.with(|slot| {
2577        let mut bufs = slot.borrow_mut();
2578        bufs.new_value_buf.clear();
2579        bufs.new_value_buf.extend_from_slice(old_bytes);
2580
2581        let mut patch_scratch: Vec<u8> = Vec::new();
2582
2583        for fp in fast_paths {
2584            match fp {
2585                DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2586                    let decoded =
2587                        crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2588                    let old_val = &decoded[0];
2589                    let new_val = match old_val {
2590                        Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2591                        Value::Null => Value::Null,
2592                        _ => {
2593                            return Err(SqlError::TypeMismatch {
2594                                expected: "INTEGER".into(),
2595                                got: old_val.data_type().to_string(),
2596                            });
2597                        }
2598                    };
2599                    if !crate::encoding::patch_column_in_place(
2600                        &mut bufs.new_value_buf,
2601                        *phys_idx,
2602                        &new_val,
2603                    )? {
2604                        patch_scratch.clear();
2605                        crate::encoding::patch_row_column(
2606                            &bufs.new_value_buf,
2607                            *phys_idx,
2608                            &new_val,
2609                            &mut patch_scratch,
2610                        )?;
2611                        std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2612                    }
2613                }
2614            }
2615        }
2616
2617        if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2618            return Err(SqlError::RowTooLarge {
2619                size: bufs.new_value_buf.len(),
2620                max: citadel_core::MAX_VALUE_SIZE,
2621            });
2622        }
2623
2624        Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2625    })
2626}
2627
2628fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2629    if !ts.indices.is_empty() {
2630        return true;
2631    }
2632    match oc {
2633        CompiledOnConflict::DoNothing { .. } => false,
2634        CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2635    }
2636}
2637
2638fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2639    if !ts.indices.is_empty() {
2640        return false;
2641    }
2642    if !ts.foreign_keys.is_empty() {
2643        return false;
2644    }
2645    if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2646        return false;
2647    }
2648    let pk = ts.pk_indices();
2649    !assignments.iter().any(|(ci, _)| pk.contains(ci))
2650}
2651
2652#[allow(clippy::too_many_arguments)]
2653#[inline]
2654fn apply_do_update_fused(
2655    wtx: &mut WriteTxn<'_>,
2656    table_schema: &TableSchema,
2657    table_bytes: &[u8],
2658    key_buf: &[u8],
2659    value_buf: &[u8],
2660    proposed_row: &[Value],
2661    assignments: &[(usize, Expr)],
2662    where_clause: Option<&Expr>,
2663    col_map: &ColumnMap,
2664    fast_paths: Option<&[DoUpdateFastPath]>,
2665    capture_returning: bool,
2666) -> Result<InsertRowOutcome> {
2667    let non_pk = table_schema.non_pk_indices();
2668    let enc_pos = table_schema.encoding_positions();
2669    let phys_count = table_schema.physical_non_pk_count();
2670    let dropped = table_schema.dropped_non_pk_slots();
2671    let has_checks = table_schema.has_checks();
2672    let has_fks = !table_schema.foreign_keys.is_empty();
2673
2674    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2675        std::cell::RefCell::new(None);
2676
2677    let outcome =
2678        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2679            if let Some(fps) = fast_paths {
2680                if !has_checks {
2681                    let action = apply_fast_path_patch(old_bytes, fps)?;
2682                    if capture_returning {
2683                        if let UpsertAction::Replace(ref new_bytes) = action {
2684                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2685                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2686                            *captured.borrow_mut() = Some((old_row, new_row));
2687                        }
2688                    }
2689                    return Ok(action);
2690                }
2691            }
2692            UPSERT_SCRATCH.with(|slot| {
2693                let mut bufs = slot.borrow_mut();
2694                let UpsertBufs {
2695                    old_row,
2696                    new_row,
2697                    value_values,
2698                    new_value_buf,
2699                } = &mut *bufs;
2700
2701                old_row.clear();
2702                old_row.resize(table_schema.columns.len(), Value::Null);
2703                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2704
2705                if let Some(w) = where_clause {
2706                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2707                    let result = eval_expr(w, &ctx)?;
2708                    if result.is_null() || !is_truthy(&result) {
2709                        return Ok(UpsertAction::Skip);
2710                    }
2711                }
2712
2713                new_row.clear();
2714                new_row.extend_from_slice(old_row);
2715                for (col_idx, expr) in assignments {
2716                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2717                    let val = eval_expr(expr, &ctx)?;
2718                    let col = &table_schema.columns[*col_idx];
2719                    new_row[*col_idx] = if val.is_null() {
2720                        Value::Null
2721                    } else {
2722                        let got = val.data_type();
2723                        val.coerce_into(col.data_type)
2724                            .ok_or_else(|| SqlError::TypeMismatch {
2725                                expected: col.data_type.to_string(),
2726                                got: got.to_string(),
2727                            })?
2728                    };
2729                }
2730
2731                for (assigned_idx, _) in assignments {
2732                    let col = &table_schema.columns[*assigned_idx];
2733                    if !col.nullable && new_row[col.position as usize].is_null() {
2734                        return Err(SqlError::NotNullViolation(col.name.clone()));
2735                    }
2736                }
2737                if has_checks {
2738                    for col in &table_schema.columns {
2739                        if let Some(ref check) = col.check_expr {
2740                            let ctx = EvalCtx::new(col_map, new_row);
2741                            let result = eval_expr(check, &ctx)?;
2742                            if !is_truthy(&result) && !result.is_null() {
2743                                let name = col.check_name.as_deref().unwrap_or(&col.name);
2744                                return Err(SqlError::CheckViolation(name.to_string()));
2745                            }
2746                        }
2747                    }
2748                    for tc in &table_schema.check_constraints {
2749                        let ctx = EvalCtx::new(col_map, new_row);
2750                        let result = eval_expr(&tc.expr, &ctx)?;
2751                        if !is_truthy(&result) && !result.is_null() {
2752                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
2753                            return Err(SqlError::CheckViolation(name.to_string()));
2754                        }
2755                    }
2756                }
2757                let _ = has_fks;
2758
2759                value_values.clear();
2760                value_values.resize(phys_count, Value::Null);
2761                for &slot in dropped {
2762                    value_values[slot as usize] = Value::Null;
2763                }
2764                for (j, &i) in non_pk.iter().enumerate() {
2765                    value_values[enc_pos[j] as usize] = new_row[i].clone();
2766                }
2767                new_value_buf.clear();
2768                crate::encoding::encode_row_into(value_values, new_value_buf);
2769
2770                if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2771                    return Err(SqlError::RowTooLarge {
2772                        size: new_value_buf.len(),
2773                        max: citadel_core::MAX_VALUE_SIZE,
2774                    });
2775                }
2776
2777                if capture_returning {
2778                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2779                }
2780                Ok(UpsertAction::Replace(new_value_buf.clone()))
2781            })
2782        })?;
2783
2784    match outcome {
2785        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2786        UpsertOutcome::Updated => {
2787            if capture_returning {
2788                let (old, new) = captured.into_inner().ok_or_else(|| {
2789                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2790                })?;
2791                Ok(InsertRowOutcome::Updated { old, new })
2792            } else {
2793                Ok(InsertRowOutcome::Inserted)
2794            }
2795        }
2796        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2797    }
2798}
2799
2800fn fetch_unique_index_pk(
2801    wtx: &mut WriteTxn<'_>,
2802    table_schema: &TableSchema,
2803    index_idx: usize,
2804    row: &[Value],
2805) -> Result<Vec<u8>> {
2806    let idx = &table_schema.indices[index_idx];
2807    let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2808    let indexed: Vec<Value> = idx
2809        .column_positions_iter()
2810        .map(|col_idx| row[col_idx as usize].clone())
2811        .collect();
2812    let key = crate::encoding::encode_composite_key(&indexed);
2813    let value = wtx
2814        .table_get(&idx_table, &key)
2815        .map_err(SqlError::Storage)?
2816        .ok_or_else(|| {
2817            SqlError::InvalidValue("unique index missing expected collision entry".into())
2818        })?;
2819    Ok(value)
2820}
2821
2822#[allow(clippy::too_many_arguments)]
2823fn apply_do_update(
2824    wtx: &mut WriteTxn<'_>,
2825    table_schema: &TableSchema,
2826    pk_key: &[u8],
2827    proposed_row: &[Value],
2828    assignments: &[(usize, Expr)],
2829    where_clause: Option<&Expr>,
2830    col_map: &ColumnMap,
2831    capture_returning: bool,
2832) -> Result<InsertRowOutcome> {
2833    let old_value = wtx
2834        .table_get(table_schema.name.as_bytes(), pk_key)
2835        .map_err(SqlError::Storage)?
2836        .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2837    let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2838    apply_do_update_with_old_row(
2839        wtx,
2840        table_schema,
2841        pk_key,
2842        &old_row,
2843        proposed_row,
2844        assignments,
2845        where_clause,
2846        col_map,
2847        capture_returning,
2848    )
2849}
2850
2851#[allow(clippy::too_many_arguments)]
2852fn apply_do_update_with_old_row(
2853    wtx: &mut WriteTxn<'_>,
2854    table_schema: &TableSchema,
2855    old_pk_key: &[u8],
2856    old_row: &[Value],
2857    proposed_row: &[Value],
2858    assignments: &[(usize, Expr)],
2859    where_clause: Option<&Expr>,
2860    col_map: &ColumnMap,
2861    capture_returning: bool,
2862) -> Result<InsertRowOutcome> {
2863    if let Some(w) = where_clause {
2864        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2865        let result = eval_expr(w, &ctx)?;
2866        if result.is_null() || !is_truthy(&result) {
2867            return Ok(InsertRowOutcome::Skipped);
2868        }
2869    }
2870
2871    let mut new_row = old_row.to_vec();
2872    for (col_idx, expr) in assignments {
2873        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2874        let val = eval_expr(expr, &ctx)?;
2875        let col = &table_schema.columns[*col_idx];
2876        new_row[*col_idx] = if val.is_null() {
2877            Value::Null
2878        } else {
2879            let got = val.data_type();
2880            val.coerce_into(col.data_type)
2881                .ok_or_else(|| SqlError::TypeMismatch {
2882                    expected: col.data_type.to_string(),
2883                    got: got.to_string(),
2884                })?
2885        };
2886    }
2887
2888    for col in &table_schema.columns {
2889        if matches!(
2890            col.generated_kind,
2891            Some(crate::parser::GeneratedKind::Stored)
2892        ) {
2893            let val = eval_expr(
2894                col.generated_expr.as_ref().unwrap(),
2895                &EvalCtx::new(col_map, &new_row),
2896            )?;
2897            let pos = col.position as usize;
2898            new_row[pos] = if val.is_null() {
2899                if !col.nullable {
2900                    return Err(SqlError::NotNullViolation(col.name.clone()));
2901                }
2902                Value::Null
2903            } else {
2904                let got = val.data_type();
2905                val.coerce_into(col.data_type)
2906                    .ok_or_else(|| SqlError::TypeMismatch {
2907                        expected: col.data_type.to_string(),
2908                        got: got.to_string(),
2909                    })?
2910            };
2911        }
2912    }
2913
2914    let pk_indices = table_schema.pk_indices();
2915    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2916    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2917
2918    for (assigned_idx, _) in assignments {
2919        let col = &table_schema.columns[*assigned_idx];
2920        if !col.nullable && new_row[col.position as usize].is_null() {
2921            return Err(SqlError::NotNullViolation(col.name.clone()));
2922        }
2923    }
2924    if table_schema.has_checks() {
2925        for col in &table_schema.columns {
2926            if let Some(ref check) = col.check_expr {
2927                let ctx = EvalCtx::new(col_map, &new_row);
2928                let result = eval_expr(check, &ctx)?;
2929                if !is_truthy(&result) && !result.is_null() {
2930                    let name = col.check_name.as_deref().unwrap_or(&col.name);
2931                    return Err(SqlError::CheckViolation(name.to_string()));
2932                }
2933            }
2934        }
2935        for tc in &table_schema.check_constraints {
2936            let ctx = EvalCtx::new(col_map, &new_row);
2937            let result = eval_expr(&tc.expr, &ctx)?;
2938            if !is_truthy(&result) && !result.is_null() {
2939                let name = tc.name.as_deref().unwrap_or(&tc.sql);
2940                return Err(SqlError::CheckViolation(name.to_string()));
2941            }
2942        }
2943    }
2944    for fk in &table_schema.foreign_keys {
2945        let changed = fk
2946            .columns
2947            .iter()
2948            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2949        if !changed {
2950            continue;
2951        }
2952        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2953        if any_null {
2954            continue;
2955        }
2956        let fk_vals: Vec<Value> = fk
2957            .columns
2958            .iter()
2959            .map(|&ci| new_row[ci as usize].clone())
2960            .collect();
2961        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2962        if fk.deferrable && fk.initially_deferred {
2963            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
2964            wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
2965                fk_name: name,
2966                foreign_table: fk.foreign_table.as_bytes().to_vec(),
2967                parent_key: fk_key,
2968            });
2969            continue;
2970        }
2971        if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
2972            let found = wtx
2973                .table_get(fk.foreign_table.as_bytes(), &fk_key)
2974                .map_err(SqlError::Storage)?;
2975            if found.is_none() {
2976                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
2977                return Err(SqlError::ForeignKeyViolation(name.to_string()));
2978            }
2979            wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
2980        }
2981    }
2982
2983    let has_indices = !table_schema.indices.is_empty();
2984    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
2985        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
2986    } else {
2987        Vec::new()
2988    };
2989    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
2990        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
2991    } else {
2992        Vec::new()
2993    };
2994
2995    let non_pk = table_schema.non_pk_indices();
2996    let enc_pos = table_schema.encoding_positions();
2997    let phys_count = table_schema.physical_non_pk_count();
2998    let dropped = table_schema.dropped_non_pk_slots();
2999    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
3000    for &slot in dropped {
3001        value_values[slot as usize] = Value::Null;
3002    }
3003    for (j, &i) in non_pk.iter().enumerate() {
3004        let col = &table_schema.columns[i];
3005        value_values[enc_pos[j] as usize] = if matches!(
3006            col.generated_kind,
3007            Some(crate::parser::GeneratedKind::Virtual)
3008        ) {
3009            Value::Null
3010        } else {
3011            new_row[i].clone()
3012        };
3013    }
3014    let mut new_value_buf = Vec::with_capacity(256);
3015    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
3016
3017    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
3018        return Err(SqlError::RowTooLarge {
3019            size: new_value_buf.len(),
3020            max: citadel_core::MAX_VALUE_SIZE,
3021        });
3022    }
3023
3024    if pk_changed {
3025        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
3026        let inserted = wtx
3027            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
3028            .map_err(SqlError::Storage)?;
3029        if !inserted {
3030            return Err(SqlError::DuplicateKey);
3031        }
3032        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
3033            .map_err(SqlError::Storage)?;
3034        for idx in &table_schema.indices {
3035            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3036            let old_idx_key =
3037                encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3038            wtx.table_delete(&idx_table, &old_idx_key)
3039                .map_err(SqlError::Storage)?;
3040            let new_idx_key =
3041                encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3042            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3043            let is_new = wtx
3044                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3045                .map_err(SqlError::Storage)?;
3046            if idx.unique && !is_new {
3047                let any_null = idx
3048                    .column_positions_iter()
3049                    .any(|c| new_row[c as usize].is_null());
3050                if !any_null {
3051                    return Err(SqlError::UniqueViolation(idx.name.clone()));
3052                }
3053            }
3054        }
3055    } else {
3056        wtx.table_update_sorted(
3057            table_schema.name.as_bytes(),
3058            &[(old_pk_key, new_value_buf.as_slice())],
3059        )
3060        .map_err(SqlError::Storage)?;
3061        let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
3062        for idx in &table_schema.indices {
3063            let cols_changed = index_columns_changed(idx, old_row, &new_row);
3064            let (del, ins) = partial_idx_update_actions(
3065                idx,
3066                old_row,
3067                &new_row,
3068                cols_changed,
3069                false,
3070                col_map_partial,
3071            );
3072            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3073            if del {
3074                let old_idx_key =
3075                    encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3076                wtx.table_delete(&idx_table, &old_idx_key)
3077                    .map_err(SqlError::Storage)?;
3078            }
3079            if ins {
3080                let new_idx_key =
3081                    encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3082                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3083                let is_new = wtx
3084                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3085                    .map_err(SqlError::Storage)?;
3086                if idx.unique && !is_new {
3087                    let any_null = idx
3088                        .column_positions_iter()
3089                        .any(|c| new_row[c as usize].is_null());
3090                    if !any_null {
3091                        return Err(SqlError::UniqueViolation(idx.name.clone()));
3092                    }
3093                }
3094            }
3095        }
3096    }
3097
3098    if capture_returning {
3099        Ok(InsertRowOutcome::Updated {
3100            old: old_row.to_vec(),
3101            new: new_row,
3102        })
3103    } else {
3104        Ok(InsertRowOutcome::Inserted)
3105    }
3106}
3107
3108fn detect_fast_paths(
3109    ts: &TableSchema,
3110    assignments: &[(usize, Expr)],
3111) -> Option<Vec<DoUpdateFastPath>> {
3112    let non_pk = ts.non_pk_indices();
3113    let enc_pos = ts.encoding_positions();
3114    let mut out = Vec::with_capacity(assignments.len());
3115    for (col_idx, expr) in assignments {
3116        let col = &ts.columns[*col_idx];
3117        if col.data_type != DataType::Integer {
3118            return None;
3119        }
3120        let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3121        let phys_idx = enc_pos[nonpk_order] as usize;
3122
3123        if let Expr::BinaryOp { left, op, right } = expr {
3124            if !matches!(op, BinOp::Add | BinOp::Sub) {
3125                return None;
3126            }
3127            let reads_target =
3128                matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3129            if !reads_target {
3130                return None;
3131            }
3132            if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3133                let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3134                let _ = col_idx;
3135                out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3136                continue;
3137            }
3138            return None;
3139        }
3140        return None;
3141    }
3142    Some(out)
3143}
3144
3145fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3146    let target = oc
3147        .target
3148        .as_ref()
3149        .map(|t| resolve_conflict_target(t, ts))
3150        .transpose()?;
3151    match &oc.action {
3152        OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3153        OnConflictAction::DoUpdate {
3154            assignments,
3155            where_clause,
3156        } => {
3157            let target = target.ok_or_else(|| {
3158                SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3159            })?;
3160            let compiled_assignments: Vec<(usize, Expr)> = assignments
3161                .iter()
3162                .map(|(name, expr)| {
3163                    let col_idx = ts
3164                        .column_index(name)
3165                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3166                    Ok((col_idx, expr.clone()))
3167                })
3168                .collect::<Result<_>>()?;
3169            let fast_paths = if where_clause.is_none() {
3170                detect_fast_paths(ts, &compiled_assignments)
3171            } else {
3172                None
3173            };
3174            Ok(CompiledOnConflict::DoUpdate {
3175                target,
3176                assignments: compiled_assignments,
3177                where_clause: where_clause.clone(),
3178                fast_paths,
3179            })
3180        }
3181    }
3182}
3183
3184/// Caller MUST check `cache.is_trivial_fast` first.
3185fn exec_insert_trivial_fast(
3186    wtx: &mut WriteTxn<'_>,
3187    table_lower: &str,
3188    cache: &InsertCache,
3189    bufs: &mut InsertBufs,
3190    params: &[Value],
3191) -> Result<ExecutionResult> {
3192    let prog = cache
3193        .trivial_fast_program
3194        .as_ref()
3195        .expect("trivial fast: program");
3196
3197    for &p in &prog.not_null_param_indices {
3198        if params[p as usize].is_null() {
3199            return Err(SqlError::NotNullViolation(format!("param@{p}")));
3200        }
3201    }
3202
3203    match &params[prog.pk_param as usize] {
3204        Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3205        _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3206    }
3207
3208    bufs.value_buf.clear();
3209    bufs.value_buf.extend_from_slice(&prog.template);
3210
3211    for op in &prog.ops {
3212        match op {
3213            WriteOp::ParamI64 { param_idx, off } => match &params[*param_idx as usize] {
3214                Value::Integer(v) => {
3215                    let off = *off as usize;
3216                    bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3217                }
3218                other => {
3219                    return Err(SqlError::TypeMismatch {
3220                        expected: "Integer".into(),
3221                        got: other.data_type().to_string(),
3222                    });
3223                }
3224            },
3225            WriteOp::LiteralI64 { value, off } => {
3226                let off = *off as usize;
3227                bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3228            }
3229            WriteOp::GenAddParamsI64 {
3230                a_param,
3231                b_param,
3232                off,
3233                bitmap_byte_off,
3234                bitmap_bit_mask,
3235            } => match (&params[*a_param as usize], &params[*b_param as usize]) {
3236                (Value::Integer(a), Value::Integer(b)) => {
3237                    let off = *off as usize;
3238                    bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3239                }
3240                _ => {
3241                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3242                }
3243            },
3244            WriteOp::GenMulAddParamI64 {
3245                param_idx,
3246                mul,
3247                add,
3248                off,
3249                bitmap_byte_off,
3250                bitmap_bit_mask,
3251            } => match &params[*param_idx as usize] {
3252                Value::Integer(v) => {
3253                    let r = v.wrapping_mul(*mul).wrapping_add(*add);
3254                    let off = *off as usize;
3255                    bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3256                }
3257                _ => {
3258                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3259                }
3260            },
3261        }
3262    }
3263
3264    let is_new = wtx
3265        .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3266        .map_err(SqlError::Storage)?;
3267    if !is_new {
3268        return Err(SqlError::DuplicateKey);
3269    }
3270    Ok(ExecutionResult::RowsAffected(1))
3271}
3272
3273fn build_bind_plan(
3274    stmt: &InsertStmt,
3275    col_indices: &[usize],
3276    col_data_types: &[DataType],
3277) -> Option<Vec<BindAction>> {
3278    let rows = match &stmt.source {
3279        InsertSource::Values(rows) => rows,
3280        _ => return None,
3281    };
3282    if rows.len() != 1 {
3283        return None;
3284    }
3285    let value_row = &rows[0];
3286    if value_row.len() != col_indices.len() {
3287        return None;
3288    }
3289    let mut plan = Vec::with_capacity(value_row.len());
3290    for (i, expr) in value_row.iter().enumerate() {
3291        let col_idx = col_indices[i];
3292        let target = col_data_types[col_idx];
3293        match expr {
3294            Expr::Parameter(n) => {
3295                if *n == 0 {
3296                    return None;
3297                }
3298                plan.push(BindAction::Param {
3299                    param_idx: n - 1,
3300                    col_idx,
3301                    target,
3302                });
3303            }
3304            Expr::Literal(v) => plan.push(BindAction::Literal {
3305                value: v.clone(),
3306                col_idx,
3307            }),
3308            _ => return None,
3309        }
3310    }
3311    Some(plan)
3312}
3313
3314impl CompiledInsert {
3315    pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3316        let lower = stmt.table.to_ascii_lowercase();
3317        let cached = if let Some(ts) = schema.get(&lower) {
3318            let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3319                ts.columns.iter().map(|c| c.name.as_str()).collect()
3320            } else {
3321                stmt.columns.iter().map(|s| s.as_str()).collect()
3322            };
3323            let mut col_indices = Vec::with_capacity(insert_columns.len());
3324            for name in &insert_columns {
3325                col_indices.push(ts.column_index(name)?);
3326            }
3327            if col_indices
3328                .iter()
3329                .any(|&ci| ts.columns[ci].generated_kind.is_some())
3330            {
3331                return None;
3332            }
3333            let on_conflict = stmt
3334                .on_conflict
3335                .as_ref()
3336                .map(|oc| compile_on_conflict(oc, ts))
3337                .transpose()
3338                .ok()
3339                .flatten()
3340                .map(Arc::new);
3341            let generated_col_positions: Vec<usize> = ts
3342                .columns
3343                .iter()
3344                .enumerate()
3345                .filter_map(|(i, c)| {
3346                    matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3347                        .then_some(i)
3348                })
3349                .collect();
3350            let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3351                .iter()
3352                .map(|&pos| {
3353                    detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3354                })
3355                .collect();
3356            let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3357                Some(ColumnMap::new(&ts.columns))
3358            } else {
3359                None
3360            };
3361            let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3362            let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3363            let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3364            let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3365            let phys_count = ts.physical_non_pk_count();
3366            let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3367            let single_int_pk =
3368                pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3369            let not_null_indices: Vec<u16> = ts
3370                .columns
3371                .iter()
3372                .filter(|c| !c.nullable)
3373                .map(|c| c.position)
3374                .collect();
3375            let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3376            let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3377            let row_fully_overwritten = if any_defaults_flag {
3378                false
3379            } else {
3380                let mut covered: rustc_hash::FxHashSet<usize> =
3381                    col_indices.iter().copied().collect();
3382                covered.extend(generated_col_positions.iter().copied());
3383                for (j, &i) in non_pk_indices.iter().enumerate() {
3384                    let _ = j;
3385                    if matches!(
3386                        ts.columns[i].generated_kind,
3387                        Some(crate::parser::GeneratedKind::Virtual)
3388                    ) {
3389                        covered.insert(i);
3390                    }
3391                }
3392                bind_plan.is_some() && covered.len() == ts.columns.len()
3393            };
3394            let has_fks = !ts.foreign_keys.is_empty();
3395            let has_indices = !ts.indices.is_empty();
3396            let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3397            let mut null_value_slots: Vec<usize> =
3398                dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3399            for (j, &i) in non_pk_indices.iter().enumerate() {
3400                let slot = encoding_positions[j] as usize;
3401                if matches!(
3402                    ts.columns[i].generated_kind,
3403                    Some(crate::parser::GeneratedKind::Virtual)
3404                ) {
3405                    null_value_slots.push(slot);
3406                } else {
3407                    non_virtual_pairs.push((i, slot));
3408                }
3409            }
3410            let row_encoder = {
3411                let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3412                    let col = &ts.columns[i];
3413                    if matches!(
3414                        col.generated_kind,
3415                        Some(crate::parser::GeneratedKind::Virtual)
3416                    ) {
3417                        true
3418                    } else {
3419                        col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3420                    }
3421                });
3422                if all_int_or_null {
3423                    let mut null_slots: Vec<usize> =
3424                        dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3425                    for (j, &i) in non_pk_indices.iter().enumerate() {
3426                        if matches!(
3427                            ts.columns[i].generated_kind,
3428                            Some(crate::parser::GeneratedKind::Virtual)
3429                        ) {
3430                            null_slots.push(encoding_positions[j] as usize);
3431                        }
3432                    }
3433                    Some(crate::encoding::build_int_row_template(
3434                        phys_count,
3435                        &null_slots,
3436                    ))
3437                } else {
3438                    None
3439                }
3440            };
3441            let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3442                && !ts.columns.iter().any(|c| c.default_expr.is_some())
3443                && !ts.has_checks()
3444                && !has_fks
3445                && !has_indices
3446                && stmt.on_conflict.is_none()
3447                && stmt.returning.is_none()
3448                && bind_plan.is_some()
3449                && row_encoder.is_some()
3450                && row_fully_overwritten
3451                && single_int_pk
3452                && generated_fast_evals
3453                    .iter()
3454                    .all(|fe| !matches!(fe, FastGenEval::None));
3455            let trivial_fast_program = if is_trivial_fast_eligible {
3456                build_trivial_fast_program(
3457                    bind_plan.as_ref().unwrap(),
3458                    row_encoder.as_ref().unwrap(),
3459                    &non_virtual_pairs,
3460                    &generated_col_positions,
3461                    &generated_fast_evals,
3462                    &pk_indices,
3463                    &ts.columns,
3464                )
3465            } else {
3466                None
3467            };
3468            let is_trivial_fast = trivial_fast_program.is_some();
3469            let has_checks = ts.has_checks();
3470            let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3471            let needs_scoped_params = bind_plan.is_none()
3472                || has_checks
3473                || any_defaults
3474                || !generated_col_positions.is_empty()
3475                || on_conflict.is_some()
3476                || stmt.returning.is_some()
3477                || insert_has_subquery(stmt)
3478                || super::helpers::any_partial_index(ts);
3479            Some(InsertCache {
3480                col_indices,
3481                has_subquery: insert_has_subquery(stmt),
3482                any_defaults,
3483                has_checks,
3484                on_conflict,
3485                row_col_map,
3486                generated_col_positions,
3487                generated_fast_evals,
3488                pk_indices,
3489                non_pk_indices,
3490                encoding_positions,
3491                dropped_non_pk_slots,
3492                phys_count,
3493                single_int_pk,
3494                not_null_indices,
3495                bind_plan,
3496                row_fully_overwritten,
3497                row_encoder,
3498                is_trivial_fast,
3499                trivial_fast_program,
3500                needs_scoped_params,
3501            })
3502        } else if schema.get_view(&lower).is_some() {
3503            None
3504        } else {
3505            return None;
3506        };
3507        Some(Self {
3508            table_lower: lower,
3509            cached,
3510        })
3511    }
3512}
3513
3514impl CompiledPlan for CompiledInsert {
3515    fn execute(
3516        &self,
3517        db: &Database,
3518        schema: &SchemaManager,
3519        stmt: &Statement,
3520        params: &[Value],
3521        wtx: Option<&mut WriteTxn<'_>>,
3522    ) -> Result<ExecutionResult> {
3523        let ins = match stmt {
3524            Statement::Insert(i) => i,
3525            _ => {
3526                return Err(SqlError::Unsupported(
3527                    "CompiledInsert received non-INSERT statement".into(),
3528                ))
3529            }
3530        };
3531        match wtx {
3532            None => exec_insert(db, schema, ins, params),
3533            Some(outer) => match self.cached.as_ref() {
3534                Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3535                    exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3536                }),
3537                Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3538                None => exec_insert_in_txn(outer, schema, ins, params),
3539            },
3540        }
3541    }
3542
3543    fn uses_scoped_params(&self) -> bool {
3544        match self.cached.as_ref() {
3545            Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3546            None => true,
3547        }
3548    }
3549}
3550
3551pub struct CompiledDelete {
3552    table_lower: String,
3553}
3554
3555impl CompiledDelete {
3556    pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3557        let lower = stmt.table.to_ascii_lowercase();
3558        schema.get(&lower)?;
3559        Some(Self { table_lower: lower })
3560    }
3561}
3562
3563impl CompiledPlan for CompiledDelete {
3564    fn execute(
3565        &self,
3566        db: &Database,
3567        schema: &SchemaManager,
3568        stmt: &Statement,
3569        _params: &[Value],
3570        wtx: Option<&mut WriteTxn<'_>>,
3571    ) -> Result<ExecutionResult> {
3572        let del = match stmt {
3573            Statement::Delete(d) => d,
3574            _ => {
3575                return Err(SqlError::Unsupported(
3576                    "CompiledDelete received non-DELETE statement".into(),
3577                ))
3578            }
3579        };
3580        let _ = &self.table_lower;
3581        match wtx {
3582            None => super::write::exec_delete(db, schema, del),
3583            Some(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3584        }
3585    }
3586}
3587
3588fn exec_instead_of_view_insert_auto(
3589    db: &Database,
3590    schema: &SchemaManager,
3591    view_name: &str,
3592    aliases: &[String],
3593    stmt: &InsertStmt,
3594    params: &[Value],
3595) -> Result<ExecutionResult> {
3596    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3597    let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3598    wtx.commit().map_err(SqlError::Storage)?;
3599    Ok(r)
3600}
3601
3602fn exec_instead_of_view_insert_in_txn(
3603    wtx: &mut WriteTxn<'_>,
3604    schema: &SchemaManager,
3605    view_name: &str,
3606    aliases: &[String],
3607    stmt: &InsertStmt,
3608    params: &[Value],
3609) -> Result<ExecutionResult> {
3610    // CREATE VIEW without explicit aliases stores an empty vec — derive at runtime.
3611    let resolved_aliases: Vec<String> = if aliases.is_empty() {
3612        derive_view_columns(wtx, schema, view_name)?
3613    } else {
3614        aliases.to_vec()
3615    };
3616    let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3617    let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3618        .iter()
3619        .enumerate()
3620        .map(|(i, name)| (name.to_ascii_lowercase(), i))
3621        .collect();
3622
3623    let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3624        (0..resolved_aliases.len()).collect()
3625    } else {
3626        stmt.columns
3627            .iter()
3628            .map(|c| {
3629                alias_map
3630                    .get(&c.to_ascii_lowercase())
3631                    .copied()
3632                    .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3633            })
3634            .collect::<Result<_>>()?
3635    };
3636
3637    let source_rows: Vec<Vec<Value>> = match &stmt.source {
3638        InsertSource::Values(rows) => {
3639            let mut out = Vec::with_capacity(rows.len());
3640            for row in rows {
3641                if row.len() != target_positions.len() {
3642                    return Err(SqlError::InvalidValue(format!(
3643                        "expected {} values, got {}",
3644                        target_positions.len(),
3645                        row.len()
3646                    )));
3647                }
3648                let mut vals = Vec::with_capacity(row.len());
3649                for expr in row {
3650                    let v = match expr {
3651                        Expr::Parameter(n) => params
3652                            .get(n - 1)
3653                            .cloned()
3654                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3655                        Expr::Literal(v) => v.clone(),
3656                        other => eval_const_expr(other)?,
3657                    };
3658                    vals.push(v);
3659                }
3660                out.push(vals);
3661            }
3662            out
3663        }
3664        InsertSource::Select(sq) => {
3665            let empty_ctes = CteContext::default();
3666            let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3667            qr.rows
3668        }
3669    };
3670
3671    let mut count: u64 = 0;
3672    for row in source_rows {
3673        if row.len() != target_positions.len() {
3674            return Err(SqlError::InvalidValue(format!(
3675                "expected {} values, got {}",
3676                target_positions.len(),
3677                row.len()
3678            )));
3679        }
3680        let mut new_row = vec![Value::Null; resolved_aliases.len()];
3681        for (slot, val) in target_positions.iter().zip(row) {
3682            new_row[*slot] = val;
3683        }
3684        super::triggers::fire_row_triggers(
3685            wtx,
3686            schema,
3687            view_name,
3688            crate::parser::TriggerTiming::InsteadOf,
3689            super::triggers::FireEvent::Insert,
3690            None,
3691            Some(new_row),
3692            &view_cols,
3693        )?;
3694        count += 1;
3695    }
3696    Ok(ExecutionResult::RowsAffected(count))
3697}
3698
3699fn derive_view_columns(
3700    wtx: &mut WriteTxn<'_>,
3701    schema: &SchemaManager,
3702    view_name: &str,
3703) -> Result<Vec<String>> {
3704    use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3705    let sel = SelectStmt {
3706        columns: vec![SelectColumn::AllColumns],
3707        from: view_name.to_string(),
3708        from_alias: None,
3709        from_subquery: None,
3710        from_args: None,
3711        from_json_table: None,
3712        joins: vec![],
3713        distinct: false,
3714        where_clause: None,
3715        order_by: vec![],
3716        limit: Some(Expr::Literal(Value::Integer(1))),
3717        offset: None,
3718        group_by: vec![],
3719        having: None,
3720    };
3721    let sq = SelectQuery {
3722        ctes: vec![],
3723        recursive: false,
3724        body: QueryBody::Select(Box::new(sel)),
3725    };
3726    let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3727    match qr {
3728        ExecutionResult::Query(q) => Ok(q.columns),
3729        _ => Ok(Vec::new()),
3730    }
3731}