mod target_keys;
use crate::{
db::{
Db,
commit::PreparedIndexMutation,
data::{
CanonicalSlotReader, DecodedDataStoreKey, RawDataStoreKey, RawRow, ScalarSlotValueRef,
ScalarValueRef, StructuralRowContract, StructuralSlotReader,
decode_accepted_relation_target_primary_key_components_bytes,
decode_runtime_value_from_accepted_field_contract,
},
index::{
IndexEntryValue, IndexId, IndexKey, IndexKeyKind, IndexRowIdentity, IndexStore,
RawIndexStoreKey, raw_keys_for_component_prefix_with_kind,
},
key_taxonomy::{EncodedPrimaryKey, PrimaryKeyComponent, PrimaryKeyValue},
relation::{
AcceptedRelationCardinality, AcceptedRelationEdgeTargetContract,
AcceptedRelationTargetAuthority, AcceptedRelationTupleEdgeLocalComponent,
RelationTargetDecodeContext, RelationTargetMismatchPolicy,
accepted_relation_target_metadata_from_kind, accepted_relation_tuple_edge_descriptor,
accepted_strong_scalar_relation_target_descriptor,
validate_relation_primary_key_component_kind,
},
schema::{AcceptedFieldDecodeContract, OwnedAcceptedRelationEdgeContract},
schema::{PersistedFieldKind, PersistedRelationStrength},
},
error::InternalError,
model::field::{FieldStorageDecode, LeafCodec},
traits::{CanisterKind, EntityKind},
types::EntityTag,
};
use std::{cell::RefCell, thread::LocalKey};
use target_keys::RelationTargetKeys;
/// Identity of the source entity whose rows carry relation fields.
///
/// The path is used by this module when building error values; the entity
/// tag is used when building reverse-index ids.
#[derive(Clone, Copy)]
pub(crate) struct ReverseRelationSourceInfo {
// Source entity path (populated from `EntityKind::PATH`).
path: &'static str,
// Source entity tag (populated from `EntityKind::ENTITY_TAG`).
entity_tag: EntityTag,
}
impl ReverseRelationSourceInfo {
    /// Builds the source info for entity type `S` from its associated
    /// `PATH` and `ENTITY_TAG` constants.
    pub(crate) const fn for_type<S>() -> Self
    where
        S: EntityKind,
    {
        let path = S::PATH;
        let entity_tag = S::ENTITY_TAG;
        Self { path, entity_tag }
    }

    /// Returns the entity tag of the source entity.
    #[must_use]
    pub(in crate::db::relation) const fn entity_tag(self) -> EntityTag {
        let Self { entity_tag, .. } = self;
        entity_tag
    }
}
/// One reverse-index entry candidate produced while diffing the old and new
/// relation targets of a source row.
#[derive(Clone)]
struct ReverseRelationMutationTarget {
// Index store that holds the reverse-index entry.
target_store: &'static LocalKey<RefCell<IndexStore>>,
// Raw reverse-index key built from the target key and the source primary key.
reverse_key: RawIndexStoreKey,
// Whether the old source row referenced this target.
old_contains: bool,
// Whether the new source row references this target.
new_contains: bool,
}
/// Before/after view of one source row used to prepare reverse-index
/// mutations. Either side may be absent (`None`), in which case that side
/// contributes no relation targets.
struct ReverseRelationSourceTransition<'row, 'slots> {
// Row contract used to discover the strong relations of the source row.
source_row_contract: StructuralRowContract,
// Slot reader over the previous row image, if any.
old_row_fields: Option<&'slots StructuralSlotReader<'row>>,
// Slot reader over the next row image, if any.
new_row_fields: Option<&'slots StructuralSlotReader<'row>>,
}
/// Resolved description of one strong relation declared by a source row
/// contract: its name, ordinal, local component fields, target identity and
/// cardinality.
#[derive(Clone, Debug)]
pub(in crate::db::relation) struct AcceptedStrongRelationInfo {
// Name reported by `field_name()`; used in error values in this module.
relation_name: String,
// Ordinal reported by `field_index()`; also converted to the `u16`
// reverse-index ordinal.
relation_ordinal: usize,
// Local source fields that together hold the target key.
local_components: AcceptedStrongRelationLocalComponents,
// Target entity authority plus its primary-key component kinds.
target: AcceptedStrongRelationTargetIdentity,
// Relation cardinality as supplied by the relation descriptor.
cardinality: AcceptedRelationCardinality,
}
impl AcceptedStrongRelationInfo {
    /// Assembles relation info from its parts.
    ///
    /// # Errors
    /// Propagates the error returned when the target identity cannot be
    /// derived from `target_contract`.
    fn new(
        relation_name: impl Into<String>,
        relation_ordinal: usize,
        local_components: AcceptedStrongRelationLocalComponents,
        target_contract: AcceptedRelationEdgeTargetContract,
        cardinality: AcceptedRelationCardinality,
    ) -> Result<Self, InternalError> {
        let relation_name = relation_name.into();
        let target = AcceptedStrongRelationTargetIdentity::from_relation_edge_target_contract(
            target_contract,
        )?;

        Ok(Self {
            relation_name,
            relation_ordinal,
            local_components,
            target,
            cardinality,
        })
    }

    /// Name of the relation as stored on this info.
    #[must_use]
    pub(in crate::db::relation) const fn field_name(&self) -> &str {
        self.relation_name.as_str()
    }

    /// Ordinal of the relation as stored on this info.
    #[must_use]
    pub(in crate::db::relation) const fn field_index(&self) -> usize {
        self.relation_ordinal
    }

    /// Field kind of the single local component, or `None` when the
    /// relation does not have exactly one local component.
    #[must_use]
    fn scalar_relation_field_kind(&self) -> Option<&PersistedFieldKind> {
        let component = self.scalar_local_component()?;
        Some(component.field_kind())
    }

    /// All local components of the relation.
    #[must_use]
    const fn local_components(&self) -> &AcceptedStrongRelationLocalComponents {
        &self.local_components
    }

