synta 0.1.9

ASN.1 parser, decoder, and encoder library with DER/BER support and C FFI
Documentation
#!/usr/bin/env python3
"""Benchmark synta PKCS#7 / PKCS#12 Python bindings vs cryptography.

Measures the wall-clock cost of extracting X.509 certificates from PKCS#7
SignedData blobs and PKCS#12 archives through the synta Python API and,
optionally, through the cryptography package for comparison.

Benchmark IDs match the Rust Criterion IDs in pkcs_formats.rs exactly
-----------------------------------------------------------------------
    pkcs7/synta/amazon_roots                          avg: N µs
    pkcs7/cryptography/amazon_roots                   avg: N µs
    pkcs7/synta/pem_isrg                              avg: N µs
    pkcs7/cryptography/pem_isrg                       avg: N µs
    pkcs12/synta/unencrypted_3certs                   avg: N µs
    pkcs12/cryptography/unencrypted_3certs            avg: N µs
    pkcs12/synta/unencrypted_1cert_with_key           avg: N µs

Usage
-----
    # Build the Python extension (release build required):
    cargo build --release -p synta-python
    cp target/release/lib_synta.so python/synta/_synta.abi3.so

    # Run from the repository root:
    python python/bench_pkcs.py

    # Save Criterion-compatible JSON:
    python python/bench_pkcs.py --save-criterion
    python python/bench_pkcs.py --save-criterion path/to/criterion
"""

from __future__ import annotations

import sys
from pathlib import Path
from typing import Callable

from criterion_compat import measure as _criterion_measure
from criterion_compat import save_criterion_files as _criterion_save

# ── synta import ──────────────────────────────────────────────────────────────

try:
    import synta
except ImportError:
    print(
        "ERROR: 'synta' Python module not found.\n"
        "Build and install it with:\n"
        "    cargo build --release -p synta-python\n"
        "    cp target/release/lib_synta.so python/synta/_synta.abi3.so",
        file=sys.stderr,
    )
    sys.exit(1)

# ── cryptography import (optional) ────────────────────────────────────────────

# Optional dependency: the comparison benchmarks only run when the imports
# below succeed.  The loader names are pre-bound to None so that they are
# always defined at module level, even when 'cryptography' is not installed.
_HAS_CRYPTOGRAPHY = False
_cx509_pkcs7_load_der = None
_cx509_pkcs7_load_pem = None
_cx509_pkcs12_load = None

try:
    from cryptography.hazmat.primitives.serialization.pkcs7 import (
        load_der_pkcs7_certificates as _cx509_pkcs7_load_der,
        load_pem_pkcs7_certificates as _cx509_pkcs7_load_pem,
    )
    from cryptography.hazmat.primitives.serialization.pkcs12 import (
        load_pkcs12 as _cx509_pkcs12_load,
    )
    # Reached only when every import above succeeded.
    _HAS_CRYPTOGRAPHY = True
except ImportError:
    # Not fatal: report on stderr and carry on with the synta-only benchmarks.
    print(
        "INFO: 'cryptography' package not installed — skipping comparison.\n"
        "Install with: pip install cryptography",
        file=sys.stderr,
    )

# ── Paths ─────────────────────────────────────────────────────────────────────

# Directory containing this script; the repository root is its parent.
_SCRIPT_DIR = Path(__file__).resolve().parent
_REPO_ROOT = _SCRIPT_DIR.parent
# Root of the test-vector tree that supplies the benchmark input files.
_VECTORS = _REPO_ROOT.joinpath(
    "tests", "vectors", "cryptography", "vectors", "cryptography_vectors"
)
# Per-format sub-directories of the vector tree.
_PKCS7_DIR = _VECTORS.joinpath("pkcs7")
_PKCS12_DIR = _VECTORS.joinpath("pkcs12")

# ── CLI flags ─────────────────────────────────────────────────────────────────

# Output directory for Criterion-compatible JSON; None means "do not save".
_CRITERION_DIR: Path | None = None
if "--save-criterion" in sys.argv:
    _idx = sys.argv.index("--save-criterion")
    # Slicing never raises: when the flag is the last argument the slice is
    # empty and the join yields "".
    _next = "".join(sys.argv[_idx + 1 : _idx + 2])
    # An argument that is missing, empty, or looks like another "--" option
    # selects the default location under the repository's target directory.
    _CRITERION_DIR = (
        Path(_next)
        if _next and not _next.startswith("--")
        else _REPO_ROOT / "target" / "criterion"
    )

# ── Timing configuration (matches Criterion defaults) ─────────────────────────

# Passed by _measure() to criterion_compat.measure() as its warm-up and
# measurement arguments.  The _S suffix suggests the unit is seconds —
# NOTE(review): confirm against criterion_compat.
WARMUP_TIME_S = 3.0
MEASUREMENT_TIME_S = 5.0


def _measure(fn: Callable[[], object]) -> tuple[float, int, list[float], list[float]]:
    """Benchmark *fn* using the module-level warm-up and measurement settings.

    Delegates to ``criterion_compat.measure`` and returns a 4-tuple of
    ``(average, total, counts, times)``: the average and the two sequences are
    passed through unchanged, and *total* is the sum of the counts after
    converting each one to ``int``.
    """
    mean_us, iter_counts, sample_times_ns = _criterion_measure(
        fn, WARMUP_TIME_S, MEASUREMENT_TIME_S
    )
    total_iterations = sum(map(int, iter_counts))
    return mean_us, total_iterations, iter_counts, sample_times_ns


