use crate::cli::download_cache::{
ChecksumPolicy, ChecksumPolicyType, DownloadCache, DownloadOptions,
};
use crate::cli::syft_url::SyftURL;
use crate::error::Error;
use anyhow::{anyhow, Context};
use colored::Colorize;
use dialoguer::Confirm;
use serde::{Deserialize, Serialize};
use serde_yaml::Value as YamlValue;
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use tracing::{debug, info};
/// Contents of a project's `project.yaml`, as deserialized by `execute`.
#[derive(Debug, Serialize, Deserialize)]
struct ProjectConfig {
/// Project name; reported in the log line emitted before the workflow runs.
name: String,
/// Project author. Required by the schema but not read in this part of the file.
author: String,
/// Workflow name; reported in the log line emitted before the workflow runs.
workflow: String,
/// Asset directories relative to the project folder. The YAML value may be a
/// single string or a list of strings; a missing key yields an empty list.
/// `execute` only uses the first entry.
#[serde(default, deserialize_with = "deserialize_string_or_vec")]
assets: Vec<String>,
/// Participant entries; a missing key yields an empty list.
#[serde(default)]
participants: Vec<String>,
}
fn deserialize_string_or_vec<'de, D>(deserializer: D) -> std::result::Result<Vec<String>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::{self, Visitor};
use std::fmt;
struct StringOrVec;
impl<'de> Visitor<'de> for StringOrVec {
type Value = Vec<String>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("string or list of strings")
}
fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
where
E: de::Error,
{
Ok(vec![value.to_string()])
}
fn visit_seq<A>(self, mut seq: A) -> std::result::Result<Self::Value, A::Error>
where
A: de::SeqAccess<'de>,
{
let mut vec = Vec::new();
while let Some(value) = seq.next_element()? {
vec.push(value);
}
Ok(vec)
}
}
deserializer.deserialize_any(StringOrVec)
}
/// File locations (and optional checksums) needed to process one participant.
///
/// Deserialized from a participant entry (or its `mock` sub-entry) in the
/// participant YAML. Each location may be an `http(s)://` URL or a local path;
/// `ensure_files_exist` returns a copy in which remote URLs have been replaced
/// by local paths.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParticipantData {
/// Participant identifier. Defaults to empty when deserialized;
/// `extract_participant_data` overwrites it with the ID taken from the fragment.
#[serde(default)]
pub id: String,
/// Reference version label; passed to Nextflow as `--ref_version` and
/// lower-cased to build the mock-data key.
pub ref_version: String,
/// Reference file location. Stored under the YAML key `ref`.
#[serde(rename = "ref")]
pub ref_path: String,
/// Reference index file location.
pub ref_index: String,
/// Aligned-reads file location.
pub aligned: String,
/// Aligned-reads index file location.
pub aligned_index: String,
/// Optional checksum of the reference file. When present it is used as the
/// file name under the cache's `by-hash` directory and as the required
/// expected hash for downloads. Presumably a BLAKE3 digest — confirm against
/// `DownloadCache`.
#[serde(skip_serializing_if = "Option::is_none")]
pub ref_b3sum: Option<String>,
/// Optional checksum of the reference index file (same role as `ref_b3sum`).
#[serde(skip_serializing_if = "Option::is_none")]
pub ref_index_b3sum: Option<String>,
/// Optional checksum of the aligned-reads file (same role as `ref_b3sum`).
#[serde(skip_serializing_if = "Option::is_none")]
pub aligned_b3sum: Option<String>,
/// Optional checksum of the aligned-reads index file (same role as `ref_b3sum`).
#[serde(skip_serializing_if = "Option::is_none")]
pub aligned_index_b3sum: Option<String>,
}
/// Arguments accepted by `execute`.
pub struct RunParams {
/// Path to the project folder; must contain `project.yaml` and `workflow.nf`.
pub project_folder: String,
/// Where to find the participant definition: a `syft://` URL, an `http(s)://`
/// URL, a sample-data ID, or a local file path (optionally with a
/// `#participants.ID` fragment).
pub participant_source: String,
/// When true, the participant's `mock` entry is used if one exists.
pub test: bool,
/// When true, required remote files are downloaded without prompting.
pub download: bool,
/// When true, the Nextflow command is printed but not executed.
pub dry_run: bool,
/// When true, `-with-docker` is appended to the Nextflow command.
pub with_docker: bool,
/// Optional value forwarded to Nextflow as `-work-dir`.
pub work_dir: Option<String>,
/// When true, `-resume` is appended to the Nextflow command.
pub resume: bool,
}
/// The kinds of location a participant definition can be loaded from.
enum ParticipantSource {
    /// A file on the local filesystem, plus the optional text that followed a
    /// `#` in the original source string.
    LocalFile(PathBuf, Option<String>),
    /// A parsed `syft://` URL.
    SyftUrl(SyftURL),
    /// An `http://` or `https://` URL, stored verbatim (including any `#fragment`).
    HttpUrl(String),
    /// A key of `sample_data_urls` in the embedded `sample_data.yaml`.
    SampleDataId(String),
}
impl ParticipantSource {
    /// Classifies a user-supplied source string.
    ///
    /// Resolution order:
    /// 1. `syft://…` is parsed as a `SyftURL` (parse errors are propagated).
    /// 2. `http://…` / `https://…` is kept verbatim as an HTTP URL.
    /// 3. A string equal to a key of `sample_data_urls` in the embedded
    ///    `sample_data.yaml` is a sample-data ID. If the embedded YAML cannot be
    ///    parsed, this step is silently skipped.
    /// 4. Anything else is a local path, split at the first `#` into the path
    ///    and an optional fragment.
    fn parse(source: &str) -> anyhow::Result<Self> {
        if source.starts_with("syft://") {
            return Ok(ParticipantSource::SyftUrl(SyftURL::parse(source)?));
        }

        let is_http = ["http://", "https://"]
            .iter()
            .any(|scheme| source.starts_with(*scheme));
        if is_http {
            return Ok(ParticipantSource::HttpUrl(source.to_string()));
        }

        // Only the key set matters here, so entry values stay untyped.
        #[derive(serde::Deserialize)]
        struct SampleDataConfig {
            sample_data_urls: std::collections::HashMap<String, serde_yaml::Value>,
        }

        let is_sample_id =
            serde_yaml::from_str::<SampleDataConfig>(include_str!("../../sample_data.yaml"))
                .map(|cfg| cfg.sample_data_urls.contains_key(source))
                .unwrap_or(false);
        if is_sample_id {
            return Ok(ParticipantSource::SampleDataId(source.to_string()));
        }

        let (path, fragment) = match source.split_once('#') {
            Some((before, after)) => (before, Some(after.to_string())),
            None => (source, None),
        };
        Ok(ParticipantSource::LocalFile(PathBuf::from(path), fragment))
    }
}
async fn fetch_participant_file(
source: &ParticipantSource,
) -> anyhow::Result<(String, Option<String>)> {
match source {
ParticipantSource::LocalFile(path, fragment) => {
if !path.exists() {
return Err(anyhow!("Local file not found: {:?}", path));
}
let content = fs::read_to_string(path)
.with_context(|| format!("Failed to read file: {:?}", path))?;
Ok((content, fragment.clone()))
}
ParticipantSource::SyftUrl(syft_url) => {
let http_url = syft_url.to_http_relay_url("syftbox.net");
fetch_http_content(&http_url)
.await
.map(|content| (content, syft_url.fragment.clone()))
}
ParticipantSource::HttpUrl(url) => {
let (main_url, fragment) = if let Some(hash_pos) = url.find('#') {
(
url[..hash_pos].to_string(),
Some(url[hash_pos + 1..].to_string()),
)
} else {
(url.clone(), None)
};
fetch_http_content(&main_url)
.await
.map(|content| (content, fragment))
}
ParticipantSource::SampleDataId(sample_id) => {
crate::cli::commands::sample_data::fetch(Some(vec![sample_id.clone()]), false, true)
.await?;
#[derive(serde::Deserialize)]
struct SampleEntry {
ref_version: String,
#[serde(rename = "ref")]
ref_url: String,
ref_index: String,
aligned: serde_yaml::Value,
aligned_index: String,
}
#[derive(serde::Deserialize)]
struct SampleDataConfig {
sample_data_urls: std::collections::HashMap<String, SampleEntry>,
}
let sample_yaml = include_str!("../../sample_data.yaml");
let cfg: SampleDataConfig = serde_yaml::from_str(sample_yaml)
.context("Failed to parse embedded sample data configuration")?;
let entry = cfg
.sample_data_urls
.get(sample_id)
.ok_or_else(|| anyhow!("Sample data '{}' not found", sample_id))?;
let biovault_home = crate::config::get_biovault_home()?;
let sample_data_dir = biovault_home.join("data").join("sample");
let reference_dir = sample_data_dir.join("reference");
let participant_dir = sample_data_dir.join(sample_id);
fn filename_from_url(url: &str) -> String {
url.rsplit('/')
.next()
.unwrap_or("")
.split('#')
.next()
.unwrap()
.split('?')
.next()
.unwrap()
.to_string()
}
let ref_filename = filename_from_url(&entry.ref_url);
let ref_index_filename = filename_from_url(&entry.ref_index);
let aligned_abs_path = match &entry.aligned {
serde_yaml::Value::String(url) => participant_dir.join(filename_from_url(url)),
serde_yaml::Value::Sequence(seq) if !seq.is_empty() => {
if let Some(serde_yaml::Value::String(first_url)) = seq.first() {
let first_name = filename_from_url(first_url);
let base_name = if first_name.ends_with(".tar.gz.aa") {
first_name.trim_end_matches(".aa").to_string()
} else {
first_name
};
let cram_name = base_name.trim_end_matches(".tar.gz").to_string();
participant_dir.join(cram_name)
} else {
anyhow::bail!("Invalid aligned URL list in sample data");
}
}
_ => anyhow::bail!("Invalid 'aligned' field in sample data"),
};
let aligned_index_abs = if !entry.aligned_index.is_empty() {
participant_dir.join(filename_from_url(&entry.aligned_index))
} else {
PathBuf::new()
};
let mut yaml = String::new();
yaml.push_str("participants:\n");
yaml.push_str(&format!(" {}:\n", sample_id));
yaml.push_str(&format!(" ref_version: {}\n", entry.ref_version));
yaml.push_str(&format!(
" ref: {}\n",
reference_dir.join(ref_filename).to_string_lossy()
));
yaml.push_str(&format!(
" ref_index: {}\n",
reference_dir.join(ref_index_filename).to_string_lossy()
));
yaml.push_str(&format!(
" aligned: {}\n",
aligned_abs_path.to_string_lossy()
));
if !aligned_index_abs.as_os_str().is_empty() {
yaml.push_str(&format!(
" aligned_index: {}\n",
aligned_index_abs.to_string_lossy()
));
} else {
yaml.push_str(" aligned_index: \n");
}
Ok((yaml, Some(format!("participants.{}", sample_id))))
}
}
}
/// Performs an HTTP GET on `url` and returns the response body as text.
///
/// Prints the URL being fetched before the request is made.
///
/// # Errors
/// Fails when the request cannot be sent, when the response status is not a
/// success code, or when the body cannot be read.
async fn fetch_http_content(url: &str) -> anyhow::Result<String> {
    println!("Fetching participant file from: {}", url.cyan());

    let response = reqwest::get(url)
        .await
        .with_context(|| format!("Failed to fetch URL: {}", url))?;

    let status = response.status();
    if status.is_success() {
        response
            .text()
            .await
            .with_context(|| format!("Failed to read response from: {}", url))
    } else {
        Err(anyhow!("HTTP request failed with status: {}", status))
    }
}
/// Picks one participant out of a participant YAML document.
///
/// `fragment` must have the form `participants.ID`; the entry is looked up at
/// `participants.<ID>` in the document. When `use_mock` is set and the entry has
/// a `mock` sub-entry, that sub-entry is returned instead, together with a key
/// of the form `mock_data_<lowercased ref_version>`. If `use_mock` is set but no
/// `mock` entry exists, a warning is printed and the real entry is used.
///
/// Returns the participant (with `id` set to the fragment's ID) and the mock
/// key, which is `None` for real data.
///
/// # Errors
/// Fails on invalid YAML, a missing or malformed fragment, an unknown
/// participant, or an entry that does not match `ParticipantData`.
fn extract_participant_data(
    yaml_content: &str,
    fragment: Option<String>,
    use_mock: bool,
) -> anyhow::Result<(ParticipantData, Option<String>)> {
    let document: YamlValue =
        serde_yaml::from_str(yaml_content).context("Failed to parse participant YAML")?;

    let participant_id = match fragment.as_deref() {
        None => return Err(anyhow!("No participant specified in fragment")),
        Some(frag) => match frag.strip_prefix("participants.") {
            Some(id) => id.to_string(),
            None => {
                return Err(anyhow!(
                    "Invalid fragment format. Expected: participants.ID"
                ))
            }
        },
    };

    let entry = document
        .get("participants")
        .and_then(|all| all.get(&participant_id))
        .ok_or_else(|| anyhow!("Participant '{}' not found", participant_id))?;

    if use_mock {
        match entry.get("mock") {
            Some(mock_entry) => {
                let mut mock: ParticipantData = serde_yaml::from_value(mock_entry.clone())
                    .context("Failed to parse mock data")?;
                mock.id = participant_id;
                let key = format!("mock_data_{}", mock.ref_version.to_lowercase());
                return Ok((mock, Some(key)));
            }
            None => println!(
                "{}",
                "Warning: --test flag set but no mock data available for this participant".yellow()
            ),
        }
    }

    let mut real: ParticipantData = serde_yaml::from_value(entry.clone())
        .with_context(|| format!("Failed to parse participant data for '{}'", participant_id))?;
    real.id = participant_id;
    Ok((real, None))
}
async fn ensure_files_exist(
participant: &ParticipantData,
auto_download: bool,
source: &ParticipantSource,
mock_key: Option<&str>,
) -> anyhow::Result<ParticipantData> {
let mut local_participant = participant.clone();
let mut cache = DownloadCache::new(None)?;
let cache_base = crate::config::get_cache_dir()?;
let biovault_home = crate::config::get_biovault_home()?;
let downloads_base = biovault_home.join("data").join("downloads");
let participant_downloads_dir = match source {
ParticipantSource::SyftUrl(syft_url) => {
if let Some(mock_key) = mock_key {
downloads_base.join(&syft_url.email).join(mock_key)
} else {
downloads_base.join(&syft_url.email).join(&participant.id)
}
}
ParticipantSource::HttpUrl(url) => {
if let Ok(parsed_url) = reqwest::Url::parse(url) {
if let Some(host) = parsed_url.host_str() {
if host.contains("syftbox") && url.contains("/datasites/") {
if let Some(email_start) = url.find("/datasites/") {
let after_datasites = &url[email_start + 11..];
if let Some(slash_pos) = after_datasites.find('/') {
let email = &after_datasites[..slash_pos];
if let Some(mock_key) = mock_key {
downloads_base.join(email).join(mock_key)
} else {
downloads_base.join(email).join(&participant.id)
}
} else {
downloads_base.join(&participant.id)
}
} else {
downloads_base.join(&participant.id)
}
} else {
downloads_base.join(&participant.id)
}
} else {
downloads_base.join(&participant.id)
}
} else {
downloads_base.join(&participant.id)
}
}
ParticipantSource::LocalFile(path, _) => {
let filename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("local");
if let Some(mock_key) = mock_key {
downloads_base.join(filename).join(mock_key)
} else {
downloads_base.join(filename).join(&participant.id)
}
}
ParticipantSource::SampleDataId(_) => {
downloads_base.join("sample").join(&participant.id)
}
};
fs::create_dir_all(&participant_downloads_dir)?;
fn extract_filename(url: &str) -> String {
url.split('/').next_back().unwrap_or("unknown").to_string()
}
let files_to_check = vec![
(
"reference",
participant.ref_path.clone(),
participant.ref_b3sum.clone(),
),
(
"reference index",
participant.ref_index.clone(),
participant.ref_index_b3sum.clone(),
),
(
"aligned",
participant.aligned.clone(),
participant.aligned_b3sum.clone(),
),
(
"aligned index",
participant.aligned_index.clone(),
participant.aligned_index_b3sum.clone(),
),
];
let mut downloads_needed = Vec::new();
let mut cached_paths: HashMap<String, (PathBuf, String)> = HashMap::new();
for (name, url, b3sum) in &files_to_check {
if url.starts_with("http://") || url.starts_with("https://") {
if let Some(checksum) = b3sum {
let cache_path = cache_base.join("by-hash").join(checksum);
if cache_path.exists() {
debug!("Found {} in cache: {:?}", name, cache_path);
let filename = extract_filename(url);
cached_paths.insert(name.to_string(), (cache_path, filename));
continue;
}
}
downloads_needed.push((name.to_string(), url.clone(), b3sum.clone()));
} else {
if !Path::new(url).exists() {
return Err(anyhow!("Local file not found: {} at {}", name, url));
}
match *name {
"reference" => local_participant.ref_path = url.clone(),
"reference index" => local_participant.ref_index = url.clone(),
"aligned" => local_participant.aligned = url.clone(),
"aligned index" => local_participant.aligned_index = url.clone(),
_ => {}
}
}
}
if !cached_paths.is_empty() {
println!("Using cached files with proper filenames:");
}
for (name, (cache_path, filename)) in cached_paths {
let symlink_path = participant_downloads_dir.join(&filename);
if symlink_path.exists() || symlink_path.is_symlink() {
fs::remove_file(&symlink_path).ok();
}
#[cfg(unix)]
{
std::os::unix::fs::symlink(&cache_path, &symlink_path)
.with_context(|| format!("Failed to create symlink for {}", name))?;
}
#[cfg(windows)]
{
std::os::windows::fs::symlink_file(&cache_path, &symlink_path)
.with_context(|| format!("Failed to create symlink for {}", name))?;
}
println!(" • {} → {}", name.green(), symlink_path.display());
match name.as_str() {
"reference" => local_participant.ref_path = symlink_path.to_string_lossy().to_string(),
"reference index" => {
local_participant.ref_index = symlink_path.to_string_lossy().to_string()
}
"aligned" => local_participant.aligned = symlink_path.to_string_lossy().to_string(),
"aligned index" => {
local_participant.aligned_index = symlink_path.to_string_lossy().to_string()
}
_ => {}
}
}
if !downloads_needed.is_empty() {
println!("\nThe following files need to be downloaded:");
for (name, url, _) in &downloads_needed {
println!(" - {} from {}", name.cyan(), url);
}
let should_download = if auto_download {
true
} else {
Confirm::new()
.with_prompt("Do you want to download these files?")
.default(true)
.interact()?
};
if !should_download {
return Err(anyhow!("File downloads cancelled by user"));
}
for (name, url, b3sum) in &downloads_needed {
println!("Downloading {}...", name.green());
let filename = extract_filename(url);
let symlink_path = participant_downloads_dir.join(&filename);
let temp_dir = tempfile::tempdir()?;
let temp_path = temp_dir.path().join(&filename);
let checksum_policy = if let Some(hash) = b3sum {
ChecksumPolicy {
policy_type: ChecksumPolicyType::Required,
expected_hash: Some(hash.clone()),
}
} else {
ChecksumPolicy {
policy_type: ChecksumPolicyType::Optional,
expected_hash: None,
}
};
let options = DownloadOptions {
checksum_policy,
show_progress: true,
cache_strategy: Default::default(),
};
let _downloaded_path = cache.download_with_cache(url, &temp_path, options).await?;
if let Some(hash) = b3sum {
let cache_path = crate::config::get_cache_dir()?.join("by-hash").join(hash);
if symlink_path.exists() || symlink_path.is_symlink() {
fs::remove_file(&symlink_path).ok();
}
#[cfg(unix)]
{
std::os::unix::fs::symlink(&cache_path, &symlink_path)
.with_context(|| format!("Failed to create symlink for {}", name))?;
}
#[cfg(windows)]
{
std::os::windows::fs::symlink_file(&cache_path, &symlink_path)
.with_context(|| format!("Failed to create symlink for {}", name))?;
}
println!(" • {} → {}", name.green(), symlink_path.display());
} else {
fs::copy(&temp_path, &symlink_path)?;
println!(" • {} → {}", name.green(), symlink_path.display());
}
match name.as_str() {
"reference" => {
local_participant.ref_path = symlink_path.to_string_lossy().to_string()
}
"reference index" => {
local_participant.ref_index = symlink_path.to_string_lossy().to_string()
}
"aligned" => local_participant.aligned = symlink_path.to_string_lossy().to_string(),
"aligned index" => {
local_participant.aligned_index = symlink_path.to_string_lossy().to_string()
}
_ => {}
}
}
}
Ok(local_participant)
}
/// Runs a project's Nextflow workflow for a single participant.
///
/// Steps:
/// 1. Validate that the project folder, `project.yaml` and `workflow.nf` exist.
/// 2. Load the project configuration and resolve the participant from
///    `params.participant_source` (using mock data when `params.test` is set).
/// 3. Make all participant files available locally, downloading if necessary.
/// 4. Build the `nextflow run` command around the shared `template.nf` /
///    `nextflow.config` found in `<biovault home>/env/default`, print it, and —
///    unless `params.dry_run` is set — execute it inside the project folder.
///
/// # Errors
/// Returns `Error::ProjectFolderMissing`, `Error::ProjectConfigMissing`,
/// `Error::WorkflowMissing` or `Error::TemplatesNotFound` for missing inputs,
/// and propagates failures from participant resolution, file preparation and
/// the Nextflow process (including a non-success exit status).
pub async fn execute(params: RunParams) -> anyhow::Result<()> {
    let project_path = PathBuf::from(&params.project_folder);
    if !project_path.exists() {
        return Err(Error::ProjectFolderMissing(params.project_folder.clone()).into());
    }
    let project_yaml = project_path.join("project.yaml");
    if !project_yaml.exists() {
        return Err(Error::ProjectConfigMissing(params.project_folder.clone()).into());
    }
    // Check existence before canonicalizing: `canonicalize` itself fails on a
    // missing path, which would hide the dedicated `WorkflowMissing` error.
    let workflow_path = project_path.join("workflow.nf");
    if !workflow_path.exists() {
        return Err(Error::WorkflowMissing(params.project_folder.clone()).into());
    }
    let workflow_file = workflow_path
        .canonicalize()
        .context("Failed to resolve workflow.nf path")?;

    let config_content =
        fs::read_to_string(&project_yaml).context("Failed to read project.yaml")?;
    let config: ProjectConfig =
        serde_yaml::from_str(&config_content).context("Failed to parse project.yaml")?;

    let source = ParticipantSource::parse(&params.participant_source)?;
    let (yaml_content, fragment) = fetch_participant_file(&source).await?;
    let (participant, mock_key) =
        extract_participant_data(&yaml_content, fragment, params.test)?;
    let participant =
        ensure_files_exist(&participant, params.download, &source, mock_key.as_deref()).await?;

    let biovault_home = crate::config::get_biovault_home()?;
    let env_dir = biovault_home.join("env").join("default");
    let template_nf = env_dir.join("template.nf");
    let nextflow_config = env_dir.join("nextflow.config");
    if !template_nf.exists() || !nextflow_config.exists() {
        return Err(Error::TemplatesNotFound.into());
    }

    // Only the first configured assets entry is used; default to `assets/`.
    let assets_dir = match config.assets.first() {
        Some(first) => project_path.join(first),
        None => project_path.join("assets"),
    };
    if !assets_dir.exists() {
        fs::create_dir_all(&assets_dir)?;
    }
    let assets_dir = assets_dir
        .canonicalize()
        .context("Failed to resolve assets directory path")?;

    let results_dir = project_path.join("results").join(&participant.id);
    if !results_dir.exists() {
        fs::create_dir_all(&results_dir)?;
    }
    let results_dir = results_dir
        .canonicalize()
        .context("Failed to resolve results directory path")?;

    info!(
        "Running workflow '{}' from project '{}'",
        config.workflow, config.name
    );
    println!("Processing participant: {}", participant.id.cyan());

    let mut cmd = Command::new("nextflow");
    cmd.current_dir(&project_path);
    cmd.arg("run")
        .arg(&template_nf)
        .arg("--participant_id")
        .arg(&participant.id)
        .arg("--ref_version")
        .arg(&participant.ref_version)
        .arg("--ref")
        .arg(&participant.ref_path)
        .arg("--ref_index")
        .arg(&participant.ref_index)
        .arg("--aligned")
        .arg(&participant.aligned)
        .arg("--aligned_index")
        .arg(&participant.aligned_index)
        .arg("--work_flow_file")
        .arg(workflow_file.to_string_lossy().as_ref())
        .arg("--assets_dir")
        .arg(assets_dir.to_string_lossy().as_ref())
        .arg("--results_dir")
        .arg(results_dir.to_string_lossy().as_ref());
    if params.resume {
        cmd.arg("-resume");
    }
    if let Some(work_dir) = &params.work_dir {
        cmd.arg("-work-dir").arg(work_dir);
    }
    if params.with_docker {
        cmd.arg("-with-docker");
    }
    cmd.arg("-c").arg(&nextflow_config);

    // Render the command for display only; arguments containing spaces are
    // wrapped in single quotes so the printed line stays readable.
    println!("\n{}", "Nextflow command:".green().bold());
    let mut cmd_str = String::from("nextflow");
    for arg in cmd.get_args() {
        cmd_str.push(' ');
        let arg_str = arg.to_string_lossy();
        if arg_str.contains(' ') {
            cmd_str.push_str(&format!("'{}'", arg_str));
        } else {
            cmd_str.push_str(&arg_str);
        }
    }
    println!("{}\n", cmd_str.cyan());

    if params.dry_run {
        println!("{}", "[DRY RUN] Would execute the above command".yellow());
        return Ok(());
    }

    println!("Executing Nextflow workflow...");
    let status = cmd.status().context("Failed to execute Nextflow")?;
    if !status.success() {
        return Err(anyhow!("Nextflow execution failed"));
    }
    println!("{}", "Workflow completed successfully!".green().bold());
    Ok(())
}