    /// Identity of the relation target.
    #[must_use]
    pub(in crate::db::relation) const fn target(&self) -> &AcceptedStrongRelationTargetIdentity {
        &self.target
    }

    /// Cardinality of the relation.
    const fn cardinality(&self) -> AcceptedRelationCardinality {
        self.cardinality
    }

    /// The single local component, or `None` when there is not exactly one.
    fn scalar_local_component(&self) -> Option<&AcceptedStrongRelationLocalComponent> {
        self.local_components.scalar_component()
    }
}
/// Ordered, non-empty list of the local source fields that make up a
/// relation's target key (emptiness is rejected by
/// `try_from_component_specs`).
#[derive(Clone, Debug)]
struct AcceptedStrongRelationLocalComponents {
// Owned component descriptions, in the order the specs were supplied.
components: Vec<AcceptedStrongRelationLocalComponent>,
}
impl AcceptedStrongRelationLocalComponents {
    /// Builds the component list for a relation held by one local field.
    ///
    /// # Panics
    /// Only if `try_from_component_specs` rejects the one-element spec
    /// slice, which it does not do for a non-empty slice.
    fn scalar(field_index: usize, field: AcceptedFieldDecodeContract<'_>) -> Self {
        let spec = AcceptedStrongRelationLocalComponentSpec {
            index: field_index,
            field,
        };
        Self::try_from_component_specs(std::slice::from_ref(&spec))
            .expect("scalar relation metadata must carry one local component")
    }

    /// Converts borrowed component specs into owned component descriptions.
    ///
    /// # Errors
    /// Returns an unsupported-key-kind error when `components` is empty.
    fn try_from_component_specs(
        components: &[AcceptedStrongRelationLocalComponentSpec<'_>],
    ) -> Result<Self, InternalError> {
        if components.is_empty() {
            return Err(InternalError::relation_source_row_unsupported_key_kind(
                components,
            ));
        }

        let mut owned = Vec::with_capacity(components.len());
        for spec in components {
            owned.push(AcceptedStrongRelationLocalComponent {
                index: spec.index,
                name: spec.field.field_name().to_string(),
                kind: spec.field.kind().clone(),
                nullable: spec.field.nullable(),
                storage_decode: spec.field.storage_decode(),
                leaf_codec: spec.field.leaf_codec(),
            });
        }

        Ok(Self { components: owned })
    }

    /// Number of local components.
    #[must_use]
    const fn component_count(&self) -> usize {
        self.components.len()
    }

    /// All local components, in order.
    #[must_use]
    const fn components(&self) -> &[AcceptedStrongRelationLocalComponent] {
        self.components.as_slice()
    }

    /// The only component when there is exactly one; `None` otherwise.
    #[must_use]
    fn scalar_component(&self) -> Option<&AcceptedStrongRelationLocalComponent> {
        match self.components.as_slice() {
            [only] => Some(only),
            _ => None,
        }
    }
}
/// Borrowed input used to build one `AcceptedStrongRelationLocalComponent`.
#[derive(Clone, Copy, Debug)]
struct AcceptedStrongRelationLocalComponentSpec<'a> {
// Slot index of the local field in the source row.
index: usize,
// Decode contract of the local field.
field: AcceptedFieldDecodeContract<'a>,
}
/// Owned copy of the decode facts for one local relation field, so the
/// decode contract can be rebuilt later via `decode_contract()`.
#[derive(Clone, Debug)]
struct AcceptedStrongRelationLocalComponent {
// Slot index of the field in the source row.
index: usize,
// Field name.
name: String,
// Persisted kind of the field.
kind: PersistedFieldKind,
// Nullability flag copied from the field's decode contract.
nullable: bool,
// Storage decode mode copied from the field's decode contract.
storage_decode: FieldStorageDecode,
// Leaf codec copied from the field's decode contract.
leaf_codec: LeafCodec,
}
impl AcceptedStrongRelationLocalComponent {
    /// Slot index of this component in the source row.
    #[must_use]
    const fn field_index(&self) -> usize {
        self.index
    }

    /// Name of the local field.
    #[must_use]
    const fn field_name(&self) -> &str {
        self.name.as_str()
    }

    /// Persisted kind of the local field.
    #[must_use]
    const fn field_kind(&self) -> &PersistedFieldKind {
        &self.kind
    }

    /// Rebuilds a borrowed decode contract from the owned facts stored here.
    #[must_use]
    const fn decode_contract(&self) -> AcceptedFieldDecodeContract<'_> {
        let Self {
            name,
            kind,
            nullable,
            storage_decode,
            leaf_codec,
            ..
        } = self;
        AcceptedFieldDecodeContract::new(
            name.as_str(),
            kind,
            *nullable,
            *storage_decode,
            *leaf_codec,
        )
    }
}
/// Identity of a relation target: the target authority (path, entity name,
/// entity tag, store path) together with its primary-key component kinds.
#[derive(Clone, Debug)]
pub(in crate::db::relation) struct AcceptedStrongRelationTargetIdentity {
// Target authority; all path/name/tag/store accessors delegate to it.
authority: AcceptedRelationTargetAuthority,
// Component kinds of the target's primary key.
primary_key: AcceptedStrongRelationTargetPrimaryKey,
}
impl AcceptedStrongRelationTargetIdentity {
    /// Test-only constructor that builds the identity from raw parts.
    ///
    /// # Errors
    /// Propagates failures from building the target authority or from an
    /// empty `key_kinds` slice.
    #[cfg(test)]
    fn try_new(
        source_path: &str,
        field_name: &str,
        target_path: &str,
        target_entity_name: &str,
        target_entity_tag: EntityTag,
        target_store_path: &str,
        key_kinds: &[PersistedFieldKind],
    ) -> Result<Self, InternalError> {
        let authority = AcceptedRelationTargetAuthority::try_new(
            source_path,
            field_name,
            target_path,
            target_entity_name,
            target_entity_tag,
            target_store_path,
        )?;
        let primary_key =
            AcceptedStrongRelationTargetPrimaryKey::try_from_component_kinds(key_kinds)?;

        Ok(Self {
            authority,
            primary_key,
        })
    }

