use der::{asn1::OctetString, Decode as _};
use pkix_path::names_match;
use x509_cert::{crl::CertificateList, Certificate};
/// Object identifier `2.5.29.35`, used in this file to locate the Authority
/// Key Identifier extension on both certificates and CRLs.
const OID_AUTHORITY_KEY_IDENTIFIER: der::asn1::ObjectIdentifier =
der::asn1::ObjectIdentifier::new_unwrap("2.5.29.35");
/// Object identifier `2.5.29.14`, used in this file to locate the Subject Key
/// Identifier extension on certificates.
const OID_SUBJECT_KEY_IDENTIFIER: der::asn1::ObjectIdentifier =
der::asn1::ObjectIdentifier::new_unwrap("2.5.29.14");
/// Picks the certificate in `bundle` that is the candidate signer of `crl`.
///
/// Selection strategy:
///
/// * If the CRL carries an Authority Key Identifier with a key identifier,
///   the result is the first certificate in `bundle` whose Subject Key
///   Identifier has the same bytes. When no certificate matches, `None` is
///   returned; there is no fallback to name matching in this case.
/// * Otherwise the result is the first certificate whose subject matches the
///   CRL issuer according to `names_match`.
///
/// This function only selects a candidate; it performs no signature
/// verification.
#[must_use]
pub fn discover_crl_signer<'a>(
    bundle: &'a [Certificate],
    crl: &CertificateList,
) -> Option<&'a Certificate> {
    match crl_authority_key_id(crl) {
        // Key-identifier path: strict, no name-based fallback.
        Some(wanted) => bundle.iter().find(|candidate| {
            matches!(
                cert_subject_key_id(candidate),
                Some(ski) if ski.as_bytes() == wanted.as_bytes()
            )
        }),
        // No usable key identifier on the CRL: fall back to issuer/subject names.
        None => {
            let crl_issuer = &crl.tbs_cert_list.issuer;
            bundle
                .iter()
                .find(|candidate| names_match(&candidate.tbs_certificate.subject, crl_issuer))
        }
    }
}
/// Reports whether following issuer links from `signer` through `bundle`
/// arrives at a certificate whose subject and issuer names match.
///
/// Starting at `signer`, each step asks `find_parent` for the issuing
/// certificate and moves to it. The walk ends with:
///
/// * `true` as soon as the current certificate's subject matches its own
///   issuer according to `names_match`;
/// * `false` when no parent can be found in `bundle`;
/// * `false` once more than `bundle.len() + 1` hops have been taken, which
///   guarantees termination even if the bundle contains an issuer cycle.
///
/// Only names and key identifiers are compared here; no signature is verified.
pub(crate) fn reaches_self_signed(bundle: &[Certificate], signer: &Certificate) -> bool {
    let mut current = signer;
    let mut steps = 0usize;
    // Upper bound on hops; saturating so a huge slice length cannot overflow.
    let max_steps = bundle.len().saturating_add(1);
    loop {
        if names_match(
            &current.tbs_certificate.subject,
            &current.tbs_certificate.issuer,
        ) {
            return true;
        }
        if steps >= max_steps {
            return false;
        }
        steps += 1;
        match find_parent(bundle, current) {
            Some(parent) => current = parent,
            None => return false,
        }
    }
}
/// Finds the certificate in `bundle` that looks like the issuer of `cert`.
///
/// `cert` itself (compared by address) is never returned. Lookup order:
///
/// 1. If `cert` has an Authority Key Identifier with a key identifier, the
///    first other certificate whose Subject Key Identifier has the same bytes.
/// 2. Otherwise, or when step 1 finds nothing, the first other certificate
///    whose subject matches `cert`'s issuer according to `names_match`.
fn find_parent<'a>(bundle: &'a [Certificate], cert: &Certificate) -> Option<&'a Certificate> {
    let by_key_id = cert_authority_key_id(cert).and_then(|wanted| {
        bundle.iter().find(|candidate| {
            // Skip the certificate we started from before parsing its extensions.
            !core::ptr::eq(*candidate, cert)
                && matches!(
                    cert_subject_key_id(candidate),
                    Some(ski) if ski.as_bytes() == wanted.as_bytes()
                )
        })
    });

    by_key_id.or_else(|| {
        let issuer = &cert.tbs_certificate.issuer;
        bundle.iter().find(|candidate| {
            !core::ptr::eq(*candidate, cert)
                && names_match(&candidate.tbs_certificate.subject, issuer)
        })
    })
}
/// Extracts the key identifier from the CRL's Authority Key Identifier
/// extension.
///
/// Returns `None` when the CRL has no extensions, has no extension with
/// `OID_AUTHORITY_KEY_IDENTIFIER`, the extension value fails to decode, or the
/// decoded value carries no key identifier.
fn crl_authority_key_id(crl: &CertificateList) -> Option<OctetString> {
    let extensions = crl.tbs_cert_list.crl_extensions.as_deref()?;
    let aki_ext = extensions
        .iter()
        .find(|candidate| candidate.extn_id == OID_AUTHORITY_KEY_IDENTIFIER)?;
    // A decode failure is treated the same as an absent extension.
    x509_cert::ext::pkix::AuthorityKeyIdentifier::from_der(aki_ext.extn_value.as_bytes())
        .ok()
        .and_then(|decoded| decoded.key_identifier)
}
/// Extracts the key identifier from a certificate's Authority Key Identifier
/// extension.
///
/// Returns `None` when the certificate has no extensions, has no extension
/// with `OID_AUTHORITY_KEY_IDENTIFIER`, the extension value fails to decode,
/// or the decoded value carries no key identifier.
fn cert_authority_key_id(cert: &Certificate) -> Option<OctetString> {
    let aki_ext = cert
        .tbs_certificate
        .extensions
        .as_deref()
        .and_then(|extensions| {
            extensions
                .iter()
                .find(|candidate| candidate.extn_id == OID_AUTHORITY_KEY_IDENTIFIER)
        })?;
    // A decode failure is treated the same as an absent extension.
    x509_cert::ext::pkix::AuthorityKeyIdentifier::from_der(aki_ext.extn_value.as_bytes())
        .ok()
        .and_then(|decoded| decoded.key_identifier)
}
/// Extracts the bytes of a certificate's Subject Key Identifier extension.
///
/// Returns `None` when the certificate has no extensions, has no extension
/// with `OID_SUBJECT_KEY_IDENTIFIER`, or the extension value fails to decode.
fn cert_subject_key_id(cert: &Certificate) -> Option<OctetString> {
    let ski_ext = cert
        .tbs_certificate
        .extensions
        .as_deref()
        .into_iter()
        .flatten()
        .find(|candidate| candidate.extn_id == OID_SUBJECT_KEY_IDENTIFIER)?;
    // A decode failure is treated the same as an absent extension.
    x509_cert::ext::pkix::SubjectKeyIdentifier::from_der(ski_ext.extn_value.as_bytes())
        .ok()
        .map(|decoded| decoded.0)
}
// Unit tests for signer discovery and the issuer walk. Fixtures are read from
// the sibling `pkix-path` crate's `tests/pkits` directory at test run time.
#[cfg(test)]
mod tests {
use super::*;
// Reads `<name>.crt` from the PKITS certs directory, resolved relative to
// this crate's manifest directory. Panics with the path if the read fails.
fn pkits_cert(name: &str) -> Vec<u8> {
let base = env!("CARGO_MANIFEST_DIR");
let path = format!("{base}/../pkix-path/tests/pkits/certs/{name}.crt");
std::fs::read(&path).unwrap_or_else(|e| panic!("cert not found at {path}: {e}"))
}
// Reads `<name>.crl` from the PKITS crls directory, resolved relative to
// this crate's manifest directory. Panics with the path if the read fails.
fn pkits_crl(name: &str) -> Vec<u8> {
let base = env!("CARGO_MANIFEST_DIR");
let path = format!("{base}/../pkix-path/tests/pkits/crls/{name}.crl");
std::fs::read(&path).unwrap_or_else(|e| panic!("CRL not found at {path}: {e}"))
}
// Decodes a DER certificate, panicking on malformed input.
fn load_cert(der: &[u8]) -> Certificate {
Certificate::from_der(der).expect("cert DER parse")
}
// Decodes a DER CRL, panicking on malformed input.
fn load_crl(der: &[u8]) -> CertificateList {
CertificateList::from_der(der).expect("CRL DER parse")
}
// With the bridge certificate listed first, discovery must still return the
// certificate carrying the new key (compared via SubjectPublicKeyInfo).
// NOTE(review): presumably both fixtures share the CRL issuer's name, so a
// name-only match would pick the bridge here; confirm against the fixtures.
#[test]
fn aki_ski_picks_correct_rollover_signer() {
let new_key_ca = load_cert(&pkits_cert("BasicSelfIssuedNewKeyCACert"));
let bridge = load_cert(&pkits_cert("BasicSelfIssuedNewKeyOldWithNewCACert"));
let crl = load_crl(&pkits_crl("BasicSelfIssuedNewKeyCACRL"));
let bundle = vec![bridge.clone(), new_key_ca.clone()];
let signer = discover_crl_signer(&bundle, &crl).expect("must locate signer");
assert_eq!(
signer.tbs_certificate.subject_public_key_info,
new_key_ca.tbs_certificate.subject_public_key_info,
"AKI/SKI walk should pick the NEW key cert, not the bridge",
);
}
// Same fixtures as above with the bundle order reversed; the selected
// signer must be unchanged.
#[test]
fn aki_ski_is_order_independent() {
let new_key_ca = load_cert(&pkits_cert("BasicSelfIssuedNewKeyCACert"));
let bridge = load_cert(&pkits_cert("BasicSelfIssuedNewKeyOldWithNewCACert"));
let crl = load_crl(&pkits_crl("BasicSelfIssuedNewKeyCACRL"));
let bundle = vec![new_key_ca.clone(), bridge.clone()];
let signer = discover_crl_signer(&bundle, &crl).expect("must locate signer");
assert_eq!(
signer.tbs_certificate.subject_public_key_info,
new_key_ca.tbs_certificate.subject_public_key_info,
);
}
// An empty bundle can never yield a signer.
#[test]
fn empty_bundle_returns_none() {
let crl = load_crl(&pkits_crl("BasicSelfIssuedNewKeyCACRL"));
assert!(discover_crl_signer(&[], &crl).is_none());
}
// A bundle holding only a certificate unrelated to the CRL yields no signer.
#[test]
fn unrelated_bundle_returns_none() {
let unrelated = load_cert(&pkits_cert("GoodCACert"));
let crl = load_crl(&pkits_crl("BasicSelfIssuedNewKeyCACRL"));
assert!(discover_crl_signer(std::slice::from_ref(&unrelated), &crl).is_none());
}
// The trust anchor is expected to satisfy the subject == issuer check on the
// very first iteration of the walk.
#[test]
fn self_signed_reaches_itself() {
let trust_anchor = load_cert(&pkits_cert("TrustAnchorRootCertificate"));
assert!(reaches_self_signed(
std::slice::from_ref(&trust_anchor),
&trust_anchor
));
}
// The bundle contains only the signer itself, which `find_parent` skips by
// address, so the walk has no parent to move to and must report `false`.
#[test]
fn orphan_cert_does_not_reach_anchor() {
let signer = load_cert(&pkits_cert("BasicSelfIssuedNewKeyCACert"));
assert!(!reaches_self_signed(std::slice::from_ref(&signer), &signer));
}
// With the root present in the bundle, the walk from the signer is expected
// to arrive at a certificate whose subject matches its issuer.
// NOTE(review): `signer` is a separate clone of `bundle[0]`, so the address
// check in `find_parent` does not exclude `bundle[0]`; confirm that is intended.
#[test]
fn signer_with_root_reaches_anchor() {
let signer = load_cert(&pkits_cert("BasicSelfIssuedNewKeyCACert"));
let root = load_cert(&pkits_cert("TrustAnchorRootCertificate"));
let bundle = vec![signer.clone(), root.clone()];
assert!(reaches_self_signed(&bundle, &signer));
}
}