synta 0.2.1

ASN.1 parser, decoder, and encoder library with DER/BER support and C FFI
Documentation
#!/usr/bin/env python3
"""
Example 3: Extension access, SAN parsing, and extension DER builders.

Demonstrates: Certificate.subject_alt_names, Certificate.extensions_der,
Certificate.get_extension_value_der, synta.general_name tag constants,
synta.parse_general_names, synta.oids.SUBJECT_ALT_NAME,
synta.oids.BASIC_CONSTRAINTS, synta.oids.SUBJECT_KEY_IDENTIFIER,
Decoder.decode_sequence, Decoder.peek_tag, Decoder.decode_implicit_tag,
Decoder.decode_explicit_tag, Decoder.remaining_bytes, Decoder.decode_raw_tlv,
Decoder.is_empty;
synta.ext.SubjectAlternativeNameBuilder (alias SAN),
synta.ext.ExtendedKeyUsageBuilder (alias EKU),
synta.ext.AuthorityInformationAccessBuilder (alias AIA),
synta.ext.basic_constraints, synta.ext.key_usage,
synta.ext.subject_key_identifier, synta.ext.authority_key_identifier,
synta.ext.KU_* bitmask constants, synta.ext.KEYID_* method constants;
synta.digest, synta.name_der_equal, synta.encode_subject_alt_names,
synta.encode_extended_key_usage.
"""

import base64
import ipaddress
import socket
import synta
import synta.general_name as gn
import synta.oids as oids

# ECDSA P-256 certificate with SAN (dNSName x2, iPAddress, rfc822Name)
# SubjectAltName: example.com, www.example.com, 192.168.1.1, admin@example.com
# Stored as base64 text; the demos in this file pass it through
# base64.b64decode() and hand the result to synta.Certificate.from_der().
_SAN_CERT_B64 = (
    "MIIB6DCCAY2gAwIBAgIUEBUQAc/E3r/3vioki6IU6lyfL5IwCgYIKoZIzj0EAwIw"
    "OjEUMBIGA1UEAwwLZXhhbXBsZS5jb20xFTATBgNVBAoMDEV4YW1wbGUgQ29ycDEL"
    "MAkGA1UEBhMCVVMwHhcNMjUwMTAxMDAwMDAwWhcNMjcwMTAxMDAwMDAwWjA6MRQw"
    "EgYDVQQDDAtleGFtcGxlLmNvbTEVMBMGA1UECgwMRXhhbXBsZSBDb3JwMQswCQYD"
    "VQQGEwJVUzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABLIerh5Q5OVsw1o/hfHd"
    "Hgi3mjz6WDirif1I+JAuF3oUPGa+iyMnngLjCgvIghAxvWNrcqp+/eewofR58P7X"
    "g6ujcTBvMEAGA1UdEQQ5MDeCC2V4YW1wbGUuY29tgg93d3cuZXhhbXBsZS5jb22H"
    "BMCoAQGBEWFkbWluQGV4YW1wbGUuY29tMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYE"
    "FI8ICKdxcww2aHlOya4RvcbKlSVXMAoGCCqGSM49BAMCA0kAMEYCIQDrZBGKzs0V"
    "cEQ3uYzh9xNlKOGqZOjxct32A+dWJMlEdAIhAM0zkny+EHOqpQXKYbsOuxJedws6"
    "6d3nEZ7+v/kQ8hJP"
)

