from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import struct
import os, sys
import binascii
import subprocess
try:
import curve25519
curve25519mod = curve25519.keys
except ImportError:
curve25519 = None
import slownacl_curve25519
curve25519mod = slownacl_curve25519
import hashlib
try:
import sha3
except ImportError:
sha3 = None
try:
from hashlib import sha3_256, shake_256
shake_squeeze = shake_256.digest
except ImportError:
if hasattr(sha3, "SHA3256"):
sha3_256 = sha3.SHA3256
shake_256 = sha3.SHAKE256
shake_squeeze = shake_256.squeeze
else:
sys.exit(77)
from ntor_ref import hash_nil
from ntor_ref import PrivateKey
# Protocol identifier; appended to every secret/auth input built below and
# used as the prefix of all tweak strings.
PROTOID = b"tor-hs-ntor-curve25519-sha3-256-1"
# Tweak used both in the SHAKE-256 KDF input and as the mac() message that
# yields ntor_key_seed.
T_HSENC = PROTOID + b":hs_key_extract"
# mac() message that yields the "verify" value.
T_HSVERIFY = PROTOID + b":hs_verify"
# mac() message that yields auth_input_mac.
T_HSMAC = PROTOID + b":hs_mac"
# Prefix of the KDF "info" string (followed by the subcredential).
M_HSEXPAND = PROTOID + b":hs_key_expand"

# Expected byte lengths, enforced with asserts in the functions below.
# With 32-byte keys/DH outputs and the 33-byte PROTOID:
#   4 * 32 + 33 = 161
INTRO_SECRET_LEN = 161
#   6 * 32 + 33 = 225
REND_SECRET_LEN = 225
#   5 * 32 + 33 + len(b"Server") = 199
AUTH_INPUT_LEN = 199
def mac(k, m):
    """Return the SHA3-256 digest of ``len(k) || k || m``.

    The key length is encoded with ``struct.pack('!q', ...)``, i.e. as an
    8-byte network-order (big-endian) integer, and is hashed before the
    key bytes and the message bytes.
    """
    length_prefix = struct.pack('!q', len(k))
    hasher = sha3_256()
    for piece in (length_prefix, k, m):
        hasher.update(piece)
    return hasher.digest()
def intro2_ntor_client(intro_auth_pubkey_str, intro_enc_pubkey,
                       client_ephemeral_enc_pubkey, client_ephemeral_enc_privkey, subcredential):
    """Derive the client-side introduction keys.

    Builds the secret input from the DH result between the client's
    ephemeral private key and the intro encryption public key, the intro
    auth key bytes, both serialized public keys and PROTOID, then feeds
    ``secret || T_HSENC || M_HSEXPAND || subcredential`` to SHAKE-256.

    Returns ``(enc_key, mac_key)``: bytes 0..31 and 32..63 of the KDF output.
    """
    shared = client_ephemeral_enc_privkey.get_shared_key(intro_enc_pubkey, hash_nil)
    secret_input = (
        shared
        + intro_auth_pubkey_str
        + client_ephemeral_enc_pubkey.serialize()
        + intro_enc_pubkey.serialize()
        + PROTOID
    )
    assert len(secret_input) == INTRO_SECRET_LEN

    expand_info = M_HSEXPAND + subcredential

    xof = shake_256()
    xof.update(secret_input + T_HSENC + expand_info)
    # Only the first 64 bytes of the squeezed output are used.
    stream = shake_squeeze(xof, 64*8)

    return stream[0:32], stream[32:64]
def client_part1(intro_auth_pubkey_str, intro_enc_pubkey,
                 client_ephemeral_enc_pubkey, client_ephemeral_enc_privkey, subcredential):
    """Client step 1: compute the introduction keys.

    Thin wrapper around intro2_ntor_client() that also asserts both keys
    are non-empty. Returns ``(enc_key, mac_key)``.
    """
    derived = intro2_ntor_client(
        intro_auth_pubkey_str,
        intro_enc_pubkey,
        client_ephemeral_enc_pubkey,
        client_ephemeral_enc_privkey,
        subcredential,
    )
    enc_key, mac_key = derived
    assert enc_key
    assert mac_key
    return enc_key, mac_key
