from __future__ import annotations
import os
import sys
from pathlib import Path
from typing import Callable
from criterion_compat import measure as _criterion_measure
from criterion_compat import save_criterion_files as _criterion_save
# synta is the binding under test: without it nothing below can run, so a
# missing module prints build instructions to stderr and exits with status 1.
try:
    import synta
except ImportError:
    print(
        "ERROR: 'synta' Python module not found.\n"
        "Build and install it with:\n"
        " cd synta-python && maturin develop --release && cd ..",
        file=sys.stderr,
    )
    sys.exit(1)
# cryptography is only the comparison baseline.  When it is absent the script
# keeps going and _HAS_CRYPTOGRAPHY tells _run_group to skip those benchmarks.
try:
    from cryptography.x509 import load_der_x509_certificate as _cx509_load
    _HAS_CRYPTOGRAPHY = True
except ImportError:
    _HAS_CRYPTOGRAPHY = False
    print(
        "INFO: 'cryptography' package not installed — skipping cryptography_x509 comparison.\n"
        "Install with: pip install cryptography",
        file=sys.stderr,
    )
# Filesystem layout, anchored on this script's own location: the repository
# root is taken to be the parent of the directory that contains the script.
_SCRIPT_DIR = Path(__file__).resolve().parent
_REPO_ROOT = _SCRIPT_DIR.parent
_TEST_VECTORS_DIR = _REPO_ROOT.joinpath("tests", "vectors")

# Certificate locations, relative to _TEST_VECTORS_DIR.
_CRYPTOGRAPHY_CERT_PATH = Path(
    "cryptography", "vectors", "cryptography_vectors", "x509"
)
_LAMPS_ML_DSA_CERT_PATH = Path("dilithium-certificates", "examples")

# File suffixes (lower-case, with the dot) that _find_certs accepts by default.
_CERT_EXTENSIONS = {".pem", ".der", ".crt", ".pub"}
_PER_FIELD = "--per-field" in sys.argv
_CRITERION_DIR: Path | None = None
if "--save-criterion" in sys.argv:
_idx = sys.argv.index("--save-criterion")
_next = sys.argv[_idx + 1] if _idx + 1 < len(sys.argv) else ""
if _next and not _next.startswith("--"):
_CRITERION_DIR = Path(_next)
else:
_CRITERION_DIR = _REPO_ROOT / "target" / "criterion"
def _find_certs(base: Path, max_files: int = 5,
                extensions: set[str] = _CERT_EXTENSIONS) -> list[Path]:
    """Return the first ``max_files`` certificate-like files under ``base``.

    The tree is walked recursively with ``os.walk``.  A file qualifies when
    its lower-cased suffix is in ``extensions``.  The matches are sorted so
    that the selection does not depend on directory iteration order, and the
    sorted list is then cut to ``max_files`` entries.
    """
    candidates = (
        Path(dirpath, filename)
        for dirpath, _subdirs, filenames in os.walk(base)
        for filename in filenames
    )
    matches = sorted(p for p in candidates if p.suffix.lower() in extensions)
    return matches[:max_files]
def _load_der(path: Path) -> bytes:
    """Read a certificate file and return its bytes, decoding PEM when possible.

    Args:
        path: File to read.

    Returns:
        The file contents unchanged when they contain no ``-----BEGIN``
        marker.  Otherwise the first element returned by
        ``synta.pem_to_der``.  If that conversion fails for any reason, the
        raw file contents are returned as a best-effort fallback.

    Raises:
        OSError: If the file cannot be read (propagated from ``read_bytes``).
    """
    raw = path.read_bytes()
    if b"-----BEGIN" not in raw:
        return raw
    try:
        return synta.pem_to_der(raw)[0]
    except Exception as exc:
        # Keep the original fallback of returning the raw bytes, but no longer
        # silently: the caller will hand them to the parsers as DER, so a
        # later parse failure should be traceable to this file.
        print(
            f"WARNING: PEM decode failed for {path.name}; using raw bytes: {exc}",
            file=sys.stderr,
        )
        return raw
