import os
import shutil
import subprocess
import sys
import tempfile
from synta.cms import (
EncryptedData,
ID_AES128_CBC,
ID_AES192_CBC,
ID_AES256_CBC,
)
KEY_128 = bytes.fromhex("00112233445566778899aabbccddeeff")
KEY_192 = bytes.fromhex("000102030405060708090a0b0c0d0e0f1011121314151617")
KEY_256 = bytes(range(32))
def section(title: str) -> None:
print(f"\n{'─' * 64}\n{title}\n{'─' * 64}")
def run(cmd: list[str], **kwargs) -> subprocess.CompletedProcess:
print(f" $ {' '.join(cmd)}")
result = subprocess.run(cmd, **kwargs)
if result.returncode != 0:
stderr = getattr(result, "stderr", b"")
if isinstance(stderr, bytes):
stderr = stderr.decode(errors="replace")
sys.exit(f" Command failed (rc={result.returncode}): {stderr.strip()}")
return result
def _der_len(n: int) -> bytes:
if n < 0x80:
return bytes([n])
elif n < 0x100:
return bytes([0x81, n])
else:
return bytes([0x82, n >> 8, n & 0xFF])
def _seq(body: bytes) -> bytes:
return b"\x30" + _der_len(len(body)) + body
def _iv_from_params(params: bytes) -> bytes:
if params is None or len(params) < 2 or params[0] != 0x04:
raise ValueError(
f"expected OCTET STRING (tag 0x04), got: {params[:4].hex() if params else 'None'}"
)
iv_len = params[1]
return params[2 : 2 + iv_len]
def _build_encrypted_data_der(
ciphertext: bytes,
alg_oid_tlv: bytes,
iv: bytes,
content_type_oid_tlv: bytes | None = None,
) -> bytes:
if content_type_oid_tlv is None:
content_type_oid_tlv = bytes(
[0x06, 0x09, 0x2A, 0x86, 0x48, 0x86, 0xF7, 0x0D, 0x01, 0x07, 0x01]
)
iv_tlv = b"\x04" + _der_len(len(iv)) + iv
alg_id = _seq(alg_oid_tlv + iv_tlv)
enc_content = b"\x80" + _der_len(len(ciphertext)) + ciphertext
eci = _seq(content_type_oid_tlv + alg_id + enc_content)
return _seq(b"\x02\x01\x00" + eci)
_AES128_CBC_OID_TLV = bytes(
[0x06, 0x09, 0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x01, 0x02]
)
def demo_create_inspect() -> None:
section("1. Create an EncryptedData and inspect its fields")
plaintext = b"This is the confidential content protected by AES-128-CBC."
ed = EncryptedData.create(plaintext, KEY_128, ID_AES128_CBC)
iv = _iv_from_params(ed.content_encryption_algorithm_params)
print(f" version : {ed.version}")
print(f" content_type OID : {ed.content_type}")
print(f" content_encryption_algorithm OID : {ed.content_encryption_algorithm_oid}")
print(f" IV (hex) : {iv.hex()}")
print(f" encrypted_content length : {len(ed.encrypted_content)} bytes")
print(f" EncryptedData DER length : {len(ed.to_der())} bytes")
recovered = ed.decrypt(KEY_128)
assert recovered == plaintext
print(f" decrypt() : {recovered.decode()!r} ✓")
def demo_decrypt_modify_reencrypt() -> EncryptedData:
section("2. Decrypt → replace text → re-encrypt")
original = b"Status: PENDING - awaiting manager approval."
print(f" original plaintext : {original.decode()!r}")
ed1 = EncryptedData.create(original, KEY_128, ID_AES128_CBC)
decrypted = ed1.decrypt(KEY_128)
assert decrypted == original
print(f" decrypted : {decrypted.decode()!r} ✓")
modified = (
decrypted
.replace(b"PENDING", b"APPROVED")
.replace(b"awaiting manager approval", b"signed off by manager")
)
print(f" modified : {modified.decode()!r}")
ed2 = EncryptedData.create(modified, KEY_128, ID_AES128_CBC)
assert ed2.decrypt(KEY_128) == modified
print(f" re-encrypt + verify: ✓")
ed3 = EncryptedData.from_der(ed2.to_der())
assert ed3.decrypt(KEY_128) == modified
print(f" DER round-trip : {len(ed2.to_der())} bytes ✓")
return ed2
def demo_algorithms() -> None:
section("3. Encrypt with AES-128/192/256-CBC")
cases = [
("AES-128-CBC", ID_AES128_CBC, KEY_128),
("AES-192-CBC", ID_AES192_CBC, KEY_192),
("AES-256-CBC", ID_AES256_CBC, KEY_256),
]
for label, oid, key in cases:
pt = f"Protected by {label}.".encode()
ed = EncryptedData.create(pt, key, oid)
assert ed.decrypt(key) == pt
print(f" {label}: {len(ed.encrypted_content):3d} byte ciphertext ✓")
a = EncryptedData.create(b"same input", KEY_128, ID_AES128_CBC)
b_ = EncryptedData.create(b"same input", KEY_128, ID_AES128_CBC)
assert a.content_encryption_algorithm_params != b_.content_encryption_algorithm_params
print(" Random IV per call: ciphertexts differ ✓")
def demo_verify_with_openssl(ed: EncryptedData) -> None:
section("4. Verify re-encrypted content with openssl enc")
openssl = shutil.which("openssl")
if openssl is None:
print(" Skipped: openssl not found on PATH")
return
key_hex = KEY_128.hex()
iv = _iv_from_params(ed.content_encryption_algorithm_params)
ciphertext = ed.encrypted_content
print(f" key (hex) : {key_hex}")
print(f" IV (hex) : {iv.hex()}")
print(f" ciphertext: {len(ciphertext)} bytes")
result = run(
[openssl, "enc", "-aes-128-cbc", "-d",
"-K", key_hex, "-iv", iv.hex(), "-nosalt"],
input=ciphertext,
capture_output=True,
)
recovered = result.stdout
print(f" openssl decrypted : {recovered.decode()!r} ✓")
assert b"APPROVED" in recovered
assert b"signed off by manager" in recovered
def demo_asn1parse(ed: EncryptedData) -> None:
section("5. Inspect EncryptedData DER with openssl asn1parse")
openssl = shutil.which("openssl")
if openssl is None:
print(" Skipped: openssl not found on PATH")
return
with tempfile.NamedTemporaryFile(suffix=".der", delete=False) as fh:
fh.write(ed.to_der())
path = fh.name
try:
result = run(
[openssl, "asn1parse", "-inform", "DER", "-in", path],
capture_output=True, text=True,
)
print(" openssl asn1parse output:")
for line in result.stdout.strip().splitlines():
print(f" {line}")
finally:
os.unlink(path)
def demo_openssl_to_synta() -> None:
section("6. Encrypt with openssl enc, decrypt with synta")
openssl = shutil.which("openssl")
if openssl is None:
print(" Skipped: openssl not found on PATH")
return
key_hex = KEY_128.hex()
plaintext = b"Declassified - cleared for public release."
print(f" plaintext : {plaintext.decode()!r}")
iv_result = run([openssl, "rand", "-hex", "16"], capture_output=True, text=True)
iv_hex = iv_result.stdout.strip()
iv = bytes.fromhex(iv_hex)
print(f" IV (hex) : {iv_hex}")
enc_result = run(
[openssl, "enc", "-aes-128-cbc", "-e",
"-K", key_hex, "-iv", iv_hex, "-nosalt"],
input=plaintext,
capture_output=True,
)
ciphertext = enc_result.stdout
print(f" openssl ciphertext: {len(ciphertext)} bytes")
ed_der = _build_encrypted_data_der(ciphertext, _AES128_CBC_OID_TLV, iv)
ed = EncryptedData.from_der(ed_der)
recovered = ed.decrypt(KEY_128)
assert recovered == plaintext
print(f" synta decrypted : {recovered.decode()!r} ✓")
def main() -> None:
print("=" * 64)
print("Example: CMS EncryptedData — encrypt, modify, decrypt, verify")
print("=" * 64)
try:
demo_create_inspect()
ed2 = demo_decrypt_modify_reencrypt()
demo_algorithms()
demo_verify_with_openssl(ed2)
demo_asn1parse(ed2)
demo_openssl_to_synta()
except NotImplementedError as exc:
print(f"\nNote: {exc}", file=sys.stderr)
print(
"Rebuild synta-python with --features openssl to run this example.",
file=sys.stderr,
)
sys.exit(1)
print("\nAll CMS EncryptedData examples completed.")
if __name__ == "__main__":
main()