1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::read_txn::ReadTxn;
7use citadel_txn::write_txn::WriteTxn;
8use rustc_hash::FxHashMap;
9
10use crate::encoding::{encode_composite_key_into, encode_row_into};
11use crate::error::{Result, SqlError};
12use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
13use crate::parser::*;
14use crate::types::*;
15
16use crate::schema::SchemaManager;
17
18use super::compile::CompiledPlan;
19use super::helpers::*;
20use super::CteContext;
21
22pub(super) fn exec_insert(
23 db: &Database,
24 schema: &SchemaManager,
25 stmt: &InsertStmt,
26 params: &[Value],
27) -> Result<ExecutionResult> {
28 let empty_ctes = CteContext::default();
29 let materialized;
30 let stmt = if insert_has_subquery(stmt) {
31 materialized = materialize_insert(stmt, &mut |sub| {
32 exec_subquery_read(db, schema, sub, &empty_ctes)
33 })?;
34 &materialized
35 } else {
36 stmt
37 };
38
39 let lower_name = stmt.table.to_ascii_lowercase();
40 if let Some(view_def) = schema.get_view(&lower_name) {
41 if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
42 {
43 let aliases = view_def.column_aliases.clone();
44 return exec_instead_of_view_insert_auto(
45 db,
46 schema,
47 &lower_name,
48 &aliases,
49 stmt,
50 params,
51 );
52 }
53 return Err(SqlError::CannotModifyView(stmt.table.clone()));
54 }
55 if schema.get_matview(&lower_name).is_some() {
56 return Err(SqlError::CannotModifyView(format!(
57 "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
58 stmt.table
59 )));
60 }
61 let table_schema = schema
62 .get(&lower_name)
63 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
64 schema.mark_dml(&table_schema.name);
65
66 let insert_columns = if stmt.columns.is_empty() {
67 table_schema
68 .columns
69 .iter()
70 .map(|c| c.name.clone())
71 .collect::<Vec<_>>()
72 } else {
73 stmt.columns
74 .iter()
75 .map(|c| c.to_ascii_lowercase())
76 .collect()
77 };
78
79 let col_indices: Vec<usize> = insert_columns
80 .iter()
81 .map(|name| {
82 table_schema
83 .column_index(name)
84 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
85 })
86 .collect::<Result<_>>()?;
87
88 for &ci in &col_indices {
89 if table_schema.columns[ci].generated_kind.is_some() {
90 return Err(SqlError::CannotInsertIntoGeneratedColumn(
91 table_schema.columns[ci].name.clone(),
92 ));
93 }
94 }
95
96 let defaults: Vec<(usize, &Expr)> = table_schema
97 .columns
98 .iter()
99 .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
100 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
101 .collect();
102
103 let generated_cols: Vec<(usize, &Expr)> = table_schema
104 .columns
105 .iter()
106 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
107 .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
108 .collect();
109
110 let has_checks = table_schema.has_checks();
111 let strict = table_schema.is_strict();
112 let row_col_map_for_gen = if !generated_cols.is_empty() {
113 Some(ColumnMap::new(&table_schema.columns))
114 } else {
115 None
116 };
117 let check_col_map = if has_checks {
118 Some(ColumnMap::new(&table_schema.columns))
119 } else {
120 None
121 };
122
123 let select_rows = match &stmt.source {
124 InsertSource::Select(sq) => {
125 let insert_ctes =
126 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
127 exec_query_body_read(db, schema, body, ctx)
128 })?;
129 let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
130 Some(qr.rows)
131 }
132 InsertSource::Values(_) => None,
133 };
134
135 let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
136 .on_conflict
137 .as_ref()
138 .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
139 .transpose()?;
140
141 let row_col_map = compiled_conflict
142 .as_ref()
143 .map(|_| ColumnMap::new(&table_schema.columns));
144
145 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
146 super::ann_persist::purge_segment(&mut wtx, &table_schema.name)?;
150 let mut count: u64 = 0;
151 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
152 stmt.returning.as_ref().map(|_| Vec::new());
153
154 let pk_indices = table_schema.pk_indices();
155 let non_pk = table_schema.non_pk_indices();
156 let enc_pos = table_schema.encoding_positions();
157 let phys_count = table_schema.physical_non_pk_count();
158 let mut row = vec![Value::Null; table_schema.columns.len()];
159 let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
160 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
161 let mut key_buf: Vec<u8> = Vec::with_capacity(64);
162 let mut value_buf: Vec<u8> = Vec::with_capacity(256);
163 let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
164
165 let values = match &stmt.source {
166 InsertSource::Values(rows) => Some(rows.as_slice()),
167 InsertSource::Select(_) => None,
168 };
169 let sel_rows = select_rows.as_deref();
170
171 let total = match (values, sel_rows) {
172 (Some(rows), _) => rows.len(),
173 (_, Some(rows)) => rows.len(),
174 _ => 0,
175 };
176
177 if let Some(sel) = sel_rows {
178 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
179 return Err(SqlError::InvalidValue(format!(
180 "INSERT ... SELECT column count mismatch: expected {}, got {}",
181 insert_columns.len(),
182 sel[0].len()
183 )));
184 }
185 }
186
187 let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
188 t.enabled
189 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
190 && t.events
191 .iter()
192 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
193 });
194 let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
195 Vec::with_capacity(total)
196 } else {
197 Vec::new()
198 };
199
200 if has_insert_statement_triggers {
201 super::triggers::fire_statement_triggers(
202 &mut wtx,
203 schema,
204 &table_schema.name,
205 crate::parser::TriggerTiming::Before,
206 super::triggers::FireEvent::Insert,
207 &table_schema.columns,
208 &[],
209 &[],
210 )?;
211 }
212
213 for idx in 0..total {
214 for v in row.iter_mut() {
215 *v = Value::Null;
216 }
217
218 if let Some(value_rows) = values {
219 let value_row = &value_rows[idx];
220 if value_row.len() != insert_columns.len() {
221 return Err(SqlError::InvalidValue(format!(
222 "expected {} values, got {}",
223 insert_columns.len(),
224 value_row.len()
225 )));
226 }
227 for (i, expr) in value_row.iter().enumerate() {
228 let val = if let Expr::Parameter(n) = expr {
229 params
230 .get(n - 1)
231 .cloned()
232 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
233 } else {
234 eval_const_expr(expr)?
235 };
236 let col_idx = col_indices[i];
237 let col = &table_schema.columns[col_idx];
238 row[col_idx] = if val.is_null() {
239 Value::Null
240 } else {
241 coerce_for_column(val, col, strict)?
242 };
243 }
244 } else if let Some(sel) = sel_rows {
245 let sel_row = &sel[idx];
246 for (i, val) in sel_row.iter().enumerate() {
247 let col_idx = col_indices[i];
248 let col = &table_schema.columns[col_idx];
249 row[col_idx] = if val.is_null() {
250 Value::Null
251 } else {
252 coerce_for_column(val.clone(), col, strict)?
253 };
254 }
255 }
256
257 for &(pos, def_expr) in &defaults {
258 let val = eval_const_expr(def_expr)?;
259 let col = &table_schema.columns[pos];
260 if !val.is_null() {
261 row[pos] = coerce_for_column(val, col, strict)?;
262 }
263 }
264
265 if let Some(ref gen_map) = row_col_map_for_gen {
266 for &(pos, gen_expr) in &generated_cols {
267 let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
268 let col = &table_schema.columns[pos];
269 row[pos] = if val.is_null() {
270 Value::Null
271 } else {
272 coerce_for_column(val, col, strict)?
273 };
274 }
275 }
276
277 for col in &table_schema.columns {
278 if !col.nullable && row[col.position as usize].is_null() {
279 return Err(SqlError::NotNullViolation(col.name.clone()));
280 }
281 }
282
283 if let Some(ref col_map) = check_col_map {
284 for col in &table_schema.columns {
285 if let Some(ref check) = col.check_expr {
286 let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
287 if !is_truthy(&result) && !result.is_null() {
288 let name = col.check_name.as_deref().unwrap_or(&col.name);
289 return Err(SqlError::CheckViolation(name.to_string()));
290 }
291 }
292 }
293 for tc in &table_schema.check_constraints {
294 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
295 if !is_truthy(&result) && !result.is_null() {
296 let name = tc.name.as_deref().unwrap_or(&tc.sql);
297 return Err(SqlError::CheckViolation(name.to_string()));
298 }
299 }
300 }
301
302 for fk in &table_schema.foreign_keys {
303 let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
304 if any_null {
305 continue; }
307 let fk_vals: Vec<Value> = fk
308 .columns
309 .iter()
310 .map(|&ci| row[ci as usize].clone())
311 .collect();
312 fk_key_buf.clear();
313 encode_composite_key_into(&fk_vals, &mut fk_key_buf);
314 if fk.deferrable && fk.initially_deferred {
315 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
316 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
317 fk_name: name,
318 foreign_table: fk.foreign_table.as_bytes().to_vec(),
319 parent_key: fk_key_buf.clone(),
320 });
321 continue;
322 }
323 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
324 let found = wtx
325 .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
326 .map_err(SqlError::Storage)?;
327 if found.is_none() {
328 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
329 return Err(SqlError::ForeignKeyViolation(name.to_string()));
330 }
331 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
332 }
333 }
334
335 let proposed_row_for_returning: Option<Vec<Value>> =
336 returning_rows.as_ref().map(|_| row.clone());
337 let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
338 Some(row.clone())
339 } else {
340 None
341 };
342
343 let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
344 t.enabled
345 && t.timing == crate::parser::TriggerTiming::Before
346 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
347 && t.events
348 .iter()
349 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
350 });
351 if has_before_insert_triggers {
352 super::triggers::fire_row_triggers(
353 &mut wtx,
354 schema,
355 &table_schema.name,
356 crate::parser::TriggerTiming::Before,
357 super::triggers::FireEvent::Insert,
358 None,
359 Some(row.clone()),
360 &table_schema.columns,
361 )?;
362 }
363
364 for (j, &i) in pk_indices.iter().enumerate() {
365 pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
366 }
367 encode_composite_key_into(&pk_values, &mut key_buf);
368
369 for (j, &i) in non_pk.iter().enumerate() {
370 let col = &table_schema.columns[i];
371 if matches!(
372 col.generated_kind,
373 Some(crate::parser::GeneratedKind::Virtual)
374 ) {
375 value_values[enc_pos[j] as usize] = Value::Null;
376 row[i] = Value::Null;
377 } else {
378 value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
379 }
380 }
381 encode_row_into(&value_values, &mut value_buf);
382
383 if key_buf.len() > citadel_core::MAX_KEY_SIZE {
384 return Err(SqlError::KeyTooLarge {
385 size: key_buf.len(),
386 max: citadel_core::MAX_KEY_SIZE,
387 });
388 }
389 if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
390 return Err(SqlError::RowTooLarge {
391 size: value_buf.len(),
392 max: citadel_core::MAX_VALUE_SIZE,
393 });
394 }
395
396 match compiled_conflict.as_ref() {
397 None => {
398 let is_new = wtx
399 .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
400 .map_err(SqlError::Storage)?;
401 if !is_new {
402 return Err(SqlError::DuplicateKey);
403 }
404 let has_after_insert_triggers =
405 schema.triggers_for(&table_schema.name).iter().any(|t| {
406 t.enabled
407 && t.timing == crate::parser::TriggerTiming::After
408 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
409 && t.events
410 .iter()
411 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
412 });
413 if !table_schema.indices.is_empty() || has_after_insert_triggers {
414 for (j, &i) in pk_indices.iter().enumerate() {
415 row[i] = pk_values[j].clone();
416 }
417 for (j, &i) in non_pk.iter().enumerate() {
418 row[i] =
419 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
420 }
421 if !table_schema.indices.is_empty() {
422 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
423 }
424 if has_after_insert_triggers {
425 super::triggers::fire_row_triggers(
426 &mut wtx,
427 schema,
428 &table_schema.name,
429 crate::parser::TriggerTiming::After,
430 super::triggers::FireEvent::Insert,
431 None,
432 Some(row.clone()),
433 &table_schema.columns,
434 )?;
435 }
436 }
437 if let Some(r) = row_for_stmt_trigger.clone() {
438 stmt_new_rows.push(r);
439 }
440 count += 1;
441 if let Some(buf) = returning_rows.as_mut() {
442 buf.push((None, proposed_row_for_returning));
443 }
444 }
445 Some(oc) => {
446 let oc_ref: &CompiledOnConflict = oc;
447 let needs_row = upsert_needs_row(oc_ref, table_schema);
448 if needs_row {
449 for (j, &i) in pk_indices.iter().enumerate() {
450 row[i] = pk_values[j].clone();
451 }
452 for (j, &i) in non_pk.iter().enumerate() {
453 row[i] =
454 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
455 }
456 }
457 let outcome = apply_insert_with_conflict(
458 &mut wtx,
459 table_schema,
460 &key_buf,
461 &value_buf,
462 &row,
463 &pk_values,
464 oc_ref,
465 row_col_map.as_ref().unwrap(),
466 stmt.returning.is_some(),
467 )?;
468 match outcome {
469 InsertRowOutcome::Inserted => {
470 count += 1;
471 if let Some(buf) = returning_rows.as_mut() {
472 buf.push((None, proposed_row_for_returning));
473 }
474 if let Some(r) = row_for_stmt_trigger.clone() {
475 stmt_new_rows.push(r);
476 }
477 let has_after_insert_triggers =
478 schema.triggers_for(&table_schema.name).iter().any(|t| {
479 t.enabled
480 && t.timing == crate::parser::TriggerTiming::After
481 && t.granularity
482 == crate::parser::TriggerGranularity::ForEachRow
483 && t.events
484 .iter()
485 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
486 });
487 if has_after_insert_triggers {
488 super::triggers::fire_row_triggers(
489 &mut wtx,
490 schema,
491 &table_schema.name,
492 crate::parser::TriggerTiming::After,
493 super::triggers::FireEvent::Insert,
494 None,
495 Some(row.clone()),
496 &table_schema.columns,
497 )?;
498 }
499 }
500 InsertRowOutcome::Updated { old, new } => {
501 count += 1;
502 if let Some(buf) = returning_rows.as_mut() {
503 buf.push((Some(old.clone()), Some(new.clone())));
504 }
505 let has_after_update_triggers =
506 schema.triggers_for(&table_schema.name).iter().any(|t| {
507 t.enabled
508 && t.timing == crate::parser::TriggerTiming::After
509 && t.granularity
510 == crate::parser::TriggerGranularity::ForEachRow
511 && t.events.iter().any(|e| {
512 matches!(e, crate::parser::TriggerEvent::Update(_))
513 })
514 });
515 if has_after_update_triggers {
516 let changed_cols: Vec<String> = match oc_ref {
517 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
518 .iter()
519 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
520 .collect(),
521 _ => Vec::new(),
522 };
523 super::triggers::fire_row_triggers(
524 &mut wtx,
525 schema,
526 &table_schema.name,
527 crate::parser::TriggerTiming::After,
528 super::triggers::FireEvent::Update {
529 changed_columns: &changed_cols,
530 },
531 Some(old),
532 Some(new),
533 &table_schema.columns,
534 )?;
535 }
536 }
537 InsertRowOutcome::Skipped => {}
538 }
539 }
540 }
541 }
542
543 if has_insert_statement_triggers {
544 super::triggers::fire_statement_triggers(
545 &mut wtx,
546 schema,
547 &table_schema.name,
548 crate::parser::TriggerTiming::After,
549 super::triggers::FireEvent::Insert,
550 &table_schema.columns,
551 &[],
552 &stmt_new_rows,
553 )?;
554 }
555
556 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
557 let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
558 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
559 wtx.commit().map_err(SqlError::Storage)?;
560 return Ok(ExecutionResult::Query(qr));
561 }
562
563 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
564 wtx.commit().map_err(SqlError::Storage)?;
565 Ok(ExecutionResult::RowsAffected(count))
566}
567
568pub(super) fn has_subquery(expr: &Expr) -> bool {
569 crate::parser::has_subquery(expr)
570}
571
572pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
573 if let Some(ref w) = stmt.where_clause {
574 if has_subquery(w) {
575 return true;
576 }
577 }
578 if let Some(ref h) = stmt.having {
579 if has_subquery(h) {
580 return true;
581 }
582 }
583 for col in &stmt.columns {
584 if let SelectColumn::Expr { expr, .. } = col {
585 if has_subquery(expr) {
586 return true;
587 }
588 }
589 }
590 for ob in &stmt.order_by {
591 if has_subquery(&ob.expr) {
592 return true;
593 }
594 }
595 for join in &stmt.joins {
596 if let Some(ref on_expr) = join.on_clause {
597 if has_subquery(on_expr) {
598 return true;
599 }
600 }
601 }
602 false
603}
604
605pub(super) fn materialize_expr(
606 expr: &Expr,
607 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
608) -> Result<Expr> {
609 match expr {
610 Expr::InSubquery {
611 expr: e,
612 subquery,
613 negated,
614 } => {
615 let inner = materialize_expr(e, exec_sub)?;
616 let qr = exec_sub(subquery)?;
617 if !qr.columns.is_empty() && qr.columns.len() != 1 {
618 return Err(SqlError::SubqueryMultipleColumns);
619 }
620 let mut values = rustc_hash::FxHashSet::default();
621 let mut has_null = false;
622 for row in &qr.rows {
623 if row[0].is_null() {
624 has_null = true;
625 } else {
626 values.insert(row[0].clone());
627 }
628 }
629 Ok(Expr::InSet {
630 expr: Box::new(inner),
631 values,
632 has_null,
633 negated: *negated,
634 })
635 }
636 Expr::ScalarSubquery(subquery) => {
637 let qr = exec_sub(subquery)?;
638 if qr.rows.len() > 1 {
639 return Err(SqlError::SubqueryMultipleRows);
640 }
641 let val = if qr.rows.is_empty() {
642 Value::Null
643 } else {
644 qr.rows[0][0].clone()
645 };
646 Ok(Expr::Literal(val))
647 }
648 Expr::Exists { subquery, negated } => {
649 let qr = exec_sub(subquery)?;
650 let exists = !qr.rows.is_empty();
651 let result = if *negated { !exists } else { exists };
652 Ok(Expr::Literal(Value::Boolean(result)))
653 }
654 Expr::InList {
655 expr: e,
656 list,
657 negated,
658 } => {
659 let inner = materialize_expr(e, exec_sub)?;
660 let items = list
661 .iter()
662 .map(|item| materialize_expr(item, exec_sub))
663 .collect::<Result<Vec<_>>>()?;
664 Ok(Expr::InList {
665 expr: Box::new(inner),
666 list: items,
667 negated: *negated,
668 })
669 }
670 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
671 left: Box::new(materialize_expr(left, exec_sub)?),
672 op: *op,
673 right: Box::new(materialize_expr(right, exec_sub)?),
674 }),
675 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
676 op: *op,
677 expr: Box::new(materialize_expr(e, exec_sub)?),
678 }),
679 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
680 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
681 Expr::InSet {
682 expr: e,
683 values,
684 has_null,
685 negated,
686 } => Ok(Expr::InSet {
687 expr: Box::new(materialize_expr(e, exec_sub)?),
688 values: values.clone(),
689 has_null: *has_null,
690 negated: *negated,
691 }),
692 Expr::Between {
693 expr: e,
694 low,
695 high,
696 negated,
697 } => Ok(Expr::Between {
698 expr: Box::new(materialize_expr(e, exec_sub)?),
699 low: Box::new(materialize_expr(low, exec_sub)?),
700 high: Box::new(materialize_expr(high, exec_sub)?),
701 negated: *negated,
702 }),
703 Expr::Like {
704 expr: e,
705 pattern,
706 escape,
707 negated,
708 } => {
709 let esc = escape
710 .as_ref()
711 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
712 .transpose()?;
713 Ok(Expr::Like {
714 expr: Box::new(materialize_expr(e, exec_sub)?),
715 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
716 escape: esc,
717 negated: *negated,
718 })
719 }
720 Expr::Case {
721 operand,
722 conditions,
723 else_result,
724 } => {
725 let op = operand
726 .as_ref()
727 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
728 .transpose()?;
729 let conds = conditions
730 .iter()
731 .map(|(c, r)| {
732 Ok((
733 materialize_expr(c, exec_sub)?,
734 materialize_expr(r, exec_sub)?,
735 ))
736 })
737 .collect::<Result<Vec<_>>>()?;
738 let else_r = else_result
739 .as_ref()
740 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
741 .transpose()?;
742 Ok(Expr::Case {
743 operand: op,
744 conditions: conds,
745 else_result: else_r,
746 })
747 }
748 Expr::Coalesce(args) => {
749 let materialized = args
750 .iter()
751 .map(|a| materialize_expr(a, exec_sub))
752 .collect::<Result<Vec<_>>>()?;
753 Ok(Expr::Coalesce(materialized))
754 }
755 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
756 expr: Box::new(materialize_expr(e, exec_sub)?),
757 data_type: *data_type,
758 }),
759 Expr::Function {
760 name,
761 args,
762 distinct,
763 } => {
764 let materialized = args
765 .iter()
766 .map(|a| materialize_expr(a, exec_sub))
767 .collect::<Result<Vec<_>>>()?;
768 Ok(Expr::Function {
769 name: name.clone(),
770 args: materialized,
771 distinct: *distinct,
772 })
773 }
774 other => Ok(other.clone()),
775 }
776}
777
778pub(super) fn materialize_stmt(
779 stmt: &SelectStmt,
780 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
781) -> Result<SelectStmt> {
782 let where_clause = stmt
783 .where_clause
784 .as_ref()
785 .map(|e| materialize_expr(e, exec_sub))
786 .transpose()?;
787 let having = stmt
788 .having
789 .as_ref()
790 .map(|e| materialize_expr(e, exec_sub))
791 .transpose()?;
792 let columns = stmt
793 .columns
794 .iter()
795 .map(|c| match c {
796 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
797 SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
798 SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
799 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
800 expr: materialize_expr(expr, exec_sub)?,
801 alias: alias.clone(),
802 }),
803 })
804 .collect::<Result<Vec<_>>>()?;
805 let order_by = stmt
806 .order_by
807 .iter()
808 .map(|ob| {
809 Ok(OrderByItem {
810 expr: materialize_expr(&ob.expr, exec_sub)?,
811 descending: ob.descending,
812 nulls_first: ob.nulls_first,
813 })
814 })
815 .collect::<Result<Vec<_>>>()?;
816 let joins = stmt
817 .joins
818 .iter()
819 .map(|j| {
820 let on_clause = j
821 .on_clause
822 .as_ref()
823 .map(|e| materialize_expr(e, exec_sub))
824 .transpose()?;
825 Ok(JoinClause {
826 join_type: j.join_type,
827 table: j.table.clone(),
828 subquery: j.subquery.clone(),
829 on_clause,
830 })
831 })
832 .collect::<Result<Vec<_>>>()?;
833 let group_by = stmt
834 .group_by
835 .iter()
836 .map(|e| materialize_expr(e, exec_sub))
837 .collect::<Result<Vec<_>>>()?;
838 Ok(SelectStmt {
839 columns,
840 from: stmt.from.clone(),
841 from_alias: stmt.from_alias.clone(),
842 from_subquery: stmt.from_subquery.clone(),
843 from_args: stmt.from_args.clone(),
844 from_json_table: stmt.from_json_table.clone(),
845 joins,
846 distinct: stmt.distinct,
847 where_clause,
848 order_by,
849 limit: stmt.limit.clone(),
850 offset: stmt.offset.clone(),
851 group_by,
852 having,
853 })
854}
855
856pub(super) fn exec_subquery_read(
857 db: &Database,
858 schema: &SchemaManager,
859 stmt: &SelectStmt,
860 ctes: &CteContext,
861) -> Result<QueryResult> {
862 let mut rtx = db.begin_read();
863 exec_subquery_with_read(&mut rtx, schema, stmt, ctes)
864}
865
866pub(super) fn exec_subquery_with_read(
867 rtx: &mut ReadTxn<'_>,
868 schema: &SchemaManager,
869 stmt: &SelectStmt,
870 ctes: &CteContext,
871) -> Result<QueryResult> {
872 match super::exec_select_with_read(rtx, schema, stmt, ctes)? {
873 ExecutionResult::Query(qr) => Ok(qr),
874 _ => Ok(QueryResult {
875 columns: vec![],
876 rows: vec![],
877 }),
878 }
879}
880
881pub(super) fn exec_subquery_write(
882 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
883 schema: &SchemaManager,
884 stmt: &SelectStmt,
885 ctes: &CteContext,
886) -> Result<QueryResult> {
887 match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
888 ExecutionResult::Query(qr) => Ok(qr),
889 _ => Ok(QueryResult {
890 columns: vec![],
891 rows: vec![],
892 }),
893 }
894}
895
896pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
897 stmt.where_clause.as_ref().is_some_and(has_subquery)
898 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
899}
900
901pub(super) fn materialize_update(
902 stmt: &UpdateStmt,
903 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
904) -> Result<UpdateStmt> {
905 let where_clause = stmt
906 .where_clause
907 .as_ref()
908 .map(|e| materialize_expr(e, exec_sub))
909 .transpose()?;
910 let assignments = stmt
911 .assignments
912 .iter()
913 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
914 .collect::<Result<Vec<_>>>()?;
915 Ok(UpdateStmt {
916 table: stmt.table.clone(),
917 assignments,
918 where_clause,
919 returning: stmt.returning.clone(),
920 })
921}
922
923pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
924 stmt.where_clause.as_ref().is_some_and(has_subquery)
925}
926
927pub(super) fn materialize_delete(
928 stmt: &DeleteStmt,
929 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
930) -> Result<DeleteStmt> {
931 let where_clause = stmt
932 .where_clause
933 .as_ref()
934 .map(|e| materialize_expr(e, exec_sub))
935 .transpose()?;
936 Ok(DeleteStmt {
937 table: stmt.table.clone(),
938 where_clause,
939 returning: stmt.returning.clone(),
940 })
941}
942
943pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
944 match &stmt.source {
945 InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
946 InsertSource::Select(_) => false,
948 }
949}
950
951pub(super) fn materialize_insert(
952 stmt: &InsertStmt,
953 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
954) -> Result<InsertStmt> {
955 let source = match &stmt.source {
956 InsertSource::Values(rows) => {
957 let mat = rows
958 .iter()
959 .map(|row| {
960 row.iter()
961 .map(|e| materialize_expr(e, exec_sub))
962 .collect::<Result<Vec<_>>>()
963 })
964 .collect::<Result<Vec<_>>>()?;
965 InsertSource::Values(mat)
966 }
967 InsertSource::Select(sq) => {
968 let ctes = sq
969 .ctes
970 .iter()
971 .map(|c| {
972 Ok(CteDefinition {
973 name: c.name.clone(),
974 column_aliases: c.column_aliases.clone(),
975 body: materialize_query_body(&c.body, exec_sub)?,
976 })
977 })
978 .collect::<Result<Vec<_>>>()?;
979 let body = materialize_query_body(&sq.body, exec_sub)?;
980 InsertSource::Select(Box::new(SelectQuery {
981 ctes,
982 recursive: sq.recursive,
983 body,
984 }))
985 }
986 };
987 Ok(InsertStmt {
988 table: stmt.table.clone(),
989 columns: stmt.columns.clone(),
990 source,
991 on_conflict: stmt.on_conflict.clone(),
992 returning: stmt.returning.clone(),
993 })
994}
995
996pub(super) fn materialize_query_body(
997 body: &QueryBody,
998 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
999) -> Result<QueryBody> {
1000 match body {
1001 QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
1002 sel, exec_sub,
1003 )?))),
1004 QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
1005 op: comp.op.clone(),
1006 all: comp.all,
1007 left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
1008 right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
1009 order_by: comp.order_by.clone(),
1010 limit: comp.limit.clone(),
1011 offset: comp.offset.clone(),
1012 }))),
1013 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
1014 }
1015}
1016
1017pub(super) fn exec_query_body_with_read(
1018 rtx: &mut ReadTxn<'_>,
1019 schema: &SchemaManager,
1020 body: &QueryBody,
1021 ctes: &CteContext,
1022) -> Result<ExecutionResult> {
1023 match body {
1024 QueryBody::Select(sel) => super::exec_select_with_read(rtx, schema, sel, ctes),
1025 QueryBody::Compound(comp) => exec_compound_select_with_read(rtx, schema, comp, ctes),
1026 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1027 SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1028 ),
1029 }
1030}
1031
1032pub(super) fn exec_query_body_in_txn(
1033 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1034 schema: &SchemaManager,
1035 body: &QueryBody,
1036 ctes: &CteContext,
1037) -> Result<ExecutionResult> {
1038 match body {
1039 QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1040 QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1041 QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1042 QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1043 QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1044 }
1045}
1046
1047pub(super) fn exec_query_body_read(
1048 db: &Database,
1049 schema: &SchemaManager,
1050 body: &QueryBody,
1051 ctes: &CteContext,
1052) -> Result<QueryResult> {
1053 let mut rtx = db.begin_read();
1054 exec_query_body_with_read_qr(&mut rtx, schema, body, ctes)
1055}
1056
1057pub(super) fn exec_query_body_with_read_qr(
1058 rtx: &mut ReadTxn<'_>,
1059 schema: &SchemaManager,
1060 body: &QueryBody,
1061 ctes: &CteContext,
1062) -> Result<QueryResult> {
1063 match exec_query_body_with_read(rtx, schema, body, ctes)? {
1064 ExecutionResult::Query(qr) => Ok(qr),
1065 _ => Ok(QueryResult {
1066 columns: vec![],
1067 rows: vec![],
1068 }),
1069 }
1070}
1071
1072pub(super) fn exec_query_body_write(
1073 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1074 schema: &SchemaManager,
1075 body: &QueryBody,
1076 ctes: &CteContext,
1077) -> Result<QueryResult> {
1078 match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1079 ExecutionResult::Query(qr) => Ok(qr),
1080 _ => Ok(QueryResult {
1081 columns: vec![],
1082 rows: vec![],
1083 }),
1084 }
1085}
1086
1087pub(super) fn exec_compound_select_with_read(
1088 rtx: &mut ReadTxn<'_>,
1089 schema: &SchemaManager,
1090 comp: &CompoundSelect,
1091 ctes: &CteContext,
1092) -> Result<ExecutionResult> {
1093 let left_qr = match exec_query_body_with_read(rtx, schema, &comp.left, ctes)? {
1094 ExecutionResult::Query(qr) => qr,
1095 _ => QueryResult {
1096 columns: vec![],
1097 rows: vec![],
1098 },
1099 };
1100 let right_qr = match exec_query_body_with_read(rtx, schema, &comp.right, ctes)? {
1101 ExecutionResult::Query(qr) => qr,
1102 _ => QueryResult {
1103 columns: vec![],
1104 rows: vec![],
1105 },
1106 };
1107 apply_set_operation(comp, left_qr, right_qr)
1108}
1109
1110pub(super) fn exec_compound_select_in_txn(
1111 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1112 schema: &SchemaManager,
1113 comp: &CompoundSelect,
1114 ctes: &CteContext,
1115) -> Result<ExecutionResult> {
1116 let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1117 ExecutionResult::Query(qr) => qr,
1118 _ => QueryResult {
1119 columns: vec![],
1120 rows: vec![],
1121 },
1122 };
1123 let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1124 ExecutionResult::Query(qr) => qr,
1125 _ => QueryResult {
1126 columns: vec![],
1127 rows: vec![],
1128 },
1129 };
1130 apply_set_operation(comp, left_qr, right_qr)
1131}
1132
1133pub(super) fn apply_set_operation(
1134 comp: &CompoundSelect,
1135 left_qr: QueryResult,
1136 right_qr: QueryResult,
1137) -> Result<ExecutionResult> {
1138 if !left_qr.columns.is_empty()
1139 && !right_qr.columns.is_empty()
1140 && left_qr.columns.len() != right_qr.columns.len()
1141 {
1142 return Err(SqlError::CompoundColumnCountMismatch {
1143 left: left_qr.columns.len(),
1144 right: right_qr.columns.len(),
1145 });
1146 }
1147
1148 let columns = left_qr.columns;
1149
1150 let mut rows = match (&comp.op, comp.all) {
1151 (SetOp::Union, true) => {
1152 let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1153 let mut rows = Vec::with_capacity(total);
1154 rows.extend(left_qr.rows);
1155 rows.extend(right_qr.rows);
1156 rows
1157 }
1158 (SetOp::Union, false) => {
1159 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1160 let mut rows = Vec::new();
1161 for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1162 if !seen.contains(&row) {
1163 seen.insert(row.clone());
1164 rows.push(row);
1165 }
1166 }
1167 rows
1168 }
1169 (SetOp::Intersect, true) => {
1170 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1171 for row in &right_qr.rows {
1172 *right_counts.entry(row.clone()).or_insert(0) += 1;
1173 }
1174 let mut rows = Vec::new();
1175 for row in left_qr.rows {
1176 if let Some(count) = right_counts.get_mut(&row) {
1177 if *count > 0 {
1178 *count -= 1;
1179 rows.push(row);
1180 }
1181 }
1182 }
1183 rows
1184 }
1185 (SetOp::Intersect, false) => {
1186 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1187 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1188 let mut rows = Vec::new();
1189 for row in left_qr.rows {
1190 if right_set.contains(&row) && !seen.contains(&row) {
1191 seen.insert(row.clone());
1192 rows.push(row);
1193 }
1194 }
1195 rows
1196 }
1197 (SetOp::Except, true) => {
1198 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1199 for row in &right_qr.rows {
1200 *right_counts.entry(row.clone()).or_insert(0) += 1;
1201 }
1202 let mut rows = Vec::new();
1203 for row in left_qr.rows {
1204 if let Some(count) = right_counts.get_mut(&row) {
1205 if *count > 0 {
1206 *count -= 1;
1207 continue;
1208 }
1209 }
1210 rows.push(row);
1211 }
1212 rows
1213 }
1214 (SetOp::Except, false) => {
1215 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1216 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1217 let mut rows = Vec::new();
1218 for row in left_qr.rows {
1219 if !right_set.contains(&row) && !seen.contains(&row) {
1220 seen.insert(row.clone());
1221 rows.push(row);
1222 }
1223 }
1224 rows
1225 }
1226 };
1227
1228 if !comp.order_by.is_empty() {
1229 let col_defs: Vec<crate::types::ColumnDef> = columns
1230 .iter()
1231 .enumerate()
1232 .map(|(i, name)| crate::types::ColumnDef {
1233 name: name.clone(),
1234 data_type: crate::types::DataType::Null,
1235 nullable: true,
1236 position: i as u16,
1237 default_expr: None,
1238 default_sql: None,
1239 check_expr: None,
1240 check_sql: None,
1241 check_name: None,
1242 is_with_timezone: false,
1243 generated_expr: None,
1244 generated_sql: None,
1245 generated_kind: None,
1246 collation: crate::types::Collation::Binary,
1247 })
1248 .collect();
1249 sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1250 }
1251
1252 if let Some(ref offset_expr) = comp.offset {
1253 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1254 if offset < rows.len() {
1255 rows = rows.split_off(offset);
1256 } else {
1257 rows.clear();
1258 }
1259 }
1260
1261 if let Some(ref limit_expr) = comp.limit {
1262 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1263 rows.truncate(limit);
1264 }
1265
1266 Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1267}
1268
1269struct InsertBufs {
1270 row: Vec<Value>,
1271 pk_values: Vec<Value>,
1272 value_values: Vec<Value>,
1273 key_buf: Vec<u8>,
1274 value_buf: Vec<u8>,
1275 col_indices: Vec<usize>,
1276 fk_key_buf: Vec<u8>,
1277}
1278
1279impl InsertBufs {
1280 fn new() -> Self {
1281 Self {
1282 row: Vec::new(),
1283 pk_values: Vec::new(),
1284 value_values: Vec::new(),
1285 key_buf: Vec::with_capacity(64),
1286 value_buf: Vec::with_capacity(256),
1287 col_indices: Vec::new(),
1288 fk_key_buf: Vec::with_capacity(64),
1289 }
1290 }
1291}
1292
1293thread_local! {
1294 static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1295 static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1296}
1297
1298fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1299 INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1300 Ok(mut borrowed) => f(&mut borrowed),
1301 Err(_) => {
1302 let mut local = InsertBufs::new();
1303 f(&mut local)
1304 }
1305 })
1306}
1307
1308pub(super) struct UpsertBufs {
1309 old_row: Vec<Value>,
1310 new_row: Vec<Value>,
1311 value_values: Vec<Value>,
1312 new_value_buf: Vec<u8>,
1313}
1314
1315impl UpsertBufs {
1316 pub(super) fn new() -> Self {
1317 Self {
1318 old_row: Vec::new(),
1319 new_row: Vec::new(),
1320 value_values: Vec::new(),
1321 new_value_buf: Vec::with_capacity(256),
1322 }
1323 }
1324}
1325
1326pub fn exec_insert_in_txn(
1327 wtx: &mut WriteTxn<'_>,
1328 schema: &SchemaManager,
1329 stmt: &InsertStmt,
1330 params: &[Value],
1331) -> Result<ExecutionResult> {
1332 with_insert_scratch(|bufs| {
1333 exec_insert_in_txn_impl(
1334 wtx,
1335 schema,
1336 stmt,
1337 params,
1338 bufs,
1339 None,
1340 &CteContext::default(),
1341 )
1342 })
1343}
1344
1345pub(super) fn exec_insert_in_txn_with_ctes(
1346 wtx: &mut WriteTxn<'_>,
1347 schema: &SchemaManager,
1348 stmt: &InsertStmt,
1349 params: &[Value],
1350 outer_ctes: &CteContext,
1351) -> Result<ExecutionResult> {
1352 with_insert_scratch(|bufs| {
1353 exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1354 })
1355}
1356
1357fn exec_insert_in_txn_cached(
1358 wtx: &mut WriteTxn<'_>,
1359 schema: &SchemaManager,
1360 stmt: &InsertStmt,
1361 params: &[Value],
1362 cache: &InsertCache,
1363) -> Result<ExecutionResult> {
1364 with_insert_scratch(|bufs| {
1365 exec_insert_in_txn_impl(
1366 wtx,
1367 schema,
1368 stmt,
1369 params,
1370 bufs,
1371 Some(cache),
1372 &CteContext::default(),
1373 )
1374 })
1375}
1376
1377fn exec_insert_in_txn_impl(
1378 wtx: &mut WriteTxn<'_>,
1379 schema: &SchemaManager,
1380 stmt: &InsertStmt,
1381 params: &[Value],
1382 bufs: &mut InsertBufs,
1383 cache: Option<&InsertCache>,
1384 outer_ctes: &CteContext,
1385) -> Result<ExecutionResult> {
1386 let empty_ctes = CteContext::default();
1387 let materialized;
1388 let has_sub = match cache {
1389 Some(c) => c.has_subquery,
1390 None => insert_has_subquery(stmt),
1391 };
1392 let stmt = if has_sub {
1393 materialized = materialize_insert(stmt, &mut |sub| {
1394 exec_subquery_write(wtx, schema, sub, &empty_ctes)
1395 })?;
1396 &materialized
1397 } else {
1398 stmt
1399 };
1400
1401 let view_lookup_key = stmt.table.to_ascii_lowercase();
1402 if let Some(view_def) = schema.get_view(&view_lookup_key) {
1403 if super::triggers::has_instead_of(
1404 schema,
1405 &view_lookup_key,
1406 super::triggers::FireEvent::Insert,
1407 ) {
1408 let aliases = view_def.column_aliases.clone();
1409 return exec_instead_of_view_insert_in_txn(
1410 wtx,
1411 schema,
1412 &view_lookup_key,
1413 &aliases,
1414 stmt,
1415 params,
1416 );
1417 }
1418 return Err(SqlError::CannotModifyView(stmt.table.clone()));
1419 }
1420
1421 let table_schema = schema
1422 .get(&stmt.table)
1423 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1424 schema.mark_dml(&table_schema.name);
1425 super::ann_persist::purge_segment(wtx, &table_schema.name)?;
1426
1427 let default_columns;
1428 let insert_columns: &[String] = if stmt.columns.is_empty() {
1429 default_columns = table_schema
1430 .columns
1431 .iter()
1432 .map(|c| c.name.clone())
1433 .collect::<Vec<_>>();
1434 &default_columns
1435 } else {
1436 &stmt.columns
1437 };
1438
1439 bufs.col_indices.clear();
1440 if let Some(c) = cache {
1441 bufs.col_indices.extend_from_slice(&c.col_indices);
1442 } else {
1443 for name in insert_columns {
1444 bufs.col_indices.push(
1445 table_schema
1446 .column_index(name)
1447 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1448 );
1449 }
1450 }
1451
1452 if cache.is_none() {
1453 for &ci in &bufs.col_indices {
1454 if table_schema.columns[ci].generated_kind.is_some() {
1455 return Err(SqlError::CannotInsertIntoGeneratedColumn(
1456 table_schema.columns[ci].name.clone(),
1457 ));
1458 }
1459 }
1460 }
1461
1462 let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1463 let cached_gen_positions: &[usize];
1464 let cached_gen_fast_evals: &[FastGenEval];
1465 if let Some(c) = cache {
1466 cached_gen_positions = &c.generated_col_positions;
1467 cached_gen_fast_evals = &c.generated_fast_evals;
1468 generated_cols_uncached = Vec::new();
1469 } else {
1470 cached_gen_positions = &[];
1471 cached_gen_fast_evals = &[];
1472 generated_cols_uncached = table_schema
1473 .columns
1474 .iter()
1475 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1476 .map(|c| {
1477 let expr = c.generated_expr.as_ref().unwrap();
1478 let fe = detect_fast_gen_eval(expr, table_schema);
1479 (c.position as usize, expr, fe)
1480 })
1481 .collect();
1482 }
1483 let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1484 let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1485 None
1486 } else {
1487 Some(ColumnMap::new(&table_schema.columns))
1488 };
1489 let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1490 None
1491 } else if let Some(c) = cache {
1492 c.row_col_map.as_ref()
1493 } else {
1494 row_col_map_for_gen_owned.as_ref()
1495 };
1496
1497 let any_defaults = match cache {
1498 Some(c) => c.any_defaults,
1499 None => table_schema
1500 .columns
1501 .iter()
1502 .any(|c| c.default_expr.is_some()),
1503 };
1504 let defaults: Vec<(usize, &Expr)> = if any_defaults {
1505 table_schema
1506 .columns
1507 .iter()
1508 .filter(|c| {
1509 c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1510 })
1511 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1512 .collect()
1513 } else {
1514 Vec::new()
1515 };
1516
1517 let has_checks = match cache {
1518 Some(c) => c.has_checks,
1519 None => table_schema.has_checks(),
1520 };
1521 let check_col_map = if has_checks {
1522 Some(ColumnMap::new(&table_schema.columns))
1523 } else {
1524 None
1525 };
1526
1527 let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1528 &[usize],
1529 &[usize],
1530 &[u16],
1531 usize,
1532 &[u16],
1533 ) = if let Some(c) = cache {
1534 (
1535 &c.pk_indices,
1536 &c.non_pk_indices,
1537 &c.encoding_positions,
1538 c.phys_count,
1539 &c.dropped_non_pk_slots,
1540 )
1541 } else {
1542 (
1543 table_schema.pk_indices(),
1544 table_schema.non_pk_indices(),
1545 table_schema.encoding_positions(),
1546 table_schema.physical_non_pk_count(),
1547 table_schema.dropped_non_pk_slots(),
1548 )
1549 };
1550
1551 bufs.row.resize(table_schema.columns.len(), Value::Null);
1552 bufs.pk_values.resize(pk_indices.len(), Value::Null);
1553 bufs.value_values.resize(phys_count, Value::Null);
1554
1555 let table_bytes = table_schema.name.as_bytes();
1556 let has_fks = !table_schema.foreign_keys.is_empty();
1557 let has_indices = !table_schema.indices.is_empty();
1558 let has_defaults = !defaults.is_empty();
1559
1560 let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1561 (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1562 (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1563 (_, None) => None,
1564 };
1565
1566 let row_col_map_owned: Option<ColumnMap> =
1567 if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1568 Some(ColumnMap::new(&table_schema.columns))
1569 } else {
1570 None
1571 };
1572 let row_col_map: Option<&ColumnMap> = cache
1573 .and_then(|c| c.row_col_map.as_ref())
1574 .or(row_col_map_owned.as_ref());
1575
1576 let select_rows = match &stmt.source {
1577 InsertSource::Select(sq) => {
1578 let insert_ctes = super::materialize_all_ctes_with_outer(
1579 &sq.ctes,
1580 sq.recursive,
1581 outer_ctes,
1582 &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1583 )?;
1584 let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1585 Some(qr.rows)
1586 }
1587 InsertSource::Values(_) => None,
1588 };
1589
1590 let mut count: u64 = 0;
1591 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1592 stmt.returning.as_ref().map(|_| Vec::new());
1593
1594 let values = match &stmt.source {
1595 InsertSource::Values(rows) => Some(rows.as_slice()),
1596 InsertSource::Select(_) => None,
1597 };
1598 let sel_rows = select_rows.as_deref();
1599
1600 let total = match (values, sel_rows) {
1601 (Some(rows), _) => rows.len(),
1602 (_, Some(rows)) => rows.len(),
1603 _ => 0,
1604 };
1605
1606 if let Some(sel) = sel_rows {
1607 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1608 return Err(SqlError::InvalidValue(format!(
1609 "INSERT ... SELECT column count mismatch: expected {}, got {}",
1610 insert_columns.len(),
1611 sel[0].len()
1612 )));
1613 }
1614 }
1615
1616 let has_insert_statement_triggers_impl =
1617 schema.triggers_for(&table_schema.name).iter().any(|t| {
1618 t.enabled
1619 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1620 && t.events
1621 .iter()
1622 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1623 });
1624 let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1625 Vec::with_capacity(total)
1626 } else {
1627 Vec::new()
1628 };
1629 if has_insert_statement_triggers_impl {
1630 super::triggers::fire_statement_triggers(
1631 wtx,
1632 schema,
1633 &table_schema.name,
1634 crate::parser::TriggerTiming::Before,
1635 super::triggers::FireEvent::Insert,
1636 &table_schema.columns,
1637 &[],
1638 &[],
1639 )?;
1640 }
1641
1642 let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1643 for idx in 0..total {
1644 if !skip_row_clear {
1645 for v in bufs.row.iter_mut() {
1646 *v = Value::Null;
1647 }
1648 }
1649
1650 if let Some(value_rows) = values {
1651 if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1652 for action in plan {
1653 match action {
1654 BindAction::Param {
1655 param_idx,
1656 col_idx,
1657 target,
1658 } => {
1659 let v = ¶ms[*param_idx];
1660 bufs.row[*col_idx] = if v.is_null() {
1661 Value::Null
1662 } else if v.data_type() == *target {
1663 v.clone()
1664 } else {
1665 let got = v.data_type();
1666 v.clone().coerce_into(*target).ok_or_else(|| {
1667 SqlError::TypeMismatch {
1668 expected: target.to_string(),
1669 got: got.to_string(),
1670 }
1671 })?
1672 };
1673 }
1674 BindAction::Literal { value, col_idx } => {
1675 bufs.row[*col_idx] = value.clone();
1676 }
1677 }
1678 }
1679 } else {
1680 let value_row = &value_rows[idx];
1681 if value_row.len() != insert_columns.len() {
1682 return Err(SqlError::InvalidValue(format!(
1683 "expected {} values, got {}",
1684 insert_columns.len(),
1685 value_row.len()
1686 )));
1687 }
1688 for (i, expr) in value_row.iter().enumerate() {
1689 let val = match expr {
1690 Expr::Parameter(n) => params
1691 .get(n - 1)
1692 .cloned()
1693 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1694 Expr::Literal(v) => v.clone(),
1695 _ => eval_const_expr(expr)?,
1696 };
1697 let col_idx = bufs.col_indices[i];
1698 let col = &table_schema.columns[col_idx];
1699 let got_type = val.data_type();
1700 bufs.row[col_idx] = if val.is_null() {
1701 Value::Null
1702 } else {
1703 val.coerce_into(col.data_type)
1704 .ok_or_else(|| SqlError::TypeMismatch {
1705 expected: col.data_type.to_string(),
1706 got: got_type.to_string(),
1707 })?
1708 };
1709 }
1710 }
1711 } else if let Some(sel) = sel_rows {
1712 let sel_row = &sel[idx];
1713 for (i, val) in sel_row.iter().enumerate() {
1714 let col_idx = bufs.col_indices[i];
1715 let col = &table_schema.columns[col_idx];
1716 let got_type = val.data_type();
1717 bufs.row[col_idx] = if val.is_null() {
1718 Value::Null
1719 } else {
1720 val.clone().coerce_into(col.data_type).ok_or_else(|| {
1721 SqlError::TypeMismatch {
1722 expected: col.data_type.to_string(),
1723 got: got_type.to_string(),
1724 }
1725 })?
1726 };
1727 }
1728 }
1729
1730 if has_defaults {
1731 for &(pos, def_expr) in &defaults {
1732 let val = eval_const_expr(def_expr)?;
1733 let col = &table_schema.columns[pos];
1734 if !val.is_null() {
1735 let got_type = val.data_type();
1736 bufs.row[pos] =
1737 val.coerce_into(col.data_type)
1738 .ok_or_else(|| SqlError::TypeMismatch {
1739 expected: col.data_type.to_string(),
1740 got: got_type.to_string(),
1741 })?;
1742 }
1743 }
1744 }
1745
1746 if let Some(gen_map) = row_col_map_for_gen {
1747 if cache.is_some() {
1748 for (pos, fast) in cached_gen_positions
1749 .iter()
1750 .copied()
1751 .zip(cached_gen_fast_evals.iter())
1752 {
1753 let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1754 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1755 let col = &table_schema.columns[pos];
1756 bufs.row[pos] = if val.is_null() {
1757 Value::Null
1758 } else {
1759 let got_type = val.data_type();
1760 val.coerce_into(col.data_type)
1761 .ok_or_else(|| SqlError::TypeMismatch {
1762 expected: col.data_type.to_string(),
1763 got: got_type.to_string(),
1764 })?
1765 };
1766 }
1767 } else {
1768 for (pos, gen_expr, fast) in &generated_cols_uncached {
1769 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1770 let col = &table_schema.columns[*pos];
1771 bufs.row[*pos] = if val.is_null() {
1772 Value::Null
1773 } else {
1774 let got_type = val.data_type();
1775 val.coerce_into(col.data_type)
1776 .ok_or_else(|| SqlError::TypeMismatch {
1777 expected: col.data_type.to_string(),
1778 got: got_type.to_string(),
1779 })?
1780 };
1781 }
1782 }
1783 }
1784
1785 if let Some(c) = cache {
1786 for &pos in &c.not_null_indices {
1787 if bufs.row[pos as usize].is_null() {
1788 return Err(SqlError::NotNullViolation(
1789 table_schema.columns[pos as usize].name.clone(),
1790 ));
1791 }
1792 }
1793 } else {
1794 for col in &table_schema.columns {
1795 if !col.nullable && bufs.row[col.position as usize].is_null() {
1796 return Err(SqlError::NotNullViolation(col.name.clone()));
1797 }
1798 }
1799 }
1800
1801 if let Some(ref col_map) = check_col_map {
1802 for col in &table_schema.columns {
1803 if let Some(ref check) = col.check_expr {
1804 let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1805 if !is_truthy(&result) && !result.is_null() {
1806 let name = col.check_name.as_deref().unwrap_or(&col.name);
1807 return Err(SqlError::CheckViolation(name.to_string()));
1808 }
1809 }
1810 }
1811 for tc in &table_schema.check_constraints {
1812 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1813 if !is_truthy(&result) && !result.is_null() {
1814 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1815 return Err(SqlError::CheckViolation(name.to_string()));
1816 }
1817 }
1818 }
1819
1820 if has_fks {
1821 for fk in &table_schema.foreign_keys {
1822 let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1823 if any_null {
1824 continue;
1825 }
1826 crate::encoding::encode_composite_key_from_indices(
1827 &fk.columns,
1828 &bufs.row,
1829 &mut bufs.fk_key_buf,
1830 );
1831 if fk.deferrable && fk.initially_deferred {
1832 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1833 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1834 fk_name: name,
1835 foreign_table: fk.foreign_table.as_bytes().to_vec(),
1836 parent_key: bufs.fk_key_buf.clone(),
1837 });
1838 continue;
1839 }
1840 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1841 let found = wtx
1842 .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1843 .map_err(SqlError::Storage)?;
1844 if found.is_none() {
1845 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1846 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1847 }
1848 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1849 }
1850 }
1851 }
1852
1853 let proposed_row_for_returning: Option<Vec<Value>> =
1854 returning_rows.as_ref().map(|_| bufs.row.clone());
1855 let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1856 Some(bufs.row.clone())
1857 } else {
1858 None
1859 };
1860
1861 let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
1862 t.enabled
1863 && t.timing == crate::parser::TriggerTiming::Before
1864 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1865 && t.events
1866 .iter()
1867 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1868 });
1869 if has_before_insert_triggers {
1870 super::triggers::fire_row_triggers(
1871 wtx,
1872 schema,
1873 &table_schema.name,
1874 crate::parser::TriggerTiming::Before,
1875 super::triggers::FireEvent::Insert,
1876 None,
1877 Some(bufs.row.clone()),
1878 &table_schema.columns,
1879 )?;
1880 }
1881
1882 for (j, &i) in pk_indices.iter().enumerate() {
1883 bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1884 }
1885 match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1886 true => match bufs.pk_values[0] {
1887 Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1888 _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1889 },
1890 false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1891 }
1892
1893 for &slot in dropped {
1894 bufs.value_values[slot as usize] = Value::Null;
1895 }
1896 for (j, &i) in non_pk.iter().enumerate() {
1897 let col = &table_schema.columns[i];
1898 if matches!(
1899 col.generated_kind,
1900 Some(crate::parser::GeneratedKind::Virtual)
1901 ) {
1902 bufs.value_values[enc_pos[j] as usize] = Value::Null;
1903 bufs.row[i] = Value::Null;
1904 } else {
1905 bufs.value_values[enc_pos[j] as usize] =
1906 std::mem::replace(&mut bufs.row[i], Value::Null);
1907 }
1908 }
1909 match cache.and_then(|c| c.row_encoder.as_ref()) {
1910 Some(tmpl) => crate::encoding::encode_int_row_with_template(
1911 tmpl,
1912 &bufs.value_values,
1913 &mut bufs.value_buf,
1914 )?,
1915 None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1916 }
1917
1918 if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1919 return Err(SqlError::KeyTooLarge {
1920 size: bufs.key_buf.len(),
1921 max: citadel_core::MAX_KEY_SIZE,
1922 });
1923 }
1924 if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1925 return Err(SqlError::RowTooLarge {
1926 size: bufs.value_buf.len(),
1927 max: citadel_core::MAX_VALUE_SIZE,
1928 });
1929 }
1930
1931 match compiled_conflict.as_ref() {
1932 None => {
1933 let is_new = wtx
1934 .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1935 .map_err(SqlError::Storage)?;
1936 if !is_new {
1937 return Err(SqlError::DuplicateKey);
1938 }
1939 let has_after_insert_triggers =
1940 schema.triggers_for(&table_schema.name).iter().any(|t| {
1941 t.enabled
1942 && t.timing == crate::parser::TriggerTiming::After
1943 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1944 && t.events
1945 .iter()
1946 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1947 });
1948 if has_indices || has_after_insert_triggers {
1949 for (j, &i) in pk_indices.iter().enumerate() {
1950 bufs.row[i] = bufs.pk_values[j].clone();
1951 }
1952 for (j, &i) in non_pk.iter().enumerate() {
1953 bufs.row[i] = std::mem::replace(
1954 &mut bufs.value_values[enc_pos[j] as usize],
1955 Value::Null,
1956 );
1957 }
1958 if has_indices {
1959 insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1960 }
1961 if has_after_insert_triggers {
1962 super::triggers::fire_row_triggers(
1963 wtx,
1964 schema,
1965 &table_schema.name,
1966 crate::parser::TriggerTiming::After,
1967 super::triggers::FireEvent::Insert,
1968 None,
1969 Some(bufs.row.clone()),
1970 &table_schema.columns,
1971 )?;
1972 }
1973 }
1974 if let Some(r) = row_for_stmt_trigger_impl.clone() {
1975 stmt_new_rows_impl.push(r);
1976 }
1977 count += 1;
1978 if let Some(buf) = returning_rows.as_mut() {
1979 buf.push((None, proposed_row_for_returning));
1980 }
1981 }
1982 Some(oc) => {
1983 let oc_ref: &CompiledOnConflict = oc;
1984 let needs_row = upsert_needs_row(oc_ref, table_schema);
1985 if needs_row {
1986 for (j, &i) in pk_indices.iter().enumerate() {
1987 bufs.row[i] = bufs.pk_values[j].clone();
1988 }
1989 for (j, &i) in non_pk.iter().enumerate() {
1990 bufs.row[i] = std::mem::replace(
1991 &mut bufs.value_values[enc_pos[j] as usize],
1992 Value::Null,
1993 );
1994 }
1995 }
1996 let outcome = apply_insert_with_conflict(
1997 wtx,
1998 table_schema,
1999 &bufs.key_buf,
2000 &bufs.value_buf,
2001 &bufs.row,
2002 &bufs.pk_values,
2003 oc_ref,
2004 row_col_map.unwrap(),
2005 stmt.returning.is_some(),
2006 )?;
2007 match outcome {
2008 InsertRowOutcome::Inserted => {
2009 count += 1;
2010 if let Some(buf) = returning_rows.as_mut() {
2011 buf.push((None, proposed_row_for_returning));
2012 }
2013 if let Some(r) = row_for_stmt_trigger_impl.clone() {
2014 stmt_new_rows_impl.push(r);
2015 }
2016 let has_after_insert_triggers =
2017 schema.triggers_for(&table_schema.name).iter().any(|t| {
2018 t.enabled
2019 && t.timing == crate::parser::TriggerTiming::After
2020 && t.granularity
2021 == crate::parser::TriggerGranularity::ForEachRow
2022 && t.events
2023 .iter()
2024 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
2025 });
2026 if has_after_insert_triggers {
2027 super::triggers::fire_row_triggers(
2028 wtx,
2029 schema,
2030 &table_schema.name,
2031 crate::parser::TriggerTiming::After,
2032 super::triggers::FireEvent::Insert,
2033 None,
2034 Some(bufs.row.clone()),
2035 &table_schema.columns,
2036 )?;
2037 }
2038 }
2039 InsertRowOutcome::Updated { old, new } => {
2040 count += 1;
2041 if let Some(buf) = returning_rows.as_mut() {
2042 buf.push((Some(old.clone()), Some(new.clone())));
2043 }
2044 let has_after_update_triggers =
2045 schema.triggers_for(&table_schema.name).iter().any(|t| {
2046 t.enabled
2047 && t.timing == crate::parser::TriggerTiming::After
2048 && t.granularity
2049 == crate::parser::TriggerGranularity::ForEachRow
2050 && t.events.iter().any(|e| {
2051 matches!(e, crate::parser::TriggerEvent::Update(_))
2052 })
2053 });
2054 if has_after_update_triggers {
2055 let changed_cols: Vec<String> = match oc_ref {
2056 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2057 .iter()
2058 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2059 .collect(),
2060 _ => Vec::new(),
2061 };
2062 super::triggers::fire_row_triggers(
2063 wtx,
2064 schema,
2065 &table_schema.name,
2066 crate::parser::TriggerTiming::After,
2067 super::triggers::FireEvent::Update {
2068 changed_columns: &changed_cols,
2069 },
2070 Some(old),
2071 Some(new),
2072 &table_schema.columns,
2073 )?;
2074 }
2075 }
2076 InsertRowOutcome::Skipped => {}
2077 }
2078 }
2079 }
2080 }
2081
2082 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2083 if has_insert_statement_triggers_impl {
2084 super::triggers::fire_statement_triggers(
2085 wtx,
2086 schema,
2087 &table_schema.name,
2088 crate::parser::TriggerTiming::After,
2089 super::triggers::FireEvent::Insert,
2090 &table_schema.columns,
2091 &[],
2092 &stmt_new_rows_impl,
2093 )?;
2094 }
2095 return Ok(ExecutionResult::Query(super::helpers::project_returning(
2096 table_schema,
2097 returning_cols,
2098 &rows,
2099 )?));
2100 }
2101
2102 if has_insert_statement_triggers_impl {
2103 super::triggers::fire_statement_triggers(
2104 wtx,
2105 schema,
2106 &table_schema.name,
2107 crate::parser::TriggerTiming::After,
2108 super::triggers::FireEvent::Insert,
2109 &table_schema.columns,
2110 &[],
2111 &stmt_new_rows_impl,
2112 )?;
2113 }
2114
2115 Ok(ExecutionResult::RowsAffected(count))
2116}
2117
2118pub struct CompiledInsert {
2119 table_lower: String,
2120 cached: Option<InsertCache>,
2121}
2122
2123struct InsertCache {
2124 col_indices: Vec<usize>,
2125 has_subquery: bool,
2126 any_defaults: bool,
2127 has_checks: bool,
2128 on_conflict: Option<Arc<CompiledOnConflict>>,
2129 row_col_map: Option<ColumnMap>,
2130 generated_col_positions: Vec<usize>,
2131 generated_fast_evals: Vec<FastGenEval>,
2132 pk_indices: Vec<usize>,
2133 non_pk_indices: Vec<usize>,
2134 encoding_positions: Vec<u16>,
2135 dropped_non_pk_slots: Vec<u16>,
2136 phys_count: usize,
2137 single_int_pk: bool,
2138 not_null_indices: Vec<u16>,
2139 bind_plan: Option<Vec<BindAction>>,
2140 row_fully_overwritten: bool,
2141 row_encoder: Option<crate::encoding::IntRowTemplate>,
2142 is_trivial_fast: bool,
2143 trivial_fast_program: Option<TrivialFastProgram>,
2144 needs_scoped_params: bool,
2145}
2146
2147#[derive(Clone)]
2148enum BindAction {
2149 Param {
2150 param_idx: usize,
2151 col_idx: usize,
2152 target: DataType,
2153 },
2154 Literal {
2155 value: Value,
2156 col_idx: usize,
2157 },
2158}
2159
2160#[derive(Clone)]
2161struct TrivialFastProgram {
2162 template: Vec<u8>,
2163 ops: Vec<WriteOp>,
2164 pk_param: u8,
2165 not_null_param_indices: Vec<u8>,
2166}
2167
2168#[derive(Clone)]
2169enum WriteOp {
2170 ParamI64 {
2171 param_idx: u8,
2172 off: u32,
2173 },
2174 LiteralI64 {
2175 value: i64,
2176 off: u32,
2177 },
2178 GenAddParamsI64 {
2179 a_param: u8,
2180 b_param: u8,
2181 off: u32,
2182 bitmap_byte_off: u32,
2183 bitmap_bit_mask: u8,
2184 },
2185 GenMulAddParamI64 {
2186 param_idx: u8,
2187 mul: i64,
2188 add: i64,
2189 off: u32,
2190 bitmap_byte_off: u32,
2191 bitmap_bit_mask: u8,
2192 },
2193}
2194
2195fn build_trivial_fast_program(
2196 bind_plan: &[BindAction],
2197 row_encoder: &crate::encoding::IntRowTemplate,
2198 non_virtual_pairs: &[(usize, usize)],
2199 generated_col_positions: &[usize],
2200 generated_fast_evals: &[FastGenEval],
2201 pk_indices: &[usize],
2202 columns: &[crate::types::ColumnDef],
2203) -> Option<TrivialFastProgram> {
2204 let pk_col = pk_indices[0];
2205 let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2206 non_virtual_pairs.iter().copied().collect();
2207 let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2208 row_encoder.slot_offsets.iter().copied().collect();
2209
2210 let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2211 let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2212 let mut pk_param: Option<u8> = None;
2213 let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2214 let mut not_null_param_indices: Vec<u8> = Vec::new();
2215
2216 for action in bind_plan {
2217 match action {
2218 BindAction::Param {
2219 param_idx,
2220 col_idx,
2221 target,
2222 } => {
2223 if *target != DataType::Integer {
2224 return None;
2225 }
2226 let pi: u8 = u8::try_from(*param_idx).ok()?;
2227 col_to_param.insert(*col_idx, pi);
2228 if *col_idx == pk_col {
2229 pk_param = Some(pi);
2230 } else {
2231 let slot = *col_to_slot.get(col_idx)?;
2232 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2233 ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2234 if !columns[*col_idx].nullable {
2235 not_null_param_indices.push(pi);
2236 }
2237 }
2238 }
2239 BindAction::Literal { value, col_idx } => match value {
2240 Value::Integer(v) => {
2241 col_to_lit_int.insert(*col_idx, *v);
2242 if *col_idx == pk_col {
2243 return None;
2244 }
2245 let slot = *col_to_slot.get(col_idx)?;
2246 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2247 ops.push(WriteOp::LiteralI64 { value: *v, off });
2248 }
2249 _ => return None,
2250 },
2251 }
2252 }
2253
2254 let pk_param = pk_param?;
2255
2256 for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2257 let gen_slot = *col_to_slot.get(&gen_pos)?;
2258 let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2259 let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
2260 let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
2261 let gen_col_nullable = columns[gen_pos].nullable;
2262
2263 match &generated_fast_evals[i] {
2264 FastGenEval::IntColAddCol {
2265 left_idx,
2266 right_idx,
2267 } => {
2268 let a_param = col_to_param.get(left_idx).copied();
2269 let b_param = col_to_param.get(right_idx).copied();
2270 match (a_param, b_param) {
2271 (Some(ap), Some(bp)) => {
2272 let deps_safe = gen_col_nullable
2273 || (not_null_param_indices.contains(&ap)
2274 && not_null_param_indices.contains(&bp));
2275 if !deps_safe {
2276 return None;
2277 }
2278 ops.push(WriteOp::GenAddParamsI64 {
2279 a_param: ap,
2280 b_param: bp,
2281 off: gen_off,
2282 bitmap_byte_off,
2283 bitmap_bit_mask,
2284 });
2285 }
2286 (Some(p), None) => {
2287 let lit = col_to_lit_int.get(right_idx).copied()?;
2288 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2289 return None;
2290 }
2291 ops.push(WriteOp::GenMulAddParamI64 {
2292 param_idx: p,
2293 mul: 1,
2294 add: lit,
2295 off: gen_off,
2296 bitmap_byte_off,
2297 bitmap_bit_mask,
2298 });
2299 }
2300 (None, Some(p)) => {
2301 let lit = col_to_lit_int.get(left_idx).copied()?;
2302 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2303 return None;
2304 }
2305 ops.push(WriteOp::GenMulAddParamI64 {
2306 param_idx: p,
2307 mul: 1,
2308 add: lit,
2309 off: gen_off,
2310 bitmap_byte_off,
2311 bitmap_bit_mask,
2312 });
2313 }
2314 (None, None) => {
2315 let la = col_to_lit_int.get(left_idx).copied()?;
2316 let lb = col_to_lit_int.get(right_idx).copied()?;
2317 ops.push(WriteOp::LiteralI64 {
2318 value: la.wrapping_add(lb),
2319 off: gen_off,
2320 });
2321 }
2322 }
2323 }
2324 FastGenEval::IntColMulAdd {
2325 col_schema_idx,
2326 mul,
2327 add,
2328 } => {
2329 if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2330 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2331 return None;
2332 }
2333 ops.push(WriteOp::GenMulAddParamI64 {
2334 param_idx: p,
2335 mul: *mul,
2336 add: *add,
2337 off: gen_off,
2338 bitmap_byte_off,
2339 bitmap_bit_mask,
2340 });
2341 } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2342 ops.push(WriteOp::LiteralI64 {
2343 value: lit.wrapping_mul(*mul).wrapping_add(*add),
2344 off: gen_off,
2345 });
2346 } else {
2347 return None;
2348 }
2349 }
2350 FastGenEval::None => return None,
2351 }
2352 }
2353
2354 Some(TrivialFastProgram {
2355 template: row_encoder.template.clone(),
2356 ops,
2357 pk_param,
2358 not_null_param_indices,
2359 })
2360}
2361
2362#[derive(Clone)]
2363pub(super) enum CompiledOnConflict {
2364 DoNothing {
2365 target: Option<ConflictKind>,
2366 },
2367 DoUpdate {
2368 target: ConflictKind,
2369 assignments: Vec<(usize, Expr)>,
2370 where_clause: Option<Expr>,
2371 fast_paths: Option<Vec<DoUpdateFastPath>>,
2372 },
2373}
2374
2375#[derive(Clone, Copy)]
2376pub(super) enum DoUpdateFastPath {
2377 IntAddConst { phys_idx: usize, delta: i64 },
2378}
2379
2380#[derive(Clone, Debug)]
2381pub(super) enum ConflictKind {
2382 PrimaryKey,
2383 UniqueIndex { index_idx: usize },
2384}
2385
2386fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2387 match target {
2388 ConflictTarget::Columns(cols) => {
2389 let col_idx_set: Vec<u16> = cols
2390 .iter()
2391 .map(|name| {
2392 ts.column_index(name)
2393 .map(|i| i as u16)
2394 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2395 })
2396 .collect::<Result<_>>()?;
2397 let pk_set = ts.primary_key_columns.clone();
2398 if set_equal(&col_idx_set, &pk_set) {
2399 return Ok(ConflictKind::PrimaryKey);
2400 }
2401 for (index_idx, idx) in ts.indices.iter().enumerate() {
2402 if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2403 return Ok(ConflictKind::UniqueIndex { index_idx });
2404 }
2405 }
2406 Err(SqlError::Plan(
2407 "ON CONFLICT target does not match any unique constraint".into(),
2408 ))
2409 }
2410 ConflictTarget::Constraint(name) => {
2411 let lower = name.to_ascii_lowercase();
2412 for (index_idx, idx) in ts.indices.iter().enumerate() {
2413 if idx.name.eq_ignore_ascii_case(&lower) {
2414 if idx.unique {
2415 return Ok(ConflictKind::UniqueIndex { index_idx });
2416 }
2417 return Err(SqlError::Plan(format!(
2418 "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2419 )));
2420 }
2421 }
2422 Err(SqlError::Plan(format!(
2423 "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2424 )))
2425 }
2426 }
2427}
2428
2429fn set_equal(a: &[u16], b: &[u16]) -> bool {
2430 if a.len() != b.len() {
2431 return false;
2432 }
2433 let mut a_sorted = a.to_vec();
2434 let mut b_sorted = b.to_vec();
2435 a_sorted.sort_unstable();
2436 b_sorted.sort_unstable();
2437 a_sorted == b_sorted
2438}
2439
2440pub(super) enum InsertRowOutcome {
2441 Inserted,
2442 Updated { old: Vec<Value>, new: Vec<Value> },
2443 Skipped,
2444}
2445
2446#[allow(clippy::too_many_arguments)]
2447#[inline]
2448pub(super) fn apply_insert_with_conflict(
2449 wtx: &mut WriteTxn<'_>,
2450 table_schema: &TableSchema,
2451 key_buf: &[u8],
2452 value_buf: &[u8],
2453 row: &[Value],
2454 pk_values: &[Value],
2455 on_conflict: &CompiledOnConflict,
2456 col_map: &ColumnMap,
2457 capture_returning: bool,
2458) -> Result<InsertRowOutcome> {
2459 let table_bytes = table_schema.name.as_bytes();
2460
2461 if let CompiledOnConflict::DoNothing { target } = on_conflict {
2462 let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2463 if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2464 let inserted = wtx
2465 .table_insert_if_absent(table_bytes, key_buf, value_buf)
2466 .map_err(SqlError::Storage)?;
2467 return Ok(if inserted {
2468 InsertRowOutcome::Inserted
2469 } else {
2470 InsertRowOutcome::Skipped
2471 });
2472 }
2473 }
2474
2475 if let CompiledOnConflict::DoUpdate {
2476 target: ConflictKind::PrimaryKey,
2477 assignments,
2478 where_clause,
2479 fast_paths,
2480 } = on_conflict
2481 {
2482 if can_fuse_do_update(table_schema, assignments) {
2483 return apply_do_update_fused(
2484 wtx,
2485 table_schema,
2486 table_bytes,
2487 key_buf,
2488 value_buf,
2489 row,
2490 assignments,
2491 where_clause.as_ref(),
2492 col_map,
2493 fast_paths.as_deref(),
2494 capture_returning,
2495 );
2496 }
2497 }
2498
2499 let primary_outcome = wtx
2500 .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2501 .map_err(SqlError::Storage)?;
2502
2503 match primary_outcome {
2504 citadel_txn::write_txn::InsertOutcome::Inserted => {
2505 if table_schema.indices.is_empty() {
2506 return Ok(InsertRowOutcome::Inserted);
2507 }
2508 let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2509 match insert_index_entries_or_fetch(
2510 wtx,
2511 table_schema,
2512 row,
2513 pk_values,
2514 &mut inserted_keys,
2515 )? {
2516 None => Ok(InsertRowOutcome::Inserted),
2517 Some(conflicting_idx) => {
2518 let matches_target =
2519 matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2520 || matches!(
2521 on_conflict,
2522 CompiledOnConflict::DoNothing {
2523 target: Some(ConflictKind::UniqueIndex { index_idx }),
2524 } | CompiledOnConflict::DoUpdate {
2525 target: ConflictKind::UniqueIndex { index_idx },
2526 ..
2527 } if *index_idx == conflicting_idx
2528 );
2529 undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2530 if !matches_target {
2531 return Err(SqlError::UniqueViolation(
2532 table_schema.indices[conflicting_idx].name.clone(),
2533 ));
2534 }
2535 match on_conflict {
2536 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2537 CompiledOnConflict::DoUpdate {
2538 assignments,
2539 where_clause,
2540 ..
2541 } => {
2542 let existing_pk =
2543 fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2544 apply_do_update(
2545 wtx,
2546 table_schema,
2547 &existing_pk,
2548 row,
2549 assignments,
2550 where_clause.as_ref(),
2551 col_map,
2552 capture_returning,
2553 )
2554 }
2555 }
2556 }
2557 }
2558 }
2559 citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2560 let matches_target = matches!(
2561 on_conflict,
2562 CompiledOnConflict::DoNothing { target: None }
2563 | CompiledOnConflict::DoNothing {
2564 target: Some(ConflictKind::PrimaryKey),
2565 }
2566 | CompiledOnConflict::DoUpdate {
2567 target: ConflictKind::PrimaryKey,
2568 ..
2569 }
2570 );
2571 if !matches_target {
2572 return Err(SqlError::DuplicateKey);
2573 }
2574 match on_conflict {
2575 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2576 CompiledOnConflict::DoUpdate {
2577 assignments,
2578 where_clause,
2579 ..
2580 } => {
2581 let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2582 apply_do_update_with_old_row(
2583 wtx,
2584 table_schema,
2585 key_buf,
2586 &old_row,
2587 row,
2588 assignments,
2589 where_clause.as_ref(),
2590 col_map,
2591 capture_returning,
2592 )
2593 }
2594 }
2595 }
2596 }
2597}
2598
2599#[inline]
2600fn apply_fast_path_patch(
2601 old_bytes: &[u8],
2602 fast_paths: &[DoUpdateFastPath],
2603) -> Result<UpsertAction> {
2604 UPSERT_SCRATCH.with(|slot| {
2605 let mut bufs = slot.borrow_mut();
2606 bufs.new_value_buf.clear();
2607 bufs.new_value_buf.extend_from_slice(old_bytes);
2608
2609 let mut patch_scratch: Vec<u8> = Vec::new();
2610
2611 for fp in fast_paths {
2612 match fp {
2613 DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2614 let decoded =
2615 crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2616 let old_val = &decoded[0];
2617 let new_val = match old_val {
2618 Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2619 Value::Null => Value::Null,
2620 _ => {
2621 return Err(SqlError::TypeMismatch {
2622 expected: "INTEGER".into(),
2623 got: old_val.data_type().to_string(),
2624 });
2625 }
2626 };
2627 if !crate::encoding::patch_column_in_place(
2628 &mut bufs.new_value_buf,
2629 *phys_idx,
2630 &new_val,
2631 )? {
2632 patch_scratch.clear();
2633 crate::encoding::patch_row_column(
2634 &bufs.new_value_buf,
2635 *phys_idx,
2636 &new_val,
2637 &mut patch_scratch,
2638 )?;
2639 std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2640 }
2641 }
2642 }
2643 }
2644
2645 if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2646 return Err(SqlError::RowTooLarge {
2647 size: bufs.new_value_buf.len(),
2648 max: citadel_core::MAX_VALUE_SIZE,
2649 });
2650 }
2651
2652 Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2653 })
2654}
2655
2656fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2657 if !ts.indices.is_empty() {
2658 return true;
2659 }
2660 match oc {
2661 CompiledOnConflict::DoNothing { .. } => false,
2662 CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2663 }
2664}
2665
2666fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2667 if !ts.indices.is_empty() {
2668 return false;
2669 }
2670 if !ts.foreign_keys.is_empty() {
2671 return false;
2672 }
2673 if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2674 return false;
2675 }
2676 let pk = ts.pk_indices();
2677 !assignments.iter().any(|(ci, _)| pk.contains(ci))
2678}
2679
2680#[allow(clippy::too_many_arguments)]
2681#[inline]
2682fn apply_do_update_fused(
2683 wtx: &mut WriteTxn<'_>,
2684 table_schema: &TableSchema,
2685 table_bytes: &[u8],
2686 key_buf: &[u8],
2687 value_buf: &[u8],
2688 proposed_row: &[Value],
2689 assignments: &[(usize, Expr)],
2690 where_clause: Option<&Expr>,
2691 col_map: &ColumnMap,
2692 fast_paths: Option<&[DoUpdateFastPath]>,
2693 capture_returning: bool,
2694) -> Result<InsertRowOutcome> {
2695 let non_pk = table_schema.non_pk_indices();
2696 let enc_pos = table_schema.encoding_positions();
2697 let phys_count = table_schema.physical_non_pk_count();
2698 let dropped = table_schema.dropped_non_pk_slots();
2699 let has_checks = table_schema.has_checks();
2700 let has_fks = !table_schema.foreign_keys.is_empty();
2701
2702 let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2703 std::cell::RefCell::new(None);
2704
2705 let outcome =
2706 wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2707 if let Some(fps) = fast_paths {
2708 if !has_checks {
2709 let action = apply_fast_path_patch(old_bytes, fps)?;
2710 if capture_returning {
2711 if let UpsertAction::Replace(ref new_bytes) = action {
2712 let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2713 let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2714 *captured.borrow_mut() = Some((old_row, new_row));
2715 }
2716 }
2717 return Ok(action);
2718 }
2719 }
2720 UPSERT_SCRATCH.with(|slot| {
2721 let mut bufs = slot.borrow_mut();
2722 let UpsertBufs {
2723 old_row,
2724 new_row,
2725 value_values,
2726 new_value_buf,
2727 } = &mut *bufs;
2728
2729 old_row.clear();
2730 old_row.resize(table_schema.columns.len(), Value::Null);
2731 decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2732
2733 if let Some(w) = where_clause {
2734 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2735 let result = eval_expr(w, &ctx)?;
2736 if result.is_null() || !is_truthy(&result) {
2737 return Ok(UpsertAction::Skip);
2738 }
2739 }
2740
2741 new_row.clear();
2742 new_row.extend_from_slice(old_row);
2743 for (col_idx, expr) in assignments {
2744 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2745 let val = eval_expr(expr, &ctx)?;
2746 let col = &table_schema.columns[*col_idx];
2747 new_row[*col_idx] = if val.is_null() {
2748 Value::Null
2749 } else {
2750 let got = val.data_type();
2751 val.coerce_into(col.data_type)
2752 .ok_or_else(|| SqlError::TypeMismatch {
2753 expected: col.data_type.to_string(),
2754 got: got.to_string(),
2755 })?
2756 };
2757 }
2758
2759 for (assigned_idx, _) in assignments {
2760 let col = &table_schema.columns[*assigned_idx];
2761 if !col.nullable && new_row[col.position as usize].is_null() {
2762 return Err(SqlError::NotNullViolation(col.name.clone()));
2763 }
2764 }
2765 if has_checks {
2766 for col in &table_schema.columns {
2767 if let Some(ref check) = col.check_expr {
2768 let ctx = EvalCtx::new(col_map, new_row);
2769 let result = eval_expr(check, &ctx)?;
2770 if !is_truthy(&result) && !result.is_null() {
2771 let name = col.check_name.as_deref().unwrap_or(&col.name);
2772 return Err(SqlError::CheckViolation(name.to_string()));
2773 }
2774 }
2775 }
2776 for tc in &table_schema.check_constraints {
2777 let ctx = EvalCtx::new(col_map, new_row);
2778 let result = eval_expr(&tc.expr, &ctx)?;
2779 if !is_truthy(&result) && !result.is_null() {
2780 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2781 return Err(SqlError::CheckViolation(name.to_string()));
2782 }
2783 }
2784 }
2785 let _ = has_fks;
2786
2787 value_values.clear();
2788 value_values.resize(phys_count, Value::Null);
2789 for &slot in dropped {
2790 value_values[slot as usize] = Value::Null;
2791 }
2792 for (j, &i) in non_pk.iter().enumerate() {
2793 value_values[enc_pos[j] as usize] = new_row[i].clone();
2794 }
2795 new_value_buf.clear();
2796 crate::encoding::encode_row_into(value_values, new_value_buf);
2797
2798 if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2799 return Err(SqlError::RowTooLarge {
2800 size: new_value_buf.len(),
2801 max: citadel_core::MAX_VALUE_SIZE,
2802 });
2803 }
2804
2805 if capture_returning {
2806 *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2807 }
2808 Ok(UpsertAction::Replace(new_value_buf.clone()))
2809 })
2810 })?;
2811
2812 match outcome {
2813 UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2814 UpsertOutcome::Updated => {
2815 if capture_returning {
2816 let (old, new) = captured.into_inner().ok_or_else(|| {
2817 SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2818 })?;
2819 Ok(InsertRowOutcome::Updated { old, new })
2820 } else {
2821 Ok(InsertRowOutcome::Inserted)
2822 }
2823 }
2824 UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2825 }
2826}
2827
2828fn fetch_unique_index_pk(
2829 wtx: &mut WriteTxn<'_>,
2830 table_schema: &TableSchema,
2831 index_idx: usize,
2832 row: &[Value],
2833) -> Result<Vec<u8>> {
2834 let idx = &table_schema.indices[index_idx];
2835 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2836 let indexed: Vec<Value> = idx
2837 .column_positions_iter()
2838 .map(|col_idx| row[col_idx as usize].clone())
2839 .collect();
2840 let key = crate::encoding::encode_composite_key(&indexed);
2841 let value = wtx
2842 .table_get(&idx_table, &key)
2843 .map_err(SqlError::Storage)?
2844 .ok_or_else(|| {
2845 SqlError::InvalidValue("unique index missing expected collision entry".into())
2846 })?;
2847 Ok(value)
2848}
2849
2850#[allow(clippy::too_many_arguments)]
2851fn apply_do_update(
2852 wtx: &mut WriteTxn<'_>,
2853 table_schema: &TableSchema,
2854 pk_key: &[u8],
2855 proposed_row: &[Value],
2856 assignments: &[(usize, Expr)],
2857 where_clause: Option<&Expr>,
2858 col_map: &ColumnMap,
2859 capture_returning: bool,
2860) -> Result<InsertRowOutcome> {
2861 let old_value = wtx
2862 .table_get(table_schema.name.as_bytes(), pk_key)
2863 .map_err(SqlError::Storage)?
2864 .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2865 let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2866 apply_do_update_with_old_row(
2867 wtx,
2868 table_schema,
2869 pk_key,
2870 &old_row,
2871 proposed_row,
2872 assignments,
2873 where_clause,
2874 col_map,
2875 capture_returning,
2876 )
2877}
2878
2879#[allow(clippy::too_many_arguments)]
2880fn apply_do_update_with_old_row(
2881 wtx: &mut WriteTxn<'_>,
2882 table_schema: &TableSchema,
2883 old_pk_key: &[u8],
2884 old_row: &[Value],
2885 proposed_row: &[Value],
2886 assignments: &[(usize, Expr)],
2887 where_clause: Option<&Expr>,
2888 col_map: &ColumnMap,
2889 capture_returning: bool,
2890) -> Result<InsertRowOutcome> {
2891 if let Some(w) = where_clause {
2892 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2893 let result = eval_expr(w, &ctx)?;
2894 if result.is_null() || !is_truthy(&result) {
2895 return Ok(InsertRowOutcome::Skipped);
2896 }
2897 }
2898
2899 let mut new_row = old_row.to_vec();
2900 for (col_idx, expr) in assignments {
2901 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2902 let val = eval_expr(expr, &ctx)?;
2903 let col = &table_schema.columns[*col_idx];
2904 new_row[*col_idx] = if val.is_null() {
2905 Value::Null
2906 } else {
2907 let got = val.data_type();
2908 val.coerce_into(col.data_type)
2909 .ok_or_else(|| SqlError::TypeMismatch {
2910 expected: col.data_type.to_string(),
2911 got: got.to_string(),
2912 })?
2913 };
2914 }
2915
2916 for col in &table_schema.columns {
2917 if matches!(
2918 col.generated_kind,
2919 Some(crate::parser::GeneratedKind::Stored)
2920 ) {
2921 let val = eval_expr(
2922 col.generated_expr.as_ref().unwrap(),
2923 &EvalCtx::new(col_map, &new_row),
2924 )?;
2925 let pos = col.position as usize;
2926 new_row[pos] = if val.is_null() {
2927 if !col.nullable {
2928 return Err(SqlError::NotNullViolation(col.name.clone()));
2929 }
2930 Value::Null
2931 } else {
2932 let got = val.data_type();
2933 val.coerce_into(col.data_type)
2934 .ok_or_else(|| SqlError::TypeMismatch {
2935 expected: col.data_type.to_string(),
2936 got: got.to_string(),
2937 })?
2938 };
2939 }
2940 }
2941
2942 let pk_indices = table_schema.pk_indices();
2943 let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2944 let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2945
2946 for (assigned_idx, _) in assignments {
2947 let col = &table_schema.columns[*assigned_idx];
2948 if !col.nullable && new_row[col.position as usize].is_null() {
2949 return Err(SqlError::NotNullViolation(col.name.clone()));
2950 }
2951 }
2952 if table_schema.has_checks() {
2953 for col in &table_schema.columns {
2954 if let Some(ref check) = col.check_expr {
2955 let ctx = EvalCtx::new(col_map, &new_row);
2956 let result = eval_expr(check, &ctx)?;
2957 if !is_truthy(&result) && !result.is_null() {
2958 let name = col.check_name.as_deref().unwrap_or(&col.name);
2959 return Err(SqlError::CheckViolation(name.to_string()));
2960 }
2961 }
2962 }
2963 for tc in &table_schema.check_constraints {
2964 let ctx = EvalCtx::new(col_map, &new_row);
2965 let result = eval_expr(&tc.expr, &ctx)?;
2966 if !is_truthy(&result) && !result.is_null() {
2967 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2968 return Err(SqlError::CheckViolation(name.to_string()));
2969 }
2970 }
2971 }
2972 for fk in &table_schema.foreign_keys {
2973 let changed = fk
2974 .columns
2975 .iter()
2976 .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2977 if !changed {
2978 continue;
2979 }
2980 let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2981 if any_null {
2982 continue;
2983 }
2984 let fk_vals: Vec<Value> = fk
2985 .columns
2986 .iter()
2987 .map(|&ci| new_row[ci as usize].clone())
2988 .collect();
2989 let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2990 if fk.deferrable && fk.initially_deferred {
2991 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
2992 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
2993 fk_name: name,
2994 foreign_table: fk.foreign_table.as_bytes().to_vec(),
2995 parent_key: fk_key,
2996 });
2997 continue;
2998 }
2999 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
3000 let found = wtx
3001 .table_get(fk.foreign_table.as_bytes(), &fk_key)
3002 .map_err(SqlError::Storage)?;
3003 if found.is_none() {
3004 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
3005 return Err(SqlError::ForeignKeyViolation(name.to_string()));
3006 }
3007 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
3008 }
3009 }
3010
3011 let has_indices = !table_schema.indices.is_empty();
3012 let old_pk_values: Vec<Value> = if has_indices || pk_changed {
3013 pk_indices.iter().map(|&i| old_row[i].clone()).collect()
3014 } else {
3015 Vec::new()
3016 };
3017 let new_pk_values: Vec<Value> = if has_indices || pk_changed {
3018 pk_indices.iter().map(|&i| new_row[i].clone()).collect()
3019 } else {
3020 Vec::new()
3021 };
3022
3023 let non_pk = table_schema.non_pk_indices();
3024 let enc_pos = table_schema.encoding_positions();
3025 let phys_count = table_schema.physical_non_pk_count();
3026 let dropped = table_schema.dropped_non_pk_slots();
3027 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
3028 for &slot in dropped {
3029 value_values[slot as usize] = Value::Null;
3030 }
3031 for (j, &i) in non_pk.iter().enumerate() {
3032 let col = &table_schema.columns[i];
3033 value_values[enc_pos[j] as usize] = if matches!(
3034 col.generated_kind,
3035 Some(crate::parser::GeneratedKind::Virtual)
3036 ) {
3037 Value::Null
3038 } else {
3039 new_row[i].clone()
3040 };
3041 }
3042 let mut new_value_buf = Vec::with_capacity(256);
3043 crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
3044
3045 if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
3046 return Err(SqlError::RowTooLarge {
3047 size: new_value_buf.len(),
3048 max: citadel_core::MAX_VALUE_SIZE,
3049 });
3050 }
3051
3052 if pk_changed {
3053 let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
3054 let inserted = wtx
3055 .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
3056 .map_err(SqlError::Storage)?;
3057 if !inserted {
3058 return Err(SqlError::DuplicateKey);
3059 }
3060 wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
3061 .map_err(SqlError::Storage)?;
3062 for idx in &table_schema.indices {
3063 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3064 let old_idx_key =
3065 encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3066 wtx.table_delete(&idx_table, &old_idx_key)
3067 .map_err(SqlError::Storage)?;
3068 let new_idx_key =
3069 encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3070 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3071 let is_new = wtx
3072 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3073 .map_err(SqlError::Storage)?;
3074 if idx.unique && !is_new {
3075 let any_null = idx
3076 .column_positions_iter()
3077 .any(|c| new_row[c as usize].is_null());
3078 if !any_null {
3079 return Err(SqlError::UniqueViolation(idx.name.clone()));
3080 }
3081 }
3082 }
3083 } else {
3084 wtx.table_update_sorted(
3085 table_schema.name.as_bytes(),
3086 &[(old_pk_key, new_value_buf.as_slice())],
3087 )
3088 .map_err(SqlError::Storage)?;
3089 let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
3090 for idx in &table_schema.indices {
3091 let cols_changed = index_columns_changed(idx, old_row, &new_row);
3092 let (del, ins) = partial_idx_update_actions(
3093 idx,
3094 old_row,
3095 &new_row,
3096 cols_changed,
3097 false,
3098 col_map_partial,
3099 );
3100 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
3101 if del {
3102 let old_idx_key =
3103 encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
3104 wtx.table_delete(&idx_table, &old_idx_key)
3105 .map_err(SqlError::Storage)?;
3106 }
3107 if ins {
3108 let new_idx_key =
3109 encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
3110 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
3111 let is_new = wtx
3112 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
3113 .map_err(SqlError::Storage)?;
3114 if idx.unique && !is_new {
3115 let any_null = idx
3116 .column_positions_iter()
3117 .any(|c| new_row[c as usize].is_null());
3118 if !any_null {
3119 return Err(SqlError::UniqueViolation(idx.name.clone()));
3120 }
3121 }
3122 }
3123 }
3124 }
3125
3126 if capture_returning {
3127 Ok(InsertRowOutcome::Updated {
3128 old: old_row.to_vec(),
3129 new: new_row,
3130 })
3131 } else {
3132 Ok(InsertRowOutcome::Inserted)
3133 }
3134}
3135
3136fn detect_fast_paths(
3137 ts: &TableSchema,
3138 assignments: &[(usize, Expr)],
3139) -> Option<Vec<DoUpdateFastPath>> {
3140 let non_pk = ts.non_pk_indices();
3141 let enc_pos = ts.encoding_positions();
3142 let mut out = Vec::with_capacity(assignments.len());
3143 for (col_idx, expr) in assignments {
3144 let col = &ts.columns[*col_idx];
3145 if col.data_type != DataType::Integer {
3146 return None;
3147 }
3148 let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3149 let phys_idx = enc_pos[nonpk_order] as usize;
3150
3151 if let Expr::BinaryOp { left, op, right } = expr {
3152 if !matches!(op, BinOp::Add | BinOp::Sub) {
3153 return None;
3154 }
3155 let reads_target =
3156 matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3157 if !reads_target {
3158 return None;
3159 }
3160 if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3161 let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3162 let _ = col_idx;
3163 out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3164 continue;
3165 }
3166 return None;
3167 }
3168 return None;
3169 }
3170 Some(out)
3171}
3172
3173fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3174 let target = oc
3175 .target
3176 .as_ref()
3177 .map(|t| resolve_conflict_target(t, ts))
3178 .transpose()?;
3179 match &oc.action {
3180 OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3181 OnConflictAction::DoUpdate {
3182 assignments,
3183 where_clause,
3184 } => {
3185 let target = target.ok_or_else(|| {
3186 SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3187 })?;
3188 let compiled_assignments: Vec<(usize, Expr)> = assignments
3189 .iter()
3190 .map(|(name, expr)| {
3191 let col_idx = ts
3192 .column_index(name)
3193 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3194 Ok((col_idx, expr.clone()))
3195 })
3196 .collect::<Result<_>>()?;
3197 let fast_paths = if where_clause.is_none() {
3198 detect_fast_paths(ts, &compiled_assignments)
3199 } else {
3200 None
3201 };
3202 Ok(CompiledOnConflict::DoUpdate {
3203 target,
3204 assignments: compiled_assignments,
3205 where_clause: where_clause.clone(),
3206 fast_paths,
3207 })
3208 }
3209 }
3210}
3211
3212fn exec_insert_trivial_fast(
3214 wtx: &mut WriteTxn<'_>,
3215 table_lower: &str,
3216 cache: &InsertCache,
3217 bufs: &mut InsertBufs,
3218 params: &[Value],
3219) -> Result<ExecutionResult> {
3220 let prog = cache
3221 .trivial_fast_program
3222 .as_ref()
3223 .expect("trivial fast: program");
3224
3225 for &p in &prog.not_null_param_indices {
3226 if params[p as usize].is_null() {
3227 return Err(SqlError::NotNullViolation(format!("param@{p}")));
3228 }
3229 }
3230
3231 match ¶ms[prog.pk_param as usize] {
3232 Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3233 _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3234 }
3235
3236 bufs.value_buf.clear();
3237 bufs.value_buf.extend_from_slice(&prog.template);
3238
3239 for op in &prog.ops {
3240 match op {
3241 WriteOp::ParamI64 { param_idx, off } => match ¶ms[*param_idx as usize] {
3242 Value::Integer(v) => {
3243 let off = *off as usize;
3244 bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3245 }
3246 other => {
3247 return Err(SqlError::TypeMismatch {
3248 expected: "Integer".into(),
3249 got: other.data_type().to_string(),
3250 });
3251 }
3252 },
3253 WriteOp::LiteralI64 { value, off } => {
3254 let off = *off as usize;
3255 bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3256 }
3257 WriteOp::GenAddParamsI64 {
3258 a_param,
3259 b_param,
3260 off,
3261 bitmap_byte_off,
3262 bitmap_bit_mask,
3263 } => match (¶ms[*a_param as usize], ¶ms[*b_param as usize]) {
3264 (Value::Integer(a), Value::Integer(b)) => {
3265 let off = *off as usize;
3266 bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3267 }
3268 _ => {
3269 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3270 }
3271 },
3272 WriteOp::GenMulAddParamI64 {
3273 param_idx,
3274 mul,
3275 add,
3276 off,
3277 bitmap_byte_off,
3278 bitmap_bit_mask,
3279 } => match ¶ms[*param_idx as usize] {
3280 Value::Integer(v) => {
3281 let r = v.wrapping_mul(*mul).wrapping_add(*add);
3282 let off = *off as usize;
3283 bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3284 }
3285 _ => {
3286 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3287 }
3288 },
3289 }
3290 }
3291
3292 let is_new = wtx
3293 .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3294 .map_err(SqlError::Storage)?;
3295 if !is_new {
3296 return Err(SqlError::DuplicateKey);
3297 }
3298 Ok(ExecutionResult::RowsAffected(1))
3299}
3300
3301fn build_bind_plan(
3302 stmt: &InsertStmt,
3303 col_indices: &[usize],
3304 col_data_types: &[DataType],
3305) -> Option<Vec<BindAction>> {
3306 let rows = match &stmt.source {
3307 InsertSource::Values(rows) => rows,
3308 _ => return None,
3309 };
3310 if rows.len() != 1 {
3311 return None;
3312 }
3313 let value_row = &rows[0];
3314 if value_row.len() != col_indices.len() {
3315 return None;
3316 }
3317 let mut plan = Vec::with_capacity(value_row.len());
3318 for (i, expr) in value_row.iter().enumerate() {
3319 let col_idx = col_indices[i];
3320 let target = col_data_types[col_idx];
3321 match expr {
3322 Expr::Parameter(n) => {
3323 if *n == 0 {
3324 return None;
3325 }
3326 plan.push(BindAction::Param {
3327 param_idx: n - 1,
3328 col_idx,
3329 target,
3330 });
3331 }
3332 Expr::Literal(v) => plan.push(BindAction::Literal {
3333 value: v.clone(),
3334 col_idx,
3335 }),
3336 _ => return None,
3337 }
3338 }
3339 Some(plan)
3340}
3341
3342impl CompiledInsert {
3343 pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3344 let lower = stmt.table.to_ascii_lowercase();
3345 let cached = if let Some(ts) = schema.get(&lower) {
3346 let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3347 ts.columns.iter().map(|c| c.name.as_str()).collect()
3348 } else {
3349 stmt.columns.iter().map(|s| s.as_str()).collect()
3350 };
3351 let mut col_indices = Vec::with_capacity(insert_columns.len());
3352 for name in &insert_columns {
3353 col_indices.push(ts.column_index(name)?);
3354 }
3355 if col_indices
3356 .iter()
3357 .any(|&ci| ts.columns[ci].generated_kind.is_some())
3358 {
3359 return None;
3360 }
3361 let on_conflict = stmt
3362 .on_conflict
3363 .as_ref()
3364 .map(|oc| compile_on_conflict(oc, ts))
3365 .transpose()
3366 .ok()
3367 .flatten()
3368 .map(Arc::new);
3369 let generated_col_positions: Vec<usize> = ts
3370 .columns
3371 .iter()
3372 .enumerate()
3373 .filter_map(|(i, c)| {
3374 matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3375 .then_some(i)
3376 })
3377 .collect();
3378 let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3379 .iter()
3380 .map(|&pos| {
3381 detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3382 })
3383 .collect();
3384 let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3385 Some(ColumnMap::new(&ts.columns))
3386 } else {
3387 None
3388 };
3389 let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3390 let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3391 let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3392 let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3393 let phys_count = ts.physical_non_pk_count();
3394 let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3395 let single_int_pk =
3396 pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3397 let not_null_indices: Vec<u16> = ts
3398 .columns
3399 .iter()
3400 .filter(|c| !c.nullable)
3401 .map(|c| c.position)
3402 .collect();
3403 let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3404 let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3405 let row_fully_overwritten = if any_defaults_flag {
3406 false
3407 } else {
3408 let mut covered: rustc_hash::FxHashSet<usize> =
3409 col_indices.iter().copied().collect();
3410 covered.extend(generated_col_positions.iter().copied());
3411 for (j, &i) in non_pk_indices.iter().enumerate() {
3412 let _ = j;
3413 if matches!(
3414 ts.columns[i].generated_kind,
3415 Some(crate::parser::GeneratedKind::Virtual)
3416 ) {
3417 covered.insert(i);
3418 }
3419 }
3420 bind_plan.is_some() && covered.len() == ts.columns.len()
3421 };
3422 let has_fks = !ts.foreign_keys.is_empty();
3423 let has_indices = !ts.indices.is_empty();
3424 let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3425 let mut null_value_slots: Vec<usize> =
3426 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3427 for (j, &i) in non_pk_indices.iter().enumerate() {
3428 let slot = encoding_positions[j] as usize;
3429 if matches!(
3430 ts.columns[i].generated_kind,
3431 Some(crate::parser::GeneratedKind::Virtual)
3432 ) {
3433 null_value_slots.push(slot);
3434 } else {
3435 non_virtual_pairs.push((i, slot));
3436 }
3437 }
3438 let row_encoder = {
3439 let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3440 let col = &ts.columns[i];
3441 if matches!(
3442 col.generated_kind,
3443 Some(crate::parser::GeneratedKind::Virtual)
3444 ) {
3445 true
3446 } else {
3447 col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3448 }
3449 });
3450 if all_int_or_null {
3451 let mut null_slots: Vec<usize> =
3452 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3453 for (j, &i) in non_pk_indices.iter().enumerate() {
3454 if matches!(
3455 ts.columns[i].generated_kind,
3456 Some(crate::parser::GeneratedKind::Virtual)
3457 ) {
3458 null_slots.push(encoding_positions[j] as usize);
3459 }
3460 }
3461 Some(crate::encoding::build_int_row_template(
3462 phys_count,
3463 &null_slots,
3464 ))
3465 } else {
3466 None
3467 }
3468 };
3469 let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3470 && !ts.columns.iter().any(|c| c.default_expr.is_some())
3471 && !ts.has_checks()
3472 && !has_fks
3473 && !has_indices
3474 && stmt.on_conflict.is_none()
3475 && stmt.returning.is_none()
3476 && bind_plan.is_some()
3477 && row_encoder.is_some()
3478 && row_fully_overwritten
3479 && single_int_pk
3480 && generated_fast_evals
3481 .iter()
3482 .all(|fe| !matches!(fe, FastGenEval::None));
3483 let trivial_fast_program = if is_trivial_fast_eligible {
3484 build_trivial_fast_program(
3485 bind_plan.as_ref().unwrap(),
3486 row_encoder.as_ref().unwrap(),
3487 &non_virtual_pairs,
3488 &generated_col_positions,
3489 &generated_fast_evals,
3490 &pk_indices,
3491 &ts.columns,
3492 )
3493 } else {
3494 None
3495 };
3496 let is_trivial_fast = trivial_fast_program.is_some();
3497 let has_checks = ts.has_checks();
3498 let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3499 let needs_scoped_params = bind_plan.is_none()
3500 || has_checks
3501 || any_defaults
3502 || !generated_col_positions.is_empty()
3503 || on_conflict.is_some()
3504 || stmt.returning.is_some()
3505 || insert_has_subquery(stmt)
3506 || super::helpers::any_partial_index(ts);
3507 Some(InsertCache {
3508 col_indices,
3509 has_subquery: insert_has_subquery(stmt),
3510 any_defaults,
3511 has_checks,
3512 on_conflict,
3513 row_col_map,
3514 generated_col_positions,
3515 generated_fast_evals,
3516 pk_indices,
3517 non_pk_indices,
3518 encoding_positions,
3519 dropped_non_pk_slots,
3520 phys_count,
3521 single_int_pk,
3522 not_null_indices,
3523 bind_plan,
3524 row_fully_overwritten,
3525 row_encoder,
3526 is_trivial_fast,
3527 trivial_fast_program,
3528 needs_scoped_params,
3529 })
3530 } else if schema.get_view(&lower).is_some() {
3531 None
3532 } else {
3533 return None;
3534 };
3535 Some(Self {
3536 table_lower: lower,
3537 cached,
3538 })
3539 }
3540}
3541
3542impl CompiledPlan for CompiledInsert {
3543 fn execute(
3544 &self,
3545 db: &Database,
3546 schema: &SchemaManager,
3547 stmt: &Statement,
3548 params: &[Value],
3549 txn: super::compile::ActiveTxnRef<'_, '_>,
3550 ) -> Result<ExecutionResult> {
3551 let ins = match stmt {
3552 Statement::Insert(i) => i,
3553 _ => {
3554 return Err(SqlError::Unsupported(
3555 "CompiledInsert received non-INSERT statement".into(),
3556 ))
3557 }
3558 };
3559 use super::compile::ActiveTxnRef;
3560 match txn {
3561 ActiveTxnRef::None => exec_insert(db, schema, ins, params),
3562 ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3563 "cannot execute mutating statement inside a read-only transaction".into(),
3564 )),
3565 ActiveTxnRef::Write(outer) => match self.cached.as_ref() {
3566 Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3567 exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3568 }),
3569 Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3570 None => exec_insert_in_txn(outer, schema, ins, params),
3571 },
3572 }
3573 }
3574
3575 fn uses_scoped_params(&self) -> bool {
3576 match self.cached.as_ref() {
3577 Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3578 None => true,
3579 }
3580 }
3581}
3582
3583pub struct CompiledDelete {
3584 table_lower: String,
3585}
3586
3587impl CompiledDelete {
3588 pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3589 let lower = stmt.table.to_ascii_lowercase();
3590 schema.get(&lower)?;
3591 Some(Self { table_lower: lower })
3592 }
3593}
3594
3595impl CompiledPlan for CompiledDelete {
3596 fn execute(
3597 &self,
3598 db: &Database,
3599 schema: &SchemaManager,
3600 stmt: &Statement,
3601 _params: &[Value],
3602 txn: super::compile::ActiveTxnRef<'_, '_>,
3603 ) -> Result<ExecutionResult> {
3604 let del = match stmt {
3605 Statement::Delete(d) => d,
3606 _ => {
3607 return Err(SqlError::Unsupported(
3608 "CompiledDelete received non-DELETE statement".into(),
3609 ))
3610 }
3611 };
3612 let _ = &self.table_lower;
3613 use super::compile::ActiveTxnRef;
3614 match txn {
3615 ActiveTxnRef::None => super::write::exec_delete(db, schema, del),
3616 ActiveTxnRef::Read(_) => Err(SqlError::Unsupported(
3617 "cannot execute mutating statement inside a read-only transaction".into(),
3618 )),
3619 ActiveTxnRef::Write(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3620 }
3621 }
3622}
3623
3624fn exec_instead_of_view_insert_auto(
3625 db: &Database,
3626 schema: &SchemaManager,
3627 view_name: &str,
3628 aliases: &[String],
3629 stmt: &InsertStmt,
3630 params: &[Value],
3631) -> Result<ExecutionResult> {
3632 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3633 let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3634 wtx.commit().map_err(SqlError::Storage)?;
3635 Ok(r)
3636}
3637
3638fn exec_instead_of_view_insert_in_txn(
3639 wtx: &mut WriteTxn<'_>,
3640 schema: &SchemaManager,
3641 view_name: &str,
3642 aliases: &[String],
3643 stmt: &InsertStmt,
3644 params: &[Value],
3645) -> Result<ExecutionResult> {
3646 let resolved_aliases: Vec<String> = if aliases.is_empty() {
3648 derive_view_columns(wtx, schema, view_name)?
3649 } else {
3650 aliases.to_vec()
3651 };
3652 let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3653 let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3654 .iter()
3655 .enumerate()
3656 .map(|(i, name)| (name.to_ascii_lowercase(), i))
3657 .collect();
3658
3659 let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3660 (0..resolved_aliases.len()).collect()
3661 } else {
3662 stmt.columns
3663 .iter()
3664 .map(|c| {
3665 alias_map
3666 .get(&c.to_ascii_lowercase())
3667 .copied()
3668 .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3669 })
3670 .collect::<Result<_>>()?
3671 };
3672
3673 let source_rows: Vec<Vec<Value>> = match &stmt.source {
3674 InsertSource::Values(rows) => {
3675 let mut out = Vec::with_capacity(rows.len());
3676 for row in rows {
3677 if row.len() != target_positions.len() {
3678 return Err(SqlError::InvalidValue(format!(
3679 "expected {} values, got {}",
3680 target_positions.len(),
3681 row.len()
3682 )));
3683 }
3684 let mut vals = Vec::with_capacity(row.len());
3685 for expr in row {
3686 let v = match expr {
3687 Expr::Parameter(n) => params
3688 .get(n - 1)
3689 .cloned()
3690 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3691 Expr::Literal(v) => v.clone(),
3692 other => eval_const_expr(other)?,
3693 };
3694 vals.push(v);
3695 }
3696 out.push(vals);
3697 }
3698 out
3699 }
3700 InsertSource::Select(sq) => {
3701 let empty_ctes = CteContext::default();
3702 let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3703 qr.rows
3704 }
3705 };
3706
3707 let mut count: u64 = 0;
3708 for row in source_rows {
3709 if row.len() != target_positions.len() {
3710 return Err(SqlError::InvalidValue(format!(
3711 "expected {} values, got {}",
3712 target_positions.len(),
3713 row.len()
3714 )));
3715 }
3716 let mut new_row = vec![Value::Null; resolved_aliases.len()];
3717 for (slot, val) in target_positions.iter().zip(row) {
3718 new_row[*slot] = val;
3719 }
3720 super::triggers::fire_row_triggers(
3721 wtx,
3722 schema,
3723 view_name,
3724 crate::parser::TriggerTiming::InsteadOf,
3725 super::triggers::FireEvent::Insert,
3726 None,
3727 Some(new_row),
3728 &view_cols,
3729 )?;
3730 count += 1;
3731 }
3732 Ok(ExecutionResult::RowsAffected(count))
3733}
3734
3735fn derive_view_columns(
3736 wtx: &mut WriteTxn<'_>,
3737 schema: &SchemaManager,
3738 view_name: &str,
3739) -> Result<Vec<String>> {
3740 use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3741 let sel = SelectStmt {
3742 columns: vec![SelectColumn::AllColumns],
3743 from: view_name.to_string(),
3744 from_alias: None,
3745 from_subquery: None,
3746 from_args: None,
3747 from_json_table: None,
3748 joins: vec![],
3749 distinct: false,
3750 where_clause: None,
3751 order_by: vec![],
3752 limit: Some(Expr::Literal(Value::Integer(1))),
3753 offset: None,
3754 group_by: vec![],
3755 having: None,
3756 };
3757 let sq = SelectQuery {
3758 ctes: vec![],
3759 recursive: false,
3760 body: QueryBody::Select(Box::new(sel)),
3761 };
3762 let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3763 match qr {
3764 ExecutionResult::Query(q) => Ok(q.columns),
3765 _ => Ok(Vec::new()),
3766 }
3767}
3768
3769#[cfg(test)]
3770#[path = "dml_tests.rs"]
3771mod tests;