# RSA cert with BasicConstraints CA:TRUE and SubjectKeyIdentifier
# Stored as base64 text; the demos in this file pass it through
# base64.b64decode() and hand the result to synta.Certificate.from_der().
# It is also used as the "certificate without a SAN extension" fixture.
_RSA_CERT_B64 = (
    "MIIDiTCCAnGgAwIBAgIUXhaeS3ad5SJp60GRJU73OQaO0xkwDQYJKoZIhvcNAQEL"
    "BQAwVDELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRYwFAYDVQQHDA1TYW4gRnJh"
    "bmNpc2NvMQ0wCwYDVQQKDARUZXN0MREwDwYDVQQDDAh0ZXN0LmNvbTAeFw0yNjAy"
    "MjMxMDU0MzRaFw0yNzAyMjMxMDU0MzRaMFQxCzAJBgNVBAYTAlVTMQswCQYDVQQI"
    "DAJDQTEWMBQGA1UEBwwNU2FuIEZyYW5jaXNjbzENMAsGA1UECgwEVGVzdDERMA8G"
    "A1UEAwwIdGVzdC5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDP"
    "9oir0NwIXFZ6gOUo//akzjNjvUhA/V1KSUY0L/iXOGWRHFcdcf4gVhoCgR+DgCV6"
    "bJKKrYPIvfEwmd8DdpPj1WU4Dztb4NNLgxquFZym2Swe0xDLQdtWoIQYerF/ER8D"
    "9Pk0qQ5QVaCO+KB3UKyXiJwcTc/LJnDqEX24mrf0ZH/HqB2GsUE3aI9aW5Lgwm9A"
    "7+gV7FrumaT7fQqpfNucWwlXU2SIRm//JKUrT0MGrh99vmmkGRZK+c9wLfIK+pny"
    "UQxSD1E395bpQTqTWIfcMWti6af3ix3GsWeoXwY+GDfZlZ1w22GjLmSgg1RMhhKZ"
    "9l+QFnI/GtmiXX2pCRZfAgMBAAGjUzBRMB0GA1UdDgQWBBRCjPvAUpiRe0Zs6DTL"
    "K+KLoTZfezAfBgNVHSMEGDAWgBRCjPvAUpiRe0Zs6DTLK+KLoTZfezAPBgNVHRMB"
    "Af8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQC+xBpTScGlcZGYvwtWLZuF2qRu"
    "o9xhYzHXgIDJglXZ8i70O+ut2WRyI9RJOSMsa7BI2qmc87Ki9ZMCO2QIMQCQo7cB"
    "kZtQvK8iGGHhSwepMORekzdbfUUs7N4YEM3Xako1+4RzL+T1Z3qzQ6nrnQ+gYyQo"
    "GiIFKbYZONu2OlqXGQe5LPwbsPU52GUbttkxodaHgCdP7yKO/l3sDifXpaFEXJhY"
    "5RZqgSG77jymh8YKcY0X9J53OtP1So/IOS1Za137k+eYci6iCB4w511qEhSRZdCy"
    "+3NEoGrBearMcJHttGbMplR6TU6fDFRIUAdelPdWfpfmtl1iElRDwNdQkBBR"
)


def section(title):
    """Print *title* framed above and below by a 60-column horizontal rule.

    A leading blank line separates the banner from whatever was printed
    before it.

    Args:
        title: Heading text shown between the two rules.
    """
    # The rule glyph must be non-empty: repeating "" yields "", which made
    # the separators print as blank lines instead of a visible rule.
    rule = "─" * 60
    print(f"\n{rule}\n{title}\n{rule}")


def demo_extensions_der():
    """Walk the raw ``extensions_der`` blob of the SAN certificate by hand.

    Prints the size of the blob, then one line per extension showing its
    OID, its criticality flag, and the byte length of its extnValue.
    """
    section("extensions_der — raw SEQUENCE OF Extension bytes")
    certificate = synta.Certificate.from_der(base64.b64decode(_SAN_CERT_B64))
    raw_extensions = certificate.extensions_der
    assert raw_extensions is not None
    print(f"  extensions_der: <{len(raw_extensions)} bytes>")

    # Outer layer is the SEQUENCE OF; every element inside it is an
    # Extension: SEQUENCE { OID, [BOOLEAN], OCTET STRING }.
    decoder = synta.Decoder(raw_extensions, synta.Encoding.DER)
    extensions = decoder.decode_sequence()
    index = 0
    while not extensions.is_empty():
        extension = extensions.decode_sequence()
        oid = extension.decode_oid()
        # The critical BOOLEAN is optional, so peek first and only consume
        # it when the next element really is Universal tag 1.
        number, klass, _ = extension.peek_tag()
        has_critical = number == 1 and klass == "Universal"
        critical = extension.decode_boolean().value() if has_critical else False
        extn_value = extension.decode_octet_string()  # extnValue OCTET STRING
        value_len = len(extn_value.to_bytes())
        print(f"  [{index}] OID={oid}  critical={critical}  value={value_len} bytes")
        index += 1


