use crate::db::{
codec::{
finalize_hash_sha256, new_hash_sha256_prefixed, write_hash_str_u32, write_hash_tag_u8,
write_hash_u32,
},
data::CanonicalSlotReader,
index::{
IndexEntryValue, IndexId, IndexKey, IndexState, IndexStore, IndexStoreVisit,
RawIndexStoreKey,
},
key_taxonomy::PrimaryKeyValue,
predicate::PredicateProgram,
schema::{
AcceptedSchemaSnapshot, FieldId, PersistedFieldSnapshot, PersistedIndexSnapshot,
PersistedSchemaSnapshot, SchemaFieldSlot, encode_persisted_schema_snapshot,
},
};
use crate::error::InternalError;
use crate::types::EntityTag;
use sha2::Digest;
use std::collections::BTreeMap;
mod field;
pub(in crate::db) use field::{
SchemaFieldAdditionTarget, SchemaFieldDefaultTarget, SchemaFieldDropTarget,
SchemaFieldNullabilityTarget, SchemaFieldRenameTarget,
derive_sql_ddl_field_addition_accepted_after, derive_sql_ddl_field_default_accepted_after,
derive_sql_ddl_field_drop_accepted_after, derive_sql_ddl_field_nullability_accepted_after,
derive_sql_ddl_field_rename_accepted_after, resolve_sql_ddl_field_drop_dependent_index,
};
#[cfg(test)]
pub(in crate::db) use field::{
admit_sql_ddl_field_addition_candidate, admit_sql_ddl_field_default_candidate,
admit_sql_ddl_field_drop_candidate, admit_sql_ddl_field_nullability_candidate,
admit_sql_ddl_field_rename_candidate,
};
mod index;
pub(in crate::db) use index::{
SchemaExpressionIndexRebuildExpression, SchemaExpressionIndexRebuildKey,
SchemaExpressionIndexRebuildTarget, SchemaFieldPathIndexRebuildKey,
SchemaFieldPathIndexRebuildTarget, SchemaSecondaryIndexDropCleanupTarget,
derive_sql_ddl_expression_index_accepted_after, derive_sql_ddl_field_path_index_accepted_after,
derive_sql_ddl_secondary_index_drop_accepted_after,
resolve_sql_ddl_secondary_index_drop_candidate,
};
#[cfg(test)]
pub(in crate::db) use index::{
admit_sql_ddl_expression_index_candidate, admit_sql_ddl_field_path_index_candidate,
admit_sql_ddl_secondary_index_drop_candidate,
};
/// Profile tag handed to `new_hash_sha256_prefixed` when `MutationPlan::fingerprint`
/// starts its SHA-256 hasher. The trailing `:v1` is part of the hashed bytes, so
/// changing this literal changes every fingerprint.
#[allow(
dead_code,
reason = "used by mutation fingerprint tests until audit identity is surfaced in diagnostics"
)]
const SCHEMA_MUTATION_FINGERPRINT_PROFILE_TAG: &[u8] = b"icydb:schema-mutation-plan:v1";
/// Profile tag for runtime-epoch identity hashing.
///
/// NOTE(review): no use of this constant is visible in this part of the file;
/// presumably a sibling module or a later section consumes it — confirm.
#[allow(
dead_code,
reason = "0.153 stages runtime epoch identity before physical runners publish snapshots"
)]
const SCHEMA_MUTATION_RUNTIME_EPOCH_PROFILE_TAG: &[u8] = b"icydb:schema-mutation-runtime-epoch:v1";
/// One schema change recorded inside a `MutationPlan`.
///
/// `MutationPlan::rebuild_plan` translates each variant into zero or one
/// `SchemaRebuildAction`.
#[allow(
dead_code,
reason = "0.152 defines the first mutation vocabulary before every operation is executable"
)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::schema) enum SchemaMutation {
/// Appended field whose snapshot reports no default
/// (see `MutationPlan::append_only_fields`). Produces no rebuild action.
AddNullableField {
field_id: FieldId,
name: String,
slot: SchemaFieldSlot,
},
/// Appended field whose snapshot carries a default
/// (see `MutationPlan::append_only_fields`). Produces no rebuild action.
AddDefaultedField {
field_id: FieldId,
name: String,
slot: SchemaFieldSlot,
},
/// New field-path index; becomes `SchemaRebuildAction::BuildFieldPathIndex`.
AddFieldPathIndex {
target: SchemaFieldPathIndexRebuildTarget,
},
/// New expression index; becomes `SchemaRebuildAction::BuildExpressionIndex`.
AddExpressionIndex {
target: SchemaExpressionIndexRebuildTarget,
},
/// Secondary index removal; becomes `SchemaRebuildAction::DropSecondaryIndex`.
DropNonRequiredSecondaryIndex {
target: SchemaSecondaryIndexDropCleanupTarget,
},
/// Nullability change; `MutationPlan::rebuild_plan` maps it to
/// `SchemaRebuildAction::Unsupported`.
AlterNullability {
field_id: FieldId,
},
}
/// Request that `SchemaMutationRequest::lower_to_plan` turns into a `MutationPlan`.
///
/// Each variant selects exactly one `MutationPlan` constructor.
#[allow(
dead_code,
reason = "0.152 stages the internal mutation request API before every request has a live caller"
)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::schema) enum SchemaMutationRequest<'a> {
/// Lowers to `MutationPlan::exact_match` (no mutations).
ExactMatch,
/// Lowers to `MutationPlan::append_only_fields` over the borrowed field snapshots.
AppendOnlyFields(&'a [PersistedFieldSnapshot]),
/// Lowers to a single `SchemaMutation::AddFieldPathIndex`.
AddFieldPathIndex {
target: SchemaFieldPathIndexRebuildTarget,
},
/// Lowers to a single `SchemaMutation::AddExpressionIndex`.
AddExpressionIndex {
target: SchemaExpressionIndexRebuildTarget,
},
/// Lowers to a single `SchemaMutation::DropNonRequiredSecondaryIndex`.
DropNonRequiredSecondaryIndex {
target: SchemaSecondaryIndexDropCleanupTarget,
},
/// Lowers to a single `SchemaMutation::AlterNullability`.
AlterNullability {
field_id: FieldId,
},
/// Lowers to `MutationPlan::incompatible`.
Incompatible,
}
/// Error carried by `SchemaDdlMutationAdmissionError::AcceptedIndex`.
///
/// NOTE(review): the code that produces these variants is not in this part of
/// the file; the variant names suggest index-key shape validation failures —
/// confirm against the producers before relying on that reading.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum AcceptedSchemaMutationError {
UnsupportedIndexKeyShape,
EmptyIndexKey,
ExpressionIndexRequiresExpressionKey,
}
/// Admission record that holds exactly one `SchemaDdlMutationTarget`.
///
/// The field is private; callers read it through the per-variant accessors on
/// the inherent impl.
#[allow(
dead_code,
reason = "0.155 stages SQL DDL lowering before execution can call the runner"
)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaDdlMutationAdmission {
/// The admitted mutation target.
target: SchemaDdlMutationTarget,
}
/// Rejection reasons for an index-drop candidate.
///
/// NOTE(review): nothing in this part of the file constructs or matches these
/// variants; their exact meaning should be confirmed against the `index`
/// submodule that presumably produces them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum SchemaDdlIndexDropCandidateError {
Generated,
Unknown,
Unsupported,
}
#[allow(
    dead_code,
    reason = "0.155 stages SQL DDL lowering before execution can call the runner"
)]
impl SchemaDdlMutationAdmission {
    /// Borrows the field-path index rebuild target carried by this admission.
    ///
    /// # Panics
    ///
    /// Panics when the admission wraps anything other than
    /// `SchemaDdlMutationTarget::FieldPathAddition`. Unlike the sibling
    /// accessors, this one does not return `Option`.
    #[must_use]
    pub(in crate::db) fn target(&self) -> &SchemaFieldPathIndexRebuildTarget {
        match &self.target {
            SchemaDdlMutationTarget::FieldPathAddition(field_path) => field_path,
            SchemaDdlMutationTarget::FieldAddition(_)
            | SchemaDdlMutationTarget::FieldDefaultChange(_)
            | SchemaDdlMutationTarget::FieldDrop(_)
            | SchemaDdlMutationTarget::FieldNullabilityChange(_)
            | SchemaDdlMutationTarget::FieldRename(_)
            | SchemaDdlMutationTarget::ExpressionAddition(_)
            | SchemaDdlMutationTarget::SecondaryDrop(_) => {
                panic!("SQL DDL admission does not carry a field-path index rebuild target")
            }
        }
    }