def intro2_ntor_service(intro_auth_pubkey_str, client_enc_pubkey, service_enc_privkey, service_enc_pubkey, subcredential):
    """Derive the service-side introduction keys.

    Mirrors intro2_ntor_client(): the DH result is computed from the
    service's private key and the client's public key, and the secret
    input is laid out as DH result, intro auth key bytes, client public
    key, service public key, PROTOID.

    Returns ``(enc_key, mac_key)``: bytes 0..31 and 32..63 of the KDF output.
    """
    shared = service_enc_privkey.get_shared_key(client_enc_pubkey, hash_nil)
    secret_input = (
        shared
        + intro_auth_pubkey_str
        + client_enc_pubkey.serialize()
        + service_enc_pubkey.serialize()
        + PROTOID
    )
    assert len(secret_input) == INTRO_SECRET_LEN

    expand_info = M_HSEXPAND + subcredential

    xof = shake_256()
    xof.update(secret_input + T_HSENC + expand_info)
    # Only the first 64 bytes of the squeezed output are used.
    stream = shake_squeeze(xof, 64*8)

    return stream[0:32], stream[32:64]
def service_part1(intro_auth_pubkey_str, client_enc_pubkey, intro_enc_privkey, intro_enc_pubkey, subcredential):
    """Service side of the handshake.

    First derives the introduction keys via intro2_ntor_service(), then
    generates a fresh ephemeral key pair and computes the rendezvous
    key seed and the auth MAC from two DH results.

    Returns the 5-tuple ``(intro_enc_key, intro_mac_key, ntor_key_seed,
    auth_input_mac, service_ephemeral_pubkey)``.
    """
    intro_keys = intro2_ntor_service(
        intro_auth_pubkey_str,
        client_enc_pubkey,
        intro_enc_privkey,
        intro_enc_pubkey,
        subcredential,
    )
    intro_enc_key, intro_mac_key = intro_keys
    assert intro_enc_key
    assert intro_mac_key

    # Fresh ephemeral key pair for the rendezvous part of the handshake.
    eph_priv = PrivateKey()
    eph_pub = eph_priv.get_public()

    shared_eph = eph_priv.get_shared_key(client_enc_pubkey, hash_nil)
    shared_intro = intro_enc_privkey.get_shared_key(client_enc_pubkey, hash_nil)

    rend_secret_hs_input = (
        shared_eph
        + shared_intro
        + intro_auth_pubkey_str
        + intro_enc_pubkey.serialize()
        + client_enc_pubkey.serialize()
        + eph_pub.serialize()
        + PROTOID
    )
    assert len(rend_secret_hs_input) == REND_SECRET_LEN

    ntor_key_seed = mac(rend_secret_hs_input, T_HSENC)
    verify_tag = mac(rend_secret_hs_input, T_HSVERIFY)

    # Note the key order here differs from rend_secret_hs_input: the
    # service ephemeral key comes before the client key.
    auth_input = (
        verify_tag
        + intro_auth_pubkey_str
        + intro_enc_pubkey.serialize()
        + eph_pub.serialize()
        + client_enc_pubkey.serialize()
        + PROTOID
        + b"Server"
    )
    assert len(auth_input) == AUTH_INPUT_LEN
    auth_input_mac = mac(auth_input, T_HSMAC)

    assert ntor_key_seed
    assert auth_input_mac
    assert eph_pub

    return intro_enc_key, intro_mac_key, ntor_key_seed, auth_input_mac, eph_pub
