Skip to main content

citadel_sql/executor/
dml.rs

1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::write_txn::WriteTxn;
7use rustc_hash::FxHashMap;
8
9use crate::encoding::{encode_composite_key_into, encode_row_into};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
12use crate::parser::*;
13use crate::types::*;
14
15use crate::schema::SchemaManager;
16
17use super::compile::CompiledPlan;
18use super::helpers::*;
19use super::CteContext;
20
21pub(super) fn exec_insert(
22    db: &Database,
23    schema: &SchemaManager,
24    stmt: &InsertStmt,
25    params: &[Value],
26) -> Result<ExecutionResult> {
27    let empty_ctes = CteContext::default();
28    let materialized;
29    let stmt = if insert_has_subquery(stmt) {
30        materialized = materialize_insert(stmt, &mut |sub| {
31            exec_subquery_read(db, schema, sub, &empty_ctes)
32        })?;
33        &materialized
34    } else {
35        stmt
36    };
37
38    let lower_name = stmt.table.to_ascii_lowercase();
39    if schema.get_view(&lower_name).is_some() {
40        return Err(SqlError::CannotModifyView(stmt.table.clone()));
41    }
42    let table_schema = schema
43        .get(&lower_name)
44        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
45
46    let insert_columns = if stmt.columns.is_empty() {
47        table_schema
48            .columns
49            .iter()
50            .map(|c| c.name.clone())
51            .collect::<Vec<_>>()
52    } else {
53        stmt.columns
54            .iter()
55            .map(|c| c.to_ascii_lowercase())
56            .collect()
57    };
58
59    let col_indices: Vec<usize> = insert_columns
60        .iter()
61        .map(|name| {
62            table_schema
63                .column_index(name)
64                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
65        })
66        .collect::<Result<_>>()?;
67
68    for &ci in &col_indices {
69        if table_schema.columns[ci].generated_kind.is_some() {
70            return Err(SqlError::CannotInsertIntoGeneratedColumn(
71                table_schema.columns[ci].name.clone(),
72            ));
73        }
74    }
75
76    let defaults: Vec<(usize, &Expr)> = table_schema
77        .columns
78        .iter()
79        .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
80        .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
81        .collect();
82
83    let generated_cols: Vec<(usize, &Expr)> = table_schema
84        .columns
85        .iter()
86        .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
87        .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
88        .collect();
89
90    let has_checks = table_schema.has_checks();
91    let strict = table_schema.is_strict();
92    let row_col_map_for_gen = if !generated_cols.is_empty() {
93        Some(ColumnMap::new(&table_schema.columns))
94    } else {
95        None
96    };
97    let check_col_map = if has_checks {
98        Some(ColumnMap::new(&table_schema.columns))
99    } else {
100        None
101    };
102
103    let select_rows = match &stmt.source {
104        InsertSource::Select(sq) => {
105            let insert_ctes =
106                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
107                    exec_query_body_read(db, schema, body, ctx)
108                })?;
109            let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
110            Some(qr.rows)
111        }
112        InsertSource::Values(_) => None,
113    };
114
115    let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
116        .on_conflict
117        .as_ref()
118        .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
119        .transpose()?;
120
121    let row_col_map = compiled_conflict
122        .as_ref()
123        .map(|_| ColumnMap::new(&table_schema.columns));
124
125    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
126    let mut count: u64 = 0;
127    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
128        stmt.returning.as_ref().map(|_| Vec::new());
129
130    let pk_indices = table_schema.pk_indices();
131    let non_pk = table_schema.non_pk_indices();
132    let enc_pos = table_schema.encoding_positions();
133    let phys_count = table_schema.physical_non_pk_count();
134    let mut row = vec![Value::Null; table_schema.columns.len()];
135    let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
136    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
137    let mut key_buf: Vec<u8> = Vec::with_capacity(64);
138    let mut value_buf: Vec<u8> = Vec::with_capacity(256);
139    let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
140
141    let values = match &stmt.source {
142        InsertSource::Values(rows) => Some(rows.as_slice()),
143        InsertSource::Select(_) => None,
144    };
145    let sel_rows = select_rows.as_deref();
146
147    let total = match (values, sel_rows) {
148        (Some(rows), _) => rows.len(),
149        (_, Some(rows)) => rows.len(),
150        _ => 0,
151    };
152
153    if let Some(sel) = sel_rows {
154        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
155            return Err(SqlError::InvalidValue(format!(
156                "INSERT ... SELECT column count mismatch: expected {}, got {}",
157                insert_columns.len(),
158                sel[0].len()
159            )));
160        }
161    }
162
163    for idx in 0..total {
164        for v in row.iter_mut() {
165            *v = Value::Null;
166        }
167
168        if let Some(value_rows) = values {
169            let value_row = &value_rows[idx];
170            if value_row.len() != insert_columns.len() {
171                return Err(SqlError::InvalidValue(format!(
172                    "expected {} values, got {}",
173                    insert_columns.len(),
174                    value_row.len()
175                )));
176            }
177            for (i, expr) in value_row.iter().enumerate() {
178                let val = if let Expr::Parameter(n) = expr {
179                    params
180                        .get(n - 1)
181                        .cloned()
182                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
183                } else {
184                    eval_const_expr(expr)?
185                };
186                let col_idx = col_indices[i];
187                let col = &table_schema.columns[col_idx];
188                row[col_idx] = if val.is_null() {
189                    Value::Null
190                } else {
191                    coerce_for_column(val, col, strict)?
192                };
193            }
194        } else if let Some(sel) = sel_rows {
195            let sel_row = &sel[idx];
196            for (i, val) in sel_row.iter().enumerate() {
197                let col_idx = col_indices[i];
198                let col = &table_schema.columns[col_idx];
199                row[col_idx] = if val.is_null() {
200                    Value::Null
201                } else {
202                    coerce_for_column(val.clone(), col, strict)?
203                };
204            }
205        }
206
207        for &(pos, def_expr) in &defaults {
208            let val = eval_const_expr(def_expr)?;
209            let col = &table_schema.columns[pos];
210            if !val.is_null() {
211                row[pos] = coerce_for_column(val, col, strict)?;
212            }
213        }
214
215        if let Some(ref gen_map) = row_col_map_for_gen {
216            for &(pos, gen_expr) in &generated_cols {
217                let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
218                let col = &table_schema.columns[pos];
219                row[pos] = if val.is_null() {
220                    Value::Null
221                } else {
222                    coerce_for_column(val, col, strict)?
223                };
224            }
225        }
226
227        for col in &table_schema.columns {
228            if !col.nullable && row[col.position as usize].is_null() {
229                return Err(SqlError::NotNullViolation(col.name.clone()));
230            }
231        }
232
233        if let Some(ref col_map) = check_col_map {
234            for col in &table_schema.columns {
235                if let Some(ref check) = col.check_expr {
236                    let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
237                    if !is_truthy(&result) && !result.is_null() {
238                        let name = col.check_name.as_deref().unwrap_or(&col.name);
239                        return Err(SqlError::CheckViolation(name.to_string()));
240                    }
241                }
242            }
243            for tc in &table_schema.check_constraints {
244                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
245                if !is_truthy(&result) && !result.is_null() {
246                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
247                    return Err(SqlError::CheckViolation(name.to_string()));
248                }
249            }
250        }
251
252        for fk in &table_schema.foreign_keys {
253            let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
254            if any_null {
255                continue; // MATCH SIMPLE: skip if any FK col is NULL
256            }
257            let fk_vals: Vec<Value> = fk
258                .columns
259                .iter()
260                .map(|&ci| row[ci as usize].clone())
261                .collect();
262            fk_key_buf.clear();
263            encode_composite_key_into(&fk_vals, &mut fk_key_buf);
264            if fk.deferrable && fk.initially_deferred {
265                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
266                wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
267                    fk_name: name,
268                    foreign_table: fk.foreign_table.as_bytes().to_vec(),
269                    parent_key: fk_key_buf.clone(),
270                });
271                continue;
272            }
273            if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
274                let found = wtx
275                    .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
276                    .map_err(SqlError::Storage)?;
277                if found.is_none() {
278                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
279                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
280                }
281                wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
282            }
283        }
284
285        let proposed_row_for_returning: Option<Vec<Value>> =
286            returning_rows.as_ref().map(|_| row.clone());
287
288        for (j, &i) in pk_indices.iter().enumerate() {
289            pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
290        }
291        encode_composite_key_into(&pk_values, &mut key_buf);
292
293        for (j, &i) in non_pk.iter().enumerate() {
294            let col = &table_schema.columns[i];
295            if matches!(
296                col.generated_kind,
297                Some(crate::parser::GeneratedKind::Virtual)
298            ) {
299                value_values[enc_pos[j] as usize] = Value::Null;
300                row[i] = Value::Null;
301            } else {
302                value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
303            }
304        }
305        encode_row_into(&value_values, &mut value_buf);
306
307        if key_buf.len() > citadel_core::MAX_KEY_SIZE {
308            return Err(SqlError::KeyTooLarge {
309                size: key_buf.len(),
310                max: citadel_core::MAX_KEY_SIZE,
311            });
312        }
313        if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
314            return Err(SqlError::RowTooLarge {
315                size: value_buf.len(),
316                max: citadel_core::MAX_VALUE_SIZE,
317            });
318        }
319
320        match compiled_conflict.as_ref() {
321            None => {
322                let is_new = wtx
323                    .table_insert(stmt.table.as_bytes(), &key_buf, &value_buf)
324                    .map_err(SqlError::Storage)?;
325                if !is_new {
326                    return Err(SqlError::DuplicateKey);
327                }
328                if !table_schema.indices.is_empty() {
329                    for (j, &i) in pk_indices.iter().enumerate() {
330                        row[i] = pk_values[j].clone();
331                    }
332                    for (j, &i) in non_pk.iter().enumerate() {
333                        row[i] =
334                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
335                    }
336                    insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
337                }
338                count += 1;
339                if let Some(buf) = returning_rows.as_mut() {
340                    buf.push((None, proposed_row_for_returning));
341                }
342            }
343            Some(oc) => {
344                let oc_ref: &CompiledOnConflict = oc;
345                let needs_row = upsert_needs_row(oc_ref, table_schema);
346                if needs_row {
347                    for (j, &i) in pk_indices.iter().enumerate() {
348                        row[i] = pk_values[j].clone();
349                    }
350                    for (j, &i) in non_pk.iter().enumerate() {
351                        row[i] =
352                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
353                    }
354                }
355                let outcome = apply_insert_with_conflict(
356                    &mut wtx,
357                    table_schema,
358                    &key_buf,
359                    &value_buf,
360                    &row,
361                    &pk_values,
362                    oc_ref,
363                    row_col_map.as_ref().unwrap(),
364                    stmt.returning.is_some(),
365                )?;
366                match outcome {
367                    InsertRowOutcome::Inserted => {
368                        count += 1;
369                        if let Some(buf) = returning_rows.as_mut() {
370                            buf.push((None, proposed_row_for_returning));
371                        }
372                    }
373                    InsertRowOutcome::Updated { old, new } => {
374                        count += 1;
375                        if let Some(buf) = returning_rows.as_mut() {
376                            buf.push((Some(old), Some(new)));
377                        }
378                    }
379                    InsertRowOutcome::Skipped => {}
380                }
381            }
382        }
383    }
384
385    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
386        let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
387        super::helpers::drain_deferred_fk_checks(&mut wtx)?;
388        wtx.commit().map_err(SqlError::Storage)?;
389        return Ok(ExecutionResult::Query(qr));
390    }
391
392    super::helpers::drain_deferred_fk_checks(&mut wtx)?;
393    wtx.commit().map_err(SqlError::Storage)?;
394    Ok(ExecutionResult::RowsAffected(count))
395}
396
397pub(super) fn has_subquery(expr: &Expr) -> bool {
398    crate::parser::has_subquery(expr)
399}
400
401pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
402    if let Some(ref w) = stmt.where_clause {
403        if has_subquery(w) {
404            return true;
405        }
406    }
407    if let Some(ref h) = stmt.having {
408        if has_subquery(h) {
409            return true;
410        }
411    }
412    for col in &stmt.columns {
413        if let SelectColumn::Expr { expr, .. } = col {
414            if has_subquery(expr) {
415                return true;
416            }
417        }
418    }
419    for ob in &stmt.order_by {
420        if has_subquery(&ob.expr) {
421            return true;
422        }
423    }
424    for join in &stmt.joins {
425        if let Some(ref on_expr) = join.on_clause {
426            if has_subquery(on_expr) {
427                return true;
428            }
429        }
430    }
431    false
432}
433
434pub(super) fn materialize_expr(
435    expr: &Expr,
436    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
437) -> Result<Expr> {
438    match expr {
439        Expr::InSubquery {
440            expr: e,
441            subquery,
442            negated,
443        } => {
444            let inner = materialize_expr(e, exec_sub)?;
445            let qr = exec_sub(subquery)?;
446            if !qr.columns.is_empty() && qr.columns.len() != 1 {
447                return Err(SqlError::SubqueryMultipleColumns);
448            }
449            let mut values = rustc_hash::FxHashSet::default();
450            let mut has_null = false;
451            for row in &qr.rows {
452                if row[0].is_null() {
453                    has_null = true;
454                } else {
455                    values.insert(row[0].clone());
456                }
457            }
458            Ok(Expr::InSet {
459                expr: Box::new(inner),
460                values,
461                has_null,
462                negated: *negated,
463            })
464        }
465        Expr::ScalarSubquery(subquery) => {
466            let qr = exec_sub(subquery)?;
467            if qr.rows.len() > 1 {
468                return Err(SqlError::SubqueryMultipleRows);
469            }
470            let val = if qr.rows.is_empty() {
471                Value::Null
472            } else {
473                qr.rows[0][0].clone()
474            };
475            Ok(Expr::Literal(val))
476        }
477        Expr::Exists { subquery, negated } => {
478            let qr = exec_sub(subquery)?;
479            let exists = !qr.rows.is_empty();
480            let result = if *negated { !exists } else { exists };
481            Ok(Expr::Literal(Value::Boolean(result)))
482        }
483        Expr::InList {
484            expr: e,
485            list,
486            negated,
487        } => {
488            let inner = materialize_expr(e, exec_sub)?;
489            let items = list
490                .iter()
491                .map(|item| materialize_expr(item, exec_sub))
492                .collect::<Result<Vec<_>>>()?;
493            Ok(Expr::InList {
494                expr: Box::new(inner),
495                list: items,
496                negated: *negated,
497            })
498        }
499        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
500            left: Box::new(materialize_expr(left, exec_sub)?),
501            op: *op,
502            right: Box::new(materialize_expr(right, exec_sub)?),
503        }),
504        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
505            op: *op,
506            expr: Box::new(materialize_expr(e, exec_sub)?),
507        }),
508        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
509        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
510        Expr::InSet {
511            expr: e,
512            values,
513            has_null,
514            negated,
515        } => Ok(Expr::InSet {
516            expr: Box::new(materialize_expr(e, exec_sub)?),
517            values: values.clone(),
518            has_null: *has_null,
519            negated: *negated,
520        }),
521        Expr::Between {
522            expr: e,
523            low,
524            high,
525            negated,
526        } => Ok(Expr::Between {
527            expr: Box::new(materialize_expr(e, exec_sub)?),
528            low: Box::new(materialize_expr(low, exec_sub)?),
529            high: Box::new(materialize_expr(high, exec_sub)?),
530            negated: *negated,
531        }),
532        Expr::Like {
533            expr: e,
534            pattern,
535            escape,
536            negated,
537        } => {
538            let esc = escape
539                .as_ref()
540                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
541                .transpose()?;
542            Ok(Expr::Like {
543                expr: Box::new(materialize_expr(e, exec_sub)?),
544                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
545                escape: esc,
546                negated: *negated,
547            })
548        }
549        Expr::Case {
550            operand,
551            conditions,
552            else_result,
553        } => {
554            let op = operand
555                .as_ref()
556                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
557                .transpose()?;
558            let conds = conditions
559                .iter()
560                .map(|(c, r)| {
561                    Ok((
562                        materialize_expr(c, exec_sub)?,
563                        materialize_expr(r, exec_sub)?,
564                    ))
565                })
566                .collect::<Result<Vec<_>>>()?;
567            let else_r = else_result
568                .as_ref()
569                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
570                .transpose()?;
571            Ok(Expr::Case {
572                operand: op,
573                conditions: conds,
574                else_result: else_r,
575            })
576        }
577        Expr::Coalesce(args) => {
578            let materialized = args
579                .iter()
580                .map(|a| materialize_expr(a, exec_sub))
581                .collect::<Result<Vec<_>>>()?;
582            Ok(Expr::Coalesce(materialized))
583        }
584        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
585            expr: Box::new(materialize_expr(e, exec_sub)?),
586            data_type: *data_type,
587        }),
588        Expr::Function {
589            name,
590            args,
591            distinct,
592        } => {
593            let materialized = args
594                .iter()
595                .map(|a| materialize_expr(a, exec_sub))
596                .collect::<Result<Vec<_>>>()?;
597            Ok(Expr::Function {
598                name: name.clone(),
599                args: materialized,
600                distinct: *distinct,
601            })
602        }
603        other => Ok(other.clone()),
604    }
605}
606
607pub(super) fn materialize_stmt(
608    stmt: &SelectStmt,
609    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
610) -> Result<SelectStmt> {
611    let where_clause = stmt
612        .where_clause
613        .as_ref()
614        .map(|e| materialize_expr(e, exec_sub))
615        .transpose()?;
616    let having = stmt
617        .having
618        .as_ref()
619        .map(|e| materialize_expr(e, exec_sub))
620        .transpose()?;
621    let columns = stmt
622        .columns
623        .iter()
624        .map(|c| match c {
625            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
626            SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
627            SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
628            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
629                expr: materialize_expr(expr, exec_sub)?,
630                alias: alias.clone(),
631            }),
632        })
633        .collect::<Result<Vec<_>>>()?;
634    let order_by = stmt
635        .order_by
636        .iter()
637        .map(|ob| {
638            Ok(OrderByItem {
639                expr: materialize_expr(&ob.expr, exec_sub)?,
640                descending: ob.descending,
641                nulls_first: ob.nulls_first,
642            })
643        })
644        .collect::<Result<Vec<_>>>()?;
645    let joins = stmt
646        .joins
647        .iter()
648        .map(|j| {
649            let on_clause = j
650                .on_clause
651                .as_ref()
652                .map(|e| materialize_expr(e, exec_sub))
653                .transpose()?;
654            Ok(JoinClause {
655                join_type: j.join_type,
656                table: j.table.clone(),
657                subquery: j.subquery.clone(),
658                on_clause,
659            })
660        })
661        .collect::<Result<Vec<_>>>()?;
662    let group_by = stmt
663        .group_by
664        .iter()
665        .map(|e| materialize_expr(e, exec_sub))
666        .collect::<Result<Vec<_>>>()?;
667    Ok(SelectStmt {
668        columns,
669        from: stmt.from.clone(),
670        from_alias: stmt.from_alias.clone(),
671        from_subquery: stmt.from_subquery.clone(),
672        from_args: stmt.from_args.clone(),
673        from_json_table: stmt.from_json_table.clone(),
674        joins,
675        distinct: stmt.distinct,
676        where_clause,
677        order_by,
678        limit: stmt.limit.clone(),
679        offset: stmt.offset.clone(),
680        group_by,
681        having,
682    })
683}
684
685pub(super) fn exec_subquery_read(
686    db: &Database,
687    schema: &SchemaManager,
688    stmt: &SelectStmt,
689    ctes: &CteContext,
690) -> Result<QueryResult> {
691    match super::exec_select(db, schema, stmt, ctes)? {
692        ExecutionResult::Query(qr) => Ok(qr),
693        _ => Ok(QueryResult {
694            columns: vec![],
695            rows: vec![],
696        }),
697    }
698}
699
700pub(super) fn exec_subquery_write(
701    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
702    schema: &SchemaManager,
703    stmt: &SelectStmt,
704    ctes: &CteContext,
705) -> Result<QueryResult> {
706    match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
707        ExecutionResult::Query(qr) => Ok(qr),
708        _ => Ok(QueryResult {
709            columns: vec![],
710            rows: vec![],
711        }),
712    }
713}
714
715pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
716    stmt.where_clause.as_ref().is_some_and(has_subquery)
717        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
718}
719
720pub(super) fn materialize_update(
721    stmt: &UpdateStmt,
722    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
723) -> Result<UpdateStmt> {
724    let where_clause = stmt
725        .where_clause
726        .as_ref()
727        .map(|e| materialize_expr(e, exec_sub))
728        .transpose()?;
729    let assignments = stmt
730        .assignments
731        .iter()
732        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
733        .collect::<Result<Vec<_>>>()?;
734    Ok(UpdateStmt {
735        table: stmt.table.clone(),
736        assignments,
737        where_clause,
738        returning: stmt.returning.clone(),
739    })
740}
741
742pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
743    stmt.where_clause.as_ref().is_some_and(has_subquery)
744}
745
746pub(super) fn materialize_delete(
747    stmt: &DeleteStmt,
748    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
749) -> Result<DeleteStmt> {
750    let where_clause = stmt
751        .where_clause
752        .as_ref()
753        .map(|e| materialize_expr(e, exec_sub))
754        .transpose()?;
755    Ok(DeleteStmt {
756        table: stmt.table.clone(),
757        where_clause,
758        returning: stmt.returning.clone(),
759    })
760}
761
762pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
763    match &stmt.source {
764        InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
765        // SELECT source subqueries are handled by exec_select's correlated/non-correlated paths
766        InsertSource::Select(_) => false,
767    }
768}
769
770pub(super) fn materialize_insert(
771    stmt: &InsertStmt,
772    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
773) -> Result<InsertStmt> {
774    let source = match &stmt.source {
775        InsertSource::Values(rows) => {
776            let mat = rows
777                .iter()
778                .map(|row| {
779                    row.iter()
780                        .map(|e| materialize_expr(e, exec_sub))
781                        .collect::<Result<Vec<_>>>()
782                })
783                .collect::<Result<Vec<_>>>()?;
784            InsertSource::Values(mat)
785        }
786        InsertSource::Select(sq) => {
787            let ctes = sq
788                .ctes
789                .iter()
790                .map(|c| {
791                    Ok(CteDefinition {
792                        name: c.name.clone(),
793                        column_aliases: c.column_aliases.clone(),
794                        body: materialize_query_body(&c.body, exec_sub)?,
795                    })
796                })
797                .collect::<Result<Vec<_>>>()?;
798            let body = materialize_query_body(&sq.body, exec_sub)?;
799            InsertSource::Select(Box::new(SelectQuery {
800                ctes,
801                recursive: sq.recursive,
802                body,
803            }))
804        }
805    };
806    Ok(InsertStmt {
807        table: stmt.table.clone(),
808        columns: stmt.columns.clone(),
809        source,
810        on_conflict: stmt.on_conflict.clone(),
811        returning: stmt.returning.clone(),
812    })
813}
814
815pub(super) fn materialize_query_body(
816    body: &QueryBody,
817    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
818) -> Result<QueryBody> {
819    match body {
820        QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
821            sel, exec_sub,
822        )?))),
823        QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
824            op: comp.op.clone(),
825            all: comp.all,
826            left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
827            right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
828            order_by: comp.order_by.clone(),
829            limit: comp.limit.clone(),
830            offset: comp.offset.clone(),
831        }))),
832        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
833    }
834}
835
836pub(super) fn exec_query_body(
837    db: &Database,
838    schema: &SchemaManager,
839    body: &QueryBody,
840    ctes: &CteContext,
841) -> Result<ExecutionResult> {
842    match body {
843        QueryBody::Select(sel) => super::exec_select(db, schema, sel, ctes),
844        QueryBody::Compound(comp) => exec_compound_select(db, schema, comp, ctes),
845        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
846            SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
847        ),
848    }
849}
850
851pub(super) fn exec_query_body_in_txn(
852    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
853    schema: &SchemaManager,
854    body: &QueryBody,
855    ctes: &CteContext,
856) -> Result<ExecutionResult> {
857    match body {
858        QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
859        QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
860        QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
861        QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
862        QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
863    }
864}
865
866pub(super) fn exec_query_body_read(
867    db: &Database,
868    schema: &SchemaManager,
869    body: &QueryBody,
870    ctes: &CteContext,
871) -> Result<QueryResult> {
872    match exec_query_body(db, schema, body, ctes)? {
873        ExecutionResult::Query(qr) => Ok(qr),
874        _ => Ok(QueryResult {
875            columns: vec![],
876            rows: vec![],
877        }),
878    }
879}
880
881pub(super) fn exec_query_body_write(
882    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
883    schema: &SchemaManager,
884    body: &QueryBody,
885    ctes: &CteContext,
886) -> Result<QueryResult> {
887    match exec_query_body_in_txn(wtx, schema, body, ctes)? {
888        ExecutionResult::Query(qr) => Ok(qr),
889        _ => Ok(QueryResult {
890            columns: vec![],
891            rows: vec![],
892        }),
893    }
894}
895
896pub(super) fn exec_compound_select(
897    db: &Database,
898    schema: &SchemaManager,
899    comp: &CompoundSelect,
900    ctes: &CteContext,
901) -> Result<ExecutionResult> {
902    let left_qr = match exec_query_body(db, schema, &comp.left, ctes)? {
903        ExecutionResult::Query(qr) => qr,
904        _ => QueryResult {
905            columns: vec![],
906            rows: vec![],
907        },
908    };
909    let right_qr = match exec_query_body(db, schema, &comp.right, ctes)? {
910        ExecutionResult::Query(qr) => qr,
911        _ => QueryResult {
912            columns: vec![],
913            rows: vec![],
914        },
915    };
916    apply_set_operation(comp, left_qr, right_qr)
917}
918
919pub(super) fn exec_compound_select_in_txn(
920    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
921    schema: &SchemaManager,
922    comp: &CompoundSelect,
923    ctes: &CteContext,
924) -> Result<ExecutionResult> {
925    let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
926        ExecutionResult::Query(qr) => qr,
927        _ => QueryResult {
928            columns: vec![],
929            rows: vec![],
930        },
931    };
932    let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
933        ExecutionResult::Query(qr) => qr,
934        _ => QueryResult {
935            columns: vec![],
936            rows: vec![],
937        },
938    };
939    apply_set_operation(comp, left_qr, right_qr)
940}
941
942pub(super) fn apply_set_operation(
943    comp: &CompoundSelect,
944    left_qr: QueryResult,
945    right_qr: QueryResult,
946) -> Result<ExecutionResult> {
947    if !left_qr.columns.is_empty()
948        && !right_qr.columns.is_empty()
949        && left_qr.columns.len() != right_qr.columns.len()
950    {
951        return Err(SqlError::CompoundColumnCountMismatch {
952            left: left_qr.columns.len(),
953            right: right_qr.columns.len(),
954        });
955    }
956
957    let columns = left_qr.columns;
958
959    let mut rows = match (&comp.op, comp.all) {
960        (SetOp::Union, true) => {
961            let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
962            let mut rows = Vec::with_capacity(total);
963            rows.extend(left_qr.rows);
964            rows.extend(right_qr.rows);
965            rows
966        }
967        (SetOp::Union, false) => {
968            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
969            let mut rows = Vec::new();
970            for row in left_qr.rows.into_iter().chain(right_qr.rows) {
971                if !seen.contains(&row) {
972                    seen.insert(row.clone());
973                    rows.push(row);
974                }
975            }
976            rows
977        }
978        (SetOp::Intersect, true) => {
979            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
980            for row in &right_qr.rows {
981                *right_counts.entry(row.clone()).or_insert(0) += 1;
982            }
983            let mut rows = Vec::new();
984            for row in left_qr.rows {
985                if let Some(count) = right_counts.get_mut(&row) {
986                    if *count > 0 {
987                        *count -= 1;
988                        rows.push(row);
989                    }
990                }
991            }
992            rows
993        }
994        (SetOp::Intersect, false) => {
995            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
996            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
997            let mut rows = Vec::new();
998            for row in left_qr.rows {
999                if right_set.contains(&row) && !seen.contains(&row) {
1000                    seen.insert(row.clone());
1001                    rows.push(row);
1002                }
1003            }
1004            rows
1005        }
1006        (SetOp::Except, true) => {
1007            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1008            for row in &right_qr.rows {
1009                *right_counts.entry(row.clone()).or_insert(0) += 1;
1010            }
1011            let mut rows = Vec::new();
1012            for row in left_qr.rows {
1013                if let Some(count) = right_counts.get_mut(&row) {
1014                    if *count > 0 {
1015                        *count -= 1;
1016                        continue;
1017                    }
1018                }
1019                rows.push(row);
1020            }
1021            rows
1022        }
1023        (SetOp::Except, false) => {
1024            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1025            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1026            let mut rows = Vec::new();
1027            for row in left_qr.rows {
1028                if !right_set.contains(&row) && !seen.contains(&row) {
1029                    seen.insert(row.clone());
1030                    rows.push(row);
1031                }
1032            }
1033            rows
1034        }
1035    };
1036
1037    if !comp.order_by.is_empty() {
1038        let col_defs: Vec<crate::types::ColumnDef> = columns
1039            .iter()
1040            .enumerate()
1041            .map(|(i, name)| crate::types::ColumnDef {
1042                name: name.clone(),
1043                data_type: crate::types::DataType::Null,
1044                nullable: true,
1045                position: i as u16,
1046                default_expr: None,
1047                default_sql: None,
1048                check_expr: None,
1049                check_sql: None,
1050                check_name: None,
1051                is_with_timezone: false,
1052                generated_expr: None,
1053                generated_sql: None,
1054                generated_kind: None,
1055                collation: crate::types::Collation::Binary,
1056            })
1057            .collect();
1058        sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1059    }
1060
1061    if let Some(ref offset_expr) = comp.offset {
1062        let offset = eval_const_int(offset_expr)?.max(0) as usize;
1063        if offset < rows.len() {
1064            rows = rows.split_off(offset);
1065        } else {
1066            rows.clear();
1067        }
1068    }
1069
1070    if let Some(ref limit_expr) = comp.limit {
1071        let limit = eval_const_int(limit_expr)?.max(0) as usize;
1072        rows.truncate(limit);
1073    }
1074
1075    Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1076}
1077
1078struct InsertBufs {
1079    row: Vec<Value>,
1080    pk_values: Vec<Value>,
1081    value_values: Vec<Value>,
1082    key_buf: Vec<u8>,
1083    value_buf: Vec<u8>,
1084    col_indices: Vec<usize>,
1085    fk_key_buf: Vec<u8>,
1086}
1087
1088impl InsertBufs {
1089    fn new() -> Self {
1090        Self {
1091            row: Vec::new(),
1092            pk_values: Vec::new(),
1093            value_values: Vec::new(),
1094            key_buf: Vec::with_capacity(64),
1095            value_buf: Vec::with_capacity(256),
1096            col_indices: Vec::new(),
1097            fk_key_buf: Vec::with_capacity(64),
1098        }
1099    }
1100}
1101
1102thread_local! {
1103    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1104    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1105}
1106
1107fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1108    INSERT_SCRATCH.with(|slot| f(&mut slot.borrow_mut()))
1109}
1110
1111pub(super) struct UpsertBufs {
1112    old_row: Vec<Value>,
1113    new_row: Vec<Value>,
1114    value_values: Vec<Value>,
1115    new_value_buf: Vec<u8>,
1116}
1117
1118impl UpsertBufs {
1119    pub(super) fn new() -> Self {
1120        Self {
1121            old_row: Vec::new(),
1122            new_row: Vec::new(),
1123            value_values: Vec::new(),
1124            new_value_buf: Vec::with_capacity(256),
1125        }
1126    }
1127}
1128
1129pub fn exec_insert_in_txn(
1130    wtx: &mut WriteTxn<'_>,
1131    schema: &SchemaManager,
1132    stmt: &InsertStmt,
1133    params: &[Value],
1134) -> Result<ExecutionResult> {
1135    with_insert_scratch(|bufs| {
1136        exec_insert_in_txn_impl(
1137            wtx,
1138            schema,
1139            stmt,
1140            params,
1141            bufs,
1142            None,
1143            &CteContext::default(),
1144        )
1145    })
1146}
1147
1148pub(super) fn exec_insert_in_txn_with_ctes(
1149    wtx: &mut WriteTxn<'_>,
1150    schema: &SchemaManager,
1151    stmt: &InsertStmt,
1152    params: &[Value],
1153    outer_ctes: &CteContext,
1154) -> Result<ExecutionResult> {
1155    with_insert_scratch(|bufs| {
1156        exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1157    })
1158}
1159
1160fn exec_insert_in_txn_cached(
1161    wtx: &mut WriteTxn<'_>,
1162    schema: &SchemaManager,
1163    stmt: &InsertStmt,
1164    params: &[Value],
1165    cache: &InsertCache,
1166) -> Result<ExecutionResult> {
1167    with_insert_scratch(|bufs| {
1168        exec_insert_in_txn_impl(
1169            wtx,
1170            schema,
1171            stmt,
1172            params,
1173            bufs,
1174            Some(cache),
1175            &CteContext::default(),
1176        )
1177    })
1178}
1179
1180fn exec_insert_in_txn_impl(
1181    wtx: &mut WriteTxn<'_>,
1182    schema: &SchemaManager,
1183    stmt: &InsertStmt,
1184    params: &[Value],
1185    bufs: &mut InsertBufs,
1186    cache: Option<&InsertCache>,
1187    outer_ctes: &CteContext,
1188) -> Result<ExecutionResult> {
1189    let empty_ctes = CteContext::default();
1190    let materialized;
1191    let has_sub = match cache {
1192        Some(c) => c.has_subquery,
1193        None => insert_has_subquery(stmt),
1194    };
1195    let stmt = if has_sub {
1196        materialized = materialize_insert(stmt, &mut |sub| {
1197            exec_subquery_write(wtx, schema, sub, &empty_ctes)
1198        })?;
1199        &materialized
1200    } else {
1201        stmt
1202    };
1203
1204    let table_schema = schema
1205        .get(&stmt.table)
1206        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1207
1208    let default_columns;
1209    let insert_columns: &[String] = if stmt.columns.is_empty() {
1210        default_columns = table_schema
1211            .columns
1212            .iter()
1213            .map(|c| c.name.clone())
1214            .collect::<Vec<_>>();
1215        &default_columns
1216    } else {
1217        &stmt.columns
1218    };
1219
1220    bufs.col_indices.clear();
1221    if let Some(c) = cache {
1222        bufs.col_indices.extend_from_slice(&c.col_indices);
1223    } else {
1224        for name in insert_columns {
1225            bufs.col_indices.push(
1226                table_schema
1227                    .column_index(name)
1228                    .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1229            );
1230        }
1231    }
1232
1233    if cache.is_none() {
1234        for &ci in &bufs.col_indices {
1235            if table_schema.columns[ci].generated_kind.is_some() {
1236                return Err(SqlError::CannotInsertIntoGeneratedColumn(
1237                    table_schema.columns[ci].name.clone(),
1238                ));
1239            }
1240        }
1241    }
1242
1243    let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1244    let cached_gen_positions: &[usize];
1245    let cached_gen_fast_evals: &[FastGenEval];
1246    if let Some(c) = cache {
1247        cached_gen_positions = &c.generated_col_positions;
1248        cached_gen_fast_evals = &c.generated_fast_evals;
1249        generated_cols_uncached = Vec::new();
1250    } else {
1251        cached_gen_positions = &[];
1252        cached_gen_fast_evals = &[];
1253        generated_cols_uncached = table_schema
1254            .columns
1255            .iter()
1256            .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1257            .map(|c| {
1258                let expr = c.generated_expr.as_ref().unwrap();
1259                let fe = detect_fast_gen_eval(expr, table_schema);
1260                (c.position as usize, expr, fe)
1261            })
1262            .collect();
1263    }
1264    let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1265    let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1266        None
1267    } else {
1268        Some(ColumnMap::new(&table_schema.columns))
1269    };
1270    let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1271        None
1272    } else if let Some(c) = cache {
1273        c.row_col_map.as_ref()
1274    } else {
1275        row_col_map_for_gen_owned.as_ref()
1276    };
1277
1278    let any_defaults = match cache {
1279        Some(c) => c.any_defaults,
1280        None => table_schema
1281            .columns
1282            .iter()
1283            .any(|c| c.default_expr.is_some()),
1284    };
1285    let defaults: Vec<(usize, &Expr)> = if any_defaults {
1286        table_schema
1287            .columns
1288            .iter()
1289            .filter(|c| {
1290                c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1291            })
1292            .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1293            .collect()
1294    } else {
1295        Vec::new()
1296    };
1297
1298    let has_checks = match cache {
1299        Some(c) => c.has_checks,
1300        None => table_schema.has_checks(),
1301    };
1302    let check_col_map = if has_checks {
1303        Some(ColumnMap::new(&table_schema.columns))
1304    } else {
1305        None
1306    };
1307
1308    let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1309        &[usize],
1310        &[usize],
1311        &[u16],
1312        usize,
1313        &[u16],
1314    ) = if let Some(c) = cache {
1315        (
1316            &c.pk_indices,
1317            &c.non_pk_indices,
1318            &c.encoding_positions,
1319            c.phys_count,
1320            &c.dropped_non_pk_slots,
1321        )
1322    } else {
1323        (
1324            table_schema.pk_indices(),
1325            table_schema.non_pk_indices(),
1326            table_schema.encoding_positions(),
1327            table_schema.physical_non_pk_count(),
1328            table_schema.dropped_non_pk_slots(),
1329        )
1330    };
1331
1332    bufs.row.resize(table_schema.columns.len(), Value::Null);
1333    bufs.pk_values.resize(pk_indices.len(), Value::Null);
1334    bufs.value_values.resize(phys_count, Value::Null);
1335
1336    let table_bytes = stmt.table.as_bytes();
1337    let has_fks = !table_schema.foreign_keys.is_empty();
1338    let has_indices = !table_schema.indices.is_empty();
1339    let has_defaults = !defaults.is_empty();
1340
1341    let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1342        (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1343        (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1344        (_, None) => None,
1345    };
1346
1347    let row_col_map_owned: Option<ColumnMap> =
1348        if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1349            Some(ColumnMap::new(&table_schema.columns))
1350        } else {
1351            None
1352        };
1353    let row_col_map: Option<&ColumnMap> = cache
1354        .and_then(|c| c.row_col_map.as_ref())
1355        .or(row_col_map_owned.as_ref());
1356
1357    let select_rows = match &stmt.source {
1358        InsertSource::Select(sq) => {
1359            let insert_ctes = super::materialize_all_ctes_with_outer(
1360                &sq.ctes,
1361                sq.recursive,
1362                outer_ctes,
1363                &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1364            )?;
1365            let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1366            Some(qr.rows)
1367        }
1368        InsertSource::Values(_) => None,
1369    };
1370
1371    let mut count: u64 = 0;
1372    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1373        stmt.returning.as_ref().map(|_| Vec::new());
1374
1375    let values = match &stmt.source {
1376        InsertSource::Values(rows) => Some(rows.as_slice()),
1377        InsertSource::Select(_) => None,
1378    };
1379    let sel_rows = select_rows.as_deref();
1380
1381    let total = match (values, sel_rows) {
1382        (Some(rows), _) => rows.len(),
1383        (_, Some(rows)) => rows.len(),
1384        _ => 0,
1385    };
1386
1387    if let Some(sel) = sel_rows {
1388        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1389            return Err(SqlError::InvalidValue(format!(
1390                "INSERT ... SELECT column count mismatch: expected {}, got {}",
1391                insert_columns.len(),
1392                sel[0].len()
1393            )));
1394        }
1395    }
1396
1397    let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1398    for idx in 0..total {
1399        if !skip_row_clear {
1400            for v in bufs.row.iter_mut() {
1401                *v = Value::Null;
1402            }
1403        }
1404
1405        if let Some(value_rows) = values {
1406            if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1407                for action in plan {
1408                    match action {
1409                        BindAction::Param {
1410                            param_idx,
1411                            col_idx,
1412                            target,
1413                        } => {
1414                            let v = &params[*param_idx];
1415                            bufs.row[*col_idx] = if v.is_null() {
1416                                Value::Null
1417                            } else if v.data_type() == *target {
1418                                v.clone()
1419                            } else {
1420                                let got = v.data_type();
1421                                v.clone().coerce_into(*target).ok_or_else(|| {
1422                                    SqlError::TypeMismatch {
1423                                        expected: target.to_string(),
1424                                        got: got.to_string(),
1425                                    }
1426                                })?
1427                            };
1428                        }
1429                        BindAction::Literal { value, col_idx } => {
1430                            bufs.row[*col_idx] = value.clone();
1431                        }
1432                    }
1433                }
1434            } else {
1435                let value_row = &value_rows[idx];
1436                if value_row.len() != insert_columns.len() {
1437                    return Err(SqlError::InvalidValue(format!(
1438                        "expected {} values, got {}",
1439                        insert_columns.len(),
1440                        value_row.len()
1441                    )));
1442                }
1443                for (i, expr) in value_row.iter().enumerate() {
1444                    let val = match expr {
1445                        Expr::Parameter(n) => params
1446                            .get(n - 1)
1447                            .cloned()
1448                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1449                        Expr::Literal(v) => v.clone(),
1450                        _ => eval_const_expr(expr)?,
1451                    };
1452                    let col_idx = bufs.col_indices[i];
1453                    let col = &table_schema.columns[col_idx];
1454                    let got_type = val.data_type();
1455                    bufs.row[col_idx] = if val.is_null() {
1456                        Value::Null
1457                    } else {
1458                        val.coerce_into(col.data_type)
1459                            .ok_or_else(|| SqlError::TypeMismatch {
1460                                expected: col.data_type.to_string(),
1461                                got: got_type.to_string(),
1462                            })?
1463                    };
1464                }
1465            }
1466        } else if let Some(sel) = sel_rows {
1467            let sel_row = &sel[idx];
1468            for (i, val) in sel_row.iter().enumerate() {
1469                let col_idx = bufs.col_indices[i];
1470                let col = &table_schema.columns[col_idx];
1471                let got_type = val.data_type();
1472                bufs.row[col_idx] = if val.is_null() {
1473                    Value::Null
1474                } else {
1475                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
1476                        SqlError::TypeMismatch {
1477                            expected: col.data_type.to_string(),
1478                            got: got_type.to_string(),
1479                        }
1480                    })?
1481                };
1482            }
1483        }
1484
1485        if has_defaults {
1486            for &(pos, def_expr) in &defaults {
1487                let val = eval_const_expr(def_expr)?;
1488                let col = &table_schema.columns[pos];
1489                if !val.is_null() {
1490                    let got_type = val.data_type();
1491                    bufs.row[pos] =
1492                        val.coerce_into(col.data_type)
1493                            .ok_or_else(|| SqlError::TypeMismatch {
1494                                expected: col.data_type.to_string(),
1495                                got: got_type.to_string(),
1496                            })?;
1497                }
1498            }
1499        }
1500
1501        if let Some(gen_map) = row_col_map_for_gen {
1502            if cache.is_some() {
1503                for (pos, fast) in cached_gen_positions
1504                    .iter()
1505                    .copied()
1506                    .zip(cached_gen_fast_evals.iter())
1507                {
1508                    let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1509                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1510                    let col = &table_schema.columns[pos];
1511                    bufs.row[pos] = if val.is_null() {
1512                        Value::Null
1513                    } else {
1514                        let got_type = val.data_type();
1515                        val.coerce_into(col.data_type)
1516                            .ok_or_else(|| SqlError::TypeMismatch {
1517                                expected: col.data_type.to_string(),
1518                                got: got_type.to_string(),
1519                            })?
1520                    };
1521                }
1522            } else {
1523                for (pos, gen_expr, fast) in &generated_cols_uncached {
1524                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1525                    let col = &table_schema.columns[*pos];
1526                    bufs.row[*pos] = if val.is_null() {
1527                        Value::Null
1528                    } else {
1529                        let got_type = val.data_type();
1530                        val.coerce_into(col.data_type)
1531                            .ok_or_else(|| SqlError::TypeMismatch {
1532                                expected: col.data_type.to_string(),
1533                                got: got_type.to_string(),
1534                            })?
1535                    };
1536                }
1537            }
1538        }
1539
1540        if let Some(c) = cache {
1541            for &pos in &c.not_null_indices {
1542                if bufs.row[pos as usize].is_null() {
1543                    return Err(SqlError::NotNullViolation(
1544                        table_schema.columns[pos as usize].name.clone(),
1545                    ));
1546                }
1547            }
1548        } else {
1549            for col in &table_schema.columns {
1550                if !col.nullable && bufs.row[col.position as usize].is_null() {
1551                    return Err(SqlError::NotNullViolation(col.name.clone()));
1552                }
1553            }
1554        }
1555
1556        if let Some(ref col_map) = check_col_map {
1557            for col in &table_schema.columns {
1558                if let Some(ref check) = col.check_expr {
1559                    let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1560                    if !is_truthy(&result) && !result.is_null() {
1561                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1562                        return Err(SqlError::CheckViolation(name.to_string()));
1563                    }
1564                }
1565            }
1566            for tc in &table_schema.check_constraints {
1567                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1568                if !is_truthy(&result) && !result.is_null() {
1569                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1570                    return Err(SqlError::CheckViolation(name.to_string()));
1571                }
1572            }
1573        }
1574
1575        if has_fks {
1576            for fk in &table_schema.foreign_keys {
1577                let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1578                if any_null {
1579                    continue;
1580                }
1581                crate::encoding::encode_composite_key_from_indices(
1582                    &fk.columns,
1583                    &bufs.row,
1584                    &mut bufs.fk_key_buf,
1585                );
1586                if fk.deferrable && fk.initially_deferred {
1587                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1588                    wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1589                        fk_name: name,
1590                        foreign_table: fk.foreign_table.as_bytes().to_vec(),
1591                        parent_key: bufs.fk_key_buf.clone(),
1592                    });
1593                    continue;
1594                }
1595                if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1596                    let found = wtx
1597                        .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1598                        .map_err(SqlError::Storage)?;
1599                    if found.is_none() {
1600                        let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1601                        return Err(SqlError::ForeignKeyViolation(name.to_string()));
1602                    }
1603                    wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1604                }
1605            }
1606        }
1607
1608        let proposed_row_for_returning: Option<Vec<Value>> =
1609            returning_rows.as_ref().map(|_| bufs.row.clone());
1610
1611        for (j, &i) in pk_indices.iter().enumerate() {
1612            bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1613        }
1614        match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1615            true => match bufs.pk_values[0] {
1616                Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1617                _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1618            },
1619            false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1620        }
1621
1622        for &slot in dropped {
1623            bufs.value_values[slot as usize] = Value::Null;
1624        }
1625        for (j, &i) in non_pk.iter().enumerate() {
1626            let col = &table_schema.columns[i];
1627            if matches!(
1628                col.generated_kind,
1629                Some(crate::parser::GeneratedKind::Virtual)
1630            ) {
1631                bufs.value_values[enc_pos[j] as usize] = Value::Null;
1632                bufs.row[i] = Value::Null;
1633            } else {
1634                bufs.value_values[enc_pos[j] as usize] =
1635                    std::mem::replace(&mut bufs.row[i], Value::Null);
1636            }
1637        }
1638        match cache.and_then(|c| c.row_encoder.as_ref()) {
1639            Some(tmpl) => crate::encoding::encode_int_row_with_template(
1640                tmpl,
1641                &bufs.value_values,
1642                &mut bufs.value_buf,
1643            )?,
1644            None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1645        }
1646
1647        if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1648            return Err(SqlError::KeyTooLarge {
1649                size: bufs.key_buf.len(),
1650                max: citadel_core::MAX_KEY_SIZE,
1651            });
1652        }
1653        if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1654            return Err(SqlError::RowTooLarge {
1655                size: bufs.value_buf.len(),
1656                max: citadel_core::MAX_VALUE_SIZE,
1657            });
1658        }
1659
1660        match compiled_conflict.as_ref() {
1661            None => {
1662                let is_new = wtx
1663                    .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1664                    .map_err(SqlError::Storage)?;
1665                if !is_new {
1666                    return Err(SqlError::DuplicateKey);
1667                }
1668                if has_indices {
1669                    for (j, &i) in pk_indices.iter().enumerate() {
1670                        bufs.row[i] = bufs.pk_values[j].clone();
1671                    }
1672                    for (j, &i) in non_pk.iter().enumerate() {
1673                        bufs.row[i] = std::mem::replace(
1674                            &mut bufs.value_values[enc_pos[j] as usize],
1675                            Value::Null,
1676                        );
1677                    }
1678                    insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1679                }
1680                count += 1;
1681                if let Some(buf) = returning_rows.as_mut() {
1682                    buf.push((None, proposed_row_for_returning));
1683                }
1684            }
1685            Some(oc) => {
1686                let oc_ref: &CompiledOnConflict = oc;
1687                let needs_row = upsert_needs_row(oc_ref, table_schema);
1688                if needs_row {
1689                    for (j, &i) in pk_indices.iter().enumerate() {
1690                        bufs.row[i] = bufs.pk_values[j].clone();
1691                    }
1692                    for (j, &i) in non_pk.iter().enumerate() {
1693                        bufs.row[i] = std::mem::replace(
1694                            &mut bufs.value_values[enc_pos[j] as usize],
1695                            Value::Null,
1696                        );
1697                    }
1698                }
1699                let outcome = apply_insert_with_conflict(
1700                    wtx,
1701                    table_schema,
1702                    &bufs.key_buf,
1703                    &bufs.value_buf,
1704                    &bufs.row,
1705                    &bufs.pk_values,
1706                    oc_ref,
1707                    row_col_map.unwrap(),
1708                    stmt.returning.is_some(),
1709                )?;
1710                match outcome {
1711                    InsertRowOutcome::Inserted => {
1712                        count += 1;
1713                        if let Some(buf) = returning_rows.as_mut() {
1714                            buf.push((None, proposed_row_for_returning));
1715                        }
1716                    }
1717                    InsertRowOutcome::Updated { old, new } => {
1718                        count += 1;
1719                        if let Some(buf) = returning_rows.as_mut() {
1720                            buf.push((Some(old), Some(new)));
1721                        }
1722                    }
1723                    InsertRowOutcome::Skipped => {}
1724                }
1725            }
1726        }
1727    }
1728
1729    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
1730        return Ok(ExecutionResult::Query(super::helpers::project_returning(
1731            table_schema,
1732            returning_cols,
1733            &rows,
1734        )?));
1735    }
1736
1737    Ok(ExecutionResult::RowsAffected(count))
1738}
1739
1740pub struct CompiledInsert {
1741    table_lower: String,
1742    cached: Option<InsertCache>,
1743}
1744
1745struct InsertCache {
1746    col_indices: Vec<usize>,
1747    has_subquery: bool,
1748    any_defaults: bool,
1749    has_checks: bool,
1750    on_conflict: Option<Arc<CompiledOnConflict>>,
1751    row_col_map: Option<ColumnMap>,
1752    generated_col_positions: Vec<usize>,
1753    generated_fast_evals: Vec<FastGenEval>,
1754    pk_indices: Vec<usize>,
1755    non_pk_indices: Vec<usize>,
1756    encoding_positions: Vec<u16>,
1757    dropped_non_pk_slots: Vec<u16>,
1758    phys_count: usize,
1759    single_int_pk: bool,
1760    not_null_indices: Vec<u16>,
1761    bind_plan: Option<Vec<BindAction>>,
1762    row_fully_overwritten: bool,
1763    row_encoder: Option<crate::encoding::IntRowTemplate>,
1764    is_trivial_fast: bool,
1765    trivial_fast_program: Option<TrivialFastProgram>,
1766    needs_scoped_params: bool,
1767}
1768
1769#[derive(Clone)]
1770enum BindAction {
1771    Param {
1772        param_idx: usize,
1773        col_idx: usize,
1774        target: DataType,
1775    },
1776    Literal {
1777        value: Value,
1778        col_idx: usize,
1779    },
1780}
1781
1782#[derive(Clone)]
1783struct TrivialFastProgram {
1784    template: Vec<u8>,
1785    ops: Vec<WriteOp>,
1786    pk_param: u8,
1787    not_null_param_indices: Vec<u8>,
1788}
1789
1790#[derive(Clone)]
1791enum WriteOp {
1792    ParamI64 {
1793        param_idx: u8,
1794        off: u32,
1795    },
1796    LiteralI64 {
1797        value: i64,
1798        off: u32,
1799    },
1800    GenAddParamsI64 {
1801        a_param: u8,
1802        b_param: u8,
1803        off: u32,
1804        bitmap_byte_off: u32,
1805        bitmap_bit_mask: u8,
1806    },
1807    GenMulAddParamI64 {
1808        param_idx: u8,
1809        mul: i64,
1810        add: i64,
1811        off: u32,
1812        bitmap_byte_off: u32,
1813        bitmap_bit_mask: u8,
1814    },
1815}
1816
1817fn build_trivial_fast_program(
1818    bind_plan: &[BindAction],
1819    row_encoder: &crate::encoding::IntRowTemplate,
1820    non_virtual_pairs: &[(usize, usize)],
1821    generated_col_positions: &[usize],
1822    generated_fast_evals: &[FastGenEval],
1823    pk_indices: &[usize],
1824    columns: &[crate::types::ColumnDef],
1825) -> Option<TrivialFastProgram> {
1826    let pk_col = pk_indices[0];
1827    let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
1828        non_virtual_pairs.iter().copied().collect();
1829    let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
1830        row_encoder.slot_offsets.iter().copied().collect();
1831
1832    let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
1833    let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
1834    let mut pk_param: Option<u8> = None;
1835    let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
1836    let mut not_null_param_indices: Vec<u8> = Vec::new();
1837
1838    for action in bind_plan {
1839        match action {
1840            BindAction::Param {
1841                param_idx,
1842                col_idx,
1843                target,
1844            } => {
1845                if *target != DataType::Integer {
1846                    return None;
1847                }
1848                let pi: u8 = u8::try_from(*param_idx).ok()?;
1849                col_to_param.insert(*col_idx, pi);
1850                if *col_idx == pk_col {
1851                    pk_param = Some(pi);
1852                } else {
1853                    let slot = *col_to_slot.get(col_idx)?;
1854                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1855                    ops.push(WriteOp::ParamI64 { param_idx: pi, off });
1856                    if !columns[*col_idx].nullable {
1857                        not_null_param_indices.push(pi);
1858                    }
1859                }
1860            }
1861            BindAction::Literal { value, col_idx } => match value {
1862                Value::Integer(v) => {
1863                    col_to_lit_int.insert(*col_idx, *v);
1864                    if *col_idx == pk_col {
1865                        return None;
1866                    }
1867                    let slot = *col_to_slot.get(col_idx)?;
1868                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1869                    ops.push(WriteOp::LiteralI64 { value: *v, off });
1870                }
1871                _ => return None,
1872            },
1873        }
1874    }
1875
1876    let pk_param = pk_param?;
1877
1878    for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
1879        let gen_slot = *col_to_slot.get(&gen_pos)?;
1880        let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
1881        let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
1882        let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
1883        let gen_col_nullable = columns[gen_pos].nullable;
1884
1885        match &generated_fast_evals[i] {
1886            FastGenEval::IntColAddCol {
1887                left_idx,
1888                right_idx,
1889            } => {
1890                let a_param = col_to_param.get(left_idx).copied();
1891                let b_param = col_to_param.get(right_idx).copied();
1892                match (a_param, b_param) {
1893                    (Some(ap), Some(bp)) => {
1894                        let deps_safe = gen_col_nullable
1895                            || (not_null_param_indices.contains(&ap)
1896                                && not_null_param_indices.contains(&bp));
1897                        if !deps_safe {
1898                            return None;
1899                        }
1900                        ops.push(WriteOp::GenAddParamsI64 {
1901                            a_param: ap,
1902                            b_param: bp,
1903                            off: gen_off,
1904                            bitmap_byte_off,
1905                            bitmap_bit_mask,
1906                        });
1907                    }
1908                    (Some(p), None) => {
1909                        let lit = col_to_lit_int.get(right_idx).copied()?;
1910                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1911                            return None;
1912                        }
1913                        ops.push(WriteOp::GenMulAddParamI64 {
1914                            param_idx: p,
1915                            mul: 1,
1916                            add: lit,
1917                            off: gen_off,
1918                            bitmap_byte_off,
1919                            bitmap_bit_mask,
1920                        });
1921                    }
1922                    (None, Some(p)) => {
1923                        let lit = col_to_lit_int.get(left_idx).copied()?;
1924                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1925                            return None;
1926                        }
1927                        ops.push(WriteOp::GenMulAddParamI64 {
1928                            param_idx: p,
1929                            mul: 1,
1930                            add: lit,
1931                            off: gen_off,
1932                            bitmap_byte_off,
1933                            bitmap_bit_mask,
1934                        });
1935                    }
1936                    (None, None) => {
1937                        let la = col_to_lit_int.get(left_idx).copied()?;
1938                        let lb = col_to_lit_int.get(right_idx).copied()?;
1939                        ops.push(WriteOp::LiteralI64 {
1940                            value: la.wrapping_add(lb),
1941                            off: gen_off,
1942                        });
1943                    }
1944                }
1945            }
1946            FastGenEval::IntColMulAdd {
1947                col_schema_idx,
1948                mul,
1949                add,
1950            } => {
1951                if let Some(p) = col_to_param.get(col_schema_idx).copied() {
1952                    if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1953                        return None;
1954                    }
1955                    ops.push(WriteOp::GenMulAddParamI64 {
1956                        param_idx: p,
1957                        mul: *mul,
1958                        add: *add,
1959                        off: gen_off,
1960                        bitmap_byte_off,
1961                        bitmap_bit_mask,
1962                    });
1963                } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
1964                    ops.push(WriteOp::LiteralI64 {
1965                        value: lit.wrapping_mul(*mul).wrapping_add(*add),
1966                        off: gen_off,
1967                    });
1968                } else {
1969                    return None;
1970                }
1971            }
1972            FastGenEval::None => return None,
1973        }
1974    }
1975
1976    Some(TrivialFastProgram {
1977        template: row_encoder.template.clone(),
1978        ops,
1979        pk_param,
1980        not_null_param_indices,
1981    })
1982}
1983
1984#[derive(Clone)]
1985pub(super) enum CompiledOnConflict {
1986    DoNothing {
1987        target: Option<ConflictKind>,
1988    },
1989    DoUpdate {
1990        target: ConflictKind,
1991        assignments: Vec<(usize, Expr)>,
1992        where_clause: Option<Expr>,
1993        fast_paths: Option<Vec<DoUpdateFastPath>>,
1994    },
1995}
1996
1997#[derive(Clone, Copy)]
1998pub(super) enum DoUpdateFastPath {
1999    IntAddConst { phys_idx: usize, delta: i64 },
2000}
2001
2002#[derive(Clone, Debug)]
2003pub(super) enum ConflictKind {
2004    PrimaryKey,
2005    UniqueIndex { index_idx: usize },
2006}
2007
2008fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2009    match target {
2010        ConflictTarget::Columns(cols) => {
2011            let col_idx_set: Vec<u16> = cols
2012                .iter()
2013                .map(|name| {
2014                    ts.column_index(name)
2015                        .map(|i| i as u16)
2016                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2017                })
2018                .collect::<Result<_>>()?;
2019            let pk_set = ts.primary_key_columns.clone();
2020            if set_equal(&col_idx_set, &pk_set) {
2021                return Ok(ConflictKind::PrimaryKey);
2022            }
2023            for (index_idx, idx) in ts.indices.iter().enumerate() {
2024                if idx.unique && set_equal(&col_idx_set, &idx.columns) {
2025                    return Ok(ConflictKind::UniqueIndex { index_idx });
2026                }
2027            }
2028            Err(SqlError::Plan(
2029                "ON CONFLICT target does not match any unique constraint".into(),
2030            ))
2031        }
2032        ConflictTarget::Constraint(name) => {
2033            let lower = name.to_ascii_lowercase();
2034            for (index_idx, idx) in ts.indices.iter().enumerate() {
2035                if idx.name.eq_ignore_ascii_case(&lower) {
2036                    if idx.unique {
2037                        return Ok(ConflictKind::UniqueIndex { index_idx });
2038                    }
2039                    return Err(SqlError::Plan(format!(
2040                        "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2041                    )));
2042                }
2043            }
2044            Err(SqlError::Plan(format!(
2045                "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2046            )))
2047        }
2048    }
2049}
2050
2051fn set_equal(a: &[u16], b: &[u16]) -> bool {
2052    if a.len() != b.len() {
2053        return false;
2054    }
2055    let mut a_sorted = a.to_vec();
2056    let mut b_sorted = b.to_vec();
2057    a_sorted.sort_unstable();
2058    b_sorted.sort_unstable();
2059    a_sorted == b_sorted
2060}
2061
2062pub(super) enum InsertRowOutcome {
2063    Inserted,
2064    Updated { old: Vec<Value>, new: Vec<Value> },
2065    Skipped,
2066}
2067
2068#[allow(clippy::too_many_arguments)]
2069#[inline]
2070pub(super) fn apply_insert_with_conflict(
2071    wtx: &mut WriteTxn<'_>,
2072    table_schema: &TableSchema,
2073    key_buf: &[u8],
2074    value_buf: &[u8],
2075    row: &[Value],
2076    pk_values: &[Value],
2077    on_conflict: &CompiledOnConflict,
2078    col_map: &ColumnMap,
2079    capture_returning: bool,
2080) -> Result<InsertRowOutcome> {
2081    let table_bytes = table_schema.name.as_bytes();
2082
2083    if let CompiledOnConflict::DoNothing { target } = on_conflict {
2084        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2085        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2086            let inserted = wtx
2087                .table_insert_if_absent(table_bytes, key_buf, value_buf)
2088                .map_err(SqlError::Storage)?;
2089            return Ok(if inserted {
2090                InsertRowOutcome::Inserted
2091            } else {
2092                InsertRowOutcome::Skipped
2093            });
2094        }
2095    }
2096
2097    if let CompiledOnConflict::DoUpdate {
2098        target: ConflictKind::PrimaryKey,
2099        assignments,
2100        where_clause,
2101        fast_paths,
2102    } = on_conflict
2103    {
2104        if can_fuse_do_update(table_schema, assignments) {
2105            return apply_do_update_fused(
2106                wtx,
2107                table_schema,
2108                table_bytes,
2109                key_buf,
2110                value_buf,
2111                row,
2112                assignments,
2113                where_clause.as_ref(),
2114                col_map,
2115                fast_paths.as_deref(),
2116                capture_returning,
2117            );
2118        }
2119    }
2120
2121    let primary_outcome = wtx
2122        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2123        .map_err(SqlError::Storage)?;
2124
2125    match primary_outcome {
2126        citadel_txn::write_txn::InsertOutcome::Inserted => {
2127            if table_schema.indices.is_empty() {
2128                return Ok(InsertRowOutcome::Inserted);
2129            }
2130            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2131            match insert_index_entries_or_fetch(
2132                wtx,
2133                table_schema,
2134                row,
2135                pk_values,
2136                &mut inserted_keys,
2137            )? {
2138                None => Ok(InsertRowOutcome::Inserted),
2139                Some(conflicting_idx) => {
2140                    let matches_target =
2141                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2142                            || matches!(
2143                                on_conflict,
2144                                CompiledOnConflict::DoNothing {
2145                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
2146                                } | CompiledOnConflict::DoUpdate {
2147                                    target: ConflictKind::UniqueIndex { index_idx },
2148                                    ..
2149                                } if *index_idx == conflicting_idx
2150                            );
2151                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2152                    if !matches_target {
2153                        return Err(SqlError::UniqueViolation(
2154                            table_schema.indices[conflicting_idx].name.clone(),
2155                        ));
2156                    }
2157                    match on_conflict {
2158                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2159                        CompiledOnConflict::DoUpdate {
2160                            assignments,
2161                            where_clause,
2162                            ..
2163                        } => {
2164                            let existing_pk =
2165                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2166                            apply_do_update(
2167                                wtx,
2168                                table_schema,
2169                                &existing_pk,
2170                                row,
2171                                assignments,
2172                                where_clause.as_ref(),
2173                                col_map,
2174                                capture_returning,
2175                            )
2176                        }
2177                    }
2178                }
2179            }
2180        }
2181        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2182            let matches_target = matches!(
2183                on_conflict,
2184                CompiledOnConflict::DoNothing { target: None }
2185                    | CompiledOnConflict::DoNothing {
2186                        target: Some(ConflictKind::PrimaryKey),
2187                    }
2188                    | CompiledOnConflict::DoUpdate {
2189                        target: ConflictKind::PrimaryKey,
2190                        ..
2191                    }
2192            );
2193            if !matches_target {
2194                return Err(SqlError::DuplicateKey);
2195            }
2196            match on_conflict {
2197                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2198                CompiledOnConflict::DoUpdate {
2199                    assignments,
2200                    where_clause,
2201                    ..
2202                } => {
2203                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2204                    apply_do_update_with_old_row(
2205                        wtx,
2206                        table_schema,
2207                        key_buf,
2208                        &old_row,
2209                        row,
2210                        assignments,
2211                        where_clause.as_ref(),
2212                        col_map,
2213                        capture_returning,
2214                    )
2215                }
2216            }
2217        }
2218    }
2219}
2220
2221#[inline]
2222fn apply_fast_path_patch(
2223    old_bytes: &[u8],
2224    fast_paths: &[DoUpdateFastPath],
2225) -> Result<UpsertAction> {
2226    UPSERT_SCRATCH.with(|slot| {
2227        let mut bufs = slot.borrow_mut();
2228        bufs.new_value_buf.clear();
2229        bufs.new_value_buf.extend_from_slice(old_bytes);
2230
2231        let mut patch_scratch: Vec<u8> = Vec::new();
2232
2233        for fp in fast_paths {
2234            match fp {
2235                DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2236                    let decoded =
2237                        crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2238                    let old_val = &decoded[0];
2239                    let new_val = match old_val {
2240                        Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2241                        Value::Null => Value::Null,
2242                        _ => {
2243                            return Err(SqlError::TypeMismatch {
2244                                expected: "INTEGER".into(),
2245                                got: old_val.data_type().to_string(),
2246                            });
2247                        }
2248                    };
2249                    if !crate::encoding::patch_column_in_place(
2250                        &mut bufs.new_value_buf,
2251                        *phys_idx,
2252                        &new_val,
2253                    )? {
2254                        patch_scratch.clear();
2255                        crate::encoding::patch_row_column(
2256                            &bufs.new_value_buf,
2257                            *phys_idx,
2258                            &new_val,
2259                            &mut patch_scratch,
2260                        )?;
2261                        std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2262                    }
2263                }
2264            }
2265        }
2266
2267        if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2268            return Err(SqlError::RowTooLarge {
2269                size: bufs.new_value_buf.len(),
2270                max: citadel_core::MAX_VALUE_SIZE,
2271            });
2272        }
2273
2274        Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2275    })
2276}
2277
2278fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2279    if !ts.indices.is_empty() {
2280        return true;
2281    }
2282    match oc {
2283        CompiledOnConflict::DoNothing { .. } => false,
2284        CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2285    }
2286}
2287
2288fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2289    if !ts.indices.is_empty() {
2290        return false;
2291    }
2292    if !ts.foreign_keys.is_empty() {
2293        return false;
2294    }
2295    if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2296        return false;
2297    }
2298    let pk = ts.pk_indices();
2299    !assignments.iter().any(|(ci, _)| pk.contains(ci))
2300}
2301
2302#[allow(clippy::too_many_arguments)]
2303#[inline]
2304fn apply_do_update_fused(
2305    wtx: &mut WriteTxn<'_>,
2306    table_schema: &TableSchema,
2307    table_bytes: &[u8],
2308    key_buf: &[u8],
2309    value_buf: &[u8],
2310    proposed_row: &[Value],
2311    assignments: &[(usize, Expr)],
2312    where_clause: Option<&Expr>,
2313    col_map: &ColumnMap,
2314    fast_paths: Option<&[DoUpdateFastPath]>,
2315    capture_returning: bool,
2316) -> Result<InsertRowOutcome> {
2317    let non_pk = table_schema.non_pk_indices();
2318    let enc_pos = table_schema.encoding_positions();
2319    let phys_count = table_schema.physical_non_pk_count();
2320    let dropped = table_schema.dropped_non_pk_slots();
2321    let has_checks = table_schema.has_checks();
2322    let has_fks = !table_schema.foreign_keys.is_empty();
2323
2324    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2325        std::cell::RefCell::new(None);
2326
2327    let outcome =
2328        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2329            if let Some(fps) = fast_paths {
2330                if !has_checks {
2331                    let action = apply_fast_path_patch(old_bytes, fps)?;
2332                    if capture_returning {
2333                        if let UpsertAction::Replace(ref new_bytes) = action {
2334                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2335                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2336                            *captured.borrow_mut() = Some((old_row, new_row));
2337                        }
2338                    }
2339                    return Ok(action);
2340                }
2341            }
2342            UPSERT_SCRATCH.with(|slot| {
2343                let mut bufs = slot.borrow_mut();
2344                let UpsertBufs {
2345                    old_row,
2346                    new_row,
2347                    value_values,
2348                    new_value_buf,
2349                } = &mut *bufs;
2350
2351                old_row.clear();
2352                old_row.resize(table_schema.columns.len(), Value::Null);
2353                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2354
2355                if let Some(w) = where_clause {
2356                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2357                    let result = eval_expr(w, &ctx)?;
2358                    if result.is_null() || !is_truthy(&result) {
2359                        return Ok(UpsertAction::Skip);
2360                    }
2361                }
2362
2363                new_row.clear();
2364                new_row.extend_from_slice(old_row);
2365                for (col_idx, expr) in assignments {
2366                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2367                    let val = eval_expr(expr, &ctx)?;
2368                    let col = &table_schema.columns[*col_idx];
2369                    new_row[*col_idx] = if val.is_null() {
2370                        Value::Null
2371                    } else {
2372                        let got = val.data_type();
2373                        val.coerce_into(col.data_type)
2374                            .ok_or_else(|| SqlError::TypeMismatch {
2375                                expected: col.data_type.to_string(),
2376                                got: got.to_string(),
2377                            })?
2378                    };
2379                }
2380
2381                for (assigned_idx, _) in assignments {
2382                    let col = &table_schema.columns[*assigned_idx];
2383                    if !col.nullable && new_row[col.position as usize].is_null() {
2384                        return Err(SqlError::NotNullViolation(col.name.clone()));
2385                    }
2386                }
2387                if has_checks {
2388                    for col in &table_schema.columns {
2389                        if let Some(ref check) = col.check_expr {
2390                            let ctx = EvalCtx::new(col_map, new_row);
2391                            let result = eval_expr(check, &ctx)?;
2392                            if !is_truthy(&result) && !result.is_null() {
2393                                let name = col.check_name.as_deref().unwrap_or(&col.name);
2394                                return Err(SqlError::CheckViolation(name.to_string()));
2395                            }
2396                        }
2397                    }
2398                    for tc in &table_schema.check_constraints {
2399                        let ctx = EvalCtx::new(col_map, new_row);
2400                        let result = eval_expr(&tc.expr, &ctx)?;
2401                        if !is_truthy(&result) && !result.is_null() {
2402                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
2403                            return Err(SqlError::CheckViolation(name.to_string()));
2404                        }
2405                    }
2406                }
2407                let _ = has_fks;
2408
2409                value_values.clear();
2410                value_values.resize(phys_count, Value::Null);
2411                for &slot in dropped {
2412                    value_values[slot as usize] = Value::Null;
2413                }
2414                for (j, &i) in non_pk.iter().enumerate() {
2415                    value_values[enc_pos[j] as usize] = new_row[i].clone();
2416                }
2417                new_value_buf.clear();
2418                crate::encoding::encode_row_into(value_values, new_value_buf);
2419
2420                if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2421                    return Err(SqlError::RowTooLarge {
2422                        size: new_value_buf.len(),
2423                        max: citadel_core::MAX_VALUE_SIZE,
2424                    });
2425                }
2426
2427                if capture_returning {
2428                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2429                }
2430                Ok(UpsertAction::Replace(new_value_buf.clone()))
2431            })
2432        })?;
2433
2434    match outcome {
2435        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2436        UpsertOutcome::Updated => {
2437            if capture_returning {
2438                let (old, new) = captured.into_inner().ok_or_else(|| {
2439                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2440                })?;
2441                Ok(InsertRowOutcome::Updated { old, new })
2442            } else {
2443                Ok(InsertRowOutcome::Inserted)
2444            }
2445        }
2446        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2447    }
2448}
2449
2450fn fetch_unique_index_pk(
2451    wtx: &mut WriteTxn<'_>,
2452    table_schema: &TableSchema,
2453    index_idx: usize,
2454    row: &[Value],
2455) -> Result<Vec<u8>> {
2456    let idx = &table_schema.indices[index_idx];
2457    let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2458    let indexed: Vec<Value> = idx
2459        .columns
2460        .iter()
2461        .map(|&col_idx| row[col_idx as usize].clone())
2462        .collect();
2463    let key = crate::encoding::encode_composite_key(&indexed);
2464    let value = wtx
2465        .table_get(&idx_table, &key)
2466        .map_err(SqlError::Storage)?
2467        .ok_or_else(|| {
2468            SqlError::InvalidValue("unique index missing expected collision entry".into())
2469        })?;
2470    Ok(value)
2471}
2472
2473#[allow(clippy::too_many_arguments)]
2474fn apply_do_update(
2475    wtx: &mut WriteTxn<'_>,
2476    table_schema: &TableSchema,
2477    pk_key: &[u8],
2478    proposed_row: &[Value],
2479    assignments: &[(usize, Expr)],
2480    where_clause: Option<&Expr>,
2481    col_map: &ColumnMap,
2482    capture_returning: bool,
2483) -> Result<InsertRowOutcome> {
2484    let old_value = wtx
2485        .table_get(table_schema.name.as_bytes(), pk_key)
2486        .map_err(SqlError::Storage)?
2487        .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2488    let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2489    apply_do_update_with_old_row(
2490        wtx,
2491        table_schema,
2492        pk_key,
2493        &old_row,
2494        proposed_row,
2495        assignments,
2496        where_clause,
2497        col_map,
2498        capture_returning,
2499    )
2500}
2501
2502#[allow(clippy::too_many_arguments)]
2503fn apply_do_update_with_old_row(
2504    wtx: &mut WriteTxn<'_>,
2505    table_schema: &TableSchema,
2506    old_pk_key: &[u8],
2507    old_row: &[Value],
2508    proposed_row: &[Value],
2509    assignments: &[(usize, Expr)],
2510    where_clause: Option<&Expr>,
2511    col_map: &ColumnMap,
2512    capture_returning: bool,
2513) -> Result<InsertRowOutcome> {
2514    if let Some(w) = where_clause {
2515        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2516        let result = eval_expr(w, &ctx)?;
2517        if result.is_null() || !is_truthy(&result) {
2518            return Ok(InsertRowOutcome::Skipped);
2519        }
2520    }
2521
2522    let mut new_row = old_row.to_vec();
2523    for (col_idx, expr) in assignments {
2524        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2525        let val = eval_expr(expr, &ctx)?;
2526        let col = &table_schema.columns[*col_idx];
2527        new_row[*col_idx] = if val.is_null() {
2528            Value::Null
2529        } else {
2530            let got = val.data_type();
2531            val.coerce_into(col.data_type)
2532                .ok_or_else(|| SqlError::TypeMismatch {
2533                    expected: col.data_type.to_string(),
2534                    got: got.to_string(),
2535                })?
2536        };
2537    }
2538
2539    for col in &table_schema.columns {
2540        if matches!(
2541            col.generated_kind,
2542            Some(crate::parser::GeneratedKind::Stored)
2543        ) {
2544            let val = eval_expr(
2545                col.generated_expr.as_ref().unwrap(),
2546                &EvalCtx::new(col_map, &new_row),
2547            )?;
2548            let pos = col.position as usize;
2549            new_row[pos] = if val.is_null() {
2550                if !col.nullable {
2551                    return Err(SqlError::NotNullViolation(col.name.clone()));
2552                }
2553                Value::Null
2554            } else {
2555                let got = val.data_type();
2556                val.coerce_into(col.data_type)
2557                    .ok_or_else(|| SqlError::TypeMismatch {
2558                        expected: col.data_type.to_string(),
2559                        got: got.to_string(),
2560                    })?
2561            };
2562        }
2563    }
2564
2565    let pk_indices = table_schema.pk_indices();
2566    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2567    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2568
2569    for (assigned_idx, _) in assignments {
2570        let col = &table_schema.columns[*assigned_idx];
2571        if !col.nullable && new_row[col.position as usize].is_null() {
2572            return Err(SqlError::NotNullViolation(col.name.clone()));
2573        }
2574    }
2575    if table_schema.has_checks() {
2576        for col in &table_schema.columns {
2577            if let Some(ref check) = col.check_expr {
2578                let ctx = EvalCtx::new(col_map, &new_row);
2579                let result = eval_expr(check, &ctx)?;
2580                if !is_truthy(&result) && !result.is_null() {
2581                    let name = col.check_name.as_deref().unwrap_or(&col.name);
2582                    return Err(SqlError::CheckViolation(name.to_string()));
2583                }
2584            }
2585        }
2586        for tc in &table_schema.check_constraints {
2587            let ctx = EvalCtx::new(col_map, &new_row);
2588            let result = eval_expr(&tc.expr, &ctx)?;
2589            if !is_truthy(&result) && !result.is_null() {
2590                let name = tc.name.as_deref().unwrap_or(&tc.sql);
2591                return Err(SqlError::CheckViolation(name.to_string()));
2592            }
2593        }
2594    }
2595    for fk in &table_schema.foreign_keys {
2596        let changed = fk
2597            .columns
2598            .iter()
2599            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2600        if !changed {
2601            continue;
2602        }
2603        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2604        if any_null {
2605            continue;
2606        }
2607        let fk_vals: Vec<Value> = fk
2608            .columns
2609            .iter()
2610            .map(|&ci| new_row[ci as usize].clone())
2611            .collect();
2612        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2613        if fk.deferrable && fk.initially_deferred {
2614            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
2615            wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
2616                fk_name: name,
2617                foreign_table: fk.foreign_table.as_bytes().to_vec(),
2618                parent_key: fk_key,
2619            });
2620            continue;
2621        }
2622        if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
2623            let found = wtx
2624                .table_get(fk.foreign_table.as_bytes(), &fk_key)
2625                .map_err(SqlError::Storage)?;
2626            if found.is_none() {
2627                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
2628                return Err(SqlError::ForeignKeyViolation(name.to_string()));
2629            }
2630            wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
2631        }
2632    }
2633
2634    let has_indices = !table_schema.indices.is_empty();
2635    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
2636        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
2637    } else {
2638        Vec::new()
2639    };
2640    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
2641        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
2642    } else {
2643        Vec::new()
2644    };
2645
2646    let non_pk = table_schema.non_pk_indices();
2647    let enc_pos = table_schema.encoding_positions();
2648    let phys_count = table_schema.physical_non_pk_count();
2649    let dropped = table_schema.dropped_non_pk_slots();
2650    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
2651    for &slot in dropped {
2652        value_values[slot as usize] = Value::Null;
2653    }
2654    for (j, &i) in non_pk.iter().enumerate() {
2655        let col = &table_schema.columns[i];
2656        value_values[enc_pos[j] as usize] = if matches!(
2657            col.generated_kind,
2658            Some(crate::parser::GeneratedKind::Virtual)
2659        ) {
2660            Value::Null
2661        } else {
2662            new_row[i].clone()
2663        };
2664    }
2665    let mut new_value_buf = Vec::with_capacity(256);
2666    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
2667
2668    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2669        return Err(SqlError::RowTooLarge {
2670            size: new_value_buf.len(),
2671            max: citadel_core::MAX_VALUE_SIZE,
2672        });
2673    }
2674
2675    if pk_changed {
2676        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
2677        let inserted = wtx
2678            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
2679            .map_err(SqlError::Storage)?;
2680        if !inserted {
2681            return Err(SqlError::DuplicateKey);
2682        }
2683        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
2684            .map_err(SqlError::Storage)?;
2685        for idx in &table_schema.indices {
2686            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2687            let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2688            wtx.table_delete(&idx_table, &old_idx_key)
2689                .map_err(SqlError::Storage)?;
2690            let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2691            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2692            let is_new = wtx
2693                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2694                .map_err(SqlError::Storage)?;
2695            if idx.unique && !is_new {
2696                let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2697                if !any_null {
2698                    return Err(SqlError::UniqueViolation(idx.name.clone()));
2699                }
2700            }
2701        }
2702    } else {
2703        wtx.table_update_sorted(
2704            table_schema.name.as_bytes(),
2705            &[(old_pk_key, new_value_buf.as_slice())],
2706        )
2707        .map_err(SqlError::Storage)?;
2708        let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
2709        for idx in &table_schema.indices {
2710            let cols_changed = index_columns_changed(idx, old_row, &new_row);
2711            let (del, ins) = partial_idx_update_actions(
2712                idx,
2713                old_row,
2714                &new_row,
2715                cols_changed,
2716                false,
2717                col_map_partial,
2718            );
2719            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2720            if del {
2721                let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2722                wtx.table_delete(&idx_table, &old_idx_key)
2723                    .map_err(SqlError::Storage)?;
2724            }
2725            if ins {
2726                let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2727                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2728                let is_new = wtx
2729                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2730                    .map_err(SqlError::Storage)?;
2731                if idx.unique && !is_new {
2732                    let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2733                    if !any_null {
2734                        return Err(SqlError::UniqueViolation(idx.name.clone()));
2735                    }
2736                }
2737            }
2738        }
2739    }
2740
2741    if capture_returning {
2742        Ok(InsertRowOutcome::Updated {
2743            old: old_row.to_vec(),
2744            new: new_row,
2745        })
2746    } else {
2747        Ok(InsertRowOutcome::Inserted)
2748    }
2749}
2750
2751fn detect_fast_paths(
2752    ts: &TableSchema,
2753    assignments: &[(usize, Expr)],
2754) -> Option<Vec<DoUpdateFastPath>> {
2755    let non_pk = ts.non_pk_indices();
2756    let enc_pos = ts.encoding_positions();
2757    let mut out = Vec::with_capacity(assignments.len());
2758    for (col_idx, expr) in assignments {
2759        let col = &ts.columns[*col_idx];
2760        if col.data_type != DataType::Integer {
2761            return None;
2762        }
2763        let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
2764        let phys_idx = enc_pos[nonpk_order] as usize;
2765
2766        if let Expr::BinaryOp { left, op, right } = expr {
2767            if !matches!(op, BinOp::Add | BinOp::Sub) {
2768                return None;
2769            }
2770            let reads_target =
2771                matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
2772            if !reads_target {
2773                return None;
2774            }
2775            if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
2776                let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
2777                let _ = col_idx;
2778                out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
2779                continue;
2780            }
2781            return None;
2782        }
2783        return None;
2784    }
2785    Some(out)
2786}
2787
2788fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
2789    let target = oc
2790        .target
2791        .as_ref()
2792        .map(|t| resolve_conflict_target(t, ts))
2793        .transpose()?;
2794    match &oc.action {
2795        OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
2796        OnConflictAction::DoUpdate {
2797            assignments,
2798            where_clause,
2799        } => {
2800            let target = target.ok_or_else(|| {
2801                SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
2802            })?;
2803            let compiled_assignments: Vec<(usize, Expr)> = assignments
2804                .iter()
2805                .map(|(name, expr)| {
2806                    let col_idx = ts
2807                        .column_index(name)
2808                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
2809                    Ok((col_idx, expr.clone()))
2810                })
2811                .collect::<Result<_>>()?;
2812            let fast_paths = if where_clause.is_none() {
2813                detect_fast_paths(ts, &compiled_assignments)
2814            } else {
2815                None
2816            };
2817            Ok(CompiledOnConflict::DoUpdate {
2818                target,
2819                assignments: compiled_assignments,
2820                where_clause: where_clause.clone(),
2821                fast_paths,
2822            })
2823        }
2824    }
2825}
2826
2827/// Caller MUST check `cache.is_trivial_fast` first.
2828fn exec_insert_trivial_fast(
2829    wtx: &mut WriteTxn<'_>,
2830    table_lower: &str,
2831    cache: &InsertCache,
2832    bufs: &mut InsertBufs,
2833    params: &[Value],
2834) -> Result<ExecutionResult> {
2835    let prog = cache
2836        .trivial_fast_program
2837        .as_ref()
2838        .expect("trivial fast: program");
2839
2840    for &p in &prog.not_null_param_indices {
2841        if params[p as usize].is_null() {
2842            return Err(SqlError::NotNullViolation(format!("param@{p}")));
2843        }
2844    }
2845
2846    match &params[prog.pk_param as usize] {
2847        Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
2848        _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
2849    }
2850
2851    bufs.value_buf.clear();
2852    bufs.value_buf.extend_from_slice(&prog.template);
2853
2854    for op in &prog.ops {
2855        match op {
2856            WriteOp::ParamI64 { param_idx, off } => match &params[*param_idx as usize] {
2857                Value::Integer(v) => {
2858                    let off = *off as usize;
2859                    bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
2860                }
2861                other => {
2862                    return Err(SqlError::TypeMismatch {
2863                        expected: "Integer".into(),
2864                        got: other.data_type().to_string(),
2865                    });
2866                }
2867            },
2868            WriteOp::LiteralI64 { value, off } => {
2869                let off = *off as usize;
2870                bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
2871            }
2872            WriteOp::GenAddParamsI64 {
2873                a_param,
2874                b_param,
2875                off,
2876                bitmap_byte_off,
2877                bitmap_bit_mask,
2878            } => match (&params[*a_param as usize], &params[*b_param as usize]) {
2879                (Value::Integer(a), Value::Integer(b)) => {
2880                    let off = *off as usize;
2881                    bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
2882                }
2883                _ => {
2884                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2885                }
2886            },
2887            WriteOp::GenMulAddParamI64 {
2888                param_idx,
2889                mul,
2890                add,
2891                off,
2892                bitmap_byte_off,
2893                bitmap_bit_mask,
2894            } => match &params[*param_idx as usize] {
2895                Value::Integer(v) => {
2896                    let r = v.wrapping_mul(*mul).wrapping_add(*add);
2897                    let off = *off as usize;
2898                    bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
2899                }
2900                _ => {
2901                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2902                }
2903            },
2904        }
2905    }
2906
2907    let is_new = wtx
2908        .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
2909        .map_err(SqlError::Storage)?;
2910    if !is_new {
2911        return Err(SqlError::DuplicateKey);
2912    }
2913    Ok(ExecutionResult::RowsAffected(1))
2914}
2915
2916fn build_bind_plan(
2917    stmt: &InsertStmt,
2918    col_indices: &[usize],
2919    col_data_types: &[DataType],
2920) -> Option<Vec<BindAction>> {
2921    let rows = match &stmt.source {
2922        InsertSource::Values(rows) => rows,
2923        _ => return None,
2924    };
2925    if rows.len() != 1 {
2926        return None;
2927    }
2928    let value_row = &rows[0];
2929    if value_row.len() != col_indices.len() {
2930        return None;
2931    }
2932    let mut plan = Vec::with_capacity(value_row.len());
2933    for (i, expr) in value_row.iter().enumerate() {
2934        let col_idx = col_indices[i];
2935        let target = col_data_types[col_idx];
2936        match expr {
2937            Expr::Parameter(n) => {
2938                if *n == 0 {
2939                    return None;
2940                }
2941                plan.push(BindAction::Param {
2942                    param_idx: n - 1,
2943                    col_idx,
2944                    target,
2945                });
2946            }
2947            Expr::Literal(v) => plan.push(BindAction::Literal {
2948                value: v.clone(),
2949                col_idx,
2950            }),
2951            _ => return None,
2952        }
2953    }
2954    Some(plan)
2955}
2956
2957impl CompiledInsert {
2958    pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
2959        let lower = stmt.table.to_ascii_lowercase();
2960        let cached = if let Some(ts) = schema.get(&lower) {
2961            let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
2962                ts.columns.iter().map(|c| c.name.as_str()).collect()
2963            } else {
2964                stmt.columns.iter().map(|s| s.as_str()).collect()
2965            };
2966            let mut col_indices = Vec::with_capacity(insert_columns.len());
2967            for name in &insert_columns {
2968                col_indices.push(ts.column_index(name)?);
2969            }
2970            if col_indices
2971                .iter()
2972                .any(|&ci| ts.columns[ci].generated_kind.is_some())
2973            {
2974                return None;
2975            }
2976            let on_conflict = stmt
2977                .on_conflict
2978                .as_ref()
2979                .map(|oc| compile_on_conflict(oc, ts))
2980                .transpose()
2981                .ok()
2982                .flatten()
2983                .map(Arc::new);
2984            let generated_col_positions: Vec<usize> = ts
2985                .columns
2986                .iter()
2987                .enumerate()
2988                .filter_map(|(i, c)| {
2989                    matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
2990                        .then_some(i)
2991                })
2992                .collect();
2993            let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
2994                .iter()
2995                .map(|&pos| {
2996                    detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
2997                })
2998                .collect();
2999            let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3000                Some(ColumnMap::new(&ts.columns))
3001            } else {
3002                None
3003            };
3004            let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3005            let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3006            let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3007            let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3008            let phys_count = ts.physical_non_pk_count();
3009            let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3010            let single_int_pk =
3011                pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3012            let not_null_indices: Vec<u16> = ts
3013                .columns
3014                .iter()
3015                .filter(|c| !c.nullable)
3016                .map(|c| c.position)
3017                .collect();
3018            let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3019            let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3020            let row_fully_overwritten = if any_defaults_flag {
3021                false
3022            } else {
3023                let mut covered: rustc_hash::FxHashSet<usize> =
3024                    col_indices.iter().copied().collect();
3025                covered.extend(generated_col_positions.iter().copied());
3026                for (j, &i) in non_pk_indices.iter().enumerate() {
3027                    let _ = j;
3028                    if matches!(
3029                        ts.columns[i].generated_kind,
3030                        Some(crate::parser::GeneratedKind::Virtual)
3031                    ) {
3032                        covered.insert(i);
3033                    }
3034                }
3035                bind_plan.is_some() && covered.len() == ts.columns.len()
3036            };
3037            let has_fks = !ts.foreign_keys.is_empty();
3038            let has_indices = !ts.indices.is_empty();
3039            let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3040            let mut null_value_slots: Vec<usize> =
3041                dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3042            for (j, &i) in non_pk_indices.iter().enumerate() {
3043                let slot = encoding_positions[j] as usize;
3044                if matches!(
3045                    ts.columns[i].generated_kind,
3046                    Some(crate::parser::GeneratedKind::Virtual)
3047                ) {
3048                    null_value_slots.push(slot);
3049                } else {
3050                    non_virtual_pairs.push((i, slot));
3051                }
3052            }
3053            let row_encoder = {
3054                let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3055                    let col = &ts.columns[i];
3056                    if matches!(
3057                        col.generated_kind,
3058                        Some(crate::parser::GeneratedKind::Virtual)
3059                    ) {
3060                        true
3061                    } else {
3062                        col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3063                    }
3064                });
3065                if all_int_or_null {
3066                    let mut null_slots: Vec<usize> =
3067                        dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3068                    for (j, &i) in non_pk_indices.iter().enumerate() {
3069                        if matches!(
3070                            ts.columns[i].generated_kind,
3071                            Some(crate::parser::GeneratedKind::Virtual)
3072                        ) {
3073                            null_slots.push(encoding_positions[j] as usize);
3074                        }
3075                    }
3076                    Some(crate::encoding::build_int_row_template(
3077                        phys_count,
3078                        &null_slots,
3079                    ))
3080                } else {
3081                    None
3082                }
3083            };
3084            let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3085                && !ts.columns.iter().any(|c| c.default_expr.is_some())
3086                && !ts.has_checks()
3087                && !has_fks
3088                && !has_indices
3089                && stmt.on_conflict.is_none()
3090                && stmt.returning.is_none()
3091                && bind_plan.is_some()
3092                && row_encoder.is_some()
3093                && row_fully_overwritten
3094                && single_int_pk
3095                && generated_fast_evals
3096                    .iter()
3097                    .all(|fe| !matches!(fe, FastGenEval::None));
3098            let trivial_fast_program = if is_trivial_fast_eligible {
3099                build_trivial_fast_program(
3100                    bind_plan.as_ref().unwrap(),
3101                    row_encoder.as_ref().unwrap(),
3102                    &non_virtual_pairs,
3103                    &generated_col_positions,
3104                    &generated_fast_evals,
3105                    &pk_indices,
3106                    &ts.columns,
3107                )
3108            } else {
3109                None
3110            };
3111            let is_trivial_fast = trivial_fast_program.is_some();
3112            let has_checks = ts.has_checks();
3113            let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3114            let needs_scoped_params = bind_plan.is_none()
3115                || has_checks
3116                || any_defaults
3117                || !generated_col_positions.is_empty()
3118                || on_conflict.is_some()
3119                || stmt.returning.is_some()
3120                || insert_has_subquery(stmt)
3121                || super::helpers::any_partial_index(ts);
3122            Some(InsertCache {
3123                col_indices,
3124                has_subquery: insert_has_subquery(stmt),
3125                any_defaults,
3126                has_checks,
3127                on_conflict,
3128                row_col_map,
3129                generated_col_positions,
3130                generated_fast_evals,
3131                pk_indices,
3132                non_pk_indices,
3133                encoding_positions,
3134                dropped_non_pk_slots,
3135                phys_count,
3136                single_int_pk,
3137                not_null_indices,
3138                bind_plan,
3139                row_fully_overwritten,
3140                row_encoder,
3141                is_trivial_fast,
3142                trivial_fast_program,
3143                needs_scoped_params,
3144            })
3145        } else if schema.get_view(&lower).is_some() {
3146            None
3147        } else {
3148            return None;
3149        };
3150        Some(Self {
3151            table_lower: lower,
3152            cached,
3153        })
3154    }
3155}
3156
3157impl CompiledPlan for CompiledInsert {
3158    fn execute(
3159        &self,
3160        db: &Database,
3161        schema: &SchemaManager,
3162        stmt: &Statement,
3163        params: &[Value],
3164        wtx: Option<&mut WriteTxn<'_>>,
3165    ) -> Result<ExecutionResult> {
3166        let ins = match stmt {
3167            Statement::Insert(i) => i,
3168            _ => {
3169                return Err(SqlError::Unsupported(
3170                    "CompiledInsert received non-INSERT statement".into(),
3171                ))
3172            }
3173        };
3174        match wtx {
3175            None => exec_insert(db, schema, ins, params),
3176            Some(outer) => match self.cached.as_ref() {
3177                Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3178                    exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3179                }),
3180                Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3181                None => exec_insert_in_txn(outer, schema, ins, params),
3182            },
3183        }
3184    }
3185
3186    fn uses_scoped_params(&self) -> bool {
3187        match self.cached.as_ref() {
3188            Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3189            None => true,
3190        }
3191    }
3192}
3193
3194pub struct CompiledDelete {
3195    table_lower: String,
3196}
3197
3198impl CompiledDelete {
3199    pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3200        let lower = stmt.table.to_ascii_lowercase();
3201        schema.get(&lower)?;
3202        Some(Self { table_lower: lower })
3203    }
3204}
3205
3206impl CompiledPlan for CompiledDelete {
3207    fn execute(
3208        &self,
3209        db: &Database,
3210        schema: &SchemaManager,
3211        stmt: &Statement,
3212        _params: &[Value],
3213        wtx: Option<&mut WriteTxn<'_>>,
3214    ) -> Result<ExecutionResult> {
3215        let del = match stmt {
3216            Statement::Delete(d) => d,
3217            _ => {
3218                return Err(SqlError::Unsupported(
3219                    "CompiledDelete received non-DELETE statement".into(),
3220                ))
3221            }
3222        };
3223        let _ = &self.table_lower;
3224        match wtx {
3225            None => super::write::exec_delete(db, schema, del),
3226            Some(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3227        }
3228    }
3229}