import synta
import synta.spnego as spnego
def section(title):
    """Print *title* framed above and below by a 60-character rule.

    A leading newline is emitted first so that consecutive sections are
    separated by a blank line in the output.
    """
    rule = "─" * 60
    print(f"\n{rule}\n{title}\n{rule}")
def _enc(fn):
    """Run *fn* against a fresh DER encoder and return the finished output.

    Args:
        fn: Callable invoked exactly once with the newly created
            ``synta.Encoder``; its return value is ignored.

    Returns:
        Whatever ``Encoder.finish()`` yields (presumably the DER bytes —
        callers in this file concatenate it with ``bytes``).
    """
    der_encoder = synta.Encoder(synta.Encoding.DER)
    fn(der_encoder)
    encoded = der_encoder.finish()
    return encoded
def _oid(dotted):
    """Return the encoding of the OID given as the dotted string *dotted*.

    The string is wrapped in ``synta.ObjectIdentifier`` and written with
    ``encode_oid`` on the encoder supplied by ``_enc``.
    """

    def write_oid(encoder):
        encoder.encode_oid(synta.ObjectIdentifier(dotted))

    return _enc(write_oid)
def _octet(data):
    """Return the encoding of *data* as an OCTET STRING.

    *data* is written with ``encode_octet_string`` on the encoder supplied
    by ``_enc``.
    """

    def write_octets(encoder):
        encoder.encode_octet_string(data)

    return _enc(write_octets)
