import unittest
from libpep.keys import (
make_global_keys,
make_session_keys,
)
from libpep.factors import (
EncryptionSecret,
PseudonymizationSecret,
TranscryptionInfo,
PseudonymizationInfo,
PseudonymizationDomain,
EncryptionContext,
)
from libpep.client import (
encrypt,
encrypt_batch,
decrypt,
)
from libpep.data import json as pepjson
from libpep.data.json import transcrypt_json_batch, PEPJSONBuilder
class TestJSONTranscryption(unittest.TestCase):
    """Single-record JSON transcryption using a record made by ``PEPJSONBuilder``."""

    def test_json_transcryption_with_builder(self):
        """Encrypt one record, transcrypt it across domains, and compare the decrypted JSON.

        The record is first round-tripped (encrypt then decrypt) to confirm it
        comes back unchanged, then transcrypted from ``clinic-a`` to
        ``clinic-b``. After transcryption the ordinary fields must still match,
        while ``user_id`` must no longer equal its original value.
        """
        keys = make_global_keys()
        pseudonymization_secret = PseudonymizationSecret(b"pseudo-secret")
        encryption_secret = EncryptionSecret(b"encryption-secret")
        source_domain = PseudonymizationDomain("clinic-a")
        target_domain = PseudonymizationDomain("clinic-b")
        context = EncryptionContext("session-1")
        # NOTE(review): element 1 of the global keys is presumably the secret
        # half of the key pair -- confirm against libpep.keys.
        keys_for_session = make_session_keys(keys[1], context, encryption_secret)

        # Fields that are expected to survive transcryption unchanged.
        plain_fields = {"name": "Alice", "age": 30, "active": True}
        # "user_id" is the only key listed in from_json's second argument; the
        # final assertion treats it as the pseudonym.
        record = PEPJSONBuilder.from_json(
            {"user_id": "user-67890", **plain_fields}, ["user_id"]
        ).build()
        ciphertext = encrypt(record, keys_for_session)

        # Plain round trip: every field, including the id, comes back as given.
        roundtrip = decrypt(ciphertext, keys_for_session).to_json()
        self.assertEqual(roundtrip["user_id"], "user-67890")
        for field, expected in plain_fields.items():
            self.assertEqual(roundtrip[field], expected)

        # The same context is passed twice, so only the domain changes.
        moved = ciphertext.transcrypt(
            source_domain,
            target_domain,
            context,
            context,
            pseudonymization_secret,
            encryption_secret,
        )
        self.assertNotEqual(
            str(ciphertext),
            str(moved),
            "Encrypted values should be different after transcryption",
        )

        after = decrypt(moved, keys_for_session).to_json()
        for field, expected in plain_fields.items():
            self.assertEqual(after[field], expected)
        self.assertNotEqual(
            after["user_id"],
            "user-67890",
            "Pseudonym should be different after cross-domain transcryption",
        )