def _run(group: str, function: str, value: str, fn: Callable[[], object]) -> None:
    """Benchmark *fn* and report the result as ``group/function/value``.

    The average and iteration count are printed to stdout.  When a Criterion
    output directory was selected on the command line, the raw samples are
    also written there via ``criterion_compat.save_criterion_files``.
    """
    avg_us, total_iters, iters, times_ns = _measure(fn)

    bench_id = f"{group}/{function}/{value}"
    print(f"{bench_id}  avg: {avg_us:.3f} µs  ({total_iters} iterations)")

    if _CRITERION_DIR is None:
        # Saving was not requested.
        return
    _criterion_save(_CRITERION_DIR, group, function, value, iters, times_ns)


# ── Benchmark functions ───────────────────────────────────────────────────────

def bench_load_der_pkcs7(data: bytes, value: str) -> None:
    """Benchmark certificate extraction from a DER/BER PKCS#7 SignedData blob.

    Always times the synta loader; the cryptography loader is timed as well
    when that package was importable.  Results are reported in the ``pkcs7``
    group under the benchmark value *value*.
    """
    print()

    def _via_synta():
        return synta.load_der_pkcs7_certificates(data)

    _run("pkcs7", "synta", value, _via_synta)

    if not _HAS_CRYPTOGRAPHY:
        return

    def _via_cryptography():
        return _cx509_pkcs7_load_der(data)

    _run("pkcs7", "cryptography", value, _via_cryptography)


def bench_load_pem_pkcs7(data: bytes, value: str) -> None:
    """Benchmark certificate extraction from a PEM-encoded PKCS#7 SignedData.

    Always times the synta loader; the cryptography loader is timed as well
    when that package was importable.  Results are reported in the ``pkcs7``
    group under the benchmark value *value*.
    """
    print()

    def _via_synta():
        return synta.load_pem_pkcs7_certificates(data)

    _run("pkcs7", "synta", value, _via_synta)

    if not _HAS_CRYPTOGRAPHY:
        return

    def _via_cryptography():
        return _cx509_pkcs7_load_pem(data)

    _run("pkcs7", "cryptography", value, _via_cryptography)


def bench_load_pkcs12(
    data: bytes,
    password: bytes | None,
    value: str,
    *,
    compare_cryptography: bool = True,
) -> None:
    """Benchmark certificate extraction from a PKCS#12 archive.

    The synta loader is always timed.  The cryptography loader is timed too
    when the package is available and *compare_cryptography* is true; pass
    ``compare_cryptography=False`` for archives that cryptography cannot parse
    (e.g. MAC-less / non-standard formats).  Results are reported in the
    ``pkcs12`` group under the benchmark value *value*.
    """
    print()

    def _synta_extract():
        return synta.load_pkcs12_certificates(data, password)

    _run("pkcs12", "synta", value, _synta_extract)

    if not (_HAS_CRYPTOGRAPHY and compare_cryptography):
        return

    def _cx509_extract():
        # cryptography hands back a single bundle object; flatten its main
        # certificate (when present) and its additional certificates into one
        # list so the measured work mirrors the synta call.
        bundle = _cx509_pkcs12_load(data, password)
        leading = [bundle.cert] if bundle.cert else []
        return leading + list(bundle.additional_certs)

    _run("pkcs12", "cryptography", value, _cx509_extract)


# ── Main ─────────────────────────────────────────────────────────────────────

def main() -> None:
    """Run each PKCS#7 / PKCS#12 benchmark whose input file is present.

    Exits with status 1 when the test-vector directory is missing.  An
    individual missing vector file only produces a warning on stderr and that
    benchmark is skipped.
    """
    if not _VECTORS.exists():
        print(
            f"ERROR: test vectors directory not found:\n  {_VECTORS}\n\n"
            "Run the Criterion benchmarks to fetch test vectors:\n"
            "    cargo bench -p synta-bench --bench pkcs_formats --no-run",
            file=sys.stderr,
        )
        sys.exit(1)

    if _CRITERION_DIR is not None:
        print(f"Criterion JSON output: {_CRITERION_DIR}", file=sys.stderr)

    # Each entry: (input file, label used in the "skipping" warning, launcher
    # that receives the file contents).  Entries run in the order listed.
    cases = (
        # PKCS#7 DER: amazon-roots.p7b (2 certs, BER indefinite-length).
        (
            _PKCS7_DIR / "amazon-roots.p7b",
            "load_der_pkcs7",
            lambda blob: bench_load_der_pkcs7(blob, "amazon_roots"),
        ),
        # PKCS#7 PEM: isrg.pem (1 cert, PEM-wrapped SignedData).
        (
            _PKCS7_DIR / "isrg.pem",
            "load_pem_pkcs7",
            lambda blob: bench_load_pem_pkcs7(blob, "pem_isrg"),
        ),
        # PKCS#12 unencrypted: name-1-no-pwd.p12 (3 certs, id-data).
        (
            _PKCS12_DIR / "name-1-no-pwd.p12",
            "load_pkcs12 (unencrypted)",
            lambda blob: bench_load_pkcs12(blob, None, "unencrypted_3certs"),
        ),
        # PKCS#12 unencrypted: cert-none-key-none.p12 (1 cert + 1 key).
        # cryptography cannot parse this non-standard archive; synta-only.
        (
            _PKCS12_DIR / "cert-none-key-none.p12",
            "load_pkcs12 (cert+key)",
            lambda blob: bench_load_pkcs12(
                blob, None, "unencrypted_1cert_with_key",
                compare_cryptography=False,
            ),
        ),
    )

    for vector_path, label, launch in cases:
        if not vector_path.exists():
            print(
                f"\nWARNING: {vector_path} not found — skipping {label}.",
                file=sys.stderr,
            )
            continue
        launch(vector_path.read_bytes())

    print()

if __name__ == "__main__":
    main()