use super::{
certstore::TrustAmount,
error::{Error, PgpError, VerificationError},
};
use sequoia_openpgp::{
parse::Parse as _,
policy::StandardPolicy,
serialize::{Serialize as _, SerializeInto as _},
};
use tracing::warn;
pub mod error;
/// Cipher suites that can be selected when generating a new certificate.
///
/// Each variant is translated to the `sequoia_openpgp::cert::CipherSuite`
/// variant of the same name by `CertBuilder::set_cipher_suite`.
#[derive(Default, Copy, Clone)]
pub enum CipherSuite {
/// The default suite.
#[default]
Cv25519,
/// Selects the upstream `RSA4k` suite.
RSA4k,
}
/// Newtype wrapper around `sequoia_openpgp::Fingerprint`.
///
/// When parsed from a string (see the `FromStr` impl) only V4 and V6
/// fingerprints are accepted.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
pub struct Fingerprint(pub(crate) sequoia_openpgp::Fingerprint);
impl Fingerprint {
pub fn to_hex(&self) -> String {
self.0.to_hex()
}
}
impl std::fmt::Display for Fingerprint {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl std::str::FromStr for Fingerprint {
type Err = PgpError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match sequoia_openpgp::Fingerprint::from_str(s) {
Ok(
fp @ sequoia_openpgp::Fingerprint::V4(_) | fp @ sequoia_openpgp::Fingerprint::V6(_),
) => Ok(Self(fp)),
Ok(_) => Err(Self::Err::Error(
"Incompatible fingerprint version".to_string(),
)),
Err(e) => Err(Self::Err::from(e)),
}
}
}
impl std::fmt::Debug for Fingerprint {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.0)
}
}
/// Newtype wrapper around `sequoia_openpgp::cert::Cert`.
///
/// May hold public material only or also secret material; see
/// `Cert::cert_type`.
#[derive(Clone, Debug, PartialEq)]
pub struct Cert(pub(crate) sequoia_openpgp::cert::Cert);
/// Newtype wrapper around `sequoia_openpgp::cert::ValidCert`, i.e. a
/// certificate bound to a policy. Obtained through `Cert::validate`.
#[derive(Clone, Debug)]
pub struct ValidCert<'a>(pub(crate) sequoia_openpgp::cert::ValidCert<'a>);
/// Policy used by `Cert::validate`; wraps sequoia's `StandardPolicy`.
#[derive(Debug, Default, Clone)]
pub struct ValidCertPolicy<'a>(StandardPolicy<'a>);
impl<'a> ValidCert<'a> {
pub fn fingerprint(&self) -> Fingerprint {
Fingerprint(self.0.fingerprint())
}
pub fn keys(&self) -> Vec<Key> {
self.0
.keys()
.map(|ka| Key {
fingerprint: Fingerprint(ka.key().fingerprint()),
})
.collect()
}
pub fn userids(&self) -> Vec<String> {
self.0
.userids()
.map(|uid| uid.userid().to_string())
.collect()
}
pub fn cert_type(&self) -> CertType {
if self.0.cert().is_tsk() {
CertType::Secret
} else {
CertType::Public
}
}
pub fn expiration_time(&self) -> Option<std::time::SystemTime> {
self.0.primary_key().key_expiration_time()
}
pub fn trust_amount(&self, certifier: &Cert, userid: &str) -> TrustAmount {
let certifications = super::certstore::active_certification(
&self.0,
std::iter::once(sequoia_openpgp::packet::UserID::from(userid)),
certifier.0.primary_key().key().role_as_unspecified(),
);
if certifications.iter().any(|(_, certification)| {
certification
.as_ref()
.is_some_and(|c| c.trust_signature().unwrap_or((0, 120)).1 == 120)
}) {
TrustAmount::Full
} else {
TrustAmount::None
}
}
pub fn revocation_status(&self) -> RevocationStatus {
RevocationStatus::from_sequoia(self.0.revocation_status())
}
pub fn get_primary_email(&self) -> Result<Option<String>, PgpError> {
self.0
.primary_userid()
.map_err(PgpError::from)?
.userid()
.email()
.map_err(PgpError::from)
.map(|s| s.map(String::from))
}
}
/// Renders as `<primary user ID> [<hex fingerprint>]`, with a placeholder
/// text when the primary user ID is not available.
impl std::fmt::Display for ValidCert<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self.0.primary_userid() {
            Ok(user_id) => write!(f, "{}", user_id.userid())?,
            Err(_) => write!(f, "Missing or invalid user ID")?,
        }
        let hex = self.fingerprint().to_hex();
        write!(f, " [{}]", hex)
    }
}
impl Cert {
pub fn fingerprint(&self) -> Fingerprint {
Fingerprint(self.0.fingerprint())
}
pub fn validate<'a>(&'a self, policy: &'a ValidCertPolicy) -> Result<ValidCert<'a>, PgpError> {
self.0
.with_policy(&policy.0, None)
.map(ValidCert)
.map_err(|e| {
PgpError::Error(format!(
"Invalid key: no valid self-signature was found. {e}"
))
})
}
/// Consumes the certificate and returns it with new self-signatures that
/// carry `expiration_time` as key expiration time.
///
/// New signatures are created, with `signer`, for:
/// - every subkey (a subkey binding),
/// - the primary key (a direct-key signature),
/// - every user ID (a user ID binding).
///
/// Each new signature is built from a template: the binding signature that
/// is valid under `StandardPolicy` at the current time or, failing that, the
/// first self-signature of the component. The signing work runs inside
/// `tokio::task::spawn_blocking`.
///
/// # Errors
///
/// Fails when a component has no self-signature to use as template, when
/// building or creating a signature fails, when the new packets cannot be
/// inserted, or when the blocking task cannot be joined.
pub async fn set_expiration_time(
mut self,
signer: crate::openpgp::keystore::Key,
expiration_time: Option<std::time::SystemTime>,
) -> Result<Self, Error> {
use sequoia_openpgp::packet::signature::SignatureBuilder;
let mut signer = signer.inner;
tokio::task::spawn_blocking(move || -> Result<_, Error> {
// A single timestamp is used both to look up the current binding
// signatures and as creation time of all new signatures.
let now = std::time::SystemTime::now();
let policy = StandardPolicy::new();
let mut signatures = Vec::new();
// 1. Re-bind every subkey with the new expiration time.
for ka in self.0.keys().subkeys() {
signatures.push(
ka.key()
.bind(
&mut signer,
&self.0,
SignatureBuilder::from(
ka.binding_signature(&policy, now)
.or_else(|_| {
ka.self_signatures()
.next()
.ok_or(PgpError::from("no subkey binding signature"))
})?
.clone(),
)
.set_signature_creation_time(now)
.map_err(PgpError::from)?
.set_key_expiration_time(ka.key(), expiration_time)
.map_err(PgpError::from)?,
)
.map_err(PgpError::from)?,
);
}
// 2. Direct-key signature on the primary key. The template's type is
// overridden with `DirectKey`.
signatures.push(
SignatureBuilder::from(
self.0
.primary_key()
.binding_signature(&policy, now)
.or_else(|_| {
self.0
.primary_key()
.self_signatures()
.next()
.ok_or(PgpError::from("no primary key signature"))
})?
.clone(),
)
.set_type(sequoia_openpgp::types::SignatureType::DirectKey)
.set_signature_creation_time(now)
.map_err(PgpError::from)?
.set_key_expiration_time(self.0.primary_key().key(), expiration_time)
.map_err(PgpError::from)?
.sign_direct_key(&mut signer, None)
.map_err(PgpError::from)?,
);
// 3. Re-bind every user ID; the expiration set here is computed relative
// to the primary key.
for ua in self.0.userids() {
signatures.push(
ua.userid()
.bind(
&mut signer,
&self.0,
SignatureBuilder::from(
ua.binding_signature(&policy, now)
.or_else(|_| {
ua.self_signatures()
.next()
.ok_or(PgpError::from("no user ID binding signature"))
})?
.clone(),
)
.set_signature_creation_time(now)
.map_err(PgpError::from)?
.set_key_expiration_time(self.0.primary_key().key(), expiration_time)
.map_err(PgpError::from)?,
)
.map_err(PgpError::from)?,
);
}
// Merge all freshly created signatures into the certificate.
self.0 = self.0.insert_packets(signatures).map_err(PgpError::from)?.0;
Ok(self)
})
.await?
}
pub fn from_bytes(data: impl AsRef<[u8]>) -> Result<Self, PgpError> {
Ok(Self(
sequoia_openpgp::cert::Cert::from_bytes(data.as_ref()).map_err(PgpError::from)?,
))
}
/// Tells whether the certificate carries secret key material.
pub fn cert_type(&self) -> CertType {
    match self.0.is_tsk() {
        true => CertType::Secret,
        false => CertType::Public,
    }
}
/// Tells whether the secret material of the primary key is encrypted.
///
/// # Errors
///
/// Fails when the primary key cannot be converted into a key with secret
/// parts.
pub fn is_encrypted(&self) -> Result<bool, PgpError> {
    let primary = self.0.primary_key().key().clone();
    let with_secret = primary.parts_into_secret().map_err(PgpError::from)?;
    Ok(with_secret.secret().is_encrypted())
}
/// Revocation status of the certificate under `StandardPolicy` at the
/// current time.
pub fn revocation_status(&self) -> RevocationStatus {
    let policy = StandardPolicy::new();
    let status = self.0.revocation_status(&policy, None);
    RevocationStatus::from_sequoia(status)
}
/// Creates a revocation signature for this certificate and returns it
/// ASCII-armored.
///
/// The signature is produced with `signer` inside
/// `tokio::task::spawn_blocking`, using `code` and `reason` as the reason
/// for revocation.
///
/// # Errors
///
/// Fails when the blocking task cannot be joined, when signing fails, or
/// when the signature cannot be serialized.
pub async fn generate_rev_sig(
    &self,
    signer: crate::openpgp::keystore::Key,
    code: ReasonForRevocation,
    reason: String,
) -> Result<Vec<u8>, Error> {
    let cert = self.0.clone();
    let mut signer = signer.inner;
    let task = tokio::task::spawn_blocking(move || -> Result<_, PgpError> {
        let upstream_code = code.into_sequoia();
        cert.revoke(&mut signer, upstream_code, reason.as_bytes())
            .map_err(PgpError::from)
    });
    let revocation = task.await??;
    let armored = serialize_revocation_signature(&self.0, revocation)?;
    Ok(armored)
}
pub fn revoke<R: std::io::Read + Send + Sync>(&self, signature: R) -> Result<Self, PgpError> {
let mut ppr = sequoia_openpgp::parse::PacketParserBuilder::from_reader(signature)
.and_then(|ppb| ppb.buffer_unread_content().build())
.map_err(|_| error::RevocationError::NoSignature)?;
let mut revocations = Vec::new();
while let sequoia_openpgp::parse::PacketParserResult::Some(pp) = ppr {
let (packet, next_ppr) = pp
.recurse()
.map_err(|e| error::RevocationError::ParsingError(e.to_string()))?;
ppr = next_ppr;
if let sequoia_openpgp::Packet::Signature(sig) = packet
&& matches!(
sig.typ(),
sequoia_openpgp::types::SignatureType::CertificationRevocation
| sequoia_openpgp::types::SignatureType::KeyRevocation
| sequoia_openpgp::types::SignatureType::SubkeyRevocation
)
{
revocations.push(sig);
}
}
if revocations.is_empty() {
return Err(error::RevocationError::NoSignature.into());
}
let revoked_cert = Self(
self.0
.clone()
.insert_packets(revocations)
.map_err(PgpError::from)?
.0,
);
if let RevocationStatus::Revoked(_) = revoked_cert.revocation_status() {
Ok(revoked_cert)
} else {
Err(error::RevocationError::WrongSignature.into())
}
}
/// Re-attaches secret key material taken from `key_store` to this
/// certificate.
///
/// For every key of the certificate (primary key and subkeys) the keystore
/// is queried by fingerprint and the secret key is exported. The resulting
/// packets are then merged into the certificate, which is returned.
///
/// # Errors
///
/// Returns a [`PgpError`] when the keystore lookup fails, when a key has no
/// entry in the keystore, when a secret key cannot be exported, or when the
/// packets cannot be inserted into the certificate.
pub async fn update_secret_material(
    self,
    key_store: &mut crate::openpgp::keystore::KeyStore,
) -> Result<Cert, PgpError> {
    let mut packets: Vec<sequoia_openpgp::Packet> = Vec::new();
    let primary_fingerprint = self.0.primary_key().key().fingerprint();
    // Text spliced between "for" and " the certificate" in error messages:
    // empty for the primary key, " the subkey '<fp>' of" for a subkey.
    let primary_or_subkey_text = |fingerprint: &sequoia_openpgp::Fingerprint| {
        if fingerprint == &primary_fingerprint {
            "".to_string()
        } else {
            format!(" the subkey '{fingerprint}' of")
        }
    };
    for fingerprint in self.0.keys().map(|ka| ka.key().fingerprint()) {
        if key_store
            .inner
            .find_key_async(fingerprint.clone().into())
            .await
            .map_err(PgpError::from)?
            .is_empty()
        {
            // The subkey text goes right after "for" and the primary
            // fingerprint inside the quotes, as in the export error below.
            return Err(PgpError::Error(format!(
                "No secret material was found in the keystore for{} \
                 the certificate '{primary_fingerprint}'",
                primary_or_subkey_text(&fingerprint)
            )));
        }
        let secret_key = key_store.export(&fingerprint).await.map_err(|_| {
            PgpError::Error(format!(
                "Secret material for{} the certificate '{primary_fingerprint}' \
                 cannot be exported from the keystore",
                primary_or_subkey_text(&fingerprint)
            ))
        })?;
        // The packet role must match the position of the key in the
        // certificate.
        if fingerprint == primary_fingerprint {
            packets.push(secret_key.role_into_primary().into())
        } else {
            packets.push(secret_key.role_into_subordinate().into())
        }
    }
    let (cert, _modified) = self.0.insert_packets(packets).map_err(PgpError::from)?;
    Ok(Cert(cert))
}
pub fn public(&self) -> CertDataView<&sequoia_openpgp::cert::Cert> {
CertDataView(&self.0)
}
/// View used to export the certificate including its secret material.
///
/// # Errors
///
/// Fails when the certificate holds no secret key material.
pub fn secret(&self) -> Result<CertDataView<SecretCertDataView<'_>>, PgpError> {
    if self.0.is_tsk() {
        Ok(CertDataView(SecretCertDataView(&self.0)))
    } else {
        Err(PgpError::from(
            "Secret material is missing, cannot generate a revocation signature for a public key.",
        ))
    }
}
}
/// Renders as `<primary user ID> [<hex fingerprint>]`.
///
/// The user ID is resolved under `StandardPolicy` at the current time; a
/// placeholder text is printed when the certificate is not valid or has no
/// valid primary user ID.
impl std::fmt::Display for Cert {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let policy = StandardPolicy::new();
        let label = match self.0.with_policy(&policy, None) {
            Err(_) => String::from("Invalid key: no valid self-signature was found"),
            Ok(valid_cert) => match valid_cert.primary_userid() {
                Err(_) => String::from("Missing or no valid user ID"),
                Ok(user_id) => user_id.userid().to_string(),
            },
        };
        let hex = self.fingerprint().to_hex();
        write!(f, "{} [{}]", label, hex)
    }
}
/// Export view over certificate data.
///
/// `T` selects what is exported: `&sequoia_openpgp::cert::Cert` for the
/// public part, `SecretCertDataView` for the certificate with its secret
/// material. Conversions to `Vec<u8>` and `AsciiArmored` are provided via
/// `TryFrom`.
#[derive(Debug, Clone, Copy)]
pub struct CertDataView<T>(T);
/// Byte buffer holding ASCII-armored OpenPGP data.
#[derive(Debug)]
pub struct AsciiArmored(pub(crate) Vec<u8>);
impl AsRef<[u8]> for AsciiArmored {
fn as_ref(&self) -> &[u8] {
&self.0
}
}
impl std::ops::Deref for AsciiArmored {
type Target = [u8];
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl From<AsciiArmored> for Vec<u8> {
fn from(value: AsciiArmored) -> Self {
value.0
}
}
/// Marker wrapper around a borrowed certificate, selecting the export
/// variants that include secret material. Created by `Cert::secret`.
#[derive(Debug, Copy, Clone)]
pub struct SecretCertDataView<'a>(&'a sequoia_openpgp::cert::Cert);
impl TryFrom<CertDataView<&sequoia_openpgp::cert::Cert>> for Vec<u8> {
type Error = PgpError;
fn try_from(value: CertDataView<&sequoia_openpgp::cert::Cert>) -> Result<Self, Self::Error> {
value.0.to_vec().map_err(Self::Error::from)
}
}
impl TryFrom<CertDataView<SecretCertDataView<'_>>> for Vec<u8> {
type Error = PgpError;
fn try_from(value: CertDataView<SecretCertDataView<'_>>) -> Result<Self, Self::Error> {
value.0.0.as_tsk().to_vec().map_err(Self::Error::from)
}
}
impl TryFrom<CertDataView<&sequoia_openpgp::cert::Cert>> for AsciiArmored {
type Error = PgpError;
fn try_from(value: CertDataView<&sequoia_openpgp::cert::Cert>) -> Result<Self, Self::Error> {
Ok(Self(value.0.armored().to_vec().map_err(Self::Error::from)?))
}
}
impl TryFrom<CertDataView<SecretCertDataView<'_>>> for AsciiArmored {
type Error = PgpError;
fn try_from(value: CertDataView<SecretCertDataView<'_>>) -> Result<Self, Self::Error> {
Ok(Self(
value
.0
.0
.as_tsk()
.armored()
.to_vec()
.map_err(Self::Error::from)?,
))
}
}
impl<T> serde::Serialize for CertDataView<T>
where
AsciiArmored: TryFrom<CertDataView<T>>,
<AsciiArmored as TryFrom<CertDataView<T>>>::Error: std::fmt::Display,
CertDataView<T>: Copy,
{
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::Error;
let bytes = AsciiArmored::try_from(*self).map_err(Error::custom)?;
let decoded = std::str::from_utf8(&bytes).map_err(Error::custom)?;
serializer.collect_str(decoded)
}
}
/// Builder for new certificates; a thin wrapper around
/// `sequoia_openpgp::cert::CertBuilder`.
pub struct CertBuilder<'a> {
// The wrapped upstream builder that accumulates the configuration.
builder: sequoia_openpgp::cert::CertBuilder<'a>,
}
/// The default builder is a fresh upstream builder with the `Cv25519`
/// cipher suite selected.
impl Default for CertBuilder<'_> {
    fn default() -> Self {
        let builder = sequoia_openpgp::cert::CertBuilder::new()
            .set_cipher_suite(sequoia_openpgp::cert::CipherSuite::Cv25519);
        Self { builder }
    }
}
impl CertBuilder<'_> {
    /// Creates a builder with the default configuration.
    pub fn new() -> Self {
        Default::default()
    }

    /// Adds a user ID to the certificate being built.
    pub fn add_userid(self, uid: &str) -> Self {
        Self {
            builder: self.builder.add_userid(uid),
        }
    }

    /// Sets the validity period that is forwarded to the upstream builder.
    pub fn set_validity_period(
        self,
        validity_period: impl Into<Option<std::time::Duration>>,
    ) -> Self {
        Self {
            builder: self.builder.set_validity_period(validity_period),
        }
    }

    /// Selects the cipher suite used for key generation.
    pub fn set_cipher_suite(self, cipher: CipherSuite) -> Self {
        let upstream = match cipher {
            CipherSuite::Cv25519 => sequoia_openpgp::cert::CipherSuite::Cv25519,
            CipherSuite::RSA4k => sequoia_openpgp::cert::CipherSuite::RSA4k,
        };
        Self {
            builder: self.builder.set_cipher_suite(upstream),
        }
    }

    /// Sets (or clears, with `None`) the password forwarded to the upstream
    /// builder.
    pub fn set_password(self, password: Option<crate::secret::Secret>) -> Self {
        let password = password.map(|p| p.as_inner().to_owned());
        Self {
            builder: self.builder.set_password(password),
        }
    }

    /// Generates the certificate together with its ASCII-armored revocation
    /// signature.
    ///
    /// An authentication subkey, a signing subkey and an encryption subkey
    /// (transport and storage encryption) are added before generation.
    ///
    /// # Errors
    ///
    /// Fails when key generation or serialization of the revocation
    /// signature fails.
    pub fn generate(self) -> Result<(Cert, Vec<u8>), PgpError> {
        let encryption_flags = sequoia_openpgp::types::KeyFlags::empty()
            .set_transport_encryption()
            .set_storage_encryption();
        let configured = self
            .builder
            .add_authentication_subkey()
            .add_signing_subkey()
            .add_subkey(encryption_flags, None, None);
        let (cert, rev) = configured.generate().map_err(PgpError::from)?;
        let armored_rev = serialize_revocation_signature(&cert, rev)?;
        Ok((Cert(cert), armored_rev))
    }
}
/// Serializes the revocation signature `rev` as an ASCII-armored signature
/// block.
///
/// The armor carries `Comment` headers: first "Revocation certificate for",
/// followed by one comment per armor header of `cert`.
fn serialize_revocation_signature(
    cert: &sequoia_openpgp::Cert,
    rev: sequoia_openpgp::packet::Signature,
) -> Result<Vec<u8>, PgpError> {
    let cert_headers = cert.armor_headers();
    let headers = std::iter::once(("Comment", "Revocation certificate for")).chain(
        cert_headers
            .iter()
            .map(|value| ("Comment", value.as_str())),
    );

    let mut buffer = Vec::new();
    let mut writer = sequoia_openpgp::armor::Writer::with_headers(
        &mut buffer,
        sequoia_openpgp::armor::Kind::Signature,
        headers,
    )
    .map_err(PgpError::from)?;
    let packet = sequoia_openpgp::packet::Packet::Signature(rev);
    packet.serialize(&mut writer).map_err(PgpError::from)?;
    // Finalizing writes the armor footer and releases the buffer borrow.
    writer.finalize().map_err(PgpError::from)?;
    Ok(buffer)
}
/// Parses every certificate found in `data`.
///
/// # Errors
///
/// Fails when the packet parser cannot be created or, at the first
/// certificate that fails to parse, with that certificate's error.
pub fn parse_certs<R: std::io::Read + Send + Sync>(data: R) -> Result<Vec<Cert>, PgpError> {
    let packets =
        sequoia_openpgp::parse::PacketParser::from_reader(data).map_err(PgpError::from)?;
    let mut certs = Vec::new();
    for parsed in sequoia_openpgp::cert::CertParser::from(packets) {
        let cert = parsed.map_err(PgpError::from)?;
        certs.push(Cert(cert));
    }
    Ok(certs)
}
/// Kind of material held by a certificate.
///
/// Displayed as "public" and "private" respectively.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
pub enum CertType {
/// No secret key material (the default).
#[default]
Public,
/// The certificate contains secret key material.
Secret,
}
/// Human readable name of the certificate type.
impl std::fmt::Display for CertType {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let label = match self {
            CertType::Public => "public",
            CertType::Secret => "private",
        };
        f.write_str(label)
    }
}
/// Newtype wrapper around `sequoia_openpgp::packet::Signature`.
#[derive(Debug, Clone)]
pub struct Signature(sequoia_openpgp::packet::Signature);
impl Signature {
    /// Reason for revocation stored in the signature, as a
    /// `(code, message bytes)` pair, or `None` when the signature has none.
    pub fn reason_for_revocation(&self) -> Option<(ReasonForRevocation, &[u8])> {
        let (code, message) = self.0.reason_for_revocation()?;
        Some((ReasonForRevocation::from_sequoia(code), message))
    }
}
/// Revocation status of a certificate; mirrors
/// `sequoia_openpgp::types::RevocationStatus` with owned signatures.
#[derive(Debug, Clone)]
pub enum RevocationStatus {
/// Mirrors the upstream `Revoked` variant, with the reported signatures.
Revoked(Vec<Signature>),
/// Mirrors the upstream `CouldBe` variant, with the reported signatures.
CouldBe(Vec<Signature>),
/// Mirrors the upstream `NotAsFarAsWeKnow` variant.
NotAsFarAsWeKnow,
}
impl RevocationStatus {
fn from_sequoia(value: sequoia_openpgp::types::RevocationStatus) -> Self {
use sequoia_openpgp::types::RevocationStatus::*;
match value {
Revoked(s) => {
RevocationStatus::Revoked(s.into_iter().map(|s| Signature(s.clone())).collect())
}
CouldBe(s) => {
RevocationStatus::CouldBe(s.into_iter().map(|s| Signature(s.clone())).collect())
}
NotAsFarAsWeKnow => RevocationStatus::NotAsFarAsWeKnow,
}
}
}
/// Reason attached to a revocation signature.
///
/// Mirrors `sequoia_openpgp::types::ReasonForRevocation`; each variant maps
/// to the upstream variant of the same name. Displayed using the upstream
/// type's `Display` output.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum ReasonForRevocation {
Unspecified,
KeySuperseded,
KeyCompromised,
KeyRetired,
UIDRetired,
/// Carries the raw `u8` payload of the upstream `Private` variant.
Private(u8),
/// Carries the raw `u8` payload of the upstream `Unknown` variant.
Unknown(u8),
}
/// Displays the reason using the upstream type's `Display` output.
impl std::fmt::Display for ReasonForRevocation {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let upstream = self.clone().into_sequoia();
        write!(f, "{}", upstream)
    }
}
impl ReasonForRevocation {
    /// Converts into the upstream `sequoia_openpgp` representation.
    pub(super) fn into_sequoia(self) -> sequoia_openpgp::types::ReasonForRevocation {
        use sequoia_openpgp::types::ReasonForRevocation::*;
        match self {
            ReasonForRevocation::Unspecified => Unspecified,
            ReasonForRevocation::KeySuperseded => KeySuperseded,
            ReasonForRevocation::KeyCompromised => KeyCompromised,
            ReasonForRevocation::KeyRetired => KeyRetired,
            ReasonForRevocation::UIDRetired => UIDRetired,
            ReasonForRevocation::Private(v) => Private(v),
            ReasonForRevocation::Unknown(v) => Unknown(v),
        }
    }

