1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::write_txn::WriteTxn;
7use rustc_hash::FxHashMap;
8
9use crate::encoding::{encode_composite_key_into, encode_row_into};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
12use crate::parser::*;
13use crate::types::*;
14
15use crate::schema::SchemaManager;
16
17use super::compile::CompiledPlan;
18use super::helpers::*;
19use super::CteContext;
20
21pub(super) fn exec_insert(
22 db: &Database,
23 schema: &SchemaManager,
24 stmt: &InsertStmt,
25 params: &[Value],
26) -> Result<ExecutionResult> {
27 let empty_ctes = CteContext::default();
28 let materialized;
29 let stmt = if insert_has_subquery(stmt) {
30 materialized = materialize_insert(stmt, &mut |sub| {
31 exec_subquery_read(db, schema, sub, &empty_ctes)
32 })?;
33 &materialized
34 } else {
35 stmt
36 };
37
38 let lower_name = stmt.table.to_ascii_lowercase();
39 if schema.get_view(&lower_name).is_some() {
40 return Err(SqlError::CannotModifyView(stmt.table.clone()));
41 }
42 let table_schema = schema
43 .get(&lower_name)
44 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
45
46 let insert_columns = if stmt.columns.is_empty() {
47 table_schema
48 .columns
49 .iter()
50 .map(|c| c.name.clone())
51 .collect::<Vec<_>>()
52 } else {
53 stmt.columns
54 .iter()
55 .map(|c| c.to_ascii_lowercase())
56 .collect()
57 };
58
59 let col_indices: Vec<usize> = insert_columns
60 .iter()
61 .map(|name| {
62 table_schema
63 .column_index(name)
64 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
65 })
66 .collect::<Result<_>>()?;
67
68 for &ci in &col_indices {
69 if table_schema.columns[ci].generated_kind.is_some() {
70 return Err(SqlError::CannotInsertIntoGeneratedColumn(
71 table_schema.columns[ci].name.clone(),
72 ));
73 }
74 }
75
76 let defaults: Vec<(usize, &Expr)> = table_schema
77 .columns
78 .iter()
79 .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
80 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
81 .collect();
82
83 let generated_cols: Vec<(usize, &Expr)> = table_schema
84 .columns
85 .iter()
86 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
87 .map(|c| (c.position as usize, c.generated_expr.as_ref().unwrap()))
88 .collect();
89
90 let has_checks = table_schema.has_checks();
91 let row_col_map_for_gen = if !generated_cols.is_empty() {
92 Some(ColumnMap::new(&table_schema.columns))
93 } else {
94 None
95 };
96 let check_col_map = if has_checks {
97 Some(ColumnMap::new(&table_schema.columns))
98 } else {
99 None
100 };
101
102 let select_rows = match &stmt.source {
103 InsertSource::Select(sq) => {
104 let insert_ctes =
105 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
106 exec_query_body_read(db, schema, body, ctx)
107 })?;
108 let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
109 Some(qr.rows)
110 }
111 InsertSource::Values(_) => None,
112 };
113
114 let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
115 .on_conflict
116 .as_ref()
117 .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
118 .transpose()?;
119
120 let row_col_map = compiled_conflict
121 .as_ref()
122 .map(|_| ColumnMap::new(&table_schema.columns));
123
124 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
125 let mut count: u64 = 0;
126 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
127 stmt.returning.as_ref().map(|_| Vec::new());
128
129 let pk_indices = table_schema.pk_indices();
130 let non_pk = table_schema.non_pk_indices();
131 let enc_pos = table_schema.encoding_positions();
132 let phys_count = table_schema.physical_non_pk_count();
133 let mut row = vec![Value::Null; table_schema.columns.len()];
134 let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
135 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
136 let mut key_buf: Vec<u8> = Vec::with_capacity(64);
137 let mut value_buf: Vec<u8> = Vec::with_capacity(256);
138 let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
139
140 let values = match &stmt.source {
141 InsertSource::Values(rows) => Some(rows.as_slice()),
142 InsertSource::Select(_) => None,
143 };
144 let sel_rows = select_rows.as_deref();
145
146 let total = match (values, sel_rows) {
147 (Some(rows), _) => rows.len(),
148 (_, Some(rows)) => rows.len(),
149 _ => 0,
150 };
151
152 if let Some(sel) = sel_rows {
153 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
154 return Err(SqlError::InvalidValue(format!(
155 "INSERT ... SELECT column count mismatch: expected {}, got {}",
156 insert_columns.len(),
157 sel[0].len()
158 )));
159 }
160 }
161
162 for idx in 0..total {
163 for v in row.iter_mut() {
164 *v = Value::Null;
165 }
166
167 if let Some(value_rows) = values {
168 let value_row = &value_rows[idx];
169 if value_row.len() != insert_columns.len() {
170 return Err(SqlError::InvalidValue(format!(
171 "expected {} values, got {}",
172 insert_columns.len(),
173 value_row.len()
174 )));
175 }
176 for (i, expr) in value_row.iter().enumerate() {
177 let val = if let Expr::Parameter(n) = expr {
178 params
179 .get(n - 1)
180 .cloned()
181 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
182 } else {
183 eval_const_expr(expr)?
184 };
185 let col_idx = col_indices[i];
186 let col = &table_schema.columns[col_idx];
187 let got_type = val.data_type();
188 row[col_idx] = if val.is_null() {
189 Value::Null
190 } else {
191 val.coerce_into(col.data_type)
192 .ok_or_else(|| SqlError::TypeMismatch {
193 expected: col.data_type.to_string(),
194 got: got_type.to_string(),
195 })?
196 };
197 }
198 } else if let Some(sel) = sel_rows {
199 let sel_row = &sel[idx];
200 for (i, val) in sel_row.iter().enumerate() {
201 let col_idx = col_indices[i];
202 let col = &table_schema.columns[col_idx];
203 let got_type = val.data_type();
204 row[col_idx] = if val.is_null() {
205 Value::Null
206 } else {
207 val.clone().coerce_into(col.data_type).ok_or_else(|| {
208 SqlError::TypeMismatch {
209 expected: col.data_type.to_string(),
210 got: got_type.to_string(),
211 }
212 })?
213 };
214 }
215 }
216
217 for &(pos, def_expr) in &defaults {
218 let val = eval_const_expr(def_expr)?;
219 let col = &table_schema.columns[pos];
220 if !val.is_null() {
221 let got_type = val.data_type();
222 row[pos] =
223 val.coerce_into(col.data_type)
224 .ok_or_else(|| SqlError::TypeMismatch {
225 expected: col.data_type.to_string(),
226 got: got_type.to_string(),
227 })?;
228 }
229 }
230
231 if let Some(ref gen_map) = row_col_map_for_gen {
232 for &(pos, gen_expr) in &generated_cols {
233 let val = eval_expr(gen_expr, &EvalCtx::new(gen_map, &row))?;
234 let col = &table_schema.columns[pos];
235 row[pos] = if val.is_null() {
236 Value::Null
237 } else {
238 let got_type = val.data_type();
239 val.coerce_into(col.data_type)
240 .ok_or_else(|| SqlError::TypeMismatch {
241 expected: col.data_type.to_string(),
242 got: got_type.to_string(),
243 })?
244 };
245 }
246 }
247
248 for col in &table_schema.columns {
249 if !col.nullable && row[col.position as usize].is_null() {
250 return Err(SqlError::NotNullViolation(col.name.clone()));
251 }
252 }
253
254 if let Some(ref col_map) = check_col_map {
255 for col in &table_schema.columns {
256 if let Some(ref check) = col.check_expr {
257 let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
258 if !is_truthy(&result) && !result.is_null() {
259 let name = col.check_name.as_deref().unwrap_or(&col.name);
260 return Err(SqlError::CheckViolation(name.to_string()));
261 }
262 }
263 }
264 for tc in &table_schema.check_constraints {
265 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
266 if !is_truthy(&result) && !result.is_null() {
267 let name = tc.name.as_deref().unwrap_or(&tc.sql);
268 return Err(SqlError::CheckViolation(name.to_string()));
269 }
270 }
271 }
272
273 for fk in &table_schema.foreign_keys {
274 let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
275 if any_null {
276 continue; }
278 let fk_vals: Vec<Value> = fk
279 .columns
280 .iter()
281 .map(|&ci| row[ci as usize].clone())
282 .collect();
283 fk_key_buf.clear();
284 encode_composite_key_into(&fk_vals, &mut fk_key_buf);
285 let found = wtx
286 .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
287 .map_err(SqlError::Storage)?;
288 if found.is_none() {
289 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
290 return Err(SqlError::ForeignKeyViolation(name.to_string()));
291 }
292 }
293
294 let proposed_row_for_returning: Option<Vec<Value>> =
295 returning_rows.as_ref().map(|_| row.clone());
296
297 for (j, &i) in pk_indices.iter().enumerate() {
298 pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
299 }
300 encode_composite_key_into(&pk_values, &mut key_buf);
301
302 for (j, &i) in non_pk.iter().enumerate() {
303 let col = &table_schema.columns[i];
304 if matches!(
305 col.generated_kind,
306 Some(crate::parser::GeneratedKind::Virtual)
307 ) {
308 value_values[enc_pos[j] as usize] = Value::Null;
309 row[i] = Value::Null;
310 } else {
311 value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
312 }
313 }
314 encode_row_into(&value_values, &mut value_buf);
315
316 if key_buf.len() > citadel_core::MAX_KEY_SIZE {
317 return Err(SqlError::KeyTooLarge {
318 size: key_buf.len(),
319 max: citadel_core::MAX_KEY_SIZE,
320 });
321 }
322 if value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
323 return Err(SqlError::RowTooLarge {
324 size: value_buf.len(),
325 max: citadel_core::MAX_INLINE_VALUE_SIZE,
326 });
327 }
328
329 match compiled_conflict.as_ref() {
330 None => {
331 let is_new = wtx
332 .table_insert(stmt.table.as_bytes(), &key_buf, &value_buf)
333 .map_err(SqlError::Storage)?;
334 if !is_new {
335 return Err(SqlError::DuplicateKey);
336 }
337 if !table_schema.indices.is_empty() {
338 for (j, &i) in pk_indices.iter().enumerate() {
339 row[i] = pk_values[j].clone();
340 }
341 for (j, &i) in non_pk.iter().enumerate() {
342 row[i] =
343 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
344 }
345 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
346 }
347 count += 1;
348 if let Some(buf) = returning_rows.as_mut() {
349 buf.push((None, proposed_row_for_returning));
350 }
351 }
352 Some(oc) => {
353 let oc_ref: &CompiledOnConflict = oc;
354 let needs_row = upsert_needs_row(oc_ref, table_schema);
355 if needs_row {
356 for (j, &i) in pk_indices.iter().enumerate() {
357 row[i] = pk_values[j].clone();
358 }
359 for (j, &i) in non_pk.iter().enumerate() {
360 row[i] =
361 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
362 }
363 }
364 let outcome = apply_insert_with_conflict(
365 &mut wtx,
366 table_schema,
367 &key_buf,
368 &value_buf,
369 &row,
370 &pk_values,
371 oc_ref,
372 row_col_map.as_ref().unwrap(),
373 stmt.returning.is_some(),
374 )?;
375 match outcome {
376 InsertRowOutcome::Inserted => {
377 count += 1;
378 if let Some(buf) = returning_rows.as_mut() {
379 buf.push((None, proposed_row_for_returning));
380 }
381 }
382 InsertRowOutcome::Updated { old, new } => {
383 count += 1;
384 if let Some(buf) = returning_rows.as_mut() {
385 buf.push((Some(old), Some(new)));
386 }
387 }
388 InsertRowOutcome::Skipped => {}
389 }
390 }
391 }
392 }
393
394 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
395 let qr = super::helpers::project_returning(table_schema, returning_cols, &rows)?;
396 wtx.commit().map_err(SqlError::Storage)?;
397 return Ok(ExecutionResult::Query(qr));
398 }
399
400 wtx.commit().map_err(SqlError::Storage)?;
401 Ok(ExecutionResult::RowsAffected(count))
402}
403
404pub(super) fn has_subquery(expr: &Expr) -> bool {
405 crate::parser::has_subquery(expr)
406}
407
408pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
409 if let Some(ref w) = stmt.where_clause {
410 if has_subquery(w) {
411 return true;
412 }
413 }
414 if let Some(ref h) = stmt.having {
415 if has_subquery(h) {
416 return true;
417 }
418 }
419 for col in &stmt.columns {
420 if let SelectColumn::Expr { expr, .. } = col {
421 if has_subquery(expr) {
422 return true;
423 }
424 }
425 }
426 for ob in &stmt.order_by {
427 if has_subquery(&ob.expr) {
428 return true;
429 }
430 }
431 for join in &stmt.joins {
432 if let Some(ref on_expr) = join.on_clause {
433 if has_subquery(on_expr) {
434 return true;
435 }
436 }
437 }
438 false
439}
440
441pub(super) fn materialize_expr(
442 expr: &Expr,
443 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
444) -> Result<Expr> {
445 match expr {
446 Expr::InSubquery {
447 expr: e,
448 subquery,
449 negated,
450 } => {
451 let inner = materialize_expr(e, exec_sub)?;
452 let qr = exec_sub(subquery)?;
453 if !qr.columns.is_empty() && qr.columns.len() != 1 {
454 return Err(SqlError::SubqueryMultipleColumns);
455 }
456 let mut values = rustc_hash::FxHashSet::default();
457 let mut has_null = false;
458 for row in &qr.rows {
459 if row[0].is_null() {
460 has_null = true;
461 } else {
462 values.insert(row[0].clone());
463 }
464 }
465 Ok(Expr::InSet {
466 expr: Box::new(inner),
467 values,
468 has_null,
469 negated: *negated,
470 })
471 }
472 Expr::ScalarSubquery(subquery) => {
473 let qr = exec_sub(subquery)?;
474 if qr.rows.len() > 1 {
475 return Err(SqlError::SubqueryMultipleRows);
476 }
477 let val = if qr.rows.is_empty() {
478 Value::Null
479 } else {
480 qr.rows[0][0].clone()
481 };
482 Ok(Expr::Literal(val))
483 }
484 Expr::Exists { subquery, negated } => {
485 let qr = exec_sub(subquery)?;
486 let exists = !qr.rows.is_empty();
487 let result = if *negated { !exists } else { exists };
488 Ok(Expr::Literal(Value::Boolean(result)))
489 }
490 Expr::InList {
491 expr: e,
492 list,
493 negated,
494 } => {
495 let inner = materialize_expr(e, exec_sub)?;
496 let items = list
497 .iter()
498 .map(|item| materialize_expr(item, exec_sub))
499 .collect::<Result<Vec<_>>>()?;
500 Ok(Expr::InList {
501 expr: Box::new(inner),
502 list: items,
503 negated: *negated,
504 })
505 }
506 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
507 left: Box::new(materialize_expr(left, exec_sub)?),
508 op: *op,
509 right: Box::new(materialize_expr(right, exec_sub)?),
510 }),
511 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
512 op: *op,
513 expr: Box::new(materialize_expr(e, exec_sub)?),
514 }),
515 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
516 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
517 Expr::InSet {
518 expr: e,
519 values,
520 has_null,
521 negated,
522 } => Ok(Expr::InSet {
523 expr: Box::new(materialize_expr(e, exec_sub)?),
524 values: values.clone(),
525 has_null: *has_null,
526 negated: *negated,
527 }),
528 Expr::Between {
529 expr: e,
530 low,
531 high,
532 negated,
533 } => Ok(Expr::Between {
534 expr: Box::new(materialize_expr(e, exec_sub)?),
535 low: Box::new(materialize_expr(low, exec_sub)?),
536 high: Box::new(materialize_expr(high, exec_sub)?),
537 negated: *negated,
538 }),
539 Expr::Like {
540 expr: e,
541 pattern,
542 escape,
543 negated,
544 } => {
545 let esc = escape
546 .as_ref()
547 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
548 .transpose()?;
549 Ok(Expr::Like {
550 expr: Box::new(materialize_expr(e, exec_sub)?),
551 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
552 escape: esc,
553 negated: *negated,
554 })
555 }
556 Expr::Case {
557 operand,
558 conditions,
559 else_result,
560 } => {
561 let op = operand
562 .as_ref()
563 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
564 .transpose()?;
565 let conds = conditions
566 .iter()
567 .map(|(c, r)| {
568 Ok((
569 materialize_expr(c, exec_sub)?,
570 materialize_expr(r, exec_sub)?,
571 ))
572 })
573 .collect::<Result<Vec<_>>>()?;
574 let else_r = else_result
575 .as_ref()
576 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
577 .transpose()?;
578 Ok(Expr::Case {
579 operand: op,
580 conditions: conds,
581 else_result: else_r,
582 })
583 }
584 Expr::Coalesce(args) => {
585 let materialized = args
586 .iter()
587 .map(|a| materialize_expr(a, exec_sub))
588 .collect::<Result<Vec<_>>>()?;
589 Ok(Expr::Coalesce(materialized))
590 }
591 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
592 expr: Box::new(materialize_expr(e, exec_sub)?),
593 data_type: *data_type,
594 }),
595 Expr::Function {
596 name,
597 args,
598 distinct,
599 } => {
600 let materialized = args
601 .iter()
602 .map(|a| materialize_expr(a, exec_sub))
603 .collect::<Result<Vec<_>>>()?;
604 Ok(Expr::Function {
605 name: name.clone(),
606 args: materialized,
607 distinct: *distinct,
608 })
609 }
610 other => Ok(other.clone()),
611 }
612}
613
614pub(super) fn materialize_stmt(
615 stmt: &SelectStmt,
616 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
617) -> Result<SelectStmt> {
618 let where_clause = stmt
619 .where_clause
620 .as_ref()
621 .map(|e| materialize_expr(e, exec_sub))
622 .transpose()?;
623 let having = stmt
624 .having
625 .as_ref()
626 .map(|e| materialize_expr(e, exec_sub))
627 .transpose()?;
628 let columns = stmt
629 .columns
630 .iter()
631 .map(|c| match c {
632 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
633 SelectColumn::AllFromOld => Ok(SelectColumn::AllFromOld),
634 SelectColumn::AllFromNew => Ok(SelectColumn::AllFromNew),
635 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
636 expr: materialize_expr(expr, exec_sub)?,
637 alias: alias.clone(),
638 }),
639 })
640 .collect::<Result<Vec<_>>>()?;
641 let order_by = stmt
642 .order_by
643 .iter()
644 .map(|ob| {
645 Ok(OrderByItem {
646 expr: materialize_expr(&ob.expr, exec_sub)?,
647 descending: ob.descending,
648 nulls_first: ob.nulls_first,
649 })
650 })
651 .collect::<Result<Vec<_>>>()?;
652 let joins = stmt
653 .joins
654 .iter()
655 .map(|j| {
656 let on_clause = j
657 .on_clause
658 .as_ref()
659 .map(|e| materialize_expr(e, exec_sub))
660 .transpose()?;
661 Ok(JoinClause {
662 join_type: j.join_type,
663 table: j.table.clone(),
664 on_clause,
665 })
666 })
667 .collect::<Result<Vec<_>>>()?;
668 let group_by = stmt
669 .group_by
670 .iter()
671 .map(|e| materialize_expr(e, exec_sub))
672 .collect::<Result<Vec<_>>>()?;
673 Ok(SelectStmt {
674 columns,
675 from: stmt.from.clone(),
676 from_alias: stmt.from_alias.clone(),
677 joins,
678 distinct: stmt.distinct,
679 where_clause,
680 order_by,
681 limit: stmt.limit.clone(),
682 offset: stmt.offset.clone(),
683 group_by,
684 having,
685 })
686}
687
688pub(super) fn exec_subquery_read(
689 db: &Database,
690 schema: &SchemaManager,
691 stmt: &SelectStmt,
692 ctes: &CteContext,
693) -> Result<QueryResult> {
694 match super::exec_select(db, schema, stmt, ctes)? {
695 ExecutionResult::Query(qr) => Ok(qr),
696 _ => Ok(QueryResult {
697 columns: vec![],
698 rows: vec![],
699 }),
700 }
701}
702
703pub(super) fn exec_subquery_write(
704 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
705 schema: &SchemaManager,
706 stmt: &SelectStmt,
707 ctes: &CteContext,
708) -> Result<QueryResult> {
709 match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
710 ExecutionResult::Query(qr) => Ok(qr),
711 _ => Ok(QueryResult {
712 columns: vec![],
713 rows: vec![],
714 }),
715 }
716}
717
718pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
719 stmt.where_clause.as_ref().is_some_and(has_subquery)
720 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
721}
722
723pub(super) fn materialize_update(
724 stmt: &UpdateStmt,
725 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
726) -> Result<UpdateStmt> {
727 let where_clause = stmt
728 .where_clause
729 .as_ref()
730 .map(|e| materialize_expr(e, exec_sub))
731 .transpose()?;
732 let assignments = stmt
733 .assignments
734 .iter()
735 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
736 .collect::<Result<Vec<_>>>()?;
737 Ok(UpdateStmt {
738 table: stmt.table.clone(),
739 assignments,
740 where_clause,
741 returning: stmt.returning.clone(),
742 })
743}
744
745pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
746 stmt.where_clause.as_ref().is_some_and(has_subquery)
747}
748
749pub(super) fn materialize_delete(
750 stmt: &DeleteStmt,
751 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
752) -> Result<DeleteStmt> {
753 let where_clause = stmt
754 .where_clause
755 .as_ref()
756 .map(|e| materialize_expr(e, exec_sub))
757 .transpose()?;
758 Ok(DeleteStmt {
759 table: stmt.table.clone(),
760 where_clause,
761 returning: stmt.returning.clone(),
762 })
763}
764
765pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
766 match &stmt.source {
767 InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
768 InsertSource::Select(_) => false,
770 }
771}
772
773pub(super) fn materialize_insert(
774 stmt: &InsertStmt,
775 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
776) -> Result<InsertStmt> {
777 let source = match &stmt.source {
778 InsertSource::Values(rows) => {
779 let mat = rows
780 .iter()
781 .map(|row| {
782 row.iter()
783 .map(|e| materialize_expr(e, exec_sub))
784 .collect::<Result<Vec<_>>>()
785 })
786 .collect::<Result<Vec<_>>>()?;
787 InsertSource::Values(mat)
788 }
789 InsertSource::Select(sq) => {
790 let ctes = sq
791 .ctes
792 .iter()
793 .map(|c| {
794 Ok(CteDefinition {
795 name: c.name.clone(),
796 column_aliases: c.column_aliases.clone(),
797 body: materialize_query_body(&c.body, exec_sub)?,
798 })
799 })
800 .collect::<Result<Vec<_>>>()?;
801 let body = materialize_query_body(&sq.body, exec_sub)?;
802 InsertSource::Select(Box::new(SelectQuery {
803 ctes,
804 recursive: sq.recursive,
805 body,
806 }))
807 }
808 };
809 Ok(InsertStmt {
810 table: stmt.table.clone(),
811 columns: stmt.columns.clone(),
812 source,
813 on_conflict: stmt.on_conflict.clone(),
814 returning: stmt.returning.clone(),
815 })
816}
817
818pub(super) fn materialize_query_body(
819 body: &QueryBody,
820 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
821) -> Result<QueryBody> {
822 match body {
823 QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
824 sel, exec_sub,
825 )?))),
826 QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
827 op: comp.op.clone(),
828 all: comp.all,
829 left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
830 right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
831 order_by: comp.order_by.clone(),
832 limit: comp.limit.clone(),
833 offset: comp.offset.clone(),
834 }))),
835 }
836}
837
838pub(super) fn exec_query_body(
839 db: &Database,
840 schema: &SchemaManager,
841 body: &QueryBody,
842 ctes: &CteContext,
843) -> Result<ExecutionResult> {
844 match body {
845 QueryBody::Select(sel) => super::exec_select(db, schema, sel, ctes),
846 QueryBody::Compound(comp) => exec_compound_select(db, schema, comp, ctes),
847 }
848}
849
850pub(super) fn exec_query_body_in_txn(
851 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
852 schema: &SchemaManager,
853 body: &QueryBody,
854 ctes: &CteContext,
855) -> Result<ExecutionResult> {
856 match body {
857 QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
858 QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
859 }
860}
861
862pub(super) fn exec_query_body_read(
863 db: &Database,
864 schema: &SchemaManager,
865 body: &QueryBody,
866 ctes: &CteContext,
867) -> Result<QueryResult> {
868 match exec_query_body(db, schema, body, ctes)? {
869 ExecutionResult::Query(qr) => Ok(qr),
870 _ => Ok(QueryResult {
871 columns: vec![],
872 rows: vec![],
873 }),
874 }
875}
876
877pub(super) fn exec_query_body_write(
878 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
879 schema: &SchemaManager,
880 body: &QueryBody,
881 ctes: &CteContext,
882) -> Result<QueryResult> {
883 match exec_query_body_in_txn(wtx, schema, body, ctes)? {
884 ExecutionResult::Query(qr) => Ok(qr),
885 _ => Ok(QueryResult {
886 columns: vec![],
887 rows: vec![],
888 }),
889 }
890}
891
892pub(super) fn exec_compound_select(
893 db: &Database,
894 schema: &SchemaManager,
895 comp: &CompoundSelect,
896 ctes: &CteContext,
897) -> Result<ExecutionResult> {
898 let left_qr = match exec_query_body(db, schema, &comp.left, ctes)? {
899 ExecutionResult::Query(qr) => qr,
900 _ => QueryResult {
901 columns: vec![],
902 rows: vec![],
903 },
904 };
905 let right_qr = match exec_query_body(db, schema, &comp.right, ctes)? {
906 ExecutionResult::Query(qr) => qr,
907 _ => QueryResult {
908 columns: vec![],
909 rows: vec![],
910 },
911 };
912 apply_set_operation(comp, left_qr, right_qr)
913}
914
915pub(super) fn exec_compound_select_in_txn(
916 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
917 schema: &SchemaManager,
918 comp: &CompoundSelect,
919 ctes: &CteContext,
920) -> Result<ExecutionResult> {
921 let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
922 ExecutionResult::Query(qr) => qr,
923 _ => QueryResult {
924 columns: vec![],
925 rows: vec![],
926 },
927 };
928 let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
929 ExecutionResult::Query(qr) => qr,
930 _ => QueryResult {
931 columns: vec![],
932 rows: vec![],
933 },
934 };
935 apply_set_operation(comp, left_qr, right_qr)
936}
937
938pub(super) fn apply_set_operation(
939 comp: &CompoundSelect,
940 left_qr: QueryResult,
941 right_qr: QueryResult,
942) -> Result<ExecutionResult> {
943 if !left_qr.columns.is_empty()
944 && !right_qr.columns.is_empty()
945 && left_qr.columns.len() != right_qr.columns.len()
946 {
947 return Err(SqlError::CompoundColumnCountMismatch {
948 left: left_qr.columns.len(),
949 right: right_qr.columns.len(),
950 });
951 }
952
953 let columns = left_qr.columns;
954
955 let mut rows = match (&comp.op, comp.all) {
956 (SetOp::Union, true) => {
957 let mut rows = left_qr.rows;
958 rows.extend(right_qr.rows);
959 rows
960 }
961 (SetOp::Union, false) => {
962 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
963 let mut rows = Vec::new();
964 for row in left_qr.rows.into_iter().chain(right_qr.rows) {
965 if !seen.contains(&row) {
966 seen.insert(row.clone());
967 rows.push(row);
968 }
969 }
970 rows
971 }
972 (SetOp::Intersect, true) => {
973 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
974 for row in &right_qr.rows {
975 *right_counts.entry(row.clone()).or_insert(0) += 1;
976 }
977 let mut rows = Vec::new();
978 for row in left_qr.rows {
979 if let Some(count) = right_counts.get_mut(&row) {
980 if *count > 0 {
981 *count -= 1;
982 rows.push(row);
983 }
984 }
985 }
986 rows
987 }
988 (SetOp::Intersect, false) => {
989 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
990 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
991 let mut rows = Vec::new();
992 for row in left_qr.rows {
993 if right_set.contains(&row) && !seen.contains(&row) {
994 seen.insert(row.clone());
995 rows.push(row);
996 }
997 }
998 rows
999 }
1000 (SetOp::Except, true) => {
1001 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
1002 for row in &right_qr.rows {
1003 *right_counts.entry(row.clone()).or_insert(0) += 1;
1004 }
1005 let mut rows = Vec::new();
1006 for row in left_qr.rows {
1007 if let Some(count) = right_counts.get_mut(&row) {
1008 if *count > 0 {
1009 *count -= 1;
1010 continue;
1011 }
1012 }
1013 rows.push(row);
1014 }
1015 rows
1016 }
1017 (SetOp::Except, false) => {
1018 let right_set: rustc_hash::FxHashSet<Vec<Value>> = right_qr.rows.into_iter().collect();
1019 let mut seen: rustc_hash::FxHashSet<Vec<Value>> = rustc_hash::FxHashSet::default();
1020 let mut rows = Vec::new();
1021 for row in left_qr.rows {
1022 if !right_set.contains(&row) && !seen.contains(&row) {
1023 seen.insert(row.clone());
1024 rows.push(row);
1025 }
1026 }
1027 rows
1028 }
1029 };
1030
1031 if !comp.order_by.is_empty() {
1032 let col_defs: Vec<crate::types::ColumnDef> = columns
1033 .iter()
1034 .enumerate()
1035 .map(|(i, name)| crate::types::ColumnDef {
1036 name: name.clone(),
1037 data_type: crate::types::DataType::Null,
1038 nullable: true,
1039 position: i as u16,
1040 default_expr: None,
1041 default_sql: None,
1042 check_expr: None,
1043 check_sql: None,
1044 check_name: None,
1045 is_with_timezone: false,
1046 generated_expr: None,
1047 generated_sql: None,
1048 generated_kind: None,
1049 })
1050 .collect();
1051 sort_rows(&mut rows, &comp.order_by, &col_defs)?;
1052 }
1053
1054 if let Some(ref offset_expr) = comp.offset {
1055 let offset = eval_const_int(offset_expr)?.max(0) as usize;
1056 if offset < rows.len() {
1057 rows = rows.split_off(offset);
1058 } else {
1059 rows.clear();
1060 }
1061 }
1062
1063 if let Some(ref limit_expr) = comp.limit {
1064 let limit = eval_const_int(limit_expr)?.max(0) as usize;
1065 rows.truncate(limit);
1066 }
1067
1068 Ok(ExecutionResult::Query(QueryResult { columns, rows }))
1069}
1070
/// Reusable scratch buffers for the in-transaction INSERT path.
///
/// One instance lives in the `INSERT_SCRATCH` thread-local and is handed to
/// `exec_insert_in_txn_impl` through `with_insert_scratch`.
///
/// NOTE(review): apart from `col_indices`, the per-field roles below are inferred
/// from the identically named locals in `exec_insert` — confirm against
/// `exec_insert_in_txn_impl`.
struct InsertBufs {
    // Presumably the logical row being assembled, one slot per table column.
    row: Vec<Value>,
    // Presumably the primary-key values of the current row.
    pk_values: Vec<Value>,
    // Presumably the non-key values of the current row in encoding order.
    value_values: Vec<Value>,
    // Presumably the encoded primary key.
    key_buf: Vec<u8>,
    // Presumably the encoded row payload.
    value_buf: Vec<u8>,
    // Schema column index of each target column; cleared and refilled on every call.
    col_indices: Vec<usize>,
    // Presumably the encoded key used for foreign-key lookups.
    fk_key_buf: Vec<u8>,
}
1080
1081impl InsertBufs {
1082 fn new() -> Self {
1083 Self {
1084 row: Vec::new(),
1085 pk_values: Vec::new(),
1086 value_values: Vec::new(),
1087 key_buf: Vec::with_capacity(64),
1088 value_buf: Vec::with_capacity(256),
1089 col_indices: Vec::new(),
1090 fk_key_buf: Vec::with_capacity(64),
1091 }
1092 }
1093}
1094
// Per-thread scratch buffers, wrapped in `RefCell` so they can be borrowed
// mutably through the thread-local handle.
thread_local! {
    // Buffers handed out by `with_insert_scratch`.
    static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
    // Buffers of type `UpsertBufs`; not referenced in the portion of the file
    // reviewed here — presumably used by the ON CONFLICT path (TODO confirm).
    static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
}
1099
1100fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1101 INSERT_SCRATCH.with(|slot| f(&mut slot.borrow_mut()))
1102}
1103
/// Reusable scratch buffers stored in the `UPSERT_SCRATCH` thread-local.
///
/// NOTE(review): the code that consumes these buffers is outside the portion of
/// the file reviewed here; the field roles below are inferred from their names
/// and should be confirmed against the upsert implementation.
pub(super) struct UpsertBufs {
    // Presumably the existing row that conflicted with the insert.
    old_row: Vec<Value>,
    // Presumably the row after the conflict update is applied.
    new_row: Vec<Value>,
    // Presumably the non-key values of the updated row in encoding order.
    value_values: Vec<Value>,
    // Presumably the encoded payload of the updated row.
    new_value_buf: Vec<u8>,
}
1110
1111impl UpsertBufs {
1112 pub(super) fn new() -> Self {
1113 Self {
1114 old_row: Vec::new(),
1115 new_row: Vec::new(),
1116 value_values: Vec::new(),
1117 new_value_buf: Vec::with_capacity(256),
1118 }
1119 }
1120}
1121
1122pub fn exec_insert_in_txn(
1123 wtx: &mut WriteTxn<'_>,
1124 schema: &SchemaManager,
1125 stmt: &InsertStmt,
1126 params: &[Value],
1127) -> Result<ExecutionResult> {
1128 with_insert_scratch(|bufs| exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None))
1129}
1130
1131fn exec_insert_in_txn_cached(
1132 wtx: &mut WriteTxn<'_>,
1133 schema: &SchemaManager,
1134 stmt: &InsertStmt,
1135 params: &[Value],
1136 cache: &InsertCache,
1137) -> Result<ExecutionResult> {
1138 with_insert_scratch(|bufs| {
1139 exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, Some(cache))
1140 })
1141}
1142
1143fn exec_insert_in_txn_impl(
1144 wtx: &mut WriteTxn<'_>,
1145 schema: &SchemaManager,
1146 stmt: &InsertStmt,
1147 params: &[Value],
1148 bufs: &mut InsertBufs,
1149 cache: Option<&InsertCache>,
1150) -> Result<ExecutionResult> {
1151 let empty_ctes = CteContext::default();
1152 let materialized;
1153 let has_sub = match cache {
1154 Some(c) => c.has_subquery,
1155 None => insert_has_subquery(stmt),
1156 };
1157 let stmt = if has_sub {
1158 materialized = materialize_insert(stmt, &mut |sub| {
1159 exec_subquery_write(wtx, schema, sub, &empty_ctes)
1160 })?;
1161 &materialized
1162 } else {
1163 stmt
1164 };
1165
1166 let table_schema = schema
1167 .get(&stmt.table)
1168 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1169
1170 let default_columns;
1171 let insert_columns: &[String] = if stmt.columns.is_empty() {
1172 default_columns = table_schema
1173 .columns
1174 .iter()
1175 .map(|c| c.name.clone())
1176 .collect::<Vec<_>>();
1177 &default_columns
1178 } else {
1179 &stmt.columns
1180 };
1181
1182 bufs.col_indices.clear();
1183 if let Some(c) = cache {
1184 bufs.col_indices.extend_from_slice(&c.col_indices);
1185 } else {
1186 for name in insert_columns {
1187 bufs.col_indices.push(
1188 table_schema
1189 .column_index(name)
1190 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1191 );
1192 }
1193 }
1194
1195 if cache.is_none() {
1196 for &ci in &bufs.col_indices {
1197 if table_schema.columns[ci].generated_kind.is_some() {
1198 return Err(SqlError::CannotInsertIntoGeneratedColumn(
1199 table_schema.columns[ci].name.clone(),
1200 ));
1201 }
1202 }
1203 }
1204
1205 let generated_cols_uncached: Vec<(usize, &Expr, FastGenEval)>;
1206 let cached_gen_positions: &[usize];
1207 let cached_gen_fast_evals: &[FastGenEval];
1208 if let Some(c) = cache {
1209 cached_gen_positions = &c.generated_col_positions;
1210 cached_gen_fast_evals = &c.generated_fast_evals;
1211 generated_cols_uncached = Vec::new();
1212 } else {
1213 cached_gen_positions = &[];
1214 cached_gen_fast_evals = &[];
1215 generated_cols_uncached = table_schema
1216 .columns
1217 .iter()
1218 .filter(|c| matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored)))
1219 .map(|c| {
1220 let expr = c.generated_expr.as_ref().unwrap();
1221 let fe = detect_fast_gen_eval(expr, table_schema);
1222 (c.position as usize, expr, fe)
1223 })
1224 .collect();
1225 }
1226 let has_gen_cols = !cached_gen_positions.is_empty() || !generated_cols_uncached.is_empty();
1227 let row_col_map_for_gen_owned: Option<ColumnMap> = if !has_gen_cols || cache.is_some() {
1228 None
1229 } else {
1230 Some(ColumnMap::new(&table_schema.columns))
1231 };
1232 let row_col_map_for_gen: Option<&ColumnMap> = if !has_gen_cols {
1233 None
1234 } else if let Some(c) = cache {
1235 c.row_col_map.as_ref()
1236 } else {
1237 row_col_map_for_gen_owned.as_ref()
1238 };
1239
1240 let any_defaults = match cache {
1241 Some(c) => c.any_defaults,
1242 None => table_schema
1243 .columns
1244 .iter()
1245 .any(|c| c.default_expr.is_some()),
1246 };
1247 let defaults: Vec<(usize, &Expr)> = if any_defaults {
1248 table_schema
1249 .columns
1250 .iter()
1251 .filter(|c| {
1252 c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1253 })
1254 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1255 .collect()
1256 } else {
1257 Vec::new()
1258 };
1259
1260 let has_checks = match cache {
1261 Some(c) => c.has_checks,
1262 None => table_schema.has_checks(),
1263 };
1264 let check_col_map = if has_checks {
1265 Some(ColumnMap::new(&table_schema.columns))
1266 } else {
1267 None
1268 };
1269
1270 let (pk_indices, non_pk, enc_pos, phys_count, dropped): (
1271 &[usize],
1272 &[usize],
1273 &[u16],
1274 usize,
1275 &[u16],
1276 ) = if let Some(c) = cache {
1277 (
1278 &c.pk_indices,
1279 &c.non_pk_indices,
1280 &c.encoding_positions,
1281 c.phys_count,
1282 &c.dropped_non_pk_slots,
1283 )
1284 } else {
1285 (
1286 table_schema.pk_indices(),
1287 table_schema.non_pk_indices(),
1288 table_schema.encoding_positions(),
1289 table_schema.physical_non_pk_count(),
1290 table_schema.dropped_non_pk_slots(),
1291 )
1292 };
1293
1294 bufs.row.resize(table_schema.columns.len(), Value::Null);
1295 bufs.pk_values.resize(pk_indices.len(), Value::Null);
1296 bufs.value_values.resize(phys_count, Value::Null);
1297
1298 let table_bytes = stmt.table.as_bytes();
1299 let has_fks = !table_schema.foreign_keys.is_empty();
1300 let has_indices = !table_schema.indices.is_empty();
1301 let has_defaults = !defaults.is_empty();
1302
1303 let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1304 (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1305 (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1306 (_, None) => None,
1307 };
1308
1309 let row_col_map_owned: Option<ColumnMap> =
1310 if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1311 Some(ColumnMap::new(&table_schema.columns))
1312 } else {
1313 None
1314 };
1315 let row_col_map: Option<&ColumnMap> = cache
1316 .and_then(|c| c.row_col_map.as_ref())
1317 .or(row_col_map_owned.as_ref());
1318
1319 let select_rows = match &stmt.source {
1320 InsertSource::Select(sq) => {
1321 let insert_ctes =
1322 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
1323 exec_query_body_write(wtx, schema, body, ctx)
1324 })?;
1325 let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1326 Some(qr.rows)
1327 }
1328 InsertSource::Values(_) => None,
1329 };
1330
1331 let mut count: u64 = 0;
1332 let mut returning_rows: Option<Vec<super::helpers::ReturningRow>> =
1333 stmt.returning.as_ref().map(|_| Vec::new());
1334
1335 let values = match &stmt.source {
1336 InsertSource::Values(rows) => Some(rows.as_slice()),
1337 InsertSource::Select(_) => None,
1338 };
1339 let sel_rows = select_rows.as_deref();
1340
1341 let total = match (values, sel_rows) {
1342 (Some(rows), _) => rows.len(),
1343 (_, Some(rows)) => rows.len(),
1344 _ => 0,
1345 };
1346
1347 if let Some(sel) = sel_rows {
1348 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1349 return Err(SqlError::InvalidValue(format!(
1350 "INSERT ... SELECT column count mismatch: expected {}, got {}",
1351 insert_columns.len(),
1352 sel[0].len()
1353 )));
1354 }
1355 }
1356
1357 let skip_row_clear = cache.is_some_and(|c| c.row_fully_overwritten);
1358 for idx in 0..total {
1359 if !skip_row_clear {
1360 for v in bufs.row.iter_mut() {
1361 *v = Value::Null;
1362 }
1363 }
1364
1365 if let Some(value_rows) = values {
1366 if let Some(plan) = cache.and_then(|c| c.bind_plan.as_ref()) {
1367 for action in plan {
1368 match action {
1369 BindAction::Param {
1370 param_idx,
1371 col_idx,
1372 target,
1373 } => {
1374 let v = ¶ms[*param_idx];
1375 bufs.row[*col_idx] = if v.is_null() {
1376 Value::Null
1377 } else if v.data_type() == *target {
1378 v.clone()
1379 } else {
1380 let got = v.data_type();
1381 v.clone().coerce_into(*target).ok_or_else(|| {
1382 SqlError::TypeMismatch {
1383 expected: target.to_string(),
1384 got: got.to_string(),
1385 }
1386 })?
1387 };
1388 }
1389 BindAction::Literal { value, col_idx } => {
1390 bufs.row[*col_idx] = value.clone();
1391 }
1392 }
1393 }
1394 } else {
1395 let value_row = &value_rows[idx];
1396 if value_row.len() != insert_columns.len() {
1397 return Err(SqlError::InvalidValue(format!(
1398 "expected {} values, got {}",
1399 insert_columns.len(),
1400 value_row.len()
1401 )));
1402 }
1403 for (i, expr) in value_row.iter().enumerate() {
1404 let val = match expr {
1405 Expr::Parameter(n) => params
1406 .get(n - 1)
1407 .cloned()
1408 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1409 Expr::Literal(v) => v.clone(),
1410 _ => eval_const_expr(expr)?,
1411 };
1412 let col_idx = bufs.col_indices[i];
1413 let col = &table_schema.columns[col_idx];
1414 let got_type = val.data_type();
1415 bufs.row[col_idx] = if val.is_null() {
1416 Value::Null
1417 } else {
1418 val.coerce_into(col.data_type)
1419 .ok_or_else(|| SqlError::TypeMismatch {
1420 expected: col.data_type.to_string(),
1421 got: got_type.to_string(),
1422 })?
1423 };
1424 }
1425 }
1426 } else if let Some(sel) = sel_rows {
1427 let sel_row = &sel[idx];
1428 for (i, val) in sel_row.iter().enumerate() {
1429 let col_idx = bufs.col_indices[i];
1430 let col = &table_schema.columns[col_idx];
1431 let got_type = val.data_type();
1432 bufs.row[col_idx] = if val.is_null() {
1433 Value::Null
1434 } else {
1435 val.clone().coerce_into(col.data_type).ok_or_else(|| {
1436 SqlError::TypeMismatch {
1437 expected: col.data_type.to_string(),
1438 got: got_type.to_string(),
1439 }
1440 })?
1441 };
1442 }
1443 }
1444
1445 if has_defaults {
1446 for &(pos, def_expr) in &defaults {
1447 let val = eval_const_expr(def_expr)?;
1448 let col = &table_schema.columns[pos];
1449 if !val.is_null() {
1450 let got_type = val.data_type();
1451 bufs.row[pos] =
1452 val.coerce_into(col.data_type)
1453 .ok_or_else(|| SqlError::TypeMismatch {
1454 expected: col.data_type.to_string(),
1455 got: got_type.to_string(),
1456 })?;
1457 }
1458 }
1459 }
1460
1461 if let Some(gen_map) = row_col_map_for_gen {
1462 if cache.is_some() {
1463 for (pos, fast) in cached_gen_positions
1464 .iter()
1465 .copied()
1466 .zip(cached_gen_fast_evals.iter())
1467 {
1468 let gen_expr = table_schema.columns[pos].generated_expr.as_ref().unwrap();
1469 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1470 let col = &table_schema.columns[pos];
1471 bufs.row[pos] = if val.is_null() {
1472 Value::Null
1473 } else {
1474 let got_type = val.data_type();
1475 val.coerce_into(col.data_type)
1476 .ok_or_else(|| SqlError::TypeMismatch {
1477 expected: col.data_type.to_string(),
1478 got: got_type.to_string(),
1479 })?
1480 };
1481 }
1482 } else {
1483 for (pos, gen_expr, fast) in &generated_cols_uncached {
1484 let val = eval_fast_gen(fast, gen_expr, &bufs.row, gen_map)?;
1485 let col = &table_schema.columns[*pos];
1486 bufs.row[*pos] = if val.is_null() {
1487 Value::Null
1488 } else {
1489 let got_type = val.data_type();
1490 val.coerce_into(col.data_type)
1491 .ok_or_else(|| SqlError::TypeMismatch {
1492 expected: col.data_type.to_string(),
1493 got: got_type.to_string(),
1494 })?
1495 };
1496 }
1497 }
1498 }
1499
1500 if let Some(c) = cache {
1501 for &pos in &c.not_null_indices {
1502 if bufs.row[pos as usize].is_null() {
1503 return Err(SqlError::NotNullViolation(
1504 table_schema.columns[pos as usize].name.clone(),
1505 ));
1506 }
1507 }
1508 } else {
1509 for col in &table_schema.columns {
1510 if !col.nullable && bufs.row[col.position as usize].is_null() {
1511 return Err(SqlError::NotNullViolation(col.name.clone()));
1512 }
1513 }
1514 }
1515
1516 if let Some(ref col_map) = check_col_map {
1517 for col in &table_schema.columns {
1518 if let Some(ref check) = col.check_expr {
1519 let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1520 if !is_truthy(&result) && !result.is_null() {
1521 let name = col.check_name.as_deref().unwrap_or(&col.name);
1522 return Err(SqlError::CheckViolation(name.to_string()));
1523 }
1524 }
1525 }
1526 for tc in &table_schema.check_constraints {
1527 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1528 if !is_truthy(&result) && !result.is_null() {
1529 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1530 return Err(SqlError::CheckViolation(name.to_string()));
1531 }
1532 }
1533 }
1534
1535 if has_fks {
1536 for fk in &table_schema.foreign_keys {
1537 let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1538 if any_null {
1539 continue;
1540 }
1541 let fk_vals: Vec<Value> = fk
1542 .columns
1543 .iter()
1544 .map(|&ci| bufs.row[ci as usize].clone())
1545 .collect();
1546 bufs.fk_key_buf.clear();
1547 encode_composite_key_into(&fk_vals, &mut bufs.fk_key_buf);
1548 let found = wtx
1549 .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1550 .map_err(SqlError::Storage)?;
1551 if found.is_none() {
1552 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1553 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1554 }
1555 }
1556 }
1557
1558 let proposed_row_for_returning: Option<Vec<Value>> =
1559 returning_rows.as_ref().map(|_| bufs.row.clone());
1560
1561 for (j, &i) in pk_indices.iter().enumerate() {
1562 bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1563 }
1564 match cache.map(|c| c.single_int_pk).unwrap_or(false) {
1565 true => match bufs.pk_values[0] {
1566 Value::Integer(v) => crate::encoding::encode_int_key_into(v, &mut bufs.key_buf),
1567 _ => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1568 },
1569 false => encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf),
1570 }
1571
1572 for &slot in dropped {
1573 bufs.value_values[slot as usize] = Value::Null;
1574 }
1575 for (j, &i) in non_pk.iter().enumerate() {
1576 let col = &table_schema.columns[i];
1577 if matches!(
1578 col.generated_kind,
1579 Some(crate::parser::GeneratedKind::Virtual)
1580 ) {
1581 bufs.value_values[enc_pos[j] as usize] = Value::Null;
1582 bufs.row[i] = Value::Null;
1583 } else {
1584 bufs.value_values[enc_pos[j] as usize] =
1585 std::mem::replace(&mut bufs.row[i], Value::Null);
1586 }
1587 }
1588 match cache.and_then(|c| c.row_encoder.as_ref()) {
1589 Some(tmpl) => crate::encoding::encode_int_row_with_template(
1590 tmpl,
1591 &bufs.value_values,
1592 &mut bufs.value_buf,
1593 )?,
1594 None => encode_row_into(&bufs.value_values, &mut bufs.value_buf),
1595 }
1596
1597 if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1598 return Err(SqlError::KeyTooLarge {
1599 size: bufs.key_buf.len(),
1600 max: citadel_core::MAX_KEY_SIZE,
1601 });
1602 }
1603 if bufs.value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1604 return Err(SqlError::RowTooLarge {
1605 size: bufs.value_buf.len(),
1606 max: citadel_core::MAX_INLINE_VALUE_SIZE,
1607 });
1608 }
1609
1610 match compiled_conflict.as_ref() {
1611 None => {
1612 let is_new = wtx
1613 .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1614 .map_err(SqlError::Storage)?;
1615 if !is_new {
1616 return Err(SqlError::DuplicateKey);
1617 }
1618 if has_indices {
1619 for (j, &i) in pk_indices.iter().enumerate() {
1620 bufs.row[i] = bufs.pk_values[j].clone();
1621 }
1622 for (j, &i) in non_pk.iter().enumerate() {
1623 bufs.row[i] = std::mem::replace(
1624 &mut bufs.value_values[enc_pos[j] as usize],
1625 Value::Null,
1626 );
1627 }
1628 insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1629 }
1630 count += 1;
1631 if let Some(buf) = returning_rows.as_mut() {
1632 buf.push((None, proposed_row_for_returning));
1633 }
1634 }
1635 Some(oc) => {
1636 let oc_ref: &CompiledOnConflict = oc;
1637 let needs_row = upsert_needs_row(oc_ref, table_schema);
1638 if needs_row {
1639 for (j, &i) in pk_indices.iter().enumerate() {
1640 bufs.row[i] = bufs.pk_values[j].clone();
1641 }
1642 for (j, &i) in non_pk.iter().enumerate() {
1643 bufs.row[i] = std::mem::replace(
1644 &mut bufs.value_values[enc_pos[j] as usize],
1645 Value::Null,
1646 );
1647 }
1648 }
1649 let outcome = apply_insert_with_conflict(
1650 wtx,
1651 table_schema,
1652 &bufs.key_buf,
1653 &bufs.value_buf,
1654 &bufs.row,
1655 &bufs.pk_values,
1656 oc_ref,
1657 row_col_map.unwrap(),
1658 stmt.returning.is_some(),
1659 )?;
1660 match outcome {
1661 InsertRowOutcome::Inserted => {
1662 count += 1;
1663 if let Some(buf) = returning_rows.as_mut() {
1664 buf.push((None, proposed_row_for_returning));
1665 }
1666 }
1667 InsertRowOutcome::Updated { old, new } => {
1668 count += 1;
1669 if let Some(buf) = returning_rows.as_mut() {
1670 buf.push((Some(old), Some(new)));
1671 }
1672 }
1673 InsertRowOutcome::Skipped => {}
1674 }
1675 }
1676 }
1677 }
1678
1679 if let (Some(returning_cols), Some(rows)) = (stmt.returning.as_ref(), returning_rows) {
1680 return Ok(ExecutionResult::Query(super::helpers::project_returning(
1681 table_schema,
1682 returning_cols,
1683 &rows,
1684 )?));
1685 }
1686
1687 Ok(ExecutionResult::RowsAffected(count))
1688}
1689
1690pub struct CompiledInsert {
1691 table_lower: String,
1692 cached: Option<InsertCache>,
1693}
1694
1695struct InsertCache {
1696 col_indices: Vec<usize>,
1697 has_subquery: bool,
1698 any_defaults: bool,
1699 has_checks: bool,
1700 on_conflict: Option<Arc<CompiledOnConflict>>,
1701 row_col_map: Option<ColumnMap>,
1702 generated_col_positions: Vec<usize>,
1703 generated_fast_evals: Vec<FastGenEval>,
1704 pk_indices: Vec<usize>,
1705 non_pk_indices: Vec<usize>,
1706 encoding_positions: Vec<u16>,
1707 dropped_non_pk_slots: Vec<u16>,
1708 phys_count: usize,
1709 single_int_pk: bool,
1710 not_null_indices: Vec<u16>,
1711 bind_plan: Option<Vec<BindAction>>,
1712 row_fully_overwritten: bool,
1713 row_encoder: Option<crate::encoding::IntRowTemplate>,
1714 is_trivial_fast: bool,
1715 trivial_fast_program: Option<TrivialFastProgram>,
1716}
1717
1718#[derive(Clone)]
1719enum BindAction {
1720 Param {
1721 param_idx: usize,
1722 col_idx: usize,
1723 target: DataType,
1724 },
1725 Literal {
1726 value: Value,
1727 col_idx: usize,
1728 },
1729}
1730
1731#[derive(Clone)]
1732struct TrivialFastProgram {
1733 template: Vec<u8>,
1734 ops: Vec<WriteOp>,
1735 pk_param: u8,
1736 not_null_param_indices: Vec<u8>,
1737}
1738
/// A single write operation of a [`TrivialFastProgram`].
///
/// Each `off` comes from the row encoder's slot-offset table. The bitmap
/// fields are computed from the column's payload slot as byte `2 + slot / 8`
/// and bit `1 << (slot % 8)`.
/// NOTE(review): presumably a null bitmap; confirm against the row encoding.
#[derive(Clone)]
enum WriteOp {
    /// Write an integer parameter at `off`.
    ParamI64 {
        /// Index of the source parameter.
        param_idx: u8,
        /// Offset taken from the encoder's slot-offset table.
        off: u32,
    },
    /// Write a constant integer at `off`.
    LiteralI64 {
        /// The constant to write.
        value: i64,
        /// Offset taken from the encoder's slot-offset table.
        off: u32,
    },
    /// Generated column defined as the sum of two parameter-bound columns.
    GenAddParamsI64 {
        /// Parameter bound to the left operand column.
        a_param: u8,
        /// Parameter bound to the right operand column.
        b_param: u8,
        /// Offset taken from the encoder's slot-offset table.
        off: u32,
        /// Byte offset of the generated column's bitmap bit.
        bitmap_byte_off: u32,
        /// Bit mask of the generated column within that bitmap byte.
        bitmap_bit_mask: u8,
    },
    /// Generated column defined as `param * mul + add`.
    GenMulAddParamI64 {
        /// Parameter bound to the source column.
        param_idx: u8,
        /// Multiplier applied to the parameter.
        mul: i64,
        /// Constant added after the multiplication.
        add: i64,
        /// Offset taken from the encoder's slot-offset table.
        off: u32,
        /// Byte offset of the generated column's bitmap bit.
        bitmap_byte_off: u32,
        /// Bit mask of the generated column within that bitmap byte.
        bitmap_bit_mask: u8,
    },
}
1765
1766fn build_trivial_fast_program(
1767 bind_plan: &[BindAction],
1768 row_encoder: &crate::encoding::IntRowTemplate,
1769 non_virtual_pairs: &[(usize, usize)],
1770 generated_col_positions: &[usize],
1771 generated_fast_evals: &[FastGenEval],
1772 pk_indices: &[usize],
1773 columns: &[crate::types::ColumnDef],
1774) -> Option<TrivialFastProgram> {
1775 let pk_col = pk_indices[0];
1776 let col_to_slot: rustc_hash::FxHashMap<usize, usize> =
1777 non_virtual_pairs.iter().copied().collect();
1778 let slot_to_off: rustc_hash::FxHashMap<usize, usize> =
1779 row_encoder.slot_offsets.iter().copied().collect();
1780
1781 let mut col_to_param: rustc_hash::FxHashMap<usize, u8> = Default::default();
1782 let mut col_to_lit_int: rustc_hash::FxHashMap<usize, i64> = Default::default();
1783 let mut pk_param: Option<u8> = None;
1784 let mut ops: Vec<WriteOp> = Vec::with_capacity(bind_plan.len() + generated_col_positions.len());
1785 let mut not_null_param_indices: Vec<u8> = Vec::new();
1786
1787 for action in bind_plan {
1788 match action {
1789 BindAction::Param {
1790 param_idx,
1791 col_idx,
1792 target,
1793 } => {
1794 if *target != DataType::Integer {
1795 return None;
1796 }
1797 let pi: u8 = u8::try_from(*param_idx).ok()?;
1798 col_to_param.insert(*col_idx, pi);
1799 if *col_idx == pk_col {
1800 pk_param = Some(pi);
1801 } else {
1802 let slot = *col_to_slot.get(col_idx)?;
1803 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1804 ops.push(WriteOp::ParamI64 { param_idx: pi, off });
1805 if !columns[*col_idx].nullable {
1806 not_null_param_indices.push(pi);
1807 }
1808 }
1809 }
1810 BindAction::Literal { value, col_idx } => match value {
1811 Value::Integer(v) => {
1812 col_to_lit_int.insert(*col_idx, *v);
1813 if *col_idx == pk_col {
1814 return None;
1815 }
1816 let slot = *col_to_slot.get(col_idx)?;
1817 let off = u32::try_from(*slot_to_off.get(&slot)?).ok()?;
1818 ops.push(WriteOp::LiteralI64 { value: *v, off });
1819 }
1820 _ => return None,
1821 },
1822 }
1823 }
1824
1825 let pk_param = pk_param?;
1826
1827 for (i, &gen_pos) in generated_col_positions.iter().enumerate() {
1828 let gen_slot = *col_to_slot.get(&gen_pos)?;
1829 let gen_off = u32::try_from(*slot_to_off.get(&gen_slot)?).ok()?;
1830 let bitmap_byte_off = u32::try_from(2 + gen_slot / 8).ok()?;
1831 let bitmap_bit_mask: u8 = 1u8 << (gen_slot % 8);
1832 let gen_col_nullable = columns[gen_pos].nullable;
1833
1834 match &generated_fast_evals[i] {
1835 FastGenEval::IntColAddCol {
1836 left_idx,
1837 right_idx,
1838 } => {
1839 let a_param = col_to_param.get(left_idx).copied();
1840 let b_param = col_to_param.get(right_idx).copied();
1841 match (a_param, b_param) {
1842 (Some(ap), Some(bp)) => {
1843 let deps_safe = gen_col_nullable
1844 || (not_null_param_indices.contains(&ap)
1845 && not_null_param_indices.contains(&bp));
1846 if !deps_safe {
1847 return None;
1848 }
1849 ops.push(WriteOp::GenAddParamsI64 {
1850 a_param: ap,
1851 b_param: bp,
1852 off: gen_off,
1853 bitmap_byte_off,
1854 bitmap_bit_mask,
1855 });
1856 }
1857 (Some(p), None) => {
1858 let lit = col_to_lit_int.get(right_idx).copied()?;
1859 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1860 return None;
1861 }
1862 ops.push(WriteOp::GenMulAddParamI64 {
1863 param_idx: p,
1864 mul: 1,
1865 add: lit,
1866 off: gen_off,
1867 bitmap_byte_off,
1868 bitmap_bit_mask,
1869 });
1870 }
1871 (None, Some(p)) => {
1872 let lit = col_to_lit_int.get(left_idx).copied()?;
1873 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1874 return None;
1875 }
1876 ops.push(WriteOp::GenMulAddParamI64 {
1877 param_idx: p,
1878 mul: 1,
1879 add: lit,
1880 off: gen_off,
1881 bitmap_byte_off,
1882 bitmap_bit_mask,
1883 });
1884 }
1885 (None, None) => {
1886 let la = col_to_lit_int.get(left_idx).copied()?;
1887 let lb = col_to_lit_int.get(right_idx).copied()?;
1888 ops.push(WriteOp::LiteralI64 {
1889 value: la.wrapping_add(lb),
1890 off: gen_off,
1891 });
1892 }
1893 }
1894 }
1895 FastGenEval::IntColMulAdd {
1896 col_schema_idx,
1897 mul,
1898 add,
1899 } => {
1900 if let Some(p) = col_to_param.get(col_schema_idx).copied() {
1901 if !gen_col_nullable && !not_null_param_indices.contains(&p) {
1902 return None;
1903 }
1904 ops.push(WriteOp::GenMulAddParamI64 {
1905 param_idx: p,
1906 mul: *mul,
1907 add: *add,
1908 off: gen_off,
1909 bitmap_byte_off,
1910 bitmap_bit_mask,
1911 });
1912 } else if let Some(lit) = col_to_lit_int.get(col_schema_idx).copied() {
1913 ops.push(WriteOp::LiteralI64 {
1914 value: lit.wrapping_mul(*mul).wrapping_add(*add),
1915 off: gen_off,
1916 });
1917 } else {
1918 return None;
1919 }
1920 }
1921 FastGenEval::None => return None,
1922 }
1923 }
1924
1925 Some(TrivialFastProgram {
1926 template: row_encoder.template.clone(),
1927 ops,
1928 pk_param,
1929 not_null_param_indices,
1930 })
1931}
1932
1933#[derive(Clone)]
1934pub(super) enum CompiledOnConflict {
1935 DoNothing {
1936 target: Option<ConflictKind>,
1937 },
1938 DoUpdate {
1939 target: ConflictKind,
1940 assignments: Vec<(usize, Expr)>,
1941 where_clause: Option<Expr>,
1942 fast_paths: Option<Vec<DoUpdateFastPath>>,
1943 },
1944}
1945
1946#[derive(Clone, Copy)]
1947pub(super) enum DoUpdateFastPath {
1948 IntAddConst { phys_idx: usize, delta: i64 },
1949}
1950
1951#[derive(Clone, Debug)]
1952pub(super) enum ConflictKind {
1953 PrimaryKey,
1954 UniqueIndex { index_idx: usize },
1955}
1956
1957fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
1958 match target {
1959 ConflictTarget::Columns(cols) => {
1960 let col_idx_set: Vec<u16> = cols
1961 .iter()
1962 .map(|name| {
1963 ts.column_index(name)
1964 .map(|i| i as u16)
1965 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
1966 })
1967 .collect::<Result<_>>()?;
1968 let pk_set = ts.primary_key_columns.clone();
1969 if set_equal(&col_idx_set, &pk_set) {
1970 return Ok(ConflictKind::PrimaryKey);
1971 }
1972 for (index_idx, idx) in ts.indices.iter().enumerate() {
1973 if idx.unique && set_equal(&col_idx_set, &idx.columns) {
1974 return Ok(ConflictKind::UniqueIndex { index_idx });
1975 }
1976 }
1977 Err(SqlError::Plan(
1978 "ON CONFLICT target does not match any unique constraint".into(),
1979 ))
1980 }
1981 ConflictTarget::Constraint(name) => {
1982 let lower = name.to_ascii_lowercase();
1983 for (index_idx, idx) in ts.indices.iter().enumerate() {
1984 if idx.name.eq_ignore_ascii_case(&lower) {
1985 if idx.unique {
1986 return Ok(ConflictKind::UniqueIndex { index_idx });
1987 }
1988 return Err(SqlError::Plan(format!(
1989 "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
1990 )));
1991 }
1992 }
1993 Err(SqlError::Plan(format!(
1994 "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
1995 )))
1996 }
1997 }
1998}
1999
/// Returns `true` when `a` and `b` hold the same elements regardless of
/// order. Duplicates count: `[1, 1, 2]` does not equal `[1, 2, 2]`.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    // Sorted copies compare equal exactly when the inputs are permutations
    // of one another.
    let sorted = |items: &[u16]| {
        let mut copy = items.to_vec();
        copy.sort_unstable();
        copy
    };
    a.len() == b.len() && sorted(a) == sorted(b)
}
2010
2011pub(super) enum InsertRowOutcome {
2012 Inserted,
2013 Updated { old: Vec<Value>, new: Vec<Value> },
2014 Skipped,
2015}
2016
2017#[allow(clippy::too_many_arguments)]
2018#[inline]
2019pub(super) fn apply_insert_with_conflict(
2020 wtx: &mut WriteTxn<'_>,
2021 table_schema: &TableSchema,
2022 key_buf: &[u8],
2023 value_buf: &[u8],
2024 row: &[Value],
2025 pk_values: &[Value],
2026 on_conflict: &CompiledOnConflict,
2027 col_map: &ColumnMap,
2028 capture_returning: bool,
2029) -> Result<InsertRowOutcome> {
2030 let table_bytes = table_schema.name.as_bytes();
2031
2032 if let CompiledOnConflict::DoNothing { target } = on_conflict {
2033 let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
2034 if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
2035 let inserted = wtx
2036 .table_insert_if_absent(table_bytes, key_buf, value_buf)
2037 .map_err(SqlError::Storage)?;
2038 return Ok(if inserted {
2039 InsertRowOutcome::Inserted
2040 } else {
2041 InsertRowOutcome::Skipped
2042 });
2043 }
2044 }
2045
2046 if let CompiledOnConflict::DoUpdate {
2047 target: ConflictKind::PrimaryKey,
2048 assignments,
2049 where_clause,
2050 fast_paths,
2051 } = on_conflict
2052 {
2053 if can_fuse_do_update(table_schema, assignments) {
2054 return apply_do_update_fused(
2055 wtx,
2056 table_schema,
2057 table_bytes,
2058 key_buf,
2059 value_buf,
2060 row,
2061 assignments,
2062 where_clause.as_ref(),
2063 col_map,
2064 fast_paths.as_deref(),
2065 capture_returning,
2066 );
2067 }
2068 }
2069
2070 let primary_outcome = wtx
2071 .table_insert_or_fetch(table_bytes, key_buf, value_buf)
2072 .map_err(SqlError::Storage)?;
2073
2074 match primary_outcome {
2075 citadel_txn::write_txn::InsertOutcome::Inserted => {
2076 if table_schema.indices.is_empty() {
2077 return Ok(InsertRowOutcome::Inserted);
2078 }
2079 let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
2080 match insert_index_entries_or_fetch(
2081 wtx,
2082 table_schema,
2083 row,
2084 pk_values,
2085 &mut inserted_keys,
2086 )? {
2087 None => Ok(InsertRowOutcome::Inserted),
2088 Some(conflicting_idx) => {
2089 let matches_target =
2090 matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
2091 || matches!(
2092 on_conflict,
2093 CompiledOnConflict::DoNothing {
2094 target: Some(ConflictKind::UniqueIndex { index_idx }),
2095 } | CompiledOnConflict::DoUpdate {
2096 target: ConflictKind::UniqueIndex { index_idx },
2097 ..
2098 } if *index_idx == conflicting_idx
2099 );
2100 undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
2101 if !matches_target {
2102 return Err(SqlError::UniqueViolation(
2103 table_schema.indices[conflicting_idx].name.clone(),
2104 ));
2105 }
2106 match on_conflict {
2107 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2108 CompiledOnConflict::DoUpdate {
2109 assignments,
2110 where_clause,
2111 ..
2112 } => {
2113 let existing_pk =
2114 fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
2115 apply_do_update(
2116 wtx,
2117 table_schema,
2118 &existing_pk,
2119 row,
2120 assignments,
2121 where_clause.as_ref(),
2122 col_map,
2123 capture_returning,
2124 )
2125 }
2126 }
2127 }
2128 }
2129 }
2130 citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
2131 let matches_target = matches!(
2132 on_conflict,
2133 CompiledOnConflict::DoNothing { target: None }
2134 | CompiledOnConflict::DoNothing {
2135 target: Some(ConflictKind::PrimaryKey),
2136 }
2137 | CompiledOnConflict::DoUpdate {
2138 target: ConflictKind::PrimaryKey,
2139 ..
2140 }
2141 );
2142 if !matches_target {
2143 return Err(SqlError::DuplicateKey);
2144 }
2145 match on_conflict {
2146 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
2147 CompiledOnConflict::DoUpdate {
2148 assignments,
2149 where_clause,
2150 ..
2151 } => {
2152 let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
2153 apply_do_update_with_old_row(
2154 wtx,
2155 table_schema,
2156 key_buf,
2157 &old_row,
2158 row,
2159 assignments,
2160 where_clause.as_ref(),
2161 col_map,
2162 capture_returning,
2163 )
2164 }
2165 }
2166 }
2167 }
2168}
2169
2170#[inline]
2171fn apply_fast_path_patch(
2172 old_bytes: &[u8],
2173 fast_paths: &[DoUpdateFastPath],
2174) -> Result<UpsertAction> {
2175 UPSERT_SCRATCH.with(|slot| {
2176 let mut bufs = slot.borrow_mut();
2177 bufs.new_value_buf.clear();
2178 bufs.new_value_buf.extend_from_slice(old_bytes);
2179
2180 let mut patch_scratch: Vec<u8> = Vec::new();
2181
2182 for fp in fast_paths {
2183 match fp {
2184 DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
2185 let decoded =
2186 crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
2187 let old_val = &decoded[0];
2188 let new_val = match old_val {
2189 Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
2190 Value::Null => Value::Null,
2191 _ => {
2192 return Err(SqlError::TypeMismatch {
2193 expected: "INTEGER".into(),
2194 got: old_val.data_type().to_string(),
2195 });
2196 }
2197 };
2198 if !crate::encoding::patch_column_in_place(
2199 &mut bufs.new_value_buf,
2200 *phys_idx,
2201 &new_val,
2202 )? {
2203 patch_scratch.clear();
2204 crate::encoding::patch_row_column(
2205 &bufs.new_value_buf,
2206 *phys_idx,
2207 &new_val,
2208 &mut patch_scratch,
2209 )?;
2210 std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
2211 }
2212 }
2213 }
2214 }
2215
2216 if bufs.new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2217 return Err(SqlError::RowTooLarge {
2218 size: bufs.new_value_buf.len(),
2219 max: citadel_core::MAX_INLINE_VALUE_SIZE,
2220 });
2221 }
2222
2223 Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
2224 })
2225}
2226
2227fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
2228 if !ts.indices.is_empty() {
2229 return true;
2230 }
2231 match oc {
2232 CompiledOnConflict::DoNothing { .. } => false,
2233 CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
2234 }
2235}
2236
2237fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
2238 if !ts.indices.is_empty() {
2239 return false;
2240 }
2241 if !ts.foreign_keys.is_empty() {
2242 return false;
2243 }
2244 if ts.columns.iter().any(|c| c.generated_kind.is_some()) {
2245 return false;
2246 }
2247 let pk = ts.pk_indices();
2248 !assignments.iter().any(|(ci, _)| pk.contains(ci))
2249}
2250
2251#[allow(clippy::too_many_arguments)]
2252#[inline]
2253fn apply_do_update_fused(
2254 wtx: &mut WriteTxn<'_>,
2255 table_schema: &TableSchema,
2256 table_bytes: &[u8],
2257 key_buf: &[u8],
2258 value_buf: &[u8],
2259 proposed_row: &[Value],
2260 assignments: &[(usize, Expr)],
2261 where_clause: Option<&Expr>,
2262 col_map: &ColumnMap,
2263 fast_paths: Option<&[DoUpdateFastPath]>,
2264 capture_returning: bool,
2265) -> Result<InsertRowOutcome> {
2266 let non_pk = table_schema.non_pk_indices();
2267 let enc_pos = table_schema.encoding_positions();
2268 let phys_count = table_schema.physical_non_pk_count();
2269 let dropped = table_schema.dropped_non_pk_slots();
2270 let has_checks = table_schema.has_checks();
2271 let has_fks = !table_schema.foreign_keys.is_empty();
2272
2273 let captured: std::cell::RefCell<Option<(Vec<Value>, Vec<Value>)>> =
2274 std::cell::RefCell::new(None);
2275
2276 let outcome =
2277 wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
2278 if let Some(fps) = fast_paths {
2279 if !has_checks {
2280 let action = apply_fast_path_patch(old_bytes, fps)?;
2281 if capture_returning {
2282 if let UpsertAction::Replace(ref new_bytes) = action {
2283 let old_row = decode_full_row(table_schema, key_buf, old_bytes)?;
2284 let new_row = decode_full_row(table_schema, key_buf, new_bytes)?;
2285 *captured.borrow_mut() = Some((old_row, new_row));
2286 }
2287 }
2288 return Ok(action);
2289 }
2290 }
2291 UPSERT_SCRATCH.with(|slot| {
2292 let mut bufs = slot.borrow_mut();
2293 let UpsertBufs {
2294 old_row,
2295 new_row,
2296 value_values,
2297 new_value_buf,
2298 } = &mut *bufs;
2299
2300 old_row.clear();
2301 old_row.resize(table_schema.columns.len(), Value::Null);
2302 decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
2303
2304 if let Some(w) = where_clause {
2305 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2306 let result = eval_expr(w, &ctx)?;
2307 if result.is_null() || !is_truthy(&result) {
2308 return Ok(UpsertAction::Skip);
2309 }
2310 }
2311
2312 new_row.clear();
2313 new_row.extend_from_slice(old_row);
2314 for (col_idx, expr) in assignments {
2315 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2316 let val = eval_expr(expr, &ctx)?;
2317 let col = &table_schema.columns[*col_idx];
2318 new_row[*col_idx] = if val.is_null() {
2319 Value::Null
2320 } else {
2321 let got = val.data_type();
2322 val.coerce_into(col.data_type)
2323 .ok_or_else(|| SqlError::TypeMismatch {
2324 expected: col.data_type.to_string(),
2325 got: got.to_string(),
2326 })?
2327 };
2328 }
2329
2330 for (assigned_idx, _) in assignments {
2331 let col = &table_schema.columns[*assigned_idx];
2332 if !col.nullable && new_row[col.position as usize].is_null() {
2333 return Err(SqlError::NotNullViolation(col.name.clone()));
2334 }
2335 }
2336 if has_checks {
2337 for col in &table_schema.columns {
2338 if let Some(ref check) = col.check_expr {
2339 let ctx = EvalCtx::new(col_map, new_row);
2340 let result = eval_expr(check, &ctx)?;
2341 if !is_truthy(&result) && !result.is_null() {
2342 let name = col.check_name.as_deref().unwrap_or(&col.name);
2343 return Err(SqlError::CheckViolation(name.to_string()));
2344 }
2345 }
2346 }
2347 for tc in &table_schema.check_constraints {
2348 let ctx = EvalCtx::new(col_map, new_row);
2349 let result = eval_expr(&tc.expr, &ctx)?;
2350 if !is_truthy(&result) && !result.is_null() {
2351 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2352 return Err(SqlError::CheckViolation(name.to_string()));
2353 }
2354 }
2355 }
2356 let _ = has_fks;
2357
2358 value_values.clear();
2359 value_values.resize(phys_count, Value::Null);
2360 for &slot in dropped {
2361 value_values[slot as usize] = Value::Null;
2362 }
2363 for (j, &i) in non_pk.iter().enumerate() {
2364 value_values[enc_pos[j] as usize] = new_row[i].clone();
2365 }
2366 new_value_buf.clear();
2367 crate::encoding::encode_row_into(value_values, new_value_buf);
2368
2369 if new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2370 return Err(SqlError::RowTooLarge {
2371 size: new_value_buf.len(),
2372 max: citadel_core::MAX_INLINE_VALUE_SIZE,
2373 });
2374 }
2375
2376 if capture_returning {
2377 *captured.borrow_mut() = Some((old_row.clone(), new_row.clone()));
2378 }
2379 Ok(UpsertAction::Replace(new_value_buf.clone()))
2380 })
2381 })?;
2382
2383 match outcome {
2384 UpsertOutcome::Inserted => Ok(InsertRowOutcome::Inserted),
2385 UpsertOutcome::Updated => {
2386 if capture_returning {
2387 let (old, new) = captured.into_inner().ok_or_else(|| {
2388 SqlError::InvalidValue("DO UPDATE produced no captured rows".into())
2389 })?;
2390 Ok(InsertRowOutcome::Updated { old, new })
2391 } else {
2392 Ok(InsertRowOutcome::Inserted)
2393 }
2394 }
2395 UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
2396 }
2397}
2398
2399fn fetch_unique_index_pk(
2400 wtx: &mut WriteTxn<'_>,
2401 table_schema: &TableSchema,
2402 index_idx: usize,
2403 row: &[Value],
2404) -> Result<Vec<u8>> {
2405 let idx = &table_schema.indices[index_idx];
2406 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2407 let indexed: Vec<Value> = idx
2408 .columns
2409 .iter()
2410 .map(|&col_idx| row[col_idx as usize].clone())
2411 .collect();
2412 let key = crate::encoding::encode_composite_key(&indexed);
2413 let value = wtx
2414 .table_get(&idx_table, &key)
2415 .map_err(SqlError::Storage)?
2416 .ok_or_else(|| {
2417 SqlError::InvalidValue("unique index missing expected collision entry".into())
2418 })?;
2419 Ok(value)
2420}
2421
2422#[allow(clippy::too_many_arguments)]
2423fn apply_do_update(
2424 wtx: &mut WriteTxn<'_>,
2425 table_schema: &TableSchema,
2426 pk_key: &[u8],
2427 proposed_row: &[Value],
2428 assignments: &[(usize, Expr)],
2429 where_clause: Option<&Expr>,
2430 col_map: &ColumnMap,
2431 capture_returning: bool,
2432) -> Result<InsertRowOutcome> {
2433 let old_value = wtx
2434 .table_get(table_schema.name.as_bytes(), pk_key)
2435 .map_err(SqlError::Storage)?
2436 .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
2437 let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
2438 apply_do_update_with_old_row(
2439 wtx,
2440 table_schema,
2441 pk_key,
2442 &old_row,
2443 proposed_row,
2444 assignments,
2445 where_clause,
2446 col_map,
2447 capture_returning,
2448 )
2449}
2450
2451#[allow(clippy::too_many_arguments)]
2452fn apply_do_update_with_old_row(
2453 wtx: &mut WriteTxn<'_>,
2454 table_schema: &TableSchema,
2455 old_pk_key: &[u8],
2456 old_row: &[Value],
2457 proposed_row: &[Value],
2458 assignments: &[(usize, Expr)],
2459 where_clause: Option<&Expr>,
2460 col_map: &ColumnMap,
2461 capture_returning: bool,
2462) -> Result<InsertRowOutcome> {
2463 if let Some(w) = where_clause {
2464 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2465 let result = eval_expr(w, &ctx)?;
2466 if result.is_null() || !is_truthy(&result) {
2467 return Ok(InsertRowOutcome::Skipped);
2468 }
2469 }
2470
2471 let mut new_row = old_row.to_vec();
2472 for (col_idx, expr) in assignments {
2473 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
2474 let val = eval_expr(expr, &ctx)?;
2475 let col = &table_schema.columns[*col_idx];
2476 new_row[*col_idx] = if val.is_null() {
2477 Value::Null
2478 } else {
2479 let got = val.data_type();
2480 val.coerce_into(col.data_type)
2481 .ok_or_else(|| SqlError::TypeMismatch {
2482 expected: col.data_type.to_string(),
2483 got: got.to_string(),
2484 })?
2485 };
2486 }
2487
2488 for col in &table_schema.columns {
2489 if matches!(
2490 col.generated_kind,
2491 Some(crate::parser::GeneratedKind::Stored)
2492 ) {
2493 let val = eval_expr(
2494 col.generated_expr.as_ref().unwrap(),
2495 &EvalCtx::new(col_map, &new_row),
2496 )?;
2497 let pos = col.position as usize;
2498 new_row[pos] = if val.is_null() {
2499 if !col.nullable {
2500 return Err(SqlError::NotNullViolation(col.name.clone()));
2501 }
2502 Value::Null
2503 } else {
2504 let got = val.data_type();
2505 val.coerce_into(col.data_type)
2506 .ok_or_else(|| SqlError::TypeMismatch {
2507 expected: col.data_type.to_string(),
2508 got: got.to_string(),
2509 })?
2510 };
2511 }
2512 }
2513
2514 let pk_indices = table_schema.pk_indices();
2515 let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
2516 let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
2517
2518 for (assigned_idx, _) in assignments {
2519 let col = &table_schema.columns[*assigned_idx];
2520 if !col.nullable && new_row[col.position as usize].is_null() {
2521 return Err(SqlError::NotNullViolation(col.name.clone()));
2522 }
2523 }
2524 if table_schema.has_checks() {
2525 for col in &table_schema.columns {
2526 if let Some(ref check) = col.check_expr {
2527 let ctx = EvalCtx::new(col_map, &new_row);
2528 let result = eval_expr(check, &ctx)?;
2529 if !is_truthy(&result) && !result.is_null() {
2530 let name = col.check_name.as_deref().unwrap_or(&col.name);
2531 return Err(SqlError::CheckViolation(name.to_string()));
2532 }
2533 }
2534 }
2535 for tc in &table_schema.check_constraints {
2536 let ctx = EvalCtx::new(col_map, &new_row);
2537 let result = eval_expr(&tc.expr, &ctx)?;
2538 if !is_truthy(&result) && !result.is_null() {
2539 let name = tc.name.as_deref().unwrap_or(&tc.sql);
2540 return Err(SqlError::CheckViolation(name.to_string()));
2541 }
2542 }
2543 }
2544 for fk in &table_schema.foreign_keys {
2545 let changed = fk
2546 .columns
2547 .iter()
2548 .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
2549 if !changed {
2550 continue;
2551 }
2552 let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
2553 if any_null {
2554 continue;
2555 }
2556 let fk_vals: Vec<Value> = fk
2557 .columns
2558 .iter()
2559 .map(|&ci| new_row[ci as usize].clone())
2560 .collect();
2561 let fk_key = crate::encoding::encode_composite_key(&fk_vals);
2562 let found = wtx
2563 .table_get(fk.foreign_table.as_bytes(), &fk_key)
2564 .map_err(SqlError::Storage)?;
2565 if found.is_none() {
2566 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
2567 return Err(SqlError::ForeignKeyViolation(name.to_string()));
2568 }
2569 }
2570
2571 let has_indices = !table_schema.indices.is_empty();
2572 let old_pk_values: Vec<Value> = if has_indices || pk_changed {
2573 pk_indices.iter().map(|&i| old_row[i].clone()).collect()
2574 } else {
2575 Vec::new()
2576 };
2577 let new_pk_values: Vec<Value> = if has_indices || pk_changed {
2578 pk_indices.iter().map(|&i| new_row[i].clone()).collect()
2579 } else {
2580 Vec::new()
2581 };
2582
2583 let non_pk = table_schema.non_pk_indices();
2584 let enc_pos = table_schema.encoding_positions();
2585 let phys_count = table_schema.physical_non_pk_count();
2586 let dropped = table_schema.dropped_non_pk_slots();
2587 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
2588 for &slot in dropped {
2589 value_values[slot as usize] = Value::Null;
2590 }
2591 for (j, &i) in non_pk.iter().enumerate() {
2592 let col = &table_schema.columns[i];
2593 value_values[enc_pos[j] as usize] = if matches!(
2594 col.generated_kind,
2595 Some(crate::parser::GeneratedKind::Virtual)
2596 ) {
2597 Value::Null
2598 } else {
2599 new_row[i].clone()
2600 };
2601 }
2602 let mut new_value_buf = Vec::with_capacity(256);
2603 crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
2604
2605 if new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2606 return Err(SqlError::RowTooLarge {
2607 size: new_value_buf.len(),
2608 max: citadel_core::MAX_INLINE_VALUE_SIZE,
2609 });
2610 }
2611
2612 if pk_changed {
2613 let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
2614 let inserted = wtx
2615 .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
2616 .map_err(SqlError::Storage)?;
2617 if !inserted {
2618 return Err(SqlError::DuplicateKey);
2619 }
2620 wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
2621 .map_err(SqlError::Storage)?;
2622 for idx in &table_schema.indices {
2623 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2624 let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2625 wtx.table_delete(&idx_table, &old_idx_key)
2626 .map_err(SqlError::Storage)?;
2627 let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2628 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2629 let is_new = wtx
2630 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2631 .map_err(SqlError::Storage)?;
2632 if idx.unique && !is_new {
2633 let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2634 if !any_null {
2635 return Err(SqlError::UniqueViolation(idx.name.clone()));
2636 }
2637 }
2638 }
2639 } else {
2640 wtx.table_update_sorted(
2641 table_schema.name.as_bytes(),
2642 &[(old_pk_key, new_value_buf.as_slice())],
2643 )
2644 .map_err(SqlError::Storage)?;
2645 for idx in &table_schema.indices {
2646 if !index_columns_changed(idx, old_row, &new_row) {
2647 continue;
2648 }
2649 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2650 let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2651 wtx.table_delete(&idx_table, &old_idx_key)
2652 .map_err(SqlError::Storage)?;
2653 let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2654 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2655 let is_new = wtx
2656 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2657 .map_err(SqlError::Storage)?;
2658 if idx.unique && !is_new {
2659 let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2660 if !any_null {
2661 return Err(SqlError::UniqueViolation(idx.name.clone()));
2662 }
2663 }
2664 }
2665 }
2666
2667 if capture_returning {
2668 Ok(InsertRowOutcome::Updated {
2669 old: old_row.to_vec(),
2670 new: new_row,
2671 })
2672 } else {
2673 Ok(InsertRowOutcome::Inserted)
2674 }
2675}
2676
2677fn detect_fast_paths(
2678 ts: &TableSchema,
2679 assignments: &[(usize, Expr)],
2680) -> Option<Vec<DoUpdateFastPath>> {
2681 let non_pk = ts.non_pk_indices();
2682 let enc_pos = ts.encoding_positions();
2683 let mut out = Vec::with_capacity(assignments.len());
2684 for (col_idx, expr) in assignments {
2685 let col = &ts.columns[*col_idx];
2686 if col.data_type != DataType::Integer {
2687 return None;
2688 }
2689 let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
2690 let phys_idx = enc_pos[nonpk_order] as usize;
2691
2692 if let Expr::BinaryOp { left, op, right } = expr {
2693 if !matches!(op, BinOp::Add | BinOp::Sub) {
2694 return None;
2695 }
2696 let reads_target =
2697 matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
2698 if !reads_target {
2699 return None;
2700 }
2701 if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
2702 let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
2703 let _ = col_idx;
2704 out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
2705 continue;
2706 }
2707 return None;
2708 }
2709 return None;
2710 }
2711 Some(out)
2712}
2713
2714fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
2715 let target = oc
2716 .target
2717 .as_ref()
2718 .map(|t| resolve_conflict_target(t, ts))
2719 .transpose()?;
2720 match &oc.action {
2721 OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
2722 OnConflictAction::DoUpdate {
2723 assignments,
2724 where_clause,
2725 } => {
2726 let target = target.ok_or_else(|| {
2727 SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
2728 })?;
2729 let compiled_assignments: Vec<(usize, Expr)> = assignments
2730 .iter()
2731 .map(|(name, expr)| {
2732 let col_idx = ts
2733 .column_index(name)
2734 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
2735 Ok((col_idx, expr.clone()))
2736 })
2737 .collect::<Result<_>>()?;
2738 let fast_paths = if where_clause.is_none() {
2739 detect_fast_paths(ts, &compiled_assignments)
2740 } else {
2741 None
2742 };
2743 Ok(CompiledOnConflict::DoUpdate {
2744 target,
2745 assignments: compiled_assignments,
2746 where_clause: where_clause.clone(),
2747 fast_paths,
2748 })
2749 }
2750 }
2751}
2752
2753fn exec_insert_trivial_fast(
2755 wtx: &mut WriteTxn<'_>,
2756 table_lower: &str,
2757 cache: &InsertCache,
2758 bufs: &mut InsertBufs,
2759 params: &[Value],
2760) -> Result<ExecutionResult> {
2761 let prog = cache
2762 .trivial_fast_program
2763 .as_ref()
2764 .expect("trivial fast: program");
2765
2766 for &p in &prog.not_null_param_indices {
2767 if params[p as usize].is_null() {
2768 return Err(SqlError::NotNullViolation(format!("param@{p}")));
2769 }
2770 }
2771
2772 match ¶ms[prog.pk_param as usize] {
2773 Value::Integer(v) => crate::encoding::encode_int_key_into(*v, &mut bufs.key_buf),
2774 _ => return Err(SqlError::InvalidValue("non-integer PK in fast path".into())),
2775 }
2776
2777 bufs.value_buf.clear();
2778 bufs.value_buf.extend_from_slice(&prog.template);
2779
2780 for op in &prog.ops {
2781 match op {
2782 WriteOp::ParamI64 { param_idx, off } => match ¶ms[*param_idx as usize] {
2783 Value::Integer(v) => {
2784 let off = *off as usize;
2785 bufs.value_buf[off..off + 8].copy_from_slice(&v.to_le_bytes());
2786 }
2787 other => {
2788 return Err(SqlError::TypeMismatch {
2789 expected: "Integer".into(),
2790 got: other.data_type().to_string(),
2791 });
2792 }
2793 },
2794 WriteOp::LiteralI64 { value, off } => {
2795 let off = *off as usize;
2796 bufs.value_buf[off..off + 8].copy_from_slice(&value.to_le_bytes());
2797 }
2798 WriteOp::GenAddParamsI64 {
2799 a_param,
2800 b_param,
2801 off,
2802 bitmap_byte_off,
2803 bitmap_bit_mask,
2804 } => match (¶ms[*a_param as usize], ¶ms[*b_param as usize]) {
2805 (Value::Integer(a), Value::Integer(b)) => {
2806 let off = *off as usize;
2807 bufs.value_buf[off..off + 8].copy_from_slice(&a.wrapping_add(*b).to_le_bytes());
2808 }
2809 _ => {
2810 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2811 }
2812 },
2813 WriteOp::GenMulAddParamI64 {
2814 param_idx,
2815 mul,
2816 add,
2817 off,
2818 bitmap_byte_off,
2819 bitmap_bit_mask,
2820 } => match ¶ms[*param_idx as usize] {
2821 Value::Integer(v) => {
2822 let r = v.wrapping_mul(*mul).wrapping_add(*add);
2823 let off = *off as usize;
2824 bufs.value_buf[off..off + 8].copy_from_slice(&r.to_le_bytes());
2825 }
2826 _ => {
2827 bufs.value_buf[*bitmap_byte_off as usize] |= *bitmap_bit_mask;
2828 }
2829 },
2830 }
2831 }
2832
2833 let is_new = wtx
2834 .table_insert(table_lower.as_bytes(), &bufs.key_buf, &bufs.value_buf)
2835 .map_err(SqlError::Storage)?;
2836 if !is_new {
2837 return Err(SqlError::DuplicateKey);
2838 }
2839 Ok(ExecutionResult::RowsAffected(1))
2840}
2841
2842fn build_bind_plan(
2843 stmt: &InsertStmt,
2844 col_indices: &[usize],
2845 col_data_types: &[DataType],
2846) -> Option<Vec<BindAction>> {
2847 let rows = match &stmt.source {
2848 InsertSource::Values(rows) => rows,
2849 _ => return None,
2850 };
2851 if rows.len() != 1 {
2852 return None;
2853 }
2854 let value_row = &rows[0];
2855 if value_row.len() != col_indices.len() {
2856 return None;
2857 }
2858 let mut plan = Vec::with_capacity(value_row.len());
2859 for (i, expr) in value_row.iter().enumerate() {
2860 let col_idx = col_indices[i];
2861 let target = col_data_types[col_idx];
2862 match expr {
2863 Expr::Parameter(n) => {
2864 if *n == 0 {
2865 return None;
2866 }
2867 plan.push(BindAction::Param {
2868 param_idx: n - 1,
2869 col_idx,
2870 target,
2871 });
2872 }
2873 Expr::Literal(v) => plan.push(BindAction::Literal {
2874 value: v.clone(),
2875 col_idx,
2876 }),
2877 _ => return None,
2878 }
2879 }
2880 Some(plan)
2881}
2882
2883impl CompiledInsert {
2884 pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
2885 let lower = stmt.table.to_ascii_lowercase();
2886 let cached = if let Some(ts) = schema.get(&lower) {
2887 let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
2888 ts.columns.iter().map(|c| c.name.as_str()).collect()
2889 } else {
2890 stmt.columns.iter().map(|s| s.as_str()).collect()
2891 };
2892 let mut col_indices = Vec::with_capacity(insert_columns.len());
2893 for name in &insert_columns {
2894 col_indices.push(ts.column_index(name)?);
2895 }
2896 if col_indices
2897 .iter()
2898 .any(|&ci| ts.columns[ci].generated_kind.is_some())
2899 {
2900 return None;
2901 }
2902 let on_conflict = stmt
2903 .on_conflict
2904 .as_ref()
2905 .map(|oc| compile_on_conflict(oc, ts))
2906 .transpose()
2907 .ok()
2908 .flatten()
2909 .map(Arc::new);
2910 let generated_col_positions: Vec<usize> = ts
2911 .columns
2912 .iter()
2913 .enumerate()
2914 .filter_map(|(i, c)| {
2915 matches!(c.generated_kind, Some(crate::parser::GeneratedKind::Stored))
2916 .then_some(i)
2917 })
2918 .collect();
2919 let generated_fast_evals: Vec<FastGenEval> = generated_col_positions
2920 .iter()
2921 .map(|&pos| {
2922 detect_fast_gen_eval(ts.columns[pos].generated_expr.as_ref().unwrap(), ts)
2923 })
2924 .collect();
2925 let row_col_map = if on_conflict.is_some() || !generated_col_positions.is_empty() {
2926 Some(ColumnMap::new(&ts.columns))
2927 } else {
2928 None
2929 };
2930 let pk_indices: Vec<usize> = ts.pk_indices().to_vec();
2931 let non_pk_indices: Vec<usize> = ts.non_pk_indices().to_vec();
2932 let encoding_positions: Vec<u16> = ts.encoding_positions().to_vec();
2933 let dropped_non_pk_slots: Vec<u16> = ts.dropped_non_pk_slots().to_vec();
2934 let phys_count = ts.physical_non_pk_count();
2935 let col_data_types: Vec<DataType> = ts.columns.iter().map(|c| c.data_type).collect();
2936 let single_int_pk =
2937 pk_indices.len() == 1 && ts.columns[pk_indices[0]].data_type == DataType::Integer;
2938 let not_null_indices: Vec<u16> = ts
2939 .columns
2940 .iter()
2941 .filter(|c| !c.nullable)
2942 .map(|c| c.position)
2943 .collect();
2944 let bind_plan = build_bind_plan(stmt, &col_indices, &col_data_types);
2945 let any_defaults_flag = ts.columns.iter().any(|c| c.default_expr.is_some());
2946 let row_fully_overwritten = if any_defaults_flag {
2947 false
2948 } else {
2949 let mut covered: rustc_hash::FxHashSet<usize> =
2950 col_indices.iter().copied().collect();
2951 covered.extend(generated_col_positions.iter().copied());
2952 for (j, &i) in non_pk_indices.iter().enumerate() {
2953 let _ = j;
2954 if matches!(
2955 ts.columns[i].generated_kind,
2956 Some(crate::parser::GeneratedKind::Virtual)
2957 ) {
2958 covered.insert(i);
2959 }
2960 }
2961 bind_plan.is_some() && covered.len() == ts.columns.len()
2962 };
2963 let has_fks = !ts.foreign_keys.is_empty();
2964 let has_indices = !ts.indices.is_empty();
2965 let mut non_virtual_pairs: Vec<(usize, usize)> = Vec::new();
2966 let mut null_value_slots: Vec<usize> =
2967 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
2968 for (j, &i) in non_pk_indices.iter().enumerate() {
2969 let slot = encoding_positions[j] as usize;
2970 if matches!(
2971 ts.columns[i].generated_kind,
2972 Some(crate::parser::GeneratedKind::Virtual)
2973 ) {
2974 null_value_slots.push(slot);
2975 } else {
2976 non_virtual_pairs.push((i, slot));
2977 }
2978 }
2979 let row_encoder = {
2980 let all_int_or_null = non_pk_indices.iter().enumerate().all(|(j, &i)| {
2981 let col = &ts.columns[i];
2982 if matches!(
2983 col.generated_kind,
2984 Some(crate::parser::GeneratedKind::Virtual)
2985 ) {
2986 true
2987 } else {
2988 col.data_type == DataType::Integer && encoding_positions[j] != u16::MAX
2989 }
2990 });
2991 if all_int_or_null {
2992 let mut null_slots: Vec<usize> =
2993 dropped_non_pk_slots.iter().map(|&s| s as usize).collect();
2994 for (j, &i) in non_pk_indices.iter().enumerate() {
2995 if matches!(
2996 ts.columns[i].generated_kind,
2997 Some(crate::parser::GeneratedKind::Virtual)
2998 ) {
2999 null_slots.push(encoding_positions[j] as usize);
3000 }
3001 }
3002 Some(crate::encoding::build_int_row_template(
3003 phys_count,
3004 &null_slots,
3005 ))
3006 } else {
3007 None
3008 }
3009 };
3010 let is_trivial_fast_eligible = !insert_has_subquery(stmt)
3011 && !ts.columns.iter().any(|c| c.default_expr.is_some())
3012 && !ts.has_checks()
3013 && !has_fks
3014 && !has_indices
3015 && stmt.on_conflict.is_none()
3016 && stmt.returning.is_none()
3017 && bind_plan.is_some()
3018 && row_encoder.is_some()
3019 && row_fully_overwritten
3020 && single_int_pk
3021 && generated_fast_evals
3022 .iter()
3023 .all(|fe| !matches!(fe, FastGenEval::None));
3024 let trivial_fast_program = if is_trivial_fast_eligible {
3025 build_trivial_fast_program(
3026 bind_plan.as_ref().unwrap(),
3027 row_encoder.as_ref().unwrap(),
3028 &non_virtual_pairs,
3029 &generated_col_positions,
3030 &generated_fast_evals,
3031 &pk_indices,
3032 &ts.columns,
3033 )
3034 } else {
3035 None
3036 };
3037 let is_trivial_fast = trivial_fast_program.is_some();
3038 Some(InsertCache {
3039 col_indices,
3040 has_subquery: insert_has_subquery(stmt),
3041 any_defaults: ts.columns.iter().any(|c| c.default_expr.is_some()),
3042 has_checks: ts.has_checks(),
3043 on_conflict,
3044 row_col_map,
3045 generated_col_positions,
3046 generated_fast_evals,
3047 pk_indices,
3048 non_pk_indices,
3049 encoding_positions,
3050 dropped_non_pk_slots,
3051 phys_count,
3052 single_int_pk,
3053 not_null_indices,
3054 bind_plan,
3055 row_fully_overwritten,
3056 row_encoder,
3057 is_trivial_fast,
3058 trivial_fast_program,
3059 })
3060 } else if schema.get_view(&lower).is_some() {
3061 None
3062 } else {
3063 return None;
3064 };
3065 Some(Self {
3066 table_lower: lower,
3067 cached,
3068 })
3069 }
3070}
3071
3072impl CompiledPlan for CompiledInsert {
3073 fn execute(
3074 &self,
3075 db: &Database,
3076 schema: &SchemaManager,
3077 stmt: &Statement,
3078 params: &[Value],
3079 wtx: Option<&mut WriteTxn<'_>>,
3080 ) -> Result<ExecutionResult> {
3081 let ins = match stmt {
3082 Statement::Insert(i) => i,
3083 _ => {
3084 return Err(SqlError::Unsupported(
3085 "CompiledInsert received non-INSERT statement".into(),
3086 ))
3087 }
3088 };
3089 match wtx {
3090 None => exec_insert(db, schema, ins, params),
3091 Some(outer) => match self.cached.as_ref() {
3092 Some(c) if c.is_trivial_fast => with_insert_scratch(|bufs| {
3093 exec_insert_trivial_fast(outer, &self.table_lower, c, bufs, params)
3094 }),
3095 Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
3096 None => exec_insert_in_txn(outer, schema, ins, params),
3097 },
3098 }
3099 }
3100
3101 fn uses_scoped_params(&self) -> bool {
3102 match self.cached.as_ref() {
3103 Some(c) => !c.is_trivial_fast,
3104 None => true,
3105 }
3106 }
3107}
3108
/// Compiled plan for a `DELETE` statement.
pub struct CompiledDelete {
    /// ASCII-lowercased name of the table the statement targets.
    table_lower: String,
}
3112
3113impl CompiledDelete {
3114 pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
3115 let lower = stmt.table.to_ascii_lowercase();
3116 schema.get(&lower)?;
3117 Some(Self { table_lower: lower })
3118 }
3119}
3120
3121impl CompiledPlan for CompiledDelete {
3122 fn execute(
3123 &self,
3124 db: &Database,
3125 schema: &SchemaManager,
3126 stmt: &Statement,
3127 _params: &[Value],
3128 wtx: Option<&mut WriteTxn<'_>>,
3129 ) -> Result<ExecutionResult> {
3130 let del = match stmt {
3131 Statement::Delete(d) => d,
3132 _ => {
3133 return Err(SqlError::Unsupported(
3134 "CompiledDelete received non-DELETE statement".into(),
3135 ))
3136 }
3137 };
3138 let _ = &self.table_lower;
3139 match wtx {
3140 None => super::write::exec_delete(db, schema, del),
3141 Some(outer) => super::write::exec_delete_in_txn(outer, schema, del),
3142 }
3143 }
3144}