rho-cli 0.1.25

Rho CLI tools for encrypted agent collaboration, dataset publishing, controlled runs, and result release workflows
Documentation
#!/usr/bin/env python3

from __future__ import annotations

import argparse
import json
import os
from pathlib import Path
import shlex
import shutil
import subprocess
import sys
import tempfile


def parse_mount(value: str) -> tuple[Path, str, str]:
    """Parse a ``--mount`` argument of the form ``HOST:GUEST:MODE``.

    The host path is resolved to an absolute path, trailing slashes are
    stripped from the guest path (a bare ``/`` is kept as ``/``), and the mode
    is returned unchanged.

    Raises:
        argparse.ArgumentTypeError: if the value does not consist of exactly
            three ``:``-separated fields, the host path is empty, the guest
            path is not absolute, or the mode is neither ``ro`` nor ``rw``.
    """
    parts = value.split(":")
    if len(parts) != 3:
        raise argparse.ArgumentTypeError("mount must be HOST:GUEST:MODE")
    host, guest, mode = parts
    if not host:
        # Path("").resolve() is the current working directory; refuse to
        # mount it implicitly when the HOST field was simply left blank.
        raise argparse.ArgumentTypeError("host path must be non-empty")
    if not guest.startswith("/"):
        raise argparse.ArgumentTypeError("guest path must be absolute")
    if mode not in {"ro", "rw"}:
        raise argparse.ArgumentTypeError("mode must be ro or rw")
    return Path(host).resolve(), guest.rstrip("/") or "/", mode


def parse_env(value: str) -> tuple[str, str]:
    """Split a ``--env`` argument of the form ``NAME=value`` into its parts.

    Only the first ``=`` separates name from value, so the value itself may
    contain further ``=`` characters (and may be empty). The name is checked
    with ``validate_env_name``.

    Raises:
        argparse.ArgumentTypeError: if there is no ``=``, the name part is
            empty, or the name fails validation.
    """
    name, separator, payload = value.partition("=")
    missing_separator = separator == ""
    missing_name = name == ""
    if missing_separator or missing_name:
        raise argparse.ArgumentTypeError("env must be NAME=value")
    validate_env_name(name)
    return name, payload


def validate_env_name(value: str) -> None:
    """Check that *value* is an acceptable environment variable name.

    A name must be non-empty, start with an ASCII letter or ``_``, and
    contain only ASCII letters, ASCII digits and ``_``.

    Raises:
        argparse.ArgumentTypeError: if any of the rules above is violated.
    """
    if not value:
        raise argparse.ArgumentTypeError("env name must be non-empty")
    first = value[0]
    # str.isalpha()/isalnum() are Unicode-aware and would accept characters
    # such as "é" or "²"; the isascii() guard limits names to plain ASCII.
    if not (first == "_" or (first.isascii() and first.isalpha())):
        raise argparse.ArgumentTypeError(f"env name must start with a letter or _: {value}")
    if not all(ch == "_" or (ch.isascii() and ch.isalnum()) for ch in value):
        raise argparse.ArgumentTypeError(f"env name has invalid characters: {value}")


def validate_host_pattern(value: str) -> None:
    """Check that *value* is a bare host pattern.

    The pattern must be non-empty, must not contain a URL scheme, ``/`` or a
    backslash, and may only use ASCII letters, digits and the characters
    ``.``, ``-``, ``*`` and ``:``.

    Raises:
        argparse.ArgumentTypeError: if the pattern breaks any of these rules.
    """
    if value == "":
        raise argparse.ArgumentTypeError("host pattern must be non-empty")

    forbidden_fragments = ("://", "/", "\\")
    if any(fragment in value for fragment in forbidden_fragments):
        raise argparse.ArgumentTypeError(f"host pattern must not include scheme or path: {value}")

    lowercase = "abcdefghijklmnopqrstuvwxyz"
    permitted = frozenset(lowercase + lowercase.upper() + "0123456789" + ".-*:")
    if not permitted.issuperset(value):
        raise argparse.ArgumentTypeError(f"host pattern has invalid characters: {value}")