    /// Returns the expression index rebuild target, or `None` for every other
    /// kind of admitted mutation.
    #[must_use]
    pub(in crate::db) const fn expression_target(
        &self,
    ) -> Option<&SchemaExpressionIndexRebuildTarget> {
        // Every variant is listed so a new target kind forces a decision here.
        match &self.target {
            SchemaDdlMutationTarget::FieldAddition(_)
            | SchemaDdlMutationTarget::FieldDefaultChange(_)
            | SchemaDdlMutationTarget::FieldDrop(_)
            | SchemaDdlMutationTarget::FieldNullabilityChange(_)
            | SchemaDdlMutationTarget::FieldRename(_)
            | SchemaDdlMutationTarget::FieldPathAddition(_)
            | SchemaDdlMutationTarget::SecondaryDrop(_) => None,
            SchemaDdlMutationTarget::ExpressionAddition(expression) => Some(expression),
        }
    }

    /// Returns the secondary index drop cleanup target, or `None` for every
    /// other kind of admitted mutation.
    #[must_use]
    pub(in crate::db) const fn drop_target(
        &self,
    ) -> Option<&SchemaSecondaryIndexDropCleanupTarget> {
        match &self.target {
            SchemaDdlMutationTarget::SecondaryDrop(cleanup) => Some(cleanup),
            SchemaDdlMutationTarget::FieldAddition(_)
            | SchemaDdlMutationTarget::FieldDefaultChange(_)
            | SchemaDdlMutationTarget::FieldDrop(_)
            | SchemaDdlMutationTarget::FieldNullabilityChange(_)
            | SchemaDdlMutationTarget::FieldRename(_)
            | SchemaDdlMutationTarget::FieldPathAddition(_)
            | SchemaDdlMutationTarget::ExpressionAddition(_) => None,
        }
    }

    /// Returns the field addition target, or `None` for every other kind of
    /// admitted mutation.
    #[must_use]
    pub(in crate::db) const fn field_addition_target(&self) -> Option<&SchemaFieldAdditionTarget> {
        match &self.target {
            SchemaDdlMutationTarget::FieldDefaultChange(_)
            | SchemaDdlMutationTarget::FieldDrop(_)
            | SchemaDdlMutationTarget::FieldNullabilityChange(_)
            | SchemaDdlMutationTarget::FieldRename(_)
            | SchemaDdlMutationTarget::FieldPathAddition(_)
            | SchemaDdlMutationTarget::ExpressionAddition(_)
            | SchemaDdlMutationTarget::SecondaryDrop(_) => None,
            SchemaDdlMutationTarget::FieldAddition(addition) => Some(addition),
        }
    }

    /// Returns the field default change target, or `None` for every other
    /// kind of admitted mutation.
    #[must_use]
    pub(in crate::db) const fn field_default_target(&self) -> Option<&SchemaFieldDefaultTarget> {
        match &self.target {
            SchemaDdlMutationTarget::FieldAddition(_)
            | SchemaDdlMutationTarget::FieldDrop(_)
            | SchemaDdlMutationTarget::FieldNullabilityChange(_)
            | SchemaDdlMutationTarget::FieldRename(_)
            | SchemaDdlMutationTarget::FieldPathAddition(_)
            | SchemaDdlMutationTarget::ExpressionAddition(_)
            | SchemaDdlMutationTarget::SecondaryDrop(_) => None,
            SchemaDdlMutationTarget::FieldDefaultChange(default_change) => Some(default_change),
        }
    }

    /// Returns the field nullability change target, or `None` for every other
    /// kind of admitted mutation.
    #[must_use]
    pub(in crate::db) const fn field_nullability_target(
        &self,
    ) -> Option<&SchemaFieldNullabilityTarget> {
        match &self.target {
            SchemaDdlMutationTarget::FieldAddition(_)
            | SchemaDdlMutationTarget::FieldDefaultChange(_)
            | SchemaDdlMutationTarget::FieldDrop(_)
            | SchemaDdlMutationTarget::FieldRename(_)
            | SchemaDdlMutationTarget::FieldPathAddition(_)
            | SchemaDdlMutationTarget::ExpressionAddition(_)
            | SchemaDdlMutationTarget::SecondaryDrop(_) => None,
            SchemaDdlMutationTarget::FieldNullabilityChange(nullability) => Some(nullability),
        }
    }

    /// Returns the field rename target, or `None` for every other kind of
    /// admitted mutation.
    #[must_use]
    pub(in crate::db) const fn field_rename_target(&self) -> Option<&SchemaFieldRenameTarget> {
        match &self.target {
            SchemaDdlMutationTarget::FieldAddition(_)
            | SchemaDdlMutationTarget::FieldDefaultChange(_)
            | SchemaDdlMutationTarget::FieldDrop(_)
            | SchemaDdlMutationTarget::FieldNullabilityChange(_)
            | SchemaDdlMutationTarget::FieldPathAddition(_)
            | SchemaDdlMutationTarget::ExpressionAddition(_)
            | SchemaDdlMutationTarget::SecondaryDrop(_) => None,
            SchemaDdlMutationTarget::FieldRename(rename) => Some(rename),
        }
    }

