Skip to main content

citadel_sql/executor/
dml.rs

1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::read_txn::ReadTxn;
7use citadel_txn::write_txn::WriteTxn;
8use rustc_hash::FxHashMap;
9
10use crate::encoding::{encode_composite_key_into, encode_row_into};
11use crate::error::{Result, SqlError};
12use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
13use crate::parser::*;
14use crate::types::*;
15
16use crate::schema::SchemaManager;
17
18use super::compile::CompiledPlan;
19use super::helpers::*;
20use super::CteContext;
21
22/// Classify an INSERT for cache invalidation: a pure append (single-INTEGER pk,
23/// no conflict) stays append-retainable; anything else hard-invalidates.
24fn mark_insert_dml(
25    schema: &SchemaManager,
26    table_name: &str,
27    on_conflict: bool,
28    single_int_pk: bool,
29    min_inserted_pk: Option<i64>,
30    rows_written: u64,
31) {
32    if on_conflict {
33        schema.mark_dml(table_name);
34    } else if single_int_pk {
35        if let Some(m) = min_inserted_pk {
36            schema.mark_dml_append(table_name, m);
37        }
38    } else if rows_written > 0 {
39        schema.mark_dml(table_name);
40    }
41}
42
43/// Single INTEGER pk - the only shape an ANN plan indexes (and append-retains).
44fn is_single_int_pk(table_schema: &TableSchema) -> bool {
45    table_schema.primary_key_columns.len() == 1
46        && matches!(
47            table_schema.columns[table_schema.primary_key_columns[0] as usize].data_type,
48            DataType::Integer
49        )
50}
51
52pub(super) fn exec_insert(
53    db: &Database,
54    schema: &SchemaManager,
55    stmt: &InsertStmt,
56    params: &[Value],
57) -> Result<ExecutionResult> {
58    let empty_ctes = CteContext::default();
59    let materialized;
60    let stmt = if insert_has_subquery(stmt) {
61        materialized = materialize_insert(stmt, &mut |sub| {
62            exec_subquery_read(db, schema, sub, &empty_ctes)
63        })?;
64        &materialized
65    } else {
66        stmt
67    };
68
69    let lower_name = stmt.table.to_ascii_lowercase();
70    if let Some(view_def) = schema.get_view(&lower_name) {
71        if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
72        {
73            let aliases = view_def.column_aliases.clone();
74            return exec_instead_of_view_insert_auto(
75                db,
76                schema,
77                &lower_name,
78                &aliases,
79                stmt,
80                params,
81            );
82        }
83        return Err(SqlError::CannotModifyView(stmt.table.clone()));
84    }
85    if schema.get_matview(&lower_name).is_some() {
86        return Err(SqlError::CannotModifyView(format!(
87            "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
88            stmt.table
89        )));
90    }
91    let table_schema = schema
92        .get(&lower_name)
93        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
94
95    let insert_columns = if stmt.columns.is_empty() {
96        table_schema
97            .columns
98            .iter()
99            .map(|c| c.name.clone())
100            .collect::<Vec<_>>()
101    } else {
102        stmt.columns
103            .iter()
104            .map(|c| c.to_ascii_lowercase())
105            .collect()
106    };
107
108    let col_indices: Vec<usize> = insert_columns
109        .iter()
110        .map(|name| {
111            table_schema
112                .column_index(name)
113                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
114        })
115        .collect::<Result<_>>()?;
116
117    for &ci in &col_indices {
118        if table_schema.columns[ci].generated_kind.is_some() {
119            return Err(SqlError::CannotInsertIntoGeneratedColumn(
120                table_schema.columns[ci].name.clone(),
121            ));
122        }
123    }
124
125    let defaults: Vec<(usize, &Expr)> = table_schema
126        .columns
127        .iter()
128        .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
129        .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
130        .collect();
131
132    let generated_cols: Vec<(usize, &Expr)> = table_schema
133        .columns
134        .iter()
135        .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
136        .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
137        .collect();
138
139    let has_checks = table_schema.has_checks();
140    let strict = table_schema.is_strict();
141    let row_col_map_for_gen = if !generated_cols.is_empty() {
142        Some(ColumnMap::new(&table_schema.columns))
143    } else {
144        None
145    };
146    let check_col_map = if has_checks {
147        Some(ColumnMap::new(&table_schema.columns))
148    } else {
149        None
150    };
151
152    let select_rows = match &stmt.source {
153        InsertSource::Select(sq) => {
154            let insert_ctes =
155                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
156                    exec_query_body_read(db, schema, body, ctx)
157                })?;
158            let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
159            Some(qr.rows)
160        }
161        InsertSource::Values(_) => None,
162    };
163
164    let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
165        .on_conflict
166        .as_ref()
167        .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
168        .transpose()?;
169
170    let row_col_map = compiled_conflict
171        .as_ref()
172        .map(|_| ColumnMap::new(&table_schema.columns));
173
174    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
175    // DML invalidates the table's persisted ANN segment in the SAME txn
176    // (rollback restores it; commit makes table-changed-but-segment-survives
177    // unrepresentable for this path).
178    super::ann_persist::purge_segment(&mut wtx, &table_schema.name)?;
179    let mut count: u64 = 0;
180    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
181        stmt.returning.as_ref().map(|_| Vec::new());
182
183    let pk_indices = table_schema.pk_indices();
184    let non_pk = table_schema.non_pk_indices();
185    let enc_pos = table_schema.encoding_positions();
186    let phys_count = table_schema.physical_non_pk_count();
187    let mut row = vec![Value::Null; table_schema.columns.len()];
188    let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
189    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
190    let mut key_buf: Vec<u8> = Vec::with_capacity(64);
191    let mut value_buf: Vec<u8> = Vec::with_capacity(256);
192    let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
193
194    let values = match &stmt.source {
195        InsertSource::Values(rows) => Some(rows.as_slice()),
196        InsertSource::Select(_) => None,
197    };
198    let sel_rows = select_rows.as_deref();
199
200    let total = match (values, sel_rows) {
201        (Some(rows), _) => rows.len(),
202        (_, Some(rows)) => rows.len(),
203        _ => 0,
204    };
205
206    if let Some(sel) = sel_rows {
207        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
208            return Err(SqlError::InvalidValue(format!(
209                "INSERT ... SELECT column count mismatch: expected {}, got {}",
210                insert_columns.len(),
211                sel[0].len()
212            )));
213        }
214    }
215
216    let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
217        t.enabled
218            && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
219            && t.events
220                .iter()
221                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
222    });
223    let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
224        Vec::with_capacity(total)
225    } else {
226        Vec::new()
227    };
228
229    if has_insert_statement_triggers {
230        super::triggers::fire_statement_triggers(
231            &mut wtx,
232            schema,
233            &table_schema.name,
234            crate::parser::TriggerTiming::Before,
235            super::triggers::FireEvent::Insert,
236            &table_schema.columns,
237            &[],
238            &[],
239        )?;
240    }
241
242    let plain_insert = compiled_conflict.is_none();
243    let single_int_pk = is_single_int_pk(table_schema);
244    let mut min_inserted_pk: Option<i64> = None;
245
246    for idx in 0..total {
247        for v in row.iter_mut() {
248            *v = Value::Null;
249        }
250
251        if let Some(value_rows) = values {
252            let value_row = &value_rows[idx];
253            if value_row.len() != insert_columns.len() {
254                return Err(SqlError::InvalidValue(format!(
255                    "expected {} values, got {}",
256                    insert_columns.len(),
257                    value_row.len()
258                )));
259            }
260            for (i, expr) in value_row.iter().enumerate() {
261                let val = if let Expr::Parameter(n) = expr {
262                    params
263                        .get(n - 1)
264                        .cloned()
265                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
266                } else {
267                    eval_const_expr(expr)?
268                };
269                let col_idx = col_indices[i];
270                let col = &table_schema.columns[col_idx];
271                row[col_idx] = if val.is_null() {
272                    Value::Null
273                } else {
274                    coerce_for_column(val, col, strict)?
275                };
276            }
277        } else if let Some(sel) = sel_rows {
278            let sel_row = &sel[idx];
279            for (i, val) in sel_row.iter().enumerate() {
280                let col_idx = col_indices[i];
281                let col = &table_schema.columns[col_idx];
282                row[col_idx] = if val.is_null() {
283                    Value::Null
284                } else {
285                    coerce_for_column(val.clone(), col, strict)?
286                };
287            }
288        }
289
290        for &(pos, def_expr) in &defaults {
291            let val = eval_const_expr(def_expr)?;
292            let col = &table_schema.columns[pos];
293            if !val.is_null() {
294                row[pos] = coerce_for_column(val, col, strict)?;
295            }
296        }
297
298        if let Some(ref gen_map) = row_col_map_for_gen {
299            for &(pos, gen_expr) in &generated_cols {
300                let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
301                let col = &table_schema.columns[pos];
302                row[pos] = if val.is_null() {
303                    Value::Null
304                } else {
305                    coerce_for_column(val, col, strict)?
306                };
307            }
308        }
309
310        for col in &table_schema.columns {
311            if !col.nullable && row[col.position as usize].is_null() {
312                return Err(SqlError::NotNullViolation(col.name.clone()));
313            }
314        }
315
316        if let Some(ref col_map) = check_col_map {
317            for col in &table_schema.columns {
318                if let Some(ref check) = col.check_expr {
319                    let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
320                    if !is_truthy(&result) && !result.is_null() {
321                        let name = col.check_name.as_deref().unwrap_or(&col.name);
322                        return Err(SqlError::CheckViolation(name.to_string()));
323                    }
324                }
325            }
326            for tc in &table_schema.check_constraints {
327                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
328                if !is_truthy(&result) && !result.is_null() {
329                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
330                    return Err(SqlError::CheckViolation(name.to_string()));
331                }
332            }
333        }
334
335        for fk in &table_schema.foreign_keys {
336            let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
337            if any_null {
338                continue; // MATCH SIMPLE: skip if any FK col is NULL
339            }
340            let fk_vals: Vec<Value> = fk
341                .columns
342                .iter()
343                .map(|&ci| row[ci as usize].clone())
344                .collect();
345            fk_key_buf.clear();
346            encode_composite_key_into(&fk_vals, &mut fk_key_buf);
347            if fk.deferrable && fk.initially_deferred {
348                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
349                wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
350                    fk_name: name,
351                    foreign_table: fk.foreign_table.as_bytes().to_vec(),
352                    parent_key: fk_key_buf.clone(),
353                });
354                continue;
355            }
356            if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
357                let found = wtx
358                    .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
359                    .map_err(SqlError::Storage)?;
360                if found.is_none() {
361                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
362                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
363                }
364                wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
365            }
366        }
367
368        let proposed_row_for_returning: Option<Vec<Value>> =
369            returning_rows.as_ref().map(|_| row.clone());
370        let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
371            Some(row.clone())
372        } else {
373            None
374        };
375
376        let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
377            t.enabled
378                && t.timing == crate::parser::TriggerTiming::Before
379                && t.granularity == crate::parser::TriggerGranularity::ForEachRow
380                && t.events
381                    .iter()
382                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
383        });
384        if has_before_insert_triggers {
385            super::triggers::fire_row_triggers(
386                &mut wtx,
387                schema,
388                &table_schema.name,
389                crate::parser::TriggerTiming::Before,
390                super::triggers::FireEvent::Insert,
391                None,
392                Some(row.clone()),
393                &table_schema.columns,
394            )?;
395        }
396
397        for (j, &i) in pk_indices.iter().enumerate() {
398            pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
399        }
400        encode_composite_key_into(&pk_values, &mut key_buf);
401        if plain_insert && single_int_pk {
402            if let Value::Integer(id) = &pk_values[0] {
403                min_inserted_pk = Some(min_inserted_pk.map_or(*id, |m| m.min(*id)));
404            }
405        }
406
407        for (j, &i) in non_pk.iter().enumerate() {
408            let col = &table_schema.columns[i];
409            if matches!(
410                col.generated_kind,
411                Some(crate::parser::GeneratedKind::Virtual)
412            ) {
413                value_values[enc_pos[j] as usize] = Value::Null;
414                row[i] = Value::Null;
415            } else {
416                value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
417            }
418        }
419        encode_row_into(&value_values, &mut value_buf);
420
421        if key_buf.len() > citadel_core::MAX_KEY_SIZE {
422            return Err(SqlError::KeyTooLarge {
423                size: key_buf.len(),
424                max: citadel_core::MAX_KEY_SIZE,
425            });
426        }
427        if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
428            return Err(SqlError::RowTooLarge {
429                size: value_buf.len(),
430                max: citadel_core::MAX_VALUE_SIZE,
431            });
432        }
433
434        match compiled_conflict.as_ref() {
435            None => {
436                let is_new = wtx
437                    .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
438                    .map_err(SqlError::Storage)?;
439                if !is_new {
440                    return Err(SqlError::DuplicateKey);
441                }
442                let has_after_insert_triggers =
443                    schema.triggers_for(&table_schema.name).iter().any(|t| {
444                        t.enabled
445                            && t.timing == crate::parser::TriggerTiming::After
446                            && t.granularity == crate::parser::TriggerGranularity::ForEachRow
447                            && t.events
448                                .iter()
449                                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
450                    });
451                if !table_schema.indices.is_empty() || has_after_insert_triggers {
452                    for (j, &i) in pk_indices.iter().enumerate() {
453                        row[i] = pk_values[j].clone();
454                    }
455                    for (j, &i) in non_pk.iter().enumerate() {
456                        row[i] =
457                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
458                    }
459                    if !table_schema.indices.is_empty() {
460                        insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
461                    }
462                    if has_after_insert_triggers {
463                        super::triggers::fire_row_triggers(
464                            &mut wtx,
465                            schema,
466                            &table_schema.name,
467                            crate::parser::TriggerTiming::After,
468                            super::triggers::FireEvent::Insert,
469                            None,
470                            Some(row.clone()),
471                            &table_schema.columns,
472                        )?;
473                    }
474                }
475                if let Some(r) = row_for_stmt_trigger.clone() {
476                    stmt_new_rows.push(r);
477                }
478                count += 1;
479                if let Some(buf) = returning_rows.as_mut() {
480                    buf.push((None, proposed_row_for_returning));
481                }
482            }
483            Some(oc) => {
484                let oc_ref: &CompiledOnConflict = oc;
485                let needs_row = upsert_needs_row(oc_ref, table_schema);
486                if needs_row {
487                    for (j, &i) in pk_indices.iter().enumerate() {
488                        row[i] = pk_values[j].clone();
489                    }
490                    for (j, &i) in non_pk.iter().enumerate() {
491                        row[i] =
492                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
493                    }
494                }
495                let outcome = apply_insert_with_conflict(
496                    &mut wtx,
497                    table_schema,
498                    &key_buf,
499                    &value_buf,
500                    &row,
501                    &pk_values,
502                    oc_ref,
503                    row_col_map.as_ref().unwrap(),
504                    stmt.returning.is_some(),
505                )?;
506                match outcome {
507                    InsertRowOutcome::Inserted => {
508                        count += 1;
509                        if let Some(buf) = returning_rows.as_mut() {
510                            buf.push((None, proposed_row_for_returning));
511                        }
512                        if let Some(r) = row_for_stmt_trigger.clone() {
513                            stmt_new_rows.push(r);
514                        }
515                        let has_after_insert_triggers =
516                            schema.triggers_for(&table_schema.name).iter().any(|t| {
517                                t.enabled
518                                    && t.timing == crate::parser::TriggerTiming::After
519                                    && t.granularity
520                                        == crate::parser::TriggerGranularity::ForEachRow
521                                    && t.events
522                                        .iter()
523                                        .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
524                            });
525                        if has_after_insert_triggers {
526                            super::triggers::fire_row_triggers(
527                                &mut wtx,
528                                schema,
529                                &table_schema.name,
530                                crate::parser::TriggerTiming::After,
531                                super::triggers::FireEvent::Insert,
532                                None,
533                                Some(row.clone()),
534                                &table_schema.columns,
535                            )?;
536                        }
537                    }
538                    InsertRowOutcome::Updated { old, new } => {
539                        count += 1;
540                        if let Some(buf) = returning_rows.as_mut() {
541                            buf.push((Some(old.clone()), Some(new.clone())));
542                        }
543                        let has_after_update_triggers =
544                            schema.triggers_for(&table_schema.name).iter().any(|t| {
545                                t.enabled
546                                    && t.timing == crate::parser::TriggerTiming::After
547                                    && t.granularity
548                                        == crate::parser::TriggerGranularity::ForEachRow
549                                    && t.events.iter().any(|e| {
550                                        matches!(e, crate::parser::TriggerEvent::Update(_))
551                                    })
552                            });
553                        if has_after_update_triggers {
554                            let changed_cols: Vec<String> = match oc_ref {
555                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
556                                    .iter()
557                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
558                                    .collect(),
559                                _ => Vec::new(),
560                            };
561                            super::triggers::fire_row_triggers(
562                                &mut wtx,
563                                schema,
564                                &table_schema.name,
565                                crate::parser::TriggerTiming::After,
566                                super::triggers::FireEvent::Update {
567                                    changed_columns: &changed_cols,
568                                },
569                                Some(old),
570                                Some(new),
571                                &table_schema.columns,
572                            )?;
573                        }
574                    }
575                    InsertRowOutcome::Skipped => {}
576                }
577            }
578        }
579    }
580
581    if has_insert_statement_triggers {
582        super::triggers::fire_statement_triggers(
583            &mut wtx,
584            schema,
585            &table_schema.name,
586            crate::parser::TriggerTiming::After,
587            super::triggers::FireEvent::Insert,
588            &table_schema.columns,
589            &[],
590            &stmt_new_rows,
591        )?;
592    }
593
594    mark_insert_dml(
595        schema,
596        &table_schema.name,
597        !plain_insert,
598        single_int_pk,
599        min_inserted_pk,
600        count,
601    );
602
603    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
604        let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
605        super::helpers::drain_deferred_fk_checks(&mut wtx)?;
606        wtx.commit().map_err(SqlError::Storage)?;
607        return Ok(ExecutionResult::Query(qr));
608    }
609
610    super::helpers::drain_deferred_fk_checks(&mut wtx)?;
611    wtx.commit().map_err(SqlError::Storage)?;
612    Ok(ExecutionResult::RowsAffected(count))
613}
614
/// Executor-local forwarder to `crate::parser::has_subquery`.
///
/// Returns that function's result unchanged; presumably `true` when `expr`
/// contains a subquery node - see the parser function for the exact rules.
pub(super) fn has_subquery(expr: &Expr) -> bool {
    crate::parser::has_subquery(expr)
}
618
619pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
620    if let Some(ref w) = stmt.where_clause {
621        if has_subquery(w) {
622            return true;
623        }
624    }
625    if let Some(ref h) = stmt.having {
626        if has_subquery(h) {
627            return true;
628        }
629    }
630    for col in &stmt.columns {
631        if let SelectColumn::Expr { expr, .. } = col {
632            if has_subquery(expr) {
633                return true;
634            }
635        }
636    }
637    for ob in &stmt.order_by {
638        if has_subquery(&ob.expr) {
639            return true;
640        }
641    }
642    for join in &stmt.joins {
643        if let Some(ref on_expr) = join.on_clause {
644            if has_subquery(on_expr) {
645                return true;
646            }
647        }
648    }
649    false
650}
651
652pub(super) fn materialize_expr(
653    expr: &Expr,
654    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
655) -> Result<Expr> {
656    match expr {
657        Expr::InSubquery {
658            expr: e,
659            subquery,
660            negated,
661        } => {
662            let inner = materialize_expr(e, exec_sub)?;
663            let qr = exec_sub(subquery)?;
664            if !qr.columns.is_empty() && qr.columns.len() != 1 {
665                return Err(SqlError::SubqueryMultipleColumns);
666            }
667            let mut values = rustc_hash::FxHashSet::default();
668            let mut has_null = false;
669            for row in &qr.rows {
670                if row[0].is_null() {
671                    has_null = true;
672                } else {
673                    values.insert(row[0].clone());
674                }
675            }
676            Ok(Expr::InSet {
677                expr: Box::new(inner),
678                values,
679                has_null,
680                negated: *negated,
681            })
682        }
683        Expr::ScalarSubquery(subquery) => {
684            let qr = exec_sub(subquery)?;
685            if qr.rows.len() > 1 {
686                return Err(SqlError::SubqueryMultipleRows);
687            }
688            let val = if qr.rows.is_empty() {
689                Value::Null
690            } else {
691                qr.rows[0][0].clone()
692            };
693            Ok(Expr::Literal(val))
694        }
695        Expr::Exists { subquery, negated } => {
696            let qr = exec_sub(subquery)?;
697            let exists = !qr.rows.is_empty();
698            let result = if *negated { !exists } else { exists };
699            Ok(Expr::Literal(Value::Boolean(result)))
700        }
701        Expr::InList {
702            expr: e,
703            list,
704            negated,
705        } => {
706            let inner = materialize_expr(e, exec_sub)?;
707            let items = list
708                .iter()
709                .map(|item| materialize_expr(item, exec_sub))
710                .collect::<Result<Vec<_>>>()?;
711            Ok(Expr::InList {
712                expr: Box::new(inner),
713                list: items,
714                negated: *negated,
715            })
716        }
717        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
718            left: Box::new(materialize_expr(left, exec_sub)?),
719            op: *op,
720            right: Box::new(materialize_expr(right, exec_sub)?),
721        }),
722        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
723            op: *op,
724            expr: Box::new(materialize_expr(e, exec_sub)?),
725        }),
726        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
727        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
728        Expr::InSet {
729            expr: e,
730            values,
731            has_null,
732            negated,
733        } => Ok(Expr::InSet {
734            expr: Box::new(materialize_expr(e, exec_sub)?),
735            values: values.clone(),
736            has_null: *has_null,
737            negated: *negated,
738        }),
739        Expr::Between {
740            expr: e,
741            low,
742            high,
743            negated,
744        } => Ok(Expr::Between {
745            expr: Box::new(materialize_expr(e, exec_sub)?),
746            low: Box::new(materialize_expr(low, exec_sub)?),
747            high: Box::new(materialize_expr(high, exec_sub)?),
748            negated: *negated,
749        }),
750        Expr::Like {
751            expr: e,
752            pattern,
753            escape,
754            negated,
755        } => {
756            let esc = escape
757                .as_ref()
758                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
759                .transpose()?;
760            Ok(Expr::Like {
761                expr: Box::new(materialize_expr(e, exec_sub)?),
762                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
763                escape: esc,
764                negated: *negated,
765            })
766        }
767        Expr::Case {
768            operand,
769            conditions,
770            else_result,
771        } => {
772            let op = operand
773                .as_ref()
774                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
775                .transpose()?;
776            let conds = conditions
777                .iter()
778                .map(|(c, r)| {
779                    Ok((
780                        materialize_expr(c, exec_sub)?,
781                        materialize_expr(r, exec_sub)?,
782                    ))
783                })
784                .collect::<Result<Vec<_>>>()?;
785            let else_r = else_result
786                .as_ref()
787                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
788                .transpose()?;
789            Ok(Expr::Case {
790                operand: op,
791                conditions: conds,
792                else_result: else_r,
793            })
794        }
795        Expr::Coalesce(args) => {
796            let materialized = args
797                .iter()
798                .map(|a| materialize_expr(a, exec_sub))
799                .collect::<Result<Vec<_>>>()?;
800            Ok(Expr::Coalesce(materialized))
801        }
802        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
803            expr: Box::new(materialize_expr(e, exec_sub)?),
804            data_type: *data_type,
805        }),
806        Expr::Function {
807            name,
808            args,
809            distinct,
810        } => {
811            let materialized = args
812                .iter()
813                .map(|a| materialize_expr(a, exec_sub))
814                .collect::<Result<Vec<_>>>()?;
815            Ok(Expr::Function {
816                name: name.clone(),
817                args: materialized,
818                distinct: *distinct,
819            })
820        }
821        other => Ok(other.clone()),
822    }
823}
824
825pub(super) fn materialize_stmt(
826    stmt: &SelectStmt,
827    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
828) -> Result<SelectStmt> {
829    let where_clause = stmt
830        .where_clause
831        .as_ref()
832        .map(|e| materialize_expr(e, exec_sub))
833        .transpose()?;
834    let having = stmt
835        .having
836        .as_ref()
837        .map(|e| materialize_expr(e, exec_sub))
838        .transpose()?;
839    let columns = stmt
840        .columns
841        .iter()
842        .map(|c| match c {
843            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
844            SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
845            SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
846            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
847                expr: materialize_expr(expr, exec_sub)?,
848                alias: alias.clone(),
849            }),
850        })
851        .collect::<Result<Vec<_>>>()?;
852    let order_by = stmt
853        .order_by
854        .iter()
855        .map(|ob| {
856            Ok(OrderByItem {
857                expr: materialize_expr(&ob.expr, exec_sub)?,
858                descending: ob.descending,
859                nulls_first: ob.nulls_first,
860            })
861        })
862        .collect::<Result<Vec<_>>>()?;
863    let joins = stmt
864        .joins
865        .iter()
866        .map(|j| {
867            let on_clause = j
868                .on_clause
869                .as_ref()
870                .map(|e| materialize_expr(e, exec_sub))
871                .transpose()?;
872            Ok(JoinClause {
873                join_type: j.join_type,
874                table: j.table.clone(),
875                subquery: j.subquery.clone(),
876                on_clause,
877            })
878        })
879        .collect::<Result<Vec<_>>>()?;
880    let group_by = stmt
881        .group_by
882        .iter()
883        .map(|e| materialize_expr(e, exec_sub))
884        .collect::<Result<Vec<_>>>()?;
885    Ok(SelectStmt {
886        columns,
887        from: stmt.from.clone(),
888        from_alias: stmt.from_alias.clone(),
889        from_subquery: stmt.from_subquery.clone(),
890        from_args: stmt.from_args.clone(),
891        from_json_table: stmt.from_json_table.clone(),
892        joins,
893        distinct: stmt.distinct,
894        where_clause,
895        order_by,
896        limit: stmt.limit.clone(),
897        offset: stmt.offset.clone(),
898        group_by,
899        having,
900    })
901}
902
903pub(super) fn exec_subquery_read(
904    db: &Database,
905    schema: &SchemaManager,
906    stmt: &SelectStmt,
907    ctes: &CteContext,
908) -> Result<QueryResult> {
909    let mut rtx = db.begin_read();
910    exec_subquery_with_read(&mut rtx, schema, stmt, ctes)
911}
912
913pub(super) fn exec_subquery_with_read(
914    rtx: &mut ReadTxn<'_>,
915    schema: &SchemaManager,
916    stmt: &SelectStmt,
917    ctes: &CteContext,
918) -> Result<QueryResult> {
919    match super::exec_select_with_read(rtx, schema, stmt, ctes)? {
920        ExecutionResult::Query(qr) => Ok(qr),
921        _ => Ok(QueryResult {
922            columns: vec![],
923            rows: vec![],
924        }),
925    }
926}
927
928pub(super) fn exec_subquery_write(
929    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
930    schema: &SchemaManager,
931    stmt: &SelectStmt,
932    ctes: &CteContext,
933) -> Result<QueryResult> {
934    match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
935        ExecutionResult::Query(qr) => Ok(qr),
936        _ => Ok(QueryResult {
937            columns: vec![],
938            rows: vec![],
939        }),
940    }
941}
942
943pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
944    stmt.where_clause.as_ref().is_some_and(has_subquery)
945        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
946}
947
948pub(super) fn materialize_update(
949    stmt: &UpdateStmt,
950    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
951) -> Result<UpdateStmt> {
952    let where_clause = stmt
953        .where_clause
954        .as_ref()
955        .map(|e| materialize_expr(e, exec_sub))
956        .transpose()?;
957    let assignments = stmt
958        .assignments
959        .iter()
960        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
961        .collect::<Result<Vec<_>>>()?;
962    Ok(UpdateStmt {
963        table: stmt.table.clone(),
964        assignments,
965        where_clause,
966        returning: stmt.returning.clone(),
967    })
968}
969
970pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
971    stmt.where_clause.as_ref().is_some_and(has_subquery)
972}
973
974pub(super) fn materialize_delete(
975    stmt: &DeleteStmt,
976    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
977) -> Result<DeleteStmt> {
978    let where_clause = stmt
979        .where_clause
980        .as_ref()
981        .map(|e| materialize_expr(e, exec_sub))
982        .transpose()?;
983    Ok(DeleteStmt {
984        table: stmt.table.clone(),
985        where_clause,
986        returning: stmt.returning.clone(),
987    })
988}
989
990pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
991    match &stmt.source {
992        InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
993        // SELECT source subqueries are handled by exec_select's correlated/non-correlated paths
994        InsertSource::Select(_) => false,
995    }
996}
997
998pub(super) fn materialize_insert(
999    stmt: &InsertStmt,
1000    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1001) -> Result<InsertStmt> {
1002    let source = match &stmt.source {
1003        InsertSource::Values(rows) => {
1004            let mat = rows
1005                .iter()
1006                .map(|row| {
1007                    row.iter()
1008                        .map(|e| materialize_expr(e, exec_sub))
1009                        .collect::<Result<Vec<_>>>()
1010                })
1011                .collect::<Result<Vec<_>>>()?;
1012            InsertSource::Values(mat)
1013        }
1014        InsertSource::Select(sq) => {
1015            let ctes = sq
1016                .ctes
1017                .iter()
1018                .map(|c| {
1019                    Ok(CteDefinition {
1020                        name: c.name.clone(),
1021                        column_aliases: c.column_aliases.clone(),
1022                        body: materialize_query_body(&c.body, exec_sub)?,
1023                    })
1024                })
1025                .collect::<Result<Vec<_>>>()?;
1026            let body = materialize_query_body(&sq.body, exec_sub)?;
1027            InsertSource::Select(Box::new(SelectQuery {
1028                ctes,
1029                recursive: sq.recursive,
1030                body,
1031            }))
1032        }
1033    };
1034    Ok(InsertStmt {
1035        table: stmt.table.clone(),
1036        columns: stmt.columns.clone(),
1037        source,
1038        on_conflict: stmt.on_conflict.clone(),
1039        returning: stmt.returning.clone(),
1040    })
1041}
1042
1043pub(super) fn materialize_query_body(
1044    body: &QueryBody,
1045    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1046) -> Result<QueryBody> {
1047    match body {
1048        QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
1049            sel, exec_sub,
1050        )?))),
1051        QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
1052            op: comp.op.clone(),
1053            all: comp.all,
1054            left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
1055            right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
1056            order_by: comp.order_by.clone(),
1057            limit: comp.limit.clone(),
1058            offset: comp.offset.clone(),
1059        }))),
1060        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
1061    }
1062}
1063
1064pub(super) fn exec_query_body_with_read(
1065    rtx: &mut ReadTxn<'_>,
1066    schema: &SchemaManager,
1067    body: &QueryBody,
1068    ctes: &CteContext,
1069) -> Result<ExecutionResult> {
1070    match body {
1071        QueryBody::Select(sel) => super::exec_select_with_read(rtx, schema, sel, ctes),
1072        QueryBody::Compound(comp) => exec_compound_select_with_read(rtx, schema, comp, ctes),
1073        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1074            SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1075        ),
1076    }
1077}
1078
1079pub(super) fn exec_query_body_in_txn(
1080    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1081    schema: &SchemaManager,
1082    body: &QueryBody,
1083    ctes: &CteContext,
1084) -> Result<ExecutionResult> {
1085    match body {
1086        QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1087        QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1088        QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1089        QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1090        QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1091    }
1092}
1093
1094pub(super) fn exec_query_body_read(
1095    db: &Database,
1096    schema: &SchemaManager,
1097    body: &QueryBody,
1098    ctes: &CteContext,
1099) -> Result<QueryResult> {
1100    let mut rtx = db.begin_read();
1101    exec_query_body_with_read_qr(&mut rtx, schema, body, ctes)
1102}
1103
1104pub(super) fn exec_query_body_with_read_qr(
1105    rtx: &mut ReadTxn<'_>,
1106    schema: &SchemaManager,
1107    body: &QueryBody,
1108    ctes: &CteContext,
1109) -> Result<QueryResult> {
1110    match exec_query_body_with_read(rtx, schema, body, ctes)? {
1111        ExecutionResult::Query(qr) => Ok(qr),
1112        _ => Ok(QueryResult {
1113            columns: vec![],
1114            rows: vec![],
1115        }),
1116    }
1117}
1118
1119pub(super) fn exec_query_body_write(
1120    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1121    schema: &SchemaManager,
1122    body: &QueryBody,
1123    ctes: &CteContext,
1124) -> Result<QueryResult> {
1125    match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1126        ExecutionResult::Query(qr) => Ok(qr),
1127        _ => Ok(QueryResult {
1128            columns: vec![],
1129            rows: vec![],
1130        }),
1131    }
1132}
1133
1134pub(super) fn exec_compound_select_with_read(
1135    rtx: &mut ReadTxn<'_>,
1136    schema: &SchemaManager,
1137    comp: &CompoundSelect,
1138    ctes: &CteContext,
1139) -> Result<ExecutionResult> {
1140    let left_qr = match exec_query_body_with_read(rtx, schema, &comp.left, ctes)? {
1141        ExecutionResult::Query(qr) => qr,
1142        _ => QueryResult {
1143            columns: vec![],
1144            rows: vec![],
1145        },
1146    };
1147    let right_qr = match exec_query_body_with_read(rtx, schema, &comp.right, ctes)? {
1148        ExecutionResult::Query(qr) => qr,
1149        _ => QueryResult {
1150            columns: vec![],
1151            rows: vec![],
1152        },
1153    };
1154    apply_set_operation(comp, left_qr, right_qr)
1155}
1156
1157pub(super) fn exec_compound_select_in_txn(
1158    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1159    schema: &SchemaManager,
1160    comp: &CompoundSelect,
1161    ctes: &CteContext,
1162) -> Result<ExecutionResult> {
1163    let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1164        ExecutionResult::Query(qr) => qr,
1165        _ => QueryResult {
1166            columns: vec![],
1167            rows: vec![],
1168        },
1169    };
1170    let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1171        ExecutionResult::Query(qr) => qr,
1172        _ => QueryResult {
1173            columns: vec![],
1174            rows: vec![],
1175        },
1176    };
1177    apply_set_operation(comp, left_qr, right_qr)
1178}
1179
1180pub(super) fn apply_set_operation(
1181    comp: &CompoundSelect,
1182    left_qr: QueryResult,
1183    right_qr: QueryResult,
1184) -> Result<ExecutionResult> {
1185    if !left_qr.columns.is_empty()
1186        && !right_qr.columns.is_empty()
1187        && left_qr.columns.len() != right_qr.columns.len()
1188    {
1189        return Err(SqlError::CompoundColumnCountMismatch {
1190            left: left_qr.columns.len(),
1191            right: right_qr.columns.len(),
1192        });
1193    }
1194
1195    let columns = left_qr.columns;
1196
1197    let mut rows = match (&comp.op, comp.all) {
1198        (SetOp::Union, true) => {
1199            let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1200            let mut rows = Vec::with_capacity(total);
1201            rows.extend(left_qr.rows);
1202            rows.extend(right_qr.rows);
1203            rows
1204        }
1205        (SetOp::Union, false) => {
1206            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1207            let mut rows = Vec::new();
1208            for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1209                if !seen.contains(&row) {
1210                    seen.insert(row.clone());
1211                    rows.push(row);
1212                }
1213            }
1214            rows
1215        }
1216        (SetOp::Intersect, true) => {
1217            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1218            for row in &right_qr.rows {
1219                *right_counts.entry(row.clone()).or_insert(0) += 1;
1220            }
1221            let mut rows = Vec::new();
1222            for row in left_qr.rows {
1223                if let Some(count) = right_counts.get_mut(&row) {
1224                    if *count > 0 {
1225                        *count -= 1;
1226                        rows.push(row);
1227                    }
1228                }
1229            }
1230            rows
1231        }
1232        (SetOp::Intersect, false) => {
1233            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1234            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1235            let mut rows = Vec::new();
1236            for row in left_qr.rows {
1237                if right_set.contains(&row) && !seen.contains(&row) {
1238                    seen.insert(row.clone());
1239                    rows.push(row);
1240                }
1241            }
1242            rows
1243        }
1244        (SetOp::Except, true) => {
1245            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1246            for row in &right_qr.rows {
1247                *right_counts.entry(row.clone()).or_insert(0) += 1;
1248            }
1249            let mut rows = Vec::new();
1250            for row in left_qr.rows {
1251                if let Some(count) = right_counts.get_mut(&row) {
1252                    if *count > 0 {
1253                        *count -= 1;
1254                        continue;
1255                    }
1256                }
1257                rows.push(row);
1258            }
1259            rows
1260        }
1261        (SetOp::Except, false) => {
1262            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1263            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1264            let mut rows = Vec::new();
1265            for row in left_qr.rows {
1266                if !right_set.contains(&row) && !seen.contains(&row) {
1267                    seen.insert(row.clone());
1268                    rows.push(row);
1269                }
1270            }
1271            rows
1272        }
1273    };
1274
1275    if !comp.order_by.is_empty() {
1276        let col_defs: Vec<crate::types::ColumnDef> = columns
1277            .iter()
1278            .enumerate()
1279            .map(|(i, name)| crate::types::ColumnDef {
1280                name: name.clone(),
1281                data_type: crate::types::DataType::Null,
1282                nullable: true,
1283                position: i as u16,
1284                default_expr: None,
1285                default_sql: None,
1286                check_expr: None,
1287                check_sql: None,
1288                check_name: None,
1289                is_with_timezone: false,
1290                generated_expr: None,
1291                generated_sql: None,
1292                generated_kind: None,
1293                collation: crate::types::Collation::Binary,
1294            })
1295            .collect();
1296        sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1297    }
1298
1299    if let Some(ref offset_expr) = comp.offset {
1300        let offset = eval_const_int(offset_expr)?.max(0) as usize;
1301        if offset < rows.len() {
1302            rows = rows.split_off(offset);
1303        } else {
1304            rows.clear();
1305        }
1306    }
1307
1308    if let Some(ref limit_expr) = comp.limit {
1309        let limit = eval_const_int(limit_expr)?.max(0) as usize;
1310        rows.truncate(limit);
1311    }
1312
1313    Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1314}
1315
1316struct InsertBufs {
1317    row: Vec<Value>,
1318    pk_values: Vec<Value>,
1319    value_values: Vec<Value>,
1320    key_buf: Vec<u8>,
1321    value_buf: Vec<u8>,
1322    col_indices: Vec<usize>,
1323    fk_key_buf: Vec<u8>,
1324}
1325
1326impl InsertBufs {
1327    fn new() -> Self {
1328        Self {
1329            row: Vec::new(),
1330            pk_values: Vec::new(),
1331            value_values: Vec::new(),
1332            key_buf: Vec::with_capacity(64),
1333            value_buf: Vec::with_capacity(256),
1334            col_indices: Vec::new(),
1335            fk_key_buf: Vec::with_capacity(64),
1336        }
1337    }
1338}
1339
// Per-thread scratch buffers, each wrapped in a `RefCell` for interior
// mutability. `INSERT_SCRATCH` is accessed through `with_insert_scratch`;
// the users of `UPSERT_SCRATCH` are outside this part of the file.
thread_local! {
    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
}
1344
1345fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1346    INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1347        Ok(mut borrowed) => f(&mut borrowed),
1348        Err(_) => {
1349            let mut local = InsertBufs::new();
1350            f(&mut local)
1351        }
1352    })
1353}
1354
1355pub(super) struct UpsertBufs {
1356    old_row: Vec<Value>,
1357    new_row: Vec<Value>,
1358    value_values: Vec<Value>,
1359    new_value_buf: Vec<u8>,
1360}
1361
1362impl UpsertBufs {
1363    pub(super) fn new() -> Self {
1364        Self {
1365            old_row: Vec::new(),
1366            new_row: Vec::new(),
1367            value_values: Vec::new(),
1368            new_value_buf: Vec::with_capacity(256),
1369        }
1370    }
1371}
1372
1373pub fn exec_insert_in_txn(
1374    wtx: &mut WriteTxn<'_>,
1375    schema: &SchemaManager,
1376    stmt: &InsertStmt,
1377    params: &[Value],
1378) -> Result<ExecutionResult> {
1379    with_insert_scratch(|bufs| {
1380        exec_insert_in_txn_impl(
1381            wtx,
1382            schema,
1383            stmt,
1384            params,
1385            bufs,
1386            None,
1387            &CteContext::default(),
1388        )
1389    })
1390}
1391
1392pub(super) fn exec_insert_in_txn_with_ctes(
1393    wtx: &mut WriteTxn<'_>,
1394    schema: &SchemaManager,
1395    stmt: &InsertStmt,
1396    params: &[Value],
1397    outer_ctes: &CteContext,
1398) -> Result<ExecutionResult> {
1399    with_insert_scratch(|bufs| {
1400        exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1401    })
1402}
1403
1404fn exec_insert_in_txn_cached(
1405    wtx: &mut WriteTxn<'_>,
1406    schema: &SchemaManager,
1407    stmt: &InsertStmt,
1408    params: &[Value],
1409    cache: &InsertCache,
1410) -> Result<ExecutionResult> {
1411    with_insert_scratch(|bufs| {
1412        exec_insert_in_txn_impl(
1413            wtx,
1414            schema,
1415            stmt,
1416            params,
1417            bufs,
1418            Some(cache),
1419            &CteContext::default(),
1420        )
1421    })
1422}
1423
1424fn exec_insert_in_txn_impl(
1425    wtx: &mut WriteTxn<'_>,
1426    schema: &SchemaManager,
1427    stmt: &InsertStmt,
1428    params: &[Value],
1429    bufs: &mut InsertBufs,
1430    cache: Option<&InsertCache>,
1431    outer_ctes: &CteContext,
1432) -> Result<ExecutionResult> {
1433    let empty_ctes = CteContext::default();
1434    let materialized;
1435    let has_sub = match cache {
1436        Some(c) => c.has_subquery,
1437        None => insert_has_subquery(stmt),
1438    };
1439    let stmt = if has_sub {
1440        materialized = materialize_insert(stmt, &mut |sub| {
1441            exec_subquery_write(wtx, schema, sub, &empty_ctes)
1442        })?;
1443        &materialized
1444    } else {
1445        stmt
1446    };
1447
1448    let view_lookup_key = stmt.table.to_ascii_lowercase();
1449    if let Some(view_def) = schema.get_view(&view_lookup_key) {
1450        if super::triggers::has_instead_of(
1451            schema,
1452            &view_lookup_key,
1453            super::triggers::FireEvent::Insert,
1454        ) {
1455            let aliases = view_def.column_aliases.clone();
1456            return exec_instead_of_view_insert_in_txn(
1457                wtx,
1458                schema,
1459                &view_lookup_key,
1460                &aliases,
1461                stmt,
1462                params,
1463            );
1464        }
1465        return Err(SqlError::CannotModifyView(stmt.table.clone()));
1466    }
1467
1468    let table_schema = schema
1469        .get(&stmt.table)
1470        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1471    super::ann_persist::purge_segment(wtx, &table_schema.name)?;
1472
1473    let default_columns;
1474    let insert_columns: &[String] = if stmt.columns.is_empty() {
1475        default_columns = table_schema
1476            .columns
1477            .iter()
1478            .map(|c| c.name.clone())
1479            .collect::<Vec<_>>();
1480        &default_columns
1481    } else {
1482        &stmt.columns
1483    };
1484
1485    bufs.col_indices.clear();
1486    if let Some(c) = cache {
1487        bufs.col_indices.extend_from_slice(&c.col_indices);
1488    } else {
1489        for name in insert_columns {
1490            bufs.col_indices.push(
1491                table_schema
1492                    .column_index(name)
1493                    .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1494            );
1495        }
1496    }
1497
1498    if cache.is_none() {
1499        for &ci in &bufs.col_indices {
1500            if table_schema.columns[ci].generated_kind.is_some() {
1501                return Err(SqlError::CannotInsertIntoGeneratedColumn(
1502                    table_schema.columns[ci].name.clone(),
1503                ));
1504            }
1505        }
1506    }
1507
1508    let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1509    let cached_gen_positions: &[usize];
1510    let cached_gen_fast_evals: &[FastGenEval];
1511    if let Some(c) = cache {
1512        cached_gen_positions = &c.generated_col_positions;
1513        cached_gen_fast_evals = &c.generated_fast_evals;
1514        generated_cols_uncached = Vec::new();
1515    } else {
1516        cached_gen_positions = &[];
1517        cached_gen_fast_evals = &[];
1518        generated_cols_uncached = table_schema
1519            .columns
1520            .iter()
1521            .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1522            .map(|c| {
1523                let expr = c.generated_expr.as_ref().unwrap();
1524                let fe = detect_fast_gen_eval(expr, table_schema);
1525                (c.position as usize, expr, fe)
1526            })
1527            .collect();
1528    }
1529    let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1530    let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1531        None
1532    } else {
1533        Some(ColumnMap::new(&table_schema.columns))
1534    };
1535    let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1536        None
1537    } else if let Some(c) = cache {
1538        c.row_col_map.as_ref()
1539    } else {
1540        row_col_map_for_gen_owned.as_ref()
1541    };
1542
1543    let any_defaults = match cache {
1544        Some(c) => c.any_defaults,
1545        None => table_schema
1546            .columns
1547            .iter()
1548            .any(|c| c.default_expr.is_some()),
1549    };
1550    let defaults: Vec<(usize, &Expr)> = if any_defaults {
1551        table_schema
1552            .columns
1553            .iter()
1554            .filter(|c| {
1555                c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1556            })
1557            .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1558            .collect()
1559    } else {
1560        Vec::new()
1561    };
1562
1563    let has_checks = match cache {
1564        Some(c) => c.has_checks,
1565        None => table_schema.has_checks(),
1566    };
1567    let check_col_map = if has_checks {
1568        Some(ColumnMap::new(&table_schema.columns))
1569    } else {
1570        None
1571    };
1572
1573    let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1574        &[usize],
1575        &[usize],
1576        &[u16],
1577        usize,
1578        &[u16],
1579    ) = if let Some(c) = cache {
1580        (
1581            &c.pk_indices,
1582            &c.non_pk_indices,
1583            &c.encoding_positions,
1584            c.phys_count,
1585            &c.dropped_non_pk_slots,
1586        )
1587    } else {
1588        (
1589            table_schema.pk_indices(),
1590            table_schema.non_pk_indices(),
1591            table_schema.encoding_positions(),
1592            table_schema.physical_non_pk_count(),
1593            table_schema.dropped_non_pk_slots(),
1594        )
1595    };
1596
1597    bufs.row.resize(table_schema.columns.len(), Value::Null);
1598    bufs.pk_values.resize(pk_indices.len(), Value::Null);
1599    bufs.value_values.resize(phys_count, Value::Null);
1600
1601    let table_bytes = table_schema.name.as_bytes();
1602    let has_fks = !table_schema.foreign_keys.is_empty();
1603    let has_indices = !table_schema.indices.is_empty();
1604    let has_defaults = !defaults.is_empty();
1605
1606    let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1607        (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1608        (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1609        (_, None) => None,
1610    };
1611
1612    let row_col_map_owned: Option<ColumnMap> =
1613        if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1614            Some(ColumnMap::new(&table_schema.columns))
1615        } else {
1616            None
1617        };
1618    let row_col_map: Option<&ColumnMap> = cache
1619        .and_then(|c| c.row_col_map.as_ref())
1620        .or(row_col_map_owned.as_ref());
1621
1622    let select_rows = match &stmt.source {
1623        InsertSource::Select(sq) => {
1624            let insert_ctes = super::materialize_all_ctes_with_outer(
1625                &sq.ctes,
1626                sq.recursive,
1627                outer_ctes,
1628                &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1629            )?;
1630            let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1631            Some(qr.rows)
1632        }
1633        InsertSource::Values(_) => None,
1634    };
1635
1636    let mut count: u64 = 0;
1637    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1638        stmt.returning.as_ref().map(|_| Vec::new());
1639
1640    let plain_insert = compiled_conflict.is_none();
1641    let single_int_pk = is_single_int_pk(table_schema);
1642    let mut min_inserted_pk: Option<i64> = None;
1643
1644    let values = match &stmt.source {
1645        InsertSource::Values(rows) => Some(rows.as_slice()),
1646        InsertSource::Select(_) => None,
1647    };
1648    let sel_rows = select_rows.as_deref();
1649
1650    let total = match (values, sel_rows) {
1651        (Some(rows), _) => rows.len(),
1652        (_, Some(rows)) => rows.len(),
1653        _ => 0,
1654    };
1655
1656    if let Some(sel) = sel_rows {
1657        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1658            return Err(SqlError::InvalidValue(format!(
1659                "INSERT ... SELECT column count mismatch: expected {}, got {}",
1660                insert_columns.len(),
1661                sel[0].len()
1662            )));
1663        }
1664    }
1665
1666    let has_insert_statement_triggers_impl =
1667        schema.triggers_for(&table_schema.name).iter().any(|t| {
1668            t.enabled
1669                && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1670                && t.events
1671                    .iter()
1672                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1673        });
1674    let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1675        Vec::with_capacity(total)
1676    } else {
1677        Vec::new()
1678    };
1679    if has_insert_statement_triggers_impl {
1680        super::triggers::fire_statement_triggers(
1681            wtx,
1682            schema,
1683            &table_schema.name,
1684            crate::parser::TriggerTiming::Before,
1685            super::triggers::FireEvent::Insert,
1686            &table_schema.columns,
1687            &[],
1688            &[],
1689        )?;
1690    }
1691
1692    let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1693    for idx in 0..total {
1694        if !skip_row_clear {
1695            for v in bufs.row.iter_mut() {
1696                *v = Value::Null;
1697            }
1698        }
1699
1700        if let Some(value_rows) = values {
1701            if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1702                for action in plan {
1703                    match action {
1704                        BindAction::Param {
1705                            param_idx,
1706                            col_idx,
1707                            target,
1708                        } => {
1709                            let v = &params[*param_idx];
1710                            bufs.row[*col_idx] = if v.is_null() {
1711                                Value::Null
1712                            } else if v.data_type() == *target {
1713                                v.clone()
1714                            } else {
1715                                let got = v.data_type();
1716                                v.clone().coerce_into(*target).ok_or_else(|| {
1717                                    SqlError::TypeMismatch {
1718                                        expected: target.to_string(),
1719                                        got: got.to_string(),
1720                                    }
1721                                })?
1722                            };
1723                        }
1724                        BindAction::Literal { value, col_idx } => {
1725                            bufs.row[*col_idx] = value.clone();
1726                        }
1727                    }
1728                }
1729            } else {
1730                let value_row = &value_rows[idx];
1731                if value_row.len() != insert_columns.len() {
1732                    return Err(SqlError::InvalidValue(format!(
1733                        "expected {} values, got {}",
1734                        insert_columns.len(),
1735                        value_row.len()
1736                    )));
1737                }
1738                for (i, expr) in value_row.iter().enumerate() {
1739                    let val = match expr {
1740                        Expr::Parameter(n) => params
1741                            .get(n - 1)
1742                            .cloned()
1743                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1744                        Expr::Literal(v) => v.clone(),
1745                        _ => eval_const_expr(expr)?,
1746                    };
1747                    let col_idx = bufs.col_indices[i];
1748                    let col = &table_schema.columns[col_idx];
1749                    let got_type = val.data_type();
1750                    bufs.row[col_idx] = if val.is_null() {
1751                        Value::Null
1752                    } else {
1753                        val.coerce_into(col.data_type)
1754                            .ok_or_else(|| SqlError::TypeMismatch {
1755                                expected: col.data_type.to_string(),
1756                                got: got_type.to_string(),
1757                            })?
1758                    };
1759                }
1760            }
1761        } else if let Some(sel) = sel_rows {
1762            let sel_row = &sel[idx];
1763            for (i, val) in sel_row.iter().enumerate() {
1764                let col_idx = bufs.col_indices[i];
1765                let col = &table_schema.columns[col_idx];
1766                let got_type = val.data_type();
1767                bufs.row[col_idx] = if val.is_null() {
1768                    Value::Null
1769                } else {
1770                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
1771                        SqlError::TypeMismatch {
1772                            expected: col.data_type.to_string(),
1773                            got: got_type.to_string(),
1774                        }
1775                    })?
1776                };
1777            }
1778        }
1779
1780        if has_defaults {
1781            for &(pos, def_expr) in &defaults {
1782                let val = eval_const_expr(def_expr)?;
1783                let col = &table_schema.columns[pos];
1784                if !val.is_null() {
1785                    let got_type = val.data_type();
1786                    bufs.row[pos] =
1787                        val.coerce_into(col.data_type)
1788                            .ok_or_else(|| SqlError::TypeMismatch {
1789                                expected: col.data_type.to_string(),
1790                                got: got_type.to_string(),
1791                            })?;
1792                }
1793            }
1794        }
1795
1796        if let Some(gen_map) = row_col_map_for_gen {
1797            if cache.is_some() {
1798                for (pos, fast) in cached_gen_positions
1799                    .iter()
1800                    .copied()
1801                    .zip(cached_gen_fast_evals.iter())
1802                {
1803                    let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1804                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1805                    let col = &table_schema.columns[pos];
1806                    bufs.row[pos] = if val.is_null() {
1807                        Value::Null
1808                    } else {
1809                        let got_type = val.data_type();
1810                        val.coerce_into(col.data_type)
1811                            .ok_or_else(|| SqlError::TypeMismatch {
1812                                expected: col.data_type.to_string(),
1813                                got: got_type.to_string(),
1814                            })?
1815                    };
1816                }
1817            } else {
1818                for (pos, gen_expr, fast) in &generated_cols_uncached {
1819                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1820                    let col = &table_schema.columns[*pos];
1821                    bufs.row[*pos] = if val.is_null() {
1822                        Value::Null
1823                    } else {
1824                        let got_type = val.data_type();
1825                        val.coerce_into(col.data_type)
1826                            .ok_or_else(|| SqlError::TypeMismatch {
1827                                expected: col.data_type.to_string(),
1828                                got: got_type.to_string(),
1829                            })?
1830                    };
1831                }
1832            }
1833        }
1834
1835        if let Some(c) = cache {
1836            for &pos in &c.not_null_indices {
1837                if bufs.row[pos as usize].is_null() {
1838                    return Err(SqlError::NotNullViolation(
1839                        table_schema.columns[pos as usize].name.clone(),
1840                    ));
1841                }
1842            }
1843        } else {
1844            for col in &table_schema.columns {
1845                if !col.nullable && bufs.row[col.position as usize].is_null() {
1846                    return Err(SqlError::NotNullViolation(col.name.clone()));
1847                }
1848            }
1849        }
1850
1851        if let Some(ref col_map) = check_col_map {
1852            for col in &table_schema.columns {
1853                if let Some(ref check) = col.check_expr {
1854                    let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1855                    if !is_truthy(&result) && !result.is_null() {
1856                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1857                        return Err(SqlError::CheckViolation(name.to_string()));
1858                    }
1859                }
1860            }
1861            for tc in &table_schema.check_constraints {
1862                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1863                if !is_truthy(&result) && !result.is_null() {
1864                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1865                    return Err(SqlError::CheckViolation(name.to_string()));
1866                }
1867            }
1868        }
1869
1870        if has_fks {
1871            for fk in &table_schema.foreign_keys {
1872                let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1873                if any_null {
1874                    continue;
1875                }
1876                crate::encoding::encode_composite_key_from_indices(
1877                    &fk.columns,
1878                    &bufs.row,
1879                    &mut bufs.fk_key_buf,
1880                );
1881                if fk.deferrable && fk.initially_deferred {
1882                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1883                    wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1884                        fk_name: name,
1885                        foreign_table: fk.foreign_table.as_bytes().to_vec(),
1886                        parent_key: bufs.fk_key_buf.clone(),
1887                    });
1888                    continue;
1889                }
1890                if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1891                    let found = wtx
1892                        .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1893                        .map_err(SqlError::Storage)?;
1894                    if found.is_none() {
1895                        let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1896                        return Err(SqlError::ForeignKeyViolation(name.to_string()));
1897                    }
1898                    wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1899                }
1900            }
1901        }
1902
1903        let proposed_row_for_returning: Option<Vec<Value>> =
1904            returning_rows.as_ref().map(|_| bufs.row.clone());
1905        let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1906            Some(bufs.row.clone())
1907        } else {
1908            None
1909        };
1910
1911        let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
1912            t.enabled
1913                && t.timing == crate::parser::TriggerTiming::Before
1914                && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1915                && t.events
1916                    .iter()
1917                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1918        });
1919        if has_before_insert_triggers {
1920            super::triggers::fire_row_triggers(
1921                wtx,
1922                schema,
1923                &table_schema.name,
1924                crate::parser::TriggerTiming::Before,
1925                super::triggers::FireEvent::Insert,
1926                None,
1927                Some(bufs.row.clone()),
1928                &table_schema.columns,
1929            )?;
1930        }
1931
1932        for (j, &i) in pk_indices.iter().enumerate() {
1933            bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1934        }
1935        match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1936            true => match bufs.pk_values[0] {
1937                Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1938                _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1939            },
1940            false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1941        }
1942        if plain_insert && single_int_pk {
1943            if let Value::Integer(id) = &bufs.pk_values[0] {
1944                min_inserted_pk = Some(min_inserted_pk.map_or(*id, |m| m.min(*id)));
1945            }
1946        }
1947
1948        for &slot in dropped {
1949            bufs.value_values[slot as usize] = Value::Null;
1950        }
1951        for (j, &i) in non_pk.iter().enumerate() {
1952            let col = &table_schema.columns[i];
1953            if matches!(
1954                col.generated_kind,
1955                Some(crate::parser::GeneratedKind::Virtual)
1956            ) {
1957                bufs.value_values[enc_pos[j] as usize] = Value::Null;
1958                bufs.row[i] = Value::Null;
1959            } else {
1960                bufs.value_values[enc_pos[j] as usize] =
1961                    std::mem::replace(&mut bufs.row[i], Value::Null);
1962            }
1963        }
1964        match cache.and_then(|c| c.row_encoder.as_ref()) {
1965            Some(tmpl) => crate::encoding::encode_int_row_with_template(
1966                tmpl,
1967                &bufs.value_values,
1968                &mut bufs.value_buf,
1969            )?,
1970            None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1971        }
1972
1973        if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1974            return Err(SqlError::KeyTooLarge {
1975                size: bufs.key_buf.len(),
1976                max: citadel_core::MAX_KEY_SIZE,
1977            });
1978        }
1979        if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1980            return Err(SqlError::RowTooLarge {
1981                size: bufs.value_buf.len(),
1982                max: citadel_core::MAX_VALUE_SIZE,
1983            });
1984        }
1985
1986        match compiled_conflict.as_ref() {
1987            None => {
1988                let is_new = wtx
1989                    .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1990                    .map_err(SqlError::Storage)?;
1991                if !is_new {
1992                    return Err(SqlError::DuplicateKey);
1993                }
1994                let has_after_insert_triggers =
1995                    schema.triggers_for(&table_schema.name).iter().any(|t| {
1996                        t.enabled
1997                            && t.timing == crate::parser::TriggerTiming::After
1998                            && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1999                            && t.events
2000                                .iter()
2001                                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
2002                    });
2003                if has_indices || has_after_insert_triggers {
2004                    for (j, &i) in pk_indices.iter().enumerate() {
2005                        bufs.row[i] = bufs.pk_values[j].clone();
2006                    }
2007                    for (j, &i) in non_pk.iter().enumerate() {
2008                        bufs.row[i] = std::mem::replace(
2009                            &mut bufs.value_values[enc_pos[j] as usize],
2010                            Value::Null,
2011                        );
2012                    }
2013                    if has_indices {
2014                        insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
2015                    }
2016                    if has_after_insert_triggers {
2017                        super::triggers::fire_row_triggers(
2018                            wtx,
2019                            schema,
2020                            &table_schema.name,
2021                            crate::parser::TriggerTiming::After,
2022                            super::triggers::FireEvent::Insert,
2023                            None,
2024                            Some(bufs.row.clone()),
2025                            &table_schema.columns,
2026                        )?;
2027                    }
2028                }
2029                if let Some(r) = row_for_stmt_trigger_impl.clone() {
2030                    stmt_new_rows_impl.push(r);
2031                }
2032                count += 1;
2033                if let Some(buf) = returning_rows.as_mut() {
2034                    buf.push((None, proposed_row_for_returning));
2035                }
2036            }
2037            Some(oc) => {
2038                let oc_ref: &CompiledOnConflict = oc;
2039                let needs_row = upsert_needs_row(oc_ref, table_schema);
2040                if needs_row {
2041                    for (j, &i) in pk_indices.iter().enumerate() {
2042                        bufs.row[i] = bufs.pk_values[j].clone();
2043                    }
2044                    for (j, &i) in non_pk.iter().enumerate() {
2045                        bufs.row[i] = std::mem::replace(
2046                            &mut bufs.value_values[enc_pos[j] as usize],
2047                            Value::Null,
2048                        );
2049                    }
2050                }
2051                let outcome = apply_insert_with_conflict(
2052                    wtx,
2053                    table_schema,
2054                    &bufs.key_buf,
2055                    &bufs.value_buf,
2056                    &bufs.row,
2057                    &bufs.pk_values,
2058                    oc_ref,
2059                    row_col_map.unwrap(),
2060                    stmt.returning.is_some(),
2061                )?;
2062                match outcome {
2063                    InsertRowOutcome::Inserted => {
2064                        count += 1;
2065                        if let Some(buf) = returning_rows.as_mut() {
2066                            buf.push((None, proposed_row_for_returning));
2067                        }
2068                        if let Some(r) = row_for_stmt_trigger_impl.clone() {
2069                            stmt_new_rows_impl.push(r);
2070                        }
2071                        let has_after_insert_triggers =
2072                            schema.triggers_for(&table_schema.name).iter().any(|t| {
2073                                t.enabled
2074                                    && t.timing == crate::parser::TriggerTiming::After
2075                                    && t.granularity
2076                                        == crate::parser::TriggerGranularity::ForEachRow
2077                                    && t.events
2078                                        .iter()
2079                                        .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
2080                            });
2081                        if has_after_insert_triggers {
2082                            super::triggers::fire_row_triggers(
2083                                wtx,
2084                                schema,
2085                                &table_schema.name,
2086                                crate::parser::TriggerTiming::After,
2087                                super::triggers::FireEvent::Insert,
2088                                None,
2089                                Some(bufs.row.clone()),
2090                                &table_schema.columns,
2091                            )?;
2092                        }
2093                    }
2094                    InsertRowOutcome::Updated { old, new } => {
2095                        count += 1;
2096                        if let Some(buf) = returning_rows.as_mut() {
2097                            buf.push((Some(old.clone()), Some(new.clone())));
2098                        }
2099                        let has_after_update_triggers =
2100                            schema.triggers_for(&table_schema.name).iter().any(|t| {
2101                                t.enabled
2102                                    && t.timing == crate::parser::TriggerTiming::After
2103                                    && t.granularity
2104                                        == crate::parser::TriggerGranularity::ForEachRow
2105                                    && t.events.iter().any(|e| {
2106                                        matches!(e, crate::parser::TriggerEvent::Update(_))
2107                                    })
2108                            });
2109                        if has_after_update_triggers {
2110                            let changed_cols: Vec<String> = match oc_ref {
2111                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2112                                    .iter()
2113                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2114                                    .collect(),
2115                                _ => Vec::new(),
2116                            };
2117                            super::triggers::fire_row_triggers(
2118                                wtx,
2119                                schema,
2120                                &table_schema.name,
2121                                crate::parser::TriggerTiming::After,
2122                                super::triggers::FireEvent::Update {
2123                                    changed_columns: &changed_cols,
2124                                },
2125                                Some(old),
2126                                Some(new),
2127                                &table_schema.columns,
2128                            )?;
2129                        }
2130                    }
2131                    InsertRowOutcome::Skipped => {}
2132                }
2133            }
2134        }
2135    }
2136
2137    mark_insert_dml(
2138        schema,
2139        &table_schema.name,
2140        !plain_insert,
2141        single_int_pk,
2142        min_inserted_pk,
2143        count,
2144    );
2145
2146    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2147        if has_insert_statement_triggers_impl {
2148            super::triggers::fire_statement_triggers(
2149                wtx,
2150                schema,
2151                &table_schema.name,
2152                crate::parser::TriggerTiming::After,
2153                super::triggers::FireEvent::Insert,
2154                &table_schema.columns,
2155                &[],
2156                &stmt_new_rows_impl,
2157            )?;
2158        }
2159        return Ok(ExecutionResult::Query(super::helpers::project_returning(
2160            table_schema,
2161            returning_cols,
2162            &rows,
2163        )?));
2164    }
2165
2166    if has_insert_statement_triggers_impl {
2167        super::triggers::fire_statement_triggers(
2168            wtx,
2169            schema,
2170            &table_schema.name,
2171            crate::parser::TriggerTiming::After,
2172            super::triggers::FireEvent::Insert,
2173            &table_schema.columns,
2174            &[],
2175            &stmt_new_rows_impl,
2176        )?;
2177    }
2178
2179    Ok(ExecutionResult::RowsAffected(count))
2180}
2181
/// Per-statement INSERT state: a table name paired with an optional
/// `InsertCache`.
///
/// NOTE(review): judging by the field names, `table_lower` is presumably the
/// lowercased target table name and `cached` is `None` when no fast-path state
/// could be precomputed — confirm against the constructor, which is not in
/// this part of the file.
pub struct CompiledInsert {
    table_lower: String,
    cached: Option<InsertCache>,
}
2186
/// Precomputed data that the INSERT row loop reads through its optional
/// `cache` argument.
///
/// Fields without a comment are not exercised in the code visible next to this
/// definition; their meaning should be confirmed at the construction site.
struct InsertCache {
    // NOTE(review): presumably the schema position of each column named in the
    // INSERT column list — confirm.
    col_indices: Vec<usize>,
    has_subquery: bool,
    any_defaults: bool,
    has_checks: bool,
    /// Compiled `ON CONFLICT` clause, when the statement has one.
    on_conflict: Option<Arc<CompiledOnConflict>>,
    row_col_map: Option<ColumnMap>,
    // NOTE(review): presumably parallel vectors (position of each generated
    // column, and the fast evaluator for it) — confirm.
    generated_col_positions: Vec<usize>,
    generated_fast_evals: Vec<FastGenEval>,
    pk_indices: Vec<usize>,
    non_pk_indices: Vec<usize>,
    encoding_positions: Vec<u16>,
    dropped_non_pk_slots: Vec<u16>,
    phys_count: usize,
    /// When true and the pk value is a `Value::Integer`, the row loop encodes
    /// the key with `encode_int_key_into` instead of the composite encoder.
    single_int_pk: bool,
    /// Column positions the row loop checks for NULL, raising
    /// `SqlError::NotNullViolation` with the column's name on a hit.
    not_null_indices: Vec<u16>,
    /// When present, `VALUES` rows are filled by replaying these actions
    /// instead of evaluating each value expression.
    bind_plan: Option<Vec<BindAction>>,
    /// When true, the row loop skips resetting the row buffer to NULL before
    /// building each row.
    row_fully_overwritten: bool,
    /// When present, rows are encoded with `encode_int_row_with_template`;
    /// otherwise the generic `encode_row_into` is used.
    row_encoder: Option<crate::encoding::IntRowTemplate>,
    is_trivial_fast: bool,
    // NOTE(review): presumably the output of `build_trivial_fast_program` —
    // confirm where this is populated.
    trivial_fast_program: Option<TrivialFastProgram>,
    needs_scoped_params: bool,
}
2210
/// One step of a cached bind plan: how to fill a single column of the row
/// buffer for a `VALUES` row without evaluating an expression.
#[derive(Clone)]
enum BindAction {
    /// Copy `params[param_idx]` into row column `col_idx`. A NULL parameter is
    /// stored as NULL; a value whose type already equals `target` is cloned
    /// as-is; anything else is coerced to `target`, failing with
    /// `SqlError::TypeMismatch` when the coercion is not possible.
    Param {
        param_idx: usize,
        col_idx: usize,
        target: DataType,
    },
    /// Store a clone of `value` into row column `col_idx`.
    Literal {
        value: Value,
        col_idx: usize,
    },
}
2223
/// Output of `build_trivial_fast_program`: a row template plus the list of
/// integer writes needed to turn it into a concrete row.
///
/// NOTE(review): the code that executes a program is not in this part of the
/// file; the field notes below describe only how the builder fills them.
#[derive(Clone)]
struct TrivialFastProgram {
    /// Clone of `IntRowTemplate::template` taken when the program was built.
    template: Vec<u8>,
    /// Write steps: one per non-pk bind action (in bind-plan order), followed
    /// by one per generated column.
    ops: Vec<WriteOp>,
    /// Index of the parameter bound to the primary-key column.
    pk_param: u8,
    /// Indices of parameters bound to non-pk columns whose `nullable` flag is
    /// false. NOTE(review): presumably checked for NULL at execution time —
    /// confirm.
    not_null_param_indices: Vec<u8>,
}
2231
/// One write step of a `TrivialFastProgram`, emitted by
/// `build_trivial_fast_program`.
///
/// In every variant `off` is the offset the builder looked up in
/// `IntRowTemplate::slot_offsets` for the destination column's slot. For the
/// `Gen*` variants the builder also records `bitmap_byte_off = 2 + slot / 8`
/// and `bitmap_bit_mask = 1 << (slot % 8)`.
///
/// NOTE(review): the interpreter for these ops is not in this part of the
/// file; presumably `off` addresses an 8-byte integer inside the template and
/// the bitmap fields address the column's null bit — confirm.
#[derive(Clone)]
enum WriteOp {
    /// Emitted for a parameter bound to a non-pk INTEGER column.
    ParamI64 {
        param_idx: u8,
        off: u32,
    },
    /// Emitted for an integer literal bound to a non-pk column, and for a
    /// generated column whose inputs are all literals (the value is folded at
    /// build time with wrapping arithmetic).
    LiteralI64 {
        value: i64,
        off: u32,
    },
    /// Emitted for a generated column of the `IntColAddCol` shape whose two
    /// inputs are both parameters.
    GenAddParamsI64 {
        a_param: u8,
        b_param: u8,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
    /// Emitted for a generated column of the `IntColMulAdd` shape over a
    /// parameter, and for `IntColAddCol` with one parameter and one literal
    /// (then `mul` is 1 and `add` is the literal).
    GenMulAddParamI64 {
        param_idx: u8,
        mul: i64,
        add: i64,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
}
2258
2259fn build_trivial_fast_program(
2260    bind_plan: &[BindAction],
2261    row_encoder: &crate::encoding::IntRowTemplate,
2262    non_virtual_pairs: &[(usize, usize)],
2263    generated_col_positions: &[usize],
2264    generated_fast_evals: &[FastGenEval],
2265    pk_indices: &[usize],
2266    columns: &[crate::types::ColumnDef],
2267) -> Option<TrivialFastProgram> {
2268    let pk_col = pk_indices[0];
2269    let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2270        non_virtual_pairs.iter().copied().collect();
2271    let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2272        row_encoder.slot_offsets.iter().copied().collect();
2273
2274    let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2275    let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2276    let mut pk_param: Option<u8> = None;
2277    let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2278    let mut not_null_param_indices: Vec<u8> = Vec::new();
2279
2280    for action in bind_plan {
2281        match action {
2282            BindAction::Param {
2283                param_idx,
2284                col_idx,
2285                target,
2286            } => {
2287                if *target != DataType::Integer {
2288                    return None;
2289                }
2290                let pi: u8 = u8::try_from(*param_idx).ok()?;
2291                col_to_param.insert(*col_idx, pi);
2292                if *col_idx == pk_col {
2293                    pk_param = Some(pi);
2294                } else {
2295                    let slot = *col_to_slot.get(col_idx)?;
2296                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2297                    ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2298                    if !columns[*col_idx].nullable {
2299                        not_null_param_indices.push(pi);
2300                    }
2301                }
2302            }
2303            BindAction::Literal { value, col_idx } => match value {
2304                Value::Integer(v) => {
2305                    col_to_lit_int.insert(*col_idx, *v);
2306                    if *col_idx == pk_col {
2307                        return None;
2308                    }
2309                    let slot = *col_to_slot.get(col_idx)?;
2310                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2311                    ops.push(WriteOp::LiteralI64 { value: *v, off });
2312                }
2313                _ => return None,
2314            },
2315        }
2316    }
2317
2318    let pk_param = pk_param?;
2319
2320    for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2321        let gen_slot = *col_to_slot.get(&gen_pos)?;
2322        let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2323        let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
2324        let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
2325        let gen_col_nullable = columns[gen_pos].nullable;
2326
2327        match &generated_fast_evals[i] {
2328            FastGenEval::IntColAddCol {
2329                left_idx,
2330                right_idx,
2331            } => {
2332                let a_param = col_to_param.get(left_idx).copied();
2333                let b_param = col_to_param.get(right_idx).copied();
2334                match (a_param, b_param) {
2335                    (Some(ap), Some(bp)) => {
2336                        let deps_safe = gen_col_nullable
2337                            || (not_null_param_indices.contains(&ap)
2338                                && not_null_param_indices.contains(&bp));
2339                        if !deps_safe {
2340                            return None;
2341                        }
2342                        ops.push(WriteOp::GenAddParamsI64 {
2343                            a_param: ap,
2344                            b_param: bp,
2345                            off: gen_off,
2346                            bitmap_byte_off,
2347                            bitmap_bit_mask,
2348                        });
2349                    }
2350                    (Some(p), None) => {
2351                        let lit = col_to_lit_int.get(right_idx).copied()?;
2352                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2353                            return None;
2354                        }
2355                        ops.push(WriteOp::GenMulAddParamI64 {
2356                            param_idx: p,
2357                            mul: 1,
2358                            add: lit,
2359                            off: gen_off,
2360                            bitmap_byte_off,
2361                            bitmap_bit_mask,
2362                        });
2363                    }
2364                    (None, Some(p)) => {
2365                        let lit = col_to_lit_int.get(left_idx).copied()?;
2366                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2367                            return None;
2368                        }
2369                        ops.push(WriteOp::GenMulAddParamI64 {
2370                            param_idx: p,
2371                            mul: 1,
2372                            add: lit,
2373                            off: gen_off,
2374                            bitmap_byte_off,
2375                            bitmap_bit_mask,
2376                        });
2377                    }
2378                    (None, None) => {
2379                        let la = col_to_lit_int.get(left_idx).copied()?;
2380                        let lb = col_to_lit_int.get(right_idx).copied()?;
2381                        ops.push(WriteOp::LiteralI64 {
2382                            value: la.wrapping_add(lb),
2383                            off: gen_off,
2384                        });
2385                    }
2386                }
2387            }
2388            FastGenEval::IntColMulAdd {
2389                col_schema_idx,
2390                mul,
2391                add,
2392            } => {
2393                if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2394                    if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2395                        return None;
2396                    }
2397                    ops.push(WriteOp::GenMulAddParamI64 {
2398                        param_idx: p,
2399                        mul: *mul,
2400                        add: *add,
2401                        off: gen_off,
2402                        bitmap_byte_off,
2403                        bitmap_bit_mask,
2404                    });
2405                } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2406                    ops.push(WriteOp::LiteralI64 {
2407                        value: lit.wrapping_mul(*mul).wrapping_add(*add),
2408                        off: gen_off,
2409                    });
2410                } else {
2411                    return None;
2412                }
2413            }
2414            FastGenEval::None => return None,
2415        }
2416    }
2417
2418    Some(TrivialFastProgram {
2419        template: row_encoder.template.clone(),
2420        ops,
2421        pk_param,
2422        not_null_param_indices,
2423    })
2424}
2425
/// Compiled form of an INSERT's `ON CONFLICT` clause.
#[derive(Clone)]
pub(super) enum CompiledOnConflict {
    /// `DO NOTHING`.
    /// NOTE(review): `target` is presumably `None` when the clause names no
    /// conflict target — confirm in the compile step.
    DoNothing {
        target: Option<ConflictKind>,
    },
    /// `DO UPDATE SET ...`.
    DoUpdate {
        /// The constraint whose violation triggers the update.
        target: ConflictKind,
        /// `(column index, expression)` pairs; the index is a position in the
        /// table schema's `columns` (the INSERT executor uses it to collect
        /// the changed column names for update triggers).
        assignments: Vec<(usize, Expr)>,
        /// Optional `WHERE` filter attached to the `DO UPDATE`.
        where_clause: Option<Expr>,
        // NOTE(review): presumably one fast path per assignment when every
        // assignment matches a recognised shape — confirm.
        fast_paths: Option<Vec<DoUpdateFastPath>>,
    },
}
2438
/// Specialised shape of a `DO UPDATE` assignment, stored in
/// `CompiledOnConflict::DoUpdate::fast_paths`.
#[derive(Clone, Copy)]
pub(super) enum DoUpdateFastPath {
    // NOTE(review): by its name this presumably adds the constant `delta` to
    // the integer stored at physical column `phys_idx`; the code that applies
    // it is not in this part of the file — confirm.
    IntAddConst { phys_idx: usize, delta: i64 },
}
2443
/// The uniqueness constraint an `ON CONFLICT` target resolved to.
#[derive(Clone, Debug)]
pub(super) enum ConflictKind {
    /// The target column set equals the table's primary-key column set.
    PrimaryKey,
    /// The target matched a unique index; `index_idx` is that index's position
    /// in the table schema's `indices` list.
    UniqueIndex { index_idx: usize },
}
2449
2450fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2451    match target {
2452        ConflictTarget::Columns(cols) => {
2453            let col_idx_set: Vec<u16> = cols
2454                .iter()
2455                .map(|name| {
2456                    ts.column_index(name)
2457                        .map(|i| i as u16)
2458                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2459                })
2460                .collect::<Result<_>>()?;
2461            let pk_set = ts.primary_key_columns.clone();
2462            if set_equal(&col_idx_set, &pk_set) {
2463                return Ok(ConflictKind::PrimaryKey);
2464            }
2465            for (index_idx, idx) in ts.indices.iter().enumerate() {
2466                if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2467                    return Ok(ConflictKind::UniqueIndex { index_idx });
2468                }
2469            }
2470            Err(SqlError::Plan(
2471                "ON CONFLICT target does not match any unique constraint".into(),
2472            ))
2473        }
2474        ConflictTarget::Constraint(name) => {
2475            let lower = name.to_ascii_lowercase();
2476            for (index_idx, idx) in ts.indices.iter().enumerate() {
2477                if idx.name.eq_ignore_ascii_case(&lower) {
2478                    if idx.unique {
2479                        return Ok(ConflictKind::UniqueIndex { index_idx });
2480                    }
2481                    return Err(SqlError::Plan(format!(
2482                        "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2483                    )));
2484                }
2485            }
2486            Err(SqlError::Plan(format!(
2487                "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2488            )))
2489        }
2490    }
2491}
2492
/// Returns `true` when `a` and `b` hold the same column indices, ignoring order.
///
/// Duplicates are significant: each value must occur the same number of times
/// on both sides.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    // Sorted copy of a slice, used as its canonical form for comparison.
    let canonical = |items: &[u16]| -> Vec<u16> {
        let mut sorted = items.to_vec();
        sorted.sort_unstable();
        sorted
    };
    // Length check first so mismatched inputs never allocate.
    a.len() == b.len() && canonical(a) == canonical(b)
}
2503
/// What happened to one proposed row of an `INSERT ... ON CONFLICT`.
pub(super) enum InsertRowOutcome {
    /// The row was written. The `DO UPDATE` helpers in this file also return
    /// this for an update when the caller did not request row capture.
    Inserted,
    /// An existing row was updated; carries the row before and after.
    Updated { old: Vec<Value>, new: Vec<Value> },
    /// Nothing was written for this row.
    Skipped,
}
2509
/// Inserts one row while honouring a compiled `ON CONFLICT` clause.
///
/// `key_buf` / `value_buf` are the already-encoded primary key and value of
/// the proposed row; `row` and `pk_values` are the same data as `Value`s.
///
/// Strategy, in order:
/// 1. `DO NOTHING` on the primary key (or with no target) for a table with no
///    indices and no foreign keys: a single insert-if-absent.
/// 2. `DO UPDATE` on the primary key when `can_fuse_do_update` allows it: a
///    single fused upsert.
/// 3. Otherwise: insert-or-fetch on the primary table, then maintain indices
///    and resolve whichever conflict shows up.
///
/// # Errors
///
/// * `SqlError::DuplicateKey` for a primary-key conflict that the clause does
///   not target.
/// * `SqlError::UniqueViolation` for a unique-index conflict that the clause
///   does not target.
/// * Storage errors, and anything raised by the `DO UPDATE` helpers.
#[allow(clippy::too_many_arguments)]
#[inline]
pub(super) fn apply_insert_with_conflict(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    key_buf: &[u8],
    value_buf: &[u8],
    row: &[Value],
    pk_values: &[Value],
    on_conflict: &CompiledOnConflict,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    let table_bytes = table_schema.name.as_bytes();

    // Fast path 1: DO NOTHING where only the primary key can conflict.
    if let CompiledOnConflict::DoNothing { target } = on_conflict {
        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
            let inserted = wtx
                .table_insert_if_absent(table_bytes, key_buf, value_buf)
                .map_err(SqlError::Storage)?;
            return Ok(if inserted {
                InsertRowOutcome::Inserted
            } else {
                InsertRowOutcome::Skipped
            });
        }
    }

    // Fast path 2: primary-key DO UPDATE that can run as one fused upsert.
    if let CompiledOnConflict::DoUpdate {
        target: ConflictKind::PrimaryKey,
        assignments,
        where_clause,
        fast_paths,
    } = on_conflict
    {
        if can_fuse_do_update(table_schema, assignments) {
            return apply_do_update_fused(
                wtx,
                table_schema,
                table_bytes,
                key_buf,
                value_buf,
                row,
                assignments,
                where_clause.as_ref(),
                col_map,
                fast_paths.as_deref(),
                capture_returning,
            );
        }
    }

    // General path: try the primary insert and learn whether the key existed.
    let primary_outcome = wtx
        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
        .map_err(SqlError::Storage)?;

    match primary_outcome {
        citadel_txn::write_txn::InsertOutcome::Inserted => {
            // No primary-key conflict; only secondary indices can still collide.
            if table_schema.indices.is_empty() {
                return Ok(InsertRowOutcome::Inserted);
            }
            // Index entries written so far, kept so they can be undone.
            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
            match insert_index_entries_or_fetch(
                wtx,
                table_schema,
                row,
                pk_values,
                &mut inserted_keys,
            )? {
                None => Ok(InsertRowOutcome::Inserted),
                Some(conflicting_idx) => {
                    // The clause covers this conflict if it is an untargeted
                    // DO NOTHING, or if it targets exactly the colliding index
                    // (the `if` guard applies to both alternatives).
                    let matches_target =
                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
                            || matches!(
                                on_conflict,
                                CompiledOnConflict::DoNothing {
                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
                                } | CompiledOnConflict::DoUpdate {
                                    target: ConflictKind::UniqueIndex { index_idx },
                                    ..
                                } if *index_idx == conflicting_idx
                            );
                    // Roll back the primary row and the partial index writes
                    // before deciding how to report or resolve the conflict.
                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
                    if !matches_target {
                        return Err(SqlError::UniqueViolation(
                            table_schema.indices[conflicting_idx].name.clone(),
                        ));
                    }
                    match on_conflict {
                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
                        CompiledOnConflict::DoUpdate {
                            assignments,
                            where_clause,
                            ..
                        } => {
                            // Find the existing row through the unique index,
                            // then update that row.
                            let existing_pk =
                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
                            apply_do_update(
                                wtx,
                                table_schema,
                                &existing_pk,
                                row,
                                assignments,
                                where_clause.as_ref(),
                                col_map,
                                capture_returning,
                            )
                        }
                    }
                }
            }
        }
        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
            // Primary-key conflict: covered only by an untargeted DO NOTHING
            // or by a clause that targets the primary key.
            let matches_target = matches!(
                on_conflict,
                CompiledOnConflict::DoNothing { target: None }
                    | CompiledOnConflict::DoNothing {
                        target: Some(ConflictKind::PrimaryKey),
                    }
                    | CompiledOnConflict::DoUpdate {
                        target: ConflictKind::PrimaryKey,
                        ..
                    }
            );
            if !matches_target {
                return Err(SqlError::DuplicateKey);
            }
            match on_conflict {
                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
                CompiledOnConflict::DoUpdate {
                    assignments,
                    where_clause,
                    ..
                } => {
                    // The stored bytes came back with the outcome, so the old
                    // row is decoded without a second lookup.
                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
                    apply_do_update_with_old_row(
                        wtx,
                        table_schema,
                        key_buf,
                        &old_row,
                        row,
                        assignments,
                        where_clause.as_ref(),
                        col_map,
                        capture_returning,
                    )
                }
            }
        }
    }
}
2662
2663#[inline]
2664fn apply_fast_path_patch(
2665    old_bytes: &[u8],
2666    fast_paths: &[DoUpdateFastPath],
2667) -> Result<UpsertAction> {
2668    UPSERT_SCRATCH.with(|slot| {
2669        let mut bufs = slot.borrow_mut();
2670        bufs.new_value_buf.clear();
2671        bufs.new_value_buf.extend_from_slice(old_bytes);
2672
2673        let mut patch_scratch: Vec<u8> = Vec::new();
2674
2675        for fp in fast_paths {
2676            match fp {
2677                DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2678                    let decoded =
2679                        crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2680                    let old_val = &decoded[0];
2681                    let new_val = match old_val {
2682                        Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2683                        Value::Null => Value::Null,
2684                        _ => {
2685                            return Err(SqlError::TypeMismatch {
2686                                expected: "INTEGER".into(),
2687                                got: old_val.data_type().to_string(),
2688                            });
2689                        }
2690                    };
2691                    if !crate::encoding::patch_column_in_place(
2692                        &mut bufs.new_value_buf,
2693                        *phys_idx,
2694                        &new_val,
2695                    )? {
2696                        patch_scratch.clear();
2697                        crate::encoding::patch_row_column(
2698                            &bufs.new_value_buf,
2699                            *phys_idx,
2700                            &new_val,
2701                            &mut patch_scratch,
2702                        )?;
2703                        std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2704                    }
2705                }
2706            }
2707        }
2708
2709        if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2710            return Err(SqlError::RowTooLarge {
2711                size: bufs.new_value_buf.len(),
2712                max: citadel_core::MAX_VALUE_SIZE,
2713            });
2714        }
2715
2716        Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2717    })
2718}
2719
2720fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2721    if !ts.indices.is_empty() {
2722        return true;
2723    }
2724    match oc {
2725        CompiledOnConflict::DoNothing { .. } => false,
2726        CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2727    }
2728}
2729
2730fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2731    if !ts.indices.is_empty() {
2732        return false;
2733    }
2734    if !ts.foreign_keys.is_empty() {
2735        return false;
2736    }
2737    if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2738        return false;
2739    }
2740    let pk = ts.pk_indices();
2741    !assignments.iter().any(|(ci, _)| pk.contains(ci))
2742}
2743
2744#[allow(clippy::too_many_arguments)]
2745#[inline]
2746fn apply_do_update_fused(
2747    wtx: &mut WriteTxn<'_>,
2748    table_schema: &TableSchema,
2749    table_bytes: &[u8],
2750    key_buf: &[u8],
2751    value_buf: &[u8],
2752    proposed_row: &[Value],
2753    assignments: &[(usize, Expr)],
2754    where_clause: Option<&Expr>,
2755    col_map: &ColumnMap,
2756    fast_paths: Option<&[DoUpdateFastPath]>,
2757    capture_returning: bool,
2758) -> Result<InsertRowOutcome> {
2759    let non_pk = table_schema.non_pk_indices();
2760    let enc_pos = table_schema.encoding_positions();
2761    let phys_count = table_schema.physical_non_pk_count();
2762    let dropped = table_schema.dropped_non_pk_slots();
2763    let has_checks = table_schema.has_checks();
2764    let has_fks = !table_schema.foreign_keys.is_empty();
2765
2766    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2767        std::cell::RefCell::new(None);
2768
2769    let outcome =
2770        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2771            if let Some(fps) = fast_paths {
2772                if !has_checks {
2773                    let action = apply_fast_path_patch(old_bytes, fps)?;
2774                    if capture_returning {
2775                        if let UpsertAction::Replace(ref new_bytes) = action {
2776                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2777                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2778                            *captured.borrow_mut() = Some((old_row, new_row));
2779                        }
2780                    }
2781                    return Ok(action);
2782                }
2783            }
2784            UPSERT_SCRATCH.with(|slot| {
2785                let mut bufs = slot.borrow_mut();
2786                let UpsertBufs {
2787                    old_row,
2788                    new_row,
2789                    value_values,
2790                    new_value_buf,
2791                } = &mut *bufs;
2792
2793                old_row.clear();
2794                old_row.resize(table_schema.columns.len(), Value::Null);
2795                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2796
2797                if let Some(w) = where_clause {
2798                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2799                    let result = eval_expr(w, &ctx)?;
2800                    if result.is_null() || !is_truthy(&result) {
2801                        return Ok(UpsertAction::Skip);
2802                    }
2803                }
2804
2805                new_row.clear();
2806                new_row.extend_from_slice(old_row);
2807                for (col_idx, expr) in assignments {
2808                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2809                    let val = eval_expr(expr, &ctx)?;
2810                    let col = &table_schema.columns[*col_idx];
2811                    new_row[*col_idx] = if val.is_null() {
2812                        Value::Null
2813                    } else {
2814                        let got = val.data_type();
2815                        val.coerce_into(col.data_type)
2816                            .ok_or_else(|| SqlError::TypeMismatch {
2817                                expected: col.data_type.to_string(),
2818                                got: got.to_string(),
2819                            })?
2820                    };
2821                }
2822
2823                for (assigned_idx, _) in assignments {
2824                    let col = &table_schema.columns[*assigned_idx];
2825                    if !col.nullable && new_row[col.position as usize].is_null() {
2826                        return Err(SqlError::NotNullViolation(col.name.clone()));
2827                    }
2828                }
2829                if has_checks {
2830                    for col in &table_schema.columns {
2831                        if let Some(ref check) = col.check_expr {
2832                            let ctx = EvalCtx::new(col_map, new_row);
2833                            let result = eval_expr(check, &ctx)?;
2834                            if !is_truthy(&result) && !result.is_null() {
2835                                let name = col.check_name.as_deref().unwrap_or(&col.name);
2836                                return Err(SqlError::CheckViolation(name.to_string()));
2837                            }
2838                        }
2839                    }
2840                    for tc in &table_schema.check_constraints {
2841                        let ctx = EvalCtx::new(col_map, new_row);
2842                        let result = eval_expr(&tc.expr, &ctx)?;
2843                        if !is_truthy(&result) && !result.is_null() {
2844                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
2845                            return Err(SqlError::CheckViolation(name.to_string()));
2846                        }
2847                    }
2848                }
2849                let _ = has_fks;
2850
2851                value_values.clear();
2852                value_values.resize(phys_count, Value::Null);
2853                for &slot in dropped {
2854                    value_values[slot as usize] = Value::Null;
2855                }
2856                for (j, &i) in non_pk.iter().enumerate() {
2857                    value_values[enc_pos[j] as usize] = new_row[i].clone();
2858                }
2859                new_value_buf.clear();
2860                crate::encoding::encode_row_into(value_values, new_value_buf);
2861
2862                if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2863                    return Err(SqlError::RowTooLarge {
2864                        size: new_value_buf.len(),
2865                        max: citadel_core::MAX_VALUE_SIZE,
2866                    });
2867                }
2868
2869                if capture_returning {
2870                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2871                }
2872                Ok(UpsertAction::Replace(new_value_buf.clone()))
2873            })
2874        })?;
2875
2876    match outcome {
2877        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2878        UpsertOutcome::Updated => {
2879            if capture_returning {
2880                let (old, new) = captured.into_inner().ok_or_else(|| {
2881                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2882                })?;
2883                Ok(InsertRowOutcome::Updated { old, new })
2884            } else {
2885                Ok(InsertRowOutcome::Inserted)
2886            }
2887        }
2888        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2889    }
2890}
2891
2892fn fetch_unique_index_pk(
2893    wtx: &mut WriteTxn<'_>,
2894    table_schema: &TableSchema,
2895    index_idx: usize,
2896    row: &[Value],
2897) -> Result<Vec<u8>> {
2898    let idx = &table_schema.indices[index_idx];
2899    let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2900    let indexed: Vec<Value> = idx
2901        .column_positions_iter()
2902        .map(|col_idx| row[col_idx as usize].clone())
2903        .collect();
2904    let key = crate::encoding::encode_composite_key(&indexed);
2905    let value = wtx
2906        .table_get(&idx_table, &key)
2907        .map_err(SqlError::Storage)?
2908        .ok_or_else(|| {
2909            SqlError::InvalidValue("unique index missing expected collision entry".into())
2910        })?;
2911    Ok(value)
2912}
2913
2914#[allow(clippy::too_many_arguments)]
2915fn apply_do_update(
2916    wtx: &mut WriteTxn<'_>,
2917    table_schema: &TableSchema,
2918    pk_key: &[u8],
2919    proposed_row: &[Value],
2920    assignments: &[(usize, Expr)],
2921    where_clause: Option<&Expr>,
2922    col_map: &ColumnMap,
2923    capture_returning: bool,
2924) -> Result<InsertRowOutcome> {
2925    let old_value = wtx
2926        .table_get(table_schema.name.as_bytes(), pk_key)
2927        .map_err(SqlError::Storage)?
2928        .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2929    let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2930    apply_do_update_with_old_row(
2931        wtx,
2932        table_schema,
2933        pk_key,
2934        &old_row,
2935        proposed_row,
2936        assignments,
2937        where_clause,
2938        col_map,
2939        capture_returning,
2940    )
2941}
2942
/// Applies a `DO UPDATE` to an existing row whose decoded form is `old_row`.
///
/// Steps, in order:
/// 1. Evaluate the optional `WHERE`; a NULL or false result skips the row.
/// 2. Evaluate every assignment against the old row and coerce the result to
///    the column type.
/// 3. Recompute stored generated columns from the new row.
/// 4. Enforce NOT NULL on assigned columns, then CHECK constraints.
/// 5. Validate (or defer) foreign keys whose columns changed.
/// 6. Encode the new value and write it, either by moving the row to a new
///    primary key or by updating it in place, maintaining indices both ways.
///
/// Returns `Updated { old, new }` when `capture_returning` is set, otherwise
/// `Inserted`; `Skipped` when the `WHERE` filter rejects the row.
///
/// # Errors
///
/// `SqlError::TypeMismatch`, `SqlError::NotNullViolation`,
/// `SqlError::CheckViolation`, `SqlError::ForeignKeyViolation`,
/// `SqlError::RowTooLarge`, `SqlError::DuplicateKey`,
/// `SqlError::UniqueViolation`, plus evaluation and storage errors.
#[allow(clippy::too_many_arguments)]
fn apply_do_update_with_old_row(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    old_pk_key: &[u8],
    old_row: &[Value],
    proposed_row: &[Value],
    assignments: &[(usize, Expr)],
    where_clause: Option<&Expr>,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    // WHERE filter: NULL and false both mean "do not update".
    if let Some(w) = where_clause {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let result = eval_expr(w, &ctx)?;
        if result.is_null() || !is_truthy(&result) {
            return Ok(InsertRowOutcome::Skipped);
        }
    }

    // Assignments read the OLD row (and the proposed row), never `new_row`,
    // so they do not observe each other's results.
    let mut new_row = old_row.to_vec();
    for (col_idx, expr) in assignments {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let val = eval_expr(expr, &ctx)?;
        let col = &table_schema.columns[*col_idx];
        new_row[*col_idx] = if val.is_null() {
            Value::Null
        } else {
            let got = val.data_type();
            val.coerce_into(col.data_type)
                .ok_or_else(|| SqlError::TypeMismatch {
                    expected: col.data_type.to_string(),
                    got: got.to_string(),
                })?
        };
    }

    // Stored generated columns are recomputed from the updated row.
    for col in &table_schema.columns {
        if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Stored)
        ) {
            let val = eval_expr(
                col.generated_expr.as_ref().unwrap(),
                &EvalCtx::new(col_map, &new_row),
            )?;
            let pos = col.position as usize;
            new_row[pos] = if val.is_null() {
                if !col.nullable {
                    return Err(SqlError::NotNullViolation(col.name.clone()));
                }
                Value::Null
            } else {
                let got = val.data_type();
                val.coerce_into(col.data_type)
                    .ok_or_else(|| SqlError::TypeMismatch {
                        expected: col.data_type.to_string(),
                        got: got.to_string(),
                    })?
            };
        }
    }

    // The key only counts as changed if a key column was assigned AND its
    // value actually differs.
    let pk_indices = table_schema.pk_indices();
    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);

    // NOT NULL is checked only for columns that were assigned.
    for (assigned_idx, _) in assignments {
        let col = &table_schema.columns[*assigned_idx];
        if !col.nullable && new_row[col.position as usize].is_null() {
            return Err(SqlError::NotNullViolation(col.name.clone()));
        }
    }
    // A CHECK fails only on a non-NULL, non-true result.
    if table_schema.has_checks() {
        for col in &table_schema.columns {
            if let Some(ref check) = col.check_expr {
                let ctx = EvalCtx::new(col_map, &new_row);
                let result = eval_expr(check, &ctx)?;
                if !is_truthy(&result) && !result.is_null() {
                    let name = col.check_name.as_deref().unwrap_or(&col.name);
                    return Err(SqlError::CheckViolation(name.to_string()));
                }
            }
        }
        for tc in &table_schema.check_constraints {
            let ctx = EvalCtx::new(col_map, &new_row);
            let result = eval_expr(&tc.expr, &ctx)?;
            if !is_truthy(&result) && !result.is_null() {
                let name = tc.name.as_deref().unwrap_or(&tc.sql);
                return Err(SqlError::CheckViolation(name.to_string()));
            }
        }
    }
    // Foreign keys declared on this table: re-validate only those whose
    // columns changed.
    for fk in &table_schema.foreign_keys {
        let changed = fk
            .columns
            .iter()
            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
        if !changed {
            continue;
        }
        // A foreign key with any NULL component is not checked.
        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
        if any_null {
            continue;
        }
        let fk_vals: Vec<Value> = fk
            .columns
            .iter()
            .map(|&ci| new_row[ci as usize].clone())
            .collect();
        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
        // Deferred constraints are queued on the transaction instead of being
        // checked now.
        if fk.deferrable && fk.initially_deferred {
            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
            wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
                fk_name: name,
                foreign_table: fk.foreign_table.as_bytes().to_vec(),
                parent_key: fk_key,
            });
            continue;
        }
        // Immediate check, skipped when this parent key was already verified.
        if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
            let found = wtx
                .table_get(fk.foreign_table.as_bytes(), &fk_key)
                .map_err(SqlError::Storage)?;
            if found.is_none() {
                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
                return Err(SqlError::ForeignKeyViolation(name.to_string()));
            }
            wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
        }
    }

    // Key values are only materialised when index maintenance or a key move
    // will need them.
    let has_indices = !table_schema.indices.is_empty();
    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
    } else {
        Vec::new()
    };
    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
    } else {
        Vec::new()
    };

    // Lay the non-key columns out in physical order for encoding.
    let non_pk = table_schema.non_pk_indices();
    let enc_pos = table_schema.encoding_positions();
    let phys_count = table_schema.physical_non_pk_count();
    let dropped = table_schema.dropped_non_pk_slots();
    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
    // Dropped slots are written as NULL (the vector already starts as NULL).
    for &slot in dropped {
        value_values[slot as usize] = Value::Null;
    }
    for (j, &i) in non_pk.iter().enumerate() {
        let col = &table_schema.columns[i];
        // Virtual generated columns are not persisted; their slot holds NULL.
        value_values[enc_pos[j] as usize] = if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Virtual)
        ) {
            Value::Null
        } else {
            new_row[i].clone()
        };
    }
    let mut new_value_buf = Vec::with_capacity(256);
    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);

    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
        return Err(SqlError::RowTooLarge {
            size: new_value_buf.len(),
            max: citadel_core::MAX_VALUE_SIZE,
        });
    }

    if pk_changed {
        // Key move: insert under the new key first so a collision is detected
        // before the old row is removed.
        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
        let inserted = wtx
            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
            .map_err(SqlError::Storage)?;
        if !inserted {
            return Err(SqlError::DuplicateKey);
        }
        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
            .map_err(SqlError::Storage)?;
        // Every index entry is rewritten with the new primary key values.
        // NOTE(review): unlike the in-place branch below, this path does not
        // consult `partial_idx_update_actions`; confirm partial indexes are
        // handled correctly when the key moves.
        for idx in &table_schema.indices {
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            let old_idx_key =
                encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
            wtx.table_delete(&idx_table, &old_idx_key)
                .map_err(SqlError::Storage)?;
            let new_idx_key =
                encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
            let is_new = wtx
                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                .map_err(SqlError::Storage)?;
            // An existing entry violates uniqueness only when no indexed
            // column is NULL.
            if idx.unique && !is_new {
                let any_null = idx
                    .column_positions_iter()
                    .any(|c| new_row[c as usize].is_null());
                if !any_null {
                    return Err(SqlError::UniqueViolation(idx.name.clone()));
                }
            }
        }
    } else {
        // Same key: overwrite the value in place.
        wtx.table_update_sorted(
            table_schema.name.as_bytes(),
            &[(old_pk_key, new_value_buf.as_slice())],
        )
        .map_err(SqlError::Storage)?;
        // The column map is only built when some index is partial.
        let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
        for idx in &table_schema.indices {
            let cols_changed = index_columns_changed(idx, old_row, &new_row);
            // `del` / `ins` say whether the old entry must be removed and
            // whether a new entry must be written for this index.
            let (del, ins) = partial_idx_update_actions(
                idx,
                old_row,
                &new_row,
                cols_changed,
                false,
                col_map_partial,
            );
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            if del {
                let old_idx_key =
                    encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
                wtx.table_delete(&idx_table, &old_idx_key)
                    .map_err(SqlError::Storage)?;
            }
            if ins {
                let new_idx_key =
                    encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
                let is_new = wtx
                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                    .map_err(SqlError::Storage)?;
                // Same NULL exemption as in the key-move branch.
                if idx.unique && !is_new {
                    let any_null = idx
                        .column_positions_iter()
                        .any(|c| new_row[c as usize].is_null());
                    if !any_null {
                        return Err(SqlError::UniqueViolation(idx.name.clone()));
                    }
                }
            }
        }
    }

    // Without row capture the update is reported as a plain `Inserted`.
    if capture_returning {
        Ok(InsertRowOutcome::Updated {
            old: old_row.to_vec(),
            new: new_row,
        })
    } else {
        Ok(InsertRowOutcome::Inserted)
    }
}
3199
3200fn detect_fast_paths(
3201    ts: &TableSchema,
3202    assignments: &[(usize, Expr)],
3203) -> Option<Vec<DoUpdateFastPath>> {
3204    let non_pk = ts.non_pk_indices();
3205    let enc_pos = ts.encoding_positions();
3206    let mut out = Vec::with_capacity(assignments.len());
3207    for (col_idx, expr) in assignments {
3208        let col = &ts.columns[*col_idx];
3209        if col.data_type != DataType::Integer {
3210            return None;
3211        }
3212        let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3213        let phys_idx = enc_pos[nonpk_order] as usize;
3214
3215        if let Expr::BinaryOp { left, op, right } = expr {
3216            if !matches!(op, BinOp::Add | BinOp::Sub) {
3217                return None;
3218            }
3219            let reads_target =
3220                matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3221            if !reads_target {
3222                return None;
3223            }
3224            if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3225                let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3226                let _ = col_idx;
3227                out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3228                continue;
3229            }
3230            return None;
3231        }
3232        return None;
3233    }
3234    Some(out)
3235}
3236
3237fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3238    let target = oc
3239        .target
3240        .as_ref()
3241        .map(|t| resolve_conflict_target(t, ts))
3242        .transpose()?;
3243    match &oc.action {
3244        OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3245        OnConflictAction::DoUpdate {
3246            assignments,
3247            where_clause,
3248        } => {
3249            let target = target.ok_or_else(|| {
3250                SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3251            })?;
3252            let compiled_assignments: Vec<(usize, Expr)> = assignments
3253                .iter()
3254                .map(|(name, expr)| {
3255                    let col_idx = ts
3256                        .column_index(name)
3257                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3258                    Ok((col_idx, expr.clone()))
3259                })
3260                .collect::<Result<_>>()?;
3261            let fast_paths = if where_clause.is_none() {
3262                detect_fast_paths(ts, &compiled_assignments)
3263            } else {
3264                None
3265            };
3266            Ok(CompiledOnConflict::DoUpdate {
3267                target,
3268                assignments: compiled_assignments,
3269                where_clause: where_clause.clone(),
3270                fast_paths,
3271            })
3272        }
3273    }
3274}
3275
3276/// Caller MUST check `cache.is_trivial_fast` first.
3277fn exec_insert_trivial_fast(
3278    wtx: &mut WriteTxn<'_>,
3279    table_lower: &str,
3280    cache: &InsertCache,
3281    bufs: &mut InsertBufs,
3282    params: &[Value],
3283) -> Result<ExecutionResult> {
3284    let prog = cache
3285        .trivial_fast_program
3286        .as_ref()
3287        .expect("trivial fast: program");
3288
3289    for &p in &prog.not_null_param_indices {
3290        if params[p as usize].is_null() {
3291            return Err(SqlError::NotNullViolation(format!("param@{p}")));
3292        }
3293    }
3294
3295    match &params[prog.pk_param as usize] {
3296        Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3297        _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3298    }
3299
3300    bufs.value_buf.clear();
3301    bufs.value_buf.extend_from_slice(&prog.template);
3302
3303    for op in &prog.ops {
3304        match op {
3305            WriteOp::ParamI64 { param_idx, off } => match &params[*param_idx as usize] {
3306                Value::Integer(v) => {
3307                    let off = *off as usize;
3308                    bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3309                }
3310                other => {
3311                    return Err(SqlError::TypeMismatch {
3312                        expected: "Integer".into(),
3313                        got: other.data_type().to_string(),
3314                    });
3315                }
3316            },
3317            WriteOp::LiteralI64 { value, off } => {
3318                let off = *off as usize;
3319                bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3320            }
3321            WriteOp::GenAddParamsI64 {
3322                a_param,
3323                b_param,
3324                off,
3325                bitmap_byte_off,
3326                bitmap_bit_mask,
3327            } => match (&params[*a_param as usize], &params[*b_param as usize]) {
3328                (Value::Integer(a), Value::Integer(b)) => {
3329                    let off = *off as usize;
3330                    bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3331                }
3332                _ => {
3333                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3334                }
3335            },
3336            WriteOp::GenMulAddParamI64 {
3337                param_idx,
3338                mul,
3339                add,
3340                off,
3341                bitmap_byte_off,
3342                bitmap_bit_mask,
3343            } => match &params[*param_idx as usize] {
3344                Value::Integer(v) => {
3345                    let r = v.wrapping_mul(*mul).wrapping_add(*add);
3346                    let off = *off as usize;
3347                    bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3348                }
3349                _ => {
3350                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3351                }
3352            },
3353        }
3354    }
3355
3356    let is_new = wtx
3357        .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3358        .map_err(SqlError::Storage)?;
3359    if !is_new {
3360        return Err(SqlError::DuplicateKey);
3361    }
3362    Ok(ExecutionResult::RowsAffected(1))
3363}
3364
3365fn build_bind_plan(
3366    stmt: &InsertStmt,
3367    col_indices: &[usize],
3368    col_data_types: &[DataType],
3369) -> Option<Vec<BindAction>> {
3370    let rows = match &stmt.source {
3371        InsertSource::Values(rows) => rows,
3372        _ => return None,
3373    };
3374    if rows.len() != 1 {
3375        return None;
3376    }
3377    let value_row = &rows[0];
3378    if value_row.len() != col_indices.len() {
3379        return None;
3380    }
3381    let mut plan = Vec::with_capacity(value_row.len());
3382    for (i, expr) in value_row.iter().enumerate() {
3383        let col_idx = col_indices[i];
3384        let target = col_data_types[col_idx];
3385        match expr {
3386            Expr::Parameter(n) => {
3387                if *n == 0 {
3388                    return None;
3389                }
3390                plan.push(BindAction::Param {
3391                    param_idx: n - 1,
3392                    col_idx,
3393                    target,
3394                });
3395            }
3396            Expr::Literal(v) => plan.push(BindAction::Literal {
3397                value: v.clone(),
3398                col_idx,
3399            }),
3400            _ => return None,
3401        }
3402    }
3403    Some(plan)
3404}
3405
3406impl CompiledInsert {
3407    pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3408        let lower = stmt.table.to_ascii_lowercase();
3409        let cached = if let Some(ts) = schema.get(&lower) {
3410            let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3411                ts.columns.iter().map(|c| c.name.as_str()).collect()
3412            } else {
3413                stmt.columns.iter().map(|s| s.as_str()).collect()
3414            };
3415            let mut col_indices = Vec::with_capacity(insert_columns.len());
3416            for name in &insert_columns {
3417                col_indices.push(ts.column_index(name)?);
3418            }
3419            if col_indices
3420                .iter()
3421                .any(|&ci| ts.columns[ci].generated_kind.is_some())
3422            {
3423                return None;
3424            }
3425            let on_conflict = stmt
3426                .on_conflict
3427                .as_ref()
3428                .map(|oc| compile_on_conflict(oc, ts))
3429                .transpose()
3430                .ok()
3431                .flatten()
3432                .map(Arc::new);
3433            let generated_col_positions: Vec<usize> = ts
3434                .columns
3435                .iter()
3436                .enumerate()
3437                .filter_map(|(i, c)| {
3438                    matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3439                        .then_some(i)
3440                })
3441                .collect();
3442            let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3443                .iter()
3444                .map(|&pos| {
3445                    detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3446                })
3447                .collect();
3448            let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3449                Some(ColumnMap::new(&ts.columns))
3450            } else {
3451                None
3452            };
3453            let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3454            let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3455            let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3456            let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3457            let phys_count = ts.physical_non_pk_count();
3458            let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3459            let single_int_pk =
3460                pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3461            let not_null_indices: Vec<u16> = ts
3462                .columns
3463                .iter()
3464                .filter(|c| !c.nullable)
3465                .map(|c| c.position)
3466                .collect();
3467            let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3468            let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3469            let row_fully_overwritten = if any_defaults_flag {
3470                false
3471            } else {
3472                let mut covered: rustc_hash::FxHashSet<usize> =
3473                    col_indices.iter().copied().collect();
3474                covered.extend(generated_col_positions.iter().copied());
3475                for (j, &i) in non_pk_indices.iter().enumerate() {
3476                    let _ = j;
3477                    if matches!(
3478                        ts.columns[i].generated_kind,
3479                        Some(crate::parser::GeneratedKind::Virtual)
3480                    ) {
3481                        covered.insert(i);
3482                    }
3483                }
3484                bind_plan.is_some() && covered.len() == ts.columns.len()
3485            };
3486            let has_fks = !ts.foreign_keys.is_empty();
3487            let has_indices = !ts.indices.is_empty();
3488            let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3489            let mut null_value_slots: Vec<usize> =
3490                dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3491            for (j, &i) in non_pk_indices.iter().enumerate() {
3492                let slot = encoding_positions[j] as usize;
3493                if matches!(
3494                    ts.columns[i].generated_kind,
3495                    Some(crate::parser::GeneratedKind::Virtual)
3496                ) {
3497                    null_value_slots.push(slot);
3498                } else {
3499                    non_virtual_pairs.push((i, slot));
3500                }
3501            }
3502            let row_encoder = {
3503                let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3504                    let col = &ts.columns[i];
3505                    if matches!(
3506                        col.generated_kind,
3507                        Some(crate::parser::GeneratedKind::Virtual)
3508                    ) {
3509                        true
3510                    } else {
3511                        col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3512                    }
3513                });
3514                if all_int_or_null {
3515                    let mut null_slots: Vec<usize> =
3516                        dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3517                    for (j, &i) in non_pk_indices.iter().enumerate() {
3518                        if matches!(
3519                            ts.columns[i].generated_kind,
3520                            Some(crate::parser::GeneratedKind::Virtual)
3521                        ) {
3522                            null_slots.push(encoding_positions[j] as usize);
3523                        }
3524                    }
3525                    Some(crate::encoding::build_int_row_template(
3526                        phys_count,
3527                        &null_slots,
3528                    ))
3529                } else {
3530                    None
3531                }
3532            };
3533            let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3534                && !ts.columns.iter().any(|c| c.default_expr.is_some())
3535                && !ts.has_checks()
3536                && !has_fks
3537                && !has_indices
3538                && stmt.on_conflict.is_none()
3539                && stmt.returning.is_none()
3540                && bind_plan.is_some()
3541                && row_encoder.is_some()
3542                && row_fully_overwritten
3543                && single_int_pk
3544                && generated_fast_evals
3545                    .iter()
3546                    .all(|fe| !matches!(fe, FastGenEval::None));
3547            let trivial_fast_program = if is_trivial_fast_eligible {
3548                build_trivial_fast_program(
3549                    bind_plan.as_ref().unwrap(),
3550                    row_encoder.as_ref().unwrap(),
3551                    &non_virtual_pairs,
3552                    &generated_col_positions,
3553                    &generated_fast_evals,
3554                    &pk_indices,
3555                    &ts.columns,
3556                )
3557            } else {
3558                None
3559            };
3560            let is_trivial_fast = trivial_fast_program.is_some();
3561            let has_checks = ts.has_checks();
3562            let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3563            let needs_scoped_params = bind_plan.is_none()
3564                || has_checks
3565                || any_defaults
3566                || !generated_col_positions.is_empty()
3567                || on_conflict.is_some()
3568                || stmt.returning.is_some()
3569                || insert_has_subquery(stmt)
3570                || super::helpers::any_partial_index(ts);
3571            Some(InsertCache {
3572                col_indices,
3573                has_subquery: insert_has_subquery(stmt),
3574                any_defaults,
3575                has_checks,
3576                on_conflict,
3577                row_col_map,
3578                generated_col_positions,
3579                generated_fast_evals,
3580                pk_indices,
3581                non_pk_indices,
3582                encoding_positions,
3583                dropped_non_pk_slots,
3584                phys_count,
3585                single_int_pk,
3586                not_null_indices,
3587                bind_plan,
3588                row_fully_overwritten,
3589                row_encoder,
3590                is_trivial_fast,
3591                trivial_fast_program,
3592                needs_scoped_params,
3593            })
3594        } else if schema.get_view(&lower).is_some() {
3595            None
3596        } else {
3597            return None;
3598        };
3599        Some(Self {
3600            table_lower: lower,
3601            cached,
3602        })
3603    }
3604}
3605
3606impl CompiledPlan for CompiledInsert {
3607    fn execute(
3608        &self,
3609        db: &Database,
3610        schema: &SchemaManager,
3611        stmt: &Statement,
3612        params: &[Value],
3613        txn: super::compile::ActiveTxnRef<'_, '_>,
3614    ) -> Result<ExecutionResult> {
3615        let ins = match stmt {
3616            Statement::Insert(i) => i,
3617            _ => {
3618                return Err(SqlError::Unsupported(
3619                    "CompiledInsert received non-INSERT statement".into(),
3620                ))
3621            }
3622        };
3623        use super::compile::ActiveTxnRef;
3624        match txn {
3625            ActiveTxnRef::None => exec_insert(db, schema, ins, params),
3626            ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3627                "cannot execute mutating statement inside a read-only transaction".into(),
3628            )),
3629            ActiveTxnRef::Write(outer) => match self.cached.as_ref() {
3630                Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3631                    exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3632                }),
3633                Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3634                None => exec_insert_in_txn(outer, schema, ins, params),
3635            },
3636        }
3637    }
3638
3639    fn uses_scoped_params(&self) -> bool {
3640        match self.cached.as_ref() {
3641            Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3642            None => true,
3643        }
3644    }
3645}
3646
/// Compiled form of a `DELETE` statement against a table known to the schema.
pub struct CompiledDelete {
    /// ASCII-lowercased name of the target table, as looked up at compile time.
    table_lower: String,
}
3650
3651impl CompiledDelete {
3652    pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3653        let lower = stmt.table.to_ascii_lowercase();
3654        schema.get(&lower)?;
3655        Some(Self { table_lower: lower })
3656    }
3657}
3658
3659impl CompiledPlan for CompiledDelete {
3660    fn execute(
3661        &self,
3662        db: &Database,
3663        schema: &SchemaManager,
3664        stmt: &Statement,
3665        _params: &[Value],
3666        txn: super::compile::ActiveTxnRef<'_, '_>,
3667    ) -> Result<ExecutionResult> {
3668        let del = match stmt {
3669            Statement::Delete(d) => d,
3670            _ => {
3671                return Err(SqlError::Unsupported(
3672                    "CompiledDelete received non-DELETE statement".into(),
3673                ))
3674            }
3675        };
3676        let _ = &self.table_lower;
3677        use super::compile::ActiveTxnRef;
3678        match txn {
3679            ActiveTxnRef::None => super::write::exec_delete(db, schema, del),
3680            ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3681                "cannot execute mutating statement inside a read-only transaction".into(),
3682            )),
3683            ActiveTxnRef::Write(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3684        }
3685    }
3686}
3687
3688fn exec_instead_of_view_insert_auto(
3689    db: &Database,
3690    schema: &SchemaManager,
3691    view_name: &str,
3692    aliases: &[String],
3693    stmt: &InsertStmt,
3694    params: &[Value],
3695) -> Result<ExecutionResult> {
3696    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3697    let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3698    wtx.commit().map_err(SqlError::Storage)?;
3699    Ok(r)
3700}
3701
3702fn exec_instead_of_view_insert_in_txn(
3703    wtx: &mut WriteTxn<'_>,
3704    schema: &SchemaManager,
3705    view_name: &str,
3706    aliases: &[String],
3707    stmt: &InsertStmt,
3708    params: &[Value],
3709) -> Result<ExecutionResult> {
3710    // CREATE VIEW without explicit aliases stores an empty vec; derive at runtime.
3711    let resolved_aliases: Vec<String> = if aliases.is_empty() {
3712        derive_view_columns(wtx, schema, view_name)?
3713    } else {
3714        aliases.to_vec()
3715    };
3716    let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3717    let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3718        .iter()
3719        .enumerate()
3720        .map(|(i, name)| (name.to_ascii_lowercase(), i))
3721        .collect();
3722
3723    let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3724        (0..resolved_aliases.len()).collect()
3725    } else {
3726        stmt.columns
3727            .iter()
3728            .map(|c| {
3729                alias_map
3730                    .get(&c.to_ascii_lowercase())
3731                    .copied()
3732                    .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3733            })
3734            .collect::<Result<_>>()?
3735    };
3736
3737    let source_rows: Vec<Vec<Value>> = match &stmt.source {
3738        InsertSource::Values(rows) => {
3739            let mut out = Vec::with_capacity(rows.len());
3740            for row in rows {
3741                if row.len() != target_positions.len() {
3742                    return Err(SqlError::InvalidValue(format!(
3743                        "expected {} values, got {}",
3744                        target_positions.len(),
3745                        row.len()
3746                    )));
3747                }
3748                let mut vals = Vec::with_capacity(row.len());
3749                for expr in row {
3750                    let v = match expr {
3751                        Expr::Parameter(n) => params
3752                            .get(n - 1)
3753                            .cloned()
3754                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3755                        Expr::Literal(v) => v.clone(),
3756                        other => eval_const_expr(other)?,
3757                    };
3758                    vals.push(v);
3759                }
3760                out.push(vals);
3761            }
3762            out
3763        }
3764        InsertSource::Select(sq) => {
3765            let empty_ctes = CteContext::default();
3766            let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3767            qr.rows
3768        }
3769    };
3770
3771    let mut count: u64 = 0;
3772    for row in source_rows {
3773        if row.len() != target_positions.len() {
3774            return Err(SqlError::InvalidValue(format!(
3775                "expected {} values, got {}",
3776                target_positions.len(),
3777                row.len()
3778            )));
3779        }
3780        let mut new_row = vec![Value::Null; resolved_aliases.len()];
3781        for (slot, val) in target_positions.iter().zip(row) {
3782            new_row[*slot] = val;
3783        }
3784        super::triggers::fire_row_triggers(
3785            wtx,
3786            schema,
3787            view_name,
3788            crate::parser::TriggerTiming::InsteadOf,
3789            super::triggers::FireEvent::Insert,
3790            None,
3791            Some(new_row),
3792            &view_cols,
3793        )?;
3794        count += 1;
3795    }
3796    Ok(ExecutionResult::RowsAffected(count))
3797}
3798
3799fn derive_view_columns(
3800    wtx: &mut WriteTxn<'_>,
3801    schema: &SchemaManager,
3802    view_name: &str,
3803) -> Result<Vec<String>> {
3804    use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3805    let sel = SelectStmt {
3806        columns: vec![SelectColumn::AllColumns],
3807        from: view_name.to_string(),
3808        from_alias: None,
3809        from_subquery: None,
3810        from_args: None,
3811        from_json_table: None,
3812        joins: vec![],
3813        distinct: false,
3814        where_clause: None,
3815        order_by: vec![],
3816        limit: Some(Expr::Literal(Value::Integer(1))),
3817        offset: None,
3818        group_by: vec![],
3819        having: None,
3820    };
3821    let sq = SelectQuery {
3822        ctes: vec![],
3823        recursive: false,
3824        body: QueryBody::Select(Box::new(sel)),
3825    };
3826    let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3827    match qr {
3828        ExecutionResult::Query(q) => Ok(q.columns),
3829        _ => Ok(Vec::new()),
3830    }
3831}
3832
3833#[cfg(test)]
3834#[path = "dml_tests.rs"]
3835mod tests;