mod index;
mod relation;
use crate::db::schema::{FieldId, PersistedFieldSnapshot, SchemaRowLayout, SchemaVersion};
pub(in crate::db::schema) use index::schema_snapshot_index_integrity_detail;
pub(in crate::db::schema) use relation::schema_snapshot_relation_integrity_detail;
/// Checks one persisted schema snapshot for internal consistency.
///
/// Returns `None` when every check passes; otherwise returns `Some` with a
/// message, prefixed by `subject`, describing the first violation found.
/// Checks run in this order:
///
/// 1. `version` is non-zero and equals `row_layout.version()`.
/// 2. The active row layout, the retired row layout, and `fields` pass their
///    duplicate/overlap helpers.
/// 3. `primary_key_field_ids` is non-empty, holds no repeated id, and every
///    id resolves to a row-layout slot.
/// 4. The row layout and `fields` have the same number of entries, and each
///    field's own slot equals the slot the row layout assigns to its id.
/// 5. Every primary-key id appears among `fields`.
pub(in crate::db::schema) fn schema_snapshot_integrity_detail(
    subject: &str,
    version: SchemaVersion,
    primary_key_field_ids: &[FieldId],
    row_layout: &SchemaRowLayout,
    fields: &[PersistedFieldSnapshot],
) -> Option<String> {
    let snapshot_version = version.get();
    if snapshot_version == 0 {
        return Some(format!("{subject} schema_version must be positive"));
    }

    let layout_version = row_layout.version();
    if layout_version != version {
        return Some(format!(
            "{subject} row-layout version mismatch: snapshot={} row_layout={}",
            snapshot_version,
            layout_version.get(),
        ));
    }

    // The helpers are chained lazily so a later one only runs when every
    // earlier one reported nothing.
    let structural_problem = duplicate_row_layout_detail(subject, row_layout)
        .or_else(|| duplicate_retired_row_layout_detail(subject, row_layout))
        .or_else(|| duplicate_field_detail(subject, fields));
    if structural_problem.is_some() {
        return structural_problem;
    }

    if primary_key_field_ids.is_empty() {
        return Some(format!("{subject} primary key has no fields"));
    }
    for (position, pk_field_id) in primary_key_field_ids.iter().copied().enumerate() {
        // Only ids before `position` are inspected, so the second occurrence
        // of a repeated id is the one that gets reported.
        let seen_earlier = primary_key_field_ids[..position].contains(&pk_field_id);
        if seen_earlier {
            return Some(format!(
                "{subject} duplicate primary key field: field_id={}",
                pk_field_id.get(),
            ));
        }
        if row_layout.slot_for_field(pk_field_id).is_none() {
            return Some(format!(
                "{subject} primary key field missing from row layout: field_id={}",
                pk_field_id.get(),
            ));
        }
    }

    let layout_field_count = row_layout.field_to_slot().len();
    if layout_field_count != fields.len() {
        return Some(format!(
            "{subject} row-layout field count mismatch: row_layout={} fields={}",
            layout_field_count,
            fields.len(),
        ));
    }

    for field in fields {
        let field_id = field.id();
        let Some(layout_slot) = row_layout.slot_for_field(field_id) else {
            return Some(format!(
                "{subject} missing row-layout slot for field_id={}",
                field_id.get(),
            ));
        };
        let declared_slot = field.slot();
        if layout_slot != declared_slot {
            return Some(format!(
                "{subject} field slot mismatch: field_id={} field_slot={} row_layout_slot={}",
                field_id.get(),
                declared_slot.get(),
                layout_slot.get(),
            ));
        }
    }

    // Count the fields whose id is part of the primary key; a shortfall means
    // at least one primary-key id has no matching field.
    let matched_primary_key_fields = fields
        .iter()
        .filter(|field| primary_key_field_ids.contains(&field.id()))
        .count();
    if matched_primary_key_fields == primary_key_field_ids.len() {
        return None;
    }

    // Report the first primary-key id with no field; fall back to the first
    // id (the slice is known to be non-empty here) if the search finds none.
    let missing = primary_key_field_ids
        .iter()
        .copied()
        .find(|candidate| fields.iter().all(|field| field.id() != *candidate))
        .unwrap_or(primary_key_field_ids[0]);
    Some(format!(
        "{subject} primary key field missing from fields: field_id={}",
        missing.get(),
    ))
}
/// Looks for a repeated field id or a repeated slot among the active
/// row-layout entries.
///
/// Each entry is compared with every entry after it, so each unordered pair
/// is examined once. Within a pair the field-id comparison is made before the
/// slot comparison. Returns the message for the first clash found, or `None`
/// when all entries are distinct.
fn duplicate_row_layout_detail(subject: &str, row_layout: &SchemaRowLayout) -> Option<String> {
    let entries = row_layout.field_to_slot();
    entries
        .iter()
        .enumerate()
        .find_map(|(position, (field_id, slot))| {
            entries[position + 1..]
                .iter()
                .find_map(|(other_field_id, other_slot)| {
                    if field_id == other_field_id {
                        Some(format!(
                            "{subject} duplicate row-layout field id: field_id={}",
                            field_id.get(),
                        ))
                    } else if slot == other_slot {
                        Some(format!(
                            "{subject} duplicate row-layout slot: slot={}",
                            slot.get(),
                        ))
                    } else {
                        None
                    }
                })
        })
}
/// Validates the retired row-layout entries against the active ones and
/// against each other.
///
/// Two passes run in order:
///
/// 1. Every active entry is compared with every retired entry; a shared field
///    id or a shared slot is reported as an overlap.
/// 2. Every retired entry is compared with each retired entry after it; a
///    repeated field id or slot is reported as a duplicate.
///
/// In both passes the field-id comparison precedes the slot comparison for a
/// given pair. Returns the first message produced, or `None` if both passes
/// are clean.
fn duplicate_retired_row_layout_detail(
    subject: &str,
    row_layout: &SchemaRowLayout,
) -> Option<String> {
    let active_entries = row_layout.field_to_slot();
    let retired_entries = row_layout.retired_field_slots();

    // Pass 1: active entries must not share an id or slot with retired ones.
    let active_retired_overlap = active_entries.iter().find_map(|(field_id, slot)| {
        retired_entries
            .iter()
            .find_map(|(retired_field_id, retired_slot)| {
                if field_id == retired_field_id {
                    Some(format!(
                        "{subject} active and retired row-layout field id overlap: field_id={}",
                        field_id.get(),
                    ))
                } else if slot == retired_slot {
                    Some(format!(
                        "{subject} active and retired row-layout slot overlap: slot={}",
                        slot.get(),
                    ))
                } else {
                    None
                }
            })
    });

    // Pass 2 only runs when pass 1 found nothing: retired entries must be
    // distinct among themselves.
    active_retired_overlap.or_else(|| {
        retired_entries
            .iter()
            .enumerate()
            .find_map(|(position, (field_id, slot))| {
                retired_entries[position + 1..]
                    .iter()
                    .find_map(|(other_field_id, other_slot)| {
                        if field_id == other_field_id {
                            Some(format!(
                                "{subject} duplicate retired row-layout field id: field_id={}",
                                field_id.get(),
                            ))
                        } else if slot == other_slot {
                            Some(format!(
                                "{subject} duplicate retired row-layout slot: slot={}",
                                slot.get(),
                            ))
                        } else {
                            None
                        }
                    })
            })
    })
}
/// Looks for repeated field ids or names in `fields`, and validates each
/// field's nested leaves.
///
/// For every field, in order: it is first compared with each later field
/// (id before name), and only then are its nested leaves checked through
/// `nested_leaf_detail`. Returns the first message produced, or `None` when
/// nothing is wrong.
fn duplicate_field_detail(subject: &str, fields: &[PersistedFieldSnapshot]) -> Option<String> {
    let mut remaining = fields;
    // Peel one field off the front at a time; `later_fields` is exactly the
    // set of fields it still has to be compared against.
    while let Some((field, later_fields)) = remaining.split_first() {
        for other in later_fields {
            if field.id() == other.id() {
                return Some(format!(
                    "{subject} duplicate field id: field_id={}",
                    field.id().get(),
                ));
            }
            if field.name() == other.name() {
                return Some(format!(
                    "{subject} duplicate field name: name='{}'",
                    field.name(),
                ));
            }
        }

        let leaf_problem = nested_leaf_detail(subject, field);
        if leaf_problem.is_some() {
            return leaf_problem;
        }

        remaining = later_fields;
    }
    None
}
/// Validates the nested leaves of a single field.
///
/// Walking the leaves in order, a leaf with an empty path is reported first;
/// otherwise the leaf's path is compared with the path of every later leaf
/// and a match is reported as a duplicate. Returns the first message
/// produced, or `None` when every path is non-empty and unique.
fn nested_leaf_detail(subject: &str, field: &PersistedFieldSnapshot) -> Option<String> {
    let leaves = field.nested_leaves();
    for (position, leaf) in leaves.iter().enumerate() {
        if leaf.path().is_empty() {
            return Some(format!(
                "{subject} empty nested leaf path: field_id={}",
                field.id().get(),
            ));
        }

        let repeated_later = leaves[position + 1..]
            .iter()
            .any(|other| leaf.path() == other.path());
        if repeated_later {
            return Some(format!(
                "{subject} duplicate nested leaf path: field_id={} path={:?}",
                field.id().get(),
                leaf.path(),
            ));
        }
    }
    None
}