synta 0.1.9

ASN.1 parser, decoder, and encoder library with DER/BER support and C FFI
Documentation
# 28. `example_spnego.py` — SPNEGO negotiation tokens (RFC 4178)

[← Example index](index.md) · [example_spnego.py on Codeberg](https://codeberg.org/abbra/synta/src/branch/main/examples/example_spnego.py)

Bindings: `synta.spnego.NegTokenInit`, `synta.spnego.NegTokenResp`,
`synta.spnego.NegotiationToken`, `NEG_STATE_*` constants, `SPNEGO_OID`.

> **DER encoding note:** GssapiSpnego.asn1 uses `DEFINITIONS IMPLICIT TAGS`.
> The example builds all test vectors using IMPLICIT encoding helpers.

- Parse `NegTokenInit` with `mech_types` (two OIDs) and `mech_token`; verify
  each property and confirm `req_flags`/`mech_list_mic` are absent.
- Parse an empty `NegTokenInit` SEQUENCE; verify `mech_types == []`.
- Parse `NegotiationToken` in the `negTokenInit` CHOICE arm (`0xa0`) and the
  `negTokenResp` arm (`0xa1`).
- Parse a GSSAPI-wrapped token (`0x60` APPLICATION tag) and verify that
  `from_der` strips the OID prefix automatically.
- Parse `NegTokenResp` with `neg_state` 3 (`NEG_STATE_REQUEST_MIC`) and a
  `response_token`; verify `supported_mech` and `mech_list_mic`.
- Verify all four `NEG_STATE_*` integer constants and `SPNEGO_OID`.

## Source

```python
#!/usr/bin/env python3
"""
Example 21: SPNEGO negotiation tokens (RFC 4178).

Demonstrates: synta.spnego.NegTokenInit, synta.spnego.NegTokenResp,
synta.spnego.NegotiationToken, NEG_STATE_* constants, SPNEGO_OID.

GssapiSpnego.asn1 uses DEFINITIONS IMPLICIT TAGS, so all context-tagged
fields use IMPLICIT encoding (the universal tag is REPLACED, not wrapped).
"""

import synta
import synta.spnego as spnego


def section(title):
    print(f"\n{'─' * 60}\n{title}\n{'─' * 60}")


# ── DER building helpers ──────────────────────────────────────

def _enc(fn):
    """Encode one element and return its DER bytes."""
    enc = synta.Encoder(synta.Encoding.DER)
    fn(enc)
    return enc.finish()


def _oid(dotted):
    """Encode an OID in dot-notation and return its DER bytes (06 <len> <value>)."""
    return _enc(lambda e: e.encode_oid(synta.ObjectIdentifier(dotted)))


def _octet(data):
    """Encode bytes as an OCTET STRING and return DER bytes (04 <len> <data>)."""
    return _enc(lambda e: e.encode_octet_string(data))


def _encode_length(n):
    """Encode an ASN.1 definite-form length."""
    if n < 0x80:
        return bytes([n])
    elif n < 0x100:
        return bytes([0x81, n])
    elif n < 0x10000:
        return bytes([0x82, (n >> 8) & 0xFF, n & 0xFF])
    else:
        raise ValueError(f"length {n} too large for this helper")


def _tlv(tag_byte, body):
    """Build a TLV with the given tag byte and body."""
    return bytes([tag_byte]) + _encode_length(len(body)) + body


# GssapiSpnego.asn1 uses DEFINITIONS IMPLICIT TAGS.  For an IMPLICIT tag:
#   - Constructed types (SEQUENCE, SEQUENCE OF): outer tag 0x30 → 0xa0|n
#   - Primitive types  (OID, OCTET STRING, ENUMERATED): universal tag → 0x80|n

def _implicit_seqof(tag_n, *elem_ders):
    """[n] IMPLICIT SEQUENCE OF: tag byte 0xa0|n, body = all element DERs."""
    return _tlv(0xa0 | tag_n, b"".join(elem_ders))


def _implicit_primitive(tag_n, universal_der):
    """[n] IMPLICIT primitive: replace the first (universal) tag byte with 0x80|n."""
    return bytes([0x80 | tag_n]) + universal_der[1:]


# ── Pre-built DER test vectors ────────────────────────────────

_SPNEGO_OID_DER = _oid("1.3.6.1.5.5.2")
_KRB5_OID_DER   = _oid("1.2.840.113554.1.2.2")

# ── NegTokenInit ──────────────────────────────────────────────
#
# NegTokenInit ::= SEQUENCE {
#   mechTypes     [0] IMPLICIT SEQUENCE OF OID  OPTIONAL,
#   reqFlags      [1] IMPLICIT BIT STRING        OPTIONAL,
#   mechToken     [2] IMPLICIT OCTET STRING      OPTIONAL,
#   mechListMIC   [3] IMPLICIT OCTET STRING      OPTIONAL }
#
# [0] IMPLICIT SEQUENCE OF OID → a0 <len> <OID1> <OID2>  (no inner 0x30 tag)
_MECH_TYPES_FIELD = _implicit_seqof(0, _SPNEGO_OID_DER, _KRB5_OID_DER)

# [2] IMPLICIT OCTET STRING → 82 <len> <bytes>  (replaces 0x04 with 0x82)
_MECH_TOKEN_FIELD = _implicit_primitive(2, _octet(b"\x01\x02\x03"))

# NegTokenInit body (without outer SEQUENCE tag)
_NEG_TOKEN_INIT_BODY = _MECH_TYPES_FIELD + _MECH_TOKEN_FIELD

# NegTokenInit as a plain SEQUENCE  (for NegTokenInit.from_der)
NEG_TOKEN_INIT_DER = _tlv(0x30, _NEG_TOKEN_INIT_BODY)

# NegotiationToken CHOICE, negTokenInit [0] IMPLICIT NegTokenInit:
#   IMPLICIT [0] on SEQUENCE → 0xa0 replaces 0x30, same body
NEG_TOKEN_CHOICE_INIT_DER = _tlv(0xa0, _NEG_TOKEN_INIT_BODY)

# ── NegTokenResp ──────────────────────────────────────────────
#
# NegTokenResp ::= SEQUENCE {
#   negState       [0] IMPLICIT ENUMERATED  OPTIONAL,
#   supportedMech  [1] IMPLICIT OID         OPTIONAL,
#   responseToken  [2] IMPLICIT OCTET STRING OPTIONAL,
#   mechListMIC    [3] IMPLICIT OCTET STRING OPTIONAL }
#
# [0] IMPLICIT ENUMERATED → 80 01 <value>  (replaces 0x0a with 0x80)
_NEG_STATE_FIELD = _implicit_primitive(0, bytes([0x0a, 0x01, 0x00]))  # accept-completed

# [1] IMPLICIT OID → 81 <oid-value-len> <oid-value>  (replaces 0x06 with 0x81)
_SUPPORTED_MECH_FIELD = _implicit_primitive(1, _KRB5_OID_DER)

# NegTokenResp body (without outer SEQUENCE tag)
_NEG_TOKEN_RESP_BODY = _NEG_STATE_FIELD + _SUPPORTED_MECH_FIELD

# NegTokenResp as a plain SEQUENCE  (for NegTokenResp.from_der)
NEG_TOKEN_RESP_DER = _tlv(0x30, _NEG_TOKEN_RESP_BODY)

# NegotiationToken CHOICE, negTokenResp [1] IMPLICIT NegTokenResp:
#   IMPLICIT [1] on SEQUENCE → 0xa1 replaces 0x30, same body
NEG_TOKEN_CHOICE_RESP_DER = _tlv(0xa1, _NEG_TOKEN_RESP_BODY)

# ── NegTokenResp with responseToken ──────────────────────────

_NEG_STATE_REQ_MIC    = _implicit_primitive(0, bytes([0x0a, 0x01, 0x03]))  # request-mic
_RESPONSE_TOKEN_FIELD = _implicit_primitive(2, _octet(b"\xde\xad\xbe\xef"))

NEG_TOKEN_RESP_WITH_TOKEN_DER = _tlv(
    0x30,
    _NEG_STATE_REQ_MIC + _SUPPORTED_MECH_FIELD + _RESPONSE_TOKEN_FIELD,
)

# ── Full GSSAPI-wrapped SPNEGO token ──────────────────────────
#
# 60 <len>
#   06 <len> <SPNEGO OID>   thisMech
#   a0 <len> ...             NegotiationToken CHOICE (negTokenInit arm)
GSSAPI_SPNEGO_DER = _tlv(0x60, _SPNEGO_OID_DER + NEG_TOKEN_CHOICE_INIT_DER)


# ── Demo functions ────────────────────────────────────────────

def demo_neg_token_init():
    section("NegTokenInit — RFC 4178 §4.2.1")

    tok = spnego.NegTokenInit.from_der(NEG_TOKEN_INIT_DER)

    # mechTypes is a list of OID dot-notation strings
    assert isinstance(tok.mech_types, list)
    assert len(tok.mech_types) == 2
    assert tok.mech_types[0] == "1.3.6.1.5.5.2"
    assert tok.mech_types[1] == "1.2.840.113554.1.2.2"
    print(f"  mech_types: {tok.mech_types}")

    # mechToken is the raw token bytes for the first (preferred) mechanism
    assert tok.mech_token == b"\x01\x02\x03"
    print(f"  mech_token ({len(tok.mech_token)} bytes): {tok.mech_token.hex()}")

    # reqFlags and mechListMIC are absent
    assert tok.req_flags is None
    assert tok.mech_list_mic is None
    print(f"  req_flags:    {tok.req_flags}")
    print(f"  mech_list_mic: {tok.mech_list_mic}")

    print(f"  repr: {repr(tok)}")


def demo_neg_token_init_empty():
    section("NegTokenInit — all OPTIONAL fields absent")

    # Minimal NegTokenInit: empty SEQUENCE
    empty_seq = b"\x30\x00"
    tok = spnego.NegTokenInit.from_der(empty_seq)
    assert tok.mech_types == []
    assert tok.req_flags is None
    assert tok.mech_token is None
    assert tok.mech_list_mic is None
    print(f"  mech_types:    {tok.mech_types}  (empty list)")
    print(f"  mech_token:    {tok.mech_token}")
    print(f"  mech_list_mic: {tok.mech_list_mic}")
    print(f"  repr: {repr(tok)}")


def demo_negotiation_token_init_arm():
    section("NegotiationToken CHOICE — negTokenInit arm (0xa0)")

    # Parse from raw CHOICE form (0xa0 leading byte)
    tok = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_INIT_DER)
    assert tok.variant == "NegTokenInit"
    assert tok.neg_token_resp is None

    init = tok.neg_token_init
    assert init is not None
    assert init.mech_types[0] == "1.3.6.1.5.5.2"
    assert init.mech_token == b"\x01\x02\x03"

    print(f"  variant:                 {tok.variant}")
    print(f"  neg_token_init.mech_types: {init.mech_types}")
    print(f"  neg_token_init.mech_token: {init.mech_token.hex()}")
    print(f"  repr: {repr(tok)}")


def demo_negotiation_token_resp_arm():
    section("NegotiationToken CHOICE — negTokenResp arm (0xa1)")

    tok = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_RESP_DER)
    assert tok.variant == "NegTokenResp"
    assert tok.neg_token_init is None

    resp = tok.neg_token_resp
    assert resp is not None
    assert resp.neg_state == spnego.NEG_STATE_ACCEPT_COMPLETED
    assert resp.supported_mech == "1.2.840.113554.1.2.2"
    assert resp.response_token is None
    assert resp.mech_list_mic is None

    print(f"  variant:                    {tok.variant}")
    print(f"  neg_token_resp.neg_state:   {resp.neg_state}  (accept-completed)")
    print(f"  neg_token_resp.supported_mech: {resp.supported_mech}")
    print(f"  repr: {repr(tok)}")


def demo_gssapi_wrapped():
    section("Full GSSAPI-wrapped SPNEGO token (0x60 APPLICATION tag)")

    # NegotiationToken.from_der handles the GSSAPI [APPLICATION 0] wrapper
    # transparently — the leading 0x60 tag triggers automatic stripping of
    # the OID prefix before decoding the inner NegotiationToken CHOICE.
    tok = spnego.NegotiationToken.from_der(GSSAPI_SPNEGO_DER)
    assert tok.variant == "NegTokenInit"

    init = tok.neg_token_init
    assert init is not None
    assert "1.3.6.1.5.5.2" in init.mech_types
    assert init.mech_token == b"\x01\x02\x03"

    print(f"  GSSAPI token length: {len(GSSAPI_SPNEGO_DER)} bytes")
    print(f"  leading byte: 0x{GSSAPI_SPNEGO_DER[0]:02x}  (APPLICATION 0)")
    print(f"  variant:      {tok.variant}")
    print(f"  mech_types:   {init.mech_types}")
    print(f"  mech_token:   {init.mech_token.hex()}")

    # Verify that parsing the same token in its plain CHOICE form gives the
    # same result (NegotiationToken also accepts 0xa0/0xa1 directly).
    tok2 = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_INIT_DER)
    assert tok2.variant == tok.variant
    assert tok2.neg_token_init.mech_types == init.mech_types
    print(f"  plain-CHOICE parse matches GSSAPI-wrapped parse ✓")


def demo_neg_token_resp_with_response_token():
    section("NegTokenResp — with responseToken and request-mic state")

    tok = spnego.NegTokenResp.from_der(NEG_TOKEN_RESP_WITH_TOKEN_DER)

    assert tok.neg_state == spnego.NEG_STATE_REQUEST_MIC
    assert tok.supported_mech == "1.2.840.113554.1.2.2"
    assert tok.response_token == b"\xde\xad\xbe\xef"
    assert tok.mech_list_mic is None

    print(f"  neg_state:      {tok.neg_state}  (request-mic)")
    print(f"  supported_mech: {tok.supported_mech}")
    print(f"  response_token: {tok.response_token.hex()}")
    print(f"  mech_list_mic:  {tok.mech_list_mic}")
    print(f"  repr: {repr(tok)}")


def demo_neg_state_constants():
    section("NEG_STATE_* constants — RFC 4178 §4.2.2")

    print(f"  NEG_STATE_ACCEPT_COMPLETED  = {spnego.NEG_STATE_ACCEPT_COMPLETED}")
    print(f"  NEG_STATE_ACCEPT_INCOMPLETE = {spnego.NEG_STATE_ACCEPT_INCOMPLETE}")
    print(f"  NEG_STATE_REJECT            = {spnego.NEG_STATE_REJECT}")
    print(f"  NEG_STATE_REQUEST_MIC       = {spnego.NEG_STATE_REQUEST_MIC}")
    print(f"  SPNEGO_OID                  = {spnego.SPNEGO_OID}")

    assert spnego.NEG_STATE_ACCEPT_COMPLETED  == 0
    assert spnego.NEG_STATE_ACCEPT_INCOMPLETE == 1
    assert spnego.NEG_STATE_REJECT            == 2
    assert spnego.NEG_STATE_REQUEST_MIC       == 3
    assert spnego.SPNEGO_OID == "1.3.6.1.5.5.2"
    print(f"  all constant values verified ✓")

    # Typical usage pattern: compare neg_state against a constant
    resp = spnego.NegTokenResp.from_der(NEG_TOKEN_RESP_DER)
    if resp.neg_state == spnego.NEG_STATE_ACCEPT_COMPLETED:
        print(f"  negotiation accepted — state matches NEG_STATE_ACCEPT_COMPLETED ✓")
    else:
        raise AssertionError("unexpected neg_state")


def demo_repr():
    section("repr() — readable representations")

    init = spnego.NegTokenInit.from_der(NEG_TOKEN_INIT_DER)
    resp = spnego.NegTokenResp.from_der(NEG_TOKEN_RESP_DER)
    tok_init = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_INIT_DER)
    tok_resp = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_RESP_DER)

    print(f"  NegTokenInit:        {repr(init)}")
    print(f"  NegTokenResp:        {repr(resp)}")
    print(f"  NegotiationToken[0]: {repr(tok_init)}")
    print(f"  NegotiationToken[1]: {repr(tok_resp)}")


def main():
    print("=" * 60)
    print("Example 21: SPNEGO negotiation tokens (RFC 4178)")
    print("=" * 60)
    demo_neg_token_init()
    demo_neg_token_init_empty()
    demo_negotiation_token_init_arm()
    demo_negotiation_token_resp_arm()
    demo_gssapi_wrapped()
    demo_neg_token_resp_with_response_token()
    demo_neg_state_constants()
    demo_repr()
    print("\nAll SPNEGO examples completed.")


if __name__ == "__main__":
    main()
```