from __future__ import annotations
import argparse
import importlib.util
import json
import os
from pathlib import Path
import shlex
import shutil
import subprocess
import sys
import tempfile
import yaml
SCRUB_MODULE_PATH = Path(__file__).with_name("rho_fixture_scrub.py")


def load_scrub_module():
    """Import and return the fixture scrubber module located next to this script.

    The module is loaded directly from ``SCRUB_MODULE_PATH`` (not via a package
    import) and is executed on every call.

    Raises:
        SystemExit: if the scrubber file is missing or cannot be loaded.
    """
    # spec_from_file_location() builds a spec even for a non-existent file, and
    # exec_module() would then fail with a bare FileNotFoundError; check first so
    # the user sees the intended message.
    if not SCRUB_MODULE_PATH.is_file():
        raise SystemExit(f"unable to load scrubber module: {SCRUB_MODULE_PATH}")
    spec = importlib.util.spec_from_file_location("rho_fixture_scrub", SCRUB_MODULE_PATH)
    if spec is None or spec.loader is None:
        raise SystemExit(f"unable to load scrubber module: {SCRUB_MODULE_PATH}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module
def parse_mount(value: str) -> dict[str, str]:
    """Parse a ``--mount HOST:GUEST:MODE`` command-line value.

    Returns a dict with ``host``, ``guest`` and ``mode`` keys. ``HOST`` must be
    non-empty, ``GUEST`` must be an absolute path and ``MODE`` must be ``ro`` or
    ``rw``.

    Raises:
        argparse.ArgumentTypeError: for any malformed value, so argparse reports
        it as a usage error.
    """
    parts = value.split(":")
    if len(parts) != 3:
        raise argparse.ArgumentTypeError(
            "mounts must be HOST:GUEST:MODE with MODE ro or rw"
        )
    host, guest, mode = parts
    # An empty HOST would be passed through as an empty path; Path("") means
    # "current directory", which is almost certainly not what was intended.
    if not host:
        raise argparse.ArgumentTypeError("mount host path must not be empty")
    if mode not in {"ro", "rw"}:
        raise argparse.ArgumentTypeError("mount mode must be ro or rw")
    if not guest.startswith("/"):
        raise argparse.ArgumentTypeError("guest mount path must be absolute")
    return {"host": host, "guest": guest, "mode": mode}
def parse_env_assignment(value: str) -> tuple[str, str]:
    """Split a ``NAME=value`` argument into ``(NAME, value)``.

    Only the first ``=`` separates name from value, so the value may itself
    contain ``=``. A missing ``=`` or an empty NAME is rejected with
    ``argparse.ArgumentTypeError``.
    """
    name, separator, assigned = value.partition("=")
    if not separator or not name:
        raise argparse.ArgumentTypeError("env assignments must be NAME=value")
    return name, assigned
def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser for ``rho agent-run``.

    Options default to "unset" (None, an empty list, or False) so callers can
    fall back to values from the ``--config`` YAML file. ``--prompt`` and
    ``--prompt-file`` are mutually exclusive.
    """
    parser = argparse.ArgumentParser(prog="rho agent-run")
    add = parser.add_argument

    # Agent identity and model selection.
    add(
        "--config",
        type=Path,
        help="YAML config describing prompt, user, extensions, and sandbox settings",
    )
    add("--user", help="Agent user name for Pi")
    add("--provider", help="Pi provider name, for example openai-codex")
    add("--model", help="Pi model id or pattern, for example gpt-5.4")
    add(
        "--thinking",
        choices=["off", "minimal", "low", "medium", "high", "xhigh"],
        help="Pi thinking level",
    )

    # Prompt source: inline text or a file, never both.
    prompt_source = parser.add_mutually_exclusive_group(required=False)
    prompt_source.add_argument("--prompt", help="Inline prompt text")
    prompt_source.add_argument(
        "--prompt-file", type=Path, help="Read prompt text from a file"
    )

    add(
        "-e",
        "--extension",
        action="append",
        default=[],
        help="Pi extension path, repeatable",
    )

    # Sandbox configuration.
    add(
        "--sandbox",
        action="store_true",
        help="Run Pi through the Gondolin-backed rho run flow",
    )
    add(
        "--repo-mount-mode",
        choices=["ro", "rw"],
        default=None,
        help="How to mount the repo root in sandbox mode",
    )
    add(
        "--repo-guest-root",
        default=None,
        help="Guest path for the mounted repo root in sandbox mode",
    )
    add(
        "--guest-cwd",
        default=None,
        help="Guest working directory for sandbox mode; defaults to --repo-guest-root",
    )
    add(
        "--mount",
        action="append",
        default=[],
        type=parse_mount,
        help="Additional sandbox mounts as HOST:GUEST:MODE",
    )
    add(
        "--allow-host",
        action="append",
        default=[],
        help="Allowlisted outbound host for sandbox mode, repeatable",
    )
    add(
        "--env",
        action="append",
        default=[],
        type=parse_env_assignment,
        help="Extra sandbox environment variable NAME=value, repeatable",
    )
    add(
        "--auth-json-source",
        type=Path,
        help="Path to a Pi auth.json file to stage into the sandbox agent directory",
    )

    # Diagnostics forwarded to `rho run`.
    add(
        "--dry-run",
        action="store_true",
        help="Print the generated sandbox plan instead of executing it",
    )
    add(
        "--print-config",
        action="store_true",
        help="Ask rho run to print the resolved config before execution",
    )
    return parser
def load_yaml_config(path: Path) -> dict:
    """Load a YAML file that must contain a top-level mapping.

    An empty (or otherwise falsy) document is treated as ``{}``. Any other
    non-mapping document aborts with ``SystemExit``.
    """
    with open(path, "r", encoding="utf-8") as stream:
        document = yaml.safe_load(stream)
    document = document or {}
    if isinstance(document, dict):
        return document
    raise SystemExit(f"config must be a YAML object: {path}")
def run_process(cmd: list[str], cwd: Path) -> subprocess.CompletedProcess[str]:
    """Run *cmd* in *cwd*, capturing stdout/stderr as text.

    The child's exit status is returned in the CompletedProcess rather than
    raised. Output is decoded as UTF-8 with ``errors="replace"``. Without this,
    the locale's codec is used in strict mode, and a single undecodable byte
    from the agent would raise UnicodeDecodeError and lose the whole result.
    """
    return subprocess.run(
        cmd,
        cwd=cwd,
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )
def resolve_path(path: str | Path, base_dir: Path) -> str:
candidate = Path(path)
if candidate.is_absolute():
return str(candidate)
return str((base_dir / candidate).resolve())
def read_text(path: Path) -> str:
    """Return the UTF-8 text of *path* with leading/trailing whitespace removed."""
    with path.open(encoding="utf-8") as stream:
        contents = stream.read()
    return contents.strip()
def fixture_settings(config: dict, base_dir: Path) -> dict:
    """Normalize the optional ``fixture`` section of the config.

    Returns a dict with:
      * ``mode``: ``online`` (default), ``record`` or ``replay``;
      * ``root``: absolute fixture root resolved against *base_dir*, or ``""``;
      * ``key``: fixture sub-directory name under ``root``;
      * ``outputs``: non-empty, stripped output paths (a single string is
        accepted as a one-element list).

    Raises:
        SystemExit: on an unknown mode. In record/replay mode it also raises
        when ``root`` or ``key`` is missing, or when ``key`` is absolute or
        contains ``..``. The fixture directory is ``root / key``, and record
        mode deletes it recursively, so the key must stay under ``root``.
    """
    fixture_cfg = dict(config.get("fixture") or {})

    def as_text(value: object) -> str:
        # A bare YAML key (e.g. ``key:``) loads as None; treat it as empty
        # rather than letting str() turn it into the literal text "None".
        return "" if value is None else str(value).strip()

    mode = as_text(fixture_cfg.get("mode")) or "online"
    if mode not in {"online", "record", "replay"}:
        raise SystemExit(f"unsupported fixture mode: {mode}")
    root = fixture_cfg.get("root")
    key = as_text(fixture_cfg.get("key"))
    raw_outputs = fixture_cfg.get("outputs") or []
    if isinstance(raw_outputs, str):
        # Iterating a plain string would yield one "output" per character.
        raw_outputs = [raw_outputs]
    outputs = [text for text in map(as_text, raw_outputs) if text]
    settings = {
        "mode": mode,
        "root": resolve_path(root, base_dir) if root else "",
        "key": key,
        "outputs": outputs,
    }
    if mode in {"record", "replay"}:
        if not settings["root"]:
            raise SystemExit(f"fixture.root is required when fixture mode is {mode}")
        if not settings["key"]:
            raise SystemExit(f"fixture.key is required when fixture mode is {mode}")
        key_path = Path(key)
        if key_path.is_absolute() or ".." in key_path.parts:
            raise SystemExit(f"fixture.key must be a relative path without '..': {key}")
    return settings
def fixture_dir(fixture: dict) -> Path:
    """Return the directory holding one fixture: ``<resolved root>/<key>``."""
    resolved_root = Path(fixture["root"]).resolve()
    return resolved_root.joinpath(fixture["key"])
def fixture_meta_path(fixture: dict) -> Path:
    """Return the path of the fixture's ``result.json`` metadata file."""
    directory = fixture_dir(fixture)
    return directory.joinpath("result.json")
def fixture_file_path(fixture: dict, output: str) -> Path:
    """Return where a recorded copy of *output* is stored inside the fixture.

    The output path, with leading ``/`` stripped, is placed under
    ``<fixture_dir>/files/``.

    Raises:
        SystemExit: if *output* names the root (``/``, ``""``, ``.``), or if it
        would escape the ``files`` directory via ``..``. Such a path could
        otherwise land outside the fixture or clobber ``result.json``.
    """
    relative = output.lstrip("/")
    normalized = os.path.normpath(relative) if relative else ""
    if not relative or normalized == ".":
        raise SystemExit("fixture output paths must not be root")
    if normalized == ".." or normalized.startswith(".." + os.sep):
        raise SystemExit(f"fixture output path escapes the fixture directory: {output}")
    # Keep the un-normalized relative path so existing fixture layouts are unchanged.
    return fixture_dir(fixture) / "files" / relative
def config_defaults(args: argparse.Namespace, repo_root: Path) -> tuple[dict, Path]:
    """Return ``(config, base_dir)`` for resolving relative config paths.

    With ``--config`` the YAML file is loaded and its directory becomes the
    base directory. Without it, an empty config and *repo_root* are returned.
    """
    if args.config:
        resolved_config = args.config.resolve()
        return load_yaml_config(resolved_config), resolved_config.parent
    return {}, repo_root
def effective_user(args: argparse.Namespace, config: dict) -> str:
    """Return the agent user name: ``--user`` wins, then config ``user``, else ``""``.

    The config value is converted to a stripped string, matching the other
    ``effective_*`` helpers. This matters because the value is later placed
    directly in a subprocess argv, which requires strings.
    """
    if args.user:
        return args.user
    value = config.get("user")
    # A bare ``user:`` key loads as None; treat it as unset, not as "None".
    return "" if value is None else str(value).strip()
def effective_provider(args: argparse.Namespace, config: dict) -> str:
    """Return the Pi provider: ``--provider`` wins, then config ``provider``, else ``""``."""
    if args.provider:
        return args.provider
    value = config.get("provider")
    # A bare ``provider:`` key loads as None; str(None) would pass "--provider None".
    return "" if value is None else str(value).strip()
def effective_model(args: argparse.Namespace, config: dict) -> str:
    """Return the Pi model: ``--model`` wins, then config ``model``, else ``""``."""
    if args.model:
        return args.model
    value = config.get("model")
    # A bare ``model:`` key loads as None; str(None) would pass "--model None".
    return "" if value is None else str(value).strip()
def effective_thinking(args: argparse.Namespace, config: dict) -> str:
    """Return the Pi thinking level: ``--thinking`` wins, then config ``thinking``, else ``""``.

    Note: unlike the CLI flag, the config value is not checked against the
    allowed choices here.
    """
    if args.thinking:
        return args.thinking
    value = config.get("thinking")
    # A bare ``thinking:`` key loads as None; str(None) would pass "--thinking None".
    return "" if value is None else str(value).strip()
def read_prompt(args: argparse.Namespace, config: dict, base_dir: Path) -> str:
    """Return the prompt text from the first source that provides one.

    Precedence: ``--prompt`` (verbatim), ``--prompt-file`` (relative to the
    current directory), config ``prompt``, then config ``prompt_file``
    (relative to *base_dir*). File and config text is stripped. Returns ``""``
    when no source is set.
    """
    if args.prompt is not None:
        return args.prompt
    if args.prompt_file is not None:
        return read_text(args.prompt_file.resolve())
    inline_prompt = config.get("prompt")
    # A bare ``prompt:`` key loads as None. Treat it as unset so the agent is
    # not sent the literal text "None" and ``prompt_file`` can still apply.
    if inline_prompt is not None:
        return str(inline_prompt).strip()
    prompt_file = config.get("prompt_file")
    if prompt_file:
        return read_text(Path(resolve_path(prompt_file, base_dir)))
    return ""
def effective_extensions(args: argparse.Namespace, config: dict, base_dir: Path) -> list[str]:
    """Return absolute Pi extension paths.

    ``-e/--extension`` values replace the config list entirely and are
    resolved against the current working directory. Config ``extensions``
    (a list, or a single string) are resolved against *base_dir*.
    """
    if args.extension:
        # run_host executes ./rho with cwd=repo_root, so a relative CLI path
        # must be anchored to where the user invoked us before that happens.
        invocation_dir = Path.cwd()
        return [resolve_path(entry, invocation_dir) for entry in args.extension]
    entries = config.get("extensions") or []
    if isinstance(entries, str):
        # A single string would otherwise be iterated character by character.
        entries = [entries]
    return [resolve_path(entry, base_dir) for entry in entries]
def _as_list(value: object) -> list:
    """Normalize a YAML value into a new list: falsy -> [], str/mapping -> [value]."""
    if not value:
        return []
    if isinstance(value, (str, dict)):
        return [value]
    return list(value)


def sandbox_settings(args: argparse.Namespace, config: dict, base_dir: Path) -> dict:
    """Merge the config ``sandbox`` section with CLI flags into one settings dict.

    CLI values take precedence for scalars and are appended for lists and maps
    (mounts, allow_hosts, env). Paths from the config are resolved against
    *base_dir*. Paths from the CLI (``--mount`` hosts, ``--auth-json-source``)
    are resolved against the current working directory, because ``rho run`` is
    later executed with cwd=repo_root and would reinterpret relative paths.

    Raises:
        SystemExit: if a config mount is not a mapping with ``host`` and an
        absolute ``guest``, or has a mode other than ``ro``/``rw``.
    """
    sandbox_cfg = dict(config.get("sandbox") or {})
    invocation_dir = Path.cwd()

    mounts: list[dict[str, str]] = []
    for index, mount in enumerate(_as_list(sandbox_cfg.get("mounts"))):
        if not isinstance(mount, dict) or not mount.get("host") or not mount.get("guest"):
            raise SystemExit(f"sandbox.mounts[{index}] must be a mapping with host and guest")
        guest = str(mount["guest"])
        mode = str(mount.get("mode") or "ro")
        if not guest.startswith("/"):
            raise SystemExit(f"sandbox.mounts[{index}].guest must be absolute: {guest}")
        if mode not in {"ro", "rw"}:
            raise SystemExit(f"sandbox.mounts[{index}].mode must be ro or rw: {mode}")
        mounts.append(
            {"host": resolve_path(str(mount["host"]), base_dir), "guest": guest, "mode": mode}
        )
    for mount in args.mount:
        mounts.append({**mount, "host": resolve_path(mount["host"], invocation_dir)})

    env_map = dict(sandbox_cfg.get("env") or {})
    env_map.update(dict(args.env))

    # _as_list keeps a single "allow_hosts: example.com" from becoming one host per character.
    allow_hosts = _as_list(sandbox_cfg.get("allow_hosts"))
    allow_hosts.extend(args.allow_host)

    audit_log_path = sandbox_cfg.get("audit_log_path")

    if args.auth_json_source:
        auth_json_source = resolve_path(args.auth_json_source, invocation_dir)
    elif sandbox_cfg.get("auth_json_source"):
        auth_json_source = resolve_path(sandbox_cfg["auth_json_source"], base_dir)
    else:
        auth_json_source = None

    return {
        "enabled": bool(args.sandbox or sandbox_cfg.get("enabled", False)),
        "repo_mount_mode": args.repo_mount_mode or sandbox_cfg.get("repo_mount_mode", "ro"),
        "repo_guest_root": args.repo_guest_root or sandbox_cfg.get("repo_guest_root", "/app"),
        "guest_cwd": args.guest_cwd or sandbox_cfg.get("guest_cwd"),
        "mounts": mounts,
        "allow_hosts": allow_hosts,
        "env": env_map,
        "audit_log_path": resolve_path(audit_log_path, base_dir) if audit_log_path else None,
        "audit_log_headers": bool(sandbox_cfg.get("audit_log_headers", True)),
        "audit_log_bodies": bool(sandbox_cfg.get("audit_log_bodies", True)),
        "audit_max_body_bytes": int(sandbox_cfg.get("audit_max_body_bytes", 32768)),
        "auth_json_source": auth_json_source,
    }
def append_pi_selection_flags(
    cmd: list[str], provider: str, model: str, thinking: str
) -> None:
    """Append ``--provider``/``--model``/``--thinking`` to *cmd* in place.

    Each flag is appended only when its value is non-empty, always in that order.
    """
    selections = (
        ("--provider", provider),
        ("--model", model),
        ("--thinking", thinking),
    )
    for flag, value in selections:
        if value:
            cmd += [flag, value]
def run_host(
    repo_root: Path,
    user: str,
    extensions: list[str],
    prompt: str,
    provider: str,
    model: str,
    thinking: str,
) -> subprocess.CompletedProcess[str]:
    """Run the agent directly on the host via ``<repo_root>/rho``.

    The argv is: rho, one ``--extension`` pair per extension, the optional
    provider/model/thinking flags, then *user* and *prompt*. The process runs
    with cwd=*repo_root* and its output is captured.
    """
    argv = [str(repo_root / "rho")]
    argv.extend(
        token for extension in extensions for token in ("--extension", extension)
    )
    append_pi_selection_flags(argv, provider, model, thinking)
    argv += [user, prompt]
    return run_process(argv, repo_root)
def sandbox_command(
    repo_root: Path,
    repo_guest_root: str,
    guest_cwd: str,
    user: str,
    extensions: list[str],
    prompt: str,
    provider: str,
    model: str,
    thinking: str,
    mounts: list[dict[str, str]],
) -> list[str]:
    """Build the guest-side argv that launches the agent inside the sandbox.

    Host extension paths are translated to guest paths through *mounts*. The
    rho invocation is shell-quoted and prefixed with a ``cd`` into
    *guest_cwd*, then wrapped as ``/bin/sh -lc <script>``.
    """
    argv: list[str] = [f"{repo_guest_root}/rho"]
    for extension in extensions:
        guest_extension = map_host_path_to_guest(
            Path(extension), repo_root, repo_guest_root, mounts
        )
        argv += ["--extension", guest_extension]
    append_pi_selection_flags(argv, provider, model, thinking)
    argv.append(user)
    argv.append(prompt)
    script = "cd {} && {}".format(shlex.quote(guest_cwd), shlex.join(argv))
    return ["/bin/sh", "-lc", script]
def map_host_path_to_guest(
    host_path: Path, repo_root: Path, repo_guest_root: str, mounts: list[dict[str, str]]
) -> str:
    """Translate a host path to its guest path using the longest matching mount.

    The repo root (mounted at *repo_guest_root*) is considered first, then
    *mounts* in order. On equal-length host prefixes the earlier one wins.
    If no mount contains the path, *host_path* is returned unchanged.
    """
    resolved = host_path.resolve()
    prefixes = [(repo_root.resolve(), repo_guest_root)] + [
        (Path(entry["host"]).resolve(), entry["guest"]) for entry in mounts
    ]
    containing = []
    for host_prefix, guest_prefix in prefixes:
        try:
            resolved.relative_to(host_prefix)
        except ValueError:
            continue
        containing.append((host_prefix, guest_prefix))
    if not containing:
        return str(host_path)
    # max() keeps the first element among equal keys, matching "first wins" on ties.
    host_prefix, guest_prefix = max(containing, key=lambda pair: len(str(pair[0])))
    tail = resolved.relative_to(host_prefix).as_posix()
    if tail == ".":
        return guest_prefix
    return guest_prefix.rstrip("/") + "/" + tail
def map_guest_path_to_host(
    guest_path: str, repo_root: Path, repo_guest_root: str, mounts: list[dict[str, str]]
) -> Path:
    """Translate a guest path back to the host using the longest matching mount.

    A mount prefix matches only at a path-component boundary: ``/app`` matches
    ``/app`` and ``/app/x`` but not ``/application``. Relative guest paths, and
    absolute ones outside every mount, are resolved as host paths unchanged.
    """
    if not guest_path.startswith("/"):
        return Path(guest_path).resolve()
    options: list[tuple[str, Path]] = [(repo_guest_root, repo_root.resolve())]
    options.extend((mount["guest"], Path(mount["host"]).resolve()) for mount in mounts)

    best: tuple[str, Path] | None = None
    for guest_prefix, host_prefix in options:
        prefix = guest_prefix.rstrip("/")
        # `prefix or "/"` handles a mount at the guest root itself.
        if guest_path != (prefix or "/") and not guest_path.startswith(prefix + "/"):
            continue
        if best is None or len(prefix) > len(best[0]):
            best = (prefix, host_prefix)
    if best is None:
        return Path(guest_path).resolve()
    relative = guest_path[len(best[0]) :].lstrip("/")
    return best[1] / relative if relative else best[1]
def write_captured_output(result: subprocess.CompletedProcess[str]) -> None:
    """Echo a captured process's stdout and stderr to this process's streams.

    Empty or missing output is skipped. Stdout is written before stderr.
    """
    for captured, stream in ((result.stdout, sys.stdout), (result.stderr, sys.stderr)):
        if captured:
            stream.write(captured)
def record_fixture_result(
    fixture: dict,
    result: subprocess.CompletedProcess[str],
    *,
    output_resolver,
) -> None:
    """Record *result* and the configured output files as a fixture.

    Steps:
      1. Every output is resolved to its host path (via *output_resolver*)
         and to its storage path, and validated. This happens before anything
         is deleted, so a bad output cannot leave a half-destroyed fixture.
      2. The fixture directory is replaced: existing output files are copied
         under ``files/``; absent ones are listed as ``missing``.
      3. ``result.json`` is written with the return code and captured
         stdout/stderr.
      4. Every file reported by the scrubber module's ``iter_target_files`` is
         passed through its ``scrub_file``.

    Raises:
        SystemExit: if an output exists but is not a regular file (replay
        only restores regular files), or if an output path is rejected by
        ``fixture_file_path``.
    """
    planned: list[tuple[str, Path, Path]] = []
    for output in fixture["outputs"]:
        host_path = output_resolver(output)
        if host_path.exists() and not host_path.is_file():
            raise SystemExit(f"fixture output is not a regular file: {host_path}")
        planned.append((output, host_path, fixture_file_path(fixture, output)))

    target_dir = fixture_dir(fixture)
    if target_dir.exists():
        shutil.rmtree(target_dir)
    (target_dir / "files").mkdir(parents=True, exist_ok=True)

    captured_outputs: list[dict[str, str]] = []
    for output, host_path, saved_path in planned:
        if host_path.is_file():
            saved_path.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(host_path, saved_path)
            captured_outputs.append({"guest_path": output, "status": "captured"})
        else:
            captured_outputs.append({"guest_path": output, "status": "missing"})

    fixture_meta_path(fixture).write_text(
        json.dumps(
            {
                "version": 1,
                "mode": "recorded",
                "returncode": result.returncode,
                "stdout": result.stdout,
                "stderr": result.stderr,
                "outputs": captured_outputs,
            },
            indent=2,
        )
        + "\n",
        encoding="utf-8",
    )

    scrubber = load_scrub_module()
    for path in scrubber.iter_target_files(target_dir):
        scrubber.scrub_file(path)
def replay_fixture_result(fixture: dict, *, output_resolver) -> subprocess.CompletedProcess[str]:
    """Replay a recorded fixture instead of running the agent.

    Output files recorded as ``captured`` (or not mentioned in ``result.json``)
    are copied back to the host locations given by *output_resolver*. The
    recorded stdout/stderr are echoed, and a CompletedProcess carrying the
    recorded return code is returned.

    Raises:
        SystemExit: if ``result.json`` is missing, has an unsupported version,
        or a captured output file is missing from the fixture.
    """
    meta_path = fixture_meta_path(fixture)
    if not meta_path.is_file():
        raise SystemExit(f"fixture result not found: {meta_path}")
    payload = json.loads(meta_path.read_text(encoding="utf-8"))
    if payload.get("version") != 1:
        raise SystemExit(f"unsupported fixture result version in {meta_path}")

    for output in fixture["outputs"]:
        stored_copy = fixture_file_path(fixture, output)
        # First recorded entry for this output decides; unknown outputs count as captured.
        recorded_status = next(
            (
                str(entry.get("status", "captured"))
                for entry in payload.get("outputs", [])
                if entry.get("guest_path") == output
            ),
            "captured",
        )
        if recorded_status != "captured":
            continue
        if not stored_copy.is_file():
            raise SystemExit(f"fixture output file missing: {stored_copy}")
        destination = output_resolver(output)
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(stored_copy, destination)

    replayed = subprocess.CompletedProcess(
        args=["fixture-replay", fixture["key"]],
        returncode=int(payload.get("returncode", 1)),
        stdout=str(payload.get("stdout", "")),
        stderr=str(payload.get("stderr", "")),
    )
    write_captured_output(replayed)
    return replayed
def run_sandbox(
    repo_root: Path,
    user: str,
    extensions: list[str],
    prompt: str,
    provider: str,
    model: str,
    thinking: str,
    sandbox: dict,
    args: argparse.Namespace,
) -> subprocess.CompletedProcess[str]:
    """Run the agent in the sandbox by generating a config for ``rho run``.

    The repo root is mounted at ``sandbox["repo_guest_root"]``, followed by the
    extra sandbox mounts. When an auth.json source is configured, it is copied
    into a fresh temporary directory, which is mounted read-write at
    ``<repo_guest_root>/users/<user>/agent``. The generated YAML config is
    written to a temporary file and passed to ``<repo_root>/rho run``. The
    ``--dry-run``, ``--print-config`` and ``--allow-host`` flags are forwarded.

    The temporary config file and auth directory are removed on every exit
    path, including exceptions, so copied credentials do not linger on disk.

    Returns:
        The ``rho run`` CompletedProcess, or a synthetic one with returncode 2
        when the configured auth.json does not exist.
    """
    repo_guest_root = sandbox["repo_guest_root"]
    guest_cwd = sandbox["guest_cwd"] or repo_guest_root
    mounts = [
        {
            "host": str(repo_root),
            "guest": repo_guest_root,
            "mode": sandbox["repo_mount_mode"],
        },
        *sandbox["mounts"],
    ]

    auth_json_source = sandbox.get("auth_json_source")
    auth_path = Path(auth_json_source) if auth_json_source else None
    if auth_path is not None and not auth_path.is_file():
        return subprocess.CompletedProcess(
            args=["auth-json-check", str(auth_path)],
            returncode=2,
            stdout="",
            stderr=f"auth json not found: {auth_path}\n",
        )

    cleanup_dirs: list[Path] = []
    config_path: Path | None = None
    # Everything that creates temporary state lives inside try/finally, so a
    # failure while copying auth.json or writing the config still cleans up.
    try:
        if auth_path is not None:
            agent_dir = Path(tempfile.mkdtemp(prefix="rho-agent-auth-"))
            cleanup_dirs.append(agent_dir)
            shutil.copy2(auth_path, agent_dir / "auth.json")
            mounts.append(
                {
                    "host": str(agent_dir),
                    "guest": f"{repo_guest_root}/users/{user}/agent",
                    "mode": "rw",
                }
            )

        config = {
            "mounts": mounts,
            "command": sandbox_command(
                repo_root,
                repo_guest_root,
                guest_cwd,
                user,
                extensions,
                prompt,
                provider,
                model,
                thinking,
                mounts,
            ),
            "cwd": guest_cwd,
            "env": sandbox["env"],
            "network": {
                "defaultDeny": True,
                "audit": {
                    "logPath": sandbox["audit_log_path"],
                    "logHeaders": sandbox["audit_log_headers"],
                    "logBodies": sandbox["audit_log_bodies"],
                    "maxBodyBytes": sandbox["audit_max_body_bytes"],
                }
                if sandbox.get("audit_log_path")
                else None,
            },
        }

        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".yaml", prefix="rho-agent-run-", delete=False
        ) as handle:
            config_path = Path(handle.name)
            yaml.safe_dump(config, handle, sort_keys=False)

        cmd = [str(repo_root / "rho"), "run", str(config_path)]
        if args.dry_run:
            cmd.append("--dry-run")
        if args.print_config:
            cmd.append("--print-config")
        for host in sandbox["allow_hosts"]:
            cmd.extend(["--allow-host", host])
        return run_process(cmd, repo_root)
    finally:
        if config_path is not None:
            try:
                config_path.unlink()
            except FileNotFoundError:
                pass
        for directory in cleanup_dirs:
            shutil.rmtree(directory, ignore_errors=True)