def parse_tcp_map(value: str) -> str:
    """Validate a ``--tcp-map`` argument and return it unchanged.

    The expected shape is ``GUEST_HOST[:PORT]=UPSTREAM_HOST:PORT``: both sides
    of the first ``=`` must be non-empty, the upstream side must contain a
    ``:``, and each side must pass ``validate_host_pattern``.

    Raises:
        argparse.ArgumentTypeError: if the shape is wrong or either side is
            not a valid host pattern.
    """
    usage = "tcp map must be GUEST_HOST[:PORT]=UPSTREAM_HOST:PORT"
    guest_side, separator, upstream_side = value.partition("=")
    if not separator:
        raise argparse.ArgumentTypeError(usage)
    well_formed = bool(guest_side) and bool(upstream_side) and ":" in upstream_side
    if not well_formed:
        raise argparse.ArgumentTypeError(usage)
    for side in (guest_side, upstream_side):
        validate_host_pattern(side)
    return value


def build_parser() -> argparse.ArgumentParser:
    """Create the argument parser for ``rho agent-run``.

    Options are registered in the order listed below. Repeatable options
    (``--extension``, ``--mount``, ``--allow-host``, ``--tcp-map``, ``--env``)
    accumulate into lists; ``--mount``, ``--tcp-map`` and ``--env`` are
    converted by their dedicated parsing functions.
    """
    option_table = [
        ("--sandbox", {"action": "store_true", "help": "run Pi inside Gondolin"}),
        ("--user", {"required": True}),
        ("--provider", {}),
        ("--model", {}),
        ("--thinking", {}),
        ("--prompt", {"required": True}),
        ("--extension", {"action": "append", "default": []}),
        ("--mount", {"action": "append", "default": [], "type": parse_mount}),
        ("--allow-host", {"action": "append", "default": []}),
        ("--tcp-map", {"action": "append", "default": [], "type": parse_tcp_map}),
        ("--env", {"action": "append", "default": [], "type": parse_env}),
        ("--policy-out", {"type": Path}),
        ("--auth-json-source", {"type": Path}),
        ("--users-root", {"default": "/pi-users"}),
        ("--control-root", {"default": "/control"}),
        ("--repo-guest-root", {"default": "/rho"}),
        ("--guest-cwd", {"default": "/work"}),
        ("--dry-run", {"action": "store_true"}),
    ]
    agent_parser = argparse.ArgumentParser(prog="rho agent-run")
    for flag, options in option_table:
        agent_parser.add_argument(flag, **options)
    return agent_parser


def validate_agent_policy(args: argparse.Namespace, repo_root: Path, mounts: list[tuple[Path, str, str]]) -> dict:
    """Validate the sandbox inputs and build the agent sandbox policy document.

    Checks are performed in this order: mounts, allowed hosts, extra
    environment variables. If ``args.policy_out`` is set, the policy is also
    written there as indented JSON (parent directories are created).

    Args:
        args: parsed command-line options; reads ``allow_host``, ``tcp_map``,
            ``env``, ``users_root``, ``control_root``, ``repo_guest_root``,
            ``guest_cwd`` and ``policy_out``.
        repo_root: host path recorded as the policy's ``repo_root``.
        mounts: ``(host, guest, mode)`` triples to record in the policy.

    Returns:
        The policy as a JSON-serializable dict.

    Raises:
        ValueError: if a mount's host path is not an existing directory, a
            guest mount path is repeated, an extra env name collides with one
            of the fixed sandbox variables, or an extra env name is repeated.
        argparse.ArgumentTypeError: if an allowed host fails
            ``validate_host_pattern``.
    """
    # Variables the sandbox always sets itself; users may not override them.
    fixed_env = {
        "RHO_USERS_ROOT": args.users_root,
        "RHO_CONTROL_ROOT": args.control_root,
        "RHO_PI_USE_GLOBAL_AGENT_DIR": "0",
        "PI_OFFLINE": "0",
    }

    seen_guest: set[str] = set()
    policy_mounts = []
    for host, guest, mode in mounts:
        # is_dir() is False for paths that do not exist.
        if not host.is_dir():
            raise ValueError(f"mount host path must be an existing directory: {host}")
        if guest in seen_guest:
            raise ValueError(f"duplicate guest mount path: {guest}")
        seen_guest.add(guest)
        policy_mounts.append({"host": str(host), "guest": guest, "mode": mode})

    for host in args.allow_host:
        validate_host_pattern(host)

    additional_env: dict[str, str] = {}
    for key, val in args.env:
        if key in fixed_env:
            raise ValueError(f"env override is reserved for rho agent sandbox policy: {key}")
        # A repeated user-supplied name is a different mistake from touching a
        # reserved one, so it gets its own message.
        if key in additional_env:
            raise ValueError(f"duplicate env name: {key}")
        additional_env[key] = val

    policy = {
        "version": 1,
        "agent_sandbox_policy": {
            "runner": "gondolin",
            "repo_root": str(repo_root),
            "repo_guest_root": args.repo_guest_root,
            "guest_cwd": args.guest_cwd,
            "mounts": policy_mounts,
            "network": {
                "default_deny": True,
                "dns": "synthetic",
                "allow_hosts": list(args.allow_host),
                "tcp_maps": list(args.tcp_map),
                "websockets": "disabled",
            },
            "env": {
                "fixed": fixed_env,
                "additional": additional_env,
            },
        },
    }
    if args.policy_out:
        args.policy_out.parent.mkdir(parents=True, exist_ok=True)
        args.policy_out.write_text(json.dumps(policy, indent=2) + "\n", encoding="utf-8")
    return policy


