Skip to main content

citadel_sql/executor/
dml.rs

1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::read_txn::ReadTxn;
7use citadel_txn::write_txn::WriteTxn;
8use rustc_hash::FxHashMap;
9
10use crate::encoding::{encode_composite_key_into, encode_row_into};
11use crate::error::{Result, SqlError};
12use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
13use crate::parser::*;
14use crate::types::*;
15
16use crate::schema::SchemaManager;
17
18use super::compile::CompiledPlan;
19use super::helpers::*;
20use super::CteContext;
21
22pub(super) fn exec_insert(
23    db: &Database,
24    schema: &SchemaManager,
25    stmt: &InsertStmt,
26    params: &[Value],
27) -> Result<ExecutionResult> {
28    let empty_ctes = CteContext::default();
29    let materialized;
30    let stmt = if insert_has_subquery(stmt) {
31        materialized = materialize_insert(stmt, &mut |sub| {
32            exec_subquery_read(db, schema, sub, &empty_ctes)
33        })?;
34        &materialized
35    } else {
36        stmt
37    };
38
39    let lower_name = stmt.table.to_ascii_lowercase();
40    if let Some(view_def) = schema.get_view(&lower_name) {
41        if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
42        {
43            let aliases = view_def.column_aliases.clone();
44            return exec_instead_of_view_insert_auto(
45                db,
46                schema,
47                &lower_name,
48                &aliases,
49                stmt,
50                params,
51            );
52        }
53        return Err(SqlError::CannotModifyView(stmt.table.clone()));
54    }
55    if schema.get_matview(&lower_name).is_some() {
56        return Err(SqlError::CannotModifyView(format!(
57            "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
58            stmt.table
59        )));
60    }
61    let table_schema = schema
62        .get(&lower_name)
63        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
64
65    let insert_columns = if stmt.columns.is_empty() {
66        table_schema
67            .columns
68            .iter()
69            .map(|c| c.name.clone())
70            .collect::<Vec<_>>()
71    } else {
72        stmt.columns
73            .iter()
74            .map(|c| c.to_ascii_lowercase())
75            .collect()
76    };
77
78    let col_indices: Vec<usize> = insert_columns
79        .iter()
80        .map(|name| {
81            table_schema
82                .column_index(name)
83                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
84        })
85        .collect::<Result<_>>()?;
86
87    for &ci in &col_indices {
88        if table_schema.columns[ci].generated_kind.is_some() {
89            return Err(SqlError::CannotInsertIntoGeneratedColumn(
90                table_schema.columns[ci].name.clone(),
91            ));
92        }
93    }
94
95    let defaults: Vec<(usize, &Expr)> = table_schema
96        .columns
97        .iter()
98        .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
99        .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
100        .collect();
101
102    let generated_cols: Vec<(usize, &Expr)> = table_schema
103        .columns
104        .iter()
105        .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
106        .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
107        .collect();
108
109    let has_checks = table_schema.has_checks();
110    let strict = table_schema.is_strict();
111    let row_col_map_for_gen = if !generated_cols.is_empty() {
112        Some(ColumnMap::new(&table_schema.columns))
113    } else {
114        None
115    };
116    let check_col_map = if has_checks {
117        Some(ColumnMap::new(&table_schema.columns))
118    } else {
119        None
120    };
121
122    let select_rows = match &stmt.source {
123        InsertSource::Select(sq) => {
124            let insert_ctes =
125                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
126                    exec_query_body_read(db, schema, body, ctx)
127                })?;
128            let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
129            Some(qr.rows)
130        }
131        InsertSource::Values(_) => None,
132    };
133
134    let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
135        .on_conflict
136        .as_ref()
137        .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
138        .transpose()?;
139
140    let row_col_map = compiled_conflict
141        .as_ref()
142        .map(|_| ColumnMap::new(&table_schema.columns));
143
144    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
145    let mut count: u64 = 0;
146    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
147        stmt.returning.as_ref().map(|_| Vec::new());
148
149    let pk_indices = table_schema.pk_indices();
150    let non_pk = table_schema.non_pk_indices();
151    let enc_pos = table_schema.encoding_positions();
152    let phys_count = table_schema.physical_non_pk_count();
153    let mut row = vec![Value::Null; table_schema.columns.len()];
154    let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
155    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
156    let mut key_buf: Vec<u8> = Vec::with_capacity(64);
157    let mut value_buf: Vec<u8> = Vec::with_capacity(256);
158    let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
159
160    let values = match &stmt.source {
161        InsertSource::Values(rows) => Some(rows.as_slice()),
162        InsertSource::Select(_) => None,
163    };
164    let sel_rows = select_rows.as_deref();
165
166    let total = match (values, sel_rows) {
167        (Some(rows), _) => rows.len(),
168        (_, Some(rows)) => rows.len(),
169        _ => 0,
170    };
171
172    if let Some(sel) = sel_rows {
173        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
174            return Err(SqlError::InvalidValue(format!(
175                "INSERT ... SELECT column count mismatch: expected {}, got {}",
176                insert_columns.len(),
177                sel[0].len()
178            )));
179        }
180    }
181
182    let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
183        t.enabled
184            && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
185            && t.events
186                .iter()
187                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
188    });
189    let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
190        Vec::with_capacity(total)
191    } else {
192        Vec::new()
193    };
194
195    if has_insert_statement_triggers {
196        super::triggers::fire_statement_triggers(
197            &mut wtx,
198            schema,
199            &table_schema.name,
200            crate::parser::TriggerTiming::Before,
201            super::triggers::FireEvent::Insert,
202            &table_schema.columns,
203            &[],
204            &[],
205        )?;
206    }
207
208    for idx in 0..total {
209        for v in row.iter_mut() {
210            *v = Value::Null;
211        }
212
213        if let Some(value_rows) = values {
214            let value_row = &value_rows[idx];
215            if value_row.len() != insert_columns.len() {
216                return Err(SqlError::InvalidValue(format!(
217                    "expected {} values, got {}",
218                    insert_columns.len(),
219                    value_row.len()
220                )));
221            }
222            for (i, expr) in value_row.iter().enumerate() {
223                let val = if let Expr::Parameter(n) = expr {
224                    params
225                        .get(n - 1)
226                        .cloned()
227                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
228                } else {
229                    eval_const_expr(expr)?
230                };
231                let col_idx = col_indices[i];
232                let col = &table_schema.columns[col_idx];
233                row[col_idx] = if val.is_null() {
234                    Value::Null
235                } else {
236                    coerce_for_column(val, col, strict)?
237                };
238            }
239        } else if let Some(sel) = sel_rows {
240            let sel_row = &sel[idx];
241            for (i, val) in sel_row.iter().enumerate() {
242                let col_idx = col_indices[i];
243                let col = &table_schema.columns[col_idx];
244                row[col_idx] = if val.is_null() {
245                    Value::Null
246                } else {
247                    coerce_for_column(val.clone(), col, strict)?
248                };
249            }
250        }
251
252        for &(pos, def_expr) in &defaults {
253            let val = eval_const_expr(def_expr)?;
254            let col = &table_schema.columns[pos];
255            if !val.is_null() {
256                row[pos] = coerce_for_column(val, col, strict)?;
257            }
258        }
259
260        if let Some(ref gen_map) = row_col_map_for_gen {
261            for &(pos, gen_expr) in &generated_cols {
262                let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
263                let col = &table_schema.columns[pos];
264                row[pos] = if val.is_null() {
265                    Value::Null
266                } else {
267                    coerce_for_column(val, col, strict)?
268                };
269            }
270        }
271
272        for col in &table_schema.columns {
273            if !col.nullable && row[col.position as usize].is_null() {
274                return Err(SqlError::NotNullViolation(col.name.clone()));
275            }
276        }
277
278        if let Some(ref col_map) = check_col_map {
279            for col in &table_schema.columns {
280                if let Some(ref check) = col.check_expr {
281                    let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
282                    if !is_truthy(&result) && !result.is_null() {
283                        let name = col.check_name.as_deref().unwrap_or(&col.name);
284                        return Err(SqlError::CheckViolation(name.to_string()));
285                    }
286                }
287            }
288            for tc in &table_schema.check_constraints {
289                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
290                if !is_truthy(&result) && !result.is_null() {
291                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
292                    return Err(SqlError::CheckViolation(name.to_string()));
293                }
294            }
295        }
296
297        for fk in &table_schema.foreign_keys {
298            let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
299            if any_null {
300                continue; // MATCH SIMPLE: skip if any FK col is NULL
301            }
302            let fk_vals: Vec<Value> = fk
303                .columns
304                .iter()
305                .map(|&ci| row[ci as usize].clone())
306                .collect();
307            fk_key_buf.clear();
308            encode_composite_key_into(&fk_vals, &mut fk_key_buf);
309            if fk.deferrable && fk.initially_deferred {
310                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
311                wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
312                    fk_name: name,
313                    foreign_table: fk.foreign_table.as_bytes().to_vec(),
314                    parent_key: fk_key_buf.clone(),
315                });
316                continue;
317            }
318            if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
319                let found = wtx
320                    .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
321                    .map_err(SqlError::Storage)?;
322                if found.is_none() {
323                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
324                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
325                }
326                wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
327            }
328        }
329
330        let proposed_row_for_returning: Option<Vec<Value>> =
331            returning_rows.as_ref().map(|_| row.clone());
332        let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
333            Some(row.clone())
334        } else {
335            None
336        };
337
338        let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
339            t.enabled
340                && t.timing == crate::parser::TriggerTiming::Before
341                && t.granularity == crate::parser::TriggerGranularity::ForEachRow
342                && t.events
343                    .iter()
344                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
345        });
346        if has_before_insert_triggers {
347            super::triggers::fire_row_triggers(
348                &mut wtx,
349                schema,
350                &table_schema.name,
351                crate::parser::TriggerTiming::Before,
352                super::triggers::FireEvent::Insert,
353                None,
354                Some(row.clone()),
355                &table_schema.columns,
356            )?;
357        }
358
359        for (j, &i) in pk_indices.iter().enumerate() {
360            pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
361        }
362        encode_composite_key_into(&pk_values, &mut key_buf);
363
364        for (j, &i) in non_pk.iter().enumerate() {
365            let col = &table_schema.columns[i];
366            if matches!(
367                col.generated_kind,
368                Some(crate::parser::GeneratedKind::Virtual)
369            ) {
370                value_values[enc_pos[j] as usize] = Value::Null;
371                row[i] = Value::Null;
372            } else {
373                value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
374            }
375        }
376        encode_row_into(&value_values, &mut value_buf);
377
378        if key_buf.len() > citadel_core::MAX_KEY_SIZE {
379            return Err(SqlError::KeyTooLarge {
380                size: key_buf.len(),
381                max: citadel_core::MAX_KEY_SIZE,
382            });
383        }
384        if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
385            return Err(SqlError::RowTooLarge {
386                size: value_buf.len(),
387                max: citadel_core::MAX_VALUE_SIZE,
388            });
389        }
390
391        match compiled_conflict.as_ref() {
392            None => {
393                let is_new = wtx
394                    .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
395                    .map_err(SqlError::Storage)?;
396                if !is_new {
397                    return Err(SqlError::DuplicateKey);
398                }
399                let has_after_insert_triggers =
400                    schema.triggers_for(&table_schema.name).iter().any(|t| {
401                        t.enabled
402                            && t.timing == crate::parser::TriggerTiming::After
403                            && t.granularity == crate::parser::TriggerGranularity::ForEachRow
404                            && t.events
405                                .iter()
406                                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
407                    });
408                if !table_schema.indices.is_empty() || has_after_insert_triggers {
409                    for (j, &i) in pk_indices.iter().enumerate() {
410                        row[i] = pk_values[j].clone();
411                    }
412                    for (j, &i) in non_pk.iter().enumerate() {
413                        row[i] =
414                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
415                    }
416                    if !table_schema.indices.is_empty() {
417                        insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
418                    }
419                    if has_after_insert_triggers {
420                        super::triggers::fire_row_triggers(
421                            &mut wtx,
422                            schema,
423                            &table_schema.name,
424                            crate::parser::TriggerTiming::After,
425                            super::triggers::FireEvent::Insert,
426                            None,
427                            Some(row.clone()),
428                            &table_schema.columns,
429                        )?;
430                    }
431                }
432                if let Some(r) = row_for_stmt_trigger.clone() {
433                    stmt_new_rows.push(r);
434                }
435                count += 1;
436                if let Some(buf) = returning_rows.as_mut() {
437                    buf.push((None, proposed_row_for_returning));
438                }
439            }
440            Some(oc) => {
441                let oc_ref: &CompiledOnConflict = oc;
442                let needs_row = upsert_needs_row(oc_ref, table_schema);
443                if needs_row {
444                    for (j, &i) in pk_indices.iter().enumerate() {
445                        row[i] = pk_values[j].clone();
446                    }
447                    for (j, &i) in non_pk.iter().enumerate() {
448                        row[i] =
449                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
450                    }
451                }
452                let outcome = apply_insert_with_conflict(
453                    &mut wtx,
454                    table_schema,
455                    &key_buf,
456                    &value_buf,
457                    &row,
458                    &pk_values,
459                    oc_ref,
460                    row_col_map.as_ref().unwrap(),
461                    stmt.returning.is_some(),
462                )?;
463                match outcome {
464                    InsertRowOutcome::Inserted => {
465                        count += 1;
466                        if let Some(buf) = returning_rows.as_mut() {
467                            buf.push((None, proposed_row_for_returning));
468                        }
469                        if let Some(r) = row_for_stmt_trigger.clone() {
470                            stmt_new_rows.push(r);
471                        }
472                        let has_after_insert_triggers =
473                            schema.triggers_for(&table_schema.name).iter().any(|t| {
474                                t.enabled
475                                    && t.timing == crate::parser::TriggerTiming::After
476                                    && t.granularity
477                                        == crate::parser::TriggerGranularity::ForEachRow
478                                    && t.events
479                                        .iter()
480                                        .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
481                            });
482                        if has_after_insert_triggers {
483                            super::triggers::fire_row_triggers(
484                                &mut wtx,
485                                schema,
486                                &table_schema.name,
487                                crate::parser::TriggerTiming::After,
488                                super::triggers::FireEvent::Insert,
489                                None,
490                                Some(row.clone()),
491                                &table_schema.columns,
492                            )?;
493                        }
494                    }
495                    InsertRowOutcome::Updated { old, new } => {
496                        count += 1;
497                        if let Some(buf) = returning_rows.as_mut() {
498                            buf.push((Some(old.clone()), Some(new.clone())));
499                        }
500                        let has_after_update_triggers =
501                            schema.triggers_for(&table_schema.name).iter().any(|t| {
502                                t.enabled
503                                    && t.timing == crate::parser::TriggerTiming::After
504                                    && t.granularity
505                                        == crate::parser::TriggerGranularity::ForEachRow
506                                    && t.events.iter().any(|e| {
507                                        matches!(e, crate::parser::TriggerEvent::Update(_))
508                                    })
509                            });
510                        if has_after_update_triggers {
511                            let changed_cols: Vec<String> = match oc_ref {
512                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
513                                    .iter()
514                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
515                                    .collect(),
516                                _ => Vec::new(),
517                            };
518                            super::triggers::fire_row_triggers(
519                                &mut wtx,
520                                schema,
521                                &table_schema.name,
522                                crate::parser::TriggerTiming::After,
523                                super::triggers::FireEvent::Update {
524                                    changed_columns: &changed_cols,
525                                },
526                                Some(old),
527                                Some(new),
528                                &table_schema.columns,
529                            )?;
530                        }
531                    }
532                    InsertRowOutcome::Skipped => {}
533                }
534            }
535        }
536    }
537
538    if has_insert_statement_triggers {
539        super::triggers::fire_statement_triggers(
540            &mut wtx,
541            schema,
542            &table_schema.name,
543            crate::parser::TriggerTiming::After,
544            super::triggers::FireEvent::Insert,
545            &table_schema.columns,
546            &[],
547            &stmt_new_rows,
548        )?;
549    }
550
551    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
552        let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
553        super::helpers::drain_deferred_fk_checks(&mut wtx)?;
554        wtx.commit().map_err(SqlError::Storage)?;
555        return Ok(ExecutionResult::Query(qr));
556    }
557
558    super::helpers::drain_deferred_fk_checks(&mut wtx)?;
559    wtx.commit().map_err(SqlError::Storage)?;
560    Ok(ExecutionResult::RowsAffected(count))
561}
562
563pub(super) fn has_subquery(expr: &Expr) -> bool {
564    crate::parser::has_subquery(expr)
565}
566
567pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
568    if let Some(ref w) = stmt.where_clause {
569        if has_subquery(w) {
570            return true;
571        }
572    }
573    if let Some(ref h) = stmt.having {
574        if has_subquery(h) {
575            return true;
576        }
577    }
578    for col in &stmt.columns {
579        if let SelectColumn::Expr { expr, .. } = col {
580            if has_subquery(expr) {
581                return true;
582            }
583        }
584    }
585    for ob in &stmt.order_by {
586        if has_subquery(&ob.expr) {
587            return true;
588        }
589    }
590    for join in &stmt.joins {
591        if let Some(ref on_expr) = join.on_clause {
592            if has_subquery(on_expr) {
593                return true;
594            }
595        }
596    }
597    false
598}
599
600pub(super) fn materialize_expr(
601    expr: &Expr,
602    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
603) -> Result<Expr> {
604    match expr {
605        Expr::InSubquery {
606            expr: e,
607            subquery,
608            negated,
609        } => {
610            let inner = materialize_expr(e, exec_sub)?;
611            let qr = exec_sub(subquery)?;
612            if !qr.columns.is_empty() && qr.columns.len() != 1 {
613                return Err(SqlError::SubqueryMultipleColumns);
614            }
615            let mut values = rustc_hash::FxHashSet::default();
616            let mut has_null = false;
617            for row in &qr.rows {
618                if row[0].is_null() {
619                    has_null = true;
620                } else {
621                    values.insert(row[0].clone());
622                }
623            }
624            Ok(Expr::InSet {
625                expr: Box::new(inner),
626                values,
627                has_null,
628                negated: *negated,
629            })
630        }
631        Expr::ScalarSubquery(subquery) => {
632            let qr = exec_sub(subquery)?;
633            if qr.rows.len() > 1 {
634                return Err(SqlError::SubqueryMultipleRows);
635            }
636            let val = if qr.rows.is_empty() {
637                Value::Null
638            } else {
639                qr.rows[0][0].clone()
640            };
641            Ok(Expr::Literal(val))
642        }
643        Expr::Exists { subquery, negated } => {
644            let qr = exec_sub(subquery)?;
645            let exists = !qr.rows.is_empty();
646            let result = if *negated { !exists } else { exists };
647            Ok(Expr::Literal(Value::Boolean(result)))
648        }
649        Expr::InList {
650            expr: e,
651            list,
652            negated,
653        } => {
654            let inner = materialize_expr(e, exec_sub)?;
655            let items = list
656                .iter()
657                .map(|item| materialize_expr(item, exec_sub))
658                .collect::<Result<Vec<_>>>()?;
659            Ok(Expr::InList {
660                expr: Box::new(inner),
661                list: items,
662                negated: *negated,
663            })
664        }
665        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
666            left: Box::new(materialize_expr(left, exec_sub)?),
667            op: *op,
668            right: Box::new(materialize_expr(right, exec_sub)?),
669        }),
670        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
671            op: *op,
672            expr: Box::new(materialize_expr(e, exec_sub)?),
673        }),
674        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
675        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
676        Expr::InSet {
677            expr: e,
678            values,
679            has_null,
680            negated,
681        } => Ok(Expr::InSet {
682            expr: Box::new(materialize_expr(e, exec_sub)?),
683            values: values.clone(),
684            has_null: *has_null,
685            negated: *negated,
686        }),
687        Expr::Between {
688            expr: e,
689            low,
690            high,
691            negated,
692        } => Ok(Expr::Between {
693            expr: Box::new(materialize_expr(e, exec_sub)?),
694            low: Box::new(materialize_expr(low, exec_sub)?),
695            high: Box::new(materialize_expr(high, exec_sub)?),
696            negated: *negated,
697        }),
698        Expr::Like {
699            expr: e,
700            pattern,
701            escape,
702            negated,
703        } => {
704            let esc = escape
705                .as_ref()
706                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
707                .transpose()?;
708            Ok(Expr::Like {
709                expr: Box::new(materialize_expr(e, exec_sub)?),
710                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
711                escape: esc,
712                negated: *negated,
713            })
714        }
715        Expr::Case {
716            operand,
717            conditions,
718            else_result,
719        } => {
720            let op = operand
721                .as_ref()
722                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
723                .transpose()?;
724            let conds = conditions
725                .iter()
726                .map(|(c, r)| {
727                    Ok((
728                        materialize_expr(c, exec_sub)?,
729                        materialize_expr(r, exec_sub)?,
730                    ))
731                })
732                .collect::<Result<Vec<_>>>()?;
733            let else_r = else_result
734                .as_ref()
735                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
736                .transpose()?;
737            Ok(Expr::Case {
738                operand: op,
739                conditions: conds,
740                else_result: else_r,
741            })
742        }
743        Expr::Coalesce(args) => {
744            let materialized = args
745                .iter()
746                .map(|a| materialize_expr(a, exec_sub))
747                .collect::<Result<Vec<_>>>()?;
748            Ok(Expr::Coalesce(materialized))
749        }
750        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
751            expr: Box::new(materialize_expr(e, exec_sub)?),
752            data_type: *data_type,
753        }),
754        Expr::Function {
755            name,
756            args,
757            distinct,
758        } => {
759            let materialized = args
760                .iter()
761                .map(|a| materialize_expr(a, exec_sub))
762                .collect::<Result<Vec<_>>>()?;
763            Ok(Expr::Function {
764                name: name.clone(),
765                args: materialized,
766                distinct: *distinct,
767            })
768        }
769        other => Ok(other.clone()),
770    }
771}
772
773pub(super) fn materialize_stmt(
774    stmt: &SelectStmt,
775    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
776) -> Result<SelectStmt> {
777    let where_clause = stmt
778        .where_clause
779        .as_ref()
780        .map(|e| materialize_expr(e, exec_sub))
781        .transpose()?;
782    let having = stmt
783        .having
784        .as_ref()
785        .map(|e| materialize_expr(e, exec_sub))
786        .transpose()?;
787    let columns = stmt
788        .columns
789        .iter()
790        .map(|c| match c {
791            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
792            SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
793            SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
794            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
795                expr: materialize_expr(expr, exec_sub)?,
796                alias: alias.clone(),
797            }),
798        })
799        .collect::<Result<Vec<_>>>()?;
800    let order_by = stmt
801        .order_by
802        .iter()
803        .map(|ob| {
804            Ok(OrderByItem {
805                expr: materialize_expr(&ob.expr, exec_sub)?,
806                descending: ob.descending,
807                nulls_first: ob.nulls_first,
808            })
809        })
810        .collect::<Result<Vec<_>>>()?;
811    let joins = stmt
812        .joins
813        .iter()
814        .map(|j| {
815            let on_clause = j
816                .on_clause
817                .as_ref()
818                .map(|e| materialize_expr(e, exec_sub))
819                .transpose()?;
820            Ok(JoinClause {
821                join_type: j.join_type,
822                table: j.table.clone(),
823                subquery: j.subquery.clone(),
824                on_clause,
825            })
826        })
827        .collect::<Result<Vec<_>>>()?;
828    let group_by = stmt
829        .group_by
830        .iter()
831        .map(|e| materialize_expr(e, exec_sub))
832        .collect::<Result<Vec<_>>>()?;
833    Ok(SelectStmt {
834        columns,
835        from: stmt.from.clone(),
836        from_alias: stmt.from_alias.clone(),
837        from_subquery: stmt.from_subquery.clone(),
838        from_args: stmt.from_args.clone(),
839        from_json_table: stmt.from_json_table.clone(),
840        joins,
841        distinct: stmt.distinct,
842        where_clause,
843        order_by,
844        limit: stmt.limit.clone(),
845        offset: stmt.offset.clone(),
846        group_by,
847        having,
848    })
849}
850
851pub(super) fn exec_subquery_read(
852    db: &Database,
853    schema: &SchemaManager,
854    stmt: &SelectStmt,
855    ctes: &CteContext,
856) -> Result<QueryResult> {
857    let mut rtx = db.begin_read();
858    exec_subquery_with_read(&mut rtx, schema, stmt, ctes)
859}
860
861pub(super) fn exec_subquery_with_read(
862    rtx: &mut ReadTxn<'_>,
863    schema: &SchemaManager,
864    stmt: &SelectStmt,
865    ctes: &CteContext,
866) -> Result<QueryResult> {
867    match super::exec_select_with_read(rtx, schema, stmt, ctes)? {
868        ExecutionResult::Query(qr) => Ok(qr),
869        _ => Ok(QueryResult {
870            columns: vec![],
871            rows: vec![],
872        }),
873    }
874}
875
876pub(super) fn exec_subquery_write(
877    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
878    schema: &SchemaManager,
879    stmt: &SelectStmt,
880    ctes: &CteContext,
881) -> Result<QueryResult> {
882    match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
883        ExecutionResult::Query(qr) => Ok(qr),
884        _ => Ok(QueryResult {
885            columns: vec![],
886            rows: vec![],
887        }),
888    }
889}
890
891pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
892    stmt.where_clause.as_ref().is_some_and(has_subquery)
893        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
894}
895
896pub(super) fn materialize_update(
897    stmt: &UpdateStmt,
898    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
899) -> Result<UpdateStmt> {
900    let where_clause = stmt
901        .where_clause
902        .as_ref()
903        .map(|e| materialize_expr(e, exec_sub))
904        .transpose()?;
905    let assignments = stmt
906        .assignments
907        .iter()
908        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
909        .collect::<Result<Vec<_>>>()?;
910    Ok(UpdateStmt {
911        table: stmt.table.clone(),
912        assignments,
913        where_clause,
914        returning: stmt.returning.clone(),
915    })
916}
917
918pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
919    stmt.where_clause.as_ref().is_some_and(has_subquery)
920}
921
922pub(super) fn materialize_delete(
923    stmt: &DeleteStmt,
924    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
925) -> Result<DeleteStmt> {
926    let where_clause = stmt
927        .where_clause
928        .as_ref()
929        .map(|e| materialize_expr(e, exec_sub))
930        .transpose()?;
931    Ok(DeleteStmt {
932        table: stmt.table.clone(),
933        where_clause,
934        returning: stmt.returning.clone(),
935    })
936}
937
938pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
939    match &stmt.source {
940        InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
941        // SELECT source subqueries are handled by exec_select's correlated/non-correlated paths
942        InsertSource::Select(_) => false,
943    }
944}
945
946pub(super) fn materialize_insert(
947    stmt: &InsertStmt,
948    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
949) -> Result<InsertStmt> {
950    let source = match &stmt.source {
951        InsertSource::Values(rows) => {
952            let mat = rows
953                .iter()
954                .map(|row| {
955                    row.iter()
956                        .map(|e| materialize_expr(e, exec_sub))
957                        .collect::<Result<Vec<_>>>()
958                })
959                .collect::<Result<Vec<_>>>()?;
960            InsertSource::Values(mat)
961        }
962        InsertSource::Select(sq) => {
963            let ctes = sq
964                .ctes
965                .iter()
966                .map(|c| {
967                    Ok(CteDefinition {
968                        name: c.name.clone(),
969                        column_aliases: c.column_aliases.clone(),
970                        body: materialize_query_body(&c.body, exec_sub)?,
971                    })
972                })
973                .collect::<Result<Vec<_>>>()?;
974            let body = materialize_query_body(&sq.body, exec_sub)?;
975            InsertSource::Select(Box::new(SelectQuery {
976                ctes,
977                recursive: sq.recursive,
978                body,
979            }))
980        }
981    };
982    Ok(InsertStmt {
983        table: stmt.table.clone(),
984        columns: stmt.columns.clone(),
985        source,
986        on_conflict: stmt.on_conflict.clone(),
987        returning: stmt.returning.clone(),
988    })
989}
990
991pub(super) fn materialize_query_body(
992    body: &QueryBody,
993    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
994) -> Result<QueryBody> {
995    match body {
996        QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
997            sel, exec_sub,
998        )?))),
999        QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
1000            op: comp.op.clone(),
1001            all: comp.all,
1002            left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
1003            right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
1004            order_by: comp.order_by.clone(),
1005            limit: comp.limit.clone(),
1006            offset: comp.offset.clone(),
1007        }))),
1008        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
1009    }
1010}
1011
1012pub(super) fn exec_query_body_with_read(
1013    rtx: &mut ReadTxn<'_>,
1014    schema: &SchemaManager,
1015    body: &QueryBody,
1016    ctes: &CteContext,
1017) -> Result<ExecutionResult> {
1018    match body {
1019        QueryBody::Select(sel) => super::exec_select_with_read(rtx, schema, sel, ctes),
1020        QueryBody::Compound(comp) => exec_compound_select_with_read(rtx, schema, comp, ctes),
1021        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1022            SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1023        ),
1024    }
1025}
1026
1027pub(super) fn exec_query_body_in_txn(
1028    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1029    schema: &SchemaManager,
1030    body: &QueryBody,
1031    ctes: &CteContext,
1032) -> Result<ExecutionResult> {
1033    match body {
1034        QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1035        QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1036        QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1037        QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1038        QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1039    }
1040}
1041
1042pub(super) fn exec_query_body_read(
1043    db: &Database,
1044    schema: &SchemaManager,
1045    body: &QueryBody,
1046    ctes: &CteContext,
1047) -> Result<QueryResult> {
1048    let mut rtx = db.begin_read();
1049    exec_query_body_with_read_qr(&mut rtx, schema, body, ctes)
1050}
1051
1052pub(super) fn exec_query_body_with_read_qr(
1053    rtx: &mut ReadTxn<'_>,
1054    schema: &SchemaManager,
1055    body: &QueryBody,
1056    ctes: &CteContext,
1057) -> Result<QueryResult> {
1058    match exec_query_body_with_read(rtx, schema, body, ctes)? {
1059        ExecutionResult::Query(qr) => Ok(qr),
1060        _ => Ok(QueryResult {
1061            columns: vec![],
1062            rows: vec![],
1063        }),
1064    }
1065}
1066
1067pub(super) fn exec_query_body_write(
1068    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1069    schema: &SchemaManager,
1070    body: &QueryBody,
1071    ctes: &CteContext,
1072) -> Result<QueryResult> {
1073    match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1074        ExecutionResult::Query(qr) => Ok(qr),
1075        _ => Ok(QueryResult {
1076            columns: vec![],
1077            rows: vec![],
1078        }),
1079    }
1080}
1081
1082pub(super) fn exec_compound_select_with_read(
1083    rtx: &mut ReadTxn<'_>,
1084    schema: &SchemaManager,
1085    comp: &CompoundSelect,
1086    ctes: &CteContext,
1087) -> Result<ExecutionResult> {
1088    let left_qr = match exec_query_body_with_read(rtx, schema, &comp.left, ctes)? {
1089        ExecutionResult::Query(qr) => qr,
1090        _ => QueryResult {
1091            columns: vec![],
1092            rows: vec![],
1093        },
1094    };
1095    let right_qr = match exec_query_body_with_read(rtx, schema, &comp.right, ctes)? {
1096        ExecutionResult::Query(qr) => qr,
1097        _ => QueryResult {
1098            columns: vec![],
1099            rows: vec![],
1100        },
1101    };
1102    apply_set_operation(comp, left_qr, right_qr)
1103}
1104
1105pub(super) fn exec_compound_select_in_txn(
1106    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1107    schema: &SchemaManager,
1108    comp: &CompoundSelect,
1109    ctes: &CteContext,
1110) -> Result<ExecutionResult> {
1111    let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1112        ExecutionResult::Query(qr) => qr,
1113        _ => QueryResult {
1114            columns: vec![],
1115            rows: vec![],
1116        },
1117    };
1118    let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1119        ExecutionResult::Query(qr) => qr,
1120        _ => QueryResult {
1121            columns: vec![],
1122            rows: vec![],
1123        },
1124    };
1125    apply_set_operation(comp, left_qr, right_qr)
1126}
1127
1128pub(super) fn apply_set_operation(
1129    comp: &CompoundSelect,
1130    left_qr: QueryResult,
1131    right_qr: QueryResult,
1132) -> Result<ExecutionResult> {
1133    if !left_qr.columns.is_empty()
1134        && !right_qr.columns.is_empty()
1135        && left_qr.columns.len() != right_qr.columns.len()
1136    {
1137        return Err(SqlError::CompoundColumnCountMismatch {
1138            left: left_qr.columns.len(),
1139            right: right_qr.columns.len(),
1140        });
1141    }
1142
1143    let columns = left_qr.columns;
1144
1145    let mut rows = match (&comp.op, comp.all) {
1146        (SetOp::Union, true) => {
1147            let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1148            let mut rows = Vec::with_capacity(total);
1149            rows.extend(left_qr.rows);
1150            rows.extend(right_qr.rows);
1151            rows
1152        }
1153        (SetOp::Union, false) => {
1154            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1155            let mut rows = Vec::new();
1156            for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1157                if !seen.contains(&row) {
1158                    seen.insert(row.clone());
1159                    rows.push(row);
1160                }
1161            }
1162            rows
1163        }
1164        (SetOp::Intersect, true) => {
1165            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1166            for row in &right_qr.rows {
1167                *right_counts.entry(row.clone()).or_insert(0) += 1;
1168            }
1169            let mut rows = Vec::new();
1170            for row in left_qr.rows {
1171                if let Some(count) = right_counts.get_mut(&row) {
1172                    if *count > 0 {
1173                        *count -= 1;
1174                        rows.push(row);
1175                    }
1176                }
1177            }
1178            rows
1179        }
1180        (SetOp::Intersect, false) => {
1181            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1182            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1183            let mut rows = Vec::new();
1184            for row in left_qr.rows {
1185                if right_set.contains(&row) && !seen.contains(&row) {
1186                    seen.insert(row.clone());
1187                    rows.push(row);
1188                }
1189            }
1190            rows
1191        }
1192        (SetOp::Except, true) => {
1193            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1194            for row in &right_qr.rows {
1195                *right_counts.entry(row.clone()).or_insert(0) += 1;
1196            }
1197            let mut rows = Vec::new();
1198            for row in left_qr.rows {
1199                if let Some(count) = right_counts.get_mut(&row) {
1200                    if *count > 0 {
1201                        *count -= 1;
1202                        continue;
1203                    }
1204                }
1205                rows.push(row);
1206            }
1207            rows
1208        }
1209        (SetOp::Except, false) => {
1210            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1211            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1212            let mut rows = Vec::new();
1213            for row in left_qr.rows {
1214                if !right_set.contains(&row) && !seen.contains(&row) {
1215                    seen.insert(row.clone());
1216                    rows.push(row);
1217                }
1218            }
1219            rows
1220        }
1221    };
1222
1223    if !comp.order_by.is_empty() {
1224        let col_defs: Vec<crate::types::ColumnDef> = columns
1225            .iter()
1226            .enumerate()
1227            .map(|(i, name)| crate::types::ColumnDef {
1228                name: name.clone(),
1229                data_type: crate::types::DataType::Null,
1230                nullable: true,
1231                position: i as u16,
1232                default_expr: None,
1233                default_sql: None,
1234                check_expr: None,
1235                check_sql: None,
1236                check_name: None,
1237                is_with_timezone: false,
1238                generated_expr: None,
1239                generated_sql: None,
1240                generated_kind: None,
1241                collation: crate::types::Collation::Binary,
1242            })
1243            .collect();
1244        sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1245    }
1246
1247    if let Some(ref offset_expr) = comp.offset {
1248        let offset = eval_const_int(offset_expr)?.max(0) as usize;
1249        if offset < rows.len() {
1250            rows = rows.split_off(offset);
1251        } else {
1252            rows.clear();
1253        }
1254    }
1255
1256    if let Some(ref limit_expr) = comp.limit {
1257        let limit = eval_const_int(limit_expr)?.max(0) as usize;
1258        rows.truncate(limit);
1259    }
1260
1261    Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1262}
1263
1264struct InsertBufs {
1265    row: Vec<Value>,
1266    pk_values: Vec<Value>,
1267    value_values: Vec<Value>,
1268    key_buf: Vec<u8>,
1269    value_buf: Vec<u8>,
1270    col_indices: Vec<usize>,
1271    fk_key_buf: Vec<u8>,
1272}
1273
1274impl InsertBufs {
1275    fn new() -> Self {
1276        Self {
1277            row: Vec::new(),
1278            pk_values: Vec::new(),
1279            value_values: Vec::new(),
1280            key_buf: Vec::with_capacity(64),
1281            value_buf: Vec::with_capacity(256),
1282            col_indices: Vec::new(),
1283            fk_key_buf: Vec::with_capacity(64),
1284        }
1285    }
1286}
1287
1288thread_local! {
1289    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1290    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1291}
1292
1293fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1294    INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1295        Ok(mut borrowed) => f(&mut borrowed),
1296        Err(_) => {
1297            let mut local = InsertBufs::new();
1298            f(&mut local)
1299        }
1300    })
1301}
1302
1303pub(super) struct UpsertBufs {
1304    old_row: Vec<Value>,
1305    new_row: Vec<Value>,
1306    value_values: Vec<Value>,
1307    new_value_buf: Vec<u8>,
1308}
1309
1310impl UpsertBufs {
1311    pub(super) fn new() -> Self {
1312        Self {
1313            old_row: Vec::new(),
1314            new_row: Vec::new(),
1315            value_values: Vec::new(),
1316            new_value_buf: Vec::with_capacity(256),
1317        }
1318    }
1319}
1320
1321pub fn exec_insert_in_txn(
1322    wtx: &mut WriteTxn<'_>,
1323    schema: &SchemaManager,
1324    stmt: &InsertStmt,
1325    params: &[Value],
1326) -> Result<ExecutionResult> {
1327    with_insert_scratch(|bufs| {
1328        exec_insert_in_txn_impl(
1329            wtx,
1330            schema,
1331            stmt,
1332            params,
1333            bufs,
1334            None,
1335            &CteContext::default(),
1336        )
1337    })
1338}
1339
1340pub(super) fn exec_insert_in_txn_with_ctes(
1341    wtx: &mut WriteTxn<'_>,
1342    schema: &SchemaManager,
1343    stmt: &InsertStmt,
1344    params: &[Value],
1345    outer_ctes: &CteContext,
1346) -> Result<ExecutionResult> {
1347    with_insert_scratch(|bufs| {
1348        exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1349    })
1350}
1351
1352fn exec_insert_in_txn_cached(
1353    wtx: &mut WriteTxn<'_>,
1354    schema: &SchemaManager,
1355    stmt: &InsertStmt,
1356    params: &[Value],
1357    cache: &InsertCache,
1358) -> Result<ExecutionResult> {
1359    with_insert_scratch(|bufs| {
1360        exec_insert_in_txn_impl(
1361            wtx,
1362            schema,
1363            stmt,
1364            params,
1365            bufs,
1366            Some(cache),
1367            &CteContext::default(),
1368        )
1369    })
1370}
1371
1372fn exec_insert_in_txn_impl(
1373    wtx: &mut WriteTxn<'_>,
1374    schema: &SchemaManager,
1375    stmt: &InsertStmt,
1376    params: &[Value],
1377    bufs: &mut InsertBufs,
1378    cache: Option<&InsertCache>,
1379    outer_ctes: &CteContext,
1380) -> Result<ExecutionResult> {
1381    let empty_ctes = CteContext::default();
1382    let materialized;
1383    let has_sub = match cache {
1384        Some(c) => c.has_subquery,
1385        None => insert_has_subquery(stmt),
1386    };
1387    let stmt = if has_sub {
1388        materialized = materialize_insert(stmt, &mut |sub| {
1389            exec_subquery_write(wtx, schema, sub, &empty_ctes)
1390        })?;
1391        &materialized
1392    } else {
1393        stmt
1394    };
1395
1396    let view_lookup_key = stmt.table.to_ascii_lowercase();
1397    if let Some(view_def) = schema.get_view(&view_lookup_key) {
1398        if super::triggers::has_instead_of(
1399            schema,
1400            &view_lookup_key,
1401            super::triggers::FireEvent::Insert,
1402        ) {
1403            let aliases = view_def.column_aliases.clone();
1404            return exec_instead_of_view_insert_in_txn(
1405                wtx,
1406                schema,
1407                &view_lookup_key,
1408                &aliases,
1409                stmt,
1410                params,
1411            );
1412        }
1413        return Err(SqlError::CannotModifyView(stmt.table.clone()));
1414    }
1415
1416    let table_schema = schema
1417        .get(&stmt.table)
1418        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1419
1420    let default_columns;
1421    let insert_columns: &[String] = if stmt.columns.is_empty() {
1422        default_columns = table_schema
1423            .columns
1424            .iter()
1425            .map(|c| c.name.clone())
1426            .collect::<Vec<_>>();
1427        &default_columns
1428    } else {
1429        &stmt.columns
1430    };
1431
1432    bufs.col_indices.clear();
1433    if let Some(c) = cache {
1434        bufs.col_indices.extend_from_slice(&c.col_indices);
1435    } else {
1436        for name in insert_columns {
1437            bufs.col_indices.push(
1438                table_schema
1439                    .column_index(name)
1440                    .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1441            );
1442        }
1443    }
1444
1445    if cache.is_none() {
1446        for &ci in &bufs.col_indices {
1447            if table_schema.columns[ci].generated_kind.is_some() {
1448                return Err(SqlError::CannotInsertIntoGeneratedColumn(
1449                    table_schema.columns[ci].name.clone(),
1450                ));
1451            }
1452        }
1453    }
1454
1455    let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1456    let cached_gen_positions: &[usize];
1457    let cached_gen_fast_evals: &[FastGenEval];
1458    if let Some(c) = cache {
1459        cached_gen_positions = &c.generated_col_positions;
1460        cached_gen_fast_evals = &c.generated_fast_evals;
1461        generated_cols_uncached = Vec::new();
1462    } else {
1463        cached_gen_positions = &[];
1464        cached_gen_fast_evals = &[];
1465        generated_cols_uncached = table_schema
1466            .columns
1467            .iter()
1468            .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1469            .map(|c| {
1470                let expr = c.generated_expr.as_ref().unwrap();
1471                let fe = detect_fast_gen_eval(expr, table_schema);
1472                (c.position as usize, expr, fe)
1473            })
1474            .collect();
1475    }
1476    let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1477    let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1478        None
1479    } else {
1480        Some(ColumnMap::new(&table_schema.columns))
1481    };
1482    let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1483        None
1484    } else if let Some(c) = cache {
1485        c.row_col_map.as_ref()
1486    } else {
1487        row_col_map_for_gen_owned.as_ref()
1488    };
1489
1490    let any_defaults = match cache {
1491        Some(c) => c.any_defaults,
1492        None => table_schema
1493            .columns
1494            .iter()
1495            .any(|c| c.default_expr.is_some()),
1496    };
1497    let defaults: Vec<(usize, &Expr)> = if any_defaults {
1498        table_schema
1499            .columns
1500            .iter()
1501            .filter(|c| {
1502                c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1503            })
1504            .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1505            .collect()
1506    } else {
1507        Vec::new()
1508    };
1509
1510    let has_checks = match cache {
1511        Some(c) => c.has_checks,
1512        None => table_schema.has_checks(),
1513    };
1514    let check_col_map = if has_checks {
1515        Some(ColumnMap::new(&table_schema.columns))
1516    } else {
1517        None
1518    };
1519
1520    let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1521        &[usize],
1522        &[usize],
1523        &[u16],
1524        usize,
1525        &[u16],
1526    ) = if let Some(c) = cache {
1527        (
1528            &c.pk_indices,
1529            &c.non_pk_indices,
1530            &c.encoding_positions,
1531            c.phys_count,
1532            &c.dropped_non_pk_slots,
1533        )
1534    } else {
1535        (
1536            table_schema.pk_indices(),
1537            table_schema.non_pk_indices(),
1538            table_schema.encoding_positions(),
1539            table_schema.physical_non_pk_count(),
1540            table_schema.dropped_non_pk_slots(),
1541        )
1542    };
1543
1544    bufs.row.resize(table_schema.columns.len(), Value::Null);
1545    bufs.pk_values.resize(pk_indices.len(), Value::Null);
1546    bufs.value_values.resize(phys_count, Value::Null);
1547
1548    let table_bytes = table_schema.name.as_bytes();
1549    let has_fks = !table_schema.foreign_keys.is_empty();
1550    let has_indices = !table_schema.indices.is_empty();
1551    let has_defaults = !defaults.is_empty();
1552
1553    let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1554        (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1555        (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1556        (_, None) => None,
1557    };
1558
1559    let row_col_map_owned: Option<ColumnMap> =
1560        if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1561            Some(ColumnMap::new(&table_schema.columns))
1562        } else {
1563            None
1564        };
1565    let row_col_map: Option<&ColumnMap> = cache
1566        .and_then(|c| c.row_col_map.as_ref())
1567        .or(row_col_map_owned.as_ref());
1568
1569    let select_rows = match &stmt.source {
1570        InsertSource::Select(sq) => {
1571            let insert_ctes = super::materialize_all_ctes_with_outer(
1572                &sq.ctes,
1573                sq.recursive,
1574                outer_ctes,
1575                &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1576            )?;
1577            let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1578            Some(qr.rows)
1579        }
1580        InsertSource::Values(_) => None,
1581    };
1582
1583    let mut count: u64 = 0;
1584    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1585        stmt.returning.as_ref().map(|_| Vec::new());
1586
1587    let values = match &stmt.source {
1588        InsertSource::Values(rows) => Some(rows.as_slice()),
1589        InsertSource::Select(_) => None,
1590    };
1591    let sel_rows = select_rows.as_deref();
1592
1593    let total = match (values, sel_rows) {
1594        (Some(rows), _) => rows.len(),
1595        (_, Some(rows)) => rows.len(),
1596        _ => 0,
1597    };
1598
1599    if let Some(sel) = sel_rows {
1600        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1601            return Err(SqlError::InvalidValue(format!(
1602                "INSERT ... SELECT column count mismatch: expected {}, got {}",
1603                insert_columns.len(),
1604                sel[0].len()
1605            )));
1606        }
1607    }
1608
1609    let has_insert_statement_triggers_impl =
1610        schema.triggers_for(&table_schema.name).iter().any(|t| {
1611            t.enabled
1612                && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1613                && t.events
1614                    .iter()
1615                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1616        });
1617    let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1618        Vec::with_capacity(total)
1619    } else {
1620        Vec::new()
1621    };
1622    if has_insert_statement_triggers_impl {
1623        super::triggers::fire_statement_triggers(
1624            wtx,
1625            schema,
1626            &table_schema.name,
1627            crate::parser::TriggerTiming::Before,
1628            super::triggers::FireEvent::Insert,
1629            &table_schema.columns,
1630            &[],
1631            &[],
1632        )?;
1633    }
1634
1635    let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1636    for idx in 0..total {
1637        if !skip_row_clear {
1638            for v in bufs.row.iter_mut() {
1639                *v = Value::Null;
1640            }
1641        }
1642
1643        if let Some(value_rows) = values {
1644            if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1645                for action in plan {
1646                    match action {
1647                        BindAction::Param {
1648                            param_idx,
1649                            col_idx,
1650                            target,
1651                        } => {
1652                            let v = &params[*param_idx];
1653                            bufs.row[*col_idx] = if v.is_null() {
1654                                Value::Null
1655                            } else if v.data_type() == *target {
1656                                v.clone()
1657                            } else {
1658                                let got = v.data_type();
1659                                v.clone().coerce_into(*target).ok_or_else(|| {
1660                                    SqlError::TypeMismatch {
1661                                        expected: target.to_string(),
1662                                        got: got.to_string(),
1663                                    }
1664                                })?
1665                            };
1666                        }
1667                        BindAction::Literal { value, col_idx } => {
1668                            bufs.row[*col_idx] = value.clone();
1669                        }
1670                    }
1671                }
1672            } else {
1673                let value_row = &value_rows[idx];
1674                if value_row.len() != insert_columns.len() {
1675                    return Err(SqlError::InvalidValue(format!(
1676                        "expected {} values, got {}",
1677                        insert_columns.len(),
1678                        value_row.len()
1679                    )));
1680                }
1681                for (i, expr) in value_row.iter().enumerate() {
1682                    let val = match expr {
1683                        Expr::Parameter(n) => params
1684                            .get(n - 1)
1685                            .cloned()
1686                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1687                        Expr::Literal(v) => v.clone(),
1688                        _ => eval_const_expr(expr)?,
1689                    };
1690                    let col_idx = bufs.col_indices[i];
1691                    let col = &table_schema.columns[col_idx];
1692                    let got_type = val.data_type();
1693                    bufs.row[col_idx] = if val.is_null() {
1694                        Value::Null
1695                    } else {
1696                        val.coerce_into(col.data_type)
1697                            .ok_or_else(|| SqlError::TypeMismatch {
1698                                expected: col.data_type.to_string(),
1699                                got: got_type.to_string(),
1700                            })?
1701                    };
1702                }
1703            }
1704        } else if let Some(sel) = sel_rows {
1705            let sel_row = &sel[idx];
1706            for (i, val) in sel_row.iter().enumerate() {
1707                let col_idx = bufs.col_indices[i];
1708                let col = &table_schema.columns[col_idx];
1709                let got_type = val.data_type();
1710                bufs.row[col_idx] = if val.is_null() {
1711                    Value::Null
1712                } else {
1713                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
1714                        SqlError::TypeMismatch {
1715                            expected: col.data_type.to_string(),
1716                            got: got_type.to_string(),
1717                        }
1718                    })?
1719                };
1720            }
1721        }
1722
1723        if has_defaults {
1724            for &(pos, def_expr) in &defaults {
1725                let val = eval_const_expr(def_expr)?;
1726                let col = &table_schema.columns[pos];
1727                if !val.is_null() {
1728                    let got_type = val.data_type();
1729                    bufs.row[pos] =
1730                        val.coerce_into(col.data_type)
1731                            .ok_or_else(|| SqlError::TypeMismatch {
1732                                expected: col.data_type.to_string(),
1733                                got: got_type.to_string(),
1734                            })?;
1735                }
1736            }
1737        }
1738
1739        if let Some(gen_map) = row_col_map_for_gen {
1740            if cache.is_some() {
1741                for (pos, fast) in cached_gen_positions
1742                    .iter()
1743                    .copied()
1744                    .zip(cached_gen_fast_evals.iter())
1745                {
1746                    let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1747                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1748                    let col = &table_schema.columns[pos];
1749                    bufs.row[pos] = if val.is_null() {
1750                        Value::Null
1751                    } else {
1752                        let got_type = val.data_type();
1753                        val.coerce_into(col.data_type)
1754                            .ok_or_else(|| SqlError::TypeMismatch {
1755                                expected: col.data_type.to_string(),
1756                                got: got_type.to_string(),
1757                            })?
1758                    };
1759                }
1760            } else {
1761                for (pos, gen_expr, fast) in &generated_cols_uncached {
1762                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1763                    let col = &table_schema.columns[*pos];
1764                    bufs.row[*pos] = if val.is_null() {
1765                        Value::Null
1766                    } else {
1767                        let got_type = val.data_type();
1768                        val.coerce_into(col.data_type)
1769                            .ok_or_else(|| SqlError::TypeMismatch {
1770                                expected: col.data_type.to_string(),
1771                                got: got_type.to_string(),
1772                            })?
1773                    };
1774                }
1775            }
1776        }
1777
1778        if let Some(c) = cache {
1779            for &pos in &c.not_null_indices {
1780                if bufs.row[pos as usize].is_null() {
1781                    return Err(SqlError::NotNullViolation(
1782                        table_schema.columns[pos as usize].name.clone(),
1783                    ));
1784                }
1785            }
1786        } else {
1787            for col in &table_schema.columns {
1788                if !col.nullable && bufs.row[col.position as usize].is_null() {
1789                    return Err(SqlError::NotNullViolation(col.name.clone()));
1790                }
1791            }
1792        }
1793
1794        if let Some(ref col_map) = check_col_map {
1795            for col in &table_schema.columns {
1796                if let Some(ref check) = col.check_expr {
1797                    let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1798                    if !is_truthy(&result) && !result.is_null() {
1799                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1800                        return Err(SqlError::CheckViolation(name.to_string()));
1801                    }
1802                }
1803            }
1804            for tc in &table_schema.check_constraints {
1805                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1806                if !is_truthy(&result) && !result.is_null() {
1807                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1808                    return Err(SqlError::CheckViolation(name.to_string()));
1809                }
1810            }
1811        }
1812
1813        if has_fks {
1814            for fk in &table_schema.foreign_keys {
1815                let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1816                if any_null {
1817                    continue;
1818                }
1819                crate::encoding::encode_composite_key_from_indices(
1820                    &fk.columns,
1821                    &bufs.row,
1822                    &mut bufs.fk_key_buf,
1823                );
1824                if fk.deferrable && fk.initially_deferred {
1825                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1826                    wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1827                        fk_name: name,
1828                        foreign_table: fk.foreign_table.as_bytes().to_vec(),
1829                        parent_key: bufs.fk_key_buf.clone(),
1830                    });
1831                    continue;
1832                }
1833                if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1834                    let found = wtx
1835                        .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1836                        .map_err(SqlError::Storage)?;
1837                    if found.is_none() {
1838                        let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1839                        return Err(SqlError::ForeignKeyViolation(name.to_string()));
1840                    }
1841                    wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1842                }
1843            }
1844        }
1845
1846        let proposed_row_for_returning: Option<Vec<Value>> =
1847            returning_rows.as_ref().map(|_| bufs.row.clone());
1848        let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1849            Some(bufs.row.clone())
1850        } else {
1851            None
1852        };
1853
1854        let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
1855            t.enabled
1856                && t.timing == crate::parser::TriggerTiming::Before
1857                && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1858                && t.events
1859                    .iter()
1860                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1861        });
1862        if has_before_insert_triggers {
1863            super::triggers::fire_row_triggers(
1864                wtx,
1865                schema,
1866                &table_schema.name,
1867                crate::parser::TriggerTiming::Before,
1868                super::triggers::FireEvent::Insert,
1869                None,
1870                Some(bufs.row.clone()),
1871                &table_schema.columns,
1872            )?;
1873        }
1874
1875        for (j, &i) in pk_indices.iter().enumerate() {
1876            bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1877        }
1878        match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1879            true => match bufs.pk_values[0] {
1880                Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1881                _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1882            },
1883            false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1884        }
1885
1886        for &slot in dropped {
1887            bufs.value_values[slot as usize] = Value::Null;
1888        }
1889        for (j, &i) in non_pk.iter().enumerate() {
1890            let col = &table_schema.columns[i];
1891            if matches!(
1892                col.generated_kind,
1893                Some(crate::parser::GeneratedKind::Virtual)
1894            ) {
1895                bufs.value_values[enc_pos[j] as usize] = Value::Null;
1896                bufs.row[i] = Value::Null;
1897            } else {
1898                bufs.value_values[enc_pos[j] as usize] =
1899                    std::mem::replace(&mut bufs.row[i], Value::Null);
1900            }
1901        }
1902        match cache.and_then(|c| c.row_encoder.as_ref()) {
1903            Some(tmpl) => crate::encoding::encode_int_row_with_template(
1904                tmpl,
1905                &bufs.value_values,
1906                &mut bufs.value_buf,
1907            )?,
1908            None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1909        }
1910
1911        if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1912            return Err(SqlError::KeyTooLarge {
1913                size: bufs.key_buf.len(),
1914                max: citadel_core::MAX_KEY_SIZE,
1915            });
1916        }
1917        if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1918            return Err(SqlError::RowTooLarge {
1919                size: bufs.value_buf.len(),
1920                max: citadel_core::MAX_VALUE_SIZE,
1921            });
1922        }
1923
1924        match compiled_conflict.as_ref() {
1925            None => {
1926                let is_new = wtx
1927                    .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1928                    .map_err(SqlError::Storage)?;
1929                if !is_new {
1930                    return Err(SqlError::DuplicateKey);
1931                }
1932                let has_after_insert_triggers =
1933                    schema.triggers_for(&table_schema.name).iter().any(|t| {
1934                        t.enabled
1935                            && t.timing == crate::parser::TriggerTiming::After
1936                            && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1937                            && t.events
1938                                .iter()
1939                                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1940                    });
1941                if has_indices || has_after_insert_triggers {
1942                    for (j, &i) in pk_indices.iter().enumerate() {
1943                        bufs.row[i] = bufs.pk_values[j].clone();
1944                    }
1945                    for (j, &i) in non_pk.iter().enumerate() {
1946                        bufs.row[i] = std::mem::replace(
1947                            &mut bufs.value_values[enc_pos[j] as usize],
1948                            Value::Null,
1949                        );
1950                    }
1951                    if has_indices {
1952                        insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1953                    }
1954                    if has_after_insert_triggers {
1955                        super::triggers::fire_row_triggers(
1956                            wtx,
1957                            schema,
1958                            &table_schema.name,
1959                            crate::parser::TriggerTiming::After,
1960                            super::triggers::FireEvent::Insert,
1961                            None,
1962                            Some(bufs.row.clone()),
1963                            &table_schema.columns,
1964                        )?;
1965                    }
1966                }
1967                if let Some(r) = row_for_stmt_trigger_impl.clone() {
1968                    stmt_new_rows_impl.push(r);
1969                }
1970                count += 1;
1971                if let Some(buf) = returning_rows.as_mut() {
1972                    buf.push((None, proposed_row_for_returning));
1973                }
1974            }
1975            Some(oc) => {
1976                let oc_ref: &CompiledOnConflict = oc;
1977                let needs_row = upsert_needs_row(oc_ref, table_schema);
1978                if needs_row {
1979                    for (j, &i) in pk_indices.iter().enumerate() {
1980                        bufs.row[i] = bufs.pk_values[j].clone();
1981                    }
1982                    for (j, &i) in non_pk.iter().enumerate() {
1983                        bufs.row[i] = std::mem::replace(
1984                            &mut bufs.value_values[enc_pos[j] as usize],
1985                            Value::Null,
1986                        );
1987                    }
1988                }
1989                let outcome = apply_insert_with_conflict(
1990                    wtx,
1991                    table_schema,
1992                    &bufs.key_buf,
1993                    &bufs.value_buf,
1994                    &bufs.row,
1995                    &bufs.pk_values,
1996                    oc_ref,
1997                    row_col_map.unwrap(),
1998                    stmt.returning.is_some(),
1999                )?;
2000                match outcome {
2001                    InsertRowOutcome::Inserted => {
2002                        count += 1;
2003                        if let Some(buf) = returning_rows.as_mut() {
2004                            buf.push((None, proposed_row_for_returning));
2005                        }
2006                        if let Some(r) = row_for_stmt_trigger_impl.clone() {
2007                            stmt_new_rows_impl.push(r);
2008                        }
2009                        let has_after_insert_triggers =
2010                            schema.triggers_for(&table_schema.name).iter().any(|t| {
2011                                t.enabled
2012                                    && t.timing == crate::parser::TriggerTiming::After
2013                                    && t.granularity
2014                                        == crate::parser::TriggerGranularity::ForEachRow
2015                                    && t.events
2016                                        .iter()
2017                                        .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
2018                            });
2019                        if has_after_insert_triggers {
2020                            super::triggers::fire_row_triggers(
2021                                wtx,
2022                                schema,
2023                                &table_schema.name,
2024                                crate::parser::TriggerTiming::After,
2025                                super::triggers::FireEvent::Insert,
2026                                None,
2027                                Some(bufs.row.clone()),
2028                                &table_schema.columns,
2029                            )?;
2030                        }
2031                    }
2032                    InsertRowOutcome::Updated { old, new } => {
2033                        count += 1;
2034                        if let Some(buf) = returning_rows.as_mut() {
2035                            buf.push((Some(old.clone()), Some(new.clone())));
2036                        }
2037                        let has_after_update_triggers =
2038                            schema.triggers_for(&table_schema.name).iter().any(|t| {
2039                                t.enabled
2040                                    && t.timing == crate::parser::TriggerTiming::After
2041                                    && t.granularity
2042                                        == crate::parser::TriggerGranularity::ForEachRow
2043                                    && t.events.iter().any(|e| {
2044                                        matches!(e, crate::parser::TriggerEvent::Update(_))
2045                                    })
2046                            });
2047                        if has_after_update_triggers {
2048                            let changed_cols: Vec<String> = match oc_ref {
2049                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2050                                    .iter()
2051                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2052                                    .collect(),
2053                                _ => Vec::new(),
2054                            };
2055                            super::triggers::fire_row_triggers(
2056                                wtx,
2057                                schema,
2058                                &table_schema.name,
2059                                crate::parser::TriggerTiming::After,
2060                                super::triggers::FireEvent::Update {
2061                                    changed_columns: &changed_cols,
2062                                },
2063                                Some(old),
2064                                Some(new),
2065                                &table_schema.columns,
2066                            )?;
2067                        }
2068                    }
2069                    InsertRowOutcome::Skipped => {}
2070                }
2071            }
2072        }
2073    }
2074
2075    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2076        if has_insert_statement_triggers_impl {
2077            super::triggers::fire_statement_triggers(
2078                wtx,
2079                schema,
2080                &table_schema.name,
2081                crate::parser::TriggerTiming::After,
2082                super::triggers::FireEvent::Insert,
2083                &table_schema.columns,
2084                &[],
2085                &stmt_new_rows_impl,
2086            )?;
2087        }
2088        return Ok(ExecutionResult::Query(super::helpers::project_returning(
2089            table_schema,
2090            returning_cols,
2091            &rows,
2092        )?));
2093    }
2094
2095    if has_insert_statement_triggers_impl {
2096        super::triggers::fire_statement_triggers(
2097            wtx,
2098            schema,
2099            &table_schema.name,
2100            crate::parser::TriggerTiming::After,
2101            super::triggers::FireEvent::Insert,
2102            &table_schema.columns,
2103            &[],
2104            &stmt_new_rows_impl,
2105        )?;
2106    }
2107
2108    Ok(ExecutionResult::RowsAffected(count))
2109}
2110
2111pub struct CompiledInsert {
2112    table_lower: String,
2113    cached: Option<InsertCache>,
2114}
2115
2116struct InsertCache {
2117    col_indices: Vec<usize>,
2118    has_subquery: bool,
2119    any_defaults: bool,
2120    has_checks: bool,
2121    on_conflict: Option<Arc<CompiledOnConflict>>,
2122    row_col_map: Option<ColumnMap>,
2123    generated_col_positions: Vec<usize>,
2124    generated_fast_evals: Vec<FastGenEval>,
2125    pk_indices: Vec<usize>,
2126    non_pk_indices: Vec<usize>,
2127    encoding_positions: Vec<u16>,
2128    dropped_non_pk_slots: Vec<u16>,
2129    phys_count: usize,
2130    single_int_pk: bool,
2131    not_null_indices: Vec<u16>,
2132    bind_plan: Option<Vec<BindAction>>,
2133    row_fully_overwritten: bool,
2134    row_encoder: Option<crate::encoding::IntRowTemplate>,
2135    is_trivial_fast: bool,
2136    trivial_fast_program: Option<TrivialFastProgram>,
2137    needs_scoped_params: bool,
2138}
2139
2140#[derive(Clone)]
2141enum BindAction {
2142    Param {
2143        param_idx: usize,
2144        col_idx: usize,
2145        target: DataType,
2146    },
2147    Literal {
2148        value: Value,
2149        col_idx: usize,
2150    },
2151}
2152
2153#[derive(Clone)]
2154struct TrivialFastProgram {
2155    template: Vec<u8>,
2156    ops: Vec<WriteOp>,
2157    pk_param: u8,
2158    not_null_param_indices: Vec<u8>,
2159}
2160
/// A single write step of a [`TrivialFastProgram`].
///
/// Every variant carries `off`, the offset recorded by the row encoder for the
/// destination slot (presumably a byte offset into the row template — confirm
/// against the code that executes the program).
#[derive(Clone)]
enum WriteOp {
    /// Write the integer bound to parameter `param_idx`.
    ParamI64 {
        param_idx: u8,
        off: u32,
    },
    /// Write a fixed integer.
    LiteralI64 {
        value: i64,
        off: u32,
    },
    /// Generated column fed by two parameters (built from a column + column
    /// generated expression; presumably writes their sum).
    GenAddParamsI64 {
        a_param: u8,
        b_param: u8,
        off: u32,
        /// `2 + slot / 8` for the generated column's slot; presumably the byte of
        /// the row's NULL bitmap holding the slot's bit — confirm.
        bitmap_byte_off: u32,
        /// `1 << (slot % 8)` for the generated column's slot.
        bitmap_bit_mask: u8,
    },
    /// Generated column fed by one parameter with a multiplier and an addend
    /// (presumably writes `param * mul + add`).
    GenMulAddParamI64 {
        param_idx: u8,
        mul: i64,
        add: i64,
        off: u32,
        /// `2 + slot / 8` for the generated column's slot; presumably the byte of
        /// the row's NULL bitmap holding the slot's bit — confirm.
        bitmap_byte_off: u32,
        /// `1 << (slot % 8)` for the generated column's slot.
        bitmap_bit_mask: u8,
    },
}
2187
2188fn build_trivial_fast_program(
2189    bind_plan: &[BindAction],
2190    row_encoder: &crate::encoding::IntRowTemplate,
2191    non_virtual_pairs: &[(usize, usize)],
2192    generated_col_positions: &[usize],
2193    generated_fast_evals: &[FastGenEval],
2194    pk_indices: &[usize],
2195    columns: &[crate::types::ColumnDef],
2196) -> Option<TrivialFastProgram> {
2197    let pk_col = pk_indices[0];
2198    let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2199        non_virtual_pairs.iter().copied().collect();
2200    let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2201        row_encoder.slot_offsets.iter().copied().collect();
2202
2203    let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2204    let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2205    let mut pk_param: Option<u8> = None;
2206    let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2207    let mut not_null_param_indices: Vec<u8> = Vec::new();
2208
2209    for action in bind_plan {
2210        match action {
2211            BindAction::Param {
2212                param_idx,
2213                col_idx,
2214                target,
2215            } => {
2216                if *target != DataType::Integer {
2217                    return None;
2218                }
2219                let pi: u8 = u8::try_from(*param_idx).ok()?;
2220                col_to_param.insert(*col_idx, pi);
2221                if *col_idx == pk_col {
2222                    pk_param = Some(pi);
2223                } else {
2224                    let slot = *col_to_slot.get(col_idx)?;
2225                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2226                    ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2227                    if !columns[*col_idx].nullable {
2228                        not_null_param_indices.push(pi);
2229                    }
2230                }
2231            }
2232            BindAction::Literal { value, col_idx } => match value {
2233                Value::Integer(v) => {
2234                    col_to_lit_int.insert(*col_idx, *v);
2235                    if *col_idx == pk_col {
2236                        return None;
2237                    }
2238                    let slot = *col_to_slot.get(col_idx)?;
2239                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2240                    ops.push(WriteOp::LiteralI64 { value: *v, off });
2241                }
2242                _ => return None,
2243            },
2244        }
2245    }
2246
2247    let pk_param = pk_param?;
2248
2249    for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2250        let gen_slot = *col_to_slot.get(&gen_pos)?;
2251        let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2252        let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
2253        let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
2254        let gen_col_nullable = columns[gen_pos].nullable;
2255
2256        match &generated_fast_evals[i] {
2257            FastGenEval::IntColAddCol {
2258                left_idx,
2259                right_idx,
2260            } => {
2261                let a_param = col_to_param.get(left_idx).copied();
2262                let b_param = col_to_param.get(right_idx).copied();
2263                match (a_param, b_param) {
2264                    (Some(ap), Some(bp)) => {
2265                        let deps_safe = gen_col_nullable
2266                            || (not_null_param_indices.contains(&ap)
2267                                && not_null_param_indices.contains(&bp));
2268                        if !deps_safe {
2269                            return None;
2270                        }
2271                        ops.push(WriteOp::GenAddParamsI64 {
2272                            a_param: ap,
2273                            b_param: bp,
2274                            off: gen_off,
2275                            bitmap_byte_off,
2276                            bitmap_bit_mask,
2277                        });
2278                    }
2279                    (Some(p), None) => {
2280                        let lit = col_to_lit_int.get(right_idx).copied()?;
2281                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2282                            return None;
2283                        }
2284                        ops.push(WriteOp::GenMulAddParamI64 {
2285                            param_idx: p,
2286                            mul: 1,
2287                            add: lit,
2288                            off: gen_off,
2289                            bitmap_byte_off,
2290                            bitmap_bit_mask,
2291                        });
2292                    }
2293                    (None, Some(p)) => {
2294                        let lit = col_to_lit_int.get(left_idx).copied()?;
2295                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2296                            return None;
2297                        }
2298                        ops.push(WriteOp::GenMulAddParamI64 {
2299                            param_idx: p,
2300                            mul: 1,
2301                            add: lit,
2302                            off: gen_off,
2303                            bitmap_byte_off,
2304                            bitmap_bit_mask,
2305                        });
2306                    }
2307                    (None, None) => {
2308                        let la = col_to_lit_int.get(left_idx).copied()?;
2309                        let lb = col_to_lit_int.get(right_idx).copied()?;
2310                        ops.push(WriteOp::LiteralI64 {
2311                            value: la.wrapping_add(lb),
2312                            off: gen_off,
2313                        });
2314                    }
2315                }
2316            }
2317            FastGenEval::IntColMulAdd {
2318                col_schema_idx,
2319                mul,
2320                add,
2321            } => {
2322                if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2323                    if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2324                        return None;
2325                    }
2326                    ops.push(WriteOp::GenMulAddParamI64 {
2327                        param_idx: p,
2328                        mul: *mul,
2329                        add: *add,
2330                        off: gen_off,
2331                        bitmap_byte_off,
2332                        bitmap_bit_mask,
2333                    });
2334                } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2335                    ops.push(WriteOp::LiteralI64 {
2336                        value: lit.wrapping_mul(*mul).wrapping_add(*add),
2337                        off: gen_off,
2338                    });
2339                } else {
2340                    return None;
2341                }
2342            }
2343            FastGenEval::None => return None,
2344        }
2345    }
2346
2347    Some(TrivialFastProgram {
2348        template: row_encoder.template.clone(),
2349        ops,
2350        pk_param,
2351        not_null_param_indices,
2352    })
2353}
2354
2355#[derive(Clone)]
2356pub(super) enum CompiledOnConflict {
2357    DoNothing {
2358        target: Option<ConflictKind>,
2359    },
2360    DoUpdate {
2361        target: ConflictKind,
2362        assignments: Vec<(usize, Expr)>,
2363        where_clause: Option<Expr>,
2364        fast_paths: Option<Vec<DoUpdateFastPath>>,
2365    },
2366}
2367
2368#[derive(Clone, Copy)]
2369pub(super) enum DoUpdateFastPath {
2370    IntAddConst { phys_idx: usize, delta: i64 },
2371}
2372
2373#[derive(Clone, Debug)]
2374pub(super) enum ConflictKind {
2375    PrimaryKey,
2376    UniqueIndex { index_idx: usize },
2377}
2378
2379fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2380    match target {
2381        ConflictTarget::Columns(cols) => {
2382            let col_idx_set: Vec<u16> = cols
2383                .iter()
2384                .map(|name| {
2385                    ts.column_index(name)
2386                        .map(|i| i as u16)
2387                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2388                })
2389                .collect::<Result<_>>()?;
2390            let pk_set = ts.primary_key_columns.clone();
2391            if set_equal(&col_idx_set, &pk_set) {
2392                return Ok(ConflictKind::PrimaryKey);
2393            }
2394            for (index_idx, idx) in ts.indices.iter().enumerate() {
2395                if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2396                    return Ok(ConflictKind::UniqueIndex { index_idx });
2397                }
2398            }
2399            Err(SqlError::Plan(
2400                "ON CONFLICT target does not match any unique constraint".into(),
2401            ))
2402        }
2403        ConflictTarget::Constraint(name) => {
2404            let lower = name.to_ascii_lowercase();
2405            for (index_idx, idx) in ts.indices.iter().enumerate() {
2406                if idx.name.eq_ignore_ascii_case(&lower) {
2407                    if idx.unique {
2408                        return Ok(ConflictKind::UniqueIndex { index_idx });
2409                    }
2410                    return Err(SqlError::Plan(format!(
2411                        "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2412                    )));
2413                }
2414            }
2415            Err(SqlError::Plan(format!(
2416                "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2417            )))
2418        }
2419    }
2420}
2421
/// Returns `true` when `a` and `b` hold the same column indices regardless of order.
///
/// Duplicates are significant: the slices are compared as sorted sequences, so
/// `[1, 1, 2]` and `[1, 2, 2]` are not equal.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Identical slices need neither the copies nor the sorts below.
    if a == b {
        return true;
    }
    let mut a_sorted = a.to_vec();
    let mut b_sorted = b.to_vec();
    a_sorted.sort_unstable();
    b_sorted.sort_unstable();
    a_sorted == b_sorted
}
2432
2433pub(super) enum InsertRowOutcome {
2434    Inserted,
2435    Updated { old: Vec<Value>, new: Vec<Value> },
2436    Skipped,
2437}
2438
2439#[allow(clippy::too_many_arguments)]
2440#[inline]
2441pub(super) fn apply_insert_with_conflict(
2442    wtx: &mut WriteTxn<'_>,
2443    table_schema: &TableSchema,
2444    key_buf: &[u8],
2445    value_buf: &[u8],
2446    row: &[Value],
2447    pk_values: &[Value],
2448    on_conflict: &CompiledOnConflict,
2449    col_map: &ColumnMap,
2450    capture_returning: bool,
2451) -> Result<InsertRowOutcome> {
2452    let table_bytes = table_schema.name.as_bytes();
2453
2454    if let CompiledOnConflict::DoNothing { target } = on_conflict {
2455        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2456        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2457            let inserted = wtx
2458                .table_insert_if_absent(table_bytes, key_buf, value_buf)
2459                .map_err(SqlError::Storage)?;
2460            return Ok(if inserted {
2461                InsertRowOutcome::Inserted
2462            } else {
2463                InsertRowOutcome::Skipped
2464            });
2465        }
2466    }
2467
2468    if let CompiledOnConflict::DoUpdate {
2469        target: ConflictKind::PrimaryKey,
2470        assignments,
2471        where_clause,
2472        fast_paths,
2473    } = on_conflict
2474    {
2475        if can_fuse_do_update(table_schema, assignments) {
2476            return apply_do_update_fused(
2477                wtx,
2478                table_schema,
2479                table_bytes,
2480                key_buf,
2481                value_buf,
2482                row,
2483                assignments,
2484                where_clause.as_ref(),
2485                col_map,
2486                fast_paths.as_deref(),
2487                capture_returning,
2488            );
2489        }
2490    }
2491
2492    let primary_outcome = wtx
2493        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2494        .map_err(SqlError::Storage)?;
2495
2496    match primary_outcome {
2497        citadel_txn::write_txn::InsertOutcome::Inserted => {
2498            if table_schema.indices.is_empty() {
2499                return Ok(InsertRowOutcome::Inserted);
2500            }
2501            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2502            match insert_index_entries_or_fetch(
2503                wtx,
2504                table_schema,
2505                row,
2506                pk_values,
2507                &mut inserted_keys,
2508            )? {
2509                None => Ok(InsertRowOutcome::Inserted),
2510                Some(conflicting_idx) => {
2511                    let matches_target =
2512                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2513                            || matches!(
2514                                on_conflict,
2515                                CompiledOnConflict::DoNothing {
2516                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
2517                                } | CompiledOnConflict::DoUpdate {
2518                                    target: ConflictKind::UniqueIndex { index_idx },
2519                                    ..
2520                                } if *index_idx == conflicting_idx
2521                            );
2522                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2523                    if !matches_target {
2524                        return Err(SqlError::UniqueViolation(
2525                            table_schema.indices[conflicting_idx].name.clone(),
2526                        ));
2527                    }
2528                    match on_conflict {
2529                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2530                        CompiledOnConflict::DoUpdate {
2531                            assignments,
2532                            where_clause,
2533                            ..
2534                        } => {
2535                            let existing_pk =
2536                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2537                            apply_do_update(
2538                                wtx,
2539                                table_schema,
2540                                &existing_pk,
2541                                row,
2542                                assignments,
2543                                where_clause.as_ref(),
2544                                col_map,
2545                                capture_returning,
2546                            )
2547                        }
2548                    }
2549                }
2550            }
2551        }
2552        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2553            let matches_target = matches!(
2554                on_conflict,
2555                CompiledOnConflict::DoNothing { target: None }
2556                    | CompiledOnConflict::DoNothing {
2557                        target: Some(ConflictKind::PrimaryKey),
2558                    }
2559                    | CompiledOnConflict::DoUpdate {
2560                        target: ConflictKind::PrimaryKey,
2561                        ..
2562                    }
2563            );
2564            if !matches_target {
2565                return Err(SqlError::DuplicateKey);
2566            }
2567            match on_conflict {
2568                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2569                CompiledOnConflict::DoUpdate {
2570                    assignments,
2571                    where_clause,
2572                    ..
2573                } => {
2574                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2575                    apply_do_update_with_old_row(
2576                        wtx,
2577                        table_schema,
2578                        key_buf,
2579                        &old_row,
2580                        row,
2581                        assignments,
2582                        where_clause.as_ref(),
2583                        col_map,
2584                        capture_returning,
2585                    )
2586                }
2587            }
2588        }
2589    }
2590}
2591
2592#[inline]
2593fn apply_fast_path_patch(
2594    old_bytes: &[u8],
2595    fast_paths: &[DoUpdateFastPath],
2596) -> Result<UpsertAction> {
2597    UPSERT_SCRATCH.with(|slot| {
2598        let mut bufs = slot.borrow_mut();
2599        bufs.new_value_buf.clear();
2600        bufs.new_value_buf.extend_from_slice(old_bytes);
2601
2602        let mut patch_scratch: Vec<u8> = Vec::new();
2603
2604        for fp in fast_paths {
2605            match fp {
2606                DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2607                    let decoded =
2608                        crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2609                    let old_val = &decoded[0];
2610                    let new_val = match old_val {
2611                        Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2612                        Value::Null => Value::Null,
2613                        _ => {
2614                            return Err(SqlError::TypeMismatch {
2615                                expected: "INTEGER".into(),
2616                                got: old_val.data_type().to_string(),
2617                            });
2618                        }
2619                    };
2620                    if !crate::encoding::patch_column_in_place(
2621                        &mut bufs.new_value_buf,
2622                        *phys_idx,
2623                        &new_val,
2624                    )? {
2625                        patch_scratch.clear();
2626                        crate::encoding::patch_row_column(
2627                            &bufs.new_value_buf,
2628                            *phys_idx,
2629                            &new_val,
2630                            &mut patch_scratch,
2631                        )?;
2632                        std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2633                    }
2634                }
2635            }
2636        }
2637
2638        if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2639            return Err(SqlError::RowTooLarge {
2640                size: bufs.new_value_buf.len(),
2641                max: citadel_core::MAX_VALUE_SIZE,
2642            });
2643        }
2644
2645        Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2646    })
2647}
2648
2649fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2650    if !ts.indices.is_empty() {
2651        return true;
2652    }
2653    match oc {
2654        CompiledOnConflict::DoNothing { .. } => false,
2655        CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2656    }
2657}
2658
2659fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2660    if !ts.indices.is_empty() {
2661        return false;
2662    }
2663    if !ts.foreign_keys.is_empty() {
2664        return false;
2665    }
2666    if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2667        return false;
2668    }
2669    let pk = ts.pk_indices();
2670    !assignments.iter().any(|(ci, _)| pk.contains(ci))
2671}
2672
2673#[allow(clippy::too_many_arguments)]
2674#[inline]
2675fn apply_do_update_fused(
2676    wtx: &mut WriteTxn<'_>,
2677    table_schema: &TableSchema,
2678    table_bytes: &[u8],
2679    key_buf: &[u8],
2680    value_buf: &[u8],
2681    proposed_row: &[Value],
2682    assignments: &[(usize, Expr)],
2683    where_clause: Option<&Expr>,
2684    col_map: &ColumnMap,
2685    fast_paths: Option<&[DoUpdateFastPath]>,
2686    capture_returning: bool,
2687) -> Result<InsertRowOutcome> {
2688    let non_pk = table_schema.non_pk_indices();
2689    let enc_pos = table_schema.encoding_positions();
2690    let phys_count = table_schema.physical_non_pk_count();
2691    let dropped = table_schema.dropped_non_pk_slots();
2692    let has_checks = table_schema.has_checks();
2693    let has_fks = !table_schema.foreign_keys.is_empty();
2694
2695    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2696        std::cell::RefCell::new(None);
2697
2698    let outcome =
2699        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2700            if let Some(fps) = fast_paths {
2701                if !has_checks {
2702                    let action = apply_fast_path_patch(old_bytes, fps)?;
2703                    if capture_returning {
2704                        if let UpsertAction::Replace(ref new_bytes) = action {
2705                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2706                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2707                            *captured.borrow_mut() = Some((old_row, new_row));
2708                        }
2709                    }
2710                    return Ok(action);
2711                }
2712            }
2713            UPSERT_SCRATCH.with(|slot| {
2714                let mut bufs = slot.borrow_mut();
2715                let UpsertBufs {
2716                    old_row,
2717                    new_row,
2718                    value_values,
2719                    new_value_buf,
2720                } = &mut *bufs;
2721
2722                old_row.clear();
2723                old_row.resize(table_schema.columns.len(), Value::Null);
2724                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2725
2726                if let Some(w) = where_clause {
2727                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2728                    let result = eval_expr(w, &ctx)?;
2729                    if result.is_null() || !is_truthy(&result) {
2730                        return Ok(UpsertAction::Skip);
2731                    }
2732                }
2733
2734                new_row.clear();
2735                new_row.extend_from_slice(old_row);
2736                for (col_idx, expr) in assignments {
2737                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2738                    let val = eval_expr(expr, &ctx)?;
2739                    let col = &table_schema.columns[*col_idx];
2740                    new_row[*col_idx] = if val.is_null() {
2741                        Value::Null
2742                    } else {
2743                        let got = val.data_type();
2744                        val.coerce_into(col.data_type)
2745                            .ok_or_else(|| SqlError::TypeMismatch {
2746                                expected: col.data_type.to_string(),
2747                                got: got.to_string(),
2748                            })?
2749                    };
2750                }
2751
2752                for (assigned_idx, _) in assignments {
2753                    let col = &table_schema.columns[*assigned_idx];
2754                    if !col.nullable && new_row[col.position as usize].is_null() {
2755                        return Err(SqlError::NotNullViolation(col.name.clone()));
2756                    }
2757                }
2758                if has_checks {
2759                    for col in &table_schema.columns {
2760                        if let Some(ref check) = col.check_expr {
2761                            let ctx = EvalCtx::new(col_map, new_row);
2762                            let result = eval_expr(check, &ctx)?;
2763                            if !is_truthy(&result) && !result.is_null() {
2764                                let name = col.check_name.as_deref().unwrap_or(&col.name);
2765                                return Err(SqlError::CheckViolation(name.to_string()));
2766                            }
2767                        }
2768                    }
2769                    for tc in &table_schema.check_constraints {
2770                        let ctx = EvalCtx::new(col_map, new_row);
2771                        let result = eval_expr(&tc.expr, &ctx)?;
2772                        if !is_truthy(&result) && !result.is_null() {
2773                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
2774                            return Err(SqlError::CheckViolation(name.to_string()));
2775                        }
2776                    }
2777                }
2778                let _ = has_fks;
2779
2780                value_values.clear();
2781                value_values.resize(phys_count, Value::Null);
2782                for &slot in dropped {
2783                    value_values[slot as usize] = Value::Null;
2784                }
2785                for (j, &i) in non_pk.iter().enumerate() {
2786                    value_values[enc_pos[j] as usize] = new_row[i].clone();
2787                }
2788                new_value_buf.clear();
2789                crate::encoding::encode_row_into(value_values, new_value_buf);
2790
2791                if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2792                    return Err(SqlError::RowTooLarge {
2793                        size: new_value_buf.len(),
2794                        max: citadel_core::MAX_VALUE_SIZE,
2795                    });
2796                }
2797
2798                if capture_returning {
2799                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2800                }
2801                Ok(UpsertAction::Replace(new_value_buf.clone()))
2802            })
2803        })?;
2804
2805    match outcome {
2806        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2807        UpsertOutcome::Updated => {
2808            if capture_returning {
2809                let (old, new) = captured.into_inner().ok_or_else(|| {
2810                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2811                })?;
2812                Ok(InsertRowOutcome::Updated { old, new })
2813            } else {
2814                Ok(InsertRowOutcome::Inserted)
2815            }
2816        }
2817        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2818    }
2819}
2820
2821fn fetch_unique_index_pk(
2822    wtx: &mut WriteTxn<'_>,
2823    table_schema: &TableSchema,
2824    index_idx: usize,
2825    row: &[Value],
2826) -> Result<Vec<u8>> {
2827    let idx = &table_schema.indices[index_idx];
2828    let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2829    let indexed: Vec<Value> = idx
2830        .column_positions_iter()
2831        .map(|col_idx| row[col_idx as usize].clone())
2832        .collect();
2833    let key = crate::encoding::encode_composite_key(&indexed);
2834    let value = wtx
2835        .table_get(&idx_table, &key)
2836        .map_err(SqlError::Storage)?
2837        .ok_or_else(|| {
2838            SqlError::InvalidValue("unique index missing expected collision entry".into())
2839        })?;
2840    Ok(value)
2841}
2842
2843#[allow(clippy::too_many_arguments)]
2844fn apply_do_update(
2845    wtx: &mut WriteTxn<'_>,
2846    table_schema: &TableSchema,
2847    pk_key: &[u8],
2848    proposed_row: &[Value],
2849    assignments: &[(usize, Expr)],
2850    where_clause: Option<&Expr>,
2851    col_map: &ColumnMap,
2852    capture_returning: bool,
2853) -> Result<InsertRowOutcome> {
2854    let old_value = wtx
2855        .table_get(table_schema.name.as_bytes(), pk_key)
2856        .map_err(SqlError::Storage)?
2857        .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2858    let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2859    apply_do_update_with_old_row(
2860        wtx,
2861        table_schema,
2862        pk_key,
2863        &old_row,
2864        proposed_row,
2865        assignments,
2866        where_clause,
2867        col_map,
2868        capture_returning,
2869    )
2870}
2871
2872#[allow(clippy::too_many_arguments)]
2873fn apply_do_update_with_old_row(
2874    wtx: &mut WriteTxn<'_>,
2875    table_schema: &TableSchema,
2876    old_pk_key: &[u8],
2877    old_row: &[Value],
2878    proposed_row: &[Value],
2879    assignments: &[(usize, Expr)],
2880    where_clause: Option<&Expr>,
2881    col_map: &ColumnMap,
2882    capture_returning: bool,
2883) -> Result<InsertRowOutcome> {
2884    if let Some(w) = where_clause {
2885        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2886        let result = eval_expr(w, &ctx)?;
2887        if result.is_null() || !is_truthy(&result) {
2888            return Ok(InsertRowOutcome::Skipped);
2889        }
2890    }
2891
2892    let mut new_row = old_row.to_vec();
2893    for (col_idx, expr) in assignments {
2894        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2895        let val = eval_expr(expr, &ctx)?;
2896        let col = &table_schema.columns[*col_idx];
2897        new_row[*col_idx] = if val.is_null() {
2898            Value::Null
2899        } else {
2900            let got = val.data_type();
2901            val.coerce_into(col.data_type)
2902                .ok_or_else(|| SqlError::TypeMismatch {
2903                    expected: col.data_type.to_string(),
2904                    got: got.to_string(),
2905                })?
2906        };
2907    }
2908
2909    for col in &table_schema.columns {
2910        if matches!(
2911            col.generated_kind,
2912            Some(crate::parser::GeneratedKind::Stored)
2913        ) {
2914            let val = eval_expr(
2915                col.generated_expr.as_ref().unwrap(),
2916                &EvalCtx::new(col_map, &new_row),
2917            )?;
2918            let pos = col.position as usize;
2919            new_row[pos] = if val.is_null() {
2920                if !col.nullable {
2921                    return Err(SqlError::NotNullViolation(col.name.clone()));
2922                }
2923                Value::Null
2924            } else {
2925                let got = val.data_type();
2926                val.coerce_into(col.data_type)
2927                    .ok_or_else(|| SqlError::TypeMismatch {
2928                        expected: col.data_type.to_string(),
2929                        got: got.to_string(),
2930                    })?
2931            };
2932        }
2933    }
2934
2935    let pk_indices = table_schema.pk_indices();
2936    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2937    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2938
2939    for (assigned_idx, _) in assignments {
2940        let col = &table_schema.columns[*assigned_idx];
2941        if !col.nullable && new_row[col.position as usize].is_null() {
2942            return Err(SqlError::NotNullViolation(col.name.clone()));
2943        }
2944    }
2945    if table_schema.has_checks() {
2946        for col in &table_schema.columns {
2947            if let Some(ref check) = col.check_expr {
2948                let ctx = EvalCtx::new(col_map, &new_row);
2949                let result = eval_expr(check, &ctx)?;
2950                if !is_truthy(&result) && !result.is_null() {
2951                    let name = col.check_name.as_deref().unwrap_or(&col.name);
2952                    return Err(SqlError::CheckViolation(name.to_string()));
2953                }
2954            }
2955        }
2956        for tc in &table_schema.check_constraints {
2957            let ctx = EvalCtx::new(col_map, &new_row);
2958            let result = eval_expr(&tc.expr, &ctx)?;
2959            if !is_truthy(&result) && !result.is_null() {
2960                let name = tc.name.as_deref().unwrap_or(&tc.sql);
2961                return Err(SqlError::CheckViolation(name.to_string()));
2962            }
2963        }
2964    }
2965    for fk in &table_schema.foreign_keys {
2966        let changed = fk
2967            .columns
2968            .iter()
2969            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2970        if !changed {
2971            continue;
2972        }
2973        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2974        if any_null {
2975            continue;
2976        }
2977        let fk_vals: Vec<Value> = fk
2978            .columns
2979            .iter()
2980            .map(|&ci| new_row[ci as usize].clone())
2981            .collect();
2982        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2983        if fk.deferrable && fk.initially_deferred {
2984            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
2985            wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
2986                fk_name: name,
2987                foreign_table: fk.foreign_table.as_bytes().to_vec(),
2988                parent_key: fk_key,
2989            });
2990            continue;
2991        }
2992        if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
2993            let found = wtx
2994                .table_get(fk.foreign_table.as_bytes(), &fk_key)
2995                .map_err(SqlError::Storage)?;
2996            if found.is_none() {
2997                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
2998                return Err(SqlError::ForeignKeyViolation(name.to_string()));
2999            }
3000            wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
3001        }
3002    }
3003
3004    let has_indices = !table_schema.indices.is_empty();
3005    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
3006        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
3007    } else {
3008        Vec::new()
3009    };
3010    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
3011        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
3012    } else {
3013        Vec::new()
3014    };
3015
3016    let non_pk = table_schema.non_pk_indices();
3017    let enc_pos = table_schema.encoding_positions();
3018    let phys_count = table_schema.physical_non_pk_count();
3019    let dropped = table_schema.dropped_non_pk_slots();
3020    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
3021    for &slot in dropped {
3022        value_values[slot as usize] = Value::Null;
3023    }
3024    for (j, &i) in non_pk.iter().enumerate() {
3025        let col = &table_schema.columns[i];
3026        value_values[enc_pos[j] as usize] = if matches!(
3027            col.generated_kind,
3028            Some(crate::parser::GeneratedKind::Virtual)
3029        ) {
3030            Value::Null
3031        } else {
3032            new_row[i].clone()
3033        };
3034    }
3035    let mut new_value_buf = Vec::with_capacity(256);
3036    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
3037
3038    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
3039        return Err(SqlError::RowTooLarge {
3040            size: new_value_buf.len(),
3041            max: citadel_core::MAX_VALUE_SIZE,
3042        });
3043    }
3044
3045    if pk_changed {
3046        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
3047        let inserted = wtx
3048            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
3049            .map_err(SqlError::Storage)?;
3050        if !inserted {
3051            return Err(SqlError::DuplicateKey);
3052        }
3053        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
3054            .map_err(SqlError::Storage)?;
3055        for idx in &table_schema.indices {
3056            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3057            let old_idx_key =
3058                encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3059            wtx.table_delete(&idx_table, &old_idx_key)
3060                .map_err(SqlError::Storage)?;
3061            let new_idx_key =
3062                encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3063            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3064            let is_new = wtx
3065                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3066                .map_err(SqlError::Storage)?;
3067            if idx.unique && !is_new {
3068                let any_null = idx
3069                    .column_positions_iter()
3070                    .any(|c| new_row[c as usize].is_null());
3071                if !any_null {
3072                    return Err(SqlError::UniqueViolation(idx.name.clone()));
3073                }
3074            }
3075        }
3076    } else {
3077        wtx.table_update_sorted(
3078            table_schema.name.as_bytes(),
3079            &[(old_pk_key, new_value_buf.as_slice())],
3080        )
3081        .map_err(SqlError::Storage)?;
3082        let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
3083        for idx in &table_schema.indices {
3084            let cols_changed = index_columns_changed(idx, old_row, &new_row);
3085            let (del, ins) = partial_idx_update_actions(
3086                idx,
3087                old_row,
3088                &new_row,
3089                cols_changed,
3090                false,
3091                col_map_partial,
3092            );
3093            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3094            if del {
3095                let old_idx_key =
3096                    encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3097                wtx.table_delete(&idx_table, &old_idx_key)
3098                    .map_err(SqlError::Storage)?;
3099            }
3100            if ins {
3101                let new_idx_key =
3102                    encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3103                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3104                let is_new = wtx
3105                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3106                    .map_err(SqlError::Storage)?;
3107                if idx.unique && !is_new {
3108                    let any_null = idx
3109                        .column_positions_iter()
3110                        .any(|c| new_row[c as usize].is_null());
3111                    if !any_null {
3112                        return Err(SqlError::UniqueViolation(idx.name.clone()));
3113                    }
3114                }
3115            }
3116        }
3117    }
3118
3119    if capture_returning {
3120        Ok(InsertRowOutcome::Updated {
3121            old: old_row.to_vec(),
3122            new: new_row,
3123        })
3124    } else {
3125        Ok(InsertRowOutcome::Inserted)
3126    }
3127}
3128
3129fn detect_fast_paths(
3130    ts: &TableSchema,
3131    assignments: &[(usize, Expr)],
3132) -> Option<Vec<DoUpdateFastPath>> {
3133    let non_pk = ts.non_pk_indices();
3134    let enc_pos = ts.encoding_positions();
3135    let mut out = Vec::with_capacity(assignments.len());
3136    for (col_idx, expr) in assignments {
3137        let col = &ts.columns[*col_idx];
3138        if col.data_type != DataType::Integer {
3139            return None;
3140        }
3141        let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3142        let phys_idx = enc_pos[nonpk_order] as usize;
3143
3144        if let Expr::BinaryOp { left, op, right } = expr {
3145            if !matches!(op, BinOp::Add | BinOp::Sub) {
3146                return None;
3147            }
3148            let reads_target =
3149                matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3150            if !reads_target {
3151                return None;
3152            }
3153            if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3154                let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3155                let _ = col_idx;
3156                out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3157                continue;
3158            }
3159            return None;
3160        }
3161        return None;
3162    }
3163    Some(out)
3164}
3165
3166fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3167    let target = oc
3168        .target
3169        .as_ref()
3170        .map(|t| resolve_conflict_target(t, ts))
3171        .transpose()?;
3172    match &oc.action {
3173        OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3174        OnConflictAction::DoUpdate {
3175            assignments,
3176            where_clause,
3177        } => {
3178            let target = target.ok_or_else(|| {
3179                SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3180            })?;
3181            let compiled_assignments: Vec<(usize, Expr)> = assignments
3182                .iter()
3183                .map(|(name, expr)| {
3184                    let col_idx = ts
3185                        .column_index(name)
3186                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3187                    Ok((col_idx, expr.clone()))
3188                })
3189                .collect::<Result<_>>()?;
3190            let fast_paths = if where_clause.is_none() {
3191                detect_fast_paths(ts, &compiled_assignments)
3192            } else {
3193                None
3194            };
3195            Ok(CompiledOnConflict::DoUpdate {
3196                target,
3197                assignments: compiled_assignments,
3198                where_clause: where_clause.clone(),
3199                fast_paths,
3200            })
3201        }
3202    }
3203}
3204
3205/// Caller MUST check `cache.is_trivial_fast` first.
3206fn exec_insert_trivial_fast(
3207    wtx: &mut WriteTxn<'_>,
3208    table_lower: &str,
3209    cache: &InsertCache,
3210    bufs: &mut InsertBufs,
3211    params: &[Value],
3212) -> Result<ExecutionResult> {
3213    let prog = cache
3214        .trivial_fast_program
3215        .as_ref()
3216        .expect("trivial fast: program");
3217
3218    for &p in &prog.not_null_param_indices {
3219        if params[p as usize].is_null() {
3220            return Err(SqlError::NotNullViolation(format!("param@{p}")));
3221        }
3222    }
3223
3224    match &params[prog.pk_param as usize] {
3225        Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3226        _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3227    }
3228
3229    bufs.value_buf.clear();
3230    bufs.value_buf.extend_from_slice(&prog.template);
3231
3232    for op in &prog.ops {
3233        match op {
3234            WriteOp::ParamI64 { param_idx, off } => match &params[*param_idx as usize] {
3235                Value::Integer(v) => {
3236                    let off = *off as usize;
3237                    bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3238                }
3239                other => {
3240                    return Err(SqlError::TypeMismatch {
3241                        expected: "Integer".into(),
3242                        got: other.data_type().to_string(),
3243                    });
3244                }
3245            },
3246            WriteOp::LiteralI64 { value, off } => {
3247                let off = *off as usize;
3248                bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3249            }
3250            WriteOp::GenAddParamsI64 {
3251                a_param,
3252                b_param,
3253                off,
3254                bitmap_byte_off,
3255                bitmap_bit_mask,
3256            } => match (&params[*a_param as usize], &params[*b_param as usize]) {
3257                (Value::Integer(a), Value::Integer(b)) => {
3258                    let off = *off as usize;
3259                    bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3260                }
3261                _ => {
3262                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3263                }
3264            },
3265            WriteOp::GenMulAddParamI64 {
3266                param_idx,
3267                mul,
3268                add,
3269                off,
3270                bitmap_byte_off,
3271                bitmap_bit_mask,
3272            } => match &params[*param_idx as usize] {
3273                Value::Integer(v) => {
3274                    let r = v.wrapping_mul(*mul).wrapping_add(*add);
3275                    let off = *off as usize;
3276                    bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3277                }
3278                _ => {
3279                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3280                }
3281            },
3282        }
3283    }
3284
3285    let is_new = wtx
3286        .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3287        .map_err(SqlError::Storage)?;
3288    if !is_new {
3289        return Err(SqlError::DuplicateKey);
3290    }
3291    Ok(ExecutionResult::RowsAffected(1))
3292}
3293
3294fn build_bind_plan(
3295    stmt: &InsertStmt,
3296    col_indices: &[usize],
3297    col_data_types: &[DataType],
3298) -> Option<Vec<BindAction>> {
3299    let rows = match &stmt.source {
3300        InsertSource::Values(rows) => rows,
3301        _ => return None,
3302    };
3303    if rows.len() != 1 {
3304        return None;
3305    }
3306    let value_row = &rows[0];
3307    if value_row.len() != col_indices.len() {
3308        return None;
3309    }
3310    let mut plan = Vec::with_capacity(value_row.len());
3311    for (i, expr) in value_row.iter().enumerate() {
3312        let col_idx = col_indices[i];
3313        let target = col_data_types[col_idx];
3314        match expr {
3315            Expr::Parameter(n) => {
3316                if *n == 0 {
3317                    return None;
3318                }
3319                plan.push(BindAction::Param {
3320                    param_idx: n - 1,
3321                    col_idx,
3322                    target,
3323                });
3324            }
3325            Expr::Literal(v) => plan.push(BindAction::Literal {
3326                value: v.clone(),
3327                col_idx,
3328            }),
3329            _ => return None,
3330        }
3331    }
3332    Some(plan)
3333}
3334
3335impl CompiledInsert {
3336    pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3337        let lower = stmt.table.to_ascii_lowercase();
3338        let cached = if let Some(ts) = schema.get(&lower) {
3339            let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3340                ts.columns.iter().map(|c| c.name.as_str()).collect()
3341            } else {
3342                stmt.columns.iter().map(|s| s.as_str()).collect()
3343            };
3344            let mut col_indices = Vec::with_capacity(insert_columns.len());
3345            for name in &insert_columns {
3346                col_indices.push(ts.column_index(name)?);
3347            }
3348            if col_indices
3349                .iter()
3350                .any(|&ci| ts.columns[ci].generated_kind.is_some())
3351            {
3352                return None;
3353            }
3354            let on_conflict = stmt
3355                .on_conflict
3356                .as_ref()
3357                .map(|oc| compile_on_conflict(oc, ts))
3358                .transpose()
3359                .ok()
3360                .flatten()
3361                .map(Arc::new);
3362            let generated_col_positions: Vec<usize> = ts
3363                .columns
3364                .iter()
3365                .enumerate()
3366                .filter_map(|(i, c)| {
3367                    matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3368                        .then_some(i)
3369                })
3370                .collect();
3371            let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3372                .iter()
3373                .map(|&pos| {
3374                    detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3375                })
3376                .collect();
3377            let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3378                Some(ColumnMap::new(&ts.columns))
3379            } else {
3380                None
3381            };
3382            let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3383            let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3384            let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3385            let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3386            let phys_count = ts.physical_non_pk_count();
3387            let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3388            let single_int_pk =
3389                pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3390            let not_null_indices: Vec<u16> = ts
3391                .columns
3392                .iter()
3393                .filter(|c| !c.nullable)
3394                .map(|c| c.position)
3395                .collect();
3396            let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3397            let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3398            let row_fully_overwritten = if any_defaults_flag {
3399                false
3400            } else {
3401                let mut covered: rustc_hash::FxHashSet<usize> =
3402                    col_indices.iter().copied().collect();
3403                covered.extend(generated_col_positions.iter().copied());
3404                for (j, &i) in non_pk_indices.iter().enumerate() {
3405                    let _ = j;
3406                    if matches!(
3407                        ts.columns[i].generated_kind,
3408                        Some(crate::parser::GeneratedKind::Virtual)
3409                    ) {
3410                        covered.insert(i);
3411                    }
3412                }
3413                bind_plan.is_some() && covered.len() == ts.columns.len()
3414            };
3415            let has_fks = !ts.foreign_keys.is_empty();
3416            let has_indices = !ts.indices.is_empty();
3417            let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3418            let mut null_value_slots: Vec<usize> =
3419                dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3420            for (j, &i) in non_pk_indices.iter().enumerate() {
3421                let slot = encoding_positions[j] as usize;
3422                if matches!(
3423                    ts.columns[i].generated_kind,
3424                    Some(crate::parser::GeneratedKind::Virtual)
3425                ) {
3426                    null_value_slots.push(slot);
3427                } else {
3428                    non_virtual_pairs.push((i, slot));
3429                }
3430            }
3431            let row_encoder = {
3432                let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3433                    let col = &ts.columns[i];
3434                    if matches!(
3435                        col.generated_kind,
3436                        Some(crate::parser::GeneratedKind::Virtual)
3437                    ) {
3438                        true
3439                    } else {
3440                        col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3441                    }
3442                });
3443                if all_int_or_null {
3444                    let mut null_slots: Vec<usize> =
3445                        dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3446                    for (j, &i) in non_pk_indices.iter().enumerate() {
3447                        if matches!(
3448                            ts.columns[i].generated_kind,
3449                            Some(crate::parser::GeneratedKind::Virtual)
3450                        ) {
3451                            null_slots.push(encoding_positions[j] as usize);
3452                        }
3453                    }
3454                    Some(crate::encoding::build_int_row_template(
3455                        phys_count,
3456                        &null_slots,
3457                    ))
3458                } else {
3459                    None
3460                }
3461            };
3462            let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3463                && !ts.columns.iter().any(|c| c.default_expr.is_some())
3464                && !ts.has_checks()
3465                && !has_fks
3466                && !has_indices
3467                && stmt.on_conflict.is_none()
3468                && stmt.returning.is_none()
3469                && bind_plan.is_some()
3470                && row_encoder.is_some()
3471                && row_fully_overwritten
3472                && single_int_pk
3473                && generated_fast_evals
3474                    .iter()
3475                    .all(|fe| !matches!(fe, FastGenEval::None));
3476            let trivial_fast_program = if is_trivial_fast_eligible {
3477                build_trivial_fast_program(
3478                    bind_plan.as_ref().unwrap(),
3479                    row_encoder.as_ref().unwrap(),
3480                    &non_virtual_pairs,
3481                    &generated_col_positions,
3482                    &generated_fast_evals,
3483                    &pk_indices,
3484                    &ts.columns,
3485                )
3486            } else {
3487                None
3488            };
3489            let is_trivial_fast = trivial_fast_program.is_some();
3490            let has_checks = ts.has_checks();
3491            let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3492            let needs_scoped_params = bind_plan.is_none()
3493                || has_checks
3494                || any_defaults
3495                || !generated_col_positions.is_empty()
3496                || on_conflict.is_some()
3497                || stmt.returning.is_some()
3498                || insert_has_subquery(stmt)
3499                || super::helpers::any_partial_index(ts);
3500            Some(InsertCache {
3501                col_indices,
3502                has_subquery: insert_has_subquery(stmt),
3503                any_defaults,
3504                has_checks,
3505                on_conflict,
3506                row_col_map,
3507                generated_col_positions,
3508                generated_fast_evals,
3509                pk_indices,
3510                non_pk_indices,
3511                encoding_positions,
3512                dropped_non_pk_slots,
3513                phys_count,
3514                single_int_pk,
3515                not_null_indices,
3516                bind_plan,
3517                row_fully_overwritten,
3518                row_encoder,
3519                is_trivial_fast,
3520                trivial_fast_program,
3521                needs_scoped_params,
3522            })
3523        } else if schema.get_view(&lower).is_some() {
3524            None
3525        } else {
3526            return None;
3527        };
3528        Some(Self {
3529            table_lower: lower,
3530            cached,
3531        })
3532    }
3533}
3534
3535impl CompiledPlan for CompiledInsert {
3536    fn execute(
3537        &self,
3538        db: &Database,
3539        schema: &SchemaManager,
3540        stmt: &Statement,
3541        params: &[Value],
3542        txn: super::compile::ActiveTxnRef<'_, '_>,
3543    ) -> Result<ExecutionResult> {
3544        let ins = match stmt {
3545            Statement::Insert(i) => i,
3546            _ => {
3547                return Err(SqlError::Unsupported(
3548                    "CompiledInsert received non-INSERT statement".into(),
3549                ))
3550            }
3551        };
3552        use super::compile::ActiveTxnRef;
3553        match txn {
3554            ActiveTxnRef::None => exec_insert(db, schema, ins, params),
3555            ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3556                "cannot execute mutating statement inside a read-only transaction".into(),
3557            )),
3558            ActiveTxnRef::Write(outer) => match self.cached.as_ref() {
3559                Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3560                    exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3561                }),
3562                Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3563                None => exec_insert_in_txn(outer, schema, ins, params),
3564            },
3565        }
3566    }
3567
3568    fn uses_scoped_params(&self) -> bool {
3569        match self.cached.as_ref() {
3570            Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3571            None => true,
3572        }
3573    }
3574}
3575
3576pub struct CompiledDelete {
3577    table_lower: String,
3578}
3579
3580impl CompiledDelete {
3581    pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3582        let lower = stmt.table.to_ascii_lowercase();
3583        schema.get(&lower)?;
3584        Some(Self { table_lower: lower })
3585    }
3586}
3587
3588impl CompiledPlan for CompiledDelete {
3589    fn execute(
3590        &self,
3591        db: &Database,
3592        schema: &SchemaManager,
3593        stmt: &Statement,
3594        _params: &[Value],
3595        txn: super::compile::ActiveTxnRef<'_, '_>,
3596    ) -> Result<ExecutionResult> {
3597        let del = match stmt {
3598            Statement::Delete(d) => d,
3599            _ => {
3600                return Err(SqlError::Unsupported(
3601                    "CompiledDelete received non-DELETE statement".into(),
3602                ))
3603            }
3604        };
3605        let _ = &self.table_lower;
3606        use super::compile::ActiveTxnRef;
3607        match txn {
3608            ActiveTxnRef::None => super::write::exec_delete(db, schema, del),
3609            ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3610                "cannot execute mutating statement inside a read-only transaction".into(),
3611            )),
3612            ActiveTxnRef::Write(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3613        }
3614    }
3615}
3616
3617fn exec_instead_of_view_insert_auto(
3618    db: &Database,
3619    schema: &SchemaManager,
3620    view_name: &str,
3621    aliases: &[String],
3622    stmt: &InsertStmt,
3623    params: &[Value],
3624) -> Result<ExecutionResult> {
3625    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3626    let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3627    wtx.commit().map_err(SqlError::Storage)?;
3628    Ok(r)
3629}
3630
3631fn exec_instead_of_view_insert_in_txn(
3632    wtx: &mut WriteTxn<'_>,
3633    schema: &SchemaManager,
3634    view_name: &str,
3635    aliases: &[String],
3636    stmt: &InsertStmt,
3637    params: &[Value],
3638) -> Result<ExecutionResult> {
3639    // CREATE VIEW without explicit aliases stores an empty vec — derive at runtime.
3640    let resolved_aliases: Vec<String> = if aliases.is_empty() {
3641        derive_view_columns(wtx, schema, view_name)?
3642    } else {
3643        aliases.to_vec()
3644    };
3645    let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3646    let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3647        .iter()
3648        .enumerate()
3649        .map(|(i, name)| (name.to_ascii_lowercase(), i))
3650        .collect();
3651
3652    let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3653        (0..resolved_aliases.len()).collect()
3654    } else {
3655        stmt.columns
3656            .iter()
3657            .map(|c| {
3658                alias_map
3659                    .get(&c.to_ascii_lowercase())
3660                    .copied()
3661                    .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3662            })
3663            .collect::<Result<_>>()?
3664    };
3665
3666    let source_rows: Vec<Vec<Value>> = match &stmt.source {
3667        InsertSource::Values(rows) => {
3668            let mut out = Vec::with_capacity(rows.len());
3669            for row in rows {
3670                if row.len() != target_positions.len() {
3671                    return Err(SqlError::InvalidValue(format!(
3672                        "expected {} values, got {}",
3673                        target_positions.len(),
3674                        row.len()
3675                    )));
3676                }
3677                let mut vals = Vec::with_capacity(row.len());
3678                for expr in row {
3679                    let v = match expr {
3680                        Expr::Parameter(n) => params
3681                            .get(n - 1)
3682                            .cloned()
3683                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3684                        Expr::Literal(v) => v.clone(),
3685                        other => eval_const_expr(other)?,
3686                    };
3687                    vals.push(v);
3688                }
3689                out.push(vals);
3690            }
3691            out
3692        }
3693        InsertSource::Select(sq) => {
3694            let empty_ctes = CteContext::default();
3695            let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3696            qr.rows
3697        }
3698    };
3699
3700    let mut count: u64 = 0;
3701    for row in source_rows {
3702        if row.len() != target_positions.len() {
3703            return Err(SqlError::InvalidValue(format!(
3704                "expected {} values, got {}",
3705                target_positions.len(),
3706                row.len()
3707            )));
3708        }
3709        let mut new_row = vec![Value::Null; resolved_aliases.len()];
3710        for (slot, val) in target_positions.iter().zip(row) {
3711            new_row[*slot] = val;
3712        }
3713        super::triggers::fire_row_triggers(
3714            wtx,
3715            schema,
3716            view_name,
3717            crate::parser::TriggerTiming::InsteadOf,
3718            super::triggers::FireEvent::Insert,
3719            None,
3720            Some(new_row),
3721            &view_cols,
3722        )?;
3723        count += 1;
3724    }
3725    Ok(ExecutionResult::RowsAffected(count))
3726}
3727
3728fn derive_view_columns(
3729    wtx: &mut WriteTxn<'_>,
3730    schema: &SchemaManager,
3731    view_name: &str,
3732) -> Result<Vec<String>> {
3733    use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3734    let sel = SelectStmt {
3735        columns: vec![SelectColumn::AllColumns],
3736        from: view_name.to_string(),
3737        from_alias: None,
3738        from_subquery: None,
3739        from_args: None,
3740        from_json_table: None,
3741        joins: vec![],
3742        distinct: false,
3743        where_clause: None,
3744        order_by: vec![],
3745        limit: Some(Expr::Literal(Value::Integer(1))),
3746        offset: None,
3747        group_by: vec![],
3748        having: None,
3749    };
3750    let sq = SelectQuery {
3751        ctes: vec![],
3752        recursive: false,
3753        body: QueryBody::Select(Box::new(sel)),
3754    };
3755    let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3756    match qr {
3757        ExecutionResult::Query(q) => Ok(q.columns),
3758        _ => Ok(Vec::new()),
3759    }
3760}
3761
3762#[cfg(test)]
3763#[path = "dml_tests.rs"]
3764mod tests;