use sequoia_openpgp::serialize::stream::Recipient;
use tracing::{debug, trace, warn};
use super::{
cert::{AsciiArmored, Fingerprint},
certstore::CertStore,
error,
keystore::KeyStore,
};
/// Describes the key a password is being requested for.
///
/// A value of this type is handed to the password callbacks used when
/// unlocking signing and decryption keys, and is rendered for log and error
/// messages by `hint_to_string`.
#[derive(Clone, Debug)]
pub struct PasswordHint {
/// Fingerprint of the key that needs to be unlocked.
pub fingerprint: super::cert::Fingerprint,
/// A user ID of the certificate the key belongs to, when one is available.
pub userid: Option<String>,
/// Fingerprint of the certificate the key belongs to; `None` when no
/// certificate could be associated with the key.
pub fingerprint_primary: Option<super::cert::Fingerprint>,
}
/// Adapter that turns any displayable value into an error type.
///
/// The `Display` output is exactly the output of the wrapped value
/// (formatter flags such as width or padding are passed through untouched),
/// which makes it possible to surface a plain message wherever a
/// `core::error::Error` implementation is required.
#[derive(Debug)]
struct ErrorForward<S>(S);

impl<S> std::fmt::Display for ErrorForward<S>
where
    S: std::fmt::Display,
{
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate explicitly to the inner value's `Display` implementation.
        let Self(inner) = self;
        std::fmt::Display::fmt(inner, formatter)
    }
}

