import asyncio
import json
import sys
import os
import importlib.util
import traceback
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
import websockets
import uuid
import logging
from datetime import datetime
# Root logging is configured as a side effect of importing this module:
# INFO level, with timestamp / logger name / level prefixed to each message.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)
@dataclass
class ActorContext:
    """Per-invocation data handed to an actor function.

    ``process_actor`` builds one instance per ``process`` request and rebinds
    ``send_async_output`` on that instance, so this class must keep allowing
    instance attribute assignment.
    """

    payload: Dict[str, Any]  # looked up by port name in get_input()
    config: Dict[str, Any]
    state: Dict[str, Any]
    actor_id: str
    timestamp: int

    def get_input(self, port: str, default=None):
        """Return the payload entry for ``port``, or ``default`` if it is absent."""
        inputs = self.payload
        return inputs.get(port, default)

    def send_async_output(self, port: str, data: Any):
        """Do nothing; a placeholder that callers may replace per instance."""
        return None
class Message:
    """Factories for tagged message dicts of the form ``{"type": ..., "value": ...}``.

    NOTE: ``float`` and ``object`` shadow the builtins inside this class body.
    Annotations on methods defined after them must not refer to those
    builtins by bare name.
    """

    @staticmethod
    def string(value: str) -> Dict[str, Any]:
        """Wrap ``value`` as a ``"string"`` message."""
        return dict(type="string", value=value)

    @staticmethod
    def integer(value: int) -> Dict[str, Any]:
        """Wrap ``value`` as an ``"integer"`` message."""
        return dict(type="integer", value=value)

    @staticmethod
    def float(value: float) -> Dict[str, Any]:
        """Wrap ``value`` as a ``"float"`` message."""
        return dict(type="float", value=value)

    @staticmethod
    def boolean(value: bool) -> Dict[str, Any]:
        """Wrap ``value`` as a ``"boolean"`` message."""
        return dict(type="boolean", value=value)

    @staticmethod
    def object(value: Dict) -> Dict[str, Any]:
        """Wrap ``value`` as an ``"object"`` message."""
        return dict(type="object", value=value)

    @staticmethod
    def array(value: list) -> Dict[str, Any]:
        """Wrap ``value`` as an ``"array"`` message."""
        return dict(type="array", value=value)

    @staticmethod
    def error(message: str) -> Dict[str, Any]:
        """Wrap ``message`` as an ``"error"`` message."""
        return dict(type="error", value=message)
