use crate::cli::Cli;
use crate::io::CoverageSummary;
use crate::utils::ReadStats;
use crossbeam_channel::unbounded;
use dashmap::DashMap;
use futures::stream::{FuturesUnordered, StreamExt};
use noodles_bam as bam;
use noodles_bgzf as bgzf;
use noodles_core::{Position, Region};
use std::collections::HashMap;
use std::fmt;
use std::path::Path;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
#[derive(Debug)]
struct PlotError(String);
impl std::error::Error for PlotError {}
impl fmt::Display for PlotError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Plot error: {}", self.0)
}
}
struct RegionCoverage {
coverage: HashMap<u32, u32>,
}
type RegionCoverageResult = Result<RegionCoverage, Box<dyn std::error::Error + Send + Sync>>;
type BamReader = bam::io::Reader<bgzf::io::Reader<std::io::BufReader<std::fs::File>>>;
#[hotpath::measure]
pub async fn run_coverage_async_pipeline(
cli: &Cli,
read_stats: Option<ReadStats>,
) -> Result<CoverageSummary, Box<dyn std::error::Error + Send + Sync>> {
let pipeline_timer = Instant::now();
println!("Starting async pipeline processing mode");
let num_tasks = cli
.async_tasks
.or(cli.threads)
.unwrap_or_else(|| std::cmp::max(2, num_cpus::get() / 2));
let buffer_size_kb = cli.io_buffer_size_kb.unwrap_or(64);
println!("Using {num_tasks} async tasks with {buffer_size_kb}KB I/O buffers");
let global_coverage: Arc<DashMap<String, HashMap<u32, u32>>> = Arc::new(DashMap::new());
let mut bed_regions = if let Some(bed_path) = &cli.bed {
let bed_timer = Instant::now();
let regions = parse_bed_async(bed_path).await?;
eprintln!(
"[hotpath] bed parse: {:.2?} (chroms={})",
bed_timer.elapsed(),
regions.len()
);
Some(regions)
} else {
None
};
if cli.invert_regions && bed_regions.is_none() {
return Err("`--invert-regions` requires a BED file".into());
}
let header_timer = Instant::now();
let mut chromosomes = get_chromosomes_async(cli.input_path()).await?;
eprintln!(
"[hotpath] header read: {:.2?} (chroms={})",
header_timer.elapsed(),
chromosomes.len()
);
if let Some(ref bed) = bed_regions {
let filter_timer = Instant::now();
let before = chromosomes.len();
if !cli.invert_regions {
chromosomes.retain(|(name, _)| bed.contains_key(name));
}
eprintln!(
"[hotpath] bed filter: {:.2?} (before={}, after={})",
filter_timer.elapsed(),
before,
chromosomes.len()
);
}
if cli.invert_regions
&& let Some(ref bed) = bed_regions
{
bed_regions = Some(invert_bed_regions(&chromosomes, bed));
}
let total_chromosomes = chromosomes.len();
let processed_count = Arc::new(AtomicUsize::new(0));
let total_bases = Arc::new(AtomicU64::new(0));
let total_coverage_sum = Arc::new(AtomicU64::new(0));
let bed_regions = Arc::new(bed_regions);
let use_mmap = cli.use_mmap;
let buffer_bytes = buffer_size_kb * 1024;
let chunk_size = cli.chunk_size;
let adaptive_chunks = cli.adaptive_chunks;
let show_zero_regions = cli.show_zero_regions;
let store_coverage = cli.wants_any_plot();
let include_non_canonical = cli.include_non_canonical;
let chrom_summaries: Arc<Mutex<HashMap<String, (u64, u64)>>> =
Arc::new(Mutex::new(HashMap::new()));
let (out_tx, out_rx) = unbounded::<String>();
let output_path = cli.coverage_output_path();
let writer_handle = tokio::task::spawn_blocking(move || {
use std::io::Write;
let writer_timer = Instant::now();
let mut out = std::io::BufWriter::new(std::fs::File::create(output_path)?);
writeln!(out, "#chromosome\tposition\tcount")?;
let mut blocks = 0usize;
let mut bytes = 0usize;
for block in out_rx.iter() {
blocks += 1;
bytes += block.len();
out.write_all(block.as_bytes())?;
}
out.flush()?;
eprintln!(
"[hotpath] writer: {:.2?} (blocks={}, bytes={})",
writer_timer.elapsed(),
blocks,
bytes
);
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
});
println!(
"Async chunked processing: {} chromosomes, concurrency={}, chunk_size={} bp",
total_chromosomes, num_tasks, cli.chunk_size
);
if total_chromosomes > 0 {
let worker_count = std::cmp::min(num_tasks, total_chromosomes);
let bucket_timer = Instant::now();
let mut buckets: Vec<Vec<(String, u32)>> = vec![Vec::new(); worker_count];
let mut bucket_sizes: Vec<u64> = vec![0; worker_count];
let mut chroms = chromosomes;
chroms.sort_by(|a, b| b.1.cmp(&a.1));
for (chrom_name, chrom_length) in chroms {
let (idx, _) = bucket_sizes
.iter()
.enumerate()
.min_by_key(|(_, size)| *size)
.unwrap();
buckets[idx].push((chrom_name, chrom_length));
bucket_sizes[idx] += chrom_length as u64;
}
eprintln!(
"[hotpath] bucketize: {:.2?} (buckets={})",
bucket_timer.elapsed(),
buckets.len()
);
let mut workers = Vec::new();
for bucket in buckets {
if bucket.is_empty() {
continue;
}
let global_coverage = Arc::clone(&global_coverage);
let processed_count = Arc::clone(&processed_count);
let total_bases = Arc::clone(&total_bases);
let total_coverage_sum = Arc::clone(&total_coverage_sum);
let bam_path = cli.input_path().to_path_buf();
let bed_regions = Arc::clone(&bed_regions);
let chrom_summaries = Arc::clone(&chrom_summaries);
let out_tx = out_tx.clone();
let handle = tokio::task::spawn_blocking(move || {
let worker_timer = Instant::now();
let (mut reader, header, index) =
open_bam_for_worker(&bam_path, buffer_bytes, use_mmap)?;
eprintln!(
"[hotpath] worker open: {:.2?} (chroms={})",
worker_timer.elapsed(),
bucket.len()
);
for (chrom_name, chrom_length) in bucket {
let chrom_timer = Instant::now();
let chrom_regions = determine_chromosome_regions(
&chrom_name,
chrom_length,
bed_regions.as_ref(),
chunk_size,
adaptive_chunks,
);
let region_count = chrom_regions.len();
if chrom_regions.is_empty() {
continue;
}
let chrom_bases: u64 = chrom_regions
.iter()
.map(|region| {
region.end.saturating_sub(region.start).saturating_add(1) as u64
})
.sum();
let include_in_stats =
include_non_canonical || is_stats_chromosome(&chrom_name);
if include_in_stats && chrom_bases > 0 {
total_bases.fetch_add(chrom_bases, Ordering::Relaxed);
}
let mut chrom_coverage: HashMap<u32, u32> = HashMap::new();
for region in chrom_regions {
match process_region_with_reader(&mut reader, &header, &index, ®ion) {
Ok(region_result) => {
for (pos, count) in region_result.coverage {
*chrom_coverage.entry(pos).or_insert(0) += count;
}
}
Err(e) => eprintln!(
"Error processing region {}:{}-{}: {e}",
region.chromosome, region.start, region.end
),
}
}
let (block, _positions, chrom_total_coverage) =
format_coverage_block(&chrom_name, &chrom_coverage);
if include_in_stats && chrom_total_coverage > 0 {
total_coverage_sum.fetch_add(chrom_total_coverage, Ordering::Relaxed);
}
if !block.is_empty()
&& let Err(e) = out_tx.send(block)
{
eprintln!("Failed to send output for {chrom_name}: {e}");
}
let should_record = chrom_bases > 0 || show_zero_regions;
if should_record {
if store_coverage {
global_coverage.insert(chrom_name.clone(), chrom_coverage);
}
let _avg_coverage = if chrom_bases > 0 {
chrom_total_coverage as f64 / chrom_bases as f64
} else {
0.0
};
if include_in_stats {
let mut summaries = chrom_summaries.lock().unwrap();
summaries
.insert(chrom_name.clone(), (chrom_bases, chrom_total_coverage));
}
}
eprintln!(
"[hotpath] chrom {chrom_name}: {:.2?} (regions={}, bases={}, coverage_sum={})",
chrom_timer.elapsed(),
region_count,
chrom_bases,
chrom_total_coverage
);
let count = processed_count.fetch_add(1, Ordering::Relaxed) + 1;
println!("Processed {count}/{total_chromosomes} chromosomes");
}
eprintln!("[hotpath] worker total: {:.2?}", worker_timer.elapsed());
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
});
workers.push(handle);
}
for handle in workers {
handle.await??;
}
}
drop(out_tx);
println!(
"Completed async processing of {} chromosomes",
processed_count.load(Ordering::Relaxed)
);
if !cli.wants_any_plot() {
let summaries = { chrom_summaries.lock().unwrap().clone() };
print_final_statistics_summary(&summaries);
let writer_wait_timer = Instant::now();
writer_handle.await??;
eprintln!("[hotpath] writer wait: {:.2?}", writer_wait_timer.elapsed());
eprintln!(
"[hotpath] async pipeline total: {:.2?}",
pipeline_timer.elapsed()
);
return Ok(CoverageSummary {
total_coverage: total_coverage_sum.load(Ordering::Relaxed),
analyzed_bases: total_bases.load(Ordering::Relaxed),
});
}
let convert_timer = Instant::now();
let final_coverage = match Arc::try_unwrap(global_coverage) {
Ok(map) => map.into_iter().collect(),
Err(map) => convert_dashmap_to_hashmap(&map),
};
eprintln!(
"[hotpath] coverage materialize: {:.2?} (chroms={})",
convert_timer.elapsed(),
final_coverage.len()
);
if cli.wants_any_plot() {
if let Some(theme) = &cli.theme {
crate::plotting::set_theme(theme);
}
let plot_timer = Instant::now();
if cli.wants_per_chromosome_plot() {
if cli.use_concurrent_plots() {
generate_plots_concurrent(cli, &final_coverage, read_stats.as_ref()).await?;
} else {
generate_plots_sequential(cli, &final_coverage, read_stats.as_ref()).await?;
}
}
if cli.wants_overview_plot() {
let output_stem = cli.output_prefix();
let output_dir = cli.output_dir();
let file_format = if cli.svg_output { "svg" } else { "png" };
let overview_path = output_dir.join(format!("{output_stem}.overview.{file_format}"));
crate::plotting::plot_overview_coverage(
&final_coverage,
overview_path.to_str().unwrap(),
Some(output_stem.as_str()),
read_stats.as_ref(),
)?;
}
eprintln!("[hotpath] plots total: {:.2?}", plot_timer.elapsed());
}
let stats_timer = Instant::now();
let summaries = { chrom_summaries.lock().unwrap().clone() };
print_final_statistics_summary(&summaries);
eprintln!("[hotpath] stats print: {:.2?}", stats_timer.elapsed());
let writer_wait_timer = Instant::now();
writer_handle.await??;
eprintln!("[hotpath] writer wait: {:.2?}", writer_wait_timer.elapsed());
eprintln!(
"[hotpath] async pipeline total: {:.2?}",
pipeline_timer.elapsed()
);
Ok(CoverageSummary {
total_coverage: total_coverage_sum.load(Ordering::Relaxed),
analyzed_bases: total_bases.load(Ordering::Relaxed),
})
}
#[hotpath::measure]
async fn parse_bed_async(
bed_path: &Path,
) -> Result<HashMap<String, Vec<(u32, u32)>>, Box<dyn std::error::Error + Send + Sync>> {
use tokio::fs::File;
use tokio::io::{AsyncBufReadExt, BufReader};
let file = File::open(bed_path).await?;
let reader = BufReader::new(file);
let mut lines = reader.lines();
let mut bed_regions = HashMap::new();
while let Some(line) = lines.next_line().await? {
let line = line.trim();
if line.is_empty() || line.starts_with('#') {
continue;
}
let parts: Vec<&str> = line.split('\t').collect();
if parts.len() >= 3 {
let chrom = parts[0].to_string();
if let (Ok(start), Ok(end)) = (parts[1].parse::<u32>(), parts[2].parse::<u32>()) {
if end <= start {
continue;
}
let start = start.saturating_add(1);
bed_regions
.entry(chrom)
.or_insert_with(Vec::new)
.push((start, end));
}
}
}
Ok(bed_regions)
}
#[hotpath::measure]
async fn get_chromosomes_async(
bam_path: &Path,
) -> Result<Vec<(String, u32)>, Box<dyn std::error::Error + Send + Sync>> {
let bam_path = bam_path.to_owned();
tokio::task::spawn_blocking(move || {
#[allow(clippy::default_constructed_unit_structs)]
let mut reader = bam::io::reader::Builder::default().build_from_path(&bam_path)?;
let header = reader.read_header()?;
let reference_sequences = header.reference_sequences();
let chromosomes: Vec<(String, u32)> = reference_sequences
.iter()
.map(|(name, ref_seq)| (name.to_string(), ref_seq.length().get() as u32))
.collect();
Ok::<Vec<(String, u32)>, Box<dyn std::error::Error + Send + Sync>>(chromosomes)
})
.await?
}
fn invert_bed_regions(
chromosomes: &[(String, u32)],
bed_regions: &HashMap<String, Vec<(u32, u32)>>,
) -> HashMap<String, Vec<(u32, u32)>> {
let mut inverted = HashMap::new();
for (chrom, length) in chromosomes {
if *length == 0 {
continue;
}
let mut regions = bed_regions.get(chrom).cloned().unwrap_or_default();
if regions.is_empty() {
inverted.insert(chrom.clone(), vec![(1, *length)]);
continue;
}
regions.sort_by_key(|(start, _)| *start);
let mut merged: Vec<(u32, u32)> = Vec::new();
for (mut start, mut end) in regions {
if end < 1 || start > *length {
continue;
}
if start < 1 {
start = 1;
}
if end > *length {
end = *length;
}
if start > end {
continue;
}
if let Some(last) = merged.last_mut() {
if start <= last.1.saturating_add(1) {
if end > last.1 {
last.1 = end;
}
} else {
merged.push((start, end));
}
} else {
merged.push((start, end));
}
}
let mut complement = Vec::new();
let mut cursor = 1u32;
for (start, end) in merged {
if cursor < start {
complement.push((cursor, start - 1));
}
cursor = end.saturating_add(1);
if cursor == 0 {
break;
}
}
if cursor <= *length {
complement.push((cursor, *length));
}
if !complement.is_empty() {
inverted.insert(chrom.clone(), complement);
}
}
inverted
}
fn is_stats_chromosome(name: &str) -> bool {
is_canonical(name) || is_mito(name) || is_ebv(name)
}
fn is_canonical(name: &str) -> bool {
canonical_order_key(name).is_some()
}
fn canonical_order_key(name: &str) -> Option<u32> {
let mut s = name.to_ascii_lowercase();
if let Some(stripped) = s.strip_prefix("chr") {
s = stripped.to_string();
}
if s == "x" {
return Some(23);
}
if s == "y" {
return Some(24);
}
if let Ok(val) = s.parse::<u32>()
&& (1..=22).contains(&val)
{
return Some(val);
}
None
}
fn is_mito(name: &str) -> bool {
let n = name.to_ascii_lowercase();
n == "chrm" || n == "chrmt" || n == "mt" || n == "m"
}
fn is_ebv(name: &str) -> bool {
let n = name.to_ascii_lowercase();
n == "chrebv" || n == "ebv"
}
#[hotpath::measure]
fn determine_chromosome_regions(
chrom_name: &str,
chrom_length: u32,
bed_regions: &Option<HashMap<String, Vec<(u32, u32)>>>,
base_chunk_size: usize,
adaptive_chunks: bool,
) -> Vec<ChromosomeRegion> {
let mut regions = Vec::new();
let target_regions = if let Some(bed) = bed_regions {
bed.get(chrom_name).cloned().unwrap_or_default()
} else {
vec![(1, chrom_length)]
};
for (start, end) in target_regions {
let region_length = end - start + 1;
let chunk_size = if adaptive_chunks {
if region_length > 1_000_000 {
base_chunk_size * 2 } else if region_length < 50_000 {
base_chunk_size / 2 } else {
base_chunk_size
}
} else {
base_chunk_size
};
let mut chunk_start = start;
while chunk_start <= end {
let chunk_end = std::cmp::min(chunk_start + chunk_size as u32 - 1, end);
regions.push(ChromosomeRegion {
chromosome: chrom_name.to_string(),
start: chunk_start,
end: chunk_end,
});
chunk_start = chunk_end + 1;
}
}
regions
}
#[derive(Debug, Clone)]
pub struct ChromosomeRegion {
pub chromosome: String,
pub start: u32,
pub end: u32,
}
#[hotpath::measure]
fn open_bam_for_worker(
bam_path: &Path,
buffer_size: usize,
_use_mmap: bool,
) -> Result<
(BamReader, noodles_sam::Header, bam::bai::Index),
Box<dyn std::error::Error + Send + Sync>,
> {
let file = std::fs::File::open(bam_path)?;
let mut reader = bam::io::Reader::new(std::io::BufReader::with_capacity(buffer_size, file));
let header = reader.read_header()?;
let mut bai_reader =
bam::bai::Reader::new(std::fs::File::open(bam_path.with_extension("bam.bai"))?);
let index = bai_reader.read_index()?;
Ok((reader, header, index))
}
#[hotpath::measure]
fn process_region_with_reader(
reader: &mut BamReader,
header: &noodles_sam::Header,
index: &bam::bai::Index,
region: &ChromosomeRegion,
) -> RegionCoverageResult {
let mut coverage = HashMap::new();
let query_region = Region::new(
region.chromosome.clone(),
Position::try_from(region.start as usize)
.map_err(|e| format!("Invalid start position {}: {}", region.start, e))?
..=Position::try_from(region.end as usize)
.map_err(|e| format!("Invalid end position {}: {}", region.end, e))?,
);
let query = reader.query(header, index, &query_region)?;
for result in query {
let record = result?;
if record.flags().is_unmapped() {
continue;
}
let start_pos = match record.alignment_start() {
Some(Ok(pos)) => pos.get() as u32,
_ => continue,
};
let len = crate::io::calculate_reference_span(&record.cigar());
for pos in start_pos..start_pos + len {
if pos >= region.start && pos <= region.end {
let count = coverage.entry(pos).or_insert(0);
*count += 1;
}
}
}
Ok(RegionCoverage { coverage })
}
#[hotpath::measure]
fn format_coverage_block(chrom: &str, coverage: &HashMap<u32, u32>) -> (String, u64, u64) {
use std::fmt::Write as _;
if coverage.is_empty() {
return (String::new(), 0, 0);
}
let mut positions: Vec<_> = coverage.iter().collect();
positions.sort_by_key(|&(pos, _)| *pos);
let mut block = String::with_capacity(positions.len() * 24);
let mut total_coverage = 0u64;
let positions_len = positions.len() as u64;
for (&pos, &count) in positions {
total_coverage += count as u64;
let _ = writeln!(block, "{chrom}\t{pos}\t{count}");
}
(block, positions_len, total_coverage)
}
#[hotpath::measure]
fn convert_dashmap_to_hashmap(
global_coverage: &Arc<DashMap<String, HashMap<u32, u32>>>,
) -> HashMap<String, HashMap<u32, u32>> {
let mut result = HashMap::new();
for item in global_coverage.iter() {
let chrom = item.key().clone();
result.insert(chrom, item.value().clone());
}
result
}
#[hotpath::measure]
async fn generate_plots_concurrent(
cli: &Cli,
coverage: &HashMap<String, HashMap<u32, u32>>,
read_stats: Option<&ReadStats>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("Generating plots concurrently...");
let mut plot_futures = FuturesUnordered::new();
for (chrom, chrom_coverage) in coverage {
let future = generate_chromosome_plot_async(
cli,
chrom.clone(),
chrom_coverage.clone(),
read_stats.cloned(),
);
plot_futures.push(future);
}
while let Some(result) = plot_futures.next().await {
if let Err(e) = result {
eprintln!("Error generating chromosome plot: {e}");
}
}
println!("Completed concurrent plot generation");
Ok(())
}
#[hotpath::measure]
async fn generate_plots_sequential(
cli: &Cli,
coverage: &HashMap<String, HashMap<u32, u32>>,
read_stats: Option<&ReadStats>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
println!("Generating plots sequentially...");
for (chrom, chrom_coverage) in coverage {
generate_chromosome_plot_async(
cli,
chrom.clone(),
chrom_coverage.clone(),
read_stats.cloned(),
)
.await?;
}
println!("Completed sequential plot generation");
Ok(())
}
#[hotpath::measure]
async fn generate_chromosome_plot_async(
cli: &Cli,
chrom: String,
chrom_coverage: HashMap<u32, u32>,
read_stats: Option<ReadStats>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cli_clone = cli.clone();
tokio::task::spawn_blocking(
move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let output_stem = cli_clone.output_prefix();
let output_dir = cli_clone.output_dir();
let file_format = if cli_clone.svg_output { "svg" } else { "png" };
let plot_path = output_dir.join(format!("{output_stem}.{chrom}.{file_format}"));
if let Some(theme) = &cli_clone.theme {
crate::plotting::set_theme(theme);
}
let min_pos = chrom_coverage.keys().min().copied().unwrap_or(0);
let max_pos = chrom_coverage.keys().max().copied().unwrap_or(0);
crate::plotting::plot_per_base_coverage_with_range(
&chrom,
&chrom_coverage,
plot_path.to_str().unwrap(),
min_pos,
max_pos,
read_stats.as_ref(),
cli_clone.show_zero_regions,
cli_clone.use_log_scale(),
cli_clone.plot_bin_size,
Some(output_stem.as_ref()),
)
.map_err(|e| {
Box::new(PlotError(e.to_string())) as Box<dyn std::error::Error + Send + Sync>
})
},
)
.await?
}
#[hotpath::measure]
fn print_final_statistics_summary(summaries: &HashMap<String, (u64, u64)>) {
println!("\n=== Final Processing Statistics ===");
println!("Chromosomes processed: {}", summaries.len());
let mut total_bases = 0u64;
let mut total_coverage = 0u64;
let mut chroms: Vec<_> = summaries.iter().collect();
chroms.sort_by_key(|(chrom, _)| *chrom);
for (chrom, (bases, coverage_sum)) in chroms {
let chrom_avg = if *bases > 0 {
*coverage_sum as f64 / *bases as f64
} else {
0.0
};
println!("{chrom}: {bases} bases, avg coverage: {chrom_avg:.2}");
total_bases += *bases;
total_coverage += *coverage_sum;
}
let global_avg = if total_bases > 0 {
total_coverage as f64 / total_bases as f64
} else {
0.0
};
println!("Global: {total_bases} bases, avg coverage: {global_avg:.2}");
}