use crate::NamedDigestVec;
use crate::alignment_parser;
use crate::em;
use crate::kde_utils;
use crate::prog_opts::Args;
use crate::util::constants::EMPTY_READ_NAME;
use crate::util::oarfish_types::AlnInfo;
use crate::util::oarfish_types::DiscardTable;
use crate::util::oarfish_types::{
AlignmentFilters, EMInfo, InMemoryAlignmentStore, InputSourceType, ReadChunkWithNames,
ReadSource, TranscriptInfo,
};
use crate::util::read_function::read_short_quant_vec;
use crate::util::write_function::{write_infrep_file, write_out_prob, write_output};
use crate::util::projection::{mapping_to_genomic_alignment, projected_to_records};
use bramble_rs::ProjectionConfig;
use bramble_rs::ProjectionContext;
use bramble_rs::g2t::G2TTree;
use bramble_rs::project_group_with;
use crate::{logistic_prob, normalize_read_probs};
use arrow2::{array::Float64Array, chunk::Chunk, datatypes::Field};
use crossbeam::channel::Receiver;
use crossbeam::channel::Sender;
use crossbeam::channel::bounded;
use needletail::parse_fastx_file;
use noodles_bam as bam;
use num_format::{Locale, ToFormattedString};
use serde_json::json;
use std::io::BufRead;
use swapvec::{SwapVec, SwapVecConfig};
use tracing::{info, warn};
fn get_json_info(
args: &Args,
emi: &EMInfo,
seqcol_digest: &NamedDigestVec,
aln_time: std::time::Duration,
) -> serde_json::Value {
let prob = if args.model_coverage {
"logistic_coverage"
} else {
"no_coverage"
};
let source = if args.alignments.is_some() {
"from_bam"
} else {
"from_raw_reads"
};
json!({
"prob_model" : prob,
"alignment_source" : source,
"alignment_time" : {
"comment" : "Time to parse (in alignment mode) or generate (in raw read mode) alignments, as well as apply filters, and compute conditional probabilities.",
"human_time" : humantime::format_duration(aln_time).to_string(),
"seconds" : aln_time.as_secs_f64()
},
"bin_width" : args.bin_width,
"filter_options" : &emi.eq_map.filter_opts,
"discard_table" : &emi.eq_map.discard_table,
"alignments": &args.alignments,
"output": args.output.as_ref().expect("present"),
"verbose": &args.verbose,
"single_cell": &args.single_cell,
"quiet": &args.quiet,
"em_max_iter": &args.max_em_iter,
"em_convergence_thresh": &args.convergence_thresh,
"threads": &args.threads,
"filter_group": &args.filter_group,
"write_assignment_probs": &emi.eq_map.filter_opts.write_assignment_probs_type,
"short_quant": &args.short_quant,
"num_bootstraps": &args.num_bootstraps,
"digest": seqcol_digest.to_json()
})
}
#[allow(clippy::too_many_arguments)]
fn perform_inference_and_write_output(
header: &noodles_sam::header::Header,
store: &mut InMemoryAlignmentStore,
name_vec: Option<SwapVec<String>>,
txps: &mut [TranscriptInfo],
txps_name: &[String],
seqcol_digest: NamedDigestVec,
aln_time: std::time::Duration,
args: &Args,
) -> anyhow::Result<()> {
info!("\ndiscard_table: \n{}\n", store.discard_table.to_table());
let kde_opt: Option<kders::kde::KDEModel> = if args.use_kde {
Some(kde_utils::get_kde_model(txps, store)?)
} else {
None
};
if store.filter_opts.model_coverage {
logistic_prob(txps, args.growth_rate, &args.bin_width, args.threads);
normalize_read_probs(store, txps, &args.bin_width);
}
info!(
"Total number of alignment records : {}",
store.total_len().to_formatted_string(&Locale::en)
);
info!(
"number of aligned reads : {}",
store.num_aligned_reads().to_formatted_string(&Locale::en)
);
info!(
"number of unique alignments : {}",
store.unique_alignments().to_formatted_string(&Locale::en)
);
let init_abundances = args.short_quant.as_ref().map(|sr_path| {
read_short_quant_vec(sr_path, txps_name).unwrap_or_else(|e| panic!("{}", e))
});
let emi = EMInfo {
eq_map: store,
txp_info: txps,
max_iter: args.max_em_iter,
convergence_thresh: args.convergence_thresh,
init_abundances,
kde_model: kde_opt,
};
if args.use_kde {
}
let counts = if args.threads > 4 {
em::em_par(&emi, args.threads)
} else {
em::em(&emi, args.threads)
};
let aux_txp_counts = crate::util::aux_counts::get_aux_counts(store, txps)?;
let json_info = get_json_info(args, &emi, &seqcol_digest, aln_time);
write_output(
args.output.as_ref().expect("present"),
json_info,
header,
&counts,
&aux_txp_counts,
)?;
if args.num_bootstraps > 0 {
let breps = em::bootstrap(&emi, args.num_bootstraps, args.threads);
let mut new_arrays = vec![];
let mut bs_fields = vec![];
for (i, b) in breps.into_iter().enumerate() {
let bs_array = Float64Array::from_vec(b);
bs_fields.push(Field::new(
format!("bootstrap.{}", i),
bs_array.data_type().clone(),
false,
));
new_arrays.push(bs_array.boxed());
}
let chunk = Chunk::new(new_arrays);
write_infrep_file(args.output.as_ref().expect("present"), bs_fields, chunk)?;
}
if args.write_assignment_probs.is_some() {
let name_vec = name_vec
.expect("cannot write assignment probabilities without valid vector of read names");
write_out_prob(
args.output.as_ref().expect("present"),
&emi,
&counts,
name_vec,
txps_name,
args.display_thresh,
)?;
}
Ok(())
}
pub fn quantify_bulk_alignments_from_bam<R: BufRead>(
header: &noodles_sam::Header,
filter_opts: AlignmentFilters,
reader: &mut bam::io::Reader<R>,
txps: &mut [TranscriptInfo],
txps_name: &[String],
args: &Args,
seqcol_digest: NamedDigestVec,
) -> anyhow::Result<()> {
let mut name_vec = if filter_opts.write_assignment_probs {
Some(SwapVec::<String>::with_config(SwapVecConfig {
swap_after: Default::default(),
batch_size: Default::default(),
compression: Some(swapvec::Compression::Lz4),
}))
} else {
None
};
let mut store = InMemoryAlignmentStore::new(filter_opts, header);
let read_aln_start = std::time::SystemTime::now();
alignment_parser::parse_alignments(
&mut store,
&mut name_vec,
header,
reader,
txps,
args.sort_check_num,
args.quiet,
)?;
let read_aln_time = read_aln_start.elapsed()?;
info!(
"Parsing of alignments from input took: {}",
humantime::format_duration(read_aln_time).to_string()
);
perform_inference_and_write_output(
header,
&mut store,
name_vec,
txps,
txps_name,
seqcol_digest,
read_aln_time,
args,
)
}
#[allow(clippy::too_many_arguments)]
pub fn quantify_genome_alignments_from_bam<R: BufRead>(
genome_header: &noodles_sam::Header,
txp_header: &noodles_sam::Header,
g2t: &G2TTree,
proj_config: &ProjectionConfig,
filter_opts: AlignmentFilters,
reader: &mut bam::io::Reader<R>,
txps: &mut [TranscriptInfo],
txps_name: &[String],
args: &Args,
seqcol_digest: NamedDigestVec,
) -> anyhow::Result<()> {
let mut name_vec = if filter_opts.write_assignment_probs {
Some(SwapVec::<String>::with_config(SwapVecConfig {
swap_after: Default::default(),
batch_size: Default::default(),
compression: Some(swapvec::Compression::Lz4),
}))
} else {
None
};
let mut store = InMemoryAlignmentStore::new(filter_opts, txp_header);
let read_aln_start = std::time::SystemTime::now();
alignment_parser::parse_genome_alignments(
&mut store,
&mut name_vec,
genome_header,
g2t,
proj_config,
args.projected_prob_beta,
args.projected_prob_source,
reader,
txps,
args.sort_check_num,
args.quiet,
)?;
let read_aln_time = read_aln_start.elapsed()?;
info!(
"Parsing and projection of genome alignments took: {}",
humantime::format_duration(read_aln_time).to_string()
);
perform_inference_and_write_output(
txp_header,
&mut store,
name_vec,
txps,
txps_name,
seqcol_digest,
read_aln_time,
args,
)
}
#[allow(clippy::too_many_arguments)]
#[allow(unused_mut)]
pub fn quantify_genome_raw_reads(
txp_header: &noodles_sam::Header,
mut aligner: crate::util::mapper::Mapper,
g2t: &G2TTree,
proj_config: &ProjectionConfig,
filter_opts: AlignmentFilters,
read_paths: &[std::path::PathBuf],
txps: &mut [TranscriptInfo],
txps_name: &[String],
args: &Args,
seqcol_digest: NamedDigestVec,
) -> anyhow::Result<()> {
let mut txp_info_view: Vec<TranscriptInfo> = Vec::with_capacity(txps.len());
for ti in txps.iter() {
txp_info_view.push(ti.clone());
}
let map_threads = args.threads.saturating_sub(2).max(1);
let beta = args.projected_prob_beta;
let use_fasta = proj_config.use_fasta;
let prob_source = args.projected_prob_source;
type ReadGroup = ReadChunkWithNames;
type AlignmentGroupInfo = (Vec<AlnInfo>, Vec<f32>, Vec<usize>, Option<Vec<String>>);
let (read_sender, read_receiver): (Sender<ReadGroup>, Receiver<ReadGroup>) =
bounded(args.threads * 10);
const READ_CHUNK_SIZE: usize = 200;
let mut rpaths = vec![];
read_paths.clone_into(&mut rpaths);
let producer = std::thread::spawn(move || {
let mut ctr = 0_usize;
let mut chunk_size = 0_usize;
let mut read_chunk = ReadChunkWithNames::new();
let mark_chunk = |chunk_size: &mut usize,
ctr: &mut usize,
read_chunk: &mut ReadGroup,
read_sender: &Sender<ReadGroup>| {
*chunk_size += 1;
*ctr += 1;
if *chunk_size >= READ_CHUNK_SIZE {
read_sender
.send(read_chunk.clone())
.expect("Error sending sequence");
read_chunk.clear();
*chunk_size = 0;
}
};
for read_path in rpaths {
match get_source_type(&read_path) {
InputSourceType::Ubam => {
let mut reader = std::fs::File::open(read_path)
.map(bam::io::Reader::new)
.expect("could not create BAM reader");
let header = reader.read_header().expect("could not read BAM header");
for result in reader.record_bufs(&header) {
let record = result.expect("Error reading ubam record");
record.add_to_read_group(&mut read_chunk);
mark_chunk(&mut chunk_size, &mut ctr, &mut read_chunk, &read_sender);
}
}
s @ (InputSourceType::Fastx | InputSourceType::Unknown) => {
if matches!(s, InputSourceType::Unknown) {
warn!(
"could not determine input file type for {} from suffix; assuming (possibly gzipped) fastx",
read_path.display()
);
}
let mut reader =
parse_fastx_file(read_path).expect("valid path/file to read sequences");
while let Some(result) = reader.next() {
let record = result.expect("Error reading record");
record.add_to_read_group(&mut read_chunk);
mark_chunk(&mut chunk_size, &mut ctr, &mut read_chunk, &read_sender);
}
}
}
}
if chunk_size > 0 {
read_sender
.send(read_chunk)
.expect("Error sending sequence");
}
ctr
});
let (mut store, name_vec, aln_time) = std::thread::scope(
|s| -> anyhow::Result<(
InMemoryAlignmentStore,
Option<SwapVec<String>>,
std::time::Duration,
)> {
const ALN_GROUP_CHUNK_LIMIT: usize = 100;
let (aln_group_sender, aln_group_receiver): (
Sender<AlignmentGroupInfo>,
Receiver<AlignmentGroupInfo>,
) = bounded(args.threads * 100);
let write_assignment_probs: bool = args.write_assignment_probs.is_some();
let n_genome_mapped = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let n_projected = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let consumers: Vec<_> = (0..map_threads)
.map(|_| {
let receiver = read_receiver.clone();
let filter = filter_opts.clone();
let loc_aligner = aligner.clone();
let my_txp_info_view = &txp_info_view;
let aln_group_sender = aln_group_sender.clone();
let n_genome_mapped = std::sync::Arc::clone(&n_genome_mapped);
let n_projected = std::sync::Arc::clone(&n_projected);
s.spawn(move || {
let mut discard_table = DiscardTable::new();
let mut chunk_size = 0_usize;
let mut aln_group_alns: Vec<AlnInfo> = Vec::new();
let mut aln_group_probs: Vec<f32> = Vec::new();
let mut aln_group_boundaries: Vec<usize> = Vec::new();
let mut aln_group_read_names = write_assignment_probs.then(Vec::new);
aln_group_boundaries.push(0);
let mut pctx = ProjectionContext::new();
let mut galns: Vec<bramble_rs::GenomicAlignment> = Vec::new();
let mut src_scores: Vec<i32> = Vec::new();
for read_chunk in receiver {
for (name, seq) in read_chunk.iter() {
let map_res_opt =
crate::util::mapper::map_read(&loc_aligner, name, seq);
let Ok(mappings) = map_res_opt else {
warn!("Error encountered mapping read: {}", map_res_opt.unwrap_err());
continue;
};
let query_name = String::from_utf8_lossy(name).into_owned();
galns.clear();
src_scores.clear();
for m in mappings.iter() {
if let Some(ga) =
mapping_to_genomic_alignment(m, &query_name, seq.len())
{
galns.push(ga);
src_scores.push(crate::util::mapper::alignment_score(m));
}
}
if galns.is_empty() {
continue;
}
n_genome_mapped
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if use_fasta {
galns[0].sequence = Some(if galns[0].is_reverse {
crate::util::projection::revcomp(seq)
} else {
seq.to_vec()
});
}
let projected =
project_group_with(&galns, g2t, proj_config, &mut pctx);
if projected.is_empty() {
continue;
}
n_projected.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let recs = projected_to_records(&projected, &src_scores);
let (ag, aprobs) = filter.filter_projected(
&mut discard_table,
my_txp_info_view,
&recs,
seq.len(),
beta,
prob_source,
);
if !ag.is_empty() {
aln_group_alns.extend_from_slice(&ag);
aln_group_probs.extend_from_slice(&aprobs);
aln_group_boundaries.push(aln_group_alns.len());
if let Some(ref mut names_vec) = aln_group_read_names {
names_vec.push(query_name);
}
chunk_size += 1;
}
if chunk_size >= ALN_GROUP_CHUNK_LIMIT {
aln_group_sender
.send((
aln_group_alns.clone(),
aln_group_probs.clone(),
aln_group_boundaries.clone(),
aln_group_read_names,
))
.expect("Error sending alignment group");
aln_group_alns.clear();
aln_group_probs.clear();
aln_group_boundaries.clear();
aln_group_boundaries.push(0);
aln_group_read_names = write_assignment_probs.then(Vec::new);
chunk_size = 0;
}
}
}
if chunk_size > 0 {
aln_group_sender
.send((
aln_group_alns,
aln_group_probs,
aln_group_boundaries,
aln_group_read_names,
))
.expect("Error sending alignment group");
}
discard_table
})
})
.collect();
#[allow(clippy::useless_asref)]
let txps_mut = txps.as_mut();
let filter_opts_store = filter_opts.clone();
let aln_group_consumer = s.spawn(move || {
let mut name_vec = if filter_opts_store.write_assignment_probs {
Some(SwapVec::<String>::with_config(SwapVecConfig {
swap_after: Default::default(),
batch_size: Default::default(),
compression: Some(swapvec::Compression::Lz4),
}))
} else {
None
};
let mut store = InMemoryAlignmentStore::new(filter_opts_store, txp_header);
let pb = if args.quiet {
indicatif::ProgressBar::hidden()
} else {
indicatif::ProgressBar::new_spinner().with_message("Number of reads mapped")
};
pb.set_style(
indicatif::ProgressStyle::with_template(
"[{elapsed_precise}] {spinner:4.green/blue} {msg} {human_pos:>12}",
)
.unwrap()
.tick_chars("⠁⠁⠉⠙⠚⠒⠂⠂⠒⠲⠴⠤⠄⠄⠤⠠⠠⠤⠦⠖⠒⠐⠐⠒⠓⠋⠉⠈⠈"),
);
pb.set_draw_target(indicatif::ProgressDrawTarget::stderr_with_hz(4));
for (ags, aprobs, aln_boundaries, read_names) in aln_group_receiver {
let mut reversed_read_names = if let Some(mut names_vec) = read_names {
names_vec.reverse();
Some(names_vec)
} else {
None
};
for window in aln_boundaries.windows(2) {
pb.inc(1);
let ag = &ags[window[0]..window[1]];
let as_probs = &aprobs[window[0]..window[1]];
let read_name_opt = if let Some(ref mut names_vec) = reversed_read_names {
names_vec.pop()
} else {
None
};
if store.add_filtered_group(ag, as_probs, txps_mut) {
if let Some(ref mut nvec) = name_vec {
let read_name =
read_name_opt.unwrap_or(EMPTY_READ_NAME.to_string());
nvec.push(read_name)
.expect("cannot push name to read name vector");
}
if ag.len() == 1 {
store.inc_unique_alignments();
}
}
}
}
pb.finish_with_message("Finished aligning and projecting reads.");
(store, name_vec)
});
let aln_start = std::time::SystemTime::now();
let total_reads = producer.join().expect("Producer thread panicked");
let mut discard_tables: Vec<DiscardTable> = Vec::with_capacity(map_threads);
for consumer in consumers {
discard_tables.push(consumer.join().expect("Consumer thread panicked"));
}
drop(aln_group_sender);
let (mut store, name_vec) = aln_group_consumer
.join()
.expect("Alignment group consumer panicked");
info!(
"Parsed {} total reads",
total_reads.to_formatted_string(&Locale::en)
);
let n_gm = n_genome_mapped.load(std::sync::atomic::Ordering::Relaxed);
let n_pr = n_projected.load(std::sync::atomic::Ordering::Relaxed);
info!(
"reads aligned to genome: {}; of those, projected to >=1 transcript: {} ({:.2}%); projection-dropped: {}",
n_gm.to_formatted_string(&Locale::en),
n_pr.to_formatted_string(&Locale::en),
if n_gm > 0 { 100.0 * (n_pr as f64) / (n_gm as f64) } else { 0.0 },
(n_gm - n_pr).to_formatted_string(&Locale::en),
);
let aln_time = aln_start.elapsed()?;
info!(
"Spliced genome alignment + projection of raw reads took: {}",
humantime::format_duration(aln_time).to_string()
);
for dt in &discard_tables {
store.aggregate_discard_table(dt);
}
Ok((store, name_vec, aln_time))
},
)?;
perform_inference_and_write_output(
txp_header,
&mut store,
name_vec,
txps,
txps_name,
seqcol_digest,
aln_time,
args,
)
}
fn get_source_type(pb: &std::path::Path) -> InputSourceType {
let faq_endings = vec![
".fasta",
".fastq",
".FASTA",
".FASTQ",
".fa",
".fq",
".FA",
".FQ",
".fasta.gz",
".fastq.gz",
".FASTA.GZ",
".FASTQ.GZ",
".fa.gz",
".fq.gz",
".FA.GZ",
".FQ.GZ",
];
let ubam_endings = vec![".bam", ".BAM", ".ubam", ".UBAM"];
if let Some(ps) = pb.to_str() {
for fe in faq_endings {
if ps.ends_with(fe) {
return InputSourceType::Fastx;
}
}
for be in ubam_endings {
if ps.ends_with(be) {
return InputSourceType::Ubam;
}
}
}
InputSourceType::Unknown
}
#[allow(clippy::too_many_arguments)]
#[allow(unused_mut)]
pub fn quantify_bulk_alignments_raw_reads(
header: &noodles_sam::Header,
mut aligner: crate::util::mapper::Mapper,
filter_opts: AlignmentFilters,
read_paths: &[std::path::PathBuf],
txps: &mut [TranscriptInfo],
txps_name: &[String],
args: &Args,
seqcol_digest: NamedDigestVec,
) -> anyhow::Result<()> {
let mut txp_info_view: Vec<TranscriptInfo> = Vec::with_capacity(txps.len());
for ti in txps.iter() {
txp_info_view.push(ti.clone());
}
let map_threads = args.threads.saturating_sub(2).max(1);
type ReadGroup = ReadChunkWithNames;
type AlignmentGroupInfo = (Vec<AlnInfo>, Vec<f32>, Vec<usize>, Option<Vec<String>>);
let (read_sender, read_receiver): (Sender<ReadGroup>, Receiver<ReadGroup>) =
bounded(args.threads * 10);
const READ_CHUNK_SIZE: usize = 200;
let mut rpaths = vec![];
read_paths.clone_into(&mut rpaths);
let producer = std::thread::spawn(move || {
let mut ctr = 0_usize;
let mut chunk_size = 0_usize;
let mut read_chunk = ReadChunkWithNames::new();
let mark_chunk = |chunk_size: &mut usize,
ctr: &mut usize,
read_chunk: &mut ReadGroup,
read_sender: &Sender<ReadGroup>| {
*chunk_size += 1;
*ctr += 1;
if *chunk_size >= READ_CHUNK_SIZE {
read_sender
.send(read_chunk.clone())
.expect("Error sending sequence");
read_chunk.clear();
*chunk_size = 0;
}
};
for read_path in rpaths {
match get_source_type(&read_path) {
InputSourceType::Ubam => {
let mut reader = std::fs::File::open(read_path)
.map(bam::io::Reader::new)
.expect("could not create BAM reader");
let header = reader.read_header().expect("could not read BAM header");
for result in reader.record_bufs(&header) {
let record = result.expect("Error reading ubam record");
record.add_to_read_group(&mut read_chunk);
mark_chunk(&mut chunk_size, &mut ctr, &mut read_chunk, &read_sender);
}
}
s @ (InputSourceType::Fastx | InputSourceType::Unknown) => {
if matches!(s, InputSourceType::Unknown) {
warn!(
"could not determine input file type for {} from suffix; assuming (possibly gzipped) fastx",
read_path.display()
);
}
let mut reader =
parse_fastx_file(read_path).expect("valid path/file to read sequences");
while let Some(result) = reader.next() {
let record = result.expect("Error reading record");
record.add_to_read_group(&mut read_chunk);
mark_chunk(&mut chunk_size, &mut ctr, &mut read_chunk, &read_sender);
}
}
}
}
if chunk_size > 0 {
read_sender
.send(read_chunk)
.expect("Error sending sequence");
}
ctr
});
let (mut store, name_vec, aln_time) = std::thread::scope(
|s| -> anyhow::Result<(
InMemoryAlignmentStore,
Option<SwapVec<String>>,
std::time::Duration,
)> {
const ALN_GROUP_CHUNK_LIMIT: usize = 100;
let (aln_group_sender, aln_group_receiver): (
Sender<AlignmentGroupInfo>,
Receiver<AlignmentGroupInfo>,
) = bounded(args.threads * 100);
let write_assignment_probs: bool = args.write_assignment_probs.is_some();
let consumers: Vec<_> = (0..map_threads)
.map(|_| {
let receiver = read_receiver.clone();
let mut filter = filter_opts.clone();
let loc_aligner = aligner.clone();
let my_txp_info_view = &txp_info_view;
let aln_group_sender = aln_group_sender.clone();
s.spawn(move || {
let mut discard_table = DiscardTable::new();
let mut chunk_size = 0_usize;
let mut aln_group_alns: Vec<AlnInfo> = Vec::new();
let mut aln_group_probs: Vec<f32> = Vec::new();
let mut aln_group_boundaries: Vec<usize> = Vec::new();
let mut aln_group_read_names = write_assignment_probs.then(Vec::new);
aln_group_boundaries.push(0);
for read_chunk in receiver {
for (name, seq) in read_chunk.iter() {
let map_res_opt =
crate::util::mapper::map_read(&loc_aligner, name, seq);
if let Ok(mut mappings) = map_res_opt {
let (ag, aprobs) = filter.filter(
&mut discard_table,
header,
my_txp_info_view,
&mut mappings,
);
if !ag.is_empty() {
aln_group_alns.extend_from_slice(&ag);
aln_group_probs.extend_from_slice(&aprobs);
aln_group_boundaries.push(aln_group_alns.len());
if let Some(ref mut names_vec) = aln_group_read_names {
let name_str =
String::from_utf8_lossy(name).into_owned();
names_vec.push(name_str);
}
chunk_size += 1;
}
if chunk_size >= ALN_GROUP_CHUNK_LIMIT {
aln_group_sender
.send((
aln_group_alns.clone(),
aln_group_probs.clone(),
aln_group_boundaries.clone(),
aln_group_read_names,
))
.expect("Error sending alignment group");
aln_group_alns.clear();
aln_group_probs.clear();
aln_group_boundaries.clear();
aln_group_boundaries.push(0);
aln_group_read_names =
write_assignment_probs.then(Vec::new);
chunk_size = 0;
}
} else {
warn!(
"Error encountered mappread_ing read : {}",
map_res_opt.unwrap_err()
);
}
}
}
if chunk_size > 0 {
aln_group_sender
.send((
aln_group_alns,
aln_group_probs,
aln_group_boundaries,
aln_group_read_names,
))
.expect("Error sending alignment group");
}
discard_table
})
})
.collect();
#[allow(clippy::useless_asref)]
let txps_mut = txps.as_mut();
let filter_opts_store = filter_opts.clone();
let aln_group_consumer = s.spawn(move || {
let mut name_vec = if filter_opts_store.write_assignment_probs {
Some(SwapVec::<String>::with_config(SwapVecConfig {
swap_after: Default::default(),
batch_size: Default::default(),
compression: Some(swapvec::Compression::Lz4),
}))
} else {
None
};
let mut store = InMemoryAlignmentStore::new(filter_opts_store, header);
let pb = if args.quiet {
indicatif::ProgressBar::hidden()
} else {
indicatif::ProgressBar::new_spinner().with_message("Number of reads mapped")
};
pb.set_style(
indicatif::ProgressStyle::with_template(
"[{elapsed_precise}] {spinner:4.green/blue} {msg} {human_pos:>12}",
)
.unwrap()
.tick_chars("⠁⠁⠉⠙⠚⠒⠂⠂⠒⠲⠴⠤⠄⠄⠤⠠⠠⠤⠦⠖⠒⠐⠐⠒⠓⠋⠉⠈⠈"),
);
pb.set_draw_target(indicatif::ProgressDrawTarget::stderr_with_hz(4));
for (ags, aprobs, aln_boundaries, read_names) in aln_group_receiver {
let mut reversed_read_names = if let Some(mut names_vec) = read_names {
names_vec.reverse();
Some(names_vec)
} else {
None
};
for window in aln_boundaries.windows(2) {
pb.inc(1);
let group_start = window[0];
let group_end = window[1];
let ag = &ags[group_start..group_end];
let as_probs = &aprobs[group_start..group_end];
let read_name_opt = if let Some(ref mut names_vec) = reversed_read_names {
names_vec.pop()
} else {
None
};
if store.add_filtered_group(ag, as_probs, txps_mut) {
if let Some(ref mut nvec) = name_vec {
let read_name =
read_name_opt.unwrap_or(EMPTY_READ_NAME.to_string());
nvec.push(read_name)
.expect("cannot push name to read name vector");
}
if ag.len() == 1 {
store.inc_unique_alignments();
}
}
}
}
pb.finish_with_message("Finished aligning reads.");
(store, name_vec)
});
let aln_start = std::time::SystemTime::now();
let total_reads = producer.join().expect("Producer thread panicked");
let mut discard_tables: Vec<DiscardTable> = Vec::with_capacity(map_threads);
for consumer in consumers {
let dt = consumer.join().expect("Consumer thread panicked");
discard_tables.push(dt);
}
drop(aln_group_sender);
let (mut store, name_vec) = aln_group_consumer
.join()
.expect("Alignment group consumer panicked");
info!(
"Parsed {} total reads",
total_reads.to_formatted_string(&Locale::en)
);
let aln_time = aln_start.elapsed()?;
info!(
"Alignment of raw reads using rammap took: {}",
humantime::format_duration(aln_time).to_string()
);
for dt in &discard_tables {
store.aggregate_discard_table(dt);
}
Ok((store, name_vec, aln_time))
},
)?;
perform_inference_and_write_output(
header,
&mut store,
name_vec,
txps,
txps_name,
seqcol_digest,
aln_time,
args,
)
}