use crate::cli::Strandedness;
use crate::gtf::Gene;
use crate::rna::qualimap::QualimapAccum;
use crate::rna::rseqc::accumulators::{RseqcAccumulators, RseqcAnnotations, RseqcConfig};
use crate::ui::format_count;
use anyhow::{Context, Result};
use coitrees::{COITree, Interval, IntervalTree};
use indexmap::IndexMap;
use indicatif::ProgressBar;
use log::{debug, warn};
use rayon::prelude::*;
use rust_htslib::bam::{self, FetchDefinition, Read as BamRead};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
type GeneIdx = u32;
#[derive(Debug)]
struct GeneIdInterner {
id_to_idx: HashMap<String, GeneIdx>,
idx_to_id: Vec<String>,
}
impl GeneIdInterner {
fn from_genes(genes: &IndexMap<String, Gene>) -> Self {
let mut id_to_idx = HashMap::with_capacity(genes.len());
let mut idx_to_id = Vec::with_capacity(genes.len());
for (i, gene_id) in genes.keys().enumerate() {
id_to_idx.insert(gene_id.clone(), i as GeneIdx);
idx_to_id.push(gene_id.clone());
}
GeneIdInterner {
id_to_idx,
idx_to_id,
}
}
fn get(&self, gene_id: &str) -> Option<GeneIdx> {
self.id_to_idx.get(gene_id).copied()
}
fn name(&self, idx: GeneIdx) -> &str {
&self.idx_to_id[idx as usize]
}
fn len(&self) -> usize {
self.idx_to_id.len()
}
}
const KNOWN_DUP_MARKERS: &[&str] = &[
"markduplicates",
"samblaster",
"sambamba markdup",
"sambamba_markdup",
"biobambam",
"estreamer",
"fgbio",
"umis",
"umi_tools",
"umi-tools",
"gencore",
"gatk markduplicates",
"sentieon dedup",
];
fn extract_header_tag<'a>(line: &'a str, tag: &str) -> Option<&'a str> {
let prefix = format!("{}:", tag);
line.split('\t')
.find(|field| field.starts_with(&prefix))
.map(|field| &field[prefix.len()..])
}
fn header_has_dup_marker(header_text: &str) -> bool {
for line in header_text.lines() {
if !line.starts_with("@PG") {
continue;
}
let id = extract_header_tag(line, "ID").unwrap_or("");
let pn = extract_header_tag(line, "PN").unwrap_or("");
let id_lower = id.to_lowercase();
let pn_lower = pn.to_lowercase();
if KNOWN_DUP_MARKERS
.iter()
.any(|marker| id_lower.contains(marker) || pn_lower.contains(marker))
{
debug!("Found duplicate-marking tool in @PG header: {}", line);
return true;
}
}
false
}
fn verify_duplicates_marked(header: &bam::HeaderView, bam_path: &str) -> Result<()> {
let header_text = String::from_utf8_lossy(header.as_bytes());
if header_has_dup_marker(&header_text) {
return Ok(());
}
anyhow::bail!(
"No duplicate-marking tool found in BAM header of '{}'.\n\
\n\
RustQC requires that BAM files have duplicates marked (SAM flag 0x400)\n\
but NOT removed. The BAM @PG header lines do not contain evidence of a\n\
known duplicate-marking tool.\n\
\n\
Please run one of the following tools before using RustQC:\n\
\n\
- Picard MarkDuplicates: picard MarkDuplicates I=input.bam O=marked.bam M=metrics.txt\n\
- samblaster: samtools view -h input.bam | samblaster | samtools view -bS - > marked.bam\n\
- sambamba markdup: sambamba markdup input.bam marked.bam\n\
\n\
If you are certain that duplicates are already marked, use --skip-dup-check to bypass this check.",
bam_path
)
}
use crate::rna::bam_flags::*;
#[derive(Debug, Clone, Default)]
pub struct GeneCounts {
pub all_multi: u64,
pub nodup_multi: u64,
pub all_unique: u64,
pub nodup_unique: u64,
pub fc_reads: u64,
}
impl GeneCounts {
fn count_fragment(&mut self, is_dup: bool, is_multi: bool) {
self.all_multi += 1;
if !is_dup {
self.nodup_multi += 1;
}
if !is_multi {
self.all_unique += 1;
if !is_dup {
self.nodup_unique += 1;
}
}
}
}
fn merge_gene_hits(a: &[GeneIdx], b: &[GeneIdx]) -> Vec<GeneIdx> {
if a.is_empty() {
return b.to_vec();
}
if b.is_empty() {
return a.to_vec();
}
let mut both = Vec::new();
let mut ai = 0;
let mut bi = 0;
while ai < a.len() && bi < b.len() {
match a[ai].cmp(&b[bi]) {
std::cmp::Ordering::Equal => {
both.push(a[ai]);
ai += 1;
bi += 1;
}
std::cmp::Ordering::Less => ai += 1,
std::cmp::Ordering::Greater => bi += 1,
}
}
if !both.is_empty() {
return both;
}
let mut union = Vec::with_capacity(a.len() + b.len());
ai = 0;
bi = 0;
while ai < a.len() && bi < b.len() {
match a[ai].cmp(&b[bi]) {
std::cmp::Ordering::Less => {
union.push(a[ai]);
ai += 1;
}
std::cmp::Ordering::Greater => {
union.push(b[bi]);
bi += 1;
}
std::cmp::Ordering::Equal => {
union.push(a[ai]);
ai += 1;
bi += 1;
}
}
}
union.extend_from_slice(&a[ai..]);
union.extend_from_slice(&b[bi..]);
union
}
fn assign_fragment_to_gene(
gene_hits: &[GeneIdx],
gene_counts: &mut [GeneCounts],
is_dup: bool,
is_multi: bool,
) -> bool {
if gene_hits.len() == 1 {
let idx = gene_hits[0] as usize;
if idx < gene_counts.len() {
gene_counts[idx].count_fragment(is_dup, is_multi);
return true;
}
}
false
}
#[derive(Debug)]
pub struct CountResult {
pub gene_counts: IndexMap<String, GeneCounts>,
pub stat_total_reads: u64,
pub stat_assigned: u64,
pub stat_ambiguous: u64,
pub stat_no_features: u64,
pub stat_total_fragments: u64,
pub stat_total_mapped: u64,
pub stat_total_dup: u64,
pub stat_singleton_unmapped_mates: u64,
pub fc_assigned: u64,
pub fc_ambiguous: u64,
pub fc_no_features: u64,
pub fc_multimapping: u64,
pub fc_unmapped: u64,
pub fc_singleton: u64,
pub fc_chimera: u64,
pub biotype_reads: Vec<u64>,
pub biotype_names: Vec<String>,
pub fc_biotype_assigned: u64,
pub fc_biotype_ambiguous: u64,
pub fc_biotype_no_features: u64,
pub rseqc: Option<RseqcAccumulators>,
#[allow(dead_code)]
pub qualimap: Option<crate::rna::qualimap::QualimapResult>,
}
#[derive(Debug, Clone, Copy, Default)]
struct IvMeta {
gene_idx: GeneIdx,
strand: char,
}
type ChromIndex = COITree<IvMeta, u32>;
fn build_index(
genes: &IndexMap<String, Gene>,
interner: &GeneIdInterner,
) -> HashMap<String, ChromIndex> {
let mut chrom_intervals: HashMap<String, Vec<Interval<IvMeta>>> = HashMap::new();
for gene in genes.values() {
let gene_idx = interner
.get(&gene.gene_id)
.expect("gene must be in interner");
for exon in &gene.exons {
let iv = Interval::new(
(exon.start - 1) as i32,
(exon.end - 1) as i32,
IvMeta {
gene_idx,
strand: exon.strand,
},
);
chrom_intervals
.entry(exon.chrom.clone())
.or_default()
.push(iv);
}
}
chrom_intervals
.into_iter()
.map(|(chrom, intervals)| (chrom, COITree::new(&intervals)))
.collect()
}
fn strand_matches(
read_reverse: bool,
is_read1: bool,
paired: bool,
gene_strand: char,
stranded: Strandedness,
) -> bool {
if stranded == Strandedness::Unstranded || gene_strand == '.' {
return true; }
let read_on_plus = !read_reverse;
let effective_plus = if paired {
if is_read1 {
read_on_plus
} else {
!read_on_plus }
} else {
read_on_plus
};
match stranded {
Strandedness::Forward => {
(effective_plus && gene_strand == '+') || (!effective_plus && gene_strand == '-')
}
Strandedness::Reverse => {
(!effective_plus && gene_strand == '+') || (effective_plus && gene_strand == '-')
}
Strandedness::Unstranded => true,
}
}
fn cigar_to_aligned_blocks(
start: u64,
cigar: &rust_htslib::bam::record::CigarStringView,
blocks: &mut Vec<(u64, u64)>,
) {
use rust_htslib::bam::record::Cigar;
blocks.clear();
let mut ref_pos = start;
for op in cigar.iter() {
match op {
Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => {
let len = *len as u64;
blocks.push((ref_pos, ref_pos + len));
ref_pos += len;
}
Cigar::RefSkip(len) | Cigar::Del(len) => {
ref_pos += *len as u64;
}
Cigar::Ins(_) | Cigar::SoftClip(_) | Cigar::HardClip(_) | Cigar::Pad(_) => {}
}
}
}
#[derive(Debug)]
struct MateInfo {
gene_hits: Vec<GeneIdx>,
is_dup: bool,
is_multi: bool,
is_read1: bool,
}
type MateBufferKey = (u64, i32, i64, i32, i64, i32);
#[inline(always)]
fn hash_qname(qname: &[u8]) -> u64 {
crate::io::fnv1a(qname)
}
#[derive(Debug)]
struct ChromResult {
gene_counts: Vec<GeneCounts>,
unmatched_mates: HashMap<MateBufferKey, MateInfo>,
total_reads: u64,
total_mapped: u64,
total_dup: u64,
singleton_unmapped_mates: u64,
total_multi: u64,
total_fragments: u64,
stat_assigned: u64,
stat_ambiguous: u64,
stat_no_features: u64,
n_multi_dup: u64,
n_multi_nodup: u64,
n_unique_dup: u64,
n_unique_nodup: u64,
fc_assigned: u64,
fc_ambiguous: u64,
fc_no_features: u64,
fc_multimapping: u64,
fc_unmapped: u64,
fc_singleton: u64,
fc_chimera: u64,
biotype_reads: Vec<u64>,
fc_biotype_assigned: u64,
fc_biotype_ambiguous: u64,
fc_biotype_no_features: u64,
qualimap: Option<QualimapAccum>,
}
impl ChromResult {
fn new(num_genes: usize, num_biotypes: usize) -> Self {
ChromResult {
gene_counts: vec![GeneCounts::default(); num_genes],
unmatched_mates: HashMap::new(),
total_reads: 0,
total_mapped: 0,
total_dup: 0,
singleton_unmapped_mates: 0,
total_multi: 0,
total_fragments: 0,
stat_assigned: 0,
stat_ambiguous: 0,
stat_no_features: 0,
n_multi_dup: 0,
n_multi_nodup: 0,
n_unique_dup: 0,
n_unique_nodup: 0,
fc_assigned: 0,
fc_ambiguous: 0,
fc_no_features: 0,
fc_multimapping: 0,
fc_unmapped: 0,
fc_singleton: 0,
fc_chimera: 0,
biotype_reads: vec![0u64; num_biotypes],
fc_biotype_assigned: 0,
fc_biotype_ambiguous: 0,
fc_biotype_no_features: 0,
qualimap: None,
}
}
fn merge(&mut self, other: ChromResult) {
for (i, counts) in other.gene_counts.into_iter().enumerate() {
self.gene_counts[i].all_multi += counts.all_multi;
self.gene_counts[i].nodup_multi += counts.nodup_multi;
self.gene_counts[i].all_unique += counts.all_unique;
self.gene_counts[i].nodup_unique += counts.nodup_unique;
self.gene_counts[i].fc_reads += counts.fc_reads;
}
for (key, other_info) in other.unmatched_mates {
if let Some(self_info) = self.unmatched_mates.remove(&key) {
if self_info.is_read1 != other_info.is_read1 {
self.total_fragments += 1;
let frag_is_dup = self_info.is_dup;
let frag_is_multi = self_info.is_multi;
self.n_multi_dup += 1;
self.n_unique_dup += 1;
if !frag_is_dup {
self.n_multi_nodup += 1;
self.n_unique_nodup += 1;
}
let combined_genes =
merge_gene_hits(&self_info.gene_hits, &other_info.gene_hits);
if combined_genes.is_empty() {
self.stat_no_features += 1;
} else if combined_genes.len() > 1 {
self.stat_ambiguous += 1;
} else if assign_fragment_to_gene(
&combined_genes,
&mut self.gene_counts,
frag_is_dup,
frag_is_multi,
) {
self.stat_assigned += 1;
}
} else {
self.unmatched_mates.insert(key, self_info);
}
} else {
self.unmatched_mates.insert(key, other_info);
}
}
self.total_reads += other.total_reads;
self.total_mapped += other.total_mapped;
self.total_dup += other.total_dup;
self.singleton_unmapped_mates += other.singleton_unmapped_mates;
self.total_multi += other.total_multi;
self.total_fragments += other.total_fragments;
self.stat_assigned += other.stat_assigned;
self.stat_ambiguous += other.stat_ambiguous;
self.stat_no_features += other.stat_no_features;
self.n_multi_dup += other.n_multi_dup;
self.n_multi_nodup += other.n_multi_nodup;
self.n_unique_dup += other.n_unique_dup;
self.n_unique_nodup += other.n_unique_nodup;
self.fc_assigned += other.fc_assigned;
self.fc_ambiguous += other.fc_ambiguous;
self.fc_no_features += other.fc_no_features;
self.fc_multimapping += other.fc_multimapping;
self.fc_unmapped += other.fc_unmapped;
self.fc_singleton += other.fc_singleton;
self.fc_chimera += other.fc_chimera;
for (i, &count) in other.biotype_reads.iter().enumerate() {
self.biotype_reads[i] += count;
}
self.fc_biotype_assigned += other.fc_biotype_assigned;
self.fc_biotype_ambiguous += other.fc_biotype_ambiguous;
self.fc_biotype_no_features += other.fc_biotype_no_features;
if let Some(other_qm) = other.qualimap {
if let Some(ref mut self_qm) = self.qualimap {
self_qm.merge(other_qm);
} else {
self.qualimap = Some(other_qm);
}
}
}
}
fn classify_read_fc(is_multi: bool, gene_hits: &[GeneIdx], result: &mut ChromResult) {
if is_multi {
result.fc_multimapping += 1;
} else if gene_hits.is_empty() {
result.fc_no_features += 1;
} else if gene_hits.len() == 1 {
result.fc_assigned += 1;
let idx = gene_hits[0] as usize;
if idx < result.gene_counts.len() {
result.gene_counts[idx].fc_reads += 1;
}
} else {
result.fc_ambiguous += 1;
}
}
fn classify_read_fc_biotype(
is_multi: bool,
gene_hits: &[GeneIdx],
gene_to_biotype: &[u16],
biotype_hits_buf: &mut Vec<u16>,
result: &mut ChromResult,
) {
if is_multi {
return;
}
if gene_hits.is_empty() {
result.fc_biotype_no_features += 1;
return;
}
biotype_hits_buf.clear();
let mut unknown_gene_count: u32 = 0;
for &gidx in gene_hits {
let bidx = gene_to_biotype[gidx as usize];
if bidx != u16::MAX {
biotype_hits_buf.push(bidx);
} else {
unknown_gene_count += 1;
}
}
biotype_hits_buf.sort_unstable();
biotype_hits_buf.dedup();
let total_meta_features = biotype_hits_buf.len() as u32 + unknown_gene_count;
if total_meta_features == 1 {
if let Some(&bidx) = biotype_hits_buf.first() {
let idx = bidx as usize;
if idx < result.biotype_reads.len() {
result.biotype_reads[idx] += 1;
result.fc_biotype_assigned += 1;
}
} else {
result.fc_biotype_assigned += 1;
}
} else {
result.fc_biotype_ambiguous += 1;
}
}
#[inline]
#[allow(clippy::too_many_arguments)]
fn process_counting_record(
record: &bam::Record,
result: &mut ChromResult,
index: &HashMap<String, ChromIndex>,
chrom: &str,
stranded: Strandedness,
paired: bool,
gene_to_biotype: &[u16],
aligned_blocks_buf: &mut Vec<(u64, u64)>,
gene_hits: &mut Vec<GeneIdx>,
biotype_hits_buf: &mut Vec<u16>,
mate_buffer: &mut HashMap<MateBufferKey, MateInfo>,
) {
let flags = record.flags();
if flags & BAM_FUNMAP != 0 {
result.fc_unmapped += 1;
if paired && flags & BAM_FMUNMAP == 0 {
result.singleton_unmapped_mates += 1;
}
return;
}
if flags & BAM_FSUPPLEMENTARY != 0 {
return;
}
if flags & BAM_FQCFAIL != 0 {
return;
}
if paired && flags & BAM_FPAIRED == 0 {
return;
}
result.total_mapped += 1;
let is_dup = flags & BAM_FDUP != 0;
if is_dup {
result.total_dup += 1;
}
let is_multi = crate::rna::bam_flags::get_aux_int(record, b"NH").is_some_and(|nh| nh > 1);
if is_multi {
result.total_multi += 1;
}
let is_reverse = flags & BAM_FREVERSE != 0;
let is_read1 = flags & BAM_FREAD1 != 0;
gene_hits.clear();
if let Some(chrom_idx) = index.get(chrom) {
cigar_to_aligned_blocks(record.pos() as u64, &record.cigar(), aligned_blocks_buf);
for &(block_start, block_end) in aligned_blocks_buf.iter() {
chrom_idx.query(block_start as i32, (block_end - 1) as i32, |node| {
let meta = node.metadata;
if strand_matches(is_reverse, is_read1, paired, meta.strand, stranded) {
gene_hits.push(meta.gene_idx);
}
});
}
gene_hits.sort_unstable();
gene_hits.dedup();
}
classify_read_fc(is_multi, gene_hits, result);
classify_read_fc_biotype(
is_multi,
gene_hits,
gene_to_biotype,
biotype_hits_buf,
result,
);
if !paired {
result.n_multi_dup += 1;
result.n_unique_dup += 1;
if !is_dup {
result.n_multi_nodup += 1;
result.n_unique_nodup += 1;
}
result.total_fragments += 1;
if gene_hits.is_empty() {
result.stat_no_features += 1;
} else if gene_hits.len() > 1 {
result.stat_ambiguous += 1;
} else if assign_fragment_to_gene(gene_hits, &mut result.gene_counts, is_dup, is_multi) {
result.stat_assigned += 1;
}
return;
}
let qname_hash = hash_qname(record.qname());
let own_pos = record.pos();
let own_tid = record.tid();
let mate_pos_val = record.mpos();
let mate_tid = record.mtid();
let hi_tag: i32 = crate::rna::bam_flags::get_aux_int(record, b"HI").map_or(-1, |v| v as i32);
let buffer_key: MateBufferKey = if is_read1 {
(qname_hash, own_tid, own_pos, mate_tid, mate_pos_val, hi_tag)
} else {
(qname_hash, mate_tid, mate_pos_val, own_tid, own_pos, hi_tag)
};
let matched = mate_buffer.remove(&buffer_key).and_then(|mate_info| {
if mate_info.is_read1 != is_read1 {
Some(mate_info)
} else {
mate_buffer.insert(buffer_key, mate_info);
None
}
});
if let Some(mate_info) = matched {
result.total_fragments += 1;
let frag_is_dup = if is_read1 { is_dup } else { mate_info.is_dup };
let frag_is_multi = if is_read1 {
is_multi
} else {
mate_info.is_multi
};
result.n_multi_dup += 1;
result.n_unique_dup += 1;
if !frag_is_dup {
result.n_multi_nodup += 1;
result.n_unique_nodup += 1;
}
let combined_genes: Vec<GeneIdx> = merge_gene_hits(&mate_info.gene_hits, gene_hits);
if combined_genes.is_empty() {
result.stat_no_features += 1;
} else if combined_genes.len() > 1 {
result.stat_ambiguous += 1;
} else if assign_fragment_to_gene(
&combined_genes,
&mut result.gene_counts,
frag_is_dup,
frag_is_multi,
) {
result.stat_assigned += 1;
}
} else {
mate_buffer.insert(
buffer_key,
MateInfo {
gene_hits: std::mem::take(gene_hits),
is_dup,
is_multi,
is_read1,
},
);
}
}
#[allow(clippy::too_many_arguments)]
fn process_chromosome_batch(
bam_path: &str,
tids: &[u32],
tid_to_name: &[String],
index: &HashMap<String, ChromIndex>,
num_genes: usize,
stranded: Strandedness,
paired: bool,
chrom_mapping: &HashMap<String, String>,
chrom_prefix: Option<&str>,
global_read_counter: &AtomicU64,
reference: Option<&str>,
rseqc_config: Option<&RseqcConfig>,
rseqc_annotations: Option<&RseqcAnnotations>,
htslib_threads: usize,
qualimap_index: Option<&crate::rna::qualimap::QualimapIndex>,
gene_to_biotype: &[u16],
num_biotypes: usize,
progress: Option<&ProgressBar>,
) -> Result<(ChromResult, Option<RseqcAccumulators>)> {
let mut result = ChromResult::new(num_genes, num_biotypes);
if qualimap_index.is_some() {
result.qualimap = Some(crate::rna::qualimap::QualimapAccum::new(stranded));
}
let mut rseqc_accums = rseqc_config.map(|cfg| RseqcAccumulators::new(cfg, rseqc_annotations));
let tid_to_gtf_chrom: Vec<String> = tid_to_name
.iter()
.map(|bam_name| {
if let Some(mapped) = chrom_mapping.get(bam_name.as_str()) {
mapped.clone()
} else if let Some(prefix) = chrom_prefix {
format!("{}{}", prefix, bam_name)
} else {
bam_name.clone()
}
})
.collect();
let tid_to_rseqc_chrom = &tid_to_gtf_chrom;
let tid_to_rseqc_chrom_upper: Vec<String> =
tid_to_gtf_chrom.iter().map(|s| s.to_uppercase()).collect();
let mut bam = bam::IndexedReader::from_path(bam_path)
.with_context(|| format!("Failed to open indexed alignment file: {}", bam_path))?;
if let Some(ref_path) = reference {
bam.set_reference(ref_path)
.with_context(|| format!("Failed to set reference FASTA: {}", ref_path))?;
}
if htslib_threads > 0 {
bam.set_threads(htslib_threads)
.context("Failed to set htslib decompression threads")?;
}
let mut aligned_blocks_buf: Vec<(u64, u64)> = Vec::new();
let mut gene_hits: Vec<GeneIdx> = Vec::new();
let mut biotype_hits_buf: Vec<u16> = Vec::new();
let mut mate_buffer: HashMap<MateBufferKey, MateInfo> = HashMap::new();
let mut record = bam::Record::new();
for &tid in tids {
bam.fetch(tid)
.with_context(|| format!("Failed to seek to tid {}", tid))?;
while let Some(read_result) = bam.read(&mut record) {
read_result.context("Error reading alignment record")?;
result.total_reads += 1;
let prev = global_read_counter.fetch_add(1, Ordering::Relaxed);
if (prev + 1).is_multiple_of(500_000) {
if let Some(pb) = progress {
pb.set_message(format!("{} reads processed", format_count(prev + 1)));
}
}
if let (Some(ref mut accums), Some(annots), Some(cfg)) =
(&mut rseqc_accums, rseqc_annotations, rseqc_config)
{
let rec_tid = record.tid();
let (chrom, chrom_upper) =
if rec_tid >= 0 && (rec_tid as usize) < tid_to_rseqc_chrom.len() {
(
tid_to_rseqc_chrom[rec_tid as usize].as_str(),
tid_to_rseqc_chrom_upper[rec_tid as usize].as_str(),
)
} else {
("", "")
};
accums.process_read(&record, chrom, chrom_upper, annots, cfg);
}
if let (Some(ref mut qm), Some(qm_index)) = (&mut result.qualimap, qualimap_index) {
let tid = record.tid();
if tid >= 0 && (tid as usize) < tid_to_gtf_chrom.len() {
let qm_chrom = &tid_to_gtf_chrom[tid as usize];
qm.process_read(&record, qm_chrom, qm_index);
}
}
let rec_tid = record.tid();
let chrom = if rec_tid >= 0 && (rec_tid as usize) < tid_to_gtf_chrom.len() {
tid_to_gtf_chrom[rec_tid as usize].as_str()
} else {
""
};
process_counting_record(
&record,
&mut result,
index,
chrom,
stranded,
paired,
gene_to_biotype,
&mut aligned_blocks_buf,
&mut gene_hits,
&mut biotype_hits_buf,
&mut mate_buffer,
);
}
}
result.unmatched_mates = mate_buffer;
Ok((result, rseqc_accums))
}
fn partition_chromosomes(lengths: &[u64], num_workers: usize) -> Vec<Vec<u32>> {
let mut batches: Vec<Vec<u32>> = vec![Vec::new(); num_workers];
let mut loads: Vec<u64> = vec![0; num_workers];
let mut order: Vec<u32> = (0..lengths.len() as u32).collect();
order.sort_unstable_by(|&a, &b| lengths[b as usize].cmp(&lengths[a as usize]));
for tid in order {
let min_worker = loads
.iter()
.enumerate()
.min_by_key(|&(_, &load)| load)
.map(|(i, _)| i)
.unwrap_or(0);
batches[min_worker].push(tid);
loads[min_worker] += lengths[tid as usize];
}
batches
}
#[allow(clippy::too_many_arguments)]
pub fn count_reads(
bam_path: &str,
genes: &IndexMap<String, Gene>,
stranded: Strandedness,
paired: bool,
threads: usize,
chrom_mapping: &HashMap<String, String>,
chrom_prefix: Option<&str>,
reference: Option<&str>,
skip_dup_check: bool,
biotype_attribute: &str,
rseqc_config: Option<&RseqcConfig>,
rseqc_annotations: Option<&RseqcAnnotations>,
qualimap_index: Option<&crate::rna::qualimap::QualimapIndex>,
progress: Option<&ProgressBar>,
) -> Result<CountResult> {
let interner = GeneIdInterner::from_genes(genes);
let index = build_index(genes, &interner);
let (tid_to_name, tid_to_len): (Vec<String>, Vec<u64>) = {
let mut bam = bam::Reader::from_path(bam_path)
.with_context(|| format!("Failed to open alignment file: {}", bam_path))?;
if let Some(ref_path) = reference {
bam.set_reference(ref_path)
.with_context(|| format!("Failed to set reference FASTA: {}", ref_path))?;
}
let header = bam.header().clone();
if skip_dup_check {
log::info!("Skipping duplicate-marking verification (--skip-dup-check)");
} else {
verify_duplicates_marked(&header, bam_path)?;
}
let names = (0..header.target_count())
.map(|tid| String::from_utf8_lossy(header.tid2name(tid)).to_string())
.collect();
let lengths = (0..header.target_count())
.map(|tid| header.target_len(tid).unwrap_or(0))
.collect();
(names, lengths)
};
let use_parallel = threads > 1 && {
let idx_check = bam::IndexedReader::from_path(bam_path);
if let Ok(mut reader) = idx_check {
if let Some(ref_path) = reference {
reader.set_reference(ref_path).is_ok()
} else {
true
}
} else {
false
}
};
if threads > 1 && !use_parallel {
warn!(
"Alignment index not found for {}; falling back to single-threaded processing. \
Create an index with 'samtools index' to enable parallel processing.",
bam_path
);
}
let mut biotype_names: Vec<String> = Vec::new();
let mut biotype_name_to_idx: HashMap<String, u16> = HashMap::new();
let gene_to_biotype: Vec<u16> = genes
.values()
.map(|gene| {
if let Some(bt) = gene.attributes.get(biotype_attribute) {
let next_idx = biotype_names.len() as u16;
*biotype_name_to_idx.entry(bt.clone()).or_insert_with(|| {
biotype_names.push(bt.clone());
next_idx
})
} else {
u16::MAX }
})
.collect();
let num_biotypes = biotype_names.len();
let (mut merged, mut merged_rseqc) = if use_parallel {
let num_chroms = tid_to_name.len();
let num_workers = threads.min(num_chroms).max(1);
let batches = partition_chromosomes(&tid_to_len, num_workers);
if log::log_enabled!(log::Level::Debug) {
let worker_loads: Vec<u64> = batches
.iter()
.map(|b| b.iter().map(|&tid| tid_to_len[tid as usize]).sum())
.collect();
debug!(
"Chromosome load distribution across {} workers: {:?}",
num_workers, worker_loads
);
}
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_workers)
.build()
.context("Failed to build rayon thread pool")?;
let global_read_counter = AtomicU64::new(0);
debug!(
"Processing {} chromosomes across {} threads",
num_chroms, num_workers
);
let htslib_threads = if num_workers > 0 {
((threads.saturating_sub(num_workers)) / num_workers).max(1)
} else {
0
};
let results: Vec<Result<(ChromResult, Option<RseqcAccumulators>)>> = pool.install(|| {
batches
.par_iter()
.map(|batch| {
process_chromosome_batch(
bam_path,
batch,
&tid_to_name,
&index,
interner.len(),
stranded,
paired,
chrom_mapping,
chrom_prefix,
&global_read_counter,
reference,
rseqc_config,
rseqc_annotations,
htslib_threads,
qualimap_index,
&gene_to_biotype,
num_biotypes,
progress,
)
})
.collect()
});
let mut merged = ChromResult::new(interner.len(), num_biotypes);
let mut merged_rseqc: Option<RseqcAccumulators> =
rseqc_config.map(|cfg| RseqcAccumulators::new(cfg, rseqc_annotations));
for result in results {
let (chrom_result, rseqc_result) = result?;
merged.merge(chrom_result);
if let (Some(ref mut merged_acc), Some(worker_acc)) = (&mut merged_rseqc, rseqc_result)
{
merged_acc.merge(worker_acc);
}
}
(merged, merged_rseqc)
} else {
let global_read_counter = AtomicU64::new(0);
let mut result = ChromResult::new(interner.len(), num_biotypes);
let mut rseqc_accums: Option<RseqcAccumulators> =
rseqc_config.map(|cfg| RseqcAccumulators::new(cfg, rseqc_annotations));
let mut qualimap_accum: Option<crate::rna::qualimap::QualimapAccum> =
qualimap_index.map(|_| crate::rna::qualimap::QualimapAccum::new(stranded));
let tid_to_rseqc_chrom: Vec<String> = tid_to_name
.iter()
.map(|bam_name| {
if let Some(mapped) = chrom_mapping.get(bam_name.as_str()) {
mapped.clone()
} else if let Some(prefix) = chrom_prefix {
format!("{}{}", prefix, bam_name)
} else {
bam_name.clone()
}
})
.collect();
let tid_to_rseqc_chrom_upper: Vec<String> = tid_to_rseqc_chrom
.iter()
.map(|s| s.to_uppercase())
.collect();
let mut bam = bam::Reader::from_path(bam_path)
.with_context(|| format!("Failed to open alignment file: {}", bam_path))?;
if let Some(ref_path) = reference {
bam.set_reference(ref_path)
.with_context(|| format!("Failed to set reference FASTA: {}", ref_path))?;
}
if threads > 1 {
bam.set_threads(threads.saturating_sub(1))
.context("Failed to set htslib decompression threads")?;
}
let mut aligned_blocks_buf: Vec<(u64, u64)> = Vec::new();
let mut gene_hits: Vec<GeneIdx> = Vec::new();
let mut biotype_hits_buf: Vec<u16> = Vec::new();
let mut mate_buffer: HashMap<MateBufferKey, MateInfo> = HashMap::new();
let mut record = bam::Record::new();
while let Some(read_result) = bam.read(&mut record) {
read_result.context("Error reading alignment record")?;
result.total_reads += 1;
let prev = global_read_counter.fetch_add(1, Ordering::Relaxed);
if (prev + 1).is_multiple_of(500_000) {
if let Some(pb) = progress {
pb.set_message(format!("{} reads processed", format_count(prev + 1)));
}
}
let flags = record.flags();
if let (Some(ref mut accums), Some(annots), Some(cfg)) =
(&mut rseqc_accums, rseqc_annotations, rseqc_config)
{
let tid = record.tid();
let (chrom, chrom_upper) = if tid >= 0 && (tid as usize) < tid_to_rseqc_chrom.len()
{
(
tid_to_rseqc_chrom[tid as usize].as_str(),
tid_to_rseqc_chrom_upper[tid as usize].as_str(),
)
} else {
("", "")
};
accums.process_read(&record, chrom, chrom_upper, annots, cfg);
}
if let (Some(ref mut qm_accum), Some(qm_index)) = (&mut qualimap_accum, qualimap_index)
{
let tid = record.tid();
if tid >= 0 && (tid as usize) < tid_to_rseqc_chrom.len() {
let chrom = &tid_to_rseqc_chrom[tid as usize];
qm_accum.process_read(&record, chrom, qm_index);
} else if flags & BAM_FUNMAP != 0 {
qm_accum.counters.not_aligned += 1;
}
}
let tid = record.tid();
let chrom = if tid >= 0 && (tid as usize) < tid_to_rseqc_chrom.len() {
tid_to_rseqc_chrom[tid as usize].as_str()
} else {
""
};
process_counting_record(
&record,
&mut result,
&index,
chrom,
stranded,
paired,
&gene_to_biotype,
&mut aligned_blocks_buf,
&mut gene_hits,
&mut biotype_hits_buf,
&mut mate_buffer,
);
}
result.unmatched_mates = mate_buffer;
result.qualimap = qualimap_accum;
(result, rseqc_accums)
};
if use_parallel {
let mut bam = bam::IndexedReader::from_path(bam_path).with_context(|| {
format!(
"Failed to open indexed BAM for unmapped sweep: {}",
bam_path
)
})?;
if let Some(ref_path) = reference {
bam.set_reference(ref_path)
.with_context(|| format!("Failed to set reference FASTA: {}", ref_path))?;
}
bam.fetch(FetchDefinition::Unmapped)
.context("Failed to seek to unmapped segment")?;
let mut record = bam::Record::new();
let mut unmapped_count: u64 = 0;
while let Some(read_result) = bam.read(&mut record) {
read_result.context("Error reading unmapped record")?;
unmapped_count += 1;
merged.total_reads += 1;
if let (Some(ref mut accums), Some(annots), Some(cfg)) =
(&mut merged_rseqc, rseqc_annotations, rseqc_config)
{
accums.process_read(&record, "", "", annots, cfg);
}
if let Some(ref mut qm) = merged.qualimap {
let tid = record.tid();
if tid >= 0 {
if let Some(qm_index) = qualimap_index {
if (tid as usize) < tid_to_name.len() {
qm.process_read(&record, &tid_to_name[tid as usize], qm_index);
}
}
} else if record.flags() & BAM_FUNMAP != 0 {
qm.counters.not_aligned += 1;
}
}
let flags = record.flags();
if flags & BAM_FUNMAP != 0 {
merged.fc_unmapped += 1;
if paired && flags & BAM_FMUNMAP == 0 {
merged.singleton_unmapped_mates += 1;
}
}
}
if unmapped_count > 0 {
debug!(
"Parallel unmapped sweep: processed {} unmapped reads",
unmapped_count
);
}
}
if paired && !merged.unmatched_mates.is_empty() {
let unmatched: Vec<(MateBufferKey, MateInfo)> = merged.unmatched_mates.drain().collect();
let mut still_unmatched: HashMap<MateBufferKey, MateInfo> = HashMap::new();
for (key, info) in unmatched {
if let Some(mate_info) = still_unmatched.remove(&key) {
if info.is_read1 == mate_info.is_read1 {
still_unmatched.insert(key, mate_info);
continue;
}
merged.total_fragments += 1;
let frag_is_dup = if info.is_read1 {
info.is_dup
} else {
mate_info.is_dup
};
let frag_is_multi = if info.is_read1 {
info.is_multi
} else {
mate_info.is_multi
};
merged.n_multi_dup += 1;
merged.n_unique_dup += 1;
if !frag_is_dup {
merged.n_multi_nodup += 1;
merged.n_unique_nodup += 1;
}
let combined_genes: Vec<GeneIdx> =
merge_gene_hits(&mate_info.gene_hits, &info.gene_hits);
if combined_genes.is_empty() {
merged.stat_no_features += 1;
} else if combined_genes.len() > 1 {
merged.stat_ambiguous += 1;
} else if assign_fragment_to_gene(
&combined_genes,
&mut merged.gene_counts,
frag_is_dup,
frag_is_multi,
) {
merged.stat_assigned += 1;
}
} else {
still_unmatched.insert(key, info);
}
}
for (_key, mate_info) in still_unmatched.drain() {
merged.total_fragments += 1;
merged.n_multi_dup += 1;
merged.n_unique_dup += 1;
if !mate_info.is_dup {
merged.n_multi_nodup += 1;
merged.n_unique_nodup += 1;
}
if mate_info.gene_hits.is_empty() {
merged.stat_no_features += 1;
} else if mate_info.gene_hits.len() > 1 {
merged.stat_ambiguous += 1;
} else if assign_fragment_to_gene(
&mate_info.gene_hits,
&mut merged.gene_counts,
mate_info.is_dup,
mate_info.is_multi,
) {
merged.stat_assigned += 1;
}
}
}
debug!(
"Read {} total reads, {} mapped, {} fragments, {} duplicates, {} multimappers",
merged.total_reads,
merged.total_mapped,
merged.total_fragments,
merged.total_dup,
merged.total_multi
);
debug!(
"Assignment stats: {} assigned, {} ambiguous, {} no_features (total fragments: {})",
merged.stat_assigned,
merged.stat_ambiguous,
merged.stat_no_features,
merged.total_fragments
);
if merged.total_dup == 0 && merged.total_mapped > 0 && !skip_dup_check {
anyhow::bail!(
"No duplicate-flagged reads found among {} mapped reads in '{}'.\n\
\n\
Although the BAM header suggests a duplicate-marking tool was run,\n\
no reads have the duplicate flag (0x400) set. This likely means\n\
duplicates were removed rather than marked, or the tool did not\n\
flag any duplicates.\n\
\n\
RustQC requires duplicates to be marked (flagged) but NOT removed.\n\
Please re-run your duplicate-marking tool without removing duplicates.\n\
\n\
If this is expected, use --skip-dup-check to bypass this check.",
merged.total_mapped,
bam_path
);
}
let genes_with_reads = merged
.gene_counts
.iter()
.filter(|c| c.all_multi > 0)
.count();
if merged.total_mapped > 0 && genes_with_reads == 0 {
let bam_chroms: Vec<&str> = tid_to_name.iter().take(5).map(|s| s.as_str()).collect();
let gtf_chroms: Vec<&str> = index.keys().take(5).map(|s| s.as_str()).collect();
anyhow::bail!(
"Chromosome name mismatch: no reads could be assigned to any gene.\n\
\n\
Alignment chromosomes (first 5): {}\n\
GTF chromosomes (first 5): {}\n\
\n\
The alignment and GTF files appear to use different chromosome naming conventions.\n\
To fix this, create a YAML config file with a chromosome_mapping section and pass it via --config:\n\
\n\
Example config.yaml:\n\
\n\
chromosome_mapping:\n\
{}",
bam_chroms.join(", "),
gtf_chroms.join(", "),
bam_chroms
.iter()
.zip(gtf_chroms.iter())
.map(|(b, g)| format!(" {}: {}", g, b))
.collect::<Vec<_>>()
.join("\n")
);
}
let gene_counts_map: IndexMap<String, GeneCounts> = (0..interner.len())
.map(|i| {
let name = interner.name(i as GeneIdx).to_string();
let counts = std::mem::take(&mut merged.gene_counts[i]);
(name, counts)
})
.collect();
Ok(CountResult {
gene_counts: gene_counts_map,
stat_total_reads: merged.total_reads,
stat_assigned: merged.stat_assigned,
stat_ambiguous: merged.stat_ambiguous,
stat_no_features: merged.stat_no_features,
stat_total_fragments: merged.total_fragments,
stat_total_mapped: merged.total_mapped,
stat_total_dup: merged.total_dup,
stat_singleton_unmapped_mates: merged.singleton_unmapped_mates,
fc_assigned: merged.fc_assigned,
fc_ambiguous: merged.fc_ambiguous,
fc_no_features: merged.fc_no_features,
fc_multimapping: merged.fc_multimapping,
fc_unmapped: merged.fc_unmapped,
fc_singleton: merged.fc_singleton,
fc_chimera: merged.fc_chimera,
biotype_reads: merged.biotype_reads,
biotype_names,
fc_biotype_assigned: merged.fc_biotype_assigned,
fc_biotype_ambiguous: merged.fc_biotype_ambiguous,
fc_biotype_no_features: merged.fc_biotype_no_features,
rseqc: merged_rseqc,
qualimap: match (merged.qualimap, qualimap_index) {
(Some(mut a), Some(idx)) => {
a.flush_unpaired(idx);
Some(a.into_result())
}
_ => None,
},
})
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_strand_matching_unstranded() {
let u = Strandedness::Unstranded;
assert!(strand_matches(false, true, false, '+', u));
assert!(strand_matches(true, true, false, '+', u));
assert!(strand_matches(false, true, false, '-', u));
assert!(strand_matches(true, true, false, '-', u));
}
#[test]
fn test_strand_matching_forward_single() {
let f = Strandedness::Forward;
assert!(strand_matches(false, true, false, '+', f));
assert!(strand_matches(true, true, false, '-', f));
assert!(!strand_matches(false, true, false, '-', f));
assert!(!strand_matches(true, true, false, '+', f));
}
#[test]
fn test_strand_matching_reverse_single() {
let r = Strandedness::Reverse;
assert!(strand_matches(false, true, false, '-', r));
assert!(strand_matches(true, true, false, '+', r));
}
#[test]
fn test_strand_matching_forward_paired() {
let f = Strandedness::Forward;
assert!(strand_matches(false, true, true, '+', f));
assert!(strand_matches(false, false, true, '-', f));
assert!(strand_matches(true, true, true, '-', f));
assert!(strand_matches(true, false, true, '+', f));
}
#[test]
fn test_extract_header_tag() {
let line = "@PG\tID:MarkDuplicates\tPN:MarkDuplicates\tVN:2.27.4\tCL:picard MarkDuplicates I=in.bam O=out.bam";
assert_eq!(extract_header_tag(line, "ID"), Some("MarkDuplicates"));
assert_eq!(extract_header_tag(line, "PN"), Some("MarkDuplicates"));
assert_eq!(extract_header_tag(line, "VN"), Some("2.27.4"));
assert_eq!(
extract_header_tag(line, "CL"),
Some("picard MarkDuplicates I=in.bam O=out.bam")
);
assert_eq!(extract_header_tag(line, "XX"), None);
}
#[test]
fn test_dup_check_picard_markduplicates() {
let header = "@HD\tVN:1.6\tSO:coordinate\n\
@PG\tID:MarkDuplicates\tPN:MarkDuplicates\tVN:2.27.4";
assert!(header_has_dup_marker(header));
}
#[test]
fn test_dup_check_samblaster() {
let header = "@HD\tVN:1.6\n\
@PG\tID:samblaster\tPN:samblaster\tVN:0.1.26";
assert!(header_has_dup_marker(header));
}
#[test]
fn test_dup_check_sambamba_markdup() {
let header = "@HD\tVN:1.6\n\
@PG\tID:sambamba_markdup\tPN:sambamba markdup";
assert!(header_has_dup_marker(header));
}
#[test]
fn test_dup_check_biobambam() {
let header = "@HD\tVN:1.6\n\
@PG\tID:biobambam2\tPN:biobambam";
assert!(header_has_dup_marker(header));
}
#[test]
fn test_dup_check_case_insensitive() {
let header = "@PG\tID:MARKDUPLICATES\tPN:markduplicates";
assert!(header_has_dup_marker(header));
}
#[test]
fn test_dup_check_no_dup_marker() {
let header = "@HD\tVN:1.6\tSO:coordinate\n\
@PG\tID:bwa\tPN:bwa\tVN:0.7.17\n\
@PG\tID:samtools\tPN:samtools\tVN:1.17";
assert!(!header_has_dup_marker(header));
}
#[test]
fn test_dup_check_empty_header() {
assert!(!header_has_dup_marker(""));
assert!(!header_has_dup_marker("@HD\tVN:1.6\tSO:coordinate"));
}
#[test]
fn test_dup_check_picard_sortsam_no_false_positive() {
let header = "@PG\tID:SortSam\tPN:SortSam\tCL:picard SortSam I=in.bam O=out.bam";
assert!(!header_has_dup_marker(header));
}
#[test]
fn test_dup_check_picard_collect_metrics_no_false_positive() {
let header =
"@PG\tID:CollectInsertSizeMetrics\tPN:CollectInsertSizeMetrics\tCL:picard CollectInsertSizeMetrics";
assert!(!header_has_dup_marker(header));
}
#[test]
fn test_dup_check_sambamba_sort_no_false_positive() {
let header = "@PG\tID:sambamba\tPN:sambamba sort";
assert!(!header_has_dup_marker(header));
}
#[test]
fn test_dup_check_multiple_pg_lines() {
let header = "@HD\tVN:1.6\tSO:coordinate\n\
@PG\tID:bwa\tPN:bwa\tVN:0.7.17\n\
@PG\tID:samtools\tPN:samtools\tVN:1.17\n\
@PG\tID:MarkDuplicates\tPN:MarkDuplicates\tPP:samtools";
assert!(header_has_dup_marker(header));
}
#[test]
fn test_dup_check_dup_marker_only_in_cl_no_match() {
let header_text = "@PG\tID:STAR\tPN:STAR\tVN:2.7.10a\tCL:STAR --readFilesIn sample.fastq --outSAMtype BAM SortedByCoordinate\n";
assert!(!header_has_dup_marker(header_text));
}
fn make_test_chrom_result(num_genes: usize) -> ChromResult {
ChromResult::new(num_genes, 0)
}
#[test]
fn test_fc_classify_multimapped_read() {
let mut result = make_test_chrom_result(3);
let gene_hits: Vec<GeneIdx> = vec![0];
classify_read_fc(true, &gene_hits, &mut result);
assert_eq!(
result.fc_multimapping, 1,
"Multi-mapped read should be counted as fc_multimapping"
);
assert_eq!(result.fc_assigned, 0);
assert_eq!(result.fc_ambiguous, 0);
assert_eq!(result.fc_no_features, 0);
assert_eq!(
result.gene_counts[0].fc_reads, 0,
"Multi-mapped read should not credit any gene"
);
}
#[test]
fn test_fc_classify_no_gene_hits() {
let mut result = make_test_chrom_result(3);
let gene_hits: Vec<GeneIdx> = vec![];
classify_read_fc(false, &gene_hits, &mut result);
assert_eq!(
result.fc_no_features, 1,
"Read with no hits should be fc_no_features"
);
assert_eq!(result.fc_assigned, 0);
assert_eq!(result.fc_ambiguous, 0);
assert_eq!(result.fc_multimapping, 0);
}
#[test]
fn test_fc_classify_single_gene_hit() {
let mut result = make_test_chrom_result(3);
let gene_hits: Vec<GeneIdx> = vec![1];
classify_read_fc(false, &gene_hits, &mut result);
assert_eq!(
result.fc_assigned, 1,
"Single gene hit should be fc_assigned"
);
assert_eq!(
result.gene_counts[1].fc_reads, 1,
"Single gene hit should credit that gene"
);
assert_eq!(result.gene_counts[0].fc_reads, 0);
assert_eq!(result.gene_counts[2].fc_reads, 0);
assert_eq!(result.fc_ambiguous, 0);
assert_eq!(result.fc_no_features, 0);
}
#[test]
fn test_fc_classify_multi_gene_hits_ambiguous() {
let mut result = make_test_chrom_result(4);
let gene_hits: Vec<GeneIdx> = vec![0, 1];
classify_read_fc(false, &gene_hits, &mut result);
assert_eq!(
result.fc_ambiguous, 1,
"Multi-gene hits should be fc_ambiguous"
);
assert_eq!(result.fc_assigned, 0);
assert_eq!(
result.gene_counts[0].fc_reads, 0,
"Ambiguous read should not credit any gene"
);
assert_eq!(result.gene_counts[1].fc_reads, 0);
}
#[test]
fn test_fc_classify_three_gene_hits_ambiguous() {
let mut result = make_test_chrom_result(5);
let gene_hits: Vec<GeneIdx> = vec![0, 1, 2];
classify_read_fc(false, &gene_hits, &mut result);
assert_eq!(
result.fc_ambiguous, 1,
"Three-gene hits should be fc_ambiguous"
);
assert_eq!(result.fc_assigned, 0);
assert_eq!(result.gene_counts[0].fc_reads, 0);
assert_eq!(result.gene_counts[1].fc_reads, 0);
assert_eq!(result.gene_counts[2].fc_reads, 0);
}
#[test]
fn test_fc_classify_cumulative_counts() {
let mut result = make_test_chrom_result(3);
classify_read_fc(false, &[0], &mut result);
classify_read_fc(false, &[0], &mut result);
classify_read_fc(true, &[0], &mut result);
classify_read_fc(false, &[], &mut result);
classify_read_fc(false, &[0, 1], &mut result);
assert_eq!(result.fc_assigned, 2);
assert_eq!(result.fc_multimapping, 1);
assert_eq!(result.fc_no_features, 1);
assert_eq!(result.fc_ambiguous, 1);
assert_eq!(result.gene_counts[0].fc_reads, 2);
assert_eq!(result.gene_counts[1].fc_reads, 0);
}
#[test]
fn test_partition_chromosomes_balanced() {
let lengths = vec![
250, 243, 198, 191, 182, 171, 159, 146, 138, 133, 135, 130, 57,
];
let batches = partition_chromosomes(&lengths, 4);
assert_eq!(batches.len(), 4);
let mut all_tids: Vec<u32> = batches.iter().flatten().copied().collect();
all_tids.sort();
assert_eq!(all_tids, (0..13).collect::<Vec<u32>>());
let total: u64 = lengths.iter().sum();
let avg = total as f64 / 4.0;
for batch in &batches {
let load: u64 = batch.iter().map(|&tid| lengths[tid as usize]).sum();
assert!(
(load as f64) < avg * 1.3,
"Worker load {} exceeds 1.3× average {:.0}",
load,
avg
);
}
}
#[test]
fn test_partition_chromosomes_single_worker() {
let lengths = vec![100, 200, 300];
let batches = partition_chromosomes(&lengths, 1);
assert_eq!(batches.len(), 1);
assert_eq!(batches[0].len(), 3);
}
#[test]
fn test_partition_chromosomes_more_workers_than_chroms() {
let lengths = vec![100, 200];
let batches = partition_chromosomes(&lengths, 4);
assert_eq!(batches.len(), 4);
let non_empty: usize = batches.iter().filter(|b| !b.is_empty()).count();
assert_eq!(non_empty, 2);
}
}