use anyhow::{Context, bail};
use iterator::{ClusterOrRecords, find_clusters};
use lib_tsalign::a_star_aligner::{
alignment_geometry::{AlignmentCoordinates, AlignmentRange},
alignment_result::alignment::Alignment,
template_switch_distance::AlignmentType,
};
use rust_htslib::{bam, bcf};
use std::{path::PathBuf, sync::Arc};
use tokio::{
fs::File,
io::{AsyncWriteExt, BufWriter},
pin,
sync::mpsc,
};
use tokio_stream::StreamExt;
use tracing::{error, instrument, warn, warn_span};
use crate::{
common::{
ImmutableSequence, SequencePair,
aligner::{AlignmentOrchestrator, AlignmentQuery, cli::CliAlignmentArgs},
alignment::ForwardAlignment,
contig::ContigName,
coords::{GenomePosition, GenomeRegion},
csv::CSVAuxData,
list_of_regions::Regions,
reference::{ReferenceQueryResult, ReferenceReader},
},
counter,
vcf::pipeline::reader::VCFReader,
};
use super::Message;
pub mod cluster;
mod iterator;
pub mod local_phasing;
pub mod phasing;
pub struct Clusterizer {
input: VCFReader,
targets: Option<Regions>,
reference: ReferenceReader,
output: mpsc::Sender<Message>,
settings: ClusterizerSettings,
pub bam: Option<bam::IndexedReader>,
pub phasing: local_phasing::PhasingSettings,
}
#[derive(clap::Args, Debug, Default)]
pub struct ClusterizerSettings {
#[command(flatten)]
pub cluster_strategy: cluster::ClusteringSettings,
#[command(flatten)]
pub aligner: CliAlignmentArgs,
#[arg(long = "output-unresolvable")]
pub unresolvable_out: Option<PathBuf>,
}
impl Clusterizer {
pub(super) fn new(
input: VCFReader,
reference: ReferenceReader,
targets: Option<Regions>,
output: mpsc::Sender<Message>,
settings: ClusterizerSettings,
bam: Option<bam::IndexedReader>,
phasing: local_phasing::PhasingSettings,
) -> Self {
Self {
input,
targets,
reference,
output,
settings,
bam,
phasing,
}
}
pub async fn run(self) -> anyhow::Result<()> {
let Self {
input,
targets,
reference,
output,
settings,
mut bam,
phasing: phasing_settings,
} = self;
let aligner = AlignmentOrchestrator::try_from(&settings.aligner)
.map_err(|e| anyhow::anyhow!("Cannot initialize aligners: {e}"))?;
let aligner_padding = settings.aligner.padding;
let aligner_range_extension = settings.aligner.range_extension;
let strategy = settings.cluster_strategy;
let vcf_name = input.name().to_string();
let clusters = find_clusters(strategy.clone(), input, targets);
pin!(clusters);
let mut cluster_id = 0usize;
let mut proximity_cluster_id = 0usize;
let mut unresolvable_out = if let Some(p) = settings.unresolvable_out {
Some(BufWriter::new(File::create(p).await?))
} else {
None
};
while let Some(e) = clusters.next().await {
match e {
ClusterOrRecords::Cluster(records) => {
proximity_cluster_id += 1;
counter!("clusters").inc(1);
let proximity_id_str = proximity_cluster_id.to_string();
let mut classified: Vec<Arc<bcf::Record>> =
records.iter().map(|r| Arc::new(r.clone())).collect();
let bam_consulted = if phasing::has_unphased_het(&classified) {
if let Some(bam_reader) = bam.as_mut() {
if let Ok(cluster_region) = extract_region(&records) {
classified = tokio::task::block_in_place(|| {
local_phasing::resolve_phasing(
classified,
&cluster_region,
bam_reader,
&phasing_settings,
)
});
true
} else {
counter!("clusters.phasing.skipped").inc(1);
false
}
} else {
counter!("clusters.phasing.skipped").inc(1);
false
}
} else {
counter!("clusters.phasing.not_needed").inc(1);
false
};
let sub_clusters =
phasing::split_into_haplotype_clusters(&classified, &strategy);
if sub_clusters.is_empty() {
if bam_consulted {
counter!("clusters.phasing.attempted.unsuccessful").inc(1);
}
counter!("clusters.unresolvable").inc(1);
if let Some(file) = &mut unresolvable_out {
file.write_all(
format!("{}\n", GenomeRegion::try_from(&records[..])?).as_bytes(),
)
.await?;
}
output
.send(Message::passthrough(records))
.await
.expect("Channel closed !?");
continue;
}
counter!("clusters.resolved").inc(1);
if bam_consulted {
counter!("clusters.phasing.attempted.rescued").inc(1);
}
for mut sub in sub_clusters {
cluster_id += 1;
sub.records.sort_by_key(|(r, _)| r.pos());
let recs: Vec<(bcf::Record, u32)> = sub
.records
.iter()
.map(|(r, a)| ((**r).clone(), *a))
.collect();
let sub_phasing =
phasing::OutputPhasing::from_subcluster(sub.haplo, sub.phaseset);
let pending = match prepare_cluster(
&reference,
&recs,
aligner_padding,
aligner_range_extension,
)
.await
{
Ok(Some(prepared)) => {
match aligner.get_or_compute_alignment(
reference.get_name(),
&prepared.reference_region.clone(),
prepared.cluster_region.clone(),
prepared.query.clone(),
) {
Ok(pending) => Some(Box::new((
pending,
CSVAuxData {
cluster_id: cluster_id.to_string(),
cluster_grp: proximity_id_str.clone(),
sequences: prepared.query.sequences,
ref_context_region: prepared.reference_region.clone(),
alt_context_region: prepared.reference_region,
cluster_region: prepared.cluster_region,
alt_id: Some(vcf_name.clone()),
forward_alignment: ForwardAlignment(
prepared.fw_alignment,
),
cost: aligner.costs.clone(),
reference_name: reference.get_name().to_string(),
},
))),
Err(e) => {
error!(
"Could not get or start the computation of an alignment: {e}"
);
None
}
}
}
Ok(None) => None,
Err((reg, err)) => {
let _s = reg
.map(|r| warn_span!("Failed to prepare", pos = %r).entered());
counter!("alignments.skipped.overlapping_mutations").inc(1);
warn!("{err}");
None
}
};
output
.send(Message::cluster(pending, recs, sub_phasing))
.await
.expect("Channel closed !?");
}
}
ClusterOrRecords::Records(records) => {
output
.send(Message::passthrough(records))
.await
.expect("Channel closed !?");
}
ClusterOrRecords::SequenceDone => continue,
}
}
if let Some(o) = &mut unresolvable_out {
o.flush().await?;
}
Ok(())
}
}
#[derive(Clone, Debug)]
struct PreparedCluster {
query: AlignmentQuery,
reference_region: GenomeRegion,
cluster_region: GenomeRegion,
fw_alignment: Alignment<AlignmentType>,
}
async fn prepare_cluster(
reference: &ReferenceReader,
cluster: &[(bcf::Record, u32)],
padding: usize,
range_extension: usize,
) -> Result<Option<PreparedCluster>, (Option<GenomeRegion>, anyhow::Error)> {
let query_region = extract_region(cluster.iter().map(|(r, _)| r)).map_err(|e| (None, e))?;
let Some(ReferenceQueryResult {
region: actual_region,
sequence: reference_sequence,
..
}) = reference
.get_seq(query_region.clone(), padding, padding)
.await
.map_err(|e| (Some(query_region.clone()), e))?
else {
return Ok(None);
};
let _span = warn_span!("prepare_cluster", pos = %query_region).entered();
let actual_padding_left =
query_region.start().position_0() - actual_region.start().position_0();
let actual_padding_right =
reference_sequence.len() - actual_padding_left - query_region.len().unwrap();
let (query_sequence, fw_alignment) = apply_mutations(
&reference_sequence,
actual_region.start().position_0(),
cluster.iter().map(|(r, a)| (r, *a)),
)
.map_err(|e| (Some(query_region.clone()), e))?;
let ranges = AlignmentRange::new_offset_limit(
AlignmentCoordinates::new(
actual_padding_left.saturating_sub(range_extension),
actual_padding_left.saturating_sub(range_extension),
),
AlignmentCoordinates::new(
(reference_sequence.len() - actual_padding_right + range_extension)
.min(reference_sequence.len()),
(query_sequence.len() - actual_padding_right + range_extension)
.min(query_sequence.len()),
),
);
Ok(Some(PreparedCluster {
query: AlignmentQuery {
sequences: SequencePair {
reference: reference_sequence,
query: query_sequence,
},
ranges,
},
reference_region: actual_region,
cluster_region: query_region,
fw_alignment,
}))
}
fn extract_region<'a>(
cluster: impl IntoIterator<Item = &'a bcf::Record>,
) -> anyhow::Result<GenomeRegion> {
let (start, end, contig) = {
let (mut start, mut end, mut contig) = (i64::MAX, 0, None);
for r in cluster {
start = start.min(r.pos());
end = end.max(r.end());
if contig.is_none()
&& let Some(present) = r.rid()
{
let name = r.header().rid2name(present)?;
contig = Some(ContigName::new(name));
}
}
(
usize::try_from(start)?,
usize::try_from(end)?,
contig.context("No rid present in records??")?,
)
};
let query_region = GenomeRegion::new_bounded(GenomePosition::new_0(contig, start), end - start);
Ok(query_region)
}
#[instrument(name = "build_query_sequence", skip_all)]
fn apply_mutations<'a>(
reference: &[u8],
reference_start: usize,
mutations: impl Iterator<Item = (&'a bcf::Record, u32)>,
) -> anyhow::Result<(ImmutableSequence, Alignment<AlignmentType>)> {
let mut alt_sequence = Vec::new();
let mut cigar = Alignment::new();
let mut last_end = reference_start;
let reference_end = reference_start + reference.len();
let pos2off = |pos: usize| pos - reference_start;
for (m, allele_idx) in mutations {
let mut pos = usize::try_from(m.pos())?;
let end = usize::try_from(m.end())?;
let alleles = m.alleles();
if alleles.len() <= 1 {
warn!("Record at {pos} has no alt allele");
continue;
}
let alt_idx = allele_idx as usize;
if allele_idx == 0 || alt_idx >= alleles.len() {
warn!(
"Record at {pos} has invalid allele index {allele_idx} (only {} alleles); skipping",
alleles.len()
);
continue;
}
let mut ref_allele = alleles[0];
let mut alt_allele = alleles[alt_idx];
if !ref_allele.is_empty() && !alt_allele.is_empty() && ref_allele[0] == alt_allele[0] {
ref_allele = &ref_allele[1..];
alt_allele = &alt_allele[1..];
pos += 1;
}
if ref_allele.is_empty() && alt_allele.is_empty() {
warn!("Empty record; Skipping.");
continue;
}
if pos < last_end {
bail!(
"Overlapping mutation: last mutation ended at {last_end}, \
but this one starts at {pos}"
);
}
if end > reference_end {
warn!(
"Skipping out-of-bounds mutation at {pos}: \
record end {end} exceeds reference end {reference_end}"
);
continue;
}
if pos > end {
warn!("Skip negative-sized mutation: the position is {pos} but the end is {end}",);
continue;
}
if pos - last_end > 0 {
alt_sequence.extend_from_slice(&reference[pos2off(last_end)..pos2off(pos)]);
cigar.push_n(pos - last_end, AlignmentType::PrimaryMatch);
}
alt_sequence.extend_from_slice(alt_allele);
match (ref_allele.len(), alt_allele.len()) {
(0, 0) => {
bail!("Empty record; should have been caught earlier!");
}
(1, 1) => {
cigar.push(AlignmentType::PrimarySubstitution);
}
(0, m) => {
cigar.push_n(m, AlignmentType::PrimaryInsertion);
}
(n, 0) => {
cigar.push_n(n, AlignmentType::PrimaryDeletion);
}
(n, m) => {
let match_len = n.min(m);
for i in 0..match_len {
if ref_allele[i] == alt_allele[i] {
cigar.push(AlignmentType::PrimaryMatch);
} else {
cigar.push(AlignmentType::PrimarySubstitution);
}
}
let extra = n.max(m) - match_len;
if n > m {
cigar.push_n(extra, AlignmentType::PrimaryDeletion);
} else {
cigar.push_n(extra, AlignmentType::PrimaryInsertion);
}
}
}
last_end = end;
}
alt_sequence.extend_from_slice(&reference[pos2off(last_end)..]);
cigar.push_n(
reference.len() - pos2off(last_end),
AlignmentType::PrimaryMatch,
);
Ok((alt_sequence.into(), cigar))
}