rho-cli 0.1.24

Rho CLI tools for encrypted agent collaboration, dataset publishing, controlled runs, and result release workflows
Documentation
#!/usr/bin/env python3

from __future__ import annotations

import argparse
from datetime import datetime, timezone
from pathlib import Path
import sys
import shlex

import yaml


def timestamp() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def load_yaml(path: Path) -> dict:
    with path.open("r", encoding="utf-8") as handle:
        data = yaml.safe_load(handle) or {}
    if not isinstance(data, dict):
        raise SystemExit(f"invalid yaml object in {path}")
    return data


def dump_yaml(path: Path, payload: dict) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as handle:
        yaml.safe_dump(payload, handle, sort_keys=False)


def request_path(shared_root: Path, request_id: str) -> Path:
    return shared_root / ".rho" / "requests" / f"{request_id}.yaml"


def approval_path(shared_root: Path, request_id: str) -> Path:
    return shared_root / ".rho" / "approvals" / f"{request_id}.yaml"


def approval_status(shared_root: Path, request_id: str) -> str:
    path = approval_path(shared_root, request_id)
    if not path.is_file():
        return "pending"

    payload = load_yaml(path)
    decision = str(payload.get("approval", {}).get("decision", "")).strip().lower()
    if decision in {"approve", "approved"}:
        return "approved"
    if decision in {"deny", "denied", "reject", "rejected"}:
        return "denied"
    return "pending"


def request_payload(shared_root: Path, request_id: str) -> dict:
    path = request_path(shared_root, request_id)
    if not path.is_file():
        raise SystemExit(f"request not found: {path}")
    payload = load_yaml(path)
    request = payload.get("request")
    if not isinstance(request, dict):
        raise SystemExit(f"invalid request payload in {path}")
    return request


def format_request_summary(shared_root: Path, request_id: str) -> str:
    request = request_payload(shared_root, request_id)
    code_paths = request.get("code_paths") or []
    command = request.get("command") or []
    summary = request.get("summary") or ""
    created_at = request.get("created_at") or ""

    lines = [
        "Pending approval",
        "",
        f"Request: {request_id}",
        f"Status: {approval_status(shared_root, request_id)}",
        f"From: {request.get('from', '')}",
        f"To: {request.get('to', '')}",
        f"Dataset: {request.get('dataset_uuid', '')}",
        f"Tier: {request.get('requested_tier', '')}",
    ]
    if created_at:
        lines.append(f"Created: {created_at}")
    if summary:
        lines.extend(["", f"Summary: {summary}"])
    if code_paths:
        lines.extend(["", "Code:"])
        lines.extend(f"- {path}" for path in code_paths)
    if command:
        lines.extend(["", f"Command: {shlex.join([str(part) for part in command])}"])
    lines.extend(
        [
            "",
            "Reply with:",
            f"/approve {request_id}",
            f"/deny {request_id}",
            f"/show {request_id}",
            "",
            "Shortcuts:",
            "/approve_last",
            "/deny_last",
        ]
    )
    return "\n".join(lines)


def pending_request_ids(shared_root: Path) -> list[str]:
    request_dir = shared_root / ".rho" / "requests"
    if not request_dir.is_dir():
        return []
    candidates = sorted(request_dir.glob("*.yaml"), key=lambda path: path.stat().st_mtime)
    pending: list[str] = []
    for path in candidates:
        request_id = path.stem
        if approval_status(shared_root, request_id) == "pending":
            pending.append(request_id)
    return pending


def cmd_status(args: argparse.Namespace) -> int:
    path = request_path(args.shared_root, args.request_id)
    if not path.is_file():
        print(f"request not found: {path}", file=sys.stderr)
        return 1
    print(approval_status(args.shared_root, args.request_id))
    return 0


def resolve_request_id(args: argparse.Namespace) -> str:
    if getattr(args, "latest", False):
        pending = pending_request_ids(args.shared_root)
        if not pending:
            print("no pending requests", file=sys.stderr)
            raise SystemExit(1)
        return pending[-1]

    if args.request_id:
        return args.request_id

    print("request id is required", file=sys.stderr)
    raise SystemExit(1)


def cmd_approve(args: argparse.Namespace) -> int:
    request_id = resolve_request_id(args)
    path = request_path(args.shared_root, request_id)
    if not path.is_file():
        print(f"request not found: {path}", file=sys.stderr)
        return 1

    request_payload = load_yaml(path)
    request = request_payload.get("request", {})
    requester = request.get("from")
    approver = args.actor or "cli"
    normalized = args.decision.lower()
    if normalized == "approve":
        status = "approved"
    elif normalized == "deny":
        status = "denied"
    else:
        print(f"unsupported decision: {args.decision}", file=sys.stderr)
        return 1

    payload = {
        "version": 1,
        "approval": {
            "request_id": request_id,
            "decision": status,
            "approver": approver,
            "note": args.note or "",
            "created_at": timestamp(),
        },
    }
    dump_yaml(approval_path(args.shared_root, request_id), payload)

    if requester:
        inbox_path = (
            args.shared_root
            / ".rho"
            / "inbox"
            / str(requester)
            / f"approval-{request_id}.yaml"
        )
        dump_yaml(
            inbox_path,
            {
                "version": 1,
                "message": {
                    "id": f"approval-{request_id}",
                    "from": approver,
                    "to": requester,
                    "type": "approval_update",
                    "related_request_id": request_id,
                    "body": {
                        "status": status,
                        "text": args.note or f"Request {request_id} {status}.",
                    },
                },
            },
        )

    print(status)
    return 0


def cmd_show(args: argparse.Namespace) -> int:
    print(format_request_summary(args.shared_root, args.request_id))
    return 0


def cmd_pending(args: argparse.Namespace) -> int:
    pending = pending_request_ids(args.shared_root)
    if args.latest:
      if not pending:
          print("no pending requests", file=sys.stderr)
          return 1
      print(pending[-1])
      return 0

    for request_id in pending:
        print(request_id)
    return 0


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="rho-request")
    subparsers = parser.add_subparsers(dest="command", required=True)

    status = subparsers.add_parser("status")
    status.add_argument("request_id")
    status.add_argument("--shared-root", type=Path, required=True)
    status.set_defaults(func=cmd_status)

    show = subparsers.add_parser("show")
    show.add_argument("request_id")
    show.add_argument("--shared-root", type=Path, required=True)
    show.set_defaults(func=cmd_show)

    pending = subparsers.add_parser("pending")
    pending.add_argument("--shared-root", type=Path, required=True)
    pending.add_argument("--latest", action="store_true")
    pending.set_defaults(func=cmd_pending)

    approve = subparsers.add_parser("approve")
    approve.add_argument("request_id", nargs="?")
    approve.add_argument("--shared-root", type=Path, required=True)
    approve.add_argument("--decision", choices=["approve", "deny"], required=True)
    approve.add_argument("--latest", action="store_true")
    approve.add_argument("--actor")
    approve.add_argument("--note")
    approve.set_defaults(func=cmd_approve)

    return parser


def main() -> int:
    parser = build_parser()
    args = parser.parse_args()
    return args.func(args)


if __name__ == "__main__":
    raise SystemExit(main())