socks5-impl 0.9.1

Fundamental abstractions and async read/write functions for the SOCKS5 protocol, and a relatively low-level asynchronous SOCKS5 server implementation based on tokio
Documentation
#!/usr/bin/env python3
import os
import signal
import socket
import struct
import subprocess
import sys
import time

# SOCKS5 wire-format byte values used by this script (see RFC 1928).
SOCKS5_VERSION = 0x05  # version byte sent in the greeting and the request
CMD_BIND = 0x02        # command byte of the request sent by main()
ATYP_IPV4 = 0x01       # address-type tag; the only one pack/unpack_address handle
REP_SUCCEEDED = 0x00   # reply code that main() requires in both replies

# (host, port) of the proxy: start_proxy() passes the port to the server's
# listen URL, and wait_for_port()/main() connect to this address.
PROXY_ADDR = ('127.0.0.1', 1081)
# (host, port) encoded into the BIND request by main().
TARGET_ADDR = ('127.0.0.1', 0)


def recv_exact(sock, size):
    """Read exactly *size* bytes from *sock* and return them as ``bytes``.

    ``sock.recv`` may hand back fewer bytes than requested, so it is called
    repeatedly until the full amount has arrived.

    Raises:
        EOFError: the peer closed the connection before *size* bytes arrived.
    """
    pieces = []
    remaining = size
    while remaining > 0:
        piece = sock.recv(remaining)
        if not piece:
            # An empty read means the other side has closed the stream.
            raise EOFError('connection closed')
        pieces.append(piece)
        remaining -= len(piece)
    return b''.join(pieces)


def pack_address(addr):
    """Encode an IPv4 ``(host, port)`` pair as a SOCKS5 address field.

    The result is 7 bytes: the ``ATYP_IPV4`` tag, the 4-byte packed address
    produced by ``socket.inet_aton``, and the port as a big-endian 16-bit
    unsigned integer.
    """
    host, port = addr
    # '!' selects network byte order with no padding between the fields.
    return struct.pack('!B4sH', ATYP_IPV4, socket.inet_aton(host), port)


def unpack_address(data):
    """Decode a SOCKS5 IPv4 address field into an ``(ip, port)`` tuple.

    Args:
        data: bytes starting with the address-type tag, followed by a 4-byte
            IPv4 address and a big-endian 16-bit port. Bytes beyond the
            first seven are ignored.

    Returns:
        ``(ip, port)`` where *ip* is a dotted-quad string and *port* an int.

    Raises:
        ValueError: *data* is empty, has an address type other than
            ``ATYP_IPV4``, or is shorter than the 7 bytes an IPv4 address
            field needs.
    """
    if not data:
        raise ValueError('empty address field')
    if data[0] != ATYP_IPV4:
        raise ValueError('only ipv4 supported')
    # Validate the length up front so that truncated input fails with one
    # predictable exception type instead of OSError or struct.error
    # depending on where the buffer happens to end.
    if len(data) < 7:
        raise ValueError(
            f'truncated ipv4 address field: need 7 bytes, got {len(data)}'
        )
    ip = socket.inet_ntoa(data[1:5])
    port = struct.unpack('!H', data[5:7])[0]
    return ip, port


def wait_for_port(addr, timeout=10.0):
    """Poll until a TCP connection to *addr* succeeds or *timeout* elapses.

    Args:
        addr: ``(host, port)`` tuple to connect to.
        timeout: overall time budget in seconds.

    Returns:
        True as soon as one connection attempt succeeds, False if the
        deadline passes without a successful connection.
    """
    # Use the monotonic clock: wall-clock time can jump (NTP, manual changes)
    # and would make the deadline fire early or late.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # Each attempt is capped at one second; the context manager
            # closes the probe socket again immediately.
            with socket.create_connection(addr, timeout=1.0):
                return True
        except OSError:
            # Not listening yet (or the attempt timed out); back off briefly.
            time.sleep(0.2)
    return False


def start_proxy(repo_root):
    """Launch the ``s5-server`` example through ``cargo run``.

    The server is told to listen on ``PROXY_ADDR`` and is started in its own
    session so that the whole process group (cargo plus whatever it spawns)
    can be signalled together.

    Args:
        repo_root: directory in which ``cargo`` is invoked.

    Returns:
        The ``subprocess.Popen`` handle of the ``cargo run`` process, with
        text-mode pipes attached to its stdout and stderr.
    """
    host, port = PROXY_ADDR
    args = [
        'cargo', 'run', '--features', 'server', '--example', 's5-server', '--',
        '--listen-parameters', f'socks5://{host}:{port}',
    ]
    # NOTE(review): stdout/stderr are captured into pipes that nothing
    # visible in this file reads. If the child ever produces more output
    # than the OS pipe buffer holds it will block; drain the pipes or
    # redirect them if that becomes a problem.
    return subprocess.Popen(
        args,
        cwd=repo_root,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        # Calls setsid() in the child, replacing preexec_fn=os.setsid, which
        # the subprocess documentation flags as unsafe with threads.
        start_new_session=True,
        env=os.environ.copy(),
    )


