1#![forbid(unsafe_code)]
6
7use super::types::ForeignKeyAction;
8use super::validation::validate_foreign_key_rows;
9use super::validation::{
10 ConstraintColumnInfo, UniqueKey, build_composite_unique_key, ensure_multi_column_unique,
11 ensure_primary_key, ensure_single_column_unique, validate_check_constraints,
12};
13use crate::catalog::TableCatalog;
14use crate::metadata::MetadataManager;
15use crate::types::{FieldId, RowId, TableId};
16use crate::view::ForeignKeyView;
17use llkv_plan::PlanValue;
18use llkv_result::{Error, Result as LlkvResult};
19use llkv_storage::pager::Pager;
20use rustc_hash::{FxHashMap, FxHashSet};
21use simd_r_drive_entry_handle::EntryHandle;
22use std::sync::Arc;
23
24#[derive(Clone, Debug)]
26pub struct InsertColumnConstraint {
27 pub schema_index: usize,
28 pub column: ConstraintColumnInfo,
29}
30
31#[derive(Clone, Debug)]
33pub struct InsertUniqueColumn {
34 pub schema_index: usize,
35 pub field_id: FieldId,
36 pub name: String,
37}
38
39#[derive(Clone, Debug)]
41pub struct InsertMultiColumnUnique {
42 pub schema_indices: Vec<usize>,
43 pub field_ids: Vec<FieldId>,
44 pub column_names: Vec<String>,
45}
46
47pub struct ForeignKeyRowFetch<'a> {
49 pub referenced_table_id: TableId,
50 pub referenced_table_canonical: &'a str,
51 pub referenced_field_ids: &'a [FieldId],
52}
53
54pub struct ForeignKeyParentRowsFetch<'a> {
56 pub referenced_table_id: TableId,
57 pub referenced_row_ids: &'a [RowId],
58 pub referenced_field_ids: &'a [FieldId],
59}
60
61pub struct ForeignKeyChildRowsFetch<'a> {
63 pub referencing_table_id: TableId,
64 pub referencing_table_canonical: &'a str,
65 pub referencing_field_ids: &'a [FieldId],
66}
67
68#[derive(Clone)]
70pub struct ConstraintService<P>
71where
72 P: Pager<Blob = EntryHandle> + Send + Sync,
73{
74 metadata: Arc<MetadataManager<P>>,
75 catalog: Arc<TableCatalog>,
76}
77
78impl<P> ConstraintService<P>
79where
80 P: Pager<Blob = EntryHandle> + Send + Sync,
81{
82 pub fn new(metadata: Arc<MetadataManager<P>>, catalog: Arc<TableCatalog>) -> Self {
83 Self { metadata, catalog }
84 }
85
86 pub fn validate_insert_foreign_keys<F>(
88 &self,
89 referencing_table_id: TableId,
90 schema_field_ids: &[FieldId],
91 column_order: &[usize],
92 rows: &[Vec<PlanValue>],
93 mut fetch_parent_rows: F,
94 ) -> LlkvResult<()>
95 where
96 F: FnMut(ForeignKeyRowFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
97 {
98 if rows.is_empty() {
99 return Ok(());
100 }
101
102 let details = self
103 .metadata
104 .foreign_key_views(self.catalog.as_ref(), referencing_table_id)?;
105
106 if details.is_empty() {
107 return Ok(());
108 }
109
110 let field_lookup = build_field_lookup(schema_field_ids);
111 let mut table_to_row_index: Vec<Option<usize>> = vec![None; schema_field_ids.len()];
112 for (row_pos, &schema_idx) in column_order.iter().enumerate() {
113 if let Some(slot) = table_to_row_index.get_mut(schema_idx) {
114 *slot = Some(row_pos);
115 }
116 }
117
118 for detail in &details {
119 if detail.referencing_field_ids.is_empty() {
120 continue;
121 }
122
123 let referencing_positions = referencing_row_positions(
124 detail,
125 &field_lookup,
126 &table_to_row_index,
127 referencing_table_id,
128 )?;
129
130 let parent_rows = fetch_parent_rows(ForeignKeyRowFetch {
131 referenced_table_id: detail.referenced_table_id,
132 referenced_table_canonical: &detail.referenced_table_canonical,
133 referenced_field_ids: &detail.referenced_field_ids,
134 })?;
135
136 let parent_keys = canonical_parent_keys(detail, parent_rows);
137 let candidate_keys = candidate_child_keys(&referencing_positions, rows)?;
138
139 validate_foreign_key_rows(
140 detail.constraint_name.as_deref(),
141 &detail.referencing_table_display,
142 &detail.referenced_table_display,
143 &detail.referenced_column_names,
144 &parent_keys,
145 &candidate_keys,
146 )?;
147 }
148
149 Ok(())
150 }
151
152 #[allow(clippy::too_many_arguments)]
153 pub fn validate_insert_constraints<FSingle, FMulti>(
154 &self,
155 schema_field_ids: &[FieldId],
156 column_constraints: &[InsertColumnConstraint],
157 unique_columns: &[InsertUniqueColumn],
158 multi_column_uniques: &[InsertMultiColumnUnique],
159 primary_key: Option<&InsertMultiColumnUnique>,
160 column_order: &[usize],
161 rows: &[Vec<PlanValue>],
162 mut fetch_column_values: FSingle,
163 mut fetch_multi_column_rows: FMulti,
164 ) -> LlkvResult<()>
165 where
166 FSingle: FnMut(FieldId) -> LlkvResult<Vec<PlanValue>>,
167 FMulti: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
168 {
169 if rows.is_empty() {
170 return Ok(());
171 }
172
173 let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
174 validate_row_constraints_with_mapping(
175 column_constraints,
176 rows,
177 &schema_to_row_index,
178 column_order,
179 )?;
180
181 for unique in unique_columns {
182 let Some(row_pos) = schema_to_row_index
183 .get(unique.schema_index)
184 .and_then(|opt| *opt)
185 else {
186 continue;
187 };
188
189 let existing_values = fetch_column_values(unique.field_id)?;
190 let mut new_values: Vec<PlanValue> = Vec::with_capacity(rows.len());
191 for row in rows {
192 let value = row.get(row_pos).cloned().unwrap_or(PlanValue::Null);
193 new_values.push(value);
194 }
195
196 ensure_single_column_unique(&existing_values, &new_values, &unique.name)?;
197 }
198
199 for constraint in multi_column_uniques {
200 if constraint.schema_indices.is_empty() {
201 continue;
202 }
203
204 let existing_rows = fetch_multi_column_rows(&constraint.field_ids)?;
205 let new_rows = collect_row_sets(rows, &schema_to_row_index, &constraint.schema_indices);
206 ensure_multi_column_unique(&existing_rows, &new_rows, &constraint.column_names)?;
207 }
208
209 if let Some(pk) = primary_key
210 && !pk.schema_indices.is_empty()
211 {
212 let existing_rows = fetch_multi_column_rows(&pk.field_ids)?;
213 let new_rows = collect_row_sets(rows, &schema_to_row_index, &pk.schema_indices);
214 ensure_primary_key(&existing_rows, &new_rows, &pk.column_names)?;
215 }
216
217 Ok(())
218 }
219
220 pub fn validate_row_level_constraints(
221 &self,
222 schema_field_ids: &[FieldId],
223 column_constraints: &[InsertColumnConstraint],
224 column_order: &[usize],
225 rows: &[Vec<PlanValue>],
226 ) -> LlkvResult<()> {
227 if rows.is_empty() {
228 return Ok(());
229 }
230
231 let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
232 validate_row_constraints_with_mapping(
233 column_constraints,
234 rows,
235 &schema_to_row_index,
236 column_order,
237 )
238 }
239
240 pub fn validate_primary_key_rows<F>(
241 &self,
242 schema_field_ids: &[FieldId],
243 primary_key: &InsertMultiColumnUnique,
244 column_order: &[usize],
245 rows: &[Vec<PlanValue>],
246 mut fetch_multi_column_rows: F,
247 ) -> LlkvResult<()>
248 where
249 F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
250 {
251 if rows.is_empty() || primary_key.schema_indices.is_empty() {
252 return Ok(());
253 }
254
255 let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
256 let existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
257 let new_rows = collect_row_sets(rows, &schema_to_row_index, &primary_key.schema_indices);
258 ensure_primary_key(&existing_rows, &new_rows, &primary_key.column_names)
259 }
260
261 pub fn validate_update_primary_keys<F>(
262 &self,
263 schema_field_ids: &[FieldId],
264 primary_key: &InsertMultiColumnUnique,
265 column_order: &[usize],
266 rows: &[Vec<PlanValue>],
267 original_keys: &[Option<UniqueKey>],
268 mut fetch_multi_column_rows: F,
269 ) -> LlkvResult<()>
270 where
271 F: FnMut(&[FieldId]) -> LlkvResult<Vec<Vec<PlanValue>>>,
272 {
273 if rows.is_empty() || primary_key.schema_indices.is_empty() {
274 return Ok(());
275 }
276
277 if original_keys.len() != rows.len() {
278 return Err(Error::Internal(
279 "primary key original value count does not match row count".into(),
280 ));
281 }
282
283 let schema_to_row_index = build_schema_to_row_index(schema_field_ids.len(), column_order)?;
284
285 let mut existing_rows = fetch_multi_column_rows(&primary_key.field_ids)?;
286 let mut existing_keys: FxHashSet<UniqueKey> = FxHashSet::default();
287 for row_values in existing_rows.drain(..) {
288 if let Some(key) = build_composite_unique_key(&row_values, &primary_key.column_names)? {
289 existing_keys.insert(key);
290 }
291 }
292
293 for key in original_keys.iter().flatten() {
294 existing_keys.remove(key);
295 }
296
297 let (pk_label, pk_display) = primary_key_context(&primary_key.column_names);
298 let mut new_seen: FxHashSet<UniqueKey> = FxHashSet::default();
299 let new_row_sets =
300 collect_row_sets(rows, &schema_to_row_index, &primary_key.schema_indices);
301
302 for values in new_row_sets {
303 let key = build_composite_unique_key(&values, &primary_key.column_names)?;
304 let key = key.ok_or_else(|| {
305 Error::ConstraintError(format!(
306 "constraint failed: NOT NULL constraint failed for PRIMARY KEY {pk_label} '{pk_display}'"
307 ))
308 })?;
309
310 if existing_keys.contains(&key) {
311 return Err(Error::ConstraintError(format!(
312 "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
313 pk_display
314 )));
315 }
316
317 if !new_seen.insert(key.clone()) {
318 return Err(Error::ConstraintError(format!(
319 "Duplicate key violates primary key constraint on {pk_label} '{}' (PRIMARY KEY or UNIQUE constraint violation)",
320 pk_display
321 )));
322 }
323
324 existing_keys.insert(key);
325 }
326
327 Ok(())
328 }
329
330 pub fn validate_delete_foreign_keys<FParents, FChildren>(
332 &self,
333 referenced_table_id: TableId,
334 referenced_row_ids: &[RowId],
335 mut fetch_parent_rows: FParents,
336 mut fetch_child_rows: FChildren,
337 ) -> LlkvResult<()>
338 where
339 FParents: FnMut(ForeignKeyParentRowsFetch<'_>) -> LlkvResult<Vec<Vec<PlanValue>>>,
340 FChildren: FnMut(ForeignKeyChildRowsFetch<'_>) -> LlkvResult<Vec<(RowId, Vec<PlanValue>)>>,
341 {
342 if referenced_row_ids.is_empty() {
343 return Ok(());
344 }
345
346 let referencing = self
347 .metadata
348 .foreign_keys_referencing(referenced_table_id)?;
349 if referencing.is_empty() {
350 return Ok(());
351 }
352
353 let deleting_row_ids: FxHashSet<RowId> = referenced_row_ids.iter().copied().collect();
354
355 for (child_table_id, constraint_id) in referencing {
356 let details = self
357 .metadata
358 .foreign_key_views(self.catalog.as_ref(), child_table_id)?;
359
360 let Some(detail) = details
361 .into_iter()
362 .find(|detail| detail.constraint_id == constraint_id)
363 else {
364 continue;
365 };
366
367 if detail.referenced_field_ids.is_empty() || detail.referencing_field_ids.is_empty() {
368 continue;
369 }
370
371 let parent_rows = fetch_parent_rows(ForeignKeyParentRowsFetch {
372 referenced_table_id,
373 referenced_row_ids,
374 referenced_field_ids: &detail.referenced_field_ids,
375 })?;
376
377 let parent_keys = canonical_parent_keys(&detail, parent_rows);
378 if parent_keys.is_empty() {
379 continue;
380 }
381
382 let child_rows = fetch_child_rows(ForeignKeyChildRowsFetch {
383 referencing_table_id: detail.referencing_table_id,
384 referencing_table_canonical: &detail.referencing_table_canonical,
385 referencing_field_ids: &detail.referencing_field_ids,
386 })?;
387
388 if child_rows.is_empty() {
389 continue;
390 }
391
392 for (child_row_id, values) in child_rows {
393 if values.len() != detail.referencing_field_ids.len() {
394 continue;
395 }
396
397 if values.iter().any(|value| matches!(value, PlanValue::Null)) {
398 continue;
399 }
400
401 if parent_keys.iter().all(|key| key != &values) {
402 continue;
403 }
404
405 if detail.referencing_table_id == detail.referenced_table_id
406 && deleting_row_ids.contains(&child_row_id)
407 {
408 continue;
409 }
410
411 let constraint_label = detail.constraint_name.as_deref().unwrap_or("FOREIGN KEY");
412 match detail.on_delete {
413 ForeignKeyAction::NoAction | ForeignKeyAction::Restrict => {
414 return Err(Error::ConstraintError(format!(
415 "Violates foreign key constraint '{}' on table '{}' referencing '{}' - row is still referenced by a foreign key in a different table",
416 constraint_label,
417 detail.referencing_table_display,
418 detail.referenced_table_display,
419 )));
420 }
421 }
422 }
423 }
424
425 Ok(())
426 }
427
428 pub fn referencing_foreign_keys(
430 &self,
431 referenced_table_id: TableId,
432 ) -> LlkvResult<Vec<ForeignKeyView>> {
433 let referencing = self
434 .metadata
435 .foreign_keys_referencing(referenced_table_id)?;
436
437 if referencing.is_empty() {
438 return Ok(Vec::new());
439 }
440
441 let mut details_out = Vec::new();
442 for (child_table_id, constraint_id) in referencing {
443 let details = match self
444 .metadata
445 .foreign_key_views(self.catalog.as_ref(), child_table_id)
446 {
447 Ok(details) => details,
448 Err(Error::InvalidArgumentError(_)) | Err(Error::CatalogError(_)) => continue,
449 Err(err) => return Err(err),
450 };
451
452 if let Some(detail) = details
453 .into_iter()
454 .find(|detail| detail.constraint_id == constraint_id)
455 {
456 details_out.push(detail);
457 }
458 }
459
460 Ok(details_out)
461 }
462}
463
464fn build_field_lookup(schema_field_ids: &[FieldId]) -> FxHashMap<FieldId, usize> {
465 let mut lookup = FxHashMap::default();
466 for (idx, field_id) in schema_field_ids.iter().copied().enumerate() {
467 lookup.insert(field_id, idx);
468 }
469 lookup
470}
471
472fn validate_row_constraints_with_mapping(
473 column_constraints: &[InsertColumnConstraint],
474 rows: &[Vec<PlanValue>],
475 schema_to_row_index: &[Option<usize>],
476 column_order: &[usize],
477) -> LlkvResult<()> {
478 for constraint in column_constraints {
479 if constraint.column.nullable {
480 continue;
481 }
482
483 let Some(row_pos) = schema_to_row_index
484 .get(constraint.schema_index)
485 .and_then(|opt| *opt)
486 else {
487 return Err(Error::ConstraintError(format!(
488 "NOT NULL column '{}' missing from INSERT/UPDATE",
489 constraint.column.name
490 )));
491 };
492
493 for row in rows {
494 if matches!(row.get(row_pos), Some(PlanValue::Null)) {
495 return Err(Error::ConstraintError(format!(
496 "NOT NULL constraint failed for column '{}'",
497 constraint.column.name
498 )));
499 }
500 }
501 }
502
503 let check_columns: Vec<ConstraintColumnInfo> = column_constraints
504 .iter()
505 .map(|constraint| constraint.column.clone())
506 .collect();
507 validate_check_constraints(check_columns.as_slice(), rows, column_order)?;
508 Ok(())
509}
510
511fn build_schema_to_row_index(
512 schema_len: usize,
513 column_order: &[usize],
514) -> LlkvResult<Vec<Option<usize>>> {
515 let mut schema_to_row_index: Vec<Option<usize>> = vec![None; schema_len];
516 for (row_pos, &schema_idx) in column_order.iter().enumerate() {
517 if schema_idx >= schema_len {
518 return Err(Error::Internal(format!(
519 "column index {} out of bounds for schema (len={})",
520 schema_idx, schema_len
521 )));
522 }
523 schema_to_row_index[schema_idx] = Some(row_pos);
524 }
525 Ok(schema_to_row_index)
526}
527
/// Produces the noun (`"column"` or `"columns"`) and the display text used in
/// primary key error messages.
///
/// A single name is returned as is; any other number of names (including
/// none) is joined with `", "` and labelled `"columns"`.
fn primary_key_context(column_names: &[String]) -> (&'static str, String) {
    match column_names {
        [only] => ("column", only.clone()),
        names => ("columns", names.join(", ")),
    }
}
535
536fn collect_row_sets(
537 rows: &[Vec<PlanValue>],
538 schema_to_row_index: &[Option<usize>],
539 schema_indices: &[usize],
540) -> Vec<Vec<PlanValue>> {
541 rows.iter()
542 .map(|row| {
543 schema_indices
544 .iter()
545 .map(|&schema_idx| {
546 schema_to_row_index
547 .get(schema_idx)
548 .and_then(|opt| {
549 opt.map(|row_pos| row.get(row_pos).cloned().unwrap_or(PlanValue::Null))
550 })
551 .unwrap_or(PlanValue::Null)
552 })
553 .collect()
554 })
555 .collect()
556}
557
558fn referencing_row_positions(
559 detail: &ForeignKeyView,
560 lookup: &FxHashMap<FieldId, usize>,
561 table_to_row_index: &[Option<usize>],
562 table_id: TableId,
563) -> LlkvResult<Vec<usize>> {
564 let mut positions = Vec::with_capacity(detail.referencing_field_ids.len());
565
566 for (idx, field_id) in detail.referencing_field_ids.iter().cloned().enumerate() {
567 let schema_index = lookup.get(&field_id).cloned().ok_or_else(|| {
568 Error::Internal(format!(
569 "referencing field id {} not found in table '{}' (table_id={})",
570 field_id, detail.referencing_table_display, table_id
571 ))
572 })?;
573
574 let position = table_to_row_index
575 .get(schema_index)
576 .and_then(|value| *value)
577 .ok_or_else(|| {
578 let column_name = detail
579 .referencing_column_names
580 .get(idx)
581 .cloned()
582 .unwrap_or_else(|| schema_index.to_string());
583 Error::InvalidArgumentError(format!(
584 "FOREIGN KEY column '{}' missing from INSERT statement",
585 column_name
586 ))
587 })?;
588
589 positions.push(position);
590 }
591
592 Ok(positions)
593}
594
595fn canonical_parent_keys(
596 detail: &ForeignKeyView,
597 parent_rows: Vec<Vec<PlanValue>>,
598) -> Vec<Vec<PlanValue>> {
599 parent_rows
600 .into_iter()
601 .filter(|values| values.len() == detail.referenced_field_ids.len())
602 .filter(|values| !values.iter().any(|value| matches!(value, PlanValue::Null)))
603 .collect()
604}
605
606fn candidate_child_keys(
607 positions: &[usize],
608 rows: &[Vec<PlanValue>],
609) -> LlkvResult<Vec<Vec<PlanValue>>> {
610 let mut keys = Vec::new();
611
612 for row in rows {
613 let mut key: Vec<PlanValue> = Vec::with_capacity(positions.len());
614 let mut contains_null = false;
615
616 for &row_pos in positions {
617 let value = row.get(row_pos).cloned().ok_or_else(|| {
618 Error::InvalidArgumentError("INSERT row is missing a required column value".into())
619 })?;
620
621 if matches!(value, PlanValue::Null) {
622 contains_null = true;
623 break;
624 }
625
626 key.push(value);
627 }
628
629 if contains_null {
630 continue;
631 }
632
633 keys.push(key);
634 }
635
636 Ok(keys)
637}