twitcher 0.1.8

Find template switch mutations in genomic data
use std::sync::Arc;

use bstr::ByteSlice;
use rust_htslib::bam::{self, Read};
use tokio_stream::StreamExt;
use tracing::{Span, info, instrument, warn};
use tracing_indicatif::span_ext::IndicatifSpanExt;
use url::Url;

use crate::{
    RunnableCommand,
    common::{
        cluster_settings::ClusteringSettings,
        list_of_regions::{CliRegionsArgs, RegionsDefinition},
        reference::{CliReferenceArg, ReferenceReader},
    },
    counter,
    reads::{
        bam_reader::BAMReader,
        clusters::find_clusters,
        processor::{CliProcessorArgs, Processor},
    },
};

mod bam_reader;
mod clusters;
mod processor;
mod writer;

/// Scan through reads and find template switches in reads.
#[derive(clap::Args, Debug)]
pub struct Command {
    /// The input BAM file
    bam_file: String,

    /// Only analyze the first `n` reads. Respects -r/-R but not -t/-T.
    #[arg(short = 'n', long = "number-of-reads")]
    number_of_reads: Option<usize>,

    #[command(flatten)]
    ref_file: CliReferenceArg,

    #[command(flatten)]
    regions: CliRegionsArgs,

    #[command(flatten)]
    clustering: ClusteringSettings,

    #[command(flatten)]
    processor: CliProcessorArgs,
}

impl RunnableCommand for Command {
    #[instrument(name = "reads", skip_all, fields(indicatif.pb_show = true))]
    async fn run(self) -> anyhow::Result<()> {
        let mut reader = self.open_bam_file().await?;
        reader.set_reference(&self.ref_file.file)?; // enables CRAM
        let reference_reader = ReferenceReader::try_from(&self.ref_file)?;
        let processor = Processor::new(&self.processor).await?;
        let mut rec_buf = Arc::new(bam::Record::new());
        let skipped = counter!("reads.skipped");
        let num_targets = reader.header().target_count();
        let span = Span::current();
        span.pb_start();
        span.pb_set_length(num_targets.into());
        let mut read_counter = 0;
        loop {
            if let Some(max) = self.number_of_reads {
                if read_counter >= max {
                    info!("Maximum number of reads {max} reached, stopping.");
                    break;
                }
                read_counter += 1;
            }
            match tokio::task::block_in_place(|| reader.read(Arc::make_mut(&mut rec_buf))) {
                Some(result) => result?,
                None if reader.done() => break,
                None => continue,
            }

            if let Ok(tid) = u32::try_from(rec_buf.tid()) {
                span.pb_set_position(tid.into());
            }

            counter!("reads.total").inc(1);
            let seq_len = rec_buf.seq_len();
            if seq_len == 0 {
                if skipped.get() == 0 {
                    warn!(
                        "Skipping read {} with empty sequence.",
                        rec_buf.qname().as_bstr()
                    );
                }
                skipped.inc(1);
                continue;
            }

            if rec_buf.tid() < 0 {
                skipped.inc(1);
                continue;
            }

            let mut clusters =
                match find_clusters(reader.header(), rec_buf.clone(), &reference_reader, self.clustering.clone()).await {
                    Ok(clusters) => clusters,
                    Err(e) => {
                        warn!(
                            "Can't find clusters on read {}: {e}",
                            rec_buf.qname().as_bstr()
                        );
                        continue;
                    }
                };
            // if seq_len != expected_len {
            //     if skipped == 0 {
            //         warn!(
            //             "Skipping read {} with unexpected sequence length: The cigar string suggests the read should be {} long, but its lenth is {}.",
            //             rec_buf.qname().as_bstr(),
            //             expected_len,
            //             seq_len,
            //         );
            //     }
            //     skipped += 1;
            //     continue;
            // }

            while let Some(cluster) = clusters.next().await {
                counter!("clusters").inc(1);
                processor
                    .process_cluster(cluster, &rec_buf, &reference_reader)
                    .await?;
            }
        }
        processor.wait_until_done().await;
        if skipped.get() > 0 {
            warn!(
                "In total, {skipped} reads were skipped because they are either missing sequence or alignment information."
            );
        }

        Ok(())
    }
}

impl Command {
    async fn open_bam_file(&self) -> Result<bam_reader::BAMReader, anyhow::Error> {
        if let Some(regions) = self.regions.read_regions().await? {
            let inner = match self.bam_file.as_str() {
                "-" => anyhow::bail!("Cannot create indexed reader from stdin"),
                path_or_url => tokio::task::block_in_place(|| {
                    if let Ok(url) = Url::parse(path_or_url) {
                        bam::IndexedReader::from_url(&url)
                    } else {
                        bam::IndexedReader::from_path(path_or_url)
                    }
                }),
            }?;

            let regions = regions.into_linear(|a| inner.header().tid(a.as_ref()).unwrap() as usize);
            Ok(BAMReader::with_regions(inner, regions))
        } else {
            let inner = match self.bam_file.as_str() {
                "-" => bam::Reader::from_stdin(),
                path_or_url => tokio::task::block_in_place(|| {
                    if let Ok(url) = Url::parse(path_or_url) {
                        bam::Reader::from_url(&url)
                    } else {
                        bam::Reader::from_path(path_or_url)
                    }
                }),
            }?;
            Ok(BAMReader::entire_file(inner))
        }
    }
}