def _encode_length(n):
    """Return the DER definite-form length octets for a content length *n*.

    Lengths below 0x80 use the short form (a single octet).  Larger lengths
    use the long form: a lead octet ``0x80 | k`` followed by the ``k``
    big-endian octets of *n*, where ``k`` is the smallest count that can
    hold *n* (DER requires the minimal encoding).

    Args:
        n: Non-negative number of content octets.

    Returns:
        The length octets as ``bytes``.

    Raises:
        ValueError: If *n* is negative, or so large that it would need more
            than 126 length octets (a lead octet of 0xFF is reserved).
    """
    if n < 0:
        raise ValueError(f"length {n} must be non-negative")
    if n < 0x80:
        # Short form: the length itself is the single octet.
        return bytes([n])
    # Long form: minimal big-endian representation of n.
    octets = n.to_bytes((n.bit_length() + 7) // 8, "big")
    if len(octets) > 126:
        raise ValueError(f"length {n} too large for this helper")
    return bytes([0x80 | len(octets)]) + octets
def _tlv(tag_byte, body):
    """Assemble a tag-length-value triple.

    The result is the single octet *tag_byte*, then the length octets
    produced by ``_encode_length(len(body))``, then *body* itself.
    """
    header = bytes([tag_byte]) + _encode_length(len(body))
    return header + body
def _implicit_seqof(tag_n, *elem_ders):
    """Wrap already-encoded elements under the tag octet ``0xa0 | tag_n``.

    The encodings in *elem_ders* are concatenated in the order given and
    handed to ``_tlv`` as the body.
    """
    tag_octet = 0xa0 | tag_n
    content = b"".join(elem_ders)
    return _tlv(tag_octet, content)
def _implicit_primitive(tag_n, universal_der):
    """Re-tag an encoded value by swapping its first octet.

    The first octet of *universal_der* is dropped and replaced with
    ``0x80 | tag_n``; everything after it (length and content octets) is
    kept unchanged.
    """
    replacement_tag = bytes([0x80 | tag_n])
    remainder = universal_der[1:]
    return replacement_tag + remainder
# Encoded mechanism OIDs reused by the fixtures below.
_SPNEGO_OID_DER = _oid("1.3.6.1.5.5.2")
_KRB5_OID_DER = _oid("1.2.840.113554.1.2.2")

# NegTokenInit fixture: [0] mech types (SPNEGO, KRB5) followed by a
# [2] mech token of three bytes.
_MECH_TYPES_FIELD = _implicit_seqof(0, _SPNEGO_OID_DER, _KRB5_OID_DER)
_MECH_TOKEN_FIELD = _implicit_primitive(2, _octet(b"\x01\x02\x03"))
_NEG_TOKEN_INIT_BODY = b"".join((_MECH_TYPES_FIELD, _MECH_TOKEN_FIELD))
# The same body under a SEQUENCE tag (0x30) and under the CHOICE arm tag 0xa0.
NEG_TOKEN_INIT_DER = _tlv(0x30, _NEG_TOKEN_INIT_BODY)
NEG_TOKEN_CHOICE_INIT_DER = _tlv(0xa0, _NEG_TOKEN_INIT_BODY)

# NegTokenResp fixture: [0] neg state with value 0 followed by the
# [1] supported mech (KRB5).
_NEG_STATE_FIELD = _implicit_primitive(0, b"\x0a\x01\x00")
_SUPPORTED_MECH_FIELD = _implicit_primitive(1, _KRB5_OID_DER)
_NEG_TOKEN_RESP_BODY = b"".join((_NEG_STATE_FIELD, _SUPPORTED_MECH_FIELD))
# The same body under a SEQUENCE tag (0x30) and under the CHOICE arm tag 0xa1.
NEG_TOKEN_RESP_DER = _tlv(0x30, _NEG_TOKEN_RESP_BODY)
NEG_TOKEN_CHOICE_RESP_DER = _tlv(0xa1, _NEG_TOKEN_RESP_BODY)
# NegTokenResp fixture whose neg state is 3 (request-mic) and which also
# carries a four-byte response token in the [2] field.
# NOTE: these two assignments must stay on separate lines; fused onto one
# physical line they are a SyntaxError.
_NEG_STATE_REQ_MIC = _implicit_primitive(0, bytes([0x0a, 0x01, 0x03]))
_RESPONSE_TOKEN_FIELD = _implicit_primitive(2, _octet(b"\xde\xad\xbe\xef"))
NEG_TOKEN_RESP_WITH_TOKEN_DER = _tlv(
    0x30,
    _NEG_STATE_REQ_MIC + _SUPPORTED_MECH_FIELD + _RESPONSE_TOKEN_FIELD,
)

# GSSAPI framing: the SPNEGO OID followed by the negTokenInit CHOICE arm,
# wrapped under the 0x60 tag.
GSSAPI_SPNEGO_DER = _tlv(0x60, _SPNEGO_OID_DER + NEG_TOKEN_CHOICE_INIT_DER)
def demo_neg_token_init():
    """Parse ``NEG_TOKEN_INIT_DER`` as a NegTokenInit and check its fields.

    Asserts the two mechanism OIDs and the three-byte mech token, confirms
    that ``req_flags`` and ``mech_list_mic`` are ``None``, and prints each
    value along the way.
    """
    section("NegTokenInit — RFC 4178 §4.2.1")
    parsed = spnego.NegTokenInit.from_der(NEG_TOKEN_INIT_DER)

    # Mechanism list: SPNEGO first, then KRB5, exactly as built in the fixture.
    mechs = parsed.mech_types
    assert isinstance(mechs, list)
    assert len(mechs) == 2
    assert mechs[0] == "1.3.6.1.5.5.2"
    assert mechs[1] == "1.2.840.113554.1.2.2"
    print(f" mech_types: {mechs}")

    token = parsed.mech_token
    assert token == b"\x01\x02\x03"
    print(f" mech_token ({len(token)} bytes): {token.hex()}")

    # Fields the fixture leaves out must decode as None.
    assert parsed.req_flags is None
    assert parsed.mech_list_mic is None
    print(f" req_flags: {parsed.req_flags}")
    print(f" mech_list_mic: {parsed.mech_list_mic}")
    print(f" repr: {parsed!r}")
def demo_neg_token_init_empty():
    """Parse an empty SEQUENCE as a NegTokenInit and check the defaults.

    Asserts that ``mech_types`` is an empty list and that ``req_flags``,
    ``mech_token`` and ``mech_list_mic`` are all ``None``.
    """
    section("NegTokenInit — all OPTIONAL fields absent")
    # 0x30 0x00: a SEQUENCE tag with zero-length content.
    parsed = spnego.NegTokenInit.from_der(b"\x30\x00")

    assert parsed.mech_types == []
    assert parsed.req_flags is None
    assert parsed.mech_token is None
    assert parsed.mech_list_mic is None

    print(f" mech_types: {parsed.mech_types} (empty list)")
    print(f" mech_token: {parsed.mech_token}")
    print(f" mech_list_mic: {parsed.mech_list_mic}")
    print(f" repr: {parsed!r}")
def demo_negotiation_token_init_arm():
    """Parse ``NEG_TOKEN_CHOICE_INIT_DER`` as a NegotiationToken.

    Asserts that the ``NegTokenInit`` variant was selected, that the other
    arm is ``None``, and that the inner token carries the expected first
    mechanism OID and mech token.
    """
    section("NegotiationToken CHOICE — negTokenInit arm (0xa0)")
    choice = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_INIT_DER)

    assert choice.variant == "NegTokenInit"
    assert choice.neg_token_resp is None

    inner = choice.neg_token_init
    assert inner is not None
    assert inner.mech_types[0] == "1.3.6.1.5.5.2"
    assert inner.mech_token == b"\x01\x02\x03"

    print(f" variant: {choice.variant}")
    print(f" neg_token_init.mech_types: {inner.mech_types}")
    print(f" neg_token_init.mech_token: {inner.mech_token.hex()}")
    print(f" repr: {choice!r}")
