from __future__ import annotations
import argparse
import hashlib
import json
import os
from pathlib import Path
import re
import shutil
import subprocess
import sys
from typing import Any
import yaml
# Request ids: "req-" followed by a lowercase-hex 8-4-4-4-12 group layout.
REQUEST_ID_RE = re.compile(r"^req-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$")
# Action ids: "act-" followed by 1-80 lowercase letters, digits, "_" or "-".
ACTION_ID_RE = re.compile(r"^act-[a-z0-9_-]{1,80}$")
# Agent names: a lowercase letter followed by up to 31 more of [a-z0-9_-].
AGENT_RE = re.compile(r"^[a-z][a-z0-9_-]{0,31}$")
# Guest paths: a leading "/" plus 1-240 characters from a conservative set.
ABS_PATH_RE = re.compile(r"^/[A-Za-z0-9._/-]{1,240}$")
# Character set allowed for a single path segment (checked by validate_guest_path).
SAFE_SEGMENT_RE = re.compile(r"^[A-Za-z0-9._-]+$")
# Guest paths must start with one of these prefixes (see validate_guest_path).
ALLOWED_PREFIXES = ("/input/", "/output/", "/messages/", "/tool-templates/", "/results/", "/release/")
# Action types accepted by validate_action_payload.
ALLOWED_ACTIONS = {"run_real_data", "release_results"}
# States accepted by write_status.
STATUS_VALUES = {"pending", "approved", "denied", "running", "completed", "failed"}
# Decisions accepted from a fixture policy (see decide_via_fixture).
DECISION_VALUES = {"approved", "denied"}
def load_json(path: Path) -> dict:
    """Read *path* as UTF-8 JSON and return the decoded top-level object.

    Raises:
        SystemExit: if the top-level JSON value is not an object (dict).
    """
    with path.open("r", encoding="utf-8") as stream:
        document = json.load(stream)
    if isinstance(document, dict):
        return document
    raise SystemExit(f"invalid json object in {path}")
def dump_json(path: Path, payload: dict) -> None:
    """Write *payload* to *path* as 2-space-indented JSON with a trailing newline.

    Missing parent directories are created first.
    """
    directory = path.parent
    directory.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(payload, indent=2)
    path.write_text(f"{rendered}\n", encoding="utf-8")
def validate_text(value: object, label: str, max_length: int) -> str:
    """Return *value* if it is a non-empty string within *max_length* and NUL-free.

    Args:
        value: candidate value of any type.
        label: field name used in error messages.
        max_length: maximum accepted ``len(value)``.

    Raises:
        SystemExit: if *value* is not a string, is empty, is too long, or
            contains a NUL character.
    """
    if not isinstance(value, str):
        raise SystemExit(f"{label} must be a string")
    length_ok = 0 < len(value) <= max_length
    if length_ok and "\x00" not in value:
        return value
    raise SystemExit(f"{label} must be between 1 and {max_length} characters and contain no NUL bytes")
def validate_regex(value: object, label: str, pattern: re.Pattern[str]) -> str:
    """Return *value* if it is a string that fully matches *pattern*.

    Raises:
        SystemExit: if *value* is not a string or does not match.
    """
    if isinstance(value, str) and pattern.fullmatch(value) is not None:
        return value
    raise SystemExit(f"{label} failed validation")
def validate_guest_path(value: object, label: str) -> str:
    """Validate a guest-side absolute path and return it unchanged.

    The path must match ``ABS_PATH_RE``, start with one of
    ``ALLOWED_PREFIXES``, already be in normalized form, and contain only
    segments matching ``SAFE_SEGMENT_RE`` (never ``..``).

    Raises:
        SystemExit: if any of the checks fails.
    """
    candidate = validate_regex(value, label, ABS_PATH_RE)
    if not any(candidate.startswith(prefix) for prefix in ALLOWED_PREFIXES):
        raise SystemExit(f"{label} must be under {', '.join(ALLOWED_PREFIXES)}")
    # Compare against the normalized form so "a//b", "a/./b", "a/../b" and
    # trailing slashes are rejected rather than silently rewritten.
    if os.path.normpath(candidate).replace("\\", "/") != candidate:
        raise SystemExit(f"{label} must already be normalized")
    for part in candidate.split("/"):
        if part in ("", "."):
            continue
        if part == ".." or SAFE_SEGMENT_RE.fullmatch(part) is None:
            raise SystemExit(f"{label} contains an unsafe path segment: {part}")
    return candidate
