1use arrow::datatypes::DataType;
2use llkv_plan::{
3 CanonicalScalar, ForeignKeyAction as PlanForeignKeyAction, ForeignKeySpec, PlanValue,
4 canonical_scalar_from_plan_value,
5};
6use llkv_result::{Error, Result as LlkvResult};
7use rustc_hash::{FxHashMap, FxHashSet};
8use sqlparser::ast::{self, Expr as SqlExpr};
9use sqlparser::dialect::GenericDialect;
10use sqlparser::parser::Parser;
11
12use super::types::ForeignKeyAction;
13use crate::sys_catalog::MultiColumnIndexEntryMeta;
14use crate::types::{FieldId, TableId};
15
/// Per-column metadata consumed by the CHECK-constraint validation in this module.
#[derive(Clone, Debug)]
pub struct ConstraintColumnInfo {
    /// Column name. Identifiers inside CHECK expressions are resolved against the
    /// ASCII-lowercased form of this name.
    pub name: String,
    /// Field identifier of the column.
    pub field_id: FieldId,
    /// Arrow data type of the column.
    pub data_type: DataType,
    /// Nullability flag. NOTE(review): presumably mirrors the column's NOT NULL
    /// declaration; it is not read by the validators in this file.
    pub nullable: bool,
    /// SQL text of the column's CHECK expression, if it has one.
    pub check_expr: Option<String>,
}
25
26#[derive(Hash, Eq, PartialEq, Debug, Clone)]
28pub struct UniqueKey(Vec<CanonicalScalar>);
29
30impl UniqueKey {
31 pub fn from_scalar(value: CanonicalScalar) -> Self {
32 UniqueKey(vec![value])
33 }
34
35 pub fn from_components(values: Vec<CanonicalScalar>) -> Self {
36 UniqueKey(values)
37 }
38
39 pub fn components(&self) -> &[CanonicalScalar] {
40 &self.0
41 }
42
43 pub fn into_components(self) -> Vec<CanonicalScalar> {
44 self.0
45 }
46}
47
48impl From<CanonicalScalar> for UniqueKey {
49 fn from(value: CanonicalScalar) -> Self {
50 UniqueKey::from_scalar(value)
51 }
52}
53
54pub fn validate_check_constraints(
56 columns: &[ConstraintColumnInfo],
57 rows: &[Vec<PlanValue>],
58 column_order: &[usize],
59) -> LlkvResult<()> {
60 if rows.is_empty() {
61 return Ok(());
62 }
63
64 let dialect = GenericDialect {};
65
66 let mut parsed_checks: Vec<(usize, String, String, SqlExpr)> = Vec::new();
67
68 for (idx, column) in columns.iter().enumerate() {
69 if let Some(expr_str) = &column.check_expr {
70 let expr = parse_check_expression(&dialect, expr_str)?;
71 parsed_checks.push((idx, column.name.clone(), expr_str.clone(), expr));
72 }
73 }
74
75 if parsed_checks.is_empty() {
76 return Ok(());
77 }
78
79 let mut name_lookup: FxHashMap<String, usize> = FxHashMap::default();
80 for (idx, column) in columns.iter().enumerate() {
81 name_lookup.insert(column.name.to_ascii_lowercase(), idx);
82 }
83
84 for row in rows {
85 for (_schema_idx, column_name, expr_str, expr) in &parsed_checks {
86 let result = evaluate_check_expression(expr, row, column_order, columns, &name_lookup)?;
87
88 if !result {
89 return Err(Error::ConstraintError(format!(
90 "CHECK constraint failed for column '{}': {}",
91 column_name, expr_str
92 )));
93 }
94 }
95 }
96
97 Ok(())
98}
99
100pub fn ensure_multi_column_unique(
102 existing_keys: &[UniqueKey],
103 new_keys: &[UniqueKey],
104 column_names: &[String],
105) -> LlkvResult<()> {
106 let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
107 for key in existing_keys {
108 if !seen.insert(key.clone()) {
109 return Err(Error::ConstraintError(format!(
110 "constraint violation on columns '{}'",
111 column_names.join(", ")
112 )));
113 }
114 }
115
116 for key in new_keys {
117 if !seen.insert(key.clone()) {
118 return Err(Error::ConstraintError(format!(
119 "constraint violation on columns '{}'",
120 column_names.join(", ")
121 )));
122 }
123 }
124
125 Ok(())
126}
127
128pub fn unique_key_component(value: &PlanValue, column_name: &str) -> LlkvResult<Option<UniqueKey>> {
130 match value {
131 PlanValue::Null => Ok(None),
132 PlanValue::Struct(_) => Err(Error::InvalidArgumentError(format!(
133 "UNIQUE index is not supported on struct column '{}'",
134 column_name
135 ))),
136 _ => {
137 let scalar = canonical_scalar_from_plan_value(value)?;
138 if matches!(scalar, CanonicalScalar::Null) {
139 Ok(None)
140 } else {
141 Ok(Some(UniqueKey::from_scalar(scalar)))
142 }
143 }
144 }
145}
146
147pub fn build_composite_unique_key(
149 values: &[PlanValue],
150 column_names: &[String],
151) -> LlkvResult<Option<UniqueKey>> {
152 if values.is_empty() {
153 return Ok(None);
154 }
155
156 let mut components = Vec::with_capacity(values.len());
157 for (value, column_name) in values.iter().zip(column_names) {
158 match unique_key_component(value, column_name)? {
159 Some(component) => components.extend(component.into_components()),
160 None => return Ok(None),
161 }
162 }
163
164 Ok(Some(UniqueKey::from_components(components)))
165}
166
167fn parse_check_expression(dialect: &GenericDialect, check_expr_str: &str) -> LlkvResult<SqlExpr> {
168 let sql = format!("SELECT {}", check_expr_str);
169 let mut ast = Parser::parse_sql(dialect, &sql).map_err(|e| {
170 Error::InvalidArgumentError(format!(
171 "Failed to parse CHECK expression '{}': {}",
172 check_expr_str, e
173 ))
174 })?;
175
176 let stmt = ast.pop().ok_or_else(|| {
177 Error::InvalidArgumentError(format!(
178 "CHECK expression '{}' resulted in empty AST",
179 check_expr_str
180 ))
181 })?;
182
183 let query = match stmt {
184 ast::Statement::Query(q) => q,
185 _ => {
186 return Err(Error::InvalidArgumentError(format!(
187 "CHECK expression '{}' did not parse as SELECT",
188 check_expr_str
189 )));
190 }
191 };
192
193 let body = match *query.body {
194 ast::SetExpr::Select(s) => s,
195 _ => {
196 return Err(Error::InvalidArgumentError(format!(
197 "CHECK expression '{}' is not a simple SELECT",
198 check_expr_str
199 )));
200 }
201 };
202
203 if body.projection.len() != 1 {
204 return Err(Error::InvalidArgumentError(format!(
205 "CHECK expression '{}' must have exactly one projection",
206 check_expr_str
207 )));
208 }
209
210 match &body.projection[0] {
211 ast::SelectItem::UnnamedExpr(expr) | ast::SelectItem::ExprWithAlias { expr, .. } => {
212 Ok(expr.clone())
213 }
214 _ => Err(Error::InvalidArgumentError(format!(
215 "CHECK expression '{}' projection is not a simple expression",
216 check_expr_str
217 ))),
218 }
219}
220
221fn evaluate_check_expression(
222 expr: &SqlExpr,
223 row: &[PlanValue],
224 column_order: &[usize],
225 columns: &[ConstraintColumnInfo],
226 name_lookup: &FxHashMap<String, usize>,
227) -> LlkvResult<bool> {
228 use sqlparser::ast::BinaryOperator;
229
230 match expr {
231 SqlExpr::BinaryOp { left, op, right } => {
232 let left_val =
233 evaluate_check_expr_value(left, row, column_order, columns, name_lookup)?;
234 let right_val =
235 evaluate_check_expr_value(right, row, column_order, columns, name_lookup)?;
236
237 match op {
238 BinaryOperator::Eq => {
239 if matches!(left_val, PlanValue::Null) || matches!(right_val, PlanValue::Null) {
240 Ok(true)
241 } else {
242 Ok(left_val == right_val)
243 }
244 }
245 BinaryOperator::NotEq => {
246 if matches!(left_val, PlanValue::Null) || matches!(right_val, PlanValue::Null) {
247 Ok(true)
248 } else {
249 Ok(left_val != right_val)
250 }
251 }
252 BinaryOperator::Lt => compare_numeric(&left_val, &right_val, |l, r| l < r),
253 BinaryOperator::LtEq => compare_numeric(&left_val, &right_val, |l, r| l <= r),
254 BinaryOperator::Gt => compare_numeric(&left_val, &right_val, |l, r| l > r),
255 BinaryOperator::GtEq => compare_numeric(&left_val, &right_val, |l, r| l >= r),
256 _ => Err(Error::InvalidArgumentError(format!(
257 "Unsupported operator in CHECK constraint: {:?}",
258 op
259 ))),
260 }
261 }
262 SqlExpr::IsNull(inner) => {
263 let value = evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)?;
264 Ok(matches!(value, PlanValue::Null))
265 }
266 SqlExpr::IsNotNull(inner) => {
267 let value = evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)?;
268 Ok(!matches!(value, PlanValue::Null))
269 }
270 SqlExpr::Nested(inner) => {
271 evaluate_check_expression(inner, row, column_order, columns, name_lookup)
272 }
273 _ => Err(Error::InvalidArgumentError(format!(
274 "Unsupported expression in CHECK constraint: {:?}",
275 expr
276 ))),
277 }
278}
279
280#[allow(clippy::only_used_in_recursion)]
281fn evaluate_check_expr_value(
282 expr: &SqlExpr,
283 row: &[PlanValue],
284 column_order: &[usize],
285 columns: &[ConstraintColumnInfo],
286 name_lookup: &FxHashMap<String, usize>,
287) -> LlkvResult<PlanValue> {
288 use sqlparser::ast::{BinaryOperator, Expr as SqlExpr};
289
290 match expr {
291 SqlExpr::BinaryOp { left, op, right } => {
292 let left_val =
293 evaluate_check_expr_value(left, row, column_order, columns, name_lookup)?;
294 let right_val =
295 evaluate_check_expr_value(right, row, column_order, columns, name_lookup)?;
296
297 match op {
298 BinaryOperator::Plus => apply_numeric_op(left_val, right_val, |l, r| l + r),
299 BinaryOperator::Minus => apply_numeric_op(left_val, right_val, |l, r| l - r),
300 BinaryOperator::Multiply => apply_numeric_op(left_val, right_val, |l, r| l * r),
301 BinaryOperator::Divide => divide_numeric(left_val, right_val),
302 _ => Err(Error::InvalidArgumentError(format!(
303 "Unsupported binary operator in CHECK constraint value expression: {:?}",
304 op
305 ))),
306 }
307 }
308 SqlExpr::Identifier(ident) => {
309 let column_idx = lookup_column_index(name_lookup, &ident.value)?;
310 extract_row_value(row, column_order, column_idx, &ident.value)
311 }
312 SqlExpr::CompoundIdentifier(idents) => {
313 if idents.len() == 2 {
314 let column_name = &idents[0].value;
315 let field_name = &idents[1].value;
316 let column_idx = lookup_column_index(name_lookup, column_name)?;
317 let value = extract_row_value(row, column_order, column_idx, column_name)?;
318 extract_struct_field(value, column_name, field_name)
319 } else if idents.len() == 3 {
320 let column_name = &idents[1].value;
321 let field_name = &idents[2].value;
322 let column_idx = lookup_column_index(name_lookup, column_name)?;
323 let value = extract_row_value(row, column_order, column_idx, column_name)?;
324 extract_struct_field(value, column_name, field_name)
325 } else {
326 Err(Error::InvalidArgumentError(format!(
327 "Unsupported compound identifier in CHECK constraint: {} parts",
328 idents.len()
329 )))
330 }
331 }
332 SqlExpr::Value(val_with_span) => match &val_with_span.value {
333 ast::Value::Number(n, _) => {
334 if let Ok(i) = n.parse::<i64>() {
335 Ok(PlanValue::Integer(i))
336 } else if let Ok(f) = n.parse::<f64>() {
337 Ok(PlanValue::Float(f))
338 } else {
339 Err(Error::InvalidArgumentError(format!(
340 "Invalid number in CHECK constraint: {}",
341 n
342 )))
343 }
344 }
345 ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
346 Ok(PlanValue::String(s.clone()))
347 }
348 ast::Value::Null => Ok(PlanValue::Null),
349 _ => Err(Error::InvalidArgumentError(format!(
350 "Unsupported value type in CHECK constraint: {:?}",
351 val_with_span.value
352 ))),
353 },
354 SqlExpr::Nested(inner) => {
355 evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)
356 }
357 _ => Err(Error::InvalidArgumentError(format!(
358 "Unsupported expression type in CHECK constraint: {:?}",
359 expr
360 ))),
361 }
362}
363
364fn lookup_column_index(
365 name_lookup: &FxHashMap<String, usize>,
366 column_name: &str,
367) -> LlkvResult<usize> {
368 name_lookup
369 .get(&column_name.to_ascii_lowercase())
370 .copied()
371 .ok_or_else(|| {
372 Error::InvalidArgumentError(format!(
373 "Unknown column '{}' in CHECK constraint",
374 column_name
375 ))
376 })
377}
378
379fn extract_row_value(
380 row: &[PlanValue],
381 column_order: &[usize],
382 schema_idx: usize,
383 column_name: &str,
384) -> LlkvResult<PlanValue> {
385 let insert_pos = column_order
386 .iter()
387 .position(|&dest_idx| dest_idx == schema_idx)
388 .ok_or_else(|| {
389 Error::InvalidArgumentError(format!("Column '{}' not provided in INSERT", column_name))
390 })?;
391
392 Ok(row[insert_pos].clone())
393}
394
395fn extract_struct_field(
396 value: PlanValue,
397 column_name: &str,
398 field_name: &str,
399) -> LlkvResult<PlanValue> {
400 match value {
401 PlanValue::Struct(fields) => fields
402 .into_iter()
403 .find(|(name, _)| name.eq_ignore_ascii_case(field_name))
404 .map(|(_, val)| val)
405 .ok_or_else(|| {
406 Error::InvalidArgumentError(format!(
407 "Struct field '{}' not found in column '{}'",
408 field_name, column_name
409 ))
410 }),
411 _ => Err(Error::InvalidArgumentError(format!(
412 "Column '{}' is not a struct, cannot access field '{}'",
413 column_name, field_name
414 ))),
415 }
416}
417
418fn compare_numeric<F>(left: &PlanValue, right: &PlanValue, compare: F) -> LlkvResult<bool>
419where
420 F: Fn(f64, f64) -> bool,
421{
422 if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
423 return Ok(true);
426 }
427
428 match (left, right) {
429 (PlanValue::Integer(l), PlanValue::Integer(r)) => Ok(compare(*l as f64, *r as f64)),
430 (PlanValue::Float(l), PlanValue::Float(r)) => Ok(compare(*l, *r)),
431 (PlanValue::Integer(l), PlanValue::Float(r)) => Ok(compare(*l as f64, *r)),
432 (PlanValue::Float(l), PlanValue::Integer(r)) => Ok(compare(*l, *r as f64)),
433 _ => Err(Error::InvalidArgumentError(
434 "CHECK constraint comparison requires numeric values".into(),
435 )),
436 }
437}
438
439fn apply_numeric_op(
440 left: PlanValue,
441 right: PlanValue,
442 op: fn(f64, f64) -> f64,
443) -> LlkvResult<PlanValue> {
444 if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
445 return Ok(PlanValue::Null);
446 }
447
448 match (left, right) {
449 (PlanValue::Integer(l), PlanValue::Integer(r)) => {
450 let result = op(l as f64, r as f64);
451 if result.fract() == 0.0 {
452 Ok(PlanValue::Integer(result as i64))
453 } else {
454 Ok(PlanValue::Float(result))
455 }
456 }
457 (PlanValue::Float(l), PlanValue::Float(r)) => Ok(PlanValue::Float(op(l, r))),
458 (PlanValue::Integer(l), PlanValue::Float(r)) => Ok(PlanValue::Float(op(l as f64, r))),
459 (PlanValue::Float(l), PlanValue::Integer(r)) => Ok(PlanValue::Float(op(l, r as f64))),
460 _ => Err(Error::InvalidArgumentError(
461 "CHECK constraint arithmetic requires numeric values".into(),
462 )),
463 }
464}
465
466fn divide_numeric(left: PlanValue, right: PlanValue) -> LlkvResult<PlanValue> {
467 if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
468 return Ok(PlanValue::Null);
469 }
470
471 match (left, right) {
472 (PlanValue::Integer(l), PlanValue::Integer(r)) => {
473 if r == 0 {
474 Err(Error::InvalidArgumentError(
475 "Division by zero in CHECK constraint".into(),
476 ))
477 } else {
478 Ok(PlanValue::Integer(l / r))
479 }
480 }
481 (PlanValue::Float(l), PlanValue::Float(r)) => {
482 if r == 0.0 {
483 Err(Error::InvalidArgumentError(
484 "Division by zero in CHECK constraint".into(),
485 ))
486 } else {
487 Ok(PlanValue::Float(l / r))
488 }
489 }
490 (PlanValue::Integer(l), PlanValue::Float(r)) => {
491 if r == 0.0 {
492 Err(Error::InvalidArgumentError(
493 "Division by zero in CHECK constraint".into(),
494 ))
495 } else {
496 Ok(PlanValue::Float(l as f64 / r))
497 }
498 }
499 (PlanValue::Float(l), PlanValue::Integer(r)) => {
500 if r == 0 {
501 Err(Error::InvalidArgumentError(
502 "Division by zero in CHECK constraint".into(),
503 ))
504 } else {
505 Ok(PlanValue::Float(l / r as f64))
506 }
507 }
508 _ => Err(Error::InvalidArgumentError(
509 "CHECK constraint / operator requires numeric values".into(),
510 )),
511 }
512}
513
/// Column description used when validating FOREIGN KEY definitions.
#[derive(Clone, Debug)]
pub struct ForeignKeyColumn {
    /// Column name; matched ASCII-case-insensitively against names in a FOREIGN KEY spec.
    pub name: String,
    /// Arrow data type; referencing and referenced columns must have equal types.
    pub data_type: DataType,
    /// Nullability flag. NOTE(review): not read by the validation in this file.
    pub nullable: bool,
    /// Whether the column is part of the table's primary key. Primary-key columns
    /// are the default referenced columns when a spec names none.
    pub primary_key: bool,
    /// Whether the column carries a single-column UNIQUE constraint.
    pub unique: bool,
    /// Field identifier recorded in the validated foreign key.
    pub field_id: FieldId,
}
528
/// Table description used when validating FOREIGN KEY definitions, for both the
/// referencing table and the tables it references.
#[derive(Clone, Debug)]
pub struct ForeignKeyTableInfo {
    /// Table name as shown in error messages.
    pub display_name: String,
    /// Canonical table name, copied into the validated foreign key.
    pub canonical_name: String,
    /// Identifier of the table.
    pub table_id: TableId,
    /// The table's columns.
    pub columns: Vec<ForeignKeyColumn>,
    /// Multi-column UNIQUE entries; their `column_ids` are compared against the
    /// referenced field ids of a multi-column foreign key.
    pub multi_column_uniques: Vec<MultiColumnIndexEntryMeta>,
}
538
/// A FOREIGN KEY definition after validation by `validate_foreign_keys`, with
/// column names resolved to indices and field ids.
#[derive(Clone, Debug)]
pub struct ValidatedForeignKey {
    /// Constraint name from the spec, if one was given.
    pub name: Option<String>,
    /// Positions of the referencing columns within the referencing table's column list.
    pub referencing_indices: Vec<usize>,
    /// Field ids of the referencing columns, in spec order.
    pub referencing_field_ids: Vec<FieldId>,
    /// Names of the referencing columns as stored on the table, in spec order.
    pub referencing_column_names: Vec<String>,
    /// Identifier of the referenced table.
    pub referenced_table_id: TableId,
    /// Display name of the referenced table.
    pub referenced_table_display: String,
    /// Canonical name of the referenced table.
    pub referenced_table_canonical: String,
    /// Field ids of the referenced columns, positionally paired with the referencing ones.
    pub referenced_field_ids: Vec<FieldId>,
    /// Names of the referenced columns as stored on the referenced table.
    pub referenced_column_names: Vec<String>,
    /// Action mapped from the spec's `on_delete`.
    pub on_delete: ForeignKeyAction,
    /// Action mapped from the spec's `on_update`.
    pub on_update: ForeignKeyAction,
}
554
555pub fn validate_foreign_keys<F>(
557 referencing_table: &ForeignKeyTableInfo,
558 specs: &[ForeignKeySpec],
559 mut lookup_table: F,
560) -> LlkvResult<Vec<ValidatedForeignKey>>
561where
562 F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
563{
564 if specs.is_empty() {
565 return Ok(Vec::new());
566 }
567
568 let mut referencing_lookup: FxHashMap<String, (usize, &ForeignKeyColumn)> =
569 FxHashMap::default();
570 for (idx, column) in referencing_table.columns.iter().enumerate() {
571 referencing_lookup.insert(column.name.to_ascii_lowercase(), (idx, column));
572 }
573
574 let mut results = Vec::with_capacity(specs.len());
575
576 for spec in specs {
577 if spec.columns.is_empty() {
578 return Err(Error::InvalidArgumentError(
579 "FOREIGN KEY requires at least one referencing column".into(),
580 ));
581 }
582
583 let mut seen_referencing = FxHashSet::default();
584 let mut referencing_indices = Vec::with_capacity(spec.columns.len());
585 let mut referencing_field_ids = Vec::with_capacity(spec.columns.len());
586 let mut referencing_column_defs = Vec::with_capacity(spec.columns.len());
587 let mut referencing_column_names = Vec::with_capacity(spec.columns.len());
588
589 for column_name in &spec.columns {
590 let normalized = column_name.to_ascii_lowercase();
591 if !seen_referencing.insert(normalized.clone()) {
592 return Err(Error::InvalidArgumentError(format!(
593 "duplicate column '{}' in FOREIGN KEY constraint",
594 column_name
595 )));
596 }
597
598 let (idx, column) = referencing_lookup.get(&normalized).ok_or_else(|| {
599 Error::InvalidArgumentError(format!(
600 "unknown column '{}' in FOREIGN KEY constraint",
601 column_name
602 ))
603 })?;
604
605 referencing_indices.push(*idx);
606 referencing_field_ids.push(column.field_id);
607 referencing_column_defs.push((*column).clone());
608 referencing_column_names.push(column.name.clone());
609 }
610
611 let referenced_table_info = lookup_table(&spec.referenced_table)?;
612
613 let referenced_columns = if spec.referenced_columns.is_empty() {
614 referenced_table_info
615 .columns
616 .iter()
617 .filter(|col| col.primary_key)
618 .map(|col| col.name.clone())
619 .collect::<Vec<_>>()
620 } else {
621 spec.referenced_columns.clone()
622 };
623
624 if referenced_columns.is_empty() {
625 return Err(Error::InvalidArgumentError(format!(
626 "there is no primary key for referenced table '{}'",
627 spec.referenced_table
628 )));
629 }
630
631 if spec.columns.len() != referenced_columns.len() {
632 return Err(Error::InvalidArgumentError(format!(
633 "number of referencing columns ({}) does not match number of referenced columns ({})",
634 spec.columns.len(),
635 referenced_columns.len()
636 )));
637 }
638
639 let mut seen_referenced = FxHashSet::default();
640 let mut referenced_lookup: FxHashMap<String, &ForeignKeyColumn> = FxHashMap::default();
641 for column in &referenced_table_info.columns {
642 referenced_lookup.insert(column.name.to_ascii_lowercase(), column);
643 }
644
645 let mut referenced_field_ids = Vec::with_capacity(referenced_columns.len());
646 let mut referenced_column_defs = Vec::with_capacity(referenced_columns.len());
647 let mut referenced_column_names = Vec::with_capacity(referenced_columns.len());
648
649 for column_name in referenced_columns.iter() {
650 let normalized = column_name.to_ascii_lowercase();
651 if !seen_referenced.insert(normalized.clone()) {
652 return Err(Error::InvalidArgumentError(format!(
653 "duplicate referenced column '{}' in FOREIGN KEY constraint",
654 column_name
655 )));
656 }
657
658 let column = referenced_lookup.get(&normalized).ok_or_else(|| {
659 Error::InvalidArgumentError(format!(
660 "unknown referenced column '{}' in table '{}'",
661 column_name, referenced_table_info.display_name
662 ))
663 })?;
664
665 referenced_field_ids.push(column.field_id);
666 referenced_column_defs.push((*column).clone());
667 referenced_column_names.push(column.name.clone());
668 }
669
670 if referenced_columns.len() == 1 {
672 let column = &referenced_column_defs[0];
674 if !column.primary_key && !column.unique {
675 return Err(Error::InvalidArgumentError(format!(
676 "FOREIGN KEY references column '{}' in table '{}' that is not UNIQUE or PRIMARY KEY",
677 column.name, referenced_table_info.display_name
678 )));
679 }
680 } else {
681 let all_primary_key = referenced_column_defs.iter().all(|col| col.primary_key);
685
686 let has_multi_column_unique =
688 referenced_table_info
689 .multi_column_uniques
690 .iter()
691 .any(|unique_entry| {
692 if unique_entry.column_ids.len() != referenced_field_ids.len() {
694 return false;
695 }
696 let unique_set: FxHashSet<_> =
698 unique_entry.column_ids.iter().copied().collect();
699 let referenced_set: FxHashSet<_> =
700 referenced_field_ids.iter().copied().collect();
701 unique_set == referenced_set
702 });
703
704 if !all_primary_key && !has_multi_column_unique {
705 return Err(Error::InvalidArgumentError(format!(
706 "FOREIGN KEY references columns ({}) in table '{}' that do not form a UNIQUE or PRIMARY KEY constraint",
707 referenced_column_names.join(", "),
708 referenced_table_info.display_name
709 )));
710 }
711 }
712
713 for (child_col, parent_col) in referencing_column_defs
714 .iter()
715 .zip(referenced_column_defs.iter())
716 {
717 if child_col.data_type != parent_col.data_type {
718 return Err(Error::InvalidArgumentError(format!(
719 "FOREIGN KEY column '{}' type {:?} does not match referenced column '{}' type {:?}",
720 child_col.name, child_col.data_type, parent_col.name, parent_col.data_type
721 )));
722 }
723
724 }
726
727 results.push(ValidatedForeignKey {
728 name: spec.name.clone(),
729 referencing_indices,
730 referencing_field_ids,
731 referencing_column_names,
732 referenced_table_id: referenced_table_info.table_id,
733 referenced_table_display: referenced_table_info.display_name.clone(),
734 referenced_table_canonical: referenced_table_info.canonical_name.clone(),
735 referenced_field_ids,
736 referenced_column_names,
737 on_delete: map_plan_action(spec.on_delete.clone()),
738 on_update: map_plan_action(spec.on_update.clone()),
739 });
740 }
741
742 Ok(results)
743}
744
745fn map_plan_action(action: PlanForeignKeyAction) -> ForeignKeyAction {
746 match action {
747 PlanForeignKeyAction::NoAction => ForeignKeyAction::NoAction,
748 PlanForeignKeyAction::Restrict => ForeignKeyAction::Restrict,
749 }
750}
751
752pub fn ensure_single_column_unique(
758 existing_values: &[PlanValue],
759 new_values: &[PlanValue],
760 column_name: &str,
761) -> LlkvResult<()> {
762 let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
763
764 for value in existing_values {
765 if let Some(key) = unique_key_component(value, column_name)?
766 && !seen.insert(key.clone())
767 {
768 return Err(Error::ConstraintError(format!(
769 "constraint violation on column '{}'",
770 column_name
771 )));
772 }
773 }
774
775 for value in new_values {
776 if let Some(key) = unique_key_component(value, column_name)?
777 && !seen.insert(key.clone())
778 {
779 return Err(Error::ConstraintError(format!(
780 "constraint violation on column '{}'",
781 column_name
782 )));
783 }
784 }
785
786 Ok(())
787}
788
789pub fn ensure_primary_key(
791 existing_keys: &[UniqueKey],
792 new_keys: &[UniqueKey],
793 column_names: &[String],
794) -> LlkvResult<()> {
795 let pk_label = if column_names.len() == 1 {
796 "column"
797 } else {
798 "columns"
799 };
800 let pk_display = if column_names.len() == 1 {
801 column_names[0].clone()
802 } else {
803 column_names.join(", ")
804 };
805
806 let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
807
808 for key in existing_keys.iter().chain(new_keys.iter()) {
809 if !seen.insert(key.clone()) {
810 return Err(Error::ConstraintError(format!(
811 "Duplicate key violates primary key constraint on {pk_label} '{pk_display}' (PRIMARY KEY or UNIQUE constraint violation)"
812 )));
813 }
814 }
815
816 Ok(())
817}
818
819pub fn validate_foreign_key_rows(
821 constraint_name: Option<&str>,
822 referencing_table: &str,
823 referenced_table: &str,
824 referenced_column_names: &[String],
825 parent_keys: &FxHashSet<UniqueKey>,
826 candidate_keys: &[UniqueKey],
827) -> LlkvResult<()> {
828 if parent_keys.is_empty() {
829 if candidate_keys.is_empty() {
830 return Ok(());
831 }
832
833 let constraint_label = constraint_name.unwrap_or("FOREIGN KEY");
834 let referenced_columns = if referenced_column_names.is_empty() {
835 String::from("<unknown>")
836 } else {
837 referenced_column_names.join(", ")
838 };
839 return Err(Error::ConstraintError(format!(
840 "Violates foreign key constraint '{}' on table '{}' referencing '{}' (columns: {}) - does not exist in the referenced table",
841 constraint_label, referencing_table, referenced_table, referenced_columns,
842 )));
843 }
844
845 for key in candidate_keys {
846 if parent_keys.contains(key) {
847 continue;
848 }
849
850 let constraint_label = constraint_name.unwrap_or("FOREIGN KEY");
851 let referenced_columns = if referenced_column_names.is_empty() {
852 String::from("<unknown>")
853 } else {
854 referenced_column_names.join(", ")
855 };
856
857 return Err(Error::ConstraintError(format!(
858 "Violates foreign key constraint '{}' on table '{}' referencing '{}' (columns: {}) - does not exist in the referenced table",
859 constraint_label, referencing_table, referenced_table, referenced_columns,
860 )));
861 }
862
863 Ok(())
864}
865
866use crate::{CatalogManager, TableView};
871use llkv_plan::AlterTableOperation;
872use llkv_storage::pager::Pager;
873use simd_r_drive_entry_handle::EntryHandle;
874
875pub fn column_in_primary_or_unique(view: &TableView, field_id: FieldId) -> bool {
877 view.constraint_records
878 .iter()
879 .filter(|record| record.is_active())
880 .any(|record| match &record.kind {
881 super::ConstraintKind::PrimaryKey(payload) => payload.field_ids.contains(&field_id),
882 super::ConstraintKind::Unique(payload) => payload.field_ids.contains(&field_id),
883 _ => false,
884 })
885}
886
887pub fn column_in_multi_column_unique(view: &TableView, field_id: FieldId) -> bool {
889 view.multi_column_uniques
890 .iter()
891 .any(|entry| entry.column_ids.contains(&field_id))
892}
893
894pub fn column_in_foreign_keys<PagerType>(
898 view: &TableView,
899 field_id: FieldId,
900 table_id: TableId,
901 catalog_service: &CatalogManager<PagerType>,
902) -> LlkvResult<Option<String>>
903where
904 PagerType: Pager<Blob = EntryHandle> + Send + Sync + 'static,
905{
906 if let Some(fk) = view
908 .foreign_keys
909 .iter()
910 .find(|fk| fk.referencing_field_ids.contains(&field_id))
911 {
912 return Ok(Some(
913 fk.constraint_name
914 .as_deref()
915 .unwrap_or("unnamed")
916 .to_string(),
917 ));
918 }
919
920 let mut visited: FxHashSet<TableId> = FxHashSet::default();
922 for (referencing_table_id, _) in catalog_service.foreign_keys_referencing(table_id)? {
923 if !visited.insert(referencing_table_id) {
924 continue;
925 }
926
927 for fk in catalog_service.foreign_key_views_for_table(referencing_table_id)? {
928 if fk.referenced_table_id == table_id && fk.referenced_field_ids.contains(&field_id) {
929 return Ok(Some(
930 fk.constraint_name
931 .as_deref()
932 .unwrap_or("unnamed")
933 .to_string(),
934 ));
935 }
936 }
937 }
938
939 Ok(None)
940}
941
942pub fn validate_alter_table_operation<PagerType>(
952 operation: &AlterTableOperation,
953 view: &TableView,
954 table_id: TableId,
955 catalog_service: &CatalogManager<PagerType>,
956) -> LlkvResult<()>
957where
958 PagerType: Pager<Blob = EntryHandle> + Send + Sync + 'static,
959{
960 let resolver = catalog_service
961 .field_resolver(table_id)
962 .ok_or_else(|| Error::Internal("missing field resolver for table".into()))?;
963
964 match operation {
965 AlterTableOperation::RenameColumn {
966 old_column_name,
967 new_column_name,
968 } => {
969 let field_id = resolver.field_id(old_column_name).ok_or_else(|| {
970 Error::CatalogError(format!(
971 "Catalog Error: column '{}' does not exist",
972 old_column_name
973 ))
974 })?;
975
976 if resolver.field_id(new_column_name).is_some() {
977 return Err(Error::CatalogError(format!(
978 "Catalog Error: column '{}' already exists",
979 new_column_name
980 )));
981 }
982
983 if let Some(constraint) =
984 column_in_foreign_keys(view, field_id, table_id, catalog_service)?
985 {
986 return Err(Error::CatalogError(format!(
987 "Catalog Error: column '{}' is involved in the foreign key constraint '{}'",
988 old_column_name, constraint
989 )));
990 }
991
992 Ok(())
993 }
994 AlterTableOperation::SetColumnDataType { column_name, .. } => {
995 let field_id = resolver.field_id(column_name).ok_or_else(|| {
996 Error::CatalogError(format!(
997 "Catalog Error: column '{}' does not exist",
998 column_name
999 ))
1000 })?;
1001
1002 if column_in_primary_or_unique(view, field_id)
1003 || column_in_multi_column_unique(view, field_id)
1004 {
1005 return Err(Error::InvalidArgumentError(format!(
1006 "Binder Error: Cannot change the type of a column that has a UNIQUE or PRIMARY KEY constraint specified (column '{}')",
1007 column_name
1008 )));
1009 }
1010
1011 if let Some(constraint) =
1012 column_in_foreign_keys(view, field_id, table_id, catalog_service)?
1013 {
1014 return Err(Error::CatalogError(format!(
1015 "Catalog Error: column '{}' is involved in the foreign key constraint '{}'",
1016 column_name, constraint
1017 )));
1018 }
1019
1020 Ok(())
1021 }
1022 AlterTableOperation::DropColumn {
1023 column_name,
1024 if_exists,
1025 ..
1026 } => {
1027 let field_id = match resolver.field_id(column_name) {
1028 Some(id) => id,
1029 None if *if_exists => return Ok(()),
1030 None => {
1031 return Err(Error::CatalogError(format!(
1032 "Catalog Error: column '{}' does not exist",
1033 column_name
1034 )));
1035 }
1036 };
1037
1038 if column_in_primary_or_unique(view, field_id)
1039 || column_in_multi_column_unique(view, field_id)
1040 {
1041 return Err(Error::CatalogError(format!(
1042 "Catalog Error: there is a UNIQUE constraint that depends on it (column '{}')",
1043 column_name
1044 )));
1045 }
1046
1047 if column_in_foreign_keys(view, field_id, table_id, catalog_service)?.is_some() {
1048 return Err(Error::CatalogError(format!(
1049 "Catalog Error: there is a FOREIGN KEY constraint that depends on it (column '{}')",
1050 column_name
1051 )));
1052 }
1053
1054 Ok(())
1055 }
1056 }
1057}