1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::write_txn::WriteTxn;
7use rustc_hash::FxHashMap;
8
9use crate::encoding::{encode_composite_key_into, encode_row_into};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
12use crate::parser::*;
13use crate::types::*;
14
15use crate::schema::SchemaManager;
16
17use super::compile::CompiledPlan;
18use super::helpers::*;
19use super::CteContext;
20
21pub(super) fn exec_insert(
22 db: &Database,
23 schema: &SchemaManager,
24 stmt: &InsertStmt,
25 params: &[Value],
26) -> Result<ExecutionResult> {
27 let empty_ctes = CteContext::default();
28 let materialized;
29 let stmt = if insert_has_subquery(stmt) {
30 materialized = materialize_insert(stmt, &mut |sub| {
31 exec_subquery_read(db, schema, sub, &empty_ctes)
32 })?;
33 &materialized
34 } else {
35 stmt
36 };
37
38 let lower_name = stmt.table.to_ascii_lowercase();
39 if schema.get_view(&lower_name).is_some() {
40 return Err(SqlError::CannotModifyView(stmt.table.clone()));
41 }
42 let table_schema = schema
43 .get(&lower_name)
44 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
45
46 let insert_columns = if stmt.columns.is_empty() {
47 table_schema
48 .columns
49 .iter()
50 .map(|c| c.name.clone())
51 .collect::<Vec<_>>()
52 } else {
53 stmt.columns
54 .iter()
55 .map(|c| c.to_ascii_lowercase())
56 .collect()
57 };
58
59 let col_indices: Vec<usize> = insert_columns
60 .iter()
61 .map(|name| {
62 table_schema
63 .column_index(name)
64 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
65 })
66 .collect::<Result<_>>()?;
67
68 for &ci in &col_indices {
69 if table_schema.columns[ci].generated_kind.is_some() {
70 return Err(SqlError::CannotInsertIntoGeneratedColumn(
71 table_schema.columns[ci].name.clone(),
72 ));
73 }
74 }
75
76 let defaults: Vec<(usize, &Expr)> = table_schema
77 .columns
78 .iter()
79 .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
80 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
81 .collect();
82
83 let generated_cols: Vec<(usize, &Expr)> = table_schema
84 .columns
85 .iter()
86 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
87 .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
88 .collect();
89
90 let has_checks = table_schema.has_checks();
91 let strict = table_schema.is_strict();
92 let row_col_map_for_gen = if !generated_cols.is_empty() {
93 Some(ColumnMap::new(&table_schema.columns))
94 } else {
95 None
96 };
97 let check_col_map = if has_checks {
98 Some(ColumnMap::new(&table_schema.columns))
99 } else {
100 None
101 };
102
103 let select_rows = match &stmt.source {
104 InsertSource::Select(sq) => {
105 let insert_ctes =
106 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
107 exec_query_body_read(db, schema, body, ctx)
108 })?;
109 let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
110 Some(qr.rows)
111 }
112 InsertSource::Values(_) => None,
113 };
114
115 let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
116 .on_conflict
117 .as_ref()
118 .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
119 .transpose()?;
120
121 let row_col_map = compiled_conflict
122 .as_ref()
123 .map(|_| ColumnMap::new(&table_schema.columns));
124
125 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
126 let mut count: u64 = 0;
127 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
128 stmt.returning.as_ref().map(|_| Vec::new());
129
130 let pk_indices = table_schema.pk_indices();
131 let non_pk = table_schema.non_pk_indices();
132 let enc_pos = table_schema.encoding_positions();
133 let phys_count = table_schema.physical_non_pk_count();
134 let mut row = vec![Value::Null; table_schema.columns.len()];
135 let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
136 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
137 let mut key_buf: Vec<u8> = Vec::with_capacity(64);
138 let mut value_buf: Vec<u8> = Vec::with_capacity(256);
139 let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
140
141 let values = match &stmt.source {
142 InsertSource::Values(rows) => Some(rows.as_slice()),
143 InsertSource::Select(_) => None,
144 };
145 let sel_rows = select_rows.as_deref();
146
147 let total = match (values, sel_rows) {
148 (Some(rows), _) => rows.len(),
149 (_, Some(rows)) => rows.len(),
150 _ => 0,
151 };
152
153 if let Some(sel) = sel_rows {
154 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
155 return Err(SqlError::InvalidValue(format!(
156 "INSERT ... SELECT column count mismatch: expected {}, got {}",
157 insert_columns.len(),
158 sel[0].len()
159 )));
160 }
161 }
162
163 for idx in 0..total {
164 for v in row.iter_mut() {
165 *v = Value::Null;
166 }
167
168 if let Some(value_rows) = values {
169 let value_row = &value_rows[idx];
170 if value_row.len() != insert_columns.len() {
171 return Err(SqlError::InvalidValue(format!(
172 "expected {} values, got {}",
173 insert_columns.len(),
174 value_row.len()
175 )));
176 }
177 for (i, expr) in value_row.iter().enumerate() {
178 let val = if let Expr::Parameter(n) = expr {
179 params
180 .get(n - 1)
181 .cloned()
182 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
183 } else {
184 eval_const_expr(expr)?
185 };
186 let col_idx = col_indices[i];
187 let col = &table_schema.columns[col_idx];
188 row[col_idx] = if val.is_null() {
189 Value::Null
190 } else {
191 coerce_for_column(val, col, strict)?
192 };
193 }
194 } else if let Some(sel) = sel_rows {
195 let sel_row = &sel[idx];
196 for (i, val) in sel_row.iter().enumerate() {
197 let col_idx = col_indices[i];
198 let col = &table_schema.columns[col_idx];
199 row[col_idx] = if val.is_null() {
200 Value::Null
201 } else {
202 coerce_for_column(val.clone(), col, strict)?
203 };
204 }
205 }
206
207 for &(pos, def_expr) in &defaults {
208 let val = eval_const_expr(def_expr)?;
209 let col = &table_schema.columns[pos];
210 if !val.is_null() {
211 row[pos] = coerce_for_column(val, col, strict)?;
212 }
213 }
214
215 if let Some(ref gen_map) = row_col_map_for_gen {
216 for &(pos, gen_expr) in &generated_cols {
217 let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
218 let col = &table_schema.columns[pos];
219 row[pos] = if val.is_null() {
220 Value::Null
221 } else {
222 coerce_for_column(val, col, strict)?
223 };
224 }
225 }
226
227 for col in &table_schema.columns {
228 if !col.nullable && row[col.position as usize].is_null() {
229 return Err(SqlError::NotNullViolation(col.name.clone()));
230 }
231 }
232
233 if let Some(ref col_map) = check_col_map {
234 for col in &table_schema.columns {
235 if let Some(ref check) = col.check_expr {
236 let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
237 if !is_truthy(&result) && !result.is_null() {
238 let name = col.check_name.as_deref().unwrap_or(&col.name);
239 return Err(SqlError::CheckViolation(name.to_string()));
240 }
241 }
242 }
243 for tc in &table_schema.check_constraints {
244 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
245 if !is_truthy(&result) && !result.is_null() {
246 let name = tc.name.as_deref().unwrap_or(&tc.sql);
247 return Err(SqlError::CheckViolation(name.to_string()));
248 }
249 }
250 }
251
252 for fk in &table_schema.foreign_keys {
253 let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
254 if any_null {
255 continue; }
257 let fk_vals: Vec<Value> = fk
258 .columns
259 .iter()
260 .map(|&ci| row[ci as usize].clone())
261 .collect();
262 fk_key_buf.clear();
263 encode_composite_key_into(&fk_vals, &mut fk_key_buf);
264 let found = wtx
265 .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
266 .map_err(SqlError::Storage)?;
267 if found.is_none() {
268 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
269 return Err(SqlError::ForeignKeyViolation(name.to_string()));
270 }
271 }
272
273 let proposed_row_for_returning: Option<Vec<Value>> =
274 returning_rows.as_ref().map(|_| row.clone());
275
276 for (j, &i) in pk_indices.iter().enumerate() {
277 pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
278 }
279 encode_composite_key_into(&pk_values, &mut key_buf);
280
281 for (j, &i) in non_pk.iter().enumerate() {
282 let col = &table_schema.columns[i];
283 if matches!(
284 col.generated_kind,
285 Some(crate::parser::GeneratedKind::Virtual)
286 ) {
287 value_values[enc_pos[j] as usize] = Value::Null;
288 row[i] = Value::Null;
289 } else {
290 value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
291 }
292 }
293 encode_row_into(&value_values, &mut value_buf);
294
295 if key_buf.len() > citadel_core::MAX_KEY_SIZE {
296 return Err(SqlError::KeyTooLarge {
297 size: key_buf.len(),
298 max: citadel_core::MAX_KEY_SIZE,
299 });
300 }
301 if value_buf.len() > citadel_core::MAX_VALUE_SIZE {
302 return Err(SqlError::RowTooLarge {
303 size: value_buf.len(),
304 max: citadel_core::MAX_VALUE_SIZE,
305 });
306 }
307
308 match compiled_conflict.as_ref() {
309 None => {
310 let is_new = wtx
311 .table_insert(stmt.table.as_bytes(), &key_buf, &value_buf)
312 .map_err(SqlError::Storage)?;
313 if !is_new {
314 return Err(SqlError::DuplicateKey);
315 }
316 if !table_schema.indices.is_empty() {
317 for (j, &i) in pk_indices.iter().enumerate() {
318 row[i] = pk_values[j].clone();
319 }
320 for (j, &i) in non_pk.iter().enumerate() {
321 row[i] =
322 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
323 }
324 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
325 }
326 count += 1;
327 if let Some(buf) = returning_rows.as_mut() {
328 buf.push((None, proposed_row_for_returning));
329 }
330 }
331 Some(oc) => {
332 let oc_ref: &CompiledOnConflict = oc;
333 let needs_row = upsert_needs_row(oc_ref, table_schema);
334 if needs_row {
335 for (j, &i) in pk_indices.iter().enumerate() {
336 row[i] = pk_values[j].clone();
337 }
338 for (j, &i) in non_pk.iter().enumerate() {
339 row[i] =
340 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
341 }
342 }
343 let outcome = apply_insert_with_conflict(
344 &mut wtx,
345 table_schema,
346 &key_buf,
347 &value_buf,
348 &row,
349 &pk_values,
350 oc_ref,
351 row_col_map.as_ref().unwrap(),
352 stmt.returning.is_some(),
353 )?;
354 match outcome {
355 InsertRowOutcome::Inserted => {
356 count += 1;
357 if let Some(buf) = returning_rows.as_mut() {
358 buf.push((None, proposed_row_for_returning));
359 }
360 }
361 InsertRowOutcome::Updated { old, new } => {
362 count += 1;
363 if let Some(buf) = returning_rows.as_mut() {
364 buf.push((Some(old), Some(new)));
365 }
366 }
367 InsertRowOutcome::Skipped => {}
368 }
369 }
370 }
371 }
372
373 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
374 let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
375 wtx.commit().map_err(SqlError::Storage)?;
376 return Ok(ExecutionResult::Query(qr));
377 }
378
379 wtx.commit().map_err(SqlError::Storage)?;
380 Ok(ExecutionResult::RowsAffected(count))
381}
382
383pub(super) fn has_subquery(expr: &Expr) -> bool {
384 crate::parser::has_subquery(expr)
385}
386
387pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
388 if let Some(ref w) = stmt.where_clause {
389 if has_subquery(w) {
390 return true;
391 }
392 }
393 if let Some(ref h) = stmt.having {
394 if has_subquery(h) {
395 return true;
396 }
397 }
398 for col in &stmt.columns {
399 if let SelectColumn::Expr { expr, .. } = col {
400 if has_subquery(expr) {
401 return true;
402 }
403 }
404 }
405 for ob in &stmt.order_by {
406 if has_subquery(&ob.expr) {
407 return true;
408 }
409 }
410 for join in &stmt.joins {
411 if let Some(ref on_expr) = join.on_clause {
412 if has_subquery(on_expr) {
413 return true;
414 }
415 }
416 }
417 false
418}
419
420pub(super) fn materialize_expr(
421 expr: &Expr,
422 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
423) -> Result<Expr> {
424 match expr {
425 Expr::InSubquery {
426 expr: e,
427 subquery,
428 negated,
429 } => {
430 let inner = materialize_expr(e, exec_sub)?;
431 let qr = exec_sub(subquery)?;
432 if !qr.columns.is_empty() && qr.columns.len() != 1 {
433 return Err(SqlError::SubqueryMultipleColumns);
434 }
435 let mut values = rustc_hash::FxHashSet::default();
436 let mut has_null = false;
437 for row in &qr.rows {
438 if row[0].is_null() {
439 has_null = true;
440 } else {
441 values.insert(row[0].clone());
442 }
443 }
444 Ok(Expr::InSet {
445 expr: Box::new(inner),
446 values,
447 has_null,
448 negated: *negated,
449 })
450 }
451 Expr::ScalarSubquery(subquery) => {
452 let qr = exec_sub(subquery)?;
453 if qr.rows.len() > 1 {
454 return Err(SqlError::SubqueryMultipleRows);
455 }
456 let val = if qr.rows.is_empty() {
457 Value::Null
458 } else {
459 qr.rows[0][0].clone()
460 };
461 Ok(Expr::Literal(val))
462 }
463 Expr::Exists { subquery, negated } => {
464 let qr = exec_sub(subquery)?;
465 let exists = !qr.rows.is_empty();
466 let result = if *negated { !exists } else { exists };
467 Ok(Expr::Literal(Value::Boolean(result)))
468 }
469 Expr::InList {
470 expr: e,
471 list,
472 negated,
473 } => {
474 let inner = materialize_expr(e, exec_sub)?;
475 let items = list
476 .iter()
477 .map(|item| materialize_expr(item, exec_sub))
478 .collect::<Result<Vec<_>>>()?;
479 Ok(Expr::InList {
480 expr: Box::new(inner),
481 list: items,
482 negated: *negated,
483 })
484 }
485 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
486 left: Box::new(materialize_expr(left, exec_sub)?),
487 op: *op,
488 right: Box::new(materialize_expr(right, exec_sub)?),
489 }),
490 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
491 op: *op,
492 expr: Box::new(materialize_expr(e, exec_sub)?),
493 }),
494 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
495 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
496 Expr::InSet {
497 expr: e,
498 values,
499 has_null,
500 negated,
501 } => Ok(Expr::InSet {
502 expr: Box::new(materialize_expr(e, exec_sub)?),
503 values: values.clone(),
504 has_null: *has_null,
505 negated: *negated,
506 }),
507 Expr::Between {
508 expr: e,
509 low,
510 high,
511 negated,
512 } => Ok(Expr::Between {
513 expr: Box::new(materialize_expr(e, exec_sub)?),
514 low: Box::new(materialize_expr(low, exec_sub)?),
515 high: Box::new(materialize_expr(high, exec_sub)?),
516 negated: *negated,
517 }),
518 Expr::Like {
519 expr: e,
520 pattern,
521 escape,
522 negated,
523 } => {
524 let esc = escape
525 .as_ref()
526 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
527 .transpose()?;
528 Ok(Expr::Like {
529 expr: Box::new(materialize_expr(e, exec_sub)?),
530 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
531 escape: esc,
532 negated: *negated,
533 })
534 }
535 Expr::Case {
536 operand,
537 conditions,
538 else_result,
539 } => {
540 let op = operand
541 .as_ref()
542 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
543 .transpose()?;
544 let conds = conditions
545 .iter()
546 .map(|(c, r)| {
547 Ok((
548 materialize_expr(c, exec_sub)?,
549 materialize_expr(r, exec_sub)?,
550 ))
551 })
552 .collect::<Result<Vec<_>>>()?;
553 let else_r = else_result
554 .as_ref()
555 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
556 .transpose()?;
557 Ok(Expr::Case {
558 operand: op,
559 conditions: conds,
560 else_result: else_r,
561 })
562 }
563 Expr::Coalesce(args) => {
564 let materialized = args
565 .iter()
566 .map(|a| materialize_expr(a, exec_sub))
567 .collect::<Result<Vec<_>>>()?;
568 Ok(Expr::Coalesce(materialized))
569 }
570 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
571 expr: Box::new(materialize_expr(e, exec_sub)?),
572 data_type: *data_type,
573 }),
574 Expr::Function {
575 name,
576 args,
577 distinct,
578 } => {
579 let materialized = args
580 .iter()
581 .map(|a| materialize_expr(a, exec_sub))
582 .collect::<Result<Vec<_>>>()?;
583 Ok(Expr::Function {
584 name: name.clone(),
585 args: materialized,
586 distinct: *distinct,
587 })
588 }
589 other => Ok(other.clone()),
590 }
591}
592
593pub(super) fn materialize_stmt(
594 stmt: &SelectStmt,
595 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
596) -> Result<SelectStmt> {
597 let where_clause = stmt
598 .where_clause
599 .as_ref()
600 .map(|e| materialize_expr(e, exec_sub))
601 .transpose()?;
602 let having = stmt
603 .having
604 .as_ref()
605 .map(|e| materialize_expr(e, exec_sub))
606 .transpose()?;
607 let columns = stmt
608 .columns
609 .iter()
610 .map(|c| match c {
611 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
612 SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
613 SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
614 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
615 expr: materialize_expr(expr, exec_sub)?,
616 alias: alias.clone(),
617 }),
618 })
619 .collect::<Result<Vec<_>>>()?;
620 let order_by = stmt
621 .order_by
622 .iter()
623 .map(|ob| {
624 Ok(OrderByItem {
625 expr: materialize_expr(&ob.expr, exec_sub)?,
626 descending: ob.descending,
627 nulls_first: ob.nulls_first,
628 })
629 })
630 .collect::<Result<Vec<_>>>()?;
631 let joins = stmt
632 .joins
633 .iter()
634 .map(|j| {
635 let on_clause = j
636 .on_clause
637 .as_ref()
638 .map(|e| materialize_expr(e, exec_sub))
639 .transpose()?;
640 Ok(JoinClause {
641 join_type: j.join_type,
642 table: j.table.clone(),
643 subquery: j.subquery.clone(),
644 on_clause,
645 })
646 })
647 .collect::<Result<Vec<_>>>()?;
648 let group_by = stmt
649 .group_by
650 .iter()
651 .map(|e| materialize_expr(e, exec_sub))
652 .collect::<Result<Vec<_>>>()?;
653 Ok(SelectStmt {
654 columns,
655 from: stmt.from.clone(),
656 from_alias: stmt.from_alias.clone(),
657 from_subquery: stmt.from_subquery.clone(),
658 from_args: stmt.from_args.clone(),
659 from_json_table: stmt.from_json_table.clone(),
660 joins,
661 distinct: stmt.distinct,
662 where_clause,
663 order_by,
664 limit: stmt.limit.clone(),
665 offset: stmt.offset.clone(),
666 group_by,
667 having,
668 })
669}
670
671pub(super) fn exec_subquery_read(
672 db: &Database,
673 schema: &SchemaManager,
674 stmt: &SelectStmt,
675 ctes: &CteContext,
676) -> Result<QueryResult> {
677 match super::exec_select(db, schema, stmt, ctes)? {
678 ExecutionResult::Query(qr) => Ok(qr),
679 _ => Ok(QueryResult {
680 columns: vec![],
681 rows: vec![],
682 }),
683 }
684}
685
686pub(super) fn exec_subquery_write(
687 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
688 schema: &SchemaManager,
689 stmt: &SelectStmt,
690 ctes: &CteContext,
691) -> Result<QueryResult> {
692 match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
693 ExecutionResult::Query(qr) => Ok(qr),
694 _ => Ok(QueryResult {
695 columns: vec![],
696 rows: vec![],
697 }),
698 }
699}
700
701pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
702 stmt.where_clause.as_ref().is_some_and(has_subquery)
703 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
704}
705
706pub(super) fn materialize_update(
707 stmt: &UpdateStmt,
708 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
709) -> Result<UpdateStmt> {
710 let where_clause = stmt
711 .where_clause
712 .as_ref()
713 .map(|e| materialize_expr(e, exec_sub))
714 .transpose()?;
715 let assignments = stmt
716 .assignments
717 .iter()
718 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
719 .collect::<Result<Vec<_>>>()?;
720 Ok(UpdateStmt {
721 table: stmt.table.clone(),
722 assignments,
723 where_clause,
724 returning: stmt.returning.clone(),
725 })
726}
727
728pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
729 stmt.where_clause.as_ref().is_some_and(has_subquery)
730}
731
732pub(super) fn materialize_delete(
733 stmt: &DeleteStmt,
734 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
735) -> Result<DeleteStmt> {
736 let where_clause = stmt
737 .where_clause
738 .as_ref()
739 .map(|e| materialize_expr(e, exec_sub))
740 .transpose()?;
741 Ok(DeleteStmt {
742 table: stmt.table.clone(),
743 where_clause,
744 returning: stmt.returning.clone(),
745 })
746}
747
748pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
749 match &stmt.source {
750 InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
751 InsertSource::Select(_) => false,
753 }
754}
755
756pub(super) fn materialize_insert(
757 stmt: &InsertStmt,
758 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
759) -> Result<InsertStmt> {
760 let source = match &stmt.source {
761 InsertSource::Values(rows) => {
762 let mat = rows
763 .iter()
764 .map(|row| {
765 row.iter()
766 .map(|e| materialize_expr(e, exec_sub))
767 .collect::<Result<Vec<_>>>()
768 })
769 .collect::<Result<Vec<_>>>()?;
770 InsertSource::Values(mat)
771 }
772 InsertSource::Select(sq) => {
773 let ctes = sq
774 .ctes
775 .iter()
776 .map(|c| {
777 Ok(CteDefinition {
778 name: c.name.clone(),
779 column_aliases: c.column_aliases.clone(),
780 body: materialize_query_body(&c.body, exec_sub)?,
781 })
782 })
783 .collect::<Result<Vec<_>>>()?;
784 let body = materialize_query_body(&sq.body, exec_sub)?;
785 InsertSource::Select(Box::new(SelectQuery {
786 ctes,
787 recursive: sq.recursive,
788 body,
789 }))
790 }
791 };
792 Ok(InsertStmt {
793 table: stmt.table.clone(),
794 columns: stmt.columns.clone(),
795 source,
796 on_conflict: stmt.on_conflict.clone(),
797 returning: stmt.returning.clone(),
798 })
799}
800
801pub(super) fn materialize_query_body(
802 body: &QueryBody,
803 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
804) -> Result<QueryBody> {
805 match body {
806 QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
807 sel, exec_sub,
808 )?))),
809 QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
810 op: comp.op.clone(),
811 all: comp.all,
812 left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
813 right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
814 order_by: comp.order_by.clone(),
815 limit: comp.limit.clone(),
816 offset: comp.offset.clone(),
817 }))),
818 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Ok(body.clone()),
819 }
820}
821
822pub(super) fn exec_query_body(
823 db: &Database,
824 schema: &SchemaManager,
825 body: &QueryBody,
826 ctes: &CteContext,
827) -> Result<ExecutionResult> {
828 match body {
829 QueryBody::Select(sel) => super::exec_select(db, schema, sel, ctes),
830 QueryBody::Compound(comp) => exec_compound_select(db, schema, comp, ctes),
831 QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
832 SqlError::Unsupported("DML CTE bodies require an active write transaction".into()),
833 ),
834 }
835}
836
837pub(super) fn exec_query_body_in_txn(
838 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
839 schema: &SchemaManager,
840 body: &QueryBody,
841 ctes: &CteContext,
842) -> Result<ExecutionResult> {
843 match body {
844 QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
845 QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
846 QueryBody::Insert(ins) => exec_insert_in_txn_with_ctes(wtx, schema, ins, &[], ctes),
847 QueryBody::Update(upd) => super::exec_update_in_txn(wtx, schema, upd),
848 QueryBody::Delete(del) => super::exec_delete_in_txn(wtx, schema, del),
849 }
850}
851
852pub(super) fn exec_query_body_read(
853 db: &Database,
854 schema: &SchemaManager,
855 body: &QueryBody,
856 ctes: &CteContext,
857) -> Result<QueryResult> {
858 match exec_query_body(db, schema, body, ctes)? {
859 ExecutionResult::Query(qr) => Ok(qr),
860 _ => Ok(QueryResult {
861 columns: vec![],
862 rows: vec![],
863 }),
864 }
865}
866
867pub(super) fn exec_query_body_write(
868 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
869 schema: &SchemaManager,
870 body: &QueryBody,
871 ctes: &CteContext,
872) -> Result<QueryResult> {
873 match exec_query_body_in_txn(wtx, schema, body, ctes)? {
874 ExecutionResult::Query(qr) => Ok(qr),
875 _ => Ok(QueryResult {
876 columns: vec![],
877 rows: vec![],
878 }),
879 }
880}
881
882pub(super) fn exec_compound_select(
883 db: &Database,
884 schema: &SchemaManager,
885 comp: &CompoundSelect,
886 ctes: &CteContext,
887) -> Result<ExecutionResult> {
888 let left_qr = match exec_query_body(db, schema, &comp.left, ctes)? {
889 ExecutionResult::Query(qr) => qr,
890 _ => QueryResult {
891 columns: vec![],
892 rows: vec![],
893 },
894 };
895 let right_qr = match exec_query_body(db, schema, &comp.right, ctes)? {
896 ExecutionResult::Query(qr) => qr,
897 _ => QueryResult {
898 columns: vec![],
899 rows: vec![],
900 },
901 };
902 apply_set_operation(comp, left_qr, right_qr)
903}
904
905pub(super) fn exec_compound_select_in_txn(
906 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
907 schema: &SchemaManager,
908 comp: &CompoundSelect,
909 ctes: &CteContext,
910) -> Result<ExecutionResult> {
911 let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
912 ExecutionResult::Query(qr) => qr,
913 _ => QueryResult {
914 columns: vec![],
915 rows: vec![],
916 },
917 };
918 let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
919 ExecutionResult::Query(qr) => qr,
920 _ => QueryResult {
921 columns: vec![],
922 rows: vec![],
923 },
924 };
925 apply_set_operation(comp, left_qr, right_qr)
926}
927
928pub(super) fn apply_set_operation(
929 comp: &CompoundSelect,
930 left_qr: QueryResult,
931 right_qr: QueryResult,
932) -> Result<ExecutionResult> {
933 if !left_qr.columns.is_empty()
934 && !right_qr.columns.is_empty()
935 && left_qr.columns.len() != right_qr.columns.len()
936 {
937 return Err(SqlError::CompoundColumnCountMismatch {
938 left: left_qr.columns.len(),
939 right: right_qr.columns.len(),
940 });
941 }
942
943 let columns = left_qr.columns;
944
945 let mut rows = match (&comp.op, comp.all) {
946 (SetOp::Union, true) => {
947 let mut rows = left_qr.rows;
948 rows.extend(right_qr.rows);
949 rows
950 }
951 (SetOp::Union, false) => {
952 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
953 let mut rows = Vec::new();
954 for row in left_qr.rows.into_iter().chain(right_qr.rows) {
955 if !seen.contains(&row) {
956 seen.insert(row.clone());
957 rows.push(row);
958 }
959 }
960 rows
961 }
962 (SetOp::Intersect, true) => {
963 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
964 for row in &right_qr.rows {
965 *right_counts.entry(row.clone()).or_insert(0) += 1;
966 }
967 let mut rows = Vec::new();
968 for row in left_qr.rows {
969 if let Some(count) = right_counts.get_mut(&row) {
970 if *count > 0 {
971 *count -= 1;
972 rows.push(row);
973 }
974 }
975 }
976 rows
977 }
978 (SetOp::Intersect, false) => {
979 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
980 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
981 let mut rows = Vec::new();
982 for row in left_qr.rows {
983 if right_set.contains(&row) && !seen.contains(&row) {
984 seen.insert(row.clone());
985 rows.push(row);
986 }
987 }
988 rows
989 }
990 (SetOp::Except, true) => {
991 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
992 for row in &right_qr.rows {
993 *right_counts.entry(row.clone()).or_insert(0) += 1;
994 }
995 let mut rows = Vec::new();
996 for row in left_qr.rows {
997 if let Some(count) = right_counts.get_mut(&row) {
998 if *count > 0 {
999 *count -= 1;
1000 continue;
1001 }
1002 }
1003 rows.push(row);
1004 }
1005 rows
1006 }
1007 (SetOp::Except, false) => {
1008 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1009 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1010 let mut rows = Vec::new();
1011 for row in left_qr.rows {
1012 if !right_set.contains(&row) && !seen.contains(&row) {
1013 seen.insert(row.clone());
1014 rows.push(row);
1015 }
1016 }
1017 rows
1018 }
1019 };
1020
1021 if !comp.order_by.is_empty() {
1022 let col_defs: Vec<crate::types::ColumnDef> = columns
1023 .iter()
1024 .enumerate()
1025 .map(|(i, name)| crate::types::ColumnDef {
1026 name: name.clone(),
1027 data_type: crate::types::DataType::Null,
1028 nullable: true,
1029 position: i as u16,
1030 default_expr: None,
1031 default_sql: None,
1032 check_expr: None,
1033 check_sql: None,
1034 check_name: None,
1035 is_with_timezone: false,
1036 generated_expr: None,
1037 generated_sql: None,
1038 generated_kind: None,
1039 collation: crate::types::Collation::Binary,
1040 })
1041 .collect();
1042 sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1043 }
1044
1045 if let Some(ref offset_expr) = comp.offset {
1046 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1047 if offset < rows.len() {
1048 rows = rows.split_off(offset);
1049 } else {
1050 rows.clear();
1051 }
1052 }
1053
1054 if let Some(ref limit_expr) = comp.limit {
1055 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1056 rows.truncate(limit);
1057 }
1058
1059 Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1060}
1061
1062struct InsertBufs {
1063 row: Vec<Value>,
1064 pk_values: Vec<Value>,
1065 value_values: Vec<Value>,
1066 key_buf: Vec<u8>,
1067 value_buf: Vec<u8>,
1068 col_indices: Vec<usize>,
1069 fk_key_buf: Vec<u8>,
1070}
1071
1072impl InsertBufs {
1073 fn new() -> Self {
1074 Self {
1075 row: Vec::new(),
1076 pk_values: Vec::new(),
1077 value_values: Vec::new(),
1078 key_buf: Vec::with_capacity(64),
1079 value_buf: Vec::with_capacity(256),
1080 col_indices: Vec::new(),
1081 fk_key_buf: Vec::with_capacity(64),
1082 }
1083 }
1084}
1085
1086thread_local! {
1087 static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1088 static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1089}
1090
1091fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1092 INSERT_SCRATCH.with(|slot| f(&mut slot.borrow_mut()))
1093}
1094
1095pub(super) struct UpsertBufs {
1096 old_row: Vec<Value>,
1097 new_row: Vec<Value>,
1098 value_values: Vec<Value>,
1099 new_value_buf: Vec<u8>,
1100}
1101
1102impl UpsertBufs {
1103 pub(super) fn new() -> Self {
1104 Self {
1105 old_row: Vec::new(),
1106 new_row: Vec::new(),
1107 value_values: Vec::new(),
1108 new_value_buf: Vec::with_capacity(256),
1109 }
1110 }
1111}
1112
1113pub fn exec_insert_in_txn(
1114 wtx: &mut WriteTxn<'_>,
1115 schema: &SchemaManager,
1116 stmt: &InsertStmt,
1117 params: &[Value],
1118) -> Result<ExecutionResult> {
1119 with_insert_scratch(|bufs| {
1120 exec_insert_in_txn_impl(
1121 wtx,
1122 schema,
1123 stmt,
1124 params,
1125 bufs,
1126 None,
1127 &CteContext::default(),
1128 )
1129 })
1130}
1131
1132pub(super) fn exec_insert_in_txn_with_ctes(
1133 wtx: &mut WriteTxn<'_>,
1134 schema: &SchemaManager,
1135 stmt: &InsertStmt,
1136 params: &[Value],
1137 outer_ctes: &CteContext,
1138) -> Result<ExecutionResult> {
1139 with_insert_scratch(|bufs| {
1140 exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None, outer_ctes)
1141 })
1142}
1143
1144fn exec_insert_in_txn_cached(
1145 wtx: &mut WriteTxn<'_>,
1146 schema: &SchemaManager,
1147 stmt: &InsertStmt,
1148 params: &[Value],
1149 cache: &InsertCache,
1150) -> Result<ExecutionResult> {
1151 with_insert_scratch(|bufs| {
1152 exec_insert_in_txn_impl(
1153 wtx,
1154 schema,
1155 stmt,
1156 params,
1157 bufs,
1158 Some(cache),
1159 &CteContext::default(),
1160 )
1161 })
1162}
1163
1164fn exec_insert_in_txn_impl(
1165 wtx: &mut WriteTxn<'_>,
1166 schema: &SchemaManager,
1167 stmt: &InsertStmt,
1168 params: &[Value],
1169 bufs: &mut InsertBufs,
1170 cache: Option<&InsertCache>,
1171 outer_ctes: &CteContext,
1172) -> Result<ExecutionResult> {
1173 let empty_ctes = CteContext::default();
1174 let materialized;
1175 let has_sub = match cache {
1176 Some(c) => c.has_subquery,
1177 None => insert_has_subquery(stmt),
1178 };
1179 let stmt = if has_sub {
1180 materialized = materialize_insert(stmt, &mut |sub| {
1181 exec_subquery_write(wtx, schema, sub, &empty_ctes)
1182 })?;
1183 &materialized
1184 } else {
1185 stmt
1186 };
1187
1188 let table_schema = schema
1189 .get(&stmt.table)
1190 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1191
1192 let default_columns;
1193 let insert_columns: &[String] = if stmt.columns.is_empty() {
1194 default_columns = table_schema
1195 .columns
1196 .iter()
1197 .map(|c| c.name.clone())
1198 .collect::<Vec<_>>();
1199 &default_columns
1200 } else {
1201 &stmt.columns
1202 };
1203
1204 bufs.col_indices.clear();
1205 if let Some(c) = cache {
1206 bufs.col_indices.extend_from_slice(&c.col_indices);
1207 } else {
1208 for name in insert_columns {
1209 bufs.col_indices.push(
1210 table_schema
1211 .column_index(name)
1212 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1213 );
1214 }
1215 }
1216
1217 if cache.is_none() {
1218 for &ci in &bufs.col_indices {
1219 if table_schema.columns[ci].generated_kind.is_some() {
1220 return Err(SqlError::CannotInsertIntoGeneratedColumn(
1221 table_schema.columns[ci].name.clone(),
1222 ));
1223 }
1224 }
1225 }
1226
1227 let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1228 let cached_gen_positions: &[usize];
1229 let cached_gen_fast_evals: &[FastGenEval];
1230 if let Some(c) = cache {
1231 cached_gen_positions = &c.generated_col_positions;
1232 cached_gen_fast_evals = &c.generated_fast_evals;
1233 generated_cols_uncached = Vec::new();
1234 } else {
1235 cached_gen_positions = &[];
1236 cached_gen_fast_evals = &[];
1237 generated_cols_uncached = table_schema
1238 .columns
1239 .iter()
1240 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1241 .map(|c| {
1242 let expr = c.generated_expr.as_ref().unwrap();
1243 let fe = detect_fast_gen_eval(expr, table_schema);
1244 (c.position as usize, expr, fe)
1245 })
1246 .collect();
1247 }
1248 let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1249 let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1250 None
1251 } else {
1252 Some(ColumnMap::new(&table_schema.columns))
1253 };
1254 let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1255 None
1256 } else if let Some(c) = cache {
1257 c.row_col_map.as_ref()
1258 } else {
1259 row_col_map_for_gen_owned.as_ref()
1260 };
1261
1262 let any_defaults = match cache {
1263 Some(c) => c.any_defaults,
1264 None => table_schema
1265 .columns
1266 .iter()
1267 .any(|c| c.default_expr.is_some()),
1268 };
1269 let defaults: Vec<(usize, &Expr)> = if any_defaults {
1270 table_schema
1271 .columns
1272 .iter()
1273 .filter(|c| {
1274 c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1275 })
1276 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1277 .collect()
1278 } else {
1279 Vec::new()
1280 };
1281
1282 let has_checks = match cache {
1283 Some(c) => c.has_checks,
1284 None => table_schema.has_checks(),
1285 };
1286 let check_col_map = if has_checks {
1287 Some(ColumnMap::new(&table_schema.columns))
1288 } else {
1289 None
1290 };
1291
1292 let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1293 &[usize],
1294 &[usize],
1295 &[u16],
1296 usize,
1297 &[u16],
1298 ) = if let Some(c) = cache {
1299 (
1300 &c.pk_indices,
1301 &c.non_pk_indices,
1302 &c.encoding_positions,
1303 c.phys_count,
1304 &c.dropped_non_pk_slots,
1305 )
1306 } else {
1307 (
1308 table_schema.pk_indices(),
1309 table_schema.non_pk_indices(),
1310 table_schema.encoding_positions(),
1311 table_schema.physical_non_pk_count(),
1312 table_schema.dropped_non_pk_slots(),
1313 )
1314 };
1315
1316 bufs.row.resize(table_schema.columns.len(), Value::Null);
1317 bufs.pk_values.resize(pk_indices.len(), Value::Null);
1318 bufs.value_values.resize(phys_count, Value::Null);
1319
1320 let table_bytes = stmt.table.as_bytes();
1321 let has_fks = !table_schema.foreign_keys.is_empty();
1322 let has_indices = !table_schema.indices.is_empty();
1323 let has_defaults = !defaults.is_empty();
1324
1325 let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1326 (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1327 (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1328 (_, None) => None,
1329 };
1330
1331 let row_col_map_owned: Option<ColumnMap> =
1332 if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1333 Some(ColumnMap::new(&table_schema.columns))
1334 } else {
1335 None
1336 };
1337 let row_col_map: Option<&ColumnMap> = cache
1338 .and_then(|c| c.row_col_map.as_ref())
1339 .or(row_col_map_owned.as_ref());
1340
1341 let select_rows = match &stmt.source {
1342 InsertSource::Select(sq) => {
1343 let insert_ctes = super::materialize_all_ctes_with_outer(
1344 &sq.ctes,
1345 sq.recursive,
1346 outer_ctes,
1347 &mut |body, ctx| exec_query_body_write(wtx, schema, body, ctx),
1348 )?;
1349 let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1350 Some(qr.rows)
1351 }
1352 InsertSource::Values(_) => None,
1353 };
1354
1355 let mut count: u64 = 0;
1356 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1357 stmt.returning.as_ref().map(|_| Vec::new());
1358
1359 let values = match &stmt.source {
1360 InsertSource::Values(rows) => Some(rows.as_slice()),
1361 InsertSource::Select(_) => None,
1362 };
1363 let sel_rows = select_rows.as_deref();
1364
1365 let total = match (values, sel_rows) {
1366 (Some(rows), _) => rows.len(),
1367 (_, Some(rows)) => rows.len(),
1368 _ => 0,
1369 };
1370
1371 if let Some(sel) = sel_rows {
1372 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1373 return Err(SqlError::InvalidValue(format!(
1374 "INSERT ... SELECT column count mismatch: expected {}, got {}",
1375 insert_columns.len(),
1376 sel[0].len()
1377 )));
1378 }
1379 }
1380
1381 let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1382 for idx in 0..total {
1383 if !skip_row_clear {
1384 for v in bufs.row.iter_mut() {
1385 *v = Value::Null;
1386 }
1387 }
1388
1389 if let Some(value_rows) = values {
1390 if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1391 for action in plan {
1392 match action {
1393 BindAction::Param {
1394 param_idx,
1395 col_idx,
1396 target,
1397 } => {
1398 let v = ¶ms[*param_idx];
1399 bufs.row[*col_idx] = if v.is_null() {
1400 Value::Null
1401 } else if v.data_type() == *target {
1402 v.clone()
1403 } else {
1404 let got = v.data_type();
1405 v.clone().coerce_into(*target).ok_or_else(|| {
1406 SqlError::TypeMismatch {
1407 expected: target.to_string(),
1408 got: got.to_string(),
1409 }
1410 })?
1411 };
1412 }
1413 BindAction::Literal { value, col_idx } => {
1414 bufs.row[*col_idx] = value.clone();
1415 }
1416 }
1417 }
1418 } else {
1419 let value_row = &value_rows[idx];
1420 if value_row.len() != insert_columns.len() {
1421 return Err(SqlError::InvalidValue(format!(
1422 "expected {} values, got {}",
1423 insert_columns.len(),
1424 value_row.len()
1425 )));
1426 }
1427 for (i, expr) in value_row.iter().enumerate() {
1428 let val = match expr {
1429 Expr::Parameter(n) => params
1430 .get(n - 1)
1431 .cloned()
1432 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1433 Expr::Literal(v) => v.clone(),
1434 _ => eval_const_expr(expr)?,
1435 };
1436 let col_idx = bufs.col_indices[i];
1437 let col = &table_schema.columns[col_idx];
1438 let got_type = val.data_type();
1439 bufs.row[col_idx] = if val.is_null() {
1440 Value::Null
1441 } else {
1442 val.coerce_into(col.data_type)
1443 .ok_or_else(|| SqlError::TypeMismatch {
1444 expected: col.data_type.to_string(),
1445 got: got_type.to_string(),
1446 })?
1447 };
1448 }
1449 }
1450 } else if let Some(sel) = sel_rows {
1451 let sel_row = &sel[idx];
1452 for (i, val) in sel_row.iter().enumerate() {
1453 let col_idx = bufs.col_indices[i];
1454 let col = &table_schema.columns[col_idx];
1455 let got_type = val.data_type();
1456 bufs.row[col_idx] = if val.is_null() {
1457 Value::Null
1458 } else {
1459 val.clone().coerce_into(col.data_type).ok_or_else(|| {
1460 SqlError::TypeMismatch {
1461 expected: col.data_type.to_string(),
1462 got: got_type.to_string(),
1463 }
1464 })?
1465 };
1466 }
1467 }
1468
1469 if has_defaults {
1470 for &(pos, def_expr) in &defaults {
1471 let val = eval_const_expr(def_expr)?;
1472 let col = &table_schema.columns[pos];
1473 if !val.is_null() {
1474 let got_type = val.data_type();
1475 bufs.row[pos] =
1476 val.coerce_into(col.data_type)
1477 .ok_or_else(|| SqlError::TypeMismatch {
1478 expected: col.data_type.to_string(),
1479 got: got_type.to_string(),
1480 })?;
1481 }
1482 }
1483 }
1484
1485 if let Some(gen_map) = row_col_map_for_gen {
1486 if cache.is_some() {
1487 for (pos, fast) in cached_gen_positions
1488 .iter()
1489 .copied()
1490 .zip(cached_gen_fast_evals.iter())
1491 {
1492 let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1493 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1494 let col = &table_schema.columns[pos];
1495 bufs.row[pos] = if val.is_null() {
1496 Value::Null
1497 } else {
1498 let got_type = val.data_type();
1499 val.coerce_into(col.data_type)
1500 .ok_or_else(|| SqlError::TypeMismatch {
1501 expected: col.data_type.to_string(),
1502 got: got_type.to_string(),
1503 })?
1504 };
1505 }
1506 } else {
1507 for (pos, gen_expr, fast) in &generated_cols_uncached {
1508 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1509 let col = &table_schema.columns[*pos];
1510 bufs.row[*pos] = if val.is_null() {
1511 Value::Null
1512 } else {
1513 let got_type = val.data_type();
1514 val.coerce_into(col.data_type)
1515 .ok_or_else(|| SqlError::TypeMismatch {
1516 expected: col.data_type.to_string(),
1517 got: got_type.to_string(),
1518 })?
1519 };
1520 }
1521 }
1522 }
1523
1524 if let Some(c) = cache {
1525 for &pos in &c.not_null_indices {
1526 if bufs.row[pos as usize].is_null() {
1527 return Err(SqlError::NotNullViolation(
1528 table_schema.columns[pos as usize].name.clone(),
1529 ));
1530 }
1531 }
1532 } else {
1533 for col in &table_schema.columns {
1534 if !col.nullable && bufs.row[col.position as usize].is_null() {
1535 return Err(SqlError::NotNullViolation(col.name.clone()));
1536 }
1537 }
1538 }
1539
1540 if let Some(ref col_map) = check_col_map {
1541 for col in &table_schema.columns {
1542 if let Some(ref check) = col.check_expr {
1543 let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1544 if !is_truthy(&result) && !result.is_null() {
1545 let name = col.check_name.as_deref().unwrap_or(&col.name);
1546 return Err(SqlError::CheckViolation(name.to_string()));
1547 }
1548 }
1549 }
1550 for tc in &table_schema.check_constraints {
1551 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1552 if !is_truthy(&result) && !result.is_null() {
1553 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1554 return Err(SqlError::CheckViolation(name.to_string()));
1555 }
1556 }
1557 }
1558
1559 if has_fks {
1560 for fk in &table_schema.foreign_keys {
1561 let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1562 if any_null {
1563 continue;
1564 }
1565 crate::encoding::encode_composite_key_from_indices(
1566 &fk.columns,
1567 &bufs.row,
1568 &mut bufs.fk_key_buf,
1569 );
1570 let found = wtx
1571 .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1572 .map_err(SqlError::Storage)?;
1573 if found.is_none() {
1574 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1575 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1576 }
1577 }
1578 }
1579
1580 let proposed_row_for_returning: Option<Vec<Value>> =
1581 returning_rows.as_ref().map(|_| bufs.row.clone());
1582
1583 for (j, &i) in pk_indices.iter().enumerate() {
1584 bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1585 }
1586 match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1587 true => match bufs.pk_values[0] {
1588 Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1589 _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1590 },
1591 false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1592 }
1593
1594 for &slot in dropped {
1595 bufs.value_values[slot as usize] = Value::Null;
1596 }
1597 for (j, &i) in non_pk.iter().enumerate() {
1598 let col = &table_schema.columns[i];
1599 if matches!(
1600 col.generated_kind,
1601 Some(crate::parser::GeneratedKind::Virtual)
1602 ) {
1603 bufs.value_values[enc_pos[j] as usize] = Value::Null;
1604 bufs.row[i] = Value::Null;
1605 } else {
1606 bufs.value_values[enc_pos[j] as usize] =
1607 std::mem::replace(&mut bufs.row[i], Value::Null);
1608 }
1609 }
1610 match cache.and_then(|c| c.row_encoder.as_ref()) {
1611 Some(tmpl) => crate::encoding::encode_int_row_with_template(
1612 tmpl,
1613 &bufs.value_values,
1614 &mut bufs.value_buf,
1615 )?,
1616 None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1617 }
1618
1619 if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1620 return Err(SqlError::KeyTooLarge {
1621 size: bufs.key_buf.len(),
1622 max: citadel_core::MAX_KEY_SIZE,
1623 });
1624 }
1625 if bufs.value_buf.len() > citadel_core::MAX_VALUE_SIZE {
1626 return Err(SqlError::RowTooLarge {
1627 size: bufs.value_buf.len(),
1628 max: citadel_core::MAX_VALUE_SIZE,
1629 });
1630 }
1631
1632 match compiled_conflict.as_ref() {
1633 None => {
1634 let is_new = wtx
1635 .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1636 .map_err(SqlError::Storage)?;
1637 if !is_new {
1638 return Err(SqlError::DuplicateKey);
1639 }
1640 if has_indices {
1641 for (j, &i) in pk_indices.iter().enumerate() {
1642 bufs.row[i] = bufs.pk_values[j].clone();
1643 }
1644 for (j, &i) in non_pk.iter().enumerate() {
1645 bufs.row[i] = std::mem::replace(
1646 &mut bufs.value_values[enc_pos[j] as usize],
1647 Value::Null,
1648 );
1649 }
1650 insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1651 }
1652 count += 1;
1653 if let Some(buf) = returning_rows.as_mut() {
1654 buf.push((None, proposed_row_for_returning));
1655 }
1656 }
1657 Some(oc) => {
1658 let oc_ref: &CompiledOnConflict = oc;
1659 let needs_row = upsert_needs_row(oc_ref, table_schema);
1660 if needs_row {
1661 for (j, &i) in pk_indices.iter().enumerate() {
1662 bufs.row[i] = bufs.pk_values[j].clone();
1663 }
1664 for (j, &i) in non_pk.iter().enumerate() {
1665 bufs.row[i] = std::mem::replace(
1666 &mut bufs.value_values[enc_pos[j] as usize],
1667 Value::Null,
1668 );
1669 }
1670 }
1671 let outcome = apply_insert_with_conflict(
1672 wtx,
1673 table_schema,
1674 &bufs.key_buf,
1675 &bufs.value_buf,
1676 &bufs.row,
1677 &bufs.pk_values,
1678 oc_ref,
1679 row_col_map.unwrap(),
1680 stmt.returning.is_some(),
1681 )?;
1682 match outcome {
1683 InsertRowOutcome::Inserted => {
1684 count += 1;
1685 if let Some(buf) = returning_rows.as_mut() {
1686 buf.push((None, proposed_row_for_returning));
1687 }
1688 }
1689 InsertRowOutcome::Updated { old, new } => {
1690 count += 1;
1691 if let Some(buf) = returning_rows.as_mut() {
1692 buf.push((Some(old), Some(new)));
1693 }
1694 }
1695 InsertRowOutcome::Skipped => {}
1696 }
1697 }
1698 }
1699 }
1700
1701 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
1702 return Ok(ExecutionResult::Query(super::helpers::project_returning(
1703 table_schema,
1704 returning_cols,
1705 &rows,
1706 )?));
1707 }
1708
1709 Ok(ExecutionResult::RowsAffected(count))
1710}
1711
1712pub struct CompiledInsert {
1713 table_lower: String,
1714 cached: Option<InsertCache>,
1715}
1716
1717struct InsertCache {
1718 col_indices: Vec<usize>,
1719 has_subquery: bool,
1720 any_defaults: bool,
1721 has_checks: bool,
1722 on_conflict: Option<Arc<CompiledOnConflict>>,
1723 row_col_map: Option<ColumnMap>,
1724 generated_col_positions: Vec<usize>,
1725 generated_fast_evals: Vec<FastGenEval>,
1726 pk_indices: Vec<usize>,
1727 non_pk_indices: Vec<usize>,
1728 encoding_positions: Vec<u16>,
1729 dropped_non_pk_slots: Vec<u16>,
1730 phys_count: usize,
1731 single_int_pk: bool,
1732 not_null_indices: Vec<u16>,
1733 bind_plan: Option<Vec<BindAction>>,
1734 row_fully_overwritten: bool,
1735 row_encoder: Option<crate::encoding::IntRowTemplate>,
1736 is_trivial_fast: bool,
1737 trivial_fast_program: Option<TrivialFastProgram>,
1738}
1739
1740#[derive(Clone)]
1741enum BindAction {
1742 Param {
1743 param_idx: usize,
1744 col_idx: usize,
1745 target: DataType,
1746 },
1747 Literal {
1748 value: Value,
1749 col_idx: usize,
1750 },
1751}
1752
1753#[derive(Clone)]
1754struct TrivialFastProgram {
1755 template: Vec<u8>,
1756 ops: Vec<WriteOp>,
1757 pk_param: u8,
1758 not_null_param_indices: Vec<u8>,
1759}
1760
/// A single write in a [`TrivialFastProgram`].
///
/// `off` is the offset recorded for the destination slot in the row template.
// NOTE(review): `bitmap_byte_off` / `bitmap_bit_mask` identify one bit per
// destination slot (byte `2 + slot / 8`, bit `slot % 8`); presumably this is
// the row's NULL bitmap. Confirm against the encoder.
#[derive(Clone)]
enum WriteOp {
    /// Write the integer parameter `param_idx` at `off`.
    ParamI64 {
        param_idx: u8,
        off: u32,
    },
    /// Write the constant `value` at `off`.
    LiteralI64 {
        value: i64,
        off: u32,
    },
    /// Generated column computed from two parameters (`a_param`, `b_param`).
    GenAddParamsI64 {
        a_param: u8,
        b_param: u8,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
    /// Generated column computed from one parameter together with the
    /// constants `mul` and `add`.
    GenMulAddParamI64 {
        param_idx: u8,
        mul: i64,
        add: i64,
        off: u32,
        bitmap_byte_off: u32,
        bitmap_bit_mask: u8,
    },
}
1787
1788fn build_trivial_fast_program(
1789 bind_plan: &[BindAction],
1790 row_encoder: &crate::encoding::IntRowTemplate,
1791 non_virtual_pairs: &[(usize, usize)],
1792 generated_col_positions: &[usize],
1793 generated_fast_evals: &[FastGenEval],
1794 pk_indices: &[usize],
1795 columns: &[crate::types::ColumnDef],
1796) -> Option<TrivialFastProgram> {
1797 let pk_col = pk_indices[0];
1798 let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
1799 non_virtual_pairs.iter().copied().collect();
1800 let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
1801 row_encoder.slot_offsets.iter().copied().collect();
1802
1803 let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
1804 let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
1805 let mut pk_param: Option<u8> = None;
1806 let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
1807 let mut not_null_param_indices: Vec<u8> = Vec::new();
1808
1809 for action in bind_plan {
1810 match action {
1811 BindAction::Param {
1812 param_idx,
1813 col_idx,
1814 target,
1815 } => {
1816 if *target != DataType::Integer {
1817 return None;
1818 }
1819 let pi: u8 = u8::try_from(*param_idx).ok()?;
1820 col_to_param.insert(*col_idx, pi);
1821 if *col_idx == pk_col {
1822 pk_param = Some(pi);
1823 } else {
1824 let slot = *col_to_slot.get(col_idx)?;
1825 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1826 ops.push(WriteOp::ParamI64 { param_idx: pi, off });
1827 if !columns[*col_idx].nullable {
1828 not_null_param_indices.push(pi);
1829 }
1830 }
1831 }
1832 BindAction::Literal { value, col_idx } => match value {
1833 Value::Integer(v) => {
1834 col_to_lit_int.insert(*col_idx, *v);
1835 if *col_idx == pk_col {
1836 return None;
1837 }
1838 let slot = *col_to_slot.get(col_idx)?;
1839 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1840 ops.push(WriteOp::LiteralI64 { value: *v, off });
1841 }
1842 _ => return None,
1843 },
1844 }
1845 }
1846
1847 let pk_param = pk_param?;
1848
1849 for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
1850 let gen_slot = *col_to_slot.get(&gen_pos)?;
1851 let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
1852 let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
1853 let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
1854 let gen_col_nullable = columns[gen_pos].nullable;
1855
1856 match &generated_fast_evals[i] {
1857 FastGenEval::IntColAddCol {
1858 left_idx,
1859 right_idx,
1860 } => {
1861 let a_param = col_to_param.get(left_idx).copied();
1862 let b_param = col_to_param.get(right_idx).copied();
1863 match (a_param, b_param) {
1864 (Some(ap), Some(bp)) => {
1865 let deps_safe = gen_col_nullable
1866 || (not_null_param_indices.contains(&ap)
1867 && not_null_param_indices.contains(&bp));
1868 if !deps_safe {
1869 return None;
1870 }
1871 ops.push(WriteOp::GenAddParamsI64 {
1872 a_param: ap,
1873 b_param: bp,
1874 off: gen_off,
1875 bitmap_byte_off,
1876 bitmap_bit_mask,
1877 });
1878 }
1879 (Some(p), None) => {
1880 let lit = col_to_lit_int.get(right_idx).copied()?;
1881 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1882 return None;
1883 }
1884 ops.push(WriteOp::GenMulAddParamI64 {
1885 param_idx: p,
1886 mul: 1,
1887 add: lit,
1888 off: gen_off,
1889 bitmap_byte_off,
1890 bitmap_bit_mask,
1891 });
1892 }
1893 (None, Some(p)) => {
1894 let lit = col_to_lit_int.get(left_idx).copied()?;
1895 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1896 return None;
1897 }
1898 ops.push(WriteOp::GenMulAddParamI64 {
1899 param_idx: p,
1900 mul: 1,
1901 add: lit,
1902 off: gen_off,
1903 bitmap_byte_off,
1904 bitmap_bit_mask,
1905 });
1906 }
1907 (None, None) => {
1908 let la = col_to_lit_int.get(left_idx).copied()?;
1909 let lb = col_to_lit_int.get(right_idx).copied()?;
1910 ops.push(WriteOp::LiteralI64 {
1911 value: la.wrapping_add(lb),
1912 off: gen_off,
1913 });
1914 }
1915 }
1916 }
1917 FastGenEval::IntColMulAdd {
1918 col_schema_idx,
1919 mul,
1920 add,
1921 } => {
1922 if let Some(p) = col_to_param.get(col_schema_idx).copied() {
1923 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1924 return None;
1925 }
1926 ops.push(WriteOp::GenMulAddParamI64 {
1927 param_idx: p,
1928 mul: *mul,
1929 add: *add,
1930 off: gen_off,
1931 bitmap_byte_off,
1932 bitmap_bit_mask,
1933 });
1934 } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
1935 ops.push(WriteOp::LiteralI64 {
1936 value: lit.wrapping_mul(*mul).wrapping_add(*add),
1937 off: gen_off,
1938 });
1939 } else {
1940 return None;
1941 }
1942 }
1943 FastGenEval::None => return None,
1944 }
1945 }
1946
1947 Some(TrivialFastProgram {
1948 template: row_encoder.template.clone(),
1949 ops,
1950 pk_param,
1951 not_null_param_indices,
1952 })
1953}
1954
1955#[derive(Clone)]
1956pub(super) enum CompiledOnConflict {
1957 DoNothing {
1958 target: Option<ConflictKind>,
1959 },
1960 DoUpdate {
1961 target: ConflictKind,
1962 assignments: Vec<(usize, Expr)>,
1963 where_clause: Option<Expr>,
1964 fast_paths: Option<Vec<DoUpdateFastPath>>,
1965 },
1966}
1967
1968#[derive(Clone, Copy)]
1969pub(super) enum DoUpdateFastPath {
1970 IntAddConst { phys_idx: usize, delta: i64 },
1971}
1972
1973#[derive(Clone, Debug)]
1974pub(super) enum ConflictKind {
1975 PrimaryKey,
1976 UniqueIndex { index_idx: usize },
1977}
1978
1979fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
1980 match target {
1981 ConflictTarget::Columns(cols) => {
1982 let col_idx_set: Vec<u16> = cols
1983 .iter()
1984 .map(|name| {
1985 ts.column_index(name)
1986 .map(|i| i as u16)
1987 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
1988 })
1989 .collect::<Result<_>>()?;
1990 let pk_set = ts.primary_key_columns.clone();
1991 if set_equal(&col_idx_set, &pk_set) {
1992 return Ok(ConflictKind::PrimaryKey);
1993 }
1994 for (index_idx, idx) in ts.indices.iter().enumerate() {
1995 if idx.unique && set_equal(&col_idx_set, &idx.columns) {
1996 return Ok(ConflictKind::UniqueIndex { index_idx });
1997 }
1998 }
1999 Err(SqlError::Plan(
2000 "ON CONFLICT target does not match any unique constraint".into(),
2001 ))
2002 }
2003 ConflictTarget::Constraint(name) => {
2004 let lower = name.to_ascii_lowercase();
2005 for (index_idx, idx) in ts.indices.iter().enumerate() {
2006 if idx.name.eq_ignore_ascii_case(&lower) {
2007 if idx.unique {
2008 return Ok(ConflictKind::UniqueIndex { index_idx });
2009 }
2010 return Err(SqlError::Plan(format!(
2011 "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
2012 )));
2013 }
2014 }
2015 Err(SqlError::Plan(format!(
2016 "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
2017 )))
2018 }
2019 }
2020}
2021
/// Returns `true` when `a` and `b` contain the same values, ignoring order.
///
/// Duplicates are significant: the comparison is between the sorted contents,
/// so `[1, 1, 2]` and `[1, 2, 2]` are not equal.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // Already in the same order: no need to allocate and sort.
    if a == b {
        return true;
    }
    let mut a_sorted = a.to_vec();
    let mut b_sorted = b.to_vec();
    a_sorted.sort_unstable();
    b_sorted.sort_unstable();
    a_sorted == b_sorted
}
2032
2033pub(super) enum InsertRowOutcome {
2034 Inserted,
2035 Updated { old: Vec<Value>, new: Vec<Value> },
2036 Skipped,
2037}
2038
2039#[allow(clippy::too_many_arguments)]
2040#[inline]
2041pub(super) fn apply_insert_with_conflict(
2042 wtx: &mut WriteTxn<'_>,
2043 table_schema: &TableSchema,
2044 key_buf: &[u8],
2045 value_buf: &[u8],
2046 row: &[Value],
2047 pk_values: &[Value],
2048 on_conflict: &CompiledOnConflict,
2049 col_map: &ColumnMap,
2050 capture_returning: bool,
2051) -> Result<InsertRowOutcome> {
2052 let table_bytes = table_schema.name.as_bytes();
2053
2054 if let CompiledOnConflict::DoNothing { target } = on_conflict {
2055 let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2056 if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2057 let inserted = wtx
2058 .table_insert_if_absent(table_bytes, key_buf, value_buf)
2059 .map_err(SqlError::Storage)?;
2060 return Ok(if inserted {
2061 InsertRowOutcome::Inserted
2062 } else {
2063 InsertRowOutcome::Skipped
2064 });
2065 }
2066 }
2067
2068 if let CompiledOnConflict::DoUpdate {
2069 target: ConflictKind::PrimaryKey,
2070 assignments,
2071 where_clause,
2072 fast_paths,
2073 } = on_conflict
2074 {
2075 if can_fuse_do_update(table_schema, assignments) {
2076 return apply_do_update_fused(
2077 wtx,
2078 table_schema,
2079 table_bytes,
2080 key_buf,
2081 value_buf,
2082 row,
2083 assignments,
2084 where_clause.as_ref(),
2085 col_map,
2086 fast_paths.as_deref(),
2087 capture_returning,
2088 );
2089 }
2090 }
2091
2092 let primary_outcome = wtx
2093 .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2094 .map_err(SqlError::Storage)?;
2095
2096 match primary_outcome {
2097 citadel_txn::write_txn::InsertOutcome::Inserted => {
2098 if table_schema.indices.is_empty() {
2099 return Ok(InsertRowOutcome::Inserted);
2100 }
2101 let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2102 match insert_index_entries_or_fetch(
2103 wtx,
2104 table_schema,
2105 row,
2106 pk_values,
2107 &mut inserted_keys,
2108 )? {
2109 None => Ok(InsertRowOutcome::Inserted),
2110 Some(conflicting_idx) => {
2111 let matches_target =
2112 matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2113 || matches!(
2114 on_conflict,
2115 CompiledOnConflict::DoNothing {
2116 target: Some(ConflictKind::UniqueIndex { index_idx }),
2117 } | CompiledOnConflict::DoUpdate {
2118 target: ConflictKind::UniqueIndex { index_idx },
2119 ..
2120 } if *index_idx == conflicting_idx
2121 );
2122 undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2123 if !matches_target {
2124 return Err(SqlError::UniqueViolation(
2125 table_schema.indices[conflicting_idx].name.clone(),
2126 ));
2127 }
2128 match on_conflict {
2129 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2130 CompiledOnConflict::DoUpdate {
2131 assignments,
2132 where_clause,
2133 ..
2134 } => {
2135 let existing_pk =
2136 fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2137 apply_do_update(
2138 wtx,
2139 table_schema,
2140 &existing_pk,
2141 row,
2142 assignments,
2143 where_clause.as_ref(),
2144 col_map,
2145 capture_returning,
2146 )
2147 }
2148 }
2149 }
2150 }
2151 }
2152 citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2153 let matches_target = matches!(
2154 on_conflict,
2155 CompiledOnConflict::DoNothing { target: None }
2156 | CompiledOnConflict::DoNothing {
2157 target: Some(ConflictKind::PrimaryKey),
2158 }
2159 | CompiledOnConflict::DoUpdate {
2160 target: ConflictKind::PrimaryKey,
2161 ..
2162 }
2163 );
2164 if !matches_target {
2165 return Err(SqlError::DuplicateKey);
2166 }
2167 match on_conflict {
2168 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2169 CompiledOnConflict::DoUpdate {
2170 assignments,
2171 where_clause,
2172 ..
2173 } => {
2174 let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2175 apply_do_update_with_old_row(
2176 wtx,
2177 table_schema,
2178 key_buf,
2179 &old_row,
2180 row,
2181 assignments,
2182 where_clause.as_ref(),
2183 col_map,
2184 capture_returning,
2185 )
2186 }
2187 }
2188 }
2189 }
2190}
2191
2192#[inline]
2193fn apply_fast_path_patch(
2194 old_bytes: &[u8],
2195 fast_paths: &[DoUpdateFastPath],
2196) -> Result<UpsertAction> {
2197 UPSERT_SCRATCH.with(|slot| {
2198 let mut bufs = slot.borrow_mut();
2199 bufs.new_value_buf.clear();
2200 bufs.new_value_buf.extend_from_slice(old_bytes);
2201
2202 let mut patch_scratch: Vec<u8> = Vec::new();
2203
2204 for fp in fast_paths {
2205 match fp {
2206 DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2207 let decoded =
2208 crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2209 let old_val = &decoded[0];
2210 let new_val = match old_val {
2211 Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2212 Value::Null => Value::Null,
2213 _ => {
2214 return Err(SqlError::TypeMismatch {
2215 expected: "INTEGER".into(),
2216 got: old_val.data_type().to_string(),
2217 });
2218 }
2219 };
2220 if !crate::encoding::patch_column_in_place(
2221 &mut bufs.new_value_buf,
2222 *phys_idx,
2223 &new_val,
2224 )? {
2225 patch_scratch.clear();
2226 crate::encoding::patch_row_column(
2227 &bufs.new_value_buf,
2228 *phys_idx,
2229 &new_val,
2230 &mut patch_scratch,
2231 )?;
2232 std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2233 }
2234 }
2235 }
2236 }
2237
2238 if bufs.new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2239 return Err(SqlError::RowTooLarge {
2240 size: bufs.new_value_buf.len(),
2241 max: citadel_core::MAX_VALUE_SIZE,
2242 });
2243 }
2244
2245 Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2246 })
2247}
2248
2249fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2250 if !ts.indices.is_empty() {
2251 return true;
2252 }
2253 match oc {
2254 CompiledOnConflict::DoNothing { .. } => false,
2255 CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2256 }
2257}
2258
2259fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2260 if !ts.indices.is_empty() {
2261 return false;
2262 }
2263 if !ts.foreign_keys.is_empty() {
2264 return false;
2265 }
2266 if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2267 return false;
2268 }
2269 let pk = ts.pk_indices();
2270 !assignments.iter().any(|(ci, _)| pk.contains(ci))
2271}
2272
2273#[allow(clippy::too_many_arguments)]
2274#[inline]
2275fn apply_do_update_fused(
2276 wtx: &mut WriteTxn<'_>,
2277 table_schema: &TableSchema,
2278 table_bytes: &[u8],
2279 key_buf: &[u8],
2280 value_buf: &[u8],
2281 proposed_row: &[Value],
2282 assignments: &[(usize, Expr)],
2283 where_clause: Option<&Expr>,
2284 col_map: &ColumnMap,
2285 fast_paths: Option<&[DoUpdateFastPath]>,
2286 capture_returning: bool,
2287) -> Result<InsertRowOutcome> {
2288 let non_pk = table_schema.non_pk_indices();
2289 let enc_pos = table_schema.encoding_positions();
2290 let phys_count = table_schema.physical_non_pk_count();
2291 let dropped = table_schema.dropped_non_pk_slots();
2292 let has_checks = table_schema.has_checks();
2293 let has_fks = !table_schema.foreign_keys.is_empty();
2294
2295 let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2296 std::cell::RefCell::new(None);
2297
2298 let outcome =
2299 wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2300 if let Some(fps) = fast_paths {
2301 if !has_checks {
2302 let action = apply_fast_path_patch(old_bytes, fps)?;
2303 if capture_returning {
2304 if let UpsertAction::Replace(ref new_bytes) = action {
2305 let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2306 let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2307 *captured.borrow_mut() = Some((old_row, new_row));
2308 }
2309 }
2310 return Ok(action);
2311 }
2312 }
2313 UPSERT_SCRATCH.with(|slot| {
2314 let mut bufs = slot.borrow_mut();
2315 let UpsertBufs {
2316 old_row,
2317 new_row,
2318 value_values,
2319 new_value_buf,
2320 } = &mut *bufs;
2321
2322 old_row.clear();
2323 old_row.resize(table_schema.columns.len(), Value::Null);
2324 decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2325
2326 if let Some(w) = where_clause {
2327 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2328 let result = eval_expr(w, &ctx)?;
2329 if result.is_null() || !is_truthy(&result) {
2330 return Ok(UpsertAction::Skip);
2331 }
2332 }
2333
2334 new_row.clear();
2335 new_row.extend_from_slice(old_row);
2336 for (col_idx, expr) in assignments {
2337 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2338 let val = eval_expr(expr, &ctx)?;
2339 let col = &table_schema.columns[*col_idx];
2340 new_row[*col_idx] = if val.is_null() {
2341 Value::Null
2342 } else {
2343 let got = val.data_type();
2344 val.coerce_into(col.data_type)
2345 .ok_or_else(|| SqlError::TypeMismatch {
2346 expected: col.data_type.to_string(),
2347 got: got.to_string(),
2348 })?
2349 };
2350 }
2351
2352 for (assigned_idx, _) in assignments {
2353 let col = &table_schema.columns[*assigned_idx];
2354 if !col.nullable && new_row[col.position as usize].is_null() {
2355 return Err(SqlError::NotNullViolation(col.name.clone()));
2356 }
2357 }
2358 if has_checks {
2359 for col in &table_schema.columns {
2360 if let Some(ref check) = col.check_expr {
2361 let ctx = EvalCtx::new(col_map, new_row);
2362 let result = eval_expr(check, &ctx)?;
2363 if !is_truthy(&result) && !result.is_null() {
2364 let name = col.check_name.as_deref().unwrap_or(&col.name);
2365 return Err(SqlError::CheckViolation(name.to_string()));
2366 }
2367 }
2368 }
2369 for tc in &table_schema.check_constraints {
2370 let ctx = EvalCtx::new(col_map, new_row);
2371 let result = eval_expr(&tc.expr, &ctx)?;
2372 if !is_truthy(&result) && !result.is_null() {
2373 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2374 return Err(SqlError::CheckViolation(name.to_string()));
2375 }
2376 }
2377 }
2378 let _ = has_fks;
2379
2380 value_values.clear();
2381 value_values.resize(phys_count, Value::Null);
2382 for &slot in dropped {
2383 value_values[slot as usize] = Value::Null;
2384 }
2385 for (j, &i) in non_pk.iter().enumerate() {
2386 value_values[enc_pos[j] as usize] = new_row[i].clone();
2387 }
2388 new_value_buf.clear();
2389 crate::encoding::encode_row_into(value_values, new_value_buf);
2390
2391 if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
2392 return Err(SqlError::RowTooLarge {
2393 size: new_value_buf.len(),
2394 max: citadel_core::MAX_VALUE_SIZE,
2395 });
2396 }
2397
2398 if capture_returning {
2399 *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2400 }
2401 Ok(UpsertAction::Replace(new_value_buf.clone()))
2402 })
2403 })?;
2404
2405 match outcome {
2406 UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2407 UpsertOutcome::Updated => {
2408 if capture_returning {
2409 let (old, new) = captured.into_inner().ok_or_else(|| {
2410 SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2411 })?;
2412 Ok(InsertRowOutcome::Updated { old, new })
2413 } else {
2414 Ok(InsertRowOutcome::Inserted)
2415 }
2416 }
2417 UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2418 }
2419}
2420
2421fn fetch_unique_index_pk(
2422 wtx: &mut WriteTxn<'_>,
2423 table_schema: &TableSchema,
2424 index_idx: usize,
2425 row: &[Value],
2426) -> Result<Vec<u8>> {
2427 let idx = &table_schema.indices[index_idx];
2428 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2429 let indexed: Vec<Value> = idx
2430 .columns
2431 .iter()
2432 .map(|&col_idx| row[col_idx as usize].clone())
2433 .collect();
2434 let key = crate::encoding::encode_composite_key(&indexed);
2435 let value = wtx
2436 .table_get(&idx_table, &key)
2437 .map_err(SqlError::Storage)?
2438 .ok_or_else(|| {
2439 SqlError::InvalidValue("unique index missing expected collision entry".into())
2440 })?;
2441 Ok(value)
2442}
2443
2444#[allow(clippy::too_many_arguments)]
2445fn apply_do_update(
2446 wtx: &mut WriteTxn<'_>,
2447 table_schema: &TableSchema,
2448 pk_key: &[u8],
2449 proposed_row: &[Value],
2450 assignments: &[(usize, Expr)],
2451 where_clause: Option<&Expr>,
2452 col_map: &ColumnMap,
2453 capture_returning: bool,
2454) -> Result<InsertRowOutcome> {
2455 let old_value = wtx
2456 .table_get(table_schema.name.as_bytes(), pk_key)
2457 .map_err(SqlError::Storage)?
2458 .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2459 let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2460 apply_do_update_with_old_row(
2461 wtx,
2462 table_schema,
2463 pk_key,
2464 &old_row,
2465 proposed_row,
2466 assignments,
2467 where_clause,
2468 col_map,
2469 capture_returning,
2470 )
2471}
2472
/// Applies the `DO UPDATE` action of an upsert to an existing, already-decoded row.
///
/// Steps, in order:
/// 1. Evaluate the optional `where_clause` against `old_row` (with `proposed_row` passed as the
///    second row of `EvalCtx::with_excluded`, presumably the `excluded` pseudo-row); a NULL or
///    non-truthy result returns `InsertRowOutcome::Skipped` without writing anything.
/// 2. Build the new row by evaluating every assignment against the *old* row and coercing the
///    result to the target column type.
/// 3. Recompute every stored generated column from the new row.
/// 4. Enforce NOT NULL on the assigned columns, all CHECK constraints, and every foreign key
///    whose columns changed.
/// 5. Encode the non-PK values and write the row — insert-then-delete when the primary key
///    changed, in-place update otherwise — and maintain the table's indices.
///
/// # Returns
/// `InsertRowOutcome::Updated { old, new }` when `capture_returning` is set, otherwise
/// `InsertRowOutcome::Inserted`; `InsertRowOutcome::Skipped` when the WHERE clause rejects the
/// row.
///
/// # Errors
/// `TypeMismatch`, `NotNullViolation`, `CheckViolation`, `ForeignKeyViolation`, `RowTooLarge`,
/// `DuplicateKey` (new primary key already present), `UniqueViolation`, `Storage`, plus any
/// expression-evaluation error.
///
/// # Panics
/// Panics if a column marked as a stored generated column has no `generated_expr`.
#[allow(clippy::too_many_arguments)]
fn apply_do_update_with_old_row(
    wtx: &mut WriteTxn<'_>,
    table_schema: &TableSchema,
    old_pk_key: &[u8],
    old_row: &[Value],
    proposed_row: &[Value],
    assignments: &[(usize, Expr)],
    where_clause: Option<&Expr>,
    col_map: &ColumnMap,
    capture_returning: bool,
) -> Result<InsertRowOutcome> {
    // DO UPDATE ... WHERE: NULL and false both mean "leave the row alone".
    if let Some(w) = where_clause {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let result = eval_expr(w, &ctx)?;
        if result.is_null() || !is_truthy(&result) {
            return Ok(InsertRowOutcome::Skipped);
        }
    }

    // Every assignment reads the old row, so assignments do not observe each other.
    let mut new_row = old_row.to_vec();
    for (col_idx, expr) in assignments {
        let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
        let val = eval_expr(expr, &ctx)?;
        let col = &table_schema.columns[*col_idx];
        new_row[*col_idx] = if val.is_null() {
            Value::Null
        } else {
            let got = val.data_type();
            val.coerce_into(col.data_type)
                .ok_or_else(|| SqlError::TypeMismatch {
                    expected: col.data_type.to_string(),
                    got: got.to_string(),
                })?
        };
    }

    // Stored generated columns are recomputed from the updated row.
    for col in &table_schema.columns {
        if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Stored)
        ) {
            let val = eval_expr(
                col.generated_expr.as_ref().unwrap(),
                &EvalCtx::new(col_map, &new_row),
            )?;
            let pos = col.position as usize;
            new_row[pos] = if val.is_null() {
                if !col.nullable {
                    return Err(SqlError::NotNullViolation(col.name.clone()));
                }
                Value::Null
            } else {
                let got = val.data_type();
                val.coerce_into(col.data_type)
                    .ok_or_else(|| SqlError::TypeMismatch {
                        expected: col.data_type.to_string(),
                        got: got.to_string(),
                    })?
            };
        }
    }

    // The PK counts as changed only if a PK column was assigned AND its value actually differs.
    let pk_indices = table_schema.pk_indices();
    let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
    let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);

    // NOT NULL is only re-checked on the columns that were assigned.
    for (assigned_idx, _) in assignments {
        let col = &table_schema.columns[*assigned_idx];
        if !col.nullable && new_row[col.position as usize].is_null() {
            return Err(SqlError::NotNullViolation(col.name.clone()));
        }
    }
    if table_schema.has_checks() {
        // A CHECK fails only on a definite false; a NULL result passes.
        for col in &table_schema.columns {
            if let Some(ref check) = col.check_expr {
                let ctx = EvalCtx::new(col_map, &new_row);
                let result = eval_expr(check, &ctx)?;
                if !is_truthy(&result) && !result.is_null() {
                    let name = col.check_name.as_deref().unwrap_or(&col.name);
                    return Err(SqlError::CheckViolation(name.to_string()));
                }
            }
        }
        for tc in &table_schema.check_constraints {
            let ctx = EvalCtx::new(col_map, &new_row);
            let result = eval_expr(&tc.expr, &ctx)?;
            if !is_truthy(&result) && !result.is_null() {
                let name = tc.name.as_deref().unwrap_or(&tc.sql);
                return Err(SqlError::CheckViolation(name.to_string()));
            }
        }
    }
    // Foreign keys: only those whose columns changed are re-validated, and a key containing any
    // NULL is not checked. The referenced row is looked up by the encoded FK values as a key of
    // the foreign table.
    for fk in &table_schema.foreign_keys {
        let changed = fk
            .columns
            .iter()
            .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
        if !changed {
            continue;
        }
        let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
        if any_null {
            continue;
        }
        let fk_vals: Vec<Value> = fk
            .columns
            .iter()
            .map(|&ci| new_row[ci as usize].clone())
            .collect();
        let fk_key = crate::encoding::encode_composite_key(&fk_vals);
        let found = wtx
            .table_get(fk.foreign_table.as_bytes(), &fk_key)
            .map_err(SqlError::Storage)?;
        if found.is_none() {
            let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
            return Err(SqlError::ForeignKeyViolation(name.to_string()));
        }
    }

    // PK value vectors are only needed for index maintenance or for re-keying the row.
    let has_indices = !table_schema.indices.is_empty();
    let old_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| old_row[i].clone()).collect()
    } else {
        Vec::new()
    };
    let new_pk_values: Vec<Value> = if has_indices || pk_changed {
        pk_indices.iter().map(|&i| new_row[i].clone()).collect()
    } else {
        Vec::new()
    };

    // Lay the non-PK values out in their physical encoding slots. Dropped slots and virtual
    // generated columns are written as NULL.
    let non_pk = table_schema.non_pk_indices();
    let enc_pos = table_schema.encoding_positions();
    let phys_count = table_schema.physical_non_pk_count();
    let dropped = table_schema.dropped_non_pk_slots();
    let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
    for &slot in dropped {
        value_values[slot as usize] = Value::Null;
    }
    for (j, &i) in non_pk.iter().enumerate() {
        let col = &table_schema.columns[i];
        value_values[enc_pos[j] as usize] = if matches!(
            col.generated_kind,
            Some(crate::parser::GeneratedKind::Virtual)
        ) {
            Value::Null
        } else {
            new_row[i].clone()
        };
    }
    let mut new_value_buf = Vec::with_capacity(256);
    crate::encoding::encode_row_into(&value_values, &mut new_value_buf);

    if new_value_buf.len() > citadel_core::MAX_VALUE_SIZE {
        return Err(SqlError::RowTooLarge {
            size: new_value_buf.len(),
            max: citadel_core::MAX_VALUE_SIZE,
        });
    }

    if pk_changed {
        // Re-key the row: insert under the new key first so a collision is reported before the
        // old row is removed.
        let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
        let inserted = wtx
            .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
            .map_err(SqlError::Storage)?;
        if !inserted {
            return Err(SqlError::DuplicateKey);
        }
        wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
            .map_err(SqlError::Storage)?;
        // Every index entry is rewritten for the new key.
        // NOTE(review): unlike the branch below, this loop does not consult
        // `partial_idx_update_actions` — confirm partial indexes are handled correctly when the
        // primary key changes.
        for idx in &table_schema.indices {
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
            wtx.table_delete(&idx_table, &old_idx_key)
                .map_err(SqlError::Storage)?;
            let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
            let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
            let is_new = wtx
                .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                .map_err(SqlError::Storage)?;
            // A unique-index collision is tolerated when any indexed column is NULL.
            if idx.unique && !is_new {
                let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
                if !any_null {
                    return Err(SqlError::UniqueViolation(idx.name.clone()));
                }
            }
        }
    } else {
        // Same key: overwrite the value in place.
        wtx.table_update_sorted(
            table_schema.name.as_bytes(),
            &[(old_pk_key, new_value_buf.as_slice())],
        )
        .map_err(SqlError::Storage)?;
        // The column map is built only when `any_partial_index` reports true for this table.
        let col_map_partial =
            any_partial_index(table_schema).then(|| ColumnMap::new(&table_schema.columns));
        for idx in &table_schema.indices {
            let cols_changed = index_columns_changed(idx, old_row, &new_row);
            // `del` / `ins` say whether the old entry must be removed and a new one added.
            let (del, ins) = partial_idx_update_actions(
                idx,
                old_row,
                &new_row,
                cols_changed,
                false,
                col_map_partial.as_ref(),
            );
            let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
            if del {
                let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
                wtx.table_delete(&idx_table, &old_idx_key)
                    .map_err(SqlError::Storage)?;
            }
            if ins {
                let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
                let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
                let is_new = wtx
                    .table_insert(&idx_table, &new_idx_key, &new_idx_val)
                    .map_err(SqlError::Storage)?;
                // A unique-index collision is tolerated when any indexed column is NULL.
                if idx.unique && !is_new {
                    let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
                    if !any_null {
                        return Err(SqlError::UniqueViolation(idx.name.clone()));
                    }
                }
            }
        }
    }

    // Without RETURNING the update is reported as `Inserted`.
    if capture_returning {
        Ok(InsertRowOutcome::Updated {
            old: old_row.to_vec(),
            new: new_row,
        })
    } else {
        Ok(InsertRowOutcome::Inserted)
    }
}
2710
2711fn detect_fast_paths(
2712 ts: &TableSchema,
2713 assignments: &[(usize, Expr)],
2714) -> Option<Vec<DoUpdateFastPath>> {
2715 let non_pk = ts.non_pk_indices();
2716 let enc_pos = ts.encoding_positions();
2717 let mut out = Vec::with_capacity(assignments.len());
2718 for (col_idx, expr) in assignments {
2719 let col = &ts.columns[*col_idx];
2720 if col.data_type != DataType::Integer {
2721 return None;
2722 }
2723 let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
2724 let phys_idx = enc_pos[nonpk_order] as usize;
2725
2726 if let Expr::BinaryOp { left, op, right } = expr {
2727 if !matches!(op, BinOp::Add | BinOp::Sub) {
2728 return None;
2729 }
2730 let reads_target =
2731 matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
2732 if !reads_target {
2733 return None;
2734 }
2735 if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
2736 let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
2737 let _ = col_idx;
2738 out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
2739 continue;
2740 }
2741 return None;
2742 }
2743 return None;
2744 }
2745 Some(out)
2746}
2747
2748fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
2749 let target = oc
2750 .target
2751 .as_ref()
2752 .map(|t| resolve_conflict_target(t, ts))
2753 .transpose()?;
2754 match &oc.action {
2755 OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
2756 OnConflictAction::DoUpdate {
2757 assignments,
2758 where_clause,
2759 } => {
2760 let target = target.ok_or_else(|| {
2761 SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
2762 })?;
2763 let compiled_assignments: Vec<(usize, Expr)> = assignments
2764 .iter()
2765 .map(|(name, expr)| {
2766 let col_idx = ts
2767 .column_index(name)
2768 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
2769 Ok((col_idx, expr.clone()))
2770 })
2771 .collect::<Result<_>>()?;
2772 let fast_paths = if where_clause.is_none() {
2773 detect_fast_paths(ts, &compiled_assignments)
2774 } else {
2775 None
2776 };
2777 Ok(CompiledOnConflict::DoUpdate {
2778 target,
2779 assignments: compiled_assignments,
2780 where_clause: where_clause.clone(),
2781 fast_paths,
2782 })
2783 }
2784 }
2785}
2786
2787fn exec_insert_trivial_fast(
2789 wtx: &mut WriteTxn<'_>,
2790 table_lower: &str,
2791 cache: &InsertCache,
2792 bufs: &mut InsertBufs,
2793 params: &[Value],
2794) -> Result<ExecutionResult> {
2795 let prog = cache
2796 .trivial_fast_program
2797 .as_ref()
2798 .expect("trivial fast: program");
2799
2800 for &p in &prog.not_null_param_indices {
2801 if params[p as usize].is_null() {
2802 return Err(SqlError::NotNullViolation(format!("param@{p}")));
2803 }
2804 }
2805
2806 match ¶ms[prog.pk_param as usize] {
2807 Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
2808 _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
2809 }
2810
2811 bufs.value_buf.clear();
2812 bufs.value_buf.extend_from_slice(&prog.template);
2813
2814 for op in &prog.ops {
2815 match op {
2816 WriteOp::ParamI64 { param_idx, off } => match ¶ms[*param_idx as usize] {
2817 Value::Integer(v) => {
2818 let off = *off as usize;
2819 bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
2820 }
2821 other => {
2822 return Err(SqlError::TypeMismatch {
2823 expected: "Integer".into(),
2824 got: other.data_type().to_string(),
2825 });
2826 }
2827 },
2828 WriteOp::LiteralI64 { value, off } => {
2829 let off = *off as usize;
2830 bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
2831 }
2832 WriteOp::GenAddParamsI64 {
2833 a_param,
2834 b_param,
2835 off,
2836 bitmap_byte_off,
2837 bitmap_bit_mask,
2838 } => match (¶ms[*a_param as usize], ¶ms[*b_param as usize]) {
2839 (Value::Integer(a), Value::Integer(b)) => {
2840 let off = *off as usize;
2841 bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
2842 }
2843 _ => {
2844 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2845 }
2846 },
2847 WriteOp::GenMulAddParamI64 {
2848 param_idx,
2849 mul,
2850 add,
2851 off,
2852 bitmap_byte_off,
2853 bitmap_bit_mask,
2854 } => match ¶ms[*param_idx as usize] {
2855 Value::Integer(v) => {
2856 let r = v.wrapping_mul(*mul).wrapping_add(*add);
2857 let off = *off as usize;
2858 bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
2859 }
2860 _ => {
2861 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2862 }
2863 },
2864 }
2865 }
2866
2867 let is_new = wtx
2868 .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
2869 .map_err(SqlError::Storage)?;
2870 if !is_new {
2871 return Err(SqlError::DuplicateKey);
2872 }
2873 Ok(ExecutionResult::RowsAffected(1))
2874}
2875
2876fn build_bind_plan(
2877 stmt: &InsertStmt,
2878 col_indices: &[usize],
2879 col_data_types: &[DataType],
2880) -> Option<Vec<BindAction>> {
2881 let rows = match &stmt.source {
2882 InsertSource::Values(rows) => rows,
2883 _ => return None,
2884 };
2885 if rows.len() != 1 {
2886 return None;
2887 }
2888 let value_row = &rows[0];
2889 if value_row.len() != col_indices.len() {
2890 return None;
2891 }
2892 let mut plan = Vec::with_capacity(value_row.len());
2893 for (i, expr) in value_row.iter().enumerate() {
2894 let col_idx = col_indices[i];
2895 let target = col_data_types[col_idx];
2896 match expr {
2897 Expr::Parameter(n) => {
2898 if *n == 0 {
2899 return None;
2900 }
2901 plan.push(BindAction::Param {
2902 param_idx: n - 1,
2903 col_idx,
2904 target,
2905 });
2906 }
2907 Expr::Literal(v) => plan.push(BindAction::Literal {
2908 value: v.clone(),
2909 col_idx,
2910 }),
2911 _ => return None,
2912 }
2913 }
2914 Some(plan)
2915}
2916
2917impl CompiledInsert {
2918 pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
2919 let lower = stmt.table.to_ascii_lowercase();
2920 let cached = if let Some(ts) = schema.get(&lower) {
2921 let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
2922 ts.columns.iter().map(|c| c.name.as_str()).collect()
2923 } else {
2924 stmt.columns.iter().map(|s| s.as_str()).collect()
2925 };
2926 let mut col_indices = Vec::with_capacity(insert_columns.len());
2927 for name in &insert_columns {
2928 col_indices.push(ts.column_index(name)?);
2929 }
2930 if col_indices
2931 .iter()
2932 .any(|&ci| ts.columns[ci].generated_kind.is_some())
2933 {
2934 return None;
2935 }
2936 let on_conflict = stmt
2937 .on_conflict
2938 .as_ref()
2939 .map(|oc| compile_on_conflict(oc, ts))
2940 .transpose()
2941 .ok()
2942 .flatten()
2943 .map(Arc::new);
2944 let generated_col_positions: Vec<usize> = ts
2945 .columns
2946 .iter()
2947 .enumerate()
2948 .filter_map(|(i, c)| {
2949 matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
2950 .then_some(i)
2951 })
2952 .collect();
2953 let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
2954 .iter()
2955 .map(|&pos| {
2956 detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
2957 })
2958 .collect();
2959 let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
2960 Some(ColumnMap::new(&ts.columns))
2961 } else {
2962 None
2963 };
2964 let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
2965 let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
2966 let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
2967 let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
2968 let phys_count = ts.physical_non_pk_count();
2969 let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
2970 let single_int_pk =
2971 pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
2972 let not_null_indices: Vec<u16> = ts
2973 .columns
2974 .iter()
2975 .filter(|c| !c.nullable)
2976 .map(|c| c.position)
2977 .collect();
2978 let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
2979 let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
2980 let row_fully_overwritten = if any_defaults_flag {
2981 false
2982 } else {
2983 let mut covered: rustc_hash::FxHashSet<usize> =
2984 col_indices.iter().copied().collect();
2985 covered.extend(generated_col_positions.iter().copied());
2986 for (j, &i) in non_pk_indices.iter().enumerate() {
2987 let _ = j;
2988 if matches!(
2989 ts.columns[i].generated_kind,
2990 Some(crate::parser::GeneratedKind::Virtual)
2991 ) {
2992 covered.insert(i);
2993 }
2994 }
2995 bind_plan.is_some() && covered.len() == ts.columns.len()
2996 };
2997 let has_fks = !ts.foreign_keys.is_empty();
2998 let has_indices = !ts.indices.is_empty();
2999 let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
3000 let mut null_value_slots: Vec<usize> =
3001 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3002 for (j, &i) in non_pk_indices.iter().enumerate() {
3003 let slot = encoding_positions[j] as usize;
3004 if matches!(
3005 ts.columns[i].generated_kind,
3006 Some(crate::parser::GeneratedKind::Virtual)
3007 ) {
3008 null_value_slots.push(slot);
3009 } else {
3010 non_virtual_pairs.push((i, slot));
3011 }
3012 }
3013 let row_encoder = {
3014 let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
3015 let col = &ts.columns[i];
3016 if matches!(
3017 col.generated_kind,
3018 Some(crate::parser::GeneratedKind::Virtual)
3019 ) {
3020 true
3021 } else {
3022 col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
3023 }
3024 });
3025 if all_int_or_null {
3026 let mut null_slots: Vec<usize> =
3027 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
3028 for (j, &i) in non_pk_indices.iter().enumerate() {
3029 if matches!(
3030 ts.columns[i].generated_kind,
3031 Some(crate::parser::GeneratedKind::Virtual)
3032 ) {
3033 null_slots.push(encoding_positions[j] as usize);
3034 }
3035 }
3036 Some(crate::encoding::build_int_row_template(
3037 phys_count,
3038 &null_slots,
3039 ))
3040 } else {
3041 None
3042 }
3043 };
3044 let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3045 && !ts.columns.iter().any(|c| c.default_expr.is_some())
3046 && !ts.has_checks()
3047 && !has_fks
3048 && !has_indices
3049 && stmt.on_conflict.is_none()
3050 && stmt.returning.is_none()
3051 && bind_plan.is_some()
3052 && row_encoder.is_some()
3053 && row_fully_overwritten
3054 && single_int_pk
3055 && generated_fast_evals
3056 .iter()
3057 .all(|fe| !matches!(fe, FastGenEval::None));
3058 let trivial_fast_program = if is_trivial_fast_eligible {
3059 build_trivial_fast_program(
3060 bind_plan.as_ref().unwrap(),
3061 row_encoder.as_ref().unwrap(),
3062 &non_virtual_pairs,
3063 &generated_col_positions,
3064 &generated_fast_evals,
3065 &pk_indices,
3066 &ts.columns,
3067 )
3068 } else {
3069 None
3070 };
3071 let is_trivial_fast = trivial_fast_program.is_some();
3072 Some(InsertCache {
3073 col_indices,
3074 has_subquery: insert_has_subquery(stmt),
3075 any_defaults: ts.columns.iter().any(|c| c.default_expr.is_some()),
3076 has_checks: ts.has_checks(),
3077 on_conflict,
3078 row_col_map,
3079 generated_col_positions,
3080 generated_fast_evals,
3081 pk_indices,
3082 non_pk_indices,
3083 encoding_positions,
3084 dropped_non_pk_slots,
3085 phys_count,
3086 single_int_pk,
3087 not_null_indices,
3088 bind_plan,
3089 row_fully_overwritten,
3090 row_encoder,
3091 is_trivial_fast,
3092 trivial_fast_program,
3093 })
3094 } else if schema.get_view(&lower).is_some() {
3095 None
3096 } else {
3097 return None;
3098 };
3099 Some(Self {
3100 table_lower: lower,
3101 cached,
3102 })
3103 }
3104}
3105
3106impl CompiledPlan for CompiledInsert {
3107 fn execute(
3108 &self,
3109 db: &Database,
3110 schema: &SchemaManager,
3111 stmt: &Statement,
3112 params: &[Value],
3113 wtx: Option<&mut WriteTxn<'_>>,
3114 ) -> Result<ExecutionResult> {
3115 let ins = match stmt {
3116 Statement::Insert(i) => i,
3117 _ => {
3118 return Err(SqlError::Unsupported(
3119 "CompiledInsert received non-INSERT statement".into(),
3120 ))
3121 }
3122 };
3123 match wtx {
3124 None => exec_insert(db, schema, ins, params),
3125 Some(outer) => match self.cached.as_ref() {
3126 Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3127 exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3128 }),
3129 Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3130 None => exec_insert_in_txn(outer, schema, ins, params),
3131 },
3132 }
3133 }
3134
3135 fn uses_scoped_params(&self) -> bool {
3136 match self.cached.as_ref() {
3137 Some(c) => !c.is_trivial_fast,
3138 None => true,
3139 }
3140 }
3141}
3142
3143pub struct CompiledDelete {
3144 table_lower: String,
3145}
3146
3147impl CompiledDelete {
3148 pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3149 let lower = stmt.table.to_ascii_lowercase();
3150 schema.get(&lower)?;
3151 Some(Self { table_lower: lower })
3152 }
3153}
3154
3155impl CompiledPlan for CompiledDelete {
3156 fn execute(
3157 &self,
3158 db: &Database,
3159 schema: &SchemaManager,
3160 stmt: &Statement,
3161 _params: &[Value],
3162 wtx: Option<&mut WriteTxn<'_>>,
3163 ) -> Result<ExecutionResult> {
3164 let del = match stmt {
3165 Statement::Delete(d) => d,
3166 _ => {
3167 return Err(SqlError::Unsupported(
3168 "CompiledDelete received non-DELETE statement".into(),
3169 ))
3170 }
3171 };
3172 let _ = &self.table_lower;
3173 match wtx {
3174 None => super::write::exec_delete(db, schema, del),
3175 Some(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3176 }
3177 }
3178}