# 22. `example_cms_encrypted_data.py` — CMS EncryptedData round-trip
[← Example index](index.md) · [example_cms_encrypted_data.py on Codeberg](https://codeberg.org/abbra/synta/src/branch/main/examples/example_cms_encrypted_data.py)
Bindings: `synta.cms.EncryptedData`, `synta.cms.ID_AES128_CBC`,
`synta.cms.ID_AES192_CBC`, `synta.cms.ID_AES256_CBC`.
Requires the `openssl` Cargo feature (`maturin develop --features openssl`).
- Create an `EncryptedData` with `EncryptedData.create` (AES-128-CBC) and inspect
all properties: `version`, `content_type`, `content_encryption_algorithm_oid`,
`content_encryption_algorithm_params` (IV extraction), `encrypted_content`.
- Decrypt → replace text in plaintext → re-encrypt with a fresh random IV;
verify the round-trip with `to_der()` / `from_der()`.
- Encrypt with each of AES-128/192/256-CBC using `ID_AES128_CBC`,
`ID_AES192_CBC`, `ID_AES256_CBC`; confirm random IV differs across calls.
- Verify synta-produced ciphertext with `openssl enc -d` (synta → openssl interop).
- Inspect the `EncryptedData` DER structure with `openssl asn1parse`.
- Encrypt raw bytes with `openssl enc -e`, wrap in an `EncryptedData` DER
by hand, parse with `from_der`, and decrypt with synta (openssl → synta interop).
## Source
```python
#!/usr/bin/env python3
"""
Example: CMS EncryptedData — create, decrypt, replace content, re-encrypt, verify.
Demonstrates:
- EncryptedData.create() — encrypt plaintext with a symmetric key
- EncryptedData.decrypt() — recover plaintext
- Round-trip: load → decrypt → replace text → re-encrypt → export DER
- Interoperability: verify synta-produced ciphertext with ``openssl enc``
- Interoperability: parse synta-produced DER with ``openssl asn1parse``
- Reverse: encrypt with ``openssl enc``, decrypt with synta
Requirements:
- synta built with the ``openssl`` Cargo feature
(``maturin develop --features openssl`` or equivalent)
- openssl(1) on PATH for the interoperability sections
Usage::
python examples/example_cms_encrypted_data.py
"""
import os
import shutil
import subprocess
import sys
import tempfile
from synta.cms import (
EncryptedData,
ID_AES128_CBC,
ID_AES192_CBC,
ID_AES256_CBC,
)
# ── Symmetric keys ────────────────────────────────────────────────────────────
# Fixed demo keys, hard-coded so the example is reproducible.
KEY_128 = bytes(0x11 * i for i in range(16))  # 00 11 22 … ee ff
KEY_192 = bytes(range(24))  # 00 01 02 … 16 17
KEY_256 = bytes(range(32))  # 00 01 02 … 1e 1f
# ── Formatting helpers ────────────────────────────────────────────────────────
def section(title: str) -> None:
    """Print *title* framed above and below by a 64-character rule.

    A blank line is emitted first so consecutive sections are visually
    separated.
    """
    rule = "─" * 64
    print("\n" + "\n".join((rule, title, rule)))
def run(cmd: list[str], **kwargs) -> subprocess.CompletedProcess:
    """Run a subprocess, echoing the command line; exit the script on failure.

    Args:
        cmd: Argument vector handed to ``subprocess.run``.
        **kwargs: Forwarded unchanged to ``subprocess.run`` (for example
            ``capture_output=True``, ``text=True``, ``input=...``).

    Returns:
        The ``CompletedProcess`` when the command exits with status 0.

    Raises:
        SystemExit: if the command exits non-zero.  The message carries the
            return code and whatever stderr was captured (empty when the
            caller did not capture stderr).
    """
    print(f" $ {' '.join(cmd)}")
    result = subprocess.run(cmd, **kwargs)
    if result.returncode != 0:
        # CompletedProcess.stderr is None unless stderr was captured; fall
        # back to an empty string so the failure report itself cannot crash.
        stderr = result.stderr or ""
        if isinstance(stderr, bytes):
            stderr = stderr.decode(errors="replace")
        sys.exit(f" Command failed (rc={result.returncode}): {stderr.strip()}")
    return result
# ── DER helpers (used by the openssl→synta interop section) ──────────────────
def _der_len(n: int) -> bytes:
"""Encode a DER length."""
if n < 0x80:
return bytes([n])
elif n < 0x100:
return bytes([0x81, n])
else:
return bytes([0x82, n >> 8, n & 0xFF])
def _seq(body: bytes) -> bytes:
    """Return *body* framed as a DER SEQUENCE: tag 0x30, length, contents."""
    header = b"\x30" + _der_len(len(body))
    return header + body
def _iv_from_params(params: bytes) -> bytes:
"""Extract the raw IV bytes from a DER OCTET STRING parameter field.
``content_encryption_algorithm_params`` for CBC ciphers is the DER
encoding of ``OCTET STRING { iv }``, i.e. ``04 <len> <iv bytes>``.
"""
if params is None or len(params) < 2 or params[0] != 0x04:
raise ValueError(
f"expected OCTET STRING (tag 0x04), got: {params[:4].hex() if params else 'None'}"
)
iv_len = params[1]
return params[2 : 2 + iv_len]
def _build_encrypted_data_der(
    ciphertext: bytes,
    alg_oid_tlv: bytes,
    iv: bytes,
    content_type_oid_tlv: bytes | None = None,
) -> bytes:
    """Hand-assemble a minimal ``EncryptedData`` DER SEQUENCE (RFC 5652 §8).

    This lets a ciphertext produced outside synta (e.g. by ``openssl enc``)
    be wrapped so that ``EncryptedData.from_der()`` can parse it and
    ``EncryptedData.decrypt()`` can decrypt it.

    Layout produced::

        SEQUENCE {                      -- EncryptedData
            INTEGER 0                   -- version
            SEQUENCE {                  -- EncryptedContentInfo
                <content_type_oid_tlv>
                SEQUENCE { <alg_oid_tlv>, OCTET STRING <iv> }
                [0] IMPLICIT OCTET STRING <ciphertext>
            }
        }

    Args:
        ciphertext: Raw ciphertext bytes (the OCTET STRING value).
        alg_oid_tlv: Complete DER OID TLV (``06 <len> <value>``) naming the
            content-encryption algorithm.
        iv: Raw IV bytes; emitted as the OCTET STRING in
            ``AlgorithmIdentifier.parameters``.
        content_type_oid_tlv: Complete DER OID TLV for the content type;
            ``None`` selects ``id-data`` (1.2.840.113549.1.7.1).

    Returns:
        The DER-encoded ``EncryptedData`` SEQUENCE.
    """
    if content_type_oid_tlv is None:
        # id-data (1.2.840.113549.1.7.1)
        content_type_oid_tlv = bytes.fromhex("06092a864886f70d010701")

    # AlgorithmIdentifier ::= SEQUENCE { algorithm OID, parameters OCTET STRING }
    algorithm_parameters = b"".join((b"\x04", _der_len(len(iv)), iv))
    algorithm_identifier = _seq(alg_oid_tlv + algorithm_parameters)

    # encryptedContent [0] IMPLICIT OCTET STRING: context tag 0x80 replaces 0x04.
    encrypted_content = b"".join((b"\x80", _der_len(len(ciphertext)), ciphertext))

    # EncryptedContentInfo ::= SEQUENCE { contentType, algorithm, encryptedContent }
    encrypted_content_info = _seq(
        b"".join((content_type_oid_tlv, algorithm_identifier, encrypted_content))
    )

    # EncryptedData ::= SEQUENCE { version INTEGER (0), encryptedContentInfo }
    version_zero = b"\x02\x01\x00"
    return _seq(version_zero + encrypted_content_info)
# DER encoding of the ID_AES128_CBC OID (tag 0x06, length 9, value bytes).
# This is the fixed TLV for OID 2.16.840.1.101.3.4.1.2 used when manually
# assembling an EncryptedData DER structure for the openssl→synta interop demo.
_AES128_CBC_OID_TLV = bytes(
[0x06, 0x09, 0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x01, 0x02]
)
# ── Section 1: Create and inspect ─────────────────────────────────────────────
def demo_create_inspect() -> None:
    """Section 1: encrypt a message with AES-128-CBC and print every field.

    Prints the version, content type, algorithm OID, the IV extracted from
    the algorithm parameters, and the ciphertext and DER sizes, then checks
    that ``decrypt()`` returns the original message.
    """
    section("1. Create an EncryptedData and inspect its fields")

    message = b"This is the confidential content protected by AES-128-CBC."
    enc = EncryptedData.create(message, KEY_128, ID_AES128_CBC)
    iv_bytes = _iv_from_params(enc.content_encryption_algorithm_params)

    print(f" version : {enc.version}")
    print(f" content_type OID : {enc.content_type}")
    print(f" content_encryption_algorithm OID : {enc.content_encryption_algorithm_oid}")
    print(f" IV (hex) : {iv_bytes.hex()}")
    print(f" encrypted_content length : {len(enc.encrypted_content)} bytes")
    print(f" EncryptedData DER length : {len(enc.to_der())} bytes")

    # Round-trip check: decrypting with the same key must give the input back.
    roundtrip = enc.decrypt(KEY_128)
    assert roundtrip == message
    print(f" decrypt() : {roundtrip.decode()!r} ✓")
# ── Section 2: Decrypt → replace text → re-encrypt ───────────────────────────
def demo_decrypt_modify_reencrypt() -> EncryptedData:
    """Section 2: decrypt an EncryptedData, edit the text, and re-encrypt it.

    The edited plaintext is encrypted again with the same key and checked
    twice: directly, and after a ``to_der()`` / ``from_der()`` round-trip.

    Returns:
        The re-encrypted ``EncryptedData`` for use in later sections.
    """
    section("2. Decrypt → replace text → re-encrypt")

    source_text = b"Status: PENDING - awaiting manager approval."
    print(f" original plaintext : {source_text.decode()!r}")

    # Encrypt, then immediately decrypt, the starting document.
    first = EncryptedData.create(source_text, KEY_128, ID_AES128_CBC)
    clear = first.decrypt(KEY_128)
    assert clear == source_text
    print(f" decrypted : {clear.decode()!r} ✓")

    # Apply the substitutions in order to the recovered plaintext.
    substitutions = (
        (b"PENDING", b"APPROVED"),
        (b"awaiting manager approval", b"signed off by manager"),
    )
    edited = clear
    for old, new in substitutions:
        edited = edited.replace(old, new)
    print(f" modified : {edited.decode()!r}")

    # Re-encrypt the edited text with a new create() call.
    second = EncryptedData.create(edited, KEY_128, ID_AES128_CBC)
    assert second.decrypt(KEY_128) == edited
    print(" re-encrypt + verify: ✓")

    # Serialise and parse back to confirm the DER form decrypts identically.
    der = second.to_der()
    reparsed = EncryptedData.from_der(der)
    assert reparsed.decrypt(KEY_128) == edited
    print(f" DER round-trip : {len(der)} bytes ✓")

    return second
# ── Section 3: Multiple AES key sizes ────────────────────────────────────────
def demo_algorithms() -> None:
    """Section 3: round-trip a message under AES-128/192/256-CBC.

    For each key size the plaintext is encrypted, decrypted and compared.
    Two encryptions of the same input under the same key are then compared
    to show that both the IV parameters and the ciphertext differ per call.
    """
    section("3. Encrypt with AES-128/192/256-CBC")
    cases = [
        ("AES-128-CBC", ID_AES128_CBC, KEY_128),
        ("AES-192-CBC", ID_AES192_CBC, KEY_192),
        ("AES-256-CBC", ID_AES256_CBC, KEY_256),
    ]
    for label, oid, key in cases:
        pt = f"Protected by {label}.".encode()
        ed = EncryptedData.create(pt, key, oid)
        assert ed.decrypt(key) == pt
        print(f" {label}: {len(ed.encrypted_content):3d} byte ciphertext ✓")

    # Confirm random IV: two encryptions of the same plaintext differ.
    a = EncryptedData.create(b"same input", KEY_128, ID_AES128_CBC)
    b_ = EncryptedData.create(b"same input", KEY_128, ID_AES128_CBC)
    assert a.content_encryption_algorithm_params != b_.content_encryption_algorithm_params
    # The message below claims the ciphertexts differ, so check that too
    # rather than only comparing the IV parameters.
    assert a.encrypted_content != b_.encrypted_content
    print(" Random IV per call: ciphertexts differ ✓")
# ── Section 4: Verify with openssl enc ───────────────────────────────────────
def demo_verify_with_openssl(ed: EncryptedData) -> None:
    """Section 4: decrypt synta-produced ciphertext with ``openssl enc``.

    The IV is taken from the algorithm parameters of *ed* and the raw
    ciphertext is piped to ``openssl enc -aes-128-cbc -d``.  The output must
    contain the modified text from Section 2.  The section is skipped when
    ``openssl`` is not on PATH.

    Args:
        ed: The re-encrypted ``EncryptedData`` from Section 2, encrypted
            under ``KEY_128``.
    """
    section("4. Verify re-encrypted content with openssl enc")
    openssl = shutil.which("openssl")
    if openssl is None:
        print(" Skipped: openssl not found on PATH")
        return

    key_hex = KEY_128.hex()
    iv = _iv_from_params(ed.content_encryption_algorithm_params)
    ciphertext = ed.encrypted_content
    print(f" key (hex) : {key_hex}")
    print(f" IV (hex) : {iv.hex()}")
    print(f" ciphertext: {len(ciphertext)} bytes")

    result = run(
        [openssl, "enc", "-aes-128-cbc", "-d",
         "-K", key_hex, "-iv", iv.hex(), "-nosalt"],
        input=ciphertext,
        capture_output=True,
    )
    recovered = result.stdout

    # Check before decoding or printing the success mark, as the other
    # sections do, so wrong output fails here and is never reported as ✓.
    assert b"APPROVED" in recovered
    assert b"signed off by manager" in recovered
    print(f" openssl decrypted : {recovered.decode()!r} ✓")
# ── Section 5: Inspect DER with openssl asn1parse ────────────────────────────
def demo_asn1parse(ed: EncryptedData) -> None:
    """Section 5: dump the EncryptedData DER using ``openssl asn1parse``.

    The DER encoding of *ed* is written to a temporary ``.der`` file, parsed
    by ``openssl asn1parse -inform DER``, and the output printed line by
    line.  The temporary file is removed whether or not any step fails.  The
    section is skipped when ``openssl`` is not on PATH.

    Args:
        ed: The ``EncryptedData`` object to serialise and inspect.
    """
    section("5. Inspect EncryptedData DER with openssl asn1parse")
    openssl = shutil.which("openssl")
    if openssl is None:
        print(" Skipped: openssl not found on PATH")
        return

    # Serialise before creating the file so a failure here leaves nothing behind.
    der = ed.to_der()
    fh = tempfile.NamedTemporaryFile(suffix=".der", delete=False)
    path = fh.name
    try:
        # The write is inside the try so a failed write is cleaned up too;
        # leaving the with-block closes the file before openssl reads it.
        with fh:
            fh.write(der)
        result = run(
            [openssl, "asn1parse", "-inform", "DER", "-in", path],
            capture_output=True, text=True,
        )
        print(" openssl asn1parse output:")
        for line in result.stdout.strip().splitlines():
            print(f" {line}")
    finally:
        os.unlink(path)
# ── Section 6: openssl → synta (reverse interop) ─────────────────────────────
def demo_openssl_to_synta() -> None:
    """Section 6: encrypt with ``openssl enc``, then decrypt with synta.

    A 16-byte IV is obtained from ``openssl rand``, and the plaintext is
    encrypted with ``openssl enc -aes-128-cbc`` under ``KEY_128``.  The raw
    ciphertext is wrapped in a hand-built EncryptedData DER, parsed with
    ``EncryptedData.from_der`` and decrypted.  The section is skipped when
    ``openssl`` is not on PATH.
    """
    section("6. Encrypt with openssl enc, decrypt with synta")
    openssl_bin = shutil.which("openssl")
    if openssl_bin is None:
        print(" Skipped: openssl not found on PATH")
        return

    message = b"Declassified - cleared for public release."
    print(f" plaintext : {message.decode()!r}")

    # Ask openssl for 16 random bytes, hex-encoded, to use as the IV.
    rand_cmd = [openssl_bin, "rand", "-hex", "16"]
    iv_hex = run(rand_cmd, capture_output=True, text=True).stdout.strip()
    iv = bytes.fromhex(iv_hex)
    print(f" IV (hex) : {iv_hex}")

    # Encrypt on the openssl side; the plaintext goes in on stdin.
    enc_cmd = [
        openssl_bin, "enc", "-aes-128-cbc", "-e",
        "-K", KEY_128.hex(), "-iv", iv_hex, "-nosalt",
    ]
    ciphertext = run(enc_cmd, input=message, capture_output=True).stdout
    print(f" openssl ciphertext: {len(ciphertext)} bytes")

    # Wrap the ciphertext in an EncryptedData DER and parse it with synta.
    wrapped = _build_encrypted_data_der(ciphertext, _AES128_CBC_OID_TLV, iv)
    parsed = EncryptedData.from_der(wrapped)

    # Decrypt on the synta side and compare with what openssl was given.
    roundtrip = parsed.decrypt(KEY_128)
    assert roundtrip == message
    print(f" synta decrypted : {roundtrip.decode()!r} ✓")
# ── Entry point ───────────────────────────────────────────────────────────────
def main() -> None:
    """Run the six demo sections in order.

    The re-encrypted object returned by Section 2 is passed to Sections 4
    and 5.  If any section raises ``NotImplementedError``, a note and a
    rebuild hint are written to stderr and the script exits with status 1.
    """
    banner = "=" * 64
    print(banner)
    print("Example: CMS EncryptedData — encrypt, modify, decrypt, verify")
    print(banner)

    try:
        demo_create_inspect()
        reencrypted = demo_decrypt_modify_reencrypt()
        demo_algorithms()
        demo_verify_with_openssl(reencrypted)
        demo_asn1parse(reencrypted)
        demo_openssl_to_synta()
    except NotImplementedError as exc:
        hint = "Rebuild synta-python with --features openssl to run this example."
        print(f"\nNote: {exc}", file=sys.stderr)
        print(hint, file=sys.stderr)
        sys.exit(1)

    print("\nAll CMS EncryptedData examples completed.")


if __name__ == "__main__":
    main()
```