1#![forbid(unsafe_code)]
6
7use super::types::ForeignKeyAction;
8use super::validation::validate_foreign_key_rows;
9use super::validation::{
10 ConstraintColumnInfo, UniqueKey, build_composite_unique_key, ensure_multi_column_unique,
11 ensure_primary_key, ensure_single_column_unique, validate_check_constraints,
12};
13use crate::catalog::TableCatalog;
14use crate::metadata::MetadataManager;
15use crate::types::{FieldId, RowId, TableId};
16use crate::view::ForeignKeyView;
17use llkv_plan::PlanValue;
18use llkv_result::{Error, Result as LlkvResult};
19use llkv_storage::pager::Pager;
20use rustc_hash::{FxHashMap, FxHashSet};
21use simd_r_drive_entry_handle::EntryHandle;
22use std::sync::Arc;
23
24#[derive(Clone, Debug)]
26pub struct InsertColumnConstraint {
27 pub schema_index: usize,
28 pub column: ConstraintColumnInfo,
29}
30
31#[derive(Clone, Debug)]
33pub struct InsertUniqueColumn {
34 pub schema_index: usize,
35 pub field_id: FieldId,
36 pub name: String,
37}
38
39#[derive(Clone, Debug)]
41pub struct InsertMultiColumnUnique {
42 pub schema_indices: Vec<usize>,
43 pub field_ids: Vec<FieldId>,
44 pub column_names: Vec<String>,
45}
46
47pub struct ForeignKeyRowFetch<'a> {
49 pub referenced_table_id: TableId,
50 pub referenced_table_canonical: &'a str,
51 pub referenced_field_ids: &'a [FieldId],
52}
53
54pub struct ForeignKeyParentRowsFetch<'a> {
56 pub referenced_table_id: TableId,
57 pub referenced_row_ids: &'a [RowId],
58 pub referenced_field_ids: &'a [FieldId],
59}
60
61pub struct ForeignKeyChildRowsFetch<'a> {
63 pub referencing_table_id: TableId,
64 pub referencing_table_canonical: &'a str,
65 pub referencing_field_ids: &'a [FieldId],
66}
67
68#[derive(Clone)]
70pub struct ConstraintService<P>
71where
72 P: Pager<Blob = EntryHandle> + Send + Sync,
73{
74 metadata: Arc<MetadataManager<P>>,
75 catalog: Arc<TableCatalog>,
76}
77
78impl<P> ConstraintService<P>
79where
80 P: Pager<Blob = EntryHandle> + Send + Sync,
81{
82 pub fn new(metadata: Arc<MetadataManager<P>>, catalog: Arc<TableCatalog>) -> Self {
84 Self { metadata, catalog }
85 }
86
87 pub fn validate_insert_foreign_keys<F>(
89 &self,
90 referencing_table_id: TableId,
91 schema_field_ids: &[FieldId],
92 column_order: &[usize],
93 rows: &[Vec<PlanValue>],
94 mut fetch_parent_rows: F,
95 ) -> LlkvResult<()>
96 where
97 F: FnMut(ForeignKeyRowFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
98 {
99 if rows.is_empty() {
100 return Ok(());
101 }
102
103 let details = self
104 .metadata
105 .foreign_key_views(self.catalog.as_ref(), referencing_table_id)?;
106
107 if details.is_empty() {
108 return Ok(());
109 }
110
111 let field_lookup = build_field_lookup(schema_field_ids);
112 let mut table_to_row_index: Vec<Option<usize>> = vec![None; schema_field_ids.len()];
113 for (row_pos, &schema_idx) in column_order.iter().enumerate() {
114 if let Some(slot) = table_to_row_index.get_mut(schema_idx) {
115 *slot = Some(row_pos);
116 }
117 }
118
119 for detail in &details {
120 if detail.referencing_field_ids.is_empty() {
121 continue;
122 }
123
124 let referencing_positions = referencing_row_positions(
125 detail,
126 &field_lookup,
127 &table_to_row_index,
128 referencing_table_id,
129 )?;
130
131 let parent_rows = fetch_parent_rows(ForeignKeyRowFetch {
132 referenced_table_id: detail.referenced_table_id,
133 referenced_table_canonical: &detail.referenced_table_canonical,
134 referenced_field_ids: &detail.referenced_field_ids,
135 })?;
136
137 let parent_keys = canonical_parent_keys(detail, parent_rows);
138 let candidate_keys = candidate_child_keys(&referencing_positions, rows)?;
139
140 validate_foreign_key_rows(
141 detail.constraint_name.as_deref(),
142 &detail.referencing_table_display,
143 &detail.referenced_table_display,
144 &detail.referenced_column_names,
145 &parent_keys,
146 &candidate_keys,
147 )?;
148 }
149
150 Ok(())
151 }
152
153 #[allow(clippy::too_many_arguments)]
157 pub fn validate_insert_constraints<FSingle, FMulti>(
158 &self,
159 schema_field_ids: &[FieldId],
160 column_constraints: &[InsertColumnConstraint],
161 unique_columns: &[InsertUniqueColumn],
162 multi_column_uniques: &[InsertMultiColumnUnique],
163 primary_key: Option<&InsertMultiColumnUnique>,
164 column_order: &[usize],
165 rows: &[Vec<PlanValue>],
166 mut fetch_column_values: FSingle,
167 mut fetch_multi_column_rows: FMulti,
168 ) -> LlkvResult<()>
169 where
170 FSingle: FnMut(FieldId) -> LlkvResult<Vec<PlanValue>>,
171 FMulti: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
172 {
173 if rows.is_empty() {
174 return Ok(());
175 }
176
177 let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
178 validate_row_constraints_with_mapping(
179 column_constraints,
180 rows,
181 &schema_to_row_index,
182 column_order,
183 )?;
184
185 for unique in unique_columns {
186 let Some(row_pos) = schema_to_row_index
187 .get(unique.schema_index)
188 .and_then(|opt| *opt)
189 else {
190 continue;
191 };
192
193 let existing_values = fetch_column_values(unique.field_id)?;
194 let mut new_values: Vec<PlanValue> = Vec::with_capacity(rows.len());
195 for row in rows {
196 let value = row.get(row_pos).cloned().unwrap_or(PlanValue::Null);
197 new_values.push(value);
198 }
199
200 ensure_single_column_unique(&existing_values, &new_values, &unique.name)?;
201 }
202
203 for constraint in multi_column_uniques {
204 if constraint.schema_indices.is_empty() {
205 continue;
206 }
207
208 let existing_rows = fetch_multi_column_rows(&constraint.field_ids)?;
209 let new_rows = collect_row_sets(rows, &schema_to_row_index, &constraint.schema_indices);
210 ensure_multi_column_unique(&existing_rows, &new_rows, &constraint.column_names)?;
211 }
212
213 if let Some(pk) = primary_key
214 && !pk.schema_indices.is_empty()
215 {
216 let existing_rows = fetch_multi_column_rows(&pk.field_ids)?;
217 let new_rows = collect_row_sets(rows, &schema_to_row_index, &pk.schema_indices);
218 ensure_primary_key(&existing_rows, &new_rows, &pk.column_names)?;
219 }
220
221 Ok(())
222 }
223
224 pub fn validate_row_level_constraints(
227 &self,
228 schema_field_ids: &[FieldId],
229 column_constraints: &[InsertColumnConstraint],
230 column_order: &[usize],
231 rows: &[Vec<PlanValue>],
232 ) -> LlkvResult<()> {
233 if rows.is_empty() {
234 return Ok(());
235 }
236
237 let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
238 validate_row_constraints_with_mapping(
239 column_constraints,
240 rows,
241 &schema_to_row_index,
242 column_order,
243 )
244 }
245
246 pub fn validate_primary_key_rows<F>(
249 &self,
250 schema_field_ids: &[FieldId],
251 primary_key: &InsertMultiColumnUnique,
252 column_order: &[usize],
253 rows: &[Vec<PlanValue>],
254 mut fetch_multi_column_rows: F,
255 ) -> LlkvResult<()>
256 where
257 F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
258 {
259 if rows.is_empty() || primary_key.schema_indices.is_empty() {
260 return Ok(());
261 }
262
263 let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
264 let existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
265 let new_rows = collect_row_sets(rows, &schema_to_row_index, &primary_key.schema_indices);
266 ensure_primary_key(&existing_rows, &new_rows, &primary_key.column_names)
267 }
268
269 pub fn validate_update_primary_keys<F>(
272 &self,
273 schema_field_ids: &[FieldId],
274 primary_key: &InsertMultiColumnUnique,
275 column_order: &[usize],
276 rows: &[Vec<PlanValue>],
277 original_keys: &[Option<UniqueKey>],
278 mut fetch_multi_column_rows: F,
279 ) -> LlkvResult<()>
280 where
281 F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
282 {
283 if rows.is_empty() || primary_key.schema_indices.is_empty() {
284 return Ok(());
285 }
286
287 if original_keys.len() != rows.len() {
288 return Err(Error::Internal(
289 "primary key original value count does not match row count".into(),
290 ));
291 }
292
293 let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
294
295 let mut existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
296 let mut existing_keys: FxHashSet<UniqueKey> = FxHashSet::default();
297 for row_values in existing_rows.drain(..) {
298 if let Some(key) = build_composite_unique_key(&row_values, &primary_key.column_names)? {
299 existing_keys.insert(key);
300 }
301 }
302
303 for key in original_keys.iter().flatten() {
304 existing_keys.remove(key);
305 }
306
307 let (pk_label, pk_display) = primary_key_context(&primary_key.column_names);
308 let mut new_seen: FxHashSet<UniqueKey> = FxHashSet::default();
309 let new_row_sets =
310 collect_row_sets(rows, &schema_to_row_index, &primary_key.schema_indices);
311
312 for values in new_row_sets {
313 let key = build_composite_unique_key(&values, &primary_key.column_names)?;
314 let key = key.ok_or_else(|| {
315 Error::ConstraintError(format!(
316 "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
317 ))
318 })?;
319
320 if existing_keys.contains(&key) {
321 return Err(Error::ConstraintError(format!(
322 "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
323 pk_display
324 )));
325 }
326
327 if !new_seen.insert(key.clone()) {
328 return Err(Error::ConstraintError(format!(
329 "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
330 pk_display
331 )));
332 }
333
334 existing_keys.insert(key);
335 }
336
337 Ok(())
338 }
339
340 pub fn validate_delete_foreign_keys<FParents, FChildren>(
342 &self,
343 referenced_table_id: TableId,
344 referenced_row_ids: &[RowId],
345 mut fetch_parent_rows: FParents,
346 mut fetch_child_rows: FChildren,
347 ) -> LlkvResult<()>
348 where
349 FParents: FnMut(ForeignKeyParentRowsFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
350 FChildren: FnMut(ForeignKeyChildRowsFetch<'_>) -> LlkvResult<Vec<(RowId, Vec<PlanValue>)>>,
351 {
352 if referenced_row_ids.is_empty() {
353 return Ok(());
354 }
355
356 let referencing = self
357 .metadata
358 .foreign_keys_referencing(referenced_table_id)?;
359 if referencing.is_empty() {
360 return Ok(());
361 }
362
363 let deleting_row_ids: FxHashSet<RowId> = referenced_row_ids.iter().copied().collect();
364
365 for (child_table_id, constraint_id) in referencing {
366 let details = self
367 .metadata
368 .foreign_key_views(self.catalog.as_ref(), child_table_id)?;
369
370 let Some(detail) = details
371 .into_iter()
372 .find(|detail| detail.constraint_id == constraint_id)
373 else {
374 continue;
375 };
376
377 if detail.referenced_field_ids.is_empty() || detail.referencing_field_ids.is_empty() {
378 continue;
379 }
380
381 let parent_rows = fetch_parent_rows(ForeignKeyParentRowsFetch {
382 referenced_table_id,
383 referenced_row_ids,
384 referenced_field_ids: &detail.referenced_field_ids,
385 })?;
386
387 let parent_keys = canonical_parent_keys(&detail, parent_rows);
388 if parent_keys.is_empty() {
389 continue;
390 }
391
392 let child_rows = fetch_child_rows(ForeignKeyChildRowsFetch {
393 referencing_table_id: detail.referencing_table_id,
394 referencing_table_canonical: &detail.referencing_table_canonical,
395 referencing_field_ids: &detail.referencing_field_ids,
396 })?;
397
398 if child_rows.is_empty() {
399 continue;
400 }
401
402 for (child_row_id, values) in child_rows {
403 if values.len() != detail.referencing_field_ids.len() {
404 continue;
405 }
406
407 if values.iter().any(|value| matches!(value, PlanValue::Null)) {
408 continue;
409 }
410
411 if parent_keys.iter().all(|key| key != &values) {
412 continue;
413 }
414
415 if detail.referencing_table_id == detail.referenced_table_id
416 && deleting_row_ids.contains(&child_row_id)
417 {
418 continue;
419 }
420
421 let constraint_label = detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
422 match detail.on_delete {
423 ForeignKeyAction::NoAction | ForeignKeyAction::Restrict => {
424 return Err(Error::ConstraintError(format!(
425 "Violates foreign key constraint '{}' on table '{}' referencing '{}' - row is still referenced by a foreign key in a different table",
426 constraint_label,
427 detail.referencing_table_display,
428 detail.referenced_table_display,
429 )));
430 }
431 }
432 }
433 }
434
435 Ok(())
436 }
437
438 pub fn validate_update_foreign_keys<FParents, FChildren>(
443 &self,
444 referenced_table_id: TableId,
445 referenced_row_ids: &[RowId],
446 updated_field_ids: &[FieldId],
447 mut fetch_parent_rows: FParents,
448 mut fetch_child_rows: FChildren,
449 ) -> LlkvResult<()>
450 where
451 FParents: FnMut(ForeignKeyParentRowsFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
452 FChildren: FnMut(ForeignKeyChildRowsFetch<'_>) -> LlkvResult<Vec<(RowId, Vec<PlanValue>)>>,
453 {
454 if referenced_row_ids.is_empty() || updated_field_ids.is_empty() {
455 return Ok(());
456 }
457
458 let referencing = self
459 .metadata
460 .foreign_keys_referencing(referenced_table_id)?;
461 if referencing.is_empty() {
462 return Ok(());
463 }
464
465 for (child_table_id, constraint_id) in referencing {
466 let details = self
467 .metadata
468 .foreign_key_views(self.catalog.as_ref(), child_table_id)?;
469
470 let Some(detail) = details
471 .into_iter()
472 .find(|detail| detail.constraint_id == constraint_id)
473 else {
474 continue;
475 };
476
477 if detail.referenced_field_ids.is_empty() || detail.referencing_field_ids.is_empty() {
478 continue;
479 }
480
481 let is_referenced_column_updated = detail
483 .referenced_field_ids
484 .iter()
485 .any(|fid| updated_field_ids.contains(fid));
486
487 if !is_referenced_column_updated {
488 continue;
490 }
491
492 let parent_rows = fetch_parent_rows(ForeignKeyParentRowsFetch {
494 referenced_table_id,
495 referenced_row_ids,
496 referenced_field_ids: &detail.referenced_field_ids,
497 })?;
498
499 let parent_keys = canonical_parent_keys(&detail, parent_rows);
500 if parent_keys.is_empty() {
501 continue;
502 }
503
504 let child_rows = fetch_child_rows(ForeignKeyChildRowsFetch {
506 referencing_table_id: detail.referencing_table_id,
507 referencing_table_canonical: &detail.referencing_table_canonical,
508 referencing_field_ids: &detail.referencing_field_ids,
509 })?;
510
511 if child_rows.is_empty() {
512 continue;
513 }
514
515 for (_child_row_id, values) in child_rows {
517 if values.len() != detail.referencing_field_ids.len() {
518 continue;
519 }
520
521 if values.iter().any(|value| matches!(value, PlanValue::Null)) {
522 continue;
523 }
524
525 if parent_keys.iter().any(|key| key == &values) {
527 let constraint_label =
528 detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
529 return Err(Error::ConstraintError(format!(
530 "Violates foreign key constraint '{}' on table '{}' referencing '{}' - cannot update referenced column while foreign key exists",
531 constraint_label,
532 detail.referencing_table_display,
533 detail.referenced_table_display,
534 )));
535 }
536 }
537 }
538
539 Ok(())
540 }
541
542 pub fn referencing_foreign_keys(
544 &self,
545 referenced_table_id: TableId,
546 ) -> LlkvResult<Vec<ForeignKeyView>> {
547 let referencing = self
548 .metadata
549 .foreign_keys_referencing(referenced_table_id)?;
550
551 if referencing.is_empty() {
552 return Ok(Vec::new());
553 }
554
555 let mut details_out = Vec::new();
556 for (child_table_id, constraint_id) in referencing {
557 let details = match self
558 .metadata
559 .foreign_key_views(self.catalog.as_ref(), child_table_id)
560 {
561 Ok(details) => details,
562 Err(Error::InvalidArgumentError(_)) | Err(Error::CatalogError(_)) => continue,
563 Err(err) => return Err(err),
564 };
565
566 if let Some(detail) = details
567 .into_iter()
568 .find(|detail| detail.constraint_id == constraint_id)
569 {
570 details_out.push(detail);
571 }
572 }
573
574 Ok(details_out)
575 }
576}
577
578fn build_field_lookup(schema_field_ids: &[FieldId]) -> FxHashMap<FieldId, usize> {
579 let mut lookup = FxHashMap::default();
580 for (idx, field_id) in schema_field_ids.iter().copied().enumerate() {
581 lookup.insert(field_id, idx);
582 }
583 lookup
584}
585
586fn validate_row_constraints_with_mapping(
587 column_constraints: &[InsertColumnConstraint],
588 rows: &[Vec<PlanValue>],
589 schema_to_row_index: &[Option<usize>],
590 column_order: &[usize],
591) -> LlkvResult<()> {
592 for constraint in column_constraints {
593 if constraint.column.nullable {
594 continue;
595 }
596
597 let Some(row_pos) = schema_to_row_index
598 .get(constraint.schema_index)
599 .and_then(|opt| *opt)
600 else {
601 return Err(Error::ConstraintError(format!(
602 "NOT NULL column '{}' missing from INSERT/UPDATE",
603 constraint.column.name
604 )));
605 };
606
607 for row in rows {
608 if matches!(row.get(row_pos), Some(PlanValue::Null)) {
609 return Err(Error::ConstraintError(format!(
610 "NOT NULL constraint failed for column '{}'",
611 constraint.column.name
612 )));
613 }
614 }
615 }
616
617 let check_columns: Vec<ConstraintColumnInfo> = column_constraints
618 .iter()
619 .map(|constraint| constraint.column.clone())
620 .collect();
621 validate_check_constraints(check_columns.as_slice(), rows, column_order)?;
622 Ok(())
623}
624
625fn build_schema_to_row_index(
626 schema_len: usize,
627 column_order: &[usize],
628) -> LlkvResult<Vec<Option<usize>>> {
629 let mut schema_to_row_index: Vec<Option<usize>> = vec![None; schema_len];
630 for (row_pos, &schema_idx) in column_order.iter().enumerate() {
631 if schema_idx >= schema_len {
632 return Err(Error::Internal(format!(
633 "column index {} out of bounds for schema (len={})",
634 schema_idx, schema_len
635 )));
636 }
637 schema_to_row_index[schema_idx] = Some(row_pos);
638 }
639 Ok(schema_to_row_index)
640}
641
/// Returns the noun (`"column"` or `"columns"`) and the display text used in
/// primary-key error messages.
///
/// A single name is returned as is; any other number of names is joined with
/// `", "` and labelled `"columns"`.
fn primary_key_context(column_names: &[String]) -> (&'static str, String) {
    match column_names {
        [only] => ("column", only.clone()),
        _ => ("columns", column_names.join(", ")),
    }
}
649
650fn collect_row_sets(
651 rows: &[Vec<PlanValue>],
652 schema_to_row_index: &[Option<usize>],
653 schema_indices: &[usize],
654) -> Vec<Vec<PlanValue>> {
655 rows.iter()
656 .map(|row| {
657 schema_indices
658 .iter()
659 .map(|&schema_idx| {
660 schema_to_row_index
661 .get(schema_idx)
662 .and_then(|opt| {
663 opt.map(|row_pos| row.get(row_pos).cloned().unwrap_or(PlanValue::Null))
664 })
665 .unwrap_or(PlanValue::Null)
666 })
667 .collect()
668 })
669 .collect()
670}
671
672fn referencing_row_positions(
673 detail: &ForeignKeyView,
674 lookup: &FxHashMap<FieldId, usize>,
675 table_to_row_index: &[Option<usize>],
676 table_id: TableId,
677) -> LlkvResult<Vec<usize>> {
678 let mut positions = Vec::with_capacity(detail.referencing_field_ids.len());
679
680 for (idx, field_id) in detail.referencing_field_ids.iter().cloned().enumerate() {
681 let schema_index = lookup.get(&field_id).cloned().ok_or_else(|| {
682 Error::Internal(format!(
683 "referencing field id {} not found in table '{}' (table_id={})",
684 field_id, detail.referencing_table_display, table_id
685 ))
686 })?;
687
688 let position = table_to_row_index
689 .get(schema_index)
690 .and_then(|value| *value)
691 .ok_or_else(|| {
692 let column_name = detail
693 .referencing_column_names
694 .get(idx)
695 .cloned()
696 .unwrap_or_else(|| schema_index.to_string());
697 Error::InvalidArgumentError(format!(
698 "FOREIGN KEY column '{}' missing from INSERT statement",
699 column_name
700 ))
701 })?;
702
703 positions.push(position);
704 }
705
706 Ok(positions)
707}
708
709fn canonical_parent_keys(
710 detail: &ForeignKeyView,
711 parent_rows: Vec<Vec<PlanValue>>,
712) -> Vec<Vec<PlanValue>> {
713 parent_rows
714 .into_iter()
715 .filter(|values| values.len() == detail.referenced_field_ids.len())
716 .filter(|values| !values.iter().any(|value| matches!(value, PlanValue::Null)))
717 .collect()
718}
719
720fn candidate_child_keys(
721 positions: &[usize],
722 rows: &[Vec<PlanValue>],
723) -> LlkvResult<Vec<Vec<PlanValue>>> {
724 let mut keys = Vec::new();
725
726 for row in rows {
727 let mut key: Vec<PlanValue> = Vec::with_capacity(positions.len());
728 let mut contains_null = false;
729
730 for &row_pos in positions {
731 let value = row.get(row_pos).cloned().ok_or_else(|| {
732 Error::InvalidArgumentError("INSERT row is missing a required column value".into())
733 })?;
734
735 if matches!(value, PlanValue::Null) {
736 contains_null = true;
737 break;
738 }
739
740 key.push(value);
741 }
742
743 if contains_null {
744 continue;
745 }
746
747 keys.push(key);
748 }
749
750 Ok(keys)
751}