import shutil
import subprocess
import sys
import tempfile
import time
from pathlib import Path
import synta
import synta.x509 as x509
def section(title: str) -> None:
    """Print ``title`` framed by a 64-character horizontal rule.

    A blank line is emitted first so consecutive sections are visually
    separated.
    """
    rule = "─" * 64
    # Leading empty string + sep="\n" yields the blank line before the rule.
    print("", rule, title, rule, sep="\n")
def run(cmd: list[str], **kwargs) -> subprocess.CompletedProcess:
    """Run ``cmd`` with :func:`subprocess.run` and abort the program on failure.

    Args:
        cmd: Argument vector handed to ``subprocess.run``.
        **kwargs: Forwarded unchanged to ``subprocess.run``.

    Returns:
        The ``CompletedProcess`` when the command exits with status 0.

    Raises:
        SystemExit: When the command exits non-zero. The message contains the
            return code and the captured stderr (empty if none was captured).
    """
    result = subprocess.run(cmd, **kwargs)
    if result.returncode == 0:
        return result

    # ``stderr`` is None unless the caller requested capture; normalise it so
    # the failure is always reported as SystemExit rather than AttributeError.
    stderr = result.stderr or ""
    if isinstance(stderr, bytes):
        stderr = stderr.decode(errors="replace")
    sys.exit(f"Command failed (rc={result.returncode}): {stderr.strip()!r}")
def _convert_to_der(openssl: str, pem: Path, der: Path) -> None:
    """Re-encode the PEM certificate at ``pem`` as DER at ``der``."""
    run([openssl, "x509", "-in", str(pem),
         "-out", str(der), "-outform", "DER"],
        capture_output=True)


def _issue_cert(
    openssl: str,
    tmpdir: Path,
    stem: str,
    subject: str,
    ext_section: str,
    san: str,
    eku: str,
    root_pem: Path,
    root_key: Path,
) -> Path:
    """Create a key and CSR, sign it with the root, and return the DER cert path.

    All artefacts are written to ``tmpdir`` as ``<stem>.key``, ``<stem>.csr``,
    ``<stem>.pem``, ``<stem>.der`` and ``<stem>_ext.cnf``.
    """
    key = tmpdir / f"{stem}.key"
    csr = tmpdir / f"{stem}.csr"
    pem = tmpdir / f"{stem}.pem"
    der = tmpdir / f"{stem}.der"
    ext = tmpdir / f"{stem}_ext.cnf"

    # Extension file consumed by ``openssl x509 -extfile``; only the SAN and
    # EKU differ between the certificates issued here.
    ext.write_text(
        f"[{ext_section}]\n"
        f"subjectAltName={san}\n"
        f"extendedKeyUsage={eku}\n"
        "basicConstraints=CA:FALSE\n"
        "keyUsage=digitalSignature\n"
    )

    run([openssl, "req", "-newkey", "rsa:2048",
         "-keyout", str(key), "-out", str(csr),
         "-nodes", "-subj", subject],
        capture_output=True)
    run([openssl, "x509", "-req", "-in", str(csr),
         "-CA", str(root_pem), "-CAkey", str(root_key), "-CAcreateserial",
         "-out", str(pem), "-days", "365",
         "-extfile", str(ext), "-extensions", ext_section],
        capture_output=True)
    _convert_to_der(openssl, pem, der)
    return der


def generate_certs(tmpdir: Path) -> dict[str, Path]:
    """Generate a throwaway root CA plus one server and one client certificate.

    Everything is produced by shelling out to ``openssl`` and written into
    ``tmpdir``.

    Args:
        tmpdir: Existing directory that receives keys, CSRs and certificates.

    Returns:
        A mapping with the keys ``root_pem``, ``root_der``, ``leaf_der`` and
        ``client_der`` pointing at the generated files, or an empty dict when
        no ``openssl`` executable is found on PATH.

    Note:
        Each openssl invocation goes through ``run``, so a failing command
        terminates the program rather than returning.
    """
    openssl = shutil.which("openssl")
    if openssl is None:
        return {}

    root_key = tmpdir / "root.key"
    root_pem = tmpdir / "root.pem"
    root_der = tmpdir / "root.der"

    # Self-signed root CA (10-year validity, unencrypted key).
    run([openssl, "req", "-x509", "-newkey", "rsa:2048",
         "-keyout", str(root_key), "-out", str(root_pem),
         "-days", "3650", "-nodes", "-subj", "/CN=Test Root CA"],
        capture_output=True)
    _convert_to_der(openssl, root_pem, root_der)

    # Server certificate: DNS SANs + serverAuth.
    leaf_der = _issue_cert(
        openssl, tmpdir, "leaf", "/CN=example.com", "v3_leaf",
        "DNS:example.com,DNS:www.example.com", "serverAuth",
        root_pem, root_key,
    )
    # Client certificate: e-mail SAN + clientAuth.
    client_der = _issue_cert(
        openssl, tmpdir, "client", "/CN=Test Client", "v3_client",
        "email:client@example.com", "clientAuth",
        root_pem, root_key,
    )

    return {
        "root_pem": root_pem,
        "root_der": root_der,
        "leaf_der": leaf_der,
        "client_der": client_der,
    }
