use crate::{
db::key_taxonomy::PrimaryKeyValue,
db::{
Db,
data::{DecodedDataStoreKey, RawDataStoreKey, StructuralRowContract},
direction::Direction,
registry::StoreHandle,
relation::{
RelationTargetDecodeContext, RelationTargetMismatchPolicy,
reverse_index::{
AcceptedStrongRelationInfo, ReverseRelationSourceInfo,
accepted_strong_relations_for_row_contract, decode_relation_target_data_key,
decode_reverse_entry, relation_target_store,
reverse_index_key_bounds_for_target_primary_key_value,
source_row_references_relation_target_primary_key_value,
},
},
schema::ensure_accepted_schema_snapshot,
},
error::InternalError,
metrics::sink::{MetricsEvent, record},
traits::{CanisterKind, EntityKind, EntityValue, Path},
};
use std::{collections::BTreeSet, ops::Bound};
/// Evidence that a target delete is blocked by a strong relation.
///
/// Built by `validate_delete_strong_relations_structural` when a source row
/// reached through the reverse index still references a deleted target key,
/// and turned into an error by `BlockedDeleteProof::into_internal_error`.
struct BlockedDeleteProof {
// Accepted strong relation whose source row still references the target.
relation: AcceptedStrongRelationInfo,
// Decoded data-store key of the source row that holds the reference.
source_data_key: DecodedDataStoreKey,
// Primary key value of the target that was about to be deleted.
target_key: PrimaryKeyValue,
}
impl BlockedDeleteProof {
    /// Consumes the proof and renders it as the "delete blocked" error.
    ///
    /// `S` is the source entity type; it is used to recover the typed source
    /// key from the stored data key and to name the source entity in the
    /// diagnostic text.
    ///
    /// # Errors
    ///
    /// Returns `Err` when the stored source data key cannot be converted into
    /// `S`'s key type via `try_key`. On success the blocked-delete error is
    /// returned in the `Ok` position so the caller decides how to raise it.
    fn into_internal_error<S>(self) -> Result<InternalError, InternalError>
    where
        S: EntityKind + EntityValue,
    {
        let Self {
            relation,
            source_data_key,
            target_key,
        } = self;

        let source_key = source_data_key.try_key::<S>()?;
        let diagnostic = blocked_delete_diagnostic::<S>(relation, source_key, &target_key);

        Ok(InternalError::executor_unsupported(diagnostic))
    }
}
/// Checks that deleting `deleted_target_keys` is not blocked by strong
/// relations declared on source entity `S` that point at `target_path`.
///
/// Returns `Ok(())` when there are no keys to delete, when `S` has no
/// accepted strong relation to `target_path`, or when no source row still
/// references any of the deleted targets.
///
/// # Errors
///
/// Returns the blocked-delete error when a source row still references a
/// deleted target, and propagates any error raised while resolving the
/// source store, its accepted row contract, its relations, or while scanning
/// the reverse index.
pub(in crate::db) fn validate_delete_strong_relations_for_source<S>(
    db: &Db<S::Canister>,
    target_path: &str,
    deleted_target_keys: &BTreeSet<RawDataStoreKey>,
) -> Result<(), InternalError>
where
    S: EntityKind + EntityValue,
{
    let source_info = ReverseRelationSourceInfo::for_type::<S>();

    // Nothing is being deleted, so nothing can be blocked.
    if deleted_target_keys.is_empty() {
        return Ok(());
    }

    let store_handle = db.with_store_registry(|registry| registry.try_get_store(S::Store::PATH))?;
    let row_contract = accepted_source_row_contract::<S>(store_handle)?;
    let strong_relations =
        accepted_strong_relations_for_row_contract(S::PATH, &row_contract, Some(target_path))?;

    // The source entity has no strong relation aimed at this target.
    if strong_relations.is_empty() {
        return Ok(());
    }

    let outcome = validate_delete_strong_relations_structural(
        db,
        source_info,
        S::PATH,
        row_contract,
        strong_relations,
        store_handle,
        deleted_target_keys,
    )?;

    match outcome {
        None => Ok(()),
        Some(proof) => Err(proof.into_internal_error::<S>()?),
    }
}
/// Scans reverse-index entries for every `(relation, deleted target key)`
/// pair and reports the first source row that still references a target.
///
/// For each relation the target's reverse-index store is resolved once. Each
/// deleted raw key is decoded for that relation (keys for which decoding
/// yields `None` are skipped), the reverse-index key range for the target's
/// primary key is computed (pairs with no range are skipped), and every
/// entry in that range is checked against the live source row.
///
/// Metrics: one `RelationValidation` event with `reverse_lookups: 1` is
/// recorded per scanned target key, and one with `blocked_deletes: 1` when a
/// blocking source row is found.
///
/// # Returns
///
/// `Ok(Some(proof))` for the first source row that still references a
/// deleted target, `Ok(None)` when no such row exists.
///
/// # Errors
///
/// Returns a reverse-index corruption error when a reverse entry points at a
/// source row that is missing from the source store, and propagates any
/// decode or lookup error from the helpers it calls.
fn validate_delete_strong_relations_structural<C>(
    db: &Db<C>,
    source_info: ReverseRelationSourceInfo,
    source_path: &'static str,
    source_row_contract: StructuralRowContract,
    relations: Vec<AcceptedStrongRelationInfo>,
    source_store: StoreHandle,
    deleted_target_keys: &BTreeSet<RawDataStoreKey>,
) -> Result<Option<BlockedDeleteProof>, InternalError>
where
    C: CanisterKind,
{
    for relation in relations {
        let reverse_store = relation_target_store(db, source_info, &relation)?;

        for deleted_raw_key in deleted_target_keys {
            let decoded_target = decode_relation_target_data_key(
                source_info,
                &relation,
                deleted_raw_key,
                RelationTargetDecodeContext::DeleteValidation,
                RelationTargetMismatchPolicy::Skip,
            )?;
            let Some(target_data_key) = decoded_target else {
                continue;
            };

            let target_primary_key = target_data_key.primary_key_value();
            let reverse_range = reverse_index_key_bounds_for_target_primary_key_value(
                source_info,
                &relation,
                &target_primary_key,
            )?;
            let Some((range_start, range_end)) = reverse_range else {
                continue;
            };

            record(MetricsEvent::RelationValidation {
                entity_path: source_path,
                reverse_lookups: 1,
                blocked_deletes: 0,
            });

            // Half-open range: start included, end excluded.
            let lower = Bound::Included(range_start);
            let upper = Bound::Excluded(range_end);
            let mut proof = None;

            reverse_store.with_borrow(|index| {
                index.visit_raw_entries_in_range(
                    (&lower, &upper),
                    Direction::Asc,
                    |reverse_key, raw_entry| {
                        let entry =
                            decode_reverse_entry(source_info, &relation, reverse_key, raw_entry)?;
                        let source_key = *entry.primary_key_value();
                        let source_data_key =
                            DecodedDataStoreKey::new(source_info.entity_tag(), &source_key);
                        let source_raw_key = source_data_key.to_raw()?;

                        // A reverse entry whose source row is gone is treated as corruption.
                        let lookup = source_store.with_data(|rows| rows.get(&source_raw_key));
                        let Some(source_raw_row) = lookup else {
                            let target = relation.target();
                            return Err(InternalError::reverse_index_entry_corrupted(
                                source_path,
                                relation.field_name(),
                                target.path(),
                                reverse_key,
                                format!(
                                    "reverse index points at missing source row: source_id={source_key:?} key={target_primary_key:?}"
                                ),
                            ));
                        };

                        let still_references_target =
                            source_row_references_relation_target_primary_key_value(
                                &source_raw_row,
                                source_row_contract.clone(),
                                source_info,
                                &relation,
                                &target_primary_key,
                            )?;
                        if !still_references_target {
                            return Ok(false);
                        }

                        record(MetricsEvent::RelationValidation {
                            entity_path: source_path,
                            reverse_lookups: 0,
                            blocked_deletes: 1,
                        });
                        proof = Some(BlockedDeleteProof {
                            relation: relation.clone(),
                            source_data_key,
                            target_key: target_primary_key,
                        });

                        // NOTE(review): `true` presumably tells the visitor to stop early;
                        // confirm against `visit_raw_entries_in_range`.
                        Ok(true)
                    },
                )
            })?;

            if proof.is_some() {
                return Ok(proof);
            }
        }
    }

    Ok(None)
}
/// Builds the structural row contract for source entity `S` from the
/// accepted schema snapshot of `source_store`.
///
/// The snapshot is obtained through `ensure_accepted_schema_snapshot` using
/// `S`'s entity tag, path and model, with mutable access to the store's
/// schema.
///
/// # Errors
///
/// Propagates any failure from obtaining the snapshot or from deriving the
/// contract from it.
fn accepted_source_row_contract<S>(
    source_store: StoreHandle,
) -> Result<StructuralRowContract, InternalError>
where
    S: EntityKind,
{
    let snapshot = source_store.with_schema_mut(|schemas| {
        ensure_accepted_schema_snapshot(schemas, S::ENTITY_TAG, S::PATH, S::MODEL)
    })?;

    StructuralRowContract::from_accepted_schema_snapshot(S::PATH, &snapshot)
}
/// Formats the diagnostic text for a delete blocked by a strong relation.
///
/// The message names the source entity (`S::PATH`), the relation field, the
/// source row id, the target entity path and the target key rendered through
/// `as_runtime_value`, followed by the suggested remediation.
fn blocked_delete_diagnostic<S>(
    relation: AcceptedStrongRelationInfo,
    source_key: S::Key,
    target_key: &PrimaryKeyValue,
) -> String
where
    S: EntityKind + EntityValue,
{
    let source_field = relation.field_name();
    let target = relation.target();
    let target_entity = target.path();
    let target_value = target_key.as_runtime_value();

    // `source_key` is captured inline by name in the format string.
    format!(
        "delete blocked by strong relation: source_entity={} source_field={} source_id={source_key:?} target_entity={} target_key={:?}; action=delete source rows or retarget relation before deleting target",
        S::PATH,
        source_field,
        target_entity,
        target_value,
    )
}