twitcher 0.1.8

Find template switch mutations in genomic data
use bstr::ByteSlice;
use lib_tsalign::a_star_aligner::alignment_geometry::{AlignmentCoordinates, AlignmentRange};
use rust_htslib::bam;

use crate::{
    common::{
        aligner::{ AlignmentOrchestrator, AlignmentQuery, cli::CliAlignmentArgs},
        list_of_regions::{CliTargetsArgs, Regions, RegionsDefinition},
        reference::ReferenceReader,
    },
    counter,
    reads::{
        clusters::Cluster,
        writer::{CliOutputArgs, ClusterResult, Writer},
    },
};

pub struct Processor {
    writer: Writer,
    aligner: AlignmentOrchestrator,
    targets: Option<Regions>,
    padding: usize,
    min_quality: Option<u8>,
}

#[derive(clap::Args, Debug)]
pub struct CliProcessorArgs {
    #[command(flatten)]
    output: CliOutputArgs,

    #[command(flatten)]
    targets: CliTargetsArgs,

    #[command(flatten)]
    alignment: CliAlignmentArgs,

    /// When specified, no effort is made to deduplicate clusters before aligning them.
    #[arg(long, default_value_t = false)]
    no_dedup: bool,

    /// Specify a minimum base quality (Phred-scaled) that all bases in a cluster must have.
    #[arg(long = "min-quality", value_name = "NUM")]
    min_quality: Option<u8>,
}

impl Processor {
    pub async fn new(args: &CliProcessorArgs) -> anyhow::Result<Self> {
        let targets = args.targets.read_regions().await?;
        let writer = Writer::spawn(&args.output)?;
        let mut aligner = AlignmentOrchestrator::try_from(&args.alignment)?;
        if !args.no_dedup {
            aligner.enable_cache();
        }
        Ok(Self {
            writer,
            aligner,
            targets,
            padding: args.alignment.padding,
            min_quality: args.min_quality,
        })
    }

    pub async fn process_cluster(
        &self,
        cluster: Cluster,
        bam_record: &bam::Record,
        reference_reader: &ReferenceReader,
    ) -> anyhow::Result<()> {
        if let Some(ref targets) = self.targets {
            if !targets.contains(&cluster.region) {
                return Ok(());
            }
        }

        let read_seq = bam_record.seq();

        if let Some(q) = self.min_quality {
            let read_qual = bam_record.qual();
            let pass =
                (cluster.read_range.start..cluster.read_range.end).all(|ix| read_qual[ix] >= q);
            if !pass {
                counter!("filtered.by_base_quality").inc(1);
                return Ok(());
            }
        }

        let read_min_offset = cluster.read_range.start.saturating_sub(self.padding);
        let read_max_offset = (cluster.read_range.end + self.padding).min(bam_record.seq_len());
        // We don't want to collect the whole sequence into a vec, but just the parts we need
        let qry_seq = (read_min_offset..read_max_offset).map(|i| read_seq[i]);

        let Some(reference) = reference_reader
            .get_seq(cluster.region.clone(), self.padding, self.padding)
            .await?
        else {
            counter!("filtered.by_soft_mask").inc(1);
            return Ok(());
        };

        let ranges = AlignmentRange::new_offset_limit(
            AlignmentCoordinates::new(
                reference.range_in_sequence.start,
                cluster.read_range.start - read_min_offset,
            ),
            AlignmentCoordinates::new(
                reference.range_in_sequence.end,
                cluster.read_range.end - read_min_offset,
            ),
        );

        let query = AlignmentQuery {
            sequences: crate::common::SequencePair {
                reference: reference.sequence,
                query: qry_seq.collect(),
            },
            ranges,
        };

        let rx = self.aligner.get_or_compute_alignment(
            reference_reader.get_name(),
            &reference.region,
            cluster.region.clone(),
            query,
        )?;

        Ok(self
            .writer
            .write(ClusterResult {
                recv: rx,
                region: cluster.region,
                rec_pos: usize::try_from(bam_record.pos())?,
                rec_seq_len: bam_record.seq_len(),
                rec_qname: bam_record.qname().to_str()?.to_string(),
            })
            .await?)
    }

    pub async fn wait_until_done(self) {
        self.writer.wait_until_done().await;
    }
}