def demo_trust_store(paths: dict[str, Path]) -> None:
    """Build trust stores from DER input, PEM input and no input, printing each.

    Args:
        paths: Mapping that must contain ``root_der`` and ``root_pem`` entries
            pointing at the root certificate in each encoding.
    """
    section("1. TrustStore — construction and inspection")

    # Store seeded directly with the root certificate's DER bytes.
    der_store = x509.TrustStore([paths["root_der"].read_bytes()])
    print(f" store.len : {der_store.len}")
    print(f" repr(store) : {der_store!r}")
    assert der_store.len == 1

    # Same certificate, but starting from PEM and converting first.
    pem_blocks = synta.pem_to_der(paths["root_pem"].read_bytes())
    assert len(pem_blocks) == 1
    pem_store = x509.TrustStore(pem_blocks)
    assert pem_store.len == 1
    print(f" from PEM : {pem_store!r} ✓")

    # A store with no anchors is constructible and reports length 0.
    no_anchors = x509.TrustStore([])
    assert no_anchors.len == 0
    print(f" empty store : {no_anchors!r}")
def demo_server_verify(paths: dict[str, Path]) -> None:
    """Verify the server certificate for ``example.com`` and print the chain.

    Args:
        paths: Mapping that must contain ``root_der`` and ``leaf_der`` entries.
    """
    section("2. verify_server_certificate — single server name")

    anchor_bytes = paths["root_der"].read_bytes()
    server_bytes = paths["leaf_der"].read_bytes()
    anchors = x509.TrustStore([anchor_bytes])
    name_policy = x509.VerificationPolicy(server_names=["example.com"])

    # No intermediates are supplied (empty list as the second argument).
    verified = x509.verify_server_certificate(server_bytes, [], anchors, name_policy)
    print(f" chain length : {len(verified)}")

    for position, der in enumerate(verified):
        parsed = synta.Certificate.from_der(der)
        # NOTE(review): the labels assume index 0 is the trust anchor and every
        # other entry is the leaf — confirm against the ordering that
        # verify_server_certificate actually returns.
        if position == 0:
            label = "trust anchor"
        else:
            label = "leaf"
        print(f" chain[{position}] ({label}) : {parsed.subject}")
        print(f" not_before={parsed.not_before} not_after={parsed.not_after}")

    print(" verify_server_certificate: ✓")
def demo_policy_variants(paths: dict[str, Path]) -> None:
    """Verify the server certificate under several ``VerificationPolicy`` setups.

    Exercised in order: ``name_match`` "any" and "all" with two server names,
    an explicit ``validation_time``, ``max_chain_depth=1`` and
    ``profile="rfc5280"``. Each successful verification prints a tick line.

    Args:
        paths: Mapping that must contain ``root_der`` and ``leaf_der`` entries.
    """
    section("3. VerificationPolicy — multiple variants")

    anchor_bytes = paths["root_der"].read_bytes()
    server_bytes = paths["leaf_der"].read_bytes()
    anchors = x509.TrustStore([anchor_bytes])

    def verify_with(policy):
        # Every variant uses the same certificate, anchors and no intermediates.
        return x509.verify_server_certificate(server_bytes, [], anchors, policy)

    # Two names, matched under each name_match mode in turn.
    for mode in ("any", "all"):
        both_names = x509.VerificationPolicy(
            server_names=["example.com", "www.example.com"],
            name_match=mode,
        )
        verified = verify_with(both_names)
        print(f" {mode}-match ['example.com', 'www.example.com'] : ✓ (chain {len(verified)})")

    # Pin the validation time to the current integer Unix timestamp.
    now = int(time.time())
    verify_with(x509.VerificationPolicy(server_names=["example.com"], validation_time=now))
    print(f" fixed validation_time={now} : ✓")

    verify_with(x509.VerificationPolicy(server_names=["example.com"], max_chain_depth=1))
    print(" max_chain_depth=1 : ✓")

    rfc_policy = x509.VerificationPolicy(server_names=["example.com"], profile="rfc5280")
    verify_with(rfc_policy)
    print(" profile='rfc5280' : ✓")
    print(f" repr : {rfc_policy!r}")