def _access_synta_fields(cert: object) -> None:
    """Read every benchmarked attribute of a synta certificate once.

    Passed as the ``access_fn`` workload for the ``*_fields`` and
    ``*_access`` groups in ``_run_group``.  All values are discarded; only
    the cost of producing them matters.  The statements are the measured
    work, and they mirror ``_SYNTA_FIELD_FNS`` entry for entry.
    """
    _ = cert.serial_number
    _ = cert.issuer
    _ = cert.subject
    _ = cert.signature_algorithm
    _ = cert.signature_algorithm_oid
    _ = cert.signature_algorithm_params
    _ = cert.signature_value
    _ = cert.not_before
    _ = cert.not_after
    _ = cert.public_key_algorithm
    _ = cert.public_key_algorithm_oid
    _ = cert.public_key_algorithm_params
    _ = cert.public_key
    _ = cert.extensions_der
    _ = cert.issuer_raw_der
    _ = cert.subject_raw_der
    # to_der is the only entry here that is a method call rather than an attribute read.
    _ = cert.to_der()
    _ = cert.tbs_bytes
    _ = cert.version
def _access_cx509_fields(cert: object) -> None:
    """Read every benchmarked attribute of a ``cryptography`` certificate once.

    Counterpart of ``_access_synta_fields`` for the ``cryptography_x509``
    binding; it mirrors ``_CX509_FIELD_FNS`` entry for entry.  All values are
    discarded.
    """
    _ = cert.serial_number
    _ = cert.issuer
    _ = cert.subject
    _ = cert.signature_algorithm_oid
    _ = cert.signature
    _ = cert.not_valid_before_utc
    _ = cert.not_valid_after_utc
    # public_key() is the one access allowed to fail without aborting the run.
    # NOTE(review): presumably it raises for key types cryptography does not
    # support (e.g. the ML-DSA vectors) — confirm.
    try:
        _ = cert.public_key()
    except Exception:
        pass
    _ = cert.version
# (field name, getter) pairs for the per-field synta benchmark.  _bench_fields
# times each getter separately and puts the field name into the benchmark id.
# The entries match the accesses made by _access_synta_fields, in the same order.
_SYNTA_FIELD_FNS: list[tuple[str, Callable[[object], object]]] = [
    ("serial_number", lambda c: c.serial_number),
    ("issuer", lambda c: c.issuer),
    ("subject", lambda c: c.subject),
    ("signature_algorithm", lambda c: c.signature_algorithm),
    ("signature_algorithm_oid", lambda c: c.signature_algorithm_oid),
    ("signature_algorithm_params", lambda c: c.signature_algorithm_params),
    ("signature_value", lambda c: c.signature_value),
    ("not_before", lambda c: c.not_before),
    ("not_after", lambda c: c.not_after),
    ("public_key_algorithm", lambda c: c.public_key_algorithm),
    ("public_key_algorithm_oid", lambda c: c.public_key_algorithm_oid),
    ("public_key_algorithm_params", lambda c: c.public_key_algorithm_params),
    ("public_key", lambda c: c.public_key),
    ("extensions_der", lambda c: c.extensions_der),
    ("issuer_raw_der", lambda c: c.issuer_raw_der),
    ("subject_raw_der", lambda c: c.subject_raw_der),
    # The only entry that calls a method instead of reading an attribute.
    ("to_der", lambda c: c.to_der()),
    ("tbs_bytes", lambda c: c.tbs_bytes),
    ("version", lambda c: c.version),
]
# (field name, getter) pairs for the per-field cryptography benchmark; the
# entries match the accesses made by _access_cx509_fields, in the same order.
_CX509_FIELD_FNS: list[tuple[str, Callable[[object], object]]] = [
    ("serial_number", lambda c: c.serial_number),
    ("issuer", lambda c: c.issuer),
    ("subject", lambda c: c.subject),
    ("signature_algorithm_oid", lambda c: c.signature_algorithm_oid),
    ("signature", lambda c: c.signature),
    ("not_valid_before_utc", lambda c: c.not_valid_before_utc),
    ("not_valid_after_utc", lambda c: c.not_valid_after_utc),
    # Wrapped in a lambda because _cx509_pubkey_safe is defined further down
    # the module; the name is only looked up when the getter is called.
    ("public_key", lambda c: _cx509_pubkey_safe(c)),
    ("version", lambda c: c.version),
]
def _cx509_pubkey_safe(cert: object) -> object:
    """Return ``cert.public_key()``, or ``None`` if the call raises.

    Any ``Exception`` is treated as "no key available", so the per-field
    benchmark can still time this getter on certificates whose key cannot be
    loaded.
    """
    try:
        key = cert.public_key()
    except Exception:
        return None
    else:
        return key
