from __future__ import annotations
import argparse
from datetime import datetime, timezone
from pathlib import Path
import sys
import shlex
import yaml
def timestamp() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``."""
    moment = datetime.now(tz=timezone.utc)
    # datetime.__format__ delegates to strftime, so the spec is the format.
    return f"{moment:%Y-%m-%dT%H:%M:%SZ}"
def load_yaml(path: Path) -> dict:
    """Read *path* as UTF-8 YAML and return its top-level mapping.

    An empty document (or any other falsy top-level value) yields an empty
    dict.

    Raises:
        SystemExit: if the document's top-level value is truthy but is not
            a mapping.
    """
    with path.open("r", encoding="utf-8") as stream:
        parsed = yaml.safe_load(stream)
    if not parsed:
        return {}
    if isinstance(parsed, dict):
        return parsed
    raise SystemExit(f"invalid yaml object in {path}")
def dump_yaml(path: Path, payload: dict) -> None:
    """Write *payload* to *path* as UTF-8 YAML, creating parent directories.

    Keys are emitted in insertion order (``sort_keys=False``).
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as stream:
        yaml.safe_dump(payload, stream, sort_keys=False)
def request_path(shared_root: Path, request_id: str) -> Path:
    """Return ``<shared_root>/.rho/requests/<request_id>.yaml``."""
    filename = f"{request_id}.yaml"
    return shared_root.joinpath(".rho", "requests", filename)
def approval_path(shared_root: Path, request_id: str) -> Path:
    """Return ``<shared_root>/.rho/approvals/<request_id>.yaml``."""
    filename = f"{request_id}.yaml"
    return shared_root.joinpath(".rho", "approvals", filename)
def approval_status(shared_root: Path, request_id: str) -> str:
    """Return the decision state recorded for *request_id*.

    The approval file for the request is read and its ``approval.decision``
    value is normalised (stringified, stripped, lower-cased).

    Returns:
        ``"approved"`` for ``approve``/``approved``; ``"denied"`` for
        ``deny``/``denied``/``reject``/``rejected``; ``"pending"`` when the
        approval file does not exist, has no usable ``approval`` mapping, or
        carries any other decision value.
    """
    path = approval_path(shared_root, request_id)
    if not path.is_file():
        return "pending"
    approval = load_yaml(path).get("approval")
    if not isinstance(approval, dict):
        # A null or non-mapping ``approval`` section (e.g. an empty
        # ``approval:`` key) carries no decision; treat it as undecided
        # instead of failing with AttributeError on ``.get``.
        return "pending"
    decision = str(approval.get("decision", "")).strip().lower()
    if decision in {"approve", "approved"}:
        return "approved"
    if decision in {"deny", "denied", "reject", "rejected"}:
        return "denied"
    return "pending"
def request_payload(shared_root: Path, request_id: str) -> dict:
    """Load the ``request`` mapping stored in the request file.

    Raises:
        SystemExit: if the request file does not exist, or if its
            ``request`` entry is not a mapping.
    """
    source = request_path(shared_root, request_id)
    if not source.is_file():
        raise SystemExit(f"request not found: {source}")
    section = load_yaml(source).get("request")
    if isinstance(section, dict):
        return section
    raise SystemExit(f"invalid request payload in {source}")
