use crate::bam_utils::{BamStats, add_bamnado_program_group, get_bam_header};
use crate::read_filter::BamReadFilter;
use anyhow::Result;
use crossbeam::channel::unbounded;
use indicatif::ProgressBar;
use noodles::bam;
use noodles::bam::bai;
use noodles::core::{Position, Region};
use noodles::bam::r#async::io::{Reader as AsyncReader, Writer as AsyncWriter};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use std::path::PathBuf;
use tokio::fs::File;
/// Filters reads from an indexed BAM file into a new BAM file,
/// keeping only records accepted by the configured [`BamReadFilter`].
pub struct BamFilterer {
// Path to the input BAM; a `.bam.bai` index is expected alongside it.
filepath: PathBuf,
// Path the filtered BAM is written to.
output: PathBuf,
// Per-record predicate; only records it accepts are written.
filter: BamReadFilter,
}
impl BamFilterer {
pub fn new(filepath: PathBuf, output: PathBuf, filter: BamReadFilter) -> Self {
BamFilterer {
filepath,
output,
filter,
}
}
pub async fn split_async(&self) -> Result<()> {
let filepath = self.filepath.clone();
let outfile = self.output.clone();
let mut reader = File::open(&filepath).await.map(AsyncReader::new)?;
let header = reader.read_header().await;
let header = match header {
Ok(header) => header,
Err(_e) => get_bam_header(&filepath)?,
};
let output_header = add_bamnado_program_group(&header)?;
let index_path = self.filepath.with_extension("bam.bai");
let index_file = File::open(&index_path).await?;
let mut index_reader = bai::r#async::io::Reader::new(index_file);
let index = index_reader.read_index().await?;
let mut writer = AsyncWriter::new(File::create(outfile).await?);
writer.write_header(&output_header).await?;
let chromsizes = header
.reference_sequences()
.iter()
.map(|(name, seq)| (name.to_string(), seq.length().get() as u64))
.collect::<std::collections::HashMap<_, _>>();
let query_regions = chromsizes.iter().map(|(name, size)| {
let start = Position::try_from(1).unwrap();
let end = Position::try_from(*size as usize).unwrap();
Region::new(name.to_string(), start..=end)
});
let progress = ProgressBar::new(chromsizes.len() as u64);
for region in query_regions {
progress.inc(1);
let mut query = reader.query(&header, &index, ®ion)?;
let mut record = bam::Record::default();
while query.read_record(&mut record).await? != 0 {
let is_valid = self.filter.is_valid(&record, Some(&header))?;
if is_valid {
writer.write_record(&output_header, &record).await?;
}
}
}
progress.finish();
writer.shutdown().await?;
Ok(())
}
pub fn split(&self) -> Result<()> {
let filepath = self.filepath.clone();
let bam_stats = BamStats::new(self.filepath.clone())?;
let genomic_chunks = bam_stats.genome_chunks(1e6 as u64)?;
let _chromsizes_refid = bam_stats
.chromsizes_ref_id()
.expect("Error getting chromsizes");
let _n_total_chunks = genomic_chunks.len();
let outfile = self.output.clone();
let (tx, rx) = unbounded();
let handle = std::thread::spawn(move || {
let _reader = bam::io::indexed_reader::Builder::default()
.build_from_path(&filepath)
.expect("Error reading file");
let header = get_bam_header(&filepath).expect("Error reading header");
let output_header =
add_bamnado_program_group(&header).expect("Error creating output BAM header");
let mut writer = bam::io::writer::Builder {}
.build_from_path(outfile)
.expect("Error writing to file");
writer
.write_header(&output_header)
.expect("Error writing BAM header");
for chunk in rx.iter() {
for record in chunk {
writer
.write_record(&output_header, &record)
.expect("Error writing record");
}
}
});
let filepath = self.filepath.clone();
genomic_chunks
.into_par_iter()
.for_each_with(tx, |tx, chunk| {
let mut reader = bam::io::indexed_reader::Builder::default()
.build_from_path(&filepath)
.expect("Error reading file");
let header = get_bam_header(&filepath).expect("Error reading header");
let records = reader
.query(&header, &chunk)
.expect("Error getting chunk of reads");
let filtered_records = records
.records()
.filter_map(|r| r.is_ok().then(|| r.unwrap()))
.filter(|record| self.filter.is_valid(record, Some(&header)).unwrap_or(false))
.collect::<Vec<_>>();
tx.send(filtered_records).expect("Error sending records");
});
handle.join().expect("Error joining threads");
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use noodles::sam::header::record::value::map::program::tag as pg_tag;
    use tempfile::TempDir;

    /// Resolves the shared BAM fixture at `<workspace root>/test/data/test.bam`.
    fn test_bam() -> PathBuf {
        PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .parent()
            .expect("Failed to get workspace root")
            .join("test/data/test.bam")
    }

    #[test]
    fn test_split_writes_bamnado_program_group_to_header() {
        let temp_dir = TempDir::new().expect("Failed to create temp dir");
        let output_bam = temp_dir.path().join("filtered.bam");

        // Run the parallel split path over the fixture with a pass-through filter.
        BamFilterer::new(test_bam(), output_bam.clone(), BamReadFilter::default())
            .split()
            .expect("Failed to split BAM");

        // Re-open the output and pull its header back out.
        let mut reader = bam::io::reader::Builder
            .build_from_path(&output_bam)
            .expect("Failed to open output BAM");
        let header = reader
            .read_header()
            .expect("Failed to read output BAM header");

        // The split must have stamped a `bamnado` @PG record into the header.
        let program = header
            .programs()
            .as_ref()
            .get(&b"bamnado"[..])
            .expect("Missing bamnado @PG record");

        let name = program
            .other_fields()
            .get(&pg_tag::NAME)
            .map(|v| v.as_ref());
        assert_eq!(name, Some(&b"bamnado"[..]));

        let version = program
            .other_fields()
            .get(&pg_tag::VERSION)
            .map(|v| v.as_ref());
        assert_eq!(version, Some(env!("CARGO_PKG_VERSION").as_bytes()));
    }
}