    /// Returns the field drop target, or `None` for every other kind of
    /// admitted mutation.
    #[must_use]
    pub(in crate::db) const fn field_drop_target(&self) -> Option<&SchemaFieldDropTarget> {
        match &self.target {
            SchemaDdlMutationTarget::FieldAddition(_)
            | SchemaDdlMutationTarget::FieldDefaultChange(_)
            | SchemaDdlMutationTarget::FieldNullabilityChange(_)
            | SchemaDdlMutationTarget::FieldRename(_)
            | SchemaDdlMutationTarget::FieldPathAddition(_)
            | SchemaDdlMutationTarget::ExpressionAddition(_)
            | SchemaDdlMutationTarget::SecondaryDrop(_) => None,
            SchemaDdlMutationTarget::FieldDrop(dropped) => Some(dropped),
        }
    }
}
/// Payload of a `SchemaDdlMutationAdmission`: one variant per admitted
/// mutation kind, each wrapping the target type re-exported from the `field`
/// or `index` submodule.
#[derive(Clone, Debug, Eq, PartialEq)]
enum SchemaDdlMutationTarget {
/// Read back through `SchemaDdlMutationAdmission::field_addition_target`.
FieldAddition(SchemaFieldAdditionTarget),
/// Read back through `SchemaDdlMutationAdmission::field_default_target`.
FieldDefaultChange(SchemaFieldDefaultTarget),
/// Read back through `SchemaDdlMutationAdmission::field_drop_target`.
FieldDrop(SchemaFieldDropTarget),
/// Read back through `SchemaDdlMutationAdmission::field_nullability_target`.
FieldNullabilityChange(SchemaFieldNullabilityTarget),
/// Read back through `SchemaDdlMutationAdmission::field_rename_target`.
FieldRename(SchemaFieldRenameTarget),
/// Read back through `SchemaDdlMutationAdmission::target` (which panics on
/// any other variant).
FieldPathAddition(SchemaFieldPathIndexRebuildTarget),
/// Read back through `SchemaDdlMutationAdmission::expression_target`.
ExpressionAddition(SchemaExpressionIndexRebuildTarget),
/// Read back through `SchemaDdlMutationAdmission::drop_target`.
SecondaryDrop(SchemaSecondaryIndexDropCleanupTarget),
}
/// Pairs an `AcceptedSchemaSnapshot` with the `SchemaDdlMutationAdmission`
/// it belongs to. Both fields are exposed read-only through accessors.
#[allow(
dead_code,
reason = "0.155 stages SQL DDL accepted-after derivation before execution can publish it"
)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) struct SchemaDdlAcceptedSnapshotDerivation {
/// The accepted snapshot; per the field name, the state after the mutation.
accepted_after: AcceptedSchemaSnapshot,
/// The admission recorded alongside the snapshot.
admission: SchemaDdlMutationAdmission,
}
#[allow(
    dead_code,
    reason = "0.155 stages SQL DDL accepted-after derivation before execution can publish it"
)]
impl SchemaDdlAcceptedSnapshotDerivation {
    /// Borrows the accepted snapshot stored in this derivation.
    #[must_use]
    pub(in crate::db) const fn accepted_after(&self) -> &AcceptedSchemaSnapshot {
        let Self { accepted_after, .. } = self;
        accepted_after
    }

    /// Borrows the admission stored in this derivation.
    #[must_use]
    pub(in crate::db) const fn admission(&self) -> &SchemaDdlMutationAdmission {
        let Self { admission, .. } = self;
        admission
    }
}
/// Failure modes for admitting a SQL DDL mutation.
///
/// NOTE(review): no producer of this enum is visible in this part of the
/// file; only `AcceptedIndex` has a payload whose type is defined here.
#[allow(
dead_code,
reason = "0.155 stages SQL DDL lowering before execution can call the runner"
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum SchemaDdlMutationAdmissionError {
/// Wraps an `AcceptedSchemaMutationError`.
AcceptedIndex(AcceptedSchemaMutationError),
AcceptedAfterRejected,
UnsupportedExecutionPath,
}
/// Compatibility bucket assigned to a `MutationPlan` by its constructors.
#[allow(
dead_code,
reason = "0.152 stages rebuild and unsupported buckets before every bucket has a live caller"
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::schema) enum MutationCompatibility {
/// Assigned to exact-match and append-only-field plans; the only bucket
/// `MutationPlan::publication_status` does not block on.
MetadataOnlySafe,
/// Assigned to index addition and secondary index drop plans.
RequiresRebuild,
/// Assigned to nullability alteration plans.
UnsupportedPreOne,
/// Assigned by `MutationPlan::incompatible`.
Incompatible,
}
/// Physical work bucket attached to a `MutationPlan` and its `SchemaRebuildPlan`.
#[allow(
dead_code,
reason = "0.152 exposes future rebuild buckets before orchestration consumes them"
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::schema) enum RebuildRequirement {
/// No physical work; `SchemaRebuildPlan::requires_physical_work` is false
/// only for this variant.
NoRebuildRequired,
/// Mapped to `SchemaMutationExecutionReadiness::RequiresPhysicalRunner`
/// by `SchemaMutationExecutionPlan::from_rebuild_plan`.
IndexRebuildRequired,
/// Mapped to `SchemaMutationExecutionReadiness::Unsupported` by
/// `SchemaMutationExecutionPlan::from_rebuild_plan`.
FullDataRewriteRequired,
/// Mapped to `SchemaMutationExecutionReadiness::Unsupported` by
/// `SchemaMutationExecutionPlan::from_rebuild_plan`.
Unsupported,
}
/// One action in a `SchemaRebuildPlan`.
///
/// `SchemaMutationExecutionPlan::from_rebuild_plan` converts each variant into
/// the `SchemaMutationExecutionStep` variant of the same name.
#[allow(
dead_code,
reason = "0.152 stages rebuild orchestration contracts before execution consumes them"
)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::schema) enum SchemaRebuildAction {
/// Derived from `SchemaMutation::AddFieldPathIndex`.
BuildFieldPathIndex {
target: SchemaFieldPathIndexRebuildTarget,
},
/// Derived from `SchemaMutation::AddExpressionIndex`.
BuildExpressionIndex {
target: SchemaExpressionIndexRebuildTarget,
},
/// Derived from `SchemaMutation::DropNonRequiredSecondaryIndex`.
DropSecondaryIndex {
target: SchemaSecondaryIndexDropCleanupTarget,
},
/// Fallback action when a plan requires a full data rewrite and no
/// mutation produced an action.
RewriteAllRows,
/// Placeholder for work that cannot be performed; `reason` is a static
/// explanation chosen in `MutationPlan::rebuild_plan`.
Unsupported {
reason: &'static str,
},
}
/// Rebuild requirement together with the actions derived for it by
/// `MutationPlan::rebuild_plan`.
#[allow(
dead_code,
reason = "0.152 stages rebuild orchestration contracts before execution consumes them"
)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::schema) struct SchemaRebuildPlan {
/// Overall physical work bucket for the plan.
requirement: RebuildRequirement,
/// Actions in the order the originating mutations were listed; empty for
/// plans built by `SchemaRebuildPlan::no_rebuild`.
actions: Vec<SchemaRebuildAction>,
}
#[allow(
    dead_code,
    reason = "0.152 stages rebuild orchestration contracts before execution consumes them"
)]
impl SchemaRebuildPlan {
    /// Builds the plan for mutations that need no physical work: the
    /// `NoRebuildRequired` bucket with an empty action list.
    const fn no_rebuild() -> Self {
        Self::new(RebuildRequirement::NoRebuildRequired, Vec::new())
    }

    /// Builds a plan from an explicit requirement and action list.
    const fn new(requirement: RebuildRequirement, actions: Vec<SchemaRebuildAction>) -> Self {
        Self {
            actions,
            requirement,
        }
    }

    /// Returns the plan's rebuild requirement bucket.
    #[must_use]
    pub(in crate::db::schema) const fn requirement(&self) -> RebuildRequirement {
        self.requirement
    }

    /// Borrows the plan's actions as a slice.
    #[must_use]
    pub(in crate::db::schema) const fn actions(&self) -> &[SchemaRebuildAction] {
        self.actions.as_slice()
    }

    /// Reports whether the requirement is anything other than
    /// `RebuildRequirement::NoRebuildRequired`.
    #[must_use]
    pub(in crate::db::schema) const fn requires_physical_work(&self) -> bool {
        match self.requirement {
            RebuildRequirement::NoRebuildRequired => false,
            RebuildRequirement::IndexRebuildRequired
            | RebuildRequirement::FullDataRewriteRequired
            | RebuildRequirement::Unsupported => true,
        }
    }

