use std::env;
use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use rho_core::{
ActionGrant, ActionGrantManifest, ApprovalManifest, ControlledActionManifest,
ControlledActionStatus, ControlledActionStatusManifest, GrantedActionFile, GrantedInput,
GrantedRepoState, ProposedActionManifest, RequestManifest, RhoResult, RunManifest, RunRecord,
SandboxMount, SandboxNetworkPolicy, SandboxRunManifest, SandboxRunRecord, ToolManifest,
ensure_parent, file_digest, from_json, from_yaml, normalize_actor_id, normalize_repo_id,
providers, require_arg, to_json_pretty, to_yaml, validate_action_id, validate_action_type,
validate_actor_id, validate_relative_safe_path, validate_request_id, validate_run_id,
validate_tier, validate_tool_id,
};
use serde::{Deserialize, Serialize};
/// Prints the `rho run` usage summary to stderr and terminates the process
/// with exit code 2. This function never returns.
fn usage() -> ! {
    // Line continuations (`\` at end of line) drop the following indentation,
    // so the emitted text contains exactly one line per subcommand form.
    const USAGE: &str = "usage: rho run <approve-pr|approve-request|request|import-action|proposal-action|grant-action|controlled-action> ...\n\
        approve-pr: rho run approve-pr <pr-number> --root <repo> [--private-root <path>] [--request-id <id>] [--runner local|gondolin|fake] [--command <cmd>] [--run-id <id>] [--action-id <id>] [--no-sync] [--commit] [--push] [--pr]\n\
        approve-request: rho run approve-request <req-id> --root <repo> [--private-root <path>] [--runner local|gondolin|fake] [--command <cmd>] [--run-id <id>] [--action-id <id>]\n\
        request: rho run request --shared-root <path> --request-id <id> --run-id <id> --private-root <path> [--tier real|mock]\n\
        import-action: rho run import-action --agent-control-root <path> --control-root <path> --action-id <id> [--map <guest-prefix=host-prefix>]...\n\
        rho run import-action --source <path> --control-root <path> --action-id <id> --repo-root <path> --run-root <path>\n\
        proposal-action: rho run proposal-action --proposal <path> --control-root <path> --action-id <id> --map <guest-prefix=host-prefix> --script-root <path> --output-root <path> --allow-tool <tool> --allow-requested-by <actor> --allow-requested-for <actor> --allow-action-type <type>\n\
        grant-action: rho run grant-action --shared-root <path> --control-root <path> --action-id <id> --granted-by <actor> [--input <kind:path>]...\n\
        controlled-action: rho run controlled-action --shared-root <path> --control-root <path> --action-id <id> --private-root <path>";
    eprintln!("{}", USAGE);
    std::process::exit(2);
}
/// Path of the request manifest `<shared_root>/.rho/requests/<request_id>.yaml`.
fn request_path(shared_root: &Path, request_id: &str) -> PathBuf {
    let mut path = shared_root.join(".rho/requests");
    path.push(format!("{request_id}.yaml"));
    path
}
/// Path of the approval manifest `<shared_root>/.rho/approvals/<request_id>.yaml`.
fn approval_path(shared_root: &Path, request_id: &str) -> PathBuf {
    let mut path = shared_root.join(".rho/approvals");
    path.push(format!("{request_id}.yaml"));
    path
}
/// Path of the run manifest `<shared_root>/.rho/runs/<run_id>.yaml`.
fn run_manifest_path(shared_root: &Path, run_id: &str) -> PathBuf {
    let file_name = format!("{run_id}.yaml");
    let mut path = shared_root.join(".rho/runs");
    path.push(file_name);
    path
}
/// Path of the action grant `<shared_root>/.rho/action-grants/<action_id>.yaml`.
fn action_grant_path(shared_root: &Path, action_id: &str) -> PathBuf {
    let mut path = shared_root.join(".rho/action-grants");
    path.push(format!("{action_id}.yaml"));
    path
}
/// Path of the grant signature file
/// `<shared_root>/.rho/action-grants/<action_id>.rhosig.yaml`.
fn action_grant_signature_path(shared_root: &Path, action_id: &str) -> PathBuf {
    let mut path = shared_root.join(".rho/action-grants");
    path.push(format!("{action_id}.rhosig.yaml"));
    path
}
/// Path of a committed approval grant:
/// `<shared_root>/rho/approval-grants/<request_id>/<action_id>.yaml`.
///
/// Note the directory is `rho/` (no leading dot), unlike the `.rho/` working
/// tree used by the non-committed path helpers.
fn committed_approval_grant_path(shared_root: &Path, request_id: &str, action_id: &str) -> PathBuf {
    let mut path = shared_root.join("rho/approval-grants");
    path.push(request_id);
    path.push(format!("{action_id}.yaml"));
    path
}
/// Path of a committed run receipt:
/// `<shared_root>/rho/run-receipts/<request_id>/<run_id>.yaml`.
fn committed_run_receipt_path(shared_root: &Path, request_id: &str, run_id: &str) -> PathBuf {
    let mut path = shared_root.join("rho/run-receipts");
    path.push(request_id);
    path.push(format!("{run_id}.yaml"));
    path
}
/// Path of the local tool manifest `<shared_root>/.rho/tools/<tool_id>.yaml`.
fn tool_path(shared_root: &Path, tool_id: &str) -> PathBuf {
    let mut path = shared_root.join(".rho/tools");
    path.push(format!("{tool_id}.yaml"));
    path
}
/// Path of the committed tool manifest `<shared_root>/rho/tools/<tool_id>.yaml`.
fn committed_tool_path(shared_root: &Path, tool_id: &str) -> PathBuf {
    let mut path = shared_root.join("rho/tools");
    path.push(format!("{tool_id}.yaml"));
    path
}
/// Directory holding a run's artifacts: `<shared_root>/.rho/runs/<run_id>`.
fn run_artifact_dir(shared_root: &Path, run_id: &str) -> PathBuf {
    let mut dir = shared_root.join(".rho/runs");
    dir.push(run_id);
    dir
}
/// Path of a controlled action in the control outbox:
/// `<control_root>/outbox/<action_id>.json`.
fn action_outbox_path(control_root: &Path, action_id: &str) -> PathBuf {
    let mut path = control_root.join("outbox");
    path.push(format!("{action_id}.json"));
    path
}
/// Path of a controlled action's status file in the control inbox:
/// `<control_root>/inbox/<action_id>.status.json`.
fn action_status_path(control_root: &Path, action_id: &str) -> PathBuf {
    let mut path = control_root.join("inbox");
    path.push(format!("{action_id}.status.json"));
    path
}
/// Reads the approval manifest for `request_id` and normalizes its decision.
///
/// Returns `"pending"` when no approval file exists or the decision is not
/// recognized, `"approved"` for `approve`/`approved`, and `"denied"` for
/// `deny`/`denied`/`reject`/`rejected` (case-insensitive, ASCII).
///
/// # Errors
/// Fails when the approval file exists but cannot be read or parsed.
fn approval_status(shared_root: &Path, request_id: &str) -> RhoResult<String> {
    let manifest_file = approval_path(shared_root, request_id);
    if !manifest_file.is_file() {
        return Ok("pending".to_string());
    }
    let raw = fs::read_to_string(manifest_file)?;
    let manifest = from_yaml::<ApprovalManifest>(&raw)?;
    let decision = manifest.approval.decision.to_ascii_lowercase();
    let status = if matches!(decision.as_str(), "approve" | "approved") {
        "approved"
    } else if matches!(decision.as_str(), "deny" | "denied" | "reject" | "rejected") {
        "denied"
    } else {
        "pending"
    };
    Ok(status.to_string())
}
/// Returns the regular file directly inside `dir` whose path sorts first.
///
/// Only direct children are considered, and only entries whose own file type
/// is a regular file (subdirectories are skipped).
///
/// # Errors
/// Fails when `dir` is not a directory, when the directory cannot be listed,
/// or when it contains no regular file.
fn first_file_under(dir: &Path) -> RhoResult<PathBuf> {
    if !dir.is_dir() {
        return Err(format!("directory not found: {}", dir.display()).into());
    }
    // Track the smallest path seen so far instead of collecting and sorting.
    let mut earliest: Option<PathBuf> = None;
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        if !entry.file_type()?.is_file() {
            continue;
        }
        let candidate = entry.path();
        let is_earlier = match &earliest {
            Some(best) => candidate < *best,
            None => true,
        };
        if is_earlier {
            earliest = Some(candidate);
        }
    }
    earliest.ok_or_else(|| format!("no dataset file found under {}", dir.display()).into())
}
/// Resolves the dataset file to use for a run at the given `tier`.
///
/// * `public` — the manifest's public variant under `<shared_root>/datasets`.
/// * `mock` — the manifest's mock variant under `<shared_root>/datasets`,
///   falling back to the first file in `<shared_root>/datasets/<uuid>/mock`.
/// * `real` — the manifest's real variant under `private_root`, falling back
///   to the first file in `<private_root>/<uuid>/real`.
///
/// # Errors
/// Fails for any other tier, or when no matching file can be found.
fn dataset_csv(
    shared_root: &Path,
    private_root: &Path,
    dataset_uuid: &str,
    tier: &str,
) -> RhoResult<PathBuf> {
    if tier == "real" {
        return dataset_variant_csv(private_root, dataset_uuid, "real")
            .or_else(|_| first_file_under(&private_root.join(dataset_uuid).join("real")));
    }
    let shared_datasets = shared_root.join("datasets");
    match tier {
        "public" => dataset_variant_csv(&shared_datasets, dataset_uuid, "public"),
        "mock" => dataset_variant_csv(&shared_datasets, dataset_uuid, "mock")
            .or_else(|_| first_file_under(&shared_datasets.join(dataset_uuid).join("mock"))),
        _ => Err(format!("unsupported run tier: {tier}").into()),
    }
}
/// Resolves the dataset file for an approval-driven run, where the private
/// root may not have been supplied.
///
/// For the `real` tier without a private root, the file is looked up through
/// the local dataset bindings. For every other tier a missing private root is
/// replaced by `"."` and resolution is delegated to [`dataset_csv`].
fn dataset_csv_for_approval(
    shared_root: &Path,
    private_root: Option<&Path>,
    dataset_uuid: &str,
    tier: &str,
) -> RhoResult<PathBuf> {
    match (tier, private_root) {
        ("real", Some(root)) => dataset_csv(shared_root, root, dataset_uuid, tier),
        ("real", None) => dataset_csv_from_binding(shared_root, dataset_uuid),
        (_, root) => dataset_csv(
            shared_root,
            root.unwrap_or_else(|| Path::new(".")),
            dataset_uuid,
            tier,
        ),
    }
}
/// Resolves the file of one dataset variant via the bundle's `dataset.yaml`.
///
/// The bundle is located with [`resolve_dataset_bundle`]; the variant's
/// `relative_path` is validated as a safe relative path and joined onto the
/// bundle directory.
///
/// # Errors
/// Fails when the bundle or manifest cannot be loaded, when the manifest has
/// no path for `tier` (including unknown tiers), when the relative path is
/// rejected, or when the resulting file does not exist.
fn dataset_variant_csv(root: &Path, dataset_ref: &str, tier: &str) -> RhoResult<PathBuf> {
    let bundle = resolve_dataset_bundle(root, dataset_ref)?;
    let variants = read_dataset_manifest(&bundle.join("dataset.yaml"))?
        .dataset
        .variants;
    let declared = match tier {
        "public" => variants.public.and_then(|variant| variant.relative_path),
        "mock" => variants.mock.and_then(|variant| variant.relative_path),
        "real" => variants.real.and_then(|variant| variant.relative_path),
        _ => None,
    };
    let Some(relative) = declared else {
        return Err(format!("dataset {dataset_ref} has no {tier} variant").into());
    };
    validate_relative_safe_path(&relative)?;
    let file = bundle.join(relative);
    if file.is_file() {
        Ok(file)
    } else {
        Err(format!("dataset file not found: {}", file.display()).into())
    }
}
fn dataset_csv_from_binding(shared_root: &Path, dataset_uuid: &str) -> RhoResult<PathBuf> {
let dataset_name = repo_dataset_name(shared_root, dataset_uuid)?;
let binding = find_dataset_binding(dataset_uuid, dataset_name.as_deref())?;
if binding.real_path.is_file() {
return Ok(binding.real_path);
}
first_file_under(&binding.real_path)
}
/// Finds the name of the dataset with `dataset_uuid` among the bundles in
/// `<root>/datasets`.
///
/// Returns `Ok(None)` when the `datasets` directory is missing or no bundle
/// manifest carries that UUID. Entries without a `dataset.yaml` are skipped.
///
/// # Errors
/// Fails when the directory cannot be listed or a manifest cannot be parsed.
fn repo_dataset_name(root: &Path, dataset_uuid: &str) -> RhoResult<Option<String>> {
    let datasets_dir = root.join("datasets");
    if !datasets_dir.is_dir() {
        return Ok(None);
    }
    for entry in fs::read_dir(datasets_dir)? {
        let candidate = entry?.path().join("dataset.yaml");
        if candidate.is_file() {
            let dataset = read_dataset_manifest(&candidate)?.dataset;
            if dataset.uuid == dataset_uuid {
                return Ok(Some(dataset.name));
            }
        }
    }
    Ok(None)
}
/// Searches the local dataset bindings for one matching the dataset.
///
/// Each subdirectory of the bindings root that contains a `bindings.yaml` is
/// inspected. A binding whose UUID equals `dataset_uuid` wins immediately.
/// Otherwise bindings whose name equals `dataset_name` are collected, and the
/// result is accepted only if exactly one such binding exists.
///
/// # Errors
/// Fails when the bindings root is missing, when a binding file cannot be
/// read, when nothing matches, or when several bindings match by name.
fn find_dataset_binding(
    dataset_uuid: &str,
    dataset_name: Option<&str>,
) -> RhoResult<DatasetBinding> {
    let bindings_root = dataset_bindings_root()?;
    if !bindings_root.is_dir() {
        return Err(format!(
            "no local dataset bindings found at {}; pass --private-root or run rho dataset bind",
            bindings_root.display()
        )
        .into());
    }

    let mut by_name = Vec::new();
    for entry in fs::read_dir(&bindings_root)? {
        let binding_file = entry?.path().join("bindings.yaml");
        if !binding_file.is_file() {
            continue;
        }
        let binding = read_dataset_binding(&binding_file)?;
        if binding.uuid.as_deref() == Some(dataset_uuid) {
            return Ok(binding);
        }
        if matches!(dataset_name, Some(name) if binding.name == name) {
            by_name.push(binding);
        }
    }

    if by_name.len() > 1 {
        return Err(format!(
            "multiple local bindings match dataset name {}; bind with --uuid or pass --private-root",
            dataset_name.unwrap_or("<unknown>")
        )
        .into());
    }
    by_name.pop().ok_or_else(|| {
        format!(
            "no local binding found for dataset uuid {dataset_uuid}; pass --private-root or run rho dataset bind"
        )
        .into()
    })
}
/// Computes the directory that holds the local dataset bindings.
///
/// The result is `<projects-root>/<handle>/datasets`, where `<handle>` is the
/// trimmed value of `RHO_ENV_HANDLE` and `<projects-root>` is
/// `RHO_PROJECTS_ROOT` if set, otherwise `$HOME/rho` (or `./rho` when `HOME`
/// is not available).
///
/// # Errors
/// Fails when `RHO_ENV_HANDLE` is unset, not valid Unicode, or blank.
fn dataset_bindings_root() -> RhoResult<PathBuf> {
    let handle = match env::var("RHO_ENV_HANDLE") {
        Ok(raw) if !raw.trim().is_empty() => raw.trim().to_string(),
        _ => return Err("missing --profile for local dataset binding lookup".into()),
    };
    let projects_root = match env::var("RHO_PROJECTS_ROOT") {
        Ok(explicit) => PathBuf::from(explicit),
        Err(_) => {
            let home = match env::var("HOME") {
                Ok(home) => PathBuf::from(home),
                Err(_) => PathBuf::from("."),
            };
            home.join("rho")
        }
    };
    Ok(projects_root.join(handle).join("datasets"))
}
/// Loads a `bindings.yaml` file and extracts the real-variant binding.
///
/// # Errors
/// Fails when the file cannot be read or parsed, when it declares no real
/// variant, or when the real variant's source kind is not `local_path`.
fn read_dataset_binding(path: &Path) -> RhoResult<DatasetBinding> {
    let raw = fs::read_to_string(path)?;
    let manifest: DatasetBindingManifest = from_yaml(&raw)?;
    let binding = manifest.dataset_binding;
    let Some(real) = binding.variants.real else {
        return Err(format!("binding has no real variant: {}", path.display()).into());
    };
    if real.source.kind != "local_path" {
        return Err(format!("unsupported binding source kind: {}", real.source.kind).into());
    }
    Ok(DatasetBinding {
        name: binding.name,
        uuid: binding.uuid,
        real_path: PathBuf::from(real.source.path),
    })
}
/// Locates the dataset bundle directory for `dataset_ref` under `root`.
///
/// A directory literally named `dataset_ref` that contains `dataset.yaml`
/// wins outright. Otherwise every direct child of `root` with a
/// `dataset.yaml` is inspected and matched on the manifest's UUID or name.
///
/// # Errors
/// Fails when no bundle matches, when more than one matches, or when a
/// manifest cannot be read.
fn resolve_dataset_bundle(root: &Path, dataset_ref: &str) -> RhoResult<PathBuf> {
    let direct = root.join(dataset_ref);
    if direct.join("dataset.yaml").is_file() {
        return Ok(direct);
    }

    let mut candidates: Vec<PathBuf> = Vec::new();
    if root.is_dir() {
        for entry in fs::read_dir(root)? {
            let bundle_dir = entry?.path();
            let manifest_file = bundle_dir.join("dataset.yaml");
            if manifest_file.is_file() {
                let dataset = read_dataset_manifest(&manifest_file)?.dataset;
                if dataset.uuid == dataset_ref || dataset.name == dataset_ref {
                    candidates.push(bundle_dir);
                }
            }
        }
    }

    if candidates.len() > 1 {
        return Err(format!("dataset reference is ambiguous: {dataset_ref}; use the UUID").into());
    }
    candidates.pop().ok_or_else(|| {
        format!(
            "dataset bundle not found for {dataset_ref} under {}",
            root.display()
        )
        .into()
    })
}
/// Reads the file at `path` and parses it as a YAML dataset manifest.
///
/// # Errors
/// Fails when the file cannot be read or does not parse.
fn read_dataset_manifest(path: &Path) -> RhoResult<DatasetManifest> {
    let raw = fs::read_to_string(path)?;
    let manifest: DatasetManifest = from_yaml(&raw)?;
    Ok(manifest)
}
/// Loads and validates the manifest of tool `tool_id`.
///
/// The local manifest under `.rho/tools` takes precedence over the committed
/// one under `rho/tools`. The manifest's own id must equal `tool_id`, and its
/// action type and owner must pass validation.
///
/// # Errors
/// Fails when the id is invalid, neither manifest exists, the file cannot be
/// read or parsed, the ids disagree, or a manifest field fails validation.
fn load_tool(shared_root: &Path, tool_id: &str) -> RhoResult<rho_core::Tool> {
    validate_tool_id(tool_id)?;
    let local = tool_path(shared_root, tool_id);
    let committed = committed_tool_path(shared_root, tool_id);
    let chosen = if local.is_file() {
        &local
    } else if committed.is_file() {
        &committed
    } else {
        return Err(format!(
            "tool manifest not found: {} or {}",
            local.display(),
            committed.display()
        )
        .into());
    };
    let raw = fs::read_to_string(chosen)?;
    let tool = from_yaml::<ToolManifest>(&raw)?.tool;
    if tool.id != tool_id {
        return Err(format!(
            "tool id mismatch: requested {tool_id}, manifest has {}",
            tool.id
        )
        .into());
    }
    validate_action_type(&tool.action_type)?;
    validate_actor_id(&tool.owner)?;
    Ok(tool)
}
/// Maps a run tier to the tool action type it requires.
///
/// `public` and `mock` both require `run_mock_data`; `real` requires
/// `run_real_data`.
///
/// # Errors
/// Fails for any other tier.
fn action_type_for_tier(tier: &str) -> RhoResult<&'static str> {
    match tier {
        "public" | "mock" => Ok("run_mock_data"),
        "real" => Ok("run_real_data"),
        other => Err(format!("unsupported run tier: {other}").into()),
    }
}
/// Verifies that `code_path` resolves to a location inside `shared_root`.
///
/// Both paths are canonicalized first, so symlinks and `..` components are
/// resolved before the containment check.
///
/// # Errors
/// Fails when either path cannot be canonicalized (for example because it
/// does not exist) or when the code path lies outside the shared root.
fn ensure_code_path_inside_shared(shared_root: &Path, code_path: &Path) -> RhoResult<()> {
    let shared = shared_root.canonicalize()?;
    let code = code_path.canonicalize()?;
    if code.starts_with(&shared) {
        return Ok(());
    }
    Err(format!(
        "code path must be inside shared root: {} is outside {}",
        code.display(),
        shared.display()
    )
    .into())
}
/// Writes the run manifest `<shared_root>/.rho/runs/<run_id>.yaml`.
///
/// The run's artifact directory is created as a side effect, and the manifest
/// always records `stdout.txt` / `stderr.txt` inside it. A code digest is
/// recorded only when `code_path` points at an existing file.
///
/// # Errors
/// Fails when directories cannot be created, the code file cannot be
/// digested, the record cannot be serialized, or the file cannot be written.
// Every argument is a distinct manifest field; a parameter struct would only
// move the same list to each call site.
#[allow(clippy::too_many_arguments)]
fn write_run_manifest(
    shared_root: &Path,
    run_id: &str,
    request_id: &str,
    status: &str,
    tier: &str,
    runner: Option<&str>,
    dataset_csv: Option<&Path>,
    code_path: Option<&Path>,
    command: &[String],
    exit_code: Option<i32>,
    error: Option<&str>,
) -> RhoResult<()> {
    let manifest_file = run_manifest_path(shared_root, run_id);
    let artifacts = run_artifact_dir(shared_root, run_id);
    ensure_parent(&manifest_file)?;
    fs::create_dir_all(&artifacts)?;

    let code_sha256 = if let Some(existing) = code_path.filter(|path| path.is_file()) {
        Some(file_digest(existing)?)
    } else {
        None
    };

    let record = RunRecord {
        id: run_id.to_string(),
        request_id: request_id.to_string(),
        status: status.to_string(),
        tier: tier.to_string(),
        runner: runner.map(str::to_owned),
        dataset_csv: dataset_csv.map(|path| path.display().to_string()),
        code_path: code_path.map(|path| path.display().to_string()),
        code_sha256,
        command: command.to_vec(),
        exit_code,
        error: error.map(str::to_owned),
        stdout_path: artifacts.join("stdout.txt").display().to_string(),
        stderr_path: artifacts.join("stderr.txt").display().to_string(),
        created_at: rho_core::now_rfc3339(),
    };
    let yaml = to_yaml(&RunManifest {
        version: 1,
        run: record,
    })?;
    fs::write(manifest_file, yaml)?;
    Ok(())
}
/// Checks that the digest of the file at `code_path` equals `expected`.
///
/// # Errors
/// Fails when the file cannot be digested or the digests differ.
fn verify_code_digest(code_path: &Path, expected: &str) -> RhoResult<()> {
    let actual = file_digest(code_path)?;
    if actual == expected {
        Ok(())
    } else {
        Err(format!("code digest mismatch: expected {expected}, got {actual}").into())
    }
}
/// Everything needed to execute one run, resolved ahead of time on the host.
struct PreparedRun {
// Run tier the run was resolved for.
tier: String,
// Host path of the resolved dataset file.
dataset_csv: PathBuf,
// Host path of the code file to execute.
code_path: PathBuf,
// Fully resolved command line, program first.
command: Vec<String>,
// Directory that receives the run's stdout/stderr and sandbox manifests.
artifact_dir: PathBuf,
}
/// Outcome of executing a prepared run through a sandbox runner.
struct SandboxExecution {
// Name of the runner that executed the command.
runner: String,
// Path of the file holding the captured standard output.
stdout_path: PathBuf,
// Process exit code as reported by `ExitStatus::code()`; may be absent.
exit_code: Option<i32>,
// Whether the process reported a successful exit status.
success: bool,
}
/// Returns the sandbox runner selected by `RHO_SANDBOX_RUNNER`, defaulting to
/// `local` when the variable is unset or unreadable.
///
/// # Errors
/// Fails when the selected runner is not a supported one.
fn sandbox_runner() -> RhoResult<String> {
    let selected = match env::var("RHO_SANDBOX_RUNNER") {
        Ok(value) => value,
        Err(_) => "local".to_string(),
    };
    validate_runner(&selected)?;
    Ok(selected)
}
/// Accepts only the supported sandbox runner names: `local`, `fake` and
/// `gondolin`.
///
/// # Errors
/// Fails with `unsupported sandbox runner: <name>` for anything else.
fn validate_runner(runner: &str) -> RhoResult<()> {
    if matches!(runner, "local" | "fake" | "gondolin") {
        Ok(())
    } else {
        Err(format!("unsupported sandbox runner: {runner}").into())
    }
}
/// Wraps `value` in single quotes for a POSIX shell.
///
/// Each embedded single quote is emitted as `'"'"'`: close the quoted
/// section, add a double-quoted `'`, and reopen the single-quoted section.
fn shell_quote(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            quoted.push_str("'\"'\"'");
        } else {
            quoted.push(ch);
        }
    }
    quoted.push('\'');
    quoted
}
/// Renders a command as a single shell-safe string: every part is
/// single-quoted and the parts are separated by one space.
fn host_command_for_env(command: &[String]) -> String {
    let mut line = String::new();
    for (position, part) in command.iter().enumerate() {
        if position > 0 {
            line.push(' ');
        }
        line.push_str(&shell_quote(part));
    }
    line
}
/// Rewrites the prepared command so it refers to guest-side paths.
///
/// Every occurrence of the host code path becomes `/workspace/<file name>`
/// and every occurrence of the host dataset path becomes
/// `/input/<file name>`; the code path is substituted first.
///
/// # Errors
/// Fails when either path has no UTF-8 file name.
fn guest_command(prepared: &PreparedRun) -> RhoResult<Vec<String>> {
    let Some(code_name) = prepared
        .code_path
        .file_name()
        .and_then(|name| name.to_str())
    else {
        return Err(format!(
            "code path has no file name: {}",
            prepared.code_path.display()
        )
        .into());
    };
    let Some(dataset_name) = prepared
        .dataset_csv
        .file_name()
        .and_then(|name| name.to_str())
    else {
        return Err(format!(
            "dataset path has no file name: {}",
            prepared.dataset_csv.display()
        )
        .into());
    };

    let code_host = prepared.code_path.display().to_string();
    let dataset_host = prepared.dataset_csv.display().to_string();
    let code_guest = format!("/workspace/{code_name}");
    let dataset_guest = format!("/input/{dataset_name}");

    let mut rewritten = Vec::with_capacity(prepared.command.len());
    for part in &prepared.command {
        rewritten.push(
            part.replace(&code_host, &code_guest)
                .replace(&dataset_host, &dataset_guest),
        );
    }
    Ok(rewritten)
}
/// Writes `sandbox-run.yaml` into the run's artifact directory.
///
/// The manifest records the run inputs, the stdout/stderr locations, three
/// mounts (`/workspace` read-only from `<shared_root>/workspace`, `/input`
/// read-only from the dataset's parent directory, `/output` read-write from
/// the artifact directory) and a default-deny network policy with no
/// exceptions.
///
/// # Errors
/// Fails when the dataset path has no parent, or when the manifest cannot be
/// serialized or written.
fn write_sandbox_run_manifest(
    shared_root: &Path,
    run_id: &str,
    request_id: &str,
    runner: &str,
    prepared: &PreparedRun,
) -> RhoResult<()> {
    let workspace_dir = shared_root.join("workspace");
    let manifest_file = prepared.artifact_dir.join("sandbox-run.yaml");
    ensure_parent(&manifest_file)?;

    let Some(dataset_dir) = prepared.dataset_csv.parent() else {
        return Err(format!(
            "dataset path has no parent: {}",
            prepared.dataset_csv.display()
        )
        .into());
    };
    let artifact_dir = prepared.artifact_dir.display().to_string();

    let mounts = vec![
        SandboxMount {
            host_path: workspace_dir.display().to_string(),
            guest_path: "/workspace".to_string(),
            mode: "ro".to_string(),
        },
        SandboxMount {
            host_path: dataset_dir.display().to_string(),
            guest_path: "/input".to_string(),
            mode: "ro".to_string(),
        },
        SandboxMount {
            host_path: artifact_dir.clone(),
            guest_path: "/output".to_string(),
            mode: "rw".to_string(),
        },
    ];

    let record = SandboxRunRecord {
        id: run_id.to_string(),
        request_id: request_id.to_string(),
        runner: runner.to_string(),
        tier: prepared.tier.clone(),
        dataset_csv: prepared.dataset_csv.display().to_string(),
        code_path: prepared.code_path.display().to_string(),
        command: prepared.command.clone(),
        artifact_dir,
        stdout_path: prepared
            .artifact_dir
            .join("stdout.txt")
            .display()
            .to_string(),
        stderr_path: prepared
            .artifact_dir
            .join("stderr.txt")
            .display()
            .to_string(),
        mounts,
        network: SandboxNetworkPolicy {
            default_deny: true,
            allow_hosts: vec![],
            tcp_maps: vec![],
        },
        created_at: rho_core::now_rfc3339(),
    };
    let yaml = to_yaml(&SandboxRunManifest {
        version: 1,
        sandbox_run: record,
    })?;
    fs::write(manifest_file, yaml)?;
    Ok(())
}
/// Writes `gondolin-run.yaml` into the artifact directory and returns its
/// path.
///
/// The config denies network access by default, mounts the code file's
/// parent at `/workspace` (ro), the dataset file's parent at `/input` (ro)
/// and the artifact directory at `/output` (rw), and runs the guest command
/// through `/bin/sh -lc` with stdout/stderr redirected into `/output`.
///
/// # Errors
/// Fails when the guest command cannot be derived, when the code or dataset
/// path has no parent, or when the file cannot be written.
fn write_gondolin_config(prepared: &PreparedRun) -> RhoResult<PathBuf> {
    let guest_argv = guest_command(prepared)?;
    let guest_shell_command = format!(
        "{} > /output/stdout.txt 2> /output/stderr.txt",
        host_command_for_env(&guest_argv)
    );
    let config_path = prepared.artifact_dir.join("gondolin-run.yaml");
    ensure_parent(&config_path)?;

    let Some(workspace_dir) = prepared.code_path.parent() else {
        return Err(format!(
            "code path has no parent: {}",
            prepared.code_path.display()
        )
        .into());
    };
    let Some(input_dir) = prepared.dataset_csv.parent() else {
        return Err(format!(
            "dataset path has no parent: {}",
            prepared.dataset_csv.display()
        )
        .into());
    };

    let workspace_mount = rho_core::yaml_quote(&workspace_dir.display().to_string());
    let input_mount = rho_core::yaml_quote(&input_dir.display().to_string());
    let output_mount = rho_core::yaml_quote(&prepared.artifact_dir.display().to_string());
    let shell_line = rho_core::yaml_quote(&guest_shell_command);

    let yaml = format!(
        "network:\n defaultDeny: true\n\nmounts:\n - host: {}\n guest: /workspace\n mode: ro\n - host: {}\n guest: /input\n mode: ro\n - host: {}\n guest: /output\n mode: rw\n\ncommand:\n - /bin/sh\n - -lc\n - {}\n",
        workspace_mount, input_mount, output_mount, shell_line,
    );
    fs::write(&config_path, yaml)?;
    Ok(config_path)
}
/// Runs a prepared run through the external gondolin runner command.
///
/// The runner command comes from `RHO_GONDOLIN_RUNNER_CMD` (default
/// `./rho-gondolin-run`). It receives the generated config path as its last
/// argument, plus `RHO_GONDOLIN_*` environment variables describing the
/// config, output files, host/guest commands and mount directories. If the
/// runner did not create `stdout.txt` / `stderr.txt` itself, the captured
/// process output is written there instead.
///
/// # Errors
/// Fails when the config cannot be written, the runner command is empty or
/// cannot be parsed, the process cannot be started, or output cannot be saved.
fn execute_gondolin_runner(prepared: &PreparedRun) -> RhoResult<SandboxExecution> {
    let config_path = write_gondolin_config(prepared)?;
    let runner_line =
        env::var("RHO_GONDOLIN_RUNNER_CMD").unwrap_or_else(|_| "./rho-gondolin-run".to_string());
    let runner_argv = rho_core::split_command_line(&runner_line)?;
    let Some((program, leading_args)) = runner_argv.split_first() else {
        return Err("RHO_GONDOLIN_RUNNER_CMD must not be empty".into());
    };

    let stdout_path = prepared.artifact_dir.join("stdout.txt");
    let stderr_path = prepared.artifact_dir.join("stderr.txt");

    let host_line = host_command_for_env(&prepared.command);
    let guest_line = host_command_for_env(&guest_command(prepared)?);
    let Some(workspace_dir) = prepared.code_path.parent() else {
        return Err(format!("code path has no parent: {}", prepared.code_path.display()).into());
    };
    let Some(dataset_dir) = prepared.dataset_csv.parent() else {
        return Err(format!(
            "dataset path has no parent: {}",
            prepared.dataset_csv.display()
        )
        .into());
    };

    let mut runner = Command::new(program);
    runner.args(leading_args).arg(&config_path);
    runner.env("RHO_GONDOLIN_RUN_CONFIG", &config_path);
    runner.env("RHO_GONDOLIN_STDOUT_PATH", &stdout_path);
    runner.env("RHO_GONDOLIN_STDERR_PATH", &stderr_path);
    runner.env("RHO_GONDOLIN_HOST_COMMAND", host_line);
    runner.env("RHO_GONDOLIN_GUEST_COMMAND", guest_line);
    runner.env("RHO_GONDOLIN_WORKSPACE_DIR", workspace_dir);
    runner.env("RHO_GONDOLIN_DATASET_DIR", dataset_dir);
    runner.env("RHO_GONDOLIN_OUTPUT_DIR", &prepared.artifact_dir);
    let output = runner.output()?;

    // Only fall back to the captured streams when the runner left no file.
    if !stdout_path.exists() {
        fs::write(&stdout_path, &output.stdout)?;
    }
    if !stderr_path.exists() {
        fs::write(&stderr_path, &output.stderr)?;
    }

    Ok(SandboxExecution {
        runner: "gondolin".to_string(),
        stdout_path,
        exit_code: output.status.code(),
        success: output.status.success(),
    })
}
/// Executes a prepared run with the selected sandbox runner.
///
/// The runner is `runner_override` when given (after validation), otherwise
/// the one chosen by [`sandbox_runner`]. The artifact directory is created
/// and a sandbox run manifest is written before anything is executed.
/// `gondolin` is delegated to [`execute_gondolin_runner`]; every other
/// runner spawns the prepared command directly on the host and stores the
/// captured output in `stdout.txt` / `stderr.txt`.
///
/// # Errors
/// Fails when the runner is unsupported, when the artifact directory or
/// manifest cannot be written, when the prepared command is empty, or when
/// the process cannot be started or its output saved.
fn execute_sandbox_run(
    shared_root: &Path,
    run_id: &str,
    request_id: &str,
    prepared: &PreparedRun,
    runner_override: Option<&str>,
) -> RhoResult<SandboxExecution> {
    let runner = match runner_override {
        Some(runner) => {
            validate_runner(runner)?;
            runner.to_string()
        }
        None => sandbox_runner()?,
    };
    fs::create_dir_all(&prepared.artifact_dir)?;
    write_sandbox_run_manifest(shared_root, run_id, request_id, &runner, prepared)?;
    if runner == "gondolin" {
        return execute_gondolin_runner(prepared);
    }

    // An empty command used to panic on `command[0]`; report it as an error.
    let Some((program, args)) = prepared.command.split_first() else {
        return Err("sandbox run command must not be empty".into());
    };
    let output = Command::new(program).args(args).output()?;

    let stdout_path = prepared.artifact_dir.join("stdout.txt");
    let stderr_path = prepared.artifact_dir.join("stderr.txt");
    fs::write(&stdout_path, &output.stdout)?;
    fs::write(&stderr_path, &output.stderr)?;
    Ok(SandboxExecution {
        runner,
        stdout_path,
        exit_code: output.status.code(),
        success: output.status.success(),
    })
}
fn execute_request(
shared_root: &Path,
private_root: Option<&Path>,
request_id: &str,
run_id: &str,
tier_override: Option<String>,
require_request_approval: bool,
runner_override: Option<&str>,
) -> RhoResult<PathBuf> {
let request_text = fs::read_to_string(request_path(shared_root, request_id)).map_err(|_| {
format!(
"request not found: {}",
request_path(shared_root, request_id).display()
)
})?;
let request_manifest: RequestManifest = from_yaml(&request_text)?;
let request = request_manifest.request;
if request.id != request_id {
return Err(format!(
"request id mismatch: requested {request_id}, manifest has {}",
request.id
)
.into());
}
let requested_tier = request.requested_tier;
let tier = tier_override.unwrap_or(requested_tier);
validate_tier(&tier)?;
let tool = load_tool(shared_root, &request.tool_id)?;
let required_action_type = action_type_for_tier(&tier)?;
if tool.action_type != required_action_type {
return Err(format!(
"tool {} has action_type {}, but tier {tier} requires {required_action_type}",
tool.id, tool.action_type
)
.into());
}
let dataset_uuid = request.dataset_uuid;
let code_path_string = request
.code_paths
.first()
.ok_or("request manifest missing code_paths")?;
let code_path = PathBuf::from(code_path_string);
let mut command_template = request.command;
if command_template.is_empty() {
command_template = tool.command_template.clone();
}
if command_template.is_empty() {
command_template = default_command_for_code(&code_path)?;
}
if require_request_approval && tool.approval_required {
let approval = approval_status(shared_root, request_id)?;
if approval != "approved" {
write_run_manifest(
shared_root,
run_id,
request_id,
"blocked",
&tier,
None,
None,
Some(&code_path),
&command_template,
None,
Some(&format!(
"tool {} requires approved request; current status is {approval}",
tool.id
)),
)?;
return Err(format!(
"tool {} requires approved request; current status is {approval}",
tool.id
)
.into());
}
}
if !code_path.is_file() {
write_run_manifest(
shared_root,
run_id,
request_id,
"failed",
&tier,
None,
None,
Some(&code_path),
&[],
None,
Some("code path does not exist"),
)?;
return Err(format!("code path does not exist: {}", code_path.display()).into());
}
if let Err(error) = ensure_code_path_inside_shared(shared_root, &code_path) {
write_run_manifest(
shared_root,
run_id,
request_id,
"failed",
&tier,
None,
None,
Some(&code_path),
&command_template,
None,
Some(&error.to_string()),
)?;
return Err(error);
}
if let Err(error) = verify_code_digest(&code_path, &request.code_sha256) {
write_run_manifest(
shared_root,
run_id,
request_id,
"failed",
&tier,
None,
None,
Some(&code_path),
&command_template,
None,
Some(&error.to_string()),
)?;
return Err(error);
}
let dataset_csv = dataset_csv_for_approval(shared_root, private_root, &dataset_uuid, &tier)?;
let command = resolve_command_template(&command_template, &code_path, &dataset_csv)?;
let artifact_dir = run_artifact_dir(shared_root, run_id);
let prepared = PreparedRun {
tier: tier.clone(),
dataset_csv: dataset_csv.clone(),
code_path: code_path.clone(),
command: command.clone(),
artifact_dir,
};
let execution =
execute_sandbox_run(shared_root, run_id, request_id, &prepared, runner_override)?;
let status = if execution.success {
"completed"
} else {
"failed"
};
write_run_manifest(
shared_root,
run_id,
request_id,
status,
&tier,
Some(&execution.runner),
Some(&dataset_csv),
Some(&code_path),
&command,
execution.exit_code,
None,
)?;
if execution.success {
Ok(execution.stdout_path)
} else {
Err(format!(
"run failed; see {}",
run_manifest_path(shared_root, run_id).display()
)
.into())
}
}
/// Writes the status of a controlled action as pretty-printed JSON to
/// `<control_root>/inbox/<action_id>.status.json`, creating the parent
/// directory if needed.
///
/// # Errors
/// Fails when the directory cannot be created, the status cannot be
/// serialized, or the file cannot be written.
fn write_action_status(
    control_root: &Path,
    action_id: &str,
    request_id: &str,
    status: &str,
    run_id: Option<&str>,
    stdout_path: Option<&Path>,
    error: Option<&str>,
) -> RhoResult<()> {
    let status_file = action_status_path(control_root, action_id);
    ensure_parent(&status_file)?;
    let record = ControlledActionStatus {
        action_id: action_id.to_string(),
        request_id: request_id.to_string(),
        status: status.to_string(),
        run_id: run_id.map(str::to_owned),
        stdout_path: stdout_path.map(|path| path.display().to_string()),
        error: error.map(str::to_owned),
        created_at: rho_core::now_rfc3339(),
    };
    let json = to_json_pretty(&ControlledActionStatusManifest {
        version: 1,
        status: record,
    })?;
    fs::write(status_file, json)?;
    Ok(())
}
/// Derives the run id for a controlled action: a single leading `act-` is
/// replaced by `run-`; ids without that prefix simply get `run-` prepended.
///
/// Only one prefix is removed. Stripping every repetition would map distinct
/// action ids such as `act-x` and `act-act-x` to the same run id.
fn run_id_for_action(action_id: &str) -> String {
    let suffix = action_id.strip_prefix("act-").unwrap_or(action_id);
    format!("run-{suffix}")
}
/// Returns the commit id printed by `git -C <shared_root> rev-parse HEAD`,
/// trimmed of surrounding whitespace.
///
/// Returns `None` when `git` cannot be started or exits unsuccessfully.
fn git_commit(shared_root: &Path) -> Option<String> {
    let repo_dir = shared_root.display().to_string();
    let mut git = Command::new("git");
    git.args(["-C", repo_dir.as_str(), "rev-parse", "HEAD"]);
    match git.output() {
        Ok(output) if output.status.success() => {
            Some(String::from_utf8_lossy(&output.stdout).trim().to_string())
        }
        _ => None,
    }
}
/// Parses one `--input` value of the form `kind:path` into a granted input,
/// recording the digest of the file at `path`.
///
/// The value is split at the first `:`.
///
/// # Errors
/// Fails when there is no `:`, when either side is empty, or when the file
/// cannot be digested.
fn action_grant_input_arg(value: &str) -> RhoResult<GrantedInput> {
    let Some((kind, path)) = value.split_once(':') else {
        return Err("input must be formatted as kind:path".into());
    };
    if kind.is_empty() || path.is_empty() {
        return Err("input kind and path must be non-empty".into());
    }
    let path_buf = PathBuf::from(path);
    let sha256 = file_digest(&path_buf)?;
    Ok(GrantedInput {
        kind: kind.to_string(),
        path: path.to_string(),
        sha256,
    })
}
/// Collects every `--input <kind:path>` pair from `args`, in order.
///
/// Arguments other than `--input` are ignored. The token following
/// `--input` is always consumed as its value.
///
/// # Errors
/// Fails when `--input` is the last argument or a value cannot be parsed.
fn grant_inputs_from_args(args: &[String]) -> RhoResult<Vec<GrantedInput>> {
    let mut inputs = Vec::new();
    let mut remaining = args.iter();
    // `while let` rather than `for`: the value is pulled from the same iterator.
    while let Some(arg) = remaining.next() {
        if arg.as_str() != "--input" {
            continue;
        }
        let value = remaining.next().ok_or("missing value after --input")?;
        inputs.push(action_grant_input_arg(value)?);
    }
    Ok(inputs)
}
/// Collects every `--map <guest-prefix>=<host-prefix>` pair from `args`.
///
/// Trailing slashes are removed from the guest prefix. The result is ordered
/// by guest-prefix length, longest first, so the most specific mapping is
/// tried first; mappings of equal length keep their argument order.
///
/// # Errors
/// Fails when `--map` has no value, when the value has no `=`, when the
/// guest prefix is not absolute, or when the host prefix is empty.
fn parse_path_mappings(args: &[String]) -> RhoResult<Vec<(String, PathBuf)>> {
    let mut mappings = Vec::new();
    let mut remaining = args.iter();
    // `while let` rather than `for`: the value is pulled from the same iterator.
    while let Some(arg) = remaining.next() {
        if arg.as_str() != "--map" {
            continue;
        }
        let value = remaining.next().ok_or("missing value after --map")?;
        let Some((guest, host)) = value.split_once('=') else {
            return Err("map must be formatted as /guest/prefix=/host/prefix".into());
        };
        // An empty guest prefix is already rejected by the leading-slash test.
        if !guest.starts_with('/') || host.is_empty() {
            return Err("map must use an absolute guest prefix and non-empty host prefix".into());
        }
        mappings.push((guest.trim_end_matches('/').to_string(), PathBuf::from(host)));
    }
    // Stable sort, descending by guest-prefix length.
    mappings.sort_by(|left, right| right.0.len().cmp(&left.0.len()));
    Ok(mappings)
}
/// Translates an absolute guest path into the corresponding host path.
///
/// Mappings are tried in the given order. A path equal to a guest prefix maps
/// to the host prefix itself; a path below a guest prefix maps to the same
/// relative location below the host prefix.
///
/// # Errors
/// Fails when `value` is not absolute, when the part below the matched
/// prefix contains a `..` segment, or when no mapping matches.
fn map_guest_path(value: &str, mappings: &[(String, PathBuf)]) -> RhoResult<String> {
    if !value.starts_with('/') {
        return Err(format!("guest path must be absolute: {value}").into());
    }
    for (guest_prefix, host_prefix) in mappings {
        if value == guest_prefix.as_str() {
            return Ok(host_prefix.display().to_string());
        }
        let below_prefix = value
            .strip_prefix(guest_prefix.as_str())
            .and_then(|rest| rest.strip_prefix('/'));
        let Some(rest) = below_prefix else {
            continue;
        };
        if rest.split('/').any(|segment| segment == "..") {
            return Err(format!("guest path must not contain ..: {value}").into());
        }
        // Redundant slashes after the prefix (`/guest//etc/passwd`) would
        // leave an absolute remainder, and joining an absolute path discards
        // the host prefix entirely. Drop them so the result always stays
        // below the host prefix.
        let relative = rest.trim_start_matches('/');
        return Ok(host_prefix.join(relative).display().to_string());
    }
    Err(format!("no host mapping found for guest path: {value}").into())
}
/// Applies [`map_guest_path`] to an optional path: `None` stays `None`, and
/// a mapping failure is returned as the error.
fn map_optional_guest_path(
    value: Option<String>,
    mappings: &[(String, PathBuf)],
) -> RhoResult<Option<String>> {
    match value {
        Some(path) => Ok(Some(map_guest_path(&path, mappings)?)),
        None => Ok(None),
    }
}
/// Resolves `rho://` paths to host paths.
///
/// `rho://repo/<rel>` resolves below `repo_root` and `rho://run/<rel>` below
/// `run_root`; the relative part must pass `validate_relative_safe_path`.
/// Values without the `rho://` scheme are returned unchanged.
///
/// # Errors
/// Fails when the relative part is rejected or when the value uses any other
/// `rho://` form.
fn map_rho_path(value: &str, repo_root: &Path, run_root: &Path) -> RhoResult<String> {
    for (scheme, root) in [("rho://repo/", repo_root), ("rho://run/", run_root)] {
        if let Some(relative) = value.strip_prefix(scheme) {
            validate_relative_safe_path(relative)?;
            return Ok(root.join(relative).display().to_string());
        }
    }
    if value.starts_with("rho://") {
        Err(format!("unsupported rho path: {value}").into())
    } else {
        Ok(value.to_string())
    }
}
/// Applies [`map_rho_path`] to an optional path: `None` stays `None`, and a
/// mapping failure is returned as the error.
fn map_optional_rho_path(
    value: Option<String>,
    repo_root: &Path,
    run_root: &Path,
) -> RhoResult<Option<String>> {
    match value {
        Some(path) => Ok(Some(map_rho_path(&path, repo_root, run_root)?)),
        None => Ok(None),
    }
}
/// Maps a path from a proposal: absolute paths are treated as guest paths and
/// translated with [`map_guest_path`]; anything else is returned unchanged.
fn map_proposed_path(value: &str, mappings: &[(String, PathBuf)]) -> RhoResult<String> {
    if !value.starts_with('/') {
        return Ok(value.to_string());
    }
    map_guest_path(value, mappings)
}
/// Checks that `actual` equals the value passed for `flag` in `rest`.
///
/// When the flag is missing, usage is printed and the process exits.
///
/// # Errors
/// Fails with `<label> not allowed: ...` when the values differ.
fn require_allowed_value(rest: &[String], flag: &str, actual: &str, label: &str) -> RhoResult<()> {
    let expected = match require_arg(rest, flag) {
        Ok(value) => value,
        Err(_) => usage(),
    };
    if expected == actual {
        return Ok(());
    }
    Err(format!("{label} not allowed: expected {expected}, got {actual}").into())
}
/// Checks that `actual` equals the normalized actor id passed for `flag` in
/// `rest`.
///
/// When the flag is missing, usage is printed and the process exits. Only
/// the expected value is normalized; `actual` is compared as given.
///
/// # Errors
/// Fails when the expected actor id cannot be normalized or when the values
/// differ.
fn require_allowed_actor(rest: &[String], flag: &str, actual: &str, label: &str) -> RhoResult<()> {
    let raw_expected = match require_arg(rest, flag) {
        Ok(value) => value,
        Err(_) => usage(),
    };
    let expected = normalize_actor_id(&raw_expected)?;
    if expected == actual {
        return Ok(());
    }
    Err(format!("{label} not allowed: expected {expected}, got {actual}").into())
}
/// Reports whether the grant lists an input whose path is exactly `path`.
fn grant_has_input(grant: &ActionGrant, path: &str) -> bool {
    for input in &grant.inputs {
        if input.path == path {
            return true;
        }
    }
    false
}
/// Loads the action grant for `action_id` from
/// `<shared_root>/.rho/action-grants/<action_id>.yaml`.
///
/// # Errors
/// Any read failure is reported as `action grant not found: <path>`; a parse
/// failure is returned as is.
fn load_action_grant(shared_root: &Path, action_id: &str) -> RhoResult<ActionGrant> {
    let grant_file = action_grant_path(shared_root, action_id);
    let raw = match fs::read_to_string(&grant_file) {
        Ok(text) => text,
        Err(_) => {
            return Err(format!("action grant not found: {}", grant_file.display()).into());
        }
    };
    let manifest: ActionGrantManifest = from_yaml(&raw)?;
    Ok(manifest.action_grant)
}
/// Loads the grant for `action` and verifies that it authorizes exactly this
/// action in the current state of the repository.
///
/// Checks, in order: the decision is `approved`; action id, request id, tool
/// id and action type match the action; the granted action file path and
/// digest match the action file in the control outbox; the pinned git commit
/// (if any) equals the shared root's `HEAD`; every granted input still has
/// its recorded digest; and the tool manifest's action type matches.
///
/// # Errors
/// Fails on the first check that does not hold, and on any I/O or parse
/// error while loading the grant, action file, inputs or tool.
fn validate_action_grant(
    shared_root: &Path,
    control_root: &Path,
    action: &rho_core::ControlledAction,
) -> RhoResult<ActionGrant> {
    let grant = load_action_grant(shared_root, &action.action_id)?;
    if grant.decision != "approved" {
        return Err(format!("action grant is not approved: {}", grant.decision).into());
    }
    if grant.action_id != action.action_id {
        return Err("action grant action_id mismatch".into());
    }
    if grant.request_id != action.request_id {
        return Err("action grant request_id mismatch".into());
    }
    if grant.tool_id != action.tool_id {
        return Err(format!(
            "action grant tool_id {}, got {}",
            grant.tool_id, action.tool_id
        )
        .into());
    }
    if grant.action_type != action.action_type {
        return Err(format!(
            "action grant action_type {}, got {}",
            grant.action_type, action.action_type
        )
        .into());
    }

    let action_path = action_outbox_path(control_root, &action.action_id);
    let action_sha256 = file_digest(&action_path)?;
    let action_path_text = action_path.display().to_string();
    // Report a path mismatch on its own: folding it into the digest message
    // printed two identical digests whenever only the path differed.
    if grant.action.path != action_path_text {
        return Err(format!(
            "action path mismatch: expected {}, got {}",
            grant.action.path, action_path_text
        )
        .into());
    }
    if grant.action.sha256 != action_sha256 {
        return Err(format!(
            "action digest mismatch: expected {}, got {}",
            grant.action.sha256, action_sha256
        )
        .into());
    }

    if let Some(expected_commit) = &grant.repo.git_commit {
        let actual = git_commit(shared_root)
            .ok_or("grant requires a git commit but shared root is not a git repo")?;
        if &actual != expected_commit {
            return Err(
                format!("git commit mismatch: expected {expected_commit}, got {actual}").into(),
            );
        }
    }

    // Inputs must be unchanged since the grant was issued.
    for input in &grant.inputs {
        let actual = file_digest(&PathBuf::from(&input.path))?;
        if actual != input.sha256 {
            return Err(format!(
                "input digest mismatch for {}: expected {}, got {}",
                input.path, input.sha256, actual
            )
            .into());
        }
    }

    let tool = load_tool(shared_root, &action.tool_id)?;
    if tool.action_type != action.action_type {
        return Err(format!(
            "tool {} has action_type {}, got action {}",
            tool.id, tool.action_type, action.action_type
        )
        .into());
    }
    Ok(grant)
}
fn ensure_path_under(path: &Path, root: &Path, label: &str) -> RhoResult<()> {
let root = if root.exists() {
root.canonicalize()?
} else if root.is_absolute() {
root.to_path_buf()
} else {
env::current_dir()?.join(root)
};
let candidate = if path.exists() {
path.canonicalize()?
} else {
for component in path.components() {
if matches!(component, std::path::Component::ParentDir) {
return Err(format!("{label} must not contain ..: {}", path.display()).into());
}
}
if path.is_absolute() {
path.to_path_buf()
} else {
root.join(path)
}
};
if !candidate.starts_with(&root) {
return Err(format!(
"{label} must be under {}: {}",
root.display(),
candidate.display()
)
.into());
}
Ok(())
}
/// Reads the request manifest for `request_id` and returns its
/// `(from, to)` participants.
///
/// # Errors
/// Fails when the manifest cannot be read or parsed.
fn request_participants(shared_root: &Path, request_id: &str) -> RhoResult<(String, String)> {
    let raw = fs::read_to_string(request_path(shared_root, request_id))?;
    let request = from_yaml::<RequestManifest>(&raw)?.request;
    Ok((request.from, request.to))
}
fn publish_result_artifact(
shared_root: &Path,
request_id: &str,
input_path: &Path,
output_path: &Path,
) -> RhoResult<()> {
ensure_path_under(
input_path,
&shared_root.join(".rho/runs"),
"release input path",
)?;
ensure_path_under(
output_path,
&shared_root.join("results"),
"release output path",
)?;
if !input_path.is_file() {
return Err(format!("release input file not found: {}", input_path.display()).into());
}
ensure_parent(output_path)?;
fs::copy(input_path, output_path)?;
let (requester, owner) = request_participants(shared_root, request_id)?;
let result_id = format!("result-{}", request_id.trim_start_matches("req-"));
let manifest = shared_root
.join(".rho/results")
.join(format!("{request_id}.yaml"));
ensure_parent(&manifest)?;
fs::write(
&manifest,
format!(
"version: 1\nresult:\n id: {}\n request_id: {}\n from: {}\n to: {}\n status: \"released\"\n artifact_paths:\n - {}\n",
rho_core::yaml_quote(&result_id),
rho_core::yaml_quote(request_id),
rho_core::yaml_quote(&owner),
rho_core::yaml_quote(&requester),
rho_core::yaml_quote(&output_path.display().to_string()),
),
)?;
let inbox = shared_root
.join(".rho/inbox")
.join(&requester)
.join(format!("result-{request_id}.yaml"));
ensure_parent(&inbox)?;
fs::write(
inbox,
format!(
"version: 1\nmessage:\n id: {}\n from: {}\n to: {}\n type: \"result_available\"\n related_request_id: {}\n body:\n text: {}\n result_manifest: {}\n",
rho_core::yaml_quote(&format!("result-{request_id}")),
rho_core::yaml_quote(&owner),
rho_core::yaml_quote(&requester),
rho_core::yaml_quote(request_id),
rho_core::yaml_quote(&format!("Result released for {request_id}.")),
rho_core::yaml_quote(&manifest.display().to_string()),
),
)?;
Ok(())
}
/// Top-level document of a repo-inbox `request.yaml`, as parsed by
/// `handle_approve_request` and `collect_repo_request_ids`.
#[derive(Debug, Deserialize)]
struct RepoInboxRequestManifest {
/// The request payload itself.
request: RepoInboxRequest,
}
/// The `request` mapping of a repo-inbox `request.yaml`.
///
/// Only `id`, `from` and `to` are required; every other field is optional in
/// the YAML and defaults to `None` or an empty list.
#[derive(Debug, Deserialize)]
struct RepoInboxRequest {
/// Request id; `handle_approve_request` requires it to match the id given on the command line.
id: String,
/// Requesting actor; used as the identity when the request signature is verified.
from: String,
/// Addressed actor; used as the signature recipient and as the default approver.
to: String,
/// Optional tool id; a `--tool-id` argument takes precedence over it.
#[serde(default)]
tool_id: Option<String>,
/// Dataset the request targets; approval fails when it is absent.
#[serde(default)]
dataset_uuid: Option<String>,
/// Single code path; preferred over the first entry of `code_paths`.
#[serde(default)]
code_path: Option<String>,
/// List of code paths; only the first entry is consulted by `handle_approve_request`.
#[serde(default)]
code_paths: Vec<String>,
/// Expected digest of the code file; compared against the computed digest when present.
#[serde(default)]
code_sha256: Option<String>,
/// Requested tier; consulted after `--tier` and before `tier`.
#[serde(default)]
requested_tier: Option<String>,
/// Alternative tier field; consulted only when `requested_tier` is absent.
#[serde(default)]
tier: Option<String>,
/// Command template supplied by the requester; used when non-empty and no `--command` is given.
#[serde(default)]
command: Vec<String>,
}
/// Top-level document of `rho/repo.yaml`, as parsed by `read_repo_id`.
#[derive(Debug, Deserialize)]
struct RepoManifest {
/// The repository record.
repo: RepoRecord,
}
/// The `repo` mapping of `rho/repo.yaml`.
#[derive(Debug, Deserialize)]
struct RepoRecord {
/// Repository id as written in the file; `read_repo_id` normalizes it before use.
id: String,
}
/// Deserialization target for a document with a top-level `dataset` key.
// NOTE(review): the code that loads this manifest is outside this section — confirm the file location there.
#[derive(Debug, Deserialize)]
struct DatasetManifest {
/// The dataset record.
dataset: DatasetRecord,
}
/// The `dataset` mapping of a dataset manifest; all three fields are required.
#[derive(Debug, Deserialize)]
struct DatasetRecord {
/// Dataset identifier (presumably what a request's `dataset_uuid` refers to — confirm at the lookup site).
uuid: String,
/// Dataset name.
name: String,
/// Per-tier variants of the dataset.
variants: DatasetVariants,
}
/// Optional `public`, `mock` and `real` variants of a dataset; each may be absent.
#[derive(Debug, Deserialize)]
struct DatasetVariants {
/// Variant under the `public` key, if declared.
#[serde(default)]
public: Option<DatasetVariant>,
/// Variant under the `mock` key, if declared.
#[serde(default)]
mock: Option<DatasetVariant>,
/// Variant under the `real` key, if declared.
#[serde(default)]
real: Option<DatasetVariant>,
}
/// One dataset variant entry.
#[derive(Debug, Deserialize)]
struct DatasetVariant {
/// Optional relative path of the variant's data.
// NOTE(review): the base directory it is relative to is decided by the consumer, which is not visible here.
#[serde(default)]
relative_path: Option<String>,
}
/// In-memory dataset binding: a name, an optional uuid and a path to the real data.
// NOTE(review): not deserialized directly (no derives); presumably built from `DatasetBindingManifest` — confirm at the construction site.
struct DatasetBinding {
/// Binding name.
name: String,
/// Dataset uuid, when the binding declares one.
uuid: Option<String>,
/// Path of the real-tier data.
real_path: PathBuf,
}
/// Deserialization target for a document with a top-level `dataset_binding` key.
#[derive(Debug, Deserialize)]
struct DatasetBindingManifest {
/// The binding record.
dataset_binding: DatasetBindingRecord,
}
/// The `dataset_binding` mapping: a named binding with an optional uuid and its variants.
#[derive(Debug, Deserialize)]
struct DatasetBindingRecord {
/// Binding name (required).
name: String,
/// Dataset uuid; may be omitted in the file.
#[serde(default)]
uuid: Option<String>,
/// Variants declared for this binding.
variants: DatasetBindingVariants,
}
/// Variants of a dataset binding; only a `real` variant is modelled here.
#[derive(Debug, Deserialize)]
struct DatasetBindingVariants {
/// The `real` variant, if declared.
real: Option<DatasetBindingVariant>,
}
/// One variant of a dataset binding, described by where its data comes from.
#[derive(Debug, Deserialize)]
struct DatasetBindingVariant {
/// Source description of the variant's data.
source: DatasetBindingSource,
}
/// Source of a dataset binding variant: a kind tag plus a path.
// NOTE(review): the accepted `kind` values are validated by the consumer, which is not visible here.
#[derive(Debug, Deserialize)]
struct DatasetBindingSource {
/// Source kind tag.
kind: String,
/// Source path as written in the file.
path: String,
}
/// Envelope of the committed approval grant YAML written by `write_approval_artifacts`.
#[derive(Debug, Serialize)]
struct CommittedApprovalGrantManifest {
/// Envelope version; `write_approval_artifacts` writes `1`.
version: u32,
/// The grant payload.
approval_grant: CommittedApprovalGrant,
}
/// Payload of the committed approval grant.
///
/// `write_approval_artifacts` fills the path fields via `repo_relative_or_display`
/// and records only a digest for the private dataset, never its path.
#[derive(Debug, Serialize)]
struct CommittedApprovalGrant {
/// Id of the approved controlled action.
action_id: String,
/// Id of the request being approved.
request_id: String,
/// Tool the action runs with.
tool_id: String,
/// Action type of the approved action.
action_type: String,
/// Decision string; `write_approval_artifacts` writes "approved".
decision: String,
/// Actor that granted the approval.
granted_by: String,
/// Creation timestamp, shared with the local action grant.
created_at: String,
/// Controlled-action file: path and digest.
action: GrantedActionFile,
/// Local action grant file: path and digest.
local_grant: GrantedActionFile,
/// Repository state at approval time (git commit, when available).
repo: GrantedRepoState,
/// Approved code file: path and digest.
code: GrantedInput,
/// Digest-only description of the private dataset input.
private_input: CommittedPrivateInput,
}
/// Description of the private dataset input recorded in a committed grant.
#[derive(Debug, Serialize)]
struct CommittedPrivateInput {
/// Dataset uuid from the request; `write_approval_artifacts` writes "unknown" when the request has none.
dataset_uuid: String,
/// Tier the run was approved for.
tier: String,
/// Digest of the dataset file.
sha256: String,
}
/// Envelope of the committed run receipt YAML written by `write_committed_run_receipt`.
#[derive(Debug, Serialize)]
struct CommittedRunReceiptManifest {
/// Envelope version; `write_committed_run_receipt` writes `1`.
version: u32,
/// The receipt payload.
run_receipt: CommittedRunReceipt,
}
/// Payload of a committed run receipt: what ran, on which inputs, and the digest of what it produced.
#[derive(Debug, Serialize)]
struct CommittedRunReceipt {
/// Id of the request that was run.
request_id: String,
/// Id of the run.
run_id: String,
/// Runner label used for the run.
runner: String,
/// Run status; `write_committed_run_receipt` writes "completed".
status: String,
/// Tier the run executed at.
tier: String,
/// Code path, repo-relative when it lies under the repo root.
code_path: String,
/// Digest of the code file.
code_sha256: String,
/// Digest of the dataset file.
input_sha256: String,
/// Digest of the captured stdout file.
output_sha256: String,
/// Path of the result message inside the requester's repo inbox.
result_path: String,
/// Creation timestamp of the receipt.
created_at: String,
}
/// Locates the unique `request.yaml` for `request_id` under
/// `<root>/rho/messages/inbox`.
///
/// # Errors
///
/// Fails when the inbox directory is missing, when no request matches, or
/// when more than one matches.
fn find_repo_inbox_request(root: &Path, request_id: &str) -> RhoResult<PathBuf> {
    let inbox_root = root.join("rho/messages/inbox");
    if !inbox_root.is_dir() {
        return Err(format!("repo inbox not found: {}", inbox_root.display()).into());
    }
    let mut found = Vec::new();
    find_request_paths(&inbox_root, request_id, &mut found)?;
    found.sort();
    let mut candidates = found.into_iter();
    match (candidates.next(), candidates.next()) {
        (None, _) => Err(format!("request not found in repo inbox: {request_id}").into()),
        (Some(only), None) => Ok(only),
        (Some(_), Some(_)) => Err(format!("multiple requests found for {request_id}").into()),
    }
}
/// Recursively walks `dir` and appends to `matches` every file named
/// `request.yaml` whose parent directory is named exactly `request_id`.
fn find_request_paths(dir: &Path, request_id: &str, matches: &mut Vec<PathBuf>) -> RhoResult<()> {
    for entry in fs::read_dir(dir)? {
        let candidate = entry?.path();
        if candidate.is_dir() {
            find_request_paths(&candidate, request_id, matches)?;
            continue;
        }
        let is_request_file =
            candidate.file_name().and_then(|name| name.to_str()) == Some("request.yaml");
        if !is_request_file {
            continue;
        }
        let parent_name = candidate
            .parent()
            .and_then(|parent| parent.file_name())
            .and_then(|name| name.to_str());
        if parent_name == Some(request_id) {
            matches.push(candidate);
        }
    }
    Ok(())
}
/// Turns a path string from a request into a filesystem path.
///
/// * `rho://repo/<relative>` — the relative part is validated and joined to `root`.
/// * any other `rho://` path — rejected as unsupported.
/// * an absolute path — returned unchanged.
/// * a relative path — joined to `root`.
fn repo_path(root: &Path, path: &str) -> RhoResult<PathBuf> {
    match path.strip_prefix("rho://repo/") {
        Some(relative) => {
            validate_relative_safe_path(relative)?;
            Ok(root.join(relative))
        }
        None if path.starts_with("rho://") => Err(format!("unsupported rho path: {path}").into()),
        None => {
            let plain = PathBuf::from(path);
            let resolved = if plain.is_absolute() {
                plain
            } else {
                root.join(plain)
            };
            Ok(resolved)
        }
    }
}
/// Renders `path` relative to `root` when it lies under `root`; otherwise
/// renders `path` unchanged.
fn repo_relative_or_display(root: &Path, path: &Path) -> String {
    let shown = match path.strip_prefix(root) {
        Ok(relative) => relative,
        Err(_) => path,
    };
    shown.display().to_string()
}
/// Queries `gh pr view` for the head branch, head repository, head owner and
/// URL of pull request `pr_number`, running `gh` inside `root`.
///
/// # Errors
///
/// Fails when `gh` cannot be started, exits unsuccessfully, prints non-UTF-8
/// output, or prints JSON that does not match `GhPrMetadata`. Because the
/// output is captured, the stderr of a failed `gh` is appended to the error
/// message so that the diagnostic is not lost.
fn gh_pr_metadata(root: &Path, pr_number: &str) -> RhoResult<GhPrMetadata> {
    let output = Command::new("gh")
        .current_dir(root)
        .args([
            "pr",
            "view",
            pr_number,
            "--json",
            "headRefName,headRepository,headRepositoryOwner,url",
        ])
        .output()?;
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        let detail = stderr.trim();
        let message = if detail.is_empty() {
            format!("gh pr view failed for {pr_number}")
        } else {
            format!("gh pr view failed for {pr_number}: {detail}")
        };
        return Err(message.into());
    }
    from_json(&String::from_utf8(output.stdout)?)
}
/// Runs `gh pr checkout <pr_number>` inside `root`, inheriting stdio.
fn gh_pr_checkout(root: &Path, pr_number: &str) -> RhoResult<()> {
    let mut checkout = Command::new("gh");
    checkout.current_dir(root).arg("pr").arg("checkout").arg(pr_number);
    if checkout.status()?.success() {
        Ok(())
    } else {
        Err(format!("gh pr checkout failed for {pr_number}").into())
    }
}
/// Optional post-approval steps for `approve-pr`, selected by flags in `args`.
///
/// * `--commit` stages the request's approval-grant and run-receipt
///   directories and commits them when anything is staged.
/// * `--push` pushes `HEAD` to the pull request's head branch over SSH.
/// * `--pr` implies both of the above and additionally prints the PR URL.
///
/// With none of the flags present the function returns without output.
fn finish_approve_pr(
    args: &[String],
    root: &Path,
    pr: &GhPrMetadata,
    approved: &ApprovedRun,
) -> RhoResult<()> {
    let flag = |name: &str| rho_core::has_flag(args, name);
    let want_pr = flag("--pr");
    let want_commit = want_pr || flag("--commit");
    let want_push = want_pr || flag("--push");
    // `--pr` already switches on commit and push, so these two cover all cases.
    if !(want_commit || want_push) {
        return Ok(());
    }
    let request_id = &approved.request_id;
    let owner = &pr.head_repository_owner.login;
    if want_commit {
        for directory in ["rho/approval-grants", "rho/run-receipts"] {
            git_add_relative(root, &format!("{directory}/{request_id}"))?;
        }
        if git_has_staged_changes(root)? {
            git_commit_with_rho(root, &format!("Approve and run {request_id}"))?;
            println!("committed approval artifacts for {request_id}");
        } else {
            println!("commit: no staged approval artifacts");
        }
    }
    if want_push {
        let slug = format!("{owner}/{}", pr.head_repository.name);
        let target = github_ssh_url(&slug);
        git_push_head_to_url(root, &target, &pr.head_ref_name)?;
        println!("pushed: {owner}:{}", pr.head_ref_name);
    }
    if want_pr {
        println!("pull request: {}", pr.url);
    }
    println!(
        "approved run: request={request_id} action={} run={}",
        approved.action_id, approved.run_id
    );
    Ok(())
}
/// Stages `path` (interpreted by git relative to `root`) with `git -C <root> add -- <path>`.
fn git_add_relative(root: &Path, path: &str) -> RhoResult<()> {
    let mut add = Command::new("git");
    add.arg("-C").arg(root).arg("add").arg("--").arg(path);
    if add.status()?.success() {
        Ok(())
    } else {
        Err(format!("git add failed for {path}").into())
    }
}
/// Reports whether the index in `root` differs from `HEAD`.
///
/// Uses `git diff --cached --quiet`: exit code 0 means nothing is staged,
/// 1 means something is; any other outcome is an error.
fn git_has_staged_changes(root: &Path) -> RhoResult<bool> {
    let exit_code = Command::new("git")
        .arg("-C")
        .arg(root)
        .arg("diff")
        .arg("--cached")
        .arg("--quiet")
        .status()?
        .code();
    if exit_code == Some(0) {
        return Ok(false);
    }
    if exit_code == Some(1) {
        return Ok(true);
    }
    Err(format!("git diff --cached --quiet failed in {}", root.display()).into())
}
/// Commits the staged changes in `root` by re-invoking the current
/// executable as `commit -C <root> -m <message>`.
fn git_commit_with_rho(root: &Path, message: &str) -> RhoResult<()> {
    let mut commit = Command::new(env::current_exe()?);
    commit.arg("commit").arg("-C").arg(root);
    commit.arg("-m").arg(message);
    if commit.status()?.success() {
        Ok(())
    } else {
        Err(format!("rho commit failed in {}", root.display()).into())
    }
}
/// Builds the GitHub SSH remote URL for an `owner/name` slug.
fn github_ssh_url(repo_slug: &str) -> String {
    ["git@github.com:", repo_slug, ".git"].concat()
}
/// Pushes the current `HEAD` of the repository at `root` to `branch` on the
/// remote given by `url`.
fn git_push_head_to_url(root: &Path, url: &str, branch: &str) -> RhoResult<()> {
    let refspec = format!("HEAD:{branch}");
    let mut push = Command::new("git");
    push.arg("-C").arg(root);
    push.arg("push").arg(url).arg(&refspec);
    if push.status()?.success() {
        Ok(())
    } else {
        Err(format!("git push failed for {url} {refspec}").into())
    }
}
/// Re-invokes the current executable with `args` and fails when the child
/// exits unsuccessfully.
fn run_rho_command(args: &[String]) -> RhoResult<()> {
    let mut child = Command::new(env::current_exe()?);
    child.args(args);
    if child.status()?.success() {
        Ok(())
    } else {
        Err(format!("rho command failed: {}", args.join(" ")).into())
    }
}
fn find_single_repo_request_id(root: &Path) -> RhoResult<String> {
let mut ids = Vec::new();
collect_repo_request_ids(&root.join("rho/messages/inbox"), &mut ids)?;
ids.sort();
ids.dedup();
match ids.as_slice() {
[id] => Ok(id.clone()),
[] => Err("no request.yaml found in rho/messages/inbox; pass --request-id".into()),
_ => Err(format!(
"multiple requests found: {}; pass --request-id",
ids.join(", ")
)
.into()),
}
}
/// Recursively gathers request ids from every `request.yaml` below `dir`.
///
/// The id declared inside the file is used when the file parses; otherwise
/// the name of the containing directory is used as a fallback. A `dir` that
/// is not a directory contributes nothing.
fn collect_repo_request_ids(dir: &Path, ids: &mut Vec<String>) -> RhoResult<()> {
    if !dir.is_dir() {
        return Ok(());
    }
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.is_dir() {
            collect_repo_request_ids(&path, ids)?;
            continue;
        }
        if path.file_name().and_then(|name| name.to_str()) != Some("request.yaml") {
            continue;
        }
        let text = fs::read_to_string(&path)?;
        let declared = from_yaml::<RepoInboxRequestManifest>(&text)
            .ok()
            .map(|manifest| manifest.request.id);
        let resolved = declared.or_else(|| {
            path.parent()
                .and_then(|parent| parent.file_name())
                .and_then(|name| name.to_str())
                .map(|name| name.to_string())
        });
        if let Some(id) = resolved {
            ids.push(id);
        }
    }
    Ok(())
}
/// Returns `request_id` without one leading `req-`; ids without that prefix
/// are returned unchanged.
fn request_suffix(request_id: &str) -> &str {
    match request_id.strip_prefix("req-") {
        Some(suffix) => suffix,
        None => request_id,
    }
}
/// Local shorthand for `providers::identity_inbox_relative_path`: the inbox
/// path of `identity_id` as computed by the providers module.
fn identity_inbox_relative_path(identity_id: &str) -> RhoResult<PathBuf> {
    let relative = providers::identity_inbox_relative_path(identity_id)?;
    Ok(relative)
}
/// Maps a tier name (`public`, `mock`, `real`) to the slug used when
/// deriving default run and action ids; any other tier is an error.
fn run_slug_for_tier(tier: &str) -> RhoResult<&'static str> {
    let slug = match tier {
        "public" => "run-public",
        "mock" => "run-mock",
        "real" => "run-real",
        _ => return Err(format!("unsupported run tier: {tier}").into()),
    };
    Ok(slug)
}
/// Picks the tool id used when neither the command line nor the request
/// names one: `run_mock` for the `public` and `mock` tiers, `run_real` for
/// `real`. Any other tier is an error.
fn default_tool_for_tier(tier: &str) -> RhoResult<&'static str> {
    let tool = match tier {
        "public" | "mock" => "run_mock",
        "real" => "run_real",
        _ => return Err(format!("unsupported run tier: {tier}").into()),
    };
    Ok(tool)
}
/// Builds the fallback command template `python3 <code file name> DATASET_CSV`.
///
/// Only the final component of `code_path` is used; `DATASET_CSV` is left as
/// a placeholder for `resolve_command_template`. Fails when the path has no
/// UTF-8 file name.
fn default_command_for_code(code_path: &Path) -> RhoResult<Vec<String>> {
    let Some(code_name) = code_path.file_name().and_then(|name| name.to_str()) else {
        return Err(format!("code path has no file name: {}", code_path.display()).into());
    };
    let parts = ["python3", code_name, "DATASET_CSV"];
    Ok(parts.iter().map(|part| (*part).to_owned()).collect())
}
/// Expands a command template into the argv that will actually be run.
///
/// * An argument equal to the file name of `code_path` is replaced by the
///   full code path.
/// * In every other argument the placeholders `{{code_path}}` / `CODE_PATH`
///   are replaced by the code path and `{{dataset_csv}}` / `DATASET_CSV` by
///   the dataset path.
///
/// Substitution is done in a single left-to-right pass over each argument,
/// so text that was just inserted is never scanned again. A code path that
/// happens to contain `DATASET_CSV` (or a dataset path containing
/// `CODE_PATH`) is therefore inserted verbatim.
///
/// # Errors
///
/// Fails when `template` is empty or `code_path` has no UTF-8 file name.
fn resolve_command_template(
    template: &[String],
    code_path: &Path,
    dataset_csv: &Path,
) -> RhoResult<Vec<String>> {
    /// Replaces each placeholder occurrence in `part`, checking the
    /// placeholders in the given order at every position.
    fn substitute(part: &str, substitutions: &[(&str, &str)]) -> String {
        let mut expanded = String::with_capacity(part.len());
        let mut rest = part;
        'scan: while !rest.is_empty() {
            for (placeholder, value) in substitutions {
                if let Some(tail) = rest.strip_prefix(*placeholder) {
                    expanded.push_str(value);
                    rest = tail;
                    continue 'scan;
                }
            }
            // No placeholder starts here: copy one character and move on.
            let mut chars = rest.chars();
            if let Some(ch) = chars.next() {
                expanded.push(ch);
            }
            rest = chars.as_str();
        }
        expanded
    }

    if template.is_empty() {
        return Err("command template must not be empty".into());
    }
    let code_name = code_path
        .file_name()
        .and_then(|value| value.to_str())
        .ok_or_else(|| format!("code path has no file name: {}", code_path.display()))?;
    let code_host = code_path.display().to_string();
    let dataset_host = dataset_csv.display().to_string();
    let substitutions = [
        ("{{code_path}}", code_host.as_str()),
        ("CODE_PATH", code_host.as_str()),
        ("{{dataset_csv}}", dataset_host.as_str()),
        ("DATASET_CSV", dataset_host.as_str()),
    ];
    Ok(template
        .iter()
        .map(|part| {
            if part == code_name {
                code_host.clone()
            } else {
                substitute(part, &substitutions)
            }
        })
        .collect())
}
/// Prepares a `Command` that invokes the `crypto` subcommand of `rho`.
///
/// A file named `rho` next to the current executable is preferred. When no
/// such file exists the command falls back to
/// `cargo run --quiet --bin rho --`. In both cases `crypto` is appended as
/// the next argument.
fn rho_crypto_command() -> Command {
    let sibling = match env::current_exe() {
        Ok(exe) => exe.with_file_name("rho"),
        Err(_) => PathBuf::from("rho"),
    };
    let mut command;
    if sibling.is_file() {
        command = Command::new(sibling);
    } else {
        command = Command::new("cargo");
        command.args(["run", "--quiet", "--bin", "rho", "--"]);
    }
    command.arg("crypto");
    command
}
/// Borrowed arguments for `verify_request_signature`; each field is passed
/// to `rho crypto verify` as the argument noted below.
struct RequestSignatureVerify<'a> {
/// File whose signature is checked (positional argument).
request_path: &'a Path,
/// Detached signature file (`--signature`).
signature_path: &'a Path,
/// Identity expected to have signed (`--identity`).
identity: &'a str,
/// Repository root (`--repo-root`).
repo_root: &'a Path,
/// Repository id bound into the signature context (`--repo-id`).
repo_id: &'a str,
/// Request id bound into the signature context (`--request-id`).
request_id: &'a str,
/// Intended recipient bound into the signature context (`--recipient`).
recipient: &'a str,
}
fn verify_request_signature(signature: RequestSignatureVerify<'_>) -> RhoResult<()> {
let mut rho_crypto = rho_crypto_command();
let status = rho_crypto
.arg("verify")
.arg(signature.request_path)
.arg("--signature")
.arg(signature.signature_path)
.arg("--identity")
.arg(signature.identity)
.arg("--repo-root")
.arg(signature.repo_root)
.arg("--repo-id")
.arg(signature.repo_id)
.arg("--request-id")
.arg(signature.request_id)
.arg("--recipient")
.arg(signature.recipient)
.arg("--purpose")
.arg("rho.request")
.status()?;
if !status.success() {
return Err(format!(
"signature verification failed: {}",
signature.signature_path.display()
)
.into());
}
Ok(())
}
/// Borrowed arguments for `sign_file_with_context`; each field is passed to
/// `rho crypto sign` as the argument noted below.
struct SignatureWrite<'a> {
/// File to sign (positional argument).
signed_path: &'a Path,
/// Where the signature is written (`--out`).
signature_path: &'a Path,
/// Signing identity (`--identity`).
identity: &'a str,
/// Repository id bound into the signature context (`--repo-id`).
repo_id: &'a str,
/// Request id bound into the signature context (`--request-id`).
request_id: &'a str,
/// Intended recipient bound into the signature context (`--recipient`).
recipient: &'a str,
/// Purpose string (`--purpose`), e.g. `rho.approval_grant` or `rho.run_receipt`.
purpose: &'a str,
}
fn sign_file_with_context(signature: SignatureWrite<'_>) -> RhoResult<()> {
let status = rho_crypto_command()
.arg("sign")
.arg(signature.signed_path)
.arg("--identity")
.arg(signature.identity)
.arg("--out")
.arg(signature.signature_path)
.arg("--repo-id")
.arg(signature.repo_id)
.arg("--request-id")
.arg(signature.request_id)
.arg("--recipient")
.arg(signature.recipient)
.arg("--purpose")
.arg(signature.purpose)
.status()?;
if !status.success() {
return Err(format!("failed to sign {}", signature.signed_path.display()).into());
}
Ok(())
}
/// Writes and signs every artifact that records an approval.
///
/// In order, this produces:
/// 1. the controlled-action JSON in the control root's outbox,
/// 2. the local action grant YAML (full host paths) plus its signature
///    (purpose `rho.local_action_grant`),
/// 3. the committed approval grant YAML (paths made repo-relative where
///    possible, dataset recorded by digest only) plus its signature
///    (purpose `rho.approval_grant`).
///
/// The order matters: each later artifact embeds the digest of a file written
/// by an earlier step. Both signatures are made as `granted_by` and addressed
/// to `request.from`.
///
/// # Errors
///
/// Propagates failures from directory creation, file writes, digest
/// computation, serialization and signing.
// The argument list mirrors the fields recorded in the artifacts, hence the lint exemption.
#[allow(clippy::too_many_arguments)]
fn write_approval_artifacts(
root: &Path,
repo_id: &str,
control_root: &Path,
request: &RepoInboxRequest,
request_id: &str,
action_id: &str,
run_id: &str,
tool_id: &str,
action_type: &str,
code_path: &Path,
dataset_path: &Path,
granted_by: &str,
tier: &str,
) -> RhoResult<()> {
// Step 1: controlled action describing what is about to run.
let action_path = action_outbox_path(control_root, action_id);
ensure_parent(&action_path)?;
// The action records where the run's stdout is expected to be written.
let output_path = root.join(".rho/runs").join(run_id).join("stdout.txt");
fs::write(
&action_path,
to_json_pretty(&ControlledActionManifest {
version: 1,
kind: "controlled_action".to_string(),
action: rho_core::ControlledAction {
action_id: action_id.to_string(),
request_id: request_id.to_string(),
tool_id: tool_id.to_string(),
requested_by: request.from.clone(),
requested_for: request.to.clone(),
action_type: action_type.to_string(),
summary: format!("Run reviewed request {request_id} on private data."),
reason: "Request signature and code digest were verified.".to_string(),
input_path: None,
script_path: Some(code_path.display().to_string()),
output_path: output_path.display().to_string(),
},
})?,
)?;
// Step 2: local grant, pinning the action file, repo state and both inputs by digest.
let grant_path = action_grant_path(root, action_id);
ensure_parent(&grant_path)?;
// One timestamp is shared by the local and the committed grant.
let created_at = rho_core::now_rfc3339();
// Digest is taken after the action file has been written above.
let action_file = GrantedActionFile {
path: action_path.display().to_string(),
sha256: file_digest(&action_path)?,
};
let repo_state = GrantedRepoState {
git_commit: git_commit(root),
};
let code_input = GrantedInput {
kind: "code".to_string(),
path: code_path.display().to_string(),
sha256: file_digest(code_path)?,
};
let dataset_input = GrantedInput {
kind: "dataset".to_string(),
path: dataset_path.display().to_string(),
sha256: file_digest(dataset_path)?,
};
fs::write(
&grant_path,
to_yaml(&ActionGrantManifest {
version: 1,
action_grant: ActionGrant {
action_id: action_id.to_string(),
request_id: request_id.to_string(),
tool_id: tool_id.to_string(),
action_type: action_type.to_string(),
decision: "approved".to_string(),
granted_by: granted_by.to_string(),
created_at: created_at.clone(),
action: action_file.clone(),
repo: repo_state.clone(),
inputs: vec![code_input.clone(), dataset_input.clone()],
},
})?,
)?;
let local_grant_signature = action_grant_signature_path(root, action_id);
sign_file_with_context(SignatureWrite {
signed_path: &grant_path,
signature_path: &local_grant_signature,
identity: granted_by,
repo_id,
request_id,
recipient: &request.from,
purpose: "rho.local_action_grant",
})?;
// Step 3: committed grant; its signature file sits next to it as `<action_id>.rhosig.yaml`.
let committed_grant_path = committed_approval_grant_path(root, request_id, action_id);
ensure_parent(&committed_grant_path)?;
let committed_grant_signature =
committed_grant_path.with_file_name(format!("{action_id}.rhosig.yaml"));
fs::write(
&committed_grant_path,
to_yaml(&CommittedApprovalGrantManifest {
version: 1,
approval_grant: CommittedApprovalGrant {
action_id: action_id.to_string(),
request_id: request_id.to_string(),
tool_id: tool_id.to_string(),
action_type: action_type.to_string(),
decision: "approved".to_string(),
granted_by: granted_by.to_string(),
created_at,
action: GrantedActionFile {
path: repo_relative_or_display(root, &action_path),
sha256: action_file.sha256,
},
// The local grant is referenced by a fixed repo-relative path and the digest of the file written in step 2.
// NOTE(review): this literal path is assumed to match what `action_grant_path` returns — confirm.
local_grant: GrantedActionFile {
path: ".rho/action-grants/".to_string() + action_id + ".yaml",
sha256: file_digest(&grant_path)?,
},
repo: repo_state,
code: GrantedInput {
kind: "code".to_string(),
path: repo_relative_or_display(root, code_path),
sha256: code_input.sha256,
},
// Only the dataset's uuid, tier and digest are recorded here; its path is left out.
private_input: CommittedPrivateInput {
dataset_uuid: request
.dataset_uuid
.clone()
.unwrap_or_else(|| "unknown".to_string()),
tier: tier.to_string(),
sha256: dataset_input.sha256,
},
},
})?,
)?;
sign_file_with_context(SignatureWrite {
signed_path: &committed_grant_path,
signature_path: &committed_grant_signature,
identity: granted_by,
repo_id,
request_id,
recipient: &request.from,
purpose: "rho.approval_grant",
})?;
Ok(())
}
/// Writes the committed run receipt for a finished run, signs it as
/// `signer` (purpose `rho.run_receipt`, addressed to the requester) and
/// returns the receipt path.
///
/// The receipt stores digests of the dataset and of the captured stdout,
/// the repo-relative code path, and the inbox location where the result
/// message for the requester belongs.
#[allow(clippy::too_many_arguments)]
fn write_committed_run_receipt(
    root: &Path,
    repo_id: &str,
    request: &RepoInboxRequest,
    request_id: &str,
    run_id: &str,
    runner: &str,
    tier: &str,
    code_path: &Path,
    code_sha256: &str,
    dataset_path: &Path,
    stdout_path: &Path,
    signer: &str,
) -> RhoResult<PathBuf> {
    let requester_inbox = identity_inbox_relative_path(&request.from)?;
    let result_path = format!(
        "rho/messages/inbox/{}/{request_id}/result.yaml",
        requester_inbox.display()
    );
    let receipt_path = committed_run_receipt_path(root, request_id, run_id);
    let receipt_signature_path = receipt_path.with_file_name(format!("{run_id}.rhosig.yaml"));
    ensure_parent(&receipt_path)?;
    let input_sha256 = file_digest(dataset_path)?;
    let output_sha256 = file_digest(stdout_path)?;
    let manifest = CommittedRunReceiptManifest {
        version: 1,
        run_receipt: CommittedRunReceipt {
            request_id: request_id.to_string(),
            run_id: run_id.to_string(),
            runner: runner.to_string(),
            status: "completed".to_string(),
            tier: tier.to_string(),
            code_path: repo_relative_or_display(root, code_path),
            code_sha256: code_sha256.to_string(),
            input_sha256,
            output_sha256,
            result_path,
            created_at: rho_core::now_rfc3339(),
        },
    };
    fs::write(&receipt_path, to_yaml(&manifest)?)?;
    sign_file_with_context(SignatureWrite {
        signed_path: &receipt_path,
        signature_path: &receipt_signature_path,
        identity: signer,
        repo_id,
        request_id,
        recipient: &request.from,
        purpose: "rho.run_receipt",
    })?;
    Ok(receipt_path)
}
/// Reads `<root>/rho/repo.yaml` and returns its normalized repository id.
///
/// Any failure to read the file is reported as "repo manifest not found".
fn read_repo_id(root: &Path) -> RhoResult<String> {
    let manifest_path = root.join("rho/repo.yaml");
    let raw = match fs::read_to_string(&manifest_path) {
        Ok(raw) => raw,
        Err(_) => {
            return Err(format!("repo manifest not found: {}", manifest_path.display()).into());
        }
    };
    let manifest = from_yaml::<RepoManifest>(&raw)?;
    normalize_repo_id(&manifest.repo.id)
}
/// Borrowed arguments for `write_execution_request`: everything needed to
/// write the resolved request manifest that the run step will read.
struct ExecutionRequestWrite<'a> {
/// Root under which the request manifest is stored.
root: &'a Path,
/// Original inbox request; supplies `from` and `to`.
request: &'a RepoInboxRequest,
/// Request id, also used to locate the manifest file.
request_id: &'a str,
/// Resolved tool id.
tool_id: &'a str,
/// Dataset uuid taken from the request.
dataset_uuid: &'a str,
/// Resolved path of the code file.
code_path: &'a Path,
/// Digest of the code file.
code_sha256: &'a str,
/// Fully resolved command (placeholders already expanded).
command: &'a [String],
/// Tier the run is approved for.
tier: &'a str,
}
/// Identifiers of a completed approval, returned by `handle_approve_request`
/// and consumed by `finish_approve_pr`.
struct ApprovedRun {
/// Request that was approved and run.
request_id: String,
/// Controlled action created for the approval.
action_id: String,
/// Run that was executed.
run_id: String,
}
/// Subset of `gh pr view --json ...` output requested by `gh_pr_metadata`.
#[derive(Debug, Deserialize)]
struct GhPrMetadata {
/// Name of the pull request's head branch; used as the push target.
#[serde(rename = "headRefName")]
head_ref_name: String,
/// Repository the head branch lives in.
#[serde(rename = "headRepository")]
head_repository: GhPrRepository,
/// Owner of the head repository.
#[serde(rename = "headRepositoryOwner")]
head_repository_owner: GhPrOwner,
/// URL of the pull request, printed when `--pr` is given.
url: String,
}
/// The `headRepository` object of the `gh pr view` JSON.
#[derive(Debug, Deserialize)]
struct GhPrRepository {
/// Repository name; combined with the owner login to build the push URL.
name: String,
}
/// The `headRepositoryOwner` object of the `gh pr view` JSON.
#[derive(Debug, Deserialize)]
struct GhPrOwner {
/// Owner login; combined with the repository name to build the push URL.
login: String,
}
fn write_execution_request(write: ExecutionRequestWrite<'_>) -> RhoResult<()> {
let path = request_path(write.root, write.request_id);
ensure_parent(&path)?;
fs::write(
path,
to_yaml(&RequestManifest {
version: 1,
request: rho_core::RunRequest {
id: write.request_id.to_string(),
from: write.request.from.clone(),
to: write.request.to.clone(),
tool_id: write.tool_id.to_string(),
dataset_uuid: write.dataset_uuid.to_string(),
code_paths: vec![write.code_path.display().to_string()],
code_sha256: write.code_sha256.to_string(),
command: write.command.to_vec(),
requested_tier: write.tier.to_string(),
created_at: rho_core::now_rfc3339(),
},
})?,
)?;
Ok(())
}
/// Implements `rho run approve-pr <pr-number> ...`.
///
/// Sequence: optional `repo sync` (skipped with `--no-sync`), fetch PR
/// metadata, check the PR out, `repo init --yes`, determine the request id
/// (`--request-id` or the single request found in the repo inbox),
/// `request review`, then approve and run the request via
/// `handle_approve_request`, and finally the optional commit/push steps of
/// `finish_approve_pr`.
///
/// Value-carrying options relevant to the approval are forwarded unchanged.
fn handle_approve_pr(rest: &[String]) -> RhoResult<()> {
    let Some(pr_number) = rest.first().cloned() else {
        usage()
    };
    let root = match rho_core::arg_value(rest, "--root") {
        Some(value) => PathBuf::from(value),
        None => PathBuf::from("."),
    };
    let root_arg = root.display().to_string();
    // Runs a `rho` subcommand given as plain string slices.
    let rho = |words: &[&str]| -> RhoResult<()> {
        let owned: Vec<String> = words.iter().map(|word| word.to_string()).collect();
        run_rho_command(&owned)
    };
    if !rho_core::has_flag(rest, "--no-sync") {
        rho(&["repo", "sync", "--root", &root_arg])?;
    }
    let pr = gh_pr_metadata(&root, &pr_number)?;
    gh_pr_checkout(&root, &pr_number)?;
    rho(&["repo", "init", "--root", &root_arg, "--yes"])?;
    let request_id = match rho_core::arg_value(rest, "--request-id") {
        Some(id) => id,
        None => find_single_repo_request_id(&root)?,
    };
    validate_request_id(&request_id)?;
    rho(&["request", "review", &request_id, "--root", &root_arg])?;
    let mut approve_args = vec![request_id, "--root".to_string(), root_arg];
    let forwarded = [
        "--private-root",
        "--runner",
        "--command",
        "--run-id",
        "--action-id",
        "--approved-by",
        "--tool-id",
        "--tier",
    ];
    for flag in forwarded {
        if let Some(value) = rho_core::arg_value(rest, flag) {
            approve_args.push(flag.to_string());
            approve_args.push(value);
        }
    }
    let approved = handle_approve_request(&approve_args)?;
    finish_approve_pr(rest, &root, &pr, &approved)?;
    Ok(())
}
/// Implements `rho run approve-request <req-id> ...`: verifies a request
/// from the repo inbox, records the approval, runs it and writes a signed
/// run receipt.
///
/// Steps:
/// 1. Resolve and canonicalize `--root` (default `.`) and `--private-root`,
///    and validate `--runner` when given.
/// 2. Load the inbox `request.yaml`, check its id and actors, and verify its
///    signature. A missing signature is only accepted with
///    `--allow-unsigned-request`.
/// 3. Resolve the tier (`--tier`, then the request's `requested_tier`, then
///    `tier`, then `real`) and the tool (`--tool-id`, then the request, then
///    the tier's default), and check that the tool's action type matches the
///    tier.
/// 4. Resolve the code file, require it to lie inside the repo, and compare
///    its digest with the one stated in the request, if any.
/// 5. Resolve the dataset and the command (`--command`, then the request,
///    then the tool's template, then the built-in default).
/// 6. Write the execution request and approval artifacts, execute the run
///    and write the committed run receipt.
///
/// # Errors
///
/// Returns an error for any failed validation, lookup, signature check,
/// file operation or run. An unsupported tier is reported as an error
/// instead of panicking when the default tool is looked up.
fn handle_approve_request(rest: &[String]) -> RhoResult<ApprovedRun> {
    let request_id = rest.first().cloned().unwrap_or_else(|| usage());
    validate_request_id(&request_id)?;
    let root = rho_core::arg_value(rest, "--root")
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("."))
        .canonicalize()?;
    let private_root = rho_core::arg_value(rest, "--private-root")
        .map(PathBuf::from)
        .map(|path| path.canonicalize())
        .transpose()?;
    let runner = rho_core::arg_value(rest, "--runner");
    if let Some(runner) = runner.as_deref() {
        validate_runner(runner)?;
    }
    let repo_id = read_repo_id(&root)?;

    // Load the request and make sure it is the one that was asked for.
    let inbox_request_path = find_repo_inbox_request(&root, &request_id)?;
    let text = fs::read_to_string(&inbox_request_path)?;
    let manifest: RepoInboxRequestManifest = from_yaml(&text)?;
    let request = manifest.request;
    if request.id != request_id {
        return Err(format!(
            "request id mismatch in {}: expected {request_id}, got {}",
            inbox_request_path.display(),
            request.id
        )
        .into());
    }
    validate_actor_id(&request.from)?;
    validate_actor_id(&request.to)?;

    // The signature file sits next to the request file.
    let signature_path = inbox_request_path.with_file_name("request.rhosig.yaml");
    if signature_path.is_file() {
        verify_request_signature(RequestSignatureVerify {
            request_path: &inbox_request_path,
            signature_path: &signature_path,
            identity: &request.from,
            repo_root: &root,
            repo_id: &repo_id,
            request_id: &request_id,
            recipient: &request.to,
        })?;
    } else if !rho_core::has_flag(rest, "--allow-unsigned-request") {
        return Err(format!("request signature missing: {}", signature_path.display()).into());
    }

    let tier = rho_core::arg_value(rest, "--tier")
        .or(request.requested_tier.clone())
        .or(request.tier.clone())
        .unwrap_or_else(|| "real".to_string());
    validate_tier(&tier)?;
    // Propagate an unsupported tier as an error rather than unwrapping.
    let tool_id = match rho_core::arg_value(rest, "--tool-id").or(request.tool_id.clone()) {
        Some(tool_id) => tool_id,
        None => default_tool_for_tier(&tier)?.to_string(),
    };
    let action_type = action_type_for_tier(&tier)?.to_string();
    let tool = load_tool(&root, &tool_id)?;
    if tool.action_type != action_type {
        return Err(format!(
            "tool {} has action_type {}, but tier {tier} requires {action_type}",
            tool.id, tool.action_type
        )
        .into());
    }

    let dataset_uuid = request
        .dataset_uuid
        .as_ref()
        .ok_or("request is missing dataset_uuid")?;
    let code_value = request
        .code_path
        .as_ref()
        .or_else(|| request.code_paths.first())
        .ok_or("request has no code_path or code_paths entry")?;
    let code_path = repo_path(&root, code_value)?;
    ensure_code_path_inside_shared(&root, &code_path)?;
    if !code_path.is_file() {
        return Err(format!("code path does not exist: {}", code_path.display()).into());
    }
    let code_sha256 = file_digest(&code_path)?;
    if let Some(expected) = request.code_sha256.as_ref()
        && expected != &code_sha256
    {
        return Err(format!(
            "code digest mismatch for {}: expected {expected}, got {code_sha256}",
            code_path.display()
        )
        .into());
    }
    let dataset_path =
        dataset_csv_for_approval(&root, private_root.as_deref(), dataset_uuid, &tier)?;

    // Command precedence: CLI, request, tool template, built-in default.
    let command_template = if let Some(command) = rho_core::arg_value(rest, "--command") {
        rho_core::split_command_line(&command)?
    } else if !request.command.is_empty() {
        request.command.clone()
    } else if !tool.command_template.is_empty() {
        tool.command_template.clone()
    } else {
        default_command_for_code(&code_path)?
    };
    let command = resolve_command_template(&command_template, &code_path, &dataset_path)?;

    // Default ids are derived from the tier slug and the request suffix.
    let slug = run_slug_for_tier(&tier)?;
    let suffix = request_suffix(&request_id);
    let run_id =
        rho_core::arg_value(rest, "--run-id").unwrap_or_else(|| format!("run-{slug}-{suffix}"));
    let action_id =
        rho_core::arg_value(rest, "--action-id").unwrap_or_else(|| format!("act-{slug}-{suffix}"));
    validate_run_id(&run_id)?;
    validate_action_id(&action_id)?;
    // The addressee of the request approves unless `--approved-by` says otherwise.
    let granted_by = rho_core::arg_value(rest, "--approved-by")
        .map(|value| normalize_actor_id(&value))
        .transpose()?
        .unwrap_or_else(|| request.to.clone());

    write_execution_request(ExecutionRequestWrite {
        root: &root,
        request: &request,
        request_id: &request_id,
        tool_id: &tool_id,
        dataset_uuid,
        code_path: &code_path,
        code_sha256: &code_sha256,
        command: &command,
        tier: &tier,
    })?;
    let control_root = root.join(".rho/control");
    write_approval_artifacts(
        &root,
        &repo_id,
        &control_root,
        &request,
        &request_id,
        &action_id,
        &run_id,
        &tool_id,
        &action_type,
        &code_path,
        &dataset_path,
        &granted_by,
        &tier,
    )?;
    let stdout_path = execute_request(
        &root,
        private_root.as_deref(),
        &request_id,
        &run_id,
        Some(tier.clone()),
        false,
        runner.as_deref(),
    )?;
    // The label recorded in the receipt: the explicit runner, else the
    // RHO_SANDBOX_RUNNER environment variable, else "local".
    let runner_label = runner
        .clone()
        .unwrap_or_else(|| env::var("RHO_SANDBOX_RUNNER").unwrap_or_else(|_| "local".to_string()));
    let receipt_path = write_committed_run_receipt(
        &root,
        &repo_id,
        &request,
        &request_id,
        &run_id,
        &runner_label,
        &tier,
        &code_path,
        &code_sha256,
        &dataset_path,
        &stdout_path,
        &granted_by,
    )?;
    println!("request: {request_id}");
    println!("action: {action_id}");
    println!("run: {run_id}");
    println!("runner: {runner_label}");
    println!("output: {}", stdout_path.display());
    println!("receipt: {}", receipt_path.display());
    println!(
        "next: rho result release {request_id} --run-id {run_id} --to {}",
        request.from
    );
    Ok(ApprovedRun {
        request_id,
        action_id,
        run_id,
    })
}
/// Implements `rho run request ...`: executes an already stored request and
/// prints the path of the captured stdout.
///
/// `--shared-root`, `--request-id`, `--run-id` and `--private-root` are
/// required (usage is printed otherwise); `--tier` and `--runner` are
/// optional.
fn handle_request(rest: &[String]) -> RhoResult<()> {
    let required = |flag: &str| require_arg(rest, flag).unwrap_or_else(|_| usage());
    let shared_root = PathBuf::from(required("--shared-root"));
    let request_id = required("--request-id");
    let run_id = required("--run-id");
    let private_root = PathBuf::from(required("--private-root"));
    validate_request_id(&request_id)?;
    validate_run_id(&run_id)?;
    let tier = rho_core::arg_value(rest, "--tier");
    let runner = rho_core::arg_value(rest, "--runner");
    let stdout_path = execute_request(
        &shared_root,
        Some(&private_root),
        &request_id,
        &run_id,
        tier,
        true,
        runner.as_deref(),
    )?;
    println!("{}", stdout_path.display());
    Ok(())
}
/// Implements `rho run import-action ...`: copies a controlled-action file
/// into the control root's outbox after rewriting its paths.
///
/// The source is either `--source` or the action's outbox file under
/// `--agent-control-root`. The envelope must be version 1 of kind
/// `controlled_action` and carry the same action id as `--action-id`.
///
/// Paths are rewritten in one of two modes:
/// * with `--repo-root` (which also requires `--run-root`) through the
///   rho-path mapping helpers;
/// * otherwise through the `--map` prefix mappings, of which at least one is
///   required.
///
/// On success the path of the written file is printed.
fn handle_import_action(rest: &[String]) -> RhoResult<()> {
    let required = |flag: &str| require_arg(rest, flag).unwrap_or_else(|_| usage());
    let control_root = PathBuf::from(required("--control-root"));
    let action_id = required("--action-id");
    validate_action_id(&action_id)?;
    let source_path = match rho_core::arg_value(rest, "--source") {
        Some(source) => PathBuf::from(source),
        None => {
            let agent_control_root = PathBuf::from(required("--agent-control-root"));
            action_outbox_path(&agent_control_root, &action_id)
        }
    };
    let action_text = match fs::read_to_string(&source_path) {
        Ok(text) => text,
        Err(_) => {
            return Err(format!(
                "controlled action source file not found: {}",
                source_path.display()
            )
            .into());
        }
    };
    let mut manifest: ControlledActionManifest = from_json(&action_text)?;
    let envelope_ok = manifest.version == 1 && manifest.kind == "controlled_action";
    if !envelope_ok {
        return Err("invalid controlled action envelope".into());
    }
    if manifest.action.action_id != action_id {
        return Err(format!(
            "action id mismatch: argument {action_id}, payload {}",
            manifest.action.action_id
        )
        .into());
    }
    let action = &mut manifest.action;
    match rho_core::arg_value(rest, "--repo-root") {
        Some(repo_root) => {
            let run_root = PathBuf::from(required("--run-root"));
            let repo_root = PathBuf::from(repo_root);
            action.input_path =
                map_optional_rho_path(action.input_path.take(), &repo_root, &run_root)?;
            action.script_path =
                map_optional_rho_path(action.script_path.take(), &repo_root, &run_root)?;
            action.output_path = map_rho_path(&action.output_path, &repo_root, &run_root)?;
        }
        None => {
            let mappings = parse_path_mappings(rest)?;
            if mappings.is_empty() {
                return Err("at least one --map entry is required".into());
            }
            action.input_path = map_optional_guest_path(action.input_path.take(), &mappings)?;
            action.script_path = map_optional_guest_path(action.script_path.take(), &mappings)?;
            action.output_path = map_guest_path(&action.output_path, &mappings)?;
        }
    }
    let target_path = action_outbox_path(&control_root, &action_id);
    ensure_parent(&target_path)?;
    fs::write(&target_path, to_json_pretty(&manifest)?)?;
    println!("{}", target_path.display());
    Ok(())
}
fn handle_proposal_action(rest: &[String]) -> RhoResult<()> {
let proposal_path = PathBuf::from(require_arg(rest, "--proposal").unwrap_or_else(|_| usage()));
let control_root =
PathBuf::from(require_arg(rest, "--control-root").unwrap_or_else(|_| usage()));
let action_id = require_arg(rest, "--action-id").unwrap_or_else(|_| usage());
let script_root = PathBuf::from(require_arg(rest, "--script-root").unwrap_or_else(|_| usage()));
let output_root = PathBuf::from(require_arg(rest, "--output-root").unwrap_or_else(|_| usage()));
validate_action_id(&action_id)?;
let text = fs::read_to_string(&proposal_path)
.map_err(|_| format!("proposal file not found: {}", proposal_path.display()))?;
let manifest: ProposedActionManifest = from_yaml(&text)?;
if manifest.version != 1 {
return Err("invalid proposed action envelope".into());
}
let mut proposed = manifest.proposed_action;
proposed.requested_by = normalize_actor_id(&proposed.requested_by)?;
proposed.requested_for = normalize_actor_id(&proposed.requested_for)?;
validate_action_id(&proposed.action_id)?;
validate_request_id(&proposed.request_id)?;
validate_tool_id(&proposed.tool_id)?;
validate_actor_id(&proposed.requested_by)?;
validate_actor_id(&proposed.requested_for)?;
validate_action_type(&proposed.action_type)?;
if proposed.action_id != action_id {
return Err(format!(
"proposal action id mismatch: argument {action_id}, payload {}",
proposed.action_id
)
.into());
}
require_allowed_value(rest, "--allow-tool", &proposed.tool_id, "tool_id")?;
require_allowed_actor(
rest,
"--allow-requested-by",
&proposed.requested_by,
"requested_by",
)?;
require_allowed_actor(
rest,
"--allow-requested-for",
&proposed.requested_for,
"requested_for",
)?;
require_allowed_value(
rest,
"--allow-action-type",
&proposed.action_type,
"action_type",
)?;
let mappings = parse_path_mappings(rest)?;
if mappings.is_empty() {
return Err("at least one --map entry is required".into());
}
let script_path = map_proposed_path(&proposed.script_path, &mappings)?;
let output_path = map_proposed_path(&proposed.output_path, &mappings)?;
ensure_path_under(
&PathBuf::from(&script_path),
&script_root,
"proposal script path",
)?;
ensure_path_under(
&PathBuf::from(&output_path),
&output_root,
"proposal output path",
)?;
let target_path = action_outbox_path(&control_root, &action_id);
ensure_parent(&target_path)?;
fs::write(
&target_path,
to_json_pretty(&ControlledActionManifest {
version: 1,
kind: "controlled_action".to_string(),
action: rho_core::ControlledAction {
action_id: proposed.action_id,
request_id: proposed.request_id,
tool_id: proposed.tool_id,
requested_by: proposed.requested_by,
requested_for: proposed.requested_for,
action_type: proposed.action_type,
summary: proposed.summary,
reason: proposed.reason,
input_path: None,
script_path: Some(script_path),
output_path,
},
})?,
)?;
println!("{}", target_path.display());
Ok(())
}
/// Handles `rho run grant-action`: writes an approval grant for a controlled
/// action that already exists in the control root's action outbox.
///
/// Required arguments are `--shared-root`, `--control-root`, `--action-id` and
/// `--granted-by` (a missing one prints usage and exits). The controlled action
/// file is parsed as JSON, its envelope (`version == 1`, `kind ==
/// "controlled_action"`) and ids are checked, and `load_tool` is called for the
/// action's tool id. The grant is then written as YAML to the path returned by
/// `action_grant_path`, and that path is printed.
///
/// The grant stores the action file's path and SHA-256 digest, the value of
/// `git_commit(&shared_root)`, and the inputs produced by
/// `grant_inputs_from_args`.
///
/// # Errors
///
/// Returns an error when an id or actor fails validation, when the action file
/// cannot be read or parsed, when the envelope is invalid or the payload's
/// action id differs from `--action-id`, when the tool cannot be loaded, or when
/// the grant cannot be built or written.
fn handle_grant_action(rest: &[String]) -> RhoResult<()> {
    let shared_root = PathBuf::from(require_arg(rest, "--shared-root").unwrap_or_else(|_| usage()));
    let control_root =
        PathBuf::from(require_arg(rest, "--control-root").unwrap_or_else(|_| usage()));
    let action_id = require_arg(rest, "--action-id").unwrap_or_else(|_| usage());
    let granted_by =
        normalize_actor_id(&require_arg(rest, "--granted-by").unwrap_or_else(|_| usage()))?;
    validate_action_id(&action_id)?;
    validate_actor_id(&granted_by)?;

    let action_path = action_outbox_path(&control_root, &action_id);
    // Only a genuinely missing file is reported as "not found"; any other read
    // failure (permissions, invalid UTF-8, ...) keeps its underlying cause.
    let action_text = fs::read_to_string(&action_path).map_err(|error| {
        if error.kind() == std::io::ErrorKind::NotFound {
            format!(
                "controlled action file not found: {}",
                action_path.display()
            )
        } else {
            format!(
                "failed to read controlled action file {}: {error}",
                action_path.display()
            )
        }
    })?;
    let manifest: ControlledActionManifest = from_json(&action_text)?;
    if manifest.version != 1 || manifest.kind != "controlled_action" {
        return Err("invalid controlled action envelope".into());
    }

    let action = manifest.action;
    if action.action_id != action_id {
        return Err(format!(
            "action id mismatch: argument {action_id}, payload {}",
            action.action_id
        )
        .into());
    }
    validate_request_id(&action.request_id)?;
    validate_tool_id(&action.tool_id)?;
    validate_action_type(&action.action_type)?;
    // The result is discarded; the call is made only so that a failure to load
    // the tool aborts the grant.
    load_tool(&shared_root, &action.tool_id)?;

    let grant_path = action_grant_path(&shared_root, &action_id);
    ensure_parent(&grant_path)?;
    let grant = ActionGrantManifest {
        version: 1,
        action_grant: ActionGrant {
            action_id: action.action_id,
            request_id: action.request_id,
            tool_id: action.tool_id,
            action_type: action.action_type,
            decision: "approved".to_string(),
            granted_by,
            created_at: rho_core::now_rfc3339(),
            action: GrantedActionFile {
                path: action_path.display().to_string(),
                // NOTE(review): the digest is taken by reading the file a second
                // time, after the content validated above was read — confirm
                // nothing can rewrite the action file in between.
                sha256: file_digest(&action_path)?,
            },
            repo: GrantedRepoState {
                git_commit: git_commit(&shared_root),
            },
            inputs: grant_inputs_from_args(rest)?,
        },
    };
    fs::write(&grant_path, to_yaml(&grant)?)?;
    println!("{}", grant_path.display());
    Ok(())
}
fn handle_controlled_action(rest: &[String]) -> RhoResult<()> {
let shared_root = PathBuf::from(require_arg(rest, "--shared-root").unwrap_or_else(|_| usage()));
let control_root =
PathBuf::from(require_arg(rest, "--control-root").unwrap_or_else(|_| usage()));
let private_root =
PathBuf::from(require_arg(rest, "--private-root").unwrap_or_else(|_| usage()));
let action_id = require_arg(rest, "--action-id").unwrap_or_else(|_| usage());
validate_action_id(&action_id)?;
let action_text =
fs::read_to_string(action_outbox_path(&control_root, &action_id)).map_err(|_| {
format!(
"controlled action file not found: {}",
action_outbox_path(&control_root, &action_id).display()
)
})?;
let manifest: ControlledActionManifest = from_json(&action_text)?;
if manifest.version != 1 || manifest.kind != "controlled_action" {
write_action_status(
&control_root,
&action_id,
"",
"failed",
None,
None,
Some("invalid controlled action envelope"),
)?;
return Err("invalid controlled action envelope".into());
}
let action = manifest.action;
validate_action_id(&action.action_id)?;
validate_request_id(&action.request_id)?;
validate_tool_id(&action.tool_id)?;
validate_actor_id(&action.requested_by)?;
validate_actor_id(&action.requested_for)?;
validate_action_type(&action.action_type)?;
if action.action_id != action_id {
write_action_status(
&control_root,
&action_id,
&action.request_id,
"failed",
None,
None,
Some("action id mismatch"),
)?;
return Err(format!(
"action id mismatch: argument {action_id}, payload {}",
action.action_id
)
.into());
}
let grant = match validate_action_grant(&shared_root, &control_root, &action) {
Ok(grant) => grant,
Err(error) => {
write_action_status(
&control_root,
&action_id,
&action.request_id,
"failed",
None,
None,
Some(&error.to_string()),
)?;
return Err(error);
}
};
if action.action_type == "release_results" {
let input_path = action
.input_path
.as_ref()
.ok_or("input_path is required for release_results")?;
if !grant_has_input(&grant, input_path) {
let error = format!("release input is not covered by action grant: {input_path}");
write_action_status(
&control_root,
&action_id,
&action.request_id,
"failed",
None,
None,
Some(&error),
)?;
return Err(error.into());
}
match publish_result_artifact(
&shared_root,
&action.request_id,
&PathBuf::from(input_path),
&PathBuf::from(&action.output_path),
) {
Ok(()) => {
let output_path = PathBuf::from(&action.output_path);
write_action_status(
&control_root,
&action_id,
&action.request_id,
"completed",
None,
Some(&output_path),
None,
)?;
println!(
"{}",
action_status_path(&control_root, &action_id).display()
);
return Ok(());
}
Err(error) => {
write_action_status(
&control_root,
&action_id,
&action.request_id,
"failed",
None,
None,
Some(&error.to_string()),
)?;
return Err(error);
}
}
}
let run_tier = match action.action_type.as_str() {
"run_mock_data" => "mock",
"run_real_data" => "real",
_ => {
let error = format!(
"controlled action type is not implemented yet: {}",
action.action_type
);
write_action_status(
&control_root,
&action_id,
&action.request_id,
"failed",
None,
None,
Some(&error),
)?;
return Err(error.into());
}
};
let script_path = action
.script_path
.as_ref()
.ok_or_else(|| format!("script_path is required for {}", action.action_type))?;
if !grant_has_input(&grant, script_path) {
let error = format!("script path is not covered by action grant: {script_path}");
write_action_status(
&control_root,
&action_id,
&action.request_id,
"failed",
None,
None,
Some(&error),
)?;
return Err(error.into());
}
if let Err(error) = ensure_path_under(
&PathBuf::from(script_path),
&shared_root.join("workspace"),
"script path",
) {
write_action_status(
&control_root,
&action_id,
&action.request_id,
"failed",
None,
None,
Some(&error.to_string()),
)?;
return Err(error);
}
if let Err(error) = ensure_path_under(
&PathBuf::from(&action.output_path),
&shared_root.join(".rho/runs"),
"run output path",
) {
write_action_status(
&control_root,
&action_id,
&action.request_id,
"failed",
None,
None,
Some(&error.to_string()),
)?;
return Err(error);
}
let run_id = run_id_for_action(&action_id);
validate_run_id(&run_id)?;
match execute_request(
&shared_root,
Some(&private_root),
&action.request_id,
&run_id,
Some(run_tier.to_string()),
false,
None,
) {
Ok(stdout_path) => {
write_action_status(
&control_root,
&action_id,
&action.request_id,
"completed",
Some(&run_id),
Some(&stdout_path),
None,
)?;
println!(
"{}",
action_status_path(&control_root, &action_id).display()
);
Ok(())
}
Err(error) => {
write_action_status(
&control_root,
&action_id,
&action.request_id,
"failed",
Some(&run_id),
None,
Some(&error.to_string()),
)?;
Err(error)
}
}
}
pub fn run(args: &[String]) -> RhoResult<()> {
let Some(command) = args.first().map(String::as_str) else {
usage();
};
let rest = &args[1..];
match command {
"approve-pr" => handle_approve_pr(rest),
"approve-request" => handle_approve_request(rest).map(|_| ()),
"request" => handle_request(rest),
"import-action" => handle_import_action(rest),
"proposal-action" => handle_proposal_action(rest),
"grant-action" => handle_grant_action(rest),
"controlled-action" => handle_controlled_action(rest),
_ => usage(),
}
}