use crate::{
db::{
commit::{
CommitRowOp, CommitSchemaFingerprint,
prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint,
},
data::{
CanonicalRow, DataKey, PersistedRow, RawRow, SerializedStructuralPatch,
StructuralPatch, StructuralRowContract, StructuralSlotReader,
apply_serialized_structural_patch_to_raw_row_with_accepted_contract,
canonical_row_from_entity_with_accepted_contract,
canonical_row_from_raw_row_with_accepted_decode_contract,
},
executor::{
Context,
mutation::{
MutationInput, emit_index_delta_metrics, mutation_write_context,
save::{MutationMode, SaveExecutor},
},
},
schema::{AcceptedRowDecodeContract, SchemaInfo},
},
error::InternalError,
metrics::sink::{ExecKind, Span},
sanitize::SanitizeWriteContext,
traits::{EntityValue, KeyValueCodec, Storable},
types::Timestamp,
};
use std::collections::HashSet;
struct StructuralMutationRequest<E: PersistedRow + EntityValue> {
mode: MutationMode,
key: E::Key,
patch: StructuralPatch,
write_context: SanitizeWriteContext,
accepted_row_decode_contract: AcceptedRowDecodeContract,
}
struct StructuralMutationBatchItem<E: PersistedRow + EntityValue> {
key: E::Key,
patch: StructuralPatch,
}
impl<E: PersistedRow + EntityValue> StructuralMutationBatchItem<E> {
const fn internal_lowered(key: E::Key, patch: StructuralPatch) -> Self {
Self { key, patch }
}
}
impl<E: PersistedRow + EntityValue> StructuralMutationRequest<E> {
const fn public_authored(
mode: MutationMode,
key: E::Key,
patch: StructuralPatch,
write_context: SanitizeWriteContext,
accepted_row_decode_contract: AcceptedRowDecodeContract,
) -> Self {
Self {
mode,
key,
patch,
write_context,
accepted_row_decode_contract,
}
}
const fn internal_lowered(
mode: MutationMode,
key: E::Key,
patch: StructuralPatch,
write_context: SanitizeWriteContext,
accepted_row_decode_contract: AcceptedRowDecodeContract,
) -> Self {
Self {
mode,
key,
patch,
write_context,
accepted_row_decode_contract,
}
}
}
impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
const fn structural_write_context(mode: MutationMode, now: Timestamp) -> SanitizeWriteContext {
SanitizeWriteContext::new(mode.sanitize_write_mode(), now)
}
pub(in crate::db) fn apply_structural_mutation(
&self,
mode: MutationMode,
key: E::Key,
patch: StructuralPatch,
accepted_row_decode_contract: AcceptedRowDecodeContract,
) -> Result<E, InternalError> {
let write_context = Self::structural_write_context(mode, Timestamp::now());
let request = StructuralMutationRequest::public_authored(
mode,
key,
patch,
write_context,
accepted_row_decode_contract,
);
self.save_structural_mutation(request)
}
pub(in crate::db) fn apply_internal_lowered_structural_mutation_batch(
&self,
mode: MutationMode,
rows: Vec<(E::Key, StructuralPatch)>,
write_context: SanitizeWriteContext,
accepted_row_decode_contract: AcceptedRowDecodeContract,
) -> Result<Vec<E>, InternalError> {
let items = rows
.into_iter()
.map(|(key, patch)| StructuralMutationBatchItem::internal_lowered(key, patch))
.collect();
self.apply_internal_structural_mutation_batch(
mode,
items,
write_context,
accepted_row_decode_contract,
)
}
fn apply_internal_structural_mutation_batch(
&self,
mode: MutationMode,
items: Vec<StructuralMutationBatchItem<E>>,
write_context: SanitizeWriteContext,
accepted_row_decode_contract: AcceptedRowDecodeContract,
) -> Result<Vec<E>, InternalError> {
let mut span = Span::<E>::new(ExecKind::Save);
let result = (|| {
let ctx = mutation_write_context::<E>(&self.db)?;
let schema = self.accepted_schema_info();
let schema_fingerprint = self.accepted_schema_fingerprint();
let validate_relations = schema.has_any_strong_relations();
let mut entities = Vec::with_capacity(items.len());
let mut marker_row_ops = Vec::with_capacity(items.len());
let mut seen_row_keys = HashSet::with_capacity(items.len());
for item in items {
let request = StructuralMutationRequest::internal_lowered(
mode,
item.key,
item.patch,
write_context,
accepted_row_decode_contract.clone(),
);
let (entity, marker_row_op) = self.prepare_structural_mutation_row_op(
&ctx,
schema,
schema_fingerprint,
validate_relations,
request,
)?;
if !seen_row_keys.insert(marker_row_op.key) {
let data_key = DataKey::try_new::<E>(entity.id().key())?;
return Err(InternalError::mutation_atomic_save_duplicate_key(
E::PATH,
data_key,
));
}
marker_row_ops.push(marker_row_op);
entities.push(entity);
}
if marker_row_ops.is_empty() {
return Ok(entities);
}
Self::commit_atomic_batch(&self.db, marker_row_ops, schema_fingerprint, &mut span)?;
Self::record_save_mutation(
mode.save_mutation_kind(),
u64::try_from(entities.len()).unwrap_or(u64::MAX),
);
Ok(entities)
})();
if let Err(err) = &result {
span.set_error(err);
}
result
}
fn save_structural_mutation(
&self,
request: StructuralMutationRequest<E>,
) -> Result<E, InternalError> {
let mutation_kind = request.mode.save_mutation_kind();
let mut span = Span::<E>::new(ExecKind::Save);
let result =
(|| {
let ctx = mutation_write_context::<E>(&self.db)?;
let schema = self.accepted_schema_info();
let schema_fingerprint = self.accepted_schema_fingerprint();
let validate_relations = schema.has_any_strong_relations();
let (entity, marker_row_op) = self.prepare_structural_mutation_row_op(
&ctx,
schema,
schema_fingerprint,
validate_relations,
request,
)?;
let prepared_row_op =
prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<
E,
>(&self.db, &marker_row_op, &ctx, &ctx, schema_fingerprint)?;
Self::commit_prepared_single_row(
&self.db,
marker_row_op,
prepared_row_op,
|delta| emit_index_delta_metrics::<E>(delta),
|| {
span.set_rows(1);
},
)?;
Self::record_save_mutation(mutation_kind, 1);
Ok(entity)
})();
if let Err(err) = &result {
span.set_error(err);
}
result
}
fn prepare_structural_mutation_row_op(
&self,
ctx: &Context<'_, E>,
schema: &SchemaInfo,
schema_fingerprint: CommitSchemaFingerprint,
validate_relations: bool,
request: StructuralMutationRequest<E>,
) -> Result<(E, CommitRowOp), InternalError> {
let StructuralMutationRequest {
mode,
key,
patch,
write_context,
accepted_row_decode_contract,
} = request;
Self::validate_structural_patch_write_bounds_with_accepted_contract(
&patch,
&accepted_row_decode_contract,
)?;
let mutation = if matches!(mode, MutationMode::Insert | MutationMode::Replace) {
MutationInput::from_accepted_complete_structural_patch::<E>(
key,
&patch,
accepted_row_decode_contract.clone(),
)?
} else {
MutationInput::from_accepted_sparse_structural_patch::<E>(
key,
&patch,
accepted_row_decode_contract.clone(),
)?
};
let data_key = mutation.data_key().clone();
let old_raw = Self::resolve_existing_row_for_rule_with_accepted_contract(
ctx,
&data_key,
mode.save_rule(),
&accepted_row_decode_contract,
self.accepted_schema_info(),
)?;
let entity = match mode {
MutationMode::Update => {
let baseline_row = Self::structural_update_baseline_row(old_raw.as_ref())?;
let raw_after_image =
Self::build_structural_update_after_image_row_with_accepted_contract(
&mutation,
baseline_row,
accepted_row_decode_contract.clone(),
)?;
self.validate_structural_after_image(
&data_key,
&raw_after_image,
accepted_row_decode_contract.clone(),
schema,
validate_relations,
write_context,
)?
}
MutationMode::Insert | MutationMode::Replace => self
.validate_structural_after_image_from_patch(
&data_key,
mutation.serialized_slots(),
accepted_row_decode_contract.clone(),
schema,
validate_relations,
write_context,
)?,
};
let row_bytes = Self::build_normalized_structural_after_image_row_with_accepted_contract(
&entity,
accepted_row_decode_contract.clone(),
)?;
let row_bytes = row_bytes.into_raw_row().into_bytes();
let before_bytes = old_raw
.as_ref()
.map(|row| {
Self::build_structural_before_image_bytes_with_accepted_contract(
row,
&accepted_row_decode_contract,
)
})
.transpose()?;
let marker_row_op = CommitRowOp::new(
E::PATH,
data_key.to_raw()?,
before_bytes,
Some(row_bytes),
schema_fingerprint,
);
Ok((entity, marker_row_op))
}
fn structural_update_baseline_row(old_row: Option<&RawRow>) -> Result<&RawRow, InternalError> {
let Some(old_row) = old_row else {
return Err(InternalError::executor_invariant(
"structural update staging requires an existing baseline row",
));
};
Ok(old_row)
}
fn build_structural_update_after_image_row_with_accepted_contract(
mutation: &MutationInput,
old_row: &RawRow,
accepted_row_decode_contract: AcceptedRowDecodeContract,
) -> Result<CanonicalRow, InternalError> {
apply_serialized_structural_patch_to_raw_row_with_accepted_contract(
E::PATH,
accepted_row_decode_contract,
old_row,
mutation.serialized_slots(),
)
}
fn build_normalized_structural_after_image_row_with_accepted_contract(
entity: &E,
accepted_row_decode_contract: AcceptedRowDecodeContract,
) -> Result<CanonicalRow, InternalError> {
canonical_row_from_entity_with_accepted_contract(
E::PATH,
accepted_row_decode_contract,
entity,
)
}
fn build_structural_before_image_bytes_with_accepted_contract(
old_row: &RawRow,
accepted_row_decode_contract: &AcceptedRowDecodeContract,
) -> Result<Vec<u8>, InternalError> {
let canonical = canonical_row_from_raw_row_with_accepted_decode_contract(
E::PATH,
accepted_row_decode_contract.clone(),
old_row,
)?;
Ok(canonical.into_raw_row().into_bytes())
}
fn validate_structural_after_image(
&self,
data_key: &DataKey,
row: &RawRow,
accepted_row_decode_contract: AcceptedRowDecodeContract,
schema: &SchemaInfo,
validate_relations: bool,
write_context: SanitizeWriteContext,
) -> Result<E, InternalError> {
let contract = StructuralRowContract::from_accepted_decode_contract(
E::PATH,
accepted_row_decode_contract,
);
let expected_key = data_key.try_key::<E>()?;
let mut slots = StructuralSlotReader::from_raw_row_with_validated_contract(row, contract)
.map_err(|err| {
InternalError::mutation_structural_after_image_invalid(
E::PATH,
data_key,
err.to_string(),
)
})?;
let mut entity = E::materialize_from_slots(&mut slots).map_err(|err| {
InternalError::mutation_structural_after_image_invalid(
E::PATH,
data_key,
err.to_string(),
)
})?;
let identity_key = entity.id().key();
if identity_key != expected_key {
let field_name = Self::primary_key_name_from_schema(schema)?;
let field_value = KeyValueCodec::to_key_value(&identity_key);
let identity_value = KeyValueCodec::to_key_value(&expected_key);
return Err(InternalError::mutation_entity_primary_key_mismatch(
E::PATH,
field_name,
&field_value,
&identity_value,
));
}
self.preflight_entity_with_cached_schema(
&mut entity,
schema,
validate_relations,
write_context,
None,
)?;
Ok(entity)
}
fn validate_structural_after_image_from_patch(
&self,
data_key: &DataKey,
patch: &SerializedStructuralPatch,
accepted_row_decode_contract: AcceptedRowDecodeContract,
schema: &SchemaInfo,
validate_relations: bool,
write_context: SanitizeWriteContext,
) -> Result<E, InternalError> {
let expected_key = data_key.try_key::<E>()?;
let mut entity =
crate::db::data::materialize_entity_from_serialized_structural_patch_with_accepted_contract::<E>(
patch,
accepted_row_decode_contract,
)
.map_err(|err| {
InternalError::mutation_structural_after_image_invalid(
E::PATH,
data_key,
err.to_string(),
)
})?;
let identity_key = entity.id().key();
if identity_key != expected_key {
let field_name = Self::primary_key_name_from_schema(schema)?;
let field_value = KeyValueCodec::to_key_value(&identity_key);
let identity_value = KeyValueCodec::to_key_value(&expected_key);
return Err(InternalError::mutation_entity_primary_key_mismatch(
E::PATH,
field_name,
&field_value,
&identity_value,
));
}
self.preflight_entity_with_cached_schema(
&mut entity,
schema,
validate_relations,
write_context,
None,
)?;
Ok(entity)
}
fn primary_key_name_from_schema(schema: &SchemaInfo) -> Result<&str, InternalError> {
schema.primary_key_name().ok_or_else(|| {
InternalError::executor_invariant(
"structural save validation requires schema primary-key metadata",
)
})
}
}