llkv_table/constraints/
validation.rs

1use arrow::datatypes::DataType;
2use llkv_plan::{ForeignKeyAction as PlanForeignKeyAction, ForeignKeySpec, PlanValue};
3use llkv_result::{Error, Result as LlkvResult};
4use rustc_hash::{FxHashMap, FxHashSet};
5use sqlparser::ast::{self, Expr as SqlExpr};
6use sqlparser::dialect::GenericDialect;
7use sqlparser::parser::Parser;
8
9use super::types::ForeignKeyAction;
10use crate::types::{FieldId, TableId};
11
/// Lightweight column descriptor used for constraint validation.
///
/// CHECK validation in this module resolves identifiers against `name`
/// (ASCII case-insensitively) and parses `check_expr` when it is present.
#[derive(Clone, Debug)]
pub struct ConstraintColumnInfo {
    /// Column name; identifiers inside CHECK expressions are matched against it.
    pub name: String,
    /// Field identifier of the column.
    pub field_id: FieldId,
    /// Arrow data type of the column.
    pub data_type: DataType,
    /// Presumably whether the column accepts NULL; the CHECK validation in this
    /// file does not consult it.
    pub nullable: bool,
    /// Source text of the column's CHECK expression, if one is declared.
    pub check_expr: Option<String>,
}
21
/// Canonical representation of values participating in UNIQUE or PRIMARY KEY checks.
///
/// The type is hashable so keys can be stored in hash sets for duplicate detection.
#[derive(Hash, Eq, PartialEq, Debug, Clone)]
pub enum UniqueKey {
    /// Integer value.
    Int(i64),
    /// Floating-point value stored as its raw bit pattern (`f64::to_bits`),
    /// which makes it hashable and comparable with `Eq`.
    Float(u64),
    /// String value.
    Str(String),
    /// Ordered list of per-column components forming a multi-column key.
    Composite(Vec<UniqueKey>),
}
30
31/// Validate all CHECK constraints for the provided rows.
32pub fn validate_check_constraints(
33    columns: &[ConstraintColumnInfo],
34    rows: &[Vec<PlanValue>],
35    column_order: &[usize],
36) -> LlkvResult<()> {
37    if rows.is_empty() {
38        return Ok(());
39    }
40
41    let dialect = GenericDialect {};
42
43    let mut parsed_checks: Vec<(usize, String, String, SqlExpr)> = Vec::new();
44
45    for (idx, column) in columns.iter().enumerate() {
46        if let Some(expr_str) = &column.check_expr {
47            let expr = parse_check_expression(&dialect, expr_str)?;
48            parsed_checks.push((idx, column.name.clone(), expr_str.clone(), expr));
49        }
50    }
51
52    if parsed_checks.is_empty() {
53        return Ok(());
54    }
55
56    let mut name_lookup: FxHashMap<String, usize> = FxHashMap::default();
57    for (idx, column) in columns.iter().enumerate() {
58        name_lookup.insert(column.name.to_ascii_lowercase(), idx);
59    }
60
61    for row in rows {
62        for (_schema_idx, column_name, expr_str, expr) in &parsed_checks {
63            let result = evaluate_check_expression(expr, row, column_order, columns, &name_lookup)?;
64
65            if !result {
66                return Err(Error::ConstraintError(format!(
67                    "CHECK constraint failed for column '{}': {}",
68                    column_name, expr_str
69                )));
70            }
71        }
72    }
73
74    Ok(())
75}
76
77/// Ensure that the provided multi-column values maintain uniqueness.
78pub fn ensure_multi_column_unique(
79    existing_rows: &[Vec<PlanValue>],
80    new_rows: &[Vec<PlanValue>],
81    column_names: &[String],
82) -> LlkvResult<()> {
83    let mut existing_keys: FxHashSet<UniqueKey> = FxHashSet::default();
84    for values in existing_rows {
85        if let Some(key) = build_composite_unique_key(values, column_names)?
86            && !existing_keys.insert(key.clone())
87        {
88            return Err(Error::ConstraintError(format!(
89                "constraint violation on columns '{}'",
90                column_names.join(", ")
91            )));
92        }
93    }
94
95    let mut new_keys: FxHashSet<UniqueKey> = FxHashSet::default();
96    for values in new_rows {
97        if let Some(key) = build_composite_unique_key(values, column_names)?
98            && (existing_keys.contains(&key) || !new_keys.insert(key))
99        {
100            return Err(Error::ConstraintError(format!(
101                "constraint violation on columns '{}'",
102                column_names.join(", ")
103            )));
104        }
105    }
106
107    Ok(())
108}
109
110/// Build a unique key component for a single value.
111pub fn unique_key_component(value: &PlanValue, column_name: &str) -> LlkvResult<Option<UniqueKey>> {
112    match value {
113        PlanValue::Null => Ok(None),
114        PlanValue::Integer(v) => Ok(Some(UniqueKey::Int(*v))),
115        PlanValue::Float(v) => Ok(Some(UniqueKey::Float(v.to_bits()))),
116        PlanValue::String(s) => Ok(Some(UniqueKey::Str(s.clone()))),
117        PlanValue::Struct(_) => Err(Error::InvalidArgumentError(format!(
118            "UNIQUE index is not supported on struct column '{}'",
119            column_name
120        ))),
121    }
122}
123
124/// Build a composite unique key from column values.
125pub fn build_composite_unique_key(
126    values: &[PlanValue],
127    column_names: &[String],
128) -> LlkvResult<Option<UniqueKey>> {
129    if values.is_empty() {
130        return Ok(None);
131    }
132
133    let mut components = Vec::with_capacity(values.len());
134    for (value, column_name) in values.iter().zip(column_names) {
135        match unique_key_component(value, column_name)? {
136            Some(component) => components.push(component),
137            None => return Ok(None),
138        }
139    }
140
141    Ok(Some(UniqueKey::Composite(components)))
142}
143
144fn parse_check_expression(dialect: &GenericDialect, check_expr_str: &str) -> LlkvResult<SqlExpr> {
145    let sql = format!("SELECT {}", check_expr_str);
146    let mut ast = Parser::parse_sql(dialect, &sql).map_err(|e| {
147        Error::InvalidArgumentError(format!(
148            "Failed to parse CHECK expression '{}': {}",
149            check_expr_str, e
150        ))
151    })?;
152
153    let stmt = ast.pop().ok_or_else(|| {
154        Error::InvalidArgumentError(format!(
155            "CHECK expression '{}' resulted in empty AST",
156            check_expr_str
157        ))
158    })?;
159
160    let query = match stmt {
161        ast::Statement::Query(q) => q,
162        _ => {
163            return Err(Error::InvalidArgumentError(format!(
164                "CHECK expression '{}' did not parse as SELECT",
165                check_expr_str
166            )));
167        }
168    };
169
170    let body = match *query.body {
171        ast::SetExpr::Select(s) => s,
172        _ => {
173            return Err(Error::InvalidArgumentError(format!(
174                "CHECK expression '{}' is not a simple SELECT",
175                check_expr_str
176            )));
177        }
178    };
179
180    if body.projection.len() != 1 {
181        return Err(Error::InvalidArgumentError(format!(
182            "CHECK expression '{}' must have exactly one projection",
183            check_expr_str
184        )));
185    }
186
187    match &body.projection[0] {
188        ast::SelectItem::UnnamedExpr(expr) | ast::SelectItem::ExprWithAlias { expr, .. } => {
189            Ok(expr.clone())
190        }
191        _ => Err(Error::InvalidArgumentError(format!(
192            "CHECK expression '{}' projection is not a simple expression",
193            check_expr_str
194        ))),
195    }
196}
197
198fn evaluate_check_expression(
199    expr: &SqlExpr,
200    row: &[PlanValue],
201    column_order: &[usize],
202    columns: &[ConstraintColumnInfo],
203    name_lookup: &FxHashMap<String, usize>,
204) -> LlkvResult<bool> {
205    use sqlparser::ast::BinaryOperator;
206
207    match expr {
208        SqlExpr::BinaryOp { left, op, right } => {
209            let left_val =
210                evaluate_check_expr_value(left, row, column_order, columns, name_lookup)?;
211            let right_val =
212                evaluate_check_expr_value(right, row, column_order, columns, name_lookup)?;
213
214            match op {
215                BinaryOperator::Eq => {
216                    if matches!(left_val, PlanValue::Null) || matches!(right_val, PlanValue::Null) {
217                        Ok(true)
218                    } else {
219                        Ok(left_val == right_val)
220                    }
221                }
222                BinaryOperator::NotEq => {
223                    if matches!(left_val, PlanValue::Null) || matches!(right_val, PlanValue::Null) {
224                        Ok(true)
225                    } else {
226                        Ok(left_val != right_val)
227                    }
228                }
229                BinaryOperator::Lt => compare_numeric(&left_val, &right_val, |l, r| l < r),
230                BinaryOperator::LtEq => compare_numeric(&left_val, &right_val, |l, r| l <= r),
231                BinaryOperator::Gt => compare_numeric(&left_val, &right_val, |l, r| l > r),
232                BinaryOperator::GtEq => compare_numeric(&left_val, &right_val, |l, r| l >= r),
233                _ => Err(Error::InvalidArgumentError(format!(
234                    "Unsupported operator in CHECK constraint: {:?}",
235                    op
236                ))),
237            }
238        }
239        SqlExpr::IsNull(inner) => {
240            let value = evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)?;
241            Ok(matches!(value, PlanValue::Null))
242        }
243        SqlExpr::IsNotNull(inner) => {
244            let value = evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)?;
245            Ok(!matches!(value, PlanValue::Null))
246        }
247        SqlExpr::Nested(inner) => {
248            evaluate_check_expression(inner, row, column_order, columns, name_lookup)
249        }
250        _ => Err(Error::InvalidArgumentError(format!(
251            "Unsupported expression in CHECK constraint: {:?}",
252            expr
253        ))),
254    }
255}
256
257#[allow(clippy::only_used_in_recursion)]
258fn evaluate_check_expr_value(
259    expr: &SqlExpr,
260    row: &[PlanValue],
261    column_order: &[usize],
262    columns: &[ConstraintColumnInfo],
263    name_lookup: &FxHashMap<String, usize>,
264) -> LlkvResult<PlanValue> {
265    use sqlparser::ast::{BinaryOperator, Expr as SqlExpr};
266
267    match expr {
268        SqlExpr::BinaryOp { left, op, right } => {
269            let left_val =
270                evaluate_check_expr_value(left, row, column_order, columns, name_lookup)?;
271            let right_val =
272                evaluate_check_expr_value(right, row, column_order, columns, name_lookup)?;
273
274            match op {
275                BinaryOperator::Plus => apply_numeric_op(left_val, right_val, |l, r| l + r),
276                BinaryOperator::Minus => apply_numeric_op(left_val, right_val, |l, r| l - r),
277                BinaryOperator::Multiply => apply_numeric_op(left_val, right_val, |l, r| l * r),
278                BinaryOperator::Divide => divide_numeric(left_val, right_val),
279                _ => Err(Error::InvalidArgumentError(format!(
280                    "Unsupported binary operator in CHECK constraint value expression: {:?}",
281                    op
282                ))),
283            }
284        }
285        SqlExpr::Identifier(ident) => {
286            let column_idx = lookup_column_index(name_lookup, &ident.value)?;
287            extract_row_value(row, column_order, column_idx, &ident.value)
288        }
289        SqlExpr::CompoundIdentifier(idents) => {
290            if idents.len() == 2 {
291                let column_name = &idents[0].value;
292                let field_name = &idents[1].value;
293                let column_idx = lookup_column_index(name_lookup, column_name)?;
294                let value = extract_row_value(row, column_order, column_idx, column_name)?;
295                extract_struct_field(value, column_name, field_name)
296            } else if idents.len() == 3 {
297                let column_name = &idents[1].value;
298                let field_name = &idents[2].value;
299                let column_idx = lookup_column_index(name_lookup, column_name)?;
300                let value = extract_row_value(row, column_order, column_idx, column_name)?;
301                extract_struct_field(value, column_name, field_name)
302            } else {
303                Err(Error::InvalidArgumentError(format!(
304                    "Unsupported compound identifier in CHECK constraint: {} parts",
305                    idents.len()
306                )))
307            }
308        }
309        SqlExpr::Value(val_with_span) => match &val_with_span.value {
310            ast::Value::Number(n, _) => {
311                if let Ok(i) = n.parse::<i64>() {
312                    Ok(PlanValue::Integer(i))
313                } else if let Ok(f) = n.parse::<f64>() {
314                    Ok(PlanValue::Float(f))
315                } else {
316                    Err(Error::InvalidArgumentError(format!(
317                        "Invalid number in CHECK constraint: {}",
318                        n
319                    )))
320                }
321            }
322            ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
323                Ok(PlanValue::String(s.clone()))
324            }
325            ast::Value::Null => Ok(PlanValue::Null),
326            _ => Err(Error::InvalidArgumentError(format!(
327                "Unsupported value type in CHECK constraint: {:?}",
328                val_with_span.value
329            ))),
330        },
331        SqlExpr::Nested(inner) => {
332            evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)
333        }
334        _ => Err(Error::InvalidArgumentError(format!(
335            "Unsupported expression type in CHECK constraint: {:?}",
336            expr
337        ))),
338    }
339}
340
341fn lookup_column_index(
342    name_lookup: &FxHashMap<String, usize>,
343    column_name: &str,
344) -> LlkvResult<usize> {
345    name_lookup
346        .get(&column_name.to_ascii_lowercase())
347        .copied()
348        .ok_or_else(|| {
349            Error::InvalidArgumentError(format!(
350                "Unknown column '{}' in CHECK constraint",
351                column_name
352            ))
353        })
354}
355
356fn extract_row_value(
357    row: &[PlanValue],
358    column_order: &[usize],
359    schema_idx: usize,
360    column_name: &str,
361) -> LlkvResult<PlanValue> {
362    let insert_pos = column_order
363        .iter()
364        .position(|&dest_idx| dest_idx == schema_idx)
365        .ok_or_else(|| {
366            Error::InvalidArgumentError(format!("Column '{}' not provided in INSERT", column_name))
367        })?;
368
369    Ok(row[insert_pos].clone())
370}
371
372fn extract_struct_field(
373    value: PlanValue,
374    column_name: &str,
375    field_name: &str,
376) -> LlkvResult<PlanValue> {
377    match value {
378        PlanValue::Struct(fields) => fields
379            .into_iter()
380            .find(|(name, _)| name.eq_ignore_ascii_case(field_name))
381            .map(|(_, val)| val)
382            .ok_or_else(|| {
383                Error::InvalidArgumentError(format!(
384                    "Struct field '{}' not found in column '{}'",
385                    field_name, column_name
386                ))
387            }),
388        _ => Err(Error::InvalidArgumentError(format!(
389            "Column '{}' is not a struct, cannot access field '{}'",
390            column_name, field_name
391        ))),
392    }
393}
394
395fn compare_numeric<F>(left: &PlanValue, right: &PlanValue, compare: F) -> LlkvResult<bool>
396where
397    F: Fn(f64, f64) -> bool,
398{
399    if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
400        // In SQL, any comparison with NULL yields UNKNOWN.
401        // For CHECK constraints, UNKNOWN is treated as TRUE (constraint passes).
402        return Ok(true);
403    }
404
405    match (left, right) {
406        (PlanValue::Integer(l), PlanValue::Integer(r)) => Ok(compare(*l as f64, *r as f64)),
407        (PlanValue::Float(l), PlanValue::Float(r)) => Ok(compare(*l, *r)),
408        (PlanValue::Integer(l), PlanValue::Float(r)) => Ok(compare(*l as f64, *r)),
409        (PlanValue::Float(l), PlanValue::Integer(r)) => Ok(compare(*l, *r as f64)),
410        _ => Err(Error::InvalidArgumentError(
411            "CHECK constraint comparison requires numeric values".into(),
412        )),
413    }
414}
415
416fn apply_numeric_op(
417    left: PlanValue,
418    right: PlanValue,
419    op: fn(f64, f64) -> f64,
420) -> LlkvResult<PlanValue> {
421    if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
422        return Ok(PlanValue::Null);
423    }
424
425    match (left, right) {
426        (PlanValue::Integer(l), PlanValue::Integer(r)) => {
427            let result = op(l as f64, r as f64);
428            if result.fract() == 0.0 {
429                Ok(PlanValue::Integer(result as i64))
430            } else {
431                Ok(PlanValue::Float(result))
432            }
433        }
434        (PlanValue::Float(l), PlanValue::Float(r)) => Ok(PlanValue::Float(op(l, r))),
435        (PlanValue::Integer(l), PlanValue::Float(r)) => Ok(PlanValue::Float(op(l as f64, r))),
436        (PlanValue::Float(l), PlanValue::Integer(r)) => Ok(PlanValue::Float(op(l, r as f64))),
437        _ => Err(Error::InvalidArgumentError(
438            "CHECK constraint arithmetic requires numeric values".into(),
439        )),
440    }
441}
442
443fn divide_numeric(left: PlanValue, right: PlanValue) -> LlkvResult<PlanValue> {
444    if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
445        return Ok(PlanValue::Null);
446    }
447
448    match (left, right) {
449        (PlanValue::Integer(l), PlanValue::Integer(r)) => {
450            if r == 0 {
451                Err(Error::InvalidArgumentError(
452                    "Division by zero in CHECK constraint".into(),
453                ))
454            } else {
455                Ok(PlanValue::Integer(l / r))
456            }
457        }
458        (PlanValue::Float(l), PlanValue::Float(r)) => {
459            if r == 0.0 {
460                Err(Error::InvalidArgumentError(
461                    "Division by zero in CHECK constraint".into(),
462                ))
463            } else {
464                Ok(PlanValue::Float(l / r))
465            }
466        }
467        (PlanValue::Integer(l), PlanValue::Float(r)) => {
468            if r == 0.0 {
469                Err(Error::InvalidArgumentError(
470                    "Division by zero in CHECK constraint".into(),
471                ))
472            } else {
473                Ok(PlanValue::Float(l as f64 / r))
474            }
475        }
476        (PlanValue::Float(l), PlanValue::Integer(r)) => {
477            if r == 0 {
478                Err(Error::InvalidArgumentError(
479                    "Division by zero in CHECK constraint".into(),
480                ))
481            } else {
482                Ok(PlanValue::Float(l / r as f64))
483            }
484        }
485        _ => Err(Error::InvalidArgumentError(
486            "CHECK constraint / operator requires numeric values".into(),
487        )),
488    }
489}
490
491// ============================================================================
492// Foreign key validation
493// ============================================================================
494
/// Column metadata used when validating foreign key definitions.
#[derive(Clone, Debug)]
pub struct ForeignKeyColumn {
    /// Column name; foreign key specs are matched against it ASCII case-insensitively.
    pub name: String,
    /// Arrow data type; referencing and referenced columns must have equal types.
    pub data_type: DataType,
    /// Presumably whether the column accepts NULL; foreign key validation in
    /// this file does not consult it.
    pub nullable: bool,
    /// Whether the column is flagged as a primary key column. Columns flagged
    /// here are used as the default referenced columns when a spec names none.
    pub primary_key: bool,
    /// Whether the column is flagged UNIQUE. A referenced column must have
    /// either this or `primary_key` set.
    pub unique: bool,
    /// Field identifier of the column.
    pub field_id: FieldId,
}
505
/// Table metadata used when validating foreign key definitions.
#[derive(Clone, Debug)]
pub struct ForeignKeyTableInfo {
    /// Table name used in error messages.
    pub display_name: String,
    /// Canonical form of the table name (presumably normalised for lookups —
    /// confirm against the catalog code); copied into validated foreign keys.
    pub canonical_name: String,
    /// Identifier of the table.
    pub table_id: TableId,
    /// Columns of the table, in the order that referencing indices refer to.
    pub columns: Vec<ForeignKeyColumn>,
}
514
/// Result of validating a foreign key specification.
///
/// The referencing and referenced vectors are parallel: entry `i` of each
/// describes the `i`-th column pair of the constraint.
#[derive(Clone, Debug)]
pub struct ValidatedForeignKey {
    /// Constraint name copied from the specification, if one was given.
    pub name: Option<String>,
    /// Positions of the referencing columns within the referencing table's column list.
    pub referencing_indices: Vec<usize>,
    /// Field identifiers of the referencing columns.
    pub referencing_field_ids: Vec<FieldId>,
    /// Names of the referencing columns as spelled in the table definition.
    pub referencing_column_names: Vec<String>,
    /// Identifier of the referenced (parent) table.
    pub referenced_table_id: TableId,
    /// Display name of the referenced table.
    pub referenced_table_display: String,
    /// Canonical name of the referenced table.
    pub referenced_table_canonical: String,
    /// Field identifiers of the referenced columns.
    pub referenced_field_ids: Vec<FieldId>,
    /// Names of the referenced columns as spelled in the referenced table's definition.
    pub referenced_column_names: Vec<String>,
    /// Action mapped from the specification's `on_delete`.
    pub on_delete: ForeignKeyAction,
    /// Action mapped from the specification's `on_update`.
    pub on_update: ForeignKeyAction,
}
530
531/// Validate a set of foreign key specifications against the provided table schemas.
532pub fn validate_foreign_keys<F>(
533    referencing_table: &ForeignKeyTableInfo,
534    specs: &[ForeignKeySpec],
535    mut lookup_table: F,
536) -> LlkvResult<Vec<ValidatedForeignKey>>
537where
538    F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
539{
540    if specs.is_empty() {
541        return Ok(Vec::new());
542    }
543
544    let mut referencing_lookup: FxHashMap<String, (usize, &ForeignKeyColumn)> =
545        FxHashMap::default();
546    for (idx, column) in referencing_table.columns.iter().enumerate() {
547        referencing_lookup.insert(column.name.to_ascii_lowercase(), (idx, column));
548    }
549
550    let mut results = Vec::with_capacity(specs.len());
551
552    for spec in specs {
553        if spec.columns.is_empty() {
554            return Err(Error::InvalidArgumentError(
555                "FOREIGN KEY requires at least one referencing column".into(),
556            ));
557        }
558
559        let mut seen_referencing = FxHashSet::default();
560        let mut referencing_indices = Vec::with_capacity(spec.columns.len());
561        let mut referencing_field_ids = Vec::with_capacity(spec.columns.len());
562        let mut referencing_column_defs = Vec::with_capacity(spec.columns.len());
563        let mut referencing_column_names = Vec::with_capacity(spec.columns.len());
564
565        for column_name in &spec.columns {
566            let normalized = column_name.to_ascii_lowercase();
567            if !seen_referencing.insert(normalized.clone()) {
568                return Err(Error::InvalidArgumentError(format!(
569                    "duplicate column '{}' in FOREIGN KEY constraint",
570                    column_name
571                )));
572            }
573
574            let (idx, column) = referencing_lookup.get(&normalized).ok_or_else(|| {
575                Error::InvalidArgumentError(format!(
576                    "unknown column '{}' in FOREIGN KEY constraint",
577                    column_name
578                ))
579            })?;
580
581            referencing_indices.push(*idx);
582            referencing_field_ids.push(column.field_id);
583            referencing_column_defs.push((*column).clone());
584            referencing_column_names.push(column.name.clone());
585        }
586
587        let referenced_table_info = lookup_table(&spec.referenced_table)?;
588
589        let referenced_columns = if spec.referenced_columns.is_empty() {
590            referenced_table_info
591                .columns
592                .iter()
593                .filter(|col| col.primary_key)
594                .map(|col| col.name.clone())
595                .collect::<Vec<_>>()
596        } else {
597            spec.referenced_columns.clone()
598        };
599
600        if referenced_columns.is_empty() {
601            return Err(Error::InvalidArgumentError(format!(
602                "there is no primary key for referenced table '{}'",
603                spec.referenced_table
604            )));
605        }
606
607        if spec.columns.len() != referenced_columns.len() {
608            return Err(Error::InvalidArgumentError(format!(
609                "number of referencing columns ({}) does not match number of referenced columns ({})",
610                spec.columns.len(),
611                referenced_columns.len()
612            )));
613        }
614
615        let mut seen_referenced = FxHashSet::default();
616        let mut referenced_lookup: FxHashMap<String, &ForeignKeyColumn> = FxHashMap::default();
617        for column in &referenced_table_info.columns {
618            referenced_lookup.insert(column.name.to_ascii_lowercase(), column);
619        }
620
621        let mut referenced_field_ids = Vec::with_capacity(referenced_columns.len());
622        let mut referenced_column_defs = Vec::with_capacity(referenced_columns.len());
623        let mut referenced_column_names = Vec::with_capacity(referenced_columns.len());
624
625        for column_name in referenced_columns.iter() {
626            let normalized = column_name.to_ascii_lowercase();
627            if !seen_referenced.insert(normalized.clone()) {
628                return Err(Error::InvalidArgumentError(format!(
629                    "duplicate referenced column '{}' in FOREIGN KEY constraint",
630                    column_name
631                )));
632            }
633
634            let column = referenced_lookup.get(&normalized).ok_or_else(|| {
635                Error::InvalidArgumentError(format!(
636                    "unknown referenced column '{}' in table '{}'",
637                    column_name, referenced_table_info.display_name
638                ))
639            })?;
640
641            if !column.primary_key && !column.unique {
642                return Err(Error::InvalidArgumentError(format!(
643                    "FOREIGN KEY references column '{}' in table '{}' that is not UNIQUE or PRIMARY KEY",
644                    column_name, referenced_table_info.display_name
645                )));
646            }
647
648            referenced_field_ids.push(column.field_id);
649            referenced_column_defs.push((*column).clone());
650            referenced_column_names.push(column.name.clone());
651        }
652
653        for (child_col, parent_col) in referencing_column_defs
654            .iter()
655            .zip(referenced_column_defs.iter())
656        {
657            if child_col.data_type != parent_col.data_type {
658                return Err(Error::InvalidArgumentError(format!(
659                    "FOREIGN KEY column '{}' type {:?} does not match referenced column '{}' type {:?}",
660                    child_col.name, child_col.data_type, parent_col.name, parent_col.data_type
661                )));
662            }
663
664            // Nullable child referencing non-null parent is allowed; no additional action required.
665        }
666
667        results.push(ValidatedForeignKey {
668            name: spec.name.clone(),
669            referencing_indices,
670            referencing_field_ids,
671            referencing_column_names,
672            referenced_table_id: referenced_table_info.table_id,
673            referenced_table_display: referenced_table_info.display_name.clone(),
674            referenced_table_canonical: referenced_table_info.canonical_name.clone(),
675            referenced_field_ids,
676            referenced_column_names,
677            on_delete: map_plan_action(spec.on_delete.clone()),
678            on_update: map_plan_action(spec.on_update.clone()),
679        });
680    }
681
682    Ok(results)
683}
684
685fn map_plan_action(action: PlanForeignKeyAction) -> ForeignKeyAction {
686    match action {
687        PlanForeignKeyAction::NoAction => ForeignKeyAction::NoAction,
688        PlanForeignKeyAction::Restrict => ForeignKeyAction::Restrict,
689    }
690}
691
692// ============================================================================
693// Runtime constraint helpers
694// ============================================================================
695
696/// Ensure existing + incoming values remain unique for a single column.
697pub fn ensure_single_column_unique(
698    existing_values: &[PlanValue],
699    new_values: &[PlanValue],
700    column_name: &str,
701) -> LlkvResult<()> {
702    let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
703
704    for value in existing_values {
705        if let Some(key) = unique_key_component(value, column_name)?
706            && !seen.insert(key.clone())
707        {
708            return Err(Error::ConstraintError(format!(
709                "constraint violation on column '{}'",
710                column_name
711            )));
712        }
713    }
714
715    for value in new_values {
716        if let Some(key) = unique_key_component(value, column_name)?
717            && !seen.insert(key.clone())
718        {
719            return Err(Error::ConstraintError(format!(
720                "constraint violation on column '{}'",
721                column_name
722            )));
723        }
724    }
725
726    Ok(())
727}
728
729/// Ensure primary key values remain unique and non-null.
730pub fn ensure_primary_key(
731    existing_rows: &[Vec<PlanValue>],
732    new_rows: &[Vec<PlanValue>],
733    column_names: &[String],
734) -> LlkvResult<()> {
735    let pk_label = if column_names.len() == 1 {
736        "column"
737    } else {
738        "columns"
739    };
740    let pk_display = if column_names.len() == 1 {
741        column_names[0].clone()
742    } else {
743        column_names.join(", ")
744    };
745
746    let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
747
748    for row_values in existing_rows {
749        if row_values.len() != column_names.len() {
750            continue;
751        }
752
753        let key = build_composite_unique_key(row_values, column_names)?;
754        let key = key.ok_or_else(|| {
755            Error::ConstraintError(format!(
756                "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
757            ))
758        })?;
759
760        if !seen.insert(key.clone()) {
761            return Err(Error::ConstraintError(format!(
762                "Duplicate key violates primary key constraint on {pk_label} '{pk_display}' (PRIMARY KEY or UNIQUE constraint violation)"
763            )));
764        }
765    }
766
767    for row_values in new_rows {
768        if row_values.len() != column_names.len() {
769            continue;
770        }
771
772        let key = build_composite_unique_key(row_values, column_names)?;
773        let key = key.ok_or_else(|| {
774            Error::ConstraintError(format!(
775                "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
776            ))
777        })?;
778
779        if !seen.insert(key.clone()) {
780            return Err(Error::ConstraintError(format!(
781                "Duplicate key violates primary key constraint on {pk_label} '{pk_display}' (PRIMARY KEY or UNIQUE constraint violation)"
782            )));
783        }
784    }
785
786    Ok(())
787}
788
789/// Ensure that referencing rows satisfy the foreign key constraint by matching existing parent keys.
790pub fn validate_foreign_key_rows(
791    constraint_name: Option<&str>,
792    referencing_table: &str,
793    referenced_table: &str,
794    referenced_column_names: &[String],
795    parent_keys: &[Vec<PlanValue>],
796    candidate_keys: &[Vec<PlanValue>],
797) -> LlkvResult<()> {
798    if parent_keys.is_empty() {
799        // If there are no parent keys, every non-null candidate will fail.
800        for key in candidate_keys {
801            if key.iter().all(|value| !matches!(value, PlanValue::Null)) {
802                let constraint_label = constraint_name.unwrap_or("FOREIGN KEY");
803                let referenced_columns = if referenced_column_names.is_empty() {
804                    String::from("<unknown>")
805                } else {
806                    referenced_column_names.join(", ")
807                };
808                return Err(Error::ConstraintError(format!(
809                    "Violates foreign key constraint '{}' on table '{}' referencing '{}' (columns: {}) - does not exist in the referenced table",
810                    constraint_label, referencing_table, referenced_table, referenced_columns,
811                )));
812            }
813        }
814        return Ok(());
815    }
816
817    for key in candidate_keys {
818        if key.iter().any(|value| matches!(value, PlanValue::Null)) {
819            continue;
820        }
821
822        if parent_keys.iter().any(|existing| existing == key) {
823            continue;
824        }
825
826        let constraint_label = constraint_name.unwrap_or("FOREIGN KEY");
827        let referenced_columns = if referenced_column_names.is_empty() {
828            String::from("<unknown>")
829        } else {
830            referenced_column_names.join(", ")
831        };
832
833        return Err(Error::ConstraintError(format!(
834            "Violates foreign key constraint '{}' on table '{}' referencing '{}' (columns: {}) - does not exist in the referenced table",
835            constraint_label, referencing_table, referenced_table, referenced_columns,
836        )));
837    }
838
839    Ok(())
840}