from __future__ import annotations
import argparse
import json
import os
from pathlib import Path
import shlex
import shutil
import subprocess
import sys
import tempfile
def parse_mount(value: str) -> tuple[Path, str, str]:
parts = value.split(":")
if len(parts) != 3:
raise argparse.ArgumentTypeError("mount must be HOST:GUEST:MODE")
host, guest, mode = parts
if not guest.startswith("/"):
raise argparse.ArgumentTypeError("guest path must be absolute")
if mode not in {"ro", "rw"}:
raise argparse.ArgumentTypeError("mode must be ro or rw")
return Path(host).resolve(), guest.rstrip("/") or "/", mode
def parse_env(value: str) -> tuple[str, str]:
if "=" not in value or value.startswith("="):
raise argparse.ArgumentTypeError("env must be NAME=value")
key, val = value.split("=", 1)
validate_env_name(key)
return key, val
def validate_env_name(value: str) -> None:
if not value:
raise argparse.ArgumentTypeError("env name must be non-empty")
if not (value[0].isalpha() or value[0] == "_"):
raise argparse.ArgumentTypeError(f"env name must start with a letter or _: {value}")
if not all(ch.isalnum() or ch == "_" for ch in value):
raise argparse.ArgumentTypeError(f"env name has invalid characters: {value}")
def validate_host_pattern(value: str) -> None:
if not value:
raise argparse.ArgumentTypeError("host pattern must be non-empty")
if "://" in value or "/" in value or "\\" in value:
raise argparse.ArgumentTypeError(f"host pattern must not include scheme or path: {value}")
allowed = set("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789.-*:")
if any(ch not in allowed for ch in value):
raise argparse.ArgumentTypeError(f"host pattern has invalid characters: {value}")
def parse_tcp_map(value: str) -> str:
if "=" not in value:
raise argparse.ArgumentTypeError("tcp map must be GUEST_HOST[:PORT]=UPSTREAM_HOST:PORT")
guest, upstream = value.split("=", 1)
if not guest or not upstream or ":" not in upstream:
raise argparse.ArgumentTypeError("tcp map must be GUEST_HOST[:PORT]=UPSTREAM_HOST:PORT")
validate_host_pattern(guest)
validate_host_pattern(upstream)
return value
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="rho agent-run")
parser.add_argument("--sandbox", action="store_true", help="run Pi inside Gondolin")
parser.add_argument("--user", required=True)
parser.add_argument("--provider")
parser.add_argument("--model")
parser.add_argument("--thinking")
parser.add_argument("--prompt", required=True)
parser.add_argument("--extension", action="append", default=[])
parser.add_argument("--mount", action="append", default=[], type=parse_mount)
parser.add_argument("--allow-host", action="append", default=[])
parser.add_argument("--tcp-map", action="append", default=[], type=parse_tcp_map)
parser.add_argument("--env", action="append", default=[], type=parse_env)
parser.add_argument("--policy-out", type=Path)
parser.add_argument("--auth-json-source", type=Path)
parser.add_argument("--users-root", default="/pi-users")
parser.add_argument("--control-root", default="/control")
parser.add_argument("--repo-guest-root", default="/rho")
parser.add_argument("--guest-cwd", default="/work")
parser.add_argument("--dry-run", action="store_true")
return parser
def validate_agent_policy(args: argparse.Namespace, repo_root: Path, mounts: list[tuple[Path, str, str]]) -> dict:
seen_guest: set[str] = set()
seen_env: set[str] = {"RHO_USERS_ROOT", "RHO_CONTROL_ROOT", "RHO_PI_USE_GLOBAL_AGENT_DIR", "PI_OFFLINE"}
policy_mounts = []
for host, guest, mode in mounts:
if not host.exists() or not host.is_dir():
raise ValueError(f"mount host path must be an existing directory: {host}")
if guest in seen_guest:
raise ValueError(f"duplicate guest mount path: {guest}")
seen_guest.add(guest)
policy_mounts.append({"host": str(host), "guest": guest, "mode": mode})
for host in args.allow_host:
validate_host_pattern(host)
for key, _val in args.env:
if key in seen_env:
raise ValueError(f"env override is reserved for rho agent sandbox policy: {key}")
seen_env.add(key)
policy = {
"version": 1,
"agent_sandbox_policy": {
"runner": "gondolin",
"repo_root": str(repo_root),
"repo_guest_root": args.repo_guest_root,
"guest_cwd": args.guest_cwd,
"mounts": policy_mounts,
"network": {
"default_deny": True,
"dns": "synthetic",
"allow_hosts": list(args.allow_host),
"tcp_maps": list(args.tcp_map),
"websockets": "disabled",
},
"env": {
"fixed": {
"RHO_USERS_ROOT": args.users_root,
"RHO_CONTROL_ROOT": args.control_root,
"RHO_PI_USE_GLOBAL_AGENT_DIR": "0",
"PI_OFFLINE": "0",
},
"additional": {key: val for key, val in args.env},
},
},
}
if args.policy_out:
args.policy_out.parent.mkdir(parents=True, exist_ok=True)
args.policy_out.write_text(json.dumps(policy, indent=2) + "\n")
return policy
def host_command(args: argparse.Namespace, repo_root: Path) -> list[str]:
cmd = [str(repo_root / "rho")]
for extension in args.extension:
cmd.extend(["--extension", extension])
if args.provider:
cmd.extend(["--provider", args.provider])
if args.model:
cmd.extend(["--model", args.model])
if args.thinking:
cmd.extend(["--thinking", args.thinking])
cmd.extend([args.user, args.prompt])
return cmd
def map_host_path_to_guest(
host_path: str, repo_root: Path, repo_guest_root: str, mounts: list[tuple[Path, str, str]]
) -> str:
candidate = Path(host_path).resolve()
options: list[tuple[Path, str]] = [(repo_root.resolve(), repo_guest_root)]
for host, guest, _mode in mounts:
options.append((host.resolve(), guest))
best: tuple[Path, str] | None = None
for host_prefix, guest_prefix in options:
try:
candidate.relative_to(host_prefix)
except ValueError:
continue
if best is None or len(str(host_prefix)) > len(str(best[0])):
best = (host_prefix, guest_prefix)
if best is None:
return host_path
relative = candidate.relative_to(best[0]).as_posix()
if relative == ".":
return best[1]
return f"{best[1].rstrip('/')}/{relative}"
def sandbox_command(
args: argparse.Namespace,
repo_root: Path,
mounts: list[tuple[Path, str, str]],
) -> str:
cmd = [f"{args.repo_guest_root}/rho"]
for extension in args.extension:
cmd.extend(
[
"--extension",
map_host_path_to_guest(extension, repo_root, args.repo_guest_root, mounts),
]
)
if args.provider:
cmd.extend(["--provider", args.provider])
if args.model:
cmd.extend(["--model", args.model])
if args.thinking:
cmd.extend(["--thinking", args.thinking])
cmd.extend([args.user, args.prompt])
return f"cd {shlex.quote(args.guest_cwd)} && exec {shlex.join(cmd)}"
def gondolin_bin(repo_root: Path) -> Path:
return repo_root / "repos" / "gondolin" / "host" / "bin" / "gondolin.ts"
def run_sandbox(args: argparse.Namespace, repo_root: Path) -> int:
gbin = gondolin_bin(repo_root)
if not gbin.is_file():
print(f"gondolin CLI not found: {gbin}", file=sys.stderr)
return 1
mounts: list[tuple[Path, str, str]] = [
(repo_root.resolve(), args.repo_guest_root.rstrip("/") or "/", "ro"),
*args.mount,
]
cleanup_dirs: list[Path] = []
if args.auth_json_source:
source = args.auth_json_source.expanduser().resolve()
if not source.is_file():
print(f"auth json not found: {source}", file=sys.stderr)
return 2
staged = Path(tempfile.mkdtemp(prefix="rho-agent-auth-"))
cleanup_dirs.append(staged)
shutil.copy2(source, staged / "auth.json")
mounts.append((staged, f"{args.users_root.rstrip('/')}/{args.user}/agent", "rw"))
validate_agent_policy(args, repo_root, mounts)
command = ["node", str(gbin), "exec"]
for host, guest, mode in mounts:
spec = f"{host}:{guest}"
if mode == "ro":
spec += ":ro"
command.extend(["--mount-hostfs", spec])
for host in args.allow_host:
command.extend(["--allow-host", host])
for mapping in args.tcp_map:
command.extend(["--tcp-map", mapping])
command.extend(["--dns", "synthetic", "--disable-websockets"])
command.extend(["--env", f"RHO_USERS_ROOT={args.users_root}"])
command.extend(["--env", f"RHO_CONTROL_ROOT={args.control_root}"])
command.extend(["--env", "RHO_PI_USE_GLOBAL_AGENT_DIR=0"])
command.extend(["--env", "PI_OFFLINE=0"])
for key, val in args.env:
command.extend(["--env", f"{key}={val}"])
command.extend(["--cwd", args.guest_cwd, "--", "/bin/sh", "-lc", sandbox_command(args, repo_root, mounts)])
if args.dry_run:
print(shlex.join(command))
return 0
try:
return subprocess.run(command, cwd=repo_root).returncode
finally:
for directory in cleanup_dirs:
shutil.rmtree(directory, ignore_errors=True)
def main() -> int:
args = build_parser().parse_args()
repo_root = Path(__file__).resolve().parent.parent
if args.sandbox:
return run_sandbox(args, repo_root)
return subprocess.run(host_command(args, repo_root), cwd=repo_root).returncode
if __name__ == "__main__":
raise SystemExit(main())