    /// Builds the identity from an accepted relation-edge target contract.
    ///
    /// # Errors
    /// Fails when the contract reports no primary-key component kinds.
    fn from_relation_edge_target_contract(
        contract: AcceptedRelationEdgeTargetContract,
    ) -> Result<Self, InternalError> {
        // The key kinds are read before the contract is consumed below.
        let primary_key = AcceptedStrongRelationTargetPrimaryKey::try_from_component_kinds(
            contract.primary_key_kinds(),
        )?;
        let authority = contract.into_target();

        Ok(Self {
            authority,
            primary_key,
        })
    }

    /// Target entity path.
    #[must_use]
    pub(in crate::db::relation) const fn path(&self) -> &str {
        self.authority.path()
    }

    /// Target entity name.
    #[must_use]
    const fn entity_name(&self) -> crate::db::identity::EntityName {
        self.authority.entity_name()
    }

    /// Target entity tag.
    #[must_use]
    const fn entity_tag(&self) -> EntityTag {
        self.authority.entity_tag()
    }

    /// Path of the store that holds the target entity.
    #[must_use]
    const fn store_path(&self) -> &str {
        self.authority.store_path()
    }

    /// Primary-key description of the target.
    #[must_use]
    const fn primary_key(&self) -> &AcceptedStrongRelationTargetPrimaryKey {
        &self.primary_key
    }

    /// Validates the target authority against `db`, discarding whatever
    /// value the authority check yields on success.
    ///
    /// # Errors
    /// Propagates the authority's validation error.
    fn validate_against_db<C>(
        &self,
        db: &Db<C>,
        source_path: &str,
        field_name: &str,
    ) -> Result<(), InternalError>
    where
        C: CanisterKind,
    {
        self.authority
            .validate_against_db(db, source_path, field_name)?;
        Ok(())
    }
}
/// Non-empty list of the persisted kinds that make up a relation target's
/// primary key (emptiness is rejected by `try_from_component_kinds`).
#[derive(Clone, Debug)]
struct AcceptedStrongRelationTargetPrimaryKey {
// Primary-key component kinds, in the order they were supplied.
component_kinds: Vec<PersistedFieldKind>,
}
impl AcceptedStrongRelationTargetPrimaryKey {
    /// Copies the given component kinds into an owned primary-key description.
    ///
    /// # Errors
    /// Returns an unsupported-key-kind error when `component_kinds` is empty.
    fn try_from_component_kinds(
        component_kinds: &[PersistedFieldKind],
    ) -> Result<Self, InternalError> {
        if component_kinds.is_empty() {
            return Err(InternalError::relation_source_row_unsupported_key_kind(
                component_kinds,
            ));
        }

        let component_kinds = component_kinds.to_vec();
        Ok(Self { component_kinds })
    }

    /// All primary-key component kinds, in order.
    #[must_use]
    const fn component_kinds(&self) -> &[PersistedFieldKind] {
        self.component_kinds.as_slice()
    }

