knafeh 1.1.0

QUIC-based RPC library with Python bindings
Documentation
"""E2E tests for Python async and sync APIs against a real QUIC server.

Tests use the default Protobuf codec. Two Python servers are started
(ports 14433 and 14434) — both use the Rust default codec (Protobuf)
with Python echo handlers.

Follows SOP categories A-G.
"""

import asyncio
import json
import time
import threading

import anyio

from knafeh._native import RpcServer, ServiceHandler, TlsConfig
from knafeh.client import Client, SyncClient


# ---------------------------------------------------------------------------
# Test server setup
# ---------------------------------------------------------------------------

_proto_server_addr = None
_proto_server_addr_2 = None
_servers_ready = threading.Event()


def _make_certs():
    import tempfile, subprocess, os
    d = tempfile.mkdtemp()
    cert = os.path.join(d, "cert.pem")
    key = os.path.join(d, "key.pem")
    subprocess.run([
        "openssl", "req", "-x509", "-newkey", "ec",
        "-pkeyopt", "ec_paramgen_curve:prime256v1",
        "-keyout", key, "-out", cert,
        "-days", "1", "-nodes", "-subj", "/CN=localhost",
    ], check=True, capture_output=True)
    return cert, key, d


_cert_path, _key_path, _cert_dir = _make_certs()


def _run_servers():
    """Start two Protobuf-default echo servers (ports 14433, 14434)."""
    global _proto_server_addr, _proto_server_addr_2
    tls = TlsConfig.server(_cert_path, _key_path)

    server1 = RpcServer("127.0.0.1:14433", tls)
    handler1 = ServiceHandler("echo")
    handler1.add_unary_handler("echo", lambda req: req)
    server1.add_service(handler1)

    server2 = RpcServer("127.0.0.1:14434", tls)
    handler2 = ServiceHandler("echo")
    handler2.add_unary_handler("echo", lambda req: req)
    server2.add_service(handler2)

    _proto_server_addr = "127.0.0.1:14433"
    _proto_server_addr_2 = "127.0.0.1:14434"
    _servers_ready.set()

    async def serve_both():
        async with anyio.create_task_group() as tg:
            tg.start_soon(_start, server1)
            tg.start_soon(_start, server2)

    asyncio.run(serve_both())


async def _start(server):
    await server.start()


_server_thread = threading.Thread(target=_run_servers, daemon=True)
_server_thread.start()
_servers_ready.wait(timeout=10)
time.sleep(1)


# ---------------------------------------------------------------------------
# SOP Category A: Unary — Sync API (against Protobuf server, default codec)
# ---------------------------------------------------------------------------

