Skip to main content

citadel_sql/executor/
dml.rs

1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::write_txn::WriteTxn;
7use rustc_hash::FxHashMap;
8
9use crate::encoding::{encode_composite_key_into, encode_row_into};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
12use crate::parser::*;
13use crate::types::*;
14
15use crate::schema::SchemaManager;
16
17use super::compile::CompiledPlan;
18use super::helpers::*;
19use super::CteContext;
20
21pub(super) fn exec_insert(
22    db: &Database,
23    schema: &SchemaManager,
24    stmt: &InsertStmt,
25    params: &[Value],
26) -> Result<ExecutionResult> {
27    let empty_ctes = CteContext::default();
28    let materialized;
29    let stmt = if insert_has_subquery(stmt) {
30        materialized = materialize_insert(stmt, &mut |sub| {
31            exec_subquery_read(db, schema, sub, &empty_ctes)
32        })?;
33        &materialized
34    } else {
35        stmt
36    };
37
38    let lower_name = stmt.table.to_ascii_lowercase();
39    if schema.get_view(&lower_name).is_some() {
40        return Err(SqlError::CannotModifyView(stmt.table.clone()));
41    }
42    let table_schema = schema
43        .get(&lower_name)
44        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
45
46    let insert_columns = if stmt.columns.is_empty() {
47        table_schema
48            .columns
49            .iter()
50            .map(|c| c.name.clone())
51            .collect::<Vec<_>>()
52    } else {
53        stmt.columns
54            .iter()
55            .map(|c| c.to_ascii_lowercase())
56            .collect()
57    };
58
59    let col_indices: Vec<usize> = insert_columns
60        .iter()
61        .map(|name| {
62            table_schema
63                .column_index(name)
64                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
65        })
66        .collect::<Result<_>>()?;
67
68    for &ci in &col_indices {
69        if table_schema.columns[ci].generated_kind.is_some() {
70            return Err(SqlError::CannotInsertIntoGeneratedColumn(
71                table_schema.columns[ci].name.clone(),
72            ));
73        }
74    }
75
76    let defaults: Vec<(usize, &Expr)> = table_schema
77        .columns
78        .iter()
79        .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
80        .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
81        .collect();
82
83    let generated_cols: Vec<(usize, &Expr)> = table_schema
84        .columns
85        .iter()
86        .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
87        .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
88        .collect();
89
90    let has_checks = table_schema.has_checks();
91    let row_col_map_for_gen = if !generated_cols.is_empty() {
92        Some(ColumnMap::new(&table_schema.columns))
93    } else {
94        None
95    };
96    let check_col_map = if has_checks {
97        Some(ColumnMap::new(&table_schema.columns))
98    } else {
99        None
100    };
101
102    let select_rows = match &stmt.source {
103        InsertSource::Select(sq) => {
104            let insert_ctes =
105                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
106                    exec_query_body_read(db, schema, body, ctx)
107                })?;
108            let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
109            Some(qr.rows)
110        }
111        InsertSource::Values(_) => None,
112    };
113
114    let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
115        .on_conflict
116        .as_ref()
117        .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
118        .transpose()?;
119
120    let row_col_map = compiled_conflict
121        .as_ref()
122        .map(|_| ColumnMap::new(&table_schema.columns));
123
124    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
125    let mut count: u64 = 0;
126    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
127        stmt.returning.as_ref().map(|_| Vec::new());
128
129    let pk_indices = table_schema.pk_indices();
130    let non_pk = table_schema.non_pk_indices();
131    let enc_pos = table_schema.encoding_positions();
132    let phys_count = table_schema.physical_non_pk_count();
133    let mut row = vec![Value::Null; table_schema.columns.len()];
134    let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
135    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
136    let mut key_buf: Vec<u8> = Vec::with_capacity(64);
137    let mut value_buf: Vec<u8> = Vec::with_capacity(256);
138    let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
139
140    let values = match &stmt.source {
141        InsertSource::Values(rows) => Some(rows.as_slice()),
142        InsertSource::Select(_) => None,
143    };
144    let sel_rows = select_rows.as_deref();
145
146    let total = match (values, sel_rows) {
147        (Some(rows), _) => rows.len(),
148        (_, Some(rows)) => rows.len(),
149        _ => 0,
150    };
151
152    if let Some(sel) = sel_rows {
153        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
154            return Err(SqlError::InvalidValue(format!(
155                "INSERT ... SELECT column count mismatch: expected {}, got {}",
156                insert_columns.len(),
157                sel[0].len()
158            )));
159        }
160    }
161
162    for idx in 0..total {
163        for v in row.iter_mut() {
164            *v = Value::Null;
165        }
166
167        if let Some(value_rows) = values {
168            let value_row = &value_rows[idx];
169            if value_row.len() != insert_columns.len() {
170                return Err(SqlError::InvalidValue(format!(
171                    "expected {} values, got {}",
172                    insert_columns.len(),
173                    value_row.len()
174                )));
175            }
176            for (i, expr) in value_row.iter().enumerate() {
177                let val = if let Expr::Parameter(n) = expr {
178                    params
179                        .get(n - 1)
180                        .cloned()
181                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
182                } else {
183                    eval_const_expr(expr)?
184                };
185                let col_idx = col_indices[i];
186                let col = &table_schema.columns[col_idx];
187                let got_type = val.data_type();
188                row[col_idx] = if val.is_null() {
189                    Value::Null
190                } else {
191                    val.coerce_into(col.data_type)
192                        .ok_or_else(|| SqlError::TypeMismatch {
193                            expected: col.data_type.to_string(),
194                            got: got_type.to_string(),
195                        })?
196                };
197            }
198        } else if let Some(sel) = sel_rows {
199            let sel_row = &sel[idx];
200            for (i, val) in sel_row.iter().enumerate() {
201                let col_idx = col_indices[i];
202                let col = &table_schema.columns[col_idx];
203                let got_type = val.data_type();
204                row[col_idx] = if val.is_null() {
205                    Value::Null
206                } else {
207                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
208                        SqlError::TypeMismatch {
209                            expected: col.data_type.to_string(),
210                            got: got_type.to_string(),
211                        }
212                    })?
213                };
214            }
215        }
216
217        for &(pos, def_expr) in &defaults {
218            let val = eval_const_expr(def_expr)?;
219            let col = &table_schema.columns[pos];
220            if !val.is_null() {
221                let got_type = val.data_type();
222                row[pos] =
223                    val.coerce_into(col.data_type)
224                        .ok_or_else(|| SqlError::TypeMismatch {
225                            expected: col.data_type.to_string(),
226                            got: got_type.to_string(),
227                        })?;
228            }
229        }
230
231        if let Some(ref gen_map) = row_col_map_for_gen {
232            for &(pos, gen_expr) in &generated_cols {
233                let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
234                let col = &table_schema.columns[pos];
235                row[pos] = if val.is_null() {
236                    Value::Null
237                } else {
238                    let got_type = val.data_type();
239                    val.coerce_into(col.data_type)
240                        .ok_or_else(|| SqlError::TypeMismatch {
241                            expected: col.data_type.to_string(),
242                            got: got_type.to_string(),
243                        })?
244                };
245            }
246        }
247
248        for col in &table_schema.columns {
249            if !col.nullable && row[col.position as usize].is_null() {
250                return Err(SqlError::NotNullViolation(col.name.clone()));
251            }
252        }
253
254        if let Some(ref col_map) = check_col_map {
255            for col in &table_schema.columns {
256                if let Some(ref check) = col.check_expr {
257                    let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
258                    if !is_truthy(&result) && !result.is_null() {
259                        let name = col.check_name.as_deref().unwrap_or(&col.name);
260                        return Err(SqlError::CheckViolation(name.to_string()));
261                    }
262                }
263            }
264            for tc in &table_schema.check_constraints {
265                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
266                if !is_truthy(&result) && !result.is_null() {
267                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
268                    return Err(SqlError::CheckViolation(name.to_string()));
269                }
270            }
271        }
272
273        for fk in &table_schema.foreign_keys {
274            let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
275            if any_null {
276                continue; // MATCH SIMPLE: skip if any FK col is NULL
277            }
278            let fk_vals: Vec<Value> = fk
279                .columns
280                .iter()
281                .map(|&ci| row[ci as usize].clone())
282                .collect();
283            fk_key_buf.clear();
284            encode_composite_key_into(&fk_vals, &mut fk_key_buf);
285            let found = wtx
286                .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
287                .map_err(SqlError::Storage)?;
288            if found.is_none() {
289                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
290                return Err(SqlError::ForeignKeyViolation(name.to_string()));
291            }
292        }
293
294        let proposed_row_for_returning: Option<Vec<Value>> =
295            returning_rows.as_ref().map(|_| row.clone());
296
297        for (j, &i) in pk_indices.iter().enumerate() {
298            pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
299        }
300        encode_composite_key_into(&pk_values, &mut key_buf);
301
302        for (j, &i) in non_pk.iter().enumerate() {
303            let col = &table_schema.columns[i];
304            if matches!(
305                col.generated_kind,
306                Some(crate::parser::GeneratedKind::Virtual)
307            ) {
308                value_values[enc_pos[j] as usize] = Value::Null;
309                row[i] = Value::Null;
310            } else {
311                value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
312            }
313        }
314        encode_row_into(&value_values, &mut value_buf);
315
316        if key_buf.len() > citadel_core::MAX_KEY_SIZE {
317            return Err(SqlError::KeyTooLarge {
318                size: key_buf.len(),
319                max: citadel_core::MAX_KEY_SIZE,
320            });
321        }
322        if value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
323            return Err(SqlError::RowTooLarge {
324                size: value_buf.len(),
325                max: citadel_core::MAX_INLINE_VALUE_SIZE,
326            });
327        }
328
329        match compiled_conflict.as_ref() {
330            None => {
331                let is_new = wtx
332                    .table_insert(stmt.table.as_bytes(), &key_buf, &value_buf)
333                    .map_err(SqlError::Storage)?;
334                if !is_new {
335                    return Err(SqlError::DuplicateKey);
336                }
337                if !table_schema.indices.is_empty() {
338                    for (j, &i) in pk_indices.iter().enumerate() {
339                        row[i] = pk_values[j].clone();
340                    }
341                    for (j, &i) in non_pk.iter().enumerate() {
342                        row[i] =
343                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
344                    }
345                    insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
346                }
347                count += 1;
348                if let Some(buf) = returning_rows.as_mut() {
349                    buf.push((None, proposed_row_for_returning));
350                }
351            }
352            Some(oc) => {
353                let oc_ref: &CompiledOnConflict = oc;
354                let needs_row = upsert_needs_row(oc_ref, table_schema);
355                if needs_row {
356                    for (j, &i) in pk_indices.iter().enumerate() {
357                        row[i] = pk_values[j].clone();
358                    }
359                    for (j, &i) in non_pk.iter().enumerate() {
360                        row[i] =
361                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
362                    }
363                }
364                let outcome = apply_insert_with_conflict(
365                    &mut wtx,
366                    table_schema,
367                    &key_buf,
368                    &value_buf,
369                    &row,
370                    &pk_values,
371                    oc_ref,
372                    row_col_map.as_ref().unwrap(),
373                    stmt.returning.is_some(),
374                )?;
375                match outcome {
376                    InsertRowOutcome::Inserted => {
377                        count += 1;
378                        if let Some(buf) = returning_rows.as_mut() {
379                            buf.push((None, proposed_row_for_returning));
380                        }
381                    }
382                    InsertRowOutcome::Updated { old, new } => {
383                        count += 1;
384                        if let Some(buf) = returning_rows.as_mut() {
385                            buf.push((Some(old), Some(new)));
386                        }
387                    }
388                    InsertRowOutcome::Skipped => {}
389                }
390            }
391        }
392    }
393
394    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
395        let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
396        wtx.commit().map_err(SqlError::Storage)?;
397        return Ok(ExecutionResult::Query(qr));
398    }
399
400    wtx.commit().map_err(SqlError::Storage)?;
401    Ok(ExecutionResult::RowsAffected(count))
402}
403
404pub(super) fn has_subquery(expr: &Expr) -> bool {
405    crate::parser::has_subquery(expr)
406}
407
408pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
409    if let Some(ref w) = stmt.where_clause {
410        if has_subquery(w) {
411            return true;
412        }
413    }
414    if let Some(ref h) = stmt.having {
415        if has_subquery(h) {
416            return true;
417        }
418    }
419    for col in &stmt.columns {
420        if let SelectColumn::Expr { expr, .. } = col {
421            if has_subquery(expr) {
422                return true;
423            }
424        }
425    }
426    for ob in &stmt.order_by {
427        if has_subquery(&ob.expr) {
428            return true;
429        }
430    }
431    for join in &stmt.joins {
432        if let Some(ref on_expr) = join.on_clause {
433            if has_subquery(on_expr) {
434                return true;
435            }
436        }
437    }
438    false
439}
440
441pub(super) fn materialize_expr(
442    expr: &Expr,
443    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
444) -> Result<Expr> {
445    match expr {
446        Expr::InSubquery {
447            expr: e,
448            subquery,
449            negated,
450        } => {
451            let inner = materialize_expr(e, exec_sub)?;
452            let qr = exec_sub(subquery)?;
453            if !qr.columns.is_empty() && qr.columns.len() != 1 {
454                return Err(SqlError::SubqueryMultipleColumns);
455            }
456            let mut values = rustc_hash::FxHashSet::default();
457            let mut has_null = false;
458            for row in &qr.rows {
459                if row[0].is_null() {
460                    has_null = true;
461                } else {
462                    values.insert(row[0].clone());
463                }
464            }
465            Ok(Expr::InSet {
466                expr: Box::new(inner),
467                values,
468                has_null,
469                negated: *negated,
470            })
471        }
472        Expr::ScalarSubquery(subquery) => {
473            let qr = exec_sub(subquery)?;
474            if qr.rows.len() > 1 {
475                return Err(SqlError::SubqueryMultipleRows);
476            }
477            let val = if qr.rows.is_empty() {
478                Value::Null
479            } else {
480                qr.rows[0][0].clone()
481            };
482            Ok(Expr::Literal(val))
483        }
484        Expr::Exists { subquery, negated } => {
485            let qr = exec_sub(subquery)?;
486            let exists = !qr.rows.is_empty();
487            let result = if *negated { !exists } else { exists };
488            Ok(Expr::Literal(Value::Boolean(result)))
489        }
490        Expr::InList {
491            expr: e,
492            list,
493            negated,
494        } => {
495            let inner = materialize_expr(e, exec_sub)?;
496            let items = list
497                .iter()
498                .map(|item| materialize_expr(item, exec_sub))
499                .collect::<Result<Vec<_>>>()?;
500            Ok(Expr::InList {
501                expr: Box::new(inner),
502                list: items,
503                negated: *negated,
504            })
505        }
506        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
507            left: Box::new(materialize_expr(left, exec_sub)?),
508            op: *op,
509            right: Box::new(materialize_expr(right, exec_sub)?),
510        }),
511        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
512            op: *op,
513            expr: Box::new(materialize_expr(e, exec_sub)?),
514        }),
515        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
516        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
517        Expr::InSet {
518            expr: e,
519            values,
520            has_null,
521            negated,
522        } => Ok(Expr::InSet {
523            expr: Box::new(materialize_expr(e, exec_sub)?),
524            values: values.clone(),
525            has_null: *has_null,
526            negated: *negated,
527        }),
528        Expr::Between {
529            expr: e,
530            low,
531            high,
532            negated,
533        } => Ok(Expr::Between {
534            expr: Box::new(materialize_expr(e, exec_sub)?),
535            low: Box::new(materialize_expr(low, exec_sub)?),
536            high: Box::new(materialize_expr(high, exec_sub)?),
537            negated: *negated,
538        }),
539        Expr::Like {
540            expr: e,
541            pattern,
542            escape,
543            negated,
544        } => {
545            let esc = escape
546                .as_ref()
547                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
548                .transpose()?;
549            Ok(Expr::Like {
550                expr: Box::new(materialize_expr(e, exec_sub)?),
551                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
552                escape: esc,
553                negated: *negated,
554            })
555        }
556        Expr::Case {
557            operand,
558            conditions,
559            else_result,
560        } => {
561            let op = operand
562                .as_ref()
563                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
564                .transpose()?;
565            let conds = conditions
566                .iter()
567                .map(|(c, r)| {
568                    Ok((
569                        materialize_expr(c, exec_sub)?,
570                        materialize_expr(r, exec_sub)?,
571                    ))
572                })
573                .collect::<Result<Vec<_>>>()?;
574            let else_r = else_result
575                .as_ref()
576                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
577                .transpose()?;
578            Ok(Expr::Case {
579                operand: op,
580                conditions: conds,
581                else_result: else_r,
582            })
583        }
584        Expr::Coalesce(args) => {
585            let materialized = args
586                .iter()
587                .map(|a| materialize_expr(a, exec_sub))
588                .collect::<Result<Vec<_>>>()?;
589            Ok(Expr::Coalesce(materialized))
590        }
591        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
592            expr: Box::new(materialize_expr(e, exec_sub)?),
593            data_type: *data_type,
594        }),
595        Expr::Function {
596            name,
597            args,
598            distinct,
599        } => {
600            let materialized = args
601                .iter()
602                .map(|a| materialize_expr(a, exec_sub))
603                .collect::<Result<Vec<_>>>()?;
604            Ok(Expr::Function {
605                name: name.clone(),
606                args: materialized,
607                distinct: *distinct,
608            })
609        }
610        other => Ok(other.clone()),
611    }
612}
613
614pub(super) fn materialize_stmt(
615    stmt: &SelectStmt,
616    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
617) -> Result<SelectStmt> {
618    let where_clause = stmt
619        .where_clause
620        .as_ref()
621        .map(|e| materialize_expr(e, exec_sub))
622        .transpose()?;
623    let having = stmt
624        .having
625        .as_ref()
626        .map(|e| materialize_expr(e, exec_sub))
627        .transpose()?;
628    let columns = stmt
629        .columns
630        .iter()
631        .map(|c| match c {
632            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
633            SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
634            SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
635            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
636                expr: materialize_expr(expr, exec_sub)?,
637                alias: alias.clone(),
638            }),
639        })
640        .collect::<Result<Vec<_>>>()?;
641    let order_by = stmt
642        .order_by
643        .iter()
644        .map(|ob| {
645            Ok(OrderByItem {
646                expr: materialize_expr(&ob.expr, exec_sub)?,
647                descending: ob.descending,
648                nulls_first: ob.nulls_first,
649            })
650        })
651        .collect::<Result<Vec<_>>>()?;
652    let joins = stmt
653        .joins
654        .iter()
655        .map(|j| {
656            let on_clause = j
657                .on_clause
658                .as_ref()
659                .map(|e| materialize_expr(e, exec_sub))
660                .transpose()?;
661            Ok(JoinClause {
662                join_type: j.join_type,
663                table: j.table.clone(),
664                on_clause,
665            })
666        })
667        .collect::<Result<Vec<_>>>()?;
668    let group_by = stmt
669        .group_by
670        .iter()
671        .map(|e| materialize_expr(e, exec_sub))
672        .collect::<Result<Vec<_>>>()?;
673    Ok(SelectStmt {
674        columns,
675        from: stmt.from.clone(),
676        from_alias: stmt.from_alias.clone(),
677        joins,
678        distinct: stmt.distinct,
679        where_clause,
680        order_by,
681        limit: stmt.limit.clone(),
682        offset: stmt.offset.clone(),
683        group_by,
684        having,
685    })
686}
687
688pub(super) fn exec_subquery_read(
689    db: &Database,
690    schema: &SchemaManager,
691    stmt: &SelectStmt,
692    ctes: &CteContext,
693) -> Result<QueryResult> {
694    match super::exec_select(db, schema, stmt, ctes)? {
695        ExecutionResult::Query(qr) => Ok(qr),
696        _ => Ok(QueryResult {
697            columns: vec![],
698            rows: vec![],
699        }),
700    }
701}
702
703pub(super) fn exec_subquery_write(
704    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
705    schema: &SchemaManager,
706    stmt: &SelectStmt,
707    ctes: &CteContext,
708) -> Result<QueryResult> {
709    match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
710        ExecutionResult::Query(qr) => Ok(qr),
711        _ => Ok(QueryResult {
712            columns: vec![],
713            rows: vec![],
714        }),
715    }
716}
717
718pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
719    stmt.where_clause.as_ref().is_some_and(has_subquery)
720        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
721}
722
723pub(super) fn materialize_update(
724    stmt: &UpdateStmt,
725    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
726) -> Result<UpdateStmt> {
727    let where_clause = stmt
728        .where_clause
729        .as_ref()
730        .map(|e| materialize_expr(e, exec_sub))
731        .transpose()?;
732    let assignments = stmt
733        .assignments
734        .iter()
735        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
736        .collect::<Result<Vec<_>>>()?;
737    Ok(UpdateStmt {
738        table: stmt.table.clone(),
739        assignments,
740        where_clause,
741        returning: stmt.returning.clone(),
742    })
743}
744
745pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
746    stmt.where_clause.as_ref().is_some_and(has_subquery)
747}
748
749pub(super) fn materialize_delete(
750    stmt: &DeleteStmt,
751    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
752) -> Result<DeleteStmt> {
753    let where_clause = stmt
754        .where_clause
755        .as_ref()
756        .map(|e| materialize_expr(e, exec_sub))
757        .transpose()?;
758    Ok(DeleteStmt {
759        table: stmt.table.clone(),
760        where_clause,
761        returning: stmt.returning.clone(),
762    })
763}
764
765pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
766    match &stmt.source {
767        InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
768        // SELECT source subqueries are handled by exec_select's correlated/non-correlated paths
769        InsertSource::Select(_) => false,
770    }
771}
772
773pub(super) fn materialize_insert(
774    stmt: &InsertStmt,
775    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
776) -> Result<InsertStmt> {
777    let source = match &stmt.source {
778        InsertSource::Values(rows) => {
779            let mat = rows
780                .iter()
781                .map(|row| {
782                    row.iter()
783                        .map(|e| materialize_expr(e, exec_sub))
784                        .collect::<Result<Vec<_>>>()
785                })
786                .collect::<Result<Vec<_>>>()?;
787            InsertSource::Values(mat)
788        }
789        InsertSource::Select(sq) => {
790            let ctes = sq
791                .ctes
792                .iter()
793                .map(|c| {
794                    Ok(CteDefinition {
795                        name: c.name.clone(),
796                        column_aliases: c.column_aliases.clone(),
797                        body: materialize_query_body(&c.body, exec_sub)?,
798                    })
799                })
800                .collect::<Result<Vec<_>>>()?;
801            let body = materialize_query_body(&sq.body, exec_sub)?;
802            InsertSource::Select(Box::new(SelectQuery {
803                ctes,
804                recursive: sq.recursive,
805                body,
806            }))
807        }
808    };
809    Ok(InsertStmt {
810        table: stmt.table.clone(),
811        columns: stmt.columns.clone(),
812        source,
813        on_conflict: stmt.on_conflict.clone(),
814        returning: stmt.returning.clone(),
815    })
816}
817
818pub(super) fn materialize_query_body(
819    body: &QueryBody,
820    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
821) -> Result<QueryBody> {
822    match body {
823        QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
824            sel, exec_sub,
825        )?))),
826        QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
827            op: comp.op.clone(),
828            all: comp.all,
829            left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
830            right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
831            order_by: comp.order_by.clone(),
832            limit: comp.limit.clone(),
833            offset: comp.offset.clone(),
834        }))),
835    }
836}
837
838pub(super) fn exec_query_body(
839    db: &Database,
840    schema: &SchemaManager,
841    body: &QueryBody,
842    ctes: &CteContext,
843) -> Result<ExecutionResult> {
844    match body {
845        QueryBody::Select(sel) => super::exec_select(db, schema, sel, ctes),
846        QueryBody::Compound(comp) => exec_compound_select(db, schema, comp, ctes),
847    }
848}
849
850pub(super) fn exec_query_body_in_txn(
851    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
852    schema: &SchemaManager,
853    body: &QueryBody,
854    ctes: &CteContext,
855) -> Result<ExecutionResult> {
856    match body {
857        QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
858        QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
859    }
860}
861
862pub(super) fn exec_query_body_read(
863    db: &Database,
864    schema: &SchemaManager,
865    body: &QueryBody,
866    ctes: &CteContext,
867) -> Result<QueryResult> {
868    match exec_query_body(db, schema, body, ctes)? {
869        ExecutionResult::Query(qr) => Ok(qr),
870        _ => Ok(QueryResult {
871            columns: vec![],
872            rows: vec![],
873        }),
874    }
875}
876
877pub(super) fn exec_query_body_write(
878    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
879    schema: &SchemaManager,
880    body: &QueryBody,
881    ctes: &CteContext,
882) -> Result<QueryResult> {
883    match exec_query_body_in_txn(wtx, schema, body, ctes)? {
884        ExecutionResult::Query(qr) => Ok(qr),
885        _ => Ok(QueryResult {
886            columns: vec![],
887            rows: vec![],
888        }),
889    }
890}
891
892pub(super) fn exec_compound_select(
893    db: &Database,
894    schema: &SchemaManager,
895    comp: &CompoundSelect,
896    ctes: &CteContext,
897) -> Result<ExecutionResult> {
898    let left_qr = match exec_query_body(db, schema, &comp.left, ctes)? {
899        ExecutionResult::Query(qr) => qr,
900        _ => QueryResult {
901            columns: vec![],
902            rows: vec![],
903        },
904    };
905    let right_qr = match exec_query_body(db, schema, &comp.right, ctes)? {
906        ExecutionResult::Query(qr) => qr,
907        _ => QueryResult {
908            columns: vec![],
909            rows: vec![],
910        },
911    };
912    apply_set_operation(comp, left_qr, right_qr)
913}
914
915pub(super) fn exec_compound_select_in_txn(
916    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
917    schema: &SchemaManager,
918    comp: &CompoundSelect,
919    ctes: &CteContext,
920) -> Result<ExecutionResult> {
921    let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
922        ExecutionResult::Query(qr) => qr,
923        _ => QueryResult {
924            columns: vec![],
925            rows: vec![],
926        },
927    };
928    let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
929        ExecutionResult::Query(qr) => qr,
930        _ => QueryResult {
931            columns: vec![],
932            rows: vec![],
933        },
934    };
935    apply_set_operation(comp, left_qr, right_qr)
936}
937
938pub(super) fn apply_set_operation(
939    comp: &CompoundSelect,
940    left_qr: QueryResult,
941    right_qr: QueryResult,
942) -> Result<ExecutionResult> {
943    if !left_qr.columns.is_empty()
944        && !right_qr.columns.is_empty()
945        && left_qr.columns.len() != right_qr.columns.len()
946    {
947        return Err(SqlError::CompoundColumnCountMismatch {
948            left: left_qr.columns.len(),
949            right: right_qr.columns.len(),
950        });
951    }
952
953    let columns = left_qr.columns;
954
955    let mut rows = match (&comp.op, comp.all) {
956        (SetOp::Union, true) => {
957            let mut rows = left_qr.rows;
958            rows.extend(right_qr.rows);
959            rows
960        }
961        (SetOp::Union, false) => {
962            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
963            let mut rows = Vec::new();
964            for row in left_qr.rows.into_iter().chain(right_qr.rows) {
965                if !seen.contains(&row) {
966                    seen.insert(row.clone());
967                    rows.push(row);
968                }
969            }
970            rows
971        }
972        (SetOp::Intersect, true) => {
973            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
974            for row in &right_qr.rows {
975                *right_counts.entry(row.clone()).or_insert(0) += 1;
976            }
977            let mut rows = Vec::new();
978            for row in left_qr.rows {
979                if let Some(count) = right_counts.get_mut(&row) {
980                    if *count > 0 {
981                        *count -= 1;
982                        rows.push(row);
983                    }
984                }
985            }
986            rows
987        }
988        (SetOp::Intersect, false) => {
989            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
990            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
991            let mut rows = Vec::new();
992            for row in left_qr.rows {
993                if right_set.contains(&row) && !seen.contains(&row) {
994                    seen.insert(row.clone());
995                    rows.push(row);
996                }
997            }
998            rows
999        }
1000        (SetOp::Except, true) => {
1001            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1002            for row in &right_qr.rows {
1003                *right_counts.entry(row.clone()).or_insert(0) += 1;
1004            }
1005            let mut rows = Vec::new();
1006            for row in left_qr.rows {
1007                if let Some(count) = right_counts.get_mut(&row) {
1008                    if *count > 0 {
1009                        *count -= 1;
1010                        continue;
1011                    }
1012                }
1013                rows.push(row);
1014            }
1015            rows
1016        }
1017        (SetOp::Except, false) => {
1018            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1019            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1020            let mut rows = Vec::new();
1021            for row in left_qr.rows {
1022                if !right_set.contains(&row) && !seen.contains(&row) {
1023                    seen.insert(row.clone());
1024                    rows.push(row);
1025                }
1026            }
1027            rows
1028        }
1029    };
1030
1031    if !comp.order_by.is_empty() {
1032        let col_defs: Vec<crate::types::ColumnDef> = columns
1033            .iter()
1034            .enumerate()
1035            .map(|(i, name)| crate::types::ColumnDef {
1036                name: name.clone(),
1037                data_type: crate::types::DataType::Null,
1038                nullable: true,
1039                position: i as u16,
1040                default_expr: None,
1041                default_sql: None,
1042                check_expr: None,
1043                check_sql: None,
1044                check_name: None,
1045                is_with_timezone: false,
1046                generated_expr: None,
1047                generated_sql: None,
1048                generated_kind: None,
1049            })
1050            .collect();
1051        sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1052    }
1053
1054    if let Some(ref offset_expr) = comp.offset {
1055        let offset = eval_const_int(offset_expr)?.max(0) as usize;
1056        if offset < rows.len() {
1057            rows = rows.split_off(offset);
1058        } else {
1059            rows.clear();
1060        }
1061    }
1062
1063    if let Some(ref limit_expr) = comp.limit {
1064        let limit = eval_const_int(limit_expr)?.max(0) as usize;
1065        rows.truncate(limit);
1066    }
1067
1068    Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1069}
1070
1071struct InsertBufs {
1072    row: Vec<Value>,
1073    pk_values: Vec<Value>,
1074    value_values: Vec<Value>,
1075    key_buf: Vec<u8>,
1076    value_buf: Vec<u8>,
1077    col_indices: Vec<usize>,
1078    fk_key_buf: Vec<u8>,
1079}
1080
1081impl InsertBufs {
1082    fn new() -> Self {
1083        Self {
1084            row: Vec::new(),
1085            pk_values: Vec::new(),
1086            value_values: Vec::new(),
1087            key_buf: Vec::with_capacity(64),
1088            value_buf: Vec::with_capacity(256),
1089            col_indices: Vec::new(),
1090            fk_key_buf: Vec::with_capacity(64),
1091        }
1092    }
1093}
1094
1095thread_local! {
1096    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1097    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1098}
1099
1100fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1101    INSERT_SCRATCH.with(|slot| f(&mut slot.borrow_mut()))
1102}
1103
1104pub(super) struct UpsertBufs {
1105    old_row: Vec<Value>,
1106    new_row: Vec<Value>,
1107    value_values: Vec<Value>,
1108    new_value_buf: Vec<u8>,
1109}
1110
1111impl UpsertBufs {
1112    pub(super) fn new() -> Self {
1113        Self {
1114            old_row: Vec::new(),
1115            new_row: Vec::new(),
1116            value_values: Vec::new(),
1117            new_value_buf: Vec::with_capacity(256),
1118        }
1119    }
1120}
1121
1122pub fn exec_insert_in_txn(
1123    wtx: &mut WriteTxn<'_>,
1124    schema: &SchemaManager,
1125    stmt: &InsertStmt,
1126    params: &[Value],
1127) -> Result<ExecutionResult> {
1128    with_insert_scratch(|bufs| exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None))
1129}
1130
1131fn exec_insert_in_txn_cached(
1132    wtx: &mut WriteTxn<'_>,
1133    schema: &SchemaManager,
1134    stmt: &InsertStmt,
1135    params: &[Value],
1136    cache: &InsertCache,
1137) -> Result<ExecutionResult> {
1138    with_insert_scratch(|bufs| {
1139        exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, Some(cache))
1140    })
1141}
1142
1143fn exec_insert_in_txn_impl(
1144    wtx: &mut WriteTxn<'_>,
1145    schema: &SchemaManager,
1146    stmt: &InsertStmt,
1147    params: &[Value],
1148    bufs: &mut InsertBufs,
1149    cache: Option<&InsertCache>,
1150) -> Result<ExecutionResult> {
1151    let empty_ctes = CteContext::default();
1152    let materialized;
1153    let has_sub = match cache {
1154        Some(c) => c.has_subquery,
1155        None => insert_has_subquery(stmt),
1156    };
1157    let stmt = if has_sub {
1158        materialized = materialize_insert(stmt, &mut |sub| {
1159            exec_subquery_write(wtx, schema, sub, &empty_ctes)
1160        })?;
1161        &materialized
1162    } else {
1163        stmt
1164    };
1165
1166    let table_schema = schema
1167        .get(&stmt.table)
1168        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1169
1170    let default_columns;
1171    let insert_columns: &[String] = if stmt.columns.is_empty() {
1172        default_columns = table_schema
1173            .columns
1174            .iter()
1175            .map(|c| c.name.clone())
1176            .collect::<Vec<_>>();
1177        &default_columns
1178    } else {
1179        &stmt.columns
1180    };
1181
1182    bufs.col_indices.clear();
1183    if let Some(c) = cache {
1184        bufs.col_indices.extend_from_slice(&c.col_indices);
1185    } else {
1186        for name in insert_columns {
1187            bufs.col_indices.push(
1188                table_schema
1189                    .column_index(name)
1190                    .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1191            );
1192        }
1193    }
1194
1195    if cache.is_none() {
1196        for &ci in &bufs.col_indices {
1197            if table_schema.columns[ci].generated_kind.is_some() {
1198                return Err(SqlError::CannotInsertIntoGeneratedColumn(
1199                    table_schema.columns[ci].name.clone(),
1200                ));
1201            }
1202        }
1203    }
1204
1205    let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1206    let cached_gen_positions: &[usize];
1207    let cached_gen_fast_evals: &[FastGenEval];
1208    if let Some(c) = cache {
1209        cached_gen_positions = &c.generated_col_positions;
1210        cached_gen_fast_evals = &c.generated_fast_evals;
1211        generated_cols_uncached = Vec::new();
1212    } else {
1213        cached_gen_positions = &[];
1214        cached_gen_fast_evals = &[];
1215        generated_cols_uncached = table_schema
1216            .columns
1217            .iter()
1218            .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1219            .map(|c| {
1220                let expr = c.generated_expr.as_ref().unwrap();
1221                let fe = detect_fast_gen_eval(expr, table_schema);
1222                (c.position as usize, expr, fe)
1223            })
1224            .collect();
1225    }
1226    let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1227    let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1228        None
1229    } else {
1230        Some(ColumnMap::new(&table_schema.columns))
1231    };
1232    let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1233        None
1234    } else if let Some(c) = cache {
1235        c.row_col_map.as_ref()
1236    } else {
1237        row_col_map_for_gen_owned.as_ref()
1238    };
1239
1240    let any_defaults = match cache {
1241        Some(c) => c.any_defaults,
1242        None => table_schema
1243            .columns
1244            .iter()
1245            .any(|c| c.default_expr.is_some()),
1246    };
1247    let defaults: Vec<(usize, &Expr)> = if any_defaults {
1248        table_schema
1249            .columns
1250            .iter()
1251            .filter(|c| {
1252                c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1253            })
1254            .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1255            .collect()
1256    } else {
1257        Vec::new()
1258    };
1259
1260    let has_checks = match cache {
1261        Some(c) => c.has_checks,
1262        None => table_schema.has_checks(),
1263    };
1264    let check_col_map = if has_checks {
1265        Some(ColumnMap::new(&table_schema.columns))
1266    } else {
1267        None
1268    };
1269
1270    let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1271        &[usize],
1272        &[usize],
1273        &[u16],
1274        usize,
1275        &[u16],
1276    ) = if let Some(c) = cache {
1277        (
1278            &c.pk_indices,
1279            &c.non_pk_indices,
1280            &c.encoding_positions,
1281            c.phys_count,
1282            &c.dropped_non_pk_slots,
1283        )
1284    } else {
1285        (
1286            table_schema.pk_indices(),
1287            table_schema.non_pk_indices(),
1288            table_schema.encoding_positions(),
1289            table_schema.physical_non_pk_count(),
1290            table_schema.dropped_non_pk_slots(),
1291        )
1292    };
1293
1294    bufs.row.resize(table_schema.columns.len(), Value::Null);
1295    bufs.pk_values.resize(pk_indices.len(), Value::Null);
1296    bufs.value_values.resize(phys_count, Value::Null);
1297
1298    let table_bytes = stmt.table.as_bytes();
1299    let has_fks = !table_schema.foreign_keys.is_empty();
1300    let has_indices = !table_schema.indices.is_empty();
1301    let has_defaults = !defaults.is_empty();
1302
1303    let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1304        (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1305        (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1306        (_, None) => None,
1307    };
1308
1309    let row_col_map_owned: Option<ColumnMap> =
1310        if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1311            Some(ColumnMap::new(&table_schema.columns))
1312        } else {
1313            None
1314        };
1315    let row_col_map: Option<&ColumnMap> = cache
1316        .and_then(|c| c.row_col_map.as_ref())
1317        .or(row_col_map_owned.as_ref());
1318
1319    let select_rows = match &stmt.source {
1320        InsertSource::Select(sq) => {
1321            let insert_ctes =
1322                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
1323                    exec_query_body_write(wtx, schema, body, ctx)
1324                })?;
1325            let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1326            Some(qr.rows)
1327        }
1328        InsertSource::Values(_) => None,
1329    };
1330
1331    let mut count: u64 = 0;
1332    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1333        stmt.returning.as_ref().map(|_| Vec::new());
1334
1335    let values = match &stmt.source {
1336        InsertSource::Values(rows) => Some(rows.as_slice()),
1337        InsertSource::Select(_) => None,
1338    };
1339    let sel_rows = select_rows.as_deref();
1340
1341    let total = match (values, sel_rows) {
1342        (Some(rows), _) => rows.len(),
1343        (_, Some(rows)) => rows.len(),
1344        _ => 0,
1345    };
1346
1347    if let Some(sel) = sel_rows {
1348        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1349            return Err(SqlError::InvalidValue(format!(
1350                "INSERT ... SELECT column count mismatch: expected {}, got {}",
1351                insert_columns.len(),
1352                sel[0].len()
1353            )));
1354        }
1355    }
1356
1357    let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1358    for idx in 0..total {
1359        if !skip_row_clear {
1360            for v in bufs.row.iter_mut() {
1361                *v = Value::Null;
1362            }
1363        }
1364
1365        if let Some(value_rows) = values {
1366            if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1367                for action in plan {
1368                    match action {
1369                        BindAction::Param {
1370                            param_idx,
1371                            col_idx,
1372                            target,
1373                        } => {
1374                            let v = &params[*param_idx];
1375                            bufs.row[*col_idx] = if v.is_null() {
1376                                Value::Null
1377                            } else if v.data_type() == *target {
1378                                v.clone()
1379                            } else {
1380                                let got = v.data_type();
1381                                v.clone().coerce_into(*target).ok_or_else(|| {
1382                                    SqlError::TypeMismatch {
1383                                        expected: target.to_string(),
1384                                        got: got.to_string(),
1385                                    }
1386                                })?
1387                            };
1388                        }
1389                        BindAction::Literal { value, col_idx } => {
1390                            bufs.row[*col_idx] = value.clone();
1391                        }
1392                    }
1393                }
1394            } else {
1395                let value_row = &value_rows[idx];
1396                if value_row.len() != insert_columns.len() {
1397                    return Err(SqlError::InvalidValue(format!(
1398                        "expected {} values, got {}",
1399                        insert_columns.len(),
1400                        value_row.len()
1401                    )));
1402                }
1403                for (i, expr) in value_row.iter().enumerate() {
1404                    let val = match expr {
1405                        Expr::Parameter(n) => params
1406                            .get(n - 1)
1407                            .cloned()
1408                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1409                        Expr::Literal(v) => v.clone(),
1410                        _ => eval_const_expr(expr)?,
1411                    };
1412                    let col_idx = bufs.col_indices[i];
1413                    let col = &table_schema.columns[col_idx];
1414                    let got_type = val.data_type();
1415                    bufs.row[col_idx] = if val.is_null() {
1416                        Value::Null
1417                    } else {
1418                        val.coerce_into(col.data_type)
1419                            .ok_or_else(|| SqlError::TypeMismatch {
1420                                expected: col.data_type.to_string(),
1421                                got: got_type.to_string(),
1422                            })?
1423                    };
1424                }
1425            }
1426        } else if let Some(sel) = sel_rows {
1427            let sel_row = &sel[idx];
1428            for (i, val) in sel_row.iter().enumerate() {
1429                let col_idx = bufs.col_indices[i];
1430                let col = &table_schema.columns[col_idx];
1431                let got_type = val.data_type();
1432                bufs.row[col_idx] = if val.is_null() {
1433                    Value::Null
1434                } else {
1435                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
1436                        SqlError::TypeMismatch {
1437                            expected: col.data_type.to_string(),
1438                            got: got_type.to_string(),
1439                        }
1440                    })?
1441                };
1442            }
1443        }
1444
1445        if has_defaults {
1446            for &(pos, def_expr) in &defaults {
1447                let val = eval_const_expr(def_expr)?;
1448                let col = &table_schema.columns[pos];
1449                if !val.is_null() {
1450                    let got_type = val.data_type();
1451                    bufs.row[pos] =
1452                        val.coerce_into(col.data_type)
1453                            .ok_or_else(|| SqlError::TypeMismatch {
1454                                expected: col.data_type.to_string(),
1455                                got: got_type.to_string(),
1456                            })?;
1457                }
1458            }
1459        }
1460
1461        if let Some(gen_map) = row_col_map_for_gen {
1462            if cache.is_some() {
1463                for (pos, fast) in cached_gen_positions
1464                    .iter()
1465                    .copied()
1466                    .zip(cached_gen_fast_evals.iter())
1467                {
1468                    let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1469                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1470                    let col = &table_schema.columns[pos];
1471                    bufs.row[pos] = if val.is_null() {
1472                        Value::Null
1473                    } else {
1474                        let got_type = val.data_type();
1475                        val.coerce_into(col.data_type)
1476                            .ok_or_else(|| SqlError::TypeMismatch {
1477                                expected: col.data_type.to_string(),
1478                                got: got_type.to_string(),
1479                            })?
1480                    };
1481                }
1482            } else {
1483                for (pos, gen_expr, fast) in &generated_cols_uncached {
1484                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1485                    let col = &table_schema.columns[*pos];
1486                    bufs.row[*pos] = if val.is_null() {
1487                        Value::Null
1488                    } else {
1489                        let got_type = val.data_type();
1490                        val.coerce_into(col.data_type)
1491                            .ok_or_else(|| SqlError::TypeMismatch {
1492                                expected: col.data_type.to_string(),
1493                                got: got_type.to_string(),
1494                            })?
1495                    };
1496                }
1497            }
1498        }
1499
1500        if let Some(c) = cache {
1501            for &pos in &c.not_null_indices {
1502                if bufs.row[pos as usize].is_null() {
1503                    return Err(SqlError::NotNullViolation(
1504                        table_schema.columns[pos as usize].name.clone(),
1505                    ));
1506                }
1507            }
1508        } else {
1509            for col in &table_schema.columns {
1510                if !col.nullable && bufs.row[col.position as usize].is_null() {
1511                    return Err(SqlError::NotNullViolation(col.name.clone()));
1512                }
1513            }
1514        }
1515
1516        if let Some(ref col_map) = check_col_map {
1517            for col in &table_schema.columns {
1518                if let Some(ref check) = col.check_expr {
1519                    let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1520                    if !is_truthy(&result) && !result.is_null() {
1521                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1522                        return Err(SqlError::CheckViolation(name.to_string()));
1523                    }
1524                }
1525            }
1526            for tc in &table_schema.check_constraints {
1527                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1528                if !is_truthy(&result) && !result.is_null() {
1529                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1530                    return Err(SqlError::CheckViolation(name.to_string()));
1531                }
1532            }
1533        }
1534
1535        if has_fks {
1536            for fk in &table_schema.foreign_keys {
1537                let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1538                if any_null {
1539                    continue;
1540                }
1541                let fk_vals: Vec<Value> = fk
1542                    .columns
1543                    .iter()
1544                    .map(|&ci| bufs.row[ci as usize].clone())
1545                    .collect();
1546                bufs.fk_key_buf.clear();
1547                encode_composite_key_into(&fk_vals, &mut bufs.fk_key_buf);
1548                let found = wtx
1549                    .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1550                    .map_err(SqlError::Storage)?;
1551                if found.is_none() {
1552                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1553                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
1554                }
1555            }
1556        }
1557
1558        let proposed_row_for_returning: Option<Vec<Value>> =
1559            returning_rows.as_ref().map(|_| bufs.row.clone());
1560
1561        for (j, &i) in pk_indices.iter().enumerate() {
1562            bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1563        }
1564        match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1565            true => match bufs.pk_values[0] {
1566                Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1567                _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1568            },
1569            false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1570        }
1571
1572        for &slot in dropped {
1573            bufs.value_values[slot as usize] = Value::Null;
1574        }
1575        for (j, &i) in non_pk.iter().enumerate() {
1576            let col = &table_schema.columns[i];
1577            if matches!(
1578                col.generated_kind,
1579                Some(crate::parser::GeneratedKind::Virtual)
1580            ) {
1581                bufs.value_values[enc_pos[j] as usize] = Value::Null;
1582                bufs.row[i] = Value::Null;
1583            } else {
1584                bufs.value_values[enc_pos[j] as usize] =
1585                    std::mem::replace(&mut bufs.row[i], Value::Null);
1586            }
1587        }
1588        match cache.and_then(|c| c.row_encoder.as_ref()) {
1589            Some(tmpl) => crate::encoding::encode_int_row_with_template(
1590                tmpl,
1591                &bufs.value_values,
1592                &mut bufs.value_buf,
1593            )?,
1594            None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1595        }
1596
1597        if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1598            return Err(SqlError::KeyTooLarge {
1599                size: bufs.key_buf.len(),
1600                max: citadel_core::MAX_KEY_SIZE,
1601            });
1602        }
1603        if bufs.value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1604            return Err(SqlError::RowTooLarge {
1605                size: bufs.value_buf.len(),
1606                max: citadel_core::MAX_INLINE_VALUE_SIZE,
1607            });
1608        }
1609
1610        match compiled_conflict.as_ref() {
1611            None => {
1612                let is_new = wtx
1613                    .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1614                    .map_err(SqlError::Storage)?;
1615                if !is_new {
1616                    return Err(SqlError::DuplicateKey);
1617                }
1618                if has_indices {
1619                    for (j, &i) in pk_indices.iter().enumerate() {
1620                        bufs.row[i] = bufs.pk_values[j].clone();
1621                    }
1622                    for (j, &i) in non_pk.iter().enumerate() {
1623                        bufs.row[i] = std::mem::replace(
1624                            &mut bufs.value_values[enc_pos[j] as usize],
1625                            Value::Null,
1626                        );
1627                    }
1628                    insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1629                }
1630                count += 1;
1631                if let Some(buf) = returning_rows.as_mut() {
1632                    buf.push((None, proposed_row_for_returning));
1633                }
1634            }
1635            Some(oc) => {
1636                let oc_ref: &CompiledOnConflict = oc;
1637                let needs_row = upsert_needs_row(oc_ref, table_schema);
1638                if needs_row {
1639                    for (j, &i) in pk_indices.iter().enumerate() {
1640                        bufs.row[i] = bufs.pk_values[j].clone();
1641                    }
1642                    for (j, &i) in non_pk.iter().enumerate() {
1643                        bufs.row[i] = std::mem::replace(
1644                            &mut bufs.value_values[enc_pos[j] as usize],
1645                            Value::Null,
1646                        );
1647                    }
1648                }
1649                let outcome = apply_insert_with_conflict(
1650                    wtx,
1651                    table_schema,
1652                    &bufs.key_buf,
1653                    &bufs.value_buf,
1654                    &bufs.row,
1655                    &bufs.pk_values,
1656                    oc_ref,
1657                    row_col_map.unwrap(),
1658                    stmt.returning.is_some(),
1659                )?;
1660                match outcome {
1661                    InsertRowOutcome::Inserted => {
1662                        count += 1;
1663                        if let Some(buf) = returning_rows.as_mut() {
1664                            buf.push((None, proposed_row_for_returning));
1665                        }
1666                    }
1667                    InsertRowOutcome::Updated { old, new } => {
1668                        count += 1;
1669                        if let Some(buf) = returning_rows.as_mut() {
1670                            buf.push((Some(old), Some(new)));
1671                        }
1672                    }
1673                    InsertRowOutcome::Skipped => {}
1674                }
1675            }
1676        }
1677    }
1678
1679    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
1680        return Ok(ExecutionResult::Query(super::helpers::project_returning(
1681            table_schema,
1682            returning_cols,
1683            &rows,
1684        )?));
1685    }
1686
1687    Ok(ExecutionResult::RowsAffected(count))
1688}
1689
1690pub struct CompiledInsert {
1691    table_lower: String,
1692    cached: Option<InsertCache>,
1693}
1694
1695struct InsertCache {
1696    col_indices: Vec<usize>,
1697    has_subquery: bool,
1698    any_defaults: bool,
1699    has_checks: bool,
1700    on_conflict: Option<Arc<CompiledOnConflict>>,
1701    row_col_map: Option<ColumnMap>,
1702    generated_col_positions: Vec<usize>,
1703    generated_fast_evals: Vec<FastGenEval>,
1704    pk_indices: Vec<usize>,
1705    non_pk_indices: Vec<usize>,
1706    encoding_positions: Vec<u16>,
1707    dropped_non_pk_slots: Vec<u16>,
1708    phys_count: usize,
1709    single_int_pk: bool,
1710    not_null_indices: Vec<u16>,
1711    bind_plan: Option<Vec<BindAction>>,
1712    row_fully_overwritten: bool,
1713    row_encoder: Option<crate::encoding::IntRowTemplate>,
1714    is_trivial_fast: bool,
1715    trivial_fast_program: Option<TrivialFastProgram>,
1716}
1717
1718#[derive(Clone)]
1719enum BindAction {
1720    Param {
1721        param_idx: usize,
1722        col_idx: usize,
1723        target: DataType,
1724    },
1725    Literal {
1726        value: Value,
1727        col_idx: usize,
1728    },
1729}
1730
1731#[derive(Clone)]
1732struct TrivialFastProgram {
1733    template: Vec<u8>,
1734    ops: Vec<WriteOp>,
1735    pk_param: u8,
1736    not_null_param_indices: Vec<u8>,
1737}
1738
1739#[derive(Clone)]
1740enum WriteOp {
1741    ParamI64 {
1742        param_idx: u8,
1743        off: u32,
1744    },
1745    LiteralI64 {
1746        value: i64,
1747        off: u32,
1748    },
1749    GenAddParamsI64 {
1750        a_param: u8,
1751        b_param: u8,
1752        off: u32,
1753        bitmap_byte_off: u32,
1754        bitmap_bit_mask: u8,
1755    },
1756    GenMulAddParamI64 {
1757        param_idx: u8,
1758        mul: i64,
1759        add: i64,
1760        off: u32,
1761        bitmap_byte_off: u32,
1762        bitmap_bit_mask: u8,
1763    },
1764}
1765
1766fn build_trivial_fast_program(
1767    bind_plan: &[BindAction],
1768    row_encoder: &crate::encoding::IntRowTemplate,
1769    non_virtual_pairs: &[(usize, usize)],
1770    generated_col_positions: &[usize],
1771    generated_fast_evals: &[FastGenEval],
1772    pk_indices: &[usize],
1773    columns: &[crate::types::ColumnDef],
1774) -> Option<TrivialFastProgram> {
1775    let pk_col = pk_indices[0];
1776    let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
1777        non_virtual_pairs.iter().copied().collect();
1778    let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
1779        row_encoder.slot_offsets.iter().copied().collect();
1780
1781    let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
1782    let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
1783    let mut pk_param: Option<u8> = None;
1784    let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
1785    let mut not_null_param_indices: Vec<u8> = Vec::new();
1786
1787    for action in bind_plan {
1788        match action {
1789            BindAction::Param {
1790                param_idx,
1791                col_idx,
1792                target,
1793            } => {
1794                if *target != DataType::Integer {
1795                    return None;
1796                }
1797                let pi: u8 = u8::try_from(*param_idx).ok()?;
1798                col_to_param.insert(*col_idx, pi);
1799                if *col_idx == pk_col {
1800                    pk_param = Some(pi);
1801                } else {
1802                    let slot = *col_to_slot.get(col_idx)?;
1803                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1804                    ops.push(WriteOp::ParamI64 { param_idx: pi, off });
1805                    if !columns[*col_idx].nullable {
1806                        not_null_param_indices.push(pi);
1807                    }
1808                }
1809            }
1810            BindAction::Literal { value, col_idx } => match value {
1811                Value::Integer(v) => {
1812                    col_to_lit_int.insert(*col_idx, *v);
1813                    if *col_idx == pk_col {
1814                        return None;
1815                    }
1816                    let slot = *col_to_slot.get(col_idx)?;
1817                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1818                    ops.push(WriteOp::LiteralI64 { value: *v, off });
1819                }
1820                _ => return None,
1821            },
1822        }
1823    }
1824
1825    let pk_param = pk_param?;
1826
1827    for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
1828        let gen_slot = *col_to_slot.get(&gen_pos)?;
1829        let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
1830        let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
1831        let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
1832        let gen_col_nullable = columns[gen_pos].nullable;
1833
1834        match &generated_fast_evals[i] {
1835            FastGenEval::IntColAddCol {
1836                left_idx,
1837                right_idx,
1838            } => {
1839                let a_param = col_to_param.get(left_idx).copied();
1840                let b_param = col_to_param.get(right_idx).copied();
1841                match (a_param, b_param) {
1842                    (Some(ap), Some(bp)) => {
1843                        let deps_safe = gen_col_nullable
1844                            || (not_null_param_indices.contains(&ap)
1845                                && not_null_param_indices.contains(&bp));
1846                        if !deps_safe {
1847                            return None;
1848                        }
1849                        ops.push(WriteOp::GenAddParamsI64 {
1850                            a_param: ap,
1851                            b_param: bp,
1852                            off: gen_off,
1853                            bitmap_byte_off,
1854                            bitmap_bit_mask,
1855                        });
1856                    }
1857                    (Some(p), None) => {
1858                        let lit = col_to_lit_int.get(right_idx).copied()?;
1859                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1860                            return None;
1861                        }
1862                        ops.push(WriteOp::GenMulAddParamI64 {
1863                            param_idx: p,
1864                            mul: 1,
1865                            add: lit,
1866                            off: gen_off,
1867                            bitmap_byte_off,
1868                            bitmap_bit_mask,
1869                        });
1870                    }
1871                    (None, Some(p)) => {
1872                        let lit = col_to_lit_int.get(left_idx).copied()?;
1873                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1874                            return None;
1875                        }
1876                        ops.push(WriteOp::GenMulAddParamI64 {
1877                            param_idx: p,
1878                            mul: 1,
1879                            add: lit,
1880                            off: gen_off,
1881                            bitmap_byte_off,
1882                            bitmap_bit_mask,
1883                        });
1884                    }
1885                    (None, None) => {
1886                        let la = col_to_lit_int.get(left_idx).copied()?;
1887                        let lb = col_to_lit_int.get(right_idx).copied()?;
1888                        ops.push(WriteOp::LiteralI64 {
1889                            value: la.wrapping_add(lb),
1890                            off: gen_off,
1891                        });
1892                    }
1893                }
1894            }
1895            FastGenEval::IntColMulAdd {
1896                col_schema_idx,
1897                mul,
1898                add,
1899            } => {
1900                if let Some(p) = col_to_param.get(col_schema_idx).copied() {
1901                    if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1902                        return None;
1903                    }
1904                    ops.push(WriteOp::GenMulAddParamI64 {
1905                        param_idx: p,
1906                        mul: *mul,
1907                        add: *add,
1908                        off: gen_off,
1909                        bitmap_byte_off,
1910                        bitmap_bit_mask,
1911                    });
1912                } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
1913                    ops.push(WriteOp::LiteralI64 {
1914                        value: lit.wrapping_mul(*mul).wrapping_add(*add),
1915                        off: gen_off,
1916                    });
1917                } else {
1918                    return None;
1919                }
1920            }
1921            FastGenEval::None => return None,
1922        }
1923    }
1924
1925    Some(TrivialFastProgram {
1926        template: row_encoder.template.clone(),
1927        ops,
1928        pk_param,
1929        not_null_param_indices,
1930    })
1931}
1932
1933#[derive(Clone)]
1934pub(super) enum CompiledOnConflict {
1935    DoNothing {
1936        target: Option<ConflictKind>,
1937    },
1938    DoUpdate {
1939        target: ConflictKind,
1940        assignments: Vec<(usize, Expr)>,
1941        where_clause: Option<Expr>,
1942        fast_paths: Option<Vec<DoUpdateFastPath>>,
1943    },
1944}
1945
1946#[derive(Clone, Copy)]
1947pub(super) enum DoUpdateFastPath {
1948    IntAddConst { phys_idx: usize, delta: i64 },
1949}
1950
1951#[derive(Clone, Debug)]
1952pub(super) enum ConflictKind {
1953    PrimaryKey,
1954    UniqueIndex { index_idx: usize },
1955}
1956
1957fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
1958    match target {
1959        ConflictTarget::Columns(cols) => {
1960            let col_idx_set: Vec<u16> = cols
1961                .iter()
1962                .map(|name| {
1963                    ts.column_index(name)
1964                        .map(|i| i as u16)
1965                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
1966                })
1967                .collect::<Result<_>>()?;
1968            let pk_set = ts.primary_key_columns.clone();
1969            if set_equal(&col_idx_set, &pk_set) {
1970                return Ok(ConflictKind::PrimaryKey);
1971            }
1972            for (index_idx, idx) in ts.indices.iter().enumerate() {
1973                if idx.unique && set_equal(&col_idx_set, &idx.columns) {
1974                    return Ok(ConflictKind::UniqueIndex { index_idx });
1975                }
1976            }
1977            Err(SqlError::Plan(
1978                "ON CONFLICT target does not match any unique constraint".into(),
1979            ))
1980        }
1981        ConflictTarget::Constraint(name) => {
1982            let lower = name.to_ascii_lowercase();
1983            for (index_idx, idx) in ts.indices.iter().enumerate() {
1984                if idx.name.eq_ignore_ascii_case(&lower) {
1985                    if idx.unique {
1986                        return Ok(ConflictKind::UniqueIndex { index_idx });
1987                    }
1988                    return Err(SqlError::Plan(format!(
1989                        "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
1990                    )));
1991                }
1992            }
1993            Err(SqlError::Plan(format!(
1994                "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
1995            )))
1996        }
1997    }
1998}
1999
2000fn set_equal(a: &[u16], b: &[u16]) -> bool {
2001    if a.len() != b.len() {
2002        return false;
2003    }
2004    let mut a_sorted = a.to_vec();
2005    let mut b_sorted = b.to_vec();
2006    a_sorted.sort_unstable();
2007    b_sorted.sort_unstable();
2008    a_sorted == b_sorted
2009}
2010
2011pub(super) enum InsertRowOutcome {
2012    Inserted,
2013    Updated { old: Vec<Value>, new: Vec<Value> },
2014    Skipped,
2015}
2016
2017#[allow(clippy::too_many_arguments)]
2018#[inline]
2019pub(super) fn apply_insert_with_conflict(
2020    wtx: &mut WriteTxn<'_>,
2021    table_schema: &TableSchema,
2022    key_buf: &[u8],
2023    value_buf: &[u8],
2024    row: &[Value],
2025    pk_values: &[Value],
2026    on_conflict: &CompiledOnConflict,
2027    col_map: &ColumnMap,
2028    capture_returning: bool,
2029) -> Result<InsertRowOutcome> {
2030    let table_bytes = table_schema.name.as_bytes();
2031
2032    if let CompiledOnConflict::DoNothing { target } = on_conflict {
2033        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2034        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2035            let inserted = wtx
2036                .table_insert_if_absent(table_bytes, key_buf, value_buf)
2037                .map_err(SqlError::Storage)?;
2038            return Ok(if inserted {
2039                InsertRowOutcome::Inserted
2040            } else {
2041                InsertRowOutcome::Skipped
2042            });
2043        }
2044    }
2045
2046    if let CompiledOnConflict::DoUpdate {
2047        target: ConflictKind::PrimaryKey,
2048        assignments,
2049        where_clause,
2050        fast_paths,
2051    } = on_conflict
2052    {
2053        if can_fuse_do_update(table_schema, assignments) {
2054            return apply_do_update_fused(
2055                wtx,
2056                table_schema,
2057                table_bytes,
2058                key_buf,
2059                value_buf,
2060                row,
2061                assignments,
2062                where_clause.as_ref(),
2063                col_map,
2064                fast_paths.as_deref(),
2065                capture_returning,
2066            );
2067        }
2068    }
2069
2070    let primary_outcome = wtx
2071        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2072        .map_err(SqlError::Storage)?;
2073
2074    match primary_outcome {
2075        citadel_txn::write_txn::InsertOutcome::Inserted => {
2076            if table_schema.indices.is_empty() {
2077                return Ok(InsertRowOutcome::Inserted);
2078            }
2079            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2080            match insert_index_entries_or_fetch(
2081                wtx,
2082                table_schema,
2083                row,
2084                pk_values,
2085                &mut inserted_keys,
2086            )? {
2087                None => Ok(InsertRowOutcome::Inserted),
2088                Some(conflicting_idx) => {
2089                    let matches_target =
2090                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2091                            || matches!(
2092                                on_conflict,
2093                                CompiledOnConflict::DoNothing {
2094                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
2095                                } | CompiledOnConflict::DoUpdate {
2096                                    target: ConflictKind::UniqueIndex { index_idx },
2097                                    ..
2098                                } if *index_idx == conflicting_idx
2099                            );
2100                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2101                    if !matches_target {
2102                        return Err(SqlError::UniqueViolation(
2103                            table_schema.indices[conflicting_idx].name.clone(),
2104                        ));
2105                    }
2106                    match on_conflict {
2107                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2108                        CompiledOnConflict::DoUpdate {
2109                            assignments,
2110                            where_clause,
2111                            ..
2112                        } => {
2113                            let existing_pk =
2114                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2115                            apply_do_update(
2116                                wtx,
2117                                table_schema,
2118                                &existing_pk,
2119                                row,
2120                                assignments,
2121                                where_clause.as_ref(),
2122                                col_map,
2123                                capture_returning,
2124                            )
2125                        }
2126                    }
2127                }
2128            }
2129        }
2130        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2131            let matches_target = matches!(
2132                on_conflict,
2133                CompiledOnConflict::DoNothing { target: None }
2134                    | CompiledOnConflict::DoNothing {
2135                        target: Some(ConflictKind::PrimaryKey),
2136                    }
2137                    | CompiledOnConflict::DoUpdate {
2138                        target: ConflictKind::PrimaryKey,
2139                        ..
2140                    }
2141            );
2142            if !matches_target {
2143                return Err(SqlError::DuplicateKey);
2144            }
2145            match on_conflict {
2146                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2147                CompiledOnConflict::DoUpdate {
2148                    assignments,
2149                    where_clause,
2150                    ..
2151                } => {
2152                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2153                    apply_do_update_with_old_row(
2154                        wtx,
2155                        table_schema,
2156                        key_buf,
2157                        &old_row,
2158                        row,
2159                        assignments,
2160                        where_clause.as_ref(),
2161                        col_map,
2162                        capture_returning,
2163                    )
2164                }
2165            }
2166        }
2167    }
2168}
2169
2170#[inline]
2171fn apply_fast_path_patch(
2172    old_bytes: &[u8],
2173    fast_paths: &[DoUpdateFastPath],
2174) -> Result<UpsertAction> {
2175    UPSERT_SCRATCH.with(|slot| {
2176        let mut bufs = slot.borrow_mut();
2177        bufs.new_value_buf.clear();
2178        bufs.new_value_buf.extend_from_slice(old_bytes);
2179
2180        let mut patch_scratch: Vec<u8> = Vec::new();
2181
2182        for fp in fast_paths {
2183            match fp {
2184                DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2185                    let decoded =
2186                        crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2187                    let old_val = &decoded[0];
2188                    let new_val = match old_val {
2189                        Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2190                        Value::Null => Value::Null,
2191                        _ => {
2192                            return Err(SqlError::TypeMismatch {
2193                                expected: "INTEGER".into(),
2194                                got: old_val.data_type().to_string(),
2195                            });
2196                        }
2197                    };
2198                    if !crate::encoding::patch_column_in_place(
2199                        &mut bufs.new_value_buf,
2200                        *phys_idx,
2201                        &new_val,
2202                    )? {
2203                        patch_scratch.clear();
2204                        crate::encoding::patch_row_column(
2205                            &bufs.new_value_buf,
2206                            *phys_idx,
2207                            &new_val,
2208                            &mut patch_scratch,
2209                        )?;
2210                        std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2211                    }
2212                }
2213            }
2214        }
2215
2216        if bufs.new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2217            return Err(SqlError::RowTooLarge {
2218                size: bufs.new_value_buf.len(),
2219                max: citadel_core::MAX_INLINE_VALUE_SIZE,
2220            });
2221        }
2222
2223        Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2224    })
2225}
2226
2227fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2228    if !ts.indices.is_empty() {
2229        return true;
2230    }
2231    match oc {
2232        CompiledOnConflict::DoNothing { .. } => false,
2233        CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2234    }
2235}
2236
2237fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2238    if !ts.indices.is_empty() {
2239        return false;
2240    }
2241    if !ts.foreign_keys.is_empty() {
2242        return false;
2243    }
2244    if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2245        return false;
2246    }
2247    let pk = ts.pk_indices();
2248    !assignments.iter().any(|(ci, _)| pk.contains(ci))
2249}
2250
2251#[allow(clippy::too_many_arguments)]
2252#[inline]
2253fn apply_do_update_fused(
2254    wtx: &mut WriteTxn<'_>,
2255    table_schema: &TableSchema,
2256    table_bytes: &[u8],
2257    key_buf: &[u8],
2258    value_buf: &[u8],
2259    proposed_row: &[Value],
2260    assignments: &[(usize, Expr)],
2261    where_clause: Option<&Expr>,
2262    col_map: &ColumnMap,
2263    fast_paths: Option<&[DoUpdateFastPath]>,
2264    capture_returning: bool,
2265) -> Result<InsertRowOutcome> {
2266    let non_pk = table_schema.non_pk_indices();
2267    let enc_pos = table_schema.encoding_positions();
2268    let phys_count = table_schema.physical_non_pk_count();
2269    let dropped = table_schema.dropped_non_pk_slots();
2270    let has_checks = table_schema.has_checks();
2271    let has_fks = !table_schema.foreign_keys.is_empty();
2272
2273    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2274        std::cell::RefCell::new(None);
2275
2276    let outcome =
2277        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2278            if let Some(fps) = fast_paths {
2279                if !has_checks {
2280                    let action = apply_fast_path_patch(old_bytes, fps)?;
2281                    if capture_returning {
2282                        if let UpsertAction::Replace(ref new_bytes) = action {
2283                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2284                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2285                            *captured.borrow_mut() = Some((old_row, new_row));
2286                        }
2287                    }
2288                    return Ok(action);
2289                }
2290            }
2291            UPSERT_SCRATCH.with(|slot| {
2292                let mut bufs = slot.borrow_mut();
2293                let UpsertBufs {
2294                    old_row,
2295                    new_row,
2296                    value_values,
2297                    new_value_buf,
2298                } = &mut *bufs;
2299
2300                old_row.clear();
2301                old_row.resize(table_schema.columns.len(), Value::Null);
2302                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2303
2304                if let Some(w) = where_clause {
2305                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2306                    let result = eval_expr(w, &ctx)?;
2307                    if result.is_null() || !is_truthy(&result) {
2308                        return Ok(UpsertAction::Skip);
2309                    }
2310                }
2311
2312                new_row.clear();
2313                new_row.extend_from_slice(old_row);
2314                for (col_idx, expr) in assignments {
2315                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2316                    let val = eval_expr(expr, &ctx)?;
2317                    let col = &table_schema.columns[*col_idx];
2318                    new_row[*col_idx] = if val.is_null() {
2319                        Value::Null
2320                    } else {
2321                        let got = val.data_type();
2322                        val.coerce_into(col.data_type)
2323                            .ok_or_else(|| SqlError::TypeMismatch {
2324                                expected: col.data_type.to_string(),
2325                                got: got.to_string(),
2326                            })?
2327                    };
2328                }
2329
2330                for (assigned_idx, _) in assignments {
2331                    let col = &table_schema.columns[*assigned_idx];
2332                    if !col.nullable && new_row[col.position as usize].is_null() {
2333                        return Err(SqlError::NotNullViolation(col.name.clone()));
2334                    }
2335                }
2336                if has_checks {
2337                    for col in &table_schema.columns {
2338                        if let Some(ref check) = col.check_expr {
2339                            let ctx = EvalCtx::new(col_map, new_row);
2340                            let result = eval_expr(check, &ctx)?;
2341                            if !is_truthy(&result) && !result.is_null() {
2342                                let name = col.check_name.as_deref().unwrap_or(&col.name);
2343                                return Err(SqlError::CheckViolation(name.to_string()));
2344                            }
2345                        }
2346                    }
2347                    for tc in &table_schema.check_constraints {
2348                        let ctx = EvalCtx::new(col_map, new_row);
2349                        let result = eval_expr(&tc.expr, &ctx)?;
2350                        if !is_truthy(&result) && !result.is_null() {
2351                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
2352                            return Err(SqlError::CheckViolation(name.to_string()));
2353                        }
2354                    }
2355                }
2356                let _ = has_fks;
2357
2358                value_values.clear();
2359                value_values.resize(phys_count, Value::Null);
2360                for &slot in dropped {
2361                    value_values[slot as usize] = Value::Null;
2362                }
2363                for (j, &i) in non_pk.iter().enumerate() {
2364                    value_values[enc_pos[j] as usize] = new_row[i].clone();
2365                }
2366                new_value_buf.clear();
2367                crate::encoding::encode_row_into(value_values, new_value_buf);
2368
2369                if new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2370                    return Err(SqlError::RowTooLarge {
2371                        size: new_value_buf.len(),
2372                        max: citadel_core::MAX_INLINE_VALUE_SIZE,
2373                    });
2374                }
2375
2376                if capture_returning {
2377                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2378                }
2379                Ok(UpsertAction::Replace(new_value_buf.clone()))
2380            })
2381        })?;
2382
2383    match outcome {
2384        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2385        UpsertOutcome::Updated => {
2386            if capture_returning {
2387                let (old, new) = captured.into_inner().ok_or_else(|| {
2388                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2389                })?;
2390                Ok(InsertRowOutcome::Updated { old, new })
2391            } else {
2392                Ok(InsertRowOutcome::Inserted)
2393            }
2394        }
2395        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2396    }
2397}
2398
2399fn fetch_unique_index_pk(
2400    wtx: &mut WriteTxn<'_>,
2401    table_schema: &TableSchema,
2402    index_idx: usize,
2403    row: &[Value],
2404) -> Result<Vec<u8>> {
2405    let idx = &table_schema.indices[index_idx];
2406    let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2407    let indexed: Vec<Value> = idx
2408        .columns
2409        .iter()
2410        .map(|&col_idx| row[col_idx as usize].clone())
2411        .collect();
2412    let key = crate::encoding::encode_composite_key(&indexed);
2413    let value = wtx
2414        .table_get(&idx_table, &key)
2415        .map_err(SqlError::Storage)?
2416        .ok_or_else(|| {
2417            SqlError::InvalidValue("unique index missing expected collision entry".into())
2418        })?;
2419    Ok(value)
2420}
2421
2422#[allow(clippy::too_many_arguments)]
2423fn apply_do_update(
2424    wtx: &mut WriteTxn<'_>,
2425    table_schema: &TableSchema,
2426    pk_key: &[u8],
2427    proposed_row: &[Value],
2428    assignments: &[(usize, Expr)],
2429    where_clause: Option<&Expr>,
2430    col_map: &ColumnMap,
2431    capture_returning: bool,
2432) -> Result<InsertRowOutcome> {
2433    let old_value = wtx
2434        .table_get(table_schema.name.as_bytes(), pk_key)
2435        .map_err(SqlError::Storage)?
2436        .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2437    let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2438    apply_do_update_with_old_row(
2439        wtx,
2440        table_schema,
2441        pk_key,
2442        &old_row,
2443        proposed_row,
2444        assignments,
2445        where_clause,
2446        col_map,
2447        capture_returning,
2448    )
2449}
2450
2451#[allow(clippy::too_many_arguments)]
2452fn apply_do_update_with_old_row(
2453    wtx: &mut WriteTxn<'_>,
2454    table_schema: &TableSchema,
2455    old_pk_key: &[u8],
2456    old_row: &[Value],
2457    proposed_row: &[Value],
2458    assignments: &[(usize, Expr)],
2459    where_clause: Option<&Expr>,
2460    col_map: &ColumnMap,
2461    capture_returning: bool,
2462) -> Result<InsertRowOutcome> {
2463    if let Some(w) = where_clause {
2464        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2465        let result = eval_expr(w, &ctx)?;
2466        if result.is_null() || !is_truthy(&result) {
2467            return Ok(InsertRowOutcome::Skipped);
2468        }
2469    }
2470
2471    let mut new_row = old_row.to_vec();
2472    for (col_idx, expr) in assignments {
2473        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2474        let val = eval_expr(expr, &ctx)?;
2475        let col = &table_schema.columns[*col_idx];
2476        new_row[*col_idx] = if val.is_null() {
2477            Value::Null
2478        } else {
2479            let got = val.data_type();
2480            val.coerce_into(col.data_type)
2481                .ok_or_else(|| SqlError::TypeMismatch {
2482                    expected: col.data_type.to_string(),
2483                    got: got.to_string(),
2484                })?
2485        };
2486    }
2487
2488    for col in &table_schema.columns {
2489        if matches!(
2490            col.generated_kind,
2491            Some(crate::parser::GeneratedKind::Stored)
2492        ) {
2493            let val = eval_expr(
2494                col.generated_expr.as_ref().unwrap(),
2495                &EvalCtx::new(col_map, &new_row),
2496            )?;
2497            let pos = col.position as usize;
2498            new_row[pos] = if val.is_null() {
2499                if !col.nullable {
2500                    return Err(SqlError::NotNullViolation(col.name.clone()));
2501                }
2502                Value::Null
2503            } else {
2504                let got = val.data_type();
2505                val.coerce_into(col.data_type)
2506                    .ok_or_else(|| SqlError::TypeMismatch {
2507                        expected: col.data_type.to_string(),
2508                        got: got.to_string(),
2509                    })?
2510            };
2511        }
2512    }
2513
2514    let pk_indices = table_schema.pk_indices();
2515    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2516    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2517
2518    for (assigned_idx, _) in assignments {
2519        let col = &table_schema.columns[*assigned_idx];
2520        if !col.nullable && new_row[col.position as usize].is_null() {
2521            return Err(SqlError::NotNullViolation(col.name.clone()));
2522        }
2523    }
2524    if table_schema.has_checks() {
2525        for col in &table_schema.columns {
2526            if let Some(ref check) = col.check_expr {
2527                let ctx = EvalCtx::new(col_map, &new_row);
2528                let result = eval_expr(check, &ctx)?;
2529                if !is_truthy(&result) && !result.is_null() {
2530                    let name = col.check_name.as_deref().unwrap_or(&col.name);
2531                    return Err(SqlError::CheckViolation(name.to_string()));
2532                }
2533            }
2534        }
2535        for tc in &table_schema.check_constraints {
2536            let ctx = EvalCtx::new(col_map, &new_row);
2537            let result = eval_expr(&tc.expr, &ctx)?;
2538            if !is_truthy(&result) && !result.is_null() {
2539                let name = tc.name.as_deref().unwrap_or(&tc.sql);
2540                return Err(SqlError::CheckViolation(name.to_string()));
2541            }
2542        }
2543    }
2544    for fk in &table_schema.foreign_keys {
2545        let changed = fk
2546            .columns
2547            .iter()
2548            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2549        if !changed {
2550            continue;
2551        }
2552        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2553        if any_null {
2554            continue;
2555        }
2556        let fk_vals: Vec<Value> = fk
2557            .columns
2558            .iter()
2559            .map(|&ci| new_row[ci as usize].clone())
2560            .collect();
2561        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2562        let found = wtx
2563            .table_get(fk.foreign_table.as_bytes(), &fk_key)
2564            .map_err(SqlError::Storage)?;
2565        if found.is_none() {
2566            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
2567            return Err(SqlError::ForeignKeyViolation(name.to_string()));
2568        }
2569    }
2570
2571    let has_indices = !table_schema.indices.is_empty();
2572    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
2573        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
2574    } else {
2575        Vec::new()
2576    };
2577    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
2578        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
2579    } else {
2580        Vec::new()
2581    };
2582
2583    let non_pk = table_schema.non_pk_indices();
2584    let enc_pos = table_schema.encoding_positions();
2585    let phys_count = table_schema.physical_non_pk_count();
2586    let dropped = table_schema.dropped_non_pk_slots();
2587    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
2588    for &slot in dropped {
2589        value_values[slot as usize] = Value::Null;
2590    }
2591    for (j, &i) in non_pk.iter().enumerate() {
2592        let col = &table_schema.columns[i];
2593        value_values[enc_pos[j] as usize] = if matches!(
2594            col.generated_kind,
2595            Some(crate::parser::GeneratedKind::Virtual)
2596        ) {
2597            Value::Null
2598        } else {
2599            new_row[i].clone()
2600        };
2601    }
2602    let mut new_value_buf = Vec::with_capacity(256);
2603    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
2604
2605    if new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2606        return Err(SqlError::RowTooLarge {
2607            size: new_value_buf.len(),
2608            max: citadel_core::MAX_INLINE_VALUE_SIZE,
2609        });
2610    }
2611
2612    if pk_changed {
2613        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
2614        let inserted = wtx
2615            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
2616            .map_err(SqlError::Storage)?;
2617        if !inserted {
2618            return Err(SqlError::DuplicateKey);
2619        }
2620        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
2621            .map_err(SqlError::Storage)?;
2622        for idx in &table_schema.indices {
2623            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2624            let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2625            wtx.table_delete(&idx_table, &old_idx_key)
2626                .map_err(SqlError::Storage)?;
2627            let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2628            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2629            let is_new = wtx
2630                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2631                .map_err(SqlError::Storage)?;
2632            if idx.unique && !is_new {
2633                let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2634                if !any_null {
2635                    return Err(SqlError::UniqueViolation(idx.name.clone()));
2636                }
2637            }
2638        }
2639    } else {
2640        wtx.table_update_sorted(
2641            table_schema.name.as_bytes(),
2642            &[(old_pk_key, new_value_buf.as_slice())],
2643        )
2644        .map_err(SqlError::Storage)?;
2645        for idx in &table_schema.indices {
2646            if !index_columns_changed(idx, old_row, &new_row) {
2647                continue;
2648            }
2649            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2650            let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2651            wtx.table_delete(&idx_table, &old_idx_key)
2652                .map_err(SqlError::Storage)?;
2653            let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2654            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2655            let is_new = wtx
2656                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2657                .map_err(SqlError::Storage)?;
2658            if idx.unique && !is_new {
2659                let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2660                if !any_null {
2661                    return Err(SqlError::UniqueViolation(idx.name.clone()));
2662                }
2663            }
2664        }
2665    }
2666
2667    if capture_returning {
2668        Ok(InsertRowOutcome::Updated {
2669            old: old_row.to_vec(),
2670            new: new_row,
2671        })
2672    } else {
2673        Ok(InsertRowOutcome::Inserted)
2674    }
2675}
2676
2677fn detect_fast_paths(
2678    ts: &TableSchema,
2679    assignments: &[(usize, Expr)],
2680) -> Option<Vec<DoUpdateFastPath>> {
2681    let non_pk = ts.non_pk_indices();
2682    let enc_pos = ts.encoding_positions();
2683    let mut out = Vec::with_capacity(assignments.len());
2684    for (col_idx, expr) in assignments {
2685        let col = &ts.columns[*col_idx];
2686        if col.data_type != DataType::Integer {
2687            return None;
2688        }
2689        let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
2690        let phys_idx = enc_pos[nonpk_order] as usize;
2691
2692        if let Expr::BinaryOp { left, op, right } = expr {
2693            if !matches!(op, BinOp::Add | BinOp::Sub) {
2694                return None;
2695            }
2696            let reads_target =
2697                matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
2698            if !reads_target {
2699                return None;
2700            }
2701            if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
2702                let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
2703                let _ = col_idx;
2704                out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
2705                continue;
2706            }
2707            return None;
2708        }
2709        return None;
2710    }
2711    Some(out)
2712}
2713
2714fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
2715    let target = oc
2716        .target
2717        .as_ref()
2718        .map(|t| resolve_conflict_target(t, ts))
2719        .transpose()?;
2720    match &oc.action {
2721        OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
2722        OnConflictAction::DoUpdate {
2723            assignments,
2724            where_clause,
2725        } => {
2726            let target = target.ok_or_else(|| {
2727                SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
2728            })?;
2729            let compiled_assignments: Vec<(usize, Expr)> = assignments
2730                .iter()
2731                .map(|(name, expr)| {
2732                    let col_idx = ts
2733                        .column_index(name)
2734                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
2735                    Ok((col_idx, expr.clone()))
2736                })
2737                .collect::<Result<_>>()?;
2738            let fast_paths = if where_clause.is_none() {
2739                detect_fast_paths(ts, &compiled_assignments)
2740            } else {
2741                None
2742            };
2743            Ok(CompiledOnConflict::DoUpdate {
2744                target,
2745                assignments: compiled_assignments,
2746                where_clause: where_clause.clone(),
2747                fast_paths,
2748            })
2749        }
2750    }
2751}
2752
2753/// Caller MUST check `cache.is_trivial_fast` first.
2754fn exec_insert_trivial_fast(
2755    wtx: &mut WriteTxn<'_>,
2756    table_lower: &str,
2757    cache: &InsertCache,
2758    bufs: &mut InsertBufs,
2759    params: &[Value],
2760) -> Result<ExecutionResult> {
2761    let prog = cache
2762        .trivial_fast_program
2763        .as_ref()
2764        .expect("trivial fast: program");
2765
2766    for &p in &prog.not_null_param_indices {
2767        if params[p as usize].is_null() {
2768            return Err(SqlError::NotNullViolation(format!("param@{p}")));
2769        }
2770    }
2771
2772    match &params[prog.pk_param as usize] {
2773        Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
2774        _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
2775    }
2776
2777    bufs.value_buf.clear();
2778    bufs.value_buf.extend_from_slice(&prog.template);
2779
2780    for op in &prog.ops {
2781        match op {
2782            WriteOp::ParamI64 { param_idx, off } => match &params[*param_idx as usize] {
2783                Value::Integer(v) => {
2784                    let off = *off as usize;
2785                    bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
2786                }
2787                other => {
2788                    return Err(SqlError::TypeMismatch {
2789                        expected: "Integer".into(),
2790                        got: other.data_type().to_string(),
2791                    });
2792                }
2793            },
2794            WriteOp::LiteralI64 { value, off } => {
2795                let off = *off as usize;
2796                bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
2797            }
2798            WriteOp::GenAddParamsI64 {
2799                a_param,
2800                b_param,
2801                off,
2802                bitmap_byte_off,
2803                bitmap_bit_mask,
2804            } => match (&params[*a_param as usize], &params[*b_param as usize]) {
2805                (Value::Integer(a), Value::Integer(b)) => {
2806                    let off = *off as usize;
2807                    bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
2808                }
2809                _ => {
2810                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2811                }
2812            },
2813            WriteOp::GenMulAddParamI64 {
2814                param_idx,
2815                mul,
2816                add,
2817                off,
2818                bitmap_byte_off,
2819                bitmap_bit_mask,
2820            } => match &params[*param_idx as usize] {
2821                Value::Integer(v) => {
2822                    let r = v.wrapping_mul(*mul).wrapping_add(*add);
2823                    let off = *off as usize;
2824                    bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
2825                }
2826                _ => {
2827                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2828                }
2829            },
2830        }
2831    }
2832
2833    let is_new = wtx
2834        .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
2835        .map_err(SqlError::Storage)?;
2836    if !is_new {
2837        return Err(SqlError::DuplicateKey);
2838    }
2839    Ok(ExecutionResult::RowsAffected(1))
2840}
2841
2842fn build_bind_plan(
2843    stmt: &InsertStmt,
2844    col_indices: &[usize],
2845    col_data_types: &[DataType],
2846) -> Option<Vec<BindAction>> {
2847    let rows = match &stmt.source {
2848        InsertSource::Values(rows) => rows,
2849        _ => return None,
2850    };
2851    if rows.len() != 1 {
2852        return None;
2853    }
2854    let value_row = &rows[0];
2855    if value_row.len() != col_indices.len() {
2856        return None;
2857    }
2858    let mut plan = Vec::with_capacity(value_row.len());
2859    for (i, expr) in value_row.iter().enumerate() {
2860        let col_idx = col_indices[i];
2861        let target = col_data_types[col_idx];
2862        match expr {
2863            Expr::Parameter(n) => {
2864                if *n == 0 {
2865                    return None;
2866                }
2867                plan.push(BindAction::Param {
2868                    param_idx: n - 1,
2869                    col_idx,
2870                    target,
2871                });
2872            }
2873            Expr::Literal(v) => plan.push(BindAction::Literal {
2874                value: v.clone(),
2875                col_idx,
2876            }),
2877            _ => return None,
2878        }
2879    }
2880    Some(plan)
2881}
2882
2883impl CompiledInsert {
2884    pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
2885        let lower = stmt.table.to_ascii_lowercase();
2886        let cached = if let Some(ts) = schema.get(&lower) {
2887            let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
2888                ts.columns.iter().map(|c| c.name.as_str()).collect()
2889            } else {
2890                stmt.columns.iter().map(|s| s.as_str()).collect()
2891            };
2892            let mut col_indices = Vec::with_capacity(insert_columns.len());
2893            for name in &insert_columns {
2894                col_indices.push(ts.column_index(name)?);
2895            }
2896            if col_indices
2897                .iter()
2898                .any(|&ci| ts.columns[ci].generated_kind.is_some())
2899            {
2900                return None;
2901            }
2902            let on_conflict = stmt
2903                .on_conflict
2904                .as_ref()
2905                .map(|oc| compile_on_conflict(oc, ts))
2906                .transpose()
2907                .ok()
2908                .flatten()
2909                .map(Arc::new);
2910            let generated_col_positions: Vec<usize> = ts
2911                .columns
2912                .iter()
2913                .enumerate()
2914                .filter_map(|(i, c)| {
2915                    matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
2916                        .then_some(i)
2917                })
2918                .collect();
2919            let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
2920                .iter()
2921                .map(|&pos| {
2922                    detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
2923                })
2924                .collect();
2925            let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
2926                Some(ColumnMap::new(&ts.columns))
2927            } else {
2928                None
2929            };
2930            let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
2931            let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
2932            let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
2933            let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
2934            let phys_count = ts.physical_non_pk_count();
2935            let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
2936            let single_int_pk =
2937                pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
2938            let not_null_indices: Vec<u16> = ts
2939                .columns
2940                .iter()
2941                .filter(|c| !c.nullable)
2942                .map(|c| c.position)
2943                .collect();
2944            let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
2945            let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
2946            let row_fully_overwritten = if any_defaults_flag {
2947                false
2948            } else {
2949                let mut covered: rustc_hash::FxHashSet<usize> =
2950                    col_indices.iter().copied().collect();
2951                covered.extend(generated_col_positions.iter().copied());
2952                for (j, &i) in non_pk_indices.iter().enumerate() {
2953                    let _ = j;
2954                    if matches!(
2955                        ts.columns[i].generated_kind,
2956                        Some(crate::parser::GeneratedKind::Virtual)
2957                    ) {
2958                        covered.insert(i);
2959                    }
2960                }
2961                bind_plan.is_some() && covered.len() == ts.columns.len()
2962            };
2963            let has_fks = !ts.foreign_keys.is_empty();
2964            let has_indices = !ts.indices.is_empty();
2965            let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
2966            let mut null_value_slots: Vec<usize> =
2967                dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
2968            for (j, &i) in non_pk_indices.iter().enumerate() {
2969                let slot = encoding_positions[j] as usize;
2970                if matches!(
2971                    ts.columns[i].generated_kind,
2972                    Some(crate::parser::GeneratedKind::Virtual)
2973                ) {
2974                    null_value_slots.push(slot);
2975                } else {
2976                    non_virtual_pairs.push((i, slot));
2977                }
2978            }
2979            let row_encoder = {
2980                let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
2981                    let col = &ts.columns[i];
2982                    if matches!(
2983                        col.generated_kind,
2984                        Some(crate::parser::GeneratedKind::Virtual)
2985                    ) {
2986                        true
2987                    } else {
2988                        col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
2989                    }
2990                });
2991                if all_int_or_null {
2992                    let mut null_slots: Vec<usize> =
2993                        dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
2994                    for (j, &i) in non_pk_indices.iter().enumerate() {
2995                        if matches!(
2996                            ts.columns[i].generated_kind,
2997                            Some(crate::parser::GeneratedKind::Virtual)
2998                        ) {
2999                            null_slots.push(encoding_positions[j] as usize);
3000                        }
3001                    }
3002                    Some(crate::encoding::build_int_row_template(
3003                        phys_count,
3004                        &null_slots,
3005                    ))
3006                } else {
3007                    None
3008                }
3009            };
3010            let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3011                && !ts.columns.iter().any(|c| c.default_expr.is_some())
3012                && !ts.has_checks()
3013                && !has_fks
3014                && !has_indices
3015                && stmt.on_conflict.is_none()
3016                && stmt.returning.is_none()
3017                && bind_plan.is_some()
3018                && row_encoder.is_some()
3019                && row_fully_overwritten
3020                && single_int_pk
3021                && generated_fast_evals
3022                    .iter()
3023                    .all(|fe| !matches!(fe, FastGenEval::None));
3024            let trivial_fast_program = if is_trivial_fast_eligible {
3025                build_trivial_fast_program(
3026                    bind_plan.as_ref().unwrap(),
3027                    row_encoder.as_ref().unwrap(),
3028                    &non_virtual_pairs,
3029                    &generated_col_positions,
3030                    &generated_fast_evals,
3031                    &pk_indices,
3032                    &ts.columns,
3033                )
3034            } else {
3035                None
3036            };
3037            let is_trivial_fast = trivial_fast_program.is_some();
3038            Some(InsertCache {
3039                col_indices,
3040                has_subquery: insert_has_subquery(stmt),
3041                any_defaults: ts.columns.iter().any(|c| c.default_expr.is_some()),
3042                has_checks: ts.has_checks(),
3043                on_conflict,
3044                row_col_map,
3045                generated_col_positions,
3046                generated_fast_evals,
3047                pk_indices,
3048                non_pk_indices,
3049                encoding_positions,
3050                dropped_non_pk_slots,
3051                phys_count,
3052                single_int_pk,
3053                not_null_indices,
3054                bind_plan,
3055                row_fully_overwritten,
3056                row_encoder,
3057                is_trivial_fast,
3058                trivial_fast_program,
3059            })
3060        } else if schema.get_view(&lower).is_some() {
3061            None
3062        } else {
3063            return None;
3064        };
3065        Some(Self {
3066            table_lower: lower,
3067            cached,
3068        })
3069    }
3070}
3071
3072impl CompiledPlan for CompiledInsert {
3073    fn execute(
3074        &self,
3075        db: &Database,
3076        schema: &SchemaManager,
3077        stmt: &Statement,
3078        params: &[Value],
3079        wtx: Option<&mut WriteTxn<'_>>,
3080    ) -> Result<ExecutionResult> {
3081        let ins = match stmt {
3082            Statement::Insert(i) => i,
3083            _ => {
3084                return Err(SqlError::Unsupported(
3085                    "CompiledInsert received non-INSERT statement".into(),
3086                ))
3087            }
3088        };
3089        match wtx {
3090            None => exec_insert(db, schema, ins, params),
3091            Some(outer) => match self.cached.as_ref() {
3092                Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3093                    exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3094                }),
3095                Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3096                None => exec_insert_in_txn(outer, schema, ins, params),
3097            },
3098        }
3099    }
3100
3101    fn uses_scoped_params(&self) -> bool {
3102        match self.cached.as_ref() {
3103            Some(c) => !c.is_trivial_fast,
3104            None => true,
3105        }
3106    }
3107}
3108
3109pub struct CompiledDelete {
3110    table_lower: String,
3111}
3112
3113impl CompiledDelete {
3114    pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3115        let lower = stmt.table.to_ascii_lowercase();
3116        schema.get(&lower)?;
3117        Some(Self { table_lower: lower })
3118    }
3119}
3120
3121impl CompiledPlan for CompiledDelete {
3122    fn execute(
3123        &self,
3124        db: &Database,
3125        schema: &SchemaManager,
3126        stmt: &Statement,
3127        _params: &[Value],
3128        wtx: Option<&mut WriteTxn<'_>>,
3129    ) -> Result<ExecutionResult> {
3130        let del = match stmt {
3131            Statement::Delete(d) => d,
3132            _ => {
3133                return Err(SqlError::Unsupported(
3134                    "CompiledDelete received non-DELETE statement".into(),
3135                ))
3136            }
3137        };
3138        let _ = &self.table_lower;
3139        match wtx {
3140            None => super::write::exec_delete(db, schema, del),
3141            Some(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3142        }
3143    }
3144}