1use std::cell::RefCell;
2use std::sync::Arc;
3
4use citadel::Database;
5use citadel_buffer::btree::{UpsertAction, UpsertOutcome};
6use citadel_txn::write_txn::WriteTxn;
7use rustc_hash::FxHashMap;
8
9use crate::encoding::{encode_composite_key_into, encode_row_into};
10use crate::error::{Result, SqlError};
11use crate::eval::{eval_expr, is_truthy, ColumnMap, EvalCtx};
12use crate::parser::*;
13use crate::types::*;
14
15use crate::schema::SchemaManager;
16
17use super::compile::CompiledPlan;
18use super::helpers::*;
19use super::CteContext;
20
21pub(super) fn exec_insert(
22 db: &Database,
23 schema: &SchemaManager,
24 stmt: &InsertStmt,
25 params: &[Value],
26) -> Result<ExecutionResult> {
27 let empty_ctes = CteContext::default();
28 let materialized;
29 let stmt = if insert_has_subquery(stmt) {
30 materialized = materialize_insert(stmt, &mut |sub| {
31 exec_subquery_read(db, schema, sub, &empty_ctes)
32 })?;
33 &materialized
34 } else {
35 stmt
36 };
37
38 let lower_name = stmt.table.to_ascii_lowercase();
39 if schema.get_view(&lower_name).is_some() {
40 return Err(SqlError::CannotModifyView(stmt.table.clone()));
41 }
42 let table_schema = schema
43 .get(&lower_name)
44 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
45
46 let insert_columns = if stmt.columns.is_empty() {
47 table_schema
48 .columns
49 .iter()
50 .map(|c| c.name.clone())
51 .collect::<Vec<_>>()
52 } else {
53 stmt.columns
54 .iter()
55 .map(|c| c.to_ascii_lowercase())
56 .collect()
57 };
58
59 let col_indices: Vec<usize> = insert_columns
60 .iter()
61 .map(|name| {
62 table_schema
63 .column_index(name)
64 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
65 })
66 .collect::<Result<_>>()?;
67
68 let defaults: Vec<(usize, &Expr)> = table_schema
69 .columns
70 .iter()
71 .filter(|c| c.default_expr.is_some() && !col_indices.contains(&(c.position as usize)))
72 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
73 .collect();
74
75 let has_checks = table_schema.has_checks();
77 let check_col_map = if has_checks {
78 Some(ColumnMap::new(&table_schema.columns))
79 } else {
80 None
81 };
82
83 let select_rows = match &stmt.source {
84 InsertSource::Select(sq) => {
85 let insert_ctes =
86 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
87 exec_query_body_read(db, schema, body, ctx)
88 })?;
89 let qr = exec_query_body_read(db, schema, &sq.body, &insert_ctes)?;
90 Some(qr.rows)
91 }
92 InsertSource::Values(_) => None,
93 };
94
95 let compiled_conflict: Option<Arc<CompiledOnConflict>> = stmt
96 .on_conflict
97 .as_ref()
98 .map(|oc| compile_on_conflict(oc, table_schema).map(Arc::new))
99 .transpose()?;
100
101 let row_col_map = compiled_conflict
102 .as_ref()
103 .map(|_| ColumnMap::new(&table_schema.columns));
104
105 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
106 let mut count: u64 = 0;
107
108 let pk_indices = table_schema.pk_indices();
109 let non_pk = table_schema.non_pk_indices();
110 let enc_pos = table_schema.encoding_positions();
111 let phys_count = table_schema.physical_non_pk_count();
112 let mut row = vec![Value::Null; table_schema.columns.len()];
113 let mut pk_values: Vec<Value> = vec![Value::Null; pk_indices.len()];
114 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
115 let mut key_buf: Vec<u8> = Vec::with_capacity(64);
116 let mut value_buf: Vec<u8> = Vec::with_capacity(256);
117 let mut fk_key_buf: Vec<u8> = Vec::with_capacity(64);
118
119 let values = match &stmt.source {
120 InsertSource::Values(rows) => Some(rows.as_slice()),
121 InsertSource::Select(_) => None,
122 };
123 let sel_rows = select_rows.as_deref();
124
125 let total = match (values, sel_rows) {
126 (Some(rows), _) => rows.len(),
127 (_, Some(rows)) => rows.len(),
128 _ => 0,
129 };
130
131 if let Some(sel) = sel_rows {
132 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
133 return Err(SqlError::InvalidValue(format!(
134 "INSERT ... SELECT column count mismatch: expected {}, got {}",
135 insert_columns.len(),
136 sel[0].len()
137 )));
138 }
139 }
140
141 for idx in 0..total {
142 for v in row.iter_mut() {
143 *v = Value::Null;
144 }
145
146 if let Some(value_rows) = values {
147 let value_row = &value_rows[idx];
148 if value_row.len() != insert_columns.len() {
149 return Err(SqlError::InvalidValue(format!(
150 "expected {} values, got {}",
151 insert_columns.len(),
152 value_row.len()
153 )));
154 }
155 for (i, expr) in value_row.iter().enumerate() {
156 let val = if let Expr::Parameter(n) = expr {
157 params
158 .get(n - 1)
159 .cloned()
160 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?
161 } else {
162 eval_const_expr(expr)?
163 };
164 let col_idx = col_indices[i];
165 let col = &table_schema.columns[col_idx];
166 let got_type = val.data_type();
167 row[col_idx] = if val.is_null() {
168 Value::Null
169 } else {
170 val.coerce_into(col.data_type)
171 .ok_or_else(|| SqlError::TypeMismatch {
172 expected: col.data_type.to_string(),
173 got: got_type.to_string(),
174 })?
175 };
176 }
177 } else if let Some(sel) = sel_rows {
178 let sel_row = &sel[idx];
179 for (i, val) in sel_row.iter().enumerate() {
180 let col_idx = col_indices[i];
181 let col = &table_schema.columns[col_idx];
182 let got_type = val.data_type();
183 row[col_idx] = if val.is_null() {
184 Value::Null
185 } else {
186 val.clone().coerce_into(col.data_type).ok_or_else(|| {
187 SqlError::TypeMismatch {
188 expected: col.data_type.to_string(),
189 got: got_type.to_string(),
190 }
191 })?
192 };
193 }
194 }
195
196 for &(pos, def_expr) in &defaults {
197 let val = eval_const_expr(def_expr)?;
198 let col = &table_schema.columns[pos];
199 if val.is_null() {
200 } else {
202 let got_type = val.data_type();
203 row[pos] =
204 val.coerce_into(col.data_type)
205 .ok_or_else(|| SqlError::TypeMismatch {
206 expected: col.data_type.to_string(),
207 got: got_type.to_string(),
208 })?;
209 }
210 }
211
212 for col in &table_schema.columns {
213 if !col.nullable && row[col.position as usize].is_null() {
214 return Err(SqlError::NotNullViolation(col.name.clone()));
215 }
216 }
217
218 if let Some(ref col_map) = check_col_map {
219 for col in &table_schema.columns {
220 if let Some(ref check) = col.check_expr {
221 let result = eval_expr(check, &EvalCtx::new(col_map, &row))?;
222 if !is_truthy(&result) && !result.is_null() {
223 let name = col.check_name.as_deref().unwrap_or(&col.name);
224 return Err(SqlError::CheckViolation(name.to_string()));
225 }
226 }
227 }
228 for tc in &table_schema.check_constraints {
229 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &row))?;
230 if !is_truthy(&result) && !result.is_null() {
231 let name = tc.name.as_deref().unwrap_or(&tc.sql);
232 return Err(SqlError::CheckViolation(name.to_string()));
233 }
234 }
235 }
236
237 for fk in &table_schema.foreign_keys {
238 let any_null = fk.columns.iter().any(|&ci| row[ci as usize].is_null());
239 if any_null {
240 continue; }
242 let fk_vals: Vec<Value> = fk
243 .columns
244 .iter()
245 .map(|&ci| row[ci as usize].clone())
246 .collect();
247 fk_key_buf.clear();
248 encode_composite_key_into(&fk_vals, &mut fk_key_buf);
249 let found = wtx
250 .table_get(fk.foreign_table.as_bytes(), &fk_key_buf)
251 .map_err(SqlError::Storage)?;
252 if found.is_none() {
253 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
254 return Err(SqlError::ForeignKeyViolation(name.to_string()));
255 }
256 }
257
258 for (j, &i) in pk_indices.iter().enumerate() {
259 pk_values[j] = std::mem::replace(&mut row[i], Value::Null);
260 }
261 encode_composite_key_into(&pk_values, &mut key_buf);
262
263 for (j, &i) in non_pk.iter().enumerate() {
264 value_values[enc_pos[j] as usize] = std::mem::replace(&mut row[i], Value::Null);
265 }
266 encode_row_into(&value_values, &mut value_buf);
267
268 if key_buf.len() > citadel_core::MAX_KEY_SIZE {
269 return Err(SqlError::KeyTooLarge {
270 size: key_buf.len(),
271 max: citadel_core::MAX_KEY_SIZE,
272 });
273 }
274 if value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
275 return Err(SqlError::RowTooLarge {
276 size: value_buf.len(),
277 max: citadel_core::MAX_INLINE_VALUE_SIZE,
278 });
279 }
280
281 match compiled_conflict.as_ref() {
282 None => {
283 let is_new = wtx
284 .table_insert(stmt.table.as_bytes(), &key_buf, &value_buf)
285 .map_err(SqlError::Storage)?;
286 if !is_new {
287 return Err(SqlError::DuplicateKey);
288 }
289 if !table_schema.indices.is_empty() {
290 for (j, &i) in pk_indices.iter().enumerate() {
291 row[i] = pk_values[j].clone();
292 }
293 for (j, &i) in non_pk.iter().enumerate() {
294 row[i] =
295 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
296 }
297 insert_index_entries(&mut wtx, table_schema, &row, &pk_values)?;
298 }
299 count += 1;
300 }
301 Some(oc) => {
302 let oc_ref: &CompiledOnConflict = oc;
303 let needs_row = upsert_needs_row(oc_ref, table_schema);
304 if needs_row {
305 for (j, &i) in pk_indices.iter().enumerate() {
306 row[i] = pk_values[j].clone();
307 }
308 for (j, &i) in non_pk.iter().enumerate() {
309 row[i] =
310 std::mem::replace(&mut value_values[enc_pos[j] as usize], Value::Null);
311 }
312 }
313 match apply_insert_with_conflict(
314 &mut wtx,
315 table_schema,
316 &key_buf,
317 &value_buf,
318 &row,
319 &pk_values,
320 oc_ref,
321 row_col_map.as_ref().unwrap(),
322 )? {
323 InsertRowOutcome::Inserted => count += 1,
324 InsertRowOutcome::Skipped => {}
325 }
326 }
327 }
328 }
329
330 wtx.commit().map_err(SqlError::Storage)?;
331 Ok(ExecutionResult::RowsAffected(count))
332}
333
334pub(super) fn has_subquery(expr: &Expr) -> bool {
335 crate::parser::has_subquery(expr)
336}
337
338pub(super) fn stmt_has_subquery(stmt: &SelectStmt) -> bool {
339 if let Some(ref w) = stmt.where_clause {
340 if has_subquery(w) {
341 return true;
342 }
343 }
344 if let Some(ref h) = stmt.having {
345 if has_subquery(h) {
346 return true;
347 }
348 }
349 for col in &stmt.columns {
350 if let SelectColumn::Expr { expr, .. } = col {
351 if has_subquery(expr) {
352 return true;
353 }
354 }
355 }
356 for ob in &stmt.order_by {
357 if has_subquery(&ob.expr) {
358 return true;
359 }
360 }
361 for join in &stmt.joins {
362 if let Some(ref on_expr) = join.on_clause {
363 if has_subquery(on_expr) {
364 return true;
365 }
366 }
367 }
368 false
369}
370
371pub(super) fn materialize_expr(
372 expr: &Expr,
373 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
374) -> Result<Expr> {
375 match expr {
376 Expr::InSubquery {
377 expr: e,
378 subquery,
379 negated,
380 } => {
381 let inner = materialize_expr(e, exec_sub)?;
382 let qr = exec_sub(subquery)?;
383 if !qr.columns.is_empty() && qr.columns.len() != 1 {
384 return Err(SqlError::SubqueryMultipleColumns);
385 }
386 let mut values = std::collections::HashSet::new();
387 let mut has_null = false;
388 for row in &qr.rows {
389 if row[0].is_null() {
390 has_null = true;
391 } else {
392 values.insert(row[0].clone());
393 }
394 }
395 Ok(Expr::InSet {
396 expr: Box::new(inner),
397 values,
398 has_null,
399 negated: *negated,
400 })
401 }
402 Expr::ScalarSubquery(subquery) => {
403 let qr = exec_sub(subquery)?;
404 if qr.rows.len() > 1 {
405 return Err(SqlError::SubqueryMultipleRows);
406 }
407 let val = if qr.rows.is_empty() {
408 Value::Null
409 } else {
410 qr.rows[0][0].clone()
411 };
412 Ok(Expr::Literal(val))
413 }
414 Expr::Exists { subquery, negated } => {
415 let qr = exec_sub(subquery)?;
416 let exists = !qr.rows.is_empty();
417 let result = if *negated { !exists } else { exists };
418 Ok(Expr::Literal(Value::Boolean(result)))
419 }
420 Expr::InList {
421 expr: e,
422 list,
423 negated,
424 } => {
425 let inner = materialize_expr(e, exec_sub)?;
426 let items = list
427 .iter()
428 .map(|item| materialize_expr(item, exec_sub))
429 .collect::<Result<Vec<_>>>()?;
430 Ok(Expr::InList {
431 expr: Box::new(inner),
432 list: items,
433 negated: *negated,
434 })
435 }
436 Expr::BinaryOp { left, op, right } => Ok(Expr::BinaryOp {
437 left: Box::new(materialize_expr(left, exec_sub)?),
438 op: *op,
439 right: Box::new(materialize_expr(right, exec_sub)?),
440 }),
441 Expr::UnaryOp { op, expr: e } => Ok(Expr::UnaryOp {
442 op: *op,
443 expr: Box::new(materialize_expr(e, exec_sub)?),
444 }),
445 Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(materialize_expr(e, exec_sub)?))),
446 Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(materialize_expr(e, exec_sub)?))),
447 Expr::InSet {
448 expr: e,
449 values,
450 has_null,
451 negated,
452 } => Ok(Expr::InSet {
453 expr: Box::new(materialize_expr(e, exec_sub)?),
454 values: values.clone(),
455 has_null: *has_null,
456 negated: *negated,
457 }),
458 Expr::Between {
459 expr: e,
460 low,
461 high,
462 negated,
463 } => Ok(Expr::Between {
464 expr: Box::new(materialize_expr(e, exec_sub)?),
465 low: Box::new(materialize_expr(low, exec_sub)?),
466 high: Box::new(materialize_expr(high, exec_sub)?),
467 negated: *negated,
468 }),
469 Expr::Like {
470 expr: e,
471 pattern,
472 escape,
473 negated,
474 } => {
475 let esc = escape
476 .as_ref()
477 .map(|es| materialize_expr(es, exec_sub).map(Box::new))
478 .transpose()?;
479 Ok(Expr::Like {
480 expr: Box::new(materialize_expr(e, exec_sub)?),
481 pattern: Box::new(materialize_expr(pattern, exec_sub)?),
482 escape: esc,
483 negated: *negated,
484 })
485 }
486 Expr::Case {
487 operand,
488 conditions,
489 else_result,
490 } => {
491 let op = operand
492 .as_ref()
493 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
494 .transpose()?;
495 let conds = conditions
496 .iter()
497 .map(|(c, r)| {
498 Ok((
499 materialize_expr(c, exec_sub)?,
500 materialize_expr(r, exec_sub)?,
501 ))
502 })
503 .collect::<Result<Vec<_>>>()?;
504 let else_r = else_result
505 .as_ref()
506 .map(|e| materialize_expr(e, exec_sub).map(Box::new))
507 .transpose()?;
508 Ok(Expr::Case {
509 operand: op,
510 conditions: conds,
511 else_result: else_r,
512 })
513 }
514 Expr::Coalesce(args) => {
515 let materialized = args
516 .iter()
517 .map(|a| materialize_expr(a, exec_sub))
518 .collect::<Result<Vec<_>>>()?;
519 Ok(Expr::Coalesce(materialized))
520 }
521 Expr::Cast { expr: e, data_type } => Ok(Expr::Cast {
522 expr: Box::new(materialize_expr(e, exec_sub)?),
523 data_type: *data_type,
524 }),
525 Expr::Function {
526 name,
527 args,
528 distinct,
529 } => {
530 let materialized = args
531 .iter()
532 .map(|a| materialize_expr(a, exec_sub))
533 .collect::<Result<Vec<_>>>()?;
534 Ok(Expr::Function {
535 name: name.clone(),
536 args: materialized,
537 distinct: *distinct,
538 })
539 }
540 other => Ok(other.clone()),
541 }
542}
543
544pub(super) fn materialize_stmt(
545 stmt: &SelectStmt,
546 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
547) -> Result<SelectStmt> {
548 let where_clause = stmt
549 .where_clause
550 .as_ref()
551 .map(|e| materialize_expr(e, exec_sub))
552 .transpose()?;
553 let having = stmt
554 .having
555 .as_ref()
556 .map(|e| materialize_expr(e, exec_sub))
557 .transpose()?;
558 let columns = stmt
559 .columns
560 .iter()
561 .map(|c| match c {
562 SelectColumn::AllColumns => Ok(SelectColumn::AllColumns),
563 SelectColumn::Expr { expr, alias } => Ok(SelectColumn::Expr {
564 expr: materialize_expr(expr, exec_sub)?,
565 alias: alias.clone(),
566 }),
567 })
568 .collect::<Result<Vec<_>>>()?;
569 let order_by = stmt
570 .order_by
571 .iter()
572 .map(|ob| {
573 Ok(OrderByItem {
574 expr: materialize_expr(&ob.expr, exec_sub)?,
575 descending: ob.descending,
576 nulls_first: ob.nulls_first,
577 })
578 })
579 .collect::<Result<Vec<_>>>()?;
580 let joins = stmt
581 .joins
582 .iter()
583 .map(|j| {
584 let on_clause = j
585 .on_clause
586 .as_ref()
587 .map(|e| materialize_expr(e, exec_sub))
588 .transpose()?;
589 Ok(JoinClause {
590 join_type: j.join_type,
591 table: j.table.clone(),
592 on_clause,
593 })
594 })
595 .collect::<Result<Vec<_>>>()?;
596 let group_by = stmt
597 .group_by
598 .iter()
599 .map(|e| materialize_expr(e, exec_sub))
600 .collect::<Result<Vec<_>>>()?;
601 Ok(SelectStmt {
602 columns,
603 from: stmt.from.clone(),
604 from_alias: stmt.from_alias.clone(),
605 joins,
606 distinct: stmt.distinct,
607 where_clause,
608 order_by,
609 limit: stmt.limit.clone(),
610 offset: stmt.offset.clone(),
611 group_by,
612 having,
613 })
614}
615
616pub(super) fn exec_subquery_read(
617 db: &Database,
618 schema: &SchemaManager,
619 stmt: &SelectStmt,
620 ctes: &CteContext,
621) -> Result<QueryResult> {
622 match super::exec_select(db, schema, stmt, ctes)? {
623 ExecutionResult::Query(qr) => Ok(qr),
624 _ => Ok(QueryResult {
625 columns: vec![],
626 rows: vec![],
627 }),
628 }
629}
630
631pub(super) fn exec_subquery_write(
632 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
633 schema: &SchemaManager,
634 stmt: &SelectStmt,
635 ctes: &CteContext,
636) -> Result<QueryResult> {
637 match super::exec_select_in_txn(wtx, schema, stmt, ctes)? {
638 ExecutionResult::Query(qr) => Ok(qr),
639 _ => Ok(QueryResult {
640 columns: vec![],
641 rows: vec![],
642 }),
643 }
644}
645
646pub(super) fn update_has_subquery(stmt: &UpdateStmt) -> bool {
647 stmt.where_clause.as_ref().is_some_and(has_subquery)
648 || stmt.assignments.iter().any(|(_, e)| has_subquery(e))
649}
650
651pub(super) fn materialize_update(
652 stmt: &UpdateStmt,
653 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
654) -> Result<UpdateStmt> {
655 let where_clause = stmt
656 .where_clause
657 .as_ref()
658 .map(|e| materialize_expr(e, exec_sub))
659 .transpose()?;
660 let assignments = stmt
661 .assignments
662 .iter()
663 .map(|(name, expr)| Ok((name.clone(), materialize_expr(expr, exec_sub)?)))
664 .collect::<Result<Vec<_>>>()?;
665 Ok(UpdateStmt {
666 table: stmt.table.clone(),
667 assignments,
668 where_clause,
669 })
670}
671
672pub(super) fn delete_has_subquery(stmt: &DeleteStmt) -> bool {
673 stmt.where_clause.as_ref().is_some_and(has_subquery)
674}
675
676pub(super) fn materialize_delete(
677 stmt: &DeleteStmt,
678 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
679) -> Result<DeleteStmt> {
680 let where_clause = stmt
681 .where_clause
682 .as_ref()
683 .map(|e| materialize_expr(e, exec_sub))
684 .transpose()?;
685 Ok(DeleteStmt {
686 table: stmt.table.clone(),
687 where_clause,
688 })
689}
690
691pub(super) fn insert_has_subquery(stmt: &InsertStmt) -> bool {
692 match &stmt.source {
693 InsertSource::Values(rows) => rows.iter().any(|row| row.iter().any(has_subquery)),
694 InsertSource::Select(_) => false,
696 }
697}
698
699pub(super) fn materialize_insert(
700 stmt: &InsertStmt,
701 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
702) -> Result<InsertStmt> {
703 let source = match &stmt.source {
704 InsertSource::Values(rows) => {
705 let mat = rows
706 .iter()
707 .map(|row| {
708 row.iter()
709 .map(|e| materialize_expr(e, exec_sub))
710 .collect::<Result<Vec<_>>>()
711 })
712 .collect::<Result<Vec<_>>>()?;
713 InsertSource::Values(mat)
714 }
715 InsertSource::Select(sq) => {
716 let ctes = sq
717 .ctes
718 .iter()
719 .map(|c| {
720 Ok(CteDefinition {
721 name: c.name.clone(),
722 column_aliases: c.column_aliases.clone(),
723 body: materialize_query_body(&c.body, exec_sub)?,
724 })
725 })
726 .collect::<Result<Vec<_>>>()?;
727 let body = materialize_query_body(&sq.body, exec_sub)?;
728 InsertSource::Select(Box::new(SelectQuery {
729 ctes,
730 recursive: sq.recursive,
731 body,
732 }))
733 }
734 };
735 Ok(InsertStmt {
736 table: stmt.table.clone(),
737 columns: stmt.columns.clone(),
738 source,
739 on_conflict: stmt.on_conflict.clone(),
740 })
741}
742
743pub(super) fn materialize_query_body(
744 body: &QueryBody,
745 exec_sub: &mut dyn FnMut(&SelectStmt) -> Result<QueryResult>,
746) -> Result<QueryBody> {
747 match body {
748 QueryBody::Select(sel) => Ok(QueryBody::Select(Box::new(materialize_stmt(
749 sel, exec_sub,
750 )?))),
751 QueryBody::Compound(comp) => Ok(QueryBody::Compound(Box::new(CompoundSelect {
752 op: comp.op.clone(),
753 all: comp.all,
754 left: Box::new(materialize_query_body(&comp.left, exec_sub)?),
755 right: Box::new(materialize_query_body(&comp.right, exec_sub)?),
756 order_by: comp.order_by.clone(),
757 limit: comp.limit.clone(),
758 offset: comp.offset.clone(),
759 }))),
760 }
761}
762
763pub(super) fn exec_query_body(
764 db: &Database,
765 schema: &SchemaManager,
766 body: &QueryBody,
767 ctes: &CteContext,
768) -> Result<ExecutionResult> {
769 match body {
770 QueryBody::Select(sel) => super::exec_select(db, schema, sel, ctes),
771 QueryBody::Compound(comp) => exec_compound_select(db, schema, comp, ctes),
772 }
773}
774
775pub(super) fn exec_query_body_in_txn(
776 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
777 schema: &SchemaManager,
778 body: &QueryBody,
779 ctes: &CteContext,
780) -> Result<ExecutionResult> {
781 match body {
782 QueryBody::Select(sel) => super::exec_select_in_txn(wtx, schema, sel, ctes),
783 QueryBody::Compound(comp) => exec_compound_select_in_txn(wtx, schema, comp, ctes),
784 }
785}
786
787pub(super) fn exec_query_body_read(
788 db: &Database,
789 schema: &SchemaManager,
790 body: &QueryBody,
791 ctes: &CteContext,
792) -> Result<QueryResult> {
793 match exec_query_body(db, schema, body, ctes)? {
794 ExecutionResult::Query(qr) => Ok(qr),
795 _ => Ok(QueryResult {
796 columns: vec![],
797 rows: vec![],
798 }),
799 }
800}
801
802pub(super) fn exec_query_body_write(
803 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
804 schema: &SchemaManager,
805 body: &QueryBody,
806 ctes: &CteContext,
807) -> Result<QueryResult> {
808 match exec_query_body_in_txn(wtx, schema, body, ctes)? {
809 ExecutionResult::Query(qr) => Ok(qr),
810 _ => Ok(QueryResult {
811 columns: vec![],
812 rows: vec![],
813 }),
814 }
815}
816
817pub(super) fn exec_compound_select(
818 db: &Database,
819 schema: &SchemaManager,
820 comp: &CompoundSelect,
821 ctes: &CteContext,
822) -> Result<ExecutionResult> {
823 let left_qr = match exec_query_body(db, schema, &comp.left, ctes)? {
824 ExecutionResult::Query(qr) => qr,
825 _ => QueryResult {
826 columns: vec![],
827 rows: vec![],
828 },
829 };
830 let right_qr = match exec_query_body(db, schema, &comp.right, ctes)? {
831 ExecutionResult::Query(qr) => qr,
832 _ => QueryResult {
833 columns: vec![],
834 rows: vec![],
835 },
836 };
837 apply_set_operation(comp, left_qr, right_qr)
838}
839
840pub(super) fn exec_compound_select_in_txn(
841 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
842 schema: &SchemaManager,
843 comp: &CompoundSelect,
844 ctes: &CteContext,
845) -> Result<ExecutionResult> {
846 let left_qr = match exec_query_body_in_txn(wtx, schema, &comp.left, ctes)? {
847 ExecutionResult::Query(qr) => qr,
848 _ => QueryResult {
849 columns: vec![],
850 rows: vec![],
851 },
852 };
853 let right_qr = match exec_query_body_in_txn(wtx, schema, &comp.right, ctes)? {
854 ExecutionResult::Query(qr) => qr,
855 _ => QueryResult {
856 columns: vec![],
857 rows: vec![],
858 },
859 };
860 apply_set_operation(comp, left_qr, right_qr)
861}
862
863pub(super) fn apply_set_operation(
864 comp: &CompoundSelect,
865 left_qr: QueryResult,
866 right_qr: QueryResult,
867) -> Result<ExecutionResult> {
868 if !left_qr.columns.is_empty()
869 && !right_qr.columns.is_empty()
870 && left_qr.columns.len() != right_qr.columns.len()
871 {
872 return Err(SqlError::CompoundColumnCountMismatch {
873 left: left_qr.columns.len(),
874 right: right_qr.columns.len(),
875 });
876 }
877
878 let columns = left_qr.columns;
879
880 let mut rows = match (&comp.op, comp.all) {
881 (SetOp::Union, true) => {
882 let mut rows = left_qr.rows;
883 rows.extend(right_qr.rows);
884 rows
885 }
886 (SetOp::Union, false) => {
887 let mut seen: std::collections::HashSet<Vec<Value>> = std::collections::HashSet::new();
888 let mut rows = Vec::new();
889 for row in left_qr.rows.into_iter().chain(right_qr.rows) {
890 if !seen.contains(&row) {
891 seen.insert(row.clone());
892 rows.push(row);
893 }
894 }
895 rows
896 }
897 (SetOp::Intersect, true) => {
898 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
899 for row in &right_qr.rows {
900 *right_counts.entry(row.clone()).or_insert(0) += 1;
901 }
902 let mut rows = Vec::new();
903 for row in left_qr.rows {
904 if let Some(count) = right_counts.get_mut(&row) {
905 if *count > 0 {
906 *count -= 1;
907 rows.push(row);
908 }
909 }
910 }
911 rows
912 }
913 (SetOp::Intersect, false) => {
914 let right_set: std::collections::HashSet<Vec<Value>> =
915 right_qr.rows.into_iter().collect();
916 let mut seen: std::collections::HashSet<Vec<Value>> = std::collections::HashSet::new();
917 let mut rows = Vec::new();
918 for row in left_qr.rows {
919 if right_set.contains(&row) && !seen.contains(&row) {
920 seen.insert(row.clone());
921 rows.push(row);
922 }
923 }
924 rows
925 }
926 (SetOp::Except, true) => {
927 let mut right_counts: FxHashMap<Vec<Value>, usize> = FxHashMap::default();
928 for row in &right_qr.rows {
929 *right_counts.entry(row.clone()).or_insert(0) += 1;
930 }
931 let mut rows = Vec::new();
932 for row in left_qr.rows {
933 if let Some(count) = right_counts.get_mut(&row) {
934 if *count > 0 {
935 *count -= 1;
936 continue;
937 }
938 }
939 rows.push(row);
940 }
941 rows
942 }
943 (SetOp::Except, false) => {
944 let right_set: std::collections::HashSet<Vec<Value>> =
945 right_qr.rows.into_iter().collect();
946 let mut seen: std::collections::HashSet<Vec<Value>> = std::collections::HashSet::new();
947 let mut rows = Vec::new();
948 for row in left_qr.rows {
949 if !right_set.contains(&row) && !seen.contains(&row) {
950 seen.insert(row.clone());
951 rows.push(row);
952 }
953 }
954 rows
955 }
956 };
957
958 if !comp.order_by.is_empty() {
959 let col_defs: Vec<crate::types::ColumnDef> = columns
960 .iter()
961 .enumerate()
962 .map(|(i, name)| crate::types::ColumnDef {
963 name: name.clone(),
964 data_type: crate::types::DataType::Null,
965 nullable: true,
966 position: i as u16,
967 default_expr: None,
968 default_sql: None,
969 check_expr: None,
970 check_sql: None,
971 check_name: None,
972 is_with_timezone: false,
973 })
974 .collect();
975 sort_rows(&mut rows, &comp.order_by, &col_defs)?;
976 }
977
978 if let Some(ref offset_expr) = comp.offset {
979 let offset = eval_const_int(offset_expr)?.max(0) as usize;
980 if offset < rows.len() {
981 rows = rows.split_off(offset);
982 } else {
983 rows.clear();
984 }
985 }
986
987 if let Some(ref limit_expr) = comp.limit {
988 let limit = eval_const_int(limit_expr)?.max(0) as usize;
989 rows.truncate(limit);
990 }
991
992 Ok(ExecutionResult::Query(QueryResult { columns, rows }))
993}
994
995struct InsertBufs {
996 row: Vec<Value>,
997 pk_values: Vec<Value>,
998 value_values: Vec<Value>,
999 key_buf: Vec<u8>,
1000 value_buf: Vec<u8>,
1001 col_indices: Vec<usize>,
1002 fk_key_buf: Vec<u8>,
1003}
1004
1005impl InsertBufs {
1006 fn new() -> Self {
1007 Self {
1008 row: Vec::new(),
1009 pk_values: Vec::new(),
1010 value_values: Vec::new(),
1011 key_buf: Vec::with_capacity(64),
1012 value_buf: Vec::with_capacity(256),
1013 col_indices: Vec::new(),
1014 fk_key_buf: Vec::with_capacity(64),
1015 }
1016 }
1017}
1018
1019thread_local! {
1020 static INSERT_SCRATCH: RefCell<InsertBufs> = RefCell::new(InsertBufs::new());
1021 static UPSERT_SCRATCH: RefCell<UpsertBufs> = RefCell::new(UpsertBufs::new());
1022}
1023
1024fn with_insert_scratch<R>(f: impl FnOnce(&mut InsertBufs) -> R) -> R {
1025 INSERT_SCRATCH.with(|slot| f(&mut slot.borrow_mut()))
1026}
1027
1028pub(super) struct UpsertBufs {
1029 old_row: Vec<Value>,
1030 new_row: Vec<Value>,
1031 value_values: Vec<Value>,
1032 new_value_buf: Vec<u8>,
1033}
1034
1035impl UpsertBufs {
1036 pub(super) fn new() -> Self {
1037 Self {
1038 old_row: Vec::new(),
1039 new_row: Vec::new(),
1040 value_values: Vec::new(),
1041 new_value_buf: Vec::with_capacity(256),
1042 }
1043 }
1044}
1045
1046pub fn exec_insert_in_txn(
1047 wtx: &mut WriteTxn<'_>,
1048 schema: &SchemaManager,
1049 stmt: &InsertStmt,
1050 params: &[Value],
1051) -> Result<ExecutionResult> {
1052 with_insert_scratch(|bufs| exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, None))
1053}
1054
1055fn exec_insert_in_txn_cached(
1056 wtx: &mut WriteTxn<'_>,
1057 schema: &SchemaManager,
1058 stmt: &InsertStmt,
1059 params: &[Value],
1060 cache: &InsertCache,
1061) -> Result<ExecutionResult> {
1062 with_insert_scratch(|bufs| {
1063 exec_insert_in_txn_impl(wtx, schema, stmt, params, bufs, Some(cache))
1064 })
1065}
1066
1067fn exec_insert_in_txn_impl(
1068 wtx: &mut WriteTxn<'_>,
1069 schema: &SchemaManager,
1070 stmt: &InsertStmt,
1071 params: &[Value],
1072 bufs: &mut InsertBufs,
1073 cache: Option<&InsertCache>,
1074) -> Result<ExecutionResult> {
1075 let empty_ctes = CteContext::default();
1076 let materialized;
1077 let has_sub = match cache {
1078 Some(c) => c.has_subquery,
1079 None => insert_has_subquery(stmt),
1080 };
1081 let stmt = if has_sub {
1082 materialized = materialize_insert(stmt, &mut |sub| {
1083 exec_subquery_write(wtx, schema, sub, &empty_ctes)
1084 })?;
1085 &materialized
1086 } else {
1087 stmt
1088 };
1089
1090 let table_schema = schema
1091 .get(&stmt.table)
1092 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1093
1094 let default_columns;
1095 let insert_columns: &[String] = if stmt.columns.is_empty() {
1096 default_columns = table_schema
1097 .columns
1098 .iter()
1099 .map(|c| c.name.clone())
1100 .collect::<Vec<_>>();
1101 &default_columns
1102 } else {
1103 &stmt.columns
1104 };
1105
1106 bufs.col_indices.clear();
1107 if let Some(c) = cache {
1108 bufs.col_indices.extend_from_slice(&c.col_indices);
1109 } else {
1110 for name in insert_columns {
1111 bufs.col_indices.push(
1112 table_schema
1113 .column_index(name)
1114 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?,
1115 );
1116 }
1117 }
1118
1119 let any_defaults = match cache {
1120 Some(c) => c.any_defaults,
1121 None => table_schema
1122 .columns
1123 .iter()
1124 .any(|c| c.default_expr.is_some()),
1125 };
1126 let defaults: Vec<(usize, &Expr)> = if any_defaults {
1127 table_schema
1128 .columns
1129 .iter()
1130 .filter(|c| {
1131 c.default_expr.is_some() && !bufs.col_indices.contains(&(c.position as usize))
1132 })
1133 .map(|c| (c.position as usize, c.default_expr.as_ref().unwrap()))
1134 .collect()
1135 } else {
1136 Vec::new()
1137 };
1138
1139 let has_checks = match cache {
1140 Some(c) => c.has_checks,
1141 None => table_schema.has_checks(),
1142 };
1143 let check_col_map = if has_checks {
1144 Some(ColumnMap::new(&table_schema.columns))
1145 } else {
1146 None
1147 };
1148
1149 let pk_indices = table_schema.pk_indices();
1150 let non_pk = table_schema.non_pk_indices();
1151 let enc_pos = table_schema.encoding_positions();
1152 let phys_count = table_schema.physical_non_pk_count();
1153 let dropped = table_schema.dropped_non_pk_slots();
1154
1155 bufs.row.resize(table_schema.columns.len(), Value::Null);
1156 bufs.pk_values.resize(pk_indices.len(), Value::Null);
1157 bufs.value_values.resize(phys_count, Value::Null);
1158
1159 let table_bytes = stmt.table.as_bytes();
1160 let has_fks = !table_schema.foreign_keys.is_empty();
1161 let has_indices = !table_schema.indices.is_empty();
1162 let has_defaults = !defaults.is_empty();
1163
1164 let compiled_conflict: Option<Arc<CompiledOnConflict>> = match (cache, &stmt.on_conflict) {
1165 (Some(c), Some(_)) if c.on_conflict.is_some() => c.on_conflict.clone(),
1166 (_, Some(oc)) => Some(Arc::new(compile_on_conflict(oc, table_schema)?)),
1167 (_, None) => None,
1168 };
1169
1170 let row_col_map_owned: Option<ColumnMap> =
1171 if compiled_conflict.is_some() && cache.and_then(|c| c.row_col_map.as_ref()).is_none() {
1172 Some(ColumnMap::new(&table_schema.columns))
1173 } else {
1174 None
1175 };
1176 let row_col_map: Option<&ColumnMap> = cache
1177 .and_then(|c| c.row_col_map.as_ref())
1178 .or(row_col_map_owned.as_ref());
1179
1180 let select_rows = match &stmt.source {
1181 InsertSource::Select(sq) => {
1182 let insert_ctes =
1183 super::materialize_all_ctes(&sq.ctes, sq.recursive, &mut |body, ctx| {
1184 exec_query_body_write(wtx, schema, body, ctx)
1185 })?;
1186 let qr = exec_query_body_write(wtx, schema, &sq.body, &insert_ctes)?;
1187 Some(qr.rows)
1188 }
1189 InsertSource::Values(_) => None,
1190 };
1191
1192 let mut count: u64 = 0;
1193
1194 let values = match &stmt.source {
1195 InsertSource::Values(rows) => Some(rows.as_slice()),
1196 InsertSource::Select(_) => None,
1197 };
1198 let sel_rows = select_rows.as_deref();
1199
1200 let total = match (values, sel_rows) {
1201 (Some(rows), _) => rows.len(),
1202 (_, Some(rows)) => rows.len(),
1203 _ => 0,
1204 };
1205
1206 if let Some(sel) = sel_rows {
1207 if !sel.is_empty() && sel[0].len() != insert_columns.len() {
1208 return Err(SqlError::InvalidValue(format!(
1209 "INSERT ... SELECT column count mismatch: expected {}, got {}",
1210 insert_columns.len(),
1211 sel[0].len()
1212 )));
1213 }
1214 }
1215
1216 for idx in 0..total {
1217 for v in bufs.row.iter_mut() {
1218 *v = Value::Null;
1219 }
1220
1221 if let Some(value_rows) = values {
1222 let value_row = &value_rows[idx];
1223 if value_row.len() != insert_columns.len() {
1224 return Err(SqlError::InvalidValue(format!(
1225 "expected {} values, got {}",
1226 insert_columns.len(),
1227 value_row.len()
1228 )));
1229 }
1230 for (i, expr) in value_row.iter().enumerate() {
1231 let val = match expr {
1232 Expr::Parameter(n) => params
1233 .get(n - 1)
1234 .cloned()
1235 .ok_or_else(|| SqlError::Parse(format!("unbound parameter ${n}")))?,
1236 Expr::Literal(v) => v.clone(),
1237 _ => eval_const_expr(expr)?,
1238 };
1239 let col_idx = bufs.col_indices[i];
1240 let col = &table_schema.columns[col_idx];
1241 let got_type = val.data_type();
1242 bufs.row[col_idx] = if val.is_null() {
1243 Value::Null
1244 } else {
1245 val.coerce_into(col.data_type)
1246 .ok_or_else(|| SqlError::TypeMismatch {
1247 expected: col.data_type.to_string(),
1248 got: got_type.to_string(),
1249 })?
1250 };
1251 }
1252 } else if let Some(sel) = sel_rows {
1253 let sel_row = &sel[idx];
1254 for (i, val) in sel_row.iter().enumerate() {
1255 let col_idx = bufs.col_indices[i];
1256 let col = &table_schema.columns[col_idx];
1257 let got_type = val.data_type();
1258 bufs.row[col_idx] = if val.is_null() {
1259 Value::Null
1260 } else {
1261 val.clone().coerce_into(col.data_type).ok_or_else(|| {
1262 SqlError::TypeMismatch {
1263 expected: col.data_type.to_string(),
1264 got: got_type.to_string(),
1265 }
1266 })?
1267 };
1268 }
1269 }
1270
1271 if has_defaults {
1272 for &(pos, def_expr) in &defaults {
1273 let val = eval_const_expr(def_expr)?;
1274 let col = &table_schema.columns[pos];
1275 if !val.is_null() {
1276 let got_type = val.data_type();
1277 bufs.row[pos] =
1278 val.coerce_into(col.data_type)
1279 .ok_or_else(|| SqlError::TypeMismatch {
1280 expected: col.data_type.to_string(),
1281 got: got_type.to_string(),
1282 })?;
1283 }
1284 }
1285 }
1286
1287 for col in &table_schema.columns {
1288 if !col.nullable && bufs.row[col.position as usize].is_null() {
1289 return Err(SqlError::NotNullViolation(col.name.clone()));
1290 }
1291 }
1292
1293 if let Some(ref col_map) = check_col_map {
1294 for col in &table_schema.columns {
1295 if let Some(ref check) = col.check_expr {
1296 let result = eval_expr(check, &EvalCtx::new(col_map, &bufs.row))?;
1297 if !is_truthy(&result) && !result.is_null() {
1298 let name = col.check_name.as_deref().unwrap_or(&col.name);
1299 return Err(SqlError::CheckViolation(name.to_string()));
1300 }
1301 }
1302 }
1303 for tc in &table_schema.check_constraints {
1304 let result = eval_expr(&tc.expr, &EvalCtx::new(col_map, &bufs.row))?;
1305 if !is_truthy(&result) && !result.is_null() {
1306 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1307 return Err(SqlError::CheckViolation(name.to_string()));
1308 }
1309 }
1310 }
1311
1312 if has_fks {
1313 for fk in &table_schema.foreign_keys {
1314 let any_null = fk.columns.iter().any(|&ci| bufs.row[ci as usize].is_null());
1315 if any_null {
1316 continue;
1317 }
1318 let fk_vals: Vec<Value> = fk
1319 .columns
1320 .iter()
1321 .map(|&ci| bufs.row[ci as usize].clone())
1322 .collect();
1323 bufs.fk_key_buf.clear();
1324 encode_composite_key_into(&fk_vals, &mut bufs.fk_key_buf);
1325 let found = wtx
1326 .table_get(fk.foreign_table.as_bytes(), &bufs.fk_key_buf)
1327 .map_err(SqlError::Storage)?;
1328 if found.is_none() {
1329 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1330 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1331 }
1332 }
1333 }
1334
1335 for (j, &i) in pk_indices.iter().enumerate() {
1336 bufs.pk_values[j] = std::mem::replace(&mut bufs.row[i], Value::Null);
1337 }
1338 encode_composite_key_into(&bufs.pk_values, &mut bufs.key_buf);
1339
1340 for &slot in dropped {
1341 bufs.value_values[slot as usize] = Value::Null;
1342 }
1343 for (j, &i) in non_pk.iter().enumerate() {
1344 bufs.value_values[enc_pos[j] as usize] =
1345 std::mem::replace(&mut bufs.row[i], Value::Null);
1346 }
1347 encode_row_into(&bufs.value_values, &mut bufs.value_buf);
1348
1349 if bufs.key_buf.len() > citadel_core::MAX_KEY_SIZE {
1350 return Err(SqlError::KeyTooLarge {
1351 size: bufs.key_buf.len(),
1352 max: citadel_core::MAX_KEY_SIZE,
1353 });
1354 }
1355 if bufs.value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1356 return Err(SqlError::RowTooLarge {
1357 size: bufs.value_buf.len(),
1358 max: citadel_core::MAX_INLINE_VALUE_SIZE,
1359 });
1360 }
1361
1362 match compiled_conflict.as_ref() {
1363 None => {
1364 let is_new = wtx
1365 .table_insert(table_bytes, &bufs.key_buf, &bufs.value_buf)
1366 .map_err(SqlError::Storage)?;
1367 if !is_new {
1368 return Err(SqlError::DuplicateKey);
1369 }
1370 if has_indices {
1371 for (j, &i) in pk_indices.iter().enumerate() {
1372 bufs.row[i] = bufs.pk_values[j].clone();
1373 }
1374 for (j, &i) in non_pk.iter().enumerate() {
1375 bufs.row[i] = std::mem::replace(
1376 &mut bufs.value_values[enc_pos[j] as usize],
1377 Value::Null,
1378 );
1379 }
1380 insert_index_entries(wtx, table_schema, &bufs.row, &bufs.pk_values)?;
1381 }
1382 count += 1;
1383 }
1384 Some(oc) => {
1385 let oc_ref: &CompiledOnConflict = oc;
1386 let needs_row = upsert_needs_row(oc_ref, table_schema);
1387 if needs_row {
1388 for (j, &i) in pk_indices.iter().enumerate() {
1389 bufs.row[i] = bufs.pk_values[j].clone();
1390 }
1391 for (j, &i) in non_pk.iter().enumerate() {
1392 bufs.row[i] = std::mem::replace(
1393 &mut bufs.value_values[enc_pos[j] as usize],
1394 Value::Null,
1395 );
1396 }
1397 }
1398 match apply_insert_with_conflict(
1399 wtx,
1400 table_schema,
1401 &bufs.key_buf,
1402 &bufs.value_buf,
1403 &bufs.row,
1404 &bufs.pk_values,
1405 oc_ref,
1406 row_col_map.unwrap(),
1407 )? {
1408 InsertRowOutcome::Inserted => count += 1,
1409 InsertRowOutcome::Skipped => {}
1410 }
1411 }
1412 }
1413 }
1414
1415 Ok(ExecutionResult::RowsAffected(count))
1416}
1417
1418pub struct CompiledInsert {
1419 table_lower: String,
1420 cached: Option<InsertCache>,
1421}
1422
1423struct InsertCache {
1424 col_indices: Vec<usize>,
1425 has_subquery: bool,
1426 any_defaults: bool,
1427 has_checks: bool,
1428 on_conflict: Option<Arc<CompiledOnConflict>>,
1429 row_col_map: Option<ColumnMap>,
1430}
1431
1432#[derive(Clone)]
1433pub(super) enum CompiledOnConflict {
1434 DoNothing {
1435 target: Option<ConflictKind>,
1436 },
1437 DoUpdate {
1438 target: ConflictKind,
1439 assignments: Vec<(usize, Expr)>,
1440 where_clause: Option<Expr>,
1441 fast_paths: Option<Vec<DoUpdateFastPath>>,
1442 },
1443}
1444
1445#[derive(Clone, Copy)]
1446pub(super) enum DoUpdateFastPath {
1447 IntAddConst { phys_idx: usize, delta: i64 },
1448}
1449
1450#[derive(Clone, Debug)]
1451pub(super) enum ConflictKind {
1452 PrimaryKey,
1453 UniqueIndex { index_idx: usize },
1454}
1455
1456fn resolve_conflict_target(target: &ConflictTarget, ts: &TableSchema) -> Result<ConflictKind> {
1457 match target {
1458 ConflictTarget::Columns(cols) => {
1459 let col_idx_set: Vec<u16> = cols
1460 .iter()
1461 .map(|name| {
1462 ts.column_index(name)
1463 .map(|i| i as u16)
1464 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))
1465 })
1466 .collect::<Result<_>>()?;
1467 let pk_set = ts.primary_key_columns.clone();
1468 if set_equal(&col_idx_set, &pk_set) {
1469 return Ok(ConflictKind::PrimaryKey);
1470 }
1471 for (index_idx, idx) in ts.indices.iter().enumerate() {
1472 if idx.unique && set_equal(&col_idx_set, &idx.columns) {
1473 return Ok(ConflictKind::UniqueIndex { index_idx });
1474 }
1475 }
1476 Err(SqlError::Plan(
1477 "ON CONFLICT target does not match any unique constraint".into(),
1478 ))
1479 }
1480 ConflictTarget::Constraint(name) => {
1481 let lower = name.to_ascii_lowercase();
1482 for (index_idx, idx) in ts.indices.iter().enumerate() {
1483 if idx.name.eq_ignore_ascii_case(&lower) {
1484 if idx.unique {
1485 return Ok(ConflictKind::UniqueIndex { index_idx });
1486 }
1487 return Err(SqlError::Plan(format!(
1488 "ON CONFLICT ON CONSTRAINT '{name}' requires a unique index"
1489 )));
1490 }
1491 }
1492 Err(SqlError::Plan(format!(
1493 "unknown constraint '{name}'; primary keys cannot be referenced by name, use ON CONFLICT (col_list)"
1494 )))
1495 }
1496 }
1497}
1498
/// Returns `true` when `a` and `b` hold the same values with the same
/// multiplicities, regardless of order.
fn set_equal(a: &[u16], b: &[u16]) -> bool {
    // Sorted copy of a slice; the inputs themselves are left untouched.
    let sorted = |items: &[u16]| {
        let mut copy = items.to_vec();
        copy.sort_unstable();
        copy
    };
    a.len() == b.len() && sorted(a) == sorted(b)
}
1509
1510pub(super) enum InsertRowOutcome {
1511 Inserted,
1512 Skipped,
1513}
1514
1515#[allow(clippy::too_many_arguments)]
1516#[inline]
1517pub(super) fn apply_insert_with_conflict(
1518 wtx: &mut WriteTxn<'_>,
1519 table_schema: &TableSchema,
1520 key_buf: &[u8],
1521 value_buf: &[u8],
1522 row: &[Value],
1523 pk_values: &[Value],
1524 on_conflict: &CompiledOnConflict,
1525 col_map: &ColumnMap,
1526) -> Result<InsertRowOutcome> {
1527 let table_bytes = table_schema.name.as_bytes();
1528
1529 if let CompiledOnConflict::DoNothing { target } = on_conflict {
1530 let pk_target = matches!(target, None | Some(ConflictKind::PrimaryKey));
1531 if pk_target && table_schema.indices.is_empty() && table_schema.foreign_keys.is_empty() {
1532 let inserted = wtx
1533 .table_insert_if_absent(table_bytes, key_buf, value_buf)
1534 .map_err(SqlError::Storage)?;
1535 return Ok(if inserted {
1536 InsertRowOutcome::Inserted
1537 } else {
1538 InsertRowOutcome::Skipped
1539 });
1540 }
1541 }
1542
1543 if let CompiledOnConflict::DoUpdate {
1544 target: ConflictKind::PrimaryKey,
1545 assignments,
1546 where_clause,
1547 fast_paths,
1548 } = on_conflict
1549 {
1550 if can_fuse_do_update(table_schema, assignments) {
1551 return apply_do_update_fused(
1552 wtx,
1553 table_schema,
1554 table_bytes,
1555 key_buf,
1556 value_buf,
1557 row,
1558 assignments,
1559 where_clause.as_ref(),
1560 col_map,
1561 fast_paths.as_deref(),
1562 );
1563 }
1564 }
1565
1566 let primary_outcome = wtx
1567 .table_insert_or_fetch(table_bytes, key_buf, value_buf)
1568 .map_err(SqlError::Storage)?;
1569
1570 match primary_outcome {
1571 citadel_txn::write_txn::InsertOutcome::Inserted => {
1572 if table_schema.indices.is_empty() {
1573 return Ok(InsertRowOutcome::Inserted);
1574 }
1575 let mut inserted_keys: Vec<(usize, Vec<u8>)> = Vec::new();
1576 match insert_index_entries_or_fetch(
1577 wtx,
1578 table_schema,
1579 row,
1580 pk_values,
1581 &mut inserted_keys,
1582 )? {
1583 None => Ok(InsertRowOutcome::Inserted),
1584 Some(conflicting_idx) => {
1585 let matches_target =
1586 matches!(on_conflict, CompiledOnConflict::DoNothing { target: None })
1587 || matches!(
1588 on_conflict,
1589 CompiledOnConflict::DoNothing {
1590 target: Some(ConflictKind::UniqueIndex { index_idx }),
1591 } | CompiledOnConflict::DoUpdate {
1592 target: ConflictKind::UniqueIndex { index_idx },
1593 ..
1594 } if *index_idx == conflicting_idx
1595 );
1596 undo_partial_insert(wtx, table_schema, key_buf, &inserted_keys)?;
1597 if !matches_target {
1598 return Err(SqlError::UniqueViolation(
1599 table_schema.indices[conflicting_idx].name.clone(),
1600 ));
1601 }
1602 match on_conflict {
1603 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
1604 CompiledOnConflict::DoUpdate {
1605 assignments,
1606 where_clause,
1607 ..
1608 } => {
1609 let existing_pk =
1610 fetch_unique_index_pk(wtx, table_schema, conflicting_idx, row)?;
1611 apply_do_update(
1612 wtx,
1613 table_schema,
1614 &existing_pk,
1615 row,
1616 assignments,
1617 where_clause.as_ref(),
1618 col_map,
1619 )
1620 }
1621 }
1622 }
1623 }
1624 }
1625 citadel_txn::write_txn::InsertOutcome::Existed(old_bytes) => {
1626 let matches_target = matches!(
1627 on_conflict,
1628 CompiledOnConflict::DoNothing { target: None }
1629 | CompiledOnConflict::DoNothing {
1630 target: Some(ConflictKind::PrimaryKey),
1631 }
1632 | CompiledOnConflict::DoUpdate {
1633 target: ConflictKind::PrimaryKey,
1634 ..
1635 }
1636 );
1637 if !matches_target {
1638 return Err(SqlError::DuplicateKey);
1639 }
1640 match on_conflict {
1641 CompiledOnConflict::DoNothing { .. } => Ok(InsertRowOutcome::Skipped),
1642 CompiledOnConflict::DoUpdate {
1643 assignments,
1644 where_clause,
1645 ..
1646 } => {
1647 let old_row = decode_full_row(table_schema, key_buf, &old_bytes)?;
1648 apply_do_update_with_old_row(
1649 wtx,
1650 table_schema,
1651 key_buf,
1652 &old_row,
1653 row,
1654 assignments,
1655 where_clause.as_ref(),
1656 col_map,
1657 )
1658 }
1659 }
1660 }
1661 }
1662}
1663
1664#[inline]
1665fn apply_fast_path_patch(
1666 old_bytes: &[u8],
1667 fast_paths: &[DoUpdateFastPath],
1668) -> Result<UpsertAction> {
1669 UPSERT_SCRATCH.with(|slot| {
1670 let mut bufs = slot.borrow_mut();
1671 bufs.new_value_buf.clear();
1672 bufs.new_value_buf.extend_from_slice(old_bytes);
1673
1674 let mut patch_scratch: Vec<u8> = Vec::new();
1675
1676 for fp in fast_paths {
1677 match fp {
1678 DoUpdateFastPath::IntAddConst { phys_idx, delta } => {
1679 let decoded =
1680 crate::encoding::decode_columns(&bufs.new_value_buf, &[*phys_idx])?;
1681 let old_val = &decoded[0];
1682 let new_val = match old_val {
1683 Value::Integer(i) => Value::Integer(i.wrapping_add(*delta)),
1684 Value::Null => Value::Null,
1685 _ => {
1686 return Err(SqlError::TypeMismatch {
1687 expected: "INTEGER".into(),
1688 got: old_val.data_type().to_string(),
1689 });
1690 }
1691 };
1692 if !crate::encoding::patch_column_in_place(
1693 &mut bufs.new_value_buf,
1694 *phys_idx,
1695 &new_val,
1696 )? {
1697 patch_scratch.clear();
1698 crate::encoding::patch_row_column(
1699 &bufs.new_value_buf,
1700 *phys_idx,
1701 &new_val,
1702 &mut patch_scratch,
1703 )?;
1704 std::mem::swap(&mut bufs.new_value_buf, &mut patch_scratch);
1705 }
1706 }
1707 }
1708 }
1709
1710 if bufs.new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1711 return Err(SqlError::RowTooLarge {
1712 size: bufs.new_value_buf.len(),
1713 max: citadel_core::MAX_INLINE_VALUE_SIZE,
1714 });
1715 }
1716
1717 Ok(UpsertAction::Replace(bufs.new_value_buf.clone()))
1718 })
1719}
1720
1721fn upsert_needs_row(oc: &CompiledOnConflict, ts: &TableSchema) -> bool {
1722 if !ts.indices.is_empty() {
1723 return true;
1724 }
1725 match oc {
1726 CompiledOnConflict::DoNothing { .. } => false,
1727 CompiledOnConflict::DoUpdate { fast_paths, .. } => fast_paths.is_none() || ts.has_checks(),
1728 }
1729}
1730
1731fn can_fuse_do_update(ts: &TableSchema, assignments: &[(usize, Expr)]) -> bool {
1732 if !ts.indices.is_empty() {
1733 return false;
1734 }
1735 if !ts.foreign_keys.is_empty() {
1736 return false;
1737 }
1738 let pk = ts.pk_indices();
1739 !assignments.iter().any(|(ci, _)| pk.contains(ci))
1740}
1741
1742#[allow(clippy::too_many_arguments)]
1743#[inline]
1744fn apply_do_update_fused(
1745 wtx: &mut WriteTxn<'_>,
1746 table_schema: &TableSchema,
1747 table_bytes: &[u8],
1748 key_buf: &[u8],
1749 value_buf: &[u8],
1750 proposed_row: &[Value],
1751 assignments: &[(usize, Expr)],
1752 where_clause: Option<&Expr>,
1753 col_map: &ColumnMap,
1754 fast_paths: Option<&[DoUpdateFastPath]>,
1755) -> Result<InsertRowOutcome> {
1756 let non_pk = table_schema.non_pk_indices();
1757 let enc_pos = table_schema.encoding_positions();
1758 let phys_count = table_schema.physical_non_pk_count();
1759 let dropped = table_schema.dropped_non_pk_slots();
1760 let has_checks = table_schema.has_checks();
1761 let has_fks = !table_schema.foreign_keys.is_empty();
1762
1763 let outcome =
1764 wtx.table_upsert_with::<_, SqlError>(table_bytes, key_buf, value_buf, |old_bytes| {
1765 if let Some(fps) = fast_paths {
1766 if !has_checks {
1767 return apply_fast_path_patch(old_bytes, fps);
1768 }
1769 }
1770 UPSERT_SCRATCH.with(|slot| {
1771 let mut bufs = slot.borrow_mut();
1772 let UpsertBufs {
1773 old_row,
1774 new_row,
1775 value_values,
1776 new_value_buf,
1777 } = &mut *bufs;
1778
1779 old_row.clear();
1780 old_row.resize(table_schema.columns.len(), Value::Null);
1781 decode_full_row_into(table_schema, key_buf, old_bytes, old_row)?;
1782
1783 if let Some(w) = where_clause {
1784 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
1785 let result = eval_expr(w, &ctx)?;
1786 if result.is_null() || !is_truthy(&result) {
1787 return Ok(UpsertAction::Skip);
1788 }
1789 }
1790
1791 new_row.clear();
1792 new_row.extend_from_slice(old_row);
1793 for (col_idx, expr) in assignments {
1794 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
1795 let val = eval_expr(expr, &ctx)?;
1796 let col = &table_schema.columns[*col_idx];
1797 new_row[*col_idx] = if val.is_null() {
1798 Value::Null
1799 } else {
1800 let got = val.data_type();
1801 val.coerce_into(col.data_type)
1802 .ok_or_else(|| SqlError::TypeMismatch {
1803 expected: col.data_type.to_string(),
1804 got: got.to_string(),
1805 })?
1806 };
1807 }
1808
1809 for (assigned_idx, _) in assignments {
1810 let col = &table_schema.columns[*assigned_idx];
1811 if !col.nullable && new_row[col.position as usize].is_null() {
1812 return Err(SqlError::NotNullViolation(col.name.clone()));
1813 }
1814 }
1815 if has_checks {
1816 for col in &table_schema.columns {
1817 if let Some(ref check) = col.check_expr {
1818 let ctx = EvalCtx::new(col_map, new_row);
1819 let result = eval_expr(check, &ctx)?;
1820 if !is_truthy(&result) && !result.is_null() {
1821 let name = col.check_name.as_deref().unwrap_or(&col.name);
1822 return Err(SqlError::CheckViolation(name.to_string()));
1823 }
1824 }
1825 }
1826 for tc in &table_schema.check_constraints {
1827 let ctx = EvalCtx::new(col_map, new_row);
1828 let result = eval_expr(&tc.expr, &ctx)?;
1829 if !is_truthy(&result) && !result.is_null() {
1830 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1831 return Err(SqlError::CheckViolation(name.to_string()));
1832 }
1833 }
1834 }
1835 let _ = has_fks;
1836
1837 value_values.clear();
1838 value_values.resize(phys_count, Value::Null);
1839 for &slot in dropped {
1840 value_values[slot as usize] = Value::Null;
1841 }
1842 for (j, &i) in non_pk.iter().enumerate() {
1843 value_values[enc_pos[j] as usize] = new_row[i].clone();
1844 }
1845 new_value_buf.clear();
1846 crate::encoding::encode_row_into(value_values, new_value_buf);
1847
1848 if new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
1849 return Err(SqlError::RowTooLarge {
1850 size: new_value_buf.len(),
1851 max: citadel_core::MAX_INLINE_VALUE_SIZE,
1852 });
1853 }
1854
1855 Ok(UpsertAction::Replace(new_value_buf.clone()))
1856 })
1857 })?;
1858
1859 match outcome {
1860 UpsertOutcome::Inserted | UpsertOutcome::Updated => Ok(InsertRowOutcome::Inserted),
1861 UpsertOutcome::Skipped => Ok(InsertRowOutcome::Skipped),
1862 }
1863}
1864
1865fn fetch_unique_index_pk(
1866 wtx: &mut WriteTxn<'_>,
1867 table_schema: &TableSchema,
1868 index_idx: usize,
1869 row: &[Value],
1870) -> Result<Vec<u8>> {
1871 let idx = &table_schema.indices[index_idx];
1872 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
1873 let indexed: Vec<Value> = idx
1874 .columns
1875 .iter()
1876 .map(|&col_idx| row[col_idx as usize].clone())
1877 .collect();
1878 let key = crate::encoding::encode_composite_key(&indexed);
1879 let value = wtx
1880 .table_get(&idx_table, &key)
1881 .map_err(SqlError::Storage)?
1882 .ok_or_else(|| {
1883 SqlError::InvalidValue("unique index missing expected collision entry".into())
1884 })?;
1885 Ok(value)
1886}
1887
1888fn apply_do_update(
1889 wtx: &mut WriteTxn<'_>,
1890 table_schema: &TableSchema,
1891 pk_key: &[u8],
1892 proposed_row: &[Value],
1893 assignments: &[(usize, Expr)],
1894 where_clause: Option<&Expr>,
1895 col_map: &ColumnMap,
1896) -> Result<InsertRowOutcome> {
1897 let old_value = wtx
1898 .table_get(table_schema.name.as_bytes(), pk_key)
1899 .map_err(SqlError::Storage)?
1900 .ok_or_else(|| SqlError::InvalidValue("primary row missing for DO UPDATE target".into()))?;
1901 let old_row = decode_full_row(table_schema, pk_key, &old_value)?;
1902 apply_do_update_with_old_row(
1903 wtx,
1904 table_schema,
1905 pk_key,
1906 &old_row,
1907 proposed_row,
1908 assignments,
1909 where_clause,
1910 col_map,
1911 )
1912}
1913
1914#[allow(clippy::too_many_arguments)]
1915fn apply_do_update_with_old_row(
1916 wtx: &mut WriteTxn<'_>,
1917 table_schema: &TableSchema,
1918 old_pk_key: &[u8],
1919 old_row: &[Value],
1920 proposed_row: &[Value],
1921 assignments: &[(usize, Expr)],
1922 where_clause: Option<&Expr>,
1923 col_map: &ColumnMap,
1924) -> Result<InsertRowOutcome> {
1925 if let Some(w) = where_clause {
1926 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
1927 let result = eval_expr(w, &ctx)?;
1928 if result.is_null() || !is_truthy(&result) {
1929 return Ok(InsertRowOutcome::Skipped);
1930 }
1931 }
1932
1933 let mut new_row = old_row.to_vec();
1934 for (col_idx, expr) in assignments {
1935 let ctx = EvalCtx::with_excluded(col_map, old_row, col_map, proposed_row);
1936 let val = eval_expr(expr, &ctx)?;
1937 let col = &table_schema.columns[*col_idx];
1938 new_row[*col_idx] = if val.is_null() {
1939 Value::Null
1940 } else {
1941 let got = val.data_type();
1942 val.coerce_into(col.data_type)
1943 .ok_or_else(|| SqlError::TypeMismatch {
1944 expected: col.data_type.to_string(),
1945 got: got.to_string(),
1946 })?
1947 };
1948 }
1949
1950 let pk_indices = table_schema.pk_indices();
1951 let assigned_pk = assignments.iter().any(|(ci, _)| pk_indices.contains(ci));
1952 let pk_changed = assigned_pk && pk_indices.iter().any(|&i| old_row[i] != new_row[i]);
1953
1954 for (assigned_idx, _) in assignments {
1955 let col = &table_schema.columns[*assigned_idx];
1956 if !col.nullable && new_row[col.position as usize].is_null() {
1957 return Err(SqlError::NotNullViolation(col.name.clone()));
1958 }
1959 }
1960 if table_schema.has_checks() {
1961 for col in &table_schema.columns {
1962 if let Some(ref check) = col.check_expr {
1963 let ctx = EvalCtx::new(col_map, &new_row);
1964 let result = eval_expr(check, &ctx)?;
1965 if !is_truthy(&result) && !result.is_null() {
1966 let name = col.check_name.as_deref().unwrap_or(&col.name);
1967 return Err(SqlError::CheckViolation(name.to_string()));
1968 }
1969 }
1970 }
1971 for tc in &table_schema.check_constraints {
1972 let ctx = EvalCtx::new(col_map, &new_row);
1973 let result = eval_expr(&tc.expr, &ctx)?;
1974 if !is_truthy(&result) && !result.is_null() {
1975 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1976 return Err(SqlError::CheckViolation(name.to_string()));
1977 }
1978 }
1979 }
1980 for fk in &table_schema.foreign_keys {
1981 let changed = fk
1982 .columns
1983 .iter()
1984 .any(|&ci| old_row[ci as usize] != new_row[ci as usize]);
1985 if !changed {
1986 continue;
1987 }
1988 let any_null = fk.columns.iter().any(|&ci| new_row[ci as usize].is_null());
1989 if any_null {
1990 continue;
1991 }
1992 let fk_vals: Vec<Value> = fk
1993 .columns
1994 .iter()
1995 .map(|&ci| new_row[ci as usize].clone())
1996 .collect();
1997 let fk_key = crate::encoding::encode_composite_key(&fk_vals);
1998 let found = wtx
1999 .table_get(fk.foreign_table.as_bytes(), &fk_key)
2000 .map_err(SqlError::Storage)?;
2001 if found.is_none() {
2002 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
2003 return Err(SqlError::ForeignKeyViolation(name.to_string()));
2004 }
2005 }
2006
2007 let has_indices = !table_schema.indices.is_empty();
2008 let old_pk_values: Vec<Value> = if has_indices || pk_changed {
2009 pk_indices.iter().map(|&i| old_row[i].clone()).collect()
2010 } else {
2011 Vec::new()
2012 };
2013 let new_pk_values: Vec<Value> = if has_indices || pk_changed {
2014 pk_indices.iter().map(|&i| new_row[i].clone()).collect()
2015 } else {
2016 Vec::new()
2017 };
2018
2019 let non_pk = table_schema.non_pk_indices();
2020 let enc_pos = table_schema.encoding_positions();
2021 let phys_count = table_schema.physical_non_pk_count();
2022 let dropped = table_schema.dropped_non_pk_slots();
2023 let mut value_values: Vec<Value> = vec![Value::Null; phys_count];
2024 for &slot in dropped {
2025 value_values[slot as usize] = Value::Null;
2026 }
2027 for (j, &i) in non_pk.iter().enumerate() {
2028 value_values[enc_pos[j] as usize] = new_row[i].clone();
2029 }
2030 let mut new_value_buf = Vec::with_capacity(256);
2031 crate::encoding::encode_row_into(&value_values, &mut new_value_buf);
2032
2033 if new_value_buf.len() > citadel_core::MAX_INLINE_VALUE_SIZE {
2034 return Err(SqlError::RowTooLarge {
2035 size: new_value_buf.len(),
2036 max: citadel_core::MAX_INLINE_VALUE_SIZE,
2037 });
2038 }
2039
2040 if pk_changed {
2041 let new_pk_key = crate::encoding::encode_composite_key(&new_pk_values);
2042 let inserted = wtx
2043 .table_insert(table_schema.name.as_bytes(), &new_pk_key, &new_value_buf)
2044 .map_err(SqlError::Storage)?;
2045 if !inserted {
2046 return Err(SqlError::DuplicateKey);
2047 }
2048 wtx.table_delete(table_schema.name.as_bytes(), old_pk_key)
2049 .map_err(SqlError::Storage)?;
2050 for idx in &table_schema.indices {
2051 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2052 let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2053 wtx.table_delete(&idx_table, &old_idx_key)
2054 .map_err(SqlError::Storage)?;
2055 let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2056 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2057 let is_new = wtx
2058 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2059 .map_err(SqlError::Storage)?;
2060 if idx.unique && !is_new {
2061 let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2062 if !any_null {
2063 return Err(SqlError::UniqueViolation(idx.name.clone()));
2064 }
2065 }
2066 }
2067 } else {
2068 wtx.table_update_sorted(
2069 table_schema.name.as_bytes(),
2070 &[(old_pk_key, new_value_buf.as_slice())],
2071 )
2072 .map_err(SqlError::Storage)?;
2073 for idx in &table_schema.indices {
2074 if !index_columns_changed(idx, old_row, &new_row) {
2075 continue;
2076 }
2077 let idx_table = TableSchema::index_table_name(&table_schema.name, &idx.name);
2078 let old_idx_key = encode_index_key(idx, old_row, &old_pk_values);
2079 wtx.table_delete(&idx_table, &old_idx_key)
2080 .map_err(SqlError::Storage)?;
2081 let new_idx_key = encode_index_key(idx, &new_row, &new_pk_values);
2082 let new_idx_val = encode_index_value(idx, &new_row, &new_pk_values);
2083 let is_new = wtx
2084 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
2085 .map_err(SqlError::Storage)?;
2086 if idx.unique && !is_new {
2087 let any_null = idx.columns.iter().any(|&c| new_row[c as usize].is_null());
2088 if !any_null {
2089 return Err(SqlError::UniqueViolation(idx.name.clone()));
2090 }
2091 }
2092 }
2093 }
2094
2095 Ok(InsertRowOutcome::Inserted)
2096}
2097
2098fn detect_fast_paths(
2099 ts: &TableSchema,
2100 assignments: &[(usize, Expr)],
2101) -> Option<Vec<DoUpdateFastPath>> {
2102 let non_pk = ts.non_pk_indices();
2103 let enc_pos = ts.encoding_positions();
2104 let mut out = Vec::with_capacity(assignments.len());
2105 for (col_idx, expr) in assignments {
2106 let col = &ts.columns[*col_idx];
2107 if col.data_type != DataType::Integer {
2108 return None;
2109 }
2110 let nonpk_order = non_pk.iter().position(|&i| i == *col_idx)?;
2111 let phys_idx = enc_pos[nonpk_order] as usize;
2112
2113 if let Expr::BinaryOp { left, op, right } = expr {
2114 if !matches!(op, BinOp::Add | BinOp::Sub) {
2115 return None;
2116 }
2117 let reads_target =
2118 matches!(left.as_ref(), Expr::Column(n) if n.eq_ignore_ascii_case(&col.name));
2119 if !reads_target {
2120 return None;
2121 }
2122 if let Expr::Literal(Value::Integer(n)) = right.as_ref() {
2123 let delta = if matches!(op, BinOp::Sub) { -n } else { *n };
2124 let _ = col_idx;
2125 out.push(DoUpdateFastPath::IntAddConst { phys_idx, delta });
2126 continue;
2127 }
2128 return None;
2129 }
2130 return None;
2131 }
2132 Some(out)
2133}
2134
2135fn compile_on_conflict(oc: &OnConflictClause, ts: &TableSchema) -> Result<CompiledOnConflict> {
2136 let target = oc
2137 .target
2138 .as_ref()
2139 .map(|t| resolve_conflict_target(t, ts))
2140 .transpose()?;
2141 match &oc.action {
2142 OnConflictAction::DoNothing => Ok(CompiledOnConflict::DoNothing { target }),
2143 OnConflictAction::DoUpdate {
2144 assignments,
2145 where_clause,
2146 } => {
2147 let target = target.ok_or_else(|| {
2148 SqlError::Plan("ON CONFLICT without target requires DO NOTHING".into())
2149 })?;
2150 let compiled_assignments: Vec<(usize, Expr)> = assignments
2151 .iter()
2152 .map(|(name, expr)| {
2153 let col_idx = ts
2154 .column_index(name)
2155 .ok_or_else(|| SqlError::ColumnNotFound(name.clone()))?;
2156 Ok((col_idx, expr.clone()))
2157 })
2158 .collect::<Result<_>>()?;
2159 let fast_paths = if where_clause.is_none() {
2160 detect_fast_paths(ts, &compiled_assignments)
2161 } else {
2162 None
2163 };
2164 Ok(CompiledOnConflict::DoUpdate {
2165 target,
2166 assignments: compiled_assignments,
2167 where_clause: where_clause.clone(),
2168 fast_paths,
2169 })
2170 }
2171 }
2172}
2173
2174impl CompiledInsert {
2175 pub fn try_compile(schema: &SchemaManager, stmt: &InsertStmt) -> Option<Self> {
2176 let lower = stmt.table.to_ascii_lowercase();
2177 let cached = if let Some(ts) = schema.get(&lower) {
2178 let insert_columns: Vec<&str> = if stmt.columns.is_empty() {
2179 ts.columns.iter().map(|c| c.name.as_str()).collect()
2180 } else {
2181 stmt.columns.iter().map(|s| s.as_str()).collect()
2182 };
2183 let mut col_indices = Vec::with_capacity(insert_columns.len());
2184 for name in &insert_columns {
2185 col_indices.push(ts.column_index(name)?);
2186 }
2187 let on_conflict = stmt
2188 .on_conflict
2189 .as_ref()
2190 .map(|oc| compile_on_conflict(oc, ts))
2191 .transpose()
2192 .ok()
2193 .flatten()
2194 .map(Arc::new);
2195 let row_col_map = on_conflict.as_ref().map(|_| ColumnMap::new(&ts.columns));
2196 Some(InsertCache {
2197 col_indices,
2198 has_subquery: insert_has_subquery(stmt),
2199 any_defaults: ts.columns.iter().any(|c| c.default_expr.is_some()),
2200 has_checks: ts.has_checks(),
2201 on_conflict,
2202 row_col_map,
2203 })
2204 } else if schema.get_view(&lower).is_some() {
2205 None
2206 } else {
2207 return None;
2208 };
2209 Some(Self {
2210 table_lower: lower,
2211 cached,
2212 })
2213 }
2214}
2215
2216impl CompiledPlan for CompiledInsert {
2217 fn execute(
2218 &self,
2219 db: &Database,
2220 schema: &SchemaManager,
2221 stmt: &Statement,
2222 params: &[Value],
2223 wtx: Option<&mut WriteTxn<'_>>,
2224 ) -> Result<ExecutionResult> {
2225 let ins = match stmt {
2226 Statement::Insert(i) => i,
2227 _ => {
2228 return Err(SqlError::Unsupported(
2229 "CompiledInsert received non-INSERT statement".into(),
2230 ))
2231 }
2232 };
2233 let _ = &self.table_lower;
2234 match wtx {
2235 None => exec_insert(db, schema, ins, params),
2236 Some(outer) => match self.cached.as_ref() {
2237 Some(c) => exec_insert_in_txn_cached(outer, schema, ins, params, c),
2238 None => exec_insert_in_txn(outer, schema, ins, params),
2239 },
2240 }
2241 }
2242}
2243
/// Prepared form of a `DELETE` statement, built by `CompiledDelete::try_compile`.
pub struct CompiledDelete {
    /// ASCII-lowercased name of the target table, captured at compile time.
    table_lower: String,
}
2247
2248impl CompiledDelete {
2249 pub fn try_compile(schema: &SchemaManager, stmt: &DeleteStmt) -> Option<Self> {
2250 let lower = stmt.table.to_ascii_lowercase();
2251 schema.get(&lower)?;
2252 Some(Self { table_lower: lower })
2253 }
2254}
2255
2256impl CompiledPlan for CompiledDelete {
2257 fn execute(
2258 &self,
2259 db: &Database,
2260 schema: &SchemaManager,
2261 stmt: &Statement,
2262 _params: &[Value],
2263 wtx: Option<&mut WriteTxn<'_>>,
2264 ) -> Result<ExecutionResult> {
2265 let del = match stmt {
2266 Statement::Delete(d) => d,
2267 _ => {
2268 return Err(SqlError::Unsupported(
2269 "CompiledDelete received non-DELETE statement".into(),
2270 ))
2271 }
2272 };
2273 let _ = &self.table_lower;
2274 match wtx {
2275 None => super::write::exec_delete(db, schema, del),
2276 Some(outer) => super::write::exec_delete_in_txn(outer, schema, del),
2277 }
2278 }
2279}