def demo_client_verify(paths: dict[str, Path]) -> None:
    """Verify the client certificate against the root and print the chain.

    Args:
        paths: Mapping that must contain ``root_der`` and ``client_der`` entries.
    """
    section("4. verify_client_certificate")

    anchor_bytes = paths["root_der"].read_bytes()
    client_bytes = paths["client_der"].read_bytes()
    anchors = x509.TrustStore([anchor_bytes])

    # No intermediates and no explicit policy are passed here.
    verified = x509.verify_client_certificate(client_bytes, [], anchors)
    print(f" chain length : {len(verified)}")

    for position, der in enumerate(verified):
        parsed = synta.Certificate.from_der(der)
        # NOTE(review): the labels assume index 0 is the trust anchor — confirm
        # against the ordering that verify_client_certificate returns.
        if position == 0:
            label = "trust anchor"
        else:
            label = "leaf"
        print(f" chain[{position}] ({label}) : {parsed.subject}")

    print(" verify_client_certificate: ✓")
def generate_crl(tmpdir: Path, paths: dict[str, Path]) -> Path | None:
openssl = shutil.which("openssl")
if openssl is None:
return None
crl_pem = tmpdir / "root.crl"
crl_der = tmpdir / "root.crl.der"
ca_conf = tmpdir / "ca.cnf"
serial_file = tmpdir / "serial"
index_file = tmpdir / "index.txt"
crlnumber_file = tmpdir / "crlnumber"
index_file.write_text("")
serial_file.write_text("01\n")
crlnumber_file.write_text("01\n") ca_conf.write_text(
"[ca]\ndefault_ca = CA_default\n"
"[CA_default]\n"
f"database = {index_file}\n"
f"serial = {serial_file}\n"
f"certificate = {paths['root_pem']}\n"
f"private_key = {tmpdir}/root.key\n"
"default_md = sha256\n"
"default_crl_days = 30\n"
"crl_extensions = crl_ext\n" "[crl_ext]\n"
"cRLNumber = auto\n" )
result = subprocess.run(
[openssl, "ca", "-gencrl", "-config", str(ca_conf),
"-out", str(crl_pem)],
capture_output=True,
)
if result.returncode != 0:
return None
subprocess.run(
[openssl, "crl", "-in", str(crl_pem), "-out", str(crl_der), "-outform", "DER"],
capture_output=True, check=True,
)
return crl_der
def demo_crl_revocation(paths: dict[str, Path], tmpdir: Path) -> None:
    """Verify the server certificate while passing a ``CrlStore``.

    First with an empty store, then — if ``generate_crl`` produces one — with
    a CRL issued by the test root.

    Args:
        paths: Mapping that must contain ``root_der`` and ``leaf_der`` entries.
        tmpdir: Working directory handed to ``generate_crl``; the PEM CRL is
            read back from ``tmpdir/root.crl``.
    """
    section("5. CrlStore — CRL-based revocation checking")

    anchor_bytes = paths["root_der"].read_bytes()
    server_bytes = paths["leaf_der"].read_bytes()
    anchors = x509.TrustStore([anchor_bytes])
    name_policy = x509.VerificationPolicy(server_names=["example.com"])

    # Case 1: a CrlStore with no CRLs in it.
    no_crls = x509.CrlStore([])
    verified = x509.verify_server_certificate(
        server_bytes, [], anchors, name_policy, crls=no_crls
    )
    print(f" empty CrlStore (soft-fail): ✓ chain length={len(verified)}")
    print(f" repr: {no_crls!r} len={no_crls.len}")

    # Case 2: a real CRL, when one could be generated.
    if generate_crl(tmpdir, paths) is None:
        print(" (CRL generation skipped — openssl ca not available)")
        return

    # The returned DER path is only used as a success flag; the store is
    # populated from the PEM file converted through synta.pem_to_der.
    pem_crl = (tmpdir / "root.crl").read_bytes()
    loaded_crls = x509.CrlStore(synta.pem_to_der(pem_crl))
    verified = x509.verify_server_certificate(
        server_bytes, [], anchors, name_policy, crls=loaded_crls
    )
    print(f" CRL present (leaf not revoked): ✓ chain length={len(verified)}")
    print(f" repr: {loaded_crls!r} len={loaded_crls.len}")
