#![forbid(unsafe_code)]
use anyhow::{Context, Result};
use clap::{CommandFactory, Parser, Subcommand};
use colored::Colorize;
use oxo_flow_core::config::WorkflowConfig;
use oxo_flow_core::dag::WorkflowDag;
use oxo_flow_core::executor::{ExecutorConfig, LocalExecutor};
use std::path::{Path, PathBuf};
// Top-level CLI definition parsed by clap's derive macro.
// NOTE: comments in this item are deliberately `//` rather than `///` —
// clap turns doc comments into user-visible help text, and we do not want
// to change the generated --help output.
#[derive(Parser, Debug)]
#[command(
    name = "oxo-flow",
    version,
    about = "A Rust-native bioinformatics pipeline engine",
    long_about = "oxo-flow is a high-performance, modular bioinformatics pipeline engine\n\
built from first principles in Rust. It supports conda, pixi, docker,\n\
singularity, and venv environments with DAG-based execution."
)]
struct Cli {
    // The selected subcommand; required (no default action).
    #[command(subcommand)]
    command: Commands,
    // Global flags: valid before or after the subcommand.
    // -v/--verbose raises the default log level to "debug" (see main()).
    #[arg(global = true, short = 'v', long)]
    verbose: bool,
    // --quiet lowers the default log level to "error"; wins over --verbose.
    #[arg(global = true, long)]
    quiet: bool,
    // --no-color disables ANSI colors (NO_COLOR env var is also honored in main()).
    #[arg(global = true, long)]
    no_color: bool,
}
// All top-level subcommands. One variant per `oxo-flow <command>`.
// Comments here are `//` on purpose: `///` doc comments would become
// clap help text and change the CLI's --help output.
#[derive(Subcommand, Debug)]
enum Commands {
    // Execute a workflow with the local executor.
    Run {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
        // Maximum parallel jobs (passed to ExecutorConfig::max_jobs).
        #[arg(short = 'j', long, default_value = "1")]
        jobs: usize,
        // Continue executing remaining rules after a failure.
        #[arg(short = 'k', long)]
        keep_going: bool,
        // Working directory; defaults to the current directory in main().
        #[arg(short = 'd', long)]
        workdir: Option<PathBuf>,
        // Target rules to run. NOTE(review): currently parsed but ignored
        // by the Run arm in main() (`target: _`) — confirm intent.
        #[arg(short = 't', long)]
        target: Vec<String>,
        // Per-rule retry count; 0 means no retries.
        #[arg(short = 'r', long, default_value = "0")]
        retry: u32,
        // Per-rule timeout in seconds; 0 means no timeout.
        #[arg(long, default_value = "0")]
        timeout: u64,
    },
    // Show what would execute without running anything.
    DryRun {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
    },
    // Parse the workflow and build its DAG; exit 1 on any error.
    Validate {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
    },
    // Emit the DAG in Graphviz DOT format on stdout.
    Graph {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
    },
    // Generate an HTML (default) or JSON report for a workflow.
    Report {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
        // "json" or anything else (treated as HTML) — see the Report arm.
        #[arg(short = 'f', long, default_value = "html")]
        format: String,
        // Write to file instead of stdout.
        #[arg(short = 'o', long)]
        output: Option<PathBuf>,
    },
    // Environment backend inspection (list/check).
    Env {
        #[command(subcommand)]
        action: EnvAction,
    },
    // Generate a container definition (Dockerfile / Singularity def).
    Package {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
        // "singularity" or anything else (treated as docker).
        #[arg(short = 'f', long, default_value = "docker")]
        format: String,
        #[arg(short = 'o', long)]
        output: Option<PathBuf>,
    },
    // Start the bundled web UI server.
    Serve {
        #[arg(long, default_value = "127.0.0.1")]
        host: String,
        #[arg(short = 'p', long, default_value = "8080")]
        port: u16,
        // URL prefix the server is mounted under (e.g. behind a reverse proxy).
        #[arg(long, default_value = "/")]
        base_path: String,
    },
    // Scaffold a new project directory with a starter workflow.
    Init {
        #[arg(value_name = "NAME")]
        name: String,
        // Target directory; defaults to ./<NAME>.
        #[arg(short = 'd', long)]
        dir: Option<PathBuf>,
    },
    // Summarize a checkpoint JSON file (completed/failed rules).
    Status {
        #[arg(value_name = "CHECKPOINT")]
        checkpoint: PathBuf,
    },
    // Delete declared rule outputs (with confirmation unless --force).
    Clean {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
        // List what would be removed without deleting.
        #[arg(short = 'n', long)]
        dry_run: bool,
        // Skip the interactive y/N confirmation.
        #[arg(long)]
        force: bool,
    },
    // Inspect workflow configuration (show/stats).
    Config {
        #[command(subcommand)]
        action: ConfigAction,
    },
    // Emit shell completion scripts on stdout.
    Completions {
        #[arg(value_enum)]
        shell: clap_complete::Shell,
    },
    // Reformat a workflow file (or verify formatting with --check).
    Format {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
        #[arg(short = 'o', long)]
        output: Option<PathBuf>,
        // Exit 1 if the file is not already formatted.
        #[arg(long)]
        check: bool,
    },
    // Run validation + lint diagnostics; --strict makes warnings fatal.
    Lint {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
        #[arg(long)]
        strict: bool,
    },
    // Execution profile inspection (list/show/current).
    Profile {
        #[command(subcommand)]
        action: ProfileAction,
    },
    // Export the workflow as docker (default), singularity, or toml.
    Export {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
        #[arg(short = 'f', long, default_value = "docker")]
        format: String,
        #[arg(short = 'o', long)]
        output: Option<PathBuf>,
    },
    // Cluster scheduler integration (submit/status/cancel).
    Cluster {
        #[command(subcommand)]
        action: ClusterAction,
    },
    // Diff two workflow files and report the differences.
    Diff {
        #[arg(value_name = "WORKFLOW_A")]
        workflow_a: PathBuf,
        #[arg(value_name = "WORKFLOW_B")]
        workflow_b: PathBuf,
    },
    // Update mtimes of (or create empty) declared outputs, like `touch`.
    Touch {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
        // Restrict to specific rules; empty means all rules.
        #[arg(short = 'r', long = "rule")]
        rules: Vec<String>,
    },
}
// Subcommands of `oxo-flow env`.
// `//` comments only: `///` would alter clap's generated help.
#[derive(Subcommand, Debug)]
enum EnvAction {
    // List the environment backends available on this machine.
    List,
    // Validate each rule's environment spec in a workflow.
    Check {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
    },
}
// Subcommands of `oxo-flow profile`.
// `//` comments only: `///` would alter clap's generated help.
#[derive(Subcommand, Debug)]
enum ProfileAction {
    // List known execution profiles (local + cluster schedulers).
    List,
    // Show details for one named profile; exits 1 if unknown.
    Show {
        #[arg(value_name = "NAME")]
        name: String,
    },
    // Print the currently active profile (always "local" today).
    Current,
}
// Subcommands of `oxo-flow config`.
// `//` comments only: `///` would alter clap's generated help.
#[derive(Subcommand, Debug)]
enum ConfigAction {
    // Print workflow metadata, config variables, includes, and groups.
    Show {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
    },
    // Print aggregate statistics (rule counts, depth, threads, ...).
    Stats {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
    },
}
// Subcommands of `oxo-flow cluster`.
// `//` comments only: `///` would alter clap's generated help.
#[derive(Subcommand, Debug)]
enum ClusterAction {
    // Generate per-rule submit scripts for a scheduler backend.
    Submit {
        #[arg(value_name = "WORKFLOW")]
        workflow: PathBuf,
        // "slurm" (default), "pbs", "sge", or "lsf"; unknown values fall
        // back to slurm in the Submit arm of main().
        #[arg(short = 'b', long, default_value = "slurm")]
        backend: String,
        // Scheduler queue/partition name.
        #[arg(short = 'q', long)]
        queue: Option<String>,
        // Accounting/project identifier.
        #[arg(short = 'a', long)]
        account: Option<String>,
        // Directory the generated .sh scripts are written to.
        #[arg(short = 'o', long, default_value = ".oxo-flow/cluster")]
        output_dir: PathBuf,
    },
    // Print the status command for a backend (does not query the scheduler).
    Status {
        #[arg(short = 'b', long, default_value = "slurm")]
        backend: String,
    },
    // Print cancel commands for the given job IDs (does not execute them).
    Cancel {
        #[arg(short = 'b', long, default_value = "slurm")]
        backend: String,
        #[arg(value_name = "JOB_ID")]
        job_ids: Vec<String>,
    },
}
/// Write the program banner (name, crate version, tagline) to stderr.
/// Uses stderr so stdout stays clean for machine-readable output
/// (e.g. `graph`, `completions`).
fn print_banner() {
    let name = "oxo-flow".bold().cyan();
    let version = env!("CARGO_PKG_VERSION");
    let tagline = "Bioinformatics Pipeline Engine".dimmed();
    eprintln!("{} {} — {}", name, version, tagline);
}
/// Entry point: parse the CLI, configure colors and logging, then dispatch
/// on the selected subcommand.
///
/// Fatal errors propagate via `anyhow::Result`; several arms instead call
/// `std::process::exit(1)` directly for user-facing validation failures.
#[tokio::main]
async fn main() -> Result<()> {
    let cli = Cli::parse();
    // Honor both the --no-color flag and the NO_COLOR env-var convention.
    if cli.no_color || std::env::var_os("NO_COLOR").is_some() {
        colored::control::set_override(false);
    }
    // --quiet wins over --verbose; "info" is the default level.
    let default_level = if cli.quiet {
        "error"
    } else if cli.verbose {
        "debug"
    } else {
        "info"
    };
    // An explicit env filter (RUST_LOG) takes precedence over the flags.
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_level)),
        )
        .with_target(false)
        .init();
    match cli.command {
        // Run: build the DAG, print the execution order, then run each rule
        // sequentially through the LocalExecutor.
        Commands::Run {
            workflow,
            jobs,
            keep_going,
            workdir,
            // NOTE(review): -t/--target values are accepted but discarded —
            // target filtering is not implemented in this arm. Confirm intent.
            target: _,
            retry,
            timeout,
        } => {
            print_banner();
            let config = WorkflowConfig::from_file(&workflow)
                .with_context(|| format!("failed to parse {}", workflow.display()))?;
            let dag =
                WorkflowDag::from_rules(&config.rules).context("failed to build workflow DAG")?;
            let order = dag.execution_order()?;
            eprintln!(
                "{} {} rules in execution order",
                "DAG:".bold().green(),
                order.len()
            );
            for (i, rule_name) in order.iter().enumerate() {
                eprintln!("  {}. {}", i + 1, rule_name);
            }
            let exec_config = ExecutorConfig {
                max_jobs: jobs,
                dry_run: false,
                // Fall back to the current directory; an empty PathBuf if even
                // that is unavailable.
                workdir: workdir.unwrap_or_else(|| std::env::current_dir().unwrap_or_default()),
                keep_going,
                retry_count: retry,
                // timeout of 0 on the CLI means "no timeout".
                timeout: if timeout > 0 {
                    Some(std::time::Duration::from_secs(timeout))
                } else {
                    None
                },
            };
            let executor = LocalExecutor::new(exec_config);
            let mut success_count = 0;
            let mut fail_count = 0;
            for rule_name in &order {
                // unwrap is safe by construction: `order` only contains names
                // that came from `config.rules`.
                let rule = config.get_rule(rule_name).unwrap();
                match executor
                    .execute_rule(rule, &std::collections::HashMap::new())
                    .await
                {
                    Ok(record) => {
                        if record.status == oxo_flow_core::executor::JobStatus::Success {
                            success_count += 1;
                            eprintln!("  {} {}", "✓".green().bold(), rule_name);
                        } else {
                            // NOTE(review): non-Success, non-error statuses are
                            // counted in neither tally — confirm whether they
                            // should count as failures.
                            eprintln!("  {} {} ({})", "⊘".yellow(), rule_name, record.status);
                        }
                    }
                    Err(e) => {
                        fail_count += 1;
                        eprintln!("  {} {} — {}", "✗".red().bold(), rule_name, e);
                        // Without --keep-going the first error aborts the run.
                        if !keep_going {
                            return Err(e.into());
                        }
                    }
                }
            }
            eprintln!(
                "\n{} {} succeeded, {} failed",
                "Done:".bold(),
                success_count,
                fail_count
            );
        }
        // DryRun: show the execution plan (order, threads, env, command
        // preview) without executing anything.
        Commands::DryRun { workflow } => {
            print_banner();
            let config = WorkflowConfig::from_file(&workflow)
                .with_context(|| format!("failed to parse {}", workflow.display()))?;
            let dag =
                WorkflowDag::from_rules(&config.rules).context("failed to build workflow DAG")?;
            let order = dag.execution_order()?;
            eprintln!(
                "{} {} rules would execute:",
                "Dry-run:".bold().yellow(),
                order.len()
            );
            for (i, rule_name) in order.iter().enumerate() {
                // Safe: order comes from config.rules (see Run arm).
                let rule = config.get_rule(rule_name).unwrap();
                eprintln!(
                    "  {}. {} [threads={}, env={}]",
                    i + 1,
                    rule_name.bold(),
                    rule.effective_threads(),
                    rule.environment.kind()
                );
                if let Some(ref cmd) = rule.shell {
                    // Truncate long shell commands to an 80-char preview.
                    let preview: String = cmd.chars().take(80).collect();
                    eprintln!("     $ {}", preview.dimmed());
                }
            }
        }
        // Validate: parse + DAG-build only; exit code 1 on any failure.
        Commands::Validate { workflow } => {
            let config = WorkflowConfig::from_file(&workflow);
            match config {
                Ok(cfg) => {
                    match WorkflowDag::from_rules(&cfg.rules) {
                        Ok(dag) => {
                            eprintln!(
                                "{} {} — {} rules, {} dependencies",
                                "✓".green().bold(),
                                workflow.display(),
                                dag.node_count(),
                                dag.edge_count()
                            );
                        }
                        Err(e) => {
                            eprintln!(
                                "{} {} — DAG error: {}",
                                "✗".red().bold(),
                                workflow.display(),
                                e
                            );
                            std::process::exit(1);
                        }
                    }
                }
                Err(e) => {
                    eprintln!("{} {} — {}", "✗".red().bold(), workflow.display(), e);
                    std::process::exit(1);
                }
            }
        }
        // Graph: DOT output goes to stdout (pipeable into graphviz).
        Commands::Graph { workflow } => {
            let config = WorkflowConfig::from_file(&workflow)
                .with_context(|| format!("failed to parse {}", workflow.display()))?;
            let dag =
                WorkflowDag::from_rules(&config.rules).context("failed to build workflow DAG")?;
            println!("{}", dag.to_dot());
        }
        // Report: build a minimal report with one key/value section and emit
        // it as JSON or HTML (any non-"json" format string means HTML).
        Commands::Report {
            workflow,
            format,
            output,
        } => {
            print_banner();
            let config = WorkflowConfig::from_file(&workflow)
                .with_context(|| format!("failed to parse {}", workflow.display()))?;
            let mut report = oxo_flow_core::report::Report::new(
                &format!("{} Report", config.workflow.name),
                &config.workflow.name,
                &config.workflow.version,
            );
            report.add_section(oxo_flow_core::report::ReportSection {
                title: "Workflow Information".to_string(),
                id: "workflow-info".to_string(),
                content: oxo_flow_core::report::ReportContent::KeyValue {
                    pairs: vec![
                        ("Name".to_string(), config.workflow.name.clone()),
                        ("Version".to_string(), config.workflow.version.clone()),
                        ("Rules".to_string(), config.rules.len().to_string()),
                    ],
                },
                subsections: vec![],
            });
            let content = match format.as_str() {
                "json" => report.to_json()?,
                _ => report.to_html(),
            };
            match output {
                Some(path) => {
                    std::fs::write(&path, &content)?;
                    eprintln!("Report written to {}", path.display());
                }
                None => {
                    println!("{content}");
                }
            }
        }
        // Env: list available backends, or validate each rule's env spec.
        Commands::Env { action } => {
            print_banner();
            match action {
                EnvAction::List => {
                    let resolver = oxo_flow_core::environment::EnvironmentResolver::new();
                    let available = resolver.available_backends();
                    eprintln!("{}", "Available environment backends:".bold());
                    for backend in available {
                        eprintln!("  {} {}", "✓".green(), backend);
                    }
                }
                EnvAction::Check { workflow } => {
                    let config = WorkflowConfig::from_file(&workflow)
                        .with_context(|| format!("failed to parse {}", workflow.display()))?;
                    let resolver = oxo_flow_core::environment::EnvironmentResolver::new();
                    let mut all_ok = true;
                    for rule in &config.rules {
                        match resolver.validate_spec(&rule.environment) {
                            Ok(()) => {
                                eprintln!(
                                    "  {} {} ({})",
                                    "✓".green(),
                                    rule.name,
                                    rule.environment.kind()
                                );
                            }
                            Err(e) => {
                                eprintln!("  {} {} — {}", "✗".red(), rule.name, e);
                                all_ok = false;
                            }
                        }
                    }
                    // Any invalid spec makes the whole check fail.
                    if !all_ok {
                        std::process::exit(1);
                    }
                }
            }
        }
        // Package: emit a Dockerfile or Singularity definition. Unknown
        // format strings fall back to Docker.
        Commands::Package {
            workflow,
            format,
            output,
        } => {
            print_banner();
            let config = WorkflowConfig::from_file(&workflow)
                .with_context(|| format!("failed to parse {}", workflow.display()))?;
            let pkg_config = oxo_flow_core::container::PackageConfig {
                format: match format.as_str() {
                    "singularity" => oxo_flow_core::container::ContainerFormat::Singularity,
                    _ => oxo_flow_core::container::ContainerFormat::Docker,
                },
                ..Default::default()
            };
            let content = match pkg_config.format {
                oxo_flow_core::container::ContainerFormat::Docker => {
                    oxo_flow_core::container::generate_dockerfile(&config, &pkg_config)?
                }
                oxo_flow_core::container::ContainerFormat::Singularity => {
                    oxo_flow_core::container::generate_singularity_def(&config, &pkg_config)?
                }
            };
            match output {
                Some(path) => {
                    std::fs::write(&path, &content)?;
                    eprintln!("Container definition written to {}", path.display());
                }
                None => {
                    println!("{content}");
                }
            }
        }
        // Serve: start the web UI; blocks until the server stops.
        Commands::Serve {
            host,
            port,
            base_path,
        } => {
            print_banner();
            if base_path != "/" {
                eprintln!(
                    "Starting web server at {}:{} with base path '{}' ...",
                    host, port, base_path
                );
            } else {
                eprintln!("Starting web server at {}:{} ...", host, port);
            }
            oxo_flow_web::start_server_with_base(&host, port, &base_path).await?;
        }
        // Init: scaffold <dir>/<name>.oxoflow plus envs/, scripts/, and a
        // bioinformatics-flavored .gitignore.
        Commands::Init { name, dir } => {
            print_banner();
            let project_dir = dir.unwrap_or_else(|| PathBuf::from(&name));
            std::fs::create_dir_all(&project_dir)?;
            let workflow_content = format!(
                r#"[workflow]
name = "{name}"
version = "0.1.0"
description = "A new oxo-flow pipeline"
[config]
# Add your configuration variables here
[defaults]
threads = 4
memory = "8G"
# Define your pipeline rules below:
# [[rules]]
# name = "step1"
# input = ["input.txt"]
# output = ["output.txt"]
# shell = "cat input.txt > output.txt"
"#
            );
            let workflow_path = project_dir.join(format!("{name}.oxoflow"));
            std::fs::write(&workflow_path, workflow_content)?;
            let envs_dir = project_dir.join("envs");
            let scripts_dir = project_dir.join("scripts");
            std::fs::create_dir_all(&envs_dir)?;
            std::fs::create_dir_all(&scripts_dir)?;
            let gitignore_content = "\
# Alignment files
*.bam
*.bam.bai
*.cram
*.cram.crai
*.sam
# Variant files
*.vcf.gz
*.vcf.gz.tbi
*.bcf
# Index files
*.fai
*.dict
# Workflow outputs
logs/
results/
benchmarks/
# oxo-flow internals
.oxo-flow/
.oxo-flow-cache/
# OS files
.DS_Store
Thumbs.db
";
            let gitignore_path = project_dir.join(".gitignore");
            std::fs::write(&gitignore_path, gitignore_content)?;
            eprintln!(
                "{} Created new project at {}",
                "✓".green().bold(),
                project_dir.display()
            );
            eprintln!("  {}", workflow_path.display());
            eprintln!("  {}/", envs_dir.display());
            eprintln!("  {}/", scripts_dir.display());
            eprintln!("  {}", gitignore_path.display());
            eprintln!(
                "\n  Edit {} to define your pipeline.",
                workflow_path.display()
            );
        }
        // Status: pretty-print completed/failed rules from a checkpoint file.
        Commands::Status { checkpoint } => {
            print_banner();
            let content = std::fs::read_to_string(&checkpoint)
                .with_context(|| format!("failed to read {}", checkpoint.display()))?;
            let state = oxo_flow_core::executor::CheckpointState::from_json(&content)
                .map_err(|e| anyhow::anyhow!("{}", e))?;
            eprintln!("{}", "Checkpoint Status:".bold());
            for rule in &state.completed_rules {
                eprintln!("  {} {}", "✓".green().bold(), rule);
            }
            for rule in &state.failed_rules {
                eprintln!("  {} {}", "✗".red().bold(), rule);
            }
            let completed = state.completed_rules.len();
            let failed = state.failed_rules.len();
            eprintln!(
                "\n{} {} completed, {} failed",
                "Summary:".bold(),
                completed,
                failed
            );
        }
        // Clean: delete declared (non-wildcard) rule outputs. Wildcard
        // patterns (containing '{' and '}') are never expanded or deleted.
        Commands::Clean {
            workflow,
            dry_run,
            force,
        } => {
            print_banner();
            let config = WorkflowConfig::from_file(&workflow)
                .with_context(|| format!("failed to parse {}", workflow.display()))?;
            // Deduplicate outputs while preserving first-seen order.
            let mut outputs: Vec<String> = Vec::new();
            for rule in &config.rules {
                for output in &rule.output {
                    if !outputs.contains(output) {
                        outputs.push(output.clone());
                    }
                }
            }
            if dry_run {
                eprintln!("{}", "Would clean (dry-run):".bold().yellow());
                for output in &outputs {
                    let has_wildcard = output.contains('{') && output.contains('}');
                    if has_wildcard {
                        eprintln!("  {} (wildcard, skipped)", output.dimmed());
                    } else if Path::new(output).exists() {
                        eprintln!("  {} (exists)", output);
                    } else {
                        eprintln!("  {} (not found)", output.dimmed());
                    }
                }
                eprintln!("\n{} {} output patterns", "Total:".bold(), outputs.len());
            } else {
                let mut deletable: Vec<&String> = Vec::new();
                let mut skipped_wildcard = 0usize;
                let mut not_found = 0usize;
                for output in &outputs {
                    let has_wildcard = output.contains('{') && output.contains('}');
                    if has_wildcard {
                        skipped_wildcard += 1;
                    } else if Path::new(output).exists() {
                        deletable.push(output);
                    } else {
                        not_found += 1;
                    }
                }
                if deletable.is_empty() {
                    eprintln!(
                        "{} Nothing to delete ({} not found, {} wildcard patterns skipped)",
                        "Clean:".bold(),
                        not_found,
                        skipped_wildcard
                    );
                } else {
                    // Interactive confirmation unless --force was given.
                    // NOTE(review): only a bare "y"/"Y" proceeds; "yes" aborts.
                    if !force {
                        eprintln!(
                            "{} {} file(s) will be deleted. Continue? [y/N]",
                            "Clean:".bold().yellow(),
                            deletable.len()
                        );
                        let mut answer = String::new();
                        std::io::stdin().read_line(&mut answer)?;
                        if answer.trim().to_lowercase() != "y" {
                            eprintln!("Aborted.");
                            return Ok(());
                        }
                    }
                    let mut deleted = 0usize;
                    let mut failed = 0usize;
                    for path_str in &deletable {
                        match std::fs::remove_file(path_str) {
                            Ok(()) => {
                                deleted += 1;
                                eprintln!("  {} {}", "✓".green(), path_str);
                            }
                            Err(e) => {
                                failed += 1;
                                eprintln!("  {} {} — {}", "✗".red(), path_str, e);
                            }
                        }
                    }
                    eprintln!(
                        "\n{} {} deleted, {} failed, {} not found, {} wildcard skipped",
                        "Done:".bold(),
                        deleted,
                        failed,
                        not_found,
                        skipped_wildcard
                    );
                }
            }
        }
        // Config: show workflow metadata/config vars, or print statistics.
        Commands::Config { action } => {
            print_banner();
            match action {
                ConfigAction::Show { workflow } => {
                    let config = WorkflowConfig::from_file(&workflow)
                        .with_context(|| format!("failed to parse {}", workflow.display()))?;
                    eprintln!("{}", "Workflow Configuration:".bold());
                    eprintln!("  Name:    {}", config.workflow.name);
                    eprintln!("  Version: {}", config.workflow.version);
                    if let Some(ref desc) = config.workflow.description {
                        eprintln!("  Desc:    {}", desc);
                    }
                    if let Some(ref author) = config.workflow.author {
                        eprintln!("  Author:  {}", author);
                    }
                    if !config.config.is_empty() {
                        eprintln!("\n{}", "  Config Variables:".bold());
                        // Sort keys for deterministic output.
                        let mut keys: Vec<&String> = config.config.keys().collect();
                        keys.sort();
                        for key in keys {
                            eprintln!("    {} = {}", key, config.config[key]);
                        }
                    }
                    if !config.includes.is_empty() {
                        eprintln!("\n{}", "  Includes:".bold());
                        for inc in &config.includes {
                            if let Some(ref ns) = inc.namespace {
                                eprintln!("    {} (namespace: {})", inc.path, ns);
                            } else {
                                eprintln!("    {}", inc.path);
                            }
                        }
                    }
                    if !config.execution_groups.is_empty() {
                        eprintln!("\n{}", "  Execution Groups:".bold());
                        for group in &config.execution_groups {
                            eprintln!("    {} ({:?}): {:?}", group.name, group.mode, group.rules);
                        }
                    }
                }
                ConfigAction::Stats { workflow } => {
                    let config = WorkflowConfig::from_file(&workflow)
                        .with_context(|| format!("failed to parse {}", workflow.display()))?;
                    let stats = oxo_flow_core::format::workflow_stats(&config);
                    eprintln!("{}", "Workflow Statistics:".bold());
                    eprintln!("  Rules:           {}", stats.rule_count);
                    eprintln!("  Shell rules:     {}", stats.shell_rules);
                    eprintln!("  Script rules:    {}", stats.script_rules);
                    eprintln!("  Dependencies:    {}", stats.dependency_count);
                    eprintln!("  Parallel groups: {}", stats.parallel_groups);
                    eprintln!("  Max depth:       {}", stats.max_depth);
                    eprintln!("  Total threads:   {}", stats.total_threads);
                    eprintln!(
                        "  Wildcards:       {} ({:?})",
                        stats.wildcard_count, stats.wildcard_names
                    );
                    if !stats.environments.is_empty() {
                        eprintln!("  Environments:    {:?}", stats.environments);
                    }
                }
            }
        }
        // Completions: write the shell completion script to stdout.
        Commands::Completions { shell } => {
            let mut cmd = Cli::command();
            clap_complete::generate(shell, &mut cmd, "oxo-flow", &mut std::io::stdout());
        }
        // Format: canonical reformatting; --check compares (trim-insensitive)
        // and exits 1 if the file differs from the formatted form.
        Commands::Format {
            workflow,
            output,
            check,
        } => {
            let config = WorkflowConfig::from_file(&workflow)
                .with_context(|| format!("failed to parse {}", workflow.display()))?;
            let formatted = oxo_flow_core::format::format_workflow(&config);
            if check {
                let original = std::fs::read_to_string(&workflow)?;
                if original.trim() == formatted.trim() {
                    eprintln!(
                        "{} {} is already formatted",
                        "✓".green().bold(),
                        workflow.display()
                    );
                } else {
                    eprintln!(
                        "{} {} needs formatting",
                        "✗".red().bold(),
                        workflow.display()
                    );
                    std::process::exit(1);
                }
            } else {
                match output {
                    Some(path) => {
                        std::fs::write(&path, &formatted)?;
                        eprintln!("Formatted workflow written to {}", path.display());
                    }
                    None => {
                        print!("{formatted}");
                    }
                }
            }
        }
        // Lint: run validation + lint diagnostics, tally by severity; errors
        // (or warnings under --strict) produce exit code 1.
        Commands::Lint { workflow, strict } => {
            print_banner();
            let config = WorkflowConfig::from_file(&workflow)
                .with_context(|| format!("failed to parse {}", workflow.display()))?;
            let validation = oxo_flow_core::format::validate_format(&config);
            let lint_diags = oxo_flow_core::format::lint_format(&config);
            let mut error_count = 0usize;
            let mut warning_count = 0usize;
            let mut info_count = 0usize;
            for d in validation.diagnostics.iter().chain(lint_diags.iter()) {
                // Counting happens as a side effect of choosing the prefix.
                let prefix = match d.severity {
                    oxo_flow_core::format::Severity::Error => {
                        error_count += 1;
                        "error".red().bold().to_string()
                    }
                    oxo_flow_core::format::Severity::Warning => {
                        warning_count += 1;
                        "warning".yellow().bold().to_string()
                    }
                    oxo_flow_core::format::Severity::Info => {
                        info_count += 1;
                        "info".blue().to_string()
                    }
                };
                eprint!("  {} [{}]: {}", prefix, d.code, d.message);
                if let Some(ref rule) = d.rule {
                    eprint!(" (rule: {})", rule);
                }
                eprintln!();
            }
            eprintln!(
                "\n{} {} error(s), {} warning(s), {} info",
                "Summary:".bold(),
                error_count,
                warning_count,
                info_count
            );
            if error_count > 0 || (strict && warning_count > 0) {
                std::process::exit(1);
            }
        }
        // Profile: static descriptions of the supported execution profiles.
        Commands::Profile { action } => {
            print_banner();
            match action {
                ProfileAction::List => {
                    eprintln!("{}", "Available execution profiles:".bold());
                    let profiles = ["local", "slurm", "pbs", "sge", "lsf"];
                    for p in &profiles {
                        let desc = match *p {
                            "local" => "Local execution (default)",
                            "slurm" => "SLURM cluster scheduler",
                            "pbs" => "PBS/Torque cluster scheduler",
                            "sge" => "Sun Grid Engine (SGE) scheduler",
                            "lsf" => "IBM LSF scheduler",
                            _ => "Unknown",
                        };
                        eprintln!("  {} {} — {}", "•".cyan(), p.bold(), desc);
                    }
                }
                ProfileAction::Show { name } => match name.as_str() {
                    "local" => {
                        eprintln!("{}", "Profile: local".bold());
                        eprintln!("  Executor: local process");
                        eprintln!("  Max jobs: auto (CPU count)");
                        eprintln!("  Retries:  0");
                        eprintln!("  Timeout:  none");
                    }
                    "slurm" | "pbs" | "sge" | "lsf" => {
                        let backend = match name.as_str() {
                            "slurm" => oxo_flow_core::cluster::ClusterBackend::Slurm,
                            "pbs" => oxo_flow_core::cluster::ClusterBackend::Pbs,
                            "sge" => oxo_flow_core::cluster::ClusterBackend::Sge,
                            _ => oxo_flow_core::cluster::ClusterBackend::Lsf,
                        };
                        eprintln!("{}", format!("Profile: {}", name).bold());
                        eprintln!(
                            "  Submit cmd: {}",
                            oxo_flow_core::cluster::submit_command(&backend)
                        );
                        eprintln!(
                            "  Status cmd: {}",
                            oxo_flow_core::cluster::status_command(&backend)
                        );
                        eprintln!("  Executor:   cluster job submission");
                    }
                    other => {
                        eprintln!("{} Unknown profile: {}", "✗".red().bold(), other);
                        std::process::exit(1);
                    }
                },
                ProfileAction::Current => {
                    eprintln!("{} {}", "Active profile:".bold(), "local".green());
                }
            }
        }
        // Export: docker (default) / singularity / toml serialization of the
        // workflow; unknown formats fall back to a Dockerfile.
        Commands::Export {
            workflow,
            format,
            output,
        } => {
            print_banner();
            let config = WorkflowConfig::from_file(&workflow)
                .with_context(|| format!("failed to parse {}", workflow.display()))?;
            let content = match format.as_str() {
                "singularity" => {
                    let pkg = oxo_flow_core::container::PackageConfig {
                        format: oxo_flow_core::container::ContainerFormat::Singularity,
                        ..Default::default()
                    };
                    oxo_flow_core::container::generate_singularity_def(&config, &pkg)?
                }
                "toml" => oxo_flow_core::format::format_workflow(&config),
                _ => {
                    let pkg = oxo_flow_core::container::PackageConfig::default();
                    oxo_flow_core::container::generate_dockerfile(&config, &pkg)?
                }
            };
            match output {
                Some(path) => {
                    std::fs::write(&path, &content)?;
                    eprintln!(
                        "{} Exported {} to {}",
                        "✓".green().bold(),
                        format,
                        path.display()
                    );
                }
                None => {
                    println!("{content}");
                }
            }
        }
        // Cluster: generate submit scripts, or print status/cancel hints.
        // Nothing here talks to a real scheduler.
        Commands::Cluster { action } => {
            print_banner();
            match action {
                ClusterAction::Submit {
                    workflow,
                    backend,
                    queue,
                    account,
                    output_dir,
                } => {
                    let config = WorkflowConfig::from_file(&workflow)
                        .with_context(|| format!("failed to parse {}", workflow.display()))?;
                    let dag = WorkflowDag::from_rules(&config.rules)
                        .context("failed to build workflow DAG")?;
                    let order = dag.execution_order()?;
                    // Unknown backend strings silently fall back to slurm.
                    let cluster_backend = match backend.as_str() {
                        "pbs" => oxo_flow_core::cluster::ClusterBackend::Pbs,
                        "sge" => oxo_flow_core::cluster::ClusterBackend::Sge,
                        "lsf" => oxo_flow_core::cluster::ClusterBackend::Lsf,
                        _ => oxo_flow_core::cluster::ClusterBackend::Slurm,
                    };
                    let cluster_config = oxo_flow_core::cluster::ClusterJobConfig {
                        backend: cluster_backend,
                        queue: queue.clone(),
                        account: account.clone(),
                        walltime: None,
                        extra_args: vec![],
                    };
                    std::fs::create_dir_all(&output_dir)?;
                    eprintln!(
                        "{} Generating {} job scripts for {} rules",
                        "Cluster:".bold().cyan(),
                        backend,
                        order.len()
                    );
                    for rule_name in &order {
                        // Safe: order comes from config.rules (see Run arm).
                        let rule = config.get_rule(rule_name).unwrap();
                        let shell_cmd = match rule.shell.as_deref() {
                            Some(cmd) => cmd,
                            None => {
                                // Script-only rules get no submit script.
                                eprintln!(
                                    "  {} {} — no shell command, skipping",
                                    "⊘".yellow(),
                                    rule_name
                                );
                                continue;
                            }
                        };
                        let script = oxo_flow_core::cluster::generate_submit_script(
                            &cluster_backend,
                            rule,
                            shell_cmd,
                            &cluster_config,
                        );
                        let script_path = output_dir.join(format!("{rule_name}.sh"));
                        std::fs::write(&script_path, &script)?;
                        eprintln!("  {} {}", "✓".green(), script_path.display());
                    }
                    // NOTE(review): this total counts all rules, including
                    // ones skipped above for lacking a shell command.
                    eprintln!(
                        "\n{} {} scripts written to {}",
                        "Done:".bold(),
                        order.len(),
                        output_dir.display()
                    );
                    eprintln!(
                        "  Submit with: {} {}/*.sh",
                        oxo_flow_core::cluster::submit_command(&cluster_backend),
                        output_dir.display()
                    );
                }
                ClusterAction::Status { backend } => {
                    let cluster_backend = match backend.as_str() {
                        "pbs" => oxo_flow_core::cluster::ClusterBackend::Pbs,
                        "sge" => oxo_flow_core::cluster::ClusterBackend::Sge,
                        "lsf" => oxo_flow_core::cluster::ClusterBackend::Lsf,
                        _ => oxo_flow_core::cluster::ClusterBackend::Slurm,
                    };
                    eprintln!(
                        "{} Use '{}' to check job status",
                        "Cluster:".bold().cyan(),
                        oxo_flow_core::cluster::status_command(&cluster_backend)
                    );
                }
                ClusterAction::Cancel { backend, job_ids } => {
                    let cancel_cmd = match backend.as_str() {
                        "pbs" => "qdel",
                        "sge" => "qdel",
                        "lsf" => "bkill",
                        _ => "scancel",
                    };
                    if job_ids.is_empty() {
                        eprintln!(
                            "{} No job IDs provided. Usage: oxo-flow cluster cancel <JOB_ID>...",
                            "Warning:".bold().yellow()
                        );
                    } else {
                        for id in &job_ids {
                            eprintln!("  {} {} {}", cancel_cmd, id, "(would cancel)".dimmed());
                        }
                        eprintln!(
                            "\n{} Run manually: {} {}",
                            "Hint:".bold(),
                            cancel_cmd,
                            job_ids.join(" ")
                        );
                    }
                }
            }
        }
        // Diff: structural comparison of two workflow files.
        Commands::Diff {
            workflow_a,
            workflow_b,
        } => {
            print_banner();
            let config_a = WorkflowConfig::from_file(&workflow_a)
                .with_context(|| format!("failed to parse {}", workflow_a.display()))?;
            let config_b = WorkflowConfig::from_file(&workflow_b)
                .with_context(|| format!("failed to parse {}", workflow_b.display()))?;
            let diffs = oxo_flow_core::format::diff_workflows(&config_a, &config_b);
            if diffs.is_empty() {
                eprintln!("{} Workflows are identical", "✓".green().bold());
            } else {
                eprintln!(
                    "{} {} difference(s) between {} and {}:",
                    "Diff:".bold().yellow(),
                    diffs.len(),
                    workflow_a.display(),
                    workflow_b.display()
                );
                for diff in &diffs {
                    eprintln!("  {} [{}] {}", "•".cyan(), diff.category, diff.description);
                }
            }
        }
        // Touch: bump mtimes of existing outputs, or create empty files for
        // missing ones. Wildcard patterns are skipped; paths escaping the
        // working directory ('..', absolute, '~') are rejected.
        Commands::Touch { workflow, rules } => {
            print_banner();
            let config = WorkflowConfig::from_file(&workflow)
                .with_context(|| format!("failed to parse {}", workflow.display()))?;
            // Empty --rule list means "all rules".
            let rules_to_touch: Vec<&oxo_flow_core::rule::Rule> = if rules.is_empty() {
                config.rules.iter().collect()
            } else {
                config
                    .rules
                    .iter()
                    .filter(|r| rules.contains(&r.name))
                    .collect()
            };
            let mut touched = 0usize;
            let mut skipped = 0usize;
            let base_dir = std::env::current_dir().unwrap_or_default();
            for rule in &rules_to_touch {
                for output in &rule.output {
                    let has_wildcard = output.contains('{') && output.contains('}');
                    if has_wildcard {
                        skipped += 1;
                        continue;
                    }
                    // Basic path-traversal guard; rejected paths are reported
                    // but not counted in either tally.
                    if output.contains("..") || output.starts_with('/') || output.starts_with('~') {
                        eprintln!("  {} {} (rejected: unsafe path)", "✗".red().bold(), output);
                        continue;
                    }
                    let path = base_dir.join(output);
                    if path.exists() {
                        match filetime::set_file_mtime(&path, filetime::FileTime::now()) {
                            Ok(()) => {
                                touched += 1;
                                eprintln!("  {} {}", "✓".green(), output);
                            }
                            Err(e) => {
                                eprintln!("  {} {} ({})", "✗".red(), output, e);
                            }
                        }
                    } else {
                        // Create parent directories, then an empty file.
                        if let Some(parent) = path.parent()
                            && let Err(e) = std::fs::create_dir_all(parent)
                        {
                            eprintln!(
                                "  {} {} (cannot create directory: {})",
                                "✗".red(),
                                output,
                                e
                            );
                            continue;
                        }
                        match std::fs::write(&path, "") {
                            Ok(()) => {
                                touched += 1;
                                eprintln!("  {} {} (created)", "✓".green(), output);
                            }
                            Err(e) => {
                                eprintln!("  {} {} (failed: {})", "✗".red(), output, e);
                            }
                        }
                    }
                }
            }
            eprintln!(
                "\n{} {} file(s) touched, {} wildcard patterns skipped",
                "Done:".bold(),
                touched,
                skipped
            );
        }
    }
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use clap::CommandFactory;

    /// clap's built-in consistency check for the whole command tree.
    #[test]
    fn cli_parses_help() {
        Cli::command().debug_assert();
    }

    #[test]
    fn cli_parse_run() {
        let cli = Cli::try_parse_from(["oxo-flow", "run", "test.oxoflow"]).unwrap();
        match cli.command {
            Commands::Run { workflow, jobs, .. } => {
                assert_eq!(workflow, PathBuf::from("test.oxoflow"));
                assert_eq!(jobs, 1);
            }
            _ => panic!("expected Run command"),
        }
    }

    // BUG FIX (applies to several tests below): the original wrote a bare
    // `matches!(...)` statement and discarded its boolean result, so these
    // tests asserted nothing and passed even if parsing produced the wrong
    // variant. Each is now wrapped in `assert!(...)`.

    #[test]
    fn cli_parse_dry_run() {
        let cli = Cli::try_parse_from(["oxo-flow", "dry-run", "test.oxoflow"]).unwrap();
        assert!(matches!(cli.command, Commands::DryRun { .. }));
    }

    #[test]
    fn cli_parse_validate() {
        let cli = Cli::try_parse_from(["oxo-flow", "validate", "test.oxoflow"]).unwrap();
        assert!(matches!(cli.command, Commands::Validate { .. }));
    }

    #[test]
    fn cli_parse_graph() {
        let cli = Cli::try_parse_from(["oxo-flow", "graph", "test.oxoflow"]).unwrap();
        assert!(matches!(cli.command, Commands::Graph { .. }));
    }

    #[test]
    fn cli_parse_init() {
        let cli = Cli::try_parse_from(["oxo-flow", "init", "my-pipeline"]).unwrap();
        match cli.command {
            Commands::Init { name, .. } => {
                assert_eq!(name, "my-pipeline");
            }
            _ => panic!("expected Init command"),
        }
    }

    #[test]
    fn cli_parse_env_list() {
        let cli = Cli::try_parse_from(["oxo-flow", "env", "list"]).unwrap();
        assert!(matches!(
            cli.command,
            Commands::Env {
                action: EnvAction::List
            }
        ));
    }

    #[test]
    fn cli_parse_run_with_options() {
        let cli =
            Cli::try_parse_from(["oxo-flow", "run", "test.oxoflow", "-j", "8", "-k"]).unwrap();
        match cli.command {
            Commands::Run {
                jobs, keep_going, ..
            } => {
                assert_eq!(jobs, 8);
                assert!(keep_going);
            }
            _ => panic!("expected Run command"),
        }
    }

    #[test]
    fn cli_parse_status() {
        let cli = Cli::try_parse_from(["oxo-flow", "status", "checkpoint.json"]).unwrap();
        match cli.command {
            Commands::Status { checkpoint } => {
                assert_eq!(checkpoint, PathBuf::from("checkpoint.json"));
            }
            _ => panic!("expected Status command"),
        }
    }

    #[test]
    fn cli_parse_clean() {
        let cli = Cli::try_parse_from(["oxo-flow", "clean", "test.oxoflow"]).unwrap();
        match cli.command {
            Commands::Clean {
                workflow,
                dry_run,
                force,
            } => {
                assert_eq!(workflow, PathBuf::from("test.oxoflow"));
                assert!(!dry_run);
                assert!(!force);
            }
            _ => panic!("expected Clean command"),
        }
    }

    #[test]
    fn cli_parse_clean_dry_run() {
        let cli = Cli::try_parse_from(["oxo-flow", "clean", "test.oxoflow", "-n"]).unwrap();
        match cli.command {
            Commands::Clean { dry_run, .. } => {
                assert!(dry_run);
            }
            _ => panic!("expected Clean command"),
        }
    }

    #[test]
    fn cli_parse_clean_force() {
        let cli = Cli::try_parse_from(["oxo-flow", "clean", "test.oxoflow", "--force"]).unwrap();
        match cli.command {
            Commands::Clean { force, .. } => {
                assert!(force);
            }
            _ => panic!("expected Clean command"),
        }
    }

    #[test]
    fn cli_parse_completions() {
        let cli = Cli::try_parse_from(["oxo-flow", "completions", "bash"]).unwrap();
        match cli.command {
            Commands::Completions { shell } => {
                assert_eq!(shell, clap_complete::Shell::Bash);
            }
            _ => panic!("expected Completions command"),
        }
    }

    #[test]
    fn cli_parse_verbose_flag() {
        let cli =
            Cli::try_parse_from(["oxo-flow", "--verbose", "validate", "test.oxoflow"]).unwrap();
        assert!(cli.verbose);
    }

    #[test]
    fn cli_parse_verbose_short_flag() {
        let cli = Cli::try_parse_from(["oxo-flow", "-v", "validate", "test.oxoflow"]).unwrap();
        assert!(cli.verbose);
    }

    #[test]
    fn cli_parse_run_with_retry() {
        let cli = Cli::try_parse_from(["oxo-flow", "run", "test.oxoflow", "-r", "3"]).unwrap();
        match cli.command {
            Commands::Run { retry, .. } => {
                assert_eq!(retry, 3);
            }
            _ => panic!("expected Run command"),
        }
    }

    #[test]
    fn cli_parse_run_with_timeout() {
        let cli =
            Cli::try_parse_from(["oxo-flow", "run", "test.oxoflow", "--timeout", "300"]).unwrap();
        match cli.command {
            Commands::Run { timeout, .. } => {
                assert_eq!(timeout, 300);
            }
            _ => panic!("expected Run command"),
        }
    }

    #[test]
    fn cli_parse_format() {
        let cli = Cli::try_parse_from(["oxo-flow", "format", "test.oxoflow"]).unwrap();
        assert!(matches!(cli.command, Commands::Format { .. }));
    }

    #[test]
    fn cli_parse_format_check() {
        let cli = Cli::try_parse_from(["oxo-flow", "format", "test.oxoflow", "--check"]).unwrap();
        match cli.command {
            Commands::Format { check, .. } => assert!(check),
            _ => panic!("expected Format command"),
        }
    }

    #[test]
    fn cli_parse_lint() {
        let cli = Cli::try_parse_from(["oxo-flow", "lint", "test.oxoflow"]).unwrap();
        assert!(matches!(cli.command, Commands::Lint { .. }));
    }

    #[test]
    fn cli_parse_lint_strict() {
        let cli = Cli::try_parse_from(["oxo-flow", "lint", "test.oxoflow", "--strict"]).unwrap();
        match cli.command {
            Commands::Lint { strict, .. } => assert!(strict),
            _ => panic!("expected Lint command"),
        }
    }

    #[test]
    fn cli_parse_profile_list() {
        let cli = Cli::try_parse_from(["oxo-flow", "profile", "list"]).unwrap();
        assert!(matches!(
            cli.command,
            Commands::Profile {
                action: ProfileAction::List
            }
        ));
    }

    #[test]
    fn cli_parse_profile_show() {
        let cli = Cli::try_parse_from(["oxo-flow", "profile", "show", "slurm"]).unwrap();
        match cli.command {
            Commands::Profile {
                action: ProfileAction::Show { name },
            } => {
                assert_eq!(name, "slurm");
            }
            _ => panic!("expected Profile Show command"),
        }
    }

    #[test]
    fn cli_parse_profile_current() {
        let cli = Cli::try_parse_from(["oxo-flow", "profile", "current"]).unwrap();
        assert!(matches!(
            cli.command,
            Commands::Profile {
                action: ProfileAction::Current
            }
        ));
    }

    #[test]
    fn cli_parse_config_show() {
        let cli = Cli::try_parse_from(["oxo-flow", "config", "show", "test.oxoflow"]).unwrap();
        // Tightened from `Commands::Config { .. }` to pin the exact action.
        assert!(matches!(
            cli.command,
            Commands::Config {
                action: ConfigAction::Show { .. }
            }
        ));
    }

    #[test]
    fn cli_parse_config_stats() {
        let cli = Cli::try_parse_from(["oxo-flow", "config", "stats", "test.oxoflow"]).unwrap();
        assert!(matches!(
            cli.command,
            Commands::Config {
                action: ConfigAction::Stats { .. }
            }
        ));
    }

    #[test]
    fn cli_parse_export_default() {
        let cli = Cli::try_parse_from(["oxo-flow", "export", "test.oxoflow"]).unwrap();
        match cli.command {
            Commands::Export {
                workflow, format, ..
            } => {
                assert_eq!(workflow, PathBuf::from("test.oxoflow"));
                assert_eq!(format, "docker");
            }
            _ => panic!("expected Export command"),
        }
    }

    #[test]
    fn cli_parse_export_singularity() {
        let cli = Cli::try_parse_from(["oxo-flow", "export", "test.oxoflow", "-f", "singularity"])
            .unwrap();
        match cli.command {
            Commands::Export { format, .. } => {
                assert_eq!(format, "singularity");
            }
            _ => panic!("expected Export command"),
        }
    }

    #[test]
    fn cli_parse_export_toml() {
        let cli =
            Cli::try_parse_from(["oxo-flow", "export", "test.oxoflow", "-f", "toml"]).unwrap();
        match cli.command {
            Commands::Export { format, .. } => {
                assert_eq!(format, "toml");
            }
            _ => panic!("expected Export command"),
        }
    }

    #[test]
    fn cli_parse_cluster_submit() {
        let cli = Cli::try_parse_from([
            "oxo-flow",
            "cluster",
            "submit",
            "test.oxoflow",
            "-b",
            "slurm",
        ])
        .unwrap();
        match cli.command {
            Commands::Cluster {
                action:
                    ClusterAction::Submit {
                        workflow, backend, ..
                    },
            } => {
                assert_eq!(workflow, PathBuf::from("test.oxoflow"));
                assert_eq!(backend, "slurm");
            }
            _ => panic!("expected Cluster Submit command"),
        }
    }

    #[test]
    fn cli_parse_cluster_status() {
        let cli = Cli::try_parse_from(["oxo-flow", "cluster", "status", "-b", "pbs"]).unwrap();
        match cli.command {
            Commands::Cluster {
                action: ClusterAction::Status { backend },
            } => {
                assert_eq!(backend, "pbs");
            }
            _ => panic!("expected Cluster Status command"),
        }
    }

    #[test]
    fn cli_parse_cluster_cancel() {
        let cli = Cli::try_parse_from([
            "oxo-flow", "cluster", "cancel", "-b", "slurm", "12345", "67890",
        ])
        .unwrap();
        match cli.command {
            Commands::Cluster {
                action: ClusterAction::Cancel { backend, job_ids },
            } => {
                assert_eq!(backend, "slurm");
                assert_eq!(job_ids, vec!["12345", "67890"]);
            }
            _ => panic!("expected Cluster Cancel command"),
        }
    }

    #[test]
    fn cli_parse_diff() {
        let cli = Cli::try_parse_from(["oxo-flow", "diff", "a.oxoflow", "b.oxoflow"]).unwrap();
        match cli.command {
            Commands::Diff {
                workflow_a,
                workflow_b,
            } => {
                assert_eq!(workflow_a, PathBuf::from("a.oxoflow"));
                assert_eq!(workflow_b, PathBuf::from("b.oxoflow"));
            }
            _ => panic!("expected Diff command"),
        }
    }

    #[test]
    fn cli_parse_touch() {
        let cli = Cli::try_parse_from(["oxo-flow", "touch", "pipeline.oxoflow"]).unwrap();
        match cli.command {
            Commands::Touch {
                workflow, rules, ..
            } => {
                assert_eq!(workflow, PathBuf::from("pipeline.oxoflow"));
                assert!(rules.is_empty());
            }
            _ => panic!("expected Touch command"),
        }
    }

    #[test]
    fn cli_parse_touch_with_rule_flag() {
        let cli = Cli::try_parse_from(["oxo-flow", "touch", "pipeline.oxoflow", "--rule", "align"])
            .unwrap();
        match cli.command {
            Commands::Touch {
                workflow, rules, ..
            } => {
                assert_eq!(workflow, PathBuf::from("pipeline.oxoflow"));
                assert_eq!(rules, vec!["align"]);
            }
            _ => panic!("expected Touch command"),
        }
    }

    #[test]
    fn cli_parse_serve_with_base_path() {
        let cli = Cli::try_parse_from([
            "oxo-flow",
            "serve",
            "--host",
            "0.0.0.0",
            "--base-path",
            "/oxo-flow",
        ])
        .unwrap();
        match cli.command {
            Commands::Serve {
                host, base_path, ..
            } => {
                assert_eq!(host, "0.0.0.0");
                assert_eq!(base_path, "/oxo-flow");
            }
            _ => panic!("expected Serve command"),
        }
    }

    #[test]
    fn cli_parse_serve_default_base_path() {
        let cli = Cli::try_parse_from(["oxo-flow", "serve"]).unwrap();
        match cli.command {
            Commands::Serve { base_path, .. } => {
                assert_eq!(base_path, "/");
            }
            _ => panic!("expected Serve command"),
        }
    }
}