twitcher 0.1.9

Find template switch mutations in genomic data
use std::sync::Arc;

use bstr::ByteSlice;
use rust_htslib::bam::{self, Read};
use tokio_stream::StreamExt;
use tracing::{Span, info, instrument, warn};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use url::Url;

use crate::{
    RunnableCommand,
    common::{
        cluster_settings::ClusteringSettings,
        list_of_regions::{CliRegionsArgs, RegionsDefinition},
        reference::{CliReferenceArg, ReferenceReader},
    },
    counter,
    reads::{
        bam_reader::BAMReader,
        clusters::find_clusters,
        processor::{CliProcessorArgs, Processor},
    },
};

mod bam_reader;
mod clusters;
mod processor;
mod writer;

/// Scan through reads and find template switches in reads.
#[derive(clap::Args, Debug)]
pub struct Command {
    /// The input BAM file
    bam_file: String,

    /// Only analyze the first `n` reads. Respects -r/-R but not -t/-T.
    #[arg(short = 'n', long = "number-of-reads")]
    number_of_reads: Option<usize>,

    #[command(flatten)]
    ref_file: CliReferenceArg,

    #[command(flatten)]
    regions: CliRegionsArgs,

    #[command(flatten)]
    clustering: ClusteringSettings,

    #[command(flatten)]
    processor: CliProcessorArgs,
}

impl RunnableCommand for Command {
    #[instrument(name = "reads", skip_all, fields(indicatif.pb_show = true))]
    async fn run(self) -> anyhow::Result<()> {
        let mut reader = self.open_bam_file().await?;
        reader.set_reference(&self.ref_file.file)?; // enables CRAM
        let reference_reader = ReferenceReader::try_from(&self.ref_file)?;
        let mut processor = Processor::new(&self.processor).await?;
        let mut rec_buf = Arc::new(bam::Record::new());
        let skipped = counter!("reads.skipped");
        let num_targets = reader.header().target_count();
        let span = Span::current();
        span.pb_start();
        span.pb_set_length(num_targets.into());
        let mut read_counter = 0;
        loop {
            if let Some(max) = self.number_of_reads {
                if read_counter >= max {
                    info!("Maximum number of reads {max} reached, stopping.");
                    break;
                }
                read_counter += 1;
            }
            {
                let rec_buf = Arc::make_mut(&mut rec_buf);
                match tokio::task::block_in_place(|| reader.read(rec_buf)) {
                    Some(result) => result?,
                    None if reader.done() => break,
                    None => continue,
                }
                rec_buf.cache_cigar();
            }

            if let Ok(tid) = u32::try_from(rec_buf.tid()) {
                span.pb_set_position(tid.into());
            }

            counter!("reads.total").inc(1);
            let seq_len = rec_buf.seq_len();
            if seq_len == 0 {
                if skipped.get() == 0 {
                    warn!(
                        "Skipping read {} with empty sequence.",
                        rec_buf.qname().as_bstr()
                    );
                }
                skipped.inc(1);
                continue;
            }

            if rec_buf.tid() < 0 {
                skipped.inc(1);
                continue;
            }

            let (cigar, mut clusters) = match find_clusters(
                reader.header(),
                rec_buf.clone(),
                &reference_reader,
                self.clustering.clone(),
            )
            .await
            {
                Ok(clusters) => clusters,
                Err(e) => {
                    warn!(
                        "Can't find clusters on read {}: {e}",
                        rec_buf.qname().as_bstr()
                    );
                    continue;
                }
            };

            while let Some(cluster) = clusters.next().await {
                counter!("clusters").inc(1);
                processor
                    .process_cluster(cluster, &rec_buf, &cigar, &reference_reader)
                    .await?;
            }
        }
        processor.wait_until_done().await;
        if skipped.get() > 0 {
            warn!(
                "In total, {skipped} reads were skipped because they are either missing sequence or alignment information."
            );
        }

        Ok(())
    }
}

impl Command {
    async fn open_bam_file(&self) -> Result<bam_reader::BAMReader, anyhow::Error> {
        if let Some(regions) = self.regions.read_regions().await? {
            let inner = match self.bam_file.as_str() {
                "-" => anyhow::bail!("Cannot create indexed reader from stdin"),
                path_or_url => tokio::task::block_in_place(|| {
                    if let Ok(url) = Url::parse(path_or_url) {
                        bam::IndexedReader::from_url(&url)
                    } else {
                        bam::IndexedReader::from_path(path_or_url)
                    }
                }),
            }?;

            let regions = regions.into_linear(|a| inner.header().tid(a.as_ref()).unwrap() as usize);
            Ok(BAMReader::with_regions(inner, regions))
        } else {
            let inner = match self.bam_file.as_str() {
                "-" => bam::Reader::from_stdin(),
                path_or_url => tokio::task::block_in_place(|| {
                    if let Ok(url) = Url::parse(path_or_url) {
                        bam::Reader::from_url(&url)
                    } else {
                        bam::Reader::from_path(path_or_url)
                    }
                }),
            }?;
            Ok(BAMReader::entire_file(inner))
        }
    }
}