def demo_negotiation_token_resp_arm():
    """Parse ``NEG_TOKEN_CHOICE_RESP_DER`` as a NegotiationToken.

    Asserts that the ``NegTokenResp`` variant was selected, that the other
    arm is ``None``, and that the inner token reports accept-completed with
    the KRB5 mechanism and no response token or MIC.
    """
    section("NegotiationToken CHOICE — negTokenResp arm (0xa1)")
    choice = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_RESP_DER)

    assert choice.variant == "NegTokenResp"
    assert choice.neg_token_init is None

    inner = choice.neg_token_resp
    assert inner is not None
    assert inner.neg_state == spnego.NEG_STATE_ACCEPT_COMPLETED
    assert inner.supported_mech == "1.2.840.113554.1.2.2"
    assert inner.response_token is None
    assert inner.mech_list_mic is None

    print(f" variant: {choice.variant}")
    print(f" neg_token_resp.neg_state: {inner.neg_state} (accept-completed)")
    print(f" neg_token_resp.supported_mech: {inner.supported_mech}")
    print(f" repr: {choice!r}")
def demo_gssapi_wrapped():
    """Parse the GSSAPI-framed token and compare it with the bare CHOICE.

    ``GSSAPI_SPNEGO_DER`` is decoded with ``NegotiationToken.from_der`` and
    checked, then ``NEG_TOKEN_CHOICE_INIT_DER`` is decoded the same way and
    asserted to give the same variant and mechanism list.
    """
    section("Full GSSAPI-wrapped SPNEGO token (0x60 APPLICATION tag)")
    wrapped = spnego.NegotiationToken.from_der(GSSAPI_SPNEGO_DER)

    assert wrapped.variant == "NegTokenInit"
    inner = wrapped.neg_token_init
    assert inner is not None
    assert "1.3.6.1.5.5.2" in inner.mech_types
    assert inner.mech_token == b"\x01\x02\x03"

    first_octet = GSSAPI_SPNEGO_DER[0]
    print(f" GSSAPI token length: {len(GSSAPI_SPNEGO_DER)} bytes")
    print(f" leading byte: 0x{first_octet:02x} (APPLICATION 0)")
    print(f" variant: {wrapped.variant}")
    print(f" mech_types: {inner.mech_types}")
    print(f" mech_token: {inner.mech_token.hex()}")

    # The unwrapped CHOICE encoding must decode to the same content.
    plain = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_INIT_DER)
    assert plain.variant == wrapped.variant
    assert plain.neg_token_init.mech_types == inner.mech_types
    print(" plain-CHOICE parse matches GSSAPI-wrapped parse ✓")
