llkv_table/constraints/
service.rs

//! Constraint service that centralises runtime-facing constraint validation helpers.
//! Covers foreign key enforcement for INSERT, UPDATE, and DELETE operations as well as
//! NOT NULL, CHECK, UNIQUE, and PRIMARY KEY validation of incoming rows, and will
//! gradually expand to cover additional constraint workflows.
4
5#![forbid(unsafe_code)]
6
7use super::types::{ConstraintId, ForeignKeyAction};
8use super::validation::validate_foreign_key_rows;
9use super::validation::{
10    ConstraintColumnInfo, UniqueKey, build_composite_unique_key, ensure_multi_column_unique,
11    ensure_primary_key, ensure_single_column_unique, validate_check_constraints,
12};
13use crate::catalog::TableCatalog;
14use crate::metadata::MetadataManager;
15use crate::types::{FieldId, RowId, TableId};
16use crate::view::ForeignKeyView;
17use croaring::Treemap;
18use llkv_plan::PlanValue;
19use llkv_result::{Error, Result as LlkvResult};
20use llkv_storage::pager::Pager;
21use rustc_hash::{FxHashMap, FxHashSet};
22use simd_r_drive_entry_handle::EntryHandle;
23use std::sync::{Arc, RwLock};
24
25type ForeignKeyConstraintCache = FxHashMap<ConstraintId, Arc<FxHashSet<UniqueKey>>>;
26type ForeignKeyCacheMap = FxHashMap<TableId, ForeignKeyConstraintCache>;
27type SharedForeignKeyCaches = Arc<RwLock<ForeignKeyCacheMap>>;
28
29/// Column metadata required to validate NOT NULL and CHECK constraints during inserts.
30#[derive(Clone, Debug)]
31pub struct InsertColumnConstraint {
32    pub schema_index: usize,
33    pub column: ConstraintColumnInfo,
34}
35
36/// Descriptor for a single-column UNIQUE constraint.
37#[derive(Clone, Debug)]
38pub struct InsertUniqueColumn {
39    pub schema_index: usize,
40    pub field_id: FieldId,
41    pub name: String,
42}
43
44/// Descriptor for composite UNIQUE or PRIMARY KEY constraints.
45#[derive(Clone, Debug)]
46pub struct InsertMultiColumnUnique {
47    pub schema_indices: Vec<usize>,
48    pub field_ids: Vec<FieldId>,
49    pub column_names: Vec<String>,
50}
51
52/// Callback payload describing what parent rows need to be fetched for validation.
53pub struct ForeignKeyRowFetch<'a> {
54    pub referenced_table_id: TableId,
55    pub referenced_table_canonical: &'a str,
56    pub referenced_field_ids: &'a [FieldId],
57}
58
59/// Context for collecting parent row values involved in a DELETE operation.
60pub struct ForeignKeyParentRowsFetch<'a> {
61    pub referenced_table_id: TableId,
62    pub referenced_row_ids: &'a Treemap,
63    pub referenced_field_ids: &'a [FieldId],
64}
65
66/// Context for fetching visible child rows that might reference deleted parents.
67pub struct ForeignKeyChildRowsFetch<'a> {
68    pub referencing_table_id: TableId,
69    pub referencing_table_canonical: &'a str,
70    pub referencing_field_ids: &'a [FieldId],
71}
72
73/// High-level constraint service API intended for runtime consumers.
74#[derive(Clone)]
75pub struct ConstraintService<P>
76where
77    P: Pager<Blob = EntryHandle> + Send + Sync,
78{
79    metadata: Arc<MetadataManager<P>>,
80    catalog: Arc<TableCatalog>,
81    fk_parent_caches: SharedForeignKeyCaches,
82}
83
84impl<P> ConstraintService<P>
85where
86    P: Pager<Blob = EntryHandle> + Send + Sync,
87{
88    /// Create a new constraint validation service.
89    pub fn new(metadata: Arc<MetadataManager<P>>, catalog: Arc<TableCatalog>) -> Self {
90        Self {
91            metadata,
92            catalog,
93            fk_parent_caches: Arc::new(RwLock::new(FxHashMap::default())),
94        }
95    }
96
97    /// Enable parent-key caching for a referencing table during foreign key validation.
98    pub fn enable_foreign_key_cache(&self, referencing_table_id: TableId) {
99        let mut caches = self
100            .fk_parent_caches
101            .write()
102            .expect("foreign key cache poisoned");
103        caches.entry(referencing_table_id).or_default();
104    }
105
106    /// Clear any cached parent keys associated with the provided referencing table.
107    pub fn clear_foreign_key_cache(&self, referencing_table_id: TableId) {
108        self.fk_parent_caches
109            .write()
110            .expect("foreign key cache poisoned")
111            .remove(&referencing_table_id);
112    }
113
114    /// Validate that incoming INSERT rows satisfy the table's foreign key constraints.
115    pub fn validate_insert_foreign_keys<F>(
116        &self,
117        referencing_table_id: TableId,
118        schema_field_ids: &[FieldId],
119        column_order: &[usize],
120        rows: &[Vec<PlanValue>],
121        mut fetch_parent_rows: F,
122    ) -> LlkvResult<()>
123    where
124        F: FnMut(ForeignKeyRowFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
125    {
126        if rows.is_empty() {
127            return Ok(());
128        }
129
130        let details = self
131            .metadata
132            .foreign_key_views(self.catalog.as_ref(), referencing_table_id)?;
133
134        if details.is_empty() {
135            return Ok(());
136        }
137
138        let caching_enabled = self.is_foreign_key_cache_enabled(referencing_table_id);
139        let mut parent_key_cache: FxHashMap<ConstraintId, Arc<FxHashSet<UniqueKey>>> =
140            FxHashMap::default();
141        let field_lookup = build_field_lookup(schema_field_ids);
142        let mut table_to_row_index: Vec<Option<usize>> = vec![None; schema_field_ids.len()];
143        for (row_pos, &schema_idx) in column_order.iter().enumerate() {
144            if let Some(slot) = table_to_row_index.get_mut(schema_idx) {
145                *slot = Some(row_pos);
146            }
147        }
148
149        for detail in &details {
150            if detail.referencing_field_ids.is_empty() {
151                continue;
152            }
153
154            let referencing_positions = referencing_row_positions(
155                detail,
156                &field_lookup,
157                &table_to_row_index,
158                referencing_table_id,
159            )?;
160
161            let parent_keys = self.resolve_parent_keys(
162                referencing_table_id,
163                detail,
164                caching_enabled,
165                &mut parent_key_cache,
166                &mut fetch_parent_rows,
167            )?;
168
169            let candidate_keys = candidate_child_keys(detail, &referencing_positions, rows)?;
170
171            validate_foreign_key_rows(
172                detail.constraint_name.as_deref(),
173                &detail.referencing_table_display,
174                &detail.referenced_table_display,
175                &detail.referenced_column_names,
176                parent_keys.as_ref(),
177                &candidate_keys,
178            )?;
179        }
180
181        Ok(())
182    }
183
184    /// Validate INSERT rows against all table constraints including primary keys, unique constraints,
185    /// and CHECK expressions. This is a comprehensive validation that combines uniqueness checks
186    /// (both single-column and multi-column) with row-level CHECK constraint evaluation.
187    #[allow(clippy::too_many_arguments)]
188    pub fn validate_insert_constraints<FSingle, FMulti>(
189        &self,
190        schema_field_ids: &[FieldId],
191        column_constraints: &[InsertColumnConstraint],
192        unique_columns: &[InsertUniqueColumn],
193        multi_column_uniques: &[InsertMultiColumnUnique],
194        primary_key: Option<&InsertMultiColumnUnique>,
195        column_order: &[usize],
196        rows: &[Vec<PlanValue>],
197        mut fetch_column_values: FSingle,
198        mut fetch_multi_column_rows: FMulti,
199    ) -> LlkvResult<()>
200    where
201        FSingle: FnMut(FieldId) -> LlkvResult<Vec<PlanValue>>,
202        FMulti: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
203    {
204        if rows.is_empty() {
205            return Ok(());
206        }
207
208        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
209        validate_row_constraints_with_mapping(
210            column_constraints,
211            rows,
212            &schema_to_row_index,
213            column_order,
214        )?;
215
216        for unique in unique_columns {
217            let Some(row_pos) = schema_to_row_index
218                .get(unique.schema_index)
219                .and_then(|opt| *opt)
220            else {
221                continue;
222            };
223
224            let existing_values = fetch_column_values(unique.field_id)?;
225            let mut new_values: Vec<PlanValue> = Vec::with_capacity(rows.len());
226            for row in rows {
227                let value = row.get(row_pos).cloned().unwrap_or(PlanValue::Null);
228                new_values.push(value);
229            }
230
231            ensure_single_column_unique(&existing_values, &new_values, &unique.name)?;
232        }
233
234        for constraint in multi_column_uniques {
235            if constraint.schema_indices.is_empty() {
236                continue;
237            }
238
239            let existing_rows = fetch_multi_column_rows(&constraint.field_ids)?;
240            let existing_keys =
241                rows_to_unique_keys(existing_rows, &constraint.column_names, NullKeyMode::Skip)?;
242            let new_keys = collect_unique_keys_from_rows(
243                rows,
244                &schema_to_row_index,
245                &constraint.schema_indices,
246                &constraint.column_names,
247                NullKeyMode::Skip,
248            )?;
249            ensure_multi_column_unique(&existing_keys, &new_keys, &constraint.column_names)?;
250        }
251
252        if let Some(pk) = primary_key
253            && !pk.schema_indices.is_empty()
254        {
255            let (pk_label, pk_display) = primary_key_context(&pk.column_names);
256            let existing_rows = fetch_multi_column_rows(&pk.field_ids)?;
257            let existing_keys = rows_to_unique_keys(
258                existing_rows,
259                &pk.column_names,
260                NullKeyMode::PrimaryKey {
261                    label: pk_label,
262                    display: &pk_display,
263                },
264            )?;
265            let new_keys = collect_unique_keys_from_rows(
266                rows,
267                &schema_to_row_index,
268                &pk.schema_indices,
269                &pk.column_names,
270                NullKeyMode::PrimaryKey {
271                    label: pk_label,
272                    display: &pk_display,
273                },
274            )?;
275            ensure_primary_key(&existing_keys, &new_keys, &pk.column_names)?;
276        }
277
278        Ok(())
279    }
280
281    /// Validate rows against CHECK constraints. This method evaluates CHECK expressions
282    /// for each row, ensuring they satisfy the table's row-level constraint rules.
283    pub fn validate_row_level_constraints(
284        &self,
285        schema_field_ids: &[FieldId],
286        column_constraints: &[InsertColumnConstraint],
287        column_order: &[usize],
288        rows: &[Vec<PlanValue>],
289    ) -> LlkvResult<()> {
290        if rows.is_empty() {
291            return Ok(());
292        }
293
294        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
295        validate_row_constraints_with_mapping(
296            column_constraints,
297            rows,
298            &schema_to_row_index,
299            column_order,
300        )
301    }
302
303    /// Validate that INSERT rows satisfy the primary key constraint by checking for duplicates
304    /// against both existing rows in the table and within the new batch.
305    pub fn validate_primary_key_rows<F>(
306        &self,
307        schema_field_ids: &[FieldId],
308        primary_key: &InsertMultiColumnUnique,
309        column_order: &[usize],
310        rows: &[Vec<PlanValue>],
311        mut fetch_multi_column_rows: F,
312    ) -> LlkvResult<()>
313    where
314        F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
315    {
316        if rows.is_empty() || primary_key.schema_indices.is_empty() {
317            return Ok(());
318        }
319
320        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
321        let (pk_label, pk_display) = primary_key_context(&primary_key.column_names);
322        let existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
323        let existing_keys = rows_to_unique_keys(
324            existing_rows,
325            &primary_key.column_names,
326            NullKeyMode::PrimaryKey {
327                label: pk_label,
328                display: &pk_display,
329            },
330        )?;
331        let new_keys = collect_unique_keys_from_rows(
332            rows,
333            &schema_to_row_index,
334            &primary_key.schema_indices,
335            &primary_key.column_names,
336            NullKeyMode::PrimaryKey {
337                label: pk_label,
338                display: &pk_display,
339            },
340        )?;
341        ensure_primary_key(&existing_keys, &new_keys, &primary_key.column_names)
342    }
343
344    /// Validate UPDATE operations that modify primary key columns. Ensures that updated
345    /// primary key values don't conflict with existing rows (excluding the original row being updated).
346    pub fn validate_update_primary_keys<F>(
347        &self,
348        schema_field_ids: &[FieldId],
349        primary_key: &InsertMultiColumnUnique,
350        column_order: &[usize],
351        rows: &[Vec<PlanValue>],
352        original_keys: &[Option<UniqueKey>],
353        mut fetch_multi_column_rows: F,
354    ) -> LlkvResult<()>
355    where
356        F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
357    {
358        if rows.is_empty() || primary_key.schema_indices.is_empty() {
359            return Ok(());
360        }
361
362        if original_keys.len() != rows.len() {
363            return Err(Error::Internal(
364                "primary key original value count does not match row count".into(),
365            ));
366        }
367
368        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
369
370        let (pk_label, pk_display) = primary_key_context(&primary_key.column_names);
371        let existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
372        let existing_key_vec = rows_to_unique_keys(
373            existing_rows,
374            &primary_key.column_names,
375            NullKeyMode::PrimaryKey {
376                label: pk_label,
377                display: &pk_display,
378            },
379        )?;
380        let mut existing_keys: FxHashSet<UniqueKey> = existing_key_vec.into_iter().collect();
381
382        for key in original_keys.iter().flatten() {
383            existing_keys.remove(key);
384        }
385
386        let mut new_seen: FxHashSet<UniqueKey> = FxHashSet::default();
387        let new_keys = collect_unique_keys_from_rows(
388            rows,
389            &schema_to_row_index,
390            &primary_key.schema_indices,
391            &primary_key.column_names,
392            NullKeyMode::PrimaryKey {
393                label: pk_label,
394                display: &pk_display,
395            },
396        )?;
397
398        for key in new_keys {
399            if existing_keys.contains(&key) {
400                return Err(Error::ConstraintError(format!(
401                    "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
402                    pk_display
403                )));
404            }
405
406            if !new_seen.insert(key.clone()) {
407                return Err(Error::ConstraintError(format!(
408                    "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
409                    pk_display
410                )));
411            }
412
413            existing_keys.insert(key);
414        }
415
416        Ok(())
417    }
418
419    /// Validate that deleting the given rows will not violate foreign key constraints.
420    pub fn validate_delete_foreign_keys<FParents, FChildren>(
421        &self,
422        referenced_table_id: TableId,
423        referenced_row_ids: &Treemap,
424        mut fetch_parent_rows: FParents,
425        mut fetch_child_rows: FChildren,
426    ) -> LlkvResult<()>
427    where
428        FParents: FnMut(ForeignKeyParentRowsFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
429        FChildren: FnMut(ForeignKeyChildRowsFetch<'_>) -> LlkvResult<Vec<(RowId, Vec<PlanValue>)>>,
430    {
431        if referenced_row_ids.is_empty() {
432            return Ok(());
433        }
434
435        let referencing = self
436            .metadata
437            .foreign_keys_referencing(referenced_table_id)?;
438        if referencing.is_empty() {
439            return Ok(());
440        }
441
442        for (child_table_id, constraint_id) in referencing {
443            let details = self
444                .metadata
445                .foreign_key_views(self.catalog.as_ref(), child_table_id)?;
446
447            let Some(detail) = details
448                .into_iter()
449                .find(|detail| detail.constraint_id == constraint_id)
450            else {
451                continue;
452            };
453
454            if detail.referenced_field_ids.is_empty() || detail.referencing_field_ids.is_empty() {
455                continue;
456            }
457
458            let parent_rows = fetch_parent_rows(ForeignKeyParentRowsFetch {
459                referenced_table_id,
460                referenced_row_ids,
461                referenced_field_ids: &detail.referenced_field_ids,
462            })?;
463
464            let parent_keys = canonical_parent_keys(&detail, parent_rows)?;
465            if parent_keys.is_empty() {
466                continue;
467            }
468
469            let child_rows = fetch_child_rows(ForeignKeyChildRowsFetch {
470                referencing_table_id: detail.referencing_table_id,
471                referencing_table_canonical: &detail.referencing_table_canonical,
472                referencing_field_ids: &detail.referencing_field_ids,
473            })?;
474
475            if child_rows.is_empty() {
476                continue;
477            }
478
479            for (child_row_id, values) in child_rows {
480                if values.len() != detail.referencing_field_ids.len() {
481                    continue;
482                }
483
484                let child_key =
485                    build_composite_unique_key(&values, &detail.referencing_column_names)?;
486                let Some(child_key) = child_key else { continue };
487
488                if !parent_keys.contains(&child_key) {
489                    continue;
490                }
491
492                if detail.referencing_table_id == detail.referenced_table_id
493                    && referenced_row_ids.contains(child_row_id)
494                {
495                    continue;
496                }
497
498                let constraint_label = detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
499                match detail.on_delete {
500                    ForeignKeyAction::NoAction | ForeignKeyAction::Restrict => {
501                        return Err(Error::ConstraintError(format!(
502                            "Violates foreign key constraint '{}' on table '{}' referencing '{}' - row is still referenced by a foreign key in a different table",
503                            constraint_label,
504                            detail.referencing_table_display,
505                            detail.referenced_table_display,
506                        )));
507                    }
508                }
509            }
510        }
511
512        Ok(())
513    }
514
515    /// Validate that updating the given rows will not violate foreign key constraints.
516    ///
517    /// This checks if any columns being updated are referenced by foreign keys, and whether
518    /// the OLD values are still being referenced by child tables.
519    pub fn validate_update_foreign_keys<FParents, FChildren>(
520        &self,
521        referenced_table_id: TableId,
522        referenced_row_ids: &Treemap,
523        updated_field_ids: &[FieldId],
524        mut fetch_parent_rows: FParents,
525        mut fetch_child_rows: FChildren,
526    ) -> LlkvResult<()>
527    where
528        FParents: FnMut(ForeignKeyParentRowsFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
529        FChildren: FnMut(ForeignKeyChildRowsFetch<'_>) -> LlkvResult<Vec<(RowId, Vec<PlanValue>)>>,
530    {
531        if referenced_row_ids.is_empty() || updated_field_ids.is_empty() {
532            return Ok(());
533        }
534
535        let referencing = self
536            .metadata
537            .foreign_keys_referencing(referenced_table_id)?;
538        if referencing.is_empty() {
539            return Ok(());
540        }
541
542        for (child_table_id, constraint_id) in referencing {
543            let details = self
544                .metadata
545                .foreign_key_views(self.catalog.as_ref(), child_table_id)?;
546
547            let Some(detail) = details
548                .into_iter()
549                .find(|detail| detail.constraint_id == constraint_id)
550            else {
551                continue;
552            };
553
554            if detail.referenced_field_ids.is_empty() || detail.referencing_field_ids.is_empty() {
555                continue;
556            }
557
558            // Check if any of the columns being updated are part of this foreign key
559            let is_referenced_column_updated = detail
560                .referenced_field_ids
561                .iter()
562                .any(|fid| updated_field_ids.contains(fid));
563
564            if !is_referenced_column_updated {
565                // This FK doesn't reference any columns being updated, skip
566                continue;
567            }
568
569            // Fetch the OLD values from the parent table (before update)
570            let parent_rows = fetch_parent_rows(ForeignKeyParentRowsFetch {
571                referenced_table_id,
572                referenced_row_ids,
573                referenced_field_ids: &detail.referenced_field_ids,
574            })?;
575
576            let parent_keys = canonical_parent_keys(&detail, parent_rows)?;
577            if parent_keys.is_empty() {
578                continue;
579            }
580
581            // Fetch all rows from child table that reference this parent
582            let child_rows = fetch_child_rows(ForeignKeyChildRowsFetch {
583                referencing_table_id: detail.referencing_table_id,
584                referencing_table_canonical: &detail.referencing_table_canonical,
585                referencing_field_ids: &detail.referencing_field_ids,
586            })?;
587
588            if child_rows.is_empty() {
589                continue;
590            }
591
592            // Check if any child rows reference the OLD values
593            for (_child_row_id, values) in child_rows {
594                if values.len() != detail.referencing_field_ids.len() {
595                    continue;
596                }
597
598                let child_key =
599                    build_composite_unique_key(&values, &detail.referencing_column_names)?;
600                let Some(child_key) = child_key else { continue };
601
602                // If a child row references one of the parent keys being updated, fail
603                if parent_keys.contains(&child_key) {
604                    let constraint_label =
605                        detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
606                    return Err(Error::ConstraintError(format!(
607                        "Violates foreign key constraint '{}' on table '{}' referencing '{}' - cannot update referenced column while foreign key exists",
608                        constraint_label,
609                        detail.referencing_table_display,
610                        detail.referenced_table_display,
611                    )));
612                }
613            }
614        }
615
616        Ok(())
617    }
618
619    /// Return the set of foreign keys referencing the provided table.
620    pub fn referencing_foreign_keys(
621        &self,
622        referenced_table_id: TableId,
623    ) -> LlkvResult<Vec<ForeignKeyView>> {
624        let referencing = self
625            .metadata
626            .foreign_keys_referencing(referenced_table_id)?;
627
628        if referencing.is_empty() {
629            return Ok(Vec::new());
630        }
631
632        let mut details_out = Vec::new();
633        for (child_table_id, constraint_id) in referencing {
634            let details = match self
635                .metadata
636                .foreign_key_views(self.catalog.as_ref(), child_table_id)
637            {
638                Ok(details) => details,
639                Err(Error::InvalidArgumentError(_)) | Err(Error::CatalogError(_)) => continue,
640                Err(err) => return Err(err),
641            };
642
643            if let Some(detail) = details
644                .into_iter()
645                .find(|detail| detail.constraint_id == constraint_id)
646            {
647                details_out.push(detail);
648            }
649        }
650
651        Ok(details_out)
652    }
653
654    fn is_foreign_key_cache_enabled(&self, table_id: TableId) -> bool {
655        self.fk_parent_caches
656            .read()
657            .expect("foreign key cache poisoned")
658            .contains_key(&table_id)
659    }
660
661    fn cached_parent_keys(
662        &self,
663        table_id: TableId,
664        constraint_id: ConstraintId,
665    ) -> Option<Arc<FxHashSet<UniqueKey>>> {
666        self.fk_parent_caches
667            .read()
668            .expect("foreign key cache poisoned")
669            .get(&table_id)
670            .and_then(|cache| cache.get(&constraint_id).map(Arc::clone))
671    }
672
673    fn store_cached_parent_keys(
674        &self,
675        table_id: TableId,
676        constraint_id: ConstraintId,
677        keys: FxHashSet<UniqueKey>,
678    ) -> Arc<FxHashSet<UniqueKey>> {
679        let mut caches = self
680            .fk_parent_caches
681            .write()
682            .expect("foreign key cache poisoned");
683        let Some(entry) = caches.get_mut(&table_id) else {
684            return Arc::new(keys);
685        };
686
687        let arc_keys = Arc::new(keys);
688        entry.insert(constraint_id, Arc::clone(&arc_keys));
689        arc_keys
690    }
691
692    fn resolve_parent_keys<F>(
693        &self,
694        referencing_table_id: TableId,
695        detail: &ForeignKeyView,
696        caching_enabled: bool,
697        local_cache: &mut FxHashMap<ConstraintId, Arc<FxHashSet<UniqueKey>>>,
698        fetch_parent_rows: &mut F,
699    ) -> LlkvResult<Arc<FxHashSet<UniqueKey>>>
700    where
701        F: FnMut(ForeignKeyRowFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
702    {
703        if caching_enabled {
704            if let Some(keys) = self.cached_parent_keys(referencing_table_id, detail.constraint_id)
705            {
706                return Ok(keys);
707            }
708        } else if let Some(keys) = local_cache.get(&detail.constraint_id) {
709            return Ok(Arc::clone(keys));
710        }
711
712        let parent_rows = fetch_parent_rows(ForeignKeyRowFetch {
713            referenced_table_id: detail.referenced_table_id,
714            referenced_table_canonical: &detail.referenced_table_canonical,
715            referenced_field_ids: &detail.referenced_field_ids,
716        })?;
717
718        let key_set = canonical_parent_keys(detail, parent_rows)?;
719
720        if caching_enabled {
721            Ok(self.store_cached_parent_keys(referencing_table_id, detail.constraint_id, key_set))
722        } else {
723            let arc_keys = Arc::new(key_set);
724            local_cache.insert(detail.constraint_id, Arc::clone(&arc_keys));
725            Ok(arc_keys)
726        }
727    }
728}
729
730fn build_field_lookup(schema_field_ids: &[FieldId]) -> FxHashMap<FieldId, usize> {
731    let mut lookup = FxHashMap::default();
732    for (idx, field_id) in schema_field_ids.iter().copied().enumerate() {
733        lookup.insert(field_id, idx);
734    }
735    lookup
736}
737
738fn validate_row_constraints_with_mapping(
739    column_constraints: &[InsertColumnConstraint],
740    rows: &[Vec<PlanValue>],
741    schema_to_row_index: &[Option<usize>],
742    column_order: &[usize],
743) -> LlkvResult<()> {
744    for constraint in column_constraints {
745        if constraint.column.nullable {
746            continue;
747        }
748
749        let Some(row_pos) = schema_to_row_index
750            .get(constraint.schema_index)
751            .and_then(|opt| *opt)
752        else {
753            return Err(Error::ConstraintError(format!(
754                "NOT NULL column '{}' missing from INSERT/UPDATE",
755                constraint.column.name
756            )));
757        };
758
759        for row in rows {
760            if matches!(row.get(row_pos), Some(PlanValue::Null)) {
761                return Err(Error::ConstraintError(format!(
762                    "NOT NULL constraint failed for column '{}'",
763                    constraint.column.name
764                )));
765            }
766        }
767    }
768
769    let check_columns: Vec<ConstraintColumnInfo> = column_constraints
770        .iter()
771        .map(|constraint| constraint.column.clone())
772        .collect();
773    validate_check_constraints(check_columns.as_slice(), rows, column_order)?;
774    Ok(())
775}
776
777fn build_schema_to_row_index(
778    schema_len: usize,
779    column_order: &[usize],
780) -> LlkvResult<Vec<Option<usize>>> {
781    let mut schema_to_row_index: Vec<Option<usize>> = vec![None; schema_len];
782    for (row_pos, &schema_idx) in column_order.iter().enumerate() {
783        if schema_idx >= schema_len {
784            return Err(Error::Internal(format!(
785                "column index {} out of bounds for schema (len={})",
786                schema_idx, schema_len
787            )));
788        }
789        schema_to_row_index[schema_idx] = Some(row_pos);
790    }
791    Ok(schema_to_row_index)
792}
793
/// Produce the noun (`"column"` / `"columns"`) and the display text used in primary key
/// error messages.
///
/// A single name is shown as is; any other number of names is joined with `", "`.
fn primary_key_context(column_names: &[String]) -> (&'static str, String) {
    match column_names {
        [only] => ("column", only.clone()),
        _ => ("columns", column_names.join(", ")),
    }
}
801
/// What to do with a row for which no unique key can be built.
#[derive(Copy, Clone)]
enum NullKeyMode<'a> {
    /// Leave the row out of the key list.
    Skip,
    /// Fail with a primary key NOT NULL error that mentions the given columns.
    PrimaryKey {
        /// Noun used in the message: `"column"` or `"columns"`.
        label: &'static str,
        /// Column name(s) shown in the message.
        display: &'a str,
    },
}
810
811fn rows_to_unique_keys<'a>(
812    rows: Vec<Vec<PlanValue>>,
813    column_names: &[String],
814    mode: NullKeyMode<'a>,
815) -> LlkvResult<Vec<UniqueKey>> {
816    let mut keys = Vec::with_capacity(rows.len());
817    for values in rows {
818        push_unique_key(&mut keys, &values, column_names, mode)?;
819    }
820    Ok(keys)
821}
822
823fn collect_unique_keys_from_rows<'a>(
824    rows: &[Vec<PlanValue>],
825    schema_to_row_index: &[Option<usize>],
826    schema_indices: &[usize],
827    column_names: &[String],
828    mode: NullKeyMode<'a>,
829) -> LlkvResult<Vec<UniqueKey>> {
830    if schema_indices.is_empty() {
831        return Ok(Vec::new());
832    }
833
834    let mut keys = Vec::with_capacity(rows.len());
835    let mut buffer = Vec::with_capacity(schema_indices.len());
836    for row in rows {
837        buffer.clear();
838        for &schema_idx in schema_indices {
839            let value = schema_to_row_index
840                .get(schema_idx)
841                .and_then(|opt| *opt)
842                .and_then(|row_pos| row.get(row_pos).cloned())
843                .unwrap_or(PlanValue::Null);
844            buffer.push(value);
845        }
846
847        push_unique_key(&mut keys, &buffer, column_names, mode)?;
848    }
849
850    Ok(keys)
851}
852
853fn push_unique_key<'a>(
854    keys: &mut Vec<UniqueKey>,
855    values: &[PlanValue],
856    column_names: &[String],
857    mode: NullKeyMode<'a>,
858) -> LlkvResult<()> {
859    match build_composite_unique_key(values, column_names)? {
860        Some(key) => {
861            keys.push(key);
862            Ok(())
863        }
864        None => match mode {
865            NullKeyMode::Skip => Ok(()),
866            NullKeyMode::PrimaryKey { label, display } => Err(Error::ConstraintError(format!(
867                "constraint failed: NOT NULL constraint failed for PRIMARY KEY {label} '{display}'"
868            ))),
869        },
870    }
871}
872
873fn referencing_row_positions(
874    detail: &ForeignKeyView,
875    lookup: &FxHashMap<FieldId, usize>,
876    table_to_row_index: &[Option<usize>],
877    table_id: TableId,
878) -> LlkvResult<Vec<usize>> {
879    let mut positions = Vec::with_capacity(detail.referencing_field_ids.len());
880
881    for (idx, field_id) in detail.referencing_field_ids.iter().cloned().enumerate() {
882        let schema_index = lookup.get(&field_id).cloned().ok_or_else(|| {
883            Error::Internal(format!(
884                "referencing field id {} not found in table '{}' (table_id={})",
885                field_id, detail.referencing_table_display, table_id
886            ))
887        })?;
888
889        let position = table_to_row_index
890            .get(schema_index)
891            .and_then(|value| *value)
892            .ok_or_else(|| {
893                let column_name = detail
894                    .referencing_column_names
895                    .get(idx)
896                    .cloned()
897                    .unwrap_or_else(|| schema_index.to_string());
898                Error::InvalidArgumentError(format!(
899                    "FOREIGN KEY column '{}' missing from INSERT statement",
900                    column_name
901                ))
902            })?;
903
904        positions.push(position);
905    }
906
907    Ok(positions)
908}
909
910fn canonical_parent_keys(
911    detail: &ForeignKeyView,
912    parent_rows: Vec<Vec<PlanValue>>,
913) -> LlkvResult<FxHashSet<UniqueKey>> {
914    let mut keys = FxHashSet::default();
915    for values in parent_rows {
916        if values.len() != detail.referenced_field_ids.len() {
917            continue;
918        }
919
920        let key = build_composite_unique_key(&values, &detail.referenced_column_names)?;
921        if let Some(key) = key {
922            keys.insert(key);
923        }
924    }
925
926    Ok(keys)
927}
928
929fn candidate_child_keys(
930    detail: &ForeignKeyView,
931    positions: &[usize],
932    rows: &[Vec<PlanValue>],
933) -> LlkvResult<Vec<UniqueKey>> {
934    let mut keys = Vec::new();
935
936    for row in rows {
937        let mut values: Vec<PlanValue> = Vec::with_capacity(positions.len());
938
939        for &row_pos in positions {
940            let value = row.get(row_pos).cloned().ok_or_else(|| {
941                Error::InvalidArgumentError("INSERT row is missing a required column value".into())
942            })?;
943            values.push(value);
944        }
945
946        let key = build_composite_unique_key(&values, &detail.referencing_column_names)?;
947        if let Some(key) = key {
948            keys.push(key);
949        }
950    }
951
952    Ok(keys)
953}