use crate::{
db::{
Db,
data::{DataKey, RawDataKey, StructuralRowContract},
registry::StoreHandle,
relation::{
RelationTargetDecodeContext, RelationTargetMismatchPolicy,
reverse_index::{
AcceptedStrongRelationInfo, ReverseRelationSourceInfo,
accepted_strong_relations_for_row_contract, decode_relation_target_data_key,
decode_reverse_entry, relation_target_store,
reverse_index_key_for_target_storage_key, source_row_references_relation_target,
},
},
schema::ensure_accepted_schema_snapshot,
},
error::InternalError,
metrics::sink::{MetricsEvent, record},
traits::{CanisterKind, EntityKind, EntityValue, Path},
value::{StorageKey, storage_key_as_runtime_value},
};
use std::collections::BTreeSet;
/// Evidence that a delete is blocked: identifies the relation, the source row,
/// and the target key involved.
///
/// Built by `validate_delete_strong_relations_structural` when a source row is
/// found to still reference a deleted target, and later turned into an
/// `InternalError` by `into_internal_error`.
struct BlockedDeleteProof {
/// Relation whose reverse index yielded the blocking source row.
relation: AcceptedStrongRelationInfo,
/// Data key of the source row that still references the target.
source_data_key: DataKey,
/// Storage key of the target whose delete is blocked.
target_key: StorageKey,
}
impl BlockedDeleteProof {
    /// Consumes the proof and builds the `InternalError` describing the blocked delete.
    ///
    /// The inner `Ok` value is the "delete blocked" error itself (an
    /// `executor_unsupported` error carrying the diagnostic text). The outer
    /// `Err` is produced only when the stored source data key cannot be
    /// converted into `S::Key` via `try_key`.
    fn into_internal_error<S>(self) -> Result<InternalError, InternalError>
    where
        S: EntityKind + EntityValue,
    {
        let Self {
            relation,
            source_data_key,
            target_key,
        } = self;

        // Recover the typed source key first; a failure here propagates as-is.
        let typed_source_key = source_data_key.try_key::<S>()?;
        let diagnostic = blocked_delete_diagnostic::<S>(relation, typed_source_key, target_key);

        Ok(InternalError::executor_unsupported(diagnostic))
    }
}
/// Checks whether deleting `deleted_target_keys` is blocked by a strong
/// relation declared on source entity `S` that points at `target_path`.
///
/// Returns `Ok(())` when there are no deleted keys, when no accepted strong
/// relations are reported for `target_path`, or when the structural scan finds
/// no source row still referencing a deleted target.
///
/// # Errors
///
/// Returns the "delete blocked" error built from the first blocking source row,
/// or any error raised while resolving the source store, the accepted row
/// contract, the relation list, or while scanning the reverse index.
pub(in crate::db) fn validate_delete_strong_relations_for_source<S>(
    db: &Db<S::Canister>,
    target_path: &str,
    deleted_target_keys: &BTreeSet<RawDataKey>,
) -> Result<(), InternalError>
where
    S: EntityKind + EntityValue,
{
    let source_info = ReverseRelationSourceInfo::for_type::<S>();

    // Nothing is being deleted, so nothing can be blocked.
    if deleted_target_keys.is_empty() {
        return Ok(());
    }

    let source_store =
        db.with_store_registry(|registry| registry.try_get_store(S::Store::PATH))?;
    let row_contract = accepted_source_row_contract::<S>(source_store)?;
    let strong_relations =
        accepted_strong_relations_for_row_contract(S::PATH, &row_contract, Some(target_path))?;

    // No strong relation from `S` targets this path: the delete cannot be blocked here.
    if strong_relations.is_empty() {
        return Ok(());
    }

    let scan_outcome = validate_delete_strong_relations_structural(
        db,
        source_info,
        S::PATH,
        row_contract,
        strong_relations,
        source_store,
        deleted_target_keys,
    )?;

    match scan_outcome {
        Some(proof) => Err(proof.into_internal_error::<S>()?),
        None => Ok(()),
    }
}
/// Scans the reverse index of every relation in `relations` for a source row
/// that still references one of `deleted_target_keys`.
///
/// For each relation and each deleted key, the key is decoded into a target
/// data key, a reverse-index key is derived from its storage key, and the
/// matching reverse-index entry (if any) is decoded. Every source id listed in
/// that entry is then loaded from `source_store` and checked against the
/// target. Keys for which decoding or reverse-key derivation yields `None`,
/// and reverse keys with no stored entry, are skipped.
///
/// Returns `Ok(Some(proof))` for the first source row found that still
/// references a deleted target, and `Ok(None)` when no such row exists.
///
/// # Errors
///
/// Propagates any lookup or decode failure, and reports a corrupted
/// reverse-index entry when the entry names a source row that is absent from
/// `source_store`.
fn validate_delete_strong_relations_structural<C>(
    db: &Db<C>,
    source_info: ReverseRelationSourceInfo,
    source_path: &'static str,
    source_row_contract: StructuralRowContract,
    relations: Vec<AcceptedStrongRelationInfo>,
    source_store: StoreHandle,
    deleted_target_keys: &BTreeSet<RawDataKey>,
) -> Result<Option<BlockedDeleteProof>, InternalError>
where
    C: CanisterKind,
{
    for relation in relations {
        let reverse_store = relation_target_store(db, source_info, &relation)?;

        for raw_target in deleted_target_keys {
            let target_data_key = match decode_relation_target_data_key(
                source_info,
                &relation,
                raw_target,
                RelationTargetDecodeContext::DeleteValidation,
                RelationTargetMismatchPolicy::Skip,
            )? {
                Some(decoded) => decoded,
                None => continue,
            };
            let target_storage_key = target_data_key.storage_key();

            let reverse_key = match reverse_index_key_for_target_storage_key(
                source_info,
                &relation,
                target_storage_key,
            )? {
                Some(key) => key,
                None => continue,
            };

            // Count the reverse-index lookup before performing it.
            record(MetricsEvent::RelationValidation {
                entity_path: source_path,
                reverse_lookups: 1,
                blocked_deletes: 0,
            });

            let raw_entry = match reverse_store.with_borrow(|index| index.get(&reverse_key)) {
                Some(found) => found,
                None => continue,
            };
            let entry = decode_reverse_entry(source_info, &relation, &reverse_key, &raw_entry)?;

            for source_key in entry.iter_ids() {
                let source_data_key = DataKey::new(source_info.entity_tag(), source_key);
                let source_raw_key = source_data_key.to_raw()?;

                // The reverse index names this source row, so it must exist in the store.
                let Some(source_row) = source_store.with_data(|rows| rows.get(&source_raw_key))
                else {
                    let target = relation.target();
                    return Err(InternalError::reverse_index_entry_corrupted(
                        source_path,
                        relation.field_name(),
                        target.path(),
                        &reverse_key,
                        format!(
                            "reverse index points at missing source row: source_id={source_key:?} key={:?}",
                            target_data_key.storage_key(),
                        ),
                    ));
                };

                let blocks_delete = source_row_references_relation_target(
                    &source_row,
                    source_row_contract.clone(),
                    source_info,
                    &relation,
                    target_storage_key,
                )?;
                if !blocks_delete {
                    continue;
                }

                // First blocking row found: record it and stop scanning.
                record(MetricsEvent::RelationValidation {
                    entity_path: source_path,
                    reverse_lookups: 0,
                    blocked_deletes: 1,
                });
                return Ok(Some(BlockedDeleteProof {
                    relation,
                    source_data_key,
                    target_key: target_storage_key,
                }));
            }
        }
    }

    Ok(None)
}
/// Builds the `StructuralRowContract` for source entity `S` from the accepted
/// schema snapshot held by `source_store`.
///
/// The snapshot is obtained by calling `ensure_accepted_schema_snapshot` with
/// mutable access to the store's schema; any error from that step or from
/// building the contract is returned to the caller.
fn accepted_source_row_contract<S>(
    source_store: StoreHandle,
) -> Result<StructuralRowContract, InternalError>
where
    S: EntityKind,
{
    let snapshot = source_store.with_schema_mut(|schema| {
        ensure_accepted_schema_snapshot(schema, S::ENTITY_TAG, S::PATH, S::MODEL)
    })?;

    StructuralRowContract::from_accepted_schema_snapshot(S::PATH, &snapshot)
}
/// Formats the message used when a delete is blocked by a strong relation.
///
/// The text names the source entity path, the relation field, the source id,
/// the target entity path, and the target key rendered through
/// `storage_key_as_runtime_value`, followed by a suggested remediation.
fn blocked_delete_diagnostic<S>(
    relation: AcceptedStrongRelationInfo,
    source_key: S::Key,
    target_key: StorageKey,
) -> String
where
    S: EntityKind + EntityValue,
{
    let source_entity = S::PATH;
    let source_field = relation.field_name();
    let target = relation.target();
    let target_entity = target.path();
    let target_value = storage_key_as_runtime_value(&target_key);

    format!(
        "delete blocked by strong relation: source_entity={} source_field={} source_id={source_key:?} target_entity={} target_key={:?}; action=delete source rows or retarget relation before deleting target",
        source_entity,
        source_field,
        target_entity,
        target_value,
    )
}