from __future__ import annotations
import asyncio
import atexit
import concurrent.futures
import json
import os
import threading
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import Any, Dict, Generic, Optional, TypeVar
from uuid import UUID
# Type variable used to parameterise the transaction context-manager classes
# defined at the bottom of this module.
C = TypeVar("C")
# Seconds a single RPC waits for its response before it is reported as timed out.
_RPC_TIMEOUT_S: float = 30.0
# Seconds the stdout reader waits for the engine's stderr drain task to finish
# after stdout hits EOF, before it reads the captured stderr text.
_STDERR_DRAIN_TIMEOUT_S: float = 1.0
# Seconds the blocking connect wrapper waits for the async connect to finish.
_SYNC_CONNECT_TIMEOUT_S: int = 35
# Seconds the blocking disconnect wrapper waits for the background loop thread
# to exit after the loop has been asked to stop.
_SYNC_LOOP_JOIN_TIMEOUT_S: int = 10
from .engine import EngineProcess from ..errors.errors import HandshakeError, ProtocolError, TransactionError, TransactionTimeoutError from .protocol import JsonRpcRequest, JsonRpcResponse from .transaction import IsolationLevel, TransactionClient
def _build_engine_error(stderr: str, schema_path: str) -> str:
    """Turn an engine crash into a human-readable, actionable message.

    When the engine wrote something to stderr, the text is matched
    (case-insensitively) against a few known failure signatures and the
    first matching explanation is returned with the raw output appended.
    When stderr is empty, the most likely causes are probed directly: a
    missing schema file, then a missing ``DATABASE_URL`` variable.

    Args:
        stderr: Text captured from the engine's stderr (may be empty).
        schema_path: Path of the schema file the engine was started with.

    Returns:
        The message to surface to the caller.
    """
    if not stderr:
        # Nothing on stderr: fall back to checking the usual suspects.
        if not os.path.isfile(schema_path):
            return (
                f"Engine failed to start: schema file not found at:\n"
                f" {schema_path}\n"
                "Re-run 'nautilus generate' from the directory containing your schema file."
            )
        if not os.environ.get("DATABASE_URL"):
            return (
                "Engine failed to start: DATABASE_URL is not set.\n"
                "Add DATABASE_URL to your .env file or set it as an environment variable."
            )
        return "Engine process exited unexpectedly (no output on stderr)."

    # (substrings to look for, explanation to prepend); order is significant,
    # the first signature that matches wins.
    known_failures = (
        (
            ("database_url", "environment variable"),
            "Engine failed to start: DATABASE_URL is not set or invalid.\n"
            "Add DATABASE_URL to your .env file or set it as an environment variable.\n",
        ),
        (
            ("connection refused", "could not connect"),
            "Engine could not connect to the database.\n"
            "Make sure your database is running and DATABASE_URL is correct.\n",
        ),
        (
            ("no such file", "not found"),
            "Engine failed to start: a required file was not found.\n",
        ),
    )
    lowered = stderr.lower()
    for signatures, explanation in known_failures:
        if any(signature in lowered for signature in signatures):
            return f"{explanation}Details: {stderr}"
    return f"Engine process exited unexpectedly.\nDetails: {stderr}"
