use crate::{
db::{
PersistedRow,
commit::prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint,
data::DataKey,
executor::mutation::save::shared::accumulate_prepared_row_op_delta,
executor::mutation::{
PreparedRowOpDelta, emit_index_delta_metrics, mutation_write_context,
},
relation::model_has_strong_relation_targets,
schema::commit_schema_fingerprint_for_entity,
},
error::InternalError,
metrics::sink::{ExecKind, MetricsEvent, Span, record},
traits::EntityValue,
types::Timestamp,
};
use std::collections::HashSet;
use crate::db::executor::mutation::save::{SaveExecutor, SaveMode, SavePreflightInputs, SaveRule};
impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
/// Save a batch row by row, committing each entity on its own.
///
/// Every entity is prepared and committed individually, in iteration order.
/// The first failure stops the batch and its error is returned; if at least
/// one row had already succeeded, the accumulated index delta metrics are
/// emitted and a `NonAtomicPartialCommit` metrics event is recorded with the
/// number of rows that succeeded before the failure.
///
/// # Errors
///
/// Returns the first `InternalError` raised while resolving the write
/// context, preparing a row, or committing a row.
fn save_batch_non_atomic(
    &self,
    mode: SaveMode,
    entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
    let rows = entities.into_iter();
    let (lower_bound, _) = rows.size_hint();
    let mut saved_rows = Vec::with_capacity(lower_bound);

    let ctx = mutation_write_context::<E>(&self.db)?;
    let save_rule = SaveRule::from_mode(mode);

    // Batch-invariant inputs are resolved once and reused for every row.
    let preflight = SavePreflightInputs {
        schema: Self::schema_info(),
        schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
        validate_relations: model_has_strong_relation_targets(E::MODEL),
        write_context: Self::save_write_context(mode, Timestamp::now()),
        authored_create_slots: None,
    };

    // Saturating usize -> u64 conversion for row counters.
    let row_count = |rows: usize| u64::try_from(rows).unwrap_or(u64::MAX);

    let mut index_delta = PreparedRowOpDelta {
        index_inserts: 0,
        index_removes: 0,
        reverse_index_inserts: 0,
        reverse_index_removes: 0,
    };

    // The span is created lazily, so an empty batch never opens one.
    let mut lazy_span = None;

    for entity in rows {
        let span = lazy_span.get_or_insert_with(|| Span::<E>::new(ExecKind::Save));

        // The closure scopes `?` to this row so the failure path below can
        // still report on the rows that were saved before it.
        let attempt = (|| {
            let (saved, marker_row_op) =
                self.prepare_entity_save_row_op(&ctx, save_rule, preflight, entity)?;
            let prepared_row_op =
                prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<
                    E,
                >(
                    &self.db,
                    &marker_row_op,
                    &ctx,
                    &ctx,
                    preflight.schema_fingerprint,
                )?;
            Self::commit_prepared_single_row(
                &self.db,
                marker_row_op,
                prepared_row_op,
                |delta| accumulate_prepared_row_op_delta(&mut index_delta, delta),
                || {},
            )?;

            Ok::<_, InternalError>(saved)
        })();

        let saved = match attempt {
            Ok(saved) => saved,
            Err(err) => {
                let committed = saved_rows.len();
                if committed > 0 {
                    emit_index_delta_metrics::<E>(&index_delta);
                    record(MetricsEvent::NonAtomicPartialCommit {
                        entity_path: E::PATH,
                        committed_rows: row_count(committed),
                    });
                }

                return Err(err);
            }
        };

        saved_rows.push(saved);
        span.set_rows(row_count(saved_rows.len()));
    }

    if !saved_rows.is_empty() {
        emit_index_delta_metrics::<E>(&index_delta);
    }

    Ok(saved_rows)
}
/// Save a batch through the atomic lane.
///
/// Drains `entities` into a `Vec`, derives the `SaveRule` for `mode`, and
/// hands both to `save_batch_atomic_impl`.
fn save_batch_atomic(
    &self,
    mode: SaveMode,
    entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
    let batch = entities.into_iter().collect::<Vec<E>>();
    let save_rule = SaveRule::from_mode(mode);

    self.save_batch_atomic_impl(save_rule, batch)
}
pub(crate) fn insert_many_atomic(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
self.save_batch_atomic(SaveMode::Insert, entities)
}
pub(crate) fn update_many_atomic(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
self.save_batch_atomic(SaveMode::Update, entities)
}
pub(crate) fn replace_many_atomic(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
self.save_batch_atomic(SaveMode::Replace, entities)
}
pub(crate) fn insert_many_non_atomic(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
self.save_batch_non_atomic(SaveMode::Insert, entities)
}
pub(crate) fn update_many_non_atomic(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
self.save_batch_non_atomic(SaveMode::Update, entities)
}
pub(crate) fn replace_many_non_atomic(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, InternalError> {
self.save_batch_non_atomic(SaveMode::Replace, entities)
}
/// Prepare every row of a batch, then commit them in one
/// `commit_atomic_batch` call.
///
/// Each entity is preflighted and turned into a marker row op before anything
/// is committed. A row key that appears twice in the batch aborts the whole
/// call with `mutation_atomic_save_duplicate_key`. An empty batch returns an
/// empty `Vec` without calling `commit_atomic_batch`.
///
/// # Errors
///
/// Returns the first `InternalError` raised by write-context resolution,
/// preflight, row-op preparation, duplicate-key detection, or the batch
/// commit.
#[inline(never)]
fn save_batch_atomic_impl(
    &self,
    save_rule: SaveRule,
    entities: Vec<E>,
) -> Result<Vec<E>, InternalError> {
    let mut span = Span::<E>::new(ExecKind::Save);
    let ctx = mutation_write_context::<E>(&self.db)?;

    let batch_len = entities.len();
    let mut out = Vec::with_capacity(batch_len);
    let mut marker_row_ops = Vec::with_capacity(batch_len);
    let mut seen_row_keys = HashSet::with_capacity(batch_len);

    // Map the rule back onto the save mode expected by `save_write_context`.
    let mode = match save_rule {
        SaveRule::RequireAbsent => SaveMode::Insert,
        SaveRule::RequirePresent => SaveMode::Update,
        SaveRule::AllowAny => SaveMode::Replace,
    };

    // Batch-invariant inputs are resolved once and reused for every row.
    let preflight = SavePreflightInputs {
        schema: Self::schema_info(),
        schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
        validate_relations: model_has_strong_relation_targets(E::MODEL),
        write_context: Self::save_write_context(mode, Timestamp::now()),
        authored_create_slots: None,
    };

    for mut entity in entities {
        self.preflight_entity_with_cached_schema(
            &mut entity,
            preflight.schema,
            preflight.validate_relations,
            preflight.write_context,
            preflight.authored_create_slots,
        )?;

        let marker_row_op = Self::prepare_typed_entity_row_op(
            &ctx,
            save_rule,
            &entity,
            preflight.schema_fingerprint,
        )?;

        // `insert` returns false when the key was already recorded for an
        // earlier row of this batch.
        let first_seen = seen_row_keys.insert(marker_row_op.key);
        if !first_seen {
            let data_key = DataKey::try_new::<E>(entity.id().key())?;

            return Err(InternalError::mutation_atomic_save_duplicate_key(
                E::PATH,
                data_key,
            ));
        }

        marker_row_ops.push(marker_row_op);
        out.push(entity);
    }

    // Nothing to commit for an empty batch.
    if !marker_row_ops.is_empty() {
        Self::commit_atomic_batch(&self.db, marker_row_ops, &mut span)?;
    }

    Ok(out)
}
}