use pqrascv_core::{
config::PolicyConfig,
crypto::{pub_key_id, CryptoBackend, MlDsaBackend, SIGNING_CONTEXT_QUOTE},
error::PqRascvError,
nonce::NonceLedger,
pki::revocation::VerifiedRevocationList,
pki::{
validate_chain, validate_chain_with_store, validate_hardware_identity, CertChain,
DeviceCertificate, TrustAnchor, TrustStore,
},
policy::{PolicyContext, PolicyEngineV2},
provenance_v2::{ExternalProvenanceBundle, SigstoreConfig, VerifiedProvenance},
quote::{AttestationQuote, Challenge, PROTOCOL_VERSION},
};
use subtle::ConstantTimeEq;
/// Successful outcome of a key-based verification (`Verifier::verify_cbor` and the
/// other methods in this file that return `VerificationResult`).
#[derive(Debug)]
pub struct VerificationResult {
/// The decoded attestation quote that passed verification.
pub quote: AttestationQuote,
}
impl VerificationResult {
    /// Returns the SLSA level reported by the provenance embedded in the quote body.
    #[must_use]
    pub fn slsa_level(&self) -> u8 {
        let provenance = &self.quote.body.provenance;
        provenance.slsa_level()
    }

    /// Returns the firmware hash recorded in the quote's measurements.
    #[must_use]
    pub fn firmware_hash(&self) -> &[u8; 32] {
        let measurements = &self.quote.body.measurements;
        &measurements.firmware_hash
    }

    /// Returns the nonce carried in the quote body.
    #[must_use]
    pub fn nonce(&self) -> &[u8; 32] {
        let body = &self.quote.body;
        &body.nonce
    }
}
/// Successful outcome of a certificate-chain-based verification
/// (`Verifier::verify_cbor_with_pki` / `Verifier::verify_cbor_with_trust_store`).
#[derive(Debug)]
pub struct PkiVerificationResult {
/// The decoded attestation quote that passed verification.
pub quote: AttestationQuote,
/// The validated certificate chain whose device certificate key verified the quote.
pub cert_chain: CertChain,
}
impl PkiVerificationResult {
    /// Returns the SLSA level reported by the provenance embedded in the quote body.
    #[must_use]
    pub fn slsa_level(&self) -> u8 {
        let provenance = &self.quote.body.provenance;
        provenance.slsa_level()
    }

    /// Returns the firmware hash recorded in the quote's measurements.
    #[must_use]
    pub fn firmware_hash(&self) -> &[u8; 32] {
        let measurements = &self.quote.body.measurements;
        &measurements.firmware_hash
    }

    /// Returns the nonce carried in the quote body.
    #[must_use]
    pub fn nonce(&self) -> &[u8; 32] {
        let body = &self.quote.body;
        &body.nonce
    }

    /// Returns the serial of the device certificate in the validated chain.
    #[must_use]
    pub fn device_serial(&self) -> &str {
        let leaf = &self.cert_chain.device_cert;
        &leaf.serial
    }

    /// Returns the `ca_id` of the trust anchor in the validated chain.
    #[must_use]
    pub fn trust_anchor_id(&self) -> &str {
        let anchor = &self.cert_chain.trust_anchor;
        &anchor.ca_id
    }

    /// Returns the fingerprint of the trust anchor in the validated chain.
    #[must_use]
    pub fn trust_anchor_fingerprint(&self) -> &[u8; 32] {
        let anchor = &self.cert_chain.trust_anchor;
        &anchor.fingerprint
    }

