use crate::{
db::{
Db,
commit::CommitRowOp,
data::{CanonicalRow, DataKey, PersistedRow, RawRow, UpdatePatch},
executor::{
Context, ExecutorError,
mutation::{
MutationInput, commit_save_row_ops_with_window,
commit_single_save_row_op_with_window, mutation_write_context,
},
},
schema::commit_schema_fingerprint_for_entity,
},
error::InternalError,
metrics::sink::{ExecKind, MetricsEvent, Span, record},
traits::{EntityValue, FieldValue},
};
use candid::CandidType;
use derive_more::Display;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
/// Save operation requested for a typed entity write.
///
/// Each variant is translated into a `SaveRule` by `SaveRule::from_mode`,
/// which decides whether a row may, must, or must not already exist under
/// the entity's key. `Insert` is the `Default` variant.
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Display, Serialize)]
enum SaveMode {
/// Create a new row; maps to `SaveRule::RequireAbsent`.
#[default]
Insert,
/// Write the row whether or not one already exists; maps to `SaveRule::AllowAny`.
Replace,
/// Modify an existing row; maps to `SaveRule::RequirePresent`.
Update,
}
/// Executor for save-style writes (insert, update, replace) of entity type `E`.
///
/// The only state it carries is the database handle used by every operation.
#[derive(Clone, Copy)]
pub(in crate::db) struct SaveExecutor<E: PersistedRow + EntityValue> {
/// Database handle that all save operations of this executor go through.
pub(in crate::db::executor::mutation) db: Db<E::Canister>,
}
/// Precondition on the stored row that a save must satisfy before it commits.
///
/// Enforced by `resolve_existing_row_for_rule`.
#[derive(Clone, Copy)]
enum SaveRule {
/// No row may exist at the key; an existing row yields `ExecutorError::KeyExists`.
RequireAbsent,
/// A row must exist at the key; a missing row yields a store-not-found error.
RequirePresent,
/// The row may or may not exist; whatever is found is used as the before-image.
AllowAny,
}
impl SaveRule {
const fn from_mode(mode: SaveMode) -> Self {
match mode {
SaveMode::Insert => Self::RequireAbsent,
SaveMode::Update => Self::RequirePresent,
SaveMode::Replace => Self::AllowAny,
}
}
}
/// Operation requested for a structural (key + `UpdatePatch`) mutation.
///
/// The mode selects both the row-existence rule (see `MutationMode::save_rule`)
/// and how the after-image is built (see `build_structural_after_image_row`).
#[derive(Clone, Copy)]
pub enum MutationMode {
/// Requires that no row exists; the after-image is built from the patch alone.
// `dead_code` is silenced on this variant (and on `Replace`); presumably the
// variants are not constructed in every build configuration — TODO confirm.
#[allow(dead_code)]
Insert,
/// Accepts an existing or missing row; the after-image is built from the patch alone.
#[allow(dead_code)]
Replace,
/// Requires an existing row; the patch is applied on top of that row.
Update,
}
impl MutationMode {
const fn save_rule(self) -> SaveRule {
match self {
Self::Insert => SaveRule::RequireAbsent,
Self::Replace => SaveRule::AllowAny,
Self::Update => SaveRule::RequirePresent,
}
}
}
impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
/// Builds a save executor over the database handle `db`.
///
/// `_debug` is accepted but not used by this constructor.
#[must_use]
pub(in crate::db) const fn new(db: Db<E::Canister>, _debug: bool) -> Self {
Self { db }
}
/// Saves `entity` with `SaveMode::Insert` and returns the saved entity.
///
/// # Errors
/// Propagates any error from `save_entity`, including the key-exists error
/// raised when a row is already stored under the entity's key.
pub(crate) fn insert(&self, entity: E) -> Result<E, InternalError> {
self.save_entity(SaveMode::Insert, entity)
}
/// Saves `entity` with `SaveMode::Update` and returns the saved entity.
///
/// # Errors
/// Propagates any error from `save_entity`, including the store-not-found
/// error raised when no row is stored under the entity's key.
pub(crate) fn update(&self, entity: E) -> Result<E, InternalError> {
self.save_entity(SaveMode::Update, entity)
}
/// Applies `patch` at `key` as a structural insert (`MutationMode::Insert`)
/// and returns the resulting entity.
///
/// # Errors
/// Propagates any error from `apply_structural_mutation`.
#[allow(dead_code)]
pub(in crate::db) fn insert_structural(
&self,
key: E::Key,
patch: UpdatePatch,
) -> Result<E, InternalError> {
self.apply_structural_mutation(MutationMode::Insert, key, patch)
}
/// Applies `patch` at `key` as a structural replace (`MutationMode::Replace`)
/// and returns the resulting entity.
///
/// # Errors
/// Propagates any error from `apply_structural_mutation`.
#[allow(dead_code)]
pub(in crate::db) fn replace_structural(
&self,
key: E::Key,
patch: UpdatePatch,
) -> Result<E, InternalError> {
self.apply_structural_mutation(MutationMode::Replace, key, patch)
}
/// Applies `patch` at `key` as a structural update (`MutationMode::Update`)
/// and returns the resulting entity.
///
/// # Errors
/// Propagates any error from `apply_structural_mutation`.
#[allow(dead_code)]
pub(in crate::db) fn update_structural(
&self,
key: E::Key,
patch: UpdatePatch,
) -> Result<E, InternalError> {
self.apply_structural_mutation(MutationMode::Update, key, patch)
}
/// Saves `entity` with `SaveMode::Replace` and returns the saved entity.
///
/// Replace accepts both an existing and a missing row under the entity's key.
///
/// # Errors
/// Propagates any error from `save_entity`.
pub(crate) fn replace(&self, entity: E) -> Result<E, InternalError> {
self.save_entity(SaveMode::Replace, entity)
}
/// Saves `entities` one at a time with `mode`, stopping at the first failure.
///
/// Every entity goes through `save_entity`, so each row is committed on its
/// own. When a save fails after at least one earlier save succeeded, a
/// `MetricsEvent::NonAtomicPartialCommit` event carrying the number of
/// already-saved rows is recorded before the error is returned; this method
/// does not undo the earlier saves.
///
/// # Errors
/// Returns the first error produced by `save_entity`.
fn save_batch_non_atomic(
    &self,
    mode: SaveMode,
    entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
    let pending = entities.into_iter();
    // Pre-size from the iterator's lower bound to avoid regrowth in the common case.
    let (lower_bound, _) = pending.size_hint();
    let mut saved: Vec<E> = Vec::with_capacity(lower_bound);

    for entity in pending {
        let stored = match self.save_entity(mode, entity) {
            Ok(stored) => stored,
            Err(err) => {
                let committed = saved.len();
                if committed > 0 {
                    // Earlier rows were already saved: surface the partial batch in metrics.
                    record(MetricsEvent::NonAtomicPartialCommit {
                        entity_path: E::PATH,
                        committed_rows: u64::try_from(committed).unwrap_or(u64::MAX),
                    });
                }
                return Err(err);
            }
        };
        saved.push(stored);
    }

    Ok(saved)
}
/// Saves `entities` with `mode` as a single atomic batch.
///
/// The input is first collected into a `Vec` so the generic iterator type
/// does not reach `save_batch_atomic_impl`, which does the actual work.
///
/// # Errors
/// Propagates any error from `save_batch_atomic_impl`.
fn save_batch_atomic(
    &self,
    mode: SaveMode,
    entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
    let collected = entities.into_iter().collect::<Vec<E>>();
    let rule = SaveRule::from_mode(mode);

    self.save_batch_atomic_impl(rule, collected)
}
/// Inserts all `entities` as one atomic batch (`SaveMode::Insert`).
///
/// # Errors
/// Propagates any error from `save_batch_atomic`, including the
/// duplicate-key error raised when two entities in the batch share a key.
pub(crate) fn insert_many_atomic(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
self.save_batch_atomic(SaveMode::Insert, entities)
}
/// Updates all `entities` as one atomic batch (`SaveMode::Update`).
///
/// # Errors
/// Propagates any error from `save_batch_atomic`, including the
/// duplicate-key error raised when two entities in the batch share a key.
pub(crate) fn update_many_atomic(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
self.save_batch_atomic(SaveMode::Update, entities)
}
/// Replaces all `entities` as one atomic batch (`SaveMode::Replace`).
///
/// # Errors
/// Propagates any error from `save_batch_atomic`, including the
/// duplicate-key error raised when two entities in the batch share a key.
pub(crate) fn replace_many_atomic(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
self.save_batch_atomic(SaveMode::Replace, entities)
}
/// Inserts `entities` one by one (`SaveMode::Insert`), stopping at the first failure.
///
/// Entities saved before a failure are not undone by this call.
///
/// # Errors
/// Propagates the first error from `save_batch_non_atomic`.
pub(crate) fn insert_many_non_atomic(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
self.save_batch_non_atomic(SaveMode::Insert, entities)
}
/// Updates `entities` one by one (`SaveMode::Update`), stopping at the first failure.
///
/// Entities saved before a failure are not undone by this call.
///
/// # Errors
/// Propagates the first error from `save_batch_non_atomic`.
pub(crate) fn update_many_non_atomic(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
self.save_batch_non_atomic(SaveMode::Update, entities)
}
/// Replaces `entities` one by one (`SaveMode::Replace`), stopping at the first failure.
///
/// Entities saved before a failure are not undone by this call.
///
/// # Errors
/// Propagates the first error from `save_batch_non_atomic`.
pub(crate) fn replace_many_non_atomic(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
self.save_batch_non_atomic(SaveMode::Replace, entities)
}
/// Prepares one row operation per entity and commits them together.
///
/// For each entity, in input order: run `preflight_entity`, build its
/// `MutationInput`, and prepare the row operation under `save_rule`. Nothing
/// is committed until every entity has been prepared. An empty batch returns
/// an empty vector without calling the commit helper.
///
/// # Errors
/// * Any error from preflight, mutation construction, or row preparation.
/// * The duplicate-key error when two entities in the batch resolve to the
///   same row key.
/// * Any error from `commit_atomic_batch`.
#[inline(never)]
fn save_batch_atomic_impl(
    &self,
    save_rule: SaveRule,
    entities: Vec<E>,
) -> Result<Vec<E>, InternalError> {
    let mut span = Span::<E>::new(ExecKind::Save);
    let ctx = mutation_write_context::<E>(&self.db)?;

    let batch_len = entities.len();
    let mut saved: Vec<E> = Vec::with_capacity(batch_len);
    let mut row_ops = Vec::with_capacity(batch_len);
    // Raw row keys already staged in this batch, used to reject duplicates.
    let mut staged_keys: BTreeSet<Vec<u8>> = BTreeSet::new();

    for mut entity in entities {
        self.preflight_entity(&mut entity)?;
        let mutation = MutationInput::from_entity(&entity)?;
        let (row_op, data_key) = Self::prepare_logical_row_op(&ctx, save_rule, &mutation)?;

        // `insert` returns false when the key was already staged by an earlier entity.
        let first_occurrence = staged_keys.insert(row_op.key.clone());
        if !first_occurrence {
            return Err(InternalError::mutation_atomic_save_duplicate_key(
                E::PATH,
                data_key,
            ));
        }

        row_ops.push(row_op);
        saved.push(entity);
    }

    // Only a non-empty batch reaches the commit helper.
    if !row_ops.is_empty() {
        Self::commit_atomic_batch(&self.db, row_ops, &mut span)?;
    }

    Ok(saved)
}
/// Builds the commit row operation for one typed-entity mutation.
///
/// Resolves the currently stored row according to `save_rule`, derives the
/// after-image from `mutation`, and packs key, before-image, after-image and
/// the entity's schema fingerprint into a `CommitRowOp`.
///
/// Returns the row operation together with the mutation's `DataKey` so
/// callers can report the key in their own errors.
///
/// # Errors
/// Propagates failures from raw-key conversion, the existence check, and
/// after-image construction.
fn prepare_logical_row_op(
    ctx: &Context<'_, E>,
    save_rule: SaveRule,
    mutation: &MutationInput,
) -> Result<(CommitRowOp, DataKey), InternalError> {
    let data_key = mutation.data_key().clone();
    let raw_key = data_key.to_raw()?;

    let existing = Self::resolve_existing_row_for_rule(ctx, &data_key, save_rule)?;
    let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
    let after_image = Self::build_after_image_row(mutation, existing.as_ref())?;

    let key_bytes = raw_key.as_bytes().to_vec();
    // `None` when no row was stored under the key before this save.
    let before_bytes = existing.as_ref().map(|stored| stored.as_bytes().to_vec());
    let after_bytes = after_image.as_bytes().to_vec();

    let row_op = CommitRowOp::new(
        E::PATH,
        key_bytes,
        before_bytes,
        Some(after_bytes),
        schema_fingerprint,
    );

    Ok((row_op, data_key))
}
/// Builds the row to store for a typed-entity mutation.
///
/// With an existing row, the mutation's serialized patch is applied on top of
/// it; without one, the row is built from the patch alone.
///
/// # Errors
/// Propagates the error of whichever patch routine is used.
fn build_after_image_row(
    mutation: &MutationInput,
    old_row: Option<&RawRow>,
) -> Result<CanonicalRow, InternalError> {
    match old_row {
        Some(existing) => {
            existing.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
        }
        None => RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch()),
    }
}
/// Builds the row to store for a structural mutation.
///
/// Only `MutationMode::Update` with an existing row applies the patch on top
/// of that row. Every other combination (insert, replace, or an update
/// without a stored row) builds the row from the patch alone.
///
/// # Errors
/// Propagates the error of whichever patch routine is used.
fn build_structural_after_image_row(
    mode: MutationMode,
    mutation: &MutationInput,
    old_row: Option<&RawRow>,
) -> Result<CanonicalRow, InternalError> {
    match (mode, old_row) {
        (MutationMode::Update, Some(existing)) => {
            existing.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
        }
        (MutationMode::Update, None) | (MutationMode::Insert | MutationMode::Replace, _) => {
            RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch())
        }
    }
}
/// Reads the row stored under `data_key` and checks it against `save_rule`.
///
/// Any row that is found is passed through `validate_existing_row_identity`
/// before the rule's verdict is applied.
///
/// Returns the stored row (the before-image) when the rule permits one, or
/// `None` when no row exists and the rule permits that.
///
/// # Errors
/// * `ExecutorError::KeyExists` for `RequireAbsent` when a row is stored.
/// * The store-not-found error for `RequirePresent` when no row is stored.
/// * Any error from raw-key conversion, the store read, or identity validation.
fn resolve_existing_row_for_rule(
    ctx: &Context<'_, E>,
    data_key: &DataKey,
    save_rule: SaveRule,
) -> Result<Option<RawRow>, InternalError> {
    let raw_key = data_key.to_raw()?;
    // Every rule starts with the same lookup; only the interpretation differs.
    let existing = ctx.with_store(|store| store.get(&raw_key))?;

    match (save_rule, existing) {
        (SaveRule::RequireAbsent, Some(found)) => {
            // Validate first so a bad stored row is reported as such, not as a key clash.
            Self::validate_existing_row_identity(data_key, &found)?;
            Err(ExecutorError::KeyExists(data_key.clone()).into())
        }
        (SaveRule::RequirePresent, None) => {
            Err(InternalError::store_not_found(data_key.to_string()))
        }
        (SaveRule::RequireAbsent | SaveRule::AllowAny, None) => Ok(None),
        (SaveRule::RequirePresent | SaveRule::AllowAny, Some(found)) => {
            Self::validate_existing_row_identity(data_key, &found)?;
            Ok(Some(found))
        }
    }
}
fn validate_existing_row_identity(
data_key: &DataKey,
row: &RawRow,
) -> Result<(), InternalError> {
Self::ensure_persisted_row_invariants(data_key, row).map_err(|err| {
match (err.class(), err.origin()) {
(
crate::error::ErrorClass::Corruption,
crate::error::ErrorOrigin::Serialize | crate::error::ErrorOrigin::Store,
) => err,
_ => InternalError::from(ExecutorError::persisted_row_invariant_violation(
data_key,
&err.message,
)),
}
})?;
Ok(())
}
/// Saves a single entity under `mode` and returns it.
///
/// Steps, in order: open the write context, run `preflight_entity` (which
/// receives the entity mutably), build the `MutationInput`, prepare the row
/// operation under the rule implied by `mode`, and commit that single row.
///
/// # Errors
/// Propagates the first failure from any of the steps above; nothing is
/// committed unless every earlier step succeeded.
fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, InternalError> {
    // The body runs directly in the function: `?` already returns early, so
    // wrapping it in an immediately-invoked closure adds nothing.
    let mut span = Span::<E>::new(ExecKind::Save);
    let ctx = mutation_write_context::<E>(&self.db)?;
    let save_rule = SaveRule::from_mode(mode);

    self.preflight_entity(&mut entity)?;
    let mutation = MutationInput::from_entity(&entity)?;
    let (marker_row_op, _data_key) = Self::prepare_logical_row_op(&ctx, save_rule, &mutation)?;
    Self::commit_single_row(&self.db, marker_row_op, &mut span)?;

    Ok(entity)
}
/// Applies the structural `patch` to the row identified by `key` under `mode`.
///
/// Converts `key` and `patch` into a `MutationInput` and hands it to
/// `save_structural_mutation`, which validates and commits the result.
///
/// # Errors
/// Propagates failures from building the `MutationInput` and from
/// `save_structural_mutation`.
#[allow(dead_code)]
pub(in crate::db) fn apply_structural_mutation(
&self,
mode: MutationMode,
key: E::Key,
patch: UpdatePatch,
) -> Result<E, InternalError> {
let mutation = MutationInput::from_update_patch::<E>(key, &patch)?;
self.save_structural_mutation(mode, mutation)
}
/// Validates and commits one structural mutation, returning the resulting entity.
///
/// The after-image is built twice:
/// 1. from the caller's mutation, then decoded and validated into an entity by
///    `validate_structural_after_image` (which also runs `preflight_entity`);
/// 2. from a `MutationInput` rebuilt out of that validated entity, so the row
///    that is committed reflects the entity as it stands after validation.
///
/// # Errors
/// Propagates failures from the existence check, after-image construction,
/// validation, and the single-row commit.
#[allow(dead_code)]
fn save_structural_mutation(
    &self,
    mode: MutationMode,
    mutation: MutationInput,
) -> Result<E, InternalError> {
    let mut span = Span::<E>::new(ExecKind::Save);
    let ctx = mutation_write_context::<E>(&self.db)?;
    let data_key = mutation.data_key().clone();

    let existing = Self::resolve_existing_row_for_rule(&ctx, &data_key, mode.save_rule())?;
    let before_image = existing.as_ref();

    // First pass: the caller's patch, used only to obtain a validated entity.
    let candidate = Self::build_structural_after_image_row(mode, &mutation, before_image)?;
    let entity = self.validate_structural_after_image(&data_key, &candidate)?;

    // Second pass: rebuild from the validated entity; this is the row that gets stored.
    let normalized = MutationInput::from_entity(&entity)?;
    let after_image = Self::build_structural_after_image_row(mode, &normalized, before_image)?;

    let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
    let key_bytes = data_key.to_raw()?.as_bytes().to_vec();
    let before_bytes = before_image.map(|stored| stored.as_bytes().to_vec());
    let after_bytes = after_image.as_bytes().to_vec();
    let row_op = CommitRowOp::new(
        E::PATH,
        key_bytes,
        before_bytes,
        Some(after_bytes),
        schema_fingerprint,
    );

    Self::commit_single_row(&self.db, row_op, &mut span)?;

    Ok(entity)
}
/// Decodes a structural after-image into an entity and validates it.
///
/// The decoded entity's key must equal the key carried by `data_key`; if it
/// does, the entity is run through `preflight_entity` and returned.
///
/// # Errors
/// * Key extraction from `data_key` fails.
/// * The row cannot be decoded as `E` (reported as an invalid structural
///   after-image together with the decode error text).
/// * The decoded key differs from the expected key (primary-key mismatch; the
///   decoded entity's key is passed as the field value and the data key's key
///   as the identity value).
/// * `preflight_entity` rejects the entity.
#[allow(dead_code)]
fn validate_structural_after_image(
    &self,
    data_key: &DataKey,
    row: &RawRow,
) -> Result<E, InternalError> {
    let expected_key = data_key.try_key::<E>()?;
    let mut entity = row.try_decode::<E>().map_err(|err| {
        InternalError::mutation_structural_after_image_invalid(
            E::PATH,
            data_key,
            err.to_string(),
        )
    })?;

    let identity_key = entity.id().key();
    if identity_key == expected_key {
        // Keys agree: finish with the regular entity preflight.
        self.preflight_entity(&mut entity)?;
        return Ok(entity);
    }

    let field_name = E::MODEL.primary_key().name();
    let decoded_key_value = FieldValue::to_value(&identity_key);
    let data_key_value = FieldValue::to_value(&expected_key);

    Err(InternalError::mutation_entity_primary_key_mismatch(
        E::PATH,
        field_name,
        &decoded_key_value,
        &data_key_value,
    ))
}
/// Commits one prepared row operation through the single-row commit helper.
///
/// The helper receives the label `"save_row_apply"` and a callback that sets
/// the span's row count to 1.
///
/// # Errors
/// Propagates any error from `commit_single_save_row_op_with_window`.
fn commit_single_row(
    db: &Db<E::Canister>,
    marker_row_op: CommitRowOp,
    span: &mut Span<E>,
) -> Result<(), InternalError> {
    // Callback handed to the commit helper; it records one row on the span.
    let mark_one_row = || {
        span.set_rows(1);
    };

    commit_single_save_row_op_with_window::<E>(db, marker_row_op, "save_row_apply", mark_one_row)?;

    Ok(())
}
/// Commits a batch of prepared row operations through the batch commit helper.
///
/// The helper receives the label `"save_batch_atomic_row_apply"` and a
/// callback that sets the span's row count to the batch size (saturating at
/// `u64::MAX` if the length does not fit in a `u64`).
///
/// # Errors
/// Propagates any error from `commit_save_row_ops_with_window`.
fn commit_atomic_batch(
    db: &Db<E::Canister>,
    marker_row_ops: Vec<CommitRowOp>,
    span: &mut Span<E>,
) -> Result<(), InternalError> {
    // Taken before the operations are moved into the commit helper.
    let rows_touched = match u64::try_from(marker_row_ops.len()) {
        Ok(count) => count,
        Err(_) => u64::MAX,
    };
    let mark_rows = || {
        span.set_rows(rows_touched);
    };

    commit_save_row_ops_with_window::<E>(
        db,
        marker_row_ops,
        "save_batch_atomic_row_apply",
        mark_rows,
    )?;

    Ok(())
}
}