def stop_proxy(proc):
    """Terminate the process group of *proc* if it is still running.

    Sends SIGTERM to the group and waits up to five seconds; if the process
    has not exited by then, sends SIGKILL and waits without a time limit.
    Does nothing when *proc* has already exited.
    """
    if proc.poll() is not None:
        return

    def signal_group(signum):
        # Look the group id up on every call, right before signalling.
        os.killpg(os.getpgid(proc.pid), signum)

    signal_group(signal.SIGTERM)
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        signal_group(signal.SIGKILL)
        proc.wait()


def _read_reply(sock, stage):
    """Read one 10-byte SOCKS5 reply from *sock* and validate its header.

    Ten bytes is the size of a reply carrying an IPv4 bound address, which is
    the only form this script handles. *stage* is a short label used in error
    messages.

    Returns:
        The raw 10-byte reply.

    Raises:
        RuntimeError: the version byte is not ``SOCKS5_VERSION`` or the reply
            code is not ``REP_SUCCEEDED``.
        EOFError: the connection closed before the full reply arrived.
    """
    reply = recv_exact(sock, 10)
    if reply[0] != SOCKS5_VERSION:
        raise RuntimeError(f'{stage}: unexpected version byte {reply[0]:#04x}')
    if reply[1] != REP_SUCCEEDED:
        raise RuntimeError(f'{stage}: proxy replied with code {reply[1]:#04x}')
    return reply


def main():
    """Build and start the proxy example, then exercise a SOCKS5 BIND.

    Steps: build the ``s5-server`` example, start it, wait for it to listen,
    negotiate method 0x00, send a BIND request for ``TARGET_ADDR``, connect
    to the address returned in the first reply, wait for the second reply,
    then send a payload on the inbound connection and check that it arrives
    on the proxy control connection. The proxy is always stopped on exit.

    Raises:
        RuntimeError: the proxy did not start, or any protocol check failed.
            Explicit exceptions are used instead of ``assert`` so the checks
            still run under ``python -O``.
        subprocess.CalledProcessError: ``cargo build`` failed.
    """
    repo_root = os.path.dirname(os.path.abspath(__file__))

    print('Building proxy example...')
    subprocess.run(
        ['cargo', 'build', '--features', 'server', '--example', 's5-server'],
        cwd=repo_root,
        check=True,
    )

    proxy_proc = start_proxy(repo_root)
    try:
        print('Waiting for proxy to listen...')
        if not wait_for_port(PROXY_ADDR, timeout=15.0):
            raise RuntimeError('proxy did not start in time')

        print('Proxy started, beginning BIND handshake')
        # Context managers close both sockets even when a check fails.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as proxy:
            proxy.settimeout(10)
            proxy.connect(PROXY_ADDR)

            # Greeting: version, one method offered, method 0x00.
            proxy.sendall(struct.pack('!BBB', SOCKS5_VERSION, 1, 0))

            ver, method = recv_exact(proxy, 2)
            if ver != SOCKS5_VERSION or method != 0:
                raise RuntimeError(
                    f'method negotiation failed: ver={ver:#04x}, '
                    f'method={method:#04x}'
                )

            # Request: version, BIND command, reserved zero byte, address.
            request = struct.pack('!BBB', SOCKS5_VERSION, CMD_BIND, 0)
            proxy.sendall(request + pack_address(TARGET_ADDR))

            first_reply = _read_reply(proxy, 'first BIND reply')
            # Bytes 3..9 hold the address-type tag, IPv4 address and port.
            bind_ip, bind_port = unpack_address(first_reply[3:10])
            print('Proxy bind address:', bind_ip, bind_port)

            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
                client.settimeout(10)
                client.connect((bind_ip, bind_port))

                _read_reply(proxy, 'second BIND reply')
                print('Received second reply from proxy')

                hello = b'hello bind'
                client.sendall(hello)

                # A single recv() may return only part of the payload, so
                # read until the expected number of bytes has arrived.
                received = recv_exact(proxy, len(hello))
                if received != hello:
                    raise RuntimeError(f'expected {hello!r}, got {received!r}')
                print('Received on proxy stream:', received)

        print('BIND smoke test passed')
    finally:
        stop_proxy(proxy_proc)


# Run the smoke test only when this file is executed as a script.
if __name__ == '__main__':
    main()