def validate_template_vars(value: object) -> list[dict[str, str]]:
    """Validate the optional ``template_vars`` list of ``{name, value}`` objects.

    ``None`` and ``[]`` both yield an empty list. Otherwise *value* must be a
    list of at most 8 dicts; each ``name`` must match ``^[a-z][a-z0-9_]{0,31}$``
    and each ``value`` must pass ``validate_text`` with a 240-character limit.

    Returns:
        A new list of ``{"name": ..., "value": ...}`` dicts; other keys present
        on the input entries are not copied.

    Raises:
        SystemExit: on any structural or per-entry validation failure.
    """
    if value in (None, []):
        return []
    if not isinstance(value, list) or len(value) > 8:
        raise SystemExit("template_vars must be a list with at most 8 entries")
    name_pattern = re.compile(r"^[a-z][a-z0-9_]{0,31}$")
    checked: list[dict[str, str]] = []
    for entry in value:
        if not isinstance(entry, dict):
            raise SystemExit("template_vars entries must be objects")
        checked.append(
            {
                "name": validate_regex(entry.get("name"), "template_vars.name", name_pattern),
                "value": validate_text(entry.get("value"), "template_vars.value", 240),
            }
        )
    return checked
def validate_action_payload(payload: dict) -> dict:
    """Validate a controlled-action envelope and return its sanitized action.

    The envelope must have ``version`` equal to the integer 1, ``kind`` equal
    to ``"controlled_action"`` and an ``action`` object. Every field of the
    action is validated individually; only the validated fields are returned.

    Args:
        payload: decoded JSON object read from the outbox.

    Returns:
        A dict with the keys ``action_id``, ``request_id``, ``requested_by``,
        ``requested_for``, ``action_type``, ``summary``, ``reason``,
        ``input_path``, ``output_path``, ``script_path``, ``message_path``,
        ``manifest_path``, ``notification_path``, ``supporting_paths`` and
        ``template_vars``. The four optional paths are ``None`` when absent.

    Raises:
        SystemExit: if any field fails validation, the action type is not in
            ``ALLOWED_ACTIONS``, or a path required by the action type is
            missing.
    """
    version = payload.get("version")
    # ``True == 1`` and ``1.0 == 1`` in Python, so a plain equality test would
    # accept JSON ``true`` or ``1.0``; require a real integer.
    if isinstance(version, bool) or not isinstance(version, int) or version != 1:
        raise SystemExit("controlled action version must be 1")
    if payload.get("kind") != "controlled_action":
        raise SystemExit("controlled action kind must be controlled_action")
    action = payload.get("action")
    if not isinstance(action, dict):
        raise SystemExit("controlled action missing action object")

    # Only a missing/null value defaults to an empty list; other falsy values
    # ("" / 0 / {} / false) are malformed and must fail the list check below.
    supporting_raw = action.get("supporting_paths")
    if supporting_raw is None:
        supporting_raw = []
    if not isinstance(supporting_raw, list) or len(supporting_raw) > 8:
        raise SystemExit("supporting_paths must be a list with at most 8 entries")

    validated = {
        "action_id": validate_regex(action.get("action_id"), "action_id", ACTION_ID_RE),
        "request_id": validate_regex(action.get("request_id"), "request_id", REQUEST_ID_RE),
        "requested_by": validate_regex(action.get("requested_by"), "requested_by", AGENT_RE),
        "requested_for": validate_regex(action.get("requested_for"), "requested_for", AGENT_RE),
        "action_type": validate_text(action.get("action_type"), "action_type", 64),
        "summary": validate_text(action.get("summary"), "summary", 240),
        "reason": validate_text(action.get("reason"), "reason", 1200),
        "input_path": validate_guest_path(action.get("input_path"), "input_path"),
        "output_path": validate_guest_path(action.get("output_path"), "output_path"),
        # Optional paths are stored raw here and validated after the
        # action_type check so an unsupported type is reported first.
        "script_path": action.get("script_path"),
        "message_path": action.get("message_path"),
        "manifest_path": action.get("manifest_path"),
        "notification_path": action.get("notification_path"),
        "supporting_paths": [validate_guest_path(value, "supporting_paths") for value in supporting_raw],
        "template_vars": validate_template_vars(action.get("template_vars")),
    }
    if validated["action_type"] not in ALLOWED_ACTIONS:
        raise SystemExit(f"unsupported action_type: {validated['action_type']}")

    for field in ("script_path", "message_path", "manifest_path", "notification_path"):
        if validated[field] is not None:
            validated[field] = validate_guest_path(validated[field], field)

    if validated["action_type"] == "run_real_data" and not validated["script_path"]:
        raise SystemExit("script_path is required for run_real_data")
    if validated["action_type"] == "release_results":
        if not validated["manifest_path"]:
            raise SystemExit("manifest_path is required for release_results")
        if not validated["notification_path"]:
            raise SystemExit("notification_path is required for release_results")
    return validated