def format_request_summary(shared_root: Path, request_id: str) -> str:
    """Build the multi-line, human-readable summary of a request.

    The summary lists the request id, its current approval status and the
    ``from``/``to``/``dataset_uuid``/``requested_tier`` fields, followed by
    the optional ``created_at``, ``summary``, ``code_paths`` and ``command``
    fields when they are non-empty, and ends with the reply instructions.

    ``code_paths`` and ``command`` are normally sequences, but a plain
    string is accepted for either and rendered as a single value.

    Raises:
        SystemExit: propagated from ``request_payload`` when the request
            file is missing or malformed.
    """
    request = request_payload(shared_root, request_id)
    code_paths = request.get("code_paths") or []
    command = request.get("command") or []
    summary = request.get("summary") or ""
    created_at = request.get("created_at") or ""

    # A bare string would otherwise be iterated character by character,
    # producing one bullet per letter.
    if isinstance(code_paths, str):
        code_paths = [code_paths]

    lines = [
        "Pending approval",
        "",
        f"Request: {request_id}",
        f"Status: {approval_status(shared_root, request_id)}",
        f"From: {request.get('from', '')}",
        f"To: {request.get('to', '')}",
        f"Dataset: {request.get('dataset_uuid', '')}",
        f"Tier: {request.get('requested_tier', '')}",
    ]
    if created_at:
        lines.append(f"Created: {created_at}")
    if summary:
        lines.extend(["", f"Summary: {summary}"])
    if code_paths:
        lines.extend(["", "Code:"])
        lines.extend(f"- {path}" for path in code_paths)
    if command:
        if isinstance(command, str):
            # Already a single command line; show it verbatim rather than
            # shell-quoting each character.
            rendered = command
        else:
            rendered = shlex.join([str(part) for part in command])
        lines.extend(["", f"Command: {rendered}"])
    lines.extend(
        [
            "",
            "Reply with:",
            f"/approve {request_id}",
            f"/deny {request_id}",
            f"/show {request_id}",
            "",
            "Shortcuts:",
            "/approve_last",
            "/deny_last",
        ]
    )
    return "\n".join(lines)
def pending_request_ids(shared_root: Path) -> list[str]:
    """List the ids of requests whose approval status is ``"pending"``.

    Ids are the stems of ``*.yaml`` files under ``.rho/requests``, ordered
    by file modification time, oldest first. An absent requests directory
    yields an empty list.
    """
    requests_root = shared_root / ".rho" / "requests"
    if not requests_root.is_dir():
        return []
    oldest_first = sorted(
        requests_root.glob("*.yaml"),
        key=lambda entry: entry.stat().st_mtime,
    )
    return [
        entry.stem
        for entry in oldest_first
        if approval_status(shared_root, entry.stem) == "pending"
    ]
def cmd_status(args: argparse.Namespace) -> int:
    """Print the approval status of ``args.request_id``.

    Returns 0 on success, or 1 (with a message on stderr) when the request
    file does not exist.
    """
    shared_root, request_id = args.shared_root, args.request_id
    request_file = request_path(shared_root, request_id)
    if request_file.is_file():
        print(approval_status(shared_root, request_id))
        return 0
    print(f"request not found: {request_file}", file=sys.stderr)
    return 1
def resolve_request_id(args: argparse.Namespace) -> str:
    """Determine which request id a command should act on.

    When ``args.latest`` is set, the last entry of the pending list is
    used; otherwise ``args.request_id`` is used.

    Raises:
        SystemExit: with code 1 (after a message on stderr) when ``latest``
            is set but nothing is pending, or when no request id was given.
    """
    if not getattr(args, "latest", False):
        explicit_id = args.request_id
        if explicit_id:
            return explicit_id
        print("request id is required", file=sys.stderr)
        raise SystemExit(1)

    queue = pending_request_ids(args.shared_root)
    if queue:
        return queue[-1]
    print("no pending requests", file=sys.stderr)
    raise SystemExit(1)
