use crate::{
db::{
Db,
commit::{
CommitIndexOp, CommitRowOp, CommitSchemaFingerprint, PreparedIndexMutation,
PreparedRowCommitOp,
},
data::{
CanonicalRow, CanonicalSlotReader, DataStore, DecodedDataStoreKey, RawDataStoreKey,
RawRow, StructuralRowContract, StructuralSlotReader,
canonical_row_from_structural_slot_reader_with_accepted_contract,
},
index::{
IndexDelta, IndexDeltaGroup, IndexEntryValue, IndexMembershipDelta, IndexMutationPlan,
IndexPlanReadView, IndexReadContract, IndexRowIdentity, RawIndexStoreKey,
StructuralIndexEntryReader, StructuralPrimaryRowReader,
plan_index_mutation_for_slot_reader_structural,
},
key_taxonomy::PrimaryKeyValue,
relation::{
ReverseRelationSourceInfo,
prepare_reverse_relation_index_mutations_for_source_slot_readers,
},
schema::{SchemaInfo, accepted_commit_schema_fingerprint, ensure_accepted_schema_snapshot},
},
error::{ErrorClass, InternalError},
metrics::sink::{MetricsEvent, record},
model::entity::EntityModel,
traits::{CanisterKind, EntityKind, EntityValue, Path},
types::EntityTag,
};
use std::{cell::RefCell, ops::Bound, thread::LocalKey};
struct CommitPrepareAuthority {
entity_path: &'static str,
entity_tag: EntityTag,
schema_fingerprint: crate::db::commit::CommitSchemaFingerprint,
data_store_path: &'static str,
relation_source: ReverseRelationSourceInfo,
model: &'static EntityModel,
}
struct AcceptedCommitSchemaContracts {
row_contract: StructuralRowContract,
schema_info: SchemaInfo,
}
impl CommitPrepareAuthority {
const fn for_type_with_schema_fingerprint<E>(
schema_fingerprint: CommitSchemaFingerprint,
) -> Self
where
E: EntityKind + Path,
{
Self {
entity_path: E::PATH,
entity_tag: E::ENTITY_TAG,
schema_fingerprint,
data_store_path: E::Store::PATH,
relation_source: ReverseRelationSourceInfo::for_type::<E>(),
model: E::MODEL,
}
}
}
struct CommitInputs {
raw_key: RawDataStoreKey,
data_key: DecodedDataStoreKey,
old_row: Option<RawRow>,
new_row: Option<RawRow>,
}
impl CommitInputs {
fn schema_fingerprint_mismatch(
entity_path: &str,
marker: crate::db::commit::CommitSchemaFingerprint,
runtime: crate::db::commit::CommitSchemaFingerprint,
) -> InternalError {
InternalError::store_unsupported(format!(
"commit marker schema fingerprint mismatch for entity '{entity_path}': marker={marker:?}, runtime={runtime:?}",
))
}
}
struct DecodedCommitRows<'a> {
old_slots: Option<StructuralSlotReader<'a>>,
new_slots: Option<StructuralSlotReader<'a>>,
}
struct CommitIndexPlanReadView<'a, C: CanisterKind> {
db: &'a Db<C>,
row_reader: &'a dyn StructuralPrimaryRowReader,
index_reader: &'a dyn StructuralIndexEntryReader,
}
impl<C> CommitIndexPlanReadView<'_, C>
where
C: CanisterKind,
{
fn index_store(
&self,
index_store: &str,
) -> Result<&'static LocalKey<RefCell<crate::db::index::IndexStore>>, InternalError> {
self.db
.with_store_registry(|registry| registry.try_get_store(index_store))
.map(|store| store.index_store())
}
}
impl<C> IndexPlanReadView for CommitIndexPlanReadView<'_, C>
where
C: CanisterKind,
{
fn read_primary_row(&self, key: &DecodedDataStoreKey) -> Result<Option<RawRow>, InternalError> {
self.row_reader.read_primary_row_structural(key)
}
fn read_index_entry(
&self,
index: IndexReadContract<'_>,
key: &RawIndexStoreKey,
) -> Result<Option<IndexEntryValue>, InternalError> {
let index_store = self.index_store(index.store_path())?;
self.index_reader
.read_index_entry_structural(index_store, key)
}
fn read_index_keys_in_raw_range(
&self,
entity_path: &'static str,
entity_tag: EntityTag,
index: IndexReadContract<'_>,
bounds: (&Bound<RawIndexStoreKey>, &Bound<RawIndexStoreKey>),
limit: usize,
) -> Result<Vec<PrimaryKeyValue>, InternalError> {
let index_store = self.index_store(index.store_path())?;
self.index_reader.read_index_keys_in_raw_range_structural(
entity_path,
entity_tag,
index_store,
index,
bounds,
limit,
)
}
}
pub(in crate::db) fn prepare_row_commit_for_entity_with_structural_readers<
E: EntityKind + EntityValue,
>(
db: &Db<E::Canister>,
op: &CommitRowOp,
row_reader: &dyn StructuralPrimaryRowReader,
index_reader: &dyn StructuralIndexEntryReader,
) -> Result<PreparedRowCommitOp, InternalError> {
let schema_fingerprint = accepted_commit_schema_fingerprint_for_entity::<E>(db)?;
prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
db,
op,
row_reader,
index_reader,
schema_fingerprint,
)
}
fn accepted_commit_schema_fingerprint_for_entity<E>(
db: &Db<E::Canister>,
) -> Result<CommitSchemaFingerprint, InternalError>
where
E: EntityKind,
{
let store = db.with_store_registry(|reg| reg.try_get_store(E::Store::PATH))?;
let accepted = store.with_schema_mut(|schema_store| {
ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
})?;
accepted_commit_schema_fingerprint(&accepted)
}
pub(in crate::db) fn prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint<
E: EntityKind + EntityValue,
>(
db: &Db<E::Canister>,
op: &CommitRowOp,
row_reader: &dyn StructuralPrimaryRowReader,
index_reader: &dyn StructuralIndexEntryReader,
schema_fingerprint: CommitSchemaFingerprint,
) -> Result<PreparedRowCommitOp, InternalError> {
prepare_row_commit_for_entity_impl(
db,
op,
CommitPrepareAuthority::for_type_with_schema_fingerprint::<E>(schema_fingerprint),
row_reader,
index_reader,
)
}
fn decode_commit_marker_rows_for_preflight<'a>(
data_key: &DecodedDataStoreKey,
before: Option<&'a RawRow>,
after: Option<&'a RawRow>,
row_contract: StructuralRowContract,
) -> Result<DecodedCommitRows<'a>, InternalError> {
let old_slots =
decode_optional_commit_marker_row_slots(data_key, before, "before", row_contract.clone())?;
let new_slots =
decode_optional_commit_marker_row_slots(data_key, after, "after", row_contract)?;
Ok(DecodedCommitRows {
old_slots,
new_slots,
})
}
#[inline(never)]
fn prepare_row_commit_for_entity_impl<C>(
db: &Db<C>,
op: &CommitRowOp,
authority: CommitPrepareAuthority,
row_reader: &dyn StructuralPrimaryRowReader,
index_reader: &dyn StructuralIndexEntryReader,
) -> Result<PreparedRowCommitOp, InternalError>
where
C: crate::traits::CanisterKind,
{
let structural = prepare_row_commit_structural_inputs(op, &authority)?;
let schema_contracts = accepted_commit_schema_contracts(db, &authority)?;
let (decoded, forward_index_ops) = {
let mut decoded = decode_commit_marker_rows_for_preflight(
&structural.data_key,
structural.old_row.as_ref(),
structural.new_row.as_ref(),
schema_contracts.row_contract.clone(),
)?;
let has_accepted_field_path_indexes =
!schema_contracts.schema_info.field_path_indexes().is_empty();
let has_accepted_expression_indexes =
!schema_contracts.schema_info.expression_indexes().is_empty();
let index_plan = if !has_accepted_field_path_indexes && !has_accepted_expression_indexes {
empty_forward_index_plan()
} else {
prepare_forward_index_commit_leaf(
db,
&authority,
row_reader,
index_reader,
&schema_contracts,
&structural.data_key,
&mut decoded,
)?
};
let forward_index_ops = materialize_forward_index_commit_ops(db, index_plan)?;
(decoded, forward_index_ops)
};
let source_primary_key = structural.data_key.primary_key_value();
let reverse_index_ops = prepare_reverse_relation_index_mutations_for_source_slot_readers(
db,
authority.relation_source,
schema_contracts.row_contract.clone(),
&source_primary_key,
decoded.old_slots.as_ref(),
decoded.new_slots.as_ref(),
)?;
let data_value = decoded
.new_slots
.as_ref()
.map(canonical_row_from_structural_slot_reader_with_accepted_contract)
.transpose()?;
finalize_row_commit_structural(
db,
authority,
structural.raw_key,
forward_index_ops,
reverse_index_ops,
data_value,
)
}
const fn empty_forward_index_plan() -> IndexMutationPlan {
IndexMutationPlan::new(Vec::new())
}
fn prepare_forward_index_commit_leaf<C>(
db: &Db<C>,
authority: &CommitPrepareAuthority,
row_reader: &dyn StructuralPrimaryRowReader,
index_reader: &dyn StructuralIndexEntryReader,
schema_contracts: &AcceptedCommitSchemaContracts,
data_key: &DecodedDataStoreKey,
decoded: &mut DecodedCommitRows<'_>,
) -> Result<IndexMutationPlan, InternalError>
where
C: crate::traits::CanisterKind,
{
let primary_key = data_key.primary_key_value();
let read_view = CommitIndexPlanReadView {
db,
row_reader,
index_reader,
};
match plan_index_mutation_for_slot_reader_structural(
authority.entity_path,
authority.entity_tag,
&schema_contracts.schema_info,
&read_view,
&schema_contracts.row_contract,
decoded.old_slots.as_ref().map(|_| &primary_key),
decoded
.old_slots
.as_mut()
.map(|slots| slots as &mut dyn CanonicalSlotReader),
decoded.new_slots.as_ref().map(|_| &primary_key),
decoded
.new_slots
.as_mut()
.map(|slots| slots as &mut dyn CanonicalSlotReader),
) {
Ok(index_plan) => Ok(index_plan),
Err(err) => {
if let Some(entity_path) = err.unique_violation_entity_path() {
record(MetricsEvent::UniqueViolation { entity_path });
}
Err(err.into_internal_error())
}
}
}
fn decode_optional_commit_marker_row_slots<'a>(
data_key: &DecodedDataStoreKey,
row: Option<&'a RawRow>,
label: &str,
row_contract: StructuralRowContract,
) -> Result<Option<StructuralSlotReader<'a>>, InternalError> {
row.map(|row| decode_commit_marker_structural_slots(data_key, row, label, row_contract))
.transpose()
}
fn decode_commit_marker_structural_slots<'a>(
data_key: &DecodedDataStoreKey,
row: &'a RawRow,
label: &str,
row_contract: StructuralRowContract,
) -> Result<StructuralSlotReader<'a>, InternalError> {
let slots = StructuralSlotReader::from_raw_row_with_validated_contract(row, row_contract)
.map_err(|err| {
let message = format!("commit marker {label} row: {err}");
if err.class() == ErrorClass::IncompatiblePersistedFormat {
InternalError::serialize_incompatible_persisted_format(message)
} else {
InternalError::serialize_corruption(message)
}
})?;
slots.validate_primary_key(data_key).map_err(|err| {
InternalError::store_corruption(format!("commit marker {label} row key mismatch: {err}"))
})?;
Ok(slots)
}
fn accepted_commit_schema_contracts<C>(
db: &Db<C>,
authority: &CommitPrepareAuthority,
) -> Result<AcceptedCommitSchemaContracts, InternalError>
where
C: CanisterKind,
{
let store = db.with_store_registry(|reg| reg.try_get_store(authority.data_store_path))?;
let accepted = store.with_schema_mut(|schema_store| {
ensure_accepted_schema_snapshot(
schema_store,
authority.entity_tag,
authority.entity_path,
authority.model,
)
})?;
Ok(AcceptedCommitSchemaContracts {
row_contract: StructuralRowContract::from_accepted_schema_snapshot(
authority.entity_path,
&accepted,
)?,
schema_info: SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
authority.model,
&accepted,
true,
),
})
}
fn prepare_row_commit_structural_inputs(
op: &CommitRowOp,
authority: &CommitPrepareAuthority,
) -> Result<CommitInputs, InternalError> {
if op.entity_path != authority.entity_path {
return Err(InternalError::store_corruption(format!(
"commit marker entity path mismatch: expected '{}', found '{}'",
authority.entity_path, op.entity_path,
)));
}
if op.schema_fingerprint != authority.schema_fingerprint {
return Err(CommitInputs::schema_fingerprint_mismatch(
authority.entity_path,
op.schema_fingerprint,
authority.schema_fingerprint,
));
}
let raw_key = op.key.clone();
let data_key = DecodedDataStoreKey::try_from_raw(&raw_key).map_err(|_| {
InternalError::store_corruption("commit marker row op key decode: invalid primary key")
})?;
let old_row = op
.before
.as_ref()
.map(|bytes| RawRow::from_untrusted_bytes(bytes.clone()))
.transpose()?;
let new_row = op
.after
.as_ref()
.map(|bytes| RawRow::from_untrusted_bytes(bytes.clone()))
.transpose()?;
if old_row.is_none() && new_row.is_none() {
return Err(InternalError::store_corruption(
"commit marker row op is a no-op (before/after both missing)",
));
}
Ok(CommitInputs {
raw_key,
data_key,
old_row,
new_row,
})
}
fn finalize_row_commit_structural<C>(
db: &Db<C>,
authority: CommitPrepareAuthority,
data_key: RawDataStoreKey,
forward_index_ops: Vec<CommitIndexOp>,
reverse_index_ops: Vec<PreparedIndexMutation>,
data_value: Option<CanonicalRow>,
) -> Result<PreparedRowCommitOp, InternalError>
where
C: crate::traits::CanisterKind,
{
let data_store = db.with_store_registry(|reg| reg.try_get_store(authority.data_store_path))?;
Ok(materialize_prepared_row_commit(
forward_index_ops,
reverse_index_ops,
data_store.data_store(),
data_key,
data_value,
))
}
fn materialize_prepared_row_commit(
forward_index_ops: Vec<CommitIndexOp>,
reverse_index_ops: Vec<PreparedIndexMutation>,
data_store: &'static LocalKey<RefCell<DataStore>>,
data_key: RawDataStoreKey,
data_value: Option<CanonicalRow>,
) -> PreparedRowCommitOp {
let mut index_ops = Vec::with_capacity(forward_index_ops.len() + reverse_index_ops.len());
index_ops.extend(
forward_index_ops
.into_iter()
.map(PreparedIndexMutation::from),
);
index_ops.extend(reverse_index_ops);
PreparedRowCommitOp {
index_ops,
data_store,
data_key,
data_value,
}
}
fn materialize_forward_index_commit_ops<C>(
db: &Db<C>,
index_plan: IndexMutationPlan,
) -> Result<Vec<CommitIndexOp>, InternalError>
where
C: crate::traits::CanisterKind,
{
let mut commit_ops = Vec::with_capacity(index_plan.groups.len().saturating_mul(2));
for group in index_plan.groups {
build_commit_ops_for_index_group(&mut commit_ops, db, group)?;
}
Ok(commit_ops)
}
fn build_commit_ops_for_index_group<C>(
commit_ops: &mut Vec<CommitIndexOp>,
db: &Db<C>,
group: IndexDeltaGroup,
) -> Result<(), InternalError>
where
C: crate::traits::CanisterKind,
{
let mut remove_delta = None;
let mut insert_delta = None;
let index_store = db
.with_store_registry(|registry| registry.try_get_store(group.index_store.as_str()))
.map(|store| store.index_store())?;
for delta in group.deltas {
match delta {
IndexDelta::Remove(delta) => remove_delta = Some(delta),
IndexDelta::Insert(delta) => insert_delta = Some(delta),
}
}
build_commit_ops_for_index_delta_pair(commit_ops, index_store, remove_delta, insert_delta);
Ok(())
}
fn build_commit_ops_for_index_delta_pair(
commit_ops: &mut Vec<CommitIndexOp>,
store: &'static LocalKey<RefCell<crate::db::index::IndexStore>>,
remove_delta: Option<IndexMembershipDelta>,
insert_delta: Option<IndexMembershipDelta>,
) {
if remove_delta
.as_ref()
.zip(insert_delta.as_ref())
.is_some_and(|(old_delta, new_delta)| old_delta.key == new_delta.key)
{
if let Some(insert_delta) = insert_delta {
push_commit_op_for_index_entry(
commit_ops,
store,
insert_delta.key.to_raw(),
Some(IndexRowIdentity::new(&insert_delta.primary_key)),
CommitIndexOp::unchanged,
);
}
return;
}
let mut first: Option<(
RawIndexStoreKey,
Option<IndexRowIdentity>,
CommitIndexOpBuilder,
)> = None;
let mut second: Option<(
RawIndexStoreKey,
Option<IndexRowIdentity>,
CommitIndexOpBuilder,
)> = None;
if let Some(remove_delta) = remove_delta {
insert_commit_candidate(
&mut first,
&mut second,
remove_delta.key.to_raw(),
None,
CommitIndexOp::index_remove,
);
}
if let Some(insert_delta) = insert_delta {
insert_commit_candidate(
&mut first,
&mut second,
insert_delta.key.to_raw(),
Some(IndexRowIdentity::new(&insert_delta.primary_key)),
CommitIndexOp::index_insert,
);
}
if let Some((raw_key, entry, build_commit_op)) = first {
push_commit_op_for_index_entry(commit_ops, store, raw_key, entry, build_commit_op);
}
if let Some((raw_key, entry, build_commit_op)) = second {
push_commit_op_for_index_entry(commit_ops, store, raw_key, entry, build_commit_op);
}
}
fn insert_commit_candidate(
first: &mut Option<(
RawIndexStoreKey,
Option<IndexRowIdentity>,
CommitIndexOpBuilder,
)>,
second: &mut Option<(
RawIndexStoreKey,
Option<IndexRowIdentity>,
CommitIndexOpBuilder,
)>,
raw_key: RawIndexStoreKey,
entry: Option<IndexRowIdentity>,
build_commit_op: CommitIndexOpBuilder,
) {
match first {
None => *first = Some((raw_key, entry, build_commit_op)),
Some((first_key, _, _)) if raw_key < *first_key => {
*second = first.take();
*first = Some((raw_key, entry, build_commit_op));
}
_ => *second = Some((raw_key, entry, build_commit_op)),
}
}
type CommitIndexOpBuilder = fn(
&'static LocalKey<RefCell<crate::db::index::IndexStore>>,
RawIndexStoreKey,
Option<IndexEntryValue>,
) -> CommitIndexOp;
fn push_commit_op_for_index_entry(
commit_ops: &mut Vec<CommitIndexOp>,
store: &'static LocalKey<RefCell<crate::db::index::IndexStore>>,
raw_key: RawIndexStoreKey,
entry: Option<IndexRowIdentity>,
build_commit_op: CommitIndexOpBuilder,
) {
let value = entry.map(|_| IndexEntryValue::presence());
commit_ops.push(build_commit_op(store, raw_key, value));
}