use clap::Parser;
use noodles_sam::Header;
use std::num::NonZeroUsize;
use util::oarfish_types::NamedDigestVec;
#[cfg(feature = "mimalloc")]
#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
use num_format::{Locale, ToFormattedString};
use std::{fs::File, io};
use tracing::info;
use tracing_subscriber::{EnvFilter, filter::LevelFilter, fmt, prelude::*};
use noodles_bam as bam;
use noodles_bgzf as bgzf;
mod alignment_parser;
mod bootstrap;
mod bulk;
mod em;
mod prog_opts;
mod single_cell;
mod util;
use crate::prog_opts::{Args, FilterGroup};
use crate::util::aligner::{get_aligner_from_args, get_aligner_from_fastas};
use crate::util::digest_utils;
use crate::util::normalize_probability::normalize_read_probs;
use crate::util::oarfish_types::{AlignmentFilters, TranscriptInfo};
use crate::util::projection;
use crate::util::{
binomial_probability::binomial_continuous_prob, kde_utils, logistic_probability::logistic_prob,
};
fn get_txp_info_from_header(
header: &Header,
args: &Args,
) -> anyhow::Result<(Vec<TranscriptInfo>, Vec<String>)> {
let num_ref_seqs = header.reference_sequences().len();
let mut txps: Vec<TranscriptInfo> = Vec::with_capacity(num_ref_seqs);
let mut txps_name: Vec<String> = Vec::with_capacity(num_ref_seqs);
if args.model_coverage {
for (rseq, rmap) in header.reference_sequences().iter() {
txps.push(TranscriptInfo::with_len_and_bin_width(
rmap.length(),
args.bin_width,
));
txps_name.push(rseq.to_string());
}
} else {
for (rseq, rmap) in header.reference_sequences().iter() {
txps.push(TranscriptInfo::with_len(rmap.length()));
txps_name.push(rseq.to_string());
}
}
info!(
"parsed reference information for {} transcripts.",
txps.len().to_formatted_string(&Locale::en)
);
Ok((txps, txps_name))
}
fn get_filter_opts(args: &Args) -> anyhow::Result<AlignmentFilters> {
if args.score_prob_denom.is_some()
&& (args.genome.is_some() || args.genome_alignments.is_some())
{
anyhow::bail!(
"--score-prob-denom does not apply in genome (projection) mode: a read's \
projected alignments are weighted by bramble's exonic-coverage similarity \
(see --projected-prob-source), not the score→probability conversion that \
--score-prob-denom controls. Please omit --score-prob-denom in genome mode."
);
}
match args.filter_group {
Some(FilterGroup::NoFilters) => {
info!("disabling alignment filters.");
let fpc = args
.five_prime_clip
.provided_or_u32("overriding 5' clip with user-provided value", u32::MAX);
let tpc = args
.three_prime_clip
.provided_or_i64("overriding 3' clip with user-provided value", i64::MAX);
let st = args
.score_threshold
.provided_or_f32("overriding score threshold with user-provided value", 0_f32);
let maf = args.min_aligned_fraction.provided_or_f32(
"overriding min aligned fraction with user-provided value",
0_f32,
);
let mal = args.min_aligned_len.provided_or_u32(
"overriding min aligned length with user-provided value",
1_u32,
);
Ok(AlignmentFilters::builder()
.five_prime_clip(fpc)
.three_prime_clip(tpc)
.score_threshold(st)
.min_aligned_fraction(maf)
.min_aligned_len(mal)
.which_strand(args.strand_filter)
.model_coverage(args.model_coverage)
.logistic_growth_rate(args.growth_rate)
.write_assignment_probs(args.write_assignment_probs.is_some())
.write_assignment_probs_type(args.write_assignment_probs.clone())
.score_prob_denom(args.score_prob_denom.unwrap_or(5.0))
.build())
}
Some(FilterGroup::NanocountFilters) => {
info!("setting filters to nanocount defaults.");
let fpc = args
.five_prime_clip
.provided_or_u32("overriding 5' clip with user-provided value", u32::MAX);
let tpc = args
.three_prime_clip
.provided_or_i64("overriding 3' clip with user-provided value", 50_i64);
let st = args.score_threshold.provided_or_f32(
"overriding score threshold with user-provided value",
0.95_f32,
);
let maf = args.min_aligned_fraction.provided_or_f32(
"overriding min aligned fraction with user-provided value",
0.5_f32,
);
let mal = args.min_aligned_len.provided_or_u32(
"overriding min aligned length with user-provided value",
50_u32,
);
Ok(AlignmentFilters::builder()
.five_prime_clip(fpc)
.three_prime_clip(tpc)
.score_threshold(st)
.min_aligned_fraction(maf)
.min_aligned_len(mal)
.which_strand(bio_types::strand::Strand::Forward)
.model_coverage(args.model_coverage)
.logistic_growth_rate(args.growth_rate)
.write_assignment_probs(args.write_assignment_probs.is_some())
.write_assignment_probs_type(args.write_assignment_probs.clone())
.score_prob_denom(args.score_prob_denom.unwrap_or(5.0))
.build())
}
None => {
info!("setting user-provided filter parameters.");
Ok(AlignmentFilters::builder()
.five_prime_clip(args.five_prime_clip.try_as_u32()?)
.three_prime_clip(args.three_prime_clip.try_as_i64()?)
.score_threshold(args.score_threshold.try_as_f32()?)
.min_aligned_fraction(args.min_aligned_fraction.try_as_f32()?)
.min_aligned_len(args.min_aligned_len.try_as_u32()?)
.which_strand(args.strand_filter)
.model_coverage(args.model_coverage)
.logistic_growth_rate(args.growth_rate)
.write_assignment_probs(args.write_assignment_probs.is_some())
.write_assignment_probs_type(args.write_assignment_probs.clone())
.score_prob_denom(args.score_prob_denom.unwrap_or(5.0))
.build())
}
}
}
fn run_genome_alignments(args: &Args, filter_opts: AlignmentFilters) -> anyhow::Result<()> {
let bam_path = args
.genome_alignments
.clone()
.expect("genome_alignments present");
let annotation = args
.annotation
.clone()
.expect("--annotation is required with --genome-alignments");
info!("oarfish is operating in genome-alignment (projection) mode");
let afile = File::open(&bam_path)?;
let decomp_threads = 1.max(args.threads.saturating_sub(1));
let worker_count = NonZeroUsize::new(decomp_threads).expect("decompression threads >= 1");
let decoder = bgzf::io::MultithreadedReader::with_worker_count(worker_count, afile);
let mut reader = bam::io::Reader::from(decoder);
let genome_header = alignment_parser::read_and_verify_genome_header(&mut reader, &bam_path)?;
let refnames: Vec<String> = genome_header
.reference_sequences()
.iter()
.map(|(rseq, _)| rseq.to_string())
.collect();
let rescue_db = projection::load_rescue_fasta(args)?;
let use_fasta = rescue_db.is_some();
let g2t = projection::load_g2t(&annotation, &refnames, rescue_db)?;
let (txp_header, mut txps, txps_name) =
projection::build_transcriptome_header_and_info(&g2t, args)?;
let proj_config = projection::projection_config(args, use_fasta);
let seqcol_digest = digest_utils::digest_from_header(&txp_header)?;
let digest: NamedDigestVec = vec![("transcriptome_digest".to_string(), seqcol_digest)].into();
bulk::quantify_genome_alignments_from_bam(
&genome_header,
&txp_header,
&g2t,
&proj_config,
filter_opts,
&mut reader,
&mut txps,
&txps_name,
args,
digest,
)
}
fn run_genome_reads(args: &mut Args, filter_opts: AlignmentFilters) -> anyhow::Result<()> {
let annotation = args
.annotation
.clone()
.expect("--annotation is required with --genome");
let read_paths = args
.reads
.clone()
.expect("--reads is required in genome read mode");
info!("loading annotation from {}", annotation.display());
let transcripts = bramble_rs::annotation::load_transcripts(&annotation)?;
info!("loaded {} transcripts from annotation", transcripts.len());
let junc_bed: Option<std::path::PathBuf> = if let Some(b) = args.junctions.clone() {
Some(b)
} else if !args.ignore_annotation_junctions {
let mut bed = args.output.clone().expect("output prefix required");
let fname = format!(
"{}.annot_junctions.bed",
bed.file_name().map(|f| f.to_string_lossy().into_owned()).unwrap_or_default()
);
bed.set_file_name(fname);
let n = projection::write_annotation_junction_bed(&transcripts, &bed)?;
info!(
"derived {} spliced transcript models from the annotation for the splice-junction BED",
n
);
Some(bed)
} else {
info!("not using annotated splice junctions (--ignore-annotation-junctions)");
None
};
let (aligner, refnames) =
crate::util::aligner::get_genome_aligner_from_args(args, junc_bed.as_deref())?;
let rescue_db = match projection::load_rescue_fasta(args)? {
Some(db) => Some(db),
None if !args.no_rescue => crate::util::aligner::rescue_db_from_genome_index(&aligner),
None => None,
};
let use_fasta = rescue_db.is_some();
let g2t = projection::build_g2t_from_transcripts(&transcripts, &refnames, rescue_db)?;
let (txp_header, mut txps, txps_name) =
projection::build_transcriptome_header_and_info(&g2t, args)?;
let proj_config = projection::projection_config(args, use_fasta);
let seqcol_digest = digest_utils::digest_from_header(&txp_header)?;
let digest: NamedDigestVec = vec![("transcriptome_digest".to_string(), seqcol_digest)].into();
bulk::quantify_genome_raw_reads(
&txp_header,
aligner,
&g2t,
&proj_config,
filter_opts,
&read_paths,
&mut txps,
&txps_name,
args,
digest,
)
}
fn main() -> anyhow::Result<()> {
let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
let (filtered_layer, reload_handle) = tracing_subscriber::reload::Layer::new(env_filter);
tracing_subscriber::registry()
.with(fmt::layer().with_writer(io::stderr))
.with(filtered_layer)
.init();
let mut args = Args::parse();
if args.quiet {
reload_handle.modify(|filter| *filter = EnvFilter::new("WARN"))?;
}
if args.verbose {
reload_handle.modify(|filter| *filter = EnvFilter::new("TRACE"))?;
}
if let Some(mb) = args.dp_cache_cap_mb {
rammap::set_dp_cache_cap_mb(mb);
}
if args.only_index {
let (header, _reader, _aligner, digest) = get_aligner_from_fastas(&mut args)?;
info!(
"indexing completed; index over {} references written to {}",
header.reference_sequences().len(),
&args.index_out.expect("nonempty").display()
);
info!(
"reference digest = {}",
serde_json::to_string_pretty(&digest.to_json())?
);
return Ok(());
}
let filter_opts = get_filter_opts(&args)?;
if args.genome_alignments.is_some() {
run_genome_alignments(&args, filter_opts)?;
info!("oarfish completed successfully.");
return Ok(());
} else if args.genome.is_some() {
run_genome_reads(&mut args, filter_opts)?;
info!("oarfish completed successfully.");
return Ok(());
}
let (header, reader, aligner, digest) = if args.alignments.is_none() {
get_aligner_from_args(&mut args)?
} else {
let alignments = args.alignments.clone().unwrap();
let afile = File::open(&alignments)?;
let decomp_threads = if args.single_cell {
match args.threads {
1..=6 => 1,
7 | 8 => 2,
_ => 3,
}
} else {
1.max(args.threads.saturating_sub(1))
};
let worker_count = NonZeroUsize::new(decomp_threads).expect("decompression threads >= 1");
if args.single_cell {
args.threads = 1.max(args.threads.saturating_sub(decomp_threads));
}
let decoder = bgzf::io::MultithreadedReader::with_worker_count(worker_count, afile);
let mut reader = bam::io::Reader::from(decoder);
let header = alignment_parser::read_and_verify_header(&mut reader, &alignments)?;
let seqcol_digest = digest_utils::digest_from_header(&header)?;
(
header,
Some(reader),
None,
vec![("bam_digest".to_string(), seqcol_digest)].into(),
)
};
let (mut txps, txps_name) = get_txp_info_from_header(&header, &args)?;
if args.single_cell {
reload_handle.modify(|filter| {
*filter = if args.quiet {
EnvFilter::new("WARN")
.add_directive("oarfish=warn".parse().unwrap())
.add_directive("oarfish::single_cell=warn".parse().unwrap())
} else if args.verbose {
EnvFilter::new("TRACE")
.add_directive("oarfish=info".parse().unwrap())
.add_directive("oarfish::single_cell=trace".parse().unwrap())
} else {
EnvFilter::new("INFO")
.add_directive("oarfish=warn".parse().unwrap())
.add_directive("oarfish::single_cell=info".parse().unwrap())
}
})?;
single_cell::quantify_single_cell_from_collated_bam(
&header,
&filter_opts,
&mut reader.unwrap(),
&mut txps,
&args,
digest,
)?;
} else if args.alignments.is_some() {
bulk::quantify_bulk_alignments_from_bam(
&header,
filter_opts,
&mut reader.unwrap(),
&mut txps,
&txps_name,
&args,
digest,
)?;
} else {
bulk::quantify_bulk_alignments_raw_reads(
&header,
aligner.expect("need valid alinger to align reads"),
filter_opts,
&args.reads.clone().expect("expected read file(s)"),
&mut txps,
&txps_name,
&args,
digest,
)?;
}
info!("oarfish completed successfully.");
Ok(())
}