def demo_ocsp_revocation(paths: dict[str, Path]) -> None:
    """Verify the server certificate while passing an ``OcspStore``.

    Three calls are made: with an empty OCSP store, with a store holding bytes
    that are not an OCSP response, and with empty CRL and OCSP stores together.
    Each is expected to return a chain, whose length is printed.

    Args:
        paths: Mapping that must contain ``root_der`` and ``leaf_der`` entries.
    """
    section("6. OcspStore — OCSP-based revocation checking")

    anchor_bytes = paths["root_der"].read_bytes()
    server_bytes = paths["leaf_der"].read_bytes()
    anchors = x509.TrustStore([anchor_bytes])
    name_policy = x509.VerificationPolicy(server_names=["example.com"])

    # Empty OCSP store.
    no_ocsp = x509.OcspStore([])
    verified = x509.verify_server_certificate(
        server_bytes, [], anchors, name_policy, ocsp=no_ocsp
    )
    print(f" empty OcspStore (soft-fail): ✓ chain length={len(verified)}")
    print(f" repr: {no_ocsp!r} len={no_ocsp.len}")

    # Store containing bytes that do not form an OCSP response.
    junk_ocsp = x509.OcspStore([b"not a valid OCSP response"])
    verified = x509.verify_server_certificate(
        server_bytes, [], anchors, name_policy, ocsp=junk_ocsp
    )
    print(f" malformed OCSP response (soft-fail): ✓ chain length={len(verified)}")

    # Both revocation sources supplied at once, both empty.
    no_crls = x509.CrlStore([])
    verified = x509.verify_server_certificate(
        server_bytes, [], anchors, name_policy, crls=no_crls, ocsp=no_ocsp
    )
    print(f" CRL + OCSP together (both soft-fail): ✓ chain length={len(verified)}")
def demo_errors(paths: dict[str, Path]) -> None:
    """Show three verifications that must raise ``X509VerificationError``.

    The cases are an empty trust store, a server name the certificate is not
    checked against successfully (``other.com``), and a validation time far in
    the future. If a call unexpectedly succeeds, ``AssertionError`` is raised.

    Args:
        paths: Mapping that must contain ``root_der`` and ``leaf_der`` entries.
    """
    section("7. X509VerificationError — expected failure cases")

    anchor_bytes = paths["root_der"].read_bytes()
    server_bytes = paths["leaf_der"].read_bytes()
    anchors = x509.TrustStore([anchor_bytes])

    # Case 1: nothing to anchor the chain to.
    try:
        x509.verify_server_certificate(
            server_bytes, [], x509.TrustStore([]),
            x509.VerificationPolicy(server_names=["example.com"]),
        )
    except x509.X509VerificationError as err:
        print(f" empty trust store → X509VerificationError: {err}")
    else:
        raise AssertionError("expected X509VerificationError")

    # Case 2: requested name differs from the one used elsewhere in the demo.
    try:
        x509.verify_server_certificate(
            server_bytes, [], anchors,
            x509.VerificationPolicy(server_names=["other.com"]),
        )
    except x509.X509VerificationError as err:
        print(f" name mismatch → X509VerificationError: {err}")
    else:
        raise AssertionError("expected X509VerificationError")

    # Case 3: validate at a timestamp far in the future.
    far_future = 9_999_999_999
    try:
        x509.verify_server_certificate(
            server_bytes, [], anchors,
            x509.VerificationPolicy(
                server_names=["example.com"],
                validation_time=far_future,
            ),
        )
    except x509.X509VerificationError as err:
        print(f" expired cert → X509VerificationError: {err}")
    else:
        raise AssertionError("expected X509VerificationError")
def main() -> None:
    """Run every demo against a freshly generated PKI in a temporary directory.

    Prints a banner, returns early with a notice when ``openssl`` is not on
    PATH, and otherwise generates the certificates, runs the demos in order
    and removes the temporary directory afterwards (also on failure).
    """
    banner = "=" * 64
    print(banner)
    print("Example: X.509 certificate chain verification (synta.x509)")
    print(banner)

    if shutil.which("openssl") is None:
        print("\nSkipped: openssl not found on PATH.")
        print("Install openssl to run this example.")
        return

    workdir = Path(tempfile.mkdtemp(prefix="synta_x509_example_"))
    try:
        print(f"\nGenerating test PKI in {workdir} …")
        pki = generate_certs(workdir)
        if not pki:
            print("Certificate generation failed; skipping.")
            return

        for demo in (
            demo_trust_store,
            demo_server_verify,
            demo_policy_variants,
            demo_client_verify,
        ):
            demo(pki)
        # The CRL demo also needs the working directory to generate a CRL.
        demo_crl_revocation(pki, workdir)
        for demo in (demo_ocsp_revocation, demo_errors):
            demo(pki)
    finally:
        # Best-effort cleanup; removal errors are ignored.
        shutil.rmtree(workdir, ignore_errors=True)

    print("\nAll x509 verification examples completed.")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()