use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use rho_core::{
RhoResult, copy_dir_recursive, from_yaml, has_flag, normalize_actor_id, remove_dir_if_exists,
};
use serde::Deserialize;
/// Prints the `rho publish` synopsis to stderr and terminates the process with
/// exit status 2.
///
/// The synopsis lists every flag this command reads, including `--branch` and
/// `--no-branch`, which control whether a git branch is switched before the
/// dataset is copied.
fn usage() -> ! {
    eprintln!(
        "usage:\n rho publish <owner> <dataset-name|uuid> [--root <repo>] [--source-root <path>] [--target-root <path>] [--path-mode <name|uuid>] [--force] [--branch <name>] [--no-branch] [--commit] [--push] [--pr]\n rho publish <dataset-name|uuid> [--owner <owner>] [--root <repo>] [--source-root <path>] [--target-root <path>] [--path-mode <name|uuid>] [--force] [--branch <name>] [--no-branch] [--commit] [--push] [--pr]"
    );
    std::process::exit(2);
}
pub fn run(args: &[String]) -> RhoResult<()> {
if args.is_empty() || args.iter().any(|arg| arg == "--help" || arg == "-h") {
usage();
}
let positional = args
.iter()
.filter(|arg| !arg.starts_with("--"))
.cloned()
.collect::<Vec<_>>();
let (owner, dataset_ref) = match positional.as_slice() {
[owner, dataset_ref, ..] => (normalize_owner(owner)?, dataset_ref.clone()),
[dataset_ref] => (
rho_core::arg_value(args, "--owner")
.or_else(active_handle)
.ok_or("missing owner; pass --owner or use --profile")?,
dataset_ref.clone(),
),
[] => usage(),
};
let root = rho_core::arg_value(args, "--root")
.map(PathBuf::from)
.unwrap_or_else(|| PathBuf::from("."));
let source_root = rho_core::arg_value(args, "--source-root")
.map(PathBuf::from)
.unwrap_or_else(|| root.join("users"));
let target_root = rho_core::arg_value(args, "--target-root")
.map(PathBuf::from)
.unwrap_or_else(|| root.join("datasets"));
let source = resolve_source_bundle(&source_root, &owner, &dataset_ref)?;
let manifest = read_dataset_manifest(&source.join("dataset.yaml"))?;
let path_mode = rho_core::arg_value(args, "--path-mode").unwrap_or_else(|| "name".to_string());
let target_slug = match path_mode.as_str() {
"name" => dataset_path_slug(&manifest.dataset.name)?,
"uuid" => dataset_path_slug(&manifest.dataset.uuid)?,
_ => return Err(format!("unsupported --path-mode: {path_mode}").into()),
};
let target = target_root.join(&target_slug);
if should_branch(args) {
switch_branch(
&root,
&rho_core::arg_value(args, "--branch")
.unwrap_or_else(|| format!("{owner}/publish-{target_slug}")),
)?;
}
if !source.is_dir() {
return Err(format!("share bundle not found: {}", source.display()).into());
}
if target.exists() {
if has_flag(args, "--force") {
remove_dir_if_exists(&target)?;
} else {
return Err(format!(
"target already exists: {} (rerun with --force)",
target.display()
)
.into());
}
}
copy_dir_recursive(&source, &target)?;
println!("published dataset");
println!("owner: {owner}");
println!("uuid: {}", manifest.dataset.uuid);
println!("name: {}", manifest.dataset.name);
println!("source: {}", source.display());
println!("target: {}", target.display());
finish_publish(args, &root, &target_slug)?;
Ok(())
}
/// Turns an owner argument into the handle used for path lookups.
///
/// Values starting with `rho://` or `github/` are normalized with
/// `normalize_actor_id` and then reduced to their GitHub handle; any other
/// value is returned unchanged.
fn normalize_owner(value: &str) -> RhoResult<String> {
    let looks_like_identity = ["rho://", "github/"]
        .iter()
        .any(|prefix| value.starts_with(*prefix));
    if !looks_like_identity {
        return Ok(value.to_string());
    }
    let identity = normalize_actor_id(value)?;
    github_handle_from_identity(&identity)
}
/// Reads the active handle from the `RHO_ENV_HANDLE` environment variable.
///
/// Returns `None` when the variable is unset, is not valid Unicode, or holds
/// only whitespace; otherwise returns the value with surrounding whitespace
/// trimmed.
fn active_handle() -> Option<String> {
    let raw = std::env::var("RHO_ENV_HANDLE").ok()?;
    let handle = raw.trim();
    if handle.is_empty() {
        None
    } else {
        Some(handle.to_string())
    }
}
/// Extracts the GitHub handle from an identity id of the form
/// `rho://id/github/<handle>`.
///
/// # Errors
/// Returns `unsupported identity id: …` when the prefix is missing, the handle
/// is empty, or the handle contains a `/`.
fn github_handle_from_identity(identity_id: &str) -> RhoResult<String> {
    match identity_id.strip_prefix("rho://id/github/") {
        Some(handle) if !handle.is_empty() && !handle.contains('/') => Ok(handle.to_string()),
        _ => Err(format!("unsupported identity id: {identity_id}").into()),
    }
}
/// Locates the share bundle directory for `dataset_ref` under
/// `<source_root>/<owner>/datasets/share`.
///
/// Resolution order:
/// 1. A directory named exactly `dataset_ref` that contains a `dataset.yaml`.
/// 2. Otherwise, every sibling directory with a `dataset.yaml` is read and
///    matched against the manifest's `uuid` or `name`.
///
/// # Errors
/// Fails when no bundle matches, when more than one bundle matches, or when
/// listing the share directory or reading any candidate manifest fails.
fn resolve_source_bundle(source_root: &Path, owner: &str, dataset_ref: &str) -> RhoResult<PathBuf> {
    let share_root = source_root.join(owner).join("datasets/share");

    // Fast path: the reference is the bundle's directory name.
    let by_dir_name = share_root.join(dataset_ref);
    if by_dir_name.join("dataset.yaml").is_file() {
        return Ok(by_dir_name);
    }

    // Slow path: scan all bundles and compare manifest identifiers.
    let mut candidates: Vec<PathBuf> = Vec::new();
    if share_root.is_dir() {
        for entry in fs::read_dir(&share_root)? {
            let bundle_dir = entry?.path();
            let manifest_path = bundle_dir.join("dataset.yaml");
            if manifest_path.is_file() {
                let record = read_dataset_manifest(&manifest_path)?.dataset;
                if record.uuid == dataset_ref || record.name == dataset_ref {
                    candidates.push(bundle_dir);
                }
            }
        }
    }

    let mut found = candidates.into_iter();
    match (found.next(), found.next()) {
        (None, _) => Err(format!(
            "share bundle not found for {dataset_ref} under {}",
            share_root.display()
        )
        .into()),
        (Some(only), None) => Ok(only),
        (Some(_), Some(_)) => {
            Err(format!("dataset reference is ambiguous: {dataset_ref}; use the UUID").into())
        }
    }
}
/// Reads the file at `path` and parses it as a dataset manifest via `from_yaml`.
///
/// # Errors
/// - `dataset manifest not found: <path>` when the file does not exist.
/// - `failed to read dataset manifest <path>: <cause>` for any other read
///   failure (permissions, invalid UTF-8, path is a directory, …), so the real
///   cause is not mislabeled as a missing file.
/// - Whatever `from_yaml` returns when the content cannot be parsed.
fn read_dataset_manifest(path: &Path) -> RhoResult<DatasetManifest> {
    let text = fs::read_to_string(path).map_err(|err| {
        if err.kind() == std::io::ErrorKind::NotFound {
            format!("dataset manifest not found: {}", path.display())
        } else {
            format!(
                "failed to read dataset manifest {}: {err}",
                path.display()
            )
        }
    })?;
    from_yaml(&text)
}
/// Validates that `value` is safe to use as a single directory name and returns
/// it unchanged.
///
/// Accepts only non-empty strings made of ASCII letters, digits, `-`, `_`, and
/// `.`. The special names `.` and `..` are rejected even though they consist of
/// allowed characters: joined onto the target root they would address the root
/// itself or its parent, which `run` removes when `--force` is given.
///
/// # Errors
/// Returns `dataset name is not path-safe: <value>` for any rejected input.
fn dataset_path_slug(value: &str) -> RhoResult<String> {
    let only_safe_chars = value
        .chars()
        .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.'));
    let is_dot_dir = matches!(value, "." | "..");
    if value.is_empty() || !only_safe_chars || is_dot_dir {
        return Err(format!("dataset name is not path-safe: {value}").into());
    }
    Ok(value.to_string())
}
/// Decides whether `run` should switch git branches before copying.
///
/// `--no-branch` always disables branching; otherwise any of `--pr`, `--push`,
/// or `--branch` enables it.
fn should_branch(args: &[String]) -> bool {
    if has_flag(args, "--no-branch") {
        return false;
    }
    ["--pr", "--push", "--branch"]
        .iter()
        .copied()
        .any(|flag| has_flag(args, flag))
}
fn finish_publish(args: &[String], root: &Path, dataset_slug: &str) -> RhoResult<()> {
let want_pr = has_flag(args, "--pr");
let want_commit = want_pr || has_flag(args, "--commit");
let want_push = want_pr || has_flag(args, "--push");
if !want_commit && !want_push && !want_pr {
return Ok(());
}
if want_commit {
git_add_relative(root, &format!("datasets/{dataset_slug}"))?;
if git_has_staged_changes(root)? {
git_commit(root, &format!("Publish {dataset_slug} dataset"))?;
println!("committed: Publish {dataset_slug} dataset");
} else {
println!("commit: no staged dataset changes");
}
}
if want_push {
git_push_current_branch(root)?;
println!("pushed: {}", current_branch(root)?);
}
if want_pr {
create_pr(
root,
&format!("Publish {dataset_slug} dataset"),
&format!("Publish the {dataset_slug} mock dataset."),
)?;
}
Ok(())
}
/// Runs `git -C <root> switch -C <branch>`.
///
/// `-C` creates the branch, or resets it if it already exists, before
/// switching to it.
///
/// # Errors
/// Fails when git cannot be spawned or exits unsuccessfully.
fn switch_branch(root: &Path, branch: &str) -> RhoResult<()> {
    let mut git = Command::new("git");
    git.arg("-C").arg(root).arg("switch").arg("-C").arg(branch);
    if git.status()?.success() {
        Ok(())
    } else {
        Err(format!("git switch failed for branch {branch}").into())
    }
}
/// Stages `path` by running `git -C <root> add -- <path>`.
///
/// The `--` separator keeps git from interpreting `path` as an option.
///
/// # Errors
/// Fails when git cannot be spawned or exits unsuccessfully.
fn git_add_relative(root: &Path, path: &str) -> RhoResult<()> {
    let mut git = Command::new("git");
    git.arg("-C").arg(root).arg("add").arg("--").arg(path);
    let exit = git.status()?;
    if exit.success() {
        return Ok(());
    }
    Err(format!("git add failed for {path}").into())
}
/// Reports whether the index of the repository at `root` holds staged changes.
///
/// Runs `git diff --cached --quiet` and interprets its exit code: 0 means
/// nothing is staged, 1 means there are staged changes.
///
/// # Errors
/// Fails when git cannot be spawned, exits with any other code, or is
/// terminated without an exit code.
fn git_has_staged_changes(root: &Path) -> RhoResult<bool> {
    let exit_code = Command::new("git")
        .arg("-C")
        .arg(root)
        .arg("diff")
        .arg("--cached")
        .arg("--quiet")
        .status()?
        .code();
    if exit_code == Some(0) {
        return Ok(false);
    }
    if exit_code == Some(1) {
        return Ok(true);
    }
    Err(format!("git diff --cached --quiet failed in {}", root.display()).into())
}
/// Creates a commit by re-invoking the currently running executable as
/// `<exe> commit -C <root> -m <message>`.
///
/// # Errors
/// Fails when the executable path cannot be determined, the child cannot be
/// spawned, or it exits unsuccessfully.
fn git_commit(root: &Path, message: &str) -> RhoResult<()> {
    let mut rho = Command::new(std::env::current_exe()?);
    rho.arg("commit").arg("-C").arg(root).args(["-m", message]);
    let exit = rho.status()?;
    if exit.success() {
        Ok(())
    } else {
        Err(format!("rho commit failed in {}", root.display()).into())
    }
}
/// Pushes the currently checked-out branch with
/// `git -C <root> push -u origin <branch>`.
///
/// # Errors
/// Fails when the current branch cannot be determined, git cannot be spawned,
/// or the push exits unsuccessfully.
fn git_push_current_branch(root: &Path) -> RhoResult<()> {
    let branch = current_branch(root)?;
    let pushed = Command::new("git")
        .arg("-C")
        .arg(root)
        .arg("push")
        .arg("-u")
        .arg("origin")
        .arg(&branch)
        .status()?
        .success();
    if pushed {
        return Ok(());
    }
    Err(format!("git push failed for {branch}").into())
}
/// Returns the name printed by `git -C <root> branch --show-current`, trimmed.
///
/// # Errors
/// Fails when git cannot be spawned or exits unsuccessfully, when its output is
/// not valid UTF-8, or when the trimmed output is empty.
fn current_branch(root: &Path) -> RhoResult<String> {
    let mut git = Command::new("git");
    git.arg("-C").arg(root).arg("branch").arg("--show-current");
    let result = git.output()?;
    if !result.status.success() {
        return Err(format!("git branch --show-current failed in {}", root.display()).into());
    }
    let stdout = String::from_utf8(result.stdout)?;
    match stdout.trim() {
        "" => Err("current Git branch is empty".into()),
        name => Ok(name.to_string()),
    }
}
/// Opens a pull request by re-invoking the currently running executable as
/// `<exe> repo create-pr --root <root> --title <title> --body <body>`.
///
/// # Errors
/// Fails when the executable path cannot be determined, the child cannot be
/// spawned, or it exits unsuccessfully.
fn create_pr(root: &Path, title: &str, body: &str) -> RhoResult<()> {
    let mut rho = Command::new(std::env::current_exe()?);
    rho.args(["repo", "create-pr"])
        .arg("--root")
        .arg(root)
        .args(["--title", title, "--body", body]);
    if rho.status()?.success() {
        Ok(())
    } else {
        Err(format!("rho repo create-pr failed in {}", root.display()).into())
    }
}
/// Deserialized form of a bundle's `dataset.yaml`, as produced by
/// `read_dataset_manifest`. Only the top-level `dataset` key is modeled here.
#[derive(Debug, Deserialize)]
struct DatasetManifest {
// Identifying fields of the dataset described by the manifest.
dataset: DatasetRecord,
}
/// The `dataset` section of a manifest: the two identifiers a bundle can be
/// looked up and published by.
#[derive(Debug, Deserialize)]
struct DatasetRecord {
// Matched against the dataset reference during lookup; used as the target
// directory name when `--path-mode uuid` is selected.
uuid: String,
// Matched against the dataset reference during lookup; used as the target
// directory name when `--path-mode name` is selected (the default).
name: String,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A plain slug is returned unchanged; traversal, nested, and empty names
    /// are rejected.
    #[test]
    fn validates_dataset_path_slugs() {
        let accepted = dataset_path_slug("prices-2026").unwrap();
        assert_eq!(accepted, "prices-2026");

        for rejected in ["../prices", "prices/raw", ""] {
            assert!(dataset_path_slug(rejected).is_err());
        }
    }

    /// The same bundle directory is found whether it is referenced by its
    /// name or by the UUID recorded in its manifest.
    #[test]
    fn resolves_source_bundle_by_name_or_uuid() {
        // Unique scratch directory so parallel test runs do not collide.
        let scratch_name = format!(
            "rho-publish-test-{}-{}",
            std::process::id(),
            rho_core::uuid_like()
        );
        let source_root = std::env::temp_dir().join(scratch_name);
        let expected = source_root.join("madhavajay/datasets/share/prices");
        fs::create_dir_all(&expected).unwrap();

        let manifest = "version: 1\ndataset:\n uuid: \"uuid-1\"\n name: \"prices\"\n";
        fs::write(expected.join("dataset.yaml"), manifest).unwrap();

        for dataset_ref in ["prices", "uuid-1"] {
            let resolved = resolve_source_bundle(&source_root, "madhavajay", dataset_ref).unwrap();
            assert_eq!(resolved, expected);
        }

        fs::remove_dir_all(source_root).unwrap();
    }
}