def test_sync_unary_echo():
    """A1: Basic echo with protobuf codec (default)."""
    with SyncClient(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
        body = b"hello protobuf"
        resp = client.call("echo/echo", body)
        assert resp == body


def test_sync_unary_multiple():
    """A1: Multiple sequential calls."""
    with SyncClient(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
        for i in range(10):
            body = f"msg-{i}".encode()
            resp = client.call("echo/echo", body)
            assert resp == body


def test_sync_unary_large_payload():
    """A4: ~100KB payload."""
    with SyncClient(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
        body = b"x" * 100_000
        resp = client.call("echo/echo", body)
        assert len(resp) == 100_000


# ---------------------------------------------------------------------------
# SOP Category A: Unary — Async API
# ---------------------------------------------------------------------------

def test_async_unary_echo():
    """A1: Basic echo async."""
    async def run():
        async with Client(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
            resp = await client.call("echo/echo", b"hello async proto")
            assert resp == b"hello async proto"
    anyio.run(run)


def test_async_unary_multiple():
    async def run():
        async with Client(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
            for i in range(10):
                body = f"msg-{i}".encode()
                resp = await client.call("echo/echo", body)
                assert resp == body
    anyio.run(run)


# ---------------------------------------------------------------------------
# SOP Category C: Connection reuse + concurrency
# ---------------------------------------------------------------------------

def test_async_concurrent_task_group():
    """C2: 50 concurrent calls via anyio TaskGroup."""
    async def run():
        async with Client(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
            results = []
            async def one():
                resp = await client.call("echo/echo", b"conc")
                results.append(resp)
            async with anyio.create_task_group() as tg:
                for _ in range(50):
                    tg.start_soon(one)
            assert len(results) == 50
    anyio.run(run)


def test_async_call_many():
    """G2: call_many — batched concurrent calls."""
    async def run():
        async with Client(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
            bodies = [f"item-{i}".encode() for i in range(50)]
            results = await client.call_many("echo/echo", bodies)
            assert len(results) == 50
            for i, resp in enumerate(results):
                assert resp == f"item-{i}".encode()
    anyio.run(run)


# ---------------------------------------------------------------------------
# SOP Category G: Load + Performance benchmarks
# ---------------------------------------------------------------------------

def test_sync_throughput():
    """G1: Sync sequential throughput (protobuf)."""
    with SyncClient(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
        body = b"bench"
        start = time.perf_counter()
        for _ in range(50):
            client.call("echo/echo", body)
        elapsed = time.perf_counter() - start
        print(f"\n  [sync protobuf] 50 sequential: {elapsed:.3f}s ({50/elapsed:.0f} rpc/s)")


def test_async_throughput_sequential():
    """G1: Async sequential throughput (protobuf)."""
    async def run():
        async with Client(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
            body = b"bench"
            start = time.perf_counter()
            for _ in range(50):
                await client.call("echo/echo", body)
            elapsed = time.perf_counter() - start
            print(f"\n  [async seq protobuf] 50 sequential: {elapsed:.3f}s ({50/elapsed:.0f} rpc/s)")
    anyio.run(run)


def test_async_throughput_concurrent():
    """G2: Async concurrent throughput via call_many (protobuf)."""
    async def run():
        async with Client(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
            body = b"bench"
            bodies = [body] * 50
            start = time.perf_counter()
            await client.call_many("echo/echo", bodies)
            elapsed = time.perf_counter() - start
            print(f"\n  [async call_many protobuf] 50 concurrent: {elapsed:.3f}s ({50/elapsed:.0f} rpc/s)")
    anyio.run(run)


def test_benchmark_comparison():
    """Full benchmark: Protobuf vs JSON, sync vs async."""
    N = 100

    proto_body = b"bench payload"
    json_body = json.dumps({"bench": True}).encode()

    # --- Protobuf codec (default) ---

    with SyncClient(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
        start = time.perf_counter()
        for _ in range(N):
            client.call("echo/echo", proto_body)
        sync_proto_time = time.perf_counter() - start

    async def async_seq_proto():
        async with Client(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
            start = time.perf_counter()
            for _ in range(N):
                await client.call("echo/echo", proto_body)
            return time.perf_counter() - start

    async_seq_proto_time = anyio.run(async_seq_proto)

    async def async_conc_proto():
        async with Client(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
            start = time.perf_counter()
            await client.call_many("echo/echo", [proto_body] * N)
            return time.perf_counter() - start

    async_conc_proto_time = anyio.run(async_conc_proto)

    # --- JSON codec (via JSON server) ---
    # Note: Python client uses default (protobuf) codec, so JSON server
    # comparison uses the protobuf client against JSON server — this tests
    # codec mismatch handling. Instead, we benchmark protobuf only for Python
    # since the Python client doesn't expose codec selection.

    print(f"\n  === Python E2E Benchmark — Protobuf codec ({N} calls) ===")
    print(f"  Sync sequential:    {sync_proto_time:.3f}s  ({N/sync_proto_time:.0f} rpc/s)")
    print(f"  Async sequential:   {async_seq_proto_time:.3f}s  ({N/async_seq_proto_time:.0f} rpc/s)")
    print(f"  Async call_many:    {async_conc_proto_time:.3f}s  ({N/async_conc_proto_time:.0f} rpc/s)")