# synta 0.2.2

# ASN.1 parser, decoder, and encoder library with DER/BER support and C FFI
# Documentation
#!/usr/bin/env python3
"""
Example: CMS EncryptedData — create, decrypt, replace content, re-encrypt, verify.

Demonstrates:
  - EncryptedData.create()   — encrypt plaintext with a symmetric key
  - EncryptedData.decrypt()  — recover plaintext
  - Round-trip: load → decrypt → replace text → re-encrypt → export DER
  - Interoperability: verify synta-produced ciphertext with ``openssl enc``
  - Interoperability: parse synta-produced DER with ``openssl asn1parse``
  - Reverse: encrypt with ``openssl enc``, decrypt with synta

Requirements:
  - synta built with the ``openssl`` Cargo feature
    (``maturin develop --features openssl`` or equivalent)
  - openssl(1) on PATH for the interoperability sections

Usage::

    python examples/example_cms_encrypted_data.py
"""

import os
import shutil
import subprocess
import sys
import tempfile

from synta.cms import (
    EncryptedData,
    ID_AES128_CBC,
    ID_AES192_CBC,
    ID_AES256_CBC,
)

# ── Symmetric keys ────────────────────────────────────────────────────────────

# Fixed demo keys, one per AES key size exercised below (16, 24 and 32 bytes).
KEY_128 = bytes.fromhex("00 11 22 33 44 55 66 77 88 99 aa bb cc dd ee ff")
KEY_192 = bytes(range(24))  # 00 01 02 ... 17
KEY_256 = bytes.fromhex(
    "000102030405060708090a0b0c0d0e0f"
    "101112131415161718191a1b1c1d1e1f"
)


# ── Formatting helpers ────────────────────────────────────────────────────────

def section(title: str) -> None:
    """Print *title* framed above and below by a 64-column horizontal rule.

    A blank line is emitted first so the banner is visually separated from
    whatever was printed before it.
    """
    # The rule was previously built from an empty string (``'' * 64``), which
    # is always empty and rendered the banner as blank lines. Use the same
    # box-drawing character as the section comments in this file.
    rule = "─" * 64
    print(f"\n{rule}\n{title}\n{rule}")


def run(cmd: list[str], **kwargs) -> subprocess.CompletedProcess:
    """Echo *cmd*, run it with ``subprocess.run``, and exit on failure.

    Args:
        cmd: Argument vector to execute (no shell is involved).
        **kwargs: Passed straight through to ``subprocess.run`` (for example
            ``input=``, ``capture_output=``, ``text=``).

    Returns:
        The ``CompletedProcess`` of a command that exited with status 0.

    Raises:
        SystemExit: if the command exits non-zero; the message carries the
            return code and any captured stderr.
    """
    print(f"  $ {' '.join(cmd)}")
    result = subprocess.run(cmd, **kwargs)
    if result.returncode != 0:
        # ``stderr`` is None unless the caller asked for it to be captured;
        # normalise that to an empty string so the failure path cannot itself
        # fail with AttributeError on ``None.strip()``.
        stderr = result.stderr or ""
        if isinstance(stderr, bytes):
            stderr = stderr.decode(errors="replace")
        sys.exit(f"  Command failed (rc={result.returncode}): {stderr.strip()}")
    return result


# ── DER helpers (IV extraction; EncryptedData assembly for openssl→synta) ────

def _der_len(n: int) -> bytes:
    """Encode a DER length."""
    if n < 0x80:
        return bytes([n])
    elif n < 0x100:
        return bytes([0x81, n])
    else:
        return bytes([0x82, n >> 8, n & 0xFF])


def _seq(body: bytes) -> bytes:
    """Return *body* framed as a DER SEQUENCE: tag 0x30, length, contents."""
    sequence_tag = b"\x30"
    length_octets = _der_len(len(body))
    return sequence_tag + length_octets + body


def _iv_from_params(params: bytes) -> bytes:
    """Extract the raw IV bytes from a DER OCTET STRING parameter field.

    ``content_encryption_algorithm_params`` for CBC ciphers is the DER
    encoding of ``OCTET STRING { iv }``, i.e. ``04 <len> <iv bytes>``.
    """
    if params is None or len(params) < 2 or params[0] != 0x04:
        raise ValueError(
            f"expected OCTET STRING (tag 0x04), got: {params[:4].hex() if params else 'None'}"
        )
    iv_len = params[1]
    return params[2 : 2 + iv_len]


