mod delta;
mod error;
mod read;
mod unique;
use crate::{
db::{
data::{CanonicalSlotReader, StorageKey},
index::{IndexEntry, IndexEntryCorruption, IndexKey, canonical_index_predicate},
predicate::PredicateProgram,
},
error::InternalError,
model::{entity::EntityModel, index::IndexModel},
types::EntityTag,
};
use error::IndexPlanError;
pub(in crate::db) use delta::{
IndexDelta, IndexDeltaGroup, IndexMembershipDelta, IndexMutationPlan,
};
pub(in crate::db) use read::IndexPlanReadView;
/// Identifies which side of a mutation an index key is being derived for.
///
/// Within this module the lane is only used to choose the error returned when
/// the storage key for that side is missing (see `missing_entity_key_error`).
#[derive(Clone, Copy)]
enum IndexKeyLane {
/// The key is derived from the old slots / old storage key.
Old,
/// The key is derived from the new slots / new storage key.
New,
}
impl IndexKeyLane {
/// Builds the error to return when the storage key for this lane is absent.
///
/// `Old` yields the "index removal entity key required" error and `New`
/// yields the "index insertion entity key required" error.
fn missing_entity_key_error(self) -> InternalError {
match self {
Self::Old => InternalError::structural_index_removal_entity_key_required(),
Self::New => InternalError::structural_index_insertion_entity_key_required(),
}
}
}
/// Renders the field names of `index` as a single string, joined by `", "`.
///
/// Elsewhere in this module the result is used as the index label passed to
/// error constructors and to `IndexDeltaGroup::new`.
pub(super) fn index_fields_csv(index: &IndexModel) -> String {
    const FIELD_SEPARATOR: &str = ", ";

    index.fields().join(FIELD_SEPARATOR)
}
/// Compiles the membership predicate of `index` against `model`, if any.
///
/// Returns `None` when `canonical_index_predicate` yields no predicate for
/// the index; otherwise returns the compiled `PredicateProgram`.
///
/// `_entity_path` is accepted for signature compatibility and is not used.
pub(in crate::db) fn compile_index_membership_predicate_structural(
    _entity_path: &'static str,
    model: &'static EntityModel,
    index: &IndexModel,
) -> Option<PredicateProgram> {
    canonical_index_predicate(index).map(|predicate| PredicateProgram::compile(model, predicate))
}
/// Derives the index key for a row read through `slots`, honoring an optional
/// membership predicate.
///
/// When `predicate_program` is supplied it is evaluated against `slots`
/// first; a `false` result means the row is not a member of the index and
/// `Ok(None)` is returned without building a key. Otherwise the result of
/// `IndexKey::new_from_slots` is returned as-is (it may itself be `None`).
///
/// # Errors
///
/// Propagates failures from predicate evaluation and from key construction.
pub(in crate::db) fn index_key_for_slot_reader_with_membership_structural(
    entity_tag: EntityTag,
    index: &IndexModel,
    predicate_program: Option<&PredicateProgram>,
    storage_key: StorageKey,
    slots: &dyn CanonicalSlotReader,
) -> Result<Option<IndexKey>, InternalError> {
    // Without a predicate every row is treated as a member.
    let row_is_member = match predicate_program {
        Some(program) => program.eval_with_structural_slot_reader(slots)?,
        None => true,
    };
    if !row_is_member {
        return Ok(None);
    }

    let key = IndexKey::new_from_slots(entity_tag, storage_key, slots, index)?;
    Ok(key)
}
/// Derives the index key for one lane of a mutation, requiring that the
/// lane's storage key is present.
///
/// With a storage key, this delegates to
/// `index_key_for_slot_reader_with_membership_structural`.
///
/// # Errors
///
/// Returns the lane-specific "entity key required" error when `storage_key`
/// is `None`, and propagates any error from the delegated key derivation.
fn load_structural_index_key(
    lane: IndexKeyLane,
    entity_tag: EntityTag,
    index: &IndexModel,
    predicate_program: Option<&PredicateProgram>,
    storage_key: Option<StorageKey>,
    slots: &dyn CanonicalSlotReader,
) -> Result<Option<IndexKey>, InternalError> {
    match storage_key {
        Some(present_key) => index_key_for_slot_reader_with_membership_structural(
            entity_tag,
            index,
            predicate_program,
            present_key,
            slots,
        ),
        None => Err(lane.missing_entity_key_error()),
    }
}
/// Checks that the entry currently stored under the old index key is
/// consistent with the row being removed from it.
///
/// Checks run in this order:
/// 1. No `old_key`: nothing to validate, returns `Ok(())`.
/// 2. `old_key` without `old_storage_key`: "index removal entity key required".
/// 3. No `old_entry`: corruption error (missing key).
/// 4. Unique index whose entry holds more than one key: corruption error
///    (`NonUniqueEntry`).
/// 5. Entry does not contain the old storage key: corruption error
///    (missing key).
///
/// # Errors
///
/// Returns an `InternalError` for cases 2 through 5 above.
fn validate_existing_old_index_membership(
    entity_path: &'static str,
    index_fields: &str,
    index_is_unique: bool,
    old_storage_key: Option<StorageKey>,
    old_key: Option<&IndexKey>,
    old_entry: Option<&IndexEntry>,
) -> Result<(), InternalError> {
    let Some(key) = old_key else {
        return Ok(());
    };
    let Some(storage_key) = old_storage_key else {
        return Err(InternalError::structural_index_removal_entity_key_required());
    };

    // The same corruption error is reported both when the entry is absent and
    // when it exists but does not list the storage key.
    let missing_key_error = || {
        InternalError::structural_index_entry_corruption(
            entity_path,
            index_fields,
            IndexEntryCorruption::missing_key(key.to_raw(), storage_key),
        )
    };

    let Some(entry) = old_entry else {
        return Err(missing_key_error());
    };

    if index_is_unique {
        let keys = entry.len();
        if keys > 1 {
            return Err(InternalError::structural_index_entry_corruption(
                entity_path,
                index_fields,
                IndexEntryCorruption::NonUniqueEntry { keys },
            ));
        }
    }

    if entry.contains(storage_key) {
        Ok(())
    } else {
        Err(missing_key_error())
    }
}
/// Plans the index mutations for one row transition described by optional
/// old and new storage keys and slot readers.
///
/// This is the `crate::db`-visible entry point; it forwards every argument
/// unchanged to `plan_index_mutation_for_slot_reader_structural_impl` and
/// returns that function's result.
///
/// # Errors
///
/// Returns whatever `IndexPlanError` the implementation function produces.
#[expect(clippy::too_many_arguments)]
pub(in crate::db) fn plan_index_mutation_for_slot_reader_structural(
entity_path: &'static str,
entity_tag: EntityTag,
model: &'static EntityModel,
read_view: &dyn IndexPlanReadView,
old_storage_key: Option<StorageKey>,
old_slots: Option<&mut dyn CanonicalSlotReader>,
new_storage_key: Option<StorageKey>,
new_slots: Option<&mut dyn CanonicalSlotReader>,
) -> Result<IndexMutationPlan, IndexPlanError> {
// Pure delegation: no argument is inspected or transformed here.
plan_index_mutation_for_slot_reader_structural_impl(
entity_path,
entity_tag,
model,
read_view,
old_storage_key,
old_slots,
new_storage_key,
new_slots,
)
}
/// Builds the index mutation plan for one row transition, one index at a time.
///
/// For each index returned by `model.indexes()`, in order, this:
/// 1. compiles the index's membership predicate (if any);
/// 2. derives the old and new index keys — a key is `None` when the matching
///    slot reader is absent or key derivation yields no key;
/// 3. loads the entry stored under the old key and validates that the old
///    storage key is a consistent member of it;
/// 4. runs unique-constraint validation for the new key;
/// 5. appends a delta group when there is at least one remove/insert delta.
///
/// # Errors
///
/// Stops at the first failing step and returns its error converted into
/// `IndexPlanError`.
#[expect(clippy::too_many_arguments)]
fn plan_index_mutation_for_slot_reader_structural_impl(
    entity_path: &'static str,
    entity_tag: EntityTag,
    model: &'static EntityModel,
    read_view: &dyn IndexPlanReadView,
    old_storage_key: Option<StorageKey>,
    mut old_slots: Option<&mut dyn CanonicalSlotReader>,
    new_storage_key: Option<StorageKey>,
    mut new_slots: Option<&mut dyn CanonicalSlotReader>,
) -> Result<IndexMutationPlan, IndexPlanError> {
    let indexes = model.indexes();
    let mut groups = Vec::with_capacity(indexes.len());

    for index in indexes {
        let fields_csv = index_fields_csv(index);
        let membership = compile_index_membership_predicate_structural(entity_path, model, index);

        // Old-lane key: only derived when old slots were supplied.
        let old_key = if let Some(slots) = old_slots.as_deref_mut() {
            load_structural_index_key(
                IndexKeyLane::Old,
                entity_tag,
                index,
                membership.as_ref(),
                old_storage_key,
                slots,
            )?
        } else {
            None
        };

        // New-lane key: only derived when new slots were supplied.
        let new_key = if let Some(slots) = new_slots.as_deref_mut() {
            load_structural_index_key(
                IndexKeyLane::New,
                entity_tag,
                index,
                membership.as_ref(),
                new_storage_key,
                slots,
            )?
        } else {
            None
        };

        let existing_entry = load_existing_entry_structural(
            read_view,
            index,
            &fields_csv,
            old_key.as_ref(),
            entity_path,
        )?;
        validate_existing_old_index_membership(
            entity_path,
            &fields_csv,
            index.is_unique(),
            old_storage_key,
            old_key.as_ref(),
            existing_entry.as_ref(),
        )?;

        // The unique check only receives a storage key when the new row
        // actually produces a key for this index.
        let unique_storage_key = new_key.as_ref().and(new_storage_key);
        unique::validate_unique_constraint_structural(
            entity_path,
            entity_tag,
            model,
            read_view,
            index,
            &fields_csv,
            unique_storage_key,
            new_key.as_ref(),
        )?;

        push_index_delta_group(
            &mut groups,
            index,
            fields_csv,
            old_key,
            new_key,
            old_storage_key,
            new_storage_key,
        )?;
    }

    Ok(IndexMutationPlan::new(groups))
}
/// Appends the delta group for one index to `groups`, if it has any deltas.
///
/// An old key produces a remove delta and a new key produces an insert delta,
/// in that order. When neither key is present nothing is pushed.
///
/// # Errors
///
/// Returns the "old entity key required" error when `old_key` is present
/// without `old_storage_key`, and the "new entity key required" error when
/// `new_key` is present without `new_storage_key`. The old side is checked
/// first.
fn push_index_delta_group(
    groups: &mut Vec<IndexDeltaGroup>,
    index: &IndexModel,
    index_fields: String,
    old_key: Option<IndexKey>,
    new_key: Option<IndexKey>,
    old_storage_key: Option<StorageKey>,
    new_storage_key: Option<StorageKey>,
) -> Result<(), InternalError> {
    let removal = match (old_key, old_storage_key) {
        (Some(key), Some(storage_key)) => Some(IndexDelta::remove(key, storage_key)),
        (Some(_), None) => {
            return Err(InternalError::index_commit_op_old_entity_key_required());
        }
        (None, _) => None,
    };
    let insertion = match (new_key, new_storage_key) {
        (Some(key), Some(storage_key)) => Some(IndexDelta::insert(key, storage_key)),
        (Some(_), None) => {
            return Err(InternalError::index_commit_op_new_entity_key_required());
        }
        (None, _) => None,
    };

    // At most two deltas per index: one removal followed by one insertion.
    let mut deltas = Vec::with_capacity(2);
    deltas.extend(removal);
    deltas.extend(insertion);

    if deltas.is_empty() {
        return Ok(());
    }
    groups.push(IndexDeltaGroup::new(index.store(), index_fields, deltas));
    Ok(())
}
/// Reads and decodes the index entry stored under `key`, if there is one.
///
/// Returns `Ok(None)` when `key` is `None` or when the read view has no entry
/// for the key's raw form.
///
/// # Errors
///
/// Propagates read failures from `read_view`. A raw entry that fails to
/// decode is reported as a structural index entry corruption error carrying
/// `entity_path` and `index_fields`.
pub(super) fn load_existing_entry_structural(
    read_view: &dyn IndexPlanReadView,
    index: &IndexModel,
    index_fields: &str,
    key: Option<&IndexKey>,
    entity_path: &'static str,
) -> Result<Option<IndexEntry>, InternalError> {
    let Some(key) = key else {
        return Ok(None);
    };

    let raw_key = key.to_raw();
    let Some(raw_entry) = read_view.read_index_entry(index, &raw_key)? else {
        return Ok(None);
    };

    let entry = raw_entry.try_decode().map_err(|err| {
        InternalError::structural_index_entry_corruption(entity_path, index_fields, err)
    })?;
    Ok(Some(entry))
}