# Warm-up and measurement budgets handed to _measure.
# NOTE(review): the _S suffix suggests seconds; the unit is ultimately
# interpreted by criterion_compat.measure — confirm there.
# Used by _bench_certs and _bench_access_only (one measurement per certificate).
WARMUP_TIME_S = 3.0
MEASUREMENT_TIME_S = 5.0
# Shorter budgets used by _bench_fields, which measures once per field per certificate.
WARMUP_TIME_S_QUICK = 1.0
MEASUREMENT_TIME_S_QUICK = 2.0
def _measure(
    fn: Callable[[], object],
    warmup_s: float,
    measure_s: float,
) -> tuple[float, int, list[float], list[float]]:
    """Time ``fn`` with ``criterion_compat.measure`` and add an iteration total.

    ``fn``, ``warmup_s`` and ``measure_s`` are forwarded unchanged.  The
    three values that come back are returned as received, with one extra
    element inserted at position 1: the sum of the per-sample iteration
    counts, each converted with ``int`` first.

    Returns:
        ``(average, total_iterations, iterations_per_sample, times_per_sample)``.
    """
    mean_us, sample_iters, sample_times_ns = _criterion_measure(
        fn, warmup_s, measure_s
    )
    iteration_total = sum(map(int, sample_iters))
    return mean_us, iteration_total, sample_iters, sample_times_ns
def _bench_certs(
    group_name: str,
    binding: str,
    certs: list[tuple[str, bytes]],
    parse_fn: Callable[[bytes], object],
    access_fn: Callable[[object], None] | None = None,
    *,
    criterion_dir: Path | None = None,
) -> None:
    """Benchmark parsing (and optionally field access) for each certificate.

    For every ``(cert_id, der_bytes)`` pair the timed workload is
    ``parse_fn(der_bytes)``; when ``access_fn`` is given, the parsed object
    is additionally passed to it inside the same timed call.  One summary
    line per certificate is printed to stdout, and the raw samples are
    handed to ``criterion_compat.save_criterion_files`` when
    ``criterion_dir`` is set.
    """
    for cert_id, der_bytes in certs:
        # The workload is built and consumed within the same iteration, so
        # closing over the loop variable is safe here.
        if access_fn is None:
            def workload() -> object:
                return parse_fn(der_bytes)
        else:
            def workload() -> object:
                return access_fn(parse_fn(der_bytes))

        mean_us, iteration_total, sample_iters, sample_times_ns = _measure(
            workload, WARMUP_TIME_S, MEASUREMENT_TIME_S
        )
        print(f"{group_name}/{binding}/{cert_id} avg: {mean_us:.2f} µs ({iteration_total} iterations)")
        if criterion_dir is None:
            continue
        _criterion_save(
            criterion_dir, group_name, binding, cert_id,
            sample_iters, sample_times_ns,
        )
def _bench_access_only(
    group_name: str,
    binding: str,
    certs: list[tuple[str, bytes]],
    parse_fn: Callable[[bytes], object],
    access_fn: Callable[[object], None],
    *,
    criterion_dir: Path | None = None,
) -> None:
    """Benchmark field access alone, on a certificate parsed ahead of time.

    Each certificate is parsed once outside the timed region and
    ``access_fn`` is called on it once before measuring.  Only the repeated
    ``access_fn(cert)`` calls are timed.  Output and optional criterion
    files follow the same format as ``_bench_certs``.
    """
    for cert_id, der_bytes in certs:
        parsed = parse_fn(der_bytes)
        # One untimed pass before the measurement starts.
        # NOTE(review): presumably this primes lazily computed or cached
        # getters so the timed calls see the steady state — confirm.
        access_fn(parsed)

        def touch() -> object:
            return access_fn(parsed)

        mean_us, iteration_total, sample_iters, sample_times_ns = _measure(
            touch, WARMUP_TIME_S, MEASUREMENT_TIME_S
        )
        print(f"{group_name}/{binding}/{cert_id} avg: {mean_us:.2f} µs ({iteration_total} iterations)")
        if criterion_dir is None:
            continue
        _criterion_save(
            criterion_dir, group_name, binding, cert_id,
            sample_iters, sample_times_ns,
        )
