Skip to main content

citadel_sql/executor/
dml.rs

1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::write_txn::WriteTxn;
7use rustc_hash::FxHashMap;
8
9use crate::encoding::{encode_composite_key_into, encode_row_into};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
12use crate::parser::*;
13use crate::types::*;
14
15use crate::schema::SchemaManager;
16
17use super::compile::CompiledPlan;
18use super::helpers::*;
19use super::CteContext;
20
21pub(super) fn exec_insert(
22    db: &Database,
23    schema: &SchemaManager,
24    stmt: &InsertStmt,
25    params: &[Value],
26) -> Result<ExecutionResult> {
27    let empty_ctes = CteContext::default();
28    let materialized;
29    let stmt = if insert_has_subquery(stmt) {
30        materialized = materialize_insert(stmt, &mut |sub| {
31            exec_subquery_read(db, schema, sub, &empty_ctes)
32        })?;
33        &materialized
34    } else {
35        stmt
36    };
37
38    let lower_name = stmt.table.to_ascii_lowercase();
39    if schema.get_view(&lower_name).is_some() {
40        return Err(SqlError::CannotModifyView(stmt.table.clone()));
41    }
42    let table_schema = schema
43        .get(&lower_name)
44        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
45
46    let insert_columns = if stmt.columns.is_empty() {
47        table_schema
48            .columns
49            .iter()
50            .map(|c| c.name.clone())
51            .collect::<Vec<_>>()
52    } else {
53        stmt.columns
54            .iter()
55            .map(|c| c.to_ascii_lowercase())
56            .collect()
57    };
58
59    let col_indices: Vec<usize> = insert_columns
60        .iter()
61        .map(|name| {
62            table_schema
63                .column_index(name)
64                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
65        })
66        .collect::<Result<_>>()?;
67
68    for &ci in &col_indices {
69        if table_schema.columns[ci].generated_kind.is_some() {
70            return Err(SqlError::CannotInsertIntoGeneratedColumn(
71                table_schema.columns[ci].name.clone(),
72            ));
73        }
74    }
75
76    let defaults: Vec<(usize, &Expr)> = table_schema
77        .columns
78        .iter()
79        .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
80        .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
81        .collect();
82
83    let generated_cols: Vec<(usize, &Expr)> = table_schema
84        .columns
85        .iter()
86        .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
87        .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
88        .collect();
89
90    let has_checks = table_schema.has_checks();
91    let strict = table_schema.is_strict();
92    let row_col_map_for_gen = if !generated_cols.is_empty() {
93        Some(ColumnMap::new(&table_schema.columns))
94    } else {
95        None
96    };
97    let check_col_map = if has_checks {
98        Some(ColumnMap::new(&table_schema.columns))
99    } else {
100        None
101    };
102
103    let select_rows = match &stmt.source {
104        InsertSource::Select(sq) => {
105            let insert_ctes =
106                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
107                    exec_query_body_read(db, schema, body, ctx)
108                })?;
109            let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
110            Some(qr.rows)
111        }
112        InsertSource::Values(_) => None,
113    };
114
115    let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
116        .on_conflict
117        .as_ref()
118        .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
119        .transpose()?;
120
121    let row_col_map = compiled_conflict
122        .as_ref()
123        .map(|_| ColumnMap::new(&table_schema.columns));
124
125    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
126    let mut count: u64 = 0;
127    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
128        stmt.returning.as_ref().map(|_| Vec::new());
129
130    let pk_indices = table_schema.pk_indices();
131    let non_pk = table_schema.non_pk_indices();
132    let enc_pos = table_schema.encoding_positions();
133    let phys_count = table_schema.physical_non_pk_count();
134    let mut row = vec![Value::Null; table_schema.columns.len()];
135    let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
136    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
137    let mut key_buf: Vec<u8> = Vec::with_capacity(64);
138    let mut value_buf: Vec<u8> = Vec::with_capacity(256);
139    let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
140
141    let values = match &stmt.source {
142        InsertSource::Values(rows) => Some(rows.as_slice()),
143        InsertSource::Select(_) => None,
144    };
145    let sel_rows = select_rows.as_deref();
146
147    let total = match (values, sel_rows) {
148        (Some(rows), _) => rows.len(),
149        (_, Some(rows)) => rows.len(),
150        _ => 0,
151    };
152
153    if let Some(sel) = sel_rows {
154        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
155            return Err(SqlError::InvalidValue(format!(
156                "INSERT ... SELECT column count mismatch: expected {}, got {}",
157                insert_columns.len(),
158                sel[0].len()
159            )));
160        }
161    }
162
163    for idx in 0..total {
164        for v in row.iter_mut() {
165            *v = Value::Null;
166        }
167
168        if let Some(value_rows) = values {
169            let value_row = &value_rows[idx];
170            if value_row.len() != insert_columns.len() {
171                return Err(SqlError::InvalidValue(format!(
172                    "expected {} values, got {}",
173                    insert_columns.len(),
174                    value_row.len()
175                )));
176            }
177            for (i, expr) in value_row.iter().enumerate() {
178                let val = if let Expr::Parameter(n) = expr {
179                    params
180                        .get(n - 1)
181                        .cloned()
182                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
183                } else {
184                    eval_const_expr(expr)?
185                };
186                let col_idx = col_indices[i];
187                let col = &table_schema.columns[col_idx];
188                row[col_idx] = if val.is_null() {
189                    Value::Null
190                } else {
191                    coerce_for_column(val, col, strict)?
192                };
193            }
194        } else if let Some(sel) = sel_rows {
195            let sel_row = &sel[idx];
196            for (i, val) in sel_row.iter().enumerate() {
197                let col_idx = col_indices[i];
198                let col = &table_schema.columns[col_idx];
199                row[col_idx] = if val.is_null() {
200                    Value::Null
201                } else {
202                    coerce_for_column(val.clone(), col, strict)?
203                };
204            }
205        }
206
207        for &(pos, def_expr) in &defaults {
208            let val = eval_const_expr(def_expr)?;
209            let col = &table_schema.columns[pos];
210            if !val.is_null() {
211                row[pos] = coerce_for_column(val, col, strict)?;
212            }
213        }
214
215        if let Some(ref gen_map) = row_col_map_for_gen {
216            for &(pos, gen_expr) in &generated_cols {
217                let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
218                let col = &table_schema.columns[pos];
219                row[pos] = if val.is_null() {
220                    Value::Null
221                } else {
222                    coerce_for_column(val, col, strict)?
223                };
224            }
225        }
226
227        for col in &table_schema.columns {
228            if !col.nullable && row[col.position as usize].is_null() {
229                return Err(SqlError::NotNullViolation(col.name.clone()));
230            }
231        }
232
233        if let Some(ref col_map) = check_col_map {
234            for col in &table_schema.columns {
235                if let Some(ref check) = col.check_expr {
236                    let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
237                    if !is_truthy(&result) && !result.is_null() {
238                        let name = col.check_name.as_deref().unwrap_or(&col.name);
239                        return Err(SqlError::CheckViolation(name.to_string()));
240                    }
241                }
242            }
243            for tc in &table_schema.check_constraints {
244                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
245                if !is_truthy(&result) && !result.is_null() {
246                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
247                    return Err(SqlError::CheckViolation(name.to_string()));
248                }
249            }
250        }
251
252        for fk in &table_schema.foreign_keys {
253            let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
254            if any_null {
255                continue; // MATCH SIMPLE: skip if any FK col is NULL
256            }
257            let fk_vals: Vec<Value> = fk
258                .columns
259                .iter()
260                .map(|&ci| row[ci as usize].clone())
261                .collect();
262            fk_key_buf.clear();
263            encode_composite_key_into(&fk_vals, &mut fk_key_buf);
264            let found = wtx
265                .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
266                .map_err(SqlError::Storage)?;
267            if found.is_none() {
268                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
269                return Err(SqlError::ForeignKeyViolation(name.to_string()));
270            }
271        }
272
273        let proposed_row_for_returning: Option<Vec<Value>> =
274            returning_rows.as_ref().map(|_| row.clone());
275
276        for (j, &i) in pk_indices.iter().enumerate() {
277            pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
278        }
279        encode_composite_key_into(&pk_values, &mut key_buf);
280
281        for (j, &i) in non_pk.iter().enumerate() {
282            let col = &table_schema.columns[i];
283            if matches!(
284                col.generated_kind,
285                Some(crate::parser::GeneratedKind::Virtual)
286            ) {
287                value_values[enc_pos[j] as usize] = Value::Null;
288                row[i] = Value::Null;
289            } else {
290                value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
291            }
292        }
293        encode_row_into(&value_values, &mut value_buf);
294
295        if key_buf.len() > citadel_core::MAX_KEY_SIZE {
296            return Err(SqlError::KeyTooLarge {
297                size: key_buf.len(),
298                max: citadel_core::MAX_KEY_SIZE,
299            });
300        }
301        if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
302            return Err(SqlError::RowTooLarge {
303                size: value_buf.len(),
304                max: citadel_core::MAX_VALUE_SIZE,
305            });
306        }
307
308        match compiled_conflict.as_ref() {
309            None => {
310                let is_new = wtx
311                    .table_insert(stmt.table.as_bytes(), &key_buf, &value_buf)
312                    .map_err(SqlError::Storage)?;
313                if !is_new {
314                    return Err(SqlError::DuplicateKey);
315                }
316                if !table_schema.indices.is_empty() {
317                    for (j, &i) in pk_indices.iter().enumerate() {
318                        row[i] = pk_values[j].clone();
319                    }
320                    for (j, &i) in non_pk.iter().enumerate() {
321                        row[i] =
322                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
323                    }
324                    insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
325                }
326                count += 1;
327                if let Some(buf) = returning_rows.as_mut() {
328                    buf.push((None, proposed_row_for_returning));
329                }
330            }
331            Some(oc) => {
332                let oc_ref: &CompiledOnConflict = oc;
333                let needs_row = upsert_needs_row(oc_ref, table_schema);
334                if needs_row {
335                    for (j, &i) in pk_indices.iter().enumerate() {
336                        row[i] = pk_values[j].clone();
337                    }
338                    for (j, &i) in non_pk.iter().enumerate() {
339                        row[i] =
340                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
341                    }
342                }
343                let outcome = apply_insert_with_conflict(
344                    &mut wtx,
345                    table_schema,
346                    &key_buf,
347                    &value_buf,
348                    &row,
349                    &pk_values,
350                    oc_ref,
351                    row_col_map.as_ref().unwrap(),
352                    stmt.returning.is_some(),
353                )?;
354                match outcome {
355                    InsertRowOutcome::Inserted => {
356                        count += 1;
357                        if let Some(buf) = returning_rows.as_mut() {
358                            buf.push((None, proposed_row_for_returning));
359                        }
360                    }
361                    InsertRowOutcome::Updated { old, new } => {
362                        count += 1;
363                        if let Some(buf) = returning_rows.as_mut() {
364                            buf.push((Some(old), Some(new)));
365                        }
366                    }
367                    InsertRowOutcome::Skipped => {}
368                }
369            }
370        }
371    }
372
373    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
374        let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
375        wtx.commit().map_err(SqlError::Storage)?;
376        return Ok(ExecutionResult::Query(qr));
377    }
378
379    wtx.commit().map_err(SqlError::Storage)?;
380    Ok(ExecutionResult::RowsAffected(count))
381}
382
383pub(super) fn has_subquery(expr: &Expr) -> bool {
384    crate::parser::has_subquery(expr)
385}
386
387pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
388    if let Some(ref w) = stmt.where_clause {
389        if has_subquery(w) {
390            return true;
391        }
392    }
393    if let Some(ref h) = stmt.having {
394        if has_subquery(h) {
395            return true;
396        }
397    }
398    for col in &stmt.columns {
399        if let SelectColumn::Expr { expr, .. } = col {
400            if has_subquery(expr) {
401                return true;
402            }
403        }
404    }
405    for ob in &stmt.order_by {
406        if has_subquery(&ob.expr) {
407            return true;
408        }
409    }
410    for join in &stmt.joins {
411        if let Some(ref on_expr) = join.on_clause {
412            if has_subquery(on_expr) {
413                return true;
414            }
415        }
416    }
417    false
418}
419
420pub(super) fn materialize_expr(
421    expr: &Expr,
422    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
423) -> Result<Expr> {
424    match expr {
425        Expr::InSubquery {
426            expr: e,
427            subquery,
428            negated,
429        } => {
430            let inner = materialize_expr(e, exec_sub)?;
431            let qr = exec_sub(subquery)?;
432            if !qr.columns.is_empty() && qr.columns.len() != 1 {
433                return Err(SqlError::SubqueryMultipleColumns);
434            }
435            let mut values = rustc_hash::FxHashSet::default();
436            let mut has_null = false;
437            for row in &qr.rows {
438                if row[0].is_null() {
439                    has_null = true;
440                } else {
441                    values.insert(row[0].clone());
442                }
443            }
444            Ok(Expr::InSet {
445                expr: Box::new(inner),
446                values,
447                has_null,
448                negated: *negated,
449            })
450        }
451        Expr::ScalarSubquery(subquery) => {
452            let qr = exec_sub(subquery)?;
453            if qr.rows.len() > 1 {
454                return Err(SqlError::SubqueryMultipleRows);
455            }
456            let val = if qr.rows.is_empty() {
457                Value::Null
458            } else {
459                qr.rows[0][0].clone()
460            };
461            Ok(Expr::Literal(val))
462        }
463        Expr::Exists { subquery, negated } => {
464            let qr = exec_sub(subquery)?;
465            let exists = !qr.rows.is_empty();
466            let result = if *negated { !exists } else { exists };
467            Ok(Expr::Literal(Value::Boolean(result)))
468        }
469        Expr::InList {
470            expr: e,
471            list,
472            negated,
473        } => {
474            let inner = materialize_expr(e, exec_sub)?;
475            let items = list
476                .iter()
477                .map(|item| materialize_expr(item, exec_sub))
478                .collect::<Result<Vec<_>>>()?;
479            Ok(Expr::InList {
480                expr: Box::new(inner),
481                list: items,
482                negated: *negated,
483            })
484        }
485        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
486            left: Box::new(materialize_expr(left, exec_sub)?),
487            op: *op,
488            right: Box::new(materialize_expr(right, exec_sub)?),
489        }),
490        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
491            op: *op,
492            expr: Box::new(materialize_expr(e, exec_sub)?),
493        }),
494        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
495        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
496        Expr::InSet {
497            expr: e,
498            values,
499            has_null,
500            negated,
501        } => Ok(Expr::InSet {
502            expr: Box::new(materialize_expr(e, exec_sub)?),
503            values: values.clone(),
504            has_null: *has_null,
505            negated: *negated,
506        }),
507        Expr::Between {
508            expr: e,
509            low,
510            high,
511            negated,
512        } => Ok(Expr::Between {
513            expr: Box::new(materialize_expr(e, exec_sub)?),
514            low: Box::new(materialize_expr(low, exec_sub)?),
515            high: Box::new(materialize_expr(high, exec_sub)?),
516            negated: *negated,
517        }),
518        Expr::Like {
519            expr: e,
520            pattern,
521            escape,
522            negated,
523        } => {
524            let esc = escape
525                .as_ref()
526                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
527                .transpose()?;
528            Ok(Expr::Like {
529                expr: Box::new(materialize_expr(e, exec_sub)?),
530                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
531                escape: esc,
532                negated: *negated,
533            })
534        }
535        Expr::Case {
536            operand,
537            conditions,
538            else_result,
539        } => {
540            let op = operand
541                .as_ref()
542                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
543                .transpose()?;
544            let conds = conditions
545                .iter()
546                .map(|(c, r)| {
547                    Ok((
548                        materialize_expr(c, exec_sub)?,
549                        materialize_expr(r, exec_sub)?,
550                    ))
551                })
552                .collect::<Result<Vec<_>>>()?;
553            let else_r = else_result
554                .as_ref()
555                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
556                .transpose()?;
557            Ok(Expr::Case {
558                operand: op,
559                conditions: conds,
560                else_result: else_r,
561            })
562        }
563        Expr::Coalesce(args) => {
564            let materialized = args
565                .iter()
566                .map(|a| materialize_expr(a, exec_sub))
567                .collect::<Result<Vec<_>>>()?;
568            Ok(Expr::Coalesce(materialized))
569        }
570        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
571            expr: Box::new(materialize_expr(e, exec_sub)?),
572            data_type: *data_type,
573        }),
574        Expr::Function {
575            name,
576            args,
577            distinct,
578        } => {
579            let materialized = args
580                .iter()
581                .map(|a| materialize_expr(a, exec_sub))
582                .collect::<Result<Vec<_>>>()?;
583            Ok(Expr::Function {
584                name: name.clone(),
585                args: materialized,
586                distinct: *distinct,
587            })
588        }
589        other => Ok(other.clone()),
590    }
591}
592
593pub(super) fn materialize_stmt(
594    stmt: &SelectStmt,
595    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
596) -> Result<SelectStmt> {
597    let where_clause = stmt
598        .where_clause
599        .as_ref()
600        .map(|e| materialize_expr(e, exec_sub))
601        .transpose()?;
602    let having = stmt
603        .having
604        .as_ref()
605        .map(|e| materialize_expr(e, exec_sub))
606        .transpose()?;
607    let columns = stmt
608        .columns
609        .iter()
610        .map(|c| match c {
611            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
612            SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
613            SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
614            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
615                expr: materialize_expr(expr, exec_sub)?,
616                alias: alias.clone(),
617            }),
618        })
619        .collect::<Result<Vec<_>>>()?;
620    let order_by = stmt
621        .order_by
622        .iter()
623        .map(|ob| {
624            Ok(OrderByItem {
625                expr: materialize_expr(&ob.expr, exec_sub)?,
626                descending: ob.descending,
627                nulls_first: ob.nulls_first,
628            })
629        })
630        .collect::<Result<Vec<_>>>()?;
631    let joins = stmt
632        .joins
633        .iter()
634        .map(|j| {
635            let on_clause = j
636                .on_clause
637                .as_ref()
638                .map(|e| materialize_expr(e, exec_sub))
639                .transpose()?;
640            Ok(JoinClause {
641                join_type: j.join_type,
642                table: j.table.clone(),
643                subquery: j.subquery.clone(),
644                on_clause,
645            })
646        })
647        .collect::<Result<Vec<_>>>()?;
648    let group_by = stmt
649        .group_by
650        .iter()
651        .map(|e| materialize_expr(e, exec_sub))
652        .collect::<Result<Vec<_>>>()?;
653    Ok(SelectStmt {
654        columns,
655        from: stmt.from.clone(),
656        from_alias: stmt.from_alias.clone(),
657        from_subquery: stmt.from_subquery.clone(),
658        from_args: stmt.from_args.clone(),
659        from_json_table: stmt.from_json_table.clone(),
660        joins,
661        distinct: stmt.distinct,
662        where_clause,
663        order_by,
664        limit: stmt.limit.clone(),
665        offset: stmt.offset.clone(),
666        group_by,
667        having,
668    })
669}
670
671pub(super) fn exec_subquery_read(
672    db: &Database,
673    schema: &SchemaManager,
674    stmt: &SelectStmt,
675    ctes: &CteContext,
676) -> Result<QueryResult> {
677    match super::exec_select(db, schema, stmt, ctes)? {
678        ExecutionResult::Query(qr) => Ok(qr),
679        _ => Ok(QueryResult {
680            columns: vec![],
681            rows: vec![],
682        }),
683    }
684}
685
686pub(super) fn exec_subquery_write(
687    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
688    schema: &SchemaManager,
689    stmt: &SelectStmt,
690    ctes: &CteContext,
691) -> Result<QueryResult> {
692    match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
693        ExecutionResult::Query(qr) => Ok(qr),
694        _ => Ok(QueryResult {
695            columns: vec![],
696            rows: vec![],
697        }),
698    }
699}
700
701pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
702    stmt.where_clause.as_ref().is_some_and(has_subquery)
703        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
704}
705
706pub(super) fn materialize_update(
707    stmt: &UpdateStmt,
708    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
709) -> Result<UpdateStmt> {
710    let where_clause = stmt
711        .where_clause
712        .as_ref()
713        .map(|e| materialize_expr(e, exec_sub))
714        .transpose()?;
715    let assignments = stmt
716        .assignments
717        .iter()
718        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
719        .collect::<Result<Vec<_>>>()?;
720    Ok(UpdateStmt {
721        table: stmt.table.clone(),
722        assignments,
723        where_clause,
724        returning: stmt.returning.clone(),
725    })
726}
727
728pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
729    stmt.where_clause.as_ref().is_some_and(has_subquery)
730}
731
732pub(super) fn materialize_delete(
733    stmt: &DeleteStmt,
734    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
735) -> Result<DeleteStmt> {
736    let where_clause = stmt
737        .where_clause
738        .as_ref()
739        .map(|e| materialize_expr(e, exec_sub))
740        .transpose()?;
741    Ok(DeleteStmt {
742        table: stmt.table.clone(),
743        where_clause,
744        returning: stmt.returning.clone(),
745    })
746}
747
748pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
749    match &stmt.source {
750        InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
751        // SELECT source subqueries are handled by exec_select's correlated/non-correlated paths
752        InsertSource::Select(_) => false,
753    }
754}
755
756pub(super) fn materialize_insert(
757    stmt: &InsertStmt,
758    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
759) -> Result<InsertStmt> {
760    let source = match &stmt.source {
761        InsertSource::Values(rows) => {
762            let mat = rows
763                .iter()
764                .map(|row| {
765                    row.iter()
766                        .map(|e| materialize_expr(e, exec_sub))
767                        .collect::<Result<Vec<_>>>()
768                })
769                .collect::<Result<Vec<_>>>()?;
770            InsertSource::Values(mat)
771        }
772        InsertSource::Select(sq) => {
773            let ctes = sq
774                .ctes
775                .iter()
776                .map(|c| {
777                    Ok(CteDefinition {
778                        name: c.name.clone(),
779                        column_aliases: c.column_aliases.clone(),
780                        body: materialize_query_body(&c.body, exec_sub)?,
781                    })
782                })
783                .collect::<Result<Vec<_>>>()?;
784            let body = materialize_query_body(&sq.body, exec_sub)?;
785            InsertSource::Select(Box::new(SelectQuery {
786                ctes,
787                recursive: sq.recursive,
788                body,
789            }))
790        }
791    };
792    Ok(InsertStmt {
793        table: stmt.table.clone(),
794        columns: stmt.columns.clone(),
795        source,
796        on_conflict: stmt.on_conflict.clone(),
797        returning: stmt.returning.clone(),
798    })
799}
800
801pub(super) fn materialize_query_body(
802    body: &QueryBody,
803    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
804) -> Result<QueryBody> {
805    match body {
806        QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
807            sel, exec_sub,
808        )?))),
809        QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
810            op: comp.op.clone(),
811            all: comp.all,
812            left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
813            right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
814            order_by: comp.order_by.clone(),
815            limit: comp.limit.clone(),
816            offset: comp.offset.clone(),
817        }))),
818        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
819    }
820}
821
822pub(super) fn exec_query_body(
823    db: &Database,
824    schema: &SchemaManager,
825    body: &QueryBody,
826    ctes: &CteContext,
827) -> Result<ExecutionResult> {
828    match body {
829        QueryBody::Select(sel) => super::exec_select(db, schema, sel, ctes),
830        QueryBody::Compound(comp) => exec_compound_select(db, schema, comp, ctes),
831        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
832            SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
833        ),
834    }
835}
836
837pub(super) fn exec_query_body_in_txn(
838    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
839    schema: &SchemaManager,
840    body: &QueryBody,
841    ctes: &CteContext,
842) -> Result<ExecutionResult> {
843    match body {
844        QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
845        QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
846        QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
847        QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
848        QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
849    }
850}
851
852pub(super) fn exec_query_body_read(
853    db: &Database,
854    schema: &SchemaManager,
855    body: &QueryBody,
856    ctes: &CteContext,
857) -> Result<QueryResult> {
858    match exec_query_body(db, schema, body, ctes)? {
859        ExecutionResult::Query(qr) => Ok(qr),
860        _ => Ok(QueryResult {
861            columns: vec![],
862            rows: vec![],
863        }),
864    }
865}
866
867pub(super) fn exec_query_body_write(
868    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
869    schema: &SchemaManager,
870    body: &QueryBody,
871    ctes: &CteContext,
872) -> Result<QueryResult> {
873    match exec_query_body_in_txn(wtx, schema, body, ctes)? {
874        ExecutionResult::Query(qr) => Ok(qr),
875        _ => Ok(QueryResult {
876            columns: vec![],
877            rows: vec![],
878        }),
879    }
880}
881
882pub(super) fn exec_compound_select(
883    db: &Database,
884    schema: &SchemaManager,
885    comp: &CompoundSelect,
886    ctes: &CteContext,
887) -> Result<ExecutionResult> {
888    let left_qr = match exec_query_body(db, schema, &comp.left, ctes)? {
889        ExecutionResult::Query(qr) => qr,
890        _ => QueryResult {
891            columns: vec![],
892            rows: vec![],
893        },
894    };
895    let right_qr = match exec_query_body(db, schema, &comp.right, ctes)? {
896        ExecutionResult::Query(qr) => qr,
897        _ => QueryResult {
898            columns: vec![],
899            rows: vec![],
900        },
901    };
902    apply_set_operation(comp, left_qr, right_qr)
903}
904
905pub(super) fn exec_compound_select_in_txn(
906    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
907    schema: &SchemaManager,
908    comp: &CompoundSelect,
909    ctes: &CteContext,
910) -> Result<ExecutionResult> {
911    let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
912        ExecutionResult::Query(qr) => qr,
913        _ => QueryResult {
914            columns: vec![],
915            rows: vec![],
916        },
917    };
918    let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
919        ExecutionResult::Query(qr) => qr,
920        _ => QueryResult {
921            columns: vec![],
922            rows: vec![],
923        },
924    };
925    apply_set_operation(comp, left_qr, right_qr)
926}
927
928pub(super) fn apply_set_operation(
929    comp: &CompoundSelect,
930    left_qr: QueryResult,
931    right_qr: QueryResult,
932) -> Result<ExecutionResult> {
933    if !left_qr.columns.is_empty()
934        && !right_qr.columns.is_empty()
935        && left_qr.columns.len() != right_qr.columns.len()
936    {
937        return Err(SqlError::CompoundColumnCountMismatch {
938            left: left_qr.columns.len(),
939            right: right_qr.columns.len(),
940        });
941    }
942
943    let columns = left_qr.columns;
944
945    let mut rows = match (&comp.op, comp.all) {
946        (SetOp::Union, true) => {
947            let mut rows = left_qr.rows;
948            rows.extend(right_qr.rows);
949            rows
950        }
951        (SetOp::Union, false) => {
952            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
953            let mut rows = Vec::new();
954            for row in left_qr.rows.into_iter().chain(right_qr.rows) {
955                if !seen.contains(&row) {
956                    seen.insert(row.clone());
957                    rows.push(row);
958                }
959            }
960            rows
961        }
962        (SetOp::Intersect, true) => {
963            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
964            for row in &right_qr.rows {
965                *right_counts.entry(row.clone()).or_insert(0) += 1;
966            }
967            let mut rows = Vec::new();
968            for row in left_qr.rows {
969                if let Some(count) = right_counts.get_mut(&row) {
970                    if *count > 0 {
971                        *count -= 1;
972                        rows.push(row);
973                    }
974                }
975            }
976            rows
977        }
978        (SetOp::Intersect, false) => {
979            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
980            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
981            let mut rows = Vec::new();
982            for row in left_qr.rows {
983                if right_set.contains(&row) && !seen.contains(&row) {
984                    seen.insert(row.clone());
985                    rows.push(row);
986                }
987            }
988            rows
989        }
990        (SetOp::Except, true) => {
991            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
992            for row in &right_qr.rows {
993                *right_counts.entry(row.clone()).or_insert(0) += 1;
994            }
995            let mut rows = Vec::new();
996            for row in left_qr.rows {
997                if let Some(count) = right_counts.get_mut(&row) {
998                    if *count > 0 {
999                        *count -= 1;
1000                        continue;
1001                    }
1002                }
1003                rows.push(row);
1004            }
1005            rows
1006        }
1007        (SetOp::Except, false) => {
1008            let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1009            let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1010            let mut rows = Vec::new();
1011            for row in left_qr.rows {
1012                if !right_set.contains(&row) && !seen.contains(&row) {
1013                    seen.insert(row.clone());
1014                    rows.push(row);
1015                }
1016            }
1017            rows
1018        }
1019    };
1020
1021    if !comp.order_by.is_empty() {
1022        let col_defs: Vec<crate::types::ColumnDef> = columns
1023            .iter()
1024            .enumerate()
1025            .map(|(i, name)| crate::types::ColumnDef {
1026                name: name.clone(),
1027                data_type: crate::types::DataType::Null,
1028                nullable: true,
1029                position: i as u16,
1030                default_expr: None,
1031                default_sql: None,
1032                check_expr: None,
1033                check_sql: None,
1034                check_name: None,
1035                is_with_timezone: false,
1036                generated_expr: None,
1037                generated_sql: None,
1038                generated_kind: None,
1039                collation: crate::types::Collation::Binary,
1040            })
1041            .collect();
1042        sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1043    }
1044
1045    if let Some(ref offset_expr) = comp.offset {
1046        let offset = eval_const_int(offset_expr)?.max(0) as usize;
1047        if offset < rows.len() {
1048            rows = rows.split_off(offset);
1049        } else {
1050            rows.clear();
1051        }
1052    }
1053
1054    if let Some(ref limit_expr) = comp.limit {
1055        let limit = eval_const_int(limit_expr)?.max(0) as usize;
1056        rows.truncate(limit);
1057    }
1058
1059    Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1060}
1061
1062struct InsertBufs {
1063    row: Vec<Value>,
1064    pk_values: Vec<Value>,
1065    value_values: Vec<Value>,
1066    key_buf: Vec<u8>,
1067    value_buf: Vec<u8>,
1068    col_indices: Vec<usize>,
1069    fk_key_buf: Vec<u8>,
1070}
1071
1072impl InsertBufs {
1073    fn new() -> Self {
1074        Self {
1075            row: Vec::new(),
1076            pk_values: Vec::new(),
1077            value_values: Vec::new(),
1078            key_buf: Vec::with_capacity(64),
1079            value_buf: Vec::with_capacity(256),
1080            col_indices: Vec::new(),
1081            fk_key_buf: Vec::with_capacity(64),
1082        }
1083    }
1084}
1085
1086thread_local! {
1087    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1088    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1089}
1090
1091fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1092    INSERT_SCRATCH.with(|slot| f(&mut slot.borrow_mut()))
1093}
1094
1095pub(super) struct UpsertBufs {
1096    old_row: Vec<Value>,
1097    new_row: Vec<Value>,
1098    value_values: Vec<Value>,
1099    new_value_buf: Vec<u8>,
1100}
1101
1102impl UpsertBufs {
1103    pub(super) fn new() -> Self {
1104        Self {
1105            old_row: Vec::new(),
1106            new_row: Vec::new(),
1107            value_values: Vec::new(),
1108            new_value_buf: Vec::with_capacity(256),
1109        }
1110    }
1111}
1112
1113pub fn exec_insert_in_txn(
1114    wtx: &mut WriteTxn<'_>,
1115    schema: &SchemaManager,
1116    stmt: &InsertStmt,
1117    params: &[Value],
1118) -> Result<ExecutionResult> {
1119    with_insert_scratch(|bufs| {
1120        exec_insert_in_txn_impl(
1121            wtx,
1122            schema,
1123            stmt,
1124            params,
1125            bufs,
1126            None,
1127            &CteContext::default(),
1128        )
1129    })
1130}
1131
1132pub(super) fn exec_insert_in_txn_with_ctes(
1133    wtx: &mut WriteTxn<'_>,
1134    schema: &SchemaManager,
1135    stmt: &InsertStmt,
1136    params: &[Value],
1137    outer_ctes: &CteContext,
1138) -> Result<ExecutionResult> {
1139    with_insert_scratch(|bufs| {
1140        exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1141    })
1142}
1143
1144fn exec_insert_in_txn_cached(
1145    wtx: &mut WriteTxn<'_>,
1146    schema: &SchemaManager,
1147    stmt: &InsertStmt,
1148    params: &[Value],
1149    cache: &InsertCache,
1150) -> Result<ExecutionResult> {
1151    with_insert_scratch(|bufs| {
1152        exec_insert_in_txn_impl(
1153            wtx,
1154            schema,
1155            stmt,
1156            params,
1157            bufs,
1158            Some(cache),
1159            &CteContext::default(),
1160        )
1161    })
1162}
1163
1164fn exec_insert_in_txn_impl(
1165    wtx: &mut WriteTxn<'_>,
1166    schema: &SchemaManager,
1167    stmt: &InsertStmt,
1168    params: &[Value],
1169    bufs: &mut InsertBufs,
1170    cache: Option<&InsertCache>,
1171    outer_ctes: &CteContext,
1172) -> Result<ExecutionResult> {
1173    let empty_ctes = CteContext::default();
1174    let materialized;
1175    let has_sub = match cache {
1176        Some(c) => c.has_subquery,
1177        None => insert_has_subquery(stmt),
1178    };
1179    let stmt = if has_sub {
1180        materialized = materialize_insert(stmt, &mut |sub| {
1181            exec_subquery_write(wtx, schema, sub, &empty_ctes)
1182        })?;
1183        &materialized
1184    } else {
1185        stmt
1186    };
1187
1188    let table_schema = schema
1189        .get(&stmt.table)
1190        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1191
1192    let default_columns;
1193    let insert_columns: &[String] = if stmt.columns.is_empty() {
1194        default_columns = table_schema
1195            .columns
1196            .iter()
1197            .map(|c| c.name.clone())
1198            .collect::<Vec<_>>();
1199        &default_columns
1200    } else {
1201        &stmt.columns
1202    };
1203
1204    bufs.col_indices.clear();
1205    if let Some(c) = cache {
1206        bufs.col_indices.extend_from_slice(&c.col_indices);
1207    } else {
1208        for name in insert_columns {
1209            bufs.col_indices.push(
1210                table_schema
1211                    .column_index(name)
1212                    .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1213            );
1214        }
1215    }
1216
1217    if cache.is_none() {
1218        for &ci in &bufs.col_indices {
1219            if table_schema.columns[ci].generated_kind.is_some() {
1220                return Err(SqlError::CannotInsertIntoGeneratedColumn(
1221                    table_schema.columns[ci].name.clone(),
1222                ));
1223            }
1224        }
1225    }
1226
1227    let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1228    let cached_gen_positions: &[usize];
1229    let cached_gen_fast_evals: &[FastGenEval];
1230    if let Some(c) = cache {
1231        cached_gen_positions = &c.generated_col_positions;
1232        cached_gen_fast_evals = &c.generated_fast_evals;
1233        generated_cols_uncached = Vec::new();
1234    } else {
1235        cached_gen_positions = &[];
1236        cached_gen_fast_evals = &[];
1237        generated_cols_uncached = table_schema
1238            .columns
1239            .iter()
1240            .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1241            .map(|c| {
1242                let expr = c.generated_expr.as_ref().unwrap();
1243                let fe = detect_fast_gen_eval(expr, table_schema);
1244                (c.position as usize, expr, fe)
1245            })
1246            .collect();
1247    }
1248    let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1249    let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1250        None
1251    } else {
1252        Some(ColumnMap::new(&table_schema.columns))
1253    };
1254    let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1255        None
1256    } else if let Some(c) = cache {
1257        c.row_col_map.as_ref()
1258    } else {
1259        row_col_map_for_gen_owned.as_ref()
1260    };
1261
1262    let any_defaults = match cache {
1263        Some(c) => c.any_defaults,
1264        None => table_schema
1265            .columns
1266            .iter()
1267            .any(|c| c.default_expr.is_some()),
1268    };
1269    let defaults: Vec<(usize, &Expr)> = if any_defaults {
1270        table_schema
1271            .columns
1272            .iter()
1273            .filter(|c| {
1274                c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1275            })
1276            .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1277            .collect()
1278    } else {
1279        Vec::new()
1280    };
1281
1282    let has_checks = match cache {
1283        Some(c) => c.has_checks,
1284        None => table_schema.has_checks(),
1285    };
1286    let check_col_map = if has_checks {
1287        Some(ColumnMap::new(&table_schema.columns))
1288    } else {
1289        None
1290    };
1291
1292    let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1293        &[usize],
1294        &[usize],
1295        &[u16],
1296        usize,
1297        &[u16],
1298    ) = if let Some(c) = cache {
1299        (
1300            &c.pk_indices,
1301            &c.non_pk_indices,
1302            &c.encoding_positions,
1303            c.phys_count,
1304            &c.dropped_non_pk_slots,
1305        )
1306    } else {
1307        (
1308            table_schema.pk_indices(),
1309            table_schema.non_pk_indices(),
1310            table_schema.encoding_positions(),
1311            table_schema.physical_non_pk_count(),
1312            table_schema.dropped_non_pk_slots(),
1313        )
1314    };
1315
1316    bufs.row.resize(table_schema.columns.len(), Value::Null);
1317    bufs.pk_values.resize(pk_indices.len(), Value::Null);
1318    bufs.value_values.resize(phys_count, Value::Null);
1319
1320    let table_bytes = stmt.table.as_bytes();
1321    let has_fks = !table_schema.foreign_keys.is_empty();
1322    let has_indices = !table_schema.indices.is_empty();
1323    let has_defaults = !defaults.is_empty();
1324
1325    let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1326        (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1327        (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1328        (_, None) => None,
1329    };
1330
1331    let row_col_map_owned: Option<ColumnMap> =
1332        if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1333            Some(ColumnMap::new(&table_schema.columns))
1334        } else {
1335            None
1336        };
1337    let row_col_map: Option<&ColumnMap> = cache
1338        .and_then(|c| c.row_col_map.as_ref())
1339        .or(row_col_map_owned.as_ref());
1340
1341    let select_rows = match &stmt.source {
1342        InsertSource::Select(sq) => {
1343            let insert_ctes = super::materialize_all_ctes_with_outer(
1344                &sq.ctes,
1345                sq.recursive,
1346                outer_ctes,
1347                &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1348            )?;
1349            let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1350            Some(qr.rows)
1351        }
1352        InsertSource::Values(_) => None,
1353    };
1354
1355    let mut count: u64 = 0;
1356    let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1357        stmt.returning.as_ref().map(|_| Vec::new());
1358
1359    let values = match &stmt.source {
1360        InsertSource::Values(rows) => Some(rows.as_slice()),
1361        InsertSource::Select(_) => None,
1362    };
1363    let sel_rows = select_rows.as_deref();
1364
1365    let total = match (values, sel_rows) {
1366        (Some(rows), _) => rows.len(),
1367        (_, Some(rows)) => rows.len(),
1368        _ => 0,
1369    };
1370
1371    if let Some(sel) = sel_rows {
1372        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1373            return Err(SqlError::InvalidValue(format!(
1374                "INSERT ... SELECT column count mismatch: expected {}, got {}",
1375                insert_columns.len(),
1376                sel[0].len()
1377            )));
1378        }
1379    }
1380
1381    let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1382    for idx in 0..total {
1383        if !skip_row_clear {
1384            for v in bufs.row.iter_mut() {
1385                *v = Value::Null;
1386            }
1387        }
1388
1389        if let Some(value_rows) = values {
1390            if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1391                for action in plan {
1392                    match action {
1393                        BindAction::Param {
1394                            param_idx,
1395                            col_idx,
1396                            target,
1397                        } => {
1398                            let v = &params[*param_idx];
1399                            bufs.row[*col_idx] = if v.is_null() {
1400                                Value::Null
1401                            } else if v.data_type() == *target {
1402                                v.clone()
1403                            } else {
1404                                let got = v.data_type();
1405                                v.clone().coerce_into(*target).ok_or_else(|| {
1406                                    SqlError::TypeMismatch {
1407                                        expected: target.to_string(),
1408                                        got: got.to_string(),
1409                                    }
1410                                })?
1411                            };
1412                        }
1413                        BindAction::Literal { value, col_idx } => {
1414                            bufs.row[*col_idx] = value.clone();
1415                        }
1416                    }
1417                }
1418            } else {
1419                let value_row = &value_rows[idx];
1420                if value_row.len() != insert_columns.len() {
1421                    return Err(SqlError::InvalidValue(format!(
1422                        "expected {} values, got {}",
1423                        insert_columns.len(),
1424                        value_row.len()
1425                    )));
1426                }
1427                for (i, expr) in value_row.iter().enumerate() {
1428                    let val = match expr {
1429                        Expr::Parameter(n) => params
1430                            .get(n - 1)
1431                            .cloned()
1432                            .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1433                        Expr::Literal(v) => v.clone(),
1434                        _ => eval_const_expr(expr)?,
1435                    };
1436                    let col_idx = bufs.col_indices[i];
1437                    let col = &table_schema.columns[col_idx];
1438                    let got_type = val.data_type();
1439                    bufs.row[col_idx] = if val.is_null() {
1440                        Value::Null
1441                    } else {
1442                        val.coerce_into(col.data_type)
1443                            .ok_or_else(|| SqlError::TypeMismatch {
1444                                expected: col.data_type.to_string(),
1445                                got: got_type.to_string(),
1446                            })?
1447                    };
1448                }
1449            }
1450        } else if let Some(sel) = sel_rows {
1451            let sel_row = &sel[idx];
1452            for (i, val) in sel_row.iter().enumerate() {
1453                let col_idx = bufs.col_indices[i];
1454                let col = &table_schema.columns[col_idx];
1455                let got_type = val.data_type();
1456                bufs.row[col_idx] = if val.is_null() {
1457                    Value::Null
1458                } else {
1459                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
1460                        SqlError::TypeMismatch {
1461                            expected: col.data_type.to_string(),
1462                            got: got_type.to_string(),
1463                        }
1464                    })?
1465                };
1466            }
1467        }
1468
1469        if has_defaults {
1470            for &(pos, def_expr) in &defaults {
1471                let val = eval_const_expr(def_expr)?;
1472                let col = &table_schema.columns[pos];
1473                if !val.is_null() {
1474                    let got_type = val.data_type();
1475                    bufs.row[pos] =
1476                        val.coerce_into(col.data_type)
1477                            .ok_or_else(|| SqlError::TypeMismatch {
1478                                expected: col.data_type.to_string(),
1479                                got: got_type.to_string(),
1480                            })?;
1481                }
1482            }
1483        }
1484
1485        if let Some(gen_map) = row_col_map_for_gen {
1486            if cache.is_some() {
1487                for (pos, fast) in cached_gen_positions
1488                    .iter()
1489                    .copied()
1490                    .zip(cached_gen_fast_evals.iter())
1491                {
1492                    let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1493                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1494                    let col = &table_schema.columns[pos];
1495                    bufs.row[pos] = if val.is_null() {
1496                        Value::Null
1497                    } else {
1498                        let got_type = val.data_type();
1499                        val.coerce_into(col.data_type)
1500                            .ok_or_else(|| SqlError::TypeMismatch {
1501                                expected: col.data_type.to_string(),
1502                                got: got_type.to_string(),
1503                            })?
1504                    };
1505                }
1506            } else {
1507                for (pos, gen_expr, fast) in &generated_cols_uncached {
1508                    let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1509                    let col = &table_schema.columns[*pos];
1510                    bufs.row[*pos] = if val.is_null() {
1511                        Value::Null
1512                    } else {
1513                        let got_type = val.data_type();
1514                        val.coerce_into(col.data_type)
1515                            .ok_or_else(|| SqlError::TypeMismatch {
1516                                expected: col.data_type.to_string(),
1517                                got: got_type.to_string(),
1518                            })?
1519                    };
1520                }
1521            }
1522        }
1523
1524        if let Some(c) = cache {
1525            for &pos in &c.not_null_indices {
1526                if bufs.row[pos as usize].is_null() {
1527                    return Err(SqlError::NotNullViolation(
1528                        table_schema.columns[pos as usize].name.clone(),
1529                    ));
1530                }
1531            }
1532        } else {
1533            for col in &table_schema.columns {
1534                if !col.nullable && bufs.row[col.position as usize].is_null() {
1535                    return Err(SqlError::NotNullViolation(col.name.clone()));
1536                }
1537            }
1538        }
1539
1540        if let Some(ref col_map) = check_col_map {
1541            for col in &table_schema.columns {
1542                if let Some(ref check) = col.check_expr {
1543                    let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1544                    if !is_truthy(&result) && !result.is_null() {
1545                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1546                        return Err(SqlError::CheckViolation(name.to_string()));
1547                    }
1548                }
1549            }
1550            for tc in &table_schema.check_constraints {
1551                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1552                if !is_truthy(&result) && !result.is_null() {
1553                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1554                    return Err(SqlError::CheckViolation(name.to_string()));
1555                }
1556            }
1557        }
1558
1559        if has_fks {
1560            for fk in &table_schema.foreign_keys {
1561                let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1562                if any_null {
1563                    continue;
1564                }
1565                crate::encoding::encode_composite_key_from_indices(
1566                    &fk.columns,
1567                    &bufs.row,
1568                    &mut bufs.fk_key_buf,
1569                );
1570                let found = wtx
1571                    .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1572                    .map_err(SqlError::Storage)?;
1573                if found.is_none() {
1574                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1575                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
1576                }
1577            }
1578        }
1579
1580        let proposed_row_for_returning: Option<Vec<Value>> =
1581            returning_rows.as_ref().map(|_| bufs.row.clone());
1582
1583        for (j, &i) in pk_indices.iter().enumerate() {
1584            bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1585        }
1586        match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1587            true => match bufs.pk_values[0] {
1588                Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1589                _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1590            },
1591            false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1592        }
1593
1594        for &slot in dropped {
1595            bufs.value_values[slot as usize] = Value::Null;
1596        }
1597        for (j, &i) in non_pk.iter().enumerate() {
1598            let col = &table_schema.columns[i];
1599            if matches!(
1600                col.generated_kind,
1601                Some(crate::parser::GeneratedKind::Virtual)
1602            ) {
1603                bufs.value_values[enc_pos[j] as usize] = Value::Null;
1604                bufs.row[i] = Value::Null;
1605            } else {
1606                bufs.value_values[enc_pos[j] as usize] =
1607                    std::mem::replace(&mut bufs.row[i], Value::Null);
1608            }
1609        }
1610        match cache.and_then(|c| c.row_encoder.as_ref()) {
1611            Some(tmpl) => crate::encoding::encode_int_row_with_template(
1612                tmpl,
1613                &bufs.value_values,
1614                &mut bufs.value_buf,
1615            )?,
1616            None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1617        }
1618
1619        if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1620            return Err(SqlError::KeyTooLarge {
1621                size: bufs.key_buf.len(),
1622                max: citadel_core::MAX_KEY_SIZE,
1623            });
1624        }
1625        if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1626            return Err(SqlError::RowTooLarge {
1627                size: bufs.value_buf.len(),
1628                max: citadel_core::MAX_VALUE_SIZE,
1629            });
1630        }
1631
1632        match compiled_conflict.as_ref() {
1633            None => {
1634                let is_new = wtx
1635                    .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1636                    .map_err(SqlError::Storage)?;
1637                if !is_new {
1638                    return Err(SqlError::DuplicateKey);
1639                }
1640                if has_indices {
1641                    for (j, &i) in pk_indices.iter().enumerate() {
1642                        bufs.row[i] = bufs.pk_values[j].clone();
1643                    }
1644                    for (j, &i) in non_pk.iter().enumerate() {
1645                        bufs.row[i] = std::mem::replace(
1646                            &mut bufs.value_values[enc_pos[j] as usize],
1647                            Value::Null,
1648                        );
1649                    }
1650                    insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1651                }
1652                count += 1;
1653                if let Some(buf) = returning_rows.as_mut() {
1654                    buf.push((None, proposed_row_for_returning));
1655                }
1656            }
1657            Some(oc) => {
1658                let oc_ref: &CompiledOnConflict = oc;
1659                let needs_row = upsert_needs_row(oc_ref, table_schema);
1660                if needs_row {
1661                    for (j, &i) in pk_indices.iter().enumerate() {
1662                        bufs.row[i] = bufs.pk_values[j].clone();
1663                    }
1664                    for (j, &i) in non_pk.iter().enumerate() {
1665                        bufs.row[i] = std::mem::replace(
1666                            &mut bufs.value_values[enc_pos[j] as usize],
1667                            Value::Null,
1668                        );
1669                    }
1670                }
1671                let outcome = apply_insert_with_conflict(
1672                    wtx,
1673                    table_schema,
1674                    &bufs.key_buf,
1675                    &bufs.value_buf,
1676                    &bufs.row,
1677                    &bufs.pk_values,
1678                    oc_ref,
1679                    row_col_map.unwrap(),
1680                    stmt.returning.is_some(),
1681                )?;
1682                match outcome {
1683                    InsertRowOutcome::Inserted => {
1684                        count += 1;
1685                        if let Some(buf) = returning_rows.as_mut() {
1686                            buf.push((None, proposed_row_for_returning));
1687                        }
1688                    }
1689                    InsertRowOutcome::Updated { old, new } => {
1690                        count += 1;
1691                        if let Some(buf) = returning_rows.as_mut() {
1692                            buf.push((Some(old), Some(new)));
1693                        }
1694                    }
1695                    InsertRowOutcome::Skipped => {}
1696                }
1697            }
1698        }
1699    }
1700
1701    if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
1702        return Ok(ExecutionResult::Query(super::helpers::project_returning(
1703            table_schema,
1704            returning_cols,
1705            &rows,
1706        )?));
1707    }
1708
1709    Ok(ExecutionResult::RowsAffected(count))
1710}
1711
1712pub struct CompiledInsert {
1713    table_lower: String,
1714    cached: Option<InsertCache>,
1715}
1716
1717struct InsertCache {
1718    col_indices: Vec<usize>,
1719    has_subquery: bool,
1720    any_defaults: bool,
1721    has_checks: bool,
1722    on_conflict: Option<Arc<CompiledOnConflict>>,
1723    row_col_map: Option<ColumnMap>,
1724    generated_col_positions: Vec<usize>,
1725    generated_fast_evals: Vec<FastGenEval>,
1726    pk_indices: Vec<usize>,
1727    non_pk_indices: Vec<usize>,
1728    encoding_positions: Vec<u16>,
1729    dropped_non_pk_slots: Vec<u16>,
1730    phys_count: usize,
1731    single_int_pk: bool,
1732    not_null_indices: Vec<u16>,
1733    bind_plan: Option<Vec<BindAction>>,
1734    row_fully_overwritten: bool,
1735    row_encoder: Option<crate::encoding::IntRowTemplate>,
1736    is_trivial_fast: bool,
1737    trivial_fast_program: Option<TrivialFastProgram>,
1738}
1739
1740#[derive(Clone)]
1741enum BindAction {
1742    Param {
1743        param_idx: usize,
1744        col_idx: usize,
1745        target: DataType,
1746    },
1747    Literal {
1748        value: Value,
1749        col_idx: usize,
1750    },
1751}
1752
1753#[derive(Clone)]
1754struct TrivialFastProgram {
1755    template: Vec<u8>,
1756    ops: Vec<WriteOp>,
1757    pk_param: u8,
1758    not_null_param_indices: Vec<u8>,
1759}
1760
1761#[derive(Clone)]
1762enum WriteOp {
1763    ParamI64 {
1764        param_idx: u8,
1765        off: u32,
1766    },
1767    LiteralI64 {
1768        value: i64,
1769        off: u32,
1770    },
1771    GenAddParamsI64 {
1772        a_param: u8,
1773        b_param: u8,
1774        off: u32,
1775        bitmap_byte_off: u32,
1776        bitmap_bit_mask: u8,
1777    },
1778    GenMulAddParamI64 {
1779        param_idx: u8,
1780        mul: i64,
1781        add: i64,
1782        off: u32,
1783        bitmap_byte_off: u32,
1784        bitmap_bit_mask: u8,
1785    },
1786}
1787
1788fn build_trivial_fast_program(
1789    bind_plan: &[BindAction],
1790    row_encoder: &crate::encoding::IntRowTemplate,
1791    non_virtual_pairs: &[(usize, usize)],
1792    generated_col_positions: &[usize],
1793    generated_fast_evals: &[FastGenEval],
1794    pk_indices: &[usize],
1795    columns: &[crate::types::ColumnDef],
1796) -> Option<TrivialFastProgram> {
1797    let pk_col = pk_indices[0];
1798    let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
1799        non_virtual_pairs.iter().copied().collect();
1800    let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
1801        row_encoder.slot_offsets.iter().copied().collect();
1802
1803    let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
1804    let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
1805    let mut pk_param: Option<u8> = None;
1806    let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
1807    let mut not_null_param_indices: Vec<u8> = Vec::new();
1808
1809    for action in bind_plan {
1810        match action {
1811            BindAction::Param {
1812                param_idx,
1813                col_idx,
1814                target,
1815            } => {
1816                if *target != DataType::Integer {
1817                    return None;
1818                }
1819                let pi: u8 = u8::try_from(*param_idx).ok()?;
1820                col_to_param.insert(*col_idx, pi);
1821                if *col_idx == pk_col {
1822                    pk_param = Some(pi);
1823                } else {
1824                    let slot = *col_to_slot.get(col_idx)?;
1825                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1826                    ops.push(WriteOp::ParamI64 { param_idx: pi, off });
1827                    if !columns[*col_idx].nullable {
1828                        not_null_param_indices.push(pi);
1829                    }
1830                }
1831            }
1832            BindAction::Literal { value, col_idx } => match value {
1833                Value::Integer(v) => {
1834                    col_to_lit_int.insert(*col_idx, *v);
1835                    if *col_idx == pk_col {
1836                        return None;
1837                    }
1838                    let slot = *col_to_slot.get(col_idx)?;
1839                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1840                    ops.push(WriteOp::LiteralI64 { value: *v, off });
1841                }
1842                _ => return None,
1843            },
1844        }
1845    }
1846
1847    let pk_param = pk_param?;
1848
1849    for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
1850        let gen_slot = *col_to_slot.get(&gen_pos)?;
1851        let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
1852        let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
1853        let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
1854        let gen_col_nullable = columns[gen_pos].nullable;
1855
1856        match &generated_fast_evals[i] {
1857            FastGenEval::IntColAddCol {
1858                left_idx,
1859                right_idx,
1860            } => {
1861                let a_param = col_to_param.get(left_idx).copied();
1862                let b_param = col_to_param.get(right_idx).copied();
1863                match (a_param, b_param) {
1864                    (Some(ap), Some(bp)) => {
1865                        let deps_safe = gen_col_nullable
1866                            || (not_null_param_indices.contains(&ap)
1867                                && not_null_param_indices.contains(&bp));
1868                        if !deps_safe {
1869                            return None;
1870                        }
1871                        ops.push(WriteOp::GenAddParamsI64 {
1872                            a_param: ap,
1873                            b_param: bp,
1874                            off: gen_off,
1875                            bitmap_byte_off,
1876                            bitmap_bit_mask,
1877                        });
1878                    }
1879                    (Some(p), None) => {
1880                        let lit = col_to_lit_int.get(right_idx).copied()?;
1881                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1882                            return None;
1883                        }
1884                        ops.push(WriteOp::GenMulAddParamI64 {
1885                            param_idx: p,
1886                            mul: 1,
1887                            add: lit,
1888                            off: gen_off,
1889                            bitmap_byte_off,
1890                            bitmap_bit_mask,
1891                        });
1892                    }
1893                    (None, Some(p)) => {
1894                        let lit = col_to_lit_int.get(left_idx).copied()?;
1895                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1896                            return None;
1897                        }
1898                        ops.push(WriteOp::GenMulAddParamI64 {
1899                            param_idx: p,
1900                            mul: 1,
1901                            add: lit,
1902                            off: gen_off,
1903                            bitmap_byte_off,
1904                            bitmap_bit_mask,
1905                        });
1906                    }
1907                    (None, None) => {
1908                        let la = col_to_lit_int.get(left_idx).copied()?;
1909                        let lb = col_to_lit_int.get(right_idx).copied()?;
1910                        ops.push(WriteOp::LiteralI64 {
1911                            value: la.wrapping_add(lb),
1912                            off: gen_off,
1913                        });
1914                    }
1915                }
1916            }
1917            FastGenEval::IntColMulAdd {
1918                col_schema_idx,
1919                mul,
1920                add,
1921            } => {
1922                if let Some(p) = col_to_param.get(col_schema_idx).copied() {
1923                    if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1924                        return None;
1925                    }
1926                    ops.push(WriteOp::GenMulAddParamI64 {
1927                        param_idx: p,
1928                        mul: *mul,
1929                        add: *add,
1930                        off: gen_off,
1931                        bitmap_byte_off,
1932                        bitmap_bit_mask,
1933                    });
1934                } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
1935                    ops.push(WriteOp::LiteralI64 {
1936                        value: lit.wrapping_mul(*mul).wrapping_add(*add),
1937                        off: gen_off,
1938                    });
1939                } else {
1940                    return None;
1941                }
1942            }
1943            FastGenEval::None => return None,
1944        }
1945    }
1946
1947    Some(TrivialFastProgram {
1948        template: row_encoder.template.clone(),
1949        ops,
1950        pk_param,
1951        not_null_param_indices,
1952    })
1953}
1954
1955#[derive(Clone)]
1956pub(super) enum CompiledOnConflict {
1957    DoNothing {
1958        target: Option<ConflictKind>,
1959    },
1960    DoUpdate {
1961        target: ConflictKind,
1962        assignments: Vec<(usize, Expr)>,
1963        where_clause: Option<Expr>,
1964        fast_paths: Option<Vec<DoUpdateFastPath>>,
1965    },
1966}
1967
1968#[derive(Clone, Copy)]
1969pub(super) enum DoUpdateFastPath {
1970    IntAddConst { phys_idx: usize, delta: i64 },
1971}
1972
1973#[derive(Clone, Debug)]
1974pub(super) enum ConflictKind {
1975    PrimaryKey,
1976    UniqueIndex { index_idx: usize },
1977}
1978
1979fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
1980    match target {
1981        ConflictTarget::Columns(cols) => {
1982            let col_idx_set: Vec<u16> = cols
1983                .iter()
1984                .map(|name| {
1985                    ts.column_index(name)
1986                        .map(|i| i as u16)
1987                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
1988                })
1989                .collect::<Result<_>>()?;
1990            let pk_set = ts.primary_key_columns.clone();
1991            if set_equal(&col_idx_set, &pk_set) {
1992                return Ok(ConflictKind::PrimaryKey);
1993            }
1994            for (index_idx, idx) in ts.indices.iter().enumerate() {
1995                if idx.unique && set_equal(&col_idx_set, &idx.columns) {
1996                    return Ok(ConflictKind::UniqueIndex { index_idx });
1997                }
1998            }
1999            Err(SqlError::Plan(
2000                "ON CONFLICT target does not match any unique constraint".into(),
2001            ))
2002        }
2003        ConflictTarget::Constraint(name) => {
2004            let lower = name.to_ascii_lowercase();
2005            for (index_idx, idx) in ts.indices.iter().enumerate() {
2006                if idx.name.eq_ignore_ascii_case(&lower) {
2007                    if idx.unique {
2008                        return Ok(ConflictKind::UniqueIndex { index_idx });
2009                    }
2010                    return Err(SqlError::Plan(format!(
2011                        "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2012                    )));
2013                }
2014            }
2015            Err(SqlError::Plan(format!(
2016                "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2017            )))
2018        }
2019    }
2020}
2021
2022fn set_equal(a: &[u16], b: &[u16]) -> bool {
2023    if a.len() != b.len() {
2024        return false;
2025    }
2026    let mut a_sorted = a.to_vec();
2027    let mut b_sorted = b.to_vec();
2028    a_sorted.sort_unstable();
2029    b_sorted.sort_unstable();
2030    a_sorted == b_sorted
2031}
2032
2033pub(super) enum InsertRowOutcome {
2034    Inserted,
2035    Updated { old: Vec<Value>, new: Vec<Value> },
2036    Skipped,
2037}
2038
2039#[allow(clippy::too_many_arguments)]
2040#[inline]
2041pub(super) fn apply_insert_with_conflict(
2042    wtx: &mut WriteTxn<'_>,
2043    table_schema: &TableSchema,
2044    key_buf: &[u8],
2045    value_buf: &[u8],
2046    row: &[Value],
2047    pk_values: &[Value],
2048    on_conflict: &CompiledOnConflict,
2049    col_map: &ColumnMap,
2050    capture_returning: bool,
2051) -> Result<InsertRowOutcome> {
2052    let table_bytes = table_schema.name.as_bytes();
2053
2054    if let CompiledOnConflict::DoNothing { target } = on_conflict {
2055        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2056        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2057            let inserted = wtx
2058                .table_insert_if_absent(table_bytes, key_buf, value_buf)
2059                .map_err(SqlError::Storage)?;
2060            return Ok(if inserted {
2061                InsertRowOutcome::Inserted
2062            } else {
2063                InsertRowOutcome::Skipped
2064            });
2065        }
2066    }
2067
2068    if let CompiledOnConflict::DoUpdate {
2069        target: ConflictKind::PrimaryKey,
2070        assignments,
2071        where_clause,
2072        fast_paths,
2073    } = on_conflict
2074    {
2075        if can_fuse_do_update(table_schema, assignments) {
2076            return apply_do_update_fused(
2077                wtx,
2078                table_schema,
2079                table_bytes,
2080                key_buf,
2081                value_buf,
2082                row,
2083                assignments,
2084                where_clause.as_ref(),
2085                col_map,
2086                fast_paths.as_deref(),
2087                capture_returning,
2088            );
2089        }
2090    }
2091
2092    let primary_outcome = wtx
2093        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2094        .map_err(SqlError::Storage)?;
2095
2096    match primary_outcome {
2097        citadel_txn::write_txn::InsertOutcome::Inserted => {
2098            if table_schema.indices.is_empty() {
2099                return Ok(InsertRowOutcome::Inserted);
2100            }
2101            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2102            match insert_index_entries_or_fetch(
2103                wtx,
2104                table_schema,
2105                row,
2106                pk_values,
2107                &mut inserted_keys,
2108            )? {
2109                None => Ok(InsertRowOutcome::Inserted),
2110                Some(conflicting_idx) => {
2111                    let matches_target =
2112                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2113                            || matches!(
2114                                on_conflict,
2115                                CompiledOnConflict::DoNothing {
2116                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
2117                                } | CompiledOnConflict::DoUpdate {
2118                                    target: ConflictKind::UniqueIndex { index_idx },
2119                                    ..
2120                                } if *index_idx == conflicting_idx
2121                            );
2122                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2123                    if !matches_target {
2124                        return Err(SqlError::UniqueViolation(
2125                            table_schema.indices[conflicting_idx].name.clone(),
2126                        ));
2127                    }
2128                    match on_conflict {
2129                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2130                        CompiledOnConflict::DoUpdate {
2131                            assignments,
2132                            where_clause,
2133                            ..
2134                        } => {
2135                            let existing_pk =
2136                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2137                            apply_do_update(
2138                                wtx,
2139                                table_schema,
2140                                &existing_pk,
2141                                row,
2142                                assignments,
2143                                where_clause.as_ref(),
2144                                col_map,
2145                                capture_returning,
2146                            )
2147                        }
2148                    }
2149                }
2150            }
2151        }
2152        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2153            let matches_target = matches!(
2154                on_conflict,
2155                CompiledOnConflict::DoNothing { target: None }
2156                    | CompiledOnConflict::DoNothing {
2157                        target: Some(ConflictKind::PrimaryKey),
2158                    }
2159                    | CompiledOnConflict::DoUpdate {
2160                        target: ConflictKind::PrimaryKey,
2161                        ..
2162                    }
2163            );
2164            if !matches_target {
2165                return Err(SqlError::DuplicateKey);
2166            }
2167            match on_conflict {
2168                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2169                CompiledOnConflict::DoUpdate {
2170                    assignments,
2171                    where_clause,
2172                    ..
2173                } => {
2174                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2175                    apply_do_update_with_old_row(
2176                        wtx,
2177                        table_schema,
2178                        key_buf,
2179                        &old_row,
2180                        row,
2181                        assignments,
2182                        where_clause.as_ref(),
2183                        col_map,
2184                        capture_returning,
2185                    )
2186                }
2187            }
2188        }
2189    }
2190}
2191
2192#[inline]
2193fn apply_fast_path_patch(
2194    old_bytes: &[u8],
2195    fast_paths: &[DoUpdateFastPath],
2196) -> Result<UpsertAction> {
2197    UPSERT_SCRATCH.with(|slot| {
2198        let mut bufs = slot.borrow_mut();
2199        bufs.new_value_buf.clear();
2200        bufs.new_value_buf.extend_from_slice(old_bytes);
2201
2202        let mut patch_scratch: Vec<u8> = Vec::new();
2203
2204        for fp in fast_paths {
2205            match fp {
2206                DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2207                    let decoded =
2208                        crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2209                    let old_val = &decoded[0];
2210                    let new_val = match old_val {
2211                        Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2212                        Value::Null => Value::Null,
2213                        _ => {
2214                            return Err(SqlError::TypeMismatch {
2215                                expected: "INTEGER".into(),
2216                                got: old_val.data_type().to_string(),
2217                            });
2218                        }
2219                    };
2220                    if !crate::encoding::patch_column_in_place(
2221                        &mut bufs.new_value_buf,
2222                        *phys_idx,
2223                        &new_val,
2224                    )? {
2225                        patch_scratch.clear();
2226                        crate::encoding::patch_row_column(
2227                            &bufs.new_value_buf,
2228                            *phys_idx,
2229                            &new_val,
2230                            &mut patch_scratch,
2231                        )?;
2232                        std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2233                    }
2234                }
2235            }
2236        }
2237
2238        if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2239            return Err(SqlError::RowTooLarge {
2240                size: bufs.new_value_buf.len(),
2241                max: citadel_core::MAX_VALUE_SIZE,
2242            });
2243        }
2244
2245        Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2246    })
2247}
2248
2249fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2250    if !ts.indices.is_empty() {
2251        return true;
2252    }
2253    match oc {
2254        CompiledOnConflict::DoNothing { .. } => false,
2255        CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2256    }
2257}
2258
2259fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2260    if !ts.indices.is_empty() {
2261        return false;
2262    }
2263    if !ts.foreign_keys.is_empty() {
2264        return false;
2265    }
2266    if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2267        return false;
2268    }
2269    let pk = ts.pk_indices();
2270    !assignments.iter().any(|(ci, _)| pk.contains(ci))
2271}
2272
2273#[allow(clippy::too_many_arguments)]
2274#[inline]
2275fn apply_do_update_fused(
2276    wtx: &mut WriteTxn<'_>,
2277    table_schema: &TableSchema,
2278    table_bytes: &[u8],
2279    key_buf: &[u8],
2280    value_buf: &[u8],
2281    proposed_row: &[Value],
2282    assignments: &[(usize, Expr)],
2283    where_clause: Option<&Expr>,
2284    col_map: &ColumnMap,
2285    fast_paths: Option<&[DoUpdateFastPath]>,
2286    capture_returning: bool,
2287) -> Result<InsertRowOutcome> {
2288    let non_pk = table_schema.non_pk_indices();
2289    let enc_pos = table_schema.encoding_positions();
2290    let phys_count = table_schema.physical_non_pk_count();
2291    let dropped = table_schema.dropped_non_pk_slots();
2292    let has_checks = table_schema.has_checks();
2293    let has_fks = !table_schema.foreign_keys.is_empty();
2294
2295    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2296        std::cell::RefCell::new(None);
2297
2298    let outcome =
2299        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2300            if let Some(fps) = fast_paths {
2301                if !has_checks {
2302                    let action = apply_fast_path_patch(old_bytes, fps)?;
2303                    if capture_returning {
2304                        if let UpsertAction::Replace(ref new_bytes) = action {
2305                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2306                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2307                            *captured.borrow_mut() = Some((old_row, new_row));
2308                        }
2309                    }
2310                    return Ok(action);
2311                }
2312            }
2313            UPSERT_SCRATCH.with(|slot| {
2314                let mut bufs = slot.borrow_mut();
2315                let UpsertBufs {
2316                    old_row,
2317                    new_row,
2318                    value_values,
2319                    new_value_buf,
2320                } = &mut *bufs;
2321
2322                old_row.clear();
2323                old_row.resize(table_schema.columns.len(), Value::Null);
2324                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2325
2326                if let Some(w) = where_clause {
2327                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2328                    let result = eval_expr(w, &ctx)?;
2329                    if result.is_null() || !is_truthy(&result) {
2330                        return Ok(UpsertAction::Skip);
2331                    }
2332                }
2333
2334                new_row.clear();
2335                new_row.extend_from_slice(old_row);
2336                for (col_idx, expr) in assignments {
2337                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2338                    let val = eval_expr(expr, &ctx)?;
2339                    let col = &table_schema.columns[*col_idx];
2340                    new_row[*col_idx] = if val.is_null() {
2341                        Value::Null
2342                    } else {
2343                        let got = val.data_type();
2344                        val.coerce_into(col.data_type)
2345                            .ok_or_else(|| SqlError::TypeMismatch {
2346                                expected: col.data_type.to_string(),
2347                                got: got.to_string(),
2348                            })?
2349                    };
2350                }
2351
2352                for (assigned_idx, _) in assignments {
2353                    let col = &table_schema.columns[*assigned_idx];
2354                    if !col.nullable && new_row[col.position as usize].is_null() {
2355                        return Err(SqlError::NotNullViolation(col.name.clone()));
2356                    }
2357                }
2358                if has_checks {
2359                    for col in &table_schema.columns {
2360                        if let Some(ref check) = col.check_expr {
2361                            let ctx = EvalCtx::new(col_map, new_row);
2362                            let result = eval_expr(check, &ctx)?;
2363                            if !is_truthy(&result) && !result.is_null() {
2364                                let name = col.check_name.as_deref().unwrap_or(&col.name);
2365                                return Err(SqlError::CheckViolation(name.to_string()));
2366                            }
2367                        }
2368                    }
2369                    for tc in &table_schema.check_constraints {
2370                        let ctx = EvalCtx::new(col_map, new_row);
2371                        let result = eval_expr(&tc.expr, &ctx)?;
2372                        if !is_truthy(&result) && !result.is_null() {
2373                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
2374                            return Err(SqlError::CheckViolation(name.to_string()));
2375                        }
2376                    }
2377                }
2378                let _ = has_fks;
2379
2380                value_values.clear();
2381                value_values.resize(phys_count, Value::Null);
2382                for &slot in dropped {
2383                    value_values[slot as usize] = Value::Null;
2384                }
2385                for (j, &i) in non_pk.iter().enumerate() {
2386                    value_values[enc_pos[j] as usize] = new_row[i].clone();
2387                }
2388                new_value_buf.clear();
2389                crate::encoding::encode_row_into(value_values, new_value_buf);
2390
2391                if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2392                    return Err(SqlError::RowTooLarge {
2393                        size: new_value_buf.len(),
2394                        max: citadel_core::MAX_VALUE_SIZE,
2395                    });
2396                }
2397
2398                if capture_returning {
2399                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2400                }
2401                Ok(UpsertAction::Replace(new_value_buf.clone()))
2402            })
2403        })?;
2404
2405    match outcome {
2406        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2407        UpsertOutcome::Updated => {
2408            if capture_returning {
2409                let (old, new) = captured.into_inner().ok_or_else(|| {
2410                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2411                })?;
2412                Ok(InsertRowOutcome::Updated { old, new })
2413            } else {
2414                Ok(InsertRowOutcome::Inserted)
2415            }
2416        }
2417        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2418    }
2419}
2420
2421fn fetch_unique_index_pk(
2422    wtx: &mut WriteTxn<'_>,
2423    table_schema: &TableSchema,
2424    index_idx: usize,
2425    row: &[Value],
2426) -> Result<Vec<u8>> {
2427    let idx = &table_schema.indices[index_idx];
2428    let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2429    let indexed: Vec<Value> = idx
2430        .columns
2431        .iter()
2432        .map(|&col_idx| row[col_idx as usize].clone())
2433        .collect();
2434    let key = crate::encoding::encode_composite_key(&indexed);
2435    let value = wtx
2436        .table_get(&idx_table, &key)
2437        .map_err(SqlError::Storage)?
2438        .ok_or_else(|| {
2439            SqlError::InvalidValue("unique index missing expected collision entry".into())
2440        })?;
2441    Ok(value)
2442}
2443
2444#[allow(clippy::too_many_arguments)]
2445fn apply_do_update(
2446    wtx: &mut WriteTxn<'_>,
2447    table_schema: &TableSchema,
2448    pk_key: &[u8],
2449    proposed_row: &[Value],
2450    assignments: &[(usize, Expr)],
2451    where_clause: Option<&Expr>,
2452    col_map: &ColumnMap,
2453    capture_returning: bool,
2454) -> Result<InsertRowOutcome> {
2455    let old_value = wtx
2456        .table_get(table_schema.name.as_bytes(), pk_key)
2457        .map_err(SqlError::Storage)?
2458        .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2459    let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2460    apply_do_update_with_old_row(
2461        wtx,
2462        table_schema,
2463        pk_key,
2464        &old_row,
2465        proposed_row,
2466        assignments,
2467        where_clause,
2468        col_map,
2469        capture_returning,
2470    )
2471}
2472
2473#[allow(clippy::too_many_arguments)]
2474fn apply_do_update_with_old_row(
2475    wtx: &mut WriteTxn<'_>,
2476    table_schema: &TableSchema,
2477    old_pk_key: &[u8],
2478    old_row: &[Value],
2479    proposed_row: &[Value],
2480    assignments: &[(usize, Expr)],
2481    where_clause: Option<&Expr>,
2482    col_map: &ColumnMap,
2483    capture_returning: bool,
2484) -> Result<InsertRowOutcome> {
2485    if let Some(w) = where_clause {
2486        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2487        let result = eval_expr(w, &ctx)?;
2488        if result.is_null() || !is_truthy(&result) {
2489            return Ok(InsertRowOutcome::Skipped);
2490        }
2491    }
2492
2493    let mut new_row = old_row.to_vec();
2494    for (col_idx, expr) in assignments {
2495        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2496        let val = eval_expr(expr, &ctx)?;
2497        let col = &table_schema.columns[*col_idx];
2498        new_row[*col_idx] = if val.is_null() {
2499            Value::Null
2500        } else {
2501            let got = val.data_type();
2502            val.coerce_into(col.data_type)
2503                .ok_or_else(|| SqlError::TypeMismatch {
2504                    expected: col.data_type.to_string(),
2505                    got: got.to_string(),
2506                })?
2507        };
2508    }
2509
2510    for col in &table_schema.columns {
2511        if matches!(
2512            col.generated_kind,
2513            Some(crate::parser::GeneratedKind::Stored)
2514        ) {
2515            let val = eval_expr(
2516                col.generated_expr.as_ref().unwrap(),
2517                &EvalCtx::new(col_map, &new_row),
2518            )?;
2519            let pos = col.position as usize;
2520            new_row[pos] = if val.is_null() {
2521                if !col.nullable {
2522                    return Err(SqlError::NotNullViolation(col.name.clone()));
2523                }
2524                Value::Null
2525            } else {
2526                let got = val.data_type();
2527                val.coerce_into(col.data_type)
2528                    .ok_or_else(|| SqlError::TypeMismatch {
2529                        expected: col.data_type.to_string(),
2530                        got: got.to_string(),
2531                    })?
2532            };
2533        }
2534    }
2535
2536    let pk_indices = table_schema.pk_indices();
2537    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2538    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2539
2540    for (assigned_idx, _) in assignments {
2541        let col = &table_schema.columns[*assigned_idx];
2542        if !col.nullable && new_row[col.position as usize].is_null() {
2543            return Err(SqlError::NotNullViolation(col.name.clone()));
2544        }
2545    }
2546    if table_schema.has_checks() {
2547        for col in &table_schema.columns {
2548            if let Some(ref check) = col.check_expr {
2549                let ctx = EvalCtx::new(col_map, &new_row);
2550                let result = eval_expr(check, &ctx)?;
2551                if !is_truthy(&result) && !result.is_null() {
2552                    let name = col.check_name.as_deref().unwrap_or(&col.name);
2553                    return Err(SqlError::CheckViolation(name.to_string()));
2554                }
2555            }
2556        }
2557        for tc in &table_schema.check_constraints {
2558            let ctx = EvalCtx::new(col_map, &new_row);
2559            let result = eval_expr(&tc.expr, &ctx)?;
2560            if !is_truthy(&result) && !result.is_null() {
2561                let name = tc.name.as_deref().unwrap_or(&tc.sql);
2562                return Err(SqlError::CheckViolation(name.to_string()));
2563            }
2564        }
2565    }
2566    for fk in &table_schema.foreign_keys {
2567        let changed = fk
2568            .columns
2569            .iter()
2570            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2571        if !changed {
2572            continue;
2573        }
2574        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2575        if any_null {
2576            continue;
2577        }
2578        let fk_vals: Vec<Value> = fk
2579            .columns
2580            .iter()
2581            .map(|&ci| new_row[ci as usize].clone())
2582            .collect();
2583        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2584        let found = wtx
2585            .table_get(fk.foreign_table.as_bytes(), &fk_key)
2586            .map_err(SqlError::Storage)?;
2587        if found.is_none() {
2588            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
2589            return Err(SqlError::ForeignKeyViolation(name.to_string()));
2590        }
2591    }
2592
2593    let has_indices = !table_schema.indices.is_empty();
2594    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
2595        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
2596    } else {
2597        Vec::new()
2598    };
2599    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
2600        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
2601    } else {
2602        Vec::new()
2603    };
2604
2605    let non_pk = table_schema.non_pk_indices();
2606    let enc_pos = table_schema.encoding_positions();
2607    let phys_count = table_schema.physical_non_pk_count();
2608    let dropped = table_schema.dropped_non_pk_slots();
2609    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
2610    for &slot in dropped {
2611        value_values[slot as usize] = Value::Null;
2612    }
2613    for (j, &i) in non_pk.iter().enumerate() {
2614        let col = &table_schema.columns[i];
2615        value_values[enc_pos[j] as usize] = if matches!(
2616            col.generated_kind,
2617            Some(crate::parser::GeneratedKind::Virtual)
2618        ) {
2619            Value::Null
2620        } else {
2621            new_row[i].clone()
2622        };
2623    }
2624    let mut new_value_buf = Vec::with_capacity(256);
2625    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
2626
2627    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2628        return Err(SqlError::RowTooLarge {
2629            size: new_value_buf.len(),
2630            max: citadel_core::MAX_VALUE_SIZE,
2631        });
2632    }
2633
2634    if pk_changed {
2635        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
2636        let inserted = wtx
2637            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
2638            .map_err(SqlError::Storage)?;
2639        if !inserted {
2640            return Err(SqlError::DuplicateKey);
2641        }
2642        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
2643            .map_err(SqlError::Storage)?;
2644        for idx in &table_schema.indices {
2645            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2646            let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2647            wtx.table_delete(&idx_table, &old_idx_key)
2648                .map_err(SqlError::Storage)?;
2649            let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2650            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2651            let is_new = wtx
2652                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2653                .map_err(SqlError::Storage)?;
2654            if idx.unique && !is_new {
2655                let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2656                if !any_null {
2657                    return Err(SqlError::UniqueViolation(idx.name.clone()));
2658                }
2659            }
2660        }
2661    } else {
2662        wtx.table_update_sorted(
2663            table_schema.name.as_bytes(),
2664            &[(old_pk_key, new_value_buf.as_slice())],
2665        )
2666        .map_err(SqlError::Storage)?;
2667        let col_map_partial =
2668            any_partial_index(table_schema).then(|| ColumnMap::new(&table_schema.columns));
2669        for idx in &table_schema.indices {
2670            let cols_changed = index_columns_changed(idx, old_row, &new_row);
2671            let (del, ins) = partial_idx_update_actions(
2672                idx,
2673                old_row,
2674                &new_row,
2675                cols_changed,
2676                false,
2677                col_map_partial.as_ref(),
2678            );
2679            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2680            if del {
2681                let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2682                wtx.table_delete(&idx_table, &old_idx_key)
2683                    .map_err(SqlError::Storage)?;
2684            }
2685            if ins {
2686                let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2687                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2688                let is_new = wtx
2689                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2690                    .map_err(SqlError::Storage)?;
2691                if idx.unique && !is_new {
2692                    let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2693                    if !any_null {
2694                        return Err(SqlError::UniqueViolation(idx.name.clone()));
2695                    }
2696                }
2697            }
2698        }
2699    }
2700
2701    if capture_returning {
2702        Ok(InsertRowOutcome::Updated {
2703            old: old_row.to_vec(),
2704            new: new_row,
2705        })
2706    } else {
2707        Ok(InsertRowOutcome::Inserted)
2708    }
2709}
2710
2711fn detect_fast_paths(
2712    ts: &TableSchema,
2713    assignments: &[(usize, Expr)],
2714) -> Option<Vec<DoUpdateFastPath>> {
2715    let non_pk = ts.non_pk_indices();
2716    let enc_pos = ts.encoding_positions();
2717    let mut out = Vec::with_capacity(assignments.len());
2718    for (col_idx, expr) in assignments {
2719        let col = &ts.columns[*col_idx];
2720        if col.data_type != DataType::Integer {
2721            return None;
2722        }
2723        let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
2724        let phys_idx = enc_pos[nonpk_order] as usize;
2725
2726        if let Expr::BinaryOp { left, op, right } = expr {
2727            if !matches!(op, BinOp::Add | BinOp::Sub) {
2728                return None;
2729            }
2730            let reads_target =
2731                matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
2732            if !reads_target {
2733                return None;
2734            }
2735            if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
2736                let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
2737                let _ = col_idx;
2738                out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
2739                continue;
2740            }
2741            return None;
2742        }
2743        return None;
2744    }
2745    Some(out)
2746}
2747
2748fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
2749    let target = oc
2750        .target
2751        .as_ref()
2752        .map(|t| resolve_conflict_target(t, ts))
2753        .transpose()?;
2754    match &oc.action {
2755        OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
2756        OnConflictAction::DoUpdate {
2757            assignments,
2758            where_clause,
2759        } => {
2760            let target = target.ok_or_else(|| {
2761                SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
2762            })?;
2763            let compiled_assignments: Vec<(usize, Expr)> = assignments
2764                .iter()
2765                .map(|(name, expr)| {
2766                    let col_idx = ts
2767                        .column_index(name)
2768                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
2769                    Ok((col_idx, expr.clone()))
2770                })
2771                .collect::<Result<_>>()?;
2772            let fast_paths = if where_clause.is_none() {
2773                detect_fast_paths(ts, &compiled_assignments)
2774            } else {
2775                None
2776            };
2777            Ok(CompiledOnConflict::DoUpdate {
2778                target,
2779                assignments: compiled_assignments,
2780                where_clause: where_clause.clone(),
2781                fast_paths,
2782            })
2783        }
2784    }
2785}
2786
2787/// Caller MUST check `cache.is_trivial_fast` first.
2788fn exec_insert_trivial_fast(
2789    wtx: &mut WriteTxn<'_>,
2790    table_lower: &str,
2791    cache: &InsertCache,
2792    bufs: &mut InsertBufs,
2793    params: &[Value],
2794) -> Result<ExecutionResult> {
2795    let prog = cache
2796        .trivial_fast_program
2797        .as_ref()
2798        .expect("trivial fast: program");
2799
2800    for &p in &prog.not_null_param_indices {
2801        if params[p as usize].is_null() {
2802            return Err(SqlError::NotNullViolation(format!("param@{p}")));
2803        }
2804    }
2805
2806    match &params[prog.pk_param as usize] {
2807        Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
2808        _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
2809    }
2810
2811    bufs.value_buf.clear();
2812    bufs.value_buf.extend_from_slice(&prog.template);
2813
2814    for op in &prog.ops {
2815        match op {
2816            WriteOp::ParamI64 { param_idx, off } => match &params[*param_idx as usize] {
2817                Value::Integer(v) => {
2818                    let off = *off as usize;
2819                    bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
2820                }
2821                other => {
2822                    return Err(SqlError::TypeMismatch {
2823                        expected: "Integer".into(),
2824                        got: other.data_type().to_string(),
2825                    });
2826                }
2827            },
2828            WriteOp::LiteralI64 { value, off } => {
2829                let off = *off as usize;
2830                bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
2831            }
2832            WriteOp::GenAddParamsI64 {
2833                a_param,
2834                b_param,
2835                off,
2836                bitmap_byte_off,
2837                bitmap_bit_mask,
2838            } => match (&params[*a_param as usize], &params[*b_param as usize]) {
2839                (Value::Integer(a), Value::Integer(b)) => {
2840                    let off = *off as usize;
2841                    bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
2842                }
2843                _ => {
2844                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2845                }
2846            },
2847            WriteOp::GenMulAddParamI64 {
2848                param_idx,
2849                mul,
2850                add,
2851                off,
2852                bitmap_byte_off,
2853                bitmap_bit_mask,
2854            } => match &params[*param_idx as usize] {
2855                Value::Integer(v) => {
2856                    let r = v.wrapping_mul(*mul).wrapping_add(*add);
2857                    let off = *off as usize;
2858                    bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
2859                }
2860                _ => {
2861                    bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2862                }
2863            },
2864        }
2865    }
2866
2867    let is_new = wtx
2868        .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
2869        .map_err(SqlError::Storage)?;
2870    if !is_new {
2871        return Err(SqlError::DuplicateKey);
2872    }
2873    Ok(ExecutionResult::RowsAffected(1))
2874}
2875
2876fn build_bind_plan(
2877    stmt: &InsertStmt,
2878    col_indices: &[usize],
2879    col_data_types: &[DataType],
2880) -> Option<Vec<BindAction>> {
2881    let rows = match &stmt.source {
2882        InsertSource::Values(rows) => rows,
2883        _ => return None,
2884    };
2885    if rows.len() != 1 {
2886        return None;
2887    }
2888    let value_row = &rows[0];
2889    if value_row.len() != col_indices.len() {
2890        return None;
2891    }
2892    let mut plan = Vec::with_capacity(value_row.len());
2893    for (i, expr) in value_row.iter().enumerate() {
2894        let col_idx = col_indices[i];
2895        let target = col_data_types[col_idx];
2896        match expr {
2897            Expr::Parameter(n) => {
2898                if *n == 0 {
2899                    return None;
2900                }
2901                plan.push(BindAction::Param {
2902                    param_idx: n - 1,
2903                    col_idx,
2904                    target,
2905                });
2906            }
2907            Expr::Literal(v) => plan.push(BindAction::Literal {
2908                value: v.clone(),
2909                col_idx,
2910            }),
2911            _ => return None,
2912        }
2913    }
2914    Some(plan)
2915}
2916
2917impl CompiledInsert {
2918    pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
2919        let lower = stmt.table.to_ascii_lowercase();
2920        let cached = if let Some(ts) = schema.get(&lower) {
2921            let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
2922                ts.columns.iter().map(|c| c.name.as_str()).collect()
2923            } else {
2924                stmt.columns.iter().map(|s| s.as_str()).collect()
2925            };
2926            let mut col_indices = Vec::with_capacity(insert_columns.len());
2927            for name in &insert_columns {
2928                col_indices.push(ts.column_index(name)?);
2929            }
2930            if col_indices
2931                .iter()
2932                .any(|&ci| ts.columns[ci].generated_kind.is_some())
2933            {
2934                return None;
2935            }
2936            let on_conflict = stmt
2937                .on_conflict
2938                .as_ref()
2939                .map(|oc| compile_on_conflict(oc, ts))
2940                .transpose()
2941                .ok()
2942                .flatten()
2943                .map(Arc::new);
2944            let generated_col_positions: Vec<usize> = ts
2945                .columns
2946                .iter()
2947                .enumerate()
2948                .filter_map(|(i, c)| {
2949                    matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
2950                        .then_some(i)
2951                })
2952                .collect();
2953            let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
2954                .iter()
2955                .map(|&pos| {
2956                    detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
2957                })
2958                .collect();
2959            let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
2960                Some(ColumnMap::new(&ts.columns))
2961            } else {
2962                None
2963            };
2964            let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
2965            let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
2966            let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
2967            let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
2968            let phys_count = ts.physical_non_pk_count();
2969            let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
2970            let single_int_pk =
2971                pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
2972            let not_null_indices: Vec<u16> = ts
2973                .columns
2974                .iter()
2975                .filter(|c| !c.nullable)
2976                .map(|c| c.position)
2977                .collect();
2978            let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
2979            let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
2980            let row_fully_overwritten = if any_defaults_flag {
2981                false
2982            } else {
2983                let mut covered: rustc_hash::FxHashSet<usize> =
2984                    col_indices.iter().copied().collect();
2985                covered.extend(generated_col_positions.iter().copied());
2986                for (j, &i) in non_pk_indices.iter().enumerate() {
2987                    let _ = j;
2988                    if matches!(
2989                        ts.columns[i].generated_kind,
2990                        Some(crate::parser::GeneratedKind::Virtual)
2991                    ) {
2992                        covered.insert(i);
2993                    }
2994                }
2995                bind_plan.is_some() && covered.len() == ts.columns.len()
2996            };
2997            let has_fks = !ts.foreign_keys.is_empty();
2998            let has_indices = !ts.indices.is_empty();
2999            let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3000            let mut null_value_slots: Vec<usize> =
3001                dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3002            for (j, &i) in non_pk_indices.iter().enumerate() {
3003                let slot = encoding_positions[j] as usize;
3004                if matches!(
3005                    ts.columns[i].generated_kind,
3006                    Some(crate::parser::GeneratedKind::Virtual)
3007                ) {
3008                    null_value_slots.push(slot);
3009                } else {
3010                    non_virtual_pairs.push((i, slot));
3011                }
3012            }
3013            let row_encoder = {
3014                let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3015                    let col = &ts.columns[i];
3016                    if matches!(
3017                        col.generated_kind,
3018                        Some(crate::parser::GeneratedKind::Virtual)
3019                    ) {
3020                        true
3021                    } else {
3022                        col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3023                    }
3024                });
3025                if all_int_or_null {
3026                    let mut null_slots: Vec<usize> =
3027                        dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3028                    for (j, &i) in non_pk_indices.iter().enumerate() {
3029                        if matches!(
3030                            ts.columns[i].generated_kind,
3031                            Some(crate::parser::GeneratedKind::Virtual)
3032                        ) {
3033                            null_slots.push(encoding_positions[j] as usize);
3034                        }
3035                    }
3036                    Some(crate::encoding::build_int_row_template(
3037                        phys_count,
3038                        &null_slots,
3039                    ))
3040                } else {
3041                    None
3042                }
3043            };
3044            let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3045                && !ts.columns.iter().any(|c| c.default_expr.is_some())
3046                && !ts.has_checks()
3047                && !has_fks
3048                && !has_indices
3049                && stmt.on_conflict.is_none()
3050                && stmt.returning.is_none()
3051                && bind_plan.is_some()
3052                && row_encoder.is_some()
3053                && row_fully_overwritten
3054                && single_int_pk
3055                && generated_fast_evals
3056                    .iter()
3057                    .all(|fe| !matches!(fe, FastGenEval::None));
3058            let trivial_fast_program = if is_trivial_fast_eligible {
3059                build_trivial_fast_program(
3060                    bind_plan.as_ref().unwrap(),
3061                    row_encoder.as_ref().unwrap(),
3062                    &non_virtual_pairs,
3063                    &generated_col_positions,
3064                    &generated_fast_evals,
3065                    &pk_indices,
3066                    &ts.columns,
3067                )
3068            } else {
3069                None
3070            };
3071            let is_trivial_fast = trivial_fast_program.is_some();
3072            Some(InsertCache {
3073                col_indices,
3074                has_subquery: insert_has_subquery(stmt),
3075                any_defaults: ts.columns.iter().any(|c| c.default_expr.is_some()),
3076                has_checks: ts.has_checks(),
3077                on_conflict,
3078                row_col_map,
3079                generated_col_positions,
3080                generated_fast_evals,
3081                pk_indices,
3082                non_pk_indices,
3083                encoding_positions,
3084                dropped_non_pk_slots,
3085                phys_count,
3086                single_int_pk,
3087                not_null_indices,
3088                bind_plan,
3089                row_fully_overwritten,
3090                row_encoder,
3091                is_trivial_fast,
3092                trivial_fast_program,
3093            })
3094        } else if schema.get_view(&lower).is_some() {
3095            None
3096        } else {
3097            return None;
3098        };
3099        Some(Self {
3100            table_lower: lower,
3101            cached,
3102        })
3103    }
3104}
3105
3106impl CompiledPlan for CompiledInsert {
3107    fn execute(
3108        &self,
3109        db: &Database,
3110        schema: &SchemaManager,
3111        stmt: &Statement,
3112        params: &[Value],
3113        wtx: Option<&mut WriteTxn<'_>>,
3114    ) -> Result<ExecutionResult> {
3115        let ins = match stmt {
3116            Statement::Insert(i) => i,
3117            _ => {
3118                return Err(SqlError::Unsupported(
3119                    "CompiledInsert received non-INSERT statement".into(),
3120                ))
3121            }
3122        };
3123        match wtx {
3124            None => exec_insert(db, schema, ins, params),
3125            Some(outer) => match self.cached.as_ref() {
3126                Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3127                    exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3128                }),
3129                Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3130                None => exec_insert_in_txn(outer, schema, ins, params),
3131            },
3132        }
3133    }
3134
3135    fn uses_scoped_params(&self) -> bool {
3136        match self.cached.as_ref() {
3137            Some(c) => !c.is_trivial_fast,
3138            None => true,
3139        }
3140    }
3141}
3142
3143pub struct CompiledDelete {
3144    table_lower: String,
3145}
3146
3147impl CompiledDelete {
3148    pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3149        let lower = stmt.table.to_ascii_lowercase();
3150        schema.get(&lower)?;
3151        Some(Self { table_lower: lower })
3152    }
3153}
3154
3155impl CompiledPlan for CompiledDelete {
3156    fn execute(
3157        &self,
3158        db: &Database,
3159        schema: &SchemaManager,
3160        stmt: &Statement,
3161        _params: &[Value],
3162        wtx: Option<&mut WriteTxn<'_>>,
3163    ) -> Result<ExecutionResult> {
3164        let del = match stmt {
3165            Statement::Delete(d) => d,
3166            _ => {
3167                return Err(SqlError::Unsupported(
3168                    "CompiledDelete received non-DELETE statement".into(),
3169                ))
3170            }
3171        };
3172        let _ = &self.table_lower;
3173        match wtx {
3174            None => super::write::exec_delete(db, schema, del),
3175            Some(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3176        }
3177    }
3178}