Skip to main content

citadel_sql/executor/
dml.rs

1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::write_txn::WriteTxn;
7use rustc_hash::FxHashMap;
8
9use crate::encoding::{encode_composite_key_into, encode_row_into};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
12use crate::parser::*;
13use crate::types::*;
14
15use crate::schema::SchemaManager;
16
17use super::compile::CompiledPlan;
18use super::helpers::*;
19use super::CteContext;
20
21pub(super) fn exec_insert(
22    db: &Database,
23    schema: &SchemaManager,
24    stmt: &InsertStmt,
25    params: &[Value],
26) -> Result<ExecutionResult> {
27    let empty_ctes = CteContext::default();
28    let materialized;
29    let stmt = if insert_has_subquery(stmt) {
30        materialized = materialize_insert(stmt, &mut |sub| {
31            exec_subquery_read(db, schema, sub, &empty_ctes)
32        })?;
33        &materialized
34    } else {
35        stmt
36    };
37
38    let lower_name = stmt.table.to_ascii_lowercase();
39    if schema.get_view(&lower_name).is_some() {
40        return Err(SqlError::CannotModifyView(stmt.table.clone()));
41    }
42    let table_schema = schema
43        .get(&lower_name)
44        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
45
46    let insert_columns = if stmt.columns.is_empty() {
47        table_schema
48            .columns
49            .iter()
50            .map(|c| c.name.clone())
51            .collect::<Vec<_>>()
52    } else {
53        stmt.columns
54            .iter()
55            .map(|c| c.to_ascii_lowercase())
56            .collect()
57    };
58
59    let col_indices: Vec<usize> = insert_columns
60        .iter()
61        .map(|name| {
62            table_schema
63                .column_index(name)
64                .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
65        })
66        .collect::<Result<_>>()?;
67
68    let defaults: Vec<(usize, &Expr)> = table_schema
69        .columns
70        .iter()
71        .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
72        .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
73        .collect();
74
75    // ColumnMap for CHECK evaluation
76    let has_checks = table_schema.has_checks();
77    let check_col_map = if has_checks {
78        Some(ColumnMap::new(&table_schema.columns))
79    } else {
80        None
81    };
82
83    let select_rows = match &stmt.source {
84        InsertSource::Select(sq) => {
85            let insert_ctes =
86                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
87                    exec_query_body_read(db, schema, body, ctx)
88                })?;
89            let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
90            Some(qr.rows)
91        }
92        InsertSource::Values(_) => None,
93    };
94
95    let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
96        .on_conflict
97        .as_ref()
98        .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
99        .transpose()?;
100
101    let row_col_map = compiled_conflict
102        .as_ref()
103        .map(|_| ColumnMap::new(&table_schema.columns));
104
105    let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
106    let mut count: u64 = 0;
107
108    let pk_indices = table_schema.pk_indices();
109    let non_pk = table_schema.non_pk_indices();
110    let enc_pos = table_schema.encoding_positions();
111    let phys_count = table_schema.physical_non_pk_count();
112    let mut row = vec![Value::Null; table_schema.columns.len()];
113    let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
114    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
115    let mut key_buf: Vec<u8> = Vec::with_capacity(64);
116    let mut value_buf: Vec<u8> = Vec::with_capacity(256);
117    let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
118
119    let values = match &stmt.source {
120        InsertSource::Values(rows) => Some(rows.as_slice()),
121        InsertSource::Select(_) => None,
122    };
123    let sel_rows = select_rows.as_deref();
124
125    let total = match (values, sel_rows) {
126        (Some(rows), _) => rows.len(),
127        (_, Some(rows)) => rows.len(),
128        _ => 0,
129    };
130
131    if let Some(sel) = sel_rows {
132        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
133            return Err(SqlError::InvalidValue(format!(
134                "INSERT ... SELECT column count mismatch: expected {}, got {}",
135                insert_columns.len(),
136                sel[0].len()
137            )));
138        }
139    }
140
141    for idx in 0..total {
142        for v in row.iter_mut() {
143            *v = Value::Null;
144        }
145
146        if let Some(value_rows) = values {
147            let value_row = &value_rows[idx];
148            if value_row.len() != insert_columns.len() {
149                return Err(SqlError::InvalidValue(format!(
150                    "expected {} values, got {}",
151                    insert_columns.len(),
152                    value_row.len()
153                )));
154            }
155            for (i, expr) in value_row.iter().enumerate() {
156                let val = if let Expr::Parameter(n) = expr {
157                    params
158                        .get(n - 1)
159                        .cloned()
160                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
161                } else {
162                    eval_const_expr(expr)?
163                };
164                let col_idx = col_indices[i];
165                let col = &table_schema.columns[col_idx];
166                let got_type = val.data_type();
167                row[col_idx] = if val.is_null() {
168                    Value::Null
169                } else {
170                    val.coerce_into(col.data_type)
171                        .ok_or_else(|| SqlError::TypeMismatch {
172                            expected: col.data_type.to_string(),
173                            got: got_type.to_string(),
174                        })?
175                };
176            }
177        } else if let Some(sel) = sel_rows {
178            let sel_row = &sel[idx];
179            for (i, val) in sel_row.iter().enumerate() {
180                let col_idx = col_indices[i];
181                let col = &table_schema.columns[col_idx];
182                let got_type = val.data_type();
183                row[col_idx] = if val.is_null() {
184                    Value::Null
185                } else {
186                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
187                        SqlError::TypeMismatch {
188                            expected: col.data_type.to_string(),
189                            got: got_type.to_string(),
190                        }
191                    })?
192                };
193            }
194        }
195
196        for &(pos, def_expr) in &defaults {
197            let val = eval_const_expr(def_expr)?;
198            let col = &table_schema.columns[pos];
199            if val.is_null() {
200                // row[pos] already Null from init
201            } else {
202                let got_type = val.data_type();
203                row[pos] =
204                    val.coerce_into(col.data_type)
205                        .ok_or_else(|| SqlError::TypeMismatch {
206                            expected: col.data_type.to_string(),
207                            got: got_type.to_string(),
208                        })?;
209            }
210        }
211
212        for col in &table_schema.columns {
213            if !col.nullable && row[col.position as usize].is_null() {
214                return Err(SqlError::NotNullViolation(col.name.clone()));
215            }
216        }
217
218        if let Some(ref col_map) = check_col_map {
219            for col in &table_schema.columns {
220                if let Some(ref check) = col.check_expr {
221                    let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
222                    if !is_truthy(&result) && !result.is_null() {
223                        let name = col.check_name.as_deref().unwrap_or(&col.name);
224                        return Err(SqlError::CheckViolation(name.to_string()));
225                    }
226                }
227            }
228            for tc in &table_schema.check_constraints {
229                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
230                if !is_truthy(&result) && !result.is_null() {
231                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
232                    return Err(SqlError::CheckViolation(name.to_string()));
233                }
234            }
235        }
236
237        for fk in &table_schema.foreign_keys {
238            let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
239            if any_null {
240                continue; // MATCH SIMPLE: skip if any FK col is NULL
241            }
242            let fk_vals: Vec<Value> = fk
243                .columns
244                .iter()
245                .map(|&ci| row[ci as usize].clone())
246                .collect();
247            fk_key_buf.clear();
248            encode_composite_key_into(&fk_vals, &mut fk_key_buf);
249            let found = wtx
250                .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
251                .map_err(SqlError::Storage)?;
252            if found.is_none() {
253                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
254                return Err(SqlError::ForeignKeyViolation(name.to_string()));
255            }
256        }
257
258        for (j, &i) in pk_indices.iter().enumerate() {
259            pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
260        }
261        encode_composite_key_into(&pk_values, &mut key_buf);
262
263        for (j, &i) in non_pk.iter().enumerate() {
264            value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
265        }
266        encode_row_into(&value_values, &mut value_buf);
267
268        if key_buf.len() > citadel_core::MAX_KEY_SIZE {
269            return Err(SqlError::KeyTooLarge {
270                size: key_buf.len(),
271                max: citadel_core::MAX_KEY_SIZE,
272            });
273        }
274        if value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
275            return Err(SqlError::RowTooLarge {
276                size: value_buf.len(),
277                max: citadel_core::MAX_INLINE_VALUE_SIZE,
278            });
279        }
280
281        match compiled_conflict.as_ref() {
282            None => {
283                let is_new = wtx
284                    .table_insert(stmt.table.as_bytes(), &key_buf, &value_buf)
285                    .map_err(SqlError::Storage)?;
286                if !is_new {
287                    return Err(SqlError::DuplicateKey);
288                }
289                if !table_schema.indices.is_empty() {
290                    for (j, &i) in pk_indices.iter().enumerate() {
291                        row[i] = pk_values[j].clone();
292                    }
293                    for (j, &i) in non_pk.iter().enumerate() {
294                        row[i] =
295                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
296                    }
297                    insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
298                }
299                count += 1;
300            }
301            Some(oc) => {
302                let oc_ref: &CompiledOnConflict = oc;
303                let needs_row = upsert_needs_row(oc_ref, table_schema);
304                if needs_row {
305                    for (j, &i) in pk_indices.iter().enumerate() {
306                        row[i] = pk_values[j].clone();
307                    }
308                    for (j, &i) in non_pk.iter().enumerate() {
309                        row[i] =
310                            std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
311                    }
312                }
313                match apply_insert_with_conflict(
314                    &mut wtx,
315                    table_schema,
316                    &key_buf,
317                    &value_buf,
318                    &row,
319                    &pk_values,
320                    oc_ref,
321                    row_col_map.as_ref().unwrap(),
322                )? {
323                    InsertRowOutcome::Inserted => count += 1,
324                    InsertRowOutcome::Skipped => {}
325                }
326            }
327        }
328    }
329
330    wtx.commit().map_err(SqlError::Storage)?;
331    Ok(ExecutionResult::RowsAffected(count))
332}
333
334pub(super) fn has_subquery(expr: &Expr) -> bool {
335    crate::parser::has_subquery(expr)
336}
337
338pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
339    if let Some(ref w) = stmt.where_clause {
340        if has_subquery(w) {
341            return true;
342        }
343    }
344    if let Some(ref h) = stmt.having {
345        if has_subquery(h) {
346            return true;
347        }
348    }
349    for col in &stmt.columns {
350        if let SelectColumn::Expr { expr, .. } = col {
351            if has_subquery(expr) {
352                return true;
353            }
354        }
355    }
356    for ob in &stmt.order_by {
357        if has_subquery(&ob.expr) {
358            return true;
359        }
360    }
361    for join in &stmt.joins {
362        if let Some(ref on_expr) = join.on_clause {
363            if has_subquery(on_expr) {
364                return true;
365            }
366        }
367    }
368    false
369}
370
371pub(super) fn materialize_expr(
372    expr: &Expr,
373    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
374) -> Result<Expr> {
375    match expr {
376        Expr::InSubquery {
377            expr: e,
378            subquery,
379            negated,
380        } => {
381            let inner = materialize_expr(e, exec_sub)?;
382            let qr = exec_sub(subquery)?;
383            if !qr.columns.is_empty() && qr.columns.len() != 1 {
384                return Err(SqlError::SubqueryMultipleColumns);
385            }
386            let mut values = std::collections::HashSet::new();
387            let mut has_null = false;
388            for row in &qr.rows {
389                if row[0].is_null() {
390                    has_null = true;
391                } else {
392                    values.insert(row[0].clone());
393                }
394            }
395            Ok(Expr::InSet {
396                expr: Box::new(inner),
397                values,
398                has_null,
399                negated: *negated,
400            })
401        }
402        Expr::ScalarSubquery(subquery) => {
403            let qr = exec_sub(subquery)?;
404            if qr.rows.len() > 1 {
405                return Err(SqlError::SubqueryMultipleRows);
406            }
407            let val = if qr.rows.is_empty() {
408                Value::Null
409            } else {
410                qr.rows[0][0].clone()
411            };
412            Ok(Expr::Literal(val))
413        }
414        Expr::Exists { subquery, negated } => {
415            let qr = exec_sub(subquery)?;
416            let exists = !qr.rows.is_empty();
417            let result = if *negated { !exists } else { exists };
418            Ok(Expr::Literal(Value::Boolean(result)))
419        }
420        Expr::InList {
421            expr: e,
422            list,
423            negated,
424        } => {
425            let inner = materialize_expr(e, exec_sub)?;
426            let items = list
427                .iter()
428                .map(|item| materialize_expr(item, exec_sub))
429                .collect::<Result<Vec<_>>>()?;
430            Ok(Expr::InList {
431                expr: Box::new(inner),
432                list: items,
433                negated: *negated,
434            })
435        }
436        Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
437            left: Box::new(materialize_expr(left, exec_sub)?),
438            op: *op,
439            right: Box::new(materialize_expr(right, exec_sub)?),
440        }),
441        Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
442            op: *op,
443            expr: Box::new(materialize_expr(e, exec_sub)?),
444        }),
445        Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
446        Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
447        Expr::InSet {
448            expr: e,
449            values,
450            has_null,
451            negated,
452        } => Ok(Expr::InSet {
453            expr: Box::new(materialize_expr(e, exec_sub)?),
454            values: values.clone(),
455            has_null: *has_null,
456            negated: *negated,
457        }),
458        Expr::Between {
459            expr: e,
460            low,
461            high,
462            negated,
463        } => Ok(Expr::Between {
464            expr: Box::new(materialize_expr(e, exec_sub)?),
465            low: Box::new(materialize_expr(low, exec_sub)?),
466            high: Box::new(materialize_expr(high, exec_sub)?),
467            negated: *negated,
468        }),
469        Expr::Like {
470            expr: e,
471            pattern,
472            escape,
473            negated,
474        } => {
475            let esc = escape
476                .as_ref()
477                .map(|es| materialize_expr(es, exec_sub).map(Box::new))
478                .transpose()?;
479            Ok(Expr::Like {
480                expr: Box::new(materialize_expr(e, exec_sub)?),
481                pattern: Box::new(materialize_expr(pattern, exec_sub)?),
482                escape: esc,
483                negated: *negated,
484            })
485        }
486        Expr::Case {
487            operand,
488            conditions,
489            else_result,
490        } => {
491            let op = operand
492                .as_ref()
493                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
494                .transpose()?;
495            let conds = conditions
496                .iter()
497                .map(|(c, r)| {
498                    Ok((
499                        materialize_expr(c, exec_sub)?,
500                        materialize_expr(r, exec_sub)?,
501                    ))
502                })
503                .collect::<Result<Vec<_>>>()?;
504            let else_r = else_result
505                .as_ref()
506                .map(|e| materialize_expr(e, exec_sub).map(Box::new))
507                .transpose()?;
508            Ok(Expr::Case {
509                operand: op,
510                conditions: conds,
511                else_result: else_r,
512            })
513        }
514        Expr::Coalesce(args) => {
515            let materialized = args
516                .iter()
517                .map(|a| materialize_expr(a, exec_sub))
518                .collect::<Result<Vec<_>>>()?;
519            Ok(Expr::Coalesce(materialized))
520        }
521        Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
522            expr: Box::new(materialize_expr(e, exec_sub)?),
523            data_type: *data_type,
524        }),
525        Expr::Function {
526            name,
527            args,
528            distinct,
529        } => {
530            let materialized = args
531                .iter()
532                .map(|a| materialize_expr(a, exec_sub))
533                .collect::<Result<Vec<_>>>()?;
534            Ok(Expr::Function {
535                name: name.clone(),
536                args: materialized,
537                distinct: *distinct,
538            })
539        }
540        other => Ok(other.clone()),
541    }
542}
543
544pub(super) fn materialize_stmt(
545    stmt: &SelectStmt,
546    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
547) -> Result<SelectStmt> {
548    let where_clause = stmt
549        .where_clause
550        .as_ref()
551        .map(|e| materialize_expr(e, exec_sub))
552        .transpose()?;
553    let having = stmt
554        .having
555        .as_ref()
556        .map(|e| materialize_expr(e, exec_sub))
557        .transpose()?;
558    let columns = stmt
559        .columns
560        .iter()
561        .map(|c| match c {
562            SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
563            SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
564                expr: materialize_expr(expr, exec_sub)?,
565                alias: alias.clone(),
566            }),
567        })
568        .collect::<Result<Vec<_>>>()?;
569    let order_by = stmt
570        .order_by
571        .iter()
572        .map(|ob| {
573            Ok(OrderByItem {
574                expr: materialize_expr(&ob.expr, exec_sub)?,
575                descending: ob.descending,
576                nulls_first: ob.nulls_first,
577            })
578        })
579        .collect::<Result<Vec<_>>>()?;
580    let joins = stmt
581        .joins
582        .iter()
583        .map(|j| {
584            let on_clause = j
585                .on_clause
586                .as_ref()
587                .map(|e| materialize_expr(e, exec_sub))
588                .transpose()?;
589            Ok(JoinClause {
590                join_type: j.join_type,
591                table: j.table.clone(),
592                on_clause,
593            })
594        })
595        .collect::<Result<Vec<_>>>()?;
596    let group_by = stmt
597        .group_by
598        .iter()
599        .map(|e| materialize_expr(e, exec_sub))
600        .collect::<Result<Vec<_>>>()?;
601    Ok(SelectStmt {
602        columns,
603        from: stmt.from.clone(),
604        from_alias: stmt.from_alias.clone(),
605        joins,
606        distinct: stmt.distinct,
607        where_clause,
608        order_by,
609        limit: stmt.limit.clone(),
610        offset: stmt.offset.clone(),
611        group_by,
612        having,
613    })
614}
615
616pub(super) fn exec_subquery_read(
617    db: &Database,
618    schema: &SchemaManager,
619    stmt: &SelectStmt,
620    ctes: &CteContext,
621) -> Result<QueryResult> {
622    match super::exec_select(db, schema, stmt, ctes)? {
623        ExecutionResult::Query(qr) => Ok(qr),
624        _ => Ok(QueryResult {
625            columns: vec![],
626            rows: vec![],
627        }),
628    }
629}
630
631pub(super) fn exec_subquery_write(
632    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
633    schema: &SchemaManager,
634    stmt: &SelectStmt,
635    ctes: &CteContext,
636) -> Result<QueryResult> {
637    match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
638        ExecutionResult::Query(qr) => Ok(qr),
639        _ => Ok(QueryResult {
640            columns: vec![],
641            rows: vec![],
642        }),
643    }
644}
645
646pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
647    stmt.where_clause.as_ref().is_some_and(has_subquery)
648        || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
649}
650
651pub(super) fn materialize_update(
652    stmt: &UpdateStmt,
653    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
654) -> Result<UpdateStmt> {
655    let where_clause = stmt
656        .where_clause
657        .as_ref()
658        .map(|e| materialize_expr(e, exec_sub))
659        .transpose()?;
660    let assignments = stmt
661        .assignments
662        .iter()
663        .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
664        .collect::<Result<Vec<_>>>()?;
665    Ok(UpdateStmt {
666        table: stmt.table.clone(),
667        assignments,
668        where_clause,
669    })
670}
671
672pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
673    stmt.where_clause.as_ref().is_some_and(has_subquery)
674}
675
676pub(super) fn materialize_delete(
677    stmt: &DeleteStmt,
678    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
679) -> Result<DeleteStmt> {
680    let where_clause = stmt
681        .where_clause
682        .as_ref()
683        .map(|e| materialize_expr(e, exec_sub))
684        .transpose()?;
685    Ok(DeleteStmt {
686        table: stmt.table.clone(),
687        where_clause,
688    })
689}
690
691pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
692    match &stmt.source {
693        InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
694        // SELECT source subqueries are handled by exec_select's correlated/non-correlated paths
695        InsertSource::Select(_) => false,
696    }
697}
698
699pub(super) fn materialize_insert(
700    stmt: &InsertStmt,
701    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
702) -> Result<InsertStmt> {
703    let source = match &stmt.source {
704        InsertSource::Values(rows) => {
705            let mat = rows
706                .iter()
707                .map(|row| {
708                    row.iter()
709                        .map(|e| materialize_expr(e, exec_sub))
710                        .collect::<Result<Vec<_>>>()
711                })
712                .collect::<Result<Vec<_>>>()?;
713            InsertSource::Values(mat)
714        }
715        InsertSource::Select(sq) => {
716            let ctes = sq
717                .ctes
718                .iter()
719                .map(|c| {
720                    Ok(CteDefinition {
721                        name: c.name.clone(),
722                        column_aliases: c.column_aliases.clone(),
723                        body: materialize_query_body(&c.body, exec_sub)?,
724                    })
725                })
726                .collect::<Result<Vec<_>>>()?;
727            let body = materialize_query_body(&sq.body, exec_sub)?;
728            InsertSource::Select(Box::new(SelectQuery {
729                ctes,
730                recursive: sq.recursive,
731                body,
732            }))
733        }
734    };
735    Ok(InsertStmt {
736        table: stmt.table.clone(),
737        columns: stmt.columns.clone(),
738        source,
739        on_conflict: stmt.on_conflict.clone(),
740    })
741}
742
743pub(super) fn materialize_query_body(
744    body: &QueryBody,
745    exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
746) -> Result<QueryBody> {
747    match body {
748        QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
749            sel, exec_sub,
750        )?))),
751        QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
752            op: comp.op.clone(),
753            all: comp.all,
754            left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
755            right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
756            order_by: comp.order_by.clone(),
757            limit: comp.limit.clone(),
758            offset: comp.offset.clone(),
759        }))),
760    }
761}
762
763pub(super) fn exec_query_body(
764    db: &Database,
765    schema: &SchemaManager,
766    body: &QueryBody,
767    ctes: &CteContext,
768) -> Result<ExecutionResult> {
769    match body {
770        QueryBody::Select(sel) => super::exec_select(db, schema, sel, ctes),
771        QueryBody::Compound(comp) => exec_compound_select(db, schema, comp, ctes),
772    }
773}
774
775pub(super) fn exec_query_body_in_txn(
776    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
777    schema: &SchemaManager,
778    body: &QueryBody,
779    ctes: &CteContext,
780) -> Result<ExecutionResult> {
781    match body {
782        QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
783        QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
784    }
785}
786
787pub(super) fn exec_query_body_read(
788    db: &Database,
789    schema: &SchemaManager,
790    body: &QueryBody,
791    ctes: &CteContext,
792) -> Result<QueryResult> {
793    match exec_query_body(db, schema, body, ctes)? {
794        ExecutionResult::Query(qr) => Ok(qr),
795        _ => Ok(QueryResult {
796            columns: vec![],
797            rows: vec![],
798        }),
799    }
800}
801
802pub(super) fn exec_query_body_write(
803    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
804    schema: &SchemaManager,
805    body: &QueryBody,
806    ctes: &CteContext,
807) -> Result<QueryResult> {
808    match exec_query_body_in_txn(wtx, schema, body, ctes)? {
809        ExecutionResult::Query(qr) => Ok(qr),
810        _ => Ok(QueryResult {
811            columns: vec![],
812            rows: vec![],
813        }),
814    }
815}
816
817pub(super) fn exec_compound_select(
818    db: &Database,
819    schema: &SchemaManager,
820    comp: &CompoundSelect,
821    ctes: &CteContext,
822) -> Result<ExecutionResult> {
823    let left_qr = match exec_query_body(db, schema, &comp.left, ctes)? {
824        ExecutionResult::Query(qr) => qr,
825        _ => QueryResult {
826            columns: vec![],
827            rows: vec![],
828        },
829    };
830    let right_qr = match exec_query_body(db, schema, &comp.right, ctes)? {
831        ExecutionResult::Query(qr) => qr,
832        _ => QueryResult {
833            columns: vec![],
834            rows: vec![],
835        },
836    };
837    apply_set_operation(comp, left_qr, right_qr)
838}
839
840pub(super) fn exec_compound_select_in_txn(
841    wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
842    schema: &SchemaManager,
843    comp: &CompoundSelect,
844    ctes: &CteContext,
845) -> Result<ExecutionResult> {
846    let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
847        ExecutionResult::Query(qr) => qr,
848        _ => QueryResult {
849            columns: vec![],
850            rows: vec![],
851        },
852    };
853    let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
854        ExecutionResult::Query(qr) => qr,
855        _ => QueryResult {
856            columns: vec![],
857            rows: vec![],
858        },
859    };
860    apply_set_operation(comp, left_qr, right_qr)
861}
862
863pub(super) fn apply_set_operation(
864    comp: &CompoundSelect,
865    left_qr: QueryResult,
866    right_qr: QueryResult,
867) -> Result<ExecutionResult> {
868    if !left_qr.columns.is_empty()
869        && !right_qr.columns.is_empty()
870        && left_qr.columns.len() != right_qr.columns.len()
871    {
872        return Err(SqlError::CompoundColumnCountMismatch {
873            left: left_qr.columns.len(),
874            right: right_qr.columns.len(),
875        });
876    }
877
878    let columns = left_qr.columns;
879
880    let mut rows = match (&comp.op, comp.all) {
881        (SetOp::Union, true) => {
882            let mut rows = left_qr.rows;
883            rows.extend(right_qr.rows);
884            rows
885        }
886        (SetOp::Union, false) => {
887            let mut seen: std::collections::HashSet<Vec<Value>> = std::collections::HashSet::new();
888            let mut rows = Vec::new();
889            for row in left_qr.rows.into_iter().chain(right_qr.rows) {
890                if !seen.contains(&row) {
891                    seen.insert(row.clone());
892                    rows.push(row);
893                }
894            }
895            rows
896        }
897        (SetOp::Intersect, true) => {
898            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
899            for row in &right_qr.rows {
900                *right_counts.entry(row.clone()).or_insert(0) += 1;
901            }
902            let mut rows = Vec::new();
903            for row in left_qr.rows {
904                if let Some(count) = right_counts.get_mut(&row) {
905                    if *count > 0 {
906                        *count -= 1;
907                        rows.push(row);
908                    }
909                }
910            }
911            rows
912        }
913        (SetOp::Intersect, false) => {
914            let right_set: std::collections::HashSet<Vec<Value>> =
915                right_qr.rows.into_iter().collect();
916            let mut seen: std::collections::HashSet<Vec<Value>> = std::collections::HashSet::new();
917            let mut rows = Vec::new();
918            for row in left_qr.rows {
919                if right_set.contains(&row) && !seen.contains(&row) {
920                    seen.insert(row.clone());
921                    rows.push(row);
922                }
923            }
924            rows
925        }
926        (SetOp::Except, true) => {
927            let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
928            for row in &right_qr.rows {
929                *right_counts.entry(row.clone()).or_insert(0) += 1;
930            }
931            let mut rows = Vec::new();
932            for row in left_qr.rows {
933                if let Some(count) = right_counts.get_mut(&row) {
934                    if *count > 0 {
935                        *count -= 1;
936                        continue;
937                    }
938                }
939                rows.push(row);
940            }
941            rows
942        }
943        (SetOp::Except, false) => {
944            let right_set: std::collections::HashSet<Vec<Value>> =
945                right_qr.rows.into_iter().collect();
946            let mut seen: std::collections::HashSet<Vec<Value>> = std::collections::HashSet::new();
947            let mut rows = Vec::new();
948            for row in left_qr.rows {
949                if !right_set.contains(&row) && !seen.contains(&row) {
950                    seen.insert(row.clone());
951                    rows.push(row);
952                }
953            }
954            rows
955        }
956    };
957
958    if !comp.order_by.is_empty() {
959        let col_defs: Vec<crate::types::ColumnDef> = columns
960            .iter()
961            .enumerate()
962            .map(|(i, name)| crate::types::ColumnDef {
963                name: name.clone(),
964                data_type: crate::types::DataType::Null,
965                nullable: true,
966                position: i as u16,
967                default_expr: None,
968                default_sql: None,
969                check_expr: None,
970                check_sql: None,
971                check_name: None,
972                is_with_timezone: false,
973            })
974            .collect();
975        sort_rows(&mut rows, &comp.order_by, &col_defs)?;
976    }
977
978    if let Some(ref offset_expr) = comp.offset {
979        let offset = eval_const_int(offset_expr)?.max(0) as usize;
980        if offset < rows.len() {
981            rows = rows.split_off(offset);
982        } else {
983            rows.clear();
984        }
985    }
986
987    if let Some(ref limit_expr) = comp.limit {
988        let limit = eval_const_int(limit_expr)?.max(0) as usize;
989        rows.truncate(limit);
990    }
991
992    Ok(ExecutionResult::Query(QueryResult { columns, rows }))
993}
994
995struct InsertBufs {
996    row: Vec<Value>,
997    pk_values: Vec<Value>,
998    value_values: Vec<Value>,
999    key_buf: Vec<u8>,
1000    value_buf: Vec<u8>,
1001    col_indices: Vec<usize>,
1002    fk_key_buf: Vec<u8>,
1003}
1004
1005impl InsertBufs {
1006    fn new() -> Self {
1007        Self {
1008            row: Vec::new(),
1009            pk_values: Vec::new(),
1010            value_values: Vec::new(),
1011            key_buf: Vec::with_capacity(64),
1012            value_buf: Vec::with_capacity(256),
1013            col_indices: Vec::new(),
1014            fk_key_buf: Vec::with_capacity(64),
1015        }
1016    }
1017}
1018
1019thread_local! {
1020    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1021    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1022}
1023
1024fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1025    INSERT_SCRATCH.with(|slot| f(&mut slot.borrow_mut()))
1026}
1027
1028pub(super) struct UpsertBufs {
1029    old_row: Vec<Value>,
1030    new_row: Vec<Value>,
1031    value_values: Vec<Value>,
1032    new_value_buf: Vec<u8>,
1033}
1034
1035impl UpsertBufs {
1036    pub(super) fn new() -> Self {
1037        Self {
1038            old_row: Vec::new(),
1039            new_row: Vec::new(),
1040            value_values: Vec::new(),
1041            new_value_buf: Vec::with_capacity(256),
1042        }
1043    }
1044}
1045
1046pub fn exec_insert_in_txn(
1047    wtx: &mut WriteTxn<'_>,
1048    schema: &SchemaManager,
1049    stmt: &InsertStmt,
1050    params: &[Value],
1051) -> Result<ExecutionResult> {
1052    with_insert_scratch(|bufs| exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None))
1053}
1054
1055fn exec_insert_in_txn_cached(
1056    wtx: &mut WriteTxn<'_>,
1057    schema: &SchemaManager,
1058    stmt: &InsertStmt,
1059    params: &[Value],
1060    cache: &InsertCache,
1061) -> Result<ExecutionResult> {
1062    with_insert_scratch(|bufs| {
1063        exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, Some(cache))
1064    })
1065}
1066
1067fn exec_insert_in_txn_impl(
1068    wtx: &mut WriteTxn<'_>,
1069    schema: &SchemaManager,
1070    stmt: &InsertStmt,
1071    params: &[Value],
1072    bufs: &mut InsertBufs,
1073    cache: Option<&InsertCache>,
1074) -> Result<ExecutionResult> {
1075    let empty_ctes = CteContext::default();
1076    let materialized;
1077    let has_sub = match cache {
1078        Some(c) => c.has_subquery,
1079        None => insert_has_subquery(stmt),
1080    };
1081    let stmt = if has_sub {
1082        materialized = materialize_insert(stmt, &mut |sub| {
1083            exec_subquery_write(wtx, schema, sub, &empty_ctes)
1084        })?;
1085        &materialized
1086    } else {
1087        stmt
1088    };
1089
1090    let table_schema = schema
1091        .get(&stmt.table)
1092        .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1093
1094    let default_columns;
1095    let insert_columns: &[String] = if stmt.columns.is_empty() {
1096        default_columns = table_schema
1097            .columns
1098            .iter()
1099            .map(|c| c.name.clone())
1100            .collect::<Vec<_>>();
1101        &default_columns
1102    } else {
1103        &stmt.columns
1104    };
1105
1106    bufs.col_indices.clear();
1107    if let Some(c) = cache {
1108        bufs.col_indices.extend_from_slice(&c.col_indices);
1109    } else {
1110        for name in insert_columns {
1111            bufs.col_indices.push(
1112                table_schema
1113                    .column_index(name)
1114                    .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1115            );
1116        }
1117    }
1118
1119    let any_defaults = match cache {
1120        Some(c) => c.any_defaults,
1121        None => table_schema
1122            .columns
1123            .iter()
1124            .any(|c| c.default_expr.is_some()),
1125    };
1126    let defaults: Vec<(usize, &Expr)> = if any_defaults {
1127        table_schema
1128            .columns
1129            .iter()
1130            .filter(|c| {
1131                c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1132            })
1133            .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1134            .collect()
1135    } else {
1136        Vec::new()
1137    };
1138
1139    let has_checks = match cache {
1140        Some(c) => c.has_checks,
1141        None => table_schema.has_checks(),
1142    };
1143    let check_col_map = if has_checks {
1144        Some(ColumnMap::new(&table_schema.columns))
1145    } else {
1146        None
1147    };
1148
1149    let pk_indices = table_schema.pk_indices();
1150    let non_pk = table_schema.non_pk_indices();
1151    let enc_pos = table_schema.encoding_positions();
1152    let phys_count = table_schema.physical_non_pk_count();
1153    let dropped = table_schema.dropped_non_pk_slots();
1154
1155    bufs.row.resize(table_schema.columns.len(), Value::Null);
1156    bufs.pk_values.resize(pk_indices.len(), Value::Null);
1157    bufs.value_values.resize(phys_count, Value::Null);
1158
1159    let table_bytes = stmt.table.as_bytes();
1160    let has_fks = !table_schema.foreign_keys.is_empty();
1161    let has_indices = !table_schema.indices.is_empty();
1162    let has_defaults = !defaults.is_empty();
1163
1164    let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1165        (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1166        (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1167        (_, None) => None,
1168    };
1169
1170    let row_col_map_owned: Option<ColumnMap> =
1171        if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1172            Some(ColumnMap::new(&table_schema.columns))
1173        } else {
1174            None
1175        };
1176    let row_col_map: Option<&ColumnMap> = cache
1177        .and_then(|c| c.row_col_map.as_ref())
1178        .or(row_col_map_owned.as_ref());
1179
1180    let select_rows = match &stmt.source {
1181        InsertSource::Select(sq) => {
1182            let insert_ctes =
1183                super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
1184                    exec_query_body_write(wtx, schema, body, ctx)
1185                })?;
1186            let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1187            Some(qr.rows)
1188        }
1189        InsertSource::Values(_) => None,
1190    };
1191
1192    let mut count: u64 = 0;
1193
1194    let values = match &stmt.source {
1195        InsertSource::Values(rows) => Some(rows.as_slice()),
1196        InsertSource::Select(_) => None,
1197    };
1198    let sel_rows = select_rows.as_deref();
1199
1200    let total = match (values, sel_rows) {
1201        (Some(rows), _) => rows.len(),
1202        (_, Some(rows)) => rows.len(),
1203        _ => 0,
1204    };
1205
1206    if let Some(sel) = sel_rows {
1207        if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1208            return Err(SqlError::InvalidValue(format!(
1209                "INSERT ... SELECT column count mismatch: expected {}, got {}",
1210                insert_columns.len(),
1211                sel[0].len()
1212            )));
1213        }
1214    }
1215
1216    for idx in 0..total {
1217        for v in bufs.row.iter_mut() {
1218            *v = Value::Null;
1219        }
1220
1221        if let Some(value_rows) = values {
1222            let value_row = &value_rows[idx];
1223            if value_row.len() != insert_columns.len() {
1224                return Err(SqlError::InvalidValue(format!(
1225                    "expected {} values, got {}",
1226                    insert_columns.len(),
1227                    value_row.len()
1228                )));
1229            }
1230            for (i, expr) in value_row.iter().enumerate() {
1231                let val = match expr {
1232                    Expr::Parameter(n) => params
1233                        .get(n - 1)
1234                        .cloned()
1235                        .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1236                    Expr::Literal(v) => v.clone(),
1237                    _ => eval_const_expr(expr)?,
1238                };
1239                let col_idx = bufs.col_indices[i];
1240                let col = &table_schema.columns[col_idx];
1241                let got_type = val.data_type();
1242                bufs.row[col_idx] = if val.is_null() {
1243                    Value::Null
1244                } else {
1245                    val.coerce_into(col.data_type)
1246                        .ok_or_else(|| SqlError::TypeMismatch {
1247                            expected: col.data_type.to_string(),
1248                            got: got_type.to_string(),
1249                        })?
1250                };
1251            }
1252        } else if let Some(sel) = sel_rows {
1253            let sel_row = &sel[idx];
1254            for (i, val) in sel_row.iter().enumerate() {
1255                let col_idx = bufs.col_indices[i];
1256                let col = &table_schema.columns[col_idx];
1257                let got_type = val.data_type();
1258                bufs.row[col_idx] = if val.is_null() {
1259                    Value::Null
1260                } else {
1261                    val.clone().coerce_into(col.data_type).ok_or_else(|| {
1262                        SqlError::TypeMismatch {
1263                            expected: col.data_type.to_string(),
1264                            got: got_type.to_string(),
1265                        }
1266                    })?
1267                };
1268            }
1269        }
1270
1271        if has_defaults {
1272            for &(pos, def_expr) in &defaults {
1273                let val = eval_const_expr(def_expr)?;
1274                let col = &table_schema.columns[pos];
1275                if !val.is_null() {
1276                    let got_type = val.data_type();
1277                    bufs.row[pos] =
1278                        val.coerce_into(col.data_type)
1279                            .ok_or_else(|| SqlError::TypeMismatch {
1280                                expected: col.data_type.to_string(),
1281                                got: got_type.to_string(),
1282                            })?;
1283                }
1284            }
1285        }
1286
1287        for col in &table_schema.columns {
1288            if !col.nullable && bufs.row[col.position as usize].is_null() {
1289                return Err(SqlError::NotNullViolation(col.name.clone()));
1290            }
1291        }
1292
1293        if let Some(ref col_map) = check_col_map {
1294            for col in &table_schema.columns {
1295                if let Some(ref check) = col.check_expr {
1296                    let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1297                    if !is_truthy(&result) && !result.is_null() {
1298                        let name = col.check_name.as_deref().unwrap_or(&col.name);
1299                        return Err(SqlError::CheckViolation(name.to_string()));
1300                    }
1301                }
1302            }
1303            for tc in &table_schema.check_constraints {
1304                let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1305                if !is_truthy(&result) && !result.is_null() {
1306                    let name = tc.name.as_deref().unwrap_or(&tc.sql);
1307                    return Err(SqlError::CheckViolation(name.to_string()));
1308                }
1309            }
1310        }
1311
1312        if has_fks {
1313            for fk in &table_schema.foreign_keys {
1314                let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1315                if any_null {
1316                    continue;
1317                }
1318                let fk_vals: Vec<Value> = fk
1319                    .columns
1320                    .iter()
1321                    .map(|&ci| bufs.row[ci as usize].clone())
1322                    .collect();
1323                bufs.fk_key_buf.clear();
1324                encode_composite_key_into(&fk_vals, &mut bufs.fk_key_buf);
1325                let found = wtx
1326                    .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1327                    .map_err(SqlError::Storage)?;
1328                if found.is_none() {
1329                    let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1330                    return Err(SqlError::ForeignKeyViolation(name.to_string()));
1331                }
1332            }
1333        }
1334
1335        for (j, &i) in pk_indices.iter().enumerate() {
1336            bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1337        }
1338        encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf);
1339
1340        for &slot in dropped {
1341            bufs.value_values[slot as usize] = Value::Null;
1342        }
1343        for (j, &i) in non_pk.iter().enumerate() {
1344            bufs.value_values[enc_pos[j] as usize] =
1345                std::mem::replace(&mut bufs.row[i], Value::Null);
1346        }
1347        encode_row_into(&bufs.value_values, &mut bufs.value_buf);
1348
1349        if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1350            return Err(SqlError::KeyTooLarge {
1351                size: bufs.key_buf.len(),
1352                max: citadel_core::MAX_KEY_SIZE,
1353            });
1354        }
1355        if bufs.value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1356            return Err(SqlError::RowTooLarge {
1357                size: bufs.value_buf.len(),
1358                max: citadel_core::MAX_INLINE_VALUE_SIZE,
1359            });
1360        }
1361
1362        match compiled_conflict.as_ref() {
1363            None => {
1364                let is_new = wtx
1365                    .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1366                    .map_err(SqlError::Storage)?;
1367                if !is_new {
1368                    return Err(SqlError::DuplicateKey);
1369                }
1370                if has_indices {
1371                    for (j, &i) in pk_indices.iter().enumerate() {
1372                        bufs.row[i] = bufs.pk_values[j].clone();
1373                    }
1374                    for (j, &i) in non_pk.iter().enumerate() {
1375                        bufs.row[i] = std::mem::replace(
1376                            &mut bufs.value_values[enc_pos[j] as usize],
1377                            Value::Null,
1378                        );
1379                    }
1380                    insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1381                }
1382                count += 1;
1383            }
1384            Some(oc) => {
1385                let oc_ref: &CompiledOnConflict = oc;
1386                let needs_row = upsert_needs_row(oc_ref, table_schema);
1387                if needs_row {
1388                    for (j, &i) in pk_indices.iter().enumerate() {
1389                        bufs.row[i] = bufs.pk_values[j].clone();
1390                    }
1391                    for (j, &i) in non_pk.iter().enumerate() {
1392                        bufs.row[i] = std::mem::replace(
1393                            &mut bufs.value_values[enc_pos[j] as usize],
1394                            Value::Null,
1395                        );
1396                    }
1397                }
1398                match apply_insert_with_conflict(
1399                    wtx,
1400                    table_schema,
1401                    &bufs.key_buf,
1402                    &bufs.value_buf,
1403                    &bufs.row,
1404                    &bufs.pk_values,
1405                    oc_ref,
1406                    row_col_map.unwrap(),
1407                )? {
1408                    InsertRowOutcome::Inserted => count += 1,
1409                    InsertRowOutcome::Skipped => {}
1410                }
1411            }
1412        }
1413    }
1414
1415    Ok(ExecutionResult::RowsAffected(count))
1416}
1417
1418pub struct CompiledInsert {
1419    table_lower: String,
1420    cached: Option<InsertCache>,
1421}
1422
1423struct InsertCache {
1424    col_indices: Vec<usize>,
1425    has_subquery: bool,
1426    any_defaults: bool,
1427    has_checks: bool,
1428    on_conflict: Option<Arc<CompiledOnConflict>>,
1429    row_col_map: Option<ColumnMap>,
1430}
1431
1432#[derive(Clone)]
1433pub(super) enum CompiledOnConflict {
1434    DoNothing {
1435        target: Option<ConflictKind>,
1436    },
1437    DoUpdate {
1438        target: ConflictKind,
1439        assignments: Vec<(usize, Expr)>,
1440        where_clause: Option<Expr>,
1441        fast_paths: Option<Vec<DoUpdateFastPath>>,
1442    },
1443}
1444
1445#[derive(Clone, Copy)]
1446pub(super) enum DoUpdateFastPath {
1447    IntAddConst { phys_idx: usize, delta: i64 },
1448}
1449
1450#[derive(Clone, Debug)]
1451pub(super) enum ConflictKind {
1452    PrimaryKey,
1453    UniqueIndex { index_idx: usize },
1454}
1455
1456fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
1457    match target {
1458        ConflictTarget::Columns(cols) => {
1459            let col_idx_set: Vec<u16> = cols
1460                .iter()
1461                .map(|name| {
1462                    ts.column_index(name)
1463                        .map(|i| i as u16)
1464                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
1465                })
1466                .collect::<Result<_>>()?;
1467            let pk_set = ts.primary_key_columns.clone();
1468            if set_equal(&col_idx_set, &pk_set) {
1469                return Ok(ConflictKind::PrimaryKey);
1470            }
1471            for (index_idx, idx) in ts.indices.iter().enumerate() {
1472                if idx.unique && set_equal(&col_idx_set, &idx.columns) {
1473                    return Ok(ConflictKind::UniqueIndex { index_idx });
1474                }
1475            }
1476            Err(SqlError::Plan(
1477                "ON CONFLICT target does not match any unique constraint".into(),
1478            ))
1479        }
1480        ConflictTarget::Constraint(name) => {
1481            let lower = name.to_ascii_lowercase();
1482            for (index_idx, idx) in ts.indices.iter().enumerate() {
1483                if idx.name.eq_ignore_ascii_case(&lower) {
1484                    if idx.unique {
1485                        return Ok(ConflictKind::UniqueIndex { index_idx });
1486                    }
1487                    return Err(SqlError::Plan(format!(
1488                        "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
1489                    )));
1490                }
1491            }
1492            Err(SqlError::Plan(format!(
1493                "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
1494            )))
1495        }
1496    }
1497}
1498
/// Reports whether `a` and `b` hold the same elements regardless of order.
///
/// Duplicates count: both slices must contain each value the same number of times.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    let sorted = |items: &[u16]| {
        let mut copy = items.to_vec();
        copy.sort_unstable();
        copy
    };
    a.len() == b.len() && sorted(a) == sorted(b)
}
1509
1510pub(super) enum InsertRowOutcome {
1511    Inserted,
1512    Skipped,
1513}
1514
1515#[allow(clippy::too_many_arguments)]
1516#[inline]
1517pub(super) fn apply_insert_with_conflict(
1518    wtx: &mut WriteTxn<'_>,
1519    table_schema: &TableSchema,
1520    key_buf: &[u8],
1521    value_buf: &[u8],
1522    row: &[Value],
1523    pk_values: &[Value],
1524    on_conflict: &CompiledOnConflict,
1525    col_map: &ColumnMap,
1526) -> Result<InsertRowOutcome> {
1527    let table_bytes = table_schema.name.as_bytes();
1528
1529    if let CompiledOnConflict::DoNothing { target } = on_conflict {
1530        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
1531        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
1532            let inserted = wtx
1533                .table_insert_if_absent(table_bytes, key_buf, value_buf)
1534                .map_err(SqlError::Storage)?;
1535            return Ok(if inserted {
1536                InsertRowOutcome::Inserted
1537            } else {
1538                InsertRowOutcome::Skipped
1539            });
1540        }
1541    }
1542
1543    if let CompiledOnConflict::DoUpdate {
1544        target: ConflictKind::PrimaryKey,
1545        assignments,
1546        where_clause,
1547        fast_paths,
1548    } = on_conflict
1549    {
1550        if can_fuse_do_update(table_schema, assignments) {
1551            return apply_do_update_fused(
1552                wtx,
1553                table_schema,
1554                table_bytes,
1555                key_buf,
1556                value_buf,
1557                row,
1558                assignments,
1559                where_clause.as_ref(),
1560                col_map,
1561                fast_paths.as_deref(),
1562            );
1563        }
1564    }
1565
1566    let primary_outcome = wtx
1567        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
1568        .map_err(SqlError::Storage)?;
1569
1570    match primary_outcome {
1571        citadel_txn::write_txn::InsertOutcome::Inserted => {
1572            if table_schema.indices.is_empty() {
1573                return Ok(InsertRowOutcome::Inserted);
1574            }
1575            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
1576            match insert_index_entries_or_fetch(
1577                wtx,
1578                table_schema,
1579                row,
1580                pk_values,
1581                &mut inserted_keys,
1582            )? {
1583                None => Ok(InsertRowOutcome::Inserted),
1584                Some(conflicting_idx) => {
1585                    let matches_target =
1586                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
1587                            || matches!(
1588                                on_conflict,
1589                                CompiledOnConflict::DoNothing {
1590                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
1591                                } | CompiledOnConflict::DoUpdate {
1592                                    target: ConflictKind::UniqueIndex { index_idx },
1593                                    ..
1594                                } if *index_idx == conflicting_idx
1595                            );
1596                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
1597                    if !matches_target {
1598                        return Err(SqlError::UniqueViolation(
1599                            table_schema.indices[conflicting_idx].name.clone(),
1600                        ));
1601                    }
1602                    match on_conflict {
1603                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
1604                        CompiledOnConflict::DoUpdate {
1605                            assignments,
1606                            where_clause,
1607                            ..
1608                        } => {
1609                            let existing_pk =
1610                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
1611                            apply_do_update(
1612                                wtx,
1613                                table_schema,
1614                                &existing_pk,
1615                                row,
1616                                assignments,
1617                                where_clause.as_ref(),
1618                                col_map,
1619                            )
1620                        }
1621                    }
1622                }
1623            }
1624        }
1625        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
1626            let matches_target = matches!(
1627                on_conflict,
1628                CompiledOnConflict::DoNothing { target: None }
1629                    | CompiledOnConflict::DoNothing {
1630                        target: Some(ConflictKind::PrimaryKey),
1631                    }
1632                    | CompiledOnConflict::DoUpdate {
1633                        target: ConflictKind::PrimaryKey,
1634                        ..
1635                    }
1636            );
1637            if !matches_target {
1638                return Err(SqlError::DuplicateKey);
1639            }
1640            match on_conflict {
1641                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
1642                CompiledOnConflict::DoUpdate {
1643                    assignments,
1644                    where_clause,
1645                    ..
1646                } => {
1647                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
1648                    apply_do_update_with_old_row(
1649                        wtx,
1650                        table_schema,
1651                        key_buf,
1652                        &old_row,
1653                        row,
1654                        assignments,
1655                        where_clause.as_ref(),
1656                        col_map,
1657                    )
1658                }
1659            }
1660        }
1661    }
1662}
1663
1664#[inline]
1665fn apply_fast_path_patch(
1666    old_bytes: &[u8],
1667    fast_paths: &[DoUpdateFastPath],
1668) -> Result<UpsertAction> {
1669    UPSERT_SCRATCH.with(|slot| {
1670        let mut bufs = slot.borrow_mut();
1671        bufs.new_value_buf.clear();
1672        bufs.new_value_buf.extend_from_slice(old_bytes);
1673
1674        let mut patch_scratch: Vec<u8> = Vec::new();
1675
1676        for fp in fast_paths {
1677            match fp {
1678                DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
1679                    let decoded =
1680                        crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
1681                    let old_val = &decoded[0];
1682                    let new_val = match old_val {
1683                        Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
1684                        Value::Null => Value::Null,
1685                        _ => {
1686                            return Err(SqlError::TypeMismatch {
1687                                expected: "INTEGER".into(),
1688                                got: old_val.data_type().to_string(),
1689                            });
1690                        }
1691                    };
1692                    if !crate::encoding::patch_column_in_place(
1693                        &mut bufs.new_value_buf,
1694                        *phys_idx,
1695                        &new_val,
1696                    )? {
1697                        patch_scratch.clear();
1698                        crate::encoding::patch_row_column(
1699                            &bufs.new_value_buf,
1700                            *phys_idx,
1701                            &new_val,
1702                            &mut patch_scratch,
1703                        )?;
1704                        std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
1705                    }
1706                }
1707            }
1708        }
1709
1710        if bufs.new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1711            return Err(SqlError::RowTooLarge {
1712                size: bufs.new_value_buf.len(),
1713                max: citadel_core::MAX_INLINE_VALUE_SIZE,
1714            });
1715        }
1716
1717        Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
1718    })
1719}
1720
1721fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
1722    if !ts.indices.is_empty() {
1723        return true;
1724    }
1725    match oc {
1726        CompiledOnConflict::DoNothing { .. } => false,
1727        CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
1728    }
1729}
1730
1731fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
1732    if !ts.indices.is_empty() {
1733        return false;
1734    }
1735    if !ts.foreign_keys.is_empty() {
1736        return false;
1737    }
1738    let pk = ts.pk_indices();
1739    !assignments.iter().any(|(ci, _)| pk.contains(ci))
1740}
1741
1742#[allow(clippy::too_many_arguments)]
1743#[inline]
1744fn apply_do_update_fused(
1745    wtx: &mut WriteTxn<'_>,
1746    table_schema: &TableSchema,
1747    table_bytes: &[u8],
1748    key_buf: &[u8],
1749    value_buf: &[u8],
1750    proposed_row: &[Value],
1751    assignments: &[(usize, Expr)],
1752    where_clause: Option<&Expr>,
1753    col_map: &ColumnMap,
1754    fast_paths: Option<&[DoUpdateFastPath]>,
1755) -> Result<InsertRowOutcome> {
1756    let non_pk = table_schema.non_pk_indices();
1757    let enc_pos = table_schema.encoding_positions();
1758    let phys_count = table_schema.physical_non_pk_count();
1759    let dropped = table_schema.dropped_non_pk_slots();
1760    let has_checks = table_schema.has_checks();
1761    let has_fks = !table_schema.foreign_keys.is_empty();
1762
1763    let outcome =
1764        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
1765            if let Some(fps) = fast_paths {
1766                if !has_checks {
1767                    return apply_fast_path_patch(old_bytes, fps);
1768                }
1769            }
1770            UPSERT_SCRATCH.with(|slot| {
1771                let mut bufs = slot.borrow_mut();
1772                let UpsertBufs {
1773                    old_row,
1774                    new_row,
1775                    value_values,
1776                    new_value_buf,
1777                } = &mut *bufs;
1778
1779                old_row.clear();
1780                old_row.resize(table_schema.columns.len(), Value::Null);
1781                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
1782
1783                if let Some(w) = where_clause {
1784                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
1785                    let result = eval_expr(w, &ctx)?;
1786                    if result.is_null() || !is_truthy(&result) {
1787                        return Ok(UpsertAction::Skip);
1788                    }
1789                }
1790
1791                new_row.clear();
1792                new_row.extend_from_slice(old_row);
1793                for (col_idx, expr) in assignments {
1794                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
1795                    let val = eval_expr(expr, &ctx)?;
1796                    let col = &table_schema.columns[*col_idx];
1797                    new_row[*col_idx] = if val.is_null() {
1798                        Value::Null
1799                    } else {
1800                        let got = val.data_type();
1801                        val.coerce_into(col.data_type)
1802                            .ok_or_else(|| SqlError::TypeMismatch {
1803                                expected: col.data_type.to_string(),
1804                                got: got.to_string(),
1805                            })?
1806                    };
1807                }
1808
1809                for (assigned_idx, _) in assignments {
1810                    let col = &table_schema.columns[*assigned_idx];
1811                    if !col.nullable && new_row[col.position as usize].is_null() {
1812                        return Err(SqlError::NotNullViolation(col.name.clone()));
1813                    }
1814                }
1815                if has_checks {
1816                    for col in &table_schema.columns {
1817                        if let Some(ref check) = col.check_expr {
1818                            let ctx = EvalCtx::new(col_map, new_row);
1819                            let result = eval_expr(check, &ctx)?;
1820                            if !is_truthy(&result) && !result.is_null() {
1821                                let name = col.check_name.as_deref().unwrap_or(&col.name);
1822                                return Err(SqlError::CheckViolation(name.to_string()));
1823                            }
1824                        }
1825                    }
1826                    for tc in &table_schema.check_constraints {
1827                        let ctx = EvalCtx::new(col_map, new_row);
1828                        let result = eval_expr(&tc.expr, &ctx)?;
1829                        if !is_truthy(&result) && !result.is_null() {
1830                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
1831                            return Err(SqlError::CheckViolation(name.to_string()));
1832                        }
1833                    }
1834                }
1835                let _ = has_fks;
1836
1837                value_values.clear();
1838                value_values.resize(phys_count, Value::Null);
1839                for &slot in dropped {
1840                    value_values[slot as usize] = Value::Null;
1841                }
1842                for (j, &i) in non_pk.iter().enumerate() {
1843                    value_values[enc_pos[j] as usize] = new_row[i].clone();
1844                }
1845                new_value_buf.clear();
1846                crate::encoding::encode_row_into(value_values, new_value_buf);
1847
1848                if new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1849                    return Err(SqlError::RowTooLarge {
1850                        size: new_value_buf.len(),
1851                        max: citadel_core::MAX_INLINE_VALUE_SIZE,
1852                    });
1853                }
1854
1855                Ok(UpsertAction::Replace(new_value_buf.clone()))
1856            })
1857        })?;
1858
1859    match outcome {
1860        UpsertOutcome::Inserted | UpsertOutcome::Updated => Ok(InsertRowOutcome::Inserted),
1861        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
1862    }
1863}
1864
1865fn fetch_unique_index_pk(
1866    wtx: &mut WriteTxn<'_>,
1867    table_schema: &TableSchema,
1868    index_idx: usize,
1869    row: &[Value],
1870) -> Result<Vec<u8>> {
1871    let idx = &table_schema.indices[index_idx];
1872    let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
1873    let indexed: Vec<Value> = idx
1874        .columns
1875        .iter()
1876        .map(|&col_idx| row[col_idx as usize].clone())
1877        .collect();
1878    let key = crate::encoding::encode_composite_key(&indexed);
1879    let value = wtx
1880        .table_get(&idx_table, &key)
1881        .map_err(SqlError::Storage)?
1882        .ok_or_else(|| {
1883            SqlError::InvalidValue("unique index missing expected collision entry".into())
1884        })?;
1885    Ok(value)
1886}
1887
1888fn apply_do_update(
1889    wtx: &mut WriteTxn<'_>,
1890    table_schema: &TableSchema,
1891    pk_key: &[u8],
1892    proposed_row: &[Value],
1893    assignments: &[(usize, Expr)],
1894    where_clause: Option<&Expr>,
1895    col_map: &ColumnMap,
1896) -> Result<InsertRowOutcome> {
1897    let old_value = wtx
1898        .table_get(table_schema.name.as_bytes(), pk_key)
1899        .map_err(SqlError::Storage)?
1900        .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
1901    let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
1902    apply_do_update_with_old_row(
1903        wtx,
1904        table_schema,
1905        pk_key,
1906        &old_row,
1907        proposed_row,
1908        assignments,
1909        where_clause,
1910        col_map,
1911    )
1912}
1913
1914#[allow(clippy::too_many_arguments)]
1915fn apply_do_update_with_old_row(
1916    wtx: &mut WriteTxn<'_>,
1917    table_schema: &TableSchema,
1918    old_pk_key: &[u8],
1919    old_row: &[Value],
1920    proposed_row: &[Value],
1921    assignments: &[(usize, Expr)],
1922    where_clause: Option<&Expr>,
1923    col_map: &ColumnMap,
1924) -> Result<InsertRowOutcome> {
1925    if let Some(w) = where_clause {
1926        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
1927        let result = eval_expr(w, &ctx)?;
1928        if result.is_null() || !is_truthy(&result) {
1929            return Ok(InsertRowOutcome::Skipped);
1930        }
1931    }
1932
1933    let mut new_row = old_row.to_vec();
1934    for (col_idx, expr) in assignments {
1935        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
1936        let val = eval_expr(expr, &ctx)?;
1937        let col = &table_schema.columns[*col_idx];
1938        new_row[*col_idx] = if val.is_null() {
1939            Value::Null
1940        } else {
1941            let got = val.data_type();
1942            val.coerce_into(col.data_type)
1943                .ok_or_else(|| SqlError::TypeMismatch {
1944                    expected: col.data_type.to_string(),
1945                    got: got.to_string(),
1946                })?
1947        };
1948    }
1949
1950    let pk_indices = table_schema.pk_indices();
1951    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
1952    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
1953
1954    for (assigned_idx, _) in assignments {
1955        let col = &table_schema.columns[*assigned_idx];
1956        if !col.nullable && new_row[col.position as usize].is_null() {
1957            return Err(SqlError::NotNullViolation(col.name.clone()));
1958        }
1959    }
1960    if table_schema.has_checks() {
1961        for col in &table_schema.columns {
1962            if let Some(ref check) = col.check_expr {
1963                let ctx = EvalCtx::new(col_map, &new_row);
1964                let result = eval_expr(check, &ctx)?;
1965                if !is_truthy(&result) && !result.is_null() {
1966                    let name = col.check_name.as_deref().unwrap_or(&col.name);
1967                    return Err(SqlError::CheckViolation(name.to_string()));
1968                }
1969            }
1970        }
1971        for tc in &table_schema.check_constraints {
1972            let ctx = EvalCtx::new(col_map, &new_row);
1973            let result = eval_expr(&tc.expr, &ctx)?;
1974            if !is_truthy(&result) && !result.is_null() {
1975                let name = tc.name.as_deref().unwrap_or(&tc.sql);
1976                return Err(SqlError::CheckViolation(name.to_string()));
1977            }
1978        }
1979    }
1980    for fk in &table_schema.foreign_keys {
1981        let changed = fk
1982            .columns
1983            .iter()
1984            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
1985        if !changed {
1986            continue;
1987        }
1988        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
1989        if any_null {
1990            continue;
1991        }
1992        let fk_vals: Vec<Value> = fk
1993            .columns
1994            .iter()
1995            .map(|&ci| new_row[ci as usize].clone())
1996            .collect();
1997        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
1998        let found = wtx
1999            .table_get(fk.foreign_table.as_bytes(), &fk_key)
2000            .map_err(SqlError::Storage)?;
2001        if found.is_none() {
2002            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
2003            return Err(SqlError::ForeignKeyViolation(name.to_string()));
2004        }
2005    }
2006
2007    let has_indices = !table_schema.indices.is_empty();
2008    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
2009        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
2010    } else {
2011        Vec::new()
2012    };
2013    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
2014        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
2015    } else {
2016        Vec::new()
2017    };
2018
2019    let non_pk = table_schema.non_pk_indices();
2020    let enc_pos = table_schema.encoding_positions();
2021    let phys_count = table_schema.physical_non_pk_count();
2022    let dropped = table_schema.dropped_non_pk_slots();
2023    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
2024    for &slot in dropped {
2025        value_values[slot as usize] = Value::Null;
2026    }
2027    for (j, &i) in non_pk.iter().enumerate() {
2028        value_values[enc_pos[j] as usize] = new_row[i].clone();
2029    }
2030    let mut new_value_buf = Vec::with_capacity(256);
2031    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
2032
2033    if new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2034        return Err(SqlError::RowTooLarge {
2035            size: new_value_buf.len(),
2036            max: citadel_core::MAX_INLINE_VALUE_SIZE,
2037        });
2038    }
2039
2040    if pk_changed {
2041        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
2042        let inserted = wtx
2043            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
2044            .map_err(SqlError::Storage)?;
2045        if !inserted {
2046            return Err(SqlError::DuplicateKey);
2047        }
2048        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
2049            .map_err(SqlError::Storage)?;
2050        for idx in &table_schema.indices {
2051            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2052            let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2053            wtx.table_delete(&idx_table, &old_idx_key)
2054                .map_err(SqlError::Storage)?;
2055            let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2056            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2057            let is_new = wtx
2058                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2059                .map_err(SqlError::Storage)?;
2060            if idx.unique && !is_new {
2061                let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2062                if !any_null {
2063                    return Err(SqlError::UniqueViolation(idx.name.clone()));
2064                }
2065            }
2066        }
2067    } else {
2068        wtx.table_update_sorted(
2069            table_schema.name.as_bytes(),
2070            &[(old_pk_key, new_value_buf.as_slice())],
2071        )
2072        .map_err(SqlError::Storage)?;
2073        for idx in &table_schema.indices {
2074            if !index_columns_changed(idx, old_row, &new_row) {
2075                continue;
2076            }
2077            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2078            let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2079            wtx.table_delete(&idx_table, &old_idx_key)
2080                .map_err(SqlError::Storage)?;
2081            let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2082            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2083            let is_new = wtx
2084                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2085                .map_err(SqlError::Storage)?;
2086            if idx.unique && !is_new {
2087                let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2088                if !any_null {
2089                    return Err(SqlError::UniqueViolation(idx.name.clone()));
2090                }
2091            }
2092        }
2093    }
2094
2095    Ok(InsertRowOutcome::Inserted)
2096}
2097
2098fn detect_fast_paths(
2099    ts: &TableSchema,
2100    assignments: &[(usize, Expr)],
2101) -> Option<Vec<DoUpdateFastPath>> {
2102    let non_pk = ts.non_pk_indices();
2103    let enc_pos = ts.encoding_positions();
2104    let mut out = Vec::with_capacity(assignments.len());
2105    for (col_idx, expr) in assignments {
2106        let col = &ts.columns[*col_idx];
2107        if col.data_type != DataType::Integer {
2108            return None;
2109        }
2110        let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
2111        let phys_idx = enc_pos[nonpk_order] as usize;
2112
2113        if let Expr::BinaryOp { left, op, right } = expr {
2114            if !matches!(op, BinOp::Add | BinOp::Sub) {
2115                return None;
2116            }
2117            let reads_target =
2118                matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
2119            if !reads_target {
2120                return None;
2121            }
2122            if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
2123                let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
2124                let _ = col_idx;
2125                out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
2126                continue;
2127            }
2128            return None;
2129        }
2130        return None;
2131    }
2132    Some(out)
2133}
2134
2135fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
2136    let target = oc
2137        .target
2138        .as_ref()
2139        .map(|t| resolve_conflict_target(t, ts))
2140        .transpose()?;
2141    match &oc.action {
2142        OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
2143        OnConflictAction::DoUpdate {
2144            assignments,
2145            where_clause,
2146        } => {
2147            let target = target.ok_or_else(|| {
2148                SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
2149            })?;
2150            let compiled_assignments: Vec<(usize, Expr)> = assignments
2151                .iter()
2152                .map(|(name, expr)| {
2153                    let col_idx = ts
2154                        .column_index(name)
2155                        .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
2156                    Ok((col_idx, expr.clone()))
2157                })
2158                .collect::<Result<_>>()?;
2159            let fast_paths = if where_clause.is_none() {
2160                detect_fast_paths(ts, &compiled_assignments)
2161            } else {
2162                None
2163            };
2164            Ok(CompiledOnConflict::DoUpdate {
2165                target,
2166                assignments: compiled_assignments,
2167                where_clause: where_clause.clone(),
2168                fast_paths,
2169            })
2170        }
2171    }
2172}
2173
2174impl CompiledInsert {
2175    pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
2176        let lower = stmt.table.to_ascii_lowercase();
2177        let cached = if let Some(ts) = schema.get(&lower) {
2178            let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
2179                ts.columns.iter().map(|c| c.name.as_str()).collect()
2180            } else {
2181                stmt.columns.iter().map(|s| s.as_str()).collect()
2182            };
2183            let mut col_indices = Vec::with_capacity(insert_columns.len());
2184            for name in &insert_columns {
2185                col_indices.push(ts.column_index(name)?);
2186            }
2187            let on_conflict = stmt
2188                .on_conflict
2189                .as_ref()
2190                .map(|oc| compile_on_conflict(oc, ts))
2191                .transpose()
2192                .ok()
2193                .flatten()
2194                .map(Arc::new);
2195            let row_col_map = on_conflict.as_ref().map(|_| ColumnMap::new(&ts.columns));
2196            Some(InsertCache {
2197                col_indices,
2198                has_subquery: insert_has_subquery(stmt),
2199                any_defaults: ts.columns.iter().any(|c| c.default_expr.is_some()),
2200                has_checks: ts.has_checks(),
2201                on_conflict,
2202                row_col_map,
2203            })
2204        } else if schema.get_view(&lower).is_some() {
2205            None
2206        } else {
2207            return None;
2208        };
2209        Some(Self {
2210            table_lower: lower,
2211            cached,
2212        })
2213    }
2214}
2215
2216impl CompiledPlan for CompiledInsert {
2217    fn execute(
2218        &self,
2219        db: &Database,
2220        schema: &SchemaManager,
2221        stmt: &Statement,
2222        params: &[Value],
2223        wtx: Option<&mut WriteTxn<'_>>,
2224    ) -> Result<ExecutionResult> {
2225        let ins = match stmt {
2226            Statement::Insert(i) => i,
2227            _ => {
2228                return Err(SqlError::Unsupported(
2229                    "CompiledInsert received non-INSERT statement".into(),
2230                ))
2231            }
2232        };
2233        let _ = &self.table_lower;
2234        match wtx {
2235            None => exec_insert(db, schema, ins, params),
2236            Some(outer) => match self.cached.as_ref() {
2237                Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
2238                None => exec_insert_in_txn(outer, schema, ins, params),
2239            },
2240        }
2241    }
2242}
2243
2244pub struct CompiledDelete {
2245    table_lower: String,
2246}
2247
2248impl CompiledDelete {
2249    pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
2250        let lower = stmt.table.to_ascii_lowercase();
2251        schema.get(&lower)?;
2252        Some(Self { table_lower: lower })
2253    }
2254}
2255
2256impl CompiledPlan for CompiledDelete {
2257    fn execute(
2258        &self,
2259        db: &Database,
2260        schema: &SchemaManager,
2261        stmt: &Statement,
2262        _params: &[Value],
2263        wtx: Option<&mut WriteTxn<'_>>,
2264    ) -> Result<ExecutionResult> {
2265        let del = match stmt {
2266            Statement::Delete(d) => d,
2267            _ => {
2268                return Err(SqlError::Unsupported(
2269                    "CompiledDelete received non-DELETE statement".into(),
2270                ))
2271            }
2272        };
2273        let _ = &self.table_lower;
2274        match wtx {
2275            None => super::write::exec_delete(db, schema, del),
2276            Some(outer) => super::write::exec_delete_in_txn(outer, schema, del),
2277        }
2278    }
2279}