import asyncio
import json
import time
import threading
import anyio
from knafeh._native import RpcServer, ServiceHandler, TlsConfig
from knafeh.client import Client, SyncClient
_proto_server_addr = None
_proto_server_addr_2 = None
_servers_ready = threading.Event()
def _make_certs():
    """Generate a throwaway self-signed certificate for the test servers.

    Shells out to ``openssl`` to create a prime256v1 EC private key and a
    matching X.509 certificate (subject ``/CN=localhost``, valid for one day)
    inside a freshly created temporary directory.

    Returns:
        tuple[str, str, str]: ``(cert_path, key_path, directory)``.

    Raises:
        subprocess.CalledProcessError: if ``openssl`` exits with a non-zero
            status (``check=True``).
        FileNotFoundError: if the ``openssl`` executable cannot be found.
    """
    import atexit
    import os
    import shutil
    import subprocess
    import tempfile

    d = tempfile.mkdtemp()
    # The directory holds an unencrypted private key ("-nodes"); remove it
    # when the interpreter exits instead of leaking it on every run.
    # Registering immediately also covers the case where openssl fails below.
    atexit.register(shutil.rmtree, d, ignore_errors=True)

    cert = os.path.join(d, "cert.pem")
    key = os.path.join(d, "key.pem")
    subprocess.run(
        [
            "openssl", "req", "-x509", "-newkey", "ec",
            "-pkeyopt", "ec_paramgen_curve:prime256v1",
            "-keyout", key, "-out", cert,
            "-days", "1", "-nodes", "-subj", "/CN=localhost",
        ],
        check=True,
        capture_output=True,
    )
    return cert, key, d
# Create the TLS material once, at import time. _cert_path and _key_path are
# passed to TlsConfig.server() in _run_servers; _cert_dir is the temporary
# directory that contains both files.
_cert_path, _key_path, _cert_dir = _make_certs()
def _run_servers():
    """Build two TLS echo servers and run them on a private event loop.

    Each server (127.0.0.1:14433 and 127.0.0.1:14434) exposes an ``echo``
    service whose ``echo`` unary handler returns the request unchanged.
    Once both servers are configured, their addresses are published in the
    module globals and ``_servers_ready`` is set; the function then blocks in
    ``asyncio.run`` while both servers are started inside one task group.

    NOTE: the readiness event is set *before* the servers are started, so it
    signals "addresses published", not "ports accepting connections".
    """
    global _proto_server_addr, _proto_server_addr_2

    tls_config = TlsConfig.server(_cert_path, _key_path)

    def build_echo_server(bind_addr):
        # One RpcServer carrying a single "echo" service.
        server = RpcServer(bind_addr, tls_config)
        service = ServiceHandler("echo")
        service.add_unary_handler("echo", lambda req: req)
        server.add_service(service)
        return server

    primary = build_echo_server("127.0.0.1:14433")
    secondary = build_echo_server("127.0.0.1:14434")

    _proto_server_addr = "127.0.0.1:14433"
    _proto_server_addr_2 = "127.0.0.1:14434"
    _servers_ready.set()

    async def serve_both():
        async with anyio.create_task_group() as group:
            for server in (primary, secondary):
                group.start_soon(_start, server)

    asyncio.run(serve_both())
async def _start(server):
await server.start()
# Run both echo servers on a daemon thread so they stay up for the whole test
# session without keeping the interpreter alive at exit.
_server_thread = threading.Thread(target=_run_servers, daemon=True)
_server_thread.start()
# Event.wait() returns False on timeout. Fail loudly here instead of letting
# every test run against an unset (None) server address.
if not _servers_ready.wait(timeout=10):
    raise RuntimeError(
        "echo test servers did not signal readiness within 10 seconds"
    )
# The readiness event is set before the servers are actually started, so
# allow a short grace period before the first client connects.
time.sleep(1)
def test_sync_unary_echo():
    """One unary call through the blocking client returns the request bytes."""
    insecure_tls = TlsConfig.client_insecure()
    with SyncClient(_proto_server_addr, tls=insecure_tls) as client:
        payload = b"hello protobuf"
        reply = client.call("echo/echo", payload)
        assert reply == payload
def test_sync_unary_multiple():
    """Ten sequential unary calls on one blocking client each echo correctly."""
    insecure_tls = TlsConfig.client_insecure()
    with SyncClient(_proto_server_addr, tls=insecure_tls) as client:
        for seq in range(10):
            payload = f"msg-{seq}".encode()
            reply = client.call("echo/echo", payload)
            assert reply == payload