def demo_get_extension_value_der():
    """Look up individual extension values on the RSA certificate by OID.

    Prints the hex of the BasicConstraints and SubjectKeyIdentifier values,
    then asserts that a lookup for SubjectAltName yields ``None``.
    """
    section("get_extension_value_der — look up by OID")
    certificate = synta.Certificate.from_der(base64.b64decode(_RSA_CERT_B64))

    # Both of these are looked up and dumped the same way.
    present = (
        ("BasicConstraints", oids.BASIC_CONSTRAINTS),
        ("SubjectKeyIdentifier", oids.SUBJECT_KEY_IDENTIFIER),
    )
    for label, oid in present:
        value = certificate.get_extension_value_der(oid)
        print(f"  {label} value hex: {value.hex()}")

    # An OID that is not among the certificate's extensions → None
    missing = certificate.get_extension_value_der(oids.SUBJECT_ALT_NAME)
    assert missing is None
    print("  SubjectAltName (absent):  None")


def demo_parse_basic_constraints():
    """Decode the BasicConstraints value of the RSA certificate by hand.

    Reads the cA BOOLEAN out of the SEQUENCE, prints it, and asserts that it
    is ``True`` and that nothing else follows it inside the SEQUENCE.
    """
    section("Parse BasicConstraints — SEQUENCE { BOOLEAN TRUE }")
    certificate = synta.Certificate.from_der(base64.b64decode(_RSA_CERT_B64))
    value_der = certificate.get_extension_value_der(oids.BASIC_CONSTRAINTS)

    decoder = synta.Decoder(value_der, synta.Encoding.DER)
    body = decoder.decode_sequence()
    ca_boolean = body.decode_boolean()
    ca_flag = ca_boolean.value()
    print(f"  cA BOOLEAN: {ca_flag}")

    assert ca_flag is True
    # The cA flag was the only element in the SEQUENCE.
    assert body.is_empty()


def demo_subject_alt_names():
    """Show the high-level ``Certificate.subject_alt_names()`` accessor.

    Prints one line per SAN entry of the SAN certificate, formatted by the
    entry's type, then asserts that the RSA certificate (which carries no
    SAN extension) yields an empty list.
    """
    section("Certificate.subject_alt_names() — high-level SAN access")
    san_cert = synta.Certificate.from_der(base64.b64decode(_SAN_CERT_B64))

    # Each entry is a typed GeneralName object (DNSName, IPAddress,
    # RFC822Name, …); pick the rendering by type and print once at the end.
    for entry in san_cert.subject_alt_names():
        if isinstance(entry, gn.DNSName):
            line = f"  dNSName:   {entry.value}"
        elif isinstance(entry, gn.IPAddress):
            line = f"  iPAddress: {ipaddress.ip_address(entry.address)}"
        elif isinstance(entry, gn.RFC822Name):
            line = f"  email:     {entry.value}"
        elif isinstance(entry, gn.DirectoryName):
            parsed_attrs = synta.parse_name_attrs(entry.name_der)
            line = f"  DirName:   {parsed_attrs}"
        elif isinstance(entry, gn.UniformResourceIdentifier):
            line = f"  URI:       {entry.value}"
        else:
            line = f"  other:     {entry!r}"
        print(line)

    # A certificate without a SAN extension gives back an empty list.
    rsa_cert = synta.Certificate.from_der(base64.b64decode(_RSA_CERT_B64))
    assert rsa_cert.subject_alt_names() == []
    print("  (cert without SAN → empty list: OK)")


def demo_parse_san():
    """Decode the SubjectAltName extension by hand using ``peek_tag`` dispatch.

    Fetches the SAN extension value of the SAN certificate, opens its outer
    SEQUENCE, and for every element peeks the tag, strips the implicit tag,
    and prints the content: iPAddress entries as a textual IP address, all
    other entries as ASCII text.
    """
    section("Parse SubjectAltName with peek_tag dispatch")
    cert = synta.Certificate.from_der(base64.b64decode(_SAN_CERT_B64))
    san_der = cert.get_extension_value_der(oids.SUBJECT_ALT_NAME)
    dec = synta.Decoder(san_der, synta.Encoding.DER)
    san_seq = dec.decode_sequence()

    # Context tag number → GeneralName choice label; unknown tags fall back
    # to "[n]" below.
    tag_names = {1: "rfc822Name", 2: "dNSName", 6: "uniformResourceIdentifier",
                 7: "iPAddress"}

    while not san_seq.is_empty():
        tag_no, tag_class, _ = san_seq.peek_tag()  # peek without consuming
        child = san_seq.decode_implicit_tag(tag_no, tag_class)
        raw = child.remaining_bytes()
        label = tag_names.get(tag_no, f"[{tag_no}]")
        if tag_no == 7:
            # iPAddress holds packed address octets (4 for IPv4, 16 for
            # IPv6).  ipaddress.ip_address accepts both lengths, whereas
            # socket.inet_ntoa only handles the 4-byte form.
            value = ipaddress.ip_address(bytes(raw))
        else:
            value = raw.decode("ascii")
        print(f"  {label}: {value}")


