1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::read_txn::ReadTxn;
7use citadel_txn::write_txn::WriteTxn;
8use rustc_hash::FxHashMap;
9
10use crate::encoding::{encode_composite_key_into, encode_row_into};
11use crate::error::{Result, SqlError};
12use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
13use crate::parser::*;
14use crate::types::*;
15
16use crate::schema::SchemaManager;
17
18use super::compile::CompiledPlan;
19use super::helpers::*;
20use super::CteContext;
21
22fn row_insert_trigger_flags(schema: &SchemaManager, table_name: &str) -> (bool, bool, bool) {
24 use crate::parser::{TriggerEvent, TriggerGranularity, TriggerTiming};
25 let triggers = schema.triggers_for(table_name);
26 let row =
27 |t: &crate::types::TriggerDef| t.enabled && t.granularity == TriggerGranularity::ForEachRow;
28 let ev = |t: &crate::types::TriggerDef, update: bool| {
29 t.events.iter().any(|e| {
30 if update {
31 matches!(e, TriggerEvent::Update(_))
32 } else {
33 matches!(e, TriggerEvent::Insert)
34 }
35 })
36 };
37 (
38 triggers
39 .iter()
40 .any(|t| row(t) && t.timing == TriggerTiming::Before && ev(t, false)),
41 triggers
42 .iter()
43 .any(|t| row(t) && t.timing == TriggerTiming::After && ev(t, false)),
44 triggers
45 .iter()
46 .any(|t| row(t) && t.timing == TriggerTiming::After && ev(t, true)),
47 )
48}
49
50fn mark_insert_dml(
53 schema: &SchemaManager,
54 table_name: &str,
55 on_conflict: bool,
56 single_int_pk: bool,
57 min_inserted_pk: Option<i64>,
58 rows_written: u64,
59) {
60 if on_conflict {
61 schema.mark_dml(table_name);
62 } else if single_int_pk {
63 if let Some(m) = min_inserted_pk {
64 schema.mark_dml_append(table_name, m);
65 }
66 } else if rows_written > 0 {
67 schema.mark_dml(table_name);
68 }
69}
70
71fn is_single_int_pk(table_schema: &TableSchema) -> bool {
73 table_schema.primary_key_columns.len() == 1
74 && matches!(
75 table_schema.columns[table_schema.primary_key_columns[0] as usize].data_type,
76 DataType::Integer
77 )
78}
79
80pub(super) fn exec_insert(
81 db: &Database,
82 schema: &SchemaManager,
83 stmt: &InsertStmt,
84 params: &[Value],
85) -> Result<ExecutionResult> {
86 let empty_ctes = CteContext::default();
87 let materialized;
88 let stmt = if insert_has_subquery(stmt) {
89 materialized = materialize_insert(stmt, &mut |sub| {
90 exec_subquery_read(db, schema, sub, &empty_ctes)
91 })?;
92 &materialized
93 } else {
94 stmt
95 };
96
97 let lower_name = stmt.table.to_ascii_lowercase();
98 if let Some(view_def) = schema.get_view(&lower_name) {
99 if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
100 {
101 let aliases = view_def.column_aliases.clone();
102 return exec_instead_of_view_insert_auto(
103 db,
104 schema,
105 &lower_name,
106 &aliases,
107 stmt,
108 params,
109 );
110 }
111 return Err(SqlError::CannotModifyView(stmt.table.clone()));
112 }
113 if schema.get_matview(&lower_name).is_some() {
114 return Err(SqlError::CannotModifyView(format!(
115 "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
116 stmt.table
117 )));
118 }
119 let table_schema = schema
120 .get(&lower_name)
121 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
122
123 let insert_columns = if stmt.columns.is_empty() {
124 table_schema
125 .columns
126 .iter()
127 .map(|c| c.name.clone())
128 .collect::<Vec<_>>()
129 } else {
130 stmt.columns
131 .iter()
132 .map(|c| c.to_ascii_lowercase())
133 .collect()
134 };
135
136 let col_indices: Vec<usize> = insert_columns
137 .iter()
138 .map(|name| {
139 table_schema
140 .column_index(name)
141 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
142 })
143 .collect::<Result<_>>()?;
144
145 for &ci in &col_indices {
146 if table_schema.columns[ci].generated_kind.is_some() {
147 return Err(SqlError::CannotInsertIntoGeneratedColumn(
148 table_schema.columns[ci].name.clone(),
149 ));
150 }
151 }
152
153 let defaults: Vec<(usize, &Expr)> = table_schema
154 .columns
155 .iter()
156 .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
157 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
158 .collect();
159
160 let generated_cols: Vec<(usize, &Expr)> = table_schema
161 .columns
162 .iter()
163 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
164 .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
165 .collect();
166
167 let has_checks = table_schema.has_checks();
168 let strict = table_schema.is_strict();
169 let row_col_map_for_gen = (!generated_cols.is_empty()).then(|| table_schema.column_map());
170 let check_col_map = has_checks.then(|| table_schema.column_map());
171
172 let select_rows = match &stmt.source {
173 InsertSource::Select(sq) => {
174 let insert_ctes =
175 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
176 exec_query_body_read(db, schema, body, ctx)
177 })?;
178 let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
179 Some(qr.rows)
180 }
181 InsertSource::Values(_) => None,
182 };
183
184 let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
185 .on_conflict
186 .as_ref()
187 .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
188 .transpose()?;
189
190 let row_col_map = compiled_conflict
191 .as_ref()
192 .map(|_| table_schema.column_map());
193
194 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
195 if table_schema.has_ann_index() {
199 super::ann_persist::purge_segment(&mut wtx, &table_schema.name)?;
200 }
201 let mut count: u64 = 0;
202 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
203 stmt.returning.as_ref().map(|_| Vec::new());
204
205 let pk_indices = table_schema.pk_indices();
206 let non_pk = table_schema.non_pk_indices();
207 let enc_pos = table_schema.encoding_positions();
208 let phys_count = table_schema.physical_non_pk_count();
209 let mut row = vec![Value::Null; table_schema.columns.len()];
210 let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
211 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
212 let mut key_buf: Vec<u8> = Vec::with_capacity(64);
213 let mut value_buf: Vec<u8> = Vec::with_capacity(256);
214 let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
215
216 let values = match &stmt.source {
217 InsertSource::Values(rows) => Some(rows.as_slice()),
218 InsertSource::Select(_) => None,
219 };
220 let sel_rows = select_rows.as_deref();
221
222 let total = match (values, sel_rows) {
223 (Some(rows), _) => rows.len(),
224 (_, Some(rows)) => rows.len(),
225 _ => 0,
226 };
227
228 if let Some(sel) = sel_rows {
229 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
230 return Err(SqlError::InvalidValue(format!(
231 "INSERT ... SELECT column count mismatch: expected {}, got {}",
232 insert_columns.len(),
233 sel[0].len()
234 )));
235 }
236 }
237
238 let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
239 t.enabled
240 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
241 && t.events
242 .iter()
243 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
244 });
245 let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
246 Vec::with_capacity(total)
247 } else {
248 Vec::new()
249 };
250
251 if has_insert_statement_triggers {
252 super::triggers::fire_statement_triggers(
253 &mut wtx,
254 schema,
255 &table_schema.name,
256 crate::parser::TriggerTiming::Before,
257 super::triggers::FireEvent::Insert,
258 &table_schema.columns,
259 &[],
260 &[],
261 )?;
262 }
263
264 let plain_insert = compiled_conflict.is_none();
265 let single_int_pk = is_single_int_pk(table_schema);
266 let mut min_inserted_pk: Option<i64> = None;
267 let (has_before_insert_triggers, has_after_insert_triggers, has_after_update_triggers) =
268 row_insert_trigger_flags(schema, &table_schema.name);
269
270 for idx in 0..total {
271 for v in row.iter_mut() {
272 *v = Value::Null;
273 }
274
275 if let Some(value_rows) = values {
276 let value_row = &value_rows[idx];
277 if value_row.len() != insert_columns.len() {
278 return Err(SqlError::InvalidValue(format!(
279 "expected {} values, got {}",
280 insert_columns.len(),
281 value_row.len()
282 )));
283 }
284 for (i, expr) in value_row.iter().enumerate() {
285 let val = if let Expr::Parameter(n) = expr {
286 params
287 .get(n - 1)
288 .cloned()
289 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
290 } else {
291 eval_const_expr(expr)?
292 };
293 let col_idx = col_indices[i];
294 let col = &table_schema.columns[col_idx];
295 row[col_idx] = if val.is_null() {
296 Value::Null
297 } else {
298 coerce_for_column(val, col, strict)?
299 };
300 }
301 } else if let Some(sel) = sel_rows {
302 let sel_row = &sel[idx];
303 for (i, val) in sel_row.iter().enumerate() {
304 let col_idx = col_indices[i];
305 let col = &table_schema.columns[col_idx];
306 row[col_idx] = if val.is_null() {
307 Value::Null
308 } else {
309 coerce_for_column(val.clone(), col, strict)?
310 };
311 }
312 }
313
314 for &(pos, def_expr) in &defaults {
315 let val = eval_const_expr(def_expr)?;
316 let col = &table_schema.columns[pos];
317 if !val.is_null() {
318 row[pos] = coerce_for_column(val, col, strict)?;
319 }
320 }
321
322 if let Some(gen_map) = row_col_map_for_gen {
323 for &(pos, gen_expr) in &generated_cols {
324 let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
325 let col = &table_schema.columns[pos];
326 row[pos] = if val.is_null() {
327 Value::Null
328 } else {
329 coerce_for_column(val, col, strict)?
330 };
331 }
332 }
333
334 for col in &table_schema.columns {
335 if !col.nullable && row[col.position as usize].is_null() {
336 return Err(SqlError::NotNullViolation(col.name.clone()));
337 }
338 }
339
340 if let Some(col_map) = check_col_map {
341 for col in &table_schema.columns {
342 if let Some(ref check) = col.check_expr {
343 let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
344 if !is_truthy(&result) && !result.is_null() {
345 let name = col.check_name.as_deref().unwrap_or(&col.name);
346 return Err(SqlError::CheckViolation(name.to_string()));
347 }
348 }
349 }
350 for tc in &table_schema.check_constraints {
351 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
352 if !is_truthy(&result) && !result.is_null() {
353 let name = tc.name.as_deref().unwrap_or(&tc.sql);
354 return Err(SqlError::CheckViolation(name.to_string()));
355 }
356 }
357 }
358
359 for fk in &table_schema.foreign_keys {
360 let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
361 if any_null {
362 continue; }
364 let fk_vals: Vec<Value> = fk
365 .columns
366 .iter()
367 .map(|&ci| row[ci as usize].clone())
368 .collect();
369 fk_key_buf.clear();
370 encode_composite_key_into(&fk_vals, &mut fk_key_buf);
371 if fk.deferrable && fk.initially_deferred {
372 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
373 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
374 fk_name: name,
375 foreign_table: fk.foreign_table.as_bytes().to_vec(),
376 parent_key: fk_key_buf.clone(),
377 });
378 continue;
379 }
380 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
381 let found = wtx
382 .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
383 .map_err(SqlError::Storage)?;
384 if found.is_none() {
385 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
386 return Err(SqlError::ForeignKeyViolation(name.to_string()));
387 }
388 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
389 }
390 }
391
392 let proposed_row_for_returning: Option<Vec<Value>> =
393 returning_rows.as_ref().map(|_| row.clone());
394 let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
395 Some(row.clone())
396 } else {
397 None
398 };
399
400 if has_before_insert_triggers {
401 super::triggers::fire_row_triggers(
402 &mut wtx,
403 schema,
404 &table_schema.name,
405 crate::parser::TriggerTiming::Before,
406 super::triggers::FireEvent::Insert,
407 None,
408 Some(row.clone()),
409 &table_schema.columns,
410 )?;
411 }
412
413 for (j, &i) in pk_indices.iter().enumerate() {
414 pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
415 }
416 encode_composite_key_into(&pk_values, &mut key_buf);
417 if plain_insert && single_int_pk {
418 if let Value::Integer(id) = &pk_values[0] {
419 min_inserted_pk = Some(min_inserted_pk.map_or(*id, |m| m.min(*id)));
420 }
421 }
422
423 for (j, &i) in non_pk.iter().enumerate() {
424 let col = &table_schema.columns[i];
425 if matches!(
426 col.generated_kind,
427 Some(crate::parser::GeneratedKind::Virtual)
428 ) {
429 value_values[enc_pos[j] as usize] = Value::Null;
430 row[i] = Value::Null;
431 } else {
432 value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
433 }
434 }
435 encode_row_into(&value_values, &mut value_buf);
436
437 if key_buf.len() > citadel_core::MAX_KEY_SIZE {
438 return Err(SqlError::KeyTooLarge {
439 size: key_buf.len(),
440 max: citadel_core::MAX_KEY_SIZE,
441 });
442 }
443 if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
444 return Err(SqlError::RowTooLarge {
445 size: value_buf.len(),
446 max: citadel_core::MAX_VALUE_SIZE,
447 });
448 }
449
450 match compiled_conflict.as_ref() {
451 None => {
452 let is_new = wtx
453 .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
454 .map_err(SqlError::Storage)?;
455 if !is_new {
456 return Err(SqlError::DuplicateKey);
457 }
458 if !table_schema.indices.is_empty() || has_after_insert_triggers {
459 for (j, &i) in pk_indices.iter().enumerate() {
460 row[i] = pk_values[j].clone();
461 }
462 for (j, &i) in non_pk.iter().enumerate() {
463 row[i] =
464 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
465 }
466 if !table_schema.indices.is_empty() {
467 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
468 }
469 if has_after_insert_triggers {
470 super::triggers::fire_row_triggers(
471 &mut wtx,
472 schema,
473 &table_schema.name,
474 crate::parser::TriggerTiming::After,
475 super::triggers::FireEvent::Insert,
476 None,
477 Some(row.clone()),
478 &table_schema.columns,
479 )?;
480 }
481 }
482 if let Some(r) = row_for_stmt_trigger.clone() {
483 stmt_new_rows.push(r);
484 }
485 count += 1;
486 if let Some(buf) = returning_rows.as_mut() {
487 buf.push((None, proposed_row_for_returning));
488 }
489 }
490 Some(oc) => {
491 let oc_ref: &CompiledOnConflict = oc;
492 let needs_row = upsert_needs_row(oc_ref, table_schema);
493 if needs_row {
494 for (j, &i) in pk_indices.iter().enumerate() {
495 row[i] = pk_values[j].clone();
496 }
497 for (j, &i) in non_pk.iter().enumerate() {
498 row[i] =
499 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
500 }
501 }
502 let outcome = apply_insert_with_conflict(
503 &mut wtx,
504 table_schema,
505 &key_buf,
506 &value_buf,
507 &row,
508 &pk_values,
509 oc_ref,
510 row_col_map.unwrap(),
511 stmt.returning.is_some() || has_after_update_triggers,
513 )?;
514 match outcome {
515 InsertRowOutcome::Inserted => {
516 count += 1;
517 if let Some(buf) = returning_rows.as_mut() {
518 buf.push((None, proposed_row_for_returning));
519 }
520 if let Some(r) = row_for_stmt_trigger.clone() {
521 stmt_new_rows.push(r);
522 }
523 if has_after_insert_triggers {
524 super::triggers::fire_row_triggers(
525 &mut wtx,
526 schema,
527 &table_schema.name,
528 crate::parser::TriggerTiming::After,
529 super::triggers::FireEvent::Insert,
530 None,
531 Some(row.clone()),
532 &table_schema.columns,
533 )?;
534 }
535 }
536 InsertRowOutcome::Updated { old, new } => {
537 count += 1;
538 if let Some(buf) = returning_rows.as_mut() {
539 buf.push((Some(old.clone()), Some(new.clone())));
540 }
541 if has_after_update_triggers {
542 let changed_cols: Vec<String> = match oc_ref {
543 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
544 .iter()
545 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
546 .collect(),
547 _ => Vec::new(),
548 };
549 super::triggers::fire_row_triggers(
550 &mut wtx,
551 schema,
552 &table_schema.name,
553 crate::parser::TriggerTiming::After,
554 super::triggers::FireEvent::Update {
555 changed_columns: &changed_cols,
556 },
557 Some(old),
558 Some(new),
559 &table_schema.columns,
560 )?;
561 }
562 }
563 InsertRowOutcome::Skipped => {}
564 }
565 }
566 }
567 }
568
569 if has_insert_statement_triggers {
570 super::triggers::fire_statement_triggers(
571 &mut wtx,
572 schema,
573 &table_schema.name,
574 crate::parser::TriggerTiming::After,
575 super::triggers::FireEvent::Insert,
576 &table_schema.columns,
577 &[],
578 &stmt_new_rows,
579 )?;
580 }
581
582 mark_insert_dml(
583 schema,
584 &table_schema.name,
585 !plain_insert,
586 single_int_pk,
587 min_inserted_pk,
588 count,
589 );
590
591 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
592 let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
593 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
594 wtx.commit().map_err(SqlError::Storage)?;
595 return Ok(ExecutionResult::Query(qr));
596 }
597
598 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
599 wtx.commit().map_err(SqlError::Storage)?;
600 Ok(ExecutionResult::RowsAffected(count))
601}
602
603pub(super) fn has_subquery(expr: &Expr) -> bool {
604 crate::parser::has_subquery(expr)
605}
606
607pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
608 if let Some(ref w) = stmt.where_clause {
609 if has_subquery(w) {
610 return true;
611 }
612 }
613 if let Some(ref h) = stmt.having {
614 if has_subquery(h) {
615 return true;
616 }
617 }
618 for col in &stmt.columns {
619 if let SelectColumn::Expr { expr, .. } = col {
620 if has_subquery(expr) {
621 return true;
622 }
623 }
624 }
625 for ob in &stmt.order_by {
626 if has_subquery(&ob.expr) {
627 return true;
628 }
629 }
630 for gb in &stmt.group_by {
631 if has_subquery(gb) {
632 return true;
633 }
634 }
635 for join in &stmt.joins {
636 if let Some(ref on_expr) = join.on_clause {
637 if has_subquery(on_expr) {
638 return true;
639 }
640 }
641 }
642 false
643}
644
645pub(super) fn materialize_expr(
646 expr: &Expr,
647 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
648) -> Result<Expr> {
649 match expr {
650 Expr::InSubquery {
651 expr: e,
652 subquery,
653 negated,
654 } => {
655 let inner = materialize_expr(e, exec_sub)?;
656 let qr = exec_sub(subquery)?;
657 if !qr.columns.is_empty() && qr.columns.len() != 1 {
658 return Err(SqlError::SubqueryMultipleColumns);
659 }
660 let mut values = rustc_hash::FxHashSet::default();
661 let mut has_null = false;
662 for row in &qr.rows {
663 if row[0].is_null() {
664 has_null = true;
665 } else {
666 values.insert(row[0].clone());
667 }
668 }
669 Ok(Expr::InSet {
670 expr: Box::new(inner),
671 values,
672 has_null,
673 negated: *negated,
674 })
675 }
676 Expr::ScalarSubquery(subquery) => {
677 let qr = exec_sub(subquery)?;
678 if qr.rows.len() > 1 {
679 return Err(SqlError::SubqueryMultipleRows);
680 }
681 let val = if qr.rows.is_empty() {
682 Value::Null
683 } else {
684 qr.rows[0][0].clone()
685 };
686 Ok(Expr::Literal(val))
687 }
688 Expr::Exists { subquery, negated } => {
689 let qr = exec_sub(subquery)?;
690 let exists = !qr.rows.is_empty();
691 let result = if *negated { !exists } else { exists };
692 Ok(Expr::Literal(Value::Boolean(result)))
693 }
694 Expr::InList {
695 expr: e,
696 list,
697 negated,
698 } => {
699 let inner = materialize_expr(e, exec_sub)?;
700 let items = list
701 .iter()
702 .map(|item| materialize_expr(item, exec_sub))
703 .collect::<Result<Vec<_>>>()?;
704 Ok(Expr::InList {
705 expr: Box::new(inner),
706 list: items,
707 negated: *negated,
708 })
709 }
710 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
711 left: Box::new(materialize_expr(left, exec_sub)?),
712 op: *op,
713 right: Box::new(materialize_expr(right, exec_sub)?),
714 }),
715 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
716 op: *op,
717 expr: Box::new(materialize_expr(e, exec_sub)?),
718 }),
719 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
720 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
721 Expr::InSet {
722 expr: e,
723 values,
724 has_null,
725 negated,
726 } => Ok(Expr::InSet {
727 expr: Box::new(materialize_expr(e, exec_sub)?),
728 values: values.clone(),
729 has_null: *has_null,
730 negated: *negated,
731 }),
732 Expr::Between {
733 expr: e,
734 low,
735 high,
736 negated,
737 } => Ok(Expr::Between {
738 expr: Box::new(materialize_expr(e, exec_sub)?),
739 low: Box::new(materialize_expr(low, exec_sub)?),
740 high: Box::new(materialize_expr(high, exec_sub)?),
741 negated: *negated,
742 }),
743 Expr::Like {
744 expr: e,
745 pattern,
746 escape,
747 negated,
748 } => {
749 let esc = escape
750 .as_ref()
751 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
752 .transpose()?;
753 Ok(Expr::Like {
754 expr: Box::new(materialize_expr(e, exec_sub)?),
755 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
756 escape: esc,
757 negated: *negated,
758 })
759 }
760 Expr::Case {
761 operand,
762 conditions,
763 else_result,
764 } => {
765 let op = operand
766 .as_ref()
767 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
768 .transpose()?;
769 let conds = conditions
770 .iter()
771 .map(|(c, r)| {
772 Ok((
773 materialize_expr(c, exec_sub)?,
774 materialize_expr(r, exec_sub)?,
775 ))
776 })
777 .collect::<Result<Vec<_>>>()?;
778 let else_r = else_result
779 .as_ref()
780 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
781 .transpose()?;
782 Ok(Expr::Case {
783 operand: op,
784 conditions: conds,
785 else_result: else_r,
786 })
787 }
788 Expr::Coalesce(args) => {
789 let materialized = args
790 .iter()
791 .map(|a| materialize_expr(a, exec_sub))
792 .collect::<Result<Vec<_>>>()?;
793 Ok(Expr::Coalesce(materialized))
794 }
795 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
796 expr: Box::new(materialize_expr(e, exec_sub)?),
797 data_type: *data_type,
798 }),
799 Expr::Function {
800 name,
801 args,
802 distinct,
803 } => {
804 let materialized = args
805 .iter()
806 .map(|a| materialize_expr(a, exec_sub))
807 .collect::<Result<Vec<_>>>()?;
808 Ok(Expr::Function {
809 name: name.clone(),
810 args: materialized,
811 distinct: *distinct,
812 })
813 }
814 other => Ok(other.clone()),
815 }
816}
817
818pub(super) fn materialize_stmt(
819 stmt: &SelectStmt,
820 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
821) -> Result<SelectStmt> {
822 let where_clause = stmt
823 .where_clause
824 .as_ref()
825 .map(|e| materialize_expr(e, exec_sub))
826 .transpose()?;
827 let having = stmt
828 .having
829 .as_ref()
830 .map(|e| materialize_expr(e, exec_sub))
831 .transpose()?;
832 let columns = stmt
833 .columns
834 .iter()
835 .map(|c| match c {
836 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
837 SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
838 SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
839 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
840 expr: materialize_expr(expr, exec_sub)?,
841 alias: alias.clone(),
842 }),
843 })
844 .collect::<Result<Vec<_>>>()?;
845 let order_by = stmt
846 .order_by
847 .iter()
848 .map(|ob| {
849 Ok(OrderByItem {
850 expr: materialize_expr(&ob.expr, exec_sub)?,
851 descending: ob.descending,
852 nulls_first: ob.nulls_first,
853 })
854 })
855 .collect::<Result<Vec<_>>>()?;
856 let joins = stmt
857 .joins
858 .iter()
859 .map(|j| {
860 let on_clause = j
861 .on_clause
862 .as_ref()
863 .map(|e| materialize_expr(e, exec_sub))
864 .transpose()?;
865 Ok(JoinClause {
866 join_type: j.join_type,
867 table: j.table.clone(),
868 subquery: j.subquery.clone(),
869 on_clause,
870 })
871 })
872 .collect::<Result<Vec<_>>>()?;
873 let group_by = stmt
874 .group_by
875 .iter()
876 .map(|e| materialize_expr(e, exec_sub))
877 .collect::<Result<Vec<_>>>()?;
878 Ok(SelectStmt {
879 columns,
880 from: stmt.from.clone(),
881 from_alias: stmt.from_alias.clone(),
882 from_subquery: stmt.from_subquery.clone(),
883 from_args: stmt.from_args.clone(),
884 from_json_table: stmt.from_json_table.clone(),
885 joins,
886 distinct: stmt.distinct,
887 where_clause,
888 order_by,
889 limit: stmt.limit.clone(),
890 offset: stmt.offset.clone(),
891 group_by,
892 having,
893 })
894}
895
896pub(super) fn exec_subquery_read(
897 db: &Database,
898 schema: &SchemaManager,
899 stmt: &SelectStmt,
900 ctes: &CteContext,
901) -> Result<QueryResult> {
902 let mut rtx = db.begin_read();
903 exec_subquery_with_read(&mut rtx, schema, stmt, ctes)
904}
905
906pub(super) fn exec_subquery_with_read(
907 rtx: &mut ReadTxn<'_>,
908 schema: &SchemaManager,
909 stmt: &SelectStmt,
910 ctes: &CteContext,
911) -> Result<QueryResult> {
912 match super::exec_select_with_read(rtx, schema, stmt, ctes)? {
913 ExecutionResult::Query(qr) => Ok(qr),
914 _ => Ok(QueryResult {
915 columns: vec![],
916 rows: vec![],
917 }),
918 }
919}
920
921pub(super) fn exec_subquery_write(
922 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
923 schema: &SchemaManager,
924 stmt: &SelectStmt,
925 ctes: &CteContext,
926) -> Result<QueryResult> {
927 match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
928 ExecutionResult::Query(qr) => Ok(qr),
929 _ => Ok(QueryResult {
930 columns: vec![],
931 rows: vec![],
932 }),
933 }
934}
935
936pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
937 stmt.where_clause.as_ref().is_some_and(has_subquery)
938 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
939}
940
941pub(super) fn materialize_update(
942 stmt: &UpdateStmt,
943 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
944) -> Result<UpdateStmt> {
945 let where_clause = stmt
946 .where_clause
947 .as_ref()
948 .map(|e| materialize_expr(e, exec_sub))
949 .transpose()?;
950 let assignments = stmt
951 .assignments
952 .iter()
953 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
954 .collect::<Result<Vec<_>>>()?;
955 Ok(UpdateStmt {
956 table: stmt.table.clone(),
957 assignments,
958 where_clause,
959 returning: stmt.returning.clone(),
960 })
961}
962
963pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
964 stmt.where_clause.as_ref().is_some_and(has_subquery)
965}
966
967pub(super) fn materialize_delete(
968 stmt: &DeleteStmt,
969 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
970) -> Result<DeleteStmt> {
971 let where_clause = stmt
972 .where_clause
973 .as_ref()
974 .map(|e| materialize_expr(e, exec_sub))
975 .transpose()?;
976 Ok(DeleteStmt {
977 table: stmt.table.clone(),
978 where_clause,
979 returning: stmt.returning.clone(),
980 })
981}
982
983pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
984 match &stmt.source {
985 InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
986 InsertSource::Select(_) => false,
988 }
989}
990
991pub(super) fn materialize_insert(
992 stmt: &InsertStmt,
993 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
994) -> Result<InsertStmt> {
995 let source = match &stmt.source {
996 InsertSource::Values(rows) => {
997 let mat = rows
998 .iter()
999 .map(|row| {
1000 row.iter()
1001 .map(|e| materialize_expr(e, exec_sub))
1002 .collect::<Result<Vec<_>>>()
1003 })
1004 .collect::<Result<Vec<_>>>()?;
1005 InsertSource::Values(mat)
1006 }
1007 InsertSource::Select(sq) => {
1008 let ctes = sq
1009 .ctes
1010 .iter()
1011 .map(|c| {
1012 Ok(CteDefinition {
1013 name: c.name.clone(),
1014 column_aliases: c.column_aliases.clone(),
1015 body: materialize_query_body(&c.body, exec_sub)?,
1016 })
1017 })
1018 .collect::<Result<Vec<_>>>()?;
1019 let body = materialize_query_body(&sq.body, exec_sub)?;
1020 InsertSource::Select(Box::new(SelectQuery {
1021 ctes,
1022 recursive: sq.recursive,
1023 body,
1024 }))
1025 }
1026 };
1027 Ok(InsertStmt {
1028 table: stmt.table.clone(),
1029 columns: stmt.columns.clone(),
1030 source,
1031 on_conflict: stmt.on_conflict.clone(),
1032 returning: stmt.returning.clone(),
1033 })
1034}
1035
1036pub(super) fn materialize_query_body(
1037 body: &QueryBody,
1038 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1039) -> Result<QueryBody> {
1040 match body {
1041 QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
1042 sel, exec_sub,
1043 )?))),
1044 QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
1045 op: comp.op.clone(),
1046 all: comp.all,
1047 left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
1048 right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
1049 order_by: comp.order_by.clone(),
1050 limit: comp.limit.clone(),
1051 offset: comp.offset.clone(),
1052 }))),
1053 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
1054 }
1055}
1056
1057pub(super) fn exec_query_body_with_read(
1058 rtx: &mut ReadTxn<'_>,
1059 schema: &SchemaManager,
1060 body: &QueryBody,
1061 ctes: &CteContext,
1062) -> Result<ExecutionResult> {
1063 match body {
1064 QueryBody::Select(sel) => super::exec_select_with_read(rtx, schema, sel, ctes),
1065 QueryBody::Compound(comp) => exec_compound_select_with_read(rtx, schema, comp, ctes),
1066 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1067 SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1068 ),
1069 }
1070}
1071
1072pub(super) fn exec_query_body_in_txn(
1073 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1074 schema: &SchemaManager,
1075 body: &QueryBody,
1076 ctes: &CteContext,
1077) -> Result<ExecutionResult> {
1078 match body {
1079 QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1080 QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1081 QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1082 QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1083 QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1084 }
1085}
1086
1087pub(super) fn exec_query_body_read(
1088 db: &Database,
1089 schema: &SchemaManager,
1090 body: &QueryBody,
1091 ctes: &CteContext,
1092) -> Result<QueryResult> {
1093 let mut rtx = db.begin_read();
1094 exec_query_body_with_read_qr(&mut rtx, schema, body, ctes)
1095}
1096
1097pub(super) fn exec_query_body_with_read_qr(
1098 rtx: &mut ReadTxn<'_>,
1099 schema: &SchemaManager,
1100 body: &QueryBody,
1101 ctes: &CteContext,
1102) -> Result<QueryResult> {
1103 match exec_query_body_with_read(rtx, schema, body, ctes)? {
1104 ExecutionResult::Query(qr) => Ok(qr),
1105 _ => Ok(QueryResult {
1106 columns: vec![],
1107 rows: vec![],
1108 }),
1109 }
1110}
1111
1112pub(super) fn exec_query_body_write(
1113 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1114 schema: &SchemaManager,
1115 body: &QueryBody,
1116 ctes: &CteContext,
1117) -> Result<QueryResult> {
1118 match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1119 ExecutionResult::Query(qr) => Ok(qr),
1120 _ => Ok(QueryResult {
1121 columns: vec![],
1122 rows: vec![],
1123 }),
1124 }
1125}
1126
1127pub(super) fn exec_compound_select_with_read(
1128 rtx: &mut ReadTxn<'_>,
1129 schema: &SchemaManager,
1130 comp: &CompoundSelect,
1131 ctes: &CteContext,
1132) -> Result<ExecutionResult> {
1133 let left_qr = match exec_query_body_with_read(rtx, schema, &comp.left, ctes)? {
1134 ExecutionResult::Query(qr) => qr,
1135 _ => QueryResult {
1136 columns: vec![],
1137 rows: vec![],
1138 },
1139 };
1140 let right_qr = match exec_query_body_with_read(rtx, schema, &comp.right, ctes)? {
1141 ExecutionResult::Query(qr) => qr,
1142 _ => QueryResult {
1143 columns: vec![],
1144 rows: vec![],
1145 },
1146 };
1147 apply_set_operation(comp, left_qr, right_qr)
1148}
1149
1150pub(super) fn exec_compound_select_in_txn(
1151 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1152 schema: &SchemaManager,
1153 comp: &CompoundSelect,
1154 ctes: &CteContext,
1155) -> Result<ExecutionResult> {
1156 let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1157 ExecutionResult::Query(qr) => qr,
1158 _ => QueryResult {
1159 columns: vec![],
1160 rows: vec![],
1161 },
1162 };
1163 let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1164 ExecutionResult::Query(qr) => qr,
1165 _ => QueryResult {
1166 columns: vec![],
1167 rows: vec![],
1168 },
1169 };
1170 apply_set_operation(comp, left_qr, right_qr)
1171}
1172
1173pub(super) fn apply_set_operation(
1174 comp: &CompoundSelect,
1175 left_qr: QueryResult,
1176 right_qr: QueryResult,
1177) -> Result<ExecutionResult> {
1178 if !left_qr.columns.is_empty()
1179 && !right_qr.columns.is_empty()
1180 && left_qr.columns.len() != right_qr.columns.len()
1181 {
1182 return Err(SqlError::CompoundColumnCountMismatch {
1183 left: left_qr.columns.len(),
1184 right: right_qr.columns.len(),
1185 });
1186 }
1187
1188 let columns = left_qr.columns;
1189
1190 let mut rows = match (&comp.op, comp.all) {
1191 (SetOp::Union, true) => {
1192 let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1193 let mut rows = Vec::with_capacity(total);
1194 rows.extend(left_qr.rows);
1195 rows.extend(right_qr.rows);
1196 rows
1197 }
1198 (SetOp::Union, false) => {
1199 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1200 let mut rows = Vec::new();
1201 for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1202 if !seen.contains(&row) {
1203 seen.insert(row.clone());
1204 rows.push(row);
1205 }
1206 }
1207 rows
1208 }
1209 (SetOp::Intersect, true) => {
1210 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1211 for row in &right_qr.rows {
1212 *right_counts.entry(row.clone()).or_insert(0) += 1;
1213 }
1214 let mut rows = Vec::new();
1215 for row in left_qr.rows {
1216 if let Some(count) = right_counts.get_mut(&row) {
1217 if *count > 0 {
1218 *count -= 1;
1219 rows.push(row);
1220 }
1221 }
1222 }
1223 rows
1224 }
1225 (SetOp::Intersect, false) => {
1226 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1227 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1228 let mut rows = Vec::new();
1229 for row in left_qr.rows {
1230 if right_set.contains(&row) && !seen.contains(&row) {
1231 seen.insert(row.clone());
1232 rows.push(row);
1233 }
1234 }
1235 rows
1236 }
1237 (SetOp::Except, true) => {
1238 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1239 for row in &right_qr.rows {
1240 *right_counts.entry(row.clone()).or_insert(0) += 1;
1241 }
1242 let mut rows = Vec::new();
1243 for row in left_qr.rows {
1244 if let Some(count) = right_counts.get_mut(&row) {
1245 if *count > 0 {
1246 *count -= 1;
1247 continue;
1248 }
1249 }
1250 rows.push(row);
1251 }
1252 rows
1253 }
1254 (SetOp::Except, false) => {
1255 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1256 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1257 let mut rows = Vec::new();
1258 for row in left_qr.rows {
1259 if !right_set.contains(&row) && !seen.contains(&row) {
1260 seen.insert(row.clone());
1261 rows.push(row);
1262 }
1263 }
1264 rows
1265 }
1266 };
1267
1268 if !comp.order_by.is_empty() {
1269 let col_defs: Vec<crate::types::ColumnDef> = columns
1270 .iter()
1271 .enumerate()
1272 .map(|(i, name)| crate::types::ColumnDef {
1273 name: name.clone(),
1274 data_type: crate::types::DataType::Null,
1275 nullable: true,
1276 position: i as u16,
1277 default_expr: None,
1278 default_sql: None,
1279 check_expr: None,
1280 check_sql: None,
1281 check_name: None,
1282 is_with_timezone: false,
1283 generated_expr: None,
1284 generated_sql: None,
1285 generated_kind: None,
1286 collation: crate::types::Collation::Binary,
1287 })
1288 .collect();
1289 sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1290 }
1291
1292 if let Some(ref offset_expr) = comp.offset {
1293 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1294 if offset < rows.len() {
1295 rows = rows.split_off(offset);
1296 } else {
1297 rows.clear();
1298 }
1299 }
1300
1301 if let Some(ref limit_expr) = comp.limit {
1302 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1303 rows.truncate(limit);
1304 }
1305
1306 Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1307}
1308
/// Reusable scratch buffers for the INSERT execution path, so per-row work
/// does not allocate fresh vectors.
struct InsertBufs {
    /// Full-width row, one slot per table column (resized to the table's
    /// column count before rows are processed).
    row: Vec<Value>,
    /// Primary-key values, in the order given by the table's PK indices.
    pk_values: Vec<Value>,
    /// Non-PK values placed at their physical encoding positions.
    value_values: Vec<Value>,
    /// Encoded primary key bytes for the row being written.
    key_buf: Vec<u8>,
    /// Encoded row value bytes for the row being written.
    value_buf: Vec<u8>,
    /// Schema column index for each INSERT target column.
    col_indices: Vec<usize>,
    /// Encoded parent key used while checking foreign keys.
    fk_key_buf: Vec<u8>,
}
1318
1319impl InsertBufs {
1320 fn new() -> Self {
1321 Self {
1322 row: Vec::new(),
1323 pk_values: Vec::new(),
1324 value_values: Vec::new(),
1325 key_buf: Vec::with_capacity(64),
1326 value_buf: Vec::with_capacity(256),
1327 col_indices: Vec::new(),
1328 fk_key_buf: Vec::with_capacity(64),
1329 }
1330 }
1331}
1332
thread_local! {
    // Per-thread INSERT scratch buffers; borrowed through `with_insert_scratch`.
    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
    // Per-thread upsert scratch buffers.
    // NOTE(review): the code that borrows this slot is not visible here — confirm usage.
    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
}
1337
1338fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1339 INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1340 Ok(mut borrowed) => f(&mut borrowed),
1341 Err(_) => {
1342 let mut local = InsertBufs::new();
1343 f(&mut local)
1344 }
1345 })
1346}
1347
/// Reusable scratch buffers for the upsert path.
///
/// NOTE(review): the field descriptions below are inferred from names and
/// types only; the code that uses them is not visible here — confirm.
pub(super) struct UpsertBufs {
    // Presumably the decoded existing row on conflict — TODO confirm.
    old_row: Vec<Value>,
    // Presumably the row after applying the update — TODO confirm.
    new_row: Vec<Value>,
    // Presumably non-PK values staged for encoding — TODO confirm.
    value_values: Vec<Value>,
    // Byte buffer, pre-sized by `UpsertBufs::new`.
    new_value_buf: Vec<u8>,
}
1354
1355impl UpsertBufs {
1356 pub(super) fn new() -> Self {
1357 Self {
1358 old_row: Vec::new(),
1359 new_row: Vec::new(),
1360 value_values: Vec::new(),
1361 new_value_buf: Vec::with_capacity(256),
1362 }
1363 }
1364}
1365
1366pub fn exec_insert_in_txn(
1367 wtx: &mut WriteTxn<'_>,
1368 schema: &SchemaManager,
1369 stmt: &InsertStmt,
1370 params: &[Value],
1371) -> Result<ExecutionResult> {
1372 with_insert_scratch(|bufs| {
1373 exec_insert_in_txn_impl(
1374 wtx,
1375 schema,
1376 stmt,
1377 params,
1378 bufs,
1379 None,
1380 &CteContext::default(),
1381 )
1382 })
1383}
1384
1385pub(super) fn exec_insert_in_txn_with_ctes(
1386 wtx: &mut WriteTxn<'_>,
1387 schema: &SchemaManager,
1388 stmt: &InsertStmt,
1389 params: &[Value],
1390 outer_ctes: &CteContext,
1391) -> Result<ExecutionResult> {
1392 with_insert_scratch(|bufs| {
1393 exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1394 })
1395}
1396
1397fn exec_insert_in_txn_cached(
1398 wtx: &mut WriteTxn<'_>,
1399 schema: &SchemaManager,
1400 stmt: &InsertStmt,
1401 params: &[Value],
1402 cache: &InsertCache,
1403) -> Result<ExecutionResult> {
1404 with_insert_scratch(|bufs| {
1405 exec_insert_in_txn_impl(
1406 wtx,
1407 schema,
1408 stmt,
1409 params,
1410 bufs,
1411 Some(cache),
1412 &CteContext::default(),
1413 )
1414 })
1415}
1416
1417fn exec_insert_in_txn_impl(
1418 wtx: &mut WriteTxn<'_>,
1419 schema: &SchemaManager,
1420 stmt: &InsertStmt,
1421 params: &[Value],
1422 bufs: &mut InsertBufs,
1423 cache: Option<&InsertCache>,
1424 outer_ctes: &CteContext,
1425) -> Result<ExecutionResult> {
1426 let empty_ctes = CteContext::default();
1427 let materialized;
1428 let has_sub = match cache {
1429 Some(c) => c.has_subquery,
1430 None => insert_has_subquery(stmt),
1431 };
1432 let stmt = if has_sub {
1433 materialized = materialize_insert(stmt, &mut |sub| {
1434 exec_subquery_write(wtx, schema, sub, &empty_ctes)
1435 })?;
1436 &materialized
1437 } else {
1438 stmt
1439 };
1440
1441 let view_lookup_key = stmt.table.to_ascii_lowercase();
1442 if let Some(view_def) = schema.get_view(&view_lookup_key) {
1443 if super::triggers::has_instead_of(
1444 schema,
1445 &view_lookup_key,
1446 super::triggers::FireEvent::Insert,
1447 ) {
1448 let aliases = view_def.column_aliases.clone();
1449 return exec_instead_of_view_insert_in_txn(
1450 wtx,
1451 schema,
1452 &view_lookup_key,
1453 &aliases,
1454 stmt,
1455 params,
1456 );
1457 }
1458 return Err(SqlError::CannotModifyView(stmt.table.clone()));
1459 }
1460 if schema.get_matview(&view_lookup_key).is_some() {
1461 return Err(SqlError::CannotModifyView(format!(
1462 "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
1463 stmt.table
1464 )));
1465 }
1466
1467 let table_schema = schema
1468 .get(&stmt.table)
1469 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1470 if table_schema.has_ann_index() {
1471 super::ann_persist::purge_segment(wtx, &table_schema.name)?;
1472 }
1473
1474 let default_columns;
1475 let insert_columns: &[String] = if stmt.columns.is_empty() {
1476 default_columns = table_schema
1477 .columns
1478 .iter()
1479 .map(|c| c.name.clone())
1480 .collect::<Vec<_>>();
1481 &default_columns
1482 } else {
1483 &stmt.columns
1484 };
1485
1486 bufs.col_indices.clear();
1487 if let Some(c) = cache {
1488 bufs.col_indices.extend_from_slice(&c.col_indices);
1489 } else {
1490 for name in insert_columns {
1491 bufs.col_indices.push(
1492 table_schema
1493 .column_index(name)
1494 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1495 );
1496 }
1497 }
1498
1499 if cache.is_none() {
1500 for &ci in &bufs.col_indices {
1501 if table_schema.columns[ci].generated_kind.is_some() {
1502 return Err(SqlError::CannotInsertIntoGeneratedColumn(
1503 table_schema.columns[ci].name.clone(),
1504 ));
1505 }
1506 }
1507 }
1508
1509 let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1510 let cached_gen_positions: &[usize];
1511 let cached_gen_fast_evals: &[FastGenEval];
1512 if let Some(c) = cache {
1513 cached_gen_positions = &c.generated_col_positions;
1514 cached_gen_fast_evals = &c.generated_fast_evals;
1515 generated_cols_uncached = Vec::new();
1516 } else {
1517 cached_gen_positions = &[];
1518 cached_gen_fast_evals = &[];
1519 generated_cols_uncached = table_schema
1520 .columns
1521 .iter()
1522 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1523 .map(|c| {
1524 let expr = c.generated_expr.as_ref().unwrap();
1525 let fe = detect_fast_gen_eval(expr, table_schema);
1526 (c.position as usize, expr, fe)
1527 })
1528 .collect();
1529 }
1530 let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1531 let row_col_map_for_gen: Option<&ColumnMap> = has_gen_cols.then(|| table_schema.column_map());
1532
1533 let any_defaults = match cache {
1534 Some(c) => c.any_defaults,
1535 None => table_schema
1536 .columns
1537 .iter()
1538 .any(|c| c.default_expr.is_some()),
1539 };
1540 let defaults: Vec<(usize, &Expr)> = if any_defaults {
1541 table_schema
1542 .columns
1543 .iter()
1544 .filter(|c| {
1545 c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1546 })
1547 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1548 .collect()
1549 } else {
1550 Vec::new()
1551 };
1552
1553 let has_checks = match cache {
1554 Some(c) => c.has_checks,
1555 None => table_schema.has_checks(),
1556 };
1557 let check_col_map = if has_checks {
1558 Some(table_schema.column_map())
1559 } else {
1560 None
1561 };
1562
1563 let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1564 &[usize],
1565 &[usize],
1566 &[u16],
1567 usize,
1568 &[u16],
1569 ) = if let Some(c) = cache {
1570 (
1571 &c.pk_indices,
1572 &c.non_pk_indices,
1573 &c.encoding_positions,
1574 c.phys_count,
1575 &c.dropped_non_pk_slots,
1576 )
1577 } else {
1578 (
1579 table_schema.pk_indices(),
1580 table_schema.non_pk_indices(),
1581 table_schema.encoding_positions(),
1582 table_schema.physical_non_pk_count(),
1583 table_schema.dropped_non_pk_slots(),
1584 )
1585 };
1586
1587 bufs.row.resize(table_schema.columns.len(), Value::Null);
1588 bufs.pk_values.resize(pk_indices.len(), Value::Null);
1589 bufs.value_values.resize(phys_count, Value::Null);
1590
1591 let table_bytes = table_schema.name.as_bytes();
1592 let has_fks = !table_schema.foreign_keys.is_empty();
1593 let has_indices = !table_schema.indices.is_empty();
1594 let has_defaults = !defaults.is_empty();
1595
1596 let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1597 (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1598 (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1599 (_, None) => None,
1600 };
1601
1602 let row_col_map: Option<&ColumnMap> = compiled_conflict
1603 .is_some()
1604 .then(|| table_schema.column_map());
1605
1606 let select_rows = match &stmt.source {
1607 InsertSource::Select(sq) => {
1608 let insert_ctes = super::materialize_all_ctes_with_outer(
1609 &sq.ctes,
1610 sq.recursive,
1611 outer_ctes,
1612 &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1613 )?;
1614 let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1615 Some(qr.rows)
1616 }
1617 InsertSource::Values(_) => None,
1618 };
1619
1620 let mut count: u64 = 0;
1621 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1622 stmt.returning.as_ref().map(|_| Vec::new());
1623
1624 let plain_insert = compiled_conflict.is_none();
1625 let single_int_pk = is_single_int_pk(table_schema);
1626 let mut min_inserted_pk: Option<i64> = None;
1627
1628 let values = match &stmt.source {
1629 InsertSource::Values(rows) => Some(rows.as_slice()),
1630 InsertSource::Select(_) => None,
1631 };
1632 let sel_rows = select_rows.as_deref();
1633
1634 let total = match (values, sel_rows) {
1635 (Some(rows), _) => rows.len(),
1636 (_, Some(rows)) => rows.len(),
1637 _ => 0,
1638 };
1639
1640 if let Some(sel) = sel_rows {
1641 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1642 return Err(SqlError::InvalidValue(format!(
1643 "INSERT ... SELECT column count mismatch: expected {}, got {}",
1644 insert_columns.len(),
1645 sel[0].len()
1646 )));
1647 }
1648 }
1649
1650 let has_insert_statement_triggers_impl =
1651 schema.triggers_for(&table_schema.name).iter().any(|t| {
1652 t.enabled
1653 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1654 && t.events
1655 .iter()
1656 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1657 });
1658 let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1659 Vec::with_capacity(total)
1660 } else {
1661 Vec::new()
1662 };
1663 if has_insert_statement_triggers_impl {
1664 super::triggers::fire_statement_triggers(
1665 wtx,
1666 schema,
1667 &table_schema.name,
1668 crate::parser::TriggerTiming::Before,
1669 super::triggers::FireEvent::Insert,
1670 &table_schema.columns,
1671 &[],
1672 &[],
1673 )?;
1674 }
1675
1676 let (has_before_insert_triggers, has_after_insert_triggers, has_after_update_triggers) =
1677 row_insert_trigger_flags(schema, &table_schema.name);
1678
1679 let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1680 for idx in 0..total {
1681 if !skip_row_clear {
1682 for v in bufs.row.iter_mut() {
1683 *v = Value::Null;
1684 }
1685 }
1686
1687 if let Some(value_rows) = values {
1688 if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1689 for action in plan {
1690 match action {
1691 BindAction::Param {
1692 param_idx,
1693 col_idx,
1694 target,
1695 } => {
1696 let v = ¶ms[*param_idx];
1697 bufs.row[*col_idx] = if v.is_null() {
1698 Value::Null
1699 } else if v.data_type() == *target {
1700 v.clone()
1701 } else {
1702 let got = v.data_type();
1703 v.clone().coerce_into(*target).ok_or_else(|| {
1704 SqlError::TypeMismatch {
1705 expected: target.to_string(),
1706 got: got.to_string(),
1707 }
1708 })?
1709 };
1710 }
1711 BindAction::Literal { value, col_idx } => {
1712 bufs.row[*col_idx] = value.clone();
1713 }
1714 }
1715 }
1716 } else {
1717 let value_row = &value_rows[idx];
1718 if value_row.len() != insert_columns.len() {
1719 return Err(SqlError::InvalidValue(format!(
1720 "expected {} values, got {}",
1721 insert_columns.len(),
1722 value_row.len()
1723 )));
1724 }
1725 for (i, expr) in value_row.iter().enumerate() {
1726 let val = match expr {
1727 Expr::Parameter(n) => params
1728 .get(n - 1)
1729 .cloned()
1730 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1731 Expr::Literal(v) => v.clone(),
1732 _ => eval_const_expr(expr)?,
1733 };
1734 let col_idx = bufs.col_indices[i];
1735 let col = &table_schema.columns[col_idx];
1736 let got_type = val.data_type();
1737 bufs.row[col_idx] = if val.is_null() {
1738 Value::Null
1739 } else {
1740 val.coerce_into(col.data_type)
1741 .ok_or_else(|| SqlError::TypeMismatch {
1742 expected: col.data_type.to_string(),
1743 got: got_type.to_string(),
1744 })?
1745 };
1746 }
1747 }
1748 } else if let Some(sel) = sel_rows {
1749 let sel_row = &sel[idx];
1750 for (i, val) in sel_row.iter().enumerate() {
1751 let col_idx = bufs.col_indices[i];
1752 let col = &table_schema.columns[col_idx];
1753 let got_type = val.data_type();
1754 bufs.row[col_idx] = if val.is_null() {
1755 Value::Null
1756 } else {
1757 val.clone().coerce_into(col.data_type).ok_or_else(|| {
1758 SqlError::TypeMismatch {
1759 expected: col.data_type.to_string(),
1760 got: got_type.to_string(),
1761 }
1762 })?
1763 };
1764 }
1765 }
1766
1767 if has_defaults {
1768 for &(pos, def_expr) in &defaults {
1769 let val = eval_const_expr(def_expr)?;
1770 let col = &table_schema.columns[pos];
1771 if !val.is_null() {
1772 let got_type = val.data_type();
1773 bufs.row[pos] =
1774 val.coerce_into(col.data_type)
1775 .ok_or_else(|| SqlError::TypeMismatch {
1776 expected: col.data_type.to_string(),
1777 got: got_type.to_string(),
1778 })?;
1779 }
1780 }
1781 }
1782
1783 if let Some(gen_map) = row_col_map_for_gen {
1784 if cache.is_some() {
1785 for (pos, fast) in cached_gen_positions
1786 .iter()
1787 .copied()
1788 .zip(cached_gen_fast_evals.iter())
1789 {
1790 let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1791 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1792 let col = &table_schema.columns[pos];
1793 bufs.row[pos] = if val.is_null() {
1794 Value::Null
1795 } else {
1796 let got_type = val.data_type();
1797 val.coerce_into(col.data_type)
1798 .ok_or_else(|| SqlError::TypeMismatch {
1799 expected: col.data_type.to_string(),
1800 got: got_type.to_string(),
1801 })?
1802 };
1803 }
1804 } else {
1805 for (pos, gen_expr, fast) in &generated_cols_uncached {
1806 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1807 let col = &table_schema.columns[*pos];
1808 bufs.row[*pos] = if val.is_null() {
1809 Value::Null
1810 } else {
1811 let got_type = val.data_type();
1812 val.coerce_into(col.data_type)
1813 .ok_or_else(|| SqlError::TypeMismatch {
1814 expected: col.data_type.to_string(),
1815 got: got_type.to_string(),
1816 })?
1817 };
1818 }
1819 }
1820 }
1821
1822 if let Some(c) = cache {
1823 for &pos in &c.not_null_indices {
1824 if bufs.row[pos as usize].is_null() {
1825 return Err(SqlError::NotNullViolation(
1826 table_schema.columns[pos as usize].name.clone(),
1827 ));
1828 }
1829 }
1830 } else {
1831 for col in &table_schema.columns {
1832 if !col.nullable && bufs.row[col.position as usize].is_null() {
1833 return Err(SqlError::NotNullViolation(col.name.clone()));
1834 }
1835 }
1836 }
1837
1838 if let Some(col_map) = check_col_map {
1839 for col in &table_schema.columns {
1840 if let Some(ref check) = col.check_expr {
1841 let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1842 if !is_truthy(&result) && !result.is_null() {
1843 let name = col.check_name.as_deref().unwrap_or(&col.name);
1844 return Err(SqlError::CheckViolation(name.to_string()));
1845 }
1846 }
1847 }
1848 for tc in &table_schema.check_constraints {
1849 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1850 if !is_truthy(&result) && !result.is_null() {
1851 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1852 return Err(SqlError::CheckViolation(name.to_string()));
1853 }
1854 }
1855 }
1856
1857 if has_fks {
1858 for fk in &table_schema.foreign_keys {
1859 let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1860 if any_null {
1861 continue;
1862 }
1863 crate::encoding::encode_composite_key_from_indices(
1864 &fk.columns,
1865 &bufs.row,
1866 &mut bufs.fk_key_buf,
1867 );
1868 if fk.deferrable && fk.initially_deferred {
1869 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1870 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1871 fk_name: name,
1872 foreign_table: fk.foreign_table.as_bytes().to_vec(),
1873 parent_key: bufs.fk_key_buf.clone(),
1874 });
1875 continue;
1876 }
1877 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1878 let found = wtx
1879 .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1880 .map_err(SqlError::Storage)?;
1881 if found.is_none() {
1882 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1883 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1884 }
1885 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1886 }
1887 }
1888 }
1889
1890 let proposed_row_for_returning: Option<Vec<Value>> =
1891 returning_rows.as_ref().map(|_| bufs.row.clone());
1892 let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1893 Some(bufs.row.clone())
1894 } else {
1895 None
1896 };
1897
1898 if has_before_insert_triggers {
1899 super::triggers::fire_row_triggers(
1900 wtx,
1901 schema,
1902 &table_schema.name,
1903 crate::parser::TriggerTiming::Before,
1904 super::triggers::FireEvent::Insert,
1905 None,
1906 Some(bufs.row.clone()),
1907 &table_schema.columns,
1908 )?;
1909 }
1910
1911 for (j, &i) in pk_indices.iter().enumerate() {
1912 bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1913 }
1914 match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1915 true => match bufs.pk_values[0] {
1916 Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1917 _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1918 },
1919 false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1920 }
1921 if plain_insert && single_int_pk {
1922 if let Value::Integer(id) = &bufs.pk_values[0] {
1923 min_inserted_pk = Some(min_inserted_pk.map_or(*id, |m| m.min(*id)));
1924 }
1925 }
1926
1927 for &slot in dropped {
1928 bufs.value_values[slot as usize] = Value::Null;
1929 }
1930 for (j, &i) in non_pk.iter().enumerate() {
1931 let col = &table_schema.columns[i];
1932 if matches!(
1933 col.generated_kind,
1934 Some(crate::parser::GeneratedKind::Virtual)
1935 ) {
1936 bufs.value_values[enc_pos[j] as usize] = Value::Null;
1937 bufs.row[i] = Value::Null;
1938 } else {
1939 bufs.value_values[enc_pos[j] as usize] =
1940 std::mem::replace(&mut bufs.row[i], Value::Null);
1941 }
1942 }
1943 match cache.and_then(|c| c.row_encoder.as_ref()) {
1944 Some(tmpl) => crate::encoding::encode_row_with_template(
1945 tmpl,
1946 &bufs.value_values,
1947 &mut bufs.value_buf,
1948 )?,
1949 None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1950 }
1951
1952 if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1953 return Err(SqlError::KeyTooLarge {
1954 size: bufs.key_buf.len(),
1955 max: citadel_core::MAX_KEY_SIZE,
1956 });
1957 }
1958 if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1959 return Err(SqlError::RowTooLarge {
1960 size: bufs.value_buf.len(),
1961 max: citadel_core::MAX_VALUE_SIZE,
1962 });
1963 }
1964
1965 match compiled_conflict.as_ref() {
1966 None => {
1967 let is_new = wtx
1968 .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1969 .map_err(SqlError::Storage)?;
1970 if !is_new {
1971 return Err(SqlError::DuplicateKey);
1972 }
1973 if has_indices || has_after_insert_triggers {
1974 for (j, &i) in pk_indices.iter().enumerate() {
1975 bufs.row[i] = bufs.pk_values[j].clone();
1976 }
1977 for (j, &i) in non_pk.iter().enumerate() {
1978 bufs.row[i] = std::mem::replace(
1979 &mut bufs.value_values[enc_pos[j] as usize],
1980 Value::Null,
1981 );
1982 }
1983 if has_indices {
1984 insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1985 }
1986 if has_after_insert_triggers {
1987 super::triggers::fire_row_triggers(
1988 wtx,
1989 schema,
1990 &table_schema.name,
1991 crate::parser::TriggerTiming::After,
1992 super::triggers::FireEvent::Insert,
1993 None,
1994 Some(bufs.row.clone()),
1995 &table_schema.columns,
1996 )?;
1997 }
1998 }
1999 if let Some(r) = row_for_stmt_trigger_impl.clone() {
2000 stmt_new_rows_impl.push(r);
2001 }
2002 count += 1;
2003 if let Some(buf) = returning_rows.as_mut() {
2004 buf.push((None, proposed_row_for_returning));
2005 }
2006 }
2007 Some(oc) => {
2008 let oc_ref: &CompiledOnConflict = oc;
2009 let needs_row = upsert_needs_row(oc_ref, table_schema);
2010 if needs_row {
2011 for (j, &i) in pk_indices.iter().enumerate() {
2012 bufs.row[i] = bufs.pk_values[j].clone();
2013 }
2014 for (j, &i) in non_pk.iter().enumerate() {
2015 bufs.row[i] = std::mem::replace(
2016 &mut bufs.value_values[enc_pos[j] as usize],
2017 Value::Null,
2018 );
2019 }
2020 }
2021 let outcome = apply_insert_with_conflict(
2022 wtx,
2023 table_schema,
2024 &bufs.key_buf,
2025 &bufs.value_buf,
2026 &bufs.row,
2027 &bufs.pk_values,
2028 oc_ref,
2029 row_col_map.unwrap(),
2030 stmt.returning.is_some() || has_after_update_triggers,
2032 )?;
2033 match outcome {
2034 InsertRowOutcome::Inserted => {
2035 count += 1;
2036 if let Some(buf) = returning_rows.as_mut() {
2037 buf.push((None, proposed_row_for_returning));
2038 }
2039 if let Some(r) = row_for_stmt_trigger_impl.clone() {
2040 stmt_new_rows_impl.push(r);
2041 }
2042 if has_after_insert_triggers {
2043 super::triggers::fire_row_triggers(
2044 wtx,
2045 schema,
2046 &table_schema.name,
2047 crate::parser::TriggerTiming::After,
2048 super::triggers::FireEvent::Insert,
2049 None,
2050 Some(bufs.row.clone()),
2051 &table_schema.columns,
2052 )?;
2053 }
2054 }
2055 InsertRowOutcome::Updated { old, new } => {
2056 count += 1;
2057 if let Some(buf) = returning_rows.as_mut() {
2058 buf.push((Some(old.clone()), Some(new.clone())));
2059 }
2060 if has_after_update_triggers {
2061 let changed_cols: Vec<String> = match oc_ref {
2062 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2063 .iter()
2064 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2065 .collect(),
2066 _ => Vec::new(),
2067 };
2068 super::triggers::fire_row_triggers(
2069 wtx,
2070 schema,
2071 &table_schema.name,
2072 crate::parser::TriggerTiming::After,
2073 super::triggers::FireEvent::Update {
2074 changed_columns: &changed_cols,
2075 },
2076 Some(old),
2077 Some(new),
2078 &table_schema.columns,
2079 )?;
2080 }
2081 }
2082 InsertRowOutcome::Skipped => {}
2083 }
2084 }
2085 }
2086 }
2087
2088 mark_insert_dml(
2089 schema,
2090 &table_schema.name,
2091 !plain_insert,
2092 single_int_pk,
2093 min_inserted_pk,
2094 count,
2095 );
2096
2097 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2098 if has_insert_statement_triggers_impl {
2099 super::triggers::fire_statement_triggers(
2100 wtx,
2101 schema,
2102 &table_schema.name,
2103 crate::parser::TriggerTiming::After,
2104 super::triggers::FireEvent::Insert,
2105 &table_schema.columns,
2106 &[],
2107 &stmt_new_rows_impl,
2108 )?;
2109 }
2110 return Ok(ExecutionResult::Query(super::helpers::project_returning(
2111 table_schema,
2112 returning_cols,
2113 &rows,
2114 )?));
2115 }
2116
2117 if has_insert_statement_triggers_impl {
2118 super::triggers::fire_statement_triggers(
2119 wtx,
2120 schema,
2121 &table_schema.name,
2122 crate::parser::TriggerTiming::After,
2123 super::triggers::FireEvent::Insert,
2124 &table_schema.columns,
2125 &[],
2126 &stmt_new_rows_impl,
2127 )?;
2128 }
2129
2130 Ok(ExecutionResult::RowsAffected(count))
2131}
2132
/// A compiled INSERT statement.
///
/// NOTE(review): the code that builds and executes this type is not visible
/// here; the field notes below are inferred from names and types — confirm.
pub struct CompiledInsert {
    // Presumably the lower-cased target table name — TODO confirm.
    table_lower: String,
    // Optional precompiled per-statement data; `None` presumably means the
    // uncached insert path is used — TODO confirm.
    cached: Option<InsertCache>,
}
2137
/// Precompiled per-statement data for INSERT execution. When supplied to
/// `exec_insert_in_txn_impl` it replaces the schema analysis that would
/// otherwise be repeated on every call.
struct InsertCache {
    /// Schema column index for each INSERT target column.
    col_indices: Vec<usize>,
    /// Whether the statement contains subqueries that must be materialized first.
    has_subquery: bool,
    /// Whether column defaults have to be considered for this table.
    any_defaults: bool,
    /// Whether CHECK constraints have to be evaluated.
    has_checks: bool,
    /// Precompiled ON CONFLICT clause, if any.
    on_conflict: Option<Arc<CompiledOnConflict>>,
    /// Positions of generated columns to evaluate; parallel to `generated_fast_evals`.
    generated_col_positions: Vec<usize>,
    /// Evaluation strategy for each entry of `generated_col_positions`.
    generated_fast_evals: Vec<FastGenEval>,
    /// Column indices of the primary-key columns.
    pk_indices: Vec<usize>,
    /// Column indices of the non-primary-key columns.
    non_pk_indices: Vec<usize>,
    /// Physical value slot for each entry of `non_pk_indices`.
    encoding_positions: Vec<u16>,
    /// Physical value slots that are reset to NULL for every row.
    dropped_non_pk_slots: Vec<u16>,
    /// Number of physical non-PK value slots.
    phys_count: usize,
    /// Enables the integer key encoding when the PK value is an integer.
    single_int_pk: bool,
    /// Column positions that must not be NULL.
    not_null_indices: Vec<u16>,
    /// Optional per-column binding plan used instead of evaluating VALUES expressions.
    bind_plan: Option<Vec<BindAction>>,
    /// When true, the row buffer is not reset to NULL before each row.
    row_fully_overwritten: bool,
    /// Optional row template used to encode the value bytes.
    row_encoder: Option<crate::encoding::RowTemplate>,
    // NOTE(review): the remaining fields are not read by the code visible
    // here; their roles are inferred from names only — confirm.
    is_trivial_fast: bool,
    trivial_fast_program: Option<TrivialFastProgram>,
    needs_scoped_params: bool,
}
2160
/// One step of a precompiled plan that fills a row slot from a VALUES entry.
#[derive(Clone)]
enum BindAction {
    /// Take `params[param_idx]`, coerce it to `target` unless it is NULL or
    /// already of that type, and store it in row slot `col_idx`.
    Param {
        param_idx: usize,
        col_idx: usize,
        target: DataType,
    },
    /// Store a clone of the constant `value` in row slot `col_idx`.
    Literal {
        value: Value,
        col_idx: usize,
    },
}
2173
/// Precompiled program for the trivial fast INSERT path, produced by
/// `build_trivial_fast_program`.
///
/// NOTE(review): the executor that consumes this program is not visible
/// here; field roles below are inferred from the builder and names — confirm.
#[derive(Clone)]
struct TrivialFastProgram {
    // Presumably the pre-encoded row bytes that `ops` patch — TODO confirm.
    template: Vec<u8>,
    // Write operations applied per execution.
    ops: Vec<WriteOp>,
    // Index of the parameter bound to the primary-key column.
    pk_param: u8,
    // Foreign-key checks to perform — TODO confirm semantics.
    fk_checks: Vec<FkCheckSpec>,
    // Index entries to write — TODO confirm semantics.
    index_inserts: Vec<IndexInsertSpec>,
    // What to do when the primary key already exists.
    on_dup: DupPolicy,
}
2183
/// Duplicate-key policy of a `TrivialFastProgram`, derived from the
/// statement's ON CONFLICT clause by `build_trivial_fast_program`.
#[derive(Clone)]
enum DupPolicy {
    /// Chosen when the statement has no ON CONFLICT clause.
    Error,
    /// Chosen for an eligible `ON CONFLICT DO NOTHING`.
    Skip,
    /// Chosen for an eligible `ON CONFLICT DO UPDATE`; holds its fast paths.
    Patch(Vec<DoUpdateFastPath>),
}
2191
/// Foreign-key check entry of a `TrivialFastProgram`.
///
/// NOTE(review): construction and use are not visible here; the field notes
/// are inferred from names only — confirm.
#[derive(Clone)]
struct FkCheckSpec {
    // Presumably the referenced table's name bytes — TODO confirm.
    foreign_table: Vec<u8>,
    // Presumably the parameter index supplying each FK column — TODO confirm.
    col_params: Vec<u8>,
}
2198
/// Index insertion entry of a `TrivialFastProgram`.
///
/// NOTE(review): construction and use are not visible here; the field notes
/// are inferred from names only — confirm.
#[derive(Clone)]
struct IndexInsertSpec {
    // Presumably the index's storage table name bytes — TODO confirm.
    table: Vec<u8>,
    // Presumably (parameter index, collation) per index key column — TODO confirm.
    key_params: Vec<(u8, crate::types::Collation)>,
}
2205
/// One write operation of a `TrivialFastProgram`. Every variant carries an
/// `off` value, which the builder derives from the row template's slot offsets.
///
/// NOTE(review): the code that executes these operations is not visible
/// here; the per-variant notes are inferred from names — confirm.
#[derive(Clone)]
enum WriteOp {
    // Emitted for a non-PK integer parameter binding.
    ParamI64 {
        param_idx: u8,
        off: u32,
    },
    // Presumably writes the constant `value` — TODO confirm.
    LiteralI64 {
        value: i64,
        off: u32,
    },
    // Presumably writes the sum of two parameters — TODO confirm.
    GenAddParamsI64 {
        a_param: u8,
        b_param: u8,
        off: u32,
    },
    // Presumably writes `param * mul + add` — TODO confirm.
    GenMulAddParamI64 {
        param_idx: u8,
        mul: i64,
        add: i64,
        off: u32,
    },
}
2228
2229fn build_trivial_fast_program(
2230 bind_plan: &[BindAction],
2231 phys_count: usize,
2232 non_virtual_pairs: &[(usize, usize)],
2233 generated_col_positions: &[usize],
2234 generated_fast_evals: &[FastGenEval],
2235 ts: &TableSchema,
2236 on_conflict: Option<&CompiledOnConflict>,
2237) -> Option<TrivialFastProgram> {
2238 let columns = &ts.columns;
2239 let pk_col = ts.pk_indices()[0];
2240
2241 let on_dup = match on_conflict {
2243 None => DupPolicy::Error,
2244 Some(CompiledOnConflict::DoNothing { target })
2245 if matches!(target, None | Some(ConflictKind::PrimaryKey))
2246 && ts.indices.is_empty()
2247 && ts.foreign_keys.is_empty() =>
2248 {
2249 DupPolicy::Skip
2250 }
2251 Some(CompiledOnConflict::DoUpdate {
2252 target: ConflictKind::PrimaryKey,
2253 where_clause: None,
2254 fast_paths: Some(fps),
2255 ..
2256 }) if ts.indices.is_empty() && ts.foreign_keys.is_empty() && !ts.has_checks() => {
2257 DupPolicy::Patch(fps.clone())
2258 }
2259 _ => return None,
2260 };
2261
2262 let mut col_to_bind: rustc_hash::FxHashMap<usize, &BindAction> = Default::default();
2263 for action in bind_plan {
2264 let col = match action {
2265 BindAction::Param { col_idx, .. } | BindAction::Literal { col_idx, .. } => *col_idx,
2266 };
2267 col_to_bind.insert(col, action);
2268 }
2269
2270 let mut slots: Vec<crate::encoding::TemplateSlot> = (0..phys_count)
2272 .map(|_| crate::encoding::TemplateSlot::Null)
2273 .collect();
2274 for &(col, slot) in non_virtual_pairs {
2275 slots[slot] = match col_to_bind.get(&col) {
2276 Some(BindAction::Literal { value, .. }) => {
2277 crate::encoding::TemplateSlot::Const(value.clone())
2278 }
2279 Some(BindAction::Param { target, .. }) => {
2280 if *target != DataType::Integer {
2281 return None;
2282 }
2283 crate::encoding::TemplateSlot::IntHole
2284 }
2285 None => {
2286 if columns[col].data_type != DataType::Integer {
2287 return None;
2288 }
2289 crate::encoding::TemplateSlot::IntHole
2290 }
2291 };
2292 }
2293 let tmpl = crate::encoding::build_row_template(phys_count, &slots);
2294 let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2295 non_virtual_pairs.iter().copied().collect();
2296 let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2297 tmpl.slot_offsets.iter().copied().collect();
2298
2299 let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2300 let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2301 let mut pk_param: Option<u8> = None;
2302 let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2303 let mut not_null_param_indices: Vec<u8> = Vec::new();
2304
2305 for action in bind_plan {
2306 match action {
2307 BindAction::Param {
2308 param_idx,
2309 col_idx,
2310 target,
2311 } => {
2312 if *target != DataType::Integer {
2313 return None;
2314 }
2315 let pi: u8 = u8::try_from(*param_idx).ok()?;
2316 col_to_param.insert(*col_idx, pi);
2317 if *col_idx == pk_col {
2318 pk_param = Some(pi);
2319 } else {
2320 let slot = *col_to_slot.get(col_idx)?;
2321 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2322 ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2323 if !columns[*col_idx].nullable {
2324 not_null_param_indices.push(pi);
2325 }
2326 }
2327 }
2328 BindAction::Literal { value, col_idx } => {
2330 if *col_idx == pk_col {
2331 return None;
2332 }
2333 if let Value::Integer(v) = value {
2334 col_to_lit_int.insert(*col_idx, *v);
2335 }
2336 }
2337 }
2338 }
2339
2340 let pk_param = pk_param?;
2341
2342 for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2343 let gen_slot = *col_to_slot.get(&gen_pos)?;
2344 let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2345 let gen_col_nullable = columns[gen_pos].nullable;
2346
2347 match &generated_fast_evals[i] {
2348 FastGenEval::IntColAddCol {
2349 left_idx,
2350 right_idx,
2351 } => {
2352 let a_param = col_to_param.get(left_idx).copied();
2353 let b_param = col_to_param.get(right_idx).copied();
2354 match (a_param, b_param) {
2355 (Some(ap), Some(bp)) => {
2356 let deps_safe = gen_col_nullable
2357 || (not_null_param_indices.contains(&ap)
2358 && not_null_param_indices.contains(&bp));
2359 if !deps_safe {
2360 return None;
2361 }
2362 ops.push(WriteOp::GenAddParamsI64 {
2363 a_param: ap,
2364 b_param: bp,
2365 off: gen_off,
2366 });
2367 }
2368 (Some(p), None) => {
2369 let lit = col_to_lit_int.get(right_idx).copied()?;
2370 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2371 return None;
2372 }
2373 ops.push(WriteOp::GenMulAddParamI64 {
2374 param_idx: p,
2375 mul: 1,
2376 add: lit,
2377 off: gen_off,
2378 });
2379 }
2380 (None, Some(p)) => {
2381 let lit = col_to_lit_int.get(left_idx).copied()?;
2382 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2383 return None;
2384 }
2385 ops.push(WriteOp::GenMulAddParamI64 {
2386 param_idx: p,
2387 mul: 1,
2388 add: lit,
2389 off: gen_off,
2390 });
2391 }
2392 (None, None) => {
2393 let la = col_to_lit_int.get(left_idx).copied()?;
2394 let lb = col_to_lit_int.get(right_idx).copied()?;
2395 ops.push(WriteOp::LiteralI64 {
2396 value: la.wrapping_add(lb),
2397 off: gen_off,
2398 });
2399 }
2400 }
2401 }
2402 FastGenEval::IntColMulAdd {
2403 col_schema_idx,
2404 mul,
2405 add,
2406 } => {
2407 if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2408 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2409 return None;
2410 }
2411 ops.push(WriteOp::GenMulAddParamI64 {
2412 param_idx: p,
2413 mul: *mul,
2414 add: *add,
2415 off: gen_off,
2416 });
2417 } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2418 ops.push(WriteOp::LiteralI64 {
2419 value: lit.wrapping_mul(*mul).wrapping_add(*add),
2420 off: gen_off,
2421 });
2422 } else {
2423 return None;
2424 }
2425 }
2426 FastGenEval::None => return None,
2427 }
2428 }
2429
2430 let mut fk_checks: Vec<FkCheckSpec> = Vec::with_capacity(ts.foreign_keys.len());
2431 for fk in &ts.foreign_keys {
2432 if fk.deferrable && fk.initially_deferred {
2433 return None;
2434 }
2435 let mut col_params = Vec::with_capacity(fk.columns.len());
2436 for &c in &fk.columns {
2437 col_params.push(col_to_param.get(&(c as usize)).copied()?);
2438 }
2439 fk_checks.push(FkCheckSpec {
2440 foreign_table: fk.foreign_table.as_bytes().to_vec(),
2441 col_params,
2442 });
2443 }
2444
2445 let mut index_inserts: Vec<IndexInsertSpec> = Vec::with_capacity(ts.indices.len());
2446 for idx in &ts.indices {
2447 if idx.unique
2448 || !idx.is_pure_column_index()
2449 || idx.predicate_expr.is_some()
2450 || idx.predicate_sql.is_some()
2451 {
2452 return None;
2453 }
2454 let mut key_params = Vec::with_capacity(idx.keys.len());
2455 for (i, key) in idx.keys.iter().enumerate() {
2456 let crate::types::IndexKey::Column { idx: col_idx, .. } = key else {
2457 return None;
2458 };
2459 key_params.push((
2460 col_to_param.get(&(*col_idx as usize)).copied()?,
2461 idx.collation_at(i),
2462 ));
2463 }
2464 index_inserts.push(IndexInsertSpec {
2465 table: TableSchema::index_table_name(&ts.name, &idx.name),
2466 key_params,
2467 });
2468 }
2469
2470 Some(TrivialFastProgram {
2471 template: tmpl.template,
2472 ops,
2473 pk_param,
2474 fk_checks,
2475 index_inserts,
2476 on_dup,
2477 })
2478}
2479
/// An `ON CONFLICT` clause after its names have been resolved against a table
/// schema (see `compile_on_conflict`).
#[derive(Clone)]
pub(super) enum CompiledOnConflict {
    /// `DO NOTHING`. `target` is `None` when the clause named no conflict target.
    DoNothing {
        target: Option<ConflictKind>,
    },
    /// `DO UPDATE SET ...`. Unlike `DoNothing`, a resolved target is mandatory.
    DoUpdate {
        // Constraint whose violation triggers the update.
        target: ConflictKind,
        // `(column index in the table schema, right-hand-side expression)` pairs.
        assignments: Vec<(usize, Expr)>,
        // Optional `WHERE` filter; the update is skipped when it is NULL or not truthy.
        where_clause: Option<Expr>,
        // Byte-level patch programs for the assignments. `compile_on_conflict` only
        // fills this in when there is no `WHERE` clause and every assignment is
        // recognised by `detect_fast_paths`.
        fast_paths: Option<Vec<DoUpdateFastPath>>,
    },
}
2492
/// A `DO UPDATE` assignment that can be applied by patching the encoded row
/// directly instead of evaluating expressions over a decoded row.
#[derive(Clone, Copy)]
pub(super) enum DoUpdateFastPath {
    /// `col = col + delta` on an INTEGER column. `phys_idx` is the column's
    /// position inside the encoded non-PK value; the addition wraps on overflow
    /// and a stored NULL stays NULL (see `apply_fast_path_patch`).
    IntAddConst { phys_idx: usize, delta: i64 },
}
2497
/// The constraint an `ON CONFLICT` target resolved to.
#[derive(Clone, Debug)]
pub(super) enum ConflictKind {
    /// The target columns are exactly the table's primary-key columns.
    PrimaryKey,
    /// The target is a unique index; `index_idx` is its position in the table
    /// schema's `indices` list.
    UniqueIndex { index_idx: usize },
}
2503
2504fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2505 match target {
2506 ConflictTarget::Columns(cols) => {
2507 let col_idx_set: Vec<u16> = cols
2508 .iter()
2509 .map(|name| {
2510 ts.column_index(name)
2511 .map(|i| i as u16)
2512 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2513 })
2514 .collect::<Result<_>>()?;
2515 let pk_set = ts.primary_key_columns.clone();
2516 if set_equal(&col_idx_set, &pk_set) {
2517 return Ok(ConflictKind::PrimaryKey);
2518 }
2519 for (index_idx, idx) in ts.indices.iter().enumerate() {
2520 if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2521 return Ok(ConflictKind::UniqueIndex { index_idx });
2522 }
2523 }
2524 Err(SqlError::Plan(
2525 "ON CONFLICT target does not match any unique constraint".into(),
2526 ))
2527 }
2528 ConflictTarget::Constraint(name) => {
2529 let lower = name.to_ascii_lowercase();
2530 for (index_idx, idx) in ts.indices.iter().enumerate() {
2531 if idx.name.eq_ignore_ascii_case(&lower) {
2532 if idx.unique {
2533 return Ok(ConflictKind::UniqueIndex { index_idx });
2534 }
2535 return Err(SqlError::Plan(format!(
2536 "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2537 )));
2538 }
2539 }
2540 Err(SqlError::Plan(format!(
2541 "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2542 )))
2543 }
2544 }
2545}
2546
/// Returns `true` when `a` and `b` hold the same column ordinals regardless of
/// order. Duplicates count: the comparison is done on sorted copies.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    let sorted_copy = |items: &[u16]| {
        let mut owned = items.to_vec();
        owned.sort_unstable();
        owned
    };
    // Cheap length test first so mismatched lists never get copied or sorted.
    a.len() == b.len() && sorted_copy(a) == sorted_copy(b)
}
2557
/// What happened to a single proposed row of an `INSERT ... ON CONFLICT`.
pub(super) enum InsertRowOutcome {
    /// The row was written. The `DO UPDATE` paths in this file also return this
    /// variant for an applied update when the caller did not ask for row capture.
    Inserted,
    /// An existing row was updated and its full before/after images were captured.
    Updated { old: Vec<Value>, new: Vec<Value> },
    /// Nothing was written (`DO NOTHING`, or a `DO UPDATE ... WHERE` that did not pass).
    Skipped,
}
2563
2564#[allow(clippy::too_many_arguments)]
2565#[inline]
2566pub(super) fn apply_insert_with_conflict(
2567 wtx: &mut WriteTxn<'_>,
2568 table_schema: &TableSchema,
2569 key_buf: &[u8],
2570 value_buf: &[u8],
2571 row: &[Value],
2572 pk_values: &[Value],
2573 on_conflict: &CompiledOnConflict,
2574 col_map: &ColumnMap,
2575 capture_returning: bool,
2576) -> Result<InsertRowOutcome> {
2577 let table_bytes = table_schema.name.as_bytes();
2578
2579 if let CompiledOnConflict::DoNothing { target } = on_conflict {
2580 let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2581 if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2582 let inserted = wtx
2583 .table_insert_if_absent(table_bytes, key_buf, value_buf)
2584 .map_err(SqlError::Storage)?;
2585 return Ok(if inserted {
2586 InsertRowOutcome::Inserted
2587 } else {
2588 InsertRowOutcome::Skipped
2589 });
2590 }
2591 }
2592
2593 if let CompiledOnConflict::DoUpdate {
2594 target: ConflictKind::PrimaryKey,
2595 assignments,
2596 where_clause,
2597 fast_paths,
2598 } = on_conflict
2599 {
2600 if can_fuse_do_update(table_schema, assignments) {
2601 return apply_do_update_fused(
2602 wtx,
2603 table_schema,
2604 table_bytes,
2605 key_buf,
2606 value_buf,
2607 row,
2608 assignments,
2609 where_clause.as_ref(),
2610 col_map,
2611 fast_paths.as_deref(),
2612 capture_returning,
2613 );
2614 }
2615 }
2616
2617 let primary_outcome = wtx
2618 .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2619 .map_err(SqlError::Storage)?;
2620
2621 match primary_outcome {
2622 citadel_txn::write_txn::InsertOutcome::Inserted => {
2623 if table_schema.indices.is_empty() {
2624 return Ok(InsertRowOutcome::Inserted);
2625 }
2626 let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2627 match insert_index_entries_or_fetch(
2628 wtx,
2629 table_schema,
2630 row,
2631 pk_values,
2632 &mut inserted_keys,
2633 )? {
2634 None => Ok(InsertRowOutcome::Inserted),
2635 Some(conflicting_idx) => {
2636 let matches_target =
2637 matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2638 || matches!(
2639 on_conflict,
2640 CompiledOnConflict::DoNothing {
2641 target: Some(ConflictKind::UniqueIndex { index_idx }),
2642 } | CompiledOnConflict::DoUpdate {
2643 target: ConflictKind::UniqueIndex { index_idx },
2644 ..
2645 } if *index_idx == conflicting_idx
2646 );
2647 undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2648 if !matches_target {
2649 return Err(SqlError::UniqueViolation(
2650 table_schema.indices[conflicting_idx].name.clone(),
2651 ));
2652 }
2653 match on_conflict {
2654 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2655 CompiledOnConflict::DoUpdate {
2656 assignments,
2657 where_clause,
2658 ..
2659 } => {
2660 let existing_pk =
2661 fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2662 apply_do_update(
2663 wtx,
2664 table_schema,
2665 &existing_pk,
2666 row,
2667 assignments,
2668 where_clause.as_ref(),
2669 col_map,
2670 capture_returning,
2671 )
2672 }
2673 }
2674 }
2675 }
2676 }
2677 citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2678 let matches_target = matches!(
2679 on_conflict,
2680 CompiledOnConflict::DoNothing { target: None }
2681 | CompiledOnConflict::DoNothing {
2682 target: Some(ConflictKind::PrimaryKey),
2683 }
2684 | CompiledOnConflict::DoUpdate {
2685 target: ConflictKind::PrimaryKey,
2686 ..
2687 }
2688 );
2689 if !matches_target {
2690 return Err(SqlError::DuplicateKey);
2691 }
2692 match on_conflict {
2693 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2694 CompiledOnConflict::DoUpdate {
2695 assignments,
2696 where_clause,
2697 ..
2698 } => {
2699 let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2700 apply_do_update_with_old_row(
2701 wtx,
2702 table_schema,
2703 key_buf,
2704 &old_row,
2705 row,
2706 assignments,
2707 where_clause.as_ref(),
2708 col_map,
2709 capture_returning,
2710 )
2711 }
2712 }
2713 }
2714 }
2715}
2716
2717#[inline]
2718fn apply_fast_path_patch(
2719 old_bytes: &[u8],
2720 fast_paths: &[DoUpdateFastPath],
2721) -> Result<UpsertAction> {
2722 UPSERT_SCRATCH.with(|slot| {
2723 let mut bufs = slot.borrow_mut();
2724 bufs.new_value_buf.clear();
2725 bufs.new_value_buf.extend_from_slice(old_bytes);
2726
2727 let mut patch_scratch: Vec<u8> = Vec::new();
2728
2729 for fp in fast_paths {
2730 match fp {
2731 DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2732 let decoded =
2733 crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2734 let old_val = &decoded[0];
2735 let new_val = match old_val {
2736 Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2737 Value::Null => Value::Null,
2738 _ => {
2739 return Err(SqlError::TypeMismatch {
2740 expected: "INTEGER".into(),
2741 got: old_val.data_type().to_string(),
2742 });
2743 }
2744 };
2745 if !crate::encoding::patch_column_in_place(
2746 &mut bufs.new_value_buf,
2747 *phys_idx,
2748 &new_val,
2749 )? {
2750 patch_scratch.clear();
2751 crate::encoding::patch_row_column(
2752 &bufs.new_value_buf,
2753 *phys_idx,
2754 &new_val,
2755 &mut patch_scratch,
2756 )?;
2757 std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2758 }
2759 }
2760 }
2761 }
2762
2763 if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2764 return Err(SqlError::RowTooLarge {
2765 size: bufs.new_value_buf.len(),
2766 max: citadel_core::MAX_VALUE_SIZE,
2767 });
2768 }
2769
2770 Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2771 })
2772}
2773
2774fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2775 if !ts.indices.is_empty() {
2776 return true;
2777 }
2778 match oc {
2779 CompiledOnConflict::DoNothing { .. } => false,
2780 CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2781 }
2782}
2783
2784fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2785 if !ts.indices.is_empty() {
2786 return false;
2787 }
2788 if !ts.foreign_keys.is_empty() {
2789 return false;
2790 }
2791 if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2792 return false;
2793 }
2794 let pk = ts.pk_indices();
2795 !assignments.iter().any(|(ci, _)| pk.contains(ci))
2796}
2797
2798#[allow(clippy::too_many_arguments)]
2799#[inline]
2800fn apply_do_update_fused(
2801 wtx: &mut WriteTxn<'_>,
2802 table_schema: &TableSchema,
2803 table_bytes: &[u8],
2804 key_buf: &[u8],
2805 value_buf: &[u8],
2806 proposed_row: &[Value],
2807 assignments: &[(usize, Expr)],
2808 where_clause: Option<&Expr>,
2809 col_map: &ColumnMap,
2810 fast_paths: Option<&[DoUpdateFastPath]>,
2811 capture_returning: bool,
2812) -> Result<InsertRowOutcome> {
2813 let non_pk = table_schema.non_pk_indices();
2814 let enc_pos = table_schema.encoding_positions();
2815 let phys_count = table_schema.physical_non_pk_count();
2816 let dropped = table_schema.dropped_non_pk_slots();
2817 let has_checks = table_schema.has_checks();
2818 let has_fks = !table_schema.foreign_keys.is_empty();
2819
2820 let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2821 std::cell::RefCell::new(None);
2822
2823 let outcome =
2824 wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2825 if let Some(fps) = fast_paths {
2826 if !has_checks {
2827 let action = apply_fast_path_patch(old_bytes, fps)?;
2828 if capture_returning {
2829 if let UpsertAction::Replace(ref new_bytes) = action {
2830 let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2831 let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2832 *captured.borrow_mut() = Some((old_row, new_row));
2833 }
2834 }
2835 return Ok(action);
2836 }
2837 }
2838 UPSERT_SCRATCH.with(|slot| {
2839 let mut bufs = slot.borrow_mut();
2840 let UpsertBufs {
2841 old_row,
2842 new_row,
2843 value_values,
2844 new_value_buf,
2845 } = &mut *bufs;
2846
2847 old_row.clear();
2848 old_row.resize(table_schema.columns.len(), Value::Null);
2849 decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2850
2851 if let Some(w) = where_clause {
2852 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2853 let result = eval_expr(w, &ctx)?;
2854 if result.is_null() || !is_truthy(&result) {
2855 return Ok(UpsertAction::Skip);
2856 }
2857 }
2858
2859 new_row.clear();
2860 new_row.extend_from_slice(old_row);
2861 for (col_idx, expr) in assignments {
2862 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2863 let val = eval_expr(expr, &ctx)?;
2864 let col = &table_schema.columns[*col_idx];
2865 new_row[*col_idx] = if val.is_null() {
2866 Value::Null
2867 } else {
2868 let got = val.data_type();
2869 val.coerce_into(col.data_type)
2870 .ok_or_else(|| SqlError::TypeMismatch {
2871 expected: col.data_type.to_string(),
2872 got: got.to_string(),
2873 })?
2874 };
2875 }
2876
2877 for (assigned_idx, _) in assignments {
2878 let col = &table_schema.columns[*assigned_idx];
2879 if !col.nullable && new_row[col.position as usize].is_null() {
2880 return Err(SqlError::NotNullViolation(col.name.clone()));
2881 }
2882 }
2883 if has_checks {
2884 for col in &table_schema.columns {
2885 if let Some(ref check) = col.check_expr {
2886 let ctx = EvalCtx::new(col_map, new_row);
2887 let result = eval_expr(check, &ctx)?;
2888 if !is_truthy(&result) && !result.is_null() {
2889 let name = col.check_name.as_deref().unwrap_or(&col.name);
2890 return Err(SqlError::CheckViolation(name.to_string()));
2891 }
2892 }
2893 }
2894 for tc in &table_schema.check_constraints {
2895 let ctx = EvalCtx::new(col_map, new_row);
2896 let result = eval_expr(&tc.expr, &ctx)?;
2897 if !is_truthy(&result) && !result.is_null() {
2898 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2899 return Err(SqlError::CheckViolation(name.to_string()));
2900 }
2901 }
2902 }
2903 let _ = has_fks;
2904
2905 value_values.clear();
2906 value_values.resize(phys_count, Value::Null);
2907 for &slot in dropped {
2908 value_values[slot as usize] = Value::Null;
2909 }
2910 for (j, &i) in non_pk.iter().enumerate() {
2911 value_values[enc_pos[j] as usize] = new_row[i].clone();
2912 }
2913 new_value_buf.clear();
2914 crate::encoding::encode_row_into(value_values, new_value_buf);
2915
2916 if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2917 return Err(SqlError::RowTooLarge {
2918 size: new_value_buf.len(),
2919 max: citadel_core::MAX_VALUE_SIZE,
2920 });
2921 }
2922
2923 if capture_returning {
2924 *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2925 }
2926 Ok(UpsertAction::Replace(new_value_buf.clone()))
2927 })
2928 })?;
2929
2930 match outcome {
2931 UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2932 UpsertOutcome::Updated => {
2933 if capture_returning {
2934 let (old, new) = captured.into_inner().ok_or_else(|| {
2935 SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2936 })?;
2937 Ok(InsertRowOutcome::Updated { old, new })
2938 } else {
2939 Ok(InsertRowOutcome::Inserted)
2940 }
2941 }
2942 UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2943 }
2944}
2945
2946fn fetch_unique_index_pk(
2947 wtx: &mut WriteTxn<'_>,
2948 table_schema: &TableSchema,
2949 index_idx: usize,
2950 row: &[Value],
2951) -> Result<Vec<u8>> {
2952 let idx = &table_schema.indices[index_idx];
2953 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2954 let indexed: Vec<Value> = idx
2955 .column_positions_iter()
2956 .map(|col_idx| row[col_idx as usize].clone())
2957 .collect();
2958 let key = crate::encoding::encode_composite_key(&indexed);
2959 let value = wtx
2960 .table_get(&idx_table, &key)
2961 .map_err(SqlError::Storage)?
2962 .ok_or_else(|| {
2963 SqlError::InvalidValue("unique index missing expected collision entry".into())
2964 })?;
2965 Ok(value)
2966}
2967
2968#[allow(clippy::too_many_arguments)]
2969fn apply_do_update(
2970 wtx: &mut WriteTxn<'_>,
2971 table_schema: &TableSchema,
2972 pk_key: &[u8],
2973 proposed_row: &[Value],
2974 assignments: &[(usize, Expr)],
2975 where_clause: Option<&Expr>,
2976 col_map: &ColumnMap,
2977 capture_returning: bool,
2978) -> Result<InsertRowOutcome> {
2979 let old_value = wtx
2980 .table_get(table_schema.name.as_bytes(), pk_key)
2981 .map_err(SqlError::Storage)?
2982 .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2983 let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2984 apply_do_update_with_old_row(
2985 wtx,
2986 table_schema,
2987 pk_key,
2988 &old_row,
2989 proposed_row,
2990 assignments,
2991 where_clause,
2992 col_map,
2993 capture_returning,
2994 )
2995}
2996
/// General (non-fused) `ON CONFLICT ... DO UPDATE` for a row whose current
/// contents are already decoded in `old_row`.
///
/// Steps, in order:
/// 1. Evaluate the optional `WHERE`; a NULL or non-truthy result skips the update.
/// 2. Evaluate each assignment against `old_row` plus the proposed ("excluded")
///    row and coerce the result to the column type.
/// 3. Recompute stored generated columns from the updated row.
/// 4. Enforce NOT NULL on assigned columns, CHECK constraints, and foreign keys
///    whose columns changed.
/// 5. Re-encode the row and write it, moving the row and rewriting every index
///    entry if the primary key changed, otherwise updating in place and touching
///    only the index entries that need it.
///
/// Returns `Skipped` when the `WHERE` filter rejects the row, `Updated { old, new }`
/// when `capture_returning` is set, and `Inserted` for an applied update otherwise.
///
/// # Errors
///
/// `TypeMismatch`, `NotNullViolation`, `CheckViolation`, `ForeignKeyViolation`,
/// `RowTooLarge`, `DuplicateKey` (new primary key already taken),
/// `UniqueViolation`, plus storage and evaluation errors.
#[allow(clippy::too_many_arguments)]
fn apply_do_update_with_old_row(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    old_pk_key: &[u8],
    old_row: &[Value],
    proposed_row: &[Value],
    assignments: &[(usize, Expr)],
    where_clause: Option<&Expr>,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    if let Some(w) = where_clause {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let result = eval_expr(w, &ctx)?;
        if result.is_null() || !is_truthy(&result) {
            return Ok(InsertRowOutcome::Skipped);
        }
    }

    // Assignments all read the pre-update row, so their order does not matter.
    let mut new_row = old_row.to_vec();
    for (col_idx, expr) in assignments {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let val = eval_expr(expr, &ctx)?;
        let col = &table_schema.columns[*col_idx];
        new_row[*col_idx] = if val.is_null() {
            Value::Null
        } else {
            let got = val.data_type();
            val.coerce_into(col.data_type)
                .ok_or_else(|| SqlError::TypeMismatch {
                    expected: col.data_type.to_string(),
                    got: got.to_string(),
                })?
        };
    }

    // Stored generated columns are recomputed from the updated row.
    for col in &table_schema.columns {
        if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Stored)
        ) {
            // NOTE(review): the unwrap assumes every stored generated column
            // carries an expression — confirm the schema guarantees this.
            let val = eval_expr(
                col.generated_expr.as_ref().unwrap(),
                &EvalCtx::new(col_map, &new_row),
            )?;
            let pos = col.position as usize;
            new_row[pos] = if val.is_null() {
                if !col.nullable {
                    return Err(SqlError::NotNullViolation(col.name.clone()));
                }
                Value::Null
            } else {
                let got = val.data_type();
                val.coerce_into(col.data_type)
                    .ok_or_else(|| SqlError::TypeMismatch {
                        expected: col.data_type.to_string(),
                        got: got.to_string(),
                    })?
            };
        }
    }

    // The key only counts as changed if a PK column was assigned AND its value
    // actually differs from the old one.
    let pk_indices = table_schema.pk_indices();
    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);

    // Only assigned columns are re-checked for NOT NULL.
    for (assigned_idx, _) in assignments {
        let col = &table_schema.columns[*assigned_idx];
        if !col.nullable && new_row[col.position as usize].is_null() {
            return Err(SqlError::NotNullViolation(col.name.clone()));
        }
    }
    if table_schema.has_checks() {
        // A CHECK fails only on a definite false; a NULL result passes.
        for col in &table_schema.columns {
            if let Some(ref check) = col.check_expr {
                let ctx = EvalCtx::new(col_map, &new_row);
                let result = eval_expr(check, &ctx)?;
                if !is_truthy(&result) && !result.is_null() {
                    let name = col.check_name.as_deref().unwrap_or(&col.name);
                    return Err(SqlError::CheckViolation(name.to_string()));
                }
            }
        }
        for tc in &table_schema.check_constraints {
            let ctx = EvalCtx::new(col_map, &new_row);
            let result = eval_expr(&tc.expr, &ctx)?;
            if !is_truthy(&result) && !result.is_null() {
                let name = tc.name.as_deref().unwrap_or(&tc.sql);
                return Err(SqlError::CheckViolation(name.to_string()));
            }
        }
    }
    for fk in &table_schema.foreign_keys {
        // Unchanged FK columns were validated when the row was first written.
        let changed = fk
            .columns
            .iter()
            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
        if !changed {
            continue;
        }
        // A NULL in any FK column exempts the row from the parent lookup.
        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
        if any_null {
            continue;
        }
        let fk_vals: Vec<Value> = fk
            .columns
            .iter()
            .map(|&ci| new_row[ci as usize].clone())
            .collect();
        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
        if fk.deferrable && fk.initially_deferred {
            // Deferred constraints are queued on the transaction instead of
            // being checked now.
            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
            wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
                fk_name: name,
                foreign_table: fk.foreign_table.as_bytes().to_vec(),
                parent_key: fk_key,
            });
            continue;
        }
        // Skip the parent lookup when this (table, key) pair was already verified.
        if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
            let found = wtx
                .table_get(fk.foreign_table.as_bytes(), &fk_key)
                .map_err(SqlError::Storage)?;
            if found.is_none() {
                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
                return Err(SqlError::ForeignKeyViolation(name.to_string()));
            }
            wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
        }
    }

    // PK value vectors are only needed for index maintenance or a key move.
    let has_indices = !table_schema.indices.is_empty();
    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
    } else {
        Vec::new()
    };
    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
    } else {
        Vec::new()
    };

    // Lay the non-PK columns out in their physical encoding slots.
    let non_pk = table_schema.non_pk_indices();
    let enc_pos = table_schema.encoding_positions();
    let phys_count = table_schema.physical_non_pk_count();
    let dropped = table_schema.dropped_non_pk_slots();
    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
    // Dropped slots are written as NULL (the vector already starts all-NULL).
    for &slot in dropped {
        value_values[slot as usize] = Value::Null;
    }
    for (j, &i) in non_pk.iter().enumerate() {
        let col = &table_schema.columns[i];
        // Virtual generated columns are encoded as NULL rather than their value.
        value_values[enc_pos[j] as usize] = if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Virtual)
        ) {
            Value::Null
        } else {
            new_row[i].clone()
        };
    }
    let mut new_value_buf = Vec::with_capacity(256);
    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);

    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
        return Err(SqlError::RowTooLarge {
            size: new_value_buf.len(),
            max: citadel_core::MAX_VALUE_SIZE,
        });
    }

    if pk_changed {
        // Key move: write the row under its new key first, so a collision is
        // reported before the old row is removed.
        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
        let inserted = wtx
            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
            .map_err(SqlError::Storage)?;
        if !inserted {
            return Err(SqlError::DuplicateKey);
        }
        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
            .map_err(SqlError::Storage)?;
        // Every index entry is deleted and re-inserted in this branch.
        // NOTE(review): unlike the in-place branch below, this one does not go
        // through `partial_idx_update_actions` — confirm partial indexes are
        // handled correctly when the primary key changes.
        for idx in &table_schema.indices {
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            let old_idx_key =
                encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
            wtx.table_delete(&idx_table, &old_idx_key)
                .map_err(SqlError::Storage)?;
            let new_idx_key =
                encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
            let is_new = wtx
                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                .map_err(SqlError::Storage)?;
            if idx.unique && !is_new {
                // A pre-existing entry is a violation only when no indexed
                // column is NULL.
                let any_null = idx
                    .column_positions_iter()
                    .any(|c| new_row[c as usize].is_null());
                if !any_null {
                    return Err(SqlError::UniqueViolation(idx.name.clone()));
                }
            }
        }
    } else {
        // Same key: overwrite the value in place.
        wtx.table_update_sorted(
            table_schema.name.as_bytes(),
            &[(old_pk_key, new_value_buf.as_slice())],
        )
        .map_err(SqlError::Storage)?;
        // The column map is only built when the table has a partial index.
        let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
        for idx in &table_schema.indices {
            let cols_changed = index_columns_changed(idx, old_row, &new_row);
            // `del` / `ins` say whether the old entry must be removed and
            // whether a new entry must be written for this index.
            let (del, ins) = partial_idx_update_actions(
                idx,
                old_row,
                &new_row,
                cols_changed,
                false,
                col_map_partial,
            );
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            if del {
                let old_idx_key =
                    encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
                wtx.table_delete(&idx_table, &old_idx_key)
                    .map_err(SqlError::Storage)?;
            }
            if ins {
                let new_idx_key =
                    encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
                let is_new = wtx
                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                    .map_err(SqlError::Storage)?;
                if idx.unique && !is_new {
                    // Same NULL exemption as in the key-move branch.
                    let any_null = idx
                        .column_positions_iter()
                        .any(|c| new_row[c as usize].is_null());
                    if !any_null {
                        return Err(SqlError::UniqueViolation(idx.name.clone()));
                    }
                }
            }
        }
    }

    // Without RETURNING capture an applied update is reported as `Inserted`.
    if capture_returning {
        Ok(InsertRowOutcome::Updated {
            old: old_row.to_vec(),
            new: new_row,
        })
    } else {
        Ok(InsertRowOutcome::Inserted)
    }
}
3253
3254fn detect_fast_paths(
3255 ts: &TableSchema,
3256 assignments: &[(usize, Expr)],
3257) -> Option<Vec<DoUpdateFastPath>> {
3258 let non_pk = ts.non_pk_indices();
3259 let enc_pos = ts.encoding_positions();
3260 let mut out = Vec::with_capacity(assignments.len());
3261 for (col_idx, expr) in assignments {
3262 let col = &ts.columns[*col_idx];
3263 if col.data_type != DataType::Integer {
3264 return None;
3265 }
3266 let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3267 let phys_idx = enc_pos[nonpk_order] as usize;
3268
3269 if let Expr::BinaryOp { left, op, right } = expr {
3270 if !matches!(op, BinOp::Add | BinOp::Sub) {
3271 return None;
3272 }
3273 let reads_target =
3274 matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3275 if !reads_target {
3276 return None;
3277 }
3278 if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3279 let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3280 let _ = col_idx;
3281 out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3282 continue;
3283 }
3284 return None;
3285 }
3286 return None;
3287 }
3288 Some(out)
3289}
3290
3291fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3292 let target = oc
3293 .target
3294 .as_ref()
3295 .map(|t| resolve_conflict_target(t, ts))
3296 .transpose()?;
3297 match &oc.action {
3298 OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3299 OnConflictAction::DoUpdate {
3300 assignments,
3301 where_clause,
3302 } => {
3303 let target = target.ok_or_else(|| {
3304 SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3305 })?;
3306 let compiled_assignments: Vec<(usize, Expr)> = assignments
3307 .iter()
3308 .map(|(name, expr)| {
3309 let col_idx = ts
3310 .column_index(name)
3311 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3312 Ok((col_idx, expr.clone()))
3313 })
3314 .collect::<Result<_>>()?;
3315 let fast_paths = if where_clause.is_none() {
3316 detect_fast_paths(ts, &compiled_assignments)
3317 } else {
3318 None
3319 };
3320 Ok(CompiledOnConflict::DoUpdate {
3321 target,
3322 assignments: compiled_assignments,
3323 where_clause: where_clause.clone(),
3324 fast_paths,
3325 })
3326 }
3327 }
3328}
3329
3330fn exec_insert_trivial_fast(
3333 wtx: &mut WriteTxn<'_>,
3334 table_lower: &str,
3335 cache: &InsertCache,
3336 bufs: &mut InsertBufs,
3337 params: &[Value],
3338) -> Result<Option<ExecutionResult>> {
3339 let prog = cache
3340 .trivial_fast_program
3341 .as_ref()
3342 .expect("trivial fast: program");
3343
3344 match ¶ms[prog.pk_param as usize] {
3345 Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3346 Value::Null => return Ok(None),
3347 _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3348 }
3349
3350 bufs.value_buf.clear();
3351 bufs.value_buf.extend_from_slice(&prog.template);
3352
3353 for op in &prog.ops {
3354 match op {
3355 WriteOp::ParamI64 { param_idx, off } => match ¶ms[*param_idx as usize] {
3356 Value::Integer(v) => {
3357 let off = *off as usize;
3358 bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3359 }
3360 Value::Null => return Ok(None),
3361 other => {
3362 return Err(SqlError::TypeMismatch {
3363 expected: "Integer".into(),
3364 got: other.data_type().to_string(),
3365 });
3366 }
3367 },
3368 WriteOp::LiteralI64 { value, off } => {
3369 let off = *off as usize;
3370 bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3371 }
3372 WriteOp::GenAddParamsI64 {
3373 a_param,
3374 b_param,
3375 off,
3376 } => match (¶ms[*a_param as usize], ¶ms[*b_param as usize]) {
3377 (Value::Integer(a), Value::Integer(b)) => {
3378 let off = *off as usize;
3379 bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3380 }
3381 _ => return Ok(None),
3382 },
3383 WriteOp::GenMulAddParamI64 {
3384 param_idx,
3385 mul,
3386 add,
3387 off,
3388 } => match ¶ms[*param_idx as usize] {
3389 Value::Integer(v) => {
3390 let r = v.wrapping_mul(*mul).wrapping_add(*add);
3391 let off = *off as usize;
3392 bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3393 }
3394 _ => return Ok(None),
3395 },
3396 }
3397 }
3398
3399 for fk in &prog.fk_checks {
3400 if fk.col_params.iter().any(|&p| params[p as usize].is_null()) {
3401 continue;
3402 }
3403 bufs.fk_key_buf.clear();
3404 for &p in &fk.col_params {
3405 crate::encoding::encode_key_value_into(¶ms[p as usize], &mut bufs.fk_key_buf);
3406 }
3407 if !wtx.fk_check_cached(&fk.foreign_table, &bufs.fk_key_buf) {
3408 let found = wtx
3409 .table_get(&fk.foreign_table, &bufs.fk_key_buf)
3410 .map_err(SqlError::Storage)?;
3411 if found.is_none() {
3412 return Err(SqlError::ForeignKeyViolation(
3413 String::from_utf8_lossy(&fk.foreign_table).into_owned(),
3414 ));
3415 }
3416 wtx.mark_fk_verified(&fk.foreign_table, &bufs.fk_key_buf);
3417 }
3418 }
3419
3420 if let DupPolicy::Patch(fps) = &prog.on_dup {
3421 let outcome = wtx.table_upsert_with::<_, SqlError>(
3422 table_lower.as_bytes(),
3423 &bufs.key_buf,
3424 &bufs.value_buf,
3425 |old_bytes| apply_fast_path_patch(old_bytes, fps),
3426 )?;
3427 return Ok(Some(match outcome {
3428 UpsertOutcome::Inserted | UpsertOutcome::Updated => ExecutionResult::RowsAffected(1),
3429 UpsertOutcome::Skipped => ExecutionResult::RowsAffected(0),
3430 }));
3431 }
3432
3433 let is_new = wtx
3434 .table_insert_if_absent(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3435 .map_err(SqlError::Storage)?;
3436 if !is_new {
3437 return match &prog.on_dup {
3438 DupPolicy::Error => Err(SqlError::DuplicateKey),
3439 DupPolicy::Skip => Ok(Some(ExecutionResult::RowsAffected(0))),
3440 DupPolicy::Patch(_) => unreachable!("handled above"),
3441 };
3442 }
3443
3444 for idx in &prog.index_inserts {
3445 bufs.fk_key_buf.clear();
3446 for &(p, coll) in &idx.key_params {
3447 crate::encoding::encode_key_value_collated_into(
3448 ¶ms[p as usize],
3449 coll,
3450 &mut bufs.fk_key_buf,
3451 );
3452 }
3453 crate::encoding::encode_key_value_into(
3454 ¶ms[prog.pk_param as usize],
3455 &mut bufs.fk_key_buf,
3456 );
3457 wtx.table_insert_index(&idx.table, &bufs.fk_key_buf, &[])
3458 .map_err(SqlError::Storage)?;
3459 }
3460
3461 Ok(Some(ExecutionResult::RowsAffected(1)))
3462}
3463
3464fn build_bind_plan(
3465 stmt: &InsertStmt,
3466 col_indices: &[usize],
3467 col_data_types: &[DataType],
3468) -> Option<Vec<BindAction>> {
3469 let rows = match &stmt.source {
3470 InsertSource::Values(rows) => rows,
3471 _ => return None,
3472 };
3473 if rows.len() != 1 {
3474 return None;
3475 }
3476 let value_row = &rows[0];
3477 if value_row.len() != col_indices.len() {
3478 return None;
3479 }
3480 let mut plan = Vec::with_capacity(value_row.len());
3481 for (i, expr) in value_row.iter().enumerate() {
3482 let col_idx = col_indices[i];
3483 let target = col_data_types[col_idx];
3484 match expr {
3485 Expr::Parameter(n) => {
3486 if *n == 0 {
3487 return None;
3488 }
3489 plan.push(BindAction::Param {
3490 param_idx: n - 1,
3491 col_idx,
3492 target,
3493 });
3494 }
3495 Expr::Literal(v) => plan.push(BindAction::Literal {
3496 value: v.clone(),
3497 col_idx,
3498 }),
3499 _ => return None,
3500 }
3501 }
3502 Some(plan)
3503}
3504
3505impl CompiledInsert {
3506 pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3507 let lower = stmt.table.to_ascii_lowercase();
3508 let cached = if schema.get_matview(&lower).is_some() {
3511 None
3512 } else if let Some(ts) = schema.get(&lower) {
3513 let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3514 ts.columns.iter().map(|c| c.name.as_str()).collect()
3515 } else {
3516 stmt.columns.iter().map(|s| s.as_str()).collect()
3517 };
3518 let mut col_indices = Vec::with_capacity(insert_columns.len());
3519 for name in &insert_columns {
3520 col_indices.push(ts.column_index(name)?);
3521 }
3522 if col_indices
3523 .iter()
3524 .any(|&ci| ts.columns[ci].generated_kind.is_some())
3525 {
3526 return None;
3527 }
3528 let on_conflict = stmt
3529 .on_conflict
3530 .as_ref()
3531 .map(|oc| compile_on_conflict(oc, ts))
3532 .transpose()
3533 .ok()
3534 .flatten()
3535 .map(Arc::new);
3536 let generated_col_positions: Vec<usize> = ts
3537 .columns
3538 .iter()
3539 .enumerate()
3540 .filter_map(|(i, c)| {
3541 matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3542 .then_some(i)
3543 })
3544 .collect();
3545 let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3546 .iter()
3547 .map(|&pos| {
3548 detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3549 })
3550 .collect();
3551 let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3552 let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3553 let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3554 let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3555 let phys_count = ts.physical_non_pk_count();
3556 let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3557 let single_int_pk =
3558 pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3559 let not_null_indices: Vec<u16> = ts
3560 .columns
3561 .iter()
3562 .filter(|c| !c.nullable)
3563 .map(|c| c.position)
3564 .collect();
3565 let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3566 let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3567 let row_fully_overwritten = if any_defaults_flag {
3568 false
3569 } else {
3570 let mut covered: rustc_hash::FxHashSet<usize> =
3571 col_indices.iter().copied().collect();
3572 covered.extend(generated_col_positions.iter().copied());
3573 for (j, &i) in non_pk_indices.iter().enumerate() {
3574 let _ = j;
3575 if matches!(
3576 ts.columns[i].generated_kind,
3577 Some(crate::parser::GeneratedKind::Virtual)
3578 ) {
3579 covered.insert(i);
3580 }
3581 }
3582 bind_plan.is_some() && covered.len() == ts.columns.len()
3583 };
3584 let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3585 let mut null_value_slots: Vec<usize> =
3586 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3587 for (j, &i) in non_pk_indices.iter().enumerate() {
3588 let slot = encoding_positions[j] as usize;
3589 if matches!(
3590 ts.columns[i].generated_kind,
3591 Some(crate::parser::GeneratedKind::Virtual)
3592 ) {
3593 null_value_slots.push(slot);
3594 } else {
3595 non_virtual_pairs.push((i, slot));
3596 }
3597 }
3598 let row_encoder = {
3599 let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3600 let col = &ts.columns[i];
3601 if matches!(
3602 col.generated_kind,
3603 Some(crate::parser::GeneratedKind::Virtual)
3604 ) {
3605 true
3606 } else {
3607 col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3608 }
3609 });
3610 if all_int_or_null {
3611 let mut slots: Vec<crate::encoding::TemplateSlot> = (0..phys_count)
3612 .map(|_| crate::encoding::TemplateSlot::IntHole)
3613 .collect();
3614 for &s in &dropped_non_pk_slots {
3615 slots[s as usize] = crate::encoding::TemplateSlot::Null;
3616 }
3617 for (j, &i) in non_pk_indices.iter().enumerate() {
3618 if matches!(
3619 ts.columns[i].generated_kind,
3620 Some(crate::parser::GeneratedKind::Virtual)
3621 ) {
3622 slots[encoding_positions[j] as usize] =
3623 crate::encoding::TemplateSlot::Null;
3624 }
3625 }
3626 Some(crate::encoding::build_row_template(phys_count, &slots))
3627 } else {
3628 None
3629 }
3630 };
3631 let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3633 && !ts.columns.iter().any(|c| c.default_expr.is_some())
3634 && !ts.has_checks()
3635 && stmt.returning.is_none()
3636 && bind_plan.is_some()
3637 && row_fully_overwritten
3638 && single_int_pk
3639 && !super::triggers::has_insert_triggers(schema, &ts.name)
3640 && (stmt.on_conflict.is_none()
3642 || !super::triggers::has_update_triggers(schema, &ts.name))
3643 && generated_fast_evals
3644 .iter()
3645 .all(|fe| !matches!(fe, FastGenEval::None));
3646 let trivial_fast_program = if is_trivial_fast_eligible {
3647 build_trivial_fast_program(
3648 bind_plan.as_ref().unwrap(),
3649 phys_count,
3650 &non_virtual_pairs,
3651 &generated_col_positions,
3652 &generated_fast_evals,
3653 ts,
3654 on_conflict.as_deref(),
3655 )
3656 } else {
3657 None
3658 };
3659 let is_trivial_fast = trivial_fast_program.is_some();
3660 let has_checks = ts.has_checks();
3661 let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3662 let needs_scoped_params = bind_plan.is_none()
3663 || has_checks
3664 || any_defaults
3665 || !generated_col_positions.is_empty()
3666 || on_conflict.is_some()
3667 || stmt.returning.is_some()
3668 || insert_has_subquery(stmt)
3669 || super::helpers::any_partial_index(ts);
3670 Some(InsertCache {
3671 col_indices,
3672 has_subquery: insert_has_subquery(stmt),
3673 any_defaults,
3674 has_checks,
3675 on_conflict,
3676 generated_col_positions,
3677 generated_fast_evals,
3678 pk_indices,
3679 non_pk_indices,
3680 encoding_positions,
3681 dropped_non_pk_slots,
3682 phys_count,
3683 single_int_pk,
3684 not_null_indices,
3685 bind_plan,
3686 row_fully_overwritten,
3687 row_encoder,
3688 is_trivial_fast,
3689 trivial_fast_program,
3690 needs_scoped_params,
3691 })
3692 } else if schema.get_view(&lower).is_some() {
3693 None
3694 } else {
3695 return None;
3696 };
3697 Some(Self {
3698 table_lower: lower,
3699 cached,
3700 })
3701 }
3702}
3703
3704impl CompiledPlan for CompiledInsert {
3705 fn execute(
3706 &self,
3707 db: &Database,
3708 schema: &SchemaManager,
3709 stmt: &Statement,
3710 params: &[Value],
3711 txn: super::compile::ActiveTxnRef<'_, '_>,
3712 ) -> Result<ExecutionResult> {
3713 let ins = match stmt {
3714 Statement::Insert(i) => i,
3715 _ => {
3716 return Err(SqlError::Unsupported(
3717 "CompiledInsert received non-INSERT statement".into(),
3718 ))
3719 }
3720 };
3721 use super::compile::ActiveTxnRef;
3722 match txn {
3723 ActiveTxnRef::None => exec_insert(db, schema, ins, params),
3724 ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3725 "cannot execute mutating statement inside a read-only transaction".into(),
3726 )),
3727 ActiveTxnRef::Write(outer) => match self.cached.as_ref() {
3728 Some(c) if c.is_trivial_fast => {
3729 if matches!(
3731 c.trivial_fast_program.as_ref().map(|p| &p.on_dup),
3732 Some(DupPolicy::Patch(_))
3733 ) {
3734 schema.mark_dml(&self.table_lower);
3735 }
3736 match with_insert_scratch(|bufs| {
3737 exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3738 })? {
3739 Some(r) => Ok(r),
3740 None => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3741 }
3742 }
3743 Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3744 None => exec_insert_in_txn(outer, schema, ins, params),
3745 },
3746 }
3747 }
3748
3749 fn uses_scoped_params(&self) -> bool {
3750 match self.cached.as_ref() {
3751 Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3752 None => true,
3753 }
3754 }
3755}
3756
3757fn exec_instead_of_view_insert_auto(
3758 db: &Database,
3759 schema: &SchemaManager,
3760 view_name: &str,
3761 aliases: &[String],
3762 stmt: &InsertStmt,
3763 params: &[Value],
3764) -> Result<ExecutionResult> {
3765 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3766 let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3767 wtx.commit().map_err(SqlError::Storage)?;
3768 Ok(r)
3769}
3770
3771fn exec_instead_of_view_insert_in_txn(
3772 wtx: &mut WriteTxn<'_>,
3773 schema: &SchemaManager,
3774 view_name: &str,
3775 aliases: &[String],
3776 stmt: &InsertStmt,
3777 params: &[Value],
3778) -> Result<ExecutionResult> {
3779 let resolved_aliases: Vec<String> = if aliases.is_empty() {
3781 derive_view_columns(wtx, schema, view_name)?
3782 } else {
3783 aliases.to_vec()
3784 };
3785 let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3786 let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3787 .iter()
3788 .enumerate()
3789 .map(|(i, name)| (name.to_ascii_lowercase(), i))
3790 .collect();
3791
3792 let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3793 (0..resolved_aliases.len()).collect()
3794 } else {
3795 stmt.columns
3796 .iter()
3797 .map(|c| {
3798 alias_map
3799 .get(&c.to_ascii_lowercase())
3800 .copied()
3801 .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3802 })
3803 .collect::<Result<_>>()?
3804 };
3805
3806 let source_rows: Vec<Vec<Value>> = match &stmt.source {
3807 InsertSource::Values(rows) => {
3808 let mut out = Vec::with_capacity(rows.len());
3809 for row in rows {
3810 if row.len() != target_positions.len() {
3811 return Err(SqlError::InvalidValue(format!(
3812 "expected {} values, got {}",
3813 target_positions.len(),
3814 row.len()
3815 )));
3816 }
3817 let mut vals = Vec::with_capacity(row.len());
3818 for expr in row {
3819 let v = match expr {
3820 Expr::Parameter(n) => params
3821 .get(n - 1)
3822 .cloned()
3823 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3824 Expr::Literal(v) => v.clone(),
3825 other => eval_const_expr(other)?,
3826 };
3827 vals.push(v);
3828 }
3829 out.push(vals);
3830 }
3831 out
3832 }
3833 InsertSource::Select(sq) => {
3834 let empty_ctes = CteContext::default();
3835 let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3836 qr.rows
3837 }
3838 };
3839
3840 let mut count: u64 = 0;
3841 for row in source_rows {
3842 if row.len() != target_positions.len() {
3843 return Err(SqlError::InvalidValue(format!(
3844 "expected {} values, got {}",
3845 target_positions.len(),
3846 row.len()
3847 )));
3848 }
3849 let mut new_row = vec![Value::Null; resolved_aliases.len()];
3850 for (slot, val) in target_positions.iter().zip(row) {
3851 new_row[*slot] = val;
3852 }
3853 super::triggers::fire_row_triggers(
3854 wtx,
3855 schema,
3856 view_name,
3857 crate::parser::TriggerTiming::InsteadOf,
3858 super::triggers::FireEvent::Insert,
3859 None,
3860 Some(new_row),
3861 &view_cols,
3862 )?;
3863 count += 1;
3864 }
3865 Ok(ExecutionResult::RowsAffected(count))
3866}
3867
3868fn derive_view_columns(
3869 wtx: &mut WriteTxn<'_>,
3870 schema: &SchemaManager,
3871 view_name: &str,
3872) -> Result<Vec<String>> {
3873 use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3874 let sel = SelectStmt {
3875 columns: vec![SelectColumn::AllColumns],
3876 from: view_name.to_string(),
3877 from_alias: None,
3878 from_subquery: None,
3879 from_args: None,
3880 from_json_table: None,
3881 joins: vec![],
3882 distinct: false,
3883 where_clause: None,
3884 order_by: vec![],
3885 limit: Some(Expr::Literal(Value::Integer(1))),
3886 offset: None,
3887 group_by: vec![],
3888 having: None,
3889 };
3890 let sq = SelectQuery {
3891 ctes: vec![],
3892 recursive: false,
3893 body: QueryBody::Select(Box::new(sel)),
3894 };
3895 let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3896 match qr {
3897 ExecutionResult::Query(q) => Ok(q.columns),
3898 _ => Ok(Vec::new()),
3899 }
3900}
3901
3902#[cfg(test)]
3903#[path = "dml_tests.rs"]
3904mod tests;