use bstr::ByteSlice;
use lib_tsalign::a_star_aligner::alignment_geometry::{AlignmentCoordinates, AlignmentRange};
use rust_htslib::bam;
use crate::{
common::{
aligner::{ AlignmentOrchestrator, AlignmentQuery, cli::CliAlignmentArgs},
list_of_regions::{CliTargetsArgs, Regions, RegionsDefinition},
reference::ReferenceReader,
},
counter,
reads::{
clusters::Cluster,
writer::{CliOutputArgs, ClusterResult, Writer},
},
};
pub struct Processor {
writer: Writer,
aligner: AlignmentOrchestrator,
targets: Option<Regions>,
padding: usize,
min_quality: Option<u8>,
}
#[derive(clap::Args, Debug)]
pub struct CliProcessorArgs {
#[command(flatten)]
output: CliOutputArgs,
#[command(flatten)]
targets: CliTargetsArgs,
#[command(flatten)]
alignment: CliAlignmentArgs,
#[arg(long, default_value_t = false)]
no_dedup: bool,
#[arg(long = "min-quality", value_name = "NUM")]
min_quality: Option<u8>,
}
impl Processor {
pub async fn new(args: &CliProcessorArgs) -> anyhow::Result<Self> {
let targets = args.targets.read_regions().await?;
let writer = Writer::spawn(&args.output)?;
let mut aligner = AlignmentOrchestrator::try_from(&args.alignment)?;
if !args.no_dedup {
aligner.enable_cache();
}
Ok(Self {
writer,
aligner,
targets,
padding: args.alignment.padding,
min_quality: args.min_quality,
})
}
pub async fn process_cluster(
&self,
cluster: Cluster,
bam_record: &bam::Record,
reference_reader: &ReferenceReader,
) -> anyhow::Result<()> {
if let Some(ref targets) = self.targets {
if !targets.contains(&cluster.region) {
return Ok(());
}
}
let read_seq = bam_record.seq();
if let Some(q) = self.min_quality {
let read_qual = bam_record.qual();
let pass =
(cluster.read_range.start..cluster.read_range.end).all(|ix| read_qual[ix] >= q);
if !pass {
counter!("filtered.by_base_quality").inc(1);
return Ok(());
}
}
let read_min_offset = cluster.read_range.start.saturating_sub(self.padding);
let read_max_offset = (cluster.read_range.end + self.padding).min(bam_record.seq_len());
let qry_seq = (read_min_offset..read_max_offset).map(|i| read_seq[i]);
let Some(reference) = reference_reader
.get_seq(cluster.region.clone(), self.padding, self.padding)
.await?
else {
counter!("filtered.by_soft_mask").inc(1);
return Ok(());
};
let ranges = AlignmentRange::new_offset_limit(
AlignmentCoordinates::new(
reference.range_in_sequence.start,
cluster.read_range.start - read_min_offset,
),
AlignmentCoordinates::new(
reference.range_in_sequence.end,
cluster.read_range.end - read_min_offset,
),
);
let query = AlignmentQuery {
sequences: crate::common::SequencePair {
reference: reference.sequence,
query: qry_seq.collect(),
},
ranges,
};
let rx = self.aligner.get_or_compute_alignment(
reference_reader.get_name(),
&reference.region,
cluster.region.clone(),
query,
)?;
Ok(self
.writer
.write(ClusterResult {
recv: rx,
region: cluster.region,
rec_pos: usize::try_from(bam_record.pos())?,
rec_seq_len: bam_record.seq_len(),
rec_qname: bam_record.qname().to_str()?.to_string(),
})
.await?)
}
pub async fn wait_until_done(self) {
self.writer.wait_until_done().await;
}
}