def demo_decode_raw_tlv():
    """Capture one extension as a raw TLV blob and parse it again.

    Takes the first element of the RSA certificate's extension list with
    ``decode_raw_tlv``, prints its size and leading tag byte, then feeds the
    captured bytes to a fresh decoder and prints the OID found inside.
    """
    section("decode_raw_tlv — capture an extension as raw TLV bytes")
    certificate = synta.Certificate.from_der(base64.b64decode(_RSA_CERT_B64))
    outer = synta.Decoder(certificate.extensions_der, synta.Encoding.DER)
    extensions = outer.decode_sequence()

    # Grab the first extension as one raw TLV blob.
    first_tlv = extensions.decode_raw_tlv()
    tlv_size = len(first_tlv)
    lead_byte = first_tlv[0]
    print(f"  First extension raw TLV: {tlv_size} bytes, starts with tag 0x{lead_byte:02x}")

    # The captured bytes are parsed again from scratch with a new decoder.
    reparser = synta.Decoder(first_tlv, synta.Encoding.DER)
    reparsed_extension = reparser.decode_sequence()
    reparsed_oid = reparsed_extension.decode_oid()
    print(f"  Re-parsed OID: {reparsed_oid}")


def demo_remaining_bytes():
    """Show that ``remaining_bytes`` reads value bytes without consuming them.

    Hand-builds a context-tagged [2] primitive TLV wrapping ``b"hello"``,
    strips the implicit tag, and asserts that two successive
    ``remaining_bytes()`` calls return the same payload while
    ``remaining()`` still reports the full payload length.
    """
    section("remaining_bytes — non-consuming read of value bytes")
    # [2] IMPLICIT IA5String "hello" (dNSName-style):
    # identifier octet 0x82 (context [2], primitive), length, then content.
    payload = b"hello"
    encoded = bytes([0x82, len(payload), *payload])

    decoder = synta.Decoder(encoded, synta.Encoding.DER)
    inner = decoder.decode_implicit_tag(2, "Context")

    first_read = inner.remaining_bytes()
    print(f"  remaining_bytes(): {first_read!r}")
    assert first_read == payload

    # Nothing was consumed: the full payload length is still reported and a
    # second read gives back the same bytes.
    assert inner.remaining() == len(payload)
    second_read = inner.remaining_bytes()
    assert second_read == first_read
    print("  Non-consuming: second call returns same bytes: OK")


