use std::{borrow::Cow, io::Write};
use async_generic::async_generic;
use coset::CoseSign1;
use x509_parser::{num_bigint::BigUint, prelude::*};
use crate::{
crypto::{
asn1::rfc3161::TstInfo,
base64,
cose::{
cert_chain_from_sign1, parse_cose_sign1, signing_alg_from_sign1,
signing_time_from_sign1, signing_time_from_sign1_async, validate_cose_tst_info,
validate_cose_tst_info_async, CertificateInfo, CertificateTrustPolicy, Verifier,
},
raw_signature::SigningAlg,
},
error::{Error, Result},
settings::Settings,
status_tracker::StatusTracker,
};
fn get_sign_cert(sign1: &coset::CoseSign1) -> Result<Vec<u8>> {
let certs = cert_chain_from_sign1(sign1)?;
Ok(certs[0].clone())
}
#[async_generic]
#[allow(clippy::too_many_arguments)]
pub(crate) fn verify_cose(
cose_bytes: &[u8],
data: &[u8],
additional_data: &[u8],
cert_check: bool,
ctp: &CertificateTrustPolicy,
tst_info: Option<&TstInfo>,
validation_log: &mut StatusTracker,
settings: &Settings,
) -> Result<CertificateInfo> {
let verifier = if cert_check {
if settings.verify.verify_trust {
Verifier::VerifyTrustPolicy(Cow::Borrowed(ctp))
} else {
Verifier::VerifyCertificateProfileOnly(Cow::Borrowed(ctp))
}
} else {
Verifier::IgnoreProfileAndTrustPolicy
};
let sign1 = parse_cose_sign1(cose_bytes, data, validation_log)?;
let tst_info = match tst_info {
Some(tst_info) => Some(tst_info.clone()),
None => {
if _sync {
validate_cose_tst_info(
&sign1,
data,
ctp,
validation_log,
settings.verify.verify_timestamp_trust,
)
.ok()
} else {
validate_cose_tst_info_async(
&sign1,
data,
ctp,
validation_log,
settings.verify.verify_timestamp_trust,
)
.await
.ok()
}
}
};
if _sync {
Ok(verifier.verify_signature(
cose_bytes,
data,
additional_data,
tst_info.as_ref(),
validation_log,
)?)
} else {
Ok(verifier
.verify_signature_async(
cose_bytes,
data,
additional_data,
tst_info.as_ref(),
validation_log,
)
.await?)
}
}
fn dump_cert_chain(certs: &[Vec<u8>]) -> Result<Vec<u8>> {
let mut writer = Vec::new();
let line_len = 64;
let cert_begin = "-----BEGIN CERTIFICATE-----";
let cert_end = "-----END CERTIFICATE-----";
for der_bytes in certs {
let cert_base_str = base64::encode(der_bytes);
let cert_lines = cert_base_str
.chars()
.collect::<Vec<char>>()
.chunks(line_len)
.map(|chunk| chunk.iter().collect::<String>())
.collect::<Vec<_>>();
writer
.write_fmt(format_args!("{cert_begin}\n"))
.map_err(|_e| Error::UnsupportedType)?;
for l in cert_lines {
writer
.write_fmt(format_args!("{l}\n"))
.map_err(|_e| Error::UnsupportedType)?;
}
writer
.write_fmt(format_args!("{cert_end}\n"))
.map_err(|_e| Error::UnsupportedType)?;
}
Ok(writer)
}
fn extract_subject_from_cert(cert: &X509Certificate) -> Option<String> {
cert.subject()
.iter_organization()
.map(|attr| attr.as_str())
.last()
.map(|attr| match attr {
Ok(attr) => Some(attr.to_string()),
Err(_) => None,
})?
}
fn extract_common_name_from_cert(cert: &X509Certificate) -> Option<String> {
cert.subject()
.iter_common_name()
.map(|attr| attr.as_str())
.last()
.map(|attr| match attr {
Ok(attr) => Some(attr.to_string()),
Err(_) => None,
})?
}
fn extract_serial_from_cert(cert: &X509Certificate) -> BigUint {
cert.serial.clone()
}
pub(crate) fn get_signing_cert_serial_num(sign1: &CoseSign1) -> Result<BigUint> {
let der_bytes = get_sign_cert(sign1)?;
let (_rem, signcert) =
X509Certificate::from_der(&der_bytes).map_err(|_| Error::CoseInvalidCert)?;
Ok(extract_serial_from_cert(&signcert))
}
#[allow(unused_variables)]
#[async_generic]
pub(crate) fn get_signing_info(
cose_bytes: &[u8],
data: &[u8],
validation_log: &mut StatusTracker,
verify_trust: bool,
) -> CertificateInfo {
let mut date = None;
let mut issuer_org = None;
let mut common_name = None;
let mut alg: Option<SigningAlg> = None;
let mut cert_serial_number = None;
let sign1 = match parse_cose_sign1(cose_bytes, data, validation_log) {
Ok(sign1) => {
match get_sign_cert(&sign1) {
Ok(der_bytes) => {
if let Ok((_rem, signcert)) = X509Certificate::from_der(&der_bytes) {
date = if _sync {
signing_time_from_sign1(&sign1, data, verify_trust)
} else {
signing_time_from_sign1_async(&sign1, data, verify_trust).await
};
issuer_org = extract_subject_from_cert(&signcert);
cert_serial_number = Some(extract_serial_from_cert(&signcert));
common_name = extract_common_name_from_cert(&signcert);
if let Ok(a) = signing_alg_from_sign1(&sign1) {
alg = Some(a);
}
};
Ok(sign1)
}
Err(e) => Err(e),
}
}
Err(e) => Err(e.into()),
};
let certs = match sign1 {
Ok(s) => match cert_chain_from_sign1(&s) {
Ok(c) => dump_cert_chain(&c).unwrap_or_default(),
Err(_) => Vec::new(),
},
Err(_e) => Vec::new(),
};
CertificateInfo {
issuer_org,
date,
alg,
validated: false,
cert_chain: certs,
cert_serial_number,
revocation_status: None,
iat: None,
common_name,
}
}
#[allow(unused_imports)]
#[allow(clippy::unwrap_used)]
#[cfg(test)]
pub mod tests {
use coset::{cbor::value::Value, Label};
#[allow(deprecated)]
use sha2::digest::generic_array::sequence::Shorten;
use x509_parser::{certificate::X509Certificate, pem::Pem};
use super::*;
use crate::{
crypto::raw_signature::SigningAlg, settings::Settings, status_tracker::StatusTracker,
utils::test_signer::test_signer, Signer,
};
#[test]
fn test_no_timestamp() {
let mut settings = Settings::default();
settings.verify.verify_trust = false;
let mut validation_log = StatusTracker::default();
let mut claim = crate::claim::Claim::new("extern_sign_test", Some("contentauth"), 1);
claim.build().unwrap();
let claim_bytes = claim.data().unwrap();
let box_size = 10000;
let signer = test_signer(SigningAlg::Ps256);
let cose_bytes =
crate::cose_sign::sign_claim(&claim_bytes, signer.as_ref(), box_size, &settings)
.unwrap();
let cose_sign1 = parse_cose_sign1(&cose_bytes, &claim_bytes, &mut validation_log).unwrap();
let signing_time = signing_time_from_sign1(
&cose_sign1,
&claim_bytes,
settings.verify.verify_timestamp_trust,
);
assert_eq!(signing_time, None);
}
#[test]
fn test_stapled_ocsp() {
use crate::crypto::{
raw_signature::{signer_from_cert_chain_and_private_key, RawSigner, RawSignerError},
time_stamp::{TimeStampError, TimeStampProvider},
};
let mut settings = Settings::default();
settings.verify.verify_trust = false;
let mut validation_log = StatusTracker::default();
let mut claim = crate::claim::Claim::new("ocsp_sign_test", Some("contentauth"), 1);
claim.build().unwrap();
let claim_bytes = claim.data().unwrap();
let sign_cert = include_bytes!("../tests/fixtures/certs/ps256.pub").to_vec();
let pem_key = include_bytes!("../tests/fixtures/certs/ps256.pem").to_vec();
let ocsp_rsp_data = include_bytes!("../tests/fixtures/ocsp_good.data");
let raw_signer =
signer_from_cert_chain_and_private_key(&sign_cert, &pem_key, SigningAlg::Ps256, None)
.unwrap();
struct OcspSigner {
pub raw_signer: Box<dyn RawSigner>,
pub ocsp_rsp: Vec<u8>,
}
impl crate::Signer for OcspSigner {
fn sign(&self, data: &[u8]) -> Result<Vec<u8>> {
Ok(self.raw_signer.sign(data)?)
}
fn alg(&self) -> SigningAlg {
SigningAlg::Ps256
}
fn certs(&self) -> Result<Vec<Vec<u8>>> {
Ok(self.raw_signer.cert_chain()?)
}
fn reserve_size(&self) -> usize {
self.raw_signer.reserve_size()
}
fn ocsp_val(&self) -> Option<Vec<u8>> {
Some(self.ocsp_rsp.clone())
}
}
let ocsp_signer = OcspSigner {
raw_signer,
ocsp_rsp: ocsp_rsp_data.to_vec(),
};
let cose_bytes = crate::cose_sign::sign_claim(
&claim_bytes,
&ocsp_signer,
ocsp_signer.reserve_size(),
&settings,
)
.unwrap();
let cose_sign1 = parse_cose_sign1(&cose_bytes, &claim_bytes, &mut validation_log).unwrap();
let ocsp_stapled = get_ocsp_der(&cose_sign1).unwrap();
assert_eq!(ocsp_rsp_data, ocsp_stapled.as_slice());
}
fn get_ocsp_der(sign1: &coset::CoseSign1) -> Option<Vec<u8>> {
if let Some(der) = sign1
.unprotected
.rest
.iter()
.find_map(|x: &(Label, Value)| {
if x.0 == Label::Text("rVals".to_string()) {
Some(x.1.clone())
} else {
None
}
})
{
match der {
Value::Map(rvals_map) => {
rvals_map.iter().find_map(|x: &(Value, Value)| {
if x.0 == Value::Text("ocspVals".to_string()) {
x.1.as_array()
.and_then(|ocsp_rsp_val| ocsp_rsp_val.first())
.and_then(Value::as_bytes)
.map(|b| b.to_vec())
} else {
None
}
})
}
_ => None,
}
} else {
None
}
}
}