def demo_neg_token_resp_with_response_token():
    """Parse ``NEG_TOKEN_RESP_WITH_TOKEN_DER`` as a NegTokenResp.

    Asserts the request-mic state, the KRB5 mechanism OID and the four-byte
    response token, confirms ``mech_list_mic`` is ``None``, then prints the
    decoded fields.
    """
    section("NegTokenResp — with responseToken and request-mic state")
    parsed = spnego.NegTokenResp.from_der(NEG_TOKEN_RESP_WITH_TOKEN_DER)

    assert parsed.neg_state == spnego.NEG_STATE_REQUEST_MIC
    assert parsed.supported_mech == "1.2.840.113554.1.2.2"
    assert parsed.response_token == b"\xde\xad\xbe\xef"
    assert parsed.mech_list_mic is None

    print(f" neg_state: {parsed.neg_state} (request-mic)")
    print(f" supported_mech: {parsed.supported_mech}")
    print(f" response_token: {parsed.response_token.hex()}")
    print(f" mech_list_mic: {parsed.mech_list_mic}")
    print(f" repr: {parsed!r}")
def demo_neg_state_constants():
    """Print and verify the ``NEG_STATE_*`` constants and ``SPNEGO_OID``.

    After checking the constant values, ``NEG_TOKEN_RESP_DER`` is parsed and
    its state is compared against ``NEG_STATE_ACCEPT_COMPLETED``.

    Raises:
        AssertionError: If a constant has an unexpected value or the parsed
            state is not accept-completed.
    """
    section("NEG_STATE_* constants — RFC 4178 §4.2.2")
    print(f" NEG_STATE_ACCEPT_COMPLETED = {spnego.NEG_STATE_ACCEPT_COMPLETED}")
    print(f" NEG_STATE_ACCEPT_INCOMPLETE = {spnego.NEG_STATE_ACCEPT_INCOMPLETE}")
    print(f" NEG_STATE_REJECT = {spnego.NEG_STATE_REJECT}")
    print(f" NEG_STATE_REQUEST_MIC = {spnego.NEG_STATE_REQUEST_MIC}")
    print(f" SPNEGO_OID = {spnego.SPNEGO_OID}")

    # Checked in the same order as they were printed above.
    expected_values = (
        ("NEG_STATE_ACCEPT_COMPLETED", 0),
        ("NEG_STATE_ACCEPT_INCOMPLETE", 1),
        ("NEG_STATE_REJECT", 2),
        ("NEG_STATE_REQUEST_MIC", 3),
        ("SPNEGO_OID", "1.3.6.1.5.5.2"),
    )
    for attr_name, expected in expected_values:
        assert getattr(spnego, attr_name) == expected
    print(" all constant values verified ✓")

    parsed = spnego.NegTokenResp.from_der(NEG_TOKEN_RESP_DER)
    if not (parsed.neg_state == spnego.NEG_STATE_ACCEPT_COMPLETED):
        raise AssertionError("unexpected neg_state")
    print(" negotiation accepted — state matches NEG_STATE_ACCEPT_COMPLETED ✓")
def demo_repr():
    """Print the ``repr()`` of one parsed token of each kind.

    All four fixtures are decoded first; the representations are printed
    afterwards.
    """
    section("repr() — readable representations")
    bare_init = spnego.NegTokenInit.from_der(NEG_TOKEN_INIT_DER)
    bare_resp = spnego.NegTokenResp.from_der(NEG_TOKEN_RESP_DER)
    choice_init = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_INIT_DER)
    choice_resp = spnego.NegotiationToken.from_der(NEG_TOKEN_CHOICE_RESP_DER)

    print(f" NegTokenInit: {bare_init!r}")
    print(f" NegTokenResp: {bare_resp!r}")
    print(f" NegotiationToken[0]: {choice_init!r}")
    print(f" NegotiationToken[1]: {choice_resp!r}")
def main():
    """Print the example banner, run every demo in order, then a closing line."""
    banner_rule = "=" * 60
    print(banner_rule)
    print("Example 21: SPNEGO negotiation tokens (RFC 4178)")
    print(banner_rule)

    demos = (
        demo_neg_token_init,
        demo_neg_token_init_empty,
        demo_negotiation_token_init_arm,
        demo_negotiation_token_resp_arm,
        demo_gssapi_wrapped,
        demo_neg_token_resp_with_response_token,
        demo_neg_state_constants,
        demo_repr,
    )
    for run_demo in demos:
        run_demo()

    print("\nAll SPNEGO examples completed.")
# Run the demos only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()