1use arrow::datatypes::DataType;
2use llkv_plan::{ForeignKeyAction as PlanForeignKeyAction, ForeignKeySpec, PlanValue};
3use llkv_result::{Error, Result as LlkvResult};
4use rustc_hash::{FxHashMap, FxHashSet};
5use sqlparser::ast::{self, Expr as SqlExpr};
6use sqlparser::dialect::GenericDialect;
7use sqlparser::parser::Parser;
8
9use super::types::ForeignKeyAction;
10use crate::types::{FieldId, TableId};
11
/// Column metadata consumed by the constraint validators in this module.
#[derive(Clone, Debug)]
pub struct ConstraintColumnInfo {
    /// Column name. CHECK expressions resolve identifiers against its
    /// ASCII-lowercased form.
    pub name: String,
    /// Field identifier of the column.
    pub field_id: FieldId,
    /// Arrow data type of the column.
    pub data_type: DataType,
    /// Nullability flag. NOTE(review): not consulted by the validators visible
    /// in this file — confirm where it is enforced.
    pub nullable: bool,
    /// SQL text of the column's CHECK expression, if the column has one.
    pub check_expr: Option<String>,
}
21
/// Hashable, exactly comparable form of a value that takes part in a
/// uniqueness check.
#[derive(Hash, Eq, PartialEq, Debug, Clone)]
pub enum UniqueKey {
    /// Integer value.
    Int(i64),
    /// Float value stored as its raw bit pattern (see `f64::to_bits`), which
    /// makes it usable as a hash key. Two floats are the same key only when
    /// their bit patterns are identical.
    Float(u64),
    /// String value.
    Str(String),
    /// Ordered component keys of a multi-column constraint.
    Composite(Vec<UniqueKey>),
}
30
31pub fn validate_check_constraints(
33 columns: &[ConstraintColumnInfo],
34 rows: &[Vec<PlanValue>],
35 column_order: &[usize],
36) -> LlkvResult<()> {
37 if rows.is_empty() {
38 return Ok(());
39 }
40
41 let dialect = GenericDialect {};
42
43 let mut parsed_checks: Vec<(usize, String, String, SqlExpr)> = Vec::new();
44
45 for (idx, column) in columns.iter().enumerate() {
46 if let Some(expr_str) = &column.check_expr {
47 let expr = parse_check_expression(&dialect, expr_str)?;
48 parsed_checks.push((idx, column.name.clone(), expr_str.clone(), expr));
49 }
50 }
51
52 if parsed_checks.is_empty() {
53 return Ok(());
54 }
55
56 let mut name_lookup: FxHashMap<String, usize> = FxHashMap::default();
57 for (idx, column) in columns.iter().enumerate() {
58 name_lookup.insert(column.name.to_ascii_lowercase(), idx);
59 }
60
61 for row in rows {
62 for (_schema_idx, column_name, expr_str, expr) in &parsed_checks {
63 let result = evaluate_check_expression(expr, row, column_order, columns, &name_lookup)?;
64
65 if !result {
66 return Err(Error::ConstraintError(format!(
67 "CHECK constraint failed for column '{}': {}",
68 column_name, expr_str
69 )));
70 }
71 }
72 }
73
74 Ok(())
75}
76
77pub fn ensure_multi_column_unique(
79 existing_rows: &[Vec<PlanValue>],
80 new_rows: &[Vec<PlanValue>],
81 column_names: &[String],
82) -> LlkvResult<()> {
83 let mut existing_keys: FxHashSet<UniqueKey> = FxHashSet::default();
84 for values in existing_rows {
85 if let Some(key) = build_composite_unique_key(values, column_names)?
86 && !existing_keys.insert(key.clone())
87 {
88 return Err(Error::ConstraintError(format!(
89 "constraint violation on columns '{}'",
90 column_names.join(", ")
91 )));
92 }
93 }
94
95 let mut new_keys: FxHashSet<UniqueKey> = FxHashSet::default();
96 for values in new_rows {
97 if let Some(key) = build_composite_unique_key(values, column_names)?
98 && (existing_keys.contains(&key) || !new_keys.insert(key))
99 {
100 return Err(Error::ConstraintError(format!(
101 "constraint violation on columns '{}'",
102 column_names.join(", ")
103 )));
104 }
105 }
106
107 Ok(())
108}
109
110pub fn unique_key_component(value: &PlanValue, column_name: &str) -> LlkvResult<Option<UniqueKey>> {
112 match value {
113 PlanValue::Null => Ok(None),
114 PlanValue::Integer(v) => Ok(Some(UniqueKey::Int(*v))),
115 PlanValue::Float(v) => Ok(Some(UniqueKey::Float(v.to_bits()))),
116 PlanValue::String(s) => Ok(Some(UniqueKey::Str(s.clone()))),
117 PlanValue::Struct(_) => Err(Error::InvalidArgumentError(format!(
118 "UNIQUE index is not supported on struct column '{}'",
119 column_name
120 ))),
121 }
122}
123
124pub fn build_composite_unique_key(
126 values: &[PlanValue],
127 column_names: &[String],
128) -> LlkvResult<Option<UniqueKey>> {
129 if values.is_empty() {
130 return Ok(None);
131 }
132
133 let mut components = Vec::with_capacity(values.len());
134 for (value, column_name) in values.iter().zip(column_names) {
135 match unique_key_component(value, column_name)? {
136 Some(component) => components.push(component),
137 None => return Ok(None),
138 }
139 }
140
141 Ok(Some(UniqueKey::Composite(components)))
142}
143
144fn parse_check_expression(dialect: &GenericDialect, check_expr_str: &str) -> LlkvResult<SqlExpr> {
145 let sql = format!("SELECT {}", check_expr_str);
146 let mut ast = Parser::parse_sql(dialect, &sql).map_err(|e| {
147 Error::InvalidArgumentError(format!(
148 "Failed to parse CHECK expression '{}': {}",
149 check_expr_str, e
150 ))
151 })?;
152
153 let stmt = ast.pop().ok_or_else(|| {
154 Error::InvalidArgumentError(format!(
155 "CHECK expression '{}' resulted in empty AST",
156 check_expr_str
157 ))
158 })?;
159
160 let query = match stmt {
161 ast::Statement::Query(q) => q,
162 _ => {
163 return Err(Error::InvalidArgumentError(format!(
164 "CHECK expression '{}' did not parse as SELECT",
165 check_expr_str
166 )));
167 }
168 };
169
170 let body = match *query.body {
171 ast::SetExpr::Select(s) => s,
172 _ => {
173 return Err(Error::InvalidArgumentError(format!(
174 "CHECK expression '{}' is not a simple SELECT",
175 check_expr_str
176 )));
177 }
178 };
179
180 if body.projection.len() != 1 {
181 return Err(Error::InvalidArgumentError(format!(
182 "CHECK expression '{}' must have exactly one projection",
183 check_expr_str
184 )));
185 }
186
187 match &body.projection[0] {
188 ast::SelectItem::UnnamedExpr(expr) | ast::SelectItem::ExprWithAlias { expr, .. } => {
189 Ok(expr.clone())
190 }
191 _ => Err(Error::InvalidArgumentError(format!(
192 "CHECK expression '{}' projection is not a simple expression",
193 check_expr_str
194 ))),
195 }
196}
197
198fn evaluate_check_expression(
199 expr: &SqlExpr,
200 row: &[PlanValue],
201 column_order: &[usize],
202 columns: &[ConstraintColumnInfo],
203 name_lookup: &FxHashMap<String, usize>,
204) -> LlkvResult<bool> {
205 use sqlparser::ast::BinaryOperator;
206
207 match expr {
208 SqlExpr::BinaryOp { left, op, right } => {
209 let left_val =
210 evaluate_check_expr_value(left, row, column_order, columns, name_lookup)?;
211 let right_val =
212 evaluate_check_expr_value(right, row, column_order, columns, name_lookup)?;
213
214 match op {
215 BinaryOperator::Eq => {
216 if matches!(left_val, PlanValue::Null) || matches!(right_val, PlanValue::Null) {
217 Ok(true)
218 } else {
219 Ok(left_val == right_val)
220 }
221 }
222 BinaryOperator::NotEq => {
223 if matches!(left_val, PlanValue::Null) || matches!(right_val, PlanValue::Null) {
224 Ok(true)
225 } else {
226 Ok(left_val != right_val)
227 }
228 }
229 BinaryOperator::Lt => compare_numeric(&left_val, &right_val, |l, r| l < r),
230 BinaryOperator::LtEq => compare_numeric(&left_val, &right_val, |l, r| l <= r),
231 BinaryOperator::Gt => compare_numeric(&left_val, &right_val, |l, r| l > r),
232 BinaryOperator::GtEq => compare_numeric(&left_val, &right_val, |l, r| l >= r),
233 _ => Err(Error::InvalidArgumentError(format!(
234 "Unsupported operator in CHECK constraint: {:?}",
235 op
236 ))),
237 }
238 }
239 SqlExpr::IsNull(inner) => {
240 let value = evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)?;
241 Ok(matches!(value, PlanValue::Null))
242 }
243 SqlExpr::IsNotNull(inner) => {
244 let value = evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)?;
245 Ok(!matches!(value, PlanValue::Null))
246 }
247 SqlExpr::Nested(inner) => {
248 evaluate_check_expression(inner, row, column_order, columns, name_lookup)
249 }
250 _ => Err(Error::InvalidArgumentError(format!(
251 "Unsupported expression in CHECK constraint: {:?}",
252 expr
253 ))),
254 }
255}
256
257#[allow(clippy::only_used_in_recursion)]
258fn evaluate_check_expr_value(
259 expr: &SqlExpr,
260 row: &[PlanValue],
261 column_order: &[usize],
262 columns: &[ConstraintColumnInfo],
263 name_lookup: &FxHashMap<String, usize>,
264) -> LlkvResult<PlanValue> {
265 use sqlparser::ast::{BinaryOperator, Expr as SqlExpr};
266
267 match expr {
268 SqlExpr::BinaryOp { left, op, right } => {
269 let left_val =
270 evaluate_check_expr_value(left, row, column_order, columns, name_lookup)?;
271 let right_val =
272 evaluate_check_expr_value(right, row, column_order, columns, name_lookup)?;
273
274 match op {
275 BinaryOperator::Plus => apply_numeric_op(left_val, right_val, |l, r| l + r),
276 BinaryOperator::Minus => apply_numeric_op(left_val, right_val, |l, r| l - r),
277 BinaryOperator::Multiply => apply_numeric_op(left_val, right_val, |l, r| l * r),
278 BinaryOperator::Divide => divide_numeric(left_val, right_val),
279 _ => Err(Error::InvalidArgumentError(format!(
280 "Unsupported binary operator in CHECK constraint value expression: {:?}",
281 op
282 ))),
283 }
284 }
285 SqlExpr::Identifier(ident) => {
286 let column_idx = lookup_column_index(name_lookup, &ident.value)?;
287 extract_row_value(row, column_order, column_idx, &ident.value)
288 }
289 SqlExpr::CompoundIdentifier(idents) => {
290 if idents.len() == 2 {
291 let column_name = &idents[0].value;
292 let field_name = &idents[1].value;
293 let column_idx = lookup_column_index(name_lookup, column_name)?;
294 let value = extract_row_value(row, column_order, column_idx, column_name)?;
295 extract_struct_field(value, column_name, field_name)
296 } else if idents.len() == 3 {
297 let column_name = &idents[1].value;
298 let field_name = &idents[2].value;
299 let column_idx = lookup_column_index(name_lookup, column_name)?;
300 let value = extract_row_value(row, column_order, column_idx, column_name)?;
301 extract_struct_field(value, column_name, field_name)
302 } else {
303 Err(Error::InvalidArgumentError(format!(
304 "Unsupported compound identifier in CHECK constraint: {} parts",
305 idents.len()
306 )))
307 }
308 }
309 SqlExpr::Value(val_with_span) => match &val_with_span.value {
310 ast::Value::Number(n, _) => {
311 if let Ok(i) = n.parse::<i64>() {
312 Ok(PlanValue::Integer(i))
313 } else if let Ok(f) = n.parse::<f64>() {
314 Ok(PlanValue::Float(f))
315 } else {
316 Err(Error::InvalidArgumentError(format!(
317 "Invalid number in CHECK constraint: {}",
318 n
319 )))
320 }
321 }
322 ast::Value::SingleQuotedString(s) | ast::Value::DoubleQuotedString(s) => {
323 Ok(PlanValue::String(s.clone()))
324 }
325 ast::Value::Null => Ok(PlanValue::Null),
326 _ => Err(Error::InvalidArgumentError(format!(
327 "Unsupported value type in CHECK constraint: {:?}",
328 val_with_span.value
329 ))),
330 },
331 SqlExpr::Nested(inner) => {
332 evaluate_check_expr_value(inner, row, column_order, columns, name_lookup)
333 }
334 _ => Err(Error::InvalidArgumentError(format!(
335 "Unsupported expression type in CHECK constraint: {:?}",
336 expr
337 ))),
338 }
339}
340
341fn lookup_column_index(
342 name_lookup: &FxHashMap<String, usize>,
343 column_name: &str,
344) -> LlkvResult<usize> {
345 name_lookup
346 .get(&column_name.to_ascii_lowercase())
347 .copied()
348 .ok_or_else(|| {
349 Error::InvalidArgumentError(format!(
350 "Unknown column '{}' in CHECK constraint",
351 column_name
352 ))
353 })
354}
355
356fn extract_row_value(
357 row: &[PlanValue],
358 column_order: &[usize],
359 schema_idx: usize,
360 column_name: &str,
361) -> LlkvResult<PlanValue> {
362 let insert_pos = column_order
363 .iter()
364 .position(|&dest_idx| dest_idx == schema_idx)
365 .ok_or_else(|| {
366 Error::InvalidArgumentError(format!("Column '{}' not provided in INSERT", column_name))
367 })?;
368
369 Ok(row[insert_pos].clone())
370}
371
372fn extract_struct_field(
373 value: PlanValue,
374 column_name: &str,
375 field_name: &str,
376) -> LlkvResult<PlanValue> {
377 match value {
378 PlanValue::Struct(fields) => fields
379 .into_iter()
380 .find(|(name, _)| name.eq_ignore_ascii_case(field_name))
381 .map(|(_, val)| val)
382 .ok_or_else(|| {
383 Error::InvalidArgumentError(format!(
384 "Struct field '{}' not found in column '{}'",
385 field_name, column_name
386 ))
387 }),
388 _ => Err(Error::InvalidArgumentError(format!(
389 "Column '{}' is not a struct, cannot access field '{}'",
390 column_name, field_name
391 ))),
392 }
393}
394
395fn compare_numeric<F>(left: &PlanValue, right: &PlanValue, compare: F) -> LlkvResult<bool>
396where
397 F: Fn(f64, f64) -> bool,
398{
399 if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
400 return Ok(true);
403 }
404
405 match (left, right) {
406 (PlanValue::Integer(l), PlanValue::Integer(r)) => Ok(compare(*l as f64, *r as f64)),
407 (PlanValue::Float(l), PlanValue::Float(r)) => Ok(compare(*l, *r)),
408 (PlanValue::Integer(l), PlanValue::Float(r)) => Ok(compare(*l as f64, *r)),
409 (PlanValue::Float(l), PlanValue::Integer(r)) => Ok(compare(*l, *r as f64)),
410 _ => Err(Error::InvalidArgumentError(
411 "CHECK constraint comparison requires numeric values".into(),
412 )),
413 }
414}
415
416fn apply_numeric_op(
417 left: PlanValue,
418 right: PlanValue,
419 op: fn(f64, f64) -> f64,
420) -> LlkvResult<PlanValue> {
421 if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
422 return Ok(PlanValue::Null);
423 }
424
425 match (left, right) {
426 (PlanValue::Integer(l), PlanValue::Integer(r)) => {
427 let result = op(l as f64, r as f64);
428 if result.fract() == 0.0 {
429 Ok(PlanValue::Integer(result as i64))
430 } else {
431 Ok(PlanValue::Float(result))
432 }
433 }
434 (PlanValue::Float(l), PlanValue::Float(r)) => Ok(PlanValue::Float(op(l, r))),
435 (PlanValue::Integer(l), PlanValue::Float(r)) => Ok(PlanValue::Float(op(l as f64, r))),
436 (PlanValue::Float(l), PlanValue::Integer(r)) => Ok(PlanValue::Float(op(l, r as f64))),
437 _ => Err(Error::InvalidArgumentError(
438 "CHECK constraint arithmetic requires numeric values".into(),
439 )),
440 }
441}
442
443fn divide_numeric(left: PlanValue, right: PlanValue) -> LlkvResult<PlanValue> {
444 if matches!(left, PlanValue::Null) || matches!(right, PlanValue::Null) {
445 return Ok(PlanValue::Null);
446 }
447
448 match (left, right) {
449 (PlanValue::Integer(l), PlanValue::Integer(r)) => {
450 if r == 0 {
451 Err(Error::InvalidArgumentError(
452 "Division by zero in CHECK constraint".into(),
453 ))
454 } else {
455 Ok(PlanValue::Integer(l / r))
456 }
457 }
458 (PlanValue::Float(l), PlanValue::Float(r)) => {
459 if r == 0.0 {
460 Err(Error::InvalidArgumentError(
461 "Division by zero in CHECK constraint".into(),
462 ))
463 } else {
464 Ok(PlanValue::Float(l / r))
465 }
466 }
467 (PlanValue::Integer(l), PlanValue::Float(r)) => {
468 if r == 0.0 {
469 Err(Error::InvalidArgumentError(
470 "Division by zero in CHECK constraint".into(),
471 ))
472 } else {
473 Ok(PlanValue::Float(l as f64 / r))
474 }
475 }
476 (PlanValue::Float(l), PlanValue::Integer(r)) => {
477 if r == 0 {
478 Err(Error::InvalidArgumentError(
479 "Division by zero in CHECK constraint".into(),
480 ))
481 } else {
482 Ok(PlanValue::Float(l / r as f64))
483 }
484 }
485 _ => Err(Error::InvalidArgumentError(
486 "CHECK constraint / operator requires numeric values".into(),
487 )),
488 }
489}
490
/// Column description used while validating FOREIGN KEY definitions.
#[derive(Clone, Debug)]
pub struct ForeignKeyColumn {
    /// Column name; matched ASCII-case-insensitively during validation.
    pub name: String,
    /// Arrow data type; referencing and referenced columns must have equal
    /// types.
    pub data_type: DataType,
    /// Nullability flag. NOTE(review): not consulted by `validate_foreign_keys`
    /// — confirm where it is used.
    pub nullable: bool,
    /// Whether the column is part of the table's primary key. Primary-key
    /// columns are the default reference target when a FOREIGN KEY names no
    /// referenced columns.
    pub primary_key: bool,
    /// Whether the column is marked unique. A referenced column must be either
    /// `primary_key` or `unique`.
    pub unique: bool,
    /// Field identifier of the column.
    pub field_id: FieldId,
}
505
/// Table description used while validating FOREIGN KEY definitions.
#[derive(Clone, Debug)]
pub struct ForeignKeyTableInfo {
    /// Table name used in error messages.
    pub display_name: String,
    /// Canonical table name; copied into the validated foreign key.
    pub canonical_name: String,
    /// Identifier of the table.
    pub table_id: TableId,
    /// Columns of the table, in the order that referencing indices refer to.
    pub columns: Vec<ForeignKeyColumn>,
}
514
/// A FOREIGN KEY definition after resolution by `validate_foreign_keys`.
///
/// The `referencing_*` vectors are parallel to each other, as are the
/// `referenced_*` column vectors; position `i` of the referencing side pairs
/// with position `i` of the referenced side.
#[derive(Clone, Debug)]
pub struct ValidatedForeignKey {
    /// Constraint name from the specification, if one was given.
    pub name: Option<String>,
    /// Indices of the referencing columns within the referencing table's
    /// column list.
    pub referencing_indices: Vec<usize>,
    /// Field identifiers of the referencing columns.
    pub referencing_field_ids: Vec<FieldId>,
    /// Names of the referencing columns as declared on the referencing table.
    pub referencing_column_names: Vec<String>,
    /// Identifier of the referenced table.
    pub referenced_table_id: TableId,
    /// Display name of the referenced table.
    pub referenced_table_display: String,
    /// Canonical name of the referenced table.
    pub referenced_table_canonical: String,
    /// Field identifiers of the referenced columns.
    pub referenced_field_ids: Vec<FieldId>,
    /// Names of the referenced columns as declared on the referenced table.
    pub referenced_column_names: Vec<String>,
    /// Action mapped from the specification's `on_delete`.
    pub on_delete: ForeignKeyAction,
    /// Action mapped from the specification's `on_update`.
    pub on_update: ForeignKeyAction,
}
530
/// Resolves and validates FOREIGN KEY specifications for a table.
///
/// For every entry of `specs` the function
/// 1. resolves the referencing columns against `referencing_table`
///    (ASCII-case-insensitive, no duplicates),
/// 2. obtains the referenced table via `lookup_table(&spec.referenced_table)`,
/// 3. resolves the referenced columns, defaulting to the referenced table's
///    primary-key columns when the specification lists none,
/// 4. requires each referenced column to be PRIMARY KEY or UNIQUE, and
/// 5. requires paired columns to have identical data types.
///
/// Returns one [`ValidatedForeignKey`] per specification, in input order.
///
/// # Errors
///
/// Returns `Error::InvalidArgumentError` for the first violated rule; errors
/// from `lookup_table` are propagated unchanged.
pub fn validate_foreign_keys<F>(
    referencing_table: &ForeignKeyTableInfo,
    specs: &[ForeignKeySpec],
    mut lookup_table: F,
) -> LlkvResult<Vec<ValidatedForeignKey>>
where
    F: FnMut(&str) -> LlkvResult<ForeignKeyTableInfo>,
{
    if specs.is_empty() {
        return Ok(Vec::new());
    }

    // Lowercased column name -> (position in the table, column definition).
    let mut referencing_lookup: FxHashMap<String, (usize, &ForeignKeyColumn)> =
        FxHashMap::default();
    for (idx, column) in referencing_table.columns.iter().enumerate() {
        referencing_lookup.insert(column.name.to_ascii_lowercase(), (idx, column));
    }

    let mut results = Vec::with_capacity(specs.len());

    for spec in specs {
        if spec.columns.is_empty() {
            return Err(Error::InvalidArgumentError(
                "FOREIGN KEY requires at least one referencing column".into(),
            ));
        }

        let mut seen_referencing = FxHashSet::default();
        let mut referencing_indices = Vec::with_capacity(spec.columns.len());
        let mut referencing_field_ids = Vec::with_capacity(spec.columns.len());
        let mut referencing_column_defs = Vec::with_capacity(spec.columns.len());
        let mut referencing_column_names = Vec::with_capacity(spec.columns.len());

        // Resolve the referencing side; the duplicate check runs before the
        // existence check for each name.
        for column_name in &spec.columns {
            let normalized = column_name.to_ascii_lowercase();
            if !seen_referencing.insert(normalized.clone()) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate column '{}' in FOREIGN KEY constraint",
                    column_name
                )));
            }

            let (idx, column) = referencing_lookup.get(&normalized).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "unknown column '{}' in FOREIGN KEY constraint",
                    column_name
                ))
            })?;

            referencing_indices.push(*idx);
            referencing_field_ids.push(column.field_id);
            referencing_column_defs.push((*column).clone());
            // Store the name as declared on the table, not as written in the spec.
            referencing_column_names.push(column.name.clone());
        }

        let referenced_table_info = lookup_table(&spec.referenced_table)?;

        // With no explicit referenced columns, fall back to the referenced
        // table's primary-key columns.
        let referenced_columns = if spec.referenced_columns.is_empty() {
            referenced_table_info
                .columns
                .iter()
                .filter(|col| col.primary_key)
                .map(|col| col.name.clone())
                .collect::<Vec<_>>()
        } else {
            spec.referenced_columns.clone()
        };

        // Only reachable through the fallback above: the referenced table has
        // no primary-key columns.
        if referenced_columns.is_empty() {
            return Err(Error::InvalidArgumentError(format!(
                "there is no primary key for referenced table '{}'",
                spec.referenced_table
            )));
        }

        if spec.columns.len() != referenced_columns.len() {
            return Err(Error::InvalidArgumentError(format!(
                "number of referencing columns ({}) does not match number of referenced columns ({})",
                spec.columns.len(),
                referenced_columns.len()
            )));
        }

        let mut seen_referenced = FxHashSet::default();
        // Lowercased column name -> column definition of the referenced table.
        let mut referenced_lookup: FxHashMap<String, &ForeignKeyColumn> = FxHashMap::default();
        for column in &referenced_table_info.columns {
            referenced_lookup.insert(column.name.to_ascii_lowercase(), column);
        }

        let mut referenced_field_ids = Vec::with_capacity(referenced_columns.len());
        let mut referenced_column_defs = Vec::with_capacity(referenced_columns.len());
        let mut referenced_column_names = Vec::with_capacity(referenced_columns.len());

        // Resolve the referenced side: no duplicates, column must exist, and
        // it must be PRIMARY KEY or UNIQUE.
        for column_name in referenced_columns.iter() {
            let normalized = column_name.to_ascii_lowercase();
            if !seen_referenced.insert(normalized.clone()) {
                return Err(Error::InvalidArgumentError(format!(
                    "duplicate referenced column '{}' in FOREIGN KEY constraint",
                    column_name
                )));
            }

            let column = referenced_lookup.get(&normalized).ok_or_else(|| {
                Error::InvalidArgumentError(format!(
                    "unknown referenced column '{}' in table '{}'",
                    column_name, referenced_table_info.display_name
                ))
            })?;

            if !column.primary_key && !column.unique {
                return Err(Error::InvalidArgumentError(format!(
                    "FOREIGN KEY references column '{}' in table '{}' that is not UNIQUE or PRIMARY KEY",
                    column_name, referenced_table_info.display_name
                )));
            }

            referenced_field_ids.push(column.field_id);
            referenced_column_defs.push((*column).clone());
            referenced_column_names.push(column.name.clone());
        }

        // Pair referencing and referenced columns positionally (the lengths
        // were verified equal above) and require identical data types.
        for (child_col, parent_col) in referencing_column_defs
            .iter()
            .zip(referenced_column_defs.iter())
        {
            if child_col.data_type != parent_col.data_type {
                return Err(Error::InvalidArgumentError(format!(
                    "FOREIGN KEY column '{}' type {:?} does not match referenced column '{}' type {:?}",
                    child_col.name, child_col.data_type, parent_col.name, parent_col.data_type
                )));
            }

            // Only the data types are compared; nullability and the other
            // column flags are not checked here.
        }

        results.push(ValidatedForeignKey {
            name: spec.name.clone(),
            referencing_indices,
            referencing_field_ids,
            referencing_column_names,
            referenced_table_id: referenced_table_info.table_id,
            referenced_table_display: referenced_table_info.display_name.clone(),
            referenced_table_canonical: referenced_table_info.canonical_name.clone(),
            referenced_field_ids,
            referenced_column_names,
            on_delete: map_plan_action(spec.on_delete.clone()),
            on_update: map_plan_action(spec.on_update.clone()),
        });
    }

    Ok(results)
}
684
685fn map_plan_action(action: PlanForeignKeyAction) -> ForeignKeyAction {
686 match action {
687 PlanForeignKeyAction::NoAction => ForeignKeyAction::NoAction,
688 PlanForeignKeyAction::Restrict => ForeignKeyAction::Restrict,
689 }
690}
691
692pub fn ensure_single_column_unique(
698 existing_values: &[PlanValue],
699 new_values: &[PlanValue],
700 column_name: &str,
701) -> LlkvResult<()> {
702 let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
703
704 for value in existing_values {
705 if let Some(key) = unique_key_component(value, column_name)?
706 && !seen.insert(key.clone())
707 {
708 return Err(Error::ConstraintError(format!(
709 "constraint violation on column '{}'",
710 column_name
711 )));
712 }
713 }
714
715 for value in new_values {
716 if let Some(key) = unique_key_component(value, column_name)?
717 && !seen.insert(key.clone())
718 {
719 return Err(Error::ConstraintError(format!(
720 "constraint violation on column '{}'",
721 column_name
722 )));
723 }
724 }
725
726 Ok(())
727}
728
729pub fn ensure_primary_key(
731 existing_rows: &[Vec<PlanValue>],
732 new_rows: &[Vec<PlanValue>],
733 column_names: &[String],
734) -> LlkvResult<()> {
735 let pk_label = if column_names.len() == 1 {
736 "column"
737 } else {
738 "columns"
739 };
740 let pk_display = if column_names.len() == 1 {
741 column_names[0].clone()
742 } else {
743 column_names.join(", ")
744 };
745
746 let mut seen: FxHashSet<UniqueKey> = FxHashSet::default();
747
748 for row_values in existing_rows {
749 if row_values.len() != column_names.len() {
750 continue;
751 }
752
753 let key = build_composite_unique_key(row_values, column_names)?;
754 let key = key.ok_or_else(|| {
755 Error::ConstraintError(format!(
756 "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
757 ))
758 })?;
759
760 if !seen.insert(key.clone()) {
761 return Err(Error::ConstraintError(format!(
762 "Duplicate key violates primary key constraint on {pk_label} '{pk_display}' (PRIMARY KEY or UNIQUE constraint violation)"
763 )));
764 }
765 }
766
767 for row_values in new_rows {
768 if row_values.len() != column_names.len() {
769 continue;
770 }
771
772 let key = build_composite_unique_key(row_values, column_names)?;
773 let key = key.ok_or_else(|| {
774 Error::ConstraintError(format!(
775 "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
776 ))
777 })?;
778
779 if !seen.insert(key.clone()) {
780 return Err(Error::ConstraintError(format!(
781 "Duplicate key violates primary key constraint on {pk_label} '{pk_display}' (PRIMARY KEY or UNIQUE constraint violation)"
782 )));
783 }
784 }
785
786 Ok(())
787}
788
789pub fn validate_foreign_key_rows(
791 constraint_name: Option<&str>,
792 referencing_table: &str,
793 referenced_table: &str,
794 referenced_column_names: &[String],
795 parent_keys: &[Vec<PlanValue>],
796 candidate_keys: &[Vec<PlanValue>],
797) -> LlkvResult<()> {
798 if parent_keys.is_empty() {
799 for key in candidate_keys {
801 if key.iter().all(|value| !matches!(value, PlanValue::Null)) {
802 let constraint_label = constraint_name.unwrap_or("FOREIGN KEY");
803 let referenced_columns = if referenced_column_names.is_empty() {
804 String::from("<unknown>")
805 } else {
806 referenced_column_names.join(", ")
807 };
808 return Err(Error::ConstraintError(format!(
809 "Violates foreign key constraint '{}' on table '{}' referencing '{}' (columns: {}) - does not exist in the referenced table",
810 constraint_label, referencing_table, referenced_table, referenced_columns,
811 )));
812 }
813 }
814 return Ok(());
815 }
816
817 for key in candidate_keys {
818 if key.iter().any(|value| matches!(value, PlanValue::Null)) {
819 continue;
820 }
821
822 if parent_keys.iter().any(|existing| existing == key) {
823 continue;
824 }
825
826 let constraint_label = constraint_name.unwrap_or("FOREIGN KEY");
827 let referenced_columns = if referenced_column_names.is_empty() {
828 String::from("<unknown>")
829 } else {
830 referenced_column_names.join(", ")
831 };
832
833 return Err(Error::ConstraintError(format!(
834 "Violates foreign key constraint '{}' on table '{}' referencing '{}' (columns: {}) - does not exist in the referenced table",
835 constraint_label, referencing_table, referenced_table, referenced_columns,
836 )));
837 }
838
839 Ok(())
840}