    /// The only component kind when the key has exactly one component.
    #[must_use]
    fn single_component_kind(&self) -> Option<&PersistedFieldKind> {
        match self.component_kinds.as_slice() {
            [only] => Some(only),
            _ => None,
        }
    }
}
const fn relation_target_key_decode_context_label(
context: RelationTargetDecodeContext,
) -> &'static str {
match context {
RelationTargetDecodeContext::DeleteValidation => "delete relation target key decode failed",
RelationTargetDecodeContext::ReverseIndexPrepare => {
"relation target key decode failed while preparing reverse index"
}
}
}
const fn relation_target_entity_mismatch_context_label(
context: RelationTargetDecodeContext,
) -> &'static str {
match context {
RelationTargetDecodeContext::DeleteValidation => {
"relation target entity mismatch during delete validation"
}
RelationTargetDecodeContext::ReverseIndexPrepare => {
"relation target entity mismatch while preparing reverse index"
}
}
}
/// Collects every strong relation declared by `source_row_contract`.
///
/// Relations declared through accepted relation edges come first. After
/// that, every active field slot that is not already claimed by an edge is
/// inspected for a field-level strong relation. When `target_path_filter`
/// is `Some`, only relations pointing at that target path are returned.
///
/// # Errors
/// Propagates errors from edge resolution, field contract lookup and
/// field-level relation resolution.
pub(in crate::db::relation) fn accepted_strong_relations_for_row_contract<C>(
    db: &Db<C>,
    source_path: &str,
    source_row_contract: &StructuralRowContract,
    target_path_filter: Option<&str>,
) -> Result<Vec<AcceptedStrongRelationInfo>, InternalError>
where
    C: CanisterKind,
{
    let mut relations = accepted_strong_relations_from_edges(
        db,
        source_path,
        source_row_contract,
        target_path_filter,
    )?;

    // A slot listed by any accepted edge was already handled above.
    let claimed_by_edge = |slot: usize| {
        source_row_contract
            .accepted_relation_edges()
            .iter()
            .any(|edge| edge.local_field_slots().contains(&slot))
    };

    for slot in 0..source_row_contract.field_count() {
        if !source_row_contract.has_active_field_slot(slot) || claimed_by_edge(slot) {
            continue;
        }

        let field = source_row_contract.required_accepted_field_decode_contract(slot)?;
        if let Some(relation) =
            accepted_strong_relation_from_field(source_path, slot, field, target_path_filter)?
        {
            relations.push(relation);
        }
    }

    Ok(relations)
}
/// Resolves the strong relations declared by the contract's accepted
/// relation edges, keeping only those whose target path matches
/// `target_path_filter` when a filter is given.
///
/// # Errors
/// Propagates the first edge-resolution error.
fn accepted_strong_relations_from_edges<C>(
    db: &Db<C>,
    source_path: &str,
    source_row_contract: &StructuralRowContract,
    target_path_filter: Option<&str>,
) -> Result<Vec<AcceptedStrongRelationInfo>, InternalError>
where
    C: CanisterKind,
{
    let mut relations = Vec::new();

    for edge in source_row_contract.accepted_relation_edges() {
        let resolved =
            accepted_strong_relation_from_edge(db, source_path, source_row_contract, edge)?;
        let Some(relation) = resolved else {
            continue;
        };

        // The filter is applied after resolution, against the resolved target path.
        match target_path_filter {
            Some(filter) if filter != relation.target().path() => {}
            _ => relations.push(relation),
        }
    }

    Ok(relations)
}
fn accepted_strong_relation_from_edge<C>(
db: &Db<C>,
source_path: &str,
source_row_contract: &StructuralRowContract,
edge: &OwnedAcceptedRelationEdgeContract,
) -> Result<Option<AcceptedStrongRelationInfo>, InternalError>
where
C: CanisterKind,
{
let local_fields = edge
.local_field_slots()
.iter()
.map(|slot| source_row_contract.required_accepted_field_decode_contract(*slot))
.collect::<Result<Vec<_>, _>>()?;
if let [field] = local_fields.as_slice()
&& let Some(descriptor) = accepted_strong_scalar_relation_target_descriptor(
source_path,
edge.name(),
field.field_name(),
field.kind(),
Some(edge.target_path()),
)?
{
let cardinality = descriptor.cardinality();
return Ok(Some(AcceptedStrongRelationInfo::new(
field.field_name(),
edge.local_field_slots()[0],
AcceptedStrongRelationLocalComponents::scalar(edge.local_field_slots()[0], *field),
descriptor.into_target_contract(),
cardinality,
)?));
}
let local_component_facts = local_fields
.iter()
.map(|field| AcceptedRelationTupleEdgeLocalComponent::new(field.field_name(), field.kind()))
.collect::<Vec<_>>();
let tuple_descriptor = accepted_relation_tuple_edge_descriptor(
db,
source_path,
edge.name(),
edge.target_path(),
local_component_facts.as_slice(),
)?;
let component_specs = local_fields
.iter()
.enumerate()
.map(|(offset, field)| AcceptedStrongRelationLocalComponentSpec {
index: edge.local_field_slots()[offset],
field: *field,
})
.collect::<Vec<_>>();
Ok(Some(AcceptedStrongRelationInfo::new(
edge.name(),
edge.local_field_slots()[0],
AcceptedStrongRelationLocalComponents::try_from_component_specs(
component_specs.as_slice(),
)?,
tuple_descriptor.into_target_contract(),
AcceptedRelationCardinality::Single,
)?))
}
/// Resolves a field-level strong relation for one source field.
///
/// Returns `Ok(None)` when the field kind carries no relation metadata,
/// when the relation is not strong, when it does not match
/// `target_path_filter`, or when no scalar descriptor is available.
///
/// # Errors
/// Propagates descriptor-resolution and construction errors.
fn accepted_strong_relation_from_field(
    source_path: &str,
    field_index: usize,
    field: AcceptedFieldDecodeContract<'_>,
    target_path_filter: Option<&str>,
) -> Result<Option<AcceptedStrongRelationInfo>, InternalError> {
    let Some(target) = accepted_relation_target_metadata_from_kind(field.kind()) else {
        return Ok(None);
    };

    // Strength is checked before the path filter, as two separate exits.
    if target.strength != PersistedRelationStrength::Strong
        || target_path_filter.is_some_and(|filter| filter != target.target_path)
    {
        return Ok(None);
    }

    let descriptor = accepted_strong_scalar_relation_target_descriptor(
        source_path,
        field.field_name(),
        field.field_name(),
        field.kind(),
        None,
    )?;

    match descriptor {
        None => Ok(None),
        Some(descriptor) => {
            let cardinality = descriptor.cardinality();
            let relation = AcceptedStrongRelationInfo::new(
                field.field_name(),
                field_index,
                AcceptedStrongRelationLocalComponents::scalar(field_index, field),
                descriptor.into_target_contract(),
                cardinality,
            )?;
            Ok(Some(relation))
        }
    }
}
/// Builds the reverse-index id for `relation` from the source entity tag
/// and the relation ordinal narrowed to `u16`.
///
/// # Errors
/// Returns an ordinal-overflow error when the ordinal does not fit in `u16`.
fn reverse_index_id_for_relation(
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
) -> Result<IndexId, InternalError> {
    match u16::try_from(relation.field_index()) {
        Ok(ordinal) => Ok(IndexId::new(source.entity_tag, ordinal)),
        Err(err) => Err(InternalError::reverse_index_ordinal_overflow(
            source.path,
            relation.field_name(),
            relation.target().path(),
            err,
        )),
    }
}
/// Computes the raw reverse-index key range covering every entry that
/// points at `target_key_value` for `relation`.
///
/// The range is a one-component system-key prefix over the encoded target
/// key. The current implementation always returns `Some` on success.
///
/// # Errors
/// Propagates target-key encoding and index-id errors.
pub(super) fn reverse_index_key_bounds_for_target_primary_key_value(
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
    target_key_value: &PrimaryKeyValue,
) -> Result<Option<(RawIndexStoreKey, RawIndexStoreKey)>, InternalError> {
    let encoded_target =
        encode_reverse_relation_target_identity_component(source, relation, target_key_value)?;
    let index_id = reverse_index_id_for_relation(source, relation)?;

    let bounds = raw_keys_for_component_prefix_with_kind(
        &index_id,
        IndexKeyKind::System,
        1,
        std::slice::from_ref(&encoded_target),
    );
    Ok(Some(bounds))
}
/// Builds the raw reverse-index key for the (target key, source key) pair
/// of `relation`.
///
/// The current implementation always returns `Some` on success.
///
/// # Errors
/// Propagates target-key encoding and index-id errors.
fn reverse_index_key_for_target_and_source_primary_key_value(
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
    target_key_value: &PrimaryKeyValue,
    source_key_value: &PrimaryKeyValue,
) -> Result<Option<RawIndexStoreKey>, InternalError> {
    let encoded_target =
        encode_reverse_relation_target_identity_component(source, relation, target_key_value)?;
    let index_id = reverse_index_id_for_relation(source, relation)?;

    let raw_key = IndexKey::new_from_components_with_primary_key_value(
        &index_id,
        IndexKeyKind::System,
        std::slice::from_ref(&encoded_target),
        source_key_value,
    )
    .to_raw();
    Ok(Some(raw_key))
}
/// Encodes the target primary key into the byte component used inside
/// reverse-index keys.
///
/// # Errors
/// Wraps an encoding failure in a source-row decode error for `relation`.
fn encode_reverse_relation_target_identity_component(
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
    target_key_value: &PrimaryKeyValue,
) -> Result<Vec<u8>, InternalError> {
    match EncodedPrimaryKey::encode(*target_key_value) {
        Ok(encoded) => Ok(encoded.as_bytes().to_vec()),
        Err(err) => Err(InternalError::relation_source_row_decode_failed(
            source.path,
            relation.field_name(),
            relation.target().path(),
            err,
        )),
    }
}
/// Reads the relation targets referenced by a source row and returns them
/// as canonical (sorted, deduplicated) raw data-store keys.
///
/// # Errors
/// Propagates target-key extraction and raw-key encoding errors.
fn relation_target_raw_keys_for_source_slots(
    row_fields: &StructuralSlotReader<'_>,
    source_info: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
) -> Result<Vec<RawDataStoreKey>, InternalError> {
    relation_target_keys_for_source_slots(row_fields, source_info, relation).and_then(
        |target_keys| {
            relation_target_raw_keys_from_relation_target_keys(source_info, relation, target_keys)
        },
    )
}
/// Reports whether the raw source row references `target_key` through
/// `relation`.
///
/// # Errors
/// Propagates slot-reader construction and target-key extraction errors.
pub(in crate::db::relation) fn source_row_references_relation_target_primary_key_value(
    raw_row: &RawRow,
    source_row_contract: StructuralRowContract,
    source_info: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
    target_key: &PrimaryKeyValue,
) -> Result<bool, InternalError> {
    let reader =
        StructuralSlotReader::from_raw_row_with_validated_contract(raw_row, source_row_contract)?;
    let references_target =
        source_slots_reference_relation_target(&reader, source_info, relation, target_key)?;
    Ok(references_target)
}
/// Reports whether the already-opened source row references `target_key`
/// through `relation`.
///
/// # Errors
/// Propagates target-key extraction errors.
fn source_slots_reference_relation_target(
    row_fields: &StructuralSlotReader<'_>,
    source_info: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
    target_key: &PrimaryKeyValue,
) -> Result<bool, InternalError> {
    relation_target_keys_for_source_slots(row_fields, source_info, relation)
        .map(|target_keys| target_keys.contains(target_key))
}
/// Puts `keys` into canonical form in place: ascending order with
/// duplicates removed.
fn canonicalize_relation_target_keys(keys: &mut Vec<RawDataStoreKey>) {
    // Sorting first makes equal keys adjacent so one pass removes them all.
    keys.as_mut_slice().sort_unstable();
    keys.dedup_by(|later, earlier| later == earlier);
}
/// Decodes the row identity stored under a reverse-index key.
///
/// # Errors
/// Reports a corrupted reverse-index entry (with source, relation, target
/// and key context) when decoding fails.
pub(super) fn decode_reverse_entry(
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
    index_key: &RawIndexStoreKey,
    raw_entry: &IndexEntryValue,
) -> Result<IndexRowIdentity, InternalError> {
    match raw_entry.decode_row_identity(index_key) {
        Ok(identity) => Ok(identity),
        Err(err) => Err(InternalError::reverse_index_entry_corrupted(
            source.path,
            relation.field_name(),
            relation.target().path(),
            index_key,
            err,
        )),
    }
}
/// Resolves the index store of the relation target.
///
/// The target is validated against `db` before the store registry is
/// consulted.
///
/// # Errors
/// Propagates the validation error, or reports a missing target store when
/// the registry lookup fails.
pub(super) fn relation_target_store<C>(
    db: &Db<C>,
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
) -> Result<&'static LocalKey<RefCell<IndexStore>>, InternalError>
where
    C: CanisterKind,
{
    let target = relation.target();
    target.validate_against_db(db, source.path, relation.field_name())?;

    let lookup = db.with_store_registry(|registry| registry.try_get_store(target.store_path()));
    match lookup {
        Ok(store) => Ok(store.index_store()),
        Err(err) => Err(InternalError::relation_target_store_missing(
            source.path,
            relation.field_name(),
            target.path(),
            target.store_path(),
            err,
        )),
    }
}
/// Decodes a raw target key and checks that it belongs to the relation's
/// target entity.
///
/// Returns `Ok(None)` only when the entity tag differs and
/// `mismatch_policy` is `Skip`.
///
/// # Errors
/// Reports a decode failure for an undecodable key, or an entity mismatch
/// when the tag differs and the policy is not `Skip`. Both errors carry a
/// label derived from `context`.
pub(in crate::db::relation) fn decode_relation_target_data_key(
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
    target_raw_key: &RawDataStoreKey,
    context: RelationTargetDecodeContext,
    mismatch_policy: RelationTargetMismatchPolicy,
) -> Result<Option<DecodedDataStoreKey>, InternalError> {
    let decoded = match DecodedDataStoreKey::try_from_raw(target_raw_key) {
        Ok(decoded) => decoded,
        Err(err) => {
            return Err(InternalError::relation_target_key_decode_failed(
                relation_target_key_decode_context_label(context),
                source.path,
                relation.field_name(),
                relation.target().path(),
                err,
            ));
        }
    };

    let target = relation.target();
    if decoded.entity_tag() != target.entity_tag() {
        return match mismatch_policy {
            RelationTargetMismatchPolicy::Skip => Ok(None),
            _ => Err(InternalError::relation_target_entity_mismatch(
                relation_target_entity_mismatch_context_label(context),
                source.path,
                relation.field_name(),
                target.path(),
                target.entity_name().as_str(),
                target.entity_tag().value(),
                decoded.entity_tag().value(),
            )),
        };
    }

    Ok(Some(decoded))
}
/// Converts extracted target key values into canonical (sorted,
/// deduplicated) raw data-store keys.
///
/// # Errors
/// Stops at and returns the first raw-key encoding error.
fn relation_target_raw_keys_from_relation_target_keys(
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
    keys: RelationTargetKeys,
) -> Result<Vec<RawDataStoreKey>, InternalError> {
    let mut raw_keys = Vec::new();
    for value in keys.into_values() {
        let raw_key = raw_relation_target_key_from_primary_key_value(source, relation, &value)?;
        raw_keys.push(raw_key);
    }

    canonicalize_relation_target_keys(&mut raw_keys);
    Ok(raw_keys)
}
/// Extracts the target keys a source row holds for `relation`.
///
/// Strategy selection:
/// 1. If the relation has no single local field whose kind carries relation
///    metadata, the keys are assembled from the component slots.
/// 2. Otherwise the scalar-slot fast path is tried.
/// 3. If the fast path does not apply, the keys are decoded from the field bytes.
///
/// # Errors
/// Propagates the error of whichever strategy runs.
fn relation_target_keys_for_source_slots(
    row_fields: &StructuralSlotReader<'_>,
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
) -> Result<RelationTargetKeys, InternalError> {
    let scalar_relation_metadata = relation
        .scalar_relation_field_kind()
        .and_then(accepted_relation_target_metadata_from_kind);
    if scalar_relation_metadata.is_none() {
        return relation_target_keys_from_component_slots(row_fields, source, relation);
    }

    match relation_target_keys_from_scalar_slot(row_fields, source, relation)? {
        Some(keys) => Ok(keys),
        None => relation_target_keys_from_field_bytes(row_fields, source, relation),
    }
}
/// Assembles one target key from the relation's local component fields.
///
/// Each component is decoded to a runtime value. If every component is
/// null, the row references no target; if only some are null, the tuple is
/// rejected as partial. Otherwise the non-null components form one key.
///
/// # Errors
/// Reports a source-row decode failure when a component cannot be decoded,
/// cannot be used as a primary-key component, or the tuple is partially
/// null. Propagates field-byte lookup and key-construction errors.
fn relation_target_keys_from_component_slots(
    row_fields: &StructuralSlotReader<'_>,
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
) -> Result<RelationTargetKeys, InternalError> {
    let local_components = relation.local_components();
    let expected = local_components.component_count();
    let mut key_components = Vec::with_capacity(expected);
    let mut nulls = 0usize;

    for local in local_components.components() {
        let bytes = row_fields.required_field_bytes(local.field_index(), local.field_name())?;
        let value =
            match decode_runtime_value_from_accepted_field_contract(local.decode_contract(), bytes)
            {
                Ok(value) => value,
                Err(err) => {
                    return Err(InternalError::relation_source_row_decode_failed(
                        source.path,
                        relation.field_name(),
                        relation.target().path(),
                        err,
                    ));
                }
            };

        // Nulls are only counted here; the all-or-nothing rule is applied after the loop.
        if matches!(value, crate::value::Value::Null) {
            nulls = nulls.saturating_add(1);
            continue;
        }

        match PrimaryKeyComponent::from_runtime_value(&value) {
            Some(component) => key_components.push(component),
            None => {
                return Err(InternalError::relation_source_row_decode_failed(
                    source.path,
                    relation.field_name(),
                    relation.target().path(),
                    "unsupported composite relation target component",
                ));
            }
        }
    }

    if nulls == expected {
        return Ok(RelationTargetKeys::none());
    }
    if nulls > 0 {
        return Err(InternalError::relation_source_row_decode_failed(
            source.path,
            relation.field_name(),
            relation.target().path(),
            "partial composite relation target tuple",
        ));
    }

    let key = relation_target_primary_key_value_from_components(key_components.as_slice())?;
    Ok(RelationTargetKeys::one(&key))
}
/// Wraps key components into a primary-key value: a single component
/// becomes a scalar key, any other count goes through the composite
/// constructor.
///
/// # Errors
/// Maps a composite-construction failure to an unsupported-key-kind error.
fn relation_target_primary_key_value_from_components(
    components: &[PrimaryKeyComponent],
) -> Result<PrimaryKeyValue, InternalError> {
    if let [component] = components {
        return Ok(PrimaryKeyValue::Scalar(*component));
    }

    let composite =
        crate::db::key_taxonomy::CompositePrimaryKeyValue::try_from_components(components)
            .map_err(InternalError::relation_source_row_unsupported_key_kind)?;
    Ok(PrimaryKeyValue::Composite(composite))
}
/// Decodes the target keys of a single-field relation from the field's raw
/// bytes.
///
/// # Errors
/// Fails when the relation's key kind does not validate, when the relation
/// does not have exactly one local component, when the field bytes are
/// unavailable, or when the bytes cannot be decoded (reported as a
/// source-row decode failure).
fn relation_target_keys_from_field_bytes(
    row_fields: &StructuralSlotReader<'_>,
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
) -> Result<RelationTargetKeys, InternalError> {
    validate_relation_field_kind(relation)?;

    let Some(component) = relation.scalar_local_component() else {
        return Err(InternalError::relation_source_row_unsupported_key_kind(
            relation.target().primary_key().component_kinds(),
        ));
    };

    let bytes = row_fields.required_field_bytes(component.field_index(), component.field_name())?;
    match decode_accepted_relation_target_primary_key_components_bytes(
        bytes,
        component.field_kind(),
    ) {
        Ok(decoded) => Ok(RelationTargetKeys::from_scalar_components(decoded)),
        Err(err) => Err(InternalError::relation_source_row_decode_failed(
            source.path,
            relation.field_name(),
            relation.target().path(),
            err,
        )),
    }
}
/// Fast path: reads the target key of a single-field relation directly
/// from a scalar slot.
///
/// Returns `Ok(None)` when the fast path does not apply. That is the case
/// when there is no single local field, its kind is not `Relation`, its key
/// kind is not supported by the fast path, or the slot's leaf codec is not
/// scalar. A null slot yields an empty key set.
///
/// # Errors
/// Propagates slot-access errors and reports an unsupported scalar relation
/// key when the scalar value cannot be used as a primary-key component.
fn relation_target_keys_from_scalar_slot(
    row_fields: &StructuralSlotReader<'_>,
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
) -> Result<Option<RelationTargetKeys>, InternalError> {
    let Some(field_kind) = relation.scalar_relation_field_kind() else {
        return Ok(None);
    };

    let kind_eligible = matches!(field_kind, PersistedFieldKind::Relation { .. })
        && relation_scalar_slot_fast_path_key_kind_supported(field_kind);
    if !kind_eligible {
        return Ok(None);
    }

    let slot = relation.field_index();
    if !matches!(row_fields.field_leaf_codec(slot)?, LeafCodec::Scalar(_)) {
        return Ok(None);
    }

    let scalar = match row_fields.required_scalar(slot)? {
        ScalarSlotValueRef::Null => return Ok(Some(RelationTargetKeys::none())),
        ScalarSlotValueRef::Value(scalar) => scalar,
    };

    let Some(component) = primary_key_value_from_relation_scalar(scalar) else {
        return Err(
            InternalError::relation_source_row_unsupported_scalar_relation_key(
                source.path,
                relation.field_name(),
                relation.target().path(),
            ),
        );
    };

    let key = PrimaryKeyValue::Scalar(component);
    Ok(Some(RelationTargetKeys::one(&key)))
}
/// Reports whether `kind` is a `Relation` whose key kind can be read
/// through the scalar-slot fast path (signed/unsigned integers, principal,
/// subaccount, timestamp, ULID or unit). Any non-relation kind is rejected.
fn relation_scalar_slot_fast_path_key_kind_supported(kind: &PersistedFieldKind) -> bool {
    if let PersistedFieldKind::Relation { key_kind, .. } = kind {
        matches!(
            key_kind.as_ref(),
            PersistedFieldKind::Int8
                | PersistedFieldKind::Int16
                | PersistedFieldKind::Int32
                | PersistedFieldKind::Int64
                | PersistedFieldKind::Principal
                | PersistedFieldKind::Subaccount
                | PersistedFieldKind::Timestamp
                | PersistedFieldKind::Nat8
                | PersistedFieldKind::Nat16
                | PersistedFieldKind::Nat32
                | PersistedFieldKind::Nat64
                | PersistedFieldKind::Ulid
                | PersistedFieldKind::Unit
        )
    } else {
        false
    }
}
/// Converts a scalar slot value into a primary-key component.
///
/// Integer, principal, subaccount, timestamp, ULID and unit scalars are
/// convertible; blob, bool, date, duration, float and text scalars yield
/// `None`.
const fn primary_key_value_from_relation_scalar(
    value: ScalarValueRef<'_>,
) -> Option<PrimaryKeyComponent> {
    let component = match value {
        ScalarValueRef::Int(int) => PrimaryKeyComponent::Int64(int),
        ScalarValueRef::Nat(nat) => PrimaryKeyComponent::Nat64(nat),
        ScalarValueRef::Principal(principal) => PrimaryKeyComponent::Principal(principal),
        ScalarValueRef::Subaccount(subaccount) => PrimaryKeyComponent::Subaccount(subaccount),
        ScalarValueRef::Timestamp(timestamp) => PrimaryKeyComponent::Timestamp(timestamp),
        ScalarValueRef::Ulid(ulid) => PrimaryKeyComponent::Ulid(ulid),
        ScalarValueRef::Unit => PrimaryKeyComponent::Unit,
        ScalarValueRef::Blob(_)
        | ScalarValueRef::Bool(_)
        | ScalarValueRef::Date(_)
        | ScalarValueRef::Duration(_)
        | ScalarValueRef::Float32(_)
        | ScalarValueRef::Float64(_)
        | ScalarValueRef::Text(_) => return None,
    };
    Some(component)
}
/// Encodes a target primary-key value as a raw data-store key under the
/// relation target's entity tag.
///
/// # Errors
/// Wraps an encoding failure in a source-row decode error for `relation`.
fn raw_relation_target_key_from_primary_key_value(
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
    value: &PrimaryKeyValue,
) -> Result<RawDataStoreKey, InternalError> {
    let data_key = DecodedDataStoreKey::new(relation.target().entity_tag(), value);
    match data_key.to_raw() {
        Ok(raw_key) => Ok(raw_key),
        Err(err) => Err(InternalError::relation_source_row_decode_failed(
            source.path,
            relation.field_name(),
            relation.target().path(),
            err,
        )),
    }
}
/// Validates the relation's target key kind.
///
/// Every cardinality currently shares the same scalar key validation. The
/// exhaustive pattern keeps the compiler flagging this function when a new
/// cardinality variant is introduced.
///
/// # Errors
/// Propagates the scalar target-key validation error.
fn validate_relation_field_kind(
    relation: &AcceptedStrongRelationInfo,
) -> Result<(), InternalError> {
    let (AcceptedRelationCardinality::Single
    | AcceptedRelationCardinality::List
    | AcceptedRelationCardinality::Set) = relation.cardinality();

    validate_scalar_relation_target_primary_key_kind(relation)
}
/// Checks that the relation's local components line up with a
/// single-component target primary key of a supported kind.
///
/// # Errors
/// Returns an unsupported-key-kind error when the local component count
/// differs from the number of target key components, or when the target key
/// does not have exactly one component. Otherwise propagates the
/// component-kind validation result.
fn validate_scalar_relation_target_primary_key_kind(
    relation: &AcceptedStrongRelationInfo,
) -> Result<(), InternalError> {
    let primary_key = relation.target().primary_key();
    let component_kinds = primary_key.component_kinds();

    if relation.local_components().component_count() != component_kinds.len() {
        return Err(InternalError::relation_source_row_unsupported_key_kind(
            component_kinds,
        ));
    }

    match primary_key.single_component_kind() {
        Some(key_kind) => validate_relation_primary_key_component_kind(key_kind),
        None => Err(InternalError::relation_source_row_unsupported_key_kind(
            component_kinds,
        )),
    }
}
fn prepare_reverse_relation_index_mutation_for_target(
target: ReverseRelationMutationTarget,
) -> Option<PreparedIndexMutation> {
if target.old_contains == target.new_contains {
return None;
}
let next_value = target.new_contains.then(IndexEntryValue::presence);
Some(PreparedIndexMutation::from_reverse_index_membership(
target.target_store,
target.reverse_key,
next_value,
target.old_contains,
target.new_contains,
))
}
/// Prepares the reverse-index mutations caused by a source row changing
/// from `old_row_fields` to `new_row_fields`.
///
/// Either side may be `None`, in which case that side contributes no
/// relation targets.
///
/// # Errors
/// Propagates every error of the underlying preparation routine.
pub(crate) fn prepare_reverse_relation_index_mutations_for_source_slot_readers<C>(
    db: &Db<C>,
    source: ReverseRelationSourceInfo,
    source_row_contract: StructuralRowContract,
    source_primary_key: &PrimaryKeyValue,
    old_row_fields: Option<&StructuralSlotReader<'_>>,
    new_row_fields: Option<&StructuralSlotReader<'_>>,
) -> Result<Vec<PreparedIndexMutation>, InternalError>
where
    C: CanisterKind,
{
    prepare_reverse_relation_index_mutations_for_source_rows_impl(
        db,
        source,
        source_primary_key,
        ReverseRelationSourceTransition {
            source_row_contract,
            old_row_fields,
            new_row_fields,
        },
    )
}
/// Computes the reverse-index mutations for one source-row transition.
///
/// For each strong relation of the source row contract, the old and new
/// target key lists are walked in lockstep, relying on both being in
/// canonical sorted order. Each distinct target is classified as
/// old-only, new-only or shared. A mutation is emitted only where
/// membership changed.
///
/// # Errors
/// Propagates relation discovery, target-key extraction, target-store
/// resolution, key decoding and reverse-key construction errors. Reports an
/// invariant violation if target-key decoding yields `None` under the
/// `Reject` mismatch policy.
fn prepare_reverse_relation_index_mutations_for_source_rows_impl<C>(
    db: &Db<C>,
    source: ReverseRelationSourceInfo,
    source_primary_key: &PrimaryKeyValue,
    source_rows: ReverseRelationSourceTransition<'_, '_>,
) -> Result<Vec<PreparedIndexMutation>, InternalError>
where
    C: CanisterKind,
{
    let relations = accepted_strong_relations_for_row_contract(
        db,
        source.path,
        &source_rows.source_row_contract,
        None,
    )?;

    let mut ops = Vec::new();
    for relation in &relations {
        let old_targets =
            relation_target_keys_for_transition_side(source_rows.old_row_fields, source, relation)?;
        let new_targets =
            relation_target_keys_for_transition_side(source_rows.new_row_fields, source, relation)?;
        let target_store = relation_target_store(db, source, relation)?;

        let mut old_cursor = 0usize;
        let mut new_cursor = 0usize;
        loop {
            // Take the smaller head of the two sorted lists; the loop ends
            // once both lists are exhausted.
            let heads = (old_targets.get(old_cursor), new_targets.get(new_cursor));
            let (target_raw_key, old_contains, new_contains) = match heads {
                (None, None) => break,
                (Some(old_key), None) => {
                    old_cursor += 1;
                    (old_key.clone(), true, false)
                }
                (None, Some(new_key)) => {
                    new_cursor += 1;
                    (new_key.clone(), false, true)
                }
                (Some(old_key), Some(new_key)) => match old_key.cmp(new_key) {
                    std::cmp::Ordering::Less => {
                        old_cursor += 1;
                        (old_key.clone(), true, false)
                    }
                    std::cmp::Ordering::Greater => {
                        new_cursor += 1;
                        (new_key.clone(), false, true)
                    }
                    std::cmp::Ordering::Equal => {
                        old_cursor += 1;
                        new_cursor += 1;
                        (old_key.clone(), true, true)
                    }
                },
            };

            let decoded_target = decode_relation_target_data_key(
                source,
                relation,
                &target_raw_key,
                RelationTargetDecodeContext::ReverseIndexPrepare,
                RelationTargetMismatchPolicy::Reject,
            )?;
            let target_data_key = match decoded_target {
                Some(target_data_key) => target_data_key,
                None => {
                    return Err(
                        InternalError::reverse_index_relation_target_decode_invariant_violated(
                            source.path,
                            relation.field_name(),
                            relation.target().path(),
                        ),
                    );
                }
            };

            let Some(reverse_key) = reverse_index_key_for_target_and_source_primary_key_value(
                source,
                relation,
                &target_data_key.primary_key_value(),
                source_primary_key,
            )?
            else {
                continue;
            };

            // Unchanged membership yields no mutation.
            let candidate = ReverseRelationMutationTarget {
                target_store,
                reverse_key,
                old_contains,
                new_contains,
            };
            if let Some(op) = prepare_reverse_relation_index_mutation_for_target(candidate) {
                ops.push(op);
            }
        }
    }

    Ok(ops)
}
/// Returns the canonical raw target keys for one side of a row transition,
/// or an empty list when that side has no row.
///
/// # Errors
/// Propagates target-key extraction errors for a present row.
fn relation_target_keys_for_transition_side(
    row_fields: Option<&StructuralSlotReader<'_>>,
    source: ReverseRelationSourceInfo,
    relation: &AcceptedStrongRelationInfo,
) -> Result<Vec<RawDataStoreKey>, InternalError> {
    row_fields.map_or_else(
        || Ok(Vec::new()),
        |present| relation_target_raw_keys_for_source_slots(present, source, relation),
    )
}
#[cfg(test)]
mod tests;