def cmd_approve(args: argparse.Namespace) -> int:
    """Record an approve/deny decision for a request and notify the requester.

    The decision is written to the request's approval file. If the request
    names a sender in its ``from`` field, an ``approval_update`` message is
    also written to ``.rho/inbox/<from>/approval-<request_id>.yaml``.

    Args:
        args: parsed arguments providing ``shared_root``, ``decision``,
            ``actor``, ``note`` and either ``request_id`` or ``latest``.

    Returns:
        0 after writing the decision (the resulting status is printed);
        1 when the request file is missing or the decision is unsupported.

    Raises:
        SystemExit: propagated from ``resolve_request_id`` when no request
            id can be determined.
    """
    request_id = resolve_request_id(args)
    path = request_path(args.shared_root, request_id)
    if not path.is_file():
        print(f"request not found: {path}", file=sys.stderr)
        return 1

    # Named ``document`` so the module-level ``request_payload`` function is
    # not shadowed inside this function.
    document = load_yaml(path)
    request = document.get("request")
    if not isinstance(request, dict):
        # A null or non-mapping ``request`` section has no sender; fall back
        # to an empty mapping instead of failing with AttributeError below.
        request = {}
    requester = request.get("from")
    approver = args.actor or "cli"

    normalized = args.decision.lower()
    if normalized == "approve":
        status = "approved"
    elif normalized == "deny":
        status = "denied"
    else:
        print(f"unsupported decision: {args.decision}", file=sys.stderr)
        return 1

    payload = {
        "version": 1,
        "approval": {
            "request_id": request_id,
            "decision": status,
            "approver": approver,
            "note": args.note or "",
            "created_at": timestamp(),
        },
    }
    dump_yaml(approval_path(args.shared_root, request_id), payload)

    if requester:
        # NOTE(review): ``requester`` comes straight from the request file
        # and is used as a directory name without validation; confirm that
        # values containing path separators or ".." cannot reach here.
        inbox_path = (
            args.shared_root
            / ".rho"
            / "inbox"
            / str(requester)
            / f"approval-{request_id}.yaml"
        )
        dump_yaml(
            inbox_path,
            {
                "version": 1,
                "message": {
                    "id": f"approval-{request_id}",
                    "from": approver,
                    "to": requester,
                    "type": "approval_update",
                    "related_request_id": request_id,
                    "body": {
                        "status": status,
                        "text": args.note or f"Request {request_id} {status}.",
                    },
                },
            },
        )
    print(status)
    return 0
def cmd_show(args: argparse.Namespace) -> int:
    """Print the formatted summary for ``args.request_id`` and return 0."""
    summary_text = format_request_summary(args.shared_root, args.request_id)
    print(summary_text)
    return 0
def cmd_pending(args: argparse.Namespace) -> int:
    """Print pending request ids, one per line.

    With ``args.latest`` set, only the last pending id is printed; if
    nothing is pending in that mode, a message goes to stderr and 1 is
    returned. Otherwise the return value is 0.
    """
    queue = pending_request_ids(args.shared_root)
    if not args.latest:
        for pending_id in queue:
            print(pending_id)
        return 0
    if queue:
        print(queue[-1])
        return 0
    print("no pending requests", file=sys.stderr)
    return 1
def build_parser() -> argparse.ArgumentParser:
    """Assemble the ``rho-request`` parser with its four subcommands.

    Each subcommand (``status``, ``show``, ``pending``, ``approve``) requires
    ``--shared-root`` and stores its handler function as ``func``.
    """
    root = argparse.ArgumentParser(prog="rho-request")
    commands = root.add_subparsers(dest="command", required=True)

    def add_shared_root(sub: argparse.ArgumentParser) -> None:
        # Every subcommand takes the same mandatory option.
        sub.add_argument("--shared-root", type=Path, required=True)

    status_cmd = commands.add_parser("status")
    status_cmd.add_argument("request_id")
    add_shared_root(status_cmd)
    status_cmd.set_defaults(func=cmd_status)

    show_cmd = commands.add_parser("show")
    show_cmd.add_argument("request_id")
    add_shared_root(show_cmd)
    show_cmd.set_defaults(func=cmd_show)

    pending_cmd = commands.add_parser("pending")
    add_shared_root(pending_cmd)
    pending_cmd.add_argument("--latest", action="store_true")
    pending_cmd.set_defaults(func=cmd_pending)

    approve_cmd = commands.add_parser("approve")
    # The id is optional here because --latest can supply it instead.
    approve_cmd.add_argument("request_id", nargs="?")
    add_shared_root(approve_cmd)
    approve_cmd.add_argument("--decision", choices=["approve", "deny"], required=True)
    approve_cmd.add_argument("--latest", action="store_true")
    approve_cmd.add_argument("--actor")
    approve_cmd.add_argument("--note")
    approve_cmd.set_defaults(func=cmd_approve)

    return root
def main() -> int:
    """Parse the command line and run the selected subcommand's handler.

    Returns the handler's integer result.
    """
    namespace = build_parser().parse_args()
    handler = namespace.func
    return handler(namespace)
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())