impl<S> core::error::Error for ErrorForward<S> where S: std::fmt::Debug + std::fmt::Display {}
/// Produces an ASCII-armored detached signature over `data`.
///
/// `cert` is the signer used to create the signature. The output is wrapped
/// in an armor of kind `Signature`.
///
/// # Errors
///
/// Any failure while setting up the armor/signing writers, writing the data
/// or finalizing the message is converted into a `PgpError`.
pub(crate) fn sign_detached<T: sequoia_openpgp::crypto::Signer + Send + Sync>(
    data: &[u8],
    cert: T,
) -> Result<AsciiArmored, error::PgpError> {
    use sequoia_openpgp::serialize::stream::{Armorer, Message, Signer};
    use std::io::Write as _;

    let mut armored = Vec::new();
    {
        // Writer stack: signer -> armorer -> in-memory buffer.
        let armorer = Armorer::new(Message::new(&mut armored))
            .kind(sequoia_openpgp::armor::Kind::Signature)
            .build()?;
        let mut signer = Signer::new(armorer, cert)?.detached().build()?;
        signer.write_all(data)?;
        // Finalizing tears down the writer stack and completes the output.
        signer.finalize()?;
    }
    Ok(AsciiArmored(armored))
}
/// Collects the transport-encryption recipients for a set of certificates.
///
/// For every certificate, all keys that are valid under `policy`, supported,
/// alive, not revoked and usable for transport encryption are turned into
/// recipients. The second vector is parallel to the first: it holds, for each
/// recipient, the fingerprint of the certificate the key came from.
///
/// # Errors
///
/// Returns an error as soon as a certificate yields no suitable key.
pub(crate) fn get_recipients<'a>(
    certs: &'a [sequoia_openpgp::Cert],
    policy: &'a impl sequoia_openpgp::policy::Policy,
) -> Result<(Vec<Recipient<'a>>, Vec<sequoia_openpgp::Fingerprint>), error::PgpError> {
    let mut recipients: Vec<Recipient<'a>> = Vec::new();
    let mut owners = Vec::new();
    for cert in certs {
        let collected_before = recipients.len();
        let usable_keys = cert
            .keys()
            .with_policy(policy, None)
            .supported()
            .alive()
            .revoked(false)
            .for_transport_encryption();
        for ka in usable_keys {
            recipients.push(ka.into());
            owners.push(cert.fingerprint());
        }
        // Nothing was added for this certificate: it cannot be encrypted to.
        if recipients.len() == collected_before {
            return Err(error::PgpError::Error(format!(
                "No suitable encryption subkey for {cert}"
            )));
        }
    }
    Ok((recipients, owners))
}
/// A signing key taken from the key store, together with the fingerprint of
/// the certificate it was looked up for.
pub(crate) struct Signer {
/// Key store handle of the signing key.
pub(crate) key: sequoia_keystore::Key,
/// Fingerprint of the certificate the signing key was selected from.
pub(crate) cert_fingerprint: sequoia_openpgp::Fingerprint,
}
impl Signer {
#[tracing::instrument(skip_all, fields(cert=%valid_cert.fingerprint()))]
pub(crate) async fn get<F, Fut>(
valid_cert: &sequoia_openpgp::cert::ValidCert<'_>,
key_store: &mut KeyStore,
password: F,
) -> Result<Self, error::PgpError>
where
F: Fn(PasswordHint) -> Fut,
Fut: std::future::Future<Output = crate::secret::Secret>,
{
let signing_capable_keys = valid_cert
.keys()
.alive()
.revoked(false)
.for_signing()
.map(|ka| ka.key().fingerprint().into())
.collect::<Vec<_>>();
if signing_capable_keys.is_empty() {
return Err(error::PgpError::Error(format!(
"No signing-capable subkey found for the provided certificate: {valid_cert}"
)));
}
let (keys, _) = key_store
.inner
.find_keys_async(&signing_capable_keys)
.await
.map_err(error::PgpError::from)?;
if keys.is_empty() {
return Err(error::PgpError::Error(format!(
"No signing key found for the provided fingerprint: {}",
valid_cert.fingerprint()
)));
}
let mut errors = Vec::new();
for mut key in keys.into_iter() {
let cert_fingerprint = valid_cert.fingerprint();
let hint = PasswordHint {
fingerprint: Fingerprint(key.fingerprint()),
userid: valid_cert
.userids()
.next()
.map(|ka| ka.userid().to_string()),
fingerprint_primary: Some(Fingerprint(cert_fingerprint.clone())),
};
let key_description = hint_to_string(&hint);
trace!("Attempting to unlock key: {key_description}");
match key.locked_async().await {
Ok(sequoia_keystore::Protection::Unlocked) => {
trace!("Key is already unlocked");
return Ok(Self {
key,
cert_fingerprint,
});
}
Ok(sequoia_keystore::Protection::Password(_)) => {
let unlocked = key
.unlock_async(password(hint).await.as_inner().clone())
.await;
if let Ok(()) = unlocked {
trace!("Key unlocked with the provided password");
return Ok(Self {
key,
cert_fingerprint,
});
} else {
let err_msg = format!(
"The provided password failed to unlock key: {key_description}"
);
debug!(err_msg);
errors.push(err_msg);
}
}
Ok(_) => {
trace!("Key is externally protected");
return Ok(Self {
key,
cert_fingerprint,
});
}
Err(e) => {
warn!("Failed to check lock status of key: {key_description}. Reason: {e}");
errors.push(e.to_string());
}
}
}
Err(error::PgpError::Error(if !errors.is_empty() {
errors.join(", ")
} else {
format!("Unable to unlock private key: {valid_cert}")
}))
}
}
/// Helper used when verifying signatures: resolves certificates from the
/// referenced certificate store.
pub(crate) struct VerificationHelper<'cert_store, 'cert_store_ref> {
/// Certificate store searched for the certificates matching a key handle.
pub(crate) cert_store: &'cert_store_ref CertStore<'cert_store>,
}
/// Helper used when decrypting (and verifying) a message: combines a
/// certificate store, a key store and a password callback.
pub(crate) struct DecryptionHelper<'cert_store, 'cert_store_ref, 'key_store_ref, F> {
/// Certificate store searched for the certificates matching a key handle or
/// fingerprint.
pub(crate) cert_store: &'cert_store_ref CertStore<'cert_store>,
/// Key store holding the private keys used for decryption.
pub(crate) key_store: &'key_store_ref mut KeyStore,
/// Callback producing the password for the key described by a `PasswordHint`.
pub(crate) password: F,
}
// Both trait methods delegate to the free functions `get_certs` and `check`
// of this module, which are shared with `DecryptionHelper`.
impl sequoia_openpgp::parse::stream::VerificationHelper for VerificationHelper<'_, '_> {
/// Returns the certificates found in the certificate store for `ids`.
fn get_certs(
&mut self,
ids: &[sequoia_openpgp::KeyHandle],
) -> sequoia_openpgp::Result<Vec<sequoia_openpgp::Cert>> {
get_certs(self.cert_store, ids)
}
/// Inspects the message structure; fails if a signature result is an error.
fn check(
&mut self,
structure: sequoia_openpgp::parse::stream::MessageStructure,
) -> sequoia_openpgp::Result<()> {
check(structure)
}
}
// Both trait methods delegate to the free functions `get_certs` and `check`
// of this module, which are shared with `VerificationHelper`.
impl<F> sequoia_openpgp::parse::stream::VerificationHelper for DecryptionHelper<'_, '_, '_, F> {
/// Returns the certificates found in the certificate store for `ids`.
fn get_certs(
&mut self,
ids: &[sequoia_openpgp::KeyHandle],
) -> sequoia_openpgp::Result<Vec<sequoia_openpgp::Cert>> {
get_certs(self.cert_store, ids)
}
/// Inspects the message structure; fails if a signature result is an error.
fn check(
&mut self,
structure: sequoia_openpgp::parse::stream::MessageStructure,
) -> sequoia_openpgp::Result<()> {
check(structure)
}
}
/// Inspects every layer of a processed message.
///
/// Compression and encryption layers are only traced. For a signature group,
/// the first erroneous verification result (if any) is converted into an
/// error and returned.
///
/// Note that a structure containing no signature group at all also yields
/// `Ok(())`.
fn check(
    structure: sequoia_openpgp::parse::stream::MessageStructure,
) -> sequoia_openpgp::Result<()> {
    use sequoia_openpgp::parse::stream::MessageLayer;

    for layer in structure.into_iter() {
        match layer {
            MessageLayer::Compression { algo } => trace!("Data compressed using {}", algo),
            MessageLayer::Encryption {
                sym_algo,
                aead_algo: Some(aead_algo),
            } => {
                trace!(
                    "Data encrypted and protected using {}/{}",
                    sym_algo, aead_algo
                )
            }
            MessageLayer::Encryption {
                sym_algo,
                aead_algo: None,
            } => trace!("Data encrypted using {}", sym_algo),
            MessageLayer::SignatureGroup { results } => {
                // Only the first failing result is reported.
                if let Some(err) = results.into_iter().find_map(Result::err) {
                    return Err(error::VerificationError::from(err).into());
                }
            }
        }
    }
    Ok(())
}
/// Gathers the certificates the store knows for each of the given key handles.
///
/// Lookups are best effort: a failing lookup is logged as a warning and
/// contributes no certificates, so this function always returns `Ok`.
fn get_certs(
    cert_store: &CertStore<'_>,
    ids: &[sequoia_openpgp::KeyHandle],
) -> sequoia_openpgp::Result<Vec<sequoia_openpgp::Cert>> {
    let mut certs: Vec<sequoia_openpgp::Cert> = Vec::new();
    for key_handle in ids {
        match cert_store.get_certs_by_key_handle(key_handle) {
            Ok(found) => certs.extend(found),
            // The binding name `e` is also the name of the logged field.
            Err(e) => {
                tracing::warn!(?e);
            }
        }
    }
    Ok(certs)
}
impl<F> sequoia_openpgp::parse::stream::DecryptionHelper for DecryptionHelper<'_, '_, '_, F>
where
F: Fn(PasswordHint) -> crate::secret::Secret,
{
#[tracing::instrument(skip_all)]
fn decrypt(
&mut self,
pkesks: &[sequoia_openpgp::packet::PKESK],
_skesks: &[sequoia_openpgp::packet::SKESK],
sym_algo: Option<sequoia_openpgp::types::SymmetricAlgorithm>,
decrypt: &mut dyn FnMut(
Option<sequoia_openpgp::types::SymmetricAlgorithm>,
&sequoia_openpgp::crypto::SessionKey,
) -> bool,
) -> sequoia_openpgp::Result<Option<sequoia_openpgp::Cert>> {
let mut errors = Vec::new();
match self.key_store.inner.decrypt(pkesks) {
Ok((_i, fp, sym_algo, sk)) => {
trace!(fingerprint=%fp, "Decrypted with an unlocked key");
if decrypt(sym_algo, &sk) {
tracing::debug!("Decrypted data with key {fp}");
return Ok(Some(
self.cert_store.get_cert_by_fingerprint(&Fingerprint(fp))?.0,
));
}
}
Err(err) => {
trace!("Unlocking decryption key");
match err.downcast() {
Ok(sequoia_keystore::Error::InaccessibleDecryptionKey(keys)) => {
for key_status in keys.into_iter() {
let pkesk = key_status.pkesk().clone();
let mut key = key_status.into_key();
let protection = key.locked();
match protection {
Ok(sequoia_keystore::Protection::Password(_)) => {
let fingerprint = Fingerprint(key.fingerprint());
let policy = Default::default();
let hint = if let Ok(cert) =
self.cert_store.get_cert_by_fingerprint(&fingerprint)
&& let Ok(vc) = cert.validate(&policy)
{
super::cert::warn_if_cert_expires_soon(&vc.0);
PasswordHint {
fingerprint,
userid: vc.userids().first().cloned(),
fingerprint_primary: Some(vc.fingerprint()),
}
} else {
PasswordHint {
fingerprint,
userid: None,
fingerprint_primary: None,
}
};
let key_description = hint_to_string(&hint);
if let Ok(()) =
key.unlock((self.password)(hint).as_inner().clone())
{
let (sym_algo, sk) = pkesk
.decrypt(&mut key, sym_algo)
.ok_or(error::PgpError::from("failed"))?;
if decrypt(sym_algo, &sk) {
let fp = key.fingerprint();
tracing::debug!(
"Decrypted data with key: {key_description}"
);
return Ok(Some(
self.cert_store
.get_cert_by_fingerprint(&Fingerprint(fp))?
.0,
));
}
} else {
let err_msg = format!(
"The provided password failed to unlock key: {key_description}"
);
debug!(err_msg);
errors.push(err_msg);
}
}
Ok(p) => {
let err_msg =
format!("Unsupported key protection method {p:?}");
debug!(err_msg);
errors.push(err_msg);
}
Err(e) => {
let err_msg = e.to_string();
debug!(err_msg);
errors.push(err_msg);
}
}
}
}
Ok(err) => {
let err_msg = format!("Unsupported key unlock operation ({err})");
debug!(err_msg);
errors.push(err_msg);
}
Err(err) => {
let err_msg = format!("Failed to decrypt using the keystore ({err})");
debug!(err_msg);
errors.push(err_msg);
}
}
}
}
Err(ErrorForward(if !errors.is_empty() {
errors.join(", ")
} else {
format!(
"Unable to find any suitable private key for decryption (expected one of: {:?})",
pkesks
.iter()
.filter_map(|pkesk| pkesk.recipient().map(|recipient| recipient.to_hex()))
.collect::<Vec<_>>()
.join(", ")
)
})
.into())
}
}
/// Renders a [`PasswordHint`] as a short human-readable key description.
///
/// * Without a certificate fingerprint, only the key fingerprint is shown,
///   with a note that the key is not part of a certificate.
/// * Otherwise the user ID (or a placeholder when missing) is followed by the
///   certificate fingerprint; when the key is not the certificate's own key,
///   its fingerprint is appended as a subkey.
pub(super) fn hint_to_string(hint: &PasswordHint) -> String {
    let Some(primary) = hint.fingerprint_primary.as_ref() else {
        return format!("{} (key is not part of a certificate)", hint.fingerprint);
    };
    let userid = hint.userid.as_deref().unwrap_or("--missing user ID--");
    let is_primary_key = primary == &hint.fingerprint;
    if is_primary_key {
        format!("{userid} {primary}")
    } else {
        format!("{userid} {primary} (subkey {})", hint.fingerprint)
    }
}