import asyncio
import json
import os
import re
import threading
# TCP port for the test WebSocket server; the AFH_TEST_WS_PORT environment
# variable overrides the default of 18081.
WS_PORT = int(os.getenv("AFH_TEST_WS_PORT", "18081"))
# ws:// base URL for the loopback address on WS_PORT.
WS_BASE = f"ws://127.0.0.1:{WS_PORT}"
async def handler(websocket):
    """Serve one WebSocket connection, choosing behaviour from the request path.

    Routes:
        /ws/push/<n>             send n JSON messages ``{"seq": i}`` back to back
        /ws/push/<n>/<delay_ms>  same, sleeping delay_ms milliseconds between
                                 consecutive messages
        /ws/echo                 send every received message straight back
        /ws/binary               send a single 16-byte frame (bytes 0..15)
        /ws/headers              send the request headers as one JSON object

    Any other path closes the connection with code 1008 and reason
    "unknown path".
    """
    route = websocket.request.path

    # Burst push: all messages at once, no pause between them.
    burst = re.match(r"^/ws/push/(\d+)$", route)
    if burst is not None:
        for seq in range(int(burst.group(1))):
            await websocket.send(json.dumps({"seq": seq}))
        return

    # Paced push: pause between messages, but not before the first one or
    # after the last one.
    paced = re.match(r"^/ws/push/(\d+)/(\d+)$", route)
    if paced is not None:
        count = int(paced.group(1))
        pause = int(paced.group(2)) / 1000.0
        for seq in range(count):
            if seq:
                await asyncio.sleep(pause)
            await websocket.send(json.dumps({"seq": seq}))
        return

    if route == "/ws/echo":
        async for incoming in websocket:
            await websocket.send(incoming)
    elif route == "/ws/binary":
        await websocket.send(bytes(range(16)))
    elif route == "/ws/headers":
        # Go through .items() so the result is built from (name, value) pairs.
        header_map = dict(websocket.request.headers.items())
        await websocket.send(json.dumps(header_map))
    else:
        await websocket.close(1008, "unknown path")
def start_ws_server(port: int = WS_PORT) -> threading.Thread:
    """Start the WebSocket test server on a background daemon thread.

    The server binds to 127.0.0.1 on ``port`` and runs its own asyncio event
    loop inside the thread. This function blocks until the server is
    accepting connections, or until startup fails.

    Args:
        port: TCP port to listen on. Defaults to ``WS_PORT``.

    Returns:
        The daemon thread running the server. The serving coroutine waits on a
        future that is never completed, so the thread runs until the process
        exits.

    Raises:
        RuntimeError: If the server is not ready within 5 seconds, or if
            startup raised. In the latter case the original exception is
            attached as ``__cause__``.
    """
    import websockets

    ready = threading.Event()
    # Filled by the server thread when startup raises, so the caller can
    # report the real cause instead of waiting out the timeout.
    startup_errors = []

    async def _main():
        async with websockets.serve(handler, "127.0.0.1", port, ping_interval=None):
            ready.set()
            await asyncio.Future()

    def _run():
        try:
            asyncio.run(_main())
        except Exception as exc:
            if ready.is_set():
                # The server had already started; this is a later failure and
                # is left to the thread's default exception handling.
                raise
            startup_errors.append(exc)
            # Wake the waiting caller immediately rather than after 5 s.
            ready.set()

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    if not ready.wait(timeout=5):
        raise RuntimeError("WebSocket test server failed to start")
    if startup_errors:
        raise RuntimeError(
            "WebSocket test server failed to start"
        ) from startup_errors[0]
    return thread
if __name__ == "__main__":
    # Script mode: serve in the foreground until the process is stopped.
    import sys

    import websockets

    # An optional first command-line argument overrides the default port.
    cli_args = sys.argv[1:]
    port = int(cli_args[0]) if cli_args else WS_PORT
    print(f"WebSocket test server on ws://127.0.0.1:{port}", file=sys.stderr)

    async def _serve():
        """Run the server and wait on a future that is never completed."""
        server = websockets.serve(handler, "127.0.0.1", port, ping_interval=None)
        async with server:
            await asyncio.Future()

    asyncio.run(_serve())