Skip to main content

citadel_sql/executor/
dml.rs

1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::read_txn::ReadTxn;
7use citadel_txn::write_txn::WriteTxn;
8use rustc_hash::FxHashMap;
9
10use crate::encoding::{encode_composite_key_into, encode_row_into};
11use crate::error::{Result, SqlError};
12use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
13use crate::parser::*;
14use crate::types::*;
15
16use crate::schema::SchemaManager;
17
18use super::compile::CompiledPlan;
19use super::helpers::*;
20use super::CteContext;
21
22pub(super) fn exec_insert(
23    db: &Database,
24    schema: &SchemaManager,
25    stmt: &InsertStmt,
26    params: &[Value],
27) -> Result<ExecutionResult> {
28    let empty_ctes = CteContext::default();
29    let materialized;
30    let stmt = if insert_has_subquery(stmt) {
31        materialized = materialize_insert(stmt, &mut |sub| {
32            exec_subquery_read(db, schema, sub, &empty_ctes)
33        })?;
34        &materialized
35    } else {
36        stmt
37    };
38
39    let lower_name = stmt.table.to_ascii_lowercase();
40    if let Some(view_def) = schema.get_view(&lower_name) {
41        if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
42        {
43            let aliases = view_def.column_aliases.clone();
44            return exec_instead_of_view_insert_auto(
45                db,
46                schema,
47                &lower_name,
48                &aliases,
49                stmt,
50                params,
51            );
52        }
53        return Err(SqlError::CannotModifyView(stmt.table.clone()));
54    }
55    if schema.get_matview(&lower_name).is_some() {
56        return Err(SqlError::CannotModifyView(format!(
57            "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
58            stmt.table
59        )));
60    }
61    let table_schema = schema
62        .get(&lower_name)
63        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
64    schema.mark_dml(&table_schema.name);
65
66    let insert_columns = if stmt.columns.is_empty() {
67        table_schema
68            .columns
69            .iter()
70            .map(|c| c.name.clone())
71            .collect::<Vec<_>>()
72    } else {
73        stmt.columns
74            .iter()
75            .map(|c| c.to_ascii_lowercase())
76            .collect()
77    };
78
79    let col_indices: Vec<usize> = insert_columns
80        .iter()
81        .map(|name| {
82            table_schema
83                .column_index(name)
84                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
85        })
86        .collect::<Result<_>>()?;
87
88    for &ci in &col_indices {
89        if table_schema.columns[ci].generated_kind.is_some() {
90            return Err(SqlError::CannotInsertIntoGeneratedColumn(
91                table_schema.columns[ci].name.clone(),
92            ));
93        }
94    }
95
96    let defaults: Vec<(usize, &Expr)> = table_schema
97        .columns
98        .iter()
99        .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
100        .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
101        .collect();
102
103    let generated_cols: Vec<(usize, &Expr)> = table_schema
104        .columns
105        .iter()
106        .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
107        .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
108        .collect();
109
110    let has_checks = table_schema.has_checks();
111    let strict = table_schema.is_strict();
112    let row_col_map_for_gen = if !generated_cols.is_empty() {
113        Some(ColumnMap::new(&table_schema.columns))
114    } else {
115        None
116    };
117    let check_col_map = if has_checks {
118        Some(ColumnMap::new(&table_schema.columns))
119    } else {
120        None
121    };
122
123    let select_rows = match &stmt.source {
124        InsertSource::Select(sq) => {
125            let insert_ctes =
126                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
127                    exec_query_body_read(db, schema, body, ctx)
128                })?;
129            let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
130            Some(qr.rows)
131        }
132        InsertSource::Values(_) => None,
133    };
134
135    let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
136        .on_conflict
137        .as_ref()
138        .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
139        .transpose()?;
140
141    let row_col_map = compiled_conflict
142        .as_ref()
143        .map(|_| ColumnMap::new(&table_schema.columns));
144
145    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
146    // DML invalidates the table's persisted ANN segment in the SAME txn
147    // (rollback restores it; commit makes table-changed-but-segment-survives
148    // unrepresentable for this path).
149    super::ann_persist::purge_segment(&mut wtx, &table_schema.name)?;
150    let mut count: u64 = 0;
151    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
152        stmt.returning.as_ref().map(|_| Vec::new());
153
154    let pk_indices = table_schema.pk_indices();
155    let non_pk = table_schema.non_pk_indices();
156    let enc_pos = table_schema.encoding_positions();
157    let phys_count = table_schema.physical_non_pk_count();
158    let mut row = vec![Value::Null; table_schema.columns.len()];
159    let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
160    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
161    let mut key_buf: Vec<u8> = Vec::with_capacity(64);
162    let mut value_buf: Vec<u8> = Vec::with_capacity(256);
163    let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
164
165    let values = match &stmt.source {
166        InsertSource::Values(rows) => Some(rows.as_slice()),
167        InsertSource::Select(_) => None,
168    };
169    let sel_rows = select_rows.as_deref();
170
171    let total = match (values, sel_rows) {
172        (Some(rows), _) => rows.len(),
173        (_, Some(rows)) => rows.len(),
174        _ => 0,
175    };
176
177    if let Some(sel) = sel_rows {
178        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
179            return Err(SqlError::InvalidValue(format!(
180                "INSERT ... SELECT column count mismatch: expected {}, got {}",
181                insert_columns.len(),
182                sel[0].len()
183            )));
184        }
185    }
186
187    let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
188        t.enabled
189            && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
190            && t.events
191                .iter()
192                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
193    });
194    let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
195        Vec::with_capacity(total)
196    } else {
197        Vec::new()
198    };
199
200    if has_insert_statement_triggers {
201        super::triggers::fire_statement_triggers(
202            &mut wtx,
203            schema,
204            &table_schema.name,
205            crate::parser::TriggerTiming::Before,
206            super::triggers::FireEvent::Insert,
207            &table_schema.columns,
208            &[],
209            &[],
210        )?;
211    }
212
213    for idx in 0..total {
214        for v in row.iter_mut() {
215            *v = Value::Null;
216        }
217
218        if let Some(value_rows) = values {
219            let value_row = &value_rows[idx];
220            if value_row.len() != insert_columns.len() {
221                return Err(SqlError::InvalidValue(format!(
222                    "expected {} values, got {}",
223                    insert_columns.len(),
224                    value_row.len()
225                )));
226            }
227            for (i, expr) in value_row.iter().enumerate() {
228                let val = if let Expr::Parameter(n) = expr {
229                    params
230                        .get(n - 1)
231                        .cloned()
232                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
233                } else {
234                    eval_const_expr(expr)?
235                };
236                let col_idx = col_indices[i];
237                let col = &table_schema.columns[col_idx];
238                row[col_idx] = if val.is_null() {
239                    Value::Null
240                } else {
241                    coerce_for_column(val, col, strict)?
242                };
243            }
244        } else if let Some(sel) = sel_rows {
245            let sel_row = &sel[idx];
246            for (i, val) in sel_row.iter().enumerate() {
247                let col_idx = col_indices[i];
248                let col = &table_schema.columns[col_idx];
249                row[col_idx] = if val.is_null() {
250                    Value::Null
251                } else {
252                    coerce_for_column(val.clone(), col, strict)?
253                };
254            }
255        }
256
257        for &(pos, def_expr) in &defaults {
258            let val = eval_const_expr(def_expr)?;
259            let col = &table_schema.columns[pos];
260            if !val.is_null() {
261                row[pos] = coerce_for_column(val, col, strict)?;
262            }
263        }
264
265        if let Some(ref gen_map) = row_col_map_for_gen {
266            for &(pos, gen_expr) in &generated_cols {
267                let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
268                let col = &table_schema.columns[pos];
269                row[pos] = if val.is_null() {
270                    Value::Null
271                } else {
272                    coerce_for_column(val, col, strict)?
273                };
274            }
275        }
276
277        for col in &table_schema.columns {
278            if !col.nullable && row[col.position as usize].is_null() {
279                return Err(SqlError::NotNullViolation(col.name.clone()));
280            }
281        }
282
283        if let Some(ref col_map) = check_col_map {
284            for col in &table_schema.columns {
285                if let Some(ref check) = col.check_expr {
286                    let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
287                    if !is_truthy(&result) && !result.is_null() {
288                        let name = col.check_name.as_deref().unwrap_or(&col.name);
289                        return Err(SqlError::CheckViolation(name.to_string()));
290                    }
291                }
292            }
293            for tc in &table_schema.check_constraints {
294                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
295                if !is_truthy(&result) && !result.is_null() {
296                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
297                    return Err(SqlError::CheckViolation(name.to_string()));
298                }
299            }
300        }
301
302        for fk in &table_schema.foreign_keys {
303            let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
304            if any_null {
305                continue; // MATCH SIMPLE: skip if any FK col is NULL
306            }
307            let fk_vals: Vec<Value> = fk
308                .columns
309                .iter()
310                .map(|&ci| row[ci as usize].clone())
311                .collect();
312            fk_key_buf.clear();
313            encode_composite_key_into(&fk_vals, &mut fk_key_buf);
314            if fk.deferrable && fk.initially_deferred {
315                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
316                wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
317                    fk_name: name,
318                    foreign_table: fk.foreign_table.as_bytes().to_vec(),
319                    parent_key: fk_key_buf.clone(),
320                });
321                continue;
322            }
323            if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
324                let found = wtx
325                    .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
326                    .map_err(SqlError::Storage)?;
327                if found.is_none() {
328                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
329                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
330                }
331                wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
332            }
333        }
334
335        let proposed_row_for_returning: Option<Vec<Value>> =
336            returning_rows.as_ref().map(|_| row.clone());
337        let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
338            Some(row.clone())
339        } else {
340            None
341        };
342
343        let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
344            t.enabled
345                && t.timing == crate::parser::TriggerTiming::Before
346                && t.granularity == crate::parser::TriggerGranularity::ForEachRow
347                && t.events
348                    .iter()
349                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
350        });
351        if has_before_insert_triggers {
352            super::triggers::fire_row_triggers(
353                &mut wtx,
354                schema,
355                &table_schema.name,
356                crate::parser::TriggerTiming::Before,
357                super::triggers::FireEvent::Insert,
358                None,
359                Some(row.clone()),
360                &table_schema.columns,
361            )?;
362        }
363
364        for (j, &i) in pk_indices.iter().enumerate() {
365            pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
366        }
367        encode_composite_key_into(&pk_values, &mut key_buf);
368
369        for (j, &i) in non_pk.iter().enumerate() {
370            let col = &table_schema.columns[i];
371            if matches!(
372                col.generated_kind,
373                Some(crate::parser::GeneratedKind::Virtual)
374            ) {
375                value_values[enc_pos[j] as usize] = Value::Null;
376                row[i] = Value::Null;
377            } else {
378                value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
379            }
380        }
381        encode_row_into(&value_values, &mut value_buf);
382
383        if key_buf.len() > citadel_core::MAX_KEY_SIZE {
384            return Err(SqlError::KeyTooLarge {
385                size: key_buf.len(),
386                max: citadel_core::MAX_KEY_SIZE,
387            });
388        }
389        if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
390            return Err(SqlError::RowTooLarge {
391                size: value_buf.len(),
392                max: citadel_core::MAX_VALUE_SIZE,
393            });
394        }
395
396        match compiled_conflict.as_ref() {
397            None => {
398                let is_new = wtx
399                    .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
400                    .map_err(SqlError::Storage)?;
401                if !is_new {
402                    return Err(SqlError::DuplicateKey);
403                }
404                let has_after_insert_triggers =
405                    schema.triggers_for(&table_schema.name).iter().any(|t| {
406                        t.enabled
407                            && t.timing == crate::parser::TriggerTiming::After
408                            && t.granularity == crate::parser::TriggerGranularity::ForEachRow
409                            && t.events
410                                .iter()
411                                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
412                    });
413                if !table_schema.indices.is_empty() || has_after_insert_triggers {
414                    for (j, &i) in pk_indices.iter().enumerate() {
415                        row[i] = pk_values[j].clone();
416                    }
417                    for (j, &i) in non_pk.iter().enumerate() {
418                        row[i] =
419                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
420                    }
421                    if !table_schema.indices.is_empty() {
422                        insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
423                    }
424                    if has_after_insert_triggers {
425                        super::triggers::fire_row_triggers(
426                            &mut wtx,
427                            schema,
428                            &table_schema.name,
429                            crate::parser::TriggerTiming::After,
430                            super::triggers::FireEvent::Insert,
431                            None,
432                            Some(row.clone()),
433                            &table_schema.columns,
434                        )?;
435                    }
436                }
437                if let Some(r) = row_for_stmt_trigger.clone() {
438                    stmt_new_rows.push(r);
439                }
440                count += 1;
441                if let Some(buf) = returning_rows.as_mut() {
442                    buf.push((None, proposed_row_for_returning));
443                }
444            }
445            Some(oc) => {
446                let oc_ref: &CompiledOnConflict = oc;
447                let needs_row = upsert_needs_row(oc_ref, table_schema);
448                if needs_row {
449                    for (j, &i) in pk_indices.iter().enumerate() {
450                        row[i] = pk_values[j].clone();
451                    }
452                    for (j, &i) in non_pk.iter().enumerate() {
453                        row[i] =
454                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
455                    }
456                }
457                let outcome = apply_insert_with_conflict(
458                    &mut wtx,
459                    table_schema,
460                    &key_buf,
461                    &value_buf,
462                    &row,
463                    &pk_values,
464                    oc_ref,
465                    row_col_map.as_ref().unwrap(),
466                    stmt.returning.is_some(),
467                )?;
468                match outcome {
469                    InsertRowOutcome::Inserted => {
470                        count += 1;
471                        if let Some(buf) = returning_rows.as_mut() {
472                            buf.push((None, proposed_row_for_returning));
473                        }
474                        if let Some(r) = row_for_stmt_trigger.clone() {
475                            stmt_new_rows.push(r);
476                        }
477                        let has_after_insert_triggers =
478                            schema.triggers_for(&table_schema.name).iter().any(|t| {
479                                t.enabled
480                                    && t.timing == crate::parser::TriggerTiming::After
481                                    && t.granularity
482                                        == crate::parser::TriggerGranularity::ForEachRow
483                                    && t.events
484                                        .iter()
485                                        .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
486                            });
487                        if has_after_insert_triggers {
488                            super::triggers::fire_row_triggers(
489                                &mut wtx,
490                                schema,
491                                &table_schema.name,
492                                crate::parser::TriggerTiming::After,
493                                super::triggers::FireEvent::Insert,
494                                None,
495                                Some(row.clone()),
496                                &table_schema.columns,
497                            )?;
498                        }
499                    }
500                    InsertRowOutcome::Updated { old, new } => {
501                        count += 1;
502                        if let Some(buf) = returning_rows.as_mut() {
503                            buf.push((Some(old.clone()), Some(new.clone())));
504                        }
505                        let has_after_update_triggers =
506                            schema.triggers_for(&table_schema.name).iter().any(|t| {
507                                t.enabled
508                                    && t.timing == crate::parser::TriggerTiming::After
509                                    && t.granularity
510                                        == crate::parser::TriggerGranularity::ForEachRow
511                                    && t.events.iter().any(|e| {
512                                        matches!(e, crate::parser::TriggerEvent::Update(_))
513                                    })
514                            });
515                        if has_after_update_triggers {
516                            let changed_cols: Vec<String> = match oc_ref {
517                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
518                                    .iter()
519                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
520                                    .collect(),
521                                _ => Vec::new(),
522                            };
523                            super::triggers::fire_row_triggers(
524                                &mut wtx,
525                                schema,
526                                &table_schema.name,
527                                crate::parser::TriggerTiming::After,
528                                super::triggers::FireEvent::Update {
529                                    changed_columns: &changed_cols,
530                                },
531                                Some(old),
532                                Some(new),
533                                &table_schema.columns,
534                            )?;
535                        }
536                    }
537                    InsertRowOutcome::Skipped => {}
538                }
539            }
540        }
541    }
542
543    if has_insert_statement_triggers {
544        super::triggers::fire_statement_triggers(
545            &mut wtx,
546            schema,
547            &table_schema.name,
548            crate::parser::TriggerTiming::After,
549            super::triggers::FireEvent::Insert,
550            &table_schema.columns,
551            &[],
552            &stmt_new_rows,
553        )?;
554    }
555
556    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
557        let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
558        super::helpers::drain_deferred_fk_checks(&mut wtx)?;
559        wtx.commit().map_err(SqlError::Storage)?;
560        return Ok(ExecutionResult::Query(qr));
561    }
562
563    super::helpers::drain_deferred_fk_checks(&mut wtx)?;
564    wtx.commit().map_err(SqlError::Storage)?;
565    Ok(ExecutionResult::RowsAffected(count))
566}
567
568pub(super) fn has_subquery(expr: &Expr) -> bool {
569    crate::parser::has_subquery(expr)
570}
571
572pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
573    if let Some(ref w) = stmt.where_clause {
574        if has_subquery(w) {
575            return true;
576        }
577    }
578    if let Some(ref h) = stmt.having {
579        if has_subquery(h) {
580            return true;
581        }
582    }
583    for col in &stmt.columns {
584        if let SelectColumn::Expr { expr, .. } = col {
585            if has_subquery(expr) {
586                return true;
587            }
588        }
589    }
590    for ob in &stmt.order_by {
591        if has_subquery(&ob.expr) {
592            return true;
593        }
594    }
595    for join in &stmt.joins {
596        if let Some(ref on_expr) = join.on_clause {
597            if has_subquery(on_expr) {
598                return true;
599            }
600        }
601    }
602    false
603}
604
605pub(super) fn materialize_expr(
606    expr: &Expr,
607    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
608) -> Result<Expr> {
609    match expr {
610        Expr::InSubquery {
611            expr: e,
612            subquery,
613            negated,
614        } => {
615            let inner = materialize_expr(e, exec_sub)?;
616            let qr = exec_sub(subquery)?;
617            if !qr.columns.is_empty() && qr.columns.len() != 1 {
618                return Err(SqlError::SubqueryMultipleColumns);
619            }
620            let mut values = rustc_hash::FxHashSet::default();
621            let mut has_null = false;
622            for row in &qr.rows {
623                if row[0].is_null() {
624                    has_null = true;
625                } else {
626                    values.insert(row[0].clone());
627                }
628            }
629            Ok(Expr::InSet {
630                expr: Box::new(inner),
631                values,
632                has_null,
633                negated: *negated,
634            })
635        }
636        Expr::ScalarSubquery(subquery) => {
637            let qr = exec_sub(subquery)?;
638            if qr.rows.len() > 1 {
639                return Err(SqlError::SubqueryMultipleRows);
640            }
641            let val = if qr.rows.is_empty() {
642                Value::Null
643            } else {
644                qr.rows[0][0].clone()
645            };
646            Ok(Expr::Literal(val))
647        }
648        Expr::Exists { subquery, negated } => {
649            let qr = exec_sub(subquery)?;
650            let exists = !qr.rows.is_empty();
651            let result = if *negated { !exists } else { exists };
652            Ok(Expr::Literal(Value::Boolean(result)))
653        }
654        Expr::InList {
655            expr: e,
656            list,
657            negated,
658        } => {
659            let inner = materialize_expr(e, exec_sub)?;
660            let items = list
661                .iter()
662                .map(|item| materialize_expr(item, exec_sub))
663                .collect::<Result<Vec<_>>>()?;
664            Ok(Expr::InList {
665                expr: Box::new(inner),
666                list: items,
667                negated: *negated,
668            })
669        }
670        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
671            left: Box::new(materialize_expr(left, exec_sub)?),
672            op: *op,
673            right: Box::new(materialize_expr(right, exec_sub)?),
674        }),
675        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
676            op: *op,
677            expr: Box::new(materialize_expr(e, exec_sub)?),
678        }),
679        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
680        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
681        Expr::InSet {
682            expr: e,
683            values,
684            has_null,
685            negated,
686        } => Ok(Expr::InSet {
687            expr: Box::new(materialize_expr(e, exec_sub)?),
688            values: values.clone(),
689            has_null: *has_null,
690            negated: *negated,
691        }),
692        Expr::Between {
693            expr: e,
694            low,
695            high,
696            negated,
697        } => Ok(Expr::Between {
698            expr: Box::new(materialize_expr(e, exec_sub)?),
699            low: Box::new(materialize_expr(low, exec_sub)?),
700            high: Box::new(materialize_expr(high, exec_sub)?),
701            negated: *negated,
702        }),
703        Expr::Like {
704            expr: e,
705            pattern,
706            escape,
707            negated,
708        } => {
709            let esc = escape
710                .as_ref()
711                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
712                .transpose()?;
713            Ok(Expr::Like {
714                expr: Box::new(materialize_expr(e, exec_sub)?),
715                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
716                escape: esc,
717                negated: *negated,
718            })
719        }
720        Expr::Case {
721            operand,
722            conditions,
723            else_result,
724        } => {
725            let op = operand
726                .as_ref()
727                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
728                .transpose()?;
729            let conds = conditions
730                .iter()
731                .map(|(c, r)| {
732                    Ok((
733                        materialize_expr(c, exec_sub)?,
734                        materialize_expr(r, exec_sub)?,
735                    ))
736                })
737                .collect::<Result<Vec<_>>>()?;
738            let else_r = else_result
739                .as_ref()
740                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
741                .transpose()?;
742            Ok(Expr::Case {
743                operand: op,
744                conditions: conds,
745                else_result: else_r,
746            })
747        }
748        Expr::Coalesce(args) => {
749            let materialized = args
750                .iter()
751                .map(|a| materialize_expr(a, exec_sub))
752                .collect::<Result<Vec<_>>>()?;
753            Ok(Expr::Coalesce(materialized))
754        }
755        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
756            expr: Box::new(materialize_expr(e, exec_sub)?),
757            data_type: *data_type,
758        }),
759        Expr::Function {
760            name,
761            args,
762            distinct,
763        } => {
764            let materialized = args
765                .iter()
766                .map(|a| materialize_expr(a, exec_sub))
767                .collect::<Result<Vec<_>>>()?;
768            Ok(Expr::Function {
769                name: name.clone(),
770                args: materialized,
771                distinct: *distinct,
772            })
773        }
774        other => Ok(other.clone()),
775    }
776}
777
778pub(super) fn materialize_stmt(
779    stmt: &SelectStmt,
780    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
781) -> Result<SelectStmt> {
782    let where_clause = stmt
783        .where_clause
784        .as_ref()
785        .map(|e| materialize_expr(e, exec_sub))
786        .transpose()?;
787    let having = stmt
788        .having
789        .as_ref()
790        .map(|e| materialize_expr(e, exec_sub))
791        .transpose()?;
792    let columns = stmt
793        .columns
794        .iter()
795        .map(|c| match c {
796            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
797            SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
798            SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
799            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
800                expr: materialize_expr(expr, exec_sub)?,
801                alias: alias.clone(),
802            }),
803        })
804        .collect::<Result<Vec<_>>>()?;
805    let order_by = stmt
806        .order_by
807        .iter()
808        .map(|ob| {
809            Ok(OrderByItem {
810                expr: materialize_expr(&ob.expr, exec_sub)?,
811                descending: ob.descending,
812                nulls_first: ob.nulls_first,
813            })
814        })
815        .collect::<Result<Vec<_>>>()?;
816    let joins = stmt
817        .joins
818        .iter()
819        .map(|j| {
820            let on_clause = j
821                .on_clause
822                .as_ref()
823                .map(|e| materialize_expr(e, exec_sub))
824                .transpose()?;
825            Ok(JoinClause {
826                join_type: j.join_type,
827                table: j.table.clone(),
828                subquery: j.subquery.clone(),
829                on_clause,
830            })
831        })
832        .collect::<Result<Vec<_>>>()?;
833    let group_by = stmt
834        .group_by
835        .iter()
836        .map(|e| materialize_expr(e, exec_sub))
837        .collect::<Result<Vec<_>>>()?;
838    Ok(SelectStmt {
839        columns,
840        from: stmt.from.clone(),
841        from_alias: stmt.from_alias.clone(),
842        from_subquery: stmt.from_subquery.clone(),
843        from_args: stmt.from_args.clone(),
844        from_json_table: stmt.from_json_table.clone(),
845        joins,
846        distinct: stmt.distinct,
847        where_clause,
848        order_by,
849        limit: stmt.limit.clone(),
850        offset: stmt.offset.clone(),
851        group_by,
852        having,
853    })
854}
855
856pub(super) fn exec_subquery_read(
857    db: &Database,
858    schema: &SchemaManager,
859    stmt: &SelectStmt,
860    ctes: &CteContext,
861) -> Result<QueryResult> {
862    let mut rtx = db.begin_read();
863    exec_subquery_with_read(&mut rtx, schema, stmt, ctes)
864}
865
866pub(super) fn exec_subquery_with_read(
867    rtx: &mut ReadTxn<'_>,
868    schema: &SchemaManager,
869    stmt: &SelectStmt,
870    ctes: &CteContext,
871) -> Result<QueryResult> {
872    match super::exec_select_with_read(rtx, schema, stmt, ctes)? {
873        ExecutionResult::Query(qr) => Ok(qr),
874        _ => Ok(QueryResult {
875            columns: vec![],
876            rows: vec![],
877        }),
878    }
879}
880
881pub(super) fn exec_subquery_write(
882    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
883    schema: &SchemaManager,
884    stmt: &SelectStmt,
885    ctes: &CteContext,
886) -> Result<QueryResult> {
887    match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
888        ExecutionResult::Query(qr) => Ok(qr),
889        _ => Ok(QueryResult {
890            columns: vec![],
891            rows: vec![],
892        }),
893    }
894}
895
896pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
897    stmt.where_clause.as_ref().is_some_and(has_subquery)
898        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
899}
900
901pub(super) fn materialize_update(
902    stmt: &UpdateStmt,
903    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
904) -> Result<UpdateStmt> {
905    let where_clause = stmt
906        .where_clause
907        .as_ref()
908        .map(|e| materialize_expr(e, exec_sub))
909        .transpose()?;
910    let assignments = stmt
911        .assignments
912        .iter()
913        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
914        .collect::<Result<Vec<_>>>()?;
915    Ok(UpdateStmt {
916        table: stmt.table.clone(),
917        assignments,
918        where_clause,
919        returning: stmt.returning.clone(),
920    })
921}
922
923pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
924    stmt.where_clause.as_ref().is_some_and(has_subquery)
925}
926
927pub(super) fn materialize_delete(
928    stmt: &DeleteStmt,
929    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
930) -> Result<DeleteStmt> {
931    let where_clause = stmt
932        .where_clause
933        .as_ref()
934        .map(|e| materialize_expr(e, exec_sub))
935        .transpose()?;
936    Ok(DeleteStmt {
937        table: stmt.table.clone(),
938        where_clause,
939        returning: stmt.returning.clone(),
940    })
941}
942
943pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
944    match &stmt.source {
945        InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
946        // SELECT source subqueries are handled by exec_select's correlated/non-correlated paths
947        InsertSource::Select(_) => false,
948    }
949}
950
951pub(super) fn materialize_insert(
952    stmt: &InsertStmt,
953    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
954) -> Result<InsertStmt> {
955    let source = match &stmt.source {
956        InsertSource::Values(rows) => {
957            let mat = rows
958                .iter()
959                .map(|row| {
960                    row.iter()
961                        .map(|e| materialize_expr(e, exec_sub))
962                        .collect::<Result<Vec<_>>>()
963                })
964                .collect::<Result<Vec<_>>>()?;
965            InsertSource::Values(mat)
966        }
967        InsertSource::Select(sq) => {
968            let ctes = sq
969                .ctes
970                .iter()
971                .map(|c| {
972                    Ok(CteDefinition {
973                        name: c.name.clone(),
974                        column_aliases: c.column_aliases.clone(),
975                        body: materialize_query_body(&c.body, exec_sub)?,
976                    })
977                })
978                .collect::<Result<Vec<_>>>()?;
979            let body = materialize_query_body(&sq.body, exec_sub)?;
980            InsertSource::Select(Box::new(SelectQuery {
981                ctes,
982                recursive: sq.recursive,
983                body,
984            }))
985        }
986    };
987    Ok(InsertStmt {
988        table: stmt.table.clone(),
989        columns: stmt.columns.clone(),
990        source,
991        on_conflict: stmt.on_conflict.clone(),
992        returning: stmt.returning.clone(),
993    })
994}
995
996pub(super) fn materialize_query_body(
997    body: &QueryBody,
998    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
999) -> Result<QueryBody> {
1000    match body {
1001        QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
1002            sel, exec_sub,
1003        )?))),
1004        QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
1005            op: comp.op.clone(),
1006            all: comp.all,
1007            left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
1008            right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
1009            order_by: comp.order_by.clone(),
1010            limit: comp.limit.clone(),
1011            offset: comp.offset.clone(),
1012        }))),
1013        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
1014    }
1015}
1016
1017pub(super) fn exec_query_body_with_read(
1018    rtx: &mut ReadTxn<'_>,
1019    schema: &SchemaManager,
1020    body: &QueryBody,
1021    ctes: &CteContext,
1022) -> Result<ExecutionResult> {
1023    match body {
1024        QueryBody::Select(sel) => super::exec_select_with_read(rtx, schema, sel, ctes),
1025        QueryBody::Compound(comp) => exec_compound_select_with_read(rtx, schema, comp, ctes),
1026        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1027            SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1028        ),
1029    }
1030}
1031
1032pub(super) fn exec_query_body_in_txn(
1033    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1034    schema: &SchemaManager,
1035    body: &QueryBody,
1036    ctes: &CteContext,
1037) -> Result<ExecutionResult> {
1038    match body {
1039        QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1040        QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1041        QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1042        QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1043        QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1044    }
1045}
1046
1047pub(super) fn exec_query_body_read(
1048    db: &Database,
1049    schema: &SchemaManager,
1050    body: &QueryBody,
1051    ctes: &CteContext,
1052) -> Result<QueryResult> {
1053    let mut rtx = db.begin_read();
1054    exec_query_body_with_read_qr(&mut rtx, schema, body, ctes)
1055}
1056
1057pub(super) fn exec_query_body_with_read_qr(
1058    rtx: &mut ReadTxn<'_>,
1059    schema: &SchemaManager,
1060    body: &QueryBody,
1061    ctes: &CteContext,
1062) -> Result<QueryResult> {
1063    match exec_query_body_with_read(rtx, schema, body, ctes)? {
1064        ExecutionResult::Query(qr) => Ok(qr),
1065        _ => Ok(QueryResult {
1066            columns: vec![],
1067            rows: vec![],
1068        }),
1069    }
1070}
1071
1072pub(super) fn exec_query_body_write(
1073    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1074    schema: &SchemaManager,
1075    body: &QueryBody,
1076    ctes: &CteContext,
1077) -> Result<QueryResult> {
1078    match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1079        ExecutionResult::Query(qr) => Ok(qr),
1080        _ => Ok(QueryResult {
1081            columns: vec![],
1082            rows: vec![],
1083        }),
1084    }
1085}
1086
1087pub(super) fn exec_compound_select_with_read(
1088    rtx: &mut ReadTxn<'_>,
1089    schema: &SchemaManager,
1090    comp: &CompoundSelect,
1091    ctes: &CteContext,
1092) -> Result<ExecutionResult> {
1093    let left_qr = match exec_query_body_with_read(rtx, schema, &comp.left, ctes)? {
1094        ExecutionResult::Query(qr) => qr,
1095        _ => QueryResult {
1096            columns: vec![],
1097            rows: vec![],
1098        },
1099    };
1100    let right_qr = match exec_query_body_with_read(rtx, schema, &comp.right, ctes)? {
1101        ExecutionResult::Query(qr) => qr,
1102        _ => QueryResult {
1103            columns: vec![],
1104            rows: vec![],
1105        },
1106    };
1107    apply_set_operation(comp, left_qr, right_qr)
1108}
1109
1110pub(super) fn exec_compound_select_in_txn(
1111    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1112    schema: &SchemaManager,
1113    comp: &CompoundSelect,
1114    ctes: &CteContext,
1115) -> Result<ExecutionResult> {
1116    let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1117        ExecutionResult::Query(qr) => qr,
1118        _ => QueryResult {
1119            columns: vec![],
1120            rows: vec![],
1121        },
1122    };
1123    let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1124        ExecutionResult::Query(qr) => qr,
1125        _ => QueryResult {
1126            columns: vec![],
1127            rows: vec![],
1128        },
1129    };
1130    apply_set_operation(comp, left_qr, right_qr)
1131}
1132
1133pub(super) fn apply_set_operation(
1134    comp: &CompoundSelect,
1135    left_qr: QueryResult,
1136    right_qr: QueryResult,
1137) -> Result<ExecutionResult> {
1138    if !left_qr.columns.is_empty()
1139        && !right_qr.columns.is_empty()
1140        && left_qr.columns.len() != right_qr.columns.len()
1141    {
1142        return Err(SqlError::CompoundColumnCountMismatch {
1143            left: left_qr.columns.len(),
1144            right: right_qr.columns.len(),
1145        });
1146    }
1147
1148    let columns = left_qr.columns;
1149
1150    let mut rows = match (&comp.op, comp.all) {
1151        (SetOp::Union, true) => {
1152            let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1153            let mut rows = Vec::with_capacity(total);
1154            rows.extend(left_qr.rows);
1155            rows.extend(right_qr.rows);
1156            rows
1157        }
1158        (SetOp::Union, false) => {
1159            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1160            let mut rows = Vec::new();
1161            for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1162                if !seen.contains(&row) {
1163                    seen.insert(row.clone());
1164                    rows.push(row);
1165                }
1166            }
1167            rows
1168        }
1169        (SetOp::Intersect, true) => {
1170            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1171            for row in &right_qr.rows {
1172                *right_counts.entry(row.clone()).or_insert(0) += 1;
1173            }
1174            let mut rows = Vec::new();
1175            for row in left_qr.rows {
1176                if let Some(count) = right_counts.get_mut(&row) {
1177                    if *count > 0 {
1178                        *count -= 1;
1179                        rows.push(row);
1180                    }
1181                }
1182            }
1183            rows
1184        }
1185        (SetOp::Intersect, false) => {
1186            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1187            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1188            let mut rows = Vec::new();
1189            for row in left_qr.rows {
1190                if right_set.contains(&row) && !seen.contains(&row) {
1191                    seen.insert(row.clone());
1192                    rows.push(row);
1193                }
1194            }
1195            rows
1196        }
1197        (SetOp::Except, true) => {
1198            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1199            for row in &right_qr.rows {
1200                *right_counts.entry(row.clone()).or_insert(0) += 1;
1201            }
1202            let mut rows = Vec::new();
1203            for row in left_qr.rows {
1204                if let Some(count) = right_counts.get_mut(&row) {
1205                    if *count > 0 {
1206                        *count -= 1;
1207                        continue;
1208                    }
1209                }
1210                rows.push(row);
1211            }
1212            rows
1213        }
1214        (SetOp::Except, false) => {
1215            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1216            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1217            let mut rows = Vec::new();
1218            for row in left_qr.rows {
1219                if !right_set.contains(&row) && !seen.contains(&row) {
1220                    seen.insert(row.clone());
1221                    rows.push(row);
1222                }
1223            }
1224            rows
1225        }
1226    };
1227
1228    if !comp.order_by.is_empty() {
1229        let col_defs: Vec<crate::types::ColumnDef> = columns
1230            .iter()
1231            .enumerate()
1232            .map(|(i, name)| crate::types::ColumnDef {
1233                name: name.clone(),
1234                data_type: crate::types::DataType::Null,
1235                nullable: true,
1236                position: i as u16,
1237                default_expr: None,
1238                default_sql: None,
1239                check_expr: None,
1240                check_sql: None,
1241                check_name: None,
1242                is_with_timezone: false,
1243                generated_expr: None,
1244                generated_sql: None,
1245                generated_kind: None,
1246                collation: crate::types::Collation::Binary,
1247            })
1248            .collect();
1249        sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1250    }
1251
1252    if let Some(ref offset_expr) = comp.offset {
1253        let offset = eval_const_int(offset_expr)?.max(0) as usize;
1254        if offset < rows.len() {
1255            rows = rows.split_off(offset);
1256        } else {
1257            rows.clear();
1258        }
1259    }
1260
1261    if let Some(ref limit_expr) = comp.limit {
1262        let limit = eval_const_int(limit_expr)?.max(0) as usize;
1263        rows.truncate(limit);
1264    }
1265
1266    Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1267}
1268
1269struct InsertBufs {
1270    row: Vec<Value>,
1271    pk_values: Vec<Value>,
1272    value_values: Vec<Value>,
1273    key_buf: Vec<u8>,
1274    value_buf: Vec<u8>,
1275    col_indices: Vec<usize>,
1276    fk_key_buf: Vec<u8>,
1277}
1278
1279impl InsertBufs {
1280    fn new() -> Self {
1281        Self {
1282            row: Vec::new(),
1283            pk_values: Vec::new(),
1284            value_values: Vec::new(),
1285            key_buf: Vec::with_capacity(64),
1286            value_buf: Vec::with_capacity(256),
1287            col_indices: Vec::new(),
1288            fk_key_buf: Vec::with_capacity(64),
1289        }
1290    }
1291}
1292
1293thread_local! {
1294    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1295    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1296}
1297
1298fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1299    INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1300        Ok(mut borrowed) => f(&mut borrowed),
1301        Err(_) => {
1302            let mut local = InsertBufs::new();
1303            f(&mut local)
1304        }
1305    })
1306}
1307
1308pub(super) struct UpsertBufs {
1309    old_row: Vec<Value>,
1310    new_row: Vec<Value>,
1311    value_values: Vec<Value>,
1312    new_value_buf: Vec<u8>,
1313}
1314
1315impl UpsertBufs {
1316    pub(super) fn new() -> Self {
1317        Self {
1318            old_row: Vec::new(),
1319            new_row: Vec::new(),
1320            value_values: Vec::new(),
1321            new_value_buf: Vec::with_capacity(256),
1322        }
1323    }
1324}
1325
1326pub fn exec_insert_in_txn(
1327    wtx: &mut WriteTxn<'_>,
1328    schema: &SchemaManager,
1329    stmt: &InsertStmt,
1330    params: &[Value],
1331) -> Result<ExecutionResult> {
1332    with_insert_scratch(|bufs| {
1333        exec_insert_in_txn_impl(
1334            wtx,
1335            schema,
1336            stmt,
1337            params,
1338            bufs,
1339            None,
1340            &CteContext::default(),
1341        )
1342    })
1343}
1344
1345pub(super) fn exec_insert_in_txn_with_ctes(
1346    wtx: &mut WriteTxn<'_>,
1347    schema: &SchemaManager,
1348    stmt: &InsertStmt,
1349    params: &[Value],
1350    outer_ctes: &CteContext,
1351) -> Result<ExecutionResult> {
1352    with_insert_scratch(|bufs| {
1353        exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1354    })
1355}
1356
1357fn exec_insert_in_txn_cached(
1358    wtx: &mut WriteTxn<'_>,
1359    schema: &SchemaManager,
1360    stmt: &InsertStmt,
1361    params: &[Value],
1362    cache: &InsertCache,
1363) -> Result<ExecutionResult> {
1364    with_insert_scratch(|bufs| {
1365        exec_insert_in_txn_impl(
1366            wtx,
1367            schema,
1368            stmt,
1369            params,
1370            bufs,
1371            Some(cache),
1372            &CteContext::default(),
1373        )
1374    })
1375}
1376
1377fn exec_insert_in_txn_impl(
1378    wtx: &mut WriteTxn<'_>,
1379    schema: &SchemaManager,
1380    stmt: &InsertStmt,
1381    params: &[Value],
1382    bufs: &mut InsertBufs,
1383    cache: Option<&InsertCache>,
1384    outer_ctes: &CteContext,
1385) -> Result<ExecutionResult> {
1386    let empty_ctes = CteContext::default();
1387    let materialized;
1388    let has_sub = match cache {
1389        Some(c) => c.has_subquery,
1390        None => insert_has_subquery(stmt),
1391    };
1392    let stmt = if has_sub {
1393        materialized = materialize_insert(stmt, &mut |sub| {
1394            exec_subquery_write(wtx, schema, sub, &empty_ctes)
1395        })?;
1396        &materialized
1397    } else {
1398        stmt
1399    };
1400
1401    let view_lookup_key = stmt.table.to_ascii_lowercase();
1402    if let Some(view_def) = schema.get_view(&view_lookup_key) {
1403        if super::triggers::has_instead_of(
1404            schema,
1405            &view_lookup_key,
1406            super::triggers::FireEvent::Insert,
1407        ) {
1408            let aliases = view_def.column_aliases.clone();
1409            return exec_instead_of_view_insert_in_txn(
1410                wtx,
1411                schema,
1412                &view_lookup_key,
1413                &aliases,
1414                stmt,
1415                params,
1416            );
1417        }
1418        return Err(SqlError::CannotModifyView(stmt.table.clone()));
1419    }
1420
1421    let table_schema = schema
1422        .get(&stmt.table)
1423        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1424    schema.mark_dml(&table_schema.name);
1425    super::ann_persist::purge_segment(wtx, &table_schema.name)?;
1426
1427    let default_columns;
1428    let insert_columns: &[String] = if stmt.columns.is_empty() {
1429        default_columns = table_schema
1430            .columns
1431            .iter()
1432            .map(|c| c.name.clone())
1433            .collect::<Vec<_>>();
1434        &default_columns
1435    } else {
1436        &stmt.columns
1437    };
1438
1439    bufs.col_indices.clear();
1440    if let Some(c) = cache {
1441        bufs.col_indices.extend_from_slice(&c.col_indices);
1442    } else {
1443        for name in insert_columns {
1444            bufs.col_indices.push(
1445                table_schema
1446                    .column_index(name)
1447                    .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1448            );
1449        }
1450    }
1451
1452    if cache.is_none() {
1453        for &ci in &bufs.col_indices {
1454            if table_schema.columns[ci].generated_kind.is_some() {
1455                return Err(SqlError::CannotInsertIntoGeneratedColumn(
1456                    table_schema.columns[ci].name.clone(),
1457                ));
1458            }
1459        }
1460    }
1461
1462    let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1463    let cached_gen_positions: &[usize];
1464    let cached_gen_fast_evals: &[FastGenEval];
1465    if let Some(c) = cache {
1466        cached_gen_positions = &c.generated_col_positions;
1467        cached_gen_fast_evals = &c.generated_fast_evals;
1468        generated_cols_uncached = Vec::new();
1469    } else {
1470        cached_gen_positions = &[];
1471        cached_gen_fast_evals = &[];
1472        generated_cols_uncached = table_schema
1473            .columns
1474            .iter()
1475            .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1476            .map(|c| {
1477                let expr = c.generated_expr.as_ref().unwrap();
1478                let fe = detect_fast_gen_eval(expr, table_schema);
1479                (c.position as usize, expr, fe)
1480            })
1481            .collect();
1482    }
1483    let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1484    let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1485        None
1486    } else {
1487        Some(ColumnMap::new(&table_schema.columns))
1488    };
1489    let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1490        None
1491    } else if let Some(c) = cache {
1492        c.row_col_map.as_ref()
1493    } else {
1494        row_col_map_for_gen_owned.as_ref()
1495    };
1496
1497    let any_defaults = match cache {
1498        Some(c) => c.any_defaults,
1499        None => table_schema
1500            .columns
1501            .iter()
1502            .any(|c| c.default_expr.is_some()),
1503    };
1504    let defaults: Vec<(usize, &Expr)> = if any_defaults {
1505        table_schema
1506            .columns
1507            .iter()
1508            .filter(|c| {
1509                c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1510            })
1511            .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1512            .collect()
1513    } else {
1514        Vec::new()
1515    };
1516
1517    let has_checks = match cache {
1518        Some(c) => c.has_checks,
1519        None => table_schema.has_checks(),
1520    };
1521    let check_col_map = if has_checks {
1522        Some(ColumnMap::new(&table_schema.columns))
1523    } else {
1524        None
1525    };
1526
1527    let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1528        &[usize],
1529        &[usize],
1530        &[u16],
1531        usize,
1532        &[u16],
1533    ) = if let Some(c) = cache {
1534        (
1535            &c.pk_indices,
1536            &c.non_pk_indices,
1537            &c.encoding_positions,
1538            c.phys_count,
1539            &c.dropped_non_pk_slots,
1540        )
1541    } else {
1542        (
1543            table_schema.pk_indices(),
1544            table_schema.non_pk_indices(),
1545            table_schema.encoding_positions(),
1546            table_schema.physical_non_pk_count(),
1547            table_schema.dropped_non_pk_slots(),
1548        )
1549    };
1550
1551    bufs.row.resize(table_schema.columns.len(), Value::Null);
1552    bufs.pk_values.resize(pk_indices.len(), Value::Null);
1553    bufs.value_values.resize(phys_count, Value::Null);
1554
1555    let table_bytes = table_schema.name.as_bytes();
1556    let has_fks = !table_schema.foreign_keys.is_empty();
1557    let has_indices = !table_schema.indices.is_empty();
1558    let has_defaults = !defaults.is_empty();
1559
1560    let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1561        (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1562        (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1563        (_, None) => None,
1564    };
1565
1566    let row_col_map_owned: Option<ColumnMap> =
1567        if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1568            Some(ColumnMap::new(&table_schema.columns))
1569        } else {
1570            None
1571        };
1572    let row_col_map: Option<&ColumnMap> = cache
1573        .and_then(|c| c.row_col_map.as_ref())
1574        .or(row_col_map_owned.as_ref());
1575
1576    let select_rows = match &stmt.source {
1577        InsertSource::Select(sq) => {
1578            let insert_ctes = super::materialize_all_ctes_with_outer(
1579                &sq.ctes,
1580                sq.recursive,
1581                outer_ctes,
1582                &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1583            )?;
1584            let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1585            Some(qr.rows)
1586        }
1587        InsertSource::Values(_) => None,
1588    };
1589
1590    let mut count: u64 = 0;
1591    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1592        stmt.returning.as_ref().map(|_| Vec::new());
1593
1594    let values = match &stmt.source {
1595        InsertSource::Values(rows) => Some(rows.as_slice()),
1596        InsertSource::Select(_) => None,
1597    };
1598    let sel_rows = select_rows.as_deref();
1599
1600    let total = match (values, sel_rows) {
1601        (Some(rows), _) => rows.len(),
1602        (_, Some(rows)) => rows.len(),
1603        _ => 0,
1604    };
1605
1606    if let Some(sel) = sel_rows {
1607        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1608            return Err(SqlError::InvalidValue(format!(
1609                "INSERT ... SELECT column count mismatch: expected {}, got {}",
1610                insert_columns.len(),
1611                sel[0].len()
1612            )));
1613        }
1614    }
1615
1616    let has_insert_statement_triggers_impl =
1617        schema.triggers_for(&table_schema.name).iter().any(|t| {
1618            t.enabled
1619                && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1620                && t.events
1621                    .iter()
1622                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1623        });
1624    let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1625        Vec::with_capacity(total)
1626    } else {
1627        Vec::new()
1628    };
1629    if has_insert_statement_triggers_impl {
1630        super::triggers::fire_statement_triggers(
1631            wtx,
1632            schema,
1633            &table_schema.name,
1634            crate::parser::TriggerTiming::Before,
1635            super::triggers::FireEvent::Insert,
1636            &table_schema.columns,
1637            &[],
1638            &[],
1639        )?;
1640    }
1641
1642    let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1643    for idx in 0..total {
1644        if !skip_row_clear {
1645            for v in bufs.row.iter_mut() {
1646                *v = Value::Null;
1647            }
1648        }
1649
1650        if let Some(value_rows) = values {
1651            if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1652                for action in plan {
1653                    match action {
1654                        BindAction::Param {
1655                            param_idx,
1656                            col_idx,
1657                            target,
1658                        } => {
1659                            let v = &params[*param_idx];
1660                            bufs.row[*col_idx] = if v.is_null() {
1661                                Value::Null
1662                            } else if v.data_type() == *target {
1663                                v.clone()
1664                            } else {
1665                                let got = v.data_type();
1666                                v.clone().coerce_into(*target).ok_or_else(|| {
1667                                    SqlError::TypeMismatch {
1668                                        expected: target.to_string(),
1669                                        got: got.to_string(),
1670                                    }
1671                                })?
1672                            };
1673                        }
1674                        BindAction::Literal { value, col_idx } => {
1675                            bufs.row[*col_idx] = value.clone();
1676                        }
1677                    }
1678                }
1679            } else {
1680                let value_row = &value_rows[idx];
1681                if value_row.len() != insert_columns.len() {
1682                    return Err(SqlError::InvalidValue(format!(
1683                        "expected {} values, got {}",
1684                        insert_columns.len(),
1685                        value_row.len()
1686                    )));
1687                }
1688                for (i, expr) in value_row.iter().enumerate() {
1689                    let val = match expr {
1690                        Expr::Parameter(n) => params
1691                            .get(n - 1)
1692                            .cloned()
1693                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1694                        Expr::Literal(v) => v.clone(),
1695                        _ => eval_const_expr(expr)?,
1696                    };
1697                    let col_idx = bufs.col_indices[i];
1698                    let col = &table_schema.columns[col_idx];
1699                    let got_type = val.data_type();
1700                    bufs.row[col_idx] = if val.is_null() {
1701                        Value::Null
1702                    } else {
1703                        val.coerce_into(col.data_type)
1704                            .ok_or_else(|| SqlError::TypeMismatch {
1705                                expected: col.data_type.to_string(),
1706                                got: got_type.to_string(),
1707                            })?
1708                    };
1709                }
1710            }
1711        } else if let Some(sel) = sel_rows {
1712            let sel_row = &sel[idx];
1713            for (i, val) in sel_row.iter().enumerate() {
1714                let col_idx = bufs.col_indices[i];
1715                let col = &table_schema.columns[col_idx];
1716                let got_type = val.data_type();
1717                bufs.row[col_idx] = if val.is_null() {
1718                    Value::Null
1719                } else {
1720                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
1721                        SqlError::TypeMismatch {
1722                            expected: col.data_type.to_string(),
1723                            got: got_type.to_string(),
1724                        }
1725                    })?
1726                };
1727            }
1728        }
1729
1730        if has_defaults {
1731            for &(pos, def_expr) in &defaults {
1732                let val = eval_const_expr(def_expr)?;
1733                let col = &table_schema.columns[pos];
1734                if !val.is_null() {
1735                    let got_type = val.data_type();
1736                    bufs.row[pos] =
1737                        val.coerce_into(col.data_type)
1738                            .ok_or_else(|| SqlError::TypeMismatch {
1739                                expected: col.data_type.to_string(),
1740                                got: got_type.to_string(),
1741                            })?;
1742                }
1743            }
1744        }
1745
1746        if let Some(gen_map) = row_col_map_for_gen {
1747            if cache.is_some() {
1748                for (pos, fast) in cached_gen_positions
1749                    .iter()
1750                    .copied()
1751                    .zip(cached_gen_fast_evals.iter())
1752                {
1753                    let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1754                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1755                    let col = &table_schema.columns[pos];
1756                    bufs.row[pos] = if val.is_null() {
1757                        Value::Null
1758                    } else {
1759                        let got_type = val.data_type();
1760                        val.coerce_into(col.data_type)
1761                            .ok_or_else(|| SqlError::TypeMismatch {
1762                                expected: col.data_type.to_string(),
1763                                got: got_type.to_string(),
1764                            })?
1765                    };
1766                }
1767            } else {
1768                for (pos, gen_expr, fast) in &generated_cols_uncached {
1769                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1770                    let col = &table_schema.columns[*pos];
1771                    bufs.row[*pos] = if val.is_null() {
1772                        Value::Null
1773                    } else {
1774                        let got_type = val.data_type();
1775                        val.coerce_into(col.data_type)
1776                            .ok_or_else(|| SqlError::TypeMismatch {
1777                                expected: col.data_type.to_string(),
1778                                got: got_type.to_string(),
1779                            })?
1780                    };
1781                }
1782            }
1783        }
1784
1785        if let Some(c) = cache {
1786            for &pos in &c.not_null_indices {
1787                if bufs.row[pos as usize].is_null() {
1788                    return Err(SqlError::NotNullViolation(
1789                        table_schema.columns[pos as usize].name.clone(),
1790                    ));
1791                }
1792            }
1793        } else {
1794            for col in &table_schema.columns {
1795                if !col.nullable && bufs.row[col.position as usize].is_null() {
1796                    return Err(SqlError::NotNullViolation(col.name.clone()));
1797                }
1798            }
1799        }
1800
1801        if let Some(ref col_map) = check_col_map {
1802            for col in &table_schema.columns {
1803                if let Some(ref check) = col.check_expr {
1804                    let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1805                    if !is_truthy(&result) && !result.is_null() {
1806                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1807                        return Err(SqlError::CheckViolation(name.to_string()));
1808                    }
1809                }
1810            }
1811            for tc in &table_schema.check_constraints {
1812                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1813                if !is_truthy(&result) && !result.is_null() {
1814                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1815                    return Err(SqlError::CheckViolation(name.to_string()));
1816                }
1817            }
1818        }
1819
1820        if has_fks {
1821            for fk in &table_schema.foreign_keys {
1822                let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1823                if any_null {
1824                    continue;
1825                }
1826                crate::encoding::encode_composite_key_from_indices(
1827                    &fk.columns,
1828                    &bufs.row,
1829                    &mut bufs.fk_key_buf,
1830                );
1831                if fk.deferrable && fk.initially_deferred {
1832                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1833                    wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1834                        fk_name: name,
1835                        foreign_table: fk.foreign_table.as_bytes().to_vec(),
1836                        parent_key: bufs.fk_key_buf.clone(),
1837                    });
1838                    continue;
1839                }
1840                if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1841                    let found = wtx
1842                        .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1843                        .map_err(SqlError::Storage)?;
1844                    if found.is_none() {
1845                        let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1846                        return Err(SqlError::ForeignKeyViolation(name.to_string()));
1847                    }
1848                    wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1849                }
1850            }
1851        }
1852
1853        let proposed_row_for_returning: Option<Vec<Value>> =
1854            returning_rows.as_ref().map(|_| bufs.row.clone());
1855        let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1856            Some(bufs.row.clone())
1857        } else {
1858            None
1859        };
1860
1861        let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
1862            t.enabled
1863                && t.timing == crate::parser::TriggerTiming::Before
1864                && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1865                && t.events
1866                    .iter()
1867                    .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1868        });
1869        if has_before_insert_triggers {
1870            super::triggers::fire_row_triggers(
1871                wtx,
1872                schema,
1873                &table_schema.name,
1874                crate::parser::TriggerTiming::Before,
1875                super::triggers::FireEvent::Insert,
1876                None,
1877                Some(bufs.row.clone()),
1878                &table_schema.columns,
1879            )?;
1880        }
1881
1882        for (j, &i) in pk_indices.iter().enumerate() {
1883            bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1884        }
1885        match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1886            true => match bufs.pk_values[0] {
1887                Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1888                _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1889            },
1890            false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1891        }
1892
1893        for &slot in dropped {
1894            bufs.value_values[slot as usize] = Value::Null;
1895        }
1896        for (j, &i) in non_pk.iter().enumerate() {
1897            let col = &table_schema.columns[i];
1898            if matches!(
1899                col.generated_kind,
1900                Some(crate::parser::GeneratedKind::Virtual)
1901            ) {
1902                bufs.value_values[enc_pos[j] as usize] = Value::Null;
1903                bufs.row[i] = Value::Null;
1904            } else {
1905                bufs.value_values[enc_pos[j] as usize] =
1906                    std::mem::replace(&mut bufs.row[i], Value::Null);
1907            }
1908        }
1909        match cache.and_then(|c| c.row_encoder.as_ref()) {
1910            Some(tmpl) => crate::encoding::encode_int_row_with_template(
1911                tmpl,
1912                &bufs.value_values,
1913                &mut bufs.value_buf,
1914            )?,
1915            None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1916        }
1917
1918        if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1919            return Err(SqlError::KeyTooLarge {
1920                size: bufs.key_buf.len(),
1921                max: citadel_core::MAX_KEY_SIZE,
1922            });
1923        }
1924        if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1925            return Err(SqlError::RowTooLarge {
1926                size: bufs.value_buf.len(),
1927                max: citadel_core::MAX_VALUE_SIZE,
1928            });
1929        }
1930
1931        match compiled_conflict.as_ref() {
1932            None => {
1933                let is_new = wtx
1934                    .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1935                    .map_err(SqlError::Storage)?;
1936                if !is_new {
1937                    return Err(SqlError::DuplicateKey);
1938                }
1939                let has_after_insert_triggers =
1940                    schema.triggers_for(&table_schema.name).iter().any(|t| {
1941                        t.enabled
1942                            && t.timing == crate::parser::TriggerTiming::After
1943                            && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1944                            && t.events
1945                                .iter()
1946                                .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1947                    });
1948                if has_indices || has_after_insert_triggers {
1949                    for (j, &i) in pk_indices.iter().enumerate() {
1950                        bufs.row[i] = bufs.pk_values[j].clone();
1951                    }
1952                    for (j, &i) in non_pk.iter().enumerate() {
1953                        bufs.row[i] = std::mem::replace(
1954                            &mut bufs.value_values[enc_pos[j] as usize],
1955                            Value::Null,
1956                        );
1957                    }
1958                    if has_indices {
1959                        insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1960                    }
1961                    if has_after_insert_triggers {
1962                        super::triggers::fire_row_triggers(
1963                            wtx,
1964                            schema,
1965                            &table_schema.name,
1966                            crate::parser::TriggerTiming::After,
1967                            super::triggers::FireEvent::Insert,
1968                            None,
1969                            Some(bufs.row.clone()),
1970                            &table_schema.columns,
1971                        )?;
1972                    }
1973                }
1974                if let Some(r) = row_for_stmt_trigger_impl.clone() {
1975                    stmt_new_rows_impl.push(r);
1976                }
1977                count += 1;
1978                if let Some(buf) = returning_rows.as_mut() {
1979                    buf.push((None, proposed_row_for_returning));
1980                }
1981            }
1982            Some(oc) => {
1983                let oc_ref: &CompiledOnConflict = oc;
1984                let needs_row = upsert_needs_row(oc_ref, table_schema);
1985                if needs_row {
1986                    for (j, &i) in pk_indices.iter().enumerate() {
1987                        bufs.row[i] = bufs.pk_values[j].clone();
1988                    }
1989                    for (j, &i) in non_pk.iter().enumerate() {
1990                        bufs.row[i] = std::mem::replace(
1991                            &mut bufs.value_values[enc_pos[j] as usize],
1992                            Value::Null,
1993                        );
1994                    }
1995                }
1996                let outcome = apply_insert_with_conflict(
1997                    wtx,
1998                    table_schema,
1999                    &bufs.key_buf,
2000                    &bufs.value_buf,
2001                    &bufs.row,
2002                    &bufs.pk_values,
2003                    oc_ref,
2004                    row_col_map.unwrap(),
2005                    stmt.returning.is_some(),
2006                )?;
2007                match outcome {
2008                    InsertRowOutcome::Inserted => {
2009                        count += 1;
2010                        if let Some(buf) = returning_rows.as_mut() {
2011                            buf.push((None, proposed_row_for_returning));
2012                        }
2013                        if let Some(r) = row_for_stmt_trigger_impl.clone() {
2014                            stmt_new_rows_impl.push(r);
2015                        }
2016                        let has_after_insert_triggers =
2017                            schema.triggers_for(&table_schema.name).iter().any(|t| {
2018                                t.enabled
2019                                    && t.timing == crate::parser::TriggerTiming::After
2020                                    && t.granularity
2021                                        == crate::parser::TriggerGranularity::ForEachRow
2022                                    && t.events
2023                                        .iter()
2024                                        .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
2025                            });
2026                        if has_after_insert_triggers {
2027                            super::triggers::fire_row_triggers(
2028                                wtx,
2029                                schema,
2030                                &table_schema.name,
2031                                crate::parser::TriggerTiming::After,
2032                                super::triggers::FireEvent::Insert,
2033                                None,
2034                                Some(bufs.row.clone()),
2035                                &table_schema.columns,
2036                            )?;
2037                        }
2038                    }
2039                    InsertRowOutcome::Updated { old, new } => {
2040                        count += 1;
2041                        if let Some(buf) = returning_rows.as_mut() {
2042                            buf.push((Some(old.clone()), Some(new.clone())));
2043                        }
2044                        let has_after_update_triggers =
2045                            schema.triggers_for(&table_schema.name).iter().any(|t| {
2046                                t.enabled
2047                                    && t.timing == crate::parser::TriggerTiming::After
2048                                    && t.granularity
2049                                        == crate::parser::TriggerGranularity::ForEachRow
2050                                    && t.events.iter().any(|e| {
2051                                        matches!(e, crate::parser::TriggerEvent::Update(_))
2052                                    })
2053                            });
2054                        if has_after_update_triggers {
2055                            let changed_cols: Vec<String> = match oc_ref {
2056                                CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2057                                    .iter()
2058                                    .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2059                                    .collect(),
2060                                _ => Vec::new(),
2061                            };
2062                            super::triggers::fire_row_triggers(
2063                                wtx,
2064                                schema,
2065                                &table_schema.name,
2066                                crate::parser::TriggerTiming::After,
2067                                super::triggers::FireEvent::Update {
2068                                    changed_columns: &changed_cols,
2069                                },
2070                                Some(old),
2071                                Some(new),
2072                                &table_schema.columns,
2073                            )?;
2074                        }
2075                    }
2076                    InsertRowOutcome::Skipped => {}
2077                }
2078            }
2079        }
2080    }
2081
2082    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2083        if has_insert_statement_triggers_impl {
2084            super::triggers::fire_statement_triggers(
2085                wtx,
2086                schema,
2087                &table_schema.name,
2088                crate::parser::TriggerTiming::After,
2089                super::triggers::FireEvent::Insert,
2090                &table_schema.columns,
2091                &[],
2092                &stmt_new_rows_impl,
2093            )?;
2094        }
2095        return Ok(ExecutionResult::Query(super::helpers::project_returning(
2096            table_schema,
2097            returning_cols,
2098            &rows,
2099        )?));
2100    }
2101
2102    if has_insert_statement_triggers_impl {
2103        super::triggers::fire_statement_triggers(
2104            wtx,
2105            schema,
2106            &table_schema.name,
2107            crate::parser::TriggerTiming::After,
2108            super::triggers::FireEvent::Insert,
2109            &table_schema.columns,
2110            &[],
2111            &stmt_new_rows_impl,
2112        )?;
2113    }
2114
2115    Ok(ExecutionResult::RowsAffected(count))
2116}
2117
2118pub struct CompiledInsert {
2119    table_lower: String,
2120    cached: Option<InsertCache>,
2121}
2122
2123struct InsertCache {
2124    col_indices: Vec<usize>,
2125    has_subquery: bool,
2126    any_defaults: bool,
2127    has_checks: bool,
2128    on_conflict: Option<Arc<CompiledOnConflict>>,
2129    row_col_map: Option<ColumnMap>,
2130    generated_col_positions: Vec<usize>,
2131    generated_fast_evals: Vec<FastGenEval>,
2132    pk_indices: Vec<usize>,
2133    non_pk_indices: Vec<usize>,
2134    encoding_positions: Vec<u16>,
2135    dropped_non_pk_slots: Vec<u16>,
2136    phys_count: usize,
2137    single_int_pk: bool,
2138    not_null_indices: Vec<u16>,
2139    bind_plan: Option<Vec<BindAction>>,
2140    row_fully_overwritten: bool,
2141    row_encoder: Option<crate::encoding::IntRowTemplate>,
2142    is_trivial_fast: bool,
2143    trivial_fast_program: Option<TrivialFastProgram>,
2144    needs_scoped_params: bool,
2145}
2146
2147#[derive(Clone)]
2148enum BindAction {
2149    Param {
2150        param_idx: usize,
2151        col_idx: usize,
2152        target: DataType,
2153    },
2154    Literal {
2155        value: Value,
2156        col_idx: usize,
2157    },
2158}
2159
2160#[derive(Clone)]
2161struct TrivialFastProgram {
2162    template: Vec<u8>,
2163    ops: Vec<WriteOp>,
2164    pk_param: u8,
2165    not_null_param_indices: Vec<u8>,
2166}
2167
/// A single write emitted by `build_trivial_fast_program`.
///
/// `off` is the offset recorded for the column's slot in the row encoder's
/// `slot_offsets`. For generated columns, `bitmap_byte_off` is `2 + slot / 8`
/// and `bitmap_bit_mask` is `1 << (slot % 8)` — presumably addressing the
/// column's bit in a null bitmap (TODO confirm against the encoder).
#[derive(Clone)]
enum WriteOp {
    /// Integer parameter `param_idx` bound directly to a non-key column.
    ParamI64 {
        param_idx: u8,
        off: u32,
    },
    /// Constant integer: either a literal from the statement or a generated
    /// column whose inputs were all literals and got folded at compile time.
    LiteralI64 {
        value: i64,
        off: u32,
    },
    /// Generated column defined over two parameter-bound columns
    /// (built from `FastGenEval::IntColAddCol`).
    GenAddParamsI64 {
        a_param: u8,
        b_param: u8,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
    /// Generated column defined over one parameter-bound column with the
    /// constants `mul` and `add` (built from `FastGenEval::IntColMulAdd`, or
    /// from `IntColAddCol` with one literal operand, in which case `mul` is 1).
    GenMulAddParamI64 {
        param_idx: u8,
        mul: i64,
        add: i64,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
}
2194
2195fn build_trivial_fast_program(
2196    bind_plan: &[BindAction],
2197    row_encoder: &crate::encoding::IntRowTemplate,
2198    non_virtual_pairs: &[(usize, usize)],
2199    generated_col_positions: &[usize],
2200    generated_fast_evals: &[FastGenEval],
2201    pk_indices: &[usize],
2202    columns: &[crate::types::ColumnDef],
2203) -> Option<TrivialFastProgram> {
2204    let pk_col = pk_indices[0];
2205    let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2206        non_virtual_pairs.iter().copied().collect();
2207    let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2208        row_encoder.slot_offsets.iter().copied().collect();
2209
2210    let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2211    let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2212    let mut pk_param: Option<u8> = None;
2213    let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2214    let mut not_null_param_indices: Vec<u8> = Vec::new();
2215
2216    for action in bind_plan {
2217        match action {
2218            BindAction::Param {
2219                param_idx,
2220                col_idx,
2221                target,
2222            } => {
2223                if *target != DataType::Integer {
2224                    return None;
2225                }
2226                let pi: u8 = u8::try_from(*param_idx).ok()?;
2227                col_to_param.insert(*col_idx, pi);
2228                if *col_idx == pk_col {
2229                    pk_param = Some(pi);
2230                } else {
2231                    let slot = *col_to_slot.get(col_idx)?;
2232                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2233                    ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2234                    if !columns[*col_idx].nullable {
2235                        not_null_param_indices.push(pi);
2236                    }
2237                }
2238            }
2239            BindAction::Literal { value, col_idx } => match value {
2240                Value::Integer(v) => {
2241                    col_to_lit_int.insert(*col_idx, *v);
2242                    if *col_idx == pk_col {
2243                        return None;
2244                    }
2245                    let slot = *col_to_slot.get(col_idx)?;
2246                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2247                    ops.push(WriteOp::LiteralI64 { value: *v, off });
2248                }
2249                _ => return None,
2250            },
2251        }
2252    }
2253
2254    let pk_param = pk_param?;
2255
2256    for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2257        let gen_slot = *col_to_slot.get(&gen_pos)?;
2258        let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2259        let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
2260        let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
2261        let gen_col_nullable = columns[gen_pos].nullable;
2262
2263        match &generated_fast_evals[i] {
2264            FastGenEval::IntColAddCol {
2265                left_idx,
2266                right_idx,
2267            } => {
2268                let a_param = col_to_param.get(left_idx).copied();
2269                let b_param = col_to_param.get(right_idx).copied();
2270                match (a_param, b_param) {
2271                    (Some(ap), Some(bp)) => {
2272                        let deps_safe = gen_col_nullable
2273                            || (not_null_param_indices.contains(&ap)
2274                                && not_null_param_indices.contains(&bp));
2275                        if !deps_safe {
2276                            return None;
2277                        }
2278                        ops.push(WriteOp::GenAddParamsI64 {
2279                            a_param: ap,
2280                            b_param: bp,
2281                            off: gen_off,
2282                            bitmap_byte_off,
2283                            bitmap_bit_mask,
2284                        });
2285                    }
2286                    (Some(p), None) => {
2287                        let lit = col_to_lit_int.get(right_idx).copied()?;
2288                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2289                            return None;
2290                        }
2291                        ops.push(WriteOp::GenMulAddParamI64 {
2292                            param_idx: p,
2293                            mul: 1,
2294                            add: lit,
2295                            off: gen_off,
2296                            bitmap_byte_off,
2297                            bitmap_bit_mask,
2298                        });
2299                    }
2300                    (None, Some(p)) => {
2301                        let lit = col_to_lit_int.get(left_idx).copied()?;
2302                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2303                            return None;
2304                        }
2305                        ops.push(WriteOp::GenMulAddParamI64 {
2306                            param_idx: p,
2307                            mul: 1,
2308                            add: lit,
2309                            off: gen_off,
2310                            bitmap_byte_off,
2311                            bitmap_bit_mask,
2312                        });
2313                    }
2314                    (None, None) => {
2315                        let la = col_to_lit_int.get(left_idx).copied()?;
2316                        let lb = col_to_lit_int.get(right_idx).copied()?;
2317                        ops.push(WriteOp::LiteralI64 {
2318                            value: la.wrapping_add(lb),
2319                            off: gen_off,
2320                        });
2321                    }
2322                }
2323            }
2324            FastGenEval::IntColMulAdd {
2325                col_schema_idx,
2326                mul,
2327                add,
2328            } => {
2329                if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2330                    if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2331                        return None;
2332                    }
2333                    ops.push(WriteOp::GenMulAddParamI64 {
2334                        param_idx: p,
2335                        mul: *mul,
2336                        add: *add,
2337                        off: gen_off,
2338                        bitmap_byte_off,
2339                        bitmap_bit_mask,
2340                    });
2341                } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2342                    ops.push(WriteOp::LiteralI64 {
2343                        value: lit.wrapping_mul(*mul).wrapping_add(*add),
2344                        off: gen_off,
2345                    });
2346                } else {
2347                    return None;
2348                }
2349            }
2350            FastGenEval::None => return None,
2351        }
2352    }
2353
2354    Some(TrivialFastProgram {
2355        template: row_encoder.template.clone(),
2356        ops,
2357        pk_param,
2358        not_null_param_indices,
2359    })
2360}
2361
2362#[derive(Clone)]
2363pub(super) enum CompiledOnConflict {
2364    DoNothing {
2365        target: Option<ConflictKind>,
2366    },
2367    DoUpdate {
2368        target: ConflictKind,
2369        assignments: Vec<(usize, Expr)>,
2370        where_clause: Option<Expr>,
2371        fast_paths: Option<Vec<DoUpdateFastPath>>,
2372    },
2373}
2374
2375#[derive(Clone, Copy)]
2376pub(super) enum DoUpdateFastPath {
2377    IntAddConst { phys_idx: usize, delta: i64 },
2378}
2379
2380#[derive(Clone, Debug)]
2381pub(super) enum ConflictKind {
2382    PrimaryKey,
2383    UniqueIndex { index_idx: usize },
2384}
2385
2386fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2387    match target {
2388        ConflictTarget::Columns(cols) => {
2389            let col_idx_set: Vec<u16> = cols
2390                .iter()
2391                .map(|name| {
2392                    ts.column_index(name)
2393                        .map(|i| i as u16)
2394                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2395                })
2396                .collect::<Result<_>>()?;
2397            let pk_set = ts.primary_key_columns.clone();
2398            if set_equal(&col_idx_set, &pk_set) {
2399                return Ok(ConflictKind::PrimaryKey);
2400            }
2401            for (index_idx, idx) in ts.indices.iter().enumerate() {
2402                if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2403                    return Ok(ConflictKind::UniqueIndex { index_idx });
2404                }
2405            }
2406            Err(SqlError::Plan(
2407                "ON CONFLICT target does not match any unique constraint".into(),
2408            ))
2409        }
2410        ConflictTarget::Constraint(name) => {
2411            let lower = name.to_ascii_lowercase();
2412            for (index_idx, idx) in ts.indices.iter().enumerate() {
2413                if idx.name.eq_ignore_ascii_case(&lower) {
2414                    if idx.unique {
2415                        return Ok(ConflictKind::UniqueIndex { index_idx });
2416                    }
2417                    return Err(SqlError::Plan(format!(
2418                        "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2419                    )));
2420                }
2421            }
2422            Err(SqlError::Plan(format!(
2423                "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2424            )))
2425        }
2426    }
2427}
2428
/// Reports whether `a` and `b` hold the same values regardless of order.
///
/// Duplicates count: each value must occur the same number of times in both
/// slices.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    // Sorted copy of a slice, so the two sides can be compared element-wise.
    let normalized = |items: &[u16]| -> Vec<u16> {
        let mut sorted = items.to_vec();
        sorted.sort_unstable();
        sorted
    };
    a.len() == b.len() && normalized(a) == normalized(b)
}
2439
/// What happened to a single row handed to the `ON CONFLICT` insert path.
pub(super) enum InsertRowOutcome {
    /// The row was written. The DO UPDATE helpers in this file also return
    /// this variant for a successful update when the caller did not ask for
    /// RETURNING capture.
    Inserted,
    /// A DO UPDATE replaced an existing row and RETURNING capture was
    /// requested: `old` is the row before the update, `new` the row after.
    Updated { old: Vec<Value>, new: Vec<Value> },
    /// Nothing was written: either DO NOTHING absorbed the conflict or the
    /// DO UPDATE `WHERE` clause did not evaluate to true.
    Skipped,
}
2445
2446#[allow(clippy::too_many_arguments)]
2447#[inline]
2448pub(super) fn apply_insert_with_conflict(
2449    wtx: &mut WriteTxn<'_>,
2450    table_schema: &TableSchema,
2451    key_buf: &[u8],
2452    value_buf: &[u8],
2453    row: &[Value],
2454    pk_values: &[Value],
2455    on_conflict: &CompiledOnConflict,
2456    col_map: &ColumnMap,
2457    capture_returning: bool,
2458) -> Result<InsertRowOutcome> {
2459    let table_bytes = table_schema.name.as_bytes();
2460
2461    if let CompiledOnConflict::DoNothing { target } = on_conflict {
2462        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2463        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2464            let inserted = wtx
2465                .table_insert_if_absent(table_bytes, key_buf, value_buf)
2466                .map_err(SqlError::Storage)?;
2467            return Ok(if inserted {
2468                InsertRowOutcome::Inserted
2469            } else {
2470                InsertRowOutcome::Skipped
2471            });
2472        }
2473    }
2474
2475    if let CompiledOnConflict::DoUpdate {
2476        target: ConflictKind::PrimaryKey,
2477        assignments,
2478        where_clause,
2479        fast_paths,
2480    } = on_conflict
2481    {
2482        if can_fuse_do_update(table_schema, assignments) {
2483            return apply_do_update_fused(
2484                wtx,
2485                table_schema,
2486                table_bytes,
2487                key_buf,
2488                value_buf,
2489                row,
2490                assignments,
2491                where_clause.as_ref(),
2492                col_map,
2493                fast_paths.as_deref(),
2494                capture_returning,
2495            );
2496        }
2497    }
2498
2499    let primary_outcome = wtx
2500        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2501        .map_err(SqlError::Storage)?;
2502
2503    match primary_outcome {
2504        citadel_txn::write_txn::InsertOutcome::Inserted => {
2505            if table_schema.indices.is_empty() {
2506                return Ok(InsertRowOutcome::Inserted);
2507            }
2508            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2509            match insert_index_entries_or_fetch(
2510                wtx,
2511                table_schema,
2512                row,
2513                pk_values,
2514                &mut inserted_keys,
2515            )? {
2516                None => Ok(InsertRowOutcome::Inserted),
2517                Some(conflicting_idx) => {
2518                    let matches_target =
2519                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2520                            || matches!(
2521                                on_conflict,
2522                                CompiledOnConflict::DoNothing {
2523                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
2524                                } | CompiledOnConflict::DoUpdate {
2525                                    target: ConflictKind::UniqueIndex { index_idx },
2526                                    ..
2527                                } if *index_idx == conflicting_idx
2528                            );
2529                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2530                    if !matches_target {
2531                        return Err(SqlError::UniqueViolation(
2532                            table_schema.indices[conflicting_idx].name.clone(),
2533                        ));
2534                    }
2535                    match on_conflict {
2536                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2537                        CompiledOnConflict::DoUpdate {
2538                            assignments,
2539                            where_clause,
2540                            ..
2541                        } => {
2542                            let existing_pk =
2543                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2544                            apply_do_update(
2545                                wtx,
2546                                table_schema,
2547                                &existing_pk,
2548                                row,
2549                                assignments,
2550                                where_clause.as_ref(),
2551                                col_map,
2552                                capture_returning,
2553                            )
2554                        }
2555                    }
2556                }
2557            }
2558        }
2559        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2560            let matches_target = matches!(
2561                on_conflict,
2562                CompiledOnConflict::DoNothing { target: None }
2563                    | CompiledOnConflict::DoNothing {
2564                        target: Some(ConflictKind::PrimaryKey),
2565                    }
2566                    | CompiledOnConflict::DoUpdate {
2567                        target: ConflictKind::PrimaryKey,
2568                        ..
2569                    }
2570            );
2571            if !matches_target {
2572                return Err(SqlError::DuplicateKey);
2573            }
2574            match on_conflict {
2575                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2576                CompiledOnConflict::DoUpdate {
2577                    assignments,
2578                    where_clause,
2579                    ..
2580                } => {
2581                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2582                    apply_do_update_with_old_row(
2583                        wtx,
2584                        table_schema,
2585                        key_buf,
2586                        &old_row,
2587                        row,
2588                        assignments,
2589                        where_clause.as_ref(),
2590                        col_map,
2591                        capture_returning,
2592                    )
2593                }
2594            }
2595        }
2596    }
2597}
2598
2599#[inline]
2600fn apply_fast_path_patch(
2601    old_bytes: &[u8],
2602    fast_paths: &[DoUpdateFastPath],
2603) -> Result<UpsertAction> {
2604    UPSERT_SCRATCH.with(|slot| {
2605        let mut bufs = slot.borrow_mut();
2606        bufs.new_value_buf.clear();
2607        bufs.new_value_buf.extend_from_slice(old_bytes);
2608
2609        let mut patch_scratch: Vec<u8> = Vec::new();
2610
2611        for fp in fast_paths {
2612            match fp {
2613                DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2614                    let decoded =
2615                        crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2616                    let old_val = &decoded[0];
2617                    let new_val = match old_val {
2618                        Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2619                        Value::Null => Value::Null,
2620                        _ => {
2621                            return Err(SqlError::TypeMismatch {
2622                                expected: "INTEGER".into(),
2623                                got: old_val.data_type().to_string(),
2624                            });
2625                        }
2626                    };
2627                    if !crate::encoding::patch_column_in_place(
2628                        &mut bufs.new_value_buf,
2629                        *phys_idx,
2630                        &new_val,
2631                    )? {
2632                        patch_scratch.clear();
2633                        crate::encoding::patch_row_column(
2634                            &bufs.new_value_buf,
2635                            *phys_idx,
2636                            &new_val,
2637                            &mut patch_scratch,
2638                        )?;
2639                        std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2640                    }
2641                }
2642            }
2643        }
2644
2645        if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2646            return Err(SqlError::RowTooLarge {
2647                size: bufs.new_value_buf.len(),
2648                max: citadel_core::MAX_VALUE_SIZE,
2649            });
2650        }
2651
2652        Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2653    })
2654}
2655
2656fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2657    if !ts.indices.is_empty() {
2658        return true;
2659    }
2660    match oc {
2661        CompiledOnConflict::DoNothing { .. } => false,
2662        CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2663    }
2664}
2665
2666fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2667    if !ts.indices.is_empty() {
2668        return false;
2669    }
2670    if !ts.foreign_keys.is_empty() {
2671        return false;
2672    }
2673    if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2674        return false;
2675    }
2676    let pk = ts.pk_indices();
2677    !assignments.iter().any(|(ci, _)| pk.contains(ci))
2678}
2679
2680#[allow(clippy::too_many_arguments)]
2681#[inline]
2682fn apply_do_update_fused(
2683    wtx: &mut WriteTxn<'_>,
2684    table_schema: &TableSchema,
2685    table_bytes: &[u8],
2686    key_buf: &[u8],
2687    value_buf: &[u8],
2688    proposed_row: &[Value],
2689    assignments: &[(usize, Expr)],
2690    where_clause: Option<&Expr>,
2691    col_map: &ColumnMap,
2692    fast_paths: Option<&[DoUpdateFastPath]>,
2693    capture_returning: bool,
2694) -> Result<InsertRowOutcome> {
2695    let non_pk = table_schema.non_pk_indices();
2696    let enc_pos = table_schema.encoding_positions();
2697    let phys_count = table_schema.physical_non_pk_count();
2698    let dropped = table_schema.dropped_non_pk_slots();
2699    let has_checks = table_schema.has_checks();
2700    let has_fks = !table_schema.foreign_keys.is_empty();
2701
2702    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2703        std::cell::RefCell::new(None);
2704
2705    let outcome =
2706        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2707            if let Some(fps) = fast_paths {
2708                if !has_checks {
2709                    let action = apply_fast_path_patch(old_bytes, fps)?;
2710                    if capture_returning {
2711                        if let UpsertAction::Replace(ref new_bytes) = action {
2712                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2713                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2714                            *captured.borrow_mut() = Some((old_row, new_row));
2715                        }
2716                    }
2717                    return Ok(action);
2718                }
2719            }
2720            UPSERT_SCRATCH.with(|slot| {
2721                let mut bufs = slot.borrow_mut();
2722                let UpsertBufs {
2723                    old_row,
2724                    new_row,
2725                    value_values,
2726                    new_value_buf,
2727                } = &mut *bufs;
2728
2729                old_row.clear();
2730                old_row.resize(table_schema.columns.len(), Value::Null);
2731                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2732
2733                if let Some(w) = where_clause {
2734                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2735                    let result = eval_expr(w, &ctx)?;
2736                    if result.is_null() || !is_truthy(&result) {
2737                        return Ok(UpsertAction::Skip);
2738                    }
2739                }
2740
2741                new_row.clear();
2742                new_row.extend_from_slice(old_row);
2743                for (col_idx, expr) in assignments {
2744                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2745                    let val = eval_expr(expr, &ctx)?;
2746                    let col = &table_schema.columns[*col_idx];
2747                    new_row[*col_idx] = if val.is_null() {
2748                        Value::Null
2749                    } else {
2750                        let got = val.data_type();
2751                        val.coerce_into(col.data_type)
2752                            .ok_or_else(|| SqlError::TypeMismatch {
2753                                expected: col.data_type.to_string(),
2754                                got: got.to_string(),
2755                            })?
2756                    };
2757                }
2758
2759                for (assigned_idx, _) in assignments {
2760                    let col = &table_schema.columns[*assigned_idx];
2761                    if !col.nullable && new_row[col.position as usize].is_null() {
2762                        return Err(SqlError::NotNullViolation(col.name.clone()));
2763                    }
2764                }
2765                if has_checks {
2766                    for col in &table_schema.columns {
2767                        if let Some(ref check) = col.check_expr {
2768                            let ctx = EvalCtx::new(col_map, new_row);
2769                            let result = eval_expr(check, &ctx)?;
2770                            if !is_truthy(&result) && !result.is_null() {
2771                                let name = col.check_name.as_deref().unwrap_or(&col.name);
2772                                return Err(SqlError::CheckViolation(name.to_string()));
2773                            }
2774                        }
2775                    }
2776                    for tc in &table_schema.check_constraints {
2777                        let ctx = EvalCtx::new(col_map, new_row);
2778                        let result = eval_expr(&tc.expr, &ctx)?;
2779                        if !is_truthy(&result) && !result.is_null() {
2780                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
2781                            return Err(SqlError::CheckViolation(name.to_string()));
2782                        }
2783                    }
2784                }
2785                let _ = has_fks;
2786
2787                value_values.clear();
2788                value_values.resize(phys_count, Value::Null);
2789                for &slot in dropped {
2790                    value_values[slot as usize] = Value::Null;
2791                }
2792                for (j, &i) in non_pk.iter().enumerate() {
2793                    value_values[enc_pos[j] as usize] = new_row[i].clone();
2794                }
2795                new_value_buf.clear();
2796                crate::encoding::encode_row_into(value_values, new_value_buf);
2797
2798                if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2799                    return Err(SqlError::RowTooLarge {
2800                        size: new_value_buf.len(),
2801                        max: citadel_core::MAX_VALUE_SIZE,
2802                    });
2803                }
2804
2805                if capture_returning {
2806                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2807                }
2808                Ok(UpsertAction::Replace(new_value_buf.clone()))
2809            })
2810        })?;
2811
2812    match outcome {
2813        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2814        UpsertOutcome::Updated => {
2815            if capture_returning {
2816                let (old, new) = captured.into_inner().ok_or_else(|| {
2817                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2818                })?;
2819                Ok(InsertRowOutcome::Updated { old, new })
2820            } else {
2821                Ok(InsertRowOutcome::Inserted)
2822            }
2823        }
2824        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2825    }
2826}
2827
2828fn fetch_unique_index_pk(
2829    wtx: &mut WriteTxn<'_>,
2830    table_schema: &TableSchema,
2831    index_idx: usize,
2832    row: &[Value],
2833) -> Result<Vec<u8>> {
2834    let idx = &table_schema.indices[index_idx];
2835    let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2836    let indexed: Vec<Value> = idx
2837        .column_positions_iter()
2838        .map(|col_idx| row[col_idx as usize].clone())
2839        .collect();
2840    let key = crate::encoding::encode_composite_key(&indexed);
2841    let value = wtx
2842        .table_get(&idx_table, &key)
2843        .map_err(SqlError::Storage)?
2844        .ok_or_else(|| {
2845            SqlError::InvalidValue("unique index missing expected collision entry".into())
2846        })?;
2847    Ok(value)
2848}
2849
2850#[allow(clippy::too_many_arguments)]
2851fn apply_do_update(
2852    wtx: &mut WriteTxn<'_>,
2853    table_schema: &TableSchema,
2854    pk_key: &[u8],
2855    proposed_row: &[Value],
2856    assignments: &[(usize, Expr)],
2857    where_clause: Option<&Expr>,
2858    col_map: &ColumnMap,
2859    capture_returning: bool,
2860) -> Result<InsertRowOutcome> {
2861    let old_value = wtx
2862        .table_get(table_schema.name.as_bytes(), pk_key)
2863        .map_err(SqlError::Storage)?
2864        .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2865    let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2866    apply_do_update_with_old_row(
2867        wtx,
2868        table_schema,
2869        pk_key,
2870        &old_row,
2871        proposed_row,
2872        assignments,
2873        where_clause,
2874        col_map,
2875        capture_returning,
2876    )
2877}
2878
2879#[allow(clippy::too_many_arguments)]
2880fn apply_do_update_with_old_row(
2881    wtx: &mut WriteTxn<'_>,
2882    table_schema: &TableSchema,
2883    old_pk_key: &[u8],
2884    old_row: &[Value],
2885    proposed_row: &[Value],
2886    assignments: &[(usize, Expr)],
2887    where_clause: Option<&Expr>,
2888    col_map: &ColumnMap,
2889    capture_returning: bool,
2890) -> Result<InsertRowOutcome> {
2891    if let Some(w) = where_clause {
2892        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2893        let result = eval_expr(w, &ctx)?;
2894        if result.is_null() || !is_truthy(&result) {
2895            return Ok(InsertRowOutcome::Skipped);
2896        }
2897    }
2898
2899    let mut new_row = old_row.to_vec();
2900    for (col_idx, expr) in assignments {
2901        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2902        let val = eval_expr(expr, &ctx)?;
2903        let col = &table_schema.columns[*col_idx];
2904        new_row[*col_idx] = if val.is_null() {
2905            Value::Null
2906        } else {
2907            let got = val.data_type();
2908            val.coerce_into(col.data_type)
2909                .ok_or_else(|| SqlError::TypeMismatch {
2910                    expected: col.data_type.to_string(),
2911                    got: got.to_string(),
2912                })?
2913        };
2914    }
2915
2916    for col in &table_schema.columns {
2917        if matches!(
2918            col.generated_kind,
2919            Some(crate::parser::GeneratedKind::Stored)
2920        ) {
2921            let val = eval_expr(
2922                col.generated_expr.as_ref().unwrap(),
2923                &EvalCtx::new(col_map, &new_row),
2924            )?;
2925            let pos = col.position as usize;
2926            new_row[pos] = if val.is_null() {
2927                if !col.nullable {
2928                    return Err(SqlError::NotNullViolation(col.name.clone()));
2929                }
2930                Value::Null
2931            } else {
2932                let got = val.data_type();
2933                val.coerce_into(col.data_type)
2934                    .ok_or_else(|| SqlError::TypeMismatch {
2935                        expected: col.data_type.to_string(),
2936                        got: got.to_string(),
2937                    })?
2938            };
2939        }
2940    }
2941
2942    let pk_indices = table_schema.pk_indices();
2943    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2944    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2945
2946    for (assigned_idx, _) in assignments {
2947        let col = &table_schema.columns[*assigned_idx];
2948        if !col.nullable && new_row[col.position as usize].is_null() {
2949            return Err(SqlError::NotNullViolation(col.name.clone()));
2950        }
2951    }
2952    if table_schema.has_checks() {
2953        for col in &table_schema.columns {
2954            if let Some(ref check) = col.check_expr {
2955                let ctx = EvalCtx::new(col_map, &new_row);
2956                let result = eval_expr(check, &ctx)?;
2957                if !is_truthy(&result) && !result.is_null() {
2958                    let name = col.check_name.as_deref().unwrap_or(&col.name);
2959                    return Err(SqlError::CheckViolation(name.to_string()));
2960                }
2961            }
2962        }
2963        for tc in &table_schema.check_constraints {
2964            let ctx = EvalCtx::new(col_map, &new_row);
2965            let result = eval_expr(&tc.expr, &ctx)?;
2966            if !is_truthy(&result) && !result.is_null() {
2967                let name = tc.name.as_deref().unwrap_or(&tc.sql);
2968                return Err(SqlError::CheckViolation(name.to_string()));
2969            }
2970        }
2971    }
2972    for fk in &table_schema.foreign_keys {
2973        let changed = fk
2974            .columns
2975            .iter()
2976            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2977        if !changed {
2978            continue;
2979        }
2980        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2981        if any_null {
2982            continue;
2983        }
2984        let fk_vals: Vec<Value> = fk
2985            .columns
2986            .iter()
2987            .map(|&ci| new_row[ci as usize].clone())
2988            .collect();
2989        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2990        if fk.deferrable && fk.initially_deferred {
2991            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
2992            wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
2993                fk_name: name,
2994                foreign_table: fk.foreign_table.as_bytes().to_vec(),
2995                parent_key: fk_key,
2996            });
2997            continue;
2998        }
2999        if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
3000            let found = wtx
3001                .table_get(fk.foreign_table.as_bytes(), &fk_key)
3002                .map_err(SqlError::Storage)?;
3003            if found.is_none() {
3004                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
3005                return Err(SqlError::ForeignKeyViolation(name.to_string()));
3006            }
3007            wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
3008        }
3009    }
3010
3011    let has_indices = !table_schema.indices.is_empty();
3012    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
3013        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
3014    } else {
3015        Vec::new()
3016    };
3017    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
3018        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
3019    } else {
3020        Vec::new()
3021    };
3022
3023    let non_pk = table_schema.non_pk_indices();
3024    let enc_pos = table_schema.encoding_positions();
3025    let phys_count = table_schema.physical_non_pk_count();
3026    let dropped = table_schema.dropped_non_pk_slots();
3027    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
3028    for &slot in dropped {
3029        value_values[slot as usize] = Value::Null;
3030    }
3031    for (j, &i) in non_pk.iter().enumerate() {
3032        let col = &table_schema.columns[i];
3033        value_values[enc_pos[j] as usize] = if matches!(
3034            col.generated_kind,
3035            Some(crate::parser::GeneratedKind::Virtual)
3036        ) {
3037            Value::Null
3038        } else {
3039            new_row[i].clone()
3040        };
3041    }
3042    let mut new_value_buf = Vec::with_capacity(256);
3043    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
3044
3045    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
3046        return Err(SqlError::RowTooLarge {
3047            size: new_value_buf.len(),
3048            max: citadel_core::MAX_VALUE_SIZE,
3049        });
3050    }
3051
3052    if pk_changed {
3053        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
3054        let inserted = wtx
3055            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
3056            .map_err(SqlError::Storage)?;
3057        if !inserted {
3058            return Err(SqlError::DuplicateKey);
3059        }
3060        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
3061            .map_err(SqlError::Storage)?;
3062        for idx in &table_schema.indices {
3063            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3064            let old_idx_key =
3065                encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3066            wtx.table_delete(&idx_table, &old_idx_key)
3067                .map_err(SqlError::Storage)?;
3068            let new_idx_key =
3069                encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3070            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3071            let is_new = wtx
3072                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3073                .map_err(SqlError::Storage)?;
3074            if idx.unique && !is_new {
3075                let any_null = idx
3076                    .column_positions_iter()
3077                    .any(|c| new_row[c as usize].is_null());
3078                if !any_null {
3079                    return Err(SqlError::UniqueViolation(idx.name.clone()));
3080                }
3081            }
3082        }
3083    } else {
3084        wtx.table_update_sorted(
3085            table_schema.name.as_bytes(),
3086            &[(old_pk_key, new_value_buf.as_slice())],
3087        )
3088        .map_err(SqlError::Storage)?;
3089        let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
3090        for idx in &table_schema.indices {
3091            let cols_changed = index_columns_changed(idx, old_row, &new_row);
3092            let (del, ins) = partial_idx_update_actions(
3093                idx,
3094                old_row,
3095                &new_row,
3096                cols_changed,
3097                false,
3098                col_map_partial,
3099            );
3100            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3101            if del {
3102                let old_idx_key =
3103                    encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3104                wtx.table_delete(&idx_table, &old_idx_key)
3105                    .map_err(SqlError::Storage)?;
3106            }
3107            if ins {
3108                let new_idx_key =
3109                    encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3110                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3111                let is_new = wtx
3112                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3113                    .map_err(SqlError::Storage)?;
3114                if idx.unique && !is_new {
3115                    let any_null = idx
3116                        .column_positions_iter()
3117                        .any(|c| new_row[c as usize].is_null());
3118                    if !any_null {
3119                        return Err(SqlError::UniqueViolation(idx.name.clone()));
3120                    }
3121                }
3122            }
3123        }
3124    }
3125
3126    if capture_returning {
3127        Ok(InsertRowOutcome::Updated {
3128            old: old_row.to_vec(),
3129            new: new_row,
3130        })
3131    } else {
3132        Ok(InsertRowOutcome::Inserted)
3133    }
3134}
3135
3136fn detect_fast_paths(
3137    ts: &TableSchema,
3138    assignments: &[(usize, Expr)],
3139) -> Option<Vec<DoUpdateFastPath>> {
3140    let non_pk = ts.non_pk_indices();
3141    let enc_pos = ts.encoding_positions();
3142    let mut out = Vec::with_capacity(assignments.len());
3143    for (col_idx, expr) in assignments {
3144        let col = &ts.columns[*col_idx];
3145        if col.data_type != DataType::Integer {
3146            return None;
3147        }
3148        let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3149        let phys_idx = enc_pos[nonpk_order] as usize;
3150
3151        if let Expr::BinaryOp { left, op, right } = expr {
3152            if !matches!(op, BinOp::Add | BinOp::Sub) {
3153                return None;
3154            }
3155            let reads_target =
3156                matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3157            if !reads_target {
3158                return None;
3159            }
3160            if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3161                let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3162                let _ = col_idx;
3163                out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3164                continue;
3165            }
3166            return None;
3167        }
3168        return None;
3169    }
3170    Some(out)
3171}
3172
3173fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3174    let target = oc
3175        .target
3176        .as_ref()
3177        .map(|t| resolve_conflict_target(t, ts))
3178        .transpose()?;
3179    match &oc.action {
3180        OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3181        OnConflictAction::DoUpdate {
3182            assignments,
3183            where_clause,
3184        } => {
3185            let target = target.ok_or_else(|| {
3186                SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3187            })?;
3188            let compiled_assignments: Vec<(usize, Expr)> = assignments
3189                .iter()
3190                .map(|(name, expr)| {
3191                    let col_idx = ts
3192                        .column_index(name)
3193                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3194                    Ok((col_idx, expr.clone()))
3195                })
3196                .collect::<Result<_>>()?;
3197            let fast_paths = if where_clause.is_none() {
3198                detect_fast_paths(ts, &compiled_assignments)
3199            } else {
3200                None
3201            };
3202            Ok(CompiledOnConflict::DoUpdate {
3203                target,
3204                assignments: compiled_assignments,
3205                where_clause: where_clause.clone(),
3206                fast_paths,
3207            })
3208        }
3209    }
3210}
3211
3212/// Caller MUST check `cache.is_trivial_fast` first.
3213fn exec_insert_trivial_fast(
3214    wtx: &mut WriteTxn<'_>,
3215    table_lower: &str,
3216    cache: &InsertCache,
3217    bufs: &mut InsertBufs,
3218    params: &[Value],
3219) -> Result<ExecutionResult> {
3220    let prog = cache
3221        .trivial_fast_program
3222        .as_ref()
3223        .expect("trivial fast: program");
3224
3225    for &p in &prog.not_null_param_indices {
3226        if params[p as usize].is_null() {
3227            return Err(SqlError::NotNullViolation(format!("param@{p}")));
3228        }
3229    }
3230
3231    match &params[prog.pk_param as usize] {
3232        Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3233        _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3234    }
3235
3236    bufs.value_buf.clear();
3237    bufs.value_buf.extend_from_slice(&prog.template);
3238
3239    for op in &prog.ops {
3240        match op {
3241            WriteOp::ParamI64 { param_idx, off } => match &params[*param_idx as usize] {
3242                Value::Integer(v) => {
3243                    let off = *off as usize;
3244                    bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3245                }
3246                other => {
3247                    return Err(SqlError::TypeMismatch {
3248                        expected: "Integer".into(),
3249                        got: other.data_type().to_string(),
3250                    });
3251                }
3252            },
3253            WriteOp::LiteralI64 { value, off } => {
3254                let off = *off as usize;
3255                bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3256            }
3257            WriteOp::GenAddParamsI64 {
3258                a_param,
3259                b_param,
3260                off,
3261                bitmap_byte_off,
3262                bitmap_bit_mask,
3263            } => match (&params[*a_param as usize], &params[*b_param as usize]) {
3264                (Value::Integer(a), Value::Integer(b)) => {
3265                    let off = *off as usize;
3266                    bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3267                }
3268                _ => {
3269                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3270                }
3271            },
3272            WriteOp::GenMulAddParamI64 {
3273                param_idx,
3274                mul,
3275                add,
3276                off,
3277                bitmap_byte_off,
3278                bitmap_bit_mask,
3279            } => match &params[*param_idx as usize] {
3280                Value::Integer(v) => {
3281                    let r = v.wrapping_mul(*mul).wrapping_add(*add);
3282                    let off = *off as usize;
3283                    bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3284                }
3285                _ => {
3286                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3287                }
3288            },
3289        }
3290    }
3291
3292    let is_new = wtx
3293        .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3294        .map_err(SqlError::Storage)?;
3295    if !is_new {
3296        return Err(SqlError::DuplicateKey);
3297    }
3298    Ok(ExecutionResult::RowsAffected(1))
3299}
3300
3301fn build_bind_plan(
3302    stmt: &InsertStmt,
3303    col_indices: &[usize],
3304    col_data_types: &[DataType],
3305) -> Option<Vec<BindAction>> {
3306    let rows = match &stmt.source {
3307        InsertSource::Values(rows) => rows,
3308        _ => return None,
3309    };
3310    if rows.len() != 1 {
3311        return None;
3312    }
3313    let value_row = &rows[0];
3314    if value_row.len() != col_indices.len() {
3315        return None;
3316    }
3317    let mut plan = Vec::with_capacity(value_row.len());
3318    for (i, expr) in value_row.iter().enumerate() {
3319        let col_idx = col_indices[i];
3320        let target = col_data_types[col_idx];
3321        match expr {
3322            Expr::Parameter(n) => {
3323                if *n == 0 {
3324                    return None;
3325                }
3326                plan.push(BindAction::Param {
3327                    param_idx: n - 1,
3328                    col_idx,
3329                    target,
3330                });
3331            }
3332            Expr::Literal(v) => plan.push(BindAction::Literal {
3333                value: v.clone(),
3334                col_idx,
3335            }),
3336            _ => return None,
3337        }
3338    }
3339    Some(plan)
3340}
3341
3342impl CompiledInsert {
3343    pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3344        let lower = stmt.table.to_ascii_lowercase();
3345        let cached = if let Some(ts) = schema.get(&lower) {
3346            let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3347                ts.columns.iter().map(|c| c.name.as_str()).collect()
3348            } else {
3349                stmt.columns.iter().map(|s| s.as_str()).collect()
3350            };
3351            let mut col_indices = Vec::with_capacity(insert_columns.len());
3352            for name in &insert_columns {
3353                col_indices.push(ts.column_index(name)?);
3354            }
3355            if col_indices
3356                .iter()
3357                .any(|&ci| ts.columns[ci].generated_kind.is_some())
3358            {
3359                return None;
3360            }
3361            let on_conflict = stmt
3362                .on_conflict
3363                .as_ref()
3364                .map(|oc| compile_on_conflict(oc, ts))
3365                .transpose()
3366                .ok()
3367                .flatten()
3368                .map(Arc::new);
3369            let generated_col_positions: Vec<usize> = ts
3370                .columns
3371                .iter()
3372                .enumerate()
3373                .filter_map(|(i, c)| {
3374                    matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3375                        .then_some(i)
3376                })
3377                .collect();
3378            let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3379                .iter()
3380                .map(|&pos| {
3381                    detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3382                })
3383                .collect();
3384            let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3385                Some(ColumnMap::new(&ts.columns))
3386            } else {
3387                None
3388            };
3389            let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3390            let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3391            let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3392            let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3393            let phys_count = ts.physical_non_pk_count();
3394            let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3395            let single_int_pk =
3396                pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3397            let not_null_indices: Vec<u16> = ts
3398                .columns
3399                .iter()
3400                .filter(|c| !c.nullable)
3401                .map(|c| c.position)
3402                .collect();
3403            let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3404            let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3405            let row_fully_overwritten = if any_defaults_flag {
3406                false
3407            } else {
3408                let mut covered: rustc_hash::FxHashSet<usize> =
3409                    col_indices.iter().copied().collect();
3410                covered.extend(generated_col_positions.iter().copied());
3411                for (j, &i) in non_pk_indices.iter().enumerate() {
3412                    let _ = j;
3413                    if matches!(
3414                        ts.columns[i].generated_kind,
3415                        Some(crate::parser::GeneratedKind::Virtual)
3416                    ) {
3417                        covered.insert(i);
3418                    }
3419                }
3420                bind_plan.is_some() && covered.len() == ts.columns.len()
3421            };
3422            let has_fks = !ts.foreign_keys.is_empty();
3423            let has_indices = !ts.indices.is_empty();
3424            let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3425            let mut null_value_slots: Vec<usize> =
3426                dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3427            for (j, &i) in non_pk_indices.iter().enumerate() {
3428                let slot = encoding_positions[j] as usize;
3429                if matches!(
3430                    ts.columns[i].generated_kind,
3431                    Some(crate::parser::GeneratedKind::Virtual)
3432                ) {
3433                    null_value_slots.push(slot);
3434                } else {
3435                    non_virtual_pairs.push((i, slot));
3436                }
3437            }
3438            let row_encoder = {
3439                let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3440                    let col = &ts.columns[i];
3441                    if matches!(
3442                        col.generated_kind,
3443                        Some(crate::parser::GeneratedKind::Virtual)
3444                    ) {
3445                        true
3446                    } else {
3447                        col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3448                    }
3449                });
3450                if all_int_or_null {
3451                    let mut null_slots: Vec<usize> =
3452                        dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3453                    for (j, &i) in non_pk_indices.iter().enumerate() {
3454                        if matches!(
3455                            ts.columns[i].generated_kind,
3456                            Some(crate::parser::GeneratedKind::Virtual)
3457                        ) {
3458                            null_slots.push(encoding_positions[j] as usize);
3459                        }
3460                    }
3461                    Some(crate::encoding::build_int_row_template(
3462                        phys_count,
3463                        &null_slots,
3464                    ))
3465                } else {
3466                    None
3467                }
3468            };
3469            let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3470                && !ts.columns.iter().any(|c| c.default_expr.is_some())
3471                && !ts.has_checks()
3472                && !has_fks
3473                && !has_indices
3474                && stmt.on_conflict.is_none()
3475                && stmt.returning.is_none()
3476                && bind_plan.is_some()
3477                && row_encoder.is_some()
3478                && row_fully_overwritten
3479                && single_int_pk
3480                && generated_fast_evals
3481                    .iter()
3482                    .all(|fe| !matches!(fe, FastGenEval::None));
3483            let trivial_fast_program = if is_trivial_fast_eligible {
3484                build_trivial_fast_program(
3485                    bind_plan.as_ref().unwrap(),
3486                    row_encoder.as_ref().unwrap(),
3487                    &non_virtual_pairs,
3488                    &generated_col_positions,
3489                    &generated_fast_evals,
3490                    &pk_indices,
3491                    &ts.columns,
3492                )
3493            } else {
3494                None
3495            };
3496            let is_trivial_fast = trivial_fast_program.is_some();
3497            let has_checks = ts.has_checks();
3498            let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3499            let needs_scoped_params = bind_plan.is_none()
3500                || has_checks
3501                || any_defaults
3502                || !generated_col_positions.is_empty()
3503                || on_conflict.is_some()
3504                || stmt.returning.is_some()
3505                || insert_has_subquery(stmt)
3506                || super::helpers::any_partial_index(ts);
3507            Some(InsertCache {
3508                col_indices,
3509                has_subquery: insert_has_subquery(stmt),
3510                any_defaults,
3511                has_checks,
3512                on_conflict,
3513                row_col_map,
3514                generated_col_positions,
3515                generated_fast_evals,
3516                pk_indices,
3517                non_pk_indices,
3518                encoding_positions,
3519                dropped_non_pk_slots,
3520                phys_count,
3521                single_int_pk,
3522                not_null_indices,
3523                bind_plan,
3524                row_fully_overwritten,
3525                row_encoder,
3526                is_trivial_fast,
3527                trivial_fast_program,
3528                needs_scoped_params,
3529            })
3530        } else if schema.get_view(&lower).is_some() {
3531            None
3532        } else {
3533            return None;
3534        };
3535        Some(Self {
3536            table_lower: lower,
3537            cached,
3538        })
3539    }
3540}
3541
3542impl CompiledPlan for CompiledInsert {
3543    fn execute(
3544        &self,
3545        db: &Database,
3546        schema: &SchemaManager,
3547        stmt: &Statement,
3548        params: &[Value],
3549        txn: super::compile::ActiveTxnRef<'_, '_>,
3550    ) -> Result<ExecutionResult> {
3551        let ins = match stmt {
3552            Statement::Insert(i) => i,
3553            _ => {
3554                return Err(SqlError::Unsupported(
3555                    "CompiledInsert received non-INSERT statement".into(),
3556                ))
3557            }
3558        };
3559        use super::compile::ActiveTxnRef;
3560        match txn {
3561            ActiveTxnRef::None => exec_insert(db, schema, ins, params),
3562            ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3563                "cannot execute mutating statement inside a read-only transaction".into(),
3564            )),
3565            ActiveTxnRef::Write(outer) => match self.cached.as_ref() {
3566                Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3567                    exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3568                }),
3569                Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3570                None => exec_insert_in_txn(outer, schema, ins, params),
3571            },
3572        }
3573    }
3574
3575    fn uses_scoped_params(&self) -> bool {
3576        match self.cached.as_ref() {
3577            Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3578            None => true,
3579        }
3580    }
3581}
3582
3583pub struct CompiledDelete {
3584    table_lower: String,
3585}
3586
3587impl CompiledDelete {
3588    pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3589        let lower = stmt.table.to_ascii_lowercase();
3590        schema.get(&lower)?;
3591        Some(Self { table_lower: lower })
3592    }
3593}
3594
3595impl CompiledPlan for CompiledDelete {
3596    fn execute(
3597        &self,
3598        db: &Database,
3599        schema: &SchemaManager,
3600        stmt: &Statement,
3601        _params: &[Value],
3602        txn: super::compile::ActiveTxnRef<'_, '_>,
3603    ) -> Result<ExecutionResult> {
3604        let del = match stmt {
3605            Statement::Delete(d) => d,
3606            _ => {
3607                return Err(SqlError::Unsupported(
3608                    "CompiledDelete received non-DELETE statement".into(),
3609                ))
3610            }
3611        };
3612        let _ = &self.table_lower;
3613        use super::compile::ActiveTxnRef;
3614        match txn {
3615            ActiveTxnRef::None => super::write::exec_delete(db, schema, del),
3616            ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3617                "cannot execute mutating statement inside a read-only transaction".into(),
3618            )),
3619            ActiveTxnRef::Write(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3620        }
3621    }
3622}
3623
3624fn exec_instead_of_view_insert_auto(
3625    db: &Database,
3626    schema: &SchemaManager,
3627    view_name: &str,
3628    aliases: &[String],
3629    stmt: &InsertStmt,
3630    params: &[Value],
3631) -> Result<ExecutionResult> {
3632    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3633    let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3634    wtx.commit().map_err(SqlError::Storage)?;
3635    Ok(r)
3636}
3637
3638fn exec_instead_of_view_insert_in_txn(
3639    wtx: &mut WriteTxn<'_>,
3640    schema: &SchemaManager,
3641    view_name: &str,
3642    aliases: &[String],
3643    stmt: &InsertStmt,
3644    params: &[Value],
3645) -> Result<ExecutionResult> {
3646    // CREATE VIEW without explicit aliases stores an empty vec; derive at runtime.
3647    let resolved_aliases: Vec<String> = if aliases.is_empty() {
3648        derive_view_columns(wtx, schema, view_name)?
3649    } else {
3650        aliases.to_vec()
3651    };
3652    let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3653    let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3654        .iter()
3655        .enumerate()
3656        .map(|(i, name)| (name.to_ascii_lowercase(), i))
3657        .collect();
3658
3659    let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3660        (0..resolved_aliases.len()).collect()
3661    } else {
3662        stmt.columns
3663            .iter()
3664            .map(|c| {
3665                alias_map
3666                    .get(&c.to_ascii_lowercase())
3667                    .copied()
3668                    .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3669            })
3670            .collect::<Result<_>>()?
3671    };
3672
3673    let source_rows: Vec<Vec<Value>> = match &stmt.source {
3674        InsertSource::Values(rows) => {
3675            let mut out = Vec::with_capacity(rows.len());
3676            for row in rows {
3677                if row.len() != target_positions.len() {
3678                    return Err(SqlError::InvalidValue(format!(
3679                        "expected {} values, got {}",
3680                        target_positions.len(),
3681                        row.len()
3682                    )));
3683                }
3684                let mut vals = Vec::with_capacity(row.len());
3685                for expr in row {
3686                    let v = match expr {
3687                        Expr::Parameter(n) => params
3688                            .get(n - 1)
3689                            .cloned()
3690                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3691                        Expr::Literal(v) => v.clone(),
3692                        other => eval_const_expr(other)?,
3693                    };
3694                    vals.push(v);
3695                }
3696                out.push(vals);
3697            }
3698            out
3699        }
3700        InsertSource::Select(sq) => {
3701            let empty_ctes = CteContext::default();
3702            let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3703            qr.rows
3704        }
3705    };
3706
3707    let mut count: u64 = 0;
3708    for row in source_rows {
3709        if row.len() != target_positions.len() {
3710            return Err(SqlError::InvalidValue(format!(
3711                "expected {} values, got {}",
3712                target_positions.len(),
3713                row.len()
3714            )));
3715        }
3716        let mut new_row = vec![Value::Null; resolved_aliases.len()];
3717        for (slot, val) in target_positions.iter().zip(row) {
3718            new_row[*slot] = val;
3719        }
3720        super::triggers::fire_row_triggers(
3721            wtx,
3722            schema,
3723            view_name,
3724            crate::parser::TriggerTiming::InsteadOf,
3725            super::triggers::FireEvent::Insert,
3726            None,
3727            Some(new_row),
3728            &view_cols,
3729        )?;
3730        count += 1;
3731    }
3732    Ok(ExecutionResult::RowsAffected(count))
3733}
3734
3735fn derive_view_columns(
3736    wtx: &mut WriteTxn<'_>,
3737    schema: &SchemaManager,
3738    view_name: &str,
3739) -> Result<Vec<String>> {
3740    use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3741    let sel = SelectStmt {
3742        columns: vec![SelectColumn::AllColumns],
3743        from: view_name.to_string(),
3744        from_alias: None,
3745        from_subquery: None,
3746        from_args: None,
3747        from_json_table: None,
3748        joins: vec![],
3749        distinct: false,
3750        where_clause: None,
3751        order_by: vec![],
3752        limit: Some(Expr::Literal(Value::Integer(1))),
3753        offset: None,
3754        group_by: vec![],
3755        having: None,
3756    };
3757    let sq = SelectQuery {
3758        ctes: vec![],
3759        recursive: false,
3760        body: QueryBody::Select(Box::new(sel)),
3761    };
3762    let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3763    match qr {
3764        ExecutionResult::Query(q) => Ok(q.columns),
3765        _ => Ok(Vec::new()),
3766    }
3767}
3768
3769#[cfg(test)]
3770#[path = "dml_tests.rs"]
3771mod tests;