class TestJSONBatchTranscryption(unittest.TestCase):
    """Batch transcryption of encrypted JSON records from ``domain-a`` to ``domain-b``.

    All tests share the same secrets, domains and encryption context, so the
    session keys and the ``TranscryptionInfo`` are built once per test in
    ``setUp`` instead of being repeated in every test method.
    """

    def setUp(self):
        """Create fresh session keys and the transcryption parameters for each test."""
        global_keys = make_global_keys()
        pseudo_secret = PseudonymizationSecret(b"pseudo-secret")
        enc_secret = EncryptionSecret(b"encryption-secret")
        domain_a = PseudonymizationDomain("domain-a")
        domain_b = PseudonymizationDomain("domain-b")
        session = EncryptionContext("session-1")
        # NOTE(review): element 1 of the global keys is presumably the secret
        # half of the key pair -- confirm against libpep.keys.
        self.session_keys = make_session_keys(global_keys[1], session, enc_secret)
        # The same context is passed twice, so only the domain changes.
        self.transcryption_info = TranscryptionInfo(
            domain_a,
            domain_b,
            session,
            session,
            pseudo_secret,
            enc_secret,
        )

    @staticmethod
    def _build_record(data, pseudonym_keys):
        """Build a PEP JSON record from ``data``, passing ``pseudonym_keys`` to the builder."""
        return PEPJSONBuilder.from_json(data, pseudonym_keys).build()

    def _decrypt_all(self, encrypted_values):
        """Decrypt every value with the session keys and return the results as JSON."""
        return [decrypt(v, self.session_keys).to_json() for v in encrypted_values]

    def test_json_batch_transcryption_same_structure(self):
        """Records with identical structure are transcrypted together and keep their data."""
        data1 = {"patient_id": "patient-001", "diagnosis": "Flu", "temperature": 38.5}
        data2 = {"patient_id": "patient-002", "diagnosis": "Cold", "temperature": 37.2}
        encrypted1 = encrypt(
            self._build_record(data1, ["patient_id"]), self.session_keys
        )
        encrypted2 = encrypt(
            self._build_record(data2, ["patient_id"]), self.session_keys
        )
        self.assertEqual(
            encrypted1.structure(),
            encrypted2.structure(),
            "Records should have same structure",
        )

        transcrypted_batch = transcrypt_json_batch(
            [encrypted1, encrypted2], self.transcryption_info
        )
        self.assertEqual(len(transcrypted_batch), 2)
        self.assertNotEqual(
            str([encrypted1, encrypted2]),
            str(transcrypted_batch),
            "Batch transcryption should transform the values",
        )

        decrypted_batch = self._decrypt_all(transcrypted_batch)
        # Sort by temperature so the assertions below do not depend on the
        # order in which the batch comes back.
        decrypted_batch.sort(key=lambda x: x["temperature"])

        self.assertEqual(decrypted_batch[0]["diagnosis"], "Cold")
        self.assertEqual(decrypted_batch[0]["temperature"], 37.2)
        self.assertNotEqual(
            decrypted_batch[0]["patient_id"],
            "patient-002",
            "Patient ID should be different after cross-domain transcryption",
        )
        self.assertEqual(decrypted_batch[1]["diagnosis"], "Flu")
        self.assertEqual(decrypted_batch[1]["temperature"], 38.5)
        self.assertNotEqual(
            decrypted_batch[1]["patient_id"],
            "patient-001",
            "Patient ID should be different after cross-domain transcryption",
        )

    def test_json_batch_transcryption_different_structures(self):
        """A batch mixing records with different fields is rejected with a structure error."""
        data1 = {"patient_id": "patient-001", "diagnosis": "Flu", "temperature": 38.5}
        data2 = {"user_id": "user-002", "name": "Bob", "age": 25, "active": True}
        encrypted1 = encrypt(
            self._build_record(data1, ["patient_id"]), self.session_keys
        )
        encrypted2 = encrypt(
            self._build_record(data2, ["user_id"]), self.session_keys
        )
        self.assertNotEqual(
            encrypted1.structure(),
            encrypted2.structure(),
            "Records should have different structures",
        )

        # The concrete exception type is not visible from this file, so the
        # test only checks that something is raised and inspects its message.
        with self.assertRaises(Exception) as context:
            transcrypt_json_batch([encrypted1, encrypted2], self.transcryption_info)
        error_msg = str(context.exception).lower()
        self.assertTrue(
            "structure" in error_msg or "inconsistent" in error_msg,
            f"Error should mention structure mismatch, got: {context.exception}",
        )

    def test_json_batch_transcryption_same_structure_different_lengths(self):
        """Same fields but different value lengths: rejected unless encrypted as a batch.

        Encrypting the two records individually yields different structures, so
        batch transcryption must fail. Encrypting them together with
        ``encrypt_batch`` is expected to give both the same structure, after
        which batch transcryption succeeds and both diagnoses decrypt intact.
        """
        data1 = {
            "patient_id": "p1",
            "diagnosis": "Flu",
            "temperature": 38.5,
        }
        data2 = {
            "patient_id": "patient-002-with-a-very-long-id-that-changes-length",
            "diagnosis": "Flu with a very long description to ensure structure length differs",
            "temperature": 38.5,
        }
        record1 = self._build_record(data1, ["patient_id"])
        record2 = self._build_record(data2, ["patient_id"])

        # Individually encrypted records: structures differ, batch is rejected.
        encrypted1 = encrypt(record1, self.session_keys)
        encrypted2 = encrypt(record2, self.session_keys)
        self.assertNotEqual(encrypted1.structure(), encrypted2.structure())
        with self.assertRaises(Exception) as cm:
            transcrypt_json_batch([encrypted1, encrypted2], self.transcryption_info)
        self.assertIn("structure", str(cm.exception).lower())

        # Batch-encrypted records: structures must match, batch is accepted.
        encrypted_batch = encrypt_batch([record1, record2], self.session_keys)
        self.assertEqual(
            encrypted_batch[0].structure(),
            encrypted_batch[1].structure(),
            # Names the function actually called above (was "encrypt_json_batch").
            "encrypt_batch should have unified the structures via padding",
        )
        transcrypted_batch = transcrypt_json_batch(
            encrypted_batch, self.transcryption_info
        )
        self.assertEqual(len(transcrypted_batch), 2)

        # Compare as a set so the check does not depend on output order.
        diagnoses = {d["diagnosis"] for d in self._decrypt_all(transcrypted_batch)}
        self.assertIn("Flu", diagnoses)
        self.assertIn(
            "Flu with a very long description to ensure structure length differs",
            diagnoses,
        )
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()