use std::env;
use std::fs;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use rho_core::{
Approval, ApprovalManifest, RequestManifest, RhoResult, RunRequest, ensure_parent, file_digest,
from_yaml, is_rho_encrypted_text, normalize_actor_id, normalize_repo_id, providers,
read_to_string_if_exists, require_arg, split_command_line, to_yaml, validate_actor_id,
validate_request_id, validate_tier, validate_tool_id, yaml_quote,
};
use serde::{Deserialize, Serialize};
/// Prints the `rho request` usage text to stderr and terminates the process
/// with exit code 2. Never returns.
fn usage() -> ! {
    const USAGE: &str = "usage: rho request <status|approve|pending|show|create-run|submit-run|review> ...\n rho request submit-run <req-id> [owner/repo] [--repo <owner/repo>] [--root <repo-root>] [--from <id>] --to <id> (--tool <id>|--tool-id <id>) (--dataset <name|uuid>|--dataset-uuid <uuid>) (--code <path>|--code-path <path>) [--command <cmd>] [--tier <tier>] [--branch <name>] [--no-sync] [--no-stage] [--commit] [--push] [--pr]\n rho request review <req-id> [--root <repo-root>] [--mock-dataset <path>] [--summary-out <path>] [--no-run]";
    eprintln!("{USAGE}");
    std::process::exit(2)
}
/// Returns the location of a shared request manifest:
/// `<shared_root>/.rho/requests/<request_id>.yaml`.
fn request_path(shared_root: &Path, request_id: &str) -> PathBuf {
    let mut path = shared_root.join(".rho/requests");
    path.push(format!("{request_id}.yaml"));
    path
}
/// Returns the location of a shared approval manifest:
/// `<shared_root>/.rho/approvals/<request_id>.yaml`.
fn approval_path(shared_root: &Path, request_id: &str) -> PathBuf {
    let file_name = format!("{request_id}.yaml");
    let approvals_dir = shared_root.join(".rho/approvals");
    approvals_dir.join(file_name)
}
/// Reports the approval state of a request as `"approved"`, `"denied"` or
/// `"pending"`.
///
/// A request with no approval file is pending. Otherwise the recorded decision
/// is compared case-insensitively; any decision that is not a known
/// approve/deny spelling is also treated as pending.
///
/// # Errors
/// Fails when the approval file cannot be read or parsed.
fn approval_status(shared_root: &Path, request_id: &str) -> RhoResult<String> {
    let approval_file = approval_path(shared_root, request_id);
    let text = match read_to_string_if_exists(&approval_file)? {
        Some(text) => text,
        None => return Ok("pending".to_string()),
    };
    let manifest: ApprovalManifest = from_yaml(&text)?;
    let status = match manifest.approval.decision.to_ascii_lowercase().as_str() {
        "approve" | "approved" => "approved",
        "deny" | "denied" | "reject" | "rejected" => "denied",
        _ => "pending",
    };
    Ok(status.to_string())
}
/// Loads and parses the shared request manifest for `request_id`.
///
/// # Errors
/// Any failure to read the file is reported as `request not found: <path>`;
/// YAML parse errors are propagated unchanged.
fn shared_request_manifest(shared_root: &Path, request_id: &str) -> RhoResult<RequestManifest> {
    let manifest_path = request_path(shared_root, request_id);
    match fs::read_to_string(&manifest_path) {
        Ok(text) => from_yaml(&text),
        Err(_) => Err(format!("request not found: {}", manifest_path.display()).into()),
    }
}
/// Lists, in sorted order, the ids of all requests under
/// `<shared_root>/.rho/requests` whose approval status is still `"pending"`.
///
/// Only `*.yaml` files with a UTF-8 file stem are considered. A missing
/// requests directory yields an empty list.
///
/// # Errors
/// Fails when the directory cannot be listed or an approval file is unreadable.
fn pending_ids(shared_root: &Path) -> RhoResult<Vec<String>> {
    let requests_dir = shared_root.join(".rho/requests");
    let mut pending = Vec::new();
    if requests_dir.is_dir() {
        for entry in fs::read_dir(requests_dir)? {
            let file = entry?.path();
            let is_yaml = file.extension().and_then(|ext| ext.to_str()) == Some("yaml");
            let id = match file.file_stem().and_then(|stem| stem.to_str()) {
                Some(id) if is_yaml => id,
                _ => continue,
            };
            if approval_status(shared_root, id)? == "pending" {
                pending.push(id.to_owned());
            }
        }
        pending.sort();
    }
    Ok(pending)
}
/// Top-level YAML document of a repo-inbox `request.yaml` file, as parsed by
/// `review_request`.
#[derive(Debug, Deserialize)]
struct RepoInboxRequestManifest {
/// The request record stored under the `request` key.
request: RepoInboxRequest,
}
/// Top-level YAML document of a `dataset.yaml` file, as parsed by
/// `read_dataset_manifest`.
#[derive(Debug, Deserialize)]
struct DatasetManifest {
/// The dataset record stored under the `dataset` key.
dataset: DatasetRecord,
}
/// Identity and variants of a dataset declared in `dataset.yaml`.
#[derive(Debug, Deserialize)]
struct DatasetRecord {
/// Dataset UUID; `find_dataset` matches user-supplied references against it.
uuid: String,
/// Dataset name; `find_dataset` also accepts it as a reference.
name: String,
/// The variants (public / mock) declared for this dataset.
variants: DatasetVariants,
}
/// Optional variants of a dataset. Which ones are present determines the
/// default tier chosen in `resolved_dataset_from_manifest`.
#[derive(Debug, Deserialize)]
struct DatasetVariants {
/// Public variant, if declared (absent keys deserialize to `None`).
#[serde(default)]
public: Option<DatasetVariant>,
/// Mock variant, if declared (absent keys deserialize to `None`).
#[serde(default)]
mock: Option<DatasetVariant>,
}
/// A single dataset variant entry.
#[derive(Debug, Deserialize)]
struct DatasetVariant {
/// Path of the variant's data file; for the mock variant it is joined onto
/// the dataset directory after being validated as a safe relative path.
#[serde(default)]
relative_path: Option<String>,
}
/// A request record read from a repo-inbox `request.yaml`. Everything except
/// `id`, `from` and `to` is optional and defaults when the key is absent.
#[derive(Debug, Deserialize)]
struct RepoInboxRequest {
/// Request id; must equal the id the reviewer asked for.
id: String,
/// Actor id of the requester; also used as the signing identity to verify.
from: String,
/// Actor id of the recipient.
to: String,
/// Request type, stored under the YAML key `type`.
#[serde(default, rename = "type")]
request_type: Option<String>,
/// Tool the request refers to.
#[serde(default)]
tool_id: Option<String>,
/// Dataset the request refers to; used to locate a default mock dataset.
#[serde(default)]
dataset_uuid: Option<String>,
/// Single code path; takes precedence over `code_paths` during review.
#[serde(default)]
code_path: Option<String>,
/// List of code paths; only the first entry is used, and only when
/// `code_path` is absent.
#[serde(default)]
code_paths: Vec<String>,
/// Expected digest of the code file; when present it must match.
#[serde(default)]
code_sha256: Option<String>,
/// Requested tier; preferred over `tier` when both are present.
#[serde(default)]
requested_tier: Option<String>,
/// Alternative key for the requested tier, used when `requested_tier` is absent.
#[serde(default)]
tier: Option<String>,
}
/// Top-level YAML document written by `write_review_summary_if_requested`.
#[derive(Debug, Serialize)]
struct RequestReviewSummaryManifest {
/// Document version; the writer always emits `1`.
version: u32,
/// The review summary itself.
request_review: RequestReviewSummary,
}
/// Machine-readable outcome of `rho request review`. Optional fields are
/// omitted from the serialized output when they are `None`.
#[derive(Debug, Serialize)]
struct RequestReviewSummary {
/// Id of the reviewed request.
request_id: String,
/// Location of the request file, repo-relative when it lies under the root.
request_path: String,
/// Requester actor id copied from the request.
from: String,
/// Recipient actor id copied from the request.
to: String,
/// Request type copied from the request, if any.
#[serde(skip_serializing_if = "Option::is_none")]
request_type: Option<String>,
/// Tool id copied from the request, if any.
#[serde(skip_serializing_if = "Option::is_none")]
tool_id: Option<String>,
/// Dataset UUID copied from the request, if any.
#[serde(skip_serializing_if = "Option::is_none")]
dataset_uuid: Option<String>,
/// Requested tier (`requested_tier`, falling back to `tier`), if any.
#[serde(skip_serializing_if = "Option::is_none")]
requested_tier: Option<String>,
/// Result of the signature check.
signature: RequestReviewSignature,
/// Result of the code digest check.
code: RequestReviewCode,
/// Result of the mock run.
mock_run: RequestReviewMockRun,
/// Timestamp produced by `rho_core::now_rfc3339()` when the summary is built.
reviewed_at: String,
}
/// Signature section of a review summary.
#[derive(Debug, Serialize)]
struct RequestReviewSignature {
/// `"verified"` when the signature file exists and verification succeeded,
/// `"missing"` when there is no signature file.
status: String,
/// Location of the (expected) signature file.
path: String,
}
/// Code section of a review summary.
#[derive(Debug, Serialize)]
struct RequestReviewCode {
/// Code path exactly as written in the request.
path: String,
/// Resolved filesystem location, repo-relative when it lies under the root.
fs_path: String,
/// Digest computed from the file on disk via `file_digest`.
sha256: String,
/// `"verified"` when the request carried a matching digest, `"unverified"`
/// when the request carried none.
digest_status: String,
}
/// Mock-run section of a review summary. Optional fields are omitted from
/// the serialized output when they are `None`.
#[derive(Debug, Serialize)]
struct RequestReviewMockRun {
/// One of `"skipped"`, `"skipped-missing-dataset"` or `"passed"`.
status: String,
/// Mock dataset location, when one was resolved.
#[serde(skip_serializing_if = "Option::is_none")]
dataset_path: Option<String>,
/// Trimmed standard output of a successful mock run.
#[serde(skip_serializing_if = "Option::is_none")]
stdout: Option<String>,
/// Trimmed standard error of a successful mock run, when non-empty.
#[serde(skip_serializing_if = "Option::is_none")]
stderr: Option<String>,
}
fn review_request(request_id: &str, args: &[String]) -> RhoResult<()> {
validate_request_id(request_id)?;
let root = rho_core::arg_value(args, "--root")
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("."));
let summary_out = rho_core::arg_value(args, "--summary-out").map(PathBuf::from);
let repo_id = read_repo_id(&root)?;
let request_path = find_repo_inbox_request(&root, request_id)?;
refresh_review_file_if_needed(&root, &request_path)?;
let text = fs::read_to_string(&request_path)?;
let manifest: RepoInboxRequestManifest = from_yaml(&text)?;
let request = manifest.request;
if request.id != request_id {
return Err(format!(
"request id mismatch in {}: expected {request_id}, got {}",
request_path.display(),
request.id
)
.into());
}
validate_actor_id(&request.from)?;
validate_actor_id(&request.to)?;
println!("rho request review");
println!("request: {}", request.id);
println!("path: {}", request_path.display());
println!("from: {}", request.from);
println!("to: {}", request.to);
if let Some(request_type) = &request.request_type {
println!("type: {request_type}");
}
if let Some(tool_id) = &request.tool_id {
println!("tool_id: {tool_id}");
}
if let Some(dataset_uuid) = &request.dataset_uuid {
println!("dataset_uuid: {dataset_uuid}");
}
if let Some(tier) = request.requested_tier.as_ref().or(request.tier.as_ref()) {
println!("requested_tier: {tier}");
}
let signature_path = request_path.with_file_name("request.rhosig.yaml");
if signature_path.is_file() {
refresh_review_file_if_needed(&root, &signature_path)?;
}
let signature_status;
if signature_path.is_file() {
verify_request_signature(RequestSignatureVerify {
request_path: &request_path,
signature_path: &signature_path,
identity: &request.from,
repo_root: &root,
repo_id: &repo_id,
request_id,
recipient: &request.to,
})?;
signature_status = "verified".to_string();
println!("signature: verified {}", signature_path.display());
} else {
signature_status = "missing".to_string();
println!("signature: missing {}", signature_path.display());
}
let code_path = request
.code_path
.as_ref()
.or_else(|| request.code_paths.first())
.ok_or("request has no code_path or code_paths entry")?;
let code_fs_path = repo_path(&root, code_path);
let code_sha256 = file_digest(&code_fs_path)?;
let mut code_digest_status = "unverified".to_string();
println!("code_path: {code_path}");
println!("code_fs_path: {}", code_fs_path.display());
println!("code_sha256: {code_sha256}");
if let Some(expected) = &request.code_sha256 {
if expected != &code_sha256 {
return Err(format!(
"code digest mismatch for {}: expected {expected}, got {code_sha256}",
code_fs_path.display()
)
.into());
}
code_digest_status = "verified".to_string();
println!("code_digest: verified");
}
if rho_core::has_flag(args, "--no-run") {
println!("mock_run: skipped");
write_review_summary_if_requested(
summary_out.as_deref(),
RequestReviewSummary {
request_id: request.id,
request_path: repo_relative_or_display(&root, &request_path),
from: request.from,
to: request.to,
request_type: request.request_type,
tool_id: request.tool_id,
dataset_uuid: request.dataset_uuid,
requested_tier: request.requested_tier.or(request.tier),
signature: RequestReviewSignature {
status: signature_status,
path: repo_relative_or_display(&root, &signature_path),
},
code: RequestReviewCode {
path: code_path.to_string(),
fs_path: repo_relative_or_display(&root, &code_fs_path),
sha256: code_sha256,
digest_status: code_digest_status,
},
mock_run: RequestReviewMockRun {
status: "skipped".to_string(),
dataset_path: None,
stdout: None,
stderr: None,
},
reviewed_at: rho_core::now_rfc3339(),
},
)?;
return Ok(());
}
let mock_dataset = rho_core::arg_value(args, "--mock-dataset")
.map(PathBuf::from)
.or_else(|| {
request
.dataset_uuid
.as_ref()
.map(|uuid| default_mock_dataset(&root, uuid))
});
let Some(mock_dataset) = mock_dataset else {
println!("mock_run: skipped, no dataset_uuid or --mock-dataset");
write_review_summary_if_requested(
summary_out.as_deref(),
RequestReviewSummary {
request_id: request.id,
request_path: repo_relative_or_display(&root, &request_path),
from: request.from,
to: request.to,
request_type: request.request_type,
tool_id: request.tool_id,
dataset_uuid: request.dataset_uuid,
requested_tier: request.requested_tier.or(request.tier),
signature: RequestReviewSignature {
status: signature_status,
path: repo_relative_or_display(&root, &signature_path),
},
code: RequestReviewCode {
path: code_path.to_string(),
fs_path: repo_relative_or_display(&root, &code_fs_path),
sha256: code_sha256,
digest_status: code_digest_status,
},
mock_run: RequestReviewMockRun {
status: "skipped".to_string(),
dataset_path: None,
stdout: None,
stderr: None,
},
reviewed_at: rho_core::now_rfc3339(),
},
)?;
return Ok(());
};
if !mock_dataset.is_file() {
println!("mock_run: skipped, missing {}", mock_dataset.display());
write_review_summary_if_requested(
summary_out.as_deref(),
RequestReviewSummary {
request_id: request.id,
request_path: repo_relative_or_display(&root, &request_path),
from: request.from,
to: request.to,
request_type: request.request_type,
tool_id: request.tool_id,
dataset_uuid: request.dataset_uuid,
requested_tier: request.requested_tier.or(request.tier),
signature: RequestReviewSignature {
status: signature_status,
path: repo_relative_or_display(&root, &signature_path),
},
code: RequestReviewCode {
path: code_path.to_string(),
fs_path: repo_relative_or_display(&root, &code_fs_path),
sha256: code_sha256,
digest_status: code_digest_status,
},
mock_run: RequestReviewMockRun {
status: "skipped-missing-dataset".to_string(),
dataset_path: Some(repo_relative_or_display(&root, &mock_dataset)),
stdout: None,
stderr: None,
},
reviewed_at: rho_core::now_rfc3339(),
},
)?;
return Ok(());
}
let output = Command::new("python3")
.arg(&code_fs_path)
.arg(&mock_dataset)
.output()?;
print!("{}", String::from_utf8_lossy(&output.stdout));
if !output.status.success() {
eprint!("{}", String::from_utf8_lossy(&output.stderr));
return Err(format!("mock run failed with status {}", output.status).into());
}
println!("mock_run: passed {}", mock_dataset.display());
write_review_summary_if_requested(
summary_out.as_deref(),
RequestReviewSummary {
request_id: request.id,
request_path: repo_relative_or_display(&root, &request_path),
from: request.from,
to: request.to,
request_type: request.request_type,
tool_id: request.tool_id,
dataset_uuid: request.dataset_uuid,
requested_tier: request.requested_tier.or(request.tier),
signature: RequestReviewSignature {
status: signature_status,
path: repo_relative_or_display(&root, &signature_path),
},
code: RequestReviewCode {
path: code_path.to_string(),
fs_path: repo_relative_or_display(&root, &code_fs_path),
sha256: code_sha256,
digest_status: code_digest_status,
},
mock_run: RequestReviewMockRun {
status: "passed".to_string(),
dataset_path: Some(repo_relative_or_display(&root, &mock_dataset)),
stdout: Some(String::from_utf8_lossy(&output.stdout).trim().to_string()),
stderr: if output.stderr.is_empty() {
None
} else {
Some(String::from_utf8_lossy(&output.stderr).trim().to_string())
},
},
reviewed_at: rho_core::now_rfc3339(),
},
)?;
Ok(())
}
fn refresh_review_file_if_needed(root: &Path, path: &Path) -> RhoResult<()> {
let text = fs::read_to_string(path)?;
if !is_rho_encrypted_text(&text) {
return Ok(());
}
let relative_path = path
.strip_prefix(root)
.map(|relative| relative.display().to_string())
.unwrap_or_else(|_| path.display().to_string());
let plaintext = smudge_review_file_text(root, &relative_path, &text)?;
if is_rho_encrypted_text(&plaintext) {
return Err(format!(
"cannot decrypt protected review file {}; run with a RHO_HOME/RHO_IDENTITY that can open it",
path.display()
)
.into());
}
fs::write(path, plaintext)?;
println!("refreshed protected file: {relative_path}");
Ok(())
}
/// Pipes `text` through `rho crypto smudge --repo-root <root> --path
/// <relative_path>` and returns what the child prints on stdout.
///
/// The input is fed to the child from a scoped helper thread while this
/// thread collects the output. Writing everything first and reading afterwards
/// can deadlock once the child's output fills the pipe buffer, because both
/// ends are piped.
///
/// # Errors
/// Fails when the child cannot be spawned, exits unsuccessfully, does not
/// accept its full input, or emits output that is not valid UTF-8.
fn smudge_review_file_text(root: &Path, relative_path: &str, text: &str) -> RhoResult<String> {
    let mut child = rho_crypto_command()
        .arg("smudge")
        .arg("--repo-root")
        .arg(root)
        .arg("--path")
        .arg(relative_path)
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()?;
    let Some(mut stdin) = child.stdin.take() else {
        return Err("failed to open rho crypto smudge stdin".into());
    };
    let (write_result, output) = std::thread::scope(|scope| {
        // The writer owns stdin, so the pipe is closed (EOF for the child) as
        // soon as the write finishes or fails.
        let writer = scope.spawn(move || stdin.write_all(text.as_bytes()));
        let output = child.wait_with_output();
        (writer.join(), output)
    });
    let output = output?;
    // Report a failed child first: a broken-pipe write error is usually just
    // a symptom of the child exiting early.
    if !output.status.success() {
        return Err(format!("rho crypto smudge failed for {relative_path}").into());
    }
    match write_result {
        Ok(written) => written?,
        Err(_) => return Err("rho crypto smudge stdin writer panicked".into()),
    }
    Ok(String::from_utf8(output.stdout)?)
}
/// Serializes `summary` as a version-1 YAML manifest to `summary_out`,
/// creating the parent directory first. Does nothing when no output path was
/// requested.
///
/// # Errors
/// Fails when the directory or file cannot be created or serialization fails.
fn write_review_summary_if_requested(
    summary_out: Option<&Path>,
    summary: RequestReviewSummary,
) -> RhoResult<()> {
    if let Some(path) = summary_out {
        ensure_parent(path)?;
        let manifest = RequestReviewSummaryManifest {
            version: 1,
            request_review: summary,
        };
        let yaml = to_yaml(&manifest)?;
        fs::write(path, yaml)?;
        println!("review_summary: {}", path.display());
    }
    Ok(())
}
/// Renders `path` relative to `root` when it lies under `root`; otherwise
/// renders `path` unchanged.
fn repo_relative_or_display(root: &Path, path: &Path) -> String {
    let shown = match path.strip_prefix(root) {
        Ok(relative) => relative,
        Err(_) => path,
    };
    shown.display().to_string()
}
/// Top-level YAML document of `rho/repo.yaml`, as parsed by `read_repo_id`.
#[derive(Debug, Deserialize)]
struct RepoManifest {
/// The repository record stored under the `repo` key.
repo: RepoRecord,
}
/// Repository record from `rho/repo.yaml`.
#[derive(Debug, Deserialize)]
struct RepoRecord {
/// Repository id as written in the manifest; normalized by `read_repo_id`.
id: String,
}
/// Reads `<root>/rho/repo.yaml` and returns its repository id in normalized
/// form.
///
/// # Errors
/// Any failure to read the file is reported as `repo manifest not found:
/// <path>`; parse and normalization errors are propagated.
fn read_repo_id(root: &Path) -> RhoResult<String> {
    let manifest_path = root.join("rho/repo.yaml");
    let Ok(text) = fs::read_to_string(&manifest_path) else {
        return Err(format!("repo manifest not found: {}", manifest_path.display()).into());
    };
    let manifest: RepoManifest = from_yaml(&text)?;
    normalize_repo_id(&manifest.repo.id)
}
/// Finds the single `request.yaml` for `request_id` anywhere below
/// `<root>/rho/messages/inbox`.
///
/// # Errors
/// Fails when the inbox directory is missing, when no request matches, or
/// when more than one does.
fn find_repo_inbox_request(root: &Path, request_id: &str) -> RhoResult<PathBuf> {
    let inbox_root = root.join("rho/messages/inbox");
    if !inbox_root.is_dir() {
        return Err(format!("repo inbox not found: {}", inbox_root.display()).into());
    }
    let mut found = Vec::new();
    find_request_paths(&inbox_root, request_id, &mut found)?;
    found.sort();
    let mut found = found.into_iter();
    match (found.next(), found.next()) {
        (None, _) => Err(format!("request not found in repo inbox: {request_id}").into()),
        (Some(only), None) => Ok(only),
        (Some(_), Some(_)) => Err(format!("multiple requests found for {request_id}").into()),
    }
}
/// Recursively walks `dir` and appends to `matches` every file named
/// `request.yaml` whose parent directory is named `request_id`.
///
/// # Errors
/// Fails when a directory cannot be listed.
fn find_request_paths(dir: &Path, request_id: &str, matches: &mut Vec<PathBuf>) -> RhoResult<()> {
    for entry in fs::read_dir(dir)? {
        let candidate = entry?.path();
        if candidate.is_dir() {
            find_request_paths(&candidate, request_id, matches)?;
            continue;
        }
        let is_request_file =
            candidate.file_name().and_then(|name| name.to_str()) == Some("request.yaml");
        let parent_name = candidate
            .parent()
            .and_then(Path::file_name)
            .and_then(|name| name.to_str());
        if is_request_file && parent_name == Some(request_id) {
            matches.push(candidate);
        }
    }
    Ok(())
}
/// Arguments for `verify_request_signature`; each field is forwarded to
/// `rho crypto verify`.
struct RequestSignatureVerify<'a> {
/// File whose signature is checked (positional argument).
request_path: &'a Path,
/// Detached signature file (`--signature`).
signature_path: &'a Path,
/// Identity expected to have signed the request (`--identity`).
identity: &'a str,
/// Repository root (`--repo-root`).
repo_root: &'a Path,
/// Repository id (`--repo-id`).
repo_id: &'a str,
/// Request id (`--request-id`).
request_id: &'a str,
/// Recipient (`--recipient`).
recipient: &'a str,
}
/// Runs `rho crypto verify` for a request with purpose `rho.request`.
///
/// # Errors
/// Fails when the command cannot be started or exits unsuccessfully.
fn verify_request_signature(signature: RequestSignatureVerify<'_>) -> RhoResult<()> {
    let mut command = rho_crypto_command();
    command.arg("verify").arg(signature.request_path);
    command.arg("--signature").arg(signature.signature_path);
    command.arg("--identity").arg(signature.identity);
    command.arg("--repo-root").arg(signature.repo_root);
    command.arg("--repo-id").arg(signature.repo_id);
    command.arg("--request-id").arg(signature.request_id);
    command.arg("--recipient").arg(signature.recipient);
    command.arg("--purpose").arg("rho.request");
    if command.status()?.success() {
        Ok(())
    } else {
        Err(format!(
            "signature verification failed: {}",
            signature.signature_path.display()
        )
        .into())
    }
}
/// Builds a `Command` that invokes the `rho crypto` subcommand.
///
/// Uses the `rho` binary next to the current executable when that file
/// exists; otherwise falls back to `cargo run --quiet --bin rho --`. In both
/// cases `crypto` is already appended as the first argument.
fn rho_crypto_command() -> Command {
    let sibling = match env::current_exe() {
        Ok(exe) => exe.with_file_name("rho"),
        Err(_) => PathBuf::from("rho"),
    };
    let mut command;
    if sibling.is_file() {
        command = Command::new(sibling);
    } else {
        command = Command::new("cargo");
        command.args(["run", "--quiet", "--bin", "rho", "--"]);
    }
    command.arg("crypto");
    command
}
/// Resolves a repository path reference against `root`. A leading
/// `rho://repo/` scheme prefix is dropped before joining; anything else is
/// joined as given.
fn repo_path(root: &Path, path: &str) -> PathBuf {
    let relative = path.strip_prefix("rho://repo/").unwrap_or(path);
    root.join(relative)
}
/// Returns the value of the first flag in `flags` (in the given order) that
/// `rho_core::arg_value` finds in `args`, or `None` when none is present.
fn option_value_any(args: &[String], flags: &[&str]) -> Option<String> {
    for flag in flags {
        if let Some(value) = rho_core::arg_value(args, flag) {
            return Some(value);
        }
    }
    None
}
/// Returns the normalized actor id from the `RHO_IDENTITY` environment
/// variable, or `None` when the variable cannot be read.
///
/// # Errors
/// Fails when the variable is set but its value cannot be normalized.
fn active_identity() -> RhoResult<Option<String>> {
    match env::var("RHO_IDENTITY") {
        Ok(value) => normalize_actor_id(&value).map(Some),
        Err(_) => Ok(None),
    }
}
/// Returns the trimmed value of the `RHO_ENV_HANDLE` environment variable,
/// or `None` when it is unset, unreadable, or blank.
fn active_profile_handle() -> Option<String> {
    let raw = env::var("RHO_ENV_HANDLE").ok()?;
    let handle = raw.trim();
    if handle.is_empty() {
        None
    } else {
        Some(handle.to_string())
    }
}
/// Extracts a GitHub handle from an identity id via
/// `providers::github::handle_from_identity_id`, mapping any error to `None`.
fn github_handle_from_identity(identity: &str) -> Option<String> {
    match providers::github::handle_from_identity_id(identity) {
        Ok(handle) => Some(handle),
        Err(_) => None,
    }
}
/// Computes the default checkout location of a project:
/// `<base>/<profile_handle>/projects/<repo_name>`.
///
/// `<base>` is `$RHO_PROJECTS_ROOT` when set, otherwise `$HOME/rho`, with
/// `.` standing in for `$HOME` when that is unset too.
fn default_project_root(profile_handle: &str, repo_name: &str) -> PathBuf {
    let base = match env::var("RHO_PROJECTS_ROOT") {
        Ok(explicit) => PathBuf::from(explicit),
        Err(_) => {
            let home = match env::var("HOME") {
                Ok(home) => PathBuf::from(home),
                Err(_) => PathBuf::from("."),
            };
            home.join("rho")
        }
    };
    let mut project_root = base;
    project_root.push(profile_handle);
    project_root.push("projects");
    project_root.push(repo_name);
    project_root
}
/// Extracts the repository name (last path segment) from a repository
/// reference such as `owner/repo` or `rho://repo/github/owner/repo`.
///
/// # Errors
/// Fails when the reference cannot be normalized, when the normalized form
/// lacks the `rho://repo/` prefix, or when its last segment is empty.
fn repo_name_from_ref(value: &str) -> RhoResult<String> {
    let normalized = normalize_repo_id(value)?;
    normalized
        .strip_prefix("rho://repo/")
        .and_then(|path| path.rsplit('/').next())
        .filter(|repo| !repo.is_empty())
        .map(str::to_string)
        .ok_or_else(|| format!("invalid repository reference: {value}").into())
}
/// Determines the repository reference for `submit-run`: the `--repo` value
/// when given, otherwise the second argument when it does not start with `-`.
fn submit_repo_ref(args: &[String]) -> Option<String> {
    if let Some(explicit) = rho_core::arg_value(args, "--repo") {
        return Some(explicit);
    }
    match args.get(1) {
        Some(positional) if !positional.starts_with('-') => Some(positional.clone()),
        _ => None,
    }
}
fn submit_root(args: &[String]) -> RhoResult<PathBuf> {
if let Some(root) = rho_core::arg_value(args, "--root") {
return Ok(PathBuf::from(root));
}
let Some(repo_ref) = submit_repo_ref(args) else {
usage();
};
let handle = active_profile_handle()
.ok_or("missing --root; provide --profile with a repo slug or pass --root explicitly")?;
Ok(default_project_root(
&handle,
&repo_name_from_ref(&repo_ref)?,
))
}
/// Renders `path` relative to `root`.
///
/// # Errors
/// Fails with `path must be inside repo root ...` when `path` does not lie
/// under `root`.
fn repo_relative_path(root: &Path, path: &Path) -> RhoResult<String> {
    match path.strip_prefix(root) {
        Ok(relative) => Ok(relative.display().to_string()),
        Err(_) => Err(format!(
            "path must be inside repo root {}: {}",
            root.display(),
            path.display()
        )
        .into()),
    }
}
/// Returns the inbox path for `identity_id`, delegating entirely to
/// `providers::identity_inbox_relative_path`. Callers join the result onto
/// `rho/messages/inbox`.
fn identity_inbox_relative_path(identity_id: &str) -> RhoResult<PathBuf> {
providers::identity_inbox_relative_path(identity_id)
}
/// Signs `request_path` by running `rho crypto sign` with purpose
/// `rho.request` from inside `root`, writing the signature to
/// `signature_path`.
///
/// # Errors
/// Fails when the command cannot be started or exits unsuccessfully.
fn sign_request_file(
    root: &Path,
    repo_id: &str,
    request_id: &str,
    signer: &str,
    recipient: &str,
    request_path: &Path,
    signature_path: &Path,
) -> RhoResult<()> {
    let mut command = rho_crypto_command();
    command.arg("sign").arg(request_path);
    command.arg("--identity").arg(signer);
    command.arg("--out").arg(signature_path);
    command.arg("--repo-id").arg(repo_id);
    command.arg("--request-id").arg(request_id);
    command.arg("--recipient").arg(recipient);
    command.arg("--purpose").arg("rho.request");
    command.current_dir(root);
    if command.status()?.success() {
        Ok(())
    } else {
        Err(format!("failed to sign request: {}", request_path.display()).into())
    }
}
/// Stages `path` (given relative to `root`) with `git add`, exporting
/// `RHO_IDENTITY=<identity>` to the git process.
///
/// # Errors
/// Fails when `path` is outside `root`, git cannot be started, or `git add`
/// exits unsuccessfully.
fn git_add_with_identity(root: &Path, path: &Path, identity: &str) -> RhoResult<()> {
    let relative = repo_relative_path(root, path)?;
    let mut git = Command::new("git");
    git.arg("-C").arg(root);
    git.arg("add").arg(&relative);
    git.env("RHO_IDENTITY", identity);
    if git.status()?.success() {
        Ok(())
    } else {
        Err(format!("git add failed for {relative}").into())
    }
}
/// Performs the optional commit / push / pull-request steps after a request
/// has been written.
///
/// `--pr` implies both `--commit` and `--push`. With none of the three flags
/// nothing happens. On commit, every path in `stage_paths` is staged first.
///
/// # Errors
/// Propagates the first failure of staging, committing, pushing, or PR
/// creation.
fn finish_branch_work(
    args: &[String],
    root: &Path,
    request_id: &str,
    from: &str,
    to: &str,
    stage_paths: &[&Path],
) -> RhoResult<()> {
    let want_pr = rho_core::has_flag(args, "--pr");
    let want_commit = want_pr || rho_core::has_flag(args, "--commit");
    let want_push = want_pr || rho_core::has_flag(args, "--push");
    // Commit message and PR title are the same text.
    let title = format!("Request private analysis {request_id}");

    if want_commit {
        for path in stage_paths {
            git_add_with_identity(root, path, from)?;
        }
        git_commit(root, &title)?;
        println!("committed: {title}");
    }
    if want_push {
        git_push_current_branch(root)?;
        println!("pushed: {}", current_branch(root)?);
    }
    if want_pr {
        let body = format!("Signed encrypted request {request_id} from {from} to {to}.");
        create_pr(root, &title, &body)?;
    }
    Ok(())
}
/// Runs `git switch -C <branch>` in `root`, which creates the branch or
/// resets an existing one to the current commit.
///
/// # Errors
/// Fails when git cannot be started or the switch exits unsuccessfully.
fn switch_branch(root: &Path, branch: &str) -> RhoResult<()> {
    let mut git = Command::new("git");
    git.arg("-C").arg(root);
    git.arg("switch").arg("-C").arg(branch);
    if git.status()?.success() {
        Ok(())
    } else {
        Err(format!("git switch failed for branch {branch}").into())
    }
}
/// Creates a commit by re-invoking the current executable as
/// `<exe> commit -m <message>` with `root` as working directory.
///
/// # Errors
/// Fails when the current executable cannot be determined or started, or
/// when the commit command exits unsuccessfully.
fn git_commit(root: &Path, message: &str) -> RhoResult<()> {
    let mut rho = Command::new(env::current_exe()?);
    rho.current_dir(root);
    rho.args(["commit", "-m", message]);
    if rho.status()?.success() {
        Ok(())
    } else {
        Err(format!("rho commit failed in {}", root.display()).into())
    }
}
/// Pushes the currently checked-out branch to `origin` and sets it as
/// upstream (`git push -u origin <branch>`).
///
/// # Errors
/// Fails when the current branch cannot be determined, git cannot be
/// started, or the push exits unsuccessfully.
fn git_push_current_branch(root: &Path) -> RhoResult<()> {
    let branch = current_branch(root)?;
    let mut git = Command::new("git");
    git.arg("-C").arg(root);
    git.arg("push").arg("-u").arg("origin").arg(&branch);
    if git.status()?.success() {
        Ok(())
    } else {
        Err(format!("git push failed for {branch}").into())
    }
}
/// Returns the name of the branch checked out in `root`, as reported by
/// `git branch --show-current`.
///
/// # Errors
/// Fails when git cannot be started or exits unsuccessfully, when its output
/// is not UTF-8, or when it reports an empty branch name.
fn current_branch(root: &Path) -> RhoResult<String> {
    let output = Command::new("git")
        .arg("-C")
        .arg(root)
        .arg("branch")
        .arg("--show-current")
        .output()?;
    if !output.status.success() {
        return Err(format!("git branch --show-current failed in {}", root.display()).into());
    }
    let stdout = String::from_utf8(output.stdout)?;
    match stdout.trim() {
        "" => Err("current Git branch is empty".into()),
        name => Ok(name.to_string()),
    }
}
/// Opens a pull request by re-invoking the current executable as
/// `<exe> repo create-pr --root <root> --title <title> --body <body>`.
///
/// # Errors
/// Fails when the current executable cannot be determined or started, or
/// when the command exits unsuccessfully.
fn create_pr(root: &Path, title: &str, body: &str) -> RhoResult<()> {
    let mut rho = Command::new(env::current_exe()?);
    rho.arg("repo").arg("create-pr");
    rho.arg("--root").arg(root);
    rho.arg("--title").arg(title);
    rho.arg("--body").arg(body);
    if rho.status()?.success() {
        Ok(())
    } else {
        Err(format!("rho repo create-pr failed in {}", root.display()).into())
    }
}
/// Synchronizes the repository before a submit by running
/// `rho repo sync [<repo-ref>] --root <root>` under the identity `from`.
///
/// The sync is skipped when no repository reference was passed and no usable
/// `upstream`/`origin` remote is detected in `root`.
///
/// # Errors
/// Fails when the sync command cannot be started or exits unsuccessfully.
fn sync_repo_for_submit(args: &[String], root: &Path, from: &str) -> RhoResult<()> {
    let repo_ref = submit_repo_ref(args);
    if repo_ref.is_none() && detect_sync_remote(root).is_none() {
        return Ok(());
    }
    let mut sync_args = vec!["repo".to_string(), "sync".to_string()];
    sync_args.extend(repo_ref);
    sync_args.extend(["--root".to_string(), root.display().to_string()]);
    run_rho_command_with_identity(&sync_args, Some(from))
}
/// Re-invokes the current executable with `args`, exporting
/// `RHO_IDENTITY=<identity>` to the child when an identity is given.
///
/// # Errors
/// Fails when the current executable cannot be determined or started, or
/// when the child exits unsuccessfully.
fn run_rho_command_with_identity(args: &[String], identity: Option<&str>) -> RhoResult<()> {
    let mut command = Command::new(env::current_exe()?);
    command.args(args);
    if let Some(identity) = identity {
        command.env("RHO_IDENTITY", identity);
    }
    if command.status()?.success() {
        Ok(())
    } else {
        Err(format!("rho command failed: {}", args.join(" ")).into())
    }
}
fn detect_sync_remote(root: &Path) -> Option<String> {
detect_named_sync_remote(root, "upstream").or_else(|| detect_named_sync_remote(root, "origin"))
}
/// Looks up the URL of the git remote `remote_name` in `root` and converts it
/// with `providers::github::repo_candidate_from_remote`.
///
/// Returns `None` when git cannot be run, the remote does not exist, or the
/// URL yields no candidate.
fn detect_named_sync_remote(root: &Path, remote_name: &str) -> Option<String> {
    let mut git = Command::new("git");
    git.arg("-C").arg(root);
    git.arg("remote").arg("get-url").arg(remote_name);
    let output = git.output().ok()?;
    if output.status.success() {
        let remote = String::from_utf8_lossy(&output.stdout).trim().to_string();
        providers::github::repo_candidate_from_remote(&remote)
    } else {
        None
    }
}
/// Picks the mock dataset file to use for `dataset_uuid`.
///
/// Prefers the mock path declared in the dataset's manifest. When the
/// dataset cannot be resolved (lookup errors are deliberately ignored) or
/// declares no mock path, falls back to
/// `<root>/datasets/<dataset_uuid>/mock/prices-mock.csv`.
fn default_mock_dataset(root: &Path, dataset_uuid: &str) -> PathBuf {
    if let Ok(Some(dataset)) = find_dataset(root, dataset_uuid) {
        if let Some(declared) = dataset.mock_path {
            return declared;
        }
    }
    let mut fallback = root.join("datasets");
    fallback.push(dataset_uuid);
    fallback.push("mock");
    fallback.push("prices-mock.csv");
    fallback
}
/// Translates a dataset reference (name, directory or UUID) into the UUID
/// declared in its manifest. A reference that matches no dataset is returned
/// unchanged.
///
/// # Errors
/// Propagates lookup failures, including an ambiguous reference.
fn resolve_dataset_uuid(root: &Path, value: &str) -> RhoResult<String> {
    let resolved = match find_dataset(root, value)? {
        Some(dataset) => dataset.uuid,
        None => value.to_string(),
    };
    Ok(resolved)
}
/// Returns the default tier of the referenced dataset, or `"real"` when the
/// reference matches no dataset.
///
/// # Errors
/// Propagates lookup failures, including an ambiguous reference.
fn default_dataset_tier(root: &Path, value: &str) -> RhoResult<String> {
    let tier = match find_dataset(root, value)? {
        Some(dataset) => dataset.default_tier,
        None => "real".to_string(),
    };
    Ok(tier)
}
/// Resolves a dataset reference below `<root>/datasets`.
///
/// Lookup order:
/// 1. `<root>/datasets/<value>/dataset.yaml`, treating `value` as a directory
///    name.
/// 2. Every sub-directory manifest whose `uuid` or `name` equals `value`.
///
/// Returns `Ok(None)` when the datasets directory is missing or nothing
/// matches.
///
/// # Errors
/// Fails when a manifest cannot be read, parsed or resolved, or when more
/// than one dataset matches by uuid/name.
fn find_dataset(root: &Path, value: &str) -> RhoResult<Option<ResolvedDataset>> {
    let datasets = root.join("datasets");
    if !datasets.is_dir() {
        return Ok(None);
    }

    let direct_dir = datasets.join(value);
    let direct_manifest = direct_dir.join("dataset.yaml");
    if direct_manifest.is_file() {
        let manifest = read_dataset_manifest(&direct_manifest)?;
        return resolved_dataset_from_manifest(&direct_dir, manifest).map(Some);
    }

    let mut candidates = Vec::new();
    for entry in fs::read_dir(&datasets)? {
        let dataset_dir = entry?.path();
        let manifest_path = dataset_dir.join("dataset.yaml");
        if !(dataset_dir.is_dir() && manifest_path.is_file()) {
            continue;
        }
        let manifest = read_dataset_manifest(&manifest_path)?;
        let record = &manifest.dataset;
        if record.uuid == value || record.name == value {
            candidates.push(resolved_dataset_from_manifest(&dataset_dir, manifest)?);
        }
    }

    let mut candidates = candidates.into_iter();
    match (candidates.next(), candidates.next()) {
        (Some(_), Some(_)) => {
            Err(format!("dataset reference is ambiguous: {value}; use the UUID").into())
        }
        (only, _) => Ok(only),
    }
}
/// Reads and parses the `dataset.yaml` file at `path`.
///
/// # Errors
/// Fails when the file cannot be read or is not a valid dataset manifest.
fn read_dataset_manifest(path: &Path) -> RhoResult<DatasetManifest> {
    from_yaml(&fs::read_to_string(path)?)
}
/// Condenses a dataset manifest found in `dir` into a `ResolvedDataset`.
///
/// The mock path is `dir` joined with the mock variant's `relative_path`,
/// when one is declared. The default tier is `"public"` if a public variant
/// exists, else `"mock"` if a mock variant exists, else `"real"`.
///
/// # Errors
/// Fails when the mock `relative_path` is rejected by
/// `rho_core::validate_relative_safe_path`.
fn resolved_dataset_from_manifest(
    dir: &Path,
    manifest: DatasetManifest,
) -> RhoResult<ResolvedDataset> {
    let variants = &manifest.dataset.variants;

    let declared_mock = variants
        .mock
        .as_ref()
        .and_then(|mock| mock.relative_path.as_ref());
    let mock_path = match declared_mock {
        Some(relative) => {
            rho_core::validate_relative_safe_path(relative)?;
            Some(dir.join(relative))
        }
        None => None,
    };

    let default_tier = match (&variants.public, &variants.mock) {
        (Some(_), _) => "public",
        (None, Some(_)) => "mock",
        (None, None) => "real",
    };

    Ok(ResolvedDataset {
        mock_path,
        default_tier: default_tier.to_string(),
        uuid: manifest.dataset.uuid,
    })
}
/// The parts of a dataset manifest this module needs, produced by
/// `resolved_dataset_from_manifest`.
struct ResolvedDataset {
/// UUID declared in the dataset manifest.
uuid: String,
/// Location of the mock data file, when the manifest declares one.
mock_path: Option<PathBuf>,
/// Tier used when the caller does not pass `--tier`: `"public"`, `"mock"`
/// or `"real"`.
default_tier: String,
}
pub fn run(args: &[String]) -> RhoResult<()> {
let Some(command) = args.first().map(String::as_str) else {
usage();
};
let rest = &args[1..];
match command {
"status" => {
let request_id = rest.first().cloned().unwrap_or_else(|| usage());
let shared_root =
PathBuf::from(require_arg(rest, "--shared-root").unwrap_or_else(|_| usage()));
if !request_path(&shared_root, &request_id).is_file() {
return Err(format!(
"request not found: {}",
request_path(&shared_root, &request_id).display()
)
.into());
}
println!("{}", approval_status(&shared_root, &request_id)?);
}
"approve" => {
let shared_root =
PathBuf::from(require_arg(rest, "--shared-root").unwrap_or_else(|_| usage()));
let request_id = if rho_core::has_flag(rest, "--latest") {
pending_ids(&shared_root)?
.pop()
.ok_or("no pending requests")?
} else {
rest.first().cloned().unwrap_or_else(|| usage())
};
let decision_arg = require_arg(rest, "--decision").unwrap_or_else(|_| usage());
let status = match decision_arg.as_str() {
"approve" | "approved" => "approved",
"deny" | "denied" => "denied",
_ => return Err(format!("unsupported decision: {decision_arg}").into()),
};
let actor = rho_core::arg_value(rest, "--actor")
.map(|value| normalize_actor_id(&value))
.transpose()?
.unwrap_or_else(|| "cli".to_string());
let note = rho_core::arg_value(rest, "--note").unwrap_or_default();
validate_request_id(&request_id)?;
validate_actor_id(&actor)?;
let approval = approval_path(&shared_root, &request_id);
ensure_parent(&approval)?;
fs::write(
&approval,
to_yaml(&ApprovalManifest {
version: 1,
approval: Approval {
request_id: request_id.clone(),
decision: status.to_string(),
approver: actor.clone(),
note: note.clone(),
created_at: rho_core::now_rfc3339(),
},
})?,
)?;
let requester = shared_request_manifest(&shared_root, &request_id)?
.request
.from;
let inbox = shared_root
.join(".rho/inbox")
.join(&requester)
.join(format!("approval-{request_id}.yaml"));
ensure_parent(&inbox)?;
let message_text = if note.is_empty() {
format!("Request {request_id} {status}.")
} else {
note.clone()
};
fs::write(
inbox,
format!(
"version: 1\nmessage:\n id: {}\n from: {}\n to: {}\n type: \"approval_update\"\n related_request_id: {}\n body:\n status: {}\n text: {}\n",
yaml_quote(&format!("approval-{request_id}")),
yaml_quote(&actor),
yaml_quote(&requester),
yaml_quote(&request_id),
yaml_quote(status),
yaml_quote(&message_text),
),
)?;
println!("{status}");
}
"pending" => {
let shared_root =
PathBuf::from(require_arg(rest, "--shared-root").unwrap_or_else(|_| usage()));
let ids = pending_ids(&shared_root)?;
if rho_core::has_flag(rest, "--latest") {
println!("{}", ids.last().ok_or("no pending requests")?);
} else {
for id in ids {
println!("{id}");
}
}
}
"show" => {
let request_id = rest.first().cloned().unwrap_or_else(|| usage());
let shared_root =
PathBuf::from(require_arg(rest, "--shared-root").unwrap_or_else(|_| usage()));
let path = request_path(&shared_root, &request_id);
print!("{}", fs::read_to_string(path)?);
}
"review" => {
let request_id = rest.first().cloned().unwrap_or_else(|| usage());
review_request(&request_id, rest)?;
}
"submit-run" => {
let request_id = rest.first().cloned().unwrap_or_else(|| usage());
let root = submit_root(rest)?;
let from = rho_core::arg_value(rest, "--from")
.map(|value| normalize_actor_id(&value))
.transpose()?
.or(active_identity()?)
.ok_or("missing --from and no active RHO_IDENTITY; use --profile or --from")?;
if !rho_core::has_flag(rest, "--no-sync") {
sync_repo_for_submit(rest, &root, &from)?;
}
let to = normalize_actor_id(&require_arg(rest, "--to").unwrap_or_else(|_| usage()))?;
let tool_id =
option_value_any(rest, &["--tool", "--tool-id"]).unwrap_or_else(|| usage());
let dataset_arg =
option_value_any(rest, &["--dataset", "--dataset-uuid"]).unwrap_or_else(|| usage());
let dataset_uuid = resolve_dataset_uuid(&root, &dataset_arg)?;
let code_path =
option_value_any(rest, &["--code", "--code-path"]).unwrap_or_else(|| usage());
let tier = rho_core::arg_value(rest, "--tier")
.unwrap_or(default_dataset_tier(&root, &dataset_arg)?);
validate_request_id(&request_id)?;
validate_actor_id(&from)?;
validate_actor_id(&to)?;
validate_tool_id(&tool_id)?;
validate_tier(&tier)?;
if let Some(branch) = rho_core::arg_value(rest, "--branch") {
switch_branch(&root, &branch)?;
} else if rho_core::has_flag(rest, "--pr") || rho_core::has_flag(rest, "--push") {
let handle = active_profile_handle()
.or_else(|| github_handle_from_identity(&from))
.ok_or("could not infer request branch owner")?;
switch_branch(&root, &format!("{handle}/{request_id}"))?;
}
let repo_id = read_repo_id(&root)?;
let code_fs_path = repo_path(&root, &code_path);
let code_sha256 = file_digest(&code_fs_path)?;
let command = rho_core::arg_value(rest, "--command")
.map(|value| split_command_line(&value))
.transpose()?
.unwrap_or_default();
let request_dir = root
.join("rho/messages/inbox")
.join(identity_inbox_relative_path(&to)?)
.join(&request_id);
let request_path = request_dir.join("request.yaml");
let signature_path = request_dir.join("request.rhosig.yaml");
ensure_parent(&request_path)?;
fs::write(
&request_path,
to_yaml(&RequestManifest {
version: 1,
request: RunRequest {
id: request_id.clone(),
from: from.clone(),
to: to.clone(),
tool_id,
dataset_uuid,
code_paths: vec![code_path],
code_sha256,
command,
requested_tier: tier,
created_at: rho_core::now_rfc3339(),
},
})?,
)?;
sign_request_file(
&root,
&repo_id,
&request_id,
&from,
&to,
&request_path,
&signature_path,
)?;
if !rho_core::has_flag(rest, "--no-stage") {
git_add_with_identity(&root, &request_path, &from)?;
git_add_with_identity(&root, &signature_path, &from)?;
println!("staged encrypted request files");
}
println!("request: {request_id}");
println!("path: {}", repo_relative_or_display(&root, &request_path));
println!(
"signature: {}",
repo_relative_or_display(&root, &signature_path)
);
finish_branch_work(
rest,
&root,
&request_id,
&from,
&to,
&[&code_fs_path, &request_path, &signature_path],
)?;
}
"create-run" => {
let shared_root =
PathBuf::from(require_arg(rest, "--shared-root").unwrap_or_else(|_| usage()));
let request_id = require_arg(rest, "--id").unwrap_or_else(|_| usage());
let from =
normalize_actor_id(&require_arg(rest, "--from").unwrap_or_else(|_| usage()))?;
let to = normalize_actor_id(&require_arg(rest, "--to").unwrap_or_else(|_| usage()))?;
let tool_id = require_arg(rest, "--tool-id").unwrap_or_else(|_| usage());
let dataset_uuid = require_arg(rest, "--dataset-uuid").unwrap_or_else(|_| usage());
let code_path = require_arg(rest, "--code-path").unwrap_or_else(|_| usage());
let command_text = require_arg(rest, "--command").unwrap_or_else(|_| usage());
let tier = rho_core::arg_value(rest, "--tier").unwrap_or_else(|| "real".to_string());
validate_request_id(&request_id)?;
validate_actor_id(&from)?;
validate_actor_id(&to)?;
validate_tool_id(&tool_id)?;
validate_tier(&tier)?;
let command = split_command_line(&command_text)?;
let code_sha256 = file_digest(&PathBuf::from(&code_path))?;
let path = request_path(&shared_root, &request_id);
ensure_parent(&path)?;
fs::write(
&path,
to_yaml(&RequestManifest {
version: 1,
request: RunRequest {
id: request_id.clone(),
from,
to,
tool_id,
dataset_uuid,
code_paths: vec![code_path],
code_sha256,
command,
requested_tier: tier,
created_at: rho_core::now_rfc3339(),
},
})?,
)?;
println!("{}", path.display());
}
_ => usage(),
}
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Both the short `owner/repo` form and the full `rho://repo/...` form
    /// resolve to the bare repository name.
    #[test]
    fn extracts_repo_name_from_repo_refs() {
        let references = [
            "madhavajay/rho-live-clean-test",
            "rho://repo/github/madhavajay/rho-live-clean-test",
        ];
        for reference in references {
            assert_eq!(
                repo_name_from_ref(reference).unwrap(),
                "rho-live-clean-test"
            );
        }
    }
}