1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::read_txn::ReadTxn;
7use citadel_txn::write_txn::WriteTxn;
8use rustc_hash::FxHashMap;
9
10use crate::encoding::{encode_composite_key_into, encode_row_into};
11use crate::error::{Result, SqlError};
12use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
13use crate::parser::*;
14use crate::types::*;
15
16use crate::schema::SchemaManager;
17
18use super::compile::CompiledPlan;
19use super::helpers::*;
20use super::CteContext;
21
22pub(super) fn exec_insert(
23 db: &Database,
24 schema: &SchemaManager,
25 stmt: &InsertStmt,
26 params: &[Value],
27) -> Result<ExecutionResult> {
28 let empty_ctes = CteContext::default();
29 let materialized;
30 let stmt = if insert_has_subquery(stmt) {
31 materialized = materialize_insert(stmt, &mut |sub| {
32 exec_subquery_read(db, schema, sub, &empty_ctes)
33 })?;
34 &materialized
35 } else {
36 stmt
37 };
38
39 let lower_name = stmt.table.to_ascii_lowercase();
40 if let Some(view_def) = schema.get_view(&lower_name) {
41 if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
42 {
43 let aliases = view_def.column_aliases.clone();
44 return exec_instead_of_view_insert_auto(
45 db,
46 schema,
47 &lower_name,
48 &aliases,
49 stmt,
50 params,
51 );
52 }
53 return Err(SqlError::CannotModifyView(stmt.table.clone()));
54 }
55 if schema.get_matview(&lower_name).is_some() {
56 return Err(SqlError::CannotModifyView(format!(
57 "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
58 stmt.table
59 )));
60 }
61 let table_schema = schema
62 .get(&lower_name)
63 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
64 schema.mark_dml(&table_schema.name);
65
66 let insert_columns = if stmt.columns.is_empty() {
67 table_schema
68 .columns
69 .iter()
70 .map(|c| c.name.clone())
71 .collect::<Vec<_>>()
72 } else {
73 stmt.columns
74 .iter()
75 .map(|c| c.to_ascii_lowercase())
76 .collect()
77 };
78
79 let col_indices: Vec<usize> = insert_columns
80 .iter()
81 .map(|name| {
82 table_schema
83 .column_index(name)
84 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
85 })
86 .collect::<Result<_>>()?;
87
88 for &ci in &col_indices {
89 if table_schema.columns[ci].generated_kind.is_some() {
90 return Err(SqlError::CannotInsertIntoGeneratedColumn(
91 table_schema.columns[ci].name.clone(),
92 ));
93 }
94 }
95
96 let defaults: Vec<(usize, &Expr)> = table_schema
97 .columns
98 .iter()
99 .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
100 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
101 .collect();
102
103 let generated_cols: Vec<(usize, &Expr)> = table_schema
104 .columns
105 .iter()
106 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
107 .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
108 .collect();
109
110 let has_checks = table_schema.has_checks();
111 let strict = table_schema.is_strict();
112 let row_col_map_for_gen = if !generated_cols.is_empty() {
113 Some(ColumnMap::new(&table_schema.columns))
114 } else {
115 None
116 };
117 let check_col_map = if has_checks {
118 Some(ColumnMap::new(&table_schema.columns))
119 } else {
120 None
121 };
122
123 let select_rows = match &stmt.source {
124 InsertSource::Select(sq) => {
125 let insert_ctes =
126 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
127 exec_query_body_read(db, schema, body, ctx)
128 })?;
129 let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
130 Some(qr.rows)
131 }
132 InsertSource::Values(_) => None,
133 };
134
135 let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
136 .on_conflict
137 .as_ref()
138 .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
139 .transpose()?;
140
141 let row_col_map = compiled_conflict
142 .as_ref()
143 .map(|_| ColumnMap::new(&table_schema.columns));
144
145 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
146 let mut count: u64 = 0;
147 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
148 stmt.returning.as_ref().map(|_| Vec::new());
149
150 let pk_indices = table_schema.pk_indices();
151 let non_pk = table_schema.non_pk_indices();
152 let enc_pos = table_schema.encoding_positions();
153 let phys_count = table_schema.physical_non_pk_count();
154 let mut row = vec![Value::Null; table_schema.columns.len()];
155 let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
156 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
157 let mut key_buf: Vec<u8> = Vec::with_capacity(64);
158 let mut value_buf: Vec<u8> = Vec::with_capacity(256);
159 let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
160
161 let values = match &stmt.source {
162 InsertSource::Values(rows) => Some(rows.as_slice()),
163 InsertSource::Select(_) => None,
164 };
165 let sel_rows = select_rows.as_deref();
166
167 let total = match (values, sel_rows) {
168 (Some(rows), _) => rows.len(),
169 (_, Some(rows)) => rows.len(),
170 _ => 0,
171 };
172
173 if let Some(sel) = sel_rows {
174 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
175 return Err(SqlError::InvalidValue(format!(
176 "INSERT ... SELECT column count mismatch: expected {}, got {}",
177 insert_columns.len(),
178 sel[0].len()
179 )));
180 }
181 }
182
183 let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
184 t.enabled
185 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
186 && t.events
187 .iter()
188 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
189 });
190 let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
191 Vec::with_capacity(total)
192 } else {
193 Vec::new()
194 };
195
196 if has_insert_statement_triggers {
197 super::triggers::fire_statement_triggers(
198 &mut wtx,
199 schema,
200 &table_schema.name,
201 crate::parser::TriggerTiming::Before,
202 super::triggers::FireEvent::Insert,
203 &table_schema.columns,
204 &[],
205 &[],
206 )?;
207 }
208
209 for idx in 0..total {
210 for v in row.iter_mut() {
211 *v = Value::Null;
212 }
213
214 if let Some(value_rows) = values {
215 let value_row = &value_rows[idx];
216 if value_row.len() != insert_columns.len() {
217 return Err(SqlError::InvalidValue(format!(
218 "expected {} values, got {}",
219 insert_columns.len(),
220 value_row.len()
221 )));
222 }
223 for (i, expr) in value_row.iter().enumerate() {
224 let val = if let Expr::Parameter(n) = expr {
225 params
226 .get(n - 1)
227 .cloned()
228 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
229 } else {
230 eval_const_expr(expr)?
231 };
232 let col_idx = col_indices[i];
233 let col = &table_schema.columns[col_idx];
234 row[col_idx] = if val.is_null() {
235 Value::Null
236 } else {
237 coerce_for_column(val, col, strict)?
238 };
239 }
240 } else if let Some(sel) = sel_rows {
241 let sel_row = &sel[idx];
242 for (i, val) in sel_row.iter().enumerate() {
243 let col_idx = col_indices[i];
244 let col = &table_schema.columns[col_idx];
245 row[col_idx] = if val.is_null() {
246 Value::Null
247 } else {
248 coerce_for_column(val.clone(), col, strict)?
249 };
250 }
251 }
252
253 for &(pos, def_expr) in &defaults {
254 let val = eval_const_expr(def_expr)?;
255 let col = &table_schema.columns[pos];
256 if !val.is_null() {
257 row[pos] = coerce_for_column(val, col, strict)?;
258 }
259 }
260
261 if let Some(ref gen_map) = row_col_map_for_gen {
262 for &(pos, gen_expr) in &generated_cols {
263 let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
264 let col = &table_schema.columns[pos];
265 row[pos] = if val.is_null() {
266 Value::Null
267 } else {
268 coerce_for_column(val, col, strict)?
269 };
270 }
271 }
272
273 for col in &table_schema.columns {
274 if !col.nullable && row[col.position as usize].is_null() {
275 return Err(SqlError::NotNullViolation(col.name.clone()));
276 }
277 }
278
279 if let Some(ref col_map) = check_col_map {
280 for col in &table_schema.columns {
281 if let Some(ref check) = col.check_expr {
282 let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
283 if !is_truthy(&result) && !result.is_null() {
284 let name = col.check_name.as_deref().unwrap_or(&col.name);
285 return Err(SqlError::CheckViolation(name.to_string()));
286 }
287 }
288 }
289 for tc in &table_schema.check_constraints {
290 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
291 if !is_truthy(&result) && !result.is_null() {
292 let name = tc.name.as_deref().unwrap_or(&tc.sql);
293 return Err(SqlError::CheckViolation(name.to_string()));
294 }
295 }
296 }
297
298 for fk in &table_schema.foreign_keys {
299 let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
300 if any_null {
301 continue; }
303 let fk_vals: Vec<Value> = fk
304 .columns
305 .iter()
306 .map(|&ci| row[ci as usize].clone())
307 .collect();
308 fk_key_buf.clear();
309 encode_composite_key_into(&fk_vals, &mut fk_key_buf);
310 if fk.deferrable && fk.initially_deferred {
311 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
312 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
313 fk_name: name,
314 foreign_table: fk.foreign_table.as_bytes().to_vec(),
315 parent_key: fk_key_buf.clone(),
316 });
317 continue;
318 }
319 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
320 let found = wtx
321 .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
322 .map_err(SqlError::Storage)?;
323 if found.is_none() {
324 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
325 return Err(SqlError::ForeignKeyViolation(name.to_string()));
326 }
327 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
328 }
329 }
330
331 let proposed_row_for_returning: Option<Vec<Value>> =
332 returning_rows.as_ref().map(|_| row.clone());
333 let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
334 Some(row.clone())
335 } else {
336 None
337 };
338
339 let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
340 t.enabled
341 && t.timing == crate::parser::TriggerTiming::Before
342 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
343 && t.events
344 .iter()
345 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
346 });
347 if has_before_insert_triggers {
348 super::triggers::fire_row_triggers(
349 &mut wtx,
350 schema,
351 &table_schema.name,
352 crate::parser::TriggerTiming::Before,
353 super::triggers::FireEvent::Insert,
354 None,
355 Some(row.clone()),
356 &table_schema.columns,
357 )?;
358 }
359
360 for (j, &i) in pk_indices.iter().enumerate() {
361 pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
362 }
363 encode_composite_key_into(&pk_values, &mut key_buf);
364
365 for (j, &i) in non_pk.iter().enumerate() {
366 let col = &table_schema.columns[i];
367 if matches!(
368 col.generated_kind,
369 Some(crate::parser::GeneratedKind::Virtual)
370 ) {
371 value_values[enc_pos[j] as usize] = Value::Null;
372 row[i] = Value::Null;
373 } else {
374 value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
375 }
376 }
377 encode_row_into(&value_values, &mut value_buf);
378
379 if key_buf.len() > citadel_core::MAX_KEY_SIZE {
380 return Err(SqlError::KeyTooLarge {
381 size: key_buf.len(),
382 max: citadel_core::MAX_KEY_SIZE,
383 });
384 }
385 if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
386 return Err(SqlError::RowTooLarge {
387 size: value_buf.len(),
388 max: citadel_core::MAX_VALUE_SIZE,
389 });
390 }
391
392 match compiled_conflict.as_ref() {
393 None => {
394 let is_new = wtx
395 .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
396 .map_err(SqlError::Storage)?;
397 if !is_new {
398 return Err(SqlError::DuplicateKey);
399 }
400 let has_after_insert_triggers =
401 schema.triggers_for(&table_schema.name).iter().any(|t| {
402 t.enabled
403 && t.timing == crate::parser::TriggerTiming::After
404 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
405 && t.events
406 .iter()
407 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
408 });
409 if !table_schema.indices.is_empty() || has_after_insert_triggers {
410 for (j, &i) in pk_indices.iter().enumerate() {
411 row[i] = pk_values[j].clone();
412 }
413 for (j, &i) in non_pk.iter().enumerate() {
414 row[i] =
415 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
416 }
417 if !table_schema.indices.is_empty() {
418 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
419 }
420 if has_after_insert_triggers {
421 super::triggers::fire_row_triggers(
422 &mut wtx,
423 schema,
424 &table_schema.name,
425 crate::parser::TriggerTiming::After,
426 super::triggers::FireEvent::Insert,
427 None,
428 Some(row.clone()),
429 &table_schema.columns,
430 )?;
431 }
432 }
433 if let Some(r) = row_for_stmt_trigger.clone() {
434 stmt_new_rows.push(r);
435 }
436 count += 1;
437 if let Some(buf) = returning_rows.as_mut() {
438 buf.push((None, proposed_row_for_returning));
439 }
440 }
441 Some(oc) => {
442 let oc_ref: &CompiledOnConflict = oc;
443 let needs_row = upsert_needs_row(oc_ref, table_schema);
444 if needs_row {
445 for (j, &i) in pk_indices.iter().enumerate() {
446 row[i] = pk_values[j].clone();
447 }
448 for (j, &i) in non_pk.iter().enumerate() {
449 row[i] =
450 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
451 }
452 }
453 let outcome = apply_insert_with_conflict(
454 &mut wtx,
455 table_schema,
456 &key_buf,
457 &value_buf,
458 &row,
459 &pk_values,
460 oc_ref,
461 row_col_map.as_ref().unwrap(),
462 stmt.returning.is_some(),
463 )?;
464 match outcome {
465 InsertRowOutcome::Inserted => {
466 count += 1;
467 if let Some(buf) = returning_rows.as_mut() {
468 buf.push((None, proposed_row_for_returning));
469 }
470 if let Some(r) = row_for_stmt_trigger.clone() {
471 stmt_new_rows.push(r);
472 }
473 let has_after_insert_triggers =
474 schema.triggers_for(&table_schema.name).iter().any(|t| {
475 t.enabled
476 && t.timing == crate::parser::TriggerTiming::After
477 && t.granularity
478 == crate::parser::TriggerGranularity::ForEachRow
479 && t.events
480 .iter()
481 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
482 });
483 if has_after_insert_triggers {
484 super::triggers::fire_row_triggers(
485 &mut wtx,
486 schema,
487 &table_schema.name,
488 crate::parser::TriggerTiming::After,
489 super::triggers::FireEvent::Insert,
490 None,
491 Some(row.clone()),
492 &table_schema.columns,
493 )?;
494 }
495 }
496 InsertRowOutcome::Updated { old, new } => {
497 count += 1;
498 if let Some(buf) = returning_rows.as_mut() {
499 buf.push((Some(old.clone()), Some(new.clone())));
500 }
501 let has_after_update_triggers =
502 schema.triggers_for(&table_schema.name).iter().any(|t| {
503 t.enabled
504 && t.timing == crate::parser::TriggerTiming::After
505 && t.granularity
506 == crate::parser::TriggerGranularity::ForEachRow
507 && t.events.iter().any(|e| {
508 matches!(e, crate::parser::TriggerEvent::Update(_))
509 })
510 });
511 if has_after_update_triggers {
512 let changed_cols: Vec<String> = match oc_ref {
513 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
514 .iter()
515 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
516 .collect(),
517 _ => Vec::new(),
518 };
519 super::triggers::fire_row_triggers(
520 &mut wtx,
521 schema,
522 &table_schema.name,
523 crate::parser::TriggerTiming::After,
524 super::triggers::FireEvent::Update {
525 changed_columns: &changed_cols,
526 },
527 Some(old),
528 Some(new),
529 &table_schema.columns,
530 )?;
531 }
532 }
533 InsertRowOutcome::Skipped => {}
534 }
535 }
536 }
537 }
538
539 if has_insert_statement_triggers {
540 super::triggers::fire_statement_triggers(
541 &mut wtx,
542 schema,
543 &table_schema.name,
544 crate::parser::TriggerTiming::After,
545 super::triggers::FireEvent::Insert,
546 &table_schema.columns,
547 &[],
548 &stmt_new_rows,
549 )?;
550 }
551
552 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
553 let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
554 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
555 wtx.commit().map_err(SqlError::Storage)?;
556 return Ok(ExecutionResult::Query(qr));
557 }
558
559 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
560 wtx.commit().map_err(SqlError::Storage)?;
561 Ok(ExecutionResult::RowsAffected(count))
562}
563
564pub(super) fn has_subquery(expr: &Expr) -> bool {
565 crate::parser::has_subquery(expr)
566}
567
568pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
569 if let Some(ref w) = stmt.where_clause {
570 if has_subquery(w) {
571 return true;
572 }
573 }
574 if let Some(ref h) = stmt.having {
575 if has_subquery(h) {
576 return true;
577 }
578 }
579 for col in &stmt.columns {
580 if let SelectColumn::Expr { expr, .. } = col {
581 if has_subquery(expr) {
582 return true;
583 }
584 }
585 }
586 for ob in &stmt.order_by {
587 if has_subquery(&ob.expr) {
588 return true;
589 }
590 }
591 for join in &stmt.joins {
592 if let Some(ref on_expr) = join.on_clause {
593 if has_subquery(on_expr) {
594 return true;
595 }
596 }
597 }
598 false
599}
600
601pub(super) fn materialize_expr(
602 expr: &Expr,
603 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
604) -> Result<Expr> {
605 match expr {
606 Expr::InSubquery {
607 expr: e,
608 subquery,
609 negated,
610 } => {
611 let inner = materialize_expr(e, exec_sub)?;
612 let qr = exec_sub(subquery)?;
613 if !qr.columns.is_empty() && qr.columns.len() != 1 {
614 return Err(SqlError::SubqueryMultipleColumns);
615 }
616 let mut values = rustc_hash::FxHashSet::default();
617 let mut has_null = false;
618 for row in &qr.rows {
619 if row[0].is_null() {
620 has_null = true;
621 } else {
622 values.insert(row[0].clone());
623 }
624 }
625 Ok(Expr::InSet {
626 expr: Box::new(inner),
627 values,
628 has_null,
629 negated: *negated,
630 })
631 }
632 Expr::ScalarSubquery(subquery) => {
633 let qr = exec_sub(subquery)?;
634 if qr.rows.len() > 1 {
635 return Err(SqlError::SubqueryMultipleRows);
636 }
637 let val = if qr.rows.is_empty() {
638 Value::Null
639 } else {
640 qr.rows[0][0].clone()
641 };
642 Ok(Expr::Literal(val))
643 }
644 Expr::Exists { subquery, negated } => {
645 let qr = exec_sub(subquery)?;
646 let exists = !qr.rows.is_empty();
647 let result = if *negated { !exists } else { exists };
648 Ok(Expr::Literal(Value::Boolean(result)))
649 }
650 Expr::InList {
651 expr: e,
652 list,
653 negated,
654 } => {
655 let inner = materialize_expr(e, exec_sub)?;
656 let items = list
657 .iter()
658 .map(|item| materialize_expr(item, exec_sub))
659 .collect::<Result<Vec<_>>>()?;
660 Ok(Expr::InList {
661 expr: Box::new(inner),
662 list: items,
663 negated: *negated,
664 })
665 }
666 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
667 left: Box::new(materialize_expr(left, exec_sub)?),
668 op: *op,
669 right: Box::new(materialize_expr(right, exec_sub)?),
670 }),
671 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
672 op: *op,
673 expr: Box::new(materialize_expr(e, exec_sub)?),
674 }),
675 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
676 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
677 Expr::InSet {
678 expr: e,
679 values,
680 has_null,
681 negated,
682 } => Ok(Expr::InSet {
683 expr: Box::new(materialize_expr(e, exec_sub)?),
684 values: values.clone(),
685 has_null: *has_null,
686 negated: *negated,
687 }),
688 Expr::Between {
689 expr: e,
690 low,
691 high,
692 negated,
693 } => Ok(Expr::Between {
694 expr: Box::new(materialize_expr(e, exec_sub)?),
695 low: Box::new(materialize_expr(low, exec_sub)?),
696 high: Box::new(materialize_expr(high, exec_sub)?),
697 negated: *negated,
698 }),
699 Expr::Like {
700 expr: e,
701 pattern,
702 escape,
703 negated,
704 } => {
705 let esc = escape
706 .as_ref()
707 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
708 .transpose()?;
709 Ok(Expr::Like {
710 expr: Box::new(materialize_expr(e, exec_sub)?),
711 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
712 escape: esc,
713 negated: *negated,
714 })
715 }
716 Expr::Case {
717 operand,
718 conditions,
719 else_result,
720 } => {
721 let op = operand
722 .as_ref()
723 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
724 .transpose()?;
725 let conds = conditions
726 .iter()
727 .map(|(c, r)| {
728 Ok((
729 materialize_expr(c, exec_sub)?,
730 materialize_expr(r, exec_sub)?,
731 ))
732 })
733 .collect::<Result<Vec<_>>>()?;
734 let else_r = else_result
735 .as_ref()
736 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
737 .transpose()?;
738 Ok(Expr::Case {
739 operand: op,
740 conditions: conds,
741 else_result: else_r,
742 })
743 }
744 Expr::Coalesce(args) => {
745 let materialized = args
746 .iter()
747 .map(|a| materialize_expr(a, exec_sub))
748 .collect::<Result<Vec<_>>>()?;
749 Ok(Expr::Coalesce(materialized))
750 }
751 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
752 expr: Box::new(materialize_expr(e, exec_sub)?),
753 data_type: *data_type,
754 }),
755 Expr::Function {
756 name,
757 args,
758 distinct,
759 } => {
760 let materialized = args
761 .iter()
762 .map(|a| materialize_expr(a, exec_sub))
763 .collect::<Result<Vec<_>>>()?;
764 Ok(Expr::Function {
765 name: name.clone(),
766 args: materialized,
767 distinct: *distinct,
768 })
769 }
770 other => Ok(other.clone()),
771 }
772}
773
774pub(super) fn materialize_stmt(
775 stmt: &SelectStmt,
776 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
777) -> Result<SelectStmt> {
778 let where_clause = stmt
779 .where_clause
780 .as_ref()
781 .map(|e| materialize_expr(e, exec_sub))
782 .transpose()?;
783 let having = stmt
784 .having
785 .as_ref()
786 .map(|e| materialize_expr(e, exec_sub))
787 .transpose()?;
788 let columns = stmt
789 .columns
790 .iter()
791 .map(|c| match c {
792 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
793 SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
794 SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
795 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
796 expr: materialize_expr(expr, exec_sub)?,
797 alias: alias.clone(),
798 }),
799 })
800 .collect::<Result<Vec<_>>>()?;
801 let order_by = stmt
802 .order_by
803 .iter()
804 .map(|ob| {
805 Ok(OrderByItem {
806 expr: materialize_expr(&ob.expr, exec_sub)?,
807 descending: ob.descending,
808 nulls_first: ob.nulls_first,
809 })
810 })
811 .collect::<Result<Vec<_>>>()?;
812 let joins = stmt
813 .joins
814 .iter()
815 .map(|j| {
816 let on_clause = j
817 .on_clause
818 .as_ref()
819 .map(|e| materialize_expr(e, exec_sub))
820 .transpose()?;
821 Ok(JoinClause {
822 join_type: j.join_type,
823 table: j.table.clone(),
824 subquery: j.subquery.clone(),
825 on_clause,
826 })
827 })
828 .collect::<Result<Vec<_>>>()?;
829 let group_by = stmt
830 .group_by
831 .iter()
832 .map(|e| materialize_expr(e, exec_sub))
833 .collect::<Result<Vec<_>>>()?;
834 Ok(SelectStmt {
835 columns,
836 from: stmt.from.clone(),
837 from_alias: stmt.from_alias.clone(),
838 from_subquery: stmt.from_subquery.clone(),
839 from_args: stmt.from_args.clone(),
840 from_json_table: stmt.from_json_table.clone(),
841 joins,
842 distinct: stmt.distinct,
843 where_clause,
844 order_by,
845 limit: stmt.limit.clone(),
846 offset: stmt.offset.clone(),
847 group_by,
848 having,
849 })
850}
851
852pub(super) fn exec_subquery_read(
853 db: &Database,
854 schema: &SchemaManager,
855 stmt: &SelectStmt,
856 ctes: &CteContext,
857) -> Result<QueryResult> {
858 let mut rtx = db.begin_read();
859 exec_subquery_with_read(&mut rtx, schema, stmt, ctes)
860}
861
862pub(super) fn exec_subquery_with_read(
863 rtx: &mut ReadTxn<'_>,
864 schema: &SchemaManager,
865 stmt: &SelectStmt,
866 ctes: &CteContext,
867) -> Result<QueryResult> {
868 match super::exec_select_with_read(rtx, schema, stmt, ctes)? {
869 ExecutionResult::Query(qr) => Ok(qr),
870 _ => Ok(QueryResult {
871 columns: vec![],
872 rows: vec![],
873 }),
874 }
875}
876
877pub(super) fn exec_subquery_write(
878 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
879 schema: &SchemaManager,
880 stmt: &SelectStmt,
881 ctes: &CteContext,
882) -> Result<QueryResult> {
883 match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
884 ExecutionResult::Query(qr) => Ok(qr),
885 _ => Ok(QueryResult {
886 columns: vec![],
887 rows: vec![],
888 }),
889 }
890}
891
892pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
893 stmt.where_clause.as_ref().is_some_and(has_subquery)
894 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
895}
896
897pub(super) fn materialize_update(
898 stmt: &UpdateStmt,
899 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
900) -> Result<UpdateStmt> {
901 let where_clause = stmt
902 .where_clause
903 .as_ref()
904 .map(|e| materialize_expr(e, exec_sub))
905 .transpose()?;
906 let assignments = stmt
907 .assignments
908 .iter()
909 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
910 .collect::<Result<Vec<_>>>()?;
911 Ok(UpdateStmt {
912 table: stmt.table.clone(),
913 assignments,
914 where_clause,
915 returning: stmt.returning.clone(),
916 })
917}
918
919pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
920 stmt.where_clause.as_ref().is_some_and(has_subquery)
921}
922
923pub(super) fn materialize_delete(
924 stmt: &DeleteStmt,
925 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
926) -> Result<DeleteStmt> {
927 let where_clause = stmt
928 .where_clause
929 .as_ref()
930 .map(|e| materialize_expr(e, exec_sub))
931 .transpose()?;
932 Ok(DeleteStmt {
933 table: stmt.table.clone(),
934 where_clause,
935 returning: stmt.returning.clone(),
936 })
937}
938
939pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
940 match &stmt.source {
941 InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
942 InsertSource::Select(_) => false,
944 }
945}
946
947pub(super) fn materialize_insert(
948 stmt: &InsertStmt,
949 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
950) -> Result<InsertStmt> {
951 let source = match &stmt.source {
952 InsertSource::Values(rows) => {
953 let mat = rows
954 .iter()
955 .map(|row| {
956 row.iter()
957 .map(|e| materialize_expr(e, exec_sub))
958 .collect::<Result<Vec<_>>>()
959 })
960 .collect::<Result<Vec<_>>>()?;
961 InsertSource::Values(mat)
962 }
963 InsertSource::Select(sq) => {
964 let ctes = sq
965 .ctes
966 .iter()
967 .map(|c| {
968 Ok(CteDefinition {
969 name: c.name.clone(),
970 column_aliases: c.column_aliases.clone(),
971 body: materialize_query_body(&c.body, exec_sub)?,
972 })
973 })
974 .collect::<Result<Vec<_>>>()?;
975 let body = materialize_query_body(&sq.body, exec_sub)?;
976 InsertSource::Select(Box::new(SelectQuery {
977 ctes,
978 recursive: sq.recursive,
979 body,
980 }))
981 }
982 };
983 Ok(InsertStmt {
984 table: stmt.table.clone(),
985 columns: stmt.columns.clone(),
986 source,
987 on_conflict: stmt.on_conflict.clone(),
988 returning: stmt.returning.clone(),
989 })
990}
991
992pub(super) fn materialize_query_body(
993 body: &QueryBody,
994 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
995) -> Result<QueryBody> {
996 match body {
997 QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
998 sel, exec_sub,
999 )?))),
1000 QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
1001 op: comp.op.clone(),
1002 all: comp.all,
1003 left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
1004 right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
1005 order_by: comp.order_by.clone(),
1006 limit: comp.limit.clone(),
1007 offset: comp.offset.clone(),
1008 }))),
1009 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
1010 }
1011}
1012
1013pub(super) fn exec_query_body_with_read(
1014 rtx: &mut ReadTxn<'_>,
1015 schema: &SchemaManager,
1016 body: &QueryBody,
1017 ctes: &CteContext,
1018) -> Result<ExecutionResult> {
1019 match body {
1020 QueryBody::Select(sel) => super::exec_select_with_read(rtx, schema, sel, ctes),
1021 QueryBody::Compound(comp) => exec_compound_select_with_read(rtx, schema, comp, ctes),
1022 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1023 SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1024 ),
1025 }
1026}
1027
1028pub(super) fn exec_query_body_in_txn(
1029 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1030 schema: &SchemaManager,
1031 body: &QueryBody,
1032 ctes: &CteContext,
1033) -> Result<ExecutionResult> {
1034 match body {
1035 QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1036 QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1037 QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1038 QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1039 QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1040 }
1041}
1042
1043pub(super) fn exec_query_body_read(
1044 db: &Database,
1045 schema: &SchemaManager,
1046 body: &QueryBody,
1047 ctes: &CteContext,
1048) -> Result<QueryResult> {
1049 let mut rtx = db.begin_read();
1050 exec_query_body_with_read_qr(&mut rtx, schema, body, ctes)
1051}
1052
1053pub(super) fn exec_query_body_with_read_qr(
1054 rtx: &mut ReadTxn<'_>,
1055 schema: &SchemaManager,
1056 body: &QueryBody,
1057 ctes: &CteContext,
1058) -> Result<QueryResult> {
1059 match exec_query_body_with_read(rtx, schema, body, ctes)? {
1060 ExecutionResult::Query(qr) => Ok(qr),
1061 _ => Ok(QueryResult {
1062 columns: vec![],
1063 rows: vec![],
1064 }),
1065 }
1066}
1067
1068pub(super) fn exec_query_body_write(
1069 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1070 schema: &SchemaManager,
1071 body: &QueryBody,
1072 ctes: &CteContext,
1073) -> Result<QueryResult> {
1074 match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1075 ExecutionResult::Query(qr) => Ok(qr),
1076 _ => Ok(QueryResult {
1077 columns: vec![],
1078 rows: vec![],
1079 }),
1080 }
1081}
1082
1083pub(super) fn exec_compound_select_with_read(
1084 rtx: &mut ReadTxn<'_>,
1085 schema: &SchemaManager,
1086 comp: &CompoundSelect,
1087 ctes: &CteContext,
1088) -> Result<ExecutionResult> {
1089 let left_qr = match exec_query_body_with_read(rtx, schema, &comp.left, ctes)? {
1090 ExecutionResult::Query(qr) => qr,
1091 _ => QueryResult {
1092 columns: vec![],
1093 rows: vec![],
1094 },
1095 };
1096 let right_qr = match exec_query_body_with_read(rtx, schema, &comp.right, ctes)? {
1097 ExecutionResult::Query(qr) => qr,
1098 _ => QueryResult {
1099 columns: vec![],
1100 rows: vec![],
1101 },
1102 };
1103 apply_set_operation(comp, left_qr, right_qr)
1104}
1105
1106pub(super) fn exec_compound_select_in_txn(
1107 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1108 schema: &SchemaManager,
1109 comp: &CompoundSelect,
1110 ctes: &CteContext,
1111) -> Result<ExecutionResult> {
1112 let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1113 ExecutionResult::Query(qr) => qr,
1114 _ => QueryResult {
1115 columns: vec![],
1116 rows: vec![],
1117 },
1118 };
1119 let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1120 ExecutionResult::Query(qr) => qr,
1121 _ => QueryResult {
1122 columns: vec![],
1123 rows: vec![],
1124 },
1125 };
1126 apply_set_operation(comp, left_qr, right_qr)
1127}
1128
1129pub(super) fn apply_set_operation(
1130 comp: &CompoundSelect,
1131 left_qr: QueryResult,
1132 right_qr: QueryResult,
1133) -> Result<ExecutionResult> {
1134 if !left_qr.columns.is_empty()
1135 && !right_qr.columns.is_empty()
1136 && left_qr.columns.len() != right_qr.columns.len()
1137 {
1138 return Err(SqlError::CompoundColumnCountMismatch {
1139 left: left_qr.columns.len(),
1140 right: right_qr.columns.len(),
1141 });
1142 }
1143
1144 let columns = left_qr.columns;
1145
1146 let mut rows = match (&comp.op, comp.all) {
1147 (SetOp::Union, true) => {
1148 let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1149 let mut rows = Vec::with_capacity(total);
1150 rows.extend(left_qr.rows);
1151 rows.extend(right_qr.rows);
1152 rows
1153 }
1154 (SetOp::Union, false) => {
1155 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1156 let mut rows = Vec::new();
1157 for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1158 if !seen.contains(&row) {
1159 seen.insert(row.clone());
1160 rows.push(row);
1161 }
1162 }
1163 rows
1164 }
1165 (SetOp::Intersect, true) => {
1166 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1167 for row in &right_qr.rows {
1168 *right_counts.entry(row.clone()).or_insert(0) += 1;
1169 }
1170 let mut rows = Vec::new();
1171 for row in left_qr.rows {
1172 if let Some(count) = right_counts.get_mut(&row) {
1173 if *count > 0 {
1174 *count -= 1;
1175 rows.push(row);
1176 }
1177 }
1178 }
1179 rows
1180 }
1181 (SetOp::Intersect, false) => {
1182 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1183 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1184 let mut rows = Vec::new();
1185 for row in left_qr.rows {
1186 if right_set.contains(&row) && !seen.contains(&row) {
1187 seen.insert(row.clone());
1188 rows.push(row);
1189 }
1190 }
1191 rows
1192 }
1193 (SetOp::Except, true) => {
1194 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1195 for row in &right_qr.rows {
1196 *right_counts.entry(row.clone()).or_insert(0) += 1;
1197 }
1198 let mut rows = Vec::new();
1199 for row in left_qr.rows {
1200 if let Some(count) = right_counts.get_mut(&row) {
1201 if *count > 0 {
1202 *count -= 1;
1203 continue;
1204 }
1205 }
1206 rows.push(row);
1207 }
1208 rows
1209 }
1210 (SetOp::Except, false) => {
1211 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1212 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1213 let mut rows = Vec::new();
1214 for row in left_qr.rows {
1215 if !right_set.contains(&row) && !seen.contains(&row) {
1216 seen.insert(row.clone());
1217 rows.push(row);
1218 }
1219 }
1220 rows
1221 }
1222 };
1223
1224 if !comp.order_by.is_empty() {
1225 let col_defs: Vec<crate::types::ColumnDef> = columns
1226 .iter()
1227 .enumerate()
1228 .map(|(i, name)| crate::types::ColumnDef {
1229 name: name.clone(),
1230 data_type: crate::types::DataType::Null,
1231 nullable: true,
1232 position: i as u16,
1233 default_expr: None,
1234 default_sql: None,
1235 check_expr: None,
1236 check_sql: None,
1237 check_name: None,
1238 is_with_timezone: false,
1239 generated_expr: None,
1240 generated_sql: None,
1241 generated_kind: None,
1242 collation: crate::types::Collation::Binary,
1243 })
1244 .collect();
1245 sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1246 }
1247
1248 if let Some(ref offset_expr) = comp.offset {
1249 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1250 if offset < rows.len() {
1251 rows = rows.split_off(offset);
1252 } else {
1253 rows.clear();
1254 }
1255 }
1256
1257 if let Some(ref limit_expr) = comp.limit {
1258 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1259 rows.truncate(limit);
1260 }
1261
1262 Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1263}
1264
1265struct InsertBufs {
1266 row: Vec<Value>,
1267 pk_values: Vec<Value>,
1268 value_values: Vec<Value>,
1269 key_buf: Vec<u8>,
1270 value_buf: Vec<u8>,
1271 col_indices: Vec<usize>,
1272 fk_key_buf: Vec<u8>,
1273}
1274
1275impl InsertBufs {
1276 fn new() -> Self {
1277 Self {
1278 row: Vec::new(),
1279 pk_values: Vec::new(),
1280 value_values: Vec::new(),
1281 key_buf: Vec::with_capacity(64),
1282 value_buf: Vec::with_capacity(256),
1283 col_indices: Vec::new(),
1284 fk_key_buf: Vec::with_capacity(64),
1285 }
1286 }
1287}
1288
1289thread_local! {
1290 static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1291 static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1292}
1293
1294fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1295 INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1296 Ok(mut borrowed) => f(&mut borrowed),
1297 Err(_) => {
1298 let mut local = InsertBufs::new();
1299 f(&mut local)
1300 }
1301 })
1302}
1303
1304pub(super) struct UpsertBufs {
1305 old_row: Vec<Value>,
1306 new_row: Vec<Value>,
1307 value_values: Vec<Value>,
1308 new_value_buf: Vec<u8>,
1309}
1310
1311impl UpsertBufs {
1312 pub(super) fn new() -> Self {
1313 Self {
1314 old_row: Vec::new(),
1315 new_row: Vec::new(),
1316 value_values: Vec::new(),
1317 new_value_buf: Vec::with_capacity(256),
1318 }
1319 }
1320}
1321
1322pub fn exec_insert_in_txn(
1323 wtx: &mut WriteTxn<'_>,
1324 schema: &SchemaManager,
1325 stmt: &InsertStmt,
1326 params: &[Value],
1327) -> Result<ExecutionResult> {
1328 with_insert_scratch(|bufs| {
1329 exec_insert_in_txn_impl(
1330 wtx,
1331 schema,
1332 stmt,
1333 params,
1334 bufs,
1335 None,
1336 &CteContext::default(),
1337 )
1338 })
1339}
1340
1341pub(super) fn exec_insert_in_txn_with_ctes(
1342 wtx: &mut WriteTxn<'_>,
1343 schema: &SchemaManager,
1344 stmt: &InsertStmt,
1345 params: &[Value],
1346 outer_ctes: &CteContext,
1347) -> Result<ExecutionResult> {
1348 with_insert_scratch(|bufs| {
1349 exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1350 })
1351}
1352
1353fn exec_insert_in_txn_cached(
1354 wtx: &mut WriteTxn<'_>,
1355 schema: &SchemaManager,
1356 stmt: &InsertStmt,
1357 params: &[Value],
1358 cache: &InsertCache,
1359) -> Result<ExecutionResult> {
1360 with_insert_scratch(|bufs| {
1361 exec_insert_in_txn_impl(
1362 wtx,
1363 schema,
1364 stmt,
1365 params,
1366 bufs,
1367 Some(cache),
1368 &CteContext::default(),
1369 )
1370 })
1371}
1372
1373fn exec_insert_in_txn_impl(
1374 wtx: &mut WriteTxn<'_>,
1375 schema: &SchemaManager,
1376 stmt: &InsertStmt,
1377 params: &[Value],
1378 bufs: &mut InsertBufs,
1379 cache: Option<&InsertCache>,
1380 outer_ctes: &CteContext,
1381) -> Result<ExecutionResult> {
1382 let empty_ctes = CteContext::default();
1383 let materialized;
1384 let has_sub = match cache {
1385 Some(c) => c.has_subquery,
1386 None => insert_has_subquery(stmt),
1387 };
1388 let stmt = if has_sub {
1389 materialized = materialize_insert(stmt, &mut |sub| {
1390 exec_subquery_write(wtx, schema, sub, &empty_ctes)
1391 })?;
1392 &materialized
1393 } else {
1394 stmt
1395 };
1396
1397 let view_lookup_key = stmt.table.to_ascii_lowercase();
1398 if let Some(view_def) = schema.get_view(&view_lookup_key) {
1399 if super::triggers::has_instead_of(
1400 schema,
1401 &view_lookup_key,
1402 super::triggers::FireEvent::Insert,
1403 ) {
1404 let aliases = view_def.column_aliases.clone();
1405 return exec_instead_of_view_insert_in_txn(
1406 wtx,
1407 schema,
1408 &view_lookup_key,
1409 &aliases,
1410 stmt,
1411 params,
1412 );
1413 }
1414 return Err(SqlError::CannotModifyView(stmt.table.clone()));
1415 }
1416
1417 let table_schema = schema
1418 .get(&stmt.table)
1419 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1420 schema.mark_dml(&table_schema.name);
1421
1422 let default_columns;
1423 let insert_columns: &[String] = if stmt.columns.is_empty() {
1424 default_columns = table_schema
1425 .columns
1426 .iter()
1427 .map(|c| c.name.clone())
1428 .collect::<Vec<_>>();
1429 &default_columns
1430 } else {
1431 &stmt.columns
1432 };
1433
1434 bufs.col_indices.clear();
1435 if let Some(c) = cache {
1436 bufs.col_indices.extend_from_slice(&c.col_indices);
1437 } else {
1438 for name in insert_columns {
1439 bufs.col_indices.push(
1440 table_schema
1441 .column_index(name)
1442 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1443 );
1444 }
1445 }
1446
1447 if cache.is_none() {
1448 for &ci in &bufs.col_indices {
1449 if table_schema.columns[ci].generated_kind.is_some() {
1450 return Err(SqlError::CannotInsertIntoGeneratedColumn(
1451 table_schema.columns[ci].name.clone(),
1452 ));
1453 }
1454 }
1455 }
1456
1457 let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1458 let cached_gen_positions: &[usize];
1459 let cached_gen_fast_evals: &[FastGenEval];
1460 if let Some(c) = cache {
1461 cached_gen_positions = &c.generated_col_positions;
1462 cached_gen_fast_evals = &c.generated_fast_evals;
1463 generated_cols_uncached = Vec::new();
1464 } else {
1465 cached_gen_positions = &[];
1466 cached_gen_fast_evals = &[];
1467 generated_cols_uncached = table_schema
1468 .columns
1469 .iter()
1470 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1471 .map(|c| {
1472 let expr = c.generated_expr.as_ref().unwrap();
1473 let fe = detect_fast_gen_eval(expr, table_schema);
1474 (c.position as usize, expr, fe)
1475 })
1476 .collect();
1477 }
1478 let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1479 let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1480 None
1481 } else {
1482 Some(ColumnMap::new(&table_schema.columns))
1483 };
1484 let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1485 None
1486 } else if let Some(c) = cache {
1487 c.row_col_map.as_ref()
1488 } else {
1489 row_col_map_for_gen_owned.as_ref()
1490 };
1491
1492 let any_defaults = match cache {
1493 Some(c) => c.any_defaults,
1494 None => table_schema
1495 .columns
1496 .iter()
1497 .any(|c| c.default_expr.is_some()),
1498 };
1499 let defaults: Vec<(usize, &Expr)> = if any_defaults {
1500 table_schema
1501 .columns
1502 .iter()
1503 .filter(|c| {
1504 c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1505 })
1506 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1507 .collect()
1508 } else {
1509 Vec::new()
1510 };
1511
1512 let has_checks = match cache {
1513 Some(c) => c.has_checks,
1514 None => table_schema.has_checks(),
1515 };
1516 let check_col_map = if has_checks {
1517 Some(ColumnMap::new(&table_schema.columns))
1518 } else {
1519 None
1520 };
1521
1522 let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1523 &[usize],
1524 &[usize],
1525 &[u16],
1526 usize,
1527 &[u16],
1528 ) = if let Some(c) = cache {
1529 (
1530 &c.pk_indices,
1531 &c.non_pk_indices,
1532 &c.encoding_positions,
1533 c.phys_count,
1534 &c.dropped_non_pk_slots,
1535 )
1536 } else {
1537 (
1538 table_schema.pk_indices(),
1539 table_schema.non_pk_indices(),
1540 table_schema.encoding_positions(),
1541 table_schema.physical_non_pk_count(),
1542 table_schema.dropped_non_pk_slots(),
1543 )
1544 };
1545
1546 bufs.row.resize(table_schema.columns.len(), Value::Null);
1547 bufs.pk_values.resize(pk_indices.len(), Value::Null);
1548 bufs.value_values.resize(phys_count, Value::Null);
1549
1550 let table_bytes = table_schema.name.as_bytes();
1551 let has_fks = !table_schema.foreign_keys.is_empty();
1552 let has_indices = !table_schema.indices.is_empty();
1553 let has_defaults = !defaults.is_empty();
1554
1555 let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1556 (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1557 (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1558 (_, None) => None,
1559 };
1560
1561 let row_col_map_owned: Option<ColumnMap> =
1562 if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1563 Some(ColumnMap::new(&table_schema.columns))
1564 } else {
1565 None
1566 };
1567 let row_col_map: Option<&ColumnMap> = cache
1568 .and_then(|c| c.row_col_map.as_ref())
1569 .or(row_col_map_owned.as_ref());
1570
1571 let select_rows = match &stmt.source {
1572 InsertSource::Select(sq) => {
1573 let insert_ctes = super::materialize_all_ctes_with_outer(
1574 &sq.ctes,
1575 sq.recursive,
1576 outer_ctes,
1577 &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1578 )?;
1579 let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1580 Some(qr.rows)
1581 }
1582 InsertSource::Values(_) => None,
1583 };
1584
1585 let mut count: u64 = 0;
1586 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1587 stmt.returning.as_ref().map(|_| Vec::new());
1588
1589 let values = match &stmt.source {
1590 InsertSource::Values(rows) => Some(rows.as_slice()),
1591 InsertSource::Select(_) => None,
1592 };
1593 let sel_rows = select_rows.as_deref();
1594
1595 let total = match (values, sel_rows) {
1596 (Some(rows), _) => rows.len(),
1597 (_, Some(rows)) => rows.len(),
1598 _ => 0,
1599 };
1600
1601 if let Some(sel) = sel_rows {
1602 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1603 return Err(SqlError::InvalidValue(format!(
1604 "INSERT ... SELECT column count mismatch: expected {}, got {}",
1605 insert_columns.len(),
1606 sel[0].len()
1607 )));
1608 }
1609 }
1610
1611 let has_insert_statement_triggers_impl =
1612 schema.triggers_for(&table_schema.name).iter().any(|t| {
1613 t.enabled
1614 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1615 && t.events
1616 .iter()
1617 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1618 });
1619 let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1620 Vec::with_capacity(total)
1621 } else {
1622 Vec::new()
1623 };
1624 if has_insert_statement_triggers_impl {
1625 super::triggers::fire_statement_triggers(
1626 wtx,
1627 schema,
1628 &table_schema.name,
1629 crate::parser::TriggerTiming::Before,
1630 super::triggers::FireEvent::Insert,
1631 &table_schema.columns,
1632 &[],
1633 &[],
1634 )?;
1635 }
1636
1637 let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1638 for idx in 0..total {
1639 if !skip_row_clear {
1640 for v in bufs.row.iter_mut() {
1641 *v = Value::Null;
1642 }
1643 }
1644
1645 if let Some(value_rows) = values {
1646 if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1647 for action in plan {
1648 match action {
1649 BindAction::Param {
1650 param_idx,
1651 col_idx,
1652 target,
1653 } => {
1654 let v = ¶ms[*param_idx];
1655 bufs.row[*col_idx] = if v.is_null() {
1656 Value::Null
1657 } else if v.data_type() == *target {
1658 v.clone()
1659 } else {
1660 let got = v.data_type();
1661 v.clone().coerce_into(*target).ok_or_else(|| {
1662 SqlError::TypeMismatch {
1663 expected: target.to_string(),
1664 got: got.to_string(),
1665 }
1666 })?
1667 };
1668 }
1669 BindAction::Literal { value, col_idx } => {
1670 bufs.row[*col_idx] = value.clone();
1671 }
1672 }
1673 }
1674 } else {
1675 let value_row = &value_rows[idx];
1676 if value_row.len() != insert_columns.len() {
1677 return Err(SqlError::InvalidValue(format!(
1678 "expected {} values, got {}",
1679 insert_columns.len(),
1680 value_row.len()
1681 )));
1682 }
1683 for (i, expr) in value_row.iter().enumerate() {
1684 let val = match expr {
1685 Expr::Parameter(n) => params
1686 .get(n - 1)
1687 .cloned()
1688 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1689 Expr::Literal(v) => v.clone(),
1690 _ => eval_const_expr(expr)?,
1691 };
1692 let col_idx = bufs.col_indices[i];
1693 let col = &table_schema.columns[col_idx];
1694 let got_type = val.data_type();
1695 bufs.row[col_idx] = if val.is_null() {
1696 Value::Null
1697 } else {
1698 val.coerce_into(col.data_type)
1699 .ok_or_else(|| SqlError::TypeMismatch {
1700 expected: col.data_type.to_string(),
1701 got: got_type.to_string(),
1702 })?
1703 };
1704 }
1705 }
1706 } else if let Some(sel) = sel_rows {
1707 let sel_row = &sel[idx];
1708 for (i, val) in sel_row.iter().enumerate() {
1709 let col_idx = bufs.col_indices[i];
1710 let col = &table_schema.columns[col_idx];
1711 let got_type = val.data_type();
1712 bufs.row[col_idx] = if val.is_null() {
1713 Value::Null
1714 } else {
1715 val.clone().coerce_into(col.data_type).ok_or_else(|| {
1716 SqlError::TypeMismatch {
1717 expected: col.data_type.to_string(),
1718 got: got_type.to_string(),
1719 }
1720 })?
1721 };
1722 }
1723 }
1724
1725 if has_defaults {
1726 for &(pos, def_expr) in &defaults {
1727 let val = eval_const_expr(def_expr)?;
1728 let col = &table_schema.columns[pos];
1729 if !val.is_null() {
1730 let got_type = val.data_type();
1731 bufs.row[pos] =
1732 val.coerce_into(col.data_type)
1733 .ok_or_else(|| SqlError::TypeMismatch {
1734 expected: col.data_type.to_string(),
1735 got: got_type.to_string(),
1736 })?;
1737 }
1738 }
1739 }
1740
1741 if let Some(gen_map) = row_col_map_for_gen {
1742 if cache.is_some() {
1743 for (pos, fast) in cached_gen_positions
1744 .iter()
1745 .copied()
1746 .zip(cached_gen_fast_evals.iter())
1747 {
1748 let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1749 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1750 let col = &table_schema.columns[pos];
1751 bufs.row[pos] = if val.is_null() {
1752 Value::Null
1753 } else {
1754 let got_type = val.data_type();
1755 val.coerce_into(col.data_type)
1756 .ok_or_else(|| SqlError::TypeMismatch {
1757 expected: col.data_type.to_string(),
1758 got: got_type.to_string(),
1759 })?
1760 };
1761 }
1762 } else {
1763 for (pos, gen_expr, fast) in &generated_cols_uncached {
1764 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1765 let col = &table_schema.columns[*pos];
1766 bufs.row[*pos] = if val.is_null() {
1767 Value::Null
1768 } else {
1769 let got_type = val.data_type();
1770 val.coerce_into(col.data_type)
1771 .ok_or_else(|| SqlError::TypeMismatch {
1772 expected: col.data_type.to_string(),
1773 got: got_type.to_string(),
1774 })?
1775 };
1776 }
1777 }
1778 }
1779
1780 if let Some(c) = cache {
1781 for &pos in &c.not_null_indices {
1782 if bufs.row[pos as usize].is_null() {
1783 return Err(SqlError::NotNullViolation(
1784 table_schema.columns[pos as usize].name.clone(),
1785 ));
1786 }
1787 }
1788 } else {
1789 for col in &table_schema.columns {
1790 if !col.nullable && bufs.row[col.position as usize].is_null() {
1791 return Err(SqlError::NotNullViolation(col.name.clone()));
1792 }
1793 }
1794 }
1795
1796 if let Some(ref col_map) = check_col_map {
1797 for col in &table_schema.columns {
1798 if let Some(ref check) = col.check_expr {
1799 let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1800 if !is_truthy(&result) && !result.is_null() {
1801 let name = col.check_name.as_deref().unwrap_or(&col.name);
1802 return Err(SqlError::CheckViolation(name.to_string()));
1803 }
1804 }
1805 }
1806 for tc in &table_schema.check_constraints {
1807 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1808 if !is_truthy(&result) && !result.is_null() {
1809 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1810 return Err(SqlError::CheckViolation(name.to_string()));
1811 }
1812 }
1813 }
1814
1815 if has_fks {
1816 for fk in &table_schema.foreign_keys {
1817 let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1818 if any_null {
1819 continue;
1820 }
1821 crate::encoding::encode_composite_key_from_indices(
1822 &fk.columns,
1823 &bufs.row,
1824 &mut bufs.fk_key_buf,
1825 );
1826 if fk.deferrable && fk.initially_deferred {
1827 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1828 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1829 fk_name: name,
1830 foreign_table: fk.foreign_table.as_bytes().to_vec(),
1831 parent_key: bufs.fk_key_buf.clone(),
1832 });
1833 continue;
1834 }
1835 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1836 let found = wtx
1837 .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1838 .map_err(SqlError::Storage)?;
1839 if found.is_none() {
1840 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1841 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1842 }
1843 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1844 }
1845 }
1846 }
1847
1848 let proposed_row_for_returning: Option<Vec<Value>> =
1849 returning_rows.as_ref().map(|_| bufs.row.clone());
1850 let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1851 Some(bufs.row.clone())
1852 } else {
1853 None
1854 };
1855
1856 let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
1857 t.enabled
1858 && t.timing == crate::parser::TriggerTiming::Before
1859 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1860 && t.events
1861 .iter()
1862 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1863 });
1864 if has_before_insert_triggers {
1865 super::triggers::fire_row_triggers(
1866 wtx,
1867 schema,
1868 &table_schema.name,
1869 crate::parser::TriggerTiming::Before,
1870 super::triggers::FireEvent::Insert,
1871 None,
1872 Some(bufs.row.clone()),
1873 &table_schema.columns,
1874 )?;
1875 }
1876
1877 for (j, &i) in pk_indices.iter().enumerate() {
1878 bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1879 }
1880 match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1881 true => match bufs.pk_values[0] {
1882 Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1883 _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1884 },
1885 false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1886 }
1887
1888 for &slot in dropped {
1889 bufs.value_values[slot as usize] = Value::Null;
1890 }
1891 for (j, &i) in non_pk.iter().enumerate() {
1892 let col = &table_schema.columns[i];
1893 if matches!(
1894 col.generated_kind,
1895 Some(crate::parser::GeneratedKind::Virtual)
1896 ) {
1897 bufs.value_values[enc_pos[j] as usize] = Value::Null;
1898 bufs.row[i] = Value::Null;
1899 } else {
1900 bufs.value_values[enc_pos[j] as usize] =
1901 std::mem::replace(&mut bufs.row[i], Value::Null);
1902 }
1903 }
1904 match cache.and_then(|c| c.row_encoder.as_ref()) {
1905 Some(tmpl) => crate::encoding::encode_int_row_with_template(
1906 tmpl,
1907 &bufs.value_values,
1908 &mut bufs.value_buf,
1909 )?,
1910 None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1911 }
1912
1913 if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1914 return Err(SqlError::KeyTooLarge {
1915 size: bufs.key_buf.len(),
1916 max: citadel_core::MAX_KEY_SIZE,
1917 });
1918 }
1919 if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1920 return Err(SqlError::RowTooLarge {
1921 size: bufs.value_buf.len(),
1922 max: citadel_core::MAX_VALUE_SIZE,
1923 });
1924 }
1925
1926 match compiled_conflict.as_ref() {
1927 None => {
1928 let is_new = wtx
1929 .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1930 .map_err(SqlError::Storage)?;
1931 if !is_new {
1932 return Err(SqlError::DuplicateKey);
1933 }
1934 let has_after_insert_triggers =
1935 schema.triggers_for(&table_schema.name).iter().any(|t| {
1936 t.enabled
1937 && t.timing == crate::parser::TriggerTiming::After
1938 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1939 && t.events
1940 .iter()
1941 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1942 });
1943 if has_indices || has_after_insert_triggers {
1944 for (j, &i) in pk_indices.iter().enumerate() {
1945 bufs.row[i] = bufs.pk_values[j].clone();
1946 }
1947 for (j, &i) in non_pk.iter().enumerate() {
1948 bufs.row[i] = std::mem::replace(
1949 &mut bufs.value_values[enc_pos[j] as usize],
1950 Value::Null,
1951 );
1952 }
1953 if has_indices {
1954 insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1955 }
1956 if has_after_insert_triggers {
1957 super::triggers::fire_row_triggers(
1958 wtx,
1959 schema,
1960 &table_schema.name,
1961 crate::parser::TriggerTiming::After,
1962 super::triggers::FireEvent::Insert,
1963 None,
1964 Some(bufs.row.clone()),
1965 &table_schema.columns,
1966 )?;
1967 }
1968 }
1969 if let Some(r) = row_for_stmt_trigger_impl.clone() {
1970 stmt_new_rows_impl.push(r);
1971 }
1972 count += 1;
1973 if let Some(buf) = returning_rows.as_mut() {
1974 buf.push((None, proposed_row_for_returning));
1975 }
1976 }
1977 Some(oc) => {
1978 let oc_ref: &CompiledOnConflict = oc;
1979 let needs_row = upsert_needs_row(oc_ref, table_schema);
1980 if needs_row {
1981 for (j, &i) in pk_indices.iter().enumerate() {
1982 bufs.row[i] = bufs.pk_values[j].clone();
1983 }
1984 for (j, &i) in non_pk.iter().enumerate() {
1985 bufs.row[i] = std::mem::replace(
1986 &mut bufs.value_values[enc_pos[j] as usize],
1987 Value::Null,
1988 );
1989 }
1990 }
1991 let outcome = apply_insert_with_conflict(
1992 wtx,
1993 table_schema,
1994 &bufs.key_buf,
1995 &bufs.value_buf,
1996 &bufs.row,
1997 &bufs.pk_values,
1998 oc_ref,
1999 row_col_map.unwrap(),
2000 stmt.returning.is_some(),
2001 )?;
2002 match outcome {
2003 InsertRowOutcome::Inserted => {
2004 count += 1;
2005 if let Some(buf) = returning_rows.as_mut() {
2006 buf.push((None, proposed_row_for_returning));
2007 }
2008 if let Some(r) = row_for_stmt_trigger_impl.clone() {
2009 stmt_new_rows_impl.push(r);
2010 }
2011 let has_after_insert_triggers =
2012 schema.triggers_for(&table_schema.name).iter().any(|t| {
2013 t.enabled
2014 && t.timing == crate::parser::TriggerTiming::After
2015 && t.granularity
2016 == crate::parser::TriggerGranularity::ForEachRow
2017 && t.events
2018 .iter()
2019 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
2020 });
2021 if has_after_insert_triggers {
2022 super::triggers::fire_row_triggers(
2023 wtx,
2024 schema,
2025 &table_schema.name,
2026 crate::parser::TriggerTiming::After,
2027 super::triggers::FireEvent::Insert,
2028 None,
2029 Some(bufs.row.clone()),
2030 &table_schema.columns,
2031 )?;
2032 }
2033 }
2034 InsertRowOutcome::Updated { old, new } => {
2035 count += 1;
2036 if let Some(buf) = returning_rows.as_mut() {
2037 buf.push((Some(old.clone()), Some(new.clone())));
2038 }
2039 let has_after_update_triggers =
2040 schema.triggers_for(&table_schema.name).iter().any(|t| {
2041 t.enabled
2042 && t.timing == crate::parser::TriggerTiming::After
2043 && t.granularity
2044 == crate::parser::TriggerGranularity::ForEachRow
2045 && t.events.iter().any(|e| {
2046 matches!(e, crate::parser::TriggerEvent::Update(_))
2047 })
2048 });
2049 if has_after_update_triggers {
2050 let changed_cols: Vec<String> = match oc_ref {
2051 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2052 .iter()
2053 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2054 .collect(),
2055 _ => Vec::new(),
2056 };
2057 super::triggers::fire_row_triggers(
2058 wtx,
2059 schema,
2060 &table_schema.name,
2061 crate::parser::TriggerTiming::After,
2062 super::triggers::FireEvent::Update {
2063 changed_columns: &changed_cols,
2064 },
2065 Some(old),
2066 Some(new),
2067 &table_schema.columns,
2068 )?;
2069 }
2070 }
2071 InsertRowOutcome::Skipped => {}
2072 }
2073 }
2074 }
2075 }
2076
2077 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2078 if has_insert_statement_triggers_impl {
2079 super::triggers::fire_statement_triggers(
2080 wtx,
2081 schema,
2082 &table_schema.name,
2083 crate::parser::TriggerTiming::After,
2084 super::triggers::FireEvent::Insert,
2085 &table_schema.columns,
2086 &[],
2087 &stmt_new_rows_impl,
2088 )?;
2089 }
2090 return Ok(ExecutionResult::Query(super::helpers::project_returning(
2091 table_schema,
2092 returning_cols,
2093 &rows,
2094 )?));
2095 }
2096
2097 if has_insert_statement_triggers_impl {
2098 super::triggers::fire_statement_triggers(
2099 wtx,
2100 schema,
2101 &table_schema.name,
2102 crate::parser::TriggerTiming::After,
2103 super::triggers::FireEvent::Insert,
2104 &table_schema.columns,
2105 &[],
2106 &stmt_new_rows_impl,
2107 )?;
2108 }
2109
2110 Ok(ExecutionResult::RowsAffected(count))
2111}
2112
2113pub struct CompiledInsert {
2114 table_lower: String,
2115 cached: Option<InsertCache>,
2116}
2117
2118struct InsertCache {
2119 col_indices: Vec<usize>,
2120 has_subquery: bool,
2121 any_defaults: bool,
2122 has_checks: bool,
2123 on_conflict: Option<Arc<CompiledOnConflict>>,
2124 row_col_map: Option<ColumnMap>,
2125 generated_col_positions: Vec<usize>,
2126 generated_fast_evals: Vec<FastGenEval>,
2127 pk_indices: Vec<usize>,
2128 non_pk_indices: Vec<usize>,
2129 encoding_positions: Vec<u16>,
2130 dropped_non_pk_slots: Vec<u16>,
2131 phys_count: usize,
2132 single_int_pk: bool,
2133 not_null_indices: Vec<u16>,
2134 bind_plan: Option<Vec<BindAction>>,
2135 row_fully_overwritten: bool,
2136 row_encoder: Option<crate::encoding::IntRowTemplate>,
2137 is_trivial_fast: bool,
2138 trivial_fast_program: Option<TrivialFastProgram>,
2139 needs_scoped_params: bool,
2140}
2141
2142#[derive(Clone)]
2143enum BindAction {
2144 Param {
2145 param_idx: usize,
2146 col_idx: usize,
2147 target: DataType,
2148 },
2149 Literal {
2150 value: Value,
2151 col_idx: usize,
2152 },
2153}
2154
2155#[derive(Clone)]
2156struct TrivialFastProgram {
2157 template: Vec<u8>,
2158 ops: Vec<WriteOp>,
2159 pk_param: u8,
2160 not_null_param_indices: Vec<u8>,
2161}
2162
2163#[derive(Clone)]
2164enum WriteOp {
2165 ParamI64 {
2166 param_idx: u8,
2167 off: u32,
2168 },
2169 LiteralI64 {
2170 value: i64,
2171 off: u32,
2172 },
2173 GenAddParamsI64 {
2174 a_param: u8,
2175 b_param: u8,
2176 off: u32,
2177 bitmap_byte_off: u32,
2178 bitmap_bit_mask: u8,
2179 },
2180 GenMulAddParamI64 {
2181 param_idx: u8,
2182 mul: i64,
2183 add: i64,
2184 off: u32,
2185 bitmap_byte_off: u32,
2186 bitmap_bit_mask: u8,
2187 },
2188}
2189
2190fn build_trivial_fast_program(
2191 bind_plan: &[BindAction],
2192 row_encoder: &crate::encoding::IntRowTemplate,
2193 non_virtual_pairs: &[(usize, usize)],
2194 generated_col_positions: &[usize],
2195 generated_fast_evals: &[FastGenEval],
2196 pk_indices: &[usize],
2197 columns: &[crate::types::ColumnDef],
2198) -> Option<TrivialFastProgram> {
2199 let pk_col = pk_indices[0];
2200 let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2201 non_virtual_pairs.iter().copied().collect();
2202 let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2203 row_encoder.slot_offsets.iter().copied().collect();
2204
2205 let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2206 let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2207 let mut pk_param: Option<u8> = None;
2208 let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2209 let mut not_null_param_indices: Vec<u8> = Vec::new();
2210
2211 for action in bind_plan {
2212 match action {
2213 BindAction::Param {
2214 param_idx,
2215 col_idx,
2216 target,
2217 } => {
2218 if *target != DataType::Integer {
2219 return None;
2220 }
2221 let pi: u8 = u8::try_from(*param_idx).ok()?;
2222 col_to_param.insert(*col_idx, pi);
2223 if *col_idx == pk_col {
2224 pk_param = Some(pi);
2225 } else {
2226 let slot = *col_to_slot.get(col_idx)?;
2227 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2228 ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2229 if !columns[*col_idx].nullable {
2230 not_null_param_indices.push(pi);
2231 }
2232 }
2233 }
2234 BindAction::Literal { value, col_idx } => match value {
2235 Value::Integer(v) => {
2236 col_to_lit_int.insert(*col_idx, *v);
2237 if *col_idx == pk_col {
2238 return None;
2239 }
2240 let slot = *col_to_slot.get(col_idx)?;
2241 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2242 ops.push(WriteOp::LiteralI64 { value: *v, off });
2243 }
2244 _ => return None,
2245 },
2246 }
2247 }
2248
2249 let pk_param = pk_param?;
2250
2251 for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2252 let gen_slot = *col_to_slot.get(&gen_pos)?;
2253 let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2254 let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
2255 let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
2256 let gen_col_nullable = columns[gen_pos].nullable;
2257
2258 match &generated_fast_evals[i] {
2259 FastGenEval::IntColAddCol {
2260 left_idx,
2261 right_idx,
2262 } => {
2263 let a_param = col_to_param.get(left_idx).copied();
2264 let b_param = col_to_param.get(right_idx).copied();
2265 match (a_param, b_param) {
2266 (Some(ap), Some(bp)) => {
2267 let deps_safe = gen_col_nullable
2268 || (not_null_param_indices.contains(&ap)
2269 && not_null_param_indices.contains(&bp));
2270 if !deps_safe {
2271 return None;
2272 }
2273 ops.push(WriteOp::GenAddParamsI64 {
2274 a_param: ap,
2275 b_param: bp,
2276 off: gen_off,
2277 bitmap_byte_off,
2278 bitmap_bit_mask,
2279 });
2280 }
2281 (Some(p), None) => {
2282 let lit = col_to_lit_int.get(right_idx).copied()?;
2283 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2284 return None;
2285 }
2286 ops.push(WriteOp::GenMulAddParamI64 {
2287 param_idx: p,
2288 mul: 1,
2289 add: lit,
2290 off: gen_off,
2291 bitmap_byte_off,
2292 bitmap_bit_mask,
2293 });
2294 }
2295 (None, Some(p)) => {
2296 let lit = col_to_lit_int.get(left_idx).copied()?;
2297 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2298 return None;
2299 }
2300 ops.push(WriteOp::GenMulAddParamI64 {
2301 param_idx: p,
2302 mul: 1,
2303 add: lit,
2304 off: gen_off,
2305 bitmap_byte_off,
2306 bitmap_bit_mask,
2307 });
2308 }
2309 (None, None) => {
2310 let la = col_to_lit_int.get(left_idx).copied()?;
2311 let lb = col_to_lit_int.get(right_idx).copied()?;
2312 ops.push(WriteOp::LiteralI64 {
2313 value: la.wrapping_add(lb),
2314 off: gen_off,
2315 });
2316 }
2317 }
2318 }
2319 FastGenEval::IntColMulAdd {
2320 col_schema_idx,
2321 mul,
2322 add,
2323 } => {
2324 if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2325 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2326 return None;
2327 }
2328 ops.push(WriteOp::GenMulAddParamI64 {
2329 param_idx: p,
2330 mul: *mul,
2331 add: *add,
2332 off: gen_off,
2333 bitmap_byte_off,
2334 bitmap_bit_mask,
2335 });
2336 } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2337 ops.push(WriteOp::LiteralI64 {
2338 value: lit.wrapping_mul(*mul).wrapping_add(*add),
2339 off: gen_off,
2340 });
2341 } else {
2342 return None;
2343 }
2344 }
2345 FastGenEval::None => return None,
2346 }
2347 }
2348
2349 Some(TrivialFastProgram {
2350 template: row_encoder.template.clone(),
2351 ops,
2352 pk_param,
2353 not_null_param_indices,
2354 })
2355}
2356
2357#[derive(Clone)]
2358pub(super) enum CompiledOnConflict {
2359 DoNothing {
2360 target: Option<ConflictKind>,
2361 },
2362 DoUpdate {
2363 target: ConflictKind,
2364 assignments: Vec<(usize, Expr)>,
2365 where_clause: Option<Expr>,
2366 fast_paths: Option<Vec<DoUpdateFastPath>>,
2367 },
2368}
2369
2370#[derive(Clone, Copy)]
2371pub(super) enum DoUpdateFastPath {
2372 IntAddConst { phys_idx: usize, delta: i64 },
2373}
2374
2375#[derive(Clone, Debug)]
2376pub(super) enum ConflictKind {
2377 PrimaryKey,
2378 UniqueIndex { index_idx: usize },
2379}
2380
2381fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2382 match target {
2383 ConflictTarget::Columns(cols) => {
2384 let col_idx_set: Vec<u16> = cols
2385 .iter()
2386 .map(|name| {
2387 ts.column_index(name)
2388 .map(|i| i as u16)
2389 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2390 })
2391 .collect::<Result<_>>()?;
2392 let pk_set = ts.primary_key_columns.clone();
2393 if set_equal(&col_idx_set, &pk_set) {
2394 return Ok(ConflictKind::PrimaryKey);
2395 }
2396 for (index_idx, idx) in ts.indices.iter().enumerate() {
2397 if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2398 return Ok(ConflictKind::UniqueIndex { index_idx });
2399 }
2400 }
2401 Err(SqlError::Plan(
2402 "ON CONFLICT target does not match any unique constraint".into(),
2403 ))
2404 }
2405 ConflictTarget::Constraint(name) => {
2406 let lower = name.to_ascii_lowercase();
2407 for (index_idx, idx) in ts.indices.iter().enumerate() {
2408 if idx.name.eq_ignore_ascii_case(&lower) {
2409 if idx.unique {
2410 return Ok(ConflictKind::UniqueIndex { index_idx });
2411 }
2412 return Err(SqlError::Plan(format!(
2413 "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2414 )));
2415 }
2416 }
2417 Err(SqlError::Plan(format!(
2418 "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2419 )))
2420 }
2421 }
2422}
2423
2424fn set_equal(a: &[u16], b: &[u16]) -> bool {
2425 if a.len() != b.len() {
2426 return false;
2427 }
2428 let mut a_sorted = a.to_vec();
2429 let mut b_sorted = b.to_vec();
2430 a_sorted.sort_unstable();
2431 b_sorted.sort_unstable();
2432 a_sorted == b_sorted
2433}
2434
2435pub(super) enum InsertRowOutcome {
2436 Inserted,
2437 Updated { old: Vec<Value>, new: Vec<Value> },
2438 Skipped,
2439}
2440
2441#[allow(clippy::too_many_arguments)]
2442#[inline]
2443pub(super) fn apply_insert_with_conflict(
2444 wtx: &mut WriteTxn<'_>,
2445 table_schema: &TableSchema,
2446 key_buf: &[u8],
2447 value_buf: &[u8],
2448 row: &[Value],
2449 pk_values: &[Value],
2450 on_conflict: &CompiledOnConflict,
2451 col_map: &ColumnMap,
2452 capture_returning: bool,
2453) -> Result<InsertRowOutcome> {
2454 let table_bytes = table_schema.name.as_bytes();
2455
2456 if let CompiledOnConflict::DoNothing { target } = on_conflict {
2457 let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2458 if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2459 let inserted = wtx
2460 .table_insert_if_absent(table_bytes, key_buf, value_buf)
2461 .map_err(SqlError::Storage)?;
2462 return Ok(if inserted {
2463 InsertRowOutcome::Inserted
2464 } else {
2465 InsertRowOutcome::Skipped
2466 });
2467 }
2468 }
2469
2470 if let CompiledOnConflict::DoUpdate {
2471 target: ConflictKind::PrimaryKey,
2472 assignments,
2473 where_clause,
2474 fast_paths,
2475 } = on_conflict
2476 {
2477 if can_fuse_do_update(table_schema, assignments) {
2478 return apply_do_update_fused(
2479 wtx,
2480 table_schema,
2481 table_bytes,
2482 key_buf,
2483 value_buf,
2484 row,
2485 assignments,
2486 where_clause.as_ref(),
2487 col_map,
2488 fast_paths.as_deref(),
2489 capture_returning,
2490 );
2491 }
2492 }
2493
2494 let primary_outcome = wtx
2495 .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2496 .map_err(SqlError::Storage)?;
2497
2498 match primary_outcome {
2499 citadel_txn::write_txn::InsertOutcome::Inserted => {
2500 if table_schema.indices.is_empty() {
2501 return Ok(InsertRowOutcome::Inserted);
2502 }
2503 let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2504 match insert_index_entries_or_fetch(
2505 wtx,
2506 table_schema,
2507 row,
2508 pk_values,
2509 &mut inserted_keys,
2510 )? {
2511 None => Ok(InsertRowOutcome::Inserted),
2512 Some(conflicting_idx) => {
2513 let matches_target =
2514 matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2515 || matches!(
2516 on_conflict,
2517 CompiledOnConflict::DoNothing {
2518 target: Some(ConflictKind::UniqueIndex { index_idx }),
2519 } | CompiledOnConflict::DoUpdate {
2520 target: ConflictKind::UniqueIndex { index_idx },
2521 ..
2522 } if *index_idx == conflicting_idx
2523 );
2524 undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2525 if !matches_target {
2526 return Err(SqlError::UniqueViolation(
2527 table_schema.indices[conflicting_idx].name.clone(),
2528 ));
2529 }
2530 match on_conflict {
2531 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2532 CompiledOnConflict::DoUpdate {
2533 assignments,
2534 where_clause,
2535 ..
2536 } => {
2537 let existing_pk =
2538 fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2539 apply_do_update(
2540 wtx,
2541 table_schema,
2542 &existing_pk,
2543 row,
2544 assignments,
2545 where_clause.as_ref(),
2546 col_map,
2547 capture_returning,
2548 )
2549 }
2550 }
2551 }
2552 }
2553 }
2554 citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2555 let matches_target = matches!(
2556 on_conflict,
2557 CompiledOnConflict::DoNothing { target: None }
2558 | CompiledOnConflict::DoNothing {
2559 target: Some(ConflictKind::PrimaryKey),
2560 }
2561 | CompiledOnConflict::DoUpdate {
2562 target: ConflictKind::PrimaryKey,
2563 ..
2564 }
2565 );
2566 if !matches_target {
2567 return Err(SqlError::DuplicateKey);
2568 }
2569 match on_conflict {
2570 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2571 CompiledOnConflict::DoUpdate {
2572 assignments,
2573 where_clause,
2574 ..
2575 } => {
2576 let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2577 apply_do_update_with_old_row(
2578 wtx,
2579 table_schema,
2580 key_buf,
2581 &old_row,
2582 row,
2583 assignments,
2584 where_clause.as_ref(),
2585 col_map,
2586 capture_returning,
2587 )
2588 }
2589 }
2590 }
2591 }
2592}
2593
2594#[inline]
2595fn apply_fast_path_patch(
2596 old_bytes: &[u8],
2597 fast_paths: &[DoUpdateFastPath],
2598) -> Result<UpsertAction> {
2599 UPSERT_SCRATCH.with(|slot| {
2600 let mut bufs = slot.borrow_mut();
2601 bufs.new_value_buf.clear();
2602 bufs.new_value_buf.extend_from_slice(old_bytes);
2603
2604 let mut patch_scratch: Vec<u8> = Vec::new();
2605
2606 for fp in fast_paths {
2607 match fp {
2608 DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2609 let decoded =
2610 crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2611 let old_val = &decoded[0];
2612 let new_val = match old_val {
2613 Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2614 Value::Null => Value::Null,
2615 _ => {
2616 return Err(SqlError::TypeMismatch {
2617 expected: "INTEGER".into(),
2618 got: old_val.data_type().to_string(),
2619 });
2620 }
2621 };
2622 if !crate::encoding::patch_column_in_place(
2623 &mut bufs.new_value_buf,
2624 *phys_idx,
2625 &new_val,
2626 )? {
2627 patch_scratch.clear();
2628 crate::encoding::patch_row_column(
2629 &bufs.new_value_buf,
2630 *phys_idx,
2631 &new_val,
2632 &mut patch_scratch,
2633 )?;
2634 std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2635 }
2636 }
2637 }
2638 }
2639
2640 if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2641 return Err(SqlError::RowTooLarge {
2642 size: bufs.new_value_buf.len(),
2643 max: citadel_core::MAX_VALUE_SIZE,
2644 });
2645 }
2646
2647 Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2648 })
2649}
2650
2651fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2652 if !ts.indices.is_empty() {
2653 return true;
2654 }
2655 match oc {
2656 CompiledOnConflict::DoNothing { .. } => false,
2657 CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2658 }
2659}
2660
2661fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2662 if !ts.indices.is_empty() {
2663 return false;
2664 }
2665 if !ts.foreign_keys.is_empty() {
2666 return false;
2667 }
2668 if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2669 return false;
2670 }
2671 let pk = ts.pk_indices();
2672 !assignments.iter().any(|(ci, _)| pk.contains(ci))
2673}
2674
2675#[allow(clippy::too_many_arguments)]
2676#[inline]
2677fn apply_do_update_fused(
2678 wtx: &mut WriteTxn<'_>,
2679 table_schema: &TableSchema,
2680 table_bytes: &[u8],
2681 key_buf: &[u8],
2682 value_buf: &[u8],
2683 proposed_row: &[Value],
2684 assignments: &[(usize, Expr)],
2685 where_clause: Option<&Expr>,
2686 col_map: &ColumnMap,
2687 fast_paths: Option<&[DoUpdateFastPath]>,
2688 capture_returning: bool,
2689) -> Result<InsertRowOutcome> {
2690 let non_pk = table_schema.non_pk_indices();
2691 let enc_pos = table_schema.encoding_positions();
2692 let phys_count = table_schema.physical_non_pk_count();
2693 let dropped = table_schema.dropped_non_pk_slots();
2694 let has_checks = table_schema.has_checks();
2695 let has_fks = !table_schema.foreign_keys.is_empty();
2696
2697 let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2698 std::cell::RefCell::new(None);
2699
2700 let outcome =
2701 wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2702 if let Some(fps) = fast_paths {
2703 if !has_checks {
2704 let action = apply_fast_path_patch(old_bytes, fps)?;
2705 if capture_returning {
2706 if let UpsertAction::Replace(ref new_bytes) = action {
2707 let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2708 let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2709 *captured.borrow_mut() = Some((old_row, new_row));
2710 }
2711 }
2712 return Ok(action);
2713 }
2714 }
2715 UPSERT_SCRATCH.with(|slot| {
2716 let mut bufs = slot.borrow_mut();
2717 let UpsertBufs {
2718 old_row,
2719 new_row,
2720 value_values,
2721 new_value_buf,
2722 } = &mut *bufs;
2723
2724 old_row.clear();
2725 old_row.resize(table_schema.columns.len(), Value::Null);
2726 decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2727
2728 if let Some(w) = where_clause {
2729 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2730 let result = eval_expr(w, &ctx)?;
2731 if result.is_null() || !is_truthy(&result) {
2732 return Ok(UpsertAction::Skip);
2733 }
2734 }
2735
2736 new_row.clear();
2737 new_row.extend_from_slice(old_row);
2738 for (col_idx, expr) in assignments {
2739 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2740 let val = eval_expr(expr, &ctx)?;
2741 let col = &table_schema.columns[*col_idx];
2742 new_row[*col_idx] = if val.is_null() {
2743 Value::Null
2744 } else {
2745 let got = val.data_type();
2746 val.coerce_into(col.data_type)
2747 .ok_or_else(|| SqlError::TypeMismatch {
2748 expected: col.data_type.to_string(),
2749 got: got.to_string(),
2750 })?
2751 };
2752 }
2753
2754 for (assigned_idx, _) in assignments {
2755 let col = &table_schema.columns[*assigned_idx];
2756 if !col.nullable && new_row[col.position as usize].is_null() {
2757 return Err(SqlError::NotNullViolation(col.name.clone()));
2758 }
2759 }
2760 if has_checks {
2761 for col in &table_schema.columns {
2762 if let Some(ref check) = col.check_expr {
2763 let ctx = EvalCtx::new(col_map, new_row);
2764 let result = eval_expr(check, &ctx)?;
2765 if !is_truthy(&result) && !result.is_null() {
2766 let name = col.check_name.as_deref().unwrap_or(&col.name);
2767 return Err(SqlError::CheckViolation(name.to_string()));
2768 }
2769 }
2770 }
2771 for tc in &table_schema.check_constraints {
2772 let ctx = EvalCtx::new(col_map, new_row);
2773 let result = eval_expr(&tc.expr, &ctx)?;
2774 if !is_truthy(&result) && !result.is_null() {
2775 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2776 return Err(SqlError::CheckViolation(name.to_string()));
2777 }
2778 }
2779 }
2780 let _ = has_fks;
2781
2782 value_values.clear();
2783 value_values.resize(phys_count, Value::Null);
2784 for &slot in dropped {
2785 value_values[slot as usize] = Value::Null;
2786 }
2787 for (j, &i) in non_pk.iter().enumerate() {
2788 value_values[enc_pos[j] as usize] = new_row[i].clone();
2789 }
2790 new_value_buf.clear();
2791 crate::encoding::encode_row_into(value_values, new_value_buf);
2792
2793 if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2794 return Err(SqlError::RowTooLarge {
2795 size: new_value_buf.len(),
2796 max: citadel_core::MAX_VALUE_SIZE,
2797 });
2798 }
2799
2800 if capture_returning {
2801 *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2802 }
2803 Ok(UpsertAction::Replace(new_value_buf.clone()))
2804 })
2805 })?;
2806
2807 match outcome {
2808 UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2809 UpsertOutcome::Updated => {
2810 if capture_returning {
2811 let (old, new) = captured.into_inner().ok_or_else(|| {
2812 SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2813 })?;
2814 Ok(InsertRowOutcome::Updated { old, new })
2815 } else {
2816 Ok(InsertRowOutcome::Inserted)
2817 }
2818 }
2819 UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2820 }
2821}
2822
2823fn fetch_unique_index_pk(
2824 wtx: &mut WriteTxn<'_>,
2825 table_schema: &TableSchema,
2826 index_idx: usize,
2827 row: &[Value],
2828) -> Result<Vec<u8>> {
2829 let idx = &table_schema.indices[index_idx];
2830 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2831 let indexed: Vec<Value> = idx
2832 .column_positions_iter()
2833 .map(|col_idx| row[col_idx as usize].clone())
2834 .collect();
2835 let key = crate::encoding::encode_composite_key(&indexed);
2836 let value = wtx
2837 .table_get(&idx_table, &key)
2838 .map_err(SqlError::Storage)?
2839 .ok_or_else(|| {
2840 SqlError::InvalidValue("unique index missing expected collision entry".into())
2841 })?;
2842 Ok(value)
2843}
2844
2845#[allow(clippy::too_many_arguments)]
2846fn apply_do_update(
2847 wtx: &mut WriteTxn<'_>,
2848 table_schema: &TableSchema,
2849 pk_key: &[u8],
2850 proposed_row: &[Value],
2851 assignments: &[(usize, Expr)],
2852 where_clause: Option<&Expr>,
2853 col_map: &ColumnMap,
2854 capture_returning: bool,
2855) -> Result<InsertRowOutcome> {
2856 let old_value = wtx
2857 .table_get(table_schema.name.as_bytes(), pk_key)
2858 .map_err(SqlError::Storage)?
2859 .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2860 let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2861 apply_do_update_with_old_row(
2862 wtx,
2863 table_schema,
2864 pk_key,
2865 &old_row,
2866 proposed_row,
2867 assignments,
2868 where_clause,
2869 col_map,
2870 capture_returning,
2871 )
2872}
2873
2874#[allow(clippy::too_many_arguments)]
2875fn apply_do_update_with_old_row(
2876 wtx: &mut WriteTxn<'_>,
2877 table_schema: &TableSchema,
2878 old_pk_key: &[u8],
2879 old_row: &[Value],
2880 proposed_row: &[Value],
2881 assignments: &[(usize, Expr)],
2882 where_clause: Option<&Expr>,
2883 col_map: &ColumnMap,
2884 capture_returning: bool,
2885) -> Result<InsertRowOutcome> {
2886 if let Some(w) = where_clause {
2887 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2888 let result = eval_expr(w, &ctx)?;
2889 if result.is_null() || !is_truthy(&result) {
2890 return Ok(InsertRowOutcome::Skipped);
2891 }
2892 }
2893
2894 let mut new_row = old_row.to_vec();
2895 for (col_idx, expr) in assignments {
2896 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2897 let val = eval_expr(expr, &ctx)?;
2898 let col = &table_schema.columns[*col_idx];
2899 new_row[*col_idx] = if val.is_null() {
2900 Value::Null
2901 } else {
2902 let got = val.data_type();
2903 val.coerce_into(col.data_type)
2904 .ok_or_else(|| SqlError::TypeMismatch {
2905 expected: col.data_type.to_string(),
2906 got: got.to_string(),
2907 })?
2908 };
2909 }
2910
2911 for col in &table_schema.columns {
2912 if matches!(
2913 col.generated_kind,
2914 Some(crate::parser::GeneratedKind::Stored)
2915 ) {
2916 let val = eval_expr(
2917 col.generated_expr.as_ref().unwrap(),
2918 &EvalCtx::new(col_map, &new_row),
2919 )?;
2920 let pos = col.position as usize;
2921 new_row[pos] = if val.is_null() {
2922 if !col.nullable {
2923 return Err(SqlError::NotNullViolation(col.name.clone()));
2924 }
2925 Value::Null
2926 } else {
2927 let got = val.data_type();
2928 val.coerce_into(col.data_type)
2929 .ok_or_else(|| SqlError::TypeMismatch {
2930 expected: col.data_type.to_string(),
2931 got: got.to_string(),
2932 })?
2933 };
2934 }
2935 }
2936
2937 let pk_indices = table_schema.pk_indices();
2938 let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2939 let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2940
2941 for (assigned_idx, _) in assignments {
2942 let col = &table_schema.columns[*assigned_idx];
2943 if !col.nullable && new_row[col.position as usize].is_null() {
2944 return Err(SqlError::NotNullViolation(col.name.clone()));
2945 }
2946 }
2947 if table_schema.has_checks() {
2948 for col in &table_schema.columns {
2949 if let Some(ref check) = col.check_expr {
2950 let ctx = EvalCtx::new(col_map, &new_row);
2951 let result = eval_expr(check, &ctx)?;
2952 if !is_truthy(&result) && !result.is_null() {
2953 let name = col.check_name.as_deref().unwrap_or(&col.name);
2954 return Err(SqlError::CheckViolation(name.to_string()));
2955 }
2956 }
2957 }
2958 for tc in &table_schema.check_constraints {
2959 let ctx = EvalCtx::new(col_map, &new_row);
2960 let result = eval_expr(&tc.expr, &ctx)?;
2961 if !is_truthy(&result) && !result.is_null() {
2962 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2963 return Err(SqlError::CheckViolation(name.to_string()));
2964 }
2965 }
2966 }
2967 for fk in &table_schema.foreign_keys {
2968 let changed = fk
2969 .columns
2970 .iter()
2971 .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2972 if !changed {
2973 continue;
2974 }
2975 let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2976 if any_null {
2977 continue;
2978 }
2979 let fk_vals: Vec<Value> = fk
2980 .columns
2981 .iter()
2982 .map(|&ci| new_row[ci as usize].clone())
2983 .collect();
2984 let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2985 if fk.deferrable && fk.initially_deferred {
2986 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
2987 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
2988 fk_name: name,
2989 foreign_table: fk.foreign_table.as_bytes().to_vec(),
2990 parent_key: fk_key,
2991 });
2992 continue;
2993 }
2994 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
2995 let found = wtx
2996 .table_get(fk.foreign_table.as_bytes(), &fk_key)
2997 .map_err(SqlError::Storage)?;
2998 if found.is_none() {
2999 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
3000 return Err(SqlError::ForeignKeyViolation(name.to_string()));
3001 }
3002 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
3003 }
3004 }
3005
3006 let has_indices = !table_schema.indices.is_empty();
3007 let old_pk_values: Vec<Value> = if has_indices || pk_changed {
3008 pk_indices.iter().map(|&i| old_row[i].clone()).collect()
3009 } else {
3010 Vec::new()
3011 };
3012 let new_pk_values: Vec<Value> = if has_indices || pk_changed {
3013 pk_indices.iter().map(|&i| new_row[i].clone()).collect()
3014 } else {
3015 Vec::new()
3016 };
3017
3018 let non_pk = table_schema.non_pk_indices();
3019 let enc_pos = table_schema.encoding_positions();
3020 let phys_count = table_schema.physical_non_pk_count();
3021 let dropped = table_schema.dropped_non_pk_slots();
3022 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
3023 for &slot in dropped {
3024 value_values[slot as usize] = Value::Null;
3025 }
3026 for (j, &i) in non_pk.iter().enumerate() {
3027 let col = &table_schema.columns[i];
3028 value_values[enc_pos[j] as usize] = if matches!(
3029 col.generated_kind,
3030 Some(crate::parser::GeneratedKind::Virtual)
3031 ) {
3032 Value::Null
3033 } else {
3034 new_row[i].clone()
3035 };
3036 }
3037 let mut new_value_buf = Vec::with_capacity(256);
3038 crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
3039
3040 if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
3041 return Err(SqlError::RowTooLarge {
3042 size: new_value_buf.len(),
3043 max: citadel_core::MAX_VALUE_SIZE,
3044 });
3045 }
3046
3047 if pk_changed {
3048 let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
3049 let inserted = wtx
3050 .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
3051 .map_err(SqlError::Storage)?;
3052 if !inserted {
3053 return Err(SqlError::DuplicateKey);
3054 }
3055 wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
3056 .map_err(SqlError::Storage)?;
3057 for idx in &table_schema.indices {
3058 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3059 let old_idx_key =
3060 encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3061 wtx.table_delete(&idx_table, &old_idx_key)
3062 .map_err(SqlError::Storage)?;
3063 let new_idx_key =
3064 encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3065 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3066 let is_new = wtx
3067 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3068 .map_err(SqlError::Storage)?;
3069 if idx.unique && !is_new {
3070 let any_null = idx
3071 .column_positions_iter()
3072 .any(|c| new_row[c as usize].is_null());
3073 if !any_null {
3074 return Err(SqlError::UniqueViolation(idx.name.clone()));
3075 }
3076 }
3077 }
3078 } else {
3079 wtx.table_update_sorted(
3080 table_schema.name.as_bytes(),
3081 &[(old_pk_key, new_value_buf.as_slice())],
3082 )
3083 .map_err(SqlError::Storage)?;
3084 let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
3085 for idx in &table_schema.indices {
3086 let cols_changed = index_columns_changed(idx, old_row, &new_row);
3087 let (del, ins) = partial_idx_update_actions(
3088 idx,
3089 old_row,
3090 &new_row,
3091 cols_changed,
3092 false,
3093 col_map_partial,
3094 );
3095 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3096 if del {
3097 let old_idx_key =
3098 encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3099 wtx.table_delete(&idx_table, &old_idx_key)
3100 .map_err(SqlError::Storage)?;
3101 }
3102 if ins {
3103 let new_idx_key =
3104 encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3105 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3106 let is_new = wtx
3107 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3108 .map_err(SqlError::Storage)?;
3109 if idx.unique && !is_new {
3110 let any_null = idx
3111 .column_positions_iter()
3112 .any(|c| new_row[c as usize].is_null());
3113 if !any_null {
3114 return Err(SqlError::UniqueViolation(idx.name.clone()));
3115 }
3116 }
3117 }
3118 }
3119 }
3120
3121 if capture_returning {
3122 Ok(InsertRowOutcome::Updated {
3123 old: old_row.to_vec(),
3124 new: new_row,
3125 })
3126 } else {
3127 Ok(InsertRowOutcome::Inserted)
3128 }
3129}
3130
3131fn detect_fast_paths(
3132 ts: &TableSchema,
3133 assignments: &[(usize, Expr)],
3134) -> Option<Vec<DoUpdateFastPath>> {
3135 let non_pk = ts.non_pk_indices();
3136 let enc_pos = ts.encoding_positions();
3137 let mut out = Vec::with_capacity(assignments.len());
3138 for (col_idx, expr) in assignments {
3139 let col = &ts.columns[*col_idx];
3140 if col.data_type != DataType::Integer {
3141 return None;
3142 }
3143 let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3144 let phys_idx = enc_pos[nonpk_order] as usize;
3145
3146 if let Expr::BinaryOp { left, op, right } = expr {
3147 if !matches!(op, BinOp::Add | BinOp::Sub) {
3148 return None;
3149 }
3150 let reads_target =
3151 matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3152 if !reads_target {
3153 return None;
3154 }
3155 if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3156 let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3157 let _ = col_idx;
3158 out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3159 continue;
3160 }
3161 return None;
3162 }
3163 return None;
3164 }
3165 Some(out)
3166}
3167
3168fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3169 let target = oc
3170 .target
3171 .as_ref()
3172 .map(|t| resolve_conflict_target(t, ts))
3173 .transpose()?;
3174 match &oc.action {
3175 OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3176 OnConflictAction::DoUpdate {
3177 assignments,
3178 where_clause,
3179 } => {
3180 let target = target.ok_or_else(|| {
3181 SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3182 })?;
3183 let compiled_assignments: Vec<(usize, Expr)> = assignments
3184 .iter()
3185 .map(|(name, expr)| {
3186 let col_idx = ts
3187 .column_index(name)
3188 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3189 Ok((col_idx, expr.clone()))
3190 })
3191 .collect::<Result<_>>()?;
3192 let fast_paths = if where_clause.is_none() {
3193 detect_fast_paths(ts, &compiled_assignments)
3194 } else {
3195 None
3196 };
3197 Ok(CompiledOnConflict::DoUpdate {
3198 target,
3199 assignments: compiled_assignments,
3200 where_clause: where_clause.clone(),
3201 fast_paths,
3202 })
3203 }
3204 }
3205}
3206
3207fn exec_insert_trivial_fast(
3209 wtx: &mut WriteTxn<'_>,
3210 table_lower: &str,
3211 cache: &InsertCache,
3212 bufs: &mut InsertBufs,
3213 params: &[Value],
3214) -> Result<ExecutionResult> {
3215 let prog = cache
3216 .trivial_fast_program
3217 .as_ref()
3218 .expect("trivial fast: program");
3219
3220 for &p in &prog.not_null_param_indices {
3221 if params[p as usize].is_null() {
3222 return Err(SqlError::NotNullViolation(format!("param@{p}")));
3223 }
3224 }
3225
3226 match ¶ms[prog.pk_param as usize] {
3227 Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3228 _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3229 }
3230
3231 bufs.value_buf.clear();
3232 bufs.value_buf.extend_from_slice(&prog.template);
3233
3234 for op in &prog.ops {
3235 match op {
3236 WriteOp::ParamI64 { param_idx, off } => match ¶ms[*param_idx as usize] {
3237 Value::Integer(v) => {
3238 let off = *off as usize;
3239 bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3240 }
3241 other => {
3242 return Err(SqlError::TypeMismatch {
3243 expected: "Integer".into(),
3244 got: other.data_type().to_string(),
3245 });
3246 }
3247 },
3248 WriteOp::LiteralI64 { value, off } => {
3249 let off = *off as usize;
3250 bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3251 }
3252 WriteOp::GenAddParamsI64 {
3253 a_param,
3254 b_param,
3255 off,
3256 bitmap_byte_off,
3257 bitmap_bit_mask,
3258 } => match (¶ms[*a_param as usize], ¶ms[*b_param as usize]) {
3259 (Value::Integer(a), Value::Integer(b)) => {
3260 let off = *off as usize;
3261 bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3262 }
3263 _ => {
3264 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3265 }
3266 },
3267 WriteOp::GenMulAddParamI64 {
3268 param_idx,
3269 mul,
3270 add,
3271 off,
3272 bitmap_byte_off,
3273 bitmap_bit_mask,
3274 } => match ¶ms[*param_idx as usize] {
3275 Value::Integer(v) => {
3276 let r = v.wrapping_mul(*mul).wrapping_add(*add);
3277 let off = *off as usize;
3278 bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3279 }
3280 _ => {
3281 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3282 }
3283 },
3284 }
3285 }
3286
3287 let is_new = wtx
3288 .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3289 .map_err(SqlError::Storage)?;
3290 if !is_new {
3291 return Err(SqlError::DuplicateKey);
3292 }
3293 Ok(ExecutionResult::RowsAffected(1))
3294}
3295
3296fn build_bind_plan(
3297 stmt: &InsertStmt,
3298 col_indices: &[usize],
3299 col_data_types: &[DataType],
3300) -> Option<Vec<BindAction>> {
3301 let rows = match &stmt.source {
3302 InsertSource::Values(rows) => rows,
3303 _ => return None,
3304 };
3305 if rows.len() != 1 {
3306 return None;
3307 }
3308 let value_row = &rows[0];
3309 if value_row.len() != col_indices.len() {
3310 return None;
3311 }
3312 let mut plan = Vec::with_capacity(value_row.len());
3313 for (i, expr) in value_row.iter().enumerate() {
3314 let col_idx = col_indices[i];
3315 let target = col_data_types[col_idx];
3316 match expr {
3317 Expr::Parameter(n) => {
3318 if *n == 0 {
3319 return None;
3320 }
3321 plan.push(BindAction::Param {
3322 param_idx: n - 1,
3323 col_idx,
3324 target,
3325 });
3326 }
3327 Expr::Literal(v) => plan.push(BindAction::Literal {
3328 value: v.clone(),
3329 col_idx,
3330 }),
3331 _ => return None,
3332 }
3333 }
3334 Some(plan)
3335}
3336
3337impl CompiledInsert {
3338 pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3339 let lower = stmt.table.to_ascii_lowercase();
3340 let cached = if let Some(ts) = schema.get(&lower) {
3341 let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3342 ts.columns.iter().map(|c| c.name.as_str()).collect()
3343 } else {
3344 stmt.columns.iter().map(|s| s.as_str()).collect()
3345 };
3346 let mut col_indices = Vec::with_capacity(insert_columns.len());
3347 for name in &insert_columns {
3348 col_indices.push(ts.column_index(name)?);
3349 }
3350 if col_indices
3351 .iter()
3352 .any(|&ci| ts.columns[ci].generated_kind.is_some())
3353 {
3354 return None;
3355 }
3356 let on_conflict = stmt
3357 .on_conflict
3358 .as_ref()
3359 .map(|oc| compile_on_conflict(oc, ts))
3360 .transpose()
3361 .ok()
3362 .flatten()
3363 .map(Arc::new);
3364 let generated_col_positions: Vec<usize> = ts
3365 .columns
3366 .iter()
3367 .enumerate()
3368 .filter_map(|(i, c)| {
3369 matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3370 .then_some(i)
3371 })
3372 .collect();
3373 let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3374 .iter()
3375 .map(|&pos| {
3376 detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3377 })
3378 .collect();
3379 let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3380 Some(ColumnMap::new(&ts.columns))
3381 } else {
3382 None
3383 };
3384 let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3385 let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3386 let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3387 let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3388 let phys_count = ts.physical_non_pk_count();
3389 let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3390 let single_int_pk =
3391 pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3392 let not_null_indices: Vec<u16> = ts
3393 .columns
3394 .iter()
3395 .filter(|c| !c.nullable)
3396 .map(|c| c.position)
3397 .collect();
3398 let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3399 let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3400 let row_fully_overwritten = if any_defaults_flag {
3401 false
3402 } else {
3403 let mut covered: rustc_hash::FxHashSet<usize> =
3404 col_indices.iter().copied().collect();
3405 covered.extend(generated_col_positions.iter().copied());
3406 for (j, &i) in non_pk_indices.iter().enumerate() {
3407 let _ = j;
3408 if matches!(
3409 ts.columns[i].generated_kind,
3410 Some(crate::parser::GeneratedKind::Virtual)
3411 ) {
3412 covered.insert(i);
3413 }
3414 }
3415 bind_plan.is_some() && covered.len() == ts.columns.len()
3416 };
3417 let has_fks = !ts.foreign_keys.is_empty();
3418 let has_indices = !ts.indices.is_empty();
3419 let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3420 let mut null_value_slots: Vec<usize> =
3421 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3422 for (j, &i) in non_pk_indices.iter().enumerate() {
3423 let slot = encoding_positions[j] as usize;
3424 if matches!(
3425 ts.columns[i].generated_kind,
3426 Some(crate::parser::GeneratedKind::Virtual)
3427 ) {
3428 null_value_slots.push(slot);
3429 } else {
3430 non_virtual_pairs.push((i, slot));
3431 }
3432 }
3433 let row_encoder = {
3434 let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3435 let col = &ts.columns[i];
3436 if matches!(
3437 col.generated_kind,
3438 Some(crate::parser::GeneratedKind::Virtual)
3439 ) {
3440 true
3441 } else {
3442 col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3443 }
3444 });
3445 if all_int_or_null {
3446 let mut null_slots: Vec<usize> =
3447 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3448 for (j, &i) in non_pk_indices.iter().enumerate() {
3449 if matches!(
3450 ts.columns[i].generated_kind,
3451 Some(crate::parser::GeneratedKind::Virtual)
3452 ) {
3453 null_slots.push(encoding_positions[j] as usize);
3454 }
3455 }
3456 Some(crate::encoding::build_int_row_template(
3457 phys_count,
3458 &null_slots,
3459 ))
3460 } else {
3461 None
3462 }
3463 };
3464 let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3465 && !ts.columns.iter().any(|c| c.default_expr.is_some())
3466 && !ts.has_checks()
3467 && !has_fks
3468 && !has_indices
3469 && stmt.on_conflict.is_none()
3470 && stmt.returning.is_none()
3471 && bind_plan.is_some()
3472 && row_encoder.is_some()
3473 && row_fully_overwritten
3474 && single_int_pk
3475 && generated_fast_evals
3476 .iter()
3477 .all(|fe| !matches!(fe, FastGenEval::None));
3478 let trivial_fast_program = if is_trivial_fast_eligible {
3479 build_trivial_fast_program(
3480 bind_plan.as_ref().unwrap(),
3481 row_encoder.as_ref().unwrap(),
3482 &non_virtual_pairs,
3483 &generated_col_positions,
3484 &generated_fast_evals,
3485 &pk_indices,
3486 &ts.columns,
3487 )
3488 } else {
3489 None
3490 };
3491 let is_trivial_fast = trivial_fast_program.is_some();
3492 let has_checks = ts.has_checks();
3493 let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3494 let needs_scoped_params = bind_plan.is_none()
3495 || has_checks
3496 || any_defaults
3497 || !generated_col_positions.is_empty()
3498 || on_conflict.is_some()
3499 || stmt.returning.is_some()
3500 || insert_has_subquery(stmt)
3501 || super::helpers::any_partial_index(ts);
3502 Some(InsertCache {
3503 col_indices,
3504 has_subquery: insert_has_subquery(stmt),
3505 any_defaults,
3506 has_checks,
3507 on_conflict,
3508 row_col_map,
3509 generated_col_positions,
3510 generated_fast_evals,
3511 pk_indices,
3512 non_pk_indices,
3513 encoding_positions,
3514 dropped_non_pk_slots,
3515 phys_count,
3516 single_int_pk,
3517 not_null_indices,
3518 bind_plan,
3519 row_fully_overwritten,
3520 row_encoder,
3521 is_trivial_fast,
3522 trivial_fast_program,
3523 needs_scoped_params,
3524 })
3525 } else if schema.get_view(&lower).is_some() {
3526 None
3527 } else {
3528 return None;
3529 };
3530 Some(Self {
3531 table_lower: lower,
3532 cached,
3533 })
3534 }
3535}
3536
3537impl CompiledPlan for CompiledInsert {
3538 fn execute(
3539 &self,
3540 db: &Database,
3541 schema: &SchemaManager,
3542 stmt: &Statement,
3543 params: &[Value],
3544 txn: super::compile::ActiveTxnRef<'_, '_>,
3545 ) -> Result<ExecutionResult> {
3546 let ins = match stmt {
3547 Statement::Insert(i) => i,
3548 _ => {
3549 return Err(SqlError::Unsupported(
3550 "CompiledInsert received non-INSERT statement".into(),
3551 ))
3552 }
3553 };
3554 use super::compile::ActiveTxnRef;
3555 match txn {
3556 ActiveTxnRef::None => exec_insert(db, schema, ins, params),
3557 ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3558 "cannot execute mutating statement inside a read-only transaction".into(),
3559 )),
3560 ActiveTxnRef::Write(outer) => match self.cached.as_ref() {
3561 Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3562 exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3563 }),
3564 Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3565 None => exec_insert_in_txn(outer, schema, ins, params),
3566 },
3567 }
3568 }
3569
3570 fn uses_scoped_params(&self) -> bool {
3571 match self.cached.as_ref() {
3572 Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3573 None => true,
3574 }
3575 }
3576}
3577
3578pub struct CompiledDelete {
3579 table_lower: String,
3580}
3581
3582impl CompiledDelete {
3583 pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3584 let lower = stmt.table.to_ascii_lowercase();
3585 schema.get(&lower)?;
3586 Some(Self { table_lower: lower })
3587 }
3588}
3589
3590impl CompiledPlan for CompiledDelete {
3591 fn execute(
3592 &self,
3593 db: &Database,
3594 schema: &SchemaManager,
3595 stmt: &Statement,
3596 _params: &[Value],
3597 txn: super::compile::ActiveTxnRef<'_, '_>,
3598 ) -> Result<ExecutionResult> {
3599 let del = match stmt {
3600 Statement::Delete(d) => d,
3601 _ => {
3602 return Err(SqlError::Unsupported(
3603 "CompiledDelete received non-DELETE statement".into(),
3604 ))
3605 }
3606 };
3607 let _ = &self.table_lower;
3608 use super::compile::ActiveTxnRef;
3609 match txn {
3610 ActiveTxnRef::None => super::write::exec_delete(db, schema, del),
3611 ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3612 "cannot execute mutating statement inside a read-only transaction".into(),
3613 )),
3614 ActiveTxnRef::Write(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3615 }
3616 }
3617}
3618
3619fn exec_instead_of_view_insert_auto(
3620 db: &Database,
3621 schema: &SchemaManager,
3622 view_name: &str,
3623 aliases: &[String],
3624 stmt: &InsertStmt,
3625 params: &[Value],
3626) -> Result<ExecutionResult> {
3627 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3628 let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3629 wtx.commit().map_err(SqlError::Storage)?;
3630 Ok(r)
3631}
3632
3633fn exec_instead_of_view_insert_in_txn(
3634 wtx: &mut WriteTxn<'_>,
3635 schema: &SchemaManager,
3636 view_name: &str,
3637 aliases: &[String],
3638 stmt: &InsertStmt,
3639 params: &[Value],
3640) -> Result<ExecutionResult> {
3641 let resolved_aliases: Vec<String> = if aliases.is_empty() {
3643 derive_view_columns(wtx, schema, view_name)?
3644 } else {
3645 aliases.to_vec()
3646 };
3647 let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3648 let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3649 .iter()
3650 .enumerate()
3651 .map(|(i, name)| (name.to_ascii_lowercase(), i))
3652 .collect();
3653
3654 let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3655 (0..resolved_aliases.len()).collect()
3656 } else {
3657 stmt.columns
3658 .iter()
3659 .map(|c| {
3660 alias_map
3661 .get(&c.to_ascii_lowercase())
3662 .copied()
3663 .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3664 })
3665 .collect::<Result<_>>()?
3666 };
3667
3668 let source_rows: Vec<Vec<Value>> = match &stmt.source {
3669 InsertSource::Values(rows) => {
3670 let mut out = Vec::with_capacity(rows.len());
3671 for row in rows {
3672 if row.len() != target_positions.len() {
3673 return Err(SqlError::InvalidValue(format!(
3674 "expected {} values, got {}",
3675 target_positions.len(),
3676 row.len()
3677 )));
3678 }
3679 let mut vals = Vec::with_capacity(row.len());
3680 for expr in row {
3681 let v = match expr {
3682 Expr::Parameter(n) => params
3683 .get(n - 1)
3684 .cloned()
3685 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3686 Expr::Literal(v) => v.clone(),
3687 other => eval_const_expr(other)?,
3688 };
3689 vals.push(v);
3690 }
3691 out.push(vals);
3692 }
3693 out
3694 }
3695 InsertSource::Select(sq) => {
3696 let empty_ctes = CteContext::default();
3697 let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3698 qr.rows
3699 }
3700 };
3701
3702 let mut count: u64 = 0;
3703 for row in source_rows {
3704 if row.len() != target_positions.len() {
3705 return Err(SqlError::InvalidValue(format!(
3706 "expected {} values, got {}",
3707 target_positions.len(),
3708 row.len()
3709 )));
3710 }
3711 let mut new_row = vec![Value::Null; resolved_aliases.len()];
3712 for (slot, val) in target_positions.iter().zip(row) {
3713 new_row[*slot] = val;
3714 }
3715 super::triggers::fire_row_triggers(
3716 wtx,
3717 schema,
3718 view_name,
3719 crate::parser::TriggerTiming::InsteadOf,
3720 super::triggers::FireEvent::Insert,
3721 None,
3722 Some(new_row),
3723 &view_cols,
3724 )?;
3725 count += 1;
3726 }
3727 Ok(ExecutionResult::RowsAffected(count))
3728}
3729
3730fn derive_view_columns(
3731 wtx: &mut WriteTxn<'_>,
3732 schema: &SchemaManager,
3733 view_name: &str,
3734) -> Result<Vec<String>> {
3735 use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3736 let sel = SelectStmt {
3737 columns: vec![SelectColumn::AllColumns],
3738 from: view_name.to_string(),
3739 from_alias: None,
3740 from_subquery: None,
3741 from_args: None,
3742 from_json_table: None,
3743 joins: vec![],
3744 distinct: false,
3745 where_clause: None,
3746 order_by: vec![],
3747 limit: Some(Expr::Literal(Value::Integer(1))),
3748 offset: None,
3749 group_by: vec![],
3750 having: None,
3751 };
3752 let sq = SelectQuery {
3753 ctes: vec![],
3754 recursive: false,
3755 body: QueryBody::Select(Box::new(sel)),
3756 };
3757 let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3758 match qr {
3759 ExecutionResult::Query(q) => Ok(q.columns),
3760 _ => Ok(Vec::new()),
3761 }
3762}
3763
3764#[cfg(test)]
3765#[path = "dml_tests.rs"]
3766mod tests;