use super::{
ArtifactSourceV1, DEPLOYMENT_TRUTH_SCHEMA_VERSION, DeploymentPlanV1, PromotionArtifactLevelV1,
PromotionPlanTransformEvidenceV1, PromotionPlanTransformV1, PromotionReadinessStatusV1,
PromotionReadinessV1, RoleArtifactSourceKindV1, RoleArtifactSourceV1, RoleArtifactV1,
RolePromotionInputV1, RolePromotionPlanTransformV1, RolePromotionReadinessV1, SafetyFindingV1,
SafetySeverityV1,
};
use thiserror::Error as ThisError;
#[derive(Debug, ThisError)]
pub enum PromotionArtifactSourceError {
#[error("promotion artifact source is missing required field: {field}")]
MissingRequiredField { field: &'static str },
#[error("promotion artifact source field {field} must be a lowercase sha256 hex digest")]
InvalidSha256Digest { field: &'static str },
#[error("promotion artifact source kind {kind:?} requires a digest pin")]
MissingDigestPin { kind: RoleArtifactSourceKindV1 },
#[error("promotion artifact source kind {kind:?} cannot carry previous receipt kind")]
UnexpectedPreviousReceiptKind { kind: RoleArtifactSourceKindV1 },
#[error(
"promotion artifact source kind PreviousReceiptArtifact requires an eligible receipt kind"
)]
MissingPreviousReceiptKind,
}
#[derive(Debug, ThisError)]
pub enum PromotionReadinessError {
#[error("promotion readiness schema mismatch: expected {expected}, found {found}")]
SchemaVersionMismatch { expected: u32, found: u32 },
#[error("promotion readiness is missing required field: {field}")]
MissingRequiredField { field: &'static str },
#[error("promotion readiness status {status:?} does not match blocker count {blocker_count}")]
StatusBlockerMismatch {
status: PromotionReadinessStatusV1,
blocker_count: usize,
},
#[error("promotion readiness contains duplicate role: {role}")]
DuplicateRole { role: String },
#[error("promotion readiness role {role} has inconsistent restage state")]
RestageStateMismatch { role: String },
#[error("promotion readiness finding in {field} has severity {severity:?}")]
FindingSeverityMismatch {
field: &'static str,
severity: SafetySeverityV1,
},
#[error("promotion readiness field {field} must be a lowercase sha256 hex digest")]
InvalidSha256Digest { field: &'static str },
}
#[derive(Debug, ThisError)]
pub enum PromotionPlanTransformError {
#[error("promotion plan transform schema mismatch: expected {expected}, found {found}")]
SchemaVersionMismatch { expected: u32, found: u32 },
#[error("promotion plan transform is missing required field: {field}")]
MissingRequiredField { field: &'static str },
#[error("promotion readiness validation failed: {0}")]
Readiness(#[from] PromotionReadinessError),
#[error("promotion readiness is blocked with {blocker_count} blocker(s)")]
ReadinessBlocked { blocker_count: usize },
#[error("promotion target plan is missing role: {role}")]
TargetRoleMissing { role: String },
#[error("promotion transform contains duplicate role: {role}")]
DuplicateRole { role: String },
#[error("promotion transform promoted plan id mismatch: expected {expected}, found {found}")]
PromotedPlanIdMismatch { expected: String, found: String },
#[error("promotion transform role {role} is missing from promoted plan")]
PromotedRoleMissing { role: String },
#[error("promotion transform role {role} has inconsistent field {field}")]
RoleStateMismatch { role: String, field: &'static str },
}
#[derive(Debug, ThisError)]
pub enum PromotionPlanTransformEvidenceError {
#[error(
"promotion plan transform evidence schema mismatch: expected {expected}, found {found}"
)]
SchemaVersionMismatch { expected: u32, found: u32 },
#[error("promotion plan transform evidence is missing required field: {field}")]
MissingRequiredField { field: &'static str },
#[error("promotion plan transform evidence has invalid transform: {0}")]
Transform(#[from] PromotionPlanTransformError),
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PromotionReadinessRequest {
pub readiness_id: String,
pub target_plan: DeploymentPlanV1,
pub inputs: Vec<RolePromotionInputV1>,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PromotionPlanTransformRequest {
pub promoted_plan_id: String,
pub target_plan: DeploymentPlanV1,
pub inputs: Vec<RolePromotionInputV1>,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct PromotionPlanTransformEvidenceRequest {
pub evidence_id: String,
pub generated_at: String,
pub transform: PromotionPlanTransformV1,
}
pub fn promoted_deployment_plan_from_inputs(
request: &PromotionPlanTransformRequest,
) -> Result<DeploymentPlanV1, PromotionPlanTransformError> {
Ok(promoted_deployment_plan_transform_from_inputs(request)?.promoted_plan)
}
pub fn promoted_deployment_plan_transform_from_inputs(
request: &PromotionPlanTransformRequest,
) -> Result<PromotionPlanTransformV1, PromotionPlanTransformError> {
ensure_transform_field("promoted_plan_id", &request.promoted_plan_id)?;
let readiness = promotion_readiness_from_inputs(
&request.promoted_plan_id,
&request.target_plan,
&request.inputs,
);
validate_promotion_readiness(&readiness)?;
if readiness.status == PromotionReadinessStatusV1::Blocked {
return Err(PromotionPlanTransformError::ReadinessBlocked {
blocker_count: readiness.blockers.len(),
});
}
let mut promoted_plan = request.target_plan.clone();
promoted_plan.plan_id.clone_from(&request.promoted_plan_id);
for input in &request.inputs {
let Some(role_artifact) = promoted_plan
.role_artifacts
.iter_mut()
.find(|artifact| artifact.role == input.role)
else {
return Err(PromotionPlanTransformError::TargetRoleMissing {
role: input.role.clone(),
});
};
apply_promotion_input_to_role_artifact(role_artifact, input);
}
let transform =
promotion_plan_transform_from_parts(&request.target_plan, promoted_plan, &request.inputs);
validate_promotion_plan_transform(&transform)?;
Ok(transform)
}
pub fn check_promotion_readiness(
request: &PromotionReadinessRequest,
) -> Result<PromotionReadinessV1, PromotionReadinessError> {
ensure_readiness_field("readiness_id", &request.readiness_id)?;
let readiness = promotion_readiness_from_inputs(
&request.readiness_id,
&request.target_plan,
&request.inputs,
);
validate_promotion_readiness(&readiness)?;
Ok(readiness)
}
pub fn promotion_plan_transform_evidence(
request: PromotionPlanTransformEvidenceRequest,
) -> Result<PromotionPlanTransformEvidenceV1, PromotionPlanTransformEvidenceError> {
ensure_evidence_field("evidence_id", &request.evidence_id)?;
ensure_evidence_field("generated_at", &request.generated_at)?;
validate_promotion_plan_transform(&request.transform)?;
let evidence = PromotionPlanTransformEvidenceV1 {
schema_version: DEPLOYMENT_TRUTH_SCHEMA_VERSION,
evidence_id: request.evidence_id,
generated_at: request.generated_at,
transform: request.transform,
};
validate_promotion_plan_transform_evidence(&evidence)?;
Ok(evidence)
}
pub fn validate_promotion_plan_transform_evidence(
evidence: &PromotionPlanTransformEvidenceV1,
) -> Result<(), PromotionPlanTransformEvidenceError> {
if evidence.schema_version != DEPLOYMENT_TRUTH_SCHEMA_VERSION {
return Err(PromotionPlanTransformEvidenceError::SchemaVersionMismatch {
expected: DEPLOYMENT_TRUTH_SCHEMA_VERSION,
found: evidence.schema_version,
});
}
ensure_evidence_field("evidence_id", &evidence.evidence_id)?;
ensure_evidence_field("generated_at", &evidence.generated_at)?;
validate_promotion_plan_transform(&evidence.transform)?;
Ok(())
}
pub fn validate_promotion_plan_transform(
transform: &PromotionPlanTransformV1,
) -> Result<(), PromotionPlanTransformError> {
if transform.schema_version != DEPLOYMENT_TRUTH_SCHEMA_VERSION {
return Err(PromotionPlanTransformError::SchemaVersionMismatch {
expected: DEPLOYMENT_TRUTH_SCHEMA_VERSION,
found: transform.schema_version,
});
}
ensure_transform_field("transform_id", &transform.transform_id)?;
ensure_transform_field("target_plan_id", &transform.target_plan_id)?;
ensure_transform_field("promoted_plan_id", &transform.promoted_plan_id)?;
ensure_transform_field("promoted_plan.plan_id", &transform.promoted_plan.plan_id)?;
if transform.promoted_plan.plan_id != transform.promoted_plan_id {
return Err(PromotionPlanTransformError::PromotedPlanIdMismatch {
expected: transform.promoted_plan_id.clone(),
found: transform.promoted_plan.plan_id.clone(),
});
}
ensure_unique_transform_roles(&transform.roles)?;
for role in &transform.roles {
validate_role_plan_transform(role, &transform.promoted_plan)?;
}
Ok(())
}
#[must_use]
pub fn promotion_readiness_from_inputs(
readiness_id: impl Into<String>,
target_plan: &DeploymentPlanV1,
inputs: &[RolePromotionInputV1],
) -> PromotionReadinessV1 {
let mut roles = Vec::with_capacity(inputs.len());
let mut blockers = Vec::new();
let mut warnings = Vec::new();
for input in inputs {
let target_artifact = target_plan
.role_artifacts
.iter()
.find(|artifact| artifact.role == input.role);
let Some(target_artifact) = target_artifact else {
blockers.push(promotion_finding(
"promotion_target_role_missing",
format!("target plan does not contain role {}", input.role),
SafetySeverityV1::HardFailure,
&input.role,
));
continue;
};
let role_readiness = role_promotion_readiness(input, target_artifact);
collect_role_findings(input, &role_readiness, &mut blockers, &mut warnings);
roles.push(role_readiness);
}
let status = if blockers.is_empty() {
PromotionReadinessStatusV1::Ready
} else {
PromotionReadinessStatusV1::Blocked
};
PromotionReadinessV1 {
schema_version: DEPLOYMENT_TRUTH_SCHEMA_VERSION,
readiness_id: readiness_id.into(),
target_plan_id: target_plan.plan_id.clone(),
status,
roles,
blockers,
warnings,
}
}
pub fn validate_promotion_readiness(
readiness: &PromotionReadinessV1,
) -> Result<(), PromotionReadinessError> {
if readiness.schema_version != DEPLOYMENT_TRUTH_SCHEMA_VERSION {
return Err(PromotionReadinessError::SchemaVersionMismatch {
expected: DEPLOYMENT_TRUTH_SCHEMA_VERSION,
found: readiness.schema_version,
});
}
ensure_readiness_field("readiness_id", &readiness.readiness_id)?;
ensure_readiness_field("target_plan_id", &readiness.target_plan_id)?;
ensure_readiness_status_matches_blockers(readiness)?;
ensure_unique_readiness_roles(&readiness.roles)?;
for role in &readiness.roles {
validate_role_readiness(role)?;
}
validate_readiness_findings(
"blockers",
&readiness.blockers,
SafetySeverityV1::HardFailure,
)?;
validate_readiness_findings("warnings", &readiness.warnings, SafetySeverityV1::Warning)?;
Ok(())
}
pub fn validate_role_artifact_source(
source: &RoleArtifactSourceV1,
) -> Result<(), PromotionArtifactSourceError> {
ensure_field("role", &source.role)?;
ensure_locator_requirement(source)?;
ensure_previous_receipt_requirement(source)?;
ensure_digest_requirement(source)?;
ensure_optional_sha256(
"expected_wasm_sha256",
source.expected_wasm_sha256.as_deref(),
)?;
ensure_optional_sha256(
"expected_wasm_gz_sha256",
source.expected_wasm_gz_sha256.as_deref(),
)?;
ensure_optional_sha256(
"expected_candid_sha256",
source.expected_candid_sha256.as_deref(),
)?;
ensure_optional_sha256(
"expected_canonical_embedded_config_sha256",
source.expected_canonical_embedded_config_sha256.as_deref(),
)?;
Ok(())
}
fn apply_promotion_input_to_role_artifact(
role_artifact: &mut RoleArtifactV1,
input: &RolePromotionInputV1,
) {
match input.promotion_level {
PromotionArtifactLevelV1::SealedWasm => {
role_artifact.source = artifact_source_for_promotion_source(input.source.kind);
apply_promotion_source_locator(role_artifact, &input.source);
role_artifact
.wasm_sha256
.clone_from(&input.source.expected_wasm_sha256);
role_artifact
.wasm_gz_sha256
.clone_from(&input.source.expected_wasm_gz_sha256);
role_artifact
.candid_sha256
.clone_from(&input.source.expected_candid_sha256);
role_artifact
.canonical_embedded_config_sha256
.clone_from(&input.source.expected_canonical_embedded_config_sha256);
}
PromotionArtifactLevelV1::SourceBuild => {}
}
}
const fn artifact_source_for_promotion_source(kind: RoleArtifactSourceKindV1) -> ArtifactSourceV1 {
match kind {
RoleArtifactSourceKindV1::WorkspacePackage => ArtifactSourceV1::LocalBuild,
RoleArtifactSourceKindV1::CanonicalWasmStoreDefault => ArtifactSourceV1::WasmStore,
RoleArtifactSourceKindV1::PublishedPackage
| RoleArtifactSourceKindV1::LocalWasm
| RoleArtifactSourceKindV1::LocalWasmGz
| RoleArtifactSourceKindV1::PreviousReceiptArtifact => ArtifactSourceV1::External,
}
}
fn apply_promotion_source_locator(
role_artifact: &mut RoleArtifactV1,
source: &RoleArtifactSourceV1,
) {
match source.kind {
RoleArtifactSourceKindV1::LocalWasm => {
role_artifact.wasm_path.clone_from(&source.locator);
}
RoleArtifactSourceKindV1::LocalWasmGz => {
role_artifact.wasm_gz_path.clone_from(&source.locator);
}
_ => {}
}
}
fn promotion_plan_transform_from_parts(
target_plan: &DeploymentPlanV1,
promoted_plan: DeploymentPlanV1,
inputs: &[RolePromotionInputV1],
) -> PromotionPlanTransformV1 {
let roles = inputs
.iter()
.filter_map(|input| {
let before = target_plan
.role_artifacts
.iter()
.find(|artifact| artifact.role == input.role)?;
let after = promoted_plan
.role_artifacts
.iter()
.find(|artifact| artifact.role == input.role)?;
Some(role_plan_transform(input, before, after))
})
.collect();
PromotionPlanTransformV1 {
schema_version: DEPLOYMENT_TRUTH_SCHEMA_VERSION,
transform_id: format!("promotion-transform:{}", promoted_plan.plan_id),
target_plan_id: target_plan.plan_id.clone(),
promoted_plan_id: promoted_plan.plan_id.clone(),
promoted_plan,
roles,
}
}
fn role_plan_transform(
input: &RolePromotionInputV1,
before: &RoleArtifactV1,
after: &RoleArtifactV1,
) -> RolePromotionPlanTransformV1 {
RolePromotionPlanTransformV1 {
role: input.role.clone(),
promotion_level: input.promotion_level,
source_kind: input.source.kind,
source_locator: input.source.locator.clone(),
artifact_source_before: before.source,
artifact_source_after: after.source,
wasm_sha256_before: before.wasm_sha256.clone(),
wasm_sha256_after: after.wasm_sha256.clone(),
wasm_gz_sha256_before: before.wasm_gz_sha256.clone(),
wasm_gz_sha256_after: after.wasm_gz_sha256.clone(),
candid_sha256_before: before.candid_sha256.clone(),
candid_sha256_after: after.candid_sha256.clone(),
canonical_embedded_config_sha256_before: before.canonical_embedded_config_sha256.clone(),
canonical_embedded_config_sha256_after: after.canonical_embedded_config_sha256.clone(),
artifact_identity_changed: artifact_identity_changed(before, after),
embedded_config_changed: before.canonical_embedded_config_sha256
!= after.canonical_embedded_config_sha256,
target_materialization_preserved: input.promotion_level
== PromotionArtifactLevelV1::SourceBuild
&& role_materialization_identity_matches(before, after),
}
}
fn artifact_identity_changed(before: &RoleArtifactV1, after: &RoleArtifactV1) -> bool {
before.source != after.source
|| before.wasm_path != after.wasm_path
|| before.wasm_gz_path != after.wasm_gz_path
|| before.wasm_sha256 != after.wasm_sha256
|| before.wasm_gz_sha256 != after.wasm_gz_sha256
|| before.candid_path != after.candid_path
|| before.candid_sha256 != after.candid_sha256
}
fn role_materialization_identity_matches(before: &RoleArtifactV1, after: &RoleArtifactV1) -> bool {
before.source == after.source
&& before.wasm_path == after.wasm_path
&& before.wasm_gz_path == after.wasm_gz_path
&& before.wasm_sha256 == after.wasm_sha256
&& before.wasm_gz_sha256 == after.wasm_gz_sha256
&& before.candid_path == after.candid_path
&& before.candid_sha256 == after.candid_sha256
&& before.canonical_embedded_config_sha256 == after.canonical_embedded_config_sha256
}
fn validate_role_plan_transform(
role: &RolePromotionPlanTransformV1,
promoted_plan: &DeploymentPlanV1,
) -> Result<(), PromotionPlanTransformError> {
ensure_transform_field("role", &role.role)?;
let Some(promoted_role) = promoted_plan
.role_artifacts
.iter()
.find(|artifact| artifact.role == role.role)
else {
return Err(PromotionPlanTransformError::PromotedRoleMissing {
role: role.role.clone(),
});
};
ensure_role_matches_promoted_artifact(role, promoted_role)?;
ensure_role_transform_flags_are_consistent(role)?;
Ok(())
}
fn ensure_role_matches_promoted_artifact(
role: &RolePromotionPlanTransformV1,
promoted_role: &RoleArtifactV1,
) -> Result<(), PromotionPlanTransformError> {
ensure_role_field_matches(
role,
"artifact_source_after",
role.artifact_source_after == promoted_role.source,
)?;
ensure_role_field_matches(
role,
"wasm_sha256_after",
role.wasm_sha256_after == promoted_role.wasm_sha256,
)?;
ensure_role_field_matches(
role,
"wasm_gz_sha256_after",
role.wasm_gz_sha256_after == promoted_role.wasm_gz_sha256,
)?;
ensure_role_field_matches(
role,
"candid_sha256_after",
role.candid_sha256_after == promoted_role.candid_sha256,
)?;
ensure_role_field_matches(
role,
"canonical_embedded_config_sha256_after",
role.canonical_embedded_config_sha256_after
== promoted_role.canonical_embedded_config_sha256,
)
}
fn ensure_role_transform_flags_are_consistent(
role: &RolePromotionPlanTransformV1,
) -> Result<(), PromotionPlanTransformError> {
ensure_role_field_matches(
role,
"artifact_identity_changed",
role.artifact_identity_changed == role_summary_artifact_identity_changed(role),
)?;
ensure_role_field_matches(
role,
"embedded_config_changed",
role.embedded_config_changed
== (role.canonical_embedded_config_sha256_before
!= role.canonical_embedded_config_sha256_after),
)?;
if role.target_materialization_preserved {
ensure_role_field_matches(
role,
"target_materialization_preserved",
role.promotion_level == PromotionArtifactLevelV1::SourceBuild
&& !role.artifact_identity_changed
&& !role.embedded_config_changed,
)?;
}
Ok(())
}
fn role_summary_artifact_identity_changed(role: &RolePromotionPlanTransformV1) -> bool {
role.artifact_source_before != role.artifact_source_after
|| role.wasm_sha256_before != role.wasm_sha256_after
|| role.wasm_gz_sha256_before != role.wasm_gz_sha256_after
|| role.candid_sha256_before != role.candid_sha256_after
}
fn ensure_role_field_matches(
role: &RolePromotionPlanTransformV1,
field: &'static str,
matches: bool,
) -> Result<(), PromotionPlanTransformError> {
if matches {
Ok(())
} else {
Err(PromotionPlanTransformError::RoleStateMismatch {
role: role.role.clone(),
field,
})
}
}
fn validate_role_readiness(role: &RolePromotionReadinessV1) -> Result<(), PromotionReadinessError> {
ensure_readiness_field("role", &role.role)?;
ensure_readiness_optional_sha256("source_wasm_sha256", role.source_wasm_sha256.as_deref())?;
ensure_readiness_optional_sha256(
"source_wasm_gz_sha256",
role.source_wasm_gz_sha256.as_deref(),
)?;
ensure_readiness_optional_sha256("target_wasm_sha256", role.target_wasm_sha256.as_deref())?;
ensure_readiness_optional_sha256(
"target_wasm_gz_sha256",
role.target_wasm_gz_sha256.as_deref(),
)?;
ensure_readiness_optional_sha256(
"source_canonical_embedded_config_sha256",
role.source_canonical_embedded_config_sha256.as_deref(),
)?;
ensure_readiness_optional_sha256(
"target_canonical_embedded_config_sha256",
role.target_canonical_embedded_config_sha256.as_deref(),
)?;
if role.restage_required != (role.target_store_has_artifact == Some(false)) {
return Err(PromotionReadinessError::RestageStateMismatch {
role: role.role.clone(),
});
}
Ok(())
}
fn role_promotion_readiness(
input: &RolePromotionInputV1,
target_artifact: &RoleArtifactV1,
) -> RolePromotionReadinessV1 {
let source_wasm_sha256 = input.source.expected_wasm_sha256.clone();
let source_wasm_gz_sha256 = input.source.expected_wasm_gz_sha256.clone();
let target_wasm_sha256 = target_artifact.wasm_sha256.clone();
let target_wasm_gz_sha256 = target_artifact.wasm_gz_sha256.clone();
let byte_identical_wasm =
matching_optional_digest(source_wasm_sha256.as_ref(), target_wasm_sha256.as_ref()).or_else(
|| {
matching_optional_digest(
source_wasm_gz_sha256.as_ref(),
target_wasm_gz_sha256.as_ref(),
)
},
);
let embedded_config_identical = matching_optional_digest(
input
.source
.expected_canonical_embedded_config_sha256
.as_ref(),
target_artifact.canonical_embedded_config_sha256.as_ref(),
);
RolePromotionReadinessV1 {
role: input.role.clone(),
promotion_level: input.promotion_level,
source_kind: input.source.kind,
source_locator: input.source.locator.clone(),
source_wasm_sha256,
source_wasm_gz_sha256,
target_wasm_sha256,
target_wasm_gz_sha256,
source_canonical_embedded_config_sha256: input
.source
.expected_canonical_embedded_config_sha256
.clone(),
target_canonical_embedded_config_sha256: target_artifact
.canonical_embedded_config_sha256
.clone(),
byte_identical_wasm,
embedded_config_identical,
target_store_has_artifact: input.target_store_has_artifact,
restage_required: input.target_store_has_artifact == Some(false),
}
}
fn collect_role_findings(
input: &RolePromotionInputV1,
readiness: &RolePromotionReadinessV1,
blockers: &mut Vec<SafetyFindingV1>,
warnings: &mut Vec<SafetyFindingV1>,
) {
if let Err(err) = validate_role_artifact_source(&input.source) {
blockers.push(promotion_finding(
"promotion_artifact_source_invalid",
err.to_string(),
SafetySeverityV1::HardFailure,
&input.role,
));
}
if input.role != input.source.role {
blockers.push(promotion_finding(
"promotion_source_role_mismatch",
format!(
"promotion input role {} does not match artifact source role {}",
input.role, input.source.role
),
SafetySeverityV1::HardFailure,
&input.role,
));
}
if input.require_byte_identical_wasm && readiness.byte_identical_wasm != Some(true) {
blockers.push(promotion_finding(
"promotion_wasm_digest_mismatch",
"promotion requires byte-identical wasm but source and target digests differ or are incomplete",
SafetySeverityV1::HardFailure,
&input.role,
));
}
if input.require_target_embedded_config
&& readiness
.target_canonical_embedded_config_sha256
.as_deref()
.is_none_or(str::is_empty)
{
blockers.push(promotion_finding(
"promotion_target_embedded_config_missing",
"promotion requires target canonical embedded config but target plan has no digest",
SafetySeverityV1::HardFailure,
&input.role,
));
}
if input.promotion_level == PromotionArtifactLevelV1::SealedWasm
&& readiness.embedded_config_identical != Some(true)
{
blockers.push(promotion_finding(
"promotion_sealed_wasm_embedded_config_mismatch",
"sealed wasm promotion requires embedded config identity to be acceptable for the target",
SafetySeverityV1::HardFailure,
&input.role,
));
}
if readiness.restage_required {
warnings.push(promotion_finding(
"promotion_target_store_restage_required",
"target artifact store does not already contain the artifact; restaging is required",
SafetySeverityV1::Warning,
&input.role,
));
}
}
fn matching_optional_digest(left: Option<&String>, right: Option<&String>) -> Option<bool> {
match (left.map(String::as_str), right.map(String::as_str)) {
(Some(left), Some(right)) => Some(left == right),
_ => None,
}
}
fn promotion_finding(
code: impl Into<String>,
message: impl Into<String>,
severity: SafetySeverityV1,
role: &str,
) -> SafetyFindingV1 {
SafetyFindingV1 {
code: code.into(),
message: message.into(),
severity,
subject: Some(role.to_string()),
}
}
fn ensure_locator_requirement(
source: &RoleArtifactSourceV1,
) -> Result<(), PromotionArtifactSourceError> {
match source.kind {
RoleArtifactSourceKindV1::CanonicalWasmStoreDefault => Ok(()),
_ => ensure_option_field("locator", source.locator.as_deref()),
}
}
const fn ensure_previous_receipt_requirement(
source: &RoleArtifactSourceV1,
) -> Result<(), PromotionArtifactSourceError> {
match (source.kind, source.previous_receipt_kind) {
(RoleArtifactSourceKindV1::PreviousReceiptArtifact, Some(_)) => Ok(()),
(RoleArtifactSourceKindV1::PreviousReceiptArtifact, None) => {
Err(PromotionArtifactSourceError::MissingPreviousReceiptKind)
}
(_, Some(_)) => {
Err(PromotionArtifactSourceError::UnexpectedPreviousReceiptKind { kind: source.kind })
}
(_, None) => Ok(()),
}
}
const fn ensure_digest_requirement(
source: &RoleArtifactSourceV1,
) -> Result<(), PromotionArtifactSourceError> {
let has_digest =
source.expected_wasm_sha256.is_some() || source.expected_wasm_gz_sha256.is_some();
match source.kind {
RoleArtifactSourceKindV1::LocalWasm if source.expected_wasm_sha256.is_none() => {
Err(PromotionArtifactSourceError::MissingDigestPin { kind: source.kind })
}
RoleArtifactSourceKindV1::LocalWasmGz if source.expected_wasm_gz_sha256.is_none() => {
Err(PromotionArtifactSourceError::MissingDigestPin { kind: source.kind })
}
RoleArtifactSourceKindV1::PublishedPackage
| RoleArtifactSourceKindV1::PreviousReceiptArtifact
if !has_digest =>
{
Err(PromotionArtifactSourceError::MissingDigestPin { kind: source.kind })
}
_ => Ok(()),
}
}
fn ensure_option_field(
field: &'static str,
value: Option<&str>,
) -> Result<(), PromotionArtifactSourceError> {
match value {
Some(value) => ensure_field(field, value),
None => Err(PromotionArtifactSourceError::MissingRequiredField { field }),
}
}
fn ensure_field(field: &'static str, value: &str) -> Result<(), PromotionArtifactSourceError> {
if value.trim().is_empty() {
return Err(PromotionArtifactSourceError::MissingRequiredField { field });
}
Ok(())
}
fn ensure_optional_sha256(
field: &'static str,
value: Option<&str>,
) -> Result<(), PromotionArtifactSourceError> {
let Some(value) = value else {
return Ok(());
};
if is_lower_hex_sha256(value) {
Ok(())
} else {
Err(PromotionArtifactSourceError::InvalidSha256Digest { field })
}
}
fn is_lower_hex_sha256(value: &str) -> bool {
value.len() == 64
&& value
.bytes()
.all(|byte| byte.is_ascii_hexdigit() && !byte.is_ascii_uppercase())
}
const fn ensure_readiness_status_matches_blockers(
readiness: &PromotionReadinessV1,
) -> Result<(), PromotionReadinessError> {
match (readiness.status, readiness.blockers.is_empty()) {
(PromotionReadinessStatusV1::Ready, false)
| (PromotionReadinessStatusV1::Blocked, true) => {
Err(PromotionReadinessError::StatusBlockerMismatch {
status: readiness.status,
blocker_count: readiness.blockers.len(),
})
}
_ => Ok(()),
}
}
fn ensure_unique_readiness_roles(
roles: &[RolePromotionReadinessV1],
) -> Result<(), PromotionReadinessError> {
let mut seen = std::collections::BTreeSet::new();
for role in roles {
if !seen.insert(role.role.as_str()) {
return Err(PromotionReadinessError::DuplicateRole {
role: role.role.clone(),
});
}
}
Ok(())
}
fn ensure_unique_transform_roles(
roles: &[RolePromotionPlanTransformV1],
) -> Result<(), PromotionPlanTransformError> {
let mut seen = std::collections::BTreeSet::new();
for role in roles {
if !seen.insert(role.role.as_str()) {
return Err(PromotionPlanTransformError::DuplicateRole {
role: role.role.clone(),
});
}
}
Ok(())
}
fn validate_readiness_findings(
field: &'static str,
findings: &[SafetyFindingV1],
expected_severity: SafetySeverityV1,
) -> Result<(), PromotionReadinessError> {
for finding in findings {
ensure_readiness_field("finding.code", &finding.code)?;
ensure_readiness_field("finding.message", &finding.message)?;
if finding.severity != expected_severity {
return Err(PromotionReadinessError::FindingSeverityMismatch {
field,
severity: finding.severity,
});
}
}
Ok(())
}
fn ensure_readiness_field(field: &'static str, value: &str) -> Result<(), PromotionReadinessError> {
if value.trim().is_empty() {
return Err(PromotionReadinessError::MissingRequiredField { field });
}
Ok(())
}
fn ensure_readiness_optional_sha256(
field: &'static str,
value: Option<&str>,
) -> Result<(), PromotionReadinessError> {
let Some(value) = value else {
return Ok(());
};
if is_lower_hex_sha256(value) {
Ok(())
} else {
Err(PromotionReadinessError::InvalidSha256Digest { field })
}
}
fn ensure_transform_field(
field: &'static str,
value: &str,
) -> Result<(), PromotionPlanTransformError> {
if value.trim().is_empty() {
return Err(PromotionPlanTransformError::MissingRequiredField { field });
}
Ok(())
}
fn ensure_evidence_field(
field: &'static str,
value: &str,
) -> Result<(), PromotionPlanTransformEvidenceError> {
if value.trim().is_empty() {
return Err(PromotionPlanTransformEvidenceError::MissingRequiredField { field });
}
Ok(())
}