llkv_table/constraints/
service.rs

//! Constraint service that centralises runtime-facing constraint validation helpers.
//! Covers foreign key enforcement for INSERT and DELETE operations as well as
//! NOT NULL, CHECK, UNIQUE, and PRIMARY KEY validation of incoming rows, and will
//! gradually expand to cover additional constraint workflows.
4
5#![forbid(unsafe_code)]
6
7use super::types::ForeignKeyAction;
8use super::validation::validate_foreign_key_rows;
9use super::validation::{
10    ConstraintColumnInfo, UniqueKey, build_composite_unique_key, ensure_multi_column_unique,
11    ensure_primary_key, ensure_single_column_unique, validate_check_constraints,
12};
13use crate::catalog::TableCatalog;
14use crate::metadata::MetadataManager;
15use crate::types::{FieldId, RowId, TableId};
16use crate::view::ForeignKeyView;
17use llkv_plan::PlanValue;
18use llkv_result::{Error, Result as LlkvResult};
19use llkv_storage::pager::Pager;
20use rustc_hash::{FxHashMap, FxHashSet};
21use simd_r_drive_entry_handle::EntryHandle;
22use std::sync::Arc;
23
24/// Column metadata required to validate NOT NULL and CHECK constraints during inserts.
25#[derive(Clone, Debug)]
26pub struct InsertColumnConstraint {
27    pub schema_index: usize,
28    pub column: ConstraintColumnInfo,
29}
30
31/// Descriptor for a single-column UNIQUE constraint.
32#[derive(Clone, Debug)]
33pub struct InsertUniqueColumn {
34    pub schema_index: usize,
35    pub field_id: FieldId,
36    pub name: String,
37}
38
39/// Descriptor for composite UNIQUE or PRIMARY KEY constraints.
40#[derive(Clone, Debug)]
41pub struct InsertMultiColumnUnique {
42    pub schema_indices: Vec<usize>,
43    pub field_ids: Vec<FieldId>,
44    pub column_names: Vec<String>,
45}
46
/// Callback payload describing what parent rows need to be fetched for validation.
///
/// Built by `validate_insert_foreign_keys` once per foreign key and handed to the
/// caller-supplied `fetch_parent_rows` closure.
pub struct ForeignKeyRowFetch<'a> {
    /// Identifier of the referenced (parent) table.
    pub referenced_table_id: TableId,
    /// Canonical name of the referenced table, as recorded on the foreign key view.
    pub referenced_table_canonical: &'a str,
    /// Field ids of the referenced columns whose values should be returned.
    pub referenced_field_ids: &'a [FieldId],
}
53
/// Context for collecting parent row values involved in a DELETE operation.
///
/// Built by `validate_delete_foreign_keys` once per referencing foreign key and
/// handed to the caller-supplied `fetch_parent_rows` closure.
pub struct ForeignKeyParentRowsFetch<'a> {
    /// Identifier of the table whose rows are being deleted.
    pub referenced_table_id: TableId,
    /// Row ids that are about to be deleted.
    pub referenced_row_ids: &'a [RowId],
    /// Field ids of the referenced columns whose values should be returned.
    pub referenced_field_ids: &'a [FieldId],
}
60
/// Context for fetching visible child rows that might reference deleted parents.
///
/// Built by `validate_delete_foreign_keys` and handed to the caller-supplied
/// `fetch_child_rows` closure.
pub struct ForeignKeyChildRowsFetch<'a> {
    /// Identifier of the referencing (child) table.
    pub referencing_table_id: TableId,
    /// Canonical name of the referencing table, as recorded on the foreign key view.
    pub referencing_table_canonical: &'a str,
    /// Field ids of the referencing columns whose values should be returned.
    pub referencing_field_ids: &'a [FieldId],
}
67
/// High-level constraint service API intended for runtime consumers.
///
/// Holds shared handles to the metadata manager (queried for foreign key views and
/// for the foreign keys referencing a table) and to the table catalog (passed along
/// when resolving foreign key views).
#[derive(Clone)]
pub struct ConstraintService<P>
where
    P: Pager<Blob = EntryHandle> + Send + Sync,
{
    // Source of foreign key metadata.
    metadata: Arc<MetadataManager<P>>,
    // Catalog handed to the metadata manager when building foreign key views.
    catalog: Arc<TableCatalog>,
}
77
78impl<P> ConstraintService<P>
79where
80    P: Pager<Blob = EntryHandle> + Send + Sync,
81{
82    pub fn new(metadata: Arc<MetadataManager<P>>, catalog: Arc<TableCatalog>) -> Self {
83        Self { metadata, catalog }
84    }
85
86    /// Validate that incoming INSERT rows satisfy the table's foreign key constraints.
87    pub fn validate_insert_foreign_keys<F>(
88        &self,
89        referencing_table_id: TableId,
90        schema_field_ids: &[FieldId],
91        column_order: &[usize],
92        rows: &[Vec<PlanValue>],
93        mut fetch_parent_rows: F,
94    ) -> LlkvResult<()>
95    where
96        F: FnMut(ForeignKeyRowFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
97    {
98        if rows.is_empty() {
99            return Ok(());
100        }
101
102        let details = self
103            .metadata
104            .foreign_key_views(self.catalog.as_ref(), referencing_table_id)?;
105
106        if details.is_empty() {
107            return Ok(());
108        }
109
110        let field_lookup = build_field_lookup(schema_field_ids);
111        let mut table_to_row_index: Vec<Option<usize>> = vec![None; schema_field_ids.len()];
112        for (row_pos, &schema_idx) in column_order.iter().enumerate() {
113            if let Some(slot) = table_to_row_index.get_mut(schema_idx) {
114                *slot = Some(row_pos);
115            }
116        }
117
118        for detail in &details {
119            if detail.referencing_field_ids.is_empty() {
120                continue;
121            }
122
123            let referencing_positions = referencing_row_positions(
124                detail,
125                &field_lookup,
126                &table_to_row_index,
127                referencing_table_id,
128            )?;
129
130            let parent_rows = fetch_parent_rows(ForeignKeyRowFetch {
131                referenced_table_id: detail.referenced_table_id,
132                referenced_table_canonical: &detail.referenced_table_canonical,
133                referenced_field_ids: &detail.referenced_field_ids,
134            })?;
135
136            let parent_keys = canonical_parent_keys(detail, parent_rows);
137            let candidate_keys = candidate_child_keys(&referencing_positions, rows)?;
138
139            validate_foreign_key_rows(
140                detail.constraint_name.as_deref(),
141                &detail.referencing_table_display,
142                &detail.referenced_table_display,
143                &detail.referenced_column_names,
144                &parent_keys,
145                &candidate_keys,
146            )?;
147        }
148
149        Ok(())
150    }
151
152    #[allow(clippy::too_many_arguments)]
153    pub fn validate_insert_constraints<FSingle, FMulti>(
154        &self,
155        schema_field_ids: &[FieldId],
156        column_constraints: &[InsertColumnConstraint],
157        unique_columns: &[InsertUniqueColumn],
158        multi_column_uniques: &[InsertMultiColumnUnique],
159        primary_key: Option<&InsertMultiColumnUnique>,
160        column_order: &[usize],
161        rows: &[Vec<PlanValue>],
162        mut fetch_column_values: FSingle,
163        mut fetch_multi_column_rows: FMulti,
164    ) -> LlkvResult<()>
165    where
166        FSingle: FnMut(FieldId) -> LlkvResult<Vec<PlanValue>>,
167        FMulti: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
168    {
169        if rows.is_empty() {
170            return Ok(());
171        }
172
173        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
174        validate_row_constraints_with_mapping(
175            column_constraints,
176            rows,
177            &schema_to_row_index,
178            column_order,
179        )?;
180
181        for unique in unique_columns {
182            let Some(row_pos) = schema_to_row_index
183                .get(unique.schema_index)
184                .and_then(|opt| *opt)
185            else {
186                continue;
187            };
188
189            let existing_values = fetch_column_values(unique.field_id)?;
190            let mut new_values: Vec<PlanValue> = Vec::with_capacity(rows.len());
191            for row in rows {
192                let value = row.get(row_pos).cloned().unwrap_or(PlanValue::Null);
193                new_values.push(value);
194            }
195
196            ensure_single_column_unique(&existing_values, &new_values, &unique.name)?;
197        }
198
199        for constraint in multi_column_uniques {
200            if constraint.schema_indices.is_empty() {
201                continue;
202            }
203
204            let existing_rows = fetch_multi_column_rows(&constraint.field_ids)?;
205            let new_rows = collect_row_sets(rows, &schema_to_row_index, &constraint.schema_indices);
206            ensure_multi_column_unique(&existing_rows, &new_rows, &constraint.column_names)?;
207        }
208
209        if let Some(pk) = primary_key
210            && !pk.schema_indices.is_empty()
211        {
212            let existing_rows = fetch_multi_column_rows(&pk.field_ids)?;
213            let new_rows = collect_row_sets(rows, &schema_to_row_index, &pk.schema_indices);
214            ensure_primary_key(&existing_rows, &new_rows, &pk.column_names)?;
215        }
216
217        Ok(())
218    }
219
220    pub fn validate_row_level_constraints(
221        &self,
222        schema_field_ids: &[FieldId],
223        column_constraints: &[InsertColumnConstraint],
224        column_order: &[usize],
225        rows: &[Vec<PlanValue>],
226    ) -> LlkvResult<()> {
227        if rows.is_empty() {
228            return Ok(());
229        }
230
231        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
232        validate_row_constraints_with_mapping(
233            column_constraints,
234            rows,
235            &schema_to_row_index,
236            column_order,
237        )
238    }
239
240    pub fn validate_primary_key_rows<F>(
241        &self,
242        schema_field_ids: &[FieldId],
243        primary_key: &InsertMultiColumnUnique,
244        column_order: &[usize],
245        rows: &[Vec<PlanValue>],
246        mut fetch_multi_column_rows: F,
247    ) -> LlkvResult<()>
248    where
249        F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
250    {
251        if rows.is_empty() || primary_key.schema_indices.is_empty() {
252            return Ok(());
253        }
254
255        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
256        let existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
257        let new_rows = collect_row_sets(rows, &schema_to_row_index, &primary_key.schema_indices);
258        ensure_primary_key(&existing_rows, &new_rows, &primary_key.column_names)
259    }
260
261    pub fn validate_update_primary_keys<F>(
262        &self,
263        schema_field_ids: &[FieldId],
264        primary_key: &InsertMultiColumnUnique,
265        column_order: &[usize],
266        rows: &[Vec<PlanValue>],
267        original_keys: &[Option<UniqueKey>],
268        mut fetch_multi_column_rows: F,
269    ) -> LlkvResult<()>
270    where
271        F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
272    {
273        if rows.is_empty() || primary_key.schema_indices.is_empty() {
274            return Ok(());
275        }
276
277        if original_keys.len() != rows.len() {
278            return Err(Error::Internal(
279                "primary key original value count does not match row count".into(),
280            ));
281        }
282
283        let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
284
285        let mut existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
286        let mut existing_keys: FxHashSet<UniqueKey> = FxHashSet::default();
287        for row_values in existing_rows.drain(..) {
288            if let Some(key) = build_composite_unique_key(&row_values, &primary_key.column_names)? {
289                existing_keys.insert(key);
290            }
291        }
292
293        for key in original_keys.iter().flatten() {
294            existing_keys.remove(key);
295        }
296
297        let (pk_label, pk_display) = primary_key_context(&primary_key.column_names);
298        let mut new_seen: FxHashSet<UniqueKey> = FxHashSet::default();
299        let new_row_sets =
300            collect_row_sets(rows, &schema_to_row_index, &primary_key.schema_indices);
301
302        for values in new_row_sets {
303            let key = build_composite_unique_key(&values, &primary_key.column_names)?;
304            let key = key.ok_or_else(|| {
305                Error::ConstraintError(format!(
306                    "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
307                ))
308            })?;
309
310            if existing_keys.contains(&key) {
311                return Err(Error::ConstraintError(format!(
312                    "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
313                    pk_display
314                )));
315            }
316
317            if !new_seen.insert(key.clone()) {
318                return Err(Error::ConstraintError(format!(
319                    "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
320                    pk_display
321                )));
322            }
323
324            existing_keys.insert(key);
325        }
326
327        Ok(())
328    }
329
330    /// Validate that deleting the given rows will not violate foreign key constraints.
331    pub fn validate_delete_foreign_keys<FParents, FChildren>(
332        &self,
333        referenced_table_id: TableId,
334        referenced_row_ids: &[RowId],
335        mut fetch_parent_rows: FParents,
336        mut fetch_child_rows: FChildren,
337    ) -> LlkvResult<()>
338    where
339        FParents: FnMut(ForeignKeyParentRowsFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
340        FChildren: FnMut(ForeignKeyChildRowsFetch<'_>) -> LlkvResult<Vec<(RowId, Vec<PlanValue>)>>,
341    {
342        if referenced_row_ids.is_empty() {
343            return Ok(());
344        }
345
346        let referencing = self
347            .metadata
348            .foreign_keys_referencing(referenced_table_id)?;
349        if referencing.is_empty() {
350            return Ok(());
351        }
352
353        let deleting_row_ids: FxHashSet<RowId> = referenced_row_ids.iter().copied().collect();
354
355        for (child_table_id, constraint_id) in referencing {
356            let details = self
357                .metadata
358                .foreign_key_views(self.catalog.as_ref(), child_table_id)?;
359
360            let Some(detail) = details
361                .into_iter()
362                .find(|detail| detail.constraint_id == constraint_id)
363            else {
364                continue;
365            };
366
367            if detail.referenced_field_ids.is_empty() || detail.referencing_field_ids.is_empty() {
368                continue;
369            }
370
371            let parent_rows = fetch_parent_rows(ForeignKeyParentRowsFetch {
372                referenced_table_id,
373                referenced_row_ids,
374                referenced_field_ids: &detail.referenced_field_ids,
375            })?;
376
377            let parent_keys = canonical_parent_keys(&detail, parent_rows);
378            if parent_keys.is_empty() {
379                continue;
380            }
381
382            let child_rows = fetch_child_rows(ForeignKeyChildRowsFetch {
383                referencing_table_id: detail.referencing_table_id,
384                referencing_table_canonical: &detail.referencing_table_canonical,
385                referencing_field_ids: &detail.referencing_field_ids,
386            })?;
387
388            if child_rows.is_empty() {
389                continue;
390            }
391
392            for (child_row_id, values) in child_rows {
393                if values.len() != detail.referencing_field_ids.len() {
394                    continue;
395                }
396
397                if values.iter().any(|value| matches!(value, PlanValue::Null)) {
398                    continue;
399                }
400
401                if parent_keys.iter().all(|key| key != &values) {
402                    continue;
403                }
404
405                if detail.referencing_table_id == detail.referenced_table_id
406                    && deleting_row_ids.contains(&child_row_id)
407                {
408                    continue;
409                }
410
411                let constraint_label = detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
412                match detail.on_delete {
413                    ForeignKeyAction::NoAction | ForeignKeyAction::Restrict => {
414                        return Err(Error::ConstraintError(format!(
415                            "Violates foreign key constraint '{}' on table '{}' referencing '{}' - row is still referenced by a foreign key in a different table",
416                            constraint_label,
417                            detail.referencing_table_display,
418                            detail.referenced_table_display,
419                        )));
420                    }
421                }
422            }
423        }
424
425        Ok(())
426    }
427
428    /// Return the set of foreign keys referencing the provided table.
429    pub fn referencing_foreign_keys(
430        &self,
431        referenced_table_id: TableId,
432    ) -> LlkvResult<Vec<ForeignKeyView>> {
433        let referencing = self
434            .metadata
435            .foreign_keys_referencing(referenced_table_id)?;
436
437        if referencing.is_empty() {
438            return Ok(Vec::new());
439        }
440
441        let mut details_out = Vec::new();
442        for (child_table_id, constraint_id) in referencing {
443            let details = match self
444                .metadata
445                .foreign_key_views(self.catalog.as_ref(), child_table_id)
446            {
447                Ok(details) => details,
448                Err(Error::InvalidArgumentError(_)) | Err(Error::CatalogError(_)) => continue,
449                Err(err) => return Err(err),
450            };
451
452            if let Some(detail) = details
453                .into_iter()
454                .find(|detail| detail.constraint_id == constraint_id)
455            {
456                details_out.push(detail);
457            }
458        }
459
460        Ok(details_out)
461    }
462}
463
464fn build_field_lookup(schema_field_ids: &[FieldId]) -> FxHashMap<FieldId, usize> {
465    let mut lookup = FxHashMap::default();
466    for (idx, field_id) in schema_field_ids.iter().copied().enumerate() {
467        lookup.insert(field_id, idx);
468    }
469    lookup
470}
471
472fn validate_row_constraints_with_mapping(
473    column_constraints: &[InsertColumnConstraint],
474    rows: &[Vec<PlanValue>],
475    schema_to_row_index: &[Option<usize>],
476    column_order: &[usize],
477) -> LlkvResult<()> {
478    for constraint in column_constraints {
479        if constraint.column.nullable {
480            continue;
481        }
482
483        let Some(row_pos) = schema_to_row_index
484            .get(constraint.schema_index)
485            .and_then(|opt| *opt)
486        else {
487            return Err(Error::ConstraintError(format!(
488                "NOT NULL column '{}' missing from INSERT/UPDATE",
489                constraint.column.name
490            )));
491        };
492
493        for row in rows {
494            if matches!(row.get(row_pos), Some(PlanValue::Null)) {
495                return Err(Error::ConstraintError(format!(
496                    "NOT NULL constraint failed for column '{}'",
497                    constraint.column.name
498                )));
499            }
500        }
501    }
502
503    let check_columns: Vec<ConstraintColumnInfo> = column_constraints
504        .iter()
505        .map(|constraint| constraint.column.clone())
506        .collect();
507    validate_check_constraints(check_columns.as_slice(), rows, column_order)?;
508    Ok(())
509}
510
511fn build_schema_to_row_index(
512    schema_len: usize,
513    column_order: &[usize],
514) -> LlkvResult<Vec<Option<usize>>> {
515    let mut schema_to_row_index: Vec<Option<usize>> = vec![None; schema_len];
516    for (row_pos, &schema_idx) in column_order.iter().enumerate() {
517        if schema_idx >= schema_len {
518            return Err(Error::Internal(format!(
519                "column index {} out of bounds for schema (len={})",
520                schema_idx, schema_len
521            )));
522        }
523        schema_to_row_index[schema_idx] = Some(row_pos);
524    }
525    Ok(schema_to_row_index)
526}
527
/// Produce the noun ("column" / "columns") and the display text used in primary key
/// error messages.
///
/// Exactly one name yields `("column", name)`; any other count yields
/// `("columns", names joined with ", ")`.
fn primary_key_context(column_names: &[String]) -> (&'static str, String) {
    match column_names {
        [only] => ("column", only.clone()),
        _ => ("columns", column_names.join(", ")),
    }
}
535
536fn collect_row_sets(
537    rows: &[Vec<PlanValue>],
538    schema_to_row_index: &[Option<usize>],
539    schema_indices: &[usize],
540) -> Vec<Vec<PlanValue>> {
541    rows.iter()
542        .map(|row| {
543            schema_indices
544                .iter()
545                .map(|&schema_idx| {
546                    schema_to_row_index
547                        .get(schema_idx)
548                        .and_then(|opt| {
549                            opt.map(|row_pos| row.get(row_pos).cloned().unwrap_or(PlanValue::Null))
550                        })
551                        .unwrap_or(PlanValue::Null)
552                })
553                .collect()
554        })
555        .collect()
556}
557
558fn referencing_row_positions(
559    detail: &ForeignKeyView,
560    lookup: &FxHashMap<FieldId, usize>,
561    table_to_row_index: &[Option<usize>],
562    table_id: TableId,
563) -> LlkvResult<Vec<usize>> {
564    let mut positions = Vec::with_capacity(detail.referencing_field_ids.len());
565
566    for (idx, field_id) in detail.referencing_field_ids.iter().cloned().enumerate() {
567        let schema_index = lookup.get(&field_id).cloned().ok_or_else(|| {
568            Error::Internal(format!(
569                "referencing field id {} not found in table '{}' (table_id={})",
570                field_id, detail.referencing_table_display, table_id
571            ))
572        })?;
573
574        let position = table_to_row_index
575            .get(schema_index)
576            .and_then(|value| *value)
577            .ok_or_else(|| {
578                let column_name = detail
579                    .referencing_column_names
580                    .get(idx)
581                    .cloned()
582                    .unwrap_or_else(|| schema_index.to_string());
583                Error::InvalidArgumentError(format!(
584                    "FOREIGN KEY column '{}' missing from INSERT statement",
585                    column_name
586                ))
587            })?;
588
589        positions.push(position);
590    }
591
592    Ok(positions)
593}
594
595fn canonical_parent_keys(
596    detail: &ForeignKeyView,
597    parent_rows: Vec<Vec<PlanValue>>,
598) -> Vec<Vec<PlanValue>> {
599    parent_rows
600        .into_iter()
601        .filter(|values| values.len() == detail.referenced_field_ids.len())
602        .filter(|values| !values.iter().any(|value| matches!(value, PlanValue::Null)))
603        .collect()
604}
605
606fn candidate_child_keys(
607    positions: &[usize],
608    rows: &[Vec<PlanValue>],
609) -> LlkvResult<Vec<Vec<PlanValue>>> {
610    let mut keys = Vec::new();
611
612    for row in rows {
613        let mut key: Vec<PlanValue> = Vec::with_capacity(positions.len());
614        let mut contains_null = false;
615
616        for &row_pos in positions {
617            let value = row.get(row_pos).cloned().ok_or_else(|| {
618                Error::InvalidArgumentError("INSERT row is missing a required column value".into())
619            })?;
620
621            if matches!(value, PlanValue::Null) {
622                contains_null = true;
623                break;
624            }
625
626            key.push(value);
627        }
628
629        if contains_null {
630            continue;
631        }
632
633        keys.push(key);
634    }
635
636    Ok(keys)
637}