use cosmian_cover_crypt::{Error, UserSecretKey, XEnc, api::Covercrypt, traits::KemAc};
use cosmian_crypto_core::{
Aes256Gcm, Dem, FixedSizeCBytes, Instantiable, Nonce, SymmetricKey,
bytes_ser_de::{Deserializer, Serializable, Serializer},
};
use cosmian_kmip::kmip_2_1::{
kmip_objects::Object,
kmip_operations::{Decrypt, DecryptResponse},
kmip_types::{CryptographicAlgorithm, CryptographicParameters, UniqueIdentifier},
};
use cosmian_logger::{debug, trace};
use zeroize::Zeroizing;
use super::user_key::unwrap_user_decryption_key_object;
use crate::{
crypto::DecryptionSystem,
error::{CryptoError, result::CryptoResult},
};
pub struct CovercryptDecryption {
cover_crypt: Covercrypt,
usk_uid: String,
usk_bytes: Zeroizing<Vec<u8>>,
}
impl CovercryptDecryption {
pub fn instantiate(
cover_crypt: Covercrypt,
user_decryption_key_uid: &str,
user_decryption_key: &Object,
) -> Result<Self, CryptoError> {
trace!("instantiate entering");
let (user_decryption_key_bytes, _attributes) =
unwrap_user_decryption_key_object(user_decryption_key)?;
debug!(
"Instantiated hybrid CoverCrypt decipher for user decryption key id: \
{user_decryption_key_uid}"
);
Ok(Self {
cover_crypt,
usk_uid: user_decryption_key_uid.into(),
usk_bytes: user_decryption_key_bytes,
})
}
fn single_decrypt(
&self,
encrypted_bytes: &[u8],
ad: Option<&[u8]>,
usk: &UserSecretKey,
) -> CryptoResult<Zeroizing<Vec<u8>>> {
trace!("decrypt: ad: {ad:?}");
let mut de = Deserializer::new(encrypted_bytes);
trace!("encrypted_bytes len: {}", encrypted_bytes.len());
let enc = XEnc::read(&mut de)?;
trace!("encrypted_header parsed");
let seed = self.cover_crypt.decaps(usk, &enc)?.ok_or_else(|| {
Error::OperationNotPermitted("insufficient rights to open encapsulation".to_owned())
})?;
let key = SymmetricKey::derive(&seed, b"Covercrypt AEAD key")?;
let ctx = de.finalize();
let ptx = aead_decrypt(&key, &ctx, ad)?;
debug!(
"Decrypted data with user key {} of len (Plain/Enc): {}/{}",
&self.usk_uid,
ptx.len(),
enc.length(),
);
Ok(ptx)
}
fn bulk_decrypt(
&self,
encrypted_bytes: &[u8],
ad: Option<&[u8]>,
usk: &UserSecretKey,
) -> Result<Zeroizing<Vec<u8>>, CryptoError> {
let mut de = Deserializer::new(encrypted_bytes);
let mut ser = Serializer::new();
let nb_chunks = {
let len = de.read_leb128_u64()?;
ser.write_leb128_u64(len)?;
usize::try_from(len).map_err(|e| {
CryptoError::Kmip(format!(
"size of vector is too big for architecture: {len} bytes. Error: {e:?}"
))
})?
};
for _ in 0..nb_chunks {
let ctx = de.read_vec_as_ref()?;
let ptx = self.single_decrypt(ctx, ad, usk)?;
ser.write_vec(&ptx)?;
}
Ok(ser.finalize())
}
}
impl DecryptionSystem for CovercryptDecryption {
fn decrypt(&self, request: &Decrypt) -> Result<DecryptResponse, CryptoError> {
let usk = UserSecretKey::deserialize(&self.usk_bytes).map_err(|e| {
CryptoError::Kmip(format!(
"cover crypt decrypt: failed recovering the user key: {e}"
))
})?;
let ctx = request.data.as_ref().ok_or_else(|| {
CryptoError::Kmip("The decryption request should contain encrypted data".to_owned())
})?;
let ptx = if let Some(CryptographicParameters {
cryptographic_algorithm: Some(CryptographicAlgorithm::CoverCryptBulk),
..
}) = request.cryptographic_parameters
{
self.bulk_decrypt(
ctx.as_slice(),
request.authenticated_encryption_additional_data.as_deref(),
&usk,
)?
} else {
self.single_decrypt(
ctx.as_slice(),
request.authenticated_encryption_additional_data.as_deref(),
&usk,
)?
};
Ok(DecryptResponse {
unique_identifier: UniqueIdentifier::TextString(self.usk_uid.clone()),
data: Some(ptx),
correlation_value: None,
})
}
}
fn aead_decrypt(
key: &SymmetricKey<{ Aes256Gcm::KEY_LENGTH }>,
ctx: &[u8],
ad: Option<&[u8]>,
) -> CryptoResult<Zeroizing<Vec<u8>>> {
#![allow(clippy::indexing_slicing)]
if ctx.len() < Aes256Gcm::NONCE_LENGTH {
return Err(CryptoError::Default("encrypted block too short".to_owned()));
}
let nonce = Nonce::try_from_slice(&ctx[..Aes256Gcm::NONCE_LENGTH])?;
Aes256Gcm::new(key)
.decrypt(&nonce, &ctx[Aes256Gcm::NONCE_LENGTH..], ad)
.map(Zeroizing::new)
.map_err(CryptoError::from)
}