def test_sync_unary_large_payload():
    """A 100,000-byte payload is echoed back intact by the blocking client."""
    with SyncClient(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
        body = b"x" * 100_000
        resp = client.call("echo/echo", body)
        # Check the length first so a truncated reply fails with a readable
        # message, then the content so same-length corruption is caught too.
        assert len(resp) == 100_000
        assert resp == body
def test_async_unary_echo():
    """One unary call through the async client returns the request bytes."""

    async def scenario():
        insecure_tls = TlsConfig.client_insecure()
        async with Client(_proto_server_addr, tls=insecure_tls) as client:
            reply = await client.call("echo/echo", b"hello async proto")
            assert reply == b"hello async proto"

    anyio.run(scenario)
def test_async_unary_multiple():
    """Ten sequential awaited calls on one async client each echo correctly."""

    async def scenario():
        insecure_tls = TlsConfig.client_insecure()
        async with Client(_proto_server_addr, tls=insecure_tls) as client:
            for seq in range(10):
                payload = f"msg-{seq}".encode()
                reply = await client.call("echo/echo", payload)
                assert reply == payload

    anyio.run(scenario)
def test_async_concurrent_task_group():
    """Fifty concurrent calls sharing one async client all echo correctly."""

    async def run():
        async with Client(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
            results = []

            async def one():
                resp = await client.call("echo/echo", b"conc")
                results.append(resp)

            # Leaving the task group waits for every spawned call to finish.
            async with anyio.create_task_group() as tg:
                for _ in range(50):
                    tg.start_soon(one)

            assert len(results) == 50
            # Check content as well as count, like the other echo tests, so a
            # wrong or mixed-up response cannot pass unnoticed.
            assert all(resp == b"conc" for resp in results)

    anyio.run(run)
def test_async_call_many():
    """``call_many`` returns fifty replies, each matching its request by position."""

    async def scenario():
        insecure_tls = TlsConfig.client_insecure()
        async with Client(_proto_server_addr, tls=insecure_tls) as client:
            payloads = [f"item-{n}".encode() for n in range(50)]
            replies = await client.call_many("echo/echo", payloads)
            assert len(replies) == 50
            for position, reply in enumerate(replies):
                assert reply == f"item-{position}".encode()

    anyio.run(scenario)
def test_sync_throughput():
    """Time fifty sequential blocking calls and print the rate (no assertions)."""
    insecure_tls = TlsConfig.client_insecure()
    with SyncClient(_proto_server_addr, tls=insecure_tls) as client:
        payload = b"bench"
        began = time.perf_counter()
        for _ in range(50):
            client.call("echo/echo", payload)
        elapsed = time.perf_counter() - began
        print(f"\n [sync protobuf] 50 sequential: {elapsed:.3f}s ({50/elapsed:.0f} rpc/s)")
def test_async_throughput_sequential():
    """Time fifty awaited-in-sequence async calls and print the rate."""

    async def scenario():
        insecure_tls = TlsConfig.client_insecure()
        async with Client(_proto_server_addr, tls=insecure_tls) as client:
            payload = b"bench"
            began = time.perf_counter()
            for _ in range(50):
                await client.call("echo/echo", payload)
            elapsed = time.perf_counter() - began
            print(f"\n [async seq protobuf] 50 sequential: {elapsed:.3f}s ({50/elapsed:.0f} rpc/s)")

    anyio.run(scenario)
def test_async_throughput_concurrent():
    """Time one ``call_many`` batch of fifty requests and print the rate."""

    async def scenario():
        insecure_tls = TlsConfig.client_insecure()
        async with Client(_proto_server_addr, tls=insecure_tls) as client:
            payloads = [b"bench"] * 50
            began = time.perf_counter()
            await client.call_many("echo/echo", payloads)
            elapsed = time.perf_counter() - began
            print(f"\n [async call_many protobuf] 50 concurrent: {elapsed:.3f}s ({50/elapsed:.0f} rpc/s)")

    anyio.run(scenario)
def test_benchmark_comparison():
    """Print a side-by-side timing of the three client call styles.

    Runs ``N`` echo calls three ways: sequentially on the blocking client,
    sequentially on the async client, and as one ``call_many`` batch on the
    async client. It then prints elapsed time and calls per second for each.
    The test makes no assertions; it fails only if a call raises.
    """
    N = 100
    proto_body = b"bench payload"

    with SyncClient(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
        start = time.perf_counter()
        for _ in range(N):
            client.call("echo/echo", proto_body)
        sync_proto_time = time.perf_counter() - start

    async def async_seq_proto():
        async with Client(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
            start = time.perf_counter()
            for _ in range(N):
                await client.call("echo/echo", proto_body)
            return time.perf_counter() - start

    async_seq_proto_time = anyio.run(async_seq_proto)

    async def async_conc_proto():
        async with Client(_proto_server_addr, tls=TlsConfig.client_insecure()) as client:
            start = time.perf_counter()
            await client.call_many("echo/echo", [proto_body] * N)
            return time.perf_counter() - start

    async_conc_proto_time = anyio.run(async_conc_proto)

    print(f"\n === Python E2E Benchmark — Protobuf codec ({N} calls) ===")
    print(f" Sync sequential: {sync_proto_time:.3f}s ({N/sync_proto_time:.0f} rpc/s)")
    print(f" Async sequential: {async_seq_proto_time:.3f}s ({N/async_seq_proto_time:.0f} rpc/s)")
    print(f" Async call_many: {async_conc_proto_time:.3f}s ({N/async_conc_proto_time:.0f} rpc/s)")