def _build_encrypted_data_der(
    ciphertext: bytes,
    alg_oid_tlv: bytes,
    iv: bytes,
    content_type_oid_tlv: bytes | None = None,
) -> bytes:
    """Hand-assemble a minimal ``EncryptedData`` DER SEQUENCE (RFC 5652 §8).

    This lets ciphertext produced by an external tool (e.g. ``openssl enc``)
    be wrapped in a structure that ``EncryptedData.from_der()`` can parse and
    ``EncryptedData.decrypt()`` can decrypt.

    Args:
        ciphertext: Raw ciphertext bytes; becomes the content of the
            ``[0] IMPLICIT OCTET STRING``.
        alg_oid_tlv: Complete DER OID TLV (``06 <len> <value>``) naming the
            content-encryption algorithm.
        iv: Raw IV bytes; emitted as an ``OCTET STRING`` in
            ``AlgorithmIdentifier.parameters``.
        content_type_oid_tlv: Complete DER OID TLV for the content type;
            when ``None``, ``id-data`` (1.2.840.113549.1.7.1) is used.

    Returns:
        DER-encoded ``EncryptedData`` SEQUENCE.
    """
    # id-data (1.2.840.113549.1.7.1) as a ready-made OID TLV.
    id_data_tlv = bytes.fromhex("06092a864886f70d010701")
    content_type = id_data_tlv if content_type_oid_tlv is None else content_type_oid_tlv

    # AlgorithmIdentifier ::= SEQUENCE { algorithm OID, parameters OCTET STRING(iv) }
    iv_octet_string = b"".join((b"\x04", _der_len(len(iv)), iv))
    algorithm_identifier = _seq(alg_oid_tlv + iv_octet_string)

    # encryptedContent: context tag [0], primitive, IMPLICIT OCTET STRING.
    encrypted_content = b"".join((b"\x80", _der_len(len(ciphertext)), ciphertext))

    # EncryptedContentInfo ::= SEQUENCE { contentType, algorithm, encryptedContent }
    encrypted_content_info = _seq(
        content_type + algorithm_identifier + encrypted_content
    )

    # EncryptedData ::= SEQUENCE { version INTEGER 0, encryptedContentInfo }
    version_zero = b"\x02\x01\x00"
    return _seq(version_zero + encrypted_content_info)


# DER encoding of the ID_AES128_CBC OID (tag 0x06, length 9, value bytes).
# This is the fixed TLV for OID 2.16.840.1.101.3.4.1.2 used when manually
# assembling an EncryptedData DER structure for the openssl→synta interop demo.
_AES128_CBC_OID_TLV = bytes(
    [0x06, 0x09, 0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x01, 0x02]
)


# ── Section 1: Create and inspect ─────────────────────────────────────────────

def demo_create_inspect() -> None:
    """Encrypt a short message with AES-128-CBC, print the fields, decrypt it."""
    section("1. Create an EncryptedData and inspect its fields")

    message = b"This is the confidential content protected by AES-128-CBC."
    envelope = EncryptedData.create(message, KEY_128, ID_AES128_CBC)
    iv = _iv_from_params(envelope.content_encryption_algorithm_params)

    # One report line per inspected field, printed in this order.
    report = (
        f"  version                          : {envelope.version}",
        f"  content_type OID                 : {envelope.content_type}",
        f"  content_encryption_algorithm OID : {envelope.content_encryption_algorithm_oid}",
        f"  IV (hex)                         : {iv.hex()}",
        f"  encrypted_content length         : {len(envelope.encrypted_content)} bytes",
        f"  EncryptedData DER length         : {len(envelope.to_der())} bytes",
    )
    for line in report:
        print(line)

    # Decrypting with the same key must give back the original message.
    roundtrip = envelope.decrypt(KEY_128)
    assert roundtrip == message
    print(f"  decrypt()                        : {roundtrip.decode()!r}")


# ── Section 2: Decrypt → replace text → re-encrypt ───────────────────────────

