use crate::{
catalog::TandemRepeatCatalog,
constants::{SAM_ID_TAG, SAM_READ_PAIR_TYPE_TAG, SAM_READ_TYPE_TAG},
positions::{HammingDistance, RepeatAlignmentPosition, RepeatAlignmentPositionSetGenerator},
records::{ReadKind, ReadOrder, ReadPairId, RecordManager},
reference::TandemRepeatReference,
sequence::Sequence,
util::Utf8String,
};
use anyhow::anyhow;
use anyhow::{Context, Ok, Result};
use bio::alphabets::dna::revcomp;
use rust_htslib::bam::FetchDefinition;
use rust_htslib::{
bam,
bam::{record::Aux, Read as BAMRead, Record},
};
use serde::ser::SerializeSeq;
use serde::{
de::{self, SeqAccess, Visitor},
Deserialize, Deserializer, Serialize, Serializer,
};
use std::collections::HashMap;
use std::fmt;
use std::hash::Hash;
use std::str::FromStr;
/// Per-locus flank sizes, keyed by locus id (presumably in bases — confirm
/// against the catalog that produces these maps).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LocusFlankSizes {
    /// Left-flank size for each locus.
    pub left: HashMap<LocusId, u32>,
    /// Right-flank size for each locus.
    pub right: HashMap<LocusId, u32>,
}
/// Tuning parameters for the repeat purity score (see `purity_score`).
#[derive(Clone)]
pub struct RepeatPurityScoreParams {
    /// Minimum average per-base score for a read to count as an in-repeat read.
    pub irr_score_min: f32,
    /// Base-quality threshold separating regular mismatches from
    /// low-quality mismatches.
    pub base_qual_min: u8,
    /// Score contribution of a base that matches the repeat motif.
    pub match_weight: f32,
    /// Score contribution of a mismatch with base quality >= `base_qual_min`.
    pub mismatch_weight: f32,
    /// Score contribution of a mismatch with base quality < `base_qual_min`.
    pub low_qual_mismatch_weight: f32,
}
/// Genomic interval covered by an aligned record.
trait RecordRange {
    /// Start coordinate of the alignment.
    fn start(&self) -> i64;
    /// End coordinate (half-open, derived from start + read length).
    fn stop(&self) -> i64;
}
impl RecordRange for Record {
    fn start(&self) -> i64 {
        self.pos()
    }

    /// The end is approximated as start + read length in bases; this ignores
    /// CIGAR operations (indels/clipping), so it is not the exact reference
    /// span of the alignment.
    fn stop(&self) -> i64 {
        self.pos() + self.seq_len() as i64
    }
}
/// A read relevant to a tandem-repeat locus, decoupled from the BAM record so
/// it can be serialized and processed independently of htslib.
#[derive(Debug, Serialize, Deserialize, Clone, Hash)]
pub struct Read {
    // Query (template) name of the read.
    pub qname: String,
    // Contig the source BAM record was aligned to.
    pub source_contig: String,
    // Alignment position on `source_contig`, as reported by the BAM record.
    pub source_pos: i64,
    // Mapping quality of the source record.
    pub mapq: u8,
    // Read bases in sequencing orientation (reverse-complemented during
    // construction when the record aligned to the reverse strand).
    pub seq: Sequence,
    // Length of `seq` in bases.
    pub seq_len: u32,
    // Whether this is the first mate of the template.
    pub is_first_in_template: bool,
    // Candidate repeat-alignment positions with their Hamming distances,
    // serialized as "(position: distance)" strings via the custom
    // serializer/deserializer below.
    #[serde(
        serialize_with = "serialize_vec_pos_tuple",
        deserialize_with = "deserialize_vec_pos_tuple"
    )]
    pub position_set: Vec<(RepeatAlignmentPosition, HammingDistance)>,
}
impl Read {
    /// Constructs a `Read` from a raw BAM `record`.
    ///
    /// The sequence is reverse-complemented when the record aligned to the
    /// reverse strand, so `seq` is always stored in sequencing orientation.
    /// The repeat-alignment position set is computed eagerly against
    /// `reference` (motif plus flank sequences), keeping the
    /// `num_lowest_distances` best candidates.
    ///
    /// # Errors
    /// Returns an error when the record's qname is not valid UTF-8.
    pub fn from_record(
        record: &Record,
        record_contig: &str,
        reference: &TandemRepeatReference,
        num_lowest_distances: usize,
    ) -> Result<Self> {
        let qname = String::from_utf8(record.qname().to_vec())?;
        let source_contig = record_contig.to_string();
        let source_pos = record.pos();
        let mapq: u8 = record.mapq();
        let seq_len = record.seq_len() as u32;
        let is_first_in_template = record.is_first_in_template();
        let mut seq = record.seq().as_bytes();
        if record.is_reverse() {
            // Undo the aligner's reverse-complement so the stored bases are
            // in the orientation the read was sequenced in.
            seq = revcomp(seq);
        }
        let generator = RepeatAlignmentPositionSetGenerator::new(
            &seq,
            reference.motif.as_ref(),
            reference.left_flank_seq.as_ref(),
            reference.right_flank_seq.as_ref(),
            num_lowest_distances,
        );
        let relative_pos = generator.relative_position_set();
        Ok(Self {
            qname,
            source_pos,
            source_contig,
            mapq,
            seq: seq.into(),
            seq_len,
            is_first_in_template,
            position_set: relative_pos,
        })
    }