def client_part2(intro_auth_pubkey_str, client_ephemeral_enc_pubkey, client_ephemeral_enc_privkey,
                 intro_enc_pubkey, service_ephemeral_rend_pubkey):
    """Client step 2: compute the rendezvous key seed and auth MAC.

    Uses the client's ephemeral private key for two DH computations (one
    against the service's ephemeral rendezvous key, one against the intro
    encryption key) and hashes the results with the same layout as
    service_part1().

    Returns ``(ntor_key_seed, auth_input_mac)``.
    """
    shared_eph = client_ephemeral_enc_privkey.get_shared_key(service_ephemeral_rend_pubkey, hash_nil)
    shared_intro = client_ephemeral_enc_privkey.get_shared_key(intro_enc_pubkey, hash_nil)

    rend_secret_hs_input = (
        shared_eph
        + shared_intro
        + intro_auth_pubkey_str
        + intro_enc_pubkey.serialize()
        + client_ephemeral_enc_pubkey.serialize()
        + service_ephemeral_rend_pubkey.serialize()
        + PROTOID
    )
    assert len(rend_secret_hs_input) == REND_SECRET_LEN

    ntor_key_seed = mac(rend_secret_hs_input, T_HSENC)
    verify_tag = mac(rend_secret_hs_input, T_HSVERIFY)

    # In auth_input the service ephemeral key precedes the client key.
    auth_input = (
        verify_tag
        + intro_auth_pubkey_str
        + intro_enc_pubkey.serialize()
        + service_ephemeral_rend_pubkey.serialize()
        + client_ephemeral_enc_pubkey.serialize()
        + PROTOID
        + b"Server"
    )
    assert len(auth_input) == AUTH_INPUT_LEN
    auth_input_mac = mac(auth_input, T_HSMAC)

    assert ntor_key_seed
    assert auth_input_mac

    return ntor_key_seed, auth_input_mac
# Helper binary that the tor_* wrappers below invoke; the path is relative
# to the current working directory.
PROG = "./src/test/test-hs-ntor-cl"


def dehex(s):
    """Strip surrounding whitespace from *s* and decode it from hex."""
    return binascii.a2b_hex(s.strip())


if sys.version_info[0] >= 3:
    def enhex(s):
        """Hex-encode *s* and return the result as text (ASCII-decoded)."""
        return binascii.b2a_hex(s).decode("ascii")
else:
    def enhex(s):
        """Hex-encode *s*, returning whatever binascii.b2a_hex produces."""
        return binascii.b2a_hex(s)
def tor_client1(intro_auth_pubkey_str, intro_enc_pubkey,
                client_ephemeral_enc_privkey, subcredential):
    """Run the ``client1`` step of the helper binary PROG.

    The intro auth key bytes, the serialized intro encryption public key,
    the serialized client ephemeral private key and the subcredential are
    passed hex-encoded as command-line arguments.

    Returns a list holding each line of the helper's stdout, hex-decoded.
    """
    argv = [PROG, "client1",
            enhex(intro_auth_pubkey_str),
            enhex(intro_enc_pubkey.serialize()),
            enhex(client_ephemeral_enc_privkey.serialize()),
            enhex(subcredential)]
    p = subprocess.Popen(argv, stdout=subprocess.PIPE)
    try:
        lines = p.stdout.readlines()
    finally:
        # Close our end of the pipe and reap the child so that neither a
        # file descriptor nor a zombie process is left behind.
        p.stdout.close()
        p.wait()
    # Decode eagerly: a lazy map() would defer decode errors to the caller.
    return [dehex(line) for line in lines]
def tor_server1(intro_auth_pubkey_str, intro_enc_privkey,
                client_ephemeral_enc_pubkey, subcredential):
    """Run the ``server1`` step of the helper binary PROG.

    The intro auth key bytes, the serialized intro encryption private key,
    the serialized client ephemeral public key and the subcredential are
    passed hex-encoded as command-line arguments.

    Returns a list holding each line of the helper's stdout, hex-decoded.
    """
    argv = [PROG, "server1",
            enhex(intro_auth_pubkey_str),
            enhex(intro_enc_privkey.serialize()),
            enhex(client_ephemeral_enc_pubkey.serialize()),
            enhex(subcredential)]
    p = subprocess.Popen(argv, stdout=subprocess.PIPE)
    try:
        lines = p.stdout.readlines()
    finally:
        # Close our end of the pipe and reap the child so that neither a
        # file descriptor nor a zombie process is left behind.
        p.stdout.close()
        p.wait()
    # Decode eagerly: a lazy map() would defer decode errors to the caller.
    return [dehex(line) for line in lines]
