use super::*;
#[allow(
dead_code,
reason = "0.153 stages field-path runner orchestration before public DDL consumes it"
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::schema) enum SchemaFieldPathIndexRunnerError {
UnsupportedExecutionPlan,
TargetMismatch,
StageRowsFailed,
StagedStoreRejected,
IsolatedStoreValidationFailed,
RuntimeInvalidationIdentity,
SnapshotPublicationRejected,
PublishedStoreRejected,
}
#[allow(
dead_code,
reason = "0.153 stages field-path runner rollback diagnostics before public DDL consumes it"
)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::schema) struct SchemaFieldPathIndexRunnerFailure {
error: SchemaFieldPathIndexRunnerError,
rollback_report: Option<Box<SchemaFieldPathIndexStagedStoreRollbackReport>>,
}
#[allow(
dead_code,
reason = "0.153 stages field-path runner rollback diagnostics before public DDL consumes it"
)]
impl SchemaFieldPathIndexRunnerFailure {
#[must_use]
const fn without_rollback(error: SchemaFieldPathIndexRunnerError) -> Self {
Self {
error,
rollback_report: None,
}
}
#[must_use]
fn with_rollback(
error: SchemaFieldPathIndexRunnerError,
rollback_report: SchemaFieldPathIndexStagedStoreRollbackReport,
) -> Self {
Self {
error,
rollback_report: Some(Box::new(rollback_report)),
}
}
#[must_use]
pub(in crate::db::schema) const fn error(&self) -> SchemaFieldPathIndexRunnerError {
self.error
}
#[must_use]
pub(in crate::db::schema) fn rollback_report(
&self,
) -> Option<&SchemaFieldPathIndexStagedStoreRollbackReport> {
self.rollback_report.as_deref()
}
}
#[allow(
dead_code,
reason = "0.153 stages field-path runner orchestration before public DDL consumes it"
)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::schema) struct SchemaFieldPathIndexRunnerReport {
store: String,
write_report: SchemaFieldPathIndexStagedStoreWriteReport,
validation: SchemaFieldPathIndexIsolatedIndexStoreValidation,
invalidation_report: SchemaFieldPathIndexRuntimeInvalidationReport,
publication_report: SchemaFieldPathIndexSnapshotPublicationReport,
published_store_report: SchemaFieldPathIndexPublishedStoreReport,
}
#[allow(
dead_code,
reason = "0.153 stages field-path runner orchestration before public DDL consumes it"
)]
impl SchemaFieldPathIndexRunnerReport {
#[must_use]
pub(in crate::db::schema) const fn store(&self) -> &str {
self.store.as_str()
}
#[must_use]
pub(in crate::db::schema) const fn write_report(
&self,
) -> &SchemaFieldPathIndexStagedStoreWriteReport {
&self.write_report
}
#[must_use]
pub(in crate::db::schema) const fn validation(
&self,
) -> &SchemaFieldPathIndexIsolatedIndexStoreValidation {
&self.validation
}
#[must_use]
pub(in crate::db::schema) const fn invalidation_report(
&self,
) -> &SchemaFieldPathIndexRuntimeInvalidationReport {
&self.invalidation_report
}
#[must_use]
pub(in crate::db::schema) const fn publication_report(
&self,
) -> &SchemaFieldPathIndexSnapshotPublicationReport {
&self.publication_report
}
#[must_use]
pub(in crate::db::schema) const fn published_store_report(
&self,
) -> &SchemaFieldPathIndexPublishedStoreReport {
&self.published_store_report
}
#[must_use]
pub(in crate::db::schema) const fn runner_report(&self) -> &SchemaMutationRunnerReport {
self.published_store_report.runner_report()
}
#[must_use]
pub(in crate::db::schema) fn publication_readiness(
&self,
) -> SchemaFieldPathIndexStagedStorePublicationReadiness {
self.published_store_report.publication_readiness()
}
}
#[allow(
dead_code,
reason = "0.153 stages field-path runner orchestration before public DDL consumes it"
)]
pub(in crate::db::schema) struct SchemaFieldPathIndexRunner;
#[allow(
dead_code,
reason = "0.153 stages field-path runner orchestration before public DDL consumes it"
)]
impl SchemaFieldPathIndexRunner {
pub(in crate::db::schema) fn run<'a>(
input: &SchemaMutationRunnerInput<'_>,
entity_tag: EntityTag,
target: SchemaFieldPathIndexRebuildTarget,
rows: impl IntoIterator<Item = SchemaFieldPathIndexRebuildRow<'a>>,
index_store: &mut IndexStore,
invalidation_sink: &mut impl SchemaMutationRuntimeInvalidationSink,
publication_sink: &mut impl SchemaMutationAcceptedSnapshotPublicationSink,
) -> Result<SchemaFieldPathIndexRunnerReport, SchemaFieldPathIndexRunnerFailure> {
Self::validate_execution_plan(input.execution_plan(), &target)
.map_err(SchemaFieldPathIndexRunnerFailure::without_rollback)?;
let staged = SchemaFieldPathIndexStagedRebuild::from_rows(
input.accepted_after().entity_path(),
entity_tag,
target,
rows,
)
.map_err(|_| {
SchemaFieldPathIndexRunnerFailure::without_rollback(
SchemaFieldPathIndexRunnerError::StageRowsFailed,
)
})?;
let staged_store =
SchemaFieldPathIndexStagedStore::from_rebuild(&staged, input.execution_plan())
.map_err(|_| {
SchemaFieldPathIndexRunnerFailure::without_rollback(
SchemaFieldPathIndexRunnerError::StagedStoreRejected,
)
})?;
let store = staged_store.store().to_string();
let (write_report, validation) = {
let mut writer = SchemaFieldPathIndexIsolatedIndexStoreWriter::new(&store, index_store);
let batch = staged_store.write_batch(&writer);
let write_report = batch.write_to(&mut writer);
let Ok(validation) = writer.validate_batch(&batch) else {
let rollback_report = batch.rollback_plan().rollback_to(&mut writer);
return Err(SchemaFieldPathIndexRunnerFailure::with_rollback(
SchemaFieldPathIndexRunnerError::IsolatedStoreValidationFailed,
rollback_report,
));
};
(write_report, validation)
};
let invalidation_plan =
SchemaFieldPathIndexRuntimeInvalidationPlan::from_isolated_index_store_validation(
&validation,
input,
)
.map_err(|_| {
SchemaFieldPathIndexRunnerFailure::without_rollback(
SchemaFieldPathIndexRunnerError::RuntimeInvalidationIdentity,
)
})?;
let invalidation_report = invalidation_plan.invalidate_runtime_state(invalidation_sink);
let publication_plan =
SchemaFieldPathIndexSnapshotPublicationPlan::from_runtime_invalidation_report(
&invalidation_report,
input,
)
.map_err(|_| {
SchemaFieldPathIndexRunnerFailure::without_rollback(
SchemaFieldPathIndexRunnerError::SnapshotPublicationRejected,
)
})?;
let publication_report = publication_plan.publish_snapshot(publication_sink);
let published_store_plan =
SchemaFieldPathIndexPublishedStorePlan::from_validated_publication(
&validation,
&publication_report,
)
.map_err(|_| {
SchemaFieldPathIndexRunnerFailure::without_rollback(
SchemaFieldPathIndexRunnerError::PublishedStoreRejected,
)
})?;
let published_store_report = published_store_plan
.publish_index_store(index_store)
.map_err(|_| {
SchemaFieldPathIndexRunnerFailure::without_rollback(
SchemaFieldPathIndexRunnerError::PublishedStoreRejected,
)
})?;
Ok(SchemaFieldPathIndexRunnerReport {
store,
write_report,
validation,
invalidation_report,
publication_report,
published_store_report,
})
}
fn validate_execution_plan(
execution_plan: &SchemaMutationExecutionPlan,
target: &SchemaFieldPathIndexRebuildTarget,
) -> Result<(), SchemaFieldPathIndexRunnerError> {
match execution_plan.steps() {
[
SchemaMutationExecutionStep::BuildFieldPathIndex {
target: planned_target,
},
SchemaMutationExecutionStep::ValidatePhysicalWork,
SchemaMutationExecutionStep::InvalidateRuntimeState,
] if planned_target == target => Ok(()),
[
SchemaMutationExecutionStep::BuildFieldPathIndex { .. },
SchemaMutationExecutionStep::ValidatePhysicalWork,
SchemaMutationExecutionStep::InvalidateRuntimeState,
] => Err(SchemaFieldPathIndexRunnerError::TargetMismatch),
_ => Err(SchemaFieldPathIndexRunnerError::UnsupportedExecutionPlan),
}
}
}