    /// Returns `RebuildRequired(requirement)` when the plan needs physical
    /// work, and `None` otherwise.
    #[must_use]
    const fn publication_blocker(&self) -> Option<MutationPublicationBlocker> {
        if self.requires_physical_work() {
            Some(MutationPublicationBlocker::RebuildRequired(
                self.requirement,
            ))
        } else {
            None
        }
    }
}
mod runner;
pub(in crate::db::schema) use self::runner::*;
mod field_path;
pub(in crate::db::schema) use self::field_path::*;
mod expression;
#[allow(
unused_imports,
reason = "expression staging is consumed by tests and later physical runner wiring"
)]
pub(in crate::db::schema) use self::expression::*;
/// Result of `MutationPlan::publication_preflight`: the runner preflight
/// outcome combined with the plan's publication status.
#[allow(
dead_code,
reason = "0.152 stages runner preflight publication checks before physical runners consume them"
)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::schema) enum MutationPublicationPreflight {
/// The runner reported no physical work and the plan is publishable.
PublishableNow,
/// The runner has every required capability; carries the execution step
/// count and the required capabilities.
PhysicalWorkReady {
step_count: usize,
required: Vec<SchemaMutationRunnerCapability>,
},
/// The runner lacks the listed capabilities.
MissingRunnerCapabilities {
missing: Vec<SchemaMutationRunnerCapability>,
},
/// The execution plan was rejected for the given requirement.
Rejected {
requirement: RebuildRequirement,
},
/// The runner reported no physical work but the plan's publication status
/// is blocked.
Blocked(MutationPublicationBlocker),
}
/// The set of capabilities a runner offers, checked against execution plans
/// by `preflight` and `outcome`.
#[allow(
dead_code,
reason = "0.152 stages runner preflight contracts before physical runners consume them"
)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::schema) struct SchemaMutationRunnerContract {
/// Capabilities collected by `SchemaMutationRunnerContract::new`, which adds
/// each one through `push_runner_capability_once`.
capabilities: Vec<SchemaMutationRunnerCapability>,
}
#[allow(
    dead_code,
    reason = "0.152 stages runner preflight contracts before physical runners consume them"
)]
impl SchemaMutationRunnerContract {
    /// Builds a contract from the given capabilities, adding each one through
    /// `push_runner_capability_once` in input order.
    #[must_use]
    pub(in crate::db::schema) fn new(capabilities: &[SchemaMutationRunnerCapability]) -> Self {
        let mut unique = Vec::new();
        capabilities
            .iter()
            .copied()
            .for_each(|capability| push_runner_capability_once(&mut unique, capability));
        Self {
            capabilities: unique,
        }
    }

    /// Borrows the capabilities this contract offers.
    #[must_use]
    pub(in crate::db::schema) const fn capabilities(&self) -> &[SchemaMutationRunnerCapability] {
        self.capabilities.as_slice()
    }

    /// Checks this contract's capabilities against `execution_plan` and
    /// translates the admission result into a runner preflight value.
    #[must_use]
    pub(in crate::db::schema) fn preflight(
        &self,
        execution_plan: &SchemaMutationExecutionPlan,
    ) -> SchemaMutationRunnerPreflight {
        let admission = execution_plan.admit_runner_capabilities(self.capabilities());
        match admission {
            SchemaMutationExecutionAdmission::PublishableNow => {
                SchemaMutationRunnerPreflight::NoPhysicalWork
            }
            SchemaMutationExecutionAdmission::RunnerReady { required } => {
                // The step count is taken from the plan, not from `required`.
                let step_count = execution_plan.steps().len();
                SchemaMutationRunnerPreflight::Ready {
                    step_count,
                    required,
                }
            }
            SchemaMutationExecutionAdmission::MissingRunnerCapabilities { missing } => {
                SchemaMutationRunnerPreflight::MissingCapabilities { missing }
            }
            SchemaMutationExecutionAdmission::Rejected { requirement } => {
                SchemaMutationRunnerPreflight::Rejected { requirement }
            }
        }
    }

