twitcher 0.1.8

Find template switch mutations in genomic data
use std::{path::PathBuf, time::Duration};

use anyhow::bail;
use itertools::Itertools;
use rust_htslib::{
    bam,
    bcf::{self, Read},
};
use tracing::{debug, instrument};
use url::Url;

use crate::{
    RunnableCommand,
    common::{
        aligner::RUNNING,
        list_of_regions::{Regions, RegionsDefinition},
        reference::ReferenceReader,
    },
    vcf::{
        cli::{Command, OutputOptions},
        pipeline::{
            reader::VCFReader,
            writer::{OutputWriter, region_writer::RegionWriter},
        },
    },
};

pub mod cli;
pub mod pipeline;
pub mod strings;

impl RunnableCommand for Command {
    /// Runs the `vcf` subcommand end to end.
    ///
    /// Steps:
    /// 1. Validates the option combination.
    /// 2. Opens the input VCF/BCF (restricted to `regions` when given).
    /// 3. Adds twitcher's header lines and opens the output writer.
    /// 4. Opens the reference, the optional targets, the optional template-switch region
    ///    output and the optional BAM file.
    /// 5. Hands everything to `pipeline::run`.
    ///
    /// The blocking htslib calls are wrapped in `tokio::task::block_in_place`, which
    /// panics on a current-thread runtime, so this must run on a multi-thread runtime.
    ///
    /// # Errors
    /// Returns an error if validation fails, if any input or output cannot be opened, or
    /// if the pipeline fails.
    #[instrument(name = "vcf", skip_all)]
    async fn run(self) -> anyhow::Result<()> {
        /// Aborts the wrapped task when dropped, so a background task cannot outlive `run`.
        struct AbortOnDrop<T>(tokio::task::JoinHandle<T>);
        impl<T> Drop for AbortOnDrop<T> {
            fn drop(&mut self) {
                self.0.abort();
            }
        }

        self.validate()?;

        // Once per second, log the `RUNNING` counter from the aligner module. The task
        // never finishes by itself; the guard aborts it on every exit path of `run`,
        // including early `?` returns, instead of leaving it detached.
        let _monitor = AbortOnDrop(tokio::spawn(async {
            let mut interval = tokio::time::interval(Duration::from_secs(1));
            loop {
                interval.tick().await;
                let r = RUNNING.load(std::sync::atomic::Ordering::Relaxed);
                debug!("Running tasks: {r}");
            }
        }));

        let regions = self.regions.read_regions().await?;
        let input = tokio::task::block_in_place(move || open_input_file(&self.input, regions))?;
        let mut header = bcf::Header::from_template(input.header());
        augment_header(&mut header, self.no_version);
        let output = tokio::task::block_in_place(move || open_output_file(&self.output, &header))?;
        let reference =
            tokio::task::block_in_place(move || ReferenceReader::try_from(&self.reference))?;
        let targets = self.targets.read_regions().await?;
        let region_output = RegionWriter::from_parameter(self.ts_regions.as_deref()).await?;
        let bam_f = tokio::task::block_in_place(move || open_bam_file(self.bam_f.as_deref()));

        pipeline::run(
            input,
            output,
            reference,
            targets,
            region_output,
            self.pipeline_settings,
            bam_f,
        )
        .await?;

        Ok(())
    }
}

impl Command {
    /// Rejects option combinations that cannot work together.
    ///
    /// # Errors
    /// Fails when the VCF output (no output file, or `-`) and the template-switch
    /// regions output (`-`) would both be written to stdout.
    fn validate(&self) -> anyhow::Result<()> {
        let vcf_to_stdout = matches!(self.output.output_file.as_deref(), None | Some("-"));
        let regions_to_stdout = self.ts_regions.as_ref().is_some_and(|target| target == "-");

        if !(vcf_to_stdout && regions_to_stdout) {
            return Ok(());
        }
        bail!("Cannot output both vcf and regions output on stdout.");
    }
}

