1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::write_txn::WriteTxn;
7use rustc_hash::FxHashMap;
8
9use crate::encoding::{encode_composite_key_into, encode_row_into};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
12use crate::parser::*;
13use crate::types::*;
14
15use crate::schema::SchemaManager;
16
17use super::compile::CompiledPlan;
18use super::helpers::*;
19use super::CteContext;
20
21pub(super) fn exec_insert(
22 db: &Database,
23 schema: &SchemaManager,
24 stmt: &InsertStmt,
25 params: &[Value],
26) -> Result<ExecutionResult> {
27 let empty_ctes = CteContext::default();
28 let materialized;
29 let stmt = if insert_has_subquery(stmt) {
30 materialized = materialize_insert(stmt, &mut |sub| {
31 exec_subquery_read(db, schema, sub, &empty_ctes)
32 })?;
33 &materialized
34 } else {
35 stmt
36 };
37
38 let lower_name = stmt.table.to_ascii_lowercase();
39 if schema.get_view(&lower_name).is_some() {
40 return Err(SqlError::CannotModifyView(stmt.table.clone()));
41 }
42 let table_schema = schema
43 .get(&lower_name)
44 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
45
46 let insert_columns = if stmt.columns.is_empty() {
47 table_schema
48 .columns
49 .iter()
50 .map(|c| c.name.clone())
51 .collect::<Vec<_>>()
52 } else {
53 stmt.columns
54 .iter()
55 .map(|c| c.to_ascii_lowercase())
56 .collect()
57 };
58
59 let col_indices: Vec<usize> = insert_columns
60 .iter()
61 .map(|name| {
62 table_schema
63 .column_index(name)
64 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
65 })
66 .collect::<Result<_>>()?;
67
68 for &ci in &col_indices {
69 if table_schema.columns[ci].generated_kind.is_some() {
70 return Err(SqlError::CannotInsertIntoGeneratedColumn(
71 table_schema.columns[ci].name.clone(),
72 ));
73 }
74 }
75
76 let defaults: Vec<(usize, &Expr)> = table_schema
77 .columns
78 .iter()
79 .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
80 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
81 .collect();
82
83 let generated_cols: Vec<(usize, &Expr)> = table_schema
84 .columns
85 .iter()
86 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
87 .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
88 .collect();
89
90 let has_checks = table_schema.has_checks();
91 let strict = table_schema.is_strict();
92 let row_col_map_for_gen = if !generated_cols.is_empty() {
93 Some(ColumnMap::new(&table_schema.columns))
94 } else {
95 None
96 };
97 let check_col_map = if has_checks {
98 Some(ColumnMap::new(&table_schema.columns))
99 } else {
100 None
101 };
102
103 let select_rows = match &stmt.source {
104 InsertSource::Select(sq) => {
105 let insert_ctes =
106 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
107 exec_query_body_read(db, schema, body, ctx)
108 })?;
109 let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
110 Some(qr.rows)
111 }
112 InsertSource::Values(_) => None,
113 };
114
115 let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
116 .on_conflict
117 .as_ref()
118 .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
119 .transpose()?;
120
121 let row_col_map = compiled_conflict
122 .as_ref()
123 .map(|_| ColumnMap::new(&table_schema.columns));
124
125 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
126 let mut count: u64 = 0;
127 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
128 stmt.returning.as_ref().map(|_| Vec::new());
129
130 let pk_indices = table_schema.pk_indices();
131 let non_pk = table_schema.non_pk_indices();
132 let enc_pos = table_schema.encoding_positions();
133 let phys_count = table_schema.physical_non_pk_count();
134 let mut row = vec![Value::Null; table_schema.columns.len()];
135 let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
136 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
137 let mut key_buf: Vec<u8> = Vec::with_capacity(64);
138 let mut value_buf: Vec<u8> = Vec::with_capacity(256);
139 let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
140
141 let values = match &stmt.source {
142 InsertSource::Values(rows) => Some(rows.as_slice()),
143 InsertSource::Select(_) => None,
144 };
145 let sel_rows = select_rows.as_deref();
146
147 let total = match (values, sel_rows) {
148 (Some(rows), _) => rows.len(),
149 (_, Some(rows)) => rows.len(),
150 _ => 0,
151 };
152
153 if let Some(sel) = sel_rows {
154 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
155 return Err(SqlError::InvalidValue(format!(
156 "INSERT ... SELECT column count mismatch: expected {}, got {}",
157 insert_columns.len(),
158 sel[0].len()
159 )));
160 }
161 }
162
163 for idx in 0..total {
164 for v in row.iter_mut() {
165 *v = Value::Null;
166 }
167
168 if let Some(value_rows) = values {
169 let value_row = &value_rows[idx];
170 if value_row.len() != insert_columns.len() {
171 return Err(SqlError::InvalidValue(format!(
172 "expected {} values, got {}",
173 insert_columns.len(),
174 value_row.len()
175 )));
176 }
177 for (i, expr) in value_row.iter().enumerate() {
178 let val = if let Expr::Parameter(n) = expr {
179 params
180 .get(n - 1)
181 .cloned()
182 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
183 } else {
184 eval_const_expr(expr)?
185 };
186 let col_idx = col_indices[i];
187 let col = &table_schema.columns[col_idx];
188 row[col_idx] = if val.is_null() {
189 Value::Null
190 } else {
191 coerce_for_column(val, col, strict)?
192 };
193 }
194 } else if let Some(sel) = sel_rows {
195 let sel_row = &sel[idx];
196 for (i, val) in sel_row.iter().enumerate() {
197 let col_idx = col_indices[i];
198 let col = &table_schema.columns[col_idx];
199 row[col_idx] = if val.is_null() {
200 Value::Null
201 } else {
202 coerce_for_column(val.clone(), col, strict)?
203 };
204 }
205 }
206
207 for &(pos, def_expr) in &defaults {
208 let val = eval_const_expr(def_expr)?;
209 let col = &table_schema.columns[pos];
210 if !val.is_null() {
211 row[pos] = coerce_for_column(val, col, strict)?;
212 }
213 }
214
215 if let Some(ref gen_map) = row_col_map_for_gen {
216 for &(pos, gen_expr) in &generated_cols {
217 let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
218 let col = &table_schema.columns[pos];
219 row[pos] = if val.is_null() {
220 Value::Null
221 } else {
222 coerce_for_column(val, col, strict)?
223 };
224 }
225 }
226
227 for col in &table_schema.columns {
228 if !col.nullable && row[col.position as usize].is_null() {
229 return Err(SqlError::NotNullViolation(col.name.clone()));
230 }
231 }
232
233 if let Some(ref col_map) = check_col_map {
234 for col in &table_schema.columns {
235 if let Some(ref check) = col.check_expr {
236 let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
237 if !is_truthy(&result) && !result.is_null() {
238 let name = col.check_name.as_deref().unwrap_or(&col.name);
239 return Err(SqlError::CheckViolation(name.to_string()));
240 }
241 }
242 }
243 for tc in &table_schema.check_constraints {
244 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
245 if !is_truthy(&result) && !result.is_null() {
246 let name = tc.name.as_deref().unwrap_or(&tc.sql);
247 return Err(SqlError::CheckViolation(name.to_string()));
248 }
249 }
250 }
251
252 for fk in &table_schema.foreign_keys {
253 let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
254 if any_null {
255 continue; }
257 let fk_vals: Vec<Value> = fk
258 .columns
259 .iter()
260 .map(|&ci| row[ci as usize].clone())
261 .collect();
262 fk_key_buf.clear();
263 encode_composite_key_into(&fk_vals, &mut fk_key_buf);
264 if fk.deferrable && fk.initially_deferred {
265 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
266 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
267 fk_name: name,
268 foreign_table: fk.foreign_table.as_bytes().to_vec(),
269 parent_key: fk_key_buf.clone(),
270 });
271 continue;
272 }
273 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
274 let found = wtx
275 .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
276 .map_err(SqlError::Storage)?;
277 if found.is_none() {
278 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
279 return Err(SqlError::ForeignKeyViolation(name.to_string()));
280 }
281 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
282 }
283 }
284
285 let proposed_row_for_returning: Option<Vec<Value>> =
286 returning_rows.as_ref().map(|_| row.clone());
287
288 for (j, &i) in pk_indices.iter().enumerate() {
289 pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
290 }
291 encode_composite_key_into(&pk_values, &mut key_buf);
292
293 for (j, &i) in non_pk.iter().enumerate() {
294 let col = &table_schema.columns[i];
295 if matches!(
296 col.generated_kind,
297 Some(crate::parser::GeneratedKind::Virtual)
298 ) {
299 value_values[enc_pos[j] as usize] = Value::Null;
300 row[i] = Value::Null;
301 } else {
302 value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
303 }
304 }
305 encode_row_into(&value_values, &mut value_buf);
306
307 if key_buf.len() > citadel_core::MAX_KEY_SIZE {
308 return Err(SqlError::KeyTooLarge {
309 size: key_buf.len(),
310 max: citadel_core::MAX_KEY_SIZE,
311 });
312 }
313 if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
314 return Err(SqlError::RowTooLarge {
315 size: value_buf.len(),
316 max: citadel_core::MAX_VALUE_SIZE,
317 });
318 }
319
320 match compiled_conflict.as_ref() {
321 None => {
322 let is_new = wtx
323 .table_insert(stmt.table.as_bytes(), &key_buf, &value_buf)
324 .map_err(SqlError::Storage)?;
325 if !is_new {
326 return Err(SqlError::DuplicateKey);
327 }
328 if !table_schema.indices.is_empty() {
329 for (j, &i) in pk_indices.iter().enumerate() {
330 row[i] = pk_values[j].clone();
331 }
332 for (j, &i) in non_pk.iter().enumerate() {
333 row[i] =
334 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
335 }
336 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
337 }
338 count += 1;
339 if let Some(buf) = returning_rows.as_mut() {
340 buf.push((None, proposed_row_for_returning));
341 }
342 }
343 Some(oc) => {
344 let oc_ref: &CompiledOnConflict = oc;
345 let needs_row = upsert_needs_row(oc_ref, table_schema);
346 if needs_row {
347 for (j, &i) in pk_indices.iter().enumerate() {
348 row[i] = pk_values[j].clone();
349 }
350 for (j, &i) in non_pk.iter().enumerate() {
351 row[i] =
352 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
353 }
354 }
355 let outcome = apply_insert_with_conflict(
356 &mut wtx,
357 table_schema,
358 &key_buf,
359 &value_buf,
360 &row,
361 &pk_values,
362 oc_ref,
363 row_col_map.as_ref().unwrap(),
364 stmt.returning.is_some(),
365 )?;
366 match outcome {
367 InsertRowOutcome::Inserted => {
368 count += 1;
369 if let Some(buf) = returning_rows.as_mut() {
370 buf.push((None, proposed_row_for_returning));
371 }
372 }
373 InsertRowOutcome::Updated { old, new } => {
374 count += 1;
375 if let Some(buf) = returning_rows.as_mut() {
376 buf.push((Some(old), Some(new)));
377 }
378 }
379 InsertRowOutcome::Skipped => {}
380 }
381 }
382 }
383 }
384
385 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
386 let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
387 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
388 wtx.commit().map_err(SqlError::Storage)?;
389 return Ok(ExecutionResult::Query(qr));
390 }
391
392 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
393 wtx.commit().map_err(SqlError::Storage)?;
394 Ok(ExecutionResult::RowsAffected(count))
395}
396
397pub(super) fn has_subquery(expr: &Expr) -> bool {
398 crate::parser::has_subquery(expr)
399}
400
401pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
402 if let Some(ref w) = stmt.where_clause {
403 if has_subquery(w) {
404 return true;
405 }
406 }
407 if let Some(ref h) = stmt.having {
408 if has_subquery(h) {
409 return true;
410 }
411 }
412 for col in &stmt.columns {
413 if let SelectColumn::Expr { expr, .. } = col {
414 if has_subquery(expr) {
415 return true;
416 }
417 }
418 }
419 for ob in &stmt.order_by {
420 if has_subquery(&ob.expr) {
421 return true;
422 }
423 }
424 for join in &stmt.joins {
425 if let Some(ref on_expr) = join.on_clause {
426 if has_subquery(on_expr) {
427 return true;
428 }
429 }
430 }
431 false
432}
433
434pub(super) fn materialize_expr(
435 expr: &Expr,
436 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
437) -> Result<Expr> {
438 match expr {
439 Expr::InSubquery {
440 expr: e,
441 subquery,
442 negated,
443 } => {
444 let inner = materialize_expr(e, exec_sub)?;
445 let qr = exec_sub(subquery)?;
446 if !qr.columns.is_empty() && qr.columns.len() != 1 {
447 return Err(SqlError::SubqueryMultipleColumns);
448 }
449 let mut values = rustc_hash::FxHashSet::default();
450 let mut has_null = false;
451 for row in &qr.rows {
452 if row[0].is_null() {
453 has_null = true;
454 } else {
455 values.insert(row[0].clone());
456 }
457 }
458 Ok(Expr::InSet {
459 expr: Box::new(inner),
460 values,
461 has_null,
462 negated: *negated,
463 })
464 }
465 Expr::ScalarSubquery(subquery) => {
466 let qr = exec_sub(subquery)?;
467 if qr.rows.len() > 1 {
468 return Err(SqlError::SubqueryMultipleRows);
469 }
470 let val = if qr.rows.is_empty() {
471 Value::Null
472 } else {
473 qr.rows[0][0].clone()
474 };
475 Ok(Expr::Literal(val))
476 }
477 Expr::Exists { subquery, negated } => {
478 let qr = exec_sub(subquery)?;
479 let exists = !qr.rows.is_empty();
480 let result = if *negated { !exists } else { exists };
481 Ok(Expr::Literal(Value::Boolean(result)))
482 }
483 Expr::InList {
484 expr: e,
485 list,
486 negated,
487 } => {
488 let inner = materialize_expr(e, exec_sub)?;
489 let items = list
490 .iter()
491 .map(|item| materialize_expr(item, exec_sub))
492 .collect::<Result<Vec<_>>>()?;
493 Ok(Expr::InList {
494 expr: Box::new(inner),
495 list: items,
496 negated: *negated,
497 })
498 }
499 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
500 left: Box::new(materialize_expr(left, exec_sub)?),
501 op: *op,
502 right: Box::new(materialize_expr(right, exec_sub)?),
503 }),
504 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
505 op: *op,
506 expr: Box::new(materialize_expr(e, exec_sub)?),
507 }),
508 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
509 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
510 Expr::InSet {
511 expr: e,
512 values,
513 has_null,
514 negated,
515 } => Ok(Expr::InSet {
516 expr: Box::new(materialize_expr(e, exec_sub)?),
517 values: values.clone(),
518 has_null: *has_null,
519 negated: *negated,
520 }),
521 Expr::Between {
522 expr: e,
523 low,
524 high,
525 negated,
526 } => Ok(Expr::Between {
527 expr: Box::new(materialize_expr(e, exec_sub)?),
528 low: Box::new(materialize_expr(low, exec_sub)?),
529 high: Box::new(materialize_expr(high, exec_sub)?),
530 negated: *negated,
531 }),
532 Expr::Like {
533 expr: e,
534 pattern,
535 escape,
536 negated,
537 } => {
538 let esc = escape
539 .as_ref()
540 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
541 .transpose()?;
542 Ok(Expr::Like {
543 expr: Box::new(materialize_expr(e, exec_sub)?),
544 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
545 escape: esc,
546 negated: *negated,
547 })
548 }
549 Expr::Case {
550 operand,
551 conditions,
552 else_result,
553 } => {
554 let op = operand
555 .as_ref()
556 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
557 .transpose()?;
558 let conds = conditions
559 .iter()
560 .map(|(c, r)| {
561 Ok((
562 materialize_expr(c, exec_sub)?,
563 materialize_expr(r, exec_sub)?,
564 ))
565 })
566 .collect::<Result<Vec<_>>>()?;
567 let else_r = else_result
568 .as_ref()
569 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
570 .transpose()?;
571 Ok(Expr::Case {
572 operand: op,
573 conditions: conds,
574 else_result: else_r,
575 })
576 }
577 Expr::Coalesce(args) => {
578 let materialized = args
579 .iter()
580 .map(|a| materialize_expr(a, exec_sub))
581 .collect::<Result<Vec<_>>>()?;
582 Ok(Expr::Coalesce(materialized))
583 }
584 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
585 expr: Box::new(materialize_expr(e, exec_sub)?),
586 data_type: *data_type,
587 }),
588 Expr::Function {
589 name,
590 args,
591 distinct,
592 } => {
593 let materialized = args
594 .iter()
595 .map(|a| materialize_expr(a, exec_sub))
596 .collect::<Result<Vec<_>>>()?;
597 Ok(Expr::Function {
598 name: name.clone(),
599 args: materialized,
600 distinct: *distinct,
601 })
602 }
603 other => Ok(other.clone()),
604 }
605}
606
607pub(super) fn materialize_stmt(
608 stmt: &SelectStmt,
609 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
610) -> Result<SelectStmt> {
611 let where_clause = stmt
612 .where_clause
613 .as_ref()
614 .map(|e| materialize_expr(e, exec_sub))
615 .transpose()?;
616 let having = stmt
617 .having
618 .as_ref()
619 .map(|e| materialize_expr(e, exec_sub))
620 .transpose()?;
621 let columns = stmt
622 .columns
623 .iter()
624 .map(|c| match c {
625 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
626 SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
627 SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
628 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
629 expr: materialize_expr(expr, exec_sub)?,
630 alias: alias.clone(),
631 }),
632 })
633 .collect::<Result<Vec<_>>>()?;
634 let order_by = stmt
635 .order_by
636 .iter()
637 .map(|ob| {
638 Ok(OrderByItem {
639 expr: materialize_expr(&ob.expr, exec_sub)?,
640 descending: ob.descending,
641 nulls_first: ob.nulls_first,
642 })
643 })
644 .collect::<Result<Vec<_>>>()?;
645 let joins = stmt
646 .joins
647 .iter()
648 .map(|j| {
649 let on_clause = j
650 .on_clause
651 .as_ref()
652 .map(|e| materialize_expr(e, exec_sub))
653 .transpose()?;
654 Ok(JoinClause {
655 join_type: j.join_type,
656 table: j.table.clone(),
657 subquery: j.subquery.clone(),
658 on_clause,
659 })
660 })
661 .collect::<Result<Vec<_>>>()?;
662 let group_by = stmt
663 .group_by
664 .iter()
665 .map(|e| materialize_expr(e, exec_sub))
666 .collect::<Result<Vec<_>>>()?;
667 Ok(SelectStmt {
668 columns,
669 from: stmt.from.clone(),
670 from_alias: stmt.from_alias.clone(),
671 from_subquery: stmt.from_subquery.clone(),
672 from_args: stmt.from_args.clone(),
673 from_json_table: stmt.from_json_table.clone(),
674 joins,
675 distinct: stmt.distinct,
676 where_clause,
677 order_by,
678 limit: stmt.limit.clone(),
679 offset: stmt.offset.clone(),
680 group_by,
681 having,
682 })
683}
684
685pub(super) fn exec_subquery_read(
686 db: &Database,
687 schema: &SchemaManager,
688 stmt: &SelectStmt,
689 ctes: &CteContext,
690) -> Result<QueryResult> {
691 match super::exec_select(db, schema, stmt, ctes)? {
692 ExecutionResult::Query(qr) => Ok(qr),
693 _ => Ok(QueryResult {
694 columns: vec![],
695 rows: vec![],
696 }),
697 }
698}
699
700pub(super) fn exec_subquery_write(
701 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
702 schema: &SchemaManager,
703 stmt: &SelectStmt,
704 ctes: &CteContext,
705) -> Result<QueryResult> {
706 match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
707 ExecutionResult::Query(qr) => Ok(qr),
708 _ => Ok(QueryResult {
709 columns: vec![],
710 rows: vec![],
711 }),
712 }
713}
714
715pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
716 stmt.where_clause.as_ref().is_some_and(has_subquery)
717 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
718}
719
720pub(super) fn materialize_update(
721 stmt: &UpdateStmt,
722 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
723) -> Result<UpdateStmt> {
724 let where_clause = stmt
725 .where_clause
726 .as_ref()
727 .map(|e| materialize_expr(e, exec_sub))
728 .transpose()?;
729 let assignments = stmt
730 .assignments
731 .iter()
732 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
733 .collect::<Result<Vec<_>>>()?;
734 Ok(UpdateStmt {
735 table: stmt.table.clone(),
736 assignments,
737 where_clause,
738 returning: stmt.returning.clone(),
739 })
740}
741
742pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
743 stmt.where_clause.as_ref().is_some_and(has_subquery)
744}
745
746pub(super) fn materialize_delete(
747 stmt: &DeleteStmt,
748 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
749) -> Result<DeleteStmt> {
750 let where_clause = stmt
751 .where_clause
752 .as_ref()
753 .map(|e| materialize_expr(e, exec_sub))
754 .transpose()?;
755 Ok(DeleteStmt {
756 table: stmt.table.clone(),
757 where_clause,
758 returning: stmt.returning.clone(),
759 })
760}
761
762pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
763 match &stmt.source {
764 InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
765 InsertSource::Select(_) => false,
767 }
768}
769
770pub(super) fn materialize_insert(
771 stmt: &InsertStmt,
772 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
773) -> Result<InsertStmt> {
774 let source = match &stmt.source {
775 InsertSource::Values(rows) => {
776 let mat = rows
777 .iter()
778 .map(|row| {
779 row.iter()
780 .map(|e| materialize_expr(e, exec_sub))
781 .collect::<Result<Vec<_>>>()
782 })
783 .collect::<Result<Vec<_>>>()?;
784 InsertSource::Values(mat)
785 }
786 InsertSource::Select(sq) => {
787 let ctes = sq
788 .ctes
789 .iter()
790 .map(|c| {
791 Ok(CteDefinition {
792 name: c.name.clone(),
793 column_aliases: c.column_aliases.clone(),
794 body: materialize_query_body(&c.body, exec_sub)?,
795 })
796 })
797 .collect::<Result<Vec<_>>>()?;
798 let body = materialize_query_body(&sq.body, exec_sub)?;
799 InsertSource::Select(Box::new(SelectQuery {
800 ctes,
801 recursive: sq.recursive,
802 body,
803 }))
804 }
805 };
806 Ok(InsertStmt {
807 table: stmt.table.clone(),
808 columns: stmt.columns.clone(),
809 source,
810 on_conflict: stmt.on_conflict.clone(),
811 returning: stmt.returning.clone(),
812 })
813}
814
815pub(super) fn materialize_query_body(
816 body: &QueryBody,
817 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
818) -> Result<QueryBody> {
819 match body {
820 QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
821 sel, exec_sub,
822 )?))),
823 QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
824 op: comp.op.clone(),
825 all: comp.all,
826 left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
827 right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
828 order_by: comp.order_by.clone(),
829 limit: comp.limit.clone(),
830 offset: comp.offset.clone(),
831 }))),
832 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
833 }
834}
835
836pub(super) fn exec_query_body(
837 db: &Database,
838 schema: &SchemaManager,
839 body: &QueryBody,
840 ctes: &CteContext,
841) -> Result<ExecutionResult> {
842 match body {
843 QueryBody::Select(sel) => super::exec_select(db, schema, sel, ctes),
844 QueryBody::Compound(comp) => exec_compound_select(db, schema, comp, ctes),
845 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
846 SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
847 ),
848 }
849}
850
851pub(super) fn exec_query_body_in_txn(
852 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
853 schema: &SchemaManager,
854 body: &QueryBody,
855 ctes: &CteContext,
856) -> Result<ExecutionResult> {
857 match body {
858 QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
859 QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
860 QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
861 QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
862 QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
863 }
864}
865
866pub(super) fn exec_query_body_read(
867 db: &Database,
868 schema: &SchemaManager,
869 body: &QueryBody,
870 ctes: &CteContext,
871) -> Result<QueryResult> {
872 match exec_query_body(db, schema, body, ctes)? {
873 ExecutionResult::Query(qr) => Ok(qr),
874 _ => Ok(QueryResult {
875 columns: vec![],
876 rows: vec![],
877 }),
878 }
879}
880
881pub(super) fn exec_query_body_write(
882 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
883 schema: &SchemaManager,
884 body: &QueryBody,
885 ctes: &CteContext,
886) -> Result<QueryResult> {
887 match exec_query_body_in_txn(wtx, schema, body, ctes)? {
888 ExecutionResult::Query(qr) => Ok(qr),
889 _ => Ok(QueryResult {
890 columns: vec![],
891 rows: vec![],
892 }),
893 }
894}
895
896pub(super) fn exec_compound_select(
897 db: &Database,
898 schema: &SchemaManager,
899 comp: &CompoundSelect,
900 ctes: &CteContext,
901) -> Result<ExecutionResult> {
902 let left_qr = match exec_query_body(db, schema, &comp.left, ctes)? {
903 ExecutionResult::Query(qr) => qr,
904 _ => QueryResult {
905 columns: vec![],
906 rows: vec![],
907 },
908 };
909 let right_qr = match exec_query_body(db, schema, &comp.right, ctes)? {
910 ExecutionResult::Query(qr) => qr,
911 _ => QueryResult {
912 columns: vec![],
913 rows: vec![],
914 },
915 };
916 apply_set_operation(comp, left_qr, right_qr)
917}
918
919pub(super) fn exec_compound_select_in_txn(
920 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
921 schema: &SchemaManager,
922 comp: &CompoundSelect,
923 ctes: &CteContext,
924) -> Result<ExecutionResult> {
925 let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
926 ExecutionResult::Query(qr) => qr,
927 _ => QueryResult {
928 columns: vec![],
929 rows: vec![],
930 },
931 };
932 let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
933 ExecutionResult::Query(qr) => qr,
934 _ => QueryResult {
935 columns: vec![],
936 rows: vec![],
937 },
938 };
939 apply_set_operation(comp, left_qr, right_qr)
940}
941
942pub(super) fn apply_set_operation(
943 comp: &CompoundSelect,
944 left_qr: QueryResult,
945 right_qr: QueryResult,
946) -> Result<ExecutionResult> {
947 if !left_qr.columns.is_empty()
948 && !right_qr.columns.is_empty()
949 && left_qr.columns.len() != right_qr.columns.len()
950 {
951 return Err(SqlError::CompoundColumnCountMismatch {
952 left: left_qr.columns.len(),
953 right: right_qr.columns.len(),
954 });
955 }
956
957 let columns = left_qr.columns;
958
959 let mut rows = match (&comp.op, comp.all) {
960 (SetOp::Union, true) => {
961 let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
962 let mut rows = Vec::with_capacity(total);
963 rows.extend(left_qr.rows);
964 rows.extend(right_qr.rows);
965 rows
966 }
967 (SetOp::Union, false) => {
968 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
969 let mut rows = Vec::new();
970 for row in left_qr.rows.into_iter().chain(right_qr.rows) {
971 if !seen.contains(&row) {
972 seen.insert(row.clone());
973 rows.push(row);
974 }
975 }
976 rows
977 }
978 (SetOp::Intersect, true) => {
979 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
980 for row in &right_qr.rows {
981 *right_counts.entry(row.clone()).or_insert(0) += 1;
982 }
983 let mut rows = Vec::new();
984 for row in left_qr.rows {
985 if let Some(count) = right_counts.get_mut(&row) {
986 if *count > 0 {
987 *count -= 1;
988 rows.push(row);
989 }
990 }
991 }
992 rows
993 }
994 (SetOp::Intersect, false) => {
995 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
996 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
997 let mut rows = Vec::new();
998 for row in left_qr.rows {
999 if right_set.contains(&row) && !seen.contains(&row) {
1000 seen.insert(row.clone());
1001 rows.push(row);
1002 }
1003 }
1004 rows
1005 }
1006 (SetOp::Except, true) => {
1007 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1008 for row in &right_qr.rows {
1009 *right_counts.entry(row.clone()).or_insert(0) += 1;
1010 }
1011 let mut rows = Vec::new();
1012 for row in left_qr.rows {
1013 if let Some(count) = right_counts.get_mut(&row) {
1014 if *count > 0 {
1015 *count -= 1;
1016 continue;
1017 }
1018 }
1019 rows.push(row);
1020 }
1021 rows
1022 }
1023 (SetOp::Except, false) => {
1024 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1025 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1026 let mut rows = Vec::new();
1027 for row in left_qr.rows {
1028 if !right_set.contains(&row) && !seen.contains(&row) {
1029 seen.insert(row.clone());
1030 rows.push(row);
1031 }
1032 }
1033 rows
1034 }
1035 };
1036
1037 if !comp.order_by.is_empty() {
1038 let col_defs: Vec<crate::types::ColumnDef> = columns
1039 .iter()
1040 .enumerate()
1041 .map(|(i, name)| crate::types::ColumnDef {
1042 name: name.clone(),
1043 data_type: crate::types::DataType::Null,
1044 nullable: true,
1045 position: i as u16,
1046 default_expr: None,
1047 default_sql: None,
1048 check_expr: None,
1049 check_sql: None,
1050 check_name: None,
1051 is_with_timezone: false,
1052 generated_expr: None,
1053 generated_sql: None,
1054 generated_kind: None,
1055 collation: crate::types::Collation::Binary,
1056 })
1057 .collect();
1058 sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1059 }
1060
1061 if let Some(ref offset_expr) = comp.offset {
1062 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1063 if offset < rows.len() {
1064 rows = rows.split_off(offset);
1065 } else {
1066 rows.clear();
1067 }
1068 }
1069
1070 if let Some(ref limit_expr) = comp.limit {
1071 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1072 rows.truncate(limit);
1073 }
1074
1075 Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1076}
1077
1078struct InsertBufs {
1079 row: Vec<Value>,
1080 pk_values: Vec<Value>,
1081 value_values: Vec<Value>,
1082 key_buf: Vec<u8>,
1083 value_buf: Vec<u8>,
1084 col_indices: Vec<usize>,
1085 fk_key_buf: Vec<u8>,
1086}
1087
1088impl InsertBufs {
1089 fn new() -> Self {
1090 Self {
1091 row: Vec::new(),
1092 pk_values: Vec::new(),
1093 value_values: Vec::new(),
1094 key_buf: Vec::with_capacity(64),
1095 value_buf: Vec::with_capacity(256),
1096 col_indices: Vec::new(),
1097 fk_key_buf: Vec::with_capacity(64),
1098 }
1099 }
1100}
1101
1102thread_local! {
1103 static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1104 static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1105}
1106
1107fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1108 INSERT_SCRATCH.with(|slot| f(&mut slot.borrow_mut()))
1109}
1110
1111pub(super) struct UpsertBufs {
1112 old_row: Vec<Value>,
1113 new_row: Vec<Value>,
1114 value_values: Vec<Value>,
1115 new_value_buf: Vec<u8>,
1116}
1117
1118impl UpsertBufs {
1119 pub(super) fn new() -> Self {
1120 Self {
1121 old_row: Vec::new(),
1122 new_row: Vec::new(),
1123 value_values: Vec::new(),
1124 new_value_buf: Vec::with_capacity(256),
1125 }
1126 }
1127}
1128
1129pub fn exec_insert_in_txn(
1130 wtx: &mut WriteTxn<'_>,
1131 schema: &SchemaManager,
1132 stmt: &InsertStmt,
1133 params: &[Value],
1134) -> Result<ExecutionResult> {
1135 with_insert_scratch(|bufs| {
1136 exec_insert_in_txn_impl(
1137 wtx,
1138 schema,
1139 stmt,
1140 params,
1141 bufs,
1142 None,
1143 &CteContext::default(),
1144 )
1145 })
1146}
1147
1148pub(super) fn exec_insert_in_txn_with_ctes(
1149 wtx: &mut WriteTxn<'_>,
1150 schema: &SchemaManager,
1151 stmt: &InsertStmt,
1152 params: &[Value],
1153 outer_ctes: &CteContext,
1154) -> Result<ExecutionResult> {
1155 with_insert_scratch(|bufs| {
1156 exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1157 })
1158}
1159
1160fn exec_insert_in_txn_cached(
1161 wtx: &mut WriteTxn<'_>,
1162 schema: &SchemaManager,
1163 stmt: &InsertStmt,
1164 params: &[Value],
1165 cache: &InsertCache,
1166) -> Result<ExecutionResult> {
1167 with_insert_scratch(|bufs| {
1168 exec_insert_in_txn_impl(
1169 wtx,
1170 schema,
1171 stmt,
1172 params,
1173 bufs,
1174 Some(cache),
1175 &CteContext::default(),
1176 )
1177 })
1178}
1179
1180fn exec_insert_in_txn_impl(
1181 wtx: &mut WriteTxn<'_>,
1182 schema: &SchemaManager,
1183 stmt: &InsertStmt,
1184 params: &[Value],
1185 bufs: &mut InsertBufs,
1186 cache: Option<&InsertCache>,
1187 outer_ctes: &CteContext,
1188) -> Result<ExecutionResult> {
1189 let empty_ctes = CteContext::default();
1190 let materialized;
1191 let has_sub = match cache {
1192 Some(c) => c.has_subquery,
1193 None => insert_has_subquery(stmt),
1194 };
1195 let stmt = if has_sub {
1196 materialized = materialize_insert(stmt, &mut |sub| {
1197 exec_subquery_write(wtx, schema, sub, &empty_ctes)
1198 })?;
1199 &materialized
1200 } else {
1201 stmt
1202 };
1203
1204 let table_schema = schema
1205 .get(&stmt.table)
1206 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1207
1208 let default_columns;
1209 let insert_columns: &[String] = if stmt.columns.is_empty() {
1210 default_columns = table_schema
1211 .columns
1212 .iter()
1213 .map(|c| c.name.clone())
1214 .collect::<Vec<_>>();
1215 &default_columns
1216 } else {
1217 &stmt.columns
1218 };
1219
1220 bufs.col_indices.clear();
1221 if let Some(c) = cache {
1222 bufs.col_indices.extend_from_slice(&c.col_indices);
1223 } else {
1224 for name in insert_columns {
1225 bufs.col_indices.push(
1226 table_schema
1227 .column_index(name)
1228 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1229 );
1230 }
1231 }
1232
1233 if cache.is_none() {
1234 for &ci in &bufs.col_indices {
1235 if table_schema.columns[ci].generated_kind.is_some() {
1236 return Err(SqlError::CannotInsertIntoGeneratedColumn(
1237 table_schema.columns[ci].name.clone(),
1238 ));
1239 }
1240 }
1241 }
1242
1243 let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1244 let cached_gen_positions: &[usize];
1245 let cached_gen_fast_evals: &[FastGenEval];
1246 if let Some(c) = cache {
1247 cached_gen_positions = &c.generated_col_positions;
1248 cached_gen_fast_evals = &c.generated_fast_evals;
1249 generated_cols_uncached = Vec::new();
1250 } else {
1251 cached_gen_positions = &[];
1252 cached_gen_fast_evals = &[];
1253 generated_cols_uncached = table_schema
1254 .columns
1255 .iter()
1256 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1257 .map(|c| {
1258 let expr = c.generated_expr.as_ref().unwrap();
1259 let fe = detect_fast_gen_eval(expr, table_schema);
1260 (c.position as usize, expr, fe)
1261 })
1262 .collect();
1263 }
1264 let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1265 let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1266 None
1267 } else {
1268 Some(ColumnMap::new(&table_schema.columns))
1269 };
1270 let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1271 None
1272 } else if let Some(c) = cache {
1273 c.row_col_map.as_ref()
1274 } else {
1275 row_col_map_for_gen_owned.as_ref()
1276 };
1277
1278 let any_defaults = match cache {
1279 Some(c) => c.any_defaults,
1280 None => table_schema
1281 .columns
1282 .iter()
1283 .any(|c| c.default_expr.is_some()),
1284 };
1285 let defaults: Vec<(usize, &Expr)> = if any_defaults {
1286 table_schema
1287 .columns
1288 .iter()
1289 .filter(|c| {
1290 c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1291 })
1292 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1293 .collect()
1294 } else {
1295 Vec::new()
1296 };
1297
1298 let has_checks = match cache {
1299 Some(c) => c.has_checks,
1300 None => table_schema.has_checks(),
1301 };
1302 let check_col_map = if has_checks {
1303 Some(ColumnMap::new(&table_schema.columns))
1304 } else {
1305 None
1306 };
1307
1308 let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1309 &[usize],
1310 &[usize],
1311 &[u16],
1312 usize,
1313 &[u16],
1314 ) = if let Some(c) = cache {
1315 (
1316 &c.pk_indices,
1317 &c.non_pk_indices,
1318 &c.encoding_positions,
1319 c.phys_count,
1320 &c.dropped_non_pk_slots,
1321 )
1322 } else {
1323 (
1324 table_schema.pk_indices(),
1325 table_schema.non_pk_indices(),
1326 table_schema.encoding_positions(),
1327 table_schema.physical_non_pk_count(),
1328 table_schema.dropped_non_pk_slots(),
1329 )
1330 };
1331
1332 bufs.row.resize(table_schema.columns.len(), Value::Null);
1333 bufs.pk_values.resize(pk_indices.len(), Value::Null);
1334 bufs.value_values.resize(phys_count, Value::Null);
1335
1336 let table_bytes = stmt.table.as_bytes();
1337 let has_fks = !table_schema.foreign_keys.is_empty();
1338 let has_indices = !table_schema.indices.is_empty();
1339 let has_defaults = !defaults.is_empty();
1340
1341 let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1342 (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1343 (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1344 (_, None) => None,
1345 };
1346
1347 let row_col_map_owned: Option<ColumnMap> =
1348 if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1349 Some(ColumnMap::new(&table_schema.columns))
1350 } else {
1351 None
1352 };
1353 let row_col_map: Option<&ColumnMap> = cache
1354 .and_then(|c| c.row_col_map.as_ref())
1355 .or(row_col_map_owned.as_ref());
1356
1357 let select_rows = match &stmt.source {
1358 InsertSource::Select(sq) => {
1359 let insert_ctes = super::materialize_all_ctes_with_outer(
1360 &sq.ctes,
1361 sq.recursive,
1362 outer_ctes,
1363 &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1364 )?;
1365 let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1366 Some(qr.rows)
1367 }
1368 InsertSource::Values(_) => None,
1369 };
1370
1371 let mut count: u64 = 0;
1372 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1373 stmt.returning.as_ref().map(|_| Vec::new());
1374
1375 let values = match &stmt.source {
1376 InsertSource::Values(rows) => Some(rows.as_slice()),
1377 InsertSource::Select(_) => None,
1378 };
1379 let sel_rows = select_rows.as_deref();
1380
1381 let total = match (values, sel_rows) {
1382 (Some(rows), _) => rows.len(),
1383 (_, Some(rows)) => rows.len(),
1384 _ => 0,
1385 };
1386
1387 if let Some(sel) = sel_rows {
1388 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1389 return Err(SqlError::InvalidValue(format!(
1390 "INSERT ... SELECT column count mismatch: expected {}, got {}",
1391 insert_columns.len(),
1392 sel[0].len()
1393 )));
1394 }
1395 }
1396
1397 let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1398 for idx in 0..total {
1399 if !skip_row_clear {
1400 for v in bufs.row.iter_mut() {
1401 *v = Value::Null;
1402 }
1403 }
1404
1405 if let Some(value_rows) = values {
1406 if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1407 for action in plan {
1408 match action {
1409 BindAction::Param {
1410 param_idx,
1411 col_idx,
1412 target,
1413 } => {
1414 let v = ¶ms[*param_idx];
1415 bufs.row[*col_idx] = if v.is_null() {
1416 Value::Null
1417 } else if v.data_type() == *target {
1418 v.clone()
1419 } else {
1420 let got = v.data_type();
1421 v.clone().coerce_into(*target).ok_or_else(|| {
1422 SqlError::TypeMismatch {
1423 expected: target.to_string(),
1424 got: got.to_string(),
1425 }
1426 })?
1427 };
1428 }
1429 BindAction::Literal { value, col_idx } => {
1430 bufs.row[*col_idx] = value.clone();
1431 }
1432 }
1433 }
1434 } else {
1435 let value_row = &value_rows[idx];
1436 if value_row.len() != insert_columns.len() {
1437 return Err(SqlError::InvalidValue(format!(
1438 "expected {} values, got {}",
1439 insert_columns.len(),
1440 value_row.len()
1441 )));
1442 }
1443 for (i, expr) in value_row.iter().enumerate() {
1444 let val = match expr {
1445 Expr::Parameter(n) => params
1446 .get(n - 1)
1447 .cloned()
1448 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1449 Expr::Literal(v) => v.clone(),
1450 _ => eval_const_expr(expr)?,
1451 };
1452 let col_idx = bufs.col_indices[i];
1453 let col = &table_schema.columns[col_idx];
1454 let got_type = val.data_type();
1455 bufs.row[col_idx] = if val.is_null() {
1456 Value::Null
1457 } else {
1458 val.coerce_into(col.data_type)
1459 .ok_or_else(|| SqlError::TypeMismatch {
1460 expected: col.data_type.to_string(),
1461 got: got_type.to_string(),
1462 })?
1463 };
1464 }
1465 }
1466 } else if let Some(sel) = sel_rows {
1467 let sel_row = &sel[idx];
1468 for (i, val) in sel_row.iter().enumerate() {
1469 let col_idx = bufs.col_indices[i];
1470 let col = &table_schema.columns[col_idx];
1471 let got_type = val.data_type();
1472 bufs.row[col_idx] = if val.is_null() {
1473 Value::Null
1474 } else {
1475 val.clone().coerce_into(col.data_type).ok_or_else(|| {
1476 SqlError::TypeMismatch {
1477 expected: col.data_type.to_string(),
1478 got: got_type.to_string(),
1479 }
1480 })?
1481 };
1482 }
1483 }
1484
1485 if has_defaults {
1486 for &(pos, def_expr) in &defaults {
1487 let val = eval_const_expr(def_expr)?;
1488 let col = &table_schema.columns[pos];
1489 if !val.is_null() {
1490 let got_type = val.data_type();
1491 bufs.row[pos] =
1492 val.coerce_into(col.data_type)
1493 .ok_or_else(|| SqlError::TypeMismatch {
1494 expected: col.data_type.to_string(),
1495 got: got_type.to_string(),
1496 })?;
1497 }
1498 }
1499 }
1500
1501 if let Some(gen_map) = row_col_map_for_gen {
1502 if cache.is_some() {
1503 for (pos, fast) in cached_gen_positions
1504 .iter()
1505 .copied()
1506 .zip(cached_gen_fast_evals.iter())
1507 {
1508 let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1509 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1510 let col = &table_schema.columns[pos];
1511 bufs.row[pos] = if val.is_null() {
1512 Value::Null
1513 } else {
1514 let got_type = val.data_type();
1515 val.coerce_into(col.data_type)
1516 .ok_or_else(|| SqlError::TypeMismatch {
1517 expected: col.data_type.to_string(),
1518 got: got_type.to_string(),
1519 })?
1520 };
1521 }
1522 } else {
1523 for (pos, gen_expr, fast) in &generated_cols_uncached {
1524 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1525 let col = &table_schema.columns[*pos];
1526 bufs.row[*pos] = if val.is_null() {
1527 Value::Null
1528 } else {
1529 let got_type = val.data_type();
1530 val.coerce_into(col.data_type)
1531 .ok_or_else(|| SqlError::TypeMismatch {
1532 expected: col.data_type.to_string(),
1533 got: got_type.to_string(),
1534 })?
1535 };
1536 }
1537 }
1538 }
1539
1540 if let Some(c) = cache {
1541 for &pos in &c.not_null_indices {
1542 if bufs.row[pos as usize].is_null() {
1543 return Err(SqlError::NotNullViolation(
1544 table_schema.columns[pos as usize].name.clone(),
1545 ));
1546 }
1547 }
1548 } else {
1549 for col in &table_schema.columns {
1550 if !col.nullable && bufs.row[col.position as usize].is_null() {
1551 return Err(SqlError::NotNullViolation(col.name.clone()));
1552 }
1553 }
1554 }
1555
1556 if let Some(ref col_map) = check_col_map {
1557 for col in &table_schema.columns {
1558 if let Some(ref check) = col.check_expr {
1559 let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1560 if !is_truthy(&result) && !result.is_null() {
1561 let name = col.check_name.as_deref().unwrap_or(&col.name);
1562 return Err(SqlError::CheckViolation(name.to_string()));
1563 }
1564 }
1565 }
1566 for tc in &table_schema.check_constraints {
1567 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1568 if !is_truthy(&result) && !result.is_null() {
1569 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1570 return Err(SqlError::CheckViolation(name.to_string()));
1571 }
1572 }
1573 }
1574
1575 if has_fks {
1576 for fk in &table_schema.foreign_keys {
1577 let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1578 if any_null {
1579 continue;
1580 }
1581 crate::encoding::encode_composite_key_from_indices(
1582 &fk.columns,
1583 &bufs.row,
1584 &mut bufs.fk_key_buf,
1585 );
1586 if fk.deferrable && fk.initially_deferred {
1587 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1588 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1589 fk_name: name,
1590 foreign_table: fk.foreign_table.as_bytes().to_vec(),
1591 parent_key: bufs.fk_key_buf.clone(),
1592 });
1593 continue;
1594 }
1595 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1596 let found = wtx
1597 .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1598 .map_err(SqlError::Storage)?;
1599 if found.is_none() {
1600 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1601 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1602 }
1603 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1604 }
1605 }
1606 }
1607
1608 let proposed_row_for_returning: Option<Vec<Value>> =
1609 returning_rows.as_ref().map(|_| bufs.row.clone());
1610
1611 for (j, &i) in pk_indices.iter().enumerate() {
1612 bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1613 }
1614 match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1615 true => match bufs.pk_values[0] {
1616 Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1617 _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1618 },
1619 false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1620 }
1621
1622 for &slot in dropped {
1623 bufs.value_values[slot as usize] = Value::Null;
1624 }
1625 for (j, &i) in non_pk.iter().enumerate() {
1626 let col = &table_schema.columns[i];
1627 if matches!(
1628 col.generated_kind,
1629 Some(crate::parser::GeneratedKind::Virtual)
1630 ) {
1631 bufs.value_values[enc_pos[j] as usize] = Value::Null;
1632 bufs.row[i] = Value::Null;
1633 } else {
1634 bufs.value_values[enc_pos[j] as usize] =
1635 std::mem::replace(&mut bufs.row[i], Value::Null);
1636 }
1637 }
1638 match cache.and_then(|c| c.row_encoder.as_ref()) {
1639 Some(tmpl) => crate::encoding::encode_int_row_with_template(
1640 tmpl,
1641 &bufs.value_values,
1642 &mut bufs.value_buf,
1643 )?,
1644 None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1645 }
1646
1647 if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1648 return Err(SqlError::KeyTooLarge {
1649 size: bufs.key_buf.len(),
1650 max: citadel_core::MAX_KEY_SIZE,
1651 });
1652 }
1653 if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1654 return Err(SqlError::RowTooLarge {
1655 size: bufs.value_buf.len(),
1656 max: citadel_core::MAX_VALUE_SIZE,
1657 });
1658 }
1659
1660 match compiled_conflict.as_ref() {
1661 None => {
1662 let is_new = wtx
1663 .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1664 .map_err(SqlError::Storage)?;
1665 if !is_new {
1666 return Err(SqlError::DuplicateKey);
1667 }
1668 if has_indices {
1669 for (j, &i) in pk_indices.iter().enumerate() {
1670 bufs.row[i] = bufs.pk_values[j].clone();
1671 }
1672 for (j, &i) in non_pk.iter().enumerate() {
1673 bufs.row[i] = std::mem::replace(
1674 &mut bufs.value_values[enc_pos[j] as usize],
1675 Value::Null,
1676 );
1677 }
1678 insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1679 }
1680 count += 1;
1681 if let Some(buf) = returning_rows.as_mut() {
1682 buf.push((None, proposed_row_for_returning));
1683 }
1684 }
1685 Some(oc) => {
1686 let oc_ref: &CompiledOnConflict = oc;
1687 let needs_row = upsert_needs_row(oc_ref, table_schema);
1688 if needs_row {
1689 for (j, &i) in pk_indices.iter().enumerate() {
1690 bufs.row[i] = bufs.pk_values[j].clone();
1691 }
1692 for (j, &i) in non_pk.iter().enumerate() {
1693 bufs.row[i] = std::mem::replace(
1694 &mut bufs.value_values[enc_pos[j] as usize],
1695 Value::Null,
1696 );
1697 }
1698 }
1699 let outcome = apply_insert_with_conflict(
1700 wtx,
1701 table_schema,
1702 &bufs.key_buf,
1703 &bufs.value_buf,
1704 &bufs.row,
1705 &bufs.pk_values,
1706 oc_ref,
1707 row_col_map.unwrap(),
1708 stmt.returning.is_some(),
1709 )?;
1710 match outcome {
1711 InsertRowOutcome::Inserted => {
1712 count += 1;
1713 if let Some(buf) = returning_rows.as_mut() {
1714 buf.push((None, proposed_row_for_returning));
1715 }
1716 }
1717 InsertRowOutcome::Updated { old, new } => {
1718 count += 1;
1719 if let Some(buf) = returning_rows.as_mut() {
1720 buf.push((Some(old), Some(new)));
1721 }
1722 }
1723 InsertRowOutcome::Skipped => {}
1724 }
1725 }
1726 }
1727 }
1728
1729 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
1730 return Ok(ExecutionResult::Query(super::helpers::project_returning(
1731 table_schema,
1732 returning_cols,
1733 &rows,
1734 )?));
1735 }
1736
1737 Ok(ExecutionResult::RowsAffected(count))
1738}
1739
1740pub struct CompiledInsert {
1741 table_lower: String,
1742 cached: Option<InsertCache>,
1743}
1744
1745struct InsertCache {
1746 col_indices: Vec<usize>,
1747 has_subquery: bool,
1748 any_defaults: bool,
1749 has_checks: bool,
1750 on_conflict: Option<Arc<CompiledOnConflict>>,
1751 row_col_map: Option<ColumnMap>,
1752 generated_col_positions: Vec<usize>,
1753 generated_fast_evals: Vec<FastGenEval>,
1754 pk_indices: Vec<usize>,
1755 non_pk_indices: Vec<usize>,
1756 encoding_positions: Vec<u16>,
1757 dropped_non_pk_slots: Vec<u16>,
1758 phys_count: usize,
1759 single_int_pk: bool,
1760 not_null_indices: Vec<u16>,
1761 bind_plan: Option<Vec<BindAction>>,
1762 row_fully_overwritten: bool,
1763 row_encoder: Option<crate::encoding::IntRowTemplate>,
1764 is_trivial_fast: bool,
1765 trivial_fast_program: Option<TrivialFastProgram>,
1766 needs_scoped_params: bool,
1767}
1768
1769#[derive(Clone)]
1770enum BindAction {
1771 Param {
1772 param_idx: usize,
1773 col_idx: usize,
1774 target: DataType,
1775 },
1776 Literal {
1777 value: Value,
1778 col_idx: usize,
1779 },
1780}
1781
1782#[derive(Clone)]
1783struct TrivialFastProgram {
1784 template: Vec<u8>,
1785 ops: Vec<WriteOp>,
1786 pk_param: u8,
1787 not_null_param_indices: Vec<u8>,
1788}
1789
1790#[derive(Clone)]
1791enum WriteOp {
1792 ParamI64 {
1793 param_idx: u8,
1794 off: u32,
1795 },
1796 LiteralI64 {
1797 value: i64,
1798 off: u32,
1799 },
1800 GenAddParamsI64 {
1801 a_param: u8,
1802 b_param: u8,
1803 off: u32,
1804 bitmap_byte_off: u32,
1805 bitmap_bit_mask: u8,
1806 },
1807 GenMulAddParamI64 {
1808 param_idx: u8,
1809 mul: i64,
1810 add: i64,
1811 off: u32,
1812 bitmap_byte_off: u32,
1813 bitmap_bit_mask: u8,
1814 },
1815}
1816
1817fn build_trivial_fast_program(
1818 bind_plan: &[BindAction],
1819 row_encoder: &crate::encoding::IntRowTemplate,
1820 non_virtual_pairs: &[(usize, usize)],
1821 generated_col_positions: &[usize],
1822 generated_fast_evals: &[FastGenEval],
1823 pk_indices: &[usize],
1824 columns: &[crate::types::ColumnDef],
1825) -> Option<TrivialFastProgram> {
1826 let pk_col = pk_indices[0];
1827 let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
1828 non_virtual_pairs.iter().copied().collect();
1829 let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
1830 row_encoder.slot_offsets.iter().copied().collect();
1831
1832 let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
1833 let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
1834 let mut pk_param: Option<u8> = None;
1835 let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
1836 let mut not_null_param_indices: Vec<u8> = Vec::new();
1837
1838 for action in bind_plan {
1839 match action {
1840 BindAction::Param {
1841 param_idx,
1842 col_idx,
1843 target,
1844 } => {
1845 if *target != DataType::Integer {
1846 return None;
1847 }
1848 let pi: u8 = u8::try_from(*param_idx).ok()?;
1849 col_to_param.insert(*col_idx, pi);
1850 if *col_idx == pk_col {
1851 pk_param = Some(pi);
1852 } else {
1853 let slot = *col_to_slot.get(col_idx)?;
1854 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1855 ops.push(WriteOp::ParamI64 { param_idx: pi, off });
1856 if !columns[*col_idx].nullable {
1857 not_null_param_indices.push(pi);
1858 }
1859 }
1860 }
1861 BindAction::Literal { value, col_idx } => match value {
1862 Value::Integer(v) => {
1863 col_to_lit_int.insert(*col_idx, *v);
1864 if *col_idx == pk_col {
1865 return None;
1866 }
1867 let slot = *col_to_slot.get(col_idx)?;
1868 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1869 ops.push(WriteOp::LiteralI64 { value: *v, off });
1870 }
1871 _ => return None,
1872 },
1873 }
1874 }
1875
1876 let pk_param = pk_param?;
1877
1878 for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
1879 let gen_slot = *col_to_slot.get(&gen_pos)?;
1880 let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
1881 let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
1882 let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
1883 let gen_col_nullable = columns[gen_pos].nullable;
1884
1885 match &generated_fast_evals[i] {
1886 FastGenEval::IntColAddCol {
1887 left_idx,
1888 right_idx,
1889 } => {
1890 let a_param = col_to_param.get(left_idx).copied();
1891 let b_param = col_to_param.get(right_idx).copied();
1892 match (a_param, b_param) {
1893 (Some(ap), Some(bp)) => {
1894 let deps_safe = gen_col_nullable
1895 || (not_null_param_indices.contains(&ap)
1896 && not_null_param_indices.contains(&bp));
1897 if !deps_safe {
1898 return None;
1899 }
1900 ops.push(WriteOp::GenAddParamsI64 {
1901 a_param: ap,
1902 b_param: bp,
1903 off: gen_off,
1904 bitmap_byte_off,
1905 bitmap_bit_mask,
1906 });
1907 }
1908 (Some(p), None) => {
1909 let lit = col_to_lit_int.get(right_idx).copied()?;
1910 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1911 return None;
1912 }
1913 ops.push(WriteOp::GenMulAddParamI64 {
1914 param_idx: p,
1915 mul: 1,
1916 add: lit,
1917 off: gen_off,
1918 bitmap_byte_off,
1919 bitmap_bit_mask,
1920 });
1921 }
1922 (None, Some(p)) => {
1923 let lit = col_to_lit_int.get(left_idx).copied()?;
1924 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1925 return None;
1926 }
1927 ops.push(WriteOp::GenMulAddParamI64 {
1928 param_idx: p,
1929 mul: 1,
1930 add: lit,
1931 off: gen_off,
1932 bitmap_byte_off,
1933 bitmap_bit_mask,
1934 });
1935 }
1936 (None, None) => {
1937 let la = col_to_lit_int.get(left_idx).copied()?;
1938 let lb = col_to_lit_int.get(right_idx).copied()?;
1939 ops.push(WriteOp::LiteralI64 {
1940 value: la.wrapping_add(lb),
1941 off: gen_off,
1942 });
1943 }
1944 }
1945 }
1946 FastGenEval::IntColMulAdd {
1947 col_schema_idx,
1948 mul,
1949 add,
1950 } => {
1951 if let Some(p) = col_to_param.get(col_schema_idx).copied() {
1952 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1953 return None;
1954 }
1955 ops.push(WriteOp::GenMulAddParamI64 {
1956 param_idx: p,
1957 mul: *mul,
1958 add: *add,
1959 off: gen_off,
1960 bitmap_byte_off,
1961 bitmap_bit_mask,
1962 });
1963 } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
1964 ops.push(WriteOp::LiteralI64 {
1965 value: lit.wrapping_mul(*mul).wrapping_add(*add),
1966 off: gen_off,
1967 });
1968 } else {
1969 return None;
1970 }
1971 }
1972 FastGenEval::None => return None,
1973 }
1974 }
1975
1976 Some(TrivialFastProgram {
1977 template: row_encoder.template.clone(),
1978 ops,
1979 pk_param,
1980 not_null_param_indices,
1981 })
1982}
1983
1984#[derive(Clone)]
1985pub(super) enum CompiledOnConflict {
1986 DoNothing {
1987 target: Option<ConflictKind>,
1988 },
1989 DoUpdate {
1990 target: ConflictKind,
1991 assignments: Vec<(usize, Expr)>,
1992 where_clause: Option<Expr>,
1993 fast_paths: Option<Vec<DoUpdateFastPath>>,
1994 },
1995}
1996
1997#[derive(Clone, Copy)]
1998pub(super) enum DoUpdateFastPath {
1999 IntAddConst { phys_idx: usize, delta: i64 },
2000}
2001
2002#[derive(Clone, Debug)]
2003pub(super) enum ConflictKind {
2004 PrimaryKey,
2005 UniqueIndex { index_idx: usize },
2006}
2007
2008fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2009 match target {
2010 ConflictTarget::Columns(cols) => {
2011 let col_idx_set: Vec<u16> = cols
2012 .iter()
2013 .map(|name| {
2014 ts.column_index(name)
2015 .map(|i| i as u16)
2016 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2017 })
2018 .collect::<Result<_>>()?;
2019 let pk_set = ts.primary_key_columns.clone();
2020 if set_equal(&col_idx_set, &pk_set) {
2021 return Ok(ConflictKind::PrimaryKey);
2022 }
2023 for (index_idx, idx) in ts.indices.iter().enumerate() {
2024 if idx.unique && set_equal(&col_idx_set, &idx.columns) {
2025 return Ok(ConflictKind::UniqueIndex { index_idx });
2026 }
2027 }
2028 Err(SqlError::Plan(
2029 "ON CONFLICT target does not match any unique constraint".into(),
2030 ))
2031 }
2032 ConflictTarget::Constraint(name) => {
2033 let lower = name.to_ascii_lowercase();
2034 for (index_idx, idx) in ts.indices.iter().enumerate() {
2035 if idx.name.eq_ignore_ascii_case(&lower) {
2036 if idx.unique {
2037 return Ok(ConflictKind::UniqueIndex { index_idx });
2038 }
2039 return Err(SqlError::Plan(format!(
2040 "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2041 )));
2042 }
2043 }
2044 Err(SqlError::Plan(format!(
2045 "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2046 )))
2047 }
2048 }
2049}
2050
2051fn set_equal(a: &[u16], b: &[u16]) -> bool {
2052 if a.len() != b.len() {
2053 return false;
2054 }
2055 let mut a_sorted = a.to_vec();
2056 let mut b_sorted = b.to_vec();
2057 a_sorted.sort_unstable();
2058 b_sorted.sort_unstable();
2059 a_sorted == b_sorted
2060}
2061
2062pub(super) enum InsertRowOutcome {
2063 Inserted,
2064 Updated { old: Vec<Value>, new: Vec<Value> },
2065 Skipped,
2066}
2067
2068#[allow(clippy::too_many_arguments)]
2069#[inline]
2070pub(super) fn apply_insert_with_conflict(
2071 wtx: &mut WriteTxn<'_>,
2072 table_schema: &TableSchema,
2073 key_buf: &[u8],
2074 value_buf: &[u8],
2075 row: &[Value],
2076 pk_values: &[Value],
2077 on_conflict: &CompiledOnConflict,
2078 col_map: &ColumnMap,
2079 capture_returning: bool,
2080) -> Result<InsertRowOutcome> {
2081 let table_bytes = table_schema.name.as_bytes();
2082
2083 if let CompiledOnConflict::DoNothing { target } = on_conflict {
2084 let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2085 if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2086 let inserted = wtx
2087 .table_insert_if_absent(table_bytes, key_buf, value_buf)
2088 .map_err(SqlError::Storage)?;
2089 return Ok(if inserted {
2090 InsertRowOutcome::Inserted
2091 } else {
2092 InsertRowOutcome::Skipped
2093 });
2094 }
2095 }
2096
2097 if let CompiledOnConflict::DoUpdate {
2098 target: ConflictKind::PrimaryKey,
2099 assignments,
2100 where_clause,
2101 fast_paths,
2102 } = on_conflict
2103 {
2104 if can_fuse_do_update(table_schema, assignments) {
2105 return apply_do_update_fused(
2106 wtx,
2107 table_schema,
2108 table_bytes,
2109 key_buf,
2110 value_buf,
2111 row,
2112 assignments,
2113 where_clause.as_ref(),
2114 col_map,
2115 fast_paths.as_deref(),
2116 capture_returning,
2117 );
2118 }
2119 }
2120
2121 let primary_outcome = wtx
2122 .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2123 .map_err(SqlError::Storage)?;
2124
2125 match primary_outcome {
2126 citadel_txn::write_txn::InsertOutcome::Inserted => {
2127 if table_schema.indices.is_empty() {
2128 return Ok(InsertRowOutcome::Inserted);
2129 }
2130 let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2131 match insert_index_entries_or_fetch(
2132 wtx,
2133 table_schema,
2134 row,
2135 pk_values,
2136 &mut inserted_keys,
2137 )? {
2138 None => Ok(InsertRowOutcome::Inserted),
2139 Some(conflicting_idx) => {
2140 let matches_target =
2141 matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2142 || matches!(
2143 on_conflict,
2144 CompiledOnConflict::DoNothing {
2145 target: Some(ConflictKind::UniqueIndex { index_idx }),
2146 } | CompiledOnConflict::DoUpdate {
2147 target: ConflictKind::UniqueIndex { index_idx },
2148 ..
2149 } if *index_idx == conflicting_idx
2150 );
2151 undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2152 if !matches_target {
2153 return Err(SqlError::UniqueViolation(
2154 table_schema.indices[conflicting_idx].name.clone(),
2155 ));
2156 }
2157 match on_conflict {
2158 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2159 CompiledOnConflict::DoUpdate {
2160 assignments,
2161 where_clause,
2162 ..
2163 } => {
2164 let existing_pk =
2165 fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2166 apply_do_update(
2167 wtx,
2168 table_schema,
2169 &existing_pk,
2170 row,
2171 assignments,
2172 where_clause.as_ref(),
2173 col_map,
2174 capture_returning,
2175 )
2176 }
2177 }
2178 }
2179 }
2180 }
2181 citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2182 let matches_target = matches!(
2183 on_conflict,
2184 CompiledOnConflict::DoNothing { target: None }
2185 | CompiledOnConflict::DoNothing {
2186 target: Some(ConflictKind::PrimaryKey),
2187 }
2188 | CompiledOnConflict::DoUpdate {
2189 target: ConflictKind::PrimaryKey,
2190 ..
2191 }
2192 );
2193 if !matches_target {
2194 return Err(SqlError::DuplicateKey);
2195 }
2196 match on_conflict {
2197 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2198 CompiledOnConflict::DoUpdate {
2199 assignments,
2200 where_clause,
2201 ..
2202 } => {
2203 let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2204 apply_do_update_with_old_row(
2205 wtx,
2206 table_schema,
2207 key_buf,
2208 &old_row,
2209 row,
2210 assignments,
2211 where_clause.as_ref(),
2212 col_map,
2213 capture_returning,
2214 )
2215 }
2216 }
2217 }
2218 }
2219}
2220
2221#[inline]
2222fn apply_fast_path_patch(
2223 old_bytes: &[u8],
2224 fast_paths: &[DoUpdateFastPath],
2225) -> Result<UpsertAction> {
2226 UPSERT_SCRATCH.with(|slot| {
2227 let mut bufs = slot.borrow_mut();
2228 bufs.new_value_buf.clear();
2229 bufs.new_value_buf.extend_from_slice(old_bytes);
2230
2231 let mut patch_scratch: Vec<u8> = Vec::new();
2232
2233 for fp in fast_paths {
2234 match fp {
2235 DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2236 let decoded =
2237 crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2238 let old_val = &decoded[0];
2239 let new_val = match old_val {
2240 Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2241 Value::Null => Value::Null,
2242 _ => {
2243 return Err(SqlError::TypeMismatch {
2244 expected: "INTEGER".into(),
2245 got: old_val.data_type().to_string(),
2246 });
2247 }
2248 };
2249 if !crate::encoding::patch_column_in_place(
2250 &mut bufs.new_value_buf,
2251 *phys_idx,
2252 &new_val,
2253 )? {
2254 patch_scratch.clear();
2255 crate::encoding::patch_row_column(
2256 &bufs.new_value_buf,
2257 *phys_idx,
2258 &new_val,
2259 &mut patch_scratch,
2260 )?;
2261 std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2262 }
2263 }
2264 }
2265 }
2266
2267 if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2268 return Err(SqlError::RowTooLarge {
2269 size: bufs.new_value_buf.len(),
2270 max: citadel_core::MAX_VALUE_SIZE,
2271 });
2272 }
2273
2274 Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2275 })
2276}
2277
2278fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2279 if !ts.indices.is_empty() {
2280 return true;
2281 }
2282 match oc {
2283 CompiledOnConflict::DoNothing { .. } => false,
2284 CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2285 }
2286}
2287
2288fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2289 if !ts.indices.is_empty() {
2290 return false;
2291 }
2292 if !ts.foreign_keys.is_empty() {
2293 return false;
2294 }
2295 if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2296 return false;
2297 }
2298 let pk = ts.pk_indices();
2299 !assignments.iter().any(|(ci, _)| pk.contains(ci))
2300}
2301
2302#[allow(clippy::too_many_arguments)]
2303#[inline]
2304fn apply_do_update_fused(
2305 wtx: &mut WriteTxn<'_>,
2306 table_schema: &TableSchema,
2307 table_bytes: &[u8],
2308 key_buf: &[u8],
2309 value_buf: &[u8],
2310 proposed_row: &[Value],
2311 assignments: &[(usize, Expr)],
2312 where_clause: Option<&Expr>,
2313 col_map: &ColumnMap,
2314 fast_paths: Option<&[DoUpdateFastPath]>,
2315 capture_returning: bool,
2316) -> Result<InsertRowOutcome> {
2317 let non_pk = table_schema.non_pk_indices();
2318 let enc_pos = table_schema.encoding_positions();
2319 let phys_count = table_schema.physical_non_pk_count();
2320 let dropped = table_schema.dropped_non_pk_slots();
2321 let has_checks = table_schema.has_checks();
2322 let has_fks = !table_schema.foreign_keys.is_empty();
2323
2324 let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2325 std::cell::RefCell::new(None);
2326
2327 let outcome =
2328 wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2329 if let Some(fps) = fast_paths {
2330 if !has_checks {
2331 let action = apply_fast_path_patch(old_bytes, fps)?;
2332 if capture_returning {
2333 if let UpsertAction::Replace(ref new_bytes) = action {
2334 let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2335 let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2336 *captured.borrow_mut() = Some((old_row, new_row));
2337 }
2338 }
2339 return Ok(action);
2340 }
2341 }
2342 UPSERT_SCRATCH.with(|slot| {
2343 let mut bufs = slot.borrow_mut();
2344 let UpsertBufs {
2345 old_row,
2346 new_row,
2347 value_values,
2348 new_value_buf,
2349 } = &mut *bufs;
2350
2351 old_row.clear();
2352 old_row.resize(table_schema.columns.len(), Value::Null);
2353 decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2354
2355 if let Some(w) = where_clause {
2356 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2357 let result = eval_expr(w, &ctx)?;
2358 if result.is_null() || !is_truthy(&result) {
2359 return Ok(UpsertAction::Skip);
2360 }
2361 }
2362
2363 new_row.clear();
2364 new_row.extend_from_slice(old_row);
2365 for (col_idx, expr) in assignments {
2366 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2367 let val = eval_expr(expr, &ctx)?;
2368 let col = &table_schema.columns[*col_idx];
2369 new_row[*col_idx] = if val.is_null() {
2370 Value::Null
2371 } else {
2372 let got = val.data_type();
2373 val.coerce_into(col.data_type)
2374 .ok_or_else(|| SqlError::TypeMismatch {
2375 expected: col.data_type.to_string(),
2376 got: got.to_string(),
2377 })?
2378 };
2379 }
2380
2381 for (assigned_idx, _) in assignments {
2382 let col = &table_schema.columns[*assigned_idx];
2383 if !col.nullable && new_row[col.position as usize].is_null() {
2384 return Err(SqlError::NotNullViolation(col.name.clone()));
2385 }
2386 }
2387 if has_checks {
2388 for col in &table_schema.columns {
2389 if let Some(ref check) = col.check_expr {
2390 let ctx = EvalCtx::new(col_map, new_row);
2391 let result = eval_expr(check, &ctx)?;
2392 if !is_truthy(&result) && !result.is_null() {
2393 let name = col.check_name.as_deref().unwrap_or(&col.name);
2394 return Err(SqlError::CheckViolation(name.to_string()));
2395 }
2396 }
2397 }
2398 for tc in &table_schema.check_constraints {
2399 let ctx = EvalCtx::new(col_map, new_row);
2400 let result = eval_expr(&tc.expr, &ctx)?;
2401 if !is_truthy(&result) && !result.is_null() {
2402 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2403 return Err(SqlError::CheckViolation(name.to_string()));
2404 }
2405 }
2406 }
2407 let _ = has_fks;
2408
2409 value_values.clear();
2410 value_values.resize(phys_count, Value::Null);
2411 for &slot in dropped {
2412 value_values[slot as usize] = Value::Null;
2413 }
2414 for (j, &i) in non_pk.iter().enumerate() {
2415 value_values[enc_pos[j] as usize] = new_row[i].clone();
2416 }
2417 new_value_buf.clear();
2418 crate::encoding::encode_row_into(value_values, new_value_buf);
2419
2420 if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2421 return Err(SqlError::RowTooLarge {
2422 size: new_value_buf.len(),
2423 max: citadel_core::MAX_VALUE_SIZE,
2424 });
2425 }
2426
2427 if capture_returning {
2428 *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2429 }
2430 Ok(UpsertAction::Replace(new_value_buf.clone()))
2431 })
2432 })?;
2433
2434 match outcome {
2435 UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2436 UpsertOutcome::Updated => {
2437 if capture_returning {
2438 let (old, new) = captured.into_inner().ok_or_else(|| {
2439 SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2440 })?;
2441 Ok(InsertRowOutcome::Updated { old, new })
2442 } else {
2443 Ok(InsertRowOutcome::Inserted)
2444 }
2445 }
2446 UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2447 }
2448}
2449
2450fn fetch_unique_index_pk(
2451 wtx: &mut WriteTxn<'_>,
2452 table_schema: &TableSchema,
2453 index_idx: usize,
2454 row: &[Value],
2455) -> Result<Vec<u8>> {
2456 let idx = &table_schema.indices[index_idx];
2457 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2458 let indexed: Vec<Value> = idx
2459 .columns
2460 .iter()
2461 .map(|&col_idx| row[col_idx as usize].clone())
2462 .collect();
2463 let key = crate::encoding::encode_composite_key(&indexed);
2464 let value = wtx
2465 .table_get(&idx_table, &key)
2466 .map_err(SqlError::Storage)?
2467 .ok_or_else(|| {
2468 SqlError::InvalidValue("unique index missing expected collision entry".into())
2469 })?;
2470 Ok(value)
2471}
2472
2473#[allow(clippy::too_many_arguments)]
2474fn apply_do_update(
2475 wtx: &mut WriteTxn<'_>,
2476 table_schema: &TableSchema,
2477 pk_key: &[u8],
2478 proposed_row: &[Value],
2479 assignments: &[(usize, Expr)],
2480 where_clause: Option<&Expr>,
2481 col_map: &ColumnMap,
2482 capture_returning: bool,
2483) -> Result<InsertRowOutcome> {
2484 let old_value = wtx
2485 .table_get(table_schema.name.as_bytes(), pk_key)
2486 .map_err(SqlError::Storage)?
2487 .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2488 let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2489 apply_do_update_with_old_row(
2490 wtx,
2491 table_schema,
2492 pk_key,
2493 &old_row,
2494 proposed_row,
2495 assignments,
2496 where_clause,
2497 col_map,
2498 capture_returning,
2499 )
2500}
2501
2502#[allow(clippy::too_many_arguments)]
2503fn apply_do_update_with_old_row(
2504 wtx: &mut WriteTxn<'_>,
2505 table_schema: &TableSchema,
2506 old_pk_key: &[u8],
2507 old_row: &[Value],
2508 proposed_row: &[Value],
2509 assignments: &[(usize, Expr)],
2510 where_clause: Option<&Expr>,
2511 col_map: &ColumnMap,
2512 capture_returning: bool,
2513) -> Result<InsertRowOutcome> {
2514 if let Some(w) = where_clause {
2515 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2516 let result = eval_expr(w, &ctx)?;
2517 if result.is_null() || !is_truthy(&result) {
2518 return Ok(InsertRowOutcome::Skipped);
2519 }
2520 }
2521
2522 let mut new_row = old_row.to_vec();
2523 for (col_idx, expr) in assignments {
2524 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2525 let val = eval_expr(expr, &ctx)?;
2526 let col = &table_schema.columns[*col_idx];
2527 new_row[*col_idx] = if val.is_null() {
2528 Value::Null
2529 } else {
2530 let got = val.data_type();
2531 val.coerce_into(col.data_type)
2532 .ok_or_else(|| SqlError::TypeMismatch {
2533 expected: col.data_type.to_string(),
2534 got: got.to_string(),
2535 })?
2536 };
2537 }
2538
2539 for col in &table_schema.columns {
2540 if matches!(
2541 col.generated_kind,
2542 Some(crate::parser::GeneratedKind::Stored)
2543 ) {
2544 let val = eval_expr(
2545 col.generated_expr.as_ref().unwrap(),
2546 &EvalCtx::new(col_map, &new_row),
2547 )?;
2548 let pos = col.position as usize;
2549 new_row[pos] = if val.is_null() {
2550 if !col.nullable {
2551 return Err(SqlError::NotNullViolation(col.name.clone()));
2552 }
2553 Value::Null
2554 } else {
2555 let got = val.data_type();
2556 val.coerce_into(col.data_type)
2557 .ok_or_else(|| SqlError::TypeMismatch {
2558 expected: col.data_type.to_string(),
2559 got: got.to_string(),
2560 })?
2561 };
2562 }
2563 }
2564
2565 let pk_indices = table_schema.pk_indices();
2566 let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2567 let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2568
2569 for (assigned_idx, _) in assignments {
2570 let col = &table_schema.columns[*assigned_idx];
2571 if !col.nullable && new_row[col.position as usize].is_null() {
2572 return Err(SqlError::NotNullViolation(col.name.clone()));
2573 }
2574 }
2575 if table_schema.has_checks() {
2576 for col in &table_schema.columns {
2577 if let Some(ref check) = col.check_expr {
2578 let ctx = EvalCtx::new(col_map, &new_row);
2579 let result = eval_expr(check, &ctx)?;
2580 if !is_truthy(&result) && !result.is_null() {
2581 let name = col.check_name.as_deref().unwrap_or(&col.name);
2582 return Err(SqlError::CheckViolation(name.to_string()));
2583 }
2584 }
2585 }
2586 for tc in &table_schema.check_constraints {
2587 let ctx = EvalCtx::new(col_map, &new_row);
2588 let result = eval_expr(&tc.expr, &ctx)?;
2589 if !is_truthy(&result) && !result.is_null() {
2590 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2591 return Err(SqlError::CheckViolation(name.to_string()));
2592 }
2593 }
2594 }
2595 for fk in &table_schema.foreign_keys {
2596 let changed = fk
2597 .columns
2598 .iter()
2599 .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2600 if !changed {
2601 continue;
2602 }
2603 let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2604 if any_null {
2605 continue;
2606 }
2607 let fk_vals: Vec<Value> = fk
2608 .columns
2609 .iter()
2610 .map(|&ci| new_row[ci as usize].clone())
2611 .collect();
2612 let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2613 if fk.deferrable && fk.initially_deferred {
2614 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
2615 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
2616 fk_name: name,
2617 foreign_table: fk.foreign_table.as_bytes().to_vec(),
2618 parent_key: fk_key,
2619 });
2620 continue;
2621 }
2622 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
2623 let found = wtx
2624 .table_get(fk.foreign_table.as_bytes(), &fk_key)
2625 .map_err(SqlError::Storage)?;
2626 if found.is_none() {
2627 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
2628 return Err(SqlError::ForeignKeyViolation(name.to_string()));
2629 }
2630 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
2631 }
2632 }
2633
2634 let has_indices = !table_schema.indices.is_empty();
2635 let old_pk_values: Vec<Value> = if has_indices || pk_changed {
2636 pk_indices.iter().map(|&i| old_row[i].clone()).collect()
2637 } else {
2638 Vec::new()
2639 };
2640 let new_pk_values: Vec<Value> = if has_indices || pk_changed {
2641 pk_indices.iter().map(|&i| new_row[i].clone()).collect()
2642 } else {
2643 Vec::new()
2644 };
2645
2646 let non_pk = table_schema.non_pk_indices();
2647 let enc_pos = table_schema.encoding_positions();
2648 let phys_count = table_schema.physical_non_pk_count();
2649 let dropped = table_schema.dropped_non_pk_slots();
2650 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
2651 for &slot in dropped {
2652 value_values[slot as usize] = Value::Null;
2653 }
2654 for (j, &i) in non_pk.iter().enumerate() {
2655 let col = &table_schema.columns[i];
2656 value_values[enc_pos[j] as usize] = if matches!(
2657 col.generated_kind,
2658 Some(crate::parser::GeneratedKind::Virtual)
2659 ) {
2660 Value::Null
2661 } else {
2662 new_row[i].clone()
2663 };
2664 }
2665 let mut new_value_buf = Vec::with_capacity(256);
2666 crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
2667
2668 if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2669 return Err(SqlError::RowTooLarge {
2670 size: new_value_buf.len(),
2671 max: citadel_core::MAX_VALUE_SIZE,
2672 });
2673 }
2674
2675 if pk_changed {
2676 let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
2677 let inserted = wtx
2678 .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
2679 .map_err(SqlError::Storage)?;
2680 if !inserted {
2681 return Err(SqlError::DuplicateKey);
2682 }
2683 wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
2684 .map_err(SqlError::Storage)?;
2685 for idx in &table_schema.indices {
2686 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2687 let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2688 wtx.table_delete(&idx_table, &old_idx_key)
2689 .map_err(SqlError::Storage)?;
2690 let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2691 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2692 let is_new = wtx
2693 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2694 .map_err(SqlError::Storage)?;
2695 if idx.unique && !is_new {
2696 let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2697 if !any_null {
2698 return Err(SqlError::UniqueViolation(idx.name.clone()));
2699 }
2700 }
2701 }
2702 } else {
2703 wtx.table_update_sorted(
2704 table_schema.name.as_bytes(),
2705 &[(old_pk_key, new_value_buf.as_slice())],
2706 )
2707 .map_err(SqlError::Storage)?;
2708 let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
2709 for idx in &table_schema.indices {
2710 let cols_changed = index_columns_changed(idx, old_row, &new_row);
2711 let (del, ins) = partial_idx_update_actions(
2712 idx,
2713 old_row,
2714 &new_row,
2715 cols_changed,
2716 false,
2717 col_map_partial,
2718 );
2719 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2720 if del {
2721 let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2722 wtx.table_delete(&idx_table, &old_idx_key)
2723 .map_err(SqlError::Storage)?;
2724 }
2725 if ins {
2726 let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2727 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2728 let is_new = wtx
2729 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2730 .map_err(SqlError::Storage)?;
2731 if idx.unique && !is_new {
2732 let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2733 if !any_null {
2734 return Err(SqlError::UniqueViolation(idx.name.clone()));
2735 }
2736 }
2737 }
2738 }
2739 }
2740
2741 if capture_returning {
2742 Ok(InsertRowOutcome::Updated {
2743 old: old_row.to_vec(),
2744 new: new_row,
2745 })
2746 } else {
2747 Ok(InsertRowOutcome::Inserted)
2748 }
2749}
2750
2751fn detect_fast_paths(
2752 ts: &TableSchema,
2753 assignments: &[(usize, Expr)],
2754) -> Option<Vec<DoUpdateFastPath>> {
2755 let non_pk = ts.non_pk_indices();
2756 let enc_pos = ts.encoding_positions();
2757 let mut out = Vec::with_capacity(assignments.len());
2758 for (col_idx, expr) in assignments {
2759 let col = &ts.columns[*col_idx];
2760 if col.data_type != DataType::Integer {
2761 return None;
2762 }
2763 let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
2764 let phys_idx = enc_pos[nonpk_order] as usize;
2765
2766 if let Expr::BinaryOp { left, op, right } = expr {
2767 if !matches!(op, BinOp::Add | BinOp::Sub) {
2768 return None;
2769 }
2770 let reads_target =
2771 matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
2772 if !reads_target {
2773 return None;
2774 }
2775 if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
2776 let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
2777 let _ = col_idx;
2778 out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
2779 continue;
2780 }
2781 return None;
2782 }
2783 return None;
2784 }
2785 Some(out)
2786}
2787
2788fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
2789 let target = oc
2790 .target
2791 .as_ref()
2792 .map(|t| resolve_conflict_target(t, ts))
2793 .transpose()?;
2794 match &oc.action {
2795 OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
2796 OnConflictAction::DoUpdate {
2797 assignments,
2798 where_clause,
2799 } => {
2800 let target = target.ok_or_else(|| {
2801 SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
2802 })?;
2803 let compiled_assignments: Vec<(usize, Expr)> = assignments
2804 .iter()
2805 .map(|(name, expr)| {
2806 let col_idx = ts
2807 .column_index(name)
2808 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
2809 Ok((col_idx, expr.clone()))
2810 })
2811 .collect::<Result<_>>()?;
2812 let fast_paths = if where_clause.is_none() {
2813 detect_fast_paths(ts, &compiled_assignments)
2814 } else {
2815 None
2816 };
2817 Ok(CompiledOnConflict::DoUpdate {
2818 target,
2819 assignments: compiled_assignments,
2820 where_clause: where_clause.clone(),
2821 fast_paths,
2822 })
2823 }
2824 }
2825}
2826
2827fn exec_insert_trivial_fast(
2829 wtx: &mut WriteTxn<'_>,
2830 table_lower: &str,
2831 cache: &InsertCache,
2832 bufs: &mut InsertBufs,
2833 params: &[Value],
2834) -> Result<ExecutionResult> {
2835 let prog = cache
2836 .trivial_fast_program
2837 .as_ref()
2838 .expect("trivial fast: program");
2839
2840 for &p in &prog.not_null_param_indices {
2841 if params[p as usize].is_null() {
2842 return Err(SqlError::NotNullViolation(format!("param@{p}")));
2843 }
2844 }
2845
2846 match ¶ms[prog.pk_param as usize] {
2847 Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
2848 _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
2849 }
2850
2851 bufs.value_buf.clear();
2852 bufs.value_buf.extend_from_slice(&prog.template);
2853
2854 for op in &prog.ops {
2855 match op {
2856 WriteOp::ParamI64 { param_idx, off } => match ¶ms[*param_idx as usize] {
2857 Value::Integer(v) => {
2858 let off = *off as usize;
2859 bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
2860 }
2861 other => {
2862 return Err(SqlError::TypeMismatch {
2863 expected: "Integer".into(),
2864 got: other.data_type().to_string(),
2865 });
2866 }
2867 },
2868 WriteOp::LiteralI64 { value, off } => {
2869 let off = *off as usize;
2870 bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
2871 }
2872 WriteOp::GenAddParamsI64 {
2873 a_param,
2874 b_param,
2875 off,
2876 bitmap_byte_off,
2877 bitmap_bit_mask,
2878 } => match (¶ms[*a_param as usize], ¶ms[*b_param as usize]) {
2879 (Value::Integer(a), Value::Integer(b)) => {
2880 let off = *off as usize;
2881 bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
2882 }
2883 _ => {
2884 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2885 }
2886 },
2887 WriteOp::GenMulAddParamI64 {
2888 param_idx,
2889 mul,
2890 add,
2891 off,
2892 bitmap_byte_off,
2893 bitmap_bit_mask,
2894 } => match ¶ms[*param_idx as usize] {
2895 Value::Integer(v) => {
2896 let r = v.wrapping_mul(*mul).wrapping_add(*add);
2897 let off = *off as usize;
2898 bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
2899 }
2900 _ => {
2901 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2902 }
2903 },
2904 }
2905 }
2906
2907 let is_new = wtx
2908 .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
2909 .map_err(SqlError::Storage)?;
2910 if !is_new {
2911 return Err(SqlError::DuplicateKey);
2912 }
2913 Ok(ExecutionResult::RowsAffected(1))
2914}
2915
2916fn build_bind_plan(
2917 stmt: &InsertStmt,
2918 col_indices: &[usize],
2919 col_data_types: &[DataType],
2920) -> Option<Vec<BindAction>> {
2921 let rows = match &stmt.source {
2922 InsertSource::Values(rows) => rows,
2923 _ => return None,
2924 };
2925 if rows.len() != 1 {
2926 return None;
2927 }
2928 let value_row = &rows[0];
2929 if value_row.len() != col_indices.len() {
2930 return None;
2931 }
2932 let mut plan = Vec::with_capacity(value_row.len());
2933 for (i, expr) in value_row.iter().enumerate() {
2934 let col_idx = col_indices[i];
2935 let target = col_data_types[col_idx];
2936 match expr {
2937 Expr::Parameter(n) => {
2938 if *n == 0 {
2939 return None;
2940 }
2941 plan.push(BindAction::Param {
2942 param_idx: n - 1,
2943 col_idx,
2944 target,
2945 });
2946 }
2947 Expr::Literal(v) => plan.push(BindAction::Literal {
2948 value: v.clone(),
2949 col_idx,
2950 }),
2951 _ => return None,
2952 }
2953 }
2954 Some(plan)
2955}
2956
2957impl CompiledInsert {
2958 pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
2959 let lower = stmt.table.to_ascii_lowercase();
2960 let cached = if let Some(ts) = schema.get(&lower) {
2961 let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
2962 ts.columns.iter().map(|c| c.name.as_str()).collect()
2963 } else {
2964 stmt.columns.iter().map(|s| s.as_str()).collect()
2965 };
2966 let mut col_indices = Vec::with_capacity(insert_columns.len());
2967 for name in &insert_columns {
2968 col_indices.push(ts.column_index(name)?);
2969 }
2970 if col_indices
2971 .iter()
2972 .any(|&ci| ts.columns[ci].generated_kind.is_some())
2973 {
2974 return None;
2975 }
2976 let on_conflict = stmt
2977 .on_conflict
2978 .as_ref()
2979 .map(|oc| compile_on_conflict(oc, ts))
2980 .transpose()
2981 .ok()
2982 .flatten()
2983 .map(Arc::new);
2984 let generated_col_positions: Vec<usize> = ts
2985 .columns
2986 .iter()
2987 .enumerate()
2988 .filter_map(|(i, c)| {
2989 matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
2990 .then_some(i)
2991 })
2992 .collect();
2993 let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
2994 .iter()
2995 .map(|&pos| {
2996 detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
2997 })
2998 .collect();
2999 let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3000 Some(ColumnMap::new(&ts.columns))
3001 } else {
3002 None
3003 };
3004 let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3005 let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3006 let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3007 let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3008 let phys_count = ts.physical_non_pk_count();
3009 let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3010 let single_int_pk =
3011 pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3012 let not_null_indices: Vec<u16> = ts
3013 .columns
3014 .iter()
3015 .filter(|c| !c.nullable)
3016 .map(|c| c.position)
3017 .collect();
3018 let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3019 let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3020 let row_fully_overwritten = if any_defaults_flag {
3021 false
3022 } else {
3023 let mut covered: rustc_hash::FxHashSet<usize> =
3024 col_indices.iter().copied().collect();
3025 covered.extend(generated_col_positions.iter().copied());
3026 for (j, &i) in non_pk_indices.iter().enumerate() {
3027 let _ = j;
3028 if matches!(
3029 ts.columns[i].generated_kind,
3030 Some(crate::parser::GeneratedKind::Virtual)
3031 ) {
3032 covered.insert(i);
3033 }
3034 }
3035 bind_plan.is_some() && covered.len() == ts.columns.len()
3036 };
3037 let has_fks = !ts.foreign_keys.is_empty();
3038 let has_indices = !ts.indices.is_empty();
3039 let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3040 let mut null_value_slots: Vec<usize> =
3041 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3042 for (j, &i) in non_pk_indices.iter().enumerate() {
3043 let slot = encoding_positions[j] as usize;
3044 if matches!(
3045 ts.columns[i].generated_kind,
3046 Some(crate::parser::GeneratedKind::Virtual)
3047 ) {
3048 null_value_slots.push(slot);
3049 } else {
3050 non_virtual_pairs.push((i, slot));
3051 }
3052 }
3053 let row_encoder = {
3054 let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3055 let col = &ts.columns[i];
3056 if matches!(
3057 col.generated_kind,
3058 Some(crate::parser::GeneratedKind::Virtual)
3059 ) {
3060 true
3061 } else {
3062 col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3063 }
3064 });
3065 if all_int_or_null {
3066 let mut null_slots: Vec<usize> =
3067 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3068 for (j, &i) in non_pk_indices.iter().enumerate() {
3069 if matches!(
3070 ts.columns[i].generated_kind,
3071 Some(crate::parser::GeneratedKind::Virtual)
3072 ) {
3073 null_slots.push(encoding_positions[j] as usize);
3074 }
3075 }
3076 Some(crate::encoding::build_int_row_template(
3077 phys_count,
3078 &null_slots,
3079 ))
3080 } else {
3081 None
3082 }
3083 };
3084 let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3085 && !ts.columns.iter().any(|c| c.default_expr.is_some())
3086 && !ts.has_checks()
3087 && !has_fks
3088 && !has_indices
3089 && stmt.on_conflict.is_none()
3090 && stmt.returning.is_none()
3091 && bind_plan.is_some()
3092 && row_encoder.is_some()
3093 && row_fully_overwritten
3094 && single_int_pk
3095 && generated_fast_evals
3096 .iter()
3097 .all(|fe| !matches!(fe, FastGenEval::None));
3098 let trivial_fast_program = if is_trivial_fast_eligible {
3099 build_trivial_fast_program(
3100 bind_plan.as_ref().unwrap(),
3101 row_encoder.as_ref().unwrap(),
3102 &non_virtual_pairs,
3103 &generated_col_positions,
3104 &generated_fast_evals,
3105 &pk_indices,
3106 &ts.columns,
3107 )
3108 } else {
3109 None
3110 };
3111 let is_trivial_fast = trivial_fast_program.is_some();
3112 let has_checks = ts.has_checks();
3113 let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3114 let needs_scoped_params = bind_plan.is_none()
3115 || has_checks
3116 || any_defaults
3117 || !generated_col_positions.is_empty()
3118 || on_conflict.is_some()
3119 || stmt.returning.is_some()
3120 || insert_has_subquery(stmt)
3121 || super::helpers::any_partial_index(ts);
3122 Some(InsertCache {
3123 col_indices,
3124 has_subquery: insert_has_subquery(stmt),
3125 any_defaults,
3126 has_checks,
3127 on_conflict,
3128 row_col_map,
3129 generated_col_positions,
3130 generated_fast_evals,
3131 pk_indices,
3132 non_pk_indices,
3133 encoding_positions,
3134 dropped_non_pk_slots,
3135 phys_count,
3136 single_int_pk,
3137 not_null_indices,
3138 bind_plan,
3139 row_fully_overwritten,
3140 row_encoder,
3141 is_trivial_fast,
3142 trivial_fast_program,
3143 needs_scoped_params,
3144 })
3145 } else if schema.get_view(&lower).is_some() {
3146 None
3147 } else {
3148 return None;
3149 };
3150 Some(Self {
3151 table_lower: lower,
3152 cached,
3153 })
3154 }
3155}
3156
3157impl CompiledPlan for CompiledInsert {
3158 fn execute(
3159 &self,
3160 db: &Database,
3161 schema: &SchemaManager,
3162 stmt: &Statement,
3163 params: &[Value],
3164 wtx: Option<&mut WriteTxn<'_>>,
3165 ) -> Result<ExecutionResult> {
3166 let ins = match stmt {
3167 Statement::Insert(i) => i,
3168 _ => {
3169 return Err(SqlError::Unsupported(
3170 "CompiledInsert received non-INSERT statement".into(),
3171 ))
3172 }
3173 };
3174 match wtx {
3175 None => exec_insert(db, schema, ins, params),
3176 Some(outer) => match self.cached.as_ref() {
3177 Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3178 exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3179 }),
3180 Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3181 None => exec_insert_in_txn(outer, schema, ins, params),
3182 },
3183 }
3184 }
3185
3186 fn uses_scoped_params(&self) -> bool {
3187 match self.cached.as_ref() {
3188 Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3189 None => true,
3190 }
3191 }
3192}
3193
3194pub struct CompiledDelete {
3195 table_lower: String,
3196}
3197
3198impl CompiledDelete {
3199 pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3200 let lower = stmt.table.to_ascii_lowercase();
3201 schema.get(&lower)?;
3202 Some(Self { table_lower: lower })
3203 }
3204}
3205
3206impl CompiledPlan for CompiledDelete {
3207 fn execute(
3208 &self,
3209 db: &Database,
3210 schema: &SchemaManager,
3211 stmt: &Statement,
3212 _params: &[Value],
3213 wtx: Option<&mut WriteTxn<'_>>,
3214 ) -> Result<ExecutionResult> {
3215 let del = match stmt {
3216 Statement::Delete(d) => d,
3217 _ => {
3218 return Err(SqlError::Unsupported(
3219 "CompiledDelete received non-DELETE statement".into(),
3220 ))
3221 }
3222 };
3223 let _ = &self.table_lower;
3224 match wtx {
3225 None => super::write::exec_delete(db, schema, del),
3226 Some(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3227 }
3228 }
3229}