use crate::error::{OxoError, Result};
use colored::Colorize;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::path::Path;
use std::time::SystemTime;
/// Descriptive metadata for a workflow (the `workflow` field of [`WorkflowDef`]).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WorkflowMeta {
/// Workflow name. It carries no serde default, so it must be present when deserializing.
pub name: String,
/// Free-form description; an empty string when omitted. The export functions emit it
/// as a header comment only when it is non-empty.
#[serde(default)]
pub description: String,
/// Version string; an empty string when omitted.
#[serde(default)]
pub version: String,
}
/// One step of a workflow as written in the definition file, before wildcard expansion.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StepDef {
/// Step name; other steps refer to it through `depends_on`.
pub name: String,
/// Command template. Placeholders such as `{wildcard}` and `{params.key}` are
/// substituted during expansion and the result is run through `sh -c`.
pub cmd: String,
/// Names of the steps this step depends on; empty when omitted.
#[serde(default)]
pub depends_on: Vec<String>,
/// Input path templates; empty when omitted.
#[serde(default)]
pub inputs: Vec<String>,
/// Output path templates; empty when omitted.
#[serde(default)]
pub outputs: Vec<String>,
/// When `true` the step is expanded into a single task (never once per wildcard
/// combination) that depends on every task of each step in `depends_on`.
#[serde(default)]
pub gather: bool,
}
/// A complete workflow definition as deserialized from TOML.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct WorkflowDef {
/// Workflow metadata (name, description, version).
pub workflow: WorkflowMeta,
/// Wildcard name -> list of values. Steps that mention a wildcard are expanded once
/// per combination of values; empty when omitted.
#[serde(default)]
pub wildcards: HashMap<String, Vec<String>>,
/// Named parameter values available to command and path templates; empty when omitted.
#[serde(default)]
pub params: HashMap<String, String>,
/// Steps in declaration order. Serialized under the key `step`.
#[serde(rename = "step", default)]
pub steps: Vec<StepDef>,
}
impl WorkflowDef {
    /// Reads the file at `path` and parses its contents as a workflow definition.
    ///
    /// # Errors
    /// Propagates the I/O error if the file cannot be read, or the parse error from
    /// [`WorkflowDef::from_str_content`].
    pub fn from_file(path: &Path) -> Result<Self> {
        let contents = std::fs::read_to_string(path)?;
        Self::from_str_content(contents.as_str())
    }