def host_command(args: argparse.Namespace, repo_root: Path) -> list[str]:
    """Build the argv for running ``rho`` directly on the host.

    The result starts with ``<repo_root>/rho``, followed by one
    ``--extension`` pair per extension, then ``--provider``, ``--model`` and
    ``--thinking`` (each only when set), and finally the user and the prompt.
    """
    argv = [str(repo_root / "rho")]
    argv += [token for ext in args.extension for token in ("--extension", ext)]

    optional_flags = (
        ("--provider", args.provider),
        ("--model", args.model),
        ("--thinking", args.thinking),
    )
    for flag, setting in optional_flags:
        if setting:
            argv += [flag, setting]

    argv += [args.user, args.prompt]
    return argv


def map_host_path_to_guest(
    host_path: str, repo_root: Path, repo_guest_root: str, mounts: list[tuple[Path, str, str]]
) -> str:
    """Translate a host path into the path it has inside the guest.

    The resolved *host_path* is compared against the resolved repo root and
    every resolved mount host directory. Among the directories that contain
    it, the one with the longest path string is used (the earliest candidate
    wins ties, the repo root being first). If none contains it, *host_path*
    is returned exactly as given.
    """
    target = Path(host_path).resolve()

    def contains_target(directory: Path) -> bool:
        # relative_to() raises ValueError when target is not under directory.
        try:
            target.relative_to(directory)
        except ValueError:
            return False
        return True

    roots: list[tuple[Path, str]] = [(repo_root.resolve(), repo_guest_root)]
    roots.extend((mount_host.resolve(), mount_guest) for mount_host, mount_guest, _ in mounts)

    matches = [pair for pair in roots if contains_target(pair[0])]
    if not matches:
        return host_path

    # max() keeps the first of several equally long candidates.
    host_root, guest_root = max(matches, key=lambda pair: len(str(pair[0])))
    tail = target.relative_to(host_root).as_posix()
    if tail == ".":
        return guest_root
    return guest_root.rstrip("/") + "/" + tail


def sandbox_command(
    args: argparse.Namespace,
    repo_root: Path,
    mounts: list[tuple[Path, str, str]],
) -> str:
    """Build the shell command line that runs ``rho`` inside the guest.

    The command changes into ``args.guest_cwd`` and then ``exec``s the guest
    copy of ``rho`` with the same options ``host_command`` would pass, except
    that each extension path is translated with ``map_host_path_to_guest``.
    Every word is shell-quoted.

    Args:
        args: parsed command-line options.
        repo_root: host path of the repository.
        mounts: ``(host, guest, mode)`` triples used to translate extension
            paths.

    Returns:
        A single string intended to be passed to ``sh -c``.
    """
    # Strip trailing slashes so that "/rho/" or "/" does not yield
    # "/rho//rho" or "//rho"; run_sandbox mounts the repo at the same
    # normalized location.
    guest_root = args.repo_guest_root.rstrip("/")
    cmd = [f"{guest_root}/rho"]
    for extension in args.extension:
        cmd.extend(
            [
                "--extension",
                map_host_path_to_guest(extension, repo_root, guest_root or "/", mounts),
            ]
        )
    if args.provider:
        cmd.extend(["--provider", args.provider])
    if args.model:
        cmd.extend(["--model", args.model])
    if args.thinking:
        cmd.extend(["--thinking", args.thinking])
    cmd.extend([args.user, args.prompt])
    return f"cd {shlex.quote(args.guest_cwd)} && exec {shlex.join(cmd)}"


