use crate::error::Result;
use der::{Decode, Encode};
use digest::Digest;
use sha2::{Sha256, Sha384, Sha512};
use tracing::{debug, trace, warn};
use x509_cert::{certificate::Certificate, der::asn1::ObjectIdentifier, time::Validity};
/// Dotted-decimal object identifiers that this file searches for inside raw
/// signature bytes (see `find_oid_pattern`).
mod oids {
/// PKCS#7 `signedData` content type. Not referenced by the code visible in this
/// file, which is why the dead-code lint is silenced.
#[allow(dead_code)]
pub const SIGNED_DATA: &str = "1.2.840.113549.1.7.2";
/// SHA-256 digest algorithm.
pub const SHA256: &str = "2.16.840.1.101.3.4.2.1";
/// SHA-384 digest algorithm.
pub const SHA384: &str = "2.16.840.1.101.3.4.2.2";
/// SHA-512 digest algorithm.
pub const SHA512: &str = "2.16.840.1.101.3.4.2.3";
/// RSA signature with SHA-256.
pub const RSA_SHA256: &str = "1.2.840.113549.1.1.11";
/// RSA signature with SHA-384.
pub const RSA_SHA384: &str = "1.2.840.113549.1.1.12";
/// RSA signature with SHA-512.
pub const RSA_SHA512: &str = "1.2.840.113549.1.1.13";
/// Signing-time attribute.
pub const SIGNING_TIME: &str = "1.2.840.113549.1.9.5";
/// Timestamp-token attribute.
pub const TIMESTAMP_TOKEN: &str = "1.2.840.113549.1.9.16.2.14";
}
/// Summary of a parsed signature blob, as produced by `parse_and_verify_signature`.
#[derive(Debug, Clone)]
pub struct EnhancedSignatureInfo {
/// Container format label; `parse_and_verify_signature` always sets `"PKCS#7/CMS"`.
pub format: String,
/// Length of the signature blob in bytes.
pub size: usize,
/// Human-readable digest algorithm name, or `"Unknown"` when none was identified.
pub digest_algorithm: String,
/// Human-readable signature algorithm name, or `"Unknown"` when none was identified.
pub signature_algorithm: String,
/// `true` only when a verification attempt against the signed content succeeded.
pub is_verified: bool,
/// Messages describing why parsing or verification did not succeed.
pub verification_errors: Vec<String>,
/// Certificates discovered in the signature blob.
pub certificates: Vec<CertificateInfo>,
/// Number of signers reported by whichever parser handled the blob.
pub signer_count: usize,
/// Signing-time / timestamp-token markers, when any were found.
pub timestamp_info: Option<TimestampInfo>,
}
/// Display-oriented description of one certificate found in a signature.
#[derive(Debug, Clone)]
pub struct CertificateInfo {
/// Subject name, rendered as text.
pub subject: String,
/// Issuer name, rendered as text.
pub issuer: String,
/// Serial number, rendered as text.
pub serial_number: String,
/// Start of the validity period as text; the CMS parsing path in this file
/// fills in `"Unknown"`.
pub not_before: String,
/// End of the validity period as text; the CMS parsing path in this file
/// fills in `"Unknown"`.
pub not_after: String,
/// Validity flag. The CMS parsing path in this file sets it to `true`
/// unconditionally, so it is not a trust decision.
pub is_valid: bool,
}
/// Timestamp-related markers detected in a signature blob.
#[derive(Debug, Clone)]
pub struct TimestampInfo {
/// Set when a signing-time attribute OID was found. The code in this file
/// stores a `"Present (position: N)"` marker rather than a decoded time.
pub signing_time: Option<String>,
/// Set by `extract_timestamp_info` whenever either marker is present.
/// NOTE(review): no cryptographic timestamp validation is visible in this file.
pub is_verified: bool,
/// Set to `"TSA present"` when a timestamp-token attribute OID was found.
pub timestamp_authority: Option<String>,
}
/// Parses a PKCS#7/CMS signature blob and, when the signed content is supplied,
/// attempts to verify the signature against it.
///
/// Parsing is first delegated to `crate::cms_parser::parse_cms_signature`. If that
/// fails, the function falls back to the byte-pattern scan in
/// `parse_pkcs7_structure` and to `verify_signature_data`.
///
/// # Arguments
/// * `signature_bytes` - the raw signature blob.
/// * `signed_data` - the content the signature is expected to cover; pass `None`
///   to parse without attempting verification.
///
/// # Returns
/// An [`EnhancedSignatureInfo`] describing what was found. Parsing and
/// verification problems are reported through `verification_errors` and
/// `is_verified`, not through the `Err` variant.
///
/// # Errors
/// Every path in this function returns `Ok`; the `Result` wrapper is kept for
/// interface compatibility.
// The two parsing strategies are kept inline, hence the length.
#[allow(clippy::too_many_lines)]
pub fn parse_and_verify_signature(
    signature_bytes: &[u8],
    signed_data: Option<&[u8]>,
) -> Result<EnhancedSignatureInfo> {
    trace!("Parsing signature: {} bytes", signature_bytes.len());
    let mut info = EnhancedSignatureInfo {
        format: "PKCS#7/CMS".to_string(),
        size: signature_bytes.len(),
        digest_algorithm: "Unknown".to_string(),
        signature_algorithm: "Unknown".to_string(),
        is_verified: false,
        verification_errors: Vec::new(),
        certificates: Vec::new(),
        signer_count: 0,
        timestamp_info: None,
    };

    if signature_bytes.is_empty() {
        info.verification_errors
            .push("Empty signature data".to_string());
        return Ok(info);
    }

    match crate::cms_parser::parse_cms_signature(signature_bytes) {
        Ok(cms_info) => {
            debug!("Successfully parsed CMS signature");
            info.signer_count = cms_info.signers.len();

            // Only the first signer is reported and verified.
            let first_signer = cms_info.signers.first();
            if let Some(signer) = first_signer {
                info.digest_algorithm.clone_from(&signer.digest_algorithm);
                info.signature_algorithm
                    .clone_from(&signer.signature_algorithm);
                if let Some(ref pk) = signer.public_key {
                    debug!(
                        "Extracted {} public key: {} bits",
                        pk.algorithm, pk.key_size
                    );
                }
            }

            for cert in &cms_info.certificates {
                info.certificates.push(CertificateInfo {
                    subject: cert.subject.clone(),
                    issuer: cert.issuer.clone(),
                    serial_number: cert.serial_number.clone(),
                    // This path does not read validity dates from the CMS
                    // parser output, so the period is reported as unknown and
                    // `is_valid` is not a real validity check.
                    not_before: "Unknown".to_string(),
                    not_after: "Unknown".to_string(),
                    is_valid: true,
                });
            }

            if let Some(data) = signed_data {
                match first_signer {
                    Some(signer) => match signer.public_key {
                        Some(ref public_key) => {
                            match crate::cms_parser::verify_with_public_key(
                                public_key,
                                data,
                                &signer.signature,
                                &signer.digest_algorithm,
                            ) {
                                Ok(true) => {
                                    info.is_verified = true;
                                    debug!("Signature verification successful!");
                                }
                                Ok(false) => {
                                    info.verification_errors
                                        .push("Signature verification failed".to_string());
                                }
                                Err(e) => {
                                    info.verification_errors
                                        .push(format!("Verification error: {e}"));
                                }
                            }
                        }
                        None => {
                            info.verification_errors
                                .push("No public key found for verification".to_string());
                        }
                    },
                    // Verification was requested but there is nothing to verify
                    // against; say so instead of returning an unverified result
                    // with an empty error list.
                    None => {
                        info.verification_errors
                            .push("No signer found for verification".to_string());
                    }
                }
            }

            // The byte scanner is reused here only to detect timestamp markers.
            let pkcs7_info = parse_pkcs7_structure(signature_bytes);
            if let Some(timestamp) = extract_timestamp_info(&pkcs7_info) {
                info.timestamp_info = Some(timestamp);
            }
        }
        Err(e) => {
            warn!("CMS parsing failed, falling back to manual parsing: {e}");
            let pkcs7_info = parse_pkcs7_structure(signature_bytes);
            info.digest_algorithm
                .clone_from(&pkcs7_info.digest_algorithm);
            info.signature_algorithm
                .clone_from(&pkcs7_info.signature_algorithm);
            info.signer_count = pkcs7_info.signer_count;

            // Candidates come from a heuristic scan, so some of them are
            // expected not to decode as certificates; those are logged and skipped.
            for cert_der in &pkcs7_info.certificates {
                match Certificate::from_der(cert_der) {
                    Ok(cert) => info.certificates.push(extract_certificate_info(&cert)),
                    Err(e) => {
                        warn!("Failed to parse certificate: {e}");
                    }
                }
            }

            if let Some(timestamp) = extract_timestamp_info(&pkcs7_info) {
                info.timestamp_info = Some(timestamp);
            }

            if let Some(data) = signed_data {
                if verify_signature_data(&pkcs7_info, data) {
                    info.is_verified = true;
                } else {
                    info.verification_errors
                        .push("Signature verification failed".to_string());
                }
            }
        }
    }

    Ok(info)
}
/// Intermediate result of the heuristic byte scan performed by `parse_pkcs7_structure`.
struct Pkcs7Info {
/// Digest algorithm label, or `"Unknown"` when no known digest OID was found.
digest_algorithm: String,
/// Signature algorithm label, or `"Unknown"` when no known signature OID was found.
signature_algorithm: String,
/// Signer count as estimated by the scanner.
signer_count: usize,
/// Byte ranges of the input that look like DER-encoded certificates.
certificates: Vec<Vec<u8>>,
/// Initialised empty by `parse_pkcs7_structure` and not read by the code in this file.
#[allow(dead_code)]
signature_value: Vec<u8>,
/// Copy of the scanned input, kept for later OID searches.
raw_data: Vec<u8>,
}
/// Heuristically scans a signature blob for algorithm OIDs and embedded certificates.
///
/// This does not decode the ASN.1 structure. It searches for the DER encodings
/// of known OIDs anywhere in `data` and for byte runs shaped like a
/// `SEQUENCE` with a two-byte length that itself begins with a `SEQUENCE`.
///
/// Returns a [`Pkcs7Info`] whose algorithm labels stay `"Unknown"` when nothing
/// matches. `signer_count` is `1` if at least one certificate candidate was
/// found and `0` otherwise.
fn parse_pkcs7_structure(data: &[u8]) -> Pkcs7Info {
    let mut info = Pkcs7Info {
        digest_algorithm: "Unknown".to_string(),
        signature_algorithm: "Unknown".to_string(),
        signer_count: 0,
        certificates: Vec::new(),
        signature_value: Vec::new(),
        raw_data: data.to_vec(),
    };

    // Digest algorithm: the first OID found wins, tried in this fixed order.
    if let Some(pos) = find_oid_pattern(data, oids::SHA256) {
        info.digest_algorithm = "SHA-256".to_string();
        debug!("Found SHA-256 digest algorithm at position {pos}");
    } else if let Some(pos) = find_oid_pattern(data, oids::SHA384) {
        info.digest_algorithm = "SHA-384".to_string();
        debug!("Found SHA-384 digest algorithm at position {pos}");
    } else if let Some(pos) = find_oid_pattern(data, oids::SHA512) {
        info.digest_algorithm = "SHA-512".to_string();
        debug!("Found SHA-512 digest algorithm at position {pos}");
    }

    // Signature algorithm: same first-match rule, expressed as a lookup table.
    let signature_table = [
        (oids::RSA_SHA256, "RSA with SHA-256"),
        (oids::RSA_SHA384, "RSA with SHA-384"),
        (oids::RSA_SHA512, "RSA with SHA-512"),
    ];
    let matched_label = signature_table
        .iter()
        .find(|(oid, _)| find_oid_pattern(data, oid).is_some())
        .map(|(_, label)| *label);
    if let Some(label) = matched_label {
        info.signature_algorithm = label.to_string();
    }

    // Certificate candidates: 0x30 0x82 <hi> <lo> followed by `len` bytes.
    let scan_end = data.len().saturating_sub(4);
    let mut pos = 0;
    while pos < scan_end {
        let step = match &data[pos..] {
            [0x30, 0x82, hi, lo, ..] => {
                let len = (usize::from(*hi) << 8) | usize::from(*lo);
                let end = pos + 4 + len;
                if len > 300 && len < 2000 && end <= data.len() {
                    let candidate = &data[pos..end];
                    // The content must itself open with a SEQUENCE tag.
                    if candidate.len() > 100 && candidate[4] == 0x30 {
                        info.certificates.push(candidate.to_vec());
                        debug!("Found potential certificate at position {pos}, length {len}");
                    }
                    // A plausible length skips the whole element, whether or
                    // not it was accepted as a certificate.
                    4 + len
                } else {
                    1
                }
            }
            _ => 1,
        };
        pos += step;
    }

    info.signer_count = usize::from(!info.certificates.is_empty());

    debug!(
        "Parsed PKCS#7: {} certificates, {} signers, digest: {}, signature: {}",
        info.certificates.len(),
        info.signer_count,
        info.digest_algorithm,
        info.signature_algorithm
    );
    info
}
/// Locates the DER encoding of an object identifier inside a byte buffer.
///
/// `oid_str` is the OID in dotted-decimal form. Returns the offset of the first
/// occurrence of its DER encoding in `data`, or `None` when the OID string
/// cannot be parsed or encoded, or when the encoding does not occur.
fn find_oid_pattern(data: &[u8], oid_str: &str) -> Option<usize> {
    let needle = ObjectIdentifier::new(oid_str)
        .ok()
        .and_then(|parsed| parsed.to_der().ok())?;
    data.windows(needle.len())
        .position(|candidate| candidate == needle.as_slice())
}
/// Builds a display-oriented [`CertificateInfo`] from a decoded certificate.
///
/// All textual fields come from the `Display` rendering of the corresponding
/// `tbs_certificate` members; `is_valid` is delegated to `is_certificate_valid`.
fn extract_certificate_info(cert: &Certificate) -> CertificateInfo {
    let body = &cert.tbs_certificate;
    let period = &body.validity;

    let subject = body.subject.to_string();
    let issuer = body.issuer.to_string();
    let serial_number = body.serial_number.to_string();
    let not_before = period.not_before.to_string();
    let not_after = period.not_after.to_string();
    let is_valid = is_certificate_valid(period);

    CertificateInfo {
        subject,
        issuer,
        serial_number,
        not_before,
        not_after,
        is_valid,
    }
}
/// Reports whether the current system time lies within the certificate's
/// validity period.
///
/// Both bounds are inclusive: the certificate counts as valid at exactly
/// `not_before` and at exactly `not_after`. This checks the time window only;
/// it does not check the chain, revocation, or key usage.
fn is_certificate_valid(validity: &Validity) -> bool {
    let now = std::time::SystemTime::now();
    let starts = validity.not_before.to_system_time();
    let expires = validity.not_after.to_system_time();
    starts <= now && now <= expires
}
/// Detects signing-time and timestamp-token markers in the scanned signature bytes.
///
/// The search looks for the DER encodings of the two attribute OIDs anywhere in
/// `pkcs7.raw_data`; the attribute values themselves are not decoded.
///
/// Returns `None` when neither OID is present. Otherwise returns a
/// [`TimestampInfo`] in which `signing_time` holds a `"Present (position: N)"`
/// marker and/or `timestamp_authority` holds `"TSA present"`.
fn extract_timestamp_info(pkcs7: &Pkcs7Info) -> Option<TimestampInfo> {
    let raw = pkcs7.raw_data.as_slice();

    let signing_time = find_oid_pattern(raw, oids::SIGNING_TIME).map(|pos| {
        debug!("Found signing time attribute at position {pos}");
        // Checked slicing: the OID may sit close to the end of the buffer, in
        // which case there is no full window to dump and nothing is traced.
        if let Some(time_data) = raw.get(pos + 11..pos + 30) {
            trace!("Time data near signing time OID: {:02x?}", time_data);
        }
        format!("Present (position: {pos})")
    });

    let timestamp_authority = find_oid_pattern(raw, oids::TIMESTAMP_TOKEN).map(|pos| {
        debug!("Found timestamp token at position {pos}");
        "TSA present".to_string()
    });

    if signing_time.is_none() && timestamp_authority.is_none() {
        return None;
    }

    Some(TimestampInfo {
        signing_time,
        // NOTE(review): this flag is set on mere presence of a marker; no
        // cryptographic validation of the timestamp happens here. Kept as-is
        // for compatibility with existing consumers.
        is_verified: true,
        timestamp_authority,
    })
}
/// Placeholder for signature verification in the fallback parsing path.
///
/// Hashes `signed_data` with the digest named in `pkcs7.digest_algorithm` and
/// logs the digest length, but performs no signature check. It always returns
/// `false`, immediately so when the digest algorithm is not one of SHA-256,
/// SHA-384 or SHA-512.
fn verify_signature_data(pkcs7: &Pkcs7Info, signed_data: &[u8]) -> bool {
    debug!("Signature verification not yet implemented");

    let algorithm = pkcs7.digest_algorithm.as_str();
    let digest_len = if algorithm == "SHA-256" {
        Sha256::digest(signed_data).len()
    } else if algorithm == "SHA-384" {
        Sha384::digest(signed_data).len()
    } else if algorithm == "SHA-512" {
        Sha512::digest(signed_data).len()
    } else {
        return false;
    };

    debug!(
        "Computed {} digest: {} bytes",
        pkcs7.digest_algorithm,
        digest_len
    );

    // No comparison against the signature is performed yet.
    false
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An OID encoding embedded after two padding bytes is reported at offset 2.
    #[test]
    fn test_oid_pattern_finding() {
        let encoded = ObjectIdentifier::new(oids::SHA256)
            .unwrap()
            .to_der()
            .unwrap();
        let padding = [0x00u8, 0x00];
        let haystack = [&padding[..], encoded.as_slice(), &padding[..]].concat();

        assert_eq!(find_oid_pattern(&haystack, oids::SHA256), Some(2));
    }

    /// An empty blob yields an unverified result carrying at least one error message.
    #[test]
    fn test_empty_signature_parsing() {
        let outcome = parse_and_verify_signature(&[], None);
        assert!(outcome.is_ok());

        let report = outcome.unwrap();
        assert!(!report.is_verified);
        assert!(!report.verification_errors.is_empty());
    }
}