llkv_table/constraints/
validation.rs

1use arrow::datatypes::DataType;
2use llkv_plan::{
3    CanonicalScalar, ForeignKeyAction as PlanForeignKeyAction, ForeignKeySpec, PlanValue,
4};
5use llkv_result::{Error, Result as LlkvResult};
6use rustc_hash::{FxHashMap, FxHashSet};
7use sqlparser::ast::{self, Expr as SqlExpr};
8use sqlparser::dialect::GenericDialect;
9use sqlparser::parser::Parser;
10
11use super::types::ForeignKeyAction;
12use crate::sys_catalog::MultiColumnIndexEntryMeta;
13use crate::types::{FieldId, TableId};
14
15/// Lightweight column descriptor used for constraint validation.
16#[derive(Clone, Debug)]
17pub struct ConstraintColumnInfo {
18    pub name: String,
19    pub field_id: FieldId,
20    pub data_type: DataType,
21    pub nullable: bool,
22    pub check_expr: Option<String>,
23}
24
25/// Canonical representation of values participating in UNIQUE or PRIMARY KEY checks.
26#[derive(Hash, Eq, PartialEq, Debug, Clone)]
27pub struct UniqueKey(Vec<CanonicalScalar>);
28
29impl UniqueKey {
30    pub fn from_scalar(value: CanonicalScalar) -> Self {
31        UniqueKey(vec![value])
32    }
33
34    pub fn from_components(values: Vec<CanonicalScalar>) -> Self {
35        UniqueKey(values)
36    }
37
38    pub fn components(&self) -> &[CanonicalScalar] {
39        &self.0
40    }
41
42    pub fn into_components(self) -> Vec<CanonicalScalar> {
43        self.0
44    }
45}
46
47impl From<CanonicalScalar> for UniqueKey {
48    fn from(value: CanonicalScalar) -> Self {
49        UniqueKey::from_scalar(value)
50    }
51}
52
53/// Validate all CHECK constraints for the provided rows.
54pub fn validate_check_constraints(
55    columns: &[ConstraintColumnInfo],
56    rows: &[Vec<PlanValue>],
57    column_order: &[usize],
58) -> LlkvResult<()> {
59    if rows.is_empty() {
60        return Ok(());
61    }
62
63    let dialect = GenericDialect {};
64
65    let mut parsed_checks: Vec<(usize, String, String, SqlExpr)> = Vec::new();
66
67    for (idx, column) in columns.iter().enumerate() {
68        if let Some(expr_str) = &column.check_expr {
69            let expr = parse_check_expression(&dialect, expr_str)?;
70            parsed_checks.push((idx, column.name.clone(), expr_str.clone(), expr));
71        }
72    }
73
74    if parsed_checks.is_empty() {
75        return Ok(());
76    }
77
78    let mut name_lookup: FxHashMap<String, usize> = FxHashMap::default();
79    for (idx, column) in columns.iter().enumerate() {
80        name_lookup.insert(column.name.to_ascii_lowercase(), idx);
81    }
82
83    for row in rows {
84        for (_schema_idx, column_name, expr_str, expr) in &parsed_checks {
85            let result = evaluate_check_expression(expr, row, column_order, columns, &name_lookup)?;
86
87            if !result {
88                return Err(Error::ConstraintError(format!(
89                    "CHECK constraint failed for column '{}': {}",
90                    column_name, expr_str
91                )));
92            }
93        }
94    }
95
96    Ok(())
97}
98
99/// Ensure that the provided multi-column values maintain uniqueness.
100pub fn ensure_multi_column_unique(
101    existing_rows: &[Vec<PlanValue>],
102    new_rows: &[Vec<PlanValue>],
103    column_names: &[String],
104) -> LlkvResult<()> {
105    let mut existing_keys: FxHashSet<UniqueKey> = FxHashSet::default();
106    for values in existing_rows {
107        if let Some(key) = build_composite_unique_key(values, column_names)?
108            && !existing_keys.insert(key.clone())
109        {
110            return Err(Error::ConstraintError(format!(
111                "constraint violation on columns '{}'",
112                column_names.join(", ")
113            )));
114        }
115    }
116
117    let mut new_keys: FxHashSet<UniqueKey> = FxHashSet::default();
118    for values in new_rows {
119        if let Some(key) = build_composite_unique_key(values, column_names)?
120            && (existing_keys.contains(&key) || !new_keys.insert(key))
121        {
122            return Err(Error::ConstraintError(format!(
123                "constraint violation on columns '{}'",
124                column_names.join(", ")
125            )));
126        }
127    }
128
129    Ok(())
130}
131
132/// Build a unique key component for a single value.
133pub fn unique_key_component(value: &PlanValue, column_name: &str) -> LlkvResult<Option<UniqueKey>> {
134    match value {
135        PlanValue::Null => Ok(None),
136        PlanValue::Struct(_) => Err(Error::InvalidArgumentError(format!(
137            "UNIQUE index is not supported on struct column '{}'",
138            column_name
139        ))),
140        _ => {
141            let scalar = CanonicalScalar::from_plan_value(value)?;
142            if matches!(scalar, CanonicalScalar::Null) {
143                Ok(None)
144            } else {
145                Ok(Some(UniqueKey::from_scalar(scalar)))
146            }
147        }
148    }
149}
150
151/// Build a composite unique key from column values.
152pub fn build_composite_unique_key(
153    values: &[PlanValue],
154    column_names: &[String],
155) -> LlkvResult<Option<UniqueKey>> {
156    if values.is_empty() {
157        return Ok(None);
158    }
159
160    let mut components = Vec::with_capacity(values.len());
161    for (value, column_name) in values.iter().zip(column_names) {
162        match unique_key_component(value, column_name)? {
163            Some(component) => components.extend(component.into_components()),
164            None => return Ok(None),
165        }
166    }
167
168    Ok(Some(UniqueKey::from_components(components)))
169}
170
171fn parse_check_expression(dialect: &GenericDialect, check_expr_str: &str) -> LlkvResult<SqlExpr> {
172    let sql = format!("SELECT {}", check_expr_str);
173    let mut ast = Parser::parse_sql(dialect, &sql).map_err(|e| {
174        Error::InvalidArgumentError(format!(
175            "Failed to parse CHECK expression '{}': {}",
176            check_expr_str, e
177        ))
178    })?;
179
180    let stmt = ast.pop().ok_or_else(|| {
181        Error::InvalidArgumentError(format!(
182            "CHECK expression '{}' resulted in empty AST",
183            check_expr_str
184        ))
185    })?;
186
187    let query = match stmt {
188        ast::Statement::Query(q) => q,
189        _ => {
190            return Err(Error::InvalidArgumentError(format!(
191                "CHECK expression '{}' did not parse as SELECT",
192                check_expr_str
193            )));
194        }
195    };
196
197    let body = match *query.body {
198        ast::SetExpr::Select(s) => s,
199        _ => {
200            return Err(Error::InvalidArgumentError(format!(
201                "CHECK expression '{}' is not a simple SELECT",
202                check_expr_str
203            )));
204        }
205    };
206
207    if body.projection.len() != 1 {
208        return Err(Error::InvalidArgumentError(format!(
209            "CHECK expression '{}' must have exactly one projection",
210            check_expr_str
211        )));
212    }
213
214    match &body.projection[0] {
215        ast::SelectItem::UnnamedExpr(expr) | ast::SelectItem::ExprWithAlias { expr, .. } => {
216            Ok(expr.clone())
217        }
218        _ => Err(Error::InvalidArgumentError(format!(
219            "CHECK expression '{}' projection is not a simple expression",
220            check_expr_str
221        ))),
222    }
223}
224
225fn evaluate_check_expression(
226    expr: &SqlExpr,
227    row: &[PlanValue],
228    column_order: &[usize],
229    columns: &[ConstraintColumnInfo],
230    name_lookup: &FxHashMap<String, usize>,
231) -> LlkvResult<bool> {
232    use sqlparser::ast::BinaryOperator;
233
234    match expr {
235        SqlExpr::BinaryOp { left, op, right } => {
236            let left_val =
237                evaluate_check_expr_value(left, row, column_order, columns, name_lookup)?;
238            let right_val =
239                evaluate_check_expr_value(right, row, column_order, columns, name_lookup)?;
240
241            match op {
242                BinaryOperator::Eq => {
243                    if matches!(left_val, PlanValue::Null) || matches!(right_val, PlanValue::Null) {
244                        Ok(true)
245                    } else {
246                        Ok(left_val == right_val)
247                    }
248                }
249                BinaryOperator::NotEq => {
250                    if matches!(left_val, PlanValue::Null) || matches!(right_val, PlanValue::Null) {
251                        Ok(true)
252                    } else {
253                        Ok(left_val != right_val)
254                    }
255                }
256                BinaryOperator::Lt => compare_numeric(&left_val, &right_val, |l, r| l < r),
257                BinaryOperator::LtEq => compare_numeric(&left_val, &right_val, |l, r| l <= r),
258                BinaryOperator::Gt => compare_numeric(&left_val, &right_val, |l, r| l > r),
259                BinaryOperator::GtEq => compare_numeric(&left_val, &right_val, |l, r| l >= r),
260                _ => Err(Error::InvalidArgumentError(format!(
261                    "Unsupported operator in CHECK constraint: {:?}",
262                    op
263                ))),
264            }
265        }
266        SqlExpr::IsNull(inner) => {
267            let value = evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)?;
268            Ok(matches!(value, PlanValue::Null))
269        }
270        SqlExpr::IsNotNull(inner) => {
271            let value = evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)?;
272            Ok(!matches!(value, PlanValue::Null))
273        }
274        SqlExpr::Nested(inner) => {
275            evaluate_check_expression(inner, row, column_order, columns, name_lookup)
276        }
277        _ => Err(Error::InvalidArgumentError(format!(
278            "Unsupported expression in CHECK constraint: {:?}",
279            expr
280        ))),
281    }
282}
283
284#[allow(clippy::only_used_in_recursion)]
285fn evaluate_check_expr_value(
286    expr: &SqlExpr,
287    row: &[PlanValue],
288    column_order: &[usize],
289    columns: &[ConstraintColumnInfo],
290    name_lookup: &FxHashMap<String, usize>,
291) -> LlkvResult<PlanValue> {
292    use sqlparser::ast::{BinaryOperator, Expr as SqlExpr};
293
294    match expr {
295        SqlExpr::BinaryOp { left, op, right } => {
296            let left_val =
297                evaluate_check_expr_value(left, row, column_order, columns, name_lookup)?;
298            let right_val =
299                evaluate_check_expr_value(right, row, column_order, columns, name_lookup)?;
300
301            match op {
302                BinaryOperator::Plus => apply_numeric_op(left_val, right_val, |l, r| l + r),
303                BinaryOperator::Minus => apply_numeric_op(left_val, right_val, |l, r| l - r),
304                BinaryOperator::Multiply => apply_numeric_op(left_val, right_val, |l, r| l * r),
305                BinaryOperator::Divide => divide_numeric(left_val, right_val),
306                _ => Err(Error::InvalidArgumentError(format!(
307                    "Unsupported binary operator in CHECK constraint value expression: {:?}",
308                    op
309                ))),
310            }
311        }
312        SqlExpr::Identifier(ident) => {
313            let column_idx = lookup_column_index(name_lookup, &ident.value)?;
314            extract_row_value(row, column_order, column_idx, &ident.value)
315        }
316        SqlExpr::CompoundIdentifier(idents) => {
317            if idents.len() == 2 {
318                let column_name = &idents[0].value;
319                let field_name = &idents[1].value;
320                let column_idx = lookup_column_index(name_lookup, column_name)?;
321                let value = extract_row_value(row, column_order, column_idx, column_name)?;
322                extract_struct_field(value, column_name, field_name)
323            } else if idents.len() == 3 {
324                let column_name = &idents[1].value;
325                let field_name = &idents[2].value;
326                let column_idx = lookup_column_index(name_lookup, column_name)?;
327                let value = extract_row_value(row, column_order, column_idx, column_name)?;
328                extract_struct_field(value, column_name, field_name)
329            } else {
330                Err(Error::InvalidArgumentError(format!(
331                    "Unsupported compound identifier in CHECK constraint: {} parts",
332                    idents.len()
333                )))
334            }
335        }
336        SqlExpr::Value(val_with_span) => match &val_with_span.value {
337            ast::Value::Number(n, _) => {
338                if let Ok(i) = n.parse::<i64>() {
339                    Ok(PlanValue::Integer(i))
340                } else if let Ok(f) = n.parse::<f64>() {
341                    Ok(PlanValue::Float(f))
342                } else {
343                    Err(Error::InvalidArgumentError(format!(
344                        "Invalid number in CHECK constraint: {}",
345                        n
346                    )))
347                }
348            }
349            ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
350                Ok(PlanValue::String(s.clone()))
351            }
352            ast::Value::Null => Ok(PlanValue::Null),
353            _ => Err(Error::InvalidArgumentError(format!(
354                "Unsupported value type in CHECK constraint: {:?}",
355                val_with_span.value
356            ))),
357        },
358        SqlExpr::Nested(inner) => {
359            evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)
360        }
361        _ => Err(Error::InvalidArgumentError(format!(
362            "Unsupported expression type in CHECK constraint: {:?}",
363            expr
364        ))),
365    }
366}
367
368fn lookup_column_index(
369    name_lookup: &FxHashMap<String, usize>,
370    column_name: &str,
371) -> LlkvResult<usize> {
372    name_lookup
373        .get(&column_name.to_ascii_lowercase())
374        .copied()
375        .ok_or_else(|| {
376            Error::InvalidArgumentError(format!(
377                "Unknown column '{}' in CHECK constraint",
378                column_name
379            ))
380        })
381}
382
383fn extract_row_value(
384    row: &[PlanValue],
385    column_order: &[usize],
386    schema_idx: usize,
387    column_name: &str,
388) -> LlkvResult<PlanValue> {
389    let insert_pos = column_order
390        .iter()
391        .position(|&dest_idx| dest_idx == schema_idx)
392        .ok_or_else(|| {
393            Error::InvalidArgumentError(format!("Column '{}' not provided in INSERT", column_name))
394        })?;
395
396    Ok(row[insert_pos].clone())
397}
398
399fn extract_struct_field(
400    value: PlanValue,
401    column_name: &str,
402    field_name: &str,
403) -> LlkvResult<PlanValue> {
404    match value {
405        PlanValue::Struct(fields) => fields
406            .into_iter()
407            .find(|(name, _)| name.eq_ignore_ascii_case(field_name))
408            .map(|(_, val)| val)
409            .ok_or_else(|| {
410                Error::InvalidArgumentError(format!(
411                    "Struct field '{}' not found in column '{}'",
412                    field_name, column_name
413                ))
414            }),
415        _ => Err(Error::InvalidArgumentError(format!(
416            "Column '{}' is not a struct, cannot access field '{}'",
417            column_name, field_name
418        ))),
419    }
420}
421
422fn compare_numeric<F>(left: &PlanValue, right: &PlanValue, compare: F) -> LlkvResult<bool>
423where
424    F: Fn(f64, f64) -> bool,
425{
426    if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
427        // In SQL, any comparison with NULL yields UNKNOWN.
428        // For CHECK constraints, UNKNOWN is treated as TRUE (constraint passes).
429        return Ok(true);
430    }
431
432    match (left, right) {
433        (PlanValue::Integer(l), PlanValue::Integer(r)) => Ok(compare(*l as f64, *r as f64)),
434        (PlanValue::Float(l), PlanValue::Float(r)) => Ok(compare(*l, *r)),
435        (PlanValue::Integer(l), PlanValue::Float(r)) => Ok(compare(*l as f64, *r)),
436        (PlanValue::Float(l), PlanValue::Integer(r)) => Ok(compare(*l, *r as f64)),
437        _ => Err(Error::InvalidArgumentError(
438            "CHECK constraint comparison requires numeric values".into(),
439        )),
440    }
441}
442
443fn apply_numeric_op(
444    left: PlanValue,
445    right: PlanValue,
446    op: fn(f64, f64) -> f64,
447) -> LlkvResult<PlanValue> {
448    if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
449        return Ok(PlanValue::Null);
450    }
451
452    match (left, right) {
453        (PlanValue::Integer(l), PlanValue::Integer(r)) => {
454            let result = op(l as f64, r as f64);
455            if result.fract() == 0.0 {
456                Ok(PlanValue::Integer(result as i64))
457            } else {
458                Ok(PlanValue::Float(result))
459            }
460        }
461        (PlanValue::Float(l), PlanValue::Float(r)) => Ok(PlanValue::Float(op(l, r))),
462        (PlanValue::Integer(l), PlanValue::Float(r)) => Ok(PlanValue::Float(op(l as f64, r))),
463        (PlanValue::Float(l), PlanValue::Integer(r)) => Ok(PlanValue::Float(op(l, r as f64))),
464        _ => Err(Error::InvalidArgumentError(
465            "CHECK constraint arithmetic requires numeric values".into(),
466        )),
467    }
468}
469
470fn divide_numeric(left: PlanValue, right: PlanValue) -> LlkvResult<PlanValue> {
471    if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
472        return Ok(PlanValue::Null);
473    }
474
475    match (left, right) {
476        (PlanValue::Integer(l), PlanValue::Integer(r)) => {
477            if r == 0 {
478                Err(Error::InvalidArgumentError(
479                    "Division by zero in CHECK constraint".into(),
480                ))
481            } else {
482                Ok(PlanValue::Integer(l / r))
483            }
484        }
485        (PlanValue::Float(l), PlanValue::Float(r)) => {
486            if r == 0.0 {
487                Err(Error::InvalidArgumentError(
488                    "Division by zero in CHECK constraint".into(),
489                ))
490            } else {
491                Ok(PlanValue::Float(l / r))
492            }
493        }
494        (PlanValue::Integer(l), PlanValue::Float(r)) => {
495            if r == 0.0 {
496                Err(Error::InvalidArgumentError(
497                    "Division by zero in CHECK constraint".into(),
498                ))
499            } else {
500                Ok(PlanValue::Float(l as f64 / r))
501            }
502        }
503        (PlanValue::Float(l), PlanValue::Integer(r)) => {
504            if r == 0 {
505                Err(Error::InvalidArgumentError(
506                    "Division by zero in CHECK constraint".into(),
507                ))
508            } else {
509                Ok(PlanValue::Float(l / r as f64))
510            }
511        }
512        _ => Err(Error::InvalidArgumentError(
513            "CHECK constraint / operator requires numeric values".into(),
514        )),
515    }
516}
517
518// ============================================================================
519// Foreign key validation
520// ============================================================================
521
522/// Column metadata used when validating foreign key definitions.
523#[derive(Clone, Debug)]
524pub struct ForeignKeyColumn {
525    pub name: String,
526    pub data_type: DataType,
527    pub nullable: bool,
528    pub primary_key: bool,
529    pub unique: bool,
530    pub field_id: FieldId,
531}
532
533/// Table metadata used when validating foreign key definitions.
534#[derive(Clone, Debug)]
535pub struct ForeignKeyTableInfo {
536    pub display_name: String,
537    pub canonical_name: String,
538    pub table_id: TableId,
539    pub columns: Vec<ForeignKeyColumn>,
540    pub multi_column_uniques: Vec<MultiColumnIndexEntryMeta>,
541}
542
543/// Result of validating a foreign key specification.
544#[derive(Clone, Debug)]
545pub struct ValidatedForeignKey {
546    pub name: Option<String>,
547    pub referencing_indices: Vec<usize>,
548    pub referencing_field_ids: Vec<FieldId>,
549    pub referencing_column_names: Vec<String>,
550    pub referenced_table_id: TableId,
551    pub referenced_table_display: String,
552    pub referenced_table_canonical: String,
553    pub referenced_field_ids: Vec<FieldId>,
554    pub referenced_column_names: Vec<String>,
555    pub on_delete: ForeignKeyAction,
556    pub on_update: ForeignKeyAction,
557}
558
559/// Validate a set of foreign key specifications against the provided table schemas.
560pub fn validate_foreign_keys<F>(
561    referencing_table: &ForeignKeyTableInfo,
562    specs: &[ForeignKeySpec],
563    mut lookup_table: F,
564) -> LlkvResult<Vec<ValidatedForeignKey>>
565where
566    F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
567{
568    if specs.is_empty() {
569        return Ok(Vec::new());
570    }
571
572    let mut referencing_lookup: FxHashMap<String, (usize, &ForeignKeyColumn)> =
573        FxHashMap::default();
574    for (idx, column) in referencing_table.columns.iter().enumerate() {
575        referencing_lookup.insert(column.name.to_ascii_lowercase(), (idx, column));
576    }
577
578    let mut results = Vec::with_capacity(specs.len());
579
580    for spec in specs {
581        if spec.columns.is_empty() {
582            return Err(Error::InvalidArgumentError(
583                "FOREIGN KEY requires at least one referencing column".into(),
584            ));
585        }
586
587        let mut seen_referencing = FxHashSet::default();
588        let mut referencing_indices = Vec::with_capacity(spec.columns.len());
589        let mut referencing_field_ids = Vec::with_capacity(spec.columns.len());
590        let mut referencing_column_defs = Vec::with_capacity(spec.columns.len());
591        let mut referencing_column_names = Vec::with_capacity(spec.columns.len());
592
593        for column_name in &spec.columns {
594            let normalized = column_name.to_ascii_lowercase();
595            if !seen_referencing.insert(normalized.clone()) {
596                return Err(Error::InvalidArgumentError(format!(
597                    "duplicate column '{}' in FOREIGN KEY constraint",
598                    column_name
599                )));
600            }
601
602            let (idx, column) = referencing_lookup.get(&normalized).ok_or_else(|| {
603                Error::InvalidArgumentError(format!(
604                    "unknown column '{}' in FOREIGN KEY constraint",
605                    column_name
606                ))
607            })?;
608
609            referencing_indices.push(*idx);
610            referencing_field_ids.push(column.field_id);
611            referencing_column_defs.push((*column).clone());
612            referencing_column_names.push(column.name.clone());
613        }
614
615        let referenced_table_info = lookup_table(&spec.referenced_table)?;
616
617        let referenced_columns = if spec.referenced_columns.is_empty() {
618            referenced_table_info
619                .columns
620                .iter()
621                .filter(|col| col.primary_key)
622                .map(|col| col.name.clone())
623                .collect::<Vec<_>>()
624        } else {
625            spec.referenced_columns.clone()
626        };
627
628        if referenced_columns.is_empty() {
629            return Err(Error::InvalidArgumentError(format!(
630                "there is no primary key for referenced table '{}'",
631                spec.referenced_table
632            )));
633        }
634
635        if spec.columns.len() != referenced_columns.len() {
636            return Err(Error::InvalidArgumentError(format!(
637                "number of referencing columns ({}) does not match number of referenced columns ({})",
638                spec.columns.len(),
639                referenced_columns.len()
640            )));
641        }
642
643        let mut seen_referenced = FxHashSet::default();
644        let mut referenced_lookup: FxHashMap<String, &ForeignKeyColumn> = FxHashMap::default();
645        for column in &referenced_table_info.columns {
646            referenced_lookup.insert(column.name.to_ascii_lowercase(), column);
647        }
648
649        let mut referenced_field_ids = Vec::with_capacity(referenced_columns.len());
650        let mut referenced_column_defs = Vec::with_capacity(referenced_columns.len());
651        let mut referenced_column_names = Vec::with_capacity(referenced_columns.len());
652
653        for column_name in referenced_columns.iter() {
654            let normalized = column_name.to_ascii_lowercase();
655            if !seen_referenced.insert(normalized.clone()) {
656                return Err(Error::InvalidArgumentError(format!(
657                    "duplicate referenced column '{}' in FOREIGN KEY constraint",
658                    column_name
659                )));
660            }
661
662            let column = referenced_lookup.get(&normalized).ok_or_else(|| {
663                Error::InvalidArgumentError(format!(
664                    "unknown referenced column '{}' in table '{}'",
665                    column_name, referenced_table_info.display_name
666                ))
667            })?;
668
669            referenced_field_ids.push(column.field_id);
670            referenced_column_defs.push((*column).clone());
671            referenced_column_names.push(column.name.clone());
672        }
673
674        // Validate that the referenced columns form a UNIQUE or PRIMARY KEY constraint
675        if referenced_columns.len() == 1 {
676            // Single column: check if it has UNIQUE or PRIMARY KEY constraint
677            let column = &referenced_column_defs[0];
678            if !column.primary_key && !column.unique {
679                return Err(Error::InvalidArgumentError(format!(
680                    "FOREIGN KEY references column '{}' in table '{}' that is not UNIQUE or PRIMARY KEY",
681                    column.name, referenced_table_info.display_name
682                )));
683            }
684        } else {
685            // Multiple columns: check if they form a multi-column PRIMARY KEY or UNIQUE constraint
686
687            // First check if all columns have primary_key = true (multi-column PRIMARY KEY)
688            let all_primary_key = referenced_column_defs.iter().all(|col| col.primary_key);
689
690            // Also check if they form a multi-column UNIQUE constraint
691            let has_multi_column_unique =
692                referenced_table_info
693                    .multi_column_uniques
694                    .iter()
695                    .any(|unique_entry| {
696                        // Check if this unique constraint matches our referenced columns
697                        if unique_entry.column_ids.len() != referenced_field_ids.len() {
698                            return false;
699                        }
700                        // Check if all field IDs match (order-independent)
701                        let unique_set: FxHashSet<_> =
702                            unique_entry.column_ids.iter().copied().collect();
703                        let referenced_set: FxHashSet<_> =
704                            referenced_field_ids.iter().copied().collect();
705                        unique_set == referenced_set
706                    });
707
708            if !all_primary_key && !has_multi_column_unique {
709                return Err(Error::InvalidArgumentError(format!(
710                    "FOREIGN KEY references columns ({}) in table '{}' that do not form a UNIQUE or PRIMARY KEY constraint",
711                    referenced_column_names.join(", "),
712                    referenced_table_info.display_name
713                )));
714            }
715        }
716
717        for (child_col, parent_col) in referencing_column_defs
718            .iter()
719            .zip(referenced_column_defs.iter())
720        {
721            if child_col.data_type != parent_col.data_type {
722                return Err(Error::InvalidArgumentError(format!(
723                    "FOREIGN KEY column '{}' type {:?} does not match referenced column '{}' type {:?}",
724                    child_col.name, child_col.data_type, parent_col.name, parent_col.data_type
725                )));
726            }
727
728            // Nullable child referencing non-null parent is allowed; no additional action required.
729        }
730
731        results.push(ValidatedForeignKey {
732            name: spec.name.clone(),
733            referencing_indices,
734            referencing_field_ids,
735            referencing_column_names,
736            referenced_table_id: referenced_table_info.table_id,
737            referenced_table_display: referenced_table_info.display_name.clone(),
738            referenced_table_canonical: referenced_table_info.canonical_name.clone(),
739            referenced_field_ids,
740            referenced_column_names,
741            on_delete: map_plan_action(spec.on_delete.clone()),
742            on_update: map_plan_action(spec.on_update.clone()),
743        });
744    }
745
746    Ok(results)
747}
748
749fn map_plan_action(action: PlanForeignKeyAction) -> ForeignKeyAction {
750    match action {
751        PlanForeignKeyAction::NoAction => ForeignKeyAction::NoAction,
752        PlanForeignKeyAction::Restrict => ForeignKeyAction::Restrict,
753    }
754}
755
756// ============================================================================
757// Runtime constraint helpers
758// ============================================================================
759
760/// Ensure existing + incoming values remain unique for a single column.
761pub fn ensure_single_column_unique(
762    existing_values: &[PlanValue],
763    new_values: &[PlanValue],
764    column_name: &str,
765) -> LlkvResult<()> {
766    let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
767
768    for value in existing_values {
769        if let Some(key) = unique_key_component(value, column_name)?
770            && !seen.insert(key.clone())
771        {
772            return Err(Error::ConstraintError(format!(
773                "constraint violation on column '{}'",
774                column_name
775            )));
776        }
777    }
778
779    for value in new_values {
780        if let Some(key) = unique_key_component(value, column_name)?
781            && !seen.insert(key.clone())
782        {
783            return Err(Error::ConstraintError(format!(
784                "constraint violation on column '{}'",
785                column_name
786            )));
787        }
788    }
789
790    Ok(())
791}
792
793/// Ensure primary key values remain unique and non-null.
794pub fn ensure_primary_key(
795    existing_rows: &[Vec<PlanValue>],
796    new_rows: &[Vec<PlanValue>],
797    column_names: &[String],
798) -> LlkvResult<()> {
799    let pk_label = if column_names.len() == 1 {
800        "column"
801    } else {
802        "columns"
803    };
804    let pk_display = if column_names.len() == 1 {
805        column_names[0].clone()
806    } else {
807        column_names.join(", ")
808    };
809
810    let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
811
812    for row_values in existing_rows {
813        if row_values.len() != column_names.len() {
814            continue;
815        }
816
817        let key = build_composite_unique_key(row_values, column_names)?;
818        let key = key.ok_or_else(|| {
819            Error::ConstraintError(format!(
820                "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
821            ))
822        })?;
823
824        if !seen.insert(key.clone()) {
825            return Err(Error::ConstraintError(format!(
826                "Duplicate key violates primary key constraint on {pk_label} '{pk_display}' (PRIMARY KEY or UNIQUE constraint violation)"
827            )));
828        }
829    }
830
831    for row_values in new_rows {
832        if row_values.len() != column_names.len() {
833            continue;
834        }
835
836        let key = build_composite_unique_key(row_values, column_names)?;
837        let key = key.ok_or_else(|| {
838            Error::ConstraintError(format!(
839                "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
840            ))
841        })?;
842
843        if !seen.insert(key.clone()) {
844            return Err(Error::ConstraintError(format!(
845                "Duplicate key violates primary key constraint on {pk_label} '{pk_display}' (PRIMARY KEY or UNIQUE constraint violation)"
846            )));
847        }
848    }
849
850    Ok(())
851}
852
853/// Ensure that referencing rows satisfy the foreign key constraint by matching existing parent keys.
854pub fn validate_foreign_key_rows(
855    constraint_name: Option<&str>,
856    referencing_table: &str,
857    referenced_table: &str,
858    referenced_column_names: &[String],
859    parent_keys: &[Vec<PlanValue>],
860    candidate_keys: &[Vec<PlanValue>],
861) -> LlkvResult<()> {
862    if parent_keys.is_empty() {
863        // If there are no parent keys, every non-null candidate will fail.
864        for key in candidate_keys {
865            if key.iter().all(|value| !matches!(value, PlanValue::Null)) {
866                let constraint_label = constraint_name.unwrap_or("FOREIGN KEY");
867                let referenced_columns = if referenced_column_names.is_empty() {
868                    String::from("<unknown>")
869                } else {
870                    referenced_column_names.join(", ")
871                };
872                return Err(Error::ConstraintError(format!(
873                    "Violates foreign key constraint '{}' on table '{}' referencing '{}' (columns: {}) - does not exist in the referenced table",
874                    constraint_label, referencing_table, referenced_table, referenced_columns,
875                )));
876            }
877        }
878        return Ok(());
879    }
880
881    for key in candidate_keys {
882        if key.iter().any(|value| matches!(value, PlanValue::Null)) {
883            continue;
884        }
885
886        if parent_keys.iter().any(|existing| existing == key) {
887            continue;
888        }
889
890        let constraint_label = constraint_name.unwrap_or("FOREIGN KEY");
891        let referenced_columns = if referenced_column_names.is_empty() {
892            String::from("<unknown>")
893        } else {
894            referenced_column_names.join(", ")
895        };
896
897        return Err(Error::ConstraintError(format!(
898            "Violates foreign key constraint '{}' on table '{}' referencing '{}' (columns: {}) - does not exist in the referenced table",
899            constraint_label, referencing_table, referenced_table, referenced_columns,
900        )));
901    }
902
903    Ok(())
904}
905
906// ========================================
907// ALTER TABLE validation helpers
908// ========================================
909
910use crate::{CatalogManager, TableView};
911use llkv_plan::AlterTableOperation;
912use llkv_storage::pager::Pager;
913use simd_r_drive_entry_handle::EntryHandle;
914
915/// Check if a column is part of a PRIMARY KEY or UNIQUE constraint.
916pub fn column_in_primary_or_unique(view: &TableView, field_id: FieldId) -> bool {
917    view.constraint_records
918        .iter()
919        .filter(|record| record.is_active())
920        .any(|record| match &record.kind {
921            super::ConstraintKind::PrimaryKey(payload) => payload.field_ids.contains(&field_id),
922            super::ConstraintKind::Unique(payload) => payload.field_ids.contains(&field_id),
923            _ => false,
924        })
925}
926
927/// Check if a column is part of a multi-column unique constraint.
928pub fn column_in_multi_column_unique(view: &TableView, field_id: FieldId) -> bool {
929    view.multi_column_uniques
930        .iter()
931        .any(|entry| entry.column_ids.contains(&field_id))
932}
933
934/// Check if a column is involved in any foreign key constraints.
935///
936/// Returns the name of the constraint if found, or None if the column is not referenced.
937pub fn column_in_foreign_keys<PagerType>(
938    view: &TableView,
939    field_id: FieldId,
940    table_id: TableId,
941    catalog_service: &CatalogManager<PagerType>,
942) -> LlkvResult<Option<String>>
943where
944    PagerType: Pager<Blob = EntryHandle> + Send + Sync + 'static,
945{
946    // Check if column is a referencing column in this table's foreign keys
947    if let Some(fk) = view
948        .foreign_keys
949        .iter()
950        .find(|fk| fk.referencing_field_ids.contains(&field_id))
951    {
952        return Ok(Some(
953            fk.constraint_name
954                .as_deref()
955                .unwrap_or("unnamed")
956                .to_string(),
957        ));
958    }
959
960    // Check if column is referenced by other tables' foreign keys
961    let mut visited: FxHashSet<TableId> = FxHashSet::default();
962    for (referencing_table_id, _) in catalog_service.foreign_keys_referencing(table_id)? {
963        if !visited.insert(referencing_table_id) {
964            continue;
965        }
966
967        for fk in catalog_service.foreign_key_views_for_table(referencing_table_id)? {
968            if fk.referenced_table_id == table_id && fk.referenced_field_ids.contains(&field_id) {
969                return Ok(Some(
970                    fk.constraint_name
971                        .as_deref()
972                        .unwrap_or("unnamed")
973                        .to_string(),
974                ));
975            }
976        }
977    }
978
979    Ok(None)
980}
981
982/// Validate an ALTER TABLE operation against existing constraints.
983///
984/// This function checks whether the requested ALTER TABLE operation would violate
985/// any existing constraints on the table, including:
986/// - PRIMARY KEY constraints
987/// - UNIQUE constraints
988/// - FOREIGN KEY constraints
989///
990/// Returns an error if the operation would violate a constraint.
991pub fn validate_alter_table_operation<PagerType>(
992    operation: &AlterTableOperation,
993    view: &TableView,
994    table_id: TableId,
995    catalog_service: &CatalogManager<PagerType>,
996) -> LlkvResult<()>
997where
998    PagerType: Pager<Blob = EntryHandle> + Send + Sync + 'static,
999{
1000    let resolver = catalog_service
1001        .field_resolver(table_id)
1002        .ok_or_else(|| Error::Internal("missing field resolver for table".into()))?;
1003
1004    match operation {
1005        AlterTableOperation::RenameColumn {
1006            old_column_name,
1007            new_column_name,
1008        } => {
1009            let field_id = resolver.field_id(old_column_name).ok_or_else(|| {
1010                Error::CatalogError(format!(
1011                    "Catalog Error: column '{}' does not exist",
1012                    old_column_name
1013                ))
1014            })?;
1015
1016            if resolver.field_id(new_column_name).is_some() {
1017                return Err(Error::CatalogError(format!(
1018                    "Catalog Error: column '{}' already exists",
1019                    new_column_name
1020                )));
1021            }
1022
1023            if let Some(constraint) =
1024                column_in_foreign_keys(view, field_id, table_id, catalog_service)?
1025            {
1026                return Err(Error::CatalogError(format!(
1027                    "Catalog Error: column '{}' is involved in the foreign key constraint '{}'",
1028                    old_column_name, constraint
1029                )));
1030            }
1031
1032            Ok(())
1033        }
1034        AlterTableOperation::SetColumnDataType { column_name, .. } => {
1035            let field_id = resolver.field_id(column_name).ok_or_else(|| {
1036                Error::CatalogError(format!(
1037                    "Catalog Error: column '{}' does not exist",
1038                    column_name
1039                ))
1040            })?;
1041
1042            if column_in_primary_or_unique(view, field_id)
1043                || column_in_multi_column_unique(view, field_id)
1044            {
1045                return Err(Error::InvalidArgumentError(format!(
1046                    "Binder Error: Cannot change the type of a column that has a UNIQUE or PRIMARY KEY constraint specified (column '{}')",
1047                    column_name
1048                )));
1049            }
1050
1051            if let Some(constraint) =
1052                column_in_foreign_keys(view, field_id, table_id, catalog_service)?
1053            {
1054                return Err(Error::CatalogError(format!(
1055                    "Catalog Error: column '{}' is involved in the foreign key constraint '{}'",
1056                    column_name, constraint
1057                )));
1058            }
1059
1060            Ok(())
1061        }
1062        AlterTableOperation::DropColumn {
1063            column_name,
1064            if_exists,
1065            ..
1066        } => {
1067            let field_id = match resolver.field_id(column_name) {
1068                Some(id) => id,
1069                None if *if_exists => return Ok(()),
1070                None => {
1071                    return Err(Error::CatalogError(format!(
1072                        "Catalog Error: column '{}' does not exist",
1073                        column_name
1074                    )));
1075                }
1076            };
1077
1078            if column_in_primary_or_unique(view, field_id)
1079                || column_in_multi_column_unique(view, field_id)
1080            {
1081                return Err(Error::CatalogError(format!(
1082                    "Catalog Error: there is a UNIQUE constraint that depends on it (column '{}')",
1083                    column_name
1084                )));
1085            }
1086
1087            if column_in_foreign_keys(view, field_id, table_id, catalog_service)?.is_some() {
1088                return Err(Error::CatalogError(format!(
1089                    "Catalog Error: there is a FOREIGN KEY constraint that depends on it (column '{}')",
1090                    column_name
1091                )));
1092            }
1093
1094            Ok(())
1095        }
1096    }
1097}