synta 0.1.6

ASN.1 parser, decoder, and encoder library with DER/BER support and C FFI
Documentation
# 20. `example_krb5_pkinit.py` — PKINIT protocol classes

[← Example index](index.md) · [example_krb5_pkinit.py on Codeberg](https://codeberg.org/abbra/synta/src/branch/main/examples/example_krb5_pkinit.py)

Bindings: `EncryptionKey`, `Checksum`, `KDFAlgorithmId`, `IssuerAndSerialNumber`,
`ExternalPrincipalIdentifier`, `PKAuthenticator`, `AuthPack`, `PaPkAsReq`,
`DHRepInfo`, `KDCDHKeyInfo`, `ReplyKeyPack`, `PaPkAsRep`.

For each class: parse hand-crafted DER bytes with `from_der`, access every property,
and print a summary of key values.

## Source

```python
#!/usr/bin/env python3
"""
Example 19: PKINIT ASN.1 structures (RFC 4556, RFC 8636).

Demonstrates: synta.krb5.EncryptionKey, synta.krb5.Checksum,
synta.krb5.KDFAlgorithmId, synta.krb5.IssuerAndSerialNumber,
synta.krb5.ExternalPrincipalIdentifier, synta.krb5.PKAuthenticator,
synta.krb5.DHRepInfo, synta.krb5.KDCDHKeyInfo,
synta.krb5.ReplyKeyPack, synta.krb5.PaPkAsRep,
synta.krb5.AuthPack, synta.krb5.PaPkAsReq.
"""

import synta
import synta.krb5 as krb5


def section(title):
    """Print *title* framed above and below by a 60-character horizontal rule.

    A blank line is emitted first so consecutive sections are visually
    separated in the console output.
    """
    rule = "─" * 60
    print(f"\n{rule}\n{title}\n{rule}")


# ── DER building helpers ──────────────────────────────────────

def _enc(fn):
    """Run *fn* against a fresh DER encoder and return the bytes it produced.

    *fn* is called with the encoder as its only argument; whatever it
    writes to the encoder is what ``finish()`` hands back.
    """
    der_encoder = synta.Encoder(synta.Encoding.DER)
    fn(der_encoder)
    encoded = der_encoder.finish()
    return encoded


def _seq(*parts):
    """Concatenate already-encoded TLV byte strings and wrap them in a SEQUENCE."""
    body = b"".join(parts)
    der_encoder = synta.Encoder(synta.Encoding.DER)
    der_encoder.encode_sequence(body)
    return der_encoder.finish()


def _explicit(n, content):
    """Return *content* wrapped in a context-class ``[n] EXPLICIT`` tag."""
    der_encoder = synta.Encoder(synta.Encoding.DER)
    der_encoder.encode_explicit_tag(n, "Context", content)
    wrapped = der_encoder.finish()
    return wrapped


def _implicit(n, inner_der):
    """Re-tag one DER TLV as ``[n] IMPLICIT``.

    IMPLICIT tagging replaces the original identifier octet with a
    context-class tag and leaves the length and content octets unchanged.
    The constructed bit of the original tag is carried over:

    For primitive types (OCTET STRING, BIT STRING, …): context tag 0x80|n.
    For constructed types (SEQUENCE, SET, …):            context tag 0xa0|n.

    Both short-form and long-form definite lengths are accepted; the
    original length octets are copied through verbatim.  Any bytes that
    follow the first TLV in *inner_der* are dropped.

    Args:
        n: Context tag number.  Must be 0..30 so that it fits in a
            single identifier octet.
        inner_der: DER bytes of one TLV whose tag is a single octet.

    Returns:
        The re-tagged TLV as ``bytes``.

    Raises:
        ValueError: if *n* is out of range, the original tag uses the
            multi-octet (high-tag-number) form, the length is indefinite,
            or *inner_der* is shorter than its header declares.
    """
    # Explicit checks instead of ``assert``: asserts vanish under ``python -O``
    # and would let a malformed vector through silently.
    if not 0 <= n <= 30:
        raise ValueError(f"context tag number must be in 0..30, got {n!r}")
    if len(inner_der) < 2:
        raise ValueError("inner_der is too short to hold a tag and a length")

    orig_tag = inner_der[0]
    if (orig_tag & 0x1F) == 0x1F:
        raise ValueError("multi-octet (high-tag-number) tags are not supported")
    constructed = orig_tag & 0x20          # bit 5 — preserved in context tag

    first_length_octet = inner_der[1]
    if first_length_octet < 0x80:
        # Short form: the octet is the content length itself.
        header_len = 2
        content_len = first_length_octet
    else:
        # Long form: low 7 bits give the number of following length octets.
        num_length_octets = first_length_octet & 0x7F
        if num_length_octets == 0:
            raise ValueError("indefinite length is not valid in DER")
        header_len = 2 + num_length_octets
        if len(inner_der) < header_len:
            raise ValueError("inner_der is truncated inside its length octets")
        content_len = int.from_bytes(inner_der[2:header_len], "big")

    end = header_len + content_len
    if len(inner_der) < end:
        raise ValueError("inner_der is shorter than its declared length")

    ctx_tag = 0x80 | constructed | n
    # Only the identifier octet changes; length and content are reused as-is.
    return bytes([ctx_tag]) + bytes(inner_der[1:end])


def _int_der(v):
    """Return the DER encoding of *v* as an INTEGER."""
    def emit(encoder):
        return encoder.encode_integer(v)

    return _enc(emit)


def _oct_der(b):
    """Return the DER encoding of the bytes *b* as an OCTET STRING."""
    def emit(encoder):
        return encoder.encode_octet_string(b)

    return _enc(emit)


def _oid_der(s):
    """Return the DER encoding of the dotted-string OID *s*."""
    def emit(encoder):
        oid = synta.ObjectIdentifier(s)
        return encoder.encode_oid(oid)

    return _enc(emit)


def _gt_der():
    """Return the DER encoding of a fixed GeneralizedTime (2026-01-01 00:00:00).

    The demos in this file expect to read this value back as
    ``"20260101000000Z"``.
    """
    def emit(encoder):
        stamp = synta.GeneralizedTime(2026, 1, 1, 0, 0, 0, None)
        return encoder.encode_generalized_time(stamp)

    return _enc(emit)


def _bit_der(b):
    """Return the DER encoding of the bytes *b* as a BIT STRING.

    The ``BitString`` is built with a second argument of 0 (presumably the
    unused-bit count — confirm against the synta API).
    """
    def emit(encoder):
        bits = synta.BitString(b, 0)
        return encoder.encode_bit_string(bits)

    return _enc(emit)


# ── Pre-built DER for each PKINIT structure ───────────────────

# Every constant below is assembled at import time from the DER helpers
# defined earlier in this file, so each one holds ready-to-parse bytes.
# Definition order matters: later vectors embed earlier ones (for example
# REPLYKEYPACK_DER wraps ENC_KEY_DER and CHECKSUM_DER).

# EncryptionKey { [0] EXPLICIT keytype, [1] EXPLICIT keyvalue }
ENC_KEY_DER = _seq(
    _explicit(0, _int_der(17)),            # keytype=17 (aes128-cts)
    _explicit(1, _oct_der(b"\xaa" * 16)),  # 16-byte key material
)

# Checksum { [0] EXPLICIT cksumtype, [1] EXPLICIT checksum }
CHECKSUM_DER = _seq(
    _explicit(0, _int_der(7)),             # cksumtype=7 (rsa-md5)
    _explicit(1, _oct_der(b"\xbb" * 8)),   # 8-byte checksum
)

# KDFAlgorithmId { OID }  (no explicit context tags)
# NOTE(review): RFC 8636 appears to declare kdf-id with a [0] tag; this vector
# encodes a bare OID inside the SEQUENCE — confirm it matches what the binding
# expects.
KDFALGOID_DER = _seq(_oid_der("1.3.6.1.5.2.3.6.2"))  # id-pkinit-kdf-ah-sha256

# IssuerAndSerialNumber { OCTET STRING issuer, INTEGER serial }
ISSUER_SERIAL_DER = _seq(
    _oct_der(b"\x30\x00"),  # issuer: OCTET STRING holding an empty Name SEQUENCE
    _int_der(42),            # serial_number
)

# PKAuthenticator { [0] cusec, [1] ctime, [2] nonce }  (optional fields omitted)
PKA_DER = _seq(
    _explicit(0, _int_der(0)),      # cusec=0 µs
    _explicit(1, _gt_der()),        # ctime=20260101000000Z
    _explicit(2, _int_der(12345)),  # nonce
)

# PKAuthenticator with pa_checksum ([3] EXPLICIT OCTET STRING)
PKA_WITH_CHECKSUM_DER = _seq(
    _explicit(0, _int_der(500)),           # cusec=500 µs
    _explicit(1, _gt_der()),               # ctime: same fixed timestamp as PKA_DER
    _explicit(2, _int_der(99999)),         # nonce
    _explicit(3, _oct_der(b"\xcc" * 20)),  # pa_checksum: SHA-1 of AS-REQ body
)

# DHRepInfo { [0] IMPLICIT dh_signed_data: OctetStringRef }
DHREPINFO_DER = _seq(_implicit(0, _oct_der(b"\xca\xfe\xba\xbe")))

# KDCDHKeyInfo { [0] EXPLICIT BIT STRING, [1] EXPLICIT INTEGER nonce }
KDCDHKEYINFO_DER = _seq(
    _explicit(0, _bit_der(b"\xaa\xbb\xcc\xdd")),  # subject_public_key (4-byte DH key)
    _explicit(1, _int_der(12345)),                  # nonce
)

# ReplyKeyPack { [0] EXPLICIT EncryptionKey, [1] EXPLICIT Checksum }
# Reuses the two vectors defined above as the nested structures.
REPLYKEYPACK_DER = _seq(
    _explicit(0, ENC_KEY_DER),
    _explicit(1, CHECKSUM_DER),
)

# ExternalPrincipalIdentifier with only subject_key_identifier ([2] IMPLICIT OctetStringRef)
EPI_SKI_DER = _seq(_implicit(2, _oct_der(b"\xde\xad\xbe\xef")))

# ExternalPrincipalIdentifier with issuer_and_serial_number ([1] IMPLICIT IssuerAndSerialNumber)
# The inner SEQUENCE is constructed, so the implicit tag byte becomes 0xa1.
EPI_ISN_DER = _seq(_implicit(1, ISSUER_SERIAL_DER))

# AuthPack (minimal): { [0] EXPLICIT PKAuthenticator }
AUTHPACK_DER = _seq(_explicit(0, PKA_DER))

# AuthPack with supported_kdfs ([4] EXPLICIT SEQUENCE OF KDFAlgorithmId)
AUTHPACK_KDF_DER = _seq(
    _explicit(0, PKA_DER),
    _explicit(4, _seq(KDFALGOID_DER)),  # SEQUENCE OF one KDFAlgorithmId
)

# PaPkAsReq (minimal): { [0] IMPLICIT signed_auth_pack: OctetStringRef }
PAPKASREQ_DER = _seq(_implicit(0, _oct_der(b"\x01\x02\x03\x04")))

# PaPkAsRep — CHOICE: [0] EXPLICIT DHRepInfo
# Not wrapped in _seq: the encoding is the tagged alternative itself.
PAPKASREP_DHINFO_DER = _explicit(0, DHREPINFO_DER)

# PaPkAsRep — CHOICE: [1] IMPLICIT OctetString (EncKeyPack, owned type)
PAPKASREP_ENCKEYPACK_DER = _implicit(1, _oct_der(b"\x05\x06\x07\x08"))


# ── Demo functions ────────────────────────────────────────────

def demo_encryption_key():
    """Decode ENC_KEY_DER as an EncryptionKey, check both fields, and print them."""
    section("EncryptionKey — RFC 3961 §2")
    enc_key = krb5.EncryptionKey.from_der(ENC_KEY_DER)

    expected_material = b"\xaa" * 16
    assert enc_key.keytype == 17
    assert enc_key.keyvalue == expected_material

    print(f"  keytype:  {enc_key.keytype}  (aes128-cts-hmac-sha1-96)")
    print(f"  keyvalue: {enc_key.keyvalue.hex()}")
    print(f"  repr:     {enc_key!r}")


def demo_checksum():
    """Decode CHECKSUM_DER as a Checksum, check both fields, and print them."""
    section("Checksum — RFC 3961 §4")
    cksum = krb5.Checksum.from_der(CHECKSUM_DER)

    expected_digest = b"\xbb" * 8
    assert cksum.cksumtype == 7
    assert cksum.checksum == expected_digest

    print(f"  cksumtype: {cksum.cksumtype}  (rsa-md5)")
    print(f"  checksum:  {cksum.checksum.hex()}")
    print(f"  repr:      {cksum!r}")


def demo_kdf_algorithm_id():
    """Decode KDFALGOID_DER as a KDFAlgorithmId, check its OID, and print it."""
    section("KDFAlgorithmId — RFC 8636 §3.1")
    algorithm = krb5.KDFAlgorithmId.from_der(KDFALGOID_DER)

    expected_oid = "1.3.6.1.5.2.3.6.2"
    assert str(algorithm.kdf_id) == expected_oid

    print(f"  kdf_id: {algorithm.kdf_id}  (id-pkinit-kdf-ah-sha256)")
    print(f"  repr:   {algorithm!r}")


def demo_issuer_and_serial_number():
    """Decode ISSUER_SERIAL_DER as an IssuerAndSerialNumber and print its fields."""
    section("IssuerAndSerialNumber — RFC 4556 §3.2.2")
    issuer_serial = krb5.IssuerAndSerialNumber.from_der(ISSUER_SERIAL_DER)

    # The issuer property yields the OCTET STRING value, i.e. the raw Name DER.
    raw_name = b"\x30\x00"
    assert issuer_serial.issuer == raw_name
    assert issuer_serial.serial_number == 42

    print(f"  issuer (raw Name bytes): {issuer_serial.issuer.hex()}")
    print(f"  serial_number:           {issuer_serial.serial_number}")
    print(f"  repr:                    {issuer_serial!r}")


def demo_external_principal_identifier():
    """Exercise ExternalPrincipalIdentifier with three inputs.

    Covers an empty SEQUENCE (every optional field absent), a value carrying
    only subject_key_identifier, and a value carrying only
    issuer_and_serial_number.
    """
    section("ExternalPrincipalIdentifier — RFC 4556 §3.2.2")

    # Case 1: empty SEQUENCE — all three optional fields come back as None.
    blank = krb5.ExternalPrincipalIdentifier.from_der(b"\x30\x00")
    for field_name in (
        "subject_name",
        "issuer_and_serial_number",
        "subject_key_identifier",
    ):
        assert getattr(blank, field_name) is None
    print("  empty EPI: all fields None ✓")

    # Case 2: only subject_key_identifier ([2] IMPLICIT OctetStringRef).
    with_ski = krb5.ExternalPrincipalIdentifier.from_der(EPI_SKI_DER)
    assert with_ski.subject_name is None
    assert with_ski.issuer_and_serial_number is None
    assert with_ski.subject_key_identifier == b"\xde\xad\xbe\xef"
    print(f"  subject_key_identifier: {with_ski.subject_key_identifier.hex()}")

    # Case 3: only issuer_and_serial_number ([1] IMPLICIT IssuerAndSerialNumber).
    with_isn = krb5.ExternalPrincipalIdentifier.from_der(EPI_ISN_DER)
    assert with_isn.subject_name is None
    assert with_isn.subject_key_identifier is None
    nested = with_isn.issuer_and_serial_number
    assert nested is not None
    assert nested.serial_number == 42
    print(f"  issuer_and_serial_number.serial: {nested.serial_number}")


def demo_pk_authenticator():
    """Decode PKAuthenticator with and without the optional pa_checksum field."""
    section("PKAuthenticator — RFC 4556 §3.2.1")

    # Mandatory fields only: cusec, ctime, nonce.
    minimal = krb5.PKAuthenticator.from_der(PKA_DER)
    assert minimal.cusec == 0
    assert minimal.ctime == "20260101000000Z"
    assert minimal.nonce == 12345
    assert minimal.pa_checksum is None
    assert minimal.freshness_token is None
    print(f"  cusec:           {minimal.cusec}")
    print(f"  ctime:           {minimal.ctime}")
    print(f"  nonce:           {minimal.nonce}")
    print(f"  pa_checksum:     {minimal.pa_checksum}")
    print(f"  freshness_token: {minimal.freshness_token}")
    print(f"  repr:            {minimal!r}")

    # Same structure plus pa_checksum ([3] EXPLICIT OCTET STRING).
    checksummed = krb5.PKAuthenticator.from_der(PKA_WITH_CHECKSUM_DER)
    expected_digest = b"\xcc" * 20
    assert checksummed.cusec == 500
    assert checksummed.nonce == 99999
    assert checksummed.pa_checksum == expected_digest
    # Only the first 16 hex digits are shown to keep the line short.
    print(f"  (with pa_checksum) pa_checksum: {checksummed.pa_checksum.hex()[:16]}...")


def demo_dh_rep_info():
    """Decode DHREPINFO_DER as a DHRepInfo, check its fields, and print them."""
    section("DHRepInfo — RFC 4556 §3.2.4")
    dh_rep = krb5.DHRepInfo.from_der(DHREPINFO_DER)

    assert dh_rep.dh_signed_data == b"\xca\xfe\xba\xbe"
    assert dh_rep.server_dhnonce is None  # optional field absent from the vector

    print(f"  dh_signed_data ({len(dh_rep.dh_signed_data)} bytes): {dh_rep.dh_signed_data.hex()}")
    print(f"  server_dhnonce: {dh_rep.server_dhnonce}")
    print(f"  repr:           {dh_rep!r}")


def demo_kdc_dh_key_info():
    """Decode KDCDHKEYINFO_DER as a KDCDHKeyInfo, check its fields, and print them."""
    section("KDCDHKeyInfo — RFC 4556 §3.2.4")
    key_info = krb5.KDCDHKeyInfo.from_der(KDCDHKEYINFO_DER)

    # subject_public_key compares equal to the bit data alone; the leading
    # unused-bits byte of the BIT STRING encoding is not part of it.
    expected_key = b"\xaa\xbb\xcc\xdd"
    assert key_info.subject_public_key == expected_key
    assert key_info.nonce == 12345
    assert key_info.dh_key_expiration is None

    print(f"  subject_public_key: {key_info.subject_public_key.hex()}")
    print(f"  nonce:              {key_info.nonce}")
    print(f"  dh_key_expiration:  {key_info.dh_key_expiration}")
    print(f"  repr:               {key_info!r}")


def demo_reply_key_pack():
    """Decode REPLYKEYPACK_DER and check the nested EncryptionKey and Checksum."""
    section("ReplyKeyPack — RFC 4556 §3.2.3")
    reply_pack = krb5.ReplyKeyPack.from_der(REPLYKEYPACK_DER)

    # Pull out both nested structures before inspecting either of them.
    session_key = reply_pack.reply_key
    req_checksum = reply_pack.as_checksum

    assert session_key.keytype == 17
    assert session_key.keyvalue == b"\xaa" * 16
    assert req_checksum.cksumtype == 7
    assert req_checksum.checksum == b"\xbb" * 8

    print(f"  reply_key.keytype:      {session_key.keytype}")
    print(f"  reply_key.keyvalue:     {session_key.keyvalue.hex()}")
    print(f"  as_checksum.cksumtype:  {req_checksum.cksumtype}")
    print(f"  as_checksum.checksum:   {req_checksum.checksum.hex()}")


def demo_pa_pk_as_rep():
    """Decode both alternatives of the PaPkAsRep CHOICE and print each one."""
    section("PaPkAsRep — RFC 4556 §3.2.4 (CHOICE)")

    # Alternative [0] EXPLICIT DHRepInfo → variant "DhInfo".
    dh_reply = krb5.PaPkAsRep.from_der(PAPKASREP_DHINFO_DER)
    assert dh_reply.variant == "DhInfo"
    assert dh_reply.enc_key_pack is None
    dh_payload = dh_reply.dh_info
    assert dh_payload is not None
    assert dh_payload.dh_signed_data == b"\xca\xfe\xba\xbe"
    print(f"  variant: {dh_reply.variant}")
    print(f"  dh_info.dh_signed_data: {dh_payload.dh_signed_data.hex()}")
    print(f"  repr: {dh_reply!r}")

    # Alternative [1] IMPLICIT OctetString → variant "EncKeyPack".
    key_reply = krb5.PaPkAsRep.from_der(PAPKASREP_ENCKEYPACK_DER)
    assert key_reply.variant == "EncKeyPack"
    assert key_reply.dh_info is None
    assert key_reply.enc_key_pack == b"\x05\x06\x07\x08"
    print(f"  variant: {key_reply.variant}")
    print(f"  enc_key_pack: {key_reply.enc_key_pack.hex()}")


def demo_auth_pack():
    """Decode AuthPack in its minimal form and with a supported_kdfs list."""
    section("AuthPack — RFC 4556 §3.2.1")

    # Minimal form: pk_authenticator is the only field present.
    minimal = krb5.AuthPack.from_der(AUTHPACK_DER)
    authenticator = minimal.pk_authenticator
    assert authenticator.nonce == 12345
    assert authenticator.ctime == "20260101000000Z"
    for optional_field in (
        "client_public_value",
        "supported_cmstypes",
        "client_dhnonce",
        "supported_kdfs",
    ):
        assert getattr(minimal, optional_field) is None
    print(f"  pk_authenticator.nonce: {authenticator.nonce}")
    print(f"  pk_authenticator.ctime: {authenticator.ctime}")
    print(f"  client_public_value:    {minimal.client_public_value}")
    print(f"  supported_kdfs:         {minimal.supported_kdfs}")

    # Extended form: [4] EXPLICIT SEQUENCE OF KDFAlgorithmId with one entry.
    extended = krb5.AuthPack.from_der(AUTHPACK_KDF_DER)
    kdf_list = extended.supported_kdfs
    assert kdf_list is not None
    assert len(kdf_list) == 1
    assert str(kdf_list[0].kdf_id) == "1.3.6.1.5.2.3.6.2"
    print(f"  supported_kdfs[0]: {kdf_list[0].kdf_id}")


def demo_pa_pk_as_req():
    """Decode PAPKASREQ_DER as a PaPkAsReq, check its fields, and print them."""
    section("PaPkAsReq — RFC 4556 §3.2.2")
    request = krb5.PaPkAsReq.from_der(PAPKASREQ_DER)

    assert request.signed_auth_pack == b"\x01\x02\x03\x04"
    # Both optional fields are absent from the minimal test vector.
    assert request.trusted_certifiers is None
    assert request.kdc_pk_id is None

    print(f"  signed_auth_pack ({len(request.signed_auth_pack)} bytes): {request.signed_auth_pack.hex()}")
    print(f"  trusted_certifiers: {request.trusted_certifiers}")
    print(f"  kdc_pk_id:          {request.kdc_pk_id}")
    print(f"  repr:               {request!r}")


def main():
    """Print the banner, run every PKINIT demo in order, then print a closing line."""
    banner_rule = "=" * 60
    print(banner_rule)
    print("Example 19: PKINIT ASN.1 structures (RFC 4556 / RFC 8636)")
    print(banner_rule)

    # Execution order is the order of this tuple.
    demos = (
        demo_encryption_key,
        demo_checksum,
        demo_kdf_algorithm_id,
        demo_issuer_and_serial_number,
        demo_external_principal_identifier,
        demo_pk_authenticator,
        demo_dh_rep_info,
        demo_kdc_dh_key_info,
        demo_reply_key_pack,
        demo_pa_pk_as_rep,
        demo_auth_pack,
        demo_pa_pk_as_req,
    )
    for run_demo in demos:
        run_demo()

    print("\nAll PKINIT examples completed.")


# Run the full demo sequence only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
```