def demo_ext_builders():
    """Exercise the extension DER value builders exposed by ``synta.ext``.

    Covers the SubjectAlternativeName, ExtendedKeyUsage and
    AuthorityInformationAccess builders together with their SAN / EKU / AIA
    aliases, plus ``basic_constraints``, ``key_usage``,
    ``subject_key_identifier`` and ``authority_key_identifier``.  Every
    result is printed, and the leading tag byte is asserted for most of them.
    """
    section("synta.ext — extension DER value builders")
    import synta.ext as extmod

    # Leading tag bytes the assertions below check for.
    sequence_tag = 0x30
    bit_string_tag = 0x03
    octet_string_tag = 0x04

    # ── SubjectAlternativeNameBuilder ──────────────────────────────────────────
    san_builder = extmod.SubjectAlternativeNameBuilder()
    for host in ("example.com", "www.example.com"):
        san_builder = san_builder.dns_name(host)
    san_builder = san_builder.rfc822_name("admin@example.com")
    san_builder = san_builder.ip_address(bytes((192, 168, 1, 1)))
    full_san = san_builder.build()
    print(f"  SAN DER: <{len(full_san)} bytes>")
    # The encoded value has to open with the SEQUENCE tag.
    assert full_san[0] == sequence_tag, f"expected SEQUENCE tag, got 0x{full_san[0]:02x}"

    # Same builder through its short alias, SAN.
    alias_san = extmod.SAN().dns_name("example.com").build()
    assert alias_san[0] == sequence_tag
    print(f"  SAN alias: <{len(alias_san)} bytes>  OK")

    # ── ExtendedKeyUsageBuilder ────────────────────────────────────────────────
    eku_builder = extmod.ExtendedKeyUsageBuilder()
    eku_builder = eku_builder.server_auth()
    eku_builder = eku_builder.client_auth()
    named_eku = eku_builder.build()
    print(f"  EKU DER: <{len(named_eku)} bytes>")
    assert named_eku[0] == sequence_tag

    # An arbitrary OID can be supplied as a list of arcs through add_oid.
    server_auth_arcs = [1, 3, 6, 1, 5, 5, 7, 3, 1]
    custom_eku = extmod.EKU().add_oid(server_auth_arcs).build()
    assert custom_eku[0] == sequence_tag
    print(f"  EKU custom OID: <{len(custom_eku)} bytes>  OK")

    # ── AuthorityInformationAccessBuilder ──────────────────────────────────────
    ocsp_url = "http://ocsp.example.com"
    aia_builder = extmod.AuthorityInformationAccessBuilder()
    aia_builder = aia_builder.ocsp(ocsp_url)
    aia_builder = aia_builder.ca_issuers("http://ca.example.com/ca.crt")
    full_aia = aia_builder.build()
    print(f"  AIA DER: <{len(full_aia)} bytes>")
    assert full_aia[0] == sequence_tag

    # Same builder through its short alias, AIA.
    alias_aia = extmod.AIA().ocsp(ocsp_url).build()
    assert alias_aia[0] == sequence_tag
    print(f"  AIA alias: <{len(alias_aia)} bytes>  OK")

    # ── basic_constraints ──────────────────────────────────────────────────────
    # Called with no arguments (end-entity), the result must be exactly an
    # empty SEQUENCE: tag 0x30, length 0x00.
    end_entity_bc = extmod.basic_constraints()
    assert end_entity_bc == b"\x30\x00", f"expected empty SEQUENCE, got {end_entity_bc.hex()}"
    print(f"  basic_constraints(ca=False): {end_entity_bc.hex()}  OK")

    ca_bc = extmod.basic_constraints(ca=True, path_length=0)
    assert ca_bc[0] == sequence_tag
    print(f"  basic_constraints(ca=True, path_length=0): {ca_bc.hex()}  OK")

    # ── key_usage with KU_* bitmask constants ──────────────────────────────────
    # The KU_* constants are bitmasks, so several usages are OR-ed together.
    ca_usage_bits = extmod.KU_KEY_CERT_SIGN | extmod.KU_CRL_SIGN
    ca_key_usage = extmod.key_usage(ca_usage_bits)
    assert ca_key_usage[0] == bit_string_tag, f"expected BIT STRING tag 0x03, got 0x{ca_key_usage[0]:02x}"
    print(f"  key_usage(keyCertSign|cRLSign): {ca_key_usage.hex()}  OK")

    signing_key_usage = extmod.key_usage(extmod.KU_DIGITAL_SIGNATURE)
    print(f"  key_usage(digitalSignature):   {signing_key_usage.hex()}  OK")

    # ── subject_key_identifier and authority_key_identifier ───────────────────
    # Both are derived from the SPKI DER of the hard-coded RSA certificate.
    rsa_certificate = synta.Certificate.from_der(base64.b64decode(_RSA_CERT_B64))
    spki = rsa_certificate.subject_public_key_info_der

    default_ski = extmod.subject_key_identifier(spki)
    assert default_ski[0] == octet_string_tag, f"expected OCTET STRING tag, got 0x{default_ski[0]:02x}"
    print(f"  subject_key_identifier (RFC 5280): <{len(default_ski)} bytes>  OK")

    # An alternative derivation is selected through the method keyword.
    rfc7093_ski = extmod.subject_key_identifier(spki, method=extmod.KEYID_RFC7093M1)
    print(f"  subject_key_identifier (RFC7093M1): <{len(rfc7093_ski)} bytes>  OK")

    authority_kid = extmod.authority_key_identifier(spki)
    assert authority_kid[0] == sequence_tag, f"expected SEQUENCE tag, got 0x{authority_kid[0]:02x}"
    print(f"  authority_key_identifier: <{len(authority_kid)} bytes>  OK")


