use crate::{
db::{
Db,
commit::{
CommitRowOp, CommitSchemaFingerprint,
prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint,
},
data::{CanonicalRow, DataKey, PersistedRow, RawRow, UpdatePatch},
executor::{
Context, ExecutorError,
mutation::{
MutationInput, PreparedRowOpDelta, commit_prepared_single_save_row_op_with_window,
commit_save_row_ops_with_window, emit_index_delta_metrics, mutation_write_context,
synchronized_store_handles_for_prepared_row_ops,
},
},
relation::model_has_strong_relation_targets,
schema::{SchemaInfo, commit_schema_fingerprint_for_entity},
},
error::InternalError,
metrics::sink::{ExecKind, MetricsEvent, Span, record},
traits::{EntityValue, FieldValue, Storable},
};
use candid::CandidType;
use derive_more::Display;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
/// Write semantics requested for a typed entity save.
///
/// Each variant is translated into a row-existence precondition by
/// `SaveRule::from_mode`.
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Display, Serialize)]
enum SaveMode {
/// Default mode; mapped to `SaveRule::RequireAbsent`.
#[default]
Insert,
/// Mapped to `SaveRule::AllowAny`.
Replace,
/// Mapped to `SaveRule::RequirePresent`.
Update,
}
/// Executor for save-style mutations (insert, update, replace) of entity type `E`.
#[derive(Clone, Copy)]
pub(in crate::db) struct SaveExecutor<E: PersistedRow + EntityValue> {
/// Database handle used to build the write context and to commit row ops.
pub(in crate::db::executor::mutation) db: Db<E::Canister>,
}
/// Precondition on the currently stored row, checked before a save is prepared.
///
/// Enforced by `resolve_existing_row_for_rule`.
#[derive(Clone, Copy)]
enum SaveRule {
/// The key must not be stored yet; an existing row yields `ExecutorError::KeyExists`.
RequireAbsent,
/// The key must already be stored; a missing row yields a store-not-found error.
RequirePresent,
/// The row may or may not exist; whatever is stored becomes the before-image.
AllowAny,
}
impl SaveRule {
const fn from_mode(mode: SaveMode) -> Self {
match mode {
SaveMode::Insert => Self::RequireAbsent,
SaveMode::Update => Self::RequirePresent,
SaveMode::Replace => Self::AllowAny,
}
}
}
/// Write semantics for structural (patch-based) mutations.
///
/// Translated into a `SaveRule` by `MutationMode::save_rule`.
#[derive(Clone, Copy)]
pub enum MutationMode {
/// Build the row from the patch alone; the key must not be stored yet.
// Within this file the variant is only constructed by `insert_structural`,
// which is itself marked `#[allow(dead_code)]`.
#[allow(dead_code)]
Insert,
/// Build the row from the patch alone; the key may or may not be stored.
// Within this file the variant is only constructed by `replace_structural`,
// which is itself marked `#[allow(dead_code)]`.
#[allow(dead_code)]
Replace,
/// Apply the patch on top of the stored row; the key must already be stored.
Update,
}
impl MutationMode {
const fn save_rule(self) -> SaveRule {
match self {
Self::Insert => SaveRule::RequireAbsent,
Self::Replace => SaveRule::AllowAny,
Self::Update => SaveRule::RequirePresent,
}
}
}
impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
#[must_use]
pub(in crate::db) const fn new(db: Db<E::Canister>, _debug: bool) -> Self {
Self { db }
}
/// Saves `entity` as a new row and returns the saved entity.
///
/// Runs `save_entity` with `SaveMode::Insert`, which maps to
/// `SaveRule::RequireAbsent`: a key that is already stored is an error.
pub(crate) fn insert(&self, entity: E) -> Result<E, InternalError> {
    let mode = SaveMode::Insert;
    self.save_entity(mode, entity)
}
/// Saves `entity` over an existing row and returns the saved entity.
///
/// Runs `save_entity` with `SaveMode::Update`, which maps to
/// `SaveRule::RequirePresent`: a key that is not stored yet is an error.
pub(crate) fn update(&self, entity: E) -> Result<E, InternalError> {
    let mode = SaveMode::Update;
    self.save_entity(mode, entity)
}
/// Applies `patch` as a structural insert for `key`.
///
/// Thin wrapper over `apply_structural_mutation` with `MutationMode::Insert`.
#[allow(dead_code)]
pub(in crate::db) fn insert_structural(
    &self,
    key: E::Key,
    patch: UpdatePatch,
) -> Result<E, InternalError> {
    let mode = MutationMode::Insert;
    self.apply_structural_mutation(mode, key, patch)
}
/// Applies `patch` as a structural replace for `key`.
///
/// Thin wrapper over `apply_structural_mutation` with `MutationMode::Replace`.
#[allow(dead_code)]
pub(in crate::db) fn replace_structural(
    &self,
    key: E::Key,
    patch: UpdatePatch,
) -> Result<E, InternalError> {
    let mode = MutationMode::Replace;
    self.apply_structural_mutation(mode, key, patch)
}
/// Applies `patch` as a structural update for `key`.
///
/// Thin wrapper over `apply_structural_mutation` with `MutationMode::Update`.
#[allow(dead_code)]
pub(in crate::db) fn update_structural(
    &self,
    key: E::Key,
    patch: UpdatePatch,
) -> Result<E, InternalError> {
    let mode = MutationMode::Update;
    self.apply_structural_mutation(mode, key, patch)
}
/// Saves `entity` whether or not a row with its key is already stored.
///
/// Runs `save_entity` with `SaveMode::Replace`, which maps to
/// `SaveRule::AllowAny`.
pub(crate) fn replace(&self, entity: E) -> Result<E, InternalError> {
    let mode = SaveMode::Replace;
    self.save_entity(mode, entity)
}
/// Saves `entities` one row at a time, committing each row before the next.
///
/// Returns the saved entities in input order. The first failing row stops
/// the batch and its error is returned. If rows were already committed by
/// then, the accumulated index delta metrics are emitted and a
/// `MetricsEvent::NonAtomicPartialCommit` event records how many rows went
/// through; nothing in this function undoes those earlier rows.
fn save_batch_non_atomic(
    &self,
    mode: SaveMode,
    entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
    let pending = entities.into_iter();
    let mut committed = Vec::with_capacity(pending.size_hint().0);

    // Per-batch invariants: resolved once and reused for every row.
    let ctx = mutation_write_context::<E>(&self.db)?;
    let save_rule = SaveRule::from_mode(mode);
    let schema = Self::schema_info()?;
    let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
    let validate_relations = model_has_strong_relation_targets(E::MODEL);

    let mut batch_span = None;
    let mut delta_totals = PreparedRowOpDelta {
        index_inserts: 0,
        index_removes: 0,
        reverse_index_inserts: 0,
        reverse_index_removes: 0,
    };

    for entity in pending {
        // Created on the first row only, so an empty batch never opens a span.
        let span = batch_span.get_or_insert_with(|| Span::<E>::new(ExecKind::Save));

        let outcome = self
            .prepare_entity_save_row_op(
                &ctx,
                save_rule,
                schema,
                schema_fingerprint,
                validate_relations,
                entity,
            )
            .and_then(|(saved, marker_row_op)| {
                let prepared_row_op =
                    prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<
                        E,
                    >(&self.db, &marker_row_op, &ctx, &ctx, schema_fingerprint)?;
                Self::commit_prepared_single_row(
                    &self.db,
                    marker_row_op,
                    prepared_row_op,
                    // Index deltas are summed here and emitted once per batch.
                    |delta| accumulate_prepared_row_op_delta(&mut delta_totals, delta),
                    || {},
                )?;
                Ok(saved)
            });

        let saved = match outcome {
            Ok(saved) => saved,
            Err(err) => {
                // Report what was already committed before surfacing the error.
                if !committed.is_empty() {
                    emit_index_delta_metrics::<E>(&delta_totals);
                    record(MetricsEvent::NonAtomicPartialCommit {
                        entity_path: E::PATH,
                        committed_rows: u64::try_from(committed.len()).unwrap_or(u64::MAX),
                    });
                }
                return Err(err);
            }
        };

        committed.push(saved);
        span.set_rows(u64::try_from(committed.len()).unwrap_or(u64::MAX));
    }

    if !committed.is_empty() {
        emit_index_delta_metrics::<E>(&delta_totals);
    }
    Ok(committed)
}
/// Collects `entities` into a `Vec` and saves them through
/// `save_batch_atomic_impl` under the rule implied by `mode`.
fn save_batch_atomic(
    &self,
    mode: SaveMode,
    entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
    let collected = entities.into_iter().collect::<Vec<E>>();
    let save_rule = SaveRule::from_mode(mode);
    self.save_batch_atomic_impl(save_rule, collected)
}
/// Inserts every entity through the atomic batch path (`SaveMode::Insert`).
pub(crate) fn insert_many_atomic(
    &self,
    entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
    let mode = SaveMode::Insert;
    self.save_batch_atomic(mode, entities)
}
/// Updates every entity through the atomic batch path (`SaveMode::Update`).
pub(crate) fn update_many_atomic(
    &self,
    entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
    let mode = SaveMode::Update;
    self.save_batch_atomic(mode, entities)
}
/// Replaces every entity through the atomic batch path (`SaveMode::Replace`).
pub(crate) fn replace_many_atomic(
    &self,
    entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
    let mode = SaveMode::Replace;
    self.save_batch_atomic(mode, entities)
}
/// Inserts entities row by row through the non-atomic batch path
/// (`SaveMode::Insert`).
pub(crate) fn insert_many_non_atomic(
    &self,
    entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
    let mode = SaveMode::Insert;
    self.save_batch_non_atomic(mode, entities)
}
/// Updates entities row by row through the non-atomic batch path
/// (`SaveMode::Update`).
pub(crate) fn update_many_non_atomic(
    &self,
    entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
    let mode = SaveMode::Update;
    self.save_batch_non_atomic(mode, entities)
}
/// Replaces entities row by row through the non-atomic batch path
/// (`SaveMode::Replace`).
pub(crate) fn replace_many_non_atomic(
    &self,
    entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
    let mode = SaveMode::Replace;
    self.save_batch_non_atomic(mode, entities)
}
/// Prepares a row op for every entity, then commits them all through one
/// `commit_atomic_batch` call.
///
/// Each entity is preflighted and turned into a `CommitRowOp` under
/// `save_rule`. Nothing is committed until the loop has finished, so a key
/// that appears twice in the batch is detected with a local set and
/// rejected via `InternalError::mutation_atomic_save_duplicate_key`.
/// An empty batch returns an empty `Vec` without committing anything.
#[inline(never)]
fn save_batch_atomic_impl(
    &self,
    save_rule: SaveRule,
    entities: Vec<E>,
) -> Result<Vec<E>, InternalError> {
    let mut span = Span::<E>::new(ExecKind::Save);
    let ctx = mutation_write_context::<E>(&self.db)?;

    let batch_len = entities.len();
    let mut saved = Vec::with_capacity(batch_len);
    let mut row_ops = Vec::with_capacity(batch_len);
    let mut staged_keys = HashSet::with_capacity(batch_len);

    let schema = Self::schema_info()?;
    let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
    let validate_relations = model_has_strong_relation_targets(E::MODEL);

    for mut entity in entities {
        self.preflight_entity_with_cached_schema(&mut entity, schema, validate_relations)?;
        let row_op =
            Self::prepare_typed_entity_row_op(&ctx, save_rule, &entity, schema_fingerprint)?;

        // `insert` reports false when the key was already staged by an earlier row.
        let first_occurrence = staged_keys.insert(row_op.key);
        if !first_occurrence {
            let data_key = DataKey::try_new::<E>(entity.id().key())?;
            return Err(InternalError::mutation_atomic_save_duplicate_key(
                E::PATH,
                data_key,
            ));
        }

        row_ops.push(row_op);
        saved.push(entity);
    }

    if !row_ops.is_empty() {
        Self::commit_atomic_batch(&self.db, row_ops, &mut span)?;
    }
    Ok(saved)
}
/// Builds the `CommitRowOp` that describes saving `entity`.
///
/// The before-image is whatever `resolve_existing_row_for_rule` returns
/// for the entity key under `save_rule`; the after-image is the canonical
/// row encoding of `entity`.
fn prepare_typed_entity_row_op(
    ctx: &Context<'_, E>,
    save_rule: SaveRule,
    entity: &E,
    schema_fingerprint: CommitSchemaFingerprint,
) -> Result<CommitRowOp, InternalError> {
    let data_key = DataKey::try_new::<E>(entity.id().key())?;
    let raw_key = data_key.to_raw()?;

    // The existence rule is enforced before the entity is encoded.
    let before_image = Self::resolve_existing_row_for_rule(ctx, &data_key, save_rule)?
        .map(|row| <RawRow as Storable>::into_bytes(row));
    let after_image = CanonicalRow::from_entity(entity)?
        .into_raw_row()
        .into_bytes();

    Ok(CommitRowOp::new(
        E::PATH,
        raw_key,
        before_image,
        Some(after_image),
        schema_fingerprint,
    ))
}
/// Produces the after-image row for a structural mutation.
///
/// In `Update` mode with a stored row, the serialized patch is applied on
/// top of that row. In every other case (`Insert`, `Replace`, or `Update`
/// without a stored row) the row is built from the patch alone.
fn build_structural_after_image_row(
    mode: MutationMode,
    mutation: &MutationInput,
    old_row: Option<&RawRow>,
) -> Result<CanonicalRow, InternalError> {
    match (mode, old_row) {
        (MutationMode::Update, Some(stored)) => {
            stored.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
        }
        (MutationMode::Update, None) | (MutationMode::Insert | MutationMode::Replace, _) => {
            RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch())
        }
    }
}
/// Loads the row stored under `data_key` and enforces `save_rule` on it.
///
/// A stored row is always passed through `validate_existing_row_identity`
/// first. After that:
/// * `RequireAbsent`  - a stored row yields `ExecutorError::KeyExists`,
///   otherwise `Ok(None)`;
/// * `RequirePresent` - a missing row yields a store-not-found error,
///   otherwise `Ok(Some(row))`;
/// * `AllowAny`       - returns whatever was found.
fn resolve_existing_row_for_rule(
    ctx: &Context<'_, E>,
    data_key: &DataKey,
    save_rule: SaveRule,
) -> Result<Option<RawRow>, InternalError> {
    let raw_key = data_key.to_raw()?;
    let existing = ctx.with_store(|store| store.get(&raw_key))?;

    // Identity validation takes precedence over the rule-specific outcome.
    if let Some(row) = existing.as_ref() {
        Self::validate_existing_row_identity(data_key, row)?;
    }

    match save_rule {
        SaveRule::RequireAbsent => match existing {
            Some(_) => Err(ExecutorError::KeyExists(data_key.clone()).into()),
            None => Ok(None),
        },
        SaveRule::RequirePresent => {
            let row = existing
                .ok_or_else(|| InternalError::store_not_found(data_key.to_string()))?;
            Ok(Some(row))
        }
        SaveRule::AllowAny => Ok(existing),
    }
}
/// Checks a stored row against `ensure_persisted_row_invariants`.
///
/// Corruption errors that originate from serialization or the store are
/// returned unchanged. Any other failure is rewrapped as
/// `ExecutorError::persisted_row_invariant_violation`, carrying the
/// original message.
fn validate_existing_row_identity(
    data_key: &DataKey,
    row: &RawRow,
) -> Result<(), InternalError> {
    if let Err(err) = Self::ensure_persisted_row_invariants(data_key, row) {
        let passthrough = matches!(
            (err.class(), err.origin()),
            (
                crate::error::ErrorClass::Corruption,
                crate::error::ErrorOrigin::Serialize | crate::error::ErrorOrigin::Store,
            )
        );
        if passthrough {
            return Err(err);
        }
        return Err(InternalError::from(
            ExecutorError::persisted_row_invariant_violation(data_key, &err.message),
        ));
    }
    Ok(())
}
/// Saves a single entity under `mode`, using a fresh write context.
fn save_entity(&self, mode: SaveMode, entity: E) -> Result<E, InternalError> {
    let ctx = mutation_write_context::<E>(&self.db)?;
    self.save_entity_with_context(&ctx, SaveRule::from_mode(mode), entity)
}
/// Resolves the schema info, schema fingerprint and relation-validation
/// flag for `E`, then forwards to `save_entity_with_context_and_schema`.
fn save_entity_with_context(
    &self,
    ctx: &Context<'_, E>,
    save_rule: SaveRule,
    entity: E,
) -> Result<E, InternalError> {
    let schema = Self::schema_info()?;
    self.save_entity_with_context_and_schema(
        ctx,
        save_rule,
        schema,
        commit_schema_fingerprint_for_entity::<E>(),
        model_has_strong_relation_targets(E::MODEL),
        entity,
    )
}
/// Prepares and commits a single-row save with caller-supplied schema data.
///
/// Index delta metrics are emitted from the index-applied callback, and
/// the save span is marked with one row from the data-applied callback.
/// Returns the entity produced by `prepare_entity_save_row_op`.
fn save_entity_with_context_and_schema(
    &self,
    ctx: &Context<'_, E>,
    save_rule: SaveRule,
    schema: &SchemaInfo,
    schema_fingerprint: CommitSchemaFingerprint,
    validate_relations: bool,
    entity: E,
) -> Result<E, InternalError> {
    let mut span = Span::<E>::new(ExecKind::Save);

    let (saved, marker_row_op) = self.prepare_entity_save_row_op(
        ctx,
        save_rule,
        schema,
        schema_fingerprint,
        validate_relations,
        entity,
    )?;

    // `ctx` is passed twice: it serves as both structural readers here.
    let prepared_row_op =
        prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
            &self.db,
            &marker_row_op,
            ctx,
            ctx,
            schema_fingerprint,
        )?;

    Self::commit_prepared_single_row(
        &self.db,
        marker_row_op,
        prepared_row_op,
        |delta| emit_index_delta_metrics::<E>(delta),
        || {
            span.set_rows(1);
        },
    )
    .map(|()| saved)
}
/// Preflights `entity` and builds its `CommitRowOp`.
///
/// The preflight step receives the entity mutably, so the entity returned
/// alongside the row op is the post-preflight value.
fn prepare_entity_save_row_op(
    &self,
    ctx: &Context<'_, E>,
    save_rule: SaveRule,
    schema: &SchemaInfo,
    schema_fingerprint: CommitSchemaFingerprint,
    validate_relations: bool,
    mut entity: E,
) -> Result<(E, CommitRowOp), InternalError> {
    self.preflight_entity_with_cached_schema(&mut entity, schema, validate_relations)?;
    let row_op = Self::prepare_typed_entity_row_op(ctx, save_rule, &entity, schema_fingerprint)?;
    Ok((entity, row_op))
}
/// Converts `key` and `patch` into a `MutationInput` and saves it under `mode`.
#[allow(dead_code)]
pub(in crate::db) fn apply_structural_mutation(
    &self,
    mode: MutationMode,
    key: E::Key,
    patch: UpdatePatch,
) -> Result<E, InternalError> {
    let input = MutationInput::from_update_patch::<E>(key, &patch)?;
    self.save_structural_mutation(mode, input)
}
/// Executes one structural (patch-based) mutation and returns the saved entity.
///
/// The after-image is built twice. The first build feeds
/// `validate_structural_after_image`, which decodes and preflights the
/// entity. The second build starts from a `MutationInput` derived from
/// that validated entity, and its bytes are what is committed.
#[allow(dead_code)]
fn save_structural_mutation(
    &self,
    mode: MutationMode,
    mutation: MutationInput,
) -> Result<E, InternalError> {
    let mut span = Span::<E>::new(ExecKind::Save);
    let ctx = mutation_write_context::<E>(&self.db)?;
    let data_key = mutation.data_key().clone();
    let existing_row = Self::resolve_existing_row_for_rule(&ctx, &data_key, mode.save_rule())?;

    // Pass 1: build a candidate row from the caller's patch and validate it.
    let candidate_row =
        Self::build_structural_after_image_row(mode, &mutation, existing_row.as_ref())?;
    let entity = self.validate_structural_after_image(&data_key, &candidate_row)?;

    // Pass 2: rebuild from the validated entity to obtain the committed bytes.
    let normalized_mutation = MutationInput::from_entity(&entity)?;
    let after_bytes = Self::build_structural_after_image_row(
        mode,
        &normalized_mutation,
        existing_row.as_ref(),
    )?
    .into_raw_row()
    .into_bytes();
    let before_bytes = existing_row.map(<RawRow as Storable>::into_bytes);

    let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
    let marker_row_op = CommitRowOp::new(
        E::PATH,
        data_key.to_raw()?,
        before_bytes,
        Some(after_bytes),
        schema_fingerprint,
    );
    let prepared_row_op =
        prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
            &self.db,
            &marker_row_op,
            &ctx,
            &ctx,
            schema_fingerprint,
        )?;

    Self::commit_prepared_single_row(
        &self.db,
        marker_row_op,
        prepared_row_op,
        |delta| emit_index_delta_metrics::<E>(delta),
        || {
            span.set_rows(1);
        },
    )?;
    Ok(entity)
}
/// Decodes a structural after-image into `E` and validates it.
///
/// Fails when the row cannot be decoded, when the decoded entity's key
/// differs from the key carried by `data_key`, or when `preflight_entity`
/// rejects the entity. On success the preflighted entity is returned.
#[allow(dead_code)]
fn validate_structural_after_image(
    &self,
    data_key: &DataKey,
    row: &RawRow,
) -> Result<E, InternalError> {
    let expected_key = data_key.try_key::<E>()?;
    let mut decoded = row.try_decode::<E>().map_err(|err| {
        InternalError::mutation_structural_after_image_invalid(
            E::PATH,
            data_key,
            err.to_string(),
        )
    })?;

    // The key inside the decoded row must match the key it is stored under.
    let decoded_key = decoded.id().key();
    if decoded_key != expected_key {
        return Err(InternalError::mutation_entity_primary_key_mismatch(
            E::PATH,
            E::MODEL.primary_key().name(),
            &FieldValue::to_value(&decoded_key),
            &FieldValue::to_value(&expected_key),
        ));
    }

    self.preflight_entity(&mut decoded)?;
    Ok(decoded)
}
/// Commits one prepared row op via
/// `commit_prepared_single_save_row_op_with_window` under the
/// `"save_row_apply"` label.
///
/// `on_index_applied` and `on_data_applied` are forwarded unchanged to
/// that helper.
fn commit_prepared_single_row(
    db: &Db<E::Canister>,
    marker_row_op: CommitRowOp,
    prepared_row_op: crate::db::commit::PreparedRowCommitOp,
    on_index_applied: impl FnOnce(&PreparedRowOpDelta),
    on_data_applied: impl FnOnce(),
) -> Result<(), InternalError> {
    // Handles are derived from the single prepared op, viewed as a one-element slice.
    let store_handles = synchronized_store_handles_for_prepared_row_ops(
        db,
        std::slice::from_ref(&prepared_row_op),
    );
    commit_prepared_single_save_row_op_with_window(
        marker_row_op,
        prepared_row_op,
        store_handles,
        "save_row_apply",
        on_index_applied,
        on_data_applied,
    )?;
    Ok(())
}
/// Commits all `marker_row_ops` through `commit_save_row_ops_with_window`
/// under the `"save_batch_atomic_row_apply"` label.
///
/// The callback handed to that helper sets the span's row count to the
/// number of row ops in the batch.
fn commit_atomic_batch(
    db: &Db<E::Canister>,
    marker_row_ops: Vec<CommitRowOp>,
    span: &mut Span<E>,
) -> Result<(), InternalError> {
    // Captured before the ops are moved into the commit call.
    let row_count = u64::try_from(marker_row_ops.len()).unwrap_or(u64::MAX);
    let mark_rows = || {
        span.set_rows(row_count);
    };
    commit_save_row_ops_with_window::<E>(
        db,
        marker_row_ops,
        "save_batch_atomic_row_apply",
        mark_rows,
    )?;
    Ok(())
}
}
/// Adds every counter of `delta` onto the matching counter of `total`.
///
/// Additions saturate instead of overflowing.
const fn accumulate_prepared_row_op_delta(
    total: &mut PreparedRowOpDelta,
    delta: &PreparedRowOpDelta,
) {
    // Reverse-index counters.
    total.reverse_index_removes = total
        .reverse_index_removes
        .saturating_add(delta.reverse_index_removes);
    total.reverse_index_inserts = total
        .reverse_index_inserts
        .saturating_add(delta.reverse_index_inserts);

    // Forward-index counters.
    total.index_removes = total.index_removes.saturating_add(delta.index_removes);
    total.index_inserts = total.index_inserts.saturating_add(delta.index_inserts);
}