1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::write_txn::WriteTxn;
7use rustc_hash::FxHashMap;
8
9use crate::encoding::{encode_composite_key_into, encode_row_into};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
12use crate::parser::*;
13use crate::types::*;
14
15use crate::schema::SchemaManager;
16
17use super::compile::CompiledPlan;
18use super::helpers::*;
19use super::CteContext;
20
21pub(super) fn exec_insert(
22 db: &Database,
23 schema: &SchemaManager,
24 stmt: &InsertStmt,
25 params: &[Value],
26) -> Result<ExecutionResult> {
27 let empty_ctes = CteContext::default();
28 let materialized;
29 let stmt = if insert_has_subquery(stmt) {
30 materialized = materialize_insert(stmt, &mut |sub| {
31 exec_subquery_read(db, schema, sub, &empty_ctes)
32 })?;
33 &materialized
34 } else {
35 stmt
36 };
37
38 let lower_name = stmt.table.to_ascii_lowercase();
39 if schema.get_view(&lower_name).is_some() {
40 return Err(SqlError::CannotModifyView(stmt.table.clone()));
41 }
42 let table_schema = schema
43 .get(&lower_name)
44 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
45
46 let insert_columns = if stmt.columns.is_empty() {
47 table_schema
48 .columns
49 .iter()
50 .map(|c| c.name.clone())
51 .collect::<Vec<_>>()
52 } else {
53 stmt.columns
54 .iter()
55 .map(|c| c.to_ascii_lowercase())
56 .collect()
57 };
58
59 let col_indices: Vec<usize> = insert_columns
60 .iter()
61 .map(|name| {
62 table_schema
63 .column_index(name)
64 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
65 })
66 .collect::<Result<_>>()?;
67
68 for &ci in &col_indices {
69 if table_schema.columns[ci].generated_kind.is_some() {
70 return Err(SqlError::CannotInsertIntoGeneratedColumn(
71 table_schema.columns[ci].name.clone(),
72 ));
73 }
74 }
75
76 let defaults: Vec<(usize, &Expr)> = table_schema
77 .columns
78 .iter()
79 .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
80 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
81 .collect();
82
83 let generated_cols: Vec<(usize, &Expr)> = table_schema
84 .columns
85 .iter()
86 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
87 .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
88 .collect();
89
90 let has_checks = table_schema.has_checks();
91 let row_col_map_for_gen = if !generated_cols.is_empty() {
92 Some(ColumnMap::new(&table_schema.columns))
93 } else {
94 None
95 };
96 let check_col_map = if has_checks {
97 Some(ColumnMap::new(&table_schema.columns))
98 } else {
99 None
100 };
101
102 let select_rows = match &stmt.source {
103 InsertSource::Select(sq) => {
104 let insert_ctes =
105 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
106 exec_query_body_read(db, schema, body, ctx)
107 })?;
108 let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
109 Some(qr.rows)
110 }
111 InsertSource::Values(_) => None,
112 };
113
114 let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
115 .on_conflict
116 .as_ref()
117 .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
118 .transpose()?;
119
120 let row_col_map = compiled_conflict
121 .as_ref()
122 .map(|_| ColumnMap::new(&table_schema.columns));
123
124 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
125 let mut count: u64 = 0;
126 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
127 stmt.returning.as_ref().map(|_| Vec::new());
128
129 let pk_indices = table_schema.pk_indices();
130 let non_pk = table_schema.non_pk_indices();
131 let enc_pos = table_schema.encoding_positions();
132 let phys_count = table_schema.physical_non_pk_count();
133 let mut row = vec![Value::Null; table_schema.columns.len()];
134 let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
135 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
136 let mut key_buf: Vec<u8> = Vec::with_capacity(64);
137 let mut value_buf: Vec<u8> = Vec::with_capacity(256);
138 let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
139
140 let values = match &stmt.source {
141 InsertSource::Values(rows) => Some(rows.as_slice()),
142 InsertSource::Select(_) => None,
143 };
144 let sel_rows = select_rows.as_deref();
145
146 let total = match (values, sel_rows) {
147 (Some(rows), _) => rows.len(),
148 (_, Some(rows)) => rows.len(),
149 _ => 0,
150 };
151
152 if let Some(sel) = sel_rows {
153 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
154 return Err(SqlError::InvalidValue(format!(
155 "INSERT ... SELECT column count mismatch: expected {}, got {}",
156 insert_columns.len(),
157 sel[0].len()
158 )));
159 }
160 }
161
162 for idx in 0..total {
163 for v in row.iter_mut() {
164 *v = Value::Null;
165 }
166
167 if let Some(value_rows) = values {
168 let value_row = &value_rows[idx];
169 if value_row.len() != insert_columns.len() {
170 return Err(SqlError::InvalidValue(format!(
171 "expected {} values, got {}",
172 insert_columns.len(),
173 value_row.len()
174 )));
175 }
176 for (i, expr) in value_row.iter().enumerate() {
177 let val = if let Expr::Parameter(n) = expr {
178 params
179 .get(n - 1)
180 .cloned()
181 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
182 } else {
183 eval_const_expr(expr)?
184 };
185 let col_idx = col_indices[i];
186 let col = &table_schema.columns[col_idx];
187 let got_type = val.data_type();
188 row[col_idx] = if val.is_null() {
189 Value::Null
190 } else {
191 val.coerce_into(col.data_type)
192 .ok_or_else(|| SqlError::TypeMismatch {
193 expected: col.data_type.to_string(),
194 got: got_type.to_string(),
195 })?
196 };
197 }
198 } else if let Some(sel) = sel_rows {
199 let sel_row = &sel[idx];
200 for (i, val) in sel_row.iter().enumerate() {
201 let col_idx = col_indices[i];
202 let col = &table_schema.columns[col_idx];
203 let got_type = val.data_type();
204 row[col_idx] = if val.is_null() {
205 Value::Null
206 } else {
207 val.clone().coerce_into(col.data_type).ok_or_else(|| {
208 SqlError::TypeMismatch {
209 expected: col.data_type.to_string(),
210 got: got_type.to_string(),
211 }
212 })?
213 };
214 }
215 }
216
217 for &(pos, def_expr) in &defaults {
218 let val = eval_const_expr(def_expr)?;
219 let col = &table_schema.columns[pos];
220 if !val.is_null() {
221 let got_type = val.data_type();
222 row[pos] =
223 val.coerce_into(col.data_type)
224 .ok_or_else(|| SqlError::TypeMismatch {
225 expected: col.data_type.to_string(),
226 got: got_type.to_string(),
227 })?;
228 }
229 }
230
231 if let Some(ref gen_map) = row_col_map_for_gen {
232 for &(pos, gen_expr) in &generated_cols {
233 let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
234 let col = &table_schema.columns[pos];
235 row[pos] = if val.is_null() {
236 Value::Null
237 } else {
238 let got_type = val.data_type();
239 val.coerce_into(col.data_type)
240 .ok_or_else(|| SqlError::TypeMismatch {
241 expected: col.data_type.to_string(),
242 got: got_type.to_string(),
243 })?
244 };
245 }
246 }
247
248 for col in &table_schema.columns {
249 if !col.nullable && row[col.position as usize].is_null() {
250 return Err(SqlError::NotNullViolation(col.name.clone()));
251 }
252 }
253
254 if let Some(ref col_map) = check_col_map {
255 for col in &table_schema.columns {
256 if let Some(ref check) = col.check_expr {
257 let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
258 if !is_truthy(&result) && !result.is_null() {
259 let name = col.check_name.as_deref().unwrap_or(&col.name);
260 return Err(SqlError::CheckViolation(name.to_string()));
261 }
262 }
263 }
264 for tc in &table_schema.check_constraints {
265 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
266 if !is_truthy(&result) && !result.is_null() {
267 let name = tc.name.as_deref().unwrap_or(&tc.sql);
268 return Err(SqlError::CheckViolation(name.to_string()));
269 }
270 }
271 }
272
273 for fk in &table_schema.foreign_keys {
274 let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
275 if any_null {
276 continue; }
278 let fk_vals: Vec<Value> = fk
279 .columns
280 .iter()
281 .map(|&ci| row[ci as usize].clone())
282 .collect();
283 fk_key_buf.clear();
284 encode_composite_key_into(&fk_vals, &mut fk_key_buf);
285 let found = wtx
286 .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
287 .map_err(SqlError::Storage)?;
288 if found.is_none() {
289 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
290 return Err(SqlError::ForeignKeyViolation(name.to_string()));
291 }
292 }
293
294 let proposed_row_for_returning: Option<Vec<Value>> =
295 returning_rows.as_ref().map(|_| row.clone());
296
297 for (j, &i) in pk_indices.iter().enumerate() {
298 pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
299 }
300 encode_composite_key_into(&pk_values, &mut key_buf);
301
302 for (j, &i) in non_pk.iter().enumerate() {
303 let col = &table_schema.columns[i];
304 if matches!(
305 col.generated_kind,
306 Some(crate::parser::GeneratedKind::Virtual)
307 ) {
308 value_values[enc_pos[j] as usize] = Value::Null;
309 row[i] = Value::Null;
310 } else {
311 value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
312 }
313 }
314 encode_row_into(&value_values, &mut value_buf);
315
316 if key_buf.len() > citadel_core::MAX_KEY_SIZE {
317 return Err(SqlError::KeyTooLarge {
318 size: key_buf.len(),
319 max: citadel_core::MAX_KEY_SIZE,
320 });
321 }
322 if value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
323 return Err(SqlError::RowTooLarge {
324 size: value_buf.len(),
325 max: citadel_core::MAX_INLINE_VALUE_SIZE,
326 });
327 }
328
329 match compiled_conflict.as_ref() {
330 None => {
331 let is_new = wtx
332 .table_insert(stmt.table.as_bytes(), &key_buf, &value_buf)
333 .map_err(SqlError::Storage)?;
334 if !is_new {
335 return Err(SqlError::DuplicateKey);
336 }
337 if !table_schema.indices.is_empty() {
338 for (j, &i) in pk_indices.iter().enumerate() {
339 row[i] = pk_values[j].clone();
340 }
341 for (j, &i) in non_pk.iter().enumerate() {
342 row[i] =
343 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
344 }
345 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
346 }
347 count += 1;
348 if let Some(buf) = returning_rows.as_mut() {
349 buf.push((None, proposed_row_for_returning));
350 }
351 }
352 Some(oc) => {
353 let oc_ref: &CompiledOnConflict = oc;
354 let needs_row = upsert_needs_row(oc_ref, table_schema);
355 if needs_row {
356 for (j, &i) in pk_indices.iter().enumerate() {
357 row[i] = pk_values[j].clone();
358 }
359 for (j, &i) in non_pk.iter().enumerate() {
360 row[i] =
361 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
362 }
363 }
364 let outcome = apply_insert_with_conflict(
365 &mut wtx,
366 table_schema,
367 &key_buf,
368 &value_buf,
369 &row,
370 &pk_values,
371 oc_ref,
372 row_col_map.as_ref().unwrap(),
373 stmt.returning.is_some(),
374 )?;
375 match outcome {
376 InsertRowOutcome::Inserted => {
377 count += 1;
378 if let Some(buf) = returning_rows.as_mut() {
379 buf.push((None, proposed_row_for_returning));
380 }
381 }
382 InsertRowOutcome::Updated { old, new } => {
383 count += 1;
384 if let Some(buf) = returning_rows.as_mut() {
385 buf.push((Some(old), Some(new)));
386 }
387 }
388 InsertRowOutcome::Skipped => {}
389 }
390 }
391 }
392 }
393
394 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
395 let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
396 wtx.commit().map_err(SqlError::Storage)?;
397 return Ok(ExecutionResult::Query(qr));
398 }
399
400 wtx.commit().map_err(SqlError::Storage)?;
401 Ok(ExecutionResult::RowsAffected(count))
402}
403
404pub(super) fn has_subquery(expr: &Expr) -> bool {
405 crate::parser::has_subquery(expr)
406}
407
408pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
409 if let Some(ref w) = stmt.where_clause {
410 if has_subquery(w) {
411 return true;
412 }
413 }
414 if let Some(ref h) = stmt.having {
415 if has_subquery(h) {
416 return true;
417 }
418 }
419 for col in &stmt.columns {
420 if let SelectColumn::Expr { expr, .. } = col {
421 if has_subquery(expr) {
422 return true;
423 }
424 }
425 }
426 for ob in &stmt.order_by {
427 if has_subquery(&ob.expr) {
428 return true;
429 }
430 }
431 for join in &stmt.joins {
432 if let Some(ref on_expr) = join.on_clause {
433 if has_subquery(on_expr) {
434 return true;
435 }
436 }
437 }
438 false
439}
440
441pub(super) fn materialize_expr(
442 expr: &Expr,
443 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
444) -> Result<Expr> {
445 match expr {
446 Expr::InSubquery {
447 expr: e,
448 subquery,
449 negated,
450 } => {
451 let inner = materialize_expr(e, exec_sub)?;
452 let qr = exec_sub(subquery)?;
453 if !qr.columns.is_empty() && qr.columns.len() != 1 {
454 return Err(SqlError::SubqueryMultipleColumns);
455 }
456 let mut values = rustc_hash::FxHashSet::default();
457 let mut has_null = false;
458 for row in &qr.rows {
459 if row[0].is_null() {
460 has_null = true;
461 } else {
462 values.insert(row[0].clone());
463 }
464 }
465 Ok(Expr::InSet {
466 expr: Box::new(inner),
467 values,
468 has_null,
469 negated: *negated,
470 })
471 }
472 Expr::ScalarSubquery(subquery) => {
473 let qr = exec_sub(subquery)?;
474 if qr.rows.len() > 1 {
475 return Err(SqlError::SubqueryMultipleRows);
476 }
477 let val = if qr.rows.is_empty() {
478 Value::Null
479 } else {
480 qr.rows[0][0].clone()
481 };
482 Ok(Expr::Literal(val))
483 }
484 Expr::Exists { subquery, negated } => {
485 let qr = exec_sub(subquery)?;
486 let exists = !qr.rows.is_empty();
487 let result = if *negated { !exists } else { exists };
488 Ok(Expr::Literal(Value::Boolean(result)))
489 }
490 Expr::InList {
491 expr: e,
492 list,
493 negated,
494 } => {
495 let inner = materialize_expr(e, exec_sub)?;
496 let items = list
497 .iter()
498 .map(|item| materialize_expr(item, exec_sub))
499 .collect::<Result<Vec<_>>>()?;
500 Ok(Expr::InList {
501 expr: Box::new(inner),
502 list: items,
503 negated: *negated,
504 })
505 }
506 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
507 left: Box::new(materialize_expr(left, exec_sub)?),
508 op: *op,
509 right: Box::new(materialize_expr(right, exec_sub)?),
510 }),
511 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
512 op: *op,
513 expr: Box::new(materialize_expr(e, exec_sub)?),
514 }),
515 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
516 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
517 Expr::InSet {
518 expr: e,
519 values,
520 has_null,
521 negated,
522 } => Ok(Expr::InSet {
523 expr: Box::new(materialize_expr(e, exec_sub)?),
524 values: values.clone(),
525 has_null: *has_null,
526 negated: *negated,
527 }),
528 Expr::Between {
529 expr: e,
530 low,
531 high,
532 negated,
533 } => Ok(Expr::Between {
534 expr: Box::new(materialize_expr(e, exec_sub)?),
535 low: Box::new(materialize_expr(low, exec_sub)?),
536 high: Box::new(materialize_expr(high, exec_sub)?),
537 negated: *negated,
538 }),
539 Expr::Like {
540 expr: e,
541 pattern,
542 escape,
543 negated,
544 } => {
545 let esc = escape
546 .as_ref()
547 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
548 .transpose()?;
549 Ok(Expr::Like {
550 expr: Box::new(materialize_expr(e, exec_sub)?),
551 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
552 escape: esc,
553 negated: *negated,
554 })
555 }
556 Expr::Case {
557 operand,
558 conditions,
559 else_result,
560 } => {
561 let op = operand
562 .as_ref()
563 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
564 .transpose()?;
565 let conds = conditions
566 .iter()
567 .map(|(c, r)| {
568 Ok((
569 materialize_expr(c, exec_sub)?,
570 materialize_expr(r, exec_sub)?,
571 ))
572 })
573 .collect::<Result<Vec<_>>>()?;
574 let else_r = else_result
575 .as_ref()
576 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
577 .transpose()?;
578 Ok(Expr::Case {
579 operand: op,
580 conditions: conds,
581 else_result: else_r,
582 })
583 }
584 Expr::Coalesce(args) => {
585 let materialized = args
586 .iter()
587 .map(|a| materialize_expr(a, exec_sub))
588 .collect::<Result<Vec<_>>>()?;
589 Ok(Expr::Coalesce(materialized))
590 }
591 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
592 expr: Box::new(materialize_expr(e, exec_sub)?),
593 data_type: *data_type,
594 }),
595 Expr::Function {
596 name,
597 args,
598 distinct,
599 } => {
600 let materialized = args
601 .iter()
602 .map(|a| materialize_expr(a, exec_sub))
603 .collect::<Result<Vec<_>>>()?;
604 Ok(Expr::Function {
605 name: name.clone(),
606 args: materialized,
607 distinct: *distinct,
608 })
609 }
610 other => Ok(other.clone()),
611 }
612}
613
614pub(super) fn materialize_stmt(
615 stmt: &SelectStmt,
616 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
617) -> Result<SelectStmt> {
618 let where_clause = stmt
619 .where_clause
620 .as_ref()
621 .map(|e| materialize_expr(e, exec_sub))
622 .transpose()?;
623 let having = stmt
624 .having
625 .as_ref()
626 .map(|e| materialize_expr(e, exec_sub))
627 .transpose()?;
628 let columns = stmt
629 .columns
630 .iter()
631 .map(|c| match c {
632 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
633 SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
634 SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
635 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
636 expr: materialize_expr(expr, exec_sub)?,
637 alias: alias.clone(),
638 }),
639 })
640 .collect::<Result<Vec<_>>>()?;
641 let order_by = stmt
642 .order_by
643 .iter()
644 .map(|ob| {
645 Ok(OrderByItem {
646 expr: materialize_expr(&ob.expr, exec_sub)?,
647 descending: ob.descending,
648 nulls_first: ob.nulls_first,
649 })
650 })
651 .collect::<Result<Vec<_>>>()?;
652 let joins = stmt
653 .joins
654 .iter()
655 .map(|j| {
656 let on_clause = j
657 .on_clause
658 .as_ref()
659 .map(|e| materialize_expr(e, exec_sub))
660 .transpose()?;
661 Ok(JoinClause {
662 join_type: j.join_type,
663 table: j.table.clone(),
664 on_clause,
665 })
666 })
667 .collect::<Result<Vec<_>>>()?;
668 let group_by = stmt
669 .group_by
670 .iter()
671 .map(|e| materialize_expr(e, exec_sub))
672 .collect::<Result<Vec<_>>>()?;
673 Ok(SelectStmt {
674 columns,
675 from: stmt.from.clone(),
676 from_alias: stmt.from_alias.clone(),
677 joins,
678 distinct: stmt.distinct,
679 where_clause,
680 order_by,
681 limit: stmt.limit.clone(),
682 offset: stmt.offset.clone(),
683 group_by,
684 having,
685 })
686}
687
688pub(super) fn exec_subquery_read(
689 db: &Database,
690 schema: &SchemaManager,
691 stmt: &SelectStmt,
692 ctes: &CteContext,
693) -> Result<QueryResult> {
694 match super::exec_select(db, schema, stmt, ctes)? {
695 ExecutionResult::Query(qr) => Ok(qr),
696 _ => Ok(QueryResult {
697 columns: vec![],
698 rows: vec![],
699 }),
700 }
701}
702
703pub(super) fn exec_subquery_write(
704 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
705 schema: &SchemaManager,
706 stmt: &SelectStmt,
707 ctes: &CteContext,
708) -> Result<QueryResult> {
709 match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
710 ExecutionResult::Query(qr) => Ok(qr),
711 _ => Ok(QueryResult {
712 columns: vec![],
713 rows: vec![],
714 }),
715 }
716}
717
718pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
719 stmt.where_clause.as_ref().is_some_and(has_subquery)
720 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
721}
722
723pub(super) fn materialize_update(
724 stmt: &UpdateStmt,
725 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
726) -> Result<UpdateStmt> {
727 let where_clause = stmt
728 .where_clause
729 .as_ref()
730 .map(|e| materialize_expr(e, exec_sub))
731 .transpose()?;
732 let assignments = stmt
733 .assignments
734 .iter()
735 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
736 .collect::<Result<Vec<_>>>()?;
737 Ok(UpdateStmt {
738 table: stmt.table.clone(),
739 assignments,
740 where_clause,
741 returning: stmt.returning.clone(),
742 })
743}
744
745pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
746 stmt.where_clause.as_ref().is_some_and(has_subquery)
747}
748
749pub(super) fn materialize_delete(
750 stmt: &DeleteStmt,
751 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
752) -> Result<DeleteStmt> {
753 let where_clause = stmt
754 .where_clause
755 .as_ref()
756 .map(|e| materialize_expr(e, exec_sub))
757 .transpose()?;
758 Ok(DeleteStmt {
759 table: stmt.table.clone(),
760 where_clause,
761 returning: stmt.returning.clone(),
762 })
763}
764
765pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
766 match &stmt.source {
767 InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
768 InsertSource::Select(_) => false,
770 }
771}
772
773pub(super) fn materialize_insert(
774 stmt: &InsertStmt,
775 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
776) -> Result<InsertStmt> {
777 let source = match &stmt.source {
778 InsertSource::Values(rows) => {
779 let mat = rows
780 .iter()
781 .map(|row| {
782 row.iter()
783 .map(|e| materialize_expr(e, exec_sub))
784 .collect::<Result<Vec<_>>>()
785 })
786 .collect::<Result<Vec<_>>>()?;
787 InsertSource::Values(mat)
788 }
789 InsertSource::Select(sq) => {
790 let ctes = sq
791 .ctes
792 .iter()
793 .map(|c| {
794 Ok(CteDefinition {
795 name: c.name.clone(),
796 column_aliases: c.column_aliases.clone(),
797 body: materialize_query_body(&c.body, exec_sub)?,
798 })
799 })
800 .collect::<Result<Vec<_>>>()?;
801 let body = materialize_query_body(&sq.body, exec_sub)?;
802 InsertSource::Select(Box::new(SelectQuery {
803 ctes,
804 recursive: sq.recursive,
805 body,
806 }))
807 }
808 };
809 Ok(InsertStmt {
810 table: stmt.table.clone(),
811 columns: stmt.columns.clone(),
812 source,
813 on_conflict: stmt.on_conflict.clone(),
814 returning: stmt.returning.clone(),
815 })
816}
817
818pub(super) fn materialize_query_body(
819 body: &QueryBody,
820 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
821) -> Result<QueryBody> {
822 match body {
823 QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
824 sel, exec_sub,
825 )?))),
826 QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
827 op: comp.op.clone(),
828 all: comp.all,
829 left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
830 right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
831 order_by: comp.order_by.clone(),
832 limit: comp.limit.clone(),
833 offset: comp.offset.clone(),
834 }))),
835 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
836 }
837}
838
839pub(super) fn exec_query_body(
840 db: &Database,
841 schema: &SchemaManager,
842 body: &QueryBody,
843 ctes: &CteContext,
844) -> Result<ExecutionResult> {
845 match body {
846 QueryBody::Select(sel) => super::exec_select(db, schema, sel, ctes),
847 QueryBody::Compound(comp) => exec_compound_select(db, schema, comp, ctes),
848 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
849 SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
850 ),
851 }
852}
853
854pub(super) fn exec_query_body_in_txn(
855 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
856 schema: &SchemaManager,
857 body: &QueryBody,
858 ctes: &CteContext,
859) -> Result<ExecutionResult> {
860 match body {
861 QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
862 QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
863 QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
864 QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
865 QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
866 }
867}
868
869pub(super) fn exec_query_body_read(
870 db: &Database,
871 schema: &SchemaManager,
872 body: &QueryBody,
873 ctes: &CteContext,
874) -> Result<QueryResult> {
875 match exec_query_body(db, schema, body, ctes)? {
876 ExecutionResult::Query(qr) => Ok(qr),
877 _ => Ok(QueryResult {
878 columns: vec![],
879 rows: vec![],
880 }),
881 }
882}
883
884pub(super) fn exec_query_body_write(
885 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
886 schema: &SchemaManager,
887 body: &QueryBody,
888 ctes: &CteContext,
889) -> Result<QueryResult> {
890 match exec_query_body_in_txn(wtx, schema, body, ctes)? {
891 ExecutionResult::Query(qr) => Ok(qr),
892 _ => Ok(QueryResult {
893 columns: vec![],
894 rows: vec![],
895 }),
896 }
897}
898
899pub(super) fn exec_compound_select(
900 db: &Database,
901 schema: &SchemaManager,
902 comp: &CompoundSelect,
903 ctes: &CteContext,
904) -> Result<ExecutionResult> {
905 let left_qr = match exec_query_body(db, schema, &comp.left, ctes)? {
906 ExecutionResult::Query(qr) => qr,
907 _ => QueryResult {
908 columns: vec![],
909 rows: vec![],
910 },
911 };
912 let right_qr = match exec_query_body(db, schema, &comp.right, ctes)? {
913 ExecutionResult::Query(qr) => qr,
914 _ => QueryResult {
915 columns: vec![],
916 rows: vec![],
917 },
918 };
919 apply_set_operation(comp, left_qr, right_qr)
920}
921
922pub(super) fn exec_compound_select_in_txn(
923 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
924 schema: &SchemaManager,
925 comp: &CompoundSelect,
926 ctes: &CteContext,
927) -> Result<ExecutionResult> {
928 let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
929 ExecutionResult::Query(qr) => qr,
930 _ => QueryResult {
931 columns: vec![],
932 rows: vec![],
933 },
934 };
935 let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
936 ExecutionResult::Query(qr) => qr,
937 _ => QueryResult {
938 columns: vec![],
939 rows: vec![],
940 },
941 };
942 apply_set_operation(comp, left_qr, right_qr)
943}
944
945pub(super) fn apply_set_operation(
946 comp: &CompoundSelect,
947 left_qr: QueryResult,
948 right_qr: QueryResult,
949) -> Result<ExecutionResult> {
950 if !left_qr.columns.is_empty()
951 && !right_qr.columns.is_empty()
952 && left_qr.columns.len() != right_qr.columns.len()
953 {
954 return Err(SqlError::CompoundColumnCountMismatch {
955 left: left_qr.columns.len(),
956 right: right_qr.columns.len(),
957 });
958 }
959
960 let columns = left_qr.columns;
961
962 let mut rows = match (&comp.op, comp.all) {
963 (SetOp::Union, true) => {
964 let mut rows = left_qr.rows;
965 rows.extend(right_qr.rows);
966 rows
967 }
968 (SetOp::Union, false) => {
969 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
970 let mut rows = Vec::new();
971 for row in left_qr.rows.into_iter().chain(right_qr.rows) {
972 if !seen.contains(&row) {
973 seen.insert(row.clone());
974 rows.push(row);
975 }
976 }
977 rows
978 }
979 (SetOp::Intersect, true) => {
980 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
981 for row in &right_qr.rows {
982 *right_counts.entry(row.clone()).or_insert(0) += 1;
983 }
984 let mut rows = Vec::new();
985 for row in left_qr.rows {
986 if let Some(count) = right_counts.get_mut(&row) {
987 if *count > 0 {
988 *count -= 1;
989 rows.push(row);
990 }
991 }
992 }
993 rows
994 }
995 (SetOp::Intersect, false) => {
996 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
997 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
998 let mut rows = Vec::new();
999 for row in left_qr.rows {
1000 if right_set.contains(&row) && !seen.contains(&row) {
1001 seen.insert(row.clone());
1002 rows.push(row);
1003 }
1004 }
1005 rows
1006 }
1007 (SetOp::Except, true) => {
1008 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1009 for row in &right_qr.rows {
1010 *right_counts.entry(row.clone()).or_insert(0) += 1;
1011 }
1012 let mut rows = Vec::new();
1013 for row in left_qr.rows {
1014 if let Some(count) = right_counts.get_mut(&row) {
1015 if *count > 0 {
1016 *count -= 1;
1017 continue;
1018 }
1019 }
1020 rows.push(row);
1021 }
1022 rows
1023 }
1024 (SetOp::Except, false) => {
1025 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1026 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1027 let mut rows = Vec::new();
1028 for row in left_qr.rows {
1029 if !right_set.contains(&row) && !seen.contains(&row) {
1030 seen.insert(row.clone());
1031 rows.push(row);
1032 }
1033 }
1034 rows
1035 }
1036 };
1037
1038 if !comp.order_by.is_empty() {
1039 let col_defs: Vec<crate::types::ColumnDef> = columns
1040 .iter()
1041 .enumerate()
1042 .map(|(i, name)| crate::types::ColumnDef {
1043 name: name.clone(),
1044 data_type: crate::types::DataType::Null,
1045 nullable: true,
1046 position: i as u16,
1047 default_expr: None,
1048 default_sql: None,
1049 check_expr: None,
1050 check_sql: None,
1051 check_name: None,
1052 is_with_timezone: false,
1053 generated_expr: None,
1054 generated_sql: None,
1055 generated_kind: None,
1056 })
1057 .collect();
1058 sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1059 }
1060
1061 if let Some(ref offset_expr) = comp.offset {
1062 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1063 if offset < rows.len() {
1064 rows = rows.split_off(offset);
1065 } else {
1066 rows.clear();
1067 }
1068 }
1069
1070 if let Some(ref limit_expr) = comp.limit {
1071 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1072 rows.truncate(limit);
1073 }
1074
1075 Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1076}
1077
1078struct InsertBufs {
1079 row: Vec<Value>,
1080 pk_values: Vec<Value>,
1081 value_values: Vec<Value>,
1082 key_buf: Vec<u8>,
1083 value_buf: Vec<u8>,
1084 col_indices: Vec<usize>,
1085 fk_key_buf: Vec<u8>,
1086}
1087
1088impl InsertBufs {
1089 fn new() -> Self {
1090 Self {
1091 row: Vec::new(),
1092 pk_values: Vec::new(),
1093 value_values: Vec::new(),
1094 key_buf: Vec::with_capacity(64),
1095 value_buf: Vec::with_capacity(256),
1096 col_indices: Vec::new(),
1097 fk_key_buf: Vec::with_capacity(64),
1098 }
1099 }
1100}
1101
1102thread_local! {
1103 static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1104 static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1105}
1106
1107fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1108 INSERT_SCRATCH.with(|slot| f(&mut slot.borrow_mut()))
1109}
1110
1111pub(super) struct UpsertBufs {
1112 old_row: Vec<Value>,
1113 new_row: Vec<Value>,
1114 value_values: Vec<Value>,
1115 new_value_buf: Vec<u8>,
1116}
1117
1118impl UpsertBufs {
1119 pub(super) fn new() -> Self {
1120 Self {
1121 old_row: Vec::new(),
1122 new_row: Vec::new(),
1123 value_values: Vec::new(),
1124 new_value_buf: Vec::with_capacity(256),
1125 }
1126 }
1127}
1128
1129pub fn exec_insert_in_txn(
1130 wtx: &mut WriteTxn<'_>,
1131 schema: &SchemaManager,
1132 stmt: &InsertStmt,
1133 params: &[Value],
1134) -> Result<ExecutionResult> {
1135 with_insert_scratch(|bufs| {
1136 exec_insert_in_txn_impl(
1137 wtx,
1138 schema,
1139 stmt,
1140 params,
1141 bufs,
1142 None,
1143 &CteContext::default(),
1144 )
1145 })
1146}
1147
1148pub(super) fn exec_insert_in_txn_with_ctes(
1149 wtx: &mut WriteTxn<'_>,
1150 schema: &SchemaManager,
1151 stmt: &InsertStmt,
1152 params: &[Value],
1153 outer_ctes: &CteContext,
1154) -> Result<ExecutionResult> {
1155 with_insert_scratch(|bufs| {
1156 exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1157 })
1158}
1159
1160fn exec_insert_in_txn_cached(
1161 wtx: &mut WriteTxn<'_>,
1162 schema: &SchemaManager,
1163 stmt: &InsertStmt,
1164 params: &[Value],
1165 cache: &InsertCache,
1166) -> Result<ExecutionResult> {
1167 with_insert_scratch(|bufs| {
1168 exec_insert_in_txn_impl(
1169 wtx,
1170 schema,
1171 stmt,
1172 params,
1173 bufs,
1174 Some(cache),
1175 &CteContext::default(),
1176 )
1177 })
1178}
1179
1180fn exec_insert_in_txn_impl(
1181 wtx: &mut WriteTxn<'_>,
1182 schema: &SchemaManager,
1183 stmt: &InsertStmt,
1184 params: &[Value],
1185 bufs: &mut InsertBufs,
1186 cache: Option<&InsertCache>,
1187 outer_ctes: &CteContext,
1188) -> Result<ExecutionResult> {
1189 let empty_ctes = CteContext::default();
1190 let materialized;
1191 let has_sub = match cache {
1192 Some(c) => c.has_subquery,
1193 None => insert_has_subquery(stmt),
1194 };
1195 let stmt = if has_sub {
1196 materialized = materialize_insert(stmt, &mut |sub| {
1197 exec_subquery_write(wtx, schema, sub, &empty_ctes)
1198 })?;
1199 &materialized
1200 } else {
1201 stmt
1202 };
1203
1204 let table_schema = schema
1205 .get(&stmt.table)
1206 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1207
1208 let default_columns;
1209 let insert_columns: &[String] = if stmt.columns.is_empty() {
1210 default_columns = table_schema
1211 .columns
1212 .iter()
1213 .map(|c| c.name.clone())
1214 .collect::<Vec<_>>();
1215 &default_columns
1216 } else {
1217 &stmt.columns
1218 };
1219
1220 bufs.col_indices.clear();
1221 if let Some(c) = cache {
1222 bufs.col_indices.extend_from_slice(&c.col_indices);
1223 } else {
1224 for name in insert_columns {
1225 bufs.col_indices.push(
1226 table_schema
1227 .column_index(name)
1228 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1229 );
1230 }
1231 }
1232
1233 if cache.is_none() {
1234 for &ci in &bufs.col_indices {
1235 if table_schema.columns[ci].generated_kind.is_some() {
1236 return Err(SqlError::CannotInsertIntoGeneratedColumn(
1237 table_schema.columns[ci].name.clone(),
1238 ));
1239 }
1240 }
1241 }
1242
1243 let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1244 let cached_gen_positions: &[usize];
1245 let cached_gen_fast_evals: &[FastGenEval];
1246 if let Some(c) = cache {
1247 cached_gen_positions = &c.generated_col_positions;
1248 cached_gen_fast_evals = &c.generated_fast_evals;
1249 generated_cols_uncached = Vec::new();
1250 } else {
1251 cached_gen_positions = &[];
1252 cached_gen_fast_evals = &[];
1253 generated_cols_uncached = table_schema
1254 .columns
1255 .iter()
1256 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1257 .map(|c| {
1258 let expr = c.generated_expr.as_ref().unwrap();
1259 let fe = detect_fast_gen_eval(expr, table_schema);
1260 (c.position as usize, expr, fe)
1261 })
1262 .collect();
1263 }
1264 let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1265 let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1266 None
1267 } else {
1268 Some(ColumnMap::new(&table_schema.columns))
1269 };
1270 let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1271 None
1272 } else if let Some(c) = cache {
1273 c.row_col_map.as_ref()
1274 } else {
1275 row_col_map_for_gen_owned.as_ref()
1276 };
1277
1278 let any_defaults = match cache {
1279 Some(c) => c.any_defaults,
1280 None => table_schema
1281 .columns
1282 .iter()
1283 .any(|c| c.default_expr.is_some()),
1284 };
1285 let defaults: Vec<(usize, &Expr)> = if any_defaults {
1286 table_schema
1287 .columns
1288 .iter()
1289 .filter(|c| {
1290 c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1291 })
1292 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1293 .collect()
1294 } else {
1295 Vec::new()
1296 };
1297
1298 let has_checks = match cache {
1299 Some(c) => c.has_checks,
1300 None => table_schema.has_checks(),
1301 };
1302 let check_col_map = if has_checks {
1303 Some(ColumnMap::new(&table_schema.columns))
1304 } else {
1305 None
1306 };
1307
1308 let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1309 &[usize],
1310 &[usize],
1311 &[u16],
1312 usize,
1313 &[u16],
1314 ) = if let Some(c) = cache {
1315 (
1316 &c.pk_indices,
1317 &c.non_pk_indices,
1318 &c.encoding_positions,
1319 c.phys_count,
1320 &c.dropped_non_pk_slots,
1321 )
1322 } else {
1323 (
1324 table_schema.pk_indices(),
1325 table_schema.non_pk_indices(),
1326 table_schema.encoding_positions(),
1327 table_schema.physical_non_pk_count(),
1328 table_schema.dropped_non_pk_slots(),
1329 )
1330 };
1331
1332 bufs.row.resize(table_schema.columns.len(), Value::Null);
1333 bufs.pk_values.resize(pk_indices.len(), Value::Null);
1334 bufs.value_values.resize(phys_count, Value::Null);
1335
1336 let table_bytes = stmt.table.as_bytes();
1337 let has_fks = !table_schema.foreign_keys.is_empty();
1338 let has_indices = !table_schema.indices.is_empty();
1339 let has_defaults = !defaults.is_empty();
1340
1341 let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1342 (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1343 (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1344 (_, None) => None,
1345 };
1346
1347 let row_col_map_owned: Option<ColumnMap> =
1348 if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1349 Some(ColumnMap::new(&table_schema.columns))
1350 } else {
1351 None
1352 };
1353 let row_col_map: Option<&ColumnMap> = cache
1354 .and_then(|c| c.row_col_map.as_ref())
1355 .or(row_col_map_owned.as_ref());
1356
1357 let select_rows = match &stmt.source {
1358 InsertSource::Select(sq) => {
1359 let insert_ctes = super::materialize_all_ctes_with_outer(
1360 &sq.ctes,
1361 sq.recursive,
1362 outer_ctes,
1363 &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1364 )?;
1365 let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1366 Some(qr.rows)
1367 }
1368 InsertSource::Values(_) => None,
1369 };
1370
1371 let mut count: u64 = 0;
1372 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1373 stmt.returning.as_ref().map(|_| Vec::new());
1374
1375 let values = match &stmt.source {
1376 InsertSource::Values(rows) => Some(rows.as_slice()),
1377 InsertSource::Select(_) => None,
1378 };
1379 let sel_rows = select_rows.as_deref();
1380
1381 let total = match (values, sel_rows) {
1382 (Some(rows), _) => rows.len(),
1383 (_, Some(rows)) => rows.len(),
1384 _ => 0,
1385 };
1386
1387 if let Some(sel) = sel_rows {
1388 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1389 return Err(SqlError::InvalidValue(format!(
1390 "INSERT ... SELECT column count mismatch: expected {}, got {}",
1391 insert_columns.len(),
1392 sel[0].len()
1393 )));
1394 }
1395 }
1396
1397 let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1398 for idx in 0..total {
1399 if !skip_row_clear {
1400 for v in bufs.row.iter_mut() {
1401 *v = Value::Null;
1402 }
1403 }
1404
1405 if let Some(value_rows) = values {
1406 if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1407 for action in plan {
1408 match action {
1409 BindAction::Param {
1410 param_idx,
1411 col_idx,
1412 target,
1413 } => {
1414 let v = ¶ms[*param_idx];
1415 bufs.row[*col_idx] = if v.is_null() {
1416 Value::Null
1417 } else if v.data_type() == *target {
1418 v.clone()
1419 } else {
1420 let got = v.data_type();
1421 v.clone().coerce_into(*target).ok_or_else(|| {
1422 SqlError::TypeMismatch {
1423 expected: target.to_string(),
1424 got: got.to_string(),
1425 }
1426 })?
1427 };
1428 }
1429 BindAction::Literal { value, col_idx } => {
1430 bufs.row[*col_idx] = value.clone();
1431 }
1432 }
1433 }
1434 } else {
1435 let value_row = &value_rows[idx];
1436 if value_row.len() != insert_columns.len() {
1437 return Err(SqlError::InvalidValue(format!(
1438 "expected {} values, got {}",
1439 insert_columns.len(),
1440 value_row.len()
1441 )));
1442 }
1443 for (i, expr) in value_row.iter().enumerate() {
1444 let val = match expr {
1445 Expr::Parameter(n) => params
1446 .get(n - 1)
1447 .cloned()
1448 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1449 Expr::Literal(v) => v.clone(),
1450 _ => eval_const_expr(expr)?,
1451 };
1452 let col_idx = bufs.col_indices[i];
1453 let col = &table_schema.columns[col_idx];
1454 let got_type = val.data_type();
1455 bufs.row[col_idx] = if val.is_null() {
1456 Value::Null
1457 } else {
1458 val.coerce_into(col.data_type)
1459 .ok_or_else(|| SqlError::TypeMismatch {
1460 expected: col.data_type.to_string(),
1461 got: got_type.to_string(),
1462 })?
1463 };
1464 }
1465 }
1466 } else if let Some(sel) = sel_rows {
1467 let sel_row = &sel[idx];
1468 for (i, val) in sel_row.iter().enumerate() {
1469 let col_idx = bufs.col_indices[i];
1470 let col = &table_schema.columns[col_idx];
1471 let got_type = val.data_type();
1472 bufs.row[col_idx] = if val.is_null() {
1473 Value::Null
1474 } else {
1475 val.clone().coerce_into(col.data_type).ok_or_else(|| {
1476 SqlError::TypeMismatch {
1477 expected: col.data_type.to_string(),
1478 got: got_type.to_string(),
1479 }
1480 })?
1481 };
1482 }
1483 }
1484
1485 if has_defaults {
1486 for &(pos, def_expr) in &defaults {
1487 let val = eval_const_expr(def_expr)?;
1488 let col = &table_schema.columns[pos];
1489 if !val.is_null() {
1490 let got_type = val.data_type();
1491 bufs.row[pos] =
1492 val.coerce_into(col.data_type)
1493 .ok_or_else(|| SqlError::TypeMismatch {
1494 expected: col.data_type.to_string(),
1495 got: got_type.to_string(),
1496 })?;
1497 }
1498 }
1499 }
1500
1501 if let Some(gen_map) = row_col_map_for_gen {
1502 if cache.is_some() {
1503 for (pos, fast) in cached_gen_positions
1504 .iter()
1505 .copied()
1506 .zip(cached_gen_fast_evals.iter())
1507 {
1508 let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1509 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1510 let col = &table_schema.columns[pos];
1511 bufs.row[pos] = if val.is_null() {
1512 Value::Null
1513 } else {
1514 let got_type = val.data_type();
1515 val.coerce_into(col.data_type)
1516 .ok_or_else(|| SqlError::TypeMismatch {
1517 expected: col.data_type.to_string(),
1518 got: got_type.to_string(),
1519 })?
1520 };
1521 }
1522 } else {
1523 for (pos, gen_expr, fast) in &generated_cols_uncached {
1524 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1525 let col = &table_schema.columns[*pos];
1526 bufs.row[*pos] = if val.is_null() {
1527 Value::Null
1528 } else {
1529 let got_type = val.data_type();
1530 val.coerce_into(col.data_type)
1531 .ok_or_else(|| SqlError::TypeMismatch {
1532 expected: col.data_type.to_string(),
1533 got: got_type.to_string(),
1534 })?
1535 };
1536 }
1537 }
1538 }
1539
1540 if let Some(c) = cache {
1541 for &pos in &c.not_null_indices {
1542 if bufs.row[pos as usize].is_null() {
1543 return Err(SqlError::NotNullViolation(
1544 table_schema.columns[pos as usize].name.clone(),
1545 ));
1546 }
1547 }
1548 } else {
1549 for col in &table_schema.columns {
1550 if !col.nullable && bufs.row[col.position as usize].is_null() {
1551 return Err(SqlError::NotNullViolation(col.name.clone()));
1552 }
1553 }
1554 }
1555
1556 if let Some(ref col_map) = check_col_map {
1557 for col in &table_schema.columns {
1558 if let Some(ref check) = col.check_expr {
1559 let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1560 if !is_truthy(&result) && !result.is_null() {
1561 let name = col.check_name.as_deref().unwrap_or(&col.name);
1562 return Err(SqlError::CheckViolation(name.to_string()));
1563 }
1564 }
1565 }
1566 for tc in &table_schema.check_constraints {
1567 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1568 if !is_truthy(&result) && !result.is_null() {
1569 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1570 return Err(SqlError::CheckViolation(name.to_string()));
1571 }
1572 }
1573 }
1574
1575 if has_fks {
1576 for fk in &table_schema.foreign_keys {
1577 let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1578 if any_null {
1579 continue;
1580 }
1581 crate::encoding::encode_composite_key_from_indices(
1582 &fk.columns,
1583 &bufs.row,
1584 &mut bufs.fk_key_buf,
1585 );
1586 let found = wtx
1587 .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1588 .map_err(SqlError::Storage)?;
1589 if found.is_none() {
1590 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1591 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1592 }
1593 }
1594 }
1595
1596 let proposed_row_for_returning: Option<Vec<Value>> =
1597 returning_rows.as_ref().map(|_| bufs.row.clone());
1598
1599 for (j, &i) in pk_indices.iter().enumerate() {
1600 bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1601 }
1602 match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1603 true => match bufs.pk_values[0] {
1604 Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1605 _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1606 },
1607 false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1608 }
1609
1610 for &slot in dropped {
1611 bufs.value_values[slot as usize] = Value::Null;
1612 }
1613 for (j, &i) in non_pk.iter().enumerate() {
1614 let col = &table_schema.columns[i];
1615 if matches!(
1616 col.generated_kind,
1617 Some(crate::parser::GeneratedKind::Virtual)
1618 ) {
1619 bufs.value_values[enc_pos[j] as usize] = Value::Null;
1620 bufs.row[i] = Value::Null;
1621 } else {
1622 bufs.value_values[enc_pos[j] as usize] =
1623 std::mem::replace(&mut bufs.row[i], Value::Null);
1624 }
1625 }
1626 match cache.and_then(|c| c.row_encoder.as_ref()) {
1627 Some(tmpl) => crate::encoding::encode_int_row_with_template(
1628 tmpl,
1629 &bufs.value_values,
1630 &mut bufs.value_buf,
1631 )?,
1632 None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1633 }
1634
1635 if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1636 return Err(SqlError::KeyTooLarge {
1637 size: bufs.key_buf.len(),
1638 max: citadel_core::MAX_KEY_SIZE,
1639 });
1640 }
1641 if bufs.value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1642 return Err(SqlError::RowTooLarge {
1643 size: bufs.value_buf.len(),
1644 max: citadel_core::MAX_INLINE_VALUE_SIZE,
1645 });
1646 }
1647
1648 match compiled_conflict.as_ref() {
1649 None => {
1650 let is_new = wtx
1651 .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1652 .map_err(SqlError::Storage)?;
1653 if !is_new {
1654 return Err(SqlError::DuplicateKey);
1655 }
1656 if has_indices {
1657 for (j, &i) in pk_indices.iter().enumerate() {
1658 bufs.row[i] = bufs.pk_values[j].clone();
1659 }
1660 for (j, &i) in non_pk.iter().enumerate() {
1661 bufs.row[i] = std::mem::replace(
1662 &mut bufs.value_values[enc_pos[j] as usize],
1663 Value::Null,
1664 );
1665 }
1666 insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1667 }
1668 count += 1;
1669 if let Some(buf) = returning_rows.as_mut() {
1670 buf.push((None, proposed_row_for_returning));
1671 }
1672 }
1673 Some(oc) => {
1674 let oc_ref: &CompiledOnConflict = oc;
1675 let needs_row = upsert_needs_row(oc_ref, table_schema);
1676 if needs_row {
1677 for (j, &i) in pk_indices.iter().enumerate() {
1678 bufs.row[i] = bufs.pk_values[j].clone();
1679 }
1680 for (j, &i) in non_pk.iter().enumerate() {
1681 bufs.row[i] = std::mem::replace(
1682 &mut bufs.value_values[enc_pos[j] as usize],
1683 Value::Null,
1684 );
1685 }
1686 }
1687 let outcome = apply_insert_with_conflict(
1688 wtx,
1689 table_schema,
1690 &bufs.key_buf,
1691 &bufs.value_buf,
1692 &bufs.row,
1693 &bufs.pk_values,
1694 oc_ref,
1695 row_col_map.unwrap(),
1696 stmt.returning.is_some(),
1697 )?;
1698 match outcome {
1699 InsertRowOutcome::Inserted => {
1700 count += 1;
1701 if let Some(buf) = returning_rows.as_mut() {
1702 buf.push((None, proposed_row_for_returning));
1703 }
1704 }
1705 InsertRowOutcome::Updated { old, new } => {
1706 count += 1;
1707 if let Some(buf) = returning_rows.as_mut() {
1708 buf.push((Some(old), Some(new)));
1709 }
1710 }
1711 InsertRowOutcome::Skipped => {}
1712 }
1713 }
1714 }
1715 }
1716
1717 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
1718 return Ok(ExecutionResult::Query(super::helpers::project_returning(
1719 table_schema,
1720 returning_cols,
1721 &rows,
1722 )?));
1723 }
1724
1725 Ok(ExecutionResult::RowsAffected(count))
1726}
1727
1728pub struct CompiledInsert {
1729 table_lower: String,
1730 cached: Option<InsertCache>,
1731}
1732
1733struct InsertCache {
1734 col_indices: Vec<usize>,
1735 has_subquery: bool,
1736 any_defaults: bool,
1737 has_checks: bool,
1738 on_conflict: Option<Arc<CompiledOnConflict>>,
1739 row_col_map: Option<ColumnMap>,
1740 generated_col_positions: Vec<usize>,
1741 generated_fast_evals: Vec<FastGenEval>,
1742 pk_indices: Vec<usize>,
1743 non_pk_indices: Vec<usize>,
1744 encoding_positions: Vec<u16>,
1745 dropped_non_pk_slots: Vec<u16>,
1746 phys_count: usize,
1747 single_int_pk: bool,
1748 not_null_indices: Vec<u16>,
1749 bind_plan: Option<Vec<BindAction>>,
1750 row_fully_overwritten: bool,
1751 row_encoder: Option<crate::encoding::IntRowTemplate>,
1752 is_trivial_fast: bool,
1753 trivial_fast_program: Option<TrivialFastProgram>,
1754}
1755
1756#[derive(Clone)]
1757enum BindAction {
1758 Param {
1759 param_idx: usize,
1760 col_idx: usize,
1761 target: DataType,
1762 },
1763 Literal {
1764 value: Value,
1765 col_idx: usize,
1766 },
1767}
1768
1769#[derive(Clone)]
1770struct TrivialFastProgram {
1771 template: Vec<u8>,
1772 ops: Vec<WriteOp>,
1773 pk_param: u8,
1774 not_null_param_indices: Vec<u8>,
1775}
1776
1777#[derive(Clone)]
1778enum WriteOp {
1779 ParamI64 {
1780 param_idx: u8,
1781 off: u32,
1782 },
1783 LiteralI64 {
1784 value: i64,
1785 off: u32,
1786 },
1787 GenAddParamsI64 {
1788 a_param: u8,
1789 b_param: u8,
1790 off: u32,
1791 bitmap_byte_off: u32,
1792 bitmap_bit_mask: u8,
1793 },
1794 GenMulAddParamI64 {
1795 param_idx: u8,
1796 mul: i64,
1797 add: i64,
1798 off: u32,
1799 bitmap_byte_off: u32,
1800 bitmap_bit_mask: u8,
1801 },
1802}
1803
1804fn build_trivial_fast_program(
1805 bind_plan: &[BindAction],
1806 row_encoder: &crate::encoding::IntRowTemplate,
1807 non_virtual_pairs: &[(usize, usize)],
1808 generated_col_positions: &[usize],
1809 generated_fast_evals: &[FastGenEval],
1810 pk_indices: &[usize],
1811 columns: &[crate::types::ColumnDef],
1812) -> Option<TrivialFastProgram> {
1813 let pk_col = pk_indices[0];
1814 let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
1815 non_virtual_pairs.iter().copied().collect();
1816 let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
1817 row_encoder.slot_offsets.iter().copied().collect();
1818
1819 let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
1820 let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
1821 let mut pk_param: Option<u8> = None;
1822 let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
1823 let mut not_null_param_indices: Vec<u8> = Vec::new();
1824
1825 for action in bind_plan {
1826 match action {
1827 BindAction::Param {
1828 param_idx,
1829 col_idx,
1830 target,
1831 } => {
1832 if *target != DataType::Integer {
1833 return None;
1834 }
1835 let pi: u8 = u8::try_from(*param_idx).ok()?;
1836 col_to_param.insert(*col_idx, pi);
1837 if *col_idx == pk_col {
1838 pk_param = Some(pi);
1839 } else {
1840 let slot = *col_to_slot.get(col_idx)?;
1841 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1842 ops.push(WriteOp::ParamI64 { param_idx: pi, off });
1843 if !columns[*col_idx].nullable {
1844 not_null_param_indices.push(pi);
1845 }
1846 }
1847 }
1848 BindAction::Literal { value, col_idx } => match value {
1849 Value::Integer(v) => {
1850 col_to_lit_int.insert(*col_idx, *v);
1851 if *col_idx == pk_col {
1852 return None;
1853 }
1854 let slot = *col_to_slot.get(col_idx)?;
1855 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1856 ops.push(WriteOp::LiteralI64 { value: *v, off });
1857 }
1858 _ => return None,
1859 },
1860 }
1861 }
1862
1863 let pk_param = pk_param?;
1864
1865 for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
1866 let gen_slot = *col_to_slot.get(&gen_pos)?;
1867 let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
1868 let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
1869 let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
1870 let gen_col_nullable = columns[gen_pos].nullable;
1871
1872 match &generated_fast_evals[i] {
1873 FastGenEval::IntColAddCol {
1874 left_idx,
1875 right_idx,
1876 } => {
1877 let a_param = col_to_param.get(left_idx).copied();
1878 let b_param = col_to_param.get(right_idx).copied();
1879 match (a_param, b_param) {
1880 (Some(ap), Some(bp)) => {
1881 let deps_safe = gen_col_nullable
1882 || (not_null_param_indices.contains(&ap)
1883 && not_null_param_indices.contains(&bp));
1884 if !deps_safe {
1885 return None;
1886 }
1887 ops.push(WriteOp::GenAddParamsI64 {
1888 a_param: ap,
1889 b_param: bp,
1890 off: gen_off,
1891 bitmap_byte_off,
1892 bitmap_bit_mask,
1893 });
1894 }
1895 (Some(p), None) => {
1896 let lit = col_to_lit_int.get(right_idx).copied()?;
1897 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1898 return None;
1899 }
1900 ops.push(WriteOp::GenMulAddParamI64 {
1901 param_idx: p,
1902 mul: 1,
1903 add: lit,
1904 off: gen_off,
1905 bitmap_byte_off,
1906 bitmap_bit_mask,
1907 });
1908 }
1909 (None, Some(p)) => {
1910 let lit = col_to_lit_int.get(left_idx).copied()?;
1911 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1912 return None;
1913 }
1914 ops.push(WriteOp::GenMulAddParamI64 {
1915 param_idx: p,
1916 mul: 1,
1917 add: lit,
1918 off: gen_off,
1919 bitmap_byte_off,
1920 bitmap_bit_mask,
1921 });
1922 }
1923 (None, None) => {
1924 let la = col_to_lit_int.get(left_idx).copied()?;
1925 let lb = col_to_lit_int.get(right_idx).copied()?;
1926 ops.push(WriteOp::LiteralI64 {
1927 value: la.wrapping_add(lb),
1928 off: gen_off,
1929 });
1930 }
1931 }
1932 }
1933 FastGenEval::IntColMulAdd {
1934 col_schema_idx,
1935 mul,
1936 add,
1937 } => {
1938 if let Some(p) = col_to_param.get(col_schema_idx).copied() {
1939 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1940 return None;
1941 }
1942 ops.push(WriteOp::GenMulAddParamI64 {
1943 param_idx: p,
1944 mul: *mul,
1945 add: *add,
1946 off: gen_off,
1947 bitmap_byte_off,
1948 bitmap_bit_mask,
1949 });
1950 } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
1951 ops.push(WriteOp::LiteralI64 {
1952 value: lit.wrapping_mul(*mul).wrapping_add(*add),
1953 off: gen_off,
1954 });
1955 } else {
1956 return None;
1957 }
1958 }
1959 FastGenEval::None => return None,
1960 }
1961 }
1962
1963 Some(TrivialFastProgram {
1964 template: row_encoder.template.clone(),
1965 ops,
1966 pk_param,
1967 not_null_param_indices,
1968 })
1969}
1970
1971#[derive(Clone)]
1972pub(super) enum CompiledOnConflict {
1973 DoNothing {
1974 target: Option<ConflictKind>,
1975 },
1976 DoUpdate {
1977 target: ConflictKind,
1978 assignments: Vec<(usize, Expr)>,
1979 where_clause: Option<Expr>,
1980 fast_paths: Option<Vec<DoUpdateFastPath>>,
1981 },
1982}
1983
1984#[derive(Clone, Copy)]
1985pub(super) enum DoUpdateFastPath {
1986 IntAddConst { phys_idx: usize, delta: i64 },
1987}
1988
1989#[derive(Clone, Debug)]
1990pub(super) enum ConflictKind {
1991 PrimaryKey,
1992 UniqueIndex { index_idx: usize },
1993}
1994
1995fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
1996 match target {
1997 ConflictTarget::Columns(cols) => {
1998 let col_idx_set: Vec<u16> = cols
1999 .iter()
2000 .map(|name| {
2001 ts.column_index(name)
2002 .map(|i| i as u16)
2003 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
2004 })
2005 .collect::<Result<_>>()?;
2006 let pk_set = ts.primary_key_columns.clone();
2007 if set_equal(&col_idx_set, &pk_set) {
2008 return Ok(ConflictKind::PrimaryKey);
2009 }
2010 for (index_idx, idx) in ts.indices.iter().enumerate() {
2011 if idx.unique && set_equal(&col_idx_set, &idx.columns) {
2012 return Ok(ConflictKind::UniqueIndex { index_idx });
2013 }
2014 }
2015 Err(SqlError::Plan(
2016 "ON CONFLICT target does not match any unique constraint".into(),
2017 ))
2018 }
2019 ConflictTarget::Constraint(name) => {
2020 let lower = name.to_ascii_lowercase();
2021 for (index_idx, idx) in ts.indices.iter().enumerate() {
2022 if idx.name.eq_ignore_ascii_case(&lower) {
2023 if idx.unique {
2024 return Ok(ConflictKind::UniqueIndex { index_idx });
2025 }
2026 return Err(SqlError::Plan(format!(
2027 "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2028 )));
2029 }
2030 }
2031 Err(SqlError::Plan(format!(
2032 "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2033 )))
2034 }
2035 }
2036}
2037
2038fn set_equal(a: &[u16], b: &[u16]) -> bool {
2039 if a.len() != b.len() {
2040 return false;
2041 }
2042 let mut a_sorted = a.to_vec();
2043 let mut b_sorted = b.to_vec();
2044 a_sorted.sort_unstable();
2045 b_sorted.sort_unstable();
2046 a_sorted == b_sorted
2047}
2048
2049pub(super) enum InsertRowOutcome {
2050 Inserted,
2051 Updated { old: Vec<Value>, new: Vec<Value> },
2052 Skipped,
2053}
2054
2055#[allow(clippy::too_many_arguments)]
2056#[inline]
2057pub(super) fn apply_insert_with_conflict(
2058 wtx: &mut WriteTxn<'_>,
2059 table_schema: &TableSchema,
2060 key_buf: &[u8],
2061 value_buf: &[u8],
2062 row: &[Value],
2063 pk_values: &[Value],
2064 on_conflict: &CompiledOnConflict,
2065 col_map: &ColumnMap,
2066 capture_returning: bool,
2067) -> Result<InsertRowOutcome> {
2068 let table_bytes = table_schema.name.as_bytes();
2069
2070 if let CompiledOnConflict::DoNothing { target } = on_conflict {
2071 let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2072 if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2073 let inserted = wtx
2074 .table_insert_if_absent(table_bytes, key_buf, value_buf)
2075 .map_err(SqlError::Storage)?;
2076 return Ok(if inserted {
2077 InsertRowOutcome::Inserted
2078 } else {
2079 InsertRowOutcome::Skipped
2080 });
2081 }
2082 }
2083
2084 if let CompiledOnConflict::DoUpdate {
2085 target: ConflictKind::PrimaryKey,
2086 assignments,
2087 where_clause,
2088 fast_paths,
2089 } = on_conflict
2090 {
2091 if can_fuse_do_update(table_schema, assignments) {
2092 return apply_do_update_fused(
2093 wtx,
2094 table_schema,
2095 table_bytes,
2096 key_buf,
2097 value_buf,
2098 row,
2099 assignments,
2100 where_clause.as_ref(),
2101 col_map,
2102 fast_paths.as_deref(),
2103 capture_returning,
2104 );
2105 }
2106 }
2107
2108 let primary_outcome = wtx
2109 .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2110 .map_err(SqlError::Storage)?;
2111
2112 match primary_outcome {
2113 citadel_txn::write_txn::InsertOutcome::Inserted => {
2114 if table_schema.indices.is_empty() {
2115 return Ok(InsertRowOutcome::Inserted);
2116 }
2117 let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2118 match insert_index_entries_or_fetch(
2119 wtx,
2120 table_schema,
2121 row,
2122 pk_values,
2123 &mut inserted_keys,
2124 )? {
2125 None => Ok(InsertRowOutcome::Inserted),
2126 Some(conflicting_idx) => {
2127 let matches_target =
2128 matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2129 || matches!(
2130 on_conflict,
2131 CompiledOnConflict::DoNothing {
2132 target: Some(ConflictKind::UniqueIndex { index_idx }),
2133 } | CompiledOnConflict::DoUpdate {
2134 target: ConflictKind::UniqueIndex { index_idx },
2135 ..
2136 } if *index_idx == conflicting_idx
2137 );
2138 undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2139 if !matches_target {
2140 return Err(SqlError::UniqueViolation(
2141 table_schema.indices[conflicting_idx].name.clone(),
2142 ));
2143 }
2144 match on_conflict {
2145 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2146 CompiledOnConflict::DoUpdate {
2147 assignments,
2148 where_clause,
2149 ..
2150 } => {
2151 let existing_pk =
2152 fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2153 apply_do_update(
2154 wtx,
2155 table_schema,
2156 &existing_pk,
2157 row,
2158 assignments,
2159 where_clause.as_ref(),
2160 col_map,
2161 capture_returning,
2162 )
2163 }
2164 }
2165 }
2166 }
2167 }
2168 citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2169 let matches_target = matches!(
2170 on_conflict,
2171 CompiledOnConflict::DoNothing { target: None }
2172 | CompiledOnConflict::DoNothing {
2173 target: Some(ConflictKind::PrimaryKey),
2174 }
2175 | CompiledOnConflict::DoUpdate {
2176 target: ConflictKind::PrimaryKey,
2177 ..
2178 }
2179 );
2180 if !matches_target {
2181 return Err(SqlError::DuplicateKey);
2182 }
2183 match on_conflict {
2184 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2185 CompiledOnConflict::DoUpdate {
2186 assignments,
2187 where_clause,
2188 ..
2189 } => {
2190 let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2191 apply_do_update_with_old_row(
2192 wtx,
2193 table_schema,
2194 key_buf,
2195 &old_row,
2196 row,
2197 assignments,
2198 where_clause.as_ref(),
2199 col_map,
2200 capture_returning,
2201 )
2202 }
2203 }
2204 }
2205 }
2206}
2207
2208#[inline]
2209fn apply_fast_path_patch(
2210 old_bytes: &[u8],
2211 fast_paths: &[DoUpdateFastPath],
2212) -> Result<UpsertAction> {
2213 UPSERT_SCRATCH.with(|slot| {
2214 let mut bufs = slot.borrow_mut();
2215 bufs.new_value_buf.clear();
2216 bufs.new_value_buf.extend_from_slice(old_bytes);
2217
2218 let mut patch_scratch: Vec<u8> = Vec::new();
2219
2220 for fp in fast_paths {
2221 match fp {
2222 DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2223 let decoded =
2224 crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2225 let old_val = &decoded[0];
2226 let new_val = match old_val {
2227 Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2228 Value::Null => Value::Null,
2229 _ => {
2230 return Err(SqlError::TypeMismatch {
2231 expected: "INTEGER".into(),
2232 got: old_val.data_type().to_string(),
2233 });
2234 }
2235 };
2236 if !crate::encoding::patch_column_in_place(
2237 &mut bufs.new_value_buf,
2238 *phys_idx,
2239 &new_val,
2240 )? {
2241 patch_scratch.clear();
2242 crate::encoding::patch_row_column(
2243 &bufs.new_value_buf,
2244 *phys_idx,
2245 &new_val,
2246 &mut patch_scratch,
2247 )?;
2248 std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2249 }
2250 }
2251 }
2252 }
2253
2254 if bufs.new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2255 return Err(SqlError::RowTooLarge {
2256 size: bufs.new_value_buf.len(),
2257 max: citadel_core::MAX_INLINE_VALUE_SIZE,
2258 });
2259 }
2260
2261 Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2262 })
2263}
2264
2265fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2266 if !ts.indices.is_empty() {
2267 return true;
2268 }
2269 match oc {
2270 CompiledOnConflict::DoNothing { .. } => false,
2271 CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2272 }
2273}
2274
2275fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2276 if !ts.indices.is_empty() {
2277 return false;
2278 }
2279 if !ts.foreign_keys.is_empty() {
2280 return false;
2281 }
2282 if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2283 return false;
2284 }
2285 let pk = ts.pk_indices();
2286 !assignments.iter().any(|(ci, _)| pk.contains(ci))
2287}
2288
2289#[allow(clippy::too_many_arguments)]
2290#[inline]
2291fn apply_do_update_fused(
2292 wtx: &mut WriteTxn<'_>,
2293 table_schema: &TableSchema,
2294 table_bytes: &[u8],
2295 key_buf: &[u8],
2296 value_buf: &[u8],
2297 proposed_row: &[Value],
2298 assignments: &[(usize, Expr)],
2299 where_clause: Option<&Expr>,
2300 col_map: &ColumnMap,
2301 fast_paths: Option<&[DoUpdateFastPath]>,
2302 capture_returning: bool,
2303) -> Result<InsertRowOutcome> {
2304 let non_pk = table_schema.non_pk_indices();
2305 let enc_pos = table_schema.encoding_positions();
2306 let phys_count = table_schema.physical_non_pk_count();
2307 let dropped = table_schema.dropped_non_pk_slots();
2308 let has_checks = table_schema.has_checks();
2309 let has_fks = !table_schema.foreign_keys.is_empty();
2310
2311 let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2312 std::cell::RefCell::new(None);
2313
2314 let outcome =
2315 wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2316 if let Some(fps) = fast_paths {
2317 if !has_checks {
2318 let action = apply_fast_path_patch(old_bytes, fps)?;
2319 if capture_returning {
2320 if let UpsertAction::Replace(ref new_bytes) = action {
2321 let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2322 let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2323 *captured.borrow_mut() = Some((old_row, new_row));
2324 }
2325 }
2326 return Ok(action);
2327 }
2328 }
2329 UPSERT_SCRATCH.with(|slot| {
2330 let mut bufs = slot.borrow_mut();
2331 let UpsertBufs {
2332 old_row,
2333 new_row,
2334 value_values,
2335 new_value_buf,
2336 } = &mut *bufs;
2337
2338 old_row.clear();
2339 old_row.resize(table_schema.columns.len(), Value::Null);
2340 decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2341
2342 if let Some(w) = where_clause {
2343 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2344 let result = eval_expr(w, &ctx)?;
2345 if result.is_null() || !is_truthy(&result) {
2346 return Ok(UpsertAction::Skip);
2347 }
2348 }
2349
2350 new_row.clear();
2351 new_row.extend_from_slice(old_row);
2352 for (col_idx, expr) in assignments {
2353 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2354 let val = eval_expr(expr, &ctx)?;
2355 let col = &table_schema.columns[*col_idx];
2356 new_row[*col_idx] = if val.is_null() {
2357 Value::Null
2358 } else {
2359 let got = val.data_type();
2360 val.coerce_into(col.data_type)
2361 .ok_or_else(|| SqlError::TypeMismatch {
2362 expected: col.data_type.to_string(),
2363 got: got.to_string(),
2364 })?
2365 };
2366 }
2367
2368 for (assigned_idx, _) in assignments {
2369 let col = &table_schema.columns[*assigned_idx];
2370 if !col.nullable && new_row[col.position as usize].is_null() {
2371 return Err(SqlError::NotNullViolation(col.name.clone()));
2372 }
2373 }
2374 if has_checks {
2375 for col in &table_schema.columns {
2376 if let Some(ref check) = col.check_expr {
2377 let ctx = EvalCtx::new(col_map, new_row);
2378 let result = eval_expr(check, &ctx)?;
2379 if !is_truthy(&result) && !result.is_null() {
2380 let name = col.check_name.as_deref().unwrap_or(&col.name);
2381 return Err(SqlError::CheckViolation(name.to_string()));
2382 }
2383 }
2384 }
2385 for tc in &table_schema.check_constraints {
2386 let ctx = EvalCtx::new(col_map, new_row);
2387 let result = eval_expr(&tc.expr, &ctx)?;
2388 if !is_truthy(&result) && !result.is_null() {
2389 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2390 return Err(SqlError::CheckViolation(name.to_string()));
2391 }
2392 }
2393 }
2394 let _ = has_fks;
2395
2396 value_values.clear();
2397 value_values.resize(phys_count, Value::Null);
2398 for &slot in dropped {
2399 value_values[slot as usize] = Value::Null;
2400 }
2401 for (j, &i) in non_pk.iter().enumerate() {
2402 value_values[enc_pos[j] as usize] = new_row[i].clone();
2403 }
2404 new_value_buf.clear();
2405 crate::encoding::encode_row_into(value_values, new_value_buf);
2406
2407 if new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2408 return Err(SqlError::RowTooLarge {
2409 size: new_value_buf.len(),
2410 max: citadel_core::MAX_INLINE_VALUE_SIZE,
2411 });
2412 }
2413
2414 if capture_returning {
2415 *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2416 }
2417 Ok(UpsertAction::Replace(new_value_buf.clone()))
2418 })
2419 })?;
2420
2421 match outcome {
2422 UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2423 UpsertOutcome::Updated => {
2424 if capture_returning {
2425 let (old, new) = captured.into_inner().ok_or_else(|| {
2426 SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2427 })?;
2428 Ok(InsertRowOutcome::Updated { old, new })
2429 } else {
2430 Ok(InsertRowOutcome::Inserted)
2431 }
2432 }
2433 UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2434 }
2435}
2436
2437fn fetch_unique_index_pk(
2438 wtx: &mut WriteTxn<'_>,
2439 table_schema: &TableSchema,
2440 index_idx: usize,
2441 row: &[Value],
2442) -> Result<Vec<u8>> {
2443 let idx = &table_schema.indices[index_idx];
2444 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2445 let indexed: Vec<Value> = idx
2446 .columns
2447 .iter()
2448 .map(|&col_idx| row[col_idx as usize].clone())
2449 .collect();
2450 let key = crate::encoding::encode_composite_key(&indexed);
2451 let value = wtx
2452 .table_get(&idx_table, &key)
2453 .map_err(SqlError::Storage)?
2454 .ok_or_else(|| {
2455 SqlError::InvalidValue("unique index missing expected collision entry".into())
2456 })?;
2457 Ok(value)
2458}
2459
2460#[allow(clippy::too_many_arguments)]
2461fn apply_do_update(
2462 wtx: &mut WriteTxn<'_>,
2463 table_schema: &TableSchema,
2464 pk_key: &[u8],
2465 proposed_row: &[Value],
2466 assignments: &[(usize, Expr)],
2467 where_clause: Option<&Expr>,
2468 col_map: &ColumnMap,
2469 capture_returning: bool,
2470) -> Result<InsertRowOutcome> {
2471 let old_value = wtx
2472 .table_get(table_schema.name.as_bytes(), pk_key)
2473 .map_err(SqlError::Storage)?
2474 .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2475 let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2476 apply_do_update_with_old_row(
2477 wtx,
2478 table_schema,
2479 pk_key,
2480 &old_row,
2481 proposed_row,
2482 assignments,
2483 where_clause,
2484 col_map,
2485 capture_returning,
2486 )
2487}
2488
2489#[allow(clippy::too_many_arguments)]
2490fn apply_do_update_with_old_row(
2491 wtx: &mut WriteTxn<'_>,
2492 table_schema: &TableSchema,
2493 old_pk_key: &[u8],
2494 old_row: &[Value],
2495 proposed_row: &[Value],
2496 assignments: &[(usize, Expr)],
2497 where_clause: Option<&Expr>,
2498 col_map: &ColumnMap,
2499 capture_returning: bool,
2500) -> Result<InsertRowOutcome> {
2501 if let Some(w) = where_clause {
2502 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2503 let result = eval_expr(w, &ctx)?;
2504 if result.is_null() || !is_truthy(&result) {
2505 return Ok(InsertRowOutcome::Skipped);
2506 }
2507 }
2508
2509 let mut new_row = old_row.to_vec();
2510 for (col_idx, expr) in assignments {
2511 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2512 let val = eval_expr(expr, &ctx)?;
2513 let col = &table_schema.columns[*col_idx];
2514 new_row[*col_idx] = if val.is_null() {
2515 Value::Null
2516 } else {
2517 let got = val.data_type();
2518 val.coerce_into(col.data_type)
2519 .ok_or_else(|| SqlError::TypeMismatch {
2520 expected: col.data_type.to_string(),
2521 got: got.to_string(),
2522 })?
2523 };
2524 }
2525
2526 for col in &table_schema.columns {
2527 if matches!(
2528 col.generated_kind,
2529 Some(crate::parser::GeneratedKind::Stored)
2530 ) {
2531 let val = eval_expr(
2532 col.generated_expr.as_ref().unwrap(),
2533 &EvalCtx::new(col_map, &new_row),
2534 )?;
2535 let pos = col.position as usize;
2536 new_row[pos] = if val.is_null() {
2537 if !col.nullable {
2538 return Err(SqlError::NotNullViolation(col.name.clone()));
2539 }
2540 Value::Null
2541 } else {
2542 let got = val.data_type();
2543 val.coerce_into(col.data_type)
2544 .ok_or_else(|| SqlError::TypeMismatch {
2545 expected: col.data_type.to_string(),
2546 got: got.to_string(),
2547 })?
2548 };
2549 }
2550 }
2551
2552 let pk_indices = table_schema.pk_indices();
2553 let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2554 let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2555
2556 for (assigned_idx, _) in assignments {
2557 let col = &table_schema.columns[*assigned_idx];
2558 if !col.nullable && new_row[col.position as usize].is_null() {
2559 return Err(SqlError::NotNullViolation(col.name.clone()));
2560 }
2561 }
2562 if table_schema.has_checks() {
2563 for col in &table_schema.columns {
2564 if let Some(ref check) = col.check_expr {
2565 let ctx = EvalCtx::new(col_map, &new_row);
2566 let result = eval_expr(check, &ctx)?;
2567 if !is_truthy(&result) && !result.is_null() {
2568 let name = col.check_name.as_deref().unwrap_or(&col.name);
2569 return Err(SqlError::CheckViolation(name.to_string()));
2570 }
2571 }
2572 }
2573 for tc in &table_schema.check_constraints {
2574 let ctx = EvalCtx::new(col_map, &new_row);
2575 let result = eval_expr(&tc.expr, &ctx)?;
2576 if !is_truthy(&result) && !result.is_null() {
2577 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2578 return Err(SqlError::CheckViolation(name.to_string()));
2579 }
2580 }
2581 }
2582 for fk in &table_schema.foreign_keys {
2583 let changed = fk
2584 .columns
2585 .iter()
2586 .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2587 if !changed {
2588 continue;
2589 }
2590 let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2591 if any_null {
2592 continue;
2593 }
2594 let fk_vals: Vec<Value> = fk
2595 .columns
2596 .iter()
2597 .map(|&ci| new_row[ci as usize].clone())
2598 .collect();
2599 let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2600 let found = wtx
2601 .table_get(fk.foreign_table.as_bytes(), &fk_key)
2602 .map_err(SqlError::Storage)?;
2603 if found.is_none() {
2604 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
2605 return Err(SqlError::ForeignKeyViolation(name.to_string()));
2606 }
2607 }
2608
2609 let has_indices = !table_schema.indices.is_empty();
2610 let old_pk_values: Vec<Value> = if has_indices || pk_changed {
2611 pk_indices.iter().map(|&i| old_row[i].clone()).collect()
2612 } else {
2613 Vec::new()
2614 };
2615 let new_pk_values: Vec<Value> = if has_indices || pk_changed {
2616 pk_indices.iter().map(|&i| new_row[i].clone()).collect()
2617 } else {
2618 Vec::new()
2619 };
2620
2621 let non_pk = table_schema.non_pk_indices();
2622 let enc_pos = table_schema.encoding_positions();
2623 let phys_count = table_schema.physical_non_pk_count();
2624 let dropped = table_schema.dropped_non_pk_slots();
2625 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
2626 for &slot in dropped {
2627 value_values[slot as usize] = Value::Null;
2628 }
2629 for (j, &i) in non_pk.iter().enumerate() {
2630 let col = &table_schema.columns[i];
2631 value_values[enc_pos[j] as usize] = if matches!(
2632 col.generated_kind,
2633 Some(crate::parser::GeneratedKind::Virtual)
2634 ) {
2635 Value::Null
2636 } else {
2637 new_row[i].clone()
2638 };
2639 }
2640 let mut new_value_buf = Vec::with_capacity(256);
2641 crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
2642
2643 if new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2644 return Err(SqlError::RowTooLarge {
2645 size: new_value_buf.len(),
2646 max: citadel_core::MAX_INLINE_VALUE_SIZE,
2647 });
2648 }
2649
2650 if pk_changed {
2651 let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
2652 let inserted = wtx
2653 .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
2654 .map_err(SqlError::Storage)?;
2655 if !inserted {
2656 return Err(SqlError::DuplicateKey);
2657 }
2658 wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
2659 .map_err(SqlError::Storage)?;
2660 for idx in &table_schema.indices {
2661 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2662 let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2663 wtx.table_delete(&idx_table, &old_idx_key)
2664 .map_err(SqlError::Storage)?;
2665 let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2666 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2667 let is_new = wtx
2668 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2669 .map_err(SqlError::Storage)?;
2670 if idx.unique && !is_new {
2671 let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2672 if !any_null {
2673 return Err(SqlError::UniqueViolation(idx.name.clone()));
2674 }
2675 }
2676 }
2677 } else {
2678 wtx.table_update_sorted(
2679 table_schema.name.as_bytes(),
2680 &[(old_pk_key, new_value_buf.as_slice())],
2681 )
2682 .map_err(SqlError::Storage)?;
2683 let col_map_partial =
2684 any_partial_index(table_schema).then(|| ColumnMap::new(&table_schema.columns));
2685 for idx in &table_schema.indices {
2686 let cols_changed = index_columns_changed(idx, old_row, &new_row);
2687 let (del, ins) = partial_idx_update_actions(
2688 idx,
2689 old_row,
2690 &new_row,
2691 cols_changed,
2692 false,
2693 col_map_partial.as_ref(),
2694 );
2695 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2696 if del {
2697 let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2698 wtx.table_delete(&idx_table, &old_idx_key)
2699 .map_err(SqlError::Storage)?;
2700 }
2701 if ins {
2702 let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2703 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2704 let is_new = wtx
2705 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2706 .map_err(SqlError::Storage)?;
2707 if idx.unique && !is_new {
2708 let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2709 if !any_null {
2710 return Err(SqlError::UniqueViolation(idx.name.clone()));
2711 }
2712 }
2713 }
2714 }
2715 }
2716
2717 if capture_returning {
2718 Ok(InsertRowOutcome::Updated {
2719 old: old_row.to_vec(),
2720 new: new_row,
2721 })
2722 } else {
2723 Ok(InsertRowOutcome::Inserted)
2724 }
2725}
2726
2727fn detect_fast_paths(
2728 ts: &TableSchema,
2729 assignments: &[(usize, Expr)],
2730) -> Option<Vec<DoUpdateFastPath>> {
2731 let non_pk = ts.non_pk_indices();
2732 let enc_pos = ts.encoding_positions();
2733 let mut out = Vec::with_capacity(assignments.len());
2734 for (col_idx, expr) in assignments {
2735 let col = &ts.columns[*col_idx];
2736 if col.data_type != DataType::Integer {
2737 return None;
2738 }
2739 let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
2740 let phys_idx = enc_pos[nonpk_order] as usize;
2741
2742 if let Expr::BinaryOp { left, op, right } = expr {
2743 if !matches!(op, BinOp::Add | BinOp::Sub) {
2744 return None;
2745 }
2746 let reads_target =
2747 matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
2748 if !reads_target {
2749 return None;
2750 }
2751 if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
2752 let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
2753 let _ = col_idx;
2754 out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
2755 continue;
2756 }
2757 return None;
2758 }
2759 return None;
2760 }
2761 Some(out)
2762}
2763
2764fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
2765 let target = oc
2766 .target
2767 .as_ref()
2768 .map(|t| resolve_conflict_target(t, ts))
2769 .transpose()?;
2770 match &oc.action {
2771 OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
2772 OnConflictAction::DoUpdate {
2773 assignments,
2774 where_clause,
2775 } => {
2776 let target = target.ok_or_else(|| {
2777 SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
2778 })?;
2779 let compiled_assignments: Vec<(usize, Expr)> = assignments
2780 .iter()
2781 .map(|(name, expr)| {
2782 let col_idx = ts
2783 .column_index(name)
2784 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
2785 Ok((col_idx, expr.clone()))
2786 })
2787 .collect::<Result<_>>()?;
2788 let fast_paths = if where_clause.is_none() {
2789 detect_fast_paths(ts, &compiled_assignments)
2790 } else {
2791 None
2792 };
2793 Ok(CompiledOnConflict::DoUpdate {
2794 target,
2795 assignments: compiled_assignments,
2796 where_clause: where_clause.clone(),
2797 fast_paths,
2798 })
2799 }
2800 }
2801}
2802
2803fn exec_insert_trivial_fast(
2805 wtx: &mut WriteTxn<'_>,
2806 table_lower: &str,
2807 cache: &InsertCache,
2808 bufs: &mut InsertBufs,
2809 params: &[Value],
2810) -> Result<ExecutionResult> {
2811 let prog = cache
2812 .trivial_fast_program
2813 .as_ref()
2814 .expect("trivial fast: program");
2815
2816 for &p in &prog.not_null_param_indices {
2817 if params[p as usize].is_null() {
2818 return Err(SqlError::NotNullViolation(format!("param@{p}")));
2819 }
2820 }
2821
2822 match ¶ms[prog.pk_param as usize] {
2823 Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
2824 _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
2825 }
2826
2827 bufs.value_buf.clear();
2828 bufs.value_buf.extend_from_slice(&prog.template);
2829
2830 for op in &prog.ops {
2831 match op {
2832 WriteOp::ParamI64 { param_idx, off } => match ¶ms[*param_idx as usize] {
2833 Value::Integer(v) => {
2834 let off = *off as usize;
2835 bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
2836 }
2837 other => {
2838 return Err(SqlError::TypeMismatch {
2839 expected: "Integer".into(),
2840 got: other.data_type().to_string(),
2841 });
2842 }
2843 },
2844 WriteOp::LiteralI64 { value, off } => {
2845 let off = *off as usize;
2846 bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
2847 }
2848 WriteOp::GenAddParamsI64 {
2849 a_param,
2850 b_param,
2851 off,
2852 bitmap_byte_off,
2853 bitmap_bit_mask,
2854 } => match (¶ms[*a_param as usize], ¶ms[*b_param as usize]) {
2855 (Value::Integer(a), Value::Integer(b)) => {
2856 let off = *off as usize;
2857 bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
2858 }
2859 _ => {
2860 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2861 }
2862 },
2863 WriteOp::GenMulAddParamI64 {
2864 param_idx,
2865 mul,
2866 add,
2867 off,
2868 bitmap_byte_off,
2869 bitmap_bit_mask,
2870 } => match ¶ms[*param_idx as usize] {
2871 Value::Integer(v) => {
2872 let r = v.wrapping_mul(*mul).wrapping_add(*add);
2873 let off = *off as usize;
2874 bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
2875 }
2876 _ => {
2877 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2878 }
2879 },
2880 }
2881 }
2882
2883 let is_new = wtx
2884 .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
2885 .map_err(SqlError::Storage)?;
2886 if !is_new {
2887 return Err(SqlError::DuplicateKey);
2888 }
2889 Ok(ExecutionResult::RowsAffected(1))
2890}
2891
2892fn build_bind_plan(
2893 stmt: &InsertStmt,
2894 col_indices: &[usize],
2895 col_data_types: &[DataType],
2896) -> Option<Vec<BindAction>> {
2897 let rows = match &stmt.source {
2898 InsertSource::Values(rows) => rows,
2899 _ => return None,
2900 };
2901 if rows.len() != 1 {
2902 return None;
2903 }
2904 let value_row = &rows[0];
2905 if value_row.len() != col_indices.len() {
2906 return None;
2907 }
2908 let mut plan = Vec::with_capacity(value_row.len());
2909 for (i, expr) in value_row.iter().enumerate() {
2910 let col_idx = col_indices[i];
2911 let target = col_data_types[col_idx];
2912 match expr {
2913 Expr::Parameter(n) => {
2914 if *n == 0 {
2915 return None;
2916 }
2917 plan.push(BindAction::Param {
2918 param_idx: n - 1,
2919 col_idx,
2920 target,
2921 });
2922 }
2923 Expr::Literal(v) => plan.push(BindAction::Literal {
2924 value: v.clone(),
2925 col_idx,
2926 }),
2927 _ => return None,
2928 }
2929 }
2930 Some(plan)
2931}
2932
2933impl CompiledInsert {
2934 pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
2935 let lower = stmt.table.to_ascii_lowercase();
2936 let cached = if let Some(ts) = schema.get(&lower) {
2937 let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
2938 ts.columns.iter().map(|c| c.name.as_str()).collect()
2939 } else {
2940 stmt.columns.iter().map(|s| s.as_str()).collect()
2941 };
2942 let mut col_indices = Vec::with_capacity(insert_columns.len());
2943 for name in &insert_columns {
2944 col_indices.push(ts.column_index(name)?);
2945 }
2946 if col_indices
2947 .iter()
2948 .any(|&ci| ts.columns[ci].generated_kind.is_some())
2949 {
2950 return None;
2951 }
2952 let on_conflict = stmt
2953 .on_conflict
2954 .as_ref()
2955 .map(|oc| compile_on_conflict(oc, ts))
2956 .transpose()
2957 .ok()
2958 .flatten()
2959 .map(Arc::new);
2960 let generated_col_positions: Vec<usize> = ts
2961 .columns
2962 .iter()
2963 .enumerate()
2964 .filter_map(|(i, c)| {
2965 matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
2966 .then_some(i)
2967 })
2968 .collect();
2969 let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
2970 .iter()
2971 .map(|&pos| {
2972 detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
2973 })
2974 .collect();
2975 let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
2976 Some(ColumnMap::new(&ts.columns))
2977 } else {
2978 None
2979 };
2980 let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
2981 let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
2982 let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
2983 let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
2984 let phys_count = ts.physical_non_pk_count();
2985 let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
2986 let single_int_pk =
2987 pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
2988 let not_null_indices: Vec<u16> = ts
2989 .columns
2990 .iter()
2991 .filter(|c| !c.nullable)
2992 .map(|c| c.position)
2993 .collect();
2994 let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
2995 let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
2996 let row_fully_overwritten = if any_defaults_flag {
2997 false
2998 } else {
2999 let mut covered: rustc_hash::FxHashSet<usize> =
3000 col_indices.iter().copied().collect();
3001 covered.extend(generated_col_positions.iter().copied());
3002 for (j, &i) in non_pk_indices.iter().enumerate() {
3003 let _ = j;
3004 if matches!(
3005 ts.columns[i].generated_kind,
3006 Some(crate::parser::GeneratedKind::Virtual)
3007 ) {
3008 covered.insert(i);
3009 }
3010 }
3011 bind_plan.is_some() && covered.len() == ts.columns.len()
3012 };
3013 let has_fks = !ts.foreign_keys.is_empty();
3014 let has_indices = !ts.indices.is_empty();
3015 let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3016 let mut null_value_slots: Vec<usize> =
3017 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3018 for (j, &i) in non_pk_indices.iter().enumerate() {
3019 let slot = encoding_positions[j] as usize;
3020 if matches!(
3021 ts.columns[i].generated_kind,
3022 Some(crate::parser::GeneratedKind::Virtual)
3023 ) {
3024 null_value_slots.push(slot);
3025 } else {
3026 non_virtual_pairs.push((i, slot));
3027 }
3028 }
3029 let row_encoder = {
3030 let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3031 let col = &ts.columns[i];
3032 if matches!(
3033 col.generated_kind,
3034 Some(crate::parser::GeneratedKind::Virtual)
3035 ) {
3036 true
3037 } else {
3038 col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3039 }
3040 });
3041 if all_int_or_null {
3042 let mut null_slots: Vec<usize> =
3043 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3044 for (j, &i) in non_pk_indices.iter().enumerate() {
3045 if matches!(
3046 ts.columns[i].generated_kind,
3047 Some(crate::parser::GeneratedKind::Virtual)
3048 ) {
3049 null_slots.push(encoding_positions[j] as usize);
3050 }
3051 }
3052 Some(crate::encoding::build_int_row_template(
3053 phys_count,
3054 &null_slots,
3055 ))
3056 } else {
3057 None
3058 }
3059 };
3060 let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3061 && !ts.columns.iter().any(|c| c.default_expr.is_some())
3062 && !ts.has_checks()
3063 && !has_fks
3064 && !has_indices
3065 && stmt.on_conflict.is_none()
3066 && stmt.returning.is_none()
3067 && bind_plan.is_some()
3068 && row_encoder.is_some()
3069 && row_fully_overwritten
3070 && single_int_pk
3071 && generated_fast_evals
3072 .iter()
3073 .all(|fe| !matches!(fe, FastGenEval::None));
3074 let trivial_fast_program = if is_trivial_fast_eligible {
3075 build_trivial_fast_program(
3076 bind_plan.as_ref().unwrap(),
3077 row_encoder.as_ref().unwrap(),
3078 &non_virtual_pairs,
3079 &generated_col_positions,
3080 &generated_fast_evals,
3081 &pk_indices,
3082 &ts.columns,
3083 )
3084 } else {
3085 None
3086 };
3087 let is_trivial_fast = trivial_fast_program.is_some();
3088 Some(InsertCache {
3089 col_indices,
3090 has_subquery: insert_has_subquery(stmt),
3091 any_defaults: ts.columns.iter().any(|c| c.default_expr.is_some()),
3092 has_checks: ts.has_checks(),
3093 on_conflict,
3094 row_col_map,
3095 generated_col_positions,
3096 generated_fast_evals,
3097 pk_indices,
3098 non_pk_indices,
3099 encoding_positions,
3100 dropped_non_pk_slots,
3101 phys_count,
3102 single_int_pk,
3103 not_null_indices,
3104 bind_plan,
3105 row_fully_overwritten,
3106 row_encoder,
3107 is_trivial_fast,
3108 trivial_fast_program,
3109 })
3110 } else if schema.get_view(&lower).is_some() {
3111 None
3112 } else {
3113 return None;
3114 };
3115 Some(Self {
3116 table_lower: lower,
3117 cached,
3118 })
3119 }
3120}
3121
3122impl CompiledPlan for CompiledInsert {
3123 fn execute(
3124 &self,
3125 db: &Database,
3126 schema: &SchemaManager,
3127 stmt: &Statement,
3128 params: &[Value],
3129 wtx: Option<&mut WriteTxn<'_>>,
3130 ) -> Result<ExecutionResult> {
3131 let ins = match stmt {
3132 Statement::Insert(i) => i,
3133 _ => {
3134 return Err(SqlError::Unsupported(
3135 "CompiledInsert received non-INSERT statement".into(),
3136 ))
3137 }
3138 };
3139 match wtx {
3140 None => exec_insert(db, schema, ins, params),
3141 Some(outer) => match self.cached.as_ref() {
3142 Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3143 exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3144 }),
3145 Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3146 None => exec_insert_in_txn(outer, schema, ins, params),
3147 },
3148 }
3149 }
3150
3151 fn uses_scoped_params(&self) -> bool {
3152 match self.cached.as_ref() {
3153 Some(c) => !c.is_trivial_fast,
3154 None => true,
3155 }
3156 }
3157}
3158
3159pub struct CompiledDelete {
3160 table_lower: String,
3161}
3162
3163impl CompiledDelete {
3164 pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3165 let lower = stmt.table.to_ascii_lowercase();
3166 schema.get(&lower)?;
3167 Some(Self { table_lower: lower })
3168 }
3169}
3170
3171impl CompiledPlan for CompiledDelete {
3172 fn execute(
3173 &self,
3174 db: &Database,
3175 schema: &SchemaManager,
3176 stmt: &Statement,
3177 _params: &[Value],
3178 wtx: Option<&mut WriteTxn<'_>>,
3179 ) -> Result<ExecutionResult> {
3180 let del = match stmt {
3181 Statement::Delete(d) => d,
3182 _ => {
3183 return Err(SqlError::Unsupported(
3184 "CompiledDelete received non-DELETE statement".into(),
3185 ))
3186 }
3187 };
3188 let _ = &self.table_lower;
3189 match wtx {
3190 None => super::write::exec_delete(db, schema, del),
3191 Some(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3192 }
3193 }
3194}