def demo_low_level_helpers():
    """Exercise the module-level helper functions of ``synta``.

    Covers ``digest``, ``name_der_equal``, ``encode_subject_alt_names``
    (round-tripped through ``parse_general_names``) and
    ``encode_extended_key_usage`` with both ObjectIdentifier and string
    inputs.  Results are printed and checked with assertions.
    """
    section("Low-level helpers: digest, name_der_equal, encode_subject_alt_names, encode_extended_key_usage")

    # synta.digest — compute a raw hash over arbitrary bytes
    data = b"hello, synta"
    sha256 = synta.digest("sha256", data)
    sha1 = synta.digest("sha1", data)
    assert len(sha256) == 32
    assert len(sha1) == 20
    print(f"  digest('sha256', ...): {sha256.hex()}")
    print(f"  digest('sha1',   ...): {sha1.hex()}")

    # synta.name_der_equal — compare two DER-encoded Name SEQUENCEs
    name_a = synta.NameBuilder().common_name("Test CA").organization("Synta").build()
    name_b = synta.NameBuilder().common_name("Test CA").organization("Synta").build()
    name_c = synta.NameBuilder().common_name("Other CA").build()
    assert synta.name_der_equal(name_a, name_b)
    assert not synta.name_der_equal(name_a, name_c)
    print("  name_der_equal: identical names → True, different names → False  OK")

    # synta.encode_subject_alt_names — build a GeneralNames SEQUENCE from (tag, bytes) pairs
    # tag 2 = dNSName, tag 7 = iPAddress (4-byte or 16-byte), tag 1 = rfc822Name
    # (ipaddress comes from the module-level import; no local re-import needed.)
    dns_bytes = b"example.com"
    ip_bytes = ipaddress.IPv4Address("192.0.2.1").packed  # 4 bytes
    pairs = [(2, dns_bytes), (7, ip_bytes)]
    san_der = synta.encode_subject_alt_names(pairs)
    assert san_der[0] == 0x30, "expected SEQUENCE tag"
    print(f"  encode_subject_alt_names: <{len(san_der)} bytes>  OK")

    # Round-trip via parse_general_names: the same (tag, bytes) pairs come back
    parsed = synta.parse_general_names(san_der)
    assert len(parsed) == 2
    assert parsed[0] == (2, dns_bytes)
    assert parsed[1] == (7, ip_bytes)
    print("  parse_general_names round-trip: OK")

    # synta.encode_extended_key_usage — build an EKU SEQUENCE OF OID
    id_server_auth = synta.ObjectIdentifier("1.3.6.1.5.5.7.3.1")
    id_client_auth = synta.ObjectIdentifier("1.3.6.1.5.5.7.3.2")
    eku_der = synta.encode_extended_key_usage([id_server_auth, id_client_auth])
    assert eku_der[0] == 0x30, "expected SEQUENCE tag"
    print(f"  encode_extended_key_usage: <{len(eku_der)} bytes>  OK")

    # Also accepts plain strings; the encoding must match the variant above
    eku_der2 = synta.encode_extended_key_usage(["1.3.6.1.5.5.7.3.1", "1.3.6.1.5.5.7.3.2"])
    assert eku_der2 == eku_der
    print("  encode_extended_key_usage (str OIDs): matches ObjectIdentifier variant  OK")


def main():
    """Run every demo of this example in a fixed order.

    Prints a banner with the example title first and a completion message
    after the last demo has returned.
    """
    banner = "=" * 60
    print(banner)
    print("Example 3: Extension access, SAN parsing, and ext builders")
    print(banner)

    # Demos run strictly in this order.
    demos = (
        demo_subject_alt_names,
        demo_extensions_der,
        demo_get_extension_value_der,
        demo_parse_basic_constraints,
        demo_parse_san,
        demo_decode_raw_tlv,
        demo_remaining_bytes,
        demo_ext_builders,
        demo_low_level_helpers,
    )
    for run_demo in demos:
        run_demo()

    print("\nAll extension examples completed.")


# Run the demos only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()