use crate::{
db::{
commit::{
CommitRowOp, CommitSchemaFingerprint,
prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint,
},
data::{
CanonicalRow, DataKey, PersistedRow, RawRow, SerializedStructuralPatch, StructuralPatch,
},
executor::{
Context,
mutation::{
MutationInput, emit_index_delta_metrics, mutation_write_context,
save::{MutationMode, SaveExecutor},
},
},
schema::{SchemaInfo, commit_schema_fingerprint_for_entity},
},
error::InternalError,
metrics::sink::{ExecKind, Span},
sanitize::SanitizeWriteContext,
traits::{EntityValue, KeyValueCodec, Storable},
types::Timestamp,
};
use std::collections::HashSet;
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum StructuralPatchOrigin {
PublicAuthored,
InternalLowered,
}
impl StructuralPatchOrigin {
const fn rejects_explicit_generated_fields(self) -> bool {
matches!(self, Self::PublicAuthored)
}
}
struct StructuralMutationRequest<E: PersistedRow + EntityValue> {
mode: MutationMode,
key: E::Key,
patch: StructuralPatch,
write_context: SanitizeWriteContext,
origin: StructuralPatchOrigin,
}
struct StructuralMutationBatchItem<E: PersistedRow + EntityValue> {
key: E::Key,
patch: StructuralPatch,
}
impl<E: PersistedRow + EntityValue> StructuralMutationBatchItem<E> {
const fn internal_lowered(key: E::Key, patch: StructuralPatch) -> Self {
Self { key, patch }
}
}
impl<E: PersistedRow + EntityValue> StructuralMutationRequest<E> {
const fn public_authored(
mode: MutationMode,
key: E::Key,
patch: StructuralPatch,
write_context: SanitizeWriteContext,
) -> Self {
Self {
mode,
key,
patch,
write_context,
origin: StructuralPatchOrigin::PublicAuthored,
}
}
const fn internal_lowered(
mode: MutationMode,
key: E::Key,
patch: StructuralPatch,
write_context: SanitizeWriteContext,
) -> Self {
Self {
mode,
key,
patch,
write_context,
origin: StructuralPatchOrigin::InternalLowered,
}
}
}
impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
const fn structural_write_context(mode: MutationMode, now: Timestamp) -> SanitizeWriteContext {
SanitizeWriteContext::new(mode.sanitize_write_mode(), now)
}
pub(in crate::db) fn apply_structural_mutation(
&self,
mode: MutationMode,
key: E::Key,
patch: StructuralPatch,
) -> Result<E, InternalError> {
let write_context = Self::structural_write_context(mode, Timestamp::now());
let request = StructuralMutationRequest::public_authored(mode, key, patch, write_context);
self.save_structural_mutation(request)
}
pub(in crate::db) fn apply_internal_lowered_structural_mutation_batch(
&self,
mode: MutationMode,
rows: Vec<(E::Key, StructuralPatch)>,
write_context: SanitizeWriteContext,
) -> Result<Vec<E>, InternalError> {
let items = rows
.into_iter()
.map(|(key, patch)| StructuralMutationBatchItem::internal_lowered(key, patch))
.collect();
self.apply_internal_structural_mutation_batch(mode, items, write_context)
}
fn apply_internal_structural_mutation_batch(
&self,
mode: MutationMode,
items: Vec<StructuralMutationBatchItem<E>>,
write_context: SanitizeWriteContext,
) -> Result<Vec<E>, InternalError> {
let mut span = Span::<E>::new(ExecKind::Save);
let ctx = mutation_write_context::<E>(&self.db)?;
let schema = Self::schema_info();
let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
let validate_relations = E::MODEL.has_any_strong_relations();
let mut entities = Vec::with_capacity(items.len());
let mut marker_row_ops = Vec::with_capacity(items.len());
let mut seen_row_keys = HashSet::with_capacity(items.len());
for item in items {
let request = StructuralMutationRequest::internal_lowered(
mode,
item.key,
item.patch,
write_context,
);
let (entity, marker_row_op) = self.prepare_structural_mutation_row_op(
&ctx,
schema,
schema_fingerprint,
validate_relations,
request,
)?;
if !seen_row_keys.insert(marker_row_op.key) {
let data_key = DataKey::try_new::<E>(entity.id().key())?;
return Err(InternalError::mutation_atomic_save_duplicate_key(
E::PATH,
data_key,
));
}
marker_row_ops.push(marker_row_op);
entities.push(entity);
}
if marker_row_ops.is_empty() {
return Ok(entities);
}
Self::commit_atomic_batch(&self.db, marker_row_ops, &mut span)?;
Ok(entities)
}
fn save_structural_mutation(
&self,
request: StructuralMutationRequest<E>,
) -> Result<E, InternalError> {
let mut span = Span::<E>::new(ExecKind::Save);
let ctx = mutation_write_context::<E>(&self.db)?;
let schema = Self::schema_info();
let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
let validate_relations = E::MODEL.has_any_strong_relations();
let (entity, marker_row_op) = self.prepare_structural_mutation_row_op(
&ctx,
schema,
schema_fingerprint,
validate_relations,
request,
)?;
let prepared_row_op =
prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
&self.db,
&marker_row_op,
&ctx,
&ctx,
schema_fingerprint,
)?;
Self::commit_prepared_single_row(
&self.db,
marker_row_op,
prepared_row_op,
|delta| emit_index_delta_metrics::<E>(delta),
|| {
span.set_rows(1);
},
)?;
Ok(entity)
}
fn prepare_structural_mutation_row_op(
&self,
ctx: &Context<'_, E>,
schema: &SchemaInfo,
schema_fingerprint: CommitSchemaFingerprint,
validate_relations: bool,
request: StructuralMutationRequest<E>,
) -> Result<(E, CommitRowOp), InternalError> {
let StructuralMutationRequest {
mode,
key,
patch,
write_context,
origin,
} = request;
if origin.rejects_explicit_generated_fields() {
Self::reject_explicit_generated_fields(&patch)?;
}
Self::validate_structural_patch_write_bounds(&patch)?;
let mutation = MutationInput::from_structural_patch::<E>(key, &patch)?;
let data_key = mutation.data_key().clone();
let old_raw = Self::resolve_existing_row_for_rule(ctx, &data_key, mode.save_rule())?;
let entity = match mode {
MutationMode::Update => {
let raw_after_image =
Self::build_structural_after_image_row(mode, &mutation, old_raw.as_ref())?;
self.validate_structural_after_image(
&data_key,
&raw_after_image,
schema,
validate_relations,
write_context,
)?
}
MutationMode::Insert | MutationMode::Replace => self
.validate_structural_after_image_from_patch(
&data_key,
mutation.serialized_slots(),
schema,
validate_relations,
write_context,
)?,
};
let normalized_mutation = MutationInput::from_entity(&entity)?;
let row_bytes =
Self::build_structural_after_image_row(mode, &normalized_mutation, old_raw.as_ref())?;
let row_bytes = row_bytes.into_raw_row().into_bytes();
let before_bytes = old_raw.map(<RawRow as Storable>::into_bytes);
let marker_row_op = CommitRowOp::new(
E::PATH,
data_key.to_raw()?,
before_bytes,
Some(row_bytes),
schema_fingerprint,
);
Ok((entity, marker_row_op))
}
fn reject_explicit_generated_fields(patch: &StructuralPatch) -> Result<(), InternalError> {
for entry in patch.entries() {
let field = &E::MODEL.fields()[entry.slot().index()];
if field.insert_generation().is_some() && field.name() != E::MODEL.primary_key.name() {
return Err(InternalError::mutation_generated_field_explicit(
E::PATH,
field.name(),
));
}
}
Ok(())
}
fn build_structural_after_image_row(
mode: MutationMode,
mutation: &MutationInput,
old_row: Option<&RawRow>,
) -> Result<CanonicalRow, InternalError> {
match mode {
MutationMode::Update => {
let Some(old_row) = old_row else {
return Err(InternalError::executor_invariant(
"structural update staging requires an existing baseline row",
));
};
old_row.apply_serialized_structural_patch(E::MODEL, mutation.serialized_slots())
}
MutationMode::Insert | MutationMode::Replace => {
RawRow::from_complete_serialized_structural_patch(
E::MODEL,
mutation.serialized_slots(),
)
}
}
}
fn validate_structural_after_image(
&self,
data_key: &DataKey,
row: &RawRow,
schema: &SchemaInfo,
validate_relations: bool,
write_context: SanitizeWriteContext,
) -> Result<E, InternalError> {
let expected_key = data_key.try_key::<E>()?;
let mut entity = row.try_decode::<E>().map_err(|err| {
InternalError::mutation_structural_after_image_invalid(
E::PATH,
data_key,
err.to_string(),
)
})?;
let identity_key = entity.id().key();
if identity_key != expected_key {
let field_name = E::MODEL.primary_key().name();
let field_value = KeyValueCodec::to_key_value(&identity_key);
let identity_value = KeyValueCodec::to_key_value(&expected_key);
return Err(InternalError::mutation_entity_primary_key_mismatch(
E::PATH,
field_name,
&field_value,
&identity_value,
));
}
self.preflight_entity_with_cached_schema(
&mut entity,
schema,
validate_relations,
write_context,
None,
)?;
Ok(entity)
}
fn validate_structural_after_image_from_patch(
&self,
data_key: &DataKey,
patch: &SerializedStructuralPatch,
schema: &SchemaInfo,
validate_relations: bool,
write_context: SanitizeWriteContext,
) -> Result<E, InternalError> {
let expected_key = data_key.try_key::<E>()?;
let mut entity =
crate::db::data::materialize_entity_from_serialized_structural_patch::<E>(patch)
.map_err(|err| {
InternalError::mutation_structural_after_image_invalid(
E::PATH,
data_key,
err.to_string(),
)
})?;
let identity_key = entity.id().key();
if identity_key != expected_key {
let field_name = E::MODEL.primary_key().name();
let field_value = KeyValueCodec::to_key_value(&identity_key);
let identity_value = KeyValueCodec::to_key_value(&expected_key);
return Err(InternalError::mutation_entity_primary_key_mismatch(
E::PATH,
field_name,
&field_value,
&identity_value,
));
}
self.preflight_entity_with_cached_schema(
&mut entity,
schema,
validate_relations,
write_context,
None,
)?;
Ok(entity)
}
}