class PythonRuntimeServer:
    """WebSocket JSON-RPC 2.0 server that loads and runs Python actor functions.

    RPC methods handled by ``handle_rpc_request``: ``process`` (run a loaded
    actor), ``load`` (load an actor from a file) and ``list`` (names of loaded
    actors). Outputs an actor emits through ``context.send_async_output`` are
    pushed to the current client as ``output`` notifications.
    """

    def __init__(self, host: str = "127.0.0.1", port: int = 8765):
        """Store the bind address and start with no actors and no client."""
        self.host = host
        self.port = port
        # Loaded actors, keyed by actor name.
        self.actors: Dict[str, Callable] = {}
        # Most recently connected client; notifications are sent to it.
        self.websocket = None
        self.running = False
        # Strong references to in-flight notification tasks. The event loop
        # only keeps weak references, so an unreferenced task can be
        # garbage-collected before it finishes.
        self._background_tasks = set()

    @staticmethod
    def _error_response(request_id, code: int, message: str, data=None) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 error response; ``data`` is included only if given."""
        error: Dict[str, Any] = {"code": code, "message": message}
        if data is not None:
            error["data"] = data
        return {"jsonrpc": "2.0", "id": request_id, "error": error}

    @staticmethod
    def _result_response(request_id, result) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 success response."""
        return {"jsonrpc": "2.0", "id": request_id, "result": result}

    @staticmethod
    def _find_actor(module):
        """Pick the actor callable exposed by ``module``.

        Search order:
          1. the first callable carrying ``__actor_metadata__`` (its ``name``
             entry, if present, overrides the attribute name);
          2. the first coroutine function defined in ``module`` itself;
          3. the first coroutine function of any origin (legacy fallback).

        Step 2 keeps an imported async helper that merely appears earlier in
        the module namespace from being registered as the actor.

        Returns:
            ``(name, func)``, or ``(None, None)`` when nothing qualifies.
        """
        members = list(module.__dict__.items())
        for name, obj in members:
            if callable(obj) and hasattr(obj, '__actor_metadata__'):
                return obj.__actor_metadata__.get('name', name), obj

        coroutine_funcs = [
            (name, obj)
            for name, obj in members
            if callable(obj) and asyncio.iscoroutinefunction(obj)
        ]
        for name, obj in coroutine_funcs:
            if getattr(obj, '__module__', None) == module.__name__:
                return name, obj
        if coroutine_funcs:
            return coroutine_funcs[0]
        return None, None

    async def load_actor(self, file_path: str) -> bool:
        """Import ``file_path`` and register the actor function it defines.

        Args:
            file_path: Path of the Python source file to execute.

        Returns:
            True if an actor was registered, False otherwise. Failures are
            logged, never raised.
        """
        try:
            spec = importlib.util.spec_from_file_location("actor_module", file_path)
            if spec is None or spec.loader is None:
                logger.error(f"Failed to load module spec for {file_path}")
                return False
            module = importlib.util.module_from_spec(spec)
            # SECURITY: this executes arbitrary code from file_path, and the
            # 'load' RPC passes a client-supplied path here. Only expose this
            # server to trusted clients.
            spec.loader.exec_module(module)

            actor_name, actor_func = self._find_actor(module)
            if not actor_func:
                logger.error(f"No actor function found in {file_path}")
                return False

            self.actors[actor_name] = actor_func
            logger.info(f"Loaded actor '{actor_name}' from {file_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to load actor from {file_path}: {e}")
            return False

    async def handle_rpc_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch one decoded JSON-RPC request and return the response dict.

        Args:
            request: The decoded JSON message. Anything other than a JSON
                object is answered with -32600 (Invalid Request) rather than
                raising.

        Returns:
            A JSON-RPC 2.0 response dict. Error codes used: -32600 invalid
            request, -32601 unknown method, -32602 non-object ``params`` for
            ``process``/``load``, -32603 any exception raised while handling.
        """
        if not isinstance(request, dict):
            return self._error_response(None, -32600, "Invalid Request")

        request_id = request.get('id')
        method = request.get('method')
        params = request.get('params', {})
        try:
            if method in ('process', 'load') and not isinstance(params, dict):
                return self._error_response(request_id, -32602, "Invalid params")

            if method == 'process':
                result = await self.process_actor(params)
                return self._result_response(request_id, result)
            if method == 'load':
                success = await self.load_actor(params.get('file_path'))
                return self._result_response(request_id, {"success": success})
            if method == 'list':
                return self._result_response(request_id, {"actors": list(self.actors.keys())})
            return self._error_response(request_id, -32601, f"Method not found: {method}")
        except Exception as e:
            logger.error(f"Error handling RPC request: {e}")
            return self._error_response(request_id, -32603, str(e), traceback.format_exc())

    async def process_actor(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Run the actor named by ``params['actor_id']`` and return its outputs.

        Args:
            params: Request parameters. ``actor_id`` selects the actor;
                ``payload``, ``config``, ``state`` and ``timestamp`` populate
                the ``ActorContext`` (defaulting to ``{}``, ``{}``, ``{}`` and
                ``0``).

        Returns:
            ``{"outputs": {...}}`` on success. A dict result is used as-is,
            any other non-None result is wrapped as ``{"output": result}``.
            If the actor raises, ``{"error": ..., "traceback": ...}`` is
            returned instead.

        Raises:
            ValueError: If no actor is registered under ``actor_id``.
        """
        actor_id = params.get('actor_id')
        actor_func = self.actors.get(actor_id)
        if not actor_func:
            raise ValueError(f"Actor not found: {actor_id}")

        context = ActorContext(
            payload=params.get('payload', {}),
            config=params.get('config', {}),
            state=params.get('state', {}),
            actor_id=actor_id,
            timestamp=params.get('timestamp', 0),
        )

        def capture_async_output(port: str, data: Any):
            # Forward the output to the client as a notification. Without a
            # connected client there is nowhere to send it.
            if not self.websocket:
                return
            task = asyncio.create_task(self.send_notification(
                "output",
                {
                    "actor_id": actor_id,
                    "port": port,
                    "data": data,
                    "timestamp": int(datetime.now().timestamp() * 1000),
                },
            ))
            # Hold a reference until the task completes (see __init__).
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

        context.send_async_output = capture_async_output

        try:
            result = actor_func(context)
            # Covers coroutine functions and also plain callables (e.g. a
            # wrapper object) that hand back a coroutine or future.
            if asyncio.iscoroutine(result) or asyncio.isfuture(result):
                result = await result

            outputs = {}
            if isinstance(result, dict):
                outputs = result
            elif result is not None:
                outputs = {"output": result}
            return {"outputs": outputs}
        except Exception as e:
            logger.error(f"Actor execution failed: {e}")
            return {
                "error": str(e),
                "traceback": traceback.format_exc(),
            }

    async def send_notification(self, method: str, params: Any):
        """Send a JSON-RPC notification to the current client, if any.

        Serialization and send failures are logged and swallowed.
        """
        client = self.websocket
        if not client:
            return
        notification = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
        }
        try:
            await client.send(json.dumps(notification))
        except Exception as e:
            logger.error(f"Failed to send notification: {e}")

    def _encode_response(self, response: Dict[str, Any]) -> str:
        """Serialize ``response``; fall back to a -32603 error if it cannot be encoded."""
        try:
            return json.dumps(response)
        except (TypeError, ValueError) as e:
            # Typically an actor returned something that is not JSON-safe.
            logger.error(f"Failed to serialize response: {e}")
            return json.dumps(self._error_response(
                response.get("id"),
                -32603,
                f"Response is not JSON serializable: {e}",
            ))

    async def handle_connection(self, websocket, path=None):
        """Serve one client connection until it closes.

        Args:
            websocket: The connection object supplied by ``websockets``.
            path: Request path. Optional because newer ``websockets`` releases
                call the handler with the connection only.
        """
        self.websocket = websocket
        logger.info(f"Client connected from {websocket.remote_address}")
        try:
            async for message in websocket:
                try:
                    request = json.loads(message)
                except (json.JSONDecodeError, UnicodeDecodeError) as e:
                    logger.error(f"Invalid JSON: {e}")
                    await websocket.send(json.dumps(
                        self._error_response(None, -32700, "Parse error")
                    ))
                    continue

                logger.debug(f"Received request: {request}")
                response = await self.handle_rpc_request(request)
                await websocket.send(self._encode_response(response))
                logger.debug(f"Sent response: {response}")
        except websockets.exceptions.ConnectionClosed:
            logger.info("Client disconnected")
        except Exception as e:
            logger.error(f"Connection error: {e}")
        finally:
            # Leave the handle alone if a newer client has replaced this one.
            if self.websocket is websocket:
                self.websocket = None

    async def start(self):
        """Listen on ``host:port`` and serve until the task is cancelled."""
        logger.info(f"Starting Python Runtime Server on {self.host}:{self.port}")
        self.running = True
        try:
            async with websockets.serve(self.handle_connection, self.host, self.port):
                logger.info(f"Server listening on ws://{self.host}:{self.port}")
                # Never completes: keeps the server open until cancellation.
                await asyncio.Future()
        finally:
            self.running = False
def main():
    """Command-line entry point: parse options, preload actors, run the server.

    Options: ``--host``, ``--port``, ``--load FILE [FILE ...]`` and ``--debug``.
    Runs until the server is interrupted from the keyboard.
    """
    import argparse

    cli = argparse.ArgumentParser(description='Python Runtime Server for Reflow')
    option_table = (
        ('--host', dict(default='127.0.0.1', help='Host to bind to')),
        ('--port', dict(type=int, default=8765, help='Port to bind to')),
        ('--load', dict(nargs='+', help='Actor files to preload')),
        ('--debug', dict(action='store_true', help='Enable debug logging')),
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)
    options = cli.parse_args()

    if options.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    server = PythonRuntimeServer(options.host, options.port)

    async def preload(paths):
        # Load sequentially; load_actor logs failures and returns False,
        # so one bad file does not stop the rest.
        for actor_file in paths:
            await server.load_actor(actor_file)

    if options.load:
        asyncio.run(preload(options.load))

    try:
        asyncio.run(server.start())
    except KeyboardInterrupt:
        logger.info("Server stopped by user")


if __name__ == '__main__':
    main()