use crate::{
db::{
Db,
commit::CommitRowOp,
data::{DataKey, PersistedRow, RawRow},
executor::{
Context, ExecutorError,
mutation::{
PreparedRowOpDelta, commit_prepared_single_save_row_op_with_window,
commit_save_row_ops_with_window, synchronized_store_handles_for_prepared_row_ops,
},
},
},
error::InternalError,
metrics::sink::Span,
traits::EntityValue,
};
use crate::db::executor::mutation::save::{SaveExecutor, SaveRule};
impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
/// Reads the row currently stored under `data_key` and applies `save_rule` to the outcome.
///
/// Outcome per rule:
/// - `SaveRule::RequireAbsent`: `Ok(None)` when nothing is stored; a stored row is
///   identity-checked and then rejected with `ExecutorError::KeyExists`.
/// - `SaveRule::RequirePresent`: the stored row (after the identity check); a missing row is
///   reported via `InternalError::store_not_found`.
/// - `SaveRule::AllowAny`: whatever the store holds, identity-checked when present.
///
/// # Errors
///
/// Besides the rule violations above, this propagates failures from converting the key to its
/// raw form, from the store read, and from `validate_existing_row_identity`.
pub(super) fn resolve_existing_row_for_rule(
    ctx: &Context<'_, E>,
    data_key: &DataKey,
    save_rule: SaveRule,
) -> Result<Option<RawRow>, InternalError> {
    let raw_key = data_key.to_raw()?;

    // Every rule starts from the same lookup; only the interpretation of the result differs.
    let stored = ctx.with_store(|store| store.get(&raw_key))?;

    match save_rule {
        SaveRule::RequireAbsent => match stored {
            None => Ok(None),
            Some(existing) => {
                // Validate first so an invariant failure takes precedence over `KeyExists`.
                Self::validate_existing_row_identity(data_key, &existing)?;
                Err(ExecutorError::KeyExists(data_key.clone()).into())
            }
        },
        SaveRule::RequirePresent => {
            let existing =
                stored.ok_or_else(|| InternalError::store_not_found(data_key.to_string()))?;
            Self::validate_existing_row_identity(data_key, &existing)?;
            Ok(Some(existing))
        }
        SaveRule::AllowAny => {
            if let Some(existing) = &stored {
                Self::validate_existing_row_identity(data_key, existing)?;
            }
            Ok(stored)
        }
    }
}
/// Runs the persisted-row invariant check for a row read back from the store under `data_key`.
///
/// # Errors
///
/// A failure classified as `Corruption` with origin `Serialize` or `Store` is returned
/// unchanged. Any other failure is re-wrapped as
/// `ExecutorError::persisted_row_invariant_violation`, carrying `data_key` and the original
/// error's message.
fn validate_existing_row_identity(
    data_key: &DataKey,
    row: &RawRow,
) -> Result<(), InternalError> {
    let Err(err) = Self::ensure_persisted_row_invariants(data_key, row) else {
        return Ok(());
    };

    // Serialize/store corruption keeps its original classification.
    let keeps_original_classification = matches!(
        (err.class(), err.origin()),
        (
            crate::error::ErrorClass::Corruption,
            crate::error::ErrorOrigin::Serialize | crate::error::ErrorOrigin::Store,
        )
    );
    if keeps_original_classification {
        return Err(err);
    }

    Err(InternalError::from(
        ExecutorError::persisted_row_invariant_violation(data_key, &err.message),
    ))
}
/// Commits one already-prepared row operation through the single-row save commit helper.
///
/// The store handles for `prepared_row_op` are resolved from `db` first, then everything is
/// handed to `commit_prepared_single_save_row_op_with_window` under the `"save_row_apply"`
/// label.
///
/// `on_index_applied` and `on_data_applied` are forwarded to that helper unchanged.
/// NOTE(review): judging by their names they fire after the index and data phases
/// respectively; confirm against the helper.
///
/// # Errors
///
/// Propagates any error returned by the commit helper.
pub(super) fn commit_prepared_single_row(
    db: &Db<E::Canister>,
    marker_row_op: CommitRowOp,
    prepared_row_op: crate::db::commit::PreparedRowCommitOp,
    on_index_applied: impl FnOnce(&PreparedRowOpDelta),
    on_data_applied: impl FnOnce(),
) -> Result<(), InternalError> {
    // Must happen before the call below, which takes `prepared_row_op` by value.
    let store_handles = synchronized_store_handles_for_prepared_row_ops(
        db,
        std::slice::from_ref(&prepared_row_op),
    );

    commit_prepared_single_save_row_op_with_window(
        marker_row_op,
        prepared_row_op,
        store_handles,
        "save_row_apply",
        on_index_applied,
        on_data_applied,
    )?;

    Ok(())
}
/// Commits a batch of marker row operations through `commit_save_row_ops_with_window` under
/// the `"save_batch_atomic_row_apply"` label.
///
/// The callback handed to the helper records the batch size on `span` via `set_rows`.
/// NOTE(review): when the helper invokes that callback is not visible here; confirm.
///
/// # Errors
///
/// Propagates any error returned by the commit helper.
pub(super) fn commit_atomic_batch(
    db: &Db<E::Canister>,
    marker_row_ops: Vec<CommitRowOp>,
    span: &mut Span<E>,
) -> Result<(), InternalError> {
    // Taken before the vector is moved into the helper; clamps to `u64::MAX` if the length
    // does not fit in a `u64`.
    let row_count = u64::try_from(marker_row_ops.len()).unwrap_or(u64::MAX);

    let record_rows = || {
        span.set_rows(row_count);
    };

    commit_save_row_ops_with_window::<E>(
        db,
        marker_row_ops,
        "save_batch_atomic_row_apply",
        record_rows,
    )?;

    Ok(())
}
}
/// Adds the four counters of `delta` onto the matching counters of `total`.
///
/// Each addition saturates at the counter type's maximum instead of overflowing. Only
/// `index_inserts`, `index_removes`, `reverse_index_inserts` and `reverse_index_removes` are
/// written.
pub(super) const fn accumulate_prepared_row_op_delta(
    total: &mut PreparedRowOpDelta,
    delta: &PreparedRowOpDelta,
) {
    // Compute every sum first, then write them back in one group.
    let index_inserts = total.index_inserts.saturating_add(delta.index_inserts);
    let index_removes = total.index_removes.saturating_add(delta.index_removes);
    let reverse_index_inserts = total
        .reverse_index_inserts
        .saturating_add(delta.reverse_index_inserts);
    let reverse_index_removes = total
        .reverse_index_removes
        .saturating_add(delta.reverse_index_removes);

    total.index_inserts = index_inserts;
    total.index_removes = index_removes;
    total.reverse_index_inserts = reverse_index_inserts;
    total.reverse_index_removes = reverse_index_removes;
}