"""Engine process management with auto-download from GitHub releases."""
from __future__ import annotations
import asyncio
import os
import platform
import signal
import shutil
import sys
import urllib.request
from pathlib import Path
from typing import Optional
NAUTILUS_ENGINE_VERSION = "0.1.0"
GITHUB_REPO = "nautilus-orm/nautilus"
_BINARY_NAME = "nautilus.exe" if platform.system() == "Windows" else "nautilus"
_LEGACY_BINARY_NAME = "nautilus-engine.exe" if platform.system() == "Windows" else "nautilus-engine"
class EngineProcess:
def __init__(self, engine_path: Optional[str] = None, migrate: bool = False) -> None:
self._resolved = engine_path
self._is_legacy = bool(
engine_path and os.path.basename(engine_path).startswith("nautilus-engine")
)
self.migrate = migrate
self._process: Optional[asyncio.subprocess.Process] = None
self._stderr_drain_task: Optional[asyncio.Task] = None
self._stderr_buffer: list = []
self._pid: Optional[int] = None
async def spawn(self, schema_path: str) -> None:
if self._process:
raise RuntimeError("Engine process already running")
self._stderr_buffer = []
self._load_dotenv(schema_path)
if self._resolved is None:
self._resolved = self._find_or_download_engine(schema_path)
self._is_legacy = os.path.basename(self._resolved).startswith("nautilus-engine")
if self._is_legacy:
cmd = [self._resolved, "--schema", schema_path]
if self.migrate:
cmd.append("--migrate")
else:
cmd = [self._resolved, "engine", "serve", "--schema", schema_path]
if self.migrate:
cmd.append("--migrate")
self._process = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
limit=16 * 1024 * 1024, )
self._pid = self._process.pid
self._stderr_drain_task = asyncio.ensure_future(self._drain_stderr())
async def terminate(self) -> None:
if not self._process:
return
process = self._process
self._process = None
self._pid = None
if self._stderr_drain_task and not self._stderr_drain_task.done():
self._stderr_drain_task.cancel()
try:
await self._stderr_drain_task
except (asyncio.CancelledError, Exception):
pass
self._stderr_drain_task = None
if process.stdin and not process.stdin.is_closing():
process.stdin.close()
try:
await process.stdin.wait_closed()
except Exception:
pass
try:
process.terminate()
except ProcessLookupError:
pass
try:
await asyncio.wait_for(process.wait(), timeout=5.0)
except asyncio.TimeoutError:
try:
process.kill()
except ProcessLookupError:
pass
await process.wait()
for stream in (process.stdout, process.stderr):
if stream is None:
continue
transport = getattr(stream, "_transport", None)
if transport is not None and not transport.is_closing():
transport.close()
sub_transport = getattr(process, "_transport", None)
if sub_transport is not None and not sub_transport.is_closing():
sub_transport.close()
for _ in range(5):
await asyncio.sleep(0)
def is_running(self) -> bool:
return self._process is not None and self._process.returncode is None
async def _drain_stderr(self) -> None:
try:
reader = self._process.stderr if self._process else None
if reader is None:
return
while True:
chunk = await reader.read(65536)
if not chunk:
break self._stderr_buffer.append(chunk)
except (asyncio.CancelledError, Exception):
pass
def get_stderr_output(self) -> str:
return b"".join(self._stderr_buffer).decode("utf-8", errors="replace")
@staticmethod
def _load_dotenv(schema_path: str) -> None:
seen: set = set()
search_dirs: list = []
d = Path(schema_path).resolve().parent
while True:
if d not in seen:
search_dirs.append(d)
seen.add(d)
parent = d.parent
if parent == d:
break
d = parent
cwd = Path.cwd()
if cwd not in seen:
search_dirs.append(cwd)
dotenv_path: Optional[Path] = None
for directory in search_dirs:
candidate = directory / ".env"
if candidate.is_file():
dotenv_path = candidate
break
if dotenv_path is None:
return
try:
with dotenv_path.open(encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
key, _, value = line.partition("=")
key = key.strip()
value = value.strip()
if len(value) >= 2 and value[0] in ('"', "'") and value[0] == value[-1]:
value = value[1:-1]
if key and key not in os.environ:
os.environ[key] = value
except OSError:
pass
@property
def stdin(self) -> Optional[asyncio.StreamWriter]:
return self._process.stdin if self._process else None
@property
def stdout(self) -> Optional[asyncio.StreamReader]:
return self._process.stdout if self._process else None
@property
def stderr(self) -> Optional[asyncio.StreamReader]:
return self._process.stderr if self._process else None
def _find_or_download_engine(self, schema_path: Optional[str] = None) -> str:
local_binary = self._find_workspace_binary(schema_path)
if local_binary:
return local_binary
path_binary = shutil.which(_BINARY_NAME)
if path_binary:
return path_binary
legacy_binary = shutil.which(_LEGACY_BINARY_NAME)
if legacy_binary:
return legacy_binary
cache_dir = self._get_cache_dir()
cached_binary = cache_dir / _BINARY_NAME
if cached_binary.exists():
return str(cached_binary)
print(f"Downloading nautilus v{NAUTILUS_ENGINE_VERSION}...")
self._download_engine(cache_dir)
if not cached_binary.exists():
raise FileNotFoundError(
f"Could not find or download the nautilus binary.\n"
f"Install it manually with: cargo install nautilus-cli\n"
f"or add the compiled binary to your PATH."
)
return str(cached_binary)
def _find_workspace_binary(self, schema_path: Optional[str]) -> Optional[str]:
for root in self._search_roots(schema_path):
for build_dir in ("debug", "release"):
for binary_name in (_BINARY_NAME, _LEGACY_BINARY_NAME):
candidate = root / "target" / build_dir / binary_name
if candidate.is_file() and os.access(candidate, os.X_OK):
return str(candidate)
return None
@staticmethod
def _search_roots(schema_path: Optional[str]) -> list[Path]:
roots: list[Path] = []
seen: set[Path] = set()
if schema_path:
current = Path(schema_path).resolve().parent
while True:
if current not in seen:
roots.append(current)
seen.add(current)
parent = current.parent
if parent == current:
break
current = parent
cwd = Path.cwd()
if cwd not in seen:
roots.append(cwd)
return roots
def _get_cache_dir(self) -> Path:
if platform.system() == "Windows":
cache_base = Path(os.environ.get("LOCALAPPDATA", str(Path.home() / ".nautilus")))
else:
cache_base = Path.home() / ".nautilus"
cache_dir = cache_base / "bin" / NAUTILUS_ENGINE_VERSION
cache_dir.mkdir(parents=True, exist_ok=True)
return cache_dir
def _download_engine(self, cache_dir: Path) -> None:
system = platform.system()
machine = platform.machine().lower()
if system == "Windows":
platform_suffix = "x86_64-pc-windows-msvc.exe"
elif system == "Darwin":
platform_suffix = "x86_64-apple-darwin" if machine == "x86_64" else "aarch64-apple-darwin"
elif system == "Linux":
platform_suffix = "x86_64-unknown-linux-gnu"
else:
raise RuntimeError(f"Unsupported platform: {system}")
url = (
f"https://github.com/{GITHUB_REPO}/releases/download/"
f"v{NAUTILUS_ENGINE_VERSION}/nautilus-{platform_suffix}"
)
target_path = cache_dir / _BINARY_NAME
try:
print(f"Downloading from {url}...")
urllib.request.urlretrieve(url, target_path)
if system != "Windows":
os.chmod(target_path, 0o755)
print(f"Downloaded to {target_path}")
except Exception as e:
print(f"Warning: Auto-download failed: {e}", file=sys.stderr)
print("Please install manually: cargo install nautilus-cli", file=sys.stderr)
raise