synta 0.1.1

ASN.1 parser, decoder, and encoder library with DER/BER support and C FFI
#!/usr/bin/env python3
"""Benchmark synta Python bindings (PyO3) vs cryptography.x509 certificate parsing.

Loads the same certificate files used by the Criterion Rust benchmarks and
times each parser using time.perf_counter(), reporting per-iteration latency in
µs to match the units shown by Criterion.
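
Here is a minimal sketch of a time-boxed measurement like the one described above. It assumes a simple warm-up-then-count loop built on time.perf_counter(). time_per_iter_us is a hypothetical helper. The script itself delegates timing to criterion_compat.measure, whose internals are not shown in this file.

```python
import time
from typing import Callable


def time_per_iter_us(fn: Callable[[], object], warmup_s: float,
                     measure_s: float) -> tuple[float, int]:
    # Warm-up phase: call fn repeatedly and discard the timings.
    deadline = time.perf_counter() + warmup_s
    while time.perf_counter() < deadline:
        fn()

    # Measurement phase: always run at least once, then keep going until the
    # time budget is spent.  The measured time includes one perf_counter()
    # call per iteration, which a more careful harness would subtract.
    iters = 0
    start = time.perf_counter()
    end = start + measure_s
    while True:
        fn()
        iters += 1
        now = time.perf_counter()
        if now >= end:
            break
    # Average latency per call, converted from seconds to microseconds.
    return (now - start) / iters * 1e6, iters


avg_us, n = time_per_iter_us(lambda: sum(range(100)), warmup_s=0.05, measure_s=0.1)
print(f"avg: {avg_us:.2f} µs  ({n} iterations)")
```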

Parsers / subtests
------------------
Up to four measurement groups are run for each certificate set; the last
(per-field) is opt-in:

  parse-only  (group: binding_comparison, binding_post_quantum)
      Times the parse call alone.  synta.Certificate.from_der() is lazy
      (shallow 4-op envelope scan; full decode deferred to first field
      access).  synta.Certificate.full_from_der() performs a complete RFC
      5280 decode immediately, matching what rust_typed measures.
      cryptography_x509 is also lazy (envelope validated only).

  parse + field access  (group: binding_comparison_fields,
                                binding_post_quantum_fields)
      Times parse + reading every field on the returned object in a single
      loop iteration (parse cost + field cost combined).

  field access only  (group: binding_comparison_access,
                             binding_post_quantum_access)
      Times reading every field on a *pre-parsed* object; parse cost is
      excluded.  This isolates getter overhead from parse overhead and
      mirrors the distinction that Criterion maintains between
      binding_comparison (parse-only) and binding_comparison_fields
      (parse+fields).

  per-field (group: binding_comparison_getter, binding_post_quantum_getter)
      Times each individual getter in isolation on a pre-parsed object
      (warm cache).  Enabled only when --per-field is passed.
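
A toy class can illustrate the lazy/eager and parse/access distinctions above. This is a sketch only. LazyRecord and eager_parse are hypothetical, and they use Python's functools.cached_property, whereas synta caches in Rust (OnceLock). Only the shape of what each group times is meant to carry over.

```python
import functools


class LazyRecord:
    # Hypothetical stand-in for a lazy parser: the constructor only checks the
    # input envelope; the body is decoded on first access and then cached.
    decode_calls = 0  # class-wide counter so the laziness is observable

    def __init__(self, data: bytes) -> None:
        if not data:
            raise ValueError("empty input")
        self._data = data

    @functools.cached_property
    def body(self) -> bytes:
        # Stand-in for a full decode (here: just reverse the bytes).
        LazyRecord.decode_calls += 1
        return bytes(reversed(self._data))


def eager_parse(data: bytes) -> LazyRecord:
    # Stand-in for an eager constructor such as full_from_der: decode up front.
    rec = LazyRecord(data)
    _ = rec.body
    return rec


data = b"example"
pre_parsed = LazyRecord(data)
_ = pre_parsed.body  # prime the cache once, outside any timing loop

# The callables each measurement group would time:
parse_only_lazy = lambda: LazyRecord(data)        # parse-only, lazy
parse_only_eager = lambda: eager_parse(data)      # parse-only, eager
parse_and_fields = lambda: LazyRecord(data).body  # parse + field access
access_only = lambda: pre_parsed.body             # field access only (warm)
```

Timing parse_only_lazy says nothing about decode cost. That is why the eager variant (full_from_der) is measured separately.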

Bindings under test
-------------------
- synta             — synta.Certificate.from_der()             (lazy: shallow envelope scan, PyO3)
- synta_full        — synta.Certificate.full_from_der()        (eager: full RFC 5280 decode, PyO3)
- cryptography_x509 — cryptography.x509.load_der_x509_certificate()

If the 'cryptography' package is not installed, the cryptography_x509
measurements are skipped automatically.

Public-key field asymmetry (T5)
--------------------------------
synta exposes:
    cert.public_key_algorithm  → str  (algorithm name, e.g. "RSA")
    cert.public_key            → bytes  (raw SubjectPublicKeyInfo bit-string,
                                         no further decoding)

cryptography.x509 exposes:
    cert.public_key_algorithm_oid  → ObjectIdentifier
    cert.public_key()              → key object  (triggers full SPKI DER decode
                                                  + key-material loading)

There is no public API in cryptography to retrieve raw SPKI bytes without
parsing the key.  The _access_cx509_fields function therefore calls
cert.public_key(), which does MORE work than synta's cert.public_key.  This
makes the cryptography_x509 parse+fields and field-access-only numbers
slightly higher than they would be for a byte-equivalent comparison.  The
asymmetry is documented here rather than hidden.
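
To make the asymmetry concrete: returning the raw subjectPublicKey bytes needs only a DER walk of the SubjectPublicKeyInfo envelope, with no key-material decoding. The sketch below is illustrative only:

- read_tlv and raw_spk_bits are hypothetical names, not synta's implementation.
- They skip DER strictness checks such as minimal length encoding.
- This file does not say whether synta's getter keeps the leading unused-bits octet. This sketch strips it.

```python
def read_tlv(buf: bytes, pos: int) -> tuple[int, int, int]:
    # Decode one DER TLV header at buf[pos]; return (tag, content_start,
    # content_end).  Only single-byte tags are handled, which covers
    # SEQUENCE (0x30) and BIT STRING (0x03).
    tag = buf[pos]
    length = buf[pos + 1]
    pos += 2
    if length & 0x80:
        # Long form: the low 7 bits give the number of length octets.
        n = length & 0x7F
        if n == 0 or n > 4:
            raise ValueError("unsupported DER length")
        length = int.from_bytes(buf[pos:pos + n], "big")
        pos += n
    end = pos + length
    if end > len(buf):
        raise ValueError("truncated DER")
    return tag, pos, end


def raw_spk_bits(spki: bytes) -> bytes:
    # SubjectPublicKeyInfo ::= SEQUENCE { algorithm        AlgorithmIdentifier,
    #                                     subjectPublicKey BIT STRING }
    tag, start, end = read_tlv(spki, 0)
    if tag != 0x30:
        raise ValueError("expected SEQUENCE")
    _, _, alg_end = read_tlv(spki, start)  # skip AlgorithmIdentifier whole
    tag, bs_start, bs_end = read_tlv(spki, alg_end)
    if tag != 0x03 or bs_end > end:
        raise ValueError("expected BIT STRING")
    # The first content octet counts unused trailing bits; the rest is the
    # key blob, returned as-is without interpreting any key material.
    return spki[bs_start + 1:bs_end]


# Toy SPKI: algorithm OID 1.2.3 (no parameters), key bits DE AD BE EF.
spki = bytes.fromhex("300d" "300406022a03" "030500deadbeef")
print(raw_spk_bits(spki).hex())  # deadbeef
```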

Alignment with synta-bench/benches/bindings.rs
----------------------------------------------
- Traditional certificates: first 5 files (globally sorted) from
  cryptography/vectors/cryptography_vectors/x509  (same as Rust truncate(5))
- Post-quantum certificates: first 3 .crt files (globally sorted) from
  dilithium-certificates/examples  (same as Rust .filter(.crt).take(3))
- cert_id format: cert_NN_<filename[:30]> for traditional,
                  cert_NN_<filename>      for post-quantum
- Extension set: {".pem", ".der", ".crt", ".pub"} — matches
  find_certificate_files_recursive() in tests/test_utils/repo.rs exactly
- Warmup: iterations are run and discarded for WARMUP_TIME_S seconds before
  the timed measurement, mirroring Criterion's warm-up phase (default: 3 s)
- Measurement: runs for MEASUREMENT_TIME_S seconds (default: 5 s), with
  iteration counts chosen adaptively, matching Criterion's adaptive sampling
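
The _measure wrapper further down describes itself as a Criterion Linear-mode sampler. As we understand that mode:

- Sample k runs k*d iterations.
- d is derived from the per-iteration cost seen during warm-up.
- The per-iteration estimate is the least-squares slope of time against iterations, fitted through the origin.

The sketch below models this. linear_sample_plan and slope_ns_per_iter are hypothetical helpers, and criterion_compat's exact formulas may differ.

```python
import math


def linear_sample_plan(warmup_ns_per_iter: float, measure_s: float,
                       n_samples: int = 100) -> list[int]:
    # Linear mode: sample k (k = 1..n) runs k * d iterations, so all samples
    # together run d * n(n+1)/2 iterations.  d is chosen so that total roughly
    # fills the measurement window at the per-iteration cost seen in warm-up.
    if warmup_ns_per_iter <= 0:
        raise ValueError("warm-up cost must be positive")
    total_runs = n_samples * (n_samples + 1) // 2
    d = max(1, math.ceil(measure_s * 1e9 / warmup_ns_per_iter / total_runs))
    return [k * d for k in range(1, n_samples + 1)]


def slope_ns_per_iter(iters: list[int], times_ns: list[float]) -> float:
    # Least-squares fit of time = slope * iters (line through the origin).
    num = sum(x * y for x, y in zip(iters, times_ns))
    den = sum(x * x for x in iters)
    return num / den


plan = linear_sample_plan(warmup_ns_per_iter=100.0, measure_s=5.0)
print(plan[0], plan[-1], len(plan))  # 9901 990100 100
```

Dividing such a slope by 1000 converts it to µs, the unit the script reports.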

Usage
-----
    # Build the Python extension first:
    cd synta-python && maturin develop --release && cd ..

    # Run from the repository root:
    python python/bench_certificate.py

    # Also run per-field getter breakdown:
    python python/bench_certificate.py --per-field

    # Save Criterion-compatible JSON (default dir: target/criterion):
    python python/bench_certificate.py --save-criterion
    python python/bench_certificate.py --save-criterion path/to/criterion
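
The --save-criterion flag takes an optional DIR. The script parses sys.argv by hand. The sketch below is an equivalent built on argparse, a swapped-in technique rather than what the script does; build_parser is a hypothetical name. One behavioural difference: argparse rejects unrecognised arguments, whereas the manual scan ignores them.

```python
import argparse
from pathlib import Path


def build_parser(default_criterion_dir: Path) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Benchmark synta vs cryptography.x509 certificate parsing")
    parser.add_argument("--per-field", action="store_true",
                        help="also time each getter individually")
    # nargs="?" makes DIR optional: bare flag -> const, flag followed by DIR
    # -> DIR, flag absent -> default (None: no Criterion JSON is written).
    parser.add_argument("--save-criterion", nargs="?", metavar="DIR",
                        type=Path, const=default_criterion_dir, default=None,
                        help="write Criterion-compatible JSON to DIR")
    return parser


args = build_parser(Path("target/criterion")).parse_args(["--save-criterion"])
print(args.save_criterion.as_posix())  # target/criterion
```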

Output format (mirrors Criterion benchmark IDs)
-----------------------------------------------
    binding_comparison/synta/cert_00_<name>                      avg: N µs  (N iterations)
    binding_comparison/synta_full/cert_00_<name>                 avg: N µs  (N iterations)
    binding_comparison/cryptography_x509/cert_00_<name>          avg: N µs  (N iterations)
    binding_comparison_fields/synta/cert_00_<name>               avg: N µs  (N iterations)
    binding_comparison_fields/cryptography_x509/cert_00_<name>   avg: N µs  (N iterations)
    binding_comparison_access/synta/cert_00_<name>               avg: N µs  (N iterations)
    binding_comparison_access/cryptography_x509/cert_00_<name>   avg: N µs  (N iterations)
    binding_comparison_getter/synta/issuer/cert_00_<name>        avg: N µs  (N iterations)  [--per-field]
    ...
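
The benchmark IDs above follow a fixed pattern: group, binding, an optional getter field name, then the cert_id. The sketch below builds them with a hypothetical benchmark_id helper; the script itself builds these strings inline with f-strings.

```python
from typing import Optional


def benchmark_id(group: str, binding: str, idx: int, filename: str, *,
                 truncate: Optional[int] = None,
                 field: Optional[str] = None) -> str:
    # cert_NN_<filename>; traditional certs truncate the filename to 30 chars,
    # post-quantum certs keep it whole.
    name = filename if truncate is None else filename[:truncate]
    cert_id = f"cert_{idx:02d}_{name}"
    # Per-field getter results insert the field name before the cert id,
    # which nests them one directory deeper in Criterion's output tree.
    parts = [group, binding] + ([field] if field else []) + [cert_id]
    return "/".join(parts)


print(benchmark_id("binding_comparison", "synta", 0, "example.pem", truncate=30))
# binding_comparison/synta/cert_00_example.pem
```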
"""

from __future__ import annotations

import os
import sys
from pathlib import Path
from typing import Callable

from criterion_compat import measure as _criterion_measure
from criterion_compat import save_criterion_files as _criterion_save

# ── synta import ──────────────────────────────────────────────────────────────

try:
    import synta
except ImportError:
    print(
        "ERROR: 'synta' Python module not found.\n"
        "Build and install it with:\n"
        "    cd synta-python && maturin develop --release && cd ..",
        file=sys.stderr,
    )
    sys.exit(1)

# ── cryptography.x509 import (optional) ───────────────────────────────────────

try:
    from cryptography.x509 import load_der_x509_certificate as _cx509_load
    _HAS_CRYPTOGRAPHY = True
except ImportError:
    _HAS_CRYPTOGRAPHY = False
    print(
        "INFO: 'cryptography' package not installed — skipping cryptography_x509 comparison.\n"
        "Install with: pip install cryptography",
        file=sys.stderr,
    )

# ── Certificate file discovery ────────────────────────────────────────────────

# Test vectors are cloned by the Criterion benchmarks into tests/vectors/
# under the repository root.  Resolve the root from this script's location
# (the script lives one directory below it, in python/).
_SCRIPT_DIR = Path(__file__).resolve().parent
_REPO_ROOT = _SCRIPT_DIR.parent
_TEST_VECTORS_DIR = _REPO_ROOT / "tests" / "vectors"

# Paths to the certificate subtrees within tests/vectors/, matching the
# dir_name + cert_path values in tests/test_utils/repo.rs RepoConfig.
_CRYPTOGRAPHY_CERT_PATH = (
    Path("cryptography") / "vectors" / "cryptography_vectors" / "x509"
)
_LAMPS_ML_DSA_CERT_PATH = Path("dilithium-certificates") / "examples"

# Matches find_certificate_files_recursive() in tests/test_utils/repo.rs
_CERT_EXTENSIONS = {".pem", ".der", ".crt", ".pub"}

_PER_FIELD = "--per-field" in sys.argv

# --save-criterion [DIR]: write Criterion-compatible JSON to DIR
# (default: target/criterion relative to the repo root when flag is present).
_CRITERION_DIR: Path | None = None
if "--save-criterion" in sys.argv:
    _idx = sys.argv.index("--save-criterion")
    _next = sys.argv[_idx + 1] if _idx + 1 < len(sys.argv) else ""
    if _next and not _next.startswith("--"):
        _CRITERION_DIR = Path(_next)
    else:
        _CRITERION_DIR = _REPO_ROOT / "target" / "criterion"


def _find_certs(base: Path, max_files: int = 5,
                extensions: set[str] = _CERT_EXTENSIONS) -> list[Path]:
    """Return up to *max_files* certificate paths under *base*.

    Collects all matching paths recursively, then sorts the full list
    before truncating — matching the behaviour of the Rust bench's
    find_certificate_files_in_repo() which calls Vec::sort() on all paths.
    """
    found: list[Path] = []
    for root, _dirs, files in os.walk(base):
        for fname in files:
            p = Path(root) / fname
            if p.suffix.lower() in extensions:
                found.append(p)
    return sorted(found)[:max_files]


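def _load_der(path: Path) -> bytes:
    """Return DER bytes for the first block, decoding PEM if necessary.

    Raises if the file looks like PEM but yields no decodable block, so that
    main() skips it with a warning instead of feeding PEM text to the DER
    parsers.
    """
    raw = path.read_bytes()
    if b"-----BEGIN" in raw:
        blocks = synta.pem_to_der(raw)
        if not blocks:
            raise ValueError(f"no PEM blocks found in {path.name}")
        return blocks[0]
    return raw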


# ── Field access helpers ──────────────────────────────────────────────────────

def _access_synta_fields(cert: object) -> None:
    """Read every field of a synta.Certificate."""
    _ = cert.serial_number              # type: ignore[attr-defined]
    _ = cert.issuer
    _ = cert.subject
    _ = cert.signature_algorithm
    _ = cert.signature_algorithm_oid
    _ = cert.signature_algorithm_params
    _ = cert.signature_value
    _ = cert.not_before
    _ = cert.not_after
    _ = cert.public_key_algorithm
    _ = cert.public_key_algorithm_oid
    _ = cert.public_key_algorithm_params
    _ = cert.public_key                 # raw SubjectPublicKeyInfo bit-string bytes
    _ = cert.extensions_der
    _ = cert.issuer_raw_der
    _ = cert.subject_raw_der
    _ = cert.to_der()
    _ = cert.tbs_bytes
    _ = cert.version


def _access_cx509_fields(cert: object) -> None:
    """Read every field of a cryptography.x509.Certificate.

    Note: cert.public_key() (called below) triggers full SubjectPublicKeyInfo
    DER decode and loads key material into a cryptography key object.  This is
    more work than synta's cert.public_key which returns the raw bytes without
    further parsing.  See the module docstring (Public-key field asymmetry) for
    details.
    """
    _ = cert.serial_number          # type: ignore[attr-defined]
    _ = cert.issuer
    _ = cert.subject
    _ = cert.signature_algorithm_oid
    _ = cert.signature
    _ = cert.not_valid_before_utc
    _ = cert.not_valid_after_utc
    try:
        # Parses the SubjectPublicKeyInfo and constructs a key object.
        # May raise UnsupportedAlgorithm for post-quantum key types that the
        # installed cryptography version does not yet recognise (e.g. ML-DSA).
        _ = cert.public_key()       # type: ignore[attr-defined]
    except Exception:
        pass
    _ = cert.version


# Per-field function lists for --per-field breakdown (T3).
# Each entry is (field_name, getter_fn).
_SYNTA_FIELD_FNS: list[tuple[str, Callable[[object], object]]] = [
    ("serial_number",              lambda c: c.serial_number),  # type: ignore[attr-defined]
    ("issuer",                     lambda c: c.issuer),
    ("subject",                    lambda c: c.subject),
    ("signature_algorithm",        lambda c: c.signature_algorithm),
    ("signature_algorithm_oid",    lambda c: c.signature_algorithm_oid),
    ("signature_algorithm_params", lambda c: c.signature_algorithm_params),
    ("signature_value",            lambda c: c.signature_value),
    ("not_before",                 lambda c: c.not_before),
    ("not_after",                  lambda c: c.not_after),
    ("public_key_algorithm",       lambda c: c.public_key_algorithm),
    ("public_key_algorithm_oid",   lambda c: c.public_key_algorithm_oid),
    ("public_key_algorithm_params", lambda c: c.public_key_algorithm_params),
    ("public_key",                 lambda c: c.public_key),
    ("extensions_der",             lambda c: c.extensions_der),
    ("issuer_raw_der",             lambda c: c.issuer_raw_der),
    ("subject_raw_der",            lambda c: c.subject_raw_der),
    ("to_der",                     lambda c: c.to_der()),
    ("tbs_bytes",                  lambda c: c.tbs_bytes),
    ("version",                    lambda c: c.version),
]

_CX509_FIELD_FNS: list[tuple[str, Callable[[object], object]]] = [
    ("serial_number",         lambda c: c.serial_number),    # type: ignore[attr-defined]
    ("issuer",                lambda c: c.issuer),
    ("subject",               lambda c: c.subject),
    ("signature_algorithm_oid", lambda c: c.signature_algorithm_oid),
    ("signature",             lambda c: c.signature),
    ("not_valid_before_utc",  lambda c: c.not_valid_before_utc),
    ("not_valid_after_utc",   lambda c: c.not_valid_after_utc),
    # public_key() triggers SPKI decode; see module docstring for asymmetry note.
    ("public_key",            lambda c: _cx509_pubkey_safe(c)),
    ("version",               lambda c: c.version),
]


def _cx509_pubkey_safe(cert: object) -> object:
    """Call cert.public_key() and return the result, or None on failure."""
    try:
        return cert.public_key()    # type: ignore[attr-defined]
    except Exception:
        return None


# ── Benchmark runners ─────────────────────────────────────────────────────────

# Mirror Criterion's default timing configuration:
#   warm-up time:    3 seconds  (results discarded)
#   measurement time: 5 seconds (used to compute avg latency and iteration count)
WARMUP_TIME_S = 3.0
MEASUREMENT_TIME_S = 5.0

# Shorter windows for the per-field breakdown (T3): many fields × many certs
# would take too long at full Criterion durations.
WARMUP_TIME_S_QUICK = 1.0
MEASUREMENT_TIME_S_QUICK = 2.0


def _measure(
    fn: Callable[[], object],
    warmup_s: float,
    measure_s: float,
) -> tuple[float, int, list[float], list[float]]:
    """Criterion Linear-mode sampling wrapper.

    Returns ``(avg_us, total_iters, iters_list, times_ns_list)``.
    *iters_list* and *times_ns_list* are the raw per-sample arrays suitable
    for passing directly to :func:`criterion_compat.save_criterion_files`.
    """
    avg_us, iters_list, times_ns_list = _criterion_measure(fn, warmup_s, measure_s)
    total_iters = sum(int(x) for x in iters_list)
    return avg_us, total_iters, iters_list, times_ns_list


def _bench_certs(
    group_name: str,
    binding: str,
    certs: list[tuple[str, bytes]],
    parse_fn: Callable[[bytes], object],
    access_fn: Callable[[object], None] | None = None,
    *,
    criterion_dir: Path | None = None,
) -> None:
    """Time parse_fn (+ optional access_fn) on each cert in a fresh loop."""
    for cert_id, der_bytes in certs:
        if access_fn is not None:
            work = lambda: access_fn(parse_fn(der_bytes))  # noqa: E731
        else:
            work = lambda: parse_fn(der_bytes)  # noqa: E731
        avg_us, total_iters, iters_list, times_ns_list = _measure(
            work, WARMUP_TIME_S, MEASUREMENT_TIME_S
        )
        print(f"{group_name}/{binding}/{cert_id}  avg: {avg_us:.2f} µs  ({total_iters} iterations)")
        if criterion_dir is not None:
            _criterion_save(criterion_dir, group_name, binding, cert_id,
                            iters_list, times_ns_list)


def _bench_access_only(
    group_name: str,
    binding: str,
    certs: list[tuple[str, bytes]],
    parse_fn: Callable[[bytes], object],
    access_fn: Callable[[object], None],
    *,
    criterion_dir: Path | None = None,
) -> None:
    """Time access_fn on a pre-parsed cert; parse cost excluded.

    The cert object is created once outside the timing loop so only the cost
    of reading the fields is measured.  For synta, all OnceLock caches are
    warmed before the timed loop begins, so this measures the warm-path
    clone_ref cost.  For cryptography_x509, only the library's own internal
    caching applies; getters it does not cache pay their full cost on every
    call.
    """
    for cert_id, der_bytes in certs:
        cert = parse_fn(der_bytes)
        # Prime all lazy caches before timing begins.
        access_fn(cert)
        avg_us, total_iters, iters_list, times_ns_list = _measure(
            lambda: access_fn(cert), WARMUP_TIME_S, MEASUREMENT_TIME_S
        )
        print(f"{group_name}/{binding}/{cert_id}  avg: {avg_us:.2f} µs  ({total_iters} iterations)")
        if criterion_dir is not None:
            _criterion_save(criterion_dir, group_name, binding, cert_id,
                            iters_list, times_ns_list)


def _bench_fields(
    group_name: str,
    binding: str,
    certs: list[tuple[str, bytes]],
    parse_fn: Callable[[bytes], object],
    field_fns: list[tuple[str, Callable[[object], object]]],
    *,
    criterion_dir: Path | None = None,
) -> None:
    """Time each getter individually on a pre-parsed cert (warm-cache path).

    Uses shorter warmup/measurement windows (WARMUP_TIME_S_QUICK /
    MEASUREMENT_TIME_S_QUICK) to keep total runtime manageable across many
    fields and certs.
    """
    for cert_id, der_bytes in certs:
        cert = parse_fn(der_bytes)
        for field_name, field_fn in field_fns:
            # Warm the specific getter so OnceLock (synta) or Python-level
            # caching (cryptography) is populated before timing.
            field_fn(cert)
            avg_us, total_iters, iters_list, times_ns_list = _measure(
                lambda fn=field_fn: fn(cert), WARMUP_TIME_S_QUICK, MEASUREMENT_TIME_S_QUICK
            )
            print(
                f"{group_name}/{binding}/{field_name}/{cert_id}"
                f"  avg: {avg_us:.3f} µs  ({total_iters} iterations)"
            )
            if criterion_dir is not None:
                # value_str uses a slash to nest field under cert in Criterion's
                # directory tree: …/getter/synta/issuer/cert_00_name/new/
                _criterion_save(criterion_dir, group_name, binding,
                                f"{field_name}/{cert_id}",
                                iters_list, times_ns_list)


def _run_group(
    prefix: str,
    certs: list[tuple[str, bytes]],
    *,
    criterion_dir: Path | None = None,
) -> None:
    """Run all benchmark subtests (parse, parse+fields, access-only, per-field) for one group."""
    if not certs:
        return
    kw = {"criterion_dir": criterion_dir}
    # Parse-only
    print()
    _bench_certs(prefix, "synta", certs, synta.Certificate.from_der, **kw)
    print()
    _bench_certs(prefix, "synta_full", certs, synta.Certificate.full_from_der, **kw)
    if _HAS_CRYPTOGRAPHY:
        print()
        _bench_certs(prefix, "cryptography_x509", certs, _cx509_load, **kw)
    # Parse + access all fields (combined)
    print()
    _bench_certs(f"{prefix}_fields", "synta", certs,
                 synta.Certificate.from_der, _access_synta_fields, **kw)
    if _HAS_CRYPTOGRAPHY:
        print()
        _bench_certs(f"{prefix}_fields", "cryptography_x509", certs,
                     _cx509_load, _access_cx509_fields, **kw)
    # Field access only on pre-parsed cert (parse cost excluded)
    print()
    _bench_access_only(f"{prefix}_access", "synta", certs,
                       synta.Certificate.from_der, _access_synta_fields, **kw)
    if _HAS_CRYPTOGRAPHY:
        print()
        _bench_access_only(f"{prefix}_access", "cryptography_x509", certs,
                           _cx509_load, _access_cx509_fields, **kw)
    # Per-field getter breakdown (opt-in)
    if _PER_FIELD:
        print()
        _bench_fields(f"{prefix}_getter", "synta", certs,
                      synta.Certificate.from_der, _SYNTA_FIELD_FNS, **kw)
        if _HAS_CRYPTOGRAPHY:
            print()
            _bench_fields(f"{prefix}_getter", "cryptography_x509", certs,
                          _cx509_load, _CX509_FIELD_FNS, **kw)


def main() -> None:
    if not _TEST_VECTORS_DIR.exists():
        print(
            f"ERROR: test vectors directory not found:\n  {_TEST_VECTORS_DIR}\n\n"
            "Run the Criterion benchmarks first to clone certificate repositories:\n"
            "    cargo bench -p synta-bench --bench bindings --features bench-bindings --no-run",
            file=sys.stderr,
        )
        sys.exit(1)

    # ── Group A: traditional certificates ────────────────────────────────────
    crypto_dir = _TEST_VECTORS_DIR / _CRYPTOGRAPHY_CERT_PATH
    if not crypto_dir.exists():
        print(
            f"WARNING: PyCA cryptography cert directory not found: {crypto_dir}",
            file=sys.stderr,
        )
        comparison_certs: list[tuple[str, bytes]] = []
    else:
        paths = _find_certs(crypto_dir, max_files=5)
        comparison_certs = []
        for idx, path in enumerate(paths):
            try:
                der = _load_der(path)
            except Exception as exc:
                print(f"WARNING: skipping {path.name}: {exc}", file=sys.stderr)
                continue
            comparison_certs.append((f"cert_{idx:02d}_{path.name[:30]}", der))

    if len(comparison_certs) != 5:
        print(
            f"WARNING: expected 5 traditional certificates, found {len(comparison_certs)}. "
            "Results may not match the Rust bindings benchmark.",
            file=sys.stderr,
        )

    if _CRITERION_DIR is not None:
        print(f"\nCriterion JSON output: {_CRITERION_DIR}", file=sys.stderr)

    _run_group("binding_comparison", comparison_certs, criterion_dir=_CRITERION_DIR)

    # ── Group B: ML-DSA post-quantum certificates ─────────────────────────────
    lamps_dir = _TEST_VECTORS_DIR / _LAMPS_ML_DSA_CERT_PATH
    if not lamps_dir.exists():
        print(
            f"WARNING: LAMPS ML-DSA cert directory not found: {lamps_dir}",
            file=sys.stderr,
        )
        pq_certs: list[tuple[str, bytes]] = []
    else:
        pq_paths = _find_certs(lamps_dir, max_files=3, extensions={".crt"})
        pq_certs = []
        for idx, path in enumerate(pq_paths):
            try:
                der = _load_der(path)
            except Exception as exc:
                print(f"WARNING: skipping {path.name}: {exc}", file=sys.stderr)
                continue
            pq_certs.append((f"cert_{idx:02d}_{path.name}", der))

    if len(pq_certs) != 3:
        print(
            f"WARNING: expected 3 ML-DSA certificates, found {len(pq_certs)}. "
            "Results may not match the Rust bindings benchmark.",
            file=sys.stderr,
        )

    _run_group("binding_post_quantum", pq_certs, criterion_dir=_CRITERION_DIR)

    if not comparison_certs and not pq_certs:
        print(
            "\nNo certificates found.  Run the Rust benchmark first to populate test vectors.",
            file=sys.stderr,
        )
        sys.exit(1)


if __name__ == "__main__":
    main()