fn augment_header(header: &mut bcf::Header, no_version: bool) {
    #[allow(clippy::wildcard_imports)]
    use strings::*;
    // Eventually, what we add in this method will likely depend on the settings etc ...
    header.push_record(
            format!(
                "##INFO=<ID={VCF_TS_CIGARETS_KEY},Number=1,Type=String,Description=\"{VCF_TS_CIGARETS_DESC}\">"
            )
            .as_bytes(),
        );
    if !no_version {
        header.push_record(
            format!("##{VCF_HEADER_TWITCHER_VERSION_KEY}={TWITCHER_VERSION}",).as_bytes(),
        );
        header.push_record(
            format!("##{VCF_HEADER_TWITCHER_CMD}={}", std::env::args().join(" ")).as_bytes(),
        );
    }
}

/// Opens the optional indexed BAM file given on the command line.
///
/// Returns `None` when no path was supplied.
///
/// # Panics
/// Panics if `bam_f` is set but htslib cannot open it as an indexed BAM reader. The
/// panic message names the offending path and includes the htslib error.
fn open_bam_file(bam_f: Option<&str>) -> Option<bam::IndexedReader> {
    bam_f.map(|path| {
        bam::IndexedReader::from_path(path)
            .unwrap_or_else(|e| panic!("failed to open indexed BAM file `{path}`: {e}"))
    })
}

fn open_input_file(input: &str, regions: Option<Regions>) -> anyhow::Result<VCFReader> {
    if let Some(regions) = regions {
        let inner = match input {
            "-" => bail!("Cannot create indexed reader from stdin"),
            path_or_url => {
                if let Ok(url) = Url::parse(path_or_url) {
                    bcf::IndexedReader::from_url(&url)
                } else {
                    bcf::IndexedReader::from_path(path_or_url)
                }
            }
        }?;

        let regions =
            regions.into_linear(|chr| inner.header().name2rid(chr.as_ref()).unwrap() as usize);
        Ok(VCFReader::with_regions(inner, regions))
    } else {
        let inner = match input {
            "-" => bcf::Reader::from_stdin(),
            path_or_url => {
                if let Ok(url) = Url::parse(path_or_url) {
                    bcf::Reader::from_url(&url)
                } else {
                    bcf::Reader::from_path(path_or_url)
                }
            }
        }?;
        Ok(VCFReader::entire_file(inner))
    }
}

/// Opens the VCF/BCF writer selected by `output` and wraps it in an [`OutputWriter`].
///
/// With no output file, or `-`, an uncompressed VCF is written to stdout. Otherwise
/// `output_file` is used as a URL if `Url::parse` accepts it, or as a local path.
/// Format and compression are inferred from the file name by `infer_output_format`.
///
/// If `realigned_and_context` is set, the writer is buffered with that context window.
/// Otherwise a native writer is created with the `only_realigned` filter.
///
/// # Errors
/// Fails if htslib cannot create the writer, or if the context window does not fit in
/// an `i64`.
fn open_output_file(output: &OutputOptions, header: &bcf::Header) -> anyhow::Result<OutputWriter> {
    let inner = match output.output_file.as_deref() {
        None | Some("-") => bcf::Writer::from_stdout(header, true, bcf::Format::Vcf),
        Some(path_or_url) => {
            let url = Url::parse(path_or_url);
            // For URLs only the path component carries the file name to infer from.
            let path = PathBuf::from(match &url {
                Ok(url) => url.path(),
                Err(_) => path_or_url,
            });
            let (format, compressed) = infer_output_format(&path);
            // htslib takes an "uncompressed" flag, hence the negation.
            match url {
                Ok(url) => bcf::Writer::from_url(&url, header, !compressed, format),
                Err(_) => bcf::Writer::from_path(path, header, !compressed, format),
            }
        }
    }?;

    let writer = if let Some(plus_minus) = output.output_filter.realigned_and_context {
        OutputWriter::new_buffered(inner, i64::try_from(plus_minus)?)
    } else {
        OutputWriter::new_native(inner, output.output_filter.only_realigned)
    };
    Ok(writer)
}