def _bench_fields(
    group_name: str,
    binding: str,
    certs: list[tuple[str, bytes]],
    parse_fn: Callable[[bytes], object],
    field_fns: list[tuple[str, Callable[[object], object]]],
    *,
    criterion_dir: Path | None = None,
) -> None:
    """Benchmark each getter in ``field_fns`` separately for every certificate.

    A certificate is parsed once, then every ``(field_name, getter)`` pair is
    called once untimed and measured with the shorter ``*_QUICK`` budgets.
    The benchmark id, both in the printed line and in the criterion files,
    is ``<field_name>/<cert_id>``.
    """
    for cert_id, der_bytes in certs:
        parsed = parse_fn(der_bytes)
        for field_name, getter in field_fns:
            # Untimed first call, made before the measurement begins.
            getter(parsed)

            # Binding the getter as a default freezes it for this workload,
            # independent of later rebinding of the loop variable.
            def read_field(fn: Callable[[object], object] = getter) -> object:
                return fn(parsed)

            mean_us, iteration_total, sample_iters, sample_times_ns = _measure(
                read_field, WARMUP_TIME_S_QUICK, MEASUREMENT_TIME_S_QUICK
            )
            print(
                f"{group_name}/{binding}/{field_name}/{cert_id}"
                f" avg: {mean_us:.3f} µs ({iteration_total} iterations)"
            )
            if criterion_dir is None:
                continue
            _criterion_save(
                criterion_dir, group_name, binding,
                f"{field_name}/{cert_id}",
                sample_iters, sample_times_ns,
            )
def _run_group(
    prefix: str,
    certs: list[tuple[str, bytes]],
    *,
    criterion_dir: Path | None = None,
) -> None:
    """Run every benchmark variant for one set of certificates.

    Does nothing when ``certs`` is empty.  Otherwise runs, in this order and
    each preceded by a blank line on stdout:

    1. parse only (``prefix``): synta ``from_der``, synta ``full_from_der``,
       then cryptography if installed;
    2. parse plus field access (``<prefix>_fields``): synta, then cryptography;
    3. field access on a pre-parsed certificate (``<prefix>_access``): synta,
       then cryptography;
    4. only with ``--per-field``: one benchmark per getter
       (``<prefix>_getter``): synta, then cryptography.
    """
    if not certs:
        return

    fields_group = f"{prefix}_fields"
    access_group = f"{prefix}_access"
    getter_group = f"{prefix}_getter"

    # 1. Parse only.
    print()
    _bench_certs(prefix, "synta", certs, synta.Certificate.from_der,
                 criterion_dir=criterion_dir)
    print()
    _bench_certs(prefix, "synta_full", certs, synta.Certificate.full_from_der,
                 criterion_dir=criterion_dir)
    if _HAS_CRYPTOGRAPHY:
        print()
        _bench_certs(prefix, "cryptography_x509", certs, _cx509_load,
                     criterion_dir=criterion_dir)

    # 2. Parse followed by reading all fields.
    print()
    _bench_certs(fields_group, "synta", certs,
                 synta.Certificate.from_der, _access_synta_fields,
                 criterion_dir=criterion_dir)
    if _HAS_CRYPTOGRAPHY:
        print()
        _bench_certs(fields_group, "cryptography_x509", certs,
                     _cx509_load, _access_cx509_fields,
                     criterion_dir=criterion_dir)

    # 3. Reading all fields of an already-parsed certificate.
    print()
    _bench_access_only(access_group, "synta", certs,
                       synta.Certificate.from_der, _access_synta_fields,
                       criterion_dir=criterion_dir)
    if _HAS_CRYPTOGRAPHY:
        print()
        _bench_access_only(access_group, "cryptography_x509", certs,
                           _cx509_load, _access_cx509_fields,
                           criterion_dir=criterion_dir)

    # 4. Individual getters, opt-in through --per-field.
    if not _PER_FIELD:
        return
    print()
    _bench_fields(getter_group, "synta", certs,
                  synta.Certificate.from_der, _SYNTA_FIELD_FNS,
                  criterion_dir=criterion_dir)
    if _HAS_CRYPTOGRAPHY:
        print()
        _bench_fields(getter_group, "cryptography_x509", certs,
                      _cx509_load, _CX509_FIELD_FNS,
                      criterion_dir=criterion_dir)
