1#![forbid(unsafe_code)]
6
7use super::types::{ConstraintId, ForeignKeyAction};
8use super::validation::validate_foreign_key_rows;
9use super::validation::{
10 ConstraintColumnInfo, UniqueKey, build_composite_unique_key, ensure_multi_column_unique,
11 ensure_primary_key, ensure_single_column_unique, validate_check_constraints,
12};
13use crate::catalog::TableCatalog;
14use crate::metadata::MetadataManager;
15use crate::types::{FieldId, RowId, TableId};
16use crate::view::ForeignKeyView;
17use croaring::Treemap;
18use llkv_plan::PlanValue;
19use llkv_result::{Error, Result as LlkvResult};
20use llkv_storage::pager::Pager;
21use rustc_hash::{FxHashMap, FxHashSet};
22use simd_r_drive_entry_handle::EntryHandle;
23use std::sync::{Arc, RwLock};
24
/// Parent-key sets for a single referencing table, keyed by foreign key constraint id.
type ForeignKeyConstraintCache = FxHashMap<ConstraintId, Arc<FxHashSet<UniqueKey>>>;
/// Foreign key caches keyed by the id of the referencing (child) table.
type ForeignKeyCacheMap = FxHashMap<TableId, ForeignKeyConstraintCache>;
/// Lock-protected, reference-counted handle to the cache map.
type SharedForeignKeyCaches = Arc<RwLock<ForeignKeyCacheMap>>;
28
/// Column-level constraint input: a column's constraint metadata together with
/// its position in the table schema.
#[derive(Clone, Debug)]
pub struct InsertColumnConstraint {
    /// Index of the column within the table schema.
    pub schema_index: usize,
    /// Constraint metadata for the column; this file reads its `name` and
    /// `nullable` fields and forwards the whole value to CHECK validation.
    pub column: ConstraintColumnInfo,
}
35
/// Describes a single-column UNIQUE constraint to validate.
#[derive(Clone, Debug)]
pub struct InsertUniqueColumn {
    /// Index of the column within the table schema.
    pub schema_index: usize,
    /// Field id handed to the callback that fetches the column's stored values.
    pub field_id: FieldId,
    /// Column name forwarded to the uniqueness check (presumably for error
    /// messages; confirm against `ensure_single_column_unique`).
    pub name: String,
}
43
/// Describes a multi-column UNIQUE constraint; the same shape is also used to
/// describe a table's primary key.
#[derive(Clone, Debug)]
pub struct InsertMultiColumnUnique {
    /// Schema indices of the participating columns.
    pub schema_indices: Vec<usize>,
    /// Field ids handed to the callback that fetches the stored rows.
    pub field_ids: Vec<FieldId>,
    /// Names of the participating columns, used when building keys and
    /// reporting violations.
    pub column_names: Vec<String>,
}
51
/// Request handed to the parent-row callback while validating foreign keys on INSERT.
pub struct ForeignKeyRowFetch<'a> {
    /// Id of the referenced (parent) table.
    pub referenced_table_id: TableId,
    /// Canonical name of the referenced table.
    pub referenced_table_canonical: &'a str,
    /// Field ids of the referenced columns whose values should be returned.
    pub referenced_field_ids: &'a [FieldId],
}
58
/// Request handed to the parent-row callback while validating foreign keys on
/// DELETE or UPDATE of a referenced table.
pub struct ForeignKeyParentRowsFetch<'a> {
    /// Id of the referenced (parent) table.
    pub referenced_table_id: TableId,
    /// Row ids of the parent rows being deleted or updated.
    pub referenced_row_ids: &'a Treemap,
    /// Field ids of the referenced columns whose values should be returned.
    pub referenced_field_ids: &'a [FieldId],
}
65
/// Request handed to the child-row callback while validating foreign keys on
/// DELETE or UPDATE of a referenced table.
pub struct ForeignKeyChildRowsFetch<'a> {
    /// Id of the referencing (child) table.
    pub referencing_table_id: TableId,
    /// Canonical name of the referencing table.
    pub referencing_table_canonical: &'a str,
    /// Field ids of the referencing columns whose values should be returned.
    pub referencing_field_ids: &'a [FieldId],
}
72
73#[derive(Clone)]
75pub struct ConstraintService<P>
76where
77 P: Pager<Blob = EntryHandle> + Send + Sync,
78{
79 metadata: Arc<MetadataManager<P>>,
80 catalog: Arc<TableCatalog>,
81 fk_parent_caches: SharedForeignKeyCaches,
82}
83
84impl<P> ConstraintService<P>
85where
86 P: Pager<Blob = EntryHandle> + Send + Sync,
87{
88 pub fn new(metadata: Arc<MetadataManager<P>>, catalog: Arc<TableCatalog>) -> Self {
90 Self {
91 metadata,
92 catalog,
93 fk_parent_caches: Arc::new(RwLock::new(FxHashMap::default())),
94 }
95 }
96
97 pub fn enable_foreign_key_cache(&self, referencing_table_id: TableId) {
99 let mut caches = self
100 .fk_parent_caches
101 .write()
102 .expect("foreign key cache poisoned");
103 caches.entry(referencing_table_id).or_default();
104 }
105
106 pub fn clear_foreign_key_cache(&self, referencing_table_id: TableId) {
108 self.fk_parent_caches
109 .write()
110 .expect("foreign key cache poisoned")
111 .remove(&referencing_table_id);
112 }
113
114 pub fn validate_insert_foreign_keys<F>(
116 &self,
117 referencing_table_id: TableId,
118 schema_field_ids: &[FieldId],
119 column_order: &[usize],
120 rows: &[Vec<PlanValue>],
121 mut fetch_parent_rows: F,
122 ) -> LlkvResult<()>
123 where
124 F: FnMut(ForeignKeyRowFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
125 {
126 if rows.is_empty() {
127 return Ok(());
128 }
129
130 let details = self
131 .metadata
132 .foreign_key_views(self.catalog.as_ref(), referencing_table_id)?;
133
134 if details.is_empty() {
135 return Ok(());
136 }
137
138 let caching_enabled = self.is_foreign_key_cache_enabled(referencing_table_id);
139 let mut parent_key_cache: FxHashMap<ConstraintId, Arc<FxHashSet<UniqueKey>>> =
140 FxHashMap::default();
141 let field_lookup = build_field_lookup(schema_field_ids);
142 let mut table_to_row_index: Vec<Option<usize>> = vec![None; schema_field_ids.len()];
143 for (row_pos, &schema_idx) in column_order.iter().enumerate() {
144 if let Some(slot) = table_to_row_index.get_mut(schema_idx) {
145 *slot = Some(row_pos);
146 }
147 }
148
149 for detail in &details {
150 if detail.referencing_field_ids.is_empty() {
151 continue;
152 }
153
154 let referencing_positions = referencing_row_positions(
155 detail,
156 &field_lookup,
157 &table_to_row_index,
158 referencing_table_id,
159 )?;
160
161 let parent_keys = self.resolve_parent_keys(
162 referencing_table_id,
163 detail,
164 caching_enabled,
165 &mut parent_key_cache,
166 &mut fetch_parent_rows,
167 )?;
168
169 let candidate_keys = candidate_child_keys(detail, &referencing_positions, rows)?;
170
171 validate_foreign_key_rows(
172 detail.constraint_name.as_deref(),
173 &detail.referencing_table_display,
174 &detail.referenced_table_display,
175 &detail.referenced_column_names,
176 parent_keys.as_ref(),
177 &candidate_keys,
178 )?;
179 }
180
181 Ok(())
182 }
183
184 #[allow(clippy::too_many_arguments)]
188 pub fn validate_insert_constraints<FSingle, FMulti>(
189 &self,
190 schema_field_ids: &[FieldId],
191 column_constraints: &[InsertColumnConstraint],
192 unique_columns: &[InsertUniqueColumn],
193 multi_column_uniques: &[InsertMultiColumnUnique],
194 primary_key: Option<&InsertMultiColumnUnique>,
195 column_order: &[usize],
196 rows: &[Vec<PlanValue>],
197 mut fetch_column_values: FSingle,
198 mut fetch_multi_column_rows: FMulti,
199 ) -> LlkvResult<()>
200 where
201 FSingle: FnMut(FieldId) -> LlkvResult<Vec<PlanValue>>,
202 FMulti: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
203 {
204 if rows.is_empty() {
205 return Ok(());
206 }
207
208 let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
209 validate_row_constraints_with_mapping(
210 column_constraints,
211 rows,
212 &schema_to_row_index,
213 column_order,
214 )?;
215
216 for unique in unique_columns {
217 let Some(row_pos) = schema_to_row_index
218 .get(unique.schema_index)
219 .and_then(|opt| *opt)
220 else {
221 continue;
222 };
223
224 let existing_values = fetch_column_values(unique.field_id)?;
225 let mut new_values: Vec<PlanValue> = Vec::with_capacity(rows.len());
226 for row in rows {
227 let value = row.get(row_pos).cloned().unwrap_or(PlanValue::Null);
228 new_values.push(value);
229 }
230
231 ensure_single_column_unique(&existing_values, &new_values, &unique.name)?;
232 }
233
234 for constraint in multi_column_uniques {
235 if constraint.schema_indices.is_empty() {
236 continue;
237 }
238
239 let existing_rows = fetch_multi_column_rows(&constraint.field_ids)?;
240 let existing_keys =
241 rows_to_unique_keys(existing_rows, &constraint.column_names, NullKeyMode::Skip)?;
242 let new_keys = collect_unique_keys_from_rows(
243 rows,
244 &schema_to_row_index,
245 &constraint.schema_indices,
246 &constraint.column_names,
247 NullKeyMode::Skip,
248 )?;
249 ensure_multi_column_unique(&existing_keys, &new_keys, &constraint.column_names)?;
250 }
251
252 if let Some(pk) = primary_key
253 && !pk.schema_indices.is_empty()
254 {
255 let (pk_label, pk_display) = primary_key_context(&pk.column_names);
256 let existing_rows = fetch_multi_column_rows(&pk.field_ids)?;
257 let existing_keys = rows_to_unique_keys(
258 existing_rows,
259 &pk.column_names,
260 NullKeyMode::PrimaryKey {
261 label: pk_label,
262 display: &pk_display,
263 },
264 )?;
265 let new_keys = collect_unique_keys_from_rows(
266 rows,
267 &schema_to_row_index,
268 &pk.schema_indices,
269 &pk.column_names,
270 NullKeyMode::PrimaryKey {
271 label: pk_label,
272 display: &pk_display,
273 },
274 )?;
275 ensure_primary_key(&existing_keys, &new_keys, &pk.column_names)?;
276 }
277
278 Ok(())
279 }
280
281 pub fn validate_row_level_constraints(
284 &self,
285 schema_field_ids: &[FieldId],
286 column_constraints: &[InsertColumnConstraint],
287 column_order: &[usize],
288 rows: &[Vec<PlanValue>],
289 ) -> LlkvResult<()> {
290 if rows.is_empty() {
291 return Ok(());
292 }
293
294 let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
295 validate_row_constraints_with_mapping(
296 column_constraints,
297 rows,
298 &schema_to_row_index,
299 column_order,
300 )
301 }
302
303 pub fn validate_primary_key_rows<F>(
306 &self,
307 schema_field_ids: &[FieldId],
308 primary_key: &InsertMultiColumnUnique,
309 column_order: &[usize],
310 rows: &[Vec<PlanValue>],
311 mut fetch_multi_column_rows: F,
312 ) -> LlkvResult<()>
313 where
314 F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
315 {
316 if rows.is_empty() || primary_key.schema_indices.is_empty() {
317 return Ok(());
318 }
319
320 let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
321 let (pk_label, pk_display) = primary_key_context(&primary_key.column_names);
322 let existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
323 let existing_keys = rows_to_unique_keys(
324 existing_rows,
325 &primary_key.column_names,
326 NullKeyMode::PrimaryKey {
327 label: pk_label,
328 display: &pk_display,
329 },
330 )?;
331 let new_keys = collect_unique_keys_from_rows(
332 rows,
333 &schema_to_row_index,
334 &primary_key.schema_indices,
335 &primary_key.column_names,
336 NullKeyMode::PrimaryKey {
337 label: pk_label,
338 display: &pk_display,
339 },
340 )?;
341 ensure_primary_key(&existing_keys, &new_keys, &primary_key.column_names)
342 }
343
344 pub fn validate_update_primary_keys<F>(
347 &self,
348 schema_field_ids: &[FieldId],
349 primary_key: &InsertMultiColumnUnique,
350 column_order: &[usize],
351 rows: &[Vec<PlanValue>],
352 original_keys: &[Option<UniqueKey>],
353 mut fetch_multi_column_rows: F,
354 ) -> LlkvResult<()>
355 where
356 F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
357 {
358 if rows.is_empty() || primary_key.schema_indices.is_empty() {
359 return Ok(());
360 }
361
362 if original_keys.len() != rows.len() {
363 return Err(Error::Internal(
364 "primary key original value count does not match row count".into(),
365 ));
366 }
367
368 let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
369
370 let (pk_label, pk_display) = primary_key_context(&primary_key.column_names);
371 let existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
372 let existing_key_vec = rows_to_unique_keys(
373 existing_rows,
374 &primary_key.column_names,
375 NullKeyMode::PrimaryKey {
376 label: pk_label,
377 display: &pk_display,
378 },
379 )?;
380 let mut existing_keys: FxHashSet<UniqueKey> = existing_key_vec.into_iter().collect();
381
382 for key in original_keys.iter().flatten() {
383 existing_keys.remove(key);
384 }
385
386 let mut new_seen: FxHashSet<UniqueKey> = FxHashSet::default();
387 let new_keys = collect_unique_keys_from_rows(
388 rows,
389 &schema_to_row_index,
390 &primary_key.schema_indices,
391 &primary_key.column_names,
392 NullKeyMode::PrimaryKey {
393 label: pk_label,
394 display: &pk_display,
395 },
396 )?;
397
398 for key in new_keys {
399 if existing_keys.contains(&key) {
400 return Err(Error::ConstraintError(format!(
401 "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
402 pk_display
403 )));
404 }
405
406 if !new_seen.insert(key.clone()) {
407 return Err(Error::ConstraintError(format!(
408 "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
409 pk_display
410 )));
411 }
412
413 existing_keys.insert(key);
414 }
415
416 Ok(())
417 }
418
419 pub fn validate_delete_foreign_keys<FParents, FChildren>(
421 &self,
422 referenced_table_id: TableId,
423 referenced_row_ids: &Treemap,
424 mut fetch_parent_rows: FParents,
425 mut fetch_child_rows: FChildren,
426 ) -> LlkvResult<()>
427 where
428 FParents: FnMut(ForeignKeyParentRowsFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
429 FChildren: FnMut(ForeignKeyChildRowsFetch<'_>) -> LlkvResult<Vec<(RowId, Vec<PlanValue>)>>,
430 {
431 if referenced_row_ids.is_empty() {
432 return Ok(());
433 }
434
435 let referencing = self
436 .metadata
437 .foreign_keys_referencing(referenced_table_id)?;
438 if referencing.is_empty() {
439 return Ok(());
440 }
441
442 for (child_table_id, constraint_id) in referencing {
443 let details = self
444 .metadata
445 .foreign_key_views(self.catalog.as_ref(), child_table_id)?;
446
447 let Some(detail) = details
448 .into_iter()
449 .find(|detail| detail.constraint_id == constraint_id)
450 else {
451 continue;
452 };
453
454 if detail.referenced_field_ids.is_empty() || detail.referencing_field_ids.is_empty() {
455 continue;
456 }
457
458 let parent_rows = fetch_parent_rows(ForeignKeyParentRowsFetch {
459 referenced_table_id,
460 referenced_row_ids,
461 referenced_field_ids: &detail.referenced_field_ids,
462 })?;
463
464 let parent_keys = canonical_parent_keys(&detail, parent_rows)?;
465 if parent_keys.is_empty() {
466 continue;
467 }
468
469 let child_rows = fetch_child_rows(ForeignKeyChildRowsFetch {
470 referencing_table_id: detail.referencing_table_id,
471 referencing_table_canonical: &detail.referencing_table_canonical,
472 referencing_field_ids: &detail.referencing_field_ids,
473 })?;
474
475 if child_rows.is_empty() {
476 continue;
477 }
478
479 for (child_row_id, values) in child_rows {
480 if values.len() != detail.referencing_field_ids.len() {
481 continue;
482 }
483
484 let child_key =
485 build_composite_unique_key(&values, &detail.referencing_column_names)?;
486 let Some(child_key) = child_key else { continue };
487
488 if !parent_keys.contains(&child_key) {
489 continue;
490 }
491
492 if detail.referencing_table_id == detail.referenced_table_id
493 && referenced_row_ids.contains(child_row_id)
494 {
495 continue;
496 }
497
498 let constraint_label = detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
499 match detail.on_delete {
500 ForeignKeyAction::NoAction | ForeignKeyAction::Restrict => {
501 return Err(Error::ConstraintError(format!(
502 "Violates foreign key constraint '{}' on table '{}' referencing '{}' - row is still referenced by a foreign key in a different table",
503 constraint_label,
504 detail.referencing_table_display,
505 detail.referenced_table_display,
506 )));
507 }
508 }
509 }
510 }
511
512 Ok(())
513 }
514
515 pub fn validate_update_foreign_keys<FParents, FChildren>(
520 &self,
521 referenced_table_id: TableId,
522 referenced_row_ids: &Treemap,
523 updated_field_ids: &[FieldId],
524 mut fetch_parent_rows: FParents,
525 mut fetch_child_rows: FChildren,
526 ) -> LlkvResult<()>
527 where
528 FParents: FnMut(ForeignKeyParentRowsFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
529 FChildren: FnMut(ForeignKeyChildRowsFetch<'_>) -> LlkvResult<Vec<(RowId, Vec<PlanValue>)>>,
530 {
531 if referenced_row_ids.is_empty() || updated_field_ids.is_empty() {
532 return Ok(());
533 }
534
535 let referencing = self
536 .metadata
537 .foreign_keys_referencing(referenced_table_id)?;
538 if referencing.is_empty() {
539 return Ok(());
540 }
541
542 for (child_table_id, constraint_id) in referencing {
543 let details = self
544 .metadata
545 .foreign_key_views(self.catalog.as_ref(), child_table_id)?;
546
547 let Some(detail) = details
548 .into_iter()
549 .find(|detail| detail.constraint_id == constraint_id)
550 else {
551 continue;
552 };
553
554 if detail.referenced_field_ids.is_empty() || detail.referencing_field_ids.is_empty() {
555 continue;
556 }
557
558 let is_referenced_column_updated = detail
560 .referenced_field_ids
561 .iter()
562 .any(|fid| updated_field_ids.contains(fid));
563
564 if !is_referenced_column_updated {
565 continue;
567 }
568
569 let parent_rows = fetch_parent_rows(ForeignKeyParentRowsFetch {
571 referenced_table_id,
572 referenced_row_ids,
573 referenced_field_ids: &detail.referenced_field_ids,
574 })?;
575
576 let parent_keys = canonical_parent_keys(&detail, parent_rows)?;
577 if parent_keys.is_empty() {
578 continue;
579 }
580
581 let child_rows = fetch_child_rows(ForeignKeyChildRowsFetch {
583 referencing_table_id: detail.referencing_table_id,
584 referencing_table_canonical: &detail.referencing_table_canonical,
585 referencing_field_ids: &detail.referencing_field_ids,
586 })?;
587
588 if child_rows.is_empty() {
589 continue;
590 }
591
592 for (_child_row_id, values) in child_rows {
594 if values.len() != detail.referencing_field_ids.len() {
595 continue;
596 }
597
598 let child_key =
599 build_composite_unique_key(&values, &detail.referencing_column_names)?;
600 let Some(child_key) = child_key else { continue };
601
602 if parent_keys.contains(&child_key) {
604 let constraint_label =
605 detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
606 return Err(Error::ConstraintError(format!(
607 "Violates foreign key constraint '{}' on table '{}' referencing '{}' - cannot update referenced column while foreign key exists",
608 constraint_label,
609 detail.referencing_table_display,
610 detail.referenced_table_display,
611 )));
612 }
613 }
614 }
615
616 Ok(())
617 }
618
619 pub fn referencing_foreign_keys(
621 &self,
622 referenced_table_id: TableId,
623 ) -> LlkvResult<Vec<ForeignKeyView>> {
624 let referencing = self
625 .metadata
626 .foreign_keys_referencing(referenced_table_id)?;
627
628 if referencing.is_empty() {
629 return Ok(Vec::new());
630 }
631
632 let mut details_out = Vec::new();
633 for (child_table_id, constraint_id) in referencing {
634 let details = match self
635 .metadata
636 .foreign_key_views(self.catalog.as_ref(), child_table_id)
637 {
638 Ok(details) => details,
639 Err(Error::InvalidArgumentError(_)) | Err(Error::CatalogError(_)) => continue,
640 Err(err) => return Err(err),
641 };
642
643 if let Some(detail) = details
644 .into_iter()
645 .find(|detail| detail.constraint_id == constraint_id)
646 {
647 details_out.push(detail);
648 }
649 }
650
651 Ok(details_out)
652 }
653
654 fn is_foreign_key_cache_enabled(&self, table_id: TableId) -> bool {
655 self.fk_parent_caches
656 .read()
657 .expect("foreign key cache poisoned")
658 .contains_key(&table_id)
659 }
660
661 fn cached_parent_keys(
662 &self,
663 table_id: TableId,
664 constraint_id: ConstraintId,
665 ) -> Option<Arc<FxHashSet<UniqueKey>>> {
666 self.fk_parent_caches
667 .read()
668 .expect("foreign key cache poisoned")
669 .get(&table_id)
670 .and_then(|cache| cache.get(&constraint_id).map(Arc::clone))
671 }
672
673 fn store_cached_parent_keys(
674 &self,
675 table_id: TableId,
676 constraint_id: ConstraintId,
677 keys: FxHashSet<UniqueKey>,
678 ) -> Arc<FxHashSet<UniqueKey>> {
679 let mut caches = self
680 .fk_parent_caches
681 .write()
682 .expect("foreign key cache poisoned");
683 let Some(entry) = caches.get_mut(&table_id) else {
684 return Arc::new(keys);
685 };
686
687 let arc_keys = Arc::new(keys);
688 entry.insert(constraint_id, Arc::clone(&arc_keys));
689 arc_keys
690 }
691
692 fn resolve_parent_keys<F>(
693 &self,
694 referencing_table_id: TableId,
695 detail: &ForeignKeyView,
696 caching_enabled: bool,
697 local_cache: &mut FxHashMap<ConstraintId, Arc<FxHashSet<UniqueKey>>>,
698 fetch_parent_rows: &mut F,
699 ) -> LlkvResult<Arc<FxHashSet<UniqueKey>>>
700 where
701 F: FnMut(ForeignKeyRowFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
702 {
703 if caching_enabled {
704 if let Some(keys) = self.cached_parent_keys(referencing_table_id, detail.constraint_id)
705 {
706 return Ok(keys);
707 }
708 } else if let Some(keys) = local_cache.get(&detail.constraint_id) {
709 return Ok(Arc::clone(keys));
710 }
711
712 let parent_rows = fetch_parent_rows(ForeignKeyRowFetch {
713 referenced_table_id: detail.referenced_table_id,
714 referenced_table_canonical: &detail.referenced_table_canonical,
715 referenced_field_ids: &detail.referenced_field_ids,
716 })?;
717
718 let key_set = canonical_parent_keys(detail, parent_rows)?;
719
720 if caching_enabled {
721 Ok(self.store_cached_parent_keys(referencing_table_id, detail.constraint_id, key_set))
722 } else {
723 let arc_keys = Arc::new(key_set);
724 local_cache.insert(detail.constraint_id, Arc::clone(&arc_keys));
725 Ok(arc_keys)
726 }
727 }
728}
729
730fn build_field_lookup(schema_field_ids: &[FieldId]) -> FxHashMap<FieldId, usize> {
731 let mut lookup = FxHashMap::default();
732 for (idx, field_id) in schema_field_ids.iter().copied().enumerate() {
733 lookup.insert(field_id, idx);
734 }
735 lookup
736}
737
738fn validate_row_constraints_with_mapping(
739 column_constraints: &[InsertColumnConstraint],
740 rows: &[Vec<PlanValue>],
741 schema_to_row_index: &[Option<usize>],
742 column_order: &[usize],
743) -> LlkvResult<()> {
744 for constraint in column_constraints {
745 if constraint.column.nullable {
746 continue;
747 }
748
749 let Some(row_pos) = schema_to_row_index
750 .get(constraint.schema_index)
751 .and_then(|opt| *opt)
752 else {
753 return Err(Error::ConstraintError(format!(
754 "NOT NULL column '{}' missing from INSERT/UPDATE",
755 constraint.column.name
756 )));
757 };
758
759 for row in rows {
760 if matches!(row.get(row_pos), Some(PlanValue::Null)) {
761 return Err(Error::ConstraintError(format!(
762 "NOT NULL constraint failed for column '{}'",
763 constraint.column.name
764 )));
765 }
766 }
767 }
768
769 let check_columns: Vec<ConstraintColumnInfo> = column_constraints
770 .iter()
771 .map(|constraint| constraint.column.clone())
772 .collect();
773 validate_check_constraints(check_columns.as_slice(), rows, column_order)?;
774 Ok(())
775}
776
777fn build_schema_to_row_index(
778 schema_len: usize,
779 column_order: &[usize],
780) -> LlkvResult<Vec<Option<usize>>> {
781 let mut schema_to_row_index: Vec<Option<usize>> = vec![None; schema_len];
782 for (row_pos, &schema_idx) in column_order.iter().enumerate() {
783 if schema_idx >= schema_len {
784 return Err(Error::Internal(format!(
785 "column index {} out of bounds for schema (len={})",
786 schema_idx, schema_len
787 )));
788 }
789 schema_to_row_index[schema_idx] = Some(row_pos);
790 }
791 Ok(schema_to_row_index)
792}
793
/// Returns the noun ("column" or "columns") and the display text used in
/// primary key error messages.
///
/// A single name is shown as-is; any other number of names (including none)
/// is joined with ", " and labelled "columns".
fn primary_key_context(column_names: &[String]) -> (&'static str, String) {
    match column_names {
        [only] => ("column", only.clone()),
        names => ("columns", names.join(", ")),
    }
}
801
/// Selects what happens when `build_composite_unique_key` yields no key for a
/// row (presumably because one of the values is NULL; confirm against the helper).
#[derive(Clone, Copy)]
enum NullKeyMode<'a> {
    /// Leave the row out of the produced key list without raising an error.
    Skip,
    /// Reject the row with a NOT NULL error that names the primary key.
    PrimaryKey {
        /// Noun placed in the error message ("column" or "columns").
        label: &'static str,
        /// Column name(s) shown in the error message.
        display: &'a str,
    },
}
810
811fn rows_to_unique_keys<'a>(
812 rows: Vec<Vec<PlanValue>>,
813 column_names: &[String],
814 mode: NullKeyMode<'a>,
815) -> LlkvResult<Vec<UniqueKey>> {
816 let mut keys = Vec::with_capacity(rows.len());
817 for values in rows {
818 push_unique_key(&mut keys, &values, column_names, mode)?;
819 }
820 Ok(keys)
821}
822
823fn collect_unique_keys_from_rows<'a>(
824 rows: &[Vec<PlanValue>],
825 schema_to_row_index: &[Option<usize>],
826 schema_indices: &[usize],
827 column_names: &[String],
828 mode: NullKeyMode<'a>,
829) -> LlkvResult<Vec<UniqueKey>> {
830 if schema_indices.is_empty() {
831 return Ok(Vec::new());
832 }
833
834 let mut keys = Vec::with_capacity(rows.len());
835 let mut buffer = Vec::with_capacity(schema_indices.len());
836 for row in rows {
837 buffer.clear();
838 for &schema_idx in schema_indices {
839 let value = schema_to_row_index
840 .get(schema_idx)
841 .and_then(|opt| *opt)
842 .and_then(|row_pos| row.get(row_pos).cloned())
843 .unwrap_or(PlanValue::Null);
844 buffer.push(value);
845 }
846
847 push_unique_key(&mut keys, &buffer, column_names, mode)?;
848 }
849
850 Ok(keys)
851}
852
853fn push_unique_key<'a>(
854 keys: &mut Vec<UniqueKey>,
855 values: &[PlanValue],
856 column_names: &[String],
857 mode: NullKeyMode<'a>,
858) -> LlkvResult<()> {
859 match build_composite_unique_key(values, column_names)? {
860 Some(key) => {
861 keys.push(key);
862 Ok(())
863 }
864 None => match mode {
865 NullKeyMode::Skip => Ok(()),
866 NullKeyMode::PrimaryKey { label, display } => Err(Error::ConstraintError(format!(
867 "constraint failed: NOT NULL constraint failed for PRIMARY KEY {label} '{display}'"
868 ))),
869 },
870 }
871}
872
873fn referencing_row_positions(
874 detail: &ForeignKeyView,
875 lookup: &FxHashMap<FieldId, usize>,
876 table_to_row_index: &[Option<usize>],
877 table_id: TableId,
878) -> LlkvResult<Vec<usize>> {
879 let mut positions = Vec::with_capacity(detail.referencing_field_ids.len());
880
881 for (idx, field_id) in detail.referencing_field_ids.iter().cloned().enumerate() {
882 let schema_index = lookup.get(&field_id).cloned().ok_or_else(|| {
883 Error::Internal(format!(
884 "referencing field id {} not found in table '{}' (table_id={})",
885 field_id, detail.referencing_table_display, table_id
886 ))
887 })?;
888
889 let position = table_to_row_index
890 .get(schema_index)
891 .and_then(|value| *value)
892 .ok_or_else(|| {
893 let column_name = detail
894 .referencing_column_names
895 .get(idx)
896 .cloned()
897 .unwrap_or_else(|| schema_index.to_string());
898 Error::InvalidArgumentError(format!(
899 "FOREIGN KEY column '{}' missing from INSERT statement",
900 column_name
901 ))
902 })?;
903
904 positions.push(position);
905 }
906
907 Ok(positions)
908}
909
910fn canonical_parent_keys(
911 detail: &ForeignKeyView,
912 parent_rows: Vec<Vec<PlanValue>>,
913) -> LlkvResult<FxHashSet<UniqueKey>> {
914 let mut keys = FxHashSet::default();
915 for values in parent_rows {
916 if values.len() != detail.referenced_field_ids.len() {
917 continue;
918 }
919
920 let key = build_composite_unique_key(&values, &detail.referenced_column_names)?;
921 if let Some(key) = key {
922 keys.insert(key);
923 }
924 }
925
926 Ok(keys)
927}
928
929fn candidate_child_keys(
930 detail: &ForeignKeyView,
931 positions: &[usize],
932 rows: &[Vec<PlanValue>],
933) -> LlkvResult<Vec<UniqueKey>> {
934 let mut keys = Vec::new();
935
936 for row in rows {
937 let mut values: Vec<PlanValue> = Vec::with_capacity(positions.len());
938
939 for &row_pos in positions {
940 let value = row.get(row_pos).cloned().ok_or_else(|| {
941 Error::InvalidArgumentError("INSERT row is missing a required column value".into())
942 })?;
943 values.push(value);
944 }
945
946 let key = build_composite_unique_key(&values, &detail.referencing_column_names)?;
947 if let Some(key) = key {
948 keys.push(key);
949 }
950 }
951
952 Ok(keys)
953}