def load_action_manifest(manifest_dir: Path, request_id: str) -> dict | None:
    """Load the ``expected_action`` object from ``<manifest_dir>/<request_id>.yaml``.

    Returns:
        The ``expected_action`` mapping, or ``None`` when no such file exists.

    Raises:
        SystemExit: if the YAML document is not a mapping or has no
            ``expected_action`` mapping.
    """
    manifest_file = manifest_dir / f"{request_id}.yaml"
    if not manifest_file.is_file():
        return None
    with manifest_file.open("r", encoding="utf-8") as stream:
        document = yaml.safe_load(stream) or {}
    if not isinstance(document, dict):
        raise SystemExit(f"action manifest must be a YAML object: {manifest_file}")
    expected_action = document.get("expected_action")
    if isinstance(expected_action, dict):
        return expected_action
    raise SystemExit(f"action manifest missing expected_action object: {manifest_file}")
def validate_against_manifest(action: dict, manifest: dict) -> None:
    """Check that *action* stays within the bounds declared by *manifest*.

    Checks run in this order and stop at the first failure:

    1. ``request_id`` and ``action_type`` must equal the manifest values when
       the manifest provides a truthy one.
    2. ``input_path``, ``script_path`` and ``output_path`` must each be listed
       in the corresponding ``allowed_*_paths`` list when that key is present
       (a ``None`` action value is skipped).
    3. ``requested_by`` / ``requested_for`` must equal
       ``required_requested_by`` / ``required_requested_for`` when present.

    Raises:
        SystemExit: on the first mismatch, or when an ``allowed_*`` entry is
            not a list.
    """
    expected_request = manifest.get("request_id")
    if expected_request and expected_request != action["request_id"]:
        raise SystemExit(
            "manifest binding failed: request_id mismatch: "
            f"action has {action['request_id']}, manifest expects {expected_request}"
        )

    expected_type = manifest.get("action_type")
    if expected_type and expected_type != action["action_type"]:
        raise SystemExit(
            "manifest binding failed: action_type mismatch: "
            f"action has {action['action_type']}, manifest expects {expected_type}"
        )

    path_bindings = (
        ("input_path", "allowed_input_paths"),
        ("script_path", "allowed_script_paths"),
        ("output_path", "allowed_output_paths"),
    )
    for field, manifest_key in path_bindings:
        allowed = manifest.get(manifest_key)
        if allowed is None:
            continue
        if not isinstance(allowed, list):
            raise SystemExit(f"manifest {manifest_key} must be a list")
        value = action.get(field)
        if value is None or value in allowed:
            continue
        raise SystemExit(
            f"manifest binding failed: {field} '{value}' not in allowed set {allowed}"
        )

    identity_bindings = (
        ("requested_by", "required_requested_by"),
        ("requested_for", "required_requested_for"),
    )
    for field, manifest_key in identity_bindings:
        required = manifest.get(manifest_key)
        if required is not None and action[field] != required:
            raise SystemExit(
                f"manifest binding failed: {field} mismatch: "
                f"action has {action[field]}, manifest requires {required}"
            )
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 64 KiB chunks so it is never held in memory whole.
    """
    digest = hashlib.sha256()
    with path.open("rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(65536), b""):
            digest.update(block)
    return digest.hexdigest()
def verify_code_digest(
    action: dict, manifest: dict, mappings: dict[str, Path]
) -> None:
    """Compare the action's script against the manifest's ``code_digest``.

    Nothing is checked when the manifest has no ``code_digest`` or the action
    has no ``script_path``. Otherwise the script is resolved to a host path
    and its SHA-256 hex digest must equal the manifest value exactly.

    Raises:
        SystemExit: if the script file is missing on the host, has no host
            mapping, or its digest differs from the expected one.
    """
    expected_digest = manifest.get("code_digest")
    script_guest = action.get("script_path")
    if not expected_digest or not script_guest:
        return
    script_host = map_guest_to_host(script_guest, mappings)
    if not script_host.is_file():
        raise SystemExit(f"script file not found for digest check: {script_host}")
    actual_digest = sha256_file(script_host)
    if actual_digest == expected_digest:
        return
    raise SystemExit(
        f"code digest mismatch: expected {expected_digest}, got {actual_digest}"
    )
def run_command(command: list[str], *, env: dict[str, str] | None = None) -> subprocess.CompletedProcess[str]:
    """Run *command* without a shell, capturing stdout and stderr as text.

    Args:
        command: argument vector to execute.
        env: optional replacement environment; ``None`` inherits the current one.

    Returns:
        The completed process; a non-zero exit status does not raise.
    """
    return subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        env=env,
    )
def load_fixture_policy(path: Path) -> dict[str, Any]:
    """Load the fixture middleware policy from the YAML file at *path*.

    An empty document is treated as an empty mapping.

    Raises:
        SystemExit: if the file does not exist or its top-level value is not
            a mapping.
    """
    if not path.is_file():
        raise SystemExit(f"middleware fixture file not found: {path}")
    with path.open("r", encoding="utf-8") as stream:
        policy = yaml.safe_load(stream) or {}
    if isinstance(policy, dict):
        return policy
    raise SystemExit(f"middleware fixture file must contain an object: {path}")
def latest_outbox_file(control_root: Path, action_id: str | None) -> Path:
    """Locate the outbox file to relay under ``<control_root>/outbox``.

    With an *action_id*, ``<action_id>.json`` must exist. Without one, the
    ``act-*.json`` file with the newest modification time is chosen.

    Raises:
        SystemExit: if the named file is missing or the outbox has no
            ``act-*.json`` files.
    """
    outbox = control_root / "outbox"
    if not action_id:
        by_age = sorted(outbox.glob("act-*.json"), key=lambda entry: entry.stat().st_mtime)
        if by_age:
            return by_age[-1]
        raise SystemExit(f"no controlled action outbox files found in {outbox}")
    named = outbox / f"{action_id}.json"
    if named.is_file():
        return named
    raise SystemExit(f"controlled action outbox file not found: {named}")
def write_status(
    control_root: Path,
    action_id: str,
    request_id: str,
    state: str,
    actor: str,
    note: str,
    *,
    output_path: str = "",
) -> Path:
    """Write a status document for *action_id* into ``<control_root>/inbox``.

    Args:
        control_root: directory containing the ``inbox`` folder.
        action_id: action identifier; also the file stem of the status file.
        request_id: request the action belongs to.
        state: one of ``STATUS_VALUES``.
        actor: who produced this status.
        note: free-form detail text.
        output_path: optional output path recorded in the status (default ``""``).

    Returns:
        The path of the written status file.

    Raises:
        SystemExit: if *state* is not in ``STATUS_VALUES``.
    """
    if state not in STATUS_VALUES:
        raise SystemExit(f"invalid controlled action state: {state}")
    status_body = {
        "action_id": action_id,
        "request_id": request_id,
        "state": state,
        "actor": actor,
        "note": note,
        "output_path": output_path,
    }
    document = {
        "version": 1,
        "kind": "controlled_action_status",
        "status": status_body,
    }
    destination = control_root / "inbox" / f"{action_id}.json"
    dump_json(destination, document)
    return destination
def map_guest_to_host(guest_path: str, mappings: dict[str, Path]) -> Path:
    """Translate *guest_path* into a host path using the longest matching prefix.

    A mapping prefix matches when *guest_path* starts with it, or equals it
    with trailing slashes removed (so ``/output`` matches ``/output/``).

    Returns:
        The mapped host root joined with the remainder of *guest_path*, or the
        host root itself when there is no remainder.

    Raises:
        SystemExit: if no prefix matches.
    """
    matching = [
        prefix
        for prefix in mappings
        if guest_path == prefix.rstrip("/") or guest_path.startswith(prefix)
    ]
    if not matching:
        raise SystemExit(f"no host mapping found for guest path: {guest_path}")
    # max() returns the first of equally long prefixes, like a strict ">" scan.
    chosen = max(matching, key=len)
    remainder = guest_path[len(chosen) :].lstrip("/")
    root = mappings[chosen]
    if remainder:
        return root / remainder
    return root
def parse_mappings(entries: list[str]) -> dict[str, Path]:
    """Parse ``/guest/prefix=/host/path`` entries into a prefix-to-root mapping.

    Each guest prefix is normalized to end with exactly one ``/``; each host
    path is resolved to an absolute path. A later entry with the same
    normalized prefix replaces an earlier one.

    Args:
        entries: raw ``--map`` values.

    Returns:
        Mapping of normalized guest prefix to resolved host root.

    Raises:
        SystemExit: if an entry has no ``=``, its guest prefix is not
            absolute, or its host path is empty.
    """
    mappings: dict[str, Path] = {}
    for entry in entries:
        guest_prefix, separator, host_root = entry.partition("=")
        if not separator:
            raise SystemExit(f"invalid map entry: {entry}")
        if not guest_prefix.startswith("/"):
            raise SystemExit(f"map prefix must be absolute guest path: {entry}")
        # Path("").resolve() is the current working directory; an empty host
        # side would silently expose it under the guest prefix.
        if not host_root:
            raise SystemExit(f"map host path must not be empty: {entry}")
        mappings[guest_prefix.rstrip("/") + "/"] = Path(host_root).resolve()
    return mappings
def approve_request_default(shared_root: Path, request_id: str) -> str:
    """Approve *request_id* automatically by invoking ``./rho approve``.

    Returns:
        The string ``"approved"`` when the command exits with status 0.

    Raises:
        SystemExit: with the command's exit status when it fails; its captured
            stdout and stderr are forwarded first.
    """
    command = ["./rho", "approve", request_id]
    command += ["--shared-root", str(shared_root)]
    command += ["--decision", "approve"]
    command += ["--actor", "controlled-action-relay"]
    command += ["--note", "Approved by automated controlled action relay."]
    outcome = run_command(command)
    if outcome.returncode == 0:
        return "approved"
    sys.stdout.write(outcome.stdout)
    sys.stderr.write(outcome.stderr)
    raise SystemExit(outcome.returncode)
def build_action_message(action: dict, manifest: dict | None = None) -> str:
    """Render the human-readable approval prompt for *action*.

    With a *manifest*, the message separates host-verified manifest values,
    agent-proposed paths and unverified agent-written text into labelled
    sections. Without one, the action fields are listed flat.

    Args:
        action: validated action dict.
        manifest: optional ``expected_action`` mapping from the host manifest.

    Returns:
        The message lines joined with newlines, ending with the
        ``/approve`` and ``/deny`` reply hints.
    """
    message = [
        "Controlled action pending approval",
        "",
        f"Action: {action['action_id']}",
        f"Request: {action['request_id']}",
        f"Type: {action['action_type']}",
        f"From: {action['requested_by']}",
        f"For: {action['requested_for']}",
    ]
    proposed = [f"Input: {action['input_path']}", f"Output: {action['output_path']}"]
    narrative = [f"Summary: {action['summary']}", f"Reason: {action['reason']}"]
    script_lines = [f"Script: {action['script_path']}"] if action.get("script_path") else []

    if manifest is None:
        message += proposed + narrative + script_lines
    else:
        digest = manifest.get("code_digest") or "none"
        message += [
            "",
            "--- Host-verified manifest ---",
            f"Allowed inputs: {manifest.get('allowed_input_paths', [])}",
            f"Allowed scripts: {manifest.get('allowed_script_paths', [])}",
            f"Allowed outputs: {manifest.get('allowed_output_paths', [])}",
            f"Code digest: {digest}",
            "",
            "--- Agent-proposed values ---",
        ]
        message += proposed + script_lines
        message += ["", "--- Agent-generated (unverified) ---"] + narrative

    optional_paths = (
        ("Message", "message_path"),
        ("Manifest", "manifest_path"),
        ("Notification", "notification_path"),
    )
    for title, key in optional_paths:
        if action.get(key):
            message.append(f"{title}: {action[key]}")

    if action.get("supporting_paths"):
        message += ["", "Supporting paths:"]
        message += [f"- {path}" for path in action["supporting_paths"]]

    message += [
        "",
        "Reply with:",
        f"/approve {action['request_id']}",
        f"/deny {action['request_id']}",
    ]
    return "\n".join(message)
def approve_request_live(args: argparse.Namespace, action: dict, manifest: dict | None = None) -> str:
    """Ask a Telegram operator to decide on *action* and return the request status.

    Three commands run in sequence: ``./rho-telegram send`` with the approval
    prompt, ``./rho-telegram approval-listen --once`` bounded by
    ``args.timeout_seconds``, and ``./rho request status``. The two Telegram
    commands run with ``RHO_TELEGRAM_ROOT`` set to ``args.telegram_root``.

    Returns:
        The stripped stdout of ``./rho request status``.

    Raises:
        SystemExit: with a command's exit status as soon as one of them
            fails; its captured stdout and stderr are forwarded first.
    """

    def abort_on_failure(proc: subprocess.CompletedProcess[str]) -> None:
        """Forward captured output and exit when *proc* failed."""
        if proc.returncode != 0:
            sys.stdout.write(proc.stdout)
            sys.stderr.write(proc.stderr)
            raise SystemExit(proc.returncode)

    telegram_env = {**os.environ, "RHO_TELEGRAM_ROOT": str(args.telegram_root)}
    prompt = build_action_message(action, manifest)

    send_command = ["./rho-telegram", "send", args.telegram_profile]
    send_command += ["--user", args.telegram_user, "--text", prompt]
    abort_on_failure(run_command(send_command, env=telegram_env))

    listen_command = ["./rho-telegram", "approval-listen", args.telegram_profile]
    listen_command += ["--user", args.telegram_user]
    listen_command += ["--shared-root", str(args.shared_root)]
    listen_command += ["--once", "--timeout-seconds", str(args.timeout_seconds)]
    abort_on_failure(run_command(listen_command, env=telegram_env))

    status_command = ["./rho", "request", "status", action["request_id"]]
    status_command += ["--shared-root", str(args.shared_root)]
    status = run_command(status_command)
    abort_on_failure(status)
    return status.stdout.strip()
def decide_via_fixture(args: argparse.Namespace, action: dict) -> tuple[str, str]:
    """Decide on *action* from the YAML fixture policy at ``args.fixture_file``.

    The policy's ``actions`` mapping is consulted by action id, then request
    id, then action type; the first key present wins. Otherwise the policy's
    ``default_decision`` applies (``"approved"`` when unset). Decisions are
    stringified, stripped and lower-cased before validation.

    Returns:
        ``(decision, "fixture-middleware")`` with the decision in
        ``DECISION_VALUES``.

    Raises:
        SystemExit: if no fixture file was given or a decision value is not
            in ``DECISION_VALUES``.
    """
    if not args.fixture_file:
        raise SystemExit("fixture_file is required when middleware is fixture")
    policy = load_fixture_policy(args.fixture_file)
    default_decision = str(policy.get("default_decision", "approved")).strip().lower()
    if default_decision not in DECISION_VALUES:
        raise SystemExit(f"unsupported default_decision in fixture policy: {default_decision}")

    matched = None
    rules = policy.get("actions") or {}
    if isinstance(rules, dict):
        lookup_order = (action["action_id"], action["request_id"], action["action_type"])
        matched = next(
            (str(rules[key]).strip().lower() for key in lookup_order if key in rules),
            None,
        )
    decision = default_decision if matched is None else matched
    if decision in DECISION_VALUES:
        return decision, "fixture-middleware"
    raise SystemExit(f"unsupported fixture decision: {decision}")
def approve_request(args: argparse.Namespace, action: dict, manifest: dict | None = None) -> tuple[str, str]:
    """Obtain a decision for *action* from the adapter named by ``args.middleware``.

    Returns:
        ``(decision, actor)`` where *actor* names the deciding adapter.

    Raises:
        SystemExit: for an unknown adapter, or when the telegram adapter is
            selected without profile, root or user.
    """
    adapter = args.middleware
    if adapter == "fixture":
        return decide_via_fixture(args, action)
    if adapter == "auto":
        decision = approve_request_default(args.shared_root, action["request_id"])
        return decision, "controlled-action-relay"
    if adapter == "telegram":
        for field in ("telegram_profile", "telegram_root", "telegram_user"):
            if getattr(args, field) in (None, ""):
                raise SystemExit(f"{field} is required when middleware is telegram")
        decision = approve_request_live(args, action, manifest)
        return decision, "telegram-operator"
    raise SystemExit(f"unsupported middleware adapter: {args.middleware}")
def execute_run_real_data(action: dict, mappings: dict[str, Path]) -> tuple[str, str]:
    """Run the action's script on its input and save stdout to the output path.

    The script is invoked as ``python3 <script> <input>`` using host paths
    resolved through *mappings*. Its stripped stdout plus a trailing newline
    is written to the output file.

    Returns:
        ``(host_output_path, stripped_stdout)``.

    Raises:
        RuntimeError: if the script exits non-zero; the message is its
            stripped stderr or a generic exit-status text.
        SystemExit: if a guest path has no host mapping.
    """
    host_script = map_guest_to_host(action["script_path"], mappings)
    host_input = map_guest_to_host(action["input_path"], mappings)
    host_output = map_guest_to_host(action["output_path"], mappings)
    host_output.parent.mkdir(parents=True, exist_ok=True)
    completed = subprocess.run(
        ["python3", str(host_script), str(host_input)],
        capture_output=True,
        text=True,
    )
    if completed.returncode != 0:
        raise RuntimeError(completed.stderr.strip() or f"python3 exited with {completed.returncode}")
    trimmed = completed.stdout.strip()
    host_output.write_text(trimmed + "\n", encoding="utf-8")
    return str(host_output), trimmed
def execute_release_results(action: dict, mappings: dict[str, Path]) -> tuple[str, str]:
    """Copy the input file to the output path and write release records.

    After the copy, a release manifest and a ``result_release`` notification
    message are written as JSON to the action's ``manifest_path`` and
    ``notification_path``. Both records reference guest paths, not host paths.

    Returns:
        ``(host_output_path, stripped_text_of_released_file)``; the released
        file is read back as UTF-8 text.

    Raises:
        SystemExit: if a guest path has no host mapping.
    """
    source = map_guest_to_host(action["input_path"], mappings)
    released = map_guest_to_host(action["output_path"], mappings)
    manifest_target = map_guest_to_host(action["manifest_path"], mappings)
    notification_target = map_guest_to_host(action["notification_path"], mappings)

    released.parent.mkdir(parents=True, exist_ok=True)
    shutil.copy2(source, released)

    request_id = action["request_id"]
    manifest_doc = {
        "version": 1,
        "result": {
            "request_id": request_id,
            "action_id": action["action_id"],
            "status": "released",
            "source": action["input_path"],
            "released_to": action["output_path"],
        },
    }
    notification_doc = {
        "version": 1,
        "message": {
            "id": f"release-{request_id}",
            "from": action["requested_for"],
            "to": action["requested_by"],
            "type": "result_release",
            "related_request_id": request_id,
            "body": {
                "status": "released",
                "path": action["output_path"],
                "text": f"Results for {request_id} are now available.",
            },
        },
    }

    records = ((manifest_target, manifest_doc), (notification_target, notification_doc))
    for target, _ in records:
        target.parent.mkdir(parents=True, exist_ok=True)
    for target, document in records:
        target.write_text(json.dumps(document, indent=2) + "\n", encoding="utf-8")
    return str(released), released.read_text(encoding="utf-8").strip()
def execute_action(action: dict, mappings: dict[str, Path]) -> tuple[str, str]:
    """Dispatch *action* to the executor for its ``action_type``.

    Returns:
        Whatever the selected executor returns: ``(host_output_path, detail)``.

    Raises:
        SystemExit: if the action type has no executor.
    """
    kind = action["action_type"]
    if kind == "run_real_data":
        runner = execute_run_real_data
    elif kind == "release_results":
        runner = execute_release_results
    else:
        raise SystemExit(f"unsupported action_type: {action['action_type']}")
    return runner(action, mappings)
def cmd_relay(args: argparse.Namespace) -> int:
    """Relay one controlled action from the outbox through approval to execution.

    Steps:

    1. Parse ``--map`` entries, pick the outbox file and validate its payload.
    2. If a manifest directory is given, bind the action to its manifest and
       check the script digest. With ``--require-manifest`` a missing manifest
       is fatal, whether or not a manifest directory was supplied.
    3. Ask the configured middleware for a decision.
    4. If not approved, write a status to the inbox, print its path and exit 1.
    5. Otherwise write ``running``, execute the action, and write
       ``completed`` or ``failed``.

    Args:
        args: parsed ``relay`` sub-command arguments.

    Returns:
        0 when the action completed.

    Raises:
        SystemExit: on validation failure, a non-approved decision, or
            execution failure (status 1 in the latter two cases).
    """
    mappings = parse_mappings(args.map)
    outbox_path = latest_outbox_file(args.control_root, args.action_id)
    action = validate_action_payload(load_json(outbox_path))
    action_id = action["action_id"]
    request_id = action["request_id"]
    if args.action_id and action_id != args.action_id:
        raise SystemExit(f"action id mismatch: file payload targets {action_id}, expected {args.action_id}")

    manifest = None
    if args.manifest_dir:
        manifest = load_action_manifest(args.manifest_dir, request_id)
    if manifest is not None:
        validate_against_manifest(action, manifest)
        verify_code_digest(action, manifest, mappings)
    elif args.require_manifest:
        # Fail closed: --require-manifest must hold even when --manifest-dir
        # was not supplied at all.
        raise SystemExit(f"no action manifest found for {request_id} in {args.manifest_dir}")

    decision, actor = approve_request(args, action, manifest)
    if decision != "approved":
        # A middleware may return text that is not a known state; report it as
        # "denied" so a status file is still written instead of aborting
        # inside write_status with nothing in the inbox.
        state = decision if decision in STATUS_VALUES else "denied"
        inbox_path = write_status(
            args.control_root,
            action_id,
            request_id,
            state,
            actor,
            f"Controlled action {action_id} was not approved.",
        )
        print(inbox_path)
        raise SystemExit(1)

    write_status(
        args.control_root,
        action_id,
        request_id,
        "running",
        "controlled-action-relay",
        f"Executing controlled action {action_id}.",
    )
    try:
        if manifest is not None:
            # Approval can take a while; re-check the digest right before
            # running so a script modified in the meantime is not executed.
            verify_code_digest(action, manifest, mappings)
        _host_output, detail = execute_action(action, mappings)
    except (Exception, SystemExit) as exc:
        # Helpers signal errors with SystemExit, which is not an Exception
        # subclass; catch it too so the status never stays at "running".
        inbox_path = write_status(
            args.control_root,
            action_id,
            request_id,
            "failed",
            "controlled-action-relay",
            str(exc),
        )
        print(inbox_path)
        raise SystemExit(1) from exc

    inbox_path = write_status(
        args.control_root,
        action_id,
        request_id,
        "completed",
        "controlled-action-relay",
        detail,
        output_path=action["output_path"],
    )
    print(inbox_path)
    return 0
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with its single ``relay`` sub-command.

    The ``relay`` sub-command stores ``cmd_relay`` as ``func`` in the parsed
    namespace so the caller can dispatch to it.
    """
    root_parser = argparse.ArgumentParser(prog="rho-controlled-action-relay")
    commands = root_parser.add_subparsers(dest="command", required=True)
    relay_cmd = commands.add_parser("relay")

    # Locations and target selection.
    relay_cmd.add_argument("--control-root", type=Path, required=True)
    relay_cmd.add_argument("--shared-root", type=Path, required=True)
    relay_cmd.add_argument("--action-id")

    # Approval adapter selection; --mode is kept for older callers.
    relay_cmd.add_argument(
        "--middleware",
        default=None,
        choices=["auto", "telegram", "fixture"],
        help="Approval middleware adapter to use",
    )
    relay_cmd.add_argument(
        "--mode",
        default=None,
        choices=["default", "live"],
        help="Deprecated compatibility flag; maps to auto or telegram middleware",
    )

    # Adapter-specific settings.
    relay_cmd.add_argument("--telegram-profile")
    relay_cmd.add_argument("--telegram-root", type=Path)
    relay_cmd.add_argument("--telegram-user")
    relay_cmd.add_argument("--fixture-file", type=Path)

    # Manifest binding.
    relay_cmd.add_argument(
        "--manifest-dir",
        default=None,
        type=Path,
        help="Directory containing expected action manifests keyed by request id",
    )
    relay_cmd.add_argument(
        "--require-manifest",
        action="store_true",
        help="Fail if no action manifest is found for the request id",
    )

    relay_cmd.add_argument("--timeout-seconds", type=int, default=120)
    relay_cmd.add_argument(
        "--map",
        default=[],
        action="append",
        help="Guest to host path mapping in the form /guest/prefix=/host/path",
    )
    relay_cmd.set_defaults(func=cmd_relay)
    return root_parser
def main() -> int:
    """Parse the command line and run the selected sub-command.

    When ``--middleware`` is not given, the deprecated ``--mode`` flag picks
    the adapter: ``live`` selects ``telegram``; anything else selects ``auto``.

    Returns:
        The integer returned by the sub-command handler.
    """
    args = build_parser().parse_args()
    if args.middleware is None:
        args.middleware = "telegram" if args.mode == "live" else "auto"
    return args.func(args)
# Script entry point: exit the process with the status code returned by main().
if __name__ == "__main__":
    raise SystemExit(main())