1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::read_txn::ReadTxn;
7use citadel_txn::write_txn::WriteTxn;
8use rustc_hash::FxHashMap;
9
10use crate::encoding::{encode_composite_key_into, encode_row_into};
11use crate::error::{Result, SqlError};
12use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
13use crate::parser::*;
14use crate::types::*;
15
16use crate::schema::SchemaManager;
17
18use super::compile::CompiledPlan;
19use super::helpers::*;
20use super::CteContext;
21
22fn mark_insert_dml(
25 schema: &SchemaManager,
26 table_name: &str,
27 on_conflict: bool,
28 single_int_pk: bool,
29 min_inserted_pk: Option<i64>,
30 rows_written: u64,
31) {
32 if on_conflict {
33 schema.mark_dml(table_name);
34 } else if single_int_pk {
35 if let Some(m) = min_inserted_pk {
36 schema.mark_dml_append(table_name, m);
37 }
38 } else if rows_written > 0 {
39 schema.mark_dml(table_name);
40 }
41}
42
43fn is_single_int_pk(table_schema: &TableSchema) -> bool {
45 table_schema.primary_key_columns.len() == 1
46 && matches!(
47 table_schema.columns[table_schema.primary_key_columns[0] as usize].data_type,
48 DataType::Integer
49 )
50}
51
52pub(super) fn exec_insert(
53 db: &Database,
54 schema: &SchemaManager,
55 stmt: &InsertStmt,
56 params: &[Value],
57) -> Result<ExecutionResult> {
58 let empty_ctes = CteContext::default();
59 let materialized;
60 let stmt = if insert_has_subquery(stmt) {
61 materialized = materialize_insert(stmt, &mut |sub| {
62 exec_subquery_read(db, schema, sub, &empty_ctes)
63 })?;
64 &materialized
65 } else {
66 stmt
67 };
68
69 let lower_name = stmt.table.to_ascii_lowercase();
70 if let Some(view_def) = schema.get_view(&lower_name) {
71 if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
72 {
73 let aliases = view_def.column_aliases.clone();
74 return exec_instead_of_view_insert_auto(
75 db,
76 schema,
77 &lower_name,
78 &aliases,
79 stmt,
80 params,
81 );
82 }
83 return Err(SqlError::CannotModifyView(stmt.table.clone()));
84 }
85 if schema.get_matview(&lower_name).is_some() {
86 return Err(SqlError::CannotModifyView(format!(
87 "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
88 stmt.table
89 )));
90 }
91 let table_schema = schema
92 .get(&lower_name)
93 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
94
95 let insert_columns = if stmt.columns.is_empty() {
96 table_schema
97 .columns
98 .iter()
99 .map(|c| c.name.clone())
100 .collect::<Vec<_>>()
101 } else {
102 stmt.columns
103 .iter()
104 .map(|c| c.to_ascii_lowercase())
105 .collect()
106 };
107
108 let col_indices: Vec<usize> = insert_columns
109 .iter()
110 .map(|name| {
111 table_schema
112 .column_index(name)
113 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
114 })
115 .collect::<Result<_>>()?;
116
117 for &ci in &col_indices {
118 if table_schema.columns[ci].generated_kind.is_some() {
119 return Err(SqlError::CannotInsertIntoGeneratedColumn(
120 table_schema.columns[ci].name.clone(),
121 ));
122 }
123 }
124
125 let defaults: Vec<(usize, &Expr)> = table_schema
126 .columns
127 .iter()
128 .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
129 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
130 .collect();
131
132 let generated_cols: Vec<(usize, &Expr)> = table_schema
133 .columns
134 .iter()
135 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
136 .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
137 .collect();
138
139 let has_checks = table_schema.has_checks();
140 let strict = table_schema.is_strict();
141 let row_col_map_for_gen = if !generated_cols.is_empty() {
142 Some(ColumnMap::new(&table_schema.columns))
143 } else {
144 None
145 };
146 let check_col_map = if has_checks {
147 Some(ColumnMap::new(&table_schema.columns))
148 } else {
149 None
150 };
151
152 let select_rows = match &stmt.source {
153 InsertSource::Select(sq) => {
154 let insert_ctes =
155 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
156 exec_query_body_read(db, schema, body, ctx)
157 })?;
158 let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
159 Some(qr.rows)
160 }
161 InsertSource::Values(_) => None,
162 };
163
164 let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
165 .on_conflict
166 .as_ref()
167 .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
168 .transpose()?;
169
170 let row_col_map = compiled_conflict
171 .as_ref()
172 .map(|_| ColumnMap::new(&table_schema.columns));
173
174 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
175 super::ann_persist::purge_segment(&mut wtx, &table_schema.name)?;
179 let mut count: u64 = 0;
180 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
181 stmt.returning.as_ref().map(|_| Vec::new());
182
183 let pk_indices = table_schema.pk_indices();
184 let non_pk = table_schema.non_pk_indices();
185 let enc_pos = table_schema.encoding_positions();
186 let phys_count = table_schema.physical_non_pk_count();
187 let mut row = vec![Value::Null; table_schema.columns.len()];
188 let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
189 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
190 let mut key_buf: Vec<u8> = Vec::with_capacity(64);
191 let mut value_buf: Vec<u8> = Vec::with_capacity(256);
192 let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
193
194 let values = match &stmt.source {
195 InsertSource::Values(rows) => Some(rows.as_slice()),
196 InsertSource::Select(_) => None,
197 };
198 let sel_rows = select_rows.as_deref();
199
200 let total = match (values, sel_rows) {
201 (Some(rows), _) => rows.len(),
202 (_, Some(rows)) => rows.len(),
203 _ => 0,
204 };
205
206 if let Some(sel) = sel_rows {
207 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
208 return Err(SqlError::InvalidValue(format!(
209 "INSERT ... SELECT column count mismatch: expected {}, got {}",
210 insert_columns.len(),
211 sel[0].len()
212 )));
213 }
214 }
215
216 let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
217 t.enabled
218 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
219 && t.events
220 .iter()
221 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
222 });
223 let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
224 Vec::with_capacity(total)
225 } else {
226 Vec::new()
227 };
228
229 if has_insert_statement_triggers {
230 super::triggers::fire_statement_triggers(
231 &mut wtx,
232 schema,
233 &table_schema.name,
234 crate::parser::TriggerTiming::Before,
235 super::triggers::FireEvent::Insert,
236 &table_schema.columns,
237 &[],
238 &[],
239 )?;
240 }
241
242 let plain_insert = compiled_conflict.is_none();
243 let single_int_pk = is_single_int_pk(table_schema);
244 let mut min_inserted_pk: Option<i64> = None;
245
246 for idx in 0..total {
247 for v in row.iter_mut() {
248 *v = Value::Null;
249 }
250
251 if let Some(value_rows) = values {
252 let value_row = &value_rows[idx];
253 if value_row.len() != insert_columns.len() {
254 return Err(SqlError::InvalidValue(format!(
255 "expected {} values, got {}",
256 insert_columns.len(),
257 value_row.len()
258 )));
259 }
260 for (i, expr) in value_row.iter().enumerate() {
261 let val = if let Expr::Parameter(n) = expr {
262 params
263 .get(n - 1)
264 .cloned()
265 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
266 } else {
267 eval_const_expr(expr)?
268 };
269 let col_idx = col_indices[i];
270 let col = &table_schema.columns[col_idx];
271 row[col_idx] = if val.is_null() {
272 Value::Null
273 } else {
274 coerce_for_column(val, col, strict)?
275 };
276 }
277 } else if let Some(sel) = sel_rows {
278 let sel_row = &sel[idx];
279 for (i, val) in sel_row.iter().enumerate() {
280 let col_idx = col_indices[i];
281 let col = &table_schema.columns[col_idx];
282 row[col_idx] = if val.is_null() {
283 Value::Null
284 } else {
285 coerce_for_column(val.clone(), col, strict)?
286 };
287 }
288 }
289
290 for &(pos, def_expr) in &defaults {
291 let val = eval_const_expr(def_expr)?;
292 let col = &table_schema.columns[pos];
293 if !val.is_null() {
294 row[pos] = coerce_for_column(val, col, strict)?;
295 }
296 }
297
298 if let Some(ref gen_map) = row_col_map_for_gen {
299 for &(pos, gen_expr) in &generated_cols {
300 let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
301 let col = &table_schema.columns[pos];
302 row[pos] = if val.is_null() {
303 Value::Null
304 } else {
305 coerce_for_column(val, col, strict)?
306 };
307 }
308 }
309
310 for col in &table_schema.columns {
311 if !col.nullable && row[col.position as usize].is_null() {
312 return Err(SqlError::NotNullViolation(col.name.clone()));
313 }
314 }
315
316 if let Some(ref col_map) = check_col_map {
317 for col in &table_schema.columns {
318 if let Some(ref check) = col.check_expr {
319 let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
320 if !is_truthy(&result) && !result.is_null() {
321 let name = col.check_name.as_deref().unwrap_or(&col.name);
322 return Err(SqlError::CheckViolation(name.to_string()));
323 }
324 }
325 }
326 for tc in &table_schema.check_constraints {
327 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
328 if !is_truthy(&result) && !result.is_null() {
329 let name = tc.name.as_deref().unwrap_or(&tc.sql);
330 return Err(SqlError::CheckViolation(name.to_string()));
331 }
332 }
333 }
334
335 for fk in &table_schema.foreign_keys {
336 let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
337 if any_null {
338 continue; }
340 let fk_vals: Vec<Value> = fk
341 .columns
342 .iter()
343 .map(|&ci| row[ci as usize].clone())
344 .collect();
345 fk_key_buf.clear();
346 encode_composite_key_into(&fk_vals, &mut fk_key_buf);
347 if fk.deferrable && fk.initially_deferred {
348 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
349 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
350 fk_name: name,
351 foreign_table: fk.foreign_table.as_bytes().to_vec(),
352 parent_key: fk_key_buf.clone(),
353 });
354 continue;
355 }
356 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
357 let found = wtx
358 .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
359 .map_err(SqlError::Storage)?;
360 if found.is_none() {
361 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
362 return Err(SqlError::ForeignKeyViolation(name.to_string()));
363 }
364 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
365 }
366 }
367
368 let proposed_row_for_returning: Option<Vec<Value>> =
369 returning_rows.as_ref().map(|_| row.clone());
370 let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
371 Some(row.clone())
372 } else {
373 None
374 };
375
376 let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
377 t.enabled
378 && t.timing == crate::parser::TriggerTiming::Before
379 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
380 && t.events
381 .iter()
382 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
383 });
384 if has_before_insert_triggers {
385 super::triggers::fire_row_triggers(
386 &mut wtx,
387 schema,
388 &table_schema.name,
389 crate::parser::TriggerTiming::Before,
390 super::triggers::FireEvent::Insert,
391 None,
392 Some(row.clone()),
393 &table_schema.columns,
394 )?;
395 }
396
397 for (j, &i) in pk_indices.iter().enumerate() {
398 pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
399 }
400 encode_composite_key_into(&pk_values, &mut key_buf);
401 if plain_insert && single_int_pk {
402 if let Value::Integer(id) = &pk_values[0] {
403 min_inserted_pk = Some(min_inserted_pk.map_or(*id, |m| m.min(*id)));
404 }
405 }
406
407 for (j, &i) in non_pk.iter().enumerate() {
408 let col = &table_schema.columns[i];
409 if matches!(
410 col.generated_kind,
411 Some(crate::parser::GeneratedKind::Virtual)
412 ) {
413 value_values[enc_pos[j] as usize] = Value::Null;
414 row[i] = Value::Null;
415 } else {
416 value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
417 }
418 }
419 encode_row_into(&value_values, &mut value_buf);
420
421 if key_buf.len() > citadel_core::MAX_KEY_SIZE {
422 return Err(SqlError::KeyTooLarge {
423 size: key_buf.len(),
424 max: citadel_core::MAX_KEY_SIZE,
425 });
426 }
427 if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
428 return Err(SqlError::RowTooLarge {
429 size: value_buf.len(),
430 max: citadel_core::MAX_VALUE_SIZE,
431 });
432 }
433
434 match compiled_conflict.as_ref() {
435 None => {
436 let is_new = wtx
437 .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
438 .map_err(SqlError::Storage)?;
439 if !is_new {
440 return Err(SqlError::DuplicateKey);
441 }
442 let has_after_insert_triggers =
443 schema.triggers_for(&table_schema.name).iter().any(|t| {
444 t.enabled
445 && t.timing == crate::parser::TriggerTiming::After
446 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
447 && t.events
448 .iter()
449 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
450 });
451 if !table_schema.indices.is_empty() || has_after_insert_triggers {
452 for (j, &i) in pk_indices.iter().enumerate() {
453 row[i] = pk_values[j].clone();
454 }
455 for (j, &i) in non_pk.iter().enumerate() {
456 row[i] =
457 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
458 }
459 if !table_schema.indices.is_empty() {
460 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
461 }
462 if has_after_insert_triggers {
463 super::triggers::fire_row_triggers(
464 &mut wtx,
465 schema,
466 &table_schema.name,
467 crate::parser::TriggerTiming::After,
468 super::triggers::FireEvent::Insert,
469 None,
470 Some(row.clone()),
471 &table_schema.columns,
472 )?;
473 }
474 }
475 if let Some(r) = row_for_stmt_trigger.clone() {
476 stmt_new_rows.push(r);
477 }
478 count += 1;
479 if let Some(buf) = returning_rows.as_mut() {
480 buf.push((None, proposed_row_for_returning));
481 }
482 }
483 Some(oc) => {
484 let oc_ref: &CompiledOnConflict = oc;
485 let needs_row = upsert_needs_row(oc_ref, table_schema);
486 if needs_row {
487 for (j, &i) in pk_indices.iter().enumerate() {
488 row[i] = pk_values[j].clone();
489 }
490 for (j, &i) in non_pk.iter().enumerate() {
491 row[i] =
492 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
493 }
494 }
495 let outcome = apply_insert_with_conflict(
496 &mut wtx,
497 table_schema,
498 &key_buf,
499 &value_buf,
500 &row,
501 &pk_values,
502 oc_ref,
503 row_col_map.as_ref().unwrap(),
504 stmt.returning.is_some(),
505 )?;
506 match outcome {
507 InsertRowOutcome::Inserted => {
508 count += 1;
509 if let Some(buf) = returning_rows.as_mut() {
510 buf.push((None, proposed_row_for_returning));
511 }
512 if let Some(r) = row_for_stmt_trigger.clone() {
513 stmt_new_rows.push(r);
514 }
515 let has_after_insert_triggers =
516 schema.triggers_for(&table_schema.name).iter().any(|t| {
517 t.enabled
518 && t.timing == crate::parser::TriggerTiming::After
519 && t.granularity
520 == crate::parser::TriggerGranularity::ForEachRow
521 && t.events
522 .iter()
523 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
524 });
525 if has_after_insert_triggers {
526 super::triggers::fire_row_triggers(
527 &mut wtx,
528 schema,
529 &table_schema.name,
530 crate::parser::TriggerTiming::After,
531 super::triggers::FireEvent::Insert,
532 None,
533 Some(row.clone()),
534 &table_schema.columns,
535 )?;
536 }
537 }
538 InsertRowOutcome::Updated { old, new } => {
539 count += 1;
540 if let Some(buf) = returning_rows.as_mut() {
541 buf.push((Some(old.clone()), Some(new.clone())));
542 }
543 let has_after_update_triggers =
544 schema.triggers_for(&table_schema.name).iter().any(|t| {
545 t.enabled
546 && t.timing == crate::parser::TriggerTiming::After
547 && t.granularity
548 == crate::parser::TriggerGranularity::ForEachRow
549 && t.events.iter().any(|e| {
550 matches!(e, crate::parser::TriggerEvent::Update(_))
551 })
552 });
553 if has_after_update_triggers {
554 let changed_cols: Vec<String> = match oc_ref {
555 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
556 .iter()
557 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
558 .collect(),
559 _ => Vec::new(),
560 };
561 super::triggers::fire_row_triggers(
562 &mut wtx,
563 schema,
564 &table_schema.name,
565 crate::parser::TriggerTiming::After,
566 super::triggers::FireEvent::Update {
567 changed_columns: &changed_cols,
568 },
569 Some(old),
570 Some(new),
571 &table_schema.columns,
572 )?;
573 }
574 }
575 InsertRowOutcome::Skipped => {}
576 }
577 }
578 }
579 }
580
581 if has_insert_statement_triggers {
582 super::triggers::fire_statement_triggers(
583 &mut wtx,
584 schema,
585 &table_schema.name,
586 crate::parser::TriggerTiming::After,
587 super::triggers::FireEvent::Insert,
588 &table_schema.columns,
589 &[],
590 &stmt_new_rows,
591 )?;
592 }
593
594 mark_insert_dml(
595 schema,
596 &table_schema.name,
597 !plain_insert,
598 single_int_pk,
599 min_inserted_pk,
600 count,
601 );
602
603 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
604 let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
605 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
606 wtx.commit().map_err(SqlError::Storage)?;
607 return Ok(ExecutionResult::Query(qr));
608 }
609
610 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
611 wtx.commit().map_err(SqlError::Storage)?;
612 Ok(ExecutionResult::RowsAffected(count))
613}
614
615pub(super) fn has_subquery(expr: &Expr) -> bool {
616 crate::parser::has_subquery(expr)
617}
618
619pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
620 if let Some(ref w) = stmt.where_clause {
621 if has_subquery(w) {
622 return true;
623 }
624 }
625 if let Some(ref h) = stmt.having {
626 if has_subquery(h) {
627 return true;
628 }
629 }
630 for col in &stmt.columns {
631 if let SelectColumn::Expr { expr, .. } = col {
632 if has_subquery(expr) {
633 return true;
634 }
635 }
636 }
637 for ob in &stmt.order_by {
638 if has_subquery(&ob.expr) {
639 return true;
640 }
641 }
642 for join in &stmt.joins {
643 if let Some(ref on_expr) = join.on_clause {
644 if has_subquery(on_expr) {
645 return true;
646 }
647 }
648 }
649 false
650}
651
652pub(super) fn materialize_expr(
653 expr: &Expr,
654 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
655) -> Result<Expr> {
656 match expr {
657 Expr::InSubquery {
658 expr: e,
659 subquery,
660 negated,
661 } => {
662 let inner = materialize_expr(e, exec_sub)?;
663 let qr = exec_sub(subquery)?;
664 if !qr.columns.is_empty() && qr.columns.len() != 1 {
665 return Err(SqlError::SubqueryMultipleColumns);
666 }
667 let mut values = rustc_hash::FxHashSet::default();
668 let mut has_null = false;
669 for row in &qr.rows {
670 if row[0].is_null() {
671 has_null = true;
672 } else {
673 values.insert(row[0].clone());
674 }
675 }
676 Ok(Expr::InSet {
677 expr: Box::new(inner),
678 values,
679 has_null,
680 negated: *negated,
681 })
682 }
683 Expr::ScalarSubquery(subquery) => {
684 let qr = exec_sub(subquery)?;
685 if qr.rows.len() > 1 {
686 return Err(SqlError::SubqueryMultipleRows);
687 }
688 let val = if qr.rows.is_empty() {
689 Value::Null
690 } else {
691 qr.rows[0][0].clone()
692 };
693 Ok(Expr::Literal(val))
694 }
695 Expr::Exists { subquery, negated } => {
696 let qr = exec_sub(subquery)?;
697 let exists = !qr.rows.is_empty();
698 let result = if *negated { !exists } else { exists };
699 Ok(Expr::Literal(Value::Boolean(result)))
700 }
701 Expr::InList {
702 expr: e,
703 list,
704 negated,
705 } => {
706 let inner = materialize_expr(e, exec_sub)?;
707 let items = list
708 .iter()
709 .map(|item| materialize_expr(item, exec_sub))
710 .collect::<Result<Vec<_>>>()?;
711 Ok(Expr::InList {
712 expr: Box::new(inner),
713 list: items,
714 negated: *negated,
715 })
716 }
717 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
718 left: Box::new(materialize_expr(left, exec_sub)?),
719 op: *op,
720 right: Box::new(materialize_expr(right, exec_sub)?),
721 }),
722 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
723 op: *op,
724 expr: Box::new(materialize_expr(e, exec_sub)?),
725 }),
726 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
727 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
728 Expr::InSet {
729 expr: e,
730 values,
731 has_null,
732 negated,
733 } => Ok(Expr::InSet {
734 expr: Box::new(materialize_expr(e, exec_sub)?),
735 values: values.clone(),
736 has_null: *has_null,
737 negated: *negated,
738 }),
739 Expr::Between {
740 expr: e,
741 low,
742 high,
743 negated,
744 } => Ok(Expr::Between {
745 expr: Box::new(materialize_expr(e, exec_sub)?),
746 low: Box::new(materialize_expr(low, exec_sub)?),
747 high: Box::new(materialize_expr(high, exec_sub)?),
748 negated: *negated,
749 }),
750 Expr::Like {
751 expr: e,
752 pattern,
753 escape,
754 negated,
755 } => {
756 let esc = escape
757 .as_ref()
758 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
759 .transpose()?;
760 Ok(Expr::Like {
761 expr: Box::new(materialize_expr(e, exec_sub)?),
762 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
763 escape: esc,
764 negated: *negated,
765 })
766 }
767 Expr::Case {
768 operand,
769 conditions,
770 else_result,
771 } => {
772 let op = operand
773 .as_ref()
774 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
775 .transpose()?;
776 let conds = conditions
777 .iter()
778 .map(|(c, r)| {
779 Ok((
780 materialize_expr(c, exec_sub)?,
781 materialize_expr(r, exec_sub)?,
782 ))
783 })
784 .collect::<Result<Vec<_>>>()?;
785 let else_r = else_result
786 .as_ref()
787 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
788 .transpose()?;
789 Ok(Expr::Case {
790 operand: op,
791 conditions: conds,
792 else_result: else_r,
793 })
794 }
795 Expr::Coalesce(args) => {
796 let materialized = args
797 .iter()
798 .map(|a| materialize_expr(a, exec_sub))
799 .collect::<Result<Vec<_>>>()?;
800 Ok(Expr::Coalesce(materialized))
801 }
802 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
803 expr: Box::new(materialize_expr(e, exec_sub)?),
804 data_type: *data_type,
805 }),
806 Expr::Function {
807 name,
808 args,
809 distinct,
810 } => {
811 let materialized = args
812 .iter()
813 .map(|a| materialize_expr(a, exec_sub))
814 .collect::<Result<Vec<_>>>()?;
815 Ok(Expr::Function {
816 name: name.clone(),
817 args: materialized,
818 distinct: *distinct,
819 })
820 }
821 other => Ok(other.clone()),
822 }
823}
824
825pub(super) fn materialize_stmt(
826 stmt: &SelectStmt,
827 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
828) -> Result<SelectStmt> {
829 let where_clause = stmt
830 .where_clause
831 .as_ref()
832 .map(|e| materialize_expr(e, exec_sub))
833 .transpose()?;
834 let having = stmt
835 .having
836 .as_ref()
837 .map(|e| materialize_expr(e, exec_sub))
838 .transpose()?;
839 let columns = stmt
840 .columns
841 .iter()
842 .map(|c| match c {
843 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
844 SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
845 SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
846 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
847 expr: materialize_expr(expr, exec_sub)?,
848 alias: alias.clone(),
849 }),
850 })
851 .collect::<Result<Vec<_>>>()?;
852 let order_by = stmt
853 .order_by
854 .iter()
855 .map(|ob| {
856 Ok(OrderByItem {
857 expr: materialize_expr(&ob.expr, exec_sub)?,
858 descending: ob.descending,
859 nulls_first: ob.nulls_first,
860 })
861 })
862 .collect::<Result<Vec<_>>>()?;
863 let joins = stmt
864 .joins
865 .iter()
866 .map(|j| {
867 let on_clause = j
868 .on_clause
869 .as_ref()
870 .map(|e| materialize_expr(e, exec_sub))
871 .transpose()?;
872 Ok(JoinClause {
873 join_type: j.join_type,
874 table: j.table.clone(),
875 subquery: j.subquery.clone(),
876 on_clause,
877 })
878 })
879 .collect::<Result<Vec<_>>>()?;
880 let group_by = stmt
881 .group_by
882 .iter()
883 .map(|e| materialize_expr(e, exec_sub))
884 .collect::<Result<Vec<_>>>()?;
885 Ok(SelectStmt {
886 columns,
887 from: stmt.from.clone(),
888 from_alias: stmt.from_alias.clone(),
889 from_subquery: stmt.from_subquery.clone(),
890 from_args: stmt.from_args.clone(),
891 from_json_table: stmt.from_json_table.clone(),
892 joins,
893 distinct: stmt.distinct,
894 where_clause,
895 order_by,
896 limit: stmt.limit.clone(),
897 offset: stmt.offset.clone(),
898 group_by,
899 having,
900 })
901}
902
903pub(super) fn exec_subquery_read(
904 db: &Database,
905 schema: &SchemaManager,
906 stmt: &SelectStmt,
907 ctes: &CteContext,
908) -> Result<QueryResult> {
909 let mut rtx = db.begin_read();
910 exec_subquery_with_read(&mut rtx, schema, stmt, ctes)
911}
912
913pub(super) fn exec_subquery_with_read(
914 rtx: &mut ReadTxn<'_>,
915 schema: &SchemaManager,
916 stmt: &SelectStmt,
917 ctes: &CteContext,
918) -> Result<QueryResult> {
919 match super::exec_select_with_read(rtx, schema, stmt, ctes)? {
920 ExecutionResult::Query(qr) => Ok(qr),
921 _ => Ok(QueryResult {
922 columns: vec![],
923 rows: vec![],
924 }),
925 }
926}
927
928pub(super) fn exec_subquery_write(
929 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
930 schema: &SchemaManager,
931 stmt: &SelectStmt,
932 ctes: &CteContext,
933) -> Result<QueryResult> {
934 match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
935 ExecutionResult::Query(qr) => Ok(qr),
936 _ => Ok(QueryResult {
937 columns: vec![],
938 rows: vec![],
939 }),
940 }
941}
942
943pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
944 stmt.where_clause.as_ref().is_some_and(has_subquery)
945 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
946}
947
948pub(super) fn materialize_update(
949 stmt: &UpdateStmt,
950 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
951) -> Result<UpdateStmt> {
952 let where_clause = stmt
953 .where_clause
954 .as_ref()
955 .map(|e| materialize_expr(e, exec_sub))
956 .transpose()?;
957 let assignments = stmt
958 .assignments
959 .iter()
960 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
961 .collect::<Result<Vec<_>>>()?;
962 Ok(UpdateStmt {
963 table: stmt.table.clone(),
964 assignments,
965 where_clause,
966 returning: stmt.returning.clone(),
967 })
968}
969
970pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
971 stmt.where_clause.as_ref().is_some_and(has_subquery)
972}
973
974pub(super) fn materialize_delete(
975 stmt: &DeleteStmt,
976 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
977) -> Result<DeleteStmt> {
978 let where_clause = stmt
979 .where_clause
980 .as_ref()
981 .map(|e| materialize_expr(e, exec_sub))
982 .transpose()?;
983 Ok(DeleteStmt {
984 table: stmt.table.clone(),
985 where_clause,
986 returning: stmt.returning.clone(),
987 })
988}
989
990pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
991 match &stmt.source {
992 InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
993 InsertSource::Select(_) => false,
995 }
996}
997
998pub(super) fn materialize_insert(
999 stmt: &InsertStmt,
1000 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1001) -> Result<InsertStmt> {
1002 let source = match &stmt.source {
1003 InsertSource::Values(rows) => {
1004 let mat = rows
1005 .iter()
1006 .map(|row| {
1007 row.iter()
1008 .map(|e| materialize_expr(e, exec_sub))
1009 .collect::<Result<Vec<_>>>()
1010 })
1011 .collect::<Result<Vec<_>>>()?;
1012 InsertSource::Values(mat)
1013 }
1014 InsertSource::Select(sq) => {
1015 let ctes = sq
1016 .ctes
1017 .iter()
1018 .map(|c| {
1019 Ok(CteDefinition {
1020 name: c.name.clone(),
1021 column_aliases: c.column_aliases.clone(),
1022 body: materialize_query_body(&c.body, exec_sub)?,
1023 })
1024 })
1025 .collect::<Result<Vec<_>>>()?;
1026 let body = materialize_query_body(&sq.body, exec_sub)?;
1027 InsertSource::Select(Box::new(SelectQuery {
1028 ctes,
1029 recursive: sq.recursive,
1030 body,
1031 }))
1032 }
1033 };
1034 Ok(InsertStmt {
1035 table: stmt.table.clone(),
1036 columns: stmt.columns.clone(),
1037 source,
1038 on_conflict: stmt.on_conflict.clone(),
1039 returning: stmt.returning.clone(),
1040 })
1041}
1042
1043pub(super) fn materialize_query_body(
1044 body: &QueryBody,
1045 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
1046) -> Result<QueryBody> {
1047 match body {
1048 QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
1049 sel, exec_sub,
1050 )?))),
1051 QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
1052 op: comp.op.clone(),
1053 all: comp.all,
1054 left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
1055 right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
1056 order_by: comp.order_by.clone(),
1057 limit: comp.limit.clone(),
1058 offset: comp.offset.clone(),
1059 }))),
1060 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
1061 }
1062}
1063
1064pub(super) fn exec_query_body_with_read(
1065 rtx: &mut ReadTxn<'_>,
1066 schema: &SchemaManager,
1067 body: &QueryBody,
1068 ctes: &CteContext,
1069) -> Result<ExecutionResult> {
1070 match body {
1071 QueryBody::Select(sel) => super::exec_select_with_read(rtx, schema, sel, ctes),
1072 QueryBody::Compound(comp) => exec_compound_select_with_read(rtx, schema, comp, ctes),
1073 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1074 SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1075 ),
1076 }
1077}
1078
1079pub(super) fn exec_query_body_in_txn(
1080 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1081 schema: &SchemaManager,
1082 body: &QueryBody,
1083 ctes: &CteContext,
1084) -> Result<ExecutionResult> {
1085 match body {
1086 QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1087 QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1088 QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1089 QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1090 QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1091 }
1092}
1093
1094pub(super) fn exec_query_body_read(
1095 db: &Database,
1096 schema: &SchemaManager,
1097 body: &QueryBody,
1098 ctes: &CteContext,
1099) -> Result<QueryResult> {
1100 let mut rtx = db.begin_read();
1101 exec_query_body_with_read_qr(&mut rtx, schema, body, ctes)
1102}
1103
1104pub(super) fn exec_query_body_with_read_qr(
1105 rtx: &mut ReadTxn<'_>,
1106 schema: &SchemaManager,
1107 body: &QueryBody,
1108 ctes: &CteContext,
1109) -> Result<QueryResult> {
1110 match exec_query_body_with_read(rtx, schema, body, ctes)? {
1111 ExecutionResult::Query(qr) => Ok(qr),
1112 _ => Ok(QueryResult {
1113 columns: vec![],
1114 rows: vec![],
1115 }),
1116 }
1117}
1118
1119pub(super) fn exec_query_body_write(
1120 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1121 schema: &SchemaManager,
1122 body: &QueryBody,
1123 ctes: &CteContext,
1124) -> Result<QueryResult> {
1125 match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1126 ExecutionResult::Query(qr) => Ok(qr),
1127 _ => Ok(QueryResult {
1128 columns: vec![],
1129 rows: vec![],
1130 }),
1131 }
1132}
1133
1134pub(super) fn exec_compound_select_with_read(
1135 rtx: &mut ReadTxn<'_>,
1136 schema: &SchemaManager,
1137 comp: &CompoundSelect,
1138 ctes: &CteContext,
1139) -> Result<ExecutionResult> {
1140 let left_qr = match exec_query_body_with_read(rtx, schema, &comp.left, ctes)? {
1141 ExecutionResult::Query(qr) => qr,
1142 _ => QueryResult {
1143 columns: vec![],
1144 rows: vec![],
1145 },
1146 };
1147 let right_qr = match exec_query_body_with_read(rtx, schema, &comp.right, ctes)? {
1148 ExecutionResult::Query(qr) => qr,
1149 _ => QueryResult {
1150 columns: vec![],
1151 rows: vec![],
1152 },
1153 };
1154 apply_set_operation(comp, left_qr, right_qr)
1155}
1156
1157pub(super) fn exec_compound_select_in_txn(
1158 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1159 schema: &SchemaManager,
1160 comp: &CompoundSelect,
1161 ctes: &CteContext,
1162) -> Result<ExecutionResult> {
1163 let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1164 ExecutionResult::Query(qr) => qr,
1165 _ => QueryResult {
1166 columns: vec![],
1167 rows: vec![],
1168 },
1169 };
1170 let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1171 ExecutionResult::Query(qr) => qr,
1172 _ => QueryResult {
1173 columns: vec![],
1174 rows: vec![],
1175 },
1176 };
1177 apply_set_operation(comp, left_qr, right_qr)
1178}
1179
1180pub(super) fn apply_set_operation(
1181 comp: &CompoundSelect,
1182 left_qr: QueryResult,
1183 right_qr: QueryResult,
1184) -> Result<ExecutionResult> {
1185 if !left_qr.columns.is_empty()
1186 && !right_qr.columns.is_empty()
1187 && left_qr.columns.len() != right_qr.columns.len()
1188 {
1189 return Err(SqlError::CompoundColumnCountMismatch {
1190 left: left_qr.columns.len(),
1191 right: right_qr.columns.len(),
1192 });
1193 }
1194
1195 let columns = left_qr.columns;
1196
1197 let mut rows = match (&comp.op, comp.all) {
1198 (SetOp::Union, true) => {
1199 let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1200 let mut rows = Vec::with_capacity(total);
1201 rows.extend(left_qr.rows);
1202 rows.extend(right_qr.rows);
1203 rows
1204 }
1205 (SetOp::Union, false) => {
1206 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1207 let mut rows = Vec::new();
1208 for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1209 if !seen.contains(&row) {
1210 seen.insert(row.clone());
1211 rows.push(row);
1212 }
1213 }
1214 rows
1215 }
1216 (SetOp::Intersect, true) => {
1217 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1218 for row in &right_qr.rows {
1219 *right_counts.entry(row.clone()).or_insert(0) += 1;
1220 }
1221 let mut rows = Vec::new();
1222 for row in left_qr.rows {
1223 if let Some(count) = right_counts.get_mut(&row) {
1224 if *count > 0 {
1225 *count -= 1;
1226 rows.push(row);
1227 }
1228 }
1229 }
1230 rows
1231 }
1232 (SetOp::Intersect, false) => {
1233 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1234 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1235 let mut rows = Vec::new();
1236 for row in left_qr.rows {
1237 if right_set.contains(&row) && !seen.contains(&row) {
1238 seen.insert(row.clone());
1239 rows.push(row);
1240 }
1241 }
1242 rows
1243 }
1244 (SetOp::Except, true) => {
1245 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1246 for row in &right_qr.rows {
1247 *right_counts.entry(row.clone()).or_insert(0) += 1;
1248 }
1249 let mut rows = Vec::new();
1250 for row in left_qr.rows {
1251 if let Some(count) = right_counts.get_mut(&row) {
1252 if *count > 0 {
1253 *count -= 1;
1254 continue;
1255 }
1256 }
1257 rows.push(row);
1258 }
1259 rows
1260 }
1261 (SetOp::Except, false) => {
1262 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1263 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1264 let mut rows = Vec::new();
1265 for row in left_qr.rows {
1266 if !right_set.contains(&row) && !seen.contains(&row) {
1267 seen.insert(row.clone());
1268 rows.push(row);
1269 }
1270 }
1271 rows
1272 }
1273 };
1274
1275 if !comp.order_by.is_empty() {
1276 let col_defs: Vec<crate::types::ColumnDef> = columns
1277 .iter()
1278 .enumerate()
1279 .map(|(i, name)| crate::types::ColumnDef {
1280 name: name.clone(),
1281 data_type: crate::types::DataType::Null,
1282 nullable: true,
1283 position: i as u16,
1284 default_expr: None,
1285 default_sql: None,
1286 check_expr: None,
1287 check_sql: None,
1288 check_name: None,
1289 is_with_timezone: false,
1290 generated_expr: None,
1291 generated_sql: None,
1292 generated_kind: None,
1293 collation: crate::types::Collation::Binary,
1294 })
1295 .collect();
1296 sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1297 }
1298
1299 if let Some(ref offset_expr) = comp.offset {
1300 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1301 if offset < rows.len() {
1302 rows = rows.split_off(offset);
1303 } else {
1304 rows.clear();
1305 }
1306 }
1307
1308 if let Some(ref limit_expr) = comp.limit {
1309 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1310 rows.truncate(limit);
1311 }
1312
1313 Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1314}
1315
1316struct InsertBufs {
1317 row: Vec<Value>,
1318 pk_values: Vec<Value>,
1319 value_values: Vec<Value>,
1320 key_buf: Vec<u8>,
1321 value_buf: Vec<u8>,
1322 col_indices: Vec<usize>,
1323 fk_key_buf: Vec<u8>,
1324}
1325
1326impl InsertBufs {
1327 fn new() -> Self {
1328 Self {
1329 row: Vec::new(),
1330 pk_values: Vec::new(),
1331 value_values: Vec::new(),
1332 key_buf: Vec::with_capacity(64),
1333 value_buf: Vec::with_capacity(256),
1334 col_indices: Vec::new(),
1335 fk_key_buf: Vec::with_capacity(64),
1336 }
1337 }
1338}
1339
thread_local! {
    // Per-thread insert scratch buffers handed out by `with_insert_scratch`.
    // The `RefCell` lets that helper detect an already-active borrow through
    // `try_borrow_mut` instead of panicking.
    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
    // Per-thread `UpsertBufs`. Its users are outside this part of the file —
    // presumably the ON CONFLICT path; confirm at the use site.
    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
}
1344
1345fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1346 INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1347 Ok(mut borrowed) => f(&mut borrowed),
1348 Err(_) => {
1349 let mut local = InsertBufs::new();
1350 f(&mut local)
1351 }
1352 })
1353}
1354
1355pub(super) struct UpsertBufs {
1356 old_row: Vec<Value>,
1357 new_row: Vec<Value>,
1358 value_values: Vec<Value>,
1359 new_value_buf: Vec<u8>,
1360}
1361
1362impl UpsertBufs {
1363 pub(super) fn new() -> Self {
1364 Self {
1365 old_row: Vec::new(),
1366 new_row: Vec::new(),
1367 value_values: Vec::new(),
1368 new_value_buf: Vec::with_capacity(256),
1369 }
1370 }
1371}
1372
1373pub fn exec_insert_in_txn(
1374 wtx: &mut WriteTxn<'_>,
1375 schema: &SchemaManager,
1376 stmt: &InsertStmt,
1377 params: &[Value],
1378) -> Result<ExecutionResult> {
1379 with_insert_scratch(|bufs| {
1380 exec_insert_in_txn_impl(
1381 wtx,
1382 schema,
1383 stmt,
1384 params,
1385 bufs,
1386 None,
1387 &CteContext::default(),
1388 )
1389 })
1390}
1391
1392pub(super) fn exec_insert_in_txn_with_ctes(
1393 wtx: &mut WriteTxn<'_>,
1394 schema: &SchemaManager,
1395 stmt: &InsertStmt,
1396 params: &[Value],
1397 outer_ctes: &CteContext,
1398) -> Result<ExecutionResult> {
1399 with_insert_scratch(|bufs| {
1400 exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1401 })
1402}
1403
1404fn exec_insert_in_txn_cached(
1405 wtx: &mut WriteTxn<'_>,
1406 schema: &SchemaManager,
1407 stmt: &InsertStmt,
1408 params: &[Value],
1409 cache: &InsertCache,
1410) -> Result<ExecutionResult> {
1411 with_insert_scratch(|bufs| {
1412 exec_insert_in_txn_impl(
1413 wtx,
1414 schema,
1415 stmt,
1416 params,
1417 bufs,
1418 Some(cache),
1419 &CteContext::default(),
1420 )
1421 })
1422}
1423
1424fn exec_insert_in_txn_impl(
1425 wtx: &mut WriteTxn<'_>,
1426 schema: &SchemaManager,
1427 stmt: &InsertStmt,
1428 params: &[Value],
1429 bufs: &mut InsertBufs,
1430 cache: Option<&InsertCache>,
1431 outer_ctes: &CteContext,
1432) -> Result<ExecutionResult> {
1433 let empty_ctes = CteContext::default();
1434 let materialized;
1435 let has_sub = match cache {
1436 Some(c) => c.has_subquery,
1437 None => insert_has_subquery(stmt),
1438 };
1439 let stmt = if has_sub {
1440 materialized = materialize_insert(stmt, &mut |sub| {
1441 exec_subquery_write(wtx, schema, sub, &empty_ctes)
1442 })?;
1443 &materialized
1444 } else {
1445 stmt
1446 };
1447
1448 let view_lookup_key = stmt.table.to_ascii_lowercase();
1449 if let Some(view_def) = schema.get_view(&view_lookup_key) {
1450 if super::triggers::has_instead_of(
1451 schema,
1452 &view_lookup_key,
1453 super::triggers::FireEvent::Insert,
1454 ) {
1455 let aliases = view_def.column_aliases.clone();
1456 return exec_instead_of_view_insert_in_txn(
1457 wtx,
1458 schema,
1459 &view_lookup_key,
1460 &aliases,
1461 stmt,
1462 params,
1463 );
1464 }
1465 return Err(SqlError::CannotModifyView(stmt.table.clone()));
1466 }
1467
1468 let table_schema = schema
1469 .get(&stmt.table)
1470 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1471 super::ann_persist::purge_segment(wtx, &table_schema.name)?;
1472
1473 let default_columns;
1474 let insert_columns: &[String] = if stmt.columns.is_empty() {
1475 default_columns = table_schema
1476 .columns
1477 .iter()
1478 .map(|c| c.name.clone())
1479 .collect::<Vec<_>>();
1480 &default_columns
1481 } else {
1482 &stmt.columns
1483 };
1484
1485 bufs.col_indices.clear();
1486 if let Some(c) = cache {
1487 bufs.col_indices.extend_from_slice(&c.col_indices);
1488 } else {
1489 for name in insert_columns {
1490 bufs.col_indices.push(
1491 table_schema
1492 .column_index(name)
1493 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1494 );
1495 }
1496 }
1497
1498 if cache.is_none() {
1499 for &ci in &bufs.col_indices {
1500 if table_schema.columns[ci].generated_kind.is_some() {
1501 return Err(SqlError::CannotInsertIntoGeneratedColumn(
1502 table_schema.columns[ci].name.clone(),
1503 ));
1504 }
1505 }
1506 }
1507
1508 let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1509 let cached_gen_positions: &[usize];
1510 let cached_gen_fast_evals: &[FastGenEval];
1511 if let Some(c) = cache {
1512 cached_gen_positions = &c.generated_col_positions;
1513 cached_gen_fast_evals = &c.generated_fast_evals;
1514 generated_cols_uncached = Vec::new();
1515 } else {
1516 cached_gen_positions = &[];
1517 cached_gen_fast_evals = &[];
1518 generated_cols_uncached = table_schema
1519 .columns
1520 .iter()
1521 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1522 .map(|c| {
1523 let expr = c.generated_expr.as_ref().unwrap();
1524 let fe = detect_fast_gen_eval(expr, table_schema);
1525 (c.position as usize, expr, fe)
1526 })
1527 .collect();
1528 }
1529 let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1530 let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1531 None
1532 } else {
1533 Some(ColumnMap::new(&table_schema.columns))
1534 };
1535 let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1536 None
1537 } else if let Some(c) = cache {
1538 c.row_col_map.as_ref()
1539 } else {
1540 row_col_map_for_gen_owned.as_ref()
1541 };
1542
1543 let any_defaults = match cache {
1544 Some(c) => c.any_defaults,
1545 None => table_schema
1546 .columns
1547 .iter()
1548 .any(|c| c.default_expr.is_some()),
1549 };
1550 let defaults: Vec<(usize, &Expr)> = if any_defaults {
1551 table_schema
1552 .columns
1553 .iter()
1554 .filter(|c| {
1555 c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1556 })
1557 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1558 .collect()
1559 } else {
1560 Vec::new()
1561 };
1562
1563 let has_checks = match cache {
1564 Some(c) => c.has_checks,
1565 None => table_schema.has_checks(),
1566 };
1567 let check_col_map = if has_checks {
1568 Some(ColumnMap::new(&table_schema.columns))
1569 } else {
1570 None
1571 };
1572
1573 let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1574 &[usize],
1575 &[usize],
1576 &[u16],
1577 usize,
1578 &[u16],
1579 ) = if let Some(c) = cache {
1580 (
1581 &c.pk_indices,
1582 &c.non_pk_indices,
1583 &c.encoding_positions,
1584 c.phys_count,
1585 &c.dropped_non_pk_slots,
1586 )
1587 } else {
1588 (
1589 table_schema.pk_indices(),
1590 table_schema.non_pk_indices(),
1591 table_schema.encoding_positions(),
1592 table_schema.physical_non_pk_count(),
1593 table_schema.dropped_non_pk_slots(),
1594 )
1595 };
1596
1597 bufs.row.resize(table_schema.columns.len(), Value::Null);
1598 bufs.pk_values.resize(pk_indices.len(), Value::Null);
1599 bufs.value_values.resize(phys_count, Value::Null);
1600
1601 let table_bytes = table_schema.name.as_bytes();
1602 let has_fks = !table_schema.foreign_keys.is_empty();
1603 let has_indices = !table_schema.indices.is_empty();
1604 let has_defaults = !defaults.is_empty();
1605
1606 let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1607 (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1608 (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1609 (_, None) => None,
1610 };
1611
1612 let row_col_map_owned: Option<ColumnMap> =
1613 if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1614 Some(ColumnMap::new(&table_schema.columns))
1615 } else {
1616 None
1617 };
1618 let row_col_map: Option<&ColumnMap> = cache
1619 .and_then(|c| c.row_col_map.as_ref())
1620 .or(row_col_map_owned.as_ref());
1621
1622 let select_rows = match &stmt.source {
1623 InsertSource::Select(sq) => {
1624 let insert_ctes = super::materialize_all_ctes_with_outer(
1625 &sq.ctes,
1626 sq.recursive,
1627 outer_ctes,
1628 &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1629 )?;
1630 let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1631 Some(qr.rows)
1632 }
1633 InsertSource::Values(_) => None,
1634 };
1635
1636 let mut count: u64 = 0;
1637 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1638 stmt.returning.as_ref().map(|_| Vec::new());
1639
1640 let plain_insert = compiled_conflict.is_none();
1641 let single_int_pk = is_single_int_pk(table_schema);
1642 let mut min_inserted_pk: Option<i64> = None;
1643
1644 let values = match &stmt.source {
1645 InsertSource::Values(rows) => Some(rows.as_slice()),
1646 InsertSource::Select(_) => None,
1647 };
1648 let sel_rows = select_rows.as_deref();
1649
1650 let total = match (values, sel_rows) {
1651 (Some(rows), _) => rows.len(),
1652 (_, Some(rows)) => rows.len(),
1653 _ => 0,
1654 };
1655
1656 if let Some(sel) = sel_rows {
1657 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1658 return Err(SqlError::InvalidValue(format!(
1659 "INSERT ... SELECT column count mismatch: expected {}, got {}",
1660 insert_columns.len(),
1661 sel[0].len()
1662 )));
1663 }
1664 }
1665
1666 let has_insert_statement_triggers_impl =
1667 schema.triggers_for(&table_schema.name).iter().any(|t| {
1668 t.enabled
1669 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1670 && t.events
1671 .iter()
1672 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1673 });
1674 let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1675 Vec::with_capacity(total)
1676 } else {
1677 Vec::new()
1678 };
1679 if has_insert_statement_triggers_impl {
1680 super::triggers::fire_statement_triggers(
1681 wtx,
1682 schema,
1683 &table_schema.name,
1684 crate::parser::TriggerTiming::Before,
1685 super::triggers::FireEvent::Insert,
1686 &table_schema.columns,
1687 &[],
1688 &[],
1689 )?;
1690 }
1691
1692 let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1693 for idx in 0..total {
1694 if !skip_row_clear {
1695 for v in bufs.row.iter_mut() {
1696 *v = Value::Null;
1697 }
1698 }
1699
1700 if let Some(value_rows) = values {
1701 if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1702 for action in plan {
1703 match action {
1704 BindAction::Param {
1705 param_idx,
1706 col_idx,
1707 target,
1708 } => {
1709 let v = ¶ms[*param_idx];
1710 bufs.row[*col_idx] = if v.is_null() {
1711 Value::Null
1712 } else if v.data_type() == *target {
1713 v.clone()
1714 } else {
1715 let got = v.data_type();
1716 v.clone().coerce_into(*target).ok_or_else(|| {
1717 SqlError::TypeMismatch {
1718 expected: target.to_string(),
1719 got: got.to_string(),
1720 }
1721 })?
1722 };
1723 }
1724 BindAction::Literal { value, col_idx } => {
1725 bufs.row[*col_idx] = value.clone();
1726 }
1727 }
1728 }
1729 } else {
1730 let value_row = &value_rows[idx];
1731 if value_row.len() != insert_columns.len() {
1732 return Err(SqlError::InvalidValue(format!(
1733 "expected {} values, got {}",
1734 insert_columns.len(),
1735 value_row.len()
1736 )));
1737 }
1738 for (i, expr) in value_row.iter().enumerate() {
1739 let val = match expr {
1740 Expr::Parameter(n) => params
1741 .get(n - 1)
1742 .cloned()
1743 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1744 Expr::Literal(v) => v.clone(),
1745 _ => eval_const_expr(expr)?,
1746 };
1747 let col_idx = bufs.col_indices[i];
1748 let col = &table_schema.columns[col_idx];
1749 let got_type = val.data_type();
1750 bufs.row[col_idx] = if val.is_null() {
1751 Value::Null
1752 } else {
1753 val.coerce_into(col.data_type)
1754 .ok_or_else(|| SqlError::TypeMismatch {
1755 expected: col.data_type.to_string(),
1756 got: got_type.to_string(),
1757 })?
1758 };
1759 }
1760 }
1761 } else if let Some(sel) = sel_rows {
1762 let sel_row = &sel[idx];
1763 for (i, val) in sel_row.iter().enumerate() {
1764 let col_idx = bufs.col_indices[i];
1765 let col = &table_schema.columns[col_idx];
1766 let got_type = val.data_type();
1767 bufs.row[col_idx] = if val.is_null() {
1768 Value::Null
1769 } else {
1770 val.clone().coerce_into(col.data_type).ok_or_else(|| {
1771 SqlError::TypeMismatch {
1772 expected: col.data_type.to_string(),
1773 got: got_type.to_string(),
1774 }
1775 })?
1776 };
1777 }
1778 }
1779
1780 if has_defaults {
1781 for &(pos, def_expr) in &defaults {
1782 let val = eval_const_expr(def_expr)?;
1783 let col = &table_schema.columns[pos];
1784 if !val.is_null() {
1785 let got_type = val.data_type();
1786 bufs.row[pos] =
1787 val.coerce_into(col.data_type)
1788 .ok_or_else(|| SqlError::TypeMismatch {
1789 expected: col.data_type.to_string(),
1790 got: got_type.to_string(),
1791 })?;
1792 }
1793 }
1794 }
1795
1796 if let Some(gen_map) = row_col_map_for_gen {
1797 if cache.is_some() {
1798 for (pos, fast) in cached_gen_positions
1799 .iter()
1800 .copied()
1801 .zip(cached_gen_fast_evals.iter())
1802 {
1803 let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1804 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1805 let col = &table_schema.columns[pos];
1806 bufs.row[pos] = if val.is_null() {
1807 Value::Null
1808 } else {
1809 let got_type = val.data_type();
1810 val.coerce_into(col.data_type)
1811 .ok_or_else(|| SqlError::TypeMismatch {
1812 expected: col.data_type.to_string(),
1813 got: got_type.to_string(),
1814 })?
1815 };
1816 }
1817 } else {
1818 for (pos, gen_expr, fast) in &generated_cols_uncached {
1819 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1820 let col = &table_schema.columns[*pos];
1821 bufs.row[*pos] = if val.is_null() {
1822 Value::Null
1823 } else {
1824 let got_type = val.data_type();
1825 val.coerce_into(col.data_type)
1826 .ok_or_else(|| SqlError::TypeMismatch {
1827 expected: col.data_type.to_string(),
1828 got: got_type.to_string(),
1829 })?
1830 };
1831 }
1832 }
1833 }
1834
1835 if let Some(c) = cache {
1836 for &pos in &c.not_null_indices {
1837 if bufs.row[pos as usize].is_null() {
1838 return Err(SqlError::NotNullViolation(
1839 table_schema.columns[pos as usize].name.clone(),
1840 ));
1841 }
1842 }
1843 } else {
1844 for col in &table_schema.columns {
1845 if !col.nullable && bufs.row[col.position as usize].is_null() {
1846 return Err(SqlError::NotNullViolation(col.name.clone()));
1847 }
1848 }
1849 }
1850
1851 if let Some(ref col_map) = check_col_map {
1852 for col in &table_schema.columns {
1853 if let Some(ref check) = col.check_expr {
1854 let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1855 if !is_truthy(&result) && !result.is_null() {
1856 let name = col.check_name.as_deref().unwrap_or(&col.name);
1857 return Err(SqlError::CheckViolation(name.to_string()));
1858 }
1859 }
1860 }
1861 for tc in &table_schema.check_constraints {
1862 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1863 if !is_truthy(&result) && !result.is_null() {
1864 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1865 return Err(SqlError::CheckViolation(name.to_string()));
1866 }
1867 }
1868 }
1869
1870 if has_fks {
1871 for fk in &table_schema.foreign_keys {
1872 let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1873 if any_null {
1874 continue;
1875 }
1876 crate::encoding::encode_composite_key_from_indices(
1877 &fk.columns,
1878 &bufs.row,
1879 &mut bufs.fk_key_buf,
1880 );
1881 if fk.deferrable && fk.initially_deferred {
1882 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1883 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1884 fk_name: name,
1885 foreign_table: fk.foreign_table.as_bytes().to_vec(),
1886 parent_key: bufs.fk_key_buf.clone(),
1887 });
1888 continue;
1889 }
1890 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1891 let found = wtx
1892 .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1893 .map_err(SqlError::Storage)?;
1894 if found.is_none() {
1895 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1896 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1897 }
1898 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1899 }
1900 }
1901 }
1902
1903 let proposed_row_for_returning: Option<Vec<Value>> =
1904 returning_rows.as_ref().map(|_| bufs.row.clone());
1905 let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1906 Some(bufs.row.clone())
1907 } else {
1908 None
1909 };
1910
1911 let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
1912 t.enabled
1913 && t.timing == crate::parser::TriggerTiming::Before
1914 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1915 && t.events
1916 .iter()
1917 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1918 });
1919 if has_before_insert_triggers {
1920 super::triggers::fire_row_triggers(
1921 wtx,
1922 schema,
1923 &table_schema.name,
1924 crate::parser::TriggerTiming::Before,
1925 super::triggers::FireEvent::Insert,
1926 None,
1927 Some(bufs.row.clone()),
1928 &table_schema.columns,
1929 )?;
1930 }
1931
1932 for (j, &i) in pk_indices.iter().enumerate() {
1933 bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1934 }
1935 match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1936 true => match bufs.pk_values[0] {
1937 Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1938 _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1939 },
1940 false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1941 }
1942 if plain_insert && single_int_pk {
1943 if let Value::Integer(id) = &bufs.pk_values[0] {
1944 min_inserted_pk = Some(min_inserted_pk.map_or(*id, |m| m.min(*id)));
1945 }
1946 }
1947
1948 for &slot in dropped {
1949 bufs.value_values[slot as usize] = Value::Null;
1950 }
1951 for (j, &i) in non_pk.iter().enumerate() {
1952 let col = &table_schema.columns[i];
1953 if matches!(
1954 col.generated_kind,
1955 Some(crate::parser::GeneratedKind::Virtual)
1956 ) {
1957 bufs.value_values[enc_pos[j] as usize] = Value::Null;
1958 bufs.row[i] = Value::Null;
1959 } else {
1960 bufs.value_values[enc_pos[j] as usize] =
1961 std::mem::replace(&mut bufs.row[i], Value::Null);
1962 }
1963 }
1964 match cache.and_then(|c| c.row_encoder.as_ref()) {
1965 Some(tmpl) => crate::encoding::encode_int_row_with_template(
1966 tmpl,
1967 &bufs.value_values,
1968 &mut bufs.value_buf,
1969 )?,
1970 None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1971 }
1972
1973 if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1974 return Err(SqlError::KeyTooLarge {
1975 size: bufs.key_buf.len(),
1976 max: citadel_core::MAX_KEY_SIZE,
1977 });
1978 }
1979 if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1980 return Err(SqlError::RowTooLarge {
1981 size: bufs.value_buf.len(),
1982 max: citadel_core::MAX_VALUE_SIZE,
1983 });
1984 }
1985
1986 match compiled_conflict.as_ref() {
1987 None => {
1988 let is_new = wtx
1989 .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1990 .map_err(SqlError::Storage)?;
1991 if !is_new {
1992 return Err(SqlError::DuplicateKey);
1993 }
1994 let has_after_insert_triggers =
1995 schema.triggers_for(&table_schema.name).iter().any(|t| {
1996 t.enabled
1997 && t.timing == crate::parser::TriggerTiming::After
1998 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1999 && t.events
2000 .iter()
2001 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
2002 });
2003 if has_indices || has_after_insert_triggers {
2004 for (j, &i) in pk_indices.iter().enumerate() {
2005 bufs.row[i] = bufs.pk_values[j].clone();
2006 }
2007 for (j, &i) in non_pk.iter().enumerate() {
2008 bufs.row[i] = std::mem::replace(
2009 &mut bufs.value_values[enc_pos[j] as usize],
2010 Value::Null,
2011 );
2012 }
2013 if has_indices {
2014 insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
2015 }
2016 if has_after_insert_triggers {
2017 super::triggers::fire_row_triggers(
2018 wtx,
2019 schema,
2020 &table_schema.name,
2021 crate::parser::TriggerTiming::After,
2022 super::triggers::FireEvent::Insert,
2023 None,
2024 Some(bufs.row.clone()),
2025 &table_schema.columns,
2026 )?;
2027 }
2028 }
2029 if let Some(r) = row_for_stmt_trigger_impl.clone() {
2030 stmt_new_rows_impl.push(r);
2031 }
2032 count += 1;
2033 if let Some(buf) = returning_rows.as_mut() {
2034 buf.push((None, proposed_row_for_returning));
2035 }
2036 }
2037 Some(oc) => {
2038 let oc_ref: &CompiledOnConflict = oc;
2039 let needs_row = upsert_needs_row(oc_ref, table_schema);
2040 if needs_row {
2041 for (j, &i) in pk_indices.iter().enumerate() {
2042 bufs.row[i] = bufs.pk_values[j].clone();
2043 }
2044 for (j, &i) in non_pk.iter().enumerate() {
2045 bufs.row[i] = std::mem::replace(
2046 &mut bufs.value_values[enc_pos[j] as usize],
2047 Value::Null,
2048 );
2049 }
2050 }
2051 let outcome = apply_insert_with_conflict(
2052 wtx,
2053 table_schema,
2054 &bufs.key_buf,
2055 &bufs.value_buf,
2056 &bufs.row,
2057 &bufs.pk_values,
2058 oc_ref,
2059 row_col_map.unwrap(),
2060 stmt.returning.is_some(),
2061 )?;
2062 match outcome {
2063 InsertRowOutcome::Inserted => {
2064 count += 1;
2065 if let Some(buf) = returning_rows.as_mut() {
2066 buf.push((None, proposed_row_for_returning));
2067 }
2068 if let Some(r) = row_for_stmt_trigger_impl.clone() {
2069 stmt_new_rows_impl.push(r);
2070 }
2071 let has_after_insert_triggers =
2072 schema.triggers_for(&table_schema.name).iter().any(|t| {
2073 t.enabled
2074 && t.timing == crate::parser::TriggerTiming::After
2075 && t.granularity
2076 == crate::parser::TriggerGranularity::ForEachRow
2077 && t.events
2078 .iter()
2079 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
2080 });
2081 if has_after_insert_triggers {
2082 super::triggers::fire_row_triggers(
2083 wtx,
2084 schema,
2085 &table_schema.name,
2086 crate::parser::TriggerTiming::After,
2087 super::triggers::FireEvent::Insert,
2088 None,
2089 Some(bufs.row.clone()),
2090 &table_schema.columns,
2091 )?;
2092 }
2093 }
2094 InsertRowOutcome::Updated { old, new } => {
2095 count += 1;
2096 if let Some(buf) = returning_rows.as_mut() {
2097 buf.push((Some(old.clone()), Some(new.clone())));
2098 }
2099 let has_after_update_triggers =
2100 schema.triggers_for(&table_schema.name).iter().any(|t| {
2101 t.enabled
2102 && t.timing == crate::parser::TriggerTiming::After
2103 && t.granularity
2104 == crate::parser::TriggerGranularity::ForEachRow
2105 && t.events.iter().any(|e| {
2106 matches!(e, crate::parser::TriggerEvent::Update(_))
2107 })
2108 });
2109 if has_after_update_triggers {
2110 let changed_cols: Vec<String> = match oc_ref {
2111 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2112 .iter()
2113 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2114 .collect(),
2115 _ => Vec::new(),
2116 };
2117 super::triggers::fire_row_triggers(
2118 wtx,
2119 schema,
2120 &table_schema.name,
2121 crate::parser::TriggerTiming::After,
2122 super::triggers::FireEvent::Update {
2123 changed_columns: &changed_cols,
2124 },
2125 Some(old),
2126 Some(new),
2127 &table_schema.columns,
2128 )?;
2129 }
2130 }
2131 InsertRowOutcome::Skipped => {}
2132 }
2133 }
2134 }
2135 }
2136
2137 mark_insert_dml(
2138 schema,
2139 &table_schema.name,
2140 !plain_insert,
2141 single_int_pk,
2142 min_inserted_pk,
2143 count,
2144 );
2145
2146 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2147 if has_insert_statement_triggers_impl {
2148 super::triggers::fire_statement_triggers(
2149 wtx,
2150 schema,
2151 &table_schema.name,
2152 crate::parser::TriggerTiming::After,
2153 super::triggers::FireEvent::Insert,
2154 &table_schema.columns,
2155 &[],
2156 &stmt_new_rows_impl,
2157 )?;
2158 }
2159 return Ok(ExecutionResult::Query(super::helpers::project_returning(
2160 table_schema,
2161 returning_cols,
2162 &rows,
2163 )?));
2164 }
2165
2166 if has_insert_statement_triggers_impl {
2167 super::triggers::fire_statement_triggers(
2168 wtx,
2169 schema,
2170 &table_schema.name,
2171 crate::parser::TriggerTiming::After,
2172 super::triggers::FireEvent::Insert,
2173 &table_schema.columns,
2174 &[],
2175 &stmt_new_rows_impl,
2176 )?;
2177 }
2178
2179 Ok(ExecutionResult::RowsAffected(count))
2180}
2181
/// A prepared INSERT statement.
///
/// NOTE(review): the constructor and execution code are outside this part of
/// the file; the field notes below are inferred from names and types only.
pub struct CompiledInsert {
    // Presumably the lower-cased name of the target table — confirm.
    table_lower: String,
    // Precomputed insert metadata; `None` presumably means the statement is
    // executed through the uncached path — confirm.
    cached: Option<InsertCache>,
}
2186
/// Precomputed, per-statement metadata that lets `exec_insert_in_txn_impl`
/// skip schema analysis on every execution.
struct InsertCache {
    // Table column index for each target column; copied into the scratch
    // buffers instead of resolving column names.
    col_indices: Vec<usize>,
    // Used in place of calling `insert_has_subquery` on the statement.
    has_subquery: bool,
    // Used in place of scanning the columns for a default expression.
    any_defaults: bool,
    // Used in place of `table_schema.has_checks()`.
    has_checks: bool,
    // Compiled ON CONFLICT clause, reused when the statement has one.
    on_conflict: Option<Arc<CompiledOnConflict>>,
    // Column map used for generated-column evaluation and ON CONFLICT handling.
    row_col_map: Option<ColumnMap>,
    // Positions of the generated columns to compute; paired element-wise with
    // `generated_fast_evals`.
    generated_col_positions: Vec<usize>,
    // Evaluation strategy for each entry of `generated_col_positions`.
    generated_fast_evals: Vec<FastGenEval>,
    // Used in place of `table_schema.pk_indices()`.
    pk_indices: Vec<usize>,
    // Used in place of `table_schema.non_pk_indices()`.
    non_pk_indices: Vec<usize>,
    // Used in place of `table_schema.encoding_positions()`.
    encoding_positions: Vec<u16>,
    // Used in place of `table_schema.dropped_non_pk_slots()`; these value
    // slots are reset to NULL for every row.
    dropped_non_pk_slots: Vec<u16>,
    // Used in place of `table_schema.physical_non_pk_count()`; the value
    // buffer is resized to this length.
    phys_count: usize,
    // When true and the key value is an integer, the key is encoded with
    // `encode_int_key_into` rather than the composite encoder.
    single_int_pk: bool,
    // Column positions that must not be NULL.
    not_null_indices: Vec<u16>,
    // Precomputed binding steps for a VALUES source; when present they are
    // applied instead of evaluating the VALUES expressions.
    bind_plan: Option<Vec<BindAction>>,
    // When true the row buffer is not reset to NULL before each row.
    row_fully_overwritten: bool,
    // When present, rows are encoded with `encode_int_row_with_template`.
    row_encoder: Option<crate::encoding::IntRowTemplate>,
    // NOTE(review): not read in this part of the file — presumably gates the
    // `TrivialFastProgram` path; confirm at the use site.
    is_trivial_fast: bool,
    // NOTE(review): not read in this part of the file; see `TrivialFastProgram`.
    trivial_fast_program: Option<TrivialFastProgram>,
    // NOTE(review): not read in this part of the file; meaning to be confirmed.
    needs_scoped_params: bool,
}
2210
/// One step of a cached bind plan: how a single row-buffer slot gets its value.
#[derive(Clone)]
enum BindAction {
    /// Take the statement parameter at `param_idx` and store it in row slot
    /// `col_idx`. NULL is stored as NULL, a value that already has type
    /// `target` is cloned as is, and anything else is coerced to `target`.
    Param {
        param_idx: usize,
        col_idx: usize,
        target: DataType,
    },
    /// Store a clone of `value` in row slot `col_idx`.
    Literal {
        value: Value,
        col_idx: usize,
    },
}
2223
/// Output of `build_trivial_fast_program`.
///
/// NOTE(review): both the final construction and the executor of this program
/// are outside this part of the file; the field notes are partly inferred.
#[derive(Clone)]
struct TrivialFastProgram {
    // Presumably a pre-encoded row image that `ops` patch in place — confirm.
    template: Vec<u8>,
    // Write operations collected from the bind plan and generated columns.
    ops: Vec<WriteOp>,
    // Index of the parameter bound to the primary-key column.
    pk_param: u8,
    // Parameters bound to non-nullable, non-key columns.
    not_null_param_indices: Vec<u8>,
}
2231
/// A single write step of a `TrivialFastProgram`.
///
/// `off` is taken from the row encoder's slot-offset table for the target
/// column. NOTE(review): the code that executes these operations is outside
/// this part of the file, so the exact write semantics are to be confirmed.
#[derive(Clone)]
enum WriteOp {
    /// Emitted for an integer-typed parameter bound to a non-key column.
    ParamI64 {
        param_idx: u8,
        off: u32,
    },
    /// Emitted for an integer literal bound to a non-key column.
    LiteralI64 {
        value: i64,
        off: u32,
    },
    /// Presumably writes `params[a_param] + params[b_param]` for a generated
    /// column and updates the bit `bitmap_bit_mask` in the byte at
    /// `bitmap_byte_off` — confirm against the executor.
    GenAddParamsI64 {
        a_param: u8,
        b_param: u8,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
    /// Presumably writes `params[param_idx] * mul + add` for a generated
    /// column and updates the bit `bitmap_bit_mask` in the byte at
    /// `bitmap_byte_off` — confirm against the executor.
    GenMulAddParamI64 {
        param_idx: u8,
        mul: i64,
        add: i64,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
}
2258
2259fn build_trivial_fast_program(
2260 bind_plan: &[BindAction],
2261 row_encoder: &crate::encoding::IntRowTemplate,
2262 non_virtual_pairs: &[(usize, usize)],
2263 generated_col_positions: &[usize],
2264 generated_fast_evals: &[FastGenEval],
2265 pk_indices: &[usize],
2266 columns: &[crate::types::ColumnDef],
2267) -> Option<TrivialFastProgram> {
2268 let pk_col = pk_indices[0];
2269 let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2270 non_virtual_pairs.iter().copied().collect();
2271 let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2272 row_encoder.slot_offsets.iter().copied().collect();
2273
2274 let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2275 let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2276 let mut pk_param: Option<u8> = None;
2277 let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2278 let mut not_null_param_indices: Vec<u8> = Vec::new();
2279
2280 for action in bind_plan {
2281 match action {
2282 BindAction::Param {
2283 param_idx,
2284 col_idx,
2285 target,
2286 } => {
2287 if *target != DataType::Integer {
2288 return None;
2289 }
2290 let pi: u8 = u8::try_from(*param_idx).ok()?;
2291 col_to_param.insert(*col_idx, pi);
2292 if *col_idx == pk_col {
2293 pk_param = Some(pi);
2294 } else {
2295 let slot = *col_to_slot.get(col_idx)?;
2296 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2297 ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2298 if !columns[*col_idx].nullable {
2299 not_null_param_indices.push(pi);
2300 }
2301 }
2302 }
2303 BindAction::Literal { value, col_idx } => match value {
2304 Value::Integer(v) => {
2305 col_to_lit_int.insert(*col_idx, *v);
2306 if *col_idx == pk_col {
2307 return None;
2308 }
2309 let slot = *col_to_slot.get(col_idx)?;
2310 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2311 ops.push(WriteOp::LiteralI64 { value: *v, off });
2312 }
2313 _ => return None,
2314 },
2315 }
2316 }
2317
2318 let pk_param = pk_param?;
2319
2320 for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2321 let gen_slot = *col_to_slot.get(&gen_pos)?;
2322 let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2323 let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
2324 let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
2325 let gen_col_nullable = columns[gen_pos].nullable;
2326
2327 match &generated_fast_evals[i] {
2328 FastGenEval::IntColAddCol {
2329 left_idx,
2330 right_idx,
2331 } => {
2332 let a_param = col_to_param.get(left_idx).copied();
2333 let b_param = col_to_param.get(right_idx).copied();
2334 match (a_param, b_param) {
2335 (Some(ap), Some(bp)) => {
2336 let deps_safe = gen_col_nullable
2337 || (not_null_param_indices.contains(&ap)
2338 && not_null_param_indices.contains(&bp));
2339 if !deps_safe {
2340 return None;
2341 }
2342 ops.push(WriteOp::GenAddParamsI64 {
2343 a_param: ap,
2344 b_param: bp,
2345 off: gen_off,
2346 bitmap_byte_off,
2347 bitmap_bit_mask,
2348 });
2349 }
2350 (Some(p), None) => {
2351 let lit = col_to_lit_int.get(right_idx).copied()?;
2352 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2353 return None;
2354 }
2355 ops.push(WriteOp::GenMulAddParamI64 {
2356 param_idx: p,
2357 mul: 1,
2358 add: lit,
2359 off: gen_off,
2360 bitmap_byte_off,
2361 bitmap_bit_mask,
2362 });
2363 }
2364 (None, Some(p)) => {
2365 let lit = col_to_lit_int.get(left_idx).copied()?;
2366 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2367 return None;
2368 }
2369 ops.push(WriteOp::GenMulAddParamI64 {
2370 param_idx: p,
2371 mul: 1,
2372 add: lit,
2373 off: gen_off,
2374 bitmap_byte_off,
2375 bitmap_bit_mask,
2376 });
2377 }
2378 (None, None) => {
2379 let la = col_to_lit_int.get(left_idx).copied()?;
2380 let lb = col_to_lit_int.get(right_idx).copied()?;
2381 ops.push(WriteOp::LiteralI64 {
2382 value: la.wrapping_add(lb),
2383 off: gen_off,
2384 });
2385 }
2386 }
2387 }
2388 FastGenEval::IntColMulAdd {
2389 col_schema_idx,
2390 mul,
2391 add,
2392 } => {
2393 if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2394 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2395 return None;
2396 }
2397 ops.push(WriteOp::GenMulAddParamI64 {
2398 param_idx: p,
2399 mul: *mul,
2400 add: *add,
2401 off: gen_off,
2402 bitmap_byte_off,
2403 bitmap_bit_mask,
2404 });
2405 } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2406 ops.push(WriteOp::LiteralI64 {
2407 value: lit.wrapping_mul(*mul).wrapping_add(*add),
2408 off: gen_off,
2409 });
2410 } else {
2411 return None;
2412 }
2413 }
2414 FastGenEval::None => return None,
2415 }
2416 }
2417
2418 Some(TrivialFastProgram {
2419 template: row_encoder.template.clone(),
2420 ops,
2421 pk_param,
2422 not_null_param_indices,
2423 })
2424}
2425
/// An `ON CONFLICT` clause resolved against a table schema.
#[derive(Clone)]
pub(super) enum CompiledOnConflict {
    /// `DO NOTHING`: a conflicting row is skipped.
    DoNothing {
        /// The constraint the clause applies to; `None` means any conflict
        /// (primary key or unique index) is skipped.
        target: Option<ConflictKind>,
    },
    /// `DO UPDATE SET ...`: a conflicting row is updated in place.
    DoUpdate {
        /// The constraint whose violation triggers the update.
        target: ConflictKind,
        /// `(schema column index, expression)` pairs from the `SET` list.
        assignments: Vec<(usize, Expr)>,
        /// Optional `WHERE` filter; the row is skipped unless it evaluates truthy.
        where_clause: Option<Expr>,
        /// Byte-level patch plan, present only when there is no `WHERE` clause
        /// and every assignment was recognised by `detect_fast_paths`.
        fast_paths: Option<Vec<DoUpdateFastPath>>,
    },
}
2438
/// A `DO UPDATE` assignment that can be applied by patching the encoded row.
#[derive(Clone, Copy)]
pub(super) enum DoUpdateFastPath {
    /// `col = col + delta` on an integer column: `phys_idx` is the column's
    /// position in the encoded row and `delta` is added with wrapping
    /// arithmetic. A NULL stored value stays NULL.
    IntAddConst { phys_idx: usize, delta: i64 },
}
2443
/// The uniqueness constraint an `ON CONFLICT` target resolves to.
#[derive(Clone, Debug)]
pub(super) enum ConflictKind {
    /// The table's primary key.
    PrimaryKey,
    /// A unique index; `index_idx` is its position in the table's `indices` list.
    UniqueIndex { index_idx: usize },
}
2449
2450fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2451 match target {
2452 ConflictTarget::Columns(cols) => {
2453 let col_idx_set: Vec<u16> = cols
2454 .iter()
2455 .map(|name| {
2456 ts.column_index(name)
2457 .map(|i| i as u16)
2458 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2459 })
2460 .collect::<Result<_>>()?;
2461 let pk_set = ts.primary_key_columns.clone();
2462 if set_equal(&col_idx_set, &pk_set) {
2463 return Ok(ConflictKind::PrimaryKey);
2464 }
2465 for (index_idx, idx) in ts.indices.iter().enumerate() {
2466 if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2467 return Ok(ConflictKind::UniqueIndex { index_idx });
2468 }
2469 }
2470 Err(SqlError::Plan(
2471 "ON CONFLICT target does not match any unique constraint".into(),
2472 ))
2473 }
2474 ConflictTarget::Constraint(name) => {
2475 let lower = name.to_ascii_lowercase();
2476 for (index_idx, idx) in ts.indices.iter().enumerate() {
2477 if idx.name.eq_ignore_ascii_case(&lower) {
2478 if idx.unique {
2479 return Ok(ConflictKind::UniqueIndex { index_idx });
2480 }
2481 return Err(SqlError::Plan(format!(
2482 "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2483 )));
2484 }
2485 }
2486 Err(SqlError::Plan(format!(
2487 "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2488 )))
2489 }
2490 }
2491}
2492
/// Returns `true` when `a` and `b` hold the same values with the same
/// multiplicities, regardless of order.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    // Sorted copy of a slice; the inputs themselves are left untouched.
    let sorted = |items: &[u16]| -> Vec<u16> {
        let mut copy = items.to_vec();
        copy.sort_unstable();
        copy
    };
    a.len() == b.len() && sorted(a) == sorted(b)
}
2503
/// Result of inserting one row under an `ON CONFLICT` policy.
pub(super) enum InsertRowOutcome {
    /// The row was written. The `DO UPDATE` paths in this file also return
    /// this variant for an applied update when the caller did not ask for
    /// captured rows.
    Inserted,
    /// An existing row was updated; `old` and `new` are the full rows before
    /// and after the update.
    Updated { old: Vec<Value>, new: Vec<Value> },
    /// Nothing was written (`DO NOTHING`, or the `DO UPDATE ... WHERE` filter rejected the row).
    Skipped,
}
2509
/// Inserts one row, resolving key collisions according to `on_conflict`.
///
/// * `key_buf` / `value_buf` are the already-encoded primary key and row value.
/// * `row` is the full proposed row; `pk_values` are its primary-key values.
/// * `capture_returning` asks `DO UPDATE` paths to return the old and new rows.
///
/// # Errors
/// * `SqlError::DuplicateKey` if the primary key collides and the clause does
///   not target the primary key.
/// * `SqlError::UniqueViolation` if a unique index collides and the clause does
///   not target that index.
/// * Storage errors and any error raised by the `DO UPDATE` evaluation.
#[allow(clippy::too_many_arguments)]
#[inline]
pub(super) fn apply_insert_with_conflict(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    key_buf: &[u8],
    value_buf: &[u8],
    row: &[Value],
    pk_values: &[Value],
    on_conflict: &CompiledOnConflict,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    let table_bytes = table_schema.name.as_bytes();

    // Shortcut: DO NOTHING on the primary key of a table with no indexes and
    // no foreign keys is a single insert-if-absent.
    if let CompiledOnConflict::DoNothing { target } = on_conflict {
        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
            let inserted = wtx
                .table_insert_if_absent(table_bytes, key_buf, value_buf)
                .map_err(SqlError::Storage)?;
            return Ok(if inserted {
                InsertRowOutcome::Inserted
            } else {
                InsertRowOutcome::Skipped
            });
        }
    }

    // Shortcut: DO UPDATE on the primary key that qualifies for the fused
    // upsert (see `can_fuse_do_update`).
    if let CompiledOnConflict::DoUpdate {
        target: ConflictKind::PrimaryKey,
        assignments,
        where_clause,
        fast_paths,
    } = on_conflict
    {
        if can_fuse_do_update(table_schema, assignments) {
            return apply_do_update_fused(
                wtx,
                table_schema,
                table_bytes,
                key_buf,
                value_buf,
                row,
                assignments,
                where_clause.as_ref(),
                col_map,
                fast_paths.as_deref(),
                capture_returning,
            );
        }
    }

    // General path: insert the primary row, or get back the existing bytes.
    let primary_outcome = wtx
        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
        .map_err(SqlError::Storage)?;

    match primary_outcome {
        citadel_txn::write_txn::InsertOutcome::Inserted => {
            if table_schema.indices.is_empty() {
                return Ok(InsertRowOutcome::Inserted);
            }
            // Index entries written so far, kept so they can be undone if a
            // later index reports a collision.
            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
            match insert_index_entries_or_fetch(
                wtx,
                table_schema,
                row,
                pk_values,
                &mut inserted_keys,
            )? {
                None => Ok(InsertRowOutcome::Inserted),
                Some(conflicting_idx) => {
                    // The clause covers this collision if it has no target
                    // (DO NOTHING only) or names exactly the colliding index.
                    let matches_target =
                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
                            || matches!(
                                on_conflict,
                                CompiledOnConflict::DoNothing {
                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
                                } | CompiledOnConflict::DoUpdate {
                                    target: ConflictKind::UniqueIndex { index_idx },
                                    ..
                                } if *index_idx == conflicting_idx
                            );
                    // The primary row and partial index entries are rolled
                    // back in every case before deciding what to do next.
                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
                    if !matches_target {
                        return Err(SqlError::UniqueViolation(
                            table_schema.indices[conflicting_idx].name.clone(),
                        ));
                    }
                    match on_conflict {
                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
                        CompiledOnConflict::DoUpdate {
                            assignments,
                            where_clause,
                            ..
                        } => {
                            // Look up the row that owns the colliding index
                            // entry and update that row instead.
                            let existing_pk =
                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
                            apply_do_update(
                                wtx,
                                table_schema,
                                &existing_pk,
                                row,
                                assignments,
                                where_clause.as_ref(),
                                col_map,
                                capture_returning,
                            )
                        }
                    }
                }
            }
        }
        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
            // Primary-key collision: only clauses with no target or a
            // primary-key target handle it.
            let matches_target = matches!(
                on_conflict,
                CompiledOnConflict::DoNothing { target: None }
                    | CompiledOnConflict::DoNothing {
                        target: Some(ConflictKind::PrimaryKey),
                    }
                    | CompiledOnConflict::DoUpdate {
                        target: ConflictKind::PrimaryKey,
                        ..
                    }
            );
            if !matches_target {
                return Err(SqlError::DuplicateKey);
            }
            match on_conflict {
                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
                CompiledOnConflict::DoUpdate {
                    assignments,
                    where_clause,
                    ..
                } => {
                    // The existing bytes are already in hand, so decode them
                    // rather than reading the row again.
                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
                    apply_do_update_with_old_row(
                        wtx,
                        table_schema,
                        key_buf,
                        &old_row,
                        row,
                        assignments,
                        where_clause.as_ref(),
                        col_map,
                        capture_returning,
                    )
                }
            }
        }
    }
}
2662
2663#[inline]
2664fn apply_fast_path_patch(
2665 old_bytes: &[u8],
2666 fast_paths: &[DoUpdateFastPath],
2667) -> Result<UpsertAction> {
2668 UPSERT_SCRATCH.with(|slot| {
2669 let mut bufs = slot.borrow_mut();
2670 bufs.new_value_buf.clear();
2671 bufs.new_value_buf.extend_from_slice(old_bytes);
2672
2673 let mut patch_scratch: Vec<u8> = Vec::new();
2674
2675 for fp in fast_paths {
2676 match fp {
2677 DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2678 let decoded =
2679 crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2680 let old_val = &decoded[0];
2681 let new_val = match old_val {
2682 Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2683 Value::Null => Value::Null,
2684 _ => {
2685 return Err(SqlError::TypeMismatch {
2686 expected: "INTEGER".into(),
2687 got: old_val.data_type().to_string(),
2688 });
2689 }
2690 };
2691 if !crate::encoding::patch_column_in_place(
2692 &mut bufs.new_value_buf,
2693 *phys_idx,
2694 &new_val,
2695 )? {
2696 patch_scratch.clear();
2697 crate::encoding::patch_row_column(
2698 &bufs.new_value_buf,
2699 *phys_idx,
2700 &new_val,
2701 &mut patch_scratch,
2702 )?;
2703 std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2704 }
2705 }
2706 }
2707 }
2708
2709 if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2710 return Err(SqlError::RowTooLarge {
2711 size: bufs.new_value_buf.len(),
2712 max: citadel_core::MAX_VALUE_SIZE,
2713 });
2714 }
2715
2716 Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2717 })
2718}
2719
2720fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2721 if !ts.indices.is_empty() {
2722 return true;
2723 }
2724 match oc {
2725 CompiledOnConflict::DoNothing { .. } => false,
2726 CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2727 }
2728}
2729
2730fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2731 if !ts.indices.is_empty() {
2732 return false;
2733 }
2734 if !ts.foreign_keys.is_empty() {
2735 return false;
2736 }
2737 if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2738 return false;
2739 }
2740 let pk = ts.pk_indices();
2741 !assignments.iter().any(|(ci, _)| pk.contains(ci))
2742}
2743
2744#[allow(clippy::too_many_arguments)]
2745#[inline]
2746fn apply_do_update_fused(
2747 wtx: &mut WriteTxn<'_>,
2748 table_schema: &TableSchema,
2749 table_bytes: &[u8],
2750 key_buf: &[u8],
2751 value_buf: &[u8],
2752 proposed_row: &[Value],
2753 assignments: &[(usize, Expr)],
2754 where_clause: Option<&Expr>,
2755 col_map: &ColumnMap,
2756 fast_paths: Option<&[DoUpdateFastPath]>,
2757 capture_returning: bool,
2758) -> Result<InsertRowOutcome> {
2759 let non_pk = table_schema.non_pk_indices();
2760 let enc_pos = table_schema.encoding_positions();
2761 let phys_count = table_schema.physical_non_pk_count();
2762 let dropped = table_schema.dropped_non_pk_slots();
2763 let has_checks = table_schema.has_checks();
2764 let has_fks = !table_schema.foreign_keys.is_empty();
2765
2766 let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2767 std::cell::RefCell::new(None);
2768
2769 let outcome =
2770 wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2771 if let Some(fps) = fast_paths {
2772 if !has_checks {
2773 let action = apply_fast_path_patch(old_bytes, fps)?;
2774 if capture_returning {
2775 if let UpsertAction::Replace(ref new_bytes) = action {
2776 let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2777 let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2778 *captured.borrow_mut() = Some((old_row, new_row));
2779 }
2780 }
2781 return Ok(action);
2782 }
2783 }
2784 UPSERT_SCRATCH.with(|slot| {
2785 let mut bufs = slot.borrow_mut();
2786 let UpsertBufs {
2787 old_row,
2788 new_row,
2789 value_values,
2790 new_value_buf,
2791 } = &mut *bufs;
2792
2793 old_row.clear();
2794 old_row.resize(table_schema.columns.len(), Value::Null);
2795 decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2796
2797 if let Some(w) = where_clause {
2798 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2799 let result = eval_expr(w, &ctx)?;
2800 if result.is_null() || !is_truthy(&result) {
2801 return Ok(UpsertAction::Skip);
2802 }
2803 }
2804
2805 new_row.clear();
2806 new_row.extend_from_slice(old_row);
2807 for (col_idx, expr) in assignments {
2808 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2809 let val = eval_expr(expr, &ctx)?;
2810 let col = &table_schema.columns[*col_idx];
2811 new_row[*col_idx] = if val.is_null() {
2812 Value::Null
2813 } else {
2814 let got = val.data_type();
2815 val.coerce_into(col.data_type)
2816 .ok_or_else(|| SqlError::TypeMismatch {
2817 expected: col.data_type.to_string(),
2818 got: got.to_string(),
2819 })?
2820 };
2821 }
2822
2823 for (assigned_idx, _) in assignments {
2824 let col = &table_schema.columns[*assigned_idx];
2825 if !col.nullable && new_row[col.position as usize].is_null() {
2826 return Err(SqlError::NotNullViolation(col.name.clone()));
2827 }
2828 }
2829 if has_checks {
2830 for col in &table_schema.columns {
2831 if let Some(ref check) = col.check_expr {
2832 let ctx = EvalCtx::new(col_map, new_row);
2833 let result = eval_expr(check, &ctx)?;
2834 if !is_truthy(&result) && !result.is_null() {
2835 let name = col.check_name.as_deref().unwrap_or(&col.name);
2836 return Err(SqlError::CheckViolation(name.to_string()));
2837 }
2838 }
2839 }
2840 for tc in &table_schema.check_constraints {
2841 let ctx = EvalCtx::new(col_map, new_row);
2842 let result = eval_expr(&tc.expr, &ctx)?;
2843 if !is_truthy(&result) && !result.is_null() {
2844 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2845 return Err(SqlError::CheckViolation(name.to_string()));
2846 }
2847 }
2848 }
2849 let _ = has_fks;
2850
2851 value_values.clear();
2852 value_values.resize(phys_count, Value::Null);
2853 for &slot in dropped {
2854 value_values[slot as usize] = Value::Null;
2855 }
2856 for (j, &i) in non_pk.iter().enumerate() {
2857 value_values[enc_pos[j] as usize] = new_row[i].clone();
2858 }
2859 new_value_buf.clear();
2860 crate::encoding::encode_row_into(value_values, new_value_buf);
2861
2862 if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2863 return Err(SqlError::RowTooLarge {
2864 size: new_value_buf.len(),
2865 max: citadel_core::MAX_VALUE_SIZE,
2866 });
2867 }
2868
2869 if capture_returning {
2870 *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2871 }
2872 Ok(UpsertAction::Replace(new_value_buf.clone()))
2873 })
2874 })?;
2875
2876 match outcome {
2877 UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2878 UpsertOutcome::Updated => {
2879 if capture_returning {
2880 let (old, new) = captured.into_inner().ok_or_else(|| {
2881 SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2882 })?;
2883 Ok(InsertRowOutcome::Updated { old, new })
2884 } else {
2885 Ok(InsertRowOutcome::Inserted)
2886 }
2887 }
2888 UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2889 }
2890}
2891
2892fn fetch_unique_index_pk(
2893 wtx: &mut WriteTxn<'_>,
2894 table_schema: &TableSchema,
2895 index_idx: usize,
2896 row: &[Value],
2897) -> Result<Vec<u8>> {
2898 let idx = &table_schema.indices[index_idx];
2899 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2900 let indexed: Vec<Value> = idx
2901 .column_positions_iter()
2902 .map(|col_idx| row[col_idx as usize].clone())
2903 .collect();
2904 let key = crate::encoding::encode_composite_key(&indexed);
2905 let value = wtx
2906 .table_get(&idx_table, &key)
2907 .map_err(SqlError::Storage)?
2908 .ok_or_else(|| {
2909 SqlError::InvalidValue("unique index missing expected collision entry".into())
2910 })?;
2911 Ok(value)
2912}
2913
2914#[allow(clippy::too_many_arguments)]
2915fn apply_do_update(
2916 wtx: &mut WriteTxn<'_>,
2917 table_schema: &TableSchema,
2918 pk_key: &[u8],
2919 proposed_row: &[Value],
2920 assignments: &[(usize, Expr)],
2921 where_clause: Option<&Expr>,
2922 col_map: &ColumnMap,
2923 capture_returning: bool,
2924) -> Result<InsertRowOutcome> {
2925 let old_value = wtx
2926 .table_get(table_schema.name.as_bytes(), pk_key)
2927 .map_err(SqlError::Storage)?
2928 .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2929 let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2930 apply_do_update_with_old_row(
2931 wtx,
2932 table_schema,
2933 pk_key,
2934 &old_row,
2935 proposed_row,
2936 assignments,
2937 where_clause,
2938 col_map,
2939 capture_returning,
2940 )
2941}
2942
/// Applies `DO UPDATE` assignments to an already-decoded existing row and
/// writes the result to the table and its indexes.
///
/// Steps, in order: WHERE filter, assignments, stored generated columns,
/// NOT NULL and CHECK validation, foreign-key checks, row encoding, then the
/// storage writes (with a separate path when the primary key changed).
///
/// * `old_pk_key` is the encoded key of the existing row.
/// * `proposed_row` is the row the INSERT tried to write (the `excluded` row).
///
/// Returns `Skipped` when the WHERE filter rejects the row, `Updated` when
/// `capture_returning` is set, and `Inserted` otherwise.
///
/// # Errors
/// `TypeMismatch`, `NotNullViolation`, `CheckViolation`,
/// `ForeignKeyViolation`, `RowTooLarge`, `DuplicateKey`, `UniqueViolation`,
/// expression evaluation errors, and storage errors.
#[allow(clippy::too_many_arguments)]
fn apply_do_update_with_old_row(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    old_pk_key: &[u8],
    old_row: &[Value],
    proposed_row: &[Value],
    assignments: &[(usize, Expr)],
    where_clause: Option<&Expr>,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    // A NULL or false WHERE result leaves the existing row alone.
    if let Some(w) = where_clause {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let result = eval_expr(w, &ctx)?;
        if result.is_null() || !is_truthy(&result) {
            return Ok(InsertRowOutcome::Skipped);
        }
    }

    // Every assignment is evaluated against the old row, not against earlier
    // assignments in the same SET list.
    let mut new_row = old_row.to_vec();
    for (col_idx, expr) in assignments {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let val = eval_expr(expr, &ctx)?;
        let col = &table_schema.columns[*col_idx];
        new_row[*col_idx] = if val.is_null() {
            Value::Null
        } else {
            let got = val.data_type();
            val.coerce_into(col.data_type)
                .ok_or_else(|| SqlError::TypeMismatch {
                    expected: col.data_type.to_string(),
                    got: got.to_string(),
                })?
        };
    }

    // Recompute stored generated columns from the updated row.
    for col in &table_schema.columns {
        if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Stored)
        ) {
            let val = eval_expr(
                col.generated_expr.as_ref().unwrap(),
                &EvalCtx::new(col_map, &new_row),
            )?;
            let pos = col.position as usize;
            new_row[pos] = if val.is_null() {
                if !col.nullable {
                    return Err(SqlError::NotNullViolation(col.name.clone()));
                }
                Value::Null
            } else {
                let got = val.data_type();
                val.coerce_into(col.data_type)
                    .ok_or_else(|| SqlError::TypeMismatch {
                        expected: col.data_type.to_string(),
                        got: got.to_string(),
                    })?
            };
        }
    }

    // The key only counts as changed if a PK column was assigned and its
    // value actually differs.
    let pk_indices = table_schema.pk_indices();
    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);

    // Only assigned columns can have become NULL.
    for (assigned_idx, _) in assignments {
        let col = &table_schema.columns[*assigned_idx];
        if !col.nullable && new_row[col.position as usize].is_null() {
            return Err(SqlError::NotNullViolation(col.name.clone()));
        }
    }
    // A CHECK fails only on a definite false; NULL passes.
    if table_schema.has_checks() {
        for col in &table_schema.columns {
            if let Some(ref check) = col.check_expr {
                let ctx = EvalCtx::new(col_map, &new_row);
                let result = eval_expr(check, &ctx)?;
                if !is_truthy(&result) && !result.is_null() {
                    let name = col.check_name.as_deref().unwrap_or(&col.name);
                    return Err(SqlError::CheckViolation(name.to_string()));
                }
            }
        }
        for tc in &table_schema.check_constraints {
            let ctx = EvalCtx::new(col_map, &new_row);
            let result = eval_expr(&tc.expr, &ctx)?;
            if !is_truthy(&result) && !result.is_null() {
                let name = tc.name.as_deref().unwrap_or(&tc.sql);
                return Err(SqlError::CheckViolation(name.to_string()));
            }
        }
    }
    // Foreign keys are re-checked only when one of their columns changed and
    // none of the new values is NULL.
    for fk in &table_schema.foreign_keys {
        let changed = fk
            .columns
            .iter()
            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
        if !changed {
            continue;
        }
        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
        if any_null {
            continue;
        }
        let fk_vals: Vec<Value> = fk
            .columns
            .iter()
            .map(|&ci| new_row[ci as usize].clone())
            .collect();
        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
        // Deferred constraints are queued on the transaction instead of
        // being checked now.
        if fk.deferrable && fk.initially_deferred {
            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
            wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
                fk_name: name,
                foreign_table: fk.foreign_table.as_bytes().to_vec(),
                parent_key: fk_key,
            });
            continue;
        }
        // Skip the parent lookup when this key was already verified.
        if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
            let found = wtx
                .table_get(fk.foreign_table.as_bytes(), &fk_key)
                .map_err(SqlError::Storage)?;
            if found.is_none() {
                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
                return Err(SqlError::ForeignKeyViolation(name.to_string()));
            }
            wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
        }
    }

    // PK value vectors are only needed for index maintenance or a key change.
    let has_indices = !table_schema.indices.is_empty();
    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
    } else {
        Vec::new()
    };
    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
    } else {
        Vec::new()
    };

    // Lay the non-PK values out in physical encoding order; virtual generated
    // columns are stored as NULL.
    let non_pk = table_schema.non_pk_indices();
    let enc_pos = table_schema.encoding_positions();
    let phys_count = table_schema.physical_non_pk_count();
    let dropped = table_schema.dropped_non_pk_slots();
    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
    for &slot in dropped {
        value_values[slot as usize] = Value::Null;
    }
    for (j, &i) in non_pk.iter().enumerate() {
        let col = &table_schema.columns[i];
        value_values[enc_pos[j] as usize] = if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Virtual)
        ) {
            Value::Null
        } else {
            new_row[i].clone()
        };
    }
    let mut new_value_buf = Vec::with_capacity(256);
    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);

    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
        return Err(SqlError::RowTooLarge {
            size: new_value_buf.len(),
            max: citadel_core::MAX_VALUE_SIZE,
        });
    }

    if pk_changed {
        // Key change: insert under the new key, delete the old row, then
        // rewrite every index entry.
        // NOTE(review): unlike the in-place branch below, this path does not
        // call `partial_idx_update_actions` — confirm how partial indexes
        // should be handled here.
        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
        let inserted = wtx
            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
            .map_err(SqlError::Storage)?;
        if !inserted {
            return Err(SqlError::DuplicateKey);
        }
        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
            .map_err(SqlError::Storage)?;
        for idx in &table_schema.indices {
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            let old_idx_key =
                encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
            wtx.table_delete(&idx_table, &old_idx_key)
                .map_err(SqlError::Storage)?;
            let new_idx_key =
                encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
            let is_new = wtx
                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                .map_err(SqlError::Storage)?;
            // A pre-existing entry in a unique index is a violation unless
            // one of the indexed values is NULL.
            if idx.unique && !is_new {
                let any_null = idx
                    .column_positions_iter()
                    .any(|c| new_row[c as usize].is_null());
                if !any_null {
                    return Err(SqlError::UniqueViolation(idx.name.clone()));
                }
            }
        }
    } else {
        // Same key: overwrite the row in place, then touch only the index
        // entries that `partial_idx_update_actions` says need changing.
        wtx.table_update_sorted(
            table_schema.name.as_bytes(),
            &[(old_pk_key, new_value_buf.as_slice())],
        )
        .map_err(SqlError::Storage)?;
        // The column map is only built when `any_partial_index` is true.
        let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
        for idx in &table_schema.indices {
            let cols_changed = index_columns_changed(idx, old_row, &new_row);
            let (del, ins) = partial_idx_update_actions(
                idx,
                old_row,
                &new_row,
                cols_changed,
                false,
                col_map_partial,
            );
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            if del {
                let old_idx_key =
                    encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
                wtx.table_delete(&idx_table, &old_idx_key)
                    .map_err(SqlError::Storage)?;
            }
            if ins {
                let new_idx_key =
                    encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
                let is_new = wtx
                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                    .map_err(SqlError::Storage)?;
                // Same unique-index rule as in the key-change branch.
                if idx.unique && !is_new {
                    let any_null = idx
                        .column_positions_iter()
                        .any(|c| new_row[c as usize].is_null());
                    if !any_null {
                        return Err(SqlError::UniqueViolation(idx.name.clone()));
                    }
                }
            }
        }
    }

    if capture_returning {
        Ok(InsertRowOutcome::Updated {
            old: old_row.to_vec(),
            new: new_row,
        })
    } else {
        // Without captured rows an update is reported as `Inserted`.
        Ok(InsertRowOutcome::Inserted)
    }
}
3199
3200fn detect_fast_paths(
3201 ts: &TableSchema,
3202 assignments: &[(usize, Expr)],
3203) -> Option<Vec<DoUpdateFastPath>> {
3204 let non_pk = ts.non_pk_indices();
3205 let enc_pos = ts.encoding_positions();
3206 let mut out = Vec::with_capacity(assignments.len());
3207 for (col_idx, expr) in assignments {
3208 let col = &ts.columns[*col_idx];
3209 if col.data_type != DataType::Integer {
3210 return None;
3211 }
3212 let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3213 let phys_idx = enc_pos[nonpk_order] as usize;
3214
3215 if let Expr::BinaryOp { left, op, right } = expr {
3216 if !matches!(op, BinOp::Add | BinOp::Sub) {
3217 return None;
3218 }
3219 let reads_target =
3220 matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3221 if !reads_target {
3222 return None;
3223 }
3224 if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3225 let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3226 let _ = col_idx;
3227 out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3228 continue;
3229 }
3230 return None;
3231 }
3232 return None;
3233 }
3234 Some(out)
3235}
3236
3237fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3238 let target = oc
3239 .target
3240 .as_ref()
3241 .map(|t| resolve_conflict_target(t, ts))
3242 .transpose()?;
3243 match &oc.action {
3244 OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3245 OnConflictAction::DoUpdate {
3246 assignments,
3247 where_clause,
3248 } => {
3249 let target = target.ok_or_else(|| {
3250 SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3251 })?;
3252 let compiled_assignments: Vec<(usize, Expr)> = assignments
3253 .iter()
3254 .map(|(name, expr)| {
3255 let col_idx = ts
3256 .column_index(name)
3257 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3258 Ok((col_idx, expr.clone()))
3259 })
3260 .collect::<Result<_>>()?;
3261 let fast_paths = if where_clause.is_none() {
3262 detect_fast_paths(ts, &compiled_assignments)
3263 } else {
3264 None
3265 };
3266 Ok(CompiledOnConflict::DoUpdate {
3267 target,
3268 assignments: compiled_assignments,
3269 where_clause: where_clause.clone(),
3270 fast_paths,
3271 })
3272 }
3273 }
3274}
3275
3276fn exec_insert_trivial_fast(
3278 wtx: &mut WriteTxn<'_>,
3279 table_lower: &str,
3280 cache: &InsertCache,
3281 bufs: &mut InsertBufs,
3282 params: &[Value],
3283) -> Result<ExecutionResult> {
3284 let prog = cache
3285 .trivial_fast_program
3286 .as_ref()
3287 .expect("trivial fast: program");
3288
3289 for &p in &prog.not_null_param_indices {
3290 if params[p as usize].is_null() {
3291 return Err(SqlError::NotNullViolation(format!("param@{p}")));
3292 }
3293 }
3294
3295 match ¶ms[prog.pk_param as usize] {
3296 Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3297 _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3298 }
3299
3300 bufs.value_buf.clear();
3301 bufs.value_buf.extend_from_slice(&prog.template);
3302
3303 for op in &prog.ops {
3304 match op {
3305 WriteOp::ParamI64 { param_idx, off } => match ¶ms[*param_idx as usize] {
3306 Value::Integer(v) => {
3307 let off = *off as usize;
3308 bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3309 }
3310 other => {
3311 return Err(SqlError::TypeMismatch {
3312 expected: "Integer".into(),
3313 got: other.data_type().to_string(),
3314 });
3315 }
3316 },
3317 WriteOp::LiteralI64 { value, off } => {
3318 let off = *off as usize;
3319 bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3320 }
3321 WriteOp::GenAddParamsI64 {
3322 a_param,
3323 b_param,
3324 off,
3325 bitmap_byte_off,
3326 bitmap_bit_mask,
3327 } => match (¶ms[*a_param as usize], ¶ms[*b_param as usize]) {
3328 (Value::Integer(a), Value::Integer(b)) => {
3329 let off = *off as usize;
3330 bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3331 }
3332 _ => {
3333 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3334 }
3335 },
3336 WriteOp::GenMulAddParamI64 {
3337 param_idx,
3338 mul,
3339 add,
3340 off,
3341 bitmap_byte_off,
3342 bitmap_bit_mask,
3343 } => match ¶ms[*param_idx as usize] {
3344 Value::Integer(v) => {
3345 let r = v.wrapping_mul(*mul).wrapping_add(*add);
3346 let off = *off as usize;
3347 bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3348 }
3349 _ => {
3350 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3351 }
3352 },
3353 }
3354 }
3355
3356 let is_new = wtx
3357 .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3358 .map_err(SqlError::Storage)?;
3359 if !is_new {
3360 return Err(SqlError::DuplicateKey);
3361 }
3362 Ok(ExecutionResult::RowsAffected(1))
3363}
3364
3365fn build_bind_plan(
3366 stmt: &InsertStmt,
3367 col_indices: &[usize],
3368 col_data_types: &[DataType],
3369) -> Option<Vec<BindAction>> {
3370 let rows = match &stmt.source {
3371 InsertSource::Values(rows) => rows,
3372 _ => return None,
3373 };
3374 if rows.len() != 1 {
3375 return None;
3376 }
3377 let value_row = &rows[0];
3378 if value_row.len() != col_indices.len() {
3379 return None;
3380 }
3381 let mut plan = Vec::with_capacity(value_row.len());
3382 for (i, expr) in value_row.iter().enumerate() {
3383 let col_idx = col_indices[i];
3384 let target = col_data_types[col_idx];
3385 match expr {
3386 Expr::Parameter(n) => {
3387 if *n == 0 {
3388 return None;
3389 }
3390 plan.push(BindAction::Param {
3391 param_idx: n - 1,
3392 col_idx,
3393 target,
3394 });
3395 }
3396 Expr::Literal(v) => plan.push(BindAction::Literal {
3397 value: v.clone(),
3398 col_idx,
3399 }),
3400 _ => return None,
3401 }
3402 }
3403 Some(plan)
3404}
3405
3406impl CompiledInsert {
3407 pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3408 let lower = stmt.table.to_ascii_lowercase();
3409 let cached = if let Some(ts) = schema.get(&lower) {
3410 let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3411 ts.columns.iter().map(|c| c.name.as_str()).collect()
3412 } else {
3413 stmt.columns.iter().map(|s| s.as_str()).collect()
3414 };
3415 let mut col_indices = Vec::with_capacity(insert_columns.len());
3416 for name in &insert_columns {
3417 col_indices.push(ts.column_index(name)?);
3418 }
3419 if col_indices
3420 .iter()
3421 .any(|&ci| ts.columns[ci].generated_kind.is_some())
3422 {
3423 return None;
3424 }
3425 let on_conflict = stmt
3426 .on_conflict
3427 .as_ref()
3428 .map(|oc| compile_on_conflict(oc, ts))
3429 .transpose()
3430 .ok()
3431 .flatten()
3432 .map(Arc::new);
3433 let generated_col_positions: Vec<usize> = ts
3434 .columns
3435 .iter()
3436 .enumerate()
3437 .filter_map(|(i, c)| {
3438 matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3439 .then_some(i)
3440 })
3441 .collect();
3442 let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3443 .iter()
3444 .map(|&pos| {
3445 detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3446 })
3447 .collect();
3448 let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3449 Some(ColumnMap::new(&ts.columns))
3450 } else {
3451 None
3452 };
3453 let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3454 let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3455 let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3456 let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3457 let phys_count = ts.physical_non_pk_count();
3458 let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3459 let single_int_pk =
3460 pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3461 let not_null_indices: Vec<u16> = ts
3462 .columns
3463 .iter()
3464 .filter(|c| !c.nullable)
3465 .map(|c| c.position)
3466 .collect();
3467 let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3468 let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3469 let row_fully_overwritten = if any_defaults_flag {
3470 false
3471 } else {
3472 let mut covered: rustc_hash::FxHashSet<usize> =
3473 col_indices.iter().copied().collect();
3474 covered.extend(generated_col_positions.iter().copied());
3475 for (j, &i) in non_pk_indices.iter().enumerate() {
3476 let _ = j;
3477 if matches!(
3478 ts.columns[i].generated_kind,
3479 Some(crate::parser::GeneratedKind::Virtual)
3480 ) {
3481 covered.insert(i);
3482 }
3483 }
3484 bind_plan.is_some() && covered.len() == ts.columns.len()
3485 };
3486 let has_fks = !ts.foreign_keys.is_empty();
3487 let has_indices = !ts.indices.is_empty();
3488 let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3489 let mut null_value_slots: Vec<usize> =
3490 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3491 for (j, &i) in non_pk_indices.iter().enumerate() {
3492 let slot = encoding_positions[j] as usize;
3493 if matches!(
3494 ts.columns[i].generated_kind,
3495 Some(crate::parser::GeneratedKind::Virtual)
3496 ) {
3497 null_value_slots.push(slot);
3498 } else {
3499 non_virtual_pairs.push((i, slot));
3500 }
3501 }
3502 let row_encoder = {
3503 let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3504 let col = &ts.columns[i];
3505 if matches!(
3506 col.generated_kind,
3507 Some(crate::parser::GeneratedKind::Virtual)
3508 ) {
3509 true
3510 } else {
3511 col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3512 }
3513 });
3514 if all_int_or_null {
3515 let mut null_slots: Vec<usize> =
3516 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3517 for (j, &i) in non_pk_indices.iter().enumerate() {
3518 if matches!(
3519 ts.columns[i].generated_kind,
3520 Some(crate::parser::GeneratedKind::Virtual)
3521 ) {
3522 null_slots.push(encoding_positions[j] as usize);
3523 }
3524 }
3525 Some(crate::encoding::build_int_row_template(
3526 phys_count,
3527 &null_slots,
3528 ))
3529 } else {
3530 None
3531 }
3532 };
3533 let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3534 && !ts.columns.iter().any(|c| c.default_expr.is_some())
3535 && !ts.has_checks()
3536 && !has_fks
3537 && !has_indices
3538 && stmt.on_conflict.is_none()
3539 && stmt.returning.is_none()
3540 && bind_plan.is_some()
3541 && row_encoder.is_some()
3542 && row_fully_overwritten
3543 && single_int_pk
3544 && generated_fast_evals
3545 .iter()
3546 .all(|fe| !matches!(fe, FastGenEval::None));
3547 let trivial_fast_program = if is_trivial_fast_eligible {
3548 build_trivial_fast_program(
3549 bind_plan.as_ref().unwrap(),
3550 row_encoder.as_ref().unwrap(),
3551 &non_virtual_pairs,
3552 &generated_col_positions,
3553 &generated_fast_evals,
3554 &pk_indices,
3555 &ts.columns,
3556 )
3557 } else {
3558 None
3559 };
3560 let is_trivial_fast = trivial_fast_program.is_some();
3561 let has_checks = ts.has_checks();
3562 let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3563 let needs_scoped_params = bind_plan.is_none()
3564 || has_checks
3565 || any_defaults
3566 || !generated_col_positions.is_empty()
3567 || on_conflict.is_some()
3568 || stmt.returning.is_some()
3569 || insert_has_subquery(stmt)
3570 || super::helpers::any_partial_index(ts);
3571 Some(InsertCache {
3572 col_indices,
3573 has_subquery: insert_has_subquery(stmt),
3574 any_defaults,
3575 has_checks,
3576 on_conflict,
3577 row_col_map,
3578 generated_col_positions,
3579 generated_fast_evals,
3580 pk_indices,
3581 non_pk_indices,
3582 encoding_positions,
3583 dropped_non_pk_slots,
3584 phys_count,
3585 single_int_pk,
3586 not_null_indices,
3587 bind_plan,
3588 row_fully_overwritten,
3589 row_encoder,
3590 is_trivial_fast,
3591 trivial_fast_program,
3592 needs_scoped_params,
3593 })
3594 } else if schema.get_view(&lower).is_some() {
3595 None
3596 } else {
3597 return None;
3598 };
3599 Some(Self {
3600 table_lower: lower,
3601 cached,
3602 })
3603 }
3604}
3605
3606impl CompiledPlan for CompiledInsert {
3607 fn execute(
3608 &self,
3609 db: &Database,
3610 schema: &SchemaManager,
3611 stmt: &Statement,
3612 params: &[Value],
3613 txn: super::compile::ActiveTxnRef<'_, '_>,
3614 ) -> Result<ExecutionResult> {
3615 let ins = match stmt {
3616 Statement::Insert(i) => i,
3617 _ => {
3618 return Err(SqlError::Unsupported(
3619 "CompiledInsert received non-INSERT statement".into(),
3620 ))
3621 }
3622 };
3623 use super::compile::ActiveTxnRef;
3624 match txn {
3625 ActiveTxnRef::None => exec_insert(db, schema, ins, params),
3626 ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3627 "cannot execute mutating statement inside a read-only transaction".into(),
3628 )),
3629 ActiveTxnRef::Write(outer) => match self.cached.as_ref() {
3630 Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3631 exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3632 }),
3633 Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3634 None => exec_insert_in_txn(outer, schema, ins, params),
3635 },
3636 }
3637 }
3638
3639 fn uses_scoped_params(&self) -> bool {
3640 match self.cached.as_ref() {
3641 Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3642 None => true,
3643 }
3644 }
3645}
3646
/// Compiled plan for a `DELETE` statement.
pub struct CompiledDelete {
    /// ASCII-lowercased name of the target table, captured at compile time.
    table_lower: String,
}
3650
3651impl CompiledDelete {
3652 pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3653 let lower = stmt.table.to_ascii_lowercase();
3654 schema.get(&lower)?;
3655 Some(Self { table_lower: lower })
3656 }
3657}
3658
3659impl CompiledPlan for CompiledDelete {
3660 fn execute(
3661 &self,
3662 db: &Database,
3663 schema: &SchemaManager,
3664 stmt: &Statement,
3665 _params: &[Value],
3666 txn: super::compile::ActiveTxnRef<'_, '_>,
3667 ) -> Result<ExecutionResult> {
3668 let del = match stmt {
3669 Statement::Delete(d) => d,
3670 _ => {
3671 return Err(SqlError::Unsupported(
3672 "CompiledDelete received non-DELETE statement".into(),
3673 ))
3674 }
3675 };
3676 let _ = &self.table_lower;
3677 use super::compile::ActiveTxnRef;
3678 match txn {
3679 ActiveTxnRef::None => super::write::exec_delete(db, schema, del),
3680 ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3681 "cannot execute mutating statement inside a read-only transaction".into(),
3682 )),
3683 ActiveTxnRef::Write(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3684 }
3685 }
3686}
3687
3688fn exec_instead_of_view_insert_auto(
3689 db: &Database,
3690 schema: &SchemaManager,
3691 view_name: &str,
3692 aliases: &[String],
3693 stmt: &InsertStmt,
3694 params: &[Value],
3695) -> Result<ExecutionResult> {
3696 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3697 let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3698 wtx.commit().map_err(SqlError::Storage)?;
3699 Ok(r)
3700}
3701
3702fn exec_instead_of_view_insert_in_txn(
3703 wtx: &mut WriteTxn<'_>,
3704 schema: &SchemaManager,
3705 view_name: &str,
3706 aliases: &[String],
3707 stmt: &InsertStmt,
3708 params: &[Value],
3709) -> Result<ExecutionResult> {
3710 let resolved_aliases: Vec<String> = if aliases.is_empty() {
3712 derive_view_columns(wtx, schema, view_name)?
3713 } else {
3714 aliases.to_vec()
3715 };
3716 let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3717 let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3718 .iter()
3719 .enumerate()
3720 .map(|(i, name)| (name.to_ascii_lowercase(), i))
3721 .collect();
3722
3723 let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3724 (0..resolved_aliases.len()).collect()
3725 } else {
3726 stmt.columns
3727 .iter()
3728 .map(|c| {
3729 alias_map
3730 .get(&c.to_ascii_lowercase())
3731 .copied()
3732 .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3733 })
3734 .collect::<Result<_>>()?
3735 };
3736
3737 let source_rows: Vec<Vec<Value>> = match &stmt.source {
3738 InsertSource::Values(rows) => {
3739 let mut out = Vec::with_capacity(rows.len());
3740 for row in rows {
3741 if row.len() != target_positions.len() {
3742 return Err(SqlError::InvalidValue(format!(
3743 "expected {} values, got {}",
3744 target_positions.len(),
3745 row.len()
3746 )));
3747 }
3748 let mut vals = Vec::with_capacity(row.len());
3749 for expr in row {
3750 let v = match expr {
3751 Expr::Parameter(n) => params
3752 .get(n - 1)
3753 .cloned()
3754 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3755 Expr::Literal(v) => v.clone(),
3756 other => eval_const_expr(other)?,
3757 };
3758 vals.push(v);
3759 }
3760 out.push(vals);
3761 }
3762 out
3763 }
3764 InsertSource::Select(sq) => {
3765 let empty_ctes = CteContext::default();
3766 let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3767 qr.rows
3768 }
3769 };
3770
3771 let mut count: u64 = 0;
3772 for row in source_rows {
3773 if row.len() != target_positions.len() {
3774 return Err(SqlError::InvalidValue(format!(
3775 "expected {} values, got {}",
3776 target_positions.len(),
3777 row.len()
3778 )));
3779 }
3780 let mut new_row = vec![Value::Null; resolved_aliases.len()];
3781 for (slot, val) in target_positions.iter().zip(row) {
3782 new_row[*slot] = val;
3783 }
3784 super::triggers::fire_row_triggers(
3785 wtx,
3786 schema,
3787 view_name,
3788 crate::parser::TriggerTiming::InsteadOf,
3789 super::triggers::FireEvent::Insert,
3790 None,
3791 Some(new_row),
3792 &view_cols,
3793 )?;
3794 count += 1;
3795 }
3796 Ok(ExecutionResult::RowsAffected(count))
3797}
3798
3799fn derive_view_columns(
3800 wtx: &mut WriteTxn<'_>,
3801 schema: &SchemaManager,
3802 view_name: &str,
3803) -> Result<Vec<String>> {
3804 use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3805 let sel = SelectStmt {
3806 columns: vec![SelectColumn::AllColumns],
3807 from: view_name.to_string(),
3808 from_alias: None,
3809 from_subquery: None,
3810 from_args: None,
3811 from_json_table: None,
3812 joins: vec![],
3813 distinct: false,
3814 where_clause: None,
3815 order_by: vec![],
3816 limit: Some(Expr::Literal(Value::Integer(1))),
3817 offset: None,
3818 group_by: vec![],
3819 having: None,
3820 };
3821 let sq = SelectQuery {
3822 ctes: vec![],
3823 recursive: false,
3824 body: QueryBody::Select(Box::new(sel)),
3825 };
3826 let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3827 match qr {
3828 ExecutionResult::Query(q) => Ok(q.columns),
3829 _ => Ok(Vec::new()),
3830 }
3831}
3832
3833#[cfg(test)]
3834#[path = "dml_tests.rs"]
3835mod tests;