llkv_table/constraints/
validation.rs

1use arrow::datatypes::DataType;
2use llkv_plan::{
3    CanonicalScalar, ForeignKeyAction as PlanForeignKeyAction, ForeignKeySpec, PlanValue,
4    canonical_scalar_from_plan_value,
5};
6use llkv_result::{Error, Result as LlkvResult};
7use rustc_hash::{FxHashMap, FxHashSet};
8use sqlparser::ast::{self, Expr as SqlExpr};
9use sqlparser::dialect::GenericDialect;
10use sqlparser::parser::Parser;
11
12use super::types::ForeignKeyAction;
13use crate::sys_catalog::MultiColumnIndexEntryMeta;
14use crate::types::{FieldId, TableId};
15
16/// Lightweight column descriptor used for constraint validation.
17#[derive(Clone, Debug)]
18pub struct ConstraintColumnInfo {
19    pub name: String,
20    pub field_id: FieldId,
21    pub data_type: DataType,
22    pub nullable: bool,
23    pub check_expr: Option<String>,
24}
25
26/// Canonical representation of values participating in UNIQUE or PRIMARY KEY checks.
27#[derive(Hash, Eq, PartialEq, Debug, Clone)]
28pub struct UniqueKey(Vec<CanonicalScalar>);
29
30impl UniqueKey {
31    pub fn from_scalar(value: CanonicalScalar) -> Self {
32        UniqueKey(vec![value])
33    }
34
35    pub fn from_components(values: Vec<CanonicalScalar>) -> Self {
36        UniqueKey(values)
37    }
38
39    pub fn components(&self) -> &[CanonicalScalar] {
40        &self.0
41    }
42
43    pub fn into_components(self) -> Vec<CanonicalScalar> {
44        self.0
45    }
46}
47
48impl From<CanonicalScalar> for UniqueKey {
49    fn from(value: CanonicalScalar) -> Self {
50        UniqueKey::from_scalar(value)
51    }
52}
53
54/// Validate all CHECK constraints for the provided rows.
55pub fn validate_check_constraints(
56    columns: &[ConstraintColumnInfo],
57    rows: &[Vec<PlanValue>],
58    column_order: &[usize],
59) -> LlkvResult<()> {
60    if rows.is_empty() {
61        return Ok(());
62    }
63
64    let dialect = GenericDialect {};
65
66    let mut parsed_checks: Vec<(usize, String, String, SqlExpr)> = Vec::new();
67
68    for (idx, column) in columns.iter().enumerate() {
69        if let Some(expr_str) = &column.check_expr {
70            let expr = parse_check_expression(&dialect, expr_str)?;
71            parsed_checks.push((idx, column.name.clone(), expr_str.clone(), expr));
72        }
73    }
74
75    if parsed_checks.is_empty() {
76        return Ok(());
77    }
78
79    let mut name_lookup: FxHashMap<String, usize> = FxHashMap::default();
80    for (idx, column) in columns.iter().enumerate() {
81        name_lookup.insert(column.name.to_ascii_lowercase(), idx);
82    }
83
84    for row in rows {
85        for (_schema_idx, column_name, expr_str, expr) in &parsed_checks {
86            let result = evaluate_check_expression(expr, row, column_order, columns, &name_lookup)?;
87
88            if !result {
89                return Err(Error::ConstraintError(format!(
90                    "CHECK constraint failed for column '{}': {}",
91                    column_name, expr_str
92                )));
93            }
94        }
95    }
96
97    Ok(())
98}
99
100/// Ensure that the provided multi-column values maintain uniqueness.
101pub fn ensure_multi_column_unique(
102    existing_keys: &[UniqueKey],
103    new_keys: &[UniqueKey],
104    column_names: &[String],
105) -> LlkvResult<()> {
106    let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
107    for key in existing_keys {
108        if !seen.insert(key.clone()) {
109            return Err(Error::ConstraintError(format!(
110                "constraint violation on columns '{}'",
111                column_names.join(", ")
112            )));
113        }
114    }
115
116    for key in new_keys {
117        if !seen.insert(key.clone()) {
118            return Err(Error::ConstraintError(format!(
119                "constraint violation on columns '{}'",
120                column_names.join(", ")
121            )));
122        }
123    }
124
125    Ok(())
126}
127
128/// Build a unique key component for a single value.
129pub fn unique_key_component(value: &PlanValue, column_name: &str) -> LlkvResult<Option<UniqueKey>> {
130    match value {
131        PlanValue::Null => Ok(None),
132        PlanValue::Struct(_) => Err(Error::InvalidArgumentError(format!(
133            "UNIQUE index is not supported on struct column '{}'",
134            column_name
135        ))),
136        _ => {
137            let scalar = canonical_scalar_from_plan_value(value)?;
138            if matches!(scalar, CanonicalScalar::Null) {
139                Ok(None)
140            } else {
141                Ok(Some(UniqueKey::from_scalar(scalar)))
142            }
143        }
144    }
145}
146
147/// Build a composite unique key from column values.
148pub fn build_composite_unique_key(
149    values: &[PlanValue],
150    column_names: &[String],
151) -> LlkvResult<Option<UniqueKey>> {
152    if values.is_empty() {
153        return Ok(None);
154    }
155
156    let mut components = Vec::with_capacity(values.len());
157    for (value, column_name) in values.iter().zip(column_names) {
158        match unique_key_component(value, column_name)? {
159            Some(component) => components.extend(component.into_components()),
160            None => return Ok(None),
161        }
162    }
163
164    Ok(Some(UniqueKey::from_components(components)))
165}
166
167fn parse_check_expression(dialect: &GenericDialect, check_expr_str: &str) -> LlkvResult<SqlExpr> {
168    let sql = format!("SELECT {}", check_expr_str);
169    let mut ast = Parser::parse_sql(dialect, &sql).map_err(|e| {
170        Error::InvalidArgumentError(format!(
171            "Failed to parse CHECK expression '{}': {}",
172            check_expr_str, e
173        ))
174    })?;
175
176    let stmt = ast.pop().ok_or_else(|| {
177        Error::InvalidArgumentError(format!(
178            "CHECK expression '{}' resulted in empty AST",
179            check_expr_str
180        ))
181    })?;
182
183    let query = match stmt {
184        ast::Statement::Query(q) => q,
185        _ => {
186            return Err(Error::InvalidArgumentError(format!(
187                "CHECK expression '{}' did not parse as SELECT",
188                check_expr_str
189            )));
190        }
191    };
192
193    let body = match *query.body {
194        ast::SetExpr::Select(s) => s,
195        _ => {
196            return Err(Error::InvalidArgumentError(format!(
197                "CHECK expression '{}' is not a simple SELECT",
198                check_expr_str
199            )));
200        }
201    };
202
203    if body.projection.len() != 1 {
204        return Err(Error::InvalidArgumentError(format!(
205            "CHECK expression '{}' must have exactly one projection",
206            check_expr_str
207        )));
208    }
209
210    match &body.projection[0] {
211        ast::SelectItem::UnnamedExpr(expr) | ast::SelectItem::ExprWithAlias { expr, .. } => {
212            Ok(expr.clone())
213        }
214        _ => Err(Error::InvalidArgumentError(format!(
215            "CHECK expression '{}' projection is not a simple expression",
216            check_expr_str
217        ))),
218    }
219}
220
221fn evaluate_check_expression(
222    expr: &SqlExpr,
223    row: &[PlanValue],
224    column_order: &[usize],
225    columns: &[ConstraintColumnInfo],
226    name_lookup: &FxHashMap<String, usize>,
227) -> LlkvResult<bool> {
228    use sqlparser::ast::BinaryOperator;
229
230    match expr {
231        SqlExpr::BinaryOp { left, op, right } => {
232            let left_val =
233                evaluate_check_expr_value(left, row, column_order, columns, name_lookup)?;
234            let right_val =
235                evaluate_check_expr_value(right, row, column_order, columns, name_lookup)?;
236
237            match op {
238                BinaryOperator::Eq => {
239                    if matches!(left_val, PlanValue::Null) || matches!(right_val, PlanValue::Null) {
240                        Ok(true)
241                    } else {
242                        Ok(left_val == right_val)
243                    }
244                }
245                BinaryOperator::NotEq => {
246                    if matches!(left_val, PlanValue::Null) || matches!(right_val, PlanValue::Null) {
247                        Ok(true)
248                    } else {
249                        Ok(left_val != right_val)
250                    }
251                }
252                BinaryOperator::Lt => compare_numeric(&left_val, &right_val, |l, r| l < r),
253                BinaryOperator::LtEq => compare_numeric(&left_val, &right_val, |l, r| l <= r),
254                BinaryOperator::Gt => compare_numeric(&left_val, &right_val, |l, r| l > r),
255                BinaryOperator::GtEq => compare_numeric(&left_val, &right_val, |l, r| l >= r),
256                _ => Err(Error::InvalidArgumentError(format!(
257                    "Unsupported operator in CHECK constraint: {:?}",
258                    op
259                ))),
260            }
261        }
262        SqlExpr::IsNull(inner) => {
263            let value = evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)?;
264            Ok(matches!(value, PlanValue::Null))
265        }
266        SqlExpr::IsNotNull(inner) => {
267            let value = evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)?;
268            Ok(!matches!(value, PlanValue::Null))
269        }
270        SqlExpr::Nested(inner) => {
271            evaluate_check_expression(inner, row, column_order, columns, name_lookup)
272        }
273        _ => Err(Error::InvalidArgumentError(format!(
274            "Unsupported expression in CHECK constraint: {:?}",
275            expr
276        ))),
277    }
278}
279
280#[allow(clippy::only_used_in_recursion)]
281fn evaluate_check_expr_value(
282    expr: &SqlExpr,
283    row: &[PlanValue],
284    column_order: &[usize],
285    columns: &[ConstraintColumnInfo],
286    name_lookup: &FxHashMap<String, usize>,
287) -> LlkvResult<PlanValue> {
288    use sqlparser::ast::{BinaryOperator, Expr as SqlExpr};
289
290    match expr {
291        SqlExpr::BinaryOp { left, op, right } => {
292            let left_val =
293                evaluate_check_expr_value(left, row, column_order, columns, name_lookup)?;
294            let right_val =
295                evaluate_check_expr_value(right, row, column_order, columns, name_lookup)?;
296
297            match op {
298                BinaryOperator::Plus => apply_numeric_op(left_val, right_val, |l, r| l + r),
299                BinaryOperator::Minus => apply_numeric_op(left_val, right_val, |l, r| l - r),
300                BinaryOperator::Multiply => apply_numeric_op(left_val, right_val, |l, r| l * r),
301                BinaryOperator::Divide => divide_numeric(left_val, right_val),
302                _ => Err(Error::InvalidArgumentError(format!(
303                    "Unsupported binary operator in CHECK constraint value expression: {:?}",
304                    op
305                ))),
306            }
307        }
308        SqlExpr::Identifier(ident) => {
309            let column_idx = lookup_column_index(name_lookup, &ident.value)?;
310            extract_row_value(row, column_order, column_idx, &ident.value)
311        }
312        SqlExpr::CompoundIdentifier(idents) => {
313            if idents.len() == 2 {
314                let column_name = &idents[0].value;
315                let field_name = &idents[1].value;
316                let column_idx = lookup_column_index(name_lookup, column_name)?;
317                let value = extract_row_value(row, column_order, column_idx, column_name)?;
318                extract_struct_field(value, column_name, field_name)
319            } else if idents.len() == 3 {
320                let column_name = &idents[1].value;
321                let field_name = &idents[2].value;
322                let column_idx = lookup_column_index(name_lookup, column_name)?;
323                let value = extract_row_value(row, column_order, column_idx, column_name)?;
324                extract_struct_field(value, column_name, field_name)
325            } else {
326                Err(Error::InvalidArgumentError(format!(
327                    "Unsupported compound identifier in CHECK constraint: {} parts",
328                    idents.len()
329                )))
330            }
331        }
332        SqlExpr::Value(val_with_span) => match &val_with_span.value {
333            ast::Value::Number(n, _) => {
334                if let Ok(i) = n.parse::<i64>() {
335                    Ok(PlanValue::Integer(i))
336                } else if let Ok(f) = n.parse::<f64>() {
337                    Ok(PlanValue::Float(f))
338                } else {
339                    Err(Error::InvalidArgumentError(format!(
340                        "Invalid number in CHECK constraint: {}",
341                        n
342                    )))
343                }
344            }
345            ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
346                Ok(PlanValue::String(s.clone()))
347            }
348            ast::Value::Null => Ok(PlanValue::Null),
349            _ => Err(Error::InvalidArgumentError(format!(
350                "Unsupported value type in CHECK constraint: {:?}",
351                val_with_span.value
352            ))),
353        },
354        SqlExpr::Nested(inner) => {
355            evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)
356        }
357        _ => Err(Error::InvalidArgumentError(format!(
358            "Unsupported expression type in CHECK constraint: {:?}",
359            expr
360        ))),
361    }
362}
363
364fn lookup_column_index(
365    name_lookup: &FxHashMap<String, usize>,
366    column_name: &str,
367) -> LlkvResult<usize> {
368    name_lookup
369        .get(&column_name.to_ascii_lowercase())
370        .copied()
371        .ok_or_else(|| {
372            Error::InvalidArgumentError(format!(
373                "Unknown column '{}' in CHECK constraint",
374                column_name
375            ))
376        })
377}
378
379fn extract_row_value(
380    row: &[PlanValue],
381    column_order: &[usize],
382    schema_idx: usize,
383    column_name: &str,
384) -> LlkvResult<PlanValue> {
385    let insert_pos = column_order
386        .iter()
387        .position(|&dest_idx| dest_idx == schema_idx)
388        .ok_or_else(|| {
389            Error::InvalidArgumentError(format!("Column '{}' not provided in INSERT", column_name))
390        })?;
391
392    Ok(row[insert_pos].clone())
393}
394
395fn extract_struct_field(
396    value: PlanValue,
397    column_name: &str,
398    field_name: &str,
399) -> LlkvResult<PlanValue> {
400    match value {
401        PlanValue::Struct(fields) => fields
402            .into_iter()
403            .find(|(name, _)| name.eq_ignore_ascii_case(field_name))
404            .map(|(_, val)| val)
405            .ok_or_else(|| {
406                Error::InvalidArgumentError(format!(
407                    "Struct field '{}' not found in column '{}'",
408                    field_name, column_name
409                ))
410            }),
411        _ => Err(Error::InvalidArgumentError(format!(
412            "Column '{}' is not a struct, cannot access field '{}'",
413            column_name, field_name
414        ))),
415    }
416}
417
418fn compare_numeric<F>(left: &PlanValue, right: &PlanValue, compare: F) -> LlkvResult<bool>
419where
420    F: Fn(f64, f64) -> bool,
421{
422    if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
423        // In SQL, any comparison with NULL yields UNKNOWN.
424        // For CHECK constraints, UNKNOWN is treated as TRUE (constraint passes).
425        return Ok(true);
426    }
427
428    match (left, right) {
429        (PlanValue::Integer(l), PlanValue::Integer(r)) => Ok(compare(*l as f64, *r as f64)),
430        (PlanValue::Float(l), PlanValue::Float(r)) => Ok(compare(*l, *r)),
431        (PlanValue::Integer(l), PlanValue::Float(r)) => Ok(compare(*l as f64, *r)),
432        (PlanValue::Float(l), PlanValue::Integer(r)) => Ok(compare(*l, *r as f64)),
433        _ => Err(Error::InvalidArgumentError(
434            "CHECK constraint comparison requires numeric values".into(),
435        )),
436    }
437}
438
439fn apply_numeric_op(
440    left: PlanValue,
441    right: PlanValue,
442    op: fn(f64, f64) -> f64,
443) -> LlkvResult<PlanValue> {
444    if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
445        return Ok(PlanValue::Null);
446    }
447
448    match (left, right) {
449        (PlanValue::Integer(l), PlanValue::Integer(r)) => {
450            let result = op(l as f64, r as f64);
451            if result.fract() == 0.0 {
452                Ok(PlanValue::Integer(result as i64))
453            } else {
454                Ok(PlanValue::Float(result))
455            }
456        }
457        (PlanValue::Float(l), PlanValue::Float(r)) => Ok(PlanValue::Float(op(l, r))),
458        (PlanValue::Integer(l), PlanValue::Float(r)) => Ok(PlanValue::Float(op(l as f64, r))),
459        (PlanValue::Float(l), PlanValue::Integer(r)) => Ok(PlanValue::Float(op(l, r as f64))),
460        _ => Err(Error::InvalidArgumentError(
461            "CHECK constraint arithmetic requires numeric values".into(),
462        )),
463    }
464}
465
466fn divide_numeric(left: PlanValue, right: PlanValue) -> LlkvResult<PlanValue> {
467    if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
468        return Ok(PlanValue::Null);
469    }
470
471    match (left, right) {
472        (PlanValue::Integer(l), PlanValue::Integer(r)) => {
473            if r == 0 {
474                Err(Error::InvalidArgumentError(
475                    "Division by zero in CHECK constraint".into(),
476                ))
477            } else {
478                Ok(PlanValue::Integer(l / r))
479            }
480        }
481        (PlanValue::Float(l), PlanValue::Float(r)) => {
482            if r == 0.0 {
483                Err(Error::InvalidArgumentError(
484                    "Division by zero in CHECK constraint".into(),
485                ))
486            } else {
487                Ok(PlanValue::Float(l / r))
488            }
489        }
490        (PlanValue::Integer(l), PlanValue::Float(r)) => {
491            if r == 0.0 {
492                Err(Error::InvalidArgumentError(
493                    "Division by zero in CHECK constraint".into(),
494                ))
495            } else {
496                Ok(PlanValue::Float(l as f64 / r))
497            }
498        }
499        (PlanValue::Float(l), PlanValue::Integer(r)) => {
500            if r == 0 {
501                Err(Error::InvalidArgumentError(
502                    "Division by zero in CHECK constraint".into(),
503                ))
504            } else {
505                Ok(PlanValue::Float(l / r as f64))
506            }
507        }
508        _ => Err(Error::InvalidArgumentError(
509            "CHECK constraint / operator requires numeric values".into(),
510        )),
511    }
512}
513
514// ============================================================================
515// Foreign key validation
516// ============================================================================
517
518/// Column metadata used when validating foreign key definitions.
519#[derive(Clone, Debug)]
520pub struct ForeignKeyColumn {
521    pub name: String,
522    pub data_type: DataType,
523    pub nullable: bool,
524    pub primary_key: bool,
525    pub unique: bool,
526    pub field_id: FieldId,
527}
528
529/// Table metadata used when validating foreign key definitions.
530#[derive(Clone, Debug)]
531pub struct ForeignKeyTableInfo {
532    pub display_name: String,
533    pub canonical_name: String,
534    pub table_id: TableId,
535    pub columns: Vec<ForeignKeyColumn>,
536    pub multi_column_uniques: Vec<MultiColumnIndexEntryMeta>,
537}
538
539/// Result of validating a foreign key specification.
540#[derive(Clone, Debug)]
541pub struct ValidatedForeignKey {
542    pub name: Option<String>,
543    pub referencing_indices: Vec<usize>,
544    pub referencing_field_ids: Vec<FieldId>,
545    pub referencing_column_names: Vec<String>,
546    pub referenced_table_id: TableId,
547    pub referenced_table_display: String,
548    pub referenced_table_canonical: String,
549    pub referenced_field_ids: Vec<FieldId>,
550    pub referenced_column_names: Vec<String>,
551    pub on_delete: ForeignKeyAction,
552    pub on_update: ForeignKeyAction,
553}
554
555/// Validate a set of foreign key specifications against the provided table schemas.
556pub fn validate_foreign_keys<F>(
557    referencing_table: &ForeignKeyTableInfo,
558    specs: &[ForeignKeySpec],
559    mut lookup_table: F,
560) -> LlkvResult<Vec<ValidatedForeignKey>>
561where
562    F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
563{
564    if specs.is_empty() {
565        return Ok(Vec::new());
566    }
567
568    let mut referencing_lookup: FxHashMap<String, (usize, &ForeignKeyColumn)> =
569        FxHashMap::default();
570    for (idx, column) in referencing_table.columns.iter().enumerate() {
571        referencing_lookup.insert(column.name.to_ascii_lowercase(), (idx, column));
572    }
573
574    let mut results = Vec::with_capacity(specs.len());
575
576    for spec in specs {
577        if spec.columns.is_empty() {
578            return Err(Error::InvalidArgumentError(
579                "FOREIGN KEY requires at least one referencing column".into(),
580            ));
581        }
582
583        let mut seen_referencing = FxHashSet::default();
584        let mut referencing_indices = Vec::with_capacity(spec.columns.len());
585        let mut referencing_field_ids = Vec::with_capacity(spec.columns.len());
586        let mut referencing_column_defs = Vec::with_capacity(spec.columns.len());
587        let mut referencing_column_names = Vec::with_capacity(spec.columns.len());
588
589        for column_name in &spec.columns {
590            let normalized = column_name.to_ascii_lowercase();
591            if !seen_referencing.insert(normalized.clone()) {
592                return Err(Error::InvalidArgumentError(format!(
593                    "duplicate column '{}' in FOREIGN KEY constraint",
594                    column_name
595                )));
596            }
597
598            let (idx, column) = referencing_lookup.get(&normalized).ok_or_else(|| {
599                Error::InvalidArgumentError(format!(
600                    "unknown column '{}' in FOREIGN KEY constraint",
601                    column_name
602                ))
603            })?;
604
605            referencing_indices.push(*idx);
606            referencing_field_ids.push(column.field_id);
607            referencing_column_defs.push((*column).clone());
608            referencing_column_names.push(column.name.clone());
609        }
610
611        let referenced_table_info = lookup_table(&spec.referenced_table)?;
612
613        let referenced_columns = if spec.referenced_columns.is_empty() {
614            referenced_table_info
615                .columns
616                .iter()
617                .filter(|col| col.primary_key)
618                .map(|col| col.name.clone())
619                .collect::<Vec<_>>()
620        } else {
621            spec.referenced_columns.clone()
622        };
623
624        if referenced_columns.is_empty() {
625            return Err(Error::InvalidArgumentError(format!(
626                "there is no primary key for referenced table '{}'",
627                spec.referenced_table
628            )));
629        }
630
631        if spec.columns.len() != referenced_columns.len() {
632            return Err(Error::InvalidArgumentError(format!(
633                "number of referencing columns ({}) does not match number of referenced columns ({})",
634                spec.columns.len(),
635                referenced_columns.len()
636            )));
637        }
638
639        let mut seen_referenced = FxHashSet::default();
640        let mut referenced_lookup: FxHashMap<String, &ForeignKeyColumn> = FxHashMap::default();
641        for column in &referenced_table_info.columns {
642            referenced_lookup.insert(column.name.to_ascii_lowercase(), column);
643        }
644
645        let mut referenced_field_ids = Vec::with_capacity(referenced_columns.len());
646        let mut referenced_column_defs = Vec::with_capacity(referenced_columns.len());
647        let mut referenced_column_names = Vec::with_capacity(referenced_columns.len());
648
649        for column_name in referenced_columns.iter() {
650            let normalized = column_name.to_ascii_lowercase();
651            if !seen_referenced.insert(normalized.clone()) {
652                return Err(Error::InvalidArgumentError(format!(
653                    "duplicate referenced column '{}' in FOREIGN KEY constraint",
654                    column_name
655                )));
656            }
657
658            let column = referenced_lookup.get(&normalized).ok_or_else(|| {
659                Error::InvalidArgumentError(format!(
660                    "unknown referenced column '{}' in table '{}'",
661                    column_name, referenced_table_info.display_name
662                ))
663            })?;
664
665            referenced_field_ids.push(column.field_id);
666            referenced_column_defs.push((*column).clone());
667            referenced_column_names.push(column.name.clone());
668        }
669
670        // Validate that the referenced columns form a UNIQUE or PRIMARY KEY constraint
671        if referenced_columns.len() == 1 {
672            // Single column: check if it has UNIQUE or PRIMARY KEY constraint
673            let column = &referenced_column_defs[0];
674            if !column.primary_key && !column.unique {
675                return Err(Error::InvalidArgumentError(format!(
676                    "FOREIGN KEY references column '{}' in table '{}' that is not UNIQUE or PRIMARY KEY",
677                    column.name, referenced_table_info.display_name
678                )));
679            }
680        } else {
681            // Multiple columns: check if they form a multi-column PRIMARY KEY or UNIQUE constraint
682
683            // First check if all columns have primary_key = true (multi-column PRIMARY KEY)
684            let all_primary_key = referenced_column_defs.iter().all(|col| col.primary_key);
685
686            // Also check if they form a multi-column UNIQUE constraint
687            let has_multi_column_unique =
688                referenced_table_info
689                    .multi_column_uniques
690                    .iter()
691                    .any(|unique_entry| {
692                        // Check if this unique constraint matches our referenced columns
693                        if unique_entry.column_ids.len() != referenced_field_ids.len() {
694                            return false;
695                        }
696                        // Check if all field IDs match (order-independent)
697                        let unique_set: FxHashSet<_> =
698                            unique_entry.column_ids.iter().copied().collect();
699                        let referenced_set: FxHashSet<_> =
700                            referenced_field_ids.iter().copied().collect();
701                        unique_set == referenced_set
702                    });
703
704            if !all_primary_key && !has_multi_column_unique {
705                return Err(Error::InvalidArgumentError(format!(
706                    "FOREIGN KEY references columns ({}) in table '{}' that do not form a UNIQUE or PRIMARY KEY constraint",
707                    referenced_column_names.join(", "),
708                    referenced_table_info.display_name
709                )));
710            }
711        }
712
713        for (child_col, parent_col) in referencing_column_defs
714            .iter()
715            .zip(referenced_column_defs.iter())
716        {
717            if child_col.data_type != parent_col.data_type {
718                return Err(Error::InvalidArgumentError(format!(
719                    "FOREIGN KEY column '{}' type {:?} does not match referenced column '{}' type {:?}",
720                    child_col.name, child_col.data_type, parent_col.name, parent_col.data_type
721                )));
722            }
723
724            // Nullable child referencing non-null parent is allowed; no additional action required.
725        }
726
727        results.push(ValidatedForeignKey {
728            name: spec.name.clone(),
729            referencing_indices,
730            referencing_field_ids,
731            referencing_column_names,
732            referenced_table_id: referenced_table_info.table_id,
733            referenced_table_display: referenced_table_info.display_name.clone(),
734            referenced_table_canonical: referenced_table_info.canonical_name.clone(),
735            referenced_field_ids,
736            referenced_column_names,
737            on_delete: map_plan_action(spec.on_delete.clone()),
738            on_update: map_plan_action(spec.on_update.clone()),
739        });
740    }
741
742    Ok(results)
743}
744
745fn map_plan_action(action: PlanForeignKeyAction) -> ForeignKeyAction {
746    match action {
747        PlanForeignKeyAction::NoAction => ForeignKeyAction::NoAction,
748        PlanForeignKeyAction::Restrict => ForeignKeyAction::Restrict,
749    }
750}
751
752// ============================================================================
753// Runtime constraint helpers
754// ============================================================================
755
756/// Ensure existing + incoming values remain unique for a single column.
757pub fn ensure_single_column_unique(
758    existing_values: &[PlanValue],
759    new_values: &[PlanValue],
760    column_name: &str,
761) -> LlkvResult<()> {
762    let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
763
764    for value in existing_values {
765        if let Some(key) = unique_key_component(value, column_name)?
766            && !seen.insert(key.clone())
767        {
768            return Err(Error::ConstraintError(format!(
769                "constraint violation on column '{}'",
770                column_name
771            )));
772        }
773    }
774
775    for value in new_values {
776        if let Some(key) = unique_key_component(value, column_name)?
777            && !seen.insert(key.clone())
778        {
779            return Err(Error::ConstraintError(format!(
780                "constraint violation on column '{}'",
781                column_name
782            )));
783        }
784    }
785
786    Ok(())
787}
788
789/// Ensure primary key values remain unique and non-null.
790pub fn ensure_primary_key(
791    existing_keys: &[UniqueKey],
792    new_keys: &[UniqueKey],
793    column_names: &[String],
794) -> LlkvResult<()> {
795    let pk_label = if column_names.len() == 1 {
796        "column"
797    } else {
798        "columns"
799    };
800    let pk_display = if column_names.len() == 1 {
801        column_names[0].clone()
802    } else {
803        column_names.join(", ")
804    };
805
806    let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
807
808    for key in existing_keys.iter().chain(new_keys.iter()) {
809        if !seen.insert(key.clone()) {
810            return Err(Error::ConstraintError(format!(
811                "Duplicate key violates primary key constraint on {pk_label} '{pk_display}' (PRIMARY KEY or UNIQUE constraint violation)"
812            )));
813        }
814    }
815
816    Ok(())
817}
818
819/// Ensure that referencing rows satisfy the foreign key constraint by matching existing parent keys.
820pub fn validate_foreign_key_rows(
821    constraint_name: Option<&str>,
822    referencing_table: &str,
823    referenced_table: &str,
824    referenced_column_names: &[String],
825    parent_keys: &FxHashSet<UniqueKey>,
826    candidate_keys: &[UniqueKey],
827) -> LlkvResult<()> {
828    if parent_keys.is_empty() {
829        if candidate_keys.is_empty() {
830            return Ok(());
831        }
832
833        let constraint_label = constraint_name.unwrap_or("FOREIGN KEY");
834        let referenced_columns = if referenced_column_names.is_empty() {
835            String::from("<unknown>")
836        } else {
837            referenced_column_names.join(", ")
838        };
839        return Err(Error::ConstraintError(format!(
840            "Violates foreign key constraint '{}' on table '{}' referencing '{}' (columns: {}) - does not exist in the referenced table",
841            constraint_label, referencing_table, referenced_table, referenced_columns,
842        )));
843    }
844
845    for key in candidate_keys {
846        if parent_keys.contains(key) {
847            continue;
848        }
849
850        let constraint_label = constraint_name.unwrap_or("FOREIGN KEY");
851        let referenced_columns = if referenced_column_names.is_empty() {
852            String::from("<unknown>")
853        } else {
854            referenced_column_names.join(", ")
855        };
856
857        return Err(Error::ConstraintError(format!(
858            "Violates foreign key constraint '{}' on table '{}' referencing '{}' (columns: {}) - does not exist in the referenced table",
859            constraint_label, referencing_table, referenced_table, referenced_columns,
860        )));
861    }
862
863    Ok(())
864}
865
866// ========================================
867// ALTER TABLE validation helpers
868// ========================================
869
870use crate::{CatalogManager, TableView};
871use llkv_plan::AlterTableOperation;
872use llkv_storage::pager::Pager;
873use simd_r_drive_entry_handle::EntryHandle;
874
875/// Check if a column is part of a PRIMARY KEY or UNIQUE constraint.
876pub fn column_in_primary_or_unique(view: &TableView, field_id: FieldId) -> bool {
877    view.constraint_records
878        .iter()
879        .filter(|record| record.is_active())
880        .any(|record| match &record.kind {
881            super::ConstraintKind::PrimaryKey(payload) => payload.field_ids.contains(&field_id),
882            super::ConstraintKind::Unique(payload) => payload.field_ids.contains(&field_id),
883            _ => false,
884        })
885}
886
887/// Check if a column is part of a multi-column unique constraint.
888pub fn column_in_multi_column_unique(view: &TableView, field_id: FieldId) -> bool {
889    view.multi_column_uniques
890        .iter()
891        .any(|entry| entry.column_ids.contains(&field_id))
892}
893
894/// Check if a column is involved in any foreign key constraints.
895///
896/// Returns the name of the constraint if found, or None if the column is not referenced.
897pub fn column_in_foreign_keys<PagerType>(
898    view: &TableView,
899    field_id: FieldId,
900    table_id: TableId,
901    catalog_service: &CatalogManager<PagerType>,
902) -> LlkvResult<Option<String>>
903where
904    PagerType: Pager<Blob = EntryHandle> + Send + Sync + 'static,
905{
906    // Check if column is a referencing column in this table's foreign keys
907    if let Some(fk) = view
908        .foreign_keys
909        .iter()
910        .find(|fk| fk.referencing_field_ids.contains(&field_id))
911    {
912        return Ok(Some(
913            fk.constraint_name
914                .as_deref()
915                .unwrap_or("unnamed")
916                .to_string(),
917        ));
918    }
919
920    // Check if column is referenced by other tables' foreign keys
921    let mut visited: FxHashSet<TableId> = FxHashSet::default();
922    for (referencing_table_id, _) in catalog_service.foreign_keys_referencing(table_id)? {
923        if !visited.insert(referencing_table_id) {
924            continue;
925        }
926
927        for fk in catalog_service.foreign_key_views_for_table(referencing_table_id)? {
928            if fk.referenced_table_id == table_id && fk.referenced_field_ids.contains(&field_id) {
929                return Ok(Some(
930                    fk.constraint_name
931                        .as_deref()
932                        .unwrap_or("unnamed")
933                        .to_string(),
934                ));
935            }
936        }
937    }
938
939    Ok(None)
940}
941
942/// Validate an ALTER TABLE operation against existing constraints.
943///
944/// This function checks whether the requested ALTER TABLE operation would violate
945/// any existing constraints on the table, including:
946/// - PRIMARY KEY constraints
947/// - UNIQUE constraints
948/// - FOREIGN KEY constraints
949///
950/// Returns an error if the operation would violate a constraint.
951pub fn validate_alter_table_operation<PagerType>(
952    operation: &AlterTableOperation,
953    view: &TableView,
954    table_id: TableId,
955    catalog_service: &CatalogManager<PagerType>,
956) -> LlkvResult<()>
957where
958    PagerType: Pager<Blob = EntryHandle> + Send + Sync + 'static,
959{
960    let resolver = catalog_service
961        .field_resolver(table_id)
962        .ok_or_else(|| Error::Internal("missing field resolver for table".into()))?;
963
964    match operation {
965        AlterTableOperation::RenameColumn {
966            old_column_name,
967            new_column_name,
968        } => {
969            let field_id = resolver.field_id(old_column_name).ok_or_else(|| {
970                Error::CatalogError(format!(
971                    "Catalog Error: column '{}' does not exist",
972                    old_column_name
973                ))
974            })?;
975
976            if resolver.field_id(new_column_name).is_some() {
977                return Err(Error::CatalogError(format!(
978                    "Catalog Error: column '{}' already exists",
979                    new_column_name
980                )));
981            }
982
983            if let Some(constraint) =
984                column_in_foreign_keys(view, field_id, table_id, catalog_service)?
985            {
986                return Err(Error::CatalogError(format!(
987                    "Catalog Error: column '{}' is involved in the foreign key constraint '{}'",
988                    old_column_name, constraint
989                )));
990            }
991
992            Ok(())
993        }
994        AlterTableOperation::SetColumnDataType { column_name, .. } => {
995            let field_id = resolver.field_id(column_name).ok_or_else(|| {
996                Error::CatalogError(format!(
997                    "Catalog Error: column '{}' does not exist",
998                    column_name
999                ))
1000            })?;
1001
1002            if column_in_primary_or_unique(view, field_id)
1003                || column_in_multi_column_unique(view, field_id)
1004            {
1005                return Err(Error::InvalidArgumentError(format!(
1006                    "Binder Error: Cannot change the type of a column that has a UNIQUE or PRIMARY KEY constraint specified (column '{}')",
1007                    column_name
1008                )));
1009            }
1010
1011            if let Some(constraint) =
1012                column_in_foreign_keys(view, field_id, table_id, catalog_service)?
1013            {
1014                return Err(Error::CatalogError(format!(
1015                    "Catalog Error: column '{}' is involved in the foreign key constraint '{}'",
1016                    column_name, constraint
1017                )));
1018            }
1019
1020            Ok(())
1021        }
1022        AlterTableOperation::DropColumn {
1023            column_name,
1024            if_exists,
1025            ..
1026        } => {
1027            let field_id = match resolver.field_id(column_name) {
1028                Some(id) => id,
1029                None if *if_exists => return Ok(()),
1030                None => {
1031                    return Err(Error::CatalogError(format!(
1032                        "Catalog Error: column '{}' does not exist",
1033                        column_name
1034                    )));
1035                }
1036            };
1037
1038            if column_in_primary_or_unique(view, field_id)
1039                || column_in_multi_column_unique(view, field_id)
1040            {
1041                return Err(Error::CatalogError(format!(
1042                    "Catalog Error: there is a UNIQUE constraint that depends on it (column '{}')",
1043                    column_name
1044                )));
1045            }
1046
1047            if column_in_foreign_keys(view, field_id, table_id, catalog_service)?.is_some() {
1048                return Err(Error::CatalogError(format!(
1049                    "Catalog Error: there is a FOREIGN KEY constraint that depends on it (column '{}')",
1050                    column_name
1051                )));
1052            }
1053
1054            Ok(())
1055        }
1056    }
1057}