biovault 0.1.110

A bioinformatics data vault CLI tool
Documentation
nextflow.enable.dsl=2

// ------------------------------------------------------------------
// Params
// ------------------------------------------------------------------

// samplesheet file (supports csv and tsv)
params.samplesheet     = params.samplesheet     ?: 'participants.csv'
// schema yaml file
params.schema_yaml     = params.schema_yaml     ?: 'schema.yaml'
// workflow file (no default: must be supplied by the caller)
params.work_flow_file  = params.work_flow_file  ?: null
// assets directory (no default: must be supplied by the caller)
params.assets_dir      = params.assets_dir      ?: null
// results directory
params.results_dir     = params.results_dir     ?: 'results'

// Fail fast with a message that names the missing parameter. Both values are
// passed straight to file()/Channel.fromPath() below, which reject null without
// saying which parameter was responsible.
if (!params.work_flow_file) {
    throw new IllegalArgumentException("Missing required parameter: --work_flow_file")
}
if (!params.assets_dir) {
    throw new IllegalArgumentException("Missing required parameter: --assets_dir")
}

// Channels for simple passthrough values/files
sample_sheet_ch = Channel.fromPath(params.samplesheet)
assets_dir_ch = Channel.value( file(params.assets_dir) )
work_flow_file  = Channel.fromPath(params.work_flow_file)

// ------------------------------------------------------------------
// Schema (YAML) loader
// schema.yaml keys supported:
//   required: [list of col names]
//   rename:   { csv_header_name: canonical_name, ... }
//   defaults: { col: default_value, ... }
//   types:    { col: string|int|float|bool|path, ... }
// ------------------------------------------------------------------

// NOTE(review): `new Yaml()` uses SnakeYAML's default constructor. If the schema
// file can come from an untrusted source, consider a safe constructor instead —
// confirm against the SnakeYAML version bundled with the Nextflow runtime in use.
def yaml = new org.yaml.snakeyaml.Yaml()

/*
Example schema.yaml
required:
  - participant_id
  - genotype_file_path

rename:
  geno_path: genotype_file_path

defaults:
  age: 40

types:
  participant_id: string
  genotype_file_path: path
  weight: float
  height: float
  age: int
*/

// Parse the schema. An empty file parses to null and is treated as "no schema".
// Anything that is not a mapping at the top level is rejected here, because the
// rest of the script reads schema.rename / .defaults / .required / .types.
def loadedSchema = yaml.load(file(params.schema_yaml).text)
if (loadedSchema != null && !(loadedSchema instanceof Map)) {
    throw new IllegalArgumentException("Schema file must contain a YAML mapping at the top level: ${params.schema_yaml}")
}
def schema   = loadedSchema ?: [:]

// Directory that contains the samplesheet; castValue resolves 'path' values against it.
def sheetDir = file(params.samplesheet).toRealPath().normalize().parent

// ------------------------------------------------------------------
// Helpers
// ------------------------------------------------------------------

// Convert one raw samplesheet value to the type named in the schema.
//   v : raw cell text (null is passed through as null)
//   t : type name, case-insensitive: string|int|float|bool|path (null means string)
// Returns:
//   int    -> Integer; decimal text is converted via BigDecimal.intValue(); blank -> null
//   float  -> BigDecimal; blank -> null
//   bool   -> true for 1/true/yes/y (case-insensitive, surrounding whitespace ignored), else false
//   path   -> the file(...) result if it is absolute and exists, otherwise the value
//             resolved against sheetDir; blank -> null
//   other  -> v unchanged
// Throws NumberFormatException for non-blank, non-numeric int/float text.
def castValue = { String v, String t ->
    if (v == null) return null
    // Cells often carry stray whitespace; every typed case works on the trimmed text
    // so that whitespace-only cells behave exactly like empty ones.
    def s = v.trim()
    switch ((t ?: 'string').toLowerCase()) {
        case 'int':
            if (!s) return null
            return s.isInteger() ? s.toInteger() : s.toBigDecimal().intValue()
        case 'float':
            return s ? s.toBigDecimal() : null
        case 'bool':
            return ['1','true','yes','y'].contains(s.toLowerCase())
        case 'path':
            // A blank cell means "no path"; do not hand an empty string to file().
            if (!s) return null
            def f = file(s)
            if (f.isAbsolute() && f.exists()) {
                return f
            }
            // Otherwise interpret the value relative to the samplesheet's directory.
            return sheetDir.resolve(s).normalize()
        default:
            return v
    }
}