    /// Parses a workflow definition from TOML text.
    ///
    /// # Errors
    /// Propagates the TOML deserialization error when `s` is not a valid definition.
    pub fn from_str_content(s: &str) -> Result<Self> {
        let parsed: Self = toml::from_str(s)?;
        Ok(parsed)
    }
}
/// A single runnable task produced by expanding a [`StepDef`].
#[derive(Debug, Clone)]
pub struct ConcreteTask {
/// Task identifier: the step name, followed by `[key=value,...]` when the task is
/// bound to a wildcard combination.
pub id: String,
/// Name of the step this task was expanded from.
pub step_name: String,
/// Command with placeholders already substituted; executed through `sh -c`.
pub cmd: String,
/// Input paths after substitution; their modification times are compared with the
/// outputs to decide whether the task can be skipped.
pub inputs: Vec<String>,
/// Output paths after substitution; parent directories are created before the
/// command runs.
pub outputs: Vec<String>,
/// Ids of the tasks that must be done before this one is started.
pub deps: Vec<String>,
/// Whether the originating step was a gather step.
pub gather: bool,
}
/// Builds every combination of wildcard values (the Cartesian product).
///
/// Wildcard names are processed in sorted order. With no wildcards the result is a
/// single empty binding set; if any wildcard has no values the result is empty.
fn wildcard_combinations(wildcards: &HashMap<String, Vec<String>>) -> Vec<HashMap<String, String>> {
    let mut names: Vec<&String> = wildcards.keys().collect();
    names.sort_unstable();

    let seed: Vec<HashMap<String, String>> = vec![HashMap::new()];
    names.into_iter().fold(seed, |partial, name| {
        // Values vary in the outer position, existing combinations in the inner one.
        let grown: Vec<HashMap<String, String>> = wildcards[name]
            .iter()
            .flat_map(|value| {
                partial.iter().map(move |combo| {
                    let mut extended = combo.clone();
                    extended.insert(name.clone(), value.clone());
                    extended
                })
            })
            .collect();
        grown
    })
}
/// Replaces `{name}` placeholders in `template` in a single left-to-right pass.
///
/// A placeholder `{name}` resolves, in this order, to:
/// 1. the wildcard binding called `name`;
/// 2. for names of the form `params.key`, the parameter `key`;
/// 3. the parameter called `name`.
///
/// Placeholders that resolve to nothing (for example shell constructs such as
/// `${HOME}` or `awk '{print $1}'`) are copied through unchanged.
///
/// Substituted values are never scanned again, so a value that itself contains
/// `{...}` cannot be expanded a second time and the result does not depend on
/// `HashMap` iteration order.
fn substitute(
    template: &str,
    bindings: &HashMap<String, String>,
    params: &HashMap<String, String>,
) -> String {
    fn lookup<'a>(
        name: &str,
        bindings: &'a HashMap<String, String>,
        params: &'a HashMap<String, String>,
    ) -> Option<&'a str> {
        bindings
            .get(name)
            .or_else(|| name.strip_prefix("params.").and_then(|key| params.get(key)))
            .or_else(|| params.get(name))
            .map(String::as_str)
    }

    let mut out = String::with_capacity(template.len());
    let mut rest = template;
    while let Some(close) = rest.find('}') {
        let head = &rest[..close];
        // The placeholder candidate starts at the innermost `{` before this `}` so that
        // input such as `{{sample}}` still resolves the inner `{sample}`.
        let replaced = head.rfind('{').and_then(|open| {
            lookup(&head[open + 1..], bindings, params).map(|value| (open, value))
        });
        match replaced {
            Some((open, value)) => {
                out.push_str(&head[..open]);
                out.push_str(value);
            }
            None => out.push_str(&rest[..=close]),
        }
        rest = &rest[close + 1..];
    }
    out.push_str(rest);
    out
}
/// Derives the id of the task created for `step_name` under the given wildcard bindings.
///
/// Without bindings the id is the step name itself; otherwise it is
/// `step[key=value,...]` with the `key=value` pairs sorted.
fn task_id(step_name: &str, bindings: &HashMap<String, String>) -> String {
    if bindings.is_empty() {
        return step_name.to_owned();
    }

    let mut pairs: Vec<String> = Vec::with_capacity(bindings.len());
    for (name, value) in bindings {
        pairs.push(format!("{name}={value}"));
    }
    pairs.sort_unstable();

    let joined = pairs.join(",");
    format!("{step_name}[{joined}]")
}
/// Reports whether the step's command, inputs or outputs mention any `{wildcard}`
/// placeholder for a wildcard declared in `wildcards`.
fn uses_wildcards(step: &StepDef, wildcards: &HashMap<String, Vec<String>>) -> bool {
    for name in wildcards.keys() {
        let placeholder = format!("{{{name}}}");
        let mentioned = std::iter::once(&step.cmd)
            .chain(&step.inputs)
            .chain(&step.outputs)
            .any(|text| text.contains(&placeholder));
        if mentioned {
            return true;
        }
    }
    false
}
pub fn expand(def: &WorkflowDef) -> Result<Vec<ConcreteTask>> {
let combos = wildcard_combinations(&def.wildcards);
let mut tasks: Vec<ConcreteTask> = Vec::new();
let mut step_tasks: HashMap<String, Vec<String>> = HashMap::new();
for step in &def.steps {
let wc = uses_wildcards(step, &def.wildcards);
if step.gather || !wc || combos.len() <= 1 {
let bindings: HashMap<String, String> = if !step.gather && wc && combos.len() == 1 {
combos[0].clone()
} else {
HashMap::new()
};
let id = if step.gather {
step.name.clone() } else {
task_id(&step.name, &bindings)
};
let deps = if step.gather {
step.depends_on
.iter()
.flat_map(|dep| step_tasks.get(dep).cloned().unwrap_or_default())
.collect()
} else {
step.depends_on
.iter()
.flat_map(|dep| {
step_tasks
.get(dep)
.cloned()
.unwrap_or_default()
.into_iter()
.filter(|t| bindings.is_empty() || *t == task_id(dep, &bindings))
})
.collect()
};
let t = ConcreteTask {
id: id.clone(),
step_name: step.name.clone(),
cmd: substitute(&step.cmd, &bindings, &def.params),
inputs: step
.inputs
.iter()
.map(|i| substitute(i, &bindings, &def.params))
.collect(),
outputs: step
.outputs
.iter()
.map(|o| substitute(o, &bindings, &def.params))
.collect(),
deps,
gather: step.gather,
};
step_tasks.entry(step.name.clone()).or_default().push(id);
tasks.push(t);
} else {
for bindings in &combos {
let id = task_id(&step.name, bindings);
let deps: Vec<String> = step
.depends_on
.iter()
.flat_map(|dep| {
step_tasks
.get(dep)
.cloned()
.unwrap_or_default()
.into_iter()
.filter(|t| *t == task_id(dep, bindings))
})
.collect();
let t = ConcreteTask {
id: id.clone(),
step_name: step.name.clone(),
cmd: substitute(&step.cmd, bindings, &def.params),
inputs: step
.inputs
.iter()
.map(|i| substitute(i, bindings, &def.params))
.collect(),
outputs: step
.outputs
.iter()
.map(|o| substitute(o, bindings, &def.params))
.collect(),
deps,
gather: false,
};
step_tasks.entry(step.name.clone()).or_default().push(id);
tasks.push(t);
}
}
}
let id_to_idx: HashMap<&str, usize> = tasks
.iter()
.enumerate()
.map(|(i, t)| (t.id.as_str(), i))
.collect();
for task in &tasks {
for dep in &task.deps {
if !id_to_idx.contains_key(dep.as_str()) {
return Err(OxoError::ExecutionError(format!(
"Step '{}' depends on unknown step/task '{dep}'",
task.step_name
)));
}
}
}
Ok(tasks)
}
/// Returns the modification time of `path`, or `None` when the metadata or the
/// timestamp cannot be obtained (for example because the path does not exist).
fn mtime(path: &str) -> Option<SystemTime> {
    match std::fs::metadata(path) {
        Ok(meta) => meta.modified().ok(),
        Err(_) => None,
    }
}
fn is_up_to_date(task: &ConcreteTask) -> bool {
if task.outputs.is_empty() {
return false; }
let Some(oldest_output) = task
.outputs
.iter()
.map(|o| mtime(o))
.collect::<Option<Vec<_>>>()
.and_then(|v| v.into_iter().min())
else {
return false; };
task.inputs
.iter()
.all(|i| mtime(i).is_none_or(|t| t <= oldest_output))
}
/// Runs the given tasks in dependency order, or only prints them when `dry_run` is set.
///
/// A task is ready once every id in its `deps` is in the `done` set. Ready tasks are
/// spawned onto a tokio `JoinSet`; the loop then waits for the next spawned task to
/// finish, records it as done, and looks for newly ready tasks again. Tasks that report
/// themselves as up to date are counted as skipped. In dry-run mode nothing is spawned:
/// ready tasks are printed and immediately marked done.
///
/// # Errors
/// Returns `OxoError::ExecutionError` when a spawned task cannot be joined, or when no
/// progress is possible while tasks remain unfinished (for example a dependency cycle).
/// An error returned by a task is propagated as soon as that task is joined.
#[cfg(not(target_arch = "wasm32"))]
pub async fn execute(tasks: Vec<ConcreteTask>, dry_run: bool) -> Result<()> {
use tokio::task::JoinSet;
if tasks.is_empty() {
println!("{}", "Workflow has no steps to execute.".yellow());
return Ok(());
}
let total = tasks.len();
// Banner.
println!();
println!(
"{} {} — {} task(s){}",
"◆".cyan().bold(),
"oxo workflow".bold(),
total,
if dry_run { " (dry-run)" } else { "" }
);
println!("{}", "─".repeat(60).dimmed());
// `done`: ids of finished tasks; `started`: ids already printed (dry-run) or spawned.
let mut done: HashSet<String> = HashSet::new();
let mut started: HashSet<String> = HashSet::new();
let mut skipped_count: usize = 0;
let mut join_set: JoinSet<Result<(String, bool /*skipped*/)>> = JoinSet::new();
// Counts dry-run passes that ended with unfinished tasks; it is never reset.
let mut iterations_without_progress = 0usize;
loop {
// Tasks not yet started whose dependencies are all done.
let newly_ready: Vec<&ConcreteTask> = tasks
.iter()
.filter(|t| !started.contains(&t.id) && t.deps.iter().all(|d| done.contains(d)))
.collect();
for task in newly_ready {
started.insert(task.id.clone());
if dry_run {
print_task_dry_run(task);
done.insert(task.id.clone());
} else {
let t = task.clone();
join_set.spawn(async move { run_single_task(t).await });
}
}
if dry_run {
if done.len() == total {
break;
}
// Give up once more passes than there are tasks have ended unfinished.
iterations_without_progress += 1;
if iterations_without_progress > total {
return Err(OxoError::ExecutionError(
"Workflow dry-run stalled: possible dependency cycle".to_string(),
));
}
continue;
}
if done.len() == total {
break;
}
match join_set.join_next().await {
// Nothing is running: either everything finished or the remaining tasks can never start.
None => {
if done.len() < total {
return Err(OxoError::ExecutionError(
"Workflow stalled: possible dependency cycle or missing step".to_string(),
));
}
break;
}
Some(result) => {
// First `?` handles the join error, second `?` the task's own error.
let (id, skipped) = result
.map_err(|e| OxoError::ExecutionError(format!("Task join error: {e}")))??;
if skipped {
skipped_count += 1;
println!(
" {} {} {}",
"↷".dimmed(),
id.dimmed(),
"(up to date)".dimmed()
);
} else {
println!(" {} {}", "✓".green().bold(), id.green());
}
done.insert(id);
}
}
}
println!("{}", "─".repeat(60).dimmed());
// `started` includes skipped tasks, so they are subtracted for the "run" figure below.
let run_count = started.len();
if dry_run {
println!(
"\n{} {} task(s) would execute",
"◆".cyan().bold(),
run_count
);
} else {
println!(
"\n{} Workflow complete ({} task(s) run, {} up to date)",
"✓".green().bold(),
run_count.saturating_sub(skipped_count),
skipped_count
);
}
Ok(())
}
/// Runs one task through `sh -c`, unless its outputs are already up to date.
///
/// Returns the task id together with a flag that is `true` when the task was skipped.
/// Before the command is started, the parent directory of every output is created.
///
/// # Errors
/// Fails when an output directory cannot be created, when the shell cannot be started,
/// or when the command exits unsuccessfully.
#[cfg(not(target_arch = "wasm32"))]
async fn run_single_task(task: ConcreteTask) -> Result<(String, bool)> {
    use tokio::process::Command;

    if is_up_to_date(&task) {
        return Ok((task.id, true));
    }

    println!(" {} {}", "▶".cyan().bold(), task.id.cyan().bold());
    println!(" {}", task.cmd.dimmed());

    let parent_dirs = task
        .outputs
        .iter()
        .filter_map(|out| Path::new(out).parent());
    for dir in parent_dirs {
        std::fs::create_dir_all(dir)?;
    }

    let mut shell = Command::new("sh");
    shell.arg("-c").arg(&task.cmd);
    let status = match shell.status().await {
        Ok(status) => status,
        Err(e) => {
            return Err(OxoError::ExecutionError(format!(
                "Failed to start task '{}': {e}",
                task.id
            )));
        }
    };

    if status.success() {
        return Ok((task.id, false));
    }
    // A missing exit code (e.g. terminated by a signal) is reported as -1.
    Err(OxoError::ExecutionError(format!(
        "Task '{}' failed with exit code {}",
        task.id,
        status.code().unwrap_or(-1)
    )))
}
/// Prints a dry-run description of a task: its id (marked `[gather]` for gather
/// tasks), its command, and any dependencies, inputs and outputs.
fn print_task_dry_run(task: &ConcreteTask) {
    let title = if task.gather {
        format!("{} [gather]", task.id)
    } else {
        task.id.clone()
    };
    println!(" {} {}", "▷".cyan(), title.cyan().bold());
    println!(" $ {}", task.cmd.dimmed());

    let has_deps = !task.deps.is_empty();
    let has_inputs = !task.inputs.is_empty();
    let has_outputs = !task.outputs.is_empty();
    if has_deps {
        let after = task.deps.join(", ");
        println!(" after: {}", after.dimmed());
    }
    if has_inputs {
        let inputs = task.inputs.join(" ");
        println!(" inputs: {}", inputs.dimmed());
    }
    if has_outputs {
        let outputs = task.outputs.join(" ");
        println!(" outputs: {}", outputs.dimmed());
    }
    println!();
}
/// Renders the workflow as Snakefile text.
///
/// The output contains a header comment, a `configfile` directive, one Python list per
/// wildcard (named after the upper-cased wildcard), a `rule all` targeting the outputs
/// of every step nothing else depends on, one rule per step, and, when the workflow
/// has parameters, a commented-out `config.yaml` template.
///
/// In shell commands `{wildcard}` becomes `{wildcards.wildcard}`, and both
/// `{params.key}` and a bare `{key}` naming a parameter (unless a wildcard has the same
/// name) become `{config[key]}`. The key is written without quotes because Snakemake
/// uses Python format-string indexing. Input and output patterns are emitted unchanged.
///
/// A rule's log path repeats the wildcards found in its outputs, since Snakemake
/// requires log files to carry the same wildcards as the output files.
pub fn to_snakemake(def: &WorkflowDef) -> String {
    let mut wc_keys: Vec<&String> = def.wildcards.keys().collect();
    wc_keys.sort();
    let mut param_keys: Vec<&String> = def.params.keys().collect();
    param_keys.sort();

    let mut out = String::new();
    out.push_str(&format!(
        "# Generated by oxo-call — workflow '{}'\n",
        def.workflow.name
    ));
    if !def.workflow.description.is_empty() {
        out.push_str(&format!("# {}\n", def.workflow.description));
    }
    out.push_str("# Edit this file or re-run 'oxo-call workflow export' to regenerate.\n\n");
    out.push_str("configfile: \"config.yaml\"\n\n");

    if !wc_keys.is_empty() {
        out.push_str("# ── Wildcard sample lists ─────────────────────────────────────────────\n");
        for k in &wc_keys {
            let quoted: Vec<String> = def.wildcards[*k]
                .iter()
                .map(|v| format!("\"{v}\""))
                .collect();
            out.push_str(&format!("{} = [{}]\n", k.to_uppercase(), quoted.join(", ")));
        }
        out.push('\n');
    }

    // Leaf steps are the ones no other step depends on; their outputs are the targets.
    let depended_on: HashSet<&str> = def
        .steps
        .iter()
        .flat_map(|s| s.depends_on.iter().map(|d| d.as_str()))
        .collect();
    let expand_args: Vec<String> = wc_keys
        .iter()
        .map(|k| format!("{k}={ku}", ku = k.to_uppercase()))
        .collect();
    let expand_args = expand_args.join(", ");

    // Directive entries are indented one level deeper than the directive keyword.
    out.push_str("rule all:\n    input:\n");
    for step in def
        .steps
        .iter()
        .filter(|s| !depended_on.contains(s.name.as_str()))
    {
        let fans_out = !step.gather && uses_wildcards(step, &def.wildcards);
        for out_pat in &step.outputs {
            if fans_out {
                out.push_str(&format!(
                    "        expand(\"{out_pat}\", {expand_args}),\n"
                ));
            } else {
                out.push_str(&format!("        \"{out_pat}\",\n"));
            }
        }
    }
    out.push('\n');

    for step in &def.steps {
        out.push_str(&format!("\nrule {}:\n", step.name));
        if !step.inputs.is_empty() {
            out.push_str("    input:\n");
            for inp in &step.inputs {
                out.push_str(&format!("        \"{inp}\",\n"));
            }
        }
        if !step.outputs.is_empty() {
            out.push_str("    output:\n");
            for outp in &step.outputs {
                out.push_str(&format!("        \"{outp}\",\n"));
            }
        }

        // The log path must mention every wildcard that appears in the outputs.
        let log_wildcards: String = wc_keys
            .iter()
            .filter(|k| {
                let pat = format!("{{{k}}}");
                step.outputs.iter().any(|o| o.contains(&pat))
            })
            .map(|k| format!(".{{{k}}}"))
            .collect();
        out.push_str(&format!(
            "    log: \"logs/{}{}.log\"\n",
            step.name, log_wildcards
        ));

        out.push_str("    shell:\n");
        let mut cmd = step.cmd.clone();
        for k in &wc_keys {
            cmd = cmd.replace(&format!("{{{k}}}"), &format!("{{wildcards.{k}}}"));
        }
        for k in &param_keys {
            let config_ref = format!("{{config[{k}]}}");
            cmd = cmd.replace(&format!("{{params.{k}}}"), &config_ref);
            // Bare `{key}` is accepted by the native runner as well; wildcards win on clashes.
            if !def.wildcards.contains_key(k.as_str()) {
                cmd = cmd.replace(&format!("{{{k}}}"), &config_ref);
            }
        }
        let cmd_escaped = cmd.replace('"', "\\\"");
        out.push_str(&format!("        \"{cmd_escaped}\"\n"));
    }

    if !param_keys.is_empty() {
        out.push_str(
            "\n# ─── config.yaml (create this file next to your Snakefile) ───────────────\n",
        );
        out.push_str("# ");
        for k in &param_keys {
            out.push_str(&format!("{k}: \"{}\"\n# ", def.params[*k]));
        }
        out.push('\n');
    }
    out
}
/// Renders the workflow as Nextflow DSL2 script text.
///
/// The output declares one `params.<key>` per workflow parameter, a sample channel read
/// from `params.samplesheet` when wildcards exist, one process per step (named after
/// the upper-cased step name) and a `workflow` block that invokes the processes in
/// declaration order; gather steps receive the collected channel.
///
/// In process scripts `{wildcard}` becomes `${wildcard}`, and both `{params.key}` and a
/// bare `{key}` naming a parameter (unless a wildcard has the same name) become
/// `${params.key}`. Input and output patterns are emitted unchanged.
pub fn to_nextflow(def: &WorkflowDef) -> String {
    let mut wc_keys: Vec<&String> = def.wildcards.keys().collect();
    wc_keys.sort();
    let mut param_keys: Vec<&String> = def.params.keys().collect();
    param_keys.sort();
    let has_wildcards = !wc_keys.is_empty();

    let mut out = String::new();
    out.push_str("#!/usr/bin/env nextflow\n");
    out.push_str(&format!(
        "// Generated by oxo-call — workflow '{}'\n",
        def.workflow.name
    ));
    if !def.workflow.description.is_empty() {
        out.push_str(&format!("// {}\n", def.workflow.description));
    }
    out.push_str("// Edit this file or re-run 'oxo-call workflow export' to regenerate.\n\n");
    out.push_str("nextflow.enable.dsl = 2\n\n");

    for k in &param_keys {
        let v = &def.params[*k];
        out.push_str(&format!("params.{k} = \"{v}\"\n"));
    }
    if has_wildcards {
        out.push_str("params.samplesheet = \"samplesheet.csv\"\n");
    }
    out.push('\n');

    if has_wildcards {
        out.push_str(
            "// ── Sample channel ──────────────────────────────────────────────────────\n",
        );
        out.push_str("Channel\n");
        out.push_str(" .fromPath(params.samplesheet)\n");
        out.push_str(" .splitCsv(header: true)\n");
        let fields: Vec<String> = wc_keys.iter().map(|k| format!("row.{k}")).collect();
        out.push_str(&format!(
            " .map {{ row -> tuple({}) }}\n",
            fields.join(", ")
        ));
        out.push_str(" .set { samples_ch }\n\n");
    }

    for step in &def.steps {
        out.push_str(&format!("process {} {{\n", step.name.to_uppercase()));
        if !step.inputs.is_empty() {
            out.push_str(" input:\n");
            for inp in &step.inputs {
                out.push_str(&format!(" path '{}'\n", inp));
            }
        }
        if !step.outputs.is_empty() {
            out.push_str(" output:\n");
            for outp in &step.outputs {
                out.push_str(&format!(" path '{}'\n", outp));
            }
        }
        out.push_str("\n script:\n \"\"\"\n");

        let mut cmd = step.cmd.clone();
        for k in &wc_keys {
            cmd = cmd.replace(&format!("{{{k}}}"), &format!("${{{k}}}"));
        }
        for k in &param_keys {
            let param_ref = format!("${{params.{k}}}");
            cmd = cmd.replace(&format!("{{params.{k}}}"), &param_ref);
            // Bare `{key}` is accepted by the native runner as well; wildcards win on clashes.
            if !def.wildcards.contains_key(k.as_str()) {
                cmd = cmd.replace(&format!("{{{k}}}"), &param_ref);
            }
        }
        out.push_str(&format!(" {cmd}\n"));
        out.push_str(" \"\"\"\n}\n\n");
    }

    out.push_str("workflow {\n");
    if has_wildcards {
        out.push_str(" ch = samples_ch\n");
    }
    for step in &def.steps {
        let process = step.name.to_uppercase();
        if !has_wildcards {
            out.push_str(&format!(" {process}()\n"));
        } else if step.gather {
            out.push_str(&format!(" {process}(ch.collect())\n"));
        } else {
            out.push_str(&format!(" ch = {process}(ch)\n"));
        }
    }
    out.push_str("}\n");
    out
}