use std::io::Write;
mod builder;
use std::collections::{HashMap, HashSet};
use std::ffi::CString;
use std::fs::File;
use std::io::BufWriter;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU32;
use std::sync::{Arc, Mutex};
use crossbeam_channel as channel;
use log::{debug, info, trace, warn};
use needletail::parse_fastx_file;
use rayon::prelude::*;
pub use self::builder::Builder;
use crate::error::LrgeError;
use crate::estimate::per_read_estimate;
use crate::io::FastqRecordExt;
use crate::minimap2::{AlignerWrapper, Preset};
use crate::{io, unique_random_set, Estimate, Platform};
pub const DEFAULT_AVA_NUM_READS: usize = 25_000;
/// Genome size estimation strategy based on all-vs-all (AVA) overlapping of a
/// random subsample of reads (see `generate_estimates`).
pub struct AvaStrategy {
    // Path to the input reads file (FASTQ/FASTA; parsed via needletail).
    input: PathBuf,
    // Number of reads to subsample; clamped down to the input's read count
    // in `subsample_reads` if the file holds fewer reads.
    num_reads: usize,
    // Total bases written during subsampling; set by `subsample_reads`.
    num_bases: usize,
    // If true, overlaps flagged as internal matches are excluded from the
    // overlap counts (they are still written to the PAF file).
    remove_internal: bool,
    // Overhang-ratio threshold passed to `Mapping::is_internal` when
    // `remove_internal` is set.
    max_overhang_ratio: f32,
    // Directory where intermediate files ("reads.fa", "overlaps.paf") go.
    tmpdir: PathBuf,
    // Thread count for the rayon alignment pool (and the aligner itself).
    threads: usize,
    // Optional RNG seed for reproducible subsampling; None = nondeterministic.
    seed: Option<u64>,
    // Sequencing platform; selects the minimap2 AVA preset (ONT vs PacBio).
    platform: Platform,
}
impl AvaStrategy {
pub fn new<P: AsRef<Path>>(input: P) -> Self {
let builder = Builder::default();
builder.build(input)
}
pub fn num_reads(&self) -> usize {
self.num_reads
}
fn subsample_reads(&mut self) -> crate::Result<(PathBuf, usize)> {
debug!("Counting records in input file...");
let n_fq_reads = io::count_records(&self.input)?;
debug!("Found {} reads in input file", n_fq_reads);
if n_fq_reads > u32::MAX as usize {
let msg = format!(
"Number of reads in input file ({}) exceeds maximum allowed value ({})",
n_fq_reads,
u32::MAX
);
return Err(LrgeError::TooManyReadsError(msg));
}
if n_fq_reads < self.num_reads {
warn!(
"Number of reads in input file ({}) is less than the number requested ({})",
n_fq_reads, self.num_reads
);
self.num_reads = n_fq_reads;
}
let mut indices: HashSet<u32> =
unique_random_set(self.num_reads, n_fq_reads as u32, self.seed)
.iter()
.cloned()
.collect();
let out_file = self.tmpdir.join("reads.fa");
debug!("Writing subsampled reads to temporary files...");
let mut writer = File::create(&out_file).map(BufWriter::new)?;
let mut sum_len = 0;
let mut idx: u32 = 0;
io::iter_records(&self.input, |id, seq| {
if indices.remove(&idx) {
sum_len += seq.len();
writer.write_all(b">")?;
writer.write_all(id)?;
writer.write_all(b"\n")?;
writer.write_all(seq)?;
writer.write_all(b"\n")?;
}
idx += 1;
Ok(())
})?;
self.num_bases = sum_len;
debug!("Reads written to: {}", out_file.display());
debug!("Total bases written: {}", sum_len);
Ok((out_file, sum_len))
}
    /// Stream the subsampled reads through minimap2 in all-vs-all mode,
    /// write every overlap to `overlaps.paf` in the temp directory, and turn
    /// each read's deduplicated overlap count into a per-read genome size
    /// estimate.
    ///
    /// Returns the per-read estimates (infinite for reads with no overlaps)
    /// and the number of reads that overlapped nothing.
    fn align_reads(
        &self,
        aln_wrapper: AlignerWrapper,
        reads_file: PathBuf,
        sum_len: usize,
    ) -> crate::Result<(Vec<f32>, u32)> {
        // Bounded channel between the parsing producer thread and the rayon
        // mapping workers; the bound keeps the producer from racing ahead.
        let (sender, receiver) = channel::bounded(25_000);
        let aligner = Arc::clone(&aln_wrapper.aligner);
        // minimap2's minimum chain score is reused as the overlap threshold
        // fed to `per_read_estimate`.
        let overlap_threshold = aln_wrapper.aligner.mapopt.min_chain_score as u32;

        // read id -> read length; filled by the producer, read back at the
        // end when computing estimates.
        let read_lengths: HashMap<Vec<u8>, usize> = HashMap::with_capacity(self.num_reads);
        let read_lengths = Arc::new(Mutex::new(read_lengths));
        let read_lengths_for_producer = Arc::clone(&read_lengths);

        // Producer thread: parse the subsampled reads file and send each
        // record down the channel.
        let producer = std::thread::spawn(move || -> Result<(), LrgeError> {
            let mut fastx_reader = parse_fastx_file(&reads_file).map_err(|e| {
                LrgeError::FastqParseError(format!("Error parsing FASTQ file: {e}",))
            })?;
            let read_lengths = read_lengths_for_producer;
            while let Some(record) = fastx_reader.next() {
                match record {
                    Ok(rec) => {
                        let rid = rec.read_id().to_owned();
                        let msg = io::Message::Data((rid.to_owned(), rec.seq().into_owned()));
                        {
                            let mut read_lengths_lock = read_lengths.lock().unwrap();
                            // Duplicate read IDs would corrupt the per-read
                            // counting below, so fail fast.
                            if read_lengths_lock.insert(rid, rec.num_bases()).is_some() {
                                return Err(LrgeError::DuplicateReadIdentifier(
                                    String::from_utf8_lossy(rec.read_id()).to_string(),
                                ));
                            }
                        }
                        if sender.send(msg).is_err() {
                            // Receiver side hung up (e.g. the pool aborted on
                            // an error); stop producing.
                            break;
                        }
                    }
                    Err(e) => {
                        return Err(LrgeError::FastqParseError(format!(
                            "Error parsing query FASTQ file: {e}",
                        )));
                    }
                }
            }
            // Close the channel so the consumers' iterator terminates.
            drop(sender);
            Ok(())
        });

        let paf_path = self.tmpdir.join("overlaps.paf");
        let mut buf = File::create(&paf_path).map(BufWriter::new)?;
        // Tab-delimited, headerless serialization — PAF format.
        let writer = csv::WriterBuilder::new()
            .has_headers(false)
            .delimiter(b'\t')
            .from_writer(&mut buf);
        let writer = Arc::new(Mutex::new(writer));

        let pool = rayon::ThreadPoolBuilder::new()
            .num_threads(self.threads)
            .build()
            .map_err(|e| {
                LrgeError::ThreadError(format!("Error setting number of threads: {e}",))
            })?;

        // read id -> number of distinct (deduplicated) overlaps.
        let ovlap_counter: HashMap<Vec<u8>, usize> = HashMap::with_capacity(self.num_reads);
        let ovlap_counter = Arc::new(Mutex::new(ovlap_counter));
        // Canonically-ordered (query, target) pairs already counted, so the
        // same overlap reported from both sides is only counted once.
        let seen_pairs: HashSet<(Vec<u8>, Vec<u8>)> = HashSet::with_capacity(self.num_reads);
        let seen_pairs = Arc::new(Mutex::new(seen_pairs));

        debug!("Aligning reads and writing overlaps to PAF file...");
        pool.install(|| -> Result<(), LrgeError> {
            receiver
                .into_iter()
                .par_bridge()
                .try_for_each(|record| -> Result<(), LrgeError> {
                    // Message has a single Data variant here, so the pattern
                    // is irrefutable.
                    let io::Message::Data((rid, seq)) = record;
                    trace!("Processing read: {}", String::from_utf8_lossy(&rid));
                    let qname = CString::new(rid.clone()).map_err(|e| {
                        LrgeError::MapError(format!("Error converting read name to CString: {e}",))
                    })?;
                    let mappings = aligner.map(&seq, Some(&qname)).map_err(|e| {
                        LrgeError::MapError(format!(
                            "Error mapping read {}: {}",
                            String::from_utf8_lossy(&rid),
                            e
                        ))
                    })?;
                    {
                        let mut ovlap_counter_lock = ovlap_counter.lock().unwrap();
                        if !mappings.is_empty() {
                            let mut writer_lock = writer.lock().unwrap();
                            let mut seen_pairs_lock = seen_pairs.lock().unwrap();
                            for mapping in &mappings {
                                // Every mapping is written to the PAF file,
                                // even ones excluded from the count below.
                                writer_lock.serialize(mapping)?;
                                let tname = &mapping.target_name;
                                // Self-hit: ensure the read has a counter
                                // entry but don't count it as an overlap.
                                if &rid == tname {
                                    ovlap_counter_lock.entry(rid.clone()).or_insert(0);
                                    continue;
                                }
                                // Optionally skip internal matches per the
                                // overhang-ratio heuristic.
                                if self.remove_internal
                                    && mapping.is_internal(self.max_overhang_ratio)
                                {
                                    continue;
                                }
                                // Order the pair canonically so (A,B) and
                                // (B,A) dedupe to the same key.
                                let pair = if &rid < tname {
                                    (rid.clone(), tname.clone())
                                } else {
                                    (tname.clone(), rid.clone())
                                };
                                if seen_pairs_lock.contains(&pair) {
                                    continue;
                                } else {
                                    seen_pairs_lock.insert(pair);
                                }
                                // Credit the overlap to both reads.
                                *ovlap_counter_lock.entry(tname.clone()).or_insert(0) += 1;
                                *ovlap_counter_lock.entry(rid.clone()).or_insert(0) += 1;
                            }
                        }
                        // Reads with zero overlaps still get an entry (they
                        // produce infinite estimates below).
                        ovlap_counter_lock.entry(rid.clone()).or_insert(0);
                    }
                    Ok(())
                })?;
            Ok(())
        })?;

        // First `?` surfaces a producer panic, second the producer's own
        // parse/duplicate-ID errors.
        producer.join().map_err(|e| {
            LrgeError::ThreadError(format!("Thread panicked when joining: {e:?}",))
        })??;

        debug!("Overlaps written to: {}", paf_path.to_string_lossy());

        // All workers have finished, so these unwraps cannot see contention;
        // a failure here would be a bug, not a runtime condition.
        let ovlap_counter = Arc::try_unwrap(ovlap_counter)
            .unwrap()
            .into_inner()
            .unwrap();
        let read_lengths = Arc::try_unwrap(read_lengths).unwrap().into_inner().unwrap();

        let no_mapping_count = AtomicU32::new(0);
        let estimates = ovlap_counter
            .par_iter()
            .map(|(rid, n_ovlaps)| {
                let est = if *n_ovlaps == 0 {
                    no_mapping_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                    trace!(
                        "No overlaps found for read: {}",
                        String::from_utf8_lossy(rid)
                    );
                    // No overlap information -> unbounded size estimate.
                    f32::INFINITY
                } else {
                    let read_len = read_lengths.get(rid).unwrap();
                    // NOTE(review): num_reads - 1 presumably averages over
                    // the *other* reads (excluding this one) — confirm
                    // against the estimator's derivation.
                    let avg_read_len = sum_len as f32 / (self.num_reads - 1) as f32;
                    per_read_estimate(
                        *read_len,
                        avg_read_len,
                        self.num_reads - 1,
                        *n_ovlaps,
                        overlap_threshold,
                    )
                };
                trace!("Estimate for {}: {}", String::from_utf8_lossy(rid), est);
                est
            })
            .collect();

        let no_mapping_count = no_mapping_count.load(std::sync::atomic::Ordering::Relaxed);
        if no_mapping_count > 0 {
            let percent = (no_mapping_count as f32 / self.num_reads as f32) * 100.0;
            info!(
                "{} ({:.2}%) read(s) did not overlap any other reads",
                no_mapping_count, percent
            );
        } else {
            debug!("All reads had at least one overlap");
        }

        Ok((estimates, no_mapping_count))
    }
}
impl Estimate for AvaStrategy {
    /// Run the AVA strategy end-to-end: subsample the input reads, build an
    /// all-vs-all aligner with the platform-appropriate minimap2 preset, then
    /// overlap the reads and compute the per-read estimates.
    fn generate_estimates(&mut self) -> crate::Result<(Vec<f32>, u32)> {
        let (reads_file, sum_len) = self.subsample_reads()?;
        // Map the sequencing platform onto the matching AVA preset.
        let preset = match self.platform {
            Platform::Nanopore => Preset::AvaOnt,
            Platform::PacBio => Preset::AvaPb,
        };
        let wrapper = AlignerWrapper::new(&reads_file, self.threads, preset, false)?;
        self.align_reads(wrapper, reads_file, sum_len)
    }
}