llkv_table/constraints/
service.rs

1//! Constraint service that centralises runtime-facing constraint validation helpers.
2//! Currently focuses on foreign key enforcement for INSERT operations and will
3//! gradually expand to cover additional constraint workflows.
4
5#![forbid(unsafe_code)]
6
7use super::types::ForeignKeyAction;
8use super::validation::validate_foreign_key_rows;
9use super::validation::{
10    ConstraintColumnInfo, UniqueKey, build_composite_unique_key, ensure_multi_column_unique,
11    ensure_primary_key, ensure_single_column_unique, validate_check_constraints,
12};
13use crate::catalog::TableCatalog;
14use crate::metadata::MetadataManager;
15use crate::types::{FieldId, RowId, TableId};
16use crate::view::ForeignKeyView;
17use llkv_plan::PlanValue;
18use llkv_result::{Error, Result as LlkvResult};
19use llkv_storage::pager::Pager;
20use rustc_hash::{FxHashMap, FxHashSet};
21use simd_r_drive_entry_handle::EntryHandle;
22use std::sync::Arc;
23
24/// Column metadata required to validate NOT NULL and CHECK constraints during inserts.
25#[derive(Clone, Debug)]
26pub struct InsertColumnConstraint {
27    pub schema_index: usize,
28    pub column: ConstraintColumnInfo,
29}
30
31/// Descriptor for a single-column UNIQUE constraint.
32#[derive(Clone, Debug)]
33pub struct InsertUniqueColumn {
34    pub schema_index: usize,
35    pub field_id: FieldId,
36    pub name: String,
37}
38
39/// Descriptor for composite UNIQUE or PRIMARY KEY constraints.
40#[derive(Clone, Debug)]
41pub struct InsertMultiColumnUnique {
42    pub schema_indices: Vec<usize>,
43    pub field_ids: Vec<FieldId>,
44    pub column_names: Vec<String>,
45}
46
47/// Callback payload describing what parent rows need to be fetched for validation.
48pub struct ForeignKeyRowFetch<'a> {
49    pub referenced_table_id: TableId,
50    pub referenced_table_canonical: &'a str,
51    pub referenced_field_ids: &'a [FieldId],
52}
53
54/// Context for collecting parent row values involved in a DELETE operation.
55pub struct ForeignKeyParentRowsFetch<'a> {
56    pub referenced_table_id: TableId,
57    pub referenced_row_ids: &'a [RowId],
58    pub referenced_field_ids: &'a [FieldId],
59}
60
61/// Context for fetching visible child rows that might reference deleted parents.
62pub struct ForeignKeyChildRowsFetch<'a> {
63    pub referencing_table_id: TableId,
64    pub referencing_table_canonical: &'a str,
65    pub referencing_field_ids: &'a [FieldId],
66}
67
68/// High-level constraint service API intended for runtime consumers.
69#[derive(Clone)]
70pub struct ConstraintService<P>
71where
72    P: Pager<Blob = EntryHandle> + Send + Sync,
73{
74    metadata: Arc<MetadataManager<P>>,
75    catalog: Arc<TableCatalog>,
76}
77
78impl<P> ConstraintService<P>
79where
80    P: Pager<Blob = EntryHandle> + Send + Sync,
81{
82    /// Create a new constraint validation service.
83    pub fn new(metadata: Arc<MetadataManager<P>>, catalog: Arc<TableCatalog>) -> Self {
84        Self { metadata, catalog }
85    }
86
87    /// Validate that incoming INSERT rows satisfy the table's foreign key constraints.
88    pub fn validate_insert_foreign_keys<F>(
89        &self,
90        referencing_table_id: TableId,
91        schema_field_ids: &[FieldId],
92        column_order: &[usize],
93        rows: &[Vec<PlanValue>],
94        mut fetch_parent_rows: F,
95    ) -> LlkvResult<()>
96    where
97        F: FnMut(ForeignKeyRowFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
98    {
99        if rows.is_empty() {
100            return Ok(());
101        }
102
103        let details = self
104            .metadata
105            .foreign_key_views(self.catalog.as_ref(), referencing_table_id)?;
106
107        if details.is_empty() {
108            return Ok(());
109        }
110
111        let field_lookup = build_field_lookup(schema_field_ids);
112        let mut table_to_row_index: Vec<Option<usize>> = vec![None; schema_field_ids.len()];
113        for (row_pos, &schema_idx) in column_order.iter().enumerate() {
114            if let Some(slot) = table_to_row_index.get_mut(schema_idx) {
115                *slot = Some(row_pos);
116            }
117        }
118
119        for detail in &details {
120            if detail.referencing_field_ids.is_empty() {
121                continue;
122            }
123
124            let referencing_positions = referencing_row_positions(
125                detail,
126                &field_lookup,
127                &table_to_row_index,
128                referencing_table_id,
129            )?;
130
131            let parent_rows = fetch_parent_rows(ForeignKeyRowFetch {
132                referenced_table_id: detail.referenced_table_id,
133                referenced_table_canonical: &detail.referenced_table_canonical,
134                referenced_field_ids: &detail.referenced_field_ids,
135            })?;
136
137            let parent_keys = canonical_parent_keys(detail, parent_rows);
138            let candidate_keys = candidate_child_keys(&referencing_positions, rows)?;
139
140            validate_foreign_key_rows(
141                detail.constraint_name.as_deref(),
142                &detail.referencing_table_display,
143                &detail.referenced_table_display,
144                &detail.referenced_column_names,
145                &parent_keys,
146                &candidate_keys,
147            )?;
148        }
149
150        Ok(())
151    }
152
153    /// Validate INSERT rows against all table constraints including primary keys, unique constraints,
154    /// and CHECK expressions. This is a comprehensive validation that combines uniqueness checks
155    /// (both single-column and multi-column) with row-level CHECK constraint evaluation.
156    #[allow(clippy::too_many_arguments)]
157    pub fn validate_insert_constraints<FSingle, FMulti>(
158        &self,
159        schema_field_ids: &[FieldId],
160        column_constraints: &[InsertColumnConstraint],
161        unique_columns: &[InsertUniqueColumn],
162        multi_column_uniques: &[InsertMultiColumnUnique],
163        primary_key: Option<&InsertMultiColumnUnique>,
164        column_order: &[usize],
165        rows: &[Vec<PlanValue>],
166        mut fetch_column_values: FSingle,
167        mut fetch_multi_column_rows: FMulti,
168    ) -> LlkvResult<()>
169    where
170        FSingle: FnMut(FieldId) -> LlkvResult<Vec<PlanValue>>,
171        FMulti: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
172    {
173        if rows.is_empty() {
174            return Ok(());
175        }
176
177        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
178        validate_row_constraints_with_mapping(
179            column_constraints,
180            rows,
181            &schema_to_row_index,
182            column_order,
183        )?;
184
185        for unique in unique_columns {
186            let Some(row_pos) = schema_to_row_index
187                .get(unique.schema_index)
188                .and_then(|opt| *opt)
189            else {
190                continue;
191            };
192
193            let existing_values = fetch_column_values(unique.field_id)?;
194            let mut new_values: Vec<PlanValue> = Vec::with_capacity(rows.len());
195            for row in rows {
196                let value = row.get(row_pos).cloned().unwrap_or(PlanValue::Null);
197                new_values.push(value);
198            }
199
200            ensure_single_column_unique(&existing_values, &new_values, &unique.name)?;
201        }
202
203        for constraint in multi_column_uniques {
204            if constraint.schema_indices.is_empty() {
205                continue;
206            }
207
208            let existing_rows = fetch_multi_column_rows(&constraint.field_ids)?;
209            let new_rows = collect_row_sets(rows, &schema_to_row_index, &constraint.schema_indices);
210            ensure_multi_column_unique(&existing_rows, &new_rows, &constraint.column_names)?;
211        }
212
213        if let Some(pk) = primary_key
214            && !pk.schema_indices.is_empty()
215        {
216            let existing_rows = fetch_multi_column_rows(&pk.field_ids)?;
217            let new_rows = collect_row_sets(rows, &schema_to_row_index, &pk.schema_indices);
218            ensure_primary_key(&existing_rows, &new_rows, &pk.column_names)?;
219        }
220
221        Ok(())
222    }
223
224    /// Validate rows against CHECK constraints. This method evaluates CHECK expressions
225    /// for each row, ensuring they satisfy the table's row-level constraint rules.
226    pub fn validate_row_level_constraints(
227        &self,
228        schema_field_ids: &[FieldId],
229        column_constraints: &[InsertColumnConstraint],
230        column_order: &[usize],
231        rows: &[Vec<PlanValue>],
232    ) -> LlkvResult<()> {
233        if rows.is_empty() {
234            return Ok(());
235        }
236
237        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
238        validate_row_constraints_with_mapping(
239            column_constraints,
240            rows,
241            &schema_to_row_index,
242            column_order,
243        )
244    }
245
246    /// Validate that INSERT rows satisfy the primary key constraint by checking for duplicates
247    /// against both existing rows in the table and within the new batch.
248    pub fn validate_primary_key_rows<F>(
249        &self,
250        schema_field_ids: &[FieldId],
251        primary_key: &InsertMultiColumnUnique,
252        column_order: &[usize],
253        rows: &[Vec<PlanValue>],
254        mut fetch_multi_column_rows: F,
255    ) -> LlkvResult<()>
256    where
257        F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
258    {
259        if rows.is_empty() || primary_key.schema_indices.is_empty() {
260            return Ok(());
261        }
262
263        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
264        let existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
265        let new_rows = collect_row_sets(rows, &schema_to_row_index, &primary_key.schema_indices);
266        ensure_primary_key(&existing_rows, &new_rows, &primary_key.column_names)
267    }
268
269    /// Validate UPDATE operations that modify primary key columns. Ensures that updated
270    /// primary key values don't conflict with existing rows (excluding the original row being updated).
271    pub fn validate_update_primary_keys<F>(
272        &self,
273        schema_field_ids: &[FieldId],
274        primary_key: &InsertMultiColumnUnique,
275        column_order: &[usize],
276        rows: &[Vec<PlanValue>],
277        original_keys: &[Option<UniqueKey>],
278        mut fetch_multi_column_rows: F,
279    ) -> LlkvResult<()>
280    where
281        F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
282    {
283        if rows.is_empty() || primary_key.schema_indices.is_empty() {
284            return Ok(());
285        }
286
287        if original_keys.len() != rows.len() {
288            return Err(Error::Internal(
289                "primary key original value count does not match row count".into(),
290            ));
291        }
292
293        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
294
295        let mut existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
296        let mut existing_keys: FxHashSet<UniqueKey> = FxHashSet::default();
297        for row_values in existing_rows.drain(..) {
298            if let Some(key) = build_composite_unique_key(&row_values, &primary_key.column_names)? {
299                existing_keys.insert(key);
300            }
301        }
302
303        for key in original_keys.iter().flatten() {
304            existing_keys.remove(key);
305        }
306
307        let (pk_label, pk_display) = primary_key_context(&primary_key.column_names);
308        let mut new_seen: FxHashSet<UniqueKey> = FxHashSet::default();
309        let new_row_sets =
310            collect_row_sets(rows, &schema_to_row_index, &primary_key.schema_indices);
311
312        for values in new_row_sets {
313            let key = build_composite_unique_key(&values, &primary_key.column_names)?;
314            let key = key.ok_or_else(|| {
315                Error::ConstraintError(format!(
316                    "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
317                ))
318            })?;
319
320            if existing_keys.contains(&key) {
321                return Err(Error::ConstraintError(format!(
322                    "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
323                    pk_display
324                )));
325            }
326
327            if !new_seen.insert(key.clone()) {
328                return Err(Error::ConstraintError(format!(
329                    "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
330                    pk_display
331                )));
332            }
333
334            existing_keys.insert(key);
335        }
336
337        Ok(())
338    }
339
340    /// Validate that deleting the given rows will not violate foreign key constraints.
341    pub fn validate_delete_foreign_keys<FParents, FChildren>(
342        &self,
343        referenced_table_id: TableId,
344        referenced_row_ids: &[RowId],
345        mut fetch_parent_rows: FParents,
346        mut fetch_child_rows: FChildren,
347    ) -> LlkvResult<()>
348    where
349        FParents: FnMut(ForeignKeyParentRowsFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
350        FChildren: FnMut(ForeignKeyChildRowsFetch<'_>) -> LlkvResult<Vec<(RowId, Vec<PlanValue>)>>,
351    {
352        if referenced_row_ids.is_empty() {
353            return Ok(());
354        }
355
356        let referencing = self
357            .metadata
358            .foreign_keys_referencing(referenced_table_id)?;
359        if referencing.is_empty() {
360            return Ok(());
361        }
362
363        let deleting_row_ids: FxHashSet<RowId> = referenced_row_ids.iter().copied().collect();
364
365        for (child_table_id, constraint_id) in referencing {
366            let details = self
367                .metadata
368                .foreign_key_views(self.catalog.as_ref(), child_table_id)?;
369
370            let Some(detail) = details
371                .into_iter()
372                .find(|detail| detail.constraint_id == constraint_id)
373            else {
374                continue;
375            };
376
377            if detail.referenced_field_ids.is_empty() || detail.referencing_field_ids.is_empty() {
378                continue;
379            }
380
381            let parent_rows = fetch_parent_rows(ForeignKeyParentRowsFetch {
382                referenced_table_id,
383                referenced_row_ids,
384                referenced_field_ids: &detail.referenced_field_ids,
385            })?;
386
387            let parent_keys = canonical_parent_keys(&detail, parent_rows);
388            if parent_keys.is_empty() {
389                continue;
390            }
391
392            let child_rows = fetch_child_rows(ForeignKeyChildRowsFetch {
393                referencing_table_id: detail.referencing_table_id,
394                referencing_table_canonical: &detail.referencing_table_canonical,
395                referencing_field_ids: &detail.referencing_field_ids,
396            })?;
397
398            if child_rows.is_empty() {
399                continue;
400            }
401
402            for (child_row_id, values) in child_rows {
403                if values.len() != detail.referencing_field_ids.len() {
404                    continue;
405                }
406
407                if values.iter().any(|value| matches!(value, PlanValue::Null)) {
408                    continue;
409                }
410
411                if parent_keys.iter().all(|key| key != &values) {
412                    continue;
413                }
414
415                if detail.referencing_table_id == detail.referenced_table_id
416                    && deleting_row_ids.contains(&child_row_id)
417                {
418                    continue;
419                }
420
421                let constraint_label = detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
422                match detail.on_delete {
423                    ForeignKeyAction::NoAction | ForeignKeyAction::Restrict => {
424                        return Err(Error::ConstraintError(format!(
425                            "Violates foreign key constraint '{}' on table '{}' referencing '{}' - row is still referenced by a foreign key in a different table",
426                            constraint_label,
427                            detail.referencing_table_display,
428                            detail.referenced_table_display,
429                        )));
430                    }
431                }
432            }
433        }
434
435        Ok(())
436    }
437
438    /// Return the set of foreign keys referencing the provided table.
439    pub fn referencing_foreign_keys(
440        &self,
441        referenced_table_id: TableId,
442    ) -> LlkvResult<Vec<ForeignKeyView>> {
443        let referencing = self
444            .metadata
445            .foreign_keys_referencing(referenced_table_id)?;
446
447        if referencing.is_empty() {
448            return Ok(Vec::new());
449        }
450
451        let mut details_out = Vec::new();
452        for (child_table_id, constraint_id) in referencing {
453            let details = match self
454                .metadata
455                .foreign_key_views(self.catalog.as_ref(), child_table_id)
456            {
457                Ok(details) => details,
458                Err(Error::InvalidArgumentError(_)) | Err(Error::CatalogError(_)) => continue,
459                Err(err) => return Err(err),
460            };
461
462            if let Some(detail) = details
463                .into_iter()
464                .find(|detail| detail.constraint_id == constraint_id)
465            {
466                details_out.push(detail);
467            }
468        }
469
470        Ok(details_out)
471    }
472}
473
474fn build_field_lookup(schema_field_ids: &[FieldId]) -> FxHashMap<FieldId, usize> {
475    let mut lookup = FxHashMap::default();
476    for (idx, field_id) in schema_field_ids.iter().copied().enumerate() {
477        lookup.insert(field_id, idx);
478    }
479    lookup
480}
481
482fn validate_row_constraints_with_mapping(
483    column_constraints: &[InsertColumnConstraint],
484    rows: &[Vec<PlanValue>],
485    schema_to_row_index: &[Option<usize>],
486    column_order: &[usize],
487) -> LlkvResult<()> {
488    for constraint in column_constraints {
489        if constraint.column.nullable {
490            continue;
491        }
492
493        let Some(row_pos) = schema_to_row_index
494            .get(constraint.schema_index)
495            .and_then(|opt| *opt)
496        else {
497            return Err(Error::ConstraintError(format!(
498                "NOT NULL column '{}' missing from INSERT/UPDATE",
499                constraint.column.name
500            )));
501        };
502
503        for row in rows {
504            if matches!(row.get(row_pos), Some(PlanValue::Null)) {
505                return Err(Error::ConstraintError(format!(
506                    "NOT NULL constraint failed for column '{}'",
507                    constraint.column.name
508                )));
509            }
510        }
511    }
512
513    let check_columns: Vec<ConstraintColumnInfo> = column_constraints
514        .iter()
515        .map(|constraint| constraint.column.clone())
516        .collect();
517    validate_check_constraints(check_columns.as_slice(), rows, column_order)?;
518    Ok(())
519}
520
521fn build_schema_to_row_index(
522    schema_len: usize,
523    column_order: &[usize],
524) -> LlkvResult<Vec<Option<usize>>> {
525    let mut schema_to_row_index: Vec<Option<usize>> = vec![None; schema_len];
526    for (row_pos, &schema_idx) in column_order.iter().enumerate() {
527        if schema_idx >= schema_len {
528            return Err(Error::Internal(format!(
529                "column index {} out of bounds for schema (len={})",
530                schema_idx, schema_len
531            )));
532        }
533        schema_to_row_index[schema_idx] = Some(row_pos);
534    }
535    Ok(schema_to_row_index)
536}
537
538fn primary_key_context(column_names: &[String]) -> (&'static str, String) {
539    if column_names.len() == 1 {
540        ("column", column_names[0].clone())
541    } else {
542        ("columns", column_names.join(", "))
543    }
544}
545
546fn collect_row_sets(
547    rows: &[Vec<PlanValue>],
548    schema_to_row_index: &[Option<usize>],
549    schema_indices: &[usize],
550) -> Vec<Vec<PlanValue>> {
551    rows.iter()
552        .map(|row| {
553            schema_indices
554                .iter()
555                .map(|&schema_idx| {
556                    schema_to_row_index
557                        .get(schema_idx)
558                        .and_then(|opt| {
559                            opt.map(|row_pos| row.get(row_pos).cloned().unwrap_or(PlanValue::Null))
560                        })
561                        .unwrap_or(PlanValue::Null)
562                })
563                .collect()
564        })
565        .collect()
566}
567
568fn referencing_row_positions(
569    detail: &ForeignKeyView,
570    lookup: &FxHashMap<FieldId, usize>,
571    table_to_row_index: &[Option<usize>],
572    table_id: TableId,
573) -> LlkvResult<Vec<usize>> {
574    let mut positions = Vec::with_capacity(detail.referencing_field_ids.len());
575
576    for (idx, field_id) in detail.referencing_field_ids.iter().cloned().enumerate() {
577        let schema_index = lookup.get(&field_id).cloned().ok_or_else(|| {
578            Error::Internal(format!(
579                "referencing field id {} not found in table '{}' (table_id={})",
580                field_id, detail.referencing_table_display, table_id
581            ))
582        })?;
583
584        let position = table_to_row_index
585            .get(schema_index)
586            .and_then(|value| *value)
587            .ok_or_else(|| {
588                let column_name = detail
589                    .referencing_column_names
590                    .get(idx)
591                    .cloned()
592                    .unwrap_or_else(|| schema_index.to_string());
593                Error::InvalidArgumentError(format!(
594                    "FOREIGN KEY column '{}' missing from INSERT statement",
595                    column_name
596                ))
597            })?;
598
599        positions.push(position);
600    }
601
602    Ok(positions)
603}
604
605fn canonical_parent_keys(
606    detail: &ForeignKeyView,
607    parent_rows: Vec<Vec<PlanValue>>,
608) -> Vec<Vec<PlanValue>> {
609    parent_rows
610        .into_iter()
611        .filter(|values| values.len() == detail.referenced_field_ids.len())
612        .filter(|values| !values.iter().any(|value| matches!(value, PlanValue::Null)))
613        .collect()
614}
615
616fn candidate_child_keys(
617    positions: &[usize],
618    rows: &[Vec<PlanValue>],
619) -> LlkvResult<Vec<Vec<PlanValue>>> {
620    let mut keys = Vec::new();
621
622    for row in rows {
623        let mut key: Vec<PlanValue> = Vec::with_capacity(positions.len());
624        let mut contains_null = false;
625
626        for &row_pos in positions {
627            let value = row.get(row_pos).cloned().ok_or_else(|| {
628                Error::InvalidArgumentError("INSERT row is missing a required column value".into())
629            })?;
630
631            if matches!(value, PlanValue::Null) {
632                contains_null = true;
633                break;
634            }
635
636            key.push(value);
637        }
638
639        if contains_null {
640            continue;
641        }
642
643        keys.push(key);
644    }
645
646    Ok(keys)
647}