use bstr::ByteSlice;
use lib_tsalign::a_star_aligner::{
alignment_geometry::{AlignmentCoordinates, AlignmentRange},
alignment_result::alignment::Alignment,
template_switch_distance::AlignmentType,
};
use rust_htslib::bam::{self, record::Cigar};
use tracing::{instrument, warn};
use crate::{
common::{
aligner::{AlignmentOrchestrator, AlignmentQuery, cli::CliAlignmentArgs},
alignment::ForwardAlignment,
coords::{GenomePosition, GenomeRegion},
csv::CSVAuxData,
list_of_regions::{CliTargetsArgs, Regions, RegionsDefinition},
reference::ReferenceReader,
},
counter,
reads::{
clusters::Cluster,
writer::{CliOutputArgs, ClusterResult, Writer},
},
};
pub struct Processor {
cluster_counter: usize,
writer: Writer,
aligner: AlignmentOrchestrator,
targets: Option<Regions>,
padding: usize,
min_quality: Option<u8>,
}
#[derive(clap::Args, Debug)]
pub struct CliProcessorArgs {
#[command(flatten)]
output: CliOutputArgs,
#[command(flatten)]
targets: CliTargetsArgs,
#[command(flatten)]
alignment: CliAlignmentArgs,
#[arg(long, default_value_t = false)]
no_dedup: bool,
#[arg(long = "min-quality", value_name = "NUM")]
min_quality: Option<u8>,
}
impl Processor {
pub async fn new(args: &CliProcessorArgs) -> anyhow::Result<Self> {
let targets = args.targets.read_regions().await?;
let writer = Writer::spawn(&args.output)?;
let mut aligner = AlignmentOrchestrator::try_from(&args.alignment)?;
if !args.no_dedup {
aligner.enable_cache();
}
Ok(Self {
cluster_counter: 0,
writer,
aligner,
targets,
padding: args.alignment.padding,
min_quality: args.min_quality,
})
}
#[instrument(skip_all, fields(pos = %cluster.region))]
pub async fn process_cluster(
&mut self,
cluster: Cluster,
bam_record: &bam::Record,
read_cigar: &[Cigar],
reference_reader: &ReferenceReader,
) -> anyhow::Result<()> {
if let Some(ref targets) = self.targets
&& !targets.contains(&cluster.region)
{
return Ok(());
}
let read_seq = bam_record.seq();
if let Some(q) = self.min_quality {
let read_qual = bam_record.qual();
let pass =
(cluster.read_range.start..cluster.read_range.end).all(|ix| read_qual[ix] >= q);
if !pass {
counter!("filtered.by_base_quality").inc(1);
return Ok(());
}
}
let (sc_start, sc_end) = non_softclip_range(read_cigar);
let read_min_offset = cluster.read_range.start.saturating_sub(self.padding).max(sc_start);
let read_max_offset = (cluster.read_range.end + self.padding).min(sc_end);
let record_pos = GenomePosition::new_0(
cluster.region.contig().clone(),
usize::try_from(bam_record.pos())?,
);
let (alt_forward_ctx_cigar, alt_context_region) =
get_forward_cigar(read_cigar, read_min_offset, read_max_offset, record_pos);
let ref_padding_left = alt_context_region.start().abs_diff(cluster.region.start());
let ref_padding_right = alt_context_region
.end_excl()
.unwrap()
.abs_diff(&cluster.region.end_excl().unwrap());
let Some(reference) = reference_reader
.get_seq(cluster.region.clone(), ref_padding_left, ref_padding_right)
.await?
else {
counter!("filtered.by_soft_mask").inc(1);
return Ok(());
};
if reference.region != alt_context_region {
warn!(
"The regions of the reference and alt sequence are not equal; This should not happen"
);
}
let alt_seq = (read_min_offset..read_max_offset).map(|i| read_seq[i]);
let ranges = AlignmentRange::new_offset_limit(
AlignmentCoordinates::new(
reference.range_in_sequence.start,
cluster.read_range.start - read_min_offset,
),
AlignmentCoordinates::new(
reference.range_in_sequence.end,
cluster.read_range.end - read_min_offset,
),
);
let sequences = crate::common::SequencePair {
reference: reference.sequence,
query: alt_seq.collect(),
};
let query = AlignmentQuery {
sequences: sequences.clone(),
ranges: ranges.clone(),
};
let recv = self.aligner.get_or_compute_alignment(
reference_reader.get_name(),
&reference.region,
cluster.region.clone(),
query,
)?;
self.cluster_counter += 1;
let aux = CSVAuxData {
cluster_id: self.cluster_counter.to_string(),
sequences,
ref_context_region: reference.region,
alt_context_region,
cluster_region: cluster.region,
alt_id: Some(bam_record.qname().to_str()?.to_string()),
forward_alignment: alt_forward_ctx_cigar,
cost: self.aligner.costs.clone(),
};
Ok(self.writer.write(ClusterResult { recv, aux }).await?)
}
pub async fn wait_until_done(self) {
self.writer.wait_until_done().await;
}
}
fn get_forward_cigar(
cigar: &[Cigar],
read_min_offset: usize,
read_max_offset_excl: usize,
read_start_pos: GenomePosition,
) -> (ForwardAlignment, GenomeRegion) {
let mut read_pos = 0;
let mut ref_pos = read_start_pos.clone();
let mut fw: Alignment<AlignmentType> = Alignment::new();
let mut ref_start = None;
let mut ref_end = read_start_pos.clone();
for op in cigar {
if read_pos >= read_max_offset_excl {
break;
}
let read_len = match op {
Cigar::Match(n)
| Cigar::Ins(n)
| Cigar::SoftClip(n)
| Cigar::Equal(n)
| Cigar::Diff(n) => *n as usize,
Cigar::Del(_) | Cigar::RefSkip(_) | Cigar::HardClip(_) | Cigar::Pad(_) => 0,
};
let ref_len = match op {
Cigar::Match(n)
| Cigar::Del(n)
| Cigar::RefSkip(n)
| Cigar::Equal(n)
| Cigar::Diff(n) => *n as usize,
Cigar::Ins(_) | Cigar::SoftClip(_) | Cigar::HardClip(_) | Cigar::Pad(_) => 0,
};
let op_end = read_pos + read_len;
let in_window = if read_len == 0 {
read_pos >= read_min_offset } else {
op_end > read_min_offset };
if in_window {
if read_len == 0 {
let (n, ty) = cigar_to_alignment_type(*op);
fw.push_n(n, ty);
if ref_start.is_none() {
ref_start = Some(ref_pos.clone());
}
ref_end = ref_pos.clone() + ref_len;
} else {
let clipped_start = read_min_offset.max(read_pos);
let clipped_end = read_max_offset_excl.min(op_end);
let clipped_len = clipped_end - clipped_start;
if clipped_len > 0 {
let (_, ty) = cigar_to_alignment_type(*op);
fw.push_n(clipped_len, ty);
if ref_len > 0 {
let ref_clip_start = ref_pos.clone() + (clipped_start - read_pos);
if ref_start.is_none() {
ref_start = Some(ref_clip_start.clone());
}
let clipped_ref_len = clipped_len.min(ref_len - (clipped_start - read_pos));
ref_end = ref_clip_start + clipped_ref_len;
} else {
if ref_start.is_none() {
ref_start = Some(ref_pos.clone());
}
ref_end = ref_pos.clone();
}
}
}
}
ref_pos += ref_len;
read_pos = op_end.max(read_pos); }
let ref_start = ref_start.unwrap_or(read_start_pos);
let ref_region = GenomeRegion::from_incl_excl(ref_start, Some(ref_end)).unwrap();
(ForwardAlignment(fw), ref_region)
}
fn cigar_to_alignment_type(cigar: Cigar) -> (usize, AlignmentType) {
match cigar {
Cigar::Match(_) => {
unreachable!("We have replaced the matches with equals and diff earlier")
}
Cigar::SoftClip(_) => {
unreachable!("Soft clips are excluded from the window by non_softclip_range clamping")
}
Cigar::Ins(n) => (n as usize, AlignmentType::PrimaryInsertion),
Cigar::Del(n) | Cigar::RefSkip(n) => (n as usize, AlignmentType::PrimaryDeletion),
Cigar::HardClip(_) | Cigar::Pad(_) => (0, AlignmentType::PrimaryMatch),
Cigar::Equal(n) => (n as usize, AlignmentType::PrimaryMatch),
Cigar::Diff(n) => (n as usize, AlignmentType::PrimarySubstitution),
}
}
fn non_softclip_range(cigar: &[Cigar]) -> (usize, usize) {
let mut prefix = 0usize;
for op in cigar {
match op {
Cigar::HardClip(_) => {}
Cigar::SoftClip(n) => prefix += *n as usize,
_ => break,
}
}
let total: usize = cigar
.iter()
.map(|op| match op {
Cigar::Match(n)
| Cigar::Ins(n)
| Cigar::SoftClip(n)
| Cigar::Equal(n)
| Cigar::Diff(n) => *n as usize,
_ => 0,
})
.sum();
let mut suffix = 0usize;
for op in cigar.iter().rev() {
match op {
Cigar::HardClip(_) => {}
Cigar::SoftClip(n) => suffix += *n as usize,
_ => break,
}
}
(prefix, total - suffix)
}
#[cfg(test)]
mod tests {
use crate::common::contig::ContigName;
use super::*;
use rust_htslib::bam::record::Cigar::*;
fn cigar_ops(forward: &ForwardAlignment) -> Vec<(usize, AlignmentType)> {
forward.0.clone().into_inner()
}
fn chr1_100() -> GenomePosition {
let chr = ContigName::from("chr1");
GenomePosition::new_1(chr, 100)
}
#[test]
fn test_full_cigar_no_clipping() {
let cigar = vec![Equal(10)];
let (result, region) = get_forward_cigar(&cigar, 0, 10, chr1_100());
assert_eq!(cigar_ops(&result), vec![(10, AlignmentType::PrimaryMatch)]);
assert_eq!("chr1:100-109", region.to_string());
}
#[test]
fn test_split_single_run() {
let cigar = vec![Equal(10)];
let (result, region) = get_forward_cigar(&cigar, 3, 8, chr1_100());
assert_eq!(cigar_ops(&result), vec![(5, AlignmentType::PrimaryMatch)]);
assert_eq!("chr1:103-107", region.to_string());
}
#[test]
fn test_window_selects_second_op() {
let cigar = vec![Diff(5), Ins(4), Equal(5)];
let (result, region) = get_forward_cigar(&cigar, 5, 9, chr1_100());
assert_eq!(
cigar_ops(&result),
vec![(4, AlignmentType::PrimaryInsertion)]
);
assert_eq!("chr1:105-104", region.to_string()); }
#[test]
fn test_split_across_boundary() {
let cigar = vec![Diff(5), Equal(5)];
let (result, region) = get_forward_cigar(&cigar, 3, 7, chr1_100());
assert_eq!(
cigar_ops(&result),
vec![
(2, AlignmentType::PrimarySubstitution),
(2, AlignmentType::PrimaryMatch)
]
);
assert_eq!("chr1:103-106", region.to_string());
}
#[test]
fn test_deletion_included_when_inside_window() {
let cigar = vec![Equal(5), Del(3), Diff(5)];
let (result, region) = get_forward_cigar(&cigar, 5, 10, chr1_100());
assert_eq!(
cigar_ops(&result),
vec![
(3, AlignmentType::PrimaryDeletion),
(5, AlignmentType::PrimarySubstitution)
]
);
assert_eq!("chr1:105-112", region.to_string());
}
#[test]
fn test_deletion_excluded_before_window() {
let cigar = vec![Diff(5), Del(3), Equal(5)];
let (result, region) = get_forward_cigar(&cigar, 0, 5, chr1_100());
assert_eq!(
cigar_ops(&result),
vec![(5, AlignmentType::PrimarySubstitution)]
);
assert_eq!("chr1:100-104", region.to_string());
}
#[test]
fn test_ref_pos_skips_deletion_before_window() {
let cigar = vec![Equal(4), Del(6), Diff(4)];
let (result, region) = get_forward_cigar(&cigar, 4, 8, chr1_100());
assert_eq!(
cigar_ops(&result),
vec![
(6, AlignmentType::PrimaryDeletion),
(4, AlignmentType::PrimarySubstitution)
]
);
assert_eq!("chr1:104-113", region.to_string());
}
#[test]
fn test_insertion_before_window_does_not_shift_ref() {
let cigar = vec![Ins(3), Equal(5)];
let (result, region) = get_forward_cigar(&cigar, 1, 8, chr1_100());
assert_eq!(
cigar_ops(&result),
vec![
(2, AlignmentType::PrimaryInsertion),
(5, AlignmentType::PrimaryMatch)
]
);
assert_eq!("chr1:100-104", region.to_string());
}
#[test]
fn test_non_softclip_range_no_clips() {
let cigar = vec![Equal(10)];
assert_eq!(non_softclip_range(&cigar), (0, 10));
}
#[test]
fn test_non_softclip_range_leading_clip() {
let cigar = vec![SoftClip(3), Equal(10)];
assert_eq!(non_softclip_range(&cigar), (3, 13));
}
#[test]
fn test_non_softclip_range_trailing_clip() {
let cigar = vec![Equal(10), SoftClip(4)];
assert_eq!(non_softclip_range(&cigar), (0, 10));
}
#[test]
fn test_non_softclip_range_both_clips() {
let cigar = vec![SoftClip(2), Equal(10), SoftClip(3)];
assert_eq!(non_softclip_range(&cigar), (2, 12));
}
#[test]
fn test_softclip_excluded_from_window_leading() {
let cigar = vec![SoftClip(3), Equal(7)];
let (result, region) = get_forward_cigar(&cigar, 3, 10, chr1_100());
assert_eq!(cigar_ops(&result), vec![(7, AlignmentType::PrimaryMatch)]);
assert_eq!("chr1:100-106", region.to_string());
}
#[test]
fn test_softclip_excluded_from_window_trailing() {
let cigar = vec![Equal(7), SoftClip(3)];
let (result, region) = get_forward_cigar(&cigar, 0, 7, chr1_100());
assert_eq!(cigar_ops(&result), vec![(7, AlignmentType::PrimaryMatch)]);
assert_eq!("chr1:100-106", region.to_string());
}
#[test]
fn test_window_splits_op_with_deletion_inside() {
let cigar = vec![Diff(10), Del(5), Equal(10)];
let (result, region) = get_forward_cigar(&cigar, 7, 13, chr1_100());
assert_eq!(
cigar_ops(&result),
vec![
(3, AlignmentType::PrimarySubstitution),
(5, AlignmentType::PrimaryDeletion),
(3, AlignmentType::PrimaryMatch),
]
);
assert_eq!("chr1:107-117", region.to_string());
}
}