use std::fs;
use std::path::PathBuf;
use chrono::{Duration, Utc};
use clap::{Args, Subcommand};
use cortex_core::{
compose_policy_outcomes, AuthorityClass, ClaimCeiling, ClaimProofState, PolicyContribution,
PolicyOutcome, RuntimeMode,
};
use cortex_runtime::{runtime_claim_preflight_with_policy, RuntimeClaimKind};
use cortex_verifier::{
ceiling_from_state, verify_with_policy, EvidenceInput as VerifierEvidenceInput,
EvidenceKind as VerifierEvidenceKind, IndependentWitness, VerifiedTrustState,
};
use serde::Deserialize;
use serde_json::json;
use crate::cmd::from_store::{self, FromStoreArgs, Surface};
use crate::exit::Exit;
use crate::output::{self, Envelope};
const COMPLIANCE_FORBIDDEN_USES: &[&str] = &[
"compliance_evidence_artifact",
"trusted_stdout_artifact",
"trusted_artifact_emission",
"release_readiness_artifact",
"cross_system_trust_decision",
"external_reporting",
];
const DEFAULT_MAX_WITNESS_AGE_SECONDS: u64 = 86_400;
#[derive(Debug, Subcommand)]
pub enum ComplianceSub {
Evidence(ComplianceEvidenceArgs),
}
#[derive(Debug, Args)]
pub struct ComplianceEvidenceArgs {
#[arg(long, value_name = "PATH", conflicts_with = "evidence_json")]
pub evidence_file: Option<PathBuf>,
#[arg(long, value_name = "JSON", conflicts_with = "evidence_file")]
pub evidence_json: Option<String>,
#[arg(long, value_name = "HEX")]
pub evidence_blake3: Option<String>,
#[arg(long = "witness-file", value_name = "PATH", action = clap::ArgAction::Append)]
pub witness_file: Vec<PathBuf>,
#[arg(long = "witness-json", value_name = "JSON", action = clap::ArgAction::Append)]
pub witness_json: Vec<String>,
#[arg(long = "max-witness-age-seconds", value_name = "N", default_value_t = DEFAULT_MAX_WITNESS_AGE_SECONDS)]
pub max_witness_age_seconds: u64,
#[command(flatten)]
pub from_store: FromStoreArgs,
}
pub fn run(sub: ComplianceSub) -> Exit {
match sub {
ComplianceSub::Evidence(args) => run_evidence(args),
}
}
#[derive(Debug, Deserialize)]
struct ComplianceEvidenceInput {
evidence_kind: String,
runtime_mode: RuntimeMode,
authority_class: AuthorityClass,
proof_state: ClaimProofState,
claim_ceiling: ClaimCeiling,
policy_outcome: PolicyOutcome,
source_refs: Vec<String>,
}
struct ComplianceEvidenceEvaluation {
contributions: Vec<PolicyContribution>,
evidence_report: serde_json::Value,
runtime_mode: RuntimeMode,
authority_class: AuthorityClass,
proof_state: ClaimProofState,
requested_ceiling: ClaimCeiling,
verifier_state: Option<VerifiedTrustState>,
}
fn run_evidence(args: ComplianceEvidenceArgs) -> Exit {
if args.from_store.from_store {
return run_evidence_from_store(args);
}
let evaluation = match evaluate_compliance_evidence(args) {
Ok(evaluation) => evaluation,
Err(err) => {
eprintln!("cortex compliance evidence: failed to build policy contribution: {err:?}");
return Exit::Internal;
}
};
let policy = compose_policy_outcomes(evaluation.contributions, None);
let preflight = runtime_claim_preflight_with_policy(
"compliance evidence artifact",
RuntimeClaimKind::ComplianceEvidence,
evaluation.runtime_mode,
evaluation.authority_class,
evaluation.proof_state,
evaluation.requested_ceiling,
&policy,
);
let trusted_artifact_emitted = evaluation
.verifier_state
.as_ref()
.is_some_and(VerifiedTrustState::is_full_chain_verified)
&& preflight.allowed;
let independent_verification = trusted_artifact_emitted;
let verifier_report = verifier_report_value(&evaluation.verifier_state);
let report = json!({
"command": "compliance.evidence",
"artifact_emitted": trusted_artifact_emitted,
"trusted_artifact_emitted": trusted_artifact_emitted,
"trusted_stdout_artifact": trusted_artifact_emitted,
"independent_verification": independent_verification,
"forbidden_uses": COMPLIANCE_FORBIDDEN_USES,
"runtime_mode": preflight.claim.runtime_mode,
"proof_state": preflight.claim.proof_state,
"claim_ceiling": preflight.claim.effective_ceiling,
"required_ceiling": preflight.claim.required_ceiling,
"claim_allowed": preflight.allowed,
"preflight_reason": preflight.reason,
"downgrade_reasons": preflight.claim.reasons,
"policy_outcome": policy.final_outcome,
"policy_contributing": policy.contributing,
"policy_discarded": policy.discarded,
"evidence_input": evaluation.evidence_report,
"verifier": verifier_report,
});
let exit = Exit::PreconditionUnmet;
if output::json_enabled() {
let policy_summary = serde_json::json!({
"final_outcome": policy.final_outcome,
"contributing": policy.contributing,
"discarded": policy.discarded,
});
let envelope = Envelope::new("cortex.compliance.evidence", exit, report)
.with_policy_outcome(policy_summary);
return output::emit(&envelope, exit);
}
match serde_json::to_string_pretty(&report) {
Ok(serialized) => eprintln!("{serialized}"),
Err(err) => {
eprintln!("cortex compliance evidence: failed to serialize preflight report: {err}");
return Exit::Internal;
}
}
if trusted_artifact_emitted {
Exit::Ok
} else {
Exit::PreconditionUnmet
}
}
fn run_evidence_from_store(args: ComplianceEvidenceArgs) -> Exit {
let assembly = match from_store::assemble(&args.from_store, Surface::Compliance) {
Ok(a) => a,
Err(exit) => return exit,
};
from_store::emit_report(
&assembly,
"compliance.evidence.from_store",
"cortex.compliance.evidence",
COMPLIANCE_FORBIDDEN_USES,
)
}
fn evaluate_compliance_evidence(
args: ComplianceEvidenceArgs,
) -> Result<ComplianceEvidenceEvaluation, cortex_core::PolicyError> {
let raw = match read_compliance_evidence_input(&args) {
Ok(Some(raw)) => raw,
Ok(None) => {
return Ok(ComplianceEvidenceEvaluation {
contributions: vec![PolicyContribution::new(
"compliance.external_authority",
PolicyOutcome::Reject,
"external anchoring and authority-grade proof are unavailable; compliance evidence cannot be claimed",
)?],
evidence_report: json!({
"present": false,
"accepted": false,
"accepted_as": "none",
"trusted_evidence_input_present": false,
"verification_state": "missing_input",
"failed_rule": "compliance.evidence_input.missing",
"missing_evidence": ["compliance.evidence_input"],
"independent_verification": false,
"trusted_artifact_emitted": false,
"forbidden_uses": COMPLIANCE_FORBIDDEN_USES,
"reason": "missing compliance evidence input",
}),
runtime_mode: RuntimeMode::LocalUnsigned,
authority_class: AuthorityClass::Observed,
proof_state: ClaimProofState::Partial,
requested_ceiling: ClaimCeiling::AuthorityGrade,
verifier_state: None,
});
}
Err(err) => {
return Ok(ComplianceEvidenceEvaluation {
contributions: vec![PolicyContribution::new(
"compliance.evidence_input_read",
PolicyOutcome::Reject,
format!("compliance evidence input could not be read: {err}"),
)?],
evidence_report: json!({
"present": true,
"accepted": false,
"accepted_as": "none",
"trusted_evidence_input_present": false,
"verification_state": "read_failed",
"failed_rule": "compliance.evidence_input.read_failed",
"missing_evidence": ["compliance.evidence_input_readable"],
"independent_verification": false,
"trusted_artifact_emitted": false,
"forbidden_uses": COMPLIANCE_FORBIDDEN_USES,
"reason": "compliance evidence input could not be read",
}),
runtime_mode: RuntimeMode::Unknown,
authority_class: AuthorityClass::Untrusted,
proof_state: ClaimProofState::Unknown,
requested_ceiling: ClaimCeiling::AuthorityGrade,
verifier_state: None,
});
}
};
let input_text = match std::str::from_utf8(&raw) {
Ok(text) => text,
Err(err) => {
return Ok(ComplianceEvidenceEvaluation {
contributions: vec![PolicyContribution::new(
"compliance.evidence_input_parse",
PolicyOutcome::Reject,
format!("compliance evidence input is not UTF-8 JSON: {err}"),
)?],
evidence_report: json!({
"present": true,
"accepted": false,
"accepted_as": "none",
"trusted_evidence_input_present": false,
"verification_state": "parse_failed",
"failed_rule": "compliance.evidence_input.invalid_json",
"missing_evidence": ["compliance.evidence_input_valid_authority_json"],
"independent_verification": false,
"trusted_artifact_emitted": false,
"forbidden_uses": COMPLIANCE_FORBIDDEN_USES,
"reason": "invalid compliance evidence JSON",
}),
runtime_mode: RuntimeMode::Unknown,
authority_class: AuthorityClass::Untrusted,
proof_state: ClaimProofState::Unknown,
requested_ceiling: ClaimCeiling::AuthorityGrade,
verifier_state: None,
});
}
};
let input = match serde_json::from_str::<ComplianceEvidenceInput>(input_text) {
Ok(input) => input,
Err(err) => {
return Ok(ComplianceEvidenceEvaluation {
contributions: vec![PolicyContribution::new(
"compliance.evidence_input_parse",
PolicyOutcome::Reject,
format!("compliance evidence input is not valid authority JSON: {err}"),
)?],
evidence_report: json!({
"present": true,
"accepted": false,
"accepted_as": "none",
"trusted_evidence_input_present": false,
"verification_state": "parse_failed",
"failed_rule": "compliance.evidence_input.invalid_json",
"missing_evidence": ["compliance.evidence_input_valid_authority_json"],
"independent_verification": false,
"trusted_artifact_emitted": false,
"forbidden_uses": COMPLIANCE_FORBIDDEN_USES,
"reason": "invalid compliance evidence JSON",
}),
runtime_mode: RuntimeMode::Unknown,
authority_class: AuthorityClass::Untrusted,
proof_state: ClaimProofState::Unknown,
requested_ceiling: ClaimCeiling::AuthorityGrade,
verifier_state: None,
});
}
};
if compliance_evidence_is_advisory_only(&input) {
return Ok(ComplianceEvidenceEvaluation {
contributions: vec![PolicyContribution::new(
"compliance.external_authority",
PolicyOutcome::Reject,
"compliance evidence input is advisory-only and cannot support compliance evidence",
)?],
evidence_report: json!({
"present": true,
"accepted": false,
"accepted_as": "none",
"trusted_evidence_input_present": false,
"verification_state": "advisory_only",
"failed_rule": "compliance.evidence_input.advisory_only",
"missing_evidence": ["compliance.authority_grade_external_authority_evidence"],
"independent_verification": false,
"trusted_artifact_emitted": false,
"forbidden_uses": COMPLIANCE_FORBIDDEN_USES,
"reason": "compliance evidence input is advisory-only and cannot support a trusted compliance artifact",
"evidence_kind": input.evidence_kind,
"runtime_mode": input.runtime_mode,
"authority_class": input.authority_class,
"proof_state": input.proof_state,
"claim_ceiling": input.claim_ceiling,
"input_policy_outcome": input.policy_outcome,
"source_ref_count": input.source_refs.len(),
"advisory_only": true,
}),
runtime_mode: input.runtime_mode,
authority_class: input.authority_class,
proof_state: input.proof_state,
requested_ceiling: input.claim_ceiling,
verifier_state: None,
});
}
let actual_blake3 = blake3_hex(&raw);
let digest_verified = args
.evidence_blake3
.as_deref()
.map(|expected| expected == actual_blake3)
.unwrap_or(false);
let source_refs_present = !input.source_refs.is_empty();
let source_refs_valid = input
.source_refs
.iter()
.all(|source_ref| compliance_source_ref_has_authority_shape(source_ref));
let authority_grade_shape = input.evidence_kind == "external_authority"
&& source_refs_valid
&& source_refs_present
&& digest_verified
&& input.runtime_mode == RuntimeMode::AuthorityGrade
&& input.authority_class == AuthorityClass::Operator
&& input.proof_state == ClaimProofState::FullChainVerified
&& input.claim_ceiling == ClaimCeiling::AuthorityGrade
&& input.policy_outcome == PolicyOutcome::Allow;
if !authority_grade_shape {
let failed_rule = compliance_evidence_failed_rule(
&args,
&input,
digest_verified,
source_refs_present,
source_refs_valid,
);
let missing_evidence = compliance_missing_evidence_for_failed_rule(failed_rule);
return Ok(ComplianceEvidenceEvaluation {
contributions: vec![PolicyContribution::new(
"compliance.external_authority",
PolicyOutcome::Reject,
"compliance evidence input is missing authority-grade external authority proof",
)?],
evidence_report: json!({
"present": true,
"accepted": false,
"accepted_as": "none",
"trusted_evidence_input_present": false,
"verification_state": "rejected",
"failed_rule": failed_rule,
"missing_evidence": missing_evidence,
"independent_verification": false,
"trusted_artifact_emitted": false,
"forbidden_uses": COMPLIANCE_FORBIDDEN_USES,
"reason": "compliance evidence input is not authority-grade external authority evidence",
"evidence_kind": input.evidence_kind,
"input_policy_outcome": input.policy_outcome,
"source_ref_count": input.source_refs.len(),
"source_refs_present": source_refs_present,
"source_refs_valid": source_refs_valid,
"digest_verified": digest_verified,
}),
runtime_mode: input.runtime_mode,
authority_class: input.authority_class,
proof_state: input.proof_state,
requested_ceiling: input.claim_ceiling,
verifier_state: None,
});
}
let witnesses = match load_witnesses(&args) {
Ok(witnesses) => witnesses,
Err(detail) => {
return Ok(ComplianceEvidenceEvaluation {
contributions: vec![PolicyContribution::new(
"compliance.evidence_input_verification",
PolicyOutcome::Reject,
format!("witness load failed: {detail}"),
)?],
evidence_report: json!({
"present": true,
"accepted": false,
"accepted_as": "none",
"trusted_evidence_input_present": false,
"verification_state": "rejected",
"failed_rule": "compliance.evidence_input.witness_load_failed",
"missing_evidence": ["compliance.independent_witnesses"],
"independent_verification": false,
"trusted_artifact_emitted": false,
"forbidden_uses": COMPLIANCE_FORBIDDEN_USES,
"reason": "compliance witness input could not be loaded",
"evidence_kind": input.evidence_kind,
"source_ref_count": input.source_refs.len(),
"evidence_blake3": actual_blake3,
"witness_load_error": detail,
}),
runtime_mode: input.runtime_mode,
authority_class: input.authority_class,
proof_state: input.proof_state,
requested_ceiling: input.claim_ceiling,
verifier_state: None,
});
}
};
let verifier_input = VerifierEvidenceInput {
kind: VerifierEvidenceKind::ComplianceEvidence,
evidence_blake3: actual_blake3.clone(),
runtime_mode: input.runtime_mode,
authority_class: input.authority_class,
proof_state: input.proof_state,
requested_ceiling: input.claim_ceiling,
source_refs: input.source_refs.clone(),
advisory_only: false,
};
let max_age = Duration::seconds(args.max_witness_age_seconds as i64);
let verifier_state = verify_with_policy(&verifier_input, &witnesses, Utc::now(), max_age, None);
let (verification_state, failed_rule, accepted, missing_evidence, reason): (
&str,
&str,
bool,
Vec<&str>,
String,
) = match &verifier_state {
VerifiedTrustState::FullChainVerified { .. } => (
"full_chain_verified",
"compliance.evidence_input.full_chain_verified",
true,
vec![],
"compliance evidence input passed independent verification".into(),
),
VerifiedTrustState::Partial { reasons, .. } => (
"partial",
"compliance.evidence_input.independent_verification_partial",
false,
vec![
"compliance.external_authority_independent_verification",
"compliance.trusted_artifact_emission",
],
format!(
"compliance evidence input parsed, but independent verification is partial: {}",
reasons.join("; ")
),
),
VerifiedTrustState::Broken { edge, .. } => (
"broken",
"compliance.evidence_input.independent_verification_broken",
false,
vec![
"compliance.external_authority_independent_verification",
"compliance.trusted_artifact_emission",
],
format!(
"compliance evidence input parsed, but independent verification broke at {}: {}",
edge.invariant, edge.detail
),
),
};
let trusted_evidence_input_present =
matches!(verifier_state, VerifiedTrustState::FullChainVerified { .. });
let composer_outcome = if trusted_evidence_input_present {
PolicyOutcome::Allow
} else {
PolicyOutcome::Reject
};
let composer_reason = if trusted_evidence_input_present {
"compliance evidence input passed independent verification".to_string()
} else {
reason.clone()
};
Ok(ComplianceEvidenceEvaluation {
contributions: vec![PolicyContribution::new(
"compliance.evidence_input_verification",
composer_outcome,
composer_reason,
)?],
evidence_report: json!({
"present": true,
"accepted": accepted,
"accepted_as": if accepted { "trusted_external_authority_evidence" } else { "none" },
"trusted_evidence_input_present": trusted_evidence_input_present,
"verification_state": verification_state,
"failed_rule": failed_rule,
"missing_evidence": missing_evidence,
"independent_verification": trusted_evidence_input_present,
"trusted_artifact_emitted": trusted_evidence_input_present,
"forbidden_uses": COMPLIANCE_FORBIDDEN_USES,
"reason": reason,
"evidence_kind": input.evidence_kind,
"source_ref_count": input.source_refs.len(),
"source_refs_present": true,
"source_refs_valid": true,
"digest_verified": true,
"evidence_blake3": actual_blake3,
}),
runtime_mode: input.runtime_mode,
authority_class: input.authority_class,
proof_state: input.proof_state,
requested_ceiling: input.claim_ceiling,
verifier_state: Some(verifier_state),
})
}
fn verifier_report_value(state: &Option<VerifiedTrustState>) -> serde_json::Value {
let Some(state) = state else {
return json!({ "invoked": false });
};
let ceiling = ceiling_from_state(state);
let (witnesses, reasons, edge) = match state {
VerifiedTrustState::FullChainVerified { witnesses, .. } => {
(witnesses.clone(), Vec::new(), None)
}
VerifiedTrustState::Partial { reasons, witnesses } => {
(witnesses.clone(), reasons.clone(), None)
}
VerifiedTrustState::Broken { edge, witnesses } => {
(witnesses.clone(), Vec::new(), Some(edge.clone()))
}
};
json!({
"invoked": true,
"state": state.wire_str(),
"ceiling": ceiling,
"reasons": reasons,
"broken_edge": edge,
"witnesses": witnesses,
})
}
fn load_witnesses(args: &ComplianceEvidenceArgs) -> Result<Vec<IndependentWitness>, String> {
let mut witnesses: Vec<IndependentWitness> = Vec::new();
for path in &args.witness_file {
let bytes = fs::read(path).map_err(|err| {
format!(
"witness file {path} failed to read: {err}",
path = path.display()
)
})?;
let witness: IndependentWitness = serde_json::from_slice(&bytes).map_err(|err| {
format!(
"witness file {path} is not valid IndependentWitness JSON: {err}",
path = path.display()
)
})?;
witnesses.push(witness);
}
for inline in &args.witness_json {
let witness: IndependentWitness = serde_json::from_str(inline)
.map_err(|err| format!("--witness-json is not valid IndependentWitness JSON: {err}"))?;
witnesses.push(witness);
}
Ok(witnesses)
}
fn compliance_missing_evidence_for_failed_rule(failed_rule: &str) -> Vec<&'static str> {
match failed_rule {
"compliance.evidence_input.missing_digest" => vec!["compliance.evidence_input_blake3"],
"compliance.evidence_input.digest_mismatch" => {
vec!["compliance.evidence_input_matching_blake3"]
}
"compliance.evidence_input.missing_source_refs" => {
vec!["compliance.evidence_input_source_refs"]
}
"compliance.evidence_input.malformed_source_refs" => {
vec!["compliance.authority_source_refs"]
}
"compliance.evidence_input.wrong_evidence_kind" => {
vec!["compliance.external_authority_evidence_kind"]
}
_ => vec!["compliance.authority_grade_external_authority_evidence"],
}
}
fn compliance_evidence_is_advisory_only(input: &ComplianceEvidenceInput) -> bool {
matches!(
input.runtime_mode,
RuntimeMode::Dev | RuntimeMode::LocalUnsigned
) || matches!(
input.evidence_kind.as_str(),
"cortex_pre_v2_backup"
| "pre_v2_backup"
| "run_ledger_drill"
| "development_ledger"
| "local_development"
)
}
fn compliance_evidence_failed_rule(
args: &ComplianceEvidenceArgs,
input: &ComplianceEvidenceInput,
digest_verified: bool,
source_refs_present: bool,
source_refs_valid: bool,
) -> &'static str {
if args.evidence_blake3.is_none() {
"compliance.evidence_input.missing_digest"
} else if !digest_verified {
"compliance.evidence_input.digest_mismatch"
} else if !source_refs_present {
"compliance.evidence_input.missing_source_refs"
} else if !source_refs_valid {
"compliance.evidence_input.malformed_source_refs"
} else if input.evidence_kind != "external_authority" {
"compliance.evidence_input.wrong_evidence_kind"
} else {
"compliance.evidence_input.not_authority_grade"
}
}
fn read_compliance_evidence_input(
args: &ComplianceEvidenceArgs,
) -> Result<Option<Vec<u8>>, std::io::Error> {
match (&args.evidence_file, &args.evidence_json) {
(Some(path), None) => fs::read(path).map(Some),
(None, Some(json)) => Ok(Some(json.as_bytes().to_vec())),
(None, None) => Ok(None),
_ => unreachable!("clap prevents multiple compliance evidence inputs"),
}
}
fn compliance_source_ref_has_authority_shape(source_ref: &str) -> bool {
let Some((scheme, rest)) = source_ref.split_once("://") else {
return false;
};
matches!(scheme, "signed" | "https")
&& !rest.trim().is_empty()
&& rest == rest.trim()
&& !rest.chars().any(char::is_whitespace)
}
fn blake3_hex(bytes: &[u8]) -> String {
blake3::hash(bytes).to_hex().to_string()
}