1use std::collections::HashMap;
2
3use citadel::Database;
4
5use crate::encoding::{
6 decode_column_raw, decode_column_with_offset, decode_composite_key, decode_pk_integer,
7 encode_composite_key, encode_row, patch_at_offset, patch_column_in_place, patch_row_column,
8};
9use crate::error::{Result, SqlError};
10use crate::eval::{eval_expr, is_truthy, ColumnMap};
11use crate::parser::*;
12use crate::schema::SchemaManager;
13use crate::types::*;
14
15use super::correlated::*;
16use super::dml::*;
17use super::helpers::*;
18use super::scan::*;
19use super::select::*;
20use super::view::*;
21use super::CteContext;
22
/// Reusable scratch buffers for [`exec_update_compiled`], so that repeated
/// executions of a compiled `UPDATE` do not allocate fresh vectors each time.
pub struct UpdateBufs {
    /// Row-shaped scratch vector, resized to the table's column count and
    /// filled with the decoded primary key and the current values of the
    /// assigned columns before the assignment expressions are evaluated.
    partial_row: Vec<Value>,
    /// Output buffer handed to `patch_row_column` when a column cannot be
    /// patched in place.
    patch_buf: Vec<u8>,
    /// One byte offset per assignment target, as reported by
    /// `decode_column_with_offset`; set to `usize::MAX` once an earlier patch
    /// has re-encoded the row.
    offsets: Vec<usize>,
}
30
31impl Default for UpdateBufs {
32 fn default() -> Self {
33 Self {
34 partial_row: Vec::new(),
35 patch_buf: Vec::with_capacity(256),
36 offsets: Vec::new(),
37 }
38 }
39}
40
41impl UpdateBufs {
42 pub fn new() -> Self {
43 Self::default()
44 }
45}
46
/// Result of [`compile_update`]: everything about an `UPDATE` statement that
/// can be decided from the schema alone, before any row is read.
pub struct CompiledUpdate {
    /// ASCII-lowercased target table name.
    table_name_lower: String,
    /// The target name resolved to a view when the statement was compiled.
    is_view: bool,
    /// The `WHERE` clause was detected as correlated by `has_correlated_where`.
    has_correlated_where: bool,
    /// `update_has_subquery` reported a subquery in the statement.
    has_subquery: bool,
    /// The statement qualifies for the in-place patching path.
    can_fast_path: bool,
    /// Precomputed fast-path data; `Some` exactly when `compile_update` set
    /// `can_fast_path`.
    fast: Option<CompiledFastPath>,
}
55
/// Precomputed state for the in-place patching path of a compiled `UPDATE`.
struct CompiledFastPath {
    /// Number of primary-key columns of the table.
    num_pk_cols: usize,
    /// Total number of columns of the table.
    num_columns: usize,
    /// The primary key is a single column of type `DataType::Integer`.
    single_int_pk: bool,
    /// One entry per `SET` assignment, in statement order.
    targets: Vec<CompiledTarget>,
    /// Scan plan produced by `plan_select` for the statement's `WHERE` clause.
    scan_plan: crate::planner::ScanPlan,
    /// Copy of the table's primary-key column indices.
    pk_idx_cache: Vec<usize>,
    /// Column-name lookup used when evaluating assignment expressions.
    col_map: ColumnMap,
    /// Range conditions as plain `i64` bounds; only set for a single integer
    /// primary key whose `PkRangeScan` bounds are all integer values.
    range_bounds_i64: Option<Vec<(BinOp, i64)>>,
}
66
/// Shortcut evaluation strategy recognised for an assignment expression by
/// [`detect_fast_eval`].
enum FastEval {
    /// No shortcut; the expression is evaluated with `eval_expr`.
    None,
    /// `col + n` or `n + col`, where `col` is the assigned column.
    IntAdd(i64),
    /// `col - n`, where `col` is the assigned column.
    IntSub(i64),
    /// `col * n` or `n * col`, where `col` is the assigned column.
    IntMul(i64),
    /// Assignment of the integer literal `n`.
    IntSet(i64),
}
74
/// A single `SET column = expr` assignment, resolved against the table schema.
struct CompiledTarget {
    /// Index of the column in the table schema's column list.
    schema_idx: usize,
    /// Position of the column in the encoded row value, taken from the
    /// schema's `encoding_positions()`.
    phys_idx: usize,
    /// The assignment's right-hand side expression.
    expr: Expr,
    /// Copy of the column definition (name, data type, nullability).
    col: ColumnDef,
    /// Shortcut detected for `expr`, if any.
    fast_eval: FastEval,
}
82
83fn detect_fast_eval(expr: &Expr, col_name: &str) -> FastEval {
84 let lower = col_name.to_ascii_lowercase();
85 match expr {
86 Expr::Literal(Value::Integer(n)) => FastEval::IntSet(*n),
87 Expr::BinaryOp { left, op, right } => {
88 let col_match =
89 |e: &Expr| matches!(e, Expr::Column(c) if c.to_ascii_lowercase() == lower);
90 let int_lit = |e: &Expr| match e {
91 Expr::Literal(Value::Integer(n)) => Some(*n),
92 _ => None,
93 };
94 if col_match(left) {
95 if let Some(n) = int_lit(right) {
96 return match op {
97 BinOp::Add => FastEval::IntAdd(n),
98 BinOp::Sub => FastEval::IntSub(n),
99 BinOp::Mul => FastEval::IntMul(n),
100 _ => FastEval::None,
101 };
102 }
103 }
104 if col_match(right) {
105 if let Some(n) = int_lit(left) {
106 return match op {
107 BinOp::Add => FastEval::IntAdd(n),
108 BinOp::Mul => FastEval::IntMul(n),
109 _ => FastEval::None,
110 };
111 }
112 }
113 FastEval::None
114 }
115 _ => FastEval::None,
116 }
117}
118
119pub fn compile_update(schema: &SchemaManager, stmt: &UpdateStmt) -> Result<CompiledUpdate> {
120 let table_name_lower = stmt.table.to_ascii_lowercase();
121 let is_view = schema.get_view(&table_name_lower).is_some();
122 if is_view {
123 return Ok(CompiledUpdate {
124 table_name_lower,
125 is_view: true,
126 has_correlated_where: false,
127 has_subquery: false,
128 can_fast_path: false,
129 fast: None,
130 });
131 }
132
133 let table_schema = schema
134 .get(&table_name_lower)
135 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
136
137 let corr_ctx = CorrelationCtx {
138 outer_schema: table_schema,
139 outer_alias: None,
140 };
141 let has_correlated = has_correlated_where(&stmt.where_clause, &corr_ctx, schema);
142 let has_sub = update_has_subquery(stmt);
143
144 if has_correlated || has_sub {
145 return Ok(CompiledUpdate {
146 table_name_lower,
147 is_view: false,
148 has_correlated_where: has_correlated,
149 has_subquery: has_sub,
150 can_fast_path: false,
151 fast: None,
152 });
153 }
154
155 let pk_indices = table_schema.pk_indices();
156 let pk_changed_by_set = stmt.assignments.iter().any(|(col_name, _)| {
157 table_schema
158 .column_index(col_name)
159 .is_some_and(|idx| table_schema.primary_key_columns.contains(&(idx as u16)))
160 });
161 let has_fk = !table_schema.foreign_keys.is_empty();
162 let has_indices = !table_schema.indices.is_empty();
163 let has_child_fk = !schema.child_fks_for(&table_name_lower).is_empty();
164 let can_fast_path = !pk_changed_by_set
165 && !has_fk
166 && !has_indices
167 && !has_child_fk
168 && !table_schema.has_checks();
169
170 let fast = if can_fast_path {
171 let non_pk = table_schema.non_pk_indices();
172 let enc_pos = table_schema.encoding_positions();
173 let num_pk_cols = table_schema.primary_key_columns.len();
174
175 let mut targets = Vec::with_capacity(stmt.assignments.len());
176 for (col_name, expr) in &stmt.assignments {
177 let schema_idx = table_schema
178 .column_index(col_name)
179 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
180 let nonpk_order = non_pk
181 .iter()
182 .position(|&i| i == schema_idx)
183 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
184 let phys_idx = enc_pos[nonpk_order] as usize;
185 let fast_eval = detect_fast_eval(expr, col_name);
186 targets.push(CompiledTarget {
187 schema_idx,
188 phys_idx,
189 expr: expr.clone(),
190 col: table_schema.columns[schema_idx].clone(),
191 fast_eval,
192 });
193 }
194
195 let plan = crate::planner::plan_select(table_schema, &stmt.where_clause);
196 let single_int_pk = num_pk_cols == 1
197 && table_schema.columns[table_schema.primary_key_columns[0] as usize].data_type
198 == DataType::Integer;
199
200 let range_bounds_i64 = if single_int_pk {
201 if let crate::planner::ScanPlan::PkRangeScan {
202 ref range_conds, ..
203 } = plan
204 {
205 let bounds: Vec<(BinOp, i64)> = range_conds
206 .iter()
207 .filter_map(|(op, val)| match val {
208 Value::Integer(i) => Some((*op, *i)),
209 _ => None,
210 })
211 .collect();
212 if bounds.len() == range_conds.len() {
213 Some(bounds)
214 } else {
215 None
216 }
217 } else {
218 None
219 }
220 } else {
221 None
222 };
223
224 Some(CompiledFastPath {
225 num_pk_cols,
226 num_columns: table_schema.columns.len(),
227 single_int_pk,
228 targets,
229 scan_plan: plan,
230 pk_idx_cache: pk_indices.to_vec(),
231 col_map: ColumnMap::new(&table_schema.columns),
232 range_bounds_i64,
233 })
234 } else {
235 None
236 };
237
238 Ok(CompiledUpdate {
239 table_name_lower,
240 is_view: false,
241 has_correlated_where: false,
242 has_subquery: false,
243 can_fast_path,
244 fast,
245 })
246}
247
248pub fn exec_update_compiled(
249 db: &Database,
250 schema: &SchemaManager,
251 stmt: &UpdateStmt,
252 compiled: &CompiledUpdate,
253 bufs: &mut UpdateBufs,
254) -> Result<ExecutionResult> {
255 if compiled.is_view {
256 return Err(SqlError::CannotModifyView(stmt.table.clone()));
257 }
258 if compiled.has_correlated_where || compiled.has_subquery || !compiled.can_fast_path {
259 return exec_update(db, schema, stmt);
260 }
261
262 let fast = compiled.fast.as_ref().unwrap();
263 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
264
265 if let crate::planner::ScanPlan::PkRangeScan {
266 ref start_key,
267 ref range_conds,
268 ..
269 } = fast.scan_plan
270 {
271 bufs.partial_row.clear();
272 bufs.partial_row.resize(fast.num_columns, Value::Null);
273 bufs.offsets.clear();
274 bufs.offsets.resize(fast.targets.len(), usize::MAX);
275
276 let count = wtx.table_update_range(
277 compiled.table_name_lower.as_bytes(),
278 start_key,
279 |key, value| {
280 if let Some(ref bounds) = fast.range_bounds_i64 {
281 let pk = decode_pk_integer(key)?;
282 for &(op, bound) in bounds {
283 match op {
284 BinOp::Lt if pk >= bound => return Ok(None),
285 BinOp::LtEq if pk > bound => return Ok(None),
286 BinOp::Gt if pk <= bound => return Ok(Some(false)),
287 BinOp::GtEq if pk < bound => return Ok(Some(false)),
288 _ => {}
289 }
290 }
291 bufs.partial_row[fast.pk_idx_cache[0]] = Value::Integer(pk);
292 } else if fast.single_int_pk {
293 let pk = decode_pk_integer(key)?;
294 let pk_val = Value::Integer(pk);
295 for (op, bound) in range_conds {
296 match op {
297 BinOp::Lt if &pk_val >= bound => return Ok(None),
298 BinOp::LtEq if &pk_val > bound => return Ok(None),
299 BinOp::Gt if &pk_val <= bound => return Ok(Some(false)),
300 BinOp::GtEq if &pk_val < bound => return Ok(Some(false)),
301 _ => {}
302 }
303 }
304 bufs.partial_row[fast.pk_idx_cache[0]] = pk_val;
305 } else {
306 let pk_vals = decode_composite_key(key, fast.num_pk_cols)?;
307 for (op, bound) in range_conds {
308 match op {
309 BinOp::Lt if &pk_vals[0] >= bound => return Ok(None),
310 BinOp::LtEq if &pk_vals[0] > bound => return Ok(None),
311 BinOp::Gt if &pk_vals[0] <= bound => return Ok(Some(false)),
312 BinOp::GtEq if &pk_vals[0] < bound => return Ok(Some(false)),
313 _ => {}
314 }
315 }
316 for (i, &pi) in fast.pk_idx_cache.iter().enumerate() {
317 bufs.partial_row[pi] = pk_vals[i].clone();
318 }
319 }
320 for (i, target) in fast.targets.iter().enumerate() {
321 let (raw, off) = decode_column_with_offset(value, target.phys_idx)?;
322 bufs.partial_row[target.schema_idx] = raw.to_value();
323 bufs.offsets[i] = off;
324 }
325 for (i, target) in fast.targets.iter().enumerate() {
326 let new_val = match target.fast_eval {
327 FastEval::IntAdd(n) => {
328 if let Value::Integer(v) = bufs.partial_row[target.schema_idx] {
329 Value::Integer(v.wrapping_add(n))
330 } else {
331 eval_expr(&target.expr, &fast.col_map, &bufs.partial_row)?
332 }
333 }
334 FastEval::IntSub(n) => {
335 if let Value::Integer(v) = bufs.partial_row[target.schema_idx] {
336 Value::Integer(v.wrapping_sub(n))
337 } else {
338 eval_expr(&target.expr, &fast.col_map, &bufs.partial_row)?
339 }
340 }
341 FastEval::IntMul(n) => {
342 if let Value::Integer(v) = bufs.partial_row[target.schema_idx] {
343 Value::Integer(v.wrapping_mul(n))
344 } else {
345 eval_expr(&target.expr, &fast.col_map, &bufs.partial_row)?
346 }
347 }
348 FastEval::IntSet(n) => Value::Integer(n),
349 FastEval::None => {
350 eval_expr(&target.expr, &fast.col_map, &bufs.partial_row)?
351 }
352 };
353 let coerced = if new_val.is_null() {
354 if !target.col.nullable {
355 return Err(SqlError::NotNullViolation(target.col.name.clone()));
356 }
357 Value::Null
358 } else {
359 let got_type = new_val.data_type();
360 new_val.coerce_into(target.col.data_type).ok_or_else(|| {
361 SqlError::TypeMismatch {
362 expected: target.col.data_type.to_string(),
363 got: got_type.to_string(),
364 }
365 })?
366 };
367 if !patch_at_offset(value, bufs.offsets[i], &coerced)?
368 && !patch_column_in_place(value, target.phys_idx, &coerced)?
369 {
370 patch_row_column(value, target.phys_idx, &coerced, &mut bufs.patch_buf)?;
371 value[..bufs.patch_buf.len()].copy_from_slice(&bufs.patch_buf);
372 for off in bufs.offsets.iter_mut().skip(i + 1) {
373 *off = usize::MAX;
374 }
375 }
376 }
377 Ok(Some(true))
378 },
379 )?;
380
381 wtx.commit().map_err(SqlError::Storage)?;
382 return Ok(ExecutionResult::RowsAffected(count));
383 }
384
385 drop(wtx);
387 exec_update(db, schema, stmt)
388}
389
390pub(super) fn exec_update(
393 db: &Database,
394 schema: &SchemaManager,
395 stmt: &UpdateStmt,
396) -> Result<ExecutionResult> {
397 let lower_name = stmt.table.to_ascii_lowercase();
398 if schema.get_view(&lower_name).is_some() {
399 return Err(SqlError::CannotModifyView(stmt.table.clone()));
400 }
401 let table_schema = schema
402 .get(&lower_name)
403 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
404
405 let corr_ctx = CorrelationCtx {
407 outer_schema: table_schema,
408 outer_alias: None,
409 };
410 if has_correlated_where(&stmt.where_clause, &corr_ctx, schema) {
411 let select_stmt = SelectStmt {
412 columns: vec![SelectColumn::AllColumns],
413 from: stmt.table.clone(),
414 from_alias: None,
415 joins: vec![],
416 distinct: false,
417 where_clause: stmt.where_clause.clone(),
418 order_by: vec![],
419 limit: None,
420 offset: None,
421 group_by: vec![],
422 having: None,
423 };
424 let (mut rows, _) = collect_rows_read(db, table_schema, &None, None)?;
425 let remaining =
426 handle_correlated_where_read(db, schema, &select_stmt, &corr_ctx, &mut rows)?;
427
428 if let Some(ref w) = remaining {
429 let col_map = ColumnMap::new(&table_schema.columns);
430 rows.retain(|row| match eval_expr(w, &col_map, row) {
431 Ok(val) => is_truthy(&val),
432 Err(_) => false,
433 });
434 }
435
436 let pk_indices = table_schema.pk_indices();
437 let pk_values: Vec<Value> = rows.iter().map(|row| row[pk_indices[0]].clone()).collect();
438 let pk_col = &table_schema.columns[pk_indices[0]].name;
439 let in_set: std::collections::HashSet<Value> = pk_values.into_iter().collect();
440 let new_where = if in_set.is_empty() {
441 Some(Expr::Literal(Value::Boolean(false)))
442 } else {
443 Some(Expr::InSet {
444 expr: Box::new(Expr::Column(pk_col.clone())),
445 values: in_set,
446 has_null: false,
447 negated: false,
448 })
449 };
450
451 let rewritten = UpdateStmt {
452 table: stmt.table.clone(),
453 assignments: stmt.assignments.clone(),
454 where_clause: new_where,
455 };
456 return exec_update(db, schema, &rewritten);
457 }
458
459 let materialized;
460 let stmt = if update_has_subquery(stmt) {
461 materialized = materialize_update(stmt, &mut |sub| {
462 exec_subquery_read(db, schema, sub, &HashMap::new())
463 })?;
464 &materialized
465 } else {
466 stmt
467 };
468
469 let col_map = ColumnMap::new(&table_schema.columns);
470 let pk_indices = table_schema.pk_indices();
471
472 let pk_changed_by_set = stmt.assignments.iter().any(|(col_name, _)| {
473 table_schema
474 .column_index(col_name)
475 .is_some_and(|idx| table_schema.primary_key_columns.contains(&(idx as u16)))
476 });
477
478 let has_fk = !table_schema.foreign_keys.is_empty();
480 let has_indices = !table_schema.indices.is_empty();
481 let has_child_fk = !schema.child_fks_for(&lower_name).is_empty();
482 if !pk_changed_by_set && !has_fk && !has_indices && !has_child_fk && !table_schema.has_checks()
483 {
484 let non_pk = table_schema.non_pk_indices();
485 let enc_pos = table_schema.encoding_positions();
486 let num_pk_cols = table_schema.primary_key_columns.len();
487
488 struct AssignTarget {
489 schema_idx: usize,
490 phys_idx: usize,
491 expr: Expr,
492 col: ColumnDef,
493 }
494 let mut targets: Vec<AssignTarget> = Vec::with_capacity(stmt.assignments.len());
495 for (col_name, expr) in &stmt.assignments {
496 let schema_idx = table_schema
497 .column_index(col_name)
498 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
499 let nonpk_order = non_pk
500 .iter()
501 .position(|&i| i == schema_idx)
502 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
503 let phys_idx = enc_pos[nonpk_order] as usize;
504 targets.push(AssignTarget {
505 schema_idx,
506 phys_idx,
507 expr: expr.clone(),
508 col: table_schema.columns[schema_idx].clone(),
509 });
510 }
511
512 let plan = crate::planner::plan_select(table_schema, &stmt.where_clause);
513 let single_int_pk = num_pk_cols == 1
514 && table_schema.columns[table_schema.primary_key_columns[0] as usize].data_type
515 == DataType::Integer;
516
517 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
518
519 if let crate::planner::ScanPlan::PkRangeScan {
521 ref start_key,
522 ref range_conds,
523 ..
524 } = plan
525 {
526 let range_conds = range_conds.clone();
527 let mut partial_row = vec![Value::Null; table_schema.columns.len()];
528 let pk_idx_cache = table_schema.pk_indices().to_vec();
529 let mut patch_buf: Vec<u8> = Vec::with_capacity(256);
530
531 let count =
532 wtx.table_update_range(lower_name.as_bytes(), start_key, |key, value| {
533 if single_int_pk {
535 let pk_int = Value::Integer(decode_pk_integer(key)?);
536 for (op, bound) in &range_conds {
537 match op {
538 BinOp::Lt if &pk_int >= bound => return Ok(None),
539 BinOp::LtEq if &pk_int > bound => return Ok(None),
540 BinOp::Gt if &pk_int <= bound => return Ok(Some(false)),
541 BinOp::GtEq if &pk_int < bound => return Ok(Some(false)),
542 _ => {}
543 }
544 }
545 } else {
546 let pk_vals = decode_composite_key(key, num_pk_cols)?;
547 for (op, bound) in &range_conds {
548 match op {
549 BinOp::Lt if &pk_vals[0] >= bound => return Ok(None),
550 BinOp::LtEq if &pk_vals[0] > bound => return Ok(None),
551 BinOp::Gt if &pk_vals[0] <= bound => return Ok(Some(false)),
552 BinOp::GtEq if &pk_vals[0] < bound => return Ok(Some(false)),
553 _ => {}
554 }
555 }
556 }
557
558 if single_int_pk {
559 partial_row[pk_idx_cache[0]] = Value::Integer(decode_pk_integer(key)?);
560 } else {
561 let pk_vals = decode_composite_key(key, num_pk_cols)?;
562 for (i, &pi) in pk_idx_cache.iter().enumerate() {
563 partial_row[pi] = pk_vals[i].clone();
564 }
565 }
566 for target in &targets {
567 partial_row[target.schema_idx] =
568 decode_column_raw(value, target.phys_idx)?.to_value();
569 }
570 for target in &targets {
572 let new_val = eval_expr(&target.expr, &col_map, &partial_row)?;
573 let coerced = if new_val.is_null() {
574 if !target.col.nullable {
575 return Err(SqlError::NotNullViolation(target.col.name.clone()));
576 }
577 Value::Null
578 } else {
579 let got_type = new_val.data_type();
580 new_val.coerce_into(target.col.data_type).ok_or_else(|| {
581 SqlError::TypeMismatch {
582 expected: target.col.data_type.to_string(),
583 got: got_type.to_string(),
584 }
585 })?
586 };
587 if !patch_column_in_place(value, target.phys_idx, &coerced)? {
588 patch_row_column(value, target.phys_idx, &coerced, &mut patch_buf)?;
589 value[..patch_buf.len()].copy_from_slice(&patch_buf);
590 }
591 }
592 Ok(Some(true))
593 })?;
594
595 wtx.commit().map_err(SqlError::Storage)?;
596 return Ok(ExecutionResult::RowsAffected(count));
597 }
598
599 let mut kv_pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
601 {
602 match &plan {
603 crate::planner::ScanPlan::PkLookup { pk_values } => {
604 let key = crate::encoding::encode_composite_key(pk_values);
605 if let Some(value) = wtx
606 .table_get(lower_name.as_bytes(), &key)
607 .map_err(SqlError::Storage)?
608 {
609 kv_pairs.push((key, value));
610 }
611 }
612 _ => {
613 wtx.table_for_each(lower_name.as_bytes(), |key, value| {
614 kv_pairs.push((key.to_vec(), value.to_vec()));
615 Ok(())
616 })
617 .map_err(SqlError::Storage)?;
618 }
619 }
620 }
621
622 let mut patch_buf: Vec<u8> = Vec::with_capacity(256);
623 let mut partial_row = vec![Value::Null; table_schema.columns.len()];
624 let pk_idx_cache = table_schema.pk_indices().to_vec();
625 let mut patched: Vec<(Vec<u8>, Vec<u8>)> = Vec::with_capacity(kv_pairs.len());
626
627 for (key, raw_value) in &mut kv_pairs {
628 if matches!(plan, crate::planner::ScanPlan::SeqScan) {
629 if let Some(ref w) = stmt.where_clause {
630 let row = decode_full_row(table_schema, key, raw_value)?;
631 if !eval_expr(w, &col_map, &row).is_ok_and(|v| is_truthy(&v)) {
632 continue;
633 }
634 }
635 }
636 if single_int_pk {
637 partial_row[pk_idx_cache[0]] = Value::Integer(decode_pk_integer(key)?);
638 } else {
639 let pk_vals = decode_composite_key(key, num_pk_cols)?;
640 for (i, &pi) in pk_idx_cache.iter().enumerate() {
641 partial_row[pi] = pk_vals[i].clone();
642 }
643 }
644 for target in &targets {
645 partial_row[target.schema_idx] =
646 decode_column_raw(raw_value, target.phys_idx)?.to_value();
647 }
648 for target in &targets {
649 let new_val = eval_expr(&target.expr, &col_map, &partial_row)?;
650 let coerced = if new_val.is_null() {
651 if !target.col.nullable {
652 return Err(SqlError::NotNullViolation(target.col.name.clone()));
653 }
654 Value::Null
655 } else {
656 let got_type = new_val.data_type();
657 new_val.coerce_into(target.col.data_type).ok_or_else(|| {
658 SqlError::TypeMismatch {
659 expected: target.col.data_type.to_string(),
660 got: got_type.to_string(),
661 }
662 })?
663 };
664 if !patch_column_in_place(raw_value, target.phys_idx, &coerced)? {
665 patch_row_column(raw_value, target.phys_idx, &coerced, &mut patch_buf)?;
666 std::mem::swap(raw_value, &mut patch_buf);
667 }
668 }
669 patched.push((std::mem::take(key), std::mem::take(raw_value)));
670 }
671
672 if !patched.is_empty() {
673 let refs: Vec<(&[u8], &[u8])> = patched
674 .iter()
675 .map(|(k, v)| (k.as_slice(), v.as_slice()))
676 .collect();
677 wtx.table_update_sorted(lower_name.as_bytes(), &refs)
678 .map_err(SqlError::Storage)?;
679 }
680 let count = patched.len() as u64;
681 wtx.commit().map_err(SqlError::Storage)?;
682 return Ok(ExecutionResult::RowsAffected(count));
683 }
684
685 let all_candidates = collect_keyed_rows_read(db, table_schema, &stmt.where_clause)?;
687 let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
688 .into_iter()
689 .filter(|(_, row)| match &stmt.where_clause {
690 Some(where_expr) => eval_expr(where_expr, &col_map, row).is_ok_and(|v| is_truthy(&v)),
691 None => true,
692 })
693 .collect();
694
695 if matching_rows.is_empty() {
696 return Ok(ExecutionResult::RowsAffected(0));
697 }
698
699 struct UpdateChange {
700 old_key: Vec<u8>,
701 new_key: Vec<u8>,
702 new_value: Vec<u8>,
703 pk_changed: bool,
704 old_row: Vec<Value>,
705 new_row: Vec<Value>,
706 }
707
708 let mut changes: Vec<UpdateChange> = Vec::new();
709
710 for (old_key, row) in &matching_rows {
711 let mut new_row = row.clone();
712
713 let mut evaluated: Vec<(usize, Value)> = Vec::with_capacity(stmt.assignments.len());
714 for (col_name, expr) in &stmt.assignments {
715 let col_idx = table_schema
716 .column_index(col_name)
717 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
718 let new_val = eval_expr(expr, &col_map, row)?;
719 let col = &table_schema.columns[col_idx];
720
721 let got_type = new_val.data_type();
722 let coerced = if new_val.is_null() {
723 if !col.nullable {
724 return Err(SqlError::NotNullViolation(col.name.clone()));
725 }
726 Value::Null
727 } else {
728 new_val
729 .coerce_into(col.data_type)
730 .ok_or_else(|| SqlError::TypeMismatch {
731 expected: col.data_type.to_string(),
732 got: got_type.to_string(),
733 })?
734 };
735
736 evaluated.push((col_idx, coerced));
737 }
738
739 for (col_idx, coerced) in evaluated {
740 new_row[col_idx] = coerced;
741 }
742
743 if table_schema.has_checks() {
744 for col in &table_schema.columns {
745 if let Some(ref check) = col.check_expr {
746 let result = eval_expr(check, &col_map, &new_row)?;
747 if !is_truthy(&result) && !result.is_null() {
748 let name = col.check_name.as_deref().unwrap_or(&col.name);
749 return Err(SqlError::CheckViolation(name.to_string()));
750 }
751 }
752 }
753 for tc in &table_schema.check_constraints {
754 let result = eval_expr(&tc.expr, &col_map, &new_row)?;
755 if !is_truthy(&result) && !result.is_null() {
756 let name = tc.name.as_deref().unwrap_or(&tc.sql);
757 return Err(SqlError::CheckViolation(name.to_string()));
758 }
759 }
760 }
761
762 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
763 let new_key = encode_composite_key(&pk_values);
764
765 let non_pk = table_schema.non_pk_indices();
766 let enc_pos = table_schema.encoding_positions();
767 let phys_count = table_schema.physical_non_pk_count();
768 let mut value_values = vec![Value::Null; phys_count];
769 for (j, &i) in non_pk.iter().enumerate() {
770 value_values[enc_pos[j] as usize] = new_row[i].clone();
771 }
772 let new_value = encode_row(&value_values);
773
774 changes.push(UpdateChange {
775 old_key: old_key.clone(),
776 new_key,
777 new_value,
778 pk_changed: pk_changed_by_set,
779 old_row: row.clone(),
780 new_row,
781 });
782 }
783
784 {
785 use std::collections::HashSet;
786 let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
787 for c in &changes {
788 if c.pk_changed && c.new_key != c.old_key && !new_keys.insert(c.new_key.clone()) {
789 return Err(SqlError::DuplicateKey);
790 }
791 }
792 }
793
794 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
795
796 if !table_schema.foreign_keys.is_empty() {
798 for c in &changes {
799 for fk in &table_schema.foreign_keys {
800 let fk_changed = fk
801 .columns
802 .iter()
803 .any(|&ci| c.old_row[ci as usize] != c.new_row[ci as usize]);
804 if !fk_changed {
805 continue;
806 }
807 let any_null = fk
808 .columns
809 .iter()
810 .any(|&ci| c.new_row[ci as usize].is_null());
811 if any_null {
812 continue;
813 }
814 let fk_vals: Vec<Value> = fk
815 .columns
816 .iter()
817 .map(|&ci| c.new_row[ci as usize].clone())
818 .collect();
819 let fk_key = encode_composite_key(&fk_vals);
820 let found = wtx
821 .table_get(fk.foreign_table.as_bytes(), &fk_key)
822 .map_err(SqlError::Storage)?;
823 if found.is_none() {
824 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
825 return Err(SqlError::ForeignKeyViolation(name.to_string()));
826 }
827 }
828 }
829 }
830
831 let child_fks = schema.child_fks_for(&lower_name);
833 if !child_fks.is_empty() {
834 for c in &changes {
835 if !c.pk_changed {
836 continue;
837 }
838 let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
839 let old_pk_key = encode_composite_key(&old_pk);
840 for &(child_table, fk) in &child_fks {
841 let child_schema = schema.get(child_table).unwrap();
842 let fk_idx = child_schema
843 .indices
844 .iter()
845 .find(|idx| idx.columns == fk.columns);
846 if let Some(idx) = fk_idx {
847 let idx_table = TableSchema::index_table_name(child_table, &idx.name);
848 let mut has_child = false;
849 wtx.table_scan_from(&idx_table, &old_pk_key, |key, _| {
850 if key.starts_with(&old_pk_key) {
851 has_child = true;
852 Ok(false) } else {
854 Ok(false) }
856 })
857 .map_err(SqlError::Storage)?;
858 if has_child {
859 return Err(SqlError::ForeignKeyViolation(format!(
860 "cannot update PK in '{}': referenced by '{}'",
861 lower_name, child_table
862 )));
863 }
864 }
865 }
866 }
867 }
868
869 for c in &changes {
870 let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
871
872 for idx in &table_schema.indices {
873 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
874 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
875 let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
876 wtx.table_delete(&idx_table, &old_idx_key)
877 .map_err(SqlError::Storage)?;
878 }
879 }
880
881 if c.pk_changed {
882 wtx.table_delete(lower_name.as_bytes(), &c.old_key)
883 .map_err(SqlError::Storage)?;
884 }
885 }
886
887 for c in &changes {
888 let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
889
890 if c.pk_changed {
891 let is_new = wtx
892 .table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
893 .map_err(SqlError::Storage)?;
894 if !is_new {
895 return Err(SqlError::DuplicateKey);
896 }
897 } else {
898 wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
899 .map_err(SqlError::Storage)?;
900 }
901
902 for idx in &table_schema.indices {
903 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
904 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
905 let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
906 let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
907 let is_new = wtx
908 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
909 .map_err(SqlError::Storage)?;
910 if idx.unique && !is_new {
911 let indexed_values: Vec<Value> = idx
912 .columns
913 .iter()
914 .map(|&col_idx| c.new_row[col_idx as usize].clone())
915 .collect();
916 let any_null = indexed_values.iter().any(|v| v.is_null());
917 if !any_null {
918 return Err(SqlError::UniqueViolation(idx.name.clone()));
919 }
920 }
921 }
922 }
923 }
924
925 let count = changes.len() as u64;
926 wtx.commit().map_err(SqlError::Storage)?;
927 Ok(ExecutionResult::RowsAffected(count))
928}
929
930pub(super) fn exec_delete(
931 db: &Database,
932 schema: &SchemaManager,
933 stmt: &DeleteStmt,
934) -> Result<ExecutionResult> {
935 let lower_name = stmt.table.to_ascii_lowercase();
936 if schema.get_view(&lower_name).is_some() {
937 return Err(SqlError::CannotModifyView(stmt.table.clone()));
938 }
939 let table_schema = schema
940 .get(&lower_name)
941 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
942
943 let corr_ctx = CorrelationCtx {
944 outer_schema: table_schema,
945 outer_alias: None,
946 };
947 if has_correlated_where(&stmt.where_clause, &corr_ctx, schema) {
948 let select_stmt = SelectStmt {
949 columns: vec![SelectColumn::AllColumns],
950 from: stmt.table.clone(),
951 from_alias: None,
952 joins: vec![],
953 distinct: false,
954 where_clause: stmt.where_clause.clone(),
955 order_by: vec![],
956 limit: None,
957 offset: None,
958 group_by: vec![],
959 having: None,
960 };
961 let (mut rows, _) = collect_rows_read(db, table_schema, &None, None)?;
962 let remaining =
963 handle_correlated_where_read(db, schema, &select_stmt, &corr_ctx, &mut rows)?;
964
965 if let Some(ref w) = remaining {
966 let col_map = ColumnMap::new(&table_schema.columns);
967 rows.retain(|row| match eval_expr(w, &col_map, row) {
968 Ok(val) => is_truthy(&val),
969 Err(_) => false,
970 });
971 }
972
973 let pk_indices = table_schema.pk_indices();
974 let pk_values: Vec<Value> = rows.iter().map(|row| row[pk_indices[0]].clone()).collect();
975 let pk_col = &table_schema.columns[pk_indices[0]].name;
976 let in_set: std::collections::HashSet<Value> = pk_values.into_iter().collect();
977 let new_where = if in_set.is_empty() {
978 Some(Expr::Literal(Value::Boolean(false)))
979 } else {
980 Some(Expr::InSet {
981 expr: Box::new(Expr::Column(pk_col.clone())),
982 values: in_set,
983 has_null: false,
984 negated: false,
985 })
986 };
987
988 let rewritten = DeleteStmt {
989 table: stmt.table.clone(),
990 where_clause: new_where,
991 };
992 return exec_delete(db, schema, &rewritten);
993 }
994
995 let materialized;
996 let stmt = if delete_has_subquery(stmt) {
997 materialized = materialize_delete(stmt, &mut |sub| {
998 exec_subquery_read(db, schema, sub, &HashMap::new())
999 })?;
1000 &materialized
1001 } else {
1002 stmt
1003 };
1004
1005 let col_map = ColumnMap::new(&table_schema.columns);
1006 let mut wtx = db.begin_write().map_err(SqlError::Storage)?;
1007 let all_candidates = collect_keyed_rows_write(&mut wtx, table_schema, &stmt.where_clause)?;
1008 let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
1009 .into_iter()
1010 .filter(|(_, row)| match &stmt.where_clause {
1011 Some(where_expr) => match eval_expr(where_expr, &col_map, row) {
1012 Ok(val) => is_truthy(&val),
1013 Err(_) => false,
1014 },
1015 None => true,
1016 })
1017 .collect();
1018
1019 if rows_to_delete.is_empty() {
1020 wtx.commit().map_err(SqlError::Storage)?;
1021 return Ok(ExecutionResult::RowsAffected(0));
1022 }
1023
1024 let pk_indices = table_schema.pk_indices();
1025
1026 let child_fks = schema.child_fks_for(&lower_name);
1028 if !child_fks.is_empty() {
1029 for (_key, row) in &rows_to_delete {
1030 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
1031 let pk_key = encode_composite_key(&pk_values);
1032 for &(child_table, fk) in &child_fks {
1033 let child_schema = schema.get(child_table).unwrap();
1034 let fk_idx = child_schema
1035 .indices
1036 .iter()
1037 .find(|idx| idx.columns == fk.columns);
1038 if let Some(idx) = fk_idx {
1039 let idx_table = TableSchema::index_table_name(child_table, &idx.name);
1040 let mut has_child = false;
1041 wtx.table_scan_from(&idx_table, &pk_key, |key, _| {
1042 if key.starts_with(&pk_key) {
1043 has_child = true;
1044 Ok(false)
1045 } else {
1046 Ok(false)
1047 }
1048 })
1049 .map_err(SqlError::Storage)?;
1050 if has_child {
1051 return Err(SqlError::ForeignKeyViolation(format!(
1052 "cannot delete from '{}': referenced by '{}'",
1053 lower_name, child_table
1054 )));
1055 }
1056 }
1057 }
1058 }
1059 }
1060
1061 for (key, row) in &rows_to_delete {
1062 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
1063 delete_index_entries(&mut wtx, table_schema, row, &pk_values)?;
1064 wtx.table_delete(lower_name.as_bytes(), key)
1065 .map_err(SqlError::Storage)?;
1066 }
1067 let count = rows_to_delete.len() as u64;
1068 wtx.commit().map_err(SqlError::Storage)?;
1069 Ok(ExecutionResult::RowsAffected(count))
1070}
1071
1072pub(super) fn exec_select_in_txn(
1073 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1074 schema: &SchemaManager,
1075 stmt: &SelectStmt,
1076 ctes: &CteContext,
1077) -> Result<ExecutionResult> {
1078 if stmt.from.is_empty() {
1079 let materialized;
1080 let stmt = if stmt_has_subquery(stmt) {
1081 materialized =
1082 materialize_stmt(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub, ctes))?;
1083 &materialized
1084 } else {
1085 stmt
1086 };
1087 return super::exec_select_no_from(stmt);
1088 }
1089
1090 let lower_name = stmt.from.to_ascii_lowercase();
1091
1092 if let Some(cte_result) = ctes.get(&lower_name) {
1093 if stmt.joins.is_empty() {
1094 return super::exec_select_from_cte(cte_result, stmt, &mut |sub| {
1095 exec_subquery_write(wtx, schema, sub, ctes)
1096 });
1097 } else {
1098 return super::exec_select_join_with_ctes(stmt, ctes, &mut |name| {
1099 super::scan_table_write(wtx, schema, name)
1100 });
1101 }
1102 }
1103
1104 if !ctes.is_empty()
1105 && stmt
1106 .joins
1107 .iter()
1108 .any(|j| ctes.contains_key(&j.table.name.to_ascii_lowercase()))
1109 {
1110 return super::exec_select_join_with_ctes(stmt, ctes, &mut |name| {
1111 super::scan_table_write_or_view(wtx, schema, name)
1112 });
1113 }
1114
1115 if let Some(view_def) = schema.get_view(&lower_name) {
1117 if let Some(fused) = try_fuse_view(stmt, schema, view_def)? {
1118 return super::exec_select_in_txn(wtx, schema, &fused, ctes);
1119 }
1120 let view_qr = exec_view_write(wtx, schema, view_def)?;
1121 if stmt.joins.is_empty() {
1122 return super::exec_select_from_cte(&view_qr, stmt, &mut |sub| {
1123 exec_subquery_write(wtx, schema, sub, ctes)
1124 });
1125 } else {
1126 let mut view_ctes = ctes.clone();
1127 view_ctes.insert(lower_name.clone(), view_qr);
1128 return super::exec_select_join_with_ctes(stmt, &view_ctes, &mut |name| {
1129 super::scan_table_write_or_view(wtx, schema, name)
1130 });
1131 }
1132 }
1133
1134 let any_join_view = stmt.joins.iter().any(|j| {
1135 schema
1136 .get_view(&j.table.name.to_ascii_lowercase())
1137 .is_some()
1138 });
1139 if any_join_view {
1140 let mut view_ctes = ctes.clone();
1141 for j in &stmt.joins {
1142 let jname = j.table.name.to_ascii_lowercase();
1143 if let Some(vd) = schema.get_view(&jname) {
1144 if let std::collections::hash_map::Entry::Vacant(e) = view_ctes.entry(jname) {
1145 let vqr = exec_view_write(wtx, schema, vd)?;
1146 e.insert(vqr);
1147 }
1148 }
1149 }
1150 return super::exec_select_join_with_ctes(stmt, &view_ctes, &mut |name| {
1151 super::scan_table_write(wtx, schema, name)
1152 });
1153 }
1154
1155 if !stmt.joins.is_empty() {
1156 return super::exec_select_join_in_txn(wtx, schema, stmt);
1157 }
1158
1159 let lower_name = stmt.from.to_ascii_lowercase();
1160 let table_schema = schema
1161 .get(&lower_name)
1162 .ok_or_else(|| SqlError::TableNotFound(stmt.from.clone()))?;
1163
1164 let corr_ctx = CorrelationCtx {
1166 outer_schema: table_schema,
1167 outer_alias: stmt.from_alias.as_deref(),
1168 };
1169 if has_correlated_where(&stmt.where_clause, &corr_ctx, schema) {
1170 let (mut rows, _) = collect_rows_write(wtx, table_schema, &None, None)?;
1171 let remaining_where =
1172 handle_correlated_where_write(wtx, schema, stmt, &corr_ctx, &mut rows)?;
1173 let clean_stmt = SelectStmt {
1174 where_clause: remaining_where,
1175 columns: stmt.columns.clone(),
1176 from: stmt.from.clone(),
1177 from_alias: stmt.from_alias.clone(),
1178 joins: stmt.joins.clone(),
1179 distinct: stmt.distinct,
1180 order_by: stmt.order_by.clone(),
1181 limit: stmt.limit.clone(),
1182 offset: stmt.offset.clone(),
1183 group_by: stmt.group_by.clone(),
1184 having: stmt.having.clone(),
1185 };
1186 let final_stmt;
1187 let s = if stmt_has_subquery(&clean_stmt) {
1188 final_stmt = materialize_stmt(&clean_stmt, &mut |sub| {
1189 exec_subquery_write(wtx, schema, sub, ctes)
1190 })?;
1191 &final_stmt
1192 } else {
1193 &clean_stmt
1194 };
1195 return super::process_select(&table_schema.columns, rows, s, false);
1196 }
1197
1198 let materialized;
1199 let stmt = if stmt_has_subquery(stmt) {
1200 materialized =
1201 materialize_stmt(stmt, &mut |sub| exec_subquery_write(wtx, schema, sub, ctes))?;
1202 &materialized
1203 } else {
1204 stmt
1205 };
1206
1207 if let Some(result) = try_count_star_shortcut(stmt, || {
1208 wtx.table_entry_count(lower_name.as_bytes())
1209 .map_err(SqlError::Storage)
1210 })? {
1211 return Ok(result);
1212 }
1213
1214 if let Some(plan) = StreamAggPlan::try_new(stmt, table_schema)? {
1215 let mut states: Vec<AggState> = plan.ops.iter().map(|(op, _)| AggState::new(op)).collect();
1216 let mut scan_err: Option<SqlError> = None;
1217 if stmt.where_clause.is_none() {
1218 wtx.table_scan_from(lower_name.as_bytes(), b"", |key, value| {
1219 Ok(plan.feed_row_raw(key, value, &mut states, &mut scan_err))
1220 })
1221 .map_err(SqlError::Storage)?;
1222 } else {
1223 let col_map = ColumnMap::new(&table_schema.columns);
1224 wtx.table_scan_from(lower_name.as_bytes(), b"", |key, value| {
1225 Ok(plan.feed_row(
1226 key,
1227 value,
1228 table_schema,
1229 &col_map,
1230 &stmt.where_clause,
1231 &mut states,
1232 &mut scan_err,
1233 ))
1234 })
1235 .map_err(SqlError::Storage)?;
1236 }
1237 if let Some(e) = scan_err {
1238 return Err(e);
1239 }
1240 return Ok(plan.finish(states));
1241 }
1242
1243 if let Some(plan) = StreamGroupByPlan::try_new(stmt, table_schema)? {
1244 let lower = lower_name.clone();
1245 return plan.execute_scan(|cb| {
1246 wtx.table_scan_from(lower.as_bytes(), b"", |key, value| Ok(cb(key, value)))
1247 });
1248 }
1249
1250 if let Some(plan) = TopKScanPlan::try_new(stmt, table_schema)? {
1251 let lower = lower_name.clone();
1252 return plan.execute_scan(table_schema, stmt, |cb| {
1253 wtx.table_scan_from(lower.as_bytes(), b"", |key, value| Ok(cb(key, value)))
1254 });
1255 }
1256
1257 let scan_limit = compute_scan_limit(stmt);
1258 let (rows, predicate_applied) =
1259 collect_rows_write(wtx, table_schema, &stmt.where_clause, scan_limit)?;
1260 super::process_select(&table_schema.columns, rows, stmt, predicate_applied)
1261}
1262
1263pub(super) fn exec_update_in_txn(
1264 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1265 schema: &SchemaManager,
1266 stmt: &UpdateStmt,
1267) -> Result<ExecutionResult> {
1268 let materialized;
1269 let stmt = if update_has_subquery(stmt) {
1270 materialized = materialize_update(stmt, &mut |sub| {
1271 exec_subquery_write(wtx, schema, sub, &HashMap::new())
1272 })?;
1273 &materialized
1274 } else {
1275 stmt
1276 };
1277
1278 let lower_name = stmt.table.to_ascii_lowercase();
1279 let table_schema = schema
1280 .get(&lower_name)
1281 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1282
1283 let col_map = ColumnMap::new(&table_schema.columns);
1284 let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
1285 let matching_rows: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
1286 .into_iter()
1287 .filter(|(_, row)| match &stmt.where_clause {
1288 Some(where_expr) => match eval_expr(where_expr, &col_map, row) {
1289 Ok(val) => is_truthy(&val),
1290 Err(_) => false,
1291 },
1292 None => true,
1293 })
1294 .collect();
1295
1296 if matching_rows.is_empty() {
1297 return Ok(ExecutionResult::RowsAffected(0));
1298 }
1299
1300 struct UpdateChange {
1301 old_key: Vec<u8>,
1302 new_key: Vec<u8>,
1303 new_value: Vec<u8>,
1304 pk_changed: bool,
1305 old_row: Vec<Value>,
1306 new_row: Vec<Value>,
1307 }
1308
1309 let pk_indices = table_schema.pk_indices();
1310 let mut changes: Vec<UpdateChange> = Vec::new();
1311
1312 for (old_key, row) in &matching_rows {
1313 let mut new_row = row.clone();
1314 let mut pk_changed = false;
1315
1316 let mut evaluated: Vec<(usize, Value)> = Vec::with_capacity(stmt.assignments.len());
1318 for (col_name, expr) in &stmt.assignments {
1319 let col_idx = table_schema
1320 .column_index(col_name)
1321 .ok_or_else(|| SqlError::ColumnNotFound(col_name.clone()))?;
1322 let new_val = eval_expr(expr, &col_map, row)?;
1323 let col = &table_schema.columns[col_idx];
1324
1325 let got_type = new_val.data_type();
1326 let coerced = if new_val.is_null() {
1327 if !col.nullable {
1328 return Err(SqlError::NotNullViolation(col.name.clone()));
1329 }
1330 Value::Null
1331 } else {
1332 new_val
1333 .coerce_into(col.data_type)
1334 .ok_or_else(|| SqlError::TypeMismatch {
1335 expected: col.data_type.to_string(),
1336 got: got_type.to_string(),
1337 })?
1338 };
1339
1340 evaluated.push((col_idx, coerced));
1341 }
1342
1343 for (col_idx, coerced) in evaluated {
1344 if table_schema.primary_key_columns.contains(&(col_idx as u16)) {
1345 pk_changed = true;
1346 }
1347 new_row[col_idx] = coerced;
1348 }
1349
1350 if table_schema.has_checks() {
1351 for col in &table_schema.columns {
1352 if let Some(ref check) = col.check_expr {
1353 let result = eval_expr(check, &col_map, &new_row)?;
1354 if !is_truthy(&result) && !result.is_null() {
1355 let name = col.check_name.as_deref().unwrap_or(&col.name);
1356 return Err(SqlError::CheckViolation(name.to_string()));
1357 }
1358 }
1359 }
1360 for tc in &table_schema.check_constraints {
1361 let result = eval_expr(&tc.expr, &col_map, &new_row)?;
1362 if !is_truthy(&result) && !result.is_null() {
1363 let name = tc.name.as_deref().unwrap_or(&tc.sql);
1364 return Err(SqlError::CheckViolation(name.to_string()));
1365 }
1366 }
1367 }
1368
1369 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| new_row[i].clone()).collect();
1370 let new_key = encode_composite_key(&pk_values);
1371
1372 let non_pk = table_schema.non_pk_indices();
1373 let enc_pos = table_schema.encoding_positions();
1374 let phys_count = table_schema.physical_non_pk_count();
1375 let mut value_values = vec![Value::Null; phys_count];
1376 for (j, &i) in non_pk.iter().enumerate() {
1377 value_values[enc_pos[j] as usize] = new_row[i].clone();
1378 }
1379 let new_value = encode_row(&value_values);
1380
1381 changes.push(UpdateChange {
1382 old_key: old_key.clone(),
1383 new_key,
1384 new_value,
1385 pk_changed,
1386 old_row: row.clone(),
1387 new_row,
1388 });
1389 }
1390
1391 {
1392 use std::collections::HashSet;
1393 let mut new_keys: HashSet<Vec<u8>> = HashSet::new();
1394 for c in &changes {
1395 if c.pk_changed && c.new_key != c.old_key && !new_keys.insert(c.new_key.clone()) {
1396 return Err(SqlError::DuplicateKey);
1397 }
1398 }
1399 }
1400
1401 if !table_schema.foreign_keys.is_empty() {
1403 for c in &changes {
1404 for fk in &table_schema.foreign_keys {
1405 let fk_changed = fk
1406 .columns
1407 .iter()
1408 .any(|&ci| c.old_row[ci as usize] != c.new_row[ci as usize]);
1409 if !fk_changed {
1410 continue;
1411 }
1412 let any_null = fk
1413 .columns
1414 .iter()
1415 .any(|&ci| c.new_row[ci as usize].is_null());
1416 if any_null {
1417 continue;
1418 }
1419 let fk_vals: Vec<Value> = fk
1420 .columns
1421 .iter()
1422 .map(|&ci| c.new_row[ci as usize].clone())
1423 .collect();
1424 let fk_key = encode_composite_key(&fk_vals);
1425 let found = wtx
1426 .table_get(fk.foreign_table.as_bytes(), &fk_key)
1427 .map_err(SqlError::Storage)?;
1428 if found.is_none() {
1429 let name = fk.name.as_deref().unwrap_or(&fk.foreign_table);
1430 return Err(SqlError::ForeignKeyViolation(name.to_string()));
1431 }
1432 }
1433 }
1434 }
1435
1436 let child_fks = schema.child_fks_for(&lower_name);
1438 if !child_fks.is_empty() {
1439 for c in &changes {
1440 if !c.pk_changed {
1441 continue;
1442 }
1443 let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
1444 let old_pk_key = encode_composite_key(&old_pk);
1445 for &(child_table, fk) in &child_fks {
1446 let child_schema = schema.get(child_table).unwrap();
1447 let fk_idx = child_schema
1448 .indices
1449 .iter()
1450 .find(|idx| idx.columns == fk.columns);
1451 if let Some(idx) = fk_idx {
1452 let idx_table = TableSchema::index_table_name(child_table, &idx.name);
1453 let mut has_child = false;
1454 wtx.table_scan_from(&idx_table, &old_pk_key, |key, _| {
1455 if key.starts_with(&old_pk_key) {
1456 has_child = true;
1457 Ok(false)
1458 } else {
1459 Ok(false)
1460 }
1461 })
1462 .map_err(SqlError::Storage)?;
1463 if has_child {
1464 return Err(SqlError::ForeignKeyViolation(format!(
1465 "cannot update PK in '{}': referenced by '{}'",
1466 lower_name, child_table
1467 )));
1468 }
1469 }
1470 }
1471 }
1472 }
1473
1474 for c in &changes {
1475 let old_pk: Vec<Value> = pk_indices.iter().map(|&i| c.old_row[i].clone()).collect();
1476
1477 for idx in &table_schema.indices {
1478 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
1479 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
1480 let old_idx_key = encode_index_key(idx, &c.old_row, &old_pk);
1481 wtx.table_delete(&idx_table, &old_idx_key)
1482 .map_err(SqlError::Storage)?;
1483 }
1484 }
1485
1486 if c.pk_changed {
1487 wtx.table_delete(lower_name.as_bytes(), &c.old_key)
1488 .map_err(SqlError::Storage)?;
1489 }
1490 }
1491
1492 for c in &changes {
1493 let new_pk: Vec<Value> = pk_indices.iter().map(|&i| c.new_row[i].clone()).collect();
1494
1495 if c.pk_changed {
1496 let is_new = wtx
1497 .table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
1498 .map_err(SqlError::Storage)?;
1499 if !is_new {
1500 return Err(SqlError::DuplicateKey);
1501 }
1502 } else {
1503 wtx.table_insert(lower_name.as_bytes(), &c.new_key, &c.new_value)
1504 .map_err(SqlError::Storage)?;
1505 }
1506
1507 for idx in &table_schema.indices {
1508 if index_columns_changed(idx, &c.old_row, &c.new_row) || c.pk_changed {
1509 let idx_table = TableSchema::index_table_name(&lower_name, &idx.name);
1510 let new_idx_key = encode_index_key(idx, &c.new_row, &new_pk);
1511 let new_idx_val = encode_index_value(idx, &c.new_row, &new_pk);
1512 let is_new = wtx
1513 .table_insert(&idx_table, &new_idx_key, &new_idx_val)
1514 .map_err(SqlError::Storage)?;
1515 if idx.unique && !is_new {
1516 let indexed_values: Vec<Value> = idx
1517 .columns
1518 .iter()
1519 .map(|&col_idx| c.new_row[col_idx as usize].clone())
1520 .collect();
1521 let any_null = indexed_values.iter().any(|v| v.is_null());
1522 if !any_null {
1523 return Err(SqlError::UniqueViolation(idx.name.clone()));
1524 }
1525 }
1526 }
1527 }
1528 }
1529
1530 let count = changes.len() as u64;
1531 Ok(ExecutionResult::RowsAffected(count))
1532}
1533
1534pub(super) fn exec_delete_in_txn(
1535 wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
1536 schema: &SchemaManager,
1537 stmt: &DeleteStmt,
1538) -> Result<ExecutionResult> {
1539 let materialized;
1540 let stmt = if delete_has_subquery(stmt) {
1541 materialized = materialize_delete(stmt, &mut |sub| {
1542 exec_subquery_write(wtx, schema, sub, &HashMap::new())
1543 })?;
1544 &materialized
1545 } else {
1546 stmt
1547 };
1548
1549 let lower_name = stmt.table.to_ascii_lowercase();
1550 let table_schema = schema
1551 .get(&lower_name)
1552 .ok_or_else(|| SqlError::TableNotFound(stmt.table.clone()))?;
1553
1554 let col_map = ColumnMap::new(&table_schema.columns);
1555 let all_candidates = collect_keyed_rows_write(wtx, table_schema, &stmt.where_clause)?;
1556 let rows_to_delete: Vec<(Vec<u8>, Vec<Value>)> = all_candidates
1557 .into_iter()
1558 .filter(|(_, row)| match &stmt.where_clause {
1559 Some(where_expr) => match eval_expr(where_expr, &col_map, row) {
1560 Ok(val) => is_truthy(&val),
1561 Err(_) => false,
1562 },
1563 None => true,
1564 })
1565 .collect();
1566
1567 if rows_to_delete.is_empty() {
1568 return Ok(ExecutionResult::RowsAffected(0));
1569 }
1570
1571 let pk_indices = table_schema.pk_indices();
1572
1573 let child_fks = schema.child_fks_for(&lower_name);
1575 if !child_fks.is_empty() {
1576 for (_key, row) in &rows_to_delete {
1577 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
1578 let pk_key = encode_composite_key(&pk_values);
1579 for &(child_table, fk) in &child_fks {
1580 let child_schema = schema.get(child_table).unwrap();
1581 let fk_idx = child_schema
1582 .indices
1583 .iter()
1584 .find(|idx| idx.columns == fk.columns);
1585 if let Some(idx) = fk_idx {
1586 let idx_table = TableSchema::index_table_name(child_table, &idx.name);
1587 let mut has_child = false;
1588 wtx.table_scan_from(&idx_table, &pk_key, |key, _| {
1589 if key.starts_with(&pk_key) {
1590 has_child = true;
1591 Ok(false)
1592 } else {
1593 Ok(false)
1594 }
1595 })
1596 .map_err(SqlError::Storage)?;
1597 if has_child {
1598 return Err(SqlError::ForeignKeyViolation(format!(
1599 "cannot delete from '{}': referenced by '{}'",
1600 lower_name, child_table
1601 )));
1602 }
1603 }
1604 }
1605 }
1606 }
1607
1608 for (key, row) in &rows_to_delete {
1609 let pk_values: Vec<Value> = pk_indices.iter().map(|&i| row[i].clone()).collect();
1610 delete_index_entries(wtx, table_schema, row, &pk_values)?;
1611 wtx.table_delete(lower_name.as_bytes(), key)
1612 .map_err(SqlError::Storage)?;
1613 }
1614 let count = rows_to_delete.len() as u64;
1615 Ok(ExecutionResult::RowsAffected(count))
1616}