def _make_output_resolver(sandbox: dict, repo_root: Path, base_dir: Path):
    """Return a callable that maps a fixture output path to its host location."""
    if sandbox["enabled"]:
        # Sandbox outputs are guest paths; map them back through the same
        # mounts run_sandbox uses (repo root first, then extra mounts).
        repo_guest_root = sandbox["repo_guest_root"]
        repo_mounts = [
            {
                "host": str(repo_root),
                "guest": repo_guest_root,
                "mode": sandbox["repo_mount_mode"],
            },
            *sandbox["mounts"],
        ]

        def resolve_sandbox_output(output: str) -> Path:
            return map_guest_path_to_host(output, repo_root, repo_guest_root, repo_mounts)

        return resolve_sandbox_output

    def resolve_host_output(output: str) -> Path:
        candidate = Path(output)
        return candidate if candidate.is_absolute() else (base_dir / candidate).resolve()

    return resolve_host_output


def main() -> int:
    """Entry point for ``rho agent-run``; returns the process exit code.

    Merges CLI flags with the optional YAML config. It then replays a recorded
    fixture, or runs the agent (on the host or in the sandbox), echoes the
    captured output, and records a fixture when requested. Returns 2 on
    missing user/prompt or when ``--dry-run`` is used outside sandbox mode.
    """
    parser = build_parser()
    args = parser.parse_args()
    repo_root = Path(__file__).resolve().parent.parent
    config, base_dir = config_defaults(args, repo_root)

    user = effective_user(args, config)
    if not user:
        print("user was empty", file=sys.stderr)
        return 2
    prompt = read_prompt(args, config, base_dir)
    if not prompt:
        print("prompt was empty", file=sys.stderr)
        return 2

    provider = effective_provider(args, config)
    model = effective_model(args, config)
    thinking = effective_thinking(args, config)
    extensions = effective_extensions(args, config, base_dir)
    sandbox = sandbox_settings(args, config, base_dir)
    fixture = fixture_settings(config, base_dir)
    output_resolver = _make_output_resolver(sandbox, repo_root, base_dir)

    if fixture["mode"] == "replay":
        return replay_fixture_result(fixture, output_resolver=output_resolver).returncode

    if sandbox["enabled"]:
        result = run_sandbox(
            repo_root, user, extensions, prompt, provider, model, thinking, sandbox, args
        )
    else:
        # --dry-run is only forwarded to `rho run`; on the host path it would be
        # silently ignored and the agent would really execute.
        if args.dry_run:
            print("--dry-run requires sandbox mode", file=sys.stderr)
            return 2
        result = run_host(repo_root, user, extensions, prompt, provider, model, thinking)
    write_captured_output(result)

    if fixture["mode"] == "record":
        if args.dry_run:
            # A dry run prints a plan, not an agent result; recording it would
            # delete and overwrite the existing fixture.
            print("fixture not recorded for --dry-run", file=sys.stderr)
        else:
            record_fixture_result(fixture, result, output_resolver=output_resolver)
    return result.returncode
if __name__ == "__main__":
    # sys.exit raises SystemExit with main()'s return code as the exit status.
    sys.exit(main())