def tor_client2(intro_auth_pubkey_str, client_ephemeral_enc_privkey,
                intro_enc_pubkey, service_ephemeral_rend_pubkey, subcredential):
    """Run the ``client2`` step of the helper binary PROG.

    The intro auth key bytes, the serialized client ephemeral private key,
    the serialized intro encryption public key, the serialized service
    ephemeral rendezvous public key and the subcredential are passed
    hex-encoded as command-line arguments.

    Returns a list holding each line of the helper's stdout, hex-decoded.
    """
    argv = [PROG, "client2",
            enhex(intro_auth_pubkey_str),
            enhex(client_ephemeral_enc_privkey.serialize()),
            enhex(intro_enc_pubkey.serialize()),
            enhex(service_ephemeral_rend_pubkey.serialize()),
            enhex(subcredential)]
    p = subprocess.Popen(argv, stdout=subprocess.PIPE)
    try:
        lines = p.stdout.readlines()
    finally:
        # Close our end of the pipe and reap the child so that neither a
        # file descriptor nor a zombie process is left behind.
        p.stdout.close()
        p.wait()
    # Decode eagerly: a lazy map() would defer decode errors to the caller.
    return [dehex(line) for line in lines]
def do_pure_python_ntor_test():
    """Run the whole handshake with the Python implementation on both sides.

    Checks that client and service derive the same introduction keys, the
    same key seed and the same auth MAC, then prints a completion line.
    """
    # Fresh key material for this run.
    client_priv = PrivateKey()
    client_pub = client_priv.get_public()
    intro_priv = PrivateKey()
    intro_pub = intro_priv.get_public()
    intro_auth_pubkey_str = os.urandom(32)
    subcredential = os.urandom(32)

    client_enc_key, client_mac_key = client_part1(
        intro_auth_pubkey_str, intro_pub, client_pub, client_priv, subcredential)

    service_result = service_part1(
        intro_auth_pubkey_str, client_pub, intro_priv, intro_pub, subcredential)
    (service_enc_key, service_mac_key, service_ntor_key_seed,
     service_auth_input_mac, service_ephemeral_pubkey) = service_result

    assert client_enc_key == service_enc_key
    assert client_mac_key == service_mac_key

    client_ntor_key_seed, client_auth_input_mac = client_part2(
        intro_auth_pubkey_str, client_pub, client_priv,
        intro_pub, service_ephemeral_pubkey)

    assert client_ntor_key_seed == service_ntor_key_seed
    assert client_auth_input_mac == service_auth_input_mac

    print("DONE: python dance [%s]" % repr(client_auth_input_mac))
def do_little_t_tor_ntor_test():
    """Run the whole handshake with the helper binary on both sides.

    Every step (client1, server1, client2) is delegated to PROG through
    the tor_* wrappers. Checks that client and service agree on the
    introduction keys, the key seed and the auth MAC, then prints a
    completion line.
    """
    subcredential = os.urandom(32)

    client_ephemeral_enc_privkey = PrivateKey()
    client_ephemeral_enc_pubkey = client_ephemeral_enc_privkey.get_public()
    intro_enc_privkey = PrivateKey()
    intro_enc_pubkey = intro_enc_privkey.get_public()
    intro_auth_pubkey_str = os.urandom(32)

    client_enc_key, client_mac_key = tor_client1(intro_auth_pubkey_str, intro_enc_pubkey,
                                                 client_ephemeral_enc_privkey, subcredential)
    assert(client_enc_key)
    assert(client_mac_key)

    # The helper emits the auth MAC before the key seed.
    service_enc_key, service_mac_key, service_ntor_auth_mac, service_ntor_key_seed, service_eph_pubkey = tor_server1(intro_auth_pubkey_str,
                                                                                                                      intro_enc_privkey,
                                                                                                                      client_ephemeral_enc_pubkey,
                                                                                                                      subcredential)
    assert(service_enc_key)
    assert(service_mac_key)
    assert(service_ntor_auth_mac)
    assert(service_ntor_key_seed)

    assert(client_enc_key == service_enc_key)
    assert(client_mac_key == service_mac_key)

    # Wrap the raw key bytes so the key can be serialized again for client2.
    service_eph_pubkey = curve25519mod.Public(service_eph_pubkey)

    client_ntor_auth_mac, client_ntor_key_seed = tor_client2(intro_auth_pubkey_str, client_ephemeral_enc_privkey,
                                                             intro_enc_pubkey, service_eph_pubkey, subcredential)
    assert(client_ntor_auth_mac)
    assert(client_ntor_key_seed)

    assert(client_ntor_key_seed == service_ntor_key_seed)
    assert(client_ntor_auth_mac == service_ntor_auth_mac)

    print("DONE: tor dance [%s]" % repr(client_ntor_auth_mac))
