1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::write_txn::WriteTxn;
7use rustc_hash::FxHashMap;
8
9use crate::encoding::{encode_composite_key_into, encode_row_into};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
12use crate::parser::*;
13use crate::types::*;
14
15use crate::schema::SchemaManager;
16
17use super::compile::CompiledPlan;
18use super::helpers::*;
19use super::CteContext;
20
21pub(super) fn exec_insert(
22 db: &Database,
23 schema: &SchemaManager,
24 stmt: &InsertStmt,
25 params: &[Value],
26) -> Result<ExecutionResult> {
27 let empty_ctes = CteContext::default();
28 let materialized;
29 let stmt = if insert_has_subquery(stmt) {
30 materialized = materialize_insert(stmt, &mut |sub| {
31 exec_subquery_read(db, schema, sub, &empty_ctes)
32 })?;
33 &materialized
34 } else {
35 stmt
36 };
37
38 let lower_name = stmt.table.to_ascii_lowercase();
39 if let Some(view_def) = schema.get_view(&lower_name) {
40 if super::triggers::has_instead_of(schema, &lower_name, super::triggers::FireEvent::Insert)
41 {
42 let aliases = view_def.column_aliases.clone();
43 return exec_instead_of_view_insert_auto(
44 db,
45 schema,
46 &lower_name,
47 &aliases,
48 stmt,
49 params,
50 );
51 }
52 return Err(SqlError::CannotModifyView(stmt.table.clone()));
53 }
54 if schema.get_matview(&lower_name).is_some() {
55 return Err(SqlError::CannotModifyView(format!(
56 "materialized view '{}' is read-only — use REFRESH MATERIALIZED VIEW",
57 stmt.table
58 )));
59 }
60 let table_schema = schema
61 .get(&lower_name)
62 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
63
64 let insert_columns = if stmt.columns.is_empty() {
65 table_schema
66 .columns
67 .iter()
68 .map(|c| c.name.clone())
69 .collect::<Vec<_>>()
70 } else {
71 stmt.columns
72 .iter()
73 .map(|c| c.to_ascii_lowercase())
74 .collect()
75 };
76
77 let col_indices: Vec<usize> = insert_columns
78 .iter()
79 .map(|name| {
80 table_schema
81 .column_index(name)
82 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
83 })
84 .collect::<Result<_>>()?;
85
86 for &ci in &col_indices {
87 if table_schema.columns[ci].generated_kind.is_some() {
88 return Err(SqlError::CannotInsertIntoGeneratedColumn(
89 table_schema.columns[ci].name.clone(),
90 ));
91 }
92 }
93
94 let defaults: Vec<(usize, &Expr)> = table_schema
95 .columns
96 .iter()
97 .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
98 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
99 .collect();
100
101 let generated_cols: Vec<(usize, &Expr)> = table_schema
102 .columns
103 .iter()
104 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
105 .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
106 .collect();
107
108 let has_checks = table_schema.has_checks();
109 let strict = table_schema.is_strict();
110 let row_col_map_for_gen = if !generated_cols.is_empty() {
111 Some(ColumnMap::new(&table_schema.columns))
112 } else {
113 None
114 };
115 let check_col_map = if has_checks {
116 Some(ColumnMap::new(&table_schema.columns))
117 } else {
118 None
119 };
120
121 let select_rows = match &stmt.source {
122 InsertSource::Select(sq) => {
123 let insert_ctes =
124 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
125 exec_query_body_read(db, schema, body, ctx)
126 })?;
127 let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
128 Some(qr.rows)
129 }
130 InsertSource::Values(_) => None,
131 };
132
133 let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
134 .on_conflict
135 .as_ref()
136 .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
137 .transpose()?;
138
139 let row_col_map = compiled_conflict
140 .as_ref()
141 .map(|_| ColumnMap::new(&table_schema.columns));
142
143 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
144 let mut count: u64 = 0;
145 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
146 stmt.returning.as_ref().map(|_| Vec::new());
147
148 let pk_indices = table_schema.pk_indices();
149 let non_pk = table_schema.non_pk_indices();
150 let enc_pos = table_schema.encoding_positions();
151 let phys_count = table_schema.physical_non_pk_count();
152 let mut row = vec![Value::Null; table_schema.columns.len()];
153 let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
154 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
155 let mut key_buf: Vec<u8> = Vec::with_capacity(64);
156 let mut value_buf: Vec<u8> = Vec::with_capacity(256);
157 let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
158
159 let values = match &stmt.source {
160 InsertSource::Values(rows) => Some(rows.as_slice()),
161 InsertSource::Select(_) => None,
162 };
163 let sel_rows = select_rows.as_deref();
164
165 let total = match (values, sel_rows) {
166 (Some(rows), _) => rows.len(),
167 (_, Some(rows)) => rows.len(),
168 _ => 0,
169 };
170
171 if let Some(sel) = sel_rows {
172 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
173 return Err(SqlError::InvalidValue(format!(
174 "INSERT ... SELECT column count mismatch: expected {}, got {}",
175 insert_columns.len(),
176 sel[0].len()
177 )));
178 }
179 }
180
181 let has_insert_statement_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
182 t.enabled
183 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
184 && t.events
185 .iter()
186 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
187 });
188 let mut stmt_new_rows: Vec<Vec<Value>> = if has_insert_statement_triggers {
189 Vec::with_capacity(total)
190 } else {
191 Vec::new()
192 };
193
194 if has_insert_statement_triggers {
195 super::triggers::fire_statement_triggers(
196 &mut wtx,
197 schema,
198 &table_schema.name,
199 crate::parser::TriggerTiming::Before,
200 super::triggers::FireEvent::Insert,
201 &table_schema.columns,
202 &[],
203 &[],
204 )?;
205 }
206
207 for idx in 0..total {
208 for v in row.iter_mut() {
209 *v = Value::Null;
210 }
211
212 if let Some(value_rows) = values {
213 let value_row = &value_rows[idx];
214 if value_row.len() != insert_columns.len() {
215 return Err(SqlError::InvalidValue(format!(
216 "expected {} values, got {}",
217 insert_columns.len(),
218 value_row.len()
219 )));
220 }
221 for (i, expr) in value_row.iter().enumerate() {
222 let val = if let Expr::Parameter(n) = expr {
223 params
224 .get(n - 1)
225 .cloned()
226 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
227 } else {
228 eval_const_expr(expr)?
229 };
230 let col_idx = col_indices[i];
231 let col = &table_schema.columns[col_idx];
232 row[col_idx] = if val.is_null() {
233 Value::Null
234 } else {
235 coerce_for_column(val, col, strict)?
236 };
237 }
238 } else if let Some(sel) = sel_rows {
239 let sel_row = &sel[idx];
240 for (i, val) in sel_row.iter().enumerate() {
241 let col_idx = col_indices[i];
242 let col = &table_schema.columns[col_idx];
243 row[col_idx] = if val.is_null() {
244 Value::Null
245 } else {
246 coerce_for_column(val.clone(), col, strict)?
247 };
248 }
249 }
250
251 for &(pos, def_expr) in &defaults {
252 let val = eval_const_expr(def_expr)?;
253 let col = &table_schema.columns[pos];
254 if !val.is_null() {
255 row[pos] = coerce_for_column(val, col, strict)?;
256 }
257 }
258
259 if let Some(ref gen_map) = row_col_map_for_gen {
260 for &(pos, gen_expr) in &generated_cols {
261 let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
262 let col = &table_schema.columns[pos];
263 row[pos] = if val.is_null() {
264 Value::Null
265 } else {
266 coerce_for_column(val, col, strict)?
267 };
268 }
269 }
270
271 for col in &table_schema.columns {
272 if !col.nullable && row[col.position as usize].is_null() {
273 return Err(SqlError::NotNullViolation(col.name.clone()));
274 }
275 }
276
277 if let Some(ref col_map) = check_col_map {
278 for col in &table_schema.columns {
279 if let Some(ref check) = col.check_expr {
280 let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
281 if !is_truthy(&result) && !result.is_null() {
282 let name = col.check_name.as_deref().unwrap_or(&col.name);
283 return Err(SqlError::CheckViolation(name.to_string()));
284 }
285 }
286 }
287 for tc in &table_schema.check_constraints {
288 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
289 if !is_truthy(&result) && !result.is_null() {
290 let name = tc.name.as_deref().unwrap_or(&tc.sql);
291 return Err(SqlError::CheckViolation(name.to_string()));
292 }
293 }
294 }
295
296 for fk in &table_schema.foreign_keys {
297 let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
298 if any_null {
299 continue; }
301 let fk_vals: Vec<Value> = fk
302 .columns
303 .iter()
304 .map(|&ci| row[ci as usize].clone())
305 .collect();
306 fk_key_buf.clear();
307 encode_composite_key_into(&fk_vals, &mut fk_key_buf);
308 if fk.deferrable && fk.initially_deferred {
309 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
310 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
311 fk_name: name,
312 foreign_table: fk.foreign_table.as_bytes().to_vec(),
313 parent_key: fk_key_buf.clone(),
314 });
315 continue;
316 }
317 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key_buf) {
318 let found = wtx
319 .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
320 .map_err(SqlError::Storage)?;
321 if found.is_none() {
322 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
323 return Err(SqlError::ForeignKeyViolation(name.to_string()));
324 }
325 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key_buf);
326 }
327 }
328
329 let proposed_row_for_returning: Option<Vec<Value>> =
330 returning_rows.as_ref().map(|_| row.clone());
331 let row_for_stmt_trigger: Option<Vec<Value>> = if has_insert_statement_triggers {
332 Some(row.clone())
333 } else {
334 None
335 };
336
337 let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
338 t.enabled
339 && t.timing == crate::parser::TriggerTiming::Before
340 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
341 && t.events
342 .iter()
343 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
344 });
345 if has_before_insert_triggers {
346 super::triggers::fire_row_triggers(
347 &mut wtx,
348 schema,
349 &table_schema.name,
350 crate::parser::TriggerTiming::Before,
351 super::triggers::FireEvent::Insert,
352 None,
353 Some(row.clone()),
354 &table_schema.columns,
355 )?;
356 }
357
358 for (j, &i) in pk_indices.iter().enumerate() {
359 pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
360 }
361 encode_composite_key_into(&pk_values, &mut key_buf);
362
363 for (j, &i) in non_pk.iter().enumerate() {
364 let col = &table_schema.columns[i];
365 if matches!(
366 col.generated_kind,
367 Some(crate::parser::GeneratedKind::Virtual)
368 ) {
369 value_values[enc_pos[j] as usize] = Value::Null;
370 row[i] = Value::Null;
371 } else {
372 value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
373 }
374 }
375 encode_row_into(&value_values, &mut value_buf);
376
377 if key_buf.len() > citadel_core::MAX_KEY_SIZE {
378 return Err(SqlError::KeyTooLarge {
379 size: key_buf.len(),
380 max: citadel_core::MAX_KEY_SIZE,
381 });
382 }
383 if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
384 return Err(SqlError::RowTooLarge {
385 size: value_buf.len(),
386 max: citadel_core::MAX_VALUE_SIZE,
387 });
388 }
389
390 match compiled_conflict.as_ref() {
391 None => {
392 let is_new = wtx
393 .table_insert(table_schema.name.as_bytes(), &key_buf, &value_buf)
394 .map_err(SqlError::Storage)?;
395 if !is_new {
396 return Err(SqlError::DuplicateKey);
397 }
398 let has_after_insert_triggers =
399 schema.triggers_for(&table_schema.name).iter().any(|t| {
400 t.enabled
401 && t.timing == crate::parser::TriggerTiming::After
402 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
403 && t.events
404 .iter()
405 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
406 });
407 if !table_schema.indices.is_empty() || has_after_insert_triggers {
408 for (j, &i) in pk_indices.iter().enumerate() {
409 row[i] = pk_values[j].clone();
410 }
411 for (j, &i) in non_pk.iter().enumerate() {
412 row[i] =
413 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
414 }
415 if !table_schema.indices.is_empty() {
416 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
417 }
418 if has_after_insert_triggers {
419 super::triggers::fire_row_triggers(
420 &mut wtx,
421 schema,
422 &table_schema.name,
423 crate::parser::TriggerTiming::After,
424 super::triggers::FireEvent::Insert,
425 None,
426 Some(row.clone()),
427 &table_schema.columns,
428 )?;
429 }
430 }
431 if let Some(r) = row_for_stmt_trigger.clone() {
432 stmt_new_rows.push(r);
433 }
434 count += 1;
435 if let Some(buf) = returning_rows.as_mut() {
436 buf.push((None, proposed_row_for_returning));
437 }
438 }
439 Some(oc) => {
440 let oc_ref: &CompiledOnConflict = oc;
441 let needs_row = upsert_needs_row(oc_ref, table_schema);
442 if needs_row {
443 for (j, &i) in pk_indices.iter().enumerate() {
444 row[i] = pk_values[j].clone();
445 }
446 for (j, &i) in non_pk.iter().enumerate() {
447 row[i] =
448 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
449 }
450 }
451 let outcome = apply_insert_with_conflict(
452 &mut wtx,
453 table_schema,
454 &key_buf,
455 &value_buf,
456 &row,
457 &pk_values,
458 oc_ref,
459 row_col_map.as_ref().unwrap(),
460 stmt.returning.is_some(),
461 )?;
462 match outcome {
463 InsertRowOutcome::Inserted => {
464 count += 1;
465 if let Some(buf) = returning_rows.as_mut() {
466 buf.push((None, proposed_row_for_returning));
467 }
468 if let Some(r) = row_for_stmt_trigger.clone() {
469 stmt_new_rows.push(r);
470 }
471 let has_after_insert_triggers =
472 schema.triggers_for(&table_schema.name).iter().any(|t| {
473 t.enabled
474 && t.timing == crate::parser::TriggerTiming::After
475 && t.granularity
476 == crate::parser::TriggerGranularity::ForEachRow
477 && t.events
478 .iter()
479 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
480 });
481 if has_after_insert_triggers {
482 super::triggers::fire_row_triggers(
483 &mut wtx,
484 schema,
485 &table_schema.name,
486 crate::parser::TriggerTiming::After,
487 super::triggers::FireEvent::Insert,
488 None,
489 Some(row.clone()),
490 &table_schema.columns,
491 )?;
492 }
493 }
494 InsertRowOutcome::Updated { old, new } => {
495 count += 1;
496 if let Some(buf) = returning_rows.as_mut() {
497 buf.push((Some(old.clone()), Some(new.clone())));
498 }
499 let has_after_update_triggers =
500 schema.triggers_for(&table_schema.name).iter().any(|t| {
501 t.enabled
502 && t.timing == crate::parser::TriggerTiming::After
503 && t.granularity
504 == crate::parser::TriggerGranularity::ForEachRow
505 && t.events.iter().any(|e| {
506 matches!(e, crate::parser::TriggerEvent::Update(_))
507 })
508 });
509 if has_after_update_triggers {
510 let changed_cols: Vec<String> = match oc_ref {
511 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
512 .iter()
513 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
514 .collect(),
515 _ => Vec::new(),
516 };
517 super::triggers::fire_row_triggers(
518 &mut wtx,
519 schema,
520 &table_schema.name,
521 crate::parser::TriggerTiming::After,
522 super::triggers::FireEvent::Update {
523 changed_columns: &changed_cols,
524 },
525 Some(old),
526 Some(new),
527 &table_schema.columns,
528 )?;
529 }
530 }
531 InsertRowOutcome::Skipped => {}
532 }
533 }
534 }
535 }
536
537 if has_insert_statement_triggers {
538 super::triggers::fire_statement_triggers(
539 &mut wtx,
540 schema,
541 &table_schema.name,
542 crate::parser::TriggerTiming::After,
543 super::triggers::FireEvent::Insert,
544 &table_schema.columns,
545 &[],
546 &stmt_new_rows,
547 )?;
548 }
549
550 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
551 let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
552 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
553 wtx.commit().map_err(SqlError::Storage)?;
554 return Ok(ExecutionResult::Query(qr));
555 }
556
557 super::helpers::drain_deferred_fk_checks(&mut wtx)?;
558 wtx.commit().map_err(SqlError::Storage)?;
559 Ok(ExecutionResult::RowsAffected(count))
560}
561
562pub(super) fn has_subquery(expr: &Expr) -> bool {
563 crate::parser::has_subquery(expr)
564}
565
566pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
567 if let Some(ref w) = stmt.where_clause {
568 if has_subquery(w) {
569 return true;
570 }
571 }
572 if let Some(ref h) = stmt.having {
573 if has_subquery(h) {
574 return true;
575 }
576 }
577 for col in &stmt.columns {
578 if let SelectColumn::Expr { expr, .. } = col {
579 if has_subquery(expr) {
580 return true;
581 }
582 }
583 }
584 for ob in &stmt.order_by {
585 if has_subquery(&ob.expr) {
586 return true;
587 }
588 }
589 for join in &stmt.joins {
590 if let Some(ref on_expr) = join.on_clause {
591 if has_subquery(on_expr) {
592 return true;
593 }
594 }
595 }
596 false
597}
598
599pub(super) fn materialize_expr(
600 expr: &Expr,
601 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
602) -> Result<Expr> {
603 match expr {
604 Expr::InSubquery {
605 expr: e,
606 subquery,
607 negated,
608 } => {
609 let inner = materialize_expr(e, exec_sub)?;
610 let qr = exec_sub(subquery)?;
611 if !qr.columns.is_empty() && qr.columns.len() != 1 {
612 return Err(SqlError::SubqueryMultipleColumns);
613 }
614 let mut values = rustc_hash::FxHashSet::default();
615 let mut has_null = false;
616 for row in &qr.rows {
617 if row[0].is_null() {
618 has_null = true;
619 } else {
620 values.insert(row[0].clone());
621 }
622 }
623 Ok(Expr::InSet {
624 expr: Box::new(inner),
625 values,
626 has_null,
627 negated: *negated,
628 })
629 }
630 Expr::ScalarSubquery(subquery) => {
631 let qr = exec_sub(subquery)?;
632 if qr.rows.len() > 1 {
633 return Err(SqlError::SubqueryMultipleRows);
634 }
635 let val = if qr.rows.is_empty() {
636 Value::Null
637 } else {
638 qr.rows[0][0].clone()
639 };
640 Ok(Expr::Literal(val))
641 }
642 Expr::Exists { subquery, negated } => {
643 let qr = exec_sub(subquery)?;
644 let exists = !qr.rows.is_empty();
645 let result = if *negated { !exists } else { exists };
646 Ok(Expr::Literal(Value::Boolean(result)))
647 }
648 Expr::InList {
649 expr: e,
650 list,
651 negated,
652 } => {
653 let inner = materialize_expr(e, exec_sub)?;
654 let items = list
655 .iter()
656 .map(|item| materialize_expr(item, exec_sub))
657 .collect::<Result<Vec<_>>>()?;
658 Ok(Expr::InList {
659 expr: Box::new(inner),
660 list: items,
661 negated: *negated,
662 })
663 }
664 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
665 left: Box::new(materialize_expr(left, exec_sub)?),
666 op: *op,
667 right: Box::new(materialize_expr(right, exec_sub)?),
668 }),
669 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
670 op: *op,
671 expr: Box::new(materialize_expr(e, exec_sub)?),
672 }),
673 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
674 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
675 Expr::InSet {
676 expr: e,
677 values,
678 has_null,
679 negated,
680 } => Ok(Expr::InSet {
681 expr: Box::new(materialize_expr(e, exec_sub)?),
682 values: values.clone(),
683 has_null: *has_null,
684 negated: *negated,
685 }),
686 Expr::Between {
687 expr: e,
688 low,
689 high,
690 negated,
691 } => Ok(Expr::Between {
692 expr: Box::new(materialize_expr(e, exec_sub)?),
693 low: Box::new(materialize_expr(low, exec_sub)?),
694 high: Box::new(materialize_expr(high, exec_sub)?),
695 negated: *negated,
696 }),
697 Expr::Like {
698 expr: e,
699 pattern,
700 escape,
701 negated,
702 } => {
703 let esc = escape
704 .as_ref()
705 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
706 .transpose()?;
707 Ok(Expr::Like {
708 expr: Box::new(materialize_expr(e, exec_sub)?),
709 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
710 escape: esc,
711 negated: *negated,
712 })
713 }
714 Expr::Case {
715 operand,
716 conditions,
717 else_result,
718 } => {
719 let op = operand
720 .as_ref()
721 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
722 .transpose()?;
723 let conds = conditions
724 .iter()
725 .map(|(c, r)| {
726 Ok((
727 materialize_expr(c, exec_sub)?,
728 materialize_expr(r, exec_sub)?,
729 ))
730 })
731 .collect::<Result<Vec<_>>>()?;
732 let else_r = else_result
733 .as_ref()
734 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
735 .transpose()?;
736 Ok(Expr::Case {
737 operand: op,
738 conditions: conds,
739 else_result: else_r,
740 })
741 }
742 Expr::Coalesce(args) => {
743 let materialized = args
744 .iter()
745 .map(|a| materialize_expr(a, exec_sub))
746 .collect::<Result<Vec<_>>>()?;
747 Ok(Expr::Coalesce(materialized))
748 }
749 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
750 expr: Box::new(materialize_expr(e, exec_sub)?),
751 data_type: *data_type,
752 }),
753 Expr::Function {
754 name,
755 args,
756 distinct,
757 } => {
758 let materialized = args
759 .iter()
760 .map(|a| materialize_expr(a, exec_sub))
761 .collect::<Result<Vec<_>>>()?;
762 Ok(Expr::Function {
763 name: name.clone(),
764 args: materialized,
765 distinct: *distinct,
766 })
767 }
768 other => Ok(other.clone()),
769 }
770}
771
772pub(super) fn materialize_stmt(
773 stmt: &SelectStmt,
774 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
775) -> Result<SelectStmt> {
776 let where_clause = stmt
777 .where_clause
778 .as_ref()
779 .map(|e| materialize_expr(e, exec_sub))
780 .transpose()?;
781 let having = stmt
782 .having
783 .as_ref()
784 .map(|e| materialize_expr(e, exec_sub))
785 .transpose()?;
786 let columns = stmt
787 .columns
788 .iter()
789 .map(|c| match c {
790 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
791 SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
792 SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
793 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
794 expr: materialize_expr(expr, exec_sub)?,
795 alias: alias.clone(),
796 }),
797 })
798 .collect::<Result<Vec<_>>>()?;
799 let order_by = stmt
800 .order_by
801 .iter()
802 .map(|ob| {
803 Ok(OrderByItem {
804 expr: materialize_expr(&ob.expr, exec_sub)?,
805 descending: ob.descending,
806 nulls_first: ob.nulls_first,
807 })
808 })
809 .collect::<Result<Vec<_>>>()?;
810 let joins = stmt
811 .joins
812 .iter()
813 .map(|j| {
814 let on_clause = j
815 .on_clause
816 .as_ref()
817 .map(|e| materialize_expr(e, exec_sub))
818 .transpose()?;
819 Ok(JoinClause {
820 join_type: j.join_type,
821 table: j.table.clone(),
822 subquery: j.subquery.clone(),
823 on_clause,
824 })
825 })
826 .collect::<Result<Vec<_>>>()?;
827 let group_by = stmt
828 .group_by
829 .iter()
830 .map(|e| materialize_expr(e, exec_sub))
831 .collect::<Result<Vec<_>>>()?;
832 Ok(SelectStmt {
833 columns,
834 from: stmt.from.clone(),
835 from_alias: stmt.from_alias.clone(),
836 from_subquery: stmt.from_subquery.clone(),
837 from_args: stmt.from_args.clone(),
838 from_json_table: stmt.from_json_table.clone(),
839 joins,
840 distinct: stmt.distinct,
841 where_clause,
842 order_by,
843 limit: stmt.limit.clone(),
844 offset: stmt.offset.clone(),
845 group_by,
846 having,
847 })
848}
849
850pub(super) fn exec_subquery_read(
851 db: &Database,
852 schema: &SchemaManager,
853 stmt: &SelectStmt,
854 ctes: &CteContext,
855) -> Result<QueryResult> {
856 match super::exec_select(db, schema, stmt, ctes)? {
857 ExecutionResult::Query(qr) => Ok(qr),
858 _ => Ok(QueryResult {
859 columns: vec![],
860 rows: vec![],
861 }),
862 }
863}
864
865pub(super) fn exec_subquery_write(
866 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
867 schema: &SchemaManager,
868 stmt: &SelectStmt,
869 ctes: &CteContext,
870) -> Result<QueryResult> {
871 match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
872 ExecutionResult::Query(qr) => Ok(qr),
873 _ => Ok(QueryResult {
874 columns: vec![],
875 rows: vec![],
876 }),
877 }
878}
879
880pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
881 stmt.where_clause.as_ref().is_some_and(has_subquery)
882 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
883}
884
885pub(super) fn materialize_update(
886 stmt: &UpdateStmt,
887 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
888) -> Result<UpdateStmt> {
889 let where_clause = stmt
890 .where_clause
891 .as_ref()
892 .map(|e| materialize_expr(e, exec_sub))
893 .transpose()?;
894 let assignments = stmt
895 .assignments
896 .iter()
897 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
898 .collect::<Result<Vec<_>>>()?;
899 Ok(UpdateStmt {
900 table: stmt.table.clone(),
901 assignments,
902 where_clause,
903 returning: stmt.returning.clone(),
904 })
905}
906
907pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
908 stmt.where_clause.as_ref().is_some_and(has_subquery)
909}
910
911pub(super) fn materialize_delete(
912 stmt: &DeleteStmt,
913 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
914) -> Result<DeleteStmt> {
915 let where_clause = stmt
916 .where_clause
917 .as_ref()
918 .map(|e| materialize_expr(e, exec_sub))
919 .transpose()?;
920 Ok(DeleteStmt {
921 table: stmt.table.clone(),
922 where_clause,
923 returning: stmt.returning.clone(),
924 })
925}
926
927pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
928 match &stmt.source {
929 InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
930 InsertSource::Select(_) => false,
932 }
933}
934
935pub(super) fn materialize_insert(
936 stmt: &InsertStmt,
937 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
938) -> Result<InsertStmt> {
939 let source = match &stmt.source {
940 InsertSource::Values(rows) => {
941 let mat = rows
942 .iter()
943 .map(|row| {
944 row.iter()
945 .map(|e| materialize_expr(e, exec_sub))
946 .collect::<Result<Vec<_>>>()
947 })
948 .collect::<Result<Vec<_>>>()?;
949 InsertSource::Values(mat)
950 }
951 InsertSource::Select(sq) => {
952 let ctes = sq
953 .ctes
954 .iter()
955 .map(|c| {
956 Ok(CteDefinition {
957 name: c.name.clone(),
958 column_aliases: c.column_aliases.clone(),
959 body: materialize_query_body(&c.body, exec_sub)?,
960 })
961 })
962 .collect::<Result<Vec<_>>>()?;
963 let body = materialize_query_body(&sq.body, exec_sub)?;
964 InsertSource::Select(Box::new(SelectQuery {
965 ctes,
966 recursive: sq.recursive,
967 body,
968 }))
969 }
970 };
971 Ok(InsertStmt {
972 table: stmt.table.clone(),
973 columns: stmt.columns.clone(),
974 source,
975 on_conflict: stmt.on_conflict.clone(),
976 returning: stmt.returning.clone(),
977 })
978}
979
980pub(super) fn materialize_query_body(
981 body: &QueryBody,
982 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
983) -> Result<QueryBody> {
984 match body {
985 QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
986 sel, exec_sub,
987 )?))),
988 QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
989 op: comp.op.clone(),
990 all: comp.all,
991 left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
992 right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
993 order_by: comp.order_by.clone(),
994 limit: comp.limit.clone(),
995 offset: comp.offset.clone(),
996 }))),
997 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
998 }
999}
1000
1001pub(super) fn exec_query_body(
1002 db: &Database,
1003 schema: &SchemaManager,
1004 body: &QueryBody,
1005 ctes: &CteContext,
1006) -> Result<ExecutionResult> {
1007 match body {
1008 QueryBody::Select(sel) => super::exec_select(db, schema, sel, ctes),
1009 QueryBody::Compound(comp) => exec_compound_select(db, schema, comp, ctes),
1010 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
1011 SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
1012 ),
1013 }
1014}
1015
1016pub(super) fn exec_query_body_in_txn(
1017 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1018 schema: &SchemaManager,
1019 body: &QueryBody,
1020 ctes: &CteContext,
1021) -> Result<ExecutionResult> {
1022 match body {
1023 QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
1024 QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
1025 QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
1026 QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
1027 QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
1028 }
1029}
1030
1031pub(super) fn exec_query_body_read(
1032 db: &Database,
1033 schema: &SchemaManager,
1034 body: &QueryBody,
1035 ctes: &CteContext,
1036) -> Result<QueryResult> {
1037 match exec_query_body(db, schema, body, ctes)? {
1038 ExecutionResult::Query(qr) => Ok(qr),
1039 _ => Ok(QueryResult {
1040 columns: vec![],
1041 rows: vec![],
1042 }),
1043 }
1044}
1045
1046pub(super) fn exec_query_body_write(
1047 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1048 schema: &SchemaManager,
1049 body: &QueryBody,
1050 ctes: &CteContext,
1051) -> Result<QueryResult> {
1052 match exec_query_body_in_txn(wtx, schema, body, ctes)? {
1053 ExecutionResult::Query(qr) => Ok(qr),
1054 _ => Ok(QueryResult {
1055 columns: vec![],
1056 rows: vec![],
1057 }),
1058 }
1059}
1060
1061pub(super) fn exec_compound_select(
1062 db: &Database,
1063 schema: &SchemaManager,
1064 comp: &CompoundSelect,
1065 ctes: &CteContext,
1066) -> Result<ExecutionResult> {
1067 let left_qr = match exec_query_body(db, schema, &comp.left, ctes)? {
1068 ExecutionResult::Query(qr) => qr,
1069 _ => QueryResult {
1070 columns: vec![],
1071 rows: vec![],
1072 },
1073 };
1074 let right_qr = match exec_query_body(db, schema, &comp.right, ctes)? {
1075 ExecutionResult::Query(qr) => qr,
1076 _ => QueryResult {
1077 columns: vec![],
1078 rows: vec![],
1079 },
1080 };
1081 apply_set_operation(comp, left_qr, right_qr)
1082}
1083
1084pub(super) fn exec_compound_select_in_txn(
1085 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1086 schema: &SchemaManager,
1087 comp: &CompoundSelect,
1088 ctes: &CteContext,
1089) -> Result<ExecutionResult> {
1090 let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
1091 ExecutionResult::Query(qr) => qr,
1092 _ => QueryResult {
1093 columns: vec![],
1094 rows: vec![],
1095 },
1096 };
1097 let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
1098 ExecutionResult::Query(qr) => qr,
1099 _ => QueryResult {
1100 columns: vec![],
1101 rows: vec![],
1102 },
1103 };
1104 apply_set_operation(comp, left_qr, right_qr)
1105}
1106
1107pub(super) fn apply_set_operation(
1108 comp: &CompoundSelect,
1109 left_qr: QueryResult,
1110 right_qr: QueryResult,
1111) -> Result<ExecutionResult> {
1112 if !left_qr.columns.is_empty()
1113 && !right_qr.columns.is_empty()
1114 && left_qr.columns.len() != right_qr.columns.len()
1115 {
1116 return Err(SqlError::CompoundColumnCountMismatch {
1117 left: left_qr.columns.len(),
1118 right: right_qr.columns.len(),
1119 });
1120 }
1121
1122 let columns = left_qr.columns;
1123
1124 let mut rows = match (&comp.op, comp.all) {
1125 (SetOp::Union, true) => {
1126 let total = left_qr.rows.len().saturating_add(right_qr.rows.len());
1127 let mut rows = Vec::with_capacity(total);
1128 rows.extend(left_qr.rows);
1129 rows.extend(right_qr.rows);
1130 rows
1131 }
1132 (SetOp::Union, false) => {
1133 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1134 let mut rows = Vec::new();
1135 for row in left_qr.rows.into_iter().chain(right_qr.rows) {
1136 if !seen.contains(&row) {
1137 seen.insert(row.clone());
1138 rows.push(row);
1139 }
1140 }
1141 rows
1142 }
1143 (SetOp::Intersect, true) => {
1144 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1145 for row in &right_qr.rows {
1146 *right_counts.entry(row.clone()).or_insert(0) += 1;
1147 }
1148 let mut rows = Vec::new();
1149 for row in left_qr.rows {
1150 if let Some(count) = right_counts.get_mut(&row) {
1151 if *count > 0 {
1152 *count -= 1;
1153 rows.push(row);
1154 }
1155 }
1156 }
1157 rows
1158 }
1159 (SetOp::Intersect, false) => {
1160 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1161 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1162 let mut rows = Vec::new();
1163 for row in left_qr.rows {
1164 if right_set.contains(&row) && !seen.contains(&row) {
1165 seen.insert(row.clone());
1166 rows.push(row);
1167 }
1168 }
1169 rows
1170 }
1171 (SetOp::Except, true) => {
1172 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1173 for row in &right_qr.rows {
1174 *right_counts.entry(row.clone()).or_insert(0) += 1;
1175 }
1176 let mut rows = Vec::new();
1177 for row in left_qr.rows {
1178 if let Some(count) = right_counts.get_mut(&row) {
1179 if *count > 0 {
1180 *count -= 1;
1181 continue;
1182 }
1183 }
1184 rows.push(row);
1185 }
1186 rows
1187 }
1188 (SetOp::Except, false) => {
1189 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1190 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1191 let mut rows = Vec::new();
1192 for row in left_qr.rows {
1193 if !right_set.contains(&row) && !seen.contains(&row) {
1194 seen.insert(row.clone());
1195 rows.push(row);
1196 }
1197 }
1198 rows
1199 }
1200 };
1201
1202 if !comp.order_by.is_empty() {
1203 let col_defs: Vec<crate::types::ColumnDef> = columns
1204 .iter()
1205 .enumerate()
1206 .map(|(i, name)| crate::types::ColumnDef {
1207 name: name.clone(),
1208 data_type: crate::types::DataType::Null,
1209 nullable: true,
1210 position: i as u16,
1211 default_expr: None,
1212 default_sql: None,
1213 check_expr: None,
1214 check_sql: None,
1215 check_name: None,
1216 is_with_timezone: false,
1217 generated_expr: None,
1218 generated_sql: None,
1219 generated_kind: None,
1220 collation: crate::types::Collation::Binary,
1221 })
1222 .collect();
1223 sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1224 }
1225
1226 if let Some(ref offset_expr) = comp.offset {
1227 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1228 if offset < rows.len() {
1229 rows = rows.split_off(offset);
1230 } else {
1231 rows.clear();
1232 }
1233 }
1234
1235 if let Some(ref limit_expr) = comp.limit {
1236 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1237 rows.truncate(limit);
1238 }
1239
1240 Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1241}
1242
1243struct InsertBufs {
1244 row: Vec<Value>,
1245 pk_values: Vec<Value>,
1246 value_values: Vec<Value>,
1247 key_buf: Vec<u8>,
1248 value_buf: Vec<u8>,
1249 col_indices: Vec<usize>,
1250 fk_key_buf: Vec<u8>,
1251}
1252
1253impl InsertBufs {
1254 fn new() -> Self {
1255 Self {
1256 row: Vec::new(),
1257 pk_values: Vec::new(),
1258 value_values: Vec::new(),
1259 key_buf: Vec::with_capacity(64),
1260 value_buf: Vec::with_capacity(256),
1261 col_indices: Vec::new(),
1262 fk_key_buf: Vec::with_capacity(64),
1263 }
1264 }
1265}
1266
1267thread_local! {
1268 static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1269 static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1270}
1271
1272fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1273 INSERT_SCRATCH.with(|slot| match slot.try_borrow_mut() {
1274 Ok(mut borrowed) => f(&mut borrowed),
1275 Err(_) => {
1276 let mut local = InsertBufs::new();
1277 f(&mut local)
1278 }
1279 })
1280}
1281
1282pub(super) struct UpsertBufs {
1283 old_row: Vec<Value>,
1284 new_row: Vec<Value>,
1285 value_values: Vec<Value>,
1286 new_value_buf: Vec<u8>,
1287}
1288
1289impl UpsertBufs {
1290 pub(super) fn new() -> Self {
1291 Self {
1292 old_row: Vec::new(),
1293 new_row: Vec::new(),
1294 value_values: Vec::new(),
1295 new_value_buf: Vec::with_capacity(256),
1296 }
1297 }
1298}
1299
1300pub fn exec_insert_in_txn(
1301 wtx: &mut WriteTxn<'_>,
1302 schema: &SchemaManager,
1303 stmt: &InsertStmt,
1304 params: &[Value],
1305) -> Result<ExecutionResult> {
1306 with_insert_scratch(|bufs| {
1307 exec_insert_in_txn_impl(
1308 wtx,
1309 schema,
1310 stmt,
1311 params,
1312 bufs,
1313 None,
1314 &CteContext::default(),
1315 )
1316 })
1317}
1318
1319pub(super) fn exec_insert_in_txn_with_ctes(
1320 wtx: &mut WriteTxn<'_>,
1321 schema: &SchemaManager,
1322 stmt: &InsertStmt,
1323 params: &[Value],
1324 outer_ctes: &CteContext,
1325) -> Result<ExecutionResult> {
1326 with_insert_scratch(|bufs| {
1327 exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1328 })
1329}
1330
1331fn exec_insert_in_txn_cached(
1332 wtx: &mut WriteTxn<'_>,
1333 schema: &SchemaManager,
1334 stmt: &InsertStmt,
1335 params: &[Value],
1336 cache: &InsertCache,
1337) -> Result<ExecutionResult> {
1338 with_insert_scratch(|bufs| {
1339 exec_insert_in_txn_impl(
1340 wtx,
1341 schema,
1342 stmt,
1343 params,
1344 bufs,
1345 Some(cache),
1346 &CteContext::default(),
1347 )
1348 })
1349}
1350
1351fn exec_insert_in_txn_impl(
1352 wtx: &mut WriteTxn<'_>,
1353 schema: &SchemaManager,
1354 stmt: &InsertStmt,
1355 params: &[Value],
1356 bufs: &mut InsertBufs,
1357 cache: Option<&InsertCache>,
1358 outer_ctes: &CteContext,
1359) -> Result<ExecutionResult> {
1360 let empty_ctes = CteContext::default();
1361 let materialized;
1362 let has_sub = match cache {
1363 Some(c) => c.has_subquery,
1364 None => insert_has_subquery(stmt),
1365 };
1366 let stmt = if has_sub {
1367 materialized = materialize_insert(stmt, &mut |sub| {
1368 exec_subquery_write(wtx, schema, sub, &empty_ctes)
1369 })?;
1370 &materialized
1371 } else {
1372 stmt
1373 };
1374
1375 let view_lookup_key = stmt.table.to_ascii_lowercase();
1376 if let Some(view_def) = schema.get_view(&view_lookup_key) {
1377 if super::triggers::has_instead_of(
1378 schema,
1379 &view_lookup_key,
1380 super::triggers::FireEvent::Insert,
1381 ) {
1382 let aliases = view_def.column_aliases.clone();
1383 return exec_instead_of_view_insert_in_txn(
1384 wtx,
1385 schema,
1386 &view_lookup_key,
1387 &aliases,
1388 stmt,
1389 params,
1390 );
1391 }
1392 return Err(SqlError::CannotModifyView(stmt.table.clone()));
1393 }
1394
1395 let table_schema = schema
1396 .get(&stmt.table)
1397 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1398
1399 let default_columns;
1400 let insert_columns: &[String] = if stmt.columns.is_empty() {
1401 default_columns = table_schema
1402 .columns
1403 .iter()
1404 .map(|c| c.name.clone())
1405 .collect::<Vec<_>>();
1406 &default_columns
1407 } else {
1408 &stmt.columns
1409 };
1410
1411 bufs.col_indices.clear();
1412 if let Some(c) = cache {
1413 bufs.col_indices.extend_from_slice(&c.col_indices);
1414 } else {
1415 for name in insert_columns {
1416 bufs.col_indices.push(
1417 table_schema
1418 .column_index(name)
1419 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1420 );
1421 }
1422 }
1423
1424 if cache.is_none() {
1425 for &ci in &bufs.col_indices {
1426 if table_schema.columns[ci].generated_kind.is_some() {
1427 return Err(SqlError::CannotInsertIntoGeneratedColumn(
1428 table_schema.columns[ci].name.clone(),
1429 ));
1430 }
1431 }
1432 }
1433
1434 let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1435 let cached_gen_positions: &[usize];
1436 let cached_gen_fast_evals: &[FastGenEval];
1437 if let Some(c) = cache {
1438 cached_gen_positions = &c.generated_col_positions;
1439 cached_gen_fast_evals = &c.generated_fast_evals;
1440 generated_cols_uncached = Vec::new();
1441 } else {
1442 cached_gen_positions = &[];
1443 cached_gen_fast_evals = &[];
1444 generated_cols_uncached = table_schema
1445 .columns
1446 .iter()
1447 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1448 .map(|c| {
1449 let expr = c.generated_expr.as_ref().unwrap();
1450 let fe = detect_fast_gen_eval(expr, table_schema);
1451 (c.position as usize, expr, fe)
1452 })
1453 .collect();
1454 }
1455 let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1456 let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1457 None
1458 } else {
1459 Some(ColumnMap::new(&table_schema.columns))
1460 };
1461 let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1462 None
1463 } else if let Some(c) = cache {
1464 c.row_col_map.as_ref()
1465 } else {
1466 row_col_map_for_gen_owned.as_ref()
1467 };
1468
1469 let any_defaults = match cache {
1470 Some(c) => c.any_defaults,
1471 None => table_schema
1472 .columns
1473 .iter()
1474 .any(|c| c.default_expr.is_some()),
1475 };
1476 let defaults: Vec<(usize, &Expr)> = if any_defaults {
1477 table_schema
1478 .columns
1479 .iter()
1480 .filter(|c| {
1481 c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1482 })
1483 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1484 .collect()
1485 } else {
1486 Vec::new()
1487 };
1488
1489 let has_checks = match cache {
1490 Some(c) => c.has_checks,
1491 None => table_schema.has_checks(),
1492 };
1493 let check_col_map = if has_checks {
1494 Some(ColumnMap::new(&table_schema.columns))
1495 } else {
1496 None
1497 };
1498
1499 let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1500 &[usize],
1501 &[usize],
1502 &[u16],
1503 usize,
1504 &[u16],
1505 ) = if let Some(c) = cache {
1506 (
1507 &c.pk_indices,
1508 &c.non_pk_indices,
1509 &c.encoding_positions,
1510 c.phys_count,
1511 &c.dropped_non_pk_slots,
1512 )
1513 } else {
1514 (
1515 table_schema.pk_indices(),
1516 table_schema.non_pk_indices(),
1517 table_schema.encoding_positions(),
1518 table_schema.physical_non_pk_count(),
1519 table_schema.dropped_non_pk_slots(),
1520 )
1521 };
1522
1523 bufs.row.resize(table_schema.columns.len(), Value::Null);
1524 bufs.pk_values.resize(pk_indices.len(), Value::Null);
1525 bufs.value_values.resize(phys_count, Value::Null);
1526
1527 let table_bytes = table_schema.name.as_bytes();
1528 let has_fks = !table_schema.foreign_keys.is_empty();
1529 let has_indices = !table_schema.indices.is_empty();
1530 let has_defaults = !defaults.is_empty();
1531
1532 let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1533 (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1534 (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1535 (_, None) => None,
1536 };
1537
1538 let row_col_map_owned: Option<ColumnMap> =
1539 if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1540 Some(ColumnMap::new(&table_schema.columns))
1541 } else {
1542 None
1543 };
1544 let row_col_map: Option<&ColumnMap> = cache
1545 .and_then(|c| c.row_col_map.as_ref())
1546 .or(row_col_map_owned.as_ref());
1547
1548 let select_rows = match &stmt.source {
1549 InsertSource::Select(sq) => {
1550 let insert_ctes = super::materialize_all_ctes_with_outer(
1551 &sq.ctes,
1552 sq.recursive,
1553 outer_ctes,
1554 &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1555 )?;
1556 let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1557 Some(qr.rows)
1558 }
1559 InsertSource::Values(_) => None,
1560 };
1561
1562 let mut count: u64 = 0;
1563 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1564 stmt.returning.as_ref().map(|_| Vec::new());
1565
1566 let values = match &stmt.source {
1567 InsertSource::Values(rows) => Some(rows.as_slice()),
1568 InsertSource::Select(_) => None,
1569 };
1570 let sel_rows = select_rows.as_deref();
1571
1572 let total = match (values, sel_rows) {
1573 (Some(rows), _) => rows.len(),
1574 (_, Some(rows)) => rows.len(),
1575 _ => 0,
1576 };
1577
1578 if let Some(sel) = sel_rows {
1579 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1580 return Err(SqlError::InvalidValue(format!(
1581 "INSERT ... SELECT column count mismatch: expected {}, got {}",
1582 insert_columns.len(),
1583 sel[0].len()
1584 )));
1585 }
1586 }
1587
1588 let has_insert_statement_triggers_impl =
1589 schema.triggers_for(&table_schema.name).iter().any(|t| {
1590 t.enabled
1591 && t.granularity == crate::parser::TriggerGranularity::ForEachStatement
1592 && t.events
1593 .iter()
1594 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1595 });
1596 let mut stmt_new_rows_impl: Vec<Vec<Value>> = if has_insert_statement_triggers_impl {
1597 Vec::with_capacity(total)
1598 } else {
1599 Vec::new()
1600 };
1601 if has_insert_statement_triggers_impl {
1602 super::triggers::fire_statement_triggers(
1603 wtx,
1604 schema,
1605 &table_schema.name,
1606 crate::parser::TriggerTiming::Before,
1607 super::triggers::FireEvent::Insert,
1608 &table_schema.columns,
1609 &[],
1610 &[],
1611 )?;
1612 }
1613
1614 let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1615 for idx in 0..total {
1616 if !skip_row_clear {
1617 for v in bufs.row.iter_mut() {
1618 *v = Value::Null;
1619 }
1620 }
1621
1622 if let Some(value_rows) = values {
1623 if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1624 for action in plan {
1625 match action {
1626 BindAction::Param {
1627 param_idx,
1628 col_idx,
1629 target,
1630 } => {
1631 let v = ¶ms[*param_idx];
1632 bufs.row[*col_idx] = if v.is_null() {
1633 Value::Null
1634 } else if v.data_type() == *target {
1635 v.clone()
1636 } else {
1637 let got = v.data_type();
1638 v.clone().coerce_into(*target).ok_or_else(|| {
1639 SqlError::TypeMismatch {
1640 expected: target.to_string(),
1641 got: got.to_string(),
1642 }
1643 })?
1644 };
1645 }
1646 BindAction::Literal { value, col_idx } => {
1647 bufs.row[*col_idx] = value.clone();
1648 }
1649 }
1650 }
1651 } else {
1652 let value_row = &value_rows[idx];
1653 if value_row.len() != insert_columns.len() {
1654 return Err(SqlError::InvalidValue(format!(
1655 "expected {} values, got {}",
1656 insert_columns.len(),
1657 value_row.len()
1658 )));
1659 }
1660 for (i, expr) in value_row.iter().enumerate() {
1661 let val = match expr {
1662 Expr::Parameter(n) => params
1663 .get(n - 1)
1664 .cloned()
1665 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1666 Expr::Literal(v) => v.clone(),
1667 _ => eval_const_expr(expr)?,
1668 };
1669 let col_idx = bufs.col_indices[i];
1670 let col = &table_schema.columns[col_idx];
1671 let got_type = val.data_type();
1672 bufs.row[col_idx] = if val.is_null() {
1673 Value::Null
1674 } else {
1675 val.coerce_into(col.data_type)
1676 .ok_or_else(|| SqlError::TypeMismatch {
1677 expected: col.data_type.to_string(),
1678 got: got_type.to_string(),
1679 })?
1680 };
1681 }
1682 }
1683 } else if let Some(sel) = sel_rows {
1684 let sel_row = &sel[idx];
1685 for (i, val) in sel_row.iter().enumerate() {
1686 let col_idx = bufs.col_indices[i];
1687 let col = &table_schema.columns[col_idx];
1688 let got_type = val.data_type();
1689 bufs.row[col_idx] = if val.is_null() {
1690 Value::Null
1691 } else {
1692 val.clone().coerce_into(col.data_type).ok_or_else(|| {
1693 SqlError::TypeMismatch {
1694 expected: col.data_type.to_string(),
1695 got: got_type.to_string(),
1696 }
1697 })?
1698 };
1699 }
1700 }
1701
1702 if has_defaults {
1703 for &(pos, def_expr) in &defaults {
1704 let val = eval_const_expr(def_expr)?;
1705 let col = &table_schema.columns[pos];
1706 if !val.is_null() {
1707 let got_type = val.data_type();
1708 bufs.row[pos] =
1709 val.coerce_into(col.data_type)
1710 .ok_or_else(|| SqlError::TypeMismatch {
1711 expected: col.data_type.to_string(),
1712 got: got_type.to_string(),
1713 })?;
1714 }
1715 }
1716 }
1717
1718 if let Some(gen_map) = row_col_map_for_gen {
1719 if cache.is_some() {
1720 for (pos, fast) in cached_gen_positions
1721 .iter()
1722 .copied()
1723 .zip(cached_gen_fast_evals.iter())
1724 {
1725 let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1726 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1727 let col = &table_schema.columns[pos];
1728 bufs.row[pos] = if val.is_null() {
1729 Value::Null
1730 } else {
1731 let got_type = val.data_type();
1732 val.coerce_into(col.data_type)
1733 .ok_or_else(|| SqlError::TypeMismatch {
1734 expected: col.data_type.to_string(),
1735 got: got_type.to_string(),
1736 })?
1737 };
1738 }
1739 } else {
1740 for (pos, gen_expr, fast) in &generated_cols_uncached {
1741 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1742 let col = &table_schema.columns[*pos];
1743 bufs.row[*pos] = if val.is_null() {
1744 Value::Null
1745 } else {
1746 let got_type = val.data_type();
1747 val.coerce_into(col.data_type)
1748 .ok_or_else(|| SqlError::TypeMismatch {
1749 expected: col.data_type.to_string(),
1750 got: got_type.to_string(),
1751 })?
1752 };
1753 }
1754 }
1755 }
1756
1757 if let Some(c) = cache {
1758 for &pos in &c.not_null_indices {
1759 if bufs.row[pos as usize].is_null() {
1760 return Err(SqlError::NotNullViolation(
1761 table_schema.columns[pos as usize].name.clone(),
1762 ));
1763 }
1764 }
1765 } else {
1766 for col in &table_schema.columns {
1767 if !col.nullable && bufs.row[col.position as usize].is_null() {
1768 return Err(SqlError::NotNullViolation(col.name.clone()));
1769 }
1770 }
1771 }
1772
1773 if let Some(ref col_map) = check_col_map {
1774 for col in &table_schema.columns {
1775 if let Some(ref check) = col.check_expr {
1776 let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1777 if !is_truthy(&result) && !result.is_null() {
1778 let name = col.check_name.as_deref().unwrap_or(&col.name);
1779 return Err(SqlError::CheckViolation(name.to_string()));
1780 }
1781 }
1782 }
1783 for tc in &table_schema.check_constraints {
1784 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1785 if !is_truthy(&result) && !result.is_null() {
1786 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1787 return Err(SqlError::CheckViolation(name.to_string()));
1788 }
1789 }
1790 }
1791
1792 if has_fks {
1793 for fk in &table_schema.foreign_keys {
1794 let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1795 if any_null {
1796 continue;
1797 }
1798 crate::encoding::encode_composite_key_from_indices(
1799 &fk.columns,
1800 &bufs.row,
1801 &mut bufs.fk_key_buf,
1802 );
1803 if fk.deferrable && fk.initially_deferred {
1804 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
1805 wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
1806 fk_name: name,
1807 foreign_table: fk.foreign_table.as_bytes().to_vec(),
1808 parent_key: bufs.fk_key_buf.clone(),
1809 });
1810 continue;
1811 }
1812 if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &bufs.fk_key_buf) {
1813 let found = wtx
1814 .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1815 .map_err(SqlError::Storage)?;
1816 if found.is_none() {
1817 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1818 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1819 }
1820 wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &bufs.fk_key_buf);
1821 }
1822 }
1823 }
1824
1825 let proposed_row_for_returning: Option<Vec<Value>> =
1826 returning_rows.as_ref().map(|_| bufs.row.clone());
1827 let row_for_stmt_trigger_impl: Option<Vec<Value>> = if has_insert_statement_triggers_impl {
1828 Some(bufs.row.clone())
1829 } else {
1830 None
1831 };
1832
1833 let has_before_insert_triggers = schema.triggers_for(&table_schema.name).iter().any(|t| {
1834 t.enabled
1835 && t.timing == crate::parser::TriggerTiming::Before
1836 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1837 && t.events
1838 .iter()
1839 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1840 });
1841 if has_before_insert_triggers {
1842 super::triggers::fire_row_triggers(
1843 wtx,
1844 schema,
1845 &table_schema.name,
1846 crate::parser::TriggerTiming::Before,
1847 super::triggers::FireEvent::Insert,
1848 None,
1849 Some(bufs.row.clone()),
1850 &table_schema.columns,
1851 )?;
1852 }
1853
1854 for (j, &i) in pk_indices.iter().enumerate() {
1855 bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1856 }
1857 match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1858 true => match bufs.pk_values[0] {
1859 Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1860 _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1861 },
1862 false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1863 }
1864
1865 for &slot in dropped {
1866 bufs.value_values[slot as usize] = Value::Null;
1867 }
1868 for (j, &i) in non_pk.iter().enumerate() {
1869 let col = &table_schema.columns[i];
1870 if matches!(
1871 col.generated_kind,
1872 Some(crate::parser::GeneratedKind::Virtual)
1873 ) {
1874 bufs.value_values[enc_pos[j] as usize] = Value::Null;
1875 bufs.row[i] = Value::Null;
1876 } else {
1877 bufs.value_values[enc_pos[j] as usize] =
1878 std::mem::replace(&mut bufs.row[i], Value::Null);
1879 }
1880 }
1881 match cache.and_then(|c| c.row_encoder.as_ref()) {
1882 Some(tmpl) => crate::encoding::encode_int_row_with_template(
1883 tmpl,
1884 &bufs.value_values,
1885 &mut bufs.value_buf,
1886 )?,
1887 None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1888 }
1889
1890 if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1891 return Err(SqlError::KeyTooLarge {
1892 size: bufs.key_buf.len(),
1893 max: citadel_core::MAX_KEY_SIZE,
1894 });
1895 }
1896 if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1897 return Err(SqlError::RowTooLarge {
1898 size: bufs.value_buf.len(),
1899 max: citadel_core::MAX_VALUE_SIZE,
1900 });
1901 }
1902
1903 match compiled_conflict.as_ref() {
1904 None => {
1905 let is_new = wtx
1906 .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1907 .map_err(SqlError::Storage)?;
1908 if !is_new {
1909 return Err(SqlError::DuplicateKey);
1910 }
1911 let has_after_insert_triggers =
1912 schema.triggers_for(&table_schema.name).iter().any(|t| {
1913 t.enabled
1914 && t.timing == crate::parser::TriggerTiming::After
1915 && t.granularity == crate::parser::TriggerGranularity::ForEachRow
1916 && t.events
1917 .iter()
1918 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1919 });
1920 if has_indices || has_after_insert_triggers {
1921 for (j, &i) in pk_indices.iter().enumerate() {
1922 bufs.row[i] = bufs.pk_values[j].clone();
1923 }
1924 for (j, &i) in non_pk.iter().enumerate() {
1925 bufs.row[i] = std::mem::replace(
1926 &mut bufs.value_values[enc_pos[j] as usize],
1927 Value::Null,
1928 );
1929 }
1930 if has_indices {
1931 insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1932 }
1933 if has_after_insert_triggers {
1934 super::triggers::fire_row_triggers(
1935 wtx,
1936 schema,
1937 &table_schema.name,
1938 crate::parser::TriggerTiming::After,
1939 super::triggers::FireEvent::Insert,
1940 None,
1941 Some(bufs.row.clone()),
1942 &table_schema.columns,
1943 )?;
1944 }
1945 }
1946 if let Some(r) = row_for_stmt_trigger_impl.clone() {
1947 stmt_new_rows_impl.push(r);
1948 }
1949 count += 1;
1950 if let Some(buf) = returning_rows.as_mut() {
1951 buf.push((None, proposed_row_for_returning));
1952 }
1953 }
1954 Some(oc) => {
1955 let oc_ref: &CompiledOnConflict = oc;
1956 let needs_row = upsert_needs_row(oc_ref, table_schema);
1957 if needs_row {
1958 for (j, &i) in pk_indices.iter().enumerate() {
1959 bufs.row[i] = bufs.pk_values[j].clone();
1960 }
1961 for (j, &i) in non_pk.iter().enumerate() {
1962 bufs.row[i] = std::mem::replace(
1963 &mut bufs.value_values[enc_pos[j] as usize],
1964 Value::Null,
1965 );
1966 }
1967 }
1968 let outcome = apply_insert_with_conflict(
1969 wtx,
1970 table_schema,
1971 &bufs.key_buf,
1972 &bufs.value_buf,
1973 &bufs.row,
1974 &bufs.pk_values,
1975 oc_ref,
1976 row_col_map.unwrap(),
1977 stmt.returning.is_some(),
1978 )?;
1979 match outcome {
1980 InsertRowOutcome::Inserted => {
1981 count += 1;
1982 if let Some(buf) = returning_rows.as_mut() {
1983 buf.push((None, proposed_row_for_returning));
1984 }
1985 if let Some(r) = row_for_stmt_trigger_impl.clone() {
1986 stmt_new_rows_impl.push(r);
1987 }
1988 let has_after_insert_triggers =
1989 schema.triggers_for(&table_schema.name).iter().any(|t| {
1990 t.enabled
1991 && t.timing == crate::parser::TriggerTiming::After
1992 && t.granularity
1993 == crate::parser::TriggerGranularity::ForEachRow
1994 && t.events
1995 .iter()
1996 .any(|e| matches!(e, crate::parser::TriggerEvent::Insert))
1997 });
1998 if has_after_insert_triggers {
1999 super::triggers::fire_row_triggers(
2000 wtx,
2001 schema,
2002 &table_schema.name,
2003 crate::parser::TriggerTiming::After,
2004 super::triggers::FireEvent::Insert,
2005 None,
2006 Some(bufs.row.clone()),
2007 &table_schema.columns,
2008 )?;
2009 }
2010 }
2011 InsertRowOutcome::Updated { old, new } => {
2012 count += 1;
2013 if let Some(buf) = returning_rows.as_mut() {
2014 buf.push((Some(old.clone()), Some(new.clone())));
2015 }
2016 let has_after_update_triggers =
2017 schema.triggers_for(&table_schema.name).iter().any(|t| {
2018 t.enabled
2019 && t.timing == crate::parser::TriggerTiming::After
2020 && t.granularity
2021 == crate::parser::TriggerGranularity::ForEachRow
2022 && t.events.iter().any(|e| {
2023 matches!(e, crate::parser::TriggerEvent::Update(_))
2024 })
2025 });
2026 if has_after_update_triggers {
2027 let changed_cols: Vec<String> = match oc_ref {
2028 CompiledOnConflict::DoUpdate { assignments, .. } => assignments
2029 .iter()
2030 .map(|(col_idx, _)| table_schema.columns[*col_idx].name.clone())
2031 .collect(),
2032 _ => Vec::new(),
2033 };
2034 super::triggers::fire_row_triggers(
2035 wtx,
2036 schema,
2037 &table_schema.name,
2038 crate::parser::TriggerTiming::After,
2039 super::triggers::FireEvent::Update {
2040 changed_columns: &changed_cols,
2041 },
2042 Some(old),
2043 Some(new),
2044 &table_schema.columns,
2045 )?;
2046 }
2047 }
2048 InsertRowOutcome::Skipped => {}
2049 }
2050 }
2051 }
2052 }
2053
2054 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
2055 if has_insert_statement_triggers_impl {
2056 super::triggers::fire_statement_triggers(
2057 wtx,
2058 schema,
2059 &table_schema.name,
2060 crate::parser::TriggerTiming::After,
2061 super::triggers::FireEvent::Insert,
2062 &table_schema.columns,
2063 &[],
2064 &stmt_new_rows_impl,
2065 )?;
2066 }
2067 return Ok(ExecutionResult::Query(super::helpers::project_returning(
2068 table_schema,
2069 returning_cols,
2070 &rows,
2071 )?));
2072 }
2073
2074 if has_insert_statement_triggers_impl {
2075 super::triggers::fire_statement_triggers(
2076 wtx,
2077 schema,
2078 &table_schema.name,
2079 crate::parser::TriggerTiming::After,
2080 super::triggers::FireEvent::Insert,
2081 &table_schema.columns,
2082 &[],
2083 &stmt_new_rows_impl,
2084 )?;
2085 }
2086
2087 Ok(ExecutionResult::RowsAffected(count))
2088}
2089
2090pub struct CompiledInsert {
2091 table_lower: String,
2092 cached: Option<InsertCache>,
2093}
2094
2095struct InsertCache {
2096 col_indices: Vec<usize>,
2097 has_subquery: bool,
2098 any_defaults: bool,
2099 has_checks: bool,
2100 on_conflict: Option<Arc<CompiledOnConflict>>,
2101 row_col_map: Option<ColumnMap>,
2102 generated_col_positions: Vec<usize>,
2103 generated_fast_evals: Vec<FastGenEval>,
2104 pk_indices: Vec<usize>,
2105 non_pk_indices: Vec<usize>,
2106 encoding_positions: Vec<u16>,
2107 dropped_non_pk_slots: Vec<u16>,
2108 phys_count: usize,
2109 single_int_pk: bool,
2110 not_null_indices: Vec<u16>,
2111 bind_plan: Option<Vec<BindAction>>,
2112 row_fully_overwritten: bool,
2113 row_encoder: Option<crate::encoding::IntRowTemplate>,
2114 is_trivial_fast: bool,
2115 trivial_fast_program: Option<TrivialFastProgram>,
2116 needs_scoped_params: bool,
2117}
2118
2119#[derive(Clone)]
2120enum BindAction {
2121 Param {
2122 param_idx: usize,
2123 col_idx: usize,
2124 target: DataType,
2125 },
2126 Literal {
2127 value: Value,
2128 col_idx: usize,
2129 },
2130}
2131
2132#[derive(Clone)]
2133struct TrivialFastProgram {
2134 template: Vec<u8>,
2135 ops: Vec<WriteOp>,
2136 pk_param: u8,
2137 not_null_param_indices: Vec<u8>,
2138}
2139
/// A single operation of a [`TrivialFastProgram`].
///
/// `off` is the encoder's offset for the destination column's slot.
/// `bitmap_byte_off` / `bitmap_bit_mask` are computed from the slot as
/// `2 + slot / 8` and `1 << (slot % 8)`.
///
/// NOTE(review): how these are applied to the template is not visible here;
/// presumably `off` is a byte offset and the bitmap fields address the
/// slot's null bit — confirm against the executor.
#[derive(Clone)]
enum WriteOp {
    /// An integer parameter bound directly to a column.
    ParamI64 {
        param_idx: u8,
        off: u32,
    },
    /// A constant integer for a column.
    LiteralI64 {
        value: i64,
        off: u32,
    },
    /// A generated column defined as the sum of two parameter-bound columns.
    GenAddParamsI64 {
        a_param: u8,
        b_param: u8,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
    /// A generated column defined from one parameter-bound column together
    /// with the constants `mul` and `add`.
    GenMulAddParamI64 {
        param_idx: u8,
        mul: i64,
        add: i64,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
}
2166
2167fn build_trivial_fast_program(
2168 bind_plan: &[BindAction],
2169 row_encoder: &crate::encoding::IntRowTemplate,
2170 non_virtual_pairs: &[(usize, usize)],
2171 generated_col_positions: &[usize],
2172 generated_fast_evals: &[FastGenEval],
2173 pk_indices: &[usize],
2174 columns: &[crate::types::ColumnDef],
2175) -> Option<TrivialFastProgram> {
2176 let pk_col = pk_indices[0];
2177 let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
2178 non_virtual_pairs.iter().copied().collect();
2179 let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
2180 row_encoder.slot_offsets.iter().copied().collect();
2181
2182 let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
2183 let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
2184 let mut pk_param: Option<u8> = None;
2185 let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
2186 let mut not_null_param_indices: Vec<u8> = Vec::new();
2187
2188 for action in bind_plan {
2189 match action {
2190 BindAction::Param {
2191 param_idx,
2192 col_idx,
2193 target,
2194 } => {
2195 if *target != DataType::Integer {
2196 return None;
2197 }
2198 let pi: u8 = u8::try_from(*param_idx).ok()?;
2199 col_to_param.insert(*col_idx, pi);
2200 if *col_idx == pk_col {
2201 pk_param = Some(pi);
2202 } else {
2203 let slot = *col_to_slot.get(col_idx)?;
2204 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2205 ops.push(WriteOp::ParamI64 { param_idx: pi, off });
2206 if !columns[*col_idx].nullable {
2207 not_null_param_indices.push(pi);
2208 }
2209 }
2210 }
2211 BindAction::Literal { value, col_idx } => match value {
2212 Value::Integer(v) => {
2213 col_to_lit_int.insert(*col_idx, *v);
2214 if *col_idx == pk_col {
2215 return None;
2216 }
2217 let slot = *col_to_slot.get(col_idx)?;
2218 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
2219 ops.push(WriteOp::LiteralI64 { value: *v, off });
2220 }
2221 _ => return None,
2222 },
2223 }
2224 }
2225
2226 let pk_param = pk_param?;
2227
2228 for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
2229 let gen_slot = *col_to_slot.get(&gen_pos)?;
2230 let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
2231 let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
2232 let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
2233 let gen_col_nullable = columns[gen_pos].nullable;
2234
2235 match &generated_fast_evals[i] {
2236 FastGenEval::IntColAddCol {
2237 left_idx,
2238 right_idx,
2239 } => {
2240 let a_param = col_to_param.get(left_idx).copied();
2241 let b_param = col_to_param.get(right_idx).copied();
2242 match (a_param, b_param) {
2243 (Some(ap), Some(bp)) => {
2244 let deps_safe = gen_col_nullable
2245 || (not_null_param_indices.contains(&ap)
2246 && not_null_param_indices.contains(&bp));
2247 if !deps_safe {
2248 return None;
2249 }
2250 ops.push(WriteOp::GenAddParamsI64 {
2251 a_param: ap,
2252 b_param: bp,
2253 off: gen_off,
2254 bitmap_byte_off,
2255 bitmap_bit_mask,
2256 });
2257 }
2258 (Some(p), None) => {
2259 let lit = col_to_lit_int.get(right_idx).copied()?;
2260 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2261 return None;
2262 }
2263 ops.push(WriteOp::GenMulAddParamI64 {
2264 param_idx: p,
2265 mul: 1,
2266 add: lit,
2267 off: gen_off,
2268 bitmap_byte_off,
2269 bitmap_bit_mask,
2270 });
2271 }
2272 (None, Some(p)) => {
2273 let lit = col_to_lit_int.get(left_idx).copied()?;
2274 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2275 return None;
2276 }
2277 ops.push(WriteOp::GenMulAddParamI64 {
2278 param_idx: p,
2279 mul: 1,
2280 add: lit,
2281 off: gen_off,
2282 bitmap_byte_off,
2283 bitmap_bit_mask,
2284 });
2285 }
2286 (None, None) => {
2287 let la = col_to_lit_int.get(left_idx).copied()?;
2288 let lb = col_to_lit_int.get(right_idx).copied()?;
2289 ops.push(WriteOp::LiteralI64 {
2290 value: la.wrapping_add(lb),
2291 off: gen_off,
2292 });
2293 }
2294 }
2295 }
2296 FastGenEval::IntColMulAdd {
2297 col_schema_idx,
2298 mul,
2299 add,
2300 } => {
2301 if let Some(p) = col_to_param.get(col_schema_idx).copied() {
2302 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
2303 return None;
2304 }
2305 ops.push(WriteOp::GenMulAddParamI64 {
2306 param_idx: p,
2307 mul: *mul,
2308 add: *add,
2309 off: gen_off,
2310 bitmap_byte_off,
2311 bitmap_bit_mask,
2312 });
2313 } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
2314 ops.push(WriteOp::LiteralI64 {
2315 value: lit.wrapping_mul(*mul).wrapping_add(*add),
2316 off: gen_off,
2317 });
2318 } else {
2319 return None;
2320 }
2321 }
2322 FastGenEval::None => return None,
2323 }
2324 }
2325
2326 Some(TrivialFastProgram {
2327 template: row_encoder.template.clone(),
2328 ops,
2329 pk_param,
2330 not_null_param_indices,
2331 })
2332}
2333
/// Planned form of an `ON CONFLICT` clause, as produced by `compile_on_conflict`.
#[derive(Clone)]
pub(super) enum CompiledOnConflict {
    /// `DO NOTHING`: a conflicting row is skipped instead of raising an error.
    DoNothing {
        /// Constraint the clause is restricted to; `None` when the statement
        /// named no conflict target.
        target: Option<ConflictKind>,
    },
    /// `DO UPDATE`: the existing row is rewritten when the target conflicts.
    DoUpdate {
        /// Constraint whose violation triggers the update.
        target: ConflictKind,
        /// `(column index, value expression)` pairs; column names have already
        /// been resolved against the table schema.
        assignments: Vec<(usize, Expr)>,
        /// Optional predicate; the row is skipped when it evaluates to NULL
        /// or to a non-truthy value.
        where_clause: Option<Expr>,
        /// Byte-level patch recipe from `detect_fast_paths`; only populated
        /// when the clause has no `WHERE` predicate.
        fast_paths: Option<Vec<DoUpdateFastPath>>,
    },
}
2346
/// Assignment shapes that can be applied by patching the encoded row bytes
/// instead of decoding the row and evaluating expressions.
#[derive(Clone, Copy)]
pub(super) enum DoUpdateFastPath {
    /// `col = col + literal` / `col = col - literal` on an integer column.
    /// `phys_idx` is the column's slot in the encoded value; `delta` is the
    /// signed amount added (already negated for the subtraction form).
    IntAddConst { phys_idx: usize, delta: i64 },
}
2351
/// The uniqueness constraint an `ON CONFLICT` target resolved to.
#[derive(Clone, Debug)]
pub(super) enum ConflictKind {
    /// The table's primary key.
    PrimaryKey,
    /// A unique index; `index_idx` is its position in `TableSchema::indices`.
    UniqueIndex { index_idx: usize },
}
2357
2358fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
2359 match target {
2360 ConflictTarget::Columns(cols) => {
2361 let col_idx_set: Vec<u16> = cols
2362 .iter()
2363 .map(|name| {
2364 ts.column_index(name)
2365 .map(|i| i as u16)
2366 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2367 })
2368 .collect::<Result<_>>()?;
2369 let pk_set = ts.primary_key_columns.clone();
2370 if set_equal(&col_idx_set, &pk_set) {
2371 return Ok(ConflictKind::PrimaryKey);
2372 }
2373 for (index_idx, idx) in ts.indices.iter().enumerate() {
2374 if idx.unique && set_equal(&col_idx_set, &idx.columns_vec()) {
2375 return Ok(ConflictKind::UniqueIndex { index_idx });
2376 }
2377 }
2378 Err(SqlError::Plan(
2379 "ON CONFLICT target does not match any unique constraint".into(),
2380 ))
2381 }
2382 ConflictTarget::Constraint(name) => {
2383 let lower = name.to_ascii_lowercase();
2384 for (index_idx, idx) in ts.indices.iter().enumerate() {
2385 if idx.name.eq_ignore_ascii_case(&lower) {
2386 if idx.unique {
2387 return Ok(ConflictKind::UniqueIndex { index_idx });
2388 }
2389 return Err(SqlError::Plan(format!(
2390 "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2391 )));
2392 }
2393 }
2394 Err(SqlError::Plan(format!(
2395 "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2396 )))
2397 }
2398 }
2399}
2400
/// Returns `true` when `a` and `b` hold the same values regardless of order.
///
/// Duplicates are significant: `[1, 1, 2]` and `[1, 2, 2]` are not equal.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    let sorted_copy = |values: &[u16]| -> Vec<u16> {
        let mut owned = values.to_vec();
        owned.sort_unstable();
        owned
    };
    // Cheap length check first; sorting is only needed when the lengths agree.
    a.len() == b.len() && sorted_copy(a) == sorted_copy(b)
}
2411
/// Result of inserting a single row under an `ON CONFLICT` clause.
pub(super) enum InsertRowOutcome {
    /// A row was written. The `DO UPDATE` paths in this file also report this
    /// variant for an update when the caller did not ask for captured rows.
    Inserted,
    /// An existing row was updated; `old` and `new` are the full row images
    /// before and after the update.
    Updated { old: Vec<Value>, new: Vec<Value> },
    /// Nothing was written: `DO NOTHING` hit a conflict, or the `DO UPDATE`
    /// `WHERE` predicate rejected the row.
    Skipped,
}
2417
/// Inserts one already-encoded row, resolving conflicts per `on_conflict`.
///
/// `key_buf` / `value_buf` are the encoded primary key and row value; `row`
/// and `pk_values` are the same data in decoded form (used for index
/// maintenance and for `excluded.*` evaluation in `DO UPDATE`).
///
/// Strategy, in order:
/// 1. `DO NOTHING` on the primary key (or with no target) for a table with no
///    indexes and no foreign keys: a single insert-if-absent.
/// 2. `DO UPDATE` on the primary key when `can_fuse_do_update` allows it: a
///    single fused upsert.
/// 3. Otherwise insert-or-fetch on the primary table, then maintain indexes
///    and dispatch on whichever constraint actually collided.
///
/// # Errors
/// `SqlError::DuplicateKey` / `SqlError::UniqueViolation` when the collision
/// is on a constraint the clause does not target, plus any storage or
/// evaluation error from the helpers.
#[allow(clippy::too_many_arguments)]
#[inline]
pub(super) fn apply_insert_with_conflict(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    key_buf: &[u8],
    value_buf: &[u8],
    row: &[Value],
    pk_values: &[Value],
    on_conflict: &CompiledOnConflict,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    let table_bytes = table_schema.name.as_bytes();

    // Fast path 1: with no secondary indexes the only possible collision is
    // on the primary key, so insert-if-absent fully implements DO NOTHING.
    if let CompiledOnConflict::DoNothing { target } = on_conflict {
        let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
        if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
            let inserted = wtx
                .table_insert_if_absent(table_bytes, key_buf, value_buf)
                .map_err(SqlError::Storage)?;
            return Ok(if inserted {
                InsertRowOutcome::Inserted
            } else {
                InsertRowOutcome::Skipped
            });
        }
    }

    // Fast path 2: primary-key DO UPDATE that can be done as one fused upsert.
    if let CompiledOnConflict::DoUpdate {
        target: ConflictKind::PrimaryKey,
        assignments,
        where_clause,
        fast_paths,
    } = on_conflict
    {
        if can_fuse_do_update(table_schema, assignments) {
            return apply_do_update_fused(
                wtx,
                table_schema,
                table_bytes,
                key_buf,
                value_buf,
                row,
                assignments,
                where_clause.as_ref(),
                col_map,
                fast_paths.as_deref(),
                capture_returning,
            );
        }
    }

    // General path: try the primary insert and get the old bytes back on collision.
    let primary_outcome = wtx
        .table_insert_or_fetch(table_bytes, key_buf, value_buf)
        .map_err(SqlError::Storage)?;

    match primary_outcome {
        citadel_txn::write_txn::InsertOutcome::Inserted => {
            if table_schema.indices.is_empty() {
                return Ok(InsertRowOutcome::Inserted);
            }
            // Index keys written so far, kept so they can be undone if a
            // later unique index collides.
            let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
            match insert_index_entries_or_fetch(
                wtx,
                table_schema,
                row,
                pk_values,
                &mut inserted_keys,
            )? {
                None => Ok(InsertRowOutcome::Inserted),
                Some(conflicting_idx) => {
                    // The clause covers this collision if it is an untargeted
                    // DO NOTHING, or it targets exactly the colliding index.
                    let matches_target =
                        matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
                            || matches!(
                                on_conflict,
                                CompiledOnConflict::DoNothing {
                                    target: Some(ConflictKind::UniqueIndex { index_idx }),
                                } | CompiledOnConflict::DoUpdate {
                                    target: ConflictKind::UniqueIndex { index_idx },
                                    ..
                                } if *index_idx == conflicting_idx
                            );
                    // Roll back the primary row and partial index entries
                    // before either raising or applying the conflict action.
                    undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
                    if !matches_target {
                        return Err(SqlError::UniqueViolation(
                            table_schema.indices[conflicting_idx].name.clone(),
                        ));
                    }
                    match on_conflict {
                        CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
                        CompiledOnConflict::DoUpdate {
                            assignments,
                            where_clause,
                            ..
                        } => {
                            // Find the row that owns the colliding index
                            // entry and update that one.
                            let existing_pk =
                                fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
                            apply_do_update(
                                wtx,
                                table_schema,
                                &existing_pk,
                                row,
                                assignments,
                                where_clause.as_ref(),
                                col_map,
                                capture_returning,
                            )
                        }
                    }
                }
            }
        }
        citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
            // Primary-key collision: only clauses with no target or a
            // primary-key target apply.
            let matches_target = matches!(
                on_conflict,
                CompiledOnConflict::DoNothing { target: None }
                    | CompiledOnConflict::DoNothing {
                        target: Some(ConflictKind::PrimaryKey),
                    }
                    | CompiledOnConflict::DoUpdate {
                        target: ConflictKind::PrimaryKey,
                        ..
                    }
            );
            if !matches_target {
                return Err(SqlError::DuplicateKey);
            }
            match on_conflict {
                CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
                CompiledOnConflict::DoUpdate {
                    assignments,
                    where_clause,
                    ..
                } => {
                    // The old value came back with the failed insert, so no
                    // second lookup is needed.
                    let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
                    apply_do_update_with_old_row(
                        wtx,
                        table_schema,
                        key_buf,
                        &old_row,
                        row,
                        assignments,
                        where_clause.as_ref(),
                        col_map,
                        capture_returning,
                    )
                }
            }
        }
    }
}
2570
2571#[inline]
2572fn apply_fast_path_patch(
2573 old_bytes: &[u8],
2574 fast_paths: &[DoUpdateFastPath],
2575) -> Result<UpsertAction> {
2576 UPSERT_SCRATCH.with(|slot| {
2577 let mut bufs = slot.borrow_mut();
2578 bufs.new_value_buf.clear();
2579 bufs.new_value_buf.extend_from_slice(old_bytes);
2580
2581 let mut patch_scratch: Vec<u8> = Vec::new();
2582
2583 for fp in fast_paths {
2584 match fp {
2585 DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2586 let decoded =
2587 crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2588 let old_val = &decoded[0];
2589 let new_val = match old_val {
2590 Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2591 Value::Null => Value::Null,
2592 _ => {
2593 return Err(SqlError::TypeMismatch {
2594 expected: "INTEGER".into(),
2595 got: old_val.data_type().to_string(),
2596 });
2597 }
2598 };
2599 if !crate::encoding::patch_column_in_place(
2600 &mut bufs.new_value_buf,
2601 *phys_idx,
2602 &new_val,
2603 )? {
2604 patch_scratch.clear();
2605 crate::encoding::patch_row_column(
2606 &bufs.new_value_buf,
2607 *phys_idx,
2608 &new_val,
2609 &mut patch_scratch,
2610 )?;
2611 std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2612 }
2613 }
2614 }
2615 }
2616
2617 if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2618 return Err(SqlError::RowTooLarge {
2619 size: bufs.new_value_buf.len(),
2620 max: citadel_core::MAX_VALUE_SIZE,
2621 });
2622 }
2623
2624 Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2625 })
2626}
2627
2628fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2629 if !ts.indices.is_empty() {
2630 return true;
2631 }
2632 match oc {
2633 CompiledOnConflict::DoNothing { .. } => false,
2634 CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2635 }
2636}
2637
2638fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2639 if !ts.indices.is_empty() {
2640 return false;
2641 }
2642 if !ts.foreign_keys.is_empty() {
2643 return false;
2644 }
2645 if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2646 return false;
2647 }
2648 let pk = ts.pk_indices();
2649 !assignments.iter().any(|(ci, _)| pk.contains(ci))
2650}
2651
/// Performs a primary-key `INSERT ... ON CONFLICT DO UPDATE` as one storage
/// upsert: the row is inserted if the key is absent, otherwise the closure
/// computes the replacement value from the existing bytes.
///
/// The visible caller only takes this route when `can_fuse_do_update` holds
/// (no indexes, no foreign keys, no generated columns, no primary-key
/// assignment), which is why no index or FK maintenance happens here.
///
/// Returns `Updated { old, new }` only when `capture_returning` is set; an
/// update without capture is reported as `Inserted`.
///
/// # Errors
/// Type, NOT NULL, CHECK and row-size violations raised while building the
/// new row, plus whatever `table_upsert_with` itself returns.
#[allow(clippy::too_many_arguments)]
#[inline]
fn apply_do_update_fused(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    table_bytes: &[u8],
    key_buf: &[u8],
    value_buf: &[u8],
    proposed_row: &[Value],
    assignments: &[(usize, Expr)],
    where_clause: Option<&Expr>,
    col_map: &ColumnMap,
    fast_paths: Option<&[DoUpdateFastPath]>,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    let non_pk = table_schema.non_pk_indices();
    let enc_pos = table_schema.encoding_positions();
    let phys_count = table_schema.physical_non_pk_count();
    let dropped = table_schema.dropped_non_pk_slots();
    let has_checks = table_schema.has_checks();
    let has_fks = !table_schema.foreign_keys.is_empty();

    // Filled from inside the closure with the (old, new) row images when the
    // caller wants them back.
    let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
        std::cell::RefCell::new(None);

    let outcome =
        wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
            // Byte-patching route: skipped when CHECK constraints exist,
            // because those need the decoded new row.
            if let Some(fps) = fast_paths {
                if !has_checks {
                    let action = apply_fast_path_patch(old_bytes, fps)?;
                    if capture_returning {
                        if let UpsertAction::Replace(ref new_bytes) = action {
                            let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
                            let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
                            *captured.borrow_mut() = Some((old_row, new_row));
                        }
                    }
                    return Ok(action);
                }
            }
            // Generic route, using the thread-local scratch buffers.
            UPSERT_SCRATCH.with(|slot| {
                let mut bufs = slot.borrow_mut();
                let UpsertBufs {
                    old_row,
                    new_row,
                    value_values,
                    new_value_buf,
                } = &mut *bufs;

                old_row.clear();
                old_row.resize(table_schema.columns.len(), Value::Null);
                decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;

                // WHERE sees the existing row plus the proposed ("excluded") row.
                if let Some(w) = where_clause {
                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
                    let result = eval_expr(w, &ctx)?;
                    if result.is_null() || !is_truthy(&result) {
                        return Ok(UpsertAction::Skip);
                    }
                }

                // Every assignment is evaluated against the OLD row, so
                // assignments do not observe each other's results.
                new_row.clear();
                new_row.extend_from_slice(old_row);
                for (col_idx, expr) in assignments {
                    let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
                    let val = eval_expr(expr, &ctx)?;
                    let col = &table_schema.columns[*col_idx];
                    new_row[*col_idx] = if val.is_null() {
                        Value::Null
                    } else {
                        let got = val.data_type();
                        val.coerce_into(col.data_type)
                            .ok_or_else(|| SqlError::TypeMismatch {
                                expected: col.data_type.to_string(),
                                got: got.to_string(),
                            })?
                    };
                }

                // NOT NULL is enforced only on the columns that were assigned.
                for (assigned_idx, _) in assignments {
                    let col = &table_schema.columns[*assigned_idx];
                    if !col.nullable && new_row[col.position as usize].is_null() {
                        return Err(SqlError::NotNullViolation(col.name.clone()));
                    }
                }
                // A CHECK fails only on a definite false; NULL passes.
                if has_checks {
                    for col in &table_schema.columns {
                        if let Some(ref check) = col.check_expr {
                            let ctx = EvalCtx::new(col_map, new_row);
                            let result = eval_expr(check, &ctx)?;
                            if !is_truthy(&result) && !result.is_null() {
                                let name = col.check_name.as_deref().unwrap_or(&col.name);
                                return Err(SqlError::CheckViolation(name.to_string()));
                            }
                        }
                    }
                    for tc in &table_schema.check_constraints {
                        let ctx = EvalCtx::new(col_map, new_row);
                        let result = eval_expr(&tc.expr, &ctx)?;
                        if !is_truthy(&result) && !result.is_null() {
                            let name = tc.name.as_deref().unwrap_or(&tc.sql);
                            return Err(SqlError::CheckViolation(name.to_string()));
                        }
                    }
                }
                // `has_fks` is intentionally unused: no foreign-key checks
                // are performed on this path.
                let _ = has_fks;

                // Lay the non-PK columns out in their physical encoding slots.
                value_values.clear();
                value_values.resize(phys_count, Value::Null);
                for &slot in dropped {
                    value_values[slot as usize] = Value::Null;
                }
                for (j, &i) in non_pk.iter().enumerate() {
                    value_values[enc_pos[j] as usize] = new_row[i].clone();
                }
                new_value_buf.clear();
                crate::encoding::encode_row_into(value_values, new_value_buf);

                if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
                    return Err(SqlError::RowTooLarge {
                        size: new_value_buf.len(),
                        max: citadel_core::MAX_VALUE_SIZE,
                    });
                }

                if capture_returning {
                    *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
                }
                Ok(UpsertAction::Replace(new_value_buf.clone()))
            })
        })?;

    match outcome {
        UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
        UpsertOutcome::Updated => {
            if capture_returning {
                let (old, new) = captured.into_inner().ok_or_else(|| {
                    SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
                })?;
                Ok(InsertRowOutcome::Updated { old, new })
            } else {
                // Without capture an update is reported as `Inserted`.
                Ok(InsertRowOutcome::Inserted)
            }
        }
        UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
    }
}
2799
2800fn fetch_unique_index_pk(
2801 wtx: &mut WriteTxn<'_>,
2802 table_schema: &TableSchema,
2803 index_idx: usize,
2804 row: &[Value],
2805) -> Result<Vec<u8>> {
2806 let idx = &table_schema.indices[index_idx];
2807 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2808 let indexed: Vec<Value> = idx
2809 .column_positions_iter()
2810 .map(|col_idx| row[col_idx as usize].clone())
2811 .collect();
2812 let key = crate::encoding::encode_composite_key(&indexed);
2813 let value = wtx
2814 .table_get(&idx_table, &key)
2815 .map_err(SqlError::Storage)?
2816 .ok_or_else(|| {
2817 SqlError::InvalidValue("unique index missing expected collision entry".into())
2818 })?;
2819 Ok(value)
2820}
2821
2822#[allow(clippy::too_many_arguments)]
2823fn apply_do_update(
2824 wtx: &mut WriteTxn<'_>,
2825 table_schema: &TableSchema,
2826 pk_key: &[u8],
2827 proposed_row: &[Value],
2828 assignments: &[(usize, Expr)],
2829 where_clause: Option<&Expr>,
2830 col_map: &ColumnMap,
2831 capture_returning: bool,
2832) -> Result<InsertRowOutcome> {
2833 let old_value = wtx
2834 .table_get(table_schema.name.as_bytes(), pk_key)
2835 .map_err(SqlError::Storage)?
2836 .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2837 let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2838 apply_do_update_with_old_row(
2839 wtx,
2840 table_schema,
2841 pk_key,
2842 &old_row,
2843 proposed_row,
2844 assignments,
2845 where_clause,
2846 col_map,
2847 capture_returning,
2848 )
2849}
2850
/// General `DO UPDATE` implementation for an existing row that has already
/// been decoded into `old_row`.
///
/// Steps: evaluate the optional `WHERE`, apply the assignments, recompute
/// stored generated columns, validate NOT NULL / CHECK / foreign keys,
/// re-encode the row, then write the row and maintain every index. A change
/// to the primary key is handled as insert-new + delete-old.
///
/// Returns `Skipped` when `WHERE` rejects the row, `Updated { old, new }`
/// when `capture_returning` is set, and `Inserted` otherwise.
///
/// # Errors
/// Type, NOT NULL, CHECK, foreign-key, row-size, duplicate-key and
/// unique-index violations, plus storage errors.
#[allow(clippy::too_many_arguments)]
fn apply_do_update_with_old_row(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    old_pk_key: &[u8],
    old_row: &[Value],
    proposed_row: &[Value],
    assignments: &[(usize, Expr)],
    where_clause: Option<&Expr>,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    // WHERE sees the existing row plus the proposed ("excluded") row; NULL
    // and false both skip the update.
    if let Some(w) = where_clause {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let result = eval_expr(w, &ctx)?;
        if result.is_null() || !is_truthy(&result) {
            return Ok(InsertRowOutcome::Skipped);
        }
    }

    // Every assignment is evaluated against the OLD row, so assignments do
    // not observe each other's results.
    let mut new_row = old_row.to_vec();
    for (col_idx, expr) in assignments {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let val = eval_expr(expr, &ctx)?;
        let col = &table_schema.columns[*col_idx];
        new_row[*col_idx] = if val.is_null() {
            Value::Null
        } else {
            let got = val.data_type();
            val.coerce_into(col.data_type)
                .ok_or_else(|| SqlError::TypeMismatch {
                    expected: col.data_type.to_string(),
                    got: got.to_string(),
                })?
        };
    }

    // Stored generated columns are recomputed from the updated row.
    for col in &table_schema.columns {
        if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Stored)
        ) {
            let val = eval_expr(
                col.generated_expr.as_ref().unwrap(),
                &EvalCtx::new(col_map, &new_row),
            )?;
            let pos = col.position as usize;
            new_row[pos] = if val.is_null() {
                if !col.nullable {
                    return Err(SqlError::NotNullViolation(col.name.clone()));
                }
                Value::Null
            } else {
                let got = val.data_type();
                val.coerce_into(col.data_type)
                    .ok_or_else(|| SqlError::TypeMismatch {
                        expected: col.data_type.to_string(),
                        got: got.to_string(),
                    })?
            };
        }
    }

    // The primary key only counts as changed if a PK column was assigned AND
    // its value actually differs.
    let pk_indices = table_schema.pk_indices();
    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);

    // NOT NULL is enforced only on the columns that were assigned.
    for (assigned_idx, _) in assignments {
        let col = &table_schema.columns[*assigned_idx];
        if !col.nullable && new_row[col.position as usize].is_null() {
            return Err(SqlError::NotNullViolation(col.name.clone()));
        }
    }
    // A CHECK fails only on a definite false; NULL passes.
    if table_schema.has_checks() {
        for col in &table_schema.columns {
            if let Some(ref check) = col.check_expr {
                let ctx = EvalCtx::new(col_map, &new_row);
                let result = eval_expr(check, &ctx)?;
                if !is_truthy(&result) && !result.is_null() {
                    let name = col.check_name.as_deref().unwrap_or(&col.name);
                    return Err(SqlError::CheckViolation(name.to_string()));
                }
            }
        }
        for tc in &table_schema.check_constraints {
            let ctx = EvalCtx::new(col_map, &new_row);
            let result = eval_expr(&tc.expr, &ctx)?;
            if !is_truthy(&result) && !result.is_null() {
                let name = tc.name.as_deref().unwrap_or(&tc.sql);
                return Err(SqlError::CheckViolation(name.to_string()));
            }
        }
    }
    // Foreign keys are re-validated only when one of their columns changed
    // and none of the new values is NULL.
    for fk in &table_schema.foreign_keys {
        let changed = fk
            .columns
            .iter()
            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
        if !changed {
            continue;
        }
        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
        if any_null {
            continue;
        }
        let fk_vals: Vec<Value> = fk
            .columns
            .iter()
            .map(|&ci| new_row[ci as usize].clone())
            .collect();
        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
        // Deferred constraints are queued on the transaction instead of
        // being checked now.
        if fk.deferrable && fk.initially_deferred {
            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table).to_string();
            wtx.defer_fk_check(citadel_txn::write_txn::DeferredFkCheck {
                fk_name: name,
                foreign_table: fk.foreign_table.as_bytes().to_vec(),
                parent_key: fk_key,
            });
            continue;
        }
        // Parent keys already verified in this transaction are not looked up again.
        if !wtx.fk_check_cached(fk.foreign_table.as_bytes(), &fk_key) {
            let found = wtx
                .table_get(fk.foreign_table.as_bytes(), &fk_key)
                .map_err(SqlError::Storage)?;
            if found.is_none() {
                let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
                return Err(SqlError::ForeignKeyViolation(name.to_string()));
            }
            wtx.mark_fk_verified(fk.foreign_table.as_bytes(), &fk_key);
        }
    }

    // PK value vectors are only needed for index keys or a PK rewrite.
    let has_indices = !table_schema.indices.is_empty();
    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
    } else {
        Vec::new()
    };
    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
    } else {
        Vec::new()
    };

    // Lay the non-PK columns out in their physical encoding slots; virtual
    // generated columns are stored as NULL.
    let non_pk = table_schema.non_pk_indices();
    let enc_pos = table_schema.encoding_positions();
    let phys_count = table_schema.physical_non_pk_count();
    let dropped = table_schema.dropped_non_pk_slots();
    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
    for &slot in dropped {
        value_values[slot as usize] = Value::Null;
    }
    for (j, &i) in non_pk.iter().enumerate() {
        let col = &table_schema.columns[i];
        value_values[enc_pos[j] as usize] = if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Virtual)
        ) {
            Value::Null
        } else {
            new_row[i].clone()
        };
    }
    let mut new_value_buf = Vec::with_capacity(256);
    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);

    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
        return Err(SqlError::RowTooLarge {
            size: new_value_buf.len(),
            max: citadel_core::MAX_VALUE_SIZE,
        });
    }

    if pk_changed {
        // Primary key moved: insert under the new key first so a collision
        // is detected before the old row is removed.
        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
        let inserted = wtx
            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
            .map_err(SqlError::Storage)?;
        if !inserted {
            return Err(SqlError::DuplicateKey);
        }
        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
            .map_err(SqlError::Storage)?;
        // Every index entry is rewritten.
        // NOTE(review): unlike the in-place branch below, this branch does
        // not consult `partial_idx_update_actions` — confirm partial indexes
        // are handled as intended when the primary key changes.
        for idx in &table_schema.indices {
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            let old_idx_key =
                encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
            wtx.table_delete(&idx_table, &old_idx_key)
                .map_err(SqlError::Storage)?;
            let new_idx_key =
                encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
            let is_new = wtx
                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                .map_err(SqlError::Storage)?;
            // A key collision in a unique index is tolerated only when one
            // of the indexed values is NULL.
            if idx.unique && !is_new {
                let any_null = idx
                    .column_positions_iter()
                    .any(|c| new_row[c as usize].is_null());
                if !any_null {
                    return Err(SqlError::UniqueViolation(idx.name.clone()));
                }
            }
        }
    } else {
        // Same primary key: overwrite the value in place.
        wtx.table_update_sorted(
            table_schema.name.as_bytes(),
            &[(old_pk_key, new_value_buf.as_slice())],
        )
        .map_err(SqlError::Storage)?;
        // The column map is only built when some index is partial.
        let col_map_partial = any_partial_index(table_schema).then(|| table_schema.column_map());
        for idx in &table_schema.indices {
            let cols_changed = index_columns_changed(idx, old_row, &new_row);
            // `del` / `ins` say whether the old entry must be removed and
            // whether a new one must be written. The literal `false` is
            // presumably the "primary key changed" flag — TODO confirm
            // against the helper's signature.
            let (del, ins) = partial_idx_update_actions(
                idx,
                old_row,
                &new_row,
                cols_changed,
                false,
                col_map_partial,
            );
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            if del {
                let old_idx_key =
                    encode_index_key_with_schema(idx, old_row, &old_pk_values, table_schema);
                wtx.table_delete(&idx_table, &old_idx_key)
                    .map_err(SqlError::Storage)?;
            }
            if ins {
                let new_idx_key =
                    encode_index_key_with_schema(idx, &new_row, &new_pk_values, table_schema);
                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
                let is_new = wtx
                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                    .map_err(SqlError::Storage)?;
                // Same NULL exemption as in the PK-changed branch.
                if idx.unique && !is_new {
                    let any_null = idx
                        .column_positions_iter()
                        .any(|c| new_row[c as usize].is_null());
                    if !any_null {
                        return Err(SqlError::UniqueViolation(idx.name.clone()));
                    }
                }
            }
        }
    }

    if capture_returning {
        Ok(InsertRowOutcome::Updated {
            old: old_row.to_vec(),
            new: new_row,
        })
    } else {
        // Without capture an update is reported as `Inserted`.
        Ok(InsertRowOutcome::Inserted)
    }
}
3107
3108fn detect_fast_paths(
3109 ts: &TableSchema,
3110 assignments: &[(usize, Expr)],
3111) -> Option<Vec<DoUpdateFastPath>> {
3112 let non_pk = ts.non_pk_indices();
3113 let enc_pos = ts.encoding_positions();
3114 let mut out = Vec::with_capacity(assignments.len());
3115 for (col_idx, expr) in assignments {
3116 let col = &ts.columns[*col_idx];
3117 if col.data_type != DataType::Integer {
3118 return None;
3119 }
3120 let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
3121 let phys_idx = enc_pos[nonpk_order] as usize;
3122
3123 if let Expr::BinaryOp { left, op, right } = expr {
3124 if !matches!(op, BinOp::Add | BinOp::Sub) {
3125 return None;
3126 }
3127 let reads_target =
3128 matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
3129 if !reads_target {
3130 return None;
3131 }
3132 if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
3133 let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
3134 let _ = col_idx;
3135 out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
3136 continue;
3137 }
3138 return None;
3139 }
3140 return None;
3141 }
3142 Some(out)
3143}
3144
3145fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
3146 let target = oc
3147 .target
3148 .as_ref()
3149 .map(|t| resolve_conflict_target(t, ts))
3150 .transpose()?;
3151 match &oc.action {
3152 OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
3153 OnConflictAction::DoUpdate {
3154 assignments,
3155 where_clause,
3156 } => {
3157 let target = target.ok_or_else(|| {
3158 SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
3159 })?;
3160 let compiled_assignments: Vec<(usize, Expr)> = assignments
3161 .iter()
3162 .map(|(name, expr)| {
3163 let col_idx = ts
3164 .column_index(name)
3165 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
3166 Ok((col_idx, expr.clone()))
3167 })
3168 .collect::<Result<_>>()?;
3169 let fast_paths = if where_clause.is_none() {
3170 detect_fast_paths(ts, &compiled_assignments)
3171 } else {
3172 None
3173 };
3174 Ok(CompiledOnConflict::DoUpdate {
3175 target,
3176 assignments: compiled_assignments,
3177 where_clause: where_clause.clone(),
3178 fast_paths,
3179 })
3180 }
3181 }
3182}
3183
3184fn exec_insert_trivial_fast(
3186 wtx: &mut WriteTxn<'_>,
3187 table_lower: &str,
3188 cache: &InsertCache,
3189 bufs: &mut InsertBufs,
3190 params: &[Value],
3191) -> Result<ExecutionResult> {
3192 let prog = cache
3193 .trivial_fast_program
3194 .as_ref()
3195 .expect("trivial fast: program");
3196
3197 for &p in &prog.not_null_param_indices {
3198 if params[p as usize].is_null() {
3199 return Err(SqlError::NotNullViolation(format!("param@{p}")));
3200 }
3201 }
3202
3203 match ¶ms[prog.pk_param as usize] {
3204 Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
3205 _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
3206 }
3207
3208 bufs.value_buf.clear();
3209 bufs.value_buf.extend_from_slice(&prog.template);
3210
3211 for op in &prog.ops {
3212 match op {
3213 WriteOp::ParamI64 { param_idx, off } => match ¶ms[*param_idx as usize] {
3214 Value::Integer(v) => {
3215 let off = *off as usize;
3216 bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
3217 }
3218 other => {
3219 return Err(SqlError::TypeMismatch {
3220 expected: "Integer".into(),
3221 got: other.data_type().to_string(),
3222 });
3223 }
3224 },
3225 WriteOp::LiteralI64 { value, off } => {
3226 let off = *off as usize;
3227 bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
3228 }
3229 WriteOp::GenAddParamsI64 {
3230 a_param,
3231 b_param,
3232 off,
3233 bitmap_byte_off,
3234 bitmap_bit_mask,
3235 } => match (¶ms[*a_param as usize], ¶ms[*b_param as usize]) {
3236 (Value::Integer(a), Value::Integer(b)) => {
3237 let off = *off as usize;
3238 bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
3239 }
3240 _ => {
3241 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3242 }
3243 },
3244 WriteOp::GenMulAddParamI64 {
3245 param_idx,
3246 mul,
3247 add,
3248 off,
3249 bitmap_byte_off,
3250 bitmap_bit_mask,
3251 } => match ¶ms[*param_idx as usize] {
3252 Value::Integer(v) => {
3253 let r = v.wrapping_mul(*mul).wrapping_add(*add);
3254 let off = *off as usize;
3255 bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
3256 }
3257 _ => {
3258 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
3259 }
3260 },
3261 }
3262 }
3263
3264 let is_new = wtx
3265 .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
3266 .map_err(SqlError::Storage)?;
3267 if !is_new {
3268 return Err(SqlError::DuplicateKey);
3269 }
3270 Ok(ExecutionResult::RowsAffected(1))
3271}
3272
3273fn build_bind_plan(
3274 stmt: &InsertStmt,
3275 col_indices: &[usize],
3276 col_data_types: &[DataType],
3277) -> Option<Vec<BindAction>> {
3278 let rows = match &stmt.source {
3279 InsertSource::Values(rows) => rows,
3280 _ => return None,
3281 };
3282 if rows.len() != 1 {
3283 return None;
3284 }
3285 let value_row = &rows[0];
3286 if value_row.len() != col_indices.len() {
3287 return None;
3288 }
3289 let mut plan = Vec::with_capacity(value_row.len());
3290 for (i, expr) in value_row.iter().enumerate() {
3291 let col_idx = col_indices[i];
3292 let target = col_data_types[col_idx];
3293 match expr {
3294 Expr::Parameter(n) => {
3295 if *n == 0 {
3296 return None;
3297 }
3298 plan.push(BindAction::Param {
3299 param_idx: n - 1,
3300 col_idx,
3301 target,
3302 });
3303 }
3304 Expr::Literal(v) => plan.push(BindAction::Literal {
3305 value: v.clone(),
3306 col_idx,
3307 }),
3308 _ => return None,
3309 }
3310 }
3311 Some(plan)
3312}
3313
3314impl CompiledInsert {
3315 pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
3316 let lower = stmt.table.to_ascii_lowercase();
3317 let cached = if let Some(ts) = schema.get(&lower) {
3318 let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
3319 ts.columns.iter().map(|c| c.name.as_str()).collect()
3320 } else {
3321 stmt.columns.iter().map(|s| s.as_str()).collect()
3322 };
3323 let mut col_indices = Vec::with_capacity(insert_columns.len());
3324 for name in &insert_columns {
3325 col_indices.push(ts.column_index(name)?);
3326 }
3327 if col_indices
3328 .iter()
3329 .any(|&ci| ts.columns[ci].generated_kind.is_some())
3330 {
3331 return None;
3332 }
3333 let on_conflict = stmt
3334 .on_conflict
3335 .as_ref()
3336 .map(|oc| compile_on_conflict(oc, ts))
3337 .transpose()
3338 .ok()
3339 .flatten()
3340 .map(Arc::new);
3341 let generated_col_positions: Vec<usize> = ts
3342 .columns
3343 .iter()
3344 .enumerate()
3345 .filter_map(|(i, c)| {
3346 matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
3347 .then_some(i)
3348 })
3349 .collect();
3350 let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
3351 .iter()
3352 .map(|&pos| {
3353 detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
3354 })
3355 .collect();
3356 let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
3357 Some(ColumnMap::new(&ts.columns))
3358 } else {
3359 None
3360 };
3361 let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
3362 let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
3363 let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
3364 let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
3365 let phys_count = ts.physical_non_pk_count();
3366 let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
3367 let single_int_pk =
3368 pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
3369 let not_null_indices: Vec<u16> = ts
3370 .columns
3371 .iter()
3372 .filter(|c| !c.nullable)
3373 .map(|c| c.position)
3374 .collect();
3375 let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
3376 let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
3377 let row_fully_overwritten = if any_defaults_flag {
3378 false
3379 } else {
3380 let mut covered: rustc_hash::FxHashSet<usize> =
3381 col_indices.iter().copied().collect();
3382 covered.extend(generated_col_positions.iter().copied());
3383 for (j, &i) in non_pk_indices.iter().enumerate() {
3384 let _ = j;
3385 if matches!(
3386 ts.columns[i].generated_kind,
3387 Some(crate::parser::GeneratedKind::Virtual)
3388 ) {
3389 covered.insert(i);
3390 }
3391 }
3392 bind_plan.is_some() && covered.len() == ts.columns.len()
3393 };
3394 let has_fks = !ts.foreign_keys.is_empty();
3395 let has_indices = !ts.indices.is_empty();
3396 let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3397 let mut null_value_slots: Vec<usize> =
3398 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3399 for (j, &i) in non_pk_indices.iter().enumerate() {
3400 let slot = encoding_positions[j] as usize;
3401 if matches!(
3402 ts.columns[i].generated_kind,
3403 Some(crate::parser::GeneratedKind::Virtual)
3404 ) {
3405 null_value_slots.push(slot);
3406 } else {
3407 non_virtual_pairs.push((i, slot));
3408 }
3409 }
3410 let row_encoder = {
3411 let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3412 let col = &ts.columns[i];
3413 if matches!(
3414 col.generated_kind,
3415 Some(crate::parser::GeneratedKind::Virtual)
3416 ) {
3417 true
3418 } else {
3419 col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3420 }
3421 });
3422 if all_int_or_null {
3423 let mut null_slots: Vec<usize> =
3424 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3425 for (j, &i) in non_pk_indices.iter().enumerate() {
3426 if matches!(
3427 ts.columns[i].generated_kind,
3428 Some(crate::parser::GeneratedKind::Virtual)
3429 ) {
3430 null_slots.push(encoding_positions[j] as usize);
3431 }
3432 }
3433 Some(crate::encoding::build_int_row_template(
3434 phys_count,
3435 &null_slots,
3436 ))
3437 } else {
3438 None
3439 }
3440 };
3441 let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3442 && !ts.columns.iter().any(|c| c.default_expr.is_some())
3443 && !ts.has_checks()
3444 && !has_fks
3445 && !has_indices
3446 && stmt.on_conflict.is_none()
3447 && stmt.returning.is_none()
3448 && bind_plan.is_some()
3449 && row_encoder.is_some()
3450 && row_fully_overwritten
3451 && single_int_pk
3452 && generated_fast_evals
3453 .iter()
3454 .all(|fe| !matches!(fe, FastGenEval::None));
3455 let trivial_fast_program = if is_trivial_fast_eligible {
3456 build_trivial_fast_program(
3457 bind_plan.as_ref().unwrap(),
3458 row_encoder.as_ref().unwrap(),
3459 &non_virtual_pairs,
3460 &generated_col_positions,
3461 &generated_fast_evals,
3462 &pk_indices,
3463 &ts.columns,
3464 )
3465 } else {
3466 None
3467 };
3468 let is_trivial_fast = trivial_fast_program.is_some();
3469 let has_checks = ts.has_checks();
3470 let any_defaults = ts.columns.iter().any(|c| c.default_expr.is_some());
3471 let needs_scoped_params = bind_plan.is_none()
3472 || has_checks
3473 || any_defaults
3474 || !generated_col_positions.is_empty()
3475 || on_conflict.is_some()
3476 || stmt.returning.is_some()
3477 || insert_has_subquery(stmt)
3478 || super::helpers::any_partial_index(ts);
3479 Some(InsertCache {
3480 col_indices,
3481 has_subquery: insert_has_subquery(stmt),
3482 any_defaults,
3483 has_checks,
3484 on_conflict,
3485 row_col_map,
3486 generated_col_positions,
3487 generated_fast_evals,
3488 pk_indices,
3489 non_pk_indices,
3490 encoding_positions,
3491 dropped_non_pk_slots,
3492 phys_count,
3493 single_int_pk,
3494 not_null_indices,
3495 bind_plan,
3496 row_fully_overwritten,
3497 row_encoder,
3498 is_trivial_fast,
3499 trivial_fast_program,
3500 needs_scoped_params,
3501 })
3502 } else if schema.get_view(&lower).is_some() {
3503 None
3504 } else {
3505 return None;
3506 };
3507 Some(Self {
3508 table_lower: lower,
3509 cached,
3510 })
3511 }
3512}
3513
3514impl CompiledPlan for CompiledInsert {
3515 fn execute(
3516 &self,
3517 db: &Database,
3518 schema: &SchemaManager,
3519 stmt: &Statement,
3520 params: &[Value],
3521 wtx: Option<&mut WriteTxn<'_>>,
3522 ) -> Result<ExecutionResult> {
3523 let ins = match stmt {
3524 Statement::Insert(i) => i,
3525 _ => {
3526 return Err(SqlError::Unsupported(
3527 "CompiledInsert received non-INSERT statement".into(),
3528 ))
3529 }
3530 };
3531 match wtx {
3532 None => exec_insert(db, schema, ins, params),
3533 Some(outer) => match self.cached.as_ref() {
3534 Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3535 exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3536 }),
3537 Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3538 None => exec_insert_in_txn(outer, schema, ins, params),
3539 },
3540 }
3541 }
3542
3543 fn uses_scoped_params(&self) -> bool {
3544 match self.cached.as_ref() {
3545 Some(c) => !c.is_trivial_fast && c.needs_scoped_params,
3546 None => true,
3547 }
3548 }
3549}
3550
3551pub struct CompiledDelete {
3552 table_lower: String,
3553}
3554
3555impl CompiledDelete {
3556 pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3557 let lower = stmt.table.to_ascii_lowercase();
3558 schema.get(&lower)?;
3559 Some(Self { table_lower: lower })
3560 }
3561}
3562
3563impl CompiledPlan for CompiledDelete {
3564 fn execute(
3565 &self,
3566 db: &Database,
3567 schema: &SchemaManager,
3568 stmt: &Statement,
3569 _params: &[Value],
3570 wtx: Option<&mut WriteTxn<'_>>,
3571 ) -> Result<ExecutionResult> {
3572 let del = match stmt {
3573 Statement::Delete(d) => d,
3574 _ => {
3575 return Err(SqlError::Unsupported(
3576 "CompiledDelete received non-DELETE statement".into(),
3577 ))
3578 }
3579 };
3580 let _ = &self.table_lower;
3581 match wtx {
3582 None => super::write::exec_delete(db, schema, del),
3583 Some(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3584 }
3585 }
3586}
3587
3588fn exec_instead_of_view_insert_auto(
3589 db: &Database,
3590 schema: &SchemaManager,
3591 view_name: &str,
3592 aliases: &[String],
3593 stmt: &InsertStmt,
3594 params: &[Value],
3595) -> Result<ExecutionResult> {
3596 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
3597 let r = exec_instead_of_view_insert_in_txn(&mut wtx, schema, view_name, aliases, stmt, params)?;
3598 wtx.commit().map_err(SqlError::Storage)?;
3599 Ok(r)
3600}
3601
3602fn exec_instead_of_view_insert_in_txn(
3603 wtx: &mut WriteTxn<'_>,
3604 schema: &SchemaManager,
3605 view_name: &str,
3606 aliases: &[String],
3607 stmt: &InsertStmt,
3608 params: &[Value],
3609) -> Result<ExecutionResult> {
3610 let resolved_aliases: Vec<String> = if aliases.is_empty() {
3612 derive_view_columns(wtx, schema, view_name)?
3613 } else {
3614 aliases.to_vec()
3615 };
3616 let view_cols = super::triggers::view_columns_from_aliases(&resolved_aliases);
3617 let alias_map: rustc_hash::FxHashMap<String, usize> = resolved_aliases
3618 .iter()
3619 .enumerate()
3620 .map(|(i, name)| (name.to_ascii_lowercase(), i))
3621 .collect();
3622
3623 let target_positions: Vec<usize> = if stmt.columns.is_empty() {
3624 (0..resolved_aliases.len()).collect()
3625 } else {
3626 stmt.columns
3627 .iter()
3628 .map(|c| {
3629 alias_map
3630 .get(&c.to_ascii_lowercase())
3631 .copied()
3632 .ok_or_else(|| SqlError::ColumnNotFound(c.clone()))
3633 })
3634 .collect::<Result<_>>()?
3635 };
3636
3637 let source_rows: Vec<Vec<Value>> = match &stmt.source {
3638 InsertSource::Values(rows) => {
3639 let mut out = Vec::with_capacity(rows.len());
3640 for row in rows {
3641 if row.len() != target_positions.len() {
3642 return Err(SqlError::InvalidValue(format!(
3643 "expected {} values, got {}",
3644 target_positions.len(),
3645 row.len()
3646 )));
3647 }
3648 let mut vals = Vec::with_capacity(row.len());
3649 for expr in row {
3650 let v = match expr {
3651 Expr::Parameter(n) => params
3652 .get(n - 1)
3653 .cloned()
3654 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
3655 Expr::Literal(v) => v.clone(),
3656 other => eval_const_expr(other)?,
3657 };
3658 vals.push(v);
3659 }
3660 out.push(vals);
3661 }
3662 out
3663 }
3664 InsertSource::Select(sq) => {
3665 let empty_ctes = CteContext::default();
3666 let qr = exec_query_body_write(wtx, schema, &sq.body, &empty_ctes)?;
3667 qr.rows
3668 }
3669 };
3670
3671 let mut count: u64 = 0;
3672 for row in source_rows {
3673 if row.len() != target_positions.len() {
3674 return Err(SqlError::InvalidValue(format!(
3675 "expected {} values, got {}",
3676 target_positions.len(),
3677 row.len()
3678 )));
3679 }
3680 let mut new_row = vec![Value::Null; resolved_aliases.len()];
3681 for (slot, val) in target_positions.iter().zip(row) {
3682 new_row[*slot] = val;
3683 }
3684 super::triggers::fire_row_triggers(
3685 wtx,
3686 schema,
3687 view_name,
3688 crate::parser::TriggerTiming::InsteadOf,
3689 super::triggers::FireEvent::Insert,
3690 None,
3691 Some(new_row),
3692 &view_cols,
3693 )?;
3694 count += 1;
3695 }
3696 Ok(ExecutionResult::RowsAffected(count))
3697}
3698
3699fn derive_view_columns(
3700 wtx: &mut WriteTxn<'_>,
3701 schema: &SchemaManager,
3702 view_name: &str,
3703) -> Result<Vec<String>> {
3704 use crate::parser::{QueryBody, SelectColumn, SelectQuery, SelectStmt};
3705 let sel = SelectStmt {
3706 columns: vec![SelectColumn::AllColumns],
3707 from: view_name.to_string(),
3708 from_alias: None,
3709 from_subquery: None,
3710 from_args: None,
3711 from_json_table: None,
3712 joins: vec![],
3713 distinct: false,
3714 where_clause: None,
3715 order_by: vec![],
3716 limit: Some(Expr::Literal(Value::Integer(1))),
3717 offset: None,
3718 group_by: vec![],
3719 having: None,
3720 };
3721 let sq = SelectQuery {
3722 ctes: vec![],
3723 recursive: false,
3724 body: QueryBody::Select(Box::new(sel)),
3725 };
3726 let qr = super::cte::exec_select_query_in_txn(wtx, schema, &sq)?;
3727 match qr {
3728 ExecutionResult::Query(q) => Ok(q.columns),
3729 _ => Ok(Vec::new()),
3730 }
3731}