def _load_cert_group(
    cert_dir: Path,
    *,
    dir_label: str,
    cert_label: str,
    expected: int,
    extensions: set[str],
    name_limit: int | None = None,
) -> list[tuple[str, bytes]]:
    """Discover, load and label the certificates for one benchmark group.

    Args:
        cert_dir: Directory searched recursively by ``_find_certs``.
        dir_label: Name used in the "directory not found" warning.
        cert_label: Name used in the "expected N ... certificates" warning.
        expected: Maximum number of files to take and the count required to
            avoid the mismatch warning.
        extensions: File suffixes passed to ``_find_certs``.
        name_limit: When set, the file name is cut to this many characters
            in the certificate id; ``None`` keeps the full name.

    Returns:
        ``(cert_id, der_bytes)`` pairs.  Empty when ``cert_dir`` is missing.
        Files that fail to load are skipped with a warning on stderr; their
        index is not reused, so ids keep the position from the sorted listing.
    """
    if not cert_dir.exists():
        print(
            f"WARNING: {dir_label} cert directory not found: {cert_dir}",
            file=sys.stderr,
        )
        return []

    loaded: list[tuple[str, bytes]] = []
    paths = _find_certs(cert_dir, max_files=expected, extensions=extensions)
    for idx, path in enumerate(paths):
        try:
            der = _load_der(path)
        except Exception as exc:
            print(f"WARNING: skipping {path.name}: {exc}", file=sys.stderr)
            continue
        loaded.append((f"cert_{idx:02d}_{path.name[:name_limit]}", der))

    if len(loaded) != expected:
        print(
            f"WARNING: expected {expected} {cert_label} certificates, found {len(loaded)}. "
            "Results may not match the Rust bindings benchmark.",
            file=sys.stderr,
        )
    return loaded


def main() -> None:
    """Load the test vectors and run both benchmark groups.

    Exits with status 1 when the test-vector directory is missing, or when
    neither group yielded any certificate.  Otherwise runs the
    ``binding_comparison`` group (up to 5 certificates from the PyCA
    cryptography vectors) followed by the ``binding_post_quantum`` group (up
    to 3 ``.crt`` files from the LAMPS ML-DSA examples).  A missing group
    directory or an unexpected certificate count only produces a warning.
    """
    if not _TEST_VECTORS_DIR.exists():
        print(
            f"ERROR: test vectors directory not found:\n {_TEST_VECTORS_DIR}\n\n"
            "Run the Criterion benchmarks first to clone certificate repositories:\n"
            " cargo bench -p synta-bench --bench bindings --features bench-bindings --no-run",
            file=sys.stderr,
        )
        sys.exit(1)

    comparison_certs = _load_cert_group(
        _TEST_VECTORS_DIR / _CRYPTOGRAPHY_CERT_PATH,
        dir_label="PyCA cryptography",
        cert_label="traditional",
        expected=5,
        extensions=_CERT_EXTENSIONS,
        # Ids for this group carry at most 30 characters of the file name.
        name_limit=30,
    )
    if _CRITERION_DIR is not None:
        print(f"\nCriterion JSON output: {_CRITERION_DIR}", file=sys.stderr)
    _run_group("binding_comparison", comparison_certs, criterion_dir=_CRITERION_DIR)

    pq_certs = _load_cert_group(
        _TEST_VECTORS_DIR / _LAMPS_ML_DSA_CERT_PATH,
        dir_label="LAMPS ML-DSA",
        cert_label="ML-DSA",
        expected=3,
        extensions={".crt"},
    )
    _run_group("binding_post_quantum", pq_certs, criterion_dir=_CRITERION_DIR)

    # Checked last so that every applicable warning above has been printed.
    if not comparison_certs and not pq_certs:
        print(
            "\nNo certificates found. Run the Rust benchmark first to populate test vectors.",
            file=sys.stderr,
        )
        sys.exit(1)
# Run the benchmarks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()