Skip to main content

citadel_sql/executor/
dml.rs

1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::write_txn::WriteTxn;
7use rustc_hash::FxHashMap;
8
9use crate::encoding::{encode_composite_key_into, encode_row_into};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
12use crate::parser::*;
13use crate::types::*;
14
15use crate::schema::SchemaManager;
16
17use super::compile::CompiledPlan;
18use super::helpers::*;
19use super::CteContext;
20
21pub(super) fn exec_insert(
22    db: &Database,
23    schema: &SchemaManager,
24    stmt: &InsertStmt,
25    params: &[Value],
26) -> Result<ExecutionResult> {
27    let empty_ctes = CteContext::default();
28    let materialized;
29    let stmt = if insert_has_subquery(stmt) {
30        materialized = materialize_insert(stmt, &mut |sub| {
31            exec_subquery_read(db, schema, sub, &empty_ctes)
32        })?;
33        &materialized
34    } else {
35        stmt
36    };
37
38    let lower_name = stmt.table.to_ascii_lowercase();
39    if schema.get_view(&lower_name).is_some() {
40        return Err(SqlError::CannotModifyView(stmt.table.clone()));
41    }
42    let table_schema = schema
43        .get(&lower_name)
44        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
45
46    let insert_columns = if stmt.columns.is_empty() {
47        table_schema
48            .columns
49            .iter()
50            .map(|c| c.name.clone())
51            .collect::<Vec<_>>()
52    } else {
53        stmt.columns
54            .iter()
55            .map(|c| c.to_ascii_lowercase())
56            .collect()
57    };
58
59    let col_indices: Vec<usize> = insert_columns
60        .iter()
61        .map(|name| {
62            table_schema
63                .column_index(name)
64                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
65        })
66        .collect::<Result<_>>()?;
67
68    for &ci in &col_indices {
69        if table_schema.columns[ci].generated_kind.is_some() {
70            return Err(SqlError::CannotInsertIntoGeneratedColumn(
71                table_schema.columns[ci].name.clone(),
72            ));
73        }
74    }
75
76    let defaults: Vec<(usize, &Expr)> = table_schema
77        .columns
78        .iter()
79        .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
80        .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
81        .collect();
82
83    let generated_cols: Vec<(usize, &Expr)> = table_schema
84        .columns
85        .iter()
86        .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
87        .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
88        .collect();
89
90    let has_checks = table_schema.has_checks();
91    let row_col_map_for_gen = if !generated_cols.is_empty() {
92        Some(ColumnMap::new(&table_schema.columns))
93    } else {
94        None
95    };
96    let check_col_map = if has_checks {
97        Some(ColumnMap::new(&table_schema.columns))
98    } else {
99        None
100    };
101
102    let select_rows = match &stmt.source {
103        InsertSource::Select(sq) => {
104            let insert_ctes =
105                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
106                    exec_query_body_read(db, schema, body, ctx)
107                })?;
108            let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
109            Some(qr.rows)
110        }
111        InsertSource::Values(_) => None,
112    };
113
114    let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
115        .on_conflict
116        .as_ref()
117        .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
118        .transpose()?;
119
120    let row_col_map = compiled_conflict
121        .as_ref()
122        .map(|_| ColumnMap::new(&table_schema.columns));
123
124    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
125    let mut count: u64 = 0;
126    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
127        stmt.returning.as_ref().map(|_| Vec::new());
128
129    let pk_indices = table_schema.pk_indices();
130    let non_pk = table_schema.non_pk_indices();
131    let enc_pos = table_schema.encoding_positions();
132    let phys_count = table_schema.physical_non_pk_count();
133    let mut row = vec![Value::Null; table_schema.columns.len()];
134    let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
135    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
136    let mut key_buf: Vec<u8> = Vec::with_capacity(64);
137    let mut value_buf: Vec<u8> = Vec::with_capacity(256);
138    let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
139
140    let values = match &stmt.source {
141        InsertSource::Values(rows) => Some(rows.as_slice()),
142        InsertSource::Select(_) => None,
143    };
144    let sel_rows = select_rows.as_deref();
145
146    let total = match (values, sel_rows) {
147        (Some(rows), _) => rows.len(),
148        (_, Some(rows)) => rows.len(),
149        _ => 0,
150    };
151
152    if let Some(sel) = sel_rows {
153        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
154            return Err(SqlError::InvalidValue(format!(
155                "INSERT ... SELECT column count mismatch: expected {}, got {}",
156                insert_columns.len(),
157                sel[0].len()
158            )));
159        }
160    }
161
162    for idx in 0..total {
163        for v in row.iter_mut() {
164            *v = Value::Null;
165        }
166
167        if let Some(value_rows) = values {
168            let value_row = &value_rows[idx];
169            if value_row.len() != insert_columns.len() {
170                return Err(SqlError::InvalidValue(format!(
171                    "expected {} values, got {}",
172                    insert_columns.len(),
173                    value_row.len()
174                )));
175            }
176            for (i, expr) in value_row.iter().enumerate() {
177                let val = if let Expr::Parameter(n) = expr {
178                    params
179                        .get(n - 1)
180                        .cloned()
181                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
182                } else {
183                    eval_const_expr(expr)?
184                };
185                let col_idx = col_indices[i];
186                let col = &table_schema.columns[col_idx];
187                let got_type = val.data_type();
188                row[col_idx] = if val.is_null() {
189                    Value::Null
190                } else {
191                    val.coerce_into(col.data_type)
192                        .ok_or_else(|| SqlError::TypeMismatch {
193                            expected: col.data_type.to_string(),
194                            got: got_type.to_string(),
195                        })?
196                };
197            }
198        } else if let Some(sel) = sel_rows {
199            let sel_row = &sel[idx];
200            for (i, val) in sel_row.iter().enumerate() {
201                let col_idx = col_indices[i];
202                let col = &table_schema.columns[col_idx];
203                let got_type = val.data_type();
204                row[col_idx] = if val.is_null() {
205                    Value::Null
206                } else {
207                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
208                        SqlError::TypeMismatch {
209                            expected: col.data_type.to_string(),
210                            got: got_type.to_string(),
211                        }
212                    })?
213                };
214            }
215        }
216
217        for &(pos, def_expr) in &defaults {
218            let val = eval_const_expr(def_expr)?;
219            let col = &table_schema.columns[pos];
220            if !val.is_null() {
221                let got_type = val.data_type();
222                row[pos] =
223                    val.coerce_into(col.data_type)
224                        .ok_or_else(|| SqlError::TypeMismatch {
225                            expected: col.data_type.to_string(),
226                            got: got_type.to_string(),
227                        })?;
228            }
229        }
230
231        if let Some(ref gen_map) = row_col_map_for_gen {
232            for &(pos, gen_expr) in &generated_cols {
233                let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
234                let col = &table_schema.columns[pos];
235                row[pos] = if val.is_null() {
236                    Value::Null
237                } else {
238                    let got_type = val.data_type();
239                    val.coerce_into(col.data_type)
240                        .ok_or_else(|| SqlError::TypeMismatch {
241                            expected: col.data_type.to_string(),
242                            got: got_type.to_string(),
243                        })?
244                };
245            }
246        }
247
248        for col in &table_schema.columns {
249            if !col.nullable && row[col.position as usize].is_null() {
250                return Err(SqlError::NotNullViolation(col.name.clone()));
251            }
252        }
253
254        if let Some(ref col_map) = check_col_map {
255            for col in &table_schema.columns {
256                if let Some(ref check) = col.check_expr {
257                    let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
258                    if !is_truthy(&result) && !result.is_null() {
259                        let name = col.check_name.as_deref().unwrap_or(&col.name);
260                        return Err(SqlError::CheckViolation(name.to_string()));
261                    }
262                }
263            }
264            for tc in &table_schema.check_constraints {
265                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
266                if !is_truthy(&result) && !result.is_null() {
267                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
268                    return Err(SqlError::CheckViolation(name.to_string()));
269                }
270            }
271        }
272
273        for fk in &table_schema.foreign_keys {
274            let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
275            if any_null {
276                continue; // MATCH SIMPLE: skip if any FK col is NULL
277            }
278            let fk_vals: Vec<Value> = fk
279                .columns
280                .iter()
281                .map(|&ci| row[ci as usize].clone())
282                .collect();
283            fk_key_buf.clear();
284            encode_composite_key_into(&fk_vals, &mut fk_key_buf);
285            let found = wtx
286                .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
287                .map_err(SqlError::Storage)?;
288            if found.is_none() {
289                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
290                return Err(SqlError::ForeignKeyViolation(name.to_string()));
291            }
292        }
293
294        let proposed_row_for_returning: Option<Vec<Value>> =
295            returning_rows.as_ref().map(|_| row.clone());
296
297        for (j, &i) in pk_indices.iter().enumerate() {
298            pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
299        }
300        encode_composite_key_into(&pk_values, &mut key_buf);
301
302        for (j, &i) in non_pk.iter().enumerate() {
303            let col = &table_schema.columns[i];
304            if matches!(
305                col.generated_kind,
306                Some(crate::parser::GeneratedKind::Virtual)
307            ) {
308                value_values[enc_pos[j] as usize] = Value::Null;
309                row[i] = Value::Null;
310            } else {
311                value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
312            }
313        }
314        encode_row_into(&value_values, &mut value_buf);
315
316        if key_buf.len() > citadel_core::MAX_KEY_SIZE {
317            return Err(SqlError::KeyTooLarge {
318                size: key_buf.len(),
319                max: citadel_core::MAX_KEY_SIZE,
320            });
321        }
322        if value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
323            return Err(SqlError::RowTooLarge {
324                size: value_buf.len(),
325                max: citadel_core::MAX_INLINE_VALUE_SIZE,
326            });
327        }
328
329        match compiled_conflict.as_ref() {
330            None => {
331                let is_new = wtx
332                    .table_insert(stmt.table.as_bytes(), &key_buf, &value_buf)
333                    .map_err(SqlError::Storage)?;
334                if !is_new {
335                    return Err(SqlError::DuplicateKey);
336                }
337                if !table_schema.indices.is_empty() {
338                    for (j, &i) in pk_indices.iter().enumerate() {
339                        row[i] = pk_values[j].clone();
340                    }
341                    for (j, &i) in non_pk.iter().enumerate() {
342                        row[i] =
343                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
344                    }
345                    insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
346                }
347                count += 1;
348                if let Some(buf) = returning_rows.as_mut() {
349                    buf.push((None, proposed_row_for_returning));
350                }
351            }
352            Some(oc) => {
353                let oc_ref: &CompiledOnConflict = oc;
354                let needs_row = upsert_needs_row(oc_ref, table_schema);
355                if needs_row {
356                    for (j, &i) in pk_indices.iter().enumerate() {
357                        row[i] = pk_values[j].clone();
358                    }
359                    for (j, &i) in non_pk.iter().enumerate() {
360                        row[i] =
361                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
362                    }
363                }
364                let outcome = apply_insert_with_conflict(
365                    &mut wtx,
366                    table_schema,
367                    &key_buf,
368                    &value_buf,
369                    &row,
370                    &pk_values,
371                    oc_ref,
372                    row_col_map.as_ref().unwrap(),
373                    stmt.returning.is_some(),
374                )?;
375                match outcome {
376                    InsertRowOutcome::Inserted => {
377                        count += 1;
378                        if let Some(buf) = returning_rows.as_mut() {
379                            buf.push((None, proposed_row_for_returning));
380                        }
381                    }
382                    InsertRowOutcome::Updated { old, new } => {
383                        count += 1;
384                        if let Some(buf) = returning_rows.as_mut() {
385                            buf.push((Some(old), Some(new)));
386                        }
387                    }
388                    InsertRowOutcome::Skipped => {}
389                }
390            }
391        }
392    }
393
394    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
395        let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
396        wtx.commit().map_err(SqlError::Storage)?;
397        return Ok(ExecutionResult::Query(qr));
398    }
399
400    wtx.commit().map_err(SqlError::Storage)?;
401    Ok(ExecutionResult::RowsAffected(count))
402}
403
404pub(super) fn has_subquery(expr: &Expr) -> bool {
405    crate::parser::has_subquery(expr)
406}
407
408pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
409    if let Some(ref w) = stmt.where_clause {
410        if has_subquery(w) {
411            return true;
412        }
413    }
414    if let Some(ref h) = stmt.having {
415        if has_subquery(h) {
416            return true;
417        }
418    }
419    for col in &stmt.columns {
420        if let SelectColumn::Expr { expr, .. } = col {
421            if has_subquery(expr) {
422                return true;
423            }
424        }
425    }
426    for ob in &stmt.order_by {
427        if has_subquery(&ob.expr) {
428            return true;
429        }
430    }
431    for join in &stmt.joins {
432        if let Some(ref on_expr) = join.on_clause {
433            if has_subquery(on_expr) {
434                return true;
435            }
436        }
437    }
438    false
439}
440
441pub(super) fn materialize_expr(
442    expr: &Expr,
443    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
444) -> Result<Expr> {
445    match expr {
446        Expr::InSubquery {
447            expr: e,
448            subquery,
449            negated,
450        } => {
451            let inner = materialize_expr(e, exec_sub)?;
452            let qr = exec_sub(subquery)?;
453            if !qr.columns.is_empty() && qr.columns.len() != 1 {
454                return Err(SqlError::SubqueryMultipleColumns);
455            }
456            let mut values = rustc_hash::FxHashSet::default();
457            let mut has_null = false;
458            for row in &qr.rows {
459                if row[0].is_null() {
460                    has_null = true;
461                } else {
462                    values.insert(row[0].clone());
463                }
464            }
465            Ok(Expr::InSet {
466                expr: Box::new(inner),
467                values,
468                has_null,
469                negated: *negated,
470            })
471        }
472        Expr::ScalarSubquery(subquery) => {
473            let qr = exec_sub(subquery)?;
474            if qr.rows.len() > 1 {
475                return Err(SqlError::SubqueryMultipleRows);
476            }
477            let val = if qr.rows.is_empty() {
478                Value::Null
479            } else {
480                qr.rows[0][0].clone()
481            };
482            Ok(Expr::Literal(val))
483        }
484        Expr::Exists { subquery, negated } => {
485            let qr = exec_sub(subquery)?;
486            let exists = !qr.rows.is_empty();
487            let result = if *negated { !exists } else { exists };
488            Ok(Expr::Literal(Value::Boolean(result)))
489        }
490        Expr::InList {
491            expr: e,
492            list,
493            negated,
494        } => {
495            let inner = materialize_expr(e, exec_sub)?;
496            let items = list
497                .iter()
498                .map(|item| materialize_expr(item, exec_sub))
499                .collect::<Result<Vec<_>>>()?;
500            Ok(Expr::InList {
501                expr: Box::new(inner),
502                list: items,
503                negated: *negated,
504            })
505        }
506        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
507            left: Box::new(materialize_expr(left, exec_sub)?),
508            op: *op,
509            right: Box::new(materialize_expr(right, exec_sub)?),
510        }),
511        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
512            op: *op,
513            expr: Box::new(materialize_expr(e, exec_sub)?),
514        }),
515        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
516        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
517        Expr::InSet {
518            expr: e,
519            values,
520            has_null,
521            negated,
522        } => Ok(Expr::InSet {
523            expr: Box::new(materialize_expr(e, exec_sub)?),
524            values: values.clone(),
525            has_null: *has_null,
526            negated: *negated,
527        }),
528        Expr::Between {
529            expr: e,
530            low,
531            high,
532            negated,
533        } => Ok(Expr::Between {
534            expr: Box::new(materialize_expr(e, exec_sub)?),
535            low: Box::new(materialize_expr(low, exec_sub)?),
536            high: Box::new(materialize_expr(high, exec_sub)?),
537            negated: *negated,
538        }),
539        Expr::Like {
540            expr: e,
541            pattern,
542            escape,
543            negated,
544        } => {
545            let esc = escape
546                .as_ref()
547                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
548                .transpose()?;
549            Ok(Expr::Like {
550                expr: Box::new(materialize_expr(e, exec_sub)?),
551                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
552                escape: esc,
553                negated: *negated,
554            })
555        }
556        Expr::Case {
557            operand,
558            conditions,
559            else_result,
560        } => {
561            let op = operand
562                .as_ref()
563                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
564                .transpose()?;
565            let conds = conditions
566                .iter()
567                .map(|(c, r)| {
568                    Ok((
569                        materialize_expr(c, exec_sub)?,
570                        materialize_expr(r, exec_sub)?,
571                    ))
572                })
573                .collect::<Result<Vec<_>>>()?;
574            let else_r = else_result
575                .as_ref()
576                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
577                .transpose()?;
578            Ok(Expr::Case {
579                operand: op,
580                conditions: conds,
581                else_result: else_r,
582            })
583        }
584        Expr::Coalesce(args) => {
585            let materialized = args
586                .iter()
587                .map(|a| materialize_expr(a, exec_sub))
588                .collect::<Result<Vec<_>>>()?;
589            Ok(Expr::Coalesce(materialized))
590        }
591        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
592            expr: Box::new(materialize_expr(e, exec_sub)?),
593            data_type: *data_type,
594        }),
595        Expr::Function {
596            name,
597            args,
598            distinct,
599        } => {
600            let materialized = args
601                .iter()
602                .map(|a| materialize_expr(a, exec_sub))
603                .collect::<Result<Vec<_>>>()?;
604            Ok(Expr::Function {
605                name: name.clone(),
606                args: materialized,
607                distinct: *distinct,
608            })
609        }
610        other => Ok(other.clone()),
611    }
612}
613
614pub(super) fn materialize_stmt(
615    stmt: &SelectStmt,
616    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
617) -> Result<SelectStmt> {
618    let where_clause = stmt
619        .where_clause
620        .as_ref()
621        .map(|e| materialize_expr(e, exec_sub))
622        .transpose()?;
623    let having = stmt
624        .having
625        .as_ref()
626        .map(|e| materialize_expr(e, exec_sub))
627        .transpose()?;
628    let columns = stmt
629        .columns
630        .iter()
631        .map(|c| match c {
632            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
633            SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
634            SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
635            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
636                expr: materialize_expr(expr, exec_sub)?,
637                alias: alias.clone(),
638            }),
639        })
640        .collect::<Result<Vec<_>>>()?;
641    let order_by = stmt
642        .order_by
643        .iter()
644        .map(|ob| {
645            Ok(OrderByItem {
646                expr: materialize_expr(&ob.expr, exec_sub)?,
647                descending: ob.descending,
648                nulls_first: ob.nulls_first,
649            })
650        })
651        .collect::<Result<Vec<_>>>()?;
652    let joins = stmt
653        .joins
654        .iter()
655        .map(|j| {
656            let on_clause = j
657                .on_clause
658                .as_ref()
659                .map(|e| materialize_expr(e, exec_sub))
660                .transpose()?;
661            Ok(JoinClause {
662                join_type: j.join_type,
663                table: j.table.clone(),
664                on_clause,
665            })
666        })
667        .collect::<Result<Vec<_>>>()?;
668    let group_by = stmt
669        .group_by
670        .iter()
671        .map(|e| materialize_expr(e, exec_sub))
672        .collect::<Result<Vec<_>>>()?;
673    Ok(SelectStmt {
674        columns,
675        from: stmt.from.clone(),
676        from_alias: stmt.from_alias.clone(),
677        joins,
678        distinct: stmt.distinct,
679        where_clause,
680        order_by,
681        limit: stmt.limit.clone(),
682        offset: stmt.offset.clone(),
683        group_by,
684        having,
685    })
686}
687
688pub(super) fn exec_subquery_read(
689    db: &Database,
690    schema: &SchemaManager,
691    stmt: &SelectStmt,
692    ctes: &CteContext,
693) -> Result<QueryResult> {
694    match super::exec_select(db, schema, stmt, ctes)? {
695        ExecutionResult::Query(qr) => Ok(qr),
696        _ => Ok(QueryResult {
697            columns: vec![],
698            rows: vec![],
699        }),
700    }
701}
702
703pub(super) fn exec_subquery_write(
704    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
705    schema: &SchemaManager,
706    stmt: &SelectStmt,
707    ctes: &CteContext,
708) -> Result<QueryResult> {
709    match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
710        ExecutionResult::Query(qr) => Ok(qr),
711        _ => Ok(QueryResult {
712            columns: vec![],
713            rows: vec![],
714        }),
715    }
716}
717
718pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
719    stmt.where_clause.as_ref().is_some_and(has_subquery)
720        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
721}
722
723pub(super) fn materialize_update(
724    stmt: &UpdateStmt,
725    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
726) -> Result<UpdateStmt> {
727    let where_clause = stmt
728        .where_clause
729        .as_ref()
730        .map(|e| materialize_expr(e, exec_sub))
731        .transpose()?;
732    let assignments = stmt
733        .assignments
734        .iter()
735        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
736        .collect::<Result<Vec<_>>>()?;
737    Ok(UpdateStmt {
738        table: stmt.table.clone(),
739        assignments,
740        where_clause,
741        returning: stmt.returning.clone(),
742    })
743}
744
745pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
746    stmt.where_clause.as_ref().is_some_and(has_subquery)
747}
748
749pub(super) fn materialize_delete(
750    stmt: &DeleteStmt,
751    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
752) -> Result<DeleteStmt> {
753    let where_clause = stmt
754        .where_clause
755        .as_ref()
756        .map(|e| materialize_expr(e, exec_sub))
757        .transpose()?;
758    Ok(DeleteStmt {
759        table: stmt.table.clone(),
760        where_clause,
761        returning: stmt.returning.clone(),
762    })
763}
764
765pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
766    match &stmt.source {
767        InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
768        // SELECT source subqueries are handled by exec_select's correlated/non-correlated paths
769        InsertSource::Select(_) => false,
770    }
771}
772
773pub(super) fn materialize_insert(
774    stmt: &InsertStmt,
775    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
776) -> Result<InsertStmt> {
777    let source = match &stmt.source {
778        InsertSource::Values(rows) => {
779            let mat = rows
780                .iter()
781                .map(|row| {
782                    row.iter()
783                        .map(|e| materialize_expr(e, exec_sub))
784                        .collect::<Result<Vec<_>>>()
785                })
786                .collect::<Result<Vec<_>>>()?;
787            InsertSource::Values(mat)
788        }
789        InsertSource::Select(sq) => {
790            let ctes = sq
791                .ctes
792                .iter()
793                .map(|c| {
794                    Ok(CteDefinition {
795                        name: c.name.clone(),
796                        column_aliases: c.column_aliases.clone(),
797                        body: materialize_query_body(&c.body, exec_sub)?,
798                    })
799                })
800                .collect::<Result<Vec<_>>>()?;
801            let body = materialize_query_body(&sq.body, exec_sub)?;
802            InsertSource::Select(Box::new(SelectQuery {
803                ctes,
804                recursive: sq.recursive,
805                body,
806            }))
807        }
808    };
809    Ok(InsertStmt {
810        table: stmt.table.clone(),
811        columns: stmt.columns.clone(),
812        source,
813        on_conflict: stmt.on_conflict.clone(),
814        returning: stmt.returning.clone(),
815    })
816}
817
818pub(super) fn materialize_query_body(
819    body: &QueryBody,
820    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
821) -> Result<QueryBody> {
822    match body {
823        QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
824            sel, exec_sub,
825        )?))),
826        QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
827            op: comp.op.clone(),
828            all: comp.all,
829            left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
830            right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
831            order_by: comp.order_by.clone(),
832            limit: comp.limit.clone(),
833            offset: comp.offset.clone(),
834        }))),
835        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
836    }
837}
838
839pub(super) fn exec_query_body(
840    db: &Database,
841    schema: &SchemaManager,
842    body: &QueryBody,
843    ctes: &CteContext,
844) -> Result<ExecutionResult> {
845    match body {
846        QueryBody::Select(sel) => super::exec_select(db, schema, sel, ctes),
847        QueryBody::Compound(comp) => exec_compound_select(db, schema, comp, ctes),
848        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
849            SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
850        ),
851    }
852}
853
854pub(super) fn exec_query_body_in_txn(
855    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
856    schema: &SchemaManager,
857    body: &QueryBody,
858    ctes: &CteContext,
859) -> Result<ExecutionResult> {
860    match body {
861        QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
862        QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
863        QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
864        QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
865        QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
866    }
867}
868
869pub(super) fn exec_query_body_read(
870    db: &Database,
871    schema: &SchemaManager,
872    body: &QueryBody,
873    ctes: &CteContext,
874) -> Result<QueryResult> {
875    match exec_query_body(db, schema, body, ctes)? {
876        ExecutionResult::Query(qr) => Ok(qr),
877        _ => Ok(QueryResult {
878            columns: vec![],
879            rows: vec![],
880        }),
881    }
882}
883
884pub(super) fn exec_query_body_write(
885    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
886    schema: &SchemaManager,
887    body: &QueryBody,
888    ctes: &CteContext,
889) -> Result<QueryResult> {
890    match exec_query_body_in_txn(wtx, schema, body, ctes)? {
891        ExecutionResult::Query(qr) => Ok(qr),
892        _ => Ok(QueryResult {
893            columns: vec![],
894            rows: vec![],
895        }),
896    }
897}
898
899pub(super) fn exec_compound_select(
900    db: &Database,
901    schema: &SchemaManager,
902    comp: &CompoundSelect,
903    ctes: &CteContext,
904) -> Result<ExecutionResult> {
905    let left_qr = match exec_query_body(db, schema, &comp.left, ctes)? {
906        ExecutionResult::Query(qr) => qr,
907        _ => QueryResult {
908            columns: vec![],
909            rows: vec![],
910        },
911    };
912    let right_qr = match exec_query_body(db, schema, &comp.right, ctes)? {
913        ExecutionResult::Query(qr) => qr,
914        _ => QueryResult {
915            columns: vec![],
916            rows: vec![],
917        },
918    };
919    apply_set_operation(comp, left_qr, right_qr)
920}
921
922pub(super) fn exec_compound_select_in_txn(
923    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
924    schema: &SchemaManager,
925    comp: &CompoundSelect,
926    ctes: &CteContext,
927) -> Result<ExecutionResult> {
928    let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
929        ExecutionResult::Query(qr) => qr,
930        _ => QueryResult {
931            columns: vec![],
932            rows: vec![],
933        },
934    };
935    let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
936        ExecutionResult::Query(qr) => qr,
937        _ => QueryResult {
938            columns: vec![],
939            rows: vec![],
940        },
941    };
942    apply_set_operation(comp, left_qr, right_qr)
943}
944
945pub(super) fn apply_set_operation(
946    comp: &CompoundSelect,
947    left_qr: QueryResult,
948    right_qr: QueryResult,
949) -> Result<ExecutionResult> {
950    if !left_qr.columns.is_empty()
951        && !right_qr.columns.is_empty()
952        && left_qr.columns.len() != right_qr.columns.len()
953    {
954        return Err(SqlError::CompoundColumnCountMismatch {
955            left: left_qr.columns.len(),
956            right: right_qr.columns.len(),
957        });
958    }
959
960    let columns = left_qr.columns;
961
962    let mut rows = match (&comp.op, comp.all) {
963        (SetOp::Union, true) => {
964            let mut rows = left_qr.rows;
965            rows.extend(right_qr.rows);
966            rows
967        }
968        (SetOp::Union, false) => {
969            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
970            let mut rows = Vec::new();
971            for row in left_qr.rows.into_iter().chain(right_qr.rows) {
972                if !seen.contains(&row) {
973                    seen.insert(row.clone());
974                    rows.push(row);
975                }
976            }
977            rows
978        }
979        (SetOp::Intersect, true) => {
980            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
981            for row in &right_qr.rows {
982                *right_counts.entry(row.clone()).or_insert(0) += 1;
983            }
984            let mut rows = Vec::new();
985            for row in left_qr.rows {
986                if let Some(count) = right_counts.get_mut(&row) {
987                    if *count > 0 {
988                        *count -= 1;
989                        rows.push(row);
990                    }
991                }
992            }
993            rows
994        }
995        (SetOp::Intersect, false) => {
996            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
997            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
998            let mut rows = Vec::new();
999            for row in left_qr.rows {
1000                if right_set.contains(&row) && !seen.contains(&row) {
1001                    seen.insert(row.clone());
1002                    rows.push(row);
1003                }
1004            }
1005            rows
1006        }
1007        (SetOp::Except, true) => {
1008            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1009            for row in &right_qr.rows {
1010                *right_counts.entry(row.clone()).or_insert(0) += 1;
1011            }
1012            let mut rows = Vec::new();
1013            for row in left_qr.rows {
1014                if let Some(count) = right_counts.get_mut(&row) {
1015                    if *count > 0 {
1016                        *count -= 1;
1017                        continue;
1018                    }
1019                }
1020                rows.push(row);
1021            }
1022            rows
1023        }
1024        (SetOp::Except, false) => {
1025            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1026            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1027            let mut rows = Vec::new();
1028            for row in left_qr.rows {
1029                if !right_set.contains(&row) && !seen.contains(&row) {
1030                    seen.insert(row.clone());
1031                    rows.push(row);
1032                }
1033            }
1034            rows
1035        }
1036    };
1037
1038    if !comp.order_by.is_empty() {
1039        let col_defs: Vec<crate::types::ColumnDef> = columns
1040            .iter()
1041            .enumerate()
1042            .map(|(i, name)| crate::types::ColumnDef {
1043                name: name.clone(),
1044                data_type: crate::types::DataType::Null,
1045                nullable: true,
1046                position: i as u16,
1047                default_expr: None,
1048                default_sql: None,
1049                check_expr: None,
1050                check_sql: None,
1051                check_name: None,
1052                is_with_timezone: false,
1053                generated_expr: None,
1054                generated_sql: None,
1055                generated_kind: None,
1056            })
1057            .collect();
1058        sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1059    }
1060
1061    if let Some(ref offset_expr) = comp.offset {
1062        let offset = eval_const_int(offset_expr)?.max(0) as usize;
1063        if offset < rows.len() {
1064            rows = rows.split_off(offset);
1065        } else {
1066            rows.clear();
1067        }
1068    }
1069
1070    if let Some(ref limit_expr) = comp.limit {
1071        let limit = eval_const_int(limit_expr)?.max(0) as usize;
1072        rows.truncate(limit);
1073    }
1074
1075    Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1076}
1077
1078struct InsertBufs {
1079    row: Vec<Value>,
1080    pk_values: Vec<Value>,
1081    value_values: Vec<Value>,
1082    key_buf: Vec<u8>,
1083    value_buf: Vec<u8>,
1084    col_indices: Vec<usize>,
1085    fk_key_buf: Vec<u8>,
1086}
1087
1088impl InsertBufs {
1089    fn new() -> Self {
1090        Self {
1091            row: Vec::new(),
1092            pk_values: Vec::new(),
1093            value_values: Vec::new(),
1094            key_buf: Vec::with_capacity(64),
1095            value_buf: Vec::with_capacity(256),
1096            col_indices: Vec::new(),
1097            fk_key_buf: Vec::with_capacity(64),
1098        }
1099    }
1100}
1101
1102thread_local! {
1103    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1104    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1105}
1106
1107fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1108    INSERT_SCRATCH.with(|slot| f(&mut slot.borrow_mut()))
1109}
1110
1111pub(super) struct UpsertBufs {
1112    old_row: Vec<Value>,
1113    new_row: Vec<Value>,
1114    value_values: Vec<Value>,
1115    new_value_buf: Vec<u8>,
1116}
1117
1118impl UpsertBufs {
1119    pub(super) fn new() -> Self {
1120        Self {
1121            old_row: Vec::new(),
1122            new_row: Vec::new(),
1123            value_values: Vec::new(),
1124            new_value_buf: Vec::with_capacity(256),
1125        }
1126    }
1127}
1128
1129pub fn exec_insert_in_txn(
1130    wtx: &mut WriteTxn<'_>,
1131    schema: &SchemaManager,
1132    stmt: &InsertStmt,
1133    params: &[Value],
1134) -> Result<ExecutionResult> {
1135    with_insert_scratch(|bufs| {
1136        exec_insert_in_txn_impl(
1137            wtx,
1138            schema,
1139            stmt,
1140            params,
1141            bufs,
1142            None,
1143            &CteContext::default(),
1144        )
1145    })
1146}
1147
1148pub(super) fn exec_insert_in_txn_with_ctes(
1149    wtx: &mut WriteTxn<'_>,
1150    schema: &SchemaManager,
1151    stmt: &InsertStmt,
1152    params: &[Value],
1153    outer_ctes: &CteContext,
1154) -> Result<ExecutionResult> {
1155    with_insert_scratch(|bufs| {
1156        exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1157    })
1158}
1159
1160fn exec_insert_in_txn_cached(
1161    wtx: &mut WriteTxn<'_>,
1162    schema: &SchemaManager,
1163    stmt: &InsertStmt,
1164    params: &[Value],
1165    cache: &InsertCache,
1166) -> Result<ExecutionResult> {
1167    with_insert_scratch(|bufs| {
1168        exec_insert_in_txn_impl(
1169            wtx,
1170            schema,
1171            stmt,
1172            params,
1173            bufs,
1174            Some(cache),
1175            &CteContext::default(),
1176        )
1177    })
1178}
1179
1180fn exec_insert_in_txn_impl(
1181    wtx: &mut WriteTxn<'_>,
1182    schema: &SchemaManager,
1183    stmt: &InsertStmt,
1184    params: &[Value],
1185    bufs: &mut InsertBufs,
1186    cache: Option<&InsertCache>,
1187    outer_ctes: &CteContext,
1188) -> Result<ExecutionResult> {
1189    let empty_ctes = CteContext::default();
1190    let materialized;
1191    let has_sub = match cache {
1192        Some(c) => c.has_subquery,
1193        None => insert_has_subquery(stmt),
1194    };
1195    let stmt = if has_sub {
1196        materialized = materialize_insert(stmt, &mut |sub| {
1197            exec_subquery_write(wtx, schema, sub, &empty_ctes)
1198        })?;
1199        &materialized
1200    } else {
1201        stmt
1202    };
1203
1204    let table_schema = schema
1205        .get(&stmt.table)
1206        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1207
1208    let default_columns;
1209    let insert_columns: &[String] = if stmt.columns.is_empty() {
1210        default_columns = table_schema
1211            .columns
1212            .iter()
1213            .map(|c| c.name.clone())
1214            .collect::<Vec<_>>();
1215        &default_columns
1216    } else {
1217        &stmt.columns
1218    };
1219
1220    bufs.col_indices.clear();
1221    if let Some(c) = cache {
1222        bufs.col_indices.extend_from_slice(&c.col_indices);
1223    } else {
1224        for name in insert_columns {
1225            bufs.col_indices.push(
1226                table_schema
1227                    .column_index(name)
1228                    .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1229            );
1230        }
1231    }
1232
1233    if cache.is_none() {
1234        for &ci in &bufs.col_indices {
1235            if table_schema.columns[ci].generated_kind.is_some() {
1236                return Err(SqlError::CannotInsertIntoGeneratedColumn(
1237                    table_schema.columns[ci].name.clone(),
1238                ));
1239            }
1240        }
1241    }
1242
1243    let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1244    let cached_gen_positions: &[usize];
1245    let cached_gen_fast_evals: &[FastGenEval];
1246    if let Some(c) = cache {
1247        cached_gen_positions = &c.generated_col_positions;
1248        cached_gen_fast_evals = &c.generated_fast_evals;
1249        generated_cols_uncached = Vec::new();
1250    } else {
1251        cached_gen_positions = &[];
1252        cached_gen_fast_evals = &[];
1253        generated_cols_uncached = table_schema
1254            .columns
1255            .iter()
1256            .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1257            .map(|c| {
1258                let expr = c.generated_expr.as_ref().unwrap();
1259                let fe = detect_fast_gen_eval(expr, table_schema);
1260                (c.position as usize, expr, fe)
1261            })
1262            .collect();
1263    }
1264    let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1265    let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1266        None
1267    } else {
1268        Some(ColumnMap::new(&table_schema.columns))
1269    };
1270    let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1271        None
1272    } else if let Some(c) = cache {
1273        c.row_col_map.as_ref()
1274    } else {
1275        row_col_map_for_gen_owned.as_ref()
1276    };
1277
1278    let any_defaults = match cache {
1279        Some(c) => c.any_defaults,
1280        None => table_schema
1281            .columns
1282            .iter()
1283            .any(|c| c.default_expr.is_some()),
1284    };
1285    let defaults: Vec<(usize, &Expr)> = if any_defaults {
1286        table_schema
1287            .columns
1288            .iter()
1289            .filter(|c| {
1290                c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1291            })
1292            .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1293            .collect()
1294    } else {
1295        Vec::new()
1296    };
1297
1298    let has_checks = match cache {
1299        Some(c) => c.has_checks,
1300        None => table_schema.has_checks(),
1301    };
1302    let check_col_map = if has_checks {
1303        Some(ColumnMap::new(&table_schema.columns))
1304    } else {
1305        None
1306    };
1307
1308    let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1309        &[usize],
1310        &[usize],
1311        &[u16],
1312        usize,
1313        &[u16],
1314    ) = if let Some(c) = cache {
1315        (
1316            &c.pk_indices,
1317            &c.non_pk_indices,
1318            &c.encoding_positions,
1319            c.phys_count,
1320            &c.dropped_non_pk_slots,
1321        )
1322    } else {
1323        (
1324            table_schema.pk_indices(),
1325            table_schema.non_pk_indices(),
1326            table_schema.encoding_positions(),
1327            table_schema.physical_non_pk_count(),
1328            table_schema.dropped_non_pk_slots(),
1329        )
1330    };
1331
1332    bufs.row.resize(table_schema.columns.len(), Value::Null);
1333    bufs.pk_values.resize(pk_indices.len(), Value::Null);
1334    bufs.value_values.resize(phys_count, Value::Null);
1335
1336    let table_bytes = stmt.table.as_bytes();
1337    let has_fks = !table_schema.foreign_keys.is_empty();
1338    let has_indices = !table_schema.indices.is_empty();
1339    let has_defaults = !defaults.is_empty();
1340
1341    let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1342        (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1343        (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1344        (_, None) => None,
1345    };
1346
1347    let row_col_map_owned: Option<ColumnMap> =
1348        if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1349            Some(ColumnMap::new(&table_schema.columns))
1350        } else {
1351            None
1352        };
1353    let row_col_map: Option<&ColumnMap> = cache
1354        .and_then(|c| c.row_col_map.as_ref())
1355        .or(row_col_map_owned.as_ref());
1356
1357    let select_rows = match &stmt.source {
1358        InsertSource::Select(sq) => {
1359            let insert_ctes = super::materialize_all_ctes_with_outer(
1360                &sq.ctes,
1361                sq.recursive,
1362                outer_ctes,
1363                &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1364            )?;
1365            let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1366            Some(qr.rows)
1367        }
1368        InsertSource::Values(_) => None,
1369    };
1370
1371    let mut count: u64 = 0;
1372    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1373        stmt.returning.as_ref().map(|_| Vec::new());
1374
1375    let values = match &stmt.source {
1376        InsertSource::Values(rows) => Some(rows.as_slice()),
1377        InsertSource::Select(_) => None,
1378    };
1379    let sel_rows = select_rows.as_deref();
1380
1381    let total = match (values, sel_rows) {
1382        (Some(rows), _) => rows.len(),
1383        (_, Some(rows)) => rows.len(),
1384        _ => 0,
1385    };
1386
1387    if let Some(sel) = sel_rows {
1388        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1389            return Err(SqlError::InvalidValue(format!(
1390                "INSERT ... SELECT column count mismatch: expected {}, got {}",
1391                insert_columns.len(),
1392                sel[0].len()
1393            )));
1394        }
1395    }
1396
1397    let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1398    for idx in 0..total {
1399        if !skip_row_clear {
1400            for v in bufs.row.iter_mut() {
1401                *v = Value::Null;
1402            }
1403        }
1404
1405        if let Some(value_rows) = values {
1406            if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1407                for action in plan {
1408                    match action {
1409                        BindAction::Param {
1410                            param_idx,
1411                            col_idx,
1412                            target,
1413                        } => {
1414                            let v = &params[*param_idx];
1415                            bufs.row[*col_idx] = if v.is_null() {
1416                                Value::Null
1417                            } else if v.data_type() == *target {
1418                                v.clone()
1419                            } else {
1420                                let got = v.data_type();
1421                                v.clone().coerce_into(*target).ok_or_else(|| {
1422                                    SqlError::TypeMismatch {
1423                                        expected: target.to_string(),
1424                                        got: got.to_string(),
1425                                    }
1426                                })?
1427                            };
1428                        }
1429                        BindAction::Literal { value, col_idx } => {
1430                            bufs.row[*col_idx] = value.clone();
1431                        }
1432                    }
1433                }
1434            } else {
1435                let value_row = &value_rows[idx];
1436                if value_row.len() != insert_columns.len() {
1437                    return Err(SqlError::InvalidValue(format!(
1438                        "expected {} values, got {}",
1439                        insert_columns.len(),
1440                        value_row.len()
1441                    )));
1442                }
1443                for (i, expr) in value_row.iter().enumerate() {
1444                    let val = match expr {
1445                        Expr::Parameter(n) => params
1446                            .get(n - 1)
1447                            .cloned()
1448                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1449                        Expr::Literal(v) => v.clone(),
1450                        _ => eval_const_expr(expr)?,
1451                    };
1452                    let col_idx = bufs.col_indices[i];
1453                    let col = &table_schema.columns[col_idx];
1454                    let got_type = val.data_type();
1455                    bufs.row[col_idx] = if val.is_null() {
1456                        Value::Null
1457                    } else {
1458                        val.coerce_into(col.data_type)
1459                            .ok_or_else(|| SqlError::TypeMismatch {
1460                                expected: col.data_type.to_string(),
1461                                got: got_type.to_string(),
1462                            })?
1463                    };
1464                }
1465            }
1466        } else if let Some(sel) = sel_rows {
1467            let sel_row = &sel[idx];
1468            for (i, val) in sel_row.iter().enumerate() {
1469                let col_idx = bufs.col_indices[i];
1470                let col = &table_schema.columns[col_idx];
1471                let got_type = val.data_type();
1472                bufs.row[col_idx] = if val.is_null() {
1473                    Value::Null
1474                } else {
1475                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
1476                        SqlError::TypeMismatch {
1477                            expected: col.data_type.to_string(),
1478                            got: got_type.to_string(),
1479                        }
1480                    })?
1481                };
1482            }
1483        }
1484
1485        if has_defaults {
1486            for &(pos, def_expr) in &defaults {
1487                let val = eval_const_expr(def_expr)?;
1488                let col = &table_schema.columns[pos];
1489                if !val.is_null() {
1490                    let got_type = val.data_type();
1491                    bufs.row[pos] =
1492                        val.coerce_into(col.data_type)
1493                            .ok_or_else(|| SqlError::TypeMismatch {
1494                                expected: col.data_type.to_string(),
1495                                got: got_type.to_string(),
1496                            })?;
1497                }
1498            }
1499        }
1500
1501        if let Some(gen_map) = row_col_map_for_gen {
1502            if cache.is_some() {
1503                for (pos, fast) in cached_gen_positions
1504                    .iter()
1505                    .copied()
1506                    .zip(cached_gen_fast_evals.iter())
1507                {
1508                    let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1509                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1510                    let col = &table_schema.columns[pos];
1511                    bufs.row[pos] = if val.is_null() {
1512                        Value::Null
1513                    } else {
1514                        let got_type = val.data_type();
1515                        val.coerce_into(col.data_type)
1516                            .ok_or_else(|| SqlError::TypeMismatch {
1517                                expected: col.data_type.to_string(),
1518                                got: got_type.to_string(),
1519                            })?
1520                    };
1521                }
1522            } else {
1523                for (pos, gen_expr, fast) in &generated_cols_uncached {
1524                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1525                    let col = &table_schema.columns[*pos];
1526                    bufs.row[*pos] = if val.is_null() {
1527                        Value::Null
1528                    } else {
1529                        let got_type = val.data_type();
1530                        val.coerce_into(col.data_type)
1531                            .ok_or_else(|| SqlError::TypeMismatch {
1532                                expected: col.data_type.to_string(),
1533                                got: got_type.to_string(),
1534                            })?
1535                    };
1536                }
1537            }
1538        }
1539
1540        if let Some(c) = cache {
1541            for &pos in &c.not_null_indices {
1542                if bufs.row[pos as usize].is_null() {
1543                    return Err(SqlError::NotNullViolation(
1544                        table_schema.columns[pos as usize].name.clone(),
1545                    ));
1546                }
1547            }
1548        } else {
1549            for col in &table_schema.columns {
1550                if !col.nullable && bufs.row[col.position as usize].is_null() {
1551                    return Err(SqlError::NotNullViolation(col.name.clone()));
1552                }
1553            }
1554        }
1555
1556        if let Some(ref col_map) = check_col_map {
1557            for col in &table_schema.columns {
1558                if let Some(ref check) = col.check_expr {
1559                    let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1560                    if !is_truthy(&result) && !result.is_null() {
1561                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1562                        return Err(SqlError::CheckViolation(name.to_string()));
1563                    }
1564                }
1565            }
1566            for tc in &table_schema.check_constraints {
1567                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1568                if !is_truthy(&result) && !result.is_null() {
1569                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1570                    return Err(SqlError::CheckViolation(name.to_string()));
1571                }
1572            }
1573        }
1574
1575        if has_fks {
1576            for fk in &table_schema.foreign_keys {
1577                let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1578                if any_null {
1579                    continue;
1580                }
1581                crate::encoding::encode_composite_key_from_indices(
1582                    &fk.columns,
1583                    &bufs.row,
1584                    &mut bufs.fk_key_buf,
1585                );
1586                let found = wtx
1587                    .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1588                    .map_err(SqlError::Storage)?;
1589                if found.is_none() {
1590                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1591                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
1592                }
1593            }
1594        }
1595
1596        let proposed_row_for_returning: Option<Vec<Value>> =
1597            returning_rows.as_ref().map(|_| bufs.row.clone());
1598
1599        for (j, &i) in pk_indices.iter().enumerate() {
1600            bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1601        }
1602        match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1603            true => match bufs.pk_values[0] {
1604                Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1605                _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1606            },
1607            false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1608        }
1609
1610        for &slot in dropped {
1611            bufs.value_values[slot as usize] = Value::Null;
1612        }
1613        for (j, &i) in non_pk.iter().enumerate() {
1614            let col = &table_schema.columns[i];
1615            if matches!(
1616                col.generated_kind,
1617                Some(crate::parser::GeneratedKind::Virtual)
1618            ) {
1619                bufs.value_values[enc_pos[j] as usize] = Value::Null;
1620                bufs.row[i] = Value::Null;
1621            } else {
1622                bufs.value_values[enc_pos[j] as usize] =
1623                    std::mem::replace(&mut bufs.row[i], Value::Null);
1624            }
1625        }
1626        match cache.and_then(|c| c.row_encoder.as_ref()) {
1627            Some(tmpl) => crate::encoding::encode_int_row_with_template(
1628                tmpl,
1629                &bufs.value_values,
1630                &mut bufs.value_buf,
1631            )?,
1632            None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1633        }
1634
1635        if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1636            return Err(SqlError::KeyTooLarge {
1637                size: bufs.key_buf.len(),
1638                max: citadel_core::MAX_KEY_SIZE,
1639            });
1640        }
1641        if bufs.value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1642            return Err(SqlError::RowTooLarge {
1643                size: bufs.value_buf.len(),
1644                max: citadel_core::MAX_INLINE_VALUE_SIZE,
1645            });
1646        }
1647
1648        match compiled_conflict.as_ref() {
1649            None => {
1650                let is_new = wtx
1651                    .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1652                    .map_err(SqlError::Storage)?;
1653                if !is_new {
1654                    return Err(SqlError::DuplicateKey);
1655                }
1656                if has_indices {
1657                    for (j, &i) in pk_indices.iter().enumerate() {
1658                        bufs.row[i] = bufs.pk_values[j].clone();
1659                    }
1660                    for (j, &i) in non_pk.iter().enumerate() {
1661                        bufs.row[i] = std::mem::replace(
1662                            &mut bufs.value_values[enc_pos[j] as usize],
1663                            Value::Null,
1664                        );
1665                    }
1666                    insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1667                }
1668                count += 1;
1669                if let Some(buf) = returning_rows.as_mut() {
1670                    buf.push((None, proposed_row_for_returning));
1671                }
1672            }
1673            Some(oc) => {
1674                let oc_ref: &CompiledOnConflict = oc;
1675                let needs_row = upsert_needs_row(oc_ref, table_schema);
1676                if needs_row {
1677                    for (j, &i) in pk_indices.iter().enumerate() {
1678                        bufs.row[i] = bufs.pk_values[j].clone();
1679                    }
1680                    for (j, &i) in non_pk.iter().enumerate() {
1681                        bufs.row[i] = std::mem::replace(
1682                            &mut bufs.value_values[enc_pos[j] as usize],
1683                            Value::Null,
1684                        );
1685                    }
1686                }
1687                let outcome = apply_insert_with_conflict(
1688                    wtx,
1689                    table_schema,
1690                    &bufs.key_buf,
1691                    &bufs.value_buf,
1692                    &bufs.row,
1693                    &bufs.pk_values,
1694                    oc_ref,
1695                    row_col_map.unwrap(),
1696                    stmt.returning.is_some(),
1697                )?;
1698                match outcome {
1699                    InsertRowOutcome::Inserted => {
1700                        count += 1;
1701                        if let Some(buf) = returning_rows.as_mut() {
1702                            buf.push((None, proposed_row_for_returning));
1703                        }
1704                    }
1705                    InsertRowOutcome::Updated { old, new } => {
1706                        count += 1;
1707                        if let Some(buf) = returning_rows.as_mut() {
1708                            buf.push((Some(old), Some(new)));
1709                        }
1710                    }
1711                    InsertRowOutcome::Skipped => {}
1712                }
1713            }
1714        }
1715    }
1716
1717    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
1718        return Ok(ExecutionResult::Query(super::helpers::project_returning(
1719            table_schema,
1720            returning_cols,
1721            &rows,
1722        )?));
1723    }
1724
1725    Ok(ExecutionResult::RowsAffected(count))
1726}
1727
1728pub struct CompiledInsert {
1729    table_lower: String,
1730    cached: Option<InsertCache>,
1731}
1732
1733struct InsertCache {
1734    col_indices: Vec<usize>,
1735    has_subquery: bool,
1736    any_defaults: bool,
1737    has_checks: bool,
1738    on_conflict: Option<Arc<CompiledOnConflict>>,
1739    row_col_map: Option<ColumnMap>,
1740    generated_col_positions: Vec<usize>,
1741    generated_fast_evals: Vec<FastGenEval>,
1742    pk_indices: Vec<usize>,
1743    non_pk_indices: Vec<usize>,
1744    encoding_positions: Vec<u16>,
1745    dropped_non_pk_slots: Vec<u16>,
1746    phys_count: usize,
1747    single_int_pk: bool,
1748    not_null_indices: Vec<u16>,
1749    bind_plan: Option<Vec<BindAction>>,
1750    row_fully_overwritten: bool,
1751    row_encoder: Option<crate::encoding::IntRowTemplate>,
1752    is_trivial_fast: bool,
1753    trivial_fast_program: Option<TrivialFastProgram>,
1754}
1755
1756#[derive(Clone)]
1757enum BindAction {
1758    Param {
1759        param_idx: usize,
1760        col_idx: usize,
1761        target: DataType,
1762    },
1763    Literal {
1764        value: Value,
1765        col_idx: usize,
1766    },
1767}
1768
1769#[derive(Clone)]
1770struct TrivialFastProgram {
1771    template: Vec<u8>,
1772    ops: Vec<WriteOp>,
1773    pk_param: u8,
1774    not_null_param_indices: Vec<u8>,
1775}
1776
1777#[derive(Clone)]
1778enum WriteOp {
1779    ParamI64 {
1780        param_idx: u8,
1781        off: u32,
1782    },
1783    LiteralI64 {
1784        value: i64,
1785        off: u32,
1786    },
1787    GenAddParamsI64 {
1788        a_param: u8,
1789        b_param: u8,
1790        off: u32,
1791        bitmap_byte_off: u32,
1792        bitmap_bit_mask: u8,
1793    },
1794    GenMulAddParamI64 {
1795        param_idx: u8,
1796        mul: i64,
1797        add: i64,
1798        off: u32,
1799        bitmap_byte_off: u32,
1800        bitmap_bit_mask: u8,
1801    },
1802}
1803
1804fn build_trivial_fast_program(
1805    bind_plan: &[BindAction],
1806    row_encoder: &crate::encoding::IntRowTemplate,
1807    non_virtual_pairs: &[(usize, usize)],
1808    generated_col_positions: &[usize],
1809    generated_fast_evals: &[FastGenEval],
1810    pk_indices: &[usize],
1811    columns: &[crate::types::ColumnDef],
1812) -> Option<TrivialFastProgram> {
1813    let pk_col = pk_indices[0];
1814    let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
1815        non_virtual_pairs.iter().copied().collect();
1816    let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
1817        row_encoder.slot_offsets.iter().copied().collect();
1818
1819    let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
1820    let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
1821    let mut pk_param: Option<u8> = None;
1822    let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
1823    let mut not_null_param_indices: Vec<u8> = Vec::new();
1824
1825    for action in bind_plan {
1826        match action {
1827            BindAction::Param {
1828                param_idx,
1829                col_idx,
1830                target,
1831            } => {
1832                if *target != DataType::Integer {
1833                    return None;
1834                }
1835                let pi: u8 = u8::try_from(*param_idx).ok()?;
1836                col_to_param.insert(*col_idx, pi);
1837                if *col_idx == pk_col {
1838                    pk_param = Some(pi);
1839                } else {
1840                    let slot = *col_to_slot.get(col_idx)?;
1841                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1842                    ops.push(WriteOp::ParamI64 { param_idx: pi, off });
1843                    if !columns[*col_idx].nullable {
1844                        not_null_param_indices.push(pi);
1845                    }
1846                }
1847            }
1848            BindAction::Literal { value, col_idx } => match value {
1849                Value::Integer(v) => {
1850                    col_to_lit_int.insert(*col_idx, *v);
1851                    if *col_idx == pk_col {
1852                        return None;
1853                    }
1854                    let slot = *col_to_slot.get(col_idx)?;
1855                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1856                    ops.push(WriteOp::LiteralI64 { value: *v, off });
1857                }
1858                _ => return None,
1859            },
1860        }
1861    }
1862
1863    let pk_param = pk_param?;
1864
1865    for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
1866        let gen_slot = *col_to_slot.get(&gen_pos)?;
1867        let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
1868        let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
1869        let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
1870        let gen_col_nullable = columns[gen_pos].nullable;
1871
1872        match &generated_fast_evals[i] {
1873            FastGenEval::IntColAddCol {
1874                left_idx,
1875                right_idx,
1876            } => {
1877                let a_param = col_to_param.get(left_idx).copied();
1878                let b_param = col_to_param.get(right_idx).copied();
1879                match (a_param, b_param) {
1880                    (Some(ap), Some(bp)) => {
1881                        let deps_safe = gen_col_nullable
1882                            || (not_null_param_indices.contains(&ap)
1883                                && not_null_param_indices.contains(&bp));
1884                        if !deps_safe {
1885                            return None;
1886                        }
1887                        ops.push(WriteOp::GenAddParamsI64 {
1888                            a_param: ap,
1889                            b_param: bp,
1890                            off: gen_off,
1891                            bitmap_byte_off,
1892                            bitmap_bit_mask,
1893                        });
1894                    }
1895                    (Some(p), None) => {
1896                        let lit = col_to_lit_int.get(right_idx).copied()?;
1897                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1898                            return None;
1899                        }
1900                        ops.push(WriteOp::GenMulAddParamI64 {
1901                            param_idx: p,
1902                            mul: 1,
1903                            add: lit,
1904                            off: gen_off,
1905                            bitmap_byte_off,
1906                            bitmap_bit_mask,
1907                        });
1908                    }
1909                    (None, Some(p)) => {
1910                        let lit = col_to_lit_int.get(left_idx).copied()?;
1911                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1912                            return None;
1913                        }
1914                        ops.push(WriteOp::GenMulAddParamI64 {
1915                            param_idx: p,
1916                            mul: 1,
1917                            add: lit,
1918                            off: gen_off,
1919                            bitmap_byte_off,
1920                            bitmap_bit_mask,
1921                        });
1922                    }
1923                    (None, None) => {
1924                        let la = col_to_lit_int.get(left_idx).copied()?;
1925                        let lb = col_to_lit_int.get(right_idx).copied()?;
1926                        ops.push(WriteOp::LiteralI64 {
1927                            value: la.wrapping_add(lb),
1928                            off: gen_off,
1929                        });
1930                    }
1931                }
1932            }
1933            FastGenEval::IntColMulAdd {
1934                col_schema_idx,
1935                mul,
1936                add,
1937            } => {
1938                if let Some(p) = col_to_param.get(col_schema_idx).copied() {
1939                    if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1940                        return None;
1941                    }
1942                    ops.push(WriteOp::GenMulAddParamI64 {
1943                        param_idx: p,
1944                        mul: *mul,
1945                        add: *add,
1946                        off: gen_off,
1947                        bitmap_byte_off,
1948                        bitmap_bit_mask,
1949                    });
1950                } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
1951                    ops.push(WriteOp::LiteralI64 {
1952                        value: lit.wrapping_mul(*mul).wrapping_add(*add),
1953                        off: gen_off,
1954                    });
1955                } else {
1956                    return None;
1957                }
1958            }
1959            FastGenEval::None => return None,
1960        }
1961    }
1962
1963    Some(TrivialFastProgram {
1964        template: row_encoder.template.clone(),
1965        ops,
1966        pk_param,
1967        not_null_param_indices,
1968    })
1969}
1970
1971#[derive(Clone)]
1972pub(super) enum CompiledOnConflict {
1973    DoNothing {
1974        target: Option<ConflictKind>,
1975    },
1976    DoUpdate {
1977        target: ConflictKind,
1978        assignments: Vec<(usize, Expr)>,
1979        where_clause: Option<Expr>,
1980        fast_paths: Option<Vec<DoUpdateFastPath>>,
1981    },
1982}
1983
1984#[derive(Clone, Copy)]
1985pub(super) enum DoUpdateFastPath {
1986    IntAddConst { phys_idx: usize, delta: i64 },
1987}
1988
1989#[derive(Clone, Debug)]
1990pub(super) enum ConflictKind {
1991    PrimaryKey,
1992    UniqueIndex { index_idx: usize },
1993}
1994
1995fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
1996    match target {
1997        ConflictTarget::Columns(cols) => {
1998            let col_idx_set: Vec<u16> = cols
1999                .iter()
2000                .map(|name| {
2001                    ts.column_index(name)
2002                        .map(|i| i as u16)
2003                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2004                })
2005                .collect::<Result<_>>()?;
2006            let pk_set = ts.primary_key_columns.clone();
2007            if set_equal(&col_idx_set, &pk_set) {
2008                return Ok(ConflictKind::PrimaryKey);
2009            }
2010            for (index_idx, idx) in ts.indices.iter().enumerate() {
2011                if idx.unique && set_equal(&col_idx_set, &idx.columns) {
2012                    return Ok(ConflictKind::UniqueIndex { index_idx });
2013                }
2014            }
2015            Err(SqlError::Plan(
2016                "ON CONFLICT target does not match any unique constraint".into(),
2017            ))
2018        }
2019        ConflictTarget::Constraint(name) => {
2020            let lower = name.to_ascii_lowercase();
2021            for (index_idx, idx) in ts.indices.iter().enumerate() {
2022                if idx.name.eq_ignore_ascii_case(&lower) {
2023                    if idx.unique {
2024                        return Ok(ConflictKind::UniqueIndex { index_idx });
2025                    }
2026                    return Err(SqlError::Plan(format!(
2027                        "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2028                    )));
2029                }
2030            }
2031            Err(SqlError::Plan(format!(
2032                "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2033            )))
2034        }
2035    }
2036}
2037
2038fn set_equal(a: &[u16], b: &[u16]) -> bool {
2039    if a.len() != b.len() {
2040        return false;
2041    }
2042    let mut a_sorted = a.to_vec();
2043    let mut b_sorted = b.to_vec();
2044    a_sorted.sort_unstable();
2045    b_sorted.sort_unstable();
2046    a_sorted == b_sorted
2047}
2048
2049pub(super) enum InsertRowOutcome {
2050    Inserted,
2051    Updated { old: Vec<Value>, new: Vec<Value> },
2052    Skipped,
2053}
2054
2055#[allow(clippy::too_many_arguments)]
2056#[inline]
2057pub(super) fn apply_insert_with_conflict(
2058    wtx: &mut WriteTxn<'_>,
2059    table_schema: &TableSchema,
2060    key_buf: &[u8],
2061    value_buf: &[u8],
2062    row: &[Value],
2063    pk_values: &[Value],
2064    on_conflict: &CompiledOnConflict,
2065    col_map: &ColumnMap,
2066    capture_returning: bool,
2067) -> Result<InsertRowOutcome> {
2068    let table_bytes = table_schema.name.as_bytes();
2069
2070    if let CompiledOnConflict::DoNothing { target } = on_conflict {
2071        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2072        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2073            let inserted = wtx
2074                .table_insert_if_absent(table_bytes, key_buf, value_buf)
2075                .map_err(SqlError::Storage)?;
2076            return Ok(if inserted {
2077                InsertRowOutcome::Inserted
2078            } else {
2079                InsertRowOutcome::Skipped
2080            });
2081        }
2082    }
2083
2084    if let CompiledOnConflict::DoUpdate {
2085        target: ConflictKind::PrimaryKey,
2086        assignments,
2087        where_clause,
2088        fast_paths,
2089    } = on_conflict
2090    {
2091        if can_fuse_do_update(table_schema, assignments) {
2092            return apply_do_update_fused(
2093                wtx,
2094                table_schema,
2095                table_bytes,
2096                key_buf,
2097                value_buf,
2098                row,
2099                assignments,
2100                where_clause.as_ref(),
2101                col_map,
2102                fast_paths.as_deref(),
2103                capture_returning,
2104            );
2105        }
2106    }
2107
2108    let primary_outcome = wtx
2109        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2110        .map_err(SqlError::Storage)?;
2111
2112    match primary_outcome {
2113        citadel_txn::write_txn::InsertOutcome::Inserted => {
2114            if table_schema.indices.is_empty() {
2115                return Ok(InsertRowOutcome::Inserted);
2116            }
2117            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2118            match insert_index_entries_or_fetch(
2119                wtx,
2120                table_schema,
2121                row,
2122                pk_values,
2123                &mut inserted_keys,
2124            )? {
2125                None => Ok(InsertRowOutcome::Inserted),
2126                Some(conflicting_idx) => {
2127                    let matches_target =
2128                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2129                            || matches!(
2130                                on_conflict,
2131                                CompiledOnConflict::DoNothing {
2132                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
2133                                } | CompiledOnConflict::DoUpdate {
2134                                    target: ConflictKind::UniqueIndex { index_idx },
2135                                    ..
2136                                } if *index_idx == conflicting_idx
2137                            );
2138                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2139                    if !matches_target {
2140                        return Err(SqlError::UniqueViolation(
2141                            table_schema.indices[conflicting_idx].name.clone(),
2142                        ));
2143                    }
2144                    match on_conflict {
2145                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2146                        CompiledOnConflict::DoUpdate {
2147                            assignments,
2148                            where_clause,
2149                            ..
2150                        } => {
2151                            let existing_pk =
2152                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2153                            apply_do_update(
2154                                wtx,
2155                                table_schema,
2156                                &existing_pk,
2157                                row,
2158                                assignments,
2159                                where_clause.as_ref(),
2160                                col_map,
2161                                capture_returning,
2162                            )
2163                        }
2164                    }
2165                }
2166            }
2167        }
2168        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2169            let matches_target = matches!(
2170                on_conflict,
2171                CompiledOnConflict::DoNothing { target: None }
2172                    | CompiledOnConflict::DoNothing {
2173                        target: Some(ConflictKind::PrimaryKey),
2174                    }
2175                    | CompiledOnConflict::DoUpdate {
2176                        target: ConflictKind::PrimaryKey,
2177                        ..
2178                    }
2179            );
2180            if !matches_target {
2181                return Err(SqlError::DuplicateKey);
2182            }
2183            match on_conflict {
2184                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2185                CompiledOnConflict::DoUpdate {
2186                    assignments,
2187                    where_clause,
2188                    ..
2189                } => {
2190                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2191                    apply_do_update_with_old_row(
2192                        wtx,
2193                        table_schema,
2194                        key_buf,
2195                        &old_row,
2196                        row,
2197                        assignments,
2198                        where_clause.as_ref(),
2199                        col_map,
2200                        capture_returning,
2201                    )
2202                }
2203            }
2204        }
2205    }
2206}
2207
2208#[inline]
2209fn apply_fast_path_patch(
2210    old_bytes: &[u8],
2211    fast_paths: &[DoUpdateFastPath],
2212) -> Result<UpsertAction> {
2213    UPSERT_SCRATCH.with(|slot| {
2214        let mut bufs = slot.borrow_mut();
2215        bufs.new_value_buf.clear();
2216        bufs.new_value_buf.extend_from_slice(old_bytes);
2217
2218        let mut patch_scratch: Vec<u8> = Vec::new();
2219
2220        for fp in fast_paths {
2221            match fp {
2222                DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2223                    let decoded =
2224                        crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2225                    let old_val = &decoded[0];
2226                    let new_val = match old_val {
2227                        Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2228                        Value::Null => Value::Null,
2229                        _ => {
2230                            return Err(SqlError::TypeMismatch {
2231                                expected: "INTEGER".into(),
2232                                got: old_val.data_type().to_string(),
2233                            });
2234                        }
2235                    };
2236                    if !crate::encoding::patch_column_in_place(
2237                        &mut bufs.new_value_buf,
2238                        *phys_idx,
2239                        &new_val,
2240                    )? {
2241                        patch_scratch.clear();
2242                        crate::encoding::patch_row_column(
2243                            &bufs.new_value_buf,
2244                            *phys_idx,
2245                            &new_val,
2246                            &mut patch_scratch,
2247                        )?;
2248                        std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2249                    }
2250                }
2251            }
2252        }
2253
2254        if bufs.new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2255            return Err(SqlError::RowTooLarge {
2256                size: bufs.new_value_buf.len(),
2257                max: citadel_core::MAX_INLINE_VALUE_SIZE,
2258            });
2259        }
2260
2261        Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2262    })
2263}
2264
2265fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2266    if !ts.indices.is_empty() {
2267        return true;
2268    }
2269    match oc {
2270        CompiledOnConflict::DoNothing { .. } => false,
2271        CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2272    }
2273}
2274
2275fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2276    if !ts.indices.is_empty() {
2277        return false;
2278    }
2279    if !ts.foreign_keys.is_empty() {
2280        return false;
2281    }
2282    if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2283        return false;
2284    }
2285    let pk = ts.pk_indices();
2286    !assignments.iter().any(|(ci, _)| pk.contains(ci))
2287}
2288
2289#[allow(clippy::too_many_arguments)]
2290#[inline]
2291fn apply_do_update_fused(
2292    wtx: &mut WriteTxn<'_>,
2293    table_schema: &TableSchema,
2294    table_bytes: &[u8],
2295    key_buf: &[u8],
2296    value_buf: &[u8],
2297    proposed_row: &[Value],
2298    assignments: &[(usize, Expr)],
2299    where_clause: Option<&Expr>,
2300    col_map: &ColumnMap,
2301    fast_paths: Option<&[DoUpdateFastPath]>,
2302    capture_returning: bool,
2303) -> Result<InsertRowOutcome> {
2304    let non_pk = table_schema.non_pk_indices();
2305    let enc_pos = table_schema.encoding_positions();
2306    let phys_count = table_schema.physical_non_pk_count();
2307    let dropped = table_schema.dropped_non_pk_slots();
2308    let has_checks = table_schema.has_checks();
2309    let has_fks = !table_schema.foreign_keys.is_empty();
2310
2311    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2312        std::cell::RefCell::new(None);
2313
2314    let outcome =
2315        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2316            if let Some(fps) = fast_paths {
2317                if !has_checks {
2318                    let action = apply_fast_path_patch(old_bytes, fps)?;
2319                    if capture_returning {
2320                        if let UpsertAction::Replace(ref new_bytes) = action {
2321                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2322                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2323                            *captured.borrow_mut() = Some((old_row, new_row));
2324                        }
2325                    }
2326                    return Ok(action);
2327                }
2328            }
2329            UPSERT_SCRATCH.with(|slot| {
2330                let mut bufs = slot.borrow_mut();
2331                let UpsertBufs {
2332                    old_row,
2333                    new_row,
2334                    value_values,
2335                    new_value_buf,
2336                } = &mut *bufs;
2337
2338                old_row.clear();
2339                old_row.resize(table_schema.columns.len(), Value::Null);
2340                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2341
2342                if let Some(w) = where_clause {
2343                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2344                    let result = eval_expr(w, &ctx)?;
2345                    if result.is_null() || !is_truthy(&result) {
2346                        return Ok(UpsertAction::Skip);
2347                    }
2348                }
2349
2350                new_row.clear();
2351                new_row.extend_from_slice(old_row);
2352                for (col_idx, expr) in assignments {
2353                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2354                    let val = eval_expr(expr, &ctx)?;
2355                    let col = &table_schema.columns[*col_idx];
2356                    new_row[*col_idx] = if val.is_null() {
2357                        Value::Null
2358                    } else {
2359                        let got = val.data_type();
2360                        val.coerce_into(col.data_type)
2361                            .ok_or_else(|| SqlError::TypeMismatch {
2362                                expected: col.data_type.to_string(),
2363                                got: got.to_string(),
2364                            })?
2365                    };
2366                }
2367
2368                for (assigned_idx, _) in assignments {
2369                    let col = &table_schema.columns[*assigned_idx];
2370                    if !col.nullable && new_row[col.position as usize].is_null() {
2371                        return Err(SqlError::NotNullViolation(col.name.clone()));
2372                    }
2373                }
2374                if has_checks {
2375                    for col in &table_schema.columns {
2376                        if let Some(ref check) = col.check_expr {
2377                            let ctx = EvalCtx::new(col_map, new_row);
2378                            let result = eval_expr(check, &ctx)?;
2379                            if !is_truthy(&result) && !result.is_null() {
2380                                let name = col.check_name.as_deref().unwrap_or(&col.name);
2381                                return Err(SqlError::CheckViolation(name.to_string()));
2382                            }
2383                        }
2384                    }
2385                    for tc in &table_schema.check_constraints {
2386                        let ctx = EvalCtx::new(col_map, new_row);
2387                        let result = eval_expr(&tc.expr, &ctx)?;
2388                        if !is_truthy(&result) && !result.is_null() {
2389                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
2390                            return Err(SqlError::CheckViolation(name.to_string()));
2391                        }
2392                    }
2393                }
2394                let _ = has_fks;
2395
2396                value_values.clear();
2397                value_values.resize(phys_count, Value::Null);
2398                for &slot in dropped {
2399                    value_values[slot as usize] = Value::Null;
2400                }
2401                for (j, &i) in non_pk.iter().enumerate() {
2402                    value_values[enc_pos[j] as usize] = new_row[i].clone();
2403                }
2404                new_value_buf.clear();
2405                crate::encoding::encode_row_into(value_values, new_value_buf);
2406
2407                if new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2408                    return Err(SqlError::RowTooLarge {
2409                        size: new_value_buf.len(),
2410                        max: citadel_core::MAX_INLINE_VALUE_SIZE,
2411                    });
2412                }
2413
2414                if capture_returning {
2415                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2416                }
2417                Ok(UpsertAction::Replace(new_value_buf.clone()))
2418            })
2419        })?;
2420
2421    match outcome {
2422        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2423        UpsertOutcome::Updated => {
2424            if capture_returning {
2425                let (old, new) = captured.into_inner().ok_or_else(|| {
2426                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2427                })?;
2428                Ok(InsertRowOutcome::Updated { old, new })
2429            } else {
2430                Ok(InsertRowOutcome::Inserted)
2431            }
2432        }
2433        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2434    }
2435}
2436
2437fn fetch_unique_index_pk(
2438    wtx: &mut WriteTxn<'_>,
2439    table_schema: &TableSchema,
2440    index_idx: usize,
2441    row: &[Value],
2442) -> Result<Vec<u8>> {
2443    let idx = &table_schema.indices[index_idx];
2444    let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2445    let indexed: Vec<Value> = idx
2446        .columns
2447        .iter()
2448        .map(|&col_idx| row[col_idx as usize].clone())
2449        .collect();
2450    let key = crate::encoding::encode_composite_key(&indexed);
2451    let value = wtx
2452        .table_get(&idx_table, &key)
2453        .map_err(SqlError::Storage)?
2454        .ok_or_else(|| {
2455            SqlError::InvalidValue("unique index missing expected collision entry".into())
2456        })?;
2457    Ok(value)
2458}
2459
2460#[allow(clippy::too_many_arguments)]
2461fn apply_do_update(
2462    wtx: &mut WriteTxn<'_>,
2463    table_schema: &TableSchema,
2464    pk_key: &[u8],
2465    proposed_row: &[Value],
2466    assignments: &[(usize, Expr)],
2467    where_clause: Option<&Expr>,
2468    col_map: &ColumnMap,
2469    capture_returning: bool,
2470) -> Result<InsertRowOutcome> {
2471    let old_value = wtx
2472        .table_get(table_schema.name.as_bytes(), pk_key)
2473        .map_err(SqlError::Storage)?
2474        .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2475    let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2476    apply_do_update_with_old_row(
2477        wtx,
2478        table_schema,
2479        pk_key,
2480        &old_row,
2481        proposed_row,
2482        assignments,
2483        where_clause,
2484        col_map,
2485        capture_returning,
2486    )
2487}
2488
2489#[allow(clippy::too_many_arguments)]
2490fn apply_do_update_with_old_row(
2491    wtx: &mut WriteTxn<'_>,
2492    table_schema: &TableSchema,
2493    old_pk_key: &[u8],
2494    old_row: &[Value],
2495    proposed_row: &[Value],
2496    assignments: &[(usize, Expr)],
2497    where_clause: Option<&Expr>,
2498    col_map: &ColumnMap,
2499    capture_returning: bool,
2500) -> Result<InsertRowOutcome> {
2501    if let Some(w) = where_clause {
2502        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2503        let result = eval_expr(w, &ctx)?;
2504        if result.is_null() || !is_truthy(&result) {
2505            return Ok(InsertRowOutcome::Skipped);
2506        }
2507    }
2508
2509    let mut new_row = old_row.to_vec();
2510    for (col_idx, expr) in assignments {
2511        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2512        let val = eval_expr(expr, &ctx)?;
2513        let col = &table_schema.columns[*col_idx];
2514        new_row[*col_idx] = if val.is_null() {
2515            Value::Null
2516        } else {
2517            let got = val.data_type();
2518            val.coerce_into(col.data_type)
2519                .ok_or_else(|| SqlError::TypeMismatch {
2520                    expected: col.data_type.to_string(),
2521                    got: got.to_string(),
2522                })?
2523        };
2524    }
2525
2526    for col in &table_schema.columns {
2527        if matches!(
2528            col.generated_kind,
2529            Some(crate::parser::GeneratedKind::Stored)
2530        ) {
2531            let val = eval_expr(
2532                col.generated_expr.as_ref().unwrap(),
2533                &EvalCtx::new(col_map, &new_row),
2534            )?;
2535            let pos = col.position as usize;
2536            new_row[pos] = if val.is_null() {
2537                if !col.nullable {
2538                    return Err(SqlError::NotNullViolation(col.name.clone()));
2539                }
2540                Value::Null
2541            } else {
2542                let got = val.data_type();
2543                val.coerce_into(col.data_type)
2544                    .ok_or_else(|| SqlError::TypeMismatch {
2545                        expected: col.data_type.to_string(),
2546                        got: got.to_string(),
2547                    })?
2548            };
2549        }
2550    }
2551
2552    let pk_indices = table_schema.pk_indices();
2553    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2554    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2555
2556    for (assigned_idx, _) in assignments {
2557        let col = &table_schema.columns[*assigned_idx];
2558        if !col.nullable && new_row[col.position as usize].is_null() {
2559            return Err(SqlError::NotNullViolation(col.name.clone()));
2560        }
2561    }
2562    if table_schema.has_checks() {
2563        for col in &table_schema.columns {
2564            if let Some(ref check) = col.check_expr {
2565                let ctx = EvalCtx::new(col_map, &new_row);
2566                let result = eval_expr(check, &ctx)?;
2567                if !is_truthy(&result) && !result.is_null() {
2568                    let name = col.check_name.as_deref().unwrap_or(&col.name);
2569                    return Err(SqlError::CheckViolation(name.to_string()));
2570                }
2571            }
2572        }
2573        for tc in &table_schema.check_constraints {
2574            let ctx = EvalCtx::new(col_map, &new_row);
2575            let result = eval_expr(&tc.expr, &ctx)?;
2576            if !is_truthy(&result) && !result.is_null() {
2577                let name = tc.name.as_deref().unwrap_or(&tc.sql);
2578                return Err(SqlError::CheckViolation(name.to_string()));
2579            }
2580        }
2581    }
2582    for fk in &table_schema.foreign_keys {
2583        let changed = fk
2584            .columns
2585            .iter()
2586            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2587        if !changed {
2588            continue;
2589        }
2590        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2591        if any_null {
2592            continue;
2593        }
2594        let fk_vals: Vec<Value> = fk
2595            .columns
2596            .iter()
2597            .map(|&ci| new_row[ci as usize].clone())
2598            .collect();
2599        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2600        let found = wtx
2601            .table_get(fk.foreign_table.as_bytes(), &fk_key)
2602            .map_err(SqlError::Storage)?;
2603        if found.is_none() {
2604            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
2605            return Err(SqlError::ForeignKeyViolation(name.to_string()));
2606        }
2607    }
2608
2609    let has_indices = !table_schema.indices.is_empty();
2610    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
2611        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
2612    } else {
2613        Vec::new()
2614    };
2615    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
2616        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
2617    } else {
2618        Vec::new()
2619    };
2620
2621    let non_pk = table_schema.non_pk_indices();
2622    let enc_pos = table_schema.encoding_positions();
2623    let phys_count = table_schema.physical_non_pk_count();
2624    let dropped = table_schema.dropped_non_pk_slots();
2625    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
2626    for &slot in dropped {
2627        value_values[slot as usize] = Value::Null;
2628    }
2629    for (j, &i) in non_pk.iter().enumerate() {
2630        let col = &table_schema.columns[i];
2631        value_values[enc_pos[j] as usize] = if matches!(
2632            col.generated_kind,
2633            Some(crate::parser::GeneratedKind::Virtual)
2634        ) {
2635            Value::Null
2636        } else {
2637            new_row[i].clone()
2638        };
2639    }
2640    let mut new_value_buf = Vec::with_capacity(256);
2641    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
2642
2643    if new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2644        return Err(SqlError::RowTooLarge {
2645            size: new_value_buf.len(),
2646            max: citadel_core::MAX_INLINE_VALUE_SIZE,
2647        });
2648    }
2649
2650    if pk_changed {
2651        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
2652        let inserted = wtx
2653            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
2654            .map_err(SqlError::Storage)?;
2655        if !inserted {
2656            return Err(SqlError::DuplicateKey);
2657        }
2658        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
2659            .map_err(SqlError::Storage)?;
2660        for idx in &table_schema.indices {
2661            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2662            let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2663            wtx.table_delete(&idx_table, &old_idx_key)
2664                .map_err(SqlError::Storage)?;
2665            let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2666            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2667            let is_new = wtx
2668                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2669                .map_err(SqlError::Storage)?;
2670            if idx.unique && !is_new {
2671                let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2672                if !any_null {
2673                    return Err(SqlError::UniqueViolation(idx.name.clone()));
2674                }
2675            }
2676        }
2677    } else {
2678        wtx.table_update_sorted(
2679            table_schema.name.as_bytes(),
2680            &[(old_pk_key, new_value_buf.as_slice())],
2681        )
2682        .map_err(SqlError::Storage)?;
2683        let col_map_partial =
2684            any_partial_index(table_schema).then(|| ColumnMap::new(&table_schema.columns));
2685        for idx in &table_schema.indices {
2686            let cols_changed = index_columns_changed(idx, old_row, &new_row);
2687            let (del, ins) = partial_idx_update_actions(
2688                idx,
2689                old_row,
2690                &new_row,
2691                cols_changed,
2692                false,
2693                col_map_partial.as_ref(),
2694            );
2695            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2696            if del {
2697                let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2698                wtx.table_delete(&idx_table, &old_idx_key)
2699                    .map_err(SqlError::Storage)?;
2700            }
2701            if ins {
2702                let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2703                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2704                let is_new = wtx
2705                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2706                    .map_err(SqlError::Storage)?;
2707                if idx.unique && !is_new {
2708                    let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2709                    if !any_null {
2710                        return Err(SqlError::UniqueViolation(idx.name.clone()));
2711                    }
2712                }
2713            }
2714        }
2715    }
2716
2717    if capture_returning {
2718        Ok(InsertRowOutcome::Updated {
2719            old: old_row.to_vec(),
2720            new: new_row,
2721        })
2722    } else {
2723        Ok(InsertRowOutcome::Inserted)
2724    }
2725}
2726
2727fn detect_fast_paths(
2728    ts: &TableSchema,
2729    assignments: &[(usize, Expr)],
2730) -> Option<Vec<DoUpdateFastPath>> {
2731    let non_pk = ts.non_pk_indices();
2732    let enc_pos = ts.encoding_positions();
2733    let mut out = Vec::with_capacity(assignments.len());
2734    for (col_idx, expr) in assignments {
2735        let col = &ts.columns[*col_idx];
2736        if col.data_type != DataType::Integer {
2737            return None;
2738        }
2739        let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
2740        let phys_idx = enc_pos[nonpk_order] as usize;
2741
2742        if let Expr::BinaryOp { left, op, right } = expr {
2743            if !matches!(op, BinOp::Add | BinOp::Sub) {
2744                return None;
2745            }
2746            let reads_target =
2747                matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
2748            if !reads_target {
2749                return None;
2750            }
2751            if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
2752                let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
2753                let _ = col_idx;
2754                out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
2755                continue;
2756            }
2757            return None;
2758        }
2759        return None;
2760    }
2761    Some(out)
2762}
2763
2764fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
2765    let target = oc
2766        .target
2767        .as_ref()
2768        .map(|t| resolve_conflict_target(t, ts))
2769        .transpose()?;
2770    match &oc.action {
2771        OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
2772        OnConflictAction::DoUpdate {
2773            assignments,
2774            where_clause,
2775        } => {
2776            let target = target.ok_or_else(|| {
2777                SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
2778            })?;
2779            let compiled_assignments: Vec<(usize, Expr)> = assignments
2780                .iter()
2781                .map(|(name, expr)| {
2782                    let col_idx = ts
2783                        .column_index(name)
2784                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
2785                    Ok((col_idx, expr.clone()))
2786                })
2787                .collect::<Result<_>>()?;
2788            let fast_paths = if where_clause.is_none() {
2789                detect_fast_paths(ts, &compiled_assignments)
2790            } else {
2791                None
2792            };
2793            Ok(CompiledOnConflict::DoUpdate {
2794                target,
2795                assignments: compiled_assignments,
2796                where_clause: where_clause.clone(),
2797                fast_paths,
2798            })
2799        }
2800    }
2801}
2802
2803/// Caller MUST check `cache.is_trivial_fast` first.
2804fn exec_insert_trivial_fast(
2805    wtx: &mut WriteTxn<'_>,
2806    table_lower: &str,
2807    cache: &InsertCache,
2808    bufs: &mut InsertBufs,
2809    params: &[Value],
2810) -> Result<ExecutionResult> {
2811    let prog = cache
2812        .trivial_fast_program
2813        .as_ref()
2814        .expect("trivial fast: program");
2815
2816    for &p in &prog.not_null_param_indices {
2817        if params[p as usize].is_null() {
2818            return Err(SqlError::NotNullViolation(format!("param@{p}")));
2819        }
2820    }
2821
2822    match &params[prog.pk_param as usize] {
2823        Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
2824        _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
2825    }
2826
2827    bufs.value_buf.clear();
2828    bufs.value_buf.extend_from_slice(&prog.template);
2829
2830    for op in &prog.ops {
2831        match op {
2832            WriteOp::ParamI64 { param_idx, off } => match &params[*param_idx as usize] {
2833                Value::Integer(v) => {
2834                    let off = *off as usize;
2835                    bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
2836                }
2837                other => {
2838                    return Err(SqlError::TypeMismatch {
2839                        expected: "Integer".into(),
2840                        got: other.data_type().to_string(),
2841                    });
2842                }
2843            },
2844            WriteOp::LiteralI64 { value, off } => {
2845                let off = *off as usize;
2846                bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
2847            }
2848            WriteOp::GenAddParamsI64 {
2849                a_param,
2850                b_param,
2851                off,
2852                bitmap_byte_off,
2853                bitmap_bit_mask,
2854            } => match (&params[*a_param as usize], &params[*b_param as usize]) {
2855                (Value::Integer(a), Value::Integer(b)) => {
2856                    let off = *off as usize;
2857                    bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
2858                }
2859                _ => {
2860                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2861                }
2862            },
2863            WriteOp::GenMulAddParamI64 {
2864                param_idx,
2865                mul,
2866                add,
2867                off,
2868                bitmap_byte_off,
2869                bitmap_bit_mask,
2870            } => match &params[*param_idx as usize] {
2871                Value::Integer(v) => {
2872                    let r = v.wrapping_mul(*mul).wrapping_add(*add);
2873                    let off = *off as usize;
2874                    bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
2875                }
2876                _ => {
2877                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2878                }
2879            },
2880        }
2881    }
2882
2883    let is_new = wtx
2884        .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
2885        .map_err(SqlError::Storage)?;
2886    if !is_new {
2887        return Err(SqlError::DuplicateKey);
2888    }
2889    Ok(ExecutionResult::RowsAffected(1))
2890}
2891
2892fn build_bind_plan(
2893    stmt: &InsertStmt,
2894    col_indices: &[usize],
2895    col_data_types: &[DataType],
2896) -> Option<Vec<BindAction>> {
2897    let rows = match &stmt.source {
2898        InsertSource::Values(rows) => rows,
2899        _ => return None,
2900    };
2901    if rows.len() != 1 {
2902        return None;
2903    }
2904    let value_row = &rows[0];
2905    if value_row.len() != col_indices.len() {
2906        return None;
2907    }
2908    let mut plan = Vec::with_capacity(value_row.len());
2909    for (i, expr) in value_row.iter().enumerate() {
2910        let col_idx = col_indices[i];
2911        let target = col_data_types[col_idx];
2912        match expr {
2913            Expr::Parameter(n) => {
2914                if *n == 0 {
2915                    return None;
2916                }
2917                plan.push(BindAction::Param {
2918                    param_idx: n - 1,
2919                    col_idx,
2920                    target,
2921                });
2922            }
2923            Expr::Literal(v) => plan.push(BindAction::Literal {
2924                value: v.clone(),
2925                col_idx,
2926            }),
2927            _ => return None,
2928        }
2929    }
2930    Some(plan)
2931}
2932
2933impl CompiledInsert {
2934    pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
2935        let lower = stmt.table.to_ascii_lowercase();
2936        let cached = if let Some(ts) = schema.get(&lower) {
2937            let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
2938                ts.columns.iter().map(|c| c.name.as_str()).collect()
2939            } else {
2940                stmt.columns.iter().map(|s| s.as_str()).collect()
2941            };
2942            let mut col_indices = Vec::with_capacity(insert_columns.len());
2943            for name in &insert_columns {
2944                col_indices.push(ts.column_index(name)?);
2945            }
2946            if col_indices
2947                .iter()
2948                .any(|&ci| ts.columns[ci].generated_kind.is_some())
2949            {
2950                return None;
2951            }
2952            let on_conflict = stmt
2953                .on_conflict
2954                .as_ref()
2955                .map(|oc| compile_on_conflict(oc, ts))
2956                .transpose()
2957                .ok()
2958                .flatten()
2959                .map(Arc::new);
2960            let generated_col_positions: Vec<usize> = ts
2961                .columns
2962                .iter()
2963                .enumerate()
2964                .filter_map(|(i, c)| {
2965                    matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
2966                        .then_some(i)
2967                })
2968                .collect();
2969            let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
2970                .iter()
2971                .map(|&pos| {
2972                    detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
2973                })
2974                .collect();
2975            let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
2976                Some(ColumnMap::new(&ts.columns))
2977            } else {
2978                None
2979            };
2980            let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
2981            let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
2982            let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
2983            let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
2984            let phys_count = ts.physical_non_pk_count();
2985            let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
2986            let single_int_pk =
2987                pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
2988            let not_null_indices: Vec<u16> = ts
2989                .columns
2990                .iter()
2991                .filter(|c| !c.nullable)
2992                .map(|c| c.position)
2993                .collect();
2994            let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
2995            let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
2996            let row_fully_overwritten = if any_defaults_flag {
2997                false
2998            } else {
2999                let mut covered: rustc_hash::FxHashSet<usize> =
3000                    col_indices.iter().copied().collect();
3001                covered.extend(generated_col_positions.iter().copied());
3002                for (j, &i) in non_pk_indices.iter().enumerate() {
3003                    let _ = j;
3004                    if matches!(
3005                        ts.columns[i].generated_kind,
3006                        Some(crate::parser::GeneratedKind::Virtual)
3007                    ) {
3008                        covered.insert(i);
3009                    }
3010                }
3011                bind_plan.is_some() && covered.len() == ts.columns.len()
3012            };
3013            let has_fks = !ts.foreign_keys.is_empty();
3014            let has_indices = !ts.indices.is_empty();
3015            let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3016            let mut null_value_slots: Vec<usize> =
3017                dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3018            for (j, &i) in non_pk_indices.iter().enumerate() {
3019                let slot = encoding_positions[j] as usize;
3020                if matches!(
3021                    ts.columns[i].generated_kind,
3022                    Some(crate::parser::GeneratedKind::Virtual)
3023                ) {
3024                    null_value_slots.push(slot);
3025                } else {
3026                    non_virtual_pairs.push((i, slot));
3027                }
3028            }
3029            let row_encoder = {
3030                let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3031                    let col = &ts.columns[i];
3032                    if matches!(
3033                        col.generated_kind,
3034                        Some(crate::parser::GeneratedKind::Virtual)
3035                    ) {
3036                        true
3037                    } else {
3038                        col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3039                    }
3040                });
3041                if all_int_or_null {
3042                    let mut null_slots: Vec<usize> =
3043                        dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3044                    for (j, &i) in non_pk_indices.iter().enumerate() {
3045                        if matches!(
3046                            ts.columns[i].generated_kind,
3047                            Some(crate::parser::GeneratedKind::Virtual)
3048                        ) {
3049                            null_slots.push(encoding_positions[j] as usize);
3050                        }
3051                    }
3052                    Some(crate::encoding::build_int_row_template(
3053                        phys_count,
3054                        &null_slots,
3055                    ))
3056                } else {
3057                    None
3058                }
3059            };
3060            let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3061                && !ts.columns.iter().any(|c| c.default_expr.is_some())
3062                && !ts.has_checks()
3063                && !has_fks
3064                && !has_indices
3065                && stmt.on_conflict.is_none()
3066                && stmt.returning.is_none()
3067                && bind_plan.is_some()
3068                && row_encoder.is_some()
3069                && row_fully_overwritten
3070                && single_int_pk
3071                && generated_fast_evals
3072                    .iter()
3073                    .all(|fe| !matches!(fe, FastGenEval::None));
3074            let trivial_fast_program = if is_trivial_fast_eligible {
3075                build_trivial_fast_program(
3076                    bind_plan.as_ref().unwrap(),
3077                    row_encoder.as_ref().unwrap(),
3078                    &non_virtual_pairs,
3079                    &generated_col_positions,
3080                    &generated_fast_evals,
3081                    &pk_indices,
3082                    &ts.columns,
3083                )
3084            } else {
3085                None
3086            };
3087            let is_trivial_fast = trivial_fast_program.is_some();
3088            Some(InsertCache {
3089                col_indices,
3090                has_subquery: insert_has_subquery(stmt),
3091                any_defaults: ts.columns.iter().any(|c| c.default_expr.is_some()),
3092                has_checks: ts.has_checks(),
3093                on_conflict,
3094                row_col_map,
3095                generated_col_positions,
3096                generated_fast_evals,
3097                pk_indices,
3098                non_pk_indices,
3099                encoding_positions,
3100                dropped_non_pk_slots,
3101                phys_count,
3102                single_int_pk,
3103                not_null_indices,
3104                bind_plan,
3105                row_fully_overwritten,
3106                row_encoder,
3107                is_trivial_fast,
3108                trivial_fast_program,
3109            })
3110        } else if schema.get_view(&lower).is_some() {
3111            None
3112        } else {
3113            return None;
3114        };
3115        Some(Self {
3116            table_lower: lower,
3117            cached,
3118        })
3119    }
3120}
3121
3122impl CompiledPlan for CompiledInsert {
3123    fn execute(
3124        &self,
3125        db: &Database,
3126        schema: &SchemaManager,
3127        stmt: &Statement,
3128        params: &[Value],
3129        wtx: Option<&mut WriteTxn<'_>>,
3130    ) -> Result<ExecutionResult> {
3131        let ins = match stmt {
3132            Statement::Insert(i) => i,
3133            _ => {
3134                return Err(SqlError::Unsupported(
3135                    "CompiledInsert received non-INSERT statement".into(),
3136                ))
3137            }
3138        };
3139        match wtx {
3140            None => exec_insert(db, schema, ins, params),
3141            Some(outer) => match self.cached.as_ref() {
3142                Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3143                    exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3144                }),
3145                Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3146                None => exec_insert_in_txn(outer, schema, ins, params),
3147            },
3148        }
3149    }
3150
3151    fn uses_scoped_params(&self) -> bool {
3152        match self.cached.as_ref() {
3153            Some(c) => !c.is_trivial_fast,
3154            None => true,
3155        }
3156    }
3157}
3158
3159pub struct CompiledDelete {
3160    table_lower: String,
3161}
3162
3163impl CompiledDelete {
3164    pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3165        let lower = stmt.table.to_ascii_lowercase();
3166        schema.get(&lower)?;
3167        Some(Self { table_lower: lower })
3168    }
3169}
3170
3171impl CompiledPlan for CompiledDelete {
3172    fn execute(
3173        &self,
3174        db: &Database,
3175        schema: &SchemaManager,
3176        stmt: &Statement,
3177        _params: &[Value],
3178        wtx: Option<&mut WriteTxn<'_>>,
3179    ) -> Result<ExecutionResult> {
3180        let del = match stmt {
3181            Statement::Delete(d) => d,
3182            _ => {
3183                return Err(SqlError::Unsupported(
3184                    "CompiledDelete received non-DELETE statement".into(),
3185                ))
3186            }
3187        };
3188        let _ = &self.table_lower;
3189        match wtx {
3190            None => super::write::exec_delete(db, schema, del),
3191            Some(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3192        }
3193    }
3194}