    /// Converts from the upstream `sequoia_openpgp` representation.
    ///
    /// The upstream enum is non-exhaustive. A variant that this crate does
    /// not know is mapped to [`ReasonForRevocation::Unknown`] carrying its
    /// numeric code, so that signatures parsed from external data can never
    /// trigger a panic here.
    pub(super) fn from_sequoia(val: sequoia_openpgp::types::ReasonForRevocation) -> Self {
        use sequoia_openpgp::types::ReasonForRevocation::*;
        match val {
            Unspecified => Self::Unspecified,
            KeySuperseded => Self::KeySuperseded,
            KeyCompromised => Self::KeyCompromised,
            KeyRetired => Self::KeyRetired,
            UIDRetired => Self::UIDRetired,
            Private(v) => Self::Private(v),
            Unknown(v) => Self::Unknown(v),
            // Future upstream variants: keep the raw reason code.
            other => Self::Unknown(u8::from(other)),
        }
    }
}
/// A key of a certificate, identified by its fingerprint.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Key {
// Fingerprint of this key.
pub(super) fingerprint: Fingerprint,
}
impl Key {
    /// Returns a copy of the key's fingerprint.
    pub fn fingerprint(&self) -> Fingerprint {
        let Self { fingerprint } = self;
        fingerprint.clone()
    }
}
/// Logs a warning when the primary key of `valid_cert` expires less than
/// 90 days from now.
///
/// Nothing is logged for certificates without an expiration time. The
/// message names the first user ID (if any), the fingerprint and the
/// expiration date.
pub(crate) fn warn_if_cert_expires_soon(valid_cert: &sequoia_openpgp::cert::ValidCert) {
    use chrono::{DateTime, Duration, Utc};

    let Some(expiration_time) = valid_cert.primary_key().key_expiration_time() else {
        return;
    };
    let expiration_time = DateTime::<Utc>::from(expiration_time);
    let expires_soon = expiration_time < Utc::now() + Duration::days(90);
    if !expires_soon {
        return;
    }
    warn!(
        "Certificate {}[{}] expires on {}, consider renewing it",
        valid_cert
            .userids()
            .next()
            .map_or_else(String::new, |uid| format!("{} ", uid.userid())),
        valid_cert.fingerprint(),
        expiration_time
    );
}
pub(crate) fn verify_file_signature(
body: &[u8],
signature: &[u8],
cert_store: &super::certstore::CertStore<'_>,
) -> Result<(), PgpError> {
use sequoia_openpgp::parse::{Parse, stream::DetachedVerifierBuilder};
DetachedVerifierBuilder::from_bytes(signature)
.map_err(PgpError::from)?
.with_policy(
&sequoia_openpgp::policy::StandardPolicy::new(),
None,
crate::openpgp::crypto::VerificationHelper { cert_store },
)
.map_err(PgpError::from)?
.verify_bytes(body)
.map_err(|e| match e.downcast() {
Ok(VerificationError::MissingKey { fingerprint }) => {
PgpError::VerificationError(VerificationError::MissingKey { fingerprint })
}
Ok(VerificationError::Error(value)) => {
PgpError::VerificationError(VerificationError::Error(value))
}
Err(value) => PgpError::from(value),
})
}
// Unit tests for fingerprint parsing, user ID listing, certificate export
// and certificate revocation.
#[cfg(test)]
mod tests {
use sequoia_openpgp::{cert::CertBuilder, packet::UserID};
use super::*;
// User IDs used by `test_userids`. The "invalid" one is added to
// certificates as a bare packet, i.e. without a binding signature.
const VALID_USER_ID1: &str = "chuck norris <chuck@roundhouse.org>";
const VALID_USER_ID2: &str = "cn <cn@roundhouse.org>";
const INVALID_USER_ID: &str = "sgt hartman <invalid@roundhouse.org>";
// Parsing accepts V4 and V6 fingerprints, with or without spaces, and
// rejects empty or too short input.
#[test]
fn parse_fingerprint() {
use std::str::FromStr;
assert_eq!(
Fingerprint::from_str("B2E961753ECE0B345E718E74BA6F29C998DDD9BF")
.unwrap()
.0,
sequoia_openpgp::Fingerprint::from_bytes(
4,
&[
0xB2, 0xE9, 0x61, 0x75, 0x3E, 0xCE, 0x0B, 0x34, 0x5E, 0x71, 0x8E, 0x74, 0xBA,
0x6F, 0x29, 0xC9, 0x98, 0xDD, 0xD9, 0xBF
]
)
.unwrap()
);
assert_eq!(
Fingerprint::from_str("B2E9 6175 3ECE 0B34 5E71 8E74 BA6F 29C9 98DD D9BF")
.unwrap()
.0,
sequoia_openpgp::Fingerprint::from_bytes(
4,
&[
0xB2, 0xE9, 0x61, 0x75, 0x3E, 0xCE, 0x0B, 0x34, 0x5E, 0x71, 0x8E, 0x74, 0xBA,
0x6F, 0x29, 0xC9, 0x98, 0xDD, 0xD9, 0xBF
]
)
.unwrap()
);
// NOTE(review): the result of this call is discarded; it only checks that
// building the V6 fingerprint from bytes does not fail.
sequoia_openpgp::Fingerprint::from_bytes(
6,
&[
0xB2, 0xE9, 0x61, 0x75, 0x3E, 0xCE, 0x0B, 0x34, 0x5E, 0x71, 0x8E, 0x74, 0xBA, 0x6F,
0x29, 0xC9, 0x98, 0xDD, 0xD9, 0xBF, 0xB2, 0xE9, 0x61, 0x75, 0x3E, 0xCE, 0x0B, 0x34,
0x5E, 0x71, 0x8E, 0x74,
],
)
.unwrap();
assert_eq!(
Fingerprint::from_str(
"B2E9 6175 3ECE 0B34 5E71 8E74 BA6F 29C9 98DD D9BF B2E9 6175 3ECE 0B34 5E71 8E74"
)
.unwrap()
.0,
sequoia_openpgp::Fingerprint::from_bytes(
6,
&[
0xB2, 0xE9, 0x61, 0x75, 0x3E, 0xCE, 0x0B, 0x34, 0x5E, 0x71, 0x8E, 0x74, 0xBA,
0x6F, 0x29, 0xC9, 0x98, 0xDD, 0xD9, 0xBF, 0xB2, 0xE9, 0x61, 0x75, 0x3E, 0xCE,
0x0B, 0x34, 0x5E, 0x71, 0x8E, 0x74
]
)
.unwrap()
);
assert!(Fingerprint::from_str("").is_err());
assert!(Fingerprint::from_str("aaa").is_err());
assert!(Fingerprint::from_str("aaaa").is_err());
}
// `ValidCert::userids` only lists user IDs that are valid under the policy.
#[test]
fn test_userids() {
let policy = ValidCertPolicy::default();
// One valid user ID.
let (cert, _sig) = CertBuilder::general_purpose(Some(VALID_USER_ID1))
.generate()
.unwrap();
assert_eq!(cert.userids().len(), 1);
assert_eq!(
Cert(cert).validate(&policy).unwrap().userids(),
vec![VALID_USER_ID1]
);
// Two valid user IDs.
let (cert, _sig) = CertBuilder::general_purpose(Some(VALID_USER_ID1))
.add_userid(VALID_USER_ID2)
.generate()
.unwrap();
assert_eq!(cert.userids().len(), 2);
assert_eq!(
Cert(cert).validate(&policy).unwrap().userids(),
vec![VALID_USER_ID1, VALID_USER_ID2]
);
// No user ID at all.
let (cert, _sig) = CertBuilder::general_purpose(None::<UserID>)
.generate()
.unwrap();
let empty_vec: Vec<String> = Vec::new();
assert_eq!(cert.userids().len(), 0);
assert_eq!(Cert(cert).validate(&policy).unwrap().userids(), empty_vec);
// Only a user ID packet inserted without binding signature: not listed.
let (cert, _sig) = CertBuilder::general_purpose(None::<UserID>)
.generate()
.unwrap();
let invalid_user: UserID = INVALID_USER_ID.into();
let (cert, changed) = cert.insert_packets(invalid_user).unwrap();
assert!(changed);
assert_eq!(cert.userids().len(), 1);
assert_eq!(Cert(cert).validate(&policy).unwrap().userids(), empty_vec);
// One valid user ID plus one inserted without binding signature.
let (cert, _sig) = CertBuilder::general_purpose(Some(VALID_USER_ID1))
.generate()
.unwrap();
let invalid_user: UserID = INVALID_USER_ID.into();
let (cert, changed) = cert.insert_packets(invalid_user).unwrap();
assert!(changed);
assert_eq!(cert.userids().len(), 2);
assert_eq!(
Cert(cert).validate(&policy).unwrap().userids(),
vec![VALID_USER_ID1]
);
// A certificate created in the future does not validate now.
let future_time = std::time::SystemTime::now() + std::time::Duration::from_secs(60);
let (cert, _) = CertBuilder::new()
.set_creation_time(future_time)
.add_userid(INVALID_USER_ID)
.generate()
.unwrap();
assert_eq!(cert.userids().len(), 1);
assert!(Cert(cert).validate(&policy).is_err());
}
// Public and secret fixture files for the same certificate.
const PUB: &[u8] =
include_bytes!("../../tests/data/B2E961753ECE0B345E718E74BA6F29C998DDD9BF.pub");
const SEC: &[u8] =
include_bytes!("../../tests/data/B2E961753ECE0B345E718E74BA6F29C998DDD9BF.sec");
// Export round trips: exporting and re-importing yields an equal certificate.
#[test]
fn test_export_public_cert_to_bytes_works() {
let cert = Cert::from_bytes(PUB).unwrap();
let exported_cert: Vec<u8> = cert.public().try_into().unwrap();
let reimported_cert = Cert::from_bytes(&exported_cert).unwrap();
assert_eq!(reimported_cert, cert);
}
#[test]
fn test_export_public_cert_to_ascii_armored_works() {
let cert = Cert::from_bytes(PUB).unwrap();
let exported_cert = serde_json::to_string(&cert.public()).unwrap();
let reimported_cert =
Cert::from_bytes(serde_json::from_str::<String>(&exported_cert).unwrap()).unwrap();
assert_eq!(reimported_cert, cert);
}
#[test]
fn test_export_secret_cert_to_bytes_works() {
let cert = Cert::from_bytes(SEC).unwrap();
let exported_cert: Vec<u8> = cert.secret().unwrap().try_into().unwrap();
let reimported_cert = Cert::from_bytes(&exported_cert).unwrap();
assert_eq!(reimported_cert, cert);
}
#[test]
fn test_export_secret_cert_to_ascii_armored_works() {
let cert = Cert::from_bytes(SEC).unwrap();
let exported_cert = serde_json::to_string(&cert.secret().unwrap()).unwrap();
let reimported_cert =
Cert::from_bytes(serde_json::from_str::<String>(&exported_cert).unwrap()).unwrap();
assert_eq!(reimported_cert, cert);
}
// Exporting the public view of a secret certificate must equal the public
// fixture, i.e. the secret part is stripped.
#[test]
fn test_export_public_part_of_cert_to_bytes_strips_secret_part() {
let public = Cert::from_bytes(PUB).unwrap();
let cert = Cert::from_bytes(SEC).unwrap();
let exported_cert: Vec<u8> = cert.public().try_into().unwrap();
let reimported_cert = Cert::from_bytes(&exported_cert).unwrap();
assert_eq!(reimported_cert, public);
}
#[test]
fn test_export_public_part_of_cert_to_ascii_armored_strips_secret_part() {
let public = Cert::from_bytes(PUB).unwrap();
let cert = Cert::from_bytes(SEC).unwrap();
let exported_cert = serde_json::to_string(&cert.public()).unwrap();
let reimported_cert =
Cert::from_bytes(serde_json::from_str::<String>(&exported_cert).unwrap()).unwrap();
assert_eq!(reimported_cert, public);
}
#[test]
fn test_export_secret_part_of_public_cert_fails() {
let public = Cert::from_bytes(PUB).unwrap();
assert!(public.secret().is_err());
}
// Revocation fixtures: one file named after the fingerprint of `PUB` and
// one named after another fingerprint.
const MATCHING_REV_SIG: &[u8] =
include_bytes!("../../tests/data/B2E961753ECE0B345E718E74BA6F29C998DDD9BF.rev");
const NON_MATCHING_REV_SIG: &[u8] =
include_bytes!("../../tests/data/1EA0292ECBF2457CADAE20E2B94FA6A56D9FA1FB.rev");
// Helper: revoking `PUB` with the given input must yield a revoked cert.
fn assert_cert_gets_revoked(revocation_certificate: &str) {
let public = Cert::from_bytes(PUB).unwrap();
let revoked_cert = public.revoke(std::io::Cursor::new(revocation_certificate));
assert!(
matches!(
revoked_cert.map(|cert| cert.revocation_status()),
Ok(RevocationStatus::Revoked(_))
),
"The certificate RevocationStatus should now be revoked"
);
}
#[test]
fn test_revoke_cert_with_matching_signature_succeeds() {
let revocation_certificate = std::str::from_utf8(MATCHING_REV_SIG).unwrap();
assert_cert_gets_revoked(revocation_certificate);
}
// Text surrounding the armored block must be tolerated.
#[test]
fn test_revoke_cert_with_correct_signature_and_junk_succeeds() {
let revocation_certificate = format!(
"# Some comments\n{}\n\nA suffix.\n-----BEGIN PGP SIGNATURE-----",
std::str::from_utf8(MATCHING_REV_SIG).unwrap()
);
assert_cert_gets_revoked(&revocation_certificate);
}
#[test]
fn test_revoke_cert_with_matching_and_non_matching_signatures_succeeds() {
let revocation_certificate = format!(
"# Matching:\n{}\n\n# Non-matching\n{}",
std::str::from_utf8(MATCHING_REV_SIG).unwrap(),
std::str::from_utf8(NON_MATCHING_REV_SIG).unwrap(),
);
assert_cert_gets_revoked(&revocation_certificate);
}
// Helper: revoking `PUB` with the given input must fail with `expected_err`.
fn assert_cert_revocation_fails(
revocation_certificate: &str,
expected_err: error::RevocationError,
) {
let public = Cert::from_bytes(PUB).unwrap();
let revoked_cert = public.revoke(std::io::Cursor::new(revocation_certificate));
match revoked_cert.unwrap_err() {
PgpError::CertError(error::CertError::RevocationError(error)) => {
assert_eq!(error, expected_err)
}
other => panic!("Expected {expected_err}, got: {other:?}"),
}
}
#[test]
fn test_revoke_cert_with_a_wrong_signature_fails() {
let revocation_certificate = std::str::from_utf8(NON_MATCHING_REV_SIG).unwrap();
assert_cert_revocation_fails(
revocation_certificate,
error::RevocationError::WrongSignature,
);
}
#[test]
fn test_revoke_cert_with_empty_signature_fails() {
let revocation_certificate = "";
assert_cert_revocation_fails(revocation_certificate, error::RevocationError::NoSignature);
}
#[test]
fn test_revoke_cert_with_not_a_signature_fails() {
let revocation_certificate = "not a valid signature";
assert_cert_revocation_fails(revocation_certificate, error::RevocationError::NoSignature);
}
// Armor framing is present but the payload is not a valid packet.
#[test]
fn test_revoke_cert_with_not_a_signature_fails_2() {
let revocation_certificate = "-----BEGIN PGP SIGNATURE-----\n
not a valid signature\n
-----END PGP SIGNATURE-----";
assert_cert_revocation_fails(
revocation_certificate,
error::RevocationError::ParsingError("Malformed packet: Truncated packet".to_string()),
);
}
}