import json
import anyio
import knafeh
from knafeh import Server, TlsConfig
async def main() -> None:
    """Register the demo RPC services and run the server.

    Builds a server-side ``TlsConfig`` from ``cert.pem`` and ``key.pem``,
    constructs a ``Server`` for ``0.0.0.0:4433``, registers the ``greeter``
    and ``echo`` services on it, prints a startup line, and then awaits
    ``serve()``.
    """

    def _requested_name(payload: bytes):
        """Decode a JSON payload and return its ``name`` entry.

        Falls back to ``"World"`` when the key is absent. The value is
        returned exactly as decoded; it is not checked to be a string.
        """
        return json.loads(payload).get("name", "World")

    # presumably the two arguments are certificate and private-key file
    # paths -- confirm against the knafeh TlsConfig documentation
    rpc_server = Server(
        "0.0.0.0:4433",
        tls=TlsConfig.server("cert.pem", "key.pem"),
    )

    @rpc_server.service("greeter")
    class Greeter:
        """Handlers registered under the ``greeter`` service name."""

        def say_hello(self, request: bytes) -> bytes:
            """Return a UTF-8 JSON object whose ``message`` greets the caller."""
            name = _requested_name(request)
            reply = {"message": f"Hello, {name}!"}
            return json.dumps(reply).encode()

        def say_goodbye(self, request: bytes) -> bytes:
            """Return a UTF-8 JSON object whose ``message`` bids farewell."""
            name = _requested_name(request)
            reply = {"message": f"Goodbye, {name}!"}
            return json.dumps(reply).encode()

    @rpc_server.service("echo")
    class Echo:
        """Handler registered under the ``echo`` service name."""

        def echo(self, request: bytes) -> bytes:
            """Return the request payload unchanged."""
            return request

    print("Starting Knafeh RPC server on 0.0.0.0:4433...")
    # NOTE(review): serve() presumably runs until the server is shut down
    # or the task is cancelled -- confirm against the knafeh documentation.
    await rpc_server.serve()
if __name__ == "__main__":
    # Only start the server when this file is executed as a script, not
    # when it is imported. anyio drives the coroutine on the trio backend.
    anyio.run(main, backend="trio")