Skip to main content

citadel_sql/executor/
dml.rs

1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::read_txn::ReadTxn;
7use citadel_txn::write_txn::WriteTxn;
8use rustc_hash::FxHashMap;
9
10use crate::encoding::{encode_composite_key_into, encode_row_into};
11use crate::error::{Result, SqlError};
12use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
13use crate::parser::*;
14use crate::types::*;
15
16use crate::schema::SchemaManager;
17
18use super::compile::CompiledPlan;
19use super::helpers::*;
20use super::CteContext;
21
22pub(super) fn exec_insert(
23    db: &Database,
24    schema: &SchemaManager,
25    stmt: &InsertStmt,
26    params: &[Value],
27) -> Result<ExecutionResult> {
28    let empty_ctes = CteContext::default();
29    let materialized;
30    let stmt = if insert_has_subquery(stmt) {
31        materialized = materialize_insert(stmt, &mut |sub| {
32            exec_subquery_read(db, schema, sub, &empty_ctes)
33        })?;
34        &materialized
35    } else {
36        stmt
37    };
38
39    let lower_name = stmt.table.to_ascii_lowercase();
40    if let Some(view_def) = schema.get_view(&lower_name) {
41        if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
42        {
43            let aliases = view_def.column_aliases.clone();
44            return exec_instead_of_view_insert_auto(
45                db,
46                schema,
47                &lower_name,
48                &aliases,
49                stmt,
50                params,
51            );
52        }
53        return Err(SqlError::CannotModifyView(stmt.table.clone()));
54    }
55    if schema.get_matview(&lower_name).is_some() {
56        return Err(SqlError::CannotModifyView(format!(
57            "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
58            stmt.table
59        )));
60    }
61    let table_schema = schema
62        .get(&lower_name)
63        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
64    schema.mark_dml(&table_schema.name);
65
66    let insert_columns = if stmt.columns.is_empty() {
67        table_schema
68            .columns
69            .iter()
70            .map(|c| c.name.clone())
71            .collect::<Vec<_>>()
72    } else {
73        stmt.columns
74            .iter()
75            .map(|c| c.to_ascii_lowercase())
76            .collect()
77    };
78
79    let col_indices: Vec<usize> = insert_columns
80        .iter()
81        .map(|name| {
82            table_schema
83                .column_index(name)
84                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
85        })
86        .collect::<Result<_>>()?;
87
88    for &ci in &col_indices {
89        if table_schema.columns[ci].generated_kind.is_some() {
90            return Err(SqlError::CannotInsertIntoGeneratedColumn(
91                table_schema.columns[ci].name.clone(),
92            ));
93        }
94    }
95
96    let defaults: Vec<(usize, &Expr)> = table_schema
97        .columns
98        .iter()
99        .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
100        .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
101        .collect();
102
103    let generated_cols: Vec<(usize, &Expr)> = table_schema
104        .columns
105        .iter()
106        .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
107        .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
108        .collect();
109
110    let has_checks = table_schema.has_checks();
111    let strict = table_schema.is_strict();
112    let row_col_map_for_gen = if !generated_cols.is_empty() {
113        Some(ColumnMap::new(&table_schema.columns))
114    } else {
115        None
116    };
117    let check_col_map = if has_checks {
118        Some(ColumnMap::new(&table_schema.columns))
119    } else {
120        None
121    };
122
123    let select_rows = match &stmt.source {
124        InsertSource::Select(sq) => {
125            let insert_ctes =
126                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
127                    exec_query_body_read(db, schema, body, ctx)
128                })?;
129            let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
130            Some(qr.rows)
131        }
132        InsertSource::Values(_) => None,
133    };
134
135    let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
136        .on_conflict
137        .as_ref()
138        .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
139        .transpose()?;
140
141    let row_col_map = compiled_conflict
142        .as_ref()
143        .map(|_| ColumnMap::new(&table_schema.columns));
144
145    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
146    let mut count: u64 = 0;
147    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
148        stmt.returning.as_ref().map(|_| Vec::new());
149
150    let pk_indices = table_schema.pk_indices();
151    let non_pk = table_schema.non_pk_indices();
152    let enc_pos = table_schema.encoding_positions();
153    let phys_count = table_schema.physical_non_pk_count();
154    let mut row = vec![Value::Null; table_schema.columns.len()];
155    let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
156    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
157    let mut key_buf: Vec<u8> = Vec::with_capacity(64);
158    let mut value_buf: Vec<u8> = Vec::with_capacity(256);
159    let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
160
161    let values = match &stmt.source {
162        InsertSource::Values(rows) => Some(rows.as_slice()),
163        InsertSource::Select(_) => None,
164    };
165    let sel_rows = select_rows.as_deref();
166
167    let total = match (values, sel_rows) {
168        (Some(rows), _) => rows.len(),
169        (_, Some(rows)) => rows.len(),
170        _ => 0,
171    };
172
173    if let Some(sel) = sel_rows {
174        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
175            return Err(SqlError::InvalidValue(format!(
176                "INSERT ... SELECT column count mismatch: expected {}, got {}",
177                insert_columns.len(),
178                sel[0].len()
179            )));
180        }
181    }
182
183    let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
184        t.enabled
185            && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
186            && t.events
187                .iter()
188                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
189    });
190    let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
191        Vec::with_capacity(total)
192    } else {
193        Vec::new()
194    };
195
196    if has_insert_statement_triggers {
197        super::triggers::fire_statement_triggers(
198            &mut wtx,
199            schema,
200            &table_schema.name,
201            crate::parser::TriggerTiming::Before,
202            super::triggers::FireEvent::Insert,
203            &table_schema.columns,
204            &[],
205            &[],
206        )?;
207    }
208
209    for idx in 0..total {
210        for v in row.iter_mut() {
211            *v = Value::Null;
212        }
213
214        if let Some(value_rows) = values {
215            let value_row = &value_rows[idx];
216            if value_row.len() != insert_columns.len() {
217                return Err(SqlError::InvalidValue(format!(
218                    "expected {} values, got {}",
219                    insert_columns.len(),
220                    value_row.len()
221                )));
222            }
223            for (i, expr) in value_row.iter().enumerate() {
224                let val = if let Expr::Parameter(n) = expr {
225                    params
226                        .get(n - 1)
227                        .cloned()
228                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
229                } else {
230                    eval_const_expr(expr)?
231                };
232                let col_idx = col_indices[i];
233                let col = &table_schema.columns[col_idx];
234                row[col_idx] = if val.is_null() {
235                    Value::Null
236                } else {
237                    coerce_for_column(val, col, strict)?
238                };
239            }
240        } else if let Some(sel) = sel_rows {
241            let sel_row = &sel[idx];
242            for (i, val) in sel_row.iter().enumerate() {
243                let col_idx = col_indices[i];
244                let col = &table_schema.columns[col_idx];
245                row[col_idx] = if val.is_null() {
246                    Value::Null
247                } else {
248                    coerce_for_column(val.clone(), col, strict)?
249                };
250            }
251        }
252
253        for &(pos, def_expr) in &defaults {
254            let val = eval_const_expr(def_expr)?;
255            let col = &table_schema.columns[pos];
256            if !val.is_null() {
257                row[pos] = coerce_for_column(val, col, strict)?;
258            }
259        }
260
261        if let Some(ref gen_map) = row_col_map_for_gen {
262            for &(pos, gen_expr) in &generated_cols {
263                let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
264                let col = &table_schema.columns[pos];
265                row[pos] = if val.is_null() {
266                    Value::Null
267                } else {
268                    coerce_for_column(val, col, strict)?
269                };
270            }
271        }
272
273        for col in &table_schema.columns {
274            if !col.nullable && row[col.position as usize].is_null() {
275                return Err(SqlError::NotNullViolation(col.name.clone()));
276            }
277        }
278
279        if let Some(ref col_map) = check_col_map {
280            for col in &table_schema.columns {
281                if let Some(ref check) = col.check_expr {
282                    let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
283                    if !is_truthy(&result) && !result.is_null() {
284                        let name = col.check_name.as_deref().unwrap_or(&col.name);
285                        return Err(SqlError::CheckViolation(name.to_string()));
286                    }
287                }
288            }
289            for tc in &table_schema.check_constraints {
290                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
291                if !is_truthy(&result) && !result.is_null() {
292                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
293                    return Err(SqlError::CheckViolation(name.to_string()));
294                }
295            }
296        }
297
298        for fk in &table_schema.foreign_keys {
299            let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
300            if any_null {
301                continue; // MATCH SIMPLE: skip if any FK col is NULL
302            }
303            let fk_vals: Vec<Value> = fk
304                .columns
305                .iter()
306                .map(|&ci| row[ci as usize].clone())
307                .collect();
308            fk_key_buf.clear();
309            encode_composite_key_into(&fk_vals, &mut fk_key_buf);
310            if fk.deferrable && fk.initially_deferred {
311                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
312                wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
313                    fk_name: name,
314                    foreign_table: fk.foreign_table.as_bytes().to_vec(),
315                    parent_key: fk_key_buf.clone(),
316                });
317                continue;
318            }
319            if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
320                let found = wtx
321                    .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
322                    .map_err(SqlError::Storage)?;
323                if found.is_none() {
324                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
325                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
326                }
327                wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
328            }
329        }
330
331        let proposed_row_for_returning: Option<Vec<Value>> =
332            returning_rows.as_ref().map(|_| row.clone());
333        let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
334            Some(row.clone())
335        } else {
336            None
337        };
338
339        let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
340            t.enabled
341                && t.timing == crate::parser::TriggerTiming::Before
342                && t.granularity == crate::parser::TriggerGranularity::ForEachRow
343                && t.events
344                    .iter()
345                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
346        });
347        if has_before_insert_triggers {
348            super::triggers::fire_row_triggers(
349                &mut wtx,
350                schema,
351                &table_schema.name,
352                crate::parser::TriggerTiming::Before,
353                super::triggers::FireEvent::Insert,
354                None,
355                Some(row.clone()),
356                &table_schema.columns,
357            )?;
358        }
359
360        for (j, &i) in pk_indices.iter().enumerate() {
361            pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
362        }
363        encode_composite_key_into(&pk_values, &mut key_buf);
364
365        for (j, &i) in non_pk.iter().enumerate() {
366            let col = &table_schema.columns[i];
367            if matches!(
368                col.generated_kind,
369                Some(crate::parser::GeneratedKind::Virtual)
370            ) {
371                value_values[enc_pos[j] as usize] = Value::Null;
372                row[i] = Value::Null;
373            } else {
374                value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
375            }
376        }
377        encode_row_into(&value_values, &mut value_buf);
378
379        if key_buf.len() > citadel_core::MAX_KEY_SIZE {
380            return Err(SqlError::KeyTooLarge {
381                size: key_buf.len(),
382                max: citadel_core::MAX_KEY_SIZE,
383            });
384        }
385        if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
386            return Err(SqlError::RowTooLarge {
387                size: value_buf.len(),
388                max: citadel_core::MAX_VALUE_SIZE,
389            });
390        }
391
392        match compiled_conflict.as_ref() {
393            None => {
394                let is_new = wtx
395                    .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
396                    .map_err(SqlError::Storage)?;
397                if !is_new {
398                    return Err(SqlError::DuplicateKey);
399                }
400                let has_after_insert_triggers =
401                    schema.triggers_for(&table_schema.name).iter().any(|t| {
402                        t.enabled
403                            && t.timing == crate::parser::TriggerTiming::After
404                            && t.granularity == crate::parser::TriggerGranularity::ForEachRow
405                            && t.events
406                                .iter()
407                                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
408                    });
409                if !table_schema.indices.is_empty() || has_after_insert_triggers {
410                    for (j, &i) in pk_indices.iter().enumerate() {
411                        row[i] = pk_values[j].clone();
412                    }
413                    for (j, &i) in non_pk.iter().enumerate() {
414                        row[i] =
415                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
416                    }
417                    if !table_schema.indices.is_empty() {
418                        insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
419                    }
420                    if has_after_insert_triggers {
421                        super::triggers::fire_row_triggers(
422                            &mut wtx,
423                            schema,
424                            &table_schema.name,
425                            crate::parser::TriggerTiming::After,
426                            super::triggers::FireEvent::Insert,
427                            None,
428                            Some(row.clone()),
429                            &table_schema.columns,
430                        )?;
431                    }
432                }
433                if let Some(r) = row_for_stmt_trigger.clone() {
434                    stmt_new_rows.push(r);
435                }
436                count += 1;
437                if let Some(buf) = returning_rows.as_mut() {
438                    buf.push((None, proposed_row_for_returning));
439                }
440            }
441            Some(oc) => {
442                let oc_ref: &CompiledOnConflict = oc;
443                let needs_row = upsert_needs_row(oc_ref, table_schema);
444                if needs_row {
445                    for (j, &i) in pk_indices.iter().enumerate() {
446                        row[i] = pk_values[j].clone();
447                    }
448                    for (j, &i) in non_pk.iter().enumerate() {
449                        row[i] =
450                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
451                    }
452                }
453                let outcome = apply_insert_with_conflict(
454                    &mut wtx,
455                    table_schema,
456                    &key_buf,
457                    &value_buf,
458                    &row,
459                    &pk_values,
460                    oc_ref,
461                    row_col_map.as_ref().unwrap(),
462                    stmt.returning.is_some(),
463                )?;
464                match outcome {
465                    InsertRowOutcome::Inserted => {
466                        count += 1;
467                        if let Some(buf) = returning_rows.as_mut() {
468                            buf.push((None, proposed_row_for_returning));
469                        }
470                        if let Some(r) = row_for_stmt_trigger.clone() {
471                            stmt_new_rows.push(r);
472                        }
473                        let has_after_insert_triggers =
474                            schema.triggers_for(&table_schema.name).iter().any(|t| {
475                                t.enabled
476                                    && t.timing == crate::parser::TriggerTiming::After
477                                    && t.granularity
478                                        == crate::parser::TriggerGranularity::ForEachRow
479                                    && t.events
480                                        .iter()
481                                        .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
482                            });
483                        if has_after_insert_triggers {
484                            super::triggers::fire_row_triggers(
485                                &mut wtx,
486                                schema,
487                                &table_schema.name,
488                                crate::parser::TriggerTiming::After,
489                                super::triggers::FireEvent::Insert,
490                                None,
491                                Some(row.clone()),
492                                &table_schema.columns,
493                            )?;
494                        }
495                    }
496                    InsertRowOutcome::Updated { old, new } => {
497                        count += 1;
498                        if let Some(buf) = returning_rows.as_mut() {
499                            buf.push((Some(old.clone()), Some(new.clone())));
500                        }
501                        let has_after_update_triggers =
502                            schema.triggers_for(&table_schema.name).iter().any(|t| {
503                                t.enabled
504                                    && t.timing == crate::parser::TriggerTiming::After
505                                    && t.granularity
506                                        == crate::parser::TriggerGranularity::ForEachRow
507                                    && t.events.iter().any(|e| {
508                                        matches!(e, crate::parser::TriggerEvent::Update(_))
509                                    })
510                            });
511                        if has_after_update_triggers {
512                            let changed_cols: Vec<String> = match oc_ref {
513                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
514                                    .iter()
515                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
516                                    .collect(),
517                                _ => Vec::new(),
518                            };
519                            super::triggers::fire_row_triggers(
520                                &mut wtx,
521                                schema,
522                                &table_schema.name,
523                                crate::parser::TriggerTiming::After,
524                                super::triggers::FireEvent::Update {
525                                    changed_columns: &changed_cols,
526                                },
527                                Some(old),
528                                Some(new),
529                                &table_schema.columns,
530                            )?;
531                        }
532                    }
533                    InsertRowOutcome::Skipped => {}
534                }
535            }
536        }
537    }
538
539    if has_insert_statement_triggers {
540        super::triggers::fire_statement_triggers(
541            &mut wtx,
542            schema,
543            &table_schema.name,
544            crate::parser::TriggerTiming::After,
545            super::triggers::FireEvent::Insert,
546            &table_schema.columns,
547            &[],
548            &stmt_new_rows,
549        )?;
550    }
551
552    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
553        let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
554        super::helpers::drain_deferred_fk_checks(&mut wtx)?;
555        wtx.commit().map_err(SqlError::Storage)?;
556        return Ok(ExecutionResult::Query(qr));
557    }
558
559    super::helpers::drain_deferred_fk_checks(&mut wtx)?;
560    wtx.commit().map_err(SqlError::Storage)?;
561    Ok(ExecutionResult::RowsAffected(count))
562}
563
564pub(super) fn has_subquery(expr: &Expr) -> bool {
565    crate::parser::has_subquery(expr)
566}
567
568pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
569    if let Some(ref w) = stmt.where_clause {
570        if has_subquery(w) {
571            return true;
572        }
573    }
574    if let Some(ref h) = stmt.having {
575        if has_subquery(h) {
576            return true;
577        }
578    }
579    for col in &stmt.columns {
580        if let SelectColumn::Expr { expr, .. } = col {
581            if has_subquery(expr) {
582                return true;
583            }
584        }
585    }
586    for ob in &stmt.order_by {
587        if has_subquery(&ob.expr) {
588            return true;
589        }
590    }
591    for join in &stmt.joins {
592        if let Some(ref on_expr) = join.on_clause {
593            if has_subquery(on_expr) {
594                return true;
595            }
596        }
597    }
598    false
599}
600
601pub(super) fn materialize_expr(
602    expr: &Expr,
603    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
604) -> Result<Expr> {
605    match expr {
606        Expr::InSubquery {
607            expr: e,
608            subquery,
609            negated,
610        } => {
611            let inner = materialize_expr(e, exec_sub)?;
612            let qr = exec_sub(subquery)?;
613            if !qr.columns.is_empty() && qr.columns.len() != 1 {
614                return Err(SqlError::SubqueryMultipleColumns);
615            }
616            let mut values = rustc_hash::FxHashSet::default();
617            let mut has_null = false;
618            for row in &qr.rows {
619                if row[0].is_null() {
620                    has_null = true;
621                } else {
622                    values.insert(row[0].clone());
623                }
624            }
625            Ok(Expr::InSet {
626                expr: Box::new(inner),
627                values,
628                has_null,
629                negated: *negated,
630            })
631        }
632        Expr::ScalarSubquery(subquery) => {
633            let qr = exec_sub(subquery)?;
634            if qr.rows.len() > 1 {
635                return Err(SqlError::SubqueryMultipleRows);
636            }
637            let val = if qr.rows.is_empty() {
638                Value::Null
639            } else {
640                qr.rows[0][0].clone()
641            };
642            Ok(Expr::Literal(val))
643        }
644        Expr::Exists { subquery, negated } => {
645            let qr = exec_sub(subquery)?;
646            let exists = !qr.rows.is_empty();
647            let result = if *negated { !exists } else { exists };
648            Ok(Expr::Literal(Value::Boolean(result)))
649        }
650        Expr::InList {
651            expr: e,
652            list,
653            negated,
654        } => {
655            let inner = materialize_expr(e, exec_sub)?;
656            let items = list
657                .iter()
658                .map(|item| materialize_expr(item, exec_sub))
659                .collect::<Result<Vec<_>>>()?;
660            Ok(Expr::InList {
661                expr: Box::new(inner),
662                list: items,
663                negated: *negated,
664            })
665        }
666        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
667            left: Box::new(materialize_expr(left, exec_sub)?),
668            op: *op,
669            right: Box::new(materialize_expr(right, exec_sub)?),
670        }),
671        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
672            op: *op,
673            expr: Box::new(materialize_expr(e, exec_sub)?),
674        }),
675        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
676        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
677        Expr::InSet {
678            expr: e,
679            values,
680            has_null,
681            negated,
682        } => Ok(Expr::InSet {
683            expr: Box::new(materialize_expr(e, exec_sub)?),
684            values: values.clone(),
685            has_null: *has_null,
686            negated: *negated,
687        }),
688        Expr::Between {
689            expr: e,
690            low,
691            high,
692            negated,
693        } => Ok(Expr::Between {
694            expr: Box::new(materialize_expr(e, exec_sub)?),
695            low: Box::new(materialize_expr(low, exec_sub)?),
696            high: Box::new(materialize_expr(high, exec_sub)?),
697            negated: *negated,
698        }),
699        Expr::Like {
700            expr: e,
701            pattern,
702            escape,
703            negated,
704        } => {
705            let esc = escape
706                .as_ref()
707                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
708                .transpose()?;
709            Ok(Expr::Like {
710                expr: Box::new(materialize_expr(e, exec_sub)?),
711                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
712                escape: esc,
713                negated: *negated,
714            })
715        }
716        Expr::Case {
717            operand,
718            conditions,
719            else_result,
720        } => {
721            let op = operand
722                .as_ref()
723                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
724                .transpose()?;
725            let conds = conditions
726                .iter()
727                .map(|(c, r)| {
728                    Ok((
729                        materialize_expr(c, exec_sub)?,
730                        materialize_expr(r, exec_sub)?,
731                    ))
732                })
733                .collect::<Result<Vec<_>>>()?;
734            let else_r = else_result
735                .as_ref()
736                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
737                .transpose()?;
738            Ok(Expr::Case {
739                operand: op,
740                conditions: conds,
741                else_result: else_r,
742            })
743        }
744        Expr::Coalesce(args) => {
745            let materialized = args
746                .iter()
747                .map(|a| materialize_expr(a, exec_sub))
748                .collect::<Result<Vec<_>>>()?;
749            Ok(Expr::Coalesce(materialized))
750        }
751        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
752            expr: Box::new(materialize_expr(e, exec_sub)?),
753            data_type: *data_type,
754        }),
755        Expr::Function {
756            name,
757            args,
758            distinct,
759        } => {
760            let materialized = args
761                .iter()
762                .map(|a| materialize_expr(a, exec_sub))
763                .collect::<Result<Vec<_>>>()?;
764            Ok(Expr::Function {
765                name: name.clone(),
766                args: materialized,
767                distinct: *distinct,
768            })
769        }
770        other => Ok(other.clone()),
771    }
772}
773
774pub(super) fn materialize_stmt(
775    stmt: &SelectStmt,
776    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
777) -> Result<SelectStmt> {
778    let where_clause = stmt
779        .where_clause
780        .as_ref()
781        .map(|e| materialize_expr(e, exec_sub))
782        .transpose()?;
783    let having = stmt
784        .having
785        .as_ref()
786        .map(|e| materialize_expr(e, exec_sub))
787        .transpose()?;
788    let columns = stmt
789        .columns
790        .iter()
791        .map(|c| match c {
792            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
793            SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
794            SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
795            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
796                expr: materialize_expr(expr, exec_sub)?,
797                alias: alias.clone(),
798            }),
799        })
800        .collect::<Result<Vec<_>>>()?;
801    let order_by = stmt
802        .order_by
803        .iter()
804        .map(|ob| {
805            Ok(OrderByItem {
806                expr: materialize_expr(&ob.expr, exec_sub)?,
807                descending: ob.descending,
808                nulls_first: ob.nulls_first,
809            })
810        })
811        .collect::<Result<Vec<_>>>()?;
812    let joins = stmt
813        .joins
814        .iter()
815        .map(|j| {
816            let on_clause = j
817                .on_clause
818                .as_ref()
819                .map(|e| materialize_expr(e, exec_sub))
820                .transpose()?;
821            Ok(JoinClause {
822                join_type: j.join_type,
823                table: j.table.clone(),
824                subquery: j.subquery.clone(),
825                on_clause,
826            })
827        })
828        .collect::<Result<Vec<_>>>()?;
829    let group_by = stmt
830        .group_by
831        .iter()
832        .map(|e| materialize_expr(e, exec_sub))
833        .collect::<Result<Vec<_>>>()?;
834    Ok(SelectStmt {
835        columns,
836        from: stmt.from.clone(),
837        from_alias: stmt.from_alias.clone(),
838        from_subquery: stmt.from_subquery.clone(),
839        from_args: stmt.from_args.clone(),
840        from_json_table: stmt.from_json_table.clone(),
841        joins,
842        distinct: stmt.distinct,
843        where_clause,
844        order_by,
845        limit: stmt.limit.clone(),
846        offset: stmt.offset.clone(),
847        group_by,
848        having,
849    })
850}
851
852pub(super) fn exec_subquery_read(
853    db: &Database,
854    schema: &SchemaManager,
855    stmt: &SelectStmt,
856    ctes: &CteContext,
857) -> Result<QueryResult> {
858    let mut rtx = db.begin_read();
859    exec_subquery_with_read(&mut rtx, schema, stmt, ctes)
860}
861
862pub(super) fn exec_subquery_with_read(
863    rtx: &mut ReadTxn<'_>,
864    schema: &SchemaManager,
865    stmt: &SelectStmt,
866    ctes: &CteContext,
867) -> Result<QueryResult> {
868    match super::exec_select_with_read(rtx, schema, stmt, ctes)? {
869        ExecutionResult::Query(qr) => Ok(qr),
870        _ => Ok(QueryResult {
871            columns: vec![],
872            rows: vec![],
873        }),
874    }
875}
876
877pub(super) fn exec_subquery_write(
878    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
879    schema: &SchemaManager,
880    stmt: &SelectStmt,
881    ctes: &CteContext,
882) -> Result<QueryResult> {
883    match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
884        ExecutionResult::Query(qr) => Ok(qr),
885        _ => Ok(QueryResult {
886            columns: vec![],
887            rows: vec![],
888        }),
889    }
890}
891
892pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
893    stmt.where_clause.as_ref().is_some_and(has_subquery)
894        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
895}
896
897pub(super) fn materialize_update(
898    stmt: &UpdateStmt,
899    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
900) -> Result<UpdateStmt> {
901    let where_clause = stmt
902        .where_clause
903        .as_ref()
904        .map(|e| materialize_expr(e, exec_sub))
905        .transpose()?;
906    let assignments = stmt
907        .assignments
908        .iter()
909        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
910        .collect::<Result<Vec<_>>>()?;
911    Ok(UpdateStmt {
912        table: stmt.table.clone(),
913        assignments,
914        where_clause,
915        returning: stmt.returning.clone(),
916    })
917}
918
919pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
920    stmt.where_clause.as_ref().is_some_and(has_subquery)
921}
922
923pub(super) fn materialize_delete(
924    stmt: &DeleteStmt,
925    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
926) -> Result<DeleteStmt> {
927    let where_clause = stmt
928        .where_clause
929        .as_ref()
930        .map(|e| materialize_expr(e, exec_sub))
931        .transpose()?;
932    Ok(DeleteStmt {
933        table: stmt.table.clone(),
934        where_clause,
935        returning: stmt.returning.clone(),
936    })
937}
938
939pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
940    match &stmt.source {
941        InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
942        // SELECT source subqueries are handled by exec_select's correlated/non-correlated paths
943        InsertSource::Select(_) => false,
944    }
945}
946
947pub(super) fn materialize_insert(
948    stmt: &InsertStmt,
949    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
950) -> Result<InsertStmt> {
951    let source = match &stmt.source {
952        InsertSource::Values(rows) => {
953            let mat = rows
954                .iter()
955                .map(|row| {
956                    row.iter()
957                        .map(|e| materialize_expr(e, exec_sub))
958                        .collect::<Result<Vec<_>>>()
959                })
960                .collect::<Result<Vec<_>>>()?;
961            InsertSource::Values(mat)
962        }
963        InsertSource::Select(sq) => {
964            let ctes = sq
965                .ctes
966                .iter()
967                .map(|c| {
968                    Ok(CteDefinition {
969                        name: c.name.clone(),
970                        column_aliases: c.column_aliases.clone(),
971                        body: materialize_query_body(&c.body, exec_sub)?,
972                    })
973                })
974                .collect::<Result<Vec<_>>>()?;
975            let body = materialize_query_body(&sq.body, exec_sub)?;
976            InsertSource::Select(Box::new(SelectQuery {
977                ctes,
978                recursive: sq.recursive,
979                body,
980            }))
981        }
982    };
983    Ok(InsertStmt {
984        table: stmt.table.clone(),
985        columns: stmt.columns.clone(),
986        source,
987        on_conflict: stmt.on_conflict.clone(),
988        returning: stmt.returning.clone(),
989    })
990}
991
992pub(super) fn materialize_query_body(
993    body: &QueryBody,
994    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
995) -> Result<QueryBody> {
996    match body {
997        QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
998            sel, exec_sub,
999        )?))),
1000        QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
1001            op: comp.op.clone(),
1002            all: comp.all,
1003            left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
1004            right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
1005            order_by: comp.order_by.clone(),
1006            limit: comp.limit.clone(),
1007            offset: comp.offset.clone(),
1008        }))),
1009        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
1010    }
1011}
1012
1013pub(super) fn exec_query_body_with_read(
1014    rtx: &mut ReadTxn<'_>,
1015    schema: &SchemaManager,
1016    body: &QueryBody,
1017    ctes: &CteContext,
1018) -> Result<ExecutionResult> {
1019    match body {
1020        QueryBody::Select(sel) => super::exec_select_with_read(rtx, schema, sel, ctes),
1021        QueryBody::Compound(comp) => exec_compound_select_with_read(rtx, schema, comp, ctes),
1022        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1023            SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1024        ),
1025    }
1026}
1027
1028pub(super) fn exec_query_body_in_txn(
1029    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1030    schema: &SchemaManager,
1031    body: &QueryBody,
1032    ctes: &CteContext,
1033) -> Result<ExecutionResult> {
1034    match body {
1035        QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1036        QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1037        QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1038        QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1039        QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1040    }
1041}
1042
1043pub(super) fn exec_query_body_read(
1044    db: &Database,
1045    schema: &SchemaManager,
1046    body: &QueryBody,
1047    ctes: &CteContext,
1048) -> Result<QueryResult> {
1049    let mut rtx = db.begin_read();
1050    exec_query_body_with_read_qr(&mut rtx, schema, body, ctes)
1051}
1052
1053pub(super) fn exec_query_body_with_read_qr(
1054    rtx: &mut ReadTxn<'_>,
1055    schema: &SchemaManager,
1056    body: &QueryBody,
1057    ctes: &CteContext,
1058) -> Result<QueryResult> {
1059    match exec_query_body_with_read(rtx, schema, body, ctes)? {
1060        ExecutionResult::Query(qr) => Ok(qr),
1061        _ => Ok(QueryResult {
1062            columns: vec![],
1063            rows: vec![],
1064        }),
1065    }
1066}
1067
1068pub(super) fn exec_query_body_write(
1069    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1070    schema: &SchemaManager,
1071    body: &QueryBody,
1072    ctes: &CteContext,
1073) -> Result<QueryResult> {
1074    match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1075        ExecutionResult::Query(qr) => Ok(qr),
1076        _ => Ok(QueryResult {
1077            columns: vec![],
1078            rows: vec![],
1079        }),
1080    }
1081}
1082
1083pub(super) fn exec_compound_select_with_read(
1084    rtx: &mut ReadTxn<'_>,
1085    schema: &SchemaManager,
1086    comp: &CompoundSelect,
1087    ctes: &CteContext,
1088) -> Result<ExecutionResult> {
1089    let left_qr = match exec_query_body_with_read(rtx, schema, &comp.left, ctes)? {
1090        ExecutionResult::Query(qr) => qr,
1091        _ => QueryResult {
1092            columns: vec![],
1093            rows: vec![],
1094        },
1095    };
1096    let right_qr = match exec_query_body_with_read(rtx, schema, &comp.right, ctes)? {
1097        ExecutionResult::Query(qr) => qr,
1098        _ => QueryResult {
1099            columns: vec![],
1100            rows: vec![],
1101        },
1102    };
1103    apply_set_operation(comp, left_qr, right_qr)
1104}
1105
1106pub(super) fn exec_compound_select_in_txn(
1107    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1108    schema: &SchemaManager,
1109    comp: &CompoundSelect,
1110    ctes: &CteContext,
1111) -> Result<ExecutionResult> {
1112    let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1113        ExecutionResult::Query(qr) => qr,
1114        _ => QueryResult {
1115            columns: vec![],
1116            rows: vec![],
1117        },
1118    };
1119    let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1120        ExecutionResult::Query(qr) => qr,
1121        _ => QueryResult {
1122            columns: vec![],
1123            rows: vec![],
1124        },
1125    };
1126    apply_set_operation(comp, left_qr, right_qr)
1127}
1128
1129pub(super) fn apply_set_operation(
1130    comp: &CompoundSelect,
1131    left_qr: QueryResult,
1132    right_qr: QueryResult,
1133) -> Result<ExecutionResult> {
1134    if !left_qr.columns.is_empty()
1135        && !right_qr.columns.is_empty()
1136        && left_qr.columns.len() != right_qr.columns.len()
1137    {
1138        return Err(SqlError::CompoundColumnCountMismatch {
1139            left: left_qr.columns.len(),
1140            right: right_qr.columns.len(),
1141        });
1142    }
1143
1144    let columns = left_qr.columns;
1145
1146    let mut rows = match (&comp.op, comp.all) {
1147        (SetOp::Union, true) => {
1148            let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1149            let mut rows = Vec::with_capacity(total);
1150            rows.extend(left_qr.rows);
1151            rows.extend(right_qr.rows);
1152            rows
1153        }
1154        (SetOp::Union, false) => {
1155            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1156            let mut rows = Vec::new();
1157            for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1158                if !seen.contains(&row) {
1159                    seen.insert(row.clone());
1160                    rows.push(row);
1161                }
1162            }
1163            rows
1164        }
1165        (SetOp::Intersect, true) => {
1166            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1167            for row in &right_qr.rows {
1168                *right_counts.entry(row.clone()).or_insert(0) += 1;
1169            }
1170            let mut rows = Vec::new();
1171            for row in left_qr.rows {
1172                if let Some(count) = right_counts.get_mut(&row) {
1173                    if *count > 0 {
1174                        *count -= 1;
1175                        rows.push(row);
1176                    }
1177                }
1178            }
1179            rows
1180        }
1181        (SetOp::Intersect, false) => {
1182            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1183            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1184            let mut rows = Vec::new();
1185            for row in left_qr.rows {
1186                if right_set.contains(&row) && !seen.contains(&row) {
1187                    seen.insert(row.clone());
1188                    rows.push(row);
1189                }
1190            }
1191            rows
1192        }
1193        (SetOp::Except, true) => {
1194            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1195            for row in &right_qr.rows {
1196                *right_counts.entry(row.clone()).or_insert(0) += 1;
1197            }
1198            let mut rows = Vec::new();
1199            for row in left_qr.rows {
1200                if let Some(count) = right_counts.get_mut(&row) {
1201                    if *count > 0 {
1202                        *count -= 1;
1203                        continue;
1204                    }
1205                }
1206                rows.push(row);
1207            }
1208            rows
1209        }
1210        (SetOp::Except, false) => {
1211            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1212            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1213            let mut rows = Vec::new();
1214            for row in left_qr.rows {
1215                if !right_set.contains(&row) && !seen.contains(&row) {
1216                    seen.insert(row.clone());
1217                    rows.push(row);
1218                }
1219            }
1220            rows
1221        }
1222    };
1223
1224    if !comp.order_by.is_empty() {
1225        let col_defs: Vec<crate::types::ColumnDef> = columns
1226            .iter()
1227            .enumerate()
1228            .map(|(i, name)| crate::types::ColumnDef {
1229                name: name.clone(),
1230                data_type: crate::types::DataType::Null,
1231                nullable: true,
1232                position: i as u16,
1233                default_expr: None,
1234                default_sql: None,
1235                check_expr: None,
1236                check_sql: None,
1237                check_name: None,
1238                is_with_timezone: false,
1239                generated_expr: None,
1240                generated_sql: None,
1241                generated_kind: None,
1242                collation: crate::types::Collation::Binary,
1243            })
1244            .collect();
1245        sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1246    }
1247
1248    if let Some(ref offset_expr) = comp.offset {
1249        let offset = eval_const_int(offset_expr)?.max(0) as usize;
1250        if offset < rows.len() {
1251            rows = rows.split_off(offset);
1252        } else {
1253            rows.clear();
1254        }
1255    }
1256
1257    if let Some(ref limit_expr) = comp.limit {
1258        let limit = eval_const_int(limit_expr)?.max(0) as usize;
1259        rows.truncate(limit);
1260    }
1261
1262    Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1263}
1264
1265struct InsertBufs {
1266    row: Vec<Value>,
1267    pk_values: Vec<Value>,
1268    value_values: Vec<Value>,
1269    key_buf: Vec<u8>,
1270    value_buf: Vec<u8>,
1271    col_indices: Vec<usize>,
1272    fk_key_buf: Vec<u8>,
1273}
1274
1275impl InsertBufs {
1276    fn new() -> Self {
1277        Self {
1278            row: Vec::new(),
1279            pk_values: Vec::new(),
1280            value_values: Vec::new(),
1281            key_buf: Vec::with_capacity(64),
1282            value_buf: Vec::with_capacity(256),
1283            col_indices: Vec::new(),
1284            fk_key_buf: Vec::with_capacity(64),
1285        }
1286    }
1287}
1288
1289thread_local! {
1290    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1291    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1292}
1293
1294fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1295    INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1296        Ok(mut borrowed) => f(&mut borrowed),
1297        Err(_) => {
1298            let mut local = InsertBufs::new();
1299            f(&mut local)
1300        }
1301    })
1302}
1303
1304pub(super) struct UpsertBufs {
1305    old_row: Vec<Value>,
1306    new_row: Vec<Value>,
1307    value_values: Vec<Value>,
1308    new_value_buf: Vec<u8>,
1309}
1310
1311impl UpsertBufs {
1312    pub(super) fn new() -> Self {
1313        Self {
1314            old_row: Vec::new(),
1315            new_row: Vec::new(),
1316            value_values: Vec::new(),
1317            new_value_buf: Vec::with_capacity(256),
1318        }
1319    }
1320}
1321
1322pub fn exec_insert_in_txn(
1323    wtx: &mut WriteTxn<'_>,
1324    schema: &SchemaManager,
1325    stmt: &InsertStmt,
1326    params: &[Value],
1327) -> Result<ExecutionResult> {
1328    with_insert_scratch(|bufs| {
1329        exec_insert_in_txn_impl(
1330            wtx,
1331            schema,
1332            stmt,
1333            params,
1334            bufs,
1335            None,
1336            &CteContext::default(),
1337        )
1338    })
1339}
1340
1341pub(super) fn exec_insert_in_txn_with_ctes(
1342    wtx: &mut WriteTxn<'_>,
1343    schema: &SchemaManager,
1344    stmt: &InsertStmt,
1345    params: &[Value],
1346    outer_ctes: &CteContext,
1347) -> Result<ExecutionResult> {
1348    with_insert_scratch(|bufs| {
1349        exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1350    })
1351}
1352
1353fn exec_insert_in_txn_cached(
1354    wtx: &mut WriteTxn<'_>,
1355    schema: &SchemaManager,
1356    stmt: &InsertStmt,
1357    params: &[Value],
1358    cache: &InsertCache,
1359) -> Result<ExecutionResult> {
1360    with_insert_scratch(|bufs| {
1361        exec_insert_in_txn_impl(
1362            wtx,
1363            schema,
1364            stmt,
1365            params,
1366            bufs,
1367            Some(cache),
1368            &CteContext::default(),
1369        )
1370    })
1371}
1372
1373fn exec_insert_in_txn_impl(
1374    wtx: &mut WriteTxn<'_>,
1375    schema: &SchemaManager,
1376    stmt: &InsertStmt,
1377    params: &[Value],
1378    bufs: &mut InsertBufs,
1379    cache: Option<&InsertCache>,
1380    outer_ctes: &CteContext,
1381) -> Result<ExecutionResult> {
1382    let empty_ctes = CteContext::default();
1383    let materialized;
1384    let has_sub = match cache {
1385        Some(c) => c.has_subquery,
1386        None => insert_has_subquery(stmt),
1387    };
1388    let stmt = if has_sub {
1389        materialized = materialize_insert(stmt, &mut |sub| {
1390            exec_subquery_write(wtx, schema, sub, &empty_ctes)
1391        })?;
1392        &materialized
1393    } else {
1394        stmt
1395    };
1396
1397    let view_lookup_key = stmt.table.to_ascii_lowercase();
1398    if let Some(view_def) = schema.get_view(&view_lookup_key) {
1399        if super::triggers::has_instead_of(
1400            schema,
1401            &view_lookup_key,
1402            super::triggers::FireEvent::Insert,
1403        ) {
1404            let aliases = view_def.column_aliases.clone();
1405            return exec_instead_of_view_insert_in_txn(
1406                wtx,
1407                schema,
1408                &view_lookup_key,
1409                &aliases,
1410                stmt,
1411                params,
1412            );
1413        }
1414        return Err(SqlError::CannotModifyView(stmt.table.clone()));
1415    }
1416
1417    let table_schema = schema
1418        .get(&stmt.table)
1419        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1420    schema.mark_dml(&table_schema.name);
1421
1422    let default_columns;
1423    let insert_columns: &[String] = if stmt.columns.is_empty() {
1424        default_columns = table_schema
1425            .columns
1426            .iter()
1427            .map(|c| c.name.clone())
1428            .collect::<Vec<_>>();
1429        &default_columns
1430    } else {
1431        &stmt.columns
1432    };
1433
1434    bufs.col_indices.clear();
1435    if let Some(c) = cache {
1436        bufs.col_indices.extend_from_slice(&c.col_indices);
1437    } else {
1438        for name in insert_columns {
1439            bufs.col_indices.push(
1440                table_schema
1441                    .column_index(name)
1442                    .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1443            );
1444        }
1445    }
1446
1447    if cache.is_none() {
1448        for &ci in &bufs.col_indices {
1449            if table_schema.columns[ci].generated_kind.is_some() {
1450                return Err(SqlError::CannotInsertIntoGeneratedColumn(
1451                    table_schema.columns[ci].name.clone(),
1452                ));
1453            }
1454        }
1455    }
1456
1457    let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1458    let cached_gen_positions: &[usize];
1459    let cached_gen_fast_evals: &[FastGenEval];
1460    if let Some(c) = cache {
1461        cached_gen_positions = &c.generated_col_positions;
1462        cached_gen_fast_evals = &c.generated_fast_evals;
1463        generated_cols_uncached = Vec::new();
1464    } else {
1465        cached_gen_positions = &[];
1466        cached_gen_fast_evals = &[];
1467        generated_cols_uncached = table_schema
1468            .columns
1469            .iter()
1470            .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1471            .map(|c| {
1472                let expr = c.generated_expr.as_ref().unwrap();
1473                let fe = detect_fast_gen_eval(expr, table_schema);
1474                (c.position as usize, expr, fe)
1475            })
1476            .collect();
1477    }
1478    let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1479    let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1480        None
1481    } else {
1482        Some(ColumnMap::new(&table_schema.columns))
1483    };
1484    let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1485        None
1486    } else if let Some(c) = cache {
1487        c.row_col_map.as_ref()
1488    } else {
1489        row_col_map_for_gen_owned.as_ref()
1490    };
1491
1492    let any_defaults = match cache {
1493        Some(c) => c.any_defaults,
1494        None => table_schema
1495            .columns
1496            .iter()
1497            .any(|c| c.default_expr.is_some()),
1498    };
1499    let defaults: Vec<(usize, &Expr)> = if any_defaults {
1500        table_schema
1501            .columns
1502            .iter()
1503            .filter(|c| {
1504                c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1505            })
1506            .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1507            .collect()
1508    } else {
1509        Vec::new()
1510    };
1511
1512    let has_checks = match cache {
1513        Some(c) => c.has_checks,
1514        None => table_schema.has_checks(),
1515    };
1516    let check_col_map = if has_checks {
1517        Some(ColumnMap::new(&table_schema.columns))
1518    } else {
1519        None
1520    };
1521
1522    let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1523        &[usize],
1524        &[usize],
1525        &[u16],
1526        usize,
1527        &[u16],
1528    ) = if let Some(c) = cache {
1529        (
1530            &c.pk_indices,
1531            &c.non_pk_indices,
1532            &c.encoding_positions,
1533            c.phys_count,
1534            &c.dropped_non_pk_slots,
1535        )
1536    } else {
1537        (
1538            table_schema.pk_indices(),
1539            table_schema.non_pk_indices(),
1540            table_schema.encoding_positions(),
1541            table_schema.physical_non_pk_count(),
1542            table_schema.dropped_non_pk_slots(),
1543        )
1544    };
1545
1546    bufs.row.resize(table_schema.columns.len(), Value::Null);
1547    bufs.pk_values.resize(pk_indices.len(), Value::Null);
1548    bufs.value_values.resize(phys_count, Value::Null);
1549
1550    let table_bytes = table_schema.name.as_bytes();
1551    let has_fks = !table_schema.foreign_keys.is_empty();
1552    let has_indices = !table_schema.indices.is_empty();
1553    let has_defaults = !defaults.is_empty();
1554
1555    let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1556        (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1557        (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1558        (_, None) => None,
1559    };
1560
1561    let row_col_map_owned: Option<ColumnMap> =
1562        if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1563            Some(ColumnMap::new(&table_schema.columns))
1564        } else {
1565            None
1566        };
1567    let row_col_map: Option<&ColumnMap> = cache
1568        .and_then(|c| c.row_col_map.as_ref())
1569        .or(row_col_map_owned.as_ref());
1570
1571    let select_rows = match &stmt.source {
1572        InsertSource::Select(sq) => {
1573            let insert_ctes = super::materialize_all_ctes_with_outer(
1574                &sq.ctes,
1575                sq.recursive,
1576                outer_ctes,
1577                &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1578            )?;
1579            let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1580            Some(qr.rows)
1581        }
1582        InsertSource::Values(_) => None,
1583    };
1584
1585    let mut count: u64 = 0;
1586    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1587        stmt.returning.as_ref().map(|_| Vec::new());
1588
1589    let values = match &stmt.source {
1590        InsertSource::Values(rows) => Some(rows.as_slice()),
1591        InsertSource::Select(_) => None,
1592    };
1593    let sel_rows = select_rows.as_deref();
1594
1595    let total = match (values, sel_rows) {
1596        (Some(rows), _) => rows.len(),
1597        (_, Some(rows)) => rows.len(),
1598        _ => 0,
1599    };
1600
1601    if let Some(sel) = sel_rows {
1602        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1603            return Err(SqlError::InvalidValue(format!(
1604                "INSERT ... SELECT column count mismatch: expected {}, got {}",
1605                insert_columns.len(),
1606                sel[0].len()
1607            )));
1608        }
1609    }
1610
1611    let has_insert_statement_triggers_impl =
1612        schema.triggers_for(&table_schema.name).iter().any(|t| {
1613            t.enabled
1614                && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1615                && t.events
1616                    .iter()
1617                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1618        });
1619    let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1620        Vec::with_capacity(total)
1621    } else {
1622        Vec::new()
1623    };
1624    if has_insert_statement_triggers_impl {
1625        super::triggers::fire_statement_triggers(
1626            wtx,
1627            schema,
1628            &table_schema.name,
1629            crate::parser::TriggerTiming::Before,
1630            super::triggers::FireEvent::Insert,
1631            &table_schema.columns,
1632            &[],
1633            &[],
1634        )?;
1635    }
1636
1637    let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1638    for idx in 0..total {
1639        if !skip_row_clear {
1640            for v in bufs.row.iter_mut() {
1641                *v = Value::Null;
1642            }
1643        }
1644
1645        if let Some(value_rows) = values {
1646            if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1647                for action in plan {
1648                    match action {
1649                        BindAction::Param {
1650                            param_idx,
1651                            col_idx,
1652                            target,
1653                        } => {
1654                            let v = &params[*param_idx];
1655                            bufs.row[*col_idx] = if v.is_null() {
1656                                Value::Null
1657                            } else if v.data_type() == *target {
1658                                v.clone()
1659                            } else {
1660                                let got = v.data_type();
1661                                v.clone().coerce_into(*target).ok_or_else(|| {
1662                                    SqlError::TypeMismatch {
1663                                        expected: target.to_string(),
1664                                        got: got.to_string(),
1665                                    }
1666                                })?
1667                            };
1668                        }
1669                        BindAction::Literal { value, col_idx } => {
1670                            bufs.row[*col_idx] = value.clone();
1671                        }
1672                    }
1673                }
1674            } else {
1675                let value_row = &value_rows[idx];
1676                if value_row.len() != insert_columns.len() {
1677                    return Err(SqlError::InvalidValue(format!(
1678                        "expected {} values, got {}",
1679                        insert_columns.len(),
1680                        value_row.len()
1681                    )));
1682                }
1683                for (i, expr) in value_row.iter().enumerate() {
1684                    let val = match expr {
1685                        Expr::Parameter(n) => params
1686                            .get(n - 1)
1687                            .cloned()
1688                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1689                        Expr::Literal(v) => v.clone(),
1690                        _ => eval_const_expr(expr)?,
1691                    };
1692                    let col_idx = bufs.col_indices[i];
1693                    let col = &table_schema.columns[col_idx];
1694                    let got_type = val.data_type();
1695                    bufs.row[col_idx] = if val.is_null() {
1696                        Value::Null
1697                    } else {
1698                        val.coerce_into(col.data_type)
1699                            .ok_or_else(|| SqlError::TypeMismatch {
1700                                expected: col.data_type.to_string(),
1701                                got: got_type.to_string(),
1702                            })?
1703                    };
1704                }
1705            }
1706        } else if let Some(sel) = sel_rows {
1707            let sel_row = &sel[idx];
1708            for (i, val) in sel_row.iter().enumerate() {
1709                let col_idx = bufs.col_indices[i];
1710                let col = &table_schema.columns[col_idx];
1711                let got_type = val.data_type();
1712                bufs.row[col_idx] = if val.is_null() {
1713                    Value::Null
1714                } else {
1715                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
1716                        SqlError::TypeMismatch {
1717                            expected: col.data_type.to_string(),
1718                            got: got_type.to_string(),
1719                        }
1720                    })?
1721                };
1722            }
1723        }
1724
1725        if has_defaults {
1726            for &(pos, def_expr) in &defaults {
1727                let val = eval_const_expr(def_expr)?;
1728                let col = &table_schema.columns[pos];
1729                if !val.is_null() {
1730                    let got_type = val.data_type();
1731                    bufs.row[pos] =
1732                        val.coerce_into(col.data_type)
1733                            .ok_or_else(|| SqlError::TypeMismatch {
1734                                expected: col.data_type.to_string(),
1735                                got: got_type.to_string(),
1736                            })?;
1737                }
1738            }
1739        }
1740
1741        if let Some(gen_map) = row_col_map_for_gen {
1742            if cache.is_some() {
1743                for (pos, fast) in cached_gen_positions
1744                    .iter()
1745                    .copied()
1746                    .zip(cached_gen_fast_evals.iter())
1747                {
1748                    let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1749                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1750                    let col = &table_schema.columns[pos];
1751                    bufs.row[pos] = if val.is_null() {
1752                        Value::Null
1753                    } else {
1754                        let got_type = val.data_type();
1755                        val.coerce_into(col.data_type)
1756                            .ok_or_else(|| SqlError::TypeMismatch {
1757                                expected: col.data_type.to_string(),
1758                                got: got_type.to_string(),
1759                            })?
1760                    };
1761                }
1762            } else {
1763                for (pos, gen_expr, fast) in &generated_cols_uncached {
1764                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1765                    let col = &table_schema.columns[*pos];
1766                    bufs.row[*pos] = if val.is_null() {
1767                        Value::Null
1768                    } else {
1769                        let got_type = val.data_type();
1770                        val.coerce_into(col.data_type)
1771                            .ok_or_else(|| SqlError::TypeMismatch {
1772                                expected: col.data_type.to_string(),
1773                                got: got_type.to_string(),
1774                            })?
1775                    };
1776                }
1777            }
1778        }
1779
1780        if let Some(c) = cache {
1781            for &pos in &c.not_null_indices {
1782                if bufs.row[pos as usize].is_null() {
1783                    return Err(SqlError::NotNullViolation(
1784                        table_schema.columns[pos as usize].name.clone(),
1785                    ));
1786                }
1787            }
1788        } else {
1789            for col in &table_schema.columns {
1790                if !col.nullable && bufs.row[col.position as usize].is_null() {
1791                    return Err(SqlError::NotNullViolation(col.name.clone()));
1792                }
1793            }
1794        }
1795
1796        if let Some(ref col_map) = check_col_map {
1797            for col in &table_schema.columns {
1798                if let Some(ref check) = col.check_expr {
1799                    let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1800                    if !is_truthy(&result) && !result.is_null() {
1801                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1802                        return Err(SqlError::CheckViolation(name.to_string()));
1803                    }
1804                }
1805            }
1806            for tc in &table_schema.check_constraints {
1807                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1808                if !is_truthy(&result) && !result.is_null() {
1809                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1810                    return Err(SqlError::CheckViolation(name.to_string()));
1811                }
1812            }
1813        }
1814
1815        if has_fks {
1816            for fk in &table_schema.foreign_keys {
1817                let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1818                if any_null {
1819                    continue;
1820                }
1821                crate::encoding::encode_composite_key_from_indices(
1822                    &fk.columns,
1823                    &bufs.row,
1824                    &mut bufs.fk_key_buf,
1825                );
1826                if fk.deferrable && fk.initially_deferred {
1827                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1828                    wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1829                        fk_name: name,
1830                        foreign_table: fk.foreign_table.as_bytes().to_vec(),
1831                        parent_key: bufs.fk_key_buf.clone(),
1832                    });
1833                    continue;
1834                }
1835                if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1836                    let found = wtx
1837                        .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1838                        .map_err(SqlError::Storage)?;
1839                    if found.is_none() {
1840                        let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1841                        return Err(SqlError::ForeignKeyViolation(name.to_string()));
1842                    }
1843                    wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1844                }
1845            }
1846        }
1847
1848        let proposed_row_for_returning: Option<Vec<Value>> =
1849            returning_rows.as_ref().map(|_| bufs.row.clone());
1850        let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1851            Some(bufs.row.clone())
1852        } else {
1853            None
1854        };
1855
1856        let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
1857            t.enabled
1858                && t.timing == crate::parser::TriggerTiming::Before
1859                && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1860                && t.events
1861                    .iter()
1862                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1863        });
1864        if has_before_insert_triggers {
1865            super::triggers::fire_row_triggers(
1866                wtx,
1867                schema,
1868                &table_schema.name,
1869                crate::parser::TriggerTiming::Before,
1870                super::triggers::FireEvent::Insert,
1871                None,
1872                Some(bufs.row.clone()),
1873                &table_schema.columns,
1874            )?;
1875        }
1876
1877        for (j, &i) in pk_indices.iter().enumerate() {
1878            bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1879        }
1880        match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1881            true => match bufs.pk_values[0] {
1882                Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1883                _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1884            },
1885            false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1886        }
1887
1888        for &slot in dropped {
1889            bufs.value_values[slot as usize] = Value::Null;
1890        }
1891        for (j, &i) in non_pk.iter().enumerate() {
1892            let col = &table_schema.columns[i];
1893            if matches!(
1894                col.generated_kind,
1895                Some(crate::parser::GeneratedKind::Virtual)
1896            ) {
1897                bufs.value_values[enc_pos[j] as usize] = Value::Null;
1898                bufs.row[i] = Value::Null;
1899            } else {
1900                bufs.value_values[enc_pos[j] as usize] =
1901                    std::mem::replace(&mut bufs.row[i], Value::Null);
1902            }
1903        }
1904        match cache.and_then(|c| c.row_encoder.as_ref()) {
1905            Some(tmpl) => crate::encoding::encode_int_row_with_template(
1906                tmpl,
1907                &bufs.value_values,
1908                &mut bufs.value_buf,
1909            )?,
1910            None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1911        }
1912
1913        if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1914            return Err(SqlError::KeyTooLarge {
1915                size: bufs.key_buf.len(),
1916                max: citadel_core::MAX_KEY_SIZE,
1917            });
1918        }
1919        if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1920            return Err(SqlError::RowTooLarge {
1921                size: bufs.value_buf.len(),
1922                max: citadel_core::MAX_VALUE_SIZE,
1923            });
1924        }
1925
1926        match compiled_conflict.as_ref() {
1927            None => {
1928                let is_new = wtx
1929                    .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1930                    .map_err(SqlError::Storage)?;
1931                if !is_new {
1932                    return Err(SqlError::DuplicateKey);
1933                }
1934                let has_after_insert_triggers =
1935                    schema.triggers_for(&table_schema.name).iter().any(|t| {
1936                        t.enabled
1937                            && t.timing == crate::parser::TriggerTiming::After
1938                            && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1939                            && t.events
1940                                .iter()
1941                                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1942                    });
1943                if has_indices || has_after_insert_triggers {
1944                    for (j, &i) in pk_indices.iter().enumerate() {
1945                        bufs.row[i] = bufs.pk_values[j].clone();
1946                    }
1947                    for (j, &i) in non_pk.iter().enumerate() {
1948                        bufs.row[i] = std::mem::replace(
1949                            &mut bufs.value_values[enc_pos[j] as usize],
1950                            Value::Null,
1951                        );
1952                    }
1953                    if has_indices {
1954                        insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1955                    }
1956                    if has_after_insert_triggers {
1957                        super::triggers::fire_row_triggers(
1958                            wtx,
1959                            schema,
1960                            &table_schema.name,
1961                            crate::parser::TriggerTiming::After,
1962                            super::triggers::FireEvent::Insert,
1963                            None,
1964                            Some(bufs.row.clone()),
1965                            &table_schema.columns,
1966                        )?;
1967                    }
1968                }
1969                if let Some(r) = row_for_stmt_trigger_impl.clone() {
1970                    stmt_new_rows_impl.push(r);
1971                }
1972                count += 1;
1973                if let Some(buf) = returning_rows.as_mut() {
1974                    buf.push((None, proposed_row_for_returning));
1975                }
1976            }
1977            Some(oc) => {
1978                let oc_ref: &CompiledOnConflict = oc;
1979                let needs_row = upsert_needs_row(oc_ref, table_schema);
1980                if needs_row {
1981                    for (j, &i) in pk_indices.iter().enumerate() {
1982                        bufs.row[i] = bufs.pk_values[j].clone();
1983                    }
1984                    for (j, &i) in non_pk.iter().enumerate() {
1985                        bufs.row[i] = std::mem::replace(
1986                            &mut bufs.value_values[enc_pos[j] as usize],
1987                            Value::Null,
1988                        );
1989                    }
1990                }
1991                let outcome = apply_insert_with_conflict(
1992                    wtx,
1993                    table_schema,
1994                    &bufs.key_buf,
1995                    &bufs.value_buf,
1996                    &bufs.row,
1997                    &bufs.pk_values,
1998                    oc_ref,
1999                    row_col_map.unwrap(),
2000                    stmt.returning.is_some(),
2001                )?;
2002                match outcome {
2003                    InsertRowOutcome::Inserted => {
2004                        count += 1;
2005                        if let Some(buf) = returning_rows.as_mut() {
2006                            buf.push((None, proposed_row_for_returning));
2007                        }
2008                        if let Some(r) = row_for_stmt_trigger_impl.clone() {
2009                            stmt_new_rows_impl.push(r);
2010                        }
2011                        let has_after_insert_triggers =
2012                            schema.triggers_for(&table_schema.name).iter().any(|t| {
2013                                t.enabled
2014                                    && t.timing == crate::parser::TriggerTiming::After
2015                                    && t.granularity
2016                                        == crate::parser::TriggerGranularity::ForEachRow
2017                                    && t.events
2018                                        .iter()
2019                                        .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
2020                            });
2021                        if has_after_insert_triggers {
2022                            super::triggers::fire_row_triggers(
2023                                wtx,
2024                                schema,
2025                                &table_schema.name,
2026                                crate::parser::TriggerTiming::After,
2027                                super::triggers::FireEvent::Insert,
2028                                None,
2029                                Some(bufs.row.clone()),
2030                                &table_schema.columns,
2031                            )?;
2032                        }
2033                    }
2034                    InsertRowOutcome::Updated { old, new } => {
2035                        count += 1;
2036                        if let Some(buf) = returning_rows.as_mut() {
2037                            buf.push((Some(old.clone()), Some(new.clone())));
2038                        }
2039                        let has_after_update_triggers =
2040                            schema.triggers_for(&table_schema.name).iter().any(|t| {
2041                                t.enabled
2042                                    && t.timing == crate::parser::TriggerTiming::After
2043                                    && t.granularity
2044                                        == crate::parser::TriggerGranularity::ForEachRow
2045                                    && t.events.iter().any(|e| {
2046                                        matches!(e, crate::parser::TriggerEvent::Update(_))
2047                                    })
2048                            });
2049                        if has_after_update_triggers {
2050                            let changed_cols: Vec<String> = match oc_ref {
2051                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2052                                    .iter()
2053                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2054                                    .collect(),
2055                                _ => Vec::new(),
2056                            };
2057                            super::triggers::fire_row_triggers(
2058                                wtx,
2059                                schema,
2060                                &table_schema.name,
2061                                crate::parser::TriggerTiming::After,
2062                                super::triggers::FireEvent::Update {
2063                                    changed_columns: &changed_cols,
2064                                },
2065                                Some(old),
2066                                Some(new),
2067                                &table_schema.columns,
2068                            )?;
2069                        }
2070                    }
2071                    InsertRowOutcome::Skipped => {}
2072                }
2073            }
2074        }
2075    }
2076
2077    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2078        if has_insert_statement_triggers_impl {
2079            super::triggers::fire_statement_triggers(
2080                wtx,
2081                schema,
2082                &table_schema.name,
2083                crate::parser::TriggerTiming::After,
2084                super::triggers::FireEvent::Insert,
2085                &table_schema.columns,
2086                &[],
2087                &stmt_new_rows_impl,
2088            )?;
2089        }
2090        return Ok(ExecutionResult::Query(super::helpers::project_returning(
2091            table_schema,
2092            returning_cols,
2093            &rows,
2094        )?));
2095    }
2096
2097    if has_insert_statement_triggers_impl {
2098        super::triggers::fire_statement_triggers(
2099            wtx,
2100            schema,
2101            &table_schema.name,
2102            crate::parser::TriggerTiming::After,
2103            super::triggers::FireEvent::Insert,
2104            &table_schema.columns,
2105            &[],
2106            &stmt_new_rows_impl,
2107        )?;
2108    }
2109
2110    Ok(ExecutionResult::RowsAffected(count))
2111}
2112
2113pub struct CompiledInsert {
2114    table_lower: String,
2115    cached: Option<InsertCache>,
2116}
2117
2118struct InsertCache {
2119    col_indices: Vec<usize>,
2120    has_subquery: bool,
2121    any_defaults: bool,
2122    has_checks: bool,
2123    on_conflict: Option<Arc<CompiledOnConflict>>,
2124    row_col_map: Option<ColumnMap>,
2125    generated_col_positions: Vec<usize>,
2126    generated_fast_evals: Vec<FastGenEval>,
2127    pk_indices: Vec<usize>,
2128    non_pk_indices: Vec<usize>,
2129    encoding_positions: Vec<u16>,
2130    dropped_non_pk_slots: Vec<u16>,
2131    phys_count: usize,
2132    single_int_pk: bool,
2133    not_null_indices: Vec<u16>,
2134    bind_plan: Option<Vec<BindAction>>,
2135    row_fully_overwritten: bool,
2136    row_encoder: Option<crate::encoding::IntRowTemplate>,
2137    is_trivial_fast: bool,
2138    trivial_fast_program: Option<TrivialFastProgram>,
2139    needs_scoped_params: bool,
2140}
2141
2142#[derive(Clone)]
2143enum BindAction {
2144    Param {
2145        param_idx: usize,
2146        col_idx: usize,
2147        target: DataType,
2148    },
2149    Literal {
2150        value: Value,
2151        col_idx: usize,
2152    },
2153}
2154
2155#[derive(Clone)]
2156struct TrivialFastProgram {
2157    template: Vec<u8>,
2158    ops: Vec<WriteOp>,
2159    pk_param: u8,
2160    not_null_param_indices: Vec<u8>,
2161}
2162
2163#[derive(Clone)]
2164enum WriteOp {
2165    ParamI64 {
2166        param_idx: u8,
2167        off: u32,
2168    },
2169    LiteralI64 {
2170        value: i64,
2171        off: u32,
2172    },
2173    GenAddParamsI64 {
2174        a_param: u8,
2175        b_param: u8,
2176        off: u32,
2177        bitmap_byte_off: u32,
2178        bitmap_bit_mask: u8,
2179    },
2180    GenMulAddParamI64 {
2181        param_idx: u8,
2182        mul: i64,
2183        add: i64,
2184        off: u32,
2185        bitmap_byte_off: u32,
2186        bitmap_bit_mask: u8,
2187    },
2188}
2189
2190fn build_trivial_fast_program(
2191    bind_plan: &[BindAction],
2192    row_encoder: &crate::encoding::IntRowTemplate,
2193    non_virtual_pairs: &[(usize, usize)],
2194    generated_col_positions: &[usize],
2195    generated_fast_evals: &[FastGenEval],
2196    pk_indices: &[usize],
2197    columns: &[crate::types::ColumnDef],
2198) -> Option<TrivialFastProgram> {
2199    let pk_col = pk_indices[0];
2200    let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2201        non_virtual_pairs.iter().copied().collect();
2202    let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2203        row_encoder.slot_offsets.iter().copied().collect();
2204
2205    let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2206    let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2207    let mut pk_param: Option<u8> = None;
2208    let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2209    let mut not_null_param_indices: Vec<u8> = Vec::new();
2210
2211    for action in bind_plan {
2212        match action {
2213            BindAction::Param {
2214                param_idx,
2215                col_idx,
2216                target,
2217            } => {
2218                if *target != DataType::Integer {
2219                    return None;
2220                }
2221                let pi: u8 = u8::try_from(*param_idx).ok()?;
2222                col_to_param.insert(*col_idx, pi);
2223                if *col_idx == pk_col {
2224                    pk_param = Some(pi);
2225                } else {
2226                    let slot = *col_to_slot.get(col_idx)?;
2227                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2228                    ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2229                    if !columns[*col_idx].nullable {
2230                        not_null_param_indices.push(pi);
2231                    }
2232                }
2233            }
2234            BindAction::Literal { value, col_idx } => match value {
2235                Value::Integer(v) => {
2236                    col_to_lit_int.insert(*col_idx, *v);
2237                    if *col_idx == pk_col {
2238                        return None;
2239                    }
2240                    let slot = *col_to_slot.get(col_idx)?;
2241                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2242                    ops.push(WriteOp::LiteralI64 { value: *v, off });
2243                }
2244                _ => return None,
2245            },
2246        }
2247    }
2248
2249    let pk_param = pk_param?;
2250
2251    for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2252        let gen_slot = *col_to_slot.get(&gen_pos)?;
2253        let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2254        let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
2255        let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
2256        let gen_col_nullable = columns[gen_pos].nullable;
2257
2258        match &generated_fast_evals[i] {
2259            FastGenEval::IntColAddCol {
2260                left_idx,
2261                right_idx,
2262            } => {
2263                let a_param = col_to_param.get(left_idx).copied();
2264                let b_param = col_to_param.get(right_idx).copied();
2265                match (a_param, b_param) {
2266                    (Some(ap), Some(bp)) => {
2267                        let deps_safe = gen_col_nullable
2268                            || (not_null_param_indices.contains(&ap)
2269                                && not_null_param_indices.contains(&bp));
2270                        if !deps_safe {
2271                            return None;
2272                        }
2273                        ops.push(WriteOp::GenAddParamsI64 {
2274                            a_param: ap,
2275                            b_param: bp,
2276                            off: gen_off,
2277                            bitmap_byte_off,
2278                            bitmap_bit_mask,
2279                        });
2280                    }
2281                    (Some(p), None) => {
2282                        let lit = col_to_lit_int.get(right_idx).copied()?;
2283                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2284                            return None;
2285                        }
2286                        ops.push(WriteOp::GenMulAddParamI64 {
2287                            param_idx: p,
2288                            mul: 1,
2289                            add: lit,
2290                            off: gen_off,
2291                            bitmap_byte_off,
2292                            bitmap_bit_mask,
2293                        });
2294                    }
2295                    (None, Some(p)) => {
2296                        let lit = col_to_lit_int.get(left_idx).copied()?;
2297                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2298                            return None;
2299                        }
2300                        ops.push(WriteOp::GenMulAddParamI64 {
2301                            param_idx: p,
2302                            mul: 1,
2303                            add: lit,
2304                            off: gen_off,
2305                            bitmap_byte_off,
2306                            bitmap_bit_mask,
2307                        });
2308                    }
2309                    (None, None) => {
2310                        let la = col_to_lit_int.get(left_idx).copied()?;
2311                        let lb = col_to_lit_int.get(right_idx).copied()?;
2312                        ops.push(WriteOp::LiteralI64 {
2313                            value: la.wrapping_add(lb),
2314                            off: gen_off,
2315                        });
2316                    }
2317                }
2318            }
2319            FastGenEval::IntColMulAdd {
2320                col_schema_idx,
2321                mul,
2322                add,
2323            } => {
2324                if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2325                    if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2326                        return None;
2327                    }
2328                    ops.push(WriteOp::GenMulAddParamI64 {
2329                        param_idx: p,
2330                        mul: *mul,
2331                        add: *add,
2332                        off: gen_off,
2333                        bitmap_byte_off,
2334                        bitmap_bit_mask,
2335                    });
2336                } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2337                    ops.push(WriteOp::LiteralI64 {
2338                        value: lit.wrapping_mul(*mul).wrapping_add(*add),
2339                        off: gen_off,
2340                    });
2341                } else {
2342                    return None;
2343                }
2344            }
2345            FastGenEval::None => return None,
2346        }
2347    }
2348
2349    Some(TrivialFastProgram {
2350        template: row_encoder.template.clone(),
2351        ops,
2352        pk_param,
2353        not_null_param_indices,
2354    })
2355}
2356
2357#[derive(Clone)]
2358pub(super) enum CompiledOnConflict {
2359    DoNothing {
2360        target: Option<ConflictKind>,
2361    },
2362    DoUpdate {
2363        target: ConflictKind,
2364        assignments: Vec<(usize, Expr)>,
2365        where_clause: Option<Expr>,
2366        fast_paths: Option<Vec<DoUpdateFastPath>>,
2367    },
2368}
2369
2370#[derive(Clone, Copy)]
2371pub(super) enum DoUpdateFastPath {
2372    IntAddConst { phys_idx: usize, delta: i64 },
2373}
2374
2375#[derive(Clone, Debug)]
2376pub(super) enum ConflictKind {
2377    PrimaryKey,
2378    UniqueIndex { index_idx: usize },
2379}
2380
2381fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2382    match target {
2383        ConflictTarget::Columns(cols) => {
2384            let col_idx_set: Vec<u16> = cols
2385                .iter()
2386                .map(|name| {
2387                    ts.column_index(name)
2388                        .map(|i| i as u16)
2389                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2390                })
2391                .collect::<Result<_>>()?;
2392            let pk_set = ts.primary_key_columns.clone();
2393            if set_equal(&col_idx_set, &pk_set) {
2394                return Ok(ConflictKind::PrimaryKey);
2395            }
2396            for (index_idx, idx) in ts.indices.iter().enumerate() {
2397                if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2398                    return Ok(ConflictKind::UniqueIndex { index_idx });
2399                }
2400            }
2401            Err(SqlError::Plan(
2402                "ON CONFLICT target does not match any unique constraint".into(),
2403            ))
2404        }
2405        ConflictTarget::Constraint(name) => {
2406            let lower = name.to_ascii_lowercase();
2407            for (index_idx, idx) in ts.indices.iter().enumerate() {
2408                if idx.name.eq_ignore_ascii_case(&lower) {
2409                    if idx.unique {
2410                        return Ok(ConflictKind::UniqueIndex { index_idx });
2411                    }
2412                    return Err(SqlError::Plan(format!(
2413                        "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2414                    )));
2415                }
2416            }
2417            Err(SqlError::Plan(format!(
2418                "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2419            )))
2420        }
2421    }
2422}
2423
2424fn set_equal(a: &[u16], b: &[u16]) -> bool {
2425    if a.len() != b.len() {
2426        return false;
2427    }
2428    let mut a_sorted = a.to_vec();
2429    let mut b_sorted = b.to_vec();
2430    a_sorted.sort_unstable();
2431    b_sorted.sort_unstable();
2432    a_sorted == b_sorted
2433}
2434
2435pub(super) enum InsertRowOutcome {
2436    Inserted,
2437    Updated { old: Vec<Value>, new: Vec<Value> },
2438    Skipped,
2439}
2440
2441#[allow(clippy::too_many_arguments)]
2442#[inline]
2443pub(super) fn apply_insert_with_conflict(
2444    wtx: &mut WriteTxn<'_>,
2445    table_schema: &TableSchema,
2446    key_buf: &[u8],
2447    value_buf: &[u8],
2448    row: &[Value],
2449    pk_values: &[Value],
2450    on_conflict: &CompiledOnConflict,
2451    col_map: &ColumnMap,
2452    capture_returning: bool,
2453) -> Result<InsertRowOutcome> {
2454    let table_bytes = table_schema.name.as_bytes();
2455
2456    if let CompiledOnConflict::DoNothing { target } = on_conflict {
2457        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2458        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2459            let inserted = wtx
2460                .table_insert_if_absent(table_bytes, key_buf, value_buf)
2461                .map_err(SqlError::Storage)?;
2462            return Ok(if inserted {
2463                InsertRowOutcome::Inserted
2464            } else {
2465                InsertRowOutcome::Skipped
2466            });
2467        }
2468    }
2469
2470    if let CompiledOnConflict::DoUpdate {
2471        target: ConflictKind::PrimaryKey,
2472        assignments,
2473        where_clause,
2474        fast_paths,
2475    } = on_conflict
2476    {
2477        if can_fuse_do_update(table_schema, assignments) {
2478            return apply_do_update_fused(
2479                wtx,
2480                table_schema,
2481                table_bytes,
2482                key_buf,
2483                value_buf,
2484                row,
2485                assignments,
2486                where_clause.as_ref(),
2487                col_map,
2488                fast_paths.as_deref(),
2489                capture_returning,
2490            );
2491        }
2492    }
2493
2494    let primary_outcome = wtx
2495        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2496        .map_err(SqlError::Storage)?;
2497
2498    match primary_outcome {
2499        citadel_txn::write_txn::InsertOutcome::Inserted => {
2500            if table_schema.indices.is_empty() {
2501                return Ok(InsertRowOutcome::Inserted);
2502            }
2503            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2504            match insert_index_entries_or_fetch(
2505                wtx,
2506                table_schema,
2507                row,
2508                pk_values,
2509                &mut inserted_keys,
2510            )? {
2511                None => Ok(InsertRowOutcome::Inserted),
2512                Some(conflicting_idx) => {
2513                    let matches_target =
2514                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2515                            || matches!(
2516                                on_conflict,
2517                                CompiledOnConflict::DoNothing {
2518                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
2519                                } | CompiledOnConflict::DoUpdate {
2520                                    target: ConflictKind::UniqueIndex { index_idx },
2521                                    ..
2522                                } if *index_idx == conflicting_idx
2523                            );
2524                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2525                    if !matches_target {
2526                        return Err(SqlError::UniqueViolation(
2527                            table_schema.indices[conflicting_idx].name.clone(),
2528                        ));
2529                    }
2530                    match on_conflict {
2531                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2532                        CompiledOnConflict::DoUpdate {
2533                            assignments,
2534                            where_clause,
2535                            ..
2536                        } => {
2537                            let existing_pk =
2538                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2539                            apply_do_update(
2540                                wtx,
2541                                table_schema,
2542                                &existing_pk,
2543                                row,
2544                                assignments,
2545                                where_clause.as_ref(),
2546                                col_map,
2547                                capture_returning,
2548                            )
2549                        }
2550                    }
2551                }
2552            }
2553        }
2554        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2555            let matches_target = matches!(
2556                on_conflict,
2557                CompiledOnConflict::DoNothing { target: None }
2558                    | CompiledOnConflict::DoNothing {
2559                        target: Some(ConflictKind::PrimaryKey),
2560                    }
2561                    | CompiledOnConflict::DoUpdate {
2562                        target: ConflictKind::PrimaryKey,
2563                        ..
2564                    }
2565            );
2566            if !matches_target {
2567                return Err(SqlError::DuplicateKey);
2568            }
2569            match on_conflict {
2570                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2571                CompiledOnConflict::DoUpdate {
2572                    assignments,
2573                    where_clause,
2574                    ..
2575                } => {
2576                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2577                    apply_do_update_with_old_row(
2578                        wtx,
2579                        table_schema,
2580                        key_buf,
2581                        &old_row,
2582                        row,
2583                        assignments,
2584                        where_clause.as_ref(),
2585                        col_map,
2586                        capture_returning,
2587                    )
2588                }
2589            }
2590        }
2591    }
2592}
2593
2594#[inline]
2595fn apply_fast_path_patch(
2596    old_bytes: &[u8],
2597    fast_paths: &[DoUpdateFastPath],
2598) -> Result<UpsertAction> {
2599    UPSERT_SCRATCH.with(|slot| {
2600        let mut bufs = slot.borrow_mut();
2601        bufs.new_value_buf.clear();
2602        bufs.new_value_buf.extend_from_slice(old_bytes);
2603
2604        let mut patch_scratch: Vec<u8> = Vec::new();
2605
2606        for fp in fast_paths {
2607            match fp {
2608                DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2609                    let decoded =
2610                        crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2611                    let old_val = &decoded[0];
2612                    let new_val = match old_val {
2613                        Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2614                        Value::Null => Value::Null,
2615                        _ => {
2616                            return Err(SqlError::TypeMismatch {
2617                                expected: "INTEGER".into(),
2618                                got: old_val.data_type().to_string(),
2619                            });
2620                        }
2621                    };
2622                    if !crate::encoding::patch_column_in_place(
2623                        &mut bufs.new_value_buf,
2624                        *phys_idx,
2625                        &new_val,
2626                    )? {
2627                        patch_scratch.clear();
2628                        crate::encoding::patch_row_column(
2629                            &bufs.new_value_buf,
2630                            *phys_idx,
2631                            &new_val,
2632                            &mut patch_scratch,
2633                        )?;
2634                        std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2635                    }
2636                }
2637            }
2638        }
2639
2640        if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2641            return Err(SqlError::RowTooLarge {
2642                size: bufs.new_value_buf.len(),
2643                max: citadel_core::MAX_VALUE_SIZE,
2644            });
2645        }
2646
2647        Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2648    })
2649}
2650
2651fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2652    if !ts.indices.is_empty() {
2653        return true;
2654    }
2655    match oc {
2656        CompiledOnConflict::DoNothing { .. } => false,
2657        CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2658    }
2659}
2660
2661fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2662    if !ts.indices.is_empty() {
2663        return false;
2664    }
2665    if !ts.foreign_keys.is_empty() {
2666        return false;
2667    }
2668    if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2669        return false;
2670    }
2671    let pk = ts.pk_indices();
2672    !assignments.iter().any(|(ci, _)| pk.contains(ci))
2673}
2674
2675#[allow(clippy::too_many_arguments)]
2676#[inline]
2677fn apply_do_update_fused(
2678    wtx: &mut WriteTxn<'_>,
2679    table_schema: &TableSchema,
2680    table_bytes: &[u8],
2681    key_buf: &[u8],
2682    value_buf: &[u8],
2683    proposed_row: &[Value],
2684    assignments: &[(usize, Expr)],
2685    where_clause: Option<&Expr>,
2686    col_map: &ColumnMap,
2687    fast_paths: Option<&[DoUpdateFastPath]>,
2688    capture_returning: bool,
2689) -> Result<InsertRowOutcome> {
2690    let non_pk = table_schema.non_pk_indices();
2691    let enc_pos = table_schema.encoding_positions();
2692    let phys_count = table_schema.physical_non_pk_count();
2693    let dropped = table_schema.dropped_non_pk_slots();
2694    let has_checks = table_schema.has_checks();
2695    let has_fks = !table_schema.foreign_keys.is_empty();
2696
2697    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2698        std::cell::RefCell::new(None);
2699
2700    let outcome =
2701        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2702            if let Some(fps) = fast_paths {
2703                if !has_checks {
2704                    let action = apply_fast_path_patch(old_bytes, fps)?;
2705                    if capture_returning {
2706                        if let UpsertAction::Replace(ref new_bytes) = action {
2707                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2708                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2709                            *captured.borrow_mut() = Some((old_row, new_row));
2710                        }
2711                    }
2712                    return Ok(action);
2713                }
2714            }
2715            UPSERT_SCRATCH.with(|slot| {
2716                let mut bufs = slot.borrow_mut();
2717                let UpsertBufs {
2718                    old_row,
2719                    new_row,
2720                    value_values,
2721                    new_value_buf,
2722                } = &mut *bufs;
2723
2724                old_row.clear();
2725                old_row.resize(table_schema.columns.len(), Value::Null);
2726                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2727
2728                if let Some(w) = where_clause {
2729                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2730                    let result = eval_expr(w, &ctx)?;
2731                    if result.is_null() || !is_truthy(&result) {
2732                        return Ok(UpsertAction::Skip);
2733                    }
2734                }
2735
2736                new_row.clear();
2737                new_row.extend_from_slice(old_row);
2738                for (col_idx, expr) in assignments {
2739                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2740                    let val = eval_expr(expr, &ctx)?;
2741                    let col = &table_schema.columns[*col_idx];
2742                    new_row[*col_idx] = if val.is_null() {
2743                        Value::Null
2744                    } else {
2745                        let got = val.data_type();
2746                        val.coerce_into(col.data_type)
2747                            .ok_or_else(|| SqlError::TypeMismatch {
2748                                expected: col.data_type.to_string(),
2749                                got: got.to_string(),
2750                            })?
2751                    };
2752                }
2753
2754                for (assigned_idx, _) in assignments {
2755                    let col = &table_schema.columns[*assigned_idx];
2756                    if !col.nullable && new_row[col.position as usize].is_null() {
2757                        return Err(SqlError::NotNullViolation(col.name.clone()));
2758                    }
2759                }
2760                if has_checks {
2761                    for col in &table_schema.columns {
2762                        if let Some(ref check) = col.check_expr {
2763                            let ctx = EvalCtx::new(col_map, new_row);
2764                            let result = eval_expr(check, &ctx)?;
2765                            if !is_truthy(&result) && !result.is_null() {
2766                                let name = col.check_name.as_deref().unwrap_or(&col.name);
2767                                return Err(SqlError::CheckViolation(name.to_string()));
2768                            }
2769                        }
2770                    }
2771                    for tc in &table_schema.check_constraints {
2772                        let ctx = EvalCtx::new(col_map, new_row);
2773                        let result = eval_expr(&tc.expr, &ctx)?;
2774                        if !is_truthy(&result) && !result.is_null() {
2775                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
2776                            return Err(SqlError::CheckViolation(name.to_string()));
2777                        }
2778                    }
2779                }
2780                let _ = has_fks;
2781
2782                value_values.clear();
2783                value_values.resize(phys_count, Value::Null);
2784                for &slot in dropped {
2785                    value_values[slot as usize] = Value::Null;
2786                }
2787                for (j, &i) in non_pk.iter().enumerate() {
2788                    value_values[enc_pos[j] as usize] = new_row[i].clone();
2789                }
2790                new_value_buf.clear();
2791                crate::encoding::encode_row_into(value_values, new_value_buf);
2792
2793                if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2794                    return Err(SqlError::RowTooLarge {
2795                        size: new_value_buf.len(),
2796                        max: citadel_core::MAX_VALUE_SIZE,
2797                    });
2798                }
2799
2800                if capture_returning {
2801                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2802                }
2803                Ok(UpsertAction::Replace(new_value_buf.clone()))
2804            })
2805        })?;
2806
2807    match outcome {
2808        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2809        UpsertOutcome::Updated => {
2810            if capture_returning {
2811                let (old, new) = captured.into_inner().ok_or_else(|| {
2812                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2813                })?;
2814                Ok(InsertRowOutcome::Updated { old, new })
2815            } else {
2816                Ok(InsertRowOutcome::Inserted)
2817            }
2818        }
2819        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2820    }
2821}
2822
2823fn fetch_unique_index_pk(
2824    wtx: &mut WriteTxn<'_>,
2825    table_schema: &TableSchema,
2826    index_idx: usize,
2827    row: &[Value],
2828) -> Result<Vec<u8>> {
2829    let idx = &table_schema.indices[index_idx];
2830    let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2831    let indexed: Vec<Value> = idx
2832        .column_positions_iter()
2833        .map(|col_idx| row[col_idx as usize].clone())
2834        .collect();
2835    let key = crate::encoding::encode_composite_key(&indexed);
2836    let value = wtx
2837        .table_get(&idx_table, &key)
2838        .map_err(SqlError::Storage)?
2839        .ok_or_else(|| {
2840            SqlError::InvalidValue("unique index missing expected collision entry".into())
2841        })?;
2842    Ok(value)
2843}
2844
2845#[allow(clippy::too_many_arguments)]
2846fn apply_do_update(
2847    wtx: &mut WriteTxn<'_>,
2848    table_schema: &TableSchema,
2849    pk_key: &[u8],
2850    proposed_row: &[Value],
2851    assignments: &[(usize, Expr)],
2852    where_clause: Option<&Expr>,
2853    col_map: &ColumnMap,
2854    capture_returning: bool,
2855) -> Result<InsertRowOutcome> {
2856    let old_value = wtx
2857        .table_get(table_schema.name.as_bytes(), pk_key)
2858        .map_err(SqlError::Storage)?
2859        .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2860    let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2861    apply_do_update_with_old_row(
2862        wtx,
2863        table_schema,
2864        pk_key,
2865        &old_row,
2866        proposed_row,
2867        assignments,
2868        where_clause,
2869        col_map,
2870        capture_returning,
2871    )
2872}
2873
2874#[allow(clippy::too_many_arguments)]
2875fn apply_do_update_with_old_row(
2876    wtx: &mut WriteTxn<'_>,
2877    table_schema: &TableSchema,
2878    old_pk_key: &[u8],
2879    old_row: &[Value],
2880    proposed_row: &[Value],
2881    assignments: &[(usize, Expr)],
2882    where_clause: Option<&Expr>,
2883    col_map: &ColumnMap,
2884    capture_returning: bool,
2885) -> Result<InsertRowOutcome> {
2886    if let Some(w) = where_clause {
2887        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2888        let result = eval_expr(w, &ctx)?;
2889        if result.is_null() || !is_truthy(&result) {
2890            return Ok(InsertRowOutcome::Skipped);
2891        }
2892    }
2893
2894    let mut new_row = old_row.to_vec();
2895    for (col_idx, expr) in assignments {
2896        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2897        let val = eval_expr(expr, &ctx)?;
2898        let col = &table_schema.columns[*col_idx];
2899        new_row[*col_idx] = if val.is_null() {
2900            Value::Null
2901        } else {
2902            let got = val.data_type();
2903            val.coerce_into(col.data_type)
2904                .ok_or_else(|| SqlError::TypeMismatch {
2905                    expected: col.data_type.to_string(),
2906                    got: got.to_string(),
2907                })?
2908        };
2909    }
2910
2911    for col in &table_schema.columns {
2912        if matches!(
2913            col.generated_kind,
2914            Some(crate::parser::GeneratedKind::Stored)
2915        ) {
2916            let val = eval_expr(
2917                col.generated_expr.as_ref().unwrap(),
2918                &EvalCtx::new(col_map, &new_row),
2919            )?;
2920            let pos = col.position as usize;
2921            new_row[pos] = if val.is_null() {
2922                if !col.nullable {
2923                    return Err(SqlError::NotNullViolation(col.name.clone()));
2924                }
2925                Value::Null
2926            } else {
2927                let got = val.data_type();
2928                val.coerce_into(col.data_type)
2929                    .ok_or_else(|| SqlError::TypeMismatch {
2930                        expected: col.data_type.to_string(),
2931                        got: got.to_string(),
2932                    })?
2933            };
2934        }
2935    }
2936
2937    let pk_indices = table_schema.pk_indices();
2938    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2939    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2940
2941    for (assigned_idx, _) in assignments {
2942        let col = &table_schema.columns[*assigned_idx];
2943        if !col.nullable && new_row[col.position as usize].is_null() {
2944            return Err(SqlError::NotNullViolation(col.name.clone()));
2945        }
2946    }
2947    if table_schema.has_checks() {
2948        for col in &table_schema.columns {
2949            if let Some(ref check) = col.check_expr {
2950                let ctx = EvalCtx::new(col_map, &new_row);
2951                let result = eval_expr(check, &ctx)?;
2952                if !is_truthy(&result) && !result.is_null() {
2953                    let name = col.check_name.as_deref().unwrap_or(&col.name);
2954                    return Err(SqlError::CheckViolation(name.to_string()));
2955                }
2956            }
2957        }
2958        for tc in &table_schema.check_constraints {
2959            let ctx = EvalCtx::new(col_map, &new_row);
2960            let result = eval_expr(&tc.expr, &ctx)?;
2961            if !is_truthy(&result) && !result.is_null() {
2962                let name = tc.name.as_deref().unwrap_or(&tc.sql);
2963                return Err(SqlError::CheckViolation(name.to_string()));
2964            }
2965        }
2966    }
2967    for fk in &table_schema.foreign_keys {
2968        let changed = fk
2969            .columns
2970            .iter()
2971            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2972        if !changed {
2973            continue;
2974        }
2975        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2976        if any_null {
2977            continue;
2978        }
2979        let fk_vals: Vec<Value> = fk
2980            .columns
2981            .iter()
2982            .map(|&ci| new_row[ci as usize].clone())
2983            .collect();
2984        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2985        if fk.deferrable && fk.initially_deferred {
2986            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
2987            wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
2988                fk_name: name,
2989                foreign_table: fk.foreign_table.as_bytes().to_vec(),
2990                parent_key: fk_key,
2991            });
2992            continue;
2993        }
2994        if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
2995            let found = wtx
2996                .table_get(fk.foreign_table.as_bytes(), &fk_key)
2997                .map_err(SqlError::Storage)?;
2998            if found.is_none() {
2999                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
3000                return Err(SqlError::ForeignKeyViolation(name.to_string()));
3001            }
3002            wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
3003        }
3004    }
3005
3006    let has_indices = !table_schema.indices.is_empty();
3007    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
3008        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
3009    } else {
3010        Vec::new()
3011    };
3012    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
3013        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
3014    } else {
3015        Vec::new()
3016    };
3017
3018    let non_pk = table_schema.non_pk_indices();
3019    let enc_pos = table_schema.encoding_positions();
3020    let phys_count = table_schema.physical_non_pk_count();
3021    let dropped = table_schema.dropped_non_pk_slots();
3022    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
3023    for &slot in dropped {
3024        value_values[slot as usize] = Value::Null;
3025    }
3026    for (j, &i) in non_pk.iter().enumerate() {
3027        let col = &table_schema.columns[i];
3028        value_values[enc_pos[j] as usize] = if matches!(
3029            col.generated_kind,
3030            Some(crate::parser::GeneratedKind::Virtual)
3031        ) {
3032            Value::Null
3033        } else {
3034            new_row[i].clone()
3035        };
3036    }
3037    let mut new_value_buf = Vec::with_capacity(256);
3038    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
3039
3040    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
3041        return Err(SqlError::RowTooLarge {
3042            size: new_value_buf.len(),
3043            max: citadel_core::MAX_VALUE_SIZE,
3044        });
3045    }
3046
3047    if pk_changed {
3048        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
3049        let inserted = wtx
3050            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
3051            .map_err(SqlError::Storage)?;
3052        if !inserted {
3053            return Err(SqlError::DuplicateKey);
3054        }
3055        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
3056            .map_err(SqlError::Storage)?;
3057        for idx in &table_schema.indices {
3058            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3059            let old_idx_key =
3060                encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3061            wtx.table_delete(&idx_table, &old_idx_key)
3062                .map_err(SqlError::Storage)?;
3063            let new_idx_key =
3064                encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3065            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3066            let is_new = wtx
3067                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3068                .map_err(SqlError::Storage)?;
3069            if idx.unique && !is_new {
3070                let any_null = idx
3071                    .column_positions_iter()
3072                    .any(|c| new_row[c as usize].is_null());
3073                if !any_null {
3074                    return Err(SqlError::UniqueViolation(idx.name.clone()));
3075                }
3076            }
3077        }
3078    } else {
3079        wtx.table_update_sorted(
3080            table_schema.name.as_bytes(),
3081            &[(old_pk_key, new_value_buf.as_slice())],
3082        )
3083        .map_err(SqlError::Storage)?;
3084        let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
3085        for idx in &table_schema.indices {
3086            let cols_changed = index_columns_changed(idx, old_row, &new_row);
3087            let (del, ins) = partial_idx_update_actions(
3088                idx,
3089                old_row,
3090                &new_row,
3091                cols_changed,
3092                false,
3093                col_map_partial,
3094            );
3095            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3096            if del {
3097                let old_idx_key =
3098                    encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3099                wtx.table_delete(&idx_table, &old_idx_key)
3100                    .map_err(SqlError::Storage)?;
3101            }
3102            if ins {
3103                let new_idx_key =
3104                    encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3105                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3106                let is_new = wtx
3107                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3108                    .map_err(SqlError::Storage)?;
3109                if idx.unique && !is_new {
3110                    let any_null = idx
3111                        .column_positions_iter()
3112                        .any(|c| new_row[c as usize].is_null());
3113                    if !any_null {
3114                        return Err(SqlError::UniqueViolation(idx.name.clone()));
3115                    }
3116                }
3117            }
3118        }
3119    }
3120
3121    if capture_returning {
3122        Ok(InsertRowOutcome::Updated {
3123            old: old_row.to_vec(),
3124            new: new_row,
3125        })
3126    } else {
3127        Ok(InsertRowOutcome::Inserted)
3128    }
3129}
3130
3131fn detect_fast_paths(
3132    ts: &TableSchema,
3133    assignments: &[(usize, Expr)],
3134) -> Option<Vec<DoUpdateFastPath>> {
3135    let non_pk = ts.non_pk_indices();
3136    let enc_pos = ts.encoding_positions();
3137    let mut out = Vec::with_capacity(assignments.len());
3138    for (col_idx, expr) in assignments {
3139        let col = &ts.columns[*col_idx];
3140        if col.data_type != DataType::Integer {
3141            return None;
3142        }
3143        let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3144        let phys_idx = enc_pos[nonpk_order] as usize;
3145
3146        if let Expr::BinaryOp { left, op, right } = expr {
3147            if !matches!(op, BinOp::Add | BinOp::Sub) {
3148                return None;
3149            }
3150            let reads_target =
3151                matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3152            if !reads_target {
3153                return None;
3154            }
3155            if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3156                let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3157                let _ = col_idx;
3158                out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3159                continue;
3160            }
3161            return None;
3162        }
3163        return None;
3164    }
3165    Some(out)
3166}
3167
3168fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3169    let target = oc
3170        .target
3171        .as_ref()
3172        .map(|t| resolve_conflict_target(t, ts))
3173        .transpose()?;
3174    match &oc.action {
3175        OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3176        OnConflictAction::DoUpdate {
3177            assignments,
3178            where_clause,
3179        } => {
3180            let target = target.ok_or_else(|| {
3181                SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3182            })?;
3183            let compiled_assignments: Vec<(usize, Expr)> = assignments
3184                .iter()
3185                .map(|(name, expr)| {
3186                    let col_idx = ts
3187                        .column_index(name)
3188                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3189                    Ok((col_idx, expr.clone()))
3190                })
3191                .collect::<Result<_>>()?;
3192            let fast_paths = if where_clause.is_none() {
3193                detect_fast_paths(ts, &compiled_assignments)
3194            } else {
3195                None
3196            };
3197            Ok(CompiledOnConflict::DoUpdate {
3198                target,
3199                assignments: compiled_assignments,
3200                where_clause: where_clause.clone(),
3201                fast_paths,
3202            })
3203        }
3204    }
3205}
3206
3207/// Caller MUST check `cache.is_trivial_fast` first.
3208fn exec_insert_trivial_fast(
3209    wtx: &mut WriteTxn<'_>,
3210    table_lower: &str,
3211    cache: &InsertCache,
3212    bufs: &mut InsertBufs,
3213    params: &[Value],
3214) -> Result<ExecutionResult> {
3215    let prog = cache
3216        .trivial_fast_program
3217        .as_ref()
3218        .expect("trivial fast: program");
3219
3220    for &p in &prog.not_null_param_indices {
3221        if params[p as usize].is_null() {
3222            return Err(SqlError::NotNullViolation(format!("param@{p}")));
3223        }
3224    }
3225
3226    match &params[prog.pk_param as usize] {
3227        Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3228        _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3229    }
3230
3231    bufs.value_buf.clear();
3232    bufs.value_buf.extend_from_slice(&prog.template);
3233
3234    for op in &prog.ops {
3235        match op {
3236            WriteOp::ParamI64 { param_idx, off } => match &params[*param_idx as usize] {
3237                Value::Integer(v) => {
3238                    let off = *off as usize;
3239                    bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3240                }
3241                other => {
3242                    return Err(SqlError::TypeMismatch {
3243                        expected: "Integer".into(),
3244                        got: other.data_type().to_string(),
3245                    });
3246                }
3247            },
3248            WriteOp::LiteralI64 { value, off } => {
3249                let off = *off as usize;
3250                bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3251            }
3252            WriteOp::GenAddParamsI64 {
3253                a_param,
3254                b_param,
3255                off,
3256                bitmap_byte_off,
3257                bitmap_bit_mask,
3258            } => match (&params[*a_param as usize], &params[*b_param as usize]) {
3259                (Value::Integer(a), Value::Integer(b)) => {
3260                    let off = *off as usize;
3261                    bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3262                }
3263                _ => {
3264                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3265                }
3266            },
3267            WriteOp::GenMulAddParamI64 {
3268                param_idx,
3269                mul,
3270                add,
3271                off,
3272                bitmap_byte_off,
3273                bitmap_bit_mask,
3274            } => match &params[*param_idx as usize] {
3275                Value::Integer(v) => {
3276                    let r = v.wrapping_mul(*mul).wrapping_add(*add);
3277                    let off = *off as usize;
3278                    bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3279                }
3280                _ => {
3281                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3282                }
3283            },
3284        }
3285    }
3286
3287    let is_new = wtx
3288        .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3289        .map_err(SqlError::Storage)?;
3290    if !is_new {
3291        return Err(SqlError::DuplicateKey);
3292    }
3293    Ok(ExecutionResult::RowsAffected(1))
3294}
3295
3296fn build_bind_plan(
3297    stmt: &InsertStmt,
3298    col_indices: &[usize],
3299    col_data_types: &[DataType],
3300) -> Option<Vec<BindAction>> {
3301    let rows = match &stmt.source {
3302        InsertSource::Values(rows) => rows,
3303        _ => return None,
3304    };
3305    if rows.len() != 1 {
3306        return None;
3307    }
3308    let value_row = &rows[0];
3309    if value_row.len() != col_indices.len() {
3310        return None;
3311    }
3312    let mut plan = Vec::with_capacity(value_row.len());
3313    for (i, expr) in value_row.iter().enumerate() {
3314        let col_idx = col_indices[i];
3315        let target = col_data_types[col_idx];
3316        match expr {
3317            Expr::Parameter(n) => {
3318                if *n == 0 {
3319                    return None;
3320                }
3321                plan.push(BindAction::Param {
3322                    param_idx: n - 1,
3323                    col_idx,
3324                    target,
3325                });
3326            }
3327            Expr::Literal(v) => plan.push(BindAction::Literal {
3328                value: v.clone(),
3329                col_idx,
3330            }),
3331            _ => return None,
3332        }
3333    }
3334    Some(plan)
3335}
3336
3337impl CompiledInsert {
3338    pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3339        let lower = stmt.table.to_ascii_lowercase();
3340        let cached = if let Some(ts) = schema.get(&lower) {
3341            let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3342                ts.columns.iter().map(|c| c.name.as_str()).collect()
3343            } else {
3344                stmt.columns.iter().map(|s| s.as_str()).collect()
3345            };
3346            let mut col_indices = Vec::with_capacity(insert_columns.len());
3347            for name in &insert_columns {
3348                col_indices.push(ts.column_index(name)?);
3349            }
3350            if col_indices
3351                .iter()
3352                .any(|&ci| ts.columns[ci].generated_kind.is_some())
3353            {
3354                return None;
3355            }
3356            let on_conflict = stmt
3357                .on_conflict
3358                .as_ref()
3359                .map(|oc| compile_on_conflict(oc, ts))
3360                .transpose()
3361                .ok()
3362                .flatten()
3363                .map(Arc::new);
3364            let generated_col_positions: Vec<usize> = ts
3365                .columns
3366                .iter()
3367                .enumerate()
3368                .filter_map(|(i, c)| {
3369                    matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3370                        .then_some(i)
3371                })
3372                .collect();
3373            let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3374                .iter()
3375                .map(|&pos| {
3376                    detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3377                })
3378                .collect();
3379            let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3380                Some(ColumnMap::new(&ts.columns))
3381            } else {
3382                None
3383            };
3384            let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3385            let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3386            let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3387            let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3388            let phys_count = ts.physical_non_pk_count();
3389            let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3390            let single_int_pk =
3391                pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3392            let not_null_indices: Vec<u16> = ts
3393                .columns
3394                .iter()
3395                .filter(|c| !c.nullable)
3396                .map(|c| c.position)
3397                .collect();
3398            let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3399            let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3400            let row_fully_overwritten = if any_defaults_flag {
3401                false
3402            } else {
3403                let mut covered: rustc_hash::FxHashSet<usize> =
3404                    col_indices.iter().copied().collect();
3405                covered.extend(generated_col_positions.iter().copied());
3406                for (j, &i) in non_pk_indices.iter().enumerate() {
3407                    let _ = j;
3408                    if matches!(
3409                        ts.columns[i].generated_kind,
3410                        Some(crate::parser::GeneratedKind::Virtual)
3411                    ) {
3412                        covered.insert(i);
3413                    }
3414                }
3415                bind_plan.is_some() && covered.len() == ts.columns.len()
3416            };
3417            let has_fks = !ts.foreign_keys.is_empty();
3418            let has_indices = !ts.indices.is_empty();
3419            let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3420            let mut null_value_slots: Vec<usize> =
3421                dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3422            for (j, &i) in non_pk_indices.iter().enumerate() {
3423                let slot = encoding_positions[j] as usize;
3424                if matches!(
3425                    ts.columns[i].generated_kind,
3426                    Some(crate::parser::GeneratedKind::Virtual)
3427                ) {
3428                    null_value_slots.push(slot);
3429                } else {
3430                    non_virtual_pairs.push((i, slot));
3431                }
3432            }
3433            let row_encoder = {
3434                let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3435                    let col = &ts.columns[i];
3436                    if matches!(
3437                        col.generated_kind,
3438                        Some(crate::parser::GeneratedKind::Virtual)
3439                    ) {
3440                        true
3441                    } else {
3442                        col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3443                    }
3444                });
3445                if all_int_or_null {
3446                    let mut null_slots: Vec<usize> =
3447                        dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3448                    for (j, &i) in non_pk_indices.iter().enumerate() {
3449                        if matches!(
3450                            ts.columns[i].generated_kind,
3451                            Some(crate::parser::GeneratedKind::Virtual)
3452                        ) {
3453                            null_slots.push(encoding_positions[j] as usize);
3454                        }
3455                    }
3456                    Some(crate::encoding::build_int_row_template(
3457                        phys_count,
3458                        &null_slots,
3459                    ))
3460                } else {
3461                    None
3462                }
3463            };
3464            let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3465                && !ts.columns.iter().any(|c| c.default_expr.is_some())
3466                && !ts.has_checks()
3467                && !has_fks
3468                && !has_indices
3469                && stmt.on_conflict.is_none()
3470                && stmt.returning.is_none()
3471                && bind_plan.is_some()
3472                && row_encoder.is_some()
3473                && row_fully_overwritten
3474                && single_int_pk
3475                && generated_fast_evals
3476                    .iter()
3477                    .all(|fe| !matches!(fe, FastGenEval::None));
3478            let trivial_fast_program = if is_trivial_fast_eligible {
3479                build_trivial_fast_program(
3480                    bind_plan.as_ref().unwrap(),
3481                    row_encoder.as_ref().unwrap(),
3482                    &non_virtual_pairs,
3483                    &generated_col_positions,
3484                    &generated_fast_evals,
3485                    &pk_indices,
3486                    &ts.columns,
3487                )
3488            } else {
3489                None
3490            };
3491            let is_trivial_fast = trivial_fast_program.is_some();
3492            let has_checks = ts.has_checks();
3493            let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3494            let needs_scoped_params = bind_plan.is_none()
3495                || has_checks
3496                || any_defaults
3497                || !generated_col_positions.is_empty()
3498                || on_conflict.is_some()
3499                || stmt.returning.is_some()
3500                || insert_has_subquery(stmt)
3501                || super::helpers::any_partial_index(ts);
3502            Some(InsertCache {
3503                col_indices,
3504                has_subquery: insert_has_subquery(stmt),
3505                any_defaults,
3506                has_checks,
3507                on_conflict,
3508                row_col_map,
3509                generated_col_positions,
3510                generated_fast_evals,
3511                pk_indices,
3512                non_pk_indices,
3513                encoding_positions,
3514                dropped_non_pk_slots,
3515                phys_count,
3516                single_int_pk,
3517                not_null_indices,
3518                bind_plan,
3519                row_fully_overwritten,
3520                row_encoder,
3521                is_trivial_fast,
3522                trivial_fast_program,
3523                needs_scoped_params,
3524            })
3525        } else if schema.get_view(&lower).is_some() {
3526            None
3527        } else {
3528            return None;
3529        };
3530        Some(Self {
3531            table_lower: lower,
3532            cached,
3533        })
3534    }
3535}
3536
3537impl CompiledPlan for CompiledInsert {
3538    fn execute(
3539        &self,
3540        db: &Database,
3541        schema: &SchemaManager,
3542        stmt: &Statement,
3543        params: &[Value],
3544        txn: super::compile::ActiveTxnRef<'_, '_>,
3545    ) -> Result<ExecutionResult> {
3546        let ins = match stmt {
3547            Statement::Insert(i) => i,
3548            _ => {
3549                return Err(SqlError::Unsupported(
3550                    "CompiledInsert received non-INSERT statement".into(),
3551                ))
3552            }
3553        };
3554        use super::compile::ActiveTxnRef;
3555        match txn {
3556            ActiveTxnRef::None => exec_insert(db, schema, ins, params),
3557            ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3558                "cannot execute mutating statement inside a read-only transaction".into(),
3559            )),
3560            ActiveTxnRef::Write(outer) => match self.cached.as_ref() {
3561                Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3562                    exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3563                }),
3564                Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3565                None => exec_insert_in_txn(outer, schema, ins, params),
3566            },
3567        }
3568    }
3569
3570    fn uses_scoped_params(&self) -> bool {
3571        match self.cached.as_ref() {
3572            Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3573            None => true,
3574        }
3575    }
3576}
3577
3578pub struct CompiledDelete {
3579    table_lower: String,
3580}
3581
3582impl CompiledDelete {
3583    pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3584        let lower = stmt.table.to_ascii_lowercase();
3585        schema.get(&lower)?;
3586        Some(Self { table_lower: lower })
3587    }
3588}
3589
3590impl CompiledPlan for CompiledDelete {
3591    fn execute(
3592        &self,
3593        db: &Database,
3594        schema: &SchemaManager,
3595        stmt: &Statement,
3596        _params: &[Value],
3597        txn: super::compile::ActiveTxnRef<'_, '_>,
3598    ) -> Result<ExecutionResult> {
3599        let del = match stmt {
3600            Statement::Delete(d) => d,
3601            _ => {
3602                return Err(SqlError::Unsupported(
3603                    "CompiledDelete received non-DELETE statement".into(),
3604                ))
3605            }
3606        };
3607        let _ = &self.table_lower;
3608        use super::compile::ActiveTxnRef;
3609        match txn {
3610            ActiveTxnRef::None => super::write::exec_delete(db, schema, del),
3611            ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3612                "cannot execute mutating statement inside a read-only transaction".into(),
3613            )),
3614            ActiveTxnRef::Write(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3615        }
3616    }
3617}
3618
3619fn exec_instead_of_view_insert_auto(
3620    db: &Database,
3621    schema: &SchemaManager,
3622    view_name: &str,
3623    aliases: &[String],
3624    stmt: &InsertStmt,
3625    params: &[Value],
3626) -> Result<ExecutionResult> {
3627    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3628    let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3629    wtx.commit().map_err(SqlError::Storage)?;
3630    Ok(r)
3631}
3632
3633fn exec_instead_of_view_insert_in_txn(
3634    wtx: &mut WriteTxn<'_>,
3635    schema: &SchemaManager,
3636    view_name: &str,
3637    aliases: &[String],
3638    stmt: &InsertStmt,
3639    params: &[Value],
3640) -> Result<ExecutionResult> {
3641    // CREATE VIEW without explicit aliases stores an empty vec — derive at runtime.
3642    let resolved_aliases: Vec<String> = if aliases.is_empty() {
3643        derive_view_columns(wtx, schema, view_name)?
3644    } else {
3645        aliases.to_vec()
3646    };
3647    let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3648    let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3649        .iter()
3650        .enumerate()
3651        .map(|(i, name)| (name.to_ascii_lowercase(), i))
3652        .collect();
3653
3654    let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3655        (0..resolved_aliases.len()).collect()
3656    } else {
3657        stmt.columns
3658            .iter()
3659            .map(|c| {
3660                alias_map
3661                    .get(&c.to_ascii_lowercase())
3662                    .copied()
3663                    .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3664            })
3665            .collect::<Result<_>>()?
3666    };
3667
3668    let source_rows: Vec<Vec<Value>> = match &stmt.source {
3669        InsertSource::Values(rows) => {
3670            let mut out = Vec::with_capacity(rows.len());
3671            for row in rows {
3672                if row.len() != target_positions.len() {
3673                    return Err(SqlError::InvalidValue(format!(
3674                        "expected {} values, got {}",
3675                        target_positions.len(),
3676                        row.len()
3677                    )));
3678                }
3679                let mut vals = Vec::with_capacity(row.len());
3680                for expr in row {
3681                    let v = match expr {
3682                        Expr::Parameter(n) => params
3683                            .get(n - 1)
3684                            .cloned()
3685                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3686                        Expr::Literal(v) => v.clone(),
3687                        other => eval_const_expr(other)?,
3688                    };
3689                    vals.push(v);
3690                }
3691                out.push(vals);
3692            }
3693            out
3694        }
3695        InsertSource::Select(sq) => {
3696            let empty_ctes = CteContext::default();
3697            let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3698            qr.rows
3699        }
3700    };
3701
3702    let mut count: u64 = 0;
3703    for row in source_rows {
3704        if row.len() != target_positions.len() {
3705            return Err(SqlError::InvalidValue(format!(
3706                "expected {} values, got {}",
3707                target_positions.len(),
3708                row.len()
3709            )));
3710        }
3711        let mut new_row = vec![Value::Null; resolved_aliases.len()];
3712        for (slot, val) in target_positions.iter().zip(row) {
3713            new_row[*slot] = val;
3714        }
3715        super::triggers::fire_row_triggers(
3716            wtx,
3717            schema,
3718            view_name,
3719            crate::parser::TriggerTiming::InsteadOf,
3720            super::triggers::FireEvent::Insert,
3721            None,
3722            Some(new_row),
3723            &view_cols,
3724        )?;
3725        count += 1;
3726    }
3727    Ok(ExecutionResult::RowsAffected(count))
3728}
3729
3730fn derive_view_columns(
3731    wtx: &mut WriteTxn<'_>,
3732    schema: &SchemaManager,
3733    view_name: &str,
3734) -> Result<Vec<String>> {
3735    use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3736    let sel = SelectStmt {
3737        columns: vec![SelectColumn::AllColumns],
3738        from: view_name.to_string(),
3739        from_alias: None,
3740        from_subquery: None,
3741        from_args: None,
3742        from_json_table: None,
3743        joins: vec![],
3744        distinct: false,
3745        where_clause: None,
3746        order_by: vec![],
3747        limit: Some(Expr::Literal(Value::Integer(1))),
3748        offset: None,
3749        group_by: vec![],
3750        having: None,
3751    };
3752    let sq = SelectQuery {
3753        ctes: vec![],
3754        recursive: false,
3755        body: QueryBody::Select(Box::new(sel)),
3756    };
3757    let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3758    match qr {
3759        ExecutionResult::Query(q) => Ok(q.columns),
3760        _ => Ok(Vec::new()),
3761    }
3762}
3763
3764#[cfg(test)]
3765#[path = "dml_tests.rs"]
3766mod tests;