def _json_default(obj: Any) -> Any:
    """``json.dumps`` fallback for values the encoder cannot handle natively.

    UUIDs and Decimals become their string form, datetimes their ISO-8601
    text, and enum members their underlying value.

    Raises:
        TypeError: If ``obj`` is none of the supported types.
    """
    # Checked in order; the first matching type decides the encoding.
    converters = (
        (UUID, str),
        (datetime, lambda moment: moment.isoformat()),
        (Decimal, str),
        (Enum, lambda member: member.value),
    )
    for supported_type, convert in converters:
        if isinstance(obj, supported_type):
            return convert(obj)
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")
# The single client created with auto_register=True, or None when there is none.
# Written only by NautilusClient._set_as_global_instance / _clear_global_instance.
_GLOBAL_INSTANCE: Optional[NautilusClient] = None
# Guards every read-modify-write of _GLOBAL_INSTANCE in those two methods.
_auto_register_lock = threading.Lock()
class NautilusClient:
def __init__(self, schema_path: str, engine_path: Optional[str] = None, migrate: bool = False, auto_register: bool = False) -> None:
self.schema_path = schema_path
self.engine = EngineProcess(engine_path, migrate=migrate)
self._request_id = 0
self._pending: Dict[int, asyncio.Future] = {}
self._partial_data: Dict[int, list] = {}
self._reader_task: Optional[asyncio.Task] = None
self._writer_lock = asyncio.Lock()
self._handshake_done = False
self._delegates: Dict[str, Any] = {}
self._auto_registered = False
self._atexit_handler = None
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_close_original = None
self._loop_close_patch = None
self._sync_loop: Optional[asyncio.AbstractEventLoop] = None
self._sync_thread: Optional[threading.Thread] = None
self._sync_loop_lock = threading.Lock()
if auto_register:
self._set_as_global_instance()
async def __aenter__(self) -> NautilusClient:
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.disconnect()
if self._auto_registered:
self._clear_global_instance()
async def connect(self) -> None:
if self.engine.is_running():
return
await self.engine.spawn(self.schema_path)
self._reader_task = asyncio.create_task(self._read_responses())
await self._handshake()
loop = asyncio.get_running_loop()
self._loop = loop
self._loop_close_original = loop.close
def _ensure_disconnected_then_close():
original_close = self._loop_close_original
loop.close = original_close if self.engine.is_running():
try:
loop.run_until_complete(self.disconnect())
except Exception:
pid = self.engine._pid
if pid is not None:
import signal as _signal
try:
os.kill(pid, _signal.SIGTERM)
except OSError:
pass
original_close()
self._loop_close_patch = _ensure_disconnected_then_close
loop.close = _ensure_disconnected_then_close
def _atexit_cleanup():
pid = self.engine._pid
if pid is not None:
import signal as _signal
try:
os.kill(pid, _signal.SIGTERM)
except OSError:
pass
self._atexit_handler = _atexit_cleanup
atexit.register(self._atexit_handler)
async def disconnect(self) -> None:
if (
self._loop is not None
and self._loop_close_patch is not None
and getattr(self._loop, "close", None) is self._loop_close_patch
):
self._loop.close = self._loop_close_original self._loop_close_patch = None
self._loop_close_original = None
if self._atexit_handler is not None:
atexit.unregister(self._atexit_handler)
self._atexit_handler = None
if self._reader_task:
self._reader_task.cancel()
try:
await self._reader_task
except asyncio.CancelledError:
pass
await self.engine.terminate()
for future in self._pending.values():
if not future.done():
future.cancel()
self._pending.clear()
self._partial_data.clear()
if self._auto_registered:
self._clear_global_instance()
async def _handshake(self) -> None:
try:
response = await self._rpc("engine.handshake", {
"protocolVersion": 1,
"clientName": "nautilus-py",
"clientVersion": "0.1.0",
})
protocol_version = response.get("protocolVersion")
if protocol_version != 1:
raise HandshakeError(
f"Protocol version mismatch: engine uses {protocol_version}, "
f"client expects 1"
)
self._handshake_done = True
except Exception as e:
await self.disconnect()
raise HandshakeError(f"Handshake failed: {e}") from e
async def _rpc(self, method: str, params: Dict[str, Any]) -> Any:
self._request_id += 1
request_id = self._request_id
request = JsonRpcRequest(
id=request_id,
method=method,
params=params,
)
future: asyncio.Future = asyncio.Future()
self._pending[request_id] = future
try:
await self._write_request(request)
response = await asyncio.wait_for(future, timeout=_RPC_TIMEOUT_S)
return response
except asyncio.TimeoutError:
raise ProtocolError(f"Request {request_id} timed out")
finally:
self._pending.pop(request_id, None)
async def _write_request(self, request: JsonRpcRequest) -> None:
stdin = self.engine.stdin
if not stdin:
raise ProtocolError("Engine process not running")
request_json = json.dumps(request.to_dict(), default=_json_default)
line = (request_json + "\n").encode("utf-8")
async with self._writer_lock:
stdin.write(line)
await stdin.drain()
async def _read_responses(self) -> None:
stdout = self.engine.stdout
if not stdout:
return
try:
while True:
line_bytes = await stdout.readline()
if not line_bytes:
drain_task = self.engine._stderr_drain_task
if drain_task and not drain_task.done():
try:
await asyncio.wait_for(asyncio.shield(drain_task), timeout=_STDERR_DRAIN_TIMEOUT_S)
except (asyncio.TimeoutError, Exception):
pass
stderr_output = self.engine.get_stderr_output().strip()
error_msg = _build_engine_error(stderr_output, self.schema_path)
for future in self._pending.values():
if not future.done():
future.set_exception(ProtocolError(error_msg))
break
line = line_bytes.decode("utf-8").strip()
if not line:
continue
try:
response_dict = json.loads(line)
response = JsonRpcResponse.from_dict(response_dict)
if response.id is not None:
req_id = response.id
future = self._pending.get(req_id)
if future and not future.done():
if response.partial is True:
chunk_data = (response.result or {}).get("data", [])
self._partial_data.setdefault(req_id, []).extend(chunk_data)
else:
try:
result = response.unwrap()
if req_id in self._partial_data:
accumulated = self._partial_data.pop(req_id)
if isinstance(result, dict) and "data" in result:
result = {**result, "data": accumulated + result["data"]}
future.set_result(result)
except Exception as e:
self._partial_data.pop(req_id, None)
future.set_exception(e)
except json.JSONDecodeError as e:
print(f"Failed to parse response: {line}", e)
except Exception as e:
print(f"Error processing response: {e}")
except asyncio.CancelledError:
raise except Exception as e:
print(f"Reader task error: {e}")
for future in self._pending.values():
if not future.done():
future.set_exception(ProtocolError(f"Reader task failed: {e}"))
def transaction(
self,
callback=None,
*,
timeout_ms: int = 5000,
isolation_level: Optional[IsolationLevel] = None,
):
if callback is not None:
return self._run_transaction_callback(
callback, timeout_ms=timeout_ms, isolation_level=isolation_level
)
return _AsyncTransactionContext(
self, timeout_ms=timeout_ms, isolation_level=isolation_level
)
async def _start_transaction(
self,
timeout_ms: int = 5000,
isolation_level: Optional[IsolationLevel] = None,
) -> str:
params: Dict[str, Any] = {"protocolVersion": 1, "timeoutMs": timeout_ms}
if isolation_level is not None:
params["isolationLevel"] = isolation_level.value
result = await self._rpc("transaction.start", params)
return result["id"]
async def _commit_transaction(self, tx_id: str) -> None:
await self._rpc("transaction.commit", {"protocolVersion": 1, "id": tx_id})
async def _rollback_transaction(self, tx_id: str) -> None:
try:
await self._rpc("transaction.rollback", {"protocolVersion": 1, "id": tx_id})
except Exception:
pass
async def _run_transaction_callback(
self,
callback,
*,
timeout_ms: int = 5000,
isolation_level: Optional[IsolationLevel] = None,
):
tx_id = await self._start_transaction(
timeout_ms=timeout_ms, isolation_level=isolation_level
)
tx = TransactionClient(self, tx_id)
try:
result = await callback(tx)
except Exception:
await self._rollback_transaction(tx_id)
raise
await self._commit_transaction(tx_id)
return result
async def transaction_batch(
self,
operations: list,
*,
timeout_ms: int = 5000,
isolation_level: Optional[IsolationLevel] = None,
) -> list:
params: Dict[str, Any] = {
"protocolVersion": 1,
"operations": operations,
"timeoutMs": timeout_ms,
}
if isolation_level is not None:
params["isolationLevel"] = isolation_level.value
result = await self._rpc("transaction.batch", params)
return result.get("results", [])
def _ensure_sync_loop(self) -> asyncio.AbstractEventLoop:
with self._sync_loop_lock:
if self._sync_loop is None or not self._sync_loop.is_running():
loop = asyncio.new_event_loop()
thread = threading.Thread(
target=loop.run_forever,
daemon=True,
name="nautilus-sync-loop",
)
thread.start()
self._sync_loop = loop
self._sync_thread = thread
return self._sync_loop
def _sync_rpc(self, method: str, params: Dict[str, Any]) -> Any:
loop = self._ensure_sync_loop()
future = asyncio.run_coroutine_threadsafe(self._rpc(method, params), loop)
return future.result()
def _sync_connect(self) -> None:
loop = self._ensure_sync_loop()
future = asyncio.run_coroutine_threadsafe(NautilusClient.connect(self), loop)
try:
future.result(timeout=_SYNC_CONNECT_TIMEOUT_S)
except concurrent.futures.TimeoutError:
future.cancel()
raise TimeoutError(f"Engine connection timed out after {_SYNC_CONNECT_TIMEOUT_S} seconds")
def _sync_disconnect(self) -> None:
if self._sync_loop is None:
return
loop = self._sync_loop
future = asyncio.run_coroutine_threadsafe(NautilusClient.disconnect(self), loop)
future.result()
loop.call_soon_threadsafe(loop.stop)
if self._sync_thread is not None:
self._sync_thread.join(timeout=_SYNC_LOOP_JOIN_TIMEOUT_S)
self._sync_loop = None
self._sync_thread = None
def sync_transaction(
self,
callback=None,
*,
timeout_ms: int = 5000,
isolation_level: Optional[IsolationLevel] = None,
):
if callback is not None:
return self._sync_run_transaction_callback(
callback, timeout_ms=timeout_ms, isolation_level=isolation_level
)
return _SyncTransactionContext(
self, timeout_ms=timeout_ms, isolation_level=isolation_level
)
def _sync_start_transaction(
self,
timeout_ms: int = 5000,
isolation_level: Optional[IsolationLevel] = None,
) -> str:
loop = self._ensure_sync_loop()
future = asyncio.run_coroutine_threadsafe(
self._start_transaction(timeout_ms, isolation_level), loop
)
return future.result()
def _sync_commit_transaction(self, tx_id: str) -> None:
loop = self._ensure_sync_loop()
future = asyncio.run_coroutine_threadsafe(
self._commit_transaction(tx_id), loop
)
future.result()
def _sync_rollback_transaction(self, tx_id: str) -> None:
loop = self._ensure_sync_loop()
future = asyncio.run_coroutine_threadsafe(
self._rollback_transaction(tx_id), loop
)
try:
future.result()
except Exception:
pass
def _sync_run_transaction_callback(
self,
callback,
*,
timeout_ms: int = 5000,
isolation_level: Optional[IsolationLevel] = None,
):
tx_id = self._sync_start_transaction(timeout_ms, isolation_level)
tx = TransactionClient(self, tx_id)
try:
result = callback(tx)
except Exception:
self._sync_rollback_transaction(tx_id)
raise
self._sync_commit_transaction(tx_id)
return result
def sync_transaction_batch(
self,
operations: list,
*,
timeout_ms: int = 5000,
isolation_level: Optional[IsolationLevel] = None,
) -> list:
loop = self._ensure_sync_loop()
future = asyncio.run_coroutine_threadsafe(
self.transaction_batch(
operations,
timeout_ms=timeout_ms,
isolation_level=isolation_level,
),
loop,
)
return future.result()
def register_delegate(self, name: str, delegate: Any) -> None:
self._delegates[name] = delegate
def get_delegate(self, name: str) -> Any:
return self._delegates[name]
def _set_as_global_instance(self) -> None:
global _GLOBAL_INSTANCE
with _auto_register_lock:
if _GLOBAL_INSTANCE is not None:
raise RuntimeError(
"A Nautilus instance with auto_register=True already exists. "
"Only one auto-registered instance is allowed at a time."
)
_GLOBAL_INSTANCE = self
self._auto_registered = True
def _clear_global_instance(self) -> None:
global _GLOBAL_INSTANCE
with _auto_register_lock:
if _GLOBAL_INSTANCE is self:
_GLOBAL_INSTANCE = None
self._auto_registered = False
@staticmethod
def get_global_instance() -> Optional[NautilusClient]:
return _GLOBAL_INSTANCE
class _AsyncTransactionContext(Generic[C]):
    """Async context manager that scopes one engine transaction.

    Entering starts a transaction through the client and yields a
    ``TransactionClient`` bound to it.  Leaving commits when the body ended
    normally and rolls back when it raised.
    """

    def __init__(
        self,
        client: NautilusClient,
        *,
        timeout_ms: int = 5000,
        isolation_level: Optional[IsolationLevel] = None,
    ) -> None:
        """Remember the client and transaction options; nothing is started yet."""
        # Populated by __aenter__; None means "no transaction open".
        self._tx: Optional[TransactionClient] = None
        self._tx_id: Optional[str] = None
        self._isolation_level = isolation_level
        self._timeout_ms = timeout_ms
        self._client = client

    async def __aenter__(self) -> C:
        """Start the transaction and return the client bound to it."""
        owner = self._client
        self._tx_id = await owner._start_transaction(
            self._timeout_ms, self._isolation_level
        )
        bound = TransactionClient(owner, self._tx_id)
        self._tx = bound
        return bound

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Commit on a clean exit, roll back when the body raised."""
        open_tx_id = self._tx_id
        if open_tx_id is None:
            # Never entered, or already finished.
            return
        if exc_type is None:
            await self._client._commit_transaction(open_tx_id)
        else:
            await self._client._rollback_transaction(open_tx_id)
        self._tx_id = None
class _SyncTransactionContext(Generic[C]):
    """Blocking context manager that scopes one engine transaction.

    Entering starts a transaction through the client's blocking API and
    yields a ``TransactionClient`` bound to it.  Leaving commits when the
    body ended normally and rolls back when it raised.
    """

    def __init__(
        self,
        client: NautilusClient,
        *,
        timeout_ms: int = 5000,
        isolation_level: Optional[IsolationLevel] = None,
    ) -> None:
        """Remember the client and transaction options; nothing is started yet."""
        # Populated by __enter__; None means "no transaction open".
        self._tx: Optional[TransactionClient] = None
        self._tx_id: Optional[str] = None
        self._isolation_level = isolation_level
        self._timeout_ms = timeout_ms
        self._client = client

    def __enter__(self) -> C:
        """Start the transaction and return the client bound to it."""
        owner = self._client
        self._tx_id = owner._sync_start_transaction(
            self._timeout_ms, self._isolation_level
        )
        bound = TransactionClient(owner, self._tx_id)
        self._tx = bound
        return bound

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Commit on a clean exit, roll back when the body raised."""
        open_tx_id = self._tx_id
        if open_tx_id is None:
            # Never entered, or already finished.
            return
        if exc_type is None:
            self._client._sync_commit_transaction(open_tx_id)
        else:
            self._client._sync_rollback_transaction(open_tx_id)
        self._tx_id = None