# 28. `example_spnego.py` — SPNEGO negotiation tokens (RFC 4178)
[← Example index](index.md) · [example_spnego.py on Codeberg](https://codeberg.org/abbra/synta/src/branch/main/examples/example_spnego.py)
Bindings: `synta.spnego.NegTokenInit`, `synta.spnego.NegTokenResp`,
`synta.spnego.NegotiationToken`, `NEG_STATE_*` constants, `SPNEGO_OID`.
> **DER encoding note:** GssapiSpnego.asn1 uses `DEFINITIONS IMPLICIT TAGS`.
> The example builds all test vectors using IMPLICIT encoding helpers.
- Parse `NegTokenInit` with `mech_types` (two OIDs) and `mech_token`; verify
each property and confirm `req_flags`/`mech_list_mic` are absent.
- Parse an empty `NegTokenInit` SEQUENCE; verify `mech_types == []`.
- Parse `NegotiationToken` in the `negTokenInit` CHOICE arm (`0xa0`) and the
`negTokenResp` arm (`0xa1`).
- Parse a GSSAPI-wrapped token (`0x60` APPLICATION tag) and verify that
`NegotiationToken.from_der` strips the OID prefix automatically.
- Parse `NegTokenResp` with `neg_state` 3 (`NEG_STATE_REQUEST_MIC`) and a
`response_token`; verify `supported_mech` and that `mech_list_mic` is absent.
- Verify all four `NEG_STATE_*` integer constants and `SPNEGO_OID`.
## Source
```python
#!/usr/bin/env python3
"""
Example 21: SPNEGO negotiation tokens (RFC 4178).
Demonstrates: synta.spnego.NegTokenInit, synta.spnego.NegTokenResp,
synta.spnego.NegotiationToken, NEG_STATE_* constants, SPNEGO_OID.
GssapiSpnego.asn1 uses DEFINITIONS IMPLICIT TAGS, so all context-tagged
fields use IMPLICIT encoding (the universal tag is REPLACED, not wrapped).
"""
import synta
import synta.spnego as spnego
def section(title):
    """Print *title* framed by two 60-character rules, preceded by a blank line."""
    rule = "─" * 60
    print(f"\n{rule}\n{title}\n{rule}")
# ── DER building helpers ──────────────────────────────────────
def _enc(fn):
    """Run *fn* against a fresh DER encoder and return the finished bytes.

    *fn* receives the ``synta.Encoder`` as its only argument; its return
    value is ignored.
    """
    der_encoder = synta.Encoder(synta.Encoding.DER)
    fn(der_encoder)
    encoded = der_encoder.finish()
    return encoded
def _oid(dotted):
    """Return the DER encoding (06 <len> <value>) of a dot-notation OID string."""

    def write_oid(encoder):
        encoder.encode_oid(synta.ObjectIdentifier(dotted))

    return _enc(write_oid)
def _octet(data):
    """Return the DER encoding (04 <len> <data>) of *data* as an OCTET STRING."""

    def write_octets(encoder):
        encoder.encode_octet_string(data)

    return _enc(write_octets)
def _encode_length(n):
    """Encode *n* as an ASN.1 definite-form length.

    Lengths below 0x80 use the single-byte short form.  Larger lengths use
    the long form: a prefix byte ``0x80 | k`` followed by the length as a
    *k*-byte big-endian integer with no leading zero bytes.

    Args:
        n: Non-negative number of content bytes.

    Returns:
        The length octets as ``bytes``.

    Raises:
        ValueError: If *n* is negative, or needs more than 126 length bytes.
    """
    if n < 0:
        raise ValueError(f"length {n} must be non-negative")
    if n < 0x80:
        return bytes([n])
    # Minimal big-endian representation of the length value.
    body = n.to_bytes((n.bit_length() + 7) // 8, "big")
    if len(body) > 126:
        # A prefix byte of 0xff (k == 127) is reserved, so k stops at 126.
        raise ValueError(f"length {n} too large for this helper")
    return bytes([0x80 | len(body)]) + body
def _tlv(tag_byte, body):
    """Return tag byte + definite-form length of *body* + *body* itself."""
    header = bytes((tag_byte,)) + _encode_length(len(body))
    return header + body
# GssapiSpnego.asn1 uses DEFINITIONS IMPLICIT TAGS. For an IMPLICIT tag:
# - Constructed types (SEQUENCE, SEQUENCE OF): outer tag 0x30 → 0xa0|n
# - Primitive types (OID, OCTET STRING, ENUMERATED): universal tag → 0x80|n
def _implicit_seqof(tag_n, *elem_ders):
    """Build an ``[n] IMPLICIT SEQUENCE OF`` field.

    The tag byte is ``0xa0 | tag_n`` and the content is the given element
    encodings concatenated in argument order.
    """
    tag_byte = 0xa0 | tag_n
    content = b"".join(elem_ders)
    return _tlv(tag_byte, content)
def _implicit_primitive(tag_n, universal_der):
    """Build an ``[n] IMPLICIT`` primitive field from a universal-tagged TLV.

    The first byte of *universal_der* (its single-byte universal tag) is
    replaced with the context-specific primitive tag ``0x80 | tag_n``; the
    length and content bytes are kept unchanged.  The caller is responsible
    for passing a primitive encoding whose tag occupies exactly one byte.

    Args:
        tag_n: Context tag number, 0 through 30.
        universal_der: Complete DER encoding of the primitive value.

    Returns:
        The re-tagged encoding as ``bytes``.

    Raises:
        ValueError: If *tag_n* does not fit the single-byte tag form, or if
            *universal_der* is empty.
    """
    # 31 in the low five bits is the multi-byte tag escape, and anything
    # larger would spill into the constructed/class bits of the tag byte.
    if not 0 <= tag_n <= 30:
        raise ValueError(f"tag number {tag_n} does not fit in a single tag byte")
    if not universal_der:
        raise ValueError("universal_der must contain a complete TLV")
    return bytes([0x80 | tag_n]) + universal_der[1:]
# ── Pre-built DER test vectors ────────────────────────────────
# Universal-tagged DER encoding of the SPNEGO OID 1.3.6.1.5.5.2; used as the
# first mechTypes entry and as thisMech in the GSSAPI-wrapped vector.
_SPNEGO_OID_DER = _oid("1.3.6.1.5.5.2")
# Universal-tagged DER encoding of OID 1.2.840.113554.1.2.2 (Kerberos 5, going
# by the constant's name); used as the second mechTypes entry and as
# supportedMech.
_KRB5_OID_DER = _oid("1.2.840.113554.1.2.2")
# ── NegTokenInit ──────────────────────────────────────────────
#
# NegTokenInit ::= SEQUENCE {
# mechTypes [0] IMPLICIT SEQUENCE OF OID OPTIONAL,
# reqFlags [1] IMPLICIT BIT STRING OPTIONAL,
# mechToken [2] IMPLICIT OCTET STRING OPTIONAL,
# mechListMIC [3] IMPLICIT OCTET STRING OPTIONAL }
#
# mechTypes: the [0] tag stands in for the SEQUENCE OF tag, so the two OID
# encodings follow the a0 <len> header directly (no inner 0x30).
_MECH_TYPES_FIELD = _implicit_seqof(0, _SPNEGO_OID_DER, _KRB5_OID_DER)
# mechToken: the OCTET STRING tag 0x04 becomes 0x82; length and bytes stay.
_MECH_TOKEN_FIELD = _implicit_primitive(2, _octet(bytes([0x01, 0x02, 0x03])))
# Field encodings back to back, not yet wrapped in an outer tag.
_NEG_TOKEN_INIT_BODY = b"".join((_MECH_TYPES_FIELD, _MECH_TOKEN_FIELD))
# Stand-alone NegTokenInit: the body under a plain SEQUENCE tag, which is
# the form handed to NegTokenInit.from_der.
NEG_TOKEN_INIT_DER = _tlv(0x30, _NEG_TOKEN_INIT_BODY)
# NegotiationToken CHOICE, negTokenInit arm: [0] IMPLICIT swaps the 0x30
# SEQUENCE tag for 0xa0 around the very same body.
NEG_TOKEN_CHOICE_INIT_DER = _tlv(0xa0, _NEG_TOKEN_INIT_BODY)
# ── NegTokenResp ──────────────────────────────────────────────
#
# NegTokenResp ::= SEQUENCE {
# negState [0] IMPLICIT ENUMERATED OPTIONAL,
# supportedMech [1] IMPLICIT OID OPTIONAL,
# responseToken [2] IMPLICIT OCTET STRING OPTIONAL,
# mechListMIC [3] IMPLICIT OCTET STRING OPTIONAL }
#
# negState: the ENUMERATED tag 0x0a becomes 0x80, giving 80 01 <value>;
# value 0 is accept-completed.
_NEG_STATE_FIELD = _implicit_primitive(0, b"\x0a\x01\x00")
# supportedMech: the OID tag 0x06 becomes 0x81; length and value are kept.
_SUPPORTED_MECH_FIELD = _implicit_primitive(1, _KRB5_OID_DER)
# Field encodings back to back, not yet wrapped in an outer tag.
_NEG_TOKEN_RESP_BODY = b"".join((_NEG_STATE_FIELD, _SUPPORTED_MECH_FIELD))
# Stand-alone NegTokenResp: the body under a plain SEQUENCE tag, which is
# the form handed to NegTokenResp.from_der.
NEG_TOKEN_RESP_DER = _tlv(0x30, _NEG_TOKEN_RESP_BODY)
# NegotiationToken CHOICE, negTokenResp arm: [1] IMPLICIT swaps the 0x30
# SEQUENCE tag for 0xa1 around the very same body.
NEG_TOKEN_CHOICE_RESP_DER = _tlv(0xa1, _NEG_TOKEN_RESP_BODY)
# ── NegTokenResp with responseToken ──────────────────────────
# negState value 3 is request-mic.
_NEG_STATE_REQ_MIC = _implicit_primitive(0, b"\x0a\x01\x03")
# responseToken: a four-byte OCTET STRING re-tagged as [2].
_RESPONSE_TOKEN_FIELD = _implicit_primitive(2, _octet(bytes.fromhex("deadbeef")))
# negState, supportedMech and responseToken under a plain SEQUENCE tag.
NEG_TOKEN_RESP_WITH_TOKEN_DER = _tlv(
    0x30,
    b"".join((_NEG_STATE_REQ_MIC, _SUPPORTED_MECH_FIELD, _RESPONSE_TOKEN_FIELD)),
)
# ── Full GSSAPI-wrapped SPNEGO token ──────────────────────────
#
# Layout of the wrapped token:
#   60 <len>
#     06 <len> <SPNEGO OID>    thisMech
#     a0 <len> ...             NegotiationToken CHOICE (negTokenInit arm)
GSSAPI_SPNEGO_DER = _tlv(0x60, b"".join((_SPNEGO_OID_DER, NEG_TOKEN_CHOICE_INIT_DER)))
# ── Demo functions ────────────────────────────────────────────
def demo_neg_token_init():
    """Decode the two-mechanism NegTokenInit vector and check each field."""
    section("NegTokenInit — RFC 4178 §4.2.1")
    init = spnego.NegTokenInit.from_der(NEG_TOKEN_INIT_DER)

    # mech_types comes back as a list of dot-notation OID strings.
    assert isinstance(init.mech_types, list)
    assert len(init.mech_types) == 2
    assert init.mech_types[0] == "1.3.6.1.5.5.2"
    assert init.mech_types[1] == "1.2.840.113554.1.2.2"
    print(f" mech_types: {init.mech_types}")

    # mech_token carries the raw token bytes placed in the test vector.
    assert init.mech_token == b"\x01\x02\x03"
    token_len = len(init.mech_token)
    token_hex = init.mech_token.hex()
    print(f" mech_token ({token_len} bytes): {token_hex}")

    # The vector contains neither reqFlags nor mechListMIC.
    assert init.req_flags is None
    assert init.mech_list_mic is None
    print(f" req_flags: {init.req_flags}")
    print(f" mech_list_mic: {init.mech_list_mic}")
    print(f" repr: {init!r}")
def demo_neg_token_init_empty():
    """Decode an empty SEQUENCE as NegTokenInit; every OPTIONAL field is absent."""
    section("NegTokenInit — all OPTIONAL fields absent")

    # Smallest possible input: SEQUENCE tag followed by a zero length.
    minimal_der = bytes([0x30, 0x00])
    init = spnego.NegTokenInit.from_der(minimal_der)

    assert init.mech_types == []
    assert init.req_flags is None
    assert init.mech_token is None
    assert init.mech_list_mic is None

    print(f" mech_types: {init.mech_types} (empty list)")
    print(f" mech_token: {init.mech_token}")
    print(f" mech_list_mic: {init.mech_list_mic}")
    print(f" repr: {init!r}")
def demo_negotiation_token_init_arm():
    """Decode the 0xa0-tagged CHOICE vector and check the negTokenInit arm."""
    section("NegotiationToken CHOICE — negTokenInit arm (0xa0)")

    # Input is the bare CHOICE encoding, whose first byte is 0xa0.
    choice = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_INIT_DER)
    assert choice.variant == "NegTokenInit"
    assert choice.neg_token_resp is None

    inner = choice.neg_token_init
    assert inner is not None
    assert inner.mech_types[0] == "1.3.6.1.5.5.2"
    assert inner.mech_token == b"\x01\x02\x03"

    print(f" variant: {choice.variant}")
    print(f" neg_token_init.mech_types: {inner.mech_types}")
    print(f" neg_token_init.mech_token: {inner.mech_token.hex()}")
    print(f" repr: {choice!r}")
def demo_negotiation_token_resp_arm():
    """Decode the 0xa1-tagged CHOICE vector and check the negTokenResp arm."""
    section("NegotiationToken CHOICE — negTokenResp arm (0xa1)")

    choice = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_RESP_DER)
    assert choice.variant == "NegTokenResp"
    assert choice.neg_token_init is None

    inner = choice.neg_token_resp
    assert inner is not None
    assert inner.neg_state == spnego.NEG_STATE_ACCEPT_COMPLETED
    assert inner.supported_mech == "1.2.840.113554.1.2.2"
    # The vector carries neither responseToken nor mechListMIC.
    assert inner.response_token is None
    assert inner.mech_list_mic is None

    print(f" variant: {choice.variant}")
    print(f" neg_token_resp.neg_state: {inner.neg_state} (accept-completed)")
    print(f" neg_token_resp.supported_mech: {inner.supported_mech}")
    print(f" repr: {choice!r}")
def demo_gssapi_wrapped():
    """Decode the 0x60-wrapped vector and compare it with the bare CHOICE parse."""
    section("Full GSSAPI-wrapped SPNEGO token (0x60 APPLICATION tag)")

    # The same NegotiationToken.from_der entry point is given the
    # [APPLICATION 0] wrapper (leading 0x60, then the OID prefix); the
    # assertions below confirm the inner negTokenInit is what comes back.
    wrapped = spnego.NegotiationToken.from_der(GSSAPI_SPNEGO_DER)
    assert wrapped.variant == "NegTokenInit"

    inner = wrapped.neg_token_init
    assert inner is not None
    assert "1.3.6.1.5.5.2" in inner.mech_types
    assert inner.mech_token == b"\x01\x02\x03"

    total_len = len(GSSAPI_SPNEGO_DER)
    first_byte = GSSAPI_SPNEGO_DER[0]
    print(f" GSSAPI token length: {total_len} bytes")
    print(f" leading byte: 0x{first_byte:02x} (APPLICATION 0)")
    print(f" variant: {wrapped.variant}")
    print(f" mech_types: {inner.mech_types}")
    print(f" mech_token: {inner.mech_token.hex()}")

    # Decoding the bare CHOICE form (0xa0 leading byte) of the same token
    # must agree with the wrapped parse.
    bare = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_INIT_DER)
    assert bare.variant == wrapped.variant
    assert bare.neg_token_init.mech_types == inner.mech_types
    print(" plain-CHOICE parse matches GSSAPI-wrapped parse ✓")
def demo_neg_token_resp_with_response_token():
    """Decode the request-mic NegTokenResp vector that carries a responseToken."""
    section("NegTokenResp — with responseToken and request-mic state")

    resp = spnego.NegTokenResp.from_der(NEG_TOKEN_RESP_WITH_TOKEN_DER)

    assert resp.neg_state == spnego.NEG_STATE_REQUEST_MIC
    assert resp.supported_mech == "1.2.840.113554.1.2.2"
    assert resp.response_token == b"\xde\xad\xbe\xef"
    # mechListMIC is not part of this vector.
    assert resp.mech_list_mic is None

    print(f" neg_state: {resp.neg_state} (request-mic)")
    print(f" supported_mech: {resp.supported_mech}")
    print(f" response_token: {resp.response_token.hex()}")
    print(f" mech_list_mic: {resp.mech_list_mic}")
    print(f" repr: {resp!r}")
def demo_neg_state_constants():
    """Print and verify the NEG_STATE_* constants and SPNEGO_OID, then use one."""
    section("NEG_STATE_* constants — RFC 4178 §4.2.2")

    # Constant name → value this example expects it to have.
    expected = (
        ("NEG_STATE_ACCEPT_COMPLETED", 0),
        ("NEG_STATE_ACCEPT_INCOMPLETE", 1),
        ("NEG_STATE_REJECT", 2),
        ("NEG_STATE_REQUEST_MIC", 3),
        ("SPNEGO_OID", "1.3.6.1.5.5.2"),
    )

    # Show every value first, then check them, so the output is complete
    # even when a check fails.
    for name, _ in expected:
        print(f" {name} = {getattr(spnego, name)}")
    for name, value in expected:
        assert getattr(spnego, name) == value
    print(" all constant values verified ✓")

    # Typical usage pattern: compare a decoded neg_state against a constant.
    resp = spnego.NegTokenResp.from_der(NEG_TOKEN_RESP_DER)
    if resp.neg_state != spnego.NEG_STATE_ACCEPT_COMPLETED:
        raise AssertionError("unexpected neg_state")
    print(" negotiation accepted — state matches NEG_STATE_ACCEPT_COMPLETED ✓")
def demo_repr():
    """Print the repr() of each token type decoded from the test vectors."""
    section("repr() — readable representations")

    # Decode all four vectors before printing anything.
    decoded = (
        ("NegTokenInit", spnego.NegTokenInit.from_der(NEG_TOKEN_INIT_DER)),
        ("NegTokenResp", spnego.NegTokenResp.from_der(NEG_TOKEN_RESP_DER)),
        ("NegotiationToken[0]", spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_INIT_DER)),
        ("NegotiationToken[1]", spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_RESP_DER)),
    )
    for label, token in decoded:
        print(f" {label}: {token!r}")
def main():
    """Print the example banner and run every demo in order."""
    banner = "=" * 60
    print(banner)
    print("Example 21: SPNEGO negotiation tokens (RFC 4178)")
    print(banner)

    demos = (
        demo_neg_token_init,
        demo_neg_token_init_empty,
        demo_negotiation_token_init_arm,
        demo_negotiation_token_resp_arm,
        demo_gssapi_wrapped,
        demo_neg_token_resp_with_response_token,
        demo_neg_state_constants,
        demo_repr,
    )
    for run_demo in demos:
        run_demo()

    print("\nAll SPNEGO examples completed.")


if __name__ == "__main__":
    main()
```