def demo_decrypt_modify_reencrypt() -> EncryptedData:
    """Encrypt a status line, decrypt it, edit the text, and encrypt it again.

    Returns:
        The EncryptedData holding the edited text, for use by later sections.
    """
    section("2. Decrypt → replace text → re-encrypt")

    source_text = b"Status: PENDING - awaiting manager approval."
    print(f"  original plaintext : {source_text.decode()!r}")

    # First pass: encrypt, then immediately decrypt and check the result.
    first_envelope = EncryptedData.create(source_text, KEY_128, ID_AES128_CBC)
    recovered = first_envelope.decrypt(KEY_128)
    assert recovered == source_text
    print(f"  decrypted          : {recovered.decode()!r}")

    # Apply the textual substitutions, in order, to the recovered plaintext.
    substitutions = (
        (b"PENDING", b"APPROVED"),
        (b"awaiting manager approval", b"signed off by manager"),
    )
    modified = recovered
    for old, new in substitutions:
        modified = modified.replace(old, new)
    print(f"  modified           : {modified.decode()!r}")

    # Second pass: a new create() call gets its own IV from synta.
    second_envelope = EncryptedData.create(modified, KEY_128, ID_AES128_CBC)
    assert second_envelope.decrypt(KEY_128) == modified
    print("  re-encrypt + verify: ✓")

    # Serialise to DER and parse it back to confirm nothing is lost.
    reparsed = EncryptedData.from_der(second_envelope.to_der())
    assert reparsed.decrypt(KEY_128) == modified
    print(f"  DER round-trip     : {len(second_envelope.to_der())} bytes  ✓")

    return second_envelope


# ── Section 3: Multiple AES key sizes ────────────────────────────────────────

def demo_algorithms() -> None:
    """Round-trip a message through AES-128/192/256-CBC and check IV freshness.

    Each key size encrypts and decrypts its own short message. Afterwards the
    same plaintext is encrypted twice under the same key to confirm that the
    two results do not share an IV or a ciphertext.
    """
    section("3. Encrypt with AES-128/192/256-CBC")

    cases = [
        ("AES-128-CBC", ID_AES128_CBC, KEY_128),
        ("AES-192-CBC", ID_AES192_CBC, KEY_192),
        ("AES-256-CBC", ID_AES256_CBC, KEY_256),
    ]
    for label, oid, key in cases:
        pt = f"Protected by {label}.".encode()
        ed = EncryptedData.create(pt, key, oid)
        assert ed.decrypt(key) == pt
        print(f"  {label}: {len(ed.encrypted_content):3d} byte ciphertext  ✓")

    # Confirm random IV: two encryptions of the same plaintext differ.
    a = EncryptedData.create(b"same input", KEY_128, ID_AES128_CBC)
    b_ = EncryptedData.create(b"same input", KEY_128, ID_AES128_CBC)
    assert a.content_encryption_algorithm_params != b_.content_encryption_algorithm_params
    # The message below claims the ciphertexts differ, so check that too
    # rather than only the IV parameters.
    assert a.encrypted_content != b_.encrypted_content
    print("  Random IV per call: ciphertexts differ  ✓")


# ── Section 4: Verify with openssl enc ───────────────────────────────────────

def demo_verify_with_openssl(ed: EncryptedData) -> None:
    """Decrypt synta-produced ciphertext with ``openssl enc`` and check it.

    The key, the IV taken from ``ed``'s algorithm parameters, and the raw
    ciphertext are handed to ``openssl enc -d``; the output must contain the
    edited text.

    ``ed`` should be the re-encrypted EncryptedData from Section 2.
    """
    section("4. Verify re-encrypted content with openssl enc")

    openssl_bin = shutil.which("openssl")
    if openssl_bin is None:
        print("  Skipped: openssl not found on PATH")
        return

    hex_key = KEY_128.hex()
    iv_bytes = _iv_from_params(ed.content_encryption_algorithm_params)
    encrypted = ed.encrypted_content

    print(f"  key (hex) : {hex_key}")
    print(f"  IV  (hex) : {iv_bytes.hex()}")
    print(f"  ciphertext: {len(encrypted)} bytes")

    # Raw key and IV are given explicitly (-K / -iv); ciphertext goes in on stdin.
    decrypt_argv = [
        openssl_bin, "enc", "-aes-128-cbc", "-d",
        "-K", hex_key, "-iv", iv_bytes.hex(), "-nosalt",
    ]
    completed = run(decrypt_argv, input=encrypted, capture_output=True)

    plaintext = completed.stdout
    print(f"  openssl decrypted : {plaintext.decode()!r}")
    assert b"APPROVED" in plaintext
    assert b"signed off by manager" in plaintext


