1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::read_txn::ReadTxn;
7use citadel_txn::write_txn::WriteTxn;
8use rustc_hash::FxHashMap;
9
10use crate::encoding::{encode_composite_key_into, encode_row_into};
11use crate::error::{Result, SqlError};
12use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
13use crate::parser::*;
14use crate::types::*;
15
16use crate::schema::SchemaManager;
17
18use super::compile::CompiledPlan;
19use super::helpers::*;
20use super::CteContext;
21
22fn row_insert_trigger_flags(schema: &SchemaManager, table_name: &str) -> (bool, bool, bool) {
24 use crate::parser::{TriggerEvent, TriggerGranularity, TriggerTiming};
25 let triggers = schema.triggers_for(table_name);
26 let row =
27 |t: &crate::types::TriggerDef| t.enabled && t.granularity == TriggerGranularity::ForEachRow;
28 let ev = |t: &crate::types::TriggerDef, update: bool| {
29 t.events.iter().any(|e| {
30 if update {
31 matches!(e, TriggerEvent::Update(_))
32 } else {
33 matches!(e, TriggerEvent::Insert)
34 }
35 })
36 };
37 (
38 triggers
39 .iter()
40 .any(|t| row(t) && t.timing == TriggerTiming::Before && ev(t, false)),
41 triggers
42 .iter()
43 .any(|t| row(t) && t.timing == TriggerTiming::After && ev(t, false)),
44 triggers
45 .iter()
46 .any(|t| row(t) && t.timing == TriggerTiming::After && ev(t, true)),
47 )
48}
49
50fn mark_insert_dml(
53 schema: &SchemaManager,
54 table_name: &str,
55 on_conflict: bool,
56 single_int_pk: bool,
57 min_inserted_pk: Option<i64>,
58 rows_written: u64,
59) {
60 if on_conflict {
61 schema.mark_dml(table_name);
62 } else if single_int_pk {
63 if let Some(m) = min_inserted_pk {
64 schema.mark_dml_append(table_name, m);
65 }
66 } else if rows_written > 0 {
67 schema.mark_dml(table_name);
68 }
69}
70
71fn is_single_int_pk(table_schema: &TableSchema) -> bool {
73 table_schema.primary_key_columns.len() == 1
74 && matches!(
75 table_schema.columns[table_schema.primary_key_columns[0] as usize].data_type,
76 DataType::Integer
77 )
78}
79
80pub(super) fn exec_insert(
81 db: &Database,
82 schema: &SchemaManager,
83 stmt: &InsertStmt,
84 params: &[Value],
85) -> Result<ExecutionResult> {
86 let empty_ctes = CteContext::default();
87 let materialized;
88 let stmt = if insert_has_subquery(stmt) {
89 materialized = materialize_insert(stmt, &mut |sub| {
90 exec_subquery_read(db, schema, sub, &empty_ctes)
91 })?;
92 &materialized
93 } else {
94 stmt
95 };
96
97 let lower_name = stmt.table.to_ascii_lowercase();
98 if let Some(view_def) = schema.get_view(&lower_name) {
99 if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
100 {
101 let aliases = view_def.column_aliases.clone();
102 return exec_instead_of_view_insert_auto(
103 db,
104 schema,
105 &lower_name,
106 &aliases,
107 stmt,
108 params,
109 );
110 }
111 return Err(SqlError::CannotModifyView(stmt.table.clone()));
112 }
113 if schema.get_matview(&lower_name).is_some() {
114 return Err(SqlError::CannotModifyView(format!(
115 "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
116 stmt.table
117 )));
118 }
119 let table_schema = schema
120 .get(&lower_name)
121 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
122
123 let insert_columns = if stmt.columns.is_empty() {
124 table_schema
125 .columns
126 .iter()
127 .map(|c| c.name.clone())
128 .collect::<Vec<_>>()
129 } else {
130 stmt.columns
131 .iter()
132 .map(|c| c.to_ascii_lowercase())
133 .collect()
134 };
135
136 let col_indices: Vec<usize> = insert_columns
137 .iter()
138 .map(|name| {
139 table_schema
140 .column_index(name)
141 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
142 })
143 .collect::<Result<_>>()?;
144
145 for &ci in &col_indices {
146 if table_schema.columns[ci].generated_kind.is_some() {
147 return Err(SqlError::CannotInsertIntoGeneratedColumn(
148 table_schema.columns[ci].name.clone(),
149 ));
150 }
151 }
152
153 let defaults: Vec<(usize, &Expr)> = table_schema
154 .columns
155 .iter()
156 .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
157 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
158 .collect();
159
160 let generated_cols: Vec<(usize, &Expr)> = table_schema
161 .columns
162 .iter()
163 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
164 .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
165 .collect();
166
167 let has_checks = table_schema.has_checks();
168 let strict = table_schema.is_strict();
169 let row_col_map_for_gen = if !generated_cols.is_empty() {
170 Some(ColumnMap::new(&table_schema.columns))
171 } else {
172 None
173 };
174 let check_col_map = if has_checks {
175 Some(ColumnMap::new(&table_schema.columns))
176 } else {
177 None
178 };
179
180 let select_rows = match &stmt.source {
181 InsertSource::Select(sq) => {
182 let insert_ctes =
183 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
184 exec_query_body_read(db, schema, body, ctx)
185 })?;
186 let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
187 Some(qr.rows)
188 }
189 InsertSource::Values(_) => None,
190 };
191
192 let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
193 .on_conflict
194 .as_ref()
195 .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
196 .transpose()?;
197
198 let row_col_map = compiled_conflict
199 .as_ref()
200 .map(|_| ColumnMap::new(&table_schema.columns));
201
202 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
203 super::ann_persist::purge_segment(&mut wtx, &table_schema.name)?;
207 let mut count: u64 = 0;
208 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
209 stmt.returning.as_ref().map(|_| Vec::new());
210
211 let pk_indices = table_schema.pk_indices();
212 let non_pk = table_schema.non_pk_indices();
213 let enc_pos = table_schema.encoding_positions();
214 let phys_count = table_schema.physical_non_pk_count();
215 let mut row = vec![Value::Null; table_schema.columns.len()];
216 let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
217 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
218 let mut key_buf: Vec<u8> = Vec::with_capacity(64);
219 let mut value_buf: Vec<u8> = Vec::with_capacity(256);
220 let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
221
222 let values = match &stmt.source {
223 InsertSource::Values(rows) => Some(rows.as_slice()),
224 InsertSource::Select(_) => None,
225 };
226 let sel_rows = select_rows.as_deref();
227
228 let total = match (values, sel_rows) {
229 (Some(rows), _) => rows.len(),
230 (_, Some(rows)) => rows.len(),
231 _ => 0,
232 };
233
234 if let Some(sel) = sel_rows {
235 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
236 return Err(SqlError::InvalidValue(format!(
237 "INSERT ... SELECT column count mismatch: expected {}, got {}",
238 insert_columns.len(),
239 sel[0].len()
240 )));
241 }
242 }
243
244 let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
245 t.enabled
246 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
247 && t.events
248 .iter()
249 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
250 });
251 let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
252 Vec::with_capacity(total)
253 } else {
254 Vec::new()
255 };
256
257 if has_insert_statement_triggers {
258 super::triggers::fire_statement_triggers(
259 &mut wtx,
260 schema,
261 &table_schema.name,
262 crate::parser::TriggerTiming::Before,
263 super::triggers::FireEvent::Insert,
264 &table_schema.columns,
265 &[],
266 &[],
267 )?;
268 }
269
270 let plain_insert = compiled_conflict.is_none();
271 let single_int_pk = is_single_int_pk(table_schema);
272 let mut min_inserted_pk: Option<i64> = None;
273 let (has_before_insert_triggers, has_after_insert_triggers, has_after_update_triggers) =
274 row_insert_trigger_flags(schema, &table_schema.name);
275
276 for idx in 0..total {
277 for v in row.iter_mut() {
278 *v = Value::Null;
279 }
280
281 if let Some(value_rows) = values {
282 let value_row = &value_rows[idx];
283 if value_row.len() != insert_columns.len() {
284 return Err(SqlError::InvalidValue(format!(
285 "expected {} values, got {}",
286 insert_columns.len(),
287 value_row.len()
288 )));
289 }
290 for (i, expr) in value_row.iter().enumerate() {
291 let val = if let Expr::Parameter(n) = expr {
292 params
293 .get(n - 1)
294 .cloned()
295 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
296 } else {
297 eval_const_expr(expr)?
298 };
299 let col_idx = col_indices[i];
300 let col = &table_schema.columns[col_idx];
301 row[col_idx] = if val.is_null() {
302 Value::Null
303 } else {
304 coerce_for_column(val, col, strict)?
305 };
306 }
307 } else if let Some(sel) = sel_rows {
308 let sel_row = &sel[idx];
309 for (i, val) in sel_row.iter().enumerate() {
310 let col_idx = col_indices[i];
311 let col = &table_schema.columns[col_idx];
312 row[col_idx] = if val.is_null() {
313 Value::Null
314 } else {
315 coerce_for_column(val.clone(), col, strict)?
316 };
317 }
318 }
319
320 for &(pos, def_expr) in &defaults {
321 let val = eval_const_expr(def_expr)?;
322 let col = &table_schema.columns[pos];
323 if !val.is_null() {
324 row[pos] = coerce_for_column(val, col, strict)?;
325 }
326 }
327
328 if let Some(ref gen_map) = row_col_map_for_gen {
329 for &(pos, gen_expr) in &generated_cols {
330 let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
331 let col = &table_schema.columns[pos];
332 row[pos] = if val.is_null() {
333 Value::Null
334 } else {
335 coerce_for_column(val, col, strict)?
336 };
337 }
338 }
339
340 for col in &table_schema.columns {
341 if !col.nullable && row[col.position as usize].is_null() {
342 return Err(SqlError::NotNullViolation(col.name.clone()));
343 }
344 }
345
346 if let Some(ref col_map) = check_col_map {
347 for col in &table_schema.columns {
348 if let Some(ref check) = col.check_expr {
349 let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
350 if !is_truthy(&result) && !result.is_null() {
351 let name = col.check_name.as_deref().unwrap_or(&col.name);
352 return Err(SqlError::CheckViolation(name.to_string()));
353 }
354 }
355 }
356 for tc in &table_schema.check_constraints {
357 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
358 if !is_truthy(&result) && !result.is_null() {
359 let name = tc.name.as_deref().unwrap_or(&tc.sql);
360 return Err(SqlError::CheckViolation(name.to_string()));
361 }
362 }
363 }
364
365 for fk in &table_schema.foreign_keys {
366 let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
367 if any_null {
368 continue; }
370 let fk_vals: Vec<Value> = fk
371 .columns
372 .iter()
373 .map(|&ci| row[ci as usize].clone())
374 .collect();
375 fk_key_buf.clear();
376 encode_composite_key_into(&fk_vals, &mut fk_key_buf);
377 if fk.deferrable && fk.initially_deferred {
378 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
379 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
380 fk_name: name,
381 foreign_table: fk.foreign_table.as_bytes().to_vec(),
382 parent_key: fk_key_buf.clone(),
383 });
384 continue;
385 }
386 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
387 let found = wtx
388 .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
389 .map_err(SqlError::Storage)?;
390 if found.is_none() {
391 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
392 return Err(SqlError::ForeignKeyViolation(name.to_string()));
393 }
394 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
395 }
396 }
397
398 let proposed_row_for_returning: Option<Vec<Value>> =
399 returning_rows.as_ref().map(|_| row.clone());
400 let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
401 Some(row.clone())
402 } else {
403 None
404 };
405
406 if has_before_insert_triggers {
407 super::triggers::fire_row_triggers(
408 &mut wtx,
409 schema,
410 &table_schema.name,
411 crate::parser::TriggerTiming::Before,
412 super::triggers::FireEvent::Insert,
413 None,
414 Some(row.clone()),
415 &table_schema.columns,
416 )?;
417 }
418
419 for (j, &i) in pk_indices.iter().enumerate() {
420 pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
421 }
422 encode_composite_key_into(&pk_values, &mut key_buf);
423 if plain_insert && single_int_pk {
424 if let Value::Integer(id) = &pk_values[0] {
425 min_inserted_pk = Some(min_inserted_pk.map_or(*id, |m| m.min(*id)));
426 }
427 }
428
429 for (j, &i) in non_pk.iter().enumerate() {
430 let col = &table_schema.columns[i];
431 if matches!(
432 col.generated_kind,
433 Some(crate::parser::GeneratedKind::Virtual)
434 ) {
435 value_values[enc_pos[j] as usize] = Value::Null;
436 row[i] = Value::Null;
437 } else {
438 value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
439 }
440 }
441 encode_row_into(&value_values, &mut value_buf);
442
443 if key_buf.len() > citadel_core::MAX_KEY_SIZE {
444 return Err(SqlError::KeyTooLarge {
445 size: key_buf.len(),
446 max: citadel_core::MAX_KEY_SIZE,
447 });
448 }
449 if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
450 return Err(SqlError::RowTooLarge {
451 size: value_buf.len(),
452 max: citadel_core::MAX_VALUE_SIZE,
453 });
454 }
455
456 match compiled_conflict.as_ref() {
457 None => {
458 let is_new = wtx
459 .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
460 .map_err(SqlError::Storage)?;
461 if !is_new {
462 return Err(SqlError::DuplicateKey);
463 }
464 if !table_schema.indices.is_empty() || has_after_insert_triggers {
465 for (j, &i) in pk_indices.iter().enumerate() {
466 row[i] = pk_values[j].clone();
467 }
468 for (j, &i) in non_pk.iter().enumerate() {
469 row[i] =
470 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
471 }
472 if !table_schema.indices.is_empty() {
473 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
474 }
475 if has_after_insert_triggers {
476 super::triggers::fire_row_triggers(
477 &mut wtx,
478 schema,
479 &table_schema.name,
480 crate::parser::TriggerTiming::After,
481 super::triggers::FireEvent::Insert,
482 None,
483 Some(row.clone()),
484 &table_schema.columns,
485 )?;
486 }
487 }
488 if let Some(r) = row_for_stmt_trigger.clone() {
489 stmt_new_rows.push(r);
490 }
491 count += 1;
492 if let Some(buf) = returning_rows.as_mut() {
493 buf.push((None, proposed_row_for_returning));
494 }
495 }
496 Some(oc) => {
497 let oc_ref: &CompiledOnConflict = oc;
498 let needs_row = upsert_needs_row(oc_ref, table_schema);
499 if needs_row {
500 for (j, &i) in pk_indices.iter().enumerate() {
501 row[i] = pk_values[j].clone();
502 }
503 for (j, &i) in non_pk.iter().enumerate() {
504 row[i] =
505 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
506 }
507 }
508 let outcome = apply_insert_with_conflict(
509 &mut wtx,
510 table_schema,
511 &key_buf,
512 &value_buf,
513 &row,
514 &pk_values,
515 oc_ref,
516 row_col_map.as_ref().unwrap(),
517 stmt.returning.is_some(),
518 )?;
519 match outcome {
520 InsertRowOutcome::Inserted => {
521 count += 1;
522 if let Some(buf) = returning_rows.as_mut() {
523 buf.push((None, proposed_row_for_returning));
524 }
525 if let Some(r) = row_for_stmt_trigger.clone() {
526 stmt_new_rows.push(r);
527 }
528 if has_after_insert_triggers {
529 super::triggers::fire_row_triggers(
530 &mut wtx,
531 schema,
532 &table_schema.name,
533 crate::parser::TriggerTiming::After,
534 super::triggers::FireEvent::Insert,
535 None,
536 Some(row.clone()),
537 &table_schema.columns,
538 )?;
539 }
540 }
541 InsertRowOutcome::Updated { old, new } => {
542 count += 1;
543 if let Some(buf) = returning_rows.as_mut() {
544 buf.push((Some(old.clone()), Some(new.clone())));
545 }
546 if has_after_update_triggers {
547 let changed_cols: Vec<String> = match oc_ref {
548 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
549 .iter()
550 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
551 .collect(),
552 _ => Vec::new(),
553 };
554 super::triggers::fire_row_triggers(
555 &mut wtx,
556 schema,
557 &table_schema.name,
558 crate::parser::TriggerTiming::After,
559 super::triggers::FireEvent::Update {
560 changed_columns: &changed_cols,
561 },
562 Some(old),
563 Some(new),
564 &table_schema.columns,
565 )?;
566 }
567 }
568 InsertRowOutcome::Skipped => {}
569 }
570 }
571 }
572 }
573
574 if has_insert_statement_triggers {
575 super::triggers::fire_statement_triggers(
576 &mut wtx,
577 schema,
578 &table_schema.name,
579 crate::parser::TriggerTiming::After,
580 super::triggers::FireEvent::Insert,
581 &table_schema.columns,
582 &[],
583 &stmt_new_rows,
584 )?;
585 }
586
587 mark_insert_dml(
588 schema,
589 &table_schema.name,
590 !plain_insert,
591 single_int_pk,
592 min_inserted_pk,
593 count,
594 );
595
596 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
597 let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
598 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
599 wtx.commit().map_err(SqlError::Storage)?;
600 return Ok(ExecutionResult::Query(qr));
601 }
602
603 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
604 wtx.commit().map_err(SqlError::Storage)?;
605 Ok(ExecutionResult::RowsAffected(count))
606}
607
608pub(super) fn has_subquery(expr: &Expr) -> bool {
609 crate::parser::has_subquery(expr)
610}
611
612pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
613 if let Some(ref w) = stmt.where_clause {
614 if has_subquery(w) {
615 return true;
616 }
617 }
618 if let Some(ref h) = stmt.having {
619 if has_subquery(h) {
620 return true;
621 }
622 }
623 for col in &stmt.columns {
624 if let SelectColumn::Expr { expr, .. } = col {
625 if has_subquery(expr) {
626 return true;
627 }
628 }
629 }
630 for ob in &stmt.order_by {
631 if has_subquery(&ob.expr) {
632 return true;
633 }
634 }
635 for join in &stmt.joins {
636 if let Some(ref on_expr) = join.on_clause {
637 if has_subquery(on_expr) {
638 return true;
639 }
640 }
641 }
642 false
643}
644
645pub(super) fn materialize_expr(
646 expr: &Expr,
647 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
648) -> Result<Expr> {
649 match expr {
650 Expr::InSubquery {
651 expr: e,
652 subquery,
653 negated,
654 } => {
655 let inner = materialize_expr(e, exec_sub)?;
656 let qr = exec_sub(subquery)?;
657 if !qr.columns.is_empty() && qr.columns.len() != 1 {
658 return Err(SqlError::SubqueryMultipleColumns);
659 }
660 let mut values = rustc_hash::FxHashSet::default();
661 let mut has_null = false;
662 for row in &qr.rows {
663 if row[0].is_null() {
664 has_null = true;
665 } else {
666 values.insert(row[0].clone());
667 }
668 }
669 Ok(Expr::InSet {
670 expr: Box::new(inner),
671 values,
672 has_null,
673 negated: *negated,
674 })
675 }
676 Expr::ScalarSubquery(subquery) => {
677 let qr = exec_sub(subquery)?;
678 if qr.rows.len() > 1 {
679 return Err(SqlError::SubqueryMultipleRows);
680 }
681 let val = if qr.rows.is_empty() {
682 Value::Null
683 } else {
684 qr.rows[0][0].clone()
685 };
686 Ok(Expr::Literal(val))
687 }
688 Expr::Exists { subquery, negated } => {
689 let qr = exec_sub(subquery)?;
690 let exists = !qr.rows.is_empty();
691 let result = if *negated { !exists } else { exists };
692 Ok(Expr::Literal(Value::Boolean(result)))
693 }
694 Expr::InList {
695 expr: e,
696 list,
697 negated,
698 } => {
699 let inner = materialize_expr(e, exec_sub)?;
700 let items = list
701 .iter()
702 .map(|item| materialize_expr(item, exec_sub))
703 .collect::<Result<Vec<_>>>()?;
704 Ok(Expr::InList {
705 expr: Box::new(inner),
706 list: items,
707 negated: *negated,
708 })
709 }
710 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
711 left: Box::new(materialize_expr(left, exec_sub)?),
712 op: *op,
713 right: Box::new(materialize_expr(right, exec_sub)?),
714 }),
715 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
716 op: *op,
717 expr: Box::new(materialize_expr(e, exec_sub)?),
718 }),
719 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
720 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
721 Expr::InSet {
722 expr: e,
723 values,
724 has_null,
725 negated,
726 } => Ok(Expr::InSet {
727 expr: Box::new(materialize_expr(e, exec_sub)?),
728 values: values.clone(),
729 has_null: *has_null,
730 negated: *negated,
731 }),
732 Expr::Between {
733 expr: e,
734 low,
735 high,
736 negated,
737 } => Ok(Expr::Between {
738 expr: Box::new(materialize_expr(e, exec_sub)?),
739 low: Box::new(materialize_expr(low, exec_sub)?),
740 high: Box::new(materialize_expr(high, exec_sub)?),
741 negated: *negated,
742 }),
743 Expr::Like {
744 expr: e,
745 pattern,
746 escape,
747 negated,
748 } => {
749 let esc = escape
750 .as_ref()
751 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
752 .transpose()?;
753 Ok(Expr::Like {
754 expr: Box::new(materialize_expr(e, exec_sub)?),
755 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
756 escape: esc,
757 negated: *negated,
758 })
759 }
760 Expr::Case {
761 operand,
762 conditions,
763 else_result,
764 } => {
765 let op = operand
766 .as_ref()
767 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
768 .transpose()?;
769 let conds = conditions
770 .iter()
771 .map(|(c, r)| {
772 Ok((
773 materialize_expr(c, exec_sub)?,
774 materialize_expr(r, exec_sub)?,
775 ))
776 })
777 .collect::<Result<Vec<_>>>()?;
778 let else_r = else_result
779 .as_ref()
780 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
781 .transpose()?;
782 Ok(Expr::Case {
783 operand: op,
784 conditions: conds,
785 else_result: else_r,
786 })
787 }
788 Expr::Coalesce(args) => {
789 let materialized = args
790 .iter()
791 .map(|a| materialize_expr(a, exec_sub))
792 .collect::<Result<Vec<_>>>()?;
793 Ok(Expr::Coalesce(materialized))
794 }
795 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
796 expr: Box::new(materialize_expr(e, exec_sub)?),
797 data_type: *data_type,
798 }),
799 Expr::Function {
800 name,
801 args,
802 distinct,
803 } => {
804 let materialized = args
805 .iter()
806 .map(|a| materialize_expr(a, exec_sub))
807 .collect::<Result<Vec<_>>>()?;
808 Ok(Expr::Function {
809 name: name.clone(),
810 args: materialized,
811 distinct: *distinct,
812 })
813 }
814 other => Ok(other.clone()),
815 }
816}
817
818pub(super) fn materialize_stmt(
819 stmt: &SelectStmt,
820 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
821) -> Result<SelectStmt> {
822 let where_clause = stmt
823 .where_clause
824 .as_ref()
825 .map(|e| materialize_expr(e, exec_sub))
826 .transpose()?;
827 let having = stmt
828 .having
829 .as_ref()
830 .map(|e| materialize_expr(e, exec_sub))
831 .transpose()?;
832 let columns = stmt
833 .columns
834 .iter()
835 .map(|c| match c {
836 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
837 SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
838 SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
839 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
840 expr: materialize_expr(expr, exec_sub)?,
841 alias: alias.clone(),
842 }),
843 })
844 .collect::<Result<Vec<_>>>()?;
845 let order_by = stmt
846 .order_by
847 .iter()
848 .map(|ob| {
849 Ok(OrderByItem {
850 expr: materialize_expr(&ob.expr, exec_sub)?,
851 descending: ob.descending,
852 nulls_first: ob.nulls_first,
853 })
854 })
855 .collect::<Result<Vec<_>>>()?;
856 let joins = stmt
857 .joins
858 .iter()
859 .map(|j| {
860 let on_clause = j
861 .on_clause
862 .as_ref()
863 .map(|e| materialize_expr(e, exec_sub))
864 .transpose()?;
865 Ok(JoinClause {
866 join_type: j.join_type,
867 table: j.table.clone(),
868 subquery: j.subquery.clone(),
869 on_clause,
870 })
871 })
872 .collect::<Result<Vec<_>>>()?;
873 let group_by = stmt
874 .group_by
875 .iter()
876 .map(|e| materialize_expr(e, exec_sub))
877 .collect::<Result<Vec<_>>>()?;
878 Ok(SelectStmt {
879 columns,
880 from: stmt.from.clone(),
881 from_alias: stmt.from_alias.clone(),
882 from_subquery: stmt.from_subquery.clone(),
883 from_args: stmt.from_args.clone(),
884 from_json_table: stmt.from_json_table.clone(),
885 joins,
886 distinct: stmt.distinct,
887 where_clause,
888 order_by,
889 limit: stmt.limit.clone(),
890 offset: stmt.offset.clone(),
891 group_by,
892 having,
893 })
894}
895
896pub(super) fn exec_subquery_read(
897 db: &Database,
898 schema: &SchemaManager,
899 stmt: &SelectStmt,
900 ctes: &CteContext,
901) -> Result<QueryResult> {
902 let mut rtx = db.begin_read();
903 exec_subquery_with_read(&mut rtx, schema, stmt, ctes)
904}
905
906pub(super) fn exec_subquery_with_read(
907 rtx: &mut ReadTxn<'_>,
908 schema: &SchemaManager,
909 stmt: &SelectStmt,
910 ctes: &CteContext,
911) -> Result<QueryResult> {
912 match super::exec_select_with_read(rtx, schema, stmt, ctes)? {
913 ExecutionResult::Query(qr) => Ok(qr),
914 _ => Ok(QueryResult {
915 columns: vec![],
916 rows: vec![],
917 }),
918 }
919}
920
921pub(super) fn exec_subquery_write(
922 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
923 schema: &SchemaManager,
924 stmt: &SelectStmt,
925 ctes: &CteContext,
926) -> Result<QueryResult> {
927 match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
928 ExecutionResult::Query(qr) => Ok(qr),
929 _ => Ok(QueryResult {
930 columns: vec![],
931 rows: vec![],
932 }),
933 }
934}
935
936pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
937 stmt.where_clause.as_ref().is_some_and(has_subquery)
938 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
939}
940
941pub(super) fn materialize_update(
942 stmt: &UpdateStmt,
943 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
944) -> Result<UpdateStmt> {
945 let where_clause = stmt
946 .where_clause
947 .as_ref()
948 .map(|e| materialize_expr(e, exec_sub))
949 .transpose()?;
950 let assignments = stmt
951 .assignments
952 .iter()
953 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
954 .collect::<Result<Vec<_>>>()?;
955 Ok(UpdateStmt {
956 table: stmt.table.clone(),
957 assignments,
958 where_clause,
959 returning: stmt.returning.clone(),
960 })
961}
962
963pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
964 stmt.where_clause.as_ref().is_some_and(has_subquery)
965}
966
967pub(super) fn materialize_delete(
968 stmt: &DeleteStmt,
969 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
970) -> Result<DeleteStmt> {
971 let where_clause = stmt
972 .where_clause
973 .as_ref()
974 .map(|e| materialize_expr(e, exec_sub))
975 .transpose()?;
976 Ok(DeleteStmt {
977 table: stmt.table.clone(),
978 where_clause,
979 returning: stmt.returning.clone(),
980 })
981}
982
983pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
984 match &stmt.source {
985 InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
986 InsertSource::Select(_) => false,
988 }
989}
990
991pub(super) fn materialize_insert(
992 stmt: &InsertStmt,
993 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
994) -> Result<InsertStmt> {
995 let source = match &stmt.source {
996 InsertSource::Values(rows) => {
997 let mat = rows
998 .iter()
999 .map(|row| {
1000 row.iter()
1001 .map(|e| materialize_expr(e, exec_sub))
1002 .collect::<Result<Vec<_>>>()
1003 })
1004 .collect::<Result<Vec<_>>>()?;
1005 InsertSource::Values(mat)
1006 }
1007 InsertSource::Select(sq) => {
1008 let ctes = sq
1009 .ctes
1010 .iter()
1011 .map(|c| {
1012 Ok(CteDefinition {
1013 name: c.name.clone(),
1014 column_aliases: c.column_aliases.clone(),
1015 body: materialize_query_body(&c.body, exec_sub)?,
1016 })
1017 })
1018 .collect::<Result<Vec<_>>>()?;
1019 let body = materialize_query_body(&sq.body, exec_sub)?;
1020 InsertSource::Select(Box::new(SelectQuery {
1021 ctes,
1022 recursive: sq.recursive,
1023 body,
1024 }))
1025 }
1026 };
1027 Ok(InsertStmt {
1028 table: stmt.table.clone(),
1029 columns: stmt.columns.clone(),
1030 source,
1031 on_conflict: stmt.on_conflict.clone(),
1032 returning: stmt.returning.clone(),
1033 })
1034}
1035
1036pub(super) fn materialize_query_body(
1037 body: &QueryBody,
1038 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1039) -> Result<QueryBody> {
1040 match body {
1041 QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
1042 sel, exec_sub,
1043 )?))),
1044 QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
1045 op: comp.op.clone(),
1046 all: comp.all,
1047 left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
1048 right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
1049 order_by: comp.order_by.clone(),
1050 limit: comp.limit.clone(),
1051 offset: comp.offset.clone(),
1052 }))),
1053 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
1054 }
1055}
1056
1057pub(super) fn exec_query_body_with_read(
1058 rtx: &mut ReadTxn<'_>,
1059 schema: &SchemaManager,
1060 body: &QueryBody,
1061 ctes: &CteContext,
1062) -> Result<ExecutionResult> {
1063 match body {
1064 QueryBody::Select(sel) => super::exec_select_with_read(rtx, schema, sel, ctes),
1065 QueryBody::Compound(comp) => exec_compound_select_with_read(rtx, schema, comp, ctes),
1066 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1067 SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1068 ),
1069 }
1070}
1071
1072pub(super) fn exec_query_body_in_txn(
1073 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1074 schema: &SchemaManager,
1075 body: &QueryBody,
1076 ctes: &CteContext,
1077) -> Result<ExecutionResult> {
1078 match body {
1079 QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1080 QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1081 QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1082 QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1083 QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1084 }
1085}
1086
1087pub(super) fn exec_query_body_read(
1088 db: &Database,
1089 schema: &SchemaManager,
1090 body: &QueryBody,
1091 ctes: &CteContext,
1092) -> Result<QueryResult> {
1093 let mut rtx = db.begin_read();
1094 exec_query_body_with_read_qr(&mut rtx, schema, body, ctes)
1095}
1096
1097pub(super) fn exec_query_body_with_read_qr(
1098 rtx: &mut ReadTxn<'_>,
1099 schema: &SchemaManager,
1100 body: &QueryBody,
1101 ctes: &CteContext,
1102) -> Result<QueryResult> {
1103 match exec_query_body_with_read(rtx, schema, body, ctes)? {
1104 ExecutionResult::Query(qr) => Ok(qr),
1105 _ => Ok(QueryResult {
1106 columns: vec![],
1107 rows: vec![],
1108 }),
1109 }
1110}
1111
1112pub(super) fn exec_query_body_write(
1113 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1114 schema: &SchemaManager,
1115 body: &QueryBody,
1116 ctes: &CteContext,
1117) -> Result<QueryResult> {
1118 match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1119 ExecutionResult::Query(qr) => Ok(qr),
1120 _ => Ok(QueryResult {
1121 columns: vec![],
1122 rows: vec![],
1123 }),
1124 }
1125}
1126
1127pub(super) fn exec_compound_select_with_read(
1128 rtx: &mut ReadTxn<'_>,
1129 schema: &SchemaManager,
1130 comp: &CompoundSelect,
1131 ctes: &CteContext,
1132) -> Result<ExecutionResult> {
1133 let left_qr = match exec_query_body_with_read(rtx, schema, &comp.left, ctes)? {
1134 ExecutionResult::Query(qr) => qr,
1135 _ => QueryResult {
1136 columns: vec![],
1137 rows: vec![],
1138 },
1139 };
1140 let right_qr = match exec_query_body_with_read(rtx, schema, &comp.right, ctes)? {
1141 ExecutionResult::Query(qr) => qr,
1142 _ => QueryResult {
1143 columns: vec![],
1144 rows: vec![],
1145 },
1146 };
1147 apply_set_operation(comp, left_qr, right_qr)
1148}
1149
1150pub(super) fn exec_compound_select_in_txn(
1151 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1152 schema: &SchemaManager,
1153 comp: &CompoundSelect,
1154 ctes: &CteContext,
1155) -> Result<ExecutionResult> {
1156 let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1157 ExecutionResult::Query(qr) => qr,
1158 _ => QueryResult {
1159 columns: vec![],
1160 rows: vec![],
1161 },
1162 };
1163 let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1164 ExecutionResult::Query(qr) => qr,
1165 _ => QueryResult {
1166 columns: vec![],
1167 rows: vec![],
1168 },
1169 };
1170 apply_set_operation(comp, left_qr, right_qr)
1171}
1172
1173pub(super) fn apply_set_operation(
1174 comp: &CompoundSelect,
1175 left_qr: QueryResult,
1176 right_qr: QueryResult,
1177) -> Result<ExecutionResult> {
1178 if !left_qr.columns.is_empty()
1179 && !right_qr.columns.is_empty()
1180 && left_qr.columns.len() != right_qr.columns.len()
1181 {
1182 return Err(SqlError::CompoundColumnCountMismatch {
1183 left: left_qr.columns.len(),
1184 right: right_qr.columns.len(),
1185 });
1186 }
1187
1188 let columns = left_qr.columns;
1189
1190 let mut rows = match (&comp.op, comp.all) {
1191 (SetOp::Union, true) => {
1192 let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1193 let mut rows = Vec::with_capacity(total);
1194 rows.extend(left_qr.rows);
1195 rows.extend(right_qr.rows);
1196 rows
1197 }
1198 (SetOp::Union, false) => {
1199 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1200 let mut rows = Vec::new();
1201 for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1202 if !seen.contains(&row) {
1203 seen.insert(row.clone());
1204 rows.push(row);
1205 }
1206 }
1207 rows
1208 }
1209 (SetOp::Intersect, true) => {
1210 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1211 for row in &right_qr.rows {
1212 *right_counts.entry(row.clone()).or_insert(0) += 1;
1213 }
1214 let mut rows = Vec::new();
1215 for row in left_qr.rows {
1216 if let Some(count) = right_counts.get_mut(&row) {
1217 if *count > 0 {
1218 *count -= 1;
1219 rows.push(row);
1220 }
1221 }
1222 }
1223 rows
1224 }
1225 (SetOp::Intersect, false) => {
1226 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1227 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1228 let mut rows = Vec::new();
1229 for row in left_qr.rows {
1230 if right_set.contains(&row) && !seen.contains(&row) {
1231 seen.insert(row.clone());
1232 rows.push(row);
1233 }
1234 }
1235 rows
1236 }
1237 (SetOp::Except, true) => {
1238 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1239 for row in &right_qr.rows {
1240 *right_counts.entry(row.clone()).or_insert(0) += 1;
1241 }
1242 let mut rows = Vec::new();
1243 for row in left_qr.rows {
1244 if let Some(count) = right_counts.get_mut(&row) {
1245 if *count > 0 {
1246 *count -= 1;
1247 continue;
1248 }
1249 }
1250 rows.push(row);
1251 }
1252 rows
1253 }
1254 (SetOp::Except, false) => {
1255 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1256 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1257 let mut rows = Vec::new();
1258 for row in left_qr.rows {
1259 if !right_set.contains(&row) && !seen.contains(&row) {
1260 seen.insert(row.clone());
1261 rows.push(row);
1262 }
1263 }
1264 rows
1265 }
1266 };
1267
1268 if !comp.order_by.is_empty() {
1269 let col_defs: Vec<crate::types::ColumnDef> = columns
1270 .iter()
1271 .enumerate()
1272 .map(|(i, name)| crate::types::ColumnDef {
1273 name: name.clone(),
1274 data_type: crate::types::DataType::Null,
1275 nullable: true,
1276 position: i as u16,
1277 default_expr: None,
1278 default_sql: None,
1279 check_expr: None,
1280 check_sql: None,
1281 check_name: None,
1282 is_with_timezone: false,
1283 generated_expr: None,
1284 generated_sql: None,
1285 generated_kind: None,
1286 collation: crate::types::Collation::Binary,
1287 })
1288 .collect();
1289 sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1290 }
1291
1292 if let Some(ref offset_expr) = comp.offset {
1293 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1294 if offset < rows.len() {
1295 rows = rows.split_off(offset);
1296 } else {
1297 rows.clear();
1298 }
1299 }
1300
1301 if let Some(ref limit_expr) = comp.limit {
1302 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1303 rows.truncate(limit);
1304 }
1305
1306 Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1307}
1308
1309struct InsertBufs {
1310 row: Vec<Value>,
1311 pk_values: Vec<Value>,
1312 value_values: Vec<Value>,
1313 key_buf: Vec<u8>,
1314 value_buf: Vec<u8>,
1315 col_indices: Vec<usize>,
1316 fk_key_buf: Vec<u8>,
1317}
1318
1319impl InsertBufs {
1320 fn new() -> Self {
1321 Self {
1322 row: Vec::new(),
1323 pk_values: Vec::new(),
1324 value_values: Vec::new(),
1325 key_buf: Vec::with_capacity(64),
1326 value_buf: Vec::with_capacity(256),
1327 col_indices: Vec::new(),
1328 fk_key_buf: Vec::with_capacity(64),
1329 }
1330 }
1331}
1332
// Per-thread scratch buffers. Each slot sits behind a `RefCell`, so a borrow
// that overlaps an existing one is detected at runtime rather than aliasing.
thread_local! {
    // Buffers handed out by `with_insert_scratch`.
    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
    // Buffers for the upsert path; not accessed in this part of the file.
    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
}
1337
1338fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1339 INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1340 Ok(mut borrowed) => f(&mut borrowed),
1341 Err(_) => {
1342 let mut local = InsertBufs::new();
1343 f(&mut local)
1344 }
1345 })
1346}
1347
1348pub(super) struct UpsertBufs {
1349 old_row: Vec<Value>,
1350 new_row: Vec<Value>,
1351 value_values: Vec<Value>,
1352 new_value_buf: Vec<u8>,
1353}
1354
1355impl UpsertBufs {
1356 pub(super) fn new() -> Self {
1357 Self {
1358 old_row: Vec::new(),
1359 new_row: Vec::new(),
1360 value_values: Vec::new(),
1361 new_value_buf: Vec::with_capacity(256),
1362 }
1363 }
1364}
1365
1366pub fn exec_insert_in_txn(
1367 wtx: &mut WriteTxn<'_>,
1368 schema: &SchemaManager,
1369 stmt: &InsertStmt,
1370 params: &[Value],
1371) -> Result<ExecutionResult> {
1372 with_insert_scratch(|bufs| {
1373 exec_insert_in_txn_impl(
1374 wtx,
1375 schema,
1376 stmt,
1377 params,
1378 bufs,
1379 None,
1380 &CteContext::default(),
1381 )
1382 })
1383}
1384
1385pub(super) fn exec_insert_in_txn_with_ctes(
1386 wtx: &mut WriteTxn<'_>,
1387 schema: &SchemaManager,
1388 stmt: &InsertStmt,
1389 params: &[Value],
1390 outer_ctes: &CteContext,
1391) -> Result<ExecutionResult> {
1392 with_insert_scratch(|bufs| {
1393 exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1394 })
1395}
1396
1397fn exec_insert_in_txn_cached(
1398 wtx: &mut WriteTxn<'_>,
1399 schema: &SchemaManager,
1400 stmt: &InsertStmt,
1401 params: &[Value],
1402 cache: &InsertCache,
1403) -> Result<ExecutionResult> {
1404 with_insert_scratch(|bufs| {
1405 exec_insert_in_txn_impl(
1406 wtx,
1407 schema,
1408 stmt,
1409 params,
1410 bufs,
1411 Some(cache),
1412 &CteContext::default(),
1413 )
1414 })
1415}
1416
1417fn exec_insert_in_txn_impl(
1418 wtx: &mut WriteTxn<'_>,
1419 schema: &SchemaManager,
1420 stmt: &InsertStmt,
1421 params: &[Value],
1422 bufs: &mut InsertBufs,
1423 cache: Option<&InsertCache>,
1424 outer_ctes: &CteContext,
1425) -> Result<ExecutionResult> {
1426 let empty_ctes = CteContext::default();
1427 let materialized;
1428 let has_sub = match cache {
1429 Some(c) => c.has_subquery,
1430 None => insert_has_subquery(stmt),
1431 };
1432 let stmt = if has_sub {
1433 materialized = materialize_insert(stmt, &mut |sub| {
1434 exec_subquery_write(wtx, schema, sub, &empty_ctes)
1435 })?;
1436 &materialized
1437 } else {
1438 stmt
1439 };
1440
1441 let view_lookup_key = stmt.table.to_ascii_lowercase();
1442 if let Some(view_def) = schema.get_view(&view_lookup_key) {
1443 if super::triggers::has_instead_of(
1444 schema,
1445 &view_lookup_key,
1446 super::triggers::FireEvent::Insert,
1447 ) {
1448 let aliases = view_def.column_aliases.clone();
1449 return exec_instead_of_view_insert_in_txn(
1450 wtx,
1451 schema,
1452 &view_lookup_key,
1453 &aliases,
1454 stmt,
1455 params,
1456 );
1457 }
1458 return Err(SqlError::CannotModifyView(stmt.table.clone()));
1459 }
1460
1461 let table_schema = schema
1462 .get(&stmt.table)
1463 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1464 if table_schema.has_ann_index() {
1465 super::ann_persist::purge_segment(wtx, &table_schema.name)?;
1466 }
1467
1468 let default_columns;
1469 let insert_columns: &[String] = if stmt.columns.is_empty() {
1470 default_columns = table_schema
1471 .columns
1472 .iter()
1473 .map(|c| c.name.clone())
1474 .collect::<Vec<_>>();
1475 &default_columns
1476 } else {
1477 &stmt.columns
1478 };
1479
1480 bufs.col_indices.clear();
1481 if let Some(c) = cache {
1482 bufs.col_indices.extend_from_slice(&c.col_indices);
1483 } else {
1484 for name in insert_columns {
1485 bufs.col_indices.push(
1486 table_schema
1487 .column_index(name)
1488 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1489 );
1490 }
1491 }
1492
1493 if cache.is_none() {
1494 for &ci in &bufs.col_indices {
1495 if table_schema.columns[ci].generated_kind.is_some() {
1496 return Err(SqlError::CannotInsertIntoGeneratedColumn(
1497 table_schema.columns[ci].name.clone(),
1498 ));
1499 }
1500 }
1501 }
1502
1503 let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1504 let cached_gen_positions: &[usize];
1505 let cached_gen_fast_evals: &[FastGenEval];
1506 if let Some(c) = cache {
1507 cached_gen_positions = &c.generated_col_positions;
1508 cached_gen_fast_evals = &c.generated_fast_evals;
1509 generated_cols_uncached = Vec::new();
1510 } else {
1511 cached_gen_positions = &[];
1512 cached_gen_fast_evals = &[];
1513 generated_cols_uncached = table_schema
1514 .columns
1515 .iter()
1516 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1517 .map(|c| {
1518 let expr = c.generated_expr.as_ref().unwrap();
1519 let fe = detect_fast_gen_eval(expr, table_schema);
1520 (c.position as usize, expr, fe)
1521 })
1522 .collect();
1523 }
1524 let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1525 let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1526 None
1527 } else {
1528 Some(ColumnMap::new(&table_schema.columns))
1529 };
1530 let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1531 None
1532 } else if let Some(c) = cache {
1533 c.row_col_map.as_ref()
1534 } else {
1535 row_col_map_for_gen_owned.as_ref()
1536 };
1537
1538 let any_defaults = match cache {
1539 Some(c) => c.any_defaults,
1540 None => table_schema
1541 .columns
1542 .iter()
1543 .any(|c| c.default_expr.is_some()),
1544 };
1545 let defaults: Vec<(usize, &Expr)> = if any_defaults {
1546 table_schema
1547 .columns
1548 .iter()
1549 .filter(|c| {
1550 c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1551 })
1552 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1553 .collect()
1554 } else {
1555 Vec::new()
1556 };
1557
1558 let has_checks = match cache {
1559 Some(c) => c.has_checks,
1560 None => table_schema.has_checks(),
1561 };
1562 let check_col_map = if has_checks {
1563 Some(ColumnMap::new(&table_schema.columns))
1564 } else {
1565 None
1566 };
1567
1568 let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1569 &[usize],
1570 &[usize],
1571 &[u16],
1572 usize,
1573 &[u16],
1574 ) = if let Some(c) = cache {
1575 (
1576 &c.pk_indices,
1577 &c.non_pk_indices,
1578 &c.encoding_positions,
1579 c.phys_count,
1580 &c.dropped_non_pk_slots,
1581 )
1582 } else {
1583 (
1584 table_schema.pk_indices(),
1585 table_schema.non_pk_indices(),
1586 table_schema.encoding_positions(),
1587 table_schema.physical_non_pk_count(),
1588 table_schema.dropped_non_pk_slots(),
1589 )
1590 };
1591
1592 bufs.row.resize(table_schema.columns.len(), Value::Null);
1593 bufs.pk_values.resize(pk_indices.len(), Value::Null);
1594 bufs.value_values.resize(phys_count, Value::Null);
1595
1596 let table_bytes = table_schema.name.as_bytes();
1597 let has_fks = !table_schema.foreign_keys.is_empty();
1598 let has_indices = !table_schema.indices.is_empty();
1599 let has_defaults = !defaults.is_empty();
1600
1601 let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1602 (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1603 (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1604 (_, None) => None,
1605 };
1606
1607 let row_col_map_owned: Option<ColumnMap> =
1608 if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1609 Some(ColumnMap::new(&table_schema.columns))
1610 } else {
1611 None
1612 };
1613 let row_col_map: Option<&ColumnMap> = cache
1614 .and_then(|c| c.row_col_map.as_ref())
1615 .or(row_col_map_owned.as_ref());
1616
1617 let select_rows = match &stmt.source {
1618 InsertSource::Select(sq) => {
1619 let insert_ctes = super::materialize_all_ctes_with_outer(
1620 &sq.ctes,
1621 sq.recursive,
1622 outer_ctes,
1623 &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1624 )?;
1625 let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1626 Some(qr.rows)
1627 }
1628 InsertSource::Values(_) => None,
1629 };
1630
1631 let mut count: u64 = 0;
1632 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1633 stmt.returning.as_ref().map(|_| Vec::new());
1634
1635 let plain_insert = compiled_conflict.is_none();
1636 let single_int_pk = is_single_int_pk(table_schema);
1637 let mut min_inserted_pk: Option<i64> = None;
1638
1639 let values = match &stmt.source {
1640 InsertSource::Values(rows) => Some(rows.as_slice()),
1641 InsertSource::Select(_) => None,
1642 };
1643 let sel_rows = select_rows.as_deref();
1644
1645 let total = match (values, sel_rows) {
1646 (Some(rows), _) => rows.len(),
1647 (_, Some(rows)) => rows.len(),
1648 _ => 0,
1649 };
1650
1651 if let Some(sel) = sel_rows {
1652 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1653 return Err(SqlError::InvalidValue(format!(
1654 "INSERT ... SELECT column count mismatch: expected {}, got {}",
1655 insert_columns.len(),
1656 sel[0].len()
1657 )));
1658 }
1659 }
1660
1661 let has_insert_statement_triggers_impl =
1662 schema.triggers_for(&table_schema.name).iter().any(|t| {
1663 t.enabled
1664 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1665 && t.events
1666 .iter()
1667 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1668 });
1669 let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1670 Vec::with_capacity(total)
1671 } else {
1672 Vec::new()
1673 };
1674 if has_insert_statement_triggers_impl {
1675 super::triggers::fire_statement_triggers(
1676 wtx,
1677 schema,
1678 &table_schema.name,
1679 crate::parser::TriggerTiming::Before,
1680 super::triggers::FireEvent::Insert,
1681 &table_schema.columns,
1682 &[],
1683 &[],
1684 )?;
1685 }
1686
1687 let (has_before_insert_triggers, has_after_insert_triggers, has_after_update_triggers) =
1688 row_insert_trigger_flags(schema, &table_schema.name);
1689
1690 let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1691 for idx in 0..total {
1692 if !skip_row_clear {
1693 for v in bufs.row.iter_mut() {
1694 *v = Value::Null;
1695 }
1696 }
1697
1698 if let Some(value_rows) = values {
1699 if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1700 for action in plan {
1701 match action {
1702 BindAction::Param {
1703 param_idx,
1704 col_idx,
1705 target,
1706 } => {
1707 let v = ¶ms[*param_idx];
1708 bufs.row[*col_idx] = if v.is_null() {
1709 Value::Null
1710 } else if v.data_type() == *target {
1711 v.clone()
1712 } else {
1713 let got = v.data_type();
1714 v.clone().coerce_into(*target).ok_or_else(|| {
1715 SqlError::TypeMismatch {
1716 expected: target.to_string(),
1717 got: got.to_string(),
1718 }
1719 })?
1720 };
1721 }
1722 BindAction::Literal { value, col_idx } => {
1723 bufs.row[*col_idx] = value.clone();
1724 }
1725 }
1726 }
1727 } else {
1728 let value_row = &value_rows[idx];
1729 if value_row.len() != insert_columns.len() {
1730 return Err(SqlError::InvalidValue(format!(
1731 "expected {} values, got {}",
1732 insert_columns.len(),
1733 value_row.len()
1734 )));
1735 }
1736 for (i, expr) in value_row.iter().enumerate() {
1737 let val = match expr {
1738 Expr::Parameter(n) => params
1739 .get(n - 1)
1740 .cloned()
1741 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1742 Expr::Literal(v) => v.clone(),
1743 _ => eval_const_expr(expr)?,
1744 };
1745 let col_idx = bufs.col_indices[i];
1746 let col = &table_schema.columns[col_idx];
1747 let got_type = val.data_type();
1748 bufs.row[col_idx] = if val.is_null() {
1749 Value::Null
1750 } else {
1751 val.coerce_into(col.data_type)
1752 .ok_or_else(|| SqlError::TypeMismatch {
1753 expected: col.data_type.to_string(),
1754 got: got_type.to_string(),
1755 })?
1756 };
1757 }
1758 }
1759 } else if let Some(sel) = sel_rows {
1760 let sel_row = &sel[idx];
1761 for (i, val) in sel_row.iter().enumerate() {
1762 let col_idx = bufs.col_indices[i];
1763 let col = &table_schema.columns[col_idx];
1764 let got_type = val.data_type();
1765 bufs.row[col_idx] = if val.is_null() {
1766 Value::Null
1767 } else {
1768 val.clone().coerce_into(col.data_type).ok_or_else(|| {
1769 SqlError::TypeMismatch {
1770 expected: col.data_type.to_string(),
1771 got: got_type.to_string(),
1772 }
1773 })?
1774 };
1775 }
1776 }
1777
1778 if has_defaults {
1779 for &(pos, def_expr) in &defaults {
1780 let val = eval_const_expr(def_expr)?;
1781 let col = &table_schema.columns[pos];
1782 if !val.is_null() {
1783 let got_type = val.data_type();
1784 bufs.row[pos] =
1785 val.coerce_into(col.data_type)
1786 .ok_or_else(|| SqlError::TypeMismatch {
1787 expected: col.data_type.to_string(),
1788 got: got_type.to_string(),
1789 })?;
1790 }
1791 }
1792 }
1793
1794 if let Some(gen_map) = row_col_map_for_gen {
1795 if cache.is_some() {
1796 for (pos, fast) in cached_gen_positions
1797 .iter()
1798 .copied()
1799 .zip(cached_gen_fast_evals.iter())
1800 {
1801 let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1802 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1803 let col = &table_schema.columns[pos];
1804 bufs.row[pos] = if val.is_null() {
1805 Value::Null
1806 } else {
1807 let got_type = val.data_type();
1808 val.coerce_into(col.data_type)
1809 .ok_or_else(|| SqlError::TypeMismatch {
1810 expected: col.data_type.to_string(),
1811 got: got_type.to_string(),
1812 })?
1813 };
1814 }
1815 } else {
1816 for (pos, gen_expr, fast) in &generated_cols_uncached {
1817 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1818 let col = &table_schema.columns[*pos];
1819 bufs.row[*pos] = if val.is_null() {
1820 Value::Null
1821 } else {
1822 let got_type = val.data_type();
1823 val.coerce_into(col.data_type)
1824 .ok_or_else(|| SqlError::TypeMismatch {
1825 expected: col.data_type.to_string(),
1826 got: got_type.to_string(),
1827 })?
1828 };
1829 }
1830 }
1831 }
1832
1833 if let Some(c) = cache {
1834 for &pos in &c.not_null_indices {
1835 if bufs.row[pos as usize].is_null() {
1836 return Err(SqlError::NotNullViolation(
1837 table_schema.columns[pos as usize].name.clone(),
1838 ));
1839 }
1840 }
1841 } else {
1842 for col in &table_schema.columns {
1843 if !col.nullable && bufs.row[col.position as usize].is_null() {
1844 return Err(SqlError::NotNullViolation(col.name.clone()));
1845 }
1846 }
1847 }
1848
1849 if let Some(ref col_map) = check_col_map {
1850 for col in &table_schema.columns {
1851 if let Some(ref check) = col.check_expr {
1852 let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1853 if !is_truthy(&result) && !result.is_null() {
1854 let name = col.check_name.as_deref().unwrap_or(&col.name);
1855 return Err(SqlError::CheckViolation(name.to_string()));
1856 }
1857 }
1858 }
1859 for tc in &table_schema.check_constraints {
1860 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1861 if !is_truthy(&result) && !result.is_null() {
1862 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1863 return Err(SqlError::CheckViolation(name.to_string()));
1864 }
1865 }
1866 }
1867
1868 if has_fks {
1869 for fk in &table_schema.foreign_keys {
1870 let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1871 if any_null {
1872 continue;
1873 }
1874 crate::encoding::encode_composite_key_from_indices(
1875 &fk.columns,
1876 &bufs.row,
1877 &mut bufs.fk_key_buf,
1878 );
1879 if fk.deferrable && fk.initially_deferred {
1880 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1881 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1882 fk_name: name,
1883 foreign_table: fk.foreign_table.as_bytes().to_vec(),
1884 parent_key: bufs.fk_key_buf.clone(),
1885 });
1886 continue;
1887 }
1888 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1889 let found = wtx
1890 .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1891 .map_err(SqlError::Storage)?;
1892 if found.is_none() {
1893 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1894 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1895 }
1896 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1897 }
1898 }
1899 }
1900
1901 let proposed_row_for_returning: Option<Vec<Value>> =
1902 returning_rows.as_ref().map(|_| bufs.row.clone());
1903 let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1904 Some(bufs.row.clone())
1905 } else {
1906 None
1907 };
1908
1909 if has_before_insert_triggers {
1910 super::triggers::fire_row_triggers(
1911 wtx,
1912 schema,
1913 &table_schema.name,
1914 crate::parser::TriggerTiming::Before,
1915 super::triggers::FireEvent::Insert,
1916 None,
1917 Some(bufs.row.clone()),
1918 &table_schema.columns,
1919 )?;
1920 }
1921
1922 for (j, &i) in pk_indices.iter().enumerate() {
1923 bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1924 }
1925 match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1926 true => match bufs.pk_values[0] {
1927 Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1928 _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1929 },
1930 false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1931 }
1932 if plain_insert && single_int_pk {
1933 if let Value::Integer(id) = &bufs.pk_values[0] {
1934 min_inserted_pk = Some(min_inserted_pk.map_or(*id, |m| m.min(*id)));
1935 }
1936 }
1937
1938 for &slot in dropped {
1939 bufs.value_values[slot as usize] = Value::Null;
1940 }
1941 for (j, &i) in non_pk.iter().enumerate() {
1942 let col = &table_schema.columns[i];
1943 if matches!(
1944 col.generated_kind,
1945 Some(crate::parser::GeneratedKind::Virtual)
1946 ) {
1947 bufs.value_values[enc_pos[j] as usize] = Value::Null;
1948 bufs.row[i] = Value::Null;
1949 } else {
1950 bufs.value_values[enc_pos[j] as usize] =
1951 std::mem::replace(&mut bufs.row[i], Value::Null);
1952 }
1953 }
1954 match cache.and_then(|c| c.row_encoder.as_ref()) {
1955 Some(tmpl) => crate::encoding::encode_row_with_template(
1956 tmpl,
1957 &bufs.value_values,
1958 &mut bufs.value_buf,
1959 )?,
1960 None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1961 }
1962
1963 if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1964 return Err(SqlError::KeyTooLarge {
1965 size: bufs.key_buf.len(),
1966 max: citadel_core::MAX_KEY_SIZE,
1967 });
1968 }
1969 if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1970 return Err(SqlError::RowTooLarge {
1971 size: bufs.value_buf.len(),
1972 max: citadel_core::MAX_VALUE_SIZE,
1973 });
1974 }
1975
1976 match compiled_conflict.as_ref() {
1977 None => {
1978 let is_new = wtx
1979 .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1980 .map_err(SqlError::Storage)?;
1981 if !is_new {
1982 return Err(SqlError::DuplicateKey);
1983 }
1984 if has_indices || has_after_insert_triggers {
1985 for (j, &i) in pk_indices.iter().enumerate() {
1986 bufs.row[i] = bufs.pk_values[j].clone();
1987 }
1988 for (j, &i) in non_pk.iter().enumerate() {
1989 bufs.row[i] = std::mem::replace(
1990 &mut bufs.value_values[enc_pos[j] as usize],
1991 Value::Null,
1992 );
1993 }
1994 if has_indices {
1995 insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1996 }
1997 if has_after_insert_triggers {
1998 super::triggers::fire_row_triggers(
1999 wtx,
2000 schema,
2001 &table_schema.name,
2002 crate::parser::TriggerTiming::After,
2003 super::triggers::FireEvent::Insert,
2004 None,
2005 Some(bufs.row.clone()),
2006 &table_schema.columns,
2007 )?;
2008 }
2009 }
2010 if let Some(r) = row_for_stmt_trigger_impl.clone() {
2011 stmt_new_rows_impl.push(r);
2012 }
2013 count += 1;
2014 if let Some(buf) = returning_rows.as_mut() {
2015 buf.push((None, proposed_row_for_returning));
2016 }
2017 }
2018 Some(oc) => {
2019 let oc_ref: &CompiledOnConflict = oc;
2020 let needs_row = upsert_needs_row(oc_ref, table_schema);
2021 if needs_row {
2022 for (j, &i) in pk_indices.iter().enumerate() {
2023 bufs.row[i] = bufs.pk_values[j].clone();
2024 }
2025 for (j, &i) in non_pk.iter().enumerate() {
2026 bufs.row[i] = std::mem::replace(
2027 &mut bufs.value_values[enc_pos[j] as usize],
2028 Value::Null,
2029 );
2030 }
2031 }
2032 let outcome = apply_insert_with_conflict(
2033 wtx,
2034 table_schema,
2035 &bufs.key_buf,
2036 &bufs.value_buf,
2037 &bufs.row,
2038 &bufs.pk_values,
2039 oc_ref,
2040 row_col_map.unwrap(),
2041 stmt.returning.is_some(),
2042 )?;
2043 match outcome {
2044 InsertRowOutcome::Inserted => {
2045 count += 1;
2046 if let Some(buf) = returning_rows.as_mut() {
2047 buf.push((None, proposed_row_for_returning));
2048 }
2049 if let Some(r) = row_for_stmt_trigger_impl.clone() {
2050 stmt_new_rows_impl.push(r);
2051 }
2052 if has_after_insert_triggers {
2053 super::triggers::fire_row_triggers(
2054 wtx,
2055 schema,
2056 &table_schema.name,
2057 crate::parser::TriggerTiming::After,
2058 super::triggers::FireEvent::Insert,
2059 None,
2060 Some(bufs.row.clone()),
2061 &table_schema.columns,
2062 )?;
2063 }
2064 }
2065 InsertRowOutcome::Updated { old, new } => {
2066 count += 1;
2067 if let Some(buf) = returning_rows.as_mut() {
2068 buf.push((Some(old.clone()), Some(new.clone())));
2069 }
2070 if has_after_update_triggers {
2071 let changed_cols: Vec<String> = match oc_ref {
2072 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2073 .iter()
2074 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2075 .collect(),
2076 _ => Vec::new(),
2077 };
2078 super::triggers::fire_row_triggers(
2079 wtx,
2080 schema,
2081 &table_schema.name,
2082 crate::parser::TriggerTiming::After,
2083 super::triggers::FireEvent::Update {
2084 changed_columns: &changed_cols,
2085 },
2086 Some(old),
2087 Some(new),
2088 &table_schema.columns,
2089 )?;
2090 }
2091 }
2092 InsertRowOutcome::Skipped => {}
2093 }
2094 }
2095 }
2096 }
2097
2098 mark_insert_dml(
2099 schema,
2100 &table_schema.name,
2101 !plain_insert,
2102 single_int_pk,
2103 min_inserted_pk,
2104 count,
2105 );
2106
2107 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2108 if has_insert_statement_triggers_impl {
2109 super::triggers::fire_statement_triggers(
2110 wtx,
2111 schema,
2112 &table_schema.name,
2113 crate::parser::TriggerTiming::After,
2114 super::triggers::FireEvent::Insert,
2115 &table_schema.columns,
2116 &[],
2117 &stmt_new_rows_impl,
2118 )?;
2119 }
2120 return Ok(ExecutionResult::Query(super::helpers::project_returning(
2121 table_schema,
2122 returning_cols,
2123 &rows,
2124 )?));
2125 }
2126
2127 if has_insert_statement_triggers_impl {
2128 super::triggers::fire_statement_triggers(
2129 wtx,
2130 schema,
2131 &table_schema.name,
2132 crate::parser::TriggerTiming::After,
2133 super::triggers::FireEvent::Insert,
2134 &table_schema.columns,
2135 &[],
2136 &stmt_new_rows_impl,
2137 )?;
2138 }
2139
2140 Ok(ExecutionResult::RowsAffected(count))
2141}
2142
/// A prepared `INSERT` statement.
///
/// NOTE(review): construction and use are outside this part of the file; the
/// field notes are inferred from names and types — confirm against the
/// implementation.
pub struct CompiledInsert {
    // Presumably the lowercased target table name.
    table_lower: String,
    // Precomputed per-table insert metadata; `None` presumably means the
    // statement runs through the uncached path.
    cached: Option<InsertCache>,
}
2147
/// Precomputed metadata that lets `exec_insert_in_txn_impl` skip per-call
/// schema lookups.
///
/// Each field stands in for a value the uncached path derives from the
/// statement or the table schema on every execution.
struct InsertCache {
    // Table column index for each inserted column, in statement order.
    col_indices: Vec<usize>,
    // Whether the statement contains subqueries that must be materialized
    // (replaces `insert_has_subquery`).
    has_subquery: bool,
    // Whether any table column has a default expression.
    any_defaults: bool,
    // Whether the table has CHECK constraints (replaces `has_checks()`).
    has_checks: bool,
    // Compiled ON CONFLICT clause, reused when the statement has one.
    on_conflict: Option<Arc<CompiledOnConflict>>,
    // Column map used for generated-column evaluation and conflict handling.
    row_col_map: Option<ColumnMap>,
    // Positions of generated columns; paired element-wise with
    // `generated_fast_evals`.
    generated_col_positions: Vec<usize>,
    // Evaluator for each entry of `generated_col_positions`.
    generated_fast_evals: Vec<FastGenEval>,
    // Column positions that form the primary key.
    pk_indices: Vec<usize>,
    // Column positions that are not part of the primary key.
    non_pk_indices: Vec<usize>,
    // Encoding slot for each entry of `non_pk_indices`.
    encoding_positions: Vec<u16>,
    // Encoding slots that are reset to NULL for every row.
    dropped_non_pk_slots: Vec<u16>,
    // Number of slots in the encoded non-key payload.
    phys_count: usize,
    // When set, an integer primary key value is encoded with
    // `encode_int_key_into` instead of the composite encoder.
    single_int_pk: bool,
    // Column positions that must not be NULL.
    not_null_indices: Vec<u16>,
    // Precomputed binding steps for a VALUES row; when present they replace
    // per-expression evaluation.
    bind_plan: Option<Vec<BindAction>>,
    // When set, the row buffer is not reset to NULL before each row.
    row_fully_overwritten: bool,
    // Optional template passed to `encode_row_with_template`.
    row_encoder: Option<crate::encoding::RowTemplate>,
    // NOTE(review): not read in this part of the file; presumably marks
    // statements eligible for the `TrivialFastProgram` path — confirm.
    is_trivial_fast: bool,
    // NOTE(review): not read in this part of the file; presumably the program
    // produced by `build_trivial_fast_program` — confirm.
    trivial_fast_program: Option<TrivialFastProgram>,
    // NOTE(review): not read in this part of the file; meaning to be confirmed.
    needs_scoped_params: bool,
}
2171
/// One step of a cached bind plan: how a single column of the row buffer is
/// filled for a VALUES row.
#[derive(Clone)]
enum BindAction {
    /// Take a statement parameter and store it in a column, coercing it to
    /// `target` when its type differs (NULL is stored as-is).
    Param {
        // Zero-based index into the parameter slice.
        param_idx: usize,
        // Destination position in the row buffer.
        col_idx: usize,
        // Type the parameter is coerced to.
        target: DataType,
    },
    /// Store a fixed value in a column; it is cloned into the row unchanged.
    Literal {
        // Value to store.
        value: Value,
        // Destination position in the row buffer.
        col_idx: usize,
    },
}
2184
/// Precompiled program for a restricted class of inserts, produced by
/// `build_trivial_fast_program`.
///
/// NOTE(review): the code that executes the program is outside this part of
/// the file; notes marked "presumably" are inferred from how the builder fills
/// the fields — confirm against the executor.
#[derive(Clone)]
struct TrivialFastProgram {
    // Presumably the pre-encoded row bytes that `ops` patch per execution.
    template: Vec<u8>,
    // Write operations; each carries the byte offset it targets.
    ops: Vec<WriteOp>,
    // Index of the parameter bound to the primary-key column.
    pk_param: u8,
    // Parameters bound to non-nullable, non-key columns.
    not_null_param_indices: Vec<u8>,
    // Presumably one entry per foreign key to verify.
    fk_checks: Vec<FkCheckSpec>,
    // Presumably one entry per index that needs an entry written.
    index_inserts: Vec<IndexInsertSpec>,
    // Duplicate-key policy derived from the ON CONFLICT clause.
    on_dup: DupPolicy,
}
2195
/// Duplicate-key policy of a `TrivialFastProgram`, chosen by
/// `build_trivial_fast_program` from the compiled ON CONFLICT clause.
#[derive(Clone, Copy, PartialEq, Eq)]
enum DupPolicy {
    // Chosen when the statement has no ON CONFLICT clause.
    Error,
    // Chosen for `DO NOTHING` with no target or a primary-key target, on a
    // table without indices or foreign keys.
    Skip,
}
2202
/// Foreign-key check description carried by a `TrivialFastProgram`.
///
/// NOTE(review): not used in this part of the file; field notes are inferred
/// from names and types — confirm against the executor.
#[derive(Clone)]
struct FkCheckSpec {
    // Presumably the referenced table's name as bytes.
    foreign_table: Vec<u8>,
    // Presumably the parameter indices that supply the key columns, in key order.
    col_params: Vec<u8>,
}
2209
/// Index-entry description carried by a `TrivialFastProgram`.
///
/// NOTE(review): not used in this part of the file; field notes are inferred
/// from names and types — confirm against the executor.
#[derive(Clone)]
struct IndexInsertSpec {
    // Presumably the index's storage table name as bytes.
    table: Vec<u8>,
    // Presumably, per indexed column, the parameter that supplies it and the
    // collation applied when encoding it.
    key_params: Vec<(u8, crate::types::Collation)>,
}
2216
/// One write operation of a `TrivialFastProgram`.
///
/// Every variant carries `off`; the builder computes it from the row
/// template's slot offsets.
///
/// NOTE(review): the executor is outside this part of the file; notes marked
/// "presumably" are inferred from names — confirm against it.
#[derive(Clone)]
enum WriteOp {
    // Emitted for an integer parameter bound to a non-key column.
    ParamI64 {
        param_idx: u8,
        off: u32,
    },
    // Presumably writes a constant integer at `off`.
    LiteralI64 {
        value: i64,
        off: u32,
    },
    // Presumably writes the sum of two parameters for a generated column.
    // The bitmap fields come from the generated column's slot
    // (`2 + slot / 8` and `1 << (slot % 8)`); presumably its NULL-bitmap bit.
    GenAddParamsI64 {
        a_param: u8,
        b_param: u8,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
    // Presumably writes `param * mul + add` for a generated column; bitmap
    // fields as in `GenAddParamsI64`.
    GenMulAddParamI64 {
        param_idx: u8,
        mul: i64,
        add: i64,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
}
2243
/// Tries to compile an INSERT into a [`TrivialFastProgram`]: a pre-encoded row
/// template plus a list of [`WriteOp`]s that patch parameter-derived integers
/// into fixed byte offsets of that template.
///
/// Returns `None` as soon as anything falls outside what this builder
/// handles, so the caller can fall back to a general path. The conditions
/// visible here are:
/// - an `ON CONFLICT` clause other than a PK/untargeted `DO NOTHING` on a
///   table without indices and foreign keys,
/// - a bound parameter (or an unbound stored column) that is not `Integer`,
/// - a literal bound to the primary-key column, or no parameter bound to it,
/// - a generated column whose expression has no fast evaluator, or whose
///   inputs are neither parameters nor integer literals,
/// - a NOT NULL generated column that depends on a parameter which is not
///   itself bound to a NOT NULL non-PK column,
/// - an initially-deferred foreign key, or a foreign-key / index column that
///   is not bound to a parameter,
/// - a unique, expression-based or partial index,
/// - a parameter index or byte offset that does not fit the narrow integer
///   types stored in the program.
///
/// `non_virtual_pairs` maps a schema column index to its physical slot in the
/// encoded row; `generated_fast_evals[i]` describes how to compute the column
/// at `generated_col_positions[i]`.
fn build_trivial_fast_program(
    bind_plan: &[BindAction],
    phys_count: usize,
    non_virtual_pairs: &[(usize, usize)],
    generated_col_positions: &[usize],
    generated_fast_evals: &[FastGenEval],
    ts: &TableSchema,
    on_conflict: Option<&CompiledOnConflict>,
) -> Option<TrivialFastProgram> {
    let columns = &ts.columns;
    // NOTE(review): indexes `[0]` unconditionally; presumably callers only get
    // here for tables with a single-column primary key — confirm.
    let pk_col = ts.pk_indices()[0];

    let on_dup = match on_conflict {
        // No ON CONFLICT clause: a duplicate key is an error.
        None => DupPolicy::Error,
        // DO NOTHING aimed at the primary key (or at nothing in particular)
        // can simply skip duplicates, but only when there is no index or
        // foreign-key work attached to the row.
        Some(CompiledOnConflict::DoNothing { target })
            if matches!(target, None | Some(ConflictKind::PrimaryKey))
                && ts.indices.is_empty()
                && ts.foreign_keys.is_empty() =>
        {
            DupPolicy::Skip
        }
        _ => return None,
    };

    // Schema column index -> the bind action that supplies its value.
    let mut col_to_bind: rustc_hash::FxHashMap<usize, &BindAction> = Default::default();
    for action in bind_plan {
        let col = match action {
            BindAction::Param { col_idx, .. } | BindAction::Literal { col_idx, .. } => *col_idx,
        };
        col_to_bind.insert(col, action);
    }

    // Describe every physical slot for the template builder: constants are
    // baked in, integer parameters (and unbound integer columns) become holes
    // to be patched later, everything else starts out as NULL.
    let mut slots: Vec<crate::encoding::TemplateSlot> = (0..phys_count)
        .map(|_| crate::encoding::TemplateSlot::Null)
        .collect();
    for &(col, slot) in non_virtual_pairs {
        slots[slot] = match col_to_bind.get(&col) {
            Some(BindAction::Literal { value, .. }) => {
                crate::encoding::TemplateSlot::Const(value.clone())
            }
            Some(BindAction::Param { target, .. }) => {
                if *target != DataType::Integer {
                    return None;
                }
                crate::encoding::TemplateSlot::IntHole
            }
            None => {
                if columns[col].data_type != DataType::Integer {
                    return None;
                }
                crate::encoding::TemplateSlot::IntHole
            }
        };
    }
    let tmpl = crate::encoding::build_row_template(phys_count, &slots);
    let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
        non_virtual_pairs.iter().copied().collect();
    let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
        tmpl.slot_offsets.iter().copied().collect();

    let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
    let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
    let mut pk_param: Option<u8> = None;
    let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
    // Parameters bound to NOT NULL non-PK columns.
    let mut not_null_param_indices: Vec<u8> = Vec::new();

    for action in bind_plan {
        match action {
            BindAction::Param {
                param_idx,
                col_idx,
                target,
            } => {
                if *target != DataType::Integer {
                    return None;
                }
                // Parameter indices are stored as `u8`; larger ones bail out.
                let pi: u8 = u8::try_from(*param_idx).ok()?;
                col_to_param.insert(*col_idx, pi);
                if *col_idx == pk_col {
                    // The PK is not written into the value template, so it
                    // gets no WriteOp; it is only remembered here.
                    pk_param = Some(pi);
                } else {
                    let slot = *col_to_slot.get(col_idx)?;
                    let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
                    ops.push(WriteOp::ParamI64 { param_idx: pi, off });
                    if !columns[*col_idx].nullable {
                        not_null_param_indices.push(pi);
                    }
                }
            }
            BindAction::Literal { value, col_idx } => {
                // A literal primary key is not supported by this program.
                if *col_idx == pk_col {
                    return None;
                }
                // Integer literals are remembered so generated columns that
                // read them can be folded at build time.
                if let Value::Integer(v) = value {
                    col_to_lit_int.insert(*col_idx, *v);
                }
            }
        }
    }

    // The primary key must come from a parameter.
    let pk_param = pk_param?;

    for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
        let gen_slot = *col_to_slot.get(&gen_pos)?;
        let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
        // Byte and bit addressing this slot in the row's bitmap.
        // NOTE(review): the `2 +` presumably skips a 2-byte row header in
        // front of the bitmap — confirm against the row encoding.
        let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
        let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
        let gen_col_nullable = columns[gen_pos].nullable;

        match &generated_fast_evals[i] {
            FastGenEval::IntColAddCol {
                left_idx,
                right_idx,
            } => {
                let a_param = col_to_param.get(left_idx).copied();
                let b_param = col_to_param.get(right_idx).copied();
                match (a_param, b_param) {
                    // param + param
                    (Some(ap), Some(bp)) => {
                        // A NOT NULL generated column is only safe when both
                        // inputs are parameters of NOT NULL columns.
                        let deps_safe = gen_col_nullable
                            || (not_null_param_indices.contains(&ap)
                                && not_null_param_indices.contains(&bp));
                        if !deps_safe {
                            return None;
                        }
                        ops.push(WriteOp::GenAddParamsI64 {
                            a_param: ap,
                            b_param: bp,
                            off: gen_off,
                            bitmap_byte_off,
                            bitmap_bit_mask,
                        });
                    }
                    // param + integer literal, expressed as `param * 1 + lit`
                    (Some(p), None) => {
                        let lit = col_to_lit_int.get(right_idx).copied()?;
                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
                            return None;
                        }
                        ops.push(WriteOp::GenMulAddParamI64 {
                            param_idx: p,
                            mul: 1,
                            add: lit,
                            off: gen_off,
                            bitmap_byte_off,
                            bitmap_bit_mask,
                        });
                    }
                    // integer literal + param, same encoding as above
                    (None, Some(p)) => {
                        let lit = col_to_lit_int.get(left_idx).copied()?;
                        if !gen_col_nullable && !not_null_param_indices.contains(&p) {
                            return None;
                        }
                        ops.push(WriteOp::GenMulAddParamI64 {
                            param_idx: p,
                            mul: 1,
                            add: lit,
                            off: gen_off,
                            bitmap_byte_off,
                            bitmap_bit_mask,
                        });
                    }
                    // literal + literal: fold now (wrapping on overflow)
                    (None, None) => {
                        let la = col_to_lit_int.get(left_idx).copied()?;
                        let lb = col_to_lit_int.get(right_idx).copied()?;
                        ops.push(WriteOp::LiteralI64 {
                            value: la.wrapping_add(lb),
                            off: gen_off,
                        });
                    }
                }
            }
            FastGenEval::IntColMulAdd {
                col_schema_idx,
                mul,
                add,
            } => {
                if let Some(p) = col_to_param.get(col_schema_idx).copied() {
                    if !gen_col_nullable && !not_null_param_indices.contains(&p) {
                        return None;
                    }
                    ops.push(WriteOp::GenMulAddParamI64 {
                        param_idx: p,
                        mul: *mul,
                        add: *add,
                        off: gen_off,
                        bitmap_byte_off,
                        bitmap_bit_mask,
                    });
                } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
                    // Source column is an integer literal: fold at build time.
                    ops.push(WriteOp::LiteralI64 {
                        value: lit.wrapping_mul(*mul).wrapping_add(*add),
                        off: gen_off,
                    });
                } else {
                    return None;
                }
            }
            FastGenEval::None => return None,
        }
    }

    // Foreign keys: every referencing column must come from a parameter.
    let mut fk_checks: Vec<FkCheckSpec> = Vec::with_capacity(ts.foreign_keys.len());
    for fk in &ts.foreign_keys {
        if fk.deferrable && fk.initially_deferred {
            return None;
        }
        let mut col_params = Vec::with_capacity(fk.columns.len());
        for &c in &fk.columns {
            col_params.push(col_to_param.get(&(c as usize)).copied()?);
        }
        fk_checks.push(FkCheckSpec {
            foreign_table: fk.foreign_table.as_bytes().to_vec(),
            col_params,
        });
    }

    // Secondary indices: only plain, non-unique, non-partial column indices
    // whose key columns are all parameters are supported.
    let mut index_inserts: Vec<IndexInsertSpec> = Vec::with_capacity(ts.indices.len());
    for idx in &ts.indices {
        if idx.unique
            || !idx.is_pure_column_index()
            || idx.predicate_expr.is_some()
            || idx.predicate_sql.is_some()
        {
            return None;
        }
        let mut key_params = Vec::with_capacity(idx.keys.len());
        for (i, key) in idx.keys.iter().enumerate() {
            let crate::types::IndexKey::Column { idx: col_idx, .. } = key else {
                return None;
            };
            key_params.push((
                col_to_param.get(&(*col_idx as usize)).copied()?,
                idx.collation_at(i),
            ));
        }
        index_inserts.push(IndexInsertSpec {
            table: TableSchema::index_table_name(&ts.name, &idx.name),
            key_params,
        });
    }

    Some(TrivialFastProgram {
        template: tmpl.template,
        ops,
        pk_param,
        not_null_param_indices,
        fk_checks,
        index_inserts,
        on_dup,
    })
}
2497
/// An `ON CONFLICT` clause after its target and assignment columns have been
/// resolved against a table schema (see `compile_on_conflict`).
#[derive(Clone)]
pub(super) enum CompiledOnConflict {
    /// `ON CONFLICT ... DO NOTHING`.
    DoNothing {
        /// The resolved conflict target, or `None` when the clause named no
        /// target.
        target: Option<ConflictKind>,
    },
    /// `ON CONFLICT (...) DO UPDATE SET ... [WHERE ...]`.
    DoUpdate {
        /// The constraint whose violation triggers the update.
        target: ConflictKind,
        /// `(schema column index, value expression)` pairs from the `SET` list.
        assignments: Vec<(usize, Expr)>,
        /// Optional `WHERE` filter; when it evaluates to NULL or a non-truthy
        /// value, the conflicting row is left untouched.
        where_clause: Option<Expr>,
        /// In-place patches equivalent to `assignments`, when every
        /// assignment was recognised by `detect_fast_paths`. Only computed
        /// for clauses without a `WHERE`.
        fast_paths: Option<Vec<DoUpdateFastPath>>,
    },
}
2510
/// A `DO UPDATE` assignment that can be applied by patching the encoded row
/// bytes directly instead of evaluating an expression.
#[derive(Clone, Copy)]
pub(super) enum DoUpdateFastPath {
    /// Adds `delta` (wrapping on overflow) to the integer stored at physical
    /// slot `phys_idx`; a NULL value stays NULL.
    IntAddConst { phys_idx: usize, delta: i64 },
}
2515
/// The constraint an `ON CONFLICT` target resolved to.
#[derive(Clone, Debug)]
pub(super) enum ConflictKind {
    /// The table's primary key.
    PrimaryKey,
    /// A unique index; `index_idx` is its position in `TableSchema::indices`.
    UniqueIndex { index_idx: usize },
}
2521
2522fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2523 match target {
2524 ConflictTarget::Columns(cols) => {
2525 let col_idx_set: Vec<u16> = cols
2526 .iter()
2527 .map(|name| {
2528 ts.column_index(name)
2529 .map(|i| i as u16)
2530 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2531 })
2532 .collect::<Result<_>>()?;
2533 let pk_set = ts.primary_key_columns.clone();
2534 if set_equal(&col_idx_set, &pk_set) {
2535 return Ok(ConflictKind::PrimaryKey);
2536 }
2537 for (index_idx, idx) in ts.indices.iter().enumerate() {
2538 if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2539 return Ok(ConflictKind::UniqueIndex { index_idx });
2540 }
2541 }
2542 Err(SqlError::Plan(
2543 "ON CONFLICT target does not match any unique constraint".into(),
2544 ))
2545 }
2546 ConflictTarget::Constraint(name) => {
2547 let lower = name.to_ascii_lowercase();
2548 for (index_idx, idx) in ts.indices.iter().enumerate() {
2549 if idx.name.eq_ignore_ascii_case(&lower) {
2550 if idx.unique {
2551 return Ok(ConflictKind::UniqueIndex { index_idx });
2552 }
2553 return Err(SqlError::Plan(format!(
2554 "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2555 )));
2556 }
2557 }
2558 Err(SqlError::Plan(format!(
2559 "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2560 )))
2561 }
2562 }
2563}
2564
/// Returns `true` when `a` and `b` hold the same column indices regardless of
/// order. Duplicates count: each value must occur equally often on both
/// sides.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    // Sorted copy of a column list, so two lists can be compared positionally.
    let sorted = |ids: &[u16]| -> Vec<u16> {
        let mut copy = ids.to_vec();
        copy.sort_unstable();
        copy
    };
    // The length check short-circuits before any copy is made.
    a.len() == b.len() && sorted(a) == sorted(b)
}
2575
/// What happened to a single row passed through the INSERT / ON CONFLICT
/// machinery.
pub(super) enum InsertRowOutcome {
    /// The row was written. The `DO UPDATE` helpers in this file also return
    /// this variant for a successful update when the caller did not ask for
    /// the rows to be captured.
    Inserted,
    /// An existing row was rewritten by `DO UPDATE`; carries the full row
    /// before (`old`) and after (`new`) the update.
    Updated { old: Vec<Value>, new: Vec<Value> },
    /// Nothing was written (`DO NOTHING`, or a `DO UPDATE ... WHERE` that did
    /// not match).
    Skipped,
}
2581
/// Inserts one already-encoded row under an `ON CONFLICT` clause.
///
/// `key_buf` / `value_buf` are the encoded primary key and row value; `row`
/// and `pk_values` are the same data in decoded form. `capture_returning`
/// asks for the old/new rows of an update to be handed back in
/// [`InsertRowOutcome::Updated`].
///
/// Strategy, in order:
/// 1. `DO NOTHING` on the PK (or untargeted) for a table without indices and
///    foreign keys: a single insert-if-absent.
/// 2. `DO UPDATE` on the PK when `can_fuse_do_update` allows it: a single
///    fused upsert.
/// 3. Otherwise: insert-or-fetch the primary row, then maintain the indices,
///    undoing the partial insert if one of them reports a conflict.
///
/// # Errors
///
/// * `SqlError::DuplicateKey` when the primary key exists but the clause does
///   not target it.
/// * `SqlError::UniqueViolation` when an index conflicts but the clause does
///   not target that index.
/// * `SqlError::Storage` for storage failures, plus whatever the `DO UPDATE`
///   helpers return.
#[allow(clippy::too_many_arguments)]
#[inline]
pub(super) fn apply_insert_with_conflict(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    key_buf: &[u8],
    value_buf: &[u8],
    row: &[Value],
    pk_values: &[Value],
    on_conflict: &CompiledOnConflict,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    let table_bytes = table_schema.name.as_bytes();

    // Fast path 1: PK-only DO NOTHING with no secondary structures to keep
    // in sync.
    if let CompiledOnConflict::DoNothing { target } = on_conflict {
        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
            let inserted = wtx
                .table_insert_if_absent(table_bytes, key_buf, value_buf)
                .map_err(SqlError::Storage)?;
            return Ok(if inserted {
                InsertRowOutcome::Inserted
            } else {
                InsertRowOutcome::Skipped
            });
        }
    }

    // Fast path 2: PK-targeted DO UPDATE that can be done as one upsert.
    if let CompiledOnConflict::DoUpdate {
        target: ConflictKind::PrimaryKey,
        assignments,
        where_clause,
        fast_paths,
    } = on_conflict
    {
        if can_fuse_do_update(table_schema, assignments) {
            return apply_do_update_fused(
                wtx,
                table_schema,
                table_bytes,
                key_buf,
                value_buf,
                row,
                assignments,
                where_clause.as_ref(),
                col_map,
                fast_paths.as_deref(),
                capture_returning,
            );
        }
    }

    // General path: write the primary row, or get back the existing bytes.
    let primary_outcome = wtx
        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
        .map_err(SqlError::Storage)?;

    match primary_outcome {
        citadel_txn::write_txn::InsertOutcome::Inserted => {
            if table_schema.indices.is_empty() {
                return Ok(InsertRowOutcome::Inserted);
            }
            // Index entries written so far, kept so they can be rolled back.
            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
            match insert_index_entries_or_fetch(
                wtx,
                table_schema,
                row,
                pk_values,
                &mut inserted_keys,
            )? {
                None => Ok(InsertRowOutcome::Inserted),
                Some(conflicting_idx) => {
                    // The clause covers this conflict if it is an untargeted
                    // DO NOTHING, or if it names exactly the index that
                    // conflicted.
                    let matches_target =
                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
                            || matches!(
                                on_conflict,
                                CompiledOnConflict::DoNothing {
                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
                                } | CompiledOnConflict::DoUpdate {
                                    target: ConflictKind::UniqueIndex { index_idx },
                                    ..
                                } if *index_idx == conflicting_idx
                            );
                    // The primary row and some index entries were already
                    // written; remove them before reporting or updating.
                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
                    if !matches_target {
                        return Err(SqlError::UniqueViolation(
                            table_schema.indices[conflicting_idx].name.clone(),
                        ));
                    }
                    match on_conflict {
                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
                        CompiledOnConflict::DoUpdate {
                            assignments,
                            where_clause,
                            ..
                        } => {
                            // Find the row that owns the conflicting index
                            // entry and update that one.
                            let existing_pk =
                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
                            apply_do_update(
                                wtx,
                                table_schema,
                                &existing_pk,
                                row,
                                assignments,
                                where_clause.as_ref(),
                                col_map,
                                capture_returning,
                            )
                        }
                    }
                }
            }
        }
        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
            // A primary-key collision is only covered by an untargeted DO
            // NOTHING or by a clause that targets the primary key.
            let matches_target = matches!(
                on_conflict,
                CompiledOnConflict::DoNothing { target: None }
                    | CompiledOnConflict::DoNothing {
                        target: Some(ConflictKind::PrimaryKey),
                    }
                    | CompiledOnConflict::DoUpdate {
                        target: ConflictKind::PrimaryKey,
                        ..
                    }
            );
            if !matches_target {
                return Err(SqlError::DuplicateKey);
            }
            match on_conflict {
                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
                CompiledOnConflict::DoUpdate {
                    assignments,
                    where_clause,
                    ..
                } => {
                    // The existing bytes came back with the outcome, so no
                    // second read is needed.
                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
                    apply_do_update_with_old_row(
                        wtx,
                        table_schema,
                        key_buf,
                        &old_row,
                        row,
                        assignments,
                        where_clause.as_ref(),
                        col_map,
                        capture_returning,
                    )
                }
            }
        }
    }
}
2734
2735#[inline]
2736fn apply_fast_path_patch(
2737 old_bytes: &[u8],
2738 fast_paths: &[DoUpdateFastPath],
2739) -> Result<UpsertAction> {
2740 UPSERT_SCRATCH.with(|slot| {
2741 let mut bufs = slot.borrow_mut();
2742 bufs.new_value_buf.clear();
2743 bufs.new_value_buf.extend_from_slice(old_bytes);
2744
2745 let mut patch_scratch: Vec<u8> = Vec::new();
2746
2747 for fp in fast_paths {
2748 match fp {
2749 DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2750 let decoded =
2751 crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2752 let old_val = &decoded[0];
2753 let new_val = match old_val {
2754 Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2755 Value::Null => Value::Null,
2756 _ => {
2757 return Err(SqlError::TypeMismatch {
2758 expected: "INTEGER".into(),
2759 got: old_val.data_type().to_string(),
2760 });
2761 }
2762 };
2763 if !crate::encoding::patch_column_in_place(
2764 &mut bufs.new_value_buf,
2765 *phys_idx,
2766 &new_val,
2767 )? {
2768 patch_scratch.clear();
2769 crate::encoding::patch_row_column(
2770 &bufs.new_value_buf,
2771 *phys_idx,
2772 &new_val,
2773 &mut patch_scratch,
2774 )?;
2775 std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2776 }
2777 }
2778 }
2779 }
2780
2781 if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2782 return Err(SqlError::RowTooLarge {
2783 size: bufs.new_value_buf.len(),
2784 max: citadel_core::MAX_VALUE_SIZE,
2785 });
2786 }
2787
2788 Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2789 })
2790}
2791
2792fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2793 if !ts.indices.is_empty() {
2794 return true;
2795 }
2796 match oc {
2797 CompiledOnConflict::DoNothing { .. } => false,
2798 CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2799 }
2800}
2801
2802fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2803 if !ts.indices.is_empty() {
2804 return false;
2805 }
2806 if !ts.foreign_keys.is_empty() {
2807 return false;
2808 }
2809 if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2810 return false;
2811 }
2812 let pk = ts.pk_indices();
2813 !assignments.iter().any(|(ci, _)| pk.contains(ci))
2814}
2815
/// Runs a primary-key `INSERT ... ON CONFLICT DO UPDATE` as one
/// `table_upsert_with` call: the proposed row is inserted when the key is
/// new, otherwise the closure below computes the replacement value from the
/// existing bytes.
///
/// The only call site in this file guards this with `can_fuse_do_update`, so
/// the body does no index, foreign-key or generated-column maintenance.
///
/// * `value_buf` — encoded proposed row, used for the insert case.
/// * `proposed_row` — decoded proposed row, made available to expressions
///   through `EvalCtx::with_excluded`.
/// * `fast_paths` — byte-level patches, used only when the table has no
///   CHECK constraints.
/// * `capture_returning` — when set, the old and new rows of an update are
///   returned in `InsertRowOutcome::Updated`.
///
/// # Errors
///
/// `TypeMismatch`, `NotNullViolation`, `CheckViolation` and `RowTooLarge`
/// from validating the updated row, any expression-evaluation or decoding
/// error, and `InvalidValue` if an update happened but no rows were captured
/// although `capture_returning` was set.
#[allow(clippy::too_many_arguments)]
#[inline]
fn apply_do_update_fused(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    table_bytes: &[u8],
    key_buf: &[u8],
    value_buf: &[u8],
    proposed_row: &[Value],
    assignments: &[(usize, Expr)],
    where_clause: Option<&Expr>,
    col_map: &ColumnMap,
    fast_paths: Option<&[DoUpdateFastPath]>,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    let non_pk = table_schema.non_pk_indices();
    let enc_pos = table_schema.encoding_positions();
    let phys_count = table_schema.physical_non_pk_count();
    let dropped = table_schema.dropped_non_pk_slots();
    let has_checks = table_schema.has_checks();
    let has_fks = !table_schema.foreign_keys.is_empty();

    // The closure hands the old/new rows back out through this cell; it is
    // read again after the upsert returns.
    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
        std::cell::RefCell::new(None);

    let outcome =
        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
            // Byte-patching skips expression evaluation entirely, so it is
            // only used when there are no CHECK constraints to re-validate.
            if let Some(fps) = fast_paths {
                if !has_checks {
                    let action = apply_fast_path_patch(old_bytes, fps)?;
                    if capture_returning {
                        if let UpsertAction::Replace(ref new_bytes) = action {
                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
                            *captured.borrow_mut() = Some((old_row, new_row));
                        }
                    }
                    return Ok(action);
                }
            }
            // General path: decode, evaluate, validate and re-encode using
            // the thread-local scratch buffers.
            UPSERT_SCRATCH.with(|slot| {
                let mut bufs = slot.borrow_mut();
                let UpsertBufs {
                    old_row,
                    new_row,
                    value_values,
                    new_value_buf,
                } = &mut *bufs;

                old_row.clear();
                old_row.resize(table_schema.columns.len(), Value::Null);
                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;

                // DO UPDATE ... WHERE: a NULL or non-truthy result leaves the
                // existing row untouched.
                if let Some(w) = where_clause {
                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
                    let result = eval_expr(w, &ctx)?;
                    if result.is_null() || !is_truthy(&result) {
                        return Ok(UpsertAction::Skip);
                    }
                }

                // Every assignment is evaluated against the ORIGINAL row, not
                // against the partially updated one.
                new_row.clear();
                new_row.extend_from_slice(old_row);
                for (col_idx, expr) in assignments {
                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
                    let val = eval_expr(expr, &ctx)?;
                    let col = &table_schema.columns[*col_idx];
                    new_row[*col_idx] = if val.is_null() {
                        Value::Null
                    } else {
                        let got = val.data_type();
                        val.coerce_into(col.data_type)
                            .ok_or_else(|| SqlError::TypeMismatch {
                                expected: col.data_type.to_string(),
                                got: got.to_string(),
                            })?
                    };
                }

                // NOT NULL is only re-checked for the assigned columns.
                for (assigned_idx, _) in assignments {
                    let col = &table_schema.columns[*assigned_idx];
                    if !col.nullable && new_row[col.position as usize].is_null() {
                        return Err(SqlError::NotNullViolation(col.name.clone()));
                    }
                }
                // A CHECK fails only on a definite false; a NULL result
                // passes.
                if has_checks {
                    for col in &table_schema.columns {
                        if let Some(ref check) = col.check_expr {
                            let ctx = EvalCtx::new(col_map, new_row);
                            let result = eval_expr(check, &ctx)?;
                            if !is_truthy(&result) && !result.is_null() {
                                let name = col.check_name.as_deref().unwrap_or(&col.name);
                                return Err(SqlError::CheckViolation(name.to_string()));
                            }
                        }
                    }
                    for tc in &table_schema.check_constraints {
                        let ctx = EvalCtx::new(col_map, new_row);
                        let result = eval_expr(&tc.expr, &ctx)?;
                        if !is_truthy(&result) && !result.is_null() {
                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
                            return Err(SqlError::CheckViolation(name.to_string()));
                        }
                    }
                }
                // `has_fks` is computed above but deliberately unused: no
                // foreign-key validation happens in this fused path.
                let _ = has_fks;

                // Lay the non-PK columns out in their physical encoding
                // slots; dropped slots are written as NULL.
                value_values.clear();
                value_values.resize(phys_count, Value::Null);
                for &slot in dropped {
                    value_values[slot as usize] = Value::Null;
                }
                for (j, &i) in non_pk.iter().enumerate() {
                    value_values[enc_pos[j] as usize] = new_row[i].clone();
                }
                new_value_buf.clear();
                crate::encoding::encode_row_into(value_values, new_value_buf);

                if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
                    return Err(SqlError::RowTooLarge {
                        size: new_value_buf.len(),
                        max: citadel_core::MAX_VALUE_SIZE,
                    });
                }

                if capture_returning {
                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
                }
                // The scratch buffer stays with the thread, so the action
                // gets its own copy of the bytes.
                Ok(UpsertAction::Replace(new_value_buf.clone()))
            })
        })?;

    match outcome {
        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
        UpsertOutcome::Updated => {
            if capture_returning {
                let (old, new) = captured.into_inner().ok_or_else(|| {
                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
                })?;
                Ok(InsertRowOutcome::Updated { old, new })
            } else {
                // Without captured rows an update is reported as `Inserted`.
                Ok(InsertRowOutcome::Inserted)
            }
        }
        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
    }
}
2963
2964fn fetch_unique_index_pk(
2965 wtx: &mut WriteTxn<'_>,
2966 table_schema: &TableSchema,
2967 index_idx: usize,
2968 row: &[Value],
2969) -> Result<Vec<u8>> {
2970 let idx = &table_schema.indices[index_idx];
2971 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2972 let indexed: Vec<Value> = idx
2973 .column_positions_iter()
2974 .map(|col_idx| row[col_idx as usize].clone())
2975 .collect();
2976 let key = crate::encoding::encode_composite_key(&indexed);
2977 let value = wtx
2978 .table_get(&idx_table, &key)
2979 .map_err(SqlError::Storage)?
2980 .ok_or_else(|| {
2981 SqlError::InvalidValue("unique index missing expected collision entry".into())
2982 })?;
2983 Ok(value)
2984}
2985
2986#[allow(clippy::too_many_arguments)]
2987fn apply_do_update(
2988 wtx: &mut WriteTxn<'_>,
2989 table_schema: &TableSchema,
2990 pk_key: &[u8],
2991 proposed_row: &[Value],
2992 assignments: &[(usize, Expr)],
2993 where_clause: Option<&Expr>,
2994 col_map: &ColumnMap,
2995 capture_returning: bool,
2996) -> Result<InsertRowOutcome> {
2997 let old_value = wtx
2998 .table_get(table_schema.name.as_bytes(), pk_key)
2999 .map_err(SqlError::Storage)?
3000 .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
3001 let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
3002 apply_do_update_with_old_row(
3003 wtx,
3004 table_schema,
3005 pk_key,
3006 &old_row,
3007 proposed_row,
3008 assignments,
3009 where_clause,
3010 col_map,
3011 capture_returning,
3012 )
3013}
3014
/// General `DO UPDATE` implementation for a conflicting row whose current
/// contents (`old_row`, stored under `old_pk_key`) are already decoded.
///
/// Steps, in order:
/// 1. Evaluate the optional `WHERE`; a NULL or non-truthy result skips the row.
/// 2. Evaluate every assignment against `old_row` (with `proposed_row`
///    available through `EvalCtx::with_excluded`) and coerce to the column type.
/// 3. Recompute stored generated columns from the updated row.
/// 4. Validate NOT NULL (assigned columns only), CHECK constraints and any
///    foreign key whose columns changed (deferring initially-deferred ones).
/// 5. Re-encode the row and write it — as insert + delete when the primary
///    key changed, otherwise as an in-place update — and maintain the indices.
///
/// Returns `Skipped` when the `WHERE` rejects the row, `Updated { old, new }`
/// when `capture_returning` is set, and `Inserted` otherwise.
///
/// # Errors
///
/// `TypeMismatch`, `NotNullViolation`, `CheckViolation`,
/// `ForeignKeyViolation`, `RowTooLarge`, `DuplicateKey` (new primary key
/// already exists), `UniqueViolation`, `Storage`, and any expression
/// evaluation error.
#[allow(clippy::too_many_arguments)]
fn apply_do_update_with_old_row(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    old_pk_key: &[u8],
    old_row: &[Value],
    proposed_row: &[Value],
    assignments: &[(usize, Expr)],
    where_clause: Option<&Expr>,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    if let Some(w) = where_clause {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let result = eval_expr(w, &ctx)?;
        if result.is_null() || !is_truthy(&result) {
            return Ok(InsertRowOutcome::Skipped);
        }
    }

    // Assignments all read the original row, so their order does not matter.
    let mut new_row = old_row.to_vec();
    for (col_idx, expr) in assignments {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let val = eval_expr(expr, &ctx)?;
        let col = &table_schema.columns[*col_idx];
        new_row[*col_idx] = if val.is_null() {
            Value::Null
        } else {
            let got = val.data_type();
            val.coerce_into(col.data_type)
                .ok_or_else(|| SqlError::TypeMismatch {
                    expected: col.data_type.to_string(),
                    got: got.to_string(),
                })?
        };
    }

    // Stored generated columns are recomputed from the updated row, in
    // schema order.
    for col in &table_schema.columns {
        if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Stored)
        ) {
            let val = eval_expr(
                col.generated_expr.as_ref().unwrap(),
                &EvalCtx::new(col_map, &new_row),
            )?;
            let pos = col.position as usize;
            new_row[pos] = if val.is_null() {
                if !col.nullable {
                    return Err(SqlError::NotNullViolation(col.name.clone()));
                }
                Value::Null
            } else {
                let got = val.data_type();
                val.coerce_into(col.data_type)
                    .ok_or_else(|| SqlError::TypeMismatch {
                        expected: col.data_type.to_string(),
                        got: got.to_string(),
                    })?
            };
        }
    }

    // The primary key only counts as changed when it was assigned AND the
    // assignment produced a different value.
    let pk_indices = table_schema.pk_indices();
    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);

    // NOT NULL is only re-checked for the assigned columns.
    for (assigned_idx, _) in assignments {
        let col = &table_schema.columns[*assigned_idx];
        if !col.nullable && new_row[col.position as usize].is_null() {
            return Err(SqlError::NotNullViolation(col.name.clone()));
        }
    }
    // A CHECK fails only on a definite false; a NULL result passes.
    if table_schema.has_checks() {
        for col in &table_schema.columns {
            if let Some(ref check) = col.check_expr {
                let ctx = EvalCtx::new(col_map, &new_row);
                let result = eval_expr(check, &ctx)?;
                if !is_truthy(&result) && !result.is_null() {
                    let name = col.check_name.as_deref().unwrap_or(&col.name);
                    return Err(SqlError::CheckViolation(name.to_string()));
                }
            }
        }
        for tc in &table_schema.check_constraints {
            let ctx = EvalCtx::new(col_map, &new_row);
            let result = eval_expr(&tc.expr, &ctx)?;
            if !is_truthy(&result) && !result.is_null() {
                let name = tc.name.as_deref().unwrap_or(&tc.sql);
                return Err(SqlError::CheckViolation(name.to_string()));
            }
        }
    }
    // Foreign keys: only re-validate those whose columns changed, and skip
    // any key that contains a NULL.
    for fk in &table_schema.foreign_keys {
        let changed = fk
            .columns
            .iter()
            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
        if !changed {
            continue;
        }
        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
        if any_null {
            continue;
        }
        let fk_vals: Vec<Value> = fk
            .columns
            .iter()
            .map(|&ci| new_row[ci as usize].clone())
            .collect();
        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
        // Initially-deferred constraints are queued on the transaction
        // instead of being checked now.
        if fk.deferrable && fk.initially_deferred {
            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
            wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
                fk_name: name,
                foreign_table: fk.foreign_table.as_bytes().to_vec(),
                parent_key: fk_key,
            });
            continue;
        }
        // Parent keys already verified in this transaction are not looked up
        // again.
        if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
            let found = wtx
                .table_get(fk.foreign_table.as_bytes(), &fk_key)
                .map_err(SqlError::Storage)?;
            if found.is_none() {
                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
                return Err(SqlError::ForeignKeyViolation(name.to_string()));
            }
            wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
        }
    }

    // Decoded PK values are only needed for index keys or for re-keying the
    // row; otherwise they stay empty.
    let has_indices = !table_schema.indices.is_empty();
    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
    } else {
        Vec::new()
    };
    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
    } else {
        Vec::new()
    };

    // Lay the non-PK columns out in their physical encoding slots. Dropped
    // slots and virtual generated columns are written as NULL.
    let non_pk = table_schema.non_pk_indices();
    let enc_pos = table_schema.encoding_positions();
    let phys_count = table_schema.physical_non_pk_count();
    let dropped = table_schema.dropped_non_pk_slots();
    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
    for &slot in dropped {
        value_values[slot as usize] = Value::Null;
    }
    for (j, &i) in non_pk.iter().enumerate() {
        let col = &table_schema.columns[i];
        value_values[enc_pos[j] as usize] = if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Virtual)
        ) {
            Value::Null
        } else {
            new_row[i].clone()
        };
    }
    let mut new_value_buf = Vec::with_capacity(256);
    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);

    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
        return Err(SqlError::RowTooLarge {
            size: new_value_buf.len(),
            max: citadel_core::MAX_VALUE_SIZE,
        });
    }

    if pk_changed {
        // Re-key the row: insert under the new key first so a collision is
        // reported before the old row is removed.
        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
        let inserted = wtx
            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
            .map_err(SqlError::Storage)?;
        if !inserted {
            return Err(SqlError::DuplicateKey);
        }
        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
            .map_err(SqlError::Storage)?;
        // Every index entry is rewritten unconditionally in this branch.
        // NOTE(review): unlike the branch below, this one does not consult
        // `partial_idx_update_actions`; confirm partial indices are handled
        // correctly when the primary key changes.
        for idx in &table_schema.indices {
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            let old_idx_key =
                encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
            wtx.table_delete(&idx_table, &old_idx_key)
                .map_err(SqlError::Storage)?;
            let new_idx_key =
                encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
            let is_new = wtx
                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                .map_err(SqlError::Storage)?;
            // An existing entry in a unique index is a violation unless one
            // of the indexed columns is NULL.
            if idx.unique && !is_new {
                let any_null = idx
                    .column_positions_iter()
                    .any(|c| new_row[c as usize].is_null());
                if !any_null {
                    return Err(SqlError::UniqueViolation(idx.name.clone()));
                }
            }
        }
    } else {
        // Same key: overwrite the value in place.
        wtx.table_update_sorted(
            table_schema.name.as_bytes(),
            &[(old_pk_key, new_value_buf.as_slice())],
        )
        .map_err(SqlError::Storage)?;
        // The column map is only built when some index is partial.
        let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
        for idx in &table_schema.indices {
            // The helper decides per index whether the old entry has to be
            // deleted and/or a new one inserted.
            let cols_changed = index_columns_changed(idx, old_row, &new_row);
            let (del, ins) = partial_idx_update_actions(
                idx,
                old_row,
                &new_row,
                cols_changed,
                false,
                col_map_partial,
            );
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            if del {
                let old_idx_key =
                    encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
                wtx.table_delete(&idx_table, &old_idx_key)
                    .map_err(SqlError::Storage)?;
            }
            if ins {
                let new_idx_key =
                    encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
                let is_new = wtx
                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                    .map_err(SqlError::Storage)?;
                if idx.unique && !is_new {
                    let any_null = idx
                        .column_positions_iter()
                        .any(|c| new_row[c as usize].is_null());
                    if !any_null {
                        return Err(SqlError::UniqueViolation(idx.name.clone()));
                    }
                }
            }
        }
    }

    if capture_returning {
        Ok(InsertRowOutcome::Updated {
            old: old_row.to_vec(),
            new: new_row,
        })
    } else {
        // Without captured rows an update is reported as `Inserted`.
        Ok(InsertRowOutcome::Inserted)
    }
}
3271
3272fn detect_fast_paths(
3273 ts: &TableSchema,
3274 assignments: &[(usize, Expr)],
3275) -> Option<Vec<DoUpdateFastPath>> {
3276 let non_pk = ts.non_pk_indices();
3277 let enc_pos = ts.encoding_positions();
3278 let mut out = Vec::with_capacity(assignments.len());
3279 for (col_idx, expr) in assignments {
3280 let col = &ts.columns[*col_idx];
3281 if col.data_type != DataType::Integer {
3282 return None;
3283 }
3284 let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3285 let phys_idx = enc_pos[nonpk_order] as usize;
3286
3287 if let Expr::BinaryOp { left, op, right } = expr {
3288 if !matches!(op, BinOp::Add | BinOp::Sub) {
3289 return None;
3290 }
3291 let reads_target =
3292 matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3293 if !reads_target {
3294 return None;
3295 }
3296 if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3297 let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3298 let _ = col_idx;
3299 out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3300 continue;
3301 }
3302 return None;
3303 }
3304 return None;
3305 }
3306 Some(out)
3307}
3308
3309fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3310 let target = oc
3311 .target
3312 .as_ref()
3313 .map(|t| resolve_conflict_target(t, ts))
3314 .transpose()?;
3315 match &oc.action {
3316 OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3317 OnConflictAction::DoUpdate {
3318 assignments,
3319 where_clause,
3320 } => {
3321 let target = target.ok_or_else(|| {
3322 SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3323 })?;
3324 let compiled_assignments: Vec<(usize, Expr)> = assignments
3325 .iter()
3326 .map(|(name, expr)| {
3327 let col_idx = ts
3328 .column_index(name)
3329 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3330 Ok((col_idx, expr.clone()))
3331 })
3332 .collect::<Result<_>>()?;
3333 let fast_paths = if where_clause.is_none() {
3334 detect_fast_paths(ts, &compiled_assignments)
3335 } else {
3336 None
3337 };
3338 Ok(CompiledOnConflict::DoUpdate {
3339 target,
3340 assignments: compiled_assignments,
3341 where_clause: where_clause.clone(),
3342 fast_paths,
3343 })
3344 }
3345 }
3346}
3347
3348fn exec_insert_trivial_fast(
3350 wtx: &mut WriteTxn<'_>,
3351 table_lower: &str,
3352 cache: &InsertCache,
3353 bufs: &mut InsertBufs,
3354 params: &[Value],
3355) -> Result<ExecutionResult> {
3356 let prog = cache
3357 .trivial_fast_program
3358 .as_ref()
3359 .expect("trivial fast: program");
3360
3361 for &p in &prog.not_null_param_indices {
3362 if params[p as usize].is_null() {
3363 return Err(SqlError::NotNullViolation(format!("param@{p}")));
3364 }
3365 }
3366
3367 match ¶ms[prog.pk_param as usize] {
3368 Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3369 _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3370 }
3371
3372 bufs.value_buf.clear();
3373 bufs.value_buf.extend_from_slice(&prog.template);
3374
3375 for op in &prog.ops {
3376 match op {
3377 WriteOp::ParamI64 { param_idx, off } => match ¶ms[*param_idx as usize] {
3378 Value::Integer(v) => {
3379 let off = *off as usize;
3380 bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3381 }
3382 other => {
3383 return Err(SqlError::TypeMismatch {
3384 expected: "Integer".into(),
3385 got: other.data_type().to_string(),
3386 });
3387 }
3388 },
3389 WriteOp::LiteralI64 { value, off } => {
3390 let off = *off as usize;
3391 bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3392 }
3393 WriteOp::GenAddParamsI64 {
3394 a_param,
3395 b_param,
3396 off,
3397 bitmap_byte_off,
3398 bitmap_bit_mask,
3399 } => match (¶ms[*a_param as usize], ¶ms[*b_param as usize]) {
3400 (Value::Integer(a), Value::Integer(b)) => {
3401 let off = *off as usize;
3402 bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3403 }
3404 _ => {
3405 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3406 }
3407 },
3408 WriteOp::GenMulAddParamI64 {
3409 param_idx,
3410 mul,
3411 add,
3412 off,
3413 bitmap_byte_off,
3414 bitmap_bit_mask,
3415 } => match ¶ms[*param_idx as usize] {
3416 Value::Integer(v) => {
3417 let r = v.wrapping_mul(*mul).wrapping_add(*add);
3418 let off = *off as usize;
3419 bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3420 }
3421 _ => {
3422 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3423 }
3424 },
3425 }
3426 }
3427
3428 for fk in &prog.fk_checks {
3429 if fk.col_params.iter().any(|&p| params[p as usize].is_null()) {
3430 continue;
3431 }
3432 bufs.fk_key_buf.clear();
3433 for &p in &fk.col_params {
3434 crate::encoding::encode_key_value_into(¶ms[p as usize], &mut bufs.fk_key_buf);
3435 }
3436 if !wtx.fk_check_cached(&fk.foreign_table, &bufs.fk_key_buf) {
3437 let found = wtx
3438 .table_get(&fk.foreign_table, &bufs.fk_key_buf)
3439 .map_err(SqlError::Storage)?;
3440 if found.is_none() {
3441 return Err(SqlError::ForeignKeyViolation(
3442 String::from_utf8_lossy(&fk.foreign_table).into_owned(),
3443 ));
3444 }
3445 wtx.mark_fk_verified(&fk.foreign_table, &bufs.fk_key_buf);
3446 }
3447 }
3448
3449 let is_new = wtx
3450 .table_insert_if_absent(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3451 .map_err(SqlError::Storage)?;
3452 if !is_new {
3453 return match prog.on_dup {
3454 DupPolicy::Error => Err(SqlError::DuplicateKey),
3455 DupPolicy::Skip => Ok(ExecutionResult::RowsAffected(0)),
3456 };
3457 }
3458
3459 for idx in &prog.index_inserts {
3460 bufs.fk_key_buf.clear();
3461 for &(p, coll) in &idx.key_params {
3462 crate::encoding::encode_key_value_collated_into(
3463 ¶ms[p as usize],
3464 coll,
3465 &mut bufs.fk_key_buf,
3466 );
3467 }
3468 crate::encoding::encode_key_value_into(
3469 ¶ms[prog.pk_param as usize],
3470 &mut bufs.fk_key_buf,
3471 );
3472 wtx.table_insert_index(&idx.table, &bufs.fk_key_buf, &[])
3473 .map_err(SqlError::Storage)?;
3474 }
3475
3476 Ok(ExecutionResult::RowsAffected(1))
3477}
3478
3479fn build_bind_plan(
3480 stmt: &InsertStmt,
3481 col_indices: &[usize],
3482 col_data_types: &[DataType],
3483) -> Option<Vec<BindAction>> {
3484 let rows = match &stmt.source {
3485 InsertSource::Values(rows) => rows,
3486 _ => return None,
3487 };
3488 if rows.len() != 1 {
3489 return None;
3490 }
3491 let value_row = &rows[0];
3492 if value_row.len() != col_indices.len() {
3493 return None;
3494 }
3495 let mut plan = Vec::with_capacity(value_row.len());
3496 for (i, expr) in value_row.iter().enumerate() {
3497 let col_idx = col_indices[i];
3498 let target = col_data_types[col_idx];
3499 match expr {
3500 Expr::Parameter(n) => {
3501 if *n == 0 {
3502 return None;
3503 }
3504 plan.push(BindAction::Param {
3505 param_idx: n - 1,
3506 col_idx,
3507 target,
3508 });
3509 }
3510 Expr::Literal(v) => plan.push(BindAction::Literal {
3511 value: v.clone(),
3512 col_idx,
3513 }),
3514 _ => return None,
3515 }
3516 }
3517 Some(plan)
3518}
3519
3520impl CompiledInsert {
3521 pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3522 let lower = stmt.table.to_ascii_lowercase();
3523 let cached = if let Some(ts) = schema.get(&lower) {
3524 let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3525 ts.columns.iter().map(|c| c.name.as_str()).collect()
3526 } else {
3527 stmt.columns.iter().map(|s| s.as_str()).collect()
3528 };
3529 let mut col_indices = Vec::with_capacity(insert_columns.len());
3530 for name in &insert_columns {
3531 col_indices.push(ts.column_index(name)?);
3532 }
3533 if col_indices
3534 .iter()
3535 .any(|&ci| ts.columns[ci].generated_kind.is_some())
3536 {
3537 return None;
3538 }
3539 let on_conflict = stmt
3540 .on_conflict
3541 .as_ref()
3542 .map(|oc| compile_on_conflict(oc, ts))
3543 .transpose()
3544 .ok()
3545 .flatten()
3546 .map(Arc::new);
3547 let generated_col_positions: Vec<usize> = ts
3548 .columns
3549 .iter()
3550 .enumerate()
3551 .filter_map(|(i, c)| {
3552 matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3553 .then_some(i)
3554 })
3555 .collect();
3556 let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3557 .iter()
3558 .map(|&pos| {
3559 detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3560 })
3561 .collect();
3562 let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3563 Some(ColumnMap::new(&ts.columns))
3564 } else {
3565 None
3566 };
3567 let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3568 let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3569 let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3570 let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3571 let phys_count = ts.physical_non_pk_count();
3572 let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3573 let single_int_pk =
3574 pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3575 let not_null_indices: Vec<u16> = ts
3576 .columns
3577 .iter()
3578 .filter(|c| !c.nullable)
3579 .map(|c| c.position)
3580 .collect();
3581 let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3582 let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3583 let row_fully_overwritten = if any_defaults_flag {
3584 false
3585 } else {
3586 let mut covered: rustc_hash::FxHashSet<usize> =
3587 col_indices.iter().copied().collect();
3588 covered.extend(generated_col_positions.iter().copied());
3589 for (j, &i) in non_pk_indices.iter().enumerate() {
3590 let _ = j;
3591 if matches!(
3592 ts.columns[i].generated_kind,
3593 Some(crate::parser::GeneratedKind::Virtual)
3594 ) {
3595 covered.insert(i);
3596 }
3597 }
3598 bind_plan.is_some() && covered.len() == ts.columns.len()
3599 };
3600 let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3601 let mut null_value_slots: Vec<usize> =
3602 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3603 for (j, &i) in non_pk_indices.iter().enumerate() {
3604 let slot = encoding_positions[j] as usize;
3605 if matches!(
3606 ts.columns[i].generated_kind,
3607 Some(crate::parser::GeneratedKind::Virtual)
3608 ) {
3609 null_value_slots.push(slot);
3610 } else {
3611 non_virtual_pairs.push((i, slot));
3612 }
3613 }
3614 let row_encoder = {
3615 let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3616 let col = &ts.columns[i];
3617 if matches!(
3618 col.generated_kind,
3619 Some(crate::parser::GeneratedKind::Virtual)
3620 ) {
3621 true
3622 } else {
3623 col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3624 }
3625 });
3626 if all_int_or_null {
3627 let mut slots: Vec<crate::encoding::TemplateSlot> = (0..phys_count)
3628 .map(|_| crate::encoding::TemplateSlot::IntHole)
3629 .collect();
3630 for &s in &dropped_non_pk_slots {
3631 slots[s as usize] = crate::encoding::TemplateSlot::Null;
3632 }
3633 for (j, &i) in non_pk_indices.iter().enumerate() {
3634 if matches!(
3635 ts.columns[i].generated_kind,
3636 Some(crate::parser::GeneratedKind::Virtual)
3637 ) {
3638 slots[encoding_positions[j] as usize] =
3639 crate::encoding::TemplateSlot::Null;
3640 }
3641 }
3642 Some(crate::encoding::build_row_template(phys_count, &slots))
3643 } else {
3644 None
3645 }
3646 };
3647 let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3649 && !ts.columns.iter().any(|c| c.default_expr.is_some())
3650 && !ts.has_checks()
3651 && stmt.returning.is_none()
3652 && bind_plan.is_some()
3653 && row_fully_overwritten
3654 && single_int_pk
3655 && generated_fast_evals
3656 .iter()
3657 .all(|fe| !matches!(fe, FastGenEval::None));
3658 let trivial_fast_program = if is_trivial_fast_eligible {
3659 build_trivial_fast_program(
3660 bind_plan.as_ref().unwrap(),
3661 phys_count,
3662 &non_virtual_pairs,
3663 &generated_col_positions,
3664 &generated_fast_evals,
3665 ts,
3666 on_conflict.as_deref(),
3667 )
3668 } else {
3669 None
3670 };
3671 let is_trivial_fast = trivial_fast_program.is_some();
3672 let has_checks = ts.has_checks();
3673 let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3674 let needs_scoped_params = bind_plan.is_none()
3675 || has_checks
3676 || any_defaults
3677 || !generated_col_positions.is_empty()
3678 || on_conflict.is_some()
3679 || stmt.returning.is_some()
3680 || insert_has_subquery(stmt)
3681 || super::helpers::any_partial_index(ts);
3682 Some(InsertCache {
3683 col_indices,
3684 has_subquery: insert_has_subquery(stmt),
3685 any_defaults,
3686 has_checks,
3687 on_conflict,
3688 row_col_map,
3689 generated_col_positions,
3690 generated_fast_evals,
3691 pk_indices,
3692 non_pk_indices,
3693 encoding_positions,
3694 dropped_non_pk_slots,
3695 phys_count,
3696 single_int_pk,
3697 not_null_indices,
3698 bind_plan,
3699 row_fully_overwritten,
3700 row_encoder,
3701 is_trivial_fast,
3702 trivial_fast_program,
3703 needs_scoped_params,
3704 })
3705 } else if schema.get_view(&lower).is_some() {
3706 None
3707 } else {
3708 return None;
3709 };
3710 Some(Self {
3711 table_lower: lower,
3712 cached,
3713 })
3714 }
3715}
3716
3717impl CompiledPlan for CompiledInsert {
3718 fn execute(
3719 &self,
3720 db: &Database,
3721 schema: &SchemaManager,
3722 stmt: &Statement,
3723 params: &[Value],
3724 txn: super::compile::ActiveTxnRef<'_, '_>,
3725 ) -> Result<ExecutionResult> {
3726 let ins = match stmt {
3727 Statement::Insert(i) => i,
3728 _ => {
3729 return Err(SqlError::Unsupported(
3730 "CompiledInsert received non-INSERT statement".into(),
3731 ))
3732 }
3733 };
3734 use super::compile::ActiveTxnRef;
3735 match txn {
3736 ActiveTxnRef::None => exec_insert(db, schema, ins, params),
3737 ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3738 "cannot execute mutating statement inside a read-only transaction".into(),
3739 )),
3740 ActiveTxnRef::Write(outer) => match self.cached.as_ref() {
3741 Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3742 exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3743 }),
3744 Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3745 None => exec_insert_in_txn(outer, schema, ins, params),
3746 },
3747 }
3748 }
3749
3750 fn uses_scoped_params(&self) -> bool {
3751 match self.cached.as_ref() {
3752 Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3753 None => true,
3754 }
3755 }
3756}
3757
3758pub struct CompiledDelete {
3759 table_lower: String,
3760}
3761
3762impl CompiledDelete {
3763 pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3764 let lower = stmt.table.to_ascii_lowercase();
3765 schema.get(&lower)?;
3766 Some(Self { table_lower: lower })
3767 }
3768}
3769
3770impl CompiledPlan for CompiledDelete {
3771 fn execute(
3772 &self,
3773 db: &Database,
3774 schema: &SchemaManager,
3775 stmt: &Statement,
3776 _params: &[Value],
3777 txn: super::compile::ActiveTxnRef<'_, '_>,
3778 ) -> Result<ExecutionResult> {
3779 let del = match stmt {
3780 Statement::Delete(d) => d,
3781 _ => {
3782 return Err(SqlError::Unsupported(
3783 "CompiledDelete received non-DELETE statement".into(),
3784 ))
3785 }
3786 };
3787 let _ = &self.table_lower;
3788 use super::compile::ActiveTxnRef;
3789 match txn {
3790 ActiveTxnRef::None => super::write::exec_delete(db, schema, del),
3791 ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3792 "cannot execute mutating statement inside a read-only transaction".into(),
3793 )),
3794 ActiveTxnRef::Write(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3795 }
3796 }
3797}
3798
3799fn exec_instead_of_view_insert_auto(
3800 db: &Database,
3801 schema: &SchemaManager,
3802 view_name: &str,
3803 aliases: &[String],
3804 stmt: &InsertStmt,
3805 params: &[Value],
3806) -> Result<ExecutionResult> {
3807 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3808 let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3809 wtx.commit().map_err(SqlError::Storage)?;
3810 Ok(r)
3811}
3812
3813fn exec_instead_of_view_insert_in_txn(
3814 wtx: &mut WriteTxn<'_>,
3815 schema: &SchemaManager,
3816 view_name: &str,
3817 aliases: &[String],
3818 stmt: &InsertStmt,
3819 params: &[Value],
3820) -> Result<ExecutionResult> {
3821 let resolved_aliases: Vec<String> = if aliases.is_empty() {
3823 derive_view_columns(wtx, schema, view_name)?
3824 } else {
3825 aliases.to_vec()
3826 };
3827 let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3828 let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3829 .iter()
3830 .enumerate()
3831 .map(|(i, name)| (name.to_ascii_lowercase(), i))
3832 .collect();
3833
3834 let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3835 (0..resolved_aliases.len()).collect()
3836 } else {
3837 stmt.columns
3838 .iter()
3839 .map(|c| {
3840 alias_map
3841 .get(&c.to_ascii_lowercase())
3842 .copied()
3843 .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3844 })
3845 .collect::<Result<_>>()?
3846 };
3847
3848 let source_rows: Vec<Vec<Value>> = match &stmt.source {
3849 InsertSource::Values(rows) => {
3850 let mut out = Vec::with_capacity(rows.len());
3851 for row in rows {
3852 if row.len() != target_positions.len() {
3853 return Err(SqlError::InvalidValue(format!(
3854 "expected {} values, got {}",
3855 target_positions.len(),
3856 row.len()
3857 )));
3858 }
3859 let mut vals = Vec::with_capacity(row.len());
3860 for expr in row {
3861 let v = match expr {
3862 Expr::Parameter(n) => params
3863 .get(n - 1)
3864 .cloned()
3865 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3866 Expr::Literal(v) => v.clone(),
3867 other => eval_const_expr(other)?,
3868 };
3869 vals.push(v);
3870 }
3871 out.push(vals);
3872 }
3873 out
3874 }
3875 InsertSource::Select(sq) => {
3876 let empty_ctes = CteContext::default();
3877 let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3878 qr.rows
3879 }
3880 };
3881
3882 let mut count: u64 = 0;
3883 for row in source_rows {
3884 if row.len() != target_positions.len() {
3885 return Err(SqlError::InvalidValue(format!(
3886 "expected {} values, got {}",
3887 target_positions.len(),
3888 row.len()
3889 )));
3890 }
3891 let mut new_row = vec![Value::Null; resolved_aliases.len()];
3892 for (slot, val) in target_positions.iter().zip(row) {
3893 new_row[*slot] = val;
3894 }
3895 super::triggers::fire_row_triggers(
3896 wtx,
3897 schema,
3898 view_name,
3899 crate::parser::TriggerTiming::InsteadOf,
3900 super::triggers::FireEvent::Insert,
3901 None,
3902 Some(new_row),
3903 &view_cols,
3904 )?;
3905 count += 1;
3906 }
3907 Ok(ExecutionResult::RowsAffected(count))
3908}
3909
3910fn derive_view_columns(
3911 wtx: &mut WriteTxn<'_>,
3912 schema: &SchemaManager,
3913 view_name: &str,
3914) -> Result<Vec<String>> {
3915 use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3916 let sel = SelectStmt {
3917 columns: vec![SelectColumn::AllColumns],
3918 from: view_name.to_string(),
3919 from_alias: None,
3920 from_subquery: None,
3921 from_args: None,
3922 from_json_table: None,
3923 joins: vec![],
3924 distinct: false,
3925 where_clause: None,
3926 order_by: vec![],
3927 limit: Some(Expr::Literal(Value::Integer(1))),
3928 offset: None,
3929 group_by: vec![],
3930 having: None,
3931 };
3932 let sq = SelectQuery {
3933 ctes: vec![],
3934 recursive: false,
3935 body: QueryBody::Select(Box::new(sel)),
3936 };
3937 let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3938 match qr {
3939 ExecutionResult::Query(q) => Ok(q.columns),
3940 _ => Ok(Vec::new()),
3941 }
3942}
3943
3944#[cfg(test)]
3945#[path = "dml_tests.rs"]
3946mod tests;