use super::{MutationMode, SaveExecutor};
use crate::{
db::{
commit::{
CommitRowOp,
prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint,
},
data::{CanonicalRow, DataKey, PersistedRow, RawRow, SerializedUpdatePatch, UpdatePatch},
executor::mutation::{MutationInput, emit_index_delta_metrics, mutation_write_context},
schema::commit_schema_fingerprint_for_entity,
},
error::InternalError,
metrics::sink::{ExecKind, Span},
sanitize::SanitizeWriteContext,
traits::{EntityValue, FieldValue, Storable},
types::Timestamp,
};
impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
const fn structural_write_context(mode: MutationMode, now: Timestamp) -> SanitizeWriteContext {
SanitizeWriteContext::new(mode.sanitize_write_mode(), now)
}
pub(in crate::db) fn apply_structural_mutation(
&self,
mode: MutationMode,
key: E::Key,
patch: UpdatePatch,
) -> Result<E, InternalError> {
let write_context = Self::structural_write_context(mode, Timestamp::now());
self.apply_structural_mutation_with_write_context(mode, key, patch, write_context)
}
pub(in crate::db) fn apply_structural_mutation_with_write_context(
&self,
mode: MutationMode,
key: E::Key,
patch: UpdatePatch,
write_context: SanitizeWriteContext,
) -> Result<E, InternalError> {
let mutation = MutationInput::from_update_patch::<E>(key, &patch)?;
self.save_structural_mutation(mode, mutation, Some(&patch), write_context)
}
pub(in crate::db) fn apply_internal_structural_mutation_with_write_context(
&self,
mode: MutationMode,
key: E::Key,
patch: UpdatePatch,
write_context: SanitizeWriteContext,
) -> Result<E, InternalError> {
let mutation = MutationInput::from_update_patch::<E>(key, &patch)?;
self.save_structural_mutation(mode, mutation, None, write_context)
}
fn save_structural_mutation(
&self,
mode: MutationMode,
mutation: MutationInput,
authored_patch: Option<&UpdatePatch>,
write_context: SanitizeWriteContext,
) -> Result<E, InternalError> {
let mut span = Span::<E>::new(ExecKind::Save);
let ctx = mutation_write_context::<E>(&self.db)?;
let data_key = mutation.data_key().clone();
let old_raw = Self::resolve_existing_row_for_rule(&ctx, &data_key, mode.save_rule())?;
if let Some(authored_patch) = authored_patch {
Self::reject_explicit_generated_fields(authored_patch)?;
}
let entity = match mode {
MutationMode::Update => {
let raw_after_image =
Self::build_structural_after_image_row(mode, &mutation, old_raw.as_ref())?;
self.validate_structural_after_image(&data_key, &raw_after_image, write_context)?
}
MutationMode::Insert | MutationMode::Replace => self
.validate_structural_after_image_from_patch(
&data_key,
mutation.serialized_slots(),
write_context,
)?,
};
let normalized_mutation = MutationInput::from_entity(&entity)?;
let row_bytes =
Self::build_structural_after_image_row(mode, &normalized_mutation, old_raw.as_ref())?;
let row_bytes = row_bytes.into_raw_row().into_bytes();
let before_bytes = old_raw.map(<RawRow as Storable>::into_bytes);
let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
let marker_row_op = CommitRowOp::new(
E::PATH,
data_key.to_raw()?,
before_bytes,
Some(row_bytes),
schema_fingerprint,
);
let prepared_row_op =
prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
&self.db,
&marker_row_op,
&ctx,
&ctx,
schema_fingerprint,
)?;
Self::commit_prepared_single_row(
&self.db,
marker_row_op,
prepared_row_op,
|delta| emit_index_delta_metrics::<E>(delta),
|| {
span.set_rows(1);
},
)?;
Ok(entity)
}
fn reject_explicit_generated_fields(patch: &UpdatePatch) -> Result<(), InternalError> {
for entry in patch.entries() {
let field = &E::MODEL.fields()[entry.slot().index()];
if field.insert_generation().is_some() && field.name() != E::MODEL.primary_key.name() {
return Err(InternalError::mutation_generated_field_explicit(
E::PATH,
field.name(),
));
}
}
Ok(())
}
fn build_structural_after_image_row(
mode: MutationMode,
mutation: &MutationInput,
old_row: Option<&RawRow>,
) -> Result<CanonicalRow, InternalError> {
match mode {
MutationMode::Update => {
let Some(old_row) = old_row else {
return Err(InternalError::executor_invariant(
"structural update staging requires an existing baseline row",
));
};
old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_slots())
}
MutationMode::Insert | MutationMode::Replace => {
RawRow::from_complete_serialized_update_patch(E::MODEL, mutation.serialized_slots())
}
}
}
fn validate_structural_after_image(
&self,
data_key: &DataKey,
row: &RawRow,
write_context: SanitizeWriteContext,
) -> Result<E, InternalError> {
let expected_key = data_key.try_key::<E>()?;
let mut entity = row.try_decode::<E>().map_err(|err| {
InternalError::mutation_structural_after_image_invalid(
E::PATH,
data_key,
err.to_string(),
)
})?;
let identity_key = entity.id().key();
if identity_key != expected_key {
let field_name = E::MODEL.primary_key().name();
let field_value = FieldValue::to_value(&identity_key);
let identity_value = FieldValue::to_value(&expected_key);
return Err(InternalError::mutation_entity_primary_key_mismatch(
E::PATH,
field_name,
&field_value,
&identity_value,
));
}
self.preflight_entity(&mut entity, write_context)?;
Ok(entity)
}
fn validate_structural_after_image_from_patch(
&self,
data_key: &DataKey,
patch: &SerializedUpdatePatch,
write_context: SanitizeWriteContext,
) -> Result<E, InternalError> {
let expected_key = data_key.try_key::<E>()?;
let mut entity = crate::db::data::materialize_entity_from_serialized_update_patch::<E>(
patch,
)
.map_err(|err| {
InternalError::mutation_structural_after_image_invalid(
E::PATH,
data_key,
err.to_string(),
)
})?;
let identity_key = entity.id().key();
if identity_key != expected_key {
let field_name = E::MODEL.primary_key().name();
let field_value = FieldValue::to_value(&identity_key);
let identity_value = FieldValue::to_value(&expected_key);
return Err(InternalError::mutation_entity_primary_key_mismatch(
E::PATH,
field_name,
&field_value,
&identity_value,
));
}
self.preflight_entity(&mut entity, write_context)?;
Ok(entity)
}
}