use alloc::string::String;
use alloc::vec::Vec;
use rustls_pki_types::pem::PemObject;
use rustls_pki_types::{CertificateDer, TrustAnchor};
#[derive(Debug, Clone)]
pub struct IdentityConfig {
pub identity_cert_pem: Vec<u8>,
pub identity_ca_pem: Vec<u8>,
pub identity_key_pem: Option<Vec<u8>>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PkiError {
InvalidPem(String),
NoCertInPem,
CertInvalid(String),
EmptyTrustAnchors,
}
impl core::fmt::Display for PkiError {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
match self {
Self::InvalidPem(m) => write!(f, "invalid PEM: {m}"),
Self::NoCertInPem => write!(f, "no certificate in PEM"),
Self::CertInvalid(m) => write!(f, "certificate invalid: {m}"),
Self::EmptyTrustAnchors => write!(f, "trust-anchor bundle is empty"),
}
}
}
impl std::error::Error for PkiError {}
#[derive(Debug, Clone)]
pub(crate) struct ParsedIdentity {
pub cert_der: Vec<u8>,
pub trust_anchors_der: Vec<Vec<u8>>,
pub private_key_pkcs8_der: Option<Vec<u8>>,
pub key_algo: CertKeyAlgo,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CertKeyAlgo {
EcdsaP256Sha256,
RsaPssSha256,
Unknown,
}
impl ParsedIdentity {
pub fn from_config(cfg: &IdentityConfig) -> Result<Self, PkiError> {
let cert_der = first_cert_der(&cfg.identity_cert_pem)?;
let trust_anchors_der = all_certs_der(&cfg.identity_ca_pem)?;
if trust_anchors_der.is_empty() {
return Err(PkiError::EmptyTrustAnchors);
}
verify_cert_chain(&cert_der, &trust_anchors_der)?;
let key_algo = detect_cert_algo(&cert_der);
let private_key_pkcs8_der = match cfg.identity_key_pem.as_deref() {
Some(pem) => Some(parse_pkcs8_pem(pem)?),
None => None,
};
Ok(Self {
cert_der,
trust_anchors_der,
private_key_pkcs8_der,
key_algo,
})
}
pub fn verify_remote_der(&self, remote_cert_der: &[u8]) -> Result<(), PkiError> {
verify_cert_chain(remote_cert_der, &self.trust_anchors_der)
}
}
fn first_cert_der(pem: &[u8]) -> Result<Vec<u8>, PkiError> {
let certs = all_certs_der(pem)?;
certs.into_iter().next().ok_or(PkiError::NoCertInPem)
}
fn all_certs_der(pem: &[u8]) -> Result<Vec<Vec<u8>>, PkiError> {
let mut out = Vec::new();
for item in CertificateDer::pem_slice_iter(pem) {
let cert = item.map_err(|e| PkiError::InvalidPem(alloc::format!("{e:?}")))?;
out.push(cert.as_ref().to_vec());
}
Ok(out)
}
fn verify_cert_chain(end_entity_der: &[u8], trust_anchors_der: &[Vec<u8>]) -> Result<(), PkiError> {
let ee = CertificateDer::from_slice(end_entity_der);
let end_entity = webpki::EndEntityCert::try_from(&ee)
.map_err(|e| PkiError::CertInvalid(alloc::format!("parse: {e:?}")))?;
let ta_certs: Vec<CertificateDer<'_>> = trust_anchors_der
.iter()
.map(|b| CertificateDer::from_slice(b))
.collect();
let mut anchors: Vec<TrustAnchor<'_>> = Vec::with_capacity(ta_certs.len());
for ta_cert in &ta_certs {
let ta = webpki::anchor_from_trusted_cert(ta_cert)
.map_err(|e| PkiError::CertInvalid(alloc::format!("trust-anchor: {e:?}")))?;
anchors.push(ta);
}
let now = rustls_pki_types::UnixTime::now();
let algs = webpki::ALL_VERIFICATION_ALGS;
end_entity
.verify_for_usage(
algs,
&anchors,
&[],
now,
webpki::KeyUsage::client_auth(),
None,
None,
)
.map_err(|e| PkiError::CertInvalid(alloc::format!("verify: {e:?}")))?;
Ok(())
}
pub(crate) fn detect_cert_algo_pub(der: &[u8]) -> CertKeyAlgo {
detect_cert_algo(der)
}
fn detect_cert_algo(der: &[u8]) -> CertKeyAlgo {
const ECDSA_OID: &[u8] = &[0x06, 0x07, 0x2A, 0x86, 0x48, 0xCE, 0x3D, 0x02, 0x01];
const P256_OID: &[u8] = &[0x06, 0x08, 0x2A, 0x86, 0x48, 0xCE, 0x3D, 0x03, 0x01, 0x07];
const RSA_OID: &[u8] = &[
0x06, 0x09, 0x2A, 0x86, 0x48, 0x86, 0xF7, 0x0D, 0x01, 0x01, 0x01,
];
if contains_subseq(der, ECDSA_OID) && contains_subseq(der, P256_OID) {
CertKeyAlgo::EcdsaP256Sha256
} else if contains_subseq(der, RSA_OID) {
CertKeyAlgo::RsaPssSha256
} else {
CertKeyAlgo::Unknown
}
}
fn contains_subseq(haystack: &[u8], needle: &[u8]) -> bool {
if needle.is_empty() || haystack.len() < needle.len() {
return false;
}
haystack.windows(needle.len()).any(|w| w == needle)
}
fn parse_pkcs8_pem(pem: &[u8]) -> Result<Vec<u8>, PkiError> {
use rustls_pki_types::PrivatePkcs8KeyDer;
PrivatePkcs8KeyDer::pem_slice_iter(pem)
.next()
.ok_or(PkiError::NoCertInPem)?
.map(|k| k.secret_pkcs8_der().to_vec())
.map_err(|e| PkiError::InvalidPem(alloc::format!("{e:?}")))
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod mutation_killers {
use super::*;
#[test]
fn pki_error_display_messages_are_specific() {
assert_eq!(
alloc::format!(
"{}",
PkiError::InvalidPem(alloc::string::String::from("bad"))
),
"invalid PEM: bad"
);
assert_eq!(
alloc::format!("{}", PkiError::NoCertInPem),
"no certificate in PEM"
);
assert_eq!(
alloc::format!(
"{}",
PkiError::CertInvalid(alloc::string::String::from("expired"))
),
"certificate invalid: expired"
);
assert_eq!(
alloc::format!("{}", PkiError::EmptyTrustAnchors),
"trust-anchor bundle is empty"
);
}
#[test]
fn detect_cert_algo_requires_both_ecdsa_oids() {
let only_ecdsa = [0x06, 0x07, 0x2A, 0x86, 0x48, 0xCE, 0x3D, 0x02, 0x01];
assert_eq!(detect_cert_algo(&only_ecdsa), CertKeyAlgo::Unknown);
let only_p256 = [0x06, 0x08, 0x2A, 0x86, 0x48, 0xCE, 0x3D, 0x03, 0x01, 0x07];
assert_eq!(detect_cert_algo(&only_p256), CertKeyAlgo::Unknown);
let mut both = Vec::new();
both.extend_from_slice(&only_ecdsa);
both.extend_from_slice(&only_p256);
assert_eq!(detect_cert_algo(&both), CertKeyAlgo::EcdsaP256Sha256);
}
#[test]
fn contains_subseq_not_constant_true() {
assert!(!contains_subseq(b"abc", b"xyz"));
assert!(!contains_subseq(b"", b"x"));
assert!(!contains_subseq(b"a", b"abc"));
}
#[test]
fn contains_subseq_empty_needle_returns_false() {
assert!(!contains_subseq(b"abc", b""));
assert!(!contains_subseq(b"", b""));
}
#[test]
fn contains_subseq_exact_match_at_equal_length() {
assert!(contains_subseq(b"abc", b"abc"));
assert!(contains_subseq(b"\x06\x07\x2A", b"\x06\x07\x2A"));
}
#[test]
fn contains_subseq_no_match_returns_false() {
assert!(!contains_subseq(b"abcde", b"xyz"));
assert!(!contains_subseq(b"\x01\x02\x03\x04", b"\x05\x06"));
}
}