    /// Runs `preflight` and wraps the result into a runner outcome carrying
    /// either a preflight-ready report or a rejection.
    #[must_use]
    pub(in crate::db::schema) fn outcome(
        &self,
        execution_plan: &SchemaMutationExecutionPlan,
    ) -> SchemaMutationRunnerOutcome {
        match self.preflight(execution_plan) {
            SchemaMutationRunnerPreflight::NoPhysicalWork => {
                let report = SchemaMutationRunnerReport::preflight_ready(0, Vec::new(), None);
                SchemaMutationRunnerOutcome::NoPhysicalWork(report)
            }
            SchemaMutationRunnerPreflight::Ready {
                step_count,
                required,
            } => {
                let report = SchemaMutationRunnerReport::preflight_ready(
                    step_count,
                    required,
                    Some(SchemaMutationStoreVisibility::StagedOnly),
                );
                SchemaMutationRunnerOutcome::ReadyForPhysicalWork(report)
            }
            SchemaMutationRunnerPreflight::MissingCapabilities { missing } => {
                let rejection = SchemaMutationRunnerRejection::missing_runner_capabilities(
                    execution_plan.physical_requirement(),
                    missing,
                );
                SchemaMutationRunnerOutcome::Rejected(rejection)
            }
            SchemaMutationRunnerPreflight::Rejected { requirement } => {
                let rejection =
                    SchemaMutationRunnerRejection::unsupported_requirement(requirement);
                SchemaMutationRunnerOutcome::Rejected(rejection)
            }
        }
    }
}
/// Gate value computed by `SchemaMutationExecutionPlan::execution_gate` from
/// the plan's readiness.
#[allow(
dead_code,
reason = "0.152 stages execution-boundary contracts before physical runners consume them"
)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::schema) enum SchemaMutationExecutionGate {
/// Readiness was `PublishableNow`.
ReadyToPublish,
/// Readiness was `RequiresPhysicalRunner`; `step_count` is the number of
/// steps in the execution plan.
AwaitingPhysicalWork {
requirement: RebuildRequirement,
step_count: usize,
},
/// Readiness was `Unsupported`.
Rejected {
requirement: RebuildRequirement,
},
}
/// Execution-side view of a `SchemaRebuildPlan`: a readiness value plus the
/// ordered steps derived from the rebuild actions.
#[allow(
dead_code,
reason = "0.152 stages execution-boundary contracts before physical runners consume them"
)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::schema) struct SchemaMutationExecutionPlan {
/// Readiness derived from the rebuild requirement.
readiness: SchemaMutationExecutionReadiness,
/// Steps in rebuild-action order; plans that require a physical runner
/// additionally end with validation and runtime-state invalidation steps.
steps: Vec<SchemaMutationExecutionStep>,
}
#[allow(
dead_code,
reason = "0.152 stages execution-boundary contracts before physical runners consume them"
)]
impl SchemaMutationExecutionPlan {
const fn publishable_now() -> Self {
Self {
readiness: SchemaMutationExecutionReadiness::PublishableNow,
steps: Vec::new(),
}
}
fn from_rebuild_plan(rebuild_plan: SchemaRebuildPlan) -> Self {
if !rebuild_plan.requires_physical_work() {
return Self::publishable_now();
}
let readiness = match rebuild_plan.requirement() {
RebuildRequirement::NoRebuildRequired => {
SchemaMutationExecutionReadiness::PublishableNow
}
RebuildRequirement::IndexRebuildRequired => {
SchemaMutationExecutionReadiness::RequiresPhysicalRunner(
RebuildRequirement::IndexRebuildRequired,
)
}
RebuildRequirement::FullDataRewriteRequired | RebuildRequirement::Unsupported => {
SchemaMutationExecutionReadiness::Unsupported(rebuild_plan.requirement())
}
};
let mut steps = rebuild_plan
.actions()
.iter()
.map(|action| match action {
SchemaRebuildAction::BuildFieldPathIndex { target } => {
SchemaMutationExecutionStep::BuildFieldPathIndex {
target: target.clone(),
}
}
SchemaRebuildAction::BuildExpressionIndex { target } => {
SchemaMutationExecutionStep::BuildExpressionIndex {
target: target.clone(),
}
}
SchemaRebuildAction::DropSecondaryIndex { target } => {
SchemaMutationExecutionStep::DropSecondaryIndex {
target: target.clone(),
}
}
SchemaRebuildAction::RewriteAllRows => SchemaMutationExecutionStep::RewriteAllRows,
SchemaRebuildAction::Unsupported { reason } => {
SchemaMutationExecutionStep::Unsupported { reason }
}
})
.collect::<Vec<_>>();
if matches!(
readiness,
SchemaMutationExecutionReadiness::RequiresPhysicalRunner(_)
) {
steps.push(SchemaMutationExecutionStep::ValidatePhysicalWork);
steps.push(SchemaMutationExecutionStep::InvalidateRuntimeState);
}
Self { readiness, steps }
}
#[must_use]
pub(in crate::db::schema) const fn readiness(&self) -> SchemaMutationExecutionReadiness {
self.readiness
}
#[must_use]
pub(in crate::db::schema) const fn steps(&self) -> &[SchemaMutationExecutionStep] {
self.steps.as_slice()
}
#[must_use]
pub(in crate::db::schema) const fn execution_gate(&self) -> SchemaMutationExecutionGate {
match self.readiness {
SchemaMutationExecutionReadiness::PublishableNow => {
SchemaMutationExecutionGate::ReadyToPublish
}
SchemaMutationExecutionReadiness::RequiresPhysicalRunner(requirement) => {
SchemaMutationExecutionGate::AwaitingPhysicalWork {
requirement,
step_count: self.steps.len(),
}
}
SchemaMutationExecutionReadiness::Unsupported(requirement) => {
SchemaMutationExecutionGate::Rejected { requirement }
}
}
}
#[must_use]
const fn physical_requirement(&self) -> Option<RebuildRequirement> {
match self.execution_gate() {
SchemaMutationExecutionGate::ReadyToPublish => None,
SchemaMutationExecutionGate::AwaitingPhysicalWork { requirement, .. }
| SchemaMutationExecutionGate::Rejected { requirement } => Some(requirement),
}
}
#[must_use]
pub(in crate::db::schema) fn runner_capabilities(&self) -> Vec<SchemaMutationRunnerCapability> {
let mut capabilities = Vec::new();
for step in &self.steps {
let capability = match step {
SchemaMutationExecutionStep::BuildFieldPathIndex { .. } => {
Some(SchemaMutationRunnerCapability::BuildFieldPathIndex)
}
SchemaMutationExecutionStep::BuildExpressionIndex { .. } => {
Some(SchemaMutationRunnerCapability::BuildExpressionIndex)
}
SchemaMutationExecutionStep::DropSecondaryIndex { .. } => {
Some(SchemaMutationRunnerCapability::DropSecondaryIndex)
}
SchemaMutationExecutionStep::ValidatePhysicalWork => {
Some(SchemaMutationRunnerCapability::ValidatePhysicalWork)
}
SchemaMutationExecutionStep::InvalidateRuntimeState => {
Some(SchemaMutationRunnerCapability::InvalidateRuntimeState)
}
SchemaMutationExecutionStep::RewriteAllRows => {
Some(SchemaMutationRunnerCapability::RewriteAllRows)
}
SchemaMutationExecutionStep::Unsupported { .. } => None,
};
if let Some(capability) = capability {
push_runner_capability_once(&mut capabilities, capability);
}
}
capabilities
}
#[must_use]
pub(in crate::db::schema) fn admit_runner_capabilities(
&self,
available: &[SchemaMutationRunnerCapability],
) -> SchemaMutationExecutionAdmission {
match self.execution_gate() {
SchemaMutationExecutionGate::ReadyToPublish => {
SchemaMutationExecutionAdmission::PublishableNow
}
SchemaMutationExecutionGate::Rejected { requirement } => {
SchemaMutationExecutionAdmission::Rejected { requirement }
}
SchemaMutationExecutionGate::AwaitingPhysicalWork { .. } => {
let required = self.runner_capabilities();
let missing = required
.iter()
.copied()
.filter(|capability| !available.contains(capability))
.collect::<Vec<_>>();
if missing.is_empty() {
SchemaMutationExecutionAdmission::RunnerReady { required }
} else {
SchemaMutationExecutionAdmission::MissingRunnerCapabilities { missing }
}
}
}
}
#[must_use]
fn has_unsupported_supported_path_step(&self) -> bool {
self.steps.iter().any(|step| {
matches!(
step,
SchemaMutationExecutionStep::BuildExpressionIndex { .. }
| SchemaMutationExecutionStep::DropSecondaryIndex { .. }
| SchemaMutationExecutionStep::RewriteAllRows
| SchemaMutationExecutionStep::Unsupported { .. }
)
})
}
#[allow(
dead_code,
reason = "0.154 starts supported-path admission before reconciliation consumes it"
)]
pub(in crate::db::schema) fn supported_developer_execution_path(
&self,
) -> Result<SchemaMutationSupportedExecutionPath, SchemaMutationSupportedPathRejection> {
match self.readiness {
SchemaMutationExecutionReadiness::PublishableNow => {
return Err(SchemaMutationSupportedPathRejection::NoPhysicalWork);
}
SchemaMutationExecutionReadiness::RequiresPhysicalRunner(
RebuildRequirement::IndexRebuildRequired,
) => {}
SchemaMutationExecutionReadiness::Unsupported(requirement)
| SchemaMutationExecutionReadiness::RequiresPhysicalRunner(requirement) => {
return Err(
SchemaMutationSupportedPathRejection::UnsupportedRequirement(requirement),
);
}
}
let [
SchemaMutationExecutionStep::BuildFieldPathIndex { target },
SchemaMutationExecutionStep::ValidatePhysicalWork,
SchemaMutationExecutionStep::InvalidateRuntimeState,
] = self.steps.as_slice()
else {
return if self.has_unsupported_supported_path_step() {
Err(SchemaMutationSupportedPathRejection::UnsupportedMutationKind)
} else {
Err(SchemaMutationSupportedPathRejection::UnsupportedExecutionShape)
};
};
if target.key_paths().is_empty() {
return Err(SchemaMutationSupportedPathRejection::EmptyFieldPathKey);
}
Ok(SchemaMutationSupportedExecutionPath::new(target.clone()))
}
}
/// Reason a mutation plan cannot be published as-is.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::schema) enum MutationPublicationBlocker {
/// The plan's compatibility is not `MetadataOnlySafe`; carries the actual bucket.
NotMetadataSafe(MutationCompatibility),
/// The plan's rebuild plan requires physical work; carries the requirement.
RebuildRequired(RebuildRequirement),
}
/// Result of `MutationPlan::publication_status`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::schema) enum MutationPublicationStatus {
/// Neither the compatibility check nor the rebuild plan produced a blocker.
Publishable,
/// Publication is blocked for the contained reason.
Blocked(MutationPublicationBlocker),
}
/// Classification of the difference between two persisted schema snapshots,
/// as computed by `classify_schema_mutation_delta`. Borrowed data comes from
/// the `expected` snapshot.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::schema) enum SchemaMutationDelta<'a> {
/// `append_only_additive_fields` returned this field slice.
AppendOnlyFields(&'a [PersistedFieldSnapshot]),
/// A single added index that converts into a field-path index request.
AddFieldPathIndex(&'a PersistedIndexSnapshot),
/// A single added index that converts into an expression index request.
AddExpressionIndex(&'a PersistedIndexSnapshot),
/// The two snapshots compare equal.
ExactMatch,
/// None of the other classifications applied.
Incompatible,
}
/// Classifies how `expected` differs from `actual`.
///
/// Checks run in priority order and the first match wins:
/// 1. equal snapshots give `ExactMatch`;
/// 2. an append-only field addition gives `AppendOnlyFields`;
/// 3. a single added index that converts into a field-path index request
///    gives `AddFieldPathIndex`;
/// 4. a single added index that converts into an expression index request
///    gives `AddExpressionIndex`;
/// 5. anything else is `Incompatible`.
pub(in crate::db::schema) fn classify_schema_mutation_delta<'a>(
    actual: &PersistedSchemaSnapshot,
    expected: &'a PersistedSchemaSnapshot,
) -> SchemaMutationDelta<'a> {
    if actual == expected {
        return SchemaMutationDelta::ExactMatch;
    }
    if let Some(fields) = append_only_additive_fields(actual, expected) {
        return SchemaMutationDelta::AppendOnlyFields(fields);
    }
    // Look the added index up once and probe both index shapes against it,
    // field-path first so it keeps priority over the expression shape.
    if let Some(index) = single_added_index(actual, expected) {
        if SchemaMutationRequest::from_accepted_field_path_index(index).is_ok() {
            return SchemaMutationDelta::AddFieldPathIndex(index);
        }
        if SchemaMutationRequest::from_accepted_expression_index(index).is_ok() {
            return SchemaMutationDelta::AddExpressionIndex(index);
        }
    }
    SchemaMutationDelta::Incompatible
}
/// Classifies the delta between `actual` and `expected` and converts it into
/// the matching mutation request.
pub(in crate::db::schema) fn schema_mutation_request_for_snapshots<'a>(
    actual: &PersistedSchemaSnapshot,
    expected: &'a PersistedSchemaSnapshot,
) -> SchemaMutationRequest<'a> {
    let delta = classify_schema_mutation_delta(actual, expected);
    delta.into()
}
/// A lowered schema mutation request: the mutations to apply plus the
/// compatibility and rebuild buckets chosen by the constructor that built it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::schema) struct MutationPlan {
/// Mutations in request order; empty for exact-match and incompatible plans.
mutations: Vec<SchemaMutation>,
/// Compatibility bucket checked first by `publication_status`.
compatibility: MutationCompatibility,
/// Rebuild bucket that drives `rebuild_plan` and `execution_plan`.
rebuild: RebuildRequirement,
}
impl MutationPlan {
pub(in crate::db::schema) const fn exact_match() -> Self {
Self {
mutations: Vec::new(),
compatibility: MutationCompatibility::MetadataOnlySafe,
rebuild: RebuildRequirement::NoRebuildRequired,
}
}
pub(in crate::db::schema) fn append_only_fields(fields: &[PersistedFieldSnapshot]) -> Self {
let mutations = fields
.iter()
.map(|field| {
if field.default().is_none() {
SchemaMutation::AddNullableField {
field_id: field.id(),
name: field.name().to_string(),
slot: field.slot(),
}
} else {
SchemaMutation::AddDefaultedField {
field_id: field.id(),
name: field.name().to_string(),
slot: field.slot(),
}
}
})
.collect();
Self {
mutations,
compatibility: MutationCompatibility::MetadataOnlySafe,
rebuild: RebuildRequirement::NoRebuildRequired,
}
}
fn field_path_index_addition(target: SchemaFieldPathIndexRebuildTarget) -> Self {
Self {
mutations: vec![SchemaMutation::AddFieldPathIndex { target }],
compatibility: MutationCompatibility::RequiresRebuild,
rebuild: RebuildRequirement::IndexRebuildRequired,
}
}
fn expression_index_addition(target: SchemaExpressionIndexRebuildTarget) -> Self {
Self {
mutations: vec![SchemaMutation::AddExpressionIndex { target }],
compatibility: MutationCompatibility::RequiresRebuild,
rebuild: RebuildRequirement::IndexRebuildRequired,
}
}
fn secondary_index_drop(target: SchemaSecondaryIndexDropCleanupTarget) -> Self {
Self {
mutations: vec![SchemaMutation::DropNonRequiredSecondaryIndex { target }],
compatibility: MutationCompatibility::RequiresRebuild,
rebuild: RebuildRequirement::IndexRebuildRequired,
}
}
fn nullability_alteration(field_id: FieldId) -> Self {
Self {
mutations: vec![SchemaMutation::AlterNullability { field_id }],
compatibility: MutationCompatibility::UnsupportedPreOne,
rebuild: RebuildRequirement::Unsupported,
}
}
const fn incompatible() -> Self {
Self {
mutations: Vec::new(),
compatibility: MutationCompatibility::Incompatible,
rebuild: RebuildRequirement::FullDataRewriteRequired,
}
}
#[allow(
dead_code,
reason = "mutation diagnostics and DDL lowering will consume this in the next 0.152 slice"
)]
#[must_use]
pub(in crate::db::schema) const fn mutations(&self) -> &[SchemaMutation] {
self.mutations.as_slice()
}
#[allow(
dead_code,
reason = "mutation diagnostics and DDL lowering will consume this in the next 0.152 slice"
)]
#[must_use]
pub(in crate::db::schema) const fn compatibility(&self) -> MutationCompatibility {
self.compatibility
}
#[allow(
dead_code,
reason = "mutation diagnostics and DDL lowering will consume this in the next 0.152 slice"
)]
#[must_use]
pub(in crate::db::schema) const fn rebuild_requirement(&self) -> RebuildRequirement {
self.rebuild
}
#[must_use]
pub(in crate::db::schema) fn publication_status(&self) -> MutationPublicationStatus {
if !matches!(self.compatibility, MutationCompatibility::MetadataOnlySafe) {
return MutationPublicationStatus::Blocked(
MutationPublicationBlocker::NotMetadataSafe(self.compatibility),
);
}
if let Some(blocker) = self.rebuild_plan().publication_blocker() {
return MutationPublicationStatus::Blocked(blocker);
}
MutationPublicationStatus::Publishable
}
#[allow(
dead_code,
reason = "0.152 stages runner preflight publication checks before physical runners consume them"
)]
#[must_use]
pub(in crate::db::schema) fn publication_preflight(
&self,
runner: &SchemaMutationRunnerContract,
) -> MutationPublicationPreflight {
match runner.preflight(&self.execution_plan()) {
SchemaMutationRunnerPreflight::NoPhysicalWork => match self.publication_status() {
MutationPublicationStatus::Publishable => {
MutationPublicationPreflight::PublishableNow
}
MutationPublicationStatus::Blocked(blocker) => {
MutationPublicationPreflight::Blocked(blocker)
}
},
SchemaMutationRunnerPreflight::Ready {
step_count,
required,
} => MutationPublicationPreflight::PhysicalWorkReady {
step_count,
required,
},
SchemaMutationRunnerPreflight::MissingCapabilities { missing } => {
MutationPublicationPreflight::MissingRunnerCapabilities { missing }
}
SchemaMutationRunnerPreflight::Rejected { requirement } => {
MutationPublicationPreflight::Rejected { requirement }
}
}
}
#[must_use]
pub(in crate::db::schema) fn rebuild_plan(&self) -> SchemaRebuildPlan {
if matches!(self.rebuild, RebuildRequirement::NoRebuildRequired) {
return SchemaRebuildPlan::no_rebuild();
}
let mut actions = Vec::new();
for mutation in &self.mutations {
match mutation {
SchemaMutation::AddNullableField { .. }
| SchemaMutation::AddDefaultedField { .. } => {}
SchemaMutation::AddFieldPathIndex { target } => {
actions.push(SchemaRebuildAction::BuildFieldPathIndex {
target: target.clone(),
});
}
SchemaMutation::AddExpressionIndex { target } => {
actions.push(SchemaRebuildAction::BuildExpressionIndex {
target: target.clone(),
});
}
SchemaMutation::DropNonRequiredSecondaryIndex { target } => {
actions.push(SchemaRebuildAction::DropSecondaryIndex {
target: target.clone(),
});
}
SchemaMutation::AlterNullability { .. } => {
actions.push(SchemaRebuildAction::Unsupported {
reason: "alter nullability requires data proof or rewrite",
});
}
}
}
if actions.is_empty() {
actions.push(match self.rebuild {
RebuildRequirement::FullDataRewriteRequired => SchemaRebuildAction::RewriteAllRows,
RebuildRequirement::Unsupported => SchemaRebuildAction::Unsupported {
reason: "unsupported schema mutation",
},
RebuildRequirement::IndexRebuildRequired => SchemaRebuildAction::Unsupported {
reason: "index rebuild mutation lacks an index target",
},
RebuildRequirement::NoRebuildRequired => {
unreachable!("no-rebuild plans returned before rebuild action derivation",)
}
});
}
SchemaRebuildPlan::new(self.rebuild, actions)
}
#[allow(
dead_code,
reason = "0.152 stages execution-boundary contracts before physical runners consume them"
)]
#[must_use]
pub(in crate::db::schema) fn execution_plan(&self) -> SchemaMutationExecutionPlan {
SchemaMutationExecutionPlan::from_rebuild_plan(self.rebuild_plan())
}
#[allow(
dead_code,
reason = "0.154 starts supported-path admission before reconciliation consumes it"
)]
pub(in crate::db::schema) fn supported_developer_physical_path(
&self,
) -> Result<SchemaMutationSupportedExecutionPath, SchemaMutationSupportedPathRejection> {
let [SchemaMutation::AddFieldPathIndex { target }] = self.mutations.as_slice() else {
return match self.rebuild {
RebuildRequirement::NoRebuildRequired => {
Err(SchemaMutationSupportedPathRejection::NoPhysicalWork)
}
RebuildRequirement::IndexRebuildRequired => {
Err(SchemaMutationSupportedPathRejection::UnsupportedMutationKind)
}
RebuildRequirement::FullDataRewriteRequired | RebuildRequirement::Unsupported => {
Err(SchemaMutationSupportedPathRejection::UnsupportedRequirement(self.rebuild))
}
};
};
let supported = self.execution_plan().supported_developer_execution_path()?;
if supported.target() != target {
return Err(SchemaMutationSupportedPathRejection::UnsupportedExecutionShape);
}
Ok(supported)
}
#[cfg(test)]
pub(in crate::db::schema) fn added_field_count(&self) -> usize {
self.mutations
.iter()
.filter(|mutation| {
matches!(
mutation,
SchemaMutation::AddNullableField { .. }
| SchemaMutation::AddDefaultedField { .. }
)
})
.count()
}
#[allow(
dead_code,
reason = "0.152 stages mutation audit identity before diagnostics expose it"
)]
pub(in crate::db::schema) fn fingerprint(&self) -> [u8; 16] {
let mut hasher = new_hash_sha256_prefixed(SCHEMA_MUTATION_FINGERPRINT_PROFILE_TAG);
write_hash_tag_u8(&mut hasher, self.compatibility.tag());
write_hash_tag_u8(&mut hasher, self.rebuild.tag());
write_hash_u32(
&mut hasher,
u32::try_from(self.mutations.len()).unwrap_or(u32::MAX),
);
for mutation in &self.mutations {
mutation.hash_into(&mut hasher);
}
let digest = finalize_hash_sha256(hasher);
let mut fingerprint = [0u8; 16];
fingerprint.copy_from_slice(&digest[..16]);
fingerprint
}
}
impl SchemaMutationRequest<'_> {
    /// Consumes the request and lowers it to the matching `MutationPlan`.
    ///
    /// Each request variant maps one-to-one onto a `MutationPlan`
    /// constructor; any payload is forwarded unchanged.
    #[must_use]
    pub(in crate::db::schema) fn lower_to_plan(self) -> MutationPlan {
        match self {
            // Payload-free requests.
            Self::ExactMatch => MutationPlan::exact_match(),
            Self::Incompatible => MutationPlan::incompatible(),

            // Field-level requests.
            Self::AppendOnlyFields(added_fields) => MutationPlan::append_only_fields(added_fields),
            Self::AlterNullability { field_id: altered_field } => {
                MutationPlan::nullability_alteration(altered_field)
            }

            // Index-level requests.
            Self::AddFieldPathIndex { target: index_target } => {
                MutationPlan::field_path_index_addition(index_target)
            }
            Self::AddExpressionIndex { target: index_target } => {
                MutationPlan::expression_index_addition(index_target)
            }
            Self::DropNonRequiredSecondaryIndex { target: dropped_index } => {
                MutationPlan::secondary_index_drop(dropped_index)
            }
        }
    }
}
impl<'a> From<SchemaMutationDelta<'a>> for SchemaMutationRequest<'a> {
    /// Converts a detected schema delta into a mutation request.
    ///
    /// Index additions go through the `from_accepted_*_index` constructors;
    /// when such a constructor does not yield a request, the delta is
    /// treated as `Incompatible`.
    fn from(delta: SchemaMutationDelta<'a>) -> Self {
        match delta {
            SchemaMutationDelta::ExactMatch => Self::ExactMatch,
            SchemaMutationDelta::Incompatible => Self::Incompatible,
            SchemaMutationDelta::AppendOnlyFields(added_fields) => {
                Self::AppendOnlyFields(added_fields)
            }
            SchemaMutationDelta::AddFieldPathIndex(accepted_index) => {
                let lowered = Self::from_accepted_field_path_index(accepted_index);
                lowered.unwrap_or(Self::Incompatible)
            }
            SchemaMutationDelta::AddExpressionIndex(accepted_index) => {
                let lowered = Self::from_accepted_expression_index(accepted_index);
                lowered.unwrap_or(Self::Incompatible)
            }
        }
    }
}
impl SchemaMutation {
    /// Feeds this mutation into `hasher` as a one-byte variant tag followed
    /// by the variant's payload.
    ///
    /// Tags: 1 = `AddNullableField`, 2 = `AddDefaultedField`,
    /// 3 = `AddFieldPathIndex`, 4 = `AddExpressionIndex`,
    /// 5 = `DropNonRequiredSecondaryIndex`, 6 = `AlterNullability`.
    #[allow(
        dead_code,
        reason = "used by mutation fingerprint tests until audit identity is surfaced in diagnostics"
    )]
    fn hash_into(&self, hasher: &mut sha2::Sha256) {
        // Step 1: the variant discriminator always goes first.
        let variant_tag: u8 = match self {
            Self::AddNullableField { .. } => 1,
            Self::AddDefaultedField { .. } => 2,
            Self::AddFieldPathIndex { .. } => 3,
            Self::AddExpressionIndex { .. } => 4,
            Self::DropNonRequiredSecondaryIndex { .. } => 5,
            Self::AlterNullability { .. } => 6,
        };
        write_hash_tag_u8(hasher, variant_tag);

        // Step 2: the payload that follows the tag.
        match self {
            Self::AddNullableField {
                field_id,
                name,
                slot,
            }
            | Self::AddDefaultedField {
                field_id,
                name,
                slot,
            } => {
                hash_field_identity(hasher, *field_id, name, *slot);
            }
            Self::AddFieldPathIndex { target } => {
                target.hash_into(hasher);
            }
            Self::AddExpressionIndex { target } => {
                target.hash_into(hasher);
            }
            Self::DropNonRequiredSecondaryIndex { target } => {
                target.hash_into(hasher);
            }
            Self::AlterNullability { field_id } => {
                write_hash_u32(hasher, field_id.get());
            }
        }
    }
}
impl MutationCompatibility {
    /// Returns the one-byte tag that identifies this compatibility class in
    /// hashed output.
    ///
    /// Mapping: `MetadataOnlySafe` = 1, `RequiresRebuild` = 2,
    /// `UnsupportedPreOne` = 3, `Incompatible` = 4.
    #[allow(
        dead_code,
        reason = "used by mutation fingerprint tests until audit identity is surfaced in diagnostics"
    )]
    const fn tag(self) -> u8 {
        match self {
            Self::Incompatible => 4,
            Self::UnsupportedPreOne => 3,
            Self::RequiresRebuild => 2,
            Self::MetadataOnlySafe => 1,
        }
    }
}
impl RebuildRequirement {
    /// Returns the one-byte tag that identifies this rebuild requirement in
    /// hashed output.
    ///
    /// Mapping: `NoRebuildRequired` = 1, `IndexRebuildRequired` = 2,
    /// `FullDataRewriteRequired` = 3, `Unsupported` = 4.
    #[allow(
        dead_code,
        reason = "used by mutation fingerprint tests until audit identity is surfaced in diagnostics"
    )]
    const fn tag(self) -> u8 {
        match self {
            Self::Unsupported => 4,
            Self::FullDataRewriteRequired => 3,
            Self::IndexRebuildRequired => 2,
            Self::NoRebuildRequired => 1,
        }
    }
}
/// Hashes a field's identity into `hasher`: the field id, then the field
/// name, then the slot widened to `u32`.
#[allow(
    dead_code,
    reason = "used by mutation fingerprint tests until audit identity is surfaced in diagnostics"
)]
fn hash_field_identity(
    hasher: &mut sha2::Sha256,
    field_id: FieldId,
    name: &str,
    slot: SchemaFieldSlot,
) {
    let raw_field_id = field_id.get();
    let raw_slot = u32::from(slot.get());

    // The write order is part of the hashed encoding; keep id, name, slot.
    write_hash_u32(hasher, raw_field_id);
    write_hash_str_u32(hasher, name);
    write_hash_u32(hasher, raw_slot);
}
/// Hashes a boolean as a single tag byte (`false` = 0, `true` = 1).
#[allow(
    dead_code,
    reason = "used by mutation fingerprint tests until audit identity is surfaced in diagnostics"
)]
fn write_hash_bool(hasher: &mut sha2::Sha256, value: bool) {
    let flag_byte = u8::from(value);
    write_hash_tag_u8(hasher, flag_byte);
}
/// Appends `capability` to `capabilities` unless an equal entry is already
/// present, so the list never holds duplicates of it.
#[allow(
    dead_code,
    reason = "0.152 stages runner capability contracts before physical runners consume them"
)]
fn push_runner_capability_once(
    capabilities: &mut Vec<SchemaMutationRunnerCapability>,
    capability: SchemaMutationRunnerCapability,
) {
    // Already recorded: nothing to do.
    if capabilities.contains(&capability) {
        return;
    }
    capabilities.push(capability);
}
/// Computes the 16-byte runtime epoch fingerprint of a persisted schema
/// snapshot.
///
/// The hash input is, in order: the runtime-epoch profile tag, the entity
/// path, the snapshot version, the encoded snapshot length (saturating at
/// `u32::MAX`), and the encoded snapshot bytes. The result is the first 16
/// bytes of the finalized digest.
///
/// # Errors
///
/// Propagates the error from `encode_persisted_schema_snapshot` when the
/// snapshot cannot be encoded.
#[allow(
    dead_code,
    reason = "0.153 stages runtime epoch identity before physical runners publish snapshots"
)]
fn runtime_epoch_fingerprint(
    snapshot: &PersistedSchemaSnapshot,
) -> Result<[u8; 16], InternalError> {
    // Encode first so a failure returns before any hashing state is built.
    let snapshot_bytes = encode_persisted_schema_snapshot(snapshot)?;
    let snapshot_len = u32::try_from(snapshot_bytes.len()).unwrap_or(u32::MAX);

    let mut state = new_hash_sha256_prefixed(SCHEMA_MUTATION_RUNTIME_EPOCH_PROFILE_TAG);
    write_hash_str_u32(&mut state, snapshot.entity_path());
    write_hash_u32(&mut state, snapshot.version().get());
    write_hash_u32(&mut state, snapshot_len);
    state.update(snapshot_bytes);

    // Keep only the leading 16 bytes of the digest.
    let full_digest = finalize_hash_sha256(state);
    let mut truncated = [0u8; 16];
    truncated.copy_from_slice(&full_digest[..16]);
    Ok(truncated)
}
/// Returns the fields that `expected` appends after the fields of `actual`.
///
/// Yields `Some(tail)` only when `expected` has strictly more fields and
/// strictly more field-to-slot entries than `actual`, and every field and
/// every field-to-slot entry of `actual` equals the entry at the same
/// position in `expected`. The returned slice is the part of
/// `expected.fields()` past `actual`'s field count. Otherwise returns `None`.
fn append_only_additive_fields<'a>(
    actual: &PersistedSchemaSnapshot,
    expected: &'a PersistedSchemaSnapshot,
) -> Option<&'a [PersistedFieldSnapshot]> {
    let actual_fields = actual.fields();
    let expected_fields = expected.fields();
    let actual_layout = actual.row_layout();
    let expected_layout = expected.row_layout();

    // Both the field list and the slot mapping must strictly grow.
    let strictly_grows = actual_fields.len() < expected_fields.len()
        && actual_layout.field_to_slot().len() < expected_layout.field_to_slot().len();
    if !strictly_grows {
        return None;
    }

    // Every existing field must be untouched.
    let field_changed = actual_fields
        .iter()
        .zip(expected_fields)
        .any(|(before, after)| before != after);
    if field_changed {
        return None;
    }

    // Every existing field-to-slot entry must be untouched as well.
    let slot_changed = actual_layout
        .field_to_slot()
        .iter()
        .zip(expected_layout.field_to_slot())
        .any(|(before, after)| before != after);
    if slot_changed {
        return None;
    }

    let appended_from = actual_fields.len();
    Some(&expected_fields[appended_from..])
}
/// Returns the index that `expected` adds on top of `actual`, if the two
/// snapshots differ by exactly one trailing index.
///
/// Yields `Some(index)` only when entity path, entity name, primary key
/// field ids, field-to-slot layout and fields are all equal, `expected` has
/// exactly one more index than `actual`, and every index of `actual` equals
/// the index at the same position in `expected`. The result is the last
/// index of `expected`. Otherwise returns `None`.
fn single_added_index<'a>(
    actual: &PersistedSchemaSnapshot,
    expected: &'a PersistedSchemaSnapshot,
) -> Option<&'a PersistedIndexSnapshot> {
    // Everything other than the index list must be identical.
    let same_shape = actual.entity_path() == expected.entity_path()
        && actual.entity_name() == expected.entity_name()
        && actual.primary_key_field_ids() == expected.primary_key_field_ids()
        && actual.row_layout().field_to_slot() == expected.row_layout().field_to_slot()
        && actual.fields() == expected.fields();
    if !same_shape {
        return None;
    }

    let actual_indexes = actual.indexes();
    let expected_indexes = expected.indexes();

    // Exactly one index may have been added.
    if expected_indexes.len() != actual_indexes.len().saturating_add(1) {
        return None;
    }

    // The existing indexes must form an unchanged prefix.
    let prefix_unchanged = actual_indexes
        .iter()
        .zip(expected_indexes)
        .all(|(before, after)| before == after);

    if prefix_unchanged {
        expected_indexes.last()
    } else {
        None
    }
}
// Test-only submodule: declared out of line and compiled only under `cfg(test)`.
#[cfg(test)]
mod tests;