def do_first_mixed_test():
    """Mixed handshake: Python client against the helper binary's service.

    client_part1()/client_part2() run in Python while the service step is
    delegated to PROG via tor_server1(). Checks that both sides agree on
    the introduction keys, the key seed and the auth MAC, then prints a
    completion line.
    """
    subcredential = os.urandom(32)

    client_priv = PrivateKey()
    client_pub = client_priv.get_public()
    intro_priv = PrivateKey()
    intro_pub = intro_priv.get_public()
    intro_auth_pubkey_str = os.urandom(32)

    client_enc_key, client_mac_key = client_part1(
        intro_auth_pubkey_str, intro_pub, client_pub, client_priv, subcredential)

    # The helper emits the auth MAC before the key seed.
    service_output = tor_server1(
        intro_auth_pubkey_str, intro_priv, client_pub, subcredential)
    (service_enc_key, service_mac_key, service_ntor_auth_mac,
     service_ntor_key_seed, service_eph_pubkey) = service_output
    assert service_enc_key
    assert service_mac_key
    assert service_ntor_auth_mac
    assert service_ntor_key_seed
    assert service_eph_pubkey

    assert client_enc_key == service_enc_key
    assert client_mac_key == service_mac_key

    # Wrap the raw key bytes in a public-key object for client_part2().
    service_eph_pubkey = curve25519mod.Public(service_eph_pubkey)

    client_ntor_key_seed, client_auth_input_mac = client_part2(
        intro_auth_pubkey_str, client_pub, client_priv,
        intro_pub, service_eph_pubkey)

    assert client_auth_input_mac == service_ntor_auth_mac
    assert client_ntor_key_seed == service_ntor_key_seed

    print("DONE: 1st mixed dance [%s]" % repr(client_auth_input_mac))
def do_second_mixed_test():
    """Mixed handshake: helper binary's client against the Python service.

    The client steps are delegated to PROG via tor_client1()/tor_client2()
    while service_part1() runs in Python. Checks that both sides agree on
    the introduction keys, the key seed and the auth MAC, then prints a
    completion line.
    """
    subcredential = os.urandom(32)

    client_ephemeral_enc_privkey = PrivateKey()
    client_ephemeral_enc_pubkey = client_ephemeral_enc_privkey.get_public()
    intro_enc_privkey = PrivateKey()
    intro_enc_pubkey = intro_enc_privkey.get_public()
    intro_auth_pubkey_str = os.urandom(32)

    client_enc_key, client_mac_key = tor_client1(intro_auth_pubkey_str, intro_enc_pubkey,
                                                 client_ephemeral_enc_privkey, subcredential)
    assert(client_enc_key)
    assert(client_mac_key)

    service_enc_key, service_mac_key, service_ntor_key_seed, service_ntor_auth_mac, service_ephemeral_pubkey = service_part1(intro_auth_pubkey_str, client_ephemeral_enc_pubkey, intro_enc_privkey, intro_enc_pubkey, subcredential)

    # As in the other three tests, both sides must derive identical
    # introduction keys.
    assert(client_enc_key == service_enc_key)
    assert(client_mac_key == service_mac_key)

    client_ntor_auth_mac, client_ntor_key_seed = tor_client2(intro_auth_pubkey_str, client_ephemeral_enc_privkey,
                                                             intro_enc_pubkey, service_ephemeral_pubkey, subcredential)
    assert(client_ntor_auth_mac)
    assert(client_ntor_key_seed)

    assert(client_ntor_key_seed == service_ntor_key_seed)
    assert(client_ntor_auth_mac == service_ntor_auth_mac)

    print("DONE: 2nd mixed dance [%s]" % repr(client_ntor_auth_mac))
def do_mixed_tests():
    """Run both mixed Python/helper-binary handshake tests, in order."""
    for mixed_test in (do_first_mixed_test, do_second_mixed_test):
        mixed_test()
if __name__ == '__main__':
    # Run every handshake variant in order; the first failing assert
    # aborts the run.
    for run_test in (do_pure_python_ntor_test,
                     do_little_t_tor_ntor_test,
                     do_mixed_tests):
        run_test()