#![allow(non_upper_case_globals)]
#![allow(clippy::unused_unit)]
use crate::{
interfaces::statics::{
decrypt_hybrid_header as core_decrypt_hybrid_header,
decrypt_symmetric_block as core_decrypt_symmetric_block,
encrypt_hybrid_header as core_encrypt_hybrid_header,
encrypt_symmetric_block as core_encrypt_symmetric_block, ClearTextHeader,
},
PublicKey, UserPrivateKey,
};
use abe_policy::Attribute;
use cosmian_crypto_core::{
symmetric_crypto::{aes_256_gcm_pure::Aes256GcmCrypto, Metadata, SymmetricCrypto},
KeyTrait,
};
use pyo3::{exceptions::PyTypeError, pyfunction, PyResult};
/// Size limit (`1 << 30`, i.e. 1 GiB when counted in bytes) supplied as the
/// const generic argument to the core symmetric block encryption and
/// decryption functions used by this module.
///
/// NOTE(review): presumably the maximum clear-text size, in bytes, accepted
/// for a single block — confirm against the core block functions.
pub const MAX_CLEAR_TEXT_SIZE: usize = 1 << 30;
/// Reads the encrypted header size stored at the start of `encrypted_bytes`.
///
/// The first four bytes are interpreted as a big-endian `u32`; any bytes
/// after them are ignored.
///
/// # Errors
///
/// Returns a `PyTypeError` when fewer than four bytes are supplied.
#[pyfunction]
pub fn get_encrypted_header_size(encrypted_bytes: Vec<u8>) -> PyResult<u32> {
    match encrypted_bytes.get(..4) {
        // The sub-slice is exactly four bytes long here; the conversion error
        // is still propagated rather than unwrapped.
        Some(size_prefix) => Ok(u32::from_be_bytes(size_prefix.try_into()?)),
        None => Err(PyTypeError::new_err(
            "Encrypted value must be at least 4-bytes long",
        )),
    }
}
/// Builds an encrypted hybrid header for the given policy attributes.
///
/// `metadata_bytes`, `policy_bytes` and `attributes_bytes` are JSON documents
/// (the attributes being a JSON list of [`Attribute`]); `public_key_bytes` is
/// the serialized public key.
///
/// Returns a pair `(symmetric_key_bytes, encrypted_header_bytes)`.
///
/// # Errors
///
/// Returns a `PyTypeError` when one of the JSON inputs cannot be
/// deserialized, and propagates errors raised while parsing the public key
/// or encrypting the header.
#[pyfunction]
pub fn encrypt_hybrid_header(
    metadata_bytes: Vec<u8>,
    policy_bytes: Vec<u8>,
    attributes_bytes: Vec<u8>,
    public_key_bytes: Vec<u8>,
) -> PyResult<(Vec<u8>, Vec<u8>)> {
    // Inputs are decoded in declaration order so that the first invalid one
    // is the one reported.
    let meta = serde_json::from_slice(&metadata_bytes)
        .map_err(|e| PyTypeError::new_err(format!("Error deserializing metadata: {e}")))?;
    let access_policy = serde_json::from_slice(&policy_bytes)
        .map_err(|e| PyTypeError::new_err(format!("Error deserializing policy: {e}")))?;
    let policy_attributes: Vec<Attribute> = serde_json::from_slice(&attributes_bytes)
        .map_err(|e| PyTypeError::new_err(format!("Error deserializing attributes: {e}")))?;
    let pk = PublicKey::try_from_bytes(&public_key_bytes)?;

    let header = core_encrypt_hybrid_header::<Aes256GcmCrypto>(
        &access_policy,
        &pk,
        &policy_attributes,
        Some(&meta),
    )?;

    let symmetric_key_bytes = header.symmetric_key.to_bytes().to_vec();
    Ok((symmetric_key_bytes, header.header_bytes))
}
/// Decrypts an encrypted hybrid header with a user decryption key.
///
/// Returns a pair `(symmetric_key_bytes, metadata_bytes)` where the metadata
/// is re-serialized with its `try_to_bytes` method.
///
/// # Errors
///
/// Propagates errors raised while parsing the user key or decrypting the
/// header, and returns a `PyTypeError` when the metadata cannot be
/// serialized.
#[pyfunction]
pub fn decrypt_hybrid_header(
    user_decryption_key_bytes: Vec<u8>,
    encrypted_header_bytes: Vec<u8>,
) -> PyResult<(Vec<u8>, Vec<u8>)> {
    let user_key = UserPrivateKey::try_from_bytes(&user_decryption_key_bytes)?;
    let header: ClearTextHeader<Aes256GcmCrypto> =
        core_decrypt_hybrid_header::<Aes256GcmCrypto>(&user_key, &encrypted_header_bytes)?;

    let metadata_bytes = header
        .meta_data
        .try_to_bytes()
        .map_err(|e| PyTypeError::new_err(format!("Serialize metadata failed: {e}")))?;
    let symmetric_key_bytes = header.symmetric_key.to_bytes().to_vec();

    Ok((symmetric_key_bytes, metadata_bytes))
}
/// Encrypts one block of plaintext with the AES-256-GCM symmetric key.
///
/// `uid_bytes` and `block_number` are forwarded unchanged to the core block
/// encryption routine together with the plaintext.
///
/// # Errors
///
/// Returns a `PyTypeError` when `symmetric_key_bytes` is not a valid key, and
/// propagates errors raised by the core encryption routine.
#[pyfunction]
pub fn encrypt_hybrid_block(
    symmetric_key_bytes: Vec<u8>,
    uid_bytes: Vec<u8>,
    block_number: usize,
    plaintext_bytes: Vec<u8>,
) -> PyResult<Vec<u8>> {
    type SymmetricKey = <Aes256GcmCrypto as SymmetricCrypto>::Key;

    let key = SymmetricKey::try_from_bytes(&symmetric_key_bytes)
        .map_err(|e| PyTypeError::new_err(format!("Deserialize symmetric key failed: {e}")))?;

    let ciphertext = core_encrypt_symmetric_block::<Aes256GcmCrypto, MAX_CLEAR_TEXT_SIZE>(
        &key,
        &uid_bytes,
        block_number,
        &plaintext_bytes,
    )?;
    Ok(ciphertext)
}
/// Decrypts one encrypted block with the AES-256-GCM symmetric key.
///
/// `uid_bytes` and `block_number` are forwarded unchanged to the core block
/// decryption routine together with the encrypted bytes.
///
/// # Errors
///
/// Returns a `PyTypeError` when `symmetric_key_bytes` is not a valid key, and
/// propagates errors raised by the core decryption routine.
#[pyfunction]
pub fn decrypt_hybrid_block(
    symmetric_key_bytes: Vec<u8>,
    uid_bytes: Vec<u8>,
    block_number: usize,
    encrypted_bytes: Vec<u8>,
) -> PyResult<Vec<u8>> {
    type SymmetricKey = <Aes256GcmCrypto as SymmetricCrypto>::Key;

    let key = SymmetricKey::try_from_bytes(&symmetric_key_bytes)
        .map_err(|e| PyTypeError::new_err(format!("Deserialize symmetric key failed: {e}")))?;

    let cleartext = core_decrypt_symmetric_block::<Aes256GcmCrypto, MAX_CLEAR_TEXT_SIZE>(
        &key,
        &uid_bytes,
        block_number,
        &encrypted_bytes,
    )?;
    Ok(cleartext)
}
/// Encrypts `plaintext` in one call: builds the hybrid header, encrypts the
/// plaintext as block number 0, and concatenates the result.
///
/// The returned bytes are laid out as:
///
/// 1. the encrypted header length as a 4-byte big-endian `u32`,
/// 2. the encrypted header,
/// 3. the encrypted block.
///
/// `metadata_bytes`, `policy_bytes` and `attributes_bytes` are JSON
/// documents; the `uid` field of the metadata is passed as the UID of the
/// block encryption.
///
/// # Errors
///
/// Returns a `PyTypeError` when the metadata cannot be deserialized or when
/// the encrypted header is too large for its length to fit in the 4-byte
/// prefix, and propagates any error from [`encrypt_hybrid_header`] or
/// [`encrypt_hybrid_block`].
#[pyfunction]
pub fn encrypt(
    metadata_bytes: Vec<u8>,
    policy_bytes: Vec<u8>,
    attributes_bytes: Vec<u8>,
    public_key_bytes: Vec<u8>,
    plaintext: Vec<u8>,
) -> PyResult<Vec<u8>> {
    // The metadata is parsed here only to obtain the UID used for the block
    // encryption; the raw bytes are handed to `encrypt_hybrid_header` as is.
    let metadata: Metadata = serde_json::from_slice(&metadata_bytes)
        .map_err(|e| PyTypeError::new_err(format!("Error deserializing metadata: {e}")))?;

    let (symmetric_key_bytes, header_bytes) = encrypt_hybrid_header(
        metadata_bytes,
        policy_bytes,
        attributes_bytes,
        public_key_bytes,
    )?;

    // A plain `as u32` cast would silently truncate an oversized length and
    // yield an output that cannot be decoded, so the conversion is checked.
    let header_size = u32::try_from(header_bytes.len()).map_err(|e| {
        PyTypeError::new_err(format!(
            "Encrypted header is too large for a 4-byte length prefix: {e}"
        ))
    })?;

    let ciphertext = encrypt_hybrid_block(symmetric_key_bytes, metadata.uid, 0, plaintext)?;

    let mut encrypted = Vec::<u8>::with_capacity(4 + header_bytes.len() + ciphertext.len());
    encrypted.extend_from_slice(&header_size.to_be_bytes());
    encrypted.extend_from_slice(&header_bytes);
    encrypted.extend_from_slice(&ciphertext);
    Ok(encrypted)
}
/// Decrypts bytes laid out as a 4-byte big-endian header length, followed by
/// the encrypted header, followed by a single encrypted block.
///
/// The header is decrypted with `user_decryption_key_bytes`; the symmetric
/// key and the metadata UID recovered from it are then used to decrypt the
/// remaining bytes as block number 0.
///
/// # Errors
///
/// Returns a `PyTypeError` when the input is shorter than four bytes, when
/// the declared header size exceeds the bytes actually available, or when
/// the decrypted metadata cannot be deserialized. Errors from
/// [`decrypt_hybrid_header`] and [`decrypt_hybrid_block`] are propagated.
#[pyfunction]
pub fn decrypt(user_decryption_key_bytes: Vec<u8>, encrypted_bytes: Vec<u8>) -> PyResult<Vec<u8>> {
    // Only the length prefix is needed to read the header size, so at most
    // four bytes are copied instead of cloning the whole input.
    let prefix_len = encrypted_bytes.len().min(4);
    let header_size = get_encrypted_header_size(encrypted_bytes[..prefix_len].to_vec())?;

    // The declared size comes from the input itself: validate it against the
    // real buffer length (and guard the addition) so that truncated or
    // malformed input raises an exception instead of an out-of-bounds panic.
    let header_end = usize::try_from(header_size)
        .ok()
        .and_then(|size| size.checked_add(4))
        .filter(|&end| end <= encrypted_bytes.len())
        .ok_or_else(|| {
            PyTypeError::new_err(format!(
                "Encrypted value is too short to contain a {header_size}-byte header"
            ))
        })?;

    let header = encrypted_bytes[4..header_end].to_vec();
    let ciphertext = encrypted_bytes[header_end..].to_vec();

    let (symmetric_key_bytes, metadata_bytes) =
        decrypt_hybrid_header(user_decryption_key_bytes, header)?;
    let metadata = Metadata::try_from_bytes(&metadata_bytes)
        .map_err(|e| PyTypeError::new_err(format!("Error deserializing metadata: {e}")))?;

    decrypt_hybrid_block(symmetric_key_bytes, metadata.uid, 0, ciphertext)
}