1use arrow::datatypes::DataType;
2use llkv_plan::{
3 CanonicalScalar, ForeignKeyAction as PlanForeignKeyAction, ForeignKeySpec, PlanValue,
4};
5use llkv_result::{Error, Result as LlkvResult};
6use rustc_hash::{FxHashMap, FxHashSet};
7use sqlparser::ast::{self, Expr as SqlExpr};
8use sqlparser::dialect::GenericDialect;
9use sqlparser::parser::Parser;
10
11use super::types::ForeignKeyAction;
12use crate::sys_catalog::MultiColumnIndexEntryMeta;
13use crate::types::{FieldId, TableId};
14
15#[derive(Clone, Debug)]
17pub struct ConstraintColumnInfo {
18 pub name: String,
19 pub field_id: FieldId,
20 pub data_type: DataType,
21 pub nullable: bool,
22 pub check_expr: Option<String>,
23}
24
25#[derive(Hash, Eq, PartialEq, Debug, Clone)]
27pub struct UniqueKey(Vec<CanonicalScalar>);
28
29impl UniqueKey {
30 pub fn from_scalar(value: CanonicalScalar) -> Self {
31 UniqueKey(vec![value])
32 }
33
34 pub fn from_components(values: Vec<CanonicalScalar>) -> Self {
35 UniqueKey(values)
36 }
37
38 pub fn components(&self) -> &[CanonicalScalar] {
39 &self.0
40 }
41
42 pub fn into_components(self) -> Vec<CanonicalScalar> {
43 self.0
44 }
45}
46
47impl From<CanonicalScalar> for UniqueKey {
48 fn from(value: CanonicalScalar) -> Self {
49 UniqueKey::from_scalar(value)
50 }
51}
52
53pub fn validate_check_constraints(
55 columns: &[ConstraintColumnInfo],
56 rows: &[Vec<PlanValue>],
57 column_order: &[usize],
58) -> LlkvResult<()> {
59 if rows.is_empty() {
60 return Ok(());
61 }
62
63 let dialect = GenericDialect {};
64
65 let mut parsed_checks: Vec<(usize, String, String, SqlExpr)> = Vec::new();
66
67 for (idx, column) in columns.iter().enumerate() {
68 if let Some(expr_str) = &column.check_expr {
69 let expr = parse_check_expression(&dialect, expr_str)?;
70 parsed_checks.push((idx, column.name.clone(), expr_str.clone(), expr));
71 }
72 }
73
74 if parsed_checks.is_empty() {
75 return Ok(());
76 }
77
78 let mut name_lookup: FxHashMap<String, usize> = FxHashMap::default();
79 for (idx, column) in columns.iter().enumerate() {
80 name_lookup.insert(column.name.to_ascii_lowercase(), idx);
81 }
82
83 for row in rows {
84 for (_schema_idx, column_name, expr_str, expr) in &parsed_checks {
85 let result = evaluate_check_expression(expr, row, column_order, columns, &name_lookup)?;
86
87 if !result {
88 return Err(Error::ConstraintError(format!(
89 "CHECK constraint failed for column '{}': {}",
90 column_name, expr_str
91 )));
92 }
93 }
94 }
95
96 Ok(())
97}
98
99pub fn ensure_multi_column_unique(
101 existing_rows: &[Vec<PlanValue>],
102 new_rows: &[Vec<PlanValue>],
103 column_names: &[String],
104) -> LlkvResult<()> {
105 let mut existing_keys: FxHashSet<UniqueKey> = FxHashSet::default();
106 for values in existing_rows {
107 if let Some(key) = build_composite_unique_key(values, column_names)?
108 && !existing_keys.insert(key.clone())
109 {
110 return Err(Error::ConstraintError(format!(
111 "constraint violation on columns '{}'",
112 column_names.join(", ")
113 )));
114 }
115 }
116
117 let mut new_keys: FxHashSet<UniqueKey> = FxHashSet::default();
118 for values in new_rows {
119 if let Some(key) = build_composite_unique_key(values, column_names)?
120 && (existing_keys.contains(&key) || !new_keys.insert(key))
121 {
122 return Err(Error::ConstraintError(format!(
123 "constraint violation on columns '{}'",
124 column_names.join(", ")
125 )));
126 }
127 }
128
129 Ok(())
130}
131
132pub fn unique_key_component(value: &PlanValue, column_name: &str) -> LlkvResult<Option<UniqueKey>> {
134 match value {
135 PlanValue::Null => Ok(None),
136 PlanValue::Struct(_) => Err(Error::InvalidArgumentError(format!(
137 "UNIQUE index is not supported on struct column '{}'",
138 column_name
139 ))),
140 _ => {
141 let scalar = CanonicalScalar::from_plan_value(value)?;
142 if matches!(scalar, CanonicalScalar::Null) {
143 Ok(None)
144 } else {
145 Ok(Some(UniqueKey::from_scalar(scalar)))
146 }
147 }
148 }
149}
150
151pub fn build_composite_unique_key(
153 values: &[PlanValue],
154 column_names: &[String],
155) -> LlkvResult<Option<UniqueKey>> {
156 if values.is_empty() {
157 return Ok(None);
158 }
159
160 let mut components = Vec::with_capacity(values.len());
161 for (value, column_name) in values.iter().zip(column_names) {
162 match unique_key_component(value, column_name)? {
163 Some(component) => components.extend(component.into_components()),
164 None => return Ok(None),
165 }
166 }
167
168 Ok(Some(UniqueKey::from_components(components)))
169}
170
171fn parse_check_expression(dialect: &GenericDialect, check_expr_str: &str) -> LlkvResult<SqlExpr> {
172 let sql = format!("SELECT {}", check_expr_str);
173 let mut ast = Parser::parse_sql(dialect, &sql).map_err(|e| {
174 Error::InvalidArgumentError(format!(
175 "Failed to parse CHECK expression '{}': {}",
176 check_expr_str, e
177 ))
178 })?;
179
180 let stmt = ast.pop().ok_or_else(|| {
181 Error::InvalidArgumentError(format!(
182 "CHECK expression '{}' resulted in empty AST",
183 check_expr_str
184 ))
185 })?;
186
187 let query = match stmt {
188 ast::Statement::Query(q) => q,
189 _ => {
190 return Err(Error::InvalidArgumentError(format!(
191 "CHECK expression '{}' did not parse as SELECT",
192 check_expr_str
193 )));
194 }
195 };
196
197 let body = match *query.body {
198 ast::SetExpr::Select(s) => s,
199 _ => {
200 return Err(Error::InvalidArgumentError(format!(
201 "CHECK expression '{}' is not a simple SELECT",
202 check_expr_str
203 )));
204 }
205 };
206
207 if body.projection.len() != 1 {
208 return Err(Error::InvalidArgumentError(format!(
209 "CHECK expression '{}' must have exactly one projection",
210 check_expr_str
211 )));
212 }
213
214 match &body.projection[0] {
215 ast::SelectItem::UnnamedExpr(expr) | ast::SelectItem::ExprWithAlias { expr, .. } => {
216 Ok(expr.clone())
217 }
218 _ => Err(Error::InvalidArgumentError(format!(
219 "CHECK expression '{}' projection is not a simple expression",
220 check_expr_str
221 ))),
222 }
223}
224
225fn evaluate_check_expression(
226 expr: &SqlExpr,
227 row: &[PlanValue],
228 column_order: &[usize],
229 columns: &[ConstraintColumnInfo],
230 name_lookup: &FxHashMap<String, usize>,
231) -> LlkvResult<bool> {
232 use sqlparser::ast::BinaryOperator;
233
234 match expr {
235 SqlExpr::BinaryOp { left, op, right } => {
236 let left_val =
237 evaluate_check_expr_value(left, row, column_order, columns, name_lookup)?;
238 let right_val =
239 evaluate_check_expr_value(right, row, column_order, columns, name_lookup)?;
240
241 match op {
242 BinaryOperator::Eq => {
243 if matches!(left_val, PlanValue::Null) || matches!(right_val, PlanValue::Null) {
244 Ok(true)
245 } else {
246 Ok(left_val == right_val)
247 }
248 }
249 BinaryOperator::NotEq => {
250 if matches!(left_val, PlanValue::Null) || matches!(right_val, PlanValue::Null) {
251 Ok(true)
252 } else {
253 Ok(left_val != right_val)
254 }
255 }
256 BinaryOperator::Lt => compare_numeric(&left_val, &right_val, |l, r| l < r),
257 BinaryOperator::LtEq => compare_numeric(&left_val, &right_val, |l, r| l <= r),
258 BinaryOperator::Gt => compare_numeric(&left_val, &right_val, |l, r| l > r),
259 BinaryOperator::GtEq => compare_numeric(&left_val, &right_val, |l, r| l >= r),
260 _ => Err(Error::InvalidArgumentError(format!(
261 "Unsupported operator in CHECK constraint: {:?}",
262 op
263 ))),
264 }
265 }
266 SqlExpr::IsNull(inner) => {
267 let value = evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)?;
268 Ok(matches!(value, PlanValue::Null))
269 }
270 SqlExpr::IsNotNull(inner) => {
271 let value = evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)?;
272 Ok(!matches!(value, PlanValue::Null))
273 }
274 SqlExpr::Nested(inner) => {
275 evaluate_check_expression(inner, row, column_order, columns, name_lookup)
276 }
277 _ => Err(Error::InvalidArgumentError(format!(
278 "Unsupported expression in CHECK constraint: {:?}",
279 expr
280 ))),
281 }
282}
283
284#[allow(clippy::only_used_in_recursion)]
285fn evaluate_check_expr_value(
286 expr: &SqlExpr,
287 row: &[PlanValue],
288 column_order: &[usize],
289 columns: &[ConstraintColumnInfo],
290 name_lookup: &FxHashMap<String, usize>,
291) -> LlkvResult<PlanValue> {
292 use sqlparser::ast::{BinaryOperator, Expr as SqlExpr};
293
294 match expr {
295 SqlExpr::BinaryOp { left, op, right } => {
296 let left_val =
297 evaluate_check_expr_value(left, row, column_order, columns, name_lookup)?;
298 let right_val =
299 evaluate_check_expr_value(right, row, column_order, columns, name_lookup)?;
300
301 match op {
302 BinaryOperator::Plus => apply_numeric_op(left_val, right_val, |l, r| l + r),
303 BinaryOperator::Minus => apply_numeric_op(left_val, right_val, |l, r| l - r),
304 BinaryOperator::Multiply => apply_numeric_op(left_val, right_val, |l, r| l * r),
305 BinaryOperator::Divide => divide_numeric(left_val, right_val),
306 _ => Err(Error::InvalidArgumentError(format!(
307 "Unsupported binary operator in CHECK constraint value expression: {:?}",
308 op
309 ))),
310 }
311 }
312 SqlExpr::Identifier(ident) => {
313 let column_idx = lookup_column_index(name_lookup, &ident.value)?;
314 extract_row_value(row, column_order, column_idx, &ident.value)
315 }
316 SqlExpr::CompoundIdentifier(idents) => {
317 if idents.len() == 2 {
318 let column_name = &idents[0].value;
319 let field_name = &idents[1].value;
320 let column_idx = lookup_column_index(name_lookup, column_name)?;
321 let value = extract_row_value(row, column_order, column_idx, column_name)?;
322 extract_struct_field(value, column_name, field_name)
323 } else if idents.len() == 3 {
324 let column_name = &idents[1].value;
325 let field_name = &idents[2].value;
326 let column_idx = lookup_column_index(name_lookup, column_name)?;
327 let value = extract_row_value(row, column_order, column_idx, column_name)?;
328 extract_struct_field(value, column_name, field_name)
329 } else {
330 Err(Error::InvalidArgumentError(format!(
331 "Unsupported compound identifier in CHECK constraint: {} parts",
332 idents.len()
333 )))
334 }
335 }
336 SqlExpr::Value(val_with_span) => match &val_with_span.value {
337 ast::Value::Number(n, _) => {
338 if let Ok(i) = n.parse::<i64>() {
339 Ok(PlanValue::Integer(i))
340 } else if let Ok(f) = n.parse::<f64>() {
341 Ok(PlanValue::Float(f))
342 } else {
343 Err(Error::InvalidArgumentError(format!(
344 "Invalid number in CHECK constraint: {}",
345 n
346 )))
347 }
348 }
349 ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
350 Ok(PlanValue::String(s.clone()))
351 }
352 ast::Value::Null => Ok(PlanValue::Null),
353 _ => Err(Error::InvalidArgumentError(format!(
354 "Unsupported value type in CHECK constraint: {:?}",
355 val_with_span.value
356 ))),
357 },
358 SqlExpr::Nested(inner) => {
359 evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)
360 }
361 _ => Err(Error::InvalidArgumentError(format!(
362 "Unsupported expression type in CHECK constraint: {:?}",
363 expr
364 ))),
365 }
366}
367
368fn lookup_column_index(
369 name_lookup: &FxHashMap<String, usize>,
370 column_name: &str,
371) -> LlkvResult<usize> {
372 name_lookup
373 .get(&column_name.to_ascii_lowercase())
374 .copied()
375 .ok_or_else(|| {
376 Error::InvalidArgumentError(format!(
377 "Unknown column '{}' in CHECK constraint",
378 column_name
379 ))
380 })
381}
382
383fn extract_row_value(
384 row: &[PlanValue],
385 column_order: &[usize],
386 schema_idx: usize,
387 column_name: &str,
388) -> LlkvResult<PlanValue> {
389 let insert_pos = column_order
390 .iter()
391 .position(|&dest_idx| dest_idx == schema_idx)
392 .ok_or_else(|| {
393 Error::InvalidArgumentError(format!("Column '{}' not provided in INSERT", column_name))
394 })?;
395
396 Ok(row[insert_pos].clone())
397}
398
399fn extract_struct_field(
400 value: PlanValue,
401 column_name: &str,
402 field_name: &str,
403) -> LlkvResult<PlanValue> {
404 match value {
405 PlanValue::Struct(fields) => fields
406 .into_iter()
407 .find(|(name, _)| name.eq_ignore_ascii_case(field_name))
408 .map(|(_, val)| val)
409 .ok_or_else(|| {
410 Error::InvalidArgumentError(format!(
411 "Struct field '{}' not found in column '{}'",
412 field_name, column_name
413 ))
414 }),
415 _ => Err(Error::InvalidArgumentError(format!(
416 "Column '{}' is not a struct, cannot access field '{}'",
417 column_name, field_name
418 ))),
419 }
420}
421
422fn compare_numeric<F>(left: &PlanValue, right: &PlanValue, compare: F) -> LlkvResult<bool>
423where
424 F: Fn(f64, f64) -> bool,
425{
426 if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
427 return Ok(true);
430 }
431
432 match (left, right) {
433 (PlanValue::Integer(l), PlanValue::Integer(r)) => Ok(compare(*l as f64, *r as f64)),
434 (PlanValue::Float(l), PlanValue::Float(r)) => Ok(compare(*l, *r)),
435 (PlanValue::Integer(l), PlanValue::Float(r)) => Ok(compare(*l as f64, *r)),
436 (PlanValue::Float(l), PlanValue::Integer(r)) => Ok(compare(*l, *r as f64)),
437 _ => Err(Error::InvalidArgumentError(
438 "CHECK constraint comparison requires numeric values".into(),
439 )),
440 }
441}
442
443fn apply_numeric_op(
444 left: PlanValue,
445 right: PlanValue,
446 op: fn(f64, f64) -> f64,
447) -> LlkvResult<PlanValue> {
448 if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
449 return Ok(PlanValue::Null);
450 }
451
452 match (left, right) {
453 (PlanValue::Integer(l), PlanValue::Integer(r)) => {
454 let result = op(l as f64, r as f64);
455 if result.fract() == 0.0 {
456 Ok(PlanValue::Integer(result as i64))
457 } else {
458 Ok(PlanValue::Float(result))
459 }
460 }
461 (PlanValue::Float(l), PlanValue::Float(r)) => Ok(PlanValue::Float(op(l, r))),
462 (PlanValue::Integer(l), PlanValue::Float(r)) => Ok(PlanValue::Float(op(l as f64, r))),
463 (PlanValue::Float(l), PlanValue::Integer(r)) => Ok(PlanValue::Float(op(l, r as f64))),
464 _ => Err(Error::InvalidArgumentError(
465 "CHECK constraint arithmetic requires numeric values".into(),
466 )),
467 }
468}
469
470fn divide_numeric(left: PlanValue, right: PlanValue) -> LlkvResult<PlanValue> {
471 if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
472 return Ok(PlanValue::Null);
473 }
474
475 match (left, right) {
476 (PlanValue::Integer(l), PlanValue::Integer(r)) => {
477 if r == 0 {
478 Err(Error::InvalidArgumentError(
479 "Division by zero in CHECK constraint".into(),
480 ))
481 } else {
482 Ok(PlanValue::Integer(l / r))
483 }
484 }
485 (PlanValue::Float(l), PlanValue::Float(r)) => {
486 if r == 0.0 {
487 Err(Error::InvalidArgumentError(
488 "Division by zero in CHECK constraint".into(),
489 ))
490 } else {
491 Ok(PlanValue::Float(l / r))
492 }
493 }
494 (PlanValue::Integer(l), PlanValue::Float(r)) => {
495 if r == 0.0 {
496 Err(Error::InvalidArgumentError(
497 "Division by zero in CHECK constraint".into(),
498 ))
499 } else {
500 Ok(PlanValue::Float(l as f64 / r))
501 }
502 }
503 (PlanValue::Float(l), PlanValue::Integer(r)) => {
504 if r == 0 {
505 Err(Error::InvalidArgumentError(
506 "Division by zero in CHECK constraint".into(),
507 ))
508 } else {
509 Ok(PlanValue::Float(l / r as f64))
510 }
511 }
512 _ => Err(Error::InvalidArgumentError(
513 "CHECK constraint / operator requires numeric values".into(),
514 )),
515 }
516}
517
518#[derive(Clone, Debug)]
524pub struct ForeignKeyColumn {
525 pub name: String,
526 pub data_type: DataType,
527 pub nullable: bool,
528 pub primary_key: bool,
529 pub unique: bool,
530 pub field_id: FieldId,
531}
532
533#[derive(Clone, Debug)]
535pub struct ForeignKeyTableInfo {
536 pub display_name: String,
537 pub canonical_name: String,
538 pub table_id: TableId,
539 pub columns: Vec<ForeignKeyColumn>,
540 pub multi_column_uniques: Vec<MultiColumnIndexEntryMeta>,
541}
542
543#[derive(Clone, Debug)]
545pub struct ValidatedForeignKey {
546 pub name: Option<String>,
547 pub referencing_indices: Vec<usize>,
548 pub referencing_field_ids: Vec<FieldId>,
549 pub referencing_column_names: Vec<String>,
550 pub referenced_table_id: TableId,
551 pub referenced_table_display: String,
552 pub referenced_table_canonical: String,
553 pub referenced_field_ids: Vec<FieldId>,
554 pub referenced_column_names: Vec<String>,
555 pub on_delete: ForeignKeyAction,
556 pub on_update: ForeignKeyAction,
557}
558
559pub fn validate_foreign_keys<F>(
561 referencing_table: &ForeignKeyTableInfo,
562 specs: &[ForeignKeySpec],
563 mut lookup_table: F,
564) -> LlkvResult<Vec<ValidatedForeignKey>>
565where
566 F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
567{
568 if specs.is_empty() {
569 return Ok(Vec::new());
570 }
571
572 let mut referencing_lookup: FxHashMap<String, (usize, &ForeignKeyColumn)> =
573 FxHashMap::default();
574 for (idx, column) in referencing_table.columns.iter().enumerate() {
575 referencing_lookup.insert(column.name.to_ascii_lowercase(), (idx, column));
576 }
577
578 let mut results = Vec::with_capacity(specs.len());
579
580 for spec in specs {
581 if spec.columns.is_empty() {
582 return Err(Error::InvalidArgumentError(
583 "FOREIGN KEY requires at least one referencing column".into(),
584 ));
585 }
586
587 let mut seen_referencing = FxHashSet::default();
588 let mut referencing_indices = Vec::with_capacity(spec.columns.len());
589 let mut referencing_field_ids = Vec::with_capacity(spec.columns.len());
590 let mut referencing_column_defs = Vec::with_capacity(spec.columns.len());
591 let mut referencing_column_names = Vec::with_capacity(spec.columns.len());
592
593 for column_name in &spec.columns {
594 let normalized = column_name.to_ascii_lowercase();
595 if !seen_referencing.insert(normalized.clone()) {
596 return Err(Error::InvalidArgumentError(format!(
597 "duplicate column '{}' in FOREIGN KEY constraint",
598 column_name
599 )));
600 }
601
602 let (idx, column) = referencing_lookup.get(&normalized).ok_or_else(|| {
603 Error::InvalidArgumentError(format!(
604 "unknown column '{}' in FOREIGN KEY constraint",
605 column_name
606 ))
607 })?;
608
609 referencing_indices.push(*idx);
610 referencing_field_ids.push(column.field_id);
611 referencing_column_defs.push((*column).clone());
612 referencing_column_names.push(column.name.clone());
613 }
614
615 let referenced_table_info = lookup_table(&spec.referenced_table)?;
616
617 let referenced_columns = if spec.referenced_columns.is_empty() {
618 referenced_table_info
619 .columns
620 .iter()
621 .filter(|col| col.primary_key)
622 .map(|col| col.name.clone())
623 .collect::<Vec<_>>()
624 } else {
625 spec.referenced_columns.clone()
626 };
627
628 if referenced_columns.is_empty() {
629 return Err(Error::InvalidArgumentError(format!(
630 "there is no primary key for referenced table '{}'",
631 spec.referenced_table
632 )));
633 }
634
635 if spec.columns.len() != referenced_columns.len() {
636 return Err(Error::InvalidArgumentError(format!(
637 "number of referencing columns ({}) does not match number of referenced columns ({})",
638 spec.columns.len(),
639 referenced_columns.len()
640 )));
641 }
642
643 let mut seen_referenced = FxHashSet::default();
644 let mut referenced_lookup: FxHashMap<String, &ForeignKeyColumn> = FxHashMap::default();
645 for column in &referenced_table_info.columns {
646 referenced_lookup.insert(column.name.to_ascii_lowercase(), column);
647 }
648
649 let mut referenced_field_ids = Vec::with_capacity(referenced_columns.len());
650 let mut referenced_column_defs = Vec::with_capacity(referenced_columns.len());
651 let mut referenced_column_names = Vec::with_capacity(referenced_columns.len());
652
653 for column_name in referenced_columns.iter() {
654 let normalized = column_name.to_ascii_lowercase();
655 if !seen_referenced.insert(normalized.clone()) {
656 return Err(Error::InvalidArgumentError(format!(
657 "duplicate referenced column '{}' in FOREIGN KEY constraint",
658 column_name
659 )));
660 }
661
662 let column = referenced_lookup.get(&normalized).ok_or_else(|| {
663 Error::InvalidArgumentError(format!(
664 "unknown referenced column '{}' in table '{}'",
665 column_name, referenced_table_info.display_name
666 ))
667 })?;
668
669 referenced_field_ids.push(column.field_id);
670 referenced_column_defs.push((*column).clone());
671 referenced_column_names.push(column.name.clone());
672 }
673
674 if referenced_columns.len() == 1 {
676 let column = &referenced_column_defs[0];
678 if !column.primary_key && !column.unique {
679 return Err(Error::InvalidArgumentError(format!(
680 "FOREIGN KEY references column '{}' in table '{}' that is not UNIQUE or PRIMARY KEY",
681 column.name, referenced_table_info.display_name
682 )));
683 }
684 } else {
685 let all_primary_key = referenced_column_defs.iter().all(|col| col.primary_key);
689
690 let has_multi_column_unique =
692 referenced_table_info
693 .multi_column_uniques
694 .iter()
695 .any(|unique_entry| {
696 if unique_entry.column_ids.len() != referenced_field_ids.len() {
698 return false;
699 }
700 let unique_set: FxHashSet<_> =
702 unique_entry.column_ids.iter().copied().collect();
703 let referenced_set: FxHashSet<_> =
704 referenced_field_ids.iter().copied().collect();
705 unique_set == referenced_set
706 });
707
708 if !all_primary_key && !has_multi_column_unique {
709 return Err(Error::InvalidArgumentError(format!(
710 "FOREIGN KEY references columns ({}) in table '{}' that do not form a UNIQUE or PRIMARY KEY constraint",
711 referenced_column_names.join(", "),
712 referenced_table_info.display_name
713 )));
714 }
715 }
716
717 for (child_col, parent_col) in referencing_column_defs
718 .iter()
719 .zip(referenced_column_defs.iter())
720 {
721 if child_col.data_type != parent_col.data_type {
722 return Err(Error::InvalidArgumentError(format!(
723 "FOREIGN KEY column '{}' type {:?} does not match referenced column '{}' type {:?}",
724 child_col.name, child_col.data_type, parent_col.name, parent_col.data_type
725 )));
726 }
727
728 }
730
731 results.push(ValidatedForeignKey {
732 name: spec.name.clone(),
733 referencing_indices,
734 referencing_field_ids,
735 referencing_column_names,
736 referenced_table_id: referenced_table_info.table_id,
737 referenced_table_display: referenced_table_info.display_name.clone(),
738 referenced_table_canonical: referenced_table_info.canonical_name.clone(),
739 referenced_field_ids,
740 referenced_column_names,
741 on_delete: map_plan_action(spec.on_delete.clone()),
742 on_update: map_plan_action(spec.on_update.clone()),
743 });
744 }
745
746 Ok(results)
747}
748
749fn map_plan_action(action: PlanForeignKeyAction) -> ForeignKeyAction {
750 match action {
751 PlanForeignKeyAction::NoAction => ForeignKeyAction::NoAction,
752 PlanForeignKeyAction::Restrict => ForeignKeyAction::Restrict,
753 }
754}
755
756pub fn ensure_single_column_unique(
762 existing_values: &[PlanValue],
763 new_values: &[PlanValue],
764 column_name: &str,
765) -> LlkvResult<()> {
766 let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
767
768 for value in existing_values {
769 if let Some(key) = unique_key_component(value, column_name)?
770 && !seen.insert(key.clone())
771 {
772 return Err(Error::ConstraintError(format!(
773 "constraint violation on column '{}'",
774 column_name
775 )));
776 }
777 }
778
779 for value in new_values {
780 if let Some(key) = unique_key_component(value, column_name)?
781 && !seen.insert(key.clone())
782 {
783 return Err(Error::ConstraintError(format!(
784 "constraint violation on column '{}'",
785 column_name
786 )));
787 }
788 }
789
790 Ok(())
791}
792
793pub fn ensure_primary_key(
795 existing_rows: &[Vec<PlanValue>],
796 new_rows: &[Vec<PlanValue>],
797 column_names: &[String],
798) -> LlkvResult<()> {
799 let pk_label = if column_names.len() == 1 {
800 "column"
801 } else {
802 "columns"
803 };
804 let pk_display = if column_names.len() == 1 {
805 column_names[0].clone()
806 } else {
807 column_names.join(", ")
808 };
809
810 let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
811
812 for row_values in existing_rows {
813 if row_values.len() != column_names.len() {
814 continue;
815 }
816
817 let key = build_composite_unique_key(row_values, column_names)?;
818 let key = key.ok_or_else(|| {
819 Error::ConstraintError(format!(
820 "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
821 ))
822 })?;
823
824 if !seen.insert(key.clone()) {
825 return Err(Error::ConstraintError(format!(
826 "Duplicate key violates primary key constraint on {pk_label} '{pk_display}' (PRIMARY KEY or UNIQUE constraint violation)"
827 )));
828 }
829 }
830
831 for row_values in new_rows {
832 if row_values.len() != column_names.len() {
833 continue;
834 }
835
836 let key = build_composite_unique_key(row_values, column_names)?;
837 let key = key.ok_or_else(|| {
838 Error::ConstraintError(format!(
839 "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
840 ))
841 })?;
842
843 if !seen.insert(key.clone()) {
844 return Err(Error::ConstraintError(format!(
845 "Duplicate key violates primary key constraint on {pk_label} '{pk_display}' (PRIMARY KEY or UNIQUE constraint violation)"
846 )));
847 }
848 }
849
850 Ok(())
851}
852
853pub fn validate_foreign_key_rows(
855 constraint_name: Option<&str>,
856 referencing_table: &str,
857 referenced_table: &str,
858 referenced_column_names: &[String],
859 parent_keys: &[Vec<PlanValue>],
860 candidate_keys: &[Vec<PlanValue>],
861) -> LlkvResult<()> {
862 if parent_keys.is_empty() {
863 for key in candidate_keys {
865 if key.iter().all(|value| !matches!(value, PlanValue::Null)) {
866 let constraint_label = constraint_name.unwrap_or("FOREIGN KEY");
867 let referenced_columns = if referenced_column_names.is_empty() {
868 String::from("<unknown>")
869 } else {
870 referenced_column_names.join(", ")
871 };
872 return Err(Error::ConstraintError(format!(
873 "Violates foreign key constraint '{}' on table '{}' referencing '{}' (columns: {}) - does not exist in the referenced table",
874 constraint_label, referencing_table, referenced_table, referenced_columns,
875 )));
876 }
877 }
878 return Ok(());
879 }
880
881 for key in candidate_keys {
882 if key.iter().any(|value| matches!(value, PlanValue::Null)) {
883 continue;
884 }
885
886 if parent_keys.iter().any(|existing| existing == key) {
887 continue;
888 }
889
890 let constraint_label = constraint_name.unwrap_or("FOREIGN KEY");
891 let referenced_columns = if referenced_column_names.is_empty() {
892 String::from("<unknown>")
893 } else {
894 referenced_column_names.join(", ")
895 };
896
897 return Err(Error::ConstraintError(format!(
898 "Violates foreign key constraint '{}' on table '{}' referencing '{}' (columns: {}) - does not exist in the referenced table",
899 constraint_label, referencing_table, referenced_table, referenced_columns,
900 )));
901 }
902
903 Ok(())
904}
905
906use crate::{CatalogManager, TableView};
911use llkv_plan::AlterTableOperation;
912use llkv_storage::pager::Pager;
913use simd_r_drive_entry_handle::EntryHandle;
914
915pub fn column_in_primary_or_unique(view: &TableView, field_id: FieldId) -> bool {
917 view.constraint_records
918 .iter()
919 .filter(|record| record.is_active())
920 .any(|record| match &record.kind {
921 super::ConstraintKind::PrimaryKey(payload) => payload.field_ids.contains(&field_id),
922 super::ConstraintKind::Unique(payload) => payload.field_ids.contains(&field_id),
923 _ => false,
924 })
925}
926
927pub fn column_in_multi_column_unique(view: &TableView, field_id: FieldId) -> bool {
929 view.multi_column_uniques
930 .iter()
931 .any(|entry| entry.column_ids.contains(&field_id))
932}
933
934pub fn column_in_foreign_keys<PagerType>(
938 view: &TableView,
939 field_id: FieldId,
940 table_id: TableId,
941 catalog_service: &CatalogManager<PagerType>,
942) -> LlkvResult<Option<String>>
943where
944 PagerType: Pager<Blob = EntryHandle> + Send + Sync + 'static,
945{
946 if let Some(fk) = view
948 .foreign_keys
949 .iter()
950 .find(|fk| fk.referencing_field_ids.contains(&field_id))
951 {
952 return Ok(Some(
953 fk.constraint_name
954 .as_deref()
955 .unwrap_or("unnamed")
956 .to_string(),
957 ));
958 }
959
960 let mut visited: FxHashSet<TableId> = FxHashSet::default();
962 for (referencing_table_id, _) in catalog_service.foreign_keys_referencing(table_id)? {
963 if !visited.insert(referencing_table_id) {
964 continue;
965 }
966
967 for fk in catalog_service.foreign_key_views_for_table(referencing_table_id)? {
968 if fk.referenced_table_id == table_id && fk.referenced_field_ids.contains(&field_id) {
969 return Ok(Some(
970 fk.constraint_name
971 .as_deref()
972 .unwrap_or("unnamed")
973 .to_string(),
974 ));
975 }
976 }
977 }
978
979 Ok(None)
980}
981
982pub fn validate_alter_table_operation<PagerType>(
992 operation: &AlterTableOperation,
993 view: &TableView,
994 table_id: TableId,
995 catalog_service: &CatalogManager<PagerType>,
996) -> LlkvResult<()>
997where
998 PagerType: Pager<Blob = EntryHandle> + Send + Sync + 'static,
999{
1000 let resolver = catalog_service
1001 .field_resolver(table_id)
1002 .ok_or_else(|| Error::Internal("missing field resolver for table".into()))?;
1003
1004 match operation {
1005 AlterTableOperation::RenameColumn {
1006 old_column_name,
1007 new_column_name,
1008 } => {
1009 let field_id = resolver.field_id(old_column_name).ok_or_else(|| {
1010 Error::CatalogError(format!(
1011 "Catalog Error: column '{}' does not exist",
1012 old_column_name
1013 ))
1014 })?;
1015
1016 if resolver.field_id(new_column_name).is_some() {
1017 return Err(Error::CatalogError(format!(
1018 "Catalog Error: column '{}' already exists",
1019 new_column_name
1020 )));
1021 }
1022
1023 if let Some(constraint) =
1024 column_in_foreign_keys(view, field_id, table_id, catalog_service)?
1025 {
1026 return Err(Error::CatalogError(format!(
1027 "Catalog Error: column '{}' is involved in the foreign key constraint '{}'",
1028 old_column_name, constraint
1029 )));
1030 }
1031
1032 Ok(())
1033 }
1034 AlterTableOperation::SetColumnDataType { column_name, .. } => {
1035 let field_id = resolver.field_id(column_name).ok_or_else(|| {
1036 Error::CatalogError(format!(
1037 "Catalog Error: column '{}' does not exist",
1038 column_name
1039 ))
1040 })?;
1041
1042 if column_in_primary_or_unique(view, field_id)
1043 || column_in_multi_column_unique(view, field_id)
1044 {
1045 return Err(Error::InvalidArgumentError(format!(
1046 "Binder Error: Cannot change the type of a column that has a UNIQUE or PRIMARY KEY constraint specified (column '{}')",
1047 column_name
1048 )));
1049 }
1050
1051 if let Some(constraint) =
1052 column_in_foreign_keys(view, field_id, table_id, catalog_service)?
1053 {
1054 return Err(Error::CatalogError(format!(
1055 "Catalog Error: column '{}' is involved in the foreign key constraint '{}'",
1056 column_name, constraint
1057 )));
1058 }
1059
1060 Ok(())
1061 }
1062 AlterTableOperation::DropColumn {
1063 column_name,
1064 if_exists,
1065 ..
1066 } => {
1067 let field_id = match resolver.field_id(column_name) {
1068 Some(id) => id,
1069 None if *if_exists => return Ok(()),
1070 None => {
1071 return Err(Error::CatalogError(format!(
1072 "Catalog Error: column '{}' does not exist",
1073 column_name
1074 )));
1075 }
1076 };
1077
1078 if column_in_primary_or_unique(view, field_id)
1079 || column_in_multi_column_unique(view, field_id)
1080 {
1081 return Err(Error::CatalogError(format!(
1082 "Catalog Error: there is a UNIQUE constraint that depends on it (column '{}')",
1083 column_name
1084 )));
1085 }
1086
1087 if column_in_foreign_keys(view, field_id, table_id, catalog_service)?.is_some() {
1088 return Err(Error::CatalogError(format!(
1089 "Catalog Error: there is a FOREIGN KEY constraint that depends on it (column '{}')",
1090 column_name
1091 )));
1092 }
1093
1094 Ok(())
1095 }
1096 }
1097}