    /// Rebuilds a position-set generator from the already-computed
    /// `position_set`, avoiding recomputation of the alignment positions.
    pub fn create_position_generator(
        &self,
        reference: &TandemRepeatReference,
        num_lowest_distances: usize,
    ) -> RepeatAlignmentPositionSetGenerator {
        RepeatAlignmentPositionSetGenerator::from_precomputed(
            self.seq.as_ref(),
            reference.motif.as_ref(),
            reference.left_flank_seq.as_ref(),
            reference.right_flank_seq.as_ref(),
            num_lowest_distances,
            self.position_set.clone(),
        )
    }

    /// True when any candidate alignment position places the read fully
    /// within the repeat, i.e. the read is an in-repeat read (IRR).
    fn is_irr(&self) -> bool {
        self.position_set
            .iter()
            .any(|(pos, _)| matches!(pos, RepeatAlignmentPosition::WithinRepeat(_, _)))
    }
}
/// Identifier of a tandem-repeat locus in the catalog.
pub type LocusId = String;
/// Extracts, for every locus in `catalog`, the read pairs that are useful for
/// repeat genotyping and writes them to `writer`, tagging each record with
/// the locus id, per-read type and read-pair type SAM tags.
///
/// The extraction proceeds in four passes over `reader`:
/// 1. Stream all primary records. A record with MAPQ <= `max_irr_mapq` whose
///    sequence contains a catalog motif and passes the purity score is
///    recorded as an in-repeat read (IRR); a record with
///    MAPQ >= `min_flank_mapq` overlapping a locus window extended by
///    `extension_length` on both sides is recorded as a flanking read. The
///    two conditions are not mutually exclusive: one record may be
///    classified in both roles (for different loci).
/// 2. For each locus, fetch the mapped mates of flanking reads whose pair is
///    still incomplete, using the mate coordinates stored on the record.
/// 3. Scan unmapped records to recover mates that did not map anywhere; the
///    unmapped mate of a flanking read is recorded as an IRR.
/// 4. Write out all complete, fully-typed pairs with their tags.
///
/// # Errors
/// Propagates BAM I/O errors, aux-tag manipulation errors, and a failure to
/// build the Aho-Corasick motif automaton.
pub fn extract_bag_of_reads(
    reader: &mut bam::IndexedReader,
    writer: &mut bam::Writer,
    catalog: TandemRepeatCatalog,
    extension_length: u32,
    min_flank_mapq: u8,
    max_irr_mapq: u8,
    purity_score_params: &RepeatPurityScoreParams,
) -> Result<()> {
    let header = reader.header().clone();
    let ac = catalog
        .build_ac()
        .context("Failed to build Aho-Corasick automaton")?;

    // Pass 1: classify every primary record against the catalog.
    reader.fetch(FetchDefinition::All)?;
    let mut record_manager = RecordManager::new();
    for record_result in reader.rc_records() {
        let record = record_result?;
        if !is_primary_read(&record) {
            continue;
        }
        let read_pair_id: ReadPairId = RecordManager::get_read_pair_id(&record);
        debug!("Processing read pair {}", read_pair_id);
        // Poorly mapped reads are IRR candidates: scan their sequence for
        // catalog motifs, then confirm with the purity score.
        if record.mapq() <= max_irr_mapq {
            let candidate_irr_for_loci = catalog.find_motifs(&record.seq().as_bytes(), &ac);
            for locus in candidate_irr_for_loci {
                if is_in_repeat_read(&record, &locus.motif, purity_score_params) {
                    debug!(
                        "Read {} is an in-repeat read for locus {}",
                        record.qname().to_string(),
                        locus.id
                    );
                    record_manager.add_read_to_locus_with_kind(
                        &locus.id,
                        record.as_ref(),
                        ReadKind::InRepeatRead,
                    );
                }
            }
        }
        // Confidently mapped reads near a locus are flanking-read candidates.
        if record.mapq() >= min_flank_mapq {
            let tid = record.tid();
            // A negative tid means the record has no valid reference contig.
            let contig = if tid >= 0 {
                Some(header.tid2name(tid as u32).to_string())
            } else {
                None
            };
            if let Some(contig) = contig {
                let extended_start = record.start() - extension_length as i64;
                let extended_end = record.stop() + extension_length as i64;
                let flank_for_loci = catalog.find(&contig, extended_start, extended_end);
                for locus in flank_for_loci {
                    debug!(
                        "Read {} is a flanking read for locus {}",
                        record.qname().to_string(),
                        locus.id
                    );
                    record_manager.add_read_to_locus_with_kind(
                        &locus.id,
                        record.as_ref(),
                        ReadKind::FlankingRead,
                    );
                }
            }
        }
    }

    // Pass 2: for incomplete pairs whose collected read is a flanking read
    // with a mapped mate, fetch the mate directly by its coordinates.
    for locus in catalog.iter() {
        let incomplete_read_pairs_ids =
            record_manager.incomplete_read_pair_ids_for_locus(&locus.id);
        for read_pair_id in incomplete_read_pairs_ids {
            let read_pair = record_manager.get_read_pair_by_id(&read_pair_id).unwrap();
            let (read, read_order) = if read_pair.first.is_some() {
                (read_pair.first.unwrap(), ReadOrder::First)
            } else {
                (read_pair.last.unwrap(), ReadOrder::Last)
            };
            let read_kind = record_manager
                .get_read_kind(&locus.id, &read_pair_id, &read_order)
                .unwrap();
            if read_kind == ReadKind::FlankingRead && !read.is_mate_unmapped() {
                let mate_tid = read.mtid();
                let mate_pos = read.mpos();
                if mate_tid >= 0 {
                    reader.fetch((mate_tid, mate_pos, mate_pos + 1))?;
                    for record_result in reader.rc_records() {
                        let record = record_result?;
                        if is_primary_read(&record) && record.qname().to_string() == read_pair.id {
                            record_manager.add_read_to_locus_with_kind(
                                &locus.id,
                                record.as_ref(),
                                ReadKind::FlankingRead,
                            );
                            break;
                        }
                    }
                } else {
                    warn!(
                        "Read pair {} has a mate with invalid tid {}",
                        read_pair.id, mate_tid
                    );
                }
            }
        }
    }

    // Pass 3: recover unmapped mates. The unmapped mate of a flanking read is
    // assumed to lie inside the repeat and is recorded as an IRR.
    let incomplete_read_pair_ids = record_manager.incomplete_read_pair_ids();
    if !incomplete_read_pair_ids.is_empty() {
        reader.fetch(FetchDefinition::Unmapped)?;
        for record_result in reader.rc_records() {
            let record = record_result?;
            if is_primary_read(&record) {
                let read_pair_id = RecordManager::get_read_pair_id(record.as_ref());
                if incomplete_read_pair_ids.contains(&read_pair_id) {
                    let read_pair_locus_ids = record_manager.get_read_pair_locus_ids(&read_pair_id);
                    for locus_id in read_pair_locus_ids {
                        let read_pair = record_manager.get_read_pair_by_id(&read_pair_id).unwrap();
                        let read_order = if read_pair.first.is_some() {
                            ReadOrder::First
                        } else {
                            ReadOrder::Last
                        };
                        let read_kind = record_manager
                            .get_read_kind(&locus_id, &read_pair_id, &read_order)
                            .unwrap();
                        if read_kind == ReadKind::FlankingRead {
                            record_manager.add_read_to_locus_with_kind(
                                &locus_id,
                                record.as_ref(),
                                ReadKind::InRepeatRead,
                            );
                        }
                    }
                }
            }
        }
    }

    // Pass 4: emit complete pairs with their classification tags.
    for locus in catalog.iter() {
        let read_pairs = record_manager.get_locus_read_pairs(&locus.id);
        for read_pair in read_pairs {
            if read_pair.len() != 2 {
                // Pair is still incomplete; nothing to write.
                continue;
            }
            let first_read_kind =
                record_manager.get_read_kind(&locus.id, &read_pair.id, &ReadOrder::First);
            let last_read_kind =
                record_manager.get_read_kind(&locus.id, &read_pair.id, &ReadOrder::Last);
            match (first_read_kind, last_read_kind) {
                (Some(first_read_kind), Some(last_read_kind)) => {
                    let mut first_read = read_pair.first.as_ref().unwrap().clone();
                    let mut last_read = read_pair.last.as_ref().unwrap().clone();
                    let read_pair_kind = record_manager
                        .get_read_pair_kind(&locus.id, &read_pair.id)
                        .to_string();
                    // Drop any stale tags before re-tagging so the output is
                    // consistent even if the input was already tagged.
                    clear_record_tags(&mut first_read)?;
                    clear_record_tags(&mut last_read)?;
                    first_read.push_aux(SAM_ID_TAG, Aux::String(&locus.id))?;
                    last_read.push_aux(SAM_ID_TAG, Aux::String(&locus.id))?;
                    first_read
                        .push_aux(SAM_READ_TYPE_TAG, Aux::String(&first_read_kind.to_string()))?;
                    last_read
                        .push_aux(SAM_READ_TYPE_TAG, Aux::String(&last_read_kind.to_string()))?;
                    first_read.push_aux(SAM_READ_PAIR_TYPE_TAG, Aux::String(&read_pair_kind))?;
                    last_read.push_aux(SAM_READ_PAIR_TYPE_TAG, Aux::String(&read_pair_kind))?;
                    writer.write(&first_read)?;
                    writer.write(&last_read)?;
                }
                _ => {
                    // Fixed garbled log message ("is does not have their
                    // types defined").
                    warn!(
                        "Read pair {} for locus {} does not have read types defined for both mates",
                        read_pair.id, locus.id
                    );
                }
            }
        }
    }
    Ok(())
}
/// Removes this tool's classification tags (locus id, read type, read-pair
/// type) from `record` if present, so they can be re-written cleanly.
fn clear_record_tags(record: &mut Record) -> Result<()> {
    for &tag in &[SAM_ID_TAG, SAM_READ_TYPE_TAG, SAM_READ_PAIR_TYPE_TAG] {
        // Only remove tags that actually exist; `remove_aux` on a missing
        // tag would return an error.
        if record.aux(tag).is_ok() {
            record.remove_aux(tag)?;
        }
    }
    Ok(())
}
/// True when `read` is a primary alignment, i.e. neither a secondary nor a
/// supplementary record.
pub fn is_primary_read(read: &Record) -> bool {
    !(read.is_secondary() || read.is_supplementary())
}
/// Decides whether `record` reads as a pure stretch of `motif`, in either
/// orientation and at any rotation (phase) of the motif.
///
/// All cyclic rotations of the motif and of its reverse complement are tried
/// against the purity score; the first passing rotation short-circuits.
pub fn is_in_repeat_read(
    record: &Record,
    motif: &Sequence,
    params: &RepeatPurityScoreParams,
) -> bool {
    let seq = record.seq().as_bytes();
    let base_quals = record.qual();
    let mut forward = motif.clone();
    let mut reverse = forward.revcomp();
    let mut rotations_left = forward.len();
    while rotations_left > 0 {
        if seq_passes_purity_score(&seq, base_quals, forward.as_ref(), params) {
            return true;
        }
        if seq_passes_purity_score(&seq, base_quals, reverse.as_ref(), params) {
            return true;
        }
        // Advance both orientations to the next cyclic phase.
        forward.rotate_left(1);
        reverse.rotate_left(1);
        rotations_left -= 1;
    }
    false
}
/// True when the purity score of `seq` against `motif` meets the configured
/// minimum (`params.irr_score_min`).
pub fn seq_passes_purity_score(
    seq: &[u8],
    base_quals: &[u8],
    motif: &[u8],
    params: &RepeatPurityScoreParams,
) -> bool {
    purity_score(seq, base_quals, motif, params) >= params.irr_score_min
}
/// Average per-base score of `seq` against a tiled repetition of `motif`.
///
/// Each base contributes `match_weight` when it equals the motif base at the
/// same tiled offset, `mismatch_weight` when it differs with base quality at
/// or above `base_qual_min`, and `low_qual_mismatch_weight` otherwise.
/// Returns 0.0 for an empty sequence.
///
/// NOTE(review): indexes `base_quals` in lockstep with `seq`, so it panics if
/// `base_quals` is shorter than `seq` (and on an empty motif) — callers pass
/// matched BAM sequence/quality arrays.
pub fn purity_score(
    seq: &[u8],
    base_quals: &[u8],
    motif: &[u8],
    params: &RepeatPurityScoreParams,
) -> f32 {
    if seq.is_empty() {
        return 0.0;
    }
    let total: f32 = seq
        .iter()
        .enumerate()
        .map(|(i, &base)| {
            if base == motif[i % motif.len()] {
                params.match_weight
            } else if base_quals[i] >= params.base_qual_min {
                params.mismatch_weight
            } else {
                params.low_qual_mismatch_weight
            }
        })
        .sum();
    total / (seq.len() as f32)
}
/// Reads the locus id stored in the record's ID aux tag.
///
/// # Errors
/// Fails when the tag is absent (propagated from `aux`) or present but not a
/// string value.
pub fn get_locus_id(record: &Record) -> Result<&str> {
    match record.aux(SAM_ID_TAG)? {
        Aux::String(value) => Ok(value),
        _ => Err(anyhow!(
            "Read with qname={} does not have ID tag",
            record.qname().to_string()
        )),
    }
}
/// Counts how many of `reads` classify as in-repeat reads (IRRs).
pub fn count_irrs(reads: &[Read]) -> usize {
    let mut total = 0;
    for read in reads {
        if read.is_irr() {
            total += 1;
        }
    }
    total
}
fn serialize_vec_pos_tuple<S>(
vec: &[(RepeatAlignmentPosition, HammingDistance)],
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(Some(vec.len()))?;
for tuple in vec {
let pos = serde_json::to_string(&tuple.0).unwrap();
let s = format!("({}: {})", &pos[1..pos.len() - 1], tuple.1);
seq.serialize_element(&s)?;
}
seq.end()
}
/// Deserializes a sequence of "(position: distance)" strings — the format
/// produced by `serialize_vec_pos_tuple` — back into position/distance
/// tuples.
fn deserialize_vec_pos_tuple<'de, D>(
    deserializer: D,
) -> Result<Vec<(RepeatAlignmentPosition, HammingDistance)>, D::Error>
where
    D: Deserializer<'de>,
{
    struct VecPosTupleVisitor;

    impl<'de> Visitor<'de> for VecPosTupleVisitor {
        type Value = Vec<(RepeatAlignmentPosition, HammingDistance)>;

        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
            formatter.write_str(
                "a list of strings representing (RepeatAlignmentPosition: HammingDistance) tuples",
            )
        }

        fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
        where
            A: SeqAccess<'de>,
        {
            let mut vec = Vec::new();
            while let Some(value) = seq.next_element::<String>()? {
                // Strip the parentheses the serializer wrapped around each entry.
                let value = value.trim_matches(|c| c == '(' || c == ')');
                // NOTE(review): splitting on ": " assumes the serialized
                // position never itself contains ": " — confirm against
                // RepeatAlignmentPosition's serde representation.
                let mut parts = value.split(": ");
                let position_str = parts
                    .next()
                    .ok_or_else(|| de::Error::custom("missing position part"))?;
                let distance_str = parts
                    .next()
                    .ok_or_else(|| de::Error::custom("missing distance part"))?;
                // Re-quote so serde_json parses it as a JSON string,
                // mirroring the quote-stripping done during serialization.
                let repeat_pos: RepeatAlignmentPosition =
                    serde_json::from_str(&format!("\"{}\"", position_str)).map_err(|_| {
                        de::Error::custom("failed to parse RepeatAlignmentPosition")
                    })?;
                let hamming_distance: HammingDistance =
                    HammingDistance::from_str(distance_str).map_err(de::Error::custom)?;
                vec.push((repeat_pos, hamming_distance));
            }
            // Fully qualified: `anyhow::Ok` is imported at the top of the
            // file and would not match `A::Error` here.
            std::result::Result::Ok(vec)
        }
    }

    deserializer.deserialize_seq(VecPosTupleVisitor)
}