bamslice 0.1.1

Extract byte ranges from BAM files and convert to interleaved FASTQ format for parallel processing
Documentation
#!/usr/bin/env nextflow

/*
 * bamslice - Parallel BAM to FASTQ conversion using byte ranges
 *
 * This example pipeline demonstrates how to use bamslice to convert
 * BAM files to FASTQ in parallel by splitting on byte ranges, then
 * piping through fastp for QC/filtering.
 */

params.bam = "tests/fixtures/emseq-test1.bam"  // Can be a glob pattern like "*.bam"
params.chunk_size = "100 MB"  // Supports: 100 KB, 100 MB, 1 GB, etc.
params.outdir = "results"
params.bamslice = "${projectDir}/target/release/bamslice"  // Path to bamslice binary

workflow {
    def bamslice_bin = file(params.bamslice)
    if (!bamslice_bin.exists()) {
        error "ERROR: bamslice binary not found at: ${params.bamslice}\n" +
              "Please build it with: cargo build --release\n" +
              "Or specify the correct path with: --bamslice /path/to/bamslice"
    }

    if ("which fastp".execute().waitFor() != 0) {
        log.error "ERROR: fastp not found on PATH"
    }

    // Create channel of BAM files
    bam_ch = Channel.fromPath(params.bam, checkIfExists: true)

    chunk_size_bytes = new nextflow.util.MemoryUnit(params.chunk_size).toBytes()
    println "Chunk size: ${params.chunk_size} (${chunk_size_bytes} bytes)"

    // Create chunks for each BAM file
    chunks_ch = bam_ch
        .map { bam ->
            def file_size = bam.size()
            def num_chunks = Math.ceil(file_size / chunk_size_bytes).intValue()
            println "Input BAM: ${bam} (size: ${file_size} bytes, ${num_chunks} chunks)"

            // Return list of [bam, start, end] tuples for each chunk
            (0..<num_chunks).collect { i ->
                def start = i * chunk_size_bytes
                def end = Math.min(start + chunk_size_bytes, file_size)
                [bam, start, end]
            }
        }
        .flatMap()  // Flatten the list of chunks into individual items

    // Process each chunk: bamslice -> fastp -> count (or align)
    results = processChunk(chunks_ch).view()


}

process processChunk {
    publishDir params.outdir, mode: 'copy', pattern: "*.{json,html}"

    input:
    tuple path(bam), val(start), val(end)

    output:
    stdout emit: counts

    script:
    """
    # Extract chunk and pipe through fastp for QC/filtering
    # Then count reads with wc -l
    # NOTE: Replace 'wc -l' with an aligner (e.g., 'bwa mem ref.fa - | samtools ...')
    #       or other downstream tool that accepts interleaved FASTQ on stdin
    ${params.bamslice} -i ${bam} -s ${start} -e ${end} \
        | fastp --stdin --interleaved_in \
                --stdout \
                --disable_quality_filtering \
                --disable_length_filtering \
                --json ${bam.simpleName}_chunk_${start}.json \
                --html ${bam.simpleName}_chunk_${start}.html \
        | wc -l \
        | awk '{printf "${bam.simpleName} chunk ${start}: %d reads\\n", \$1/4}'
    """
}