# ── Section 5: Inspect DER with openssl asn1parse ────────────────────────────

def demo_asn1parse(ed: EncryptedData) -> None:
    """Dump the EncryptedData DER structure using ``openssl asn1parse``.

    The DER encoding of ``ed`` is written to a temporary ``.der`` file, which
    is passed to ``openssl asn1parse -inform DER``; the tool's output is
    printed indented. The section is skipped when ``openssl`` is not on PATH.
    """
    section("5. Inspect EncryptedData DER with openssl asn1parse")

    openssl = shutil.which("openssl")
    if openssl is None:
        print("  Skipped: openssl not found on PATH")
        return

    # Encode before creating the file so an encoding error leaves nothing behind.
    der = ed.to_der()

    # delete=False because openssl opens the file by name after we close it;
    # removal is therefore our job, in the ``finally`` below.
    fh = tempfile.NamedTemporaryFile(suffix=".der", delete=False)
    path = fh.name
    try:
        # The write happens inside the try so a failed write cannot leak the
        # temporary file.
        with fh:
            fh.write(der)

        result = run(
            [openssl, "asn1parse", "-inform", "DER", "-in", path],
            capture_output=True, text=True,
        )
        print("  openssl asn1parse output:")
        for line in result.stdout.strip().splitlines():
            print(f"    {line}")
    finally:
        os.unlink(path)


# ── Section 6: openssl → synta (reverse interop) ─────────────────────────────

def demo_openssl_to_synta() -> None:
    """Encrypt with ``openssl enc``, wrap the result in DER, decrypt with synta.

    The reverse of Section 4: openssl supplies both the IV and the
    ciphertext, and synta must recover the original plaintext from a
    hand-assembled EncryptedData structure.
    """
    section("6. Encrypt with openssl enc, decrypt with synta")

    openssl_bin = shutil.which("openssl")
    if openssl_bin is None:
        print("  Skipped: openssl not found on PATH")
        return

    hex_key = KEY_128.hex()
    message = b"Declassified - cleared for public release."
    print(f"  plaintext : {message.decode()!r}")

    # Ask openssl for 16 random bytes, hex-encoded, to use as the IV.
    rand_proc = run([openssl_bin, "rand", "-hex", "16"], capture_output=True, text=True)
    hex_iv = rand_proc.stdout.strip()
    iv_bytes = bytes.fromhex(hex_iv)
    print(f"  IV (hex)  : {hex_iv}")

    # Encrypt with an explicit raw key and IV; plaintext is fed on stdin.
    encrypt_argv = [
        openssl_bin, "enc", "-aes-128-cbc", "-e",
        "-K", hex_key, "-iv", hex_iv, "-nosalt",
    ]
    enc_proc = run(encrypt_argv, input=message, capture_output=True)
    encrypted = enc_proc.stdout
    print(f"  openssl ciphertext: {len(encrypted)} bytes")

    # Wrap the ciphertext in an EncryptedData DER structure and let synta parse it.
    envelope = EncryptedData.from_der(
        _build_encrypted_data_der(encrypted, _AES128_CBC_OID_TLV, iv_bytes)
    )

    # synta must recover exactly what openssl encrypted.
    decrypted = envelope.decrypt(KEY_128)
    assert decrypted == message
    print(f"  synta decrypted   : {decrypted.decode()!r}")


# ── Entry point ───────────────────────────────────────────────────────────────

def main() -> None:
    """Run every demo section in order, under a banner.

    If any section raises ``NotImplementedError``, a note and a rebuild hint
    are written to stderr and the process exits with status 1.
    """
    rule = "=" * 64
    print(
        rule,
        "Example: CMS EncryptedData — encrypt, modify, decrypt, verify",
        rule,
        sep="\n",
    )

    try:
        demo_create_inspect()
        # Sections 4 and 5 operate on the object produced by Section 2.
        reencrypted = demo_decrypt_modify_reencrypt()
        demo_algorithms()
        demo_verify_with_openssl(reencrypted)
        demo_asn1parse(reencrypted)
        demo_openssl_to_synta()
    except NotImplementedError as exc:
        print(
            f"\nNote: {exc}",
            "Rebuild synta-python with --features openssl to run this example.",
            sep="\n",
            file=sys.stderr,
        )
        sys.exit(1)

    print("\nAll CMS EncryptedData examples completed.")


# Run the demos only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()