1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::read_txn::ReadTxn;
7use citadel_txn::write_txn::WriteTxn;
8use rustc_hash::FxHashMap;
9
10use crate::encoding::{encode_composite_key_into, encode_row_into};
11use crate::error::{Result, SqlError};
12use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
13use crate::parser::*;
14use crate::types::*;
15
16use crate::schema::SchemaManager;
17
18use super::compile::CompiledPlan;
19use super::helpers::*;
20use super::CteContext;
21
22pub(super) fn exec_insert(
23 db: &Database,
24 schema: &SchemaManager,
25 stmt: &InsertStmt,
26 params: &[Value],
27) -> Result<ExecutionResult> {
28 let empty_ctes = CteContext::default();
29 let materialized;
30 let stmt = if insert_has_subquery(stmt) {
31 materialized = materialize_insert(stmt, &mut |sub| {
32 exec_subquery_read(db, schema, sub, &empty_ctes)
33 })?;
34 &materialized
35 } else {
36 stmt
37 };
38
39 let lower_name = stmt.table.to_ascii_lowercase();
40 if let Some(view_def) = schema.get_view(&lower_name) {
41 if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
42 {
43 let aliases = view_def.column_aliases.clone();
44 return exec_instead_of_view_insert_auto(
45 db,
46 schema,
47 &lower_name,
48 &aliases,
49 stmt,
50 params,
51 );
52 }
53 return Err(SqlError::CannotModifyView(stmt.table.clone()));
54 }
55 if schema.get_matview(&lower_name).is_some() {
56 return Err(SqlError::CannotModifyView(format!(
57 "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
58 stmt.table
59 )));
60 }
61 let table_schema = schema
62 .get(&lower_name)
63 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
64
65 let insert_columns = if stmt.columns.is_empty() {
66 table_schema
67 .columns
68 .iter()
69 .map(|c| c.name.clone())
70 .collect::<Vec<_>>()
71 } else {
72 stmt.columns
73 .iter()
74 .map(|c| c.to_ascii_lowercase())
75 .collect()
76 };
77
78 let col_indices: Vec<usize> = insert_columns
79 .iter()
80 .map(|name| {
81 table_schema
82 .column_index(name)
83 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
84 })
85 .collect::<Result<_>>()?;
86
87 for &ci in &col_indices {
88 if table_schema.columns[ci].generated_kind.is_some() {
89 return Err(SqlError::CannotInsertIntoGeneratedColumn(
90 table_schema.columns[ci].name.clone(),
91 ));
92 }
93 }
94
95 let defaults: Vec<(usize, &Expr)> = table_schema
96 .columns
97 .iter()
98 .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
99 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
100 .collect();
101
102 let generated_cols: Vec<(usize, &Expr)> = table_schema
103 .columns
104 .iter()
105 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
106 .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
107 .collect();
108
109 let has_checks = table_schema.has_checks();
110 let strict = table_schema.is_strict();
111 let row_col_map_for_gen = if !generated_cols.is_empty() {
112 Some(ColumnMap::new(&table_schema.columns))
113 } else {
114 None
115 };
116 let check_col_map = if has_checks {
117 Some(ColumnMap::new(&table_schema.columns))
118 } else {
119 None
120 };
121
122 let select_rows = match &stmt.source {
123 InsertSource::Select(sq) => {
124 let insert_ctes =
125 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
126 exec_query_body_read(db, schema, body, ctx)
127 })?;
128 let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
129 Some(qr.rows)
130 }
131 InsertSource::Values(_) => None,
132 };
133
134 let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
135 .on_conflict
136 .as_ref()
137 .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
138 .transpose()?;
139
140 let row_col_map = compiled_conflict
141 .as_ref()
142 .map(|_| ColumnMap::new(&table_schema.columns));
143
144 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
145 let mut count: u64 = 0;
146 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
147 stmt.returning.as_ref().map(|_| Vec::new());
148
149 let pk_indices = table_schema.pk_indices();
150 let non_pk = table_schema.non_pk_indices();
151 let enc_pos = table_schema.encoding_positions();
152 let phys_count = table_schema.physical_non_pk_count();
153 let mut row = vec![Value::Null; table_schema.columns.len()];
154 let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
155 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
156 let mut key_buf: Vec<u8> = Vec::with_capacity(64);
157 let mut value_buf: Vec<u8> = Vec::with_capacity(256);
158 let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
159
160 let values = match &stmt.source {
161 InsertSource::Values(rows) => Some(rows.as_slice()),
162 InsertSource::Select(_) => None,
163 };
164 let sel_rows = select_rows.as_deref();
165
166 let total = match (values, sel_rows) {
167 (Some(rows), _) => rows.len(),
168 (_, Some(rows)) => rows.len(),
169 _ => 0,
170 };
171
172 if let Some(sel) = sel_rows {
173 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
174 return Err(SqlError::InvalidValue(format!(
175 "INSERT ... SELECT column count mismatch: expected {}, got {}",
176 insert_columns.len(),
177 sel[0].len()
178 )));
179 }
180 }
181
182 let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
183 t.enabled
184 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
185 && t.events
186 .iter()
187 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
188 });
189 let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
190 Vec::with_capacity(total)
191 } else {
192 Vec::new()
193 };
194
195 if has_insert_statement_triggers {
196 super::triggers::fire_statement_triggers(
197 &mut wtx,
198 schema,
199 &table_schema.name,
200 crate::parser::TriggerTiming::Before,
201 super::triggers::FireEvent::Insert,
202 &table_schema.columns,
203 &[],
204 &[],
205 )?;
206 }
207
208 for idx in 0..total {
209 for v in row.iter_mut() {
210 *v = Value::Null;
211 }
212
213 if let Some(value_rows) = values {
214 let value_row = &value_rows[idx];
215 if value_row.len() != insert_columns.len() {
216 return Err(SqlError::InvalidValue(format!(
217 "expected {} values, got {}",
218 insert_columns.len(),
219 value_row.len()
220 )));
221 }
222 for (i, expr) in value_row.iter().enumerate() {
223 let val = if let Expr::Parameter(n) = expr {
224 params
225 .get(n - 1)
226 .cloned()
227 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
228 } else {
229 eval_const_expr(expr)?
230 };
231 let col_idx = col_indices[i];
232 let col = &table_schema.columns[col_idx];
233 row[col_idx] = if val.is_null() {
234 Value::Null
235 } else {
236 coerce_for_column(val, col, strict)?
237 };
238 }
239 } else if let Some(sel) = sel_rows {
240 let sel_row = &sel[idx];
241 for (i, val) in sel_row.iter().enumerate() {
242 let col_idx = col_indices[i];
243 let col = &table_schema.columns[col_idx];
244 row[col_idx] = if val.is_null() {
245 Value::Null
246 } else {
247 coerce_for_column(val.clone(), col, strict)?
248 };
249 }
250 }
251
252 for &(pos, def_expr) in &defaults {
253 let val = eval_const_expr(def_expr)?;
254 let col = &table_schema.columns[pos];
255 if !val.is_null() {
256 row[pos] = coerce_for_column(val, col, strict)?;
257 }
258 }
259
260 if let Some(ref gen_map) = row_col_map_for_gen {
261 for &(pos, gen_expr) in &generated_cols {
262 let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
263 let col = &table_schema.columns[pos];
264 row[pos] = if val.is_null() {
265 Value::Null
266 } else {
267 coerce_for_column(val, col, strict)?
268 };
269 }
270 }
271
272 for col in &table_schema.columns {
273 if !col.nullable && row[col.position as usize].is_null() {
274 return Err(SqlError::NotNullViolation(col.name.clone()));
275 }
276 }
277
278 if let Some(ref col_map) = check_col_map {
279 for col in &table_schema.columns {
280 if let Some(ref check) = col.check_expr {
281 let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
282 if !is_truthy(&result) && !result.is_null() {
283 let name = col.check_name.as_deref().unwrap_or(&col.name);
284 return Err(SqlError::CheckViolation(name.to_string()));
285 }
286 }
287 }
288 for tc in &table_schema.check_constraints {
289 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
290 if !is_truthy(&result) && !result.is_null() {
291 let name = tc.name.as_deref().unwrap_or(&tc.sql);
292 return Err(SqlError::CheckViolation(name.to_string()));
293 }
294 }
295 }
296
297 for fk in &table_schema.foreign_keys {
298 let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
299 if any_null {
300 continue; }
302 let fk_vals: Vec<Value> = fk
303 .columns
304 .iter()
305 .map(|&ci| row[ci as usize].clone())
306 .collect();
307 fk_key_buf.clear();
308 encode_composite_key_into(&fk_vals, &mut fk_key_buf);
309 if fk.deferrable && fk.initially_deferred {
310 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
311 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
312 fk_name: name,
313 foreign_table: fk.foreign_table.as_bytes().to_vec(),
314 parent_key: fk_key_buf.clone(),
315 });
316 continue;
317 }
318 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
319 let found = wtx
320 .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
321 .map_err(SqlError::Storage)?;
322 if found.is_none() {
323 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
324 return Err(SqlError::ForeignKeyViolation(name.to_string()));
325 }
326 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
327 }
328 }
329
330 let proposed_row_for_returning: Option<Vec<Value>> =
331 returning_rows.as_ref().map(|_| row.clone());
332 let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
333 Some(row.clone())
334 } else {
335 None
336 };
337
338 let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
339 t.enabled
340 && t.timing == crate::parser::TriggerTiming::Before
341 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
342 && t.events
343 .iter()
344 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
345 });
346 if has_before_insert_triggers {
347 super::triggers::fire_row_triggers(
348 &mut wtx,
349 schema,
350 &table_schema.name,
351 crate::parser::TriggerTiming::Before,
352 super::triggers::FireEvent::Insert,
353 None,
354 Some(row.clone()),
355 &table_schema.columns,
356 )?;
357 }
358
359 for (j, &i) in pk_indices.iter().enumerate() {
360 pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
361 }
362 encode_composite_key_into(&pk_values, &mut key_buf);
363
364 for (j, &i) in non_pk.iter().enumerate() {
365 let col = &table_schema.columns[i];
366 if matches!(
367 col.generated_kind,
368 Some(crate::parser::GeneratedKind::Virtual)
369 ) {
370 value_values[enc_pos[j] as usize] = Value::Null;
371 row[i] = Value::Null;
372 } else {
373 value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
374 }
375 }
376 encode_row_into(&value_values, &mut value_buf);
377
378 if key_buf.len() > citadel_core::MAX_KEY_SIZE {
379 return Err(SqlError::KeyTooLarge {
380 size: key_buf.len(),
381 max: citadel_core::MAX_KEY_SIZE,
382 });
383 }
384 if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
385 return Err(SqlError::RowTooLarge {
386 size: value_buf.len(),
387 max: citadel_core::MAX_VALUE_SIZE,
388 });
389 }
390
391 match compiled_conflict.as_ref() {
392 None => {
393 let is_new = wtx
394 .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
395 .map_err(SqlError::Storage)?;
396 if !is_new {
397 return Err(SqlError::DuplicateKey);
398 }
399 let has_after_insert_triggers =
400 schema.triggers_for(&table_schema.name).iter().any(|t| {
401 t.enabled
402 && t.timing == crate::parser::TriggerTiming::After
403 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
404 && t.events
405 .iter()
406 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
407 });
408 if !table_schema.indices.is_empty() || has_after_insert_triggers {
409 for (j, &i) in pk_indices.iter().enumerate() {
410 row[i] = pk_values[j].clone();
411 }
412 for (j, &i) in non_pk.iter().enumerate() {
413 row[i] =
414 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
415 }
416 if !table_schema.indices.is_empty() {
417 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
418 }
419 if has_after_insert_triggers {
420 super::triggers::fire_row_triggers(
421 &mut wtx,
422 schema,
423 &table_schema.name,
424 crate::parser::TriggerTiming::After,
425 super::triggers::FireEvent::Insert,
426 None,
427 Some(row.clone()),
428 &table_schema.columns,
429 )?;
430 }
431 }
432 if let Some(r) = row_for_stmt_trigger.clone() {
433 stmt_new_rows.push(r);
434 }
435 count += 1;
436 if let Some(buf) = returning_rows.as_mut() {
437 buf.push((None, proposed_row_for_returning));
438 }
439 }
440 Some(oc) => {
441 let oc_ref: &CompiledOnConflict = oc;
442 let needs_row = upsert_needs_row(oc_ref, table_schema);
443 if needs_row {
444 for (j, &i) in pk_indices.iter().enumerate() {
445 row[i] = pk_values[j].clone();
446 }
447 for (j, &i) in non_pk.iter().enumerate() {
448 row[i] =
449 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
450 }
451 }
452 let outcome = apply_insert_with_conflict(
453 &mut wtx,
454 table_schema,
455 &key_buf,
456 &value_buf,
457 &row,
458 &pk_values,
459 oc_ref,
460 row_col_map.as_ref().unwrap(),
461 stmt.returning.is_some(),
462 )?;
463 match outcome {
464 InsertRowOutcome::Inserted => {
465 count += 1;
466 if let Some(buf) = returning_rows.as_mut() {
467 buf.push((None, proposed_row_for_returning));
468 }
469 if let Some(r) = row_for_stmt_trigger.clone() {
470 stmt_new_rows.push(r);
471 }
472 let has_after_insert_triggers =
473 schema.triggers_for(&table_schema.name).iter().any(|t| {
474 t.enabled
475 && t.timing == crate::parser::TriggerTiming::After
476 && t.granularity
477 == crate::parser::TriggerGranularity::ForEachRow
478 && t.events
479 .iter()
480 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
481 });
482 if has_after_insert_triggers {
483 super::triggers::fire_row_triggers(
484 &mut wtx,
485 schema,
486 &table_schema.name,
487 crate::parser::TriggerTiming::After,
488 super::triggers::FireEvent::Insert,
489 None,
490 Some(row.clone()),
491 &table_schema.columns,
492 )?;
493 }
494 }
495 InsertRowOutcome::Updated { old, new } => {
496 count += 1;
497 if let Some(buf) = returning_rows.as_mut() {
498 buf.push((Some(old.clone()), Some(new.clone())));
499 }
500 let has_after_update_triggers =
501 schema.triggers_for(&table_schema.name).iter().any(|t| {
502 t.enabled
503 && t.timing == crate::parser::TriggerTiming::After
504 && t.granularity
505 == crate::parser::TriggerGranularity::ForEachRow
506 && t.events.iter().any(|e| {
507 matches!(e, crate::parser::TriggerEvent::Update(_))
508 })
509 });
510 if has_after_update_triggers {
511 let changed_cols: Vec<String> = match oc_ref {
512 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
513 .iter()
514 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
515 .collect(),
516 _ => Vec::new(),
517 };
518 super::triggers::fire_row_triggers(
519 &mut wtx,
520 schema,
521 &table_schema.name,
522 crate::parser::TriggerTiming::After,
523 super::triggers::FireEvent::Update {
524 changed_columns: &changed_cols,
525 },
526 Some(old),
527 Some(new),
528 &table_schema.columns,
529 )?;
530 }
531 }
532 InsertRowOutcome::Skipped => {}
533 }
534 }
535 }
536 }
537
538 if has_insert_statement_triggers {
539 super::triggers::fire_statement_triggers(
540 &mut wtx,
541 schema,
542 &table_schema.name,
543 crate::parser::TriggerTiming::After,
544 super::triggers::FireEvent::Insert,
545 &table_schema.columns,
546 &[],
547 &stmt_new_rows,
548 )?;
549 }
550
551 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
552 let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
553 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
554 wtx.commit().map_err(SqlError::Storage)?;
555 return Ok(ExecutionResult::Query(qr));
556 }
557
558 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
559 wtx.commit().map_err(SqlError::Storage)?;
560 Ok(ExecutionResult::RowsAffected(count))
561}
562
563pub(super) fn has_subquery(expr: &Expr) -> bool {
564 crate::parser::has_subquery(expr)
565}
566
567pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
568 if let Some(ref w) = stmt.where_clause {
569 if has_subquery(w) {
570 return true;
571 }
572 }
573 if let Some(ref h) = stmt.having {
574 if has_subquery(h) {
575 return true;
576 }
577 }
578 for col in &stmt.columns {
579 if let SelectColumn::Expr { expr, .. } = col {
580 if has_subquery(expr) {
581 return true;
582 }
583 }
584 }
585 for ob in &stmt.order_by {
586 if has_subquery(&ob.expr) {
587 return true;
588 }
589 }
590 for join in &stmt.joins {
591 if let Some(ref on_expr) = join.on_clause {
592 if has_subquery(on_expr) {
593 return true;
594 }
595 }
596 }
597 false
598}
599
600pub(super) fn materialize_expr(
601 expr: &Expr,
602 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
603) -> Result<Expr> {
604 match expr {
605 Expr::InSubquery {
606 expr: e,
607 subquery,
608 negated,
609 } => {
610 let inner = materialize_expr(e, exec_sub)?;
611 let qr = exec_sub(subquery)?;
612 if !qr.columns.is_empty() && qr.columns.len() != 1 {
613 return Err(SqlError::SubqueryMultipleColumns);
614 }
615 let mut values = rustc_hash::FxHashSet::default();
616 let mut has_null = false;
617 for row in &qr.rows {
618 if row[0].is_null() {
619 has_null = true;
620 } else {
621 values.insert(row[0].clone());
622 }
623 }
624 Ok(Expr::InSet {
625 expr: Box::new(inner),
626 values,
627 has_null,
628 negated: *negated,
629 })
630 }
631 Expr::ScalarSubquery(subquery) => {
632 let qr = exec_sub(subquery)?;
633 if qr.rows.len() > 1 {
634 return Err(SqlError::SubqueryMultipleRows);
635 }
636 let val = if qr.rows.is_empty() {
637 Value::Null
638 } else {
639 qr.rows[0][0].clone()
640 };
641 Ok(Expr::Literal(val))
642 }
643 Expr::Exists { subquery, negated } => {
644 let qr = exec_sub(subquery)?;
645 let exists = !qr.rows.is_empty();
646 let result = if *negated { !exists } else { exists };
647 Ok(Expr::Literal(Value::Boolean(result)))
648 }
649 Expr::InList {
650 expr: e,
651 list,
652 negated,
653 } => {
654 let inner = materialize_expr(e, exec_sub)?;
655 let items = list
656 .iter()
657 .map(|item| materialize_expr(item, exec_sub))
658 .collect::<Result<Vec<_>>>()?;
659 Ok(Expr::InList {
660 expr: Box::new(inner),
661 list: items,
662 negated: *negated,
663 })
664 }
665 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
666 left: Box::new(materialize_expr(left, exec_sub)?),
667 op: *op,
668 right: Box::new(materialize_expr(right, exec_sub)?),
669 }),
670 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
671 op: *op,
672 expr: Box::new(materialize_expr(e, exec_sub)?),
673 }),
674 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
675 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
676 Expr::InSet {
677 expr: e,
678 values,
679 has_null,
680 negated,
681 } => Ok(Expr::InSet {
682 expr: Box::new(materialize_expr(e, exec_sub)?),
683 values: values.clone(),
684 has_null: *has_null,
685 negated: *negated,
686 }),
687 Expr::Between {
688 expr: e,
689 low,
690 high,
691 negated,
692 } => Ok(Expr::Between {
693 expr: Box::new(materialize_expr(e, exec_sub)?),
694 low: Box::new(materialize_expr(low, exec_sub)?),
695 high: Box::new(materialize_expr(high, exec_sub)?),
696 negated: *negated,
697 }),
698 Expr::Like {
699 expr: e,
700 pattern,
701 escape,
702 negated,
703 } => {
704 let esc = escape
705 .as_ref()
706 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
707 .transpose()?;
708 Ok(Expr::Like {
709 expr: Box::new(materialize_expr(e, exec_sub)?),
710 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
711 escape: esc,
712 negated: *negated,
713 })
714 }
715 Expr::Case {
716 operand,
717 conditions,
718 else_result,
719 } => {
720 let op = operand
721 .as_ref()
722 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
723 .transpose()?;
724 let conds = conditions
725 .iter()
726 .map(|(c, r)| {
727 Ok((
728 materialize_expr(c, exec_sub)?,
729 materialize_expr(r, exec_sub)?,
730 ))
731 })
732 .collect::<Result<Vec<_>>>()?;
733 let else_r = else_result
734 .as_ref()
735 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
736 .transpose()?;
737 Ok(Expr::Case {
738 operand: op,
739 conditions: conds,
740 else_result: else_r,
741 })
742 }
743 Expr::Coalesce(args) => {
744 let materialized = args
745 .iter()
746 .map(|a| materialize_expr(a, exec_sub))
747 .collect::<Result<Vec<_>>>()?;
748 Ok(Expr::Coalesce(materialized))
749 }
750 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
751 expr: Box::new(materialize_expr(e, exec_sub)?),
752 data_type: *data_type,
753 }),
754 Expr::Function {
755 name,
756 args,
757 distinct,
758 } => {
759 let materialized = args
760 .iter()
761 .map(|a| materialize_expr(a, exec_sub))
762 .collect::<Result<Vec<_>>>()?;
763 Ok(Expr::Function {
764 name: name.clone(),
765 args: materialized,
766 distinct: *distinct,
767 })
768 }
769 other => Ok(other.clone()),
770 }
771}
772
773pub(super) fn materialize_stmt(
774 stmt: &SelectStmt,
775 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
776) -> Result<SelectStmt> {
777 let where_clause = stmt
778 .where_clause
779 .as_ref()
780 .map(|e| materialize_expr(e, exec_sub))
781 .transpose()?;
782 let having = stmt
783 .having
784 .as_ref()
785 .map(|e| materialize_expr(e, exec_sub))
786 .transpose()?;
787 let columns = stmt
788 .columns
789 .iter()
790 .map(|c| match c {
791 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
792 SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
793 SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
794 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
795 expr: materialize_expr(expr, exec_sub)?,
796 alias: alias.clone(),
797 }),
798 })
799 .collect::<Result<Vec<_>>>()?;
800 let order_by = stmt
801 .order_by
802 .iter()
803 .map(|ob| {
804 Ok(OrderByItem {
805 expr: materialize_expr(&ob.expr, exec_sub)?,
806 descending: ob.descending,
807 nulls_first: ob.nulls_first,
808 })
809 })
810 .collect::<Result<Vec<_>>>()?;
811 let joins = stmt
812 .joins
813 .iter()
814 .map(|j| {
815 let on_clause = j
816 .on_clause
817 .as_ref()
818 .map(|e| materialize_expr(e, exec_sub))
819 .transpose()?;
820 Ok(JoinClause {
821 join_type: j.join_type,
822 table: j.table.clone(),
823 subquery: j.subquery.clone(),
824 on_clause,
825 })
826 })
827 .collect::<Result<Vec<_>>>()?;
828 let group_by = stmt
829 .group_by
830 .iter()
831 .map(|e| materialize_expr(e, exec_sub))
832 .collect::<Result<Vec<_>>>()?;
833 Ok(SelectStmt {
834 columns,
835 from: stmt.from.clone(),
836 from_alias: stmt.from_alias.clone(),
837 from_subquery: stmt.from_subquery.clone(),
838 from_args: stmt.from_args.clone(),
839 from_json_table: stmt.from_json_table.clone(),
840 joins,
841 distinct: stmt.distinct,
842 where_clause,
843 order_by,
844 limit: stmt.limit.clone(),
845 offset: stmt.offset.clone(),
846 group_by,
847 having,
848 })
849}
850
851pub(super) fn exec_subquery_read(
852 db: &Database,
853 schema: &SchemaManager,
854 stmt: &SelectStmt,
855 ctes: &CteContext,
856) -> Result<QueryResult> {
857 let mut rtx = db.begin_read();
858 exec_subquery_with_read(&mut rtx, schema, stmt, ctes)
859}
860
861pub(super) fn exec_subquery_with_read(
862 rtx: &mut ReadTxn<'_>,
863 schema: &SchemaManager,
864 stmt: &SelectStmt,
865 ctes: &CteContext,
866) -> Result<QueryResult> {
867 match super::exec_select_with_read(rtx, schema, stmt, ctes)? {
868 ExecutionResult::Query(qr) => Ok(qr),
869 _ => Ok(QueryResult {
870 columns: vec![],
871 rows: vec![],
872 }),
873 }
874}
875
876pub(super) fn exec_subquery_write(
877 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
878 schema: &SchemaManager,
879 stmt: &SelectStmt,
880 ctes: &CteContext,
881) -> Result<QueryResult> {
882 match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
883 ExecutionResult::Query(qr) => Ok(qr),
884 _ => Ok(QueryResult {
885 columns: vec![],
886 rows: vec![],
887 }),
888 }
889}
890
891pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
892 stmt.where_clause.as_ref().is_some_and(has_subquery)
893 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
894}
895
896pub(super) fn materialize_update(
897 stmt: &UpdateStmt,
898 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
899) -> Result<UpdateStmt> {
900 let where_clause = stmt
901 .where_clause
902 .as_ref()
903 .map(|e| materialize_expr(e, exec_sub))
904 .transpose()?;
905 let assignments = stmt
906 .assignments
907 .iter()
908 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
909 .collect::<Result<Vec<_>>>()?;
910 Ok(UpdateStmt {
911 table: stmt.table.clone(),
912 assignments,
913 where_clause,
914 returning: stmt.returning.clone(),
915 })
916}
917
918pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
919 stmt.where_clause.as_ref().is_some_and(has_subquery)
920}
921
922pub(super) fn materialize_delete(
923 stmt: &DeleteStmt,
924 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
925) -> Result<DeleteStmt> {
926 let where_clause = stmt
927 .where_clause
928 .as_ref()
929 .map(|e| materialize_expr(e, exec_sub))
930 .transpose()?;
931 Ok(DeleteStmt {
932 table: stmt.table.clone(),
933 where_clause,
934 returning: stmt.returning.clone(),
935 })
936}
937
938pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
939 match &stmt.source {
940 InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
941 InsertSource::Select(_) => false,
943 }
944}
945
946pub(super) fn materialize_insert(
947 stmt: &InsertStmt,
948 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
949) -> Result<InsertStmt> {
950 let source = match &stmt.source {
951 InsertSource::Values(rows) => {
952 let mat = rows
953 .iter()
954 .map(|row| {
955 row.iter()
956 .map(|e| materialize_expr(e, exec_sub))
957 .collect::<Result<Vec<_>>>()
958 })
959 .collect::<Result<Vec<_>>>()?;
960 InsertSource::Values(mat)
961 }
962 InsertSource::Select(sq) => {
963 let ctes = sq
964 .ctes
965 .iter()
966 .map(|c| {
967 Ok(CteDefinition {
968 name: c.name.clone(),
969 column_aliases: c.column_aliases.clone(),
970 body: materialize_query_body(&c.body, exec_sub)?,
971 })
972 })
973 .collect::<Result<Vec<_>>>()?;
974 let body = materialize_query_body(&sq.body, exec_sub)?;
975 InsertSource::Select(Box::new(SelectQuery {
976 ctes,
977 recursive: sq.recursive,
978 body,
979 }))
980 }
981 };
982 Ok(InsertStmt {
983 table: stmt.table.clone(),
984 columns: stmt.columns.clone(),
985 source,
986 on_conflict: stmt.on_conflict.clone(),
987 returning: stmt.returning.clone(),
988 })
989}
990
991pub(super) fn materialize_query_body(
992 body: &QueryBody,
993 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
994) -> Result<QueryBody> {
995 match body {
996 QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
997 sel, exec_sub,
998 )?))),
999 QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
1000 op: comp.op.clone(),
1001 all: comp.all,
1002 left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
1003 right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
1004 order_by: comp.order_by.clone(),
1005 limit: comp.limit.clone(),
1006 offset: comp.offset.clone(),
1007 }))),
1008 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
1009 }
1010}
1011
1012pub(super) fn exec_query_body_with_read(
1013 rtx: &mut ReadTxn<'_>,
1014 schema: &SchemaManager,
1015 body: &QueryBody,
1016 ctes: &CteContext,
1017) -> Result<ExecutionResult> {
1018 match body {
1019 QueryBody::Select(sel) => super::exec_select_with_read(rtx, schema, sel, ctes),
1020 QueryBody::Compound(comp) => exec_compound_select_with_read(rtx, schema, comp, ctes),
1021 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1022 SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1023 ),
1024 }
1025}
1026
1027pub(super) fn exec_query_body_in_txn(
1028 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1029 schema: &SchemaManager,
1030 body: &QueryBody,
1031 ctes: &CteContext,
1032) -> Result<ExecutionResult> {
1033 match body {
1034 QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1035 QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1036 QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1037 QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1038 QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1039 }
1040}
1041
1042pub(super) fn exec_query_body_read(
1043 db: &Database,
1044 schema: &SchemaManager,
1045 body: &QueryBody,
1046 ctes: &CteContext,
1047) -> Result<QueryResult> {
1048 let mut rtx = db.begin_read();
1049 exec_query_body_with_read_qr(&mut rtx, schema, body, ctes)
1050}
1051
1052pub(super) fn exec_query_body_with_read_qr(
1053 rtx: &mut ReadTxn<'_>,
1054 schema: &SchemaManager,
1055 body: &QueryBody,
1056 ctes: &CteContext,
1057) -> Result<QueryResult> {
1058 match exec_query_body_with_read(rtx, schema, body, ctes)? {
1059 ExecutionResult::Query(qr) => Ok(qr),
1060 _ => Ok(QueryResult {
1061 columns: vec![],
1062 rows: vec![],
1063 }),
1064 }
1065}
1066
1067pub(super) fn exec_query_body_write(
1068 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1069 schema: &SchemaManager,
1070 body: &QueryBody,
1071 ctes: &CteContext,
1072) -> Result<QueryResult> {
1073 match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1074 ExecutionResult::Query(qr) => Ok(qr),
1075 _ => Ok(QueryResult {
1076 columns: vec![],
1077 rows: vec![],
1078 }),
1079 }
1080}
1081
1082pub(super) fn exec_compound_select_with_read(
1083 rtx: &mut ReadTxn<'_>,
1084 schema: &SchemaManager,
1085 comp: &CompoundSelect,
1086 ctes: &CteContext,
1087) -> Result<ExecutionResult> {
1088 let left_qr = match exec_query_body_with_read(rtx, schema, &comp.left, ctes)? {
1089 ExecutionResult::Query(qr) => qr,
1090 _ => QueryResult {
1091 columns: vec![],
1092 rows: vec![],
1093 },
1094 };
1095 let right_qr = match exec_query_body_with_read(rtx, schema, &comp.right, ctes)? {
1096 ExecutionResult::Query(qr) => qr,
1097 _ => QueryResult {
1098 columns: vec![],
1099 rows: vec![],
1100 },
1101 };
1102 apply_set_operation(comp, left_qr, right_qr)
1103}
1104
1105pub(super) fn exec_compound_select_in_txn(
1106 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1107 schema: &SchemaManager,
1108 comp: &CompoundSelect,
1109 ctes: &CteContext,
1110) -> Result<ExecutionResult> {
1111 let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1112 ExecutionResult::Query(qr) => qr,
1113 _ => QueryResult {
1114 columns: vec![],
1115 rows: vec![],
1116 },
1117 };
1118 let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1119 ExecutionResult::Query(qr) => qr,
1120 _ => QueryResult {
1121 columns: vec![],
1122 rows: vec![],
1123 },
1124 };
1125 apply_set_operation(comp, left_qr, right_qr)
1126}
1127
1128pub(super) fn apply_set_operation(
1129 comp: &CompoundSelect,
1130 left_qr: QueryResult,
1131 right_qr: QueryResult,
1132) -> Result<ExecutionResult> {
1133 if !left_qr.columns.is_empty()
1134 && !right_qr.columns.is_empty()
1135 && left_qr.columns.len() != right_qr.columns.len()
1136 {
1137 return Err(SqlError::CompoundColumnCountMismatch {
1138 left: left_qr.columns.len(),
1139 right: right_qr.columns.len(),
1140 });
1141 }
1142
1143 let columns = left_qr.columns;
1144
1145 let mut rows = match (&comp.op, comp.all) {
1146 (SetOp::Union, true) => {
1147 let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1148 let mut rows = Vec::with_capacity(total);
1149 rows.extend(left_qr.rows);
1150 rows.extend(right_qr.rows);
1151 rows
1152 }
1153 (SetOp::Union, false) => {
1154 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1155 let mut rows = Vec::new();
1156 for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1157 if !seen.contains(&row) {
1158 seen.insert(row.clone());
1159 rows.push(row);
1160 }
1161 }
1162 rows
1163 }
1164 (SetOp::Intersect, true) => {
1165 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1166 for row in &right_qr.rows {
1167 *right_counts.entry(row.clone()).or_insert(0) += 1;
1168 }
1169 let mut rows = Vec::new();
1170 for row in left_qr.rows {
1171 if let Some(count) = right_counts.get_mut(&row) {
1172 if *count > 0 {
1173 *count -= 1;
1174 rows.push(row);
1175 }
1176 }
1177 }
1178 rows
1179 }
1180 (SetOp::Intersect, false) => {
1181 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1182 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1183 let mut rows = Vec::new();
1184 for row in left_qr.rows {
1185 if right_set.contains(&row) && !seen.contains(&row) {
1186 seen.insert(row.clone());
1187 rows.push(row);
1188 }
1189 }
1190 rows
1191 }
1192 (SetOp::Except, true) => {
1193 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1194 for row in &right_qr.rows {
1195 *right_counts.entry(row.clone()).or_insert(0) += 1;
1196 }
1197 let mut rows = Vec::new();
1198 for row in left_qr.rows {
1199 if let Some(count) = right_counts.get_mut(&row) {
1200 if *count > 0 {
1201 *count -= 1;
1202 continue;
1203 }
1204 }
1205 rows.push(row);
1206 }
1207 rows
1208 }
1209 (SetOp::Except, false) => {
1210 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1211 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1212 let mut rows = Vec::new();
1213 for row in left_qr.rows {
1214 if !right_set.contains(&row) && !seen.contains(&row) {
1215 seen.insert(row.clone());
1216 rows.push(row);
1217 }
1218 }
1219 rows
1220 }
1221 };
1222
1223 if !comp.order_by.is_empty() {
1224 let col_defs: Vec<crate::types::ColumnDef> = columns
1225 .iter()
1226 .enumerate()
1227 .map(|(i, name)| crate::types::ColumnDef {
1228 name: name.clone(),
1229 data_type: crate::types::DataType::Null,
1230 nullable: true,
1231 position: i as u16,
1232 default_expr: None,
1233 default_sql: None,
1234 check_expr: None,
1235 check_sql: None,
1236 check_name: None,
1237 is_with_timezone: false,
1238 generated_expr: None,
1239 generated_sql: None,
1240 generated_kind: None,
1241 collation: crate::types::Collation::Binary,
1242 })
1243 .collect();
1244 sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1245 }
1246
1247 if let Some(ref offset_expr) = comp.offset {
1248 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1249 if offset < rows.len() {
1250 rows = rows.split_off(offset);
1251 } else {
1252 rows.clear();
1253 }
1254 }
1255
1256 if let Some(ref limit_expr) = comp.limit {
1257 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1258 rows.truncate(limit);
1259 }
1260
1261 Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1262}
1263
1264struct InsertBufs {
1265 row: Vec<Value>,
1266 pk_values: Vec<Value>,
1267 value_values: Vec<Value>,
1268 key_buf: Vec<u8>,
1269 value_buf: Vec<u8>,
1270 col_indices: Vec<usize>,
1271 fk_key_buf: Vec<u8>,
1272}
1273
1274impl InsertBufs {
1275 fn new() -> Self {
1276 Self {
1277 row: Vec::new(),
1278 pk_values: Vec::new(),
1279 value_values: Vec::new(),
1280 key_buf: Vec::with_capacity(64),
1281 value_buf: Vec::with_capacity(256),
1282 col_indices: Vec::new(),
1283 fk_key_buf: Vec::with_capacity(64),
1284 }
1285 }
1286}
1287
1288thread_local! {
1289 static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1290 static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1291}
1292
1293fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1294 INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1295 Ok(mut borrowed) => f(&mut borrowed),
1296 Err(_) => {
1297 let mut local = InsertBufs::new();
1298 f(&mut local)
1299 }
1300 })
1301}
1302
1303pub(super) struct UpsertBufs {
1304 old_row: Vec<Value>,
1305 new_row: Vec<Value>,
1306 value_values: Vec<Value>,
1307 new_value_buf: Vec<u8>,
1308}
1309
1310impl UpsertBufs {
1311 pub(super) fn new() -> Self {
1312 Self {
1313 old_row: Vec::new(),
1314 new_row: Vec::new(),
1315 value_values: Vec::new(),
1316 new_value_buf: Vec::with_capacity(256),
1317 }
1318 }
1319}
1320
1321pub fn exec_insert_in_txn(
1322 wtx: &mut WriteTxn<'_>,
1323 schema: &SchemaManager,
1324 stmt: &InsertStmt,
1325 params: &[Value],
1326) -> Result<ExecutionResult> {
1327 with_insert_scratch(|bufs| {
1328 exec_insert_in_txn_impl(
1329 wtx,
1330 schema,
1331 stmt,
1332 params,
1333 bufs,
1334 None,
1335 &CteContext::default(),
1336 )
1337 })
1338}
1339
1340pub(super) fn exec_insert_in_txn_with_ctes(
1341 wtx: &mut WriteTxn<'_>,
1342 schema: &SchemaManager,
1343 stmt: &InsertStmt,
1344 params: &[Value],
1345 outer_ctes: &CteContext,
1346) -> Result<ExecutionResult> {
1347 with_insert_scratch(|bufs| {
1348 exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1349 })
1350}
1351
1352fn exec_insert_in_txn_cached(
1353 wtx: &mut WriteTxn<'_>,
1354 schema: &SchemaManager,
1355 stmt: &InsertStmt,
1356 params: &[Value],
1357 cache: &InsertCache,
1358) -> Result<ExecutionResult> {
1359 with_insert_scratch(|bufs| {
1360 exec_insert_in_txn_impl(
1361 wtx,
1362 schema,
1363 stmt,
1364 params,
1365 bufs,
1366 Some(cache),
1367 &CteContext::default(),
1368 )
1369 })
1370}
1371
1372fn exec_insert_in_txn_impl(
1373 wtx: &mut WriteTxn<'_>,
1374 schema: &SchemaManager,
1375 stmt: &InsertStmt,
1376 params: &[Value],
1377 bufs: &mut InsertBufs,
1378 cache: Option<&InsertCache>,
1379 outer_ctes: &CteContext,
1380) -> Result<ExecutionResult> {
1381 let empty_ctes = CteContext::default();
1382 let materialized;
1383 let has_sub = match cache {
1384 Some(c) => c.has_subquery,
1385 None => insert_has_subquery(stmt),
1386 };
1387 let stmt = if has_sub {
1388 materialized = materialize_insert(stmt, &mut |sub| {
1389 exec_subquery_write(wtx, schema, sub, &empty_ctes)
1390 })?;
1391 &materialized
1392 } else {
1393 stmt
1394 };
1395
1396 let view_lookup_key = stmt.table.to_ascii_lowercase();
1397 if let Some(view_def) = schema.get_view(&view_lookup_key) {
1398 if super::triggers::has_instead_of(
1399 schema,
1400 &view_lookup_key,
1401 super::triggers::FireEvent::Insert,
1402 ) {
1403 let aliases = view_def.column_aliases.clone();
1404 return exec_instead_of_view_insert_in_txn(
1405 wtx,
1406 schema,
1407 &view_lookup_key,
1408 &aliases,
1409 stmt,
1410 params,
1411 );
1412 }
1413 return Err(SqlError::CannotModifyView(stmt.table.clone()));
1414 }
1415
1416 let table_schema = schema
1417 .get(&stmt.table)
1418 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1419
1420 let default_columns;
1421 let insert_columns: &[String] = if stmt.columns.is_empty() {
1422 default_columns = table_schema
1423 .columns
1424 .iter()
1425 .map(|c| c.name.clone())
1426 .collect::<Vec<_>>();
1427 &default_columns
1428 } else {
1429 &stmt.columns
1430 };
1431
1432 bufs.col_indices.clear();
1433 if let Some(c) = cache {
1434 bufs.col_indices.extend_from_slice(&c.col_indices);
1435 } else {
1436 for name in insert_columns {
1437 bufs.col_indices.push(
1438 table_schema
1439 .column_index(name)
1440 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1441 );
1442 }
1443 }
1444
1445 if cache.is_none() {
1446 for &ci in &bufs.col_indices {
1447 if table_schema.columns[ci].generated_kind.is_some() {
1448 return Err(SqlError::CannotInsertIntoGeneratedColumn(
1449 table_schema.columns[ci].name.clone(),
1450 ));
1451 }
1452 }
1453 }
1454
1455 let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1456 let cached_gen_positions: &[usize];
1457 let cached_gen_fast_evals: &[FastGenEval];
1458 if let Some(c) = cache {
1459 cached_gen_positions = &c.generated_col_positions;
1460 cached_gen_fast_evals = &c.generated_fast_evals;
1461 generated_cols_uncached = Vec::new();
1462 } else {
1463 cached_gen_positions = &[];
1464 cached_gen_fast_evals = &[];
1465 generated_cols_uncached = table_schema
1466 .columns
1467 .iter()
1468 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1469 .map(|c| {
1470 let expr = c.generated_expr.as_ref().unwrap();
1471 let fe = detect_fast_gen_eval(expr, table_schema);
1472 (c.position as usize, expr, fe)
1473 })
1474 .collect();
1475 }
1476 let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1477 let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1478 None
1479 } else {
1480 Some(ColumnMap::new(&table_schema.columns))
1481 };
1482 let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1483 None
1484 } else if let Some(c) = cache {
1485 c.row_col_map.as_ref()
1486 } else {
1487 row_col_map_for_gen_owned.as_ref()
1488 };
1489
1490 let any_defaults = match cache {
1491 Some(c) => c.any_defaults,
1492 None => table_schema
1493 .columns
1494 .iter()
1495 .any(|c| c.default_expr.is_some()),
1496 };
1497 let defaults: Vec<(usize, &Expr)> = if any_defaults {
1498 table_schema
1499 .columns
1500 .iter()
1501 .filter(|c| {
1502 c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1503 })
1504 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1505 .collect()
1506 } else {
1507 Vec::new()
1508 };
1509
1510 let has_checks = match cache {
1511 Some(c) => c.has_checks,
1512 None => table_schema.has_checks(),
1513 };
1514 let check_col_map = if has_checks {
1515 Some(ColumnMap::new(&table_schema.columns))
1516 } else {
1517 None
1518 };
1519
1520 let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1521 &[usize],
1522 &[usize],
1523 &[u16],
1524 usize,
1525 &[u16],
1526 ) = if let Some(c) = cache {
1527 (
1528 &c.pk_indices,
1529 &c.non_pk_indices,
1530 &c.encoding_positions,
1531 c.phys_count,
1532 &c.dropped_non_pk_slots,
1533 )
1534 } else {
1535 (
1536 table_schema.pk_indices(),
1537 table_schema.non_pk_indices(),
1538 table_schema.encoding_positions(),
1539 table_schema.physical_non_pk_count(),
1540 table_schema.dropped_non_pk_slots(),
1541 )
1542 };
1543
1544 bufs.row.resize(table_schema.columns.len(), Value::Null);
1545 bufs.pk_values.resize(pk_indices.len(), Value::Null);
1546 bufs.value_values.resize(phys_count, Value::Null);
1547
1548 let table_bytes = table_schema.name.as_bytes();
1549 let has_fks = !table_schema.foreign_keys.is_empty();
1550 let has_indices = !table_schema.indices.is_empty();
1551 let has_defaults = !defaults.is_empty();
1552
1553 let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1554 (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1555 (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1556 (_, None) => None,
1557 };
1558
1559 let row_col_map_owned: Option<ColumnMap> =
1560 if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1561 Some(ColumnMap::new(&table_schema.columns))
1562 } else {
1563 None
1564 };
1565 let row_col_map: Option<&ColumnMap> = cache
1566 .and_then(|c| c.row_col_map.as_ref())
1567 .or(row_col_map_owned.as_ref());
1568
1569 let select_rows = match &stmt.source {
1570 InsertSource::Select(sq) => {
1571 let insert_ctes = super::materialize_all_ctes_with_outer(
1572 &sq.ctes,
1573 sq.recursive,
1574 outer_ctes,
1575 &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1576 )?;
1577 let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1578 Some(qr.rows)
1579 }
1580 InsertSource::Values(_) => None,
1581 };
1582
1583 let mut count: u64 = 0;
1584 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1585 stmt.returning.as_ref().map(|_| Vec::new());
1586
1587 let values = match &stmt.source {
1588 InsertSource::Values(rows) => Some(rows.as_slice()),
1589 InsertSource::Select(_) => None,
1590 };
1591 let sel_rows = select_rows.as_deref();
1592
1593 let total = match (values, sel_rows) {
1594 (Some(rows), _) => rows.len(),
1595 (_, Some(rows)) => rows.len(),
1596 _ => 0,
1597 };
1598
1599 if let Some(sel) = sel_rows {
1600 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1601 return Err(SqlError::InvalidValue(format!(
1602 "INSERT ... SELECT column count mismatch: expected {}, got {}",
1603 insert_columns.len(),
1604 sel[0].len()
1605 )));
1606 }
1607 }
1608
1609 let has_insert_statement_triggers_impl =
1610 schema.triggers_for(&table_schema.name).iter().any(|t| {
1611 t.enabled
1612 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1613 && t.events
1614 .iter()
1615 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1616 });
1617 let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1618 Vec::with_capacity(total)
1619 } else {
1620 Vec::new()
1621 };
1622 if has_insert_statement_triggers_impl {
1623 super::triggers::fire_statement_triggers(
1624 wtx,
1625 schema,
1626 &table_schema.name,
1627 crate::parser::TriggerTiming::Before,
1628 super::triggers::FireEvent::Insert,
1629 &table_schema.columns,
1630 &[],
1631 &[],
1632 )?;
1633 }
1634
1635 let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1636 for idx in 0..total {
1637 if !skip_row_clear {
1638 for v in bufs.row.iter_mut() {
1639 *v = Value::Null;
1640 }
1641 }
1642
1643 if let Some(value_rows) = values {
1644 if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1645 for action in plan {
1646 match action {
1647 BindAction::Param {
1648 param_idx,
1649 col_idx,
1650 target,
1651 } => {
1652 let v = ¶ms[*param_idx];
1653 bufs.row[*col_idx] = if v.is_null() {
1654 Value::Null
1655 } else if v.data_type() == *target {
1656 v.clone()
1657 } else {
1658 let got = v.data_type();
1659 v.clone().coerce_into(*target).ok_or_else(|| {
1660 SqlError::TypeMismatch {
1661 expected: target.to_string(),
1662 got: got.to_string(),
1663 }
1664 })?
1665 };
1666 }
1667 BindAction::Literal { value, col_idx } => {
1668 bufs.row[*col_idx] = value.clone();
1669 }
1670 }
1671 }
1672 } else {
1673 let value_row = &value_rows[idx];
1674 if value_row.len() != insert_columns.len() {
1675 return Err(SqlError::InvalidValue(format!(
1676 "expected {} values, got {}",
1677 insert_columns.len(),
1678 value_row.len()
1679 )));
1680 }
1681 for (i, expr) in value_row.iter().enumerate() {
1682 let val = match expr {
1683 Expr::Parameter(n) => params
1684 .get(n - 1)
1685 .cloned()
1686 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1687 Expr::Literal(v) => v.clone(),
1688 _ => eval_const_expr(expr)?,
1689 };
1690 let col_idx = bufs.col_indices[i];
1691 let col = &table_schema.columns[col_idx];
1692 let got_type = val.data_type();
1693 bufs.row[col_idx] = if val.is_null() {
1694 Value::Null
1695 } else {
1696 val.coerce_into(col.data_type)
1697 .ok_or_else(|| SqlError::TypeMismatch {
1698 expected: col.data_type.to_string(),
1699 got: got_type.to_string(),
1700 })?
1701 };
1702 }
1703 }
1704 } else if let Some(sel) = sel_rows {
1705 let sel_row = &sel[idx];
1706 for (i, val) in sel_row.iter().enumerate() {
1707 let col_idx = bufs.col_indices[i];
1708 let col = &table_schema.columns[col_idx];
1709 let got_type = val.data_type();
1710 bufs.row[col_idx] = if val.is_null() {
1711 Value::Null
1712 } else {
1713 val.clone().coerce_into(col.data_type).ok_or_else(|| {
1714 SqlError::TypeMismatch {
1715 expected: col.data_type.to_string(),
1716 got: got_type.to_string(),
1717 }
1718 })?
1719 };
1720 }
1721 }
1722
1723 if has_defaults {
1724 for &(pos, def_expr) in &defaults {
1725 let val = eval_const_expr(def_expr)?;
1726 let col = &table_schema.columns[pos];
1727 if !val.is_null() {
1728 let got_type = val.data_type();
1729 bufs.row[pos] =
1730 val.coerce_into(col.data_type)
1731 .ok_or_else(|| SqlError::TypeMismatch {
1732 expected: col.data_type.to_string(),
1733 got: got_type.to_string(),
1734 })?;
1735 }
1736 }
1737 }
1738
1739 if let Some(gen_map) = row_col_map_for_gen {
1740 if cache.is_some() {
1741 for (pos, fast) in cached_gen_positions
1742 .iter()
1743 .copied()
1744 .zip(cached_gen_fast_evals.iter())
1745 {
1746 let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1747 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1748 let col = &table_schema.columns[pos];
1749 bufs.row[pos] = if val.is_null() {
1750 Value::Null
1751 } else {
1752 let got_type = val.data_type();
1753 val.coerce_into(col.data_type)
1754 .ok_or_else(|| SqlError::TypeMismatch {
1755 expected: col.data_type.to_string(),
1756 got: got_type.to_string(),
1757 })?
1758 };
1759 }
1760 } else {
1761 for (pos, gen_expr, fast) in &generated_cols_uncached {
1762 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1763 let col = &table_schema.columns[*pos];
1764 bufs.row[*pos] = if val.is_null() {
1765 Value::Null
1766 } else {
1767 let got_type = val.data_type();
1768 val.coerce_into(col.data_type)
1769 .ok_or_else(|| SqlError::TypeMismatch {
1770 expected: col.data_type.to_string(),
1771 got: got_type.to_string(),
1772 })?
1773 };
1774 }
1775 }
1776 }
1777
1778 if let Some(c) = cache {
1779 for &pos in &c.not_null_indices {
1780 if bufs.row[pos as usize].is_null() {
1781 return Err(SqlError::NotNullViolation(
1782 table_schema.columns[pos as usize].name.clone(),
1783 ));
1784 }
1785 }
1786 } else {
1787 for col in &table_schema.columns {
1788 if !col.nullable && bufs.row[col.position as usize].is_null() {
1789 return Err(SqlError::NotNullViolation(col.name.clone()));
1790 }
1791 }
1792 }
1793
1794 if let Some(ref col_map) = check_col_map {
1795 for col in &table_schema.columns {
1796 if let Some(ref check) = col.check_expr {
1797 let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1798 if !is_truthy(&result) && !result.is_null() {
1799 let name = col.check_name.as_deref().unwrap_or(&col.name);
1800 return Err(SqlError::CheckViolation(name.to_string()));
1801 }
1802 }
1803 }
1804 for tc in &table_schema.check_constraints {
1805 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1806 if !is_truthy(&result) && !result.is_null() {
1807 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1808 return Err(SqlError::CheckViolation(name.to_string()));
1809 }
1810 }
1811 }
1812
1813 if has_fks {
1814 for fk in &table_schema.foreign_keys {
1815 let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1816 if any_null {
1817 continue;
1818 }
1819 crate::encoding::encode_composite_key_from_indices(
1820 &fk.columns,
1821 &bufs.row,
1822 &mut bufs.fk_key_buf,
1823 );
1824 if fk.deferrable && fk.initially_deferred {
1825 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1826 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1827 fk_name: name,
1828 foreign_table: fk.foreign_table.as_bytes().to_vec(),
1829 parent_key: bufs.fk_key_buf.clone(),
1830 });
1831 continue;
1832 }
1833 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1834 let found = wtx
1835 .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1836 .map_err(SqlError::Storage)?;
1837 if found.is_none() {
1838 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1839 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1840 }
1841 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1842 }
1843 }
1844 }
1845
1846 let proposed_row_for_returning: Option<Vec<Value>> =
1847 returning_rows.as_ref().map(|_| bufs.row.clone());
1848 let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1849 Some(bufs.row.clone())
1850 } else {
1851 None
1852 };
1853
1854 let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
1855 t.enabled
1856 && t.timing == crate::parser::TriggerTiming::Before
1857 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1858 && t.events
1859 .iter()
1860 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1861 });
1862 if has_before_insert_triggers {
1863 super::triggers::fire_row_triggers(
1864 wtx,
1865 schema,
1866 &table_schema.name,
1867 crate::parser::TriggerTiming::Before,
1868 super::triggers::FireEvent::Insert,
1869 None,
1870 Some(bufs.row.clone()),
1871 &table_schema.columns,
1872 )?;
1873 }
1874
1875 for (j, &i) in pk_indices.iter().enumerate() {
1876 bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1877 }
1878 match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1879 true => match bufs.pk_values[0] {
1880 Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1881 _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1882 },
1883 false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1884 }
1885
1886 for &slot in dropped {
1887 bufs.value_values[slot as usize] = Value::Null;
1888 }
1889 for (j, &i) in non_pk.iter().enumerate() {
1890 let col = &table_schema.columns[i];
1891 if matches!(
1892 col.generated_kind,
1893 Some(crate::parser::GeneratedKind::Virtual)
1894 ) {
1895 bufs.value_values[enc_pos[j] as usize] = Value::Null;
1896 bufs.row[i] = Value::Null;
1897 } else {
1898 bufs.value_values[enc_pos[j] as usize] =
1899 std::mem::replace(&mut bufs.row[i], Value::Null);
1900 }
1901 }
1902 match cache.and_then(|c| c.row_encoder.as_ref()) {
1903 Some(tmpl) => crate::encoding::encode_int_row_with_template(
1904 tmpl,
1905 &bufs.value_values,
1906 &mut bufs.value_buf,
1907 )?,
1908 None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1909 }
1910
1911 if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1912 return Err(SqlError::KeyTooLarge {
1913 size: bufs.key_buf.len(),
1914 max: citadel_core::MAX_KEY_SIZE,
1915 });
1916 }
1917 if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1918 return Err(SqlError::RowTooLarge {
1919 size: bufs.value_buf.len(),
1920 max: citadel_core::MAX_VALUE_SIZE,
1921 });
1922 }
1923
1924 match compiled_conflict.as_ref() {
1925 None => {
1926 let is_new = wtx
1927 .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1928 .map_err(SqlError::Storage)?;
1929 if !is_new {
1930 return Err(SqlError::DuplicateKey);
1931 }
1932 let has_after_insert_triggers =
1933 schema.triggers_for(&table_schema.name).iter().any(|t| {
1934 t.enabled
1935 && t.timing == crate::parser::TriggerTiming::After
1936 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1937 && t.events
1938 .iter()
1939 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1940 });
1941 if has_indices || has_after_insert_triggers {
1942 for (j, &i) in pk_indices.iter().enumerate() {
1943 bufs.row[i] = bufs.pk_values[j].clone();
1944 }
1945 for (j, &i) in non_pk.iter().enumerate() {
1946 bufs.row[i] = std::mem::replace(
1947 &mut bufs.value_values[enc_pos[j] as usize],
1948 Value::Null,
1949 );
1950 }
1951 if has_indices {
1952 insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1953 }
1954 if has_after_insert_triggers {
1955 super::triggers::fire_row_triggers(
1956 wtx,
1957 schema,
1958 &table_schema.name,
1959 crate::parser::TriggerTiming::After,
1960 super::triggers::FireEvent::Insert,
1961 None,
1962 Some(bufs.row.clone()),
1963 &table_schema.columns,
1964 )?;
1965 }
1966 }
1967 if let Some(r) = row_for_stmt_trigger_impl.clone() {
1968 stmt_new_rows_impl.push(r);
1969 }
1970 count += 1;
1971 if let Some(buf) = returning_rows.as_mut() {
1972 buf.push((None, proposed_row_for_returning));
1973 }
1974 }
1975 Some(oc) => {
1976 let oc_ref: &CompiledOnConflict = oc;
1977 let needs_row = upsert_needs_row(oc_ref, table_schema);
1978 if needs_row {
1979 for (j, &i) in pk_indices.iter().enumerate() {
1980 bufs.row[i] = bufs.pk_values[j].clone();
1981 }
1982 for (j, &i) in non_pk.iter().enumerate() {
1983 bufs.row[i] = std::mem::replace(
1984 &mut bufs.value_values[enc_pos[j] as usize],
1985 Value::Null,
1986 );
1987 }
1988 }
1989 let outcome = apply_insert_with_conflict(
1990 wtx,
1991 table_schema,
1992 &bufs.key_buf,
1993 &bufs.value_buf,
1994 &bufs.row,
1995 &bufs.pk_values,
1996 oc_ref,
1997 row_col_map.unwrap(),
1998 stmt.returning.is_some(),
1999 )?;
2000 match outcome {
2001 InsertRowOutcome::Inserted => {
2002 count += 1;
2003 if let Some(buf) = returning_rows.as_mut() {
2004 buf.push((None, proposed_row_for_returning));
2005 }
2006 if let Some(r) = row_for_stmt_trigger_impl.clone() {
2007 stmt_new_rows_impl.push(r);
2008 }
2009 let has_after_insert_triggers =
2010 schema.triggers_for(&table_schema.name).iter().any(|t| {
2011 t.enabled
2012 && t.timing == crate::parser::TriggerTiming::After
2013 && t.granularity
2014 == crate::parser::TriggerGranularity::ForEachRow
2015 && t.events
2016 .iter()
2017 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
2018 });
2019 if has_after_insert_triggers {
2020 super::triggers::fire_row_triggers(
2021 wtx,
2022 schema,
2023 &table_schema.name,
2024 crate::parser::TriggerTiming::After,
2025 super::triggers::FireEvent::Insert,
2026 None,
2027 Some(bufs.row.clone()),
2028 &table_schema.columns,
2029 )?;
2030 }
2031 }
2032 InsertRowOutcome::Updated { old, new } => {
2033 count += 1;
2034 if let Some(buf) = returning_rows.as_mut() {
2035 buf.push((Some(old.clone()), Some(new.clone())));
2036 }
2037 let has_after_update_triggers =
2038 schema.triggers_for(&table_schema.name).iter().any(|t| {
2039 t.enabled
2040 && t.timing == crate::parser::TriggerTiming::After
2041 && t.granularity
2042 == crate::parser::TriggerGranularity::ForEachRow
2043 && t.events.iter().any(|e| {
2044 matches!(e, crate::parser::TriggerEvent::Update(_))
2045 })
2046 });
2047 if has_after_update_triggers {
2048 let changed_cols: Vec<String> = match oc_ref {
2049 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2050 .iter()
2051 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2052 .collect(),
2053 _ => Vec::new(),
2054 };
2055 super::triggers::fire_row_triggers(
2056 wtx,
2057 schema,
2058 &table_schema.name,
2059 crate::parser::TriggerTiming::After,
2060 super::triggers::FireEvent::Update {
2061 changed_columns: &changed_cols,
2062 },
2063 Some(old),
2064 Some(new),
2065 &table_schema.columns,
2066 )?;
2067 }
2068 }
2069 InsertRowOutcome::Skipped => {}
2070 }
2071 }
2072 }
2073 }
2074
2075 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2076 if has_insert_statement_triggers_impl {
2077 super::triggers::fire_statement_triggers(
2078 wtx,
2079 schema,
2080 &table_schema.name,
2081 crate::parser::TriggerTiming::After,
2082 super::triggers::FireEvent::Insert,
2083 &table_schema.columns,
2084 &[],
2085 &stmt_new_rows_impl,
2086 )?;
2087 }
2088 return Ok(ExecutionResult::Query(super::helpers::project_returning(
2089 table_schema,
2090 returning_cols,
2091 &rows,
2092 )?));
2093 }
2094
2095 if has_insert_statement_triggers_impl {
2096 super::triggers::fire_statement_triggers(
2097 wtx,
2098 schema,
2099 &table_schema.name,
2100 crate::parser::TriggerTiming::After,
2101 super::triggers::FireEvent::Insert,
2102 &table_schema.columns,
2103 &[],
2104 &stmt_new_rows_impl,
2105 )?;
2106 }
2107
2108 Ok(ExecutionResult::RowsAffected(count))
2109}
2110
2111pub struct CompiledInsert {
2112 table_lower: String,
2113 cached: Option<InsertCache>,
2114}
2115
2116struct InsertCache {
2117 col_indices: Vec<usize>,
2118 has_subquery: bool,
2119 any_defaults: bool,
2120 has_checks: bool,
2121 on_conflict: Option<Arc<CompiledOnConflict>>,
2122 row_col_map: Option<ColumnMap>,
2123 generated_col_positions: Vec<usize>,
2124 generated_fast_evals: Vec<FastGenEval>,
2125 pk_indices: Vec<usize>,
2126 non_pk_indices: Vec<usize>,
2127 encoding_positions: Vec<u16>,
2128 dropped_non_pk_slots: Vec<u16>,
2129 phys_count: usize,
2130 single_int_pk: bool,
2131 not_null_indices: Vec<u16>,
2132 bind_plan: Option<Vec<BindAction>>,
2133 row_fully_overwritten: bool,
2134 row_encoder: Option<crate::encoding::IntRowTemplate>,
2135 is_trivial_fast: bool,
2136 trivial_fast_program: Option<TrivialFastProgram>,
2137 needs_scoped_params: bool,
2138}
2139
2140#[derive(Clone)]
2141enum BindAction {
2142 Param {
2143 param_idx: usize,
2144 col_idx: usize,
2145 target: DataType,
2146 },
2147 Literal {
2148 value: Value,
2149 col_idx: usize,
2150 },
2151}
2152
2153#[derive(Clone)]
2154struct TrivialFastProgram {
2155 template: Vec<u8>,
2156 ops: Vec<WriteOp>,
2157 pk_param: u8,
2158 not_null_param_indices: Vec<u8>,
2159}
2160
2161#[derive(Clone)]
2162enum WriteOp {
2163 ParamI64 {
2164 param_idx: u8,
2165 off: u32,
2166 },
2167 LiteralI64 {
2168 value: i64,
2169 off: u32,
2170 },
2171 GenAddParamsI64 {
2172 a_param: u8,
2173 b_param: u8,
2174 off: u32,
2175 bitmap_byte_off: u32,
2176 bitmap_bit_mask: u8,
2177 },
2178 GenMulAddParamI64 {
2179 param_idx: u8,
2180 mul: i64,
2181 add: i64,
2182 off: u32,
2183 bitmap_byte_off: u32,
2184 bitmap_bit_mask: u8,
2185 },
2186}
2187
2188fn build_trivial_fast_program(
2189 bind_plan: &[BindAction],
2190 row_encoder: &crate::encoding::IntRowTemplate,
2191 non_virtual_pairs: &[(usize, usize)],
2192 generated_col_positions: &[usize],
2193 generated_fast_evals: &[FastGenEval],
2194 pk_indices: &[usize],
2195 columns: &[crate::types::ColumnDef],
2196) -> Option<TrivialFastProgram> {
2197 let pk_col = pk_indices[0];
2198 let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2199 non_virtual_pairs.iter().copied().collect();
2200 let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2201 row_encoder.slot_offsets.iter().copied().collect();
2202
2203 let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2204 let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2205 let mut pk_param: Option<u8> = None;
2206 let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2207 let mut not_null_param_indices: Vec<u8> = Vec::new();
2208
2209 for action in bind_plan {
2210 match action {
2211 BindAction::Param {
2212 param_idx,
2213 col_idx,
2214 target,
2215 } => {
2216 if *target != DataType::Integer {
2217 return None;
2218 }
2219 let pi: u8 = u8::try_from(*param_idx).ok()?;
2220 col_to_param.insert(*col_idx, pi);
2221 if *col_idx == pk_col {
2222 pk_param = Some(pi);
2223 } else {
2224 let slot = *col_to_slot.get(col_idx)?;
2225 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2226 ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2227 if !columns[*col_idx].nullable {
2228 not_null_param_indices.push(pi);
2229 }
2230 }
2231 }
2232 BindAction::Literal { value, col_idx } => match value {
2233 Value::Integer(v) => {
2234 col_to_lit_int.insert(*col_idx, *v);
2235 if *col_idx == pk_col {
2236 return None;
2237 }
2238 let slot = *col_to_slot.get(col_idx)?;
2239 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2240 ops.push(WriteOp::LiteralI64 { value: *v, off });
2241 }
2242 _ => return None,
2243 },
2244 }
2245 }
2246
2247 let pk_param = pk_param?;
2248
2249 for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2250 let gen_slot = *col_to_slot.get(&gen_pos)?;
2251 let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2252 let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
2253 let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
2254 let gen_col_nullable = columns[gen_pos].nullable;
2255
2256 match &generated_fast_evals[i] {
2257 FastGenEval::IntColAddCol {
2258 left_idx,
2259 right_idx,
2260 } => {
2261 let a_param = col_to_param.get(left_idx).copied();
2262 let b_param = col_to_param.get(right_idx).copied();
2263 match (a_param, b_param) {
2264 (Some(ap), Some(bp)) => {
2265 let deps_safe = gen_col_nullable
2266 || (not_null_param_indices.contains(&ap)
2267 && not_null_param_indices.contains(&bp));
2268 if !deps_safe {
2269 return None;
2270 }
2271 ops.push(WriteOp::GenAddParamsI64 {
2272 a_param: ap,
2273 b_param: bp,
2274 off: gen_off,
2275 bitmap_byte_off,
2276 bitmap_bit_mask,
2277 });
2278 }
2279 (Some(p), None) => {
2280 let lit = col_to_lit_int.get(right_idx).copied()?;
2281 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2282 return None;
2283 }
2284 ops.push(WriteOp::GenMulAddParamI64 {
2285 param_idx: p,
2286 mul: 1,
2287 add: lit,
2288 off: gen_off,
2289 bitmap_byte_off,
2290 bitmap_bit_mask,
2291 });
2292 }
2293 (None, Some(p)) => {
2294 let lit = col_to_lit_int.get(left_idx).copied()?;
2295 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2296 return None;
2297 }
2298 ops.push(WriteOp::GenMulAddParamI64 {
2299 param_idx: p,
2300 mul: 1,
2301 add: lit,
2302 off: gen_off,
2303 bitmap_byte_off,
2304 bitmap_bit_mask,
2305 });
2306 }
2307 (None, None) => {
2308 let la = col_to_lit_int.get(left_idx).copied()?;
2309 let lb = col_to_lit_int.get(right_idx).copied()?;
2310 ops.push(WriteOp::LiteralI64 {
2311 value: la.wrapping_add(lb),
2312 off: gen_off,
2313 });
2314 }
2315 }
2316 }
2317 FastGenEval::IntColMulAdd {
2318 col_schema_idx,
2319 mul,
2320 add,
2321 } => {
2322 if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2323 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2324 return None;
2325 }
2326 ops.push(WriteOp::GenMulAddParamI64 {
2327 param_idx: p,
2328 mul: *mul,
2329 add: *add,
2330 off: gen_off,
2331 bitmap_byte_off,
2332 bitmap_bit_mask,
2333 });
2334 } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2335 ops.push(WriteOp::LiteralI64 {
2336 value: lit.wrapping_mul(*mul).wrapping_add(*add),
2337 off: gen_off,
2338 });
2339 } else {
2340 return None;
2341 }
2342 }
2343 FastGenEval::None => return None,
2344 }
2345 }
2346
2347 Some(TrivialFastProgram {
2348 template: row_encoder.template.clone(),
2349 ops,
2350 pk_param,
2351 not_null_param_indices,
2352 })
2353}
2354
2355#[derive(Clone)]
2356pub(super) enum CompiledOnConflict {
2357 DoNothing {
2358 target: Option<ConflictKind>,
2359 },
2360 DoUpdate {
2361 target: ConflictKind,
2362 assignments: Vec<(usize, Expr)>,
2363 where_clause: Option<Expr>,
2364 fast_paths: Option<Vec<DoUpdateFastPath>>,
2365 },
2366}
2367
2368#[derive(Clone, Copy)]
2369pub(super) enum DoUpdateFastPath {
2370 IntAddConst { phys_idx: usize, delta: i64 },
2371}
2372
2373#[derive(Clone, Debug)]
2374pub(super) enum ConflictKind {
2375 PrimaryKey,
2376 UniqueIndex { index_idx: usize },
2377}
2378
2379fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2380 match target {
2381 ConflictTarget::Columns(cols) => {
2382 let col_idx_set: Vec<u16> = cols
2383 .iter()
2384 .map(|name| {
2385 ts.column_index(name)
2386 .map(|i| i as u16)
2387 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2388 })
2389 .collect::<Result<_>>()?;
2390 let pk_set = ts.primary_key_columns.clone();
2391 if set_equal(&col_idx_set, &pk_set) {
2392 return Ok(ConflictKind::PrimaryKey);
2393 }
2394 for (index_idx, idx) in ts.indices.iter().enumerate() {
2395 if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2396 return Ok(ConflictKind::UniqueIndex { index_idx });
2397 }
2398 }
2399 Err(SqlError::Plan(
2400 "ON CONFLICT target does not match any unique constraint".into(),
2401 ))
2402 }
2403 ConflictTarget::Constraint(name) => {
2404 let lower = name.to_ascii_lowercase();
2405 for (index_idx, idx) in ts.indices.iter().enumerate() {
2406 if idx.name.eq_ignore_ascii_case(&lower) {
2407 if idx.unique {
2408 return Ok(ConflictKind::UniqueIndex { index_idx });
2409 }
2410 return Err(SqlError::Plan(format!(
2411 "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2412 )));
2413 }
2414 }
2415 Err(SqlError::Plan(format!(
2416 "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2417 )))
2418 }
2419 }
2420}
2421
2422fn set_equal(a: &[u16], b: &[u16]) -> bool {
2423 if a.len() != b.len() {
2424 return false;
2425 }
2426 let mut a_sorted = a.to_vec();
2427 let mut b_sorted = b.to_vec();
2428 a_sorted.sort_unstable();
2429 b_sorted.sort_unstable();
2430 a_sorted == b_sorted
2431}
2432
2433pub(super) enum InsertRowOutcome {
2434 Inserted,
2435 Updated { old: Vec<Value>, new: Vec<Value> },
2436 Skipped,
2437}
2438
2439#[allow(clippy::too_many_arguments)]
2440#[inline]
2441pub(super) fn apply_insert_with_conflict(
2442 wtx: &mut WriteTxn<'_>,
2443 table_schema: &TableSchema,
2444 key_buf: &[u8],
2445 value_buf: &[u8],
2446 row: &[Value],
2447 pk_values: &[Value],
2448 on_conflict: &CompiledOnConflict,
2449 col_map: &ColumnMap,
2450 capture_returning: bool,
2451) -> Result<InsertRowOutcome> {
2452 let table_bytes = table_schema.name.as_bytes();
2453
2454 if let CompiledOnConflict::DoNothing { target } = on_conflict {
2455 let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2456 if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2457 let inserted = wtx
2458 .table_insert_if_absent(table_bytes, key_buf, value_buf)
2459 .map_err(SqlError::Storage)?;
2460 return Ok(if inserted {
2461 InsertRowOutcome::Inserted
2462 } else {
2463 InsertRowOutcome::Skipped
2464 });
2465 }
2466 }
2467
2468 if let CompiledOnConflict::DoUpdate {
2469 target: ConflictKind::PrimaryKey,
2470 assignments,
2471 where_clause,
2472 fast_paths,
2473 } = on_conflict
2474 {
2475 if can_fuse_do_update(table_schema, assignments) {
2476 return apply_do_update_fused(
2477 wtx,
2478 table_schema,
2479 table_bytes,
2480 key_buf,
2481 value_buf,
2482 row,
2483 assignments,
2484 where_clause.as_ref(),
2485 col_map,
2486 fast_paths.as_deref(),
2487 capture_returning,
2488 );
2489 }
2490 }
2491
2492 let primary_outcome = wtx
2493 .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2494 .map_err(SqlError::Storage)?;
2495
2496 match primary_outcome {
2497 citadel_txn::write_txn::InsertOutcome::Inserted => {
2498 if table_schema.indices.is_empty() {
2499 return Ok(InsertRowOutcome::Inserted);
2500 }
2501 let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2502 match insert_index_entries_or_fetch(
2503 wtx,
2504 table_schema,
2505 row,
2506 pk_values,
2507 &mut inserted_keys,
2508 )? {
2509 None => Ok(InsertRowOutcome::Inserted),
2510 Some(conflicting_idx) => {
2511 let matches_target =
2512 matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2513 || matches!(
2514 on_conflict,
2515 CompiledOnConflict::DoNothing {
2516 target: Some(ConflictKind::UniqueIndex { index_idx }),
2517 } | CompiledOnConflict::DoUpdate {
2518 target: ConflictKind::UniqueIndex { index_idx },
2519 ..
2520 } if *index_idx == conflicting_idx
2521 );
2522 undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2523 if !matches_target {
2524 return Err(SqlError::UniqueViolation(
2525 table_schema.indices[conflicting_idx].name.clone(),
2526 ));
2527 }
2528 match on_conflict {
2529 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2530 CompiledOnConflict::DoUpdate {
2531 assignments,
2532 where_clause,
2533 ..
2534 } => {
2535 let existing_pk =
2536 fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2537 apply_do_update(
2538 wtx,
2539 table_schema,
2540 &existing_pk,
2541 row,
2542 assignments,
2543 where_clause.as_ref(),
2544 col_map,
2545 capture_returning,
2546 )
2547 }
2548 }
2549 }
2550 }
2551 }
2552 citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2553 let matches_target = matches!(
2554 on_conflict,
2555 CompiledOnConflict::DoNothing { target: None }
2556 | CompiledOnConflict::DoNothing {
2557 target: Some(ConflictKind::PrimaryKey),
2558 }
2559 | CompiledOnConflict::DoUpdate {
2560 target: ConflictKind::PrimaryKey,
2561 ..
2562 }
2563 );
2564 if !matches_target {
2565 return Err(SqlError::DuplicateKey);
2566 }
2567 match on_conflict {
2568 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2569 CompiledOnConflict::DoUpdate {
2570 assignments,
2571 where_clause,
2572 ..
2573 } => {
2574 let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2575 apply_do_update_with_old_row(
2576 wtx,
2577 table_schema,
2578 key_buf,
2579 &old_row,
2580 row,
2581 assignments,
2582 where_clause.as_ref(),
2583 col_map,
2584 capture_returning,
2585 )
2586 }
2587 }
2588 }
2589 }
2590}
2591
2592#[inline]
2593fn apply_fast_path_patch(
2594 old_bytes: &[u8],
2595 fast_paths: &[DoUpdateFastPath],
2596) -> Result<UpsertAction> {
2597 UPSERT_SCRATCH.with(|slot| {
2598 let mut bufs = slot.borrow_mut();
2599 bufs.new_value_buf.clear();
2600 bufs.new_value_buf.extend_from_slice(old_bytes);
2601
2602 let mut patch_scratch: Vec<u8> = Vec::new();
2603
2604 for fp in fast_paths {
2605 match fp {
2606 DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2607 let decoded =
2608 crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2609 let old_val = &decoded[0];
2610 let new_val = match old_val {
2611 Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2612 Value::Null => Value::Null,
2613 _ => {
2614 return Err(SqlError::TypeMismatch {
2615 expected: "INTEGER".into(),
2616 got: old_val.data_type().to_string(),
2617 });
2618 }
2619 };
2620 if !crate::encoding::patch_column_in_place(
2621 &mut bufs.new_value_buf,
2622 *phys_idx,
2623 &new_val,
2624 )? {
2625 patch_scratch.clear();
2626 crate::encoding::patch_row_column(
2627 &bufs.new_value_buf,
2628 *phys_idx,
2629 &new_val,
2630 &mut patch_scratch,
2631 )?;
2632 std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2633 }
2634 }
2635 }
2636 }
2637
2638 if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2639 return Err(SqlError::RowTooLarge {
2640 size: bufs.new_value_buf.len(),
2641 max: citadel_core::MAX_VALUE_SIZE,
2642 });
2643 }
2644
2645 Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2646 })
2647}
2648
2649fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2650 if !ts.indices.is_empty() {
2651 return true;
2652 }
2653 match oc {
2654 CompiledOnConflict::DoNothing { .. } => false,
2655 CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2656 }
2657}
2658
2659fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2660 if !ts.indices.is_empty() {
2661 return false;
2662 }
2663 if !ts.foreign_keys.is_empty() {
2664 return false;
2665 }
2666 if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2667 return false;
2668 }
2669 let pk = ts.pk_indices();
2670 !assignments.iter().any(|(ci, _)| pk.contains(ci))
2671}
2672
2673#[allow(clippy::too_many_arguments)]
2674#[inline]
2675fn apply_do_update_fused(
2676 wtx: &mut WriteTxn<'_>,
2677 table_schema: &TableSchema,
2678 table_bytes: &[u8],
2679 key_buf: &[u8],
2680 value_buf: &[u8],
2681 proposed_row: &[Value],
2682 assignments: &[(usize, Expr)],
2683 where_clause: Option<&Expr>,
2684 col_map: &ColumnMap,
2685 fast_paths: Option<&[DoUpdateFastPath]>,
2686 capture_returning: bool,
2687) -> Result<InsertRowOutcome> {
2688 let non_pk = table_schema.non_pk_indices();
2689 let enc_pos = table_schema.encoding_positions();
2690 let phys_count = table_schema.physical_non_pk_count();
2691 let dropped = table_schema.dropped_non_pk_slots();
2692 let has_checks = table_schema.has_checks();
2693 let has_fks = !table_schema.foreign_keys.is_empty();
2694
2695 let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2696 std::cell::RefCell::new(None);
2697
2698 let outcome =
2699 wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2700 if let Some(fps) = fast_paths {
2701 if !has_checks {
2702 let action = apply_fast_path_patch(old_bytes, fps)?;
2703 if capture_returning {
2704 if let UpsertAction::Replace(ref new_bytes) = action {
2705 let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2706 let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2707 *captured.borrow_mut() = Some((old_row, new_row));
2708 }
2709 }
2710 return Ok(action);
2711 }
2712 }
2713 UPSERT_SCRATCH.with(|slot| {
2714 let mut bufs = slot.borrow_mut();
2715 let UpsertBufs {
2716 old_row,
2717 new_row,
2718 value_values,
2719 new_value_buf,
2720 } = &mut *bufs;
2721
2722 old_row.clear();
2723 old_row.resize(table_schema.columns.len(), Value::Null);
2724 decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2725
2726 if let Some(w) = where_clause {
2727 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2728 let result = eval_expr(w, &ctx)?;
2729 if result.is_null() || !is_truthy(&result) {
2730 return Ok(UpsertAction::Skip);
2731 }
2732 }
2733
2734 new_row.clear();
2735 new_row.extend_from_slice(old_row);
2736 for (col_idx, expr) in assignments {
2737 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2738 let val = eval_expr(expr, &ctx)?;
2739 let col = &table_schema.columns[*col_idx];
2740 new_row[*col_idx] = if val.is_null() {
2741 Value::Null
2742 } else {
2743 let got = val.data_type();
2744 val.coerce_into(col.data_type)
2745 .ok_or_else(|| SqlError::TypeMismatch {
2746 expected: col.data_type.to_string(),
2747 got: got.to_string(),
2748 })?
2749 };
2750 }
2751
2752 for (assigned_idx, _) in assignments {
2753 let col = &table_schema.columns[*assigned_idx];
2754 if !col.nullable && new_row[col.position as usize].is_null() {
2755 return Err(SqlError::NotNullViolation(col.name.clone()));
2756 }
2757 }
2758 if has_checks {
2759 for col in &table_schema.columns {
2760 if let Some(ref check) = col.check_expr {
2761 let ctx = EvalCtx::new(col_map, new_row);
2762 let result = eval_expr(check, &ctx)?;
2763 if !is_truthy(&result) && !result.is_null() {
2764 let name = col.check_name.as_deref().unwrap_or(&col.name);
2765 return Err(SqlError::CheckViolation(name.to_string()));
2766 }
2767 }
2768 }
2769 for tc in &table_schema.check_constraints {
2770 let ctx = EvalCtx::new(col_map, new_row);
2771 let result = eval_expr(&tc.expr, &ctx)?;
2772 if !is_truthy(&result) && !result.is_null() {
2773 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2774 return Err(SqlError::CheckViolation(name.to_string()));
2775 }
2776 }
2777 }
2778 let _ = has_fks;
2779
2780 value_values.clear();
2781 value_values.resize(phys_count, Value::Null);
2782 for &slot in dropped {
2783 value_values[slot as usize] = Value::Null;
2784 }
2785 for (j, &i) in non_pk.iter().enumerate() {
2786 value_values[enc_pos[j] as usize] = new_row[i].clone();
2787 }
2788 new_value_buf.clear();
2789 crate::encoding::encode_row_into(value_values, new_value_buf);
2790
2791 if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2792 return Err(SqlError::RowTooLarge {
2793 size: new_value_buf.len(),
2794 max: citadel_core::MAX_VALUE_SIZE,
2795 });
2796 }
2797
2798 if capture_returning {
2799 *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2800 }
2801 Ok(UpsertAction::Replace(new_value_buf.clone()))
2802 })
2803 })?;
2804
2805 match outcome {
2806 UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2807 UpsertOutcome::Updated => {
2808 if capture_returning {
2809 let (old, new) = captured.into_inner().ok_or_else(|| {
2810 SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2811 })?;
2812 Ok(InsertRowOutcome::Updated { old, new })
2813 } else {
2814 Ok(InsertRowOutcome::Inserted)
2815 }
2816 }
2817 UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2818 }
2819}
2820
2821fn fetch_unique_index_pk(
2822 wtx: &mut WriteTxn<'_>,
2823 table_schema: &TableSchema,
2824 index_idx: usize,
2825 row: &[Value],
2826) -> Result<Vec<u8>> {
2827 let idx = &table_schema.indices[index_idx];
2828 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2829 let indexed: Vec<Value> = idx
2830 .column_positions_iter()
2831 .map(|col_idx| row[col_idx as usize].clone())
2832 .collect();
2833 let key = crate::encoding::encode_composite_key(&indexed);
2834 let value = wtx
2835 .table_get(&idx_table, &key)
2836 .map_err(SqlError::Storage)?
2837 .ok_or_else(|| {
2838 SqlError::InvalidValue("unique index missing expected collision entry".into())
2839 })?;
2840 Ok(value)
2841}
2842
2843#[allow(clippy::too_many_arguments)]
2844fn apply_do_update(
2845 wtx: &mut WriteTxn<'_>,
2846 table_schema: &TableSchema,
2847 pk_key: &[u8],
2848 proposed_row: &[Value],
2849 assignments: &[(usize, Expr)],
2850 where_clause: Option<&Expr>,
2851 col_map: &ColumnMap,
2852 capture_returning: bool,
2853) -> Result<InsertRowOutcome> {
2854 let old_value = wtx
2855 .table_get(table_schema.name.as_bytes(), pk_key)
2856 .map_err(SqlError::Storage)?
2857 .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2858 let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2859 apply_do_update_with_old_row(
2860 wtx,
2861 table_schema,
2862 pk_key,
2863 &old_row,
2864 proposed_row,
2865 assignments,
2866 where_clause,
2867 col_map,
2868 capture_returning,
2869 )
2870}
2871
2872#[allow(clippy::too_many_arguments)]
2873fn apply_do_update_with_old_row(
2874 wtx: &mut WriteTxn<'_>,
2875 table_schema: &TableSchema,
2876 old_pk_key: &[u8],
2877 old_row: &[Value],
2878 proposed_row: &[Value],
2879 assignments: &[(usize, Expr)],
2880 where_clause: Option<&Expr>,
2881 col_map: &ColumnMap,
2882 capture_returning: bool,
2883) -> Result<InsertRowOutcome> {
2884 if let Some(w) = where_clause {
2885 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2886 let result = eval_expr(w, &ctx)?;
2887 if result.is_null() || !is_truthy(&result) {
2888 return Ok(InsertRowOutcome::Skipped);
2889 }
2890 }
2891
2892 let mut new_row = old_row.to_vec();
2893 for (col_idx, expr) in assignments {
2894 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2895 let val = eval_expr(expr, &ctx)?;
2896 let col = &table_schema.columns[*col_idx];
2897 new_row[*col_idx] = if val.is_null() {
2898 Value::Null
2899 } else {
2900 let got = val.data_type();
2901 val.coerce_into(col.data_type)
2902 .ok_or_else(|| SqlError::TypeMismatch {
2903 expected: col.data_type.to_string(),
2904 got: got.to_string(),
2905 })?
2906 };
2907 }
2908
2909 for col in &table_schema.columns {
2910 if matches!(
2911 col.generated_kind,
2912 Some(crate::parser::GeneratedKind::Stored)
2913 ) {
2914 let val = eval_expr(
2915 col.generated_expr.as_ref().unwrap(),
2916 &EvalCtx::new(col_map, &new_row),
2917 )?;
2918 let pos = col.position as usize;
2919 new_row[pos] = if val.is_null() {
2920 if !col.nullable {
2921 return Err(SqlError::NotNullViolation(col.name.clone()));
2922 }
2923 Value::Null
2924 } else {
2925 let got = val.data_type();
2926 val.coerce_into(col.data_type)
2927 .ok_or_else(|| SqlError::TypeMismatch {
2928 expected: col.data_type.to_string(),
2929 got: got.to_string(),
2930 })?
2931 };
2932 }
2933 }
2934
2935 let pk_indices = table_schema.pk_indices();
2936 let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2937 let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2938
2939 for (assigned_idx, _) in assignments {
2940 let col = &table_schema.columns[*assigned_idx];
2941 if !col.nullable && new_row[col.position as usize].is_null() {
2942 return Err(SqlError::NotNullViolation(col.name.clone()));
2943 }
2944 }
2945 if table_schema.has_checks() {
2946 for col in &table_schema.columns {
2947 if let Some(ref check) = col.check_expr {
2948 let ctx = EvalCtx::new(col_map, &new_row);
2949 let result = eval_expr(check, &ctx)?;
2950 if !is_truthy(&result) && !result.is_null() {
2951 let name = col.check_name.as_deref().unwrap_or(&col.name);
2952 return Err(SqlError::CheckViolation(name.to_string()));
2953 }
2954 }
2955 }
2956 for tc in &table_schema.check_constraints {
2957 let ctx = EvalCtx::new(col_map, &new_row);
2958 let result = eval_expr(&tc.expr, &ctx)?;
2959 if !is_truthy(&result) && !result.is_null() {
2960 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2961 return Err(SqlError::CheckViolation(name.to_string()));
2962 }
2963 }
2964 }
2965 for fk in &table_schema.foreign_keys {
2966 let changed = fk
2967 .columns
2968 .iter()
2969 .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2970 if !changed {
2971 continue;
2972 }
2973 let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2974 if any_null {
2975 continue;
2976 }
2977 let fk_vals: Vec<Value> = fk
2978 .columns
2979 .iter()
2980 .map(|&ci| new_row[ci as usize].clone())
2981 .collect();
2982 let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2983 if fk.deferrable && fk.initially_deferred {
2984 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
2985 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
2986 fk_name: name,
2987 foreign_table: fk.foreign_table.as_bytes().to_vec(),
2988 parent_key: fk_key,
2989 });
2990 continue;
2991 }
2992 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
2993 let found = wtx
2994 .table_get(fk.foreign_table.as_bytes(), &fk_key)
2995 .map_err(SqlError::Storage)?;
2996 if found.is_none() {
2997 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
2998 return Err(SqlError::ForeignKeyViolation(name.to_string()));
2999 }
3000 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
3001 }
3002 }
3003
3004 let has_indices = !table_schema.indices.is_empty();
3005 let old_pk_values: Vec<Value> = if has_indices || pk_changed {
3006 pk_indices.iter().map(|&i| old_row[i].clone()).collect()
3007 } else {
3008 Vec::new()
3009 };
3010 let new_pk_values: Vec<Value> = if has_indices || pk_changed {
3011 pk_indices.iter().map(|&i| new_row[i].clone()).collect()
3012 } else {
3013 Vec::new()
3014 };
3015
3016 let non_pk = table_schema.non_pk_indices();
3017 let enc_pos = table_schema.encoding_positions();
3018 let phys_count = table_schema.physical_non_pk_count();
3019 let dropped = table_schema.dropped_non_pk_slots();
3020 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
3021 for &slot in dropped {
3022 value_values[slot as usize] = Value::Null;
3023 }
3024 for (j, &i) in non_pk.iter().enumerate() {
3025 let col = &table_schema.columns[i];
3026 value_values[enc_pos[j] as usize] = if matches!(
3027 col.generated_kind,
3028 Some(crate::parser::GeneratedKind::Virtual)
3029 ) {
3030 Value::Null
3031 } else {
3032 new_row[i].clone()
3033 };
3034 }
3035 let mut new_value_buf = Vec::with_capacity(256);
3036 crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
3037
3038 if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
3039 return Err(SqlError::RowTooLarge {
3040 size: new_value_buf.len(),
3041 max: citadel_core::MAX_VALUE_SIZE,
3042 });
3043 }
3044
3045 if pk_changed {
3046 let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
3047 let inserted = wtx
3048 .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
3049 .map_err(SqlError::Storage)?;
3050 if !inserted {
3051 return Err(SqlError::DuplicateKey);
3052 }
3053 wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
3054 .map_err(SqlError::Storage)?;
3055 for idx in &table_schema.indices {
3056 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3057 let old_idx_key =
3058 encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3059 wtx.table_delete(&idx_table, &old_idx_key)
3060 .map_err(SqlError::Storage)?;
3061 let new_idx_key =
3062 encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3063 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3064 let is_new = wtx
3065 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3066 .map_err(SqlError::Storage)?;
3067 if idx.unique && !is_new {
3068 let any_null = idx
3069 .column_positions_iter()
3070 .any(|c| new_row[c as usize].is_null());
3071 if !any_null {
3072 return Err(SqlError::UniqueViolation(idx.name.clone()));
3073 }
3074 }
3075 }
3076 } else {
3077 wtx.table_update_sorted(
3078 table_schema.name.as_bytes(),
3079 &[(old_pk_key, new_value_buf.as_slice())],
3080 )
3081 .map_err(SqlError::Storage)?;
3082 let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
3083 for idx in &table_schema.indices {
3084 let cols_changed = index_columns_changed(idx, old_row, &new_row);
3085 let (del, ins) = partial_idx_update_actions(
3086 idx,
3087 old_row,
3088 &new_row,
3089 cols_changed,
3090 false,
3091 col_map_partial,
3092 );
3093 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3094 if del {
3095 let old_idx_key =
3096 encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3097 wtx.table_delete(&idx_table, &old_idx_key)
3098 .map_err(SqlError::Storage)?;
3099 }
3100 if ins {
3101 let new_idx_key =
3102 encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3103 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3104 let is_new = wtx
3105 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3106 .map_err(SqlError::Storage)?;
3107 if idx.unique && !is_new {
3108 let any_null = idx
3109 .column_positions_iter()
3110 .any(|c| new_row[c as usize].is_null());
3111 if !any_null {
3112 return Err(SqlError::UniqueViolation(idx.name.clone()));
3113 }
3114 }
3115 }
3116 }
3117 }
3118
3119 if capture_returning {
3120 Ok(InsertRowOutcome::Updated {
3121 old: old_row.to_vec(),
3122 new: new_row,
3123 })
3124 } else {
3125 Ok(InsertRowOutcome::Inserted)
3126 }
3127}
3128
3129fn detect_fast_paths(
3130 ts: &TableSchema,
3131 assignments: &[(usize, Expr)],
3132) -> Option<Vec<DoUpdateFastPath>> {
3133 let non_pk = ts.non_pk_indices();
3134 let enc_pos = ts.encoding_positions();
3135 let mut out = Vec::with_capacity(assignments.len());
3136 for (col_idx, expr) in assignments {
3137 let col = &ts.columns[*col_idx];
3138 if col.data_type != DataType::Integer {
3139 return None;
3140 }
3141 let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3142 let phys_idx = enc_pos[nonpk_order] as usize;
3143
3144 if let Expr::BinaryOp { left, op, right } = expr {
3145 if !matches!(op, BinOp::Add | BinOp::Sub) {
3146 return None;
3147 }
3148 let reads_target =
3149 matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3150 if !reads_target {
3151 return None;
3152 }
3153 if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3154 let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3155 let _ = col_idx;
3156 out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3157 continue;
3158 }
3159 return None;
3160 }
3161 return None;
3162 }
3163 Some(out)
3164}
3165
3166fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3167 let target = oc
3168 .target
3169 .as_ref()
3170 .map(|t| resolve_conflict_target(t, ts))
3171 .transpose()?;
3172 match &oc.action {
3173 OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3174 OnConflictAction::DoUpdate {
3175 assignments,
3176 where_clause,
3177 } => {
3178 let target = target.ok_or_else(|| {
3179 SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3180 })?;
3181 let compiled_assignments: Vec<(usize, Expr)> = assignments
3182 .iter()
3183 .map(|(name, expr)| {
3184 let col_idx = ts
3185 .column_index(name)
3186 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3187 Ok((col_idx, expr.clone()))
3188 })
3189 .collect::<Result<_>>()?;
3190 let fast_paths = if where_clause.is_none() {
3191 detect_fast_paths(ts, &compiled_assignments)
3192 } else {
3193 None
3194 };
3195 Ok(CompiledOnConflict::DoUpdate {
3196 target,
3197 assignments: compiled_assignments,
3198 where_clause: where_clause.clone(),
3199 fast_paths,
3200 })
3201 }
3202 }
3203}
3204
3205fn exec_insert_trivial_fast(
3207 wtx: &mut WriteTxn<'_>,
3208 table_lower: &str,
3209 cache: &InsertCache,
3210 bufs: &mut InsertBufs,
3211 params: &[Value],
3212) -> Result<ExecutionResult> {
3213 let prog = cache
3214 .trivial_fast_program
3215 .as_ref()
3216 .expect("trivial fast: program");
3217
3218 for &p in &prog.not_null_param_indices {
3219 if params[p as usize].is_null() {
3220 return Err(SqlError::NotNullViolation(format!("param@{p}")));
3221 }
3222 }
3223
3224 match ¶ms[prog.pk_param as usize] {
3225 Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3226 _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3227 }
3228
3229 bufs.value_buf.clear();
3230 bufs.value_buf.extend_from_slice(&prog.template);
3231
3232 for op in &prog.ops {
3233 match op {
3234 WriteOp::ParamI64 { param_idx, off } => match ¶ms[*param_idx as usize] {
3235 Value::Integer(v) => {
3236 let off = *off as usize;
3237 bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3238 }
3239 other => {
3240 return Err(SqlError::TypeMismatch {
3241 expected: "Integer".into(),
3242 got: other.data_type().to_string(),
3243 });
3244 }
3245 },
3246 WriteOp::LiteralI64 { value, off } => {
3247 let off = *off as usize;
3248 bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3249 }
3250 WriteOp::GenAddParamsI64 {
3251 a_param,
3252 b_param,
3253 off,
3254 bitmap_byte_off,
3255 bitmap_bit_mask,
3256 } => match (¶ms[*a_param as usize], ¶ms[*b_param as usize]) {
3257 (Value::Integer(a), Value::Integer(b)) => {
3258 let off = *off as usize;
3259 bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3260 }
3261 _ => {
3262 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3263 }
3264 },
3265 WriteOp::GenMulAddParamI64 {
3266 param_idx,
3267 mul,
3268 add,
3269 off,
3270 bitmap_byte_off,
3271 bitmap_bit_mask,
3272 } => match ¶ms[*param_idx as usize] {
3273 Value::Integer(v) => {
3274 let r = v.wrapping_mul(*mul).wrapping_add(*add);
3275 let off = *off as usize;
3276 bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3277 }
3278 _ => {
3279 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3280 }
3281 },
3282 }
3283 }
3284
3285 let is_new = wtx
3286 .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3287 .map_err(SqlError::Storage)?;
3288 if !is_new {
3289 return Err(SqlError::DuplicateKey);
3290 }
3291 Ok(ExecutionResult::RowsAffected(1))
3292}
3293
3294fn build_bind_plan(
3295 stmt: &InsertStmt,
3296 col_indices: &[usize],
3297 col_data_types: &[DataType],
3298) -> Option<Vec<BindAction>> {
3299 let rows = match &stmt.source {
3300 InsertSource::Values(rows) => rows,
3301 _ => return None,
3302 };
3303 if rows.len() != 1 {
3304 return None;
3305 }
3306 let value_row = &rows[0];
3307 if value_row.len() != col_indices.len() {
3308 return None;
3309 }
3310 let mut plan = Vec::with_capacity(value_row.len());
3311 for (i, expr) in value_row.iter().enumerate() {
3312 let col_idx = col_indices[i];
3313 let target = col_data_types[col_idx];
3314 match expr {
3315 Expr::Parameter(n) => {
3316 if *n == 0 {
3317 return None;
3318 }
3319 plan.push(BindAction::Param {
3320 param_idx: n - 1,
3321 col_idx,
3322 target,
3323 });
3324 }
3325 Expr::Literal(v) => plan.push(BindAction::Literal {
3326 value: v.clone(),
3327 col_idx,
3328 }),
3329 _ => return None,
3330 }
3331 }
3332 Some(plan)
3333}
3334
3335impl CompiledInsert {
3336 pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3337 let lower = stmt.table.to_ascii_lowercase();
3338 let cached = if let Some(ts) = schema.get(&lower) {
3339 let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3340 ts.columns.iter().map(|c| c.name.as_str()).collect()
3341 } else {
3342 stmt.columns.iter().map(|s| s.as_str()).collect()
3343 };
3344 let mut col_indices = Vec::with_capacity(insert_columns.len());
3345 for name in &insert_columns {
3346 col_indices.push(ts.column_index(name)?);
3347 }
3348 if col_indices
3349 .iter()
3350 .any(|&ci| ts.columns[ci].generated_kind.is_some())
3351 {
3352 return None;
3353 }
3354 let on_conflict = stmt
3355 .on_conflict
3356 .as_ref()
3357 .map(|oc| compile_on_conflict(oc, ts))
3358 .transpose()
3359 .ok()
3360 .flatten()
3361 .map(Arc::new);
3362 let generated_col_positions: Vec<usize> = ts
3363 .columns
3364 .iter()
3365 .enumerate()
3366 .filter_map(|(i, c)| {
3367 matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3368 .then_some(i)
3369 })
3370 .collect();
3371 let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3372 .iter()
3373 .map(|&pos| {
3374 detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3375 })
3376 .collect();
3377 let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3378 Some(ColumnMap::new(&ts.columns))
3379 } else {
3380 None
3381 };
3382 let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3383 let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3384 let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3385 let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3386 let phys_count = ts.physical_non_pk_count();
3387 let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3388 let single_int_pk =
3389 pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3390 let not_null_indices: Vec<u16> = ts
3391 .columns
3392 .iter()
3393 .filter(|c| !c.nullable)
3394 .map(|c| c.position)
3395 .collect();
3396 let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3397 let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3398 let row_fully_overwritten = if any_defaults_flag {
3399 false
3400 } else {
3401 let mut covered: rustc_hash::FxHashSet<usize> =
3402 col_indices.iter().copied().collect();
3403 covered.extend(generated_col_positions.iter().copied());
3404 for (j, &i) in non_pk_indices.iter().enumerate() {
3405 let _ = j;
3406 if matches!(
3407 ts.columns[i].generated_kind,
3408 Some(crate::parser::GeneratedKind::Virtual)
3409 ) {
3410 covered.insert(i);
3411 }
3412 }
3413 bind_plan.is_some() && covered.len() == ts.columns.len()
3414 };
3415 let has_fks = !ts.foreign_keys.is_empty();
3416 let has_indices = !ts.indices.is_empty();
3417 let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3418 let mut null_value_slots: Vec<usize> =
3419 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3420 for (j, &i) in non_pk_indices.iter().enumerate() {
3421 let slot = encoding_positions[j] as usize;
3422 if matches!(
3423 ts.columns[i].generated_kind,
3424 Some(crate::parser::GeneratedKind::Virtual)
3425 ) {
3426 null_value_slots.push(slot);
3427 } else {
3428 non_virtual_pairs.push((i, slot));
3429 }
3430 }
3431 let row_encoder = {
3432 let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3433 let col = &ts.columns[i];
3434 if matches!(
3435 col.generated_kind,
3436 Some(crate::parser::GeneratedKind::Virtual)
3437 ) {
3438 true
3439 } else {
3440 col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3441 }
3442 });
3443 if all_int_or_null {
3444 let mut null_slots: Vec<usize> =
3445 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3446 for (j, &i) in non_pk_indices.iter().enumerate() {
3447 if matches!(
3448 ts.columns[i].generated_kind,
3449 Some(crate::parser::GeneratedKind::Virtual)
3450 ) {
3451 null_slots.push(encoding_positions[j] as usize);
3452 }
3453 }
3454 Some(crate::encoding::build_int_row_template(
3455 phys_count,
3456 &null_slots,
3457 ))
3458 } else {
3459 None
3460 }
3461 };
3462 let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3463 && !ts.columns.iter().any(|c| c.default_expr.is_some())
3464 && !ts.has_checks()
3465 && !has_fks
3466 && !has_indices
3467 && stmt.on_conflict.is_none()
3468 && stmt.returning.is_none()
3469 && bind_plan.is_some()
3470 && row_encoder.is_some()
3471 && row_fully_overwritten
3472 && single_int_pk
3473 && generated_fast_evals
3474 .iter()
3475 .all(|fe| !matches!(fe, FastGenEval::None));
3476 let trivial_fast_program = if is_trivial_fast_eligible {
3477 build_trivial_fast_program(
3478 bind_plan.as_ref().unwrap(),
3479 row_encoder.as_ref().unwrap(),
3480 &non_virtual_pairs,
3481 &generated_col_positions,
3482 &generated_fast_evals,
3483 &pk_indices,
3484 &ts.columns,
3485 )
3486 } else {
3487 None
3488 };
3489 let is_trivial_fast = trivial_fast_program.is_some();
3490 let has_checks = ts.has_checks();
3491 let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3492 let needs_scoped_params = bind_plan.is_none()
3493 || has_checks
3494 || any_defaults
3495 || !generated_col_positions.is_empty()
3496 || on_conflict.is_some()
3497 || stmt.returning.is_some()
3498 || insert_has_subquery(stmt)
3499 || super::helpers::any_partial_index(ts);
3500 Some(InsertCache {
3501 col_indices,
3502 has_subquery: insert_has_subquery(stmt),
3503 any_defaults,
3504 has_checks,
3505 on_conflict,
3506 row_col_map,
3507 generated_col_positions,
3508 generated_fast_evals,
3509 pk_indices,
3510 non_pk_indices,
3511 encoding_positions,
3512 dropped_non_pk_slots,
3513 phys_count,
3514 single_int_pk,
3515 not_null_indices,
3516 bind_plan,
3517 row_fully_overwritten,
3518 row_encoder,
3519 is_trivial_fast,
3520 trivial_fast_program,
3521 needs_scoped_params,
3522 })
3523 } else if schema.get_view(&lower).is_some() {
3524 None
3525 } else {
3526 return None;
3527 };
3528 Some(Self {
3529 table_lower: lower,
3530 cached,
3531 })
3532 }
3533}
3534
3535impl CompiledPlan for CompiledInsert {
3536 fn execute(
3537 &self,
3538 db: &Database,
3539 schema: &SchemaManager,
3540 stmt: &Statement,
3541 params: &[Value],
3542 txn: super::compile::ActiveTxnRef<'_, '_>,
3543 ) -> Result<ExecutionResult> {
3544 let ins = match stmt {
3545 Statement::Insert(i) => i,
3546 _ => {
3547 return Err(SqlError::Unsupported(
3548 "CompiledInsert received non-INSERT statement".into(),
3549 ))
3550 }
3551 };
3552 use super::compile::ActiveTxnRef;
3553 match txn {
3554 ActiveTxnRef::None => exec_insert(db, schema, ins, params),
3555 ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3556 "cannot execute mutating statement inside a read-only transaction".into(),
3557 )),
3558 ActiveTxnRef::Write(outer) => match self.cached.as_ref() {
3559 Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3560 exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3561 }),
3562 Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3563 None => exec_insert_in_txn(outer, schema, ins, params),
3564 },
3565 }
3566 }
3567
3568 fn uses_scoped_params(&self) -> bool {
3569 match self.cached.as_ref() {
3570 Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3571 None => true,
3572 }
3573 }
3574}
3575
3576pub struct CompiledDelete {
3577 table_lower: String,
3578}
3579
3580impl CompiledDelete {
3581 pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3582 let lower = stmt.table.to_ascii_lowercase();
3583 schema.get(&lower)?;
3584 Some(Self { table_lower: lower })
3585 }
3586}
3587
3588impl CompiledPlan for CompiledDelete {
3589 fn execute(
3590 &self,
3591 db: &Database,
3592 schema: &SchemaManager,
3593 stmt: &Statement,
3594 _params: &[Value],
3595 txn: super::compile::ActiveTxnRef<'_, '_>,
3596 ) -> Result<ExecutionResult> {
3597 let del = match stmt {
3598 Statement::Delete(d) => d,
3599 _ => {
3600 return Err(SqlError::Unsupported(
3601 "CompiledDelete received non-DELETE statement".into(),
3602 ))
3603 }
3604 };
3605 let _ = &self.table_lower;
3606 use super::compile::ActiveTxnRef;
3607 match txn {
3608 ActiveTxnRef::None => super::write::exec_delete(db, schema, del),
3609 ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3610 "cannot execute mutating statement inside a read-only transaction".into(),
3611 )),
3612 ActiveTxnRef::Write(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3613 }
3614 }
3615}
3616
3617fn exec_instead_of_view_insert_auto(
3618 db: &Database,
3619 schema: &SchemaManager,
3620 view_name: &str,
3621 aliases: &[String],
3622 stmt: &InsertStmt,
3623 params: &[Value],
3624) -> Result<ExecutionResult> {
3625 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3626 let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3627 wtx.commit().map_err(SqlError::Storage)?;
3628 Ok(r)
3629}
3630
3631fn exec_instead_of_view_insert_in_txn(
3632 wtx: &mut WriteTxn<'_>,
3633 schema: &SchemaManager,
3634 view_name: &str,
3635 aliases: &[String],
3636 stmt: &InsertStmt,
3637 params: &[Value],
3638) -> Result<ExecutionResult> {
3639 let resolved_aliases: Vec<String> = if aliases.is_empty() {
3641 derive_view_columns(wtx, schema, view_name)?
3642 } else {
3643 aliases.to_vec()
3644 };
3645 let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3646 let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3647 .iter()
3648 .enumerate()
3649 .map(|(i, name)| (name.to_ascii_lowercase(), i))
3650 .collect();
3651
3652 let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3653 (0..resolved_aliases.len()).collect()
3654 } else {
3655 stmt.columns
3656 .iter()
3657 .map(|c| {
3658 alias_map
3659 .get(&c.to_ascii_lowercase())
3660 .copied()
3661 .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3662 })
3663 .collect::<Result<_>>()?
3664 };
3665
3666 let source_rows: Vec<Vec<Value>> = match &stmt.source {
3667 InsertSource::Values(rows) => {
3668 let mut out = Vec::with_capacity(rows.len());
3669 for row in rows {
3670 if row.len() != target_positions.len() {
3671 return Err(SqlError::InvalidValue(format!(
3672 "expected {} values, got {}",
3673 target_positions.len(),
3674 row.len()
3675 )));
3676 }
3677 let mut vals = Vec::with_capacity(row.len());
3678 for expr in row {
3679 let v = match expr {
3680 Expr::Parameter(n) => params
3681 .get(n - 1)
3682 .cloned()
3683 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3684 Expr::Literal(v) => v.clone(),
3685 other => eval_const_expr(other)?,
3686 };
3687 vals.push(v);
3688 }
3689 out.push(vals);
3690 }
3691 out
3692 }
3693 InsertSource::Select(sq) => {
3694 let empty_ctes = CteContext::default();
3695 let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3696 qr.rows
3697 }
3698 };
3699
3700 let mut count: u64 = 0;
3701 for row in source_rows {
3702 if row.len() != target_positions.len() {
3703 return Err(SqlError::InvalidValue(format!(
3704 "expected {} values, got {}",
3705 target_positions.len(),
3706 row.len()
3707 )));
3708 }
3709 let mut new_row = vec![Value::Null; resolved_aliases.len()];
3710 for (slot, val) in target_positions.iter().zip(row) {
3711 new_row[*slot] = val;
3712 }
3713 super::triggers::fire_row_triggers(
3714 wtx,
3715 schema,
3716 view_name,
3717 crate::parser::TriggerTiming::InsteadOf,
3718 super::triggers::FireEvent::Insert,
3719 None,
3720 Some(new_row),
3721 &view_cols,
3722 )?;
3723 count += 1;
3724 }
3725 Ok(ExecutionResult::RowsAffected(count))
3726}
3727
3728fn derive_view_columns(
3729 wtx: &mut WriteTxn<'_>,
3730 schema: &SchemaManager,
3731 view_name: &str,
3732) -> Result<Vec<String>> {
3733 use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3734 let sel = SelectStmt {
3735 columns: vec![SelectColumn::AllColumns],
3736 from: view_name.to_string(),
3737 from_alias: None,
3738 from_subquery: None,
3739 from_args: None,
3740 from_json_table: None,
3741 joins: vec![],
3742 distinct: false,
3743 where_clause: None,
3744 order_by: vec![],
3745 limit: Some(Expr::Literal(Value::Integer(1))),
3746 offset: None,
3747 group_by: vec![],
3748 having: None,
3749 };
3750 let sq = SelectQuery {
3751 ctes: vec![],
3752 recursive: false,
3753 body: QueryBody::Select(Box::new(sel)),
3754 };
3755 let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3756 match qr {
3757 ExecutionResult::Query(q) => Ok(q.columns),
3758 _ => Ok(Vec::new()),
3759 }
3760}
3761
3762#[cfg(test)]
3763#[path = "dml_tests.rs"]
3764mod tests;