llkv_table/constraints/
service.rs

1//! Constraint service that centralises runtime-facing constraint validation helpers.
2//! Currently focuses on foreign key enforcement for INSERT operations and will
3//! gradually expand to cover additional constraint workflows.
4
5#![forbid(unsafe_code)]
6
7use super::types::ForeignKeyAction;
8use super::validation::validate_foreign_key_rows;
9use super::validation::{
10    ConstraintColumnInfo, UniqueKey, build_composite_unique_key, ensure_multi_column_unique,
11    ensure_primary_key, ensure_single_column_unique, validate_check_constraints,
12};
13use crate::catalog::TableCatalog;
14use crate::metadata::MetadataManager;
15use crate::types::{FieldId, RowId, TableId};
16use crate::view::ForeignKeyView;
17use llkv_plan::PlanValue;
18use llkv_result::{Error, Result as LlkvResult};
19use llkv_storage::pager::Pager;
20use rustc_hash::{FxHashMap, FxHashSet};
21use simd_r_drive_entry_handle::EntryHandle;
22use std::sync::Arc;
23
/// Column metadata required to validate NOT NULL and CHECK constraints during inserts.
#[derive(Clone, Debug)]
pub struct InsertColumnConstraint {
    /// Index of the column within the table schema. It is used to look up the
    /// column's position inside each incoming row.
    pub schema_index: usize,
    /// Constraint description for the column. Its `nullable` flag and `name`
    /// drive the NOT NULL check, and the whole value is forwarded to CHECK
    /// constraint validation.
    pub column: ConstraintColumnInfo,
}
30
/// Descriptor for a single-column UNIQUE constraint.
#[derive(Clone, Debug)]
pub struct InsertUniqueColumn {
    /// Index of the column within the table schema; resolved to a row position
    /// before values are extracted from incoming rows.
    pub schema_index: usize,
    /// Field identifier handed to the caller-supplied callback that fetches
    /// the column's existing values.
    pub field_id: FieldId,
    /// Column name passed to the uniqueness check (presumably for error
    /// reporting; confirm against `ensure_single_column_unique`).
    pub name: String,
}
38
/// Descriptor for composite UNIQUE or PRIMARY KEY constraints.
///
/// NOTE(review): the three vectors appear to be parallel (one entry per
/// constrained column, in the same order); confirm against the code that
/// builds this descriptor.
#[derive(Clone, Debug)]
pub struct InsertMultiColumnUnique {
    /// Schema indices of the constrained columns. An empty list makes the
    /// validators skip the constraint entirely.
    pub schema_indices: Vec<usize>,
    /// Field identifiers handed to the caller-supplied callback that fetches
    /// the existing rows for these columns.
    pub field_ids: Vec<FieldId>,
    /// Column names forwarded to the uniqueness helpers and used when
    /// formatting primary key violation messages.
    pub column_names: Vec<String>,
}
46
/// Callback payload describing what parent rows need to be fetched for validation.
///
/// Built by `ConstraintService::validate_insert_foreign_keys` once per foreign
/// key and handed to the caller-supplied fetch callback.
pub struct ForeignKeyRowFetch<'a> {
    /// Identifier of the referenced (parent) table.
    pub referenced_table_id: TableId,
    /// Canonical name of the referenced table, as recorded in the foreign key view.
    pub referenced_table_canonical: &'a str,
    /// Field identifiers of the referenced columns whose values must be returned.
    pub referenced_field_ids: &'a [FieldId],
}
53
/// Context for collecting parent row values involved in a DELETE or UPDATE operation.
///
/// Handed to the caller-supplied parent fetch callback by both
/// `validate_delete_foreign_keys` and `validate_update_foreign_keys`.
pub struct ForeignKeyParentRowsFetch<'a> {
    /// Identifier of the referenced (parent) table being modified.
    pub referenced_table_id: TableId,
    /// Row ids of the parent rows being deleted or updated.
    pub referenced_row_ids: &'a [RowId],
    /// Field identifiers of the referenced columns whose values must be returned.
    pub referenced_field_ids: &'a [FieldId],
}
60
/// Context for fetching visible child rows that might reference deleted or updated parents.
///
/// Handed to the caller-supplied child fetch callback by both
/// `validate_delete_foreign_keys` and `validate_update_foreign_keys`.
pub struct ForeignKeyChildRowsFetch<'a> {
    /// Identifier of the referencing (child) table.
    pub referencing_table_id: TableId,
    /// Canonical name of the referencing table, as recorded in the foreign key view.
    pub referencing_table_canonical: &'a str,
    /// Field identifiers of the referencing columns whose values must be returned.
    pub referencing_field_ids: &'a [FieldId],
}
67
/// High-level constraint service API intended for runtime consumers.
///
/// The service itself only resolves constraint metadata; all row data is
/// obtained through callbacks supplied to the individual validation methods.
#[derive(Clone)]
pub struct ConstraintService<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    /// Source of foreign key metadata (`foreign_key_views`,
    /// `foreign_keys_referencing`).
    metadata: Arc<MetadataManager<P>>,
    /// Table catalog passed to the metadata manager when building foreign key views.
    catalog: Arc<TableCatalog>,
}
77
78impl<P> ConstraintService<P>
79where
80    P: Pager<Blob = EntryHandle> + Send + Sync,
81{
    /// Create a new constraint validation service.
    ///
    /// Both handles are stored as-is; no metadata is read at construction time.
    pub fn new(metadata: Arc<MetadataManager<P>>, catalog: Arc<TableCatalog>) -> Self {
        Self { metadata, catalog }
    }
86
87    /// Validate that incoming INSERT rows satisfy the table's foreign key constraints.
88    pub fn validate_insert_foreign_keys<F>(
89        &self,
90        referencing_table_id: TableId,
91        schema_field_ids: &[FieldId],
92        column_order: &[usize],
93        rows: &[Vec<PlanValue>],
94        mut fetch_parent_rows: F,
95    ) -> LlkvResult<()>
96    where
97        F: FnMut(ForeignKeyRowFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
98    {
99        if rows.is_empty() {
100            return Ok(());
101        }
102
103        let details = self
104            .metadata
105            .foreign_key_views(self.catalog.as_ref(), referencing_table_id)?;
106
107        if details.is_empty() {
108            return Ok(());
109        }
110
111        let field_lookup = build_field_lookup(schema_field_ids);
112        let mut table_to_row_index: Vec<Option<usize>> = vec![None; schema_field_ids.len()];
113        for (row_pos, &schema_idx) in column_order.iter().enumerate() {
114            if let Some(slot) = table_to_row_index.get_mut(schema_idx) {
115                *slot = Some(row_pos);
116            }
117        }
118
119        for detail in &details {
120            if detail.referencing_field_ids.is_empty() {
121                continue;
122            }
123
124            let referencing_positions = referencing_row_positions(
125                detail,
126                &field_lookup,
127                &table_to_row_index,
128                referencing_table_id,
129            )?;
130
131            let parent_rows = fetch_parent_rows(ForeignKeyRowFetch {
132                referenced_table_id: detail.referenced_table_id,
133                referenced_table_canonical: &detail.referenced_table_canonical,
134                referenced_field_ids: &detail.referenced_field_ids,
135            })?;
136
137            let parent_keys = canonical_parent_keys(detail, parent_rows);
138            let candidate_keys = candidate_child_keys(&referencing_positions, rows)?;
139
140            validate_foreign_key_rows(
141                detail.constraint_name.as_deref(),
142                &detail.referencing_table_display,
143                &detail.referenced_table_display,
144                &detail.referenced_column_names,
145                &parent_keys,
146                &candidate_keys,
147            )?;
148        }
149
150        Ok(())
151    }
152
153    /// Validate INSERT rows against all table constraints including primary keys, unique constraints,
154    /// and CHECK expressions. This is a comprehensive validation that combines uniqueness checks
155    /// (both single-column and multi-column) with row-level CHECK constraint evaluation.
156    #[allow(clippy::too_many_arguments)]
157    pub fn validate_insert_constraints<FSingle, FMulti>(
158        &self,
159        schema_field_ids: &[FieldId],
160        column_constraints: &[InsertColumnConstraint],
161        unique_columns: &[InsertUniqueColumn],
162        multi_column_uniques: &[InsertMultiColumnUnique],
163        primary_key: Option<&InsertMultiColumnUnique>,
164        column_order: &[usize],
165        rows: &[Vec<PlanValue>],
166        mut fetch_column_values: FSingle,
167        mut fetch_multi_column_rows: FMulti,
168    ) -> LlkvResult<()>
169    where
170        FSingle: FnMut(FieldId) -> LlkvResult<Vec<PlanValue>>,
171        FMulti: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
172    {
173        if rows.is_empty() {
174            return Ok(());
175        }
176
177        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
178        validate_row_constraints_with_mapping(
179            column_constraints,
180            rows,
181            &schema_to_row_index,
182            column_order,
183        )?;
184
185        for unique in unique_columns {
186            let Some(row_pos) = schema_to_row_index
187                .get(unique.schema_index)
188                .and_then(|opt| *opt)
189            else {
190                continue;
191            };
192
193            let existing_values = fetch_column_values(unique.field_id)?;
194            let mut new_values: Vec<PlanValue> = Vec::with_capacity(rows.len());
195            for row in rows {
196                let value = row.get(row_pos).cloned().unwrap_or(PlanValue::Null);
197                new_values.push(value);
198            }
199
200            ensure_single_column_unique(&existing_values, &new_values, &unique.name)?;
201        }
202
203        for constraint in multi_column_uniques {
204            if constraint.schema_indices.is_empty() {
205                continue;
206            }
207
208            let existing_rows = fetch_multi_column_rows(&constraint.field_ids)?;
209            let new_rows = collect_row_sets(rows, &schema_to_row_index, &constraint.schema_indices);
210            ensure_multi_column_unique(&existing_rows, &new_rows, &constraint.column_names)?;
211        }
212
213        if let Some(pk) = primary_key
214            && !pk.schema_indices.is_empty()
215        {
216            let existing_rows = fetch_multi_column_rows(&pk.field_ids)?;
217            let new_rows = collect_row_sets(rows, &schema_to_row_index, &pk.schema_indices);
218            ensure_primary_key(&existing_rows, &new_rows, &pk.column_names)?;
219        }
220
221        Ok(())
222    }
223
224    /// Validate rows against CHECK constraints. This method evaluates CHECK expressions
225    /// for each row, ensuring they satisfy the table's row-level constraint rules.
226    pub fn validate_row_level_constraints(
227        &self,
228        schema_field_ids: &[FieldId],
229        column_constraints: &[InsertColumnConstraint],
230        column_order: &[usize],
231        rows: &[Vec<PlanValue>],
232    ) -> LlkvResult<()> {
233        if rows.is_empty() {
234            return Ok(());
235        }
236
237        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
238        validate_row_constraints_with_mapping(
239            column_constraints,
240            rows,
241            &schema_to_row_index,
242            column_order,
243        )
244    }
245
246    /// Validate that INSERT rows satisfy the primary key constraint by checking for duplicates
247    /// against both existing rows in the table and within the new batch.
248    pub fn validate_primary_key_rows<F>(
249        &self,
250        schema_field_ids: &[FieldId],
251        primary_key: &InsertMultiColumnUnique,
252        column_order: &[usize],
253        rows: &[Vec<PlanValue>],
254        mut fetch_multi_column_rows: F,
255    ) -> LlkvResult<()>
256    where
257        F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
258    {
259        if rows.is_empty() || primary_key.schema_indices.is_empty() {
260            return Ok(());
261        }
262
263        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
264        let existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
265        let new_rows = collect_row_sets(rows, &schema_to_row_index, &primary_key.schema_indices);
266        ensure_primary_key(&existing_rows, &new_rows, &primary_key.column_names)
267    }
268
269    /// Validate UPDATE operations that modify primary key columns. Ensures that updated
270    /// primary key values don't conflict with existing rows (excluding the original row being updated).
271    pub fn validate_update_primary_keys<F>(
272        &self,
273        schema_field_ids: &[FieldId],
274        primary_key: &InsertMultiColumnUnique,
275        column_order: &[usize],
276        rows: &[Vec<PlanValue>],
277        original_keys: &[Option<UniqueKey>],
278        mut fetch_multi_column_rows: F,
279    ) -> LlkvResult<()>
280    where
281        F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
282    {
283        if rows.is_empty() || primary_key.schema_indices.is_empty() {
284            return Ok(());
285        }
286
287        if original_keys.len() != rows.len() {
288            return Err(Error::Internal(
289                "primary key original value count does not match row count".into(),
290            ));
291        }
292
293        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
294
295        let mut existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
296        let mut existing_keys: FxHashSet<UniqueKey> = FxHashSet::default();
297        for row_values in existing_rows.drain(..) {
298            if let Some(key) = build_composite_unique_key(&row_values, &primary_key.column_names)? {
299                existing_keys.insert(key);
300            }
301        }
302
303        for key in original_keys.iter().flatten() {
304            existing_keys.remove(key);
305        }
306
307        let (pk_label, pk_display) = primary_key_context(&primary_key.column_names);
308        let mut new_seen: FxHashSet<UniqueKey> = FxHashSet::default();
309        let new_row_sets =
310            collect_row_sets(rows, &schema_to_row_index, &primary_key.schema_indices);
311
312        for values in new_row_sets {
313            let key = build_composite_unique_key(&values, &primary_key.column_names)?;
314            let key = key.ok_or_else(|| {
315                Error::ConstraintError(format!(
316                    "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
317                ))
318            })?;
319
320            if existing_keys.contains(&key) {
321                return Err(Error::ConstraintError(format!(
322                    "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
323                    pk_display
324                )));
325            }
326
327            if !new_seen.insert(key.clone()) {
328                return Err(Error::ConstraintError(format!(
329                    "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
330                    pk_display
331                )));
332            }
333
334            existing_keys.insert(key);
335        }
336
337        Ok(())
338    }
339
340    /// Validate that deleting the given rows will not violate foreign key constraints.
341    pub fn validate_delete_foreign_keys<FParents, FChildren>(
342        &self,
343        referenced_table_id: TableId,
344        referenced_row_ids: &[RowId],
345        mut fetch_parent_rows: FParents,
346        mut fetch_child_rows: FChildren,
347    ) -> LlkvResult<()>
348    where
349        FParents: FnMut(ForeignKeyParentRowsFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
350        FChildren: FnMut(ForeignKeyChildRowsFetch<'_>) -> LlkvResult<Vec<(RowId, Vec<PlanValue>)>>,
351    {
352        if referenced_row_ids.is_empty() {
353            return Ok(());
354        }
355
356        let referencing = self
357            .metadata
358            .foreign_keys_referencing(referenced_table_id)?;
359        if referencing.is_empty() {
360            return Ok(());
361        }
362
363        let deleting_row_ids: FxHashSet<RowId> = referenced_row_ids.iter().copied().collect();
364
365        for (child_table_id, constraint_id) in referencing {
366            let details = self
367                .metadata
368                .foreign_key_views(self.catalog.as_ref(), child_table_id)?;
369
370            let Some(detail) = details
371                .into_iter()
372                .find(|detail| detail.constraint_id == constraint_id)
373            else {
374                continue;
375            };
376
377            if detail.referenced_field_ids.is_empty() || detail.referencing_field_ids.is_empty() {
378                continue;
379            }
380
381            let parent_rows = fetch_parent_rows(ForeignKeyParentRowsFetch {
382                referenced_table_id,
383                referenced_row_ids,
384                referenced_field_ids: &detail.referenced_field_ids,
385            })?;
386
387            let parent_keys = canonical_parent_keys(&detail, parent_rows);
388            if parent_keys.is_empty() {
389                continue;
390            }
391
392            let child_rows = fetch_child_rows(ForeignKeyChildRowsFetch {
393                referencing_table_id: detail.referencing_table_id,
394                referencing_table_canonical: &detail.referencing_table_canonical,
395                referencing_field_ids: &detail.referencing_field_ids,
396            })?;
397
398            if child_rows.is_empty() {
399                continue;
400            }
401
402            for (child_row_id, values) in child_rows {
403                if values.len() != detail.referencing_field_ids.len() {
404                    continue;
405                }
406
407                if values.iter().any(|value| matches!(value, PlanValue::Null)) {
408                    continue;
409                }
410
411                if parent_keys.iter().all(|key| key != &values) {
412                    continue;
413                }
414
415                if detail.referencing_table_id == detail.referenced_table_id
416                    && deleting_row_ids.contains(&child_row_id)
417                {
418                    continue;
419                }
420
421                let constraint_label = detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
422                match detail.on_delete {
423                    ForeignKeyAction::NoAction | ForeignKeyAction::Restrict => {
424                        return Err(Error::ConstraintError(format!(
425                            "Violates foreign key constraint '{}' on table '{}' referencing '{}' - row is still referenced by a foreign key in a different table",
426                            constraint_label,
427                            detail.referencing_table_display,
428                            detail.referenced_table_display,
429                        )));
430                    }
431                }
432            }
433        }
434
435        Ok(())
436    }
437
438    /// Validate that updating the given rows will not violate foreign key constraints.
439    ///
440    /// This checks if any columns being updated are referenced by foreign keys, and whether
441    /// the OLD values are still being referenced by child tables.
442    pub fn validate_update_foreign_keys<FParents, FChildren>(
443        &self,
444        referenced_table_id: TableId,
445        referenced_row_ids: &[RowId],
446        updated_field_ids: &[FieldId],
447        mut fetch_parent_rows: FParents,
448        mut fetch_child_rows: FChildren,
449    ) -> LlkvResult<()>
450    where
451        FParents: FnMut(ForeignKeyParentRowsFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
452        FChildren: FnMut(ForeignKeyChildRowsFetch<'_>) -> LlkvResult<Vec<(RowId, Vec<PlanValue>)>>,
453    {
454        if referenced_row_ids.is_empty() || updated_field_ids.is_empty() {
455            return Ok(());
456        }
457
458        let referencing = self
459            .metadata
460            .foreign_keys_referencing(referenced_table_id)?;
461        if referencing.is_empty() {
462            return Ok(());
463        }
464
465        for (child_table_id, constraint_id) in referencing {
466            let details = self
467                .metadata
468                .foreign_key_views(self.catalog.as_ref(), child_table_id)?;
469
470            let Some(detail) = details
471                .into_iter()
472                .find(|detail| detail.constraint_id == constraint_id)
473            else {
474                continue;
475            };
476
477            if detail.referenced_field_ids.is_empty() || detail.referencing_field_ids.is_empty() {
478                continue;
479            }
480
481            // Check if any of the columns being updated are part of this foreign key
482            let is_referenced_column_updated = detail
483                .referenced_field_ids
484                .iter()
485                .any(|fid| updated_field_ids.contains(fid));
486
487            if !is_referenced_column_updated {
488                // This FK doesn't reference any columns being updated, skip
489                continue;
490            }
491
492            // Fetch the OLD values from the parent table (before update)
493            let parent_rows = fetch_parent_rows(ForeignKeyParentRowsFetch {
494                referenced_table_id,
495                referenced_row_ids,
496                referenced_field_ids: &detail.referenced_field_ids,
497            })?;
498
499            let parent_keys = canonical_parent_keys(&detail, parent_rows);
500            if parent_keys.is_empty() {
501                continue;
502            }
503
504            // Fetch all rows from child table that reference this parent
505            let child_rows = fetch_child_rows(ForeignKeyChildRowsFetch {
506                referencing_table_id: detail.referencing_table_id,
507                referencing_table_canonical: &detail.referencing_table_canonical,
508                referencing_field_ids: &detail.referencing_field_ids,
509            })?;
510
511            if child_rows.is_empty() {
512                continue;
513            }
514
515            // Check if any child rows reference the OLD values
516            for (_child_row_id, values) in child_rows {
517                if values.len() != detail.referencing_field_ids.len() {
518                    continue;
519                }
520
521                if values.iter().any(|value| matches!(value, PlanValue::Null)) {
522                    continue;
523                }
524
525                // If a child row references one of the parent keys being updated, fail
526                if parent_keys.iter().any(|key| key == &values) {
527                    let constraint_label =
528                        detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
529                    return Err(Error::ConstraintError(format!(
530                        "Violates foreign key constraint '{}' on table '{}' referencing '{}' - cannot update referenced column while foreign key exists",
531                        constraint_label,
532                        detail.referencing_table_display,
533                        detail.referenced_table_display,
534                    )));
535                }
536            }
537        }
538
539        Ok(())
540    }
541
542    /// Return the set of foreign keys referencing the provided table.
543    pub fn referencing_foreign_keys(
544        &self,
545        referenced_table_id: TableId,
546    ) -> LlkvResult<Vec<ForeignKeyView>> {
547        let referencing = self
548            .metadata
549            .foreign_keys_referencing(referenced_table_id)?;
550
551        if referencing.is_empty() {
552            return Ok(Vec::new());
553        }
554
555        let mut details_out = Vec::new();
556        for (child_table_id, constraint_id) in referencing {
557            let details = match self
558                .metadata
559                .foreign_key_views(self.catalog.as_ref(), child_table_id)
560            {
561                Ok(details) => details,
562                Err(Error::InvalidArgumentError(_)) | Err(Error::CatalogError(_)) => continue,
563                Err(err) => return Err(err),
564            };
565
566            if let Some(detail) = details
567                .into_iter()
568                .find(|detail| detail.constraint_id == constraint_id)
569            {
570                details_out.push(detail);
571            }
572        }
573
574        Ok(details_out)
575    }
576}
577
578fn build_field_lookup(schema_field_ids: &[FieldId]) -> FxHashMap<FieldId, usize> {
579    let mut lookup = FxHashMap::default();
580    for (idx, field_id) in schema_field_ids.iter().copied().enumerate() {
581        lookup.insert(field_id, idx);
582    }
583    lookup
584}
585
586fn validate_row_constraints_with_mapping(
587    column_constraints: &[InsertColumnConstraint],
588    rows: &[Vec<PlanValue>],
589    schema_to_row_index: &[Option<usize>],
590    column_order: &[usize],
591) -> LlkvResult<()> {
592    for constraint in column_constraints {
593        if constraint.column.nullable {
594            continue;
595        }
596
597        let Some(row_pos) = schema_to_row_index
598            .get(constraint.schema_index)
599            .and_then(|opt| *opt)
600        else {
601            return Err(Error::ConstraintError(format!(
602                "NOT NULL column '{}' missing from INSERT/UPDATE",
603                constraint.column.name
604            )));
605        };
606
607        for row in rows {
608            if matches!(row.get(row_pos), Some(PlanValue::Null)) {
609                return Err(Error::ConstraintError(format!(
610                    "NOT NULL constraint failed for column '{}'",
611                    constraint.column.name
612                )));
613            }
614        }
615    }
616
617    let check_columns: Vec<ConstraintColumnInfo> = column_constraints
618        .iter()
619        .map(|constraint| constraint.column.clone())
620        .collect();
621    validate_check_constraints(check_columns.as_slice(), rows, column_order)?;
622    Ok(())
623}
624
625fn build_schema_to_row_index(
626    schema_len: usize,
627    column_order: &[usize],
628) -> LlkvResult<Vec<Option<usize>>> {
629    let mut schema_to_row_index: Vec<Option<usize>> = vec![None; schema_len];
630    for (row_pos, &schema_idx) in column_order.iter().enumerate() {
631        if schema_idx >= schema_len {
632            return Err(Error::Internal(format!(
633                "column index {} out of bounds for schema (len={})",
634                schema_idx, schema_len
635            )));
636        }
637        schema_to_row_index[schema_idx] = Some(row_pos);
638    }
639    Ok(schema_to_row_index)
640}
641
/// Produce the noun (`"column"` / `"columns"`) and the display string used in
/// primary key error messages.
///
/// Exactly one name yields the singular form and that name; any other count
/// (including zero) yields the plural form and the names joined with `", "`.
fn primary_key_context(column_names: &[String]) -> (&'static str, String) {
    match column_names {
        [only] => ("column", only.clone()),
        several => ("columns", several.join(", ")),
    }
}
649
650fn collect_row_sets(
651    rows: &[Vec<PlanValue>],
652    schema_to_row_index: &[Option<usize>],
653    schema_indices: &[usize],
654) -> Vec<Vec<PlanValue>> {
655    rows.iter()
656        .map(|row| {
657            schema_indices
658                .iter()
659                .map(|&schema_idx| {
660                    schema_to_row_index
661                        .get(schema_idx)
662                        .and_then(|opt| {
663                            opt.map(|row_pos| row.get(row_pos).cloned().unwrap_or(PlanValue::Null))
664                        })
665                        .unwrap_or(PlanValue::Null)
666                })
667                .collect()
668        })
669        .collect()
670}
671
672fn referencing_row_positions(
673    detail: &ForeignKeyView,
674    lookup: &FxHashMap<FieldId, usize>,
675    table_to_row_index: &[Option<usize>],
676    table_id: TableId,
677) -> LlkvResult<Vec<usize>> {
678    let mut positions = Vec::with_capacity(detail.referencing_field_ids.len());
679
680    for (idx, field_id) in detail.referencing_field_ids.iter().cloned().enumerate() {
681        let schema_index = lookup.get(&field_id).cloned().ok_or_else(|| {
682            Error::Internal(format!(
683                "referencing field id {} not found in table '{}' (table_id={})",
684                field_id, detail.referencing_table_display, table_id
685            ))
686        })?;
687
688        let position = table_to_row_index
689            .get(schema_index)
690            .and_then(|value| *value)
691            .ok_or_else(|| {
692                let column_name = detail
693                    .referencing_column_names
694                    .get(idx)
695                    .cloned()
696                    .unwrap_or_else(|| schema_index.to_string());
697                Error::InvalidArgumentError(format!(
698                    "FOREIGN KEY column '{}' missing from INSERT statement",
699                    column_name
700                ))
701            })?;
702
703        positions.push(position);
704    }
705
706    Ok(positions)
707}
708
709fn canonical_parent_keys(
710    detail: &ForeignKeyView,
711    parent_rows: Vec<Vec<PlanValue>>,
712) -> Vec<Vec<PlanValue>> {
713    parent_rows
714        .into_iter()
715        .filter(|values| values.len() == detail.referenced_field_ids.len())
716        .filter(|values| !values.iter().any(|value| matches!(value, PlanValue::Null)))
717        .collect()
718}
719
720fn candidate_child_keys(
721    positions: &[usize],
722    rows: &[Vec<PlanValue>],
723) -> LlkvResult<Vec<Vec<PlanValue>>> {
724    let mut keys = Vec::new();
725
726    for row in rows {
727        let mut key: Vec<PlanValue> = Vec::with_capacity(positions.len());
728        let mut contains_null = false;
729
730        for &row_pos in positions {
731            let value = row.get(row_pos).cloned().ok_or_else(|| {
732                Error::InvalidArgumentError("INSERT row is missing a required column value".into())
733            })?;
734
735            if matches!(value, PlanValue::Null) {
736                contains_null = true;
737                break;
738            }
739
740            key.push(value);
741        }
742
743        if contains_null {
744            continue;
745        }
746
747        keys.push(key);
748    }
749
750    Ok(keys)
751}