// Apply the loaded schema to one samplesheet row and return a new, typed map.
//   row : column name -> raw value; the caller's map is NOT modified
// Steps, in order: rename columns, fill defaults for missing/blank values,
// enforce required columns, then cast every value with castValue.
// Throws IllegalArgumentException when a required column is missing/blank or
// when a value cannot be converted to its declared numeric type.
def applySchema = { Map row ->
    // Work on a copy so the rename/default steps never alter the caller's map.
    def out = new LinkedHashMap(row)

    // 1) renames (skipped when the canonical name is already present)
    (schema.rename ?: [:]).each { from, to ->
        if (out.containsKey(from) && !out.containsKey(to)) {
            out[to] = out.remove(from)
        }
    }
    // 2) defaults
    (schema.defaults ?: [:]).each { k,v ->
        if (!out.containsKey(k) || out[k] == null || out[k].toString().trim() == '') {
            out[k] = v
        }
    }
    // 3) required
    (schema.required ?: []).each { k ->
        if (!out.containsKey(k) || out[k] == null || out[k].toString().trim() == '') {
            throw new IllegalArgumentException("Missing required field: ${k} in row: ${out}")
        }
    }
    // 4) types
    def types = (schema.types ?: [:])
    out.collectEntries { k,v ->
        try {
            return [k, castValue(v?.toString(), types[k]?.toString())]
        } catch (NumberFormatException e) {
            // Re-throw with the column and row so the bad cell can be located.
            throw new IllegalArgumentException("Invalid value '${v}' for column '${k}' (expected ${types[k]}) in row: ${out}", e)
        }
    }
}

// ------------------------------------------------------------------
// Build channels
//  - mapping_ch: single value map with header/types/etc
//  - rows_src:   queue channel of row maps (schema-applied)
//  - rows_for_user: alias of rows_src handed to the inner workflow
// ------------------------------------------------------------------

// Determine delimiter based on file extension. The comparison is case-insensitive
// so that e.g. 'SAMPLES.TSV' is also read as tab-separated; anything else is
// treated as comma-separated.
def delimiter = params.samplesheet.toString().toLowerCase().endsWith('.tsv') ? '\t' : ','

// Read only the first line of the samplesheet; withReader closes the stream.
def header_line = file(params.samplesheet).newInputStream().withReader { it.readLine() }
if (!header_line) {
    throw new IllegalArgumentException("Samplesheet file appears empty or unreadable: ${params.samplesheet}")
}

// Column names as written in the file (trimmed), then mapped through schema.rename.
def orig_headers_raw = header_line.split(delimiter).collect { it.trim() }
def rename_map       = (schema.rename ?: [:])
def renamed_headers  = orig_headers_raw.collect { h -> rename_map.getOrDefault(h, h) }

// mapping meta we pass inward (extend as needed)
mapping_ch = Channel.value([
    orig_headers: renamed_headers,   // original column order (after rename)
    types       : (schema.types ?: [:]),
    required    : (schema.required ?: []),
    rename      : rename_map,
    defaults    : (schema.defaults ?: [:])
])

// Single source of parsed rows: split the samplesheet into header-keyed records,
// trim whitespace from the column names, then run each record through applySchema.
rows_src = Channel
  .fromPath(params.samplesheet)
  .splitCsv(header: true, sep: delimiter)
  .map { Map record ->
    def cleaned = record.collectEntries { entry -> [(entry.key?.trim()): entry.value] }
    applySchema(cleaned)
  }

// Do not reintroduce `rows_src.into { rows_for_user }` here: it causes the
// DataflowBroadcast.into error.

// There is only one consumer, so no duplication is needed; a plain alias is enough.
def rows_for_user = rows_src

// ------------------------------------------------------------------
// Include and run inner workflow
// The inner workflow (workflow.nf) should define `workflow USER { ... }`
// and accept: sample_sheet_ch, rows_for_user, mapping_ch, assets_dir_ch, results_dir
// ------------------------------------------------------------------

include { USER } from "${params.work_flow_file}"

// Entry point: prepare the results directory, then delegate to the USER workflow.
workflow {
    // Create the results directory; harmless if it is already present.
    def resultsDir = file(params.results_dir)
    resultsDir.mkdirs()

    // Inputs, in order: samplesheet channel, row maps, mapping metadata,
    // assets directory, results directory.
    USER(sample_sheet_ch, rows_for_user, mapping_ch, assets_dir_ch, params.results_dir)
}