/// Infers the output format, and whether to BGZF-compress it, from a file name.
///
/// - A file name containing `.vcf`, or a path with no file name, selects VCF. VCF is
///   compressed only for a `.gz`/`.bgz` extension.
/// - Anything else selects BCF, which is always compressed, because BCF is specified
///   as a BGZF-compressed format and an uncompressed BCF cannot be indexed.
///
/// As a result, `out.bcf` and `out.bcf.gz` produce the same kind of file.
fn infer_output_format(path: &std::path::Path) -> (bcf::Format, bool) {
    let is_vcf = path
        .file_name()
        .is_none_or(|name| name.to_str().is_some_and(|name| name.contains(".vcf")));
    if is_vcf {
        let gzipped = matches!(
            path.extension().and_then(|ext| ext.to_str()),
            Some("gz" | "bgz")
        );
        (bcf::Format::Vcf, gzipped)
    } else {
        (bcf::Format::Bcf, true)
    }
}

#[cfg(test)]
mod tests {
    use std::io::Read as _;

    use rust_htslib::{
        bcf::{Header, Read},
        bgzf,
    };

    use crate::common::reference::CliReferenceArg;

    use super::*;

    /// VCF fixture shipped with the repository, used as input by the tests below.
    const TEST_VCF: &str = "./test_files/test.vcf";

    /// Writes a header-only output file called `file_name` into a fresh temporary
    /// directory. `open_output_file` picks the format from that name.
    ///
    /// Returns the temporary directory together with the written path. The directory
    /// must be kept alive by the caller, because dropping it deletes the file.
    fn write_header_only(file_name: &str) -> (tempfile::TempDir, String) {
        let reader = open_input_file(TEST_VCF, None).unwrap();
        let header = Header::from_template(reader.header());
        let dir = tempfile::tempdir().unwrap();
        let outpath = format!("{}/{file_name}", dir.path().to_str().unwrap());
        let writer = open_output_file(&Some(&outpath).into(), &header).unwrap();
        // Dropping the writer flushes and closes the file.
        drop(writer);
        (dir, outpath)
    }

    /// Reads a whole file through a BGZF reader and returns its bytes.
    fn read_bgzf(path: &str) -> Vec<u8> {
        let mut contents = Vec::new();
        bgzf::Reader::from_path(path)
            .unwrap()
            .read_to_end(&mut contents)
            .unwrap();
        contents
    }

    #[test]
    fn validate_rejects_both_outputs_on_stdout() {
        let cmd = Command {
            output: None::<&str>.into(), // stdout (default)
            ts_regions: Some("-".into()),
            ..Default::default()
        };
        assert!(cmd.validate().is_err());
    }

    #[test]
    fn validate_allows_vcf_on_file_regions_on_stdout() {
        let cmd = Command {
            output: Some("/tmp/out.vcf").into(),
            ts_regions: Some("-".into()),
            ..Default::default()
        };
        assert!(cmd.validate().is_ok());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_pipeline() {
        let dir = tempfile::tempdir().unwrap();
        let outpath = format!("{}/out.vcf", dir.path().to_str().unwrap());
        let cmd = Command {
            input: TEST_VCF.to_string(),
            reference: CliReferenceArg::from("./test_files/test.fa"),
            output: Some(outpath.clone()).into(),
            ..Default::default()
        };
        cmd.run().await.unwrap();

        let written = std::fs::read_to_string(&outpath).unwrap();
        let record_count = written
            .lines()
            .filter(|line| !line.starts_with('#'))
            .count();
        assert_eq!(record_count, 2);
    }

    #[test]
    fn test_auto_detect_output_format_vcf() {
        let (_dir, outpath) = write_header_only("out.vcf");
        let written = std::fs::read_to_string(&outpath).unwrap();
        assert!(written.starts_with("##fileformat=VCFv4"));
    }

    #[test]
    fn test_auto_detect_output_format_vcf_gz() {
        let (_dir, outpath) = write_header_only("out.vcf.gz");
        assert!(read_bgzf(&outpath).starts_with(b"##fileformat=VCFv4"));
    }

    #[test]
    fn test_auto_detect_output_format_bcf() {
        let (_dir, outpath) = write_header_only("out.bcf");
        assert!(read_bgzf(&outpath).starts_with(b"BCF"));
    }

    #[test]
    fn test_auto_detect_output_format_bcf_gz() {
        // .bcf.gz should be the same as bcf
        let (_dir, outpath) = write_header_only("out.bcf.gz");
        assert!(read_bgzf(&outpath).starts_with(b"BCF"));
    }
}