use crate::cli::Strandedness;
use crate::gtf::Gene;
use crate::rna::qualimap::QualimapAccum;
use crate::rna::rseqc::accumulators::{RseqcAccumulators, RseqcAnnotations, RseqcConfig};
use crate::ui::format_count;
use anyhow::{Context, Result};
use coitrees::{COITree, Interval, IntervalTree};
use indexmap::IndexMap;
use indicatif::ProgressBar;
use log::{debug, warn};
use rayon::prelude::*;
use rust_htslib::bam::{self, FetchDefinition, Read as BamRead};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
type GeneIdx = u32;
#[derive(Debug)]
struct GeneIdInterner {
id_to_idx: HashMap<String, GeneIdx>,
idx_to_id: Vec<String>,
}
impl GeneIdInterner {
fn from_genes(genes: &IndexMap<String, Gene>) -> Self {
let mut id_to_idx = HashMap::with_capacity(genes.len());
let mut idx_to_id = Vec::with_capacity(genes.len());
for (i, gene_id) in genes.keys().enumerate() {
id_to_idx.insert(gene_id.clone(), i as GeneIdx);
idx_to_id.push(gene_id.clone());
}
GeneIdInterner {
id_to_idx,
idx_to_id,
}
}
fn get(&self, gene_id: &str) -> Option<GeneIdx> {
self.id_to_idx.get(gene_id).copied()
}
fn name(&self, idx: GeneIdx) -> &str {
&self.idx_to_id[idx as usize]
}
fn len(&self) -> usize {
self.idx_to_id.len()
}
}
fn no_duplicates_error(mapped_count: u64, bam_path: &str) -> anyhow::Error {
anyhow::anyhow!(
"No duplicate-flagged reads found among {} mapped reads in '{}'.\n\
\n\
RustQC requires that BAM files have duplicates marked (SAM flag 0x400)\n\
but NOT removed. None of the reads examined so far have the duplicate\n\
flag set.\n\
\n\
Please run a duplicate-marking tool before using RustQC, for example:\n\
\n\
- Picard MarkDuplicates: picard MarkDuplicates I=input.bam O=marked.bam M=metrics.txt\n\
- samblaster: samtools view -h input.bam | samblaster | samtools view -bS - > marked.bam\n\
- sambamba markdup: sambamba markdup input.bam marked.bam\n\
\n\
If you are certain that duplicates are already marked, use --skip-dup-check to bypass this check.",
mapped_count,
bam_path
)
}
use crate::rna::bam_flags::*;
#[derive(Debug, Clone, Default)]
pub struct GeneCounts {
pub all_multi: u64,
pub nodup_multi: u64,
pub all_unique: u64,
pub nodup_unique: u64,
pub fc_reads: u64,
}
impl GeneCounts {
fn count_fragment(&mut self, is_dup: bool, is_multi: bool) {
self.all_multi += 1;
if !is_dup {
self.nodup_multi += 1;
}
if !is_multi {
self.all_unique += 1;
if !is_dup {
self.nodup_unique += 1;
}
}
}
}
fn merge_gene_hits(a: &[GeneIdx], b: &[GeneIdx]) -> Vec<GeneIdx> {
if a.is_empty() {
return b.to_vec();
}
if b.is_empty() {
return a.to_vec();
}
let mut both = Vec::new();
let mut ai = 0;
let mut bi = 0;
while ai < a.len() && bi < b.len() {
match a[ai].cmp(&b[bi]) {
std::cmp::Ordering::Equal => {
both.push(a[ai]);
ai += 1;
bi += 1;
}
std::cmp::Ordering::Less => ai += 1,
std::cmp::Ordering::Greater => bi += 1,
}
}
if !both.is_empty() {
return both;
}
let mut union = Vec::with_capacity(a.len() + b.len());
ai = 0;
bi = 0;
while ai < a.len() && bi < b.len() {
match a[ai].cmp(&b[bi]) {
std::cmp::Ordering::Less => {
union.push(a[ai]);
ai += 1;
}
std::cmp::Ordering::Greater => {
union.push(b[bi]);
bi += 1;
}
std::cmp::Ordering::Equal => {
union.push(a[ai]);
ai += 1;
bi += 1;
}
}
}
union.extend_from_slice(&a[ai..]);
union.extend_from_slice(&b[bi..]);
union
}
fn assign_fragment_to_gene(
gene_hits: &[GeneIdx],
gene_counts: &mut [GeneCounts],
is_dup: bool,
is_multi: bool,
) -> bool {
if gene_hits.len() == 1 {
let idx = gene_hits[0] as usize;
if idx < gene_counts.len() {
gene_counts[idx].count_fragment(is_dup, is_multi);
return true;
}
}
false
}
#[derive(Debug)]
pub struct CountResult {
pub gene_counts: IndexMap<String, GeneCounts>,
pub stat_total_reads: u64,
pub stat_assigned: u64,
pub stat_ambiguous: u64,
pub stat_no_features: u64,
pub stat_total_fragments: u64,
pub stat_total_mapped: u64,
pub stat_total_dup: u64,
pub stat_singleton_unmapped_mates: u64,
pub fc_assigned: u64,
pub fc_ambiguous: u64,
pub fc_no_features: u64,
pub fc_multimapping: u64,
pub fc_unmapped: u64,
pub fc_singleton: u64,
pub fc_chimera: u64,
pub biotype_reads: Vec<u64>,
pub biotype_names: Vec<String>,
pub fc_biotype_assigned: u64,
pub fc_biotype_ambiguous: u64,
pub fc_biotype_no_features: u64,
pub rseqc: Option<RseqcAccumulators>,
#[allow(dead_code)]
pub qualimap: Option<crate::rna::qualimap::QualimapResult>,
}
#[derive(Debug, Clone, Copy, Default)]
struct IvMeta {
gene_idx: GeneIdx,
strand: char,
}
type ChromIndex = COITree<IvMeta, u32>;
fn build_index(
genes: &IndexMap<String, Gene>,
interner: &GeneIdInterner,
) -> HashMap<String, ChromIndex> {
let mut chrom_intervals: HashMap<String, Vec<Interval<IvMeta>>> = HashMap::new();
for gene in genes.values() {
let gene_idx = interner
.get(&gene.gene_id)
.expect("gene must be in interner");
for exon in &gene.exons {
let iv = Interval::new(
(exon.start - 1) as i32,
(exon.end - 1) as i32,
IvMeta {
gene_idx,
strand: exon.strand,
},
);
chrom_intervals
.entry(exon.chrom.clone())
.or_default()
.push(iv);
}
}
chrom_intervals
.into_iter()
.map(|(chrom, intervals)| (chrom, COITree::new(&intervals)))
.collect()
}
fn strand_matches(
read_reverse: bool,
is_read1: bool,
paired: bool,
gene_strand: char,
stranded: Strandedness,
) -> bool {
if stranded == Strandedness::Unstranded || gene_strand == '.' {
return true; }
let read_on_plus = !read_reverse;
let effective_plus = if paired {
if is_read1 {
read_on_plus
} else {
!read_on_plus }
} else {
read_on_plus
};
match stranded {
Strandedness::Forward => {
(effective_plus && gene_strand == '+') || (!effective_plus && gene_strand == '-')
}
Strandedness::Reverse => {
(!effective_plus && gene_strand == '+') || (effective_plus && gene_strand == '-')
}
Strandedness::Unstranded => true,
}
}
fn cigar_to_aligned_blocks(
start: u64,
cigar: &rust_htslib::bam::record::CigarStringView,
blocks: &mut Vec<(u64, u64)>,
) {
use rust_htslib::bam::record::Cigar;
blocks.clear();
let mut ref_pos = start;
for op in cigar.iter() {
match op {
Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => {
let len = *len as u64;
blocks.push((ref_pos, ref_pos + len));
ref_pos += len;
}
Cigar::RefSkip(len) | Cigar::Del(len) => {
ref_pos += *len as u64;
}
Cigar::Ins(_) | Cigar::SoftClip(_) | Cigar::HardClip(_) | Cigar::Pad(_) => {}
}
}
}
#[derive(Debug)]
struct MateInfo {
gene_hits: Vec<GeneIdx>,
is_dup: bool,
is_multi: bool,
is_read1: bool,
}
type MateBufferKey = (u64, i32, i64, i32, i64, i32);
#[inline(always)]
fn hash_qname(qname: &[u8]) -> u64 {
crate::io::fnv1a(qname)
}
#[derive(Debug)]
struct ChromResult {
gene_counts: Vec<GeneCounts>,
unmatched_mates: HashMap<MateBufferKey, MateInfo>,
total_reads: u64,
total_mapped: u64,
total_dup: u64,
singleton_unmapped_mates: u64,
total_multi: u64,
total_fragments: u64,
stat_assigned: u64,
stat_ambiguous: u64,
stat_no_features: u64,
n_multi_dup: u64,
n_multi_nodup: u64,
n_unique_dup: u64,
n_unique_nodup: u64,
fc_assigned: u64,
fc_ambiguous: u64,
fc_no_features: u64,
fc_multimapping: u64,
fc_unmapped: u64,
fc_singleton: u64,
fc_chimera: u64,
biotype_reads: Vec<u64>,
fc_biotype_assigned: u64,
fc_biotype_ambiguous: u64,
fc_biotype_no_features: u64,
qualimap: Option<QualimapAccum>,
}
impl ChromResult {
fn new(num_genes: usize, num_biotypes: usize) -> Self {
ChromResult {
gene_counts: vec![GeneCounts::default(); num_genes],
unmatched_mates: HashMap::new(),
total_reads: 0,
total_mapped: 0,
total_dup: 0,
singleton_unmapped_mates: 0,
total_multi: 0,
total_fragments: 0,
stat_assigned: 0,
stat_ambiguous: 0,
stat_no_features: 0,
n_multi_dup: 0,
n_multi_nodup: 0,
n_unique_dup: 0,
n_unique_nodup: 0,
fc_assigned: 0,
fc_ambiguous: 0,
fc_no_features: 0,
fc_multimapping: 0,
fc_unmapped: 0,
fc_singleton: 0,
fc_chimera: 0,
biotype_reads: vec![0u64; num_biotypes],
fc_biotype_assigned: 0,
fc_biotype_ambiguous: 0,
fc_biotype_no_features: 0,
qualimap: None,
}
}
fn merge(&mut self, other: ChromResult) {
for (i, counts) in other.gene_counts.into_iter().enumerate() {
self.gene_counts[i].all_multi += counts.all_multi;
self.gene_counts[i].nodup_multi += counts.nodup_multi;
self.gene_counts[i].all_unique += counts.all_unique;
self.gene_counts[i].nodup_unique += counts.nodup_unique;
self.gene_counts[i].fc_reads += counts.fc_reads;
}
for (key, other_info) in other.unmatched_mates {
if let Some(self_info) = self.unmatched_mates.remove(&key) {
if self_info.is_read1 != other_info.is_read1 {
self.total_fragments += 1;
let frag_is_dup = self_info.is_dup;
let frag_is_multi = self_info.is_multi;
self.n_multi_dup += 1;
self.n_unique_dup += 1;
if !frag_is_dup {
self.n_multi_nodup += 1;
self.n_unique_nodup += 1;
}
let combined_genes =
merge_gene_hits(&self_info.gene_hits, &other_info.gene_hits);
if combined_genes.is_empty() {
self.stat_no_features += 1;
} else if combined_genes.len() > 1 {
self.stat_ambiguous += 1;
} else if assign_fragment_to_gene(
&combined_genes,
&mut self.gene_counts,
frag_is_dup,
frag_is_multi,
) {
self.stat_assigned += 1;
}
} else {
self.unmatched_mates.insert(key, self_info);
}
} else {
self.unmatched_mates.insert(key, other_info);
}
}
self.total_reads += other.total_reads;
self.total_mapped += other.total_mapped;
self.total_dup += other.total_dup;
self.singleton_unmapped_mates += other.singleton_unmapped_mates;
self.total_multi += other.total_multi;
self.total_fragments += other.total_fragments;
self.stat_assigned += other.stat_assigned;
self.stat_ambiguous += other.stat_ambiguous;
self.stat_no_features += other.stat_no_features;
self.n_multi_dup += other.n_multi_dup;
self.n_multi_nodup += other.n_multi_nodup;
self.n_unique_dup += other.n_unique_dup;
self.n_unique_nodup += other.n_unique_nodup;
self.fc_assigned += other.fc_assigned;
self.fc_ambiguous += other.fc_ambiguous;
self.fc_no_features += other.fc_no_features;
self.fc_multimapping += other.fc_multimapping;
self.fc_unmapped += other.fc_unmapped;
self.fc_singleton += other.fc_singleton;
self.fc_chimera += other.fc_chimera;
for (i, &count) in other.biotype_reads.iter().enumerate() {
self.biotype_reads[i] += count;
}
self.fc_biotype_assigned += other.fc_biotype_assigned;
self.fc_biotype_ambiguous += other.fc_biotype_ambiguous;
self.fc_biotype_no_features += other.fc_biotype_no_features;
if let Some(other_qm) = other.qualimap {
if let Some(ref mut self_qm) = self.qualimap {
self_qm.merge(other_qm);
} else {
self.qualimap = Some(other_qm);
}
}
}
}
fn classify_read_fc(is_multi: bool, gene_hits: &[GeneIdx], result: &mut ChromResult) {
if is_multi {
result.fc_multimapping += 1;
} else if gene_hits.is_empty() {
result.fc_no_features += 1;
} else if gene_hits.len() == 1 {
result.fc_assigned += 1;
let idx = gene_hits[0] as usize;
if idx < result.gene_counts.len() {
result.gene_counts[idx].fc_reads += 1;
}
} else {
result.fc_ambiguous += 1;
}
}
fn classify_read_fc_biotype(
is_multi: bool,
gene_hits: &[GeneIdx],
gene_to_biotype: &[u16],
biotype_hits_buf: &mut Vec<u16>,
result: &mut ChromResult,
) {
if is_multi {
return;
}
if gene_hits.is_empty() {
result.fc_biotype_no_features += 1;
return;
}
biotype_hits_buf.clear();
let mut unknown_gene_count: u32 = 0;
for &gidx in gene_hits {
let bidx = gene_to_biotype[gidx as usize];
if bidx != u16::MAX {
biotype_hits_buf.push(bidx);
} else {
unknown_gene_count += 1;
}
}
biotype_hits_buf.sort_unstable();
biotype_hits_buf.dedup();
let total_meta_features = biotype_hits_buf.len() as u32 + unknown_gene_count;
if total_meta_features == 1 {
if let Some(&bidx) = biotype_hits_buf.first() {
let idx = bidx as usize;
if idx < result.biotype_reads.len() {
result.biotype_reads[idx] += 1;
result.fc_biotype_assigned += 1;
}
} else {
result.fc_biotype_assigned += 1;
}
} else {
result.fc_biotype_ambiguous += 1;
}
}
#[inline]
#[allow(clippy::too_many_arguments)]
fn process_counting_record(
record: &bam::Record,
result: &mut ChromResult,
index: &HashMap<String, ChromIndex>,
chrom: &str,
stranded: Strandedness,
paired: bool,
gene_to_biotype: &[u16],
aligned_blocks_buf: &mut Vec<(u64, u64)>,
gene_hits: &mut Vec<GeneIdx>,
biotype_hits_buf: &mut Vec<u16>,
mate_buffer: &mut HashMap<MateBufferKey, MateInfo>,
) {
let flags = record.flags();
if flags & BAM_FUNMAP != 0 {
result.fc_unmapped += 1;
if paired && flags & BAM_FMUNMAP == 0 {
result.singleton_unmapped_mates += 1;
}
return;
}
if flags & BAM_FSUPPLEMENTARY != 0 {
return;
}
if flags & BAM_FQCFAIL != 0 {
return;
}
if paired && flags & BAM_FPAIRED == 0 {
return;
}
result.total_mapped += 1;
let is_dup = flags & BAM_FDUP != 0;
if is_dup {
result.total_dup += 1;
}
let is_multi = crate::rna::bam_flags::get_aux_int(record, b"NH").is_some_and(|nh| nh > 1);
if is_multi {
result.total_multi += 1;
}
let is_reverse = flags & BAM_FREVERSE != 0;
let is_read1 = flags & BAM_FREAD1 != 0;
gene_hits.clear();
if let Some(chrom_idx) = index.get(chrom) {
cigar_to_aligned_blocks(record.pos() as u64, &record.cigar(), aligned_blocks_buf);
for &(block_start, block_end) in aligned_blocks_buf.iter() {
chrom_idx.query(block_start as i32, (block_end - 1) as i32, |node| {
let meta = node.metadata;
if strand_matches(is_reverse, is_read1, paired, meta.strand, stranded) {
gene_hits.push(meta.gene_idx);
}
});
}
gene_hits.sort_unstable();
gene_hits.dedup();
}
classify_read_fc(is_multi, gene_hits, result);
classify_read_fc_biotype(
is_multi,
gene_hits,
gene_to_biotype,
biotype_hits_buf,
result,
);
if !paired {
result.n_multi_dup += 1;
result.n_unique_dup += 1;
if !is_dup {
result.n_multi_nodup += 1;
result.n_unique_nodup += 1;
}
result.total_fragments += 1;
if gene_hits.is_empty() {
result.stat_no_features += 1;
} else if gene_hits.len() > 1 {
result.stat_ambiguous += 1;
} else if assign_fragment_to_gene(gene_hits, &mut result.gene_counts, is_dup, is_multi) {
result.stat_assigned += 1;
}
return;
}
let qname_hash = hash_qname(record.qname());
let own_pos = record.pos();
let own_tid = record.tid();
let mate_pos_val = record.mpos();
let mate_tid = record.mtid();
let hi_tag: i32 = crate::rna::bam_flags::get_aux_int(record, b"HI").map_or(-1, |v| v as i32);
let buffer_key: MateBufferKey = if is_read1 {
(qname_hash, own_tid, own_pos, mate_tid, mate_pos_val, hi_tag)
} else {
(qname_hash, mate_tid, mate_pos_val, own_tid, own_pos, hi_tag)
};
let matched = mate_buffer.remove(&buffer_key).and_then(|mate_info| {
if mate_info.is_read1 != is_read1 {
Some(mate_info)
} else {
mate_buffer.insert(buffer_key, mate_info);
None
}
});
if let Some(mate_info) = matched {
result.total_fragments += 1;
let frag_is_dup = if is_read1 { is_dup } else { mate_info.is_dup };
let frag_is_multi = if is_read1 {
is_multi
} else {
mate_info.is_multi
};
result.n_multi_dup += 1;
result.n_unique_dup += 1;
if !frag_is_dup {
result.n_multi_nodup += 1;
result.n_unique_nodup += 1;
}
let combined_genes: Vec<GeneIdx> = merge_gene_hits(&mate_info.gene_hits, gene_hits);
if combined_genes.is_empty() {
result.stat_no_features += 1;
} else if combined_genes.len() > 1 {
result.stat_ambiguous += 1;
} else if assign_fragment_to_gene(
&combined_genes,
&mut result.gene_counts,
frag_is_dup,
frag_is_multi,
) {
result.stat_assigned += 1;
}
} else {
mate_buffer.insert(
buffer_key,
MateInfo {
gene_hits: std::mem::take(gene_hits),
is_dup,
is_multi,
is_read1,
},
);
}
}
#[allow(clippy::too_many_arguments)]
fn process_chromosome_batch(
bam_path: &str,
tids: &[u32],
tid_to_name: &[String],
index: &HashMap<String, ChromIndex>,
num_genes: usize,
stranded: Strandedness,
paired: bool,
chrom_mapping: &HashMap<String, String>,
chrom_prefix: Option<&str>,
global_read_counter: &AtomicU64,
reference: Option<&str>,
rseqc_config: Option<&RseqcConfig>,
rseqc_annotations: Option<&RseqcAnnotations>,
htslib_threads: usize,
qualimap_index: Option<&crate::rna::qualimap::QualimapIndex>,
gene_to_biotype: &[u16],
num_biotypes: usize,
progress: Option<&ProgressBar>,
) -> Result<(ChromResult, Option<RseqcAccumulators>)> {
let mut result = ChromResult::new(num_genes, num_biotypes);
if qualimap_index.is_some() {
result.qualimap = Some(crate::rna::qualimap::QualimapAccum::new(stranded));
}
let mut rseqc_accums = rseqc_config.map(|cfg| RseqcAccumulators::new(cfg, rseqc_annotations));
let tid_to_gtf_chrom: Vec<String> = tid_to_name
.iter()
.map(|bam_name| {
if let Some(mapped) = chrom_mapping.get(bam_name.as_str()) {
mapped.clone()
} else if let Some(prefix) = chrom_prefix {
format!("{}{}", prefix, bam_name)
} else {
bam_name.clone()
}
})
.collect();
let tid_to_rseqc_chrom = &tid_to_gtf_chrom;
let tid_to_rseqc_chrom_upper: Vec<String> =
tid_to_gtf_chrom.iter().map(|s| s.to_uppercase()).collect();
let mut bam = bam::IndexedReader::from_path(bam_path)
.with_context(|| format!("Failed to open indexed alignment file: {}", bam_path))?;
if let Some(ref_path) = reference {
bam.set_reference(ref_path)
.with_context(|| format!("Failed to set reference FASTA: {}", ref_path))?;
}
if htslib_threads > 0 {
bam.set_threads(htslib_threads)
.context("Failed to set htslib decompression threads")?;
}
let mut aligned_blocks_buf: Vec<(u64, u64)> = Vec::new();
let mut gene_hits: Vec<GeneIdx> = Vec::new();
let mut biotype_hits_buf: Vec<u16> = Vec::new();
let mut mate_buffer: HashMap<MateBufferKey, MateInfo> = HashMap::new();
let mut record = bam::Record::new();
for &tid in tids {
bam.fetch(tid)
.with_context(|| format!("Failed to seek to tid {}", tid))?;
while let Some(read_result) = bam.read(&mut record) {
read_result.context("Error reading alignment record")?;
result.total_reads += 1;
let prev = global_read_counter.fetch_add(1, Ordering::Relaxed);
if (prev + 1).is_multiple_of(500_000) {
if let Some(pb) = progress {
pb.set_message(format!("{} reads processed", format_count(prev + 1)));
}
}
if let (Some(ref mut accums), Some(annots), Some(cfg)) =
(&mut rseqc_accums, rseqc_annotations, rseqc_config)
{
let rec_tid = record.tid();
let (chrom, chrom_upper) =
if rec_tid >= 0 && (rec_tid as usize) < tid_to_rseqc_chrom.len() {
(
tid_to_rseqc_chrom[rec_tid as usize].as_str(),
tid_to_rseqc_chrom_upper[rec_tid as usize].as_str(),
)
} else {
("", "")
};
accums.process_read(&record, chrom, chrom_upper, annots, cfg);
}
if let (Some(ref mut qm), Some(qm_index)) = (&mut result.qualimap, qualimap_index) {
let tid = record.tid();
if tid >= 0 && (tid as usize) < tid_to_gtf_chrom.len() {
let qm_chrom = &tid_to_gtf_chrom[tid as usize];
qm.process_read(&record, qm_chrom, qm_index);
}
}
let rec_tid = record.tid();
let chrom = if rec_tid >= 0 && (rec_tid as usize) < tid_to_gtf_chrom.len() {
tid_to_gtf_chrom[rec_tid as usize].as_str()
} else {
""
};
process_counting_record(
&record,
&mut result,
index,
chrom,
stranded,
paired,
gene_to_biotype,
&mut aligned_blocks_buf,
&mut gene_hits,
&mut biotype_hits_buf,
&mut mate_buffer,
);
}
}
result.unmatched_mates = mate_buffer;
Ok((result, rseqc_accums))
}
fn partition_chromosomes(lengths: &[u64], num_workers: usize) -> Vec<Vec<u32>> {
let mut batches: Vec<Vec<u32>> = vec![Vec::new(); num_workers];
let mut loads: Vec<u64> = vec![0; num_workers];
let mut order: Vec<u32> = (0..lengths.len() as u32).collect();
order.sort_unstable_by(|&a, &b| lengths[b as usize].cmp(&lengths[a as usize]));
for tid in order {
let min_worker = loads
.iter()
.enumerate()
.min_by_key(|&(_, &load)| load)
.map(|(i, _)| i)
.unwrap_or(0);
batches[min_worker].push(tid);
loads[min_worker] += lengths[tid as usize];
}
batches
}
#[allow(clippy::too_many_arguments)]
pub fn count_reads(
bam_path: &str,
genes: &IndexMap<String, Gene>,
stranded: Strandedness,
paired: bool,
threads: usize,
chrom_mapping: &HashMap<String, String>,
chrom_prefix: Option<&str>,
reference: Option<&str>,
skip_dup_check: bool,
biotype_attribute: &str,
rseqc_config: Option<&RseqcConfig>,
rseqc_annotations: Option<&RseqcAnnotations>,
qualimap_index: Option<&crate::rna::qualimap::QualimapIndex>,
progress: Option<&ProgressBar>,
) -> Result<CountResult> {
let interner = GeneIdInterner::from_genes(genes);
let index = build_index(genes, &interner);
let (tid_to_name, tid_to_len): (Vec<String>, Vec<u64>) = {
let mut bam = bam::Reader::from_path(bam_path)
.with_context(|| format!("Failed to open alignment file: {}", bam_path))?;
if let Some(ref_path) = reference {
bam.set_reference(ref_path)
.with_context(|| format!("Failed to set reference FASTA: {}", ref_path))?;
}
let header = bam.header().clone();
let names = (0..header.target_count())
.map(|tid| String::from_utf8_lossy(header.tid2name(tid)).to_string())
.collect();
let lengths = (0..header.target_count())
.map(|tid| header.target_len(tid).unwrap_or(0))
.collect();
(names, lengths)
};
if skip_dup_check {
log::info!("Skipping duplicate-marking verification (--skip-dup-check)");
}
let use_parallel = threads > 1 && {
let idx_check = bam::IndexedReader::from_path(bam_path);
if let Ok(mut reader) = idx_check {
if let Some(ref_path) = reference {
reader.set_reference(ref_path).is_ok()
} else {
true
}
} else {
false
}
};
if threads > 1 && !use_parallel {
warn!(
"Alignment index not found for {}; falling back to single-threaded processing. \
Create an index with 'samtools index' to enable parallel processing.",
bam_path
);
}
let mut biotype_names: Vec<String> = Vec::new();
let mut biotype_name_to_idx: HashMap<String, u16> = HashMap::new();
let gene_to_biotype: Vec<u16> = genes
.values()
.map(|gene| {
if let Some(bt) = gene.attributes.get(biotype_attribute) {
let next_idx = biotype_names.len() as u16;
*biotype_name_to_idx.entry(bt.clone()).or_insert_with(|| {
biotype_names.push(bt.clone());
next_idx
})
} else {
u16::MAX }
})
.collect();
let num_biotypes = biotype_names.len();
let (mut merged, mut merged_rseqc) = if use_parallel {
let num_chroms = tid_to_name.len();
let num_workers = threads.min(num_chroms).max(1);
let batches = partition_chromosomes(&tid_to_len, num_workers);
if log::log_enabled!(log::Level::Debug) {
let worker_loads: Vec<u64> = batches
.iter()
.map(|b| b.iter().map(|&tid| tid_to_len[tid as usize]).sum())
.collect();
debug!(
"Chromosome load distribution across {} workers: {:?}",
num_workers, worker_loads
);
}
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_workers)
.build()
.context("Failed to build rayon thread pool")?;
let global_read_counter = AtomicU64::new(0);
debug!(
"Processing {} chromosomes across {} threads",
num_chroms, num_workers
);
let htslib_threads = if num_workers > 0 {
((threads.saturating_sub(num_workers)) / num_workers).max(1)
} else {
0
};
let results: Vec<Result<(ChromResult, Option<RseqcAccumulators>)>> = pool.install(|| {
batches
.par_iter()
.map(|batch| {
process_chromosome_batch(
bam_path,
batch,
&tid_to_name,
&index,
interner.len(),
stranded,
paired,
chrom_mapping,
chrom_prefix,
&global_read_counter,
reference,
rseqc_config,
rseqc_annotations,
htslib_threads,
qualimap_index,
&gene_to_biotype,
num_biotypes,
progress,
)
})
.collect()
});
let mut merged = ChromResult::new(interner.len(), num_biotypes);
let mut merged_rseqc: Option<RseqcAccumulators> =
rseqc_config.map(|cfg| RseqcAccumulators::new(cfg, rseqc_annotations));
for result in results {
let (chrom_result, rseqc_result) = result?;
merged.merge(chrom_result);
if let (Some(ref mut merged_acc), Some(worker_acc)) = (&mut merged_rseqc, rseqc_result)
{
merged_acc.merge(worker_acc);
}
}
(merged, merged_rseqc)
} else {
let global_read_counter = AtomicU64::new(0);
let mut result = ChromResult::new(interner.len(), num_biotypes);
let mut rseqc_accums: Option<RseqcAccumulators> =
rseqc_config.map(|cfg| RseqcAccumulators::new(cfg, rseqc_annotations));
let mut qualimap_accum: Option<crate::rna::qualimap::QualimapAccum> =
qualimap_index.map(|_| crate::rna::qualimap::QualimapAccum::new(stranded));
let tid_to_rseqc_chrom: Vec<String> = tid_to_name
.iter()
.map(|bam_name| {
if let Some(mapped) = chrom_mapping.get(bam_name.as_str()) {
mapped.clone()
} else if let Some(prefix) = chrom_prefix {
format!("{}{}", prefix, bam_name)
} else {
bam_name.clone()
}
})
.collect();
let tid_to_rseqc_chrom_upper: Vec<String> = tid_to_rseqc_chrom
.iter()
.map(|s| s.to_uppercase())
.collect();
let mut bam = bam::Reader::from_path(bam_path)
.with_context(|| format!("Failed to open alignment file: {}", bam_path))?;
if let Some(ref_path) = reference {
bam.set_reference(ref_path)
.with_context(|| format!("Failed to set reference FASTA: {}", ref_path))?;
}
if threads > 1 {
bam.set_threads(threads.saturating_sub(1))
.context("Failed to set htslib decompression threads")?;
}
let mut aligned_blocks_buf: Vec<(u64, u64)> = Vec::new();
let mut gene_hits: Vec<GeneIdx> = Vec::new();
let mut biotype_hits_buf: Vec<u16> = Vec::new();
let mut mate_buffer: HashMap<MateBufferKey, MateInfo> = HashMap::new();
let mut record = bam::Record::new();
while let Some(read_result) = bam.read(&mut record) {
read_result.context("Error reading alignment record")?;
result.total_reads += 1;
let prev = global_read_counter.fetch_add(1, Ordering::Relaxed);
if (prev + 1).is_multiple_of(500_000) {
if let Some(pb) = progress {
pb.set_message(format!("{} reads processed", format_count(prev + 1)));
}
}
let flags = record.flags();
if let (Some(ref mut accums), Some(annots), Some(cfg)) =
(&mut rseqc_accums, rseqc_annotations, rseqc_config)
{
let tid = record.tid();
let (chrom, chrom_upper) = if tid >= 0 && (tid as usize) < tid_to_rseqc_chrom.len()
{
(
tid_to_rseqc_chrom[tid as usize].as_str(),
tid_to_rseqc_chrom_upper[tid as usize].as_str(),
)
} else {
("", "")
};
accums.process_read(&record, chrom, chrom_upper, annots, cfg);
}
if let (Some(ref mut qm_accum), Some(qm_index)) = (&mut qualimap_accum, qualimap_index)
{
let tid = record.tid();
if tid >= 0 && (tid as usize) < tid_to_rseqc_chrom.len() {
let chrom = &tid_to_rseqc_chrom[tid as usize];
qm_accum.process_read(&record, chrom, qm_index);
} else if flags & BAM_FUNMAP != 0 {
qm_accum.counters.not_aligned += 1;
}
}
let tid = record.tid();
let chrom = if tid >= 0 && (tid as usize) < tid_to_rseqc_chrom.len() {
tid_to_rseqc_chrom[tid as usize].as_str()
} else {
""
};
process_counting_record(
&record,
&mut result,
&index,
chrom,
stranded,
paired,
&gene_to_biotype,
&mut aligned_blocks_buf,
&mut gene_hits,
&mut biotype_hits_buf,
&mut mate_buffer,
);
}
result.unmatched_mates = mate_buffer;
result.qualimap = qualimap_accum;
(result, rseqc_accums)
};
if use_parallel {
let mut bam = bam::IndexedReader::from_path(bam_path).with_context(|| {
format!(
"Failed to open indexed BAM for unmapped sweep: {}",
bam_path
)
})?;
if let Some(ref_path) = reference {
bam.set_reference(ref_path)
.with_context(|| format!("Failed to set reference FASTA: {}", ref_path))?;
}
bam.fetch(FetchDefinition::Unmapped)
.context("Failed to seek to unmapped segment")?;
let mut record = bam::Record::new();
let mut unmapped_count: u64 = 0;
while let Some(read_result) = bam.read(&mut record) {
read_result.context("Error reading unmapped record")?;
unmapped_count += 1;
merged.total_reads += 1;
if let (Some(ref mut accums), Some(annots), Some(cfg)) =
(&mut merged_rseqc, rseqc_annotations, rseqc_config)
{
accums.process_read(&record, "", "", annots, cfg);
}
if let Some(ref mut qm) = merged.qualimap {
let tid = record.tid();
if tid >= 0 {
if let Some(qm_index) = qualimap_index {
if (tid as usize) < tid_to_name.len() {
qm.process_read(&record, &tid_to_name[tid as usize], qm_index);
}
}
} else if record.flags() & BAM_FUNMAP != 0 {
qm.counters.not_aligned += 1;
}
}
let flags = record.flags();
if flags & BAM_FUNMAP != 0 {
merged.fc_unmapped += 1;
if paired && flags & BAM_FMUNMAP == 0 {
merged.singleton_unmapped_mates += 1;
}
}
}
if unmapped_count > 0 {
debug!(
"Parallel unmapped sweep: processed {} unmapped reads",
unmapped_count
);
}
}
if paired && !merged.unmatched_mates.is_empty() {
let unmatched: Vec<(MateBufferKey, MateInfo)> = merged.unmatched_mates.drain().collect();
let mut still_unmatched: HashMap<MateBufferKey, MateInfo> = HashMap::new();
for (key, info) in unmatched {
if let Some(mate_info) = still_unmatched.remove(&key) {
if info.is_read1 == mate_info.is_read1 {
still_unmatched.insert(key, mate_info);
continue;
}
merged.total_fragments += 1;
let frag_is_dup = if info.is_read1 {
info.is_dup
} else {
mate_info.is_dup
};
let frag_is_multi = if info.is_read1 {
info.is_multi
} else {
mate_info.is_multi
};
merged.n_multi_dup += 1;
merged.n_unique_dup += 1;
if !frag_is_dup {
merged.n_multi_nodup += 1;
merged.n_unique_nodup += 1;
}
let combined_genes: Vec<GeneIdx> =
merge_gene_hits(&mate_info.gene_hits, &info.gene_hits);
if combined_genes.is_empty() {
merged.stat_no_features += 1;
} else if combined_genes.len() > 1 {
merged.stat_ambiguous += 1;
} else if assign_fragment_to_gene(
&combined_genes,
&mut merged.gene_counts,
frag_is_dup,
frag_is_multi,
) {
merged.stat_assigned += 1;
}
} else {
still_unmatched.insert(key, info);
}
}
for (_key, mate_info) in still_unmatched.drain() {
merged.total_fragments += 1;
merged.n_multi_dup += 1;
merged.n_unique_dup += 1;
if !mate_info.is_dup {
merged.n_multi_nodup += 1;
merged.n_unique_nodup += 1;
}
if mate_info.gene_hits.is_empty() {
merged.stat_no_features += 1;
} else if mate_info.gene_hits.len() > 1 {
merged.stat_ambiguous += 1;
} else if assign_fragment_to_gene(
&mate_info.gene_hits,
&mut merged.gene_counts,
mate_info.is_dup,
mate_info.is_multi,
) {
merged.stat_assigned += 1;
}
}
}
debug!(
"Read {} total reads, {} mapped, {} fragments, {} duplicates, {} multimappers",
merged.total_reads,
merged.total_mapped,
merged.total_fragments,
merged.total_dup,
merged.total_multi
);
debug!(
"Assignment stats: {} assigned, {} ambiguous, {} no_features (total fragments: {})",
merged.stat_assigned,
merged.stat_ambiguous,
merged.stat_no_features,
merged.total_fragments
);
if merged.total_dup == 0 && merged.total_mapped > 0 && !skip_dup_check {
return Err(no_duplicates_error(merged.total_mapped, bam_path));
}
let genes_with_reads = merged
.gene_counts
.iter()
.filter(|c| c.all_multi > 0)
.count();
if merged.total_mapped > 0 && genes_with_reads == 0 {
let bam_chroms: Vec<&str> = tid_to_name.iter().take(5).map(|s| s.as_str()).collect();
let gtf_chroms: Vec<&str> = index.keys().take(5).map(|s| s.as_str()).collect();
anyhow::bail!(
"Chromosome name mismatch: no reads could be assigned to any gene.\n\
\n\
Alignment chromosomes (first 5): {}\n\
GTF chromosomes (first 5): {}\n\
\n\
The alignment and GTF files appear to use different chromosome naming conventions.\n\
To fix this, create a YAML config file with a chromosome_mapping section and pass it via --config:\n\
\n\
Example config.yaml:\n\
\n\
chromosome_mapping:\n\
{}",
bam_chroms.join(", "),
gtf_chroms.join(", "),
bam_chroms
.iter()
.zip(gtf_chroms.iter())
.map(|(b, g)| format!(" {}: {}", g, b))
.collect::<Vec<_>>()
.join("\n")
);
}
let gene_counts_map: IndexMap<String, GeneCounts> = (0..interner.len())
.map(|i| {
let name = interner.name(i as GeneIdx).to_string();
let counts = std::mem::take(&mut merged.gene_counts[i]);
(name, counts)
})
.collect();
Ok(CountResult {
gene_counts: gene_counts_map,
stat_total_reads: merged.total_reads,
stat_assigned: merged.stat_assigned,
stat_ambiguous: merged.stat_ambiguous,
stat_no_features: merged.stat_no_features,
stat_total_fragments: merged.total_fragments,
stat_total_mapped: merged.total_mapped,
stat_total_dup: merged.total_dup,
stat_singleton_unmapped_mates: merged.singleton_unmapped_mates,
fc_assigned: merged.fc_assigned,
fc_ambiguous: merged.fc_ambiguous,
fc_no_features: merged.fc_no_features,
fc_multimapping: merged.fc_multimapping,
fc_unmapped: merged.fc_unmapped,
fc_singleton: merged.fc_singleton,
fc_chimera: merged.fc_chimera,
biotype_reads: merged.biotype_reads,
biotype_names,
fc_biotype_assigned: merged.fc_biotype_assigned,
fc_biotype_ambiguous: merged.fc_biotype_ambiguous,
fc_biotype_no_features: merged.fc_biotype_no_features,
rseqc: merged_rseqc,
qualimap: match (merged.qualimap, qualimap_index) {
(Some(mut a), Some(idx)) => {
a.flush_unpaired(idx);
Some(a.into_result())
}
_ => None,
},
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_strand_matching_unstranded() {
let u = Strandedness::Unstranded;
assert!(strand_matches(false, true, false, '+', u));
assert!(strand_matches(true, true, false, '+', u));
assert!(strand_matches(false, true, false, '-', u));
assert!(strand_matches(true, true, false, '-', u));
}
#[test]
fn test_strand_matching_forward_single() {
let f = Strandedness::Forward;
assert!(strand_matches(false, true, false, '+', f));
assert!(strand_matches(true, true, false, '-', f));
assert!(!strand_matches(false, true, false, '-', f));
assert!(!strand_matches(true, true, false, '+', f));
}
#[test]
fn test_strand_matching_reverse_single() {
let r = Strandedness::Reverse;
assert!(strand_matches(false, true, false, '-', r));
assert!(strand_matches(true, true, false, '+', r));
}
#[test]
fn test_strand_matching_forward_paired() {
let f = Strandedness::Forward;
assert!(strand_matches(false, true, true, '+', f));
assert!(strand_matches(false, false, true, '-', f));
assert!(strand_matches(true, true, true, '-', f));
assert!(strand_matches(true, false, true, '+', f));
}
fn make_test_chrom_result(num_genes: usize) -> ChromResult {
ChromResult::new(num_genes, 0)
}
#[test]
fn test_fc_classify_multimapped_read() {
let mut result = make_test_chrom_result(3);
let gene_hits: Vec<GeneIdx> = vec![0];
classify_read_fc(true, &gene_hits, &mut result);
assert_eq!(
result.fc_multimapping, 1,
"Multi-mapped read should be counted as fc_multimapping"
);
assert_eq!(result.fc_assigned, 0);
assert_eq!(result.fc_ambiguous, 0);
assert_eq!(result.fc_no_features, 0);
assert_eq!(
result.gene_counts[0].fc_reads, 0,
"Multi-mapped read should not credit any gene"
);
}
#[test]
fn test_fc_classify_no_gene_hits() {
let mut result = make_test_chrom_result(3);
let gene_hits: Vec<GeneIdx> = vec![];
classify_read_fc(false, &gene_hits, &mut result);
assert_eq!(
result.fc_no_features, 1,
"Read with no hits should be fc_no_features"
);
assert_eq!(result.fc_assigned, 0);
assert_eq!(result.fc_ambiguous, 0);
assert_eq!(result.fc_multimapping, 0);
}
#[test]
fn test_fc_classify_single_gene_hit() {
let mut result = make_test_chrom_result(3);
let gene_hits: Vec<GeneIdx> = vec![1];
classify_read_fc(false, &gene_hits, &mut result);
assert_eq!(
result.fc_assigned, 1,
"Single gene hit should be fc_assigned"
);
assert_eq!(
result.gene_counts[1].fc_reads, 1,
"Single gene hit should credit that gene"
);
assert_eq!(result.gene_counts[0].fc_reads, 0);
assert_eq!(result.gene_counts[2].fc_reads, 0);
assert_eq!(result.fc_ambiguous, 0);
assert_eq!(result.fc_no_features, 0);
}
#[test]
fn test_fc_classify_multi_gene_hits_ambiguous() {
let mut result = make_test_chrom_result(4);
let gene_hits: Vec<GeneIdx> = vec![0, 1];
classify_read_fc(false, &gene_hits, &mut result);
assert_eq!(
result.fc_ambiguous, 1,
"Multi-gene hits should be fc_ambiguous"
);
assert_eq!(result.fc_assigned, 0);
assert_eq!(
result.gene_counts[0].fc_reads, 0,
"Ambiguous read should not credit any gene"
);
assert_eq!(result.gene_counts[1].fc_reads, 0);
}
#[test]
fn test_fc_classify_three_gene_hits_ambiguous() {
let mut result = make_test_chrom_result(5);
let gene_hits: Vec<GeneIdx> = vec![0, 1, 2];
classify_read_fc(false, &gene_hits, &mut result);
assert_eq!(
result.fc_ambiguous, 1,
"Three-gene hits should be fc_ambiguous"
);
assert_eq!(result.fc_assigned, 0);
assert_eq!(result.gene_counts[0].fc_reads, 0);
assert_eq!(result.gene_counts[1].fc_reads, 0);
assert_eq!(result.gene_counts[2].fc_reads, 0);
}
#[test]
fn test_fc_classify_cumulative_counts() {
let mut result = make_test_chrom_result(3);
classify_read_fc(false, &[0], &mut result);
classify_read_fc(false, &[0], &mut result);
classify_read_fc(true, &[0], &mut result);
classify_read_fc(false, &[], &mut result);
classify_read_fc(false, &[0, 1], &mut result);
assert_eq!(result.fc_assigned, 2);
assert_eq!(result.fc_multimapping, 1);
assert_eq!(result.fc_no_features, 1);
assert_eq!(result.fc_ambiguous, 1);
assert_eq!(result.gene_counts[0].fc_reads, 2);
assert_eq!(result.gene_counts[1].fc_reads, 0);
}
#[test]
fn test_partition_chromosomes_balanced() {
let lengths = vec![
250, 243, 198, 191, 182, 171, 159, 146, 138, 133, 135, 130, 57,
];
let batches = partition_chromosomes(&lengths, 4);
assert_eq!(batches.len(), 4);
let mut all_tids: Vec<u32> = batches.iter().flatten().copied().collect();
all_tids.sort();
assert_eq!(all_tids, (0..13).collect::<Vec<u32>>());
let total: u64 = lengths.iter().sum();
let avg = total as f64 / 4.0;
for batch in &batches {
let load: u64 = batch.iter().map(|&tid| lengths[tid as usize]).sum();
assert!(
(load as f64) < avg * 1.3,
"Worker load {} exceeds 1.3× average {:.0}",
load,
avg
);
}
}
#[test]
fn test_partition_chromosomes_single_worker() {
let lengths = vec![100, 200, 300];
let batches = partition_chromosomes(&lengths, 1);
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].len(), 3);
}
#[test]
fn test_partition_chromosomes_more_workers_than_chroms() {
let lengths = vec![100, 200];
let batches = partition_chromosomes(&lengths, 4);
assert_eq!(batches.len(), 4);
let non_empty: usize = batches.iter().filter(|b| !b.is_empty()).count();
assert_eq!(non_empty, 2);
}
}