#[cfg(all(feature = "insecure", feature = "offline"))]
use crate::client::decrypt_global;
#[cfg(feature = "offline")]
use crate::client::encrypt_global;
use crate::data::json::builder::PEPJSONBuilder;
use crate::data::json::data::{EncryptedPEPJSONValue, PEPJSONValue};
use crate::data::json::structure::JSONStructure;
use crate::data::json::utils;
use crate::data::traits::Transcryptable;
use crate::factors::py::contexts::{
PyEncryptionContext, PyPseudonymizationDomain, PyTranscryptionInfo,
};
use crate::factors::TranscryptionInfo;
#[cfg(feature = "offline")]
use crate::keys::py::types::PyGlobalPublicKeys;
#[cfg(all(feature = "insecure", feature = "offline"))]
use crate::keys::py::types::PyGlobalSecretKeys;
use crate::keys::py::types::{PyEncryptionSecret, PyPseudonymizationSecret};
#[cfg(feature = "offline")]
use crate::keys::GlobalPublicKeys;
#[cfg(all(feature = "insecure", feature = "offline"))]
use crate::keys::GlobalSecretKeys;
#[cfg(feature = "batch")]
use crate::transcryptor::transcrypt_batch;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::{PyAny, PyDict, PyList};
use serde_json::Value;
#[pyclass(name = "PEPJSONValue", from_py_object)]
#[derive(Clone)]
pub struct PyPEPJSONValue(pub(crate) PEPJSONValue);
#[pymethods]
impl PyPEPJSONValue {
#[staticmethod]
#[pyo3(name = "from_value")]
fn from_value(value: &Bound<PyAny>) -> PyResult<Self> {
let json_value = python_to_json(value)?;
Ok(Self(PEPJSONValue::from_value(&json_value)))
}
#[pyo3(name = "to_json")]
fn to_json(&self) -> PyResult<Py<PyAny>> {
let json_value = self
.0
.to_value()
.map_err(|e| PyValueError::new_err(format!("Conversion failed: {}", e)))?;
Python::attach(|py| json_to_python(py, &json_value))
}
#[pyo3(name = "structure")]
fn structure(&self) -> PyJSONStructure {
PyJSONStructure(self.0.structure())
}
#[pyo3(name = "pad_to")]
fn pad_to(&self, structure: &PyJSONStructure) -> PyResult<Self> {
self.0
.pad_to(&structure.0)
.map(Self)
.map_err(|e| PyValueError::new_err(format!("Padding failed: {}", e)))
}
}
#[pyclass(name = "EncryptedPEPJSONValue", from_py_object)]
#[derive(Clone)]
pub struct PyEncryptedPEPJSONValue(pub(crate) EncryptedPEPJSONValue);
#[pymethods]
impl PyEncryptedPEPJSONValue {
#[pyo3(name = "structure")]
fn structure(&self) -> PyJSONStructure {
PyJSONStructure(self.0.structure())
}
#[pyo3(name = "transcrypt")]
fn transcrypt(
&self,
from_domain: &PyPseudonymizationDomain,
to_domain: &PyPseudonymizationDomain,
from_session: &PyEncryptionContext,
to_session: &PyEncryptionContext,
pseudonymization_secret: &PyPseudonymizationSecret,
encryption_secret: &PyEncryptionSecret,
) -> PyResult<Self> {
let transcryption_info = TranscryptionInfo::new(
&from_domain.0,
&to_domain.0,
&from_session.0,
&to_session.0,
&pseudonymization_secret.0,
&encryption_secret.0,
);
let transcrypted = self.0.transcrypt(&transcryption_info);
Ok(Self(transcrypted))
}
#[pyo3(name = "to_json")]
fn to_json(&self) -> PyResult<String> {
serde_json::to_string(&self.0)
.map_err(|e| PyValueError::new_err(format!("Serialization failed: {}", e)))
}
#[staticmethod]
#[pyo3(name = "from_json")]
fn from_json(json_str: &str) -> PyResult<Self> {
let value: EncryptedPEPJSONValue = serde_json::from_str(json_str)
.map_err(|e| PyValueError::new_err(format!("Deserialization failed: {}", e)))?;
Ok(Self(value))
}
}
#[pyclass(name = "JSONStructure", from_py_object)]
#[derive(Clone)]
pub struct PyJSONStructure(pub(crate) JSONStructure);
#[pymethods]
impl PyJSONStructure {
fn __repr__(&self) -> String {
format!("{:?}", self.0)
}
fn __eq__(&self, other: &Self) -> bool {
self.0 == other.0
}
#[pyo3(name = "to_json")]
fn to_json(&self) -> PyResult<String> {
serde_json::to_string(&self.0)
.map_err(|e| PyValueError::new_err(format!("Serialization failed: {}", e)))
}
}
#[pyclass(name = "PEPJSONBuilder", skip_from_py_object)]
pub struct PyPEPJSONBuilder {
builder: PEPJSONBuilder,
}
#[pymethods]
impl PyPEPJSONBuilder {
#[new]
fn new() -> Self {
Self {
builder: PEPJSONBuilder::new(),
}
}
#[staticmethod]
#[pyo3(name = "from_json")]
fn from_json(value: &Bound<PyAny>, pseudonyms: Vec<String>) -> PyResult<Self> {
let json_value = python_to_json(value)?;
let pseudonym_refs: Vec<&str> = pseudonyms.iter().map(|s| s.as_str()).collect();
let builder = PEPJSONBuilder::from_json(&json_value, &pseudonym_refs).ok_or_else(|| {
PyValueError::new_err("Invalid object or pseudonym field not a string")
})?;
Ok(Self { builder })
}
#[pyo3(name = "attribute")]
fn attribute<'py>(
slf: Bound<'py, Self>,
key: &str,
value: &Bound<'py, PyAny>,
) -> PyResult<Bound<'py, Self>> {
let json_value = python_to_json(value)?;
let mut borrow = slf.borrow_mut();
borrow.builder = std::mem::take(&mut borrow.builder).attribute(key, json_value);
drop(borrow);
Ok(slf)
}
#[pyo3(name = "pseudonym")]
fn pseudonym<'py>(slf: Bound<'py, Self>, key: &str, value: &str) -> PyResult<Bound<'py, Self>> {
let mut borrow = slf.borrow_mut();
borrow.builder = std::mem::take(&mut borrow.builder).pseudonym(key, value);
drop(borrow);
Ok(slf)
}
#[pyo3(name = "build")]
fn build(&mut self) -> PyPEPJSONValue {
let builder = std::mem::take(&mut self.builder);
PyPEPJSONValue(builder.build())
}
}
#[cfg(feature = "batch")]
#[pyfunction]
#[pyo3(name = "transcrypt_batch")]
pub fn py_transcrypt_batch(
values: Vec<PyEncryptedPEPJSONValue>,
transcryption_info: &PyTranscryptionInfo,
) -> PyResult<Vec<PyEncryptedPEPJSONValue>> {
let mut rng = rand::rng();
let mut rust_values: Vec<EncryptedPEPJSONValue> = values.into_iter().map(|v| v.0).collect();
let info: TranscryptionInfo = transcryption_info.into();
let transcrypted = transcrypt_batch(&mut rust_values, &info, &mut rng)
.map_err(|e| PyValueError::new_err(format!("Batch transcryption failed: {}", e)))?;
Ok(transcrypted
.into_vec()
.into_iter()
.map(PyEncryptedPEPJSONValue)
.collect())
}
#[cfg(feature = "batch")]
#[pyfunction]
#[pyo3(name = "transcrypt_json_batch")]
pub fn py_transcrypt_json_batch(
values: Vec<PyEncryptedPEPJSONValue>,
transcryption_info: &PyTranscryptionInfo,
) -> PyResult<Vec<PyEncryptedPEPJSONValue>> {
let mut rng = rand::rng();
let mut rust_values: Vec<EncryptedPEPJSONValue> = values.into_iter().map(|v| v.0).collect();
let info: TranscryptionInfo = transcryption_info.into();
let transcrypted = transcrypt_batch(&mut rust_values, &info, &mut rng)
.map_err(|e| PyValueError::new_err(format!("Batch transcryption failed: {}", e)))?;
Ok(transcrypted
.into_vec()
.into_iter()
.map(PyEncryptedPEPJSONValue)
.collect())
}
fn python_to_json(value: &Bound<PyAny>) -> PyResult<Value> {
if value.is_none() {
Ok(Value::Null)
} else if let Ok(b) = value.extract::<bool>() {
Ok(Value::Bool(b))
} else if let Ok(i) = value.extract::<i64>() {
Ok(Value::Number(i.into()))
} else if let Ok(f) = value.extract::<f64>() {
Ok(serde_json::Number::from_f64(f)
.map(Value::Number)
.unwrap_or(Value::Null))
} else if let Ok(s) = value.extract::<String>() {
Ok(Value::String(s))
} else if let Ok(list) = value.cast::<PyList>() {
let mut arr = Vec::new();
for item in list.iter() {
arr.push(python_to_json(&item)?);
}
Ok(Value::Array(arr))
} else if let Ok(dict) = value.cast::<PyDict>() {
let mut obj = serde_json::Map::new();
for (key, val) in dict.iter() {
let key_str = key.extract::<String>()?;
obj.insert(key_str, python_to_json(&val)?);
}
Ok(Value::Object(obj))
} else {
Err(PyValueError::new_err(
"Unsupported Python type for JSON conversion",
))
}
}
pub(crate) fn json_to_python(py: Python, value: &Value) -> PyResult<Py<PyAny>> {
match value {
Value::Null => Ok(py.None()),
Value::Bool(b) => {
let bound = (*b).into_pyobject(py)?;
Ok(bound.as_any().clone().unbind())
}
Value::Number(n) => {
if let Some(i) = n.as_i64() {
let bound = i.into_pyobject(py)?;
Ok(bound.as_any().clone().unbind())
} else if let Some(u) = n.as_u64() {
let bound = u.into_pyobject(py)?;
Ok(bound.as_any().clone().unbind())
} else if let Some(f) = n.as_f64() {
let bound = f.into_pyobject(py)?;
Ok(bound.as_any().clone().unbind())
} else {
Err(PyValueError::new_err("Invalid number"))
}
}
Value::String(s) => {
let bound = s.as_str().into_pyobject(py)?;
Ok(bound.as_any().clone().unbind())
}
Value::Array(arr) => {
let list = PyList::empty(py);
for item in arr {
list.append(json_to_python(py, item)?)?;
}
Ok(list.into())
}
Value::Object(obj) => {
let dict = PyDict::new(py);
for (key, val) in obj {
dict.set_item(key, json_to_python(py, val)?)?;
}
Ok(dict.into())
}
}
}
#[cfg(feature = "offline")]
#[pyfunction]
#[pyo3(name = "encrypt_global")]
pub fn py_encrypt_global(
value: &PyPEPJSONValue,
global_keys: &PyGlobalPublicKeys,
) -> PyEncryptedPEPJSONValue {
let mut rng = rand::rng();
let keys = GlobalPublicKeys {
pseudonym: global_keys.pseudonym.0 .0.into(),
attribute: global_keys.attribute.0 .0.into(),
};
PyEncryptedPEPJSONValue(encrypt_global(&value.0, &keys, &mut rng))
}
#[cfg(all(feature = "insecure", feature = "offline"))]
#[pyfunction]
#[pyo3(name = "decrypt_global")]
pub fn py_decrypt_global(
encrypted: &PyEncryptedPEPJSONValue,
global_secret_keys: &PyGlobalSecretKeys,
) -> PyResult<PyPEPJSONValue> {
let keys = GlobalSecretKeys {
pseudonym: global_secret_keys.pseudonym.0 .0.into(),
attribute: global_secret_keys.attribute.0 .0.into(),
};
#[cfg(feature = "elgamal3")]
let decrypted = decrypt_global(&encrypted.0, &keys)
.ok_or_else(|| PyValueError::new_err("Decryption failed: key mismatch"))?;
#[cfg(not(feature = "elgamal3"))]
let decrypted = decrypt_global(&encrypted.0, &keys);
Ok(PyPEPJSONValue(decrypted))
}
#[pyfunction]
#[pyo3(name = "bool_to_byte")]
pub fn py_bool_to_byte(b: bool) -> u8 {
utils::bool_to_byte(b)
}
#[pyfunction]
#[pyo3(name = "byte_to_bool")]
pub fn py_byte_to_bool(byte: u8) -> PyResult<bool> {
utils::byte_to_bool(byte).map_err(|e| PyValueError::new_err(e.to_string()))
}
#[pyfunction]
#[pyo3(name = "number_to_bytes")]
pub fn py_number_to_bytes(n: f64) -> [u8; 9] {
let num = serde_json::Number::from_f64(n).unwrap_or(serde_json::Number::from(0));
utils::number_to_bytes(&num)
}
#[pyfunction]
#[pyo3(name = "bytes_to_number")]
pub fn py_bytes_to_number(bytes: [u8; 9]) -> f64 {
let num = utils::bytes_to_number(&bytes);
num.as_f64().unwrap_or(0.0)
}
#[pyfunction]
#[pyo3(name = "unify_structures")]
pub fn py_unify_structures(structures: Vec<PyJSONStructure>) -> PyResult<PyJSONStructure> {
let rust_structures: Vec<JSONStructure> = structures.into_iter().map(|s| s.0).collect();
crate::data::json::structure::unify_structures(&rust_structures)
.map(PyJSONStructure)
.map_err(|e| PyValueError::new_err(format!("Unification failed: {}", e)))
}
pub fn register(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyPEPJSONValue>()?;
m.add_class::<PyEncryptedPEPJSONValue>()?;
m.add_class::<PyJSONStructure>()?;
m.add_class::<PyPEPJSONBuilder>()?;
#[cfg(feature = "batch")]
{
m.add_function(wrap_pyfunction!(py_transcrypt_batch, m)?)?;
m.add_function(wrap_pyfunction!(py_transcrypt_json_batch, m)?)?;
}
#[cfg(feature = "offline")]
m.add_function(wrap_pyfunction!(py_encrypt_global, m)?)?;
#[cfg(all(feature = "insecure", feature = "offline"))]
m.add_function(wrap_pyfunction!(py_decrypt_global, m)?)?;
m.add_function(wrap_pyfunction!(py_bool_to_byte, m)?)?;
m.add_function(wrap_pyfunction!(py_byte_to_bool, m)?)?;
m.add_function(wrap_pyfunction!(py_number_to_bytes, m)?)?;
m.add_function(wrap_pyfunction!(py_bytes_to_number, m)?)?;
m.add_function(wrap_pyfunction!(py_unify_structures, m)?)?;
Ok(())
}