# 23. `example_x509_verify.py` — X.509 certificate chain verification
[← Example index](index.md) · [example_x509_verify.py on Codeberg](https://codeberg.org/abbra/synta/src/branch/main/examples/example_x509_verify.py)
Bindings: `synta.x509.TrustStore`, `synta.x509.VerificationPolicy`,
`synta.x509.verify_server_certificate`, `synta.x509.verify_client_certificate`,
`synta.x509.CrlStore`, `synta.x509.OcspStore`, `synta.x509.X509VerificationError`.
Requires the `openssl` Cargo feature (`maturin develop --features openssl`).
- Generate a self-signed root CA in a temporary directory with `openssl req -x509`,
load its DER encoding into a `TrustStore`, and print `store.len` and `repr(store)`.
- Verify a leaf certificate signed by that root with `verify_server_certificate`;
print the chain length and each certificate's subject with `synta.Certificate.from_der`.
- Demonstrate `VerificationPolicy` variants: single name, multi-name `any`, multi-name
`all`, fixed `validation_time`, `max_chain_depth`, `profile="rfc5280"`.
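- Demonstrate client certificate verification with `verify_client_certificate`.
- Demonstrate revocation inputs with `CrlStore` and `OcspStore`: empty stores, a CRL
generated with `openssl ca -gencrl`, and a malformed OCSP response.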
- Show that `X509VerificationError` is raised in each of three cases: the trust store
does not contain the issuer, the server name does not match the SAN, or the
validation time is outside the validity window.
## Source
```python
#!/usr/bin/env python3
"""
Example: X.509 certificate chain verification with synta.x509
Demonstrates:
- TrustStore construction from DER and PEM (via synta.pem_to_der)
- VerificationPolicy: single name, multi-name any/all, fixed
validation_time, max_chain_depth, profile="rfc5280"
- verify_server_certificate — server cert chain validation
- verify_client_certificate — client cert chain validation
- CrlStore — CRL-based revocation checking (soft-fail / hard-fail)
- OcspStore — OCSP-based revocation checking (soft-fail / hard-fail)
- X509VerificationError: untrusted issuer, server name mismatch,
and validation time outside the validity window
Requirements:
- synta built with the ``openssl`` Cargo feature
(``maturin develop --features openssl`` or equivalent)
- openssl(1) on PATH for certificate generation
Usage::
python examples/example_x509_verify.py
"""
import shutil
import subprocess
import sys
import tempfile
import time
from pathlib import Path
import synta
import synta.x509 as x509
# ── Formatting helpers ────────────────────────────────────────────────────────
def section(title: str) -> None:
    """Print *title* framed above and below by a 64-character rule."""
    rule = "─" * 64
    # The leading empty string yields the blank line before the first rule.
    print("", rule, title, rule, sep="\n")
def run(cmd: list[str], **kwargs) -> subprocess.CompletedProcess:
    """Run *cmd* with ``subprocess.run`` and abort the script if it fails.

    Args:
        cmd: Argument vector passed to ``subprocess.run``.
        **kwargs: Forwarded unchanged to ``subprocess.run``.

    Returns:
        The ``CompletedProcess`` of a command that exited with status 0.

    Raises:
        SystemExit: If the command exits with a non-zero status. The exit
            message contains the return code and whatever stderr text was
            captured (empty when stderr was not captured).
    """
    result = subprocess.run(cmd, **kwargs)
    if result.returncode == 0:
        return result

    # ``CompletedProcess.stderr`` is None unless the caller asked for stderr
    # to be captured; normalise that to an empty string so the failure is
    # still reported as SystemExit rather than an AttributeError on None.
    stderr = result.stderr or ""
    if isinstance(stderr, bytes):
        stderr = stderr.decode(errors="replace")
    sys.exit(f"Command failed (rc={result.returncode}): {stderr.strip()!r}")
# ── Certificate generation ────────────────────────────────────────────────────
def generate_certs(tmpdir: Path) -> dict[str, Path]:
    """Create a throwaway PKI under *tmpdir* using the ``openssl`` CLI.

    A self-signed root CA is generated first; a server leaf and a client
    leaf are then each created as a CSR and signed by that root. Every
    certificate is additionally converted to DER.

    Returns:
        A mapping with the keys ``root_pem``, ``root_der``, ``leaf_der`` and
        ``client_der``, or an empty dict when ``openssl`` is not on PATH.
    """
    openssl = shutil.which("openssl")
    if openssl is None:
        return {}

    root_key = tmpdir / "root.key"
    root_pem = tmpdir / "root.pem"

    def export_der(pem: Path) -> Path:
        # Convert a PEM certificate to DER next to it (same stem, .der suffix).
        der = pem.with_suffix(".der")
        run([openssl, "x509", "-in", str(pem),
             "-out", str(der), "-outform", "DER"],
            capture_output=True)
        return der

    def issue_leaf(stem: str, subject: str, ext_file: Path, ext_section: str) -> Path:
        # Create key + CSR, sign the CSR with the root for 365 days, and
        # return the path of the DER-encoded leaf certificate.
        key = tmpdir / f"{stem}.key"
        csr = tmpdir / f"{stem}.csr"
        pem = tmpdir / f"{stem}.pem"
        run([openssl, "req", "-newkey", "rsa:2048",
             "-keyout", str(key), "-out", str(csr),
             "-nodes", "-subj", subject],
            capture_output=True)
        run([openssl, "x509", "-req", "-in", str(csr),
             "-CA", str(root_pem), "-CAkey", str(root_key), "-CAcreateserial",
             "-out", str(pem), "-days", "365",
             "-extfile", str(ext_file), "-extensions", ext_section],
            capture_output=True)
        return export_der(pem)

    # Extension sections are written up front, before any openssl call.
    # Server leaf: two DNS SANs and the serverAuth EKU.
    server_ext = tmpdir / "leaf_ext.cnf"
    server_ext.write_text(
        "[v3_leaf]\n"
        "subjectAltName=DNS:example.com,DNS:www.example.com\n"
        "extendedKeyUsage=serverAuth\n"
        "basicConstraints=CA:FALSE\n"
        "keyUsage=digitalSignature\n"
    )
    # Client leaf: an e-mail SAN and the clientAuth EKU. The original example
    # notes that the default WebPKI end-entity extension policy requires a SAN.
    client_ext = tmpdir / "client_ext.cnf"
    client_ext.write_text(
        "[v3_client]\n"
        "subjectAltName=email:client@example.com\n"
        "extendedKeyUsage=clientAuth\n"
        "basicConstraints=CA:FALSE\n"
        "keyUsage=digitalSignature\n"
    )

    # Root CA: self-signed with a 3650-day validity.
    run([openssl, "req", "-x509", "-newkey", "rsa:2048",
         "-keyout", str(root_key), "-out", str(root_pem),
         "-days", "3650", "-nodes", "-subj", "/CN=Test Root CA"],
        capture_output=True)
    root_der = export_der(root_pem)

    leaf_der = issue_leaf("leaf", "/CN=example.com", server_ext, "v3_leaf")
    client_der = issue_leaf("client", "/CN=Test Client", client_ext, "v3_client")

    return {
        "root_pem": root_pem,
        "root_der": root_der,
        "leaf_der": leaf_der,
        "client_der": client_der,
    }
# ── Section 1: TrustStore ─────────────────────────────────────────────────────
def demo_trust_store(paths: dict[str, Path]) -> None:
    """Build trust stores from DER, from PEM and from nothing, and print each.

    Reads ``paths["root_der"]`` and ``paths["root_pem"]``.
    """
    section("1. TrustStore — construction and inspection")

    # A store built directly from the root CA's DER bytes.
    der_store = x509.TrustStore([paths["root_der"].read_bytes()])
    print(f" store.len : {der_store.len}")
    print(f" repr(store) : {der_store!r}")
    assert der_store.len == 1

    # The same root loaded from PEM: synta.pem_to_der gives one DER blob per
    # PEM block, so the single-certificate file must produce one entry.
    pem_blocks = synta.pem_to_der(paths["root_pem"].read_bytes())
    assert len(pem_blocks) == 1
    pem_store = x509.TrustStore(pem_blocks)
    assert pem_store.len == 1
    print(f" from PEM : {pem_store!r} ✓")

    # Constructing a store with no anchors is allowed.
    no_anchors = x509.TrustStore([])
    assert no_anchors.len == 0
    print(f" empty store : {no_anchors!r}")
# ── Section 2: Basic server verification ──────────────────────────────────────
def demo_server_verify(paths: dict[str, Path]) -> None:
    """Verify the server leaf for ``example.com`` and print the resulting chain.

    Reads ``paths["root_der"]`` and ``paths["leaf_der"]``.
    """
    section("2. verify_server_certificate — single server name")

    anchor_bytes = paths["root_der"].read_bytes()
    leaf_bytes = paths["leaf_der"].read_bytes()
    anchors = x509.TrustStore([anchor_bytes])
    name_policy = x509.VerificationPolicy(server_names=["example.com"])

    # No intermediates are supplied: the leaf is signed directly by the root.
    chain = x509.verify_server_certificate(leaf_bytes, [], anchors, name_policy)
    print(f" chain length : {len(chain)}")

    # NOTE(review): the labels below assume chain[0] is the trust anchor —
    # confirm the ordering against the verify_server_certificate docs.
    for index, der in enumerate(chain):
        parsed = synta.Certificate.from_der(der)
        if index == 0:
            role = "trust anchor"
        else:
            role = "leaf"
        print(f" chain[{index}] ({role}) : {parsed.subject}")
        print(f" not_before={parsed.not_before} not_after={parsed.not_after}")
    print(" verify_server_certificate: ✓")
# ── Section 3: VerificationPolicy variants ────────────────────────────────────
def demo_policy_variants(paths: dict[str, Path]) -> None:
    """Verify the server leaf under several ``VerificationPolicy`` settings.

    Covers multi-name any/all matching, a fixed ``validation_time``,
    ``max_chain_depth`` and ``profile="rfc5280"``. Reads
    ``paths["root_der"]`` and ``paths["leaf_der"]``.
    """
    section("3. VerificationPolicy — multiple variants")

    anchor_bytes = paths["root_der"].read_bytes()
    leaf_bytes = paths["leaf_der"].read_bytes()
    anchors = x509.TrustStore([anchor_bytes])

    def verify_with(**policy_kwargs):
        # Build a policy from the keyword arguments and verify the leaf with
        # it; returns (policy, chain).
        policy = x509.VerificationPolicy(**policy_kwargs)
        verified = x509.verify_server_certificate(leaf_bytes, [], anchors, policy)
        return policy, verified

    # any-match: one matching name is enough. The leaf's SAN lists both
    # example.com and www.example.com, so either would satisfy it.
    _, chain = verify_with(
        server_names=["example.com", "www.example.com"],
        name_match="any",
    )
    print(f" any-match ['example.com', 'www.example.com'] : ✓ (chain {len(chain)})")

    # all-match: every listed name must be covered; both are in the SAN.
    _, chain = verify_with(
        server_names=["example.com", "www.example.com"],
        name_match="all",
    )
    print(f" all-match ['example.com', 'www.example.com'] : ✓ (chain {len(chain)})")

    # Fixed validation time: the current second, which lies inside the
    # validity window of a certificate that was just issued.
    now = int(time.time())
    verify_with(server_names=["example.com"], validation_time=now)
    print(f" fixed validation_time={now} : ✓")

    # max_chain_depth: root→leaf has no intermediates, so a limit of 1 passes.
    verify_with(server_names=["example.com"], max_chain_depth=1)
    print(" max_chain_depth=1 : ✓")

    # RFC 5280 profile — described by the original example as relaxing the
    # CABF rules (EKU optional, NC must be critical).
    rfc_policy, _ = verify_with(server_names=["example.com"], profile="rfc5280")
    print(" profile='rfc5280' : ✓")
    print(f" repr : {rfc_policy!r}")
# ── Section 4: Client certificate verification ────────────────────────────────
def demo_client_verify(paths: dict[str, Path]) -> None:
    """Verify the client leaf against the root and print the resulting chain.

    Reads ``paths["root_der"]`` and ``paths["client_der"]``.
    """
    section("4. verify_client_certificate")

    anchor_bytes = paths["root_der"].read_bytes()
    client_bytes = paths["client_der"].read_bytes()
    anchors = x509.TrustStore([anchor_bytes])

    # No policy argument is passed here; the original example notes that
    # server_names in a policy is ignored for client verification.
    chain = x509.verify_client_certificate(client_bytes, [], anchors)
    print(f" chain length : {len(chain)}")

    # NOTE(review): the labels assume chain[0] is the trust anchor — confirm
    # the ordering against the verify_client_certificate docs.
    for index, der in enumerate(chain):
        parsed = synta.Certificate.from_der(der)
        if index == 0:
            role = "trust anchor"
        else:
            role = "leaf"
        print(f" chain[{index}] ({role}) : {parsed.subject}")
    print(" verify_client_certificate: ✓")
# ── Section 5: CRL revocation checking ───────────────────────────────────────
def generate_crl(tmpdir: Path, paths: dict[str, Path]) -> Path | None:
"""Generate a CRL signed by root CA (empty — no revocations yet)."""
openssl = shutil.which("openssl")
if openssl is None:
return None
crl_pem = tmpdir / "root.crl"
crl_der = tmpdir / "root.crl.der"
# openssl ca requires a database; use a minimal config.
ca_conf = tmpdir / "ca.cnf"
serial_file = tmpdir / "serial"
index_file = tmpdir / "index.txt"
crlnumber_file = tmpdir / "crlnumber"
index_file.write_text("")
serial_file.write_text("01\n")
crlnumber_file.write_text("01\n") # required when crl_extensions includes cRLNumber
ca_conf.write_text(
"[ca]\ndefault_ca = CA_default\n"
"[CA_default]\n"
f"database = {index_file}\n"
f"serial = {serial_file}\n"
f"certificate = {paths['root_pem']}\n"
f"private_key = {tmpdir}/root.key\n"
"default_md = sha256\n"
"default_crl_days = 30\n"
"crl_extensions = crl_ext\n" # activate CRL extensions section
"[crl_ext]\n"
"cRLNumber = auto\n" # required by WebPKI profile; auto-increments
)
result = subprocess.run(
[openssl, "ca", "-gencrl", "-config", str(ca_conf),
"-out", str(crl_pem)],
capture_output=True,
)
if result.returncode != 0:
return None
subprocess.run(
[openssl, "crl", "-in", str(crl_pem), "-out", str(crl_der), "-outform", "DER"],
capture_output=True, check=True,
)
return crl_der
def demo_crl_revocation(paths: dict[str, Path], tmpdir: Path) -> None:
    """Verify the server leaf with an empty ``CrlStore`` and with a real CRL.

    The second half is skipped with a message when ``generate_crl`` cannot
    produce a CRL. Reads ``paths["root_der"]`` and ``paths["leaf_der"]``;
    the CRL is generated inside *tmpdir*.
    """
    section("5. CrlStore — CRL-based revocation checking")

    anchor_bytes = paths["root_der"].read_bytes()
    leaf_bytes = paths["leaf_der"].read_bytes()
    anchors = x509.TrustStore([anchor_bytes])
    name_policy = x509.VerificationPolicy(server_names=["example.com"])

    # Empty CrlStore: no CRL matches the leaf issuer → soft-fail → passes.
    no_crls = x509.CrlStore([])
    chain = x509.verify_server_certificate(
        leaf_bytes, [], anchors, name_policy, crls=no_crls
    )
    print(f" empty CrlStore (soft-fail): ✓ chain length={len(chain)}")
    print(f" repr: {no_crls!r} len={no_crls.len}")

    # Guard clause: without a generated CRL there is nothing more to show.
    if generate_crl(tmpdir, paths) is None:
        print(" (CRL generation skipped — openssl ca not available)")
        return

    # The store is built from the PEM CRL that generate_crl left in tmpdir,
    # converted with synta.pem_to_der as one would for a PEM bundle.
    pem_crl = (tmpdir / "root.crl").read_bytes()
    root_crls = x509.CrlStore(synta.pem_to_der(pem_crl))
    chain = x509.verify_server_certificate(
        leaf_bytes, [], anchors, name_policy, crls=root_crls
    )
    print(f" CRL present (leaf not revoked): ✓ chain length={len(chain)}")
    print(f" repr: {root_crls!r} len={root_crls.len}")
# ── Section 6: OCSP revocation checking ──────────────────────────────────────
def demo_ocsp_revocation(paths: dict[str, Path]) -> None:
    """Verify the server leaf with empty, malformed and combined OCSP inputs.

    Reads ``paths["root_der"]`` and ``paths["leaf_der"]``.
    """
    section("6. OcspStore — OCSP-based revocation checking")

    anchor_bytes = paths["root_der"].read_bytes()
    leaf_bytes = paths["leaf_der"].read_bytes()
    anchors = x509.TrustStore([anchor_bytes])
    name_policy = x509.VerificationPolicy(server_names=["example.com"])

    # Empty OcspStore: no OCSP response matches the leaf → soft-fail → passes.
    no_ocsp = x509.OcspStore([])
    chain = x509.verify_server_certificate(
        leaf_bytes, [], anchors, name_policy, ocsp=no_ocsp
    )
    print(f" empty OcspStore (soft-fail): ✓ chain length={len(chain)}")
    print(f" repr: {no_ocsp!r} len={no_ocsp.len}")

    # A store holding bytes that are not an OCSP response: the entry is
    # skipped (soft-fail) and verification still passes.
    garbage_ocsp = x509.OcspStore([b"not a valid OCSP response"])
    chain = x509.verify_server_certificate(
        leaf_bytes, [], anchors, name_policy, ocsp=garbage_ocsp
    )
    print(f" malformed OCSP response (soft-fail): ✓ chain length={len(chain)}")

    # Both revocation sources supplied at once, each of them empty.
    no_crls = x509.CrlStore([])
    chain = x509.verify_server_certificate(
        leaf_bytes, [], anchors, name_policy, crls=no_crls, ocsp=no_ocsp
    )
    print(f" CRL + OCSP together (both soft-fail): ✓ chain length={len(chain)}")
# ── Section 7: Error cases ────────────────────────────────────────────────────
def demo_errors(paths: dict[str, Path]) -> None:
    """Show three verifications that must raise ``X509VerificationError``.

    Cases: an empty trust store, a server name absent from the leaf's SAN,
    and a validation time after the leaf's notAfter. Reads
    ``paths["root_der"]`` and ``paths["leaf_der"]``.

    Raises:
        AssertionError: If any of the three verifications succeeds.
    """
    section("7. X509VerificationError — expected failure cases")

    anchor_bytes = paths["root_der"].read_bytes()
    leaf_bytes = paths["leaf_der"].read_bytes()
    anchors = x509.TrustStore([anchor_bytes])

    def expect_rejection(label, trust, policy) -> None:
        # Run one verification that has to fail and print the error text.
        try:
            x509.verify_server_certificate(leaf_bytes, [], trust, policy)
        except x509.X509VerificationError as exc:
            print(f" {label} → X509VerificationError: {exc}")
        else:
            raise AssertionError("expected X509VerificationError")

    # Untrusted issuer: an empty trust store has no root to anchor the chain.
    expect_rejection(
        "empty trust store",
        x509.TrustStore([]),
        x509.VerificationPolicy(server_names=["example.com"]),
    )

    # Name mismatch: the leaf's SAN has example.com, the policy wants other.com.
    expect_rejection(
        "name mismatch",
        anchors,
        x509.VerificationPolicy(server_names=["other.com"]),
    )

    # Expired: 9_999_999_999 s after the epoch is in the year 2286, far past
    # the notAfter of a certificate issued for 365 days.
    far_future = 9_999_999_999
    expect_rejection(
        "expired cert",
        anchors,
        x509.VerificationPolicy(
            server_names=["example.com"],
            validation_time=far_future,
        ),
    )
# ── Entry point ───────────────────────────────────────────────────────────────
def main() -> None:
    """Generate a test PKI in a temporary directory and run every demo.

    Prints a notice and returns early when ``openssl`` is not on PATH or
    when certificate generation yields nothing. The temporary directory is
    removed on every exit path once it has been created.
    """
    banner = "=" * 64
    print(banner)
    print("Example: X.509 certificate chain verification (synta.x509)")
    print(banner)

    if shutil.which("openssl") is None:
        print("\nSkipped: openssl not found on PATH.")
        print("Install openssl to run this example.")
        return

    workdir = Path(tempfile.mkdtemp(prefix="synta_x509_example_"))
    try:
        print(f"\nGenerating test PKI in {workdir} …")
        paths = generate_certs(workdir)
        if not paths:
            print("Certificate generation failed; skipping.")
            return

        # Sections 1-4 only need the generated paths.
        for demo in (
            demo_trust_store,
            demo_server_verify,
            demo_policy_variants,
            demo_client_verify,
        ):
            demo(paths)
        # Section 5 also needs the working directory to generate a CRL in.
        demo_crl_revocation(paths, workdir)
        demo_ocsp_revocation(paths)
        demo_errors(paths)
    finally:
        # Best-effort cleanup; errors while deleting are ignored.
        shutil.rmtree(workdir, ignore_errors=True)

    print("\nAll x509 verification examples completed.")
# Run the walkthrough only when this file is executed as a script.
if __name__ == "__main__":
    main()
```