def gondolin_bin(repo_root: Path) -> Path:
    """Return the path of the Gondolin CLI script below *repo_root*."""
    return repo_root.joinpath("repos", "gondolin", "host", "bin", "gondolin.ts")


def run_sandbox(args: argparse.Namespace, repo_root: Path) -> int:
    """Run the agent inside the Gondolin sandbox.

    Assembles the mounts (the repository read-only, user mounts, and an
    optional staged copy of the auth JSON file), validates the resulting
    policy, builds the ``node <gondolin.ts> exec ...`` command line and either
    prints it (``--dry-run``) or executes it.

    Args:
        args: parsed command-line options.
        repo_root: host path of the repository.

    Returns:
        ``1`` if the Gondolin CLI script is missing, ``2`` if the auth JSON
        source is not a file, ``0`` after a dry run, otherwise the exit status
        of the sandbox process.

    Raises:
        Whatever ``validate_agent_policy`` raises for an invalid policy.
    """
    gbin = gondolin_bin(repo_root)
    if not gbin.is_file():
        print(f"gondolin CLI not found: {gbin}", file=sys.stderr)
        return 1

    mounts: list[tuple[Path, str, str]] = [
        (repo_root.resolve(), args.repo_guest_root.rstrip("/") or "/", "ro"),
        *args.mount,
    ]

    auth_source: Path | None = None
    if args.auth_json_source:
        auth_source = args.auth_json_source.expanduser().resolve()
        if not auth_source.is_file():
            print(f"auth json not found: {auth_source}", file=sys.stderr)
            return 2

    cleanup_dirs: list[Path] = []
    # Everything from staging onwards sits inside the try block so the staged
    # copy of the credentials file is removed on every exit path: normal run,
    # dry run, validation failure, or a failed copy.
    try:
        if auth_source is not None:
            staged = Path(tempfile.mkdtemp(prefix="rho-agent-auth-"))
            cleanup_dirs.append(staged)
            shutil.copy2(auth_source, staged / "auth.json")
            mounts.append((staged, f"{args.users_root.rstrip('/')}/{args.user}/agent", "rw"))

        validate_agent_policy(args, repo_root, mounts)

        command = ["node", str(gbin), "exec"]
        for host, guest, mode in mounts:
            spec = f"{host}:{guest}"
            # Only read-only mounts carry an explicit mode suffix.
            if mode == "ro":
                spec += ":ro"
            command.extend(["--mount-hostfs", spec])
        for host in args.allow_host:
            command.extend(["--allow-host", host])
        for mapping in args.tcp_map:
            command.extend(["--tcp-map", mapping])
        command.extend(["--dns", "synthetic", "--disable-websockets"])
        command.extend(["--env", f"RHO_USERS_ROOT={args.users_root}"])
        command.extend(["--env", f"RHO_CONTROL_ROOT={args.control_root}"])
        command.extend(["--env", "RHO_PI_USE_GLOBAL_AGENT_DIR=0"])
        command.extend(["--env", "PI_OFFLINE=0"])
        for key, val in args.env:
            command.extend(["--env", f"{key}={val}"])
        command.extend(
            ["--cwd", args.guest_cwd, "--", "/bin/sh", "-lc", sandbox_command(args, repo_root, mounts)]
        )

        if args.dry_run:
            # The printed command may name the staged auth directory, which is
            # removed again as soon as this function returns.
            print(shlex.join(command))
            return 0
        return subprocess.run(command, cwd=repo_root).returncode
    finally:
        for directory in cleanup_dirs:
            shutil.rmtree(directory, ignore_errors=True)


def main() -> int:
    """Parse the command line and run the agent, returning its exit status.

    The repository root is taken to be the directory two levels above this
    file. With ``--sandbox`` the run is delegated to ``run_sandbox``;
    otherwise the ``host_command`` argv is executed directly from the
    repository root.
    """
    options = build_parser().parse_args()
    script_path = Path(__file__).resolve()
    repo_root = script_path.parent.parent
    if not options.sandbox:
        completed = subprocess.run(host_command(options, repo_root), cwd=repo_root)
        return completed.returncode
    return run_sandbox(options, repo_root)


if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())