    /// Returns the trust anchor's `not_after` value.
    #[must_use]
    pub fn trust_anchor_valid_until(&self) -> u64 {
        let anchor = &self.cert_chain.trust_anchor;
        anchor.not_after
    }
}
/// Verifies attestation quotes against a `PolicyConfig` and a `PolicyEngineV2`.
pub struct Verifier {
/// Evaluated against the quote's SLSA level, firmware hash, event counter and timestamp.
policy: PolicyConfig,
/// Evaluated against a `PolicyContext` built from the verified quote.
engine: PolicyEngineV2,
}
impl Verifier {
#[must_use]
pub fn new(policy: PolicyConfig) -> Self {
Self {
policy,
engine: PolicyEngineV2::new(vec![]),
}
}
#[must_use]
pub fn with_engine(policy: PolicyConfig, engine: PolicyEngineV2) -> Self {
Self { policy, engine }
}
pub fn verify_cbor(
&self,
cbor: &[u8],
verifying_key: &[u8],
expected_nonce: &[u8; 32],
now_secs: u64,
) -> Result<VerificationResult, PqRascvError> {
let quote = AttestationQuote::from_cbor(cbor)?;
self.verify_quote("e, verifying_key, expected_nonce, now_secs)?;
Ok(VerificationResult { quote })
}
pub fn verify_with_challenge(
&self,
cbor: &[u8],
verifying_key: &[u8],
challenge: &Challenge,
now_secs: u64,
) -> Result<VerificationResult, PqRascvError> {
self.verify_cbor(cbor, verifying_key, &challenge.nonce, now_secs)
}
pub fn verify_cbor_consuming<L: NonceLedger>(
&self,
cbor: &[u8],
verifying_key: &[u8],
expected_nonce: &[u8; 32],
now_secs: u64,
ledger: &mut L,
) -> Result<VerificationResult, PqRascvError> {
let result = self.verify_cbor(cbor, verifying_key, expected_nonce, now_secs)?;
if let Err(e) = ledger.consume(expected_nonce) {
tracing::warn!(error = %e, "nonce ledger rejected consume (replay or unregistered nonce)");
return Err(e);
}
Ok(result)
}
pub fn verify_with_challenge_consuming<L: NonceLedger>(
&self,
cbor: &[u8],
verifying_key: &[u8],
challenge: &Challenge,
now_secs: u64,
ledger: &mut L,
) -> Result<VerificationResult, PqRascvError> {
self.verify_cbor_consuming(cbor, verifying_key, &challenge.nonce, now_secs, ledger)
}
#[allow(clippy::too_many_arguments)]
pub fn verify_cbor_with_pki(
&self,
cbor: &[u8],
device_cert: DeviceCertificate,
intermediates: Vec<DeviceCertificate>,
trust_anchor: &TrustAnchor,
crl: Option<&VerifiedRevocationList<'_>>,
expected_nonce: &[u8; 32],
now_secs: u64,
) -> Result<PkiVerificationResult, PqRascvError> {
let chain = validate_chain(device_cert, intermediates, trust_anchor, now_secs)?;
if let Some(crl) = crl {
if crl.is_revoked(&chain.device_cert.serial) {
return Err(PqRascvError::CertificateRevoked);
}
}
let quote = AttestationQuote::from_cbor(cbor)?;
self.verify_signature_only("e, &chain.device_cert.subject_key, expected_nonce)?;
validate_hardware_identity(
&chain.device_cert.hardware_identity,
"e.body.measurements,
)?;
self.policy.evaluate(
quote.body.provenance.slsa_level(),
"e.body.measurements.firmware_hash,
quote.body.measurements.event_counter,
quote.body.timestamp,
now_secs,
)?;
let ctx = PolicyContext::from_verified_quote("e, Some(&chain), None, now_secs, None);
self.engine.evaluate(&ctx)?;
Ok(PkiVerificationResult {
quote,
cert_chain: chain,
})
}
#[allow(clippy::too_many_arguments)]
pub fn verify_cbor_with_trust_store(
&self,
cbor: &[u8],
device_cert: DeviceCertificate,
intermediates: Vec<DeviceCertificate>,
trust_store: &TrustStore,
crl: Option<&VerifiedRevocationList<'_>>,
expected_nonce: &[u8; 32],
now_secs: u64,
) -> Result<PkiVerificationResult, PqRascvError> {
let chain = validate_chain_with_store(&device_cert, &intermediates, trust_store, now_secs)?;
if let Some(crl) = crl {
if crl.is_revoked(&chain.device_cert.serial) {
return Err(PqRascvError::CertificateRevoked);
}
}
let quote = AttestationQuote::from_cbor(cbor)?;
self.verify_signature_only("e, &chain.device_cert.subject_key, expected_nonce)?;
validate_hardware_identity(
&chain.device_cert.hardware_identity,
"e.body.measurements,
)?;
self.policy.evaluate(
quote.body.provenance.slsa_level(),
"e.body.measurements.firmware_hash,
quote.body.measurements.event_counter,
quote.body.timestamp,
now_secs,
)?;
let ctx = PolicyContext::from_verified_quote("e, Some(&chain), None, now_secs, None);
self.engine.evaluate(&ctx)?;
Ok(PkiVerificationResult {
quote,
cert_chain: chain,
})
}
pub fn verify_cbor_with_sigstore(
&self,
cbor: &[u8],
verifying_key: &[u8],
expected_nonce: &[u8; 32],
now_secs: u64,
bundle: &ExternalProvenanceBundle,
sigstore_config: &SigstoreConfig,
) -> Result<VerificationResult, PqRascvError> {
let quote = AttestationQuote::from_cbor(cbor)?;
self.verify_signature_only("e, verifying_key, expected_nonce)?;
let vp: VerifiedProvenance = bundle.verify_all(
sigstore_config,
"e.body.measurements.firmware_hash,
now_secs,
)?;
self.policy.evaluate(
quote.body.provenance.slsa_level(),
"e.body.measurements.firmware_hash,
quote.body.measurements.event_counter,
quote.body.timestamp,
now_secs,
)?;
let ctx = PolicyContext::from_verified_quote("e, None, None, now_secs, Some(&vp));
self.engine.evaluate(&ctx)?;
Ok(VerificationResult { quote })
}
fn verify_signature_only(
&self,
quote: &AttestationQuote,
verifying_key: &[u8],
expected_nonce: &[u8; 32],
) -> Result<(), PqRascvError> {
if quote.body.version != PROTOCOL_VERSION {
return Err(PqRascvError::UnsupportedVersion);
}
if quote.body.nonce.ct_eq(expected_nonce).unwrap_u8() == 0 {
return Err(PqRascvError::VerificationFailed);
}
let expected_id = pub_key_id(verifying_key);
if quote.body.pub_key_id != expected_id {
return Err(PqRascvError::VerificationFailed);
}
let body_cbor = quote.body.to_cbor()?;
MlDsaBackend.verify(
&body_cbor,
verifying_key,
"e.signature,
SIGNING_CONTEXT_QUOTE,
)?;
Ok(())
}
#[tracing::instrument(skip_all, err)]
pub fn verify_quote(
&self,
quote: &AttestationQuote,
verifying_key: &[u8],
expected_nonce: &[u8; 32],
now_secs: u64,
) -> Result<(), PqRascvError> {
self.verify_signature_only(quote, verifying_key, expected_nonce)?;
self.policy.evaluate(
quote.body.provenance.slsa_level(),
"e.body.measurements.firmware_hash,
quote.body.measurements.event_counter,
quote.body.timestamp,
now_secs,
)?;
let ctx = PolicyContext::from_verified_quote(quote, None, None, now_secs, None);
self.engine.evaluate(&ctx)?;
tracing::debug!(
event_counter = quote.body.measurements.event_counter,
"attestation quote verified"
);
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use pqrascv_core::{
        crypto::generate_ml_dsa_keypair,
        measurement::SoftwareRoT,
        nonce::InMemoryNonceLedger,
        provenance::SlsaPredicateBuilder,
        provenance_v2::{
            ExternalProvenanceBundle, ProvenancePredicate, ProvenanceSubject, SigstoreBundle,
            SigstoreConfig,
        },
        quote::{generate_quote, QuoteTimestamp},
    };

    /// Nonce embedded in the quote produced by [`setup`].
    const NONCE: [u8; 32] = [0x77u8; 32];
    /// A nonce that differs from [`NONCE`].
    const WRONG_NONCE: [u8; 32] = [0x00u8; 32];
    /// Verifier clock used by most tests (100 s after the fixture quote's RTC timestamp).
    const NOW_SECS: u64 = 1_700_000_600;

    /// Generates a fresh key pair and a quote signed with it over [`NONCE`].
    fn setup() -> (
        pqrascv_core::crypto::SigningKeySeed,
        [u8; pqrascv_core::crypto::ML_DSA_65_VERIFYING_KEY_SIZE],
        AttestationQuote,
    ) {
        let (sk, vk) = generate_ml_dsa_keypair().unwrap();
        let provenance = SlsaPredicateBuilder::new("https://ci.example.com")
            .add_subject("fw.bin", &[0xabu8; 32])
            .with_slsa_level(2)
            .with_timestamps(1_700_000_000, 1_700_001_000)
            .build()
            .unwrap();
        let rot = SoftwareRoT::new(b"verifier-test-firmware", None, 1);
        let quote = generate_quote(
            &rot,
            &pqrascv_core::crypto::MlDsaBackend,
            sk.as_bytes(),
            &vk,
            &NONCE,
            provenance,
            QuoteTimestamp::Rtc(1_700_000_500),
        )
        .unwrap();
        (sk, vk, quote)
    }

    /// Verifier with the default policy configuration and an empty rule engine.
    fn default_verifier() -> Verifier {
        Verifier::new(PolicyConfig::default())
    }

    /// Builds the Sigstore bundle/config pair shared by the Sigstore tests; the bundle
    /// carries empty certificate/signature material and the config carries empty keys.
    fn sigstore_fixture() -> (ExternalProvenanceBundle, SigstoreConfig) {
        let subject = ProvenanceSubject {
            name: "firmware.bin".to_string(),
            digest_sha3_256: [0xabu8; 32],
        };
        let predicate = ProvenancePredicate::new(
            "https://slsa.dev/provenance/v1".to_string(),
            "https://github.com/actions/runner".to_string(),
            "abc123".to_string(),
            0,
            0,
            [0u8; 32],
            2,
            vec![subject],
        );
        let bundle = ExternalProvenanceBundle::new(
            predicate,
            SigstoreBundle::new(vec![], vec![], "{}".to_string(), [0xffu8; 32]),
        );
        let config = SigstoreConfig {
            rekor_public_key_der: vec![],
            fulcio_root_der: vec![],
            required_issuer: "https://token.actions.githubusercontent.com".to_string(),
            allowed_builders: vec![],
            max_clock_skew_secs: 60,
        };
        (bundle, config)
    }

    #[test]
    fn verifier_accepts_valid_quote() {
        let (_, vk, quote) = setup();
        let cbor = quote.to_cbor().unwrap();
        let result = default_verifier().verify_cbor(&cbor, &vk, &NONCE, NOW_SECS);
        assert!(result.is_ok(), "{result:?}");
    }

    #[test]
    fn verifier_rejects_wrong_nonce() {
        let (_, vk, quote) = setup();
        let cbor = quote.to_cbor().unwrap();
        let result = default_verifier().verify_cbor(&cbor, &vk, &WRONG_NONCE, NOW_SECS);
        assert!(result.is_err());
    }

    #[test]
    fn verify_cbor_consuming_blocks_replay() {
        let (_, vk, quote) = setup();
        let cbor = quote.to_cbor().unwrap();
        let verifier = default_verifier();
        let mut ledger = InMemoryNonceLedger::new(1024);
        ledger.register(NONCE).unwrap();

        let first = verifier.verify_cbor_consuming(&cbor, &vk, &NONCE, NOW_SECS, &mut ledger);
        assert!(first.is_ok());

        let replay = verifier.verify_cbor_consuming(&cbor, &vk, &NONCE, NOW_SECS, &mut ledger);
        assert!(
            matches!(replay, Err(PqRascvError::InvalidNonce)),
            "{replay:?}"
        );
    }

    #[test]
    fn verify_cbor_consuming_does_not_burn_nonce_on_verify_failure() {
        let (_, vk, quote) = setup();
        let cbor = quote.to_cbor().unwrap();
        let verifier = default_verifier();
        let mut ledger = InMemoryNonceLedger::new(1024);
        ledger.register(NONCE).unwrap();

        // A failed verification must leave the registered nonce untouched ...
        let failed =
            verifier.verify_cbor_consuming(&cbor, &vk, &WRONG_NONCE, NOW_SECS, &mut ledger);
        assert!(failed.is_err());

        // ... so that the genuine attempt afterwards still succeeds.
        let genuine = verifier.verify_cbor_consuming(&cbor, &vk, &NONCE, NOW_SECS, &mut ledger);
        assert!(genuine.is_ok());
    }

    #[test]
    fn verifier_rejects_tampered_quote() {
        let (_, vk, mut quote) = setup();
        // Mutating the signed body after signing invalidates the signature.
        quote.body.measurements.event_counter = 999;
        let cbor = quote.to_cbor().unwrap();
        let result = default_verifier().verify_cbor(&cbor, &vk, &NONCE, NOW_SECS);
        assert!(result.is_err());
    }

    #[test]
    fn verifier_rejects_wrong_verifying_key() {
        let (_, _vk, quote) = setup();
        let (_, different_vk) = generate_ml_dsa_keypair().unwrap();
        let cbor = quote.to_cbor().unwrap();
        let result = default_verifier().verify_cbor(&cbor, &different_vk, &NONCE, NOW_SECS);
        assert!(result.is_err());
    }

    #[test]
    fn verifier_rejects_unsupported_version() {
        let (_, vk, mut quote) = setup();
        quote.body.version = 99;
        let cbor = quote.to_cbor().unwrap();
        let result = default_verifier().verify_cbor(&cbor, &vk, &NONCE, NOW_SECS);
        assert!(matches!(result, Err(PqRascvError::UnsupportedVersion)));
    }

    #[test]
    fn verifier_rejects_rtcless_by_default() {
        let (sk, vk) = generate_ml_dsa_keypair().unwrap();
        let provenance = SlsaPredicateBuilder::new("https://ci.example.com")
            .add_subject("fw.bin", &[0xabu8; 32])
            .with_slsa_level(2)
            .build()
            .unwrap();
        let rot = SoftwareRoT::new(b"fw", None, 1);
        let quote = generate_quote(
            &rot,
            &pqrascv_core::crypto::MlDsaBackend,
            sk.as_bytes(),
            &vk,
            &NONCE,
            provenance,
            QuoteTimestamp::NoRtc,
        )
        .unwrap();
        let cbor = quote.to_cbor().unwrap();
        let result = default_verifier().verify_cbor(&cbor, &vk, &NONCE, 9_999_999);
        assert!(matches!(result, Err(PqRascvError::RtcRequired)));
    }

    #[test]
    fn verify_with_challenge_accepts_valid_quote() {
        let (_, vk, quote) = setup();
        let cbor = quote.to_cbor().unwrap();
        let challenge = pqrascv_core::quote::Challenge::new(NONCE);
        let result = default_verifier().verify_with_challenge(&cbor, &vk, &challenge, NOW_SECS);
        assert!(result.is_ok(), "{result:?}");
    }

    #[test]
    fn verify_with_challenge_rejects_wrong_nonce() {
        let (_, vk, quote) = setup();
        let cbor = quote.to_cbor().unwrap();
        let challenge = pqrascv_core::quote::Challenge::new(WRONG_NONCE);
        let result = default_verifier().verify_with_challenge(&cbor, &vk, &challenge, NOW_SECS);
        assert!(result.is_err());
    }

    #[test]
    fn verify_cbor_with_sigstore_rejects_invalid_bundle() {
        let (_, vk, quote) = setup();
        let cbor = quote.to_cbor().unwrap();
        let (bundle, config) = sigstore_fixture();
        let result = default_verifier()
            .verify_cbor_with_sigstore(&cbor, &vk, &NONCE, NOW_SECS, &bundle, &config);
        assert!(matches!(result, Err(PqRascvError::InvalidProvenance)));
    }

    #[test]
    fn verify_cbor_with_sigstore_fails_quote_before_bundle() {
        let (_, vk, quote) = setup();
        let cbor = quote.to_cbor().unwrap();
        let (bundle, config) = sigstore_fixture();
        // With a wrong nonce the quote check fails first, so the error is not the
        // bundle's `InvalidProvenance`.
        let result = default_verifier()
            .verify_cbor_with_sigstore(&cbor, &vk, &WRONG_NONCE, NOW_SECS, &bundle, &config);
        assert!(matches!(result, Err(PqRascvError::VerificationFailed)));
    }

    #[test]
    fn verification_result_accessors_return_correct_data() {
        let (_, vk, quote) = setup();
        let expected_firmware_hash = quote.body.measurements.firmware_hash;
        let cbor = quote.to_cbor().unwrap();
        let verified = default_verifier()
            .verify_cbor(&cbor, &vk, &NONCE, NOW_SECS)
            .unwrap();
        assert_eq!(verified.slsa_level(), 2);
        assert_eq!(verified.firmware_hash(), &expected_firmware_hash);
        assert_eq!(verified.nonce(), &[0x77u8; 32]);
    }

    #[test]
    fn engine_rejects_software_rot_via_verify_cbor() {
        use pqrascv_core::policy::{PolicyEngineV2, PolicyRule};
        let (_, vk, quote) = setup();
        let cbor = quote.to_cbor().unwrap();
        let engine = PolicyEngineV2::new(vec![PolicyRule::RequireHardwareBackend]);
        let verifier = Verifier::with_engine(PolicyConfig::default(), engine);
        let result = verifier.verify_cbor(&cbor, &vk, &NONCE, NOW_SECS);
        assert!(
            matches!(result, Err(PqRascvError::PolicyViolation(_))),
            "RequireHardwareBackend must reject SoftwareRoT, got {result:?}"
        );
    }

    #[test]
    fn empty_engine_does_not_break_existing_verify_cbor() {
        use pqrascv_core::policy::PolicyEngineV2;
        let (_, vk, quote) = setup();
        let cbor = quote.to_cbor().unwrap();
        let verifier = Verifier::with_engine(PolicyConfig::default(), PolicyEngineV2::new(vec![]));
        let result = verifier.verify_cbor(&cbor, &vk, &NONCE, NOW_SECS);
        assert!(result.is_ok());
    }
}
#[cfg(test)]
mod pki_tests {
    use super::*;
    use pqrascv_core::{
        crypto::{
            generate_ml_dsa_keypair, CryptoBackend, MlDsaBackend, SigningKeySeed,
            ML_DSA_65_VERIFYING_KEY_SIZE, SIGNING_CONTEXT_CERT,
        },
        measurement::SoftwareRoT,
        pki::{build_device_certificate, CaPublicKey, HardwareIdentity, TrustStore, CERT_VERSION},
        provenance::SlsaPredicateBuilder,
        quote::{generate_quote, QuoteTimestamp},
    };

    /// Raw ML-DSA-65 verifying key as returned by `generate_ml_dsa_keypair`.
    type VerifyingKey = [u8; ML_DSA_65_VERIFYING_KEY_SIZE];

    /// RTC timestamp placed in quotes built by [`make_quote`].
    const QUOTE_TIME_SECS: u64 = 1_700_000_000;
    /// Verifier clock used by the PKI tests (100 s after [`QUOTE_TIME_SECS`]).
    const VERIFY_TIME_SECS: u64 = 1_700_000_100;

    /// SLSA level 2 provenance with a single `fw.bin` subject.
    fn make_provenance() -> pqrascv_core::provenance::InTotoAttestation {
        SlsaPredicateBuilder::new("https://ci.test")
            .add_subject("fw.bin", &[0xabu8; 32])
            .with_slsa_level(2)
            .build()
            .unwrap()
    }

    /// Signs the certificate's TBS encoding with `seed` and stores the signature in
    /// `issuer_signature`.
    fn sign_cert(cert: &mut DeviceCertificate, seed: &[u8]) {
        let to_be_signed = cert.tbs_cbor().unwrap();
        let signature = MlDsaBackend
            .sign(&to_be_signed, seed, SIGNING_CONTEXT_CERT)
            .unwrap();
        cert.issuer_signature = signature.as_ref().to_vec();
    }

    /// Builds a certificate for `device_vk` naming `issuer_id` as issuer and signs it
    /// with `signer_seed`.
    fn make_device_cert(
        device_vk: &VerifyingKey,
        issuer_id: &str,
        serial: &str,
        signer_seed: &[u8],
    ) -> DeviceCertificate {
        let subject_key_id = pqrascv_core::crypto::pub_key_id(device_vk);
        let mut cert = build_device_certificate(
            CERT_VERSION,
            serial.to_string(),
            issuer_id.to_string(),
            0,
            u64::MAX,
            device_vk.to_vec(),
            subject_key_id,
            HardwareIdentity::TpmEkCertHash([0u8; 32]),
            None,
            vec![],
            // NOTE(review): the meaning of the last two positional arguments is not
            // visible here; the serial is reused for the first one — confirm against
            // `build_device_certificate`.
            serial.to_string(),
            Some(0),
        );
        sign_cert(&mut cert, signer_seed);
        cert
    }

    /// Builds a trust anchor for `ca_vk` that is valid from 0 until `not_after`.
    fn make_anchor(ca_vk: VerifyingKey, ca_id: &str, not_after: u64) -> TrustAnchor {
        TrustAnchor::new(CaPublicKey {
            key_bytes: ca_vk,
            ca_id: ca_id.to_string(),
            not_before: 0,
            not_after,
        })
        .unwrap()
    }

    /// Generates a quote over `nonce`, signed with the device key, using a software RoT
    /// for firmware `b"fw"`, [`make_provenance`] and an RTC timestamp of
    /// [`QUOTE_TIME_SECS`].
    fn make_quote(
        dev_seed: &SigningKeySeed,
        dev_vk: &VerifyingKey,
        nonce: &[u8; 32],
    ) -> AttestationQuote {
        let rot = SoftwareRoT::new(b"fw", None, 1);
        generate_quote(
            &rot,
            &MlDsaBackend,
            dev_seed.as_bytes(),
            dev_vk,
            nonce,
            make_provenance(),
            QuoteTimestamp::Rtc(QUOTE_TIME_SECS),
        )
        .unwrap()
    }

    #[test]
    fn pki_verification_succeeds_with_valid_chain() {
        let (ca_seed, ca_vk) = generate_ml_dsa_keypair().unwrap();
        let (dev_seed, dev_vk) = generate_ml_dsa_keypair().unwrap();
        let anchor = make_anchor(ca_vk, "https://ca.test", u64::MAX);
        let device_cert =
            make_device_cert(&dev_vk, "https://ca.test", "DEV-001", ca_seed.as_bytes());
        let nonce = [0xAAu8; 32];
        let quote = make_quote(&dev_seed, &dev_vk, &nonce);
        let cbor = quote.to_cbor().unwrap();

        let verifier = Verifier::new(PolicyConfig::default());
        let result = verifier.verify_cbor_with_pki(
            &cbor,
            device_cert,
            vec![],
            &anchor,
            None,
            &nonce,
            VERIFY_TIME_SECS,
        );
        assert!(result.is_ok());
        assert_eq!(result.unwrap().device_serial(), "DEV-001");
    }

    #[test]
    fn pki_verification_rejects_revoked_device() {
        use pqrascv_core::crypto::SIGNING_CONTEXT_CRL;
        use pqrascv_core::pki::revocation::{
            build_revocation_list, RevocationEntry, RevocationReason,
        };
        let (ca_seed, ca_vk) = generate_ml_dsa_keypair().unwrap();
        let (dev_seed, dev_vk) = generate_ml_dsa_keypair().unwrap();
        let anchor = make_anchor(ca_vk, "https://ca.test", u64::MAX);
        let device_cert = make_device_cert(
            &dev_vk,
            "https://ca.test",
            "DEV-REVOKED",
            ca_seed.as_bytes(),
        );

        // CRL issued by the same CA, listing the device certificate's serial.
        let revoked = RevocationEntry {
            serial: "DEV-REVOKED".to_string(),
            revoked_at: 1_000,
            reason: RevocationReason::KeyCompromise,
        };
        let mut crl = build_revocation_list(
            "https://ca.test".to_string(),
            1_000,
            9_999_999,
            vec![revoked],
            vec![],
        );
        let crl_tbs = crl.tbs_cbor().unwrap();
        let crl_sig = MlDsaBackend
            .sign(&crl_tbs, ca_seed.as_bytes(), SIGNING_CONTEXT_CRL)
            .unwrap();
        crl.issuer_signature = crl_sig.as_ref().to_vec();
        let verified_crl = crl.verify(&ca_vk, 2_000).unwrap();

        let nonce = [0xBBu8; 32];
        let quote = make_quote(&dev_seed, &dev_vk, &nonce);
        let cbor = quote.to_cbor().unwrap();

        let verifier = Verifier::new(PolicyConfig::default());
        let result = verifier.verify_cbor_with_pki(
            &cbor,
            device_cert,
            vec![],
            &anchor,
            Some(&verified_crl),
            &nonce,
            VERIFY_TIME_SECS,
        );
        assert!(matches!(result, Err(PqRascvError::CertificateRevoked)));
    }

    #[test]
    fn pki_verification_rejects_wrong_trust_anchor() {
        let (ca_seed, _ca_vk) = generate_ml_dsa_keypair().unwrap();
        let (_other_seed, other_vk) = generate_ml_dsa_keypair().unwrap();
        let (dev_seed, dev_vk) = generate_ml_dsa_keypair().unwrap();
        // The anchor carries a key that did not sign the device certificate.
        let anchor = make_anchor(other_vk, "https://ca.test", u64::MAX);
        let device_cert =
            make_device_cert(&dev_vk, "https://ca.test", "DEV-001", ca_seed.as_bytes());
        let nonce = [0xCCu8; 32];
        let quote = make_quote(&dev_seed, &dev_vk, &nonce);
        let cbor = quote.to_cbor().unwrap();

        let verifier = Verifier::new(PolicyConfig::default());
        let result = verifier.verify_cbor_with_pki(
            &cbor,
            device_cert,
            vec![],
            &anchor,
            None,
            &nonce,
            VERIFY_TIME_SECS,
        );
        assert!(result.is_err());
    }

    #[test]
    fn pki_verification_succeeds_with_intermediate_chain() {
        let (root_seed, root_vk) = generate_ml_dsa_keypair().unwrap();
        let (int_seed, int_vk) = generate_ml_dsa_keypair().unwrap();
        let (dev_seed, dev_vk) = generate_ml_dsa_keypair().unwrap();
        let anchor = make_anchor(root_vk, "https://root.test", u64::MAX);

        // Intermediate certificate issued by the root.
        let int_subject_key_id = pqrascv_core::crypto::pub_key_id(&int_vk);
        let mut intermediate = build_device_certificate(
            CERT_VERSION,
            "INT-001".to_string(),
            "https://root.test".to_string(),
            0,
            u64::MAX,
            int_vk.to_vec(),
            int_subject_key_id,
            HardwareIdentity::TpmEkCertHash([0u8; 32]),
            None,
            vec![],
            "https://int.test".to_string(),
            None,
        );
        sign_cert(&mut intermediate, root_seed.as_bytes());

        // Device certificate issued by the intermediate.
        let device_cert = make_device_cert(
            &dev_vk,
            "https://int.test",
            "DEV-CHAIN-001",
            int_seed.as_bytes(),
        );
        let nonce = [0xDDu8; 32];
        let quote = make_quote(&dev_seed, &dev_vk, &nonce);
        let cbor = quote.to_cbor().unwrap();

        let verifier = Verifier::new(PolicyConfig::default());
        let result = verifier.verify_cbor_with_pki(
            &cbor,
            device_cert,
            vec![intermediate],
            &anchor,
            None,
            &nonce,
            VERIFY_TIME_SECS,
        );
        assert!(
            result.is_ok(),
            "intermediate chain verification failed: {result:?}"
        );
        assert_eq!(result.unwrap().device_serial(), "DEV-CHAIN-001");
    }

    #[test]
    fn pki_result_exposes_trust_anchor_metadata() {
        let (ca_seed, ca_vk) = generate_ml_dsa_keypair().unwrap();
        let (dev_seed, dev_vk) = generate_ml_dsa_keypair().unwrap();
        let expected_fingerprint = pqrascv_core::crypto::pub_key_id(&ca_vk);
        let anchor = make_anchor(ca_vk, "https://audit.ca", u64::MAX);
        let device_cert =
            make_device_cert(&dev_vk, "https://audit.ca", "DEV-AUDIT", ca_seed.as_bytes());
        let nonce = [0xCCu8; 32];
        let quote = make_quote(&dev_seed, &dev_vk, &nonce);
        let cbor = quote.to_cbor().unwrap();

        let verifier = Verifier::new(PolicyConfig::default());
        let verified = verifier
            .verify_cbor_with_pki(
                &cbor,
                device_cert,
                vec![],
                &anchor,
                None,
                &nonce,
                VERIFY_TIME_SECS,
            )
            .unwrap();
        assert_eq!(verified.trust_anchor_id(), "https://audit.ca");
        assert_eq!(verified.trust_anchor_fingerprint(), &expected_fingerprint);
        assert_eq!(verified.trust_anchor_valid_until(), u64::MAX);
    }

    #[test]
    fn verify_cbor_with_trust_store_accepts_valid_chain() {
        let (ca_seed, ca_vk) = generate_ml_dsa_keypair().unwrap();
        let (dev_seed, dev_vk) = generate_ml_dsa_keypair().unwrap();
        let store = TrustStore::new(make_anchor(ca_vk, "https://store.ca", u64::MAX));
        let device_cert =
            make_device_cert(&dev_vk, "https://store.ca", "DEV-STORE", ca_seed.as_bytes());
        let nonce = [0xDDu8; 32];
        let quote = make_quote(&dev_seed, &dev_vk, &nonce);
        let cbor = quote.to_cbor().unwrap();

        let verifier = Verifier::new(PolicyConfig::default());
        let result = verifier.verify_cbor_with_trust_store(
            &cbor,
            device_cert,
            vec![],
            &store,
            None,
            &nonce,
            VERIFY_TIME_SECS,
        );
        assert!(result.is_ok());
        assert_eq!(result.unwrap().trust_anchor_id(), "https://store.ca");
    }

    #[test]
    fn verify_cbor_with_trust_store_rejects_expired_store() {
        let (ca_seed, ca_vk) = generate_ml_dsa_keypair().unwrap();
        let (dev_seed, dev_vk) = generate_ml_dsa_keypair().unwrap();
        // `not_after` of 999 lies far before the verification time.
        let store = TrustStore::new(make_anchor(ca_vk, "https://expired.ca", 999));
        let device_cert =
            make_device_cert(&dev_vk, "https://expired.ca", "DEV-EXP", ca_seed.as_bytes());
        let nonce = [0xEEu8; 32];
        let quote = make_quote(&dev_seed, &dev_vk, &nonce);
        let cbor = quote.to_cbor().unwrap();

        let verifier = Verifier::new(PolicyConfig::default());
        let result = verifier.verify_cbor_with_trust_store(
            &cbor,
            device_cert,
            vec![],
            &store,
            None,
            &nonce,
            VERIFY_TIME_SECS,
        );
        assert!(matches!(result, Err(PqRascvError::TrustAnchorExpired)));
    }

    #[test]
    fn engine_require_cert_chain_passes_in_verify_cbor_with_pki() {
        use pqrascv_core::policy::{PolicyEngineV2, PolicyRule};
        let (ca_seed, ca_vk) = generate_ml_dsa_keypair().unwrap();
        let (dev_seed, dev_vk) = generate_ml_dsa_keypair().unwrap();
        let anchor = make_anchor(ca_vk, "https://ca.test", u64::MAX);
        let device_cert =
            make_device_cert(&dev_vk, "https://ca.test", "DEV-E1", ca_seed.as_bytes());
        let nonce = [0xE1u8; 32];
        let quote = make_quote(&dev_seed, &dev_vk, &nonce);
        let cbor = quote.to_cbor().unwrap();

        let engine = PolicyEngineV2::new(vec![PolicyRule::RequireCertificateChain]);
        let verifier = Verifier::with_engine(PolicyConfig::default(), engine);
        let result = verifier.verify_cbor_with_pki(
            &cbor,
            device_cert,
            vec![],
            &anchor,
            None,
            &nonce,
            VERIFY_TIME_SECS,
        );
        assert!(
            result.is_ok(),
            "RequireCertificateChain must pass when chain is provided: {result:?}"
        );
    }

    #[test]
    fn engine_require_cert_chain_fails_in_verify_cbor() {
        use pqrascv_core::policy::{PolicyEngineV2, PolicyRule};
        let (sk, vk) = generate_ml_dsa_keypair().unwrap();
        let nonce = [0xE2u8; 32];
        let quote = make_quote(&sk, &vk, &nonce);
        let cbor = quote.to_cbor().unwrap();

        let engine = PolicyEngineV2::new(vec![PolicyRule::RequireCertificateChain]);
        let verifier = Verifier::with_engine(PolicyConfig::default(), engine);
        // The key-only path supplies no chain to the engine, so the rule must fail.
        let result = verifier.verify_cbor(&cbor, &vk, &nonce, 1_700_000_600);
        assert!(matches!(result, Err(PqRascvError::PolicyViolation(_))));
    }

    #[test]
    fn pki_verification_cross_validates_hardware_identity() {
        let (ca_seed, ca_vk) = generate_ml_dsa_keypair().unwrap();
        let (dev_seed, dev_vk) = generate_ml_dsa_keypair().unwrap();
        let anchor = make_anchor(ca_vk, "https://ca.test", u64::MAX);
        let device_cert =
            make_device_cert(&dev_vk, "https://ca.test", "DEV-HW-001", ca_seed.as_bytes());
        let nonce = [0xCCu8; 32];
        let quote = make_quote(&dev_seed, &dev_vk, &nonce);
        let cbor = quote.to_cbor().unwrap();

        let verifier = Verifier::new(PolicyConfig::default());
        let result = verifier.verify_cbor_with_pki(
            &cbor,
            device_cert,
            vec![],
            &anchor,
            None,
            &nonce,
            VERIFY_TIME_SECS,
        );
        assert!(result.is_ok());
    }

    #[test]
    fn e2e_pki_root_intermediate_device_verification() {
        use sha3::{Digest, Sha3_256};
        let (root_seed, root_vk) = generate_ml_dsa_keypair().unwrap();
        let (int_seed, int_vk) = generate_ml_dsa_keypair().unwrap();
        let (dev_seed, dev_vk) = generate_ml_dsa_keypair().unwrap();
        let root_anchor = make_anchor(root_vk, "https://root.pki.example.com", u64::MAX);

        // Intermediate CA certificate issued by the root.
        let mut int_cert = build_device_certificate(
            CERT_VERSION,
            "INT-CA-001".to_string(),
            "https://root.pki.example.com".to_string(),
            0,
            u64::MAX,
            int_vk.to_vec(),
            pqrascv_core::crypto::pub_key_id(&int_vk),
            HardwareIdentity::TpmEkCertHash([0u8; 32]),
            None,
            vec![],
            "https://int.pki.example.com".to_string(),
            Some(1),
        );
        sign_cert(&mut int_cert, root_seed.as_bytes());

        // Device certificate issued by the intermediate.
        let device_cert = make_device_cert(
            &dev_vk,
            "https://int.pki.example.com",
            "DEV-E2E-001",
            int_seed.as_bytes(),
        );

        // Quote over a concrete firmware image whose SHA3-256 digest is the
        // provenance subject.
        let firmware: &[u8] = b"enterprise firmware v1.0";
        let fw_hash: [u8; 32] = Sha3_256::digest(firmware).into();
        let nonce = [0xF0u8; 32];
        let provenance = SlsaPredicateBuilder::new("https://ci.example.com")
            .add_subject("firmware.bin", &fw_hash)
            .with_slsa_level(2)
            .build()
            .unwrap();
        let rot = SoftwareRoT::new(firmware, None, 0);
        let quote = generate_quote(
            &rot,
            &MlDsaBackend,
            dev_seed.as_bytes(),
            &dev_vk,
            &nonce,
            provenance,
            QuoteTimestamp::Rtc(1_700_001_000),
        )
        .unwrap();
        let cbor = quote.to_cbor().unwrap();

        let verifier = Verifier::new(PolicyConfig::default());
        let verified = verifier
            .verify_cbor_with_pki(
                &cbor,
                device_cert,
                vec![int_cert],
                &root_anchor,
                None,
                &nonce,
                1_700_001_100,
            )
            .unwrap();
        assert_eq!(verified.firmware_hash(), &fw_hash);
        assert_eq!(verified.device_serial(), "DEV-E2E-001");
        assert_eq!(verified.trust_anchor_id